Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Ensure data read order reflects commit sequence in Iceberg tables #6341

Merged
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
27a134c
Added ordering for discovery of manifest files in a snapshot
malhotrashivam Nov 7, 2024
5af1281
Merge branch 'main' into nightly/sm-ice-read-order
malhotrashivam Nov 7, 2024
4799ed5
Added ordering of Iceberg keys based on number of files discovered
malhotrashivam Nov 7, 2024
9a72894
Sorted data files on dataSequenceNumber while reading
malhotrashivam Nov 7, 2024
e9f2e66
Added data file ordering based on (dataSequenceNumber, fileSequenceNu…
malhotrashivam Nov 7, 2024
a0e209e
Review with Devin contd.
malhotrashivam Nov 8, 2024
3c116cb
Review with Devin Part 2
malhotrashivam Nov 8, 2024
b048c64
Added manifest sequence number for ordering of data files
malhotrashivam Nov 9, 2024
c9a4e86
Updated comments as per code review
malhotrashivam Nov 11, 2024
f42b602
Review with Ryan part 1
malhotrashivam Nov 12, 2024
0e477e0
Discussion with Devin Part 1
malhotrashivam Nov 12, 2024
968f769
Review with Devin Part 2
malhotrashivam Nov 14, 2024
9116c63
Review with Ryan contd.
malhotrashivam Nov 15, 2024
ad2bf0e
Merge branch 'main' into nightly/sm-ice-read-order
malhotrashivam Nov 15, 2024
9a69898
Changed signature of IcebergTableParquetLocationKey constructor
malhotrashivam Nov 15, 2024
0be2ff8
Removed an extra newline
malhotrashivam Nov 15, 2024
ceac1b2
Review with Ryan contd.
malhotrashivam Nov 18, 2024
814b96f
Using toString instead of name for tableIdentifier and renamed a vari…
malhotrashivam Nov 18, 2024
e358605
Updated PR based on review comments by Devin
malhotrashivam Nov 19, 2024
c994561
More changes to comparator logic
malhotrashivam Nov 19, 2024
7af3345
Some more changes
malhotrashivam Nov 19, 2024
503af9f
More review comments
malhotrashivam Nov 19, 2024
e762fbc
Including patch from Ryan
malhotrashivam Nov 19, 2024
22aee1d
Including patch from Ryan
malhotrashivam Nov 19, 2024
c440d6c
Moved TableIdentifierComparator next to location key
malhotrashivam Nov 19, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ public class URITableLocationKey extends PartitionedTableLocationKey {
private static final String IMPLEMENTATION_NAME = URITableLocationKey.class.getSimpleName();

protected final URI uri;
private final int order;
protected final int order;

private int cachedHashCode;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import org.jetbrains.annotations.Nullable;

import java.net.URI;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -61,13 +62,16 @@ public abstract class IcebergBaseLayout implements TableLocationKeyFinder<Iceber
* The {@link ParquetInstructions} object that will be used to read any Parquet data files in this table. Only
* accessed while synchronized on {@code this}.
*/
ParquetInstructions parquetInstructions;
private ParquetInstructions parquetInstructions;

/**
* Create a new {@link IcebergTableLocationKey} for the given {@link DataFile} and {@link URI}.
malhotrashivam marked this conversation as resolved.
Show resolved Hide resolved
*/
protected IcebergTableLocationKey locationKey(
final org.apache.iceberg.FileFormat format,
final URI fileUri,
@NotNull final DataFile dataFile,
@NotNull final URI fileUri,
@Nullable final Map<String, Comparable<?>> partitions) {

final org.apache.iceberg.FileFormat format = dataFile.format();
if (format == org.apache.iceberg.FileFormat.PARQUET) {
if (parquetInstructions == null) {
// Start with user-supplied instructions (if provided).
Expand All @@ -78,7 +82,7 @@ protected IcebergTableLocationKey locationKey(

// Add any column rename mappings.
if (!instructions.columnRenames().isEmpty()) {
for (Map.Entry<String, String> entry : instructions.columnRenames().entrySet()) {
for (final Map.Entry<String, String> entry : instructions.columnRenames().entrySet()) {
builder.addColumnNameMapping(entry.getKey(), entry.getValue());
}
}
Expand All @@ -96,7 +100,7 @@ protected IcebergTableLocationKey locationKey(

parquetInstructions = builder.build();
}
return new IcebergTableParquetLocationKey(fileUri, 0, partitions, parquetInstructions);
return new IcebergTableParquetLocationKey(dataFile, fileUri, 0, partitions, parquetInstructions);
}
throw new UnsupportedOperationException(String.format("%s:%d - an unsupported file format %s for URI '%s'",
tableAdapter, snapshot.snapshotId(), format, fileUri));
Expand Down Expand Up @@ -140,6 +144,9 @@ public synchronized void findKeys(@NotNull final Consumer<IcebergTableLocationKe
try {
// Retrieve the manifest files from the snapshot
final List<ManifestFile> manifestFiles = snapshot.allManifests(table.io());
// Sort manifest files by sequence number to read data files in the correct commit order
malhotrashivam marked this conversation as resolved.
Show resolved Hide resolved
manifestFiles.sort(Comparator.comparingLong(ManifestFile::sequenceNumber));
// TODO(deephaven-core#5989): Add unit tests for the ordering of manifest files
malhotrashivam marked this conversation as resolved.
Show resolved Hide resolved
for (final ManifestFile manifestFile : manifestFiles) {
// Currently only can process manifest files with DATA content type.
if (manifestFile.content() != ManifestContent.DATA) {
Expand All @@ -148,10 +155,10 @@ public synchronized void findKeys(@NotNull final Consumer<IcebergTableLocationKe
table, snapshot.snapshotId(), manifestFile.content()));
}
try (final ManifestReader<DataFile> reader = ManifestFiles.read(manifestFile, table.io())) {
for (DataFile df : reader) {
final URI fileUri = dataFileUri(df);
for (final DataFile dataFile : reader) {
final URI fileUri = dataFileUri(dataFile);
final IcebergTableLocationKey locationKey =
cache.computeIfAbsent(fileUri, uri -> keyFromDataFile(df, fileUri));
cache.computeIfAbsent(fileUri, uri -> keyFromDataFile(dataFile, fileUri));
if (locationKey != null) {
locationKeyObserver.accept(locationKey);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ public String toString() {
}

@Override
IcebergTableLocationKey keyFromDataFile(DataFile df, URI fileUri) {
return locationKey(df.format(), fileUri, null);
IcebergTableLocationKey keyFromDataFile(final DataFile dataFile, final URI fileUri) {
return locationKey(dataFile, fileUri, null);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -80,10 +80,10 @@ public String toString() {
}

@Override
IcebergTableLocationKey keyFromDataFile(DataFile df, URI fileUri) {
IcebergTableLocationKey keyFromDataFile(@NotNull final DataFile dataFile, @NotNull final URI fileUri) {
final Map<String, Comparable<?>> partitions = new LinkedHashMap<>();

final PartitionData partitionData = (PartitionData) df.partition();
final PartitionData partitionData = (PartitionData) dataFile.partition();
for (final ColumnData colData : outputPartitioningColumns) {
final String colName = colData.name;
final Object colValue = partitionData.get(colData.index);
Expand All @@ -94,6 +94,6 @@ IcebergTableLocationKey keyFromDataFile(DataFile df, URI fileUri) {
}
partitions.put(colName, (Comparable<?>) colValue);
}
return locationKey(df.format(), fileUri, partitions);
return locationKey(dataFile, fileUri, partitions);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import io.deephaven.engine.table.impl.locations.TableLocationKey;
import io.deephaven.parquet.table.ParquetInstructions;
import io.deephaven.parquet.table.location.ParquetTableLocationKey;
import org.apache.iceberg.DataFile;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;

Expand All @@ -18,24 +19,36 @@
public class IcebergTableParquetLocationKey extends ParquetTableLocationKey implements IcebergTableLocationKey {
private static final String IMPLEMENTATION_NAME = IcebergTableParquetLocationKey.class.getSimpleName();

private final long dataSequenceNumber;
private final long fileSequenceNumber;
private final long pos;

private final ParquetInstructions readInstructions;

/**
* Construct a new IcebergTableParquetLocationKey for the supplied {@code fileUri} and {@code partitions}.
*
* @param fileUri The file that backs the keyed location
* @param dataFile The data file that backs the keyed location
* @param fileUri The {@link URI} for the file that backs the keyed location
* @param order Explicit ordering index, taking precedence over other fields
* @param partitions The table partitions enclosing the table location keyed by {@code this}. Note that if this
* parameter is {@code null}, the location will be a member of no partitions. An ordered copy of the map will
* be made, so the calling code is free to mutate the map after this call
* @param readInstructions the instructions for customizations while reading
*/
public IcebergTableParquetLocationKey(
@NotNull final DataFile dataFile,
@NotNull final URI fileUri,
final int order,
@Nullable final Map<String, Comparable<?>> partitions,
@NotNull final ParquetInstructions readInstructions) {
super(fileUri, order, partitions, readInstructions);

// Following are used for ordering of data files. Files with unknown sequence numbers should be ordered last.
dataSequenceNumber = dataFile.dataSequenceNumber() != null ? dataFile.dataSequenceNumber() : Long.MAX_VALUE;
fileSequenceNumber = dataFile.fileSequenceNumber() != null ? dataFile.fileSequenceNumber() : Long.MAX_VALUE;
pos = dataFile.pos() != null ? dataFile.pos() : Long.MAX_VALUE;

this.readInstructions = readInstructions;
}

Expand All @@ -48,4 +61,36 @@ public String getImplementationName() {
public ParquetInstructions readInstructions() {
return readInstructions;
}

/**
* Precedence-wise this implementation compares {@code order}, then {@code dataSequenceNumber}, then
malhotrashivam marked this conversation as resolved.
Show resolved Hide resolved
* {@code fileSequenceNumber}, then {@code pos}, then applies a {@link PartitionsComparator} to {@code partitions}
* and finally {@code uri}.
*
* @inheritDoc
*/
@Override
public int compareTo(@NotNull final TableLocationKey other) {
if (other instanceof IcebergTableParquetLocationKey) {
final IcebergTableParquetLocationKey otherTyped = (IcebergTableParquetLocationKey) other;
int comparisonResult;
if ((comparisonResult = Integer.compare(order, otherTyped.order)) != 0) {
return comparisonResult;
}
if ((comparisonResult = Long.compare(dataSequenceNumber, otherTyped.dataSequenceNumber)) != 0) {
return comparisonResult;
}
if ((comparisonResult = Long.compare(fileSequenceNumber, otherTyped.fileSequenceNumber)) != 0) {
return comparisonResult;
}
if ((comparisonResult = Long.compare(pos, otherTyped.pos)) != 0) {
return comparisonResult;
}
if ((comparisonResult = PartitionsComparator.INSTANCE.compare(partitions, otherTyped.partitions)) != 0) {
return comparisonResult;
}
return uri.compareTo(otherTyped.uri);
malhotrashivam marked this conversation as resolved.
Show resolved Hide resolved
}
throw new ClassCastException("Cannot compare " + getClass() + " to " + other.getClass());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,14 +29,16 @@ public static IcebergUpdateMode staticMode() {
}

/**
 * Set a manually-refreshing update mode for this table. The ordering of the data in the table will depend on the
 * order in which the data files are discovered on refresh.
 *
 * @return the shared manually-refreshing {@code IcebergUpdateMode} instance
 */
public static IcebergUpdateMode manualRefreshingMode() {
return MANUAL_REFRESHING;
}

/**
* Set a automatically-refreshing update mode for this table using the default refresh interval of 60 seconds.
* Set a automatically-refreshing update mode for this table using the default refresh interval of 60 seconds. The
* ordering of the data in the table will depend on the order in which the data files are discovered on refresh.
malhotrashivam marked this conversation as resolved.
Show resolved Hide resolved
*/
public static IcebergUpdateMode autoRefreshingMode() {
return AUTO_REFRESHING;
Expand Down