Skip to content

Commit

Permalink
[HUDI-3336][HUDI-FLINK] Configurations transferred through Flink SQL …
Browse files Browse the repository at this point in the history
…cannot take effect
  • Loading branch information
cuibo01 committed Jan 29, 2022
1 parent 6e9036e commit ec269f1
Show file tree
Hide file tree
Showing 4 changed files with 8 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -664,21 +664,21 @@ private FlinkOptions() {
* Collects the config options that start with 'properties.' into a 'key'='value' list.
*/
public static Map<String, String> getHoodieProperties(Map<String, String> options) {
return getHoodiePropertiesWithPrefix(options, PROPERTIES_PREFIX);
return getPropertiesWithPrefix(options, PROPERTIES_PREFIX);
}

/**
* Collects the config options that start with specified prefix {@code prefix} into a 'key'='value' list.
*/
public static Map<String, String> getHoodiePropertiesWithPrefix(Map<String, String> options, String prefix) {
public static Map<String, String> getPropertiesWithPrefix(Map<String, String> options, String prefix) {
final Map<String, String> hoodieProperties = new HashMap<>();

if (hasPropertyOptions(options)) {
options.keySet().stream()
.filter(key -> key.startsWith(PROPERTIES_PREFIX))
.filter(key -> key.startsWith(PROPERTIES_PREFIX + prefix))
.forEach(key -> {
final String value = options.get(key);
final String subKey = key.substring((prefix).length());
final String subKey = key.substring((PROPERTIES_PREFIX + prefix).length());
hoodieProperties.put(subKey, value);
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ public class HoodieCatalog extends AbstractCatalog {
public HoodieCatalog(String name, Configuration options) {
super(name, options.get(DEFAULT_DATABASE));
this.catalogPathStr = options.get(CATALOG_PATH);
this.hadoopConf = StreamerUtil.getHadoopConf(options);
this.hadoopConf = StreamerUtil.getHadoopConf(options);
this.tableCommonOptions = CatalogOptions.tableCommonOptions(options);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -256,7 +256,7 @@ public static org.apache.hadoop.conf.Configuration getParquetConf(
org.apache.hadoop.conf.Configuration hadoopConf) {
final String prefix = "parquet.";
org.apache.hadoop.conf.Configuration copy = new org.apache.hadoop.conf.Configuration(hadoopConf);
Map<String, String> parquetOptions = FlinkOptions.getHoodiePropertiesWithPrefix(options.toMap(), prefix);
Map<String, String> parquetOptions = FlinkOptions.getPropertiesWithPrefix(options.toMap(), prefix);
parquetOptions.forEach((k, v) -> copy.set(prefix + k, v));
return copy;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -152,8 +152,8 @@ public static org.apache.hadoop.conf.Configuration getHadoopConf(Configuration c
} else {
final String prefix = "hadoop.";
org.apache.hadoop.conf.Configuration hadoopConf = FlinkClientUtil.getHadoopConf();
Map<String, String> options = FlinkOptions.getHoodiePropertiesWithPrefix(configuration.toMap(), prefix);
options.forEach((k, v) -> hadoopConf.set(prefix + k, v));
Map<String, String> options = FlinkOptions.getPropertiesWithPrefix(configuration.toMap(), prefix);
options.forEach((k, v) -> hadoopConf.set(k, v));
return hadoopConf;
}
}
Expand Down

0 comments on commit ec269f1

Please sign in to comment.