Skip to content

Commit

Permalink
fix(ingest/dbt): better dbt timestamp parsing (#10223)
Browse files Browse the repository at this point in the history
  • Loading branch information
hsheth2 authored Apr 5, 2024
1 parent c1b489f commit e743478
Show file tree
Hide file tree
Showing 2 changed files with 32 additions and 13 deletions.
20 changes: 8 additions & 12 deletions metadata-ingestion/src/datahub/ingestion/source/dbt/dbt_core.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import json
import logging
import re
from datetime import datetime, timezone
from datetime import datetime
from typing import Any, Dict, List, Optional, Tuple
from urllib.parse import urlparse

Expand Down Expand Up @@ -208,7 +208,7 @@ def extract_dbt_entities(
max_loaded_at_str = sources_by_id.get(key, {}).get("max_loaded_at")
max_loaded_at = None
if max_loaded_at_str:
max_loaded_at = dateutil.parser.parse(max_loaded_at_str)
max_loaded_at = parse_dbt_timestamp(max_loaded_at_str)

test_info = None
if manifest_node.get("resource_type") == "test":
Expand Down Expand Up @@ -284,10 +284,8 @@ def extract_dbt_entities(
return dbt_entities


def _parse_dbt_timestamp(timestamp: str) -> datetime:
return datetime.strptime(timestamp, "%Y-%m-%dT%H:%M:%S.%fZ").replace(
tzinfo=timezone.utc
)
def parse_dbt_timestamp(timestamp: str) -> datetime:
    """Parse a dbt timestamp string by delegating to ``dateutil.parser.parse``.

    Args:
        timestamp: The timestamp string to parse.

    Returns:
        The datetime produced by ``dateutil.parser.parse``.
    """
    # NOTE(review): no timezone is forced here, so awareness of the result
    # presumably depends on what the input string carries; confirm.
    parsed = dateutil.parser.parse(timestamp)
    return parsed


class DBTRunTiming(BaseModel):
Expand Down Expand Up @@ -338,11 +336,9 @@ def _parse_test_result(

execution_timestamp = run_result.timing_map.get("execute")
if execution_timestamp and execution_timestamp.started_at:
execution_timestamp_parsed = _parse_dbt_timestamp(
execution_timestamp.started_at
)
execution_timestamp_parsed = parse_dbt_timestamp(execution_timestamp.started_at)
else:
execution_timestamp_parsed = _parse_dbt_timestamp(dbt_metadata.generated_at)
execution_timestamp_parsed = parse_dbt_timestamp(dbt_metadata.generated_at)

return DBTTestResult(
invocation_id=dbt_metadata.invocation_id,
Expand All @@ -369,8 +365,8 @@ def _parse_model_run(
return DBTModelPerformance(
run_id=dbt_metadata.invocation_id,
status=status,
start_time=_parse_dbt_timestamp(execution_timestamp.started_at),
end_time=_parse_dbt_timestamp(execution_timestamp.completed_at),
start_time=parse_dbt_timestamp(execution_timestamp.started_at),
end_time=parse_dbt_timestamp(execution_timestamp.completed_at),
)


Expand Down
25 changes: 24 additions & 1 deletion metadata-ingestion/tests/unit/test_dbt_source.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
from datetime import timedelta
from typing import Dict, List, Union
from unittest import mock

Expand All @@ -7,7 +8,11 @@
from datahub.emitter import mce_builder
from datahub.ingestion.api.common import PipelineContext
from datahub.ingestion.source.dbt.dbt_cloud import DBTCloudConfig
from datahub.ingestion.source.dbt.dbt_core import DBTCoreConfig, DBTCoreSource
from datahub.ingestion.source.dbt.dbt_core import (
DBTCoreConfig,
DBTCoreSource,
parse_dbt_timestamp,
)
from datahub.metadata.schema_classes import (
OwnerClass,
OwnershipSourceClass,
Expand Down Expand Up @@ -365,3 +370,21 @@ def test_dbt_cloud_config_with_defined_metadata_endpoint():
config.metadata_endpoint
== "https://my-metadata-endpoint.my-dbt-cloud.dbt.com/graphql"
)


def test_dbt_time_parsing() -> None:
    """Check that sample dbt timestamps parse into zero-offset, tz-aware datetimes."""
    # Samples cover both the fractional-second and whole-second layouts.
    sample_timestamps = [
        "2024-03-28T05:56:15.236210Z",
        "2024-04-04T11:55:28Z",
        "2024-04-04T12:55:28Z",
        "2024-03-25T00:52:14Z",
    ]
    zero_offset = timedelta(0)

    for raw_value in sample_timestamps:
        # Parsing itself must not raise.
        parsed = parse_dbt_timestamp(raw_value)

        # The result must carry tzinfo, and that tzinfo must be at UTC offset 0.
        assert parsed.tzinfo is not None
        assert parsed.tzinfo.utcoffset(parsed) == zero_offset

0 comments on commit e743478

Please sign in to comment.