Sync Tool registers 2 tables, RO and RT Tables
n3nash committed Jun 28, 2017
1 parent 754ab88 commit 36a1947
Showing 3 changed files with 96 additions and 25 deletions.
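This commit makes HiveSyncTool register two Hive tables for a MERGE_ON_READ dataset: a read-optimized (RO) view under the configured table name, and a real-time (RT) view under the same name plus the "_rt" suffix. A minimal sketch of driving the tool, assuming a populated HiveSyncConfig (the no-arg constructor is an assumption) and already-built HiveConf/FileSystem handles; hiveConf, fs, and the table name "trips" are placeholders:

    // Sketch only; constructor and field usage mirrors the tests further down.
    HiveSyncConfig cfg = new HiveSyncConfig();   // assumes a no-arg constructor
    cfg.tableName = "trips";                     // hypothetical table name

    HiveSyncTool tool = new HiveSyncTool(cfg, hiveConf, fs);
    tool.syncHoodieTable();
    // For a MERGE_ON_READ dataset this registers two Hive tables:
    //   trips     -> RO view, HoodieInputFormat
    //   trips_rt  -> RT view, HoodieRealtimeInputFormat
    //   ("_rt" comes from HiveSyncTool.SUFFIX_REALTIME_TABLE)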
64 changes: 39 additions & 25 deletions hoodie-hive/src/main/java/com/uber/hoodie/hive/HiveSyncTool.java
@@ -26,10 +26,6 @@
import com.uber.hoodie.hive.HoodieHiveClient.PartitionEvent;
import com.uber.hoodie.hive.HoodieHiveClient.PartitionEvent.PartitionEventType;
import com.uber.hoodie.hive.util.SchemaUtil;
-import java.util.List;
-import java.util.Map;
-import java.util.Optional;
-import java.util.stream.Collectors;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.api.Partition;
@@ -39,6 +35,12 @@
import org.slf4j.LoggerFactory;
import parquet.schema.MessageType;

+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.stream.Collectors;
+
+
/**
* Tool to sync a hoodie HDFS dataset with a hive metastore table.
* Either use it as an API, HiveSyncTool.syncHoodieTable(HiveSyncConfig)
@@ -53,6 +55,7 @@ public class HiveSyncTool {

private static Logger LOG = LoggerFactory.getLogger(HiveSyncTool.class);
private final HoodieHiveClient hoodieHiveClient;
+  public final static String SUFFIX_REALTIME_TABLE = "_rt";
private final HiveSyncConfig cfg;

public HiveSyncTool(HiveSyncConfig cfg, HiveConf configuration, FileSystem fs) {
@@ -61,15 +64,38 @@ public HiveSyncTool(HiveSyncConfig cfg, HiveConf configuration, FileSystem fs) {
}

public void syncHoodieTable() {
LOG.info("Trying to sync hoodie table" + cfg.tableName + " with base path " + hoodieHiveClient
switch(hoodieHiveClient.getTableType()) {
case COPY_ON_WRITE:
syncHoodieTable(false);
break;
case MERGE_ON_READ:
//sync a RO table for MOR
syncHoodieTable(false);
String originalTableName = cfg.tableName;
//TODO : Make realtime table registration optional using a config param
cfg.tableName = cfg.tableName + SUFFIX_REALTIME_TABLE;
//sync a RT table for MOR
syncHoodieTable(true);
cfg.tableName = originalTableName;
break;
default:
LOG.error("Unknown table type " + hoodieHiveClient.getTableType());
throw new InvalidDatasetException(hoodieHiveClient.getBasePath());
}
hoodieHiveClient.close();
}

private void syncHoodieTable(boolean isRealTime) {
LOG.info("Trying to sync hoodie table " + cfg.tableName + " with base path " + hoodieHiveClient
.getBasePath() + " of type " + hoodieHiveClient
.getTableType());

// Check if the necessary table exists
boolean tableExists = hoodieHiveClient.doesTableExist();
// Get the parquet schema for this dataset looking at the latest commit
MessageType schema = hoodieHiveClient.getDataSchema();
// Sync schema if needed
-    syncSchema(tableExists, schema);
+    syncSchema(tableExists, isRealTime, schema);

LOG.info("Schema sync complete. Syncing partitions for " + cfg.tableName);
// Get the last time we successfully synced partitions
@@ -86,8 +112,6 @@ public void syncHoodieTable() {

hoodieHiveClient.updateLastCommitTimeSynced();
LOG.info("Sync complete for " + cfg.tableName);

hoodieHiveClient.close();
}

/**
@@ -97,29 +121,19 @@ public void syncHoodieTable() {
* @param tableExists - does table exist
* @param schema - extracted schema
*/
-  private void syncSchema(boolean tableExists, MessageType schema) {
+  private void syncSchema(boolean tableExists, boolean isRealTime, MessageType schema) {
// Check and sync schema
if (!tableExists) {
LOG.info("Table " + cfg.tableName + " is not found. Creating it");
-      switch (hoodieHiveClient.getTableType()) {
-        case COPY_ON_WRITE:
-          hoodieHiveClient.createTable(schema, HoodieInputFormat.class.getName(),
-              MapredParquetOutputFormat.class.getName(), ParquetHiveSerDe.class.getName());
-          break;
-        case MERGE_ON_READ:
-          // create RT Table
-          hoodieHiveClient.createTable(schema, HoodieRealtimeInputFormat.class.getName(),
-              MapredParquetOutputFormat.class.getName(), ParquetHiveSerDe.class.getName());
-          // TODO - create RO Table
-          break;
-        default:
-          LOG.error("Unknown table type " + hoodieHiveClient.getTableType());
-          throw new InvalidDatasetException(hoodieHiveClient.getBasePath());
-      }
+      if (!isRealTime) {
+        // TODO - RO Table for MOR only after major compaction (UnboundedCompaction is default for now)
+        hoodieHiveClient.createTable(schema, HoodieInputFormat.class.getName(),
+            MapredParquetOutputFormat.class.getName(), ParquetHiveSerDe.class.getName());
+      } else {
+        // Custom serde will not work with ALTER TABLE REPLACE COLUMNS
+        // https://github.com/apache/hive/blob/release-1.1.0/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java#L3488
+        // Need a fix to check instance of
+        // hoodieHiveClient.createTable(schema, HoodieRealtimeInputFormat.class.getName(),
+        //     MapredParquetOutputFormat.class.getName(), HoodieParquetSerde.class.getName());
+        hoodieHiveClient.createTable(schema, HoodieRealtimeInputFormat.class.getName(),
+            MapredParquetOutputFormat.class.getName(), ParquetHiveSerDe.class.getName());
+      }
} else {
// Check if the dataset schema has evolved
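The net effect of the syncSchema() change above: the RO table keeps HoodieInputFormat while the RT table gets HoodieRealtimeInputFormat, and ParquetHiveSerDe is used for both until the custom-serde/ALTER TABLE limitation linked in the diff is resolved. A condensed sketch of that selection; the pickFormats helper is hypothetical, the class names come from the diff:

    // Hypothetical helper condensing the input-format choice made in syncSchema().
    // ParquetHiveSerDe serves both views because a custom serde breaks
    // ALTER TABLE REPLACE COLUMNS (see the Hive DDLTask link above).
    static String[] pickFormats(boolean isRealTime) {
      String inputFormat = isRealTime
          ? HoodieRealtimeInputFormat.class.getName()  // merges log files at read time
          : HoodieInputFormat.class.getName();         // reads compacted parquet only
      return new String[] {
          inputFormat,
          MapredParquetOutputFormat.class.getName(),
          ParquetHiveSerDe.class.getName()
      };
    }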
@@ -305,4 +305,59 @@ public void testSyncMergeOnRead
hiveClient.getLastCommitTimeSynced().get());
}

+  @Test
+  public void testSyncMergeOnReadRT()
+      throws IOException, InitializationError, URISyntaxException, TException, InterruptedException {
+    String commitTime = "100";
+    String deltaCommitTime = "101";
+    String roTablename = TestUtil.hiveSyncConfig.tableName;
+    TestUtil.hiveSyncConfig.tableName = TestUtil.hiveSyncConfig.tableName + HiveSyncTool.SUFFIX_REALTIME_TABLE;
+    TestUtil.createMORDataset(commitTime, deltaCommitTime, 5);
+    HoodieHiveClient hiveClientRT = new HoodieHiveClient(TestUtil.hiveSyncConfig,
+        TestUtil.getHiveConf(), TestUtil.fileSystem);
+
+    assertFalse("Table " + TestUtil.hiveSyncConfig.tableName + " should not exist initially",
+        hiveClientRT.doesTableExist());
+
+    // Let's do the sync
+    HiveSyncTool tool = new HiveSyncTool(TestUtil.hiveSyncConfig, TestUtil.getHiveConf(),
+        TestUtil.fileSystem);
+    tool.syncHoodieTable();
+
+    assertTrue("Table " + TestUtil.hiveSyncConfig.tableName + " should exist after sync completes",
+        hiveClientRT.doesTableExist());
+
+    assertEquals("Hive Schema should match the dataset schema + partition field",
+        hiveClientRT.getTableSchema().size(), SchemaTestUtil.getSimpleSchema().getFields().size() + 1);
+    assertEquals("Table partitions should match the number of partitions we wrote", 5,
+        hiveClientRT.scanTablePartitions().size());
+    assertEquals("The last commit that was synced should be updated in the TBLPROPERTIES",
+        deltaCommitTime,
+        hiveClientRT.getLastCommitTimeSynced().get());
+
+    // Now let's create more partitions and these are the only ones which need to be synced
+    DateTime dateTime = DateTime.now().plusDays(6);
+    String commitTime2 = "102";
+    String deltaCommitTime2 = "103";
+
+    TestUtil.addCOWPartitions(1, true, dateTime, commitTime2);
+    TestUtil.addMORPartitions(1, true, false, dateTime, commitTime2, deltaCommitTime2);
+    // Let's do the sync
+    tool = new HiveSyncTool(TestUtil.hiveSyncConfig, TestUtil.getHiveConf(),
+        TestUtil.fileSystem);
+    tool.syncHoodieTable();
+    hiveClientRT = new HoodieHiveClient(TestUtil.hiveSyncConfig,
+        TestUtil.getHiveConf(), TestUtil.fileSystem);
+
+    assertEquals("Hive Schema should match the evolved dataset schema + partition field",
+        hiveClientRT.getTableSchema().size(), SchemaTestUtil.getEvolvedSchema().getFields().size() + 1);
+    // Sync should add the one partition
+    assertEquals("The 2 partitions we wrote should be added to hive", 6,
+        hiveClientRT.scanTablePartitions().size());
+    assertEquals("The last commit that was synced should be 103",
+        deltaCommitTime2,
+        hiveClientRT.getLastCommitTimeSynced().get());
+    TestUtil.hiveSyncConfig.tableName = roTablename;
+  }

}
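Because syncHoodieTable() temporarily rewrites cfg.tableName to append the _rt suffix, callers that reuse the config (as the test above does with roTablename) should verify each registration under its own name. A hedged sketch, assuming doesTableExist() resolves against cfg.tableName as in the diff; hiveConf and fs are placeholders:

    // Sketch only: verify both registrations after a MOR sync.
    String roName = cfg.tableName;
    HoodieHiveClient roClient = new HoodieHiveClient(cfg, hiveConf, fs);
    boolean roExists = roClient.doesTableExist();   // <table>

    cfg.tableName = roName + HiveSyncTool.SUFFIX_REALTIME_TABLE;
    HoodieHiveClient rtClient = new HoodieHiveClient(cfg, hiveConf, fs);
    boolean rtExists = rtClient.doesTableExist();   // <table>_rt
    cfg.tableName = roName;                         // restore, mirroring the test's cleanup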
2 changes: 2 additions & 0 deletions hoodie-hive/src/test/java/com/uber/hoodie/hive/TestUtil.java
@@ -175,6 +175,7 @@ static void createMORDataset(String commitTime, String deltaCommitTime, int numb
DateTime dateTime = DateTime.now();
HoodieCommitMetadata commitMetadata = createPartitions(numberOfPartitions, true, dateTime, commitTime);
createdTablesSet.add(hiveSyncConfig.databaseName + "." + hiveSyncConfig.tableName);
+    createdTablesSet.add(hiveSyncConfig.databaseName + "." + hiveSyncConfig.tableName + HiveSyncTool.SUFFIX_REALTIME_TABLE);
HoodieCompactionMetadata compactionMetadata = new HoodieCompactionMetadata();
commitMetadata.getPartitionToWriteStats()
.forEach((key, value) -> value.stream().map(k -> new CompactionWriteStat(k, key, 0, 0, 0))
@@ -201,6 +202,7 @@ static void addMORPartitions(int numberOfPartitions, boolean isParquetSchemaSimp
HoodieCommitMetadata commitMetadata = createPartitions(numberOfPartitions,
isParquetSchemaSimple, startFrom, commitTime);
createdTablesSet.add(hiveSyncConfig.databaseName + "." + hiveSyncConfig.tableName);
+    createdTablesSet.add(hiveSyncConfig.databaseName + "." + hiveSyncConfig.tableName + HiveSyncTool.SUFFIX_REALTIME_TABLE);
HoodieCompactionMetadata compactionMetadata = new HoodieCompactionMetadata();
commitMetadata.getPartitionToWriteStats()
.forEach((key, value) -> value.stream().map(k -> new CompactionWriteStat(k, key, 0, 0, 0))
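TestUtil now records the RT table name in createdTablesSet as well, so that whatever teardown the harness runs can drop both registrations. A hedged sketch of such a sweep; the updateHiveSQL helper is an assumption, not a confirmed HoodieHiveClient method:

    // Assumed cleanup loop; hiveClient.updateHiveSQL is a hypothetical helper
    // standing in for however the harness actually issues DDL.
    for (String fqTableName : createdTablesSet) {
      // entries look like "testdb.trips" and "testdb.trips_rt"
      hiveClient.updateHiveSQL("drop table if exists " + fqTableName);
    }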
