forked from datahub-project/datahub
-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
refactor(ingest/stateful): remove
IngestionJobStateProvider
(datahu…
- Loading branch information
1 parent
c214182
commit 0b6ff84
Showing
13 changed files
with
82 additions
and
159 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
79 changes: 44 additions & 35 deletions
79
metadata-ingestion/src/datahub/ingestion/api/ingestion_job_checkpointing_provider_base.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,64 +1,73 @@ | ||
from abc import abstractmethod | ||
from dataclasses import dataclass | ||
from typing import Any, Dict, List, Optional | ||
from typing import Any, Dict, List, NewType, Type, TypeVar | ||
|
||
from datahub.ingestion.api.committable import CommitPolicy | ||
import datahub.emitter.mce_builder as builder | ||
from datahub.configuration.common import ConfigModel | ||
from datahub.ingestion.api.committable import CommitPolicy, StatefulCommittable | ||
from datahub.ingestion.api.common import PipelineContext | ||
from datahub.ingestion.api.ingestion_job_state_provider import ( | ||
IngestionJobStateProvider, | ||
IngestionJobStateProviderConfig, | ||
JobId, | ||
JobStateFilterType, | ||
JobStateKey, | ||
JobStatesMap, | ||
) | ||
from datahub.metadata.schema_classes import DatahubIngestionCheckpointClass | ||
|
||
# | ||
# Common type exports | ||
# | ||
JobId = JobId | ||
JobStateKey = JobStateKey | ||
JobStateFilterType = JobStateFilterType | ||
|
||
# | ||
# Checkpoint state specific types | ||
# | ||
JobId = NewType("JobId", str) | ||
CheckpointJobStateType = DatahubIngestionCheckpointClass | ||
CheckpointJobStatesMap = JobStatesMap[CheckpointJobStateType] | ||
CheckpointJobStatesMap = Dict[JobId, CheckpointJobStateType] | ||
|
||
|
||
@dataclass | ||
class JobStateKey: | ||
pipeline_name: str | ||
platform_instance_id: str | ||
job_names: List[JobId] | ||
|
||
class IngestionCheckpointingProviderConfig(IngestionJobStateProviderConfig): | ||
|
||
class IngestionCheckpointingProviderConfig(ConfigModel): | ||
pass | ||
|
||
|
||
_Self = TypeVar("_Self", bound="IngestionCheckpointingProviderBase") | ||
|
||
|
||
@dataclass() | ||
class IngestionCheckpointingProviderBase( | ||
IngestionJobStateProvider[CheckpointJobStateType] | ||
StatefulCommittable[JobStateKey, CheckpointJobStatesMap] | ||
): | ||
""" | ||
The base class(non-abstract) for all checkpointing state provider implementations. | ||
This class is implemented this way as a concrete class is needed to work with the registry, | ||
but we don't want to implement any of the functionality yet. | ||
The base class for all checkpointing state provider implementations. | ||
""" | ||
|
||
def __init__( | ||
self, name: str, commit_policy: CommitPolicy = CommitPolicy.ON_NO_ERRORS | ||
): | ||
super(IngestionCheckpointingProviderBase, self).__init__(name, commit_policy) | ||
# Set the initial state to an empty dict. | ||
super().__init__(name, commit_policy, {}) | ||
|
||
@classmethod | ||
def create( | ||
cls, config_dict: Dict[str, Any], ctx: PipelineContext, name: str | ||
) -> "IngestionJobStateProvider": | ||
cls: Type[_Self], config_dict: Dict[str, Any], ctx: PipelineContext, name: str | ||
) -> "_Self": | ||
raise NotImplementedError("Sub-classes must override this method.") | ||
|
||
def get_previous_states( | ||
@abstractmethod | ||
def get_last_state( | ||
self, | ||
state_key: JobStateKey, | ||
last_only: bool = True, | ||
filter_opt: Optional[JobStateFilterType] = None, | ||
) -> List[CheckpointJobStatesMap]: | ||
raise NotImplementedError("Sub-classes must override this method.") | ||
) -> CheckpointJobStatesMap: | ||
... | ||
|
||
@abstractmethod | ||
def commit(self) -> None: | ||
raise NotImplementedError("Sub-classes must override this method.") | ||
... | ||
|
||
@staticmethod | ||
def get_data_job_urn( | ||
orchestrator: str, | ||
pipeline_name: str, | ||
job_name: JobId, | ||
platform_instance_id: str, | ||
) -> str: | ||
""" | ||
Standardizes datajob urn minting for all ingestion job state providers. | ||
""" | ||
return builder.make_data_job_urn( | ||
orchestrator, f"{pipeline_name}_{platform_instance_id}", job_name | ||
) |
68 changes: 0 additions & 68 deletions
68
metadata-ingestion/src/datahub/ingestion/api/ingestion_job_state_provider.py
This file was deleted.
Oops, something went wrong.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,7 +1,15 @@ | ||
from _pytest.config import Config as PytestConfig # noqa: F401 | ||
from typing import Optional, TypeVar | ||
|
||
# The current PytestConfig solution is somewhat ugly and not ideal. | ||
# However, it is currently the best solution available, as the type itself is not | ||
# exported: https://docs.pytest.org/en/stable/reference.html#config. | ||
# As pytest's type support improves, this will likely change. | ||
# TODO: revisit pytestconfig as https://github.com/pytest-dev/pytest/issues/7469 progresses. | ||
from _pytest.config import Config as PytestConfig # noqa: F401 | ||
|
||
_T = TypeVar("_T") | ||
|
||
|
||
def assert_not_null(value: Optional[_T]) -> _T: | ||
assert value is not None, "value is unexpectedly None" | ||
return value |
Oops, something went wrong.