[GOBBLIN-1708] Improve TimeAwareRecursiveCopyableDataset to lookback only into datefolders that match range #3563
…rovement' of https://github.com/AndyJiang99/gobblin into andjiang/TimeAwareRecursiveCopyableDataset-lookback-improvement
private List<FileStatus> recursivelyGetFilesAtDatePath(FileSystem fs, Path path, String traversedDatePath, PathFilter fileFilter,
    int level, LocalDateTime startDate, LocalDateTime endDate, DateTimeFormatter formatter) throws IOException {
  List<FileStatus> fileStatuses = Lists.newArrayList();
  if (!Objects.equals(traversedDatePath, "")) {
you can do traversedDatePath.equals(""), unless you think it can be null?
You're right, I don't think traversedDatePath should ever be set to null
Or, for the sake of uniformity, you can use traversedDatePath.isEmpty(), as done later in this method.
        Integer.parseInt(traversedDatePathSplit[index]) > endDateSplit[index]) {
      return false;
    }
  } catch (Exception e) {
What exception would be thrown here? We should avoid a wide catch and silent return
The exception that could be caught is the one Integer.parseInt throws when it is given a non-numeric string (a NumberFormatException), or any other error resulting from Integer.parseInt.
Changed so string is checked for digits before parsing
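The change described in this thread, checking that a path segment consists only of digits before parsing it, might look like the sketch below. `SegmentParser` and `parseSegment` are hypothetical names used for illustration and are not taken from the PR. The point is that a bad segment is reported through the return value, and the only exception caught is the specific `NumberFormatException`, rather than a broad `catch (Exception e)` with a silent return.

```java
import java.util.OptionalInt;

final class SegmentParser {
    // Hypothetical helper: returns the parsed value, or empty when the
    // segment is not a plain run of ASCII digits that fits in an int.
    static OptionalInt parseSegment(String segment) {
        if (segment == null || !segment.matches("[0-9]+")) {
            return OptionalInt.empty();
        }
        try {
            return OptionalInt.of(Integer.parseInt(segment));
        } catch (NumberFormatException e) {
            // Only reachable for digit strings too large for an int.
            return OptionalInt.empty();
        }
    }

    public static void main(String[] args) {
        System.out.println(parseSegment("09"));          // numeric segment
        System.out.println(parseSegment("2022a"));       // rejected before parsing
        System.out.println(parseSegment("99999999999")); // digits, but overflows int
    }
}
```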
Codecov Report
@@             Coverage Diff              @@
##             master    #3563      +/-   ##
============================================
- Coverage     46.81%   46.79%   -0.03%
- Complexity    10514    10516       +2
============================================
  Files          2095     2099       +4
  Lines         81945    82001      +56
  Branches       9129     9135       +6
============================================
+ Hits          38361    38369       +8
- Misses        40040    40087      +47
- Partials       3544     3545       +1
Please update the details in the PR template.
@@ -134,9 +134,40 @@ protected List<FileStatus> getFilesAtPath(FileSystem fs, Path path, PathFilter f
    return recursivelyGetFilesAtDatePath(fs, path, "", fileFilter, 1, startDate, endDate, formatter);
  }

  public Boolean checkPathDateTimeValidity(LocalDateTime startDate, LocalDateTime endDate, String traversedDatePath) {
What exactly is this method doing? Please add the javadoc.
Is it comparing two Dates? If yes, we should do it with available library methods.
I think what is being done here could be done in a more intuitive way with built-in methods.
    // Only check the number of parameters that the traversedDatePath has traversed through so far
    for (int index = 0; index < traversedDatePathSplit.length; index++) {
      // Only attempt to parse the number if the entire string are digits
      boolean onlyNumbers = traversedDatePathSplit[index].matches("^[0-9]+$");
      if (onlyNumbers) {
        if (Integer.parseInt(traversedDatePathSplit[index]) < startDateSplit[index] ||
            Integer.parseInt(traversedDatePathSplit[index]) > endDateSplit[index]) {
          return false;
        }
      }
      else {
        return false;
      }
    }
    return true;
I believe this would not work for ranges that span multiple years/months/days.
Consider traversedDatePathSplit == [2022, 09, 01, ...], with startDate 2022/08/20 and endDate 2022/09/10.
The method would return false because it compares the day fields in isolation (01 < 20) and concludes that the date is before the start date, even though 2022/09/01 lies within the range.
I would follow Arjun's recommendation of keeping the strings as dates, rounding them to the lowest granularity, and then comparing them.
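The failure mode described in this comment can be reproduced in isolation. The sketch below is not the PR's code: `componentWiseInRange` is a hypothetical stand-in for the field-by-field comparison, and `dateInRange` uses `java.time.LocalDate` purely for illustration (the class under review may use a different date library).

```java
import java.time.LocalDate;

final class RangeCheckDemo {
    // Field-by-field comparison: each of year, month, day must lie between
    // the corresponding start and end fields. This is the flawed approach.
    static boolean componentWiseInRange(int[] start, int[] end, int[] path) {
        for (int i = 0; i < path.length; i++) {
            if (path[i] < start[i] || path[i] > end[i]) {
                return false;
            }
        }
        return true;
    }

    // Whole-date comparison, inclusive on both ends.
    static boolean dateInRange(LocalDate start, LocalDate end, LocalDate date) {
        return !date.isBefore(start) && !date.isAfter(end);
    }

    public static void main(String[] args) {
        int[] start = {2022, 8, 20};
        int[] end = {2022, 9, 10};
        int[] path = {2022, 9, 1};
        // Year and month pass, but day 1 < 20 rejects a date that is in range.
        System.out.println(componentWiseInRange(start, end, path)); // false
        System.out.println(dateInRange(
            LocalDate.of(2022, 8, 20),
            LocalDate.of(2022, 9, 10),
            LocalDate.of(2022, 9, 1))); // true
    }
}
```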
private List<FileStatus> recursivelyGetFilesAtDatePath(FileSystem fs, Path path, String traversedDatePath, PathFilter fileFilter,
    int level, LocalDateTime startDate, LocalDateTime endDate, DateTimeFormatter formatter) throws IOException {
  List<FileStatus> fileStatuses = Lists.newArrayList();
  if (!traversedDatePath.equals("")) {
isEmpty() is probably better here actually
      boolean beforeOrOnEndDate = traversedDatePathRound.isBefore(endDateRound) || traversedDatePathRound.isEqual(endDateRound);
      return afterOrOnStartDate && beforeOrOnEndDate;
    }
    return false;
Should it return true?
It should return false because if the datePath has non-numerical characters (excluding /), then it should not keep traversing, as it is not in the right dateTime format.
   * @return true/false
   */
  public Boolean checkPathDateTimeValidity(LocalDateTime startDate, LocalDateTime endDate, String datePath, String datePathFormat, int level) {
    String [] array = datePathFormat.split("/");
Should use a better name :). Like datePathFormatArray
    for (int index = 1; index < level; index++) {
      if (index > 1) {
        datePathPattern.append("/");
      }
      datePathPattern.append(array[index - 1]);
    }
I think an easier way of doing this is to define a list, and use
String.join("/", Arrays.asList(datePathFormatArray).subList(0, level));
which essentially gives you the reconstructed datePathFormat
Also, use FileSystems.getDefault().getSeparator()
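As a rough comparison of the loop from the diff with the `String.join` suggestion, the sketch below assumes `level` is 1-based and at least 1, as in the quoted loop. Under that assumption the loop joins the first `level - 1` components, so the equivalent slice is `subList(0, level - 1)`; `subList(0, level)` as written in the comment would include one more component. `DatePathPrefix`, `withLoop`, and `withJoin` are hypothetical names. The separator is kept as a literal `/` here because the format string is split on `/`; `FileSystems.getDefault().getSeparator()` returns the platform separator, which is a backslash on Windows, so whether it fits depends on how the paths are represented.

```java
import java.util.Arrays;

final class DatePathPrefix {
    // Mirrors the loop quoted from the diff: joins the first (level - 1) components.
    static String withLoop(String datePathFormat, int level) {
        String[] parts = datePathFormat.split("/");
        StringBuilder pattern = new StringBuilder();
        for (int index = 1; index < level; index++) {
            if (index > 1) {
                pattern.append("/");
            }
            pattern.append(parts[index - 1]);
        }
        return pattern.toString();
    }

    // Same result using String.join over a sub-list of the components.
    static String withJoin(String datePathFormat, int level) {
        String[] parts = datePathFormat.split("/");
        return String.join("/", Arrays.asList(parts).subList(0, level - 1));
    }

    public static void main(String[] args) {
        String format = "yyyy/MM/dd/HH";
        for (int level = 1; level <= 5; level++) {
            System.out.println(level + ": '" + withLoop(format, level)
                + "' vs '" + withJoin(format, level) + "'");
        }
    }
}
```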
      boolean beforeOrOnEndDate = traversedDatePathRound.isBefore(endDateRound) || traversedDatePathRound.isEqual(endDateRound);
      return afterOrOnStartDate && beforeOrOnEndDate;
    } catch (IllegalArgumentException e) {
      log.error("Cannot parse path " + datePath);
Add some expectation around this log too,
String.format("Cannot parse path at %s, expected in format of %s", datePath, datePathPattern)
      boolean afterOrOnStartDate = traversedDatePathRound.isAfter(startDateRound) || traversedDatePathRound.isEqual(startDateRound);
      boolean beforeOrOnEndDate = traversedDatePathRound.isBefore(endDateRound) || traversedDatePathRound.isEqual(endDateRound);
      return afterOrOnStartDate && beforeOrOnEndDate;
You can probably simplify this by
return !traversedDatePathRound.isBefore(startDateRound) && !traversedDatePathRound.isAfter(endDateRound)
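The simplification works because date-times are totally ordered: "after or equal to start" is the same as "not before start", and "before or equal to end" is the same as "not after end". The sketch below checks the two forms against each other on a few sample values. It uses `java.time.LocalDateTime` as an assumption for the sake of a self-contained example, and `InclusiveRange`, `verbose`, and `compact` are hypothetical names.

```java
import java.time.LocalDateTime;

final class InclusiveRange {
    // The form from the diff: two booleans built from isAfter/isBefore/isEqual.
    static boolean verbose(LocalDateTime start, LocalDateTime end, LocalDateTime t) {
        boolean afterOrOnStart = t.isAfter(start) || t.isEqual(start);
        boolean beforeOrOnEnd = t.isBefore(end) || t.isEqual(end);
        return afterOrOnStart && beforeOrOnEnd;
    }

    // The suggested form: negate the opposite comparison.
    static boolean compact(LocalDateTime start, LocalDateTime end, LocalDateTime t) {
        return !t.isBefore(start) && !t.isAfter(end);
    }

    public static void main(String[] args) {
        LocalDateTime start = LocalDateTime.of(2022, 8, 20, 0, 0);
        LocalDateTime end = LocalDateTime.of(2022, 9, 10, 0, 0);
        LocalDateTime[] samples = {
            start.minusDays(1), start, start.plusDays(5), end, end.plusDays(1)
        };
        for (LocalDateTime t : samples) {
            System.out.println(t + " verbose=" + verbose(start, end, t)
                + " compact=" + compact(start, end, t));
        }
    }
}
```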
   * @param level
   * @return true/false
   */
  public Boolean checkPathDateTimeValidity(LocalDateTime startDate, LocalDateTime endDate, String datePath, String datePathFormat, int level) {
Unit tests please!
  /**
   * Checks if the datePath provided is in the range of the start and end dates.
   * Rounds startDate and endDate to the same granularity as datePath prior to comparing.
   * Returns true if the datePath provided is in the range of start and end dates, inclusive.
nit: this should be moved to line 146
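To make the rounding behaviour described in the javadoc concrete, here is a minimal sketch under stated assumptions. It uses `java.time` rather than whatever date library the class actually imports, and `DatePathRangeSketch`, `inRange`, and the `depth` parameter (the number of format components already present in `datePath`, at least 1) are hypothetical and do not mirror the PR's signature. Rounding is done by formatting the start and end dates with the truncated pattern and parsing them back with defaults for the missing fields.

```java
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.time.format.DateTimeFormatterBuilder;
import java.time.format.DateTimeParseException;
import java.time.temporal.ChronoField;
import java.util.Arrays;

final class DatePathRangeSketch {
    static boolean inRange(LocalDateTime startDate, LocalDateTime endDate,
                           String datePath, String datePathFormat, int depth) {
        // Keep only the first `depth` components of the format, e.g. "yyyy/MM".
        String[] parts = datePathFormat.split("/");
        String prefixPattern = String.join("/", Arrays.asList(parts).subList(0, depth));
        // Fields missing from the truncated pattern default to their minimum.
        DateTimeFormatter formatter = new DateTimeFormatterBuilder()
            .appendPattern(prefixPattern)
            .parseDefaulting(ChronoField.MONTH_OF_YEAR, 1)
            .parseDefaulting(ChronoField.DAY_OF_MONTH, 1)
            .parseDefaulting(ChronoField.HOUR_OF_DAY, 0)
            .toFormatter();
        try {
            LocalDateTime path = LocalDateTime.parse(datePath, formatter);
            // Round both bounds down to the same granularity as the path.
            LocalDateTime start = LocalDateTime.parse(startDate.format(formatter), formatter);
            LocalDateTime end = LocalDateTime.parse(endDate.format(formatter), formatter);
            return !path.isBefore(start) && !path.isAfter(end);
        } catch (DateTimeParseException e) {
            // The path does not match the expected date layout: do not traverse it.
            return false;
        }
    }

    public static void main(String[] args) {
        LocalDateTime start = LocalDateTime.of(2022, 8, 20, 0, 0);
        LocalDateTime end = LocalDateTime.of(2022, 9, 10, 0, 0);
        String format = "yyyy/MM/dd/HH";
        System.out.println(inRange(start, end, "2022/09", format, 2));
        System.out.println(inRange(start, end, "2022/07", format, 2));
        System.out.println(inRange(start, end, "2022/09/01", format, 3));
        System.out.println(inRange(start, end, "2022/logs", format, 2));
    }
}
```

Cases like the ones in `main` (a month folder inside the range, one outside it, a day that only passes when whole dates are compared, and a non-date folder) are the kind of inputs a unit test for the real method could cover.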
…only into datefolders that match range (apache#3563)
* Check datetime range validity prior to recursing
* Remove unused packages
* Remove extra line
* Reformat function
* Check string prior to parsing
* removed unused import
* Change checkpathdatetimevalidity to use available localdatetime library parsing functions
* Change to isempty
* Modify check path to be flexible
* Update javadoc
* Add unit tests and refactor
…can_icebergs_incrementally
* upstream/master:
  [GOBBLIN-1704] Purge offline helix instances during startup (apache#3561)
  [GOBBLIN-1708] Improve TimeAwareRecursiveCopyableDataset to lookback only into datefolders that match range (apache#3563)
  [GOBBLIN-1707] Add `IcebergTableTest` unit test (apache#3564)
  [GOBBLIN-1709] Create Iceberg Datasets Finder, Iceberg Dataset and FileSet to generate Copy Entities to support Distcp for Iceberg (apache#3560)
  [GOBBLIN-1697] Have a separate resource handler to rely on CDC stream to do message forwarding (apache#3549)
  [GOBBLIN-1711] Replace Jcenter with maven central (apache#3566)
Dear Gobblin maintainers,
Please accept this PR. I understand that it will not be reviewed until I have checked off all the steps below!
JIRA
Description
Tests
Commits