From b980f6ca9160bf4d3059abc1893f7caf6e14b88d Mon Sep 17 00:00:00 2001 From: Jia-Sheng Chen Date: Thu, 9 Feb 2023 17:27:26 +0800 Subject: [PATCH 01/11] add configuration to write data into a DataNode located on another host. Additionally, make user property in hdfs filesystem available. --- fs/src/main/java/org/astraea/fs/hdfs/HdfsFileSystem.java | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/fs/src/main/java/org/astraea/fs/hdfs/HdfsFileSystem.java b/fs/src/main/java/org/astraea/fs/hdfs/HdfsFileSystem.java index 59f667ec0f..b5055774c9 100644 --- a/fs/src/main/java/org/astraea/fs/hdfs/HdfsFileSystem.java +++ b/fs/src/main/java/org/astraea/fs/hdfs/HdfsFileSystem.java @@ -48,7 +48,9 @@ public HdfsFileSystem(Configuration config) { + ":" + config.requireString(PORT_KEY)); - fs = org.apache.hadoop.fs.FileSystem.get(uri, new org.apache.hadoop.conf.Configuration()); + var conf = new org.apache.hadoop.conf.Configuration(); + conf.set("dfs.client.use.datanode.hostname", "true"); + fs = org.apache.hadoop.fs.FileSystem.get(uri, conf, config.requireString(USER_KEY)); }); } From 9de7e475626f3c3dd289bc973ab483167f5066fa Mon Sep 17 00:00:00 2001 From: Jia-Sheng Chen Date: Thu, 9 Feb 2023 17:30:34 +0800 Subject: [PATCH 02/11] fix the typo. --- fs/src/main/java/org/astraea/fs/hdfs/HdfsFileSystem.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/fs/src/main/java/org/astraea/fs/hdfs/HdfsFileSystem.java b/fs/src/main/java/org/astraea/fs/hdfs/HdfsFileSystem.java index b5055774c9..457f3d84a4 100644 --- a/fs/src/main/java/org/astraea/fs/hdfs/HdfsFileSystem.java +++ b/fs/src/main/java/org/astraea/fs/hdfs/HdfsFileSystem.java @@ -80,7 +80,7 @@ public List listFiles(String path) { return Utils.packException( () -> { if (type(path) != Type.FOLDER) - throw new IllegalArgumentException(path + " is nto a folder"); + throw new IllegalArgumentException(path + " is not a folder"); return Arrays.stream(fs.listStatus(new Path(path))) .filter(FileStatus::isFile) .map(f -> f.getPath().toUri().getPath()) @@ -93,7 +93,7 @@ public List listFolders(String path) { return Utils.packException( () -> { if (type(path) != Type.FOLDER) - throw new IllegalArgumentException(path + " is nto a folder"); + throw new IllegalArgumentException(path + " is not a folder"); return Arrays.stream(fs.listStatus(new Path(path))) .filter(FileStatus::isDirectory) .map(f -> f.getPath().getName()) From 877ab951043365c4c54047836777cc1a35e4491a Mon Sep 17 00:00:00 2001 From: Jia-Sheng Chen Date: Thu, 9 Feb 2023 17:35:56 +0800 Subject: [PATCH 03/11] corrected the wrong indication of the necessity of the path parameter --- docs/connector/exporter.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/connector/exporter.md b/docs/connector/exporter.md index 22e4a54b56..55ff94e4d2 100644 --- a/docs/connector/exporter.md +++ b/docs/connector/exporter.md @@ -16,11 +16,11 @@ | 參數名稱 | 說明 | 預設值 | |:--------------------------|--------------------------------------------------------------------------------------------------------------|-------| | fs.schema | (必填) 決定儲存目標為何種檔案系統,例如: `local`, `ftp`等 | 無 | +| path | (必填) 填入目標檔案系統要儲存的資料夾目錄之目標位置 | 無 | | fs.{file System}.hostname | (選填) 如果最初的 `fs.schema` 選定為非 `local` 之項目,需要填入目標 `host name`, `file System` 取決於前者之內容 | 無 | | fs.{file System}.port | (選填) 填入目標檔案系統之 `port` | 無 | | fs.{file System}.user | (選填) 填入目標檔案系統之登入 `user` | 無 | | fs.{file System}.password | (選填) 填入目標檔案系統之登入 `password` | 無 | -| path | (選填) 填入目標檔案系統要儲存的資料夾目錄之目標位置 | 無 | | size | (選填) 寫入檔案目標超過此設定之大小上限時會創見新檔案,並且寫入目標改為新創建之檔案。
檔案大小單位: `Bit`, `Kb`, `KiB`, `Mb`, etc. | 100MB | | roll.duration | (選填) 如果 `connector` 在超過此時間沒有任何資料流入,會把當下所有已創建之檔案關閉,並在之後有新資料時會創建新檔案並寫入。
時間單位: `s`, `m`, `h`, `day`, etc. | 3s | From 1e28f6ac01d46fafd020bdd2d0f1491f6c3b0e12 Mon Sep 17 00:00:00 2001 From: Jia-Sheng Chen Date: Thu, 9 Feb 2023 20:59:21 +0800 Subject: [PATCH 04/11] return the system username in local hdfs to avoid permission denied --- it/src/main/java/org/astraea/it/HdfsServer.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/it/src/main/java/org/astraea/it/HdfsServer.java b/it/src/main/java/org/astraea/it/HdfsServer.java index 8950b847c9..64f4638e51 100644 --- a/it/src/main/java/org/astraea/it/HdfsServer.java +++ b/it/src/main/java/org/astraea/it/HdfsServer.java @@ -101,7 +101,7 @@ public int port() { @Override public String user() { - return "root"; + return System.getProperty("user.name"); } @Override From ffed32197b7dcfde3deb56650cad7534a5ce138f Mon Sep 17 00:00:00 2001 From: Jia-Sheng Chen Date: Fri, 10 Feb 2023 14:52:24 +0800 Subject: [PATCH 05/11] add parameter fs..override --- .../src/main/java/org/astraea/common/Configuration.java | 9 +++++++++ .../astraea/common/partitioner/ConfigurationTest.java | 1 + .../java/org/astraea/connector/backup/ExporterTest.java | 4 +++- fs/src/main/java/org/astraea/fs/hdfs/HdfsFileSystem.java | 3 ++- 4 files changed, 15 insertions(+), 2 deletions(-) diff --git a/common/src/main/java/org/astraea/common/Configuration.java b/common/src/main/java/org/astraea/common/Configuration.java index 6d4d80399a..2746480f38 100644 --- a/common/src/main/java/org/astraea/common/Configuration.java +++ b/common/src/main/java/org/astraea/common/Configuration.java @@ -102,6 +102,15 @@ default String requireString(String key) { return string(key).orElseThrow(() -> new NoSuchElementException(key + " is nonexistent")); } + /** + * @param key the key whose associated value is to be returned should have a corresponding value + * in the format of "k1:v1,k2:v2" + * @return string map object, never null + */ + default Map requireMap(String key) { + return map(key, ",", ":", k -> k.replaceAll("\n", ""), String::valueOf); + } + /** * @param key the key whose associated value is to be returned * @param separator to split string to multiple strings diff --git a/common/src/test/java/org/astraea/common/partitioner/ConfigurationTest.java b/common/src/test/java/org/astraea/common/partitioner/ConfigurationTest.java index e7eece7950..cd36791bf2 100644 --- a/common/src/test/java/org/astraea/common/partitioner/ConfigurationTest.java +++ b/common/src/test/java/org/astraea/common/partitioner/ConfigurationTest.java @@ -43,5 +43,6 @@ void testMap() { var config = Configuration.of(Map.of("key", "v0:0,v1:1")); Assertions.assertEquals( Map.of("v0", 0, "v1", 1), config.map("key", ",", ":", Integer::valueOf)); + Assertions.assertEquals(Map.of("v0", "0", "v1", "1"), config.requireMap("key")); } } diff --git a/connector/src/test/java/org/astraea/connector/backup/ExporterTest.java b/connector/src/test/java/org/astraea/connector/backup/ExporterTest.java index f4e43016e5..72f763a641 100644 --- a/connector/src/test/java/org/astraea/connector/backup/ExporterTest.java +++ b/connector/src/test/java/org/astraea/connector/backup/ExporterTest.java @@ -450,7 +450,9 @@ void testHdfsSinkTask() { "tasks.max", "1", "roll.duration", - "100m"); + "100m", + "fs.hdfs.override", + "dfs.client.use.datanode.hostname:true"); task.start(configs); diff --git a/fs/src/main/java/org/astraea/fs/hdfs/HdfsFileSystem.java b/fs/src/main/java/org/astraea/fs/hdfs/HdfsFileSystem.java index 457f3d84a4..efa76b14f2 100644 --- a/fs/src/main/java/org/astraea/fs/hdfs/HdfsFileSystem.java +++ b/fs/src/main/java/org/astraea/fs/hdfs/HdfsFileSystem.java @@ -35,6 +35,7 @@ public class HdfsFileSystem implements FileSystem { public static final String HOSTNAME_KEY = "fs.hdfs.hostname"; public static final String PORT_KEY = "fs.hdfs.port"; public static final String USER_KEY = "fs.hdfs.user"; + public static final String OVERRIDE_KEY = "fs.hdfs.override"; private org.apache.hadoop.fs.FileSystem fs; @@ -49,7 +50,7 @@ public HdfsFileSystem(Configuration config) { + config.requireString(PORT_KEY)); var conf = new org.apache.hadoop.conf.Configuration(); - conf.set("dfs.client.use.datanode.hostname", "true"); + config.requireMap(OVERRIDE_KEY).forEach(conf::set); fs = org.apache.hadoop.fs.FileSystem.get(uri, conf, config.requireString(USER_KEY)); }); } From 0eaef208c96422a17550fea5417fffdfe2ca5aa6 Mon Sep 17 00:00:00 2001 From: Jia-Sheng Chen Date: Fri, 10 Feb 2023 14:59:54 +0800 Subject: [PATCH 06/11] add the documentation about fs..override --- .../astraea/connector/backup/Exporter.java | 36 +++++++++++++------ 1 file changed, 26 insertions(+), 10 deletions(-) diff --git a/connector/src/main/java/org/astraea/connector/backup/Exporter.java b/connector/src/main/java/org/astraea/connector/backup/Exporter.java index 1f63e16a6b..caf9695dc6 100644 --- a/connector/src/main/java/org/astraea/connector/backup/Exporter.java +++ b/connector/src/main/java/org/astraea/connector/backup/Exporter.java @@ -43,32 +43,32 @@ public class Exporter extends SinkConnector { Definition.builder() .name("fs.schema") .type(Definition.Type.STRING) - .documentation("decide which file system to use, such as FTP.") + .documentation("decide which file system to use, such as FTP, HDFS.") .required() .build(); static Definition HOSTNAME_KEY = Definition.builder() - .name("fs.ftp.hostname") + .name("fs..hostname") .type(Definition.Type.STRING) - .documentation("the host name of the ftp server used.") + .documentation("the host name of the server used.") .build(); static Definition PORT_KEY = Definition.builder() - .name("fs.ftp.port") + .name("fs..port") .type(Definition.Type.STRING) - .documentation("the port of the ftp server used.") + .documentation("the port of the server used.") .build(); static Definition USER_KEY = Definition.builder() - .name("fs.ftp.user") + .name("fs..user") .type(Definition.Type.STRING) - .documentation("the user name required to login to the FTP server.") + .documentation("the user name required to login to the server.") .build(); static Definition PASSWORD_KEY = Definition.builder() - .name("fs.ftp.password") + .name("fs..password") .type(Definition.Type.PASSWORD) - .documentation("the password required to login to the ftp server.") + .documentation("the password required to login to the server.") .build(); static Definition PATH_KEY = Definition.builder() @@ -94,6 +94,14 @@ public class Exporter extends SinkConnector { .defaultValue("3s") .documentation("the maximum time before a new archive file is rolling out.") .build(); + + static Definition OVERRIDE_KEY = + Definition.builder() + .name("fs..override") + .type(Definition.Type.STRING) + .documentation( + "a string that needs to be overridden in the file system should have a format of \"k1:v1,k2:v2\".") + .build(); private Configuration configs; @Override @@ -113,7 +121,15 @@ protected List takeConfiguration(int maxTasks) { @Override protected List definitions() { - return List.of(SCHEMA_KEY, HOSTNAME_KEY, PORT_KEY, USER_KEY, PASSWORD_KEY, PATH_KEY, SIZE_KEY); + return List.of( + SCHEMA_KEY, + HOSTNAME_KEY, + PORT_KEY, + USER_KEY, + PASSWORD_KEY, + PATH_KEY, + SIZE_KEY, + OVERRIDE_KEY); } public static class Task extends SinkTask { From 42cc821adc182c151fe7560de45b39b7f7db91c0 Mon Sep 17 00:00:00 2001 From: Jia-Sheng Chen Date: Fri, 10 Feb 2023 15:44:31 +0800 Subject: [PATCH 07/11] add try-catch to avoid fs.hdfs.override non-exist error --- fs/src/main/java/org/astraea/fs/hdfs/HdfsFileSystem.java | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/fs/src/main/java/org/astraea/fs/hdfs/HdfsFileSystem.java b/fs/src/main/java/org/astraea/fs/hdfs/HdfsFileSystem.java index efa76b14f2..806ac69707 100644 --- a/fs/src/main/java/org/astraea/fs/hdfs/HdfsFileSystem.java +++ b/fs/src/main/java/org/astraea/fs/hdfs/HdfsFileSystem.java @@ -22,6 +22,7 @@ import java.net.URI; import java.util.Arrays; import java.util.List; +import java.util.NoSuchElementException; import java.util.stream.Collectors; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.Path; @@ -50,7 +51,11 @@ public HdfsFileSystem(Configuration config) { + config.requireString(PORT_KEY)); var conf = new org.apache.hadoop.conf.Configuration(); - config.requireMap(OVERRIDE_KEY).forEach(conf::set); + try { + config.requireMap(OVERRIDE_KEY).forEach(conf::set); + } catch (NoSuchElementException ignored) { + + } fs = org.apache.hadoop.fs.FileSystem.get(uri, conf, config.requireString(USER_KEY)); }); } From 1255d70cc52e0d45acccd99320a17c145cca7215 Mon Sep 17 00:00:00 2001 From: Jia-Sheng Chen Date: Fri, 10 Feb 2023 16:55:47 +0800 Subject: [PATCH 08/11] flat map form of parameter fs..override --- .../java/org/astraea/common/Configuration.java | 9 --------- .../common/partitioner/ConfigurationTest.java | 1 - .../org/astraea/connector/backup/Exporter.java | 5 ++--- .../astraea/connector/backup/ExporterTest.java | 4 ++-- .../java/org/astraea/fs/hdfs/HdfsFileSystem.java | 16 +++++++++++----- 5 files changed, 15 insertions(+), 20 deletions(-) diff --git a/common/src/main/java/org/astraea/common/Configuration.java b/common/src/main/java/org/astraea/common/Configuration.java index 2746480f38..6d4d80399a 100644 --- a/common/src/main/java/org/astraea/common/Configuration.java +++ b/common/src/main/java/org/astraea/common/Configuration.java @@ -102,15 +102,6 @@ default String requireString(String key) { return string(key).orElseThrow(() -> new NoSuchElementException(key + " is nonexistent")); } - /** - * @param key the key whose associated value is to be returned should have a corresponding value - * in the format of "k1:v1,k2:v2" - * @return string map object, never null - */ - default Map requireMap(String key) { - return map(key, ",", ":", k -> k.replaceAll("\n", ""), String::valueOf); - } - /** * @param key the key whose associated value is to be returned * @param separator to split string to multiple strings diff --git a/common/src/test/java/org/astraea/common/partitioner/ConfigurationTest.java b/common/src/test/java/org/astraea/common/partitioner/ConfigurationTest.java index cd36791bf2..e7eece7950 100644 --- a/common/src/test/java/org/astraea/common/partitioner/ConfigurationTest.java +++ b/common/src/test/java/org/astraea/common/partitioner/ConfigurationTest.java @@ -43,6 +43,5 @@ void testMap() { var config = Configuration.of(Map.of("key", "v0:0,v1:1")); Assertions.assertEquals( Map.of("v0", 0, "v1", 1), config.map("key", ",", ":", Integer::valueOf)); - Assertions.assertEquals(Map.of("v0", "0", "v1", "1"), config.requireMap("key")); } } diff --git a/connector/src/main/java/org/astraea/connector/backup/Exporter.java b/connector/src/main/java/org/astraea/connector/backup/Exporter.java index caf9695dc6..e31e0c4866 100644 --- a/connector/src/main/java/org/astraea/connector/backup/Exporter.java +++ b/connector/src/main/java/org/astraea/connector/backup/Exporter.java @@ -97,10 +97,9 @@ public class Exporter extends SinkConnector { static Definition OVERRIDE_KEY = Definition.builder() - .name("fs..override") + .name("fs..override.") .type(Definition.Type.STRING) - .documentation( - "a string that needs to be overridden in the file system should have a format of \"k1:v1,k2:v2\".") + .documentation("a value that needs to be overridden in the file system.") .build(); private Configuration configs; diff --git a/connector/src/test/java/org/astraea/connector/backup/ExporterTest.java b/connector/src/test/java/org/astraea/connector/backup/ExporterTest.java index 72f763a641..f4395deef9 100644 --- a/connector/src/test/java/org/astraea/connector/backup/ExporterTest.java +++ b/connector/src/test/java/org/astraea/connector/backup/ExporterTest.java @@ -451,8 +451,8 @@ void testHdfsSinkTask() { "1", "roll.duration", "100m", - "fs.hdfs.override", - "dfs.client.use.datanode.hostname:true"); + "fs.hdfs.override.dfs.client.use.datanode.hostname", + "true"); task.start(configs); diff --git a/fs/src/main/java/org/astraea/fs/hdfs/HdfsFileSystem.java b/fs/src/main/java/org/astraea/fs/hdfs/HdfsFileSystem.java index 806ac69707..e34301dc58 100644 --- a/fs/src/main/java/org/astraea/fs/hdfs/HdfsFileSystem.java +++ b/fs/src/main/java/org/astraea/fs/hdfs/HdfsFileSystem.java @@ -22,7 +22,6 @@ import java.net.URI; import java.util.Arrays; import java.util.List; -import java.util.NoSuchElementException; import java.util.stream.Collectors; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.Path; @@ -51,11 +50,18 @@ public HdfsFileSystem(Configuration config) { + config.requireString(PORT_KEY)); var conf = new org.apache.hadoop.conf.Configuration(); - try { - config.requireMap(OVERRIDE_KEY).forEach(conf::set); - } catch (NoSuchElementException ignored) { - } + config + .entrySet() + .forEach( + configItem -> { + if (configItem.getKey().contains(OVERRIDE_KEY)) { + conf.set( + configItem.getKey().replaceAll("fs.hdfs.override.", ""), + configItem.getValue()); + } + }); + fs = org.apache.hadoop.fs.FileSystem.get(uri, conf, config.requireString(USER_KEY)); }); } From 2b1ccac23d73f44ac1535d2981bea13a2a61cdf1 Mon Sep 17 00:00:00 2001 From: Jia-Sheng Chen Date: Sat, 11 Feb 2023 21:33:50 +0800 Subject: [PATCH 09/11] add function filteredConfigs and corresponding test --- .../java/org/astraea/common/Configuration.java | 14 ++++++++++++++ .../common/partitioner/ConfigurationTest.java | 6 ++++++ .../java/org/astraea/fs/hdfs/HdfsFileSystem.java | 10 ++-------- 3 files changed, 22 insertions(+), 8 deletions(-) diff --git a/common/src/main/java/org/astraea/common/Configuration.java b/common/src/main/java/org/astraea/common/Configuration.java index 6d4d80399a..b3084b50a3 100644 --- a/common/src/main/java/org/astraea/common/Configuration.java +++ b/common/src/main/java/org/astraea/common/Configuration.java @@ -102,6 +102,20 @@ default String requireString(String key) { return string(key).orElseThrow(() -> new NoSuchElementException(key + " is nonexistent")); } + /** + * @param filteredString the string to be filtered and removed + * @return new Configuration only contains which the key value includes the filteredString, and + * the filteredString and the following dot will be removed from the key + */ + default Configuration filteredConfigs(String filteredString) { + return of( + entrySet().stream() + .filter(k -> k.getKey().contains(filteredString)) + .collect( + Collectors.toMap( + i -> i.getKey().replaceAll(filteredString + '.', ""), Map.Entry::getValue))); + } + /** * @param key the key whose associated value is to be returned * @param separator to split string to multiple strings diff --git a/common/src/test/java/org/astraea/common/partitioner/ConfigurationTest.java b/common/src/test/java/org/astraea/common/partitioner/ConfigurationTest.java index e7eece7950..a8d8fc5266 100644 --- a/common/src/test/java/org/astraea/common/partitioner/ConfigurationTest.java +++ b/common/src/test/java/org/astraea/common/partitioner/ConfigurationTest.java @@ -44,4 +44,10 @@ void testMap() { Assertions.assertEquals( Map.of("v0", 0, "v1", 1), config.map("key", ",", ":", Integer::valueOf)); } + + @Test + void testFilteredConfigs() { + var config = Configuration.of(Map.of("key", "v1", "filtered.key", "v2")); + Assertions.assertEquals(Map.of("key", "v2"), config.filteredConfigs("filtered").raw()); + } } diff --git a/fs/src/main/java/org/astraea/fs/hdfs/HdfsFileSystem.java b/fs/src/main/java/org/astraea/fs/hdfs/HdfsFileSystem.java index e34301dc58..ac63fb5a8d 100644 --- a/fs/src/main/java/org/astraea/fs/hdfs/HdfsFileSystem.java +++ b/fs/src/main/java/org/astraea/fs/hdfs/HdfsFileSystem.java @@ -52,15 +52,9 @@ public HdfsFileSystem(Configuration config) { var conf = new org.apache.hadoop.conf.Configuration(); config + .filteredConfigs(OVERRIDE_KEY) .entrySet() - .forEach( - configItem -> { - if (configItem.getKey().contains(OVERRIDE_KEY)) { - conf.set( - configItem.getKey().replaceAll("fs.hdfs.override.", ""), - configItem.getValue()); - } - }); + .forEach(configItem -> conf.set(configItem.getKey(), configItem.getValue())); fs = org.apache.hadoop.fs.FileSystem.get(uri, conf, config.requireString(USER_KEY)); }); From c264b0dfd2ebda389f292da310b3ea9912c5dbda Mon Sep 17 00:00:00 2001 From: Jia-Sheng Chen Date: Sat, 11 Feb 2023 22:12:18 +0800 Subject: [PATCH 10/11] replace the filteredString with prefix and ensure the function does not replace strings that are not at the beginning --- .../main/java/org/astraea/common/Configuration.java | 12 ++++++------ .../common/partitioner/ConfigurationTest.java | 2 +- 2 files changed, 7 insertions(+), 7 deletions(-) diff --git a/common/src/main/java/org/astraea/common/Configuration.java b/common/src/main/java/org/astraea/common/Configuration.java index b3084b50a3..f57b6a45b5 100644 --- a/common/src/main/java/org/astraea/common/Configuration.java +++ b/common/src/main/java/org/astraea/common/Configuration.java @@ -103,17 +103,17 @@ default String requireString(String key) { } /** - * @param filteredString the string to be filtered and removed - * @return new Configuration only contains which the key value includes the filteredString, and - * the filteredString and the following dot will be removed from the key + * @param prefix the string to be filtered and removed + * @return new Configuration only contains which the key value starts with the prefix, and the + * prefix string and the following dot will be removed from the key */ - default Configuration filteredConfigs(String filteredString) { + default Configuration filteredConfigs(String prefix) { return of( entrySet().stream() - .filter(k -> k.getKey().contains(filteredString)) + .filter(k -> k.getKey().startsWith(prefix)) .collect( Collectors.toMap( - i -> i.getKey().replaceAll(filteredString + '.', ""), Map.Entry::getValue))); + i -> i.getKey().replaceFirst(prefix + '.', ""), Map.Entry::getValue))); } /** diff --git a/common/src/test/java/org/astraea/common/partitioner/ConfigurationTest.java b/common/src/test/java/org/astraea/common/partitioner/ConfigurationTest.java index a8d8fc5266..e208bd06fd 100644 --- a/common/src/test/java/org/astraea/common/partitioner/ConfigurationTest.java +++ b/common/src/test/java/org/astraea/common/partitioner/ConfigurationTest.java @@ -47,7 +47,7 @@ void testMap() { @Test void testFilteredConfigs() { - var config = Configuration.of(Map.of("key", "v1", "filtered.key", "v2")); + var config = Configuration.of(Map.of("key", "v1", "filtered.key", "v2", "key.filtered", "v3")); Assertions.assertEquals(Map.of("key", "v2"), config.filteredConfigs("filtered").raw()); } } From 7775b5619b51908acc5e0f532b60ebd15d695924 Mon Sep 17 00:00:00 2001 From: Jia-Sheng Chen Date: Sun, 12 Feb 2023 12:26:15 +0800 Subject: [PATCH 11/11] change function name from filteredConfigs to filteredPrefixConfigs --- common/src/main/java/org/astraea/common/Configuration.java | 2 +- .../java/org/astraea/common/partitioner/ConfigurationTest.java | 2 +- fs/src/main/java/org/astraea/fs/hdfs/HdfsFileSystem.java | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/common/src/main/java/org/astraea/common/Configuration.java b/common/src/main/java/org/astraea/common/Configuration.java index f57b6a45b5..c61d797af3 100644 --- a/common/src/main/java/org/astraea/common/Configuration.java +++ b/common/src/main/java/org/astraea/common/Configuration.java @@ -107,7 +107,7 @@ default String requireString(String key) { * @return new Configuration only contains which the key value starts with the prefix, and the * prefix string and the following dot will be removed from the key */ - default Configuration filteredConfigs(String prefix) { + default Configuration filteredPrefixConfigs(String prefix) { return of( entrySet().stream() .filter(k -> k.getKey().startsWith(prefix)) diff --git a/common/src/test/java/org/astraea/common/partitioner/ConfigurationTest.java b/common/src/test/java/org/astraea/common/partitioner/ConfigurationTest.java index e208bd06fd..a7af687c83 100644 --- a/common/src/test/java/org/astraea/common/partitioner/ConfigurationTest.java +++ b/common/src/test/java/org/astraea/common/partitioner/ConfigurationTest.java @@ -48,6 +48,6 @@ void testMap() { @Test void testFilteredConfigs() { var config = Configuration.of(Map.of("key", "v1", "filtered.key", "v2", "key.filtered", "v3")); - Assertions.assertEquals(Map.of("key", "v2"), config.filteredConfigs("filtered").raw()); + Assertions.assertEquals(Map.of("key", "v2"), config.filteredPrefixConfigs("filtered").raw()); } } diff --git a/fs/src/main/java/org/astraea/fs/hdfs/HdfsFileSystem.java b/fs/src/main/java/org/astraea/fs/hdfs/HdfsFileSystem.java index ac63fb5a8d..c31fdacb2c 100644 --- a/fs/src/main/java/org/astraea/fs/hdfs/HdfsFileSystem.java +++ b/fs/src/main/java/org/astraea/fs/hdfs/HdfsFileSystem.java @@ -52,7 +52,7 @@ public HdfsFileSystem(Configuration config) { var conf = new org.apache.hadoop.conf.Configuration(); config - .filteredConfigs(OVERRIDE_KEY) + .filteredPrefixConfigs(OVERRIDE_KEY) .entrySet() .forEach(configItem -> conf.set(configItem.getKey(), configItem.getValue()));