Parallelized read-write operations in Hoodie Merge phase #370
n3nash commented Apr 2, 2018
- Parallelized read-write operations in Hoodie Merge phase
- Make BufferedIterator generic enough to buffer any type of payload, not just HoodieRecord.
// It caches the exception seen while fetching insert value.
public Optional<Exception> exception = Optional.empty();

public BufferedIteratorPayload(T record, Schema schema) {
  this.record = record;
  try {
    this.insertValue = record.getData().getInsertValue(schema);
Why do we need to remove this? Isn't this an expensive operation that we want to offload to the reader thread?
Please avoid commenting on "WIP" PRs :)
0b9c336 to 21a9166
@vinothchandar Please take a pass at the approach. Comments, Javadocs, and code cleanup will follow soon after we agree on the approach.
Decoupling the reading and writing of Parquet files from/to a potentially remote server makes sense. The code changes look good in general. I have added some minor comments.
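For readers skimming the thread, the decoupling discussed here can be pictured as a bounded producer/consumer hand-off. The following is only a minimal sketch under stated assumptions: `DecoupledReadWriteSketch`, `run`, and `enqueue` are hypothetical names, a plain `ArrayBlockingQueue` stands in for the PR's memory-bounded BufferedIterator, and none of this is the PR's actual code.

```java
import java.util.Iterator;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// Hypothetical illustration; this is not the PR's BufferedIterator.
final class DecoupledReadWriteSketch {
  // Sentinel object that tells the writer thread the input is exhausted.
  private static final Object END = new Object();

  // Reads on the calling thread and writes on a separate thread, with at most
  // `capacity` records buffered between them. Records must be non-null.
  static <T> void run(Iterator<T> reader, Consumer<T> writer, int capacity) {
    BlockingQueue<Object> queue = new ArrayBlockingQueue<>(capacity);
    ExecutorService writerService = Executors.newSingleThreadExecutor();
    try {
      Future<Void> writerResult = writerService.submit(() -> {
        while (true) {
          Object item = queue.take();
          if (item == END) {
            return null;
          }
          @SuppressWarnings("unchecked")
          T record = (T) item;
          writer.accept(record);
        }
      });
      while (reader.hasNext()) {
        enqueue(queue, reader.next(), writerResult);
      }
      enqueue(queue, END, writerResult);
      writerResult.get(); // surfaces any writer failure
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException("interrupted while buffering", e);
    } catch (ExecutionException e) {
      throw new IllegalStateException("writer thread failed", e.getCause());
    } finally {
      writerService.shutdownNow();
    }
  }

  // Blocks while the buffer is full, but gives up if the writer has already stopped.
  private static void enqueue(BlockingQueue<Object> queue, Object item, Future<Void> writerResult)
      throws InterruptedException, ExecutionException {
    while (!queue.offer(item, 100, TimeUnit.MILLISECONDS)) {
      if (writerResult.isDone()) {
        writerResult.get(); // rethrows the writer's exception, if any
        throw new IllegalStateException("writer stopped before end of input");
      }
    }
  }
}
```

The bounded queue is what lets a slow writer apply back-pressure to the reader; the sketch bounds by record count, whereas the diff above bounds by memory (`bufferMemoryLimit`).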
logger.info("starting hoodie writer thread");
// Passing parent thread's TaskContext to newly launched thread for it to access original TaskContext
// properties.
TaskContext$.MODULE$.setTaskContext(sparkThreadTaskContext);
Minor: It looks like there is a contract for using threads inside a Spark task, and it is repeated in the update case too. Can we make a first-class type (like SparkTaskThread) and handle this boilerplate in one place?
TaskContext$.MODULE$.setTaskContext(sparkThreadTaskContext);
This is the only boilerplate code here. The surrounding code is a little different in each scenario, unfortunately. I'm trying to avoid introducing new first-class types if possible.
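One lightweight middle ground between the two positions is a small wrapper function rather than a new thread type. The sketch below is an assumption-laden illustration: `ContextPropagationSketch`, `CONTEXT`, and `withCallerContext` are hypothetical names, and a plain `ThreadLocal<String>` stands in for Spark's thread-local TaskContext so that the example runs without Spark on the classpath.

```java
import java.util.concurrent.Callable;

// Hypothetical helper; the ThreadLocal below is a stand-in for Spark's TaskContext.
final class ContextPropagationSketch {
  static final ThreadLocal<String> CONTEXT = new ThreadLocal<>();

  // Captures the caller's context now and installs it in whichever thread
  // later runs the task, restoring that thread's previous value afterwards.
  static <V> Callable<V> withCallerContext(Callable<V> task) {
    final String captured = CONTEXT.get();
    return () -> {
      String previous = CONTEXT.get();
      CONTEXT.set(captured);
      try {
        return task.call();
      } finally {
        CONTEXT.set(previous);
      }
    };
  }
}
```

A call site would then look like `writerService.submit(ContextPropagationSketch.withCallerContext(task))`, which keeps the one repeated line in a single place while leaving the scenario-specific code where it is.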
  public BufferedIterator(final Iterator<T> iterator, final long bufferMemoryLimit,
-     final Schema schema) {
+     final Schema schema, final PayloadFunction<T, R> payloadFunc) {
Minor: Rename PayloadFunction to BufferedIteratorPayloadFactory as it is used only in the context of BufferedIterator
package com.uber.hoodie.func.payload;

public class AbstractBufferedIteratorPayload<I, O> {
abstract ?
public HoodieRecordBufferedIteratorPayload(HoodieRecord record, Schema schema) {
  this.inputPayload = record;
  try {
    this.outputPayload = record.getData().getInsertValue(schema);
An observation: I am assuming this serialization is CPU-intensive, and with your changes this work is decoupled from the parquet write for the new file-group generation case (Hoodie Create Handle). For the merge case, though, the deserialization seems to be tied to the parquet data fetch, but together they are decoupled from the parquet write.
Yes, that's true. The issue is that for CreateHandle the process isn't dependent on anything apart from the input, whereas in the MergeHandle, a merge process with the new records takes place. A larger refactor could help address this but I'm hoping to keep that for later.
Let's capture the action items (AIs) from here in an issue?
@@ -40,8 +38,8 @@
  * internally samples every {@link #RECORD_SAMPLING_RATE}th record and adjusts number of records in
  * buffer accordingly. This is done to ensure that we don't OOM.
  */
-public class BufferedIterator<K extends HoodieRecordPayload, T extends HoodieRecord<K>> implements
-    Iterator<BufferedIterator.BufferedIteratorPayload<T>> {
+public class BufferedIterator<T, R>
Can you add metrics around ingress and egress throughput and latency?
import com.uber.hoodie.func.payload.AbstractBufferedIteratorPayload;
import java.util.function.Function;

public abstract class PayloadFunction<T, R>
Minor: Make it an interface, as there is no implementation?
@bvaradar Thanks for your comments. Unfortunately, the PR wasn't ready for review yet apart from just discussing the approach. Since we agree on the approach and hopefully so does @vinothchandar, I'm going to make necessary code changes now. You might need to do another pass because of this, thanks!
21a9166 to 3146eaa
@bvaradar @vinothchandar Cleaned up the code, please take a pass at it.
3146eaa to f005b30
Overall fine with the approach. On top of the comments I left, I'd like to see if we can avoid the extra classes by introducing a single abstraction, BufferedIteratorTransform<I,O,F>, which contains the input, the output, and the function. The function can just be a lambda as well.
@Override
public boolean hasNext() {
  try {
    this.next = parquetReader.read();
Let's handle the case where hasNext is called without next being called, and vice versa.
i.e., make it true to the Iterator contract
done
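To make the Iterator-contract point in this thread concrete, here is a minimal sketch of a look-ahead iterator that tolerates repeated hasNext calls and next calls without a preceding hasNext. It is an illustration under assumptions, not the code that was merged: `LookAheadIterator` is a hypothetical name, and a `Supplier<T>` that returns null at end of input (and keeps returning null afterwards) stands in for the parquet reader's read method.

```java
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.function.Supplier;

// Hypothetical sketch; `reader` stands in for a reader whose read() returns null at end of input.
final class LookAheadIterator<T> implements Iterator<T> {
  private final Supplier<T> reader;
  // Single look-ahead slot; null means nothing is buffered.
  private T next;

  LookAheadIterator(Supplier<T> reader) {
    this.reader = reader;
  }

  @Override
  public boolean hasNext() {
    // Only read when nothing is buffered, so repeated calls do not skip records.
    if (next == null) {
      next = reader.get();
    }
    return next != null;
  }

  @Override
  public T next() {
    // Works even if the caller never called hasNext() first.
    if (!hasNext()) {
      throw new NoSuchElementException();
    }
    T result = next;
    next = null;
    return result;
  }
}
```

Because hasNext only reads when the slot is empty, a single field is enough to keep both methods consistent.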
 */
public interface BufferedIteratorPayloadFunction<I, O>
    extends Function<I, AbstractBufferedIteratorPayload<I, O>> {

remove extra line
@@ -136,7 +136,8 @@ private String init(String fileId, Iterator<HoodieRecord<T>> newRecordsItr) {
     // Load the new records in a map
     logger.info("MaxMemoryPerPartitionMerge => " + config.getMaxMemoryPerPartitionMerge());
     this.keyToNewRecords = new ExternalSpillableMap<>(config.getMaxMemoryPerPartitionMerge(),
-        Optional.empty(), new StringConverter(), new HoodieRecordConverter(schema, config.getPayloadClass()));
+        Optional.empty(), new StringConverter(),
These wraps happened at column width 100. The Checkstyle and code-style configurations under the style/ folder use 120 as the wrap point.
@n3nash: If you have not already imported style/intellij-java-google-style.xml, can you use it?
I already use them, not sure what happened here.
OK, dug a bit. @n3nash: Can you check whether IntelliJ -> Preferences -> Code Style -> Java -> Wrapping and Braces is set to 100 (instead of 120)?
I found an issue in IntelliJ where, if you have existing code styles (other than the default) that are active, importing a new code style does not seem to change this value. I am not able to reproduce this consistently, though. Deleting all non-default code styles and/or activating the Default code style and then importing our code style does the trick.
Let me know how it goes.
Balaji.V
Future writerResult =
    writerService.submit(
        () -> {
          logger.info("starting hoodie writer thread");
This code seems repeated; is there a way to modularize and share it? Also, can you revisit the logger statements to make them more contextualized?
addressed.
@@ -94,7 +95,9 @@ private void initFile(File writeOnlyFileHandle) throws IOException {
     }
     writeOnlyFileHandle.createNewFile();

-    log.info("Spilling to file location " + writeOnlyFileHandle.getAbsolutePath());
+    log.info("Spilling to file location " + writeOnlyFileHandle.getAbsolutePath() + " in machine ("
nit: rename machine to host
00c0f8a to 80fa5c6
@vinothchandar @bvaradar addressed CR comments. @vinothchandar I started making changes for
80fa5c6 to e16d6c2
Looks good to me.
handle = new HoodieCreateHandle(hoodieConfig, commitTime, hoodieTable,
    payload.record.getPartitionPath());
handle.write(payload.record, payload.insertValue,
handle =
It's formatted incorrectly; I made a pass over all files.
// Holds the next entry returned by the parquet reader
private T next;
// Holds the current entry returned by the parquet reader
private T prev;
I think you can implement this using a single variable next?
Should we add a test for this file?
yeah, done.
 * needs to be buffered, the runnable function that needs to be executed in the reader thread and
 * return the transformed output based on the writer function
 */
public class BufferedIteratorWrapper<I, O, E> {
Let's rename this to something more specific. It feels like we are overloading Wrapper a lot. How about BufferedIteratorExecutor?
Add a test here?
Addressed.
 * This class wraps a parquet reader and provides an iterator based api to
 * read from a parquet file. This is used in {@link BufferedIterator}
 */
public class ParquetReaderWrappedIterator<T> implements Iterator<T> {
rename : ParquetReaderIterator or ParquetIterator
Addressed
 * @param <I> input payload data type
 * @param <O> output payload data type
 */
public interface BufferedIteratorPayloadFunction<I, O>
rename : BufferedIteratorTransform (what it does is to transform I to O)
I think we should reconsider merging the Payload and the Transform here. The current way introduces too many classes/interfaces for what we need to achieve.
I do agree it introduces extra classes to achieve something more simple...I like the cleaner separation of payload and transformer though.
Is there a unique advantage to having them separate? In this scenario, you are just looking for a transform to turn I into O, and it seems straightforward to have them together. Even if you look up something like the Adapter pattern (https://en.wikipedia.org/wiki/Adapter_pattern), the I, O, and F are together. (We don't need the target object to reflect source object changes, though.)
    extends AbstractBufferedIteratorPayload<GenericRecord, GenericRecord> {

  public GenericRecordBufferedIteratorPayload(GenericRecord record) {
    this.inputPayload = record;
Can we move this constructor to the abstract class above? That's the typical pattern. Or did you intend for it to be an interface originally?
Yeah, I missed refactoring that after the other changes. Thanks for pointing that out.
    implements BufferedIteratorPayloadFunction<GenericRecord, AbstractBufferedIteratorPayload> {

  @Override
  public AbstractBufferedIteratorPayload apply(GenericRecord t) {
This class, for example, is why I strongly feel we should merge payload and transform into a single entity. We always use them together, and the current way ends up with more complex generic interfaces and micro-implementations.
In any case, let's give this class a shorter name? The Transform rename would make this GenericRecordBufferedIteratorTransform, which is a tad shorter.
Addressed rename
699805e to 90ba1c3
// It caches the exception seen while fetching insert value.
public Optional<Exception> exception = Optional.empty();

public HoodieRecordBufferedIteratorPayload(HoodieRecord record, Schema schema) {
I think we should consider using a static method or lambda, if possible, to capture the transform. This whole class structure, with constructor chaining and inheritance, seems like overkill for the amount of work that's done in converting I to O. This is one of the reasons function passing was introduced in Java: to avoid such boilerplate code. Can we take advantage of that?
Refactored, please take a pass.
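As an illustration of the function-passing suggestion in this thread, the sketch below shows how a transform from I to O can be supplied as a static method reference or a lambda instead of a payload class hierarchy. It is a hedged sketch, not the refactored PR code: `TransformSketch`, `toInsertValue`, and `transformAll` are hypothetical names, and the string conversion merely stands in for an expensive per-record conversion such as getInsertValue(schema).

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.function.Function;

// Hypothetical sketch; names and the string conversion are placeholders.
final class TransformSketch {
  // Stand-in for an expensive per-record conversion.
  static String toInsertValue(Integer record) {
    return "value-" + record;
  }

  // Applies the caller-supplied transform to every input record.
  // The transform is just a Function, so no dedicated payload classes are needed.
  static <I, O> List<O> transformAll(Iterator<I> input, Function<I, O> transform) {
    List<O> out = new ArrayList<>();
    while (input.hasNext()) {
      out.add(transform.apply(input.next()));
    }
    return out;
  }
}
```

A caller can pass `TransformSketch::toInsertValue` directly, or a lambda that closes over extra state (for example a schema), which is the part the per-type payload constructors were doing before.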
90ba1c3 to 967a338
@vinothchandar Refactored, please take a pass.
Co-authored-by: lokesh-lingarajan-0310 <[email protected]>