create tests for sink task in hdfs
Haser0305 committed Jan 18, 2023
1 parent 809b714 commit a39e335
Showing 1 changed file with 264 additions and 0 deletions.
connector/src/test/java/org/astraea/connector/backup/ExporterTest.java (+264, -0)
@@ -37,6 +37,7 @@
import org.astraea.common.producer.Producer;
import org.astraea.fs.FileSystem;
import org.astraea.it.FtpServer;
import org.astraea.it.HdfsServer;
import org.astraea.it.Service;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.Assertions;
@@ -422,4 +423,267 @@ var record = reader.next();
task.close();
}
}

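// sanity check for the HDFS sink: two records sent to different partitions
// should be flushed on close and laid out as /<path>/<topic>/<partition>/<offset>.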
@Test
void testHdfsSinkTask() {
try (var server = HdfsServer.local()) {
var fileSize = "500Byte";
var topicName = Utils.randomString(10);

var task = new Exporter.Task();
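// roll files once they reach 500 bytes; the long roll.duration ("100m")
// keeps time-based rolling from firing during this test.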
var configs =
Map.of(
"fs.schema",
"hdfs",
"topics",
topicName,
"fs.hdfs.hostname",
String.valueOf(server.hostname()),
"fs.hdfs.port",
String.valueOf(server.port()),
"fs.hdfs.user",
String.valueOf(server.user()),
"path",
"/" + fileSize,
"size",
fileSize,
"tasks.max",
"1",
"roll.duration",
"100m");

task.start(configs);

var records =
List.of(
Record.builder()
.topic(topicName)
.key("test".getBytes())
.value("test0".getBytes())
.partition(0)
.offset(0)
.timestamp(System.currentTimeMillis())
.build(),
Record.builder()
.topic(topicName)
.key("test".getBytes())
.value("test1".getBytes())
.partition(1)
.offset(0)
.timestamp(System.currentTimeMillis())
.build());

task.put(records);

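// give the writer time to pick up the put records before closing.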
Utils.sleep(Duration.ofMillis(2000));

task.close();

Assertions.assertTrue(task.isWriterDone());

var fs = FileSystem.of("hdfs", Configuration.of(configs));

Assertions.assertEquals(
2, fs.listFolders("/" + String.join("/", fileSize, topicName)).size());

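// read each record back from /<path>/<topic>/<partition>/<offset> and
// compare its fields with what was written.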
records.forEach(
sinkRecord -> {
var input =
fs.read(
"/"
+ String.join(
"/",
fileSize,
topicName,
String.valueOf(sinkRecord.partition()),
String.valueOf(sinkRecord.offset())));
var reader = RecordReader.builder(input).build();

while (reader.hasNext()) {
var record = reader.next();
Assertions.assertArrayEquals(sinkRecord.key(), record.key());
Assertions.assertArrayEquals(sinkRecord.value(), record.value());
Assertions.assertEquals(sinkRecord.topic(), record.topic());
Assertions.assertEquals(sinkRecord.partition(), record.partition());
Assertions.assertEquals(sinkRecord.timestamp(), record.timestamp());
}
});
}
}

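// the short roll.duration (300ms) should make an idle writer roll its file
// even though the 500-byte size threshold is never reached.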
@Test
void testHdfsSinkTaskIntervalWith1File() {
try (var server = HdfsServer.local()) {
var fileSize = "500Byte";
var topicName = Utils.randomString(10);

var task = new Exporter.Task();
var configs =
Map.of(
"fs.schema",
"hdfs",
"topics",
topicName,
"fs.hdfs.hostname",
String.valueOf(server.hostname()),
"fs.hdfs.port",
String.valueOf(server.port()),
"fs.hdfs.user",
String.valueOf(server.user()),
"path",
"/" + fileSize,
"size",
fileSize,
"tasks.max",
"1",
"roll.duration",
"300ms");

task.start(configs);

var record1 =
Record.builder()
.topic(topicName)
.key("test".getBytes())
.value("test0".getBytes())
.partition(0)
.offset(0)
.timestamp(System.currentTimeMillis())
.build();

task.put(List.of(record1));

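// wait past roll.duration so the time-based roll closes the file.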
Utils.sleep(Duration.ofMillis(1000));

var fs = FileSystem.of("hdfs", Configuration.of(configs));

Assertions.assertEquals(
1, fs.listFiles("/" + String.join("/", fileSize, topicName, "0")).size());

var input =
fs.read(
"/"
+ String.join(
"/",
fileSize,
topicName,
String.valueOf(record1.partition()),
String.valueOf(record1.offset())));
var reader = RecordReader.builder(input).build();

while (reader.hasNext()) {
var record = reader.next();
Assertions.assertArrayEquals(record1.key(), record.key());
Assertions.assertArrayEquals(record1.value(), record.value());
Assertions.assertEquals(record1.topic(), record.topic());
Assertions.assertEquals(record1.partition(), record.partition());
Assertions.assertEquals(record1.timestamp(), record.timestamp());
Assertions.assertEquals(record1.offset(), record.offset());
}
task.close();
}
}

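// two partitions mean two writers; partition 0 receives two records far
// enough apart that the time-based roll should split them into two files.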
@Test
void testHdfsSinkTaskIntervalWith2Writers() {
try (var server = HdfsServer.local()) {
var fileSize = "500Byte";
var topicName = Utils.randomString(10);

var task = new Exporter.Task();
var configs =
Map.of(
"fs.schema",
"hdfs",
"topics",
topicName,
"fs.hdfs.hostname",
String.valueOf(server.hostname()),
"fs.hdfs.port",
String.valueOf(server.port()),
"fs.hdfs.user",
String.valueOf(server.user()),
"path",
"/" + fileSize,
"size",
fileSize,
"tasks.max",
"1",
"roll.duration",
"100ms");

task.start(configs);

var record1 =
Record.builder()
.topic(topicName)
.key("test".getBytes())
.value("test0".getBytes())
.partition(0)
.offset(0)
.timestamp(System.currentTimeMillis())
.build();

var record2 =
Record.builder()
.topic(topicName)
.key("test".getBytes())
.value("test1".getBytes())
.partition(1)
.offset(0)
.timestamp(System.currentTimeMillis())
.build();

var record3 =
Record.builder()
.topic(topicName)
.key("test".getBytes())
.value("test2".getBytes())
.partition(0)
.offset(1)
.timestamp(System.currentTimeMillis())
.build();

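// record1 and record3 both target partition 0 but arrive more than
// roll.duration (100ms) apart, so each should land in its own file;
// record2 opens a separate writer for partition 1.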
task.put(List.of(record1));
Utils.sleep(Duration.ofMillis(500));

task.put(List.of(record2));
Utils.sleep(Duration.ofMillis(1000));

task.put(List.of(record3));
Utils.sleep(Duration.ofMillis(1000));

var fs = FileSystem.of("hdfs", Configuration.of(configs));

Assertions.assertEquals(
2, fs.listFolders("/" + String.join("/", fileSize, topicName)).size());

Assertions.assertEquals(
2, fs.listFiles("/" + String.join("/", fileSize, topicName, "0")).size());

List.of(record1, record2, record3)
.forEach(
sinkRecord -> {
var input =
fs.read(
"/"
+ String.join(
"/",
fileSize,
topicName,
String.valueOf(sinkRecord.partition()),
String.valueOf(sinkRecord.offset())));
var reader = RecordReader.builder(input).build();

while (reader.hasNext()) {
var record = reader.next();
Assertions.assertArrayEquals(sinkRecord.key(), record.key());
Assertions.assertArrayEquals(sinkRecord.value(), record.value());
Assertions.assertEquals(sinkRecord.topic(), record.topic());
Assertions.assertEquals(sinkRecord.partition(), record.partition());
Assertions.assertEquals(sinkRecord.timestamp(), record.timestamp());
Assertions.assertEquals(sinkRecord.offset(), record.offset());
}
});
task.close();
}
}
}
