From df816d4a42d69b8a51e7d1ccb6771ab1fdbc5c31 Mon Sep 17 00:00:00 2001
From: Jia-Sheng Chen
Date: Tue, 17 Jan 2023 17:29:48 +0800
Subject: [PATCH 01/10] create a local hdfs using MiniDFSCluster

---
 gradle/dependencies.gradle                    |   6 +-
 it/build.gradle                               |   3 +
 .../main/java/org/astraea/it/HdfsServer.java  | 115 ++++++++++++++++++
 3 files changed, 123 insertions(+), 1 deletion(-)
 create mode 100644 it/src/main/java/org/astraea/it/HdfsServer.java

diff --git a/gradle/dependencies.gradle b/gradle/dependencies.gradle
index 451f07fc5a..262b2d53e2 100644
--- a/gradle/dependencies.gradle
+++ b/gradle/dependencies.gradle
@@ -36,6 +36,8 @@ def versions = [
     spark                : project.properties['spark.version'] ?: "3.3.1",
     "wix-embedded-mysql" : project.properties['wix-embedded-mysql.version'] ?: "4.6.2",
     zookeeper            : project.properties['zookeeper.version'] ?: "3.8.0",
+    "hadoop-hdfs"        : project.properties['hadoop-hdfs.version'] ?: "3.3.4",
+    "hadoop-minicluster" : project.properties['hadoop-minicluster.version'] ?: "3.3.4",
 ]
 
 libs += [
@@ -61,6 +63,8 @@
     "spark-kafka"        : "org.apache.spark:spark-sql-kafka-0-10_2.13:${versions["spark"]}",
     "spark-sql"          : "org.apache.spark:spark-sql_2.13:${versions["spark"]}",
     "wix-embedded-mysql" : "com.wix:wix-embedded-mysql:${versions["wix-embedded-mysql"]}",
-    zookeeper            : "org.apache.zookeeper:zookeeper:${versions["zookeeper"]}"
+    zookeeper            : "org.apache.zookeeper:zookeeper:${versions["zookeeper"]}",
+    "hadoop-hdfs"        : "org.apache.hadoop:hadoop-hdfs:${versions["hadoop-hdfs"]}",
+    "hadoop-minicluster" : "org.apache.hadoop:hadoop-minicluster:${versions["hadoop-minicluster"]}",
 ]
 
diff --git a/it/build.gradle b/it/build.gradle
index 88285ebdaf..000c58b49a 100644
--- a/it/build.gradle
+++ b/it/build.gradle
@@ -34,6 +34,9 @@ dependencies {
     implementation libs["kafka-core"]
     implementation libs["kafka-connect-runtime"]
     implementation libs["kafka-connect-json"]
+    implementation libs["hadoop-hdfs"]
+    implementation libs["hadoop-minicluster"]
+    implementation libs["mockito-core"]
 }
 
 java {
diff --git a/it/src/main/java/org/astraea/it/HdfsServer.java b/it/src/main/java/org/astraea/it/HdfsServer.java
new file mode 100644
index 0000000000..d9cc4d3e2f
--- /dev/null
+++ b/it/src/main/java/org/astraea/it/HdfsServer.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.astraea.it;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Objects;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+
+public interface HdfsServer extends AutoCloseable {
+
+  static HdfsServer local() throws IOException {
+    return builder().build();
+  }
+
+  static Builder builder() {
+    return new Builder();
+  }
+
+  String hostname();
+
+  int port();
+
+  String user();
+
+  @Override
+  void close();
+
+  class Builder {
+
+    private Builder() {
+      Configuration conf = new Configuration();
+      conf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, homeFolder.getAbsolutePath());
+      miniDfsBuilder = new MiniDFSCluster.Builder(conf);
+    }
+
+    private File homeFolder = Utils.createTempDirectory("local_hdfs");
+
+    private MiniDFSCluster.Builder miniDfsBuilder;
+
+    public void checkArguments() {
+      if (!homeFolder.exists() && !homeFolder.mkdir())
+        throw new IllegalArgumentException(
+            "fail to create folder on " + homeFolder.getAbsolutePath());
+      if (!homeFolder.isDirectory())
+        throw new IllegalArgumentException(homeFolder.getAbsolutePath() + " is not a folder");
+    }
+
+    public Builder homeFolder(File homeFolder) {
+      this.homeFolder = Objects.requireNonNull(homeFolder);
+      return this;
+    }
+
+    public Builder controlPort(int port) {
+      this.miniDfsBuilder = this.miniDfsBuilder.nameNodePort(port);
+      return this;
+    }
+
+    public HdfsServer build() {
+      checkArguments();
+
+      MiniDFSCluster hdfsCluster;
+
+      try {
+        hdfsCluster =
+            this.miniDfsBuilder
+                .manageNameDfsDirs(true)
+                .manageDataDfsDirs(true)
+                .format(true)
+                .build();
+        hdfsCluster.waitClusterUp();
+      } catch (IOException e) {
+        throw new RuntimeException(e);
+      }
+
+      return new HdfsServer() {
+        @Override
+        public String hostname() {
+          return hdfsCluster.getNameNode().getHostAndPort().split(":")[0];
+        }
+
+        @Override
+        public int port() {
+          return hdfsCluster.getNameNodePort();
+        }
+
+        @Override
+        public String user() {
+          return "root";
+        }
+
+        @Override
+        public void close() {
+          hdfsCluster.close();
+          Utils.delete(homeFolder);
+        }
+      };
+    }
+  }
+}
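
Note: the HdfsServer introduced in PATCH 01 is only a thin wrapper around Hadoop's
MiniDFSCluster. A minimal usage sketch (hypothetical caller code, not part of this
patch series; it uses nothing beyond the API defined above):

    // start a throwaway single-process HDFS cluster, inspect it, then tear it down
    static void demo() throws IOException {
      try (HdfsServer server = HdfsServer.local()) {
        // the namenode RPC endpoint; the port is ephemeral unless controlPort() is set
        System.out.println("hdfs://" + server.hostname() + ":" + server.port());
        System.out.println("user: " + server.user()); // always "root"
      }
    }

A fixed port can be requested through the builder, e.g.
HdfsServer.builder().controlPort(9000).build().
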
+ */ +package org.astraea.it; + +import java.io.File; +import java.io.IOException; +import java.util.Objects; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdfs.MiniDFSCluster; + +public interface HdfsServer extends AutoCloseable { + + static HdfsServer local() throws IOException { + return builder().build(); + } + + static Builder builder() { + return new Builder(); + } + + String hostname(); + + int port(); + + String user(); + + @Override + void close(); + + class Builder { + + private Builder() { + Configuration conf = new Configuration(); + conf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, homeFolder.getAbsolutePath()); + miniDfsBuilder = new MiniDFSCluster.Builder(conf); + } + + private File homeFolder = Utils.createTempDirectory("local_hdfs"); + + private MiniDFSCluster.Builder miniDfsBuilder; + + public void checkArguments() { + if (!homeFolder.exists() && !homeFolder.mkdir()) + throw new IllegalArgumentException( + "fail to create folder on " + homeFolder.getAbsolutePath()); + if (!homeFolder.isDirectory()) + throw new IllegalArgumentException(homeFolder.getAbsolutePath() + " is not folder"); + } + + public Builder homeFolder(File homeFolder) { + this.homeFolder = Objects.requireNonNull(homeFolder); + return this; + } + + public Builder controlPort(int port) { + this.miniDfsBuilder = this.miniDfsBuilder.nameNodePort(port); + return this; + } + + public HdfsServer build() { + checkArguments(); + + MiniDFSCluster hdfsCluster; + + try { + hdfsCluster = + this.miniDfsBuilder + .manageDataDfsDirs(true) + .manageDataDfsDirs(true) + .format(true) + .build(); + hdfsCluster.waitClusterUp(); + } catch (IOException e) { + throw new RuntimeException(e); + } + + return new HdfsServer() { + @Override + public String hostname() { + return hdfsCluster.getNameNode().getHostAndPort().split(":")[0]; + } + + @Override + public int port() { + return hdfsCluster.getNameNodePort(); + } + + @Override + public String user() { + return "root"; + } + + @Override + public void close() { + hdfsCluster.close(); + Utils.delete(homeFolder); + } + }; + } + } +} From 809b714eb7e405aad810a0c62354e2fb31b33061 Mon Sep 17 00:00:00 2001 From: Jia-Sheng Chen Date: Wed, 18 Jan 2023 16:57:32 +0800 Subject: [PATCH 02/10] create HdfsFileSystem and corresponding tests --- fs/build.gradle | 1 + .../main/java/org/astraea/fs/FileSystem.java | 4 +- .../org/astraea/fs/hdfs/HdfsFileSystem.java | 187 ++++++++++++++++++ .../astraea/fs/hdfs/HdfsFileSystemTest.java | 48 +++++ gradle/dependencies.gradle | 4 +- it/build.gradle | 2 +- .../main/java/org/astraea/it/HdfsServer.java | 2 +- 7 files changed, 243 insertions(+), 5 deletions(-) create mode 100644 fs/src/main/java/org/astraea/fs/hdfs/HdfsFileSystem.java create mode 100644 fs/src/test/java/org/astraea/fs/hdfs/HdfsFileSystemTest.java diff --git a/fs/build.gradle b/fs/build.gradle index 048d67b23b..57b5070221 100644 --- a/fs/build.gradle +++ b/fs/build.gradle @@ -35,6 +35,7 @@ dependencies { exclude group: "org.slf4j" } implementation libs["commons-net"] + implementation libs["hadoop-common"] } java { diff --git a/fs/src/main/java/org/astraea/fs/FileSystem.java b/fs/src/main/java/org/astraea/fs/FileSystem.java index 8b94aa6c37..876689a877 100644 --- a/fs/src/main/java/org/astraea/fs/FileSystem.java +++ b/fs/src/main/java/org/astraea/fs/FileSystem.java @@ -34,7 +34,9 @@ public interface FileSystem extends AutoCloseable { "ftp.impl", "org.astraea.fs.ftp.FtpFileSystem", "local.impl", - "org.astraea.fs.local.LocalFileSystem"); + 
"org.astraea.fs.local.LocalFileSystem", + "hdfs.impl", + "org.astraea.fs.hdfs.HdfsFileSystem"); static FileSystem of(String schema, Configuration configuration) { var key = schema.toLowerCase() + "." + "impl"; diff --git a/fs/src/main/java/org/astraea/fs/hdfs/HdfsFileSystem.java b/fs/src/main/java/org/astraea/fs/hdfs/HdfsFileSystem.java new file mode 100644 index 0000000000..974010449a --- /dev/null +++ b/fs/src/main/java/org/astraea/fs/hdfs/HdfsFileSystem.java @@ -0,0 +1,187 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.astraea.fs.hdfs; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.net.URI; +import java.util.Arrays; +import java.util.List; +import java.util.stream.Collectors; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.Path; +import org.astraea.common.Configuration; +import org.astraea.common.Utils; +import org.astraea.fs.FileSystem; +import org.astraea.fs.Type; + +public class HdfsFileSystem implements FileSystem { + + public static final String HOSTNAME_KEY = "fs.hdfs.hostname"; + public static final String PORT_KEY = "fs.hdfs.port"; + public static final String USER_KEY = "fs.hdfs.user"; + + private org.apache.hadoop.fs.FileSystem fs; + + public HdfsFileSystem(Configuration config) { + Utils.packException( + () -> { + var uri = + new URI( + "hdfs://" + + config.requireString(HOSTNAME_KEY) + + ":" + + config.requireString(PORT_KEY)); + + fs = org.apache.hadoop.fs.FileSystem.get(uri, new org.apache.hadoop.conf.Configuration()); + }); + } + + @Override + public Type type(String path) { + return Utils.packException( + () -> { + var targetPath = new Path(path); + if (!fs.exists(targetPath)) return Type.NONEXISTENT; + if (fs.getFileStatus(targetPath).isDirectory()) return Type.FOLDER; + return Type.FILE; + }); + } + + @Override + public void mkdir(String path) { + Utils.packException( + () -> { + if (type(path) == Type.FOLDER) return; + if (!fs.mkdirs(new Path(path))) + throw new IllegalArgumentException("Failed to create folder on " + path); + }); + } + + @Override + public List listFiles(String path) { + return Utils.packException( + () -> { + if (type(path) != Type.FOLDER) + throw new IllegalArgumentException(path + " is nto a folder"); + return Arrays.stream(fs.listStatus(new Path(path))) + .filter(FileStatus::isFile) + .map(f -> f.getPath().toUri().getPath()) + .collect(Collectors.toList()); + }); + } + + @Override + public List listFolders(String path) { + return Utils.packException( + () -> { + if (type(path) != Type.FOLDER) + throw new IllegalArgumentException(path + " is nto a folder"); + return Arrays.stream(fs.listStatus(new Path(path))) + .filter(FileStatus::isDirectory) + .map(f -> f.getPath().getName()) + 
+
+  @Override
+  public void delete(String path) {
+    Utils.packException(
+        () -> {
+          if (path.equals("/"))
+            throw new IllegalArgumentException("Can't delete the root folder");
+          if (type(path) == Type.NONEXISTENT) return;
+          fs.delete(new Path(path), true);
+        });
+  }
+
+  @Override
+  public InputStream read(String path) {
+    return Utils.packException(
+        () -> {
+          if (type(path) != Type.FILE) throw new IllegalArgumentException(path + " is not a file");
+          var inputStream = fs.open(new Path(path));
+          if (inputStream == null) {
+            fs.close();
+            throw new IllegalArgumentException("failed to open file on " + path);
+          }
+
+          return new InputStream() {
+            @Override
+            public int read() throws IOException {
+              return inputStream.read();
+            }
+
+            @Override
+            public int read(byte b[], int off, int len) throws IOException {
+              return inputStream.read(b, off, len);
+            }
+
+            @Override
+            public int readNBytes(byte[] b, int off, int len) throws IOException {
+              return inputStream.readNBytes(b, off, len);
+            }
+
+            @Override
+            public void close() throws IOException {
+              inputStream.close();
+            }
+          };
+        });
+  }
+
+  @Override
+  public OutputStream write(String path) {
+    return Utils.packException(
+        () -> {
+          if (type(path) == Type.FOLDER) throw new IllegalArgumentException(path + " is a folder");
+          var outputStream = fs.create(new Path(path), true);
+          if (outputStream == null) {
+            fs.close();
+            throw new IllegalArgumentException("failed to create file on " + path);
+          }
+
+          return new OutputStream() {
+            @Override
+            public void write(int b) throws IOException {
+              outputStream.write(b);
+            }
+
+            @Override
+            public void write(byte b[], int off, int len) throws IOException {
+              outputStream.write(b, off, len);
+            }
+
+            @Override
+            public void flush() throws IOException {
+              outputStream.flush();
+            }
+
+            @Override
+            public void close() throws IOException {
+              outputStream.close();
+            }
+          };
+        });
+  }
+
+  @Override
+  public void close() {
+    Utils.packException(() -> fs.close());
+  }
+}
diff --git a/fs/src/test/java/org/astraea/fs/hdfs/HdfsFileSystemTest.java b/fs/src/test/java/org/astraea/fs/hdfs/HdfsFileSystemTest.java
new file mode 100644
index 0000000000..d4ceb407c3
--- /dev/null
+++ b/fs/src/test/java/org/astraea/fs/hdfs/HdfsFileSystemTest.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.astraea.fs.hdfs;
+
+import java.util.Map;
+import org.astraea.common.Configuration;
+import org.astraea.fs.AbstractFileSystemTest;
+import org.astraea.fs.FileSystem;
+import org.astraea.it.HdfsServer;
+import org.junit.jupiter.api.AfterEach;
+
+public class HdfsFileSystemTest extends AbstractFileSystemTest {
+
+  private final HdfsServer server = HdfsServer.local();
+
+  @Override
+  protected FileSystem fileSystem() {
+    return FileSystem.of(
+        "hdfs",
+        Configuration.of(
+            Map.of(
+                HdfsFileSystem.HOSTNAME_KEY,
+                server.hostname(),
+                HdfsFileSystem.PORT_KEY,
+                String.valueOf(server.port()),
+                HdfsFileSystem.USER_KEY,
+                server.user())));
+  }
+
+  @AfterEach
+  void close() {
+    server.close();
+  }
+}
diff --git a/gradle/dependencies.gradle b/gradle/dependencies.gradle
index 262b2d53e2..96a429c34a 100644
--- a/gradle/dependencies.gradle
+++ b/gradle/dependencies.gradle
@@ -36,7 +36,7 @@ def versions = [
     spark                : project.properties['spark.version'] ?: "3.3.1",
     "wix-embedded-mysql" : project.properties['wix-embedded-mysql.version'] ?: "4.6.2",
     zookeeper            : project.properties['zookeeper.version'] ?: "3.8.0",
-    "hadoop-hdfs"        : project.properties['hadoop-hdfs.version'] ?: "3.3.4",
+    "hadoop-common" : project.properties['hadoop-common.version'] ?: "3.3.4",
     "hadoop-minicluster" : project.properties['hadoop-minicluster.version'] ?: "3.3.4",
 ]
@@ -64,7 +64,7 @@ libs += [
     "spark-kafka"        : "org.apache.spark:spark-sql-kafka-0-10_2.13:${versions["spark"]}",
     "spark-sql"          : "org.apache.spark:spark-sql_2.13:${versions["spark"]}",
     "wix-embedded-mysql" : "com.wix:wix-embedded-mysql:${versions["wix-embedded-mysql"]}",
     zookeeper            : "org.apache.zookeeper:zookeeper:${versions["zookeeper"]}",
-    "hadoop-hdfs"        : "org.apache.hadoop:hadoop-hdfs:${versions["hadoop-hdfs"]}",
+    "hadoop-common" : "org.apache.hadoop:hadoop-common:${versions["hadoop-common"]}",
     "hadoop-minicluster" : "org.apache.hadoop:hadoop-minicluster:${versions["hadoop-minicluster"]}",
 ]
+ */ +package org.astraea.fs.hdfs; + +import java.util.Map; +import org.astraea.common.Configuration; +import org.astraea.fs.AbstractFileSystemTest; +import org.astraea.fs.FileSystem; +import org.astraea.it.HdfsServer; +import org.junit.jupiter.api.AfterEach; + +public class HdfsFileSystemTest extends AbstractFileSystemTest { + + private final HdfsServer server = HdfsServer.local(); + + @Override + protected FileSystem fileSystem() { + return FileSystem.of( + "hdfs", + Configuration.of( + Map.of( + HdfsFileSystem.HOSTNAME_KEY, + server.hostname(), + HdfsFileSystem.PORT_KEY, + String.valueOf(server.port()), + HdfsFileSystem.USER_KEY, + server.user()))); + } + + @AfterEach + void close() { + server.close(); + } +} diff --git a/gradle/dependencies.gradle b/gradle/dependencies.gradle index 262b2d53e2..96a429c34a 100644 --- a/gradle/dependencies.gradle +++ b/gradle/dependencies.gradle @@ -36,7 +36,7 @@ def versions = [ spark : project.properties['spark.version'] ?: "3.3.1", "wix-embedded-mysql" : project.properties['wix-embedded-mysql.version'] ?: "4.6.2", zookeeper : project.properties['zookeeper.version'] ?: "3.8.0", - "hadoop-hdfs" : project.properties['hadoop-hdfs.version'] ?: "3.3.4", + "hadoop-common" : project.properties['hadoop-common.version'] ?: "3.3.4", "hadoop-minicluster" : project.properties['hadoop-minicluster.version'] ?: "3.3.4", ] @@ -64,7 +64,7 @@ libs += [ "spark-sql" : "org.apache.spark:spark-sql_2.13:${versions["spark"]}", "wix-embedded-mysql" : "com.wix:wix-embedded-mysql:${versions["wix-embedded-mysql"]}", zookeeper : "org.apache.zookeeper:zookeeper:${versions["zookeeper"]}", - "hadoop-hdfs" : "org.apache.hadoop:hadoop-hdfs:${versions["hadoop-hdfs"]}", + "hadoop-common" : "org.apache.hadoop:hadoop-common:${versions["hadoop-common"]}", "hadoop-minicluster" : "org.apache.hadoop:hadoop-minicluster:${versions["hadoop-minicluster"]}", ] diff --git a/it/build.gradle b/it/build.gradle index 000c58b49a..a4fdeebbbf 100644 --- a/it/build.gradle +++ b/it/build.gradle @@ -34,7 +34,7 @@ dependencies { implementation libs["kafka-core"] implementation libs["kafka-connect-runtime"] implementation libs["kafka-connect-json"] - implementation libs["hadoop-hdfs"] + implementation libs["hadoop-common"] implementation libs["hadoop-minicluster"] implementation libs["mockito-core"] } diff --git a/it/src/main/java/org/astraea/it/HdfsServer.java b/it/src/main/java/org/astraea/it/HdfsServer.java index d9cc4d3e2f..8950b847c9 100644 --- a/it/src/main/java/org/astraea/it/HdfsServer.java +++ b/it/src/main/java/org/astraea/it/HdfsServer.java @@ -24,7 +24,7 @@ public interface HdfsServer extends AutoCloseable { - static HdfsServer local() throws IOException { + static HdfsServer local() { return builder().build(); } From a39e3357ed2c784c8421c9b5b29379bd9f445fce Mon Sep 17 00:00:00 2001 From: Jia-Sheng Chen Date: Wed, 18 Jan 2023 17:16:22 +0800 Subject: [PATCH 03/10] create tests for sink task in hdfs --- .../connector/backup/ExporterTest.java | 264 ++++++++++++++++++ 1 file changed, 264 insertions(+) diff --git a/connector/src/test/java/org/astraea/connector/backup/ExporterTest.java b/connector/src/test/java/org/astraea/connector/backup/ExporterTest.java index 4812965b78..f4e43016e5 100644 --- a/connector/src/test/java/org/astraea/connector/backup/ExporterTest.java +++ b/connector/src/test/java/org/astraea/connector/backup/ExporterTest.java @@ -37,6 +37,7 @@ import org.astraea.common.producer.Producer; import org.astraea.fs.FileSystem; import org.astraea.it.FtpServer; +import org.astraea.it.HdfsServer; 
From a39e3357ed2c784c8421c9b5b29379bd9f445fce Mon Sep 17 00:00:00 2001
From: Jia-Sheng Chen
Date: Wed, 18 Jan 2023 17:16:22 +0800
Subject: [PATCH 03/10] create tests for sink task in hdfs

---
 .../connector/backup/ExporterTest.java        | 264 ++++++++++++++++++
 1 file changed, 264 insertions(+)

diff --git a/connector/src/test/java/org/astraea/connector/backup/ExporterTest.java b/connector/src/test/java/org/astraea/connector/backup/ExporterTest.java
index 4812965b78..f4e43016e5 100644
--- a/connector/src/test/java/org/astraea/connector/backup/ExporterTest.java
+++ b/connector/src/test/java/org/astraea/connector/backup/ExporterTest.java
@@ -37,6 +37,7 @@
 import org.astraea.common.producer.Producer;
 import org.astraea.fs.FileSystem;
 import org.astraea.it.FtpServer;
+import org.astraea.it.HdfsServer;
 import org.astraea.it.Service;
 import org.junit.jupiter.api.AfterAll;
 import org.junit.jupiter.api.Assertions;
@@ -422,4 +423,267 @@ var record = reader.next();
       task.close();
     }
   }
+
+  @Test
+  void testHdfsSinkTask() {
+    try (var server = HdfsServer.local()) {
+      var fileSize = "500Byte";
+      var topicName = Utils.randomString(10);
+
+      var task = new Exporter.Task();
+      var configs =
+          Map.of(
+              "fs.schema",
+              "hdfs",
+              "topics",
+              topicName,
+              "fs.hdfs.hostname",
+              String.valueOf(server.hostname()),
+              "fs.hdfs.port",
+              String.valueOf(server.port()),
+              "fs.hdfs.user",
+              String.valueOf(server.user()),
+              "path",
+              "/" + fileSize,
+              "size",
+              fileSize,
+              "tasks.max",
+              "1",
+              "roll.duration",
+              "100m");
+
+      task.start(configs);
+
+      var records =
+          List.of(
+              Record.builder()
+                  .topic(topicName)
+                  .key("test".getBytes())
+                  .value("test0".getBytes())
+                  .partition(0)
+                  .timestamp(System.currentTimeMillis())
+                  .build(),
+              Record.builder()
+                  .topic(topicName)
+                  .key("test".getBytes())
+                  .value("test1".getBytes())
+                  .partition(1)
+                  .timestamp(System.currentTimeMillis())
+                  .build());
+
+      task.put(records);
+
+      Utils.sleep(Duration.ofMillis(2000));
+
+      task.close();
+
+      Assertions.assertTrue(task.isWriterDone());
+
+      var fs = FileSystem.of("hdfs", Configuration.of(configs));
+
+      Assertions.assertEquals(
+          2, fs.listFolders("/" + String.join("/", fileSize, topicName)).size());
+
+      records.forEach(
+          sinkRecord -> {
+            var input =
+                fs.read(
+                    "/"
+                        + String.join(
+                            "/",
+                            fileSize,
+                            topicName,
+                            String.valueOf(sinkRecord.partition()),
+                            String.valueOf(sinkRecord.offset())));
+            var reader = RecordReader.builder(input).build();
+
+            while (reader.hasNext()) {
+              var record = reader.next();
+              Assertions.assertArrayEquals(record.key(), sinkRecord.key());
+              Assertions.assertArrayEquals(record.value(), sinkRecord.value());
+              Assertions.assertEquals(record.topic(), sinkRecord.topic());
+              Assertions.assertEquals(record.partition(), sinkRecord.partition());
+              Assertions.assertEquals(record.timestamp(), sinkRecord.timestamp());
+            }
+          });
+    }
+  }
+
+  @Test
+  void testHdfsSinkTaskIntervalWith1File() {
+    try (var server = HdfsServer.local()) {
+      var fileSize = "500Byte";
+      var topicName = Utils.randomString(10);
+
+      var task = new Exporter.Task();
+      var configs =
+          Map.of(
+              "fs.schema",
+              "hdfs",
+              "topics",
+              topicName,
+              "fs.hdfs.hostname",
+              String.valueOf(server.hostname()),
+              "fs.hdfs.port",
+              String.valueOf(server.port()),
+              "fs.hdfs.user",
+              String.valueOf(server.user()),
+              "path",
+              "/" + fileSize,
+              "size",
+              fileSize,
+              "tasks.max",
+              "1",
+              "roll.duration",
+              "300ms");
+
+      task.start(configs);
+
+      var records1 =
+          Record.builder()
+              .topic(topicName)
+              .key("test".getBytes())
+              .value("test0".getBytes())
+              .partition(0)
+              .offset(0)
+              .timestamp(System.currentTimeMillis())
+              .build();
+
+      task.put(List.of(records1));
+
+      Utils.sleep(Duration.ofMillis(1000));
+
+      var fs = FileSystem.of("hdfs", Configuration.of(configs));
+
+      Assertions.assertEquals(
+          1, fs.listFiles("/" + String.join("/", fileSize, topicName, "0")).size());
+
+      var input =
+          fs.read(
+              "/"
+                  + String.join(
+                      "/",
+                      fileSize,
+                      topicName,
+                      String.valueOf(records1.partition()),
+                      String.valueOf(records1.offset())));
+      var reader = RecordReader.builder(input).build();
+
+      while (reader.hasNext()) {
+        var record = reader.next();
+        Assertions.assertArrayEquals(record.key(), records1.key());
+        Assertions.assertArrayEquals(record.value(), records1.value());
+        Assertions.assertEquals(record.topic(), records1.topic());
+        Assertions.assertEquals(record.partition(), records1.partition());
+        Assertions.assertEquals(record.timestamp(), records1.timestamp());
+        Assertions.assertEquals(record.offset(), records1.offset());
+      }
+      task.close();
+    }
+  }
+
+  @Test
+  void testHdfsSinkTaskIntervalWith2Writers() {
+    try (var server = HdfsServer.local()) {
+      var fileSize = "500Byte";
+      var topicName = Utils.randomString(10);
+
+      var task = new Exporter.Task();
+      var configs =
+          Map.of(
+              "fs.schema",
+              "hdfs",
+              "topics",
+              topicName,
+              "fs.hdfs.hostname",
+              String.valueOf(server.hostname()),
+              "fs.hdfs.port",
+              String.valueOf(server.port()),
+              "fs.hdfs.user",
+              String.valueOf(server.user()),
+              "path",
+              "/" + fileSize,
+              "size",
+              fileSize,
+              "tasks.max",
+              "1",
+              "roll.duration",
+              "100ms");
+
+      task.start(configs);
+
+      var record1 =
+          Record.builder()
+              .topic(topicName)
+              .key("test".getBytes())
+              .value("test0".getBytes())
+              .partition(0)
+              .offset(0)
+              .timestamp(System.currentTimeMillis())
+              .build();
+
+      var record2 =
+          Record.builder()
+              .topic(topicName)
+              .key("test".getBytes())
+              .value("test1".getBytes())
+              .partition(1)
+              .offset(0)
+              .timestamp(System.currentTimeMillis())
+              .build();
+
+      var record3 =
+          Record.builder()
+              .topic(topicName)
+              .key("test".getBytes())
+              .value("test2".getBytes())
+              .partition(0)
+              .offset(1)
+              .timestamp(System.currentTimeMillis())
+              .build();
+
+      task.put(List.of(record1));
+      Utils.sleep(Duration.ofMillis(500));
+
+      task.put(List.of(record2));
+      Utils.sleep(Duration.ofMillis(1000));
+
+      task.put(List.of(record3));
+      Utils.sleep(Duration.ofMillis(1000));
+
+      var fs = FileSystem.of("hdfs", Configuration.of(configs));
+
+      Assertions.assertEquals(
+          2, fs.listFolders("/" + String.join("/", fileSize, topicName)).size());
+
+      Assertions.assertEquals(
+          2, fs.listFiles("/" + String.join("/", fileSize, topicName, "0")).size());
+
+      List.of(record1, record2, record3)
+          .forEach(
+              sinkRecord -> {
+                var input =
+                    fs.read(
+                        "/"
+                            + String.join(
+                                "/",
+                                fileSize,
+                                topicName,
+                                String.valueOf(sinkRecord.partition()),
+                                String.valueOf(sinkRecord.offset())));
+                var reader = RecordReader.builder(input).build();
+
+                while (reader.hasNext()) {
+                  var record = reader.next();
+                  Assertions.assertArrayEquals(record.key(), sinkRecord.key());
+                  Assertions.assertArrayEquals(record.value(), sinkRecord.value());
+                  Assertions.assertEquals(record.topic(), sinkRecord.topic());
+                  Assertions.assertEquals(record.partition(), sinkRecord.partition());
+                  Assertions.assertEquals(record.timestamp(), sinkRecord.timestamp());
+                  Assertions.assertEquals(record.offset(), sinkRecord.offset());
+                }
+              });
+      task.close();
+    }
+  }
 }
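
Note: all three tests in PATCH 03 assert the same on-disk layout: the exporter writes
one folder per partition and one file per rolled writer, and the file is named by the
first offset it contains. A small helper equivalent to the String.join(...) calls in
the tests (hypothetical, not part of the connector):

    // e.g. expectedPath("500Byte", "my-topic", 0, 0) -> "/500Byte/my-topic/0/0"
    static String expectedPath(String root, String topic, int partition, long firstOffset) {
      return "/"
          + String.join("/", root, topic, String.valueOf(partition), String.valueOf(firstOffset));
    }
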
From 870f10f45aad2679ddf6010ba86fc0889da6c589 Mon Sep 17 00:00:00 2001
From: Jia-Sheng Chen
Date: Fri, 20 Jan 2023 00:29:53 +0800
Subject: [PATCH 04/10] remove unnecessary override methods

---
 .../org/astraea/fs/hdfs/HdfsFileSystem.java   | 74 +++----------------
 gradle/dependencies.gradle                    |  4 +-
 2 files changed, 14 insertions(+), 64 deletions(-)

diff --git a/fs/src/main/java/org/astraea/fs/hdfs/HdfsFileSystem.java b/fs/src/main/java/org/astraea/fs/hdfs/HdfsFileSystem.java
index 974010449a..59f667ec0f 100644
--- a/fs/src/main/java/org/astraea/fs/hdfs/HdfsFileSystem.java
+++ b/fs/src/main/java/org/astraea/fs/hdfs/HdfsFileSystem.java
@@ -112,72 +112,22 @@ public void delete(String path) {
 
   @Override
   public InputStream read(String path) {
-    return Utils.packException(
-        () -> {
-          if (type(path) != Type.FILE) throw new IllegalArgumentException(path + " is not a file");
-          var inputStream = fs.open(new Path(path));
-          if (inputStream == null) {
-            fs.close();
-            throw new IllegalArgumentException("failed to open file on " + path);
-          }
-
-          return new InputStream() {
-            @Override
-            public int read() throws IOException {
-              return inputStream.read();
-            }
-
-            @Override
-            public int read(byte b[], int off, int len) throws IOException {
-              return inputStream.read(b, off, len);
-            }
-
-            @Override
-            public int readNBytes(byte[] b, int off, int len) throws IOException {
-              return inputStream.readNBytes(b, off, len);
-            }
-
-            @Override
-            public void close() throws IOException {
-              inputStream.close();
-            }
-          };
-        });
+    if (type(path) != Type.FILE) throw new IllegalArgumentException(path + " is not a file");
+    try {
+      return fs.open(new Path(path));
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
   }
 
   @Override
   public OutputStream write(String path) {
-    return Utils.packException(
-        () -> {
-          if (type(path) == Type.FOLDER) throw new IllegalArgumentException(path + " is a folder");
-          var outputStream = fs.create(new Path(path), true);
-          if (outputStream == null) {
-            fs.close();
-            throw new IllegalArgumentException("failed to create file on " + path);
-          }
-
-          return new OutputStream() {
-            @Override
-            public void write(int b) throws IOException {
-              outputStream.write(b);
-            }
-
-            @Override
-            public void write(byte b[], int off, int len) throws IOException {
-              outputStream.write(b, off, len);
-            }
-
-            @Override
-            public void flush() throws IOException {
-              outputStream.flush();
-            }
-
-            @Override
-            public void close() throws IOException {
-              outputStream.close();
-            }
-          };
-        });
+    if (type(path) == Type.FOLDER) throw new IllegalArgumentException(path + " is a folder");
+    try {
+      return fs.create(new Path(path), true);
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
   }
 
   @Override
diff --git a/gradle/dependencies.gradle b/gradle/dependencies.gradle
index 96a429c34a..b4c4d798e9 100644
--- a/gradle/dependencies.gradle
+++ b/gradle/dependencies.gradle
@@ -36,7 +36,7 @@ def versions = [
     spark                : project.properties['spark.version'] ?: "3.3.1",
     "wix-embedded-mysql" : project.properties['wix-embedded-mysql.version'] ?: "4.6.2",
     zookeeper            : project.properties['zookeeper.version'] ?: "3.8.0",
-    "hadoop-common" : project.properties['hadoop-common.version'] ?: "3.3.4",
+    "hadoop-common"      : project.properties['hadoop-common.version'] ?: "3.3.4",
     "hadoop-minicluster" : project.properties['hadoop-minicluster.version'] ?: "3.3.4",
 ]
@@ -64,7 +64,7 @@ libs += [
     "spark-sql"          : "org.apache.spark:spark-sql_2.13:${versions["spark"]}",
     "wix-embedded-mysql" : "com.wix:wix-embedded-mysql:${versions["wix-embedded-mysql"]}",
     zookeeper            : "org.apache.zookeeper:zookeeper:${versions["zookeeper"]}",
-    "hadoop-common" : "org.apache.hadoop:hadoop-common:${versions["hadoop-common"]}",
+    "hadoop-common"      : "org.apache.hadoop:hadoop-common:${versions["hadoop-common"]}",
     "hadoop-minicluster" : "org.apache.hadoop:hadoop-minicluster:${versions["hadoop-minicluster"]}",
 ]
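
Note: PATCH 04 can drop the anonymous adapters because Hadoop's stream types already
are java.io streams: FSDataInputStream extends java.io.DataInputStream and
FSDataOutputStream extends java.io.DataOutputStream, so the values returned by
fs.open(...) and fs.create(...) can be handed out directly. A type-level sketch
(hypothetical helper, not part of the patch):

    static InputStream openAsPlainStream(org.apache.hadoop.fs.FileSystem fs, String path)
        throws IOException {
      // FSDataInputStream is-a InputStream, so no wrapper is needed
      return fs.open(new org.apache.hadoop.fs.Path(path));
    }
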
"org.apache.hadoop:hadoop-common:${versions["hadoop-common"]}", "hadoop-minicluster" : "org.apache.hadoop:hadoop-minicluster:${versions["hadoop-minicluster"]}", ] From 0d6d3f9ae6743ac4c12d21fa7f54c538fd134fd3 Mon Sep 17 00:00:00 2001 From: Jia-Sheng Chen Date: Fri, 20 Jan 2023 17:50:25 +0800 Subject: [PATCH 05/10] trying to solve dependency conflict --- fs/build.gradle | 4 +++- it/build.gradle | 1 - 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/fs/build.gradle b/fs/build.gradle index 57b5070221..1ca365f8c9 100644 --- a/fs/build.gradle +++ b/fs/build.gradle @@ -35,7 +35,9 @@ dependencies { exclude group: "org.slf4j" } implementation libs["commons-net"] - implementation libs["hadoop-common"] + implementation(libs["hadoop-common"]) { + transitive false + } } java { diff --git a/it/build.gradle b/it/build.gradle index a4fdeebbbf..86a485caff 100644 --- a/it/build.gradle +++ b/it/build.gradle @@ -34,7 +34,6 @@ dependencies { implementation libs["kafka-core"] implementation libs["kafka-connect-runtime"] implementation libs["kafka-connect-json"] - implementation libs["hadoop-common"] implementation libs["hadoop-minicluster"] implementation libs["mockito-core"] } From 1fba4e0923be6e53977930367c19830e0cabe853 Mon Sep 17 00:00:00 2001 From: Jia-Sheng Chen Date: Sat, 28 Jan 2023 21:50:50 +0800 Subject: [PATCH 06/10] exclude group to avoid ignored test --- fs/build.gradle | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/fs/build.gradle b/fs/build.gradle index 1ca365f8c9..a586675d0e 100644 --- a/fs/build.gradle +++ b/fs/build.gradle @@ -36,7 +36,7 @@ dependencies { } implementation libs["commons-net"] implementation(libs["hadoop-common"]) { - transitive false + exclude group: "com.sun.jersey" } } From 1dc13c448461689b827b68a6265f5883a3359749 Mon Sep 17 00:00:00 2001 From: Jia-Sheng Chen Date: Sat, 28 Jan 2023 22:30:57 +0800 Subject: [PATCH 07/10] add comments explaining the reason for the exclusion --- fs/build.gradle | 2 ++ 1 file changed, 2 insertions(+) diff --git a/fs/build.gradle b/fs/build.gradle index a586675d0e..c744d5a8e9 100644 --- a/fs/build.gradle +++ b/fs/build.gradle @@ -36,6 +36,8 @@ dependencies { } implementation libs["commons-net"] implementation(libs["hadoop-common"]) { + // connector uses org.glassfish.jersey. This is the higher version of com.sun.jersey. + // excluding com.sun.jersey can make SERVICE.workerUrl() work. exclude group: "com.sun.jersey" } } From e97b070c53d9f01edfb9bd0b791e027e0b05b41e Mon Sep 17 00:00:00 2001 From: Jia-Sheng Chen Date: Sun, 29 Jan 2023 15:46:46 +0800 Subject: [PATCH 08/10] add the version description for hadoop and connect --- fs/build.gradle | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/fs/build.gradle b/fs/build.gradle index c744d5a8e9..817489fd13 100644 --- a/fs/build.gradle +++ b/fs/build.gradle @@ -36,7 +36,7 @@ dependencies { } implementation libs["commons-net"] implementation(libs["hadoop-common"]) { - // connector uses org.glassfish.jersey. This is the higher version of com.sun.jersey. + // connector uses org.glassfish.jersey(2.34). This is the higher version of com.sun.jersey(1.19). // excluding com.sun.jersey can make SERVICE.workerUrl() work. 

From ba6579952c6290658e71f9968c59ed045a6ee577 Mon Sep 17 00:00:00 2001
From: Jia-Sheng Chen
Date: Sun, 29 Jan 2023 15:57:30 +0800
Subject: [PATCH 09/10] change the comment description

---
 fs/build.gradle | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/fs/build.gradle b/fs/build.gradle
index 817489fd13..b6456bf137 100644
--- a/fs/build.gradle
+++ b/fs/build.gradle
@@ -36,8 +36,8 @@ dependencies {
     }
     implementation libs["commons-net"]
     implementation(libs["hadoop-common"]) {
-        // connector uses org.glassfish.jersey(2.34). This is the higher version of com.sun.jersey(1.19).
-        // excluding com.sun.jersey can make SERVICE.workerUrl() work.
+        // Kafka connector uses org.glassfish.jersey (2.34), which conflicts with com.sun.jersey (1.19).
+        // Hence, the latter must be excluded from the hadoop dependencies to make the embedded Kafka worker work.
         exclude group: "com.sun.jersey"
     }
 }

From d409655e05485103be973e46bbff2bcb78fc2e0a Mon Sep 17 00:00:00 2001
From: Jia-Sheng Chen
Date: Sun, 29 Jan 2023 22:56:11 +0800
Subject: [PATCH 10/10] add the comment for the dependency mockito-core

---
 it/build.gradle | 1 +
 1 file changed, 1 insertion(+)

diff --git a/it/build.gradle b/it/build.gradle
index 86a485caff..148e249e10 100644
--- a/it/build.gradle
+++ b/it/build.gradle
@@ -35,6 +35,7 @@ dependencies {
     implementation libs["kafka-connect-runtime"]
     implementation libs["kafka-connect-json"]
     implementation libs["hadoop-minicluster"]
+    // mockito-core is required by MiniDFSCluster, so it must stay on this module's classpath
     implementation libs["mockito-core"]
 }
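
Note: taken together, the series lets a test stand up HDFS and point the exporter at it
with only the three fs.hdfs.* keys. A condensed wiring sketch (hypothetical test body;
key names as defined in HdfsFileSystem, configuration shape as used in ExporterTest):

    try (var server = HdfsServer.local()) {
      var task = new Exporter.Task();
      task.start(
          Map.of(
              "fs.schema", "hdfs",
              "topics", "demo",
              "fs.hdfs.hostname", server.hostname(),
              "fs.hdfs.port", String.valueOf(server.port()),
              "fs.hdfs.user", server.user(),
              "path", "/backup",
              "size", "500Byte",
              "tasks.max", "1",
              "roll.duration", "100ms"));
      // task.put(records) ... then verify the layout through FileSystem.of("hdfs", ...)
      task.close();
    }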