fix(ingest): clarify s3/s3a requirements and platform defaults #4263

Merged: 4 commits (Mar 2, 2022)
4 changes: 2 additions & 2 deletions metadata-ingestion/source_docs/data_lake.md
@@ -10,7 +10,7 @@ This source is in **Beta** and under active development. Not yet considered read

## Setup

To install this plugin, run `pip install 'acryl-datahub[data-lake]'`. Note that because the profiling is run with PySpark, we require Spark 3.0.3 with Hadoop 3.2 to be installed (see [compatibility](#compatibility) for more details).
To install this plugin, run `pip install 'acryl-datahub[data-lake]'`. Note that because the profiling is run with PySpark, we require Spark 3.0.3 with Hadoop 3.2 to be installed (see [compatibility](#compatibility) for more details). If profiling, make sure that permissions for **s3a://** access are set because Spark and Hadoop use the s3a:// protocol to interface with AWS (schema inference outside of profiling requires s3:// access).

The data lake connector extracts schemas and profiles from a variety of file formats (see below for an exhaustive list).
Individual files are ingested as tables, and profiles are computed similar to the [SQL profiler](./sql_profiles.md).
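For context only (not part of this PR's diff), here is a minimal sketch of what s3a access for profiling typically looks like on the Spark side. The config keys are standard Hadoop/S3A settings, but the hadoop-aws version, credential placeholders, and bucket path are assumptions that depend on your environment:

```python
# Hypothetical sketch: pointing the PySpark session used for profiling at
# s3a:// paths. Values below are placeholders, not DataHub defaults.
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder.appName("data-lake-profiling")
    # hadoop-aws should match the Hadoop build bundled with Spark (Hadoop 3.2 here)
    .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:3.2.0")
    .config("spark.hadoop.fs.s3a.access.key", "<AWS_ACCESS_KEY_ID>")
    .config("spark.hadoop.fs.s3a.secret.key", "<AWS_SECRET_ACCESS_KEY>")
    .getOrCreate()
)

# Profiling reads go through the s3a:// scheme; schema inference outside of
# profiling uses plain s3:// access instead.
df = spark.read.parquet("s3a://my-bucket/path/to/file.parquet")
df.describe().show()
```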
@@ -94,7 +94,7 @@ Note that a `.` is used to denote nested fields in the YAML recipe.
| Field | Required | Default | Description |
| ---------------------------------------------------- | ------------------------ | ------------ | -------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
| `env` | | `PROD` | Environment to use in namespace when constructing URNs. |
| `platform` | | | Platform to use in namespace when constructing URNs. |
| `platform` | | Autodetected | Platform to use in namespace when constructing URNs. If left blank, local paths will correspond to `file` and S3 paths will correspond to `s3`. |
| `base_path` | ✅ | | Path of the base folder to crawl. Unless `schema_patterns` and `profile_patterns` are set, the connector will ingest all files in this folder. |
| `path_spec` | | | Format string for constructing table identifiers from the relative path. See the above [setup section](#setup) for details. |
| `use_relative_path` | | `False` | Whether to use the relative path when constructing URNs. Has no effect when a `path_spec` is provided. |
@@ -1,6 +1,7 @@
import logging
import os
from datetime import datetime
from enum import Enum
from math import log10
from typing import Any, Dict, Iterable, List, Optional

@@ -246,7 +247,10 @@ def create(cls, config_dict, ctx):

return cls(config, ctx)

def read_file_spark(self, file: str) -> Optional[DataFrame]:
def read_file_spark(self, file: str, is_aws: bool) -> Optional[DataFrame]:

if is_aws:
file = f"s3a://{file}"
Collaborator @jjoyce0510 commented on Mar 2, 2022: typo: s3://{file}

Collaborator: or is the "a" intentional?

Contributor (author): Not a typo: Spark uses the s3a protocol rather than s3 (see https://spark.apache.org/docs/latest/cloud-integration.html).

Collaborator: Okay, got it. Thanks!
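To make the discussion concrete, below is a hypothetical, stripped-down sketch of the pattern this diff introduces (prefix s3a:// for AWS paths, then choose a Spark reader by file extension). It is not the actual DataHub implementation; the function name and supported extensions are illustrative:

```python
# Sketch of the dispatch pattern shown in this diff (assumed, simplified).
import os
from typing import Optional

from pyspark.sql import DataFrame, SparkSession


def read_file(spark: SparkSession, file: str, is_aws: bool) -> Optional[DataFrame]:
    if is_aws:
        file = f"s3a://{file}"  # Spark/Hadoop speak s3a, not s3
    extension = os.path.splitext(file)[1]
    if extension == ".parquet":
        return spark.read.parquet(file)
    if extension == ".csv":
        return spark.read.csv(file, header=True, inferSchema=True)
    if extension == ".json":
        return spark.read.json(file)
    return None  # unsupported format, skip profiling
```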


extension = os.path.splitext(file)[1]

@@ -297,7 +301,7 @@ def read_file_spark(self, file: str) -> Optional[DataFrame]:
return df.toDF(*(c.replace(".", "_") for c in df.columns))

def get_table_schema(
self, file_path: str, table_name: str
self, file_path: str, table_name: str, is_aws: bool
) -> Iterable[MetadataWorkUnit]:

data_platform_urn = make_data_platform_urn(self.source_config.platform)
@@ -307,10 +311,6 @@ def get_table_schema(

dataset_name = os.path.basename(file_path)

# if no path spec is provided and the file is in S3, then use the S3 path to construct an URN
if is_s3_uri(file_path) and self.source_config.path_spec is None:
dataset_urn = make_s3_urn(file_path, self.source_config.env)

dataset_snapshot = DatasetSnapshot(
urn=dataset_urn,
aspects=[],
@@ -322,13 +322,15 @@
)
dataset_snapshot.aspects.append(dataset_properties)

if file_path.startswith("s3a://"):
if is_aws:
if self.source_config.aws_config is None:
raise ValueError("AWS config is required for S3 file sources")

s3_client = self.source_config.aws_config.get_s3_client()

file = smart_open(file_path, "rb", transport_params={"client": s3_client})
file = smart_open(
f"s3://{file_path}", "rb", transport_params={"client": s3_client}
)

else:

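For reference, a minimal standalone sketch of the smart_open call pattern used above, with a hypothetical bucket and key. It assumes a smart_open version where the boto3 client is passed via `transport_params`, as in this diff:

```python
# Sketch (assumed names): reading an S3 object via smart_open with an explicit
# boto3 client, matching the f"s3://{file_path}" form used in this diff.
import boto3
from smart_open import open as smart_open

s3_client = boto3.client("s3")

with smart_open(
    "s3://my-bucket/data/users.parquet",  # hypothetical object
    "rb",
    transport_params={"client": s3_client},
) as f:
    magic = f.read(4)  # e.g. b"PAR1" for a parquet file
```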
@@ -416,7 +418,7 @@ def warn():
return ".".join(name_components)

def ingest_table(
self, full_path: str, relative_path: str
self, full_path: str, relative_path: str, is_aws: bool
) -> Iterable[MetadataWorkUnit]:

table_name = self.get_table_name(relative_path, full_path)
@@ -425,14 +427,14 @@
logger.debug(
f"Ingesting {full_path}: making table schemas {datetime.now().strftime('%d/%m/%Y %H:%M:%S')}"
)
yield from self.get_table_schema(full_path, table_name)
yield from self.get_table_schema(full_path, table_name, is_aws)

# If profiling is not enabled, skip the rest
if not self.source_config.profiling.enabled:
return

# read in the whole table with Spark for profiling
table = self.read_file_spark(full_path)
table = self.read_file_spark(full_path, is_aws)

# if table is not readable, skip
if table is None:
@@ -513,7 +515,7 @@ def get_workunits_s3(self) -> Iterable[MetadataWorkUnit]:
s3 = self.source_config.aws_config.get_s3_resource()
bucket = s3.Bucket(plain_base_path.split("/")[0])

unordered_files = []
base_obj_paths = []

for obj in bucket.objects.filter(
Prefix=plain_base_path.split("/", maxsplit=1)[1]
@@ -534,16 +536,16 @@ def get_workunits_local(self) -> Iterable[MetadataWorkUnit]:
if self.source_config.ignore_dotfiles and file.startswith("."):
continue

obj_path = f"s3a://{obj.bucket_name}/{obj.key}"
base_obj_path = f"{obj.bucket_name}/{obj.key}"

unordered_files.append(obj_path)
base_obj_paths.append(base_obj_path)

for aws_file in sorted(unordered_files):
for aws_file in sorted(base_obj_paths):

relative_path = "./" + aws_file[len(f"s3a://{plain_base_path}") :]
relative_path = "./" + aws_file[len(plain_base_path) :]

# pass in the same relative_path as the full_path for S3 files
yield from self.ingest_table(aws_file, relative_path)
yield from self.ingest_table(aws_file, relative_path, is_aws=True)
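As a standalone illustration of the refactor above (keep scheme-less "bucket/key" paths so that s3:// can be prepended for smart_open and s3a:// for Spark), here is a hedged sketch with hypothetical bucket and prefix names:

```python
# Sketch under assumptions: list objects under a prefix with boto3 and keep
# "bucket/key" paths without a scheme, as base_obj_paths does above.
import boto3

base_path = "my-bucket/raw/events"  # hypothetical base_path, no s3:// scheme
bucket_name, _, prefix = base_path.partition("/")

s3 = boto3.resource("s3")
base_obj_paths = [
    f"{obj.bucket_name}/{obj.key}"
    for obj in s3.Bucket(bucket_name).objects.filter(Prefix=prefix)
]

for path in sorted(base_obj_paths):
    relative_path = "./" + path[len(base_path):]
    # smart_open would open f"s3://{path}"; Spark would read f"s3a://{path}"
    print(relative_path)
```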

def get_workunits_local(self) -> Iterable[MetadataWorkUnit]:
for root, dirs, files in os.walk(self.source_config.base_path):
@@ -562,7 +564,7 @@ def get_workunits_local(self) -> Iterable[MetadataWorkUnit]:
if not self.source_config.schema_patterns.allowed(full_path):
continue

yield from self.ingest_table(full_path, relative_path)
yield from self.ingest_table(full_path, relative_path, is_aws=False)

def get_workunits(self) -> Iterable[MetadataWorkUnit]:

@@ -6,14 +6,15 @@
from datahub.configuration.common import AllowDenyPattern, ConfigModel
from datahub.emitter.mce_builder import DEFAULT_ENV
from datahub.ingestion.source.aws.aws_common import AwsSourceConfig
from datahub.ingestion.source.aws.s3_util import is_s3_uri
from datahub.ingestion.source.data_lake.profiling import DataLakeProfilerConfig


class DataLakeSourceConfig(ConfigModel):

env: str = DEFAULT_ENV
platform: str
base_path: str
platform: str = "" # overwritten by validator below

use_relative_path: bool = False
ignore_dotfiles: bool = True
@@ -40,6 +41,15 @@ def ensure_profiling_pattern_is_passed_to_profiling(
profiling.allow_deny_patterns = values["profile_patterns"]
return values

@pydantic.validator("platform", always=True)
def validate_platform(cls, value: str, values: Dict[str, Any]) -> Optional[str]:
Collaborator: Where is "values" coming from here?

Collaborator: Is this the rest of the config values?

Contributor (author): values are the previously validated fields, since this is a Pydantic validator; see https://pydantic-docs.helpmanual.io/usage/validators/
if value != "":
return value

if is_s3_uri(values["base_path"]):
return "s3"
return "file"
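A minimal self-contained sketch (not the DataHub config class) of how a pydantic v1-style validator receives previously validated fields through `values`, which is what the new platform default relies on. Class and field names here are illustrative:

```python
# Standalone sketch, assuming pydantic v1-style validators as used in this repo.
from typing import Any, Dict

import pydantic


class ExampleConfig(pydantic.BaseModel):
    base_path: str      # declared before `platform`, so available in `values`
    platform: str = ""  # filled in by the validator below

    @pydantic.validator("platform", always=True)
    def default_platform(cls, value: str, values: Dict[str, Any]) -> str:
        if value:
            return value
        # `values` holds the fields that were declared and validated earlier
        return "s3" if values["base_path"].startswith("s3://") else "file"


print(ExampleConfig(base_path="s3://bucket/prefix").platform)  # -> "s3"
print(ExampleConfig(base_path="/tmp/data").platform)           # -> "file"
```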

@pydantic.validator("path_spec", always=True)
def validate_path_spec(
cls, value: Optional[str], values: Dict[str, Any]