Spark Data Source (finally) #266
Conversation
I am going to work on merging the commits into one in the meantime.
Force-pushed from 70ab733 to c41b0c7.
Minor comments, need to finish last third.
<span style="color:grey"> Options useful for writing datasets </span>
- [OPERATION_OPT_KEY](#OPERATION_OPT_KEY) (Default: upsert) <br/>
<span style="color:grey">whether to do upsert, insert or bulkinsert for the write operation</span>
- [STORAGE_TYPE_OPT_KEY](#STORAGE_TYPE_OPT_KEY) (Default: COPY_ON_WRITE) <br/>
Could this change if you're updating an already existing table?
Technically, you can write to a COW table as MOR going forward. But yes, table name and storage type can't change across commits.
added a note to that effect.
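For illustration, a minimal Scala sketch of a write using these options. The `inputDF` and `basePath` names are placeholders, and the constants are assumed to live on `com.uber.hoodie.DataSourceWriteOptions` as this PR defines them:

```scala
import org.apache.spark.sql.SaveMode
import com.uber.hoodie.DataSourceWriteOptions

// "upsert" is the documented default operation; "insert" / "bulk_insert" are alternatives
inputDF.write
  .format("com.uber.hoodie")
  .option(DataSourceWriteOptions.OPERATION_OPT_KEY, "upsert")
  // per the note above: table name and storage type must stay fixed across commits
  .option(DataSourceWriteOptions.STORAGE_TYPE_OPT_KEY, "COPY_ON_WRITE")
  .mode(SaveMode.Append)
  .save(basePath)
```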
DataSourceReadOptions.VIEW_TYPE_INCREMENTAL_OPT_VAL())
    .option(DataSourceReadOptions.BEGIN_INSTANTTIME_OPT_KEY(),
        <beginInstantTime>)
    .load(tablePath); // For incremental view, pass in the root/base path of dataset
For the non-incremental view, is it possible to pass in anything other than the base path of the hoodie table to read?
Yes. You can pass in a glob mixing multiple datasets, both hoodie and non-hoodie, for the RO view.
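A hedged sketch of such an RO-view read (the path is hypothetical; `spark` is an existing `SparkSession`):

```scala
// RO view: load() takes a path glob, so hoodie and plain parquet folders
// can sit under the same pattern and be read together
val roViewDF = spark.read
  .format("com.uber.hoodie")
  .load("/data/warehouse/*/*/*/*")
roViewDF.count()
```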
HoodieTableMetaClient metaClient = new HoodieTableMetaClient(fs, basePath);
HoodieTimeline timeline = new HoodieActiveTimeline(fs, metaClient.getMetaPath()).getCommitTimeline();
Why the switch to testing this via timeline vs client? I see you're using `HoodieClientTestUtils` below for reading.
Since I moved all the incremental pulling & commit listing methods from HoodieReadClient, I had to use the timeline directly to keep these tests going. Hope that makes sense.
@@ -90,4 +106,83 @@ public static SparkConf getSparkConfForTest(String appName) {
        .setMaster("local[1]");
    return HoodieReadClient.addHoodieSupport(sparkConf);
  }

  public static HashMap<String, String> getLatestFileIsToFullPath(String basePath,
s/FileIs/FileId/
oops. will fix.
fixed
if (i == parts.length - 1) {
  return val.toString();
} else {
  if (val instanceof GenericRecord) {
Is this supposed to be `!(val instanceof GenericRecord)`?
Yes. I have been testing with top-level names all along. Will try to add a test around this as well.
Nice catch
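To make the negated check concrete, here is a hedged Scala sketch of the corrected nested-field walk; the method name and error messages are illustrative, not the PR's actual code:

```scala
import org.apache.avro.generic.GenericRecord

def getNestedFieldValAsString(record: GenericRecord, fieldName: String): String = {
  val parts = fieldName.split("\\.")
  var current: Any = record
  for ((part, i) <- parts.zipWithIndex) {
    val value = current.asInstanceOf[GenericRecord].get(part)
    if (i == parts.length - 1) {
      return value.toString // reached the leaf of the dotted path
    }
    // the fix from this review thread: bail out when an intermediate value
    // is NOT a record but the dotted path still has segments left
    if (!value.isInstanceOf[GenericRecord]) {
      throw new IllegalArgumentException(s"$fieldName is not a nested record")
    }
    current = value
  }
  throw new IllegalArgumentException(s"Empty field name: $fieldName")
}
```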
}

/**
 * Create a payload class via reflection, passing in an ordering/precombine value value.
s/value value/value/
Same. will do
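The pattern itself is straightforward; a hedged sketch of what such reflective construction can look like (the `(GenericRecord, Comparable)` constructor shape and the method name are assumptions):

```scala
import org.apache.avro.generic.GenericRecord

// instantiate a user-supplied payload class by its fully-qualified name
def createPayload(payloadClassName: String, record: GenericRecord,
                  orderingVal: Comparable[_]): AnyRef = {
  Class.forName(payloadClassName)
    .getConstructor(classOf[GenericRecord], classOf[Comparable[_]])
    .newInstance(record, orderingVal)
    .asInstanceOf[AnyRef]
}
```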
      targetList
    }
  }
case MapType(StringType, valueType, _) =>
Note: Reviewed till here, continuing tomorrow.
Thanks @zqureshi. I know this is a big one :)
My comments are around style mostly.
 * Get a list of instant times that have occurred, from the given instant timestamp.
 *
 * @param instantTimestamp
 * @return
This is a nitpick, but I'd remove the empty `@return`'s in the function docs.
Removed what I saw. Fishing this out and enforcing it across the board is out of scope of this PR :)
@@ -349,7 +348,6 @@ public SourceDataFormat convert(String value) throws ParameterException {
  public static void main(String[] args) throws Exception {
    final Config cfg = new Config();
    JCommander cmd = new JCommander(cfg, args);
    // TODO(vc): Do proper validation
    if (cfg.help || args.length == 0) {
Maybe include a check to see if none of the other args have been matched.
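A hedged sketch of that suggestion, in Scala; the field names on `Config` are hypothetical:

```scala
import com.beust.jcommander.JCommander

val cfg = new Config()
val cmd = new JCommander(cfg, args: _*)
if (cfg.help || args.length == 0) {
  cmd.usage()
  System.exit(1)
}
// additionally fail fast if the required options were never matched
if (cfg.sourceFormat == null || cfg.targetBasePath == null) {
  cmd.usage()
  System.exit(1)
}
```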
    .load(basePath + "/*/*/*/*")
  fail() // we would error out, since no compaction has yet occurred.
} catch {
  case e: Exception => {
Assert the exception is the type that you expect
try {
  val hoodieROViewDF1 = spark.read.format("com.uber.hoodie")
    .load(basePath + "/*/*/*/*")
  fail() // we would error out, since no compaction has yet occurred.
add a message to the failure
fixed
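For instance, ScalaTest's `intercept` both asserts the exception type and drops the try/fail boilerplate; the concrete type thrown by an uncompacted MOR read is an assumption here:

```scala
import org.apache.spark.sql.AnalysisException

// reading the RO view before any compaction should fail
val e = intercept[AnalysisException] {
  spark.read.format("com.uber.hoodie").load(basePath + "/*/*/*/*")
}
// e.getMessage can additionally be checked for a descriptive failure message
```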
cli.run();
}

Nitpick: one newline between method calls.
fixed
Looks good. Let's get this baby merged.
Dataset<Row> hoodieROViewDF = spark.read()
    .format("com.uber.hoodie")
    // pass any path glob, can include hoodie & non-hoodie datasets
    .load(tablePath + "/*/*/*/*");
Why pass a pattern to glob? Do we not know this from partition metadata?
Because `spark.read.parquet` takes a glob of paths. The idea is to support a mix of hoodie/non-hoodie paths. Underneath, we do look up partition metadata to check if this is a hoodie dataset and so forth.
    .format("com.uber.hoodie")
    // pass any path glob, can include hoodie & non-hoodie datasets
    .load(tablePath + "/*/*/*/*");
hoodieROViewDF.registerTempTable("hoodie_ro");
Can we pass a SQL statement during the read and register it as a temp table? It would be good if we could do that.
We can. A SQL statement is executed on this table in the lines below.
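For example, once the temp table is registered, an ordinary Spark SQL statement runs against it (`_hoodie_commit_time` is one of the standard hoodie meta columns):

```scala
hoodieROViewDF.registerTempTable("hoodie_ro")
spark.sql("select _hoodie_commit_time, count(*) from hoodie_ro " +
  "group by _hoodie_commit_time").show()
```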
- Write with COW/MOR paths work fully
- Read with RO view works on both storages*
- Incremental view supported on COW
- Refactored out HoodieReadClient methods, to just contain key based access
- HoodieDataSourceHelpers class can now be used to construct inputs to datasource
- Tests in hoodie-client using new helpers and mechanisms
- Basic tests around save modes & insert/upserts (more to follow)
- Bumped up scala to 2.11, since 2.10 is deprecated & complains with scalatest
- Updated documentation to describe usage
- New sample app written using the DataSource API
 * Default: upsert()
 */
val OPERATION_OPT_KEY = "hoodie.datasource.write.operation"
val BULK_INSERT_OPERATION_OPT_VAL = "bulk_insert"
All read constant values are upper case whereas all write are lower. Is that intentional?
Not intentional. But write constants are named consistently, as below, right?
`val DEFAULT_PRECOMBINE_FIELD_OPT_VAL = "ts"`
All vals are upper case... Maybe I am not following along.
`val VIEW_TYPE_INCREMENTAL_OPT_VAL = "INCREMENTAL"`
vs
`val BULK_INSERT_OPERATION_OPT_VAL = "bulk_insert"`
I overgeneralized, but basically my question was that the value is all caps for some constants but lowercase for most others.
Ah okay. no reason. made it consistent now.
// use schema from a file produced in the latest instant
val latestSchema = {
  val latestMeta = HoodieCommitMetadata
    .fromBytes(commitTimeline.getInstantDetails(commitsToReturn.last).get)
This is quite neat. :)
:) As long as things are backwards compatible, queries should work!
Looking great! 🚢
Thanks @zqureshi for the diligent review. Addressed all comments and added a few tests too. Merging soon.
Force-pushed from c41b0c7 to 31cd0e9.
Fixes #7