Skip to content

Commit

Permalink
Use write.target-file-size-bytes value (#412)
Browse files Browse the repository at this point in the history
  • Loading branch information
ismailsimsek authored Sep 8, 2024
1 parent 2a40003 commit e3b3097
Showing 1 changed file with 11 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,14 @@
import org.apache.iceberg.io.BaseTaskWriter;
import org.apache.iceberg.io.OutputFileFactory;
import org.apache.iceberg.io.UnpartitionedWriter;
import org.apache.iceberg.util.PropertyUtil;
import org.eclipse.microprofile.config.inject.ConfigProperty;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import static org.apache.iceberg.TableProperties.WRITE_TARGET_FILE_SIZE_BYTES;
import static org.apache.iceberg.TableProperties.WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT;

/**
* Iceberg table writer factory that returns a TaskWriter for the table. The upsert mode is used to select the correct writer.
*
Expand All @@ -37,6 +41,9 @@ public BaseTaskWriter<Record> create(Table icebergTable) {
OutputFileFactory fileFactory = IcebergUtil.getTableOutputFileFactory(icebergTable, format);
// equality Field Ids
Set<Integer> equalityFieldIds = icebergTable.schema().identifierFieldIds();
long targetFileSize =
PropertyUtil.propertyAsLong(
icebergTable.properties(), WRITE_TARGET_FILE_SIZE_BYTES, WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT);
BaseTaskWriter<Record> writer;

// 1. TABLE DOESN'T HAVE identifierFieldIds
Expand All @@ -52,12 +59,12 @@ public BaseTaskWriter<Record> create(Table icebergTable) {
if (icebergTable.spec().isUnpartitioned()) {
// table is unpartitioned, so use the unpartitioned append writer
writer = new UnpartitionedWriter<>(
icebergTable.spec(), format, appenderFactory, fileFactory, icebergTable.io(), Long.MAX_VALUE);
icebergTable.spec(), format, appenderFactory, fileFactory, icebergTable.io(), targetFileSize);

} else {
// table is partitioned, so use the partitioned append writer
writer = new PartitionedAppendWriter(
icebergTable.spec(), format, appenderFactory, fileFactory, icebergTable.io(), Long.MAX_VALUE, icebergTable.schema());
icebergTable.spec(), format, appenderFactory, fileFactory, icebergTable.io(), targetFileSize, icebergTable.schema());
}

}
Expand All @@ -69,12 +76,12 @@ else if (icebergTable.spec().isUnpartitioned()) {
// running with upsert mode + unpartitioned table
writer = new UnpartitionedDeltaWriter(icebergTable.spec(), format, appenderFactory, fileFactory,
icebergTable.io(),
Long.MAX_VALUE, icebergTable.schema(), equalityFieldIds, true, upsertKeepDeletes);
targetFileSize, icebergTable.schema(), equalityFieldIds, true, upsertKeepDeletes);
} else {
// running with upsert mode + partitioned table
writer = new PartitionedDeltaWriter(icebergTable.spec(), format, appenderFactory, fileFactory,
icebergTable.io(),
Long.MAX_VALUE, icebergTable.schema(), equalityFieldIds, true, upsertKeepDeletes);
targetFileSize, icebergTable.schema(), equalityFieldIds, true, upsertKeepDeletes);
}

return writer;
Expand Down

0 comments on commit e3b3097

Please sign in to comment.