Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Ensure data read order reflects commit sequence in Iceberg tables #6341

Merged
Merged
Show file tree
Hide file tree
Changes from 12 commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
27a134c
Added ordering for discovery of manifest files in a snapshot
malhotrashivam Nov 7, 2024
5af1281
Merge branch 'main' into nightly/sm-ice-read-order
malhotrashivam Nov 7, 2024
4799ed5
Added ordering of Iceberg keys based on number of files discovered
malhotrashivam Nov 7, 2024
9a72894
Sorted data files on dataSequenceNumber while reading
malhotrashivam Nov 7, 2024
e9f2e66
Added data file ordering based on (dataSequenceNumber, fileSequenceNu…
malhotrashivam Nov 7, 2024
a0e209e
Review with Devin contd.
malhotrashivam Nov 8, 2024
3c116cb
Review with Devin Part 2
malhotrashivam Nov 8, 2024
b048c64
Added manifest sequence number for ordering of data files
malhotrashivam Nov 9, 2024
c9a4e86
Updated comments as per code review
malhotrashivam Nov 11, 2024
f42b602
Review with Ryan part 1
malhotrashivam Nov 12, 2024
0e477e0
Discussion with Devin Part 1
malhotrashivam Nov 12, 2024
968f769
Review with Devin Part 2
malhotrashivam Nov 14, 2024
9116c63
Review with Ryan contd.
malhotrashivam Nov 15, 2024
ad2bf0e
Merge branch 'main' into nightly/sm-ice-read-order
malhotrashivam Nov 15, 2024
9a69898
Changed signrature of IcebergTableParquetLocationKey constructor
malhotrashivam Nov 15, 2024
0be2ff8
Removed an extra newline
malhotrashivam Nov 15, 2024
ceac1b2
Review with Ryan contd.
malhotrashivam Nov 18, 2024
814b96f
Using toString instead of name for tableIdentifier and renamed a vari…
malhotrashivam Nov 18, 2024
e358605
Updated PR based on review comments by Devin
malhotrashivam Nov 19, 2024
c994561
More changes to comparator logic
malhotrashivam Nov 19, 2024
7af3345
Some more changes
malhotrashivam Nov 19, 2024
503af9f
More review comments
malhotrashivam Nov 19, 2024
e762fbc
Including patch from Ryan
malhotrashivam Nov 19, 2024
22aee1d
Including patch from Ryan
malhotrashivam Nov 19, 2024
c440d6c
Moved TableIdentifierComparator next to location key
malhotrashivam Nov 19, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ public class URITableLocationKey extends PartitionedTableLocationKey {
private static final String IMPLEMENTATION_NAME = URITableLocationKey.class.getSimpleName();

protected final URI uri;
private final int order;
protected final int order;

private int cachedHashCode;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,13 +61,26 @@ public abstract class IcebergBaseLayout implements TableLocationKeyFinder<Iceber
* The {@link ParquetInstructions} object that will be used to read any Parquet data files in this table. Only
* accessed while synchronized on {@code this}.
*/
ParquetInstructions parquetInstructions;
private ParquetInstructions parquetInstructions;

/**
* Create a new {@link IcebergTableLocationKey} for the given {@link ManifestFile}, {@link DataFile} and
* {@link URI}.
*
* @param manifestFile The manifest file from which the data file was discovered
* @param dataFile The data file that backs the keyed location
* @param fileUri The {@link URI} for the file that backs the keyed location
* @param partitions The table partitions enclosing the table location keyed by the returned key. If {@code null},
* the location will be a member of no partitions.
*
* @return A new {@link IcebergTableLocationKey}
*/
protected IcebergTableLocationKey locationKey(
final org.apache.iceberg.FileFormat format,
final URI fileUri,
@NotNull final ManifestFile manifestFile,
@NotNull final DataFile dataFile,
@NotNull final URI fileUri,
@Nullable final Map<String, Comparable<?>> partitions) {

final org.apache.iceberg.FileFormat format = dataFile.format();
if (format == org.apache.iceberg.FileFormat.PARQUET) {
if (parquetInstructions == null) {
// Start with user-supplied instructions (if provided).
Expand All @@ -78,7 +91,7 @@ protected IcebergTableLocationKey locationKey(

// Add any column rename mappings.
if (!instructions.columnRenames().isEmpty()) {
for (Map.Entry<String, String> entry : instructions.columnRenames().entrySet()) {
for (final Map.Entry<String, String> entry : instructions.columnRenames().entrySet()) {
builder.addColumnNameMapping(entry.getKey(), entry.getValue());
}
}
Expand All @@ -96,7 +109,8 @@ protected IcebergTableLocationKey locationKey(

parquetInstructions = builder.build();
}
return new IcebergTableParquetLocationKey(fileUri, 0, partitions, parquetInstructions);
return new IcebergTableParquetLocationKey(dataFile, manifestFile, fileUri, 0, partitions,
parquetInstructions);
}
throw new UnsupportedOperationException(String.format("%s:%d - an unsupported file format %s for URI '%s'",
tableAdapter, snapshot.snapshotId(), format, fileUri));
Expand All @@ -119,7 +133,7 @@ public IcebergBaseLayout(
this.cache = new HashMap<>();
}

abstract IcebergTableLocationKey keyFromDataFile(DataFile df, URI fileUri);
abstract IcebergTableLocationKey keyFromDataFile(ManifestFile manifestFile, DataFile dataFile, URI fileUri);

@NotNull
private URI dataFileUri(@NotNull DataFile df) {
Expand Down Expand Up @@ -148,10 +162,9 @@ public synchronized void findKeys(@NotNull final Consumer<IcebergTableLocationKe
table, snapshot.snapshotId(), manifestFile.content()));
}
try (final ManifestReader<DataFile> reader = ManifestFiles.read(manifestFile, table.io())) {
for (DataFile df : reader) {
final URI fileUri = dataFileUri(df);
final IcebergTableLocationKey locationKey =
cache.computeIfAbsent(fileUri, uri -> keyFromDataFile(df, fileUri));
for (final DataFile dataFile : reader) {
final URI fileUri = dataFileUri(dataFile);
final IcebergTableLocationKey locationKey = keyFromDataFile(manifestFile, dataFile, fileUri);
malhotrashivam marked this conversation as resolved.
Show resolved Hide resolved
if (locationKey != null) {
locationKeyObserver.accept(locationKey);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,10 @@ public String toString() {
}

@Override
IcebergTableLocationKey keyFromDataFile(DataFile df, URI fileUri) {
return locationKey(df.format(), fileUri, null);
IcebergTableLocationKey keyFromDataFile(
@NotNull final ManifestFile manifestFile,
@NotNull final DataFile dataFile,
@NotNull final URI fileUri) {
return locationKey(manifestFile, dataFile, fileUri, null);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -80,10 +80,13 @@ public String toString() {
}

@Override
IcebergTableLocationKey keyFromDataFile(DataFile df, URI fileUri) {
IcebergTableLocationKey keyFromDataFile(
@NotNull final ManifestFile manifestFile,
@NotNull final DataFile dataFile,
@NotNull final URI fileUri) {
final Map<String, Comparable<?>> partitions = new LinkedHashMap<>();

final PartitionData partitionData = (PartitionData) df.partition();
final PartitionData partitionData = (PartitionData) dataFile.partition();
for (final ColumnData colData : outputPartitioningColumns) {
final String colName = colData.name;
final Object colValue = partitionData.get(colData.index);
Expand All @@ -94,6 +97,6 @@ IcebergTableLocationKey keyFromDataFile(DataFile df, URI fileUri) {
}
partitions.put(colName, (Comparable<?>) colValue);
}
return locationKey(df.format(), fileUri, partitions);
return locationKey(manifestFile, dataFile, fileUri, partitions);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,12 @@
//
package io.deephaven.iceberg.location;

import io.deephaven.base.verify.Require;
import io.deephaven.engine.table.impl.locations.TableLocationKey;
import io.deephaven.parquet.table.ParquetInstructions;
import io.deephaven.parquet.table.location.ParquetTableLocationKey;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.ManifestFile;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;

Expand All @@ -18,24 +21,60 @@
public class IcebergTableParquetLocationKey extends ParquetTableLocationKey implements IcebergTableLocationKey {
private static final String IMPLEMENTATION_NAME = IcebergTableParquetLocationKey.class.getSimpleName();

/**
* The {@link DataFile#dataSequenceNumber()} of the data file backing this keyed location.
*/
private final long dataSequenceNumber;

/**
* The {@link DataFile#fileSequenceNumber()} of the data file backing this keyed location.
*/
private final long fileSequenceNumber;

/**
* The {@link DataFile#pos()} of data file backing this keyed location.
*/
private final long dataFilePos;

/**
* The {@link ManifestFile#sequenceNumber()} of the manifest file from which the data file was discovered.
*/
private final long manifestSequenceNumber;

private final ParquetInstructions readInstructions;

private int cachedHashCode;

/**
* Construct a new IcebergTableParquetLocationKey for the supplied {@code fileUri} and {@code partitions}.
*
* @param fileUri The file that backs the keyed location
* @param dataFile The data file that backs the keyed location
* @param manifestFile The manifest file from which the data file was discovered
* @param fileUri The {@link URI} for the file that backs the keyed location
* @param order Explicit ordering index, taking precedence over other fields
* @param partitions The table partitions enclosing the table location keyed by {@code this}. Note that if this
* parameter is {@code null}, the location will be a member of no partitions. An ordered copy of the map will
* be made, so the calling code is free to mutate the map after this call
* @param readInstructions the instructions for customizations while reading
*/
public IcebergTableParquetLocationKey(
@NotNull final DataFile dataFile,
@NotNull final ManifestFile manifestFile,
malhotrashivam marked this conversation as resolved.
Show resolved Hide resolved
@NotNull final URI fileUri,
final int order,
@Nullable final Map<String, Comparable<?>> partitions,
@NotNull final ParquetInstructions readInstructions) {
super(fileUri, order, partitions, readInstructions);

// Files with unknown sequence numbers should be ordered last.
dataSequenceNumber = dataFile.dataSequenceNumber() != null ? dataFile.dataSequenceNumber() : Long.MAX_VALUE;
fileSequenceNumber = dataFile.fileSequenceNumber() != null ? dataFile.fileSequenceNumber() : Long.MAX_VALUE;

// This should never be null because we are discovering this data file through a non-null manifest file
dataFilePos = Require.neqNull(dataFile.pos(), "dataFile.pos()");

manifestSequenceNumber = manifestFile.sequenceNumber();
malhotrashivam marked this conversation as resolved.
Show resolved Hide resolved

this.readInstructions = readInstructions;
}

Expand All @@ -48,4 +87,80 @@ public String getImplementationName() {
public ParquetInstructions readInstructions() {
return readInstructions;
}

/**
* Precedence-wise this implementation compares {@code order}, then applies a {@link PartitionsComparator} to
malhotrashivam marked this conversation as resolved.
Show resolved Hide resolved
* {@code partitions}, then {@code dataSequenceNumber}, then {@code fileSequenceNumber}, then
* {@code manifestSequenceNumber}, then {@code dataFilePos}, and finally compares {@code uri}.
*
* @inheritDoc
*/
@Override
public int compareTo(@NotNull final TableLocationKey other) {
if (other instanceof IcebergTableParquetLocationKey) {
// TODO(deephaven-core#5989): Add unit tests for the ordering of data files
final IcebergTableParquetLocationKey otherTyped = (IcebergTableParquetLocationKey) other;
int comparisonResult;
if ((comparisonResult = Integer.compare(order, otherTyped.order)) != 0) {
return comparisonResult;
}
if ((comparisonResult = PartitionsComparator.INSTANCE.compare(partitions, otherTyped.partitions)) != 0) {
return comparisonResult;
}
if ((comparisonResult = Long.compare(dataSequenceNumber, otherTyped.dataSequenceNumber)) != 0) {
return comparisonResult;
}
if ((comparisonResult = Long.compare(fileSequenceNumber, otherTyped.fileSequenceNumber)) != 0) {
return comparisonResult;
}
if ((comparisonResult = Long.compare(manifestSequenceNumber, otherTyped.manifestSequenceNumber)) != 0) {
return comparisonResult;
}
if ((comparisonResult = Long.compare(dataFilePos, otherTyped.dataFilePos)) != 0) {
return comparisonResult;
}
return uri.compareTo(otherTyped.uri);
}
throw new ClassCastException("Cannot compare " + getClass() + " to " + other.getClass());
}

@Override
public boolean equals(@Nullable final Object other) {
if (this == other) {
return true;
}
if (!(other instanceof IcebergTableParquetLocationKey)) {
return false;
}
// Iceberg devs have confirmed that uri's are supposed to be unique across data files, but Iceberg doesn't have
// enough checks to enforce that. So for safety, we are comparing all fields and not just URI.
// https://apache-iceberg.slack.com/archives/C03LG1D563F/p1731352244907559
final IcebergTableParquetLocationKey otherTyped = (IcebergTableParquetLocationKey) other;
return dataSequenceNumber == otherTyped.dataSequenceNumber
&& fileSequenceNumber == otherTyped.fileSequenceNumber
&& dataFilePos == otherTyped.dataFilePos
&& manifestSequenceNumber == otherTyped.manifestSequenceNumber
&& super.equals(otherTyped);
malhotrashivam marked this conversation as resolved.
Show resolved Hide resolved
}

@Override
public int hashCode() {
if (cachedHashCode == 0) {
final int prime = 31;
int result = 1;
result = prime * result + Long.hashCode(dataSequenceNumber);
result = prime * result + Long.hashCode(fileSequenceNumber);
malhotrashivam marked this conversation as resolved.
Show resolved Hide resolved
result = prime * result + Long.hashCode(dataFilePos);
result = prime * result + Long.hashCode(manifestSequenceNumber);
result = prime * result + super.hashCode();
// Don't use 0; that's used by StandaloneTableLocationKey, and also our sentinel for the need to compute
if (result == 0) {
final int fallbackHashCode = IcebergTableParquetLocationKey.class.hashCode();
cachedHashCode = fallbackHashCode == 0 ? 1 : fallbackHashCode;
} else {
cachedHashCode = result;
}
}
return cachedHashCode;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,14 +29,16 @@ public static IcebergUpdateMode staticMode() {
}

/**
* Set a manually-refreshing update mode for this table.
* Set a manually-refreshing update mode for this table. The ordering of the data in the table will depend on the
* order in which the data files are discovered on refresh.
*/
public static IcebergUpdateMode manualRefreshingMode() {
return MANUAL_REFRESHING;
}

/**
* Set a automatically-refreshing update mode for this table using the default refresh interval of 60 seconds.
* Set a automatically-refreshing update mode for this table using the default refresh interval of 60 seconds. The
* ordering of the data in the table will depend on the order in which the data files are discovered on refresh.
malhotrashivam marked this conversation as resolved.
Show resolved Hide resolved
*/
public static IcebergUpdateMode autoRefreshingMode() {
return AUTO_REFRESHING;
Expand Down
Loading