diff --git a/src/blp/blp.py b/src/blp/blp.py index e4e8e45..19f63c7 100644 --- a/src/blp/blp.py +++ b/src/blp/blp.py @@ -5,6 +5,7 @@ import threading from numbers import Number from typing import Any, Callable, Dict, Generator, Iterable, List, Optional, Sequence, Union +import json import blpapi import pandas @@ -490,6 +491,7 @@ class BlpQuery(BlpSession): "FieldListRequest": "//blp/apiflds", "instrumentListRequest": "//blp/instruments", "GetFills": "//blp/emsx.history", + "sendQuery": "//blp/bqlsvc", } def __init__( @@ -820,9 +822,7 @@ def beqs( query = create_eqs_query(screen_name, screen_type, overrides, options) df = self.query(query, self.parser, self.collect_to_beqs) columns = ["security"] + [col for col in df.columns if col != "security"] - df = ( - df.sort_values("security").reset_index(drop=True).loc[:, columns] - ) # Ticker = security + df = df.sort_values("security").reset_index(drop=True).loc[:, columns] # Ticker = security return df def collect_to_beqs(self, responses: Iterable) -> pandas.DataFrame: @@ -838,6 +838,81 @@ def collect_to_beqs(self, responses: Iterable) -> pandas.DataFrame: df = pandas.DataFrame(rows) return self.cast_columns(df, fields) + def bql( + self, + expression: str, + overrides: Optional[Sequence] = None, + options: Optional[Dict] = None, + ) -> pandas.DataFrame: + """Bloomberg query language request. + + Args: + expression: BQL expression + overrides: List of tuples containing the field to override and its value + options: key value pairs to to set in request + + Returns: A pandas.DataFrame with columns ["security", "field", "secondary_name", "secondary_value", "value"] + + Examples: + >>> bquery = blp.BlpQuery().start() + >>> bquery.bql(expression="get(px_last()) for(['AAPL US Equity', 'IBM US Equity'])") + + The resulting DataFrame will look like this: + security field secondary_name secondary_value value + 0 AAPL US Equity px_last() CURRENCY USD 192.755005 + 1 IBM US Equity px_last() CURRENCY USD 139.289993 + 2 AAPL US Equity px_last() DATE 2023-07-24T00:00:00Z 192.755005 + 3 IBM US Equity px_last() DATE 2023-07-24T00:00:00Z 139.289993 + """ + query = create_bql_query(expression, overrides, options) + + bql_parser = BlpParser( + processor_steps=[ + BlpParser._clean_bql_response, + BlpParser._validate_event, + BlpParser._validate_response_error, + ] + ) + + df = self.query(query, bql_parser, self.collect_to_bql) + + df = df[["id", "field", "secondary_name", "secondary_value", "value"]] + + df = df.rename(columns={"id": "security"}) + + return df + + def collect_to_bql(self, responses: Iterable) -> pandas.DataFrame: + """Collector for bql().""" + data = [] + fields = {"secondary_name", "secondary_value", "field", "id", "value"} + for field in responses: + field_df = pandas.DataFrame(field) + + id_vars = ["field", "id", "value"] + secondary_columns = field_df.columns.difference(id_vars) + + if len(secondary_columns) == 0: + # If we dont have any secondary columns, we just add empty columns + field_df["secondary_name"] = None + field_df["secondary_value"] = None + else: + # If we have multiple secondary columns, we need to melt the dataframe + field_df = field_df.melt( + id_vars=id_vars, + value_vars=field_df.columns.difference(id_vars), + var_name="secondary_name", + value_name="secondary_value", + ) + + column_order = ["secondary_name", "secondary_value", "field", "id", "value"] + field_df = field_df[column_order] + + data.append(field_df) + + df = pandas.concat(data) + return self.cast_columns(df, fields) + def bdp( self, securities: Sequence[str], @@ -1073,7 +1148,6 @@ class BlpParser: """ def __init__(self, processor_steps: Optional[Sequence] = None, raise_security_errors: bool = True): - if processor_steps is None and raise_security_errors: processor_steps = [ self._validate_event, @@ -1092,6 +1166,22 @@ def __init__(self, processor_steps: Optional[Sequence] = None, raise_security_er ] self._processor_steps = processor_steps + @staticmethod + def _clean_bql_response(response, _): + """ + The purpose of this method is to standardize a BQL (Bloomberg Query Language) response. + BQL responses differ from standard responses, hence the need for cleanup to make them more consistent. + """ + aux = json.loads(response["message"]["element"]) + + if aux["responseExceptions"]: + aux["responseError"] = aux["responseExceptions"][0]["message"] + del aux["responseExceptions"] + + response["message"]["element"] = {"BQLResponse": aux} + + return response + @staticmethod def _validate_event(response, _): if response["eventType"] not in (blpapi.Event.PARTIAL_RESPONSE, blpapi.Event.RESPONSE): @@ -1334,6 +1424,8 @@ def __call__(self, response, request_data): sec_data_parser = self._parse_tick_security_data elif rtype == "BeqsResponse": sec_data_parser = self._parse_equity_screening_data + elif rtype == "BQLResponse": + sec_data_parser = self._parse_bql_data elif rtype == "fieldResponse": sec_data_parser = self._parse_field_info_data elif rtype == "InstrumentListResponse": @@ -1347,6 +1439,7 @@ def __call__(self, response, request_data): "IntradayBarResponse", "IntradayTickResponse", "BeqsResponse", + "BQLResponse", "fieldResponse", "InstrumentListResponse", "GetFillsResponse", @@ -1444,6 +1537,25 @@ def _parse_equity_screening_data(response, _): } yield result + @staticmethod + def _parse_bql_data(response, _): + rtype = list(response["message"]["element"].keys())[0] + response_data = response["message"]["element"][rtype]["results"] + + for field in response_data.values(): + # ID column may be a security ticker + field_data = { + "field": field["name"], + "id": field["idColumn"]["values"], + "value": field["valuesColumn"]["values"], + } + + # Secondary columns may be DATE or CURRENCY, for example + for secondary_column in field["secondaryColumns"]: + field_data[secondary_column["name"]] = secondary_column["values"] + + yield field_data + @staticmethod def _parse_field_info_data(response, request_data): rtype = "fieldResponse" @@ -1529,6 +1641,24 @@ def create_query(request_type: str, values: Dict, overrides: Optional[Sequence] return request_dict +def create_bql_query( + expression: str, + overrides: Optional[Sequence] = None, + options: Optional[Dict] = None, +) -> Dict: + """Create a sendQuery dictionary request. + + Args: + expression: BQL query string + + Returns: A dictionary representation of a blpapi.Request + """ + values = {"expression": expression} + if options: + values.update(options) + return create_query("sendQuery", values, overrides) + + def create_eqs_query( screen_name: str, screen_type: str = "PRIVATE",