
Commit

API: Add batch route, which allows users to send a list of requests
Ummer Taahir authored and Ummer Taahir committed Jun 27, 2024
1 parent b3347ef commit ea30660
Showing 7 changed files with 713 additions and 63 deletions.
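For context, the new /api/v1/events/batch route accepts a body with a "requests" list; each entry carries the "url", "method", "params" and, for POST sub-requests, a "body" key, as read by batch.py below. A minimal sketch of such a payload — the four top-level keys come from the commit, while the tag names, dates and parameter names inside "params" are illustrative only:

batch_payload = {
    "requests": [
        {
            # GET-style sub-request: all arguments go in "params"
            "url": "/api/v1/events/raw",
            "method": "GET",
            "params": {
                "tag_name": ["EXAMPLE_TAG_1"],
                "start_date": "2024-06-01",
                "end_date": "2024-06-02",
            },
            "body": None,
        },
        {
            # POST-style sub-request: "body" is required and is merged into "params"
            "url": "/api/v1/events/summary",
            "method": "POST",
            "params": {"start_date": "2024-06-01", "end_date": "2024-06-02"},
            "body": {"tag_name": ["EXAMPLE_TAG_2"]},
        },
    ]
}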
1 change: 1 addition & 0 deletions src/api/v1/__init__.py
@@ -30,6 +30,7 @@
circular_average,
circular_standard_deviation,
summary,
batch
)
from src.api.auth.azuread import oauth2_scheme

143 changes: 143 additions & 0 deletions src/api/v1/batch.py
@@ -0,0 +1,143 @@
# Copyright 2022 RTDIP
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
from fastapi import HTTPException, Depends, Body

from src.api.v1.models import (
BaseQueryParams,
BaseHeaders,
BatchBodyParams,
BatchResponse,
LimitOffsetQueryParams,
HTTPError,
)
from src.api.auth.azuread import oauth2_scheme
from src.api.v1.common import (
common_api_setup_tasks,
json_response_batch,
lookup_before_get,
)
from src.api.FastAPIApp import api_v1_router
from concurrent.futures import ThreadPoolExecutor


ROUTE_FUNCTION_MAPPING = {
"/api/v1/events/raw": "raw",
"/api/v1/events/latest": "latest",
"/api/v1/events/resample": "resample",
"/api/v1/events/plot": "plot",
"/api/v1/events/interpolate": "interpolate",
"/api/v1/events/interpolationattime": "interpolationattime",
"/api/v1/events/circularaverage": "circularaverage",
"/api/v1/events/circularstandarddeviation": "circularstandarddeviation",
"/api/v1/events/timeweightedaverage": "timeweightedaverage",
"/api/v1/events/summary": "summary",
"/api/v1/events/metadata": "metadata",
"/api/v1/sql/execute": "execute",
}


async def batch_events_get(
base_query_parameters, base_headers, batch_query_parameters, limit_offset_parameters
):
    try:
(connection, parameters) = common_api_setup_tasks(
base_query_parameters=base_query_parameters,
base_headers=base_headers,
)

# Validate the parameters
parsed_requests = []
for request in batch_query_parameters.requests:

# If required, combine request body and parameters:
parameters = request["params"]
if request["method"] == "POST":
                if request["body"] is None:
raise Exception(
"Incorrectly formatted request provided: All POST requests require a body"
)
parameters = {**parameters, **request["body"]}

# Map the url to a specific function
try:
func = ROUTE_FUNCTION_MAPPING[request["url"]]
            except KeyError:
raise Exception(
"Unsupported url: Only relative base urls are supported. Please provide any parameters in the params key"
)

# Rename tag_name to tag_names, if required
if "tag_name" in parameters.keys():
parameters["tag_names"] = parameters.pop("tag_name")

# Append to array
parsed_requests.append({"func": func, "parameters": parameters})

# Request the data for each concurrently with threadpool
with ThreadPoolExecutor(max_workers=10) as executor:
# Use executor.map to preserve order
results = executor.map(
lambda arguments: lookup_before_get(*arguments),
[
(parsed_request["func"], connection, parsed_request["parameters"])
for parsed_request in parsed_requests
],
)

return json_response_batch(results)

except Exception as e:
logging.error(str(e))
raise HTTPException(status_code=400, detail=str(e))


post_description = """
## Batch
Retrieve timeseries data via a single POST request containing a list of sub-requests, each specifying a route and its parameters.
"""


@api_v1_router.post(
path="/events/batch",
name="Batch POST",
description=post_description,
tags=["Events"],
dependencies=[Depends(oauth2_scheme)],
responses={200: {"model": BatchResponse}, 400: {"model": HTTPError}},
openapi_extra={
"externalDocs": {
"description": "RTDIP Batch Query Documentation",
"url": "https://www.rtdip.io/sdk/code-reference/query/functions/time_series/batch/",
}
},
)
async def batch_post(
base_query_parameters: BaseQueryParams = Depends(),
batch_query_parameters: BatchBodyParams = Body(default=...),
base_headers: BaseHeaders = Depends(),
limit_offset_query_parameters: LimitOffsetQueryParams = Depends(),
):
return await batch_events_get(
base_query_parameters,
base_headers,
batch_query_parameters,
limit_offset_query_parameters,
)
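A sketch of how a client might call the route defined above, assuming the API is served at a hypothetical local address and using the requests library; the response shape follows json_response_batch in common.py below, with one schema/data pair per sub-request, in the order the sub-requests were submitted:

import requests

response = requests.post(
    "http://localhost:8000/api/v1/events/batch",  # hypothetical host and port
    headers={"Authorization": "Bearer <token>"},  # the route depends on oauth2_scheme, so a bearer token is required
    json=batch_payload,  # the illustrative payload sketched earlier
)
response.raise_for_status()

# json_response_batch returns {"data": [{"schema": ..., "data": [...]}, ...]}
for result in response.json()["data"]:
    print(result["schema"])  # table schema for this sub-request's result
    print(result["data"])    # list of row records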
106 changes: 74 additions & 32 deletions src/api/v1/common.py
@@ -21,6 +21,7 @@
import pandas as pd

from fastapi import Response
from fastapi.responses import JSONResponse
from pandas import DataFrame
from pandas.io.json import build_table_schema
from src.sdk.python.rtdip_sdk.connectors import DatabricksSQLConnection
@@ -30,6 +31,7 @@
from src.sdk.python.rtdip_sdk.connectors import TURBODBCSQLConnection
from src.api.auth import azuread
from .models import BaseHeaders, FieldSchema, LimitOffsetQueryParams, PaginationRow
from decimal import Decimal


def common_api_setup_tasks( # NOSONAR
@@ -183,32 +185,67 @@ def json_response(
media_type="application/json",
)


def json_response_batch(data_list: List[DataFrame]) -> Response:

# Function to parse dataframe into dictionary along with schema
def get_as_dict(data):

def convert_value(x):
if isinstance(x, pd.Timestamp):
return x.isoformat(timespec="nanoseconds")
elif isinstance(x, pd.Timedelta):
return x.isoformat()
elif isinstance(x, Decimal):
return float(x)
return x

data_parsed = data.map(convert_value)
schema = build_table_schema(data_parsed, index=False, primary_key=False)
data_dict = data_parsed.to_dict(orient="records")

return {"schema": schema, "data": data_dict}

# Parse each dataframe into a dictionary containing the schema and the data as dict
dict_content = {"data": [get_as_dict(data) for data in data_list]}

return JSONResponse(content=dict_content)


def lookup_before_get(
func_name: str, connection: DatabricksSQLConnection, parameters: Dict
):

# query mapping endpoint for tablenames - returns tags as array under each table key
tag_table_mapping = query_mapping_endpoint(
tags=parameters["tag_names"],
mapping_endpoint=os.getenv("DATABRICKS_SERVING_ENDPOINT"),
connection=connection,
)

# create list of parameter dicts for each table
request_list = []
for table in tag_table_mapping:
params = parameters.copy()
params["tag_names"] = tag_table_mapping[table]
params.update(
split_table_name(table)
) # Adds business_unit, asset, data_security_level, data_type
request = {"type": func_name, "parameters_dict": params}
request_list.append(request)

# run function with each parameters concurrently
results = batch.get(connection, request_list)

# Append/concat results as required
data = concatenate_dfs_and_order(
dfs_arr=results, pivot=False, tags=parameters["tag_names"]
)

return data

@@ -217,84 +254,89 @@ def query_mapping_endpoint(tags: list, mapping_endpoint: str, connection: Dict):

# Form header dict with token from connection
token = swap_for_databricks_token(connection.access_token)
headers = {"Authorization": f"Bearer {token}", "Content-Type": "application/json"}

# Create body of request
data = {"dataframe_records": [{"TagName": tag} for tag in tags]}
data_json = json.dumps(data, allow_nan=True)

# Make request to mapping endpoint
response = requests.post(headers=headers, url=mapping_endpoint, data=data_json)
if response.status_code != 200:
raise Exception(
f"Request failed with status {response.status_code}, {response.text}"
)
result = response.json()

# Map tags to tables, where all tags belonging to each table are stored in an array
tag_table_mapping = {}
for row in result["outputs"]:
# Check results are returned
        if any(row[x] is None for x in ["CatalogName", "SchemaName", "DataTable"]):
            raise Exception(
                "One or more tags do not have tables associated with them, the data belongs to a confidential table, or you do not have access. If the tag belongs to a confidential table and you do have access, please supply the business_unit, asset, data_security_level and data_type"
            )

# Construct full tablename from output
table_name = f"""{row["CatalogName"]}.{row["SchemaName"]}.{row["DataTable"]}"""

# Store table names along with tags in dict (all tags that share table under same key)
if table_name not in tag_table_mapping:
tag_table_mapping[table_name] = []

tag_table_mapping[table_name].append(row["TagName"])

return tag_table_mapping
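# Illustrative shape of the mapping returned above (catalog, schema, table and tag
# names are hypothetical): all tags resolving to the same table share one key, e.g.
#   {
#       "acme.sensors.siteA_restricted_events_float": ["TAG_1", "TAG_2"],
#       "acme.sensors.siteB_restricted_events_float": ["TAG_3"],
#   }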


def split_table_name(str):

try:
        # Retrieve parts by splitting the string
        parts = str.split(".")
business_unit = parts[0]
schema = parts[1]
asset_security_type = parts[2].split("_")

        # Check that the name is in the expected format
if schema != "sensors" and ("events" not in str or "metadata" not in str):
raise Exception()

# Get the asset, data security level and type
asset = asset_security_type[0].lower()
data_security_level = asset_security_type[1].lower()
data_type = asset_security_type[
len(asset_security_type) - 1
].lower() # i.e. the final part

# Return the formatted object
        return {
            "business_unit": business_unit,
            "asset": asset,
            "data_security_level": data_security_level,
            "data_type": data_type,
}
except Exception as e:
        raise Exception(
            "Unsupported table name format supplied. Please use the format 'businessunit.schema.asset_datasecuritylevel_events_datatype'"
        )
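# Illustrative example of the parsing performed by split_table_name above
# (hypothetical table name):
#   split_table_name("acme.sensors.siteA_restricted_events_float")
#   -> {"business_unit": "acme", "asset": "sitea",
#       "data_security_level": "restricted", "data_type": "float"}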


def concatenate_dfs_and_order(dfs_arr: List[DataFrame], pivot: bool, tags: list):
if pivot:
# If pivoted, then must add columns horizontally
concat_df = pd.concat(dfs_arr, axis=1, ignore_index=False)
concat_df = concat_df.loc[:, ~concat_df.columns.duplicated()]

# reorder columns so that they match the order of the tags provided
time_col = concat_df.columns.to_list()[0]
cols = [time_col, *tags]
concat_df = concat_df[cols]

else:
# Otherwise, can concat vertically
concat_df = pd.concat(dfs_arr, axis=0, ignore_index=True)

return concat_df

