Skip to content

Commit

Permalink
ENH: Add support beqs (#26)
Browse files Browse the repository at this point in the history
Add support for beqs

---------

Co-authored-by: Matthew <[email protected]>
  • Loading branch information
avantgardeam and matthewgilbert authored Jun 21, 2023
1 parent 1014004 commit cd3c8f2
Showing 1 changed file with 86 additions and 5 deletions.
91 changes: 86 additions & 5 deletions src/blp/blp.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,11 +38,7 @@
_TOKEN_FAILURE = blpapi.Name("TokenGenerationFailure")
_SESSION_TERMINATED = blpapi.Name("SessionTerminated")
_SESSION_DOWN = blpapi.Name("SessionConnectionDown")
_MARKET_DATA_EVENTS = [
blpapi.Name("MarketDataEvents"),
blpapi.Name("MarketBarStart"),
blpapi.Name("MarketBarUpdate")
]
_MARKET_DATA_EVENTS = [blpapi.Name("MarketDataEvents"), blpapi.Name("MarketBarStart"), blpapi.Name("MarketBarUpdate")]

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -489,6 +485,7 @@ class BlpQuery(BlpSession):
"ReferenceDataRequest": "//blp/refdata",
"IntradayTickRequest": "//blp/refdata",
"IntradayBarRequest": "//blp/refdata",
"BeqsRequest": "//blp/refdata",
"FieldInfoRequest": "//blp/apiflds",
"FieldListRequest": "//blp/apiflds",
"instrumentListRequest": "//blp/instruments",
Expand Down Expand Up @@ -803,6 +800,44 @@ def collect_to_bdh(self, responses: Iterable) -> Dict[str, pandas.DataFrame]:

return dfs

def beqs(
self,
screen_name: str,
screen_type: str = "PRIVATE",
overrides: Optional[Sequence] = None,
options: Optional[Dict] = None,
) -> pandas.DataFrame:
"""Bloomberg equity screening request.
Args:
screen_name: name of the screen
screen_type: type of screen, either 'PRIVATE' or 'GLOBAL'
overrides: List of tuples containing the field to override and its value
options: key value pairs to to set in request
Returns: A pandas.DataFrame with columns ['security', eqs_data[0], ...]
"""
query = create_eqs_query(screen_name, screen_type, overrides, options)
df = self.query(query, self.parser, self.collect_to_beqs)
columns = ["security"] + [col for col in df.columns if col != "security"]
df = (
df.sort_values("security").reset_index(drop=True).loc[:, columns]
) # Ticker = security
return df

def collect_to_beqs(self, responses: Iterable) -> pandas.DataFrame:
"""Collector for beqs()."""
rows = []
fields = {"security"}
for response in responses:
data = response["data"]
# possible some fields are missing for different securities
fields = fields.union(set(response["fields"]))
data["security"] = response["security"]
rows.append(data)
df = pandas.DataFrame(rows)
return self.cast_columns(df, fields)

def bdp(
self,
securities: Sequence[str],
Expand Down Expand Up @@ -1070,6 +1105,7 @@ def _validate_response_type(response, _):
"ReferenceDataResponse",
"HistoricalDataResponse",
"IntradayBarResponse",
"BeqsResponse",
"IntradayTickResponse",
"fieldResponse",
"InstrumentListResponse",
Expand All @@ -1093,6 +1129,7 @@ def _process_field_exception(response, _):
if rtype in (
"IntradayBarResponse",
"IntradayTickResponse",
"BeqsResponse",
"fieldResponse",
"InstrumentListResponse",
"GetFillsResponse",
Expand Down Expand Up @@ -1135,6 +1172,7 @@ def _validate_security_error(response, _):
if rtype in (
"IntradayBarResponse",
"IntradayTickResponse",
"BeqsResponse",
"fieldResponse",
"InstrumentListResponse",
"GetFillsResponse",
Expand All @@ -1155,6 +1193,7 @@ def _warn_security_error(response, _):
if rtype in (
"IntradayBarResponse",
"IntradayTickResponse",
"BeqsResponse",
"fieldResponse",
"InstrumentListResponse",
"GetFillsResponse",
Expand Down Expand Up @@ -1293,6 +1332,8 @@ def __call__(self, response, request_data):
sec_data_parser = self._parse_bar_security_data
elif rtype == "IntradayTickResponse":
sec_data_parser = self._parse_tick_security_data
elif rtype == "BeqsResponse":
sec_data_parser = self._parse_equity_screening_data
elif rtype == "fieldResponse":
sec_data_parser = self._parse_field_info_data
elif rtype == "InstrumentListResponse":
Expand All @@ -1305,6 +1346,7 @@ def __call__(self, response, request_data):
"HistoricalDataResponse",
"IntradayBarResponse",
"IntradayTickResponse",
"BeqsResponse",
"fieldResponse",
"InstrumentListResponse",
"GetFillsResponse",
Expand Down Expand Up @@ -1388,6 +1430,20 @@ def _parse_tick_security_data(response, request_data):
}
yield result

@staticmethod
def _parse_equity_screening_data(response, _):
rtype = list(response["message"]["element"].keys())[0]
response_data = response["message"]["element"][rtype]["data"]
fields = list(response_data["fieldDisplayUnits"]["fieldDisplayUnits"].keys())

for sec_data in response_data["securityData"]:
result = {
"security": sec_data["securityData"]["security"],
"fields": fields,
"data": sec_data["securityData"]["fieldData"]["fieldData"],
}
yield result

@staticmethod
def _parse_field_info_data(response, request_data):
rtype = "fieldResponse"
Expand Down Expand Up @@ -1473,6 +1529,31 @@ def create_query(request_type: str, values: Dict, overrides: Optional[Sequence]
return request_dict


def create_eqs_query(
screen_name: str,
screen_type: str = "PRIVATE",
overrides: Optional[Sequence] = None,
options: Optional[Dict] = None,
) -> Dict:
"""Create a BeqsRequest dictionary request.
Args:
screen_name: name of the screen
screen_type: type of screen; either PRIVATE or GLOBAL
overrides: List of tuples containing the field to override and its value
options: key value pairs to to set in request
Returns: A dictionary representation of a blpapi.Request
"""
values = {
"screenName": screen_name,
"screenType": screen_type,
}
if options:
values.update(options)
return create_query("BeqsRequest", values, overrides)


def create_historical_query(
securities: Union[str, Sequence[str]],
fields: Union[str, Sequence[str]],
Expand Down

0 comments on commit cd3c8f2

Please sign in to comment.