Error inject mar13 #6

Open · wants to merge 4 commits into master
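This PR injects crash points across the Hudi write path to exercise metadata table (MDT) / data table (DT) consistency under process failures. Each injected killJVMIfDesired(...) call is gated by a signal file under /tmp and sits immediately before or after a critical transition — commit, clean, rollback, compaction, clustering, or archival — with separate *_mt_* and *_dt_* signal files for the metadata-table and data-table sides. As a minimal sketch (the class name is illustrative, not part of this change), a test arms a crash point by writing "true" into the corresponding signal file before running the workload:

import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;

public class ArmCrashPoint {
  public static void main(String[] args) throws Exception {
    // Arm the metadata-table pre-commit crash point; the injected
    // killJVMIfDesired(...) call reads this file and, when it says "true",
    // exits the JVM (subject to the 1/denom dice roll at that call site).
    Files.write(Paths.get("/tmp/fail4_mt_pre_commit.txt"),
        "true".getBytes(StandardCharsets.UTF_8));
  }
}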
@@ -94,6 +94,8 @@
import java.util.stream.Collectors;
import java.util.stream.Stream;

import static org.apache.hudi.common.util.FileIOUtils.killJVMIfDesired;

/**
* Abstract Write Client providing functionality for performing commit, index updates and rollback
* Reused for regular write operations like upsert/insert/bulk-insert.. as well as bootstrap
@@ -218,6 +220,7 @@ public boolean commitStats(String instantTime, List<HoodieWriteStat> stats, Opti
} finally {
this.txnManager.endTransaction(Option.of(inflightInstant));
}

// do this outside of the lock, since compaction and clustering can be time-consuming and we don't need a lock for the entire execution period
runTableServicesInline(table, metadata, extraMetadata);
emitCommitMetrics(instantTime, metadata, commitActionType);
@@ -281,9 +284,21 @@ protected void preCommit(HoodieInstant inflightInstant, HoodieCommitMetadata met
* @param metadata instance of {@link HoodieCommitMetadata}.
*/
protected void writeTableMetadata(HoodieTable table, String instantTime, String actionType, HoodieCommitMetadata metadata) {

if (config.getBasePath().contains(".hoodie/metadata")) {
killJVMIfDesired("/tmp/fail4_mt_pre_commit.txt", "Fail metadata table commit for " + instantTime + " " + actionType, 16);
} else {
killJVMIfDesired("/tmp/fail4_dt_pre_commit.txt", "Fail after metadata table commit/services before data table commit "
+ instantTime + " " + actionType, 16);
}

context.setJobStatus(this.getClass().getSimpleName(), "Committing to metadata table");
table.getMetadataWriter(instantTime).ifPresent(w -> ((HoodieTableMetadataWriter) w).update(metadata, instantTime,
table.isTableServiceAction(actionType)));
if (!config.getBasePath().contains(".hoodie/metadata")) {
killJVMIfDesired("/tmp/fail4_dt_post_commit.txt",
"Fail after metadata table commit/services before data table commit " + instantTime + " " + actionType, 16);
}
}

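The file above establishes the pattern repeated at every call site in this PR: the crash point is selected by whether config.getBasePath() points into the metadata table (a path containing ".hoodie/metadata"), and data-table call sites crash both before and after the metadata-table update, so either side of the two-table commit can be left dangling. A hypothetical condensation of that per-call-site pattern (the helper name is illustrative, not part of the change):

import org.apache.hudi.config.HoodieWriteConfig;

import static org.apache.hudi.common.util.FileIOUtils.killJVMIfDesired;

final class CrashPointHelper {
  // Pick the signal file by which table is being written, then maybe crash.
  static void crashPoint(HoodieWriteConfig config, String mtSignalFile,
      String dtSignalFile, String msg, int denom) {
    if (config.getBasePath().contains(".hoodie/metadata")) {
      killJVMIfDesired(mtSignalFile, "MDT: " + msg, denom); // metadata-table side
    } else {
      killJVMIfDesired(dtSignalFile, "DT: " + msg, denom); // data-table side
    }
  }
}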
@@ -469,6 +469,21 @@ private Stream<HoodieInstant> getInstantsToArchive() {
throw new HoodieException("Error limiting instant archival based on metadata table", e);
}
}

// If this is a metadata table, do not archive commits that are still in the
// data table's active timeline. The metadata table requires them;
// see HoodieTableMetadataUtil#processRollbackMetadata for details.
if (HoodieTableMetadata.isMetadataTable(config.getBasePath())) {
HoodieTableMetaClient dataMetaClient = HoodieTableMetaClient.builder()
.setBasePath(HoodieTableMetadata.getDataTableBasePathFromMetadataTable(config.getBasePath()))
.setConf(metaClient.getHadoopConf())
.build();
Option<String> earliestActiveDatasetCommit = dataMetaClient.getActiveTimeline().firstInstant().map(HoodieInstant::getTimestamp);
if (earliestActiveDatasetCommit.isPresent()) {
instants = instants.filter(instant ->
HoodieTimeline.compareTimestamps(instant.getTimestamp(), HoodieTimeline.LESSER_THAN, earliestActiveDatasetCommit.get()));
}
}

return instants.flatMap(hoodieInstant ->
groupByTsAction.get(Pair.of(hoodieInstant.getTimestamp(),
@@ -86,6 +86,7 @@
import java.util.stream.Collectors;

import static org.apache.hudi.common.table.HoodieTableConfig.ARCHIVELOG_FOLDER;
import static org.apache.hudi.common.util.FileIOUtils.killJVMIfDesired;
import static org.apache.hudi.metadata.HoodieTableMetadata.METADATA_TABLE_NAME_SUFFIX;
import static org.apache.hudi.metadata.HoodieTableMetadata.SOLO_COMMIT_TIMESTAMP;

@@ -819,6 +820,7 @@ protected void compactIfNecessary(BaseHoodieWriteClient writeClient, String inst
final String compactionInstantTime = latestDeltacommitTime + "001";
if (writeClient.scheduleCompactionAtInstant(compactionInstantTime, Option.empty())) {
writeClient.compact(compactionInstantTime);
killJVMIfDesired("/tmp/fail92_mt_write.txt", "Fail metadata table just after compaction " + compactionInstantTime, 4);
}
}

@@ -836,6 +838,8 @@ protected void cleanIfNecessary(BaseHoodieWriteClient writeClient, String instan
// 3 is a value that I think is enough for metadata table reader.
return;
}

killJVMIfDesired("/tmp/fail102_mt_write.txt", "Fail metadata table just before cleaning " + instantTime, 12);
// Trigger cleaning with suffixes based on the same instant time. This ensures that any future
// delta commits synced over will not have an instant time lesser than the last completed instant on the
// metadata table.
@@ -18,9 +18,6 @@

package org.apache.hudi.table.action.clean;

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

import org.apache.hudi.avro.model.HoodieActionInstant;
import org.apache.hudi.avro.model.HoodieCleanMetadata;
import org.apache.hudi.avro.model.HoodieCleanerPlan;
@@ -42,6 +39,8 @@
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.action.BaseActionExecutor;

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;

@@ -55,6 +54,8 @@
import java.util.stream.Collectors;
import java.util.stream.Stream;

import static org.apache.hudi.common.util.FileIOUtils.killJVMIfDesired;

public class CleanActionExecutor<T extends HoodieRecordPayload, I, K, O> extends BaseActionExecutor<T, I, K, O, HoodieCleanMetadata> {

private static final long serialVersionUID = 1L;
@@ -208,7 +209,16 @@ private HoodieCleanMetadata runClean(HoodieTable<T, I, K, O> table, HoodieInstan
if (!skipLocking) {
this.txnManager.beginTransaction(Option.empty(), Option.empty());
}
if (config.getBasePath().contains(".hoodie/metadata")) {
killJVMIfDesired("/tmp/fail32_mt_clean.txt", "Fail metadata table cleaning " + instantTime, 8);
} else {
killJVMIfDesired("/tmp/fail32_dt_clean.txt", "Fail data table cleaning before applying to MDT " + instantTime, 8);
}
writeTableMetadata(metadata, inflightInstant.getTimestamp());
if (!config.getBasePath().contains(".hoodie/metadata")) {
killJVMIfDesired("/tmp/fail32_dt_clean.txt", "Fail data table cleaning after applying to MDT, but before completing in DT "
+ instantTime, 8);
}
table.getActiveTimeline().transitionCleanInflightToComplete(inflightInstant,
TimelineMetadataUtils.serializeCleanMetadata(metadata));
LOG.info("Marked clean started on " + inflightInstant.getTimestamp() + " as complete");
@@ -50,6 +50,8 @@
import java.util.Objects;
import java.util.stream.Collectors;

import static org.apache.hudi.common.util.FileIOUtils.killJVMIfDesired;

public abstract class BaseRollbackActionExecutor<T extends HoodieRecordPayload, I, K, O> extends BaseActionExecutor<T, I, K, O, HoodieRollbackMetadata> {

private static final Logger LOG = LogManager.getLogger(BaseRollbackActionExecutor.class);
@@ -241,7 +243,17 @@ protected void finishRollback(HoodieInstant inflightInstant, HoodieRollbackMetad
if (!skipLocking) {
this.txnManager.beginTransaction(Option.empty(), Option.empty());
}

if (config.getBasePath().contains(".hoodie/metadata")) {
killJVMIfDesired("/tmp/fail72_mt_rollback.txt", "Fail metadata rollback for " + instantToRollback.toString(), 8);
} else {
killJVMIfDesired("/tmp/fail72_dt_rollback.txt", "Fail data table rollback just before writing to MDT " + instantToRollback.toString(), 8);
}
writeTableMetadata(rollbackMetadata);
if (!config.getBasePath().contains(".hoodie/metadata")) {
killJVMIfDesired("/tmp/fail72_dt_rollback.txt", "Fail data table rollback after writing to MDT, before completing in DT "
+ instantToRollback.toString(), 8);
}
table.getActiveTimeline().transitionRollbackInflightToComplete(inflightInstant,
TimelineMetadataUtils.serializeRollbackMetadata(rollbackMetadata));
LOG.info("Rollback of Commits " + rollbackMetadata.getCommitsRollback() + " is complete");
@@ -69,6 +69,8 @@
import java.util.Map;
import java.util.stream.Collectors;

import static org.apache.hudi.common.util.FileIOUtils.killJVMIfDesired;

@SuppressWarnings("checkstyle:LineLength")
public class SparkRDDWriteClient<T extends HoodieRecordPayload> extends
BaseHoodieWriteClient<T, JavaRDD<HoodieRecord<T>>, JavaRDD<HoodieKey>, JavaRDD<WriteStatus>> {
@@ -307,11 +309,23 @@ protected void completeCompaction(HoodieCommitMetadata metadata,
List<HoodieWriteStat> writeStats = metadata.getWriteStats();
final HoodieInstant compactionInstant = new HoodieInstant(HoodieInstant.State.INFLIGHT, HoodieTimeline.COMPACTION_ACTION, compactionCommitTime);
try {
if (!basePath.contains(".hoodie/metadata")) {
killJVMIfDesired("/tmp/fail82_dt_compaction.txt", "Fail data table compaction before applying to MDT " + compactionCommitTime, 4);
}

this.txnManager.beginTransaction(Option.of(compactionInstant), Option.empty());
finalizeWrite(table, compactionCommitTime, writeStats);
// commit to data table after committing to metadata table.
updateTableMetadata(table, metadata, compactionInstant);
LOG.info("Committing Compaction " + compactionCommitTime + ". Finished with result " + metadata);

if (basePath.contains(".hoodie/metadata")) {
killJVMIfDesired("/tmp/fail82_mt_compaction.txt", "Fail metadata table compaction " + compactionCommitTime, 4);
} else {
killJVMIfDesired("/tmp/fail82_dt_compaction.txt", "Fail data table compaction after applying to MDT, but before completing in DT "
+ compactionCommitTime, 4);
}

CompactHelpers.getInstance().completeInflightCompaction(table, compactionCommitTime, metadata);
} finally {
this.txnManager.endTransaction(Option.of(compactionInstant));
@@ -384,6 +398,9 @@ private void completeClustering(HoodieReplaceCommitMetadata metadata,

final HoodieInstant clusteringInstant = new HoodieInstant(HoodieInstant.State.INFLIGHT, HoodieTimeline.REPLACE_COMMIT_ACTION, clusteringCommitTime);
try {
if (!basePath.contains(".hoodie/metadata")) {
killJVMIfDesired("/tmp/fail81_dt_clustering.txt", "Fail data table clustering before applying to MDT " + clusteringCommitTime, 4);
}
this.txnManager.beginTransaction(Option.of(clusteringInstant), Option.empty());

finalizeWrite(table, clusteringCommitTime, writeStats);
@@ -395,6 +412,13 @@

LOG.info("Committing Clustering " + clusteringCommitTime + ". Finished with result " + metadata);

if (basePath.contains(".hoodie/metadata")) {
killJVMIfDesired("/tmp/fail81_mt_clustering.txt", "Fail metadata table clustering " + clusteringCommitTime, 4);
} else {
killJVMIfDesired("/tmp/fail81_dt_clustering.txt", "Fail data table clustering after applying to MDT, but before completing in DT "
+ clusteringCommitTime, 4);
}

table.getActiveTimeline().transitionReplaceInflightToComplete(
HoodieTimeline.getReplaceCommitInflightInstant(clusteringCommitTime),
Option.of(metadata.toJsonString().getBytes(StandardCharsets.UTF_8)));
@@ -44,6 +44,8 @@
import java.util.List;
import java.util.Map;

import static org.apache.hudi.common.util.FileIOUtils.killJVMIfDesired;

public class SparkHoodieBackedTableMetadataWriter extends HoodieBackedTableMetadataWriter {

private static final Logger LOG = LogManager.getLogger(SparkHoodieBackedTableMetadataWriter.class);
@@ -162,6 +164,7 @@ protected void commit(String instantTime, Map<MetadataPartitionType, HoodieData<
metadataMetaClient.reloadActiveTimeline();
if (canTriggerTableService) {
cleanIfNecessary(writeClient, instantTime);
killJVMIfDesired("/tmp/fail112_mt_write.txt", "Fail metadata table just before archival " + instantTime, 12);
writeClient.archive();
}
}
@@ -81,6 +81,7 @@

import static org.apache.hudi.common.util.ClusteringUtils.getAllFileGroupsInPendingClusteringPlans;
import static org.apache.hudi.common.util.FileIOUtils.killJVMIfDesired;
import static org.apache.hudi.config.HoodieWriteConfig.WRITE_STATUS_STORAGE_LEVEL_VALUE;

public abstract class BaseSparkCommitActionExecutor<T extends HoodieRecordPayload> extends
BaseCommitActionExecutor<T, HoodieData<HoodieRecord<T>>, HoodieData<HoodieKey>, HoodieData<WriteStatus>, HoodieWriteMetadata<HoodieData<WriteStatus>>> {
@@ -263,6 +264,11 @@ protected HoodieData<WriteStatus> updateIndex(HoodieData<WriteStatus> writeStatu
protected void updateIndexAndCommitIfNeeded(HoodieData<WriteStatus> writeStatusRDD, HoodieWriteMetadata<HoodieData<WriteStatus>> result) {
updateIndex(writeStatusRDD, result);
result.setPartitionToReplaceFileIds(getPartitionToReplacedFileIds(result));
if (config.getBasePath().contains(".hoodie/metadata")) {
killJVMIfDesired("/tmp/fail2_mt_write.txt", "Fail metadata table writing before commit " + instantTime, 8);
} else {
killJVMIfDesired("/tmp/fail1_dt_write.txt", "Fail data table writing before commit " + instantTime, 8);
}
commitOnAutoCommit(result);
}

@@ -30,6 +30,7 @@
import java.io.ByteArrayOutputStream;
import java.io.Closeable;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
@@ -42,6 +43,7 @@
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Random;
import java.util.stream.Collectors;

/**
@@ -204,4 +206,38 @@ public static Option<byte[]> readDataFromPath(FileSystem fileSystem, org.apache.
public static Option<byte[]> readDataFromPath(FileSystem fileSystem, org.apache.hadoop.fs.Path detailPath) {
return readDataFromPath(fileSystem, detailPath, false);
}

/**
 * Kills the JVM if the signal file at {@code signalFilePath} exists and contains "true".
 *
 * @param signalFilePath path of the signal file that arms this crash point
 * @param msg            reason to log before exiting
 */
public static void killJVMIfDesired(String signalFilePath, String msg) {
  if (!new File(signalFilePath).exists()) {
    return; // crash point not armed
  }
  try (FileInputStream fis = new FileInputStream(signalFilePath)) {
    if (Boolean.parseBoolean(FileIOUtils.readAsUTFString(fis).trim())) {
      System.out.println("Killing the JVM for signal file " + signalFilePath + ". Reason: " + msg);
      System.exit(1);
    }
  } catch (Exception e) {
    System.err.println(">>> Error evaluating kill signal file " + signalFilePath);
    e.printStackTrace();
  }
}

/**
 * Kills the JVM with probability 1/denom if the signal file at {@code signalFilePath}
 * exists and contains "true".
 *
 * @param signalFilePath path of the signal file that arms this crash point
 * @param msg            reason to log before exiting
 * @param denom          denominator of the kill probability; each armed call exits with probability 1/denom
 */
public static void killJVMIfDesired(String signalFilePath, String msg, int denom) {
  if (!new File(signalFilePath).exists()) {
    return; // crash point not armed
  }
  try (FileInputStream fis = new FileInputStream(signalFilePath)) {
    final boolean armed = Boolean.parseBoolean(FileIOUtils.readAsUTFString(fis).trim());
    // Roll the dice only when the signal file arms this crash point.
    if (armed && new Random().nextInt(denom) == 0) {
      System.out.println("Killing the JVM for signal file " + signalFilePath + ". Reason: " + msg);
      System.exit(1);
    }
  } catch (Exception e) {
    System.err.println(">>> Error evaluating kill signal file " + signalFilePath);
    e.printStackTrace();
  }
}
}
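For reference, a hedged usage sketch of the two overloads (the file path and class name are illustrative; this assumes the probabilistic overload honors the signal file the same way the deterministic one does): the two-argument form kills whenever the armed file reads "true", while the three-argument form also rolls a die, so a long-running job dies at roughly one in denom armed invocations and the crash site varies between runs.

import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;

import static org.apache.hudi.common.util.FileIOUtils.killJVMIfDesired;

public class KillSwitchDemo {
  public static void main(String[] args) throws Exception {
    Files.write(Paths.get("/tmp/demo_kill.txt"), "true".getBytes(StandardCharsets.UTF_8));

    // Probabilistic form: each armed iteration exits with probability 1/8,
    // so this loop survives about 8 iterations on average before the JVM dies.
    for (int i = 0; i < 100; i++) {
      killJVMIfDesired("/tmp/demo_kill.txt", "probabilistic crash, iteration " + i, 8);
    }

    // Deterministic form: with the file still armed, this call would exit the
    // JVM unconditionally (it is almost never reached given the loop above).
    killJVMIfDesired("/tmp/demo_kill.txt", "deterministic crash point");
  }
}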