From ec269f1bd56a1a8eea3b7a4334cc90777ed82980 Mon Sep 17 00:00:00 2001 From: Bo Date: Sat, 29 Jan 2022 19:44:36 +0800 Subject: [PATCH] [HUDI-3336][HUDI-FLINK] Configurations transferred through Flink SQL cannot take effect --- .../java/org/apache/hudi/configuration/FlinkOptions.java | 8 ++++---- .../java/org/apache/hudi/table/catalog/HoodieCatalog.java | 2 +- .../java/org/apache/hudi/table/format/FormatUtils.java | 2 +- .../src/main/java/org/apache/hudi/util/StreamerUtil.java | 4 ++-- 4 files changed, 8 insertions(+), 8 deletions(-) diff --git a/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java b/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java index 77c3f15e54c45..2c430994880fd 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java +++ b/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java @@ -664,21 +664,21 @@ private FlinkOptions() { * Collects the config options that start with 'properties.' into a 'key'='value' list. */ public static Map getHoodieProperties(Map options) { - return getHoodiePropertiesWithPrefix(options, PROPERTIES_PREFIX); + return getPropertiesWithPrefix(options, PROPERTIES_PREFIX); } /** * Collects the config options that start with specified prefix {@code prefix} into a 'key'='value' list. */ - public static Map getHoodiePropertiesWithPrefix(Map options, String prefix) { + public static Map getPropertiesWithPrefix(Map options, String prefix) { final Map hoodieProperties = new HashMap<>(); if (hasPropertyOptions(options)) { options.keySet().stream() - .filter(key -> key.startsWith(PROPERTIES_PREFIX)) + .filter(key -> key.startsWith(PROPERTIES_PREFIX + prefix)) .forEach(key -> { final String value = options.get(key); - final String subKey = key.substring((prefix).length()); + final String subKey = key.substring((PROPERTIES_PREFIX + prefix).length()); hoodieProperties.put(subKey, value); }); } diff --git a/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HoodieCatalog.java b/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HoodieCatalog.java index 12e3c41fbe2db..43833f2140ff4 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HoodieCatalog.java +++ b/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HoodieCatalog.java @@ -93,7 +93,7 @@ public class HoodieCatalog extends AbstractCatalog { public HoodieCatalog(String name, Configuration options) { super(name, options.get(DEFAULT_DATABASE)); this.catalogPathStr = options.get(CATALOG_PATH); - this.hadoopConf = StreamerUtil.getHadoopConf(options); + this.hadoopConf = StreamerUtil.getHadoopConf(options); this.tableCommonOptions = CatalogOptions.tableCommonOptions(options); } diff --git a/hudi-flink/src/main/java/org/apache/hudi/table/format/FormatUtils.java b/hudi-flink/src/main/java/org/apache/hudi/table/format/FormatUtils.java index 2c3318362b053..a654b8dad1ff8 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/table/format/FormatUtils.java +++ b/hudi-flink/src/main/java/org/apache/hudi/table/format/FormatUtils.java @@ -256,7 +256,7 @@ public static org.apache.hadoop.conf.Configuration getParquetConf( org.apache.hadoop.conf.Configuration hadoopConf) { final String prefix = "parquet."; org.apache.hadoop.conf.Configuration copy = new org.apache.hadoop.conf.Configuration(hadoopConf); - Map parquetOptions = FlinkOptions.getHoodiePropertiesWithPrefix(options.toMap(), prefix); + Map parquetOptions = FlinkOptions.getPropertiesWithPrefix(options.toMap(), prefix); parquetOptions.forEach((k, v) -> copy.set(prefix + k, v)); return copy; } diff --git a/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java b/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java index 68b57404d03e9..017fb0f2d31ab 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java +++ b/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java @@ -152,8 +152,8 @@ public static org.apache.hadoop.conf.Configuration getHadoopConf(Configuration c } else { final String prefix = "hadoop."; org.apache.hadoop.conf.Configuration hadoopConf = FlinkClientUtil.getHadoopConf(); - Map options = FlinkOptions.getHoodiePropertiesWithPrefix(configuration.toMap(), prefix); - options.forEach((k, v) -> hadoopConf.set(prefix + k, v)); + Map options = FlinkOptions.getPropertiesWithPrefix(configuration.toMap(), prefix); + options.forEach((k, v) -> hadoopConf.set(k, v)); return hadoopConf; } }