HBASE-26938 Compaction failures after StoreFileTracker integration
Compactions may run concurrently against a given store, and the Compactor is
shared among them. Do not put mutable state into shared class fields. All
Compactor class fields should be final or effectively final.
'keepSeqIdPeriod' is an exception to this rule because unit tests may set it.
apurtell committed Apr 13, 2022
1 parent 242a194 commit 8c003fb
Showing 13 changed files with 138 additions and 121 deletions.
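
The rule in the commit message is easiest to see as a minimal sketch. Everything below is illustrative, with hypothetical SketchCompactor, Request, Progress, and Writer types rather than the actual HBase classes: per-compaction state travels through method parameters or a map keyed by the request, while the shared instance keeps only immutable configuration.

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Minimal sketch of the pattern behind this commit; all names are hypothetical.
class SketchCompactor {
  // BAD (the old shape): a mutable field shared by concurrent compactions,
  // where one compaction could clobber another's writer.
  // private Writer writer;

  // OK: immutable configuration is safe to share across compactions.
  private final long maxCompactionSize = 1L << 30;

  // OK: per-request state lives in a concurrent map keyed by the request.
  private final Map<Request, Progress> progressMap = new ConcurrentHashMap<>();

  Progress createProgress(Request request) {
    Progress progress = new Progress();
    progressMap.put(request, progress);
    return progress;
  }

  // The writer and progress for THIS compaction arrive as parameters, so
  // concurrent calls cannot interfere with each other through shared fields.
  boolean performCompaction(Request request, Writer writer, Progress progress) {
    writer.append("cell");
    progress.compactedKVs++;
    return true;
  }

  Progress getProgress(Request request) {
    return progressMap.get(request);
  }

  // Called exactly once per request, when the caller is done with it.
  void completeCompaction(Request request) {
    progressMap.remove(request);
  }

  static final class Request {}
  static final class Progress { long compactedKVs; }
  static final class Writer { void append(String cell) { /* no-op sketch */ } }
}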
DefaultMobStoreCompactor.java
@@ -38,6 +38,7 @@
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.PrivateCellUtil;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.regionserver.CellSink;
import org.apache.hadoop.hbase.regionserver.HMobStore;
import org.apache.hadoop.hbase.regionserver.HStore;
import org.apache.hadoop.hbase.regionserver.HStoreFile;
@@ -51,6 +52,7 @@
import org.apache.hadoop.hbase.regionserver.StoreFileWriter;
import org.apache.hadoop.hbase.regionserver.StoreScanner;
import org.apache.hadoop.hbase.regionserver.compactions.CloseChecker;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionProgress;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequestImpl;
import org.apache.hadoop.hbase.regionserver.compactions.DefaultCompactor;
import org.apache.hadoop.hbase.regionserver.throttle.ThroughputControlUtil;
@@ -61,7 +63,6 @@
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.collect.HashMultimap;
import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableSetMultimap;
import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
@@ -285,17 +286,19 @@ private void calculateMobLengthMap(SetMultimap<TableName, String> mobRefs) throw
* </ol>
* @param fd File details
* @param scanner Where to read from.
* @param writer Where to write to.
* @param smallestReadPoint Smallest read point.
* @param cleanSeqId When true, remove seqId(used to be mvcc) value which is <= smallestReadPoint
* @param throughputController The compaction throughput controller.
* @param major Is a major compaction.
* @param numofFilesToCompact the number of files to compact
* @param progress Progress reporter.
* @return Whether compaction ended; false if it was interrupted for any reason.
*/
@Override
protected boolean performCompaction(FileDetails fd, InternalScanner scanner,
protected boolean performCompaction(FileDetails fd, InternalScanner scanner, CellSink writer,
long smallestReadPoint, boolean cleanSeqId, ThroughputController throughputController,
boolean major, int numofFilesToCompact) throws IOException {
boolean major, int numofFilesToCompact, CompactionProgress progress) throws IOException {
long bytesWrittenProgressForLog = 0;
long bytesWrittenProgressForShippedCall = 0;
// Clear old mob references
@@ -661,9 +664,8 @@ private void commitOrAbortMobWriter(StoreFileWriter mobFileWriter, long maxSeqId
}
}


@Override
protected List<Path> commitWriter(FileDetails fd,
protected List<Path> commitWriter(StoreFileWriter writer, FileDetails fd,
CompactionRequestImpl request) throws IOException {
List<Path> newFiles = Lists.newArrayList(writer.getPath());
writer.appendMetadata(fd.maxSeqId, request.isAllFiles(), request.getFiles());
HStore.java
@@ -156,8 +156,6 @@ public class HStore implements Store, HeapSize, StoreConfigInformation,
// rows that has cells from both memstore and files (or only files)
private LongAdder mixedRowReadsCount = new LongAdder();

private boolean cacheOnWriteLogged;

/**
* Lock specific to archiving compacted store files. This avoids races around
* the combination of retrieving the list of compacted files and moving them to
@@ -290,7 +288,6 @@ protected HStore(final HRegion region, final ColumnFamilyDescriptor family,
this, memstore.getClass().getSimpleName(), policyName, verifyBulkLoads,
parallelPutCountPrintThreshold, family.getDataBlockEncoding(),
family.getCompressionType());
cacheOnWriteLogged = false;
}

private StoreContext initializeStoreContext(ColumnFamilyDescriptor family) throws IOException {
@@ -1157,23 +1154,28 @@ protected List<HStoreFile> doCompaction(CompactionRequestImpl cr,
}
replaceStoreFiles(filesToCompact, sfs, true);

// This step is necessary for the correctness of BrokenStoreFileCleanerChore. It lets the
// CleanerChore know that compaction is done and the file can be cleaned up if compaction
// have failed.
storeEngine.resetCompactionWriter();

if (cr.isMajor()) {
majorCompactedCellsCount.addAndGet(getCompactionProgress().getTotalCompactingKVs());
majorCompactedCellsSize.addAndGet(getCompactionProgress().totalCompactedSize);
} else {
compactedCellsCount.addAndGet(getCompactionProgress().getTotalCompactingKVs());
compactedCellsSize.addAndGet(getCompactionProgress().totalCompactedSize);
// Compaction progress for the request will be removed after completeCompaction so be sure
// this code runs before you call completeCompaction.
CompactionProgress progress = storeEngine.getCompactor().getProgress(cr);
if (progress != null) {
if (cr.isMajor()) {
majorCompactedCellsCount.addAndGet(progress.getTotalCompactingKVs());
majorCompactedCellsSize.addAndGet(progress.totalCompactedSize);
} else {
compactedCellsCount.addAndGet(progress.getTotalCompactingKVs());
compactedCellsSize.addAndGet(progress.totalCompactedSize);
}
}
long outputBytes = getTotalSize(sfs);

// At this point the store will use new files for all new scanners.
refreshStoreSizeAndTotalBytes(); // update store size.

// This step is necessary for the correctness of BrokenStoreFileCleanerChore. It lets the
// CleanerChore know that compaction is done and the file can be cleaned up if compaction
// has failed.
storeEngine.completeCompaction(cr);

long now = EnvironmentEdgeManager.currentTime();
if (region.getRegionServerServices() != null
&& region.getRegionServerServices().getMetrics() != null) {
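The comment added to doCompaction above encodes an ordering constraint: completeCompaction drops the per-request progress, so the metrics must be read first, and the result of getProgress(cr) is null-checked because the entry may already be gone. A caller-side sketch of that ordering, reusing the hypothetical SketchCompactor types from the earlier example:

// Hypothetical caller mirroring the shape of doCompaction's accounting.
void accountThenComplete(SketchCompactor compactor, SketchCompactor.Request request,
    boolean major) {
  // 1. Read progress BEFORE completeCompaction removes it from the map.
  SketchCompactor.Progress progress = compactor.getProgress(request);
  if (progress != null) { // may be absent; never assume the entry exists
    long kvs = progress.compactedKVs;
    if (major) {
      // ... add kvs to the major-compaction counters ...
    } else {
      // ... add kvs to the minor-compaction counters ...
    }
  }
  // 2. Only now release the per-request state.
  compactor.completeCompaction(request);
}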
StoreEngine.java
@@ -94,7 +94,7 @@
*/
@InterfaceAudience.Private
public abstract class StoreEngine<SF extends StoreFlusher, CP extends CompactionPolicy,
C extends Compactor, SFM extends StoreFileManager> {
C extends Compactor<?>, SFM extends StoreFileManager> {

private static final Logger LOG = LoggerFactory.getLogger(StoreEngine.class);

@@ -157,7 +157,7 @@ public CompactionPolicy getCompactionPolicy() {
/**
* @return Compactor to use.
*/
public Compactor getCompactor() {
public Compactor<?> getCompactor() {
return this.compactor;
}

@@ -545,14 +545,15 @@ public boolean requireWritingToTmpDirFirst() {
}

/**
* Resets the compaction writer when the new file is committed and used as active storefile.
* This step is necessary for the correctness of BrokenStoreFileCleanerChore. It lets the
* CleanerChore know that compaction is done and the file can be cleaned up if compaction
* have failed. Currently called in
* Completes the compaction, cleaning up resources, once the new file is committed and used as
* active storefile. This step is necessary for the correctness of BrokenStoreFileCleanerChore.
* It lets the CleanerChore know that compaction is done and the file can be cleaned up if
* compaction has failed. Currently called in
* @param request the compaction request
* @see HStore#doCompaction(CompactionRequestImpl, Collection, User, long, List)
*/
public void resetCompactionWriter(){
compactor.resetWriter();
public void completeCompaction(CompactionRequestImpl request) {
compactor.completeCompaction(request);
}

@RestrictedApi(explanation = "Should only be called in TestHStore", link = "",
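A sketch of why completeCompaction matters for BrokenStoreFileCleanerChore, using a hypothetical tracker rather than the actual chore implementation: while a compaction is in flight its output files are protected from deletion, and completing the request lifts that protection, so leftovers from failed compactions become eligible for cleanup.

import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical tracker illustrating the contract, not the HBase implementation.
class SketchCompactionFileTracker {
  // Output files currently being written, keyed by the compaction request.
  private final Map<Object, Set<String>> inFlight = new ConcurrentHashMap<>();

  void onWritersCreated(Object request, Set<String> outputPaths) {
    inFlight.put(request, outputPaths);
  }

  // A cleaner chore would skip any file that a live compaction is writing.
  boolean isProtectedFromCleanup(String path) {
    return inFlight.values().stream().anyMatch(paths -> paths.contains(path));
  }

  // Invoked from completeCompaction: once the request is done, committed or
  // aborted, its leftover files are fair game for the cleaner.
  void completeCompaction(Object request) {
    inFlight.remove(request);
  }
}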
AbstractMultiOutputCompactor.java
@@ -68,7 +68,7 @@ public StoreFileWriter createWriterWithStoragePolicy(String fileStoragePolicy)
}

@Override
protected void abortWriter() throws IOException {
protected void abortWriter(AbstractMultiFileWriter writer) throws IOException {
FileSystem fs = store.getFileSystem();
for (Path leftoverFile : writer.abortWriters()) {
try {
@@ -79,7 +79,6 @@ protected void abortWriter() throws IOException {
e);
}
}
//this step signals that the target file is no longer writen and can be cleaned up
writer = null;
}

}
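
Passing the writer into abortWriter, rather than reading it from a shared field, means the abort path always tears down exactly the writer this compaction created. A sketch of the calling shape, again with the hypothetical types from the first example (the real method takes an AbstractMultiFileWriter):

// Hypothetical error-handling shape around a compaction writer.
void compactWithCleanup(SketchCompactor compactor, SketchCompactor.Request request,
    SketchCompactor.Writer writer, SketchCompactor.Progress progress) throws Exception {
  try {
    compactor.performCompaction(request, writer, progress);
    // ... commit the writer's output on success ...
  } catch (Exception e) {
    // Abort the writer we were handed; since no shared field is involved,
    // a concurrent compaction's writer can never be aborted by mistake.
    abortWriterSketch(writer);
    throw e;
  }
}

void abortWriterSketch(SketchCompactor.Writer writer) {
  // Delete this writer's leftover files. Nothing to null out, because the
  // writer never lived in a shared field.
}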
CompactionProgress.java
@@ -37,7 +37,7 @@ public class CompactionProgress {
private static final Logger LOG = LoggerFactory.getLogger(CompactionProgress.class);

/** the total compacting key values in currently running compaction */
private long totalCompactingKVs;
public long totalCompactingKVs;
/** the completed count of key values in currently running compaction */
public long currentCompactedKVs = 0;
/** the total size of data processed by the currently running compaction, in bytes */