From 73a7c5a64e5b51cbc9f43f4a5365fca966a183f4 Mon Sep 17 00:00:00 2001
From: Farooq Qaiser
Date: Tue, 5 Mar 2024 15:28:54 -0500
Subject: [PATCH] Support committing data files with different partition specs
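
After a partition spec evolution, a single coordinator commit may contain data
files written against different partition specs. Instead of adding every data
file to a single AppendFiles operation, the coordinator now groups the data
files by spec ID and commits one append per spec inside a single Iceberg
transaction, so all files for a commit still become visible atomically. The
control-topic offsets and the vtts property are recorded only on the snapshot
produced by the last append; the delete-file (RowDelta) path is unchanged.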
---
 .../iceberg/connect/channel/Coordinator.java  |  35 +++-
 .../connect/channel/CoordinatorTest.java      | 169 ++++++++++++++++++
 2 files changed, 196 insertions(+), 8 deletions(-)

diff --git a/kafka-connect/src/main/java/io/tabular/iceberg/connect/channel/Coordinator.java b/kafka-connect/src/main/java/io/tabular/iceberg/connect/channel/Coordinator.java
index 519ad0af..439ba4d2 100644
--- a/kafka-connect/src/main/java/io/tabular/iceberg/connect/channel/Coordinator.java
+++ b/kafka-connect/src/main/java/io/tabular/iceberg/connect/channel/Coordinator.java
@@ -37,16 +37,19 @@
 import java.util.Map;
 import java.util.Optional;
 import java.util.concurrent.ExecutorService;
+import java.util.stream.Collectors;
 import org.apache.iceberg.AppendFiles;
 import org.apache.iceberg.DataFile;
 import org.apache.iceberg.DeleteFile;
 import org.apache.iceberg.RowDelta;
 import org.apache.iceberg.Snapshot;
 import org.apache.iceberg.Table;
+import org.apache.iceberg.Transaction;
 import org.apache.iceberg.catalog.Catalog;
 import org.apache.iceberg.catalog.TableIdentifier;
 import org.apache.iceberg.exceptions.NoSuchTableException;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.util.Tasks;
 import org.apache.iceberg.util.ThreadPools;
 import org.apache.kafka.clients.admin.MemberDescription;
@@ -213,15 +216,31 @@ private void commitToTable(
       LOG.info("Nothing to commit to table {}, skipping", tableIdentifier);
     } else {
       if (deleteFiles.isEmpty()) {
-        AppendFiles appendOp = table.newAppend();
-        branch.ifPresent(appendOp::toBranch);
-        appendOp.set(snapshotOffsetsProp, offsetsJson);
-        appendOp.set(COMMIT_ID_SNAPSHOT_PROP, commitState.currentCommitId().toString());
-        if (vtts != null) {
-          appendOp.set(VTTS_SNAPSHOT_PROP, Long.toString(vtts));
+        Transaction transaction = table.newTransaction();
+
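+        // Data files may have been written against different partition specs (for
+        // example across a spec evolution), so group them by spec ID and commit one
+        // append per spec. Only the final append carries the offsets and vtts props.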
+        Map<Integer, List<DataFile>> filesBySpec =
+            dataFiles.stream()
+                .collect(Collectors.groupingBy(DataFile::specId, Collectors.toList()));
+
+        List<List<DataFile>> list = Lists.newArrayList(filesBySpec.values());
+        int lastIdx = list.size() - 1;
+        for (int i = 0; i <= lastIdx; i++) {
+          AppendFiles appendOp = transaction.newAppend();
+          branch.ifPresent(appendOp::toBranch);
+
+          list.get(i).forEach(appendOp::appendFile);
+          appendOp.set(COMMIT_ID_SNAPSHOT_PROP, commitState.currentCommitId().toString());
+          if (i == lastIdx) {
+            appendOp.set(snapshotOffsetsProp, offsetsJson);
+            if (vtts != null) {
+              appendOp.set(VTTS_SNAPSHOT_PROP, Long.toString(vtts));
+            }
+          }
+
+          appendOp.commit();
         }
-        dataFiles.forEach(appendOp::appendFile);
-        appendOp.commit();
+
+        transaction.commitTransaction();
       } else {
         RowDelta deltaOp = table.newRowDelta();
         branch.ifPresent(deltaOp::toBranch);
diff --git a/kafka-connect/src/test/java/io/tabular/iceberg/connect/channel/CoordinatorTest.java b/kafka-connect/src/test/java/io/tabular/iceberg/connect/channel/CoordinatorTest.java
index 8074c265..353036fd 100644
--- a/kafka-connect/src/test/java/io/tabular/iceberg/connect/channel/CoordinatorTest.java
+++ b/kafka-connect/src/test/java/io/tabular/iceberg/connect/channel/CoordinatorTest.java
@@ -33,8 +33,12 @@
 import io.tabular.iceberg.connect.events.TopicPartitionOffset;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.UUID;
 import java.util.function.Function;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+import org.apache.iceberg.ContentFile;
 import org.apache.iceberg.DataFile;
 import org.apache.iceberg.DataFiles;
 import org.apache.iceberg.DataOperations;
@@ -43,8 +47,14 @@
 import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.Snapshot;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.types.Types.StructType;
+import org.apache.kafka.clients.admin.MemberAssignment;
+import org.apache.kafka.clients.admin.MemberDescription;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.common.TopicPartition;
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.Test;
@@ -222,6 +232,165 @@ public void testShouldDeduplicateDeleteFilesBeforeAppending() {
     Assertions.assertEquals(1, ImmutableList.copyOf(snapshot.addedDeleteFiles(table.io())).size());
   }
 
+  private void validateAddedFiles(
+      Snapshot snapshot, Set<String> expectedDataFilePaths, PartitionSpec expectedSpec) {
+    final List<DataFile> addedDataFiles = ImmutableList.copyOf(snapshot.addedDataFiles(table.io()));
+    final List<DeleteFile> addedDeleteFiles =
+        ImmutableList.copyOf(snapshot.addedDeleteFiles(table.io()));
+
+    Assertions.assertEquals(
+        expectedDataFilePaths,
+        addedDataFiles.stream().map(ContentFile::path).collect(Collectors.toSet()));
+
+    Assertions.assertEquals(
+        ImmutableSet.of(expectedSpec.specId()),
+        Stream.concat(addedDataFiles.stream(), addedDeleteFiles.stream())
+            .map(ContentFile::specId)
+            .collect(Collectors.toSet()));
+  }
+
+  /**
+   *
+   *
+   * <ul>
+   *   <li>Sets up an empty table with 2 partition specs
+   *   <li>Starts a coordinator with 2 worker assignments, each handling a different topic-partition
+   *   <li>Sends a commit request to workers
+   *   <li>Each worker writes datafiles with a different partition spec
+   *   <li>The coordinator receives datafiles from both workers eventually and commits them to the
+   *       table
+   * </ul>
+   */
+  @Test
+  public void testCommitMultiPartitionSpecAppendDataFiles() {
+    final PartitionSpec spec1 = table.spec();
+    assert spec1.isUnpartitioned();
+
+    // evolve spec to partition by date
+    final PartitionSpec partitionByDate = PartitionSpec.builderFor(SCHEMA).identity("date").build();
+    table.updateSpec().addField(partitionByDate.fields().get(0).name()).commit();
+    final PartitionSpec spec2 = table.spec();
+    assert spec2.isPartitioned();
+
+    // pretend we have two workers each handling 1 topic partition
+    final List<MemberDescription> members = Lists.newArrayList();
+    for (int i : ImmutableList.of(0, 1)) {
+      members.add(
+          new MemberDescription(
+              "memberId" + i,
+              "clientId" + i,
+              "host" + i,
+              new MemberAssignment(ImmutableSet.of(new TopicPartition(SRC_TOPIC_NAME, i)))));
+    }
+
+    final Coordinator coordinator = new Coordinator(catalog, config, members, clientFactory);
+    coordinator.start();
+    initConsumer();
+
+    // start a new commit immediately and wait indefinitely for all workers to respond
+    when(config.commitIntervalMs()).thenReturn(0);
+    when(config.commitTimeoutMs()).thenReturn(Integer.MAX_VALUE);
+    coordinator.process();
+
+    // retrieve commitId from commit request produced by coordinator
+    final byte[] bytes = producer.history().get(0).value();
+    final Event commitRequest = Event.decode(bytes);
+    assert commitRequest.type().equals(EventType.COMMIT_REQUEST);
+    final UUID commitId = ((CommitRequestPayload) commitRequest.payload()).commitId();
+
+    // each worker sends its responses for the commit request
+    Map<Integer, PartitionSpec> workerIdToSpecMap =
+        ImmutableMap.of(
+            1, spec1, // worker 1 produces datafiles with the old partition spec
+            2, spec2 // worker 2 produces datafiles with the new partition spec
+            );
+
+    int currentControlTopicOffset = 1;
+    for (Map.Entry<Integer, PartitionSpec> entry : workerIdToSpecMap.entrySet()) {
+      Integer workerId = entry.getKey();
+      PartitionSpec spec = entry.getValue();
+
+      final DataFile dataFile =
+          DataFiles.builder(spec)
+              .withPath(String.format("file%d.parquet", workerId))
+              .withFileSizeInBytes(100)
+              .withRecordCount(5)
+              .build();
+
+      consumer.addRecord(
+          new ConsumerRecord<>(
+              CTL_TOPIC_NAME,
+              0,
+              currentControlTopicOffset,
+              "key",
+              Event.encode(
+                  new Event(
+                      config.controlGroupId(),
+                      EventType.COMMIT_RESPONSE,
+                      new CommitResponsePayload(
+                          spec.partitionType(),
+                          commitId,
+                          TableName.of(TABLE_IDENTIFIER),
+                          ImmutableList.of(dataFile),
+                          ImmutableList.of())))));
+      currentControlTopicOffset += 1;
+
+      consumer.addRecord(
+          new ConsumerRecord<>(
+              CTL_TOPIC_NAME,
+              0,
+              currentControlTopicOffset,
+              "key",
+              Event.encode(
+                  new Event(
+                      config.controlGroupId(),
+                      EventType.COMMIT_READY,
+                      new CommitReadyPayload(
+                          commitId,
+                          ImmutableList.of(
+                              new TopicPartitionOffset(SRC_TOPIC_NAME, 0, 100L, 100L)))))));
+      currentControlTopicOffset += 1;
+    }
+
+    // all workers have responded so coordinator can process responses now
+    coordinator.process();
+
+    // assertions
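+    // one append (and therefore one snapshot) is expected per partition spec, and
+    // only the snapshot from the final append should carry the offsets and vtts props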
+    table.refresh();
+    final List<Snapshot> snapshots = ImmutableList.copyOf(table.snapshots());
+    Assertions.assertEquals(2, snapshots.size(), "Expected 2 snapshots, one for each spec.");
+
+    final Snapshot firstSnapshot = snapshots.get(0);
+    final Snapshot secondSnapshot = snapshots.get(1);
+
+    validateAddedFiles(firstSnapshot, ImmutableSet.of("file1.parquet"), spec1);
+    validateAddedFiles(secondSnapshot, ImmutableSet.of("file2.parquet"), spec2);
+
+    Assertions.assertEquals(
+        commitId.toString(),
+        firstSnapshot.summary().get(COMMIT_ID_SNAPSHOT_PROP),
+        "All snapshots should be tagged with a commit-id");
+    Assertions.assertNull(
+        firstSnapshot.summary().getOrDefault(OFFSETS_SNAPSHOT_PROP, null),
+        "Earlier snapshots should not include control-topic-offsets in their summary");
+    Assertions.assertNull(
+        firstSnapshot.summary().getOrDefault(VTTS_SNAPSHOT_PROP, null),
+        "Earlier snapshots should not include vtts in their summary");
+
+    Assertions.assertEquals(
+        commitId.toString(),
+        secondSnapshot.summary().get(COMMIT_ID_SNAPSHOT_PROP),
+        "All snapshots should be tagged with a commit-id");
+    Assertions.assertEquals(
+        "{\"0\":5}",
+        secondSnapshot.summary().get(OFFSETS_SNAPSHOT_PROP),
+        "Only the most recent snapshot should include control-topic-offsets in its summary");
+    Assertions.assertEquals(
+        "100",
+        secondSnapshot.summary().get(VTTS_SNAPSHOT_PROP),
+        "Only the most recent snapshot should include vtts in its summary");
+  }
+
   private void assertCommitTable(int idx, UUID commitId, long ts) {
     byte[] bytes = producer.history().get(idx).value();
     Event commitTable = Event.decode(bytes);