Handle partition spec evolutions gracefully #202

Merged · 1 commit · Mar 26, 2024
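For context, the PR title refers to Iceberg partition spec evolution: a table's partitioning can change over time, and data files keep the spec id they were written under. The following is a minimal, hypothetical sketch of the scenario (the catalog, table, and column names are made up for illustration; only standard Iceberg APIs are used). The connector change in the diff below is what actually handles this case.

import org.apache.iceberg.Table;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.catalog.TableIdentifier;

class PartitionSpecEvolutionSketch {
  static void evolveSpec(Catalog catalog) {
    // Hypothetical table with a top-level "date" column (names are illustrative only).
    Table table = catalog.loadTable(TableIdentifier.of("db", "events"));

    int oldSpecId = table.spec().specId();         // files already written carry this spec id
    table.updateSpec().addField("date").commit();  // evolve the spec: identity-partition by "date"
    int newSpecId = table.spec().specId();         // files written from now on carry this spec id

    // After the evolution, a single connector commit can contain data files written under
    // both spec ids, which is the case this PR handles by grouping files by DataFile.specId().
    System.out.printf("spec evolved from id %d to id %d%n", oldSpecId, newSpecId);
  }
}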
@@ -37,16 +37,19 @@
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ExecutorService;
import java.util.stream.Collectors;
import org.apache.iceberg.AppendFiles;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.DeleteFile;
import org.apache.iceberg.RowDelta;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.Table;
import org.apache.iceberg.Transaction;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.exceptions.NoSuchTableException;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.util.Tasks;
import org.apache.iceberg.util.ThreadPools;
import org.apache.kafka.clients.admin.MemberDescription;
@@ -213,15 +216,31 @@ private void commitToTable(
LOG.info("Nothing to commit to table {}, skipping", tableIdentifier);
} else {
if (deleteFiles.isEmpty()) {
AppendFiles appendOp = table.newAppend();
branch.ifPresent(appendOp::toBranch);
appendOp.set(snapshotOffsetsProp, offsetsJson);
appendOp.set(COMMIT_ID_SNAPSHOT_PROP, commitState.currentCommitId().toString());
if (vtts != null) {
appendOp.set(VTTS_SNAPSHOT_PROP, Long.toString(vtts));
Transaction transaction = table.newTransaction();

Map<Integer, List<DataFile>> filesBySpec =
dataFiles.stream()
.collect(Collectors.groupingBy(DataFile::specId, Collectors.toList()));

List<List<DataFile>> list = Lists.newArrayList(filesBySpec.values());
int lastIdx = list.size() - 1;
for (int i = 0; i <= lastIdx; i++) {
AppendFiles appendOp = transaction.newAppend();
branch.ifPresent(appendOp::toBranch);

list.get(i).forEach(appendOp::appendFile);
appendOp.set(COMMIT_ID_SNAPSHOT_PROP, commitState.currentCommitId().toString());
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Note however that we include the commit-id snapshot property on each snapshot we add as part of this transaction. We don't have to do this but it could be helpful for debugging purposes and it's easy enough to do.

if (i == lastIdx) {
appendOp.set(snapshotOffsetsProp, offsetsJson);
if (vtts != null) {
appendOp.set(VTTS_SNAPSHOT_PROP, Long.toString(vtts));
}
}

appendOp.commit();
}
dataFiles.forEach(appendOp::appendFile);
appendOp.commit();

transaction.commitTransaction();
} else {
RowDelta deltaOp = table.newRowDelta();
branch.ifPresent(deltaOp::toBranch);
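As the author's comment points out, stamping every snapshot in the transaction with the commit id can help debugging. A minimal sketch, assuming only the standard Iceberg Table and Snapshot APIs, of how those snapshots could later be located by commit id; the class and method names are made up, and the caller passes the same property key the connector writes (the COMMIT_ID_SNAPSHOT_PROP constant above).

import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.Table;

class CommitDebugSketch {
  // Collects every snapshot whose summary carries the given commit id under the given
  // property key (the same key the connector sets on each append in the transaction).
  static List<Snapshot> snapshotsForCommit(Table table, String commitIdProp, String commitId) {
    return StreamSupport.stream(table.snapshots().spliterator(), false)
        .filter(snapshot -> {
          Map<String, String> summary = snapshot.summary();
          return summary != null && commitId.equals(summary.get(commitIdProp));
        })
        .collect(Collectors.toList());
  }
}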
@@ -33,8 +33,12 @@
import io.tabular.iceberg.connect.events.TopicPartitionOffset;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.iceberg.ContentFile;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.DataFiles;
import org.apache.iceberg.DataOperations;
@@ -43,8 +47,14 @@
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.types.Types.StructType;
import org.apache.kafka.clients.admin.MemberAssignment;
import org.apache.kafka.clients.admin.MemberDescription;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.TopicPartition;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;

@@ -222,6 +232,165 @@ public void testShouldDeduplicateDeleteFilesBeforeAppending() {
Assertions.assertEquals(1, ImmutableList.copyOf(snapshot.addedDeleteFiles(table.io())).size());
}

private void validateAddedFiles(
Snapshot snapshot, Set<String> expectedDataFilePaths, PartitionSpec expectedSpec) {
final List<DataFile> addedDataFiles = ImmutableList.copyOf(snapshot.addedDataFiles(table.io()));
final List<DeleteFile> addedDeleteFiles =
ImmutableList.copyOf(snapshot.addedDeleteFiles(table.io()));

Assertions.assertEquals(
expectedDataFilePaths,
addedDataFiles.stream().map(ContentFile::path).collect(Collectors.toSet()));

Assertions.assertEquals(
ImmutableSet.of(expectedSpec.specId()),
Stream.concat(addedDataFiles.stream(), addedDeleteFiles.stream())
.map(ContentFile::specId)
.collect(Collectors.toSet()));
}

/**
*
*
* <ul>
* <li>Sets up an empty table with 2 partition specs
<li>Starts a coordinator with 2 workers, each assigned a different topic-partition
* <li>Sends a commit request to workers
* <li>Each worker writes datafiles with a different partition spec
<li>The coordinator eventually receives datafiles from both workers and commits them to the table
* </ul>
*/
@Test
public void testCommitMultiPartitionSpecAppendDataFiles() {
final PartitionSpec spec1 = table.spec();
assert spec1.isUnpartitioned();

// evolve spec to partition by date
final PartitionSpec partitionByDate = PartitionSpec.builderFor(SCHEMA).identity("date").build();
table.updateSpec().addField(partitionByDate.fields().get(0).name()).commit();
final PartitionSpec spec2 = table.spec();
assert spec2.isPartitioned();

// pretend we have two workers each handling 1 topic partition
final List<MemberDescription> members = Lists.newArrayList();
for (int i : ImmutableList.of(0, 1)) {
members.add(
new MemberDescription(
"memberId" + i,
"clientId" + i,
"host" + i,
new MemberAssignment(ImmutableSet.of(new TopicPartition(SRC_TOPIC_NAME, i)))));
}

final Coordinator coordinator = new Coordinator(catalog, config, members, clientFactory);
coordinator.start();
initConsumer();

// start a new commit immediately and wait indefinitely for all workers to respond
when(config.commitIntervalMs()).thenReturn(0);
when(config.commitTimeoutMs()).thenReturn(Integer.MAX_VALUE);
coordinator.process();

// retrieve commitId from commit request produced by coordinator
final byte[] bytes = producer.history().get(0).value();
final Event commitRequest = Event.decode(bytes);
assert commitRequest.type().equals(EventType.COMMIT_REQUEST);
final UUID commitId = ((CommitRequestPayload) commitRequest.payload()).commitId();

// each worker sends its responses for the commit request
Map<Integer, PartitionSpec> workerIdToSpecMap =
ImmutableMap.of(
1, spec1, // worker 1 produces datafiles with the old partition spec
2, spec2 // worker 2 produces datafiles with the new partition spec
);

int currentControlTopicOffset = 1;
for (Map.Entry<Integer, PartitionSpec> entry : workerIdToSpecMap.entrySet()) {
Integer workerId = entry.getKey();
PartitionSpec spec = entry.getValue();

final DataFile dataFile =
DataFiles.builder(spec)
.withPath(String.format("file%d.parquet", workerId))
.withFileSizeInBytes(100)
.withRecordCount(5)
.build();

consumer.addRecord(
new ConsumerRecord<>(
CTL_TOPIC_NAME,
0,
currentControlTopicOffset,
"key",
Event.encode(
new Event(
config.controlGroupId(),
EventType.COMMIT_RESPONSE,
new CommitResponsePayload(
spec.partitionType(),
commitId,
TableName.of(TABLE_IDENTIFIER),
ImmutableList.of(dataFile),
ImmutableList.of())))));
currentControlTopicOffset += 1;

consumer.addRecord(
new ConsumerRecord<>(
CTL_TOPIC_NAME,
0,
currentControlTopicOffset,
"key",
Event.encode(
new Event(
config.controlGroupId(),
EventType.COMMIT_READY,
new CommitReadyPayload(
commitId,
ImmutableList.of(
new TopicPartitionOffset(SRC_TOPIC_NAME, 0, 100L, 100L)))))));
currentControlTopicOffset += 1;
}

// all workers have responded so coordinator can process responses now
coordinator.process();

// assertions
table.refresh();
final List<Snapshot> snapshots = ImmutableList.copyOf(table.snapshots());
Assertions.assertEquals(2, snapshots.size(), "Expected 2 snapshots, one for each spec.");

final Snapshot firstSnapshot = snapshots.get(0);
final Snapshot secondSnapshot = snapshots.get(1);

validateAddedFiles(firstSnapshot, ImmutableSet.of("file1.parquet"), spec1);
validateAddedFiles(secondSnapshot, ImmutableSet.of("file2.parquet"), spec2);

Assertions.assertEquals(
commitId.toString(),
firstSnapshot.summary().get(COMMIT_ID_SNAPSHOT_PROP),
"All snapshots should be tagged with a commit-id");
Assertions.assertNull(
firstSnapshot.summary().getOrDefault(OFFSETS_SNAPSHOT_PROP, null),
"Earlier snapshots should not include control-topic-offsets in their summary");
Assertions.assertNull(
firstSnapshot.summary().getOrDefault(VTTS_SNAPSHOT_PROP, null),
"Earlier snapshots should not include vtts in their summary");

Assertions.assertEquals(
commitId.toString(),
secondSnapshot.summary().get(COMMIT_ID_SNAPSHOT_PROP),
"All snapshots should be tagged with a commit-id");
Assertions.assertEquals(
"{\"0\":5}",
secondSnapshot.summary().get(OFFSETS_SNAPSHOT_PROP),
"Only the most recent snapshot should include control-topic-offsets in it's summary");
Assertions.assertEquals(
"100",
secondSnapshot.summary().get(VTTS_SNAPSHOT_PROP),
"Only the most recent snapshot should include vtts in it's summary");
}

private void assertCommitTable(int idx, UUID commitId, long ts) {
byte[] bytes = producer.history().get(idx).value();
Event commitTable = Event.decode(bytes);