Skip to content

Commit

Permalink
Support v2 iceberg views with gzip
Browse files Browse the repository at this point in the history
Similar to projectnessie#8848, but for views.
  • Loading branch information
rohanag12 committed Nov 22, 2024
1 parent 1e5f031 commit 9dcab5d
Show file tree
Hide file tree
Showing 4 changed files with 135 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
import java.util.function.IntSupplier;
import java.util.stream.Stream;
import org.projectnessie.catalog.formats.iceberg.meta.IcebergBlobMetadata;
import org.projectnessie.catalog.formats.iceberg.meta.IcebergNamespace;
import org.projectnessie.catalog.formats.iceberg.meta.IcebergNestedField;
import org.projectnessie.catalog.formats.iceberg.meta.IcebergPartitionStatisticsFile;
import org.projectnessie.catalog.formats.iceberg.meta.IcebergSchema;
Expand All @@ -57,6 +58,9 @@
import org.projectnessie.catalog.formats.iceberg.meta.IcebergSnapshotRef;
import org.projectnessie.catalog.formats.iceberg.meta.IcebergStatisticsFile;
import org.projectnessie.catalog.formats.iceberg.meta.IcebergTableMetadata;
import org.projectnessie.catalog.formats.iceberg.meta.IcebergViewHistoryEntry;
import org.projectnessie.catalog.formats.iceberg.meta.IcebergViewMetadata;
import org.projectnessie.catalog.formats.iceberg.meta.IcebergViewVersion;
import org.projectnessie.catalog.formats.iceberg.types.IcebergType;

public class IcebergFixtures {
Expand Down Expand Up @@ -179,6 +183,26 @@ public static IcebergTableMetadata.Builder tableMetadataSimple() {
IcebergSnapshotLogEntry.builder().snapshotId(11).timestampMs(12345678L).build());
}

/**
 * Returns a builder pre-populated with a small Iceberg view metadata fixture.
 *
 * <p>The fixture has a random view UUID, the location {@code "view-location"}, one property
 * ({@code prop=value}), the all-types schema from {@code icebergSchemaAllTypes()}, and a single
 * view version with id {@code 11} that is also the current version and the only version-log
 * entry. The format version is not set here; callers set it on the returned builder.
 *
 * @return a builder that callers can customize further before calling {@code build()}
 */
public static IcebergViewMetadata.Builder viewMetadataSimple() {
  IcebergSchema allTypesSchema = icebergSchemaAllTypes();

  // The one and only view version; its id matches currentVersionId and the version-log entry.
  IcebergViewVersion onlyVersion =
      IcebergViewVersion.builder()
          .versionId(11)
          .schemaId(allTypesSchema.schemaId())
          .putSummary("operation", "testing")
          .timestampMs(12345678L)
          .defaultNamespace(IcebergNamespace.EMPTY_ICEBERG_NAMESPACE)
          .build();

  return IcebergViewMetadata.builder()
      .viewUuid(UUID.randomUUID().toString())
      .location("view-location")
      .currentVersionId(11)
      .putProperty("prop", "value")
      .addSchemas(allTypesSchema)
      .addVersions(onlyVersion)
      .addVersionLog(IcebergViewHistoryEntry.icebergViewHistoryEntry(12345678L, 11));
}

public static IcebergTableMetadata.Builder tableMetadataWithStatistics() {
IcebergStatisticsFile statisticsFile =
IcebergStatisticsFile.statisticsFile(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import static java.util.Collections.singletonList;
import static java.util.UUID.randomUUID;
import static org.projectnessie.catalog.formats.iceberg.fixtures.IcebergFixtures.tableMetadataSimple;
import static org.projectnessie.catalog.formats.iceberg.fixtures.IcebergFixtures.viewMetadataSimple;

import com.fasterxml.jackson.databind.SerializationFeature;
import java.io.ByteArrayOutputStream;
Expand All @@ -39,6 +40,7 @@
import org.projectnessie.catalog.formats.iceberg.meta.IcebergSnapshot;
import org.projectnessie.catalog.formats.iceberg.meta.IcebergSortOrder;
import org.projectnessie.catalog.formats.iceberg.meta.IcebergTableMetadata;
import org.projectnessie.catalog.formats.iceberg.meta.IcebergViewMetadata;

public class IcebergGenerateFixtures {
private IcebergGenerateFixtures() {}
Expand All @@ -62,8 +64,8 @@ public static ObjectWriter objectWriterForPath(Path path) {
};
}

public static String generateCompressedMetadata(ObjectWriter writer, int icebergSpecVersion)
throws Exception {
public static String generateCompressedMetadataForTable(
ObjectWriter writer, int icebergSpecVersion) throws Exception {
IcebergTableMetadata simpleTableMetadata =
tableMetadataSimple().formatVersion(icebergSpecVersion).build();

Expand Down Expand Up @@ -91,6 +93,35 @@ public static String generateCompressedMetadata(ObjectWriter writer, int iceberg
return writer.write(URI.create(metadataPath), data);
}

/**
 * Writes a GZIP-compressed JSON rendering of the simple view metadata fixture.
 *
 * <p>The file name suffix depends on {@code icebergSpecVersion}: {@code 1} yields {@code
 * .metadata.json.gz}, {@code 2} yields {@code .gz.metadata.json}, and any other value yields
 * {@code .json.gz}. All files are placed under {@code view-metadata-simple-no-manifest/}.
 *
 * @param writer sink that receives the target URI and the compressed bytes
 * @param icebergSpecVersion format version stored in the metadata; also selects the file name
 * @return the value returned by {@code writer.write(...)} for the written object
 * @throws Exception if serialization, compression or writing fails
 */
public static String generateCompressedMetadataForView(
    ObjectWriter writer, int icebergSpecVersion) throws Exception {
  IcebergViewMetadata simpleViewMetadata =
      viewMetadataSimple().formatVersion(icebergSpecVersion).build();

  ByteArrayOutputStream bytes = new ByteArrayOutputStream();
  try (GZIPOutputStream gzip = new GZIPOutputStream(bytes)) {
    IcebergJson.objectMapper()
        .enable(SerializationFeature.INDENT_OUTPUT)
        .writeValue(gzip, simpleViewMetadata);
  }
  // Snapshot the buffer only after the GZIP stream has been closed: closing is what writes the
  // remaining compressed data and the GZIP trailer. A flush() alone does not finish the stream,
  // so reading earlier is only correct if the JSON writer happens to close its target.
  byte[] data = bytes.toByteArray();

  String metadataPath = "view-metadata-simple-no-manifest/";
  switch (icebergSpecVersion) {
    case 1:
      metadataPath += "view-metadata-simple-compressed-no-manifest.metadata.json.gz";
      break;
    case 2:
      metadataPath += "view-metadata-simple-compressed-no-manifest.gz.metadata.json";
      break;
    default:
      metadataPath += "view-metadata-simple-compressed-no-manifest.json.gz";
      break;
  }
  return writer.write(URI.create(metadataPath), data);
}

public static String generateSimpleMetadata(ObjectWriter writer, int icebergSpecVersion)
throws Exception {
IcebergTableMetadata simpleTableMetadata =
Expand All @@ -103,6 +134,18 @@ public static String generateSimpleMetadata(ObjectWriter writer, int icebergSpec
.getBytes(UTF_8));
}

/**
 * Writes the simple view metadata fixture as uncompressed, indented JSON.
 *
 * @param writer sink that receives the target URI and the UTF-8 encoded JSON bytes
 * @param icebergSpecVersion format version stored in the generated metadata
 * @return the value returned by {@code writer.write(...)} for the written object
 * @throws Exception if serialization or writing fails
 */
public static String generateSimpleMetadataForView(ObjectWriter writer, int icebergSpecVersion)
    throws Exception {
  IcebergViewMetadata viewMetadata =
      viewMetadataSimple().formatVersion(icebergSpecVersion).build();

  URI target =
      URI.create("view-metadata-simple-no-manifest/view-metadata-simple-no-manifest.json");
  String json =
      IcebergJson.objectMapper()
          .enable(SerializationFeature.INDENT_OUTPUT)
          .writeValueAsString(viewMetadata);

  return writer.write(target, json.getBytes(UTF_8));
}

public static String generateMetadataWithManifestList(String basePath, ObjectWriter writer)
throws Exception {
return generateMetadataWithManifestList(basePath, writer, m -> {});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import static org.projectnessie.nessie.tasks.api.TaskState.successState;
import static org.projectnessie.versioned.storage.common.persist.ObjId.randomObjId;

import java.io.IOException;
import java.io.InputStream;
import java.time.Instant;
import java.util.UUID;
Expand Down Expand Up @@ -93,12 +94,7 @@ private EntitySnapshotObj.Builder importIcebergTable(
NessieTable table;
IcebergTableMetadata tableMetadata;
try {
InputStream input = taskRequest.objectIO().readObject(metadataLocation);
if (metadataLocation.requiredPath().endsWith(".gz")
|| metadataLocation.requiredPath().endsWith(".gz.metadata.json")) {
input = new GZIPInputStream(input);
}
tableMetadata = IcebergJson.objectMapper().readValue(input, IcebergTableMetadata.class);
tableMetadata = icebergMetadata(metadataLocation, IcebergTableMetadata.class);
table = entityObjForContent(content, tableMetadata, entityObjId);
} catch (Exception e) {
throw new RuntimeException(
Expand Down Expand Up @@ -176,12 +172,7 @@ private EntitySnapshotObj.Builder importIcebergView(
IcebergViewMetadata viewMetadata;
StorageUri metadataLocation = StorageUri.of(content.getMetadataLocation());
try {

InputStream input = taskRequest.objectIO().readObject(metadataLocation);
if (metadataLocation.requiredPath().endsWith(".gz")) {
input = new GZIPInputStream(input);
}
viewMetadata = IcebergJson.objectMapper().readValue(input, IcebergViewMetadata.class);
viewMetadata = icebergMetadata(metadataLocation, IcebergViewMetadata.class);
view =
entityObjForContent(
content,
Expand Down Expand Up @@ -272,4 +263,19 @@ private static EntityObj buildEntityObj(ObjId entityObjId, NessieEntity entity)
.versionToken(randomObjId().toString())
.build();
}

/**
 * Reads the object at {@code metadataLocation} and deserializes it as JSON into the requested
 * metadata type. Decompression, when needed, is handled by {@code metadataInputStream}.
 *
 * @param metadataLocation location of the Iceberg metadata object
 * @param metadataType class to deserialize into
 * @param <T> the metadata type returned to the caller
 * @return the deserialized metadata
 * @throws IOException if the object cannot be read or parsed
 */
private <T> T icebergMetadata(StorageUri metadataLocation, Class<? extends T> metadataType)
    throws IOException {
  // Own the stream here so it is released on every path, including parse failures, instead of
  // depending on the JSON parser to close its source.
  try (InputStream input = metadataInputStream(metadataLocation)) {
    return IcebergJson.objectMapper().readValue(input, metadataType);
  }
}

/**
 * Opens the object at {@code metadataLocation}, wrapping it in a {@link GZIPInputStream} when
 * the path ends with {@code .gz} or {@code .gz.metadata.json}.
 *
 * @param metadataLocation location of the Iceberg metadata object
 * @return a stream of the (decompressed, if applicable) metadata bytes; the caller must close it
 * @throws IOException if the object cannot be opened or the GZIP header cannot be read
 */
private InputStream metadataInputStream(StorageUri metadataLocation) throws IOException {
  final InputStream input = taskRequest.objectIO().readObject(metadataLocation);
  try {
    if (metadataLocation.requiredPath().endsWith(".gz")
        || metadataLocation.requiredPath().endsWith(".gz.metadata.json")) {
      // The GZIPInputStream constructor reads the header eagerly and may throw.
      return new GZIPInputStream(input);
    }
    return input;
  } catch (IOException | RuntimeException e) {
    // Nothing was handed to the caller, so the raw stream must be released here to avoid a leak.
    try {
      input.close();
    } catch (IOException closeFailure) {
      e.addSuppressed(closeFailure);
    }
    throw e;
  }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,10 @@
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.params.provider.Arguments.arguments;
import static org.mockito.Mockito.mock;
import static org.projectnessie.catalog.formats.iceberg.fixtures.IcebergGenerateFixtures.generateCompressedMetadata;
import static org.projectnessie.catalog.formats.iceberg.fixtures.IcebergGenerateFixtures.generateCompressedMetadataForTable;
import static org.projectnessie.catalog.formats.iceberg.fixtures.IcebergGenerateFixtures.generateCompressedMetadataForView;
import static org.projectnessie.catalog.formats.iceberg.fixtures.IcebergGenerateFixtures.generateSimpleMetadata;
import static org.projectnessie.catalog.formats.iceberg.fixtures.IcebergGenerateFixtures.generateSimpleMetadataForView;
import static org.projectnessie.catalog.formats.iceberg.fixtures.IcebergGenerateFixtures.objectWriterForPath;
import static org.projectnessie.versioned.storage.common.persist.ObjId.randomObjId;

Expand Down Expand Up @@ -48,7 +50,9 @@
import org.projectnessie.catalog.files.local.LocalObjectIO;
import org.projectnessie.catalog.formats.iceberg.fixtures.IcebergGenerateFixtures;
import org.projectnessie.catalog.model.snapshot.NessieTableSnapshot;
import org.projectnessie.catalog.model.snapshot.NessieViewSnapshot;
import org.projectnessie.model.IcebergTable;
import org.projectnessie.model.IcebergView;
import org.projectnessie.nessie.tasks.async.pool.JavaPoolTasksAsync;
import org.projectnessie.nessie.tasks.service.TasksServiceConfig;
import org.projectnessie.nessie.tasks.service.impl.TaskServiceMetrics;
Expand Down Expand Up @@ -132,9 +136,49 @@ public void icebergTableImports(
static Stream<Arguments> icebergTableImports() throws Exception {
IcebergGenerateFixtures.ObjectWriter objectWriter = objectWriterForPath(tempDir);
return Stream.of(
arguments("compressed table-metadata generic", generateCompressedMetadata(objectWriter, 0)),
arguments("compressed table-metadata v1", generateCompressedMetadata(objectWriter, 1)),
arguments("compressed table-metadata v2", generateCompressedMetadata(objectWriter, 2)),
arguments(
"compressed table-metadata generic",
generateCompressedMetadataForTable(objectWriter, 0)),
arguments(
"compressed table-metadata v1", generateCompressedMetadataForTable(objectWriter, 1)),
arguments(
"compressed table-metadata v2", generateCompressedMetadataForTable(objectWriter, 2)),
arguments("simple table-metadata", generateSimpleMetadata(objectWriter, 2)));
}

/**
 * Imports an Iceberg view from the given metadata location and checks that a snapshot is
 * produced within one minute.
 *
 * @param testName human-readable case name, used only for test reporting
 * @param icebergViewMetadata location of the view metadata file to import
 */
@ParameterizedTest
@MethodSource("icebergViewImports")
public void icebergViewImports(
    @SuppressWarnings("unused") String testName, String icebergViewMetadata) throws Exception {
  EntitySnapshotTaskBehavior taskBehavior =
      new EntitySnapshotTaskBehavior(
          BackendExceptionMapper.builder().build(), Duration.ofMillis(1));
  IcebergStuff importer =
      new IcebergStuff(new LocalObjectIO(), persist, tasksService, taskBehavior, executor);

  ObjId requestedSnapshotId = randomObjId();
  IcebergView view = IcebergView.of("1", icebergViewMetadata, 1, 1);

  CompletionStage<NessieViewSnapshot> pending =
      importer.retrieveIcebergSnapshot(requestedSnapshotId, view);
  NessieViewSnapshot imported = pending.toCompletableFuture().get(1, TimeUnit.MINUTES);

  soft.assertThat(imported).isNotNull();
}

/**
 * Supplies the cases for the view import test: three GZIP-compressed metadata files (spec
 * version arguments 0, 1 and 2) and one uncompressed file, all written below {@code tempDir}.
 */
static Stream<Arguments> icebergViewImports() throws Exception {
  IcebergGenerateFixtures.ObjectWriter fixtureWriter = objectWriterForPath(tempDir);

  // Generate the fixtures in the same order in which they are listed as arguments.
  String compressedGeneric = generateCompressedMetadataForView(fixtureWriter, 0);
  String compressedV1 = generateCompressedMetadataForView(fixtureWriter, 1);
  String compressedV2 = generateCompressedMetadataForView(fixtureWriter, 2);
  String uncompressed = generateSimpleMetadataForView(fixtureWriter, 2);

  return Stream.of(
      arguments("compressed view-metadata generic", compressedGeneric),
      arguments("compressed view-metadata v1", compressedV1),
      arguments("compressed view-metadata v2", compressedV2),
      arguments("simple view-metadata", uncompressed));
}
}

0 comments on commit 9dcab5d

Please sign in to comment.