Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Managed Iceberg] Allow updating partition specs during pipeline runtime #32879

Merged
merged 8 commits into from
Nov 5, 2024
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/trigger_files/IO_Iceberg_Integration_Tests.json
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
{
"comment": "Modify this file in a trivial way to cause this test suite to run",
"modification": 4
"modification": 3
}
1 change: 1 addition & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@
* [Managed Iceberg] Now available in Python SDK ([#31495](https://github.com/apache/beam/pull/31495))
* [Managed Iceberg] Add support for TIMESTAMP, TIME, and DATE types ([#32688](https://github.com/apache/beam/pull/32688))
* BigQuery CDC writes are now available in Python SDK, only supported when using StorageWrite API at least once mode ([#32527](https://github.com/apache/beam/issues/32527))
* [Managed Iceberg] Allow updating partition specs at runtime ([#32879](https://github.com/apache/beam/pull/32879))

## New Features / Improvements

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@
*/
package org.apache.beam.sdk.io.iceberg;

import java.io.IOException;
import java.util.Map;
import java.util.UUID;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.metrics.Counter;
Expand All @@ -29,14 +32,23 @@
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.WithKeys;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.util.Preconditions;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Maps;
import org.apache.iceberg.AppendFiles;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.ManifestFile;
import org.apache.iceberg.ManifestFiles;
import org.apache.iceberg.ManifestWriter;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.Table;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.io.OutputFile;
import org.checkerframework.checker.nullness.qual.MonotonicNonNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -45,9 +57,11 @@ class AppendFilesToTables
extends PTransform<PCollection<FileWriteResult>, PCollection<KV<String, SnapshotInfo>>> {
private static final Logger LOG = LoggerFactory.getLogger(AppendFilesToTables.class);
private final IcebergCatalogConfig catalogConfig;
private final String manifestFilePrefix;

AppendFilesToTables(IcebergCatalogConfig catalogConfig) {
AppendFilesToTables(IcebergCatalogConfig catalogConfig, String manifestFilePrefix) {
this.catalogConfig = catalogConfig;
this.manifestFilePrefix = manifestFilePrefix;
}

@Override
Expand All @@ -67,27 +81,27 @@ public String apply(FileWriteResult input) {
.apply("Group metadata updates by table", GroupByKey.create())
.apply(
"Append metadata updates to tables",
ParDo.of(new AppendFilesToTablesDoFn(catalogConfig)))
ParDo.of(new AppendFilesToTablesDoFn(catalogConfig, manifestFilePrefix)))
.setCoder(KvCoder.of(StringUtf8Coder.of(), SnapshotInfo.CODER));
}

private static class AppendFilesToTablesDoFn
extends DoFn<KV<String, Iterable<FileWriteResult>>, KV<String, SnapshotInfo>> {
private final Counter snapshotsCreated =
Metrics.counter(AppendFilesToTables.class, "snapshotsCreated");
private final Counter dataFilesCommitted =
Metrics.counter(AppendFilesToTables.class, "dataFilesCommitted");
private final Distribution committedDataFileByteSize =
Metrics.distribution(RecordWriter.class, "committedDataFileByteSize");
private final Distribution committedDataFileRecordCount =
Metrics.distribution(RecordWriter.class, "committedDataFileRecordCount");

private final IcebergCatalogConfig catalogConfig;
private final String manifestFilePrefix;

private transient @MonotonicNonNull Catalog catalog;

private AppendFilesToTablesDoFn(IcebergCatalogConfig catalogConfig) {
private AppendFilesToTablesDoFn(IcebergCatalogConfig catalogConfig, String manifestFilePrefix) {
this.catalogConfig = catalogConfig;
this.manifestFilePrefix = manifestFilePrefix;
}

private Catalog getCatalog() {
Expand All @@ -97,36 +111,103 @@ private Catalog getCatalog() {
return catalog;
}

private boolean containsMultiplePartitionSpecs(Iterable<FileWriteResult> fileWriteResults) {
int id = fileWriteResults.iterator().next().getSerializableDataFile().getPartitionSpecId();
for (FileWriteResult result : fileWriteResults) {
if (id != result.getSerializableDataFile().getPartitionSpecId()) {
return true;
}
}
return false;
}

@ProcessElement
public void processElement(
@Element KV<String, Iterable<FileWriteResult>> element,
OutputReceiver<KV<String, SnapshotInfo>> out,
BoundedWindow window) {
BoundedWindow window)
throws IOException {
String tableStringIdentifier = element.getKey();
Iterable<FileWriteResult> fileWriteResults = element.getValue();
if (!fileWriteResults.iterator().hasNext()) {
return;
}

Table table = getCatalog().loadTable(TableIdentifier.parse(element.getKey()));

// vast majority of the time, we will simply append data files.
// in the rare case we get a batch that contains multiple partition specs, we will group
// data into manifest files and append.
// note: either way, we must use a single commit operation for atomicity.
if (containsMultiplePartitionSpecs(fileWriteResults)) {
ahmedabu98 marked this conversation as resolved.
Show resolved Hide resolved
appendManifestFiles(table, fileWriteResults);
} else {
appendDataFiles(table, fileWriteResults);
ahmedabu98 marked this conversation as resolved.
Show resolved Hide resolved
}

Snapshot snapshot = table.currentSnapshot();
LOG.info("Created new snapshot for table '{}': {}", tableStringIdentifier, snapshot);
snapshotsCreated.inc();
out.outputWithTimestamp(
KV.of(element.getKey(), SnapshotInfo.fromSnapshot(snapshot)), window.maxTimestamp());
}

// This works only when all files are using the same partition spec.
private void appendDataFiles(Table table, Iterable<FileWriteResult> fileWriteResults) {
AppendFiles update = table.newAppend();
long numFiles = 0;
for (FileWriteResult result : fileWriteResults) {
DataFile dataFile = result.getDataFile(table.spec());
DataFile dataFile = result.getDataFile(table.specs());
update.appendFile(dataFile);
committedDataFileByteSize.update(dataFile.fileSizeInBytes());
committedDataFileRecordCount.update(dataFile.recordCount());
numFiles++;
}
// this commit will create a ManifestFile. we don't need to manually create one.
update.commit();
dataFilesCommitted.inc(numFiles);
}

// When a user updates their table partition spec during runtime, we can end up with
// a batch of files where some are written with the old spec and some are written with the new
// spec.
// A table commit is limited to a single partition spec.
// To handle this, we create a manifest file for each partition spec, and group data files
// accordingly.
// Afterward, we append all manifests using a single commit operation.
private void appendManifestFiles(Table table, Iterable<FileWriteResult> fileWriteResults)
Copy link

@DanielMorales9 DanielMorales9 Oct 25, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a long method 😲

throws IOException {
String uuid = UUID.randomUUID().toString();
Map<Integer, PartitionSpec> specs = table.specs();
Map<Integer, ManifestWriter<DataFile>> manifestFileWriters = Maps.newHashMap();
// first add datafiles to the appropriate manifest file, according to its spec id
for (FileWriteResult result : fileWriteResults) {
ahmedabu98 marked this conversation as resolved.
Show resolved Hide resolved
DataFile dataFile = result.getDataFile(specs);
int specId = dataFile.specId();
PartitionSpec spec = Preconditions.checkStateNotNull(specs.get(specId));
ManifestWriter<DataFile> writer =
manifestFileWriters.computeIfAbsent(
specId, id -> createManifestWriter(table.location(), uuid, spec, table.io()));
writer.add(dataFile);
committedDataFileByteSize.update(dataFile.fileSizeInBytes());
committedDataFileRecordCount.update(dataFile.recordCount());
}

// append all manifest files and commit
AppendFiles update = table.newAppend();
for (ManifestWriter<DataFile> writer : manifestFileWriters.values()) {
writer.close();
ManifestFile manifestFile = writer.toManifestFile();
update.appendManifest(manifestFile);
}
update.commit();
}

Snapshot snapshot = table.currentSnapshot();
LOG.info("Created new snapshot for table '{}': {}", tableStringIdentifier, snapshot);
snapshotsCreated.inc();
out.outputWithTimestamp(
KV.of(element.getKey(), SnapshotInfo.fromSnapshot(snapshot)), window.maxTimestamp());
private ManifestWriter<DataFile> createManifestWriter(
String tableLocation, String uuid, PartitionSpec spec, FileIO io) {
String location =
FileFormat.AVRO.addExtension(
String.format(
"%s/metadata/%s-%s-%s.manifest",
tableLocation, manifestFilePrefix, uuid, spec.specId()));
OutputFile outputFile = io.newOutputFile(location);
return ManifestFiles.write(spec, outputFile);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.beam.sdk.io.iceberg;

import com.google.auto.value.AutoValue;
import java.util.Map;
import org.apache.beam.sdk.schemas.AutoValueSchema;
import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
import org.apache.beam.sdk.schemas.annotations.SchemaIgnore;
Expand Down Expand Up @@ -46,9 +47,9 @@ public TableIdentifier getTableIdentifier() {
}

@SchemaIgnore
public DataFile getDataFile(PartitionSpec spec) {
public DataFile getDataFile(Map<Integer, PartitionSpec> specs) {
if (cachedDataFile == null) {
cachedDataFile = getSerializableDataFile().createDataFile(spec);
cachedDataFile = getSerializableDataFile().createDataFile(specs);
}
return cachedDataFile;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.util.Preconditions;
Expand Down Expand Up @@ -195,7 +194,9 @@ private RecordWriter createWriter(PartitionKey partitionKey) {

private final Map<WindowedValue<IcebergDestination>, List<SerializableDataFile>>
totalSerializableDataFiles = Maps.newHashMap();
private static final Cache<TableIdentifier, Table> TABLE_CACHE =

@VisibleForTesting
static final Cache<TableIdentifier, Table> TABLE_CACHE =
CacheBuilder.newBuilder().expireAfterAccess(10, TimeUnit.MINUTES).build();

private boolean isClosed = false;
Expand All @@ -221,22 +222,28 @@ private RecordWriter createWriter(PartitionKey partitionKey) {
private Table getOrCreateTable(TableIdentifier identifier, Schema dataSchema) {
@Nullable Table table = TABLE_CACHE.getIfPresent(identifier);
if (table == null) {
try {
table = catalog.loadTable(identifier);
} catch (NoSuchTableException e) {
synchronized (TABLE_CACHE) {
try {
org.apache.iceberg.Schema tableSchema =
IcebergUtils.beamSchemaToIcebergSchema(dataSchema);
// TODO(ahmedabu98): support creating a table with a specified partition spec
table = catalog.createTable(identifier, tableSchema);
LOG.info("Created Iceberg table '{}' with schema: {}", identifier, tableSchema);
} catch (AlreadyExistsException alreadyExistsException) {
// handle race condition where workers are concurrently creating the same table.
// if running into already exists exception, we perform one last load
table = catalog.loadTable(identifier);
} catch (NoSuchTableException e) {
try {
org.apache.iceberg.Schema tableSchema =
IcebergUtils.beamSchemaToIcebergSchema(dataSchema);
// TODO(ahmedabu98): support creating a table with a specified partition spec
table = catalog.createTable(identifier, tableSchema);
LOG.info("Created Iceberg table '{}' with schema: {}", identifier, tableSchema);
} catch (AlreadyExistsException alreadyExistsException) {
// handle race condition where workers are concurrently creating the same table.
// if running into already exists exception, we perform one last load
table = catalog.loadTable(identifier);
}
}
TABLE_CACHE.put(identifier, table);
}
TABLE_CACHE.put(identifier, table);
} else {
// If fetching from cache, refresh the table to avoid working with stale metadata
// (e.g. partition spec)
table.refresh();
}
return table;
}
Expand All @@ -254,15 +261,7 @@ public boolean write(WindowedValue<IcebergDestination> icebergDestination, Row r
icebergDestination,
destination -> {
TableIdentifier identifier = destination.getValue().getTableIdentifier();
Table table;
try {
table =
TABLE_CACHE.get(
identifier, () -> getOrCreateTable(identifier, row.getSchema()));
} catch (ExecutionException e) {
throw new RuntimeException(
"Error while fetching or creating table: " + identifier, e);
}
Table table = getOrCreateTable(identifier, row.getSchema());
return new DestinationState(destination.getValue(), table);
});

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,15 @@
*/
package org.apache.beam.sdk.io.iceberg;

import static org.apache.beam.sdk.util.Preconditions.checkStateNotNull;

import com.google.auto.value.AutoValue;
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.beam.sdk.schemas.AutoValueSchema;
import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.DataFiles;
import org.apache.iceberg.FileFormat;
Expand Down Expand Up @@ -141,12 +142,14 @@ static SerializableDataFile from(DataFile f, PartitionKey key) {
* it from Beam-compatible types.
*/
@SuppressWarnings("nullness")
DataFile createDataFile(PartitionSpec partitionSpec) {
Preconditions.checkState(
partitionSpec.specId() == getPartitionSpecId(),
"Invalid partition spec id '%s'. This DataFile was originally created with spec id '%s'.",
partitionSpec.specId(),
getPartitionSpecId());
DataFile createDataFile(Map<Integer, PartitionSpec> partitionSpecs) {
PartitionSpec partitionSpec =
checkStateNotNull(
partitionSpecs.get(getPartitionSpecId()),
"This DataFile was originally created with spec id '%s'. Could not find "
+ "this among table's partition specs: %s.",
getPartitionSpecId(),
partitionSpecs.keySet());

Metrics dataFileMetrics =
new Metrics(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ public IcebergWriteResult expand(PCollection<KV<String, Row>> input) {

// Commit files to tables
PCollection<KV<String, SnapshotInfo>> snapshots =
writtenFiles.apply(new AppendFilesToTables(catalogConfig));
writtenFiles.apply(new AppendFilesToTables(catalogConfig, filePrefix));

return new IcebergWriteResult(input.getPipeline(), snapshots);
}
Expand Down
Loading
Loading