Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[GOBBLIN-1707] Enhance IcebergDataset to detect when files already at dest then proceed with only delta #3575

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
18 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,19 +17,26 @@

package org.apache.gobblin.data.management.copy.iceberg;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.net.URI;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;

import java.util.function.Function;
import javax.annotation.concurrent.NotThreadSafe;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;

import org.apache.gobblin.util.function.CheckedExceptionFunction;
import org.apache.gobblin.util.measurement.GrowthMilestoneTracker;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
Expand Down Expand Up @@ -61,6 +68,7 @@ public class IcebergDataset implements PrioritizedCopyableDataset {
private final IcebergTable icebergTable;
protected final Properties properties;
protected final FileSystem sourceFs;
private final boolean shouldTolerateMissingSourceFiles = true; // TODO: make parameterizable, if desired

private final Optional<URI> sourceCatalogMetastoreURI;
private final Optional<URI> targetCatalogMetastoreURI;
Expand Down Expand Up @@ -127,15 +135,14 @@ protected Iterator<FileSet<CopyEntity>> createFileSets(FileSystem targetFs, Copy
Collection<CopyEntity> generateCopyEntities(FileSystem targetFs, CopyConfiguration copyConfig) throws IOException {
String fileSet = this.getFileSetId();
List<CopyEntity> copyEntities = Lists.newArrayList();
Map<Path, FileStatus> pathToFileStatus = getFilePathsToFileStatus();
log.info("{}.{} - found {} candidate source paths", dbName, inputTableName, pathToFileStatus.size());
Map<Path, FileStatus> pathToFileStatus = getFilePathsToFileStatus(targetFs, copyConfig);
log.info("~{}.{}~ found {} candidate source paths", dbName, inputTableName, pathToFileStatus.size());

Configuration defaultHadoopConfiguration = new Configuration();
for (Map.Entry<Path, FileStatus> entry : pathToFileStatus.entrySet()) {
Path srcPath = entry.getKey();
FileStatus srcFileStatus = entry.getValue();
// TODO: determine whether unnecessarily expensive to repeatedly re-create what should be the same FS: could it
// instead be created once and reused thereafter?
// TODO: should be the same FS each time; try creating once, reusing thereafter, to not recreate wastefully
FileSystem actualSourceFs = getSourceFileSystemFromFileStatus(srcFileStatus, defaultHadoopConfiguration);

// TODO: Add preservation of ancestor ownership and permissions!
Expand All @@ -149,34 +156,151 @@ Collection<CopyEntity> generateCopyEntities(FileSystem targetFs, CopyConfigurati
fileEntity.setDestinationData(getDestinationDataset(targetFs));
copyEntities.add(fileEntity);
}
log.info("{}.{} - generated {} copy entities", dbName, inputTableName, copyEntities.size());
log.info("~{}.{}~ generated {} copy entities", dbName, inputTableName, copyEntities.size());
return copyEntities;
}

/**
* Finds all files of the Iceberg's current snapshot
* Returns a map of path, file status for each file that needs to be copied
* @return a map of path, file status for each file that needs to be copied
*/
protected Map<Path, FileStatus> getFilePathsToFileStatus() throws IOException {
Map<Path, FileStatus> result = Maps.newHashMap();
protected Map<Path, FileStatus> getFilePathsToFileStatus(FileSystem targetFs, CopyConfiguration copyConfig) throws IOException {
IcebergTable icebergTable = this.getIcebergTable();
/** @return whether `pathStr` is present on `targetFs`, caching results while tunneling checked exceptions outward */
Function<String, Boolean> isPresentOnTarget = CheckedExceptionFunction.wrapToTunneled(pathStr ->
// omit considering timestamp (or other markers of freshness), as files should be immutable
// ATTENTION: `CopyContext.getFileStatus()`, to partake in caching
copyConfig.getCopyContext().getFileStatus(targetFs, new Path(pathStr)).isPresent()
);

// check first for case of nothing to replicate, to avoid needless scanning of a potentially massive iceberg
IcebergSnapshotInfo currentSnapshotOverview = icebergTable.getCurrentSnapshotInfoOverviewOnly();
if (currentSnapshotOverview.getMetadataPath().map(isPresentOnTarget).orElse(false) &&
isPresentOnTarget.apply(currentSnapshotOverview.getManifestListPath())) {
log.info("~{}.{}~ skipping entire iceberg, since snapshot '{}' at '{}' and metadata '{}' both present on target",
dbName, inputTableName, currentSnapshotOverview.getSnapshotId(),
currentSnapshotOverview.getManifestListPath(),
currentSnapshotOverview.getMetadataPath().orElse("<<ERROR: MISSING!>>"));
return Maps.newHashMap();
}
Iterator<IcebergSnapshotInfo> icebergIncrementalSnapshotInfos = icebergTable.getIncrementalSnapshotInfosIterator();
Iterator<String> filePathsIterator = Iterators.concat(
Iterators.transform(icebergIncrementalSnapshotInfos, snapshotInfo -> {
// TODO: decide: is it too much to print for every snapshot--instead use `.debug`?
log.info("{}.{} - loaded snapshot '{}' from metadata path: '{}'", dbName, inputTableName,
snapshotInfo.getSnapshotId(), snapshotInfo.getMetadataPath().orElse("<<inherited>>"));
return snapshotInfo.getAllPaths().iterator();
// log each snapshot, for context, in case of `FileNotFoundException` during `FileSystem.getFileStatus()`
String manListPath = snapshotInfo.getManifestListPath();
log.info("~{}.{}~ loaded snapshot '{}' at '{}' from metadata path: '{}'", dbName, inputTableName,
snapshotInfo.getSnapshotId(), manListPath, snapshotInfo.getMetadataPath().orElse("<<inherited>>"));
// ALGO: an iceberg's files form a tree of four levels: metadata.json -> manifest-list -> manifest -> data;
// most critically, all are presumed immutable and uniquely named, although any may be replaced. we depend
// also on incremental copy being run always atomically: to commit each iceberg only upon its full success.
// thus established, the presence of a file at dest (identified by path/name) guarantees its entire subtree is
// already copied--and, given immutability, completion of a prior copy naturally renders that file up-to-date.
// hence, its entire subtree may be short-circuited. nevertheless, absence of a file at dest cannot imply
// its entire subtree necessarily requires copying, because it is possible, even likely in practice, that some
// metadata files would have been replaced (e.g. during snapshot compaction). in such instances, at least
// some of the children pointed to within could have been copied prior, when they previously appeared as a
// child of the current file's predecessor (which this new meta file now replaces).
if (!isPresentOnTarget.apply(manListPath)) {
List<String> missingPaths = snapshotInfo.getSnapshotApexPaths();
for (IcebergSnapshotInfo.ManifestFileInfo mfi : snapshotInfo.getManifestFiles()) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we add comment here to mention that only oldest snapshot which most likely already been copied contains the all old files, the mfi from newer snapshot only contain new added file in those manifest?

Copy link
Contributor Author

@phet phet Oct 7, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

those are definitely the semantics of using IcebergTable.getIncrementalSnapshotInfosIterator(). it's in the javadoc there. where do you want me to repeat it over here? maybe should I note those semantics above here when I call that method?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Or we can add more clarity in IcebergTable.getIncrementalSnapshotInfosIterator(), at least I was missing the assumption that it return the snapshot from oldest to latest. and files only appear once in earliest snapshot that contains it. Might be just me though...

Copy link
Contributor Author

@phet phet Oct 8, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sure thing. I added this:

This means the {@link IcebergSnapshotInfo#getManifestFiles()} for the (n+1)-th element of the iterator will omit all manifest files and listed data files, already reflected in a {@link IcebergSnapshotInfo#getManifestFiles()} from the n-th or prior elements. Given the order of the {@link Iterator<IcebergSnapshotInfo>} returned, this mirrors the snapshot-to-file dependencies: each file is returned exactly once with the (oldest) snapshot from which it first becomes reachable.

Only the final {@link IcebergSnapshotInfo#getMetadataPath()} is present (for the snapshot it itself deems current).

if (!isPresentOnTarget.apply(mfi.getManifestFilePath())) {
missingPaths.add(mfi.getManifestFilePath());
// being incremental info, no listed paths would have appeared prior w/ other snapshots, so add all now.
// skip verification despite corner case of a snapshot having reorganized/rebalanced manifest contents
// during a period where replication fell so far behind that no snapshots listed among current metadata
// are yet at dest. since the consequence of unnecessary copy is merely wasted data transfer and
// compute--and overall, potential is small--prefer sidestepping expense of exhaustive checking, since
// file count may run into 100k+ (even beyond!)
missingPaths.addAll(mfi.getListedFilePaths());
}
}
log.info("~{}.{}~ snapshot '{}': collected {} additional source paths",
dbName, inputTableName, snapshotInfo.getSnapshotId(), missingPaths.size());
return missingPaths.iterator();
} else {
log.info("~{}.{}~ snapshot '{}' already present on target... skipping (including contents)",
dbName, inputTableName, snapshotInfo.getSnapshotId());
// IMPORTANT: separately consider metadata path, to handle case of 'metadata-only' snapshot reusing mf-list
Optional<String> metadataPath = snapshotInfo.getMetadataPath();
Optional<String> nonReplicatedMetadataPath = metadataPath.filter(p -> !isPresentOnTarget.apply(p));
metadataPath.ifPresent(ignore ->
log.info("~{}.{}~ metadata IS {} already present on target", dbName, inputTableName,
nonReplicatedMetadataPath.isPresent() ? "NOT" : "ALSO")
);
return nonReplicatedMetadataPath.map(p -> Lists.newArrayList(p).iterator()).orElse(Collections.emptyIterator());
}
})
);

Map<Path, FileStatus> results = Maps.newHashMap();
long numSourceFilesNotFound = 0L;
Iterable<String> filePathsIterable = () -> filePathsIterator;
// TODO: investigate whether streaming initialization of `Map` preferable--`getFileStatus` network calls would
// likely benefit from parallelism
for (String pathString : filePathsIterable) {
Path path = new Path(pathString);
result.put(path, this.sourceFs.getFileStatus(path));
try {
// TODO: investigate whether streaming initialization of `Map` preferable--`getFileStatus()` network calls likely
// to benefit from parallelism
GrowthMilestoneTracker growthTracker = new GrowthMilestoneTracker();
PathErrorConsolidator errorConsolidator = new PathErrorConsolidator();
for (String pathString : filePathsIterable) {
Path path = new Path(pathString);
try {
results.put(path, this.sourceFs.getFileStatus(path));
if (growthTracker.isAnotherMilestone(results.size())) {
log.info("~{}.{}~ collected file status on '{}' source paths", dbName, inputTableName, results.size());
}
} catch (FileNotFoundException fnfe) {
if (!shouldTolerateMissingSourceFiles) {
throw fnfe;
} else {
// log, but otherwise swallow... to continue on
String total = ++numSourceFilesNotFound + " total";
String speculation = "either premature deletion broke time-travel or metadata read interleaved among delete";
errorConsolidator.prepLogMsg(path).ifPresent(msg ->
log.warn("~{}.{}~ source {} ({}... {})", dbName, inputTableName, msg, speculation, total)
);
}
}
}
} catch (CheckedExceptionFunction.WrappedIOException wrapper) {
wrapper.rethrowWrapped();
}
return results;
}

/**
* Stateful object to consolidate error messages (e.g. for logging), per a {@link Path} consolidation strategy.
* OVERVIEW: to avoid run-away logging into the 1000s of lines, consolidate to parent (directory) level:
* 1. on the first path within the dir, log that specific path
* 2. on the second path within the dir, log the dir path as a summarization (with ellipsis)
* 3. thereafter, skip, logging nothing
* The directory, parent path is the default consolidation strategy, yet may be overridden.
*/
@NotThreadSafe
protected static class PathErrorConsolidator {
private final Map<Path, Boolean> consolidatedPathToWhetherErrorLogged = Maps.newHashMap();

/** @return consolidated message to log, iff appropriate; else `Optional.empty()` when deserves inhibition */
public Optional<String> prepLogMsg(Path path) {
Path consolidatedPath = calcPathConsolidation(path);
Boolean hadAlreadyLoggedConsolidation = this.consolidatedPathToWhetherErrorLogged.get(consolidatedPath);
if (!Boolean.valueOf(true).equals(hadAlreadyLoggedConsolidation)) {
boolean shouldLogConsolidationNow = hadAlreadyLoggedConsolidation != null;
consolidatedPathToWhetherErrorLogged.put(consolidatedPath, shouldLogConsolidationNow);
String pathLogString = shouldLogConsolidationNow ? (consolidatedPath.toString() + "/...") : path.toString();
return Optional.of("path" + (shouldLogConsolidationNow ? "s" : " ") + " not found: '" + pathLogString + "'");
} else {
return Optional.empty();
}
}

/** @return a {@link Path} to consolidate around; default is: {@link Path#getParent()} */
protected Path calcPathConsolidation(Path path) {
return path.getParent();
}
return result;
}

/**
 * Factory for {@link PathErrorConsolidator}; `@VisibleForTesting` so tests may observe/substitute construction.
 * @return a fresh consolidator using the default (parent-directory) consolidation strategy
 */
@VisibleForTesting
static PathErrorConsolidator createPathErrorConsolidator() {
return new PathErrorConsolidator();
}

/** Add layer of indirection to permit test mocking by working around `FileSystem.get()` `static` method */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,9 +56,15 @@ public List<String> getAllDataFilePaths() {
return manifestFiles.stream().map(ManifestFileInfo::getListedFilePaths).flatMap(List::stream).collect(Collectors.toList());
}

public List<String> getAllPaths() {
/** @return the `manifestListPath` and `metadataPath`, if present */
public List<String> getSnapshotApexPaths() {
List<String> result = metadataPath.map(Lists::newArrayList).orElse(Lists.newArrayList());
result.add(manifestListPath);
return result;
}

public List<String> getAllPaths() {
List<String> result = getSnapshotApexPaths();
result.addAll(getManifestFilePaths());
result.addAll(getAllDataFilePaths());
return result;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,12 @@ public IcebergSnapshotInfo getCurrentSnapshotInfo() throws IOException {
return createSnapshotInfo(current.currentSnapshot(), Optional.of(current.metadataFileLocation()));
}

/** @return metadata info for most recent snapshot, wherein manifests and their child data files ARE NOT listed */
public IcebergSnapshotInfo getCurrentSnapshotInfoOverviewOnly() throws IOException {
  TableMetadata tableMetadata = accessTableMetadata();
  Snapshot latestSnapshot = tableMetadata.currentSnapshot();
  // `true` => skip (potentially expensive) manifest-file enumeration
  return createSnapshotInfo(latestSnapshot, Optional.of(tableMetadata.metadataFileLocation()), true);
}

/** @return metadata info for all known snapshots, ordered historically, with *most recent last* */
public Iterator<IcebergSnapshotInfo> getAllSnapshotInfosIterator() throws IOException {
TableMetadata current = accessTableMetadata();
Expand All @@ -90,23 +96,31 @@ public Iterator<IcebergSnapshotInfo> getAllSnapshotInfosIterator() throws IOExce
}

/**
* @return metadata info for all known snapshots, but incrementally, so overlapping entries within snapshots appear
* only with the first, as they're ordered historically, with *most recent last*.
*
* This means the {@link IcebergSnapshotInfo#getManifestFiles()} for the (n+1)-th element of the iterator will omit
* all manifest files and listed data files, already reflected in a {@link IcebergSnapshotInfo#getManifestFiles()}
* from the n-th or prior elements. Given the order of the {@link Iterator<IcebergSnapshotInfo>} returned, this
* mirrors the snapshot-to-file dependencies: each file is returned exactly once with the (oldest) snapshot from
* which it first becomes reachable.
*
* Only the final {@link IcebergSnapshotInfo#getMetadataPath()} is present (for the snapshot it itself deems current).
*/
public Iterator<IcebergSnapshotInfo> getIncrementalSnapshotInfosIterator() throws IOException {
// TODO: investigate using `.addedFiles()`, `.deletedFiles()` to calc this
Set<String> knownManifestListFilePaths = Sets.newHashSet();
Set<String> knownManifestFilePaths = Sets.newHashSet();
Set<String> knownListedFilePaths = Sets.newHashSet();
Set<String> knownFilePaths = Sets.newHashSet(); // as absolute paths are clearly unique, use a single set for all
return Iterators.filter(Iterators.transform(getAllSnapshotInfosIterator(), snapshotInfo -> {
if (false == knownManifestListFilePaths.add(snapshotInfo.getManifestListPath())) { // already known manifest list!
log.info("~{}~ before snapshot '{}' - '{}' total known iceberg paths",
tableId, snapshotInfo.getSnapshotId(), knownFilePaths.size());
if (false == knownFilePaths.add(snapshotInfo.getManifestListPath())) { // already known manifest list!
return snapshotInfo.toBuilder().manifestListPath(null).build(); // use `null` as marker to surrounding `filter`
}
List<IcebergSnapshotInfo.ManifestFileInfo> novelManifestInfos = Lists.newArrayList();
for (ManifestFileInfo mfi : snapshotInfo.getManifestFiles()) {
if (true == knownManifestFilePaths.add(mfi.getManifestFilePath())) { // heretofore unknown
if (true == knownFilePaths.add(mfi.getManifestFilePath())) { // heretofore unknown
List<String> novelListedPaths = mfi.getListedFilePaths().stream()
.filter(fpath -> true == knownListedFilePaths.add(fpath)) // heretofore unknown
.filter(fpath -> true == knownFilePaths.add(fpath)) // heretofore unknown
.collect(Collectors.toList());
if (novelListedPaths.size() == mfi.getListedFilePaths().size()) { // nothing filtered
novelManifestInfos.add(mfi); // reuse orig
Expand All @@ -130,15 +144,18 @@ protected TableMetadata accessTableMetadata() throws TableNotFoundException {
}

/**
 * Convenience overload: delegates with `skipManifestFileInfo == false`, so manifest files (and their listed
 * data files) ARE enumerated.
 */
protected IcebergSnapshotInfo createSnapshotInfo(Snapshot snapshot, Optional<String> metadataFileLocation) throws IOException {
return createSnapshotInfo(snapshot, metadataFileLocation, false);
}

protected IcebergSnapshotInfo createSnapshotInfo(Snapshot snapshot, Optional<String> metadataFileLocation, boolean skipManifestFileInfo) throws IOException {
// TODO: verify correctness, even when handling 'delete manifests'!
List<ManifestFile> manifests = snapshot.allManifests();
return new IcebergSnapshotInfo(
snapshot.snapshotId(),
Instant.ofEpochMilli(snapshot.timestampMillis()),
metadataFileLocation,
snapshot.manifestListLocation(),
// NOTE: unable to `.stream().map(m -> calcManifestFileInfo(m, tableOps.io()))` due to checked exception
calcAllManifestFileInfos(manifests, tableOps.io())
skipManifestFileInfo ? Lists.newArrayList() : calcAllManifestFileInfos(snapshot.allManifests(), tableOps.io())
);
}

Expand Down
Loading