Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[CONNECTOR] Fix some issues in connector #1487

Merged
merged 11 commits into from
Feb 12, 2023
Merged
14 changes: 14 additions & 0 deletions common/src/main/java/org/astraea/common/Configuration.java
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,20 @@ default String requireString(String key) {
return string(key).orElseThrow(() -> new NoSuchElementException(key + " is nonexistent"));
}

/**
 * Builds a new Configuration from the entries whose key contains the given string.
 *
 * <p>Every literal occurrence of {@code filteredString + "."} is removed from the selected
 * keys; the values are carried over unchanged. The string is matched literally, so dots inside
 * it (for example {@code fs.hdfs.override}) are not treated as regular-expression wildcards.
 *
 * @param filteredString the string to be filtered and removed
 * @return new Configuration containing only the entries whose key includes the filteredString,
 *     with the filteredString and the following dot removed from the key
 */
default Configuration filteredConfigs(String filteredString) {
  // String#replace matches literally; replaceAll would compile this as a regex in which every
  // '.' matches any character and other metacharacters could change the match or throw.
  var removed = filteredString + ".";
  return of(
      entrySet().stream()
          .filter(entry -> entry.getKey().contains(filteredString))
          .collect(
              Collectors.toMap(
                  entry -> entry.getKey().replace(removed, ""), Map.Entry::getValue)));
}

/**
* @param key the key whose associated value is to be returned
* @param separator to split string to multiple strings
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,4 +44,10 @@ void testMap() {
Assertions.assertEquals(
Map.of("v0", 0, "v1", 1), config.map("key", ",", ":", Integer::valueOf));
}

// Only the entry whose key contains "filtered" is kept, and "filtered." is removed from its key.
@Test
void testFilteredConfigs() {
  var source = Map.of("key", "v1", "filtered.key", "v2");
  var filtered = Configuration.of(source).filteredConfigs("filtered");
  var expected = Map.of("key", "v2");
  Assertions.assertEquals(expected, filtered.raw());
}
}
35 changes: 25 additions & 10 deletions connector/src/main/java/org/astraea/connector/backup/Exporter.java
Original file line number Diff line number Diff line change
Expand Up @@ -43,32 +43,32 @@ public class Exporter extends SinkConnector {
Definition.builder()
.name("fs.schema")
.type(Definition.Type.STRING)
.documentation("decide which file system to use, such as FTP.")
.documentation("decide which file system to use, such as FTP, HDFS.")
.required()
.build();
static Definition HOSTNAME_KEY =
Definition.builder()
.name("fs.ftp.hostname")
.name("fs.<schema>.hostname")
.type(Definition.Type.STRING)
.documentation("the host name of the ftp server used.")
.documentation("the host name of the <schema> server used.")
.build();
static Definition PORT_KEY =
Definition.builder()
.name("fs.ftp.port")
.name("fs.<schema>.port")
.type(Definition.Type.STRING)
.documentation("the port of the ftp server used.")
.documentation("the port of the <schema> server used.")
.build();
static Definition USER_KEY =
Definition.builder()
.name("fs.ftp.user")
.name("fs.<schema>.user")
.type(Definition.Type.STRING)
.documentation("the user name required to login to the FTP server.")
.documentation("the user name required to login to the <schema> server.")
.build();
static Definition PASSWORD_KEY =
Definition.builder()
.name("fs.ftp.password")
.name("fs.<schema>.password")
.type(Definition.Type.PASSWORD)
.documentation("the password required to login to the ftp server.")
.documentation("the password required to login to the <schema> server.")
.build();
static Definition PATH_KEY =
Definition.builder()
Expand All @@ -94,6 +94,13 @@ public class Exporter extends SinkConnector {
.defaultValue("3s")
.documentation("the maximum time before a new archive file is rolling out.")
.build();

/**
 * Definition of the string-typed option named {@code fs.<schema>.override.<property_name>},
 * documented as a value to be overridden in the file system.
 *
 * <p>NOTE(review): {@code <schema>} and {@code <property_name>} look like placeholders rather
 * than literal key segments - confirm how the concrete keys are resolved by the file systems.
 */
static Definition OVERRIDE_KEY =
Definition.builder()
.name("fs.<schema>.override.<property_name>")
.type(Definition.Type.STRING)
.documentation("a value that needs to be overridden in the file system.")
.build();
private Configuration configs;

@Override
Expand All @@ -113,7 +120,15 @@ protected List<Configuration> takeConfiguration(int maxTasks) {

@Override
protected List<Definition> definitions() {
return List.of(SCHEMA_KEY, HOSTNAME_KEY, PORT_KEY, USER_KEY, PASSWORD_KEY, PATH_KEY, SIZE_KEY);
return List.of(
SCHEMA_KEY,
HOSTNAME_KEY,
PORT_KEY,
USER_KEY,
PASSWORD_KEY,
PATH_KEY,
SIZE_KEY,
OVERRIDE_KEY);
}

public static class Task extends SinkTask {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -450,7 +450,9 @@ void testHdfsSinkTask() {
"tasks.max",
"1",
"roll.duration",
"100m");
"100m",
"fs.hdfs.override.dfs.client.use.datanode.hostname",
"true");

task.start(configs);

Expand Down
2 changes: 1 addition & 1 deletion docs/connector/exporter.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,11 @@
| 參數名稱 | 說明 | 預設值 |
|:--------------------------|--------------------------------------------------------------------------------------------------------------|-------|
| fs.schema | (必填) 決定儲存目標為何種檔案系統,例如: `local`, `ftp`等 | 無 |
| path | (必填) 填入目標檔案系統要儲存的資料夾目錄之目標位置 | 無 |
| fs.{file System}.hostname | (選填) 如果最初的 `fs.schema` 選定為非 `local` 之項目,需要填入目標 `host name`, `file System` 取決於前者之內容 | 無 |
| fs.{file System}.port | (選填) 填入目標檔案系統之 `port` | 無 |
| fs.{file System}.user | (選填) 填入目標檔案系統之登入 `user` | 無 |
| fs.{file System}.password | (選填) 填入目標檔案系統之登入 `password` | 無 |
| path | (選填) 填入目標檔案系統要儲存的資料夾目錄之目標位置 | 無 |
| size | (選填) 寫入檔案目標超過此設定之大小上限時會創建新檔案,並且寫入目標改為新創建之檔案。 <br/>檔案大小單位: `Bit`, `Kb`, `KiB`, `Mb`, etc. | 100MB |
| roll.duration | (選填) 如果 `connector` 在超過此時間沒有任何資料流入,會把當下所有已創建之檔案關閉,並在之後有新資料時會創建新檔案並寫入。 <br/>時間單位: `s`, `m`, `h`, `day`, etc. | 3s |

Expand Down
14 changes: 11 additions & 3 deletions fs/src/main/java/org/astraea/fs/hdfs/HdfsFileSystem.java
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ public class HdfsFileSystem implements FileSystem {
public static final String HOSTNAME_KEY = "fs.hdfs.hostname";
public static final String PORT_KEY = "fs.hdfs.port";
public static final String USER_KEY = "fs.hdfs.user";
public static final String OVERRIDE_KEY = "fs.hdfs.override";

private org.apache.hadoop.fs.FileSystem fs;

Expand All @@ -48,7 +49,14 @@ public HdfsFileSystem(Configuration config) {
+ ":"
+ config.requireString(PORT_KEY));

fs = org.apache.hadoop.fs.FileSystem.get(uri, new org.apache.hadoop.conf.Configuration());
var conf = new org.apache.hadoop.conf.Configuration();

config
.filteredConfigs(OVERRIDE_KEY)
.entrySet()
.forEach(configItem -> conf.set(configItem.getKey(), configItem.getValue()));

fs = org.apache.hadoop.fs.FileSystem.get(uri, conf, config.requireString(USER_KEY));
});
}

Expand Down Expand Up @@ -78,7 +86,7 @@ public List<String> listFiles(String path) {
return Utils.packException(
() -> {
if (type(path) != Type.FOLDER)
throw new IllegalArgumentException(path + " is nto a folder");
throw new IllegalArgumentException(path + " is not a folder");
return Arrays.stream(fs.listStatus(new Path(path)))
.filter(FileStatus::isFile)
.map(f -> f.getPath().toUri().getPath())
Expand All @@ -91,7 +99,7 @@ public List<String> listFolders(String path) {
return Utils.packException(
() -> {
if (type(path) != Type.FOLDER)
throw new IllegalArgumentException(path + " is nto a folder");
throw new IllegalArgumentException(path + " is not a folder");
return Arrays.stream(fs.listStatus(new Path(path)))
.filter(FileStatus::isDirectory)
.map(f -> f.getPath().getName())
Expand Down
2 changes: 1 addition & 1 deletion it/src/main/java/org/astraea/it/HdfsServer.java
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ public int port() {

@Override
public String user() {
return "root";
return System.getProperty("user.name");
}

@Override
Expand Down