Support Managed Iceberg streaming writes #32451

Merged · 3 commits · Sep 19, 2024
2 changes: 1 addition & 1 deletion .github/trigger_files/IO_Iceberg_Integration_Tests.json
@@ -1,4 +1,4 @@
{
"comment": "Modify this file in a trivial way to cause this test suite to run",
"modification": 1
"modification": 2
}
3 changes: 1 addition & 2 deletions sdks/java/io/expansion-service/build.gradle
@@ -47,8 +47,7 @@ dependencies {
// **** IcebergIO runtime dependencies ****
runtimeOnly library.java.hadoop_client
// Needed when using GCS as the warehouse location.
implementation library.java.bigdataoss_gcs_connector
permitUnusedDeclared library.java.bigdataoss_gcs_connector
runtimeOnly library.java.bigdataoss_gcs_connector
// Needed for HiveCatalog
runtimeOnly ("org.apache.iceberg:iceberg-hive-metastore:1.4.2")
runtimeOnly project(path: ":sdks:java:io:iceberg:hive:exec", configuration: "shadow")
15 changes: 11 additions & 4 deletions sdks/java/io/iceberg/hive/exec/build.gradle
@@ -39,10 +39,17 @@ artifacts {
shadowJar {
zip64 true

relocate 'com.google.common', getJavaRelocatedPath('iceberg.hive.com.google.common')
relocate 'com.google.protobuf', getJavaRelocatedPath('iceberg.hive.com.google.protobuf')
relocate 'shaded.parquet', getJavaRelocatedPath('iceberg.hive.shaded.parquet')
relocate 'org.apache.parquet', getJavaRelocatedPath('iceberg.hive.org.apache.parquet')
def problematicPackages = [
'com.google.protobuf',
'com.google.common',
'shaded.parquet',
'org.apache.parquet',
'org.joda'
]

problematicPackages.forEach {
relocate it, getJavaRelocatedPath("iceberg.hive.${it}")
}

version "3.1.3"
mergeServiceFiles()
sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/AppendFilesToTables.java
@@ -19,6 +19,8 @@

import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.metrics.Counter;
import org.apache.beam.sdk.metrics.Metrics;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.GroupByKey;
import org.apache.beam.sdk.transforms.PTransform;
@@ -29,14 +31,17 @@
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.iceberg.AppendFiles;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.Table;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.catalog.TableIdentifier;
import org.checkerframework.checker.nullness.qual.MonotonicNonNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

class AppendFilesToTables
extends PTransform<PCollection<FileWriteResult>, PCollection<KV<String, SnapshotInfo>>> {

private static final Logger LOG = LoggerFactory.getLogger(AppendFilesToTables.class);
private final IcebergCatalogConfig catalogConfig;

AppendFilesToTables(IcebergCatalogConfig catalogConfig) {
@@ -66,6 +71,8 @@ public String apply(FileWriteResult input) {

private static class AppendFilesToTablesDoFn
extends DoFn<KV<String, Iterable<FileWriteResult>>, KV<String, SnapshotInfo>> {
private final Counter snapshotsCreated =
Metrics.counter(AppendFilesToTables.class, "snapshotsCreated");

private final IcebergCatalogConfig catalogConfig;

@@ -87,15 +94,21 @@ public void processElement(
@Element KV<String, Iterable<FileWriteResult>> element,
OutputReceiver<KV<String, SnapshotInfo>> out,
BoundedWindow window) {
if (!element.getValue().iterator().hasNext()) {
return;
}

Table table = getCatalog().loadTable(TableIdentifier.parse(element.getKey()));
AppendFiles update = table.newAppend();
for (FileWriteResult writtenFile : element.getValue()) {
update.appendManifest(writtenFile.getManifestFile());
}
update.commit();
Snapshot snapshot = table.currentSnapshot();
LOG.info("Created new snapshot for table '{}': {}.", element.getKey(), snapshot);
snapshotsCreated.inc();
out.outputWithTimestamp(
KV.of(element.getKey(), SnapshotInfo.fromSnapshot(table.currentSnapshot())),
window.maxTimestamp());
KV.of(element.getKey(), SnapshotInfo.fromSnapshot(snapshot)), window.maxTimestamp());
}
}
}
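
The commit step now logs each new snapshot and increments a `snapshotsCreated` counter, registered under the `AppendFilesToTables` class namespace via `Metrics.counter(Class, String)`. A minimal sketch of reading that counter from a finished pipeline; the helper name is hypothetical and not part of this PR:

```java
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.metrics.MetricNameFilter;
import org.apache.beam.sdk.metrics.MetricQueryResults;
import org.apache.beam.sdk.metrics.MetricResult;
import org.apache.beam.sdk.metrics.MetricsFilter;

/** Hypothetical helper: sums the "snapshotsCreated" counter across attempted bundles. */
static long totalSnapshotsCreated(PipelineResult result) {
  MetricQueryResults metrics =
      result
          .metrics()
          .queryMetrics(
              MetricsFilter.builder()
                  .addNameFilter(
                      MetricNameFilter.named(
                          "org.apache.beam.sdk.io.iceberg.AppendFilesToTables",
                          "snapshotsCreated"))
                  .build());
  long total = 0;
  for (MetricResult<Long> counter : metrics.getCounters()) {
    total += counter.getAttempted();
  }
  return total;
}
```
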
sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergIO.java
@@ -17,6 +17,7 @@
*/
package org.apache.beam.sdk.io.iceberg;

import static org.apache.beam.sdk.util.Preconditions.checkArgumentNotNull;
import static org.apache.beam.sdk.util.Preconditions.checkStateNotNull;

import com.google.auto.value.AutoValue;
@@ -25,6 +26,12 @@
import org.apache.beam.sdk.annotations.Internal;
import org.apache.beam.sdk.io.Read;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.windowing.AfterFirst;
import org.apache.beam.sdk.transforms.windowing.AfterPane;
import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime;
import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
import org.apache.beam.sdk.transforms.windowing.Repeatedly;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.Row;
@@ -33,6 +40,7 @@
import org.apache.iceberg.Table;
import org.apache.iceberg.catalog.TableIdentifier;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.joda.time.Duration;

/**
* The underlying Iceberg connector used by {@link org.apache.beam.sdk.managed.Managed#ICEBERG}. Not
@@ -49,13 +57,16 @@ public static WriteRows writeRows(IcebergCatalogConfig catalog) {

@AutoValue
public abstract static class WriteRows extends PTransform<PCollection<Row>, IcebergWriteResult> {
private static final int TRIGGERING_RECORD_COUNT = 50_000;

abstract IcebergCatalogConfig getCatalogConfig();

abstract @Nullable TableIdentifier getTableIdentifier();

abstract @Nullable DynamicDestinations getDynamicDestinations();

abstract @Nullable Duration getTriggeringFrequency();

abstract Builder toBuilder();

@AutoValue.Builder
@@ -66,6 +77,8 @@ abstract static class Builder {

abstract Builder setDynamicDestinations(DynamicDestinations destinations);

abstract Builder setTriggeringFrequency(Duration triggeringFrequency);

abstract WriteRows build();
}

@@ -77,6 +90,21 @@ public WriteRows to(DynamicDestinations destinations) {
return toBuilder().setDynamicDestinations(destinations).build();
}

/**
* Sets the frequency at which data is committed and a new {@link org.apache.iceberg.Snapshot}
* is produced.
*
* <p>Roughly every triggeringFrequency duration, this connector will try to accumulate all
* {@link org.apache.iceberg.ManifestFile}s and commit them to the table as appended files. Each
* commit results in a new table {@link org.apache.iceberg.Snapshot}.
*
* <p>This is only applicable when writing an unbounded {@link PCollection} (i.e. a streaming
* pipeline).
*/
public WriteRows withTriggeringFrequency(Duration triggeringFrequency) {
return toBuilder().setTriggeringFrequency(triggeringFrequency).build();
}

@Override
public IcebergWriteResult expand(PCollection<Row> input) {
List<?> allToArgs = Arrays.asList(getTableIdentifier(), getDynamicDestinations());
@@ -89,11 +117,32 @@ public IcebergWriteResult expand(PCollection<Row> input) {
destinations =
DynamicDestinations.singleTable(Preconditions.checkNotNull(getTableIdentifier()));
}

if (input.isBounded().equals(PCollection.IsBounded.UNBOUNDED)) {
Duration triggeringFrequency = getTriggeringFrequency();
checkArgumentNotNull(
triggeringFrequency, "Streaming pipelines must set a triggering frequency.");
input =
input.apply(
"WindowIntoGlobal",
Window.<Row>into(new GlobalWindows())
.triggering(
Repeatedly.forever(
AfterFirst.of(
AfterProcessingTime.pastFirstElementInPane()
.plusDelayOf(triggeringFrequency),
AfterPane.elementCountAtLeast(TRIGGERING_RECORD_COUNT))))
.discardingFiredPanes());
} else {
Preconditions.checkArgument(
getTriggeringFrequency() == null,
"Triggering frequency is only applicable for streaming pipelines.");
}
return input
.apply("Set Destination Metadata", new AssignDestinations(destinations))
.apply(
"Write Rows to Destinations",
new WriteToDestinations(getCatalogConfig(), destinations));
new WriteToDestinations(getCatalogConfig(), destinations, getTriggeringFrequency()));
}
}

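
For context, `withTriggeringFrequency` is mandatory for unbounded input: `expand()` re-windows the rows into the global window with a processing-time trigger (or an element-count trigger at 50,000 records) before handing them to `WriteToDestinations`. A rough usage sketch of the new API; the catalog name, properties, table identifier, and the unbounded input are placeholders, not part of this PR:

```java
import java.util.Map;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.Row;
import org.apache.iceberg.catalog.TableIdentifier;
import org.joda.time.Duration;

// Given: an unbounded PCollection<Row> `streamingRows` whose Beam schema matches the table.
IcebergCatalogConfig catalog =
    IcebergCatalogConfig.builder()
        .setCatalogName("my_catalog") // placeholder name
        .setCatalogProperties(
            Map.of("type", "hadoop", "warehouse", "gs://my-bucket/warehouse")) // placeholders
        .build();

IcebergWriteResult result =
    streamingRows.apply(
        IcebergIO.writeRows(catalog)
            .to(TableIdentifier.parse("db.streaming_table")) // placeholder table
            // Required for UNBOUNDED input: commits the accumulated manifests and
            // produces a new snapshot roughly every 30 seconds.
            .withTriggeringFrequency(Duration.standardSeconds(30)));
```
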
sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergWriteSchemaTransformProvider.java
@@ -17,13 +17,20 @@
*/
package org.apache.beam.sdk.io.iceberg;

import static org.apache.beam.sdk.io.iceberg.IcebergWriteSchemaTransformProvider.Configuration;

import com.google.auto.service.AutoService;
import com.google.auto.value.AutoValue;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import org.apache.beam.sdk.managed.ManagedTransformConstants;
import org.apache.beam.sdk.schemas.AutoValueSchema;
import org.apache.beam.sdk.schemas.NoSuchSchemaException;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.schemas.SchemaRegistry;
import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
import org.apache.beam.sdk.schemas.annotations.SchemaFieldDescription;
import org.apache.beam.sdk.schemas.transforms.SchemaTransform;
import org.apache.beam.sdk.schemas.transforms.SchemaTransformProvider;
import org.apache.beam.sdk.schemas.transforms.TypedSchemaTransformProvider;
@@ -35,14 +42,16 @@
import org.apache.beam.sdk.values.Row;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
import org.apache.iceberg.catalog.TableIdentifier;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.joda.time.Duration;

/**
* SchemaTransform implementation for {@link IcebergIO#writeRows}. Writes Beam Rows to Iceberg and
* outputs a {@code PCollection<Row>} representing snapshots created in the process.
*/
@AutoService(SchemaTransformProvider.class)
public class IcebergWriteSchemaTransformProvider
extends TypedSchemaTransformProvider<SchemaTransformConfiguration> {
extends TypedSchemaTransformProvider<Configuration> {

static final String INPUT_TAG = "input";
static final String OUTPUT_TAG = "output";
@@ -57,8 +66,55 @@ public String description() {
+ "{\"table\" (str), \"operation\" (str), \"summary\" (map[str, str]), \"manifestListLocation\" (str)}";
}

@DefaultSchema(AutoValueSchema.class)
@AutoValue
public abstract static class Configuration {
public static Builder builder() {
return new AutoValue_IcebergWriteSchemaTransformProvider_Configuration.Builder();
}

@SchemaFieldDescription("Identifier of the Iceberg table.")
public abstract String getTable();

@SchemaFieldDescription("Name of the catalog containing the table.")
public abstract @Nullable String getCatalogName();

@SchemaFieldDescription("Properties used to set up the Iceberg catalog.")
public abstract @Nullable Map<String, String> getCatalogProperties();

@SchemaFieldDescription("Properties passed to the Hadoop Configuration.")
public abstract @Nullable Map<String, String> getConfigProperties();

@SchemaFieldDescription(
"For a streaming pipeline, sets the frequency at which snapshots are produced.")
public abstract @Nullable Integer getTriggeringFrequencySeconds();

@AutoValue.Builder
public abstract static class Builder {
public abstract Builder setTable(String table);

public abstract Builder setCatalogName(String catalogName);

public abstract Builder setCatalogProperties(Map<String, String> catalogProperties);

public abstract Builder setConfigProperties(Map<String, String> confProperties);

public abstract Builder setTriggeringFrequencySeconds(Integer triggeringFrequencySeconds);

public abstract Configuration build();
}

public IcebergCatalogConfig getIcebergCatalog() {
return IcebergCatalogConfig.builder()
.setCatalogName(getCatalogName())
.setCatalogProperties(getCatalogProperties())
.setConfigProperties(getConfigProperties())
.build();
}
}

@Override
protected SchemaTransform from(SchemaTransformConfiguration configuration) {
protected SchemaTransform from(Configuration configuration) {
return new IcebergWriteSchemaTransform(configuration);
}

@@ -78,9 +134,9 @@ public String identifier() {
}

static class IcebergWriteSchemaTransform extends SchemaTransform {
private final SchemaTransformConfiguration configuration;
private final Configuration configuration;

IcebergWriteSchemaTransform(SchemaTransformConfiguration configuration) {
IcebergWriteSchemaTransform(Configuration configuration) {
this.configuration = configuration;
}

@@ -89,7 +145,7 @@ Row getConfigurationRow() {
// To stay consistent with our SchemaTransform configuration naming conventions,
// we sort lexicographically and convert field names to snake_case
return SchemaRegistry.createDefault()
.getToRowFunction(SchemaTransformConfiguration.class)
.getToRowFunction(Configuration.class)
.apply(configuration)
.sorted()
.toSnakeCase();
@@ -102,11 +158,17 @@ Row getConfigurationRow() {
public PCollectionRowTuple expand(PCollectionRowTuple input) {
PCollection<Row> rows = input.get(INPUT_TAG);

IcebergIO.WriteRows writeTransform =
IcebergIO.writeRows(configuration.getIcebergCatalog())
.to(TableIdentifier.parse(configuration.getTable()));

Integer trigFreq = configuration.getTriggeringFrequencySeconds();
if (trigFreq != null) {
writeTransform = writeTransform.withTriggeringFrequency(Duration.standardSeconds(trigFreq));
}

// TODO: support dynamic destinations
IcebergWriteResult result =
rows.apply(
IcebergIO.writeRows(configuration.getIcebergCatalog())
.to(TableIdentifier.parse(configuration.getTable())));
IcebergWriteResult result = rows.apply(writeTransform);

PCollection<Row> snapshots =
result
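
Through the Managed API, the same knob surfaces as `triggering_frequency_seconds`, since `getConfigurationRow()` snake_cases the `Configuration` field names. A hedged sketch of the corresponding configuration map; the table, catalog name, and catalog properties are placeholder values:

```java
import java.util.Map;
import org.apache.beam.sdk.managed.Managed;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.Row;

// Given: an unbounded PCollection<Row> `streamingRows`. Keys mirror the Configuration
// fields above in snake_case; all values here are placeholders.
Map<String, Object> config =
    Map.of(
        "table", "db.streaming_table",
        "catalog_name", "my_catalog",
        "catalog_properties",
            Map.of("type", "hadoop", "warehouse", "gs://my-bucket/warehouse"),
        "triggering_frequency_seconds", 30);

streamingRows.apply(Managed.write(Managed.ICEBERG).withConfig(config));
```
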
sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/RecordWriter.java
@@ -36,7 +36,8 @@

class RecordWriter {
private static final Logger LOG = LoggerFactory.getLogger(RecordWriter.class);
private final Counter activeWriters = Metrics.counter(RecordWriterManager.class, "activeWriters");
private final Counter activeIcebergWriters =
Metrics.counter(RecordWriterManager.class, "activeIcebergWriters");
private final DataWriter<Record> icebergDataWriter;
private final Table table;
private final String absoluteFilename;
@@ -92,7 +93,7 @@ class RecordWriter {
default:
throw new RuntimeException("Unknown File Format: " + fileFormat);
}
activeWriters.inc();
activeIcebergWriters.inc();
LOG.info(
"Opened {} writer for table {}, partition {}. Writing to path: {}",
fileFormat,
@@ -115,7 +116,7 @@ public void close() throws IOException {
fileFormat, table.name(), absoluteFilename),
e);
}
activeWriters.dec();
activeIcebergWriters.dec();
LOG.info("Closed {} writer for table {}, path: {}", fileFormat, table.name(), absoluteFilename);
}
