Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Managed Iceberg] Support writing to partitioned tables #32102

Merged
merged 15 commits into from
Aug 16, 2024
Prev Previous commit
Next Next commit
refactor record writer manager
ahmedabu98 committed Aug 13, 2024

Verified

This commit was signed with the committer’s verified signature.
snyk-bot Snyk bot
commit 3f7e1b8c31fb3c1050f7655952a054486b40c756

This file was deleted.

Original file line number Diff line number Diff line change
@@ -20,10 +20,11 @@
import static org.apache.beam.sdk.io.iceberg.IcebergUtils.beamRowToIcebergRecord;

import java.io.IOException;
import org.apache.beam.sdk.metrics.Counter;
import org.apache.beam.sdk.metrics.Metrics;
import org.apache.beam.sdk.values.Row;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.LocationProviders;
import org.apache.iceberg.ManifestFile;
import org.apache.iceberg.ManifestFiles;
import org.apache.iceberg.ManifestWriter;
@@ -34,14 +35,18 @@
import org.apache.iceberg.data.Record;
import org.apache.iceberg.data.parquet.GenericParquetWriter;
import org.apache.iceberg.io.DataWriter;
import org.apache.iceberg.io.LocationProvider;
import org.apache.iceberg.io.OutputFile;
import org.apache.iceberg.parquet.Parquet;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

class RecordWriter {
private static final Logger LOG = LoggerFactory.getLogger(RecordWriter.class);
private final Counter activeWriters = Metrics.counter(RecordWriterManager.class, "activeWriters");
private final DataWriter<Record> icebergDataWriter;
private final Table table;
private final String absoluteFilename;
private final FileFormat fileFormat;

RecordWriter(
Catalog catalog, IcebergDestination destination, String filename, PartitionKey partitionKey)
@@ -56,12 +61,12 @@ class RecordWriter {
RecordWriter(Table table, FileFormat fileFormat, String filename, PartitionKey partitionKey)
throws IOException {
this.table = table;
LocationProvider locationProvider =
LocationProviders.locationsFor(table.location(), table.properties());
this.fileFormat = fileFormat;
if (table.spec().isUnpartitioned()) {
absoluteFilename = locationProvider.newDataLocation(filename);
absoluteFilename = table.locationProvider().newDataLocation(filename);
} else {
absoluteFilename = locationProvider.newDataLocation(table.spec(), partitionKey, filename);
absoluteFilename =
table.locationProvider().newDataLocation(table.spec(), partitionKey, filename);
}
OutputFile outputFile = table.io().newOutputFile(absoluteFilename);

@@ -91,6 +96,13 @@ class RecordWriter {
default:
throw new RuntimeException("Unknown File Format: " + fileFormat);
}
activeWriters.inc();
LOG.info(
"Opened {} writer for table {}, partition {}. Writing to path: {}",
fileFormat,
table.name(),
partitionKey,
absoluteFilename);
}

public void write(Row row) {
@@ -103,7 +115,17 @@ void write(Record record) {
}

public void close() throws IOException {
icebergDataWriter.close();
try {
icebergDataWriter.close();
} catch (IOException e) {
throw new IOException(
String.format(
"Failed to close %s writer for table %s, path: %s",
fileFormat, table.name(), absoluteFilename),
e);
}
activeWriters.dec();
LOG.info("Closed {} writer for table {}, path: {}", fileFormat, table.name(), absoluteFilename);
}

public long bytesWritten() {
@@ -121,4 +143,8 @@ public ManifestFile getManifestFile() throws IOException {

return manifestWriter.toManifestFile();
}

public DataFile getDataFile() {
return icebergDataWriter.toDataFile();
}
}
Loading