GOBBLIN-1715: Support for Vectorized row batch pooling #3574
Codecov Report
@@            Coverage Diff             @@
##             master    #3574    +/-   ##
============================================
- Coverage     46.87%   46.87%   -0.01%
- Complexity    10630    10640      +10
============================================
  Files          2112     2113       +1
  Lines         82744    82798      +54
  Branches       9215     9220       +5
============================================
+ Hits          38784    38808      +24
- Misses        40404    40429      +25
- Partials       3556     3561       +5
Nits but nothing major. Great work!
static final String ROW_BATCH_EXPIRY_PERIOD = "orc.row.batch.expiry.period.secs";
static final int ROW_BATCH_EXPIRY_PERIOD_DEFAULT = 1;
Really minor nits about style. Typically we follow a pattern of having a shared prefix variable, here orc.row.batch.expiry. or even just orc.row.batch, and building the keys from that prefix. And then we start the names of the default values with DEFAULT_
See gobblin/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnConfigurationKeys.java, lines 109 to 110 in 3733d60:
public static final String TOKEN_RENEW_INTERVAL_IN_MINUTES = GOBBLIN_YARN_PREFIX + "token.renew.interval.minutes";
public static final Long DEFAULT_TOKEN_RENEW_INTERVAL_IN_MINUTES = Long.MAX_VALUE;
Also, maybe add the word pool to these settings since these are specific to the batch pool and not for regular row batch?
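The naming convention suggested above could look roughly like the sketch below. The class name, the `pool` prefix, and the `expiryPeriodSecs` helper are hypothetical illustrations of the reviewer's suggestion, not the identifiers that were merged.

```java
import java.util.Properties;

// Hypothetical holder class; names are illustrative only, not taken from the PR.
final class OrcRowBatchPoolConfigKeys {
  private OrcRowBatchPoolConfigKeys() {}

  // Shared prefix, with "pool" added to show these settings apply to the batch pool only.
  static final String ROW_BATCH_POOL_PREFIX = "orc.row.batch.pool.";

  // Key built from the shared prefix.
  static final String ROW_BATCH_POOL_EXPIRY_PERIOD_SECS =
      ROW_BATCH_POOL_PREFIX + "expiry.period.secs";

  // Default value named with a leading DEFAULT_, mirroring GobblinYarnConfigurationKeys.
  static final int DEFAULT_ROW_BATCH_POOL_EXPIRY_PERIOD_SECS = 1;

  // Reads the setting, falling back to the default when the key is absent.
  static int expiryPeriodSecs(Properties props) {
    String raw = props.getProperty(ROW_BATCH_POOL_EXPIRY_PERIOD_SECS);
    return raw == null ? DEFAULT_ROW_BATCH_POOL_EXPIRY_PERIOD_SECS : Integer.parseInt(raw.trim());
  }
}
```

Keeping the prefix in one constant means a later rename of the namespace touches a single line.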
gobblin-modules/gobblin-orc/src/main/java/org/apache/gobblin/writer/GobblinBaseOrcWriter.java
@@ -269,6 +280,7 @@ public void commit()
@Override
public void write(D record)
throws IOException {
Preconditions.checkState(!closed, "Writer already closed");
When did you see this edge case happen? And does this cause the fork to immediately terminate?
Given how widely this class is used and how many abstract classes it extends, I wanted to be sure that this condition is preserved
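The guard discussed in this thread can be sketched without Guava as follows. `GuardedWriter` is a hypothetical stand-in, not `GobblinBaseOrcWriter`; the explicit check only mirrors the documented behaviour of `Preconditions.checkState`, which throws `IllegalStateException` when the condition is false.

```java
import java.io.Closeable;
import java.util.ArrayList;
import java.util.List;

// Hypothetical minimal writer; it only illustrates the "fail fast after close" contract.
final class GuardedWriter<D> implements Closeable {
  private final List<D> buffer = new ArrayList<>();
  private volatile boolean closed = false;

  void write(D record) {
    // Equivalent in effect to Preconditions.checkState(!closed, "Writer already closed").
    if (closed) {
      throw new IllegalStateException("Writer already closed");
    }
    buffer.add(record);
  }

  // Number of records accepted so far.
  int recordsBuffered() {
    return buffer.size();
  }

  @Override
  public void close() {
    closed = true;
  }
}
```

A write after `close()` then surfaces as an exception at the call site instead of silently touching released resources; what the surrounding task or fork does with that exception is outside the scope of this sketch.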
synchronized (rowBatches) {
LinkedList<RowBatchHolder> vals = rowBatches.get(schema);
VectorizedRowBatch rowBatch;
if (vals == null || vals.size() == 0) {
Dumb question, but do we have a preference between Apache Commons CollectionUtils.isEmpty and a basic null check like this?
I'm not keen on adding more dependencies here if this is a small change
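For readers without the full diff, the fragment above fits a pattern along the lines of the sketch below: a pool keyed by schema that hands out a pooled batch when one exists and allocates a new one otherwise, using the plain null-or-empty check rather than an extra dependency. This is a generic illustration under assumed names (`SimpleBatchPool`, `borrow`, `giveBack`, `evictExpired`), not the PR's `RowBatchPool`; the expiry logic in particular is a guess at what the expiry-period setting implies.

```java
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.Map;
import java.util.function.Function;

// Hypothetical generic pool: S stands in for the schema type, B for the row batch type.
final class SimpleBatchPool<S, B> {
  // Pairs a pooled batch with the time it was returned, so idle batches can expire.
  private static final class Holder<T> {
    final T batch;
    final long returnedAtMillis;

    Holder(T batch, long returnedAtMillis) {
      this.batch = batch;
      this.returnedAtMillis = returnedAtMillis;
    }
  }

  private final Map<S, LinkedList<Holder<B>>> batches = new HashMap<>();
  private final Function<S, B> factory;
  private final long expiryMillis;

  SimpleBatchPool(Function<S, B> factory, long expiryMillis) {
    this.factory = factory;
    this.expiryMillis = expiryMillis;
  }

  // Returns a pooled batch for the schema, or allocates a new one if none is available.
  B borrow(S schema) {
    synchronized (batches) {
      LinkedList<Holder<B>> vals = batches.get(schema);
      if (vals == null || vals.isEmpty()) { // plain check, no CollectionUtils needed
        return factory.apply(schema);
      }
      return vals.removeLast().batch;
    }
  }

  // Hands a batch back to the pool, stamped with the caller-supplied time.
  void giveBack(S schema, B batch, long nowMillis) {
    synchronized (batches) {
      batches.computeIfAbsent(schema, k -> new LinkedList<>()).addLast(new Holder<>(batch, nowMillis));
    }
  }

  // Drops batches idle for at least expiryMillis; returns how many were dropped.
  int evictExpired(long nowMillis) {
    int evicted = 0;
    synchronized (batches) {
      for (LinkedList<Holder<B>> vals : batches.values()) {
        Iterator<Holder<B>> it = vals.iterator();
        while (it.hasNext()) {
          if (nowMillis - it.next().returnedAtMillis >= expiryMillis) {
            it.remove();
            evicted++;
          }
        }
      }
    }
    return evicted;
  }
}
```

Passing the clock value in as a parameter keeps the sketch deterministic; a real pool would more likely read the time itself and run eviction on a schedule.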
btw, service tests are failing checkstyle because you need the Apache docstring header at the top of new files
+1
* upstream/master:
  move dataset handler code before cleaning up staging data (apache#3594)
  [GOBBLIN-1730] Include flow execution id when try to cancel/submit job using SimpleKafkaSpecProducer (apache#3588)
  [GOBBLIN-1734] make DestinationDatasetHandler work on streaming sources (apache#3592)
  give option to cancel helix workflow through Delete API (apache#3580)
  [GOBBLIN-1728] Fix YarnService incorrect container allocation behavior (apache#3586)
  Support multiple node types in shared flowgraph, fix logs (apache#3590)
  Search for dummy file in writer directory (apache#3589)
  Use root cause for checking if exception is transient (apache#3585)
  [GOBBLIN-1724] Support a shared flowgraph layout in GaaS (apache#3583)
  [GOBBLIN-1731] Enable HiveMetadataWriter to override table schema lit… (apache#3587)
  [GOBBLIN-1726] Avro 1.9 upgrade of Gobblin OSS (apache#3581)
  [GOBBLIN-1725] Fix bugs in gaas warm standby mode (apache#3582)
  [GOBBLIN-1718] Define DagActionStoreMonitor to listen for kill/resume… (apache#3572)
  Add log line for committing/retrieving watermarks in streaming (apache#3578)
  [GOBBLIN-1707] Enhance `IcebergDataset` to detect when files already at dest then proceed with only delta (apache#3575)
  Ignore AlreadyExistsException in hive writer (apache#3579)
  Fail GMIP container for known transient exceptions to avoid data loss (apache#3576)
  GOBBLIN-1715: Support vectorized row batch pooling (apache#3574)
  [GOBBLIN-1696] Implement file based flowgraph that detects changes to the underlying… (apache#3548)
  GOBBLIN-1719 Replace moveToTrash with moveToAppropriateTrash for hadoop trash (apache#3573)
  [GOBBLIN-1703] avoid double quota increase for adhoc flows (apache#3550)
Dear Gobblin maintainers,
Please accept this PR. I understand that it will not be reviewed until I have checked off all the steps below!
JIRA
Description
As mentioned in the ticket, the existing array resizing is buggy for ORC's vectorized row batch. In almost all cases we should fall back on the 'enlargeFactor' option. In the cases where throughput is a big concern, we should enable the approach proposed below. We've seen some benefits from using the proposed approach.
Major GC pauses have reduced.
BEFORE: [image: Before]
AFTER: [image: After]
Memory is only allocated when needed, whereas the old approach allocated a lot more memory even when it was not required. [For details please see the ticket]
BEFORE: [image: before2]
AFTER: [image: Screen Shot 2022-09-30 at 3 41 05 PM]
Tests
Commits