[HUDI-4445] S3 Incremental source improvements #6176
Conversation
Force-pushed from 305c9e0 to e434c6b
.rdd().toJavaRDD().mapPartitions(fileListIterator -> {
  List<String> cloudFilesPerPartition = new ArrayList<>();
  fileListIterator.forEachRemaining(row -> {
    final Configuration configuration = serializableConfiguration.newCopy();
Why create a copy again? I don't see any config modification happening within the executor. Why not simply pass serializableConfiguration?
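If it helps, here is a minimal sketch of that suggestion (assuming Hudi's SerializableConfiguration exposes the wrapped Configuration via get(); filteredRows and the per-row handling are placeholders for the surrounding code):

// Sketch only: reuse the shared Configuration instead of copying it per row,
// since nothing in the executor mutates it. Keep newCopy() if FSUtils.getFs ever mutates the conf.
List<String> cloudFiles = filteredRows
    .rdd().toJavaRDD()
    .mapPartitions(fileListIterator -> {
      List<String> cloudFilesPerPartition = new ArrayList<>();
      fileListIterator.forEachRemaining(row -> {
        final Configuration configuration = serializableConfiguration.get();
        // ... existing per-row handling that adds to cloudFilesPerPartition ...
      });
      return cloudFilesPerPartition.iterator();
    })
    .collect();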
}
LOG.warn("Extracted distinct files " + cloudFiles.size() |
I assume this was for testing; change the log level to debug?
@@ -81,6 +94,12 @@ static class Config {
 * - --hoodie-conf hoodie.deltastreamer.source.s3incr.spark.datasource.options={"header":"true","encoding":"UTF-8"}
 */
static final String SPARK_DATASOURCE_OPTIONS = "hoodie.deltastreamer.source.s3incr.spark.datasource.options";

// ToDo make it a list of extensions
static final String S3_ACTUAL_FILE_EXTENSIONS = "hoodie.deltastreamer.source.s3incr.file.extensions";
Is this a list of supported source data file extensions, e.g. .json, .parquet, .avro, etc.?
Suggested change:
static final String S3_ACTUAL_FILE_EXTENSIONS = "hoodie.deltastreamer.source.s3incr.file.extensions";
static final String S3INCR_FILE_EXTENSIONS_OPTIONS = "hoodie.deltastreamer.source.s3incr.file.extensions";
The constant name should align with the actual key and use the OPTIONS suffix, since it is a key, not the extensions themselves.
// ToDo make it a list of extensions
static final String S3_ACTUAL_FILE_EXTENSIONS = "hoodie.deltastreamer.source.s3incr.file.extensions";

static final String ATTACH_SOURCE_PARTITION_COLUMN = "hoodie.deltastreamer.source.s3incr.source.partition.exists";
Suggested change:
static final String ATTACH_SOURCE_PARTITION_COLUMN = "hoodie.deltastreamer.source.s3incr.source.partition.exists";
// Add a comment on the purpose of this config and rename as below
static final String ADD_SOURCE_PARTITION_COLUMN = "hoodie.deltastreamer.source.s3incr.add.source.partition.column";
Ditto; the same naming rules apply here.
static final String S3_ACTUAL_FILE_EXTENSIONS = "hoodie.deltastreamer.source.s3incr.file.extensions";

static final String ATTACH_SOURCE_PARTITION_COLUMN = "hoodie.deltastreamer.source.s3incr.source.partition.exists";
static final Boolean DEFAULT_ATTACH_SOURCE_PARTITION_COLUMN = true;
Have we fully tested this change? If not, I would suggest keeping the default false for now.
private Dataset addPartitionColumn(Dataset ds, List<String> cloudFiles) {
  if (props.getBoolean(Config.ATTACH_SOURCE_PARTITION_COLUMN, Config.DEFAULT_ATTACH_SOURCE_PARTITION_COLUMN)
      && !StringUtils.isNullOrEmpty(KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME.key())) {
    String partitionKey = props.getString(KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME.key()).split(":")[0];
Return early or log an error/warning if partitionKey is null or empty?
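A sketch of such a guard, reusing the names from the snippet above (the exact log message is illustrative only):

// Sketch: bail out early when no partition path field is configured.
String partitionField = props.getString(KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME.key(), null);
if (StringUtils.isNullOrEmpty(partitionField)) {
  LOG.warn("Partition path field not configured; skipping source partition column attachment.");
  return ds;
}
// Only the field name is needed; drop any key-generator suffix after ':'.
String partitionKey = partitionField.split(":")[0];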
List<String> nestedPartition = Arrays.stream(filePath.split("/"))
    .filter(level -> level.contains(partitionPathPattern)).collect(Collectors.toList());
if (nestedPartition.size() > 1) {
  throw new HoodieException("More than one level of partitioning exists");
Is it planned to be supported sometime in the future? If yes, let's create a tracking JIRA for that.
Multi-level partitioning is very common, so isn't this a major limitation? If we push this out, how would it affect existing users?
  return ds;
}

private Column s3EventsColumnFilter(String fileFormat) {
A minor suggestion: extract such methods to a separate util class and keep this class plain and simple. Or, if you prefer to keep these methods in this class for better readability, move them to the bottom (i.e., after the call site) for a linear flow.
Quickly skimmed the code; I have some comments.
if (checkExists) {
  FileSystem fs = FSUtils.getFs(s3Prefix + bucket, configuration);
  try {
    if (fs.exists(new Path(decodeUrl))) {
Creating a Hadoop Path has much more memory overhead than normal instantiation. If it's just for an existence check, let's find a better way.
.filter(filterColumn)
.select("s3.bucket.name", "s3.object.key")
.distinct()
.rdd().toJavaRDD().mapPartitions(fileListIterator -> {
Why convert to an RDD? You should be able to do mapPartitions with a Dataset too.
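For reference, a rough sketch of the same listing with the Dataset API (assumes org.apache.spark.api.java.function.MapPartitionsFunction and org.apache.spark.sql.Encoders are imported; filteredSourceData and the per-row path construction are placeholders for the existing logic):

List<String> cloudFiles = filteredSourceData
    .filter(filterColumn)
    .select("s3.bucket.name", "s3.object.key")
    .distinct()
    .mapPartitions((MapPartitionsFunction<Row, String>) rows -> {
      List<String> filesPerPartition = new ArrayList<>();
      rows.forEachRemaining(row -> {
        // ... existing existence check / URL decoding per row ...
        filesPerPartition.add(row.getString(0) + "/" + row.getString(1));
      });
      return filesPerPartition.iterator();
    }, Encoders.STRING())
    .collectAsList();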
Priority is to land #6228 ahead of this, while this can make it into the next release.
…-S3-Incremental-Source
@@ -274,6 +277,11 @@ public RawTripTestPayload generatePayloadForShortTripSchema(HoodieKey key, Strin
  return new RawTripTestPayload(rec.toString(), key.getRecordKey(), key.getPartitionPath(), SHORT_TRIP_SCHEMA);
}

public RawTripTestPayload generatePayloadForS3EventsSchema(HoodieKey key, String commitTime) throws IOException {
RawTripTestPayload assumes some form of trips schema. If you look at its constructor, we don't use the schema, and its APIs assume a few things about the schema. Should we keep all this out of HoodieTestDataGenerator?
This is an old comment. Please check if it's still valid.
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;

// Utility for the schema of S3 events listed here (https://docs.aws.amazon.com/AmazonS3/latest/userguide/notification-content-structure.html)
Should be a multi-line comment.
.requiredString("eventSource")
.requiredString("eventName")
.name("s3")
Let's extract all these strings to constants.
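Something along these lines (the constant names are hypothetical):

// Sketch: hypothetical constants for the field names used in the schema builder above.
private static final String FIELD_EVENT_SOURCE = "eventSource";
private static final String FIELD_EVENT_NAME = "eventName";
private static final String FIELD_S3 = "s3";

// Builder usage then becomes:
//   .requiredString(FIELD_EVENT_SOURCE)
//   .requiredString(FIELD_EVENT_NAME)
//   .name(FIELD_S3)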
  dos.write(jsonData.getBytes());
} finally {
  dos.flush();
  dos.close();
Will this close the ByteArrayOutputStream too?
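For what it's worth, DataOutputStream.close() does close the stream it wraps (and closing a ByteArrayOutputStream is itself a no-op). A try-with-resources form along these lines would make the flush/close handling implicit (sketch only, assuming the enclosing method declares IOException):

ByteArrayOutputStream baos = new ByteArrayOutputStream();
try (DataOutputStream dos = new DataOutputStream(baos)) {
  // close() on the DataOutputStream flushes and also closes baos.
  dos.write(jsonData.getBytes());
}
byte[] serialized = baos.toByteArray();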
/**
 * Generic class for specific payload implementations to inherit from.
 */
public abstract class GenericTestPayload {
Maybe rename to AbstractJsonTestPayload? It's essentially for JSON data, right?
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;

// Utility for the schema of S3 events listed here (https://docs.aws.amazon.com/AmazonS3/latest/userguide/notification-content-structure.html)
Should be a multi-line comment.
.requiredString("eventSource")
.requiredString("eventName")
.requiredString("_row_key")
.name("s3")
Preferably extract all these as static string constants.
@Override
public Pair<Option<Dataset<Row>>, String> fetchNextBatch(Option<String> lastCkptStr, long sourceLimit) {
  Pair<Option<Dataset<Row>>, String> sourceMetadata = fetchMetadata(lastCkptStr, sourceLimit);
  if (!sourceMetadata.getKey().isPresent()) {
sourceMetadata.getLeft?
fileListIterator.forEachRemaining(row -> {
  // TODO: configuration is updated in the getFs call. check if new copy is needed w.r.t to getFs.
Is this still required?
}

@Test
public void testHoodieIncrSource() throws IOException {
Maybe rename to testS3EventsHoodieIncrSource?
this.dataSize = jsonData.length();
Map<String, Object> jsonRecordMap = OBJECT_MAPPER.readValue(jsonData, Map.class);
this.rowKey = jsonRecordMap.get("_row_key").toString();
this.partitionPath = jsonRecordMap.get("time").toString().split("T")[0].replace("-", "/");
I recall this logic has been refactored in the current RawTripTestPayload.
.mapPartitions((MapPartitionsFunction<Row, String>) fileListIterator -> {
.rdd()
// JavaRDD simplifies coding with collect and suitable mapPartitions signature. check if this can be avoided.
.toJavaRDD()
.mapPartitions(fileListIterator -> {
We usually prefer the high-level DataFrame APIs. How is it actually beneficial to convert to an RDD here? I don't quite get the comment.
  }
}
We should have an EOL newline at the end of the file.
/**
 * Test payload for S3 event here (https://docs.aws.amazon.com/AmazonS3/latest/userguide/notification-content-structure.html).
 */
public class S3EventTestPayload extends GenericTestPayload implements HoodieRecordPayload<S3EventTestPayload> {
I'd suggest just testing with DefaultHoodieRecordPayload and a specific S3 event schema, instead of creating a new test payload, as we want to test as close to the real scenario as possible. Besides, we don't couple payload with schema, since a payload is only responsible for how to merge.
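One possible shape of that, sketched with hypothetical field values and assuming the S3 events Avro schema built above is available as a constant S3_EVENTS_SCHEMA:

// Sketch: build an S3 event record directly and wrap it in DefaultHoodieRecordPayload.
GenericRecord record = new GenericData.Record(S3_EVENTS_SCHEMA);
record.put("eventSource", "aws:s3");            // illustrative values
record.put("eventName", "ObjectCreated:Put");
// ... populate the nested "s3" struct and any other required fields ...
DefaultHoodieRecordPayload payload = new DefaultHoodieRecordPayload(record, 0);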
There is a lot of existing misuse of RawTripTestPayload; see https://issues.apache.org/jira/browse/HUDI-6164. So you may want to decouple the improvement changes from the payload changes.
Whatever improvements are done for the S3 incr source, should we make the same for the GCS incr source?
Closing this as not needed.
What is the purpose of the pull request
S3 Incremental source improvements:
Brief change log
(for example:)
Verify this pull request
(Please pick either of the following options)
This pull request is a trivial rework / code cleanup without any test coverage.
(or)
This pull request is already covered by existing tests, such as (please describe tests).
(or)
This change added tests and can be verified as follows:
(example:)
Committer checklist
Has a corresponding JIRA in PR title & commit
Commit message is descriptive of the change
CI is green
Necessary doc changes done or have another open PR
For large changes, please consider breaking it into sub-tasks under an umbrella JIRA.