diff --git a/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/tableoperator/IcebergTableWriterFactory.java b/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/tableoperator/IcebergTableWriterFactory.java index f7190e80..c2a86b14 100644 --- a/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/tableoperator/IcebergTableWriterFactory.java +++ b/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/tableoperator/IcebergTableWriterFactory.java @@ -12,10 +12,14 @@ import org.apache.iceberg.io.BaseTaskWriter; import org.apache.iceberg.io.OutputFileFactory; import org.apache.iceberg.io.UnpartitionedWriter; +import org.apache.iceberg.util.PropertyUtil; import org.eclipse.microprofile.config.inject.ConfigProperty; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import static org.apache.iceberg.TableProperties.WRITE_TARGET_FILE_SIZE_BYTES; +import static org.apache.iceberg.TableProperties.WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT; + /** * Iceberg Table Writer Factory to get TaskWriter for the table. upsert modes used to return correct writer. * @@ -37,6 +41,9 @@ public BaseTaskWriter create(Table icebergTable) { OutputFileFactory fileFactory = IcebergUtil.getTableOutputFileFactory(icebergTable, format); // equality Field Ids Set equalityFieldIds = icebergTable.schema().identifierFieldIds(); + long targetFileSize = + PropertyUtil.propertyAsLong( + icebergTable.properties(), WRITE_TARGET_FILE_SIZE_BYTES, WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT); BaseTaskWriter writer; // 1. TABLE DONT HAVE identifierFieldIds @@ -52,12 +59,12 @@ public BaseTaskWriter create(Table icebergTable) { if (icebergTable.spec().isUnpartitioned()) { // table is un partitioned use un partitioned append writer writer = new UnpartitionedWriter<>( - icebergTable.spec(), format, appenderFactory, fileFactory, icebergTable.io(), Long.MAX_VALUE); + icebergTable.spec(), format, appenderFactory, fileFactory, icebergTable.io(), targetFileSize); } else { // table is partitioned use partitioned append writer writer = new PartitionedAppendWriter( - icebergTable.spec(), format, appenderFactory, fileFactory, icebergTable.io(), Long.MAX_VALUE, icebergTable.schema()); + icebergTable.spec(), format, appenderFactory, fileFactory, icebergTable.io(), targetFileSize, icebergTable.schema()); } } @@ -69,12 +76,12 @@ else if (icebergTable.spec().isUnpartitioned()) { // running with upsert mode + un partitioned table writer = new UnpartitionedDeltaWriter(icebergTable.spec(), format, appenderFactory, fileFactory, icebergTable.io(), - Long.MAX_VALUE, icebergTable.schema(), equalityFieldIds, true, upsertKeepDeletes); + targetFileSize, icebergTable.schema(), equalityFieldIds, true, upsertKeepDeletes); } else { // running with upsert mode + partitioned table writer = new PartitionedDeltaWriter(icebergTable.spec(), format, appenderFactory, fileFactory, icebergTable.io(), - Long.MAX_VALUE, icebergTable.schema(), equalityFieldIds, true, upsertKeepDeletes); + targetFileSize, icebergTable.schema(), equalityFieldIds, true, upsertKeepDeletes); } return writer;