Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(deletes): add run commands (list, show, rollback) to datahub ingest #2960

Merged
merged 59 commits into from
Jul 30, 2021

Conversation

gabe-lyons
Copy link
Contributor

Adds three new commands to datahub ingest:

datahub ingest list-runs - lists recent runs ingested to datahub
datahub ingest show-run - describes an individual ingestion run
datahub ingest rollback-run - reverts the changes made during an ingestion run

Checklist

  • The PR conforms to DataHub's Contributing Guideline (particularly Commit Message Format)
  • Links to related issues (if applicable)
  • Tests for the changes have been added/updated (if applicable)
  • Docs related to the changes have been added/updated (if applicable)

@@ -0,0 +1,1974 @@
[
{
"auditHeader": null,
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What's the difference between this and the original bootstrap file?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This was for testing-- will remove.

click.echo(f"Received error, please check your {CONDENSED_DATAHUB_CONFIG_PATH}")
click.echo()
click.echo(response_json)
exit()
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this code is awfully repetitive

gms_config = config.get("gms")
if not isinstance(gms_config, dict) or gms_config.get("server") is None:
print_datahub_env_format_guide()
exit()
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should use pydantic for all of this validation logic

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Cool- will look into that!

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

added 👍

from datahub.metadata.schema_classes import UsageAggregationClass


def current_milli_time():
return round(t.time() * 1000)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

mce_builder.py get_sys_time does essentially the same thing - could we just use that?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

updated!


system_metadata = SystemMetadata(
lastObserved=current_milli_time(), runId=self.ctx.run_id
)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

note: the extractor runs before the transformers, and so any MCEs created by a transformer won't get the system metadata :(

This isn't an immediate issue since our current transformers only modify MCEs and don't create any, but it will definitely come up.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok- good tip. Any suggestions on how to best circumvent this?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We'll need to split the extractor up into (1) extracting RecordEnvelopes from work units and (2) running validation + checks. (2) can probably be implemented as a transformer that always runs

lastObserved=current_milli_time(), runId=self.ctx.run_id
)

workunit.mce.systemMetadata = system_metadata
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

also note: the run_id config param of the pipeline will eventually need to be changed to run_id_prefix



@freeze_time(FROZEN_TIME)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

im surprised that this one needed a freeze_time given that it already had mock_time

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Me too- this was definitely necessary though.

builder.name("ingest");
builder.addParam(entityFieldDef, entity);
SystemMetadata generatedSystemMetadata = new SystemMetadata();
generatedSystemMetadata.setLastObserved(System.currentTimeMillis());
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

just curious - why isn't GMS itself adding this?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(Seems like clients could mess this up if they all need to remember to set it)


if (systemMetadata == null) {
SystemMetadata generatedSystemMetadata = new SystemMetadata();
generatedSystemMetadata.setLastObserved(System.currentTimeMillis());
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh got it - so client can optionally overwrite this. It still may make sense to keep last observed consistent by overriding this on the GMS side.. thoughts?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

good call- i already have logic to handle system metadata not being set in gms. Updating so the client sends a null system metadata if it doesn't receive one.

return null;
});
}

@Action(name = ACTION_BATCH_INGEST)
@Nonnull
public Task<Void> batchIngest(@ActionParam(PARAM_ENTITIES) @Nonnull Entity[] entities) throws URISyntaxException {
public Task<Void> batchIngest(
@ActionParam(PARAM_ENTITIES) @Nonnull Entity[] entities,
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we validate that the list of entities is the same size as the list of system metadata?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

will add 👍

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

added

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what if systemMetadataList is null?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good call- I'll generate system metadata if not provided

@@ -198,6 +219,30 @@
return RestliUtils.toTask(() -> new StringArray(_searchService.getBrowsePaths(urnToEntityName(urn), urn)));
}

/*
Used to enable writes in GMS after data migration is complete
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Misplaced comment

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

good catch

return RestliUtils.toTask(() -> {
RollbackResponse response = new RollbackResponse();

EbeanEntityService.RollbackRunResult result = _entityService.rollbackUrn(urn);
Copy link
Collaborator

@jjoyce0510 jjoyce0510 Jul 28, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this just hard deleting this urn? If so, why not call it "deleteEntity"

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ah I like that, will update


response.setAspectsAffected(deletedRows.size());
response.setEntitiesAffected(deletedRows.stream().filter(row -> row.isKeyAspect()).count());
response.setAspectRowSummaries(
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is nice!

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

thanks!

datahub delete --urn "<my urn>"
```

_Note: make sure you surround your urn with quotes! If you do not include the quotes, your terminal my misinterpret the command._
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: typo s/my/may

Copy link
Contributor

@shirshanka shirshanka left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

@shirshanka shirshanka merged commit aa253f5 into datahub-project:master Jul 30, 2021
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

5 participants