Skip to content

Commit

Permalink
ENH: Add support to BQL queries (#28)
Browse files Browse the repository at this point in the history
Add support BQL

---------

Authored-by: Pedro Teles <[email protected]>
  • Loading branch information
avantgardeam authored Jul 26, 2023
1 parent cd3c8f2 commit efb6f15
Showing 1 changed file with 134 additions and 4 deletions.
138 changes: 134 additions & 4 deletions src/blp/blp.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import threading
from numbers import Number
from typing import Any, Callable, Dict, Generator, Iterable, List, Optional, Sequence, Union
import json

import blpapi
import pandas
Expand Down Expand Up @@ -490,6 +491,7 @@ class BlpQuery(BlpSession):
"FieldListRequest": "//blp/apiflds",
"instrumentListRequest": "//blp/instruments",
"GetFills": "//blp/emsx.history",
"sendQuery": "//blp/bqlsvc",
}

def __init__(
Expand Down Expand Up @@ -820,9 +822,7 @@ def beqs(
query = create_eqs_query(screen_name, screen_type, overrides, options)
df = self.query(query, self.parser, self.collect_to_beqs)
columns = ["security"] + [col for col in df.columns if col != "security"]
df = (
df.sort_values("security").reset_index(drop=True).loc[:, columns]
) # Ticker = security
df = df.sort_values("security").reset_index(drop=True).loc[:, columns] # Ticker = security
return df

def collect_to_beqs(self, responses: Iterable) -> pandas.DataFrame:
Expand All @@ -838,6 +838,81 @@ def collect_to_beqs(self, responses: Iterable) -> pandas.DataFrame:
df = pandas.DataFrame(rows)
return self.cast_columns(df, fields)

def bql(
self,
expression: str,
overrides: Optional[Sequence] = None,
options: Optional[Dict] = None,
) -> pandas.DataFrame:
"""Bloomberg query language request.
Args:
expression: BQL expression
overrides: List of tuples containing the field to override and its value
options: key value pairs to to set in request
Returns: A pandas.DataFrame with columns ["security", "field", "secondary_name", "secondary_value", "value"]
Examples:
>>> bquery = blp.BlpQuery().start()
>>> bquery.bql(expression="get(px_last()) for(['AAPL US Equity', 'IBM US Equity'])")
The resulting DataFrame will look like this:
security field secondary_name secondary_value value
0 AAPL US Equity px_last() CURRENCY USD 192.755005
1 IBM US Equity px_last() CURRENCY USD 139.289993
2 AAPL US Equity px_last() DATE 2023-07-24T00:00:00Z 192.755005
3 IBM US Equity px_last() DATE 2023-07-24T00:00:00Z 139.289993
"""
query = create_bql_query(expression, overrides, options)

bql_parser = BlpParser(
processor_steps=[
BlpParser._clean_bql_response,
BlpParser._validate_event,
BlpParser._validate_response_error,
]
)

df = self.query(query, bql_parser, self.collect_to_bql)

df = df[["id", "field", "secondary_name", "secondary_value", "value"]]

df = df.rename(columns={"id": "security"})

return df

def collect_to_bql(self, responses: Iterable) -> pandas.DataFrame:
"""Collector for bql()."""
data = []
fields = {"secondary_name", "secondary_value", "field", "id", "value"}
for field in responses:
field_df = pandas.DataFrame(field)

id_vars = ["field", "id", "value"]
secondary_columns = field_df.columns.difference(id_vars)

if len(secondary_columns) == 0:
# If we dont have any secondary columns, we just add empty columns
field_df["secondary_name"] = None
field_df["secondary_value"] = None
else:
# If we have multiple secondary columns, we need to melt the dataframe
field_df = field_df.melt(
id_vars=id_vars,
value_vars=field_df.columns.difference(id_vars),
var_name="secondary_name",
value_name="secondary_value",
)

column_order = ["secondary_name", "secondary_value", "field", "id", "value"]
field_df = field_df[column_order]

data.append(field_df)

df = pandas.concat(data)
return self.cast_columns(df, fields)

def bdp(
self,
securities: Sequence[str],
Expand Down Expand Up @@ -1073,7 +1148,6 @@ class BlpParser:
"""

def __init__(self, processor_steps: Optional[Sequence] = None, raise_security_errors: bool = True):

if processor_steps is None and raise_security_errors:
processor_steps = [
self._validate_event,
Expand All @@ -1092,6 +1166,22 @@ def __init__(self, processor_steps: Optional[Sequence] = None, raise_security_er
]
self._processor_steps = processor_steps

@staticmethod
def _clean_bql_response(response, _):
"""
The purpose of this method is to standardize a BQL (Bloomberg Query Language) response.
BQL responses differ from standard responses, hence the need for cleanup to make them more consistent.
"""
aux = json.loads(response["message"]["element"])

if aux["responseExceptions"]:
aux["responseError"] = aux["responseExceptions"][0]["message"]
del aux["responseExceptions"]

response["message"]["element"] = {"BQLResponse": aux}

return response

@staticmethod
def _validate_event(response, _):
if response["eventType"] not in (blpapi.Event.PARTIAL_RESPONSE, blpapi.Event.RESPONSE):
Expand Down Expand Up @@ -1334,6 +1424,8 @@ def __call__(self, response, request_data):
sec_data_parser = self._parse_tick_security_data
elif rtype == "BeqsResponse":
sec_data_parser = self._parse_equity_screening_data
elif rtype == "BQLResponse":
sec_data_parser = self._parse_bql_data
elif rtype == "fieldResponse":
sec_data_parser = self._parse_field_info_data
elif rtype == "InstrumentListResponse":
Expand All @@ -1347,6 +1439,7 @@ def __call__(self, response, request_data):
"IntradayBarResponse",
"IntradayTickResponse",
"BeqsResponse",
"BQLResponse",
"fieldResponse",
"InstrumentListResponse",
"GetFillsResponse",
Expand Down Expand Up @@ -1444,6 +1537,25 @@ def _parse_equity_screening_data(response, _):
}
yield result

@staticmethod
def _parse_bql_data(response, _):
rtype = list(response["message"]["element"].keys())[0]
response_data = response["message"]["element"][rtype]["results"]

for field in response_data.values():
# ID column may be a security ticker
field_data = {
"field": field["name"],
"id": field["idColumn"]["values"],
"value": field["valuesColumn"]["values"],
}

# Secondary columns may be DATE or CURRENCY, for example
for secondary_column in field["secondaryColumns"]:
field_data[secondary_column["name"]] = secondary_column["values"]

yield field_data

@staticmethod
def _parse_field_info_data(response, request_data):
rtype = "fieldResponse"
Expand Down Expand Up @@ -1529,6 +1641,24 @@ def create_query(request_type: str, values: Dict, overrides: Optional[Sequence]
return request_dict


def create_bql_query(
expression: str,
overrides: Optional[Sequence] = None,
options: Optional[Dict] = None,
) -> Dict:
"""Create a sendQuery dictionary request.
Args:
expression: BQL query string
Returns: A dictionary representation of a blpapi.Request
"""
values = {"expression": expression}
if options:
values.update(options)
return create_query("sendQuery", values, overrides)


def create_eqs_query(
screen_name: str,
screen_type: str = "PRIVATE",
Expand Down

0 comments on commit efb6f15

Please sign in to comment.