From 8c003fb6e37f78ffda6031dc4b9dd9a391daa42f Mon Sep 17 00:00:00 2001
From: Andrew Purtell <apurtell@apache.org>
Date: Fri, 8 Apr 2022 17:29:30 -0700
Subject: [PATCH] HBASE-26938 Compaction failures after StoreFileTracker
 integration

Compactions might be concurrent against a given store and the Compactor is
shared among them. Do not put mutable state into shared class fields. All
Compactor class fields should be final or effectively final.
'keepSeqIdPeriod' is an exception to this rule because unit tests may set it.
---
 .../hbase/mob/DefaultMobStoreCompactor.java   |  12 +-
 .../hadoop/hbase/regionserver/HStore.java     |  30 ++--
 .../hbase/regionserver/StoreEngine.java       |  17 +--
 .../AbstractMultiOutputCompactor.java         |   5 +-
 .../compactions/CompactionProgress.java       |   2 +-
 .../regionserver/compactions/Compactor.java   | 130 ++++++++++++------
 .../compactions/DateTieredCompactor.java      |   3 +-
 .../compactions/DefaultCompactor.java         |   9 +-
 .../compactions/StripeCompactor.java          |   3 +-
 .../hbase/mob/FaultyMobStoreCompactor.java    |   7 +-
 .../regionserver/TestCompactorMemLeak.java    |   5 +-
 .../regionserver/TestMajorCompaction.java     |  19 ---
 .../TestSplitTransactionOnCluster.java        |  17 +--
 13 files changed, 138 insertions(+), 121 deletions(-)

diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/DefaultMobStoreCompactor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/DefaultMobStoreCompactor.java
index 15f0a73a9df9..ff658adb1502 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/DefaultMobStoreCompactor.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/DefaultMobStoreCompactor.java
@@ -38,6 +38,7 @@
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.PrivateCellUtil;
 import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.regionserver.CellSink;
 import org.apache.hadoop.hbase.regionserver.HMobStore;
 import org.apache.hadoop.hbase.regionserver.HStore;
 import org.apache.hadoop.hbase.regionserver.HStoreFile;
@@ -51,6 +52,7 @@
 import org.apache.hadoop.hbase.regionserver.StoreFileWriter;
 import org.apache.hadoop.hbase.regionserver.StoreScanner;
 import org.apache.hadoop.hbase.regionserver.compactions.CloseChecker;
+import org.apache.hadoop.hbase.regionserver.compactions.CompactionProgress;
 import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequestImpl;
 import org.apache.hadoop.hbase.regionserver.compactions.DefaultCompactor;
 import org.apache.hadoop.hbase.regionserver.throttle.ThroughputControlUtil;
@@ -61,7 +63,6 @@
 import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-
 import org.apache.hbase.thirdparty.com.google.common.collect.HashMultimap;
 import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableSetMultimap;
 import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
@@ -285,17 +286,19 @@ private void calculateMobLengthMap(SetMultimap<TableName, String> mobRefs) throw
    *
    * @param fd File details
    * @param scanner Where to read from.
+   * @param writer Where to write to.
    * @param smallestReadPoint Smallest read point.
    * @param cleanSeqId When true, remove seqId(used to be mvcc) value which is <= smallestReadPoint
    * @param throughputController The compaction throughput controller.
    * @param major Is a major compaction.
    * @param numofFilesToCompact the number of files to compact
+   * @param progress Progress reporter.
    * @return Whether compaction ended; false if it was interrupted for any reason.
    */
   @Override
-  protected boolean performCompaction(FileDetails fd, InternalScanner scanner,
+  protected boolean performCompaction(FileDetails fd, InternalScanner scanner, CellSink writer,
     long smallestReadPoint, boolean cleanSeqId, ThroughputController throughputController,
-    boolean major, int numofFilesToCompact) throws IOException {
+    boolean major, int numofFilesToCompact, CompactionProgress progress) throws IOException {
     long bytesWrittenProgressForLog = 0;
     long bytesWrittenProgressForShippedCall = 0;
     // Clear old mob references
@@ -661,9 +664,8 @@ private void commitOrAbortMobWriter(StoreFileWriter mobFileWriter, long maxSeqId
     }
   }

-
   @Override
-  protected List<Path> commitWriter(FileDetails fd,
+  protected List<Path> commitWriter(StoreFileWriter writer, FileDetails fd,
     CompactionRequestImpl request) throws IOException {
     List<Path> newFiles = Lists.newArrayList(writer.getPath());
     writer.appendMetadata(fd.maxSeqId, request.isAllFiles(), request.getFiles());
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
index 730182866300..4c178c73dfcc 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
@@ -156,8 +156,6 @@ public class HStore implements Store, HeapSize, StoreConfigInformation,
   // rows that has cells from both memstore and files (or only files)
   private LongAdder mixedRowReadsCount = new LongAdder();

-  private boolean cacheOnWriteLogged;
-
   /**
    * Lock specific to archiving compacted store files. This avoids races around
    * the combination of retrieving the list of compacted files and moving them to
@@ -290,7 +288,6 @@ protected HStore(final HRegion region, final ColumnFamilyDescriptor family,
       this, memstore.getClass().getSimpleName(), policyName, verifyBulkLoads,
       parallelPutCountPrintThreshold, family.getDataBlockEncoding(),
       family.getCompressionType());
-    cacheOnWriteLogged = false;
   }

   private StoreContext initializeStoreContext(ColumnFamilyDescriptor family) throws IOException {
@@ -1157,23 +1154,28 @@ protected List<HStoreFile> doCompaction(CompactionRequestImpl cr,
     }
     replaceStoreFiles(filesToCompact, sfs, true);

-    // This step is necessary for the correctness of BrokenStoreFileCleanerChore. It lets the
-    // CleanerChore know that compaction is done and the file can be cleaned up if compaction
-    // have failed.
-    storeEngine.resetCompactionWriter();
-
-    if (cr.isMajor()) {
-      majorCompactedCellsCount.addAndGet(getCompactionProgress().getTotalCompactingKVs());
-      majorCompactedCellsSize.addAndGet(getCompactionProgress().totalCompactedSize);
-    } else {
-      compactedCellsCount.addAndGet(getCompactionProgress().getTotalCompactingKVs());
-      compactedCellsSize.addAndGet(getCompactionProgress().totalCompactedSize);
+    // Compaction progress for the request will be removed after completeCompaction so be sure
+    // this code runs before you call completeCompaction.
+    CompactionProgress progress = storeEngine.getCompactor().getProgress(cr);
+    if (progress != null) {
+      if (cr.isMajor()) {
+        majorCompactedCellsCount.addAndGet(progress.getTotalCompactingKVs());
+        majorCompactedCellsSize.addAndGet(progress.totalCompactedSize);
+      } else {
+        compactedCellsCount.addAndGet(progress.getTotalCompactingKVs());
+        compactedCellsSize.addAndGet(progress.totalCompactedSize);
+      }
     }

     long outputBytes = getTotalSize(sfs);

     // At this point the store will use new files for all new scanners.
     refreshStoreSizeAndTotalBytes(); // update store size.

+    // This step is necessary for the correctness of BrokenStoreFileCleanerChore. It lets the
+    // CleanerChore know that compaction is done and the file can be cleaned up if compaction
+    // has failed.
+    storeEngine.completeCompaction(cr);
+
     long now = EnvironmentEdgeManager.currentTime();
     if (region.getRegionServerServices() != null
       && region.getRegionServerServices().getMetrics() != null) {
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreEngine.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreEngine.java
index d85553ac8082..b534b7075012 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreEngine.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreEngine.java
@@ -94,7 +94,7 @@
  */
 @InterfaceAudience.Private
 public abstract class StoreEngine<SF extends StoreFlusher, CP extends CompactionPolicy,
-  C extends Compactor, SFM extends StoreFileManager> {
+  C extends Compactor<?>, SFM extends StoreFileManager> {

   private static final Logger LOG = LoggerFactory.getLogger(StoreEngine.class);

@@ -157,7 +157,7 @@ public CompactionPolicy getCompactionPolicy() {
   /**
    * @return Compactor to use.
    */
-  public Compactor getCompactor() {
+  public Compactor<?> getCompactor() {
     return this.compactor;
   }

@@ -545,14 +545,15 @@ public boolean requireWritingToTmpDirFirst() {
   }

   /**
-   * Resets the compaction writer when the new file is committed and used as active storefile.
-   * This step is necessary for the correctness of BrokenStoreFileCleanerChore. It lets the
-   * CleanerChore know that compaction is done and the file can be cleaned up if compaction
-   * have failed. Currently called in
+   * Completes the compaction, cleaning up resources, once the new file is committed and used as
+   * active storefile. This step is necessary for the correctness of BrokenStoreFileCleanerChore.
+   * It lets the CleanerChore know that compaction is done and the file can be cleaned up if
+   * compaction has failed. Currently called in
+   * @param request the compaction request
    * @see HStore#doCompaction(CompactionRequestImpl, Collection, User, long, List)
    */
-  public void resetCompactionWriter(){
-    compactor.resetWriter();
+  public void completeCompaction(CompactionRequestImpl request) {
+    compactor.completeCompaction(request);
   }

   @RestrictedApi(explanation = "Should only be called in TestHStore", link = "",
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/AbstractMultiOutputCompactor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/AbstractMultiOutputCompactor.java
index 19b7a98627e6..b042a158b73c 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/AbstractMultiOutputCompactor.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/AbstractMultiOutputCompactor.java
@@ -68,7 +68,7 @@ public StoreFileWriter createWriterWithStoragePolicy(String fileStoragePolicy)
   }

   @Override
-  protected void abortWriter() throws IOException {
+  protected void abortWriter(AbstractMultiFileWriter writer) throws IOException {
     FileSystem fs = store.getFileSystem();
     for (Path leftoverFile : writer.abortWriters()) {
       try {
@@ -79,7 +79,6 @@ protected void abortWriter() throws IOException {
           e);
       }
     }
-    //this step signals that the target file is no longer writen and can be cleaned up
-    writer = null;
   }
+
 }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionProgress.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionProgress.java
index 942cc4f3fd6b..2ccdd150cd21 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionProgress.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionProgress.java
@@ -37,7 +37,7 @@ public class CompactionProgress {
   private static final Logger LOG = LoggerFactory.getLogger(CompactionProgress.class);

   /** the total compacting key values in currently running compaction */
-  private long totalCompactingKVs;
+  public long totalCompactingKVs;
   /** the completed count of key values in currently running compaction */
   public long currentCompactedKVs = 0;
   /** the total size of data processed by the currently running compaction, in bytes */
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java
index 93f0555b7f4d..94bfbbe0f7ec 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java
@@ -25,12 +25,11 @@
 import java.io.IOException;
 import java.io.InterruptedIOException;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.IdentityHashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.stream.Collectors;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.Cell;
@@ -65,21 +64,23 @@
 import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-
 import org.apache.hbase.thirdparty.com.google.common.io.Closeables;

 /**
  * A compactor is a compaction algorithm associated a given policy. Base class also contains
  * reusable parts for implementing compactors (what is common and what isn't is evolving).
+ * <p>
+ * Compactions might be concurrent against a given store and the Compactor is shared among
+ * them. Do not put mutable state into class fields. All Compactor class fields should be
+ * final or effectively final.
+ * 'keepSeqIdPeriod' is an exception to this rule because unit tests may set it.
  */
 @InterfaceAudience.Private
 public abstract class Compactor<T extends CellSink> {

   private static final Logger LOG = LoggerFactory.getLogger(Compactor.class);
   protected static final long COMPACTION_PROGRESS_LOG_INTERVAL = 60 * 1000;
-  protected volatile CompactionProgress progress;
   protected final Configuration conf;
   protected final HStore store;
-
   protected final int compactionKVMax;
   protected final Compression.Algorithm majorCompactionCompression;
   protected final Compression.Algorithm minorCompactionCompression;
@@ -93,15 +94,17 @@ public abstract class Compactor<T extends CellSink> {
   protected static final String MINOR_COMPACTION_DROP_CACHE =
     "hbase.regionserver.minorcompaction.pagecache.drop";

-  private final boolean dropCacheMajor;
-  private final boolean dropCacheMinor;
+  protected final boolean dropCacheMajor;
+  protected final boolean dropCacheMinor;

-  // In compaction process only a single thread will access and write to this field, and
-  // getCompactionTargets is the only place we will access it other than the compaction thread, so
-  // make it volatile.
-  protected volatile T writer = null;
+  // We track writers per request using the CompactionRequestImpl identity as key.
+  // completeCompaction() cleans up this state.
+  protected final Map<CompactionRequestImpl, T> writerMap =
+    Collections.synchronizedMap(new IdentityHashMap<>());
+  protected final Map<CompactionRequestImpl, CompactionProgress> progressMap =
+    Collections.synchronizedMap(new IdentityHashMap<>());

-  //TODO: depending on Store is not good but, realistically, all compactors currently do.
+  // TODO: depending on Store is not good but, realistically, all compactors currently do.
   Compactor(Configuration conf, HStore store) {
     this.conf = conf;
     this.store = store;
@@ -117,17 +120,11 @@ public abstract class Compactor<T extends CellSink> {
     this.dropCacheMinor = conf.getBoolean(MINOR_COMPACTION_DROP_CACHE, true);
   }

-
-
   protected interface CellSinkFactory<S> {
     S createWriter(InternalScanner scanner, FileDetails fd, boolean shouldDropBehind, boolean major)
       throws IOException;
   }

-  public CompactionProgress getProgress() {
-    return this.progress;
-  }
-
   /** The sole reason this class exists is that java has no ref/out/pointer parameters. */
   protected static class FileDetails {
     /** Maximum key count after compaction (for blooms) */
@@ -328,7 +325,6 @@ protected final List<Path> compact(final CompactionRequestImpl request,
     InternalScannerFactory scannerFactory, CellSinkFactory<T> sinkFactory,
     ThroughputController throughputController, User user) throws IOException {
     FileDetails fd = getFileDetails(request.getFiles(), request.isAllFiles(), request.isMajor());
-    this.progress = new CompactionProgress(fd.maxKeyCount);

     // Find the smallest read point across all the Scanners.
     long smallestReadPoint = getSmallestReadPoint();

@@ -344,6 +340,9 @@ protected final List<Path> compact(final CompactionRequestImpl request,
     boolean finished = false;
     List<StoreFileScanner> scanners =
       createFileScanners(request.getFiles(), smallestReadPoint, dropCache);
+    T writer = null;
+    CompactionProgress progress = new CompactionProgress(fd.maxKeyCount);
+    progressMap.put(request, progress);
     try {
       /* Include deletes, unless we are doing a major compaction */
       ScanType scanType = scannerFactory.getScanType(request);
@@ -356,14 +355,10 @@ protected final List<Path> compact(final CompactionRequestImpl request,
         smallestReadPoint = Math.min(fd.minSeqIdToKeep, smallestReadPoint);
         cleanSeqId = true;
       }
-      if (writer != null){
-        LOG.warn("Writer exists when it should not: " + getCompactionTargets().stream()
-          .map(n -> n.toString())
-          .collect(Collectors.joining(", ", "{ ", " }")));
-      }
       writer = sinkFactory.createWriter(scanner, fd, dropCache, request.isMajor());
-      finished = performCompaction(fd, scanner, smallestReadPoint, cleanSeqId,
-        throughputController, request.isAllFiles(), request.getFiles().size());
+      writerMap.put(request, writer);
+      finished = performCompaction(fd, scanner, writer, smallestReadPoint, cleanSeqId,
+        throughputController, request.isAllFiles(), request.getFiles().size(), progress);
       if (!finished) {
         throw new InterruptedIOException("Aborting compaction of store " + store + " in region "
           + store.getRegionInfo().getRegionNameAsString() + " because it was interrupted.");
@@ -381,34 +376,40 @@ protected final List<Path> compact(final CompactionRequestImpl request,
       } else {
         Closeables.close(scanner, true);
       }
-      if (!finished && writer != null) {
-        abortWriter();
+      if (!finished) {
+        if (writer != null) {
+          abortWriter(writer);
+        }
+        // This signals that the target file is no longer written and can be cleaned up
+        completeCompaction(request);
       }
     }
     assert finished : "We should have exited the method on all error paths";
     assert writer != null : "Writer should be non-null if no error";
-    return commitWriter(fd, request);
+    return commitWriter(writer, fd, request);
   }

-  protected abstract List<Path> commitWriter(FileDetails fd,
+  protected abstract List<Path> commitWriter(T writer, FileDetails fd,
     CompactionRequestImpl request) throws IOException;

-  protected abstract void abortWriter() throws IOException;
+  protected abstract void abortWriter(T writer) throws IOException;

   /**
    * Performs the compaction.
    * @param fd FileDetails of cell sink writer
    * @param scanner Where to read from.
+   * @param writer Where to write to.
    * @param smallestReadPoint Smallest read point.
    * @param cleanSeqId When true, remove seqId(used to be mvcc) value which is <=
    *          smallestReadPoint
    * @param major Is a major compaction.
    * @param numofFilesToCompact the number of files to compact
+   * @param progress Progress reporter.
    * @return Whether compaction ended; false if it was interrupted for some reason.
    */
-  protected boolean performCompaction(FileDetails fd, InternalScanner scanner,
+  protected boolean performCompaction(FileDetails fd, InternalScanner scanner, CellSink writer,
     long smallestReadPoint, boolean cleanSeqId, ThroughputController throughputController,
-    boolean major, int numofFilesToCompact) throws IOException {
+    boolean major, int numofFilesToCompact, CompactionProgress progress) throws IOException {
     assert writer instanceof ShipperListener;
     long bytesWrittenProgressForLog = 0;
     long bytesWrittenProgressForShippedCall = 0;
@@ -550,22 +551,63 @@ protected InternalScanner createScanner(HStore store, ScanInfo scanInfo,
       dropDeletesFromRow, dropDeletesToRow);
   }

-  public List<Path> getCompactionTargets() {
-    T writer = this.writer;
-    if (writer == null) {
-      return Collections.emptyList();
+  /**
+   * Return the progress for a given compaction request.
+   * @param request the compaction request
+   */
+  public CompactionProgress getProgress(CompactionRequestImpl request) {
+    return progressMap.get(request);
+  }
+
+  /**
+   * Return the aggregate progress for all currently active compactions.
+   */
+  public CompactionProgress getProgress() {
+    synchronized (progressMap) {
+      long totalCompactingKVs = 0;
+      long currentCompactedKVs = 0;
+      long totalCompactedSize = 0;
+      for (CompactionProgress progress: progressMap.values()) {
+        totalCompactingKVs += progress.totalCompactingKVs;
+        currentCompactedKVs += progress.currentCompactedKVs;
+        totalCompactedSize += progress.totalCompactedSize;
+      }
+      CompactionProgress result = new CompactionProgress(totalCompactingKVs);
+      result.currentCompactedKVs = currentCompactedKVs;
+      result.totalCompactedSize = totalCompactedSize;
+      return result;
     }
-    if (writer instanceof StoreFileWriter) {
-      return Arrays.asList(((StoreFileWriter) writer).getPath());
+  }
+
+  public boolean isCompacting() {
+    return !progressMap.isEmpty();
+  }
+
+  /**
+   * Return the list of target files for all currently active compactions.
+   */
+  public List<Path> getCompactionTargets() {
+    // Build a list of all the compaction targets for all active writers
+    List<Path> targets = new ArrayList<>();
+    synchronized (writerMap) {
+      for (T writer: writerMap.values()) {
+        if (writer instanceof StoreFileWriter) {
+          targets.add(((StoreFileWriter) writer).getPath());
+        } else {
+          ((AbstractMultiFileWriter) writer).writers().stream()
+            .forEach(sfw -> targets.add(sfw.getPath()));
+        }
+      }
     }
-    return ((AbstractMultiFileWriter) writer).writers().stream().map(sfw -> sfw.getPath())
-      .collect(Collectors.toList());
+    return targets;
   }

   /**
-   * Reset the Writer when the new storefiles were successfully added
+   * Complete the compaction after the new storefiles are successfully added.
    */
-  public void resetWriter(){
-    writer = null;
+  public void completeCompaction(CompactionRequestImpl request) {
+    writerMap.remove(request);
+    progressMap.remove(request);
   }
+
 }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DateTieredCompactor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DateTieredCompactor.java
index 43e037c5e702..2cfdf5b804ca 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DateTieredCompactor.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DateTieredCompactor.java
@@ -79,10 +79,11 @@ public DateTieredMultiFileWriter createWriter(InternalScanner scanner, FileDetai
   }

   @Override
-  protected List<Path> commitWriter(FileDetails fd,
+  protected List<Path> commitWriter(DateTieredMultiFileWriter writer, FileDetails fd,
     CompactionRequestImpl request) throws IOException {
     List<Path> pathList =
       writer.commitWriters(fd.maxSeqId, request.isAllFiles(), request.getFiles());
     return pathList;
   }
+
 }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DefaultCompactor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DefaultCompactor.java
index 03e3a1b5f394..a28e05b543bb 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DefaultCompactor.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DefaultCompactor.java
@@ -63,7 +63,7 @@ public List<Path> compact(final CompactionRequestImpl request,
   }

   @Override
-  protected List<Path> commitWriter(FileDetails fd,
+  protected List<Path> commitWriter(StoreFileWriter writer, FileDetails fd,
     CompactionRequestImpl request) throws IOException {
     List<Path> newFiles = Lists.newArrayList(writer.getPath());
     writer.appendMetadata(fd.maxSeqId, request.isAllFiles(), request.getFiles());
@@ -72,12 +72,6 @@ protected List<Path> commitWriter(FileDetails fd,
   }

   @Override
-  protected void abortWriter() throws IOException {
-    abortWriter(writer);
-    // this step signals that the target file is no longer written and can be cleaned up
-    writer = null;
-  }
-
   protected final void abortWriter(StoreFileWriter writer) throws IOException {
     Path leftoverFile = writer.getPath();
     try {
@@ -92,4 +86,5 @@ protected final void abortWriter(StoreFileWriter writer) throws IOException {
         leftoverFile, e);
     }
   }
+
 }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/StripeCompactor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/StripeCompactor.java
index 060a11b41fe6..b65120d5466e 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/StripeCompactor.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/StripeCompactor.java
@@ -125,10 +125,11 @@ public StripeMultiFileWriter createWriter(InternalScanner scanner, FileDetails f
   }

   @Override
-  protected List<Path> commitWriter(FileDetails fd,
+  protected List<Path> commitWriter(StripeMultiFileWriter writer, FileDetails fd,
     CompactionRequestImpl request) throws IOException {
     List<Path> newFiles = writer.commitWriters(fd.maxSeqId, request.isMajor(), request.getFiles());
     assert !newFiles.isEmpty() : "Should have produced an empty file to preserve metadata.";
     return newFiles;
   }
+
 }
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/FaultyMobStoreCompactor.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/FaultyMobStoreCompactor.java
index 813c288c61dc..8377a0c7e5ff 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/FaultyMobStoreCompactor.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/FaultyMobStoreCompactor.java
@@ -27,7 +27,6 @@
 import java.util.Optional;
 import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.atomic.AtomicLong;
-
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -37,6 +36,7 @@
 import org.apache.hadoop.hbase.PrivateCellUtil;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.io.hfile.CorruptHFileException;
+import org.apache.hadoop.hbase.regionserver.CellSink;
 import org.apache.hadoop.hbase.regionserver.HStore;
 import org.apache.hadoop.hbase.regionserver.InternalScanner;
 import org.apache.hadoop.hbase.regionserver.KeyValueScanner;
@@ -44,6 +44,7 @@
 import org.apache.hadoop.hbase.regionserver.ShipperListener;
 import org.apache.hadoop.hbase.regionserver.StoreFileWriter;
 import org.apache.hadoop.hbase.regionserver.compactions.CloseChecker;
+import org.apache.hadoop.hbase.regionserver.compactions.CompactionProgress;
 import org.apache.hadoop.hbase.regionserver.throttle.ThroughputControlUtil;
 import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
 import org.apache.hadoop.hbase.util.Bytes;
@@ -88,9 +89,9 @@ public FaultyMobStoreCompactor(Configuration conf, HStore store) {
   }

   @Override
-  protected boolean performCompaction(FileDetails fd, InternalScanner scanner,
+  protected boolean performCompaction(FileDetails fd, InternalScanner scanner, CellSink writer,
     long smallestReadPoint, boolean cleanSeqId, ThroughputController throughputController,
-    boolean major, int numofFilesToCompact) throws IOException {
+    boolean major, int numofFilesToCompact, CompactionProgress progress) throws IOException {

     totalCompactions.incrementAndGet();
     if (major) {
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactorMemLeak.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactorMemLeak.java
index 6a0a8baa9ded..7863cd4e3160 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactorMemLeak.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactorMemLeak.java
@@ -128,13 +128,14 @@ public MyCompactor(Configuration conf, HStore store) {
     }

     @Override
-    protected List<Path> commitWriter(FileDetails fd,
+    protected List<Path> commitWriter(StoreFileWriter writer, FileDetails fd,
       CompactionRequestImpl request) throws IOException {
       HFileWriterImpl writerImpl = (HFileWriterImpl) writer.writer;
       Cell cell = writerImpl.getLastCell();
       // The cell should be backend with an KeyOnlyKeyValue.
       IS_LAST_CELL_ON_HEAP.set(cell instanceof KeyOnlyKeyValue);
-      return super.commitWriter(fd, request);
+      return super.commitWriter(writer, fd, request);
     }
   }
+
 }
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMajorCompaction.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMajorCompaction.java
index 85fdf0871f7f..9552142881ca 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMajorCompaction.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMajorCompaction.java
@@ -23,7 +23,6 @@
 import static org.apache.hadoop.hbase.regionserver.Store.PRIORITY_USER;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;

 import java.io.IOException;
@@ -54,7 +53,6 @@
 import org.apache.hadoop.hbase.io.hfile.HFileDataBlockEncoderImpl;
 import org.apache.hadoop.hbase.io.hfile.HFileScanner;
 import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;
-import org.apache.hadoop.hbase.regionserver.compactions.CompactionProgress;
 import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequestImpl;
 import org.apache.hadoop.hbase.regionserver.compactions.RatioBasedCompactionPolicy;
 import org.apache.hadoop.hbase.testclassification.LargeTests;
@@ -211,26 +209,9 @@ private void majorCompaction() throws Exception {
     Result result = r.get(new Get(STARTROW).addFamily(COLUMN_FAMILY_TEXT).readVersions(100));
     assertEquals(compactionThreshold, result.size());

-    // see if CompactionProgress is in place but null
-    for (HStore store : r.getStores()) {
-      assertNull(store.getCompactionProgress());
-    }
-
     r.flush(true);
     r.compact(true);

-    // see if CompactionProgress has done its thing on at least one store
-    int storeCount = 0;
-    for (HStore store : r.getStores()) {
-      CompactionProgress progress = store.getCompactionProgress();
-      if (progress != null) {
-        ++storeCount;
-        assertTrue(progress.currentCompactedKVs > 0);
-        assertTrue(progress.getTotalCompactingKVs() > 0);
-      }
-      assertTrue(storeCount > 0);
-    }
-
     // look at the second row
     // Increment the least significant character so we get to next row.
     byte[] secondRowBytes = START_KEY_BYTES.clone();
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitTransactionOnCluster.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitTransactionOnCluster.java
index 652c019ff044..e20f6871b64d 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitTransactionOnCluster.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitTransactionOnCluster.java
@@ -90,7 +90,6 @@
 import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility;
 import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
 import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;
-import org.apache.hadoop.hbase.regionserver.compactions.CompactionProgress;
 import org.apache.hadoop.hbase.regionserver.throttle.NoLimitThroughputController;
 import org.apache.hadoop.hbase.testclassification.LargeTests;
 import org.apache.hadoop.hbase.testclassification.RegionServerTests;
@@ -117,11 +116,9 @@
 import org.mockito.Mockito;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-
 import org.apache.hbase.thirdparty.com.google.common.io.Closeables;
 import org.apache.hbase.thirdparty.com.google.protobuf.RpcController;
 import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException;
-
 import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionStateTransition.TransitionCode;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.ReportRegionStateTransitionRequest;
@@ -620,17 +617,11 @@ private void clearReferences(HRegion region) throws IOException {
     assertEquals(1, region.getStores().size());
     HStore store = region.getStores().get(0);
     while (store.hasReferences()) {
-      // Wait on any current compaction to complete first.
-      CompactionProgress progress = store.getCompactionProgress();
-      if (progress != null && progress.getProgressPct() < 1.0f) {
-        while (progress.getProgressPct() < 1.0f) {
-          LOG.info("Waiting, progress={}", progress.getProgressPct());
-          Threads.sleep(1000);
-        }
-      } else {
-        // Run new compaction. Shoudn't be any others running.
-        region.compact(true);
+      while (store.storeEngine.getCompactor().isCompacting()) {
+        Threads.sleep(100);
       }
+      // Run new compaction. Shouldn't be any others running.
+      region.compact(true);
       store.closeAndArchiveCompactedFiles();
     }
   }
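
For illustration, here is a minimal, self-contained model of the concurrency
pattern the Compactor changes above adopt: per-request state held in
identity-keyed, synchronized maps instead of instance fields, removed by
completeCompaction() on the abort path and, via the caller, once the committed
files are live. All names below (SketchCompactor, Request, Progress, Writer)
are hypothetical stand-ins, not the HBase classes.

  import java.util.Collections;
  import java.util.IdentityHashMap;
  import java.util.List;
  import java.util.Map;

  public class SketchCompactor {

    public static final class Request {}          // stands in for CompactionRequestImpl
    public static final class Progress {          // stands in for CompactionProgress
      public long compactedKVs;
    }
    public static final class Writer {            // stands in for StoreFileWriter
      final String path;
      Writer(String path) { this.path = path; }
    }

    // Identity-keyed so two logically equal requests can never collide, and
    // synchronized because compaction threads and observers touch the maps concurrently.
    private final Map<Request, Writer> writerMap =
      Collections.synchronizedMap(new IdentityHashMap<>());
    private final Map<Request, Progress> progressMap =
      Collections.synchronizedMap(new IdentityHashMap<>());

    public List<String> compact(Request request) {
      Progress progress = new Progress();
      progressMap.put(request, progress);
      boolean finished = false;
      try {
        Writer writer = new Writer("output-" + System.nanoTime()); // createWriter(...)
        writerMap.put(request, writer);
        progress.compactedKVs++;                                   // performCompaction(...)
        finished = true;
        return Collections.singletonList(writer.path);             // commitWriter(...)
      } finally {
        if (!finished) {
          // Abort path: drop the tracking state here. On success the caller does
          // it once the new files are live, as StoreEngine.completeCompaction does.
          completeCompaction(request);
        }
      }
    }

    public Progress getProgress(Request request) {
      return progressMap.get(request);
    }

    public boolean isCompacting() {
      return !progressMap.isEmpty();
    }

    public void completeCompaction(Request request) {
      writerMap.remove(request);
      progressMap.remove(request);
    }
  }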
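And a hypothetical caller of that sketch, mirroring the ordering rule the new
comment in HStore.doCompaction spells out: the per-request progress must be
read for metrics before completeCompaction() removes it.

    SketchCompactor compactor = new SketchCompactor();
    SketchCompactor.Request request = new SketchCompactor.Request();
    List<String> newFiles = compactor.compact(request);
    // Metrics first: the progress entry disappears at completeCompaction().
    SketchCompactor.Progress progress = compactor.getProgress(request);
    if (progress != null) {
      long compactedKVs = progress.compactedKVs; // metrics update would go here
    }
    compactor.completeCompaction(request); // only now drop the tracking state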