Skip to content

Commit

Permalink
HBASE-26765 Minor refactor of async scanning code (#4121)
Browse files Browse the repository at this point in the history
Signed-off-by: Duo Zhang <[email protected]>
  • Loading branch information
ndimiduk committed Feb 24, 2022
1 parent d13cc38 commit a4036a6
Show file tree
Hide file tree
Showing 4 changed files with 22 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -340,7 +340,7 @@ public AsyncTableBuilder<ScanResultConsumer> getTableBuilder(TableName tableName
public AsyncTable<ScanResultConsumer> build() {
RawAsyncTableImpl rawTable =
new RawAsyncTableImpl(AsyncConnectionImpl.this, RETRY_TIMER, this);
return new AsyncTableImpl(AsyncConnectionImpl.this, rawTable, pool);
return new AsyncTableImpl(rawTable, pool);
}
};
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,12 +43,11 @@
@InterfaceAudience.Private
class AsyncTableImpl implements AsyncTable<ScanResultConsumer> {

private final AsyncTable<AdvancedScanResultConsumer> rawTable;
private final RawAsyncTableImpl rawTable;

private final ExecutorService pool;

AsyncTableImpl(AsyncConnectionImpl conn, AsyncTable<AdvancedScanResultConsumer> rawTable,
ExecutorService pool) {
AsyncTableImpl(RawAsyncTableImpl rawTable, ExecutorService pool) {
this.rawTable = rawTable;
this.pool = pool;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,23 +23,24 @@
import java.io.InterruptedIOException;
import java.util.ArrayDeque;
import java.util.Queue;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
import org.apache.hadoop.hbase.util.FutureUtils;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* The {@link ResultScanner} implementation for {@link AsyncTable}. It will fetch data automatically
* in background and cache it in memory. Typically the {@link #maxCacheSize} will be
* The {@link ResultScanner} implementation for {@link RawAsyncTableImpl}. It will fetch data
* automatically in background and cache it in memory. Typically, the {@link #maxCacheSize} will be
* {@code 2 * scan.getMaxResultSize()}.
*/
@InterfaceAudience.Private
class AsyncTableResultScanner implements ResultScanner, AdvancedScanResultConsumer {

private static final Logger LOG = LoggerFactory.getLogger(AsyncTableResultScanner.class);

private final AsyncTable<AdvancedScanResultConsumer> rawTable;
private final TableName tableName;

private final long maxCacheSize;

Expand All @@ -57,12 +58,10 @@ class AsyncTableResultScanner implements ResultScanner, AdvancedScanResultConsum

private ScanResumer resumer;

public AsyncTableResultScanner(AsyncTable<AdvancedScanResultConsumer> table, Scan scan,
long maxCacheSize) {
this.rawTable = table;
public AsyncTableResultScanner(TableName tableName, Scan scan, long maxCacheSize) {
this.tableName = tableName;
this.maxCacheSize = maxCacheSize;
this.scan = scan;
table.scan(scan, this);
}

private void addToCache(Result result) {
Expand All @@ -72,9 +71,10 @@ private void addToCache(Result result) {

private void stopPrefetch(ScanController controller) {
if (LOG.isDebugEnabled()) {
LOG.debug(String.format("0x%x", System.identityHashCode(this)) +
" stop prefetching when scanning " + rawTable.getName() + " as the cache size " +
cacheSize + " is greater than the maxCacheSize " + maxCacheSize);
LOG.debug("{} stop prefetching when scanning {} as the cache size {}" +
" is greater than the maxCacheSize {}",
String.format("0x%x", System.identityHashCode(this)), tableName, cacheSize,
maxCacheSize);
}
resumer = controller.suspend();
}
Expand Down Expand Up @@ -138,7 +138,7 @@ public synchronized Result next() throws IOException {
return null;
}
if (error != null) {
FutureUtils.rethrow(error);
throw FutureUtils.rethrow(error);
}
try {
wait();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -666,10 +666,14 @@ private long resultSize2CacheSize(long maxResultSize) {
}

@Override
public ResultScanner getScanner(Scan scan) {
return new AsyncTableResultScanner(this, ReflectionUtils.newInstance(scan.getClass(), scan),
resultSize2CacheSize(
scan.getMaxResultSize() > 0 ? scan.getMaxResultSize() : defaultScannerMaxResultSize));
public AsyncTableResultScanner getScanner(Scan scan) {
final long maxCacheSize = resultSize2CacheSize(
scan.getMaxResultSize() > 0 ? scan.getMaxResultSize() : defaultScannerMaxResultSize);
final Scan scanCopy = ReflectionUtils.newInstance(scan.getClass(), scan);
final AsyncTableResultScanner scanner =
new AsyncTableResultScanner(tableName, scanCopy, maxCacheSize);
scan(scan, scanner);
return scanner;
}

@Override
Expand Down

0 comments on commit a4036a6

Please sign in to comment.