diff --git a/hudi-flink-datasource/hudi-flink/pom.xml b/hudi-flink-datasource/hudi-flink/pom.xml
index 4a0dd2bfecf5d..dea35c8d4a9cc 100644
--- a/hudi-flink-datasource/hudi-flink/pom.xml
+++ b/hudi-flink-datasource/hudi-flink/pom.xml
@@ -262,18 +262,6 @@
-    <dependency>
-      <groupId>javax.transaction</groupId>
-      <artifactId>jta</artifactId>
-      <version>1.1</version>
-      <scope>test</scope>
-    </dependency>
-    <dependency>
-      <groupId>javax.transaction</groupId>
-      <artifactId>javax.transaction-api</artifactId>
-      <version>1.3</version>
-      <scope>test</scope>
-    </dependency>
       <groupId>${hive.groupid}</groupId>
       <artifactId>hive-metastore</artifactId>
@@ -395,5 +383,18 @@
       <version>${flink.version}</version>
       <scope>test</scope>
+
+    <dependency>
+      <groupId>javax.transaction</groupId>
+      <artifactId>jta</artifactId>
+      <version>1.1</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>javax.transaction</groupId>
+      <artifactId>javax.transaction-api</artifactId>
+      <version>1.3</version>
+      <scope>test</scope>
+    </dependency>
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HiveSchemaUtils.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HiveSchemaUtils.java
index 36503c152c3c8..c9590ff4a26bf 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HiveSchemaUtils.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HiveSchemaUtils.java
@@ -19,11 +19,13 @@
package org.apache.hudi.table.catalog;
import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.configuration.FlinkOptions;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.RowType;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.hadoop.hive.metastore.api.Table;
import org.apache.hadoop.hive.serde2.typeinfo.CharTypeInfo;
@@ -37,7 +39,6 @@
import org.apache.hadoop.hive.serde2.typeinfo.VarcharTypeInfo;
import java.util.ArrayList;
-import java.util.Collections;
import java.util.List;
import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -60,9 +61,8 @@ public static org.apache.flink.table.api.Schema convertTableSchema(Table hiveTab
allCols.addAll(hiveTable.getPartitionKeys());
String pkConstraintName = hiveTable.getParameters().get(TableOptionProperties.PK_CONSTRAINT_NAME);
-    List<String> primaryColNames = StringUtils.isNullOrEmpty(pkConstraintName)
- ? Collections.EMPTY_LIST
- : StringUtils.split(hiveTable.getParameters().get(TableOptionProperties.PK_COLUMNS),",");
+ String pkColumnStr = hiveTable.getParameters().getOrDefault(FlinkOptions.RECORD_KEY_FIELD.key(), FlinkOptions.RECORD_KEY_FIELD.defaultValue());
+    List<String> pkColumns = StringUtils.split(pkColumnStr,",");
String[] colNames = new String[allCols.size()];
DataType[] colTypes = new DataType[allCols.size()];
@@ -73,14 +73,16 @@ public static org.apache.flink.table.api.Schema convertTableSchema(Table hiveTab
colNames[i] = fs.getName();
colTypes[i] =
toFlinkType(TypeInfoUtils.getTypeInfoFromTypeString(fs.getType()));
- if (primaryColNames.contains(colNames[i])) {
+ if (pkColumns.contains(colNames[i])) {
colTypes[i] = colTypes[i].notNull();
}
}
org.apache.flink.table.api.Schema.Builder builder = org.apache.flink.table.api.Schema.newBuilder().fromFields(colNames, colTypes);
if (!StringUtils.isNullOrEmpty(pkConstraintName)) {
- builder.primaryKeyNamed(pkConstraintName, primaryColNames);
+ builder.primaryKeyNamed(pkConstraintName, pkColumns);
+ } else {
+ builder.primaryKey(pkColumns);
}
return builder.build();
@@ -152,7 +154,8 @@ private static DataType toFlinkPrimitiveType(PrimitiveTypeInfo hiveType) {
case DATE:
return DataTypes.DATE();
case TIMESTAMP:
- return DataTypes.TIMESTAMP(9);
+ // see org.apache.hudi.hive.util.HiveSchemaUtil#convertField for details.
+ return DataTypes.TIMESTAMP(6);
case BINARY:
return DataTypes.BYTES();
case DECIMAL:
@@ -168,8 +171,10 @@ private static DataType toFlinkPrimitiveType(PrimitiveTypeInfo hiveType) {
/** Create Hive columns from Flink TableSchema. */
  public static List<FieldSchema> createHiveColumns(TableSchema schema) {
- String[] fieldNames = schema.getFieldNames();
- DataType[] fieldTypes = schema.getFieldDataTypes();
+ final DataType dataType = schema.toPersistedRowDataType();
+ final RowType rowType = (RowType) dataType.getLogicalType();
+ final String[] fieldNames = rowType.getFieldNames().toArray(new String[0]);
+ final DataType[] fieldTypes = dataType.getChildren().toArray(new DataType[0]);
    List<FieldSchema> columns = new ArrayList<>(fieldNames.length);
@@ -177,7 +182,7 @@ public static List<FieldSchema> createHiveColumns(TableSchema schema) {
columns.add(
new FieldSchema(
fieldNames[i],
- toHiveTypeInfo(fieldTypes[i], true).getTypeName(),
+ toHiveTypeInfo(fieldTypes[i]).getTypeName(),
null));
}
@@ -191,13 +196,12 @@ public static List<FieldSchema> createHiveColumns(TableSchema schema) {
* checkPrecision is true.
*
* @param dataType a Flink DataType
- * @param checkPrecision whether to fail the conversion if the precision of the DataType is not
- * supported by Hive
+ *
* @return the corresponding Hive data type
*/
- public static TypeInfo toHiveTypeInfo(DataType dataType, boolean checkPrecision) {
+ public static TypeInfo toHiveTypeInfo(DataType dataType) {
checkNotNull(dataType, "type cannot be null");
LogicalType logicalType = dataType.getLogicalType();
- return logicalType.accept(new TypeInfoLogicalTypeVisitor(dataType, checkPrecision));
+ return logicalType.accept(new TypeInfoLogicalTypeVisitor(dataType));
}
}
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HoodieCatalogFactory.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HoodieCatalogFactory.java
index 4a63b7a26ba09..e9fbd95e8fd42 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HoodieCatalogFactory.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HoodieCatalogFactory.java
@@ -34,7 +34,6 @@
import java.util.Set;
import static org.apache.flink.table.factories.FactoryUtil.PROPERTY_VERSION;
-import static org.apache.hudi.table.catalog.CatalogOptions.CATALOG_PATH;
/**
* A catalog factory impl that creates {@link HoodieCatalog}.
@@ -59,6 +58,7 @@ public Catalog createCatalog(Context context) {
case "hms":
return new HoodieHiveCatalog(
context.getName(),
+ helper.getOptions().get(CatalogOptions.CATALOG_PATH),
helper.getOptions().get(CatalogOptions.DEFAULT_DATABASE),
helper.getOptions().get(CatalogOptions.HIVE_CONF_DIR));
case "dfs":
@@ -82,7 +82,7 @@ public Set<ConfigOption<?>> optionalOptions() {
options.add(PROPERTY_VERSION);
options.add(CatalogOptions.HIVE_CONF_DIR);
options.add(CatalogOptions.MODE);
- options.add(CATALOG_PATH);
+ options.add(CatalogOptions.CATALOG_PATH);
return options;
}
}
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HoodieHiveCatalog.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HoodieHiveCatalog.java
index ff80a7004ba31..1e877b133e1a7 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HoodieHiveCatalog.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HoodieHiveCatalog.java
@@ -18,7 +18,6 @@
package org.apache.hudi.table.catalog;
-import org.apache.hadoop.hive.metastore.TableType;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.HoodieFileFormat;
import org.apache.hudi.common.table.HoodieTableMetaClient;
@@ -69,6 +68,7 @@
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.IMetaStoreClient;
+import org.apache.hadoop.hive.metastore.TableType;
import org.apache.hadoop.hive.metastore.api.AlreadyExistsException;
import org.apache.hadoop.hive.metastore.api.Database;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
@@ -92,7 +92,6 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import java.util.Objects;
import static org.apache.flink.sql.parser.hive.ddl.SqlAlterHiveDatabase.ALTER_DATABASE_OP;
import static org.apache.flink.sql.parser.hive.ddl.SqlAlterHiveDatabaseOwner.DATABASE_OWNER_NAME;
@@ -104,7 +103,6 @@
import static org.apache.hudi.configuration.FlinkOptions.PATH;
import static org.apache.hudi.table.catalog.CatalogOptions.DEFAULT_DB;
import static org.apache.hudi.table.catalog.TableOptionProperties.COMMENT;
-import static org.apache.hudi.table.catalog.TableOptionProperties.PK_COLUMNS;
import static org.apache.hudi.table.catalog.TableOptionProperties.PK_CONSTRAINT_NAME;
import static org.apache.hudi.table.catalog.TableOptionProperties.SPARK_SOURCE_PROVIDER;
@@ -117,12 +115,22 @@ public class HoodieHiveCatalog extends AbstractCatalog {
private final HiveConf hiveConf;
private IMetaStoreClient client;
- public HoodieHiveCatalog(String catalogName, String defaultDatabase, String hiveConfDir) {
- this(catalogName, defaultDatabase, HoodieCatalogUtil.createHiveConf(hiveConfDir), false);
+ // optional catalog base path: used for db/table path inference.
+ private final String catalogPath;
+
+ public HoodieHiveCatalog(String catalogName, String catalogPath, String defaultDatabase, String hiveConfDir) {
+ this(catalogName, catalogPath, defaultDatabase, HoodieCatalogUtil.createHiveConf(hiveConfDir), false);
}
- public HoodieHiveCatalog(String catalogName, String defaultDatabase, HiveConf hiveConf, boolean allowEmbedded) {
+ public HoodieHiveCatalog(
+ String catalogName,
+ String catalogPath,
+ String defaultDatabase,
+ HiveConf hiveConf,
+ boolean allowEmbedded) {
super(catalogName, defaultDatabase == null ? DEFAULT_DB : defaultDatabase);
+ // fallback to hive.metastore.warehouse.dir if catalog path is not specified
+ this.catalogPath = catalogPath == null ? hiveConf.getVar(HiveConf.ConfVars.METASTOREWAREHOUSE) : catalogPath;
this.hiveConf = hiveConf;
if (!allowEmbedded) {
checkArgument(
@@ -145,7 +153,7 @@ public void open() throws CatalogException {
}
if (!databaseExists(getDefaultDatabase())) {
LOG.info("{} does not exist, will be created.", getDefaultDatabase());
- CatalogDatabase database = new CatalogDatabaseImpl(Collections.EMPTY_MAP, "default database");
+ CatalogDatabase database = new CatalogDatabaseImpl(Collections.emptyMap(), "default database");
try {
createDatabase(getDefaultDatabase(), database, true);
} catch (DatabaseAlreadyExistException e) {
@@ -227,6 +235,10 @@ public void createDatabase(
    Map<String, String> properties = database.getProperties();
String dbLocationUri = properties.remove(SqlCreateHiveDatabase.DATABASE_LOCATION_URI);
+ if (dbLocationUri == null && this.catalogPath != null) {
+ // infer default location uri
+ dbLocationUri = new Path(this.catalogPath, databaseName).toString();
+ }
Database hiveDatabase =
new Database(databaseName, database.getComment(), dbLocationUri, properties);
@@ -381,8 +393,7 @@ private Table translateSparkTable2Flink(ObjectPath tablePath, Table hiveTable) {
@Override
public CatalogBaseTable getTable(ObjectPath tablePath) throws TableNotExistException, CatalogException {
checkNotNull(tablePath, "Table path cannot be null");
- Table hiveTable = getHiveTable(tablePath);
- hiveTable = translateSparkTable2Flink(tablePath, hiveTable);
+ Table hiveTable = translateSparkTable2Flink(tablePath, getHiveTable(tablePath));
String path = hiveTable.getSd().getLocation();
Map parameters = hiveTable.getParameters();
Schema latestTableSchema = StreamerUtil.getLatestTableSchema(path, hiveConf);
@@ -391,16 +402,21 @@ public CatalogBaseTable getTable(ObjectPath tablePath) throws TableNotExistExcep
org.apache.flink.table.api.Schema.Builder builder = org.apache.flink.table.api.Schema.newBuilder()
.fromRowDataType(AvroSchemaConverter.convertToDataType(latestTableSchema));
String pkConstraintName = parameters.get(PK_CONSTRAINT_NAME);
+ String pkColumns = parameters.get(FlinkOptions.RECORD_KEY_FIELD.key());
if (!StringUtils.isNullOrEmpty(pkConstraintName)) {
- builder.primaryKeyNamed(pkConstraintName, StringUtils.split(parameters.get(PK_COLUMNS), ","));
+      // pkColumns is expected to be non-null when a pk constraint name is present
+ builder.primaryKeyNamed(pkConstraintName, StringUtils.split(pkColumns, ","));
+ } else if (pkColumns != null) {
+ builder.primaryKey(StringUtils.split(pkColumns, ","));
}
schema = builder.build();
} else {
LOG.warn("{} does not have any hoodie schema, and use hive table schema to infer the table schema", tablePath);
schema = HiveSchemaUtils.convertTableSchema(hiveTable);
}
+    Map<String, String> options = supplementOptions(tablePath, parameters);
return CatalogTable.of(schema, parameters.get(COMMENT),
- HiveSchemaUtils.getFieldNames(hiveTable.getPartitionKeys()), parameters);
+ HiveSchemaUtils.getFieldNames(hiveTable.getPartitionKeys()), options);
}
@Override
@@ -439,8 +455,8 @@ public void createTable(ObjectPath tablePath, CatalogBaseTable table, boolean ig
}
private void initTableIfNotExists(ObjectPath tablePath, CatalogTable catalogTable) {
- Configuration flinkConf = Configuration.fromMap(applyOptionsHook(catalogTable.getOptions()));
- final String avroSchema = AvroSchemaConverter.convertToSchema(catalogTable.getSchema().toPhysicalRowDataType().getLogicalType()).toString();
+ Configuration flinkConf = Configuration.fromMap(catalogTable.getOptions());
+ final String avroSchema = AvroSchemaConverter.convertToSchema(catalogTable.getSchema().toPersistedRowDataType().getLogicalType()).toString();
flinkConf.setString(FlinkOptions.SOURCE_AVRO_SCHEMA, avroSchema);
// stores two copies of options:
@@ -449,15 +465,13 @@ private void initTableIfNotExists(ObjectPath tablePath, CatalogTable catalogTabl
// because the HoodieTableMetaClient is a heavy impl, we try to avoid initializing it
// when calling #getTable.
- if (catalogTable.getUnresolvedSchema().getPrimaryKey().isPresent()) {
+ if (catalogTable.getUnresolvedSchema().getPrimaryKey().isPresent()
+ && !flinkConf.contains(FlinkOptions.RECORD_KEY_FIELD)) {
final String pkColumns = String.join(",", catalogTable.getUnresolvedSchema().getPrimaryKey().get().getColumnNames());
- String recordKey = flinkConf.get(FlinkOptions.RECORD_KEY_FIELD);
- if (!Objects.equals(pkColumns, recordKey)) {
- throw new HoodieCatalogException(String.format("%s and %s are the different", pkColumns, recordKey));
- }
+ flinkConf.setString(FlinkOptions.RECORD_KEY_FIELD, pkColumns);
}
- if (catalogTable.isPartitioned()) {
+ if (catalogTable.isPartitioned() && !flinkConf.contains(FlinkOptions.PARTITION_PATH_FIELD)) {
final String partitions = String.join(",", catalogTable.getPartitionKeys());
flinkConf.setString(FlinkOptions.PARTITION_PATH_FIELD, partitions);
}
@@ -468,7 +482,7 @@ private void initTableIfNotExists(ObjectPath tablePath, CatalogTable catalogTabl
flinkConf.setString(FlinkOptions.TABLE_NAME, tablePath.getObjectName());
try {
- StreamerUtil.initTableIfNotExists(flinkConf);
+ StreamerUtil.initTableIfNotExists(flinkConf, hiveConf);
} catch (IOException e) {
throw new HoodieCatalogException("Initialize table exception.", e);
}
@@ -487,20 +501,6 @@ private String inferTablePath(ObjectPath tablePath, CatalogBaseTable table) {
return location;
}
-  private Map<String, String> applyOptionsHook(Map<String, String> options) {
-    Map<String, String> properties = new HashMap<>(options);
- if (!options.containsKey(FlinkOptions.RECORD_KEY_FIELD.key())) {
- properties.put(FlinkOptions.RECORD_KEY_FIELD.key(), FlinkOptions.RECORD_KEY_FIELD.defaultValue());
- }
- if (!options.containsKey(FlinkOptions.PRECOMBINE_FIELD.key())) {
- properties.put(FlinkOptions.PRECOMBINE_FIELD.key(), FlinkOptions.PRECOMBINE_FIELD.defaultValue());
- }
- if (!options.containsKey(FlinkOptions.TABLE_TYPE.key())) {
- properties.put(FlinkOptions.TABLE_TYPE.key(), FlinkOptions.TABLE_TYPE.defaultValue());
- }
- return properties;
- }
-
private Table instantiateHiveTable(ObjectPath tablePath, CatalogBaseTable table, String location, boolean useRealTimeInputFormat) throws IOException {
// let Hive set default parameters for us, e.g. serialization.format
Table hiveTable =
@@ -510,7 +510,7 @@ private Table instantiateHiveTable(ObjectPath tablePath, CatalogBaseTable table,
hiveTable.setOwner(UserGroupInformation.getCurrentUser().getUserName());
hiveTable.setCreateTime((int) (System.currentTimeMillis() / 1000));
-    Map<String, String> properties = applyOptionsHook(table.getOptions());
+    Map<String, String> properties = new HashMap<>(table.getOptions());
if (Boolean.parseBoolean(table.getOptions().get(CatalogOptions.TABLE_EXTERNAL.key()))) {
hiveTable.setTableType(TableType.EXTERNAL_TABLE.toString());
@@ -523,17 +523,11 @@ private Table instantiateHiveTable(ObjectPath tablePath, CatalogBaseTable table,
}
//set pk
- if (table.getUnresolvedSchema().getPrimaryKey().isPresent()) {
+ if (table.getUnresolvedSchema().getPrimaryKey().isPresent()
+ && !properties.containsKey(FlinkOptions.RECORD_KEY_FIELD.key())) {
String pkColumns = String.join(",", table.getUnresolvedSchema().getPrimaryKey().get().getColumnNames());
- String recordKey = properties.getOrDefault(FlinkOptions.RECORD_KEY_FIELD.key(), FlinkOptions.RECORD_KEY_FIELD.defaultValue());
- if (!Objects.equals(pkColumns, recordKey)) {
- throw new HoodieCatalogException(
- String.format("Primary key [%s] and record key [%s] should be the the same.",
- pkColumns,
- recordKey));
- }
properties.put(PK_CONSTRAINT_NAME, table.getUnresolvedSchema().getPrimaryKey().get().getConstraintName());
- properties.put(PK_COLUMNS, pkColumns);
+ properties.put(FlinkOptions.RECORD_KEY_FIELD.key(), pkColumns);
}
if (!properties.containsKey(FlinkOptions.PATH.key())) {
@@ -896,4 +890,22 @@ public void alterPartitionColumnStatistics(
throws PartitionNotExistException, CatalogException {
throw new HoodieCatalogException("Not supported.");
}
+
+  private Map<String, String> supplementOptions(
+      ObjectPath tablePath,
+      Map<String, String> options) {
+ if (HoodieCatalogUtil.isEmbeddedMetastore(hiveConf)) {
+ return options;
+ } else {
+      Map<String, String> newOptions = new HashMap<>(options);
+ // set up hive sync options
+ newOptions.put(FlinkOptions.HIVE_SYNC_ENABLED.key(), "true");
+ newOptions.put(FlinkOptions.HIVE_SYNC_METASTORE_URIS.key(), hiveConf.getVar(HiveConf.ConfVars.METASTOREURIS));
+ newOptions.put(FlinkOptions.HIVE_SYNC_MODE.key(), "hms");
+ newOptions.putIfAbsent(FlinkOptions.HIVE_SYNC_SUPPORT_TIMESTAMP.key(), "true");
+ newOptions.computeIfAbsent(FlinkOptions.HIVE_SYNC_DB.key(), k -> tablePath.getDatabaseName());
+ newOptions.computeIfAbsent(FlinkOptions.HIVE_SYNC_TABLE.key(), k -> tablePath.getObjectName());
+ return newOptions;
+ }
+ }
}
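
A minimal construction sketch for the widened HoodieHiveCatalog constructor above; the catalog name, base path, and conf dir values are illustrative assumptions, not part of the patch:

```java
import org.apache.hudi.table.catalog.HoodieHiveCatalog;

public class CatalogPathSketch {
  public static void main(String[] args) {
    // With a non-null catalog path, createDatabase infers <catalogPath>/<dbName> as the
    // database location; a null path falls back to hive.metastore.warehouse.dir.
    HoodieHiveCatalog catalog = new HoodieHiveCatalog(
        "hudi_hms",            // catalog name (assumed)
        "/warehouse/hudi",     // catalog base path (assumed)
        "default",             // default database
        "/etc/hive/conf");     // hive conf dir (assumed)
    catalog.open();
    catalog.close();
  }
}
```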
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/TypeInfoLogicalTypeVisitor.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/TypeInfoLogicalTypeVisitor.java
index d6cfe3ed723a7..e6b15788fe79e 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/TypeInfoLogicalTypeVisitor.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/TypeInfoLogicalTypeVisitor.java
@@ -18,8 +18,6 @@
package org.apache.hudi.table.catalog;
-import org.apache.hudi.exception.HoodieCatalogException;
-
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.ArrayType;
import org.apache.flink.table.types.logical.BigIntType;
@@ -40,8 +38,6 @@
import org.apache.flink.table.types.logical.VarBinaryType;
import org.apache.flink.table.types.logical.VarCharType;
import org.apache.flink.table.types.logical.utils.LogicalTypeDefaultVisitor;
-import org.apache.hadoop.hive.common.type.HiveChar;
-import org.apache.hadoop.hive.common.type.HiveVarchar;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
@@ -53,64 +49,25 @@
*/
public class TypeInfoLogicalTypeVisitor extends LogicalTypeDefaultVisitor<TypeInfo> {
private final LogicalType type;
- // whether to check type precision
- private final boolean checkPrecision;
- TypeInfoLogicalTypeVisitor(DataType dataType, boolean checkPrecision) {
- this(dataType.getLogicalType(), checkPrecision);
+ TypeInfoLogicalTypeVisitor(DataType dataType) {
+ this(dataType.getLogicalType());
}
- TypeInfoLogicalTypeVisitor(LogicalType type, boolean checkPrecision) {
+ TypeInfoLogicalTypeVisitor(LogicalType type) {
this.type = type;
- this.checkPrecision = checkPrecision;
}
@Override
public TypeInfo visit(CharType charType) {
- // Flink and Hive have different length limit for CHAR. Promote it to STRING if it
- // exceeds the limits of
- // Hive and we're told not to check precision. This can be useful when calling Hive UDF
- // to process data.
- if (charType.getLength() > HiveChar.MAX_CHAR_LENGTH || charType.getLength() < 1) {
- if (checkPrecision) {
- throw new HoodieCatalogException(
- String.format(
- "HiveCatalog doesn't support char type with length of '%d'. "
- + "The supported length is [%d, %d]",
- charType.getLength(), 1, HiveChar.MAX_CHAR_LENGTH));
- } else {
- return TypeInfoFactory.stringTypeInfo;
- }
- }
- return TypeInfoFactory.getCharTypeInfo(charType.getLength());
+    // Hudi only supports Avro-compatible data types, so CHAR is promoted to STRING
+ return TypeInfoFactory.stringTypeInfo;
}
@Override
public TypeInfo visit(VarCharType varCharType) {
- // Flink's StringType is defined as VARCHAR(Integer.MAX_VALUE)
- // We don't have more information in LogicalTypeRoot to distinguish StringType and a
- // VARCHAR(Integer.MAX_VALUE) instance
- // Thus always treat VARCHAR(Integer.MAX_VALUE) as StringType
- if (varCharType.getLength() == Integer.MAX_VALUE) {
- return TypeInfoFactory.stringTypeInfo;
- }
- // Flink and Hive have different length limit for VARCHAR. Promote it to STRING if it
- // exceeds the limits of
- // Hive and we're told not to check precision. This can be useful when calling Hive UDF
- // to process data.
- if (varCharType.getLength() > HiveVarchar.MAX_VARCHAR_LENGTH
- || varCharType.getLength() < 1) {
- if (checkPrecision) {
- throw new HoodieCatalogException(
- String.format(
- "HiveCatalog doesn't support varchar type with length of '%d'. "
- + "The supported length is [%d, %d]",
- varCharType.getLength(), 1, HiveVarchar.MAX_VARCHAR_LENGTH));
- } else {
- return TypeInfoFactory.stringTypeInfo;
- }
- }
- return TypeInfoFactory.getVarcharTypeInfo(varCharType.getLength());
+    // Hudi only supports Avro-compatible data types, so VARCHAR is promoted to STRING
+ return TypeInfoFactory.stringTypeInfo;
}
@Override
@@ -140,12 +97,14 @@ public TypeInfo visit(DecimalType decimalType) {
@Override
public TypeInfo visit(TinyIntType tinyIntType) {
- return TypeInfoFactory.byteTypeInfo;
+    // Hudi only supports Avro-compatible data types, so TINYINT is promoted to INT
+ return TypeInfoFactory.intTypeInfo;
}
@Override
public TypeInfo visit(SmallIntType smallIntType) {
- return TypeInfoFactory.shortTypeInfo;
+    // Hudi only supports Avro-compatible data types, so SMALLINT is promoted to INT
+ return TypeInfoFactory.intTypeInfo;
}
@Override
@@ -175,11 +134,14 @@ public TypeInfo visit(DateType dateType) {
@Override
public TypeInfo visit(TimestampType timestampType) {
- if (checkPrecision && timestampType.getPrecision() == 9) {
- throw new HoodieCatalogException(
- "HoodieCatalog currently does not support timestamp of precision 9");
+ int precision = timestampType.getPrecision();
+ // see org.apache.hudi.hive.util.HiveSchemaUtil#convertField for details.
+    // precision 6 (microseconds) maps to the Hive TIMESTAMP type, other precisions fall back to BIGINT
+ if (precision == 6) {
+ return TypeInfoFactory.timestampTypeInfo;
+ } else {
+ return TypeInfoFactory.longTypeInfo;
}
- return TypeInfoFactory.timestampTypeInfo;
}
@Override
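
Taken together with the HiveSchemaUtils change earlier in this diff, the visitor now pins the Flink/Hive timestamp mapping to microsecond precision. A minimal sketch, assuming the new single-argument toHiveTypeInfo shown above (illustrative only, not part of the patch):

```java
import org.apache.flink.table.api.DataTypes;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
import org.apache.hudi.table.catalog.HiveSchemaUtils;

public class TimestampMappingSketch {
  public static void main(String[] args) {
    // TIMESTAMP(6) is written as the Hive TIMESTAMP type ...
    TypeInfo micros = HiveSchemaUtils.toHiveTypeInfo(DataTypes.TIMESTAMP(6));
    // ... while any other precision is persisted as BIGINT
    TypeInfo millis = HiveSchemaUtils.toHiveTypeInfo(DataTypes.TIMESTAMP(3));
    // reading back, Hive TIMESTAMP maps to Flink TIMESTAMP(6) instead of TIMESTAMP(9)
    System.out.println(micros.getTypeName() + " / " + millis.getTypeName()); // timestamp / bigint
  }
}
```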
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
index d817f537cbddd..5a34f2a178b11 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
@@ -273,8 +273,19 @@ public static void checkRequiredProperties(TypedProperties props, List c
* @throws IOException if errors happens when writing metadata
*/
public static HoodieTableMetaClient initTableIfNotExists(Configuration conf) throws IOException {
+ return initTableIfNotExists(conf, HadoopConfigurations.getHadoopConf(conf));
+ }
+
+ /**
+ * Initialize the table if it does not exist.
+ *
+ * @param conf the configuration
+ * @throws IOException if errors happens when writing metadata
+ */
+ public static HoodieTableMetaClient initTableIfNotExists(
+ Configuration conf,
+ org.apache.hadoop.conf.Configuration hadoopConf) throws IOException {
final String basePath = conf.getString(FlinkOptions.PATH);
- final org.apache.hadoop.conf.Configuration hadoopConf = HadoopConfigurations.getHadoopConf(conf);
if (!tableExists(basePath, hadoopConf)) {
HoodieTableMetaClient metaClient = HoodieTableMetaClient.withPropertyBuilder()
.setTableCreateSchema(conf.getString(FlinkOptions.SOURCE_AVRO_SCHEMA))
@@ -529,7 +540,7 @@ public static Schema getLatestTableSchema(String path, org.apache.hadoop.conf.Co
}
return null;
}
-
+
public static boolean fileExists(FileSystem fs, Path path) {
try {
return fs.exists(path);
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java
index 9fd7e2f912b70..44d300f55528d 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java
@@ -1342,7 +1342,6 @@ void testBuiltinFunctionWithHMSCatalog() {
.field("f_par string")
.pkField("f_int")
.partitionField("f_par")
- .option(FlinkOptions.PATH, tempFile.getAbsolutePath() + "/" + dbName + "/" + "t1")
.option(FlinkOptions.RECORD_KEY_FIELD, "f_int")
.option(FlinkOptions.PRECOMBINE_FIELD, "f_date")
.end();
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/catalog/HoodieCatalogTestUtils.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/catalog/HoodieCatalogTestUtils.java
index 6a077ec7c46b1..8bcf7e795351d 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/catalog/HoodieCatalogTestUtils.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/catalog/HoodieCatalogTestUtils.java
@@ -41,6 +41,7 @@ public static HoodieHiveCatalog createHiveCatalog(String name) {
return new HoodieHiveCatalog(
name,
null,
+ null,
createHiveConf(),
true);
}
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/catalog/TestHoodieHiveCatalog.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/catalog/TestHoodieHiveCatalog.java
index 4d9b4e051888e..da6cde4e89ced 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/catalog/TestHoodieHiveCatalog.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/catalog/TestHoodieHiveCatalog.java
@@ -54,6 +54,7 @@
import static org.apache.flink.table.factories.FactoryUtil.CONNECTOR;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
/**
@@ -106,7 +107,7 @@ public void testCreateAndGetHoodieTable(HoodieTableType tableType) throws Except
assertEquals(table1.getOptions().get(CONNECTOR.key()), "hudi");
assertEquals(table1.getOptions().get(FlinkOptions.TABLE_TYPE.key()), tableType.toString());
assertEquals(table1.getOptions().get(FlinkOptions.RECORD_KEY_FIELD.key()), "uuid");
- assertEquals(table1.getOptions().get(FlinkOptions.PRECOMBINE_FIELD.key()), "ts");
+ assertNull(table1.getOptions().get(FlinkOptions.PRECOMBINE_FIELD.key()), "preCombine key is not declared");
assertEquals(table1.getUnresolvedSchema().getPrimaryKey().get().getColumnNames(), Collections.singletonList("uuid"));
assertEquals(((CatalogTable)table1).getPartitionKeys(), Collections.singletonList("par1"));
}