Skip to content

Commit

Permalink
Add Iceberg support for name-based mapping schema
Browse files Browse the repository at this point in the history
  • Loading branch information
regadas committed Dec 9, 2024
1 parent d24d838 commit f5a0764
Show file tree
Hide file tree
Showing 4 changed files with 183 additions and 9 deletions.
2 changes: 2 additions & 0 deletions sdks/java/io/iceberg/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,8 @@ dependencies {
testImplementation library.java.bigdataoss_gcsio
testImplementation library.java.bigdataoss_gcs_connector
testImplementation library.java.bigdataoss_util_hadoop
testImplementation "org.apache.parquet:parquet-avro:$parquet_version"
testImplementation "org.apache.parquet:parquet-common:$parquet_version"
testImplementation "org.apache.iceberg:iceberg-gcp:$iceberg_version"
testImplementation "org.apache.iceberg:iceberg-data:$iceberg_version"
testImplementation project(path: ":sdks:java:core", configuration: "shadowTest")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.apache.iceberg.DataFile;
import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableProperties;
import org.apache.iceberg.avro.Avro;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.data.avro.DataReader;
Expand All @@ -40,6 +41,7 @@
import org.apache.iceberg.io.CloseableIterator;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.io.InputFile;
import org.apache.iceberg.mapping.NameMappingParser;
import org.apache.iceberg.orc.ORC;
import org.apache.iceberg.parquet.Parquet;
import org.checkerframework.checker.nullness.qual.NonNull;
Expand Down Expand Up @@ -88,6 +90,7 @@ public boolean advance() throws IOException {
// which are not null-safe.
@SuppressWarnings("nullness")
org.apache.iceberg.@NonNull Schema project = this.project;
String nameMapping = source.getTable().properties().get(TableProperties.DEFAULT_NAME_MAPPING);

do {
// If our current iterator is working... do that.
Expand Down Expand Up @@ -117,33 +120,48 @@ public boolean advance() throws IOException {
switch (file.format()) {
case ORC:
LOG.info("Preparing ORC input");
iterable =
ORC.ReadBuilder orcReader =
ORC.read(input)
.split(fileTask.start(), fileTask.length())
.project(project)
.createReaderFunc(fileSchema -> GenericOrcReader.buildReader(project, fileSchema))
.filter(fileTask.residual())
.build();
.filter(fileTask.residual());

if (nameMapping != null) {
orcReader.withNameMapping(NameMappingParser.fromJson(nameMapping));
}

iterable = orcReader.build();
break;
case PARQUET:
LOG.info("Preparing Parquet input.");
iterable =
Parquet.ReadBuilder parquetReader =
Parquet.read(input)
.split(fileTask.start(), fileTask.length())
.project(project)
.createReaderFunc(
fileSchema -> GenericParquetReaders.buildReader(project, fileSchema))
.filter(fileTask.residual())
.build();
.filter(fileTask.residual());

if (nameMapping != null) {
parquetReader.withNameMapping(NameMappingParser.fromJson(nameMapping));
}

iterable = parquetReader.build();
break;
case AVRO:
LOG.info("Preparing Avro input.");
iterable =
Avro.ReadBuilder avroReader =
Avro.read(input)
.split(fileTask.start(), fileTask.length())
.project(project)
.createReaderFunc(DataReader::create)
.build();
.createReaderFunc(DataReader::create);

if (nameMapping != null) {
avroReader.withNameMapping(NameMappingParser.fromJson(nameMapping));
}

iterable = avroReader.build();
break;
default:
throw new UnsupportedOperationException("Cannot read format: " + file.format());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,16 @@
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.containsInAnyOrder;

import com.google.common.collect.Lists;
import java.io.File;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.beam.sdk.coders.RowCoder;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.testing.PAssert;
Expand All @@ -33,10 +38,28 @@
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.iceberg.CatalogUtil;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.DataFiles;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.Metrics;
import org.apache.iceberg.MetricsConfig;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableProperties;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.hadoop.HadoopInputFile;
import org.apache.iceberg.mapping.MappedField;
import org.apache.iceberg.mapping.NameMapping;
import org.apache.iceberg.mapping.NameMappingParser;
import org.apache.iceberg.parquet.ParquetUtil;
import org.apache.parquet.avro.AvroParquetWriter;
import org.apache.parquet.hadoop.ParquetWriter;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
Expand Down Expand Up @@ -122,4 +145,131 @@ public void testSimpleScan() throws Exception {

testPipeline.run();
}

@Test
public void testNameMappingScan() throws Exception {
org.apache.avro.Schema avroSchema =
org.apache.avro.Schema.createRecord(
"test",
null,
null,
false,
Lists.newArrayList(
new org.apache.avro.Schema.Field(
"data", org.apache.avro.Schema.create(org.apache.avro.Schema.Type.STRING)),
new org.apache.avro.Schema.Field(
"id", org.apache.avro.Schema.create(org.apache.avro.Schema.Type.LONG))));

List<Map<String, Object>> recordData =
ImmutableList.<Map<String, Object>>builder()
.add(ImmutableMap.of("id", 0L, "data", "clarification"))
.add(ImmutableMap.of("id", 1L, "data", "risky"))
.add(ImmutableMap.of("id", 2L, "data", "falafel"))
.build();

List<GenericRecord> avroRecords =
recordData.stream()
.map(data -> avroGenericRecord(avroSchema, data))
.collect(Collectors.toList());

Configuration hadoopConf = new Configuration();
String path = createParquetFile(avroSchema, avroRecords);
HadoopInputFile inputFile = HadoopInputFile.fromLocation(path, hadoopConf);

NameMapping defaultMapping = NameMapping.of(MappedField.of(1, "id"), MappedField.of(2, "data"));
ImmutableMap<String, String> tableProperties =
ImmutableMap.<String, String>builder()
.put(TableProperties.DEFAULT_NAME_MAPPING, NameMappingParser.toJson(defaultMapping))
.build();

TableIdentifier tableId =
TableIdentifier.of("default", "table" + Long.toString(UUID.randomUUID().hashCode(), 16));
Table simpleTable =
warehouse
.buildTable(tableId, TestFixtures.SCHEMA)
.withProperties(tableProperties)
.withPartitionSpec(PartitionSpec.unpartitioned())
.create();

MetricsConfig metricsConfig = MetricsConfig.forTable(simpleTable);
Metrics metrics = ParquetUtil.fileMetrics(inputFile, metricsConfig);
DataFile dataFile =
DataFiles.builder(PartitionSpec.unpartitioned())
.withFormat(FileFormat.PARQUET)
.withInputFile(inputFile)
.withMetrics(metrics)
.build();

final Schema beamSchema = IcebergUtils.icebergSchemaToBeamSchema(TestFixtures.SCHEMA);

simpleTable.newFastAppend().appendFile(dataFile).commit();

Map<String, String> catalogProps =
ImmutableMap.<String, String>builder()
.put("type", CatalogUtil.ICEBERG_CATALOG_TYPE_HADOOP)
.put("warehouse", warehouse.location)
.build();

IcebergCatalogConfig catalogConfig =
IcebergCatalogConfig.builder()
.setCatalogName("name")
.setCatalogProperties(catalogProps)
.build();

PCollection<Row> output =
testPipeline
.apply(IcebergIO.readRows(catalogConfig).from(tableId))
.apply(ParDo.of(new PrintRow()))
.setCoder(RowCoder.of(beamSchema));

final Row[] expectedRows =
recordData.stream()
.map(data -> icebergGenericRecord(TestFixtures.SCHEMA, data))
.map(record -> IcebergUtils.icebergRecordToBeamRow(beamSchema, record))
.toArray(Row[]::new);

PAssert.that(output)
.satisfies(
(Iterable<Row> rows) -> {
assertThat(rows, containsInAnyOrder(expectedRows));
return null;
});

testPipeline.run();
}

public static GenericRecord avroGenericRecord(
org.apache.avro.Schema schema, Map<String, Object> values) {
GenericRecord record = new GenericData.Record(schema);
values.forEach(record::put);
return record;
}

public static Record icebergGenericRecord(
org.apache.iceberg.Schema schema, Map<String, Object> values) {
return org.apache.iceberg.data.GenericRecord.create(schema).copy(values);
}

public static String createParquetFile(org.apache.avro.Schema schema, List<GenericRecord> records)
throws IOException {

File tempFile = createTempFile();
Path file = new Path(tempFile.getPath());

AvroParquetWriter.Builder<GenericRecord> builder = AvroParquetWriter.builder(file);
ParquetWriter<GenericRecord> parquetWriter = builder.withSchema(schema).build();
for (GenericRecord record : records) {
parquetWriter.write(record);
}
parquetWriter.close();

return tempFile.getPath();
}

private static File createTempFile() throws IOException {
File tempFile = File.createTempFile(ScanSourceTest.class.getSimpleName(), ".tmp");
tempFile.deleteOnExit();
boolean unused = tempFile.delete();
return tempFile;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,10 @@ public Table createTable(
return catalog.createTable(tableId, schema, partitionSpec);
}

public Catalog.TableBuilder buildTable(TableIdentifier tableId, Schema schema) {
return catalog.buildTable(tableId, schema);
}

public Table loadTable(TableIdentifier tableId) {
return catalog.loadTable(tableId);
}
Expand Down

0 comments on commit f5a0764

Please sign in to comment.