Skip to content

Commit

Permalink
Cosmos Issue #6240: Port read-my-writes fix from V4 (#6242)
Browse files Browse the repository at this point in the history
* Port from v4

* Corrected package misspelling in log4j.properties and removed System.exit from Main.java

* Responded to code review comments
  • Loading branch information
David Noble authored and xseeseesee committed Dec 10, 2019
1 parent cda2b92 commit 3ca8508
Show file tree
Hide file tree
Showing 23 changed files with 477 additions and 302 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.CancellationException;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
Expand Down Expand Up @@ -161,8 +162,10 @@ protected String getDocumentLink(Document doc) {
protected abstract void performWorkload(BaseSubscriber<T> baseSubscriber, long i) throws Exception;

private boolean shouldContinue(long startTimeMillis, long iterationCount) {

Duration maxDurationTime = configuration.getMaxRunningTimeDuration();
int maxNumberOfOperations = configuration.getNumberOfOperations();

if (maxDurationTime == null) {
return iterationCount < maxNumberOfOperations;
}
Expand All @@ -182,17 +185,19 @@ void run() throws Exception {

successMeter = metricsRegistry.meter("#Successful Operations");
failureMeter = metricsRegistry.meter("#Unsuccessful Operations");

if (configuration.getOperationType() == Operation.ReadLatency
|| configuration.getOperationType() == Operation.WriteLatency)
|| configuration.getOperationType() == Operation.WriteLatency) {
latency = metricsRegistry.timer("Latency");
}

reporter.start(configuration.getPrintingInterval(), TimeUnit.SECONDS);

long startTime = System.currentTimeMillis();

AtomicLong count = new AtomicLong(0);
long i;
for ( i = 0; shouldContinue(startTime, i); i++) {

for (i = 0; shouldContinue(startTime, i); i++) {

BaseSubscriber<T> baseSubscriber = new BaseSubscriber<T>() {
@Override
Expand All @@ -202,7 +207,12 @@ protected void hookOnSubscribe(Subscription subscription) {

@Override
protected void hookOnNext(T value) {
logger.debug("hookOnNext: {}, count:{}", value, count.get());
}

@Override
protected void hookOnCancel() {
this.hookOnError(new CancellationException());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,8 +83,8 @@ protected void performWorkload(BaseSubscriber<FeedResponse<Document>> baseSubscr
} else {
throw new IllegalArgumentException("Unsupported Operation: " + configuration.getOperationType());
}
concurrencyControlSemaphore.acquire();

concurrencyControlSemaphore.acquire();
obs.subscribeOn(Schedulers.parallel()).subscribe(baseSubscriber);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ public static void main(String[] args) throws Exception {
throw new RuntimeException(cfg.getOperationType() + " is not supported");
}

LOGGER.info("Starting {}", cfg.getOperationType());
benchmark.run();
benchmark.shutdown();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,14 +54,19 @@ protected void init() {

@Override
protected void performWorkload(BaseSubscriber<Document> baseSubscriber, long i) throws Exception {

Flux<Document> obs;
boolean readyMyWrite = RandomUtils.nextBoolean();

if (readyMyWrite) {

// will do a write and immediately upon success will either
// do a point read
// or single partition query
// or cross partition query to find the write.

int j = Math.toIntExact(Math.floorMod(i, 3));

switch (j) {
case 0:
// write a random document to cosmodb and update the cache.
Expand All @@ -78,7 +83,7 @@ protected void performWorkload(BaseSubscriber<Document> baseSubscriber, long i)
"couldn't find my write in a single partition query!"))));
break;
case 2:
// write a random document to cosmodb and update the cache.
// write a random document to cosmosdb and update the cache.
// then try to query for the document which just was written
obs = writeDocument()
.flatMap(d -> xPartitionQuery(generateQuery(d))
Expand All @@ -90,12 +95,15 @@ protected void performWorkload(BaseSubscriber<Document> baseSubscriber, long i)
throw new IllegalStateException();
}
} else {

// will either do
// a write
// a point read for a in memory cached document
// or single partition query for a in memory cached document
// or cross partition query for a in memory cached document

int j = Math.toIntExact(Math.floorMod(i, 4));

switch (j) {
case 0:
// write a random document to cosmosdb and update the cache
Expand Down Expand Up @@ -125,6 +133,7 @@ protected void performWorkload(BaseSubscriber<Document> baseSubscriber, long i)

concurrencyControlSemaphore.acquire();

logger.debug("concurrencyControlSemaphore: {}", concurrencyControlSemaphore);
obs.subscribeOn(Schedulers.parallel()).subscribe(baseSubscriber);
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,14 +1,29 @@
# this is the log4j configuration for tests
# This is the log4j configuration for benchmarks

# Set root logger level to DEBUG and its only appender to A1.
log4j.rootLogger=INFO, A1
log4j.rootLogger=INFO, Console

log4j.category.com.azure.data.cosmos.internal.directconnectivity.rntbd=WARN
log4j.category.io.netty=INFO
log4j.category.io.reactivex=INFO
# A1 is set to be a ConsoleAppender.
log4j.appender.A1=org.apache.log4j.ConsoleAppender
log4j.category.com.azure.cosmos=INFO
log4j.category.com.azure.cosmos.benchmark=INFO
log4j.category.com.azure.cosmos.internal=INFO
log4j.category.com.azure.cosmos.internal.caches=INFO
log4j.category.com.azure.cosmos.internal.changefeed=INFO
log4j.category.com.azure.cosmos.internal.directconnectivity=INFO
log4j.category.com.azure.cosmos.internal.directconnectivity.rntbd=INFO
log4j.category.com.azure.cosmos.internal.http=INFO
log4j.category.com.azure.cosmos.internal.query=INFO
log4j.category.com.azure.cosmos.internal.query.aggregation=INFO
log4j.category.com.azure.cosmos.internal.query.metrics=INFO
log4j.category.com.azure.cosmos.internal.query.orderbyquery=INFO
log4j.category.com.azure.cosmos.internal.routing=INFO

# A1 uses PatternLayout.
log4j.appender.A1.layout=org.apache.log4j.PatternLayout
log4j.appender.A1.layout.ConversionPattern=%d %5X{pid} [%t] %-5p %c - %m%n
log4j.category.com.azure.cosmos.internal.directconnectivity.RntbdTransportClient=INFO
log4j.category.com.azure.cosmos.internal.directconnectivity.rntbd.RntbdRequestManager=INFO

log4j.appender.Console=org.apache.log4j.ConsoleAppender
log4j.appender.Console.layout=org.apache.log4j.PatternLayout
log4j.appender.Console.layout.ConversionPattern=%d %5X{pid} [%t] %-5p %c - %m%n

log4j.appender.LogFile=org.apache.log4j.FileAppender
log4j.appender.LogFile.File=${azure.cosmos.logger.directory}/azure-cosmos-benchmark.log
log4j.appender.LogFile.layout=org.apache.log4j.PatternLayout
log4j.appender.LogFile.layout.ConversionPattern=[%d][%p][${azure.cosmos.hostname}][thread:%t][logger:%c] %m%n
Loading

0 comments on commit 3ca8508

Please sign in to comment.