refactor(ingest): Minor cleanup of File, CsvEnricher, BusinessGlossary, and FileLineage sources #7718

Merged
merged 7 commits into from
Mar 31, 2023
59 changes: 22 additions & 37 deletions metadata-ingestion/src/datahub/ingestion/source/csv_enricher.py
@@ -33,6 +33,7 @@
OwnershipTypeClass,
TagAssociationClass,
)
from datahub.utilities.source_helpers import auto_workunit_reporter
from datahub.utilities.urns.dataset_urn import DatasetUrn
from datahub.utilities.urns.urn import Urn

@@ -102,6 +103,10 @@ class CSVEnricherSource(Source):
be applied at the entity field. If a subresource IS populated (as it is for the second and third rows), glossary
terms and tags will be applied on the subresource. Every row MUST have a resource. Also note that owners can only
be applied at the resource level and will be ignored if populated for a row with a subresource.

:::note
This source will not work on very large csv files that do not fit in memory.
Collaborator comment: i think you need a trailing ::: as well

:::
"""

def __init__(self, config: CSVEnricherConfig, ctx: PipelineContext):
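To make the resource/subresource convention described in the docstring concrete, here is a small, hypothetical CSV parsed the same way the source parses its input. The resource, subresource, domain, and description column names appear in the code below; the remaining column names are assumptions for the sketch, not taken from this diff.

```python
# Illustrative only: hypothetical rows the enricher might consume, read with
# csv.DictReader just as the source does. Only "resource", "subresource",
# "domain", and "description" are confirmed column names; the rest are assumed.
import csv
import io

sample_csv = """\
resource,subresource,glossary_terms,tags,owners,domain,description
"urn:li:dataset:(urn:li:dataPlatform:hive,SampleTable,PROD)",,[urn:li:glossaryTerm:Pii],[urn:li:tag:Legacy],[urn:li:corpuser:jdoe],urn:li:domain:Marketing,Table-level description
"urn:li:dataset:(urn:li:dataPlatform:hive,SampleTable,PROD)",field_a,[urn:li:glossaryTerm:Pii],,,,Column-level description
"""

for row in csv.DictReader(io.StringIO(sample_csv)):
    # Rows without a subresource apply at the entity level; rows with one apply
    # to that schema field, and owners on such rows are ignored.
    target = "entity" if not row["subresource"] else f"field {row['subresource']}"
    print(f"{row['resource']} -> {target}")
```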
@@ -305,7 +310,6 @@ def get_resource_workunits(
)
if maybe_terms_wu:
self.report.num_glossary_term_workunits_produced += 1
self.report.report_workunit(maybe_terms_wu)
yield maybe_terms_wu

maybe_tags_wu: Optional[MetadataWorkUnit] = self.get_resource_tags_work_unit(
@@ -314,7 +318,6 @@
)
if maybe_tags_wu:
self.report.num_tag_workunits_produced += 1
self.report.report_workunit(maybe_tags_wu)
yield maybe_tags_wu

maybe_owners_wu: Optional[
@@ -325,7 +328,6 @@
)
if maybe_owners_wu:
self.report.num_owners_workunits_produced += 1
self.report.report_workunit(maybe_owners_wu)
yield maybe_owners_wu

maybe_domain_wu: Optional[
@@ -336,7 +338,6 @@
)
if maybe_domain_wu:
self.report.num_domain_workunits_produced += 1
self.report.report_workunit(maybe_domain_wu)
yield maybe_domain_wu

maybe_description_wu: Optional[
@@ -347,7 +348,6 @@
)
if maybe_description_wu:
self.report.num_description_workunits_produced += 1
self.report.report_workunit(maybe_description_wu)
yield maybe_description_wu

def process_sub_resource_row(
@@ -469,9 +469,7 @@ def get_sub_resource_work_units(self) -> Iterable[MetadataWorkUnit]:
needs_write = True

# Iterate over each sub resource row
for sub_resource_row in self.editable_schema_metadata_map[
entity_urn
]: # type: SubResourceRow
for sub_resource_row in self.editable_schema_metadata_map[entity_urn]:
(
current_editable_schema_metadata,
needs_write,
@@ -481,6 +479,7 @@ def get_sub_resource_work_units(self) -> Iterable[MetadataWorkUnit]:

# Write an MCPW if needed.
if needs_write:
self.report.num_editable_schema_metadata_workunits_produced += 1
yield MetadataChangeProposalWrapper(
entityUrn=entity_urn,
aspect=current_editable_schema_metadata,
@@ -538,100 +537,86 @@ def maybe_extract_owners(
return owners

def get_workunits(self) -> Iterable[MetadataWorkUnit]:
return auto_workunit_reporter(self.report, self.get_workunits_internal())

def get_workunits_internal(self) -> Iterable[MetadataWorkUnit]:
# As per https://stackoverflow.com/a/49150749/5004662, we want to use
# the 'utf-8-sig' encoding to handle any BOM character that may be
# present in the file. Excel is known to add a BOM to CSV files.
# As per https://stackoverflow.com/a/63508823/5004662,
# this is also safe with normal files that don't have a BOM.
parsed_location = parse.urlparse(self.config.filename)
keep_rows = []
if parsed_location.scheme in ("file", ""):
with open(
pathlib.Path(self.config.filename), mode="r", encoding="utf-8-sig"
) as f:
rows = csv.DictReader(f, delimiter=self.config.delimiter)
keep_rows = [row for row in rows]
rows = list(csv.DictReader(f, delimiter=self.config.delimiter))
else:
try:
resp = requests.get(self.config.filename)
decoded_content = resp.content.decode("utf-8-sig")
rows = csv.DictReader(
decoded_content.splitlines(), delimiter=self.config.delimiter
rows = list(
csv.DictReader(
decoded_content.splitlines(), delimiter=self.config.delimiter
)
)
keep_rows = [row for row in rows]
except Exception as e:
raise ConfigurationError(
f"Cannot read remote file {self.config.filename}, error:{e}"
)

for row in keep_rows:
for row in rows:
# We need the resource to move forward
if not row["resource"]:
continue

is_resource_row: bool = not row["subresource"]

entity_urn = row["resource"]
entity_type = Urn.create_from_string(row["resource"]).get_type()

term_associations: List[
GlossaryTermAssociationClass
] = self.maybe_extract_glossary_terms(row)

tag_associations: List[TagAssociationClass] = self.maybe_extract_tags(row)

owners: List[OwnerClass] = self.maybe_extract_owners(row, is_resource_row)

domain: Optional[str] = (
row["domain"]
if row["domain"] and entity_type == DATASET_ENTITY_TYPE
else None
)

description: Optional[str] = (
row["description"]
if row["description"] and entity_type == DATASET_ENTITY_TYPE
else None
)

if is_resource_row:
for wu in self.get_resource_workunits(
yield from self.get_resource_workunits(
entity_urn=entity_urn,
term_associations=term_associations,
tag_associations=tag_associations,
owners=owners,
domain=domain,
description=description,
):
yield wu

# If this row is not applying changes at the resource level, modify the EditableSchemaMetadata map.
else:
)
elif entity_type == DATASET_ENTITY_TYPE:
# Only dataset sub-resources are currently supported.
if entity_type != DATASET_ENTITY_TYPE:
continue

field_path = row["subresource"]
if entity_urn not in self.editable_schema_metadata_map:
self.editable_schema_metadata_map[entity_urn] = []
# Add the row to the map from entity (dataset) to SubResource rows. We cannot emit work units for
# EditableSchemaMetadata until we parse the whole CSV due to read-modify-write issues.
self.editable_schema_metadata_map.setdefault(entity_urn, [])
self.editable_schema_metadata_map[entity_urn].append(
SubResourceRow(
entity_urn=entity_urn,
field_path=field_path,
field_path=row["subresource"],
term_associations=term_associations,
tag_associations=tag_associations,
description=description,
domain=domain,
)
)

# Yield sub resource work units once the map has been fully populated.
for wu in self.get_sub_resource_work_units():
self.report.num_editable_schema_metadata_workunits_produced += 1
self.report.report_workunit(wu)
yield wu
yield from self.get_sub_resource_work_units()

def get_report(self):
return self.report
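The get_workunits() change above is the core of this refactor: per-work-unit reporting now happens inside auto_workunit_reporter, which is why the explicit self.report.report_workunit(...) calls were removed from the yield sites. A minimal sketch of that pattern, assuming the helper simply reports and re-yields each work unit (the real implementation lives in datahub.utilities.source_helpers and may do more):

```python
# A minimal sketch (not the actual auto_workunit_reporter implementation) of the
# pattern this refactor relies on: report every work unit as it flows through, so
# individual sources no longer call report.report_workunit() before each yield.
from typing import Iterable, TypeVar

from datahub.ingestion.api.source import SourceReport
from datahub.ingestion.api.workunit import MetadataWorkUnit

WU = TypeVar("WU", bound=MetadataWorkUnit)


def workunit_reporter_sketch(report: SourceReport, stream: Iterable[WU]) -> Iterable[WU]:
    for wu in stream:
        report.report_workunit(wu)  # centralizes the reporting each source used to do
        yield wu
```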
55 changes: 28 additions & 27 deletions metadata-ingestion/src/datahub/ingestion/source/file.py
@@ -3,6 +3,7 @@
import logging
import os.path
import pathlib
from collections import defaultdict
from dataclasses import dataclass, field
from enum import auto
from io import BufferedReader
@@ -15,6 +16,7 @@
from pydantic.fields import Field

from datahub.configuration.common import ConfigEnum, ConfigModel, ConfigurationError
from datahub.configuration.pydantic_field_deprecation import pydantic_field_deprecated
from datahub.configuration.validate_field_rename import pydantic_renamed_field
from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.ingestion.api.common import PipelineContext
@@ -36,6 +38,7 @@
MetadataChangeProposal,
)
from datahub.metadata.schema_classes import UsageAggregationClass
from datahub.utilities.source_helpers import auto_workunit_reporter

logger = logging.getLogger(__name__)

@@ -47,11 +50,12 @@ class FileReadMode(ConfigEnum):


class FileSourceConfig(ConfigModel):
filename: Optional[str] = Field(
None, description="[deprecated in favor of `path`] The file to ingest."
_filename = pydantic_field_deprecated(
"filename",
message="filename is deprecated. Use path instead.",
)
path: Union[str, pathlib.Path] = Field(
description="Path to folder or file to ingest. If pointed to a folder, all files with extension {file_extension} (default json) within that folder will be processed. This can also be in the form of a URL containing a single file"
path: str = Field(
description="File path to folder or file to ingest, or URL to a remote file. If pointed to a folder, all files with extension {file_extension} (default json) within that folder will be processed."
)
file_extension: str = Field(
".json",
Expand Down Expand Up @@ -102,8 +106,8 @@ class FileSourceReport(SourceReport):
total_parse_time_in_seconds: float = 0
total_count_time_in_seconds: float = 0
total_deserialize_time_in_seconds: float = 0
aspect_counts: Dict[str, int] = field(default_factory=dict)
entity_type_counts: Dict[str, int] = field(default_factory=dict)
aspect_counts: Dict[str, int] = field(default_factory=lambda: defaultdict(int))
entity_type_counts: Dict[str, int] = field(default_factory=lambda: defaultdict(int))

def add_deserialize_time(self, delta: datetime.timedelta) -> None:
self.total_deserialize_time_in_seconds += round(delta.total_seconds(), 2)
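Switching aspect_counts and entity_type_counts to defaultdict(int) is what allows the counting code later in this diff to use a plain += 1 instead of the dict.get(key, 0) + 1 pattern. A short standalone sketch of the difference:

```python
# Standalone sketch of the counting change: defaultdict(int) supplies a 0 for
# missing keys, so increments no longer need the .get(key, 0) fallback.
from collections import defaultdict
from typing import Dict

plain_counts: Dict[str, int] = {}
plain_counts["dataset"] = plain_counts.get("dataset", 0) + 1  # old style

default_counts: Dict[str, int] = defaultdict(int)
default_counts["dataset"] += 1  # new style

assert plain_counts == dict(default_counts)
```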
@@ -182,58 +186,55 @@ def create(cls, config_dict, ctx):
def get_filenames(self) -> Iterable[str]:
path_parsed = parse.urlparse(str(self.config.path))
if path_parsed.scheme in ("file", ""):
self.config.path = pathlib.Path(self.config.path)
if self.config.path.is_file():
path = pathlib.Path(self.config.path)
if path.is_file():
self.report.total_num_files = 1
return [str(self.config.path)]
elif self.config.path.is_dir():
return [str(path)]
elif path.is_dir():
files_and_stats = [
(str(x), os.path.getsize(x))
for x in list(
self.config.path.glob(f"*{self.config.file_extension}")
)
for x in path.glob(f"*{self.config.file_extension}")
if x.is_file()
]
self.report.total_num_files = len(files_and_stats)
self.report.total_bytes_on_disk = sum([y for (x, y) in files_and_stats])
return [x for (x, y) in files_and_stats]
else:
raise Exception(f"Failed to process {path}")
else:
self.report.total_num_files = 1
return [str(self.config.path)]
raise Exception(f"Failed to process {self.config.path}")

def get_workunits(self) -> Iterable[Union[MetadataWorkUnit, UsageStatsWorkUnit]]:
return auto_workunit_reporter(self.report, self.get_workunits_internal())

def get_workunits_internal(
self,
) -> Iterable[Union[MetadataWorkUnit, UsageStatsWorkUnit]]:
for f in self.get_filenames():
for i, obj in self.iterate_generic_file(f):
id = f"file://{f}:{i}"
wu: Union[MetadataWorkUnit, UsageStatsWorkUnit]
if isinstance(obj, UsageAggregationClass):
wu = UsageStatsWorkUnit(id, obj)
yield UsageStatsWorkUnit(id, obj)
elif isinstance(
obj, (MetadataChangeProposalWrapper, MetadataChangeProposal)
):
self.report.entity_type_counts[obj.entityType] = (
self.report.entity_type_counts.get(obj.entityType, 0) + 1
)
self.report.entity_type_counts[obj.entityType] += 1
if obj.aspectName is not None:
cur_aspect_name = str(obj.aspectName)
self.report.aspect_counts[cur_aspect_name] = (
self.report.aspect_counts.get(cur_aspect_name, 0) + 1
)
self.report.aspect_counts[cur_aspect_name] += 1
if (
self.config.aspect is not None
and cur_aspect_name != self.config.aspect
):
continue

if isinstance(obj, MetadataChangeProposalWrapper):
wu = MetadataWorkUnit(id, mcp=obj)
yield MetadataWorkUnit(id, mcp=obj)
else:
wu = MetadataWorkUnit(id, mcp_raw=obj)
yield MetadataWorkUnit(id, mcp_raw=obj)
else:
wu = MetadataWorkUnit(id, mce=obj)
self.report.report_workunit(wu)
yield wu
yield MetadataWorkUnit(id, mce=obj)

def get_report(self):
return self.report
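For reference, the reworked get_filenames() resolves the configured path in three ways: a local file is returned as-is, a local directory is expanded by extension, and any other URL scheme is treated as a single remote file. A rough standalone sketch of that logic, not the exact source code:

```python
# Standalone sketch of the path resolution in get_filenames(): local files pass
# through, local directories are globbed by extension, and other URL schemes are
# treated as a single remote file. Error type and details are simplified here.
import pathlib
from typing import List
from urllib import parse


def resolve_paths_sketch(path: str, file_extension: str = ".json") -> List[str]:
    parsed = parse.urlparse(path)
    if parsed.scheme not in ("file", ""):
        return [path]  # remote URL: treated as a single file
    local = pathlib.Path(path)
    if local.is_file():
        return [str(local)]
    if local.is_dir():
        return [str(p) for p in local.glob(f"*{file_extension}") if p.is_file()]
    raise FileNotFoundError(f"Failed to process {path}")
```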
metadata-ingestion/src/datahub/ingestion/source/metadata/business_glossary.py
@@ -23,6 +23,7 @@
from datahub.ingestion.api.workunit import MetadataWorkUnit, UsageStatsWorkUnit
from datahub.ingestion.graph.client import DataHubGraph
from datahub.utilities.registries.domain_registry import DomainRegistry
from datahub.utilities.source_helpers import auto_workunit_reporter
from datahub.utilities.urn_encoder import UrnEncoder

logger = logging.getLogger(__name__)
@@ -84,7 +85,7 @@ class DefaultConfig(ConfigModel):

class BusinessGlossarySourceConfig(ConfigModel):
file: Union[str, pathlib.Path] = Field(
description="Path to business glossary file to ingest. This can be in the form of a URL or local file YAML."
description="File path or URL to business glossary file to ingest."
)
enable_auto_id: bool = Field(
description="Generate id field from GlossaryNode and GlossaryTerm's name field",
@@ -479,21 +480,20 @@ def load_glossary_config(
return glossary_cfg

def get_workunits(self) -> Iterable[Union[MetadataWorkUnit, UsageStatsWorkUnit]]:
return auto_workunit_reporter(self.report, self.get_workunits_internal())

def get_workunits_internal(
self,
) -> Iterable[Union[MetadataWorkUnit, UsageStatsWorkUnit]]:
glossary_config = self.load_glossary_config(self.config.file)
populate_path_vs_id(glossary_config)
for event in get_mces(
glossary_config, ingestion_config=self.config, ctx=self.ctx
):
if isinstance(event, models.MetadataChangeEventClass):
wu = MetadataWorkUnit(f"{event.proposedSnapshot.urn}", mce=event)
self.report.report_workunit(wu)
yield wu
yield MetadataWorkUnit(f"{event.proposedSnapshot.urn}", mce=event)
elif isinstance(event, MetadataChangeProposalWrapper):
wu = MetadataWorkUnit(
id=f"{event.entityType}-{event.aspectName}-{event.entityUrn}",
mcp=event,
)
yield wu
yield event.as_workunit()

def get_report(self):
return self.report
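The final hunk swaps the hand-built MetadataWorkUnit for MetadataChangeProposalWrapper.as_workunit(). A rough sketch of what that means for callers; the id produced by as_workunit() is assumed to be derived from the wrapper's fields and is not specified by this diff:

```python
# Rough sketch of the change: instead of constructing the work unit manually
# (the removed lines above), the wrapper builds one itself. The exact id format
# chosen by as_workunit() is an assumption here, not taken from this diff.
from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.ingestion.api.workunit import MetadataWorkUnit
from datahub.metadata.schema_classes import StatusClass

mcp = MetadataChangeProposalWrapper(
    entityUrn="urn:li:dataset:(urn:li:dataPlatform:hive,SampleTable,PROD)",
    aspect=StatusClass(removed=False),
)

# Old style, as in the removed lines:
manual_wu = MetadataWorkUnit(
    id=f"{mcp.entityType}-{mcp.aspectName}-{mcp.entityUrn}", mcp=mcp
)

# New style:
auto_wu = mcp.as_workunit()
```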