Commit: Add nested field

regadas committed Jan 6, 2025
1 parent 265339b · commit 2f0f04b
Showing 2 changed files with 74 additions and 13 deletions.
@@ -58,6 +58,7 @@
 import org.apache.iceberg.data.Record;
 import org.apache.iceberg.hadoop.HadoopInputFile;
 import org.apache.iceberg.mapping.MappedField;
+import org.apache.iceberg.mapping.MappedFields;
 import org.apache.iceberg.mapping.NameMapping;
 import org.apache.iceberg.mapping.NameMappingParser;
 import org.apache.iceberg.parquet.ParquetUtil;
@@ -234,6 +235,19 @@ public void testIdentityColumnScan() throws Exception {
 
   @Test
   public void testNameMappingScan() throws Exception {
+    org.apache.avro.Schema metadataSchema =
+        org.apache.avro.Schema.createRecord(
+            "metadata",
+            null,
+            null,
+            false,
+            ImmutableList.of(
+                new org.apache.avro.Schema.Field(
+                    "source",
+                    org.apache.avro.Schema.create(org.apache.avro.Schema.Type.STRING),
+                    null,
+                    null)));
+
     org.apache.avro.Schema avroSchema =
         org.apache.avro.Schema.createRecord(
             "test",
@@ -242,15 +256,33 @@ public void testNameMappingScan() throws Exception {
             false,
             ImmutableList.of(
                 new org.apache.avro.Schema.Field(
-                    "data", org.apache.avro.Schema.create(org.apache.avro.Schema.Type.STRING)),
+                    "data",
+                    org.apache.avro.Schema.create(org.apache.avro.Schema.Type.STRING),
+                    null,
+                    null),
                 new org.apache.avro.Schema.Field(
-                    "id", org.apache.avro.Schema.create(org.apache.avro.Schema.Type.LONG))));
+                    "id",
+                    org.apache.avro.Schema.create(org.apache.avro.Schema.Type.LONG),
+                    null,
+                    null),
+                new org.apache.avro.Schema.Field("metadata", metadataSchema, null, null)));
 
     List<Map<String, Object>> recordData =
         ImmutableList.<Map<String, Object>>builder()
-            .add(ImmutableMap.of("id", 0L, "data", "clarification"))
-            .add(ImmutableMap.of("id", 1L, "data", "risky"))
-            .add(ImmutableMap.of("id", 2L, "data", "falafel"))
+            .add(
+                ImmutableMap.of(
+                    "id",
+                    0L,
+                    "data",
+                    "clarification",
+                    "metadata",
+                    ImmutableMap.of("source", "systemA")))
+            .add(
+                ImmutableMap.of(
+                    "id", 1L, "data", "risky", "metadata", ImmutableMap.of("source", "systemB")))
+            .add(
+                ImmutableMap.of(
+                    "id", 2L, "data", "falafel", "metadata", ImmutableMap.of("source", "systemC")))
             .build();
 
     List<GenericRecord> avroRecords =
@@ -262,7 +294,11 @@ public void testNameMappingScan() throws Exception {
     String path = createParquetFile(avroSchema, avroRecords);
     HadoopInputFile inputFile = HadoopInputFile.fromLocation(path, hadoopConf);
 
-    NameMapping defaultMapping = NameMapping.of(MappedField.of(1, "id"), MappedField.of(2, "data"));
+    NameMapping defaultMapping =
+        NameMapping.of(
+            MappedField.of(1, "id"),
+            MappedField.of(2, "data"),
+            MappedField.of(3, "metadata", MappedFields.of(MappedField.of(4, "source"))));
     ImmutableMap<String, String> tableProperties =
         ImmutableMap.<String, String>builder()
             .put(TableProperties.DEFAULT_NAME_MAPPING, NameMappingParser.toJson(defaultMapping))
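
For context: the Parquet file in this test is written from Avro without Iceberg field IDs, so the mapping registered under TableProperties.DEFAULT_NAME_MAPPING is what lets the scan resolve columns by name, including the nested struct. A sketch of the JSON that NameMappingParser.toJson(defaultMapping) should produce, assuming Iceberg's documented name-mapping serialization (formatting approximate):

// Approximate serialized form; a struct's children are nested under "fields".
//   [
//     {"field-id": 1, "names": ["id"]},
//     {"field-id": 2, "names": ["data"]},
//     {"field-id": 3, "names": ["metadata"],
//      "fields": [{"field-id": 4, "names": ["source"]}]}
//   ]
String mappingJson = NameMappingParser.toJson(defaultMapping);
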
@@ -272,7 +308,7 @@ public void testNameMappingScan() throws Exception {
         TableIdentifier.of("default", "table" + Long.toString(UUID.randomUUID().hashCode(), 16));
     Table simpleTable =
         warehouse
-            .buildTable(tableId, TestFixtures.SCHEMA)
+            .buildTable(tableId, TestFixtures.NESTED_SCHEMA)
             .withProperties(tableProperties)
             .withPartitionSpec(PartitionSpec.unpartitioned())
             .create();
@@ -286,7 +322,7 @@ public void testNameMappingScan() throws Exception {
             .withMetrics(metrics)
             .build();
 
-    final Schema beamSchema = IcebergUtils.icebergSchemaToBeamSchema(TestFixtures.SCHEMA);
+    final Schema beamSchema = IcebergUtils.icebergSchemaToBeamSchema(TestFixtures.NESTED_SCHEMA);
 
     simpleTable.newFastAppend().appendFile(dataFile).commit();
 
@@ -310,7 +346,7 @@ public void testNameMappingScan() throws Exception {
 
     final Row[] expectedRows =
         recordData.stream()
-            .map(data -> icebergGenericRecord(TestFixtures.SCHEMA, data))
+            .map(data -> icebergGenericRecord(TestFixtures.NESTED_SCHEMA.asStruct(), data))
             .map(record -> IcebergUtils.icebergRecordToBeamRow(beamSchema, record))
             .toArray(Row[]::new);
 
@@ -324,16 +360,34 @@ public void testNameMappingScan() throws Exception {
     testPipeline.run();
   }
 
+  @SuppressWarnings("unchecked")
   public static GenericRecord avroGenericRecord(
       org.apache.avro.Schema schema, Map<String, Object> values) {
     GenericRecord record = new GenericData.Record(schema);
-    values.forEach(record::put);
+    for (org.apache.avro.Schema.Field field : schema.getFields()) {
+      Object rawValue = values.get(field.name());
+      Object avroValue =
+          rawValue instanceof Map
+              ? avroGenericRecord(field.schema(), (Map<String, Object>) rawValue)
+              : rawValue;
+      record.put(field.name(), avroValue);
+    }
     return record;
   }
 
-  public static Record icebergGenericRecord(
-      org.apache.iceberg.Schema schema, Map<String, Object> values) {
-    return org.apache.iceberg.data.GenericRecord.create(schema).copy(values);
+  @SuppressWarnings("unchecked")
+  public static Record icebergGenericRecord(Types.StructType type, Map<String, Object> values) {
+    org.apache.iceberg.data.GenericRecord record =
+        org.apache.iceberg.data.GenericRecord.create(type);
+    for (Types.NestedField field : type.fields()) {
+      Object rawValue = values.get(field.name());
+      Object value =
+          rawValue instanceof Map
+              ? icebergGenericRecord(field.type().asStructType(), (Map<String, Object>) rawValue)
+              : rawValue;
+      record.setField(field.name(), value);
+    }
+    return record;
   }
 
   public static String createParquetFile(org.apache.avro.Schema schema, List<GenericRecord> records)
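
A usage sketch for the recursive helper above, with values borrowed from the test's recordData (a hypothetical standalone snippet; it assumes the NESTED_SCHEMA fixture added below):

// Nested maps become nested records: the "metadata" map is converted by the
// recursive call into an org.apache.iceberg.data.GenericRecord.
Map<String, Object> values =
    ImmutableMap.of(
        "id", 0L,
        "data", "clarification",
        "metadata", ImmutableMap.of("source", "systemA"));
Record record = icebergGenericRecord(TestFixtures.NESTED_SCHEMA.asStruct(), values);
// record.getField("metadata") is itself a Record whose "source" field is "systemA".
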
@@ -36,6 +36,13 @@ public class TestFixtures {
       new Schema(
           required(1, "id", Types.LongType.get()), optional(2, "data", Types.StringType.get()));
 
+  public static final Schema NESTED_SCHEMA =
+      new Schema(
+          required(1, "id", Types.LongType.get()),
+          optional(2, "data", Types.StringType.get()),
+          optional(
+              3, "metadata", Types.StructType.of(optional(4, "source", Types.StringType.get()))));
+
   public static final List<Map<String, Object>> FILE1SNAPSHOT1_DATA =
       ImmutableList.of(
           ImmutableMap.of("id", 0L, "data", "clarification"),
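
For reference, the test above feeds this fixture through IcebergUtils.icebergSchemaToBeamSchema; a sketch of the Beam schema shape this should yield, assuming the usual Iceberg-struct-to-Beam-ROW conversion (nullability follows the required/optional flags above):

// Sketch of the derived Beam schema:
//   id       -> INT64  (required)
//   data     -> STRING (optional)
//   metadata -> ROW<source STRING> (optional)
org.apache.beam.sdk.schemas.Schema beamSchema =
    IcebergUtils.icebergSchemaToBeamSchema(TestFixtures.NESTED_SCHEMA);
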
