diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/DefaultMobStoreCompactor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/DefaultMobStoreCompactor.java index 15f0a73a9df9..ff658adb1502 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/DefaultMobStoreCompactor.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/DefaultMobStoreCompactor.java @@ -38,6 +38,7 @@ import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.PrivateCellUtil; import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.regionserver.CellSink; import org.apache.hadoop.hbase.regionserver.HMobStore; import org.apache.hadoop.hbase.regionserver.HStore; import org.apache.hadoop.hbase.regionserver.HStoreFile; @@ -51,6 +52,7 @@ import org.apache.hadoop.hbase.regionserver.StoreFileWriter; import org.apache.hadoop.hbase.regionserver.StoreScanner; import org.apache.hadoop.hbase.regionserver.compactions.CloseChecker; +import org.apache.hadoop.hbase.regionserver.compactions.CompactionProgress; import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequestImpl; import org.apache.hadoop.hbase.regionserver.compactions.DefaultCompactor; import org.apache.hadoop.hbase.regionserver.throttle.ThroughputControlUtil; @@ -61,7 +63,6 @@ import org.apache.yetus.audience.InterfaceAudience; import org.slf4j.Logger; import org.slf4j.LoggerFactory; - import org.apache.hbase.thirdparty.com.google.common.collect.HashMultimap; import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableSetMultimap; import org.apache.hbase.thirdparty.com.google.common.collect.Lists; @@ -285,17 +286,19 @@ private void calculateMobLengthMap(SetMultimap mobRefs) throw * * @param fd File details * @param scanner Where to read from. + * @param writer Where to write to. * @param smallestReadPoint Smallest read point. * @param cleanSeqId When true, remove seqId(used to be mvcc) value which is <= smallestReadPoint * @param throughputController The compaction throughput controller. * @param major Is a major compaction. * @param numofFilesToCompact the number of files to compact + * @param progress Progress reporter. * @return Whether compaction ended; false if it was interrupted for any reason. */ @Override - protected boolean performCompaction(FileDetails fd, InternalScanner scanner, + protected boolean performCompaction(FileDetails fd, InternalScanner scanner, CellSink writer, long smallestReadPoint, boolean cleanSeqId, ThroughputController throughputController, - boolean major, int numofFilesToCompact) throws IOException { + boolean major, int numofFilesToCompact, CompactionProgress progress) throws IOException { long bytesWrittenProgressForLog = 0; long bytesWrittenProgressForShippedCall = 0; // Clear old mob references @@ -661,9 +664,8 @@ private void commitOrAbortMobWriter(StoreFileWriter mobFileWriter, long maxSeqId } } - @Override - protected List commitWriter(FileDetails fd, + protected List commitWriter(StoreFileWriter writer, FileDetails fd, CompactionRequestImpl request) throws IOException { List newFiles = Lists.newArrayList(writer.getPath()); writer.appendMetadata(fd.maxSeqId, request.isAllFiles(), request.getFiles()); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java index 730182866300..4c178c73dfcc 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java @@ -156,8 +156,6 @@ public class HStore implements Store, HeapSize, StoreConfigInformation, // rows that has cells from both memstore and files (or only files) private LongAdder mixedRowReadsCount = new LongAdder(); - private boolean cacheOnWriteLogged; - /** * Lock specific to archiving compacted store files. This avoids races around * the combination of retrieving the list of compacted files and moving them to @@ -290,7 +288,6 @@ protected HStore(final HRegion region, final ColumnFamilyDescriptor family, this, memstore.getClass().getSimpleName(), policyName, verifyBulkLoads, parallelPutCountPrintThreshold, family.getDataBlockEncoding(), family.getCompressionType()); - cacheOnWriteLogged = false; } private StoreContext initializeStoreContext(ColumnFamilyDescriptor family) throws IOException { @@ -1157,23 +1154,28 @@ protected List doCompaction(CompactionRequestImpl cr, } replaceStoreFiles(filesToCompact, sfs, true); - // This step is necessary for the correctness of BrokenStoreFileCleanerChore. It lets the - // CleanerChore know that compaction is done and the file can be cleaned up if compaction - // have failed. - storeEngine.resetCompactionWriter(); - - if (cr.isMajor()) { - majorCompactedCellsCount.addAndGet(getCompactionProgress().getTotalCompactingKVs()); - majorCompactedCellsSize.addAndGet(getCompactionProgress().totalCompactedSize); - } else { - compactedCellsCount.addAndGet(getCompactionProgress().getTotalCompactingKVs()); - compactedCellsSize.addAndGet(getCompactionProgress().totalCompactedSize); + // Compaction progress for the request will be removed after completeCompaction so be sure + // this code runs before you call completeCompaction. + CompactionProgress progress = storeEngine.getCompactor().getProgress(cr); + if (progress != null) { + if (cr.isMajor()) { + majorCompactedCellsCount.addAndGet(progress.getTotalCompactingKVs()); + majorCompactedCellsSize.addAndGet(progress.totalCompactedSize); + } else { + compactedCellsCount.addAndGet(progress.getTotalCompactingKVs()); + compactedCellsSize.addAndGet(progress.totalCompactedSize); + } } long outputBytes = getTotalSize(sfs); // At this point the store will use new files for all new scanners. refreshStoreSizeAndTotalBytes(); // update store size. + // This step is necessary for the correctness of BrokenStoreFileCleanerChore. It lets the + // CleanerChore know that compaction is done and the file can be cleaned up if compaction + // has failed. + storeEngine.completeCompaction(cr); + long now = EnvironmentEdgeManager.currentTime(); if (region.getRegionServerServices() != null && region.getRegionServerServices().getMetrics() != null) { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreEngine.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreEngine.java index d85553ac8082..b534b7075012 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreEngine.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreEngine.java @@ -94,7 +94,7 @@ */ @InterfaceAudience.Private public abstract class StoreEngine { + C extends Compactor, SFM extends StoreFileManager> { private static final Logger LOG = LoggerFactory.getLogger(StoreEngine.class); @@ -157,7 +157,7 @@ public CompactionPolicy getCompactionPolicy() { /** * @return Compactor to use. */ - public Compactor getCompactor() { + public Compactor getCompactor() { return this.compactor; } @@ -545,14 +545,15 @@ public boolean requireWritingToTmpDirFirst() { } /** - * Resets the compaction writer when the new file is committed and used as active storefile. - * This step is necessary for the correctness of BrokenStoreFileCleanerChore. It lets the - * CleanerChore know that compaction is done and the file can be cleaned up if compaction - * have failed. Currently called in + * Completes the compaction, cleaning up resources, once the new file is committed and used as + * active storefile. This step is necessary for the correctness of BrokenStoreFileCleanerChore. + * It lets the CleanerChore know that compaction is done and the file can be cleaned up if + * compaction has failed. Currently called in + * @param request the compaction request * @see HStore#doCompaction(CompactionRequestImpl, Collection, User, long, List) */ - public void resetCompactionWriter(){ - compactor.resetWriter(); + public void completeCompaction(CompactionRequestImpl request) { + compactor.completeCompaction(request); } @RestrictedApi(explanation = "Should only be called in TestHStore", link = "", diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/AbstractMultiOutputCompactor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/AbstractMultiOutputCompactor.java index 19b7a98627e6..b042a158b73c 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/AbstractMultiOutputCompactor.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/AbstractMultiOutputCompactor.java @@ -68,7 +68,7 @@ public StoreFileWriter createWriterWithStoragePolicy(String fileStoragePolicy) } @Override - protected void abortWriter() throws IOException { + protected void abortWriter(AbstractMultiFileWriter writer) throws IOException { FileSystem fs = store.getFileSystem(); for (Path leftoverFile : writer.abortWriters()) { try { @@ -79,7 +79,6 @@ protected void abortWriter() throws IOException { e); } } - //this step signals that the target file is no longer writen and can be cleaned up - writer = null; } + } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionProgress.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionProgress.java index 942cc4f3fd6b..2ccdd150cd21 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionProgress.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionProgress.java @@ -37,7 +37,7 @@ public class CompactionProgress { private static final Logger LOG = LoggerFactory.getLogger(CompactionProgress.class); /** the total compacting key values in currently running compaction */ - private long totalCompactingKVs; + public long totalCompactingKVs; /** the completed count of key values in currently running compaction */ public long currentCompactedKVs = 0; /** the total size of data processed by the currently running compaction, in bytes */ diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java index 93f0555b7f4d..94bfbbe0f7ec 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java @@ -25,12 +25,11 @@ import java.io.IOException; import java.io.InterruptedIOException; import java.util.ArrayList; -import java.util.Arrays; import java.util.Collection; import java.util.Collections; +import java.util.IdentityHashMap; import java.util.List; import java.util.Map; -import java.util.stream.Collectors; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.Cell; @@ -65,21 +64,23 @@ import org.apache.yetus.audience.InterfaceAudience; import org.slf4j.Logger; import org.slf4j.LoggerFactory; - import org.apache.hbase.thirdparty.com.google.common.io.Closeables; /** * A compactor is a compaction algorithm associated a given policy. Base class also contains * reusable parts for implementing compactors (what is common and what isn't is evolving). + *

+ * Compactions might be concurrent against a given store and the Compactor is shared among + * them. Do not put mutable state into class fields. All Compactor class fields should be + * final or effectively final. + * 'keepSeqIdPeriod' is an exception to this rule because unit tests may set it. */ @InterfaceAudience.Private public abstract class Compactor { private static final Logger LOG = LoggerFactory.getLogger(Compactor.class); protected static final long COMPACTION_PROGRESS_LOG_INTERVAL = 60 * 1000; - protected volatile CompactionProgress progress; protected final Configuration conf; protected final HStore store; - protected final int compactionKVMax; protected final Compression.Algorithm majorCompactionCompression; protected final Compression.Algorithm minorCompactionCompression; @@ -93,15 +94,17 @@ public abstract class Compactor { protected static final String MINOR_COMPACTION_DROP_CACHE = "hbase.regionserver.minorcompaction.pagecache.drop"; - private final boolean dropCacheMajor; - private final boolean dropCacheMinor; + protected final boolean dropCacheMajor; + protected final boolean dropCacheMinor; - // In compaction process only a single thread will access and write to this field, and - // getCompactionTargets is the only place we will access it other than the compaction thread, so - // make it volatile. - protected volatile T writer = null; + // We track writers per request using the CompactionRequestImpl identity as key. + // completeCompaction() cleans up this state. + protected final Map writerMap = + Collections.synchronizedMap(new IdentityHashMap<>()); + protected final Map progressMap = + Collections.synchronizedMap(new IdentityHashMap<>()); - //TODO: depending on Store is not good but, realistically, all compactors currently do. + // TODO: depending on Store is not good but, realistically, all compactors currently do. Compactor(Configuration conf, HStore store) { this.conf = conf; this.store = store; @@ -117,17 +120,11 @@ public abstract class Compactor { this.dropCacheMinor = conf.getBoolean(MINOR_COMPACTION_DROP_CACHE, true); } - - protected interface CellSinkFactory { S createWriter(InternalScanner scanner, FileDetails fd, boolean shouldDropBehind, boolean major) throws IOException; } - public CompactionProgress getProgress() { - return this.progress; - } - /** The sole reason this class exists is that java has no ref/out/pointer parameters. */ protected static class FileDetails { /** Maximum key count after compaction (for blooms) */ @@ -328,7 +325,6 @@ protected final List compact(final CompactionRequestImpl request, InternalScannerFactory scannerFactory, CellSinkFactory sinkFactory, ThroughputController throughputController, User user) throws IOException { FileDetails fd = getFileDetails(request.getFiles(), request.isAllFiles(), request.isMajor()); - this.progress = new CompactionProgress(fd.maxKeyCount); // Find the smallest read point across all the Scanners. long smallestReadPoint = getSmallestReadPoint(); @@ -344,6 +340,9 @@ protected final List compact(final CompactionRequestImpl request, boolean finished = false; List scanners = createFileScanners(request.getFiles(), smallestReadPoint, dropCache); + T writer = null; + CompactionProgress progress = new CompactionProgress(fd.maxKeyCount); + progressMap.put(request, progress); try { /* Include deletes, unless we are doing a major compaction */ ScanType scanType = scannerFactory.getScanType(request); @@ -356,14 +355,10 @@ protected final List compact(final CompactionRequestImpl request, smallestReadPoint = Math.min(fd.minSeqIdToKeep, smallestReadPoint); cleanSeqId = true; } - if (writer != null){ - LOG.warn("Writer exists when it should not: " + getCompactionTargets().stream() - .map(n -> n.toString()) - .collect(Collectors.joining(", ", "{ ", " }"))); - } writer = sinkFactory.createWriter(scanner, fd, dropCache, request.isMajor()); - finished = performCompaction(fd, scanner, smallestReadPoint, cleanSeqId, - throughputController, request.isAllFiles(), request.getFiles().size()); + writerMap.put(request, writer); + finished = performCompaction(fd, scanner, writer, smallestReadPoint, cleanSeqId, + throughputController, request.isAllFiles(), request.getFiles().size(), progress); if (!finished) { throw new InterruptedIOException("Aborting compaction of store " + store + " in region " + store.getRegionInfo().getRegionNameAsString() + " because it was interrupted."); @@ -381,34 +376,40 @@ protected final List compact(final CompactionRequestImpl request, } else { Closeables.close(scanner, true); } - if (!finished && writer != null) { - abortWriter(); + if (!finished) { + if (writer != null) { + abortWriter(writer); + } + // This signals that the target file is no longer written and can be cleaned up + completeCompaction(request); } } assert finished : "We should have exited the method on all error paths"; assert writer != null : "Writer should be non-null if no error"; - return commitWriter(fd, request); + return commitWriter(writer, fd, request); } - protected abstract List commitWriter(FileDetails fd, + protected abstract List commitWriter(T writer, FileDetails fd, CompactionRequestImpl request) throws IOException; - protected abstract void abortWriter() throws IOException; + protected abstract void abortWriter(T writer) throws IOException; /** * Performs the compaction. * @param fd FileDetails of cell sink writer * @param scanner Where to read from. + * @param writer Where to write to. * @param smallestReadPoint Smallest read point. * @param cleanSeqId When true, remove seqId(used to be mvcc) value which is <= * smallestReadPoint * @param major Is a major compaction. * @param numofFilesToCompact the number of files to compact + * @param progress Progress reporter. * @return Whether compaction ended; false if it was interrupted for some reason. */ - protected boolean performCompaction(FileDetails fd, InternalScanner scanner, + protected boolean performCompaction(FileDetails fd, InternalScanner scanner, CellSink writer, long smallestReadPoint, boolean cleanSeqId, ThroughputController throughputController, - boolean major, int numofFilesToCompact) throws IOException { + boolean major, int numofFilesToCompact, CompactionProgress progress) throws IOException { assert writer instanceof ShipperListener; long bytesWrittenProgressForLog = 0; long bytesWrittenProgressForShippedCall = 0; @@ -550,22 +551,63 @@ protected InternalScanner createScanner(HStore store, ScanInfo scanInfo, dropDeletesFromRow, dropDeletesToRow); } - public List getCompactionTargets() { - T writer = this.writer; - if (writer == null) { - return Collections.emptyList(); + /** + * Return the progress for a given compaction request. + * @param request the compaction request + */ + public CompactionProgress getProgress(CompactionRequestImpl request) { + return progressMap.get(request); + } + + /** + * Return the aggregate progress for all currently active compactions. + */ + public CompactionProgress getProgress() { + synchronized (progressMap) { + long totalCompactingKVs = 0; + long currentCompactedKVs = 0; + long totalCompactedSize = 0; + for (CompactionProgress progress: progressMap.values()) { + totalCompactingKVs += progress.totalCompactingKVs; + currentCompactedKVs += progress.currentCompactedKVs; + totalCompactedSize += progress.totalCompactedSize; + } + CompactionProgress result = new CompactionProgress(totalCompactingKVs); + result.currentCompactedKVs = currentCompactedKVs; + result.totalCompactedSize = totalCompactedSize; + return result; } - if (writer instanceof StoreFileWriter) { - return Arrays.asList(((StoreFileWriter) writer).getPath()); + } + + public boolean isCompacting() { + return !progressMap.isEmpty(); + } + + /** + * Return the list of target files for all currently active compactions. + */ + public List getCompactionTargets() { + // Build a list of all the compaction targets for all active writers + List targets = new ArrayList<>(); + synchronized (writerMap) { + for (T writer: writerMap.values()) { + if (writer instanceof StoreFileWriter) { + targets.add(((StoreFileWriter) writer).getPath()); + } else { + ((AbstractMultiFileWriter) writer).writers().stream() + .forEach(sfw -> targets.add(sfw.getPath())); + } + } } - return ((AbstractMultiFileWriter) writer).writers().stream().map(sfw -> sfw.getPath()) - .collect(Collectors.toList()); + return targets; } /** - * Reset the Writer when the new storefiles were successfully added + * Complete the compaction after the new storefiles are successfully added. */ - public void resetWriter(){ - writer = null; + public void completeCompaction(CompactionRequestImpl request) { + writerMap.remove(request); + progressMap.remove(request); } + } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DateTieredCompactor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DateTieredCompactor.java index 43e037c5e702..2cfdf5b804ca 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DateTieredCompactor.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DateTieredCompactor.java @@ -79,10 +79,11 @@ public DateTieredMultiFileWriter createWriter(InternalScanner scanner, FileDetai } @Override - protected List commitWriter(FileDetails fd, + protected List commitWriter(DateTieredMultiFileWriter writer, FileDetails fd, CompactionRequestImpl request) throws IOException { List pathList = writer.commitWriters(fd.maxSeqId, request.isAllFiles(), request.getFiles()); return pathList; } + } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DefaultCompactor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DefaultCompactor.java index 03e3a1b5f394..a28e05b543bb 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DefaultCompactor.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DefaultCompactor.java @@ -63,7 +63,7 @@ public List compact(final CompactionRequestImpl request, } @Override - protected List commitWriter(FileDetails fd, + protected List commitWriter(StoreFileWriter writer, FileDetails fd, CompactionRequestImpl request) throws IOException { List newFiles = Lists.newArrayList(writer.getPath()); writer.appendMetadata(fd.maxSeqId, request.isAllFiles(), request.getFiles()); @@ -72,12 +72,6 @@ protected List commitWriter(FileDetails fd, } @Override - protected void abortWriter() throws IOException { - abortWriter(writer); - // this step signals that the target file is no longer written and can be cleaned up - writer = null; - } - protected final void abortWriter(StoreFileWriter writer) throws IOException { Path leftoverFile = writer.getPath(); try { @@ -92,4 +86,5 @@ protected final void abortWriter(StoreFileWriter writer) throws IOException { leftoverFile, e); } } + } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/StripeCompactor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/StripeCompactor.java index 060a11b41fe6..b65120d5466e 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/StripeCompactor.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/StripeCompactor.java @@ -125,10 +125,11 @@ public StripeMultiFileWriter createWriter(InternalScanner scanner, FileDetails f } @Override - protected List commitWriter(FileDetails fd, + protected List commitWriter(StripeMultiFileWriter writer, FileDetails fd, CompactionRequestImpl request) throws IOException { List newFiles = writer.commitWriters(fd.maxSeqId, request.isMajor(), request.getFiles()); assert !newFiles.isEmpty() : "Should have produced an empty file to preserve metadata."; return newFiles; } + } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/FaultyMobStoreCompactor.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/FaultyMobStoreCompactor.java index 813c288c61dc..8377a0c7e5ff 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/FaultyMobStoreCompactor.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/FaultyMobStoreCompactor.java @@ -27,7 +27,6 @@ import java.util.Optional; import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.atomic.AtomicLong; - import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -37,6 +36,7 @@ import org.apache.hadoop.hbase.PrivateCellUtil; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.io.hfile.CorruptHFileException; +import org.apache.hadoop.hbase.regionserver.CellSink; import org.apache.hadoop.hbase.regionserver.HStore; import org.apache.hadoop.hbase.regionserver.InternalScanner; import org.apache.hadoop.hbase.regionserver.KeyValueScanner; @@ -44,6 +44,7 @@ import org.apache.hadoop.hbase.regionserver.ShipperListener; import org.apache.hadoop.hbase.regionserver.StoreFileWriter; import org.apache.hadoop.hbase.regionserver.compactions.CloseChecker; +import org.apache.hadoop.hbase.regionserver.compactions.CompactionProgress; import org.apache.hadoop.hbase.regionserver.throttle.ThroughputControlUtil; import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController; import org.apache.hadoop.hbase.util.Bytes; @@ -88,9 +89,9 @@ public FaultyMobStoreCompactor(Configuration conf, HStore store) { } @Override - protected boolean performCompaction(FileDetails fd, InternalScanner scanner, + protected boolean performCompaction(FileDetails fd, InternalScanner scanner, CellSink writer, long smallestReadPoint, boolean cleanSeqId, ThroughputController throughputController, - boolean major, int numofFilesToCompact) throws IOException { + boolean major, int numofFilesToCompact, CompactionProgress progress) throws IOException { totalCompactions.incrementAndGet(); if (major) { diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactorMemLeak.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactorMemLeak.java index 6a0a8baa9ded..7863cd4e3160 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactorMemLeak.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactorMemLeak.java @@ -128,13 +128,14 @@ public MyCompactor(Configuration conf, HStore store) { } @Override - protected List commitWriter(FileDetails fd, + protected List commitWriter(StoreFileWriter writer, FileDetails fd, CompactionRequestImpl request) throws IOException { HFileWriterImpl writerImpl = (HFileWriterImpl) writer.writer; Cell cell = writerImpl.getLastCell(); // The cell should be backend with an KeyOnlyKeyValue. IS_LAST_CELL_ON_HEAP.set(cell instanceof KeyOnlyKeyValue); - return super.commitWriter(fd, request); + return super.commitWriter(writer, fd, request); } } + } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMajorCompaction.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMajorCompaction.java index 85fdf0871f7f..9552142881ca 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMajorCompaction.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMajorCompaction.java @@ -23,7 +23,6 @@ import static org.apache.hadoop.hbase.regionserver.Store.PRIORITY_USER; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; import java.io.IOException; @@ -54,7 +53,6 @@ import org.apache.hadoop.hbase.io.hfile.HFileDataBlockEncoderImpl; import org.apache.hadoop.hbase.io.hfile.HFileScanner; import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker; -import org.apache.hadoop.hbase.regionserver.compactions.CompactionProgress; import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequestImpl; import org.apache.hadoop.hbase.regionserver.compactions.RatioBasedCompactionPolicy; import org.apache.hadoop.hbase.testclassification.LargeTests; @@ -211,26 +209,9 @@ private void majorCompaction() throws Exception { Result result = r.get(new Get(STARTROW).addFamily(COLUMN_FAMILY_TEXT).readVersions(100)); assertEquals(compactionThreshold, result.size()); - // see if CompactionProgress is in place but null - for (HStore store : r.getStores()) { - assertNull(store.getCompactionProgress()); - } - r.flush(true); r.compact(true); - // see if CompactionProgress has done its thing on at least one store - int storeCount = 0; - for (HStore store : r.getStores()) { - CompactionProgress progress = store.getCompactionProgress(); - if (progress != null) { - ++storeCount; - assertTrue(progress.currentCompactedKVs > 0); - assertTrue(progress.getTotalCompactingKVs() > 0); - } - assertTrue(storeCount > 0); - } - // look at the second row // Increment the least significant character so we get to next row. byte[] secondRowBytes = START_KEY_BYTES.clone(); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitTransactionOnCluster.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitTransactionOnCluster.java index 652c019ff044..e20f6871b64d 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitTransactionOnCluster.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitTransactionOnCluster.java @@ -90,7 +90,6 @@ import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility; import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext; import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker; -import org.apache.hadoop.hbase.regionserver.compactions.CompactionProgress; import org.apache.hadoop.hbase.regionserver.throttle.NoLimitThroughputController; import org.apache.hadoop.hbase.testclassification.LargeTests; import org.apache.hadoop.hbase.testclassification.RegionServerTests; @@ -117,11 +116,9 @@ import org.mockito.Mockito; import org.slf4j.Logger; import org.slf4j.LoggerFactory; - import org.apache.hbase.thirdparty.com.google.common.io.Closeables; import org.apache.hbase.thirdparty.com.google.protobuf.RpcController; import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException; - import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionStateTransition.TransitionCode; import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.ReportRegionStateTransitionRequest; @@ -620,17 +617,11 @@ private void clearReferences(HRegion region) throws IOException { assertEquals(1, region.getStores().size()); HStore store = region.getStores().get(0); while (store.hasReferences()) { - // Wait on any current compaction to complete first. - CompactionProgress progress = store.getCompactionProgress(); - if (progress != null && progress.getProgressPct() < 1.0f) { - while (progress.getProgressPct() < 1.0f) { - LOG.info("Waiting, progress={}", progress.getProgressPct()); - Threads.sleep(1000); - } - } else { - // Run new compaction. Shoudn't be any others running. - region.compact(true); + while (store.storeEngine.getCompactor().isCompacting()) { + Threads.sleep(100); } + // Run new compaction. Shoudn't be any others running. + region.compact(true); store.closeAndArchiveCompactedFiles(); } }