
[GOBBLIN-1707] Enhance IcebergDataset to detect when files already at dest then proceed with only delta #3575

Merged

Conversation

phet
Contributor

@phet phet commented Oct 1, 2022

Dear Gobblin maintainers,

Please accept this PR. I understand that it will not be reviewed until I have checked off all the steps below!

JIRA

Description

  • Here are some details about my PR, including screenshots (if applicable):

Enhance IcebergDataset to detect when files already at dest then proceed with only delta

Tests

  • My PR adds the following unit tests OR does not need testing for this extremely good reason:

unit tests included

Commits

  • My commits all reference JIRA issues in their subject lines, and I have squashed multiple commits if they address the same issue. In addition, my commits follow the guidelines from "How to write a good git commit message":
    1. Subject is separated from body by a blank line
    2. Subject is limited to 50 characters
    3. Subject does not end with a period
    4. Subject uses the imperative mood ("add", not "adding")
    5. Body wraps at 72 characters
    6. Body explains "what" and "why", not "how"

* upstream/master: (124 commits)
  [GOBBLIN-1699] Log progress of reducer task for visibility with slow compaction jobs apache#3552
  fix helix job wait completion bug when job goes to STOPPING state (apache#3556)
  [GOBBLIN-1695] Fix: Failure to add spec executors doesn't block deployment (apache#3551)
  [GOBBLIN-1701] Replace jcenter with either maven central or gradle plugin portal (apache#3554)
  [GOBBLIN-1700] Remove unused coveralls-gradle-plugin dependency
  add MysqlUserQuotaManager (apache#3545)
  [GOBBLIN-1689] Decouple compiler from scheduler in warm standby mode (apache#3544)
  Add GMCE topic explicitly to hive commit event (apache#3547)
  [GOBBLIN-1678] Refactor git flowgraph component to be extensible (apache#3536)
  [GOBBLIN-1690] Added logging to ORC writer
  Allow all iceberg exceptions to be fault tolerant (apache#3541)
  Guard against exists fs call as well (apache#3538)
  Add error handling for timeaware finder to handle scenarios where fil… (apache#3537)
  [GOBBLIN-1675] Add pagination for GaaS on server side (apache#3533)
  [GOBBLIN-1672] Refactor metrics from DagManager into its own class, add metrics per … (apache#3532)
  [GOBBLIN-1677] Fix timezone property to read from key correctly (apache#3535)
  [Gobblin-931] Fix typo in gobblin CLI usage (apache#3530)
  [GOBBLIN-1671] : Fix gobblin.sh script to add external jars as colon separated to HADOOP_CLASSPATH (apache#3531)
  [GOBBLIN-1656] Return a http status 503 on GaaS when quota is exceeded for user or flowgroup (apache#3516)
  [GOBBLIN-1669] Clean up TimeAwareRecursiveCopyableDataset to support seconds in time… (apache#3528)
  [GOBBLIN-1670] Remove rat tasks and unneeded checkstyles blocking build pipeline (apache#3529)
  [GOBBLIN-1668] Add audit counts for iceberg registration (apache#3527)
  [GOBBLIN-1667] Create new predicate - ExistingPartitionSkipPredicate (apache#3526)
  Calculate requested container count based on adding allocated count and outstanding ContainerRequests in Yarn (apache#3524)
  make the requestedContainerCountMap correctly update the container count (apache#3523)
  Fix running counts for retried flows (apache#3520)
  Allow table to flush after write failure (apache#3522)
  [GOBBLIN-1652]Add more log in the KafkaJobStatusMonitor in case it fails to process one GobblinTrackingEvent (apache#3513)
  Make Yarn container and helix instance allocation group by tag (apache#3519)
  [GOBBLIN-1657] Update completion watermark on change_property in IcebergMetadataWriter (apache#3517)
  [GOBBLIN-1654] Add capacity floor to avoid aggressively requesting resource and small files. (apache#3515)
  [GOBBLIN-1653] Shorten job name length if it exceeds 255 characters (apache#3514)
  [GOBBLIN-1650] Implement flowGroup quotas for the DagManager (apache#3511)
  [GOBBLIN-1648] Complete use of JDBC `DataSource` 'read-only' validation query by incorporating where previously omitted (apache#3509)
  Add config to set close timeout in HiveRegister (apache#3512)
  add an API in AbstractBaseKafkaConsumerClient to list selected topics (apache#3501)
  [GOBBLIN-1649] Revert gobblin-1633 (apache#3510)
  [GOBBLIN-1639] Prevent metrics reporting if configured, clean up workunit count metric (apache#3500)
  [GOBBLIN-1647] Add hive commit GTE to HiveMetadataWriter (apache#3508)
  [GOBBLIN-1633] Fix compaction actions on job failure not retried if compaction succeeds (apache#3494)
  [GOBBLIN-1646] Revert yarn container / helix tag group changes (apache#3507)
  [GOBBLIN-1641] Add meter for sla exceeded flows (apache#3502)
  GOBBLIN-1644 (apache#3506)
  [GOBBLIN-1645]Change the prefix of dagManager heartbeat to make it consistent with other metrics (apache#3505)
  Fix bug when shrinking the container in Yarn service (apache#3504)
  [GOBBLIN-1637] Add writer, operation, and partition info to failed metadata writer events (apache#3498)
  [GOBBLIN-1638] Fix unbalanced running count metrics due to Azkaban failures (apache#3499)
  [GOBBLIN-1634] Add retries on flow sla kills (apache#3495)
  [GOBBLIN-1620]Make yarn container allocation group by helix tag (apache#3487)
  [GOBBLIN-1636] Close DatasetCleaner after clean task (apache#3497)
  [GOBBLIN-1635] Avoid loading env configuration when using config store to improve the performance (apache#3496)
  use user supplied props to create FileSystem in DatasetCleanerTask (apache#3483)
  [GOBBLIN-1619] WriterUtils.mkdirsWithRecursivePermission contains race condition and puts unnecessary load on filesystem (apache#3477)
  use data node aliases to figure out data node names before using DMAS (apache#3493)
  [GOBBLIN-1630] Remove flow level metrics for adhoc flows (apache#3491)
  [GOBBLIN-1631]Emit heartbeat for dagManagerThread (apache#3492)
  [GOBBLIN-1624] Refactor quota management, fix various bugs in accounting of running … (apache#3481)
  [GOBBLIN-1613] Add metadata writers field to GMCE schema (apache#3490)
  Update README.md
  [GOBBLIN-1629] Make GobblinMCEWriter be able to catch error when calculating hive specs (apache#3489)
  Add/fix some fields of MetadataWriterFailureEvent (apache#3485)
  [GOBBLIN-1627] provide option to convert datanodes names (apache#3484)
  Add coverage for edge cases when table paths do not exist, check parents (apache#3482)
  [GOBBLIN-1616] Add close connection logic in salseforceSource (apache#3486)
  [GOBBLIN-1621] Make HelixRetriggeringJobCallable emit job skip event when job is dropped due to previous job is running (apache#3478)
  [GOBBLIN-1623] Fix NPE when try to close RestApiConnector (apache#3480)
  Clear bad mysql packages from cache in CI/CD machines (apache#3479)
  [GOBBLIN-1617] pass configurations to some HadoopUtils APIs (apache#3475)
  [GOBBLIN-1616] Make RestApiConnector be able to close the connection finally (apache#3474)
  add config to set log level for any class (apache#3473)
  Fix bug where partitioned tables would always return the wrong equality in paths (apache#3472)
  [GOBBLIN-1602] Change hive table location and partition check to validate using FS r… (apache#3459)
  Don't flush on change_property operation (apache#3467)
  Fix case where error GTE is incorrectly sent from MCE writer (apache#3466)
  partial rollback of PR 3464 (apache#3465)
  [GOBBLIN-1604] Throw exception if there are no allocated requests due to lack of res… (apache#3461)
  [GOBBLIN-1603] Throws error if configured when encountering an IO exception while co… (apache#3460)
  [GOBBLIN-1606] change DEFAULT_GOBBLIN_COPY_CHECK_FILESIZE value (apache#3464)
  Upgraded dropwizard metrics library version from 3.2.3 -> 4.1.2 and added a new wrapper class on dropwizard Timer.Context class to handle the code compatibility as the newer version of this class implements AutoClosable instead of Closable. (apache#3463)
  [GOBBLIN-1605] Fix mysql ubuntu download 404 not found for Github Actions CI/CD (apache#3462)
  [GOBBLIN-1601] implement ChangePermissionCommitStep (apache#3457)
  [GOBBLIN-1598]Fix metrics already exist issue in dag manager (apache#3454)
  [GOBBLIN-1597] Add error handling in dagmanager to continue if dag fails to process,… (apache#3452)
  GOBBLIN-1579 Fail job on hive existing target table location mismatch (apache#3433)
  [GOBBLIN-1596] Ignore already exists exception if the table has already been created… (apache#3451)
  [GOBBLIn-1595]Fix the dead lock during hive registration (apache#3450)
  Add guard in DagManager for improperly formed SLA (apache#3449)
  [GOBBLIN-1588] Send failure events for write failures when watermark is advanced in MCE writer (apache#3441)
  [GOBBLIN-1593] Fix bugs in dag manager about metric reporting and job status monitor (apache#3448)
  Fix bug in `JobSpecSerializer` of inadequately preventing access errors (within `MysqlJobCatalog`) (apache#3447)
  [GOBBLIN-1583] Add System level job start SLA (apache#3437)
  [GOBBLIN-1592] Make hive copy be able to apply filter on directory (apache#3446)
  [GOBBLIN-1585]GaaS (DagManager) keep retrying a failed job beyond max attempt number (apache#3439)
  [GOBBLIN-1590] Add low/high watermark information in event emitted by Gobblin cluster (apache#3443)
  [HotFix]Try to fix the mysql dependency issue in Github action (apache#3445)
  Lazily initialize FileContext and do not store a handle of it so it can be GC'ed when required (apache#3444)
  [GOBBLIN-1584] Add replace record logic for Mysql writer (apache#3438)
  Bump up code cov version (apache#3440)
  [GOBBLIN-1581] Iterate over Sql ResultSet in Only the Forward Direction (apache#3435)
  [GOBBLIN-1575] use reference count in helix manager, so that connect/disconnect are called once and at the right time (apache#3427)
  ...
* upstream/master:
  [GOBBLIN-1673][GOBBLIN-1683] Skeleton code for handling messages between task runner / application master for Dynamic work unit allocation (apache#3539)
* upstream/master:
  [GOBBLIN-1710]  Codecov should be optional in CI and not fail Github Actions (apache#3562)
  Define basics for collecting Iceberg metadata for the current snapshot (apache#3559)
  [GOBBLIN-1698] Fast fail during work unit generation based on config. (apache#3542)
* upstream/master:
  [GOBBLIN-1716] refactor HighLevelConsumer to make consumer initializa… (apache#3570)
  Correct semantics of `IcebergDatasetTest` and streamline both impl and test code (apache#3571)
  [GOBBLIN-1713] Add missing sql source validation (apache#3567)
@codecov-commenter

codecov-commenter commented Oct 1, 2022

Codecov Report

Merging #3575 (72b5662) into master (71da34b) will decrease coverage by 0.00%.
The diff coverage is 67.59%.

@@             Coverage Diff              @@
##             master    #3575      +/-   ##
============================================
- Coverage     46.90%   46.90%   -0.01%     
- Complexity    10610    10651      +41     
============================================
  Files          2111     2114       +3     
  Lines         82533    82893     +360     
  Branches       9178     9237      +59     
============================================
+ Hits          38714    38877     +163     
- Misses        40261    40450     +189     
- Partials       3558     3566       +8     
Impacted Files Coverage Δ
...obblin/util/function/CheckedExceptionFunction.java 0.00% <0.00%> (ø)
...bblin/util/measurement/GrowthMilestoneTracker.java 0.00% <0.00%> (ø)
...lin/data/management/copy/iceberg/IcebergTable.java 83.63% <70.00%> (-5.05%) ⬇️
...n/data/management/copy/iceberg/IcebergDataset.java 86.77% <94.02%> (+4.95%) ⬆️
...a/management/copy/iceberg/IcebergSnapshotInfo.java 89.47% <100.00%> (+1.23%) ⬆️
...bblin/service/monitoring/FsJobStatusRetriever.java 55.17% <100.00%> (ø)
...e/modules/orchestration/MysqlUserQuotaManager.java 47.34% <0.00%> (-26.53%) ⬇️
...a/org/apache/gobblin/util/limiter/NoopLimiter.java 40.00% <0.00%> (-20.00%) ⬇️
...lin/service/modules/flow/MultiHopFlowCompiler.java 50.28% <0.00%> (-18.58%) ⬇️
...lin/util/filesystem/FileSystemInstrumentation.java 85.71% <0.00%> (-14.29%) ⬇️
... and 30 more


Contributor

@ZihanLi58 ZihanLi58 left a comment


I have several questions here:

  1. From the algorithm, it seems that as long as there is one new snapshot generated on the source, we will go through all the data files available on the source to do the copy, even if only one new file was added. Not sure if I'm missing anything here?
  2. How do we plan to handle file deletion on the source, i.e., the expire-snapshot operation? If file A is deleted on the source (I mean deleted from HDFS), how do we plan to apply the same on the destination?

@phet
Contributor Author

phet commented Oct 7, 2022

  1. it seems that as long as there is one new snapshot generated on the source, we will go through all the data files available on the source to do the copy, even if only one new file was added.

actually we'll always go through the complete metadata on the source to list every file reachable from at least one snapshot (unless there isn't any 'new', unreplicated snapshot). the actual copy, however, happens only for files that are not present on the destination. further, we need not examine every file to determine whether it exists at dest: thanks to the immutability of iceberg files, we may short-circuit evaluation of an entire subtree of the iceberg metadata when its root (e.g. manifest list or manifest) is found to already exist at dest. to document the details, I've added the comment // ALGO: in IcebergDataset.getFilePathsToFileStatus()

edit: upon further consideration, I added top-level short-circuiting of any manifest scanning in the special case where the (root) metadata.json file is already found to be replicated to the destination (and changed the text above accordingly).
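[Editor's sketch] The subtree short-circuiting described above can be illustrated with a simplified, hypothetical model (this is not the actual Gobblin code: `Node` and `missingPaths` are stand-ins for the metadata.json → manifest-list → manifest → data-file hierarchy walked by `getFilePathsToFileStatus()`):

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Set;

public class ShortCircuitSketch {
  /** A node in the (simplified) Iceberg metadata tree: metadata.json at the
   *  root, then manifest lists, manifests, and data files as leaves. */
  static class Node {
    final String path;
    final List<Node> children;
    Node(String path, Node... children) {
      this.path = path;
      this.children = Arrays.asList(children);
    }
  }

  /** Collect paths not yet at dest. Since Iceberg files are immutable, a root
   *  already present at dest implies its whole subtree is present, so prune. */
  static List<String> missingPaths(Node root, Set<String> pathsAtDest) {
    List<String> missing = new ArrayList<>();
    if (pathsAtDest.contains(root.path)) {
      return missing; // short-circuit: skip the entire subtree
    }
    missing.add(root.path);
    for (Node child : root.children) {
      missing.addAll(missingPaths(child, pathsAtDest));
    }
    return missing;
  }
}
```

With pruning like this, a fully replicated snapshot costs a single existence check at its manifest list (or even just at metadata.json), rather than one check per data file.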

  2. How do we plan to handle file deletion on the source, i.e., the expire-snapshot operation?

good question! the answer is: distcp is not responsible. instead we expect reachability analysis and orphan-file deletion to happen elsewhere. a good candidate would be to encapsulate that within the destination catalog with which replication will eventually (forthcoming enhancement!) come to register the copied 'metadata.json' file. e.g. that catalog would hold the metadata version prior to the registration and could easily determine which snapshots 'expired' from the act of replacing the older metadata file with the newer one (just copied from the source)

@ZihanLi58 ZihanLi58 changed the title Enhance IcebergDataset to detect when files already at dest then proceed with only delta [GOBBLIN-1707] Enhance IcebergDataset to detect when files already at dest then proceed with only delta Oct 7, 2022
Contributor

@ZihanLi58 ZihanLi58 left a comment


Overall looks good to me, just minor comments.

// child of the current file's predecessor (which this new meta file now replaces).
if (!isPathPresentOnTarget(new Path(manListPath), targetFs, copyConfig)) {
List<String> missingPaths = snapshotInfo.getSnapshotApexPaths();
for (IcebergSnapshotInfo.ManifestFileInfo mfi : snapshotInfo.getManifestFiles()) {
Contributor


Can we add a comment here to mention that only the oldest snapshot (which most likely has already been copied) contains all the old files, while the mfi from newer snapshots contain only the newly added files in those manifests?

Contributor Author

@phet phet Oct 7, 2022


those are definitely the semantics of using IcebergTable.getIncrementalSnapshotInfosIterator(); it's in the javadoc there. where do you want me to repeat it over here? maybe I should note those semantics above, where I call that method?

Contributor


Or we can add more clarity in IcebergTable.getIncrementalSnapshotInfosIterator(); at least, I was missing the assumption that it returns the snapshots from oldest to latest, and that each file appears only once, in the earliest snapshot that contains it. Might be just me though...

Contributor Author

@phet phet Oct 8, 2022


sure thing. I added this:

This means the {@link IcebergSnapshotInfo#getManifestFiles()} for the (n+1)-th element of the iterator will omit all manifest files and listed data files already reflected in a {@link IcebergSnapshotInfo#getManifestFiles()} from the n-th or prior elements. Given the order of the {@link Iterator<IcebergSnapshotInfo>} returned, this mirrors the snapshot-to-file dependencies: each file is returned exactly once, with the (oldest) snapshot from which it first becomes reachable.

Only the final {@link IcebergSnapshotInfo#getMetadataPath()} is present (for the snapshot it itself deems current).
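[Editor's sketch] To make those iterator semantics concrete, here is a tiny, hypothetical model using plain Java collections (not the real `IcebergTable` API): each snapshot's full set of reachable files is reduced to a per-snapshot delta, so every file is attributed only to the oldest snapshot that reaches it.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

public class IncrementalSnapshotsSketch {
  /** Given each snapshot's full set of reachable file paths, ordered oldest to
   *  newest, return per-snapshot deltas: every file appears exactly once, with
   *  the first (oldest) snapshot from which it becomes reachable. */
  static List<Set<String>> incrementalInfos(List<Set<String>> snapshotsOldestFirst) {
    List<Set<String>> deltas = new ArrayList<>();
    Set<String> alreadySeen = new HashSet<>();
    for (Set<String> snapshotFiles : snapshotsOldestFirst) {
      Set<String> delta = new LinkedHashSet<>(snapshotFiles);
      delta.removeAll(alreadySeen); // omit files reflected in prior elements
      deltas.add(delta);
      alreadySeen.addAll(snapshotFiles);
    }
    return deltas;
  }
}
```

So for snapshots reaching {a, b} and then {a, b, c}, the deltas are {a, b} and {c}: the newer snapshot's info contains only the newly added file.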

/** @return whether `path` is present on `targetFs`, tunneling checked exceptions and caching results throughout */
protected static boolean isPathPresentOnTarget(Path path, FileSystem targetFs, CopyConfiguration copyConfig) {
try {
// omit considering timestamp (or other markers of freshness), as files should be immutable
Contributor


not sure whether this is true for all files: metadata files are immutable, but might data files be updated?

Contributor Author

@phet phet Oct 7, 2022


it's technically possible to change files in place, but doing so breaks iceberg's repeatability. it's not something we should ever encourage... instead, write new files and create a snapshot with those, which substitutes out and hence replaces the original ones!

for distcp specifically, the real issue with in-place mods to data files is that every delta copy must devolve into a full comparison of the FileStatus (between source and dest) for the entire iceberg table. that's a huge amount of effort in some cases... all because of misbehaving writers/updaters.

I suggest that if we do learn we're working with writers that do this, we later return here to add the necessary complexity. it's likely something we'd control via configuration, so it's not always on.
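[Editor's sketch] The trade-off discussed here might look something like the following (hypothetical illustration; `alreadyReplicated` and the `verifySize` flag are made-up names, not actual Gobblin config): under the immutability assumption an existence check suffices, while an opt-in flag could enable a deeper size comparison for misbehaving writers.

```java
public class FreshnessCheckSketch {
  /** @param destLenOrNull the file's length at dest, or null if absent there.
   *  With immutable files, mere presence at dest means the copy can be skipped;
   *  verifySize (imagined as config-driven) additionally compares lengths. */
  static boolean alreadyReplicated(long srcLen, Long destLenOrNull, boolean verifySize) {
    if (destLenOrNull == null) {
      return false; // not at dest: must copy
    }
    return !verifySize || destLenOrNull == srcLen;
  }
}
```

The default (existence-only) path keeps delta copies cheap; the verifying path is what every copy would degrade to if in-place modification had to be tolerated table-wide.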

Contributor


Makes sense. Given you have a comment mentioning this behavior as well, it should be fine.

Contributor

@ZihanLi58 ZihanLi58 left a comment


LGTM

@ZihanLi58 ZihanLi58 merged commit 585298f into apache:master Oct 14, 2022
phet added a commit to phet/gobblin that referenced this pull request Dec 3, 2022
…at dest then proceed with only delta (apache#3575)

* Enhance `IcebergDataset` to detect when files already at dest and proceed with only delta

* fixup: minor doc/comment mods

* Allow not finding a file mentioned by iceberg metadata to be a non-fatal error

* Have `IcebergDataset` check more thoroughly at dest for previously-copied files that could be skipped

* Add short-circuiting of `IcebergDataset` file scan when (root) metadata file already replicated to dest

* Abbreviate `IcebergDataset` logging when source file not found, refactor for code reuse, and round out testing

* Improve javadoc as suggested during review

* Streamline `IcebergDataset` logging when source path not found

* Skip `IcebergDataset` per-data-file check, falling back to per-manifest-file checking

* minor comment change

* Add `IcebergDataset` logging to indicate volume of filepaths accumulated

* Extend and refactor `IcebergDataset` logging to indicate volume of filepaths accumulated

* improve comments

* Add logging for running count of source paths not found
phet added a commit to phet/gobblin that referenced this pull request Dec 5, 2022
* upstream/master:
  move dataset handler code before cleaning up staging data (apache#3594)
  [GOBBLIN-1730] Include flow execution id when try to cancel/submit job using SimpleKafkaSpecProducer (apache#3588)
  [GOBBLIN-1734] make DestinationDatasetHandler work on streaming sources (apache#3592)
  give option to cancel helix workflow through Delete API (apache#3580)
  [GOBBLIN-1728] Fix YarnService incorrect container allocation behavior (apache#3586)
  Support multiple node types in shared flowgraph, fix logs (apache#3590)
  Search for dummy file in writer directory (apache#3589)
  Use root cause for checking if exception is transient (apache#3585)
  [GOBBLIN-1724] Support a shared flowgraph layout in GaaS (apache#3583)
  [GOBBLIN-1731] Enable HiveMetadataWriter to override table schema lit… (apache#3587)
  [GOBBLIN-1726] Avro 1.9 upgrade of Gobblin OSS (apache#3581)
  [GOBBLIN-1725] Fix bugs in gaas warm standby mode (apache#3582)
  [GOBBLIN-1718] Define DagActionStoreMonitor to listen for kill/resume… (apache#3572)
  Add log line for committing/retrieving watermarks in streaming (apache#3578)
  [GOBBLIN-1707] Enhance `IcebergDataset` to detect when files already at dest then proceed with only delta (apache#3575)
  Ignore AlreadyExistsException in hive writer (apache#3579)
  Fail GMIP container for known transient exceptions to avoid data loss (apache#3576)
  GOBBLIN-1715: Support vectorized row batch pooling (apache#3574)
  [GOBBLIN-1696] Implement file based flowgraph that detects changes to the underlying… (apache#3548)
  GOBBLIN-1719 Replace moveToTrash with moveToAppropriateTrash for hadoop trash (apache#3573)
  [GOBBLIN-1703] avoid double quota increase for adhoc flows (apache#3550)