[HUDI-4416] Default database path for hoodie hive catalog (apache#6136)
(cherry picked from commit 6c35780)
danny0405 authored and XuQianJin-Stars committed Aug 2, 2022
1 parent f6e7184 commit 22516bd
Showing 9 changed files with 123 additions and 132 deletions.
25 changes: 13 additions & 12 deletions hudi-flink-datasource/hudi-flink/pom.xml
@@ -262,18 +262,6 @@
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>javax.transaction</groupId>
<artifactId>jta</artifactId>
<version>1.1</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>javax.transaction</groupId>
<artifactId>javax.transaction-api</artifactId>
<version>1.3</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>${hive.groupid}</groupId>
<artifactId>hive-metastore</artifactId>
@@ -395,5 +383,18 @@
<version>${flink.version}</version>
<scope>test</scope>
</dependency>
<!-- Hive dependencies -->
<dependency>
<groupId>javax.transaction</groupId>
<artifactId>jta</artifactId>
<version>1.1</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>javax.transaction</groupId>
<artifactId>javax.transaction-api</artifactId>
<version>1.3</version>
<scope>test</scope>
</dependency>
</dependencies>
</project>
@@ -19,11 +19,13 @@
package org.apache.hudi.table.catalog;

import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.configuration.FlinkOptions;

import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.hadoop.hive.metastore.api.Table;
import org.apache.hadoop.hive.serde2.typeinfo.CharTypeInfo;
@@ -37,7 +39,6 @@
import org.apache.hadoop.hive.serde2.typeinfo.VarcharTypeInfo;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -60,9 +61,8 @@ public static org.apache.flink.table.api.Schema convertTableSchema(Table hiveTab
allCols.addAll(hiveTable.getPartitionKeys());

String pkConstraintName = hiveTable.getParameters().get(TableOptionProperties.PK_CONSTRAINT_NAME);
List<String> primaryColNames = StringUtils.isNullOrEmpty(pkConstraintName)
? Collections.EMPTY_LIST
: StringUtils.split(hiveTable.getParameters().get(TableOptionProperties.PK_COLUMNS),",");
String pkColumnStr = hiveTable.getParameters().getOrDefault(FlinkOptions.RECORD_KEY_FIELD.key(), FlinkOptions.RECORD_KEY_FIELD.defaultValue());
List<String> pkColumns = StringUtils.split(pkColumnStr,",");

String[] colNames = new String[allCols.size()];
DataType[] colTypes = new DataType[allCols.size()];
@@ -73,14 +73,16 @@ public static org.apache.flink.table.api.Schema convertTableSchema(Table hiveTab
colNames[i] = fs.getName();
colTypes[i] =
toFlinkType(TypeInfoUtils.getTypeInfoFromTypeString(fs.getType()));
if (primaryColNames.contains(colNames[i])) {
if (pkColumns.contains(colNames[i])) {
colTypes[i] = colTypes[i].notNull();
}
}

org.apache.flink.table.api.Schema.Builder builder = org.apache.flink.table.api.Schema.newBuilder().fromFields(colNames, colTypes);
if (!StringUtils.isNullOrEmpty(pkConstraintName)) {
builder.primaryKeyNamed(pkConstraintName, primaryColNames);
builder.primaryKeyNamed(pkConstraintName, pkColumns);
} else {
builder.primaryKey(pkColumns);
}

return builder.build();
@@ -152,7 +154,8 @@ private static DataType toFlinkPrimitiveType(PrimitiveTypeInfo hiveType) {
case DATE:
return DataTypes.DATE();
case TIMESTAMP:
return DataTypes.TIMESTAMP(9);
// see org.apache.hudi.hive.util.HiveSchemaUtil#convertField for details.
return DataTypes.TIMESTAMP(6);
case BINARY:
return DataTypes.BYTES();
case DECIMAL:
@@ -168,16 +171,18 @@ private static DataType toFlinkPrimitiveType(PrimitiveTypeInfo hiveType) {

/** Create Hive columns from Flink TableSchema. */
public static List<FieldSchema> createHiveColumns(TableSchema schema) {
String[] fieldNames = schema.getFieldNames();
DataType[] fieldTypes = schema.getFieldDataTypes();
final DataType dataType = schema.toPersistedRowDataType();
final RowType rowType = (RowType) dataType.getLogicalType();
final String[] fieldNames = rowType.getFieldNames().toArray(new String[0]);
final DataType[] fieldTypes = dataType.getChildren().toArray(new DataType[0]);

List<FieldSchema> columns = new ArrayList<>(fieldNames.length);

for (int i = 0; i < fieldNames.length; i++) {
columns.add(
new FieldSchema(
fieldNames[i],
toHiveTypeInfo(fieldTypes[i], true).getTypeName(),
toHiveTypeInfo(fieldTypes[i]).getTypeName(),
null));
}

@@ -191,13 +196,12 @@ public static List<FieldSchema> createHiveColumns(TableSchema schema) {
* checkPrecision is true.
*
* @param dataType a Flink DataType
* @param checkPrecision whether to fail the conversion if the precision of the DataType is not
* supported by Hive
*
* @return the corresponding Hive data type
*/
public static TypeInfo toHiveTypeInfo(DataType dataType, boolean checkPrecision) {
public static TypeInfo toHiveTypeInfo(DataType dataType) {
checkNotNull(dataType, "type cannot be null");
LogicalType logicalType = dataType.getLogicalType();
return logicalType.accept(new TypeInfoLogicalTypeVisitor(dataType, checkPrecision));
return logicalType.accept(new TypeInfoLogicalTypeVisitor(dataType));
}
}
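The reworked convertTableSchema above now derives the primary-key columns from the table's record-key parameter (FlinkOptions.RECORD_KEY_FIELD, falling back to the option's default) rather than the removed PK_COLUMNS property, and forces those columns to NOT NULL before declaring the key. A minimal sketch of that selection logic using only JDK types; the literal key and default value below are assumptions standing in for FlinkOptions.RECORD_KEY_FIELD.key() and defaultValue(), not values taken from this diff:

import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class RecordKeyPkSketch {
  // Stand-ins for FlinkOptions.RECORD_KEY_FIELD.key()/defaultValue(); the exact strings are assumptions.
  static final String RECORD_KEY_KEY = "hoodie.datasource.write.recordkey.field";
  static final String RECORD_KEY_DEFAULT = "uuid";

  public static void main(String[] args) {
    Map<String, String> tableParams = new HashMap<>();
    tableParams.put(RECORD_KEY_KEY, "id,name");

    // Same shape as the patch: read the record-key parameter, split it, and flag key columns.
    String pkColumnStr = tableParams.getOrDefault(RECORD_KEY_KEY, RECORD_KEY_DEFAULT);
    List<String> pkColumns = Arrays.asList(pkColumnStr.split(","));

    String[] colNames = {"id", "name", "age"};
    for (String col : colNames) {
      // Columns that participate in the record key become NOT NULL in the Flink schema.
      System.out.println(col + " notNull=" + pkColumns.contains(col));
    }
  }
}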
@@ -34,7 +34,6 @@
import java.util.Set;

import static org.apache.flink.table.factories.FactoryUtil.PROPERTY_VERSION;
import static org.apache.hudi.table.catalog.CatalogOptions.CATALOG_PATH;

/**
* A catalog factory impl that creates {@link HoodieCatalog}.
@@ -59,6 +58,7 @@ public Catalog createCatalog(Context context) {
case "hms":
return new HoodieHiveCatalog(
context.getName(),
helper.getOptions().get(CatalogOptions.CATALOG_PATH),
helper.getOptions().get(CatalogOptions.DEFAULT_DATABASE),
helper.getOptions().get(CatalogOptions.HIVE_CONF_DIR));
case "dfs":
@@ -82,7 +82,7 @@ public Set<ConfigOption<?>> optionalOptions() {
options.add(PROPERTY_VERSION);
options.add(CatalogOptions.HIVE_CONF_DIR);
options.add(CatalogOptions.MODE);
options.add(CATALOG_PATH);
options.add(CatalogOptions.CATALOG_PATH);
return options;
}
}
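Since the factory now forwards CatalogOptions.CATALOG_PATH into the hms-mode catalog, a catalog definition can carry a base path that databases inherit. A rough usage sketch via Flink's TableEnvironment; the catalog identifier 'hudi' and the option keys 'mode', 'catalog.path', 'default-database' and 'hive.conf.dir' are assumed spellings of the CatalogOptions constants, not values shown in this diff:

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class RegisterHudiCatalogSketch {
  public static void main(String[] args) {
    TableEnvironment tEnv =
        TableEnvironment.create(EnvironmentSettings.newInstance().inStreamingMode().build());

    // Register the catalog in hms mode with an explicit base path (option keys assumed, see above).
    tEnv.executeSql(
        "CREATE CATALOG hoodie_catalog WITH (\n"
            + "  'type' = 'hudi',\n"
            + "  'mode' = 'hms',\n"
            + "  'catalog.path' = 'hdfs:///warehouse/hudi',\n"
            + "  'default-database' = 'default',\n"
            + "  'hive.conf.dir' = '/etc/hive/conf'\n"
            + ")");
    tEnv.useCatalog("hoodie_catalog");

    // With this patch, a database created without an explicit location
    // should land under <catalog.path>/<database name>.
    tEnv.executeSql("CREATE DATABASE IF NOT EXISTS db1");
  }
}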
@@ -18,7 +18,6 @@

package org.apache.hudi.table.catalog;

import org.apache.hadoop.hive.metastore.TableType;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.HoodieFileFormat;
import org.apache.hudi.common.table.HoodieTableMetaClient;
@@ -69,6 +68,7 @@
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.IMetaStoreClient;
import org.apache.hadoop.hive.metastore.TableType;
import org.apache.hadoop.hive.metastore.api.AlreadyExistsException;
import org.apache.hadoop.hive.metastore.api.Database;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
@@ -92,7 +92,6 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;

import static org.apache.flink.sql.parser.hive.ddl.SqlAlterHiveDatabase.ALTER_DATABASE_OP;
import static org.apache.flink.sql.parser.hive.ddl.SqlAlterHiveDatabaseOwner.DATABASE_OWNER_NAME;
@@ -104,7 +103,6 @@
import static org.apache.hudi.configuration.FlinkOptions.PATH;
import static org.apache.hudi.table.catalog.CatalogOptions.DEFAULT_DB;
import static org.apache.hudi.table.catalog.TableOptionProperties.COMMENT;
import static org.apache.hudi.table.catalog.TableOptionProperties.PK_COLUMNS;
import static org.apache.hudi.table.catalog.TableOptionProperties.PK_CONSTRAINT_NAME;
import static org.apache.hudi.table.catalog.TableOptionProperties.SPARK_SOURCE_PROVIDER;

@@ -117,12 +115,22 @@ public class HoodieHiveCatalog extends AbstractCatalog {
private final HiveConf hiveConf;
private IMetaStoreClient client;

public HoodieHiveCatalog(String catalogName, String defaultDatabase, String hiveConfDir) {
this(catalogName, defaultDatabase, HoodieCatalogUtil.createHiveConf(hiveConfDir), false);
// optional catalog base path: used for db/table path inference.
private final String catalogPath;

public HoodieHiveCatalog(String catalogName, String catalogPath, String defaultDatabase, String hiveConfDir) {
this(catalogName, catalogPath, defaultDatabase, HoodieCatalogUtil.createHiveConf(hiveConfDir), false);
}

public HoodieHiveCatalog(String catalogName, String defaultDatabase, HiveConf hiveConf, boolean allowEmbedded) {
public HoodieHiveCatalog(
String catalogName,
String catalogPath,
String defaultDatabase,
HiveConf hiveConf,
boolean allowEmbedded) {
super(catalogName, defaultDatabase == null ? DEFAULT_DB : defaultDatabase);
// fallback to hive.metastore.warehouse.dir if catalog path is not specified
this.catalogPath = catalogPath == null ? hiveConf.getVar(HiveConf.ConfVars.METASTOREWAREHOUSE) : catalogPath;
this.hiveConf = hiveConf;
if (!allowEmbedded) {
checkArgument(
@@ -145,7 +153,7 @@ public void open() throws CatalogException {
}
if (!databaseExists(getDefaultDatabase())) {
LOG.info("{} does not exist, will be created.", getDefaultDatabase());
CatalogDatabase database = new CatalogDatabaseImpl(Collections.EMPTY_MAP, "default database");
CatalogDatabase database = new CatalogDatabaseImpl(Collections.emptyMap(), "default database");
try {
createDatabase(getDefaultDatabase(), database, true);
} catch (DatabaseAlreadyExistException e) {
@@ -227,6 +235,10 @@ public void createDatabase(
Map<String, String> properties = database.getProperties();

String dbLocationUri = properties.remove(SqlCreateHiveDatabase.DATABASE_LOCATION_URI);
if (dbLocationUri == null && this.catalogPath != null) {
// infer default location uri
dbLocationUri = new Path(this.catalogPath, databaseName).toString();
}

Database hiveDatabase =
new Database(databaseName, database.getComment(), dbLocationUri, properties);
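This hunk is the heart of the fix: when createDatabase receives no explicit location URI, the catalog now derives one as <catalogPath>/<databaseName>, and catalogPath itself falls back to hive.metastore.warehouse.dir in the constructor above. A small self-contained sketch of that inference order, as a hypothetical helper rather than the catalog code itself:

import org.apache.hadoop.fs.Path;

public class DbLocationSketch {
  // Mirrors the inference order shown above: explicit URI, then catalog path, then metastore default.
  static String inferDbLocation(String explicitLocationUri, String catalogPath, String databaseName) {
    if (explicitLocationUri != null) {
      return explicitLocationUri;
    }
    if (catalogPath != null) {
      return new Path(catalogPath, databaseName).toString();
    }
    return null; // leave it to the Hive metastore's own default
  }

  public static void main(String[] args) {
    // prints something like hdfs:///warehouse/hudi/db1
    System.out.println(inferDbLocation(null, "hdfs:///warehouse/hudi", "db1"));
  }
}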
@@ -381,8 +393,7 @@ private Table translateSparkTable2Flink(ObjectPath tablePath, Table hiveTable) {
@Override
public CatalogBaseTable getTable(ObjectPath tablePath) throws TableNotExistException, CatalogException {
checkNotNull(tablePath, "Table path cannot be null");
Table hiveTable = getHiveTable(tablePath);
hiveTable = translateSparkTable2Flink(tablePath, hiveTable);
Table hiveTable = translateSparkTable2Flink(tablePath, getHiveTable(tablePath));
String path = hiveTable.getSd().getLocation();
Map<String, String> parameters = hiveTable.getParameters();
Schema latestTableSchema = StreamerUtil.getLatestTableSchema(path, hiveConf);
@@ -391,16 +402,21 @@ public CatalogBaseTable getTable(ObjectPath tablePath) throws TableNotExistExcep
org.apache.flink.table.api.Schema.Builder builder = org.apache.flink.table.api.Schema.newBuilder()
.fromRowDataType(AvroSchemaConverter.convertToDataType(latestTableSchema));
String pkConstraintName = parameters.get(PK_CONSTRAINT_NAME);
String pkColumns = parameters.get(FlinkOptions.RECORD_KEY_FIELD.key());
if (!StringUtils.isNullOrEmpty(pkConstraintName)) {
builder.primaryKeyNamed(pkConstraintName, StringUtils.split(parameters.get(PK_COLUMNS), ","));
// pkColumns expect not to be null
builder.primaryKeyNamed(pkConstraintName, StringUtils.split(pkColumns, ","));
} else if (pkColumns != null) {
builder.primaryKey(StringUtils.split(pkColumns, ","));
}
schema = builder.build();
} else {
LOG.warn("{} does not have any hoodie schema, and use hive table schema to infer the table schema", tablePath);
schema = HiveSchemaUtils.convertTableSchema(hiveTable);
}
Map<String, String> options = supplementOptions(tablePath, parameters);
return CatalogTable.of(schema, parameters.get(COMMENT),
HiveSchemaUtils.getFieldNames(hiveTable.getPartitionKeys()), parameters);
HiveSchemaUtils.getFieldNames(hiveTable.getPartitionKeys()), options);
}

@Override
@@ -439,8 +455,8 @@ public void createTable(ObjectPath tablePath, CatalogBaseTable table, boolean ig
}

private void initTableIfNotExists(ObjectPath tablePath, CatalogTable catalogTable) {
Configuration flinkConf = Configuration.fromMap(applyOptionsHook(catalogTable.getOptions()));
final String avroSchema = AvroSchemaConverter.convertToSchema(catalogTable.getSchema().toPhysicalRowDataType().getLogicalType()).toString();
Configuration flinkConf = Configuration.fromMap(catalogTable.getOptions());
final String avroSchema = AvroSchemaConverter.convertToSchema(catalogTable.getSchema().toPersistedRowDataType().getLogicalType()).toString();
flinkConf.setString(FlinkOptions.SOURCE_AVRO_SCHEMA, avroSchema);

// stores two copies of options:
@@ -449,15 +465,13 @@ private void initTableIfNotExists(ObjectPath tablePath, CatalogTable catalogTabl
// because the HoodieTableMetaClient is a heavy impl, we try to avoid initializing it
// when calling #getTable.

if (catalogTable.getUnresolvedSchema().getPrimaryKey().isPresent()) {
if (catalogTable.getUnresolvedSchema().getPrimaryKey().isPresent()
&& !flinkConf.contains(FlinkOptions.RECORD_KEY_FIELD)) {
final String pkColumns = String.join(",", catalogTable.getUnresolvedSchema().getPrimaryKey().get().getColumnNames());
String recordKey = flinkConf.get(FlinkOptions.RECORD_KEY_FIELD);
if (!Objects.equals(pkColumns, recordKey)) {
throw new HoodieCatalogException(String.format("%s and %s are the different", pkColumns, recordKey));
}
flinkConf.setString(FlinkOptions.RECORD_KEY_FIELD, pkColumns);
}

if (catalogTable.isPartitioned()) {
if (catalogTable.isPartitioned() && !flinkConf.contains(FlinkOptions.PARTITION_PATH_FIELD)) {
final String partitions = String.join(",", catalogTable.getPartitionKeys());
flinkConf.setString(FlinkOptions.PARTITION_PATH_FIELD, partitions);
}
@@ -468,7 +482,7 @@ private void initTableIfNotExists(ObjectPath tablePath, CatalogTable catalogTabl

flinkConf.setString(FlinkOptions.TABLE_NAME, tablePath.getObjectName());
try {
StreamerUtil.initTableIfNotExists(flinkConf);
StreamerUtil.initTableIfNotExists(flinkConf, hiveConf);
} catch (IOException e) {
throw new HoodieCatalogException("Initialize table exception.", e);
}
@@ -487,20 +501,6 @@ private String inferTablePath(ObjectPath tablePath, CatalogBaseTable table) {
return location;
}

private Map<String, String> applyOptionsHook(Map<String, String> options) {
Map<String, String> properties = new HashMap<>(options);
if (!options.containsKey(FlinkOptions.RECORD_KEY_FIELD.key())) {
properties.put(FlinkOptions.RECORD_KEY_FIELD.key(), FlinkOptions.RECORD_KEY_FIELD.defaultValue());
}
if (!options.containsKey(FlinkOptions.PRECOMBINE_FIELD.key())) {
properties.put(FlinkOptions.PRECOMBINE_FIELD.key(), FlinkOptions.PRECOMBINE_FIELD.defaultValue());
}
if (!options.containsKey(FlinkOptions.TABLE_TYPE.key())) {
properties.put(FlinkOptions.TABLE_TYPE.key(), FlinkOptions.TABLE_TYPE.defaultValue());
}
return properties;
}

private Table instantiateHiveTable(ObjectPath tablePath, CatalogBaseTable table, String location, boolean useRealTimeInputFormat) throws IOException {
// let Hive set default parameters for us, e.g. serialization.format
Table hiveTable =
@@ -510,7 +510,7 @@ private Table instantiateHiveTable(ObjectPath tablePath, CatalogBaseTable table,
hiveTable.setOwner(UserGroupInformation.getCurrentUser().getUserName());
hiveTable.setCreateTime((int) (System.currentTimeMillis() / 1000));

Map<String, String> properties = applyOptionsHook(table.getOptions());
Map<String, String> properties = new HashMap<>(table.getOptions());

if (Boolean.parseBoolean(table.getOptions().get(CatalogOptions.TABLE_EXTERNAL.key()))) {
hiveTable.setTableType(TableType.EXTERNAL_TABLE.toString());
@@ -523,17 +523,11 @@ private Table instantiateHiveTable(ObjectPath tablePath, CatalogBaseTable table,
}

//set pk
if (table.getUnresolvedSchema().getPrimaryKey().isPresent()) {
if (table.getUnresolvedSchema().getPrimaryKey().isPresent()
&& !properties.containsKey(FlinkOptions.RECORD_KEY_FIELD.key())) {
String pkColumns = String.join(",", table.getUnresolvedSchema().getPrimaryKey().get().getColumnNames());
String recordKey = properties.getOrDefault(FlinkOptions.RECORD_KEY_FIELD.key(), FlinkOptions.RECORD_KEY_FIELD.defaultValue());
if (!Objects.equals(pkColumns, recordKey)) {
throw new HoodieCatalogException(
String.format("Primary key [%s] and record key [%s] should be the the same.",
pkColumns,
recordKey));
}
properties.put(PK_CONSTRAINT_NAME, table.getUnresolvedSchema().getPrimaryKey().get().getConstraintName());
properties.put(PK_COLUMNS, pkColumns);
properties.put(FlinkOptions.RECORD_KEY_FIELD.key(), pkColumns);
}

if (!properties.containsKey(FlinkOptions.PATH.key())) {
@@ -896,4 +890,22 @@ public void alterPartitionColumnStatistics(
throws PartitionNotExistException, CatalogException {
throw new HoodieCatalogException("Not supported.");
}

private Map<String, String> supplementOptions(
ObjectPath tablePath,
Map<String, String> options) {
if (HoodieCatalogUtil.isEmbeddedMetastore(hiveConf)) {
return options;
} else {
Map<String, String> newOptions = new HashMap<>(options);
// set up hive sync options
newOptions.put(FlinkOptions.HIVE_SYNC_ENABLED.key(), "true");
newOptions.put(FlinkOptions.HIVE_SYNC_METASTORE_URIS.key(), hiveConf.getVar(HiveConf.ConfVars.METASTOREURIS));
newOptions.put(FlinkOptions.HIVE_SYNC_MODE.key(), "hms");
newOptions.putIfAbsent(FlinkOptions.HIVE_SYNC_SUPPORT_TIMESTAMP.key(), "true");
newOptions.computeIfAbsent(FlinkOptions.HIVE_SYNC_DB.key(), k -> tablePath.getDatabaseName());
newOptions.computeIfAbsent(FlinkOptions.HIVE_SYNC_TABLE.key(), k -> tablePath.getObjectName());
return newOptions;
}
}
}
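One practical consequence of the new supplementOptions method: when the catalog talks to a remote (non-embedded) metastore, every table resolved through getTable silently gains hive-sync options, so write jobs created against it sync back to that metastore. The sketch below mirrors the method with sample inputs to show the options a hypothetical table db1.t1 would pick up; it assumes the Hudi Flink module and Hive libraries are on the classpath:

import java.util.HashMap;
import java.util.Map;

import org.apache.flink.table.catalog.ObjectPath;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hudi.configuration.FlinkOptions;

public class SupplementOptionsSketch {
  public static void main(String[] args) {
    HiveConf hiveConf = new HiveConf();
    hiveConf.setVar(HiveConf.ConfVars.METASTOREURIS, "thrift://metastore:9083");
    ObjectPath tablePath = new ObjectPath("db1", "t1");

    // Same supplements as the method above, applied here to an initially empty options map.
    Map<String, String> options = new HashMap<>();
    options.put(FlinkOptions.HIVE_SYNC_ENABLED.key(), "true");
    options.put(FlinkOptions.HIVE_SYNC_METASTORE_URIS.key(), hiveConf.getVar(HiveConf.ConfVars.METASTOREURIS));
    options.put(FlinkOptions.HIVE_SYNC_MODE.key(), "hms");
    options.putIfAbsent(FlinkOptions.HIVE_SYNC_SUPPORT_TIMESTAMP.key(), "true");
    options.computeIfAbsent(FlinkOptions.HIVE_SYNC_DB.key(), k -> tablePath.getDatabaseName());
    options.computeIfAbsent(FlinkOptions.HIVE_SYNC_TABLE.key(), k -> tablePath.getObjectName());

    options.forEach((k, v) -> System.out.println(k + " = " + v));
  }
}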