Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat!: Updated API for Iceberg read operations #6268

Merged
merged 13 commits into from
Oct 28, 2024

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
import io.deephaven.iceberg.location.IcebergTableLocationKey;
import io.deephaven.iceberg.location.IcebergTableParquetLocationKey;
import io.deephaven.iceberg.relative.RelativeFileIO;
import io.deephaven.iceberg.util.IcebergInstructions;
import io.deephaven.iceberg.util.IcebergReadInstructions;
import io.deephaven.iceberg.util.IcebergTableAdapter;
import io.deephaven.parquet.table.ParquetInstructions;
import io.deephaven.iceberg.internal.DataInstructionsProviderLoader;
Expand Down Expand Up @@ -40,7 +40,7 @@ public abstract class IcebergBaseLayout implements TableLocationKeyFinder<Iceber
/**
* The instructions for customizations while reading.
*/
final IcebergInstructions instructions;
final IcebergReadInstructions instructions;

/**
* A cache of {@link IcebergTableLocationKey IcebergTableLocationKeys} keyed by the URI of the file they represent.
Expand Down Expand Up @@ -83,7 +83,7 @@ protected IcebergTableLocationKey locationKey(
}
}

// Add the data instructions if provided as part of the IcebergInstructions.
// Add the data instructions if provided as part of the IcebergReadInstructions.
if (instructions.dataInstructions().isPresent()) {
builder.setSpecialInstructions(instructions.dataInstructions().get());
} else {
Expand All @@ -110,14 +110,18 @@ protected IcebergTableLocationKey locationKey(
public IcebergBaseLayout(
@NotNull final IcebergTableAdapter tableAdapter,
@Nullable final Snapshot tableSnapshot,
@NotNull final IcebergInstructions instructions,
@NotNull final IcebergReadInstructions instructions,
@NotNull final DataInstructionsProviderLoader dataInstructionsProvider) {
this.tableAdapter = tableAdapter;
this.snapshot = tableSnapshot;
this.instructions = instructions;
this.dataInstructionsProvider = dataInstructionsProvider;

this.tableDef = tableAdapter.definition(tableSnapshot, instructions);
if (snapshot == null) {
this.tableDef = tableAdapter.definition(instructions);
} else {
this.tableDef = tableAdapter.definition(instructions.withSnapshot(snapshot));
}

this.cache = new HashMap<>();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

import io.deephaven.engine.table.impl.locations.impl.TableLocationKeyFinder;
import io.deephaven.iceberg.location.IcebergTableLocationKey;
import io.deephaven.iceberg.util.IcebergInstructions;
import io.deephaven.iceberg.util.IcebergReadInstructions;
import io.deephaven.iceberg.internal.DataInstructionsProviderLoader;
import io.deephaven.iceberg.util.IcebergTableAdapter;
import org.apache.iceberg.*;
Expand All @@ -27,7 +27,7 @@ public final class IcebergFlatLayout extends IcebergBaseLayout {
public IcebergFlatLayout(
@NotNull final IcebergTableAdapter tableAdapter,
@Nullable final Snapshot tableSnapshot,
@NotNull final IcebergInstructions instructions,
@NotNull final IcebergReadInstructions instructions,
malhotrashivam marked this conversation as resolved.
Show resolved Hide resolved
@NotNull final DataInstructionsProviderLoader dataInstructionsProvider) {
super(tableAdapter, tableSnapshot, instructions, dataInstructionsProvider);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
import io.deephaven.engine.table.impl.locations.TableDataException;
import io.deephaven.engine.table.impl.locations.impl.TableLocationKeyFinder;
import io.deephaven.iceberg.location.IcebergTableLocationKey;
import io.deephaven.iceberg.util.IcebergInstructions;
import io.deephaven.iceberg.util.IcebergReadInstructions;
import io.deephaven.iceberg.util.IcebergTableAdapter;
import io.deephaven.iceberg.internal.DataInstructionsProviderLoader;
import io.deephaven.util.type.TypeUtils;
Expand Down Expand Up @@ -49,7 +49,7 @@ public IcebergKeyValuePartitionedLayout(
@NotNull final IcebergTableAdapter tableAdapter,
@Nullable final Snapshot tableSnapshot,
@NotNull final PartitionSpec partitionSpec,
@NotNull final IcebergInstructions instructions,
@NotNull final IcebergReadInstructions instructions,
@NotNull final DataInstructionsProviderLoader dataInstructionsProvider) {
super(tableAdapter, tableSnapshot, instructions, dataInstructionsProvider);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,28 +5,30 @@

import io.deephaven.annotations.CopyableStyle;
import io.deephaven.engine.table.TableDefinition;
import org.apache.iceberg.Snapshot;
import org.immutables.value.Value;
import org.immutables.value.Value.Immutable;

import java.util.Map;
import java.util.Optional;
import java.util.OptionalLong;

/**
* This class provides instructions intended for reading Iceberg catalogs and tables. The default values documented in
* this class may change in the future. As such, callers may wish to explicitly set the values.
*/
@Immutable
@CopyableStyle
public abstract class IcebergInstructions {
public abstract class IcebergReadInstructions {
/**
* The default {@link IcebergInstructions} to use when reading Iceberg data files. Providing this will use system
* defaults for cloud provider-specific parameters
* The default {@link IcebergReadInstructions} to use when reading Iceberg data files. Providing this will use
* system defaults for cloud provider-specific parameters
*/
@SuppressWarnings("unused")
public static final IcebergInstructions DEFAULT = builder().build();
public static final IcebergReadInstructions DEFAULT = builder().build();

public static Builder builder() {
return ImmutableIcebergInstructions.builder();
return ImmutableIcebergReadInstructions.builder();
}

/**
Expand All @@ -49,7 +51,7 @@ public static Builder builder() {
/**
* Return a copy of this instructions object with the column renames replaced by {@code entries}.
*/
public abstract IcebergInstructions withColumnRenames(Map<String, ? extends String> entries);
public abstract IcebergReadInstructions withColumnRenames(Map<String, ? extends String> entries);

/**
* The {@link IcebergUpdateMode} mode to use when reading the Iceberg data files. Default is
Expand All @@ -60,22 +62,46 @@ public IcebergUpdateMode updateMode() {
return IcebergUpdateMode.staticMode();
}

/**
* The identifier of the snapshot to load for reading. If both this and {@link #snapshot()} are provided, the
* {@link Snapshot#snapshotId()} should match this. Otherwise, only one of them should be provided. If neither is
* provided, the latest snapshot will be loaded.
*/
public abstract OptionalLong tableSnapshotId();

/**
* The snapshot to load for reading. If both this and {@link #tableSnapshotId()} are provided, the
* {@link Snapshot#snapshotId()} should match the {@link #tableSnapshotId()}. Otherwise, only one of them should be
* provided. If neither is provided, the latest snapshot will be loaded.
*/
public abstract Optional<Snapshot> snapshot();

public abstract IcebergReadInstructions withSnapshot(Snapshot value);
malhotrashivam marked this conversation as resolved.
Show resolved Hide resolved

public interface Builder {
@SuppressWarnings("unused")
Builder tableDefinition(TableDefinition tableDefinition);

@SuppressWarnings("unused")
Builder dataInstructions(Object s3Instructions);

@SuppressWarnings("unused")
Builder putColumnRenames(String key, String value);

@SuppressWarnings("unused")
Builder putAllColumnRenames(Map<String, ? extends String> entries);

@SuppressWarnings("unused")
Builder updateMode(IcebergUpdateMode updateMode);

IcebergInstructions build();
Builder tableSnapshotId(long tableSnapshotId);

Builder snapshot(Snapshot snapshot);

IcebergReadInstructions build();
}

@Value.Check
final void checkSnapshotId() {
if (tableSnapshotId().isPresent() && snapshot().isPresent() &&
tableSnapshotId().getAsLong() != snapshot().get().snapshotId()) {
throw new IllegalArgumentException("If both tableSnapshotId and snapshot are provided, the snapshotId " +
"must match");
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
//
// Copyright (c) 2016-2024 Deephaven Data Labs and Patent Pending
//
package io.deephaven.iceberg.util;

import org.apache.iceberg.Snapshot;
import org.immutables.value.Value;

import java.util.Optional;
import java.util.OptionalLong;

/**
* Base class for all read operations on Iceberg tables.
*/
abstract class IcebergReadOperationsBase {
malhotrashivam marked this conversation as resolved.
Show resolved Hide resolved
/**
* The instructions for customizations while reading, defaults to {@link IcebergReadInstructions#DEFAULT}.
*/
@Value.Default
public IcebergReadInstructions instructions() {
return IcebergReadInstructions.DEFAULT;
}

/**
* The identifier of the snapshot to load for reading. If both this and {@link #snapshot()} are provided, the
* {@link Snapshot#snapshotId()} should match this. Otherwise, only one of them should be provided. If neither is
* provided, the latest snapshot will be loaded.
*/
public abstract OptionalLong tableSnapshotId();

/**
* The snapshot to load for reading. If both this and {@link #tableSnapshotId()} are provided, the
* {@link Snapshot#snapshotId()} should match the {@link #tableSnapshotId()}. Otherwise, only one of them should be
* provided. If neither is provided, the latest snapshot will be loaded.
*/
public abstract Optional<Snapshot> snapshot();

interface Builder<OPERATION extends IcebergReadOperationsBase, OPERATION_BUILDER extends Builder<OPERATION, OPERATION_BUILDER>> {
OPERATION_BUILDER instructions(IcebergReadInstructions instructions);

OPERATION_BUILDER tableSnapshotId(long tableSnapshotId);

OPERATION_BUILDER snapshot(Snapshot snapshot);

OPERATION build();
}

@Value.Check
final void checkSnapshotId() {
if (tableSnapshotId().isPresent() && snapshot().isPresent() &&
tableSnapshotId().getAsLong() != snapshot().get().snapshotId()) {
throw new IllegalArgumentException("If both tableSnapshotId and snapshot are provided, the snapshotId " +
"must match");
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@

public interface IcebergTable extends Table {
/**
* When the {@link IcebergInstructions#updateMode() update mode} for this table is
* When the {@link IcebergReadInstructions#updateMode() update mode} for this table is
* {@link IcebergUpdateMode#manualRefreshingMode()}, this call will update the table with the latest snapshot from
* the catalog.
* <p>
Expand All @@ -18,7 +18,7 @@ public interface IcebergTable extends Table {
void update();

/**
* When the {@link IcebergInstructions#updateMode() update mode} for this table is
* When the {@link IcebergReadInstructions#updateMode() update mode} for this table is
* {@link IcebergUpdateMode#manualRefreshingMode()}, this call will update the table with a specific snapshot from
* the catalog. If the {@code snapshotId} is not found in the list of snapshots for the table, an
* {@link IllegalArgumentException} is thrown. The input snapshot must also be newer (higher in sequence number)
Expand All @@ -31,7 +31,7 @@ public interface IcebergTable extends Table {
void update(final long snapshotId);

/**
* When the {@link IcebergInstructions#updateMode() update mode} for this table is
* When the {@link IcebergReadInstructions#updateMode() update mode} for this table is
* {@link IcebergUpdateMode#manualRefreshingMode()}, this call will update the table with a specific snapshot from
* the catalog. The input snapshot must be newer (higher in sequence number) than the current snapshot or an
* {@link IllegalArgumentException} is thrown.
Expand Down
Loading
Loading