Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Ensure data read order reflects commit sequence in Iceberg tables #6341

Merged
Merged
Show file tree
Hide file tree
Changes from 21 commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
27a134c
Added ordering for discovery of manifest files in a snapshot
malhotrashivam Nov 7, 2024
5af1281
Merge branch 'main' into nightly/sm-ice-read-order
malhotrashivam Nov 7, 2024
4799ed5
Added ordering of Iceberg keys based on number of files discovered
malhotrashivam Nov 7, 2024
9a72894
Sorted data files on dataSequenceNumber while reading
malhotrashivam Nov 7, 2024
e9f2e66
Added data file ordering based on (dataSequenceNumber, fileSequenceNu…
malhotrashivam Nov 7, 2024
a0e209e
Review with Devin contd.
malhotrashivam Nov 8, 2024
3c116cb
Review with Devin Part 2
malhotrashivam Nov 8, 2024
b048c64
Added manifest sequence number for ordering of data files
malhotrashivam Nov 9, 2024
c9a4e86
Updated comments as per code review
malhotrashivam Nov 11, 2024
f42b602
Review with Ryan part 1
malhotrashivam Nov 12, 2024
0e477e0
Discussion with Devin Part 1
malhotrashivam Nov 12, 2024
968f769
Review with Devin Part 2
malhotrashivam Nov 14, 2024
9116c63
Review with Ryan contd.
malhotrashivam Nov 15, 2024
ad2bf0e
Merge branch 'main' into nightly/sm-ice-read-order
malhotrashivam Nov 15, 2024
9a69898
Changed signrature of IcebergTableParquetLocationKey constructor
malhotrashivam Nov 15, 2024
0be2ff8
Removed an extra newline
malhotrashivam Nov 15, 2024
ceac1b2
Review with Ryan contd.
malhotrashivam Nov 18, 2024
814b96f
Using toString instead of name for tableIdentifier and renamed a vari…
malhotrashivam Nov 18, 2024
e358605
Updated PR based on review comments by Devin
malhotrashivam Nov 19, 2024
c994561
More changes to comparator logic
malhotrashivam Nov 19, 2024
7af3345
Some more changes
malhotrashivam Nov 19, 2024
503af9f
More review comments
malhotrashivam Nov 19, 2024
e762fbc
Including patch from Ryan
malhotrashivam Nov 19, 2024
22aee1d
Including patch from Ryan
malhotrashivam Nov 19, 2024
c440d6c
Moved TableIdentifierComparator next to location key
malhotrashivam Nov 19, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -62,4 +62,16 @@ <PARTITION_VALUE_TYPE extends Comparable<PARTITION_VALUE_TYPE>> PARTITION_VALUE_
* with any live TableLocation.
*/
default void clear() {}

/**
* By default, compare fully qualified class names of the implementing classes. This method is a fallback where the
* implementing classes are not directly comparable, and should help establish a consistent ordering between
* distinct implementations.
* <p>
* {@inheritDoc}
*/
@Override
default int compareTo(@NotNull final TableLocationKey other) {
return this.getClass().getName().compareTo(other.getClass().getName());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

import io.deephaven.base.log.LogOutput;
import io.deephaven.base.log.LogOutputAppendable;
import io.deephaven.engine.table.impl.locations.TableLocationKey;
import io.deephaven.util.compare.ObjectComparisons;
import io.deephaven.engine.table.impl.locations.ImmutableTableLocationKey;
import io.deephaven.engine.table.impl.locations.UnknownPartitionKeyException;
Expand All @@ -15,9 +16,9 @@

/**
* Base {@link ImmutableTableLocationKey} implementation for table locations that may be enclosed by partitions.
* Sub-classes should be sure to invoke the partition-map comparator at higher priority than other comparisons when
* implementing {@link #compareTo(Object)}, and to include the partitions in their {@link #equals(Object)}
* implementations.
* Subclasses should consider invoking the partition-map comparator at higher priority than other comparisons when
* implementing {@link #compareTo(Object)}. Also, should include the partitions in their {@link #equals(Object)} and
* {@link #hashCode()} implementations if not calling {@code super.equals()} and {@code super.hashCode()}.
*/
public abstract class PartitionedTableLocationKey implements ImmutableTableLocationKey {

Expand All @@ -26,6 +27,8 @@ public abstract class PartitionedTableLocationKey implements ImmutableTableLocat

protected final Map<String, Comparable<?>> partitions;

private int cachedHashCode;

/**
* Construct a new PartitionedTableLocationKey for the supplied {@code partitions}.
*
Expand Down Expand Up @@ -55,6 +58,46 @@ public final Set<String> getPartitionKeys() {
return partitions.keySet();
}

@Override
public int compareTo(@NotNull final TableLocationKey other) {
if (other instanceof PartitionedTableLocationKey) {
final PartitionedTableLocationKey otherTyped = (PartitionedTableLocationKey) other;
final int partitionComparisonResult =
PartitionsComparator.INSTANCE.compare(partitions, otherTyped.partitions);
if (partitionComparisonResult != 0) {
return partitionComparisonResult;
}
}
return ImmutableTableLocationKey.super.compareTo(other);
}

@Override
public int hashCode() {
if (cachedHashCode == 0) {
final int computedHashCode = partitions.hashCode();
// Don't use 0; that's used by StandaloneTableLocationKey, and also our sentinel for the need to compute
if (computedHashCode == 0) {
final int fallbackHashCode = PartitionedTableLocationKey.class.hashCode();
cachedHashCode = fallbackHashCode == 0 ? 1 : fallbackHashCode;
} else {
cachedHashCode = computedHashCode;
}
}
return cachedHashCode;
}

@Override
public boolean equals(@Nullable final Object other) {
if (this == other) {
return true;
}
if (!(other instanceof PartitionedTableLocationKey)) {
return false;
}
final PartitionedTableLocationKey otherTyped = (PartitionedTableLocationKey) other;
return partitions.equals(otherTyped.partitions);
}

/**
* Formats a map of partitions as key-value pairs.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ public int compareTo(@NotNull final TableLocationKey other) {
if (other instanceof StandaloneTableLocationKey) {
return 0;
}
throw new ClassCastException("Cannot compare " + getClass() + " to " + other.getClass());
return ImmutableTableLocationKey.super.compareTo(other);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,10 +66,11 @@ public String toString() {
}

/**
* Precedence-wise this implementation compares {@code order}, then applies a {@link PartitionsComparator} to
* {@code partitions}, then compares {@code file}.
*
* @inheritDoc
* When comparing with another {@link FileTableLocationKey}, precedence-wise this implementation compares
* {@code order}, then applies a {@link PartitionsComparator} to {@code partitions}, then compares {@code file}.
* Otherwise, it delegates to parent class.
* <p>
* {@inheritDoc}
*/
@Override
public int compareTo(@NotNull final TableLocationKey other) {
Expand All @@ -86,7 +87,7 @@ public int compareTo(@NotNull final TableLocationKey other) {
}
return file.compareTo(otherTyped.file);
}
throw new ClassCastException("Cannot compare " + getClass() + " to " + other.getClass());
return super.compareTo(other);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ public class URITableLocationKey extends PartitionedTableLocationKey {
private static final String IMPLEMENTATION_NAME = URITableLocationKey.class.getSimpleName();

protected final URI uri;
private final int order;
protected final int order;

private int cachedHashCode;

Expand Down Expand Up @@ -72,10 +72,11 @@ public String toString() {
}

/**
* Precedence-wise this implementation compares {@code order}, then applies a {@link PartitionsComparator} to
* {@code partitions}, then compares {@code uri}.
*
* @inheritDoc
* When comparing with another {@link URITableLocationKey}, precedence-wise this implementation compares
* {@code order}, then applies a {@link PartitionsComparator} to {@code partitions}, then compares {@code uri}.
* Otherwise, it delegates to parent class.
* <p>
* {@inheritDoc}
*/
@Override
public int compareTo(@NotNull final TableLocationKey other) {
Expand All @@ -92,7 +93,7 @@ public int compareTo(@NotNull final TableLocationKey other) {
}
return uri.compareTo(otherTyped.uri);
}
throw new ClassCastException("Cannot compare " + getClass() + " to " + other.getClass());
return super.compareTo(other);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,6 @@
package io.deephaven.engine.table.impl.locations.impl;

import io.deephaven.base.log.LogOutput;
import io.deephaven.engine.table.impl.locations.TableLocationKey;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;

import java.util.Map;
Expand All @@ -31,23 +29,4 @@ public LogOutput append(LogOutput logOutput) {
return logOutput.append(getImplementationName()).append("[partitions=")
.append(PartitionsFormatter.INSTANCE, partitions).append(']');
}

@Override
public int compareTo(@NotNull final TableLocationKey other) {
if (other instanceof SimpleTableLocationKey) {
return PartitionsComparator.INSTANCE.compare(partitions, ((SimpleTableLocationKey) other).partitions);
}
throw new ClassCastException("Cannot compare " + getClass() + " to " + other.getClass());
}

@Override
public int hashCode() {
return partitions.hashCode();
}

@Override
public boolean equals(final Object other) {
return other == this || (other instanceof SimpleTableLocationKey
&& partitions.equals(((SimpleTableLocationKey) other).partitions));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -46,10 +46,16 @@ public String toString() {

@Override
public int compareTo(@NotNull final TableLocationKey other) {
// noinspection DataFlowIssue
return Integer.compare(
(int) table.getAttribute("ID"),
(int) ((TableBackedTableLocationKey) other).table.getAttribute("ID"));
if (other instanceof TableBackedTableLocationKey) {
final TableBackedTableLocationKey otherTyped = (TableBackedTableLocationKey) other;
// noinspection DataFlowIssue
final int idComparisonResult =
Integer.compare((int) table.getAttribute("ID"), (int) otherTyped.table.getAttribute("ID"));
if (idComparisonResult != 0) {
return idComparisonResult;
}
}
return ImmutableTableLocationKey.super.compareTo(other);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -704,15 +704,6 @@ public int hashCode() {
return cachedHashCode;
}

@Override
public int compareTo(@NotNull final TableLocationKey other) {
if (getClass() != other.getClass()) {
throw new ClassCastException(String.format("Cannot compare %s to %s", getClass(), other.getClass()));
}
final TableLocationKeyImpl otherTableLocationKey = (TableLocationKeyImpl) other;
return PartitionsComparator.INSTANCE.compare(partitions, otherTableLocationKey.partitions);
}

@Override
public LogOutput append(@NotNull final LogOutput logOutput) {
return logOutput.append(getImplementationName())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,16 @@
import io.deephaven.util.channel.SeekableChannelsProvider;
import io.deephaven.util.channel.SeekableChannelsProviderLoader;
import org.apache.iceberg.*;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.io.FileIO;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;

import java.net.URI;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.function.Consumer;

public abstract class IcebergBaseLayout implements TableLocationKeyFinder<IcebergTableLocationKey> {
Expand All @@ -33,6 +35,22 @@ public abstract class IcebergBaseLayout implements TableLocationKeyFinder<Iceber
*/
final IcebergTableAdapter tableAdapter;

/**
* The UUID of the table, if available.
*/
@Nullable
private final UUID tableUuid;

/**
* Name of the {@link Catalog} used to access this table, if available.
*/
@Nullable
private final String catalogName;

/**
* The table identifier used to access this table.
*/
private final TableIdentifier tableIdentifier;
/**
* The {@link TableDefinition} that will be used for life of this table. Although Iceberg table schema may change,
* schema changes are not supported in Deephaven.
Expand All @@ -45,9 +63,9 @@ public abstract class IcebergBaseLayout implements TableLocationKeyFinder<Iceber
private final String uriScheme;

/**
* A cache of {@link IcebergTableLocationKey IcebergTableLocationKeys} keyed by the URI of the file they represent.
* The {@link Snapshot} from which to discover data files.
*/
private final Map<URI, IcebergTableLocationKey> cache;
Snapshot snapshot;

/**
* The {@link ParquetInstructions} object that will be used to read any Parquet data files in this table.
Expand All @@ -60,17 +78,28 @@ public abstract class IcebergBaseLayout implements TableLocationKeyFinder<Iceber
*/
private final SeekableChannelsProvider channelsProvider;


/**
* The {@link Snapshot} from which to discover data files.
* Create a new {@link IcebergTableLocationKey} for the given {@link ManifestFile}, {@link DataFile} and
* {@link URI}.
*
* @param manifestFile The manifest file from which the data file was discovered
* @param dataFile The data file that backs the keyed location
* @param fileUri The {@link URI} for the file that backs the keyed location
* @param partitions The table partitions enclosing the table location keyed by the returned key. If {@code null},
* the location will be a member of no partitions.
*
* @return A new {@link IcebergTableLocationKey}
*/
Snapshot snapshot;

protected IcebergTableLocationKey locationKey(
final org.apache.iceberg.FileFormat format,
final URI fileUri,
@NotNull final ManifestFile manifestFile,
@NotNull final DataFile dataFile,
@NotNull final URI fileUri,
@Nullable final Map<String, Comparable<?>> partitions) {
final org.apache.iceberg.FileFormat format = dataFile.format();
if (format == org.apache.iceberg.FileFormat.PARQUET) {
return new IcebergTableParquetLocationKey(fileUri, 0, partitions, parquetInstructions, channelsProvider);
return new IcebergTableParquetLocationKey(catalogName, tableUuid, tableIdentifier, manifestFile, dataFile,
fileUri, 0, partitions, parquetInstructions, channelsProvider);
}
throw new UnsupportedOperationException(String.format("%s:%d - an unsupported file format %s for URI '%s'",
tableAdapter, snapshot.snapshotId(), format, fileUri));
Expand All @@ -85,6 +114,20 @@ public IcebergBaseLayout(
@NotNull final IcebergReadInstructions instructions,
@NotNull final DataInstructionsProviderLoader dataInstructionsProvider) {
this.tableAdapter = tableAdapter;
{
UUID uuid;
try {
uuid = tableAdapter.icebergTable().uuid();
} catch (final RuntimeException e) {
// The UUID method is unsupported for v1 Iceberg tables since uuid is optional for v1 tables.
uuid = null;
}
this.tableUuid = uuid;
}

this.catalogName = tableAdapter.catalog().name();
this.tableIdentifier = tableAdapter.tableIdentifier();

this.snapshot = tableAdapter.getSnapshot(instructions);
this.tableDef = tableAdapter.definition(instructions);
this.uriScheme = locationUri(tableAdapter.icebergTable()).getScheme();
Expand All @@ -111,10 +154,9 @@ public IcebergBaseLayout(
this.parquetInstructions = builder.build();
}
this.channelsProvider = SeekableChannelsProviderLoader.getInstance().load(uriScheme, specialInstructions);
this.cache = new HashMap<>();
}

abstract IcebergTableLocationKey keyFromDataFile(DataFile df, URI fileUri);
abstract IcebergTableLocationKey keyFromDataFile(ManifestFile manifestFile, DataFile dataFile, URI fileUri);

private static String path(String path, FileIO io) {
return io instanceof RelativeFileIO ? ((RelativeFileIO) io).absoluteLocation(path) : path;
Expand Down Expand Up @@ -145,15 +187,14 @@ public synchronized void findKeys(@NotNull final Consumer<IcebergTableLocationKe
table, snapshot.snapshotId(), manifestFile.content()));
}
try (final ManifestReader<DataFile> reader = ManifestFiles.read(manifestFile, table.io())) {
for (DataFile df : reader) {
final URI fileUri = dataFileUri(table, df);
for (final DataFile dataFile : reader) {
final URI fileUri = dataFileUri(table, dataFile);
if (!uriScheme.equals(fileUri.getScheme())) {
throw new TableDataException(String.format(
"%s:%d - multiple URI schemes are not currently supported. uriScheme=%s, fileUri=%s",
table, snapshot.snapshotId(), uriScheme, fileUri));
}
final IcebergTableLocationKey locationKey =
cache.computeIfAbsent(fileUri, uri -> keyFromDataFile(df, fileUri));
final IcebergTableLocationKey locationKey = keyFromDataFile(manifestFile, dataFile, fileUri);
malhotrashivam marked this conversation as resolved.
Show resolved Hide resolved
if (locationKey != null) {
locationKeyObserver.accept(locationKey);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,10 @@ public String toString() {
}

@Override
IcebergTableLocationKey keyFromDataFile(DataFile df, URI fileUri) {
return locationKey(df.format(), fileUri, null);
IcebergTableLocationKey keyFromDataFile(
@NotNull final ManifestFile manifestFile,
@NotNull final DataFile dataFile,
@NotNull final URI fileUri) {
return locationKey(manifestFile, dataFile, fileUri, null);
}
}
Loading
Loading