diff --git a/dart_fss/filings/reports.py b/dart_fss/filings/reports.py
index b48bc21..13afa03 100644
--- a/dart_fss/filings/reports.py
+++ b/dart_fss/filings/reports.py
@@ -359,6 +359,11 @@ def attached_files(): return [x for x in self.attached_files if determinant(x.fi
@property
def xbrl(self):
+ if self._xbrl is None:
+ self.load_xbrl()
+ return self._xbrl
+
+ def load_xbrl(self):
""" XBRL 데이터 반환"""
import tempfile
if self._xbrl is None:
diff --git a/dart_fss/filings/search_result.py b/dart_fss/filings/search_result.py
index 05c874e..3d65ab7 100644
--- a/dart_fss/filings/search_result.py
+++ b/dart_fss/filings/search_result.py
@@ -57,6 +57,10 @@ def to_dict(self) -> Dict:
'report_list': [x.to_dict() for x in self.report_list]
}
+ def pop(self, index=-1):
+ """ 주어진 index 의 리포트를 반환하며, 리스트에서 삭제하는 함수"""
+ return self._report_list.pop(index)
+
def __repr__(self):
from pprint import pformat
return pformat(self.to_dict())
diff --git a/dart_fss/fs/extract.py b/dart_fss/fs/extract.py
index b13304b..dec1952 100644
--- a/dart_fss/fs/extract.py
+++ b/dart_fss/fs/extract.py
@@ -1,11 +1,10 @@
# -*- coding: utf-8 -*-
import re
import math
-import copy
import numpy as np
import pandas as pd
-from typing import Union, List, Dict, Tuple, Pattern
+from typing import Union, List, Dict, Tuple, Pattern, Optional
from collections import OrderedDict
from pandas import DataFrame
from datetime import datetime
@@ -176,7 +175,7 @@ def column_ko_to_en(ko):
row_length = len(thead.find_all('tr'))
row_length = row_length + 1 if row_length == 1 else row_length
# row-sapn, col-span을 처리하기 위한 Matrix
- columns_matrix = [[None for y in range(col_length)] for x in range(row_length)]
+ columns_matrix = [[None for _y in range(col_length)] for _x in range(row_length)]
for idx, tr in enumerate(thead.find_all('tr')):
start_idx = 0
for ele_idx, element in enumerate(columns_matrix[idx]):
@@ -228,7 +227,7 @@ def column_ko_to_en(ko):
column.append(item)
continue
elif idx == 1 and (item is None or regex.search(item) is None):
- sec_item.append(label[lang][separate])
+ sec_item.append(label[lang][separate])
else:
pass
@@ -322,56 +321,66 @@ def convert_tbody_to_dataframe(columns: list, fs_table: dict):
def seek_table(tables: List, includes: Pattern,
excludes: Union[Pattern, None] = None) -> Tuple[Union[str, None], Union[str, None], Union[str, None]]:
""" Table 검색 """
+ # 날짜 검색을 위한 Regular Expression
regex = re.compile(r'\d{4}(.*?)\d{1,2}(.*?)\d{1,2}')
+
+ # Header Tag 가 아닌 경우 저장
+ not_headers = []
+
+ # Minimum Row Number
+ MIN_ROW_NUMBER = 4
+
for table in tables:
+ # Tables with fewer than MIN_ROW_NUMBER (4) rows are not treated as financial statement tables
+ rows = table.find_all('tr')
+ if len(rows) < MIN_ROW_NUMBER:
+ continue
+
for tag in table.previous_siblings:
+ # tag 가 tables 에 있으면 검색 종료
if tag in tables:
break
+ # tag 가 Tag Object 인 경우에만 검색 진행
if isinstance(tag, Tag):
+ # title 검색
children = tag.findChildren(text=includes)
for child in children:
title = child
if title:
title = re.sub(r'\s+', '', title)
+ # 만약 타이틀에 제외될 단어 포함시 Pass
if excludes and excludes.search(title):
+ not_headers.append(tag)
continue
+
+ # 타이틀이 너무 길때 Pass
if len(title) > 12:
- continue
- header = table.find_previous('table', class_='nb')
- if header is None:
- continue
- tr_list = header.find_all('tr')
- if len(tr_list) < 2:
+ not_headers.append(tag)
continue
- tr_cnt = 0
- for tr in tr_list:
- if regex.search(tr.text):
- tr_cnt += 1
+ headers = table.find_all_previous('table', class_='nb')
+ for header in headers:
- if tr_cnt == 0:
- found = table.find_previous(text=regex)
- if found is None:
+ # Header 가 None 이거나 not_headers 에 포함된 경우 Pass
+ if header is None or header in not_headers:
continue
- header = found.parent
- extract_text = re.sub('<.*?>', '\n', str(header))
- extract_text = extract_text.split('\n')
- html = '
'
-
- error = False
- for t in extract_text:
- if t.strip() == '':
- pass
- else:
- if len(t) > 100:
- error = True
- break
- html += '' + t + ' |
'
- if error:
+
+ # Skip header candidates that have fewer than 2 rows
+ tr_list = header.find_all('tr')
+ if len(tr_list) < 2:
continue
- html += '
'
- header = BeautifulSoup(html, 'html.parser')
- return title, header, table
+
+ # 검색된 날짜가 한개도 없을 경우 Pass
+ datetime_cnt = 0
+ for tr in tr_list:
+ if regex.search(tr.text):
+ datetime_cnt += 1
+
+ if datetime_cnt == 0:
+ continue
+
+ return title, header, table
+
return None, None, None
@@ -509,7 +518,7 @@ def analyze_html(report: Report, fs_tp: Tuple[str] = ('bs', 'is', 'cis', 'cf'),
'includes': r'재무제표 OR 감사보고서',
'excludes': r'주석 OR 결합 OR 의견 OR 수정 OR 검토보고서',
'scope': ['attached_reports', 'pages'],
- 'options': {'title': True} # 첨부보고서 및 연결보고서의 title 까지 검색
+ 'options': {'title': True} # 첨부보고서 및 연결보고서의 title 까지 검색
}
if separate:
@@ -532,7 +541,7 @@ def analyze_html(report: Report, fs_tp: Tuple[str] = ('bs', 'is', 'cis', 'cf'),
return extract_results
-def find_all_columns(df: DataFrame, query: str) -> list:
+def find_all_columns(df: DataFrame, query: str) -> pd.Index:
"""
DataFrame의 column을 검색어를 통해 검색하는 함수
@@ -561,6 +570,8 @@ def find_all_columns(df: DataFrame, query: str) -> list:
else:
if regex.search(' '.join(item)):
results.append(column)
+ if len(results) > 0:
+ results = pd.MultiIndex.from_tuples(results)
return results
@@ -570,15 +581,15 @@ def extract_account_title(title):
title = title[0]
elif len(title) > 1:
title = ''.join(title[1:])
- title = re.sub(r'\[.*?\]|\(.*?\)|<.*?>', '', title)
+ title = re.sub(r'\[.*?\]|\(.*?\)|<.*?>|[^가-힣|a-z|A-Z]', '', title)
title = re.sub(r'\s+', '', title)
return title
-def compare_df_and_ndf_label(column: Tuple[Union[str, Tuple[str]]],
- df: DataFrame, ndf: DataFrame, ldf: DataFrame,
- ndata: List[Union[float, str, None]],
- nlabels: List[str]) -> Tuple[List[Union[float, str]], List[str]]:
+def compare_df_and_ndf_label_and_concept(column: Tuple[Union[str, Tuple[str]]],
+ df: DataFrame, ndf: DataFrame, ldf: DataFrame,
+ ndata: List[Union[float, str, None]],
+ nlabels: List[str]) -> Tuple[List[Union[float, str]], List[str]]:
"""
Labels 을 시용하여 데이터를 검색하는 함수
@@ -590,6 +601,8 @@ def compare_df_and_ndf_label(column: Tuple[Union[str, Tuple[str]]],
데이터를 추가할 DataFrame
ndf: dict of { str: DataFrame }
데이터를 검색할 DataFrame
+ ldf: DataFrame
+ Label DataFrame
ndata: list of float
추가할 column의 데이터 리스트
nlabels: list of str
@@ -604,47 +617,109 @@ def compare_df_and_ndf_label(column: Tuple[Union[str, Tuple[str]]],
df_label_column = find_all_columns(df, 'label_ko')[0]
ndf_label_column = find_all_columns(ndf, 'label_ko')[0]
+ concept_none_data = {}
+ df_concept_column = find_all_columns(df, 'concept_id')
+ ndf_concept_column = find_all_columns(ndf, 'concept_id')
+
+ # concept_id 컬럼이 존재하는지 여부 조사
+ concept_exist = len(df_concept_column) * len(ndf_concept_column) != 0
+ if concept_exist:
+ df_concept_column = df_concept_column[0]
+ ndf_concept_column = ndf_concept_column[0]
+
+ en_none_data = {}
+ df_en_column = find_all_columns(df, 'label_en')
+ ndf_en_column = find_all_columns(ndf, 'label_en')
+
+ # label_en 컬럼이 존재하는지 여부 조사
+ en_exist = len(df_en_column) * len(ndf_en_column) != 0
+ if en_exist:
+ df_en_column = df_en_column[0]
+ ndf_en_column = ndf_en_column[0]
+
for idx, value in enumerate(ndata):
if isinstance(value, str):
+ # 이전에 검색된 데이터가 문자인 경우 pass
pass
elif value is None:
+ # 이전에 검색된 데이터가 없는 경우 pass
pass
elif math.isnan(value):
+ # 이전에 검색된 데이터가 유효한 값이 아닌 경우 pass
pass
else:
+ # A valid numeric value is already present, so skip the search for this row
continue
+ # label 추출
label = df[df_label_column].iloc[idx]
label = re.sub(r'\s+', '', label)
label = extract_account_title(label)
label_set = set(ldf.iloc[idx])
label_set.add(label)
+ # (index, label_set) 리스트 생성
label_none_data.append((idx, label_set))
- matched = []
+ # When a concept_id column exists, also record this row index keyed by its concept_id for later lookup
+ if concept_exist:
+ concept = df[df_concept_column].iloc[idx]
+ concept_none_data[concept] = idx
+
+ # When a label_en column exists, also record this row index keyed by its English label for later lookup
+ if en_exist:
+ en = df[df_en_column].iloc[idx]
+ en_none_data[en] = idx
+
+ # 기존 Dataframe index 중 사용된 결과 값 리스트
used = []
+
for idx in range(len(ndf)):
- if idx in matched:
- continue
+ # 검색된 값
+ value_found = None
+ # 검색된 기존 Dataframe 의 index
+ index_found = None
+
+ # 검색할 label 명
label = extract_account_title(ndf[ndf_label_column].iloc[idx])
- for index, label_set in label_none_data:
- if index in used:
+ if concept_exist:
+ # 추가할 Dataframe 의 concept_id
+ concept = ndf[ndf_concept_column].iloc[idx]
+ index_found = concept_none_data.get(concept)
+ if index_found in used:
continue
- if label in label_set:
- value = ndf[column].iloc[idx]
- if isinstance(value, str):
- pass
- else:
- used.append(index)
- matched.append(idx)
- ndata[index] = value
- nlabels[index] = label
+ elif index_found is not None:
+ value_found = ndf[column].iloc[idx]
+
+ if index_found is None:
+ if en_exist:
+ en = ndf[ndf_en_column].iloc[idx]
+ index_found = en_none_data.get(en)
+ if index_found in used:
+ continue
+ elif index_found is not None:
+ value_found = ndf[column].iloc[idx]
+
+ if index_found is None:
+ for index, label_set in label_none_data:
+ if index in used:
+ continue
+ if label in label_set:
+ value_found = ndf[column].iloc[idx]
+ index_found = index
+ break
+
+ if index_found is None:
+ pass
+ elif isinstance(index_found, int):
+ used.append(index_found)
+ ndata[index_found] = value_found
+ nlabels[index_found] = label
return ndata, nlabels
-def compare_df_and_ndf_value(column: Tuple[Union[str, Tuple[str]]],
+def compare_df_and_ndf_value(column: pd.Index,
df: DataFrame, ndf: DataFrame,
ndata: List[Union[float, str, None]],
nlabels: List[str]) -> Tuple[List[Union[float, str]], List[str]]:
@@ -669,34 +744,45 @@ def compare_df_and_ndf_value(column: Tuple[Union[str, Tuple[str]]],
tuple of list
추가할 column의 데이터 리스트, 추가할 column의 label 리스트
"""
- df_columns = set(df.columns.tolist())
- ndf_columns = set(ndf.columns.tolist())
- overlap = df_columns.intersection(ndf_columns)
+ _, df_columns = split_columns_concept_data(df.columns)
+ _, ndf_columns = split_columns_concept_data(ndf.columns)
+ overlap = set(df_columns).intersection(set(ndf_columns))
nko_column = find_all_columns(ndf, r'label_ko')
index_used = []
for idx in range(len(df)):
+ nvalue = None
+ nlabel = ''
for col in overlap:
- nvalue = None
- nlabel = ''
value = df[col].iloc[idx]
if isinstance(value, str):
pass
elif value is None:
pass
- elif value and math.isnan(value):
+ elif math.isnan(value):
pass
else:
+ sign = 1
+ # Ref와 일치하는 값을 가지는 row index 찾기
w = ndf[ndf[col] == value].dropna(axis=1, how='all').dropna(how='all')
+ # 만약 찾지 못하는 경우 Ref의 값의 음수와 동일한 값을 가지는 row index 찾기
+ if len(w) == 0:
+ sign = -1
+ w = ndf[ndf[col] == -value].dropna(axis=1, how='all').dropna(how='all')
+
+ found = False
if len(w) > 0:
for index in w.index.values:
if index not in index_used:
- nvalue = ndf[column].iloc[index]
+ nvalue = sign * ndf[column].iloc[index]
nlabel = ndf[nko_column].iloc[index][0]
nlabel = extract_account_title(nlabel)
index_used.append(index)
+ found = True
break
+ if found:
+ break
if nvalue and math.isnan(nvalue):
nvalue = None
@@ -705,7 +791,7 @@ def compare_df_and_ndf_value(column: Tuple[Union[str, Tuple[str]]],
return ndata, nlabels
-additional_comparison_function = [compare_df_and_ndf_label]
+additional_comparison_function = [compare_df_and_ndf_label_and_concept]
def init_label(fs_df: Dict[str, DataFrame],
@@ -757,101 +843,97 @@ def init_label(fs_df: Dict[str, DataFrame],
label_df[tp] = pd.DataFrame(columns=nlabel_columns)
if len(concept_column) == 1:
- label_df[tp][label_columns[0]] = [extract_account_title(x) for x in list(df[concept_column[0]])]
+ label_df[tp][nlabel_columns[0]] = [extract_account_title(x) for x in list(df[concept_column[0]])]
for column in date_columns:
label_df[tp][column] = list(df[ko_column])
return label_df
-def merge_fs(fs_df: Dict[str, DataFrame], label_df: Dict[str, DataFrame],
- report: Report, fs_tp: Tuple[str] = ('bs', 'is', 'cis', 'cf'),
- lang: str = 'ko', separate: bool = False):
+def merge_fs(fs_df: Dict[str, DataFrame],
+ nfs_df: Dict[str, DataFrame],
+ label_df: Dict[str, DataFrame],
+ fs_tp: Tuple[str] = ('bs', 'is', 'cis', 'cf')):
"""
재무제표 DataFrame과 Report의 데이터를 합쳐주는 Method
Parameters
----------
fs_df: dict of {str: DataFrame}
- 재무제표
+ 데이터를 추가할 DataFrame
+ nfs_df: dict of {str: DataFrame}
+ 새로운 데이터를 검색할 DataFrame
label_df: dict of {str: DataFrame}
재무제표 검색결과시 추출된 값의 Label
- report: Report
- Report
fs_tp: tuple of str, optional
'bs' 재무상태표, 'is' 손익계산서, 'cis' 포괄손익계산서, 'cf' 현금흐름표
- lang: str, optional
- 'ko' 한글, 'en' 영문
- separate: bool, optional
- 개별재무제표 여부
-
Returns
-------
tuple of dict of {str: DataFrame}
재무제표, 추출된 Label 리스트
"""
- try:
- global additional_comparison_function
- # 보고서의 웹페이지에서 재무제표 추출
- nfs_df = analyze_html(report=report, fs_tp=fs_tp, lang=lang, separate=separate)
-
- for tp in fs_df:
- if tp in fs_tp:
- # 추가될 재무제표의 DataFrame
- df = fs_df[tp]
-
- # 새로 추가할 재무제표
- ndf = nfs_df[tp]
-
- # 재무제표가 없을시 추가 검색 X
- if df is None:
- if ndf is None:
- continue
- else:
- fs_df[tp] = ndf.copy(deep=True)
- df = fs_df[tp]
+ global additional_comparison_function
+
+ for tp in fs_df:
+ if tp in fs_tp:
+ # 추가될 재무제표의 DataFrame
+ df = fs_df[tp]
- # 검색된 재무제표가 없을시 추가 검색 X
+ # 새로 추가할 재무제표
+ ndf = nfs_df[tp]
+
+ # 재무제표가 없을시 추가 검색 X
+ if df is None:
if ndf is None:
continue
+ else:
+ fs_df[tp] = ndf.copy(deep=True)
+ df = fs_df[tp]
+
+ # 검색된 재무제표가 없을시 추가 검색 X
+ if ndf is None:
+ continue
- # label_df가 없을시 초기화
- if label_df.get(tp) is None:
- label_df = init_label(fs_df=fs_df, fs_tp=fs_tp, label_df=label_df)
+ # label_df가 없을시 초기화
+ if label_df.get(tp) is None:
+ label_df = init_label(fs_df=fs_df, fs_tp=fs_tp, label_df=label_df)
- df_columns = set(df.columns.tolist())
- ndf_columns = set(ndf.columns.tolist())
+ _, df_columns = split_columns_concept_data(df.columns)
+ _, ndf_columns = split_columns_concept_data(ndf.columns)
+ df_columns = set(df_columns.tolist())
+ ndf_columns = set(ndf_columns.tolist())
- overlap = df_columns.intersection(ndf_columns)
+ overlap = df_columns.intersection(ndf_columns)
- date_regex = re.compile(r'\d{8}')
- diff = [x for x in (ndf_columns - overlap) if date_regex.search(x[0])]
- diff.sort(key=lambda x: date_regex.findall(x[0])[0], reverse=True)
+ date_regex = re.compile(r'\d{8}')
+ diff = [x for x in (ndf_columns - overlap) if date_regex.search(x[0])]
+ diff.sort(key=lambda x: date_regex.findall(x[0])[0], reverse=True)
- # Data가 동일할 경우 Continue
- if len(diff) == 0:
- continue
+ # Data가 동일할 경우 Continue
+ if len(diff) == 0:
+ continue
- for column in diff:
- ndata = [None for _ in range(len(df))]
- nlabels = ['' for _ in range(len(df))]
- if len(overlap) > 0:
- ndata, nlabels = compare_df_and_ndf_value(column, df, ndf, ndata, nlabels)
+ diff = pd.MultiIndex.from_tuples(diff)
+ overlap = list(overlap)
- for compare_func in additional_comparison_function:
- ndata, nlabels = compare_func(column, df, ndf, label_df[tp], ndata, nlabels)
+ for column in diff:
+ ndata = [None for _ in range(len(df))]
+ nlabels = ['' for _ in range(len(df))]
+ if len(overlap) > 0:
+ ndata, nlabels = compare_df_and_ndf_value(column, df, ndf, ndata, nlabels)
- label_df[tp][column] = nlabels
- fs_df[tp][column] = ndata
- return fs_df, label_df
- except Exception:
- msg = 'An error occurred while fetching or analyzing {}.'.format(report.to_dict())
- raise RuntimeError(msg)
+ for compare_func in additional_comparison_function:
+ ndata, nlabels = compare_func(column, df, ndf, label_df[tp], ndata, nlabels)
+
+ label_df[tp][column] = nlabels
+ fs_df[tp][column] = ndata
+
+ return fs_df, label_df
def analyze_xbrl(report, fs_tp: Tuple[str] = ('bs', 'is', 'cis', 'cf'), separate: bool = False, lang: str = 'ko',
show_abstract: bool = False, show_class: bool = True, show_depth: int = 10,
- show_concept: bool = True, separator: bool = True) -> Dict[str, DataFrame]:
+ show_concept: bool = True, separator: bool = True) -> Union[Dict[str, DataFrame], None]:
"""
Report의 xbrl 파일 분석을 통한 재무제표 추출
@@ -878,7 +960,7 @@ class 표시 여부
Returns
-------
- dict of {str : DataFrame}
+ dict of {str : DataFrame} or None
pandas DataFrame
"""
@@ -933,9 +1015,29 @@ def get_cf():
return statements
-def sorting_columns(statements: Dict[str, DataFrame]) -> Dict[str, DataFrame]:
+def split_columns_concept_data(columns: pd.Index) -> Tuple[Optional[pd.Index], Optional[pd.Index]]:
regex = re.compile(r'\d{8}')
+ concept_columns = []
+ data_columns = []
+ for column in columns:
+ df_column_date = regex.findall(column[0])
+ if len(df_column_date) == 0:
+ concept_columns.append(column)
+ else:
+ data_columns.append(column)
+ if len(concept_columns) > 0:
+ concept_columns = pd.MultiIndex.from_tuples(concept_columns)
+ else:
+ concept_columns = None
+ if len(data_columns) > 0:
+ data_columns = pd.MultiIndex.from_tuples(data_columns)
+ else:
+ data_columns = None
+ return concept_columns, data_columns
+
+
+def sorting_data_columns(columns: pd.Index) -> pd.Index:
def sorting(value):
if isinstance(value, str):
return value
@@ -943,27 +1045,34 @@ def sorting(value):
ret = [x for x in value]
return tuple(ret)
+ regex = re.compile(r'\d{8}')
+ data_columns = []
+ for column in columns:
+ df_column_date = regex.findall(column[0])
+ data_columns.append([column, df_column_date])
+
+ data_columns.sort(key=lambda x: sorting(x[1]), reverse=True)
+ data_columns = [x[0] for x in data_columns]
+ data_columns = pd.MultiIndex.from_tuples(data_columns)
+ return data_columns
+
+
+def sorting_columns(statements: Dict[str, DataFrame]) -> Dict[str, DataFrame]:
+
for tp in statements:
df = statements[tp]
if df is None:
continue
+ concept_columns, data_columns = split_columns_concept_data(df.columns)
+ if data_columns is not None:
+ data_columns = sorting_data_columns(data_columns)
- columns = df.columns
- concept_columns = []
- date_columns = []
- for column in columns:
- df_column_date = regex.findall(column[0])
- if len(df_column_date) == 0:
- concept_columns.append(column)
- else:
- date_columns.append([column, df_column_date])
-
- date_columns.sort(key=lambda x: sorting(x[1]), reverse=True)
- date_columns = [x[0] for x in date_columns]
+ if concept_columns is not None and data_columns is not None:
+ ncolumns = concept_columns.tolist() + data_columns.tolist()
+ ncolumns = pd.MultiIndex.from_tuples(ncolumns)
+ else:
+ ncolumns = df.columns
- ncolumns = concept_columns + date_columns
- # convert list to numpy array
- ncolumns = np.array(ncolumns, dtype=object)
statements[tp] = statements[tp][ncolumns]
return statements
@@ -990,6 +1099,56 @@ def drop_empty_columns(df: Dict[str, DataFrame], label_df: bool = False) -> Dict
return df
+
+def analyze_report(report: Report,
+ fs_tp: Tuple[str] = ('bs', 'is', 'cis', 'cf'),
+ separate: bool = False,
+ lang: str = 'ko',
+ separator: bool = True) -> Dict[str, Optional[DataFrame]]:
+ # 2012년 이후 데이터만 XBRL 데이터 추출
+ year = int(report.rcept_dt[:4])
+ if year > 2011:
+ xbrl = report.xbrl
+ else:
+ xbrl = None
+
+ # XBRL File check
+ if xbrl is not None:
+ if separate is False and not xbrl.exist_consolidated():
+ raise NotFoundConsolidated('Could not find consolidated financial statements')
+ fs_df = analyze_xbrl(report, fs_tp=fs_tp, separate=separate, lang=lang,
+ show_abstract=False, show_class=True, show_depth=10,
+ show_concept=True, separator=separator)
+ else:
+ fs_df = analyze_html(report, fs_tp=fs_tp, separate=separate, lang=lang)
+
+ return fs_df
+
+
+def search_annual_report(corp_code: str,
+ bgn_de: str,
+ end_de: str = None,
+ separate: bool = False):
+
+ reports = []
+ try:
+ # 사업보고서 검색(최종보고서)
+ reports = search_filings(corp_code=corp_code, bgn_de=bgn_de, end_de=end_de,
+ pblntf_detail_ty='A001', page_count=100, last_reprt_at='Y')
+ except NoDataReceived:
+ # 감사보고서 검색
+ if separate:
+ pblntf_detail_ty = 'F001'
+ else:
+ pblntf_detail_ty = 'F002'
+ reports = search_filings(corp_code=corp_code, bgn_de=bgn_de, end_de=end_de,
+ pblntf_detail_ty=pblntf_detail_ty, page_count=100, last_reprt_at='Y')
+ finally:
+ if len(reports) == 0:
+ raise RuntimeError('Could not find an annual report')
+ return reports
+
+
def extract(corp_code: str,
bgn_de: str,
end_de: str = None,
@@ -1031,67 +1190,62 @@ def extract(corp_code: str,
else:
from tqdm import tqdm
- # 재무제표 검색 결과
- statements = None
- reports = []
- try:
- # 사업보고서 검색(최종보고서)
- reports = search_filings(corp_code=corp_code, bgn_de=bgn_de, end_de=end_de,
- pblntf_detail_ty='A001', page_count=100, last_reprt_at='Y')
- except NoDataReceived:
- # 감사보고서 검색
- if separate:
- pblntf_detail_ty = 'F001'
- else:
- pblntf_detail_ty = 'F002'
- reports = search_filings(corp_code=corp_code, bgn_de=bgn_de, end_de=end_de,
- pblntf_detail_ty=pblntf_detail_ty, page_count=100, last_reprt_at='Y')
- finally:
- if len(reports) == 0:
- raise RuntimeError('Could not find an annual report')
+ import dart_fss as dart
+ dart.utils.spinner.spinner_enable = False
- next_index = 0
- for idx, _ in enumerate(reports):
- # 가장 최근 보고서의 경우 XBRL 파일을 이용하여 재무제표 검색
- latest_report = reports[idx]
- latest_xbrl = latest_report.xbrl
- # XBRL 파일이 존재할 때
- if latest_xbrl is not None:
- if separate is False and not latest_xbrl.exist_consolidated():
+ reports = search_annual_report(corp_code=corp_code, bgn_de=bgn_de, end_de=end_de, separate=separate)
+ try:
+ length = len(reports)
+ statements = None
+ label_df = None
+ # The spinner was already disabled above; tqdm reports progress for the loop below
+
+ for _ in tqdm(range(length), desc='Annual reports', unit='report'):
+ report = reports.pop(0)
+ if statements is None:
+ statements = analyze_report(report=report,
+ fs_tp=fs_tp,
+ separate=separate,
+ lang=lang,
+ separator=separator)
+ if separate is False and all([statements[tp] is None for tp in statements]):
raise NotFoundConsolidated('Could not find consolidated financial statements')
+ # initialize label dictionary
+ label_df = init_label(statements, fs_tp=fs_tp)
- # XBRL 정보를 이용하여 재무제표 정보 초기화
- analyzed_results = analyze_xbrl(latest_report, fs_tp=fs_tp, separate=separate, lang=lang,
- show_abstract=False, show_class=True,
- show_depth=10, show_concept=True, separator=separator)
- statements = copy.deepcopy(analyzed_results)
else:
- statements = analyze_html(latest_report, fs_tp=fs_tp, separate=separate, lang=lang)
- # Report 에 재무제표 정보 없이 수정 사항만 기록된 경우 다음 리포트 검색
- if statements is not None:
- next_index = idx + 1
- break
-
- if separate is False and all([statements[tp] is None for tp in statements]):
- raise NotFoundConsolidated('Could not find consolidated financial statements')
-
- # initialize label dictionary
- label_df = init_label(statements, fs_tp=fs_tp)
-
- for report in tqdm(reports[next_index:], desc='Annual reports', unit='report'):
- statements, label_df = merge_fs(statements, label_df, report, fs_tp=fs_tp, separate=separate, lang=lang)
+ nstatements = analyze_report(report=report,
+ fs_tp=fs_tp,
+ separate=separate,
+ lang=lang,
+ separator=separator)
+ statements, label_df = merge_fs(statements, nstatements, fs_tp=fs_tp, label_df=label_df)
if str_compare(report_tp, 'half') or str_compare(report_tp, 'quarter'):
half = search_filings(corp_code=corp_code, bgn_de=bgn_de, end_de=end_de,
pblntf_detail_ty='A002', page_count=100, last_reprt_at='Y')
- for report in tqdm(half, desc='Semiannual reports', unit='report'):
- statements, label_df = merge_fs(statements, label_df, report, fs_tp=fs_tp, separate=separate, lang=lang)
+ length = len(half)
+ for _ in tqdm(range(length), desc='Semiannual reports', unit='report'):
+ report = half.pop(0)
+ nstatements = analyze_report(report=report,
+ fs_tp=fs_tp,
+ separate=separate,
+ lang=lang,
+ separator=separator)
+ statements, label_df = merge_fs(statements, nstatements, fs_tp=fs_tp, label_df=label_df)
if str_compare(report_tp, 'quarter'):
quarter = search_filings(corp_code=corp_code, bgn_de=bgn_de, end_de=end_de,
pblntf_detail_ty='A003', page_count=100, last_reprt_at='Y')
- for report in tqdm(quarter, desc='Quarterly report', unit='report'):
- statements, label_df = merge_fs(statements, label_df, report, fs_tp=fs_tp, separate=separate, lang=lang)
+ length = len(quarter)
+ for _ in tqdm(range(length), desc='Quarterly report', unit='report'):
+ report = quarter.pop(0)
+ nstatements = analyze_report(report=report,
+ fs_tp=fs_tp,
+ separate=separate,
+ lang=lang,
+ separator=separator)
+ statements, label_df = merge_fs(statements, nstatements, fs_tp=fs_tp, label_df=label_df)
statements = drop_empty_columns(statements)
label_df = drop_empty_columns(label_df)
@@ -1108,4 +1262,12 @@ def extract(corp_code: str,
'lang': lang,
'separator': separator
}
+ # Spinner enable
+ dart.utils.spinner.spinner_enable = True
return FinancialStatement(statements, label_df, info)
+ except Exception as e:
+ msg = 'An error occurred while fetching or analyzing {}.'.format(report.to_dict())
+ e.args = (*e.args, msg, )
+ raise e
+ finally:
+ dart.utils.spinner.spinner_enable = True
diff --git a/dart_fss/fs/fs.py b/dart_fss/fs/fs.py
index 1a2b59c..54573f2 100644
--- a/dart_fss/fs/fs.py
+++ b/dart_fss/fs/fs.py
@@ -1,7 +1,7 @@
import pandas as pd
from pandas import DataFrame
-from typing import Dict
+from typing import Dict, Optional
from dart_fss.utils import dict_to_html, create_folder
@@ -42,7 +42,7 @@ def separator(self, separator):
pd.options.display.float_format = '{:}'.format
self.info['separator'] = separator
- def show(self, tp, show_class: bool = True, show_depth: int = 10, show_concept: bool = True) -> DataFrame:
+ def show(self, tp, show_class: bool = True, show_depth: int = 10, show_concept: bool = True) -> Optional[DataFrame]:
"""
재무제표 정보를 표시해주는 Method
@@ -75,6 +75,8 @@ class 표시 여부
for column in columns:
if column not in class_columns:
ncolumns.append(column)
+ if len(ncolumns) > 0:
+ ncolumns = pd.MultiIndex.from_tuples(ncolumns)
df = df[ncolumns]
else:
drop_rows = []
@@ -88,7 +90,8 @@ class 표시 여부
for column in columns:
if column not in class_columns[show_depth + 1:]:
ncolumns.append(column)
-
+ if len(ncolumns) > 0:
+ ncolumns = pd.MultiIndex.from_tuples(ncolumns)
df = df[ncolumns].drop(drop_rows)
if show_concept is False:
@@ -99,6 +102,8 @@ class 표시 여부
for column in columns:
if column not in concept_colmuns:
ncolumns.append(column)
+ if len(ncolumns) > 0:
+ ncolumns = pd.MultiIndex.from_tuples(ncolumns)
df = df[ncolumns]
return df
diff --git a/dart_fss/tests/test_case/crp_case.py b/dart_fss/tests/test_case/crp_case.py
index 920861b..5d85ead 100644
--- a/dart_fss/tests/test_case/crp_case.py
+++ b/dart_fss/tests/test_case/crp_case.py
@@ -6,11 +6,12 @@
samsung.add_test_value('is', '20091231', 'label_ko', '영업이익(손실)', 10925259000000)
samsung.add_test_value('cis', '20091231', 'label_ko', '총포괄손익', 9098844000000)
samsung.add_test_value('cf', '20091231', 'concept_id', 'dart_CashAndCashEquivalentsAtEndOfPeriodCf', 10149930000000)
+samsung.add_test_value('cf', '20151231', 'concept_id', 'ifrs-full_InterestPaidClassifiedAsOperatingActivities', 748256000000)
# 현대자동차
hyundai = TestCrp(corp_code='00164742', bgn_de='20120101', separate=False, report_tp='annual')
hyundai.add_test_value('bs', '20101231', 'label_ko', '유동자산', 43520154000000)
-hyundai.add_test_value('is', '20101231', 'label_ko', '영업이익', 5918492000000)
+hyundai.add_test_value('is', '20101231', 'label_ko', '영업이익', 5885960000000)
hyundai.add_test_value('cis', '20101231', 'concept_id', 'ifrs-full_ComprehensiveIncome', 6223342000000)
hyundai.add_test_value('cf', '20101231', 'concept_id', 'dart_CashAndCashEquivalentsAtEndOfPeriodCf', 6215815000000)
@@ -31,7 +32,19 @@
sds = TestCrp(corp_code='00126186', bgn_de='20130813', end_de='20150807', separate=False, report_tp='quarter')
sds.add_test_value('bs', '20130630', 'label_ko', '유동자산', 2602291807082)
+# JTC
jtc = TestCrp(corp_code='01041828', bgn_de='20190101', end_de='20200811', separate=False, report_tp='annual')
jtc.add_test_value('cf', '20200229', 'concept_id', 'ifrs-full_CashFlowsFromUsedInOperatingActivities', 4810599061)
-test_crp_list = [samsung, hyundai, dexter, stone, sjgroup, sds]
+# GS리테일
+gs_retail = TestCrp(corp_code='00140177', bgn_de='20110101', separate=False, report_tp='annual')
+gs_retail.add_test_value('cis', '20161231', 'label_ko', '매출원가', 6015117323057)
+gs_retail.add_test_value('cis', '20161231', 'label_ko', '기타손실', 60931373946)
+gs_retail.add_test_value('cis', '20161231', 'label_ko', '판매비와관리비', 1168120874437)
+gs_retail.add_test_value('cis', '20161231', 'label_ko', '금융원가', 48502482146)
+
+# LG화학
+lg_chemical = TestCrp(corp_code='00356361', bgn_de='20180101', end_de='20201231', separate=False, report_tp='quarter')
+lg_chemical.add_test_value('cis', '20180701-20180930', 'concept_id', 'ifrs-full_ProfitLoss', 346600000000 )
+
+test_crp_list = [samsung, hyundai, dexter, stone, sjgroup, sds, jtc, gs_retail, lg_chemical]
diff --git a/dart_fss/utils/__init__.py b/dart_fss/utils/__init__.py
index a224eb3..bf600e6 100644
--- a/dart_fss/utils/__init__.py
+++ b/dart_fss/utils/__init__.py
@@ -5,7 +5,7 @@
from dart_fss.utils.notebook import dict_to_html, is_notebook
from dart_fss.utils.request import get_user_agent, query_to_regex, request
from dart_fss.utils.singleton import Singleton
-from dart_fss.utils.spinner import Spinner
+from dart_fss.utils.spinner import Spinner, spinner_enable
from dart_fss.utils.string import str_compare, str_insert_whitespace, str_unit_to_number_unit, str_upper, get_currency_str
from dart_fss.utils.regex import is_operator, precedence, infix_to_postfix, str_to_regex, str_to_pattern
from dart_fss.utils.dataframe import dataframe_astype
@@ -14,6 +14,6 @@
__all__ = ['cache', 'get_datetime', 'check_datetime', 'unzip', 'xml_to_dict',
'search_file', 'create_folder', 'get_cache_folder', 'dict_to_html',
'is_notebook', 'get_user_agent', 'query_to_regex', 'request',
- 'Singleton', 'Spinner', 'str_compare', 'str_insert_whitespace',
+ 'Singleton', 'Spinner', 'spinner_enable', 'str_compare', 'str_insert_whitespace',
'str_unit_to_number_unit', 'get_currency_str', 'str_upper', 'is_operator', 'precedence',
'infix_to_postfix', 'str_to_regex', 'str_to_pattern', 'dataframe_astype']
\ No newline at end of file
diff --git a/dart_fss/utils/spinner.py b/dart_fss/utils/spinner.py
index ca1d134..a85d07f 100644
--- a/dart_fss/utils/spinner.py
+++ b/dart_fss/utils/spinner.py
@@ -6,24 +6,39 @@
else:
from halo import Halo
+# Global Spinner Control
+spinner_enable = True
-class Spinner:
- """
- Halo 라이브러리를 이용한 Spinner
- """
- def __init__(self, text):
- """ 초기화
- Parameters
- ----------
- text: str
- spinner 사용시 표시할 text
+if spinner_enable:
+ class Spinner:
"""
- self.spinner = Halo(text=text, spinner='dots')
+ Halo 라이브러리를 이용한 Spinner
+ """
+ def __init__(self, text):
+ """ 초기화
+ Parameters
+ ----------
+ text: str
+ spinner 사용시 표시할 text
+ """
+ self.spinner = Halo(text=text, spinner='dots')
+
+ def start(self):
+ """ Spinner Start"""
+ if spinner_enable:
+ self.spinner.start()
+
+ def stop(self):
+ """ Spinner Stop """
+ if spinner_enable:
+ self.spinner.stop()
+else:
+ class Spinner:
+ def __init__(self, text):
+ pass
- def start(self):
- """ Spinner Start"""
- self.spinner.start()
+ def start(self):
+ pass
- def stop(self):
- """ Spinner Stop """
- self.spinner.stop()
\ No newline at end of file
+ def stop(self):
+ pass
\ No newline at end of file
diff --git a/dart_fss/xbrl/dart_xbrl.py b/dart_fss/xbrl/dart_xbrl.py
index 3d64bb7..828ba93 100644
--- a/dart_fss/xbrl/dart_xbrl.py
+++ b/dart_fss/xbrl/dart_xbrl.py
@@ -1,5 +1,6 @@
# -*- coding: utf-8 -*-
import re
+import pandas as pd
from typing import List, Union
from pandas import DataFrame
@@ -124,6 +125,7 @@ def get_period_information(self, lang: str = 'ko') -> DataFrame:
data = df[df.columns[2:]].iloc[3]
data_set = [(key, data[key]) for key in data.keys()]
new_columns = list(df.columns[:2]) + [data[0] for data in sorted(data_set, key=lambda x: x[1], reverse=True)]
+ new_columns = pd.MultiIndex.from_tuples(new_columns)
return df[new_columns]
def get_audit_information(self, lang: str = 'ko') -> DataFrame:
diff --git a/dart_fss/xbrl/xbrl.py b/dart_fss/xbrl/xbrl.py
index 4feac54..a0878f7 100644
--- a/dart_fss/xbrl/xbrl.py
+++ b/dart_fss/xbrl/xbrl.py
@@ -15,17 +15,16 @@ def get_xbrl_from_file(file_path: str) -> DartXbrl:
----------
file_path: str
XBRL 파일 경로
-
Returns
-------
DartXbrl
DartXbrl 클래스
"""
- # PyPI를 통해 설치된 Arelle 라이브러리 사용시 발생하는 오류 수정을 위한코드
from dart_fss.utils.spinner import Spinner
spinner = Spinner('XBRL Loading')
spinner.start()
+ # PyPI를 통해 설치된 Arelle 라이브러리 사용시 발생하는 오류 수정을 위한코드
if sys.platform == 'win32':
pass
elif sys.platform == 'darwin':
@@ -39,5 +38,6 @@ def get_xbrl_from_file(file_path: str) -> DartXbrl:
model_xbrl = Cntlr.Cntlr().modelManager.load(file_path)
filename = file_path.split('\\')[-1]
xbrl = DartXbrl(filename, model_xbrl)
+
spinner.stop()
return xbrl