[Managed Iceberg] Support partitioning time types (year, month, day, hour) (#32939)
ahmedabu98 authored Dec 17, 2024
1 parent a9f50fa commit 286e29c
Showing 7 changed files with 402 additions and 9 deletions.
2 changes: 1 addition & 1 deletion .github/trigger_files/IO_Iceberg_Integration_Tests.json
@@ -1,4 +1,4 @@
{
"comment": "Modify this file in a trivial way to cause this test suite to run",
"modification": 3
"modification": 5
}
1 change: 1 addition & 0 deletions CHANGES.md
@@ -65,6 +65,7 @@

* gcs-connector config options can be set via GcsOptions (Java) ([#32769](https://github.com/apache/beam/pull/32769)).
* Support for X source added (Java/Python) ([#X](https://github.com/apache/beam/issues/X)).
* [Managed Iceberg] Support partitioning by time (year, month, day, hour) for types `date`, `time`, `timestamp`, and `timestamp(tz)` ([#32939](https://github.com/apache/beam/pull/32939))
* Upgraded the default version of Hadoop dependencies to 3.4.1. Hadoop 2.10.2 is still supported (Java) ([#33011](https://github.com/apache/beam/issues/33011)).
* [BigQueryIO] Create managed BigLake tables dynamically ([#33125](https://github.com/apache/beam/pull/33125))

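For orientation, here is a minimal, hypothetical sketch of how a pipeline author might use the new behavior, modeled on the unit test added further down in this commit. The sink picks up partitioning from the destination table's existing PartitionSpec, so the table is created with a time transform first. The table name, warehouse location, field names, and the in-scope `catalog` and `rows` variables are illustrative assumptions, not part of the change.

// Hypothetical usage sketch; assumes an org.apache.iceberg.catalog.Catalog `catalog`
// and a PCollection<Row> `rows` matching `beamSchema` are already in scope.
Schema beamSchema =
    Schema.builder()
        .addStringField("user")
        .addLogicalTypeField("event_time", SqlTypes.DATETIME)
        .build();
org.apache.iceberg.Schema icebergSchema = IcebergUtils.beamSchemaToIcebergSchema(beamSchema);

// Partition the destination table on the hour of `event_time`;
// year(), month(), and day() transforms work the same way.
PartitionSpec spec = PartitionSpec.builderFor(icebergSchema).hour("event_time").build();
catalog.createTable(TableIdentifier.parse("db.events"), icebergSchema, spec);

Map<String, Object> config =
    ImmutableMap.of(
        "table", "db.events",
        "catalog_properties",
            ImmutableMap.of("type", "hadoop", "warehouse", "gs://example-bucket/warehouse"));

rows.apply(Managed.write(Managed.ICEBERG).withConfig(config));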
@@ -21,6 +21,11 @@
import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkState;

import java.io.IOException;
import java.time.LocalDateTime;
import java.time.YearMonth;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.time.temporal.ChronoUnit;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
@@ -31,21 +36,28 @@
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Splitter;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.Cache;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.CacheBuilder;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.RemovalNotification;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Lists;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Maps;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.ManifestFile;
import org.apache.iceberg.PartitionField;
import org.apache.iceberg.PartitionKey;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Table;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.data.GenericRecord;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.exceptions.AlreadyExistsException;
import org.apache.iceberg.exceptions.NoSuchTableException;
import org.apache.iceberg.expressions.Literal;
import org.apache.iceberg.transforms.Transform;
import org.apache.iceberg.transforms.Transforms;
import org.apache.iceberg.types.Types;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -90,6 +102,7 @@ class DestinationState {
final Cache<PartitionKey, RecordWriter> writers;
private final List<SerializableDataFile> dataFiles = Lists.newArrayList();
@VisibleForTesting final Map<PartitionKey, Integer> writerCounts = Maps.newHashMap();
private final Map<String, PartitionField> partitionFieldMap = Maps.newHashMap();
private final List<Exception> exceptions = Lists.newArrayList();

DestinationState(IcebergDestination icebergDestination, Table table) {
@@ -98,6 +111,9 @@ class DestinationState {
this.spec = table.spec();
this.partitionKey = new PartitionKey(spec, schema);
this.table = table;
for (PartitionField partitionField : spec.fields()) {
partitionFieldMap.put(partitionField.name(), partitionField);
}

// build a cache of RecordWriters.
// writers will expire after 1 min of idle time.
@@ -123,7 +139,9 @@ class DestinationState {
throw rethrow;
}
openWriters--;
dataFiles.add(SerializableDataFile.from(recordWriter.getDataFile(), pk));
String partitionPath = getPartitionDataPath(pk.toPath(), partitionFieldMap);
dataFiles.add(
SerializableDataFile.from(recordWriter.getDataFile(), partitionPath));
})
.build();
}
@@ -136,7 +154,7 @@ class DestinationState {
* can't create a new writer, the {@link Record} is rejected and {@code false} is returned.
*/
boolean write(Record record) {
partitionKey.partition(record);
partitionKey.partition(getPartitionableRecord(record));

if (!writers.asMap().containsKey(partitionKey) && openWriters >= maxNumWriters) {
return false;
@@ -185,8 +203,65 @@ private RecordWriter createWriter(PartitionKey partitionKey) {
e);
}
}

/**
* Resolves an input {@link Record}'s partition values and returns another {@link Record} that
* can be applied to the destination's {@link PartitionSpec}.
*/
private Record getPartitionableRecord(Record record) {
if (spec.isUnpartitioned()) {
return record;
}
Record output = GenericRecord.create(schema);
for (PartitionField partitionField : spec.fields()) {
Transform<?, ?> transform = partitionField.transform();
Types.NestedField field = schema.findField(partitionField.sourceId());
String name = field.name();
Object value = record.getField(name);
@Nullable Literal<Object> literal = Literal.of(value.toString()).to(field.type());
if (literal == null || transform.isVoid() || transform.isIdentity()) {
output.setField(name, value);
} else {
output.setField(name, literal.value());
}
}
return output;
}
}
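
A rough illustration of the coercion that getPartitionableRecord performs above, not taken from the commit: Iceberg's time transforms expect each source field in the type's internal representation, so a string-valued field is first converted through an Iceberg Literal. The field name and timestamp below are invented for the example.

// Illustration only: coercing a string value into the partition source field's
// Iceberg type before a day()/hour() transform is applied.
Types.NestedField field = schema.findField("event_time");   // assumed timestamp field
Object raw = "2024-03-01T12:00:00";                          // value as carried by the Record
Literal<Object> literal = Literal.of(raw.toString()).to(field.type());
// literal.value() now holds the timestamp in Iceberg's internal representation
// (microseconds since the epoch), which the time transforms can consume.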

/**
* Returns an equivalent partition path that is made up of partition data. Needed to reconstruct a
* {@link DataFile}.
*/
@VisibleForTesting
static String getPartitionDataPath(
String partitionPath, Map<String, PartitionField> partitionFieldMap) {
if (partitionPath.isEmpty() || partitionFieldMap.isEmpty()) {
return partitionPath;
}
List<String> resolved = new ArrayList<>();
for (String partition : Splitter.on('/').splitToList(partitionPath)) {
List<String> nameAndValue = Splitter.on('=').splitToList(partition);
String name = nameAndValue.get(0);
String value = nameAndValue.get(1);
String transformName =
Preconditions.checkArgumentNotNull(partitionFieldMap.get(name)).transform().toString();
if (Transforms.month().toString().equals(transformName)) {
int month = YearMonth.parse(value).getMonthValue();
value = String.valueOf(month);
} else if (Transforms.hour().toString().equals(transformName)) {
long hour = ChronoUnit.HOURS.between(EPOCH, LocalDateTime.parse(value, HOUR_FORMATTER));
value = String.valueOf(hour);
}
resolved.add(name + "=" + value);
}
return String.join("/", resolved);
}

private static final DateTimeFormatter HOUR_FORMATTER =
DateTimeFormatter.ofPattern("yyyy-MM-dd-HH");
private static final LocalDateTime EPOCH = LocalDateTime.ofEpochSecond(0, 0, ZoneOffset.UTC);

private final Catalog catalog;
private final String filePrefix;
private final long maxFileSize;
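To make getPartitionDataPath concrete, here is a standalone sketch, with invented field names and values, of the rewrite it performs: PartitionKey.toPath() renders month and hour transform values in human-readable form ("2024-03", "2024-03-01-12"), and the helper rewrites those components into numeric partition data before a DataFile is reconstructed.

// Standalone sketch mirroring the month/hour handling in getPartitionDataPath above.
import java.time.LocalDateTime;
import java.time.YearMonth;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.time.temporal.ChronoUnit;

public class PartitionPathSketch {
  public static void main(String[] args) {
    // PartitionKey.toPath()-style values for a month and an hour transform:
    String monthValue = "2024-03";
    String hourValue = "2024-03-01-12";

    // Month component: keep only the month-of-year, as the helper does.
    int month = YearMonth.parse(monthValue).getMonthValue(); // 3

    // Hour component: hours elapsed since the Unix epoch.
    LocalDateTime epoch = LocalDateTime.ofEpochSecond(0, 0, ZoneOffset.UTC);
    DateTimeFormatter hourFormatter = DateTimeFormatter.ofPattern("yyyy-MM-dd-HH");
    long hours = ChronoUnit.HOURS.between(epoch, LocalDateTime.parse(hourValue, hourFormatter));

    // e.g. "ts_month=3/ts_hour=<hours since epoch>"
    System.out.println("ts_month=" + month + "/ts_hour=" + hours);
  }
}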
@@ -116,13 +116,14 @@ abstract static class Builder {
* Create a {@link SerializableDataFile} from a {@link DataFile} and its associated {@link
* PartitionKey}.
*/
static SerializableDataFile from(DataFile f, PartitionKey key) {
static SerializableDataFile from(DataFile f, String partitionPath) {

return SerializableDataFile.builder()
.setPath(f.path().toString())
.setFileFormat(f.format().toString())
.setRecordCount(f.recordCount())
.setFileSizeInBytes(f.fileSizeInBytes())
.setPartitionPath(key.toPath())
.setPartitionPath(partitionPath)
.setPartitionSpecId(f.specId())
.setKeyMetadata(f.keyMetadata())
.setSplitOffsets(f.splitOffsets())
@@ -354,7 +354,7 @@ public void testWritePartitionedData() {
PartitionSpec partitionSpec =
PartitionSpec.builderFor(ICEBERG_SCHEMA)
.identity("bool")
.identity("modulo_5")
.hour("datetime")
.truncate("str", "value_x".length())
.build();
Table table =
@@ -23,14 +23,19 @@
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.Assert.assertEquals;

import java.time.LocalDate;
import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.managed.Managed;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.schemas.logicaltypes.SqlTypes;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.testing.TestStream;
@@ -49,12 +54,16 @@
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.CatalogUtil;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Table;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.data.IcebergGenerics;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.util.DateTimeUtil;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.hamcrest.Matchers;
import org.joda.time.DateTime;
import org.joda.time.DateTimeZone;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.junit.ClassRule;
@@ -360,4 +369,93 @@ public Void apply(Iterable<Row> input) {
return null;
}
}

@Test
public void testWritePartitionedData() {
Schema schema =
Schema.builder()
.addStringField("str")
.addInt32Field("int")
.addLogicalTypeField("y_date", SqlTypes.DATE)
.addLogicalTypeField("y_datetime", SqlTypes.DATETIME)
.addDateTimeField("y_datetime_tz")
.addLogicalTypeField("m_date", SqlTypes.DATE)
.addLogicalTypeField("m_datetime", SqlTypes.DATETIME)
.addDateTimeField("m_datetime_tz")
.addLogicalTypeField("d_date", SqlTypes.DATE)
.addLogicalTypeField("d_datetime", SqlTypes.DATETIME)
.addDateTimeField("d_datetime_tz")
.addLogicalTypeField("h_datetime", SqlTypes.DATETIME)
.addDateTimeField("h_datetime_tz")
.build();
org.apache.iceberg.Schema icebergSchema = IcebergUtils.beamSchemaToIcebergSchema(schema);
PartitionSpec spec =
PartitionSpec.builderFor(icebergSchema)
.identity("str")
.bucket("int", 5)
.year("y_date")
.year("y_datetime")
.year("y_datetime_tz")
.month("m_date")
.month("m_datetime")
.month("m_datetime_tz")
.day("d_date")
.day("d_datetime")
.day("d_datetime_tz")
.hour("h_datetime")
.hour("h_datetime_tz")
.build();
String identifier = "default.table_" + Long.toString(UUID.randomUUID().hashCode(), 16);

warehouse.createTable(TableIdentifier.parse(identifier), icebergSchema, spec);
Map<String, Object> config =
ImmutableMap.of(
"table",
identifier,
"catalog_properties",
ImmutableMap.of("type", "hadoop", "warehouse", warehouse.location));

List<Row> rows = new ArrayList<>();
for (int i = 0; i < 30; i++) {
long millis = i * 100_00_000_000L;
LocalDate localDate = DateTimeUtil.dateFromDays(i * 100);
LocalDateTime localDateTime = DateTimeUtil.timestampFromMicros(millis * 1000);
DateTime dateTime = new DateTime(millis).withZone(DateTimeZone.forOffsetHoursMinutes(3, 25));
Row row =
Row.withSchema(schema)
.addValues(
"str_" + i,
i,
localDate,
localDateTime,
dateTime,
localDate,
localDateTime,
dateTime,
localDate,
localDateTime,
dateTime,
localDateTime,
dateTime)
.build();
rows.add(row);
}

PCollection<Row> result =
testPipeline
.apply("Records To Add", Create.of(rows))
.setRowSchema(schema)
.apply(Managed.write(Managed.ICEBERG).withConfig(config))
.get(SNAPSHOTS_TAG);

PAssert.that(result)
.satisfies(new VerifyOutputs(Collections.singletonList(identifier), "append"));
testPipeline.run().waitUntilFinish();

Pipeline p = Pipeline.create(TestPipeline.testingPipelineOptions());
PCollection<Row> readRows =
p.apply(Managed.read(Managed.ICEBERG).withConfig(config)).getSinglePCollection();
PAssert.that(readRows).containsInAnyOrder(rows);
p.run();
}
}