Spark Stage retry handling
bvaradar authored and vinothchandar committed May 21, 2019
1 parent 3fd2fd6 commit 145034c
Showing 53 changed files with 1,660 additions and 963 deletions. (Only the first few files of the diff are reproduced in this excerpt.)
@@ -29,6 +29,7 @@
import com.uber.hoodie.common.model.HoodieLogFile;
import com.uber.hoodie.common.table.HoodieTableMetaClient;
import com.uber.hoodie.common.table.HoodieTimeline;
+import com.uber.hoodie.common.table.log.HoodieLogFormat;
import com.uber.hoodie.common.table.timeline.HoodieInstant;
import com.uber.hoodie.common.table.timeline.HoodieInstant.State;
import com.uber.hoodie.common.table.view.HoodieTableFileSystemView;
@@ -245,7 +246,7 @@ protected static List<Pair<HoodieLogFile, HoodieLogFile>> getRenamingActionsToAl
"Expect new log version to be sane");
HoodieLogFile newLogFile = new HoodieLogFile(new Path(lf.getPath().getParent(),
FSUtils.makeLogFileName(lf.getFileId(), "." + FSUtils.getFileExtensionFromLog(lf.getPath()),
-        compactionInstant, lf.getLogVersion() - maxVersion)));
+        compactionInstant, lf.getLogVersion() - maxVersion, HoodieLogFormat.UNKNOWN_WRITE_TOKEN)));
return Pair.of(lf, newLogFile);
}).collect(Collectors.toList());
}
@@ -436,7 +437,7 @@ public List<Pair<HoodieLogFile, HoodieLogFile>> getRenamingActionsForUnschedulin
.filter(fs -> fs.getFileId().equals(operation.getFileId())).findFirst().get();
List<HoodieLogFile> logFilesToRepair =
merged.getLogFiles().filter(lf -> lf.getBaseCommitTime().equals(compactionInstant))
-    .sorted(HoodieLogFile.getBaseInstantAndLogVersionComparator().reversed())
+    .sorted(HoodieLogFile.getLogFileComparator())
.collect(Collectors.toList());
FileSlice fileSliceForCompaction =
fileSystemView.getLatestFileSlicesBeforeOrOn(operation.getPartitionPath(), operation.getBaseInstantTime())
@@ -451,7 +452,7 @@ public List<Pair<HoodieLogFile, HoodieLogFile>> getRenamingActionsForUnschedulin
for (HoodieLogFile toRepair : logFilesToRepair) {
int version = maxUsedVersion + 1;
HoodieLogFile newLf = new HoodieLogFile(new Path(parentPath, FSUtils.makeLogFileName(operation.getFileId(),
-    logExtn, operation.getBaseInstantTime(), version)));
+    logExtn, operation.getBaseInstantTime(), version, HoodieLogFormat.UNKNOWN_WRITE_TOKEN)));
result.add(Pair.of(toRepair, newLf));
maxUsedVersion = version;
}
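Both hunks in this file thread a new write-token argument into FSUtils.makeLogFileName. The token exists for stage-retry safety: when Spark reruns a stage, two task attempts may write the same logical log file, and a token derived from the task attempt keeps their outputs distinguishable on storage. These admin/repair paths run outside any Spark task, so they pass HoodieLogFormat.UNKNOWN_WRITE_TOKEN as a placeholder. A minimal sketch of the naming scheme this assumes — separators, field order, and the token value "1-0-1" are illustrative, not copied from FSUtils:

```java
// Illustrative log-file naming with a write token appended; the exact layout
// in FSUtils may differ. "1-0-1" stands in for a placeholder token like
// HoodieLogFormat.UNKNOWN_WRITE_TOKEN.
public final class LogFileNameSketch {

  static String makeLogFileName(String fileId, String logFileExtension,
      String baseCommitTime, int version, String writeToken) {
    // e.g. ".file-1_20190521153000.log.2_1-0-1"
    return "." + fileId + "_" + baseCommitTime + logFileExtension + "." + version + "_" + writeToken;
  }

  public static void main(String[] args) {
    System.out.println(makeLogFileName("file-1", ".log", "20190521153000", 2, "1-0-1"));
  }
}
```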
57 changes: 37 additions & 20 deletions hoodie-client/src/main/java/com/uber/hoodie/HoodieWriteClient.java
@@ -72,6 +72,7 @@
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;
+import java.util.stream.IntStream;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.spark.Partitioner;
@@ -333,20 +334,27 @@ private JavaRDD<WriteStatus> bulkInsertInternal(JavaRDD<HoodieRecord<T>> deduped
String commitTime, HoodieTable<T> table,
Option<UserDefinedBulkInsertPartitioner> bulkInsertPartitioner) {
final JavaRDD<HoodieRecord<T>> repartitionedRecords;
+final int parallelism = config.getBulkInsertShuffleParallelism();
if (bulkInsertPartitioner.isDefined()) {
repartitionedRecords = bulkInsertPartitioner.get()
-    .repartitionRecords(dedupedRecords, config.getBulkInsertShuffleParallelism());
+    .repartitionRecords(dedupedRecords, parallelism);
} else {
// Now, sort the records and line them up nicely for loading.
repartitionedRecords = dedupedRecords.sortBy(record -> {
// Let's use "partitionPath + key" as the sort key. Spark will ensure
// the records split evenly across RDD partitions, such that small partitions fit
// into 1 RDD partition, while big ones spread evenly across multiple RDD partitions
return String.format("%s+%s", record.getPartitionPath(), record.getRecordKey());
-}, true, config.getBulkInsertShuffleParallelism());
+}, true, parallelism);
}

+// generate new file ID prefixes for each output partition
+final List<String> fileIDPrefixes = IntStream.range(0, parallelism)
+    .mapToObj(i -> FSUtils.createNewFileIdPfx())
+    .collect(Collectors.toList());

JavaRDD<WriteStatus> writeStatusRDD = repartitionedRecords
-    .mapPartitionsWithIndex(new BulkInsertMapFunction<T>(commitTime, config, table), true)
+    .mapPartitionsWithIndex(new BulkInsertMapFunction<T>(commitTime, config, table, fileIDPrefixes), true)
.flatMap(writeStatuses -> writeStatuses.iterator());

return updateIndexAndCommitIfNeeded(writeStatusRDD, table, commitTime);
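Generating the file-ID prefixes on the driver, before any task runs, is the heart of the retry fix for bulk insert: if each executor task invented its own file ID, a retried stage would write under fresh IDs and leave untracked files behind. With a fixed per-partition list, every attempt of partition i resolves to the same prefix. A self-contained sketch of the pattern, with createNewFileIdPfx() assumed to be a UUID-style generator:

```java
import java.util.List;
import java.util.UUID;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

// Driver-side prefix generation mirroring the diff above; the generator
// body is an assumption, not the actual FSUtils implementation.
public final class FileIdPrefixSketch {

  static String createNewFileIdPfx() {
    return UUID.randomUUID().toString();
  }

  public static void main(String[] args) {
    int parallelism = 4;
    // One prefix per output partition, fixed on the driver. A Spark task for
    // partition i always uses prefixes.get(i), so a stage retry regenerates
    // the same file names instead of minting fresh ones per attempt.
    List<String> prefixes = IntStream.range(0, parallelism)
        .mapToObj(i -> createNewFileIdPfx())
        .collect(Collectors.toList());
    prefixes.forEach(System.out::println);
  }
}
```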
@@ -498,7 +506,7 @@ private boolean commit(String commitTime, JavaRDD<WriteStatus> writeStatuses,
updateMetadataAndRollingStats(actionType, metadata, stats);

// Finalize write
-final Timer.Context finalizeCtx = metrics.getFinalizeCtx();
-try {
-  table.finalizeWrite(jsc, stats);
-  if (finalizeCtx != null) {
-    Optional<Long> durationInMs = Optional.of(metrics.getDurationInMs(finalizeCtx.stop()));
-    durationInMs.ifPresent(duration -> {
-      logger.info("Finalize write elapsed time (milliseconds): " + duration);
-      metrics.updateFinalizeWriteMetrics(duration, stats.size());
-    });
-  }
-} catch (HoodieIOException ioe) {
-  throw new HoodieCommitException(
-      "Failed to complete commit " + commitTime + " due to finalize errors.", ioe);
-}
+finalizeWrite(table, commitTime, stats);

// add in extra metadata
if (extraMetadata.isPresent()) {
@@ -1270,7 +1265,7 @@ protected void commitCompaction(JavaRDD<WriteStatus> compactedStatuses, HoodieTa
String compactionCommitTime, boolean autoCommit, Optional<Map<String, String>> extraMetadata) {
if (autoCommit) {
HoodieCommitMetadata metadata =
-      doCompactionCommit(compactedStatuses, table.getMetaClient(), compactionCommitTime, extraMetadata);
+      doCompactionCommit(table, compactedStatuses, compactionCommitTime, extraMetadata);
if (compactionTimer != null) {
long durationInMs = metrics.getDurationInMs(compactionTimer.stop());
try {
@@ -1288,6 +1283,23 @@ protected void commitCompaction(JavaRDD<WriteStatus> compactedStatuses, HoodieTa
}
}

+private void finalizeWrite(HoodieTable<T> table, String instantTime, List<HoodieWriteStat> stats) {
+  try {
+    final Timer.Context finalizeCtx = metrics.getFinalizeCtx();
+    table.finalizeWrite(jsc, instantTime, stats);
+    if (finalizeCtx != null) {
+      Optional<Long> durationInMs = Optional.of(metrics.getDurationInMs(finalizeCtx.stop()));
+      durationInMs.ifPresent(duration -> {
+        logger.info("Finalize write elapsed time (milliseconds): " + duration);
+        metrics.updateFinalizeWriteMetrics(duration, stats.size());
+      });
+    }
+  } catch (HoodieIOException ioe) {
+    throw new HoodieCommitException(
+        "Failed to complete commit " + instantTime + " due to finalize errors.", ioe);
+  }
+}
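The extracted helper now receives the instant time along with the stats, which is exactly what a reconciliation pass needs: compare what the successful task attempts reported for that instant against what is actually on storage, and delete the strays left by failed or retried attempts. The real logic lives in HoodieTable and is not part of this excerpt; the following is a hypothetical sketch of such a pass, with all names and the filtering rule assumed:

```java
import java.io.IOException;
import java.util.Set;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

// Hypothetical reconciliation pass -- NOT the actual HoodieTable.finalizeWrite.
// expectedFileNames would come from the collected HoodieWriteStat list.
class FinalizeWriteSketch {

  void reconcilePartition(FileSystem fs, Path partitionPath, String instantTime,
      Set<String> expectedFileNames) throws IOException {
    // Assumed rule: data files written for this instant embed it in their name.
    FileStatus[] candidates = fs.listStatus(partitionPath,
        path -> path.getName().contains(instantTime));
    for (FileStatus candidate : candidates) {
      if (!expectedFileNames.contains(candidate.getPath().getName())) {
        // Stray output from a failed/duplicate task attempt -> remove it.
        fs.delete(candidate.getPath(), false);
      }
    }
  }
}
```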

/**
* Rollback failed compactions. Inflight rollbacks for compactions revert the .inflight file to the .requested file
*
@@ -1301,8 +1313,9 @@ void rollbackInflightCompaction(HoodieInstant inflightInstant, HoodieTable table
table.getActiveTimeline().revertCompactionInflightToRequested(inflightInstant);
}

-private HoodieCommitMetadata doCompactionCommit(JavaRDD<WriteStatus> writeStatuses,
-    HoodieTableMetaClient metaClient, String compactionCommitTime, Optional<Map<String, String>> extraMetadata) {
+private HoodieCommitMetadata doCompactionCommit(HoodieTable<T> table, JavaRDD<WriteStatus> writeStatuses,
+    String compactionCommitTime, Optional<Map<String, String>> extraMetadata) {
+  HoodieTableMetaClient metaClient = table.getMetaClient();
List<HoodieWriteStat> updateStatusMap = writeStatuses.map(WriteStatus::getStat)
.collect();

@@ -1311,6 +1324,10 @@ private HoodieCommitMetadata doCompactionCommit(JavaRDD<WriteStatus> writeStatus
metadata.addWriteStat(stat.getPartitionPath(), stat);
}

+// Finalize write
+List<HoodieWriteStat> stats = writeStatuses.map(WriteStatus::getStat).collect();
+finalizeWrite(table, compactionCommitTime, stats);

// Copy extraMetadata
extraMetadata.ifPresent(m -> {
m.entrySet().stream().forEach(e -> {
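Threading the whole HoodieTable into doCompactionCommit lets the compaction path reuse the same finalizeWrite helper, so stray files from retried compaction tasks are reconciled, and any consistency failure surfaces, before the compaction commit becomes visible on the timeline.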
@@ -62,19 +62,26 @@ public class HoodieWriteConfig extends DefaultHoodieConfig {
private static final String DEFAULT_ASSUME_DATE_PARTITIONING = "false";
private static final String HOODIE_WRITE_STATUS_CLASS_PROP = "hoodie.writestatus.class";
private static final String DEFAULT_HOODIE_WRITE_STATUS_CLASS = WriteStatus.class.getName();
-private static final String HOODIE_COPYONWRITE_USE_TEMP_FOLDER_CREATE =
-    "hoodie.copyonwrite.use" + ".temp.folder.for.create";
-private static final String DEFAULT_HOODIE_COPYONWRITE_USE_TEMP_FOLDER_CREATE = "false";
-private static final String HOODIE_COPYONWRITE_USE_TEMP_FOLDER_MERGE =
-    "hoodie.copyonwrite.use" + ".temp.folder.for.merge";
-private static final String DEFAULT_HOODIE_COPYONWRITE_USE_TEMP_FOLDER_MERGE = "false";
private static final String FINALIZE_WRITE_PARALLELISM = "hoodie.finalize.write.parallelism";
private static final String DEFAULT_FINALIZE_WRITE_PARALLELISM = DEFAULT_PARALLELISM;
-private static final String CONSISTENCY_CHECK_ENABLED = "hoodie.consistency.check.enabled";
+private static final String CONSISTENCY_CHECK_ENABLED_PROP = "hoodie.consistency.check.enabled";
private static final String DEFAULT_CONSISTENCY_CHECK_ENABLED = "false";
private static final String EMBEDDED_TIMELINE_SERVER_ENABLED = "hoodie.embed.timeline.server";
private static final String DEFAULT_EMBEDDED_TIMELINE_SERVER_ENABLED = "false";

+// time between successive attempts to ensure written data's metadata is consistent on storage
+private static final String INITIAL_CONSISTENCY_CHECK_INTERVAL_MS_PROP =
+    "hoodie.consistency.check.initial_interval_ms";
+private static long DEFAULT_INITIAL_CONSISTENCY_CHECK_INTERVAL_MS = 2000L;
+
+// maximum interval (ms) between successive consistency checks
+private static final String MAX_CONSISTENCY_CHECK_INTERVAL_MS_PROP = "hoodie.consistency.check.max_interval_ms";
+private static long DEFAULT_MAX_CONSISTENCY_CHECK_INTERVAL_MS = 300000L;
+
+// maximum number of consistency checks for written data; will wait up to ~256 secs in total
+private static final String MAX_CONSISTENCY_CHECKS_PROP = "hoodie.consistency.check.max_checks";
+private static int DEFAULT_MAX_CONSISTENCY_CHECKS = 7;

// Hoodie Write Client transparently rewrites File System View config when embedded mode is enabled
// We keep track of original config and rewritten config
private final FileSystemViewStorageConfig clientSpecifiedViewStorageConfig;
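The "~256 secs" in the comment follows from the defaults above, assuming the check interval doubles on each attempt (which the initial/max pair suggests): sleeping 2 s, 4 s, 8 s, ..., 128 s over the seven checks totals 2·(2⁷ − 1) = 254 s, roughly 256 s, and the 300 s cap on the interval is never reached at these settings.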
@@ -148,29 +155,28 @@ public String getWriteStatusClassName() {
return props.getProperty(HOODIE_WRITE_STATUS_CLASS_PROP);
}

-public boolean shouldUseTempFolderForCopyOnWriteForCreate() {
-  return Boolean.parseBoolean(props.getProperty(HOODIE_COPYONWRITE_USE_TEMP_FOLDER_CREATE));
+public int getFinalizeWriteParallelism() {
+  return Integer.parseInt(props.getProperty(FINALIZE_WRITE_PARALLELISM));
}

-public boolean shouldUseTempFolderForCopyOnWriteForMerge() {
-  return Boolean.parseBoolean(props.getProperty(HOODIE_COPYONWRITE_USE_TEMP_FOLDER_MERGE));
+public boolean isConsistencyCheckEnabled() {
+  return Boolean.parseBoolean(props.getProperty(CONSISTENCY_CHECK_ENABLED_PROP));
}

-public boolean shouldUseTempFolderForCopyOnWrite() {
-  return shouldUseTempFolderForCopyOnWriteForCreate()
-      || shouldUseTempFolderForCopyOnWriteForMerge();
+public boolean isEmbeddedTimelineServerEnabled() {
+  return Boolean.parseBoolean(props.getProperty(EMBEDDED_TIMELINE_SERVER_ENABLED));
}

-public int getFinalizeWriteParallelism() {
-  return Integer.parseInt(props.getProperty(FINALIZE_WRITE_PARALLELISM));
+public int getMaxConsistencyChecks() {
+  return Integer.parseInt(props.getProperty(MAX_CONSISTENCY_CHECKS_PROP));
}

-public boolean isConsistencyCheckEnabled() {
-  return Boolean.parseBoolean(props.getProperty(CONSISTENCY_CHECK_ENABLED));
+public int getInitialConsistencyCheckIntervalMs() {
+  return Integer.parseInt(props.getProperty(INITIAL_CONSISTENCY_CHECK_INTERVAL_MS_PROP));
}

-public boolean isEmbeddedTimelineServerEnabled() {
-  return Boolean.parseBoolean(props.getProperty(EMBEDDED_TIMELINE_SERVER_ENABLED));
+public int getMaxConsistencyCheckIntervalMs() {
+  return Integer.parseInt(props.getProperty(MAX_CONSISTENCY_CHECK_INTERVAL_MS_PROP));
}
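Together these getters describe an exponential-backoff probe for eventually consistent storage such as S3: start at the initial interval, double up to the maximum interval, and give up after the configured number of checks. A minimal sketch of such a loop, assuming a simple exists() probe — the client's actual check may differ:

```java
import java.io.IOException;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

// Sketch of an eventual-consistency wait driven by the new config knobs;
// illustrative only -- the real check in the write client may differ.
class ConsistencyCheckSketch {

  boolean waitUntilVisible(FileSystem fs, Path file, HoodieWriteConfig config)
      throws IOException, InterruptedException {
    if (!config.isConsistencyCheckEnabled()) {
      return true; // checks disabled: trust the filesystem
    }
    long intervalMs = config.getInitialConsistencyCheckIntervalMs();
    for (int check = 0; check < config.getMaxConsistencyChecks(); check++) {
      if (fs.exists(file)) {
        return true; // the written file is visible on storage
      }
      Thread.sleep(intervalMs); // back off exponentially between checks
      intervalMs = Math.min(intervalMs * 2, config.getMaxConsistencyCheckIntervalMs());
    }
    return false; // caller should fail the write rather than commit blindly
  }
}
```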

/**
@@ -588,20 +594,6 @@ public Builder withWriteStatusClass(Class<? extends WriteStatus> writeStatusClas
return this;
}

-public Builder withUseTempFolderCopyOnWriteForCreate(
-    boolean shouldUseTempFolderCopyOnWriteForCreate) {
-  props.setProperty(HOODIE_COPYONWRITE_USE_TEMP_FOLDER_CREATE,
-      String.valueOf(shouldUseTempFolderCopyOnWriteForCreate));
-  return this;
-}
-
-public Builder withUseTempFolderCopyOnWriteForMerge(
-    boolean shouldUseTempFolderCopyOnWriteForMerge) {
-  props.setProperty(HOODIE_COPYONWRITE_USE_TEMP_FOLDER_MERGE,
-      String.valueOf(shouldUseTempFolderCopyOnWriteForMerge));
-  return this;
-}

public Builder withFileSystemViewConfig(FileSystemViewStorageConfig viewStorageConfig) {
props.putAll(viewStorageConfig.getProps());
isViewConfigSet = true;
@@ -614,7 +606,7 @@ public Builder withFinalizeWriteParallelism(int parallelism) {
}

public Builder withConsistencyCheckEnabled(boolean enabled) {
-  props.setProperty(CONSISTENCY_CHECK_ENABLED, String.valueOf(enabled));
+  props.setProperty(CONSISTENCY_CHECK_ENABLED_PROP, String.valueOf(enabled));
return this;
}

@@ -623,6 +615,21 @@ public Builder withEmbeddedTimelineServerEnabled(boolean enabled) {
return this;
}

+public Builder withInitialConsistencyCheckIntervalMs(int initialIntervalMs) {
+  props.setProperty(INITIAL_CONSISTENCY_CHECK_INTERVAL_MS_PROP, String.valueOf(initialIntervalMs));
+  return this;
+}
+
+public Builder withMaxConsistencyCheckIntervalMs(int maxIntervalMs) {
+  props.setProperty(MAX_CONSISTENCY_CHECK_INTERVAL_MS_PROP, String.valueOf(maxIntervalMs));
+  return this;
+}
+
+public Builder withMaxConsistencyChecks(int maxConsistencyChecks) {
+  props.setProperty(MAX_CONSISTENCY_CHECKS_PROP, String.valueOf(maxConsistencyChecks));
+  return this;
+}

public HoodieWriteConfig build() {
// Check for mandatory properties
setDefaultOnCondition(props, !props.containsKey(INSERT_PARALLELISM), INSERT_PARALLELISM,
@@ -643,18 +650,18 @@ public HoodieWriteConfig build() {
HOODIE_ASSUME_DATE_PARTITIONING_PROP, DEFAULT_ASSUME_DATE_PARTITIONING);
setDefaultOnCondition(props, !props.containsKey(HOODIE_WRITE_STATUS_CLASS_PROP),
HOODIE_WRITE_STATUS_CLASS_PROP, DEFAULT_HOODIE_WRITE_STATUS_CLASS);
-setDefaultOnCondition(props, !props.containsKey(HOODIE_COPYONWRITE_USE_TEMP_FOLDER_CREATE),
-    HOODIE_COPYONWRITE_USE_TEMP_FOLDER_CREATE,
-    DEFAULT_HOODIE_COPYONWRITE_USE_TEMP_FOLDER_CREATE);
-setDefaultOnCondition(props, !props.containsKey(HOODIE_COPYONWRITE_USE_TEMP_FOLDER_MERGE),
-    HOODIE_COPYONWRITE_USE_TEMP_FOLDER_MERGE,
-    DEFAULT_HOODIE_COPYONWRITE_USE_TEMP_FOLDER_MERGE);
setDefaultOnCondition(props, !props.containsKey(FINALIZE_WRITE_PARALLELISM),
FINALIZE_WRITE_PARALLELISM, DEFAULT_FINALIZE_WRITE_PARALLELISM);
-setDefaultOnCondition(props, !props.containsKey(CONSISTENCY_CHECK_ENABLED),
-    CONSISTENCY_CHECK_ENABLED, DEFAULT_CONSISTENCY_CHECK_ENABLED);
+setDefaultOnCondition(props, !props.containsKey(CONSISTENCY_CHECK_ENABLED_PROP),
+    CONSISTENCY_CHECK_ENABLED_PROP, DEFAULT_CONSISTENCY_CHECK_ENABLED);
setDefaultOnCondition(props, !props.containsKey(EMBEDDED_TIMELINE_SERVER_ENABLED),
EMBEDDED_TIMELINE_SERVER_ENABLED, DEFAULT_EMBEDDED_TIMELINE_SERVER_ENABLED);
+setDefaultOnCondition(props, !props.containsKey(INITIAL_CONSISTENCY_CHECK_INTERVAL_MS_PROP),
+    INITIAL_CONSISTENCY_CHECK_INTERVAL_MS_PROP, String.valueOf(DEFAULT_INITIAL_CONSISTENCY_CHECK_INTERVAL_MS));
+setDefaultOnCondition(props, !props.containsKey(MAX_CONSISTENCY_CHECK_INTERVAL_MS_PROP),
+    MAX_CONSISTENCY_CHECK_INTERVAL_MS_PROP, String.valueOf(DEFAULT_MAX_CONSISTENCY_CHECK_INTERVAL_MS));
+setDefaultOnCondition(props, !props.containsKey(MAX_CONSISTENCY_CHECKS_PROP),
+    MAX_CONSISTENCY_CHECKS_PROP, String.valueOf(DEFAULT_MAX_CONSISTENCY_CHECKS));

// Make sure the props is propagated
setDefaultOnCondition(props, !isIndexConfigSet,
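Taken together, the new builder methods let callers opt in to retry-safe commits. A hypothetical configuration wiring the knobs together — the values shown are the defaults, the usual newBuilder() factory is assumed, and other mandatory write-config properties are omitted:

```java
// Illustrative only: enables consistency checking with the default backoff
// schedule. Other required write-config properties are omitted for brevity.
HoodieWriteConfig config = HoodieWriteConfig.newBuilder()
    .withConsistencyCheckEnabled(true)
    .withInitialConsistencyCheckIntervalMs(2000)   // first wait: 2s
    .withMaxConsistencyCheckIntervalMs(300000)     // cap between checks: 5min
    .withMaxConsistencyChecks(7)                   // then give up
    .build();
```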
@@ -35,17 +35,19 @@ public class BulkInsertMapFunction<T extends HoodieRecordPayload> implements
private String commitTime;
private HoodieWriteConfig config;
private HoodieTable<T> hoodieTable;
+private List<String> fileIDPrefixes;

public BulkInsertMapFunction(String commitTime, HoodieWriteConfig config,
-    HoodieTable<T> hoodieTable) {
+    HoodieTable<T> hoodieTable, List<String> fileIDPrefixes) {
this.commitTime = commitTime;
this.config = config;
this.hoodieTable = hoodieTable;
+  this.fileIDPrefixes = fileIDPrefixes;
}

@Override
-public Iterator<List<WriteStatus>> call(Integer partition,
-    Iterator<HoodieRecord<T>> sortedRecordItr) throws Exception {
-  return new CopyOnWriteLazyInsertIterable<>(sortedRecordItr, config, commitTime, hoodieTable);
+public Iterator<List<WriteStatus>> call(Integer partition, Iterator<HoodieRecord<T>> sortedRecordItr) {
+  return new CopyOnWriteLazyInsertIterable<>(sortedRecordItr, config, commitTime, hoodieTable,
+      fileIDPrefixes.get(partition));
}
}
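With the prefix list passed in, the file ID a task writes under is a pure function of its partition index rather than of the attempt, which is what makes a stage retry idempotent. A toy illustration of that property, with all values made up:

```java
import java.util.Arrays;
import java.util.List;

// Toy demo: both "attempts" of partition 2 resolve to the same prefix,
// so they target the same file names. All values are illustrative.
public class StablePrefixDemo {
  public static void main(String[] args) {
    List<String> fileIDPrefixes = Arrays.asList("uuid-p0", "uuid-p1", "uuid-p2");
    int partition = 2;
    String firstAttempt = fileIDPrefixes.get(partition);    // original task
    String retryAttempt = fileIDPrefixes.get(partition);    // stage-retry task
    System.out.println(firstAttempt.equals(retryAttempt));  // true
  }
}
```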