From 8e5af71b49376d2cf694744f49747cd75a2b8e8e Mon Sep 17 00:00:00 2001
From: Vinoth Chandar
Date: Wed, 3 Oct 2018 18:02:09 +0100
Subject: [PATCH] Add --filter-dupes to DeltaStreamer

- Optionally filter out duplicates before inserting data
- Unit tests
---
 .../java/com/uber/hoodie/DataSourceUtils.java | 16 ++++++++++
 .../deltastreamer/HoodieDeltaStreamer.java    | 18 ++++++++++-
 .../utilities/TestHoodieDeltaStreamer.java    | 32 +++++++++++++++++++
 3 files changed, 65 insertions(+), 1 deletion(-)

diff --git a/hoodie-spark/src/main/java/com/uber/hoodie/DataSourceUtils.java b/hoodie-spark/src/main/java/com/uber/hoodie/DataSourceUtils.java
index 5c0d27248daa7..f953b2a3d9dc7 100644
--- a/hoodie-spark/src/main/java/com/uber/hoodie/DataSourceUtils.java
+++ b/hoodie-spark/src/main/java/com/uber/hoodie/DataSourceUtils.java
@@ -26,6 +26,7 @@
 import com.uber.hoodie.config.HoodieCompactionConfig;
 import com.uber.hoodie.config.HoodieIndexConfig;
 import com.uber.hoodie.config.HoodieWriteConfig;
+import com.uber.hoodie.exception.DatasetNotFoundException;
 import com.uber.hoodie.exception.HoodieException;
 import com.uber.hoodie.exception.HoodieNotSupportedException;
 import com.uber.hoodie.index.HoodieIndex;
@@ -142,4 +143,19 @@ public static HoodieRecord createHoodieRecord(GenericRecord gr, Comparable order
     HoodieRecordPayload payload = DataSourceUtils.createPayload(payloadClass, gr, orderingVal);
     return new HoodieRecord<>(hKey, payload);
   }
+
+  @SuppressWarnings("unchecked")
+  public static JavaRDD<HoodieRecord> dropDuplicates(JavaSparkContext jssc,
+      JavaRDD<HoodieRecord> incomingHoodieRecords,
+      HoodieWriteConfig writeConfig) throws Exception {
+    try {
+      HoodieReadClient client = new HoodieReadClient<>(jssc, writeConfig);
+      return client.tagLocation(incomingHoodieRecords)
+          .filter(r -> !((HoodieRecord) r).isCurrentLocationKnown());
+    } catch (DatasetNotFoundException e) {
+      // this is executed when there is no hoodie dataset yet,
+      // so there are no duplicates to drop
+      return incomingHoodieRecords;
+    }
+  }
 }
diff --git a/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/deltastreamer/HoodieDeltaStreamer.java b/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/deltastreamer/HoodieDeltaStreamer.java
index 835e7fa66a610..c3573fdeb0c42 100644
--- a/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/deltastreamer/HoodieDeltaStreamer.java
+++ b/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/deltastreamer/HoodieDeltaStreamer.java
@@ -179,8 +179,20 @@ public void sync() throws Exception {
       return new HoodieRecord<>(keyGenerator.getKey(gr), payload);
     });
 
-    // Perform the write
+    // Filter out duplicates if requested
     HoodieWriteConfig hoodieCfg = getHoodieClientConfig();
+    if (cfg.filterDupes) {
+      // turn upserts into plain inserts, since records already in the dataset are dropped
+      cfg.operation = cfg.operation == Operation.UPSERT ? Operation.INSERT : cfg.operation;
+      records = DataSourceUtils.dropDuplicates(jssc, records, hoodieCfg);
+    }
+
+    if (records.isEmpty()) {
+      log.info("No new data, nothing to commit.");
"); + return; + } + + // Perform the write HoodieWriteClient client = new HoodieWriteClient<>(jssc, hoodieCfg); String commitTime = client.startCommit(); log.info("Starting commit : " + commitTime); @@ -285,6 +297,10 @@ public static class Config implements Serializable { converter = OperationConvertor.class) public Operation operation = Operation.UPSERT; + @Parameter(names = {"--filter-dupes"}, description = "Should duplicate records from source be dropped/filtered out" + + "before insert/bulk-insert") + public Boolean filterDupes = false; + @Parameter(names = {"--spark-master"}, description = "spark master to use.") public String sparkMaster = "local[2]"; diff --git a/hoodie-utilities/src/test/java/com/uber/hoodie/utilities/TestHoodieDeltaStreamer.java b/hoodie-utilities/src/test/java/com/uber/hoodie/utilities/TestHoodieDeltaStreamer.java index 1a17a687c00b9..63d93b4144a59 100644 --- a/hoodie-utilities/src/test/java/com/uber/hoodie/utilities/TestHoodieDeltaStreamer.java +++ b/hoodie-utilities/src/test/java/com/uber/hoodie/utilities/TestHoodieDeltaStreamer.java @@ -32,10 +32,12 @@ import com.uber.hoodie.utilities.deltastreamer.HoodieDeltaStreamer.Operation; import com.uber.hoodie.utilities.sources.TestDataSource; import java.io.IOException; +import java.util.List; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.log4j.LogManager; import org.apache.log4j.Logger; +import org.apache.spark.sql.Row; import org.apache.spark.sql.SQLContext; import org.junit.After; import org.junit.AfterClass; @@ -103,6 +105,11 @@ static void assertRecordCount(long expected, String datasetPath, SQLContext sqlC assertEquals(expected, recordCount); } + static List countsPerCommit(String datasetPath, SQLContext sqlContext) { + return sqlContext.read().format("com.uber.hoodie").load(datasetPath).groupBy("_hoodie_commit_time").count() + .sort("_hoodie_commit_time").collectAsList(); + } + static void assertCommitMetadata(String expected, String datasetPath, FileSystem fs, int totalCommits) throws IOException { HoodieTableMetaClient meta = new HoodieTableMetaClient(fs.getConf(), datasetPath); @@ -159,5 +166,30 @@ public void testBulkInsertsAndUpserts() throws Exception { new HoodieDeltaStreamer(cfg, jsc).sync(); TestHelpers.assertRecordCount(2000, datasetBasePath + "/*/*.parquet", sqlContext); TestHelpers.assertCommitMetadata("00001", datasetBasePath, dfs, 2); + List counts = TestHelpers.countsPerCommit(datasetBasePath + "/*/*.parquet", sqlContext); + assertEquals(2000, counts.get(0).getLong(1)); + } + + @Test + public void testFilterDupes() throws Exception { + String datasetBasePath = dfsBasePath + "/test_dupes_dataset"; + + // Initial bulk insert + HoodieDeltaStreamer.Config cfg = TestHelpers.makeConfig(datasetBasePath, Operation.BULK_INSERT); + new HoodieDeltaStreamer(cfg, jsc).sync(); + TestHelpers.assertRecordCount(1000, datasetBasePath + "/*/*.parquet", sqlContext); + TestHelpers.assertCommitMetadata("00000", datasetBasePath, dfs, 1); + + // Generate the same 1000 records + 1000 new ones for upsert + cfg.filterDupes = true; + cfg.sourceLimit = 2000; + cfg.operation = Operation.UPSERT; + new HoodieDeltaStreamer(cfg, jsc).sync(); + TestHelpers.assertRecordCount(2000, datasetBasePath + "/*/*.parquet", sqlContext); + TestHelpers.assertCommitMetadata("00001", datasetBasePath, dfs, 2); + // 1000 records for commit 00000 & 1000 for commit 00001 + List counts = TestHelpers.countsPerCommit(datasetBasePath + "/*/*.parquet", sqlContext); + assertEquals(1000, 
+    assertEquals(1000, counts.get(1).getLong(1));
   }
 }
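
Usage note (not part of the diff): a minimal sketch of how the new flag is exercised programmatically, mirroring testFilterDupes above. Only the fields this patch touches (filterDupes, sourceLimit, operation) are shown; datasetBasePath and jsc are assumed to come from the surrounding test scope, and the rest of the Config (target path, source class, schema provider) is assumed to be filled in by TestHelpers.makeConfig as in the tests, so treat this as an illustration rather than a complete program.

    // Hypothetical follow-up sync with dedupe filtering enabled (equivalent to passing --filter-dupes)
    HoodieDeltaStreamer.Config cfg = TestHelpers.makeConfig(datasetBasePath, Operation.UPSERT);
    cfg.filterDupes = true;   // drop incoming records whose keys already exist in the dataset
    cfg.sourceLimit = 2000;   // pull at most 2000 records from the source
    new HoodieDeltaStreamer(cfg, jsc).sync();
    // with filterDupes set, an UPSERT operation is internally downgraded to INSERT
    // after the pre-existing records have been filtered out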