Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(ingest/glue): Add additional checks and logging when specifying catalog_id #12168

Merged
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 11 additions & 3 deletions metadata-ingestion/src/datahub/ingestion/source/aws/glue.py
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,6 @@

logger = logging.getLogger(__name__)


DEFAULT_PLATFORM = "glue"
VALID_PLATFORMS = [DEFAULT_PLATFORM, "athena"]

Expand Down Expand Up @@ -668,6 +667,7 @@ def get_datajob_wu(self, node: Dict[str, Any], job_name: str) -> MetadataWorkUni
return MetadataWorkUnit(id=f'{job_name}-{node["Id"]}', mce=mce)

def get_all_databases(self) -> Iterable[Mapping[str, Any]]:
logger.debug("Getting all databases")
# see https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/glue/paginator/GetDatabases.html
paginator = self.glue_client.get_paginator("get_databases")

Expand All @@ -684,10 +684,18 @@ def get_all_databases(self) -> Iterable[Mapping[str, Any]]:
pattern += "[?!TargetDatabase]"

for database in paginator_response.search(pattern):
if self.source_config.database_pattern.allowed(database["Name"]):
yield database
if not self.source_config.database_pattern.allowed(database["Name"]):
continue
if (
self.source_config.catalog_id
and database.get("CatalogId")
and database.get("CatalogId") != self.source_config.catalog_id
):
continue
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If this issue has occurred in the past, we might consider adding a log message here, e.g. "skipping database because of non-matching catalog id".

yield database

def get_tables_from_database(self, database: Mapping[str, Any]) -> Iterable[Dict]:
logger.debug(f"Getting tables from database {database['Name']}")
# see https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/glue/paginator/GetTables.html
paginator = self.glue_client.get_paginator("get_tables")
database_name = database["Name"]
Expand Down
2 changes: 1 addition & 1 deletion metadata-ingestion/tests/unit/glue/glue_mces_golden.json
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@
"CreateTime": "June 01, 2021 at 14:55:13"
},
"name": "empty-database",
"qualifiedName": "arn:aws:glue:us-west-2:123412341234:database/empty-database",
"qualifiedName": "arn:aws:glue:us-west-2:000000000000:database/empty-database",
"env": "PROD"
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@
"CreateTime": "June 01, 2021 at 14:55:13"
},
"name": "empty-database",
"qualifiedName": "arn:aws:glue:us-west-2:123412341234:database/empty-database",
"qualifiedName": "arn:aws:glue:us-west-2:000000000000:database/empty-database",
"env": "PROD"
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@
"CreateTime": "June 01, 2021 at 14:55:13"
},
"name": "empty-database",
"qualifiedName": "arn:aws:glue:us-west-2:123412341234:database/empty-database",
"qualifiedName": "arn:aws:glue:us-west-2:000000000000:database/empty-database",
"env": "PROD"
}
}
Expand Down
36 changes: 32 additions & 4 deletions metadata-ingestion/tests/unit/glue/test_glue_source.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,8 @@
validate_all_providers_have_committed_successfully,
)
from tests.unit.glue.test_glue_source_stubs import (
databases_1,
databases_2,
empty_database,
flights_database,
get_bucket_tagging,
get_databases_delta_response,
get_databases_response,
Expand Down Expand Up @@ -64,6 +64,7 @@
tables_2,
tables_profiling_1,
target_database_tables,
test_database,
)

FROZEN_TIME = "2020-04-14 07:00:00"
Expand Down Expand Up @@ -310,6 +311,33 @@ def test_config_without_platform():
assert source.platform == "glue"


def test_get_databases_filters_by_catalog():
    """Check that ``get_all_databases`` honours the configured ``catalog_id``.

    With no ``catalog_id`` configured, every database in the stubbed
    ``get_databases`` response is expected back. With ``catalog_id`` set,
    only the databases whose ``CatalogId`` matches are expected.
    """

    def names_of(entries):
        # Compare by name only; order of the yielded databases is irrelevant.
        return {entry["Name"] for entry in entries}

    # Case 1: no catalog configured -> nothing is filtered out.
    unfiltered_source: GlueSource = GlueSource(
        config=GlueSourceConfig(), ctx=PipelineContext(run_id="glue-source-test")
    )
    with Stubber(unfiltered_source.glue_client) as stub:
        stub.add_response("get_databases", get_databases_response, {})

        # The generator must be consumed while the stub is still active.
        wanted = names_of([flights_database, test_database, empty_database])
        assert names_of(unfiltered_source.get_all_databases()) == wanted

    # Case 2: a catalog is configured -> databases from other catalogs are dropped.
    catalog_id = "123412341234"
    filtered_source = GlueSource(
        config=GlueSourceConfig(catalog_id=catalog_id),
        ctx=PipelineContext(run_id="glue-source-test"),
    )
    with Stubber(filtered_source.glue_client) as stub:
        stub.add_response(
            "get_databases", get_databases_response, {"CatalogId": catalog_id}
        )

        wanted = names_of([flights_database, test_database])
        assert names_of(filtered_source.get_all_databases()) == wanted


@freeze_time(FROZEN_TIME)
def test_glue_stateful(pytestconfig, tmp_path, mock_time, mock_datahub_graph):
deleted_actor_golden_mcs = "{}/glue_deleted_actor_mces_golden.json".format(
Expand Down Expand Up @@ -357,8 +385,8 @@ def test_glue_stateful(pytestconfig, tmp_path, mock_time, mock_datahub_graph):
tables_on_first_call = tables_1
tables_on_second_call = tables_2
mock_get_all_databases_and_tables.side_effect = [
(databases_1, tables_on_first_call),
(databases_2, tables_on_second_call),
([flights_database], tables_on_first_call),
([test_database], tables_on_second_call),
]

pipeline_run1 = run_and_get_pipeline(pipeline_config_dict)
Expand Down
8 changes: 5 additions & 3 deletions metadata-ingestion/tests/unit/glue/test_glue_source_stubs.py
Original file line number Diff line number Diff line change
Expand Up @@ -88,12 +88,14 @@
"Permissions": ["ALL"],
}
],
"CatalogId": "123412341234",
"CatalogId": "000000000000",
},
]
}
databases_1 = [{"Name": "flights-database", "CatalogId": "123412341234"}]
databases_2 = [{"Name": "test-database", "CatalogId": "123412341234"}]
flights_database = {"Name": "flights-database", "CatalogId": "123412341234"}
test_database = {"Name": "test-database", "CatalogId": "123412341234"}
empty_database = {"Name": "empty-database", "CatalogId": "000000000000"}

tables_1 = [
{
"Name": "avro",
Expand Down
Loading