GRAMEX-83 ⁃ Jd InfluxDB (#445)
* ENH: Add MongoDB insert

* ENH: Update (PUT) implemented

* Add JSON path keys and JSON values via .=

* Initial InfluxDB read and write

* Allow better querying for InfluxDB

* Clear differentiation between fields and tags

* Update is the same as insert

* Delete API

* Coerce all field values into floats

* InfluxDB timestamps must be TZ aware

Co-authored-by: S Anand <[email protected]>
jaidevd and sanand0 authored Sep 24, 2021
1 parent 725f756 commit 146b436
Showing 1 changed file with 173 additions and 5 deletions.
gramex/data.py
@@ -1,6 +1,7 @@
'''
Interact with data from the browser
'''
from datetime import datetime
import io
import os
import re
@@ -1498,14 +1499,181 @@ def _insert_mongodb(url, rows, meta=None, database=None, collection=None, **kwargs):
return len(result.inserted_ids)


def _get_influxdb_schema(client, bucket):
imports = 'import "influxdata/influxdb/schema"\n'
meas = client.query_api().query(
imports + f'schema.measurements(bucket: "{bucket}")'
)[0]
tags = client.query_api().query(imports + f'schema.tagKeys(bucket: "{bucket}")')[0]
tags = [r.get_value() for r in tags.records]
tags = [r for r in tags if not r.startswith("_")]
fields = client.query_api().query(
imports + f'schema.fieldKeys(bucket: "{bucket}")'
)[0]
return {
"_measurement": [r.get_value() for r in meas.records],
"_tags": tags,
"_fields": [r.get_value() for r in fields.records]
}
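
# Illustrative sketch of the return shape, assuming a hypothetical "weather"
# bucket with one measurement, one tag and one field:
#
#   with _influxdb_client("influxdb:http://localhost:8086", token, org) as db:
#       _get_influxdb_schema(db, "weather")
#   # => {"_measurement": ["climate"], "_tags": ["city"], "_fields": ["temperature"]}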


_influxdb_op_map = {"<~": "<=", ">~": ">=", "!": "!=", "": "=="}


def _influxdb_offset_limit(controls):
offset = controls.pop("_offset", ["-30d"])[0]
limit = controls.pop("_limit", [False])[0]
offset = f"start: {offset}"
if isinstance(limit, str) and not limit.isdigit():
offset += f", stop: {limit}"
limit = False

return offset, limit
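
# A minimal sketch of how the _offset/_limit controls are interpreted
# (hypothetical values):
#
#   _influxdb_offset_limit({"_offset": ["-7d"], "_limit": ["100"]})
#   # => ("start: -7d", "100")             # numeric limit => tail(n: 100) later
#   _influxdb_offset_limit({"_offset": ["-7d"], "_limit": ["-1d"]})
#   # => ("start: -7d, stop: -1d", False)  # non-numeric limit becomes the range stop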


def _filter_influxdb(url, controls, args, org=None, bucket=None, query=None, **kwargs):
args.pop("bucket")
with _influxdb_client(url, org=org, **kwargs) as db:
schema = _get_influxdb_schema(db, bucket)
cols = schema["_fields"] + schema["_tags"] + schema["_measurement"]
q = db.query_api()
offset, limit = _influxdb_offset_limit(controls)
query = f'from(bucket: "{bucket}")|>range({offset})\n'

filters = []
wheres = []
to_drop = []
for col in controls.pop("_c", []):
if col.startswith("-"):
if col[1:] in schema["_fields"]:
wheres.append(f'r._field != "{col[1:]}"')
else:
to_drop.append(col[1:])
elif col in schema["_fields"]:
col, agg, op = _filter_col(col, cols)
op = _influxdb_op_map.get(op, op)
wheres.append(f'r._field {op} "{col}"')
if len(wheres):
wheres = " or ".join(wheres)
filters.append(f"|> filter(fn: (r) => {wheres})")
for key, vals in args.items():
col, agg, op = _filter_col(key, cols)
op = _influxdb_op_map.get(op, op)
if col in schema["_fields"]:
where = " or ".join([f'r._field == "{col}" and r._value {op} {v}' for v in vals])
else:
where = " or ".join([f'r["{col}"] {op} "{v}"' for v in vals])
filters.append(f"|> filter(fn: (r) => {where})")
query += "\n".join(filters)
if to_drop:
to_drop = ",".join([f'"{k}"' for k in to_drop])
query += f"\n|> drop(columns: [{to_drop}])"

if limit:
query += f"|> tail(n: {limit})\n"

app_log.debug("Running InfluxDB query: \n" + query)

df = q.query_data_frame(query)
return df.drop(["result", "table"], axis=1, errors="ignore")
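
# Under the hypothetical "weather" schema above, a request like
# ?city=London&temperature>~=30&_c=-status would build a Flux query roughly like:
#
#   from(bucket: "weather")|>range(start: -30d)
#   |> filter(fn: (r) => r["city"] == "London")
#   |> filter(fn: (r) => r._field == "temperature" and r._value >= 30)
#   |> drop(columns: ["status"])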


def _delete_influxdb(url, controls, args, org=None, bucket=None, **kwargs):
with _influxdb_client(url, org=org, **kwargs) as db:
schema = _get_influxdb_schema(db, bucket)
start = args.pop('_time>', ['0'])[0]
stop = args.pop('_time<', ['99999999999'])[0]
measurement = args.pop('_measurement', schema['_measurement'])[0]
predicate = f'_measurement="{measurement}"'
tags = [(k, v[0]) for k, v in args.items() if k in schema['_tags']]
if tags:
tag_predicate = " OR ".join([f'{k}="{v}"' for k, v in tags])
predicate += ' AND ' + f'({tag_predicate})'
db.delete_api().delete(start, stop, predicate, bucket, org)
# The InfluxDB Python client returns nothing on DELETE, so return 0 as a placeholder count.
return 0
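
# Sketch of the delete predicate built from hypothetical query arguments
# ?_measurement=climate&city=London&station=A1 (one value per tag is used):
#
#   _measurement="climate" AND (city="London" OR station="A1")
#
# with start/stop taken from _time> and _time<, defaulting to the full range.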


def _influxdb_client(url, token, org, debug=None, timeout=60_000, enable_gzip=False,
default_tags=None, **kwargs):
from influxdb_client import InfluxDBClient

url = re.sub(r"^influxdb:", "", url)
return InfluxDBClient(
url, token, org=org, debug=debug, enable_gzip=enable_gzip, default_tags=default_tags,
timeout=timeout
)
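
# The URL is the InfluxDB endpoint with an "influxdb:" prefix, e.g. (hypothetical):
#
#   db = _influxdb_client("influxdb:http://localhost:8086", token="...", org="my-org")
#   # connects to http://localhost:8086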


def _timestamp_df(df, index_col="_time"):
from tzlocal import get_localzone
now = datetime.now(get_localzone())
if index_col not in df:
df[index_col] = [now] * len(df)
else:
df[index_col] = pd.to_datetime(df[index_col], errors="coerce")
df[index_col].fillna(value=now, inplace=True)
return df.set_index(index_col)
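
# A minimal sketch, assuming a frame with one unparseable timestamp:
#
#   df = pd.DataFrame({"_time": ["2021-09-24", "oops"], "value": [1.0, 2.0]})
#   _timestamp_df(df)
#   # Row 1 keeps 2021-09-24; "oops" coerces to NaT and is replaced with the
#   # current local time. _time becomes the index.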


def _get_ts_points(df, measurement, tags):
    from influxdb_client import Point

    points = []
    tag_df = df[tags]
    # Drop the tag *column names*, leaving only field columns
    fields = df.drop(tags, axis=1)
    for (ts, field), (_, tag) in zip(fields.astype(float).iterrows(), tag_df.iterrows()):
        p = Point(measurement).time(ts)
        for key, val in tag.to_dict().items():
            p.tag(key, val)
        for key, val in field.to_dict().items():
            p.field(key, val)
        points.append(p)
    return points
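
# Illustrative usage with a hypothetical timestamp-indexed DataFrame:
#
#   df = pd.DataFrame({"city": ["London"], "temperature": [21.5]},
#                     index=[datetime.now(get_localzone())])
#   points = _get_ts_points(df, "climate", tags=["city"])
#   # => one Point("climate") with tag city=London and field temperature=21.5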


def _insert_influxdb(url, rows, meta, args, bucket, **kwargs):
measurement = rows.pop("measurement").unique()[0]
tags = (
rows.pop("tags").dropna().drop_duplicates().tolist() if "tags" in rows else []
)
# Ensure that the index is timestamped
rows = _timestamp_df(rows)

# Ensure that all columns except tags are floats
field_columns = [c for c in rows if c not in tags]
rows[field_columns] = rows[field_columns].astype(float)
from influxdb_client.client.write_api import ASYNCHRONOUS, WriteOptions

with _influxdb_client(url, **kwargs) as db:
with db.write_api(
write_options=WriteOptions(
ASYNCHRONOUS, batch_size=50_000, flush_interval=10_000
)
) as client:
client.write(
bucket=bucket,
org=db.org,
record=rows,
data_frame_measurement_name=measurement,
data_frame_tag_columns=tags,
)
meta["inserted"] = [{"id": ix} for ix, _ in rows.iterrows()]
return len(rows)
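
# Sketch of the expected `rows` frame (hypothetical data): a "measurement"
# column naming the measurement, an optional "tags" column listing the tag
# columns, and everything else coerced to float fields:
#
#   rows = pd.DataFrame({
#       "measurement": ["climate", "climate"],
#       "tags": ["city", None],
#       "city": ["London", "Paris"],
#       "temperature": [21.5, 19.0],
#   })
#   _insert_influxdb("influxdb:http://localhost:8086", rows, meta={}, args={},
#                    bucket="weather", token="...", org="my-org")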


# add test case for inserting nested value ?parent.={child:value}
# curl --globoff -I -X POST 'http://127.0.0.1:9988/?x.={"2":3}&y.={"true":true}&Name=abcd'
# add test case for updating nested value ?parent.child.={key:value}
# curl --globoff -I -X PUT 'http://127.0.0.1:9988/?x.2=4&y.true.=[2,3]&Name=abcd'
# add test case for nested document query ?parent.child=value
plugins["mongodb"] = {
"filter": _filter_mongodb,
"delete": _delete_mongodb,
"insert": _insert_mongodb,
"update": _update_mongodb,
}
plugins["influxdb"] = {
"filter": _filter_influxdb,
"delete": _delete_influxdb,
"insert": _insert_influxdb,
"update": _insert_influxdb,
}
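
# "update" maps to _insert_influxdb (see "Update is the same as insert" above):
# InfluxDB overwrites field values when a written point shares the measurement,
# tag set and timestamp of an existing one. Dispatch sketch:
#
#   handler = plugins["influxdb"]["filter"]   # => _filter_influxdb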
