[CONNECTOR] Add hdfs server, hdfsFileSystem and tests #1447

Merged
merged 10 commits on Jan 30, 2023
264 changes: 264 additions & 0 deletions connector/src/test/java/org/astraea/connector/backup/ExporterTest.java
@@ -37,6 +37,7 @@
import org.astraea.common.producer.Producer;
import org.astraea.fs.FileSystem;
import org.astraea.it.FtpServer;
import org.astraea.it.HdfsServer;
import org.astraea.it.Service;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.Assertions;
@@ -422,4 +423,267 @@ var record = reader.next();
task.close();
}
}

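// Exercises the hdfs sink end-to-end: a local HDFS server, size-based rolling
// ("500Byte") plus a roll.duration of "100m" (presumably 100 minutes, so no
// time-based roll fires here), one record per partition, then a read-back of
// every committed file to verify keys, values, topics, partitions and timestamps.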
@Test
void testHdfsSinkTask() {
try (var server = HdfsServer.local()) {
var fileSize = "500Byte";
var topicName = Utils.randomString(10);

var task = new Exporter.Task();
var configs =
Map.of(
"fs.schema",
"hdfs",
"topics",
topicName,
"fs.hdfs.hostname",
String.valueOf(server.hostname()),
"fs.hdfs.port",
String.valueOf(server.port()),
"fs.hdfs.user",
String.valueOf(server.user()),
"path",
"/" + fileSize,
"size",
fileSize,
"tasks.max",
"1",
"roll.duration",
"100m");

task.start(configs);

var records =
List.of(
Record.builder()
.topic(topicName)
.key("test".getBytes())
.value("test0".getBytes())
.partition(0)
.timestamp(System.currentTimeMillis())
.build(),
Record.builder()
.topic(topicName)
.key("test".getBytes())
.value("test1".getBytes())
.partition(1)
.timestamp(System.currentTimeMillis())
.build());

task.put(records);

Utils.sleep(Duration.ofMillis(2000));

task.close();

Assertions.assertTrue(task.isWriterDone());

var fs = FileSystem.of("hdfs", Configuration.of(configs));

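// one folder per partition is expected under /<fileSize>/<topicName>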
Assertions.assertEquals(
2, fs.listFolders("/" + String.join("/", fileSize, topicName)).size());

records.forEach(
sinkRecord -> {
var input =
fs.read(
"/"
+ String.join(
"/",
fileSize,
topicName,
String.valueOf(sinkRecord.partition()),
String.valueOf(sinkRecord.offset())));
var reader = RecordReader.builder(input).build();

while (reader.hasNext()) {
var record = reader.next();
Assertions.assertArrayEquals(record.key(), sinkRecord.key());
Assertions.assertArrayEquals(record.value(), sinkRecord.value());
Assertions.assertEquals(record.topic(), sinkRecord.topic());
Assertions.assertEquals(record.partition(), sinkRecord.partition());
Assertions.assertEquals(record.timestamp(), sinkRecord.timestamp());
}
});
}
}

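// Verifies time-based rolling: with roll.duration at "300ms", the single
// record's file should be committed once the writer idles, without task.close().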
@Test
void testHdfsSinkTaskIntervalWith1File() {
try (var server = HdfsServer.local()) {
var fileSize = "500Byte";
var topicName = Utils.randomString(10);

var task = new Exporter.Task();
var configs =
Map.of(
"fs.schema",
"hdfs",
"topics",
topicName,
"fs.hdfs.hostname",
String.valueOf(server.hostname()),
"fs.hdfs.port",
String.valueOf(server.port()),
"fs.hdfs.user",
String.valueOf(server.user()),
"path",
"/" + fileSize,
"size",
fileSize,
"tasks.max",
"1",
"roll.duration",
"300ms");

task.start(configs);

var record1 =
Record.builder()
.topic(topicName)
.key("test".getBytes())
.value("test0".getBytes())
.partition(0)
.offset(0)
.timestamp(System.currentTimeMillis())
.build();

task.put(List.of(record1));

Utils.sleep(Duration.ofMillis(1000));

var fs = FileSystem.of("hdfs", Configuration.of(configs));

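// the record should already have been rolled into one file under partition 0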
Assertions.assertEquals(
1, fs.listFiles("/" + String.join("/", fileSize, topicName, "0")).size());

var input =
fs.read(
"/"
+ String.join(
"/",
fileSize,
topicName,
String.valueOf(record1.partition()),
String.valueOf(record1.offset())));
var reader = RecordReader.builder(input).build();

while (reader.hasNext()) {
var record = reader.next();
Assertions.assertArrayEquals(record.key(), record1.key());
Assertions.assertArrayEquals(record.value(), record1.value());
Assertions.assertEquals(record.topic(), record1.topic());
Assertions.assertEquals(record.partition(), record1.partition());
Assertions.assertEquals(record.timestamp(), record1.timestamp());
Assertions.assertEquals(record.offset(), record1.offset());
}
task.close();
}
}

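// Staggered puts with a "100ms" roll.duration: the partition-0 writer should
// roll between record1 and record3, leaving two files under partition 0 and
// one under partition 1 (two partition folders in total).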
@Test
void testHdfsSinkTaskIntervalWith2Writers() {
try (var server = HdfsServer.local()) {
var fileSize = "500Byte";
var topicName = Utils.randomString(10);

var task = new Exporter.Task();
var configs =
Map.of(
"fs.schema",
"hdfs",
"topics",
topicName,
"fs.hdfs.hostname",
String.valueOf(server.hostname()),
"fs.hdfs.port",
String.valueOf(server.port()),
"fs.hdfs.user",
String.valueOf(server.user()),
"path",
"/" + fileSize,
"size",
fileSize,
"tasks.max",
"1",
"roll.duration",
"100ms");

task.start(configs);

var record1 =
Record.builder()
.topic(topicName)
.key("test".getBytes())
.value("test0".getBytes())
.partition(0)
.offset(0)
.timestamp(System.currentTimeMillis())
.build();

var record2 =
Record.builder()
.topic(topicName)
.key("test".getBytes())
.value("test1".getBytes())
.partition(1)
.offset(0)
.timestamp(System.currentTimeMillis())
.build();

var record3 =
Record.builder()
.topic(topicName)
.key("test".getBytes())
.value("test2".getBytes())
.partition(0)
.offset(1)
.timestamp(System.currentTimeMillis())
.build();

task.put(List.of(record1));
Utils.sleep(Duration.ofMillis(500));

task.put(List.of(record2));
Utils.sleep(Duration.ofMillis(1000));

task.put(List.of(record3));
Utils.sleep(Duration.ofMillis(1000));

var fs = FileSystem.of("hdfs", Configuration.of(configs));

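// two partition folders in total, and two rolled files under partition 0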
Assertions.assertEquals(
2, fs.listFolders("/" + String.join("/", fileSize, topicName)).size());

Assertions.assertEquals(
2, fs.listFiles("/" + String.join("/", fileSize, topicName, "0")).size());

List.of(record1, record2, record3)
.forEach(
sinkRecord -> {
var input =
fs.read(
"/"
+ String.join(
"/",
fileSize,
topicName,
String.valueOf(sinkRecord.partition()),
String.valueOf(sinkRecord.offset())));
var reader = RecordReader.builder(input).build();

while (reader.hasNext()) {
var record = reader.next();
Assertions.assertArrayEquals(record.key(), sinkRecord.key());
Assertions.assertArrayEquals(record.value(), sinkRecord.value());
Assertions.assertEquals(record.topic(), sinkRecord.topic());
Assertions.assertEquals(record.partition(), sinkRecord.partition());
Assertions.assertEquals(record.timestamp(), sinkRecord.timestamp());
Assertions.assertEquals(record.offset(), sinkRecord.offset());
}
});
task.close();
}
}
}
3 changes: 3 additions & 0 deletions fs/build.gradle
@@ -35,6 +35,9 @@ dependencies {
exclude group: "org.slf4j"
}
implementation libs["commons-net"]
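// hadoop-common drags in the legacy com.sun.jersey stack; it is excluded here,
// presumably to avoid classpath conflicts with other modules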
implementation(libs["hadoop-common"]) {
exclude group: "com.sun.jersey"
}
}

java {
4 changes: 3 additions & 1 deletion fs/src/main/java/org/astraea/fs/FileSystem.java
@@ -34,7 +34,9 @@ public interface FileSystem extends AutoCloseable {
"ftp.impl",
"org.astraea.fs.ftp.FtpFileSystem",
"local.impl",
"org.astraea.fs.local.LocalFileSystem");
"org.astraea.fs.local.LocalFileSystem",
"hdfs.impl",
"org.astraea.fs.hdfs.HdfsFileSystem");

static FileSystem of(String schema, Configuration configuration) {
var key = schema.toLowerCase() + "." + "impl";
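
For reference, a minimal usage sketch of the new registration (a hedged example, not part of the diff; the hostname, port, and user values are placeholders, while the config keys mirror those used in the tests above):

    // "hdfs" lower-cases to the "hdfs.impl" key, which maps to
    // org.astraea.fs.hdfs.HdfsFileSystem in the table above.
    var fs =
        FileSystem.of(
            "hdfs",
            Configuration.of(
                Map.of(
                    "fs.hdfs.hostname", "localhost",
                    "fs.hdfs.port", "8020",
                    "fs.hdfs.user", "astraea")));
    fs.listFolders("/");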