Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Managed Iceberg] Support writing to partitioned tables #32102

Merged
merged 15 commits into from
Aug 16, 2024
2 changes: 1 addition & 1 deletion .github/trigger_files/IO_Iceberg_Integration_Tests.json
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
{
"comment": "Modify this file in a trivial way to cause this test suite to run",
"modification": 3
"modification": 4
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,12 @@
*/
package org.apache.beam.sdk.io.iceberg;

import static org.apache.beam.sdk.io.iceberg.IcebergUtils.beamRowToIcebergRecord;
ahmedabu98 marked this conversation as resolved.
Show resolved Hide resolved

import java.io.IOException;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.sdk.metrics.Counter;
import org.apache.beam.sdk.metrics.Metrics;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.ManifestFile;
import org.apache.iceberg.ManifestFiles;
import org.apache.iceberg.ManifestWriter;
import org.apache.iceberg.PartitionKey;
import org.apache.iceberg.Table;
import org.apache.iceberg.avro.Avro;
import org.apache.iceberg.catalog.Catalog;
Expand All @@ -34,23 +31,37 @@
import org.apache.iceberg.io.DataWriter;
import org.apache.iceberg.io.OutputFile;
import org.apache.iceberg.parquet.Parquet;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

class RecordWriter {

private static final Logger LOG = LoggerFactory.getLogger(RecordWriter.class);
private final Counter activeWriters = Metrics.counter(RecordWriterManager.class, "activeWriters");
private final DataWriter<Record> icebergDataWriter;

private final Table table;
private final String absoluteFilename;
private final FileFormat fileFormat;

RecordWriter(Catalog catalog, IcebergDestination destination, String filename)
RecordWriter(
Catalog catalog, IcebergDestination destination, String filename, PartitionKey partitionKey)
throws IOException {
this(
catalog.loadTable(destination.getTableIdentifier()), destination.getFileFormat(), filename);
catalog.loadTable(destination.getTableIdentifier()),
destination.getFileFormat(),
filename,
partitionKey);
}

RecordWriter(Table table, FileFormat fileFormat, String filename) throws IOException {
RecordWriter(Table table, FileFormat fileFormat, String filename, PartitionKey partitionKey)
throws IOException {
this.table = table;
this.absoluteFilename = table.location() + "/" + filename;
this.fileFormat = fileFormat;
if (table.spec().isUnpartitioned()) {
absoluteFilename = table.locationProvider().newDataLocation(filename);
} else {
absoluteFilename =
table.locationProvider().newDataLocation(table.spec(), partitionKey, filename);
}
OutputFile outputFile = table.io().newOutputFile(absoluteFilename);

switch (fileFormat) {
Expand All @@ -60,6 +71,7 @@ class RecordWriter {
.createWriterFunc(org.apache.iceberg.data.avro.DataWriter::create)
.schema(table.schema())
.withSpec(table.spec())
.withPartition(partitionKey)
.overwrite()
.build();
break;
Expand All @@ -69,6 +81,7 @@ class RecordWriter {
.createWriterFunc(GenericParquetWriter::buildWriter)
.schema(table.schema())
.withSpec(table.spec())
.withPartition(partitionKey)
.overwrite()
.build();
break;
Expand All @@ -77,34 +90,38 @@ class RecordWriter {
default:
throw new RuntimeException("Unknown File Format: " + fileFormat);
}
activeWriters.inc();
LOG.info(
"Opened {} writer for table {}, partition {}. Writing to path: {}",
fileFormat,
table.name(),
partitionKey,
absoluteFilename);
}

public void write(Row row) {
Record record = beamRowToIcebergRecord(table.schema(), row);
public void write(Record record) {
icebergDataWriter.write(record);
}

public void close() throws IOException {
icebergDataWriter.close();
}

public Table getTable() {
return table;
try {
icebergDataWriter.close();
} catch (IOException e) {
throw new IOException(
String.format(
"Failed to close %s writer for table %s, path: %s",
fileFormat, table.name(), absoluteFilename),
e);
}
activeWriters.dec();
LOG.info("Closed {} writer for table {}, path: {}", fileFormat, table.name(), absoluteFilename);
}

public long bytesWritten() {
return icebergDataWriter.length();
}

public ManifestFile getManifestFile() throws IOException {
String manifestFilename = FileFormat.AVRO.addExtension(absoluteFilename + ".manifest");
OutputFile outputFile = table.io().newOutputFile(manifestFilename);
ManifestWriter<DataFile> manifestWriter;
try (ManifestWriter<DataFile> openWriter = ManifestFiles.write(getTable().spec(), outputFile)) {
openWriter.add(icebergDataWriter.toDataFile());
manifestWriter = openWriter;
}

return manifestWriter.toManifestFile();
public DataFile getDataFile() {
return icebergDataWriter.toDataFile();
}
}
Loading
Loading