diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java index 0ee7b5777fc3..1555bbe03c95 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java @@ -2404,7 +2404,6 @@ public void replayFlush(List fileNames, boolean dropMemstoreSnapshot) long snapshotId = -1; // -1 means do not drop if (dropMemstoreSnapshot && snapshot != null) { snapshotId = snapshot.getId(); - snapshot.close(); } HStore.this.updateStorefiles(storeFiles, snapshotId); } @@ -2415,10 +2414,6 @@ public void replayFlush(List fileNames, boolean dropMemstoreSnapshot) @Override public void abort() throws IOException { if (snapshot != null) { - //We need to close the snapshot when aborting, otherwise, the segment scanner - //won't be closed. If we are using MSLAB, the chunk referenced by those scanners - //can't be released, thus memory leak - snapshot.close(); HStore.this.updateStorefiles(Collections.emptyList(), snapshot.getId()); } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ImmutableSegment.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ImmutableSegment.java index 8c426bc655ac..972684f59b3b 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ImmutableSegment.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ImmutableSegment.java @@ -85,6 +85,10 @@ public String toString() { return res; } + /** + * We create a new {@link SnapshotSegmentScanner} to increase the reference count of + * {@link MemStoreLABImpl} used by this segment. + */ List getSnapshotScanners() { return Collections.singletonList(new SnapshotSegmentScanner(this)); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreSnapshot.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreSnapshot.java index 3b3482898cd4..07eb64dee642 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreSnapshot.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreSnapshot.java @@ -17,31 +17,38 @@ */ package org.apache.hadoop.hbase.regionserver; +import java.util.List; import org.apache.yetus.audience.InterfaceAudience; -import java.io.Closeable; -import java.util.List; + /** - * Holds details of the snapshot taken on a MemStore. Details include the snapshot's identifier, - * count of cells in it and total memory size occupied by all the cells, timestamp information of - * all the cells and a scanner to read all cells in it. + * {@link MemStoreSnapshot} is a Context Object to hold details of the snapshot taken on a MemStore. + * Details include the snapshot's identifier, count of cells in it and total memory size occupied by + * all the cells, timestamp information of all the cells and the snapshot immutableSegment. + *

+ * NOTE:Every time when {@link MemStoreSnapshot#getScanners} is called, we create new + * {@link SnapshotSegmentScanner}s on the {@link MemStoreSnapshot#snapshotImmutableSegment},and + * {@link Segment#incScannerCount} is invoked in the {@link SnapshotSegmentScanner} ctor to increase + * the reference count of {@link MemStoreLAB} which used by + * {@link MemStoreSnapshot#snapshotImmutableSegment}, so after we finish using these scanners, we + * must call their close method to invoke {@link Segment#decScannerCount}. */ @InterfaceAudience.Private -public class MemStoreSnapshot implements Closeable { +public class MemStoreSnapshot { private final long id; private final int cellsCount; private final MemStoreSize memStoreSize; private final TimeRangeTracker timeRangeTracker; - private final List scanners; private final boolean tagsPresent; + private final ImmutableSegment snapshotImmutableSegment; public MemStoreSnapshot(long id, ImmutableSegment snapshot) { this.id = id; this.cellsCount = snapshot.getCellsCount(); this.memStoreSize = snapshot.getMemStoreSize(); this.timeRangeTracker = snapshot.getTimeRangeTracker(); - this.scanners = snapshot.getSnapshotScanners(); this.tagsPresent = snapshot.isTagsPresent(); + this.snapshotImmutableSegment = snapshot; } /** @@ -74,10 +81,16 @@ public TimeRangeTracker getTimeRangeTracker() { } /** - * @return {@link KeyValueScanner} for iterating over the snapshot + * Create new {@link SnapshotSegmentScanner}s for iterating over the snapshot.
+ * NOTE:Here when create new {@link SnapshotSegmentScanner}s, {@link Segment#incScannerCount} is + * invoked in the {@link SnapshotSegmentScanner} ctor,so after we use these + * {@link SnapshotSegmentScanner}s, we must call {@link SnapshotSegmentScanner#close} to invoke + * {@link Segment#decScannerCount}. + * @return {@link KeyValueScanner}s(Which type is {@link SnapshotSegmentScanner}) for iterating + * over the snapshot. */ public List getScanners() { - return scanners; + return snapshotImmutableSegment.getSnapshotScanners(); } /** @@ -86,13 +99,4 @@ public List getScanners() { public boolean isTagsPresent() { return this.tagsPresent; } - - @Override - public void close() { - if (this.scanners != null) { - for (KeyValueScanner scanner : scanners) { - scanner.close(); - } - } - } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHStore.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHStore.java index 4e373a823c56..24c3ccd343a8 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHStore.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHStore.java @@ -102,6 +102,7 @@ import org.apache.hadoop.hbase.regionserver.compactions.DefaultCompactor; import org.apache.hadoop.hbase.regionserver.querymatcher.ScanQueryMatcher; import org.apache.hadoop.hbase.regionserver.throttle.NoLimitThroughputController; +import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController; import org.apache.hadoop.hbase.security.User; import org.apache.hadoop.hbase.testclassification.MediumTests; import org.apache.hadoop.hbase.testclassification.RegionServerTests; @@ -783,11 +784,12 @@ private void injectFault() throws IOException { } } - private static void flushStore(HStore store, long id) throws IOException { + private static StoreFlushContext flushStore(HStore store, long id) throws IOException { StoreFlushContext storeFlushCtx = store.createFlushContext(id, FlushLifeCycleTracker.DUMMY); storeFlushCtx.prepare(); storeFlushCtx.flushCache(Mockito.mock(MonitoredTask.class)); storeFlushCtx.commit(Mockito.mock(MonitoredTask.class)); + return storeFlushCtx; } /** @@ -2222,7 +2224,7 @@ public void testClearSnapshotGetScannerConcurrently() throws Exception { flushThread.join(); if (myDefaultMemStore.shouldWait) { - SegmentScanner segmentScanner = getSegmentScanner(storeScanner); + SegmentScanner segmentScanner = getTypeKeyValueScanner(storeScanner, SegmentScanner.class); MemStoreLABImpl memStoreLAB = (MemStoreLABImpl) (segmentScanner.segment.getMemStoreLAB()); assertTrue(memStoreLAB.isClosed()); assertTrue(!memStoreLAB.chunks.isEmpty()); @@ -2249,16 +2251,16 @@ public void testClearSnapshotGetScannerConcurrently() throws Exception { } } - private SegmentScanner getSegmentScanner(StoreScanner storeScanner) { - List segmentScanners = new ArrayList(); + @SuppressWarnings("unchecked") + private T getTypeKeyValueScanner(StoreScanner storeScanner, Class keyValueScannerClass) { + List resultScanners = new ArrayList(); for (KeyValueScanner keyValueScanner : storeScanner.currentScanners) { - if (keyValueScanner instanceof SegmentScanner) { - segmentScanners.add((SegmentScanner) keyValueScanner); + if (keyValueScannerClass.isInstance(keyValueScanner)) { + resultScanners.add((T) keyValueScanner); } } - - assertTrue(segmentScanners.size() == 1); - return segmentScanners.get(0); + assertTrue(resultScanners.size() == 1); + return resultScanners.get(0); } @Test @@ -2310,6 +2312,116 @@ public CustomDefaultMemStore(Configuration conf, CellComparator c, } + /** + * This test is for HBASE-26488 + */ + @Test + public void testMemoryLeakWhenFlushMemStoreRetrying() throws Exception { + + Configuration conf = HBaseConfiguration.create(); + + byte[] smallValue = new byte[3]; + byte[] largeValue = new byte[9]; + final long timestamp = EnvironmentEdgeManager.currentTime(); + final long seqId = 100; + final Cell smallCell = createCell(qf1, timestamp, seqId, smallValue); + final Cell largeCell = createCell(qf2, timestamp, seqId, largeValue); + TreeSet quals = new TreeSet<>(Bytes.BYTES_COMPARATOR); + quals.add(qf1); + quals.add(qf2); + + conf.set(HStore.MEMSTORE_CLASS_NAME, MyDefaultMemStore1.class.getName()); + conf.setBoolean(WALFactory.WAL_ENABLED, false); + conf.set(DefaultStoreEngine.DEFAULT_STORE_FLUSHER_CLASS_KEY, + MyDefaultStoreFlusher.class.getName()); + + init(name.getMethodName(), conf, ColumnFamilyDescriptorBuilder.newBuilder(family).build()); + MyDefaultMemStore1 myDefaultMemStore = (MyDefaultMemStore1) (store.memstore); + assertTrue((store.storeEngine.getStoreFlusher()) instanceof MyDefaultStoreFlusher); + + MemStoreSizing memStoreSizing = new NonThreadSafeMemStoreSizing(); + store.add(smallCell, memStoreSizing); + store.add(largeCell, memStoreSizing); + flushStore(store, id++); + + MemStoreLABImpl memStoreLAB = + (MemStoreLABImpl) (myDefaultMemStore.snapshotImmutableSegment.getMemStoreLAB()); + assertTrue(memStoreLAB.isClosed()); + assertTrue(memStoreLAB.getOpenScannerCount() == 0); + assertTrue(memStoreLAB.isReclaimed()); + assertTrue(memStoreLAB.chunks.isEmpty()); + StoreScanner storeScanner = null; + try { + storeScanner = + (StoreScanner) store.getScanner(new Scan(new Get(row)), quals, seqId + 1); + assertTrue(store.storeEngine.getStoreFileManager().getStorefileCount() == 1); + assertTrue(store.memstore.size().getCellsCount() == 0); + assertTrue(store.memstore.getSnapshotSize().getCellsCount() == 0); + assertTrue(storeScanner.currentScanners.size() == 1); + assertTrue(storeScanner.currentScanners.get(0) instanceof StoreFileScanner); + + List results = new ArrayList<>(); + storeScanner.next(results); + assertEquals(2, results.size()); + CellUtil.equals(smallCell, results.get(0)); + CellUtil.equals(largeCell, results.get(1)); + } finally { + if (storeScanner != null) { + storeScanner.close(); + } + } + } + + + static class MyDefaultMemStore1 extends DefaultMemStore { + + private ImmutableSegment snapshotImmutableSegment; + + public MyDefaultMemStore1(Configuration conf, CellComparator c, + RegionServicesForStores regionServices) { + super(conf, c, regionServices); + } + + @Override + public MemStoreSnapshot snapshot() { + MemStoreSnapshot result = super.snapshot(); + this.snapshotImmutableSegment = snapshot; + return result; + } + + } + + public static class MyDefaultStoreFlusher extends DefaultStoreFlusher { + private static final AtomicInteger failCounter = new AtomicInteger(1); + private static final AtomicInteger counter = new AtomicInteger(0); + + public MyDefaultStoreFlusher(Configuration conf, HStore store) { + super(conf, store); + } + + @Override + public List flushSnapshot(MemStoreSnapshot snapshot, long cacheFlushId, + MonitoredTask status, ThroughputController throughputController, + FlushLifeCycleTracker tracker) throws IOException { + counter.incrementAndGet(); + return super.flushSnapshot(snapshot, cacheFlushId, status, throughputController, tracker); + } + + @Override + protected void performFlush(InternalScanner scanner, final CellSink sink, + ThroughputController throughputController) throws IOException { + + final int currentCount = counter.get(); + CellSink newCellSink = (cell) -> { + if (currentCount <= failCounter.get()) { + throw new IOException("Simulated exception by tests"); + } + sink.append(cell); + }; + super.performFlush(scanner, newCellSink, throughputController); + } + } + private HStoreFile mockStoreFileWithLength(long length) { HStoreFile sf = mock(HStoreFile.class); StoreFileReader sfr = mock(StoreFileReader.class); @@ -3093,7 +3205,5 @@ protected void doClearSnapShot() { } } } - - } }