Skip to content

Commit

Permalink
support outstanding request
Browse files Browse the repository at this point in the history
  • Loading branch information
mattisonchao committed Jul 23, 2024
1 parent 50a7318 commit a621afe
Showing 1 changed file with 22 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.api.metrics.DoubleHistogram;
import io.opentelemetry.api.metrics.LongCounter;
import io.opentelemetry.api.metrics.ObservableLongGauge;
import io.streamnative.oxia.client.api.*;
import io.streamnative.oxia.client.api.exceptions.OxiaException;
import io.streamnative.oxia.client.metrics.Unit;
Expand Down Expand Up @@ -62,8 +63,11 @@ public final class Worker implements Runnable, Closeable, Operations {

private volatile CompletableFuture<Void> closeFuture;

private final Semaphore outstandingSemaphore;

/* Otl section */
private final LongCounter operationCounter;
private final ObservableLongGauge outstandingRequestGauge;
private final Attributes operationWriteSuccessAttributes;
private final Attributes operationWriteFailedAttributes;
private final Attributes operationReadSuccessAttributes;
Expand Down Expand Up @@ -91,6 +95,7 @@ public Worker(WorkerOptions options, OpenTelemetry openTelemetry) {
}
final GeneratorType generatorType = GeneratorType.fromString(options.keyDistribution);

this.outstandingSemaphore = new Semaphore(options.maxOutstandingRequests);
this.keyGenerator =
Generators.createKeyGenerator(
new KeyGeneratorOptions(
Expand Down Expand Up @@ -125,6 +130,21 @@ public Worker(WorkerOptions options, OpenTelemetry openTelemetry) {
.setDescription("oxia perf operation counter")
.setUnit(Unit.Requests.toString())
.build();
this.outstandingRequestGauge =
meter
.gaugeBuilder("oxia.operf.ycsb.op.outstanding")
.setDescription("oxia outstanding request")
.setUnit(Unit.Requests.toString())
.ofLongs()
.buildWithCallback(
(ob) -> {
ob.record(
options.maxOutstandingRequests - outstandingSemaphore.availablePermits(),
Attributes.builder()
.put("value.size", options.valueSize)
.put("worker", options.workerName)
.build());
});
this.operationLatency =
meter
.histogramBuilder("oxia.perf.ycsb.op.second")
Expand Down Expand Up @@ -174,7 +194,6 @@ public void run() {

final RateLimiter operationRatelimiter = RateLimiter.create(options.requestsRate);
final int maxOutstandingRequests = options.maxOutstandingRequests;
final Semaphore outstandingSemaphore = new Semaphore(maxOutstandingRequests);

final BenchmarkReport globalReport = BenchmarkReport.createDefault();
final BenchmarkReport intervalReport = BenchmarkReport.createDefault();
Expand Down Expand Up @@ -336,6 +355,8 @@ public void close() {
} catch (Exception ex) {
throw new WorkerException(ex);
}

outstandingRequestGauge.close(); // close observer
}

@Override
Expand Down

0 comments on commit a621afe

Please sign in to comment.