From 8a76451a9b799761cb54163d3bfaaf0981a0b552 Mon Sep 17 00:00:00 2001 From: Wellington Ramos Chevreuil Date: Fri, 25 Nov 2022 21:58:23 +0000 Subject: [PATCH] HBASE-27484 FNFE on StoreFileScanner after a flush followed by a compaction (#4882) Signed-off-by: Peter Somogyi --- .../hadoop/hbase/regionserver/HStore.java | 4 +- .../hadoop/hbase/regionserver/TestHStore.java | 67 +++++++++++++++++++ 2 files changed, 70 insertions(+), 1 deletion(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java index c698cfd8aba7..a41c8da16072 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java @@ -961,10 +961,10 @@ public List getScanners(boolean cacheBlocks, boolean usePread, storeFilesToScan = this.storeEngine.getStoreFileManager().getFilesForScan(startRow, includeStartRow, stopRow, includeStopRow); memStoreScanners = this.memstore.getScanners(readPt); + storeFilesToScan.stream().forEach(f -> f.getFileInfo().refCount.incrementAndGet()); } finally { this.storeEngine.readUnlock(); } - try { // First the store file scanners @@ -981,6 +981,8 @@ public List getScanners(boolean cacheBlocks, boolean usePread, } catch (Throwable t) { clearAndClose(memStoreScanners); throw t instanceof IOException ? (IOException) t : new IOException(t); + } finally { + storeFilesToScan.stream().forEach(f -> f.getFileInfo().refCount.decrementAndGet()); } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHStore.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHStore.java index 4fd572949ad9..a052520e54d2 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHStore.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHStore.java @@ -17,6 +17,7 @@ */ package org.apache.hadoop.hbase.regionserver; +import static org.apache.hadoop.hbase.regionserver.DefaultStoreEngine.DEFAULT_COMPACTION_POLICY_CLASS_KEY; import static org.junit.Assert.assertArrayEquals; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; @@ -41,12 +42,14 @@ import java.util.List; import java.util.ListIterator; import java.util.NavigableSet; +import java.util.Optional; import java.util.TreeSet; import java.util.concurrent.ConcurrentSkipListSet; import java.util.concurrent.CountDownLatch; import java.util.concurrent.CyclicBarrier; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.Future; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; @@ -104,7 +107,9 @@ import org.apache.hadoop.hbase.regionserver.ChunkCreator.ChunkType; import org.apache.hadoop.hbase.regionserver.MemStoreCompactionStrategy.Action; import org.apache.hadoop.hbase.regionserver.compactions.CompactionConfiguration; +import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext; import org.apache.hadoop.hbase.regionserver.compactions.DefaultCompactor; +import org.apache.hadoop.hbase.regionserver.compactions.EverythingPolicy; import org.apache.hadoop.hbase.regionserver.querymatcher.ScanQueryMatcher; import org.apache.hadoop.hbase.regionserver.throttle.NoLimitThroughputController; import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController; @@ -1114,6 +1119,68 @@ public void testRefreshStoreFilesNotChanged() throws IOException { verify(spiedStoreEngine, times(1)).replaceStoreFiles(any(), any(), any(), any()); } + @Test + public void testScanWithCompactionAfterFlush() throws Exception { + TEST_UTIL.getConfiguration().set(DEFAULT_COMPACTION_POLICY_CLASS_KEY, + EverythingPolicy.class.getName()); + init(name.getMethodName()); + + assertEquals(0, this.store.getStorefilesCount()); + + KeyValue kv = new KeyValue(row, family, qf1, 1, (byte[]) null); + // add some data, flush + this.store.add(kv, null); + flush(1); + kv = new KeyValue(row, family, qf2, 1, (byte[]) null); + // add some data, flush + this.store.add(kv, null); + flush(2); + kv = new KeyValue(row, family, qf3, 1, (byte[]) null); + // add some data, flush + this.store.add(kv, null); + flush(3); + + ExecutorService service = Executors.newFixedThreadPool(2); + + Scan scan = new Scan(new Get(row)); + Future scanFuture = service.submit(() -> { + try { + LOG.info(">>>> creating scanner"); + return this.store.createScanner(scan, + new ScanInfo(HBaseConfiguration.create(), + ColumnFamilyDescriptorBuilder.newBuilder(family).setMaxVersions(4).build(), + Long.MAX_VALUE, 0, CellComparator.getInstance()), + scan.getFamilyMap().get(store.getColumnFamilyDescriptor().getName()), 0); + } catch (IOException e) { + e.printStackTrace(); + return null; + } + }); + Future compactFuture = service.submit(() -> { + try { + LOG.info(">>>>>> starting compaction"); + Optional opCompaction = this.store.requestCompaction(); + assertTrue(opCompaction.isPresent()); + store.compact(opCompaction.get(), new NoLimitThroughputController(), User.getCurrent()); + LOG.info(">>>>>> Compaction is finished"); + this.store.closeAndArchiveCompactedFiles(); + LOG.info(">>>>>> Compacted files deleted"); + } catch (IOException e) { + e.printStackTrace(); + } + }); + + KeyValueScanner kvs = scanFuture.get(); + compactFuture.get(); + ((StoreScanner) kvs).currentScanners.forEach(s -> { + if (s instanceof StoreFileScanner) { + assertEquals(1, ((StoreFileScanner) s).getReader().getRefCount()); + } + }); + kvs.seek(kv); + service.shutdownNow(); + } + private long countMemStoreScanner(StoreScanner scanner) { if (scanner.currentScanners == null) { return 0;