[#11379] Add AsyncPollingPutWriter #11380

Merged 1 commit on Aug 22, 2024
26 changes: 24 additions & 2 deletions collector/src/main/resources/hbase-root.properties
@@ -27,11 +27,33 @@ hbase.client.properties.hbase.client.retries.number=4
hbase.client.put-writer.concurrency-limit=100000
hbase.client.span-put-writer.concurrency-limit=0

# asyncBufferedMutator, asyncTable
hbase.client.put-writer=asyncBufferedMutator
# asyncBufferedMutator, asyncTable, asyncPoller
hbase.client.put-writer=asyncPoller

hbase.client.put-writer.async-buffered-mutator.writeBufferSize=100
hbase.client.put-writer.async-buffered-mutator.writeBufferPeriodicFlush=100

## asyncPoller
# parallelism=0 : auto-detect CPU core count
hbase.client.put-writer.async-poller.span.parallelism=0
# 1: all cores, n: 1/n of cores (only applies when parallelism=0)
hbase.client.put-writer.async-poller.span.cpuRatio=1
# Minimum CPU cores (only applies when parallelism=0)
hbase.client.put-writer.async-poller.span.minCpuCore=4

hbase.client.put-writer.async-poller.span.queueSize=10000
hbase.client.put-writer.async-poller.span.writeBufferSize=100
hbase.client.put-writer.async-poller.span.writeBufferPeriodicFlush=100

# parallelism=0 : auto-detect CPU core count
hbase.client.put-writer.async-poller.default.parallelism=0
# n: 1/n of cores, 1: all cores, 4: 1/4 of cores (only applies when parallelism=0)
hbase.client.put-writer.async-poller.default.cpuRatio=4
# Minimum CPU cores (only applies when parallelism=0)
hbase.client.put-writer.async-poller.default.minCpuCore=2
hbase.client.put-writer.async-poller.default.queueSize=5000
hbase.client.put-writer.async-poller.default.writeBufferSize=100
hbase.client.put-writer.async-poller.default.writeBufferPeriodicFlush=100

# hbase async =================================================================
# enable hbase async operation. default: false
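To make the auto-detection comments above concrete, here is a minimal sketch of the resolution rule these properties describe. It assumes the CPU count comes from `Runtime.getRuntime().availableProcessors()`; the helper name and the host core counts are made up for illustration and are not part of this change.

```java
// Sketch of the documented parallelism resolution; not code from this PR.
static int effectiveParallelism(int parallelism, int cpuRatio, int minCpuCore) {
    if (parallelism <= 0) {                                     // 0 = auto-detect
        int cores = Runtime.getRuntime().availableProcessors(); // assumed CPU source
        return Math.max(cores / cpuRatio, minCpuCore);
    }
    return parallelism;                                         // explicit value wins
}

// On a 16-core collector host:
//   span.*    (cpuRatio=1, minCpuCore=4) -> max(16 / 1, 4) = 16 workers
//   default.* (cpuRatio=4, minCpuCore=2) -> max(16 / 4, 2) = 4 workers
```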
4 changes: 4 additions & 0 deletions commons-hbase/pom.xml
@@ -51,6 +51,10 @@
<artifactId>pinpoint-commons-profiler</artifactId>
</dependency>

<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-collections4</artifactId>
88 changes: 88 additions & 0 deletions commons-hbase/src/main/java/com/navercorp/pinpoint/common/hbase/async/AsyncPollerOption.java
@@ -0,0 +1,88 @@
package com.navercorp.pinpoint.common.hbase.async;

import com.navercorp.pinpoint.common.util.CpuUtils;

public class AsyncPollerOption {

private int queueSize = 1000 * 100;

private int writeBufferSize = 100;
private int writeBufferPeriodicFlush = 100;
private int parallelism = 0;
private int cpuRatio = 1;
private int minCpuCore = 2;


public int getQueueSize() {
return queueSize;
}

public void setQueueSize(int queueSize) {
this.queueSize = queueSize;
}

public int getWriteBufferSize() {
return writeBufferSize;
}

public void setWriteBufferSize(int writeBufferSize) {
this.writeBufferSize = writeBufferSize;
}

public int getWriteBufferPeriodicFlush() {
return writeBufferPeriodicFlush;
}

public void setWriteBufferPeriodicFlush(int writeBufferPeriodicFlush) {
this.writeBufferPeriodicFlush = writeBufferPeriodicFlush;
}

public int getParallelism() {
return cpu(parallelism, cpuRatio, minCpuCore);
}

int cpu(int parallelism, int cpuRatio, int minCpu) {
if (parallelism <= 0) {
final int cpuCount = getCpuCount();
int cpu = Math.floorDiv(cpuCount, cpuRatio);
return Math.max(cpu, minCpu);
}
return parallelism;
}

int getCpuCount() {
return CpuUtils.cpuCount();
}

public void setParallelism(int parallelism) {
this.parallelism = parallelism;
}

public int getCpuRatio() {
return cpuRatio;
}

public void setCpuRatio(int cpuRatio) {
this.cpuRatio = cpuRatio;
}

public int getMinCpuCore() {
return minCpuCore;
}

public void setMinCpuCore(int minCpuCore) {
this.minCpuCore = minCpuCore;
}

@Override
public String toString() {
return "AsyncPollerOption{" +

Check warning on line 79 in commons-hbase/src/main/java/com/navercorp/pinpoint/common/hbase/async/AsyncPollerOption.java

View check run for this annotation

Codecov / codecov/patch

commons-hbase/src/main/java/com/navercorp/pinpoint/common/hbase/async/AsyncPollerOption.java#L79

Added line #L79 was not covered by tests
"queueSize=" + queueSize +
", writeBufferSize=" + writeBufferSize +
", writeBufferPeriodicFlush=" + writeBufferPeriodicFlush +
", parallelism=" + parallelism +
", cpuRatio=" + cpuRatio +
", minCpuCore=" + minCpuCore +
'}';
}
}
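For reference, a hypothetical binding sketch (not part of this PR) showing how the `default.*` values from hbase-root.properties above map onto this bean; the actual property binding is done elsewhere in the collector module, and the example class name is made up.

```java
import com.navercorp.pinpoint.common.hbase.async.AsyncPollerOption;

// Hand-written mapping of the default.* properties onto the option bean.
public class AsyncPollerOptionExample {
    public static void main(String[] args) {
        AsyncPollerOption option = new AsyncPollerOption();
        option.setParallelism(0);              // default.parallelism=0 -> auto-detect
        option.setCpuRatio(4);                 // default.cpuRatio=4    -> 1/4 of cores
        option.setMinCpuCore(2);               // default.minCpuCore=2
        option.setQueueSize(5000);             // default.queueSize=5000
        option.setWriteBufferSize(100);
        option.setWriteBufferPeriodicFlush(100);

        // With parallelism <= 0, getParallelism() resolves the worker count as
        // max(cpuCount / cpuRatio, minCpuCore), e.g. 4 on a 16-core host.
        System.out.println(option.getParallelism());
    }
}
```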
202 changes: 202 additions & 0 deletions commons-hbase/src/main/java/com/navercorp/pinpoint/common/hbase/async/AsyncPollerThread.java
@@ -0,0 +1,202 @@
package com.navercorp.pinpoint.common.hbase.async;

import com.navercorp.pinpoint.common.hbase.RequestNotPermittedException;
import com.navercorp.pinpoint.common.hbase.util.FutureUtils;
import com.navercorp.pinpoint.common.profiler.logging.ThrottledLogger;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Put;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.io.Closeable;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

public class AsyncPollerThread implements Closeable {

private final Logger logger = LogManager.getLogger(this.getClass());
private final ThrottledLogger tLogger = ThrottledLogger.getLogger(logger, 100);

private final TableWriterFactory writerFactory;

private final BlockingQueue<WriteRequest> queue;
private final int queueSize;
private final int writeBufferSize;
private final int writeBufferPeriodicFlush;
private final int pollTimeout;

private final Thread thread;
private final AtomicBoolean runState = new AtomicBoolean(true);

public static final RequestNotPermittedException OVERFLOW = new RequestNotPermittedException("write queue is full", false);

public AsyncPollerThread(String id, TableWriterFactory writerFactory,
AsyncPollerOption option) {
this.writerFactory = Objects.requireNonNull(writerFactory, "writerFactory");

this.queueSize = option.getQueueSize();
this.queue = new ArrayBlockingQueue<>(queueSize);

this.writeBufferSize = option.getWriteBufferSize();
this.writeBufferPeriodicFlush = option.getWriteBufferPeriodicFlush();
this.pollTimeout = Math.max(writeBufferPeriodicFlush / 4, 20);

this.thread = new Thread(this::dispatch, id);
this.thread.setDaemon(true);
this.thread.start();
}

public List<CompletableFuture<Void>> write(TableName tableName, List<Put> puts) {
Objects.requireNonNull(tableName, "tableName");
Objects.requireNonNull(puts, "puts");
if (isShutdown()) {
return FutureUtils.newFutureList(() -> CompletableFuture.failedFuture(new IllegalStateException("closed")), puts.size());
}

WriteRequest writeRequest = new WriteRequest(tableName, puts);
if (this.queue.offer(writeRequest)) {
return writeRequest.getFutures();
}
tLogger.info("write queue overflow");
return FutureUtils.newFutureList(() -> CompletableFuture.failedFuture(OVERFLOW), puts.size());
}


public void dispatch() {
while (isRun()) {
try {
List<WriteRequest> requests = poll();
if (requests == null) {
break;
}
Map<TableName, List<WriteRequest>> map = tableGroup(requests);
for (Map.Entry<TableName, List<WriteRequest>> entry : map.entrySet()) {
TableName tableName = entry.getKey();
List<WriteRequest> writes = entry.getValue();

if (logger.isDebugEnabled()) {
logger.debug("write {} {} requests:{}", this.thread.getName(), tableName, writes.size());
}
List<Put> puts = getPuts(writes);

AsyncTableWriterFactory.Writer writer = this.writerFactory.writer(tableName);
List<CompletableFuture<Void>> hbaseResults = writer.put(puts);
addListeners(hbaseResults, writes);
}
} catch (Throwable th) {
logger.warn("Dispatch Error {}", this.thread.getName(), th);

Check warning on line 95 in commons-hbase/src/main/java/com/navercorp/pinpoint/common/hbase/async/AsyncPollerThread.java

View check run for this annotation

Codecov / codecov/patch

commons-hbase/src/main/java/com/navercorp/pinpoint/common/hbase/async/AsyncPollerThread.java#L94-L95

Added lines #L94 - L95 were not covered by tests
if (isShutdown()) {
break;
}
}
}
logger.info("dispatch terminated {}", this.thread.getName());
}

private boolean isRun() {
return runState.get();
}

private boolean isShutdown() {
return !isRun();
}

private Map<TableName, List<WriteRequest>> tableGroup(List<WriteRequest> requests) {
Map<TableName, List<WriteRequest>> map = new HashMap<>();
for (WriteRequest req : requests) {
TableName tableName = req.getTableName();
List<WriteRequest> puts = map.computeIfAbsent(tableName, (key) -> new ArrayList<>());
puts.add(req);
}
return map;
}

@Override
public void close() {
logger.debug("Close {}", this.thread.getName());
this.runState.set(false);
this.thread.interrupt();
try {
this.thread.join(3000);
} catch (InterruptedException ignored) {
Thread.currentThread().interrupt();
}
}

private List<Put> getPuts(List<WriteRequest> writes) {
List<Put> puts = new ArrayList<>();
for (WriteRequest write : writes) {
puts.addAll(write.getPuts());
}
return puts;
}

private static void addListeners(List<CompletableFuture<Void>> hbaseResults, List<WriteRequest> requests) {
int i = 0;
for (WriteRequest writeRequest : requests) {
for (CompletableFuture<Void> write : writeRequest.getFutures()) {
CompletableFuture<Void> hbaseFuture = hbaseResults.get(i++);
FutureUtils.addListener(hbaseFuture, write);
}
}
}

private List<WriteRequest> poll() {
final long startTime = System.currentTimeMillis();

List<WriteRequest> drain = new ArrayList<>(writeBufferSize);
int drainSize = 0;
while (isRun()) {
WriteRequest request = null;
try {
request = queue.poll(pollTimeout, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
logger.debug("Thread.interrupted {}", this.thread.getName());
if (isShutdown()) {
return null;
}
}
if (request != null) {
drain.add(request);
drainSize += request.size();
if (bufferOverflow(drainSize)) {
return drain;
}
}
if (drainSize > 0) {
if (timeout(startTime)) {
return drain;
}
}
}
return null;
}

private boolean timeout(long startTime) {
return System.currentTimeMillis() - startTime > writeBufferPeriodicFlush;
}

private boolean bufferOverflow(int drainSize) {
return drainSize >= writeBufferSize;
}

@Override
public String toString() {
return "AsyncPollerThread{" +

Check warning on line 194 in commons-hbase/src/main/java/com/navercorp/pinpoint/common/hbase/async/AsyncPollerThread.java

View check run for this annotation

Codecov / codecov/patch

commons-hbase/src/main/java/com/navercorp/pinpoint/common/hbase/async/AsyncPollerThread.java#L194

Added line #L194 was not covered by tests
", queueSize=" + queueSize +
", writeBufferSize=" + writeBufferSize +
", writeBufferPeriodicFlush=" + writeBufferPeriodicFlush +
", pollTimeout=" + pollTimeout +
", thread=" + thread +
'}';
}
}
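A usage sketch of the new class for reviewers, not part of this change: it assumes a `TableWriterFactory` is already wired elsewhere (e.g. backed by an HBase `AsyncTable`, whose `put(List<Put>)` returns one future per mutation, matching the usage in `dispatch()` above); the table name, row key, and wrapper class are made up.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;

import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;

import com.navercorp.pinpoint.common.hbase.async.AsyncPollerOption;
import com.navercorp.pinpoint.common.hbase.async.AsyncPollerThread;
import com.navercorp.pinpoint.common.hbase.async.TableWriterFactory;

// Hypothetical caller; only AsyncPollerThread/AsyncPollerOption below are from this PR.
class AsyncPollerThreadUsage {

    void writeExample(TableWriterFactory writerFactory) {
        AsyncPollerOption option = new AsyncPollerOption();
        option.setQueueSize(5000);
        option.setWriteBufferSize(100);
        option.setWriteBufferPeriodicFlush(100);

        AsyncPollerThread poller = new AsyncPollerThread("async-poller-0", writerFactory, option);
        try {
            TableName table = TableName.valueOf("AgentInfo");              // hypothetical table
            List<Put> puts = List.of(new Put(Bytes.toBytes("row-1"))       // hypothetical row
                    .addColumn(Bytes.toBytes("S"), Bytes.toBytes("q"), Bytes.toBytes("v")));

            // write() enqueues the batch and returns one future per Put; the poller
            // thread drains the queue, groups requests by table, and flushes once
            // writeBufferSize puts accumulate or writeBufferPeriodicFlush ms elapse.
            List<CompletableFuture<Void>> futures = poller.write(table, puts);
            CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
        } finally {
            poller.close(); // stops the daemon dispatch thread
        }
    }
}
```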