Skip to content

Commit

Permalink
HBASE-27484 FNFE on StoreFileScanner after a flush followed by a comp…
Browse files Browse the repository at this point in the history
…action (#4882)

Signed-off-by: Peter Somogyi <[email protected]>
  • Loading branch information
wchevreuil committed Nov 25, 2022
1 parent 59f92a8 commit 8a76451
Show file tree
Hide file tree
Showing 2 changed files with 70 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -961,10 +961,10 @@ public List<KeyValueScanner> getScanners(boolean cacheBlocks, boolean usePread,
storeFilesToScan = this.storeEngine.getStoreFileManager().getFilesForScan(startRow,
includeStartRow, stopRow, includeStopRow);
memStoreScanners = this.memstore.getScanners(readPt);
storeFilesToScan.stream().forEach(f -> f.getFileInfo().refCount.incrementAndGet());
} finally {
this.storeEngine.readUnlock();
}

try {
// First the store file scanners

Expand All @@ -981,6 +981,8 @@ public List<KeyValueScanner> getScanners(boolean cacheBlocks, boolean usePread,
} catch (Throwable t) {
clearAndClose(memStoreScanners);
throw t instanceof IOException ? (IOException) t : new IOException(t);
} finally {
storeFilesToScan.stream().forEach(f -> f.getFileInfo().refCount.decrementAndGet());
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
*/
package org.apache.hadoop.hbase.regionserver;

import static org.apache.hadoop.hbase.regionserver.DefaultStoreEngine.DEFAULT_COMPACTION_POLICY_CLASS_KEY;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
Expand All @@ -41,12 +42,14 @@
import java.util.List;
import java.util.ListIterator;
import java.util.NavigableSet;
import java.util.Optional;
import java.util.TreeSet;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
Expand Down Expand Up @@ -104,7 +107,9 @@
import org.apache.hadoop.hbase.regionserver.ChunkCreator.ChunkType;
import org.apache.hadoop.hbase.regionserver.MemStoreCompactionStrategy.Action;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionConfiguration;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
import org.apache.hadoop.hbase.regionserver.compactions.DefaultCompactor;
import org.apache.hadoop.hbase.regionserver.compactions.EverythingPolicy;
import org.apache.hadoop.hbase.regionserver.querymatcher.ScanQueryMatcher;
import org.apache.hadoop.hbase.regionserver.throttle.NoLimitThroughputController;
import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
Expand Down Expand Up @@ -1114,6 +1119,68 @@ public void testRefreshStoreFilesNotChanged() throws IOException {
verify(spiedStoreEngine, times(1)).replaceStoreFiles(any(), any(), any(), any());
}

@Test
public void testScanWithCompactionAfterFlush() throws Exception {
TEST_UTIL.getConfiguration().set(DEFAULT_COMPACTION_POLICY_CLASS_KEY,
EverythingPolicy.class.getName());
init(name.getMethodName());

assertEquals(0, this.store.getStorefilesCount());

KeyValue kv = new KeyValue(row, family, qf1, 1, (byte[]) null);
// add some data, flush
this.store.add(kv, null);
flush(1);
kv = new KeyValue(row, family, qf2, 1, (byte[]) null);
// add some data, flush
this.store.add(kv, null);
flush(2);
kv = new KeyValue(row, family, qf3, 1, (byte[]) null);
// add some data, flush
this.store.add(kv, null);
flush(3);

ExecutorService service = Executors.newFixedThreadPool(2);

Scan scan = new Scan(new Get(row));
Future<KeyValueScanner> scanFuture = service.submit(() -> {
try {
LOG.info(">>>> creating scanner");
return this.store.createScanner(scan,
new ScanInfo(HBaseConfiguration.create(),
ColumnFamilyDescriptorBuilder.newBuilder(family).setMaxVersions(4).build(),
Long.MAX_VALUE, 0, CellComparator.getInstance()),
scan.getFamilyMap().get(store.getColumnFamilyDescriptor().getName()), 0);
} catch (IOException e) {
e.printStackTrace();
return null;
}
});
Future compactFuture = service.submit(() -> {
try {
LOG.info(">>>>>> starting compaction");
Optional<CompactionContext> opCompaction = this.store.requestCompaction();
assertTrue(opCompaction.isPresent());
store.compact(opCompaction.get(), new NoLimitThroughputController(), User.getCurrent());
LOG.info(">>>>>> Compaction is finished");
this.store.closeAndArchiveCompactedFiles();
LOG.info(">>>>>> Compacted files deleted");
} catch (IOException e) {
e.printStackTrace();
}
});

KeyValueScanner kvs = scanFuture.get();
compactFuture.get();
((StoreScanner) kvs).currentScanners.forEach(s -> {
if (s instanceof StoreFileScanner) {
assertEquals(1, ((StoreFileScanner) s).getReader().getRefCount());
}
});
kvs.seek(kv);
service.shutdownNow();
}

private long countMemStoreScanner(StoreScanner scanner) {
if (scanner.currentScanners == null) {
return 0;
Expand Down

0 comments on commit 8a76451

Please sign in to comment.