Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(ingest): Do not require platform_instance for stateful ingestion #7397

Merged

Conversation

asikowitz
Copy link
Collaborator

@asikowitz asikowitz commented Feb 21, 2023

Following the changes from #6795, I believe that get_platform_instance_id can now return None. See its only usage:

In StatefulIngestionSourceBase._get_last_checkpoint

                try:
                    platform_instance_id = self.get_platform_instance_id()
                except NotImplementedError:
                    pass
                else:
                    last_checkpoint_aspect = self.ingestion_checkpointing_state_provider.get_latest_checkpoint(  # type: ignore
                        pipeline_name=self.ctx.pipeline_name,
                        job_name=job_id,
                        platform_instance_id=platform_instance_id,
                    )

Note that in that PR, get_latest_checkpoint was configured to accept an optional platform_instance_id (and also note that in the not implemented case, None is implicitly passed?)

Since we still want to keep implementations of get_platform_instance_id for backwards compatibility, I allow it to return None for when you sometimes have a platform_instance, and other times don't, like with Kafka.
Also removes any validation around null platform_instance that I could find.

EDIT: Tested the following recipe

source:
  type: kafka
  config:
    connection: 
      bootstrap: localhost:9092
      schema_registry_url: http://localhost:8081/

Checklist

  • The PR conforms to DataHub's Contributing Guideline (particularly Commit Message Format)
  • Links to related issues (if applicable)
  • Tests for the changes have been added/updated (if applicable)
  • Docs related to the changes have been added/updated (if applicable). If a new feature has been added a Usage Guide has been added for the same.
  • For any breaking change/potential downtime/deprecation/big changes an entry has been made in Updating DataHub

@github-actions github-actions bot added the ingestion PR or Issue related to the ingestion of metadata label Feb 21, 2023
@@ -156,7 +156,7 @@ def create(cls, config_dict, ctx):
config = UnityCatalogSourceConfig.parse_obj(config_dict)
return cls(ctx=ctx, config=config)

def get_platform_instance_id(self) -> str:
def get_platform_instance_id(self) -> Optional[str]:
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These changes aren't strictly necessary, since mypy allows an override method to narrow the return type

@asikowitz asikowitz merged commit 2764c44 into datahub-project:master Feb 22, 2023
@asikowitz asikowitz deleted the platform-instance-optional branch February 22, 2023 02:27
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
ingestion PR or Issue related to the ingestion of metadata
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants