import pyodide
from io import StringIO
import csv
import influxdb_client
# Pyodide doesn't support multiprocessing, create a dummy object so imports
# don't throw exceptions
import sys
sys.modules['_multiprocessing'] = object
# Get bits from the Influx DB client
from influxdb_client.client.flux_csv_parser import FluxResponseMetadataMode, FluxCsvParser, FluxSerializationMode, _FluxCsvParserMetadata
import asyncio
# We're going to overlay some queries into the class
async def new_query(self, query: str, org=None, params: dict = None):
# We'll come back to this
#q = self._create_query(query, self.default_dialect, params)
#print(q)
headers = {
"Authorization" : "Token " + self._influxdb_client.token,
"content-type" : "application/vnd.flux"
}
url = self._influxdb_client.url + "api/v2/query?org=" + self._influxdb_client.org
a = await pyodide.http.pyfetch(url=url,
method="POST",
headers=headers,
body=query
)
response = await a.string()
# The client would call _parser.generator here, but that calls _parse_flux_response() which'll treat
# us as a readable object, so we implement around itt
_parser = self._to_tables_parser(response, self._get_query_options(), FluxResponseMetadataMode.only_names)
list(self.generator(self, response, _parser))
return _parser.table_list()
# Replacement for
# https://github.com/influxdata/influxdb-client-python/blob/[5168a04](/commits/jira-projects/MISC/5168a04983e3b70a9451fa3629fd54db08b91ecd.html)/influxdb_client/client/flux_csv_parser.py#L105
def generator(self, csv, _parser):
for val in self._parse_flux_response(self, csv, _parser):
yield val
# Replaces line 115 in the same file
def _parse_flux_response(self, csv_str, _parser):
metadata = _FluxCsvParserMetadata()
f = StringIO(csv_str)
rows = csv.reader(f)
for row in rows:
for val in _parser._parse_flux_response_row(metadata, row):
print(val)
yield val
def tableise(results):
# Take FluxTable and print a HTML table
id = 0
for table in results:
records = []
cols = [x.label for x in table.columns]
for record in table.records:
r = []
for column in table.columns:
# Fetch each of the columns
r.append(record[column.label])
records.append(r)
insert_output_table(table.get_group_key(), cols, records, id)
id += 1
def insert_output_table(group_key, cols, records, id):
# Construct and insert the HTML table
d = document.createElement('div')
s = document.createElement('span')
s.innerHTML = f"Key: {group_key}"
d.appendChild(s)
t = document.createElement("table")
t.className = "sortable"
t.id = f"tbl{id}"
hr = document.createElement("tr")
for column in cols:
th = document.createElement("th")
th.innerHTML = column
hr.appendChild(th)
t.appendChild(hr)
print("Doing rows " + str(len(records)))
# Now do the data-rows
for record in records:
tr = document.createElement("tr")
for col in record:
td = document.createElement("td")
td.innerHTML = col
tr.appendChild(td)
t.appendChild(tr)
# Append the table to the div
d.appendChild(t)
# Push it all into the page
Element("output").element.appendChild(d)
async def main():
# Get creds etc from the DOM
#
# pyscript's "Element" function gets us a javascript DOM element
org = Element("org").element.value
token = Element("token").element.value
url = Element("url").element.value
query = Element("query").element.value
# Set up the client
client = influxdb_client.InfluxDBClient(
url=url,
token=token,
org=org
)
query_api = client.query_api()
# Overlay our functions into the object
query_api.query = new_query
query_api.generator = generator
query_api._parse_flux_response = _parse_flux_response
# Run the query
r = await query_api.query(query_api,query)
# Iterate through the result object
results = []
for table in r:
for record in table.records:
results.append((record.get_field(), record.get_value()))
# Turn into a HTML table
tableise(r)
# Prettify the tables
Element("prettify").element.click()
# Put the button text back
Element("submit").element.innerHTML = "Run"
def start_loop(event):
# Trigger the async loop
Element("submit").element.innerHTML = "running"
loop = asyncio.get_event_loop()
task = loop.create_task(main())
loop.run_until_complete(task)
# Enable the button
submit = document.getElementById("submit")
submit.addEventListener("click", pyodide.create_proxy(start_loop))
submit.innerHTML = "Run"