Support committing data files with different partition specs (#202)
Co-authored-by: Farooq Qaiser <[email protected]>
fqtab and fqaiser94 authored Mar 26, 2024
1 parent d144cbc commit cdd54f3
Showing 2 changed files with 196 additions and 8 deletions.
@@ -37,16 +37,19 @@
 import java.util.Map;
 import java.util.Optional;
 import java.util.concurrent.ExecutorService;
+import java.util.stream.Collectors;
 import org.apache.iceberg.AppendFiles;
 import org.apache.iceberg.DataFile;
 import org.apache.iceberg.DeleteFile;
 import org.apache.iceberg.RowDelta;
 import org.apache.iceberg.Snapshot;
 import org.apache.iceberg.Table;
+import org.apache.iceberg.Transaction;
 import org.apache.iceberg.catalog.Catalog;
 import org.apache.iceberg.catalog.TableIdentifier;
 import org.apache.iceberg.exceptions.NoSuchTableException;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.util.Tasks;
 import org.apache.iceberg.util.ThreadPools;
 import org.apache.kafka.clients.admin.MemberDescription;
@@ -213,15 +216,31 @@ private void commitToTable(
LOG.info("Nothing to commit to table {}, skipping", tableIdentifier);
} else {
if (deleteFiles.isEmpty()) {
AppendFiles appendOp = table.newAppend();
branch.ifPresent(appendOp::toBranch);
appendOp.set(snapshotOffsetsProp, offsetsJson);
appendOp.set(COMMIT_ID_SNAPSHOT_PROP, commitState.currentCommitId().toString());
if (vtts != null) {
appendOp.set(VTTS_SNAPSHOT_PROP, Long.toString(vtts));
Transaction transaction = table.newTransaction();

Map<Integer, List<DataFile>> filesBySpec =
dataFiles.stream()
.collect(Collectors.groupingBy(DataFile::specId, Collectors.toList()));

List<List<DataFile>> list = Lists.newArrayList(filesBySpec.values());
int lastIdx = list.size() - 1;
for (int i = 0; i <= lastIdx; i++) {
AppendFiles appendOp = transaction.newAppend();
branch.ifPresent(appendOp::toBranch);

list.get(i).forEach(appendOp::appendFile);
appendOp.set(COMMIT_ID_SNAPSHOT_PROP, commitState.currentCommitId().toString());
if (i == lastIdx) {
appendOp.set(snapshotOffsetsProp, offsetsJson);
if (vtts != null) {
appendOp.set(VTTS_SNAPSHOT_PROP, Long.toString(vtts));
}
}

appendOp.commit();
}
dataFiles.forEach(appendOp::appendFile);
appendOp.commit();

transaction.commitTransaction();
} else {
RowDelta deltaOp = table.newRowDelta();
branch.ifPresent(deltaOp::toBranch);
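The Coordinator change above hinges on one pattern: data files are grouped by the partition spec ID they were written with, each group is appended with its own AppendFiles operation, and all of the appends are staged inside a single Iceberg Transaction so they publish atomically. Before the test changes below, here is a minimal standalone sketch of that pattern; it is not the committed code, the class and method names are illustrative, and it omits the snapshot summary properties (commit id, offsets, vtts) that the real Coordinator sets.

import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import org.apache.iceberg.AppendFiles;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.Table;
import org.apache.iceberg.Transaction;

class MultiSpecAppendSketch {
  // Commits data files written against different partition specs by staging
  // one append per spec inside a single transaction.
  static void appendGroupedBySpec(Table table, List<DataFile> dataFiles) {
    Transaction transaction = table.newTransaction();

    // Group files by the spec they were written with so each append operation
    // only handles files from a single spec.
    Map<Integer, List<DataFile>> filesBySpec =
        dataFiles.stream().collect(Collectors.groupingBy(DataFile::specId));

    for (List<DataFile> group : filesBySpec.values()) {
      AppendFiles appendOp = transaction.newAppend();
      group.forEach(appendOp::appendFile);
      appendOp.commit(); // stages one snapshot per spec within the transaction
    }

    transaction.commitTransaction(); // publishes all staged snapshots together
  }
}

One consequence, exercised by the new test in the file below: a commit that spans two specs produces two snapshots, so per-commit metadata that must appear exactly once (the control-topic offsets and vtts) is only attached to the final append.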
@@ -33,8 +33,12 @@
import io.tabular.iceberg.connect.events.TopicPartitionOffset;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.iceberg.ContentFile;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.DataFiles;
import org.apache.iceberg.DataOperations;
@@ -43,8 +47,14 @@
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.types.Types.StructType;
import org.apache.kafka.clients.admin.MemberAssignment;
import org.apache.kafka.clients.admin.MemberDescription;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.TopicPartition;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;

@@ -222,6 +232,165 @@ public void testShouldDeduplicateDeleteFilesBeforeAppending() {
Assertions.assertEquals(1, ImmutableList.copyOf(snapshot.addedDeleteFiles(table.io())).size());
}

private void validateAddedFiles(
Snapshot snapshot, Set<String> expectedDataFilePaths, PartitionSpec expectedSpec) {
final List<DataFile> addedDataFiles = ImmutableList.copyOf(snapshot.addedDataFiles(table.io()));
final List<DeleteFile> addedDeleteFiles =
ImmutableList.copyOf(snapshot.addedDeleteFiles(table.io()));

Assertions.assertEquals(
expectedDataFilePaths,
addedDataFiles.stream().map(ContentFile::path).collect(Collectors.toSet()));

Assertions.assertEquals(
ImmutableSet.of(expectedSpec.specId()),
Stream.concat(addedDataFiles.stream(), addedDeleteFiles.stream())
.map(ContentFile::specId)
.collect(Collectors.toSet()));
}

/**
*
*
* <ul>
* <li>Sets up an empty table with 2 partition specs
* <li>Starts a coordinator with 2 worker assignments, each handling a different topic-partition
* <li>Sends a commit request to workers
* <li>Each worker writes datafiles with a different partition spec
* <li>The coordinator receives datafiles from both workers eventually and commits them to the
* table
* </ul>
*/
@Test
public void testCommitMultiPartitionSpecAppendDataFiles() {
final PartitionSpec spec1 = table.spec();
assert spec1.isUnpartitioned();

// evolve spec to partition by date
final PartitionSpec partitionByDate = PartitionSpec.builderFor(SCHEMA).identity("date").build();
table.updateSpec().addField(partitionByDate.fields().get(0).name()).commit();
final PartitionSpec spec2 = table.spec();
assert spec2.isPartitioned();

// pretend we have two workers each handling 1 topic partition
final List<MemberDescription> members = Lists.newArrayList();
for (int i : ImmutableList.of(0, 1)) {
members.add(
new MemberDescription(
"memberId" + i,
"clientId" + i,
"host" + i,
new MemberAssignment(ImmutableSet.of(new TopicPartition(SRC_TOPIC_NAME, i)))));
}

final Coordinator coordinator = new Coordinator(catalog, config, members, clientFactory);
coordinator.start();
initConsumer();

// start a new commit immediately and wait indefinitely for all workers to respond
when(config.commitIntervalMs()).thenReturn(0);
when(config.commitTimeoutMs()).thenReturn(Integer.MAX_VALUE);
coordinator.process();

// retrieve commitId from commit request produced by coordinator
final byte[] bytes = producer.history().get(0).value();
final Event commitRequest = Event.decode(bytes);
assert commitRequest.type().equals(EventType.COMMIT_REQUEST);
final UUID commitId = ((CommitRequestPayload) commitRequest.payload()).commitId();

// each worker sends its responses for the commit request
Map<Integer, PartitionSpec> workerIdToSpecMap =
ImmutableMap.of(
1, spec1, // worker 1 produces datafiles with the old partition spec
2, spec2 // worker 2 produces datafiles with the new partition spec
);

int currentControlTopicOffset = 1;
for (Map.Entry<Integer, PartitionSpec> entry : workerIdToSpecMap.entrySet()) {
Integer workerId = entry.getKey();
PartitionSpec spec = entry.getValue();

final DataFile dataFile =
DataFiles.builder(spec)
.withPath(String.format("file%d.parquet", workerId))
.withFileSizeInBytes(100)
.withRecordCount(5)
.build();

consumer.addRecord(
new ConsumerRecord<>(
CTL_TOPIC_NAME,
0,
currentControlTopicOffset,
"key",
Event.encode(
new Event(
config.controlGroupId(),
EventType.COMMIT_RESPONSE,
new CommitResponsePayload(
spec.partitionType(),
commitId,
TableName.of(TABLE_IDENTIFIER),
ImmutableList.of(dataFile),
ImmutableList.of())))));
currentControlTopicOffset += 1;

consumer.addRecord(
new ConsumerRecord<>(
CTL_TOPIC_NAME,
0,
currentControlTopicOffset,
"key",
Event.encode(
new Event(
config.controlGroupId(),
EventType.COMMIT_READY,
new CommitReadyPayload(
commitId,
ImmutableList.of(
new TopicPartitionOffset(SRC_TOPIC_NAME, 0, 100L, 100L)))))));
currentControlTopicOffset += 1;
}

// all workers have responded so coordinator can process responses now
coordinator.process();

// assertions
table.refresh();
final List<Snapshot> snapshots = ImmutableList.copyOf(table.snapshots());
Assertions.assertEquals(2, snapshots.size(), "Expected 2 snapshots, one for each spec.");

final Snapshot firstSnapshot = snapshots.get(0);
final Snapshot secondSnapshot = snapshots.get(1);

validateAddedFiles(firstSnapshot, ImmutableSet.of("file1.parquet"), spec1);
validateAddedFiles(secondSnapshot, ImmutableSet.of("file2.parquet"), spec2);

Assertions.assertEquals(
commitId.toString(),
firstSnapshot.summary().get(COMMIT_ID_SNAPSHOT_PROP),
"All snapshots should be tagged with a commit-id");
Assertions.assertNull(
firstSnapshot.summary().getOrDefault(OFFSETS_SNAPSHOT_PROP, null),
"Earlier snapshots should not include control-topic-offsets in their summary");
Assertions.assertNull(
firstSnapshot.summary().getOrDefault(VTTS_SNAPSHOT_PROP, null),
"Earlier snapshots should not include vtts in their summary");

Assertions.assertEquals(
commitId.toString(),
secondSnapshot.summary().get(COMMIT_ID_SNAPSHOT_PROP),
"All snapshots should be tagged with a commit-id");
Assertions.assertEquals(
"{\"0\":5}",
secondSnapshot.summary().get(OFFSETS_SNAPSHOT_PROP),
"Only the most recent snapshot should include control-topic-offsets in it's summary");
Assertions.assertEquals(
"100",
secondSnapshot.summary().get(VTTS_SNAPSHOT_PROP),
"Only the most recent snapshot should include vtts in it's summary");
}

private void assertCommitTable(int idx, UUID commitId, long ts) {
byte[] bytes = producer.history().get(idx).value();
Event commitTable = Event.decode(bytes);
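The final assertions above pin down where readers of the table should look for progress information: only the last snapshot of a multi-spec commit carries the control-topic offsets and vtts summary properties, while every snapshot carries the commit id. A hedged sketch of how those offsets could be read back from a table follows; the class, method, and parameter names here are illustrative and are not part of this commit.

import java.util.Comparator;
import java.util.Optional;
import java.util.stream.StreamSupport;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.Table;

class OffsetsReadBackSketch {
  // Returns the offsets JSON from the most recent snapshot that carries the
  // given summary property, skipping earlier per-spec snapshots that do not.
  static Optional<String> latestCommittedOffsets(Table table, String offsetsProp) {
    return StreamSupport.stream(table.snapshots().spliterator(), false)
        .filter(snapshot -> snapshot.summary().containsKey(offsetsProp))
        .max(Comparator.comparingLong(Snapshot::timestampMillis))
        .map(snapshot -> snapshot.summary().get(offsetsProp));
  }
}

In the test above, that property is OFFSETS_SNAPSHOT_PROP and the expected value committed by the coordinator is {"0":5}.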
