Add --filter-dupes to DeltaStreamer #478

Merged: 1 commit, Oct 4, 2018
hoodie-spark/src/main/java/com/uber/hoodie/DataSourceUtils.java (16 additions, 0 deletions)
@@ -26,6 +26,7 @@
import com.uber.hoodie.config.HoodieCompactionConfig;
import com.uber.hoodie.config.HoodieIndexConfig;
import com.uber.hoodie.config.HoodieWriteConfig;
import com.uber.hoodie.exception.DatasetNotFoundException;
import com.uber.hoodie.exception.HoodieException;
import com.uber.hoodie.exception.HoodieNotSupportedException;
import com.uber.hoodie.index.HoodieIndex;
@@ -142,4 +143,19 @@ public static HoodieRecord createHoodieRecord(GenericRecord gr, Comparable order
    HoodieRecordPayload payload = DataSourceUtils.createPayload(payloadClass, gr, orderingVal);
    return new HoodieRecord<>(hKey, payload);
  }

Review comment (Member, Author): @leletan I reused your method here.. if I land first, then you can just add a new overload that takes parameters and call this..

Reply (Contributor): @vinothchandar Ack. Thanks for letting me know

  @SuppressWarnings("unchecked")
  public static JavaRDD<HoodieRecord> dropDuplicates(JavaSparkContext jssc,
      JavaRDD<HoodieRecord> incomingHoodieRecords,
      HoodieWriteConfig writeConfig) throws Exception {
    try {
      HoodieReadClient client = new HoodieReadClient<>(jssc, writeConfig);
      return client.tagLocation(incomingHoodieRecords)
          .filter(r -> !((HoodieRecord<HoodieRecordPayload>) r).isCurrentLocationKnown());
    } catch (DatasetNotFoundException e) {
      // this will be executed when there is no hoodie dataset yet
      // so no dups to drop
      return incomingHoodieRecords;
    }
  }
}
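
As a follow-up to the review thread above, the suggested overload ("takes parameters and call this") might look roughly like the sketch below. This is only an illustration: the java.util.Map parameter, the "path" key, and the withProps(...) builder call are assumptions about the surrounding API, not part of this diff.

  // Hypothetical overload sketched from the review discussion: build a
  // HoodieWriteConfig from raw key/value options, then delegate to the
  // dropDuplicates(jssc, records, writeConfig) method added in this PR.
  public static JavaRDD<HoodieRecord> dropDuplicates(JavaSparkContext jssc,
      JavaRDD<HoodieRecord> incomingHoodieRecords,
      Map<String, String> parameters) throws Exception {
    // Assumes "path" carries the dataset base path and that the builder accepts raw props.
    HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder()
        .withPath(parameters.get("path"))
        .withProps(parameters)
        .build();
    return dropDuplicates(jssc, incomingHoodieRecords, writeConfig);
  }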
com/uber/hoodie/utilities/deltastreamer/HoodieDeltaStreamer.java

@@ -179,8 +179,20 @@ public void sync() throws Exception {
      return new HoodieRecord<>(keyGenerator.getKey(gr), payload);
    });

    // filter dupes if needed
    HoodieWriteConfig hoodieCfg = getHoodieClientConfig();
    if (cfg.filterDupes) {
      // turn upserts into inserts: duplicates are dropped up front, so only new records remain
      cfg.operation = cfg.operation == Operation.UPSERT ? Operation.INSERT : cfg.operation;
      records = DataSourceUtils.dropDuplicates(jssc, records, hoodieCfg);
    }

    if (records.isEmpty()) {
      log.info("No new data, nothing to commit.. ");
      return;
    }

    // Perform the write
    HoodieWriteClient client = new HoodieWriteClient<>(jssc, hoodieCfg);
    String commitTime = client.startCommit();
    log.info("Starting commit : " + commitTime);
@@ -285,6 +297,10 @@ public static class Config implements Serializable {
        converter = OperationConvertor.class)
    public Operation operation = Operation.UPSERT;

    @Parameter(names = {"--filter-dupes"}, description = "Should duplicate records from source be dropped/filtered out "
        + "before insert/bulk-insert")
    public Boolean filterDupes = false;

    @Parameter(names = {"--spark-master"}, description = "spark master to use.")
    public String sparkMaster = "local[2]";

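For reference, the new flag is used like this: set filterDupes (via --filter-dupes on the command line or directly on Config) and run a sync; an UPSERT operation is downgraded to INSERT and records already present in the dataset are dropped before the write. The sketch below is illustrative only: makeBaseConfig() is a hypothetical helper standing in for the remaining required options (target path, source, schema provider), which are not part of this diff, and jsc is assumed to be an existing JavaSparkContext.

  // Minimal usage sketch, assuming the rest of the configuration is supplied elsewhere.
  HoodieDeltaStreamer.Config cfg = makeBaseConfig();  // hypothetical helper
  cfg.operation = Operation.UPSERT;                   // downgraded to INSERT internally when filterDupes is set
  cfg.filterDupes = true;                             // enable the new --filter-dupes behaviour
  new HoodieDeltaStreamer(cfg, jsc).sync();           // duplicate records are filtered out before the write
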
com/uber/hoodie/utilities/TestHoodieDeltaStreamer.java

@@ -32,10 +32,12 @@
import com.uber.hoodie.utilities.deltastreamer.HoodieDeltaStreamer.Operation;
import com.uber.hoodie.utilities.sources.TestDataSource;
import java.io.IOException;
import java.util.List;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SQLContext;
import org.junit.After;
import org.junit.AfterClass;
@@ -103,6 +105,11 @@ static void assertRecordCount(long expected, String datasetPath, SQLContext sqlC
    assertEquals(expected, recordCount);
  }

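  // Reads the dataset back and returns one Row per commit: (_hoodie_commit_time, record count), sorted by commit time.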
  static List<Row> countsPerCommit(String datasetPath, SQLContext sqlContext) {
    return sqlContext.read().format("com.uber.hoodie").load(datasetPath).groupBy("_hoodie_commit_time").count()
        .sort("_hoodie_commit_time").collectAsList();
  }

  static void assertCommitMetadata(String expected, String datasetPath, FileSystem fs, int totalCommits)
      throws IOException {
    HoodieTableMetaClient meta = new HoodieTableMetaClient(fs.getConf(), datasetPath);
@@ -159,5 +166,30 @@ public void testBulkInsertsAndUpserts() throws Exception {
    new HoodieDeltaStreamer(cfg, jsc).sync();
    TestHelpers.assertRecordCount(2000, datasetBasePath + "/*/*.parquet", sqlContext);
    TestHelpers.assertCommitMetadata("00001", datasetBasePath, dfs, 2);
    List<Row> counts = TestHelpers.countsPerCommit(datasetBasePath + "/*/*.parquet", sqlContext);
    assertEquals(2000, counts.get(0).getLong(1));
  }

  @Test
  public void testFilterDupes() throws Exception {
    String datasetBasePath = dfsBasePath + "/test_dupes_dataset";

    // Initial bulk insert
    HoodieDeltaStreamer.Config cfg = TestHelpers.makeConfig(datasetBasePath, Operation.BULK_INSERT);
    new HoodieDeltaStreamer(cfg, jsc).sync();
    TestHelpers.assertRecordCount(1000, datasetBasePath + "/*/*.parquet", sqlContext);
    TestHelpers.assertCommitMetadata("00000", datasetBasePath, dfs, 1);

    // Generate the same 1000 records + 1000 new ones for upsert
    cfg.filterDupes = true;
    cfg.sourceLimit = 2000;
    cfg.operation = Operation.UPSERT;
    new HoodieDeltaStreamer(cfg, jsc).sync();
    TestHelpers.assertRecordCount(2000, datasetBasePath + "/*/*.parquet", sqlContext);
    TestHelpers.assertCommitMetadata("00001", datasetBasePath, dfs, 2);
    // 1000 records for commit 00000 & 1000 for commit 00001
    List<Row> counts = TestHelpers.countsPerCommit(datasetBasePath + "/*/*.parquet", sqlContext);
    assertEquals(1000, counts.get(0).getLong(1));
    assertEquals(1000, counts.get(1).getLong(1));
  }
}