diff --git a/metadata-ingestion/src/datahub/ingestion/source/dbt/dbt_core.py b/metadata-ingestion/src/datahub/ingestion/source/dbt/dbt_core.py index c885ee6525b086..89f562fdc71a10 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/dbt/dbt_core.py +++ b/metadata-ingestion/src/datahub/ingestion/source/dbt/dbt_core.py @@ -1,7 +1,7 @@ import json import logging import re -from datetime import datetime, timezone +from datetime import datetime from typing import Any, Dict, List, Optional, Tuple from urllib.parse import urlparse @@ -208,7 +208,7 @@ def extract_dbt_entities( max_loaded_at_str = sources_by_id.get(key, {}).get("max_loaded_at") max_loaded_at = None if max_loaded_at_str: - max_loaded_at = dateutil.parser.parse(max_loaded_at_str) + max_loaded_at = parse_dbt_timestamp(max_loaded_at_str) test_info = None if manifest_node.get("resource_type") == "test": @@ -284,10 +284,8 @@ def extract_dbt_entities( return dbt_entities -def _parse_dbt_timestamp(timestamp: str) -> datetime: - return datetime.strptime(timestamp, "%Y-%m-%dT%H:%M:%S.%fZ").replace( - tzinfo=timezone.utc - ) +def parse_dbt_timestamp(timestamp: str) -> datetime: + return dateutil.parser.parse(timestamp) class DBTRunTiming(BaseModel): @@ -338,11 +336,9 @@ def _parse_test_result( execution_timestamp = run_result.timing_map.get("execute") if execution_timestamp and execution_timestamp.started_at: - execution_timestamp_parsed = _parse_dbt_timestamp( - execution_timestamp.started_at - ) + execution_timestamp_parsed = parse_dbt_timestamp(execution_timestamp.started_at) else: - execution_timestamp_parsed = _parse_dbt_timestamp(dbt_metadata.generated_at) + execution_timestamp_parsed = parse_dbt_timestamp(dbt_metadata.generated_at) return DBTTestResult( invocation_id=dbt_metadata.invocation_id, @@ -369,8 +365,8 @@ def _parse_model_run( return DBTModelPerformance( run_id=dbt_metadata.invocation_id, status=status, - start_time=_parse_dbt_timestamp(execution_timestamp.started_at), - end_time=_parse_dbt_timestamp(execution_timestamp.completed_at), + start_time=parse_dbt_timestamp(execution_timestamp.started_at), + end_time=parse_dbt_timestamp(execution_timestamp.completed_at), ) diff --git a/metadata-ingestion/tests/unit/test_dbt_source.py b/metadata-ingestion/tests/unit/test_dbt_source.py index b0db18594f76d6..0206f2e280ef2d 100644 --- a/metadata-ingestion/tests/unit/test_dbt_source.py +++ b/metadata-ingestion/tests/unit/test_dbt_source.py @@ -1,3 +1,4 @@ +from datetime import timedelta from typing import Dict, List, Union from unittest import mock @@ -7,7 +8,11 @@ from datahub.emitter import mce_builder from datahub.ingestion.api.common import PipelineContext from datahub.ingestion.source.dbt.dbt_cloud import DBTCloudConfig -from datahub.ingestion.source.dbt.dbt_core import DBTCoreConfig, DBTCoreSource +from datahub.ingestion.source.dbt.dbt_core import ( + DBTCoreConfig, + DBTCoreSource, + parse_dbt_timestamp, +) from datahub.metadata.schema_classes import ( OwnerClass, OwnershipSourceClass, @@ -365,3 +370,21 @@ def test_dbt_cloud_config_with_defined_metadata_endpoint(): config.metadata_endpoint == "https://my-metadata-endpoint.my-dbt-cloud.dbt.com/graphql" ) + + +def test_dbt_time_parsing() -> None: + time_formats = [ + "2024-03-28T05:56:15.236210Z", + "2024-04-04T11:55:28Z", + "2024-04-04T12:55:28Z", + "2024-03-25T00:52:14Z", + ] + + for time_format in time_formats: + # Check that it parses without an error. + timestamp = parse_dbt_timestamp(time_format) + + # Ensure that we get an object with tzinfo set to UTC. + assert timestamp.tzinfo is not None and timestamp.tzinfo.utcoffset( + timestamp + ) == timedelta(0)