Commit

[Improve][Connector-V2] Reduce the create times of iceberg sink writer (
Hisoka-X authored Dec 5, 2024
1 parent e6f92fd commit 45a7a71
Showing 4 changed files with 22 additions and 12 deletions.
IcebergSinkWriter.java
@@ -61,7 +61,7 @@ public class IcebergSinkWriter
     private SeaTunnelRowType rowType;
     private final SinkConfig config;
     private final IcebergTableLoader icebergTableLoader;
-    private RecordWriter writer;
+    private volatile RecordWriter writer;
     private final IcebergFilesCommitter filesCommitter;
     private final List<WriteResult> results = Lists.newArrayList();
     private String commitUser = UUID.randomUUID().toString();
@@ -79,7 +79,6 @@ public IcebergSinkWriter(
         this.rowType = tableSchema.toPhysicalRowDataType();
         this.filesCommitter = IcebergFilesCommitter.of(config, icebergTableLoader);
         this.dataTypeChangeEventHandler = new DataTypeChangeEventDispatcher();
-        tryCreateRecordWriter();
         if (Objects.nonNull(states) && !states.isEmpty()) {
             this.commitUser = states.get(0).getCommitUser();
             preCommit(states);
@@ -107,8 +106,7 @@ public static IcebergSinkWriter of(SinkConfig config, CatalogTable catalogTable)
 
     public static IcebergSinkWriter of(
             SinkConfig config, CatalogTable catalogTable, List<IcebergSinkState> states) {
-        IcebergTableLoader icebergTableLoader =
-                IcebergTableLoader.create(config, catalogTable).open();
+        IcebergTableLoader icebergTableLoader = IcebergTableLoader.create(config, catalogTable);
         return new IcebergSinkWriter(
                 icebergTableLoader, config, catalogTable.getTableSchema(), states);
     }
@@ -121,7 +119,12 @@ public void write(SeaTunnelRow element) throws IOException {
 
     @Override
     public Optional<IcebergCommitInfo> prepareCommit() throws IOException {
-        List<WriteResult> writeResults = writer.complete();
+        List<WriteResult> writeResults;
+        if (writer != null) {
+            writeResults = writer.complete();
+        } else {
+            writeResults = Collections.emptyList();
+        }
         IcebergCommitInfo icebergCommitInfo = new IcebergCommitInfo(writeResults);
         this.results.addAll(writeResults);
         return Optional.of(icebergCommitInfo);
@@ -134,6 +137,7 @@ public void applySchemaChange(SchemaChangeEvent event) throws IOException {
             log.info("changed rowType before: {}", fieldsInfo(rowType));
             this.rowType = dataTypeChangeEventHandler.reset(rowType).apply(event);
             log.info("changed rowType after: {}", fieldsInfo(rowType));
+            tryCreateRecordWriter();
             writer.applySchemaChange(this.rowType, event);
         }
     }
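Taken together, these IcebergSinkWriter changes defer writer creation until it is actually needed: the writer is no longer created in the constructor, the table loader is no longer opened eagerly in the static factory, and prepareCommit() returns an empty result when no writer was ever created. Below is a minimal sketch of that lazy-creation pattern; Writer and LazySinkWriter are hypothetical stand-ins, not the connector's actual classes.

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical stand-ins used only to illustrate the lazy-creation pattern.
interface Writer {
    void write(String record);
    List<String> complete();
}

class LazySinkWriter {
    // volatile so a writer created by the task thread is visible to a
    // checkpoint thread calling prepareCommit()
    private volatile Writer writer;

    private Writer createWriter() {
        // stand-in for the relatively expensive setup being deferred
        return new Writer() {
            private final List<String> buffered = new ArrayList<>();
            public void write(String record) { buffered.add(record); }
            public List<String> complete() {
                List<String> out = new ArrayList<>(buffered);
                buffered.clear();
                return out;
            }
        };
    }

    void write(String record) {
        if (writer == null) {          // created only when the first row arrives
            writer = createWriter();
        }
        writer.write(record);
    }

    List<String> prepareCommit() {
        // no rows were written: there is no writer to complete
        if (writer == null) {
            return Collections.emptyList();
        }
        return writer.complete();
    }
}

With this shape, a task that receives no rows before a checkpoint never pays for writer construction, which is the "reduce the create times" in the commit title.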
IcebergFilesCommitter.java
@@ -54,12 +54,10 @@ public static IcebergFilesCommitter of(
 
     public void doCommit(List<WriteResult> results) {
         TableIdentifier tableIdentifier = icebergTableLoader.getTableIdentifier();
-        Table table = icebergTableLoader.loadTable();
-        log.info("do commit table : " + table.toString());
-        commit(tableIdentifier, table, results);
+        commit(tableIdentifier, results);
     }
 
-    private void commit(TableIdentifier tableIdentifier, Table table, List<WriteResult> results) {
+    private void commit(TableIdentifier tableIdentifier, List<WriteResult> results) {
         List<DataFile> dataFiles =
                 results.stream()
                         .filter(payload -> payload.getDataFiles() != null)
@@ -77,6 +75,8 @@ private void commit(TableIdentifier tableIdentifier, Table table, List<WriteResu
         if (dataFiles.isEmpty() && deleteFiles.isEmpty()) {
             log.info(String.format("Nothing to commit to table %s, skipping", tableIdentifier));
         } else {
+            Table table = icebergTableLoader.loadTable();
+            log.info("do commit table : {}", table.toString());
             if (deleteFiles.isEmpty()) {
                 AppendFiles append = table.newAppend();
                 if (branch != null) {
IcebergRecordWriter.java
@@ -52,7 +52,7 @@ public class IcebergRecordWriter implements RecordWriter {
     private final Table table;
     private final SinkConfig config;
     private final List<WriteResult> writerResults;
-    private TaskWriter<Record> writer;
+    private volatile TaskWriter<Record> writer;
     private RowConverter recordConverter;
     private final IcebergWriterFactory writerFactory;
 
@@ -62,7 +62,6 @@ public IcebergRecordWriter(Table table, IcebergWriterFactory writerFactory, Sink
         this.writerResults = Lists.newArrayList();
         this.recordConverter = new RowConverter(table, config);
         this.writerFactory = writerFactory;
-        this.writer = createTaskWriter();
     }
 
     private TaskWriter<Record> createTaskWriter() {
@@ -71,6 +70,9 @@ private TaskWriter<Record> createTaskWriter() {
 
     @Override
     public void write(SeaTunnelRow seaTunnelRow, SeaTunnelRowType rowType) {
+        if (writer == null) {
+            resetWriter();
+        }
         SchemaChangeWrapper updates = new SchemaChangeWrapper();
         Record record = recordConverter.convert(seaTunnelRow, rowType, updates);
         if (!updates.empty()) {
@@ -139,7 +141,6 @@ public List<WriteResult> complete() {
         flush();
         List<WriteResult> result = Lists.newArrayList(writerResults);
         writerResults.clear();
-        resetWriter();
         return result;
     }
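The IcebergRecordWriter side makes the same move at the task-writer level: the TaskWriter is no longer built in the constructor or rebuilt at the end of complete(); instead write() recreates it lazily via resetWriter() when the next row arrives, and the field is volatile, presumably so the lazily created instance is visible to other threads. A rough lifecycle sketch under those assumptions, using hypothetical stand-in classes rather than Iceberg's TaskWriter:

import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for an expensive-to-create task writer; this is a
// sketch of the lifecycle, not Iceberg's TaskWriter API.
class TaskWriterStub {
    private final List<String> rows = new ArrayList<>();
    void write(String row) { rows.add(row); }
    List<String> closeAndCollect() { return new ArrayList<>(rows); }
}

class LazyRecordWriterSketch {
    // volatile: the field may be read from a thread other than the one
    // that lazily created the writer (e.g. a checkpoint thread)
    private volatile TaskWriterStub writer;
    private final List<String> writerResults = new ArrayList<>();

    private TaskWriterStub createTaskWriter() {
        return new TaskWriterStub(); // the expensive step being deferred
    }

    void write(String row) {
        if (writer == null) {        // (re)created only when a row arrives
            writer = createTaskWriter();
        }
        writer.write(row);
    }

    List<String> complete() {
        if (writer != null) {
            writerResults.addAll(writer.closeAndCollect());
            writer = null;           // the next write() builds a fresh writer
        }
        List<String> result = new ArrayList<>(writerResults);
        writerResults.clear();
        return result;               // an idle checkpoint creates no writer at all
    }
}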
MySQL CDC e2e test
@@ -209,6 +209,11 @@ public void testMysqlCdcCheckDataE2e(TestContainer container)
     }
 
     @TestTemplate
+    @DisabledOnContainer(
+            value = {},
+            type = {EngineType.SPARK, EngineType.FLINK},
+            disabledReason =
+                    "Currently SPARK do not support cdc. In addition, currently only the zeta engine supports schema evolution for pr https://github.com/apache/seatunnel/pull/5125.")
     public void testMysqlCdcCheckSchemaChangeE2e(TestContainer container)
             throws IOException, InterruptedException {
         // Clear related content to ensure that multiple operations are not affected
