[HUDI-3336][HUDI-FLINK] Support custom hadoop config options for flink
cuibo01 committed Feb 11, 2022
1 parent 099a340 commit 1cb3039
Showing 11 changed files with 47 additions and 49 deletions.
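In practice, this change lets a Flink SQL table definition carry free-form Hadoop, Parquet, and Hoodie properties. A minimal usage sketch follows; the table schema, path, and option keys are illustrative, not taken from this commit:

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class HudiCustomHadoopOptionsExample {
  public static void main(String[] args) {
    TableEnvironment tableEnv =
        TableEnvironment.create(EnvironmentSettings.newInstance().inStreamingMode().build());

    // Keys under 'hadoop.' are forwarded to the Hadoop Configuration with the
    // prefix stripped; 'parquet.' and 'properties.' keys are likewise passed
    // through instead of being rejected by option validation.
    tableEnv.executeSql(
        "CREATE TABLE hudi_tbl (\n"
            + "  id INT PRIMARY KEY NOT ENFORCED,\n"
            + "  name STRING\n"
            + ") WITH (\n"
            + "  'connector' = 'hudi',\n"
            + "  'path' = 'hdfs://nameservice/tmp/hudi_tbl',\n"
            + "  'hadoop.dfs.client.use.datanode.hostname' = 'true',\n"
            + "  'parquet.page.size' = '1048576'\n"
            + ")");
  }
}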

hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
@@ -659,25 +659,7 @@ private FlinkOptions() {
// -------------------------------------------------------------------------

// Prefix for Hoodie specific properties.
-  private static final String PROPERTIES_PREFIX = "properties.";
-
-  /**
-   * Collects the config options that start with specified prefix {@code prefix} into a 'key'='value' list.
-   */
-  public static Map<String, String> getPropertiesWithPrefix(Map<String, String> options, String subprefix) {
-    final Map<String, String> hoodieProperties = new HashMap<>();
-    String prefix = StringUtils.isNullOrEmpty(subprefix) ? PROPERTIES_PREFIX : PROPERTIES_PREFIX + subprefix;
-    if (hasPropertyOptions(options, prefix)) {
-      options.keySet().stream()
-          .filter(key -> key.startsWith(prefix))
-          .forEach(key -> {
-            final String value = options.get(key);
-            final String subKey = key.substring(prefix.length());
-            hoodieProperties.put(subKey, value);
-          });
-    }
-    return hoodieProperties;
-  }
+  public static final String PROPERTIES_PREFIX = "properties.";

/**
* Collects all the config options, the 'properties.' prefix would be removed if the option key starts with it.
@@ -694,10 +676,6 @@ public static Configuration flatOptions(Configuration conf) {
return fromMap(propsMap);
}

-  private static boolean hasPropertyOptions(Map<String, String> options, String prefix) {
-    return options.keySet().stream().anyMatch(k -> k.startsWith(prefix));
-  }
-
/**
* Creates a new configuration that is initialized with the options of the given map.
*/
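The two helpers deleted above are superseded by Flink's built-in DelegatingConfiguration, whose toMap() keeps only the keys under a given prefix and returns them with the prefix stripped. A small sketch of the equivalence, using made-up option keys:

import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.DelegatingConfiguration;

import java.util.Map;

public class DelegatingConfigurationSketch {
  public static void main(String[] args) {
    Configuration options = new Configuration();
    options.setString("hadoop.dfs.replication", "2");     // illustrative keys
    options.setString("parquet.page.size", "1048576");
    options.setString("table.type", "MERGE_ON_READ");     // no prefix, filtered out

    // Keeps only 'hadoop.*' entries and strips the prefix, which is what the
    // removed getPropertiesWithPrefix helper used to do by hand.
    Map<String, String> hadoopOptions =
        new DelegatingConfiguration(options, "hadoop.").toMap();
    System.out.println(hadoopOptions); // {dfs.replication=2}
  }
}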

FilebasedSchemaProvider.java
@@ -49,9 +49,10 @@ public static class Config {

private Schema targetSchema;

+  @Deprecated
public FilebasedSchemaProvider(TypedProperties props) {
StreamerUtil.checkRequiredProperties(props, Collections.singletonList(Config.SOURCE_SCHEMA_FILE_PROP));
-    FileSystem fs = FSUtils.getFs(props.getString(Config.SOURCE_SCHEMA_FILE_PROP), StreamerUtil.getHadoopConf());
+    FileSystem fs = FSUtils.getFs(props.getString(Config.SOURCE_SCHEMA_FILE_PROP), StreamerUtil.getHadoopConf(new Configuration()));
try {
this.sourceSchema = new Schema.Parser().parse(fs.open(new Path(props.getString(Config.SOURCE_SCHEMA_FILE_PROP))));
if (props.containsKey(Config.TARGET_SCHEMA_FILE_PROP)) {

hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableFactory.java
@@ -18,6 +18,7 @@

package org.apache.hudi.table;

+import org.apache.flink.table.factories.FactoryUtil;
import org.apache.hudi.common.model.DefaultHoodieRecordPayload;
import org.apache.hudi.common.model.EventTimeAvroPayload;
import org.apache.hudi.configuration.FlinkOptions;
@@ -51,6 +52,10 @@
import java.util.List;
import java.util.Set;

+import static org.apache.hudi.configuration.FlinkOptions.PROPERTIES_PREFIX;
+import static org.apache.hudi.table.format.FormatUtils.PARQUET_PREFIX;
+import static org.apache.hudi.util.StreamerUtil.HADOOP_PREFIX;
+
/**
* Hoodie data source/sink factory.
*/
@@ -61,7 +66,9 @@ public class HoodieTableFactory implements DynamicTableSourceFactory, DynamicTableSinkFactory {

@Override
public DynamicTableSource createDynamicTableSource(Context context) {
-    Configuration conf = FlinkOptions.fromMap(context.getCatalogTable().getOptions());
+    FactoryUtil.TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context);
+    helper.validateExcept(HADOOP_PREFIX, PARQUET_PREFIX, PROPERTIES_PREFIX);
+    Configuration conf = (Configuration) helper.getOptions();
ResolvedSchema schema = context.getCatalogTable().getResolvedSchema();
sanityCheck(conf, schema);
setupConfOptions(conf, context.getObjectIdentifier().getObjectName(), context.getCatalogTable(), schema);
@@ -78,7 +85,9 @@ public DynamicTableSource createDynamicTableSource(Context context) {

@Override
public DynamicTableSink createDynamicTableSink(Context context) {
-    Configuration conf = FlinkOptions.fromMap(context.getCatalogTable().getOptions());
+    FactoryUtil.TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context);
+    helper.validateExcept(HADOOP_PREFIX, PARQUET_PREFIX, PROPERTIES_PREFIX);
+    Configuration conf = (Configuration) helper.getOptions();
ResolvedSchema schema = context.getCatalogTable().getResolvedSchema();
sanityCheck(conf, schema);
setupConfOptions(conf, context.getObjectIdentifier().getObjectName(), context.getCatalogTable(), schema);
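For context, Flink's TableFactoryHelper normally rejects any WITH-clause option the factory has not declared, so the validateExcept(...) calls above are what let free-form 'hadoop.', 'parquet.', and 'properties.' keys through. The sketch below is an illustrative re-implementation of that check, not Flink's actual code:

import java.util.List;
import java.util.Map;
import java.util.Set;

public class ValidateExceptSketch {
  public static void main(String[] args) {
    // Options a factory has declared (illustrative subset).
    Set<String> declaredOptions = Set.of("connector", "path", "table.type");
    // Prefixes exempted via helper.validateExcept(...) in the patch.
    List<String> exemptPrefixes = List.of("hadoop.", "parquet.", "properties.");

    Map<String, String> tableOptions = Map.of(
        "connector", "hudi",
        "path", "/tmp/hudi_tbl",
        "hadoop.dfs.replication", "2"); // passes only because of the exemption

    for (String key : tableOptions.keySet()) {
      boolean exempt = exemptPrefixes.stream().anyMatch(key::startsWith);
      if (!exempt && !declaredOptions.contains(key)) {
        throw new IllegalArgumentException("Unsupported option: " + key);
      }
    }
    System.out.println("all options validated");
  }
}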

hudi-flink/src/main/java/org/apache/hudi/table/format/FormatUtils.java
@@ -18,6 +18,7 @@

package org.apache.hudi.table.format;

+import org.apache.flink.configuration.DelegatingConfiguration;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.HoodieOperation;
import org.apache.hudi.common.model.HoodieRecord;
@@ -55,6 +56,8 @@
* Utilities for format.
*/
public class FormatUtils {
+  public static final String PARQUET_PREFIX = "parquet.";
+
private FormatUtils() {
}

@@ -254,10 +257,10 @@ private static Boolean string2Boolean(String s) {
public static org.apache.hadoop.conf.Configuration getParquetConf(
org.apache.flink.configuration.Configuration options,
org.apache.hadoop.conf.Configuration hadoopConf) {
-    final String prefix = "parquet.";
org.apache.hadoop.conf.Configuration copy = new org.apache.hadoop.conf.Configuration(hadoopConf);
-    Map<String, String> parquetOptions = FlinkOptions.getPropertiesWithPrefix(options.toMap(), prefix);
-    parquetOptions.forEach((k, v) -> copy.set(prefix + k, v));
+    DelegatingConfiguration delegatingConf = new DelegatingConfiguration(options, PARQUET_PREFIX);
+    Map<String, String> parquetOptions = delegatingConf.toMap();
+    parquetOptions.forEach((k, v) -> copy.set(PARQUET_PREFIX + k, v));
return copy;
}
}
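The net behavior of getParquetConf after this change: 'parquet.' options are filtered out of the Flink configuration and copied, prefix intact, onto a fresh Hadoop Configuration. A quick sketch with an illustrative option key:

import org.apache.hudi.table.format.FormatUtils;

public class ParquetConfSketch {
  public static void main(String[] args) {
    org.apache.flink.configuration.Configuration options =
        new org.apache.flink.configuration.Configuration();
    options.setString("parquet.page.size", "1048576"); // illustrative key

    org.apache.hadoop.conf.Configuration parquetConf =
        FormatUtils.getParquetConf(options, new org.apache.hadoop.conf.Configuration());

    // The prefix is stripped for filtering and re-added on copy, so the key
    // lands unchanged in the Hadoop configuration.
    System.out.println(parquetConf.get("parquet.page.size")); // 1048576
  }
}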
17 changes: 11 additions & 6 deletions hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
@@ -18,6 +18,7 @@

package org.apache.hudi.util;

+import org.apache.flink.configuration.DelegatingConfiguration;
import org.apache.hudi.client.FlinkTaskContextSupplier;
import org.apache.hudi.client.HoodieFlinkWriteClient;
import org.apache.hudi.client.common.HoodieFlinkEngineContext;
@@ -90,6 +91,8 @@ public class StreamerUtil {

private static final Logger LOG = LoggerFactory.getLogger(StreamerUtil.class);

+  public static final String HADOOP_PREFIX = "hadoop.";
+
public static TypedProperties appendKafkaProps(FlinkStreamerConfig config) {
TypedProperties properties = getProps(config);
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, config.kafkaBootstrapServers);
@@ -102,7 +105,7 @@ public static TypedProperties getProps(FlinkStreamerConfig cfg) {
return new TypedProperties();
}
return readConfig(
-        getHadoopConf(cfg),
+        getHadoopConf(cfg),
new Path(cfg.propsFilePath), cfg.configs).getProps();
}

@@ -141,18 +144,20 @@ public static DFSPropertiesConfiguration readConfig(org.apache.hadoop.conf.Configuration
return conf;
}

+  // Keep the redundant to avoid too many modifications.
+  @Deprecated
  public static org.apache.hadoop.conf.Configuration getHadoopConf() {
-    return getHadoopConf(null);
+    return getHadoopConf(new Configuration());
  }

-  // Keep the redundant to avoid too many modifications.
-  public static org.apache.hadoop.conf.Configuration getHadoopConf(Configuration configuration) {
-    if (configuration == null) {
+  public static org.apache.hadoop.conf.Configuration getHadoopConf(Configuration conf) {
+    if (conf == null) {
      return FlinkClientUtil.getHadoopConf();
    } else {
-      final String prefix = "hadoop.";
      org.apache.hadoop.conf.Configuration hadoopConf = FlinkClientUtil.getHadoopConf();
-      Map<String, String> options = FlinkOptions.getPropertiesWithPrefix(configuration.toMap(), prefix);
+      DelegatingConfiguration delegatingConf = new DelegatingConfiguration(conf, HADOOP_PREFIX);
+      Map<String, String> options = delegatingConf.toMap();
      options.forEach((k, v) -> hadoopConf.set(k, v));
      return hadoopConf;
    }
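Correspondingly, the new getHadoopConf(Configuration) copies every 'hadoop.'-prefixed Flink option onto the returned Hadoop Configuration with the prefix stripped. A minimal sketch; the dfs key is illustrative:

import org.apache.flink.configuration.Configuration;
import org.apache.hudi.util.StreamerUtil;

public class HadoopConfSketch {
  public static void main(String[] args) {
    Configuration flinkConf = new Configuration();
    flinkConf.setString("hadoop.dfs.client.use.datanode.hostname", "true"); // illustrative key

    // 'hadoop.' is stripped, so plain Hadoop clients see the key directly.
    org.apache.hadoop.conf.Configuration hadoopConf = StreamerUtil.getHadoopConf(flinkConf);
    System.out.println(hadoopConf.get("dfs.client.use.datanode.hostname")); // true
  }
}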

hudi-flink/src/main/java/org/apache/hudi/util/ViewStorageProperties.java
@@ -63,10 +63,10 @@ public static void createProperties(
/**
* Read the {@link FileSystemViewStorageConfig} with given table base path.
*/
-  public static FileSystemViewStorageConfig loadFromProperties(String basePath, Configuration configuration) {
+  public static FileSystemViewStorageConfig loadFromProperties(String basePath, Configuration conf) {
Path propertyPath = getPropertiesFilePath(basePath);
LOG.info("Loading filesystem view storage properties from " + propertyPath);
-    FileSystem fs = FSUtils.getFs(basePath, StreamerUtil.getHadoopConf(configuration));
+    FileSystem fs = FSUtils.getFs(basePath, StreamerUtil.getHadoopConf(conf));
Properties props = new Properties();
try {
try (FSDataInputStream inputStream = fs.open(propertyPath)) {

@@ -103,7 +103,7 @@ void testInstantState() {

@Test
public void testTableInitialized() throws IOException {
-    final org.apache.hadoop.conf.Configuration hadoopConf = StreamerUtil.getHadoopConf();
+    final org.apache.hadoop.conf.Configuration hadoopConf = StreamerUtil.getHadoopConf(new Configuration());
String basePath = tempFile.getAbsolutePath();
try (FileSystem fs = FSUtils.getFs(basePath, hadoopConf)) {
assertTrue(fs.exists(new Path(basePath, HoodieTableMetaClient.METAFOLDER_NAME)));

@@ -239,7 +239,7 @@ private List<MergeOnReadInputSplit> generateSplits(StreamReadMonitoringFunction

private OneInputStreamOperatorTestHarness<MergeOnReadInputSplit, RowData> createReader() throws Exception {
final String basePath = tempFile.getAbsolutePath();
-    final org.apache.hadoop.conf.Configuration hadoopConf = StreamerUtil.getHadoopConf();
+    final org.apache.hadoop.conf.Configuration hadoopConf = StreamerUtil.getHadoopConf(new Configuration());
final HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder()
.setConf(hadoopConf).setBasePath(basePath).build();
final List<String> partitionKeys = Collections.singletonList("partition");

@@ -106,7 +106,7 @@ void testInstantTimeDiff() {
void testDumpRemoteViewStorageConfig() throws IOException {
Configuration conf = TestConfigurations.getDefaultConf(tempFile.getAbsolutePath());
StreamerUtil.createWriteClient(conf);
-    FileSystemViewStorageConfig storageConfig = ViewStorageProperties.loadFromProperties(conf.getString(FlinkOptions.PATH), null);
+    FileSystemViewStorageConfig storageConfig = ViewStorageProperties.loadFromProperties(conf.getString(FlinkOptions.PATH), new Configuration());
assertThat(storageConfig.getStorageType(), is(FileSystemViewStorageType.REMOTE_FIRST));
}
}
10 changes: 5 additions & 5 deletions hudi-flink/src/test/java/org/apache/hudi/utils/TestUtils.java
@@ -39,19 +39,19 @@
public class TestUtils {
public static String getLastPendingInstant(String basePath) {
final HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder()
-        .setConf(StreamerUtil.getHadoopConf()).setBasePath(basePath).build();
+        .setConf(StreamerUtil.getHadoopConf(new Configuration())).setBasePath(basePath).build();
return StreamerUtil.getLastPendingInstant(metaClient);
}

public static String getLastCompleteInstant(String basePath) {
final HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder()
-        .setConf(StreamerUtil.getHadoopConf()).setBasePath(basePath).build();
+        .setConf(StreamerUtil.getHadoopConf(new Configuration())).setBasePath(basePath).build();
return StreamerUtil.getLastCompletedInstant(metaClient);
}

public static String getLastDeltaCompleteInstant(String basePath) {
final HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder()
-        .setConf(StreamerUtil.getHadoopConf()).setBasePath(basePath).build();
+        .setConf(StreamerUtil.getHadoopConf(new Configuration())).setBasePath(basePath).build();
return metaClient.getCommitsTimeline().filterCompletedInstants()
.filter(hoodieInstant -> hoodieInstant.getAction().equals(HoodieTimeline.DELTA_COMMIT_ACTION))
.lastInstant()
@@ -61,15 +61,15 @@ public static String getLastDeltaCompleteInstant(String basePath) {

public static String getFirstCompleteInstant(String basePath) {
final HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder()
-        .setConf(StreamerUtil.getHadoopConf()).setBasePath(basePath).build();
+        .setConf(StreamerUtil.getHadoopConf(new Configuration())).setBasePath(basePath).build();
return metaClient.getCommitsAndCompactionTimeline().filterCompletedInstants().firstInstant()
.map(HoodieInstant::getTimestamp).orElse(null);
}

@Nullable
public static String getNthCompleteInstant(String basePath, int n, boolean isDelta) {
final HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder()
-        .setConf(StreamerUtil.getHadoopConf()).setBasePath(basePath).build();
+        .setConf(StreamerUtil.getHadoopConf(new Configuration())).setBasePath(basePath).build();
return metaClient.getActiveTimeline()
.filterCompletedInstants()
.filter(instant -> isDelta ? HoodieTimeline.DELTA_COMMIT_ACTION.equals(instant.getAction()) : HoodieTimeline.COMMIT_ACTION.equals(instant.getAction()))

@@ -18,6 +18,7 @@

package org.apache.hudi.utils;

+import org.apache.flink.configuration.Configuration;
import org.apache.hudi.common.table.view.FileSystemViewStorageConfig;
import org.apache.hudi.common.table.view.FileSystemViewStorageType;
import org.apache.hudi.util.ViewStorageProperties;
@@ -45,11 +46,12 @@ void testReadWriteProperties() throws IOException {
.withStorageType(FileSystemViewStorageType.SPILLABLE_DISK)
.withRemoteServerHost("host1")
.withRemoteServerPort(1234).build();
-    ViewStorageProperties.createProperties(basePath, config, null);
-    ViewStorageProperties.createProperties(basePath, config, null);
-    ViewStorageProperties.createProperties(basePath, config, null);
+    Configuration flinkConfig = new Configuration();
+    ViewStorageProperties.createProperties(basePath, config, flinkConfig);
+    ViewStorageProperties.createProperties(basePath, config, flinkConfig);
+    ViewStorageProperties.createProperties(basePath, config, flinkConfig);

-    FileSystemViewStorageConfig readConfig = ViewStorageProperties.loadFromProperties(basePath, null);
+    FileSystemViewStorageConfig readConfig = ViewStorageProperties.loadFromProperties(basePath, new Configuration());
assertThat(readConfig.getStorageType(), is(FileSystemViewStorageType.SPILLABLE_DISK));
assertThat(readConfig.getRemoteViewServerHost(), is("host1"));
assertThat(readConfig.getRemoteViewServerPort(), is(1234));