HBASE-26938 Compaction failures after StoreFileTracker integration #4338

Closed · wants to merge 1 commit
@@ -38,6 +38,7 @@
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.PrivateCellUtil;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.regionserver.CellSink;
import org.apache.hadoop.hbase.regionserver.HMobStore;
import org.apache.hadoop.hbase.regionserver.HStore;
import org.apache.hadoop.hbase.regionserver.HStoreFile;
@@ -51,6 +52,7 @@
import org.apache.hadoop.hbase.regionserver.StoreFileWriter;
import org.apache.hadoop.hbase.regionserver.StoreScanner;
import org.apache.hadoop.hbase.regionserver.compactions.CloseChecker;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionProgress;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequestImpl;
import org.apache.hadoop.hbase.regionserver.compactions.DefaultCompactor;
import org.apache.hadoop.hbase.regionserver.throttle.ThroughputControlUtil;
@@ -61,7 +63,6 @@
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.collect.HashMultimap;
import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableSetMultimap;
import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
@@ -285,17 +286,19 @@ private void calculateMobLengthMap(SetMultimap<TableName, String> mobRefs) throw
* </ol>
* @param fd File details
* @param scanner Where to read from.
* @param writer Where to write to.
* @param smallestReadPoint Smallest read point.
* @param cleanSeqId When true, remove seqId(used to be mvcc) value which is <= smallestReadPoint
* @param throughputController The compaction throughput controller.
* @param major Is a major compaction.
* @param numofFilesToCompact the number of files to compact
* @param progress Progress reporter.
* @return Whether compaction ended; false if it was interrupted for any reason.
*/
@Override
protected boolean performCompaction(FileDetails fd, InternalScanner scanner,
protected boolean performCompaction(FileDetails fd, InternalScanner scanner, CellSink writer,
long smallestReadPoint, boolean cleanSeqId, ThroughputController throughputController,
boolean major, int numofFilesToCompact) throws IOException {
boolean major, int numofFilesToCompact, CompactionProgress progress) throws IOException {
long bytesWrittenProgressForLog = 0;
long bytesWrittenProgressForShippedCall = 0;
// Clear old mob references
@@ -661,9 +664,8 @@ private void commitOrAbortMobWriter(StoreFileWriter mobFileWriter, long maxSeqId
}
}


@Override
protected List<Path> commitWriter(FileDetails fd,
protected List<Path> commitWriter(StoreFileWriter writer, FileDetails fd,
CompactionRequestImpl request) throws IOException {
List<Path> newFiles = Lists.newArrayList(writer.getPath());
writer.appendMetadata(fd.maxSeqId, request.isAllFiles(), request.getFiles());
@@ -156,8 +156,6 @@ public class HStore implements Store, HeapSize, StoreConfigInformation,
// rows that has cells from both memstore and files (or only files)
private LongAdder mixedRowReadsCount = new LongAdder();

private boolean cacheOnWriteLogged;

/**
* Lock specific to archiving compacted store files. This avoids races around
* the combination of retrieving the list of compacted files and moving them to
@@ -290,7 +288,6 @@ protected HStore(final HRegion region, final ColumnFamilyDescriptor family,
this, memstore.getClass().getSimpleName(), policyName, verifyBulkLoads,
parallelPutCountPrintThreshold, family.getDataBlockEncoding(),
family.getCompressionType());
cacheOnWriteLogged = false;
}

private StoreContext initializeStoreContext(ColumnFamilyDescriptor family) throws IOException {
@@ -1157,23 +1154,28 @@ protected List<HStoreFile> doCompaction(CompactionRequestImpl cr,
}
replaceStoreFiles(filesToCompact, sfs, true);

// This step is necessary for the correctness of BrokenStoreFileCleanerChore. It lets the
// CleanerChore know that compaction is done and the file can be cleaned up if compaction
// have failed.
storeEngine.resetCompactionWriter();
Contributor:

This is not needed any more?

Contributor (Author) @apurtell, Apr 12, 2022:

We can leave it in the API, but the current implementation only sets the writer field to null; once that field is converted into a parameter the method bodies become empty and have nothing left to do, so I removed it.

If we are going to keep it, we need to use a Map instead of a Set to track the writers, and we would somehow need to pass a key as a parameter in order to abort and remove a StoreFileWriter instance. What should the key be? The CompactionRequestImpl? I do not think there is a requirement to abort compaction writers in this way. We abort compactions today by interrupting the thread.

If BrokenStoreFileCleanerChore will not function correctly without this, then it will need modification.

I think BrokenStoreFileCleanerChore sees the same results from getCompactionTargets after these changes. When the compaction is finished, the StoreFileWriter is removed from the set in the finally block of compact, so getCompactionTargets will not include the files being written by that writer after that point. That is the same thing that happened in the previous implementation, where resetCompactionWriter set the writer field to null and the files being written by that writer no longer appeared in getCompactionTargets results. But the timing has changed, that is true.
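To make the tracking scheme described in this comment concrete, here is a minimal sketch; the class, field, and helper names are illustrative assumptions, not the code in this patch. It shows a compactor keeping in-flight writers in a set, exposing their output paths for BrokenStoreFileCleanerChore, and removing the writer in the finally block of compact, which is exactly the timing the follow-up comments debate.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.regionserver.StoreFileWriter;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequestImpl;

// Simplified sketch of set-based writer tracking; names are assumptions, not the patch itself.
public abstract class WriterTrackingCompactorSketch {

  // Writers for in-flight compactions on this store; concurrent because several
  // compactions may run at once.
  private final Set<StoreFileWriter> writers = ConcurrentHashMap.newKeySet();

  public List<Path> compact(CompactionRequestImpl request) throws IOException {
    StoreFileWriter writer = createWriter(request);
    writers.add(writer);
    try {
      performCompaction(writer, request);
      return commitWriter(writer, request);
    } finally {
      // As described in this comment, the writer leaves the set when the compaction
      // attempt finishes; the follow-up comments discuss whether this must instead
      // wait until the new files are committed to the StoreFileTracker.
      writers.remove(writer);
    }
  }

  // BrokenStoreFileCleanerChore consults this so it does not delete files that are
  // still being written by an in-flight compaction.
  public List<Path> getCompactionTargets() {
    List<Path> targets = new ArrayList<>();
    for (StoreFileWriter w : writers) {
      targets.add(w.getPath());
    }
    return targets;
  }

  protected abstract StoreFileWriter createWriter(CompactionRequestImpl request) throws IOException;

  protected abstract void performCompaction(StoreFileWriter writer, CompactionRequestImpl request)
    throws IOException;

  protected abstract List<Path> commitWriter(StoreFileWriter writer, CompactionRequestImpl request)
    throws IOException;
}
```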

Contributor:

Logically we need this to keep correctness. IIRC, the problem here is that we can only clean up the writer instance after we have successfully committed the store files to the SFT, i.e., after the replaceStoreFile method. That is why we cannot simply remove the writer instance in commitWriter; otherwise there could be data loss, i.e., the BrokenStoreFileCleanerChore may delete store files which have just been written but have not yet been added to the SFT...

Let me check again if the new implementation can solve the problem.
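A small, hypothetical outline of the ordering constraint raised above; the interfaces and method names are stand-ins for illustration, not the real HStore/StoreEngine API, but they show why the StoreFileTracker commit must happen before the writer is dropped from tracking.

```java
import java.io.IOException;
import java.util.List;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.regionserver.HStoreFile;

// Hypothetical ordering sketch, not the actual doCompaction body.
final class CommitBeforeUntrackSketch {

  interface FileTrackerCommit {
    // Stand-in for committing the new files to the StoreFileTracker (replaceStoreFiles).
    List<HStoreFile> commit(List<Path> newFiles) throws IOException;
  }

  interface WriterTracking {
    // Stand-in for storeEngine.completeCompaction(request): stop reporting the
    // writer's files as in-flight compaction targets.
    void completeCompaction();
  }

  static List<HStoreFile> finish(List<Path> newFiles, FileTrackerCommit tracker,
    WriterTracking tracking) throws IOException {
    // 1. Commit the freshly written files to the StoreFileTracker first.
    List<HStoreFile> committed = tracker.commit(newFiles);
    // 2. Only after the commit is it safe to drop the writer from tracking. If the
    //    order were reversed, BrokenStoreFileCleanerChore could see files that are
    //    neither tracked by the SFT nor reported as compaction targets and delete
    //    them, losing freshly compacted data.
    tracking.completeCompaction();
    return committed;
  }
}
```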

Contributor (Author):

Thanks for that, you know BrokenStoreFileCleanerChore best.

Contributor (Author):

Ah, actually there is even a problem here in the current code, let me fix it...

Contributor (Author):

Fixed, at least now we do not remove the writer from the set until after commit.


if (cr.isMajor()) {
majorCompactedCellsCount.addAndGet(getCompactionProgress().getTotalCompactingKVs());
majorCompactedCellsSize.addAndGet(getCompactionProgress().totalCompactedSize);
} else {
compactedCellsCount.addAndGet(getCompactionProgress().getTotalCompactingKVs());
compactedCellsSize.addAndGet(getCompactionProgress().totalCompactedSize);
// Compaction progress for the request will be removed after completeCompaction so be sure
// this code runs before you call completeCompaction.
CompactionProgress progress = storeEngine.getCompactor().getProgress(cr);
if (progress != null) {
if (cr.isMajor()) {
majorCompactedCellsCount.addAndGet(progress.getTotalCompactingKVs());
majorCompactedCellsSize.addAndGet(progress.totalCompactedSize);
} else {
compactedCellsCount.addAndGet(progress.getTotalCompactingKVs());
compactedCellsSize.addAndGet(progress.totalCompactedSize);
}
}
long outputBytes = getTotalSize(sfs);

// At this point the store will use new files for all new scanners.
refreshStoreSizeAndTotalBytes(); // update store size.

// This step is necessary for the correctness of BrokenStoreFileCleanerChore. It lets the
// CleanerChore know that compaction is done and the file can be cleaned up if compaction
// has failed.
storeEngine.completeCompaction(cr);

long now = EnvironmentEdgeManager.currentTime();
if (region.getRegionServerServices() != null
&& region.getRegionServerServices().getMetrics() != null) {
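The doCompaction change above reads progress through getProgress(cr) and notes that the entry disappears once completeCompaction runs. One plausible, assumed, and simplified backing structure for that per-request lookup is a concurrent map keyed by the compaction request; the real Compactor in this patch may differ in detail.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import org.apache.hadoop.hbase.regionserver.compactions.CompactionProgress;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequestImpl;

// Assumed model of per-request progress tracking, for illustration only.
final class PerRequestProgressSketch {

  private final Map<CompactionRequestImpl, CompactionProgress> progressByRequest =
    new ConcurrentHashMap<>();

  // Called when a compaction starts for the request.
  void register(CompactionRequestImpl request, CompactionProgress progress) {
    progressByRequest.put(request, progress);
  }

  // Mirrors the null-checked lookup in doCompaction: returns null once the entry
  // has been removed.
  CompactionProgress getProgress(CompactionRequestImpl request) {
    return progressByRequest.get(request);
  }

  // Called from completeCompaction; afterwards getProgress(request) returns null,
  // which is why the metrics above must be read before completing the compaction.
  void completeCompaction(CompactionRequestImpl request) {
    progressByRequest.remove(request);
  }
}
```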
@@ -94,7 +94,7 @@
*/
@InterfaceAudience.Private
public abstract class StoreEngine<SF extends StoreFlusher, CP extends CompactionPolicy,
C extends Compactor, SFM extends StoreFileManager> {
C extends Compactor<?>, SFM extends StoreFileManager> {

private static final Logger LOG = LoggerFactory.getLogger(StoreEngine.class);

@@ -157,7 +157,7 @@ public CompactionPolicy getCompactionPolicy() {
/**
* @return Compactor to use.
*/
public Compactor getCompactor() {
public Compactor<?> getCompactor() {
return this.compactor;
}

@@ -545,14 +545,15 @@ public boolean requireWritingToTmpDirFirst() {
}

/**
* Resets the compaction writer when the new file is committed and used as active storefile.
* This step is necessary for the correctness of BrokenStoreFileCleanerChore. It lets the
* CleanerChore know that compaction is done and the file can be cleaned up if compaction
* have failed. Currently called in
* Completes the compaction, cleaning up resources, once the new file is committed and used as
* active storefile. This step is necessary for the correctness of BrokenStoreFileCleanerChore.
* It lets the CleanerChore know that compaction is done and the file can be cleaned up if
* compaction has failed. Currently called in
* @param request the compaction request
* @see HStore#doCompaction(CompactionRequestImpl, Collection, User, long, List)
*/
public void resetCompactionWriter(){
compactor.resetWriter();
public void completeCompaction(CompactionRequestImpl request) {
compactor.completeCompaction(request);
}

@RestrictedApi(explanation = "Should only be called in TestHStore", link = "",
@@ -68,7 +68,7 @@ public StoreFileWriter createWriterWithStoragePolicy(String fileStoragePolicy)
}

@Override
protected void abortWriter() throws IOException {
protected void abortWriter(AbstractMultiFileWriter writer) throws IOException {
FileSystem fs = store.getFileSystem();
for (Path leftoverFile : writer.abortWriters()) {
try {
@@ -79,7 +79,6 @@ protected void abortWriter() throws IOException {
e);
}
}
//this step signals that the target file is no longer writen and can be cleaned up
writer = null;
}

}
@@ -37,7 +37,7 @@ public class CompactionProgress {
private static final Logger LOG = LoggerFactory.getLogger(CompactionProgress.class);

/** the total compacting key values in currently running compaction */
private long totalCompactingKVs;
public long totalCompactingKVs;
/** the completed count of key values in currently running compaction */
public long currentCompactedKVs = 0;
/** the total size of data processed by the currently running compaction, in bytes */