diff --git a/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/sink/IcebergSinkWriter.java b/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/sink/IcebergSinkWriter.java index 2175080ac7f..1028ae21b4a 100644 --- a/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/sink/IcebergSinkWriter.java +++ b/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/sink/IcebergSinkWriter.java @@ -61,7 +61,7 @@ public class IcebergSinkWriter private SeaTunnelRowType rowType; private final SinkConfig config; private final IcebergTableLoader icebergTableLoader; - private RecordWriter writer; + private volatile RecordWriter writer; private final IcebergFilesCommitter filesCommitter; private final List results = Lists.newArrayList(); private String commitUser = UUID.randomUUID().toString(); @@ -79,7 +79,6 @@ public IcebergSinkWriter( this.rowType = tableSchema.toPhysicalRowDataType(); this.filesCommitter = IcebergFilesCommitter.of(config, icebergTableLoader); this.dataTypeChangeEventHandler = new DataTypeChangeEventDispatcher(); - tryCreateRecordWriter(); if (Objects.nonNull(states) && !states.isEmpty()) { this.commitUser = states.get(0).getCommitUser(); preCommit(states); @@ -107,8 +106,7 @@ public static IcebergSinkWriter of(SinkConfig config, CatalogTable catalogTable) public static IcebergSinkWriter of( SinkConfig config, CatalogTable catalogTable, List states) { - IcebergTableLoader icebergTableLoader = - IcebergTableLoader.create(config, catalogTable).open(); + IcebergTableLoader icebergTableLoader = IcebergTableLoader.create(config, catalogTable); return new IcebergSinkWriter( icebergTableLoader, config, catalogTable.getTableSchema(), states); } @@ -121,7 +119,12 @@ public void write(SeaTunnelRow element) throws IOException { @Override public Optional prepareCommit() throws IOException { - List writeResults = writer.complete(); + List writeResults; + if (writer != null) { + writeResults = writer.complete(); + } else { + writeResults = Collections.emptyList(); + } IcebergCommitInfo icebergCommitInfo = new IcebergCommitInfo(writeResults); this.results.addAll(writeResults); return Optional.of(icebergCommitInfo); @@ -134,6 +137,7 @@ public void applySchemaChange(SchemaChangeEvent event) throws IOException { log.info("changed rowType before: {}", fieldsInfo(rowType)); this.rowType = dataTypeChangeEventHandler.reset(rowType).apply(event); log.info("changed rowType after: {}", fieldsInfo(rowType)); + tryCreateRecordWriter(); writer.applySchemaChange(this.rowType, event); } } diff --git a/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/sink/commit/IcebergFilesCommitter.java b/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/sink/commit/IcebergFilesCommitter.java index 0b5e473440d..5e44e1d8750 100644 --- a/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/sink/commit/IcebergFilesCommitter.java +++ b/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/sink/commit/IcebergFilesCommitter.java @@ -54,12 +54,10 @@ public static IcebergFilesCommitter of( public void doCommit(List results) { TableIdentifier tableIdentifier = icebergTableLoader.getTableIdentifier(); - Table table = icebergTableLoader.loadTable(); - log.info("do commit table : " + table.toString()); - commit(tableIdentifier, table, results); + commit(tableIdentifier, results); } - private void commit(TableIdentifier tableIdentifier, Table table, List results) { + private void commit(TableIdentifier tableIdentifier, List results) { List dataFiles = results.stream() .filter(payload -> payload.getDataFiles() != null) @@ -77,6 +75,8 @@ private void commit(TableIdentifier tableIdentifier, Table table, List writerResults; - private TaskWriter writer; + private volatile TaskWriter writer; private RowConverter recordConverter; private final IcebergWriterFactory writerFactory; @@ -62,7 +62,6 @@ public IcebergRecordWriter(Table table, IcebergWriterFactory writerFactory, Sink this.writerResults = Lists.newArrayList(); this.recordConverter = new RowConverter(table, config); this.writerFactory = writerFactory; - this.writer = createTaskWriter(); } private TaskWriter createTaskWriter() { @@ -71,6 +70,9 @@ private TaskWriter createTaskWriter() { @Override public void write(SeaTunnelRow seaTunnelRow, SeaTunnelRowType rowType) { + if (writer == null) { + resetWriter(); + } SchemaChangeWrapper updates = new SchemaChangeWrapper(); Record record = recordConverter.convert(seaTunnelRow, rowType, updates); if (!updates.empty()) { @@ -139,7 +141,6 @@ public List complete() { flush(); List result = Lists.newArrayList(writerResults); writerResults.clear(); - resetWriter(); return result; } diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-iceberg-e2e/src/test/java/org/apache/seatunnel/e2e/connector/iceberg/IcebergSinkCDCIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-iceberg-e2e/src/test/java/org/apache/seatunnel/e2e/connector/iceberg/IcebergSinkCDCIT.java index a7cbba8b89f..fa271eb8f67 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-iceberg-e2e/src/test/java/org/apache/seatunnel/e2e/connector/iceberg/IcebergSinkCDCIT.java +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-iceberg-e2e/src/test/java/org/apache/seatunnel/e2e/connector/iceberg/IcebergSinkCDCIT.java @@ -209,6 +209,11 @@ public void testMysqlCdcCheckDataE2e(TestContainer container) } @TestTemplate + @DisabledOnContainer( + value = {}, + type = {EngineType.SPARK, EngineType.FLINK}, + disabledReason = + "Currently SPARK do not support cdc. In addition, currently only the zeta engine supports schema evolution for pr https://github.com/apache/seatunnel/pull/5125.") public void testMysqlCdcCheckSchemaChangeE2e(TestContainer container) throws IOException, InterruptedException { // Clear related content to ensure that multiple operations are not affected