Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Ensure data read order reflects commit sequence in Iceberg tables #6341

Merged
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
27a134c
Added ordering for discovery of manifest files in a snapshot
malhotrashivam Nov 7, 2024
5af1281
Merge branch 'main' into nightly/sm-ice-read-order
malhotrashivam Nov 7, 2024
4799ed5
Added ordering of Iceberg keys based on number of files discovered
malhotrashivam Nov 7, 2024
9a72894
Sorted data files on dataSequenceNumber while reading
malhotrashivam Nov 7, 2024
e9f2e66
Added data file ordering based on (dataSequenceNumber, fileSequenceNu…
malhotrashivam Nov 7, 2024
a0e209e
Review with Devin contd.
malhotrashivam Nov 8, 2024
3c116cb
Review with Devin Part 2
malhotrashivam Nov 8, 2024
b048c64
Added manifest sequence number for ordering of data files
malhotrashivam Nov 9, 2024
c9a4e86
Updated comments as per code review
malhotrashivam Nov 11, 2024
f42b602
Review with Ryan part 1
malhotrashivam Nov 12, 2024
0e477e0
Discussion with Devin Part 1
malhotrashivam Nov 12, 2024
968f769
Review with Devin Part 2
malhotrashivam Nov 14, 2024
9116c63
Review with Ryan contd.
malhotrashivam Nov 15, 2024
ad2bf0e
Merge branch 'main' into nightly/sm-ice-read-order
malhotrashivam Nov 15, 2024
9a69898
Changed signature of IcebergTableParquetLocationKey constructor
malhotrashivam Nov 15, 2024
0be2ff8
Removed an extra newline
malhotrashivam Nov 15, 2024
ceac1b2
Review with Ryan contd.
malhotrashivam Nov 18, 2024
814b96f
Using toString instead of name for tableIdentifier and renamed a vari…
malhotrashivam Nov 18, 2024
e358605
Updated PR based on review comments by Devin
malhotrashivam Nov 19, 2024
c994561
More changes to comparator logic
malhotrashivam Nov 19, 2024
7af3345
Some more changes
malhotrashivam Nov 19, 2024
503af9f
More review comments
malhotrashivam Nov 19, 2024
e762fbc
Including patch from Ryan
malhotrashivam Nov 19, 2024
22aee1d
Including patch from Ryan
malhotrashivam Nov 19, 2024
c440d6c
Moved TableIdentifierComparator next to location key
malhotrashivam Nov 19, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
import org.jetbrains.annotations.Nullable;

import java.net.URI;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -63,6 +65,16 @@ public abstract class IcebergBaseLayout implements TableLocationKeyFinder<Iceber
*/
ParquetInstructions parquetInstructions;

/**
* The number of files discovered, used for ordering the keys.
*/
private int fileCount;

/**
* Create a new {@link IcebergTableLocationKey} for the given file URI.
* <p>
* This method assumes that keys are created in a specific order and that each key is created only once.
malhotrashivam marked this conversation as resolved.
Show resolved Hide resolved
*/
protected IcebergTableLocationKey locationKey(
final org.apache.iceberg.FileFormat format,
final URI fileUri,
Expand Down Expand Up @@ -96,7 +108,7 @@ protected IcebergTableLocationKey locationKey(

parquetInstructions = builder.build();
}
return new IcebergTableParquetLocationKey(fileUri, 0, partitions, parquetInstructions);
return new IcebergTableParquetLocationKey(fileUri, fileCount++, partitions, parquetInstructions);
}
throw new UnsupportedOperationException(String.format("%s:%d - an unsupported file format %s for URI '%s'",
tableAdapter, snapshot.snapshotId(), format, fileUri));
Expand Down Expand Up @@ -137,9 +149,11 @@ public synchronized void findKeys(@NotNull final Consumer<IcebergTableLocationKe
return;
}
final Table table = tableAdapter.icebergTable();
final List<DataFile> dataFiles = new ArrayList<>();
try {
// Retrieve the manifest files from the snapshot
final List<ManifestFile> manifestFiles = snapshot.allManifests(table.io());
// TODO(deephaven-core#5989): Add unit tests for the ordering of manifest files
malhotrashivam marked this conversation as resolved.
Show resolved Hide resolved
for (final ManifestFile manifestFile : manifestFiles) {
// Currently can only process manifest files with DATA content type.
if (manifestFile.content() != ManifestContent.DATA) {
Expand All @@ -148,20 +162,28 @@ public synchronized void findKeys(@NotNull final Consumer<IcebergTableLocationKe
table, snapshot.snapshotId(), manifestFile.content()));
}
try (final ManifestReader<DataFile> reader = ManifestFiles.read(manifestFile, table.io())) {
for (DataFile df : reader) {
final URI fileUri = dataFileUri(df);
final IcebergTableLocationKey locationKey =
cache.computeIfAbsent(fileUri, uri -> keyFromDataFile(df, fileUri));
if (locationKey != null) {
locationKeyObserver.accept(locationKey);
}
}
reader.forEach(dataFiles::add);
}
}
} catch (final Exception e) {
throw new TableDataException(
String.format("%s:%d - error finding Iceberg locations", tableAdapter, snapshot.snapshotId()), e);
}

// Sort the collected data files so that they are read in the correct commit order
dataFiles.sort(Comparator
.comparingLong(DataFile::dataSequenceNumber)
.thenComparingLong(DataFile::fileSequenceNumber)
.thenComparingLong(DataFile::pos));
malhotrashivam marked this conversation as resolved.
Show resolved Hide resolved

for (final DataFile df : dataFiles) {
final URI fileUri = dataFileUri(df);
final IcebergTableLocationKey locationKey =
cache.computeIfAbsent(fileUri, uri -> keyFromDataFile(df, fileUri));
if (locationKey != null) {
locationKeyObserver.accept(locationKey);
}
malhotrashivam marked this conversation as resolved.
Show resolved Hide resolved
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,14 +29,16 @@ public static IcebergUpdateMode staticMode() {
}

/**
* Set a manually-refreshing update mode for this table.
* Set a manually-refreshing update mode for this table. The ordering of the data in the table will depend on the
* order in which the data files are discovered on refresh.
*/
public static IcebergUpdateMode manualRefreshingMode() {
// Hands back the MANUAL_REFRESHING constant, which is declared outside the visible portion of this file.
return MANUAL_REFRESHING;
}

/**
* Set an automatically-refreshing update mode for this table using the default refresh interval of 60 seconds.
* Set an automatically-refreshing update mode for this table using the default refresh interval of 60 seconds. The
* ordering of the data in the table will depend on the order in which the data files are discovered on refresh.
malhotrashivam marked this conversation as resolved.
Show resolved Hide resolved
*/
public static IcebergUpdateMode autoRefreshingMode() {
return AUTO_REFRESHING;
Expand Down