Skip to content

Commit

Permalink
[BugFix] Fix Parsing Error In SEC Form13F (#6961)
Browse files Browse the repository at this point in the history
* fix parsing error

* remove commented out item
  • Loading branch information
deeleeramone authored Nov 25, 2024
1 parent 23c8780 commit 99b7614
Show file tree
Hide file tree
Showing 4 changed files with 111 additions and 79 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,12 @@
from datetime import date as dateType
from typing import Literal, Optional

from pydantic import Field, field_validator

from openbb_core.provider.abstract.data import Data
from openbb_core.provider.abstract.query_params import QueryParams
from openbb_core.provider.utils.descriptions import (
QUERY_DESCRIPTIONS,
)
from pydantic import Field, field_validator


class Form13FHRQueryParams(QueryParams):
Expand Down Expand Up @@ -60,7 +59,7 @@ class Form13FHRData(Data):
)
security_type: Optional[Literal["SH", "PRN"]] = Field(
default=None,
description="The total number of shares of the class of security"
description="Whether the principal amount represents the number of shares"
+ " or the principal amount of such class."
+ " 'SH' for shares. 'PRN' for principal amount."
+ " Convertible debt securities are reported as 'PRN'.",
Expand All @@ -70,24 +69,29 @@ class Form13FHRData(Data):
description="Defined when the holdings being reported are put or call options."
+ " Only long positions are reported.",
)
investment_discretion: Optional[str] = Field(
default=None,
description="The investment discretion held by the Manager."
+ " Sole, shared-defined (DFN), or shared-other (OTR).",
)
voting_authority_sole: Optional[int] = Field(
default=None,
description="The number of shares for which the Manager"
+ " exercises sole voting authority (none).",
+ " exercises sole voting authority.",
)
voting_authority_shared: Optional[int] = Field(
default=None,
description="The number of shares for which the Manager"
+ " exercises a defined shared voting authority (none).",
+ " exercises a defined shared voting authority.",
)
voting_authority_other: Optional[int] = Field(
voting_authority_none: Optional[int] = Field(
default=None,
description="The number of shares for which the Manager"
+ " exercises other shared voting authority (none).",
+ " exercises no voting authority.",
)
principal_amount: int = Field(
description="The total number of shares of the class of security"
+ " or the principal amount of such class."
+ " or the principal amount of such class. Defined by the 'security_type'."
+ " Only long positions are reported"
)
value: int = Field(
Expand Down
74 changes: 43 additions & 31 deletions openbb_platform/providers/sec/openbb_sec/models/form_13FHR.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

# pylint: disable =unused-argument

from typing import Any, Dict, List, Optional
from typing import Any, Optional

from openbb_core.provider.abstract.fetcher import Fetcher
from openbb_core.provider.standard_models.form_13FHR import (
Expand Down Expand Up @@ -35,61 +35,73 @@ class SecForm13FHRData(Form13FHRData):
)


class SecForm13FHRFetcher(Fetcher[SecForm13FHRQueryParams, list[SecForm13FHRData]]):
    """SEC Form 13F-HR Fetcher."""

    @staticmethod
    def transform_query(params: dict[str, Any]) -> SecForm13FHRQueryParams:
        """Transform the query.

        Builds the provider query model directly from the raw parameter mapping.
        """
        return SecForm13FHRQueryParams(**params)

    @staticmethod
    async def aextract_data(
        query: SecForm13FHRQueryParams,
        credentials: Optional[dict[str, str]],
        **kwargs: Any,
    ) -> list[dict]:
        """Return the raw data from the SEC endpoint.

        Looks up the candidate 13F-HR filings for ``query.symbol``, selects the
        filing(s) to download, parses each one and returns the combined rows.

        Filing selection:
        - when ``query.date`` is set, only the filing indexed under that date's
          calendar quarter end is used (``query.limit`` is ignored);
        - otherwise the first ``query.limit`` candidates are used.

        Raises
        ------
        EmptyDataError
            If no rows were parsed from the selected filings (including the
            case where no filing was selected at all).
        """
        # pylint: disable=import-outside-toplevel
        import asyncio  # noqa
        from openbb_core.provider.utils.errors import EmptyDataError
        from openbb_sec.utils import parse_13f

        symbol = query.symbol
        # An all-digit identifier is passed as a CIK, anything else as a symbol.
        lookup = {"cik": symbol} if symbol.isnumeric() else {"symbol": symbol}
        filings = await parse_13f.get_13f_candidates(**lookup)

        urls: list = []
        if query.date is not None:
            date = parse_13f.date_to_quarter_end(query.date.strftime("%Y-%m-%d"))
            # The index is cast to str so it can be matched against the
            # "YYYY-MM-DD" quarter-end string.
            filings.index = filings.index.astype(str)
            # NOTE(review): `.loc` raises KeyError when no candidate is indexed
            # under this quarter end - confirm whether that should surface as
            # EmptyDataError instead.
            urls = [filings.loc[date]]
        elif query.limit:
            urls = filings.iloc[: query.limit].to_list()

        # Errors raised while parsing propagate unchanged; no wrapper is needed
        # to re-raise them. Using gather's return values (which follow the
        # order of `urls`) keeps the row order independent of which download
        # happens to finish first.
        parsed = await asyncio.gather(*(parse_13f.parse_13f_hr(url) for url in urls))
        results: list = [row for filing_rows in parsed for row in filing_rows]

        if not results:
            raise EmptyDataError("No data was returned with the given parameters.")

        return results

    @staticmethod
    def transform_data(
        query: SecForm13FHRQueryParams,
        data: list[dict],
        **kwargs: Any,
    ) -> list[SecForm13FHRData]:
        """Transform the data.

        Validates every row into a ``SecForm13FHRData`` model. Rows are ordered
        by ``period_ending`` and then ``weight``, both descending.
        """
        ordered = sorted(
            data,
            key=lambda d: (d["period_ending"], d["weight"]),
            reverse=True,
        )
        return [SecForm13FHRData.model_validate(d) for d in ordered]
4 changes: 4 additions & 0 deletions openbb_platform/providers/sec/openbb_sec/utils/helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ async def get_all_companies(use_cache: bool = True) -> DataFrame:
cache=SQLiteBackend(cache_dir, expire_after=3600 * 24 * 2)
) as session:
try:
await session.delete_expired_responses()
response = await amake_request(url, headers=SEC_HEADERS, session=session) # type: ignore
finally:
await session.close()
Expand Down Expand Up @@ -71,6 +72,7 @@ async def callback(response, session):
cache=SQLiteBackend(cache_dir, expire_after=3600 * 24 * 2)
) as session:
try:
await session.delete_expired_responses()
response = await amake_request(url, headers=SEC_HEADERS, session=session, response_callback=callback) # type: ignore
finally:
await session.close()
Expand Down Expand Up @@ -104,6 +106,7 @@ async def get_mf_and_etf_map(use_cache: bool = True) -> DataFrame:
cache=SQLiteBackend(cache_dir, expire_after=3600 * 24 * 2)
) as session:
try:
await session.delete_expired_responses()
response = await amake_request(url, headers=SEC_HEADERS, session=session, response_callback=sec_callback) # type: ignore
finally:
await session.close()
Expand Down Expand Up @@ -329,6 +332,7 @@ async def get_nport_candidates(symbol: str, use_cache: bool = True) -> List[Dict
cache_dir = f"{get_user_cache_directory()}/http/sec_etf"
async with CachedSession(cache=SQLiteBackend(cache_dir)) as session:
try:
await session.delete_expired_responses()
response = await amake_request(url, session=session, headers=HEADERS, response_callback=sec_callback) # type: ignore
finally:
await session.close()
Expand Down
92 changes: 52 additions & 40 deletions openbb_platform/providers/sec/openbb_sec/utils/parse_13f.py
Original file line number Diff line number Diff line change
@@ -1,27 +1,31 @@
"""Utility functions for parsing SEC Form 13F-HR."""

from typing import Any, Dict, Optional
from typing import Any, Optional

from openbb_core.app.model.abstract.error import OpenBBError
from openbb_core.provider.utils.helpers import amake_request
from openbb_sec.models.company_filings import SecCompanyFilingsFetcher
from openbb_sec.utils.definitions import HEADERS
from pandas import DataFrame, offsets, to_datetime


def date_to_quarter_end(date: str) -> str:
    """Convert a date to the end of the calendar quarter.

    Parameters
    ----------
    date : str
        A date string that ``pandas.to_datetime`` can parse, e.g. "2024-02-15".

    Returns
    -------
    str
        The last day of the calendar quarter containing ``date``, formatted
        as "YYYY-MM-DD". A date that already is a quarter end maps to itself.
    """
    # pylint: disable=import-outside-toplevel
    from pandas import to_datetime
    from pandas.tseries.offsets import QuarterEnd

    # Snap to the first day of the quarter, then roll forward to its last day.
    quarter_start = to_datetime(date).to_period("Q").to_timestamp("D")
    return (quarter_start + QuarterEnd()).date().strftime("%Y-%m-%d")


async def get_13f_candidates(symbol: Optional[str] = None, cik: Optional[str] = None):
"""Get the 13F-HR filings for a given symbol or CIK."""
# pylint: disable=import-outside-toplevel
from openbb_sec.models.company_filings import SecCompanyFilingsFetcher
from pandas import DataFrame, to_datetime

fetcher = SecCompanyFilingsFetcher()
params: Dict[str, Any] = {}
params: dict[str, Any] = {}
if cik is not None:
params["cik"] = str(cik)
if symbol is not None:
Expand Down Expand Up @@ -56,24 +60,31 @@ async def complete_submission_callback(response, _):

async def get_complete_submission(url: str):
    """Get the Complete Submission TXT file string from the SEC API.

    Requests ``url`` with the SEC request headers and hands the response to
    ``complete_submission_callback``.

    Parameters
    ----------
    url : str
        URL of the Complete Submission TXT file.

    Returns
    -------
    Whatever ``amake_request`` returns for this request.
    NOTE(review): presumably the filing text produced by the callback -
    confirm against ``complete_submission_callback``.
    """
    # pylint: disable=import-outside-toplevel
    from openbb_core.provider.utils.helpers import amake_request
    from openbb_sec.utils.definitions import HEADERS

    return await amake_request(
        url, headers=HEADERS, response_callback=complete_submission_callback
    )


def parse_header(filing_str: str) -> Dict:
def parse_header(filing_str: str) -> dict:
"""Parse the header of a Complete Submission TXT file string."""
# pylint: disable=import-outside-toplevel
import xmltodict
from bs4 import BeautifulSoup

header_dict = {}
header_dict: dict = {}
soup = (
filing_str
if filing_str.__class__.__name__ == "BeautifulSoup"
else BeautifulSoup(filing_str, "xml")
)
try:
soup = BeautifulSoup(filing_str, "lxml-xml")
header_xml = soup.find("headerData")
header_dict = xmltodict.parse(str(header_xml))["headerData"]
except KeyError:
soup = BeautifulSoup(filing_str, features="lxml")
header_xml = soup.find("type")
header_dict = xmltodict.parse(str(header_xml)).get("type")
if header_dict:
Expand Down Expand Up @@ -116,20 +127,17 @@ async def parse_13f_hr(filing: str):
# pylint: disable=import-outside-toplevel
import xmltodict
from bs4 import BeautifulSoup

data = DataFrame()
from numpy import nan
from pandas import DataFrame, to_datetime

# Check if the input string is a URL
if filing.startswith("https://"):
filing = await get_complete_submission(filing) # type: ignore

# Validate the submission so we know that we can parse it.
if get_submission_type(filing) not in ("13F-HR", "13F-HR/A"):
raise OpenBBError("Submission type is not 13F-HR.")

soup = BeautifulSoup(filing, "lxml-xml")
soup = BeautifulSoup(filing, "xml")

info_table = soup.find_all("informationTable")

if not info_table:
info_table = soup.find_all("table")[-1]

Expand All @@ -144,57 +152,61 @@ async def parse_13f_hr(filing: str):
+ " Documents filed before Q2 2013 are not supported."
)

period_ending = get_period_ending(filing)
period_ending = get_period_ending(soup)
data = (
DataFrame(parsed_xml)
if isinstance(parsed_xml, list)
else DataFrame([parsed_xml])
)
data.columns = data.columns.str.replace("ns1:", "")
data["value"] = data["value"].astype(int)
security_type = []
principal_amount = []
data.loc[:, "value"] = data["value"].astype(int)
security_type: list = []
principal_amount: list = []

# Unpack the nested objects
try:
security_type = [d.get("sshPrnamtType") for d in data["shrsOrPrnAmt"]]
data["security_type"] = security_type
principal_amount = [d.get("sshPrnamt") for d in data["shrsOrPrnAmt"]]
data["principal_amount"] = principal_amount
data.pop("shrsOrPrnAmt")
data.loc[:, "security_type"] = security_type
principal_amount = [int(d.get("sshPrnamt", 0)) for d in data["shrsOrPrnAmt"]]
data.loc[:, "principal_amount"] = principal_amount
_ = data.pop("shrsOrPrnAmt")
except ValueError:
pass
try:
sole = [d.get("Sole") for d in data["votingAuthority"]]
shared = [d.get("Shared") for d in data["votingAuthority"]]
none = [d.get("None") for d in data["votingAuthority"]]
data["voting_authority_sole"] = [int(s) for s in sole]
data["voting_authority_shared"] = [int(s) for s in shared]
data["voting_authority_none"] = [int(s) for s in none]
data.pop("votingAuthority")
data.loc[:, "voting_authority_sole"] = [int(s) if s else 0 for s in sole]
data.loc[:, "voting_authority_shared"] = [int(s) if s else 0 for s in shared]
data.loc[:, "voting_authority_none"] = [int(s) if s else 0 for s in none]
_ = data.pop("votingAuthority")
except ValueError:
pass

if "putCall" in data.columns:
data.loc[:, "putCall"] = data["putCall"].fillna("--")

# Add the period ending so that the filing is identified when multiple are requested.
data["period_ending"] = to_datetime(period_ending, yearfirst=False).date()

df = DataFrame(data)
df["principal_amount"] = df["principal_amount"].astype(int)

# Aggregate the data because there are multiple entries for each security and we need the totals.
# We break it down by CUSIP, security type, and option type.
agg_index = ["cusip", "security_type", "putCall"]
agg_index = [
"period_ending",
"nameOfIssuer",
"cusip",
"titleOfClass",
"security_type",
"putCall",
"investmentDiscretion",
]
agg_columns = {
"period_ending": "first",
"nameOfIssuer": "first",
"titleOfClass": "first",
"value": "sum",
"principal_amount": "sum",
"voting_authority_sole": "sum",
"voting_authority_shared": "sum",
"voting_authority_none": "sum",
}

# Only aggregate columns that exist in the DataFrame
agg_columns = {k: v for k, v in agg_columns.items() if k in df.columns}
agg_index = [k for k in agg_index if k in df.columns]
Expand All @@ -213,7 +225,7 @@ async def parse_13f_hr(filing: str):

return (
df.reset_index()
.fillna("N/A")
.replace({nan: None, "--": None})
.sort_values(by="weight", ascending=False)
.replace("N/A", None)
.to_dict("records")
)

0 comments on commit 99b7614

Please sign in to comment.