diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/BridgeInternal.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/BridgeInternal.java
index a6816c7ebb49..ed4a74337a95 100644
--- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/BridgeInternal.java
+++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/BridgeInternal.java
@@ -221,11 +221,6 @@ public static E setPartitionKeyRangeId(E e, St
return e;
}
- public static E setRequestTimeline(E e, RequestTimeline timeline) {
- e.setRequestTimeline(timeline);
- return e;
- }
-
public static boolean isEnableMultipleWriteLocations(DatabaseAccount account) {
return account.getEnableMultipleWriteLocations();
}
@@ -418,6 +413,11 @@ public static CosmosResponseDiagnostics createCosmosResponseDiagnostics() {
return new CosmosResponseDiagnostics();
}
+ public static void setTransportClientRequestTimelineOnDiagnostics(CosmosResponseDiagnostics cosmosResponseDiagnostics,
+ RequestTimeline requestTimeline) {
+ cosmosResponseDiagnostics.clientSideRequestStatistics().setTransportClientRequestTimeline(requestTimeline);
+ }
+
public static void recordResponse(CosmosResponseDiagnostics cosmosResponseDiagnostics,
RxDocumentServiceRequest request, StoreResult storeResult) {
cosmosResponseDiagnostics.clientSideRequestStatistics().recordResponse(request, storeResult);
diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/ClientSideRequestStatistics.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/ClientSideRequestStatistics.java
index bc7d9d31b3f3..cfad3b746a74 100644
--- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/ClientSideRequestStatistics.java
+++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/ClientSideRequestStatistics.java
@@ -4,6 +4,7 @@
import com.azure.cosmos.implementation.HttpConstants;
import com.azure.cosmos.implementation.OperationType;
+import com.azure.cosmos.implementation.RequestTimeline;
import com.azure.cosmos.implementation.ResourceType;
import com.azure.cosmos.implementation.RetryContext;
import com.azure.cosmos.implementation.RxDocumentServiceRequest;
@@ -53,6 +54,7 @@ class ClientSideRequestStatistics {
private Set regionsContacted;
private RetryContext retryContext;
private GatewayStatistics gatewayStatistics;
+ private RequestTimeline transportRequestTimeline;
ClientSideRequestStatistics() {
this.requestStartTime = ZonedDateTime.now(ZoneOffset.UTC);
@@ -99,7 +101,7 @@ void recordResponse(RxDocumentServiceRequest request, StoreResult storeResult) {
}
if (storeResponseStatistics.requestOperationType == OperationType.Head
- || storeResponseStatistics.requestOperationType == OperationType.HeadFeed) {
+ || storeResponseStatistics.requestOperationType == OperationType.HeadFeed) {
this.supplementalResponseStatisticsList.add(storeResponseStatistics);
} else {
this.responseStatisticsList.add(storeResponseStatistics);
@@ -127,13 +129,18 @@ void recordGatewayResponse(RxDocumentServiceRequest rxDocumentServiceRequest, St
this.gatewayStatistics.subStatusCode = DirectBridgeInternal.getSubStatusCode(storeResponse);
this.gatewayStatistics.sessionToken = storeResponse.getHeaderValue(HttpConstants.HttpHeaders.SESSION_TOKEN);
this.gatewayStatistics.requestCharge = storeResponse.getHeaderValue(HttpConstants.HttpHeaders.REQUEST_CHARGE);
+ this.gatewayStatistics.requestTimeline = DirectBridgeInternal.getRequestTimeline(storeResponse);
} else if(exception != null){
this.gatewayStatistics.statusCode = exception.getStatusCode();
this.gatewayStatistics.subStatusCode = exception.getSubStatusCode();
+ this.gatewayStatistics.requestTimeline = this.transportRequestTimeline;
}
}
}
+ void setTransportClientRequestTimeline(RequestTimeline transportRequestTimeline) {
+ this.transportRequestTimeline = transportRequestTimeline;
+ }
String recordAddressResolutionStart(URI targetEndpoint) {
String identifier = Utils.randomUUID().toString();
@@ -232,6 +239,7 @@ private class GatewayStatistics {
public int statusCode;
public int subStatusCode;
public String requestCharge;
+ public RequestTimeline requestTimeline;
}
private static class SystemInformation {
@@ -284,7 +292,7 @@ public void serialize(ClientSideRequestStatistics statistics, JsonGenerator gene
generator.writeObjectField("regionsContacted", statistics.regionsContacted);
generator.writeObjectField("retryContext", statistics.retryContext);
generator.writeObjectField("gatewayStatistics", statistics.gatewayStatistics);
- generator.writeObjectField("gatewayStatistics", statistics.gatewayStatistics);
+
try {
SystemInformation systemInformation = new SystemInformation();
long totalMemory = Runtime.getRuntime().totalMemory() / 1024;
diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/CosmosClientException.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/CosmosClientException.java
index f7ddd614a395..91d811466214 100644
--- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/CosmosClientException.java
+++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/CosmosClientException.java
@@ -250,15 +250,6 @@ CosmosClientException setCosmosResponseDiagnostics(CosmosResponseDiagnostics cos
return this;
}
- public RequestTimeline getRequestTimeline() {
- return this.requestTimeline;
- }
-
- CosmosClientException setRequestTimeline(RequestTimeline requestTimeline) {
- this.requestTimeline = requestTimeline;
- return this;
- }
-
@Override
public String toString() {
return getClass().getSimpleName() + "{" + "error=" + cosmosError + ", resourceAddress='"
diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/RequestTimeline.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/RequestTimeline.java
index 4febb68b180c..713277b586e6 100644
--- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/RequestTimeline.java
+++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/RequestTimeline.java
@@ -4,6 +4,8 @@
package com.azure.cosmos.implementation;
import com.azure.cosmos.implementation.directconnectivity.rntbd.RntbdObjectMapper;
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonPropertyOrder;
import com.fasterxml.jackson.databind.annotation.JsonSerialize;
import com.fasterxml.jackson.databind.ser.std.ToStringSerializer;
@@ -16,9 +18,9 @@
import static com.google.common.base.Preconditions.checkNotNull;
/**
- * Represents the time and duration of important events in the lifetime of a request.
+ * Represents the startTime and duration of important events in the lifetime of a request.
*
- * A {@link RequestTimeline} represents a timeline as a sequence of {@link Event} instances with name, time, and
+ * A {@link RequestTimeline} represents a timeline as a sequence of {@link Event} instances with name, startTime, and
* duration properties. Hence, one might use this class to represent any timeline. Today we use it to represent
* request timelines for:
*
@@ -35,7 +37,7 @@
* new RequestTimeline.Event("foo", startTime, startTime.plusSeconds(1)),
* new RequestTimeline.Event("bar", startTime.plusSeconds(1), startTime.plusSeconds(2))));}
* JSON serialization:
- *
{@code [{"name":"foo","time":"2020-01-07T11:24:12.842749-08:00","duration":"PT1S"},{"name":"bar","time":"2020-01-07T11:24:13.842749-08:00","duration":"PT1S"}])}
+ * {@code [{"name":"foo","startTime":"2020-01-07T11:24:12.842749-08:00","duration":"PT1S"},{"name":"bar","startTime":"2020-01-07T11:24:13.842749-08:00","duration":"PT1S"}])}
*/
public final class RequestTimeline implements Iterable {
@@ -54,7 +56,7 @@ private RequestTimeline(final ImmutableList events) {
/**
* Returns an empty {@link RequestTimeline}.
*
- * The empty time line returned is static.
+ * The empty startTime line returned is static.
*
* @return an empty {@link RequestTimeline}.
*/
@@ -75,7 +77,7 @@ public Iterator iterator() {
/**
* Returns an empty {@link RequestTimeline}.
*
- * The empty time line returned is static and equivalent to calling {@link RequestTimeline#empty}.
+ * The empty startTime line returned is static and equivalent to calling {@link RequestTimeline#empty}.
*
* @return an empty request timeline.
*/
@@ -148,25 +150,34 @@ public String toString() {
return RntbdObjectMapper.toString(this);
}
- @JsonPropertyOrder({ "name", "time", "duration" })
+ @JsonPropertyOrder({ "name", "startTime", "durationInMicroSec" })
public static final class Event {
- @JsonSerialize(using = ToStringSerializer.class)
+ @JsonIgnore
private final Duration duration;
+ @JsonSerialize(using = ToStringSerializer.class)
+ private final long durationInMicroSec;
+
+ @JsonProperty("eventName")
private final String name;
@JsonSerialize(using = ToStringSerializer.class)
- private final OffsetDateTime time;
+ private final OffsetDateTime startTime;
public Event(final String name, final OffsetDateTime from, final OffsetDateTime to) {
checkNotNull(name, "expected non-null name");
this.name = name;
- this.time = from;
+ this.startTime = from;
this.duration = from == null ? null : to == null ? Duration.ZERO : Duration.between(from, to);
+ if(this.duration != null) {
+ this.durationInMicroSec = duration.toNanos()/1000L;
+ } else {
+ this.durationInMicroSec = 0;
+ }
}
public Duration getDuration() {
@@ -177,8 +188,8 @@ public String getName() {
return name;
}
- public OffsetDateTime getTime() {
- return time;
+ public OffsetDateTime getStartTime() {
+ return startTime;
}
}
}
diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/RxGatewayStoreModel.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/RxGatewayStoreModel.java
index 6b1f9c9f6d8f..a133d877dfb9 100644
--- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/RxGatewayStoreModel.java
+++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/RxGatewayStoreModel.java
@@ -2,17 +2,18 @@
// Licensed under the MIT License.
package com.azure.cosmos.implementation;
+import com.azure.cosmos.BridgeInternal;
+import com.azure.cosmos.ConsistencyLevel;
+import com.azure.cosmos.CosmosClientException;
+import com.azure.cosmos.CosmosError;
+import com.azure.cosmos.implementation.directconnectivity.DirectBridgeInternal;
import com.azure.cosmos.implementation.directconnectivity.HttpUtils;
import com.azure.cosmos.implementation.directconnectivity.StoreResponse;
import com.azure.cosmos.implementation.http.HttpClient;
import com.azure.cosmos.implementation.http.HttpHeaders;
import com.azure.cosmos.implementation.http.HttpRequest;
import com.azure.cosmos.implementation.http.HttpResponse;
-import com.azure.cosmos.BridgeInternal;
-import com.azure.cosmos.ConsistencyLevel;
-import com.azure.cosmos.CosmosClientException;
-import com.azure.cosmos.CosmosError;
-import com.azure.cosmos.implementation.directconnectivity.DirectBridgeInternal;
+import com.azure.cosmos.implementation.http.ReactorNettyRequestRecord;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.handler.codec.http.HttpMethod;
@@ -29,6 +30,7 @@
import java.net.URI;
import java.net.URISyntaxException;
import java.nio.charset.StandardCharsets;
+import java.time.OffsetDateTime;
import java.util.HashMap;
import java.util.Map;
import java.util.Map.Entry;
@@ -141,8 +143,7 @@ public Flux performRequest(RxDocumentServiceRequest r
try {
- if (request.getResourceType().equals(ResourceType.Document) &&
- request.requestContext.cosmosResponseDiagnostics == null) {
+ if (request.requestContext.cosmosResponseDiagnostics == null) {
request.requestContext.cosmosResponseDiagnostics = BridgeInternal.createCosmosResponseDiagnostics();
}
@@ -271,6 +272,14 @@ private Flux toDocumentServiceResponse(Mono {
try {
+ //Adding transport client request timeline to diagnostics
+ ReactorNettyRequestRecord reactorNettyRequestRecord = httpResponse.request().getReactorNettyRequestRecord();
+ if (reactorNettyRequestRecord != null) {
+ reactorNettyRequestRecord.setTimeCompleted(OffsetDateTime.now());
+ BridgeInternal.setTransportClientRequestTimelineOnDiagnostics(request.requestContext.cosmosResponseDiagnostics,
+ reactorNettyRequestRecord.takeTimelineSnapshot());
+ }
+
// If there is any error in the header response this throws exception
// TODO: potential performance improvement: return Observable.error(exception) on failure instead of throwing Exception
validateOrThrow(request, HttpResponseStatus.valueOf(httpResponseStatus), httpResponseHeaders, content, null);
@@ -279,6 +288,7 @@ private Flux toDocumentServiceResponse(Mono invokeStoreAsync(final Uri addressUri, final RxDocume
return Mono.fromFuture(record.whenComplete((response, throwable) -> {
record.stage(RntbdRequestRecord.Stage.COMPLETED);
+ if (request.requestContext.cosmosResponseDiagnostics == null) {
+ request.requestContext.cosmosResponseDiagnostics = BridgeInternal.createCosmosResponseDiagnostics();
+ }
- if (throwable == null) {
+ if(response != null) {
response.setRequestTimeline(record.takeTimelineSnapshot());
- } else if (throwable instanceof CosmosClientException) {
- CosmosClientException error = (CosmosClientException) throwable;
- BridgeInternal.setRequestTimeline(error, record.takeTimelineSnapshot());
}
-
})).doOnCancel(() -> {
logger.debug("REQUEST CANCELLED: {}", record);
});
diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/StoreResponse.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/StoreResponse.java
index c3b6f7f7f3c2..44b94f975bac 100644
--- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/StoreResponse.java
+++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/StoreResponse.java
@@ -121,13 +121,12 @@ StoreResponse setCosmosResponseDiagnostics(CosmosResponseDiagnostics cosmosRespo
return this;
}
- public RequestTimeline getRequestTimeline() {
- return this.requestTimeline;
+ void setRequestTimeline(RequestTimeline requestTimeline) {
+ this.requestTimeline = requestTimeline;
}
- public StoreResponse setRequestTimeline(RequestTimeline requestTimeline) {
- this.requestTimeline = requestTimeline;
- return this;
+ RequestTimeline getRequestTimeline() {
+ return this.requestTimeline;
}
int getSubStatusCode() {
diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/StoreResult.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/StoreResult.java
index f204152f6da7..0499e4bb843a 100644
--- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/StoreResult.java
+++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/StoreResult.java
@@ -195,6 +195,7 @@ public void serialize(StoreResult storeResult,
jsonGenerator.writeNumberField("itemLSN", storeResult.itemLSN);
jsonGenerator.writeStringField("sessionToken", (storeResult.sessionToken != null ? storeResult.sessionToken.convertToString() : null));
jsonGenerator.writeStringField("exception", BridgeInternal.getInnerErrorMessage(storeResult.exception));
+ jsonGenerator.writeObjectField("transportRequestTimeline", storeResult.storeResponse != null ? storeResult.storeResponse.getRequestTimeline() : null);
jsonGenerator.writeEndObject();
}
}
diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/rntbd/RntbdRequestRecord.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/rntbd/RntbdRequestRecord.java
index 01cdaea76eff..02fbfa3bb257 100644
--- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/rntbd/RntbdRequestRecord.java
+++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/rntbd/RntbdRequestRecord.java
@@ -217,7 +217,7 @@ public RequestTimeline takeTimelineSnapshot() {
timeQueued, timePipelined == null ? timeCompletedOrNow : timePipelined),
new RequestTimeline.Event("pipelined",
timePipelined, timeSent == null ? timeCompletedOrNow : timeSent),
- new RequestTimeline.Event("sent",
+ new RequestTimeline.Event("transitTime",
timeSent, timeReceived == null ? timeCompletedOrNow : timeReceived),
new RequestTimeline.Event("received",
timeReceived, timeCompletedOrNow),
diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/http/HttpClient.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/http/HttpClient.java
index 4ff3c4fe6e3a..44a38557d151 100644
--- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/http/HttpClient.java
+++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/http/HttpClient.java
@@ -5,7 +5,6 @@
import reactor.core.publisher.Mono;
import reactor.netty.resources.ConnectionProvider;
-import java.io.Closeable;
import java.time.Duration;
/**
diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/http/HttpRequest.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/http/HttpRequest.java
index 06629f2c2a8a..4dd9cb0336ee 100644
--- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/http/HttpRequest.java
+++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/http/HttpRequest.java
@@ -10,6 +10,7 @@
import java.net.URI;
import java.net.URISyntaxException;
import java.nio.charset.StandardCharsets;
+import java.time.OffsetDateTime;
/**
* The outgoing Http request.
@@ -20,6 +21,7 @@ public class HttpRequest {
private int port;
private HttpHeaders headers;
private Flux body;
+ private ReactorNettyRequestRecord reactorNettyRequestRecord;
/**
* Create a new HttpRequest instance.
@@ -32,6 +34,7 @@ public HttpRequest(HttpMethod httpMethod, URI uri, int port, HttpHeaders httpHea
this.uri = uri;
this.port = port;
this.headers = httpHeaders;
+ this.reactorNettyRequestRecord = createReactorNettyRequestRecord();
}
/**
@@ -45,6 +48,7 @@ public HttpRequest(HttpMethod httpMethod, String uri, int port) throws URISyntax
this.uri = new URI(uri);
this.port = port;
this.headers = new HttpHeaders();
+ this.reactorNettyRequestRecord = createReactorNettyRequestRecord();
}
/**
@@ -61,6 +65,7 @@ public HttpRequest(HttpMethod httpMethod, URI uri, int port, HttpHeaders headers
this.port = port;
this.headers = headers;
this.body = body;
+ this.reactorNettyRequestRecord = createReactorNettyRequestRecord();
}
/**
@@ -202,4 +207,28 @@ public HttpRequest withBody(Flux content) {
this.body = content;
return this;
}
+
+ /**
+ * Sets ReactorNettyRequestRecord for recording request timeline.
+ *
+ * @param reactorNettyRequestRecord the reactor netty request record
+ */
+ public void setReactorNettyRequestRecord(ReactorNettyRequestRecord reactorNettyRequestRecord) {
+ this.reactorNettyRequestRecord = reactorNettyRequestRecord;
+ }
+
+ /**
+ * Gets ReactorNettyRequestRecord for recording request timeline
+ *
+ * @return reactorNettyRequestRecord the reactor netty request record
+ */
+ public ReactorNettyRequestRecord getReactorNettyRequestRecord() {
+ return this.reactorNettyRequestRecord;
+ }
+
+ private ReactorNettyRequestRecord createReactorNettyRequestRecord(){
+ ReactorNettyRequestRecord reactorNettyRequestRecord = new ReactorNettyRequestRecord();
+ reactorNettyRequestRecord.setTimeCreated(OffsetDateTime.now());
+ return reactorNettyRequestRecord;
+ }
}
diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/http/ReactorNettyClient.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/http/ReactorNettyClient.java
index 700cb6cbe7d8..e7017d727d1b 100644
--- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/http/ReactorNettyClient.java
+++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/http/ReactorNettyClient.java
@@ -8,6 +8,7 @@
import io.netty.channel.ChannelOption;
import io.netty.handler.codec.http.HttpMethod;
import io.netty.handler.logging.LogLevel;
+import org.apache.commons.lang3.tuple.Pair;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -18,10 +19,12 @@
import reactor.netty.NettyOutbound;
import reactor.netty.http.client.HttpClientRequest;
import reactor.netty.http.client.HttpClientResponse;
+import reactor.netty.http.client.HttpClientState;
import reactor.netty.resources.ConnectionProvider;
import reactor.netty.tcp.ProxyProvider;
import java.nio.charset.Charset;
+import java.time.OffsetDateTime;
import java.util.Objects;
import java.util.function.BiFunction;
@@ -66,7 +69,7 @@ public static ReactorNettyClient createWithConnectionProvider(ConnectionProvider
private void configureChannelPipelineHandlers() {
Configs configs = this.httpClientConfig.getConfigs();
this.httpClient = this.httpClient.tcpConfiguration(tcpClient -> {
-
+
if (this.httpClientConfig.getProxy() != null) {
tcpClient =
tcpClient.proxy(typeSpec -> typeSpec.type(ProxyProvider.Proxy.HTTP).address(this.httpClientConfig.getProxy()));
@@ -95,9 +98,26 @@ public Mono send(final HttpRequest request) {
Objects.requireNonNull(request.httpMethod());
Objects.requireNonNull(request.uri());
Objects.requireNonNull(this.httpClientConfig);
+ if(request.getReactorNettyRequestRecord() == null) {
+ ReactorNettyRequestRecord reactorNettyRequestRecord = new ReactorNettyRequestRecord();
+ reactorNettyRequestRecord.setTimeCreated(OffsetDateTime.now());
+ request.setReactorNettyRequestRecord(reactorNettyRequestRecord);
+ }
return this.httpClient
- .keepAlive(this.httpClientConfig.isConnectionKeepAlive())
+ .observe((connection, state) -> {
+ OffsetDateTime time = OffsetDateTime.now();
+ if(state.equals(HttpClientState.CONNECTED) || state.equals(HttpClientState.ACQUIRED)){
+ request.getReactorNettyRequestRecord().setTimeConnected(time);
+ } else if(state.equals(HttpClientState.CONFIGURED)){
+ request.getReactorNettyRequestRecord().setTimeConfigured(time);
+ } else if(state.equals(HttpClientState.REQUEST_SENT)){
+ request.getReactorNettyRequestRecord().setTimeSent(time);
+ } else if(state.equals(HttpClientState.RESPONSE_RECEIVED)){
+ request.getReactorNettyRequestRecord().setTimeReceived(time);
+ }
+ })
+ .keepAlive(this.httpClientConfig.isConnectionKeepAlive())
.port(request.port())
.request(HttpMethod.valueOf(request.httpMethod().toString()))
.uri(request.uri().toString())
diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/http/ReactorNettyRequestRecord.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/http/ReactorNettyRequestRecord.java
new file mode 100644
index 000000000000..d27efbb0f2ef
--- /dev/null
+++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/http/ReactorNettyRequestRecord.java
@@ -0,0 +1,157 @@
+// Copyright (c) Microsoft Corporation. All rights reserved.
+// Licensed under the MIT License.
+
+package com.azure.cosmos.implementation.http;
+
+import com.azure.cosmos.implementation.RequestTimeline;
+import reactor.netty.http.client.HttpClientState;
+
+import java.time.OffsetDateTime;
+
+/**
+ * Represents the timeline of various events in the lifetime of a reactor netty request response.
+ *
+ * A {@link ReactorNettyRequestRecord} create a snapshot of {@link RequestTimeline} based on various {@link HttpClientState}.
+ * Below are the states which are used to capture {@link RequestTimeline} snapshot.
+ *
+ * - {@link HttpClientState#CONNECTED},
+ *
- {@link HttpClientState#ACQUIRED},
+ *
- {@link HttpClientState#CONFIGURED},
+ *
- {@link HttpClientState#REQUEST_SENT},
+ *
- {@link HttpClientState#RESPONSE_RECEIVED},
+ *
+ */
+public final class ReactorNettyRequestRecord {
+
+ private volatile OffsetDateTime timeCreated;
+ private volatile OffsetDateTime timeConnected;
+ private volatile OffsetDateTime timeConfigured;
+ private volatile OffsetDateTime timeSent;
+ private volatile OffsetDateTime timeReceived;
+ private volatile OffsetDateTime timeCompleted;
+
+ /**
+ * Gets request created offsetDateTime.
+ * @return
+ */
+ public OffsetDateTime timeCreated() {
+ return this.timeCreated;
+ }
+
+ /**
+ * Get connection established offsetDateTime.
+ * @return timeConnected
+ */
+ public OffsetDateTime timeConnected() {
+ return this.timeConnected;
+ }
+
+ /**
+ * Get connection configured offsetDateTime.
+ * @return timeConfigured
+ */
+ public OffsetDateTime timeConfigured() {
+ return this.timeConfigured;
+ }
+
+ /**
+ * Gets request sent offsetDateTime.
+ * @return timeSent
+ */
+ public OffsetDateTime timeSent() {
+ return this.timeSent;
+ }
+
+ /**
+ * Gets response received offsetDateTime.
+ * @return timeReceived
+ */
+ public OffsetDateTime timeReceived() {
+ return this.timeReceived;
+ }
+
+ /**
+ * Gets request completed offsetDateTime.
+ * @return timeCompleted
+ */
+ public OffsetDateTime timeCompleted() {
+ return this.timeCompleted;
+ }
+
+ /**
+ * Sets request created offsetDateTime.
+ * @param timeCreated
+ */
+ public void setTimeCreated(OffsetDateTime timeCreated) {
+ this.timeCreated = timeCreated;
+ }
+
+ /**
+ * Sets connection established offsetDateTime.
+ * @param timeConnected
+ */
+ public void setTimeConnected(OffsetDateTime timeConnected) {
+ this.timeConnected = timeConnected;
+ }
+
+ /**
+ * Sets connection configured offsetDateTime.
+ * @param timeConfigured
+ */
+ public void setTimeConfigured(OffsetDateTime timeConfigured) {
+ this.timeConfigured = timeConfigured;
+ }
+
+ /**
+ * Sets request sent offsetDateTime.
+ * @param timeSent
+ */
+ public void setTimeSent(OffsetDateTime timeSent) {
+ this.timeSent = timeSent;
+ }
+
+ /**
+ * Sets response received offsetDateTime.
+ * @param timeReceived
+ */
+ public void setTimeReceived(OffsetDateTime timeReceived) {
+ this.timeReceived = timeReceived;
+ }
+
+ /**
+ * Sets request completed offsetDateTime.
+ * @param timeCompleted
+ */
+ public void setTimeCompleted(OffsetDateTime timeCompleted) {
+ this.timeCompleted = timeCompleted;
+ }
+
+ /**
+ * Creates the RequestTimeline snapshot.
+ * @return requestTimeline
+ */
+ public RequestTimeline takeTimelineSnapshot() {
+
+ OffsetDateTime now = OffsetDateTime.now();
+
+ OffsetDateTime timeCreated = this.timeCreated();
+ OffsetDateTime timeConnected = this.timeConnected();
+ OffsetDateTime timeConfigured = this.timeConfigured();
+ OffsetDateTime timeSent = this.timeSent();
+ OffsetDateTime timeReceived = this.timeReceived();
+ OffsetDateTime timeCompleted = this.timeCompleted();
+ OffsetDateTime timeCompletedOrNow = timeCompleted == null ? now : timeCompleted;
+
+ return RequestTimeline.of(
+ new RequestTimeline.Event("connectionCreated",
+ timeCreated, timeConnected() == null ? timeCompletedOrNow : timeConnected),
+ new RequestTimeline.Event("connectionConfigured",
+ timeConnected, timeConfigured == null ? timeCompletedOrNow : timeConfigured),
+ new RequestTimeline.Event("requestSent",
+ timeConfigured, timeSent == null ? timeCompletedOrNow : timeSent),
+ new RequestTimeline.Event("transitTime",
+ timeSent, timeReceived == null ? timeCompletedOrNow : timeReceived),
+ new RequestTimeline.Event("received",
+ timeReceived, timeCompletedOrNow));
+ }
+}
diff --git a/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/CosmosResponseDiagnosticsTest.java b/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/CosmosResponseDiagnosticsTest.java
index 48ae24a1788f..6c5cab30f3ad 100644
--- a/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/CosmosResponseDiagnosticsTest.java
+++ b/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/CosmosResponseDiagnosticsTest.java
@@ -64,6 +64,7 @@ public void gatewayDiagnostics() throws CosmosClientException {
assertThat(diagnostics).doesNotContain(("\"gatewayStatistics\":null"));
assertThat(diagnostics).contains("\"operationType\":\"Create\"");
assertThat(createResponse.getCosmosResponseDiagnostics().getRequestLatency()).isNotNull();
+ validateTransportRequestTimelineGateway(diagnostics);
}
@Test(groups = {"simple"})
@@ -75,9 +76,9 @@ public void gatewayDiagnosticsOnException() throws CosmosClientException {
CosmosItemRequestOptions cosmosItemRequestOptions = new CosmosItemRequestOptions();
cosmosItemRequestOptions.setPartitionKey(new PartitionKey("wrongPartitionKey"));
CosmosItemResponse readResponse =
- this.container.readItem(createResponse.getProperties().getId(),
- new PartitionKey("wrongPartitionKey"),
- CosmosItemProperties.class);
+ this.container.readItem(createResponse.getProperties().getId(),
+ new PartitionKey("wrongPartitionKey"),
+ CosmosItemProperties.class);
fail("request should fail as partition key is wrong");
} catch (CosmosClientException exception) {
String diagnostics = exception.getCosmosResponseDiagnostics().toString();
@@ -87,6 +88,7 @@ public void gatewayDiagnosticsOnException() throws CosmosClientException {
assertThat(diagnostics).contains("\"statusCode\":404");
assertThat(diagnostics).contains("\"operationType\":\"Read\"");
assertThat(exception.getCosmosResponseDiagnostics().getRequestLatency()).isNotNull();
+ validateTransportRequestTimelineGateway(diagnostics);
}
}
@@ -114,6 +116,7 @@ public void directDiagnostics() throws CosmosClientException {
assertThat(diagnostics).contains("\"gatewayStatistics\":null");
assertThat(diagnostics).contains("addressResolutionStatistics");
assertThat(createResponse.getCosmosResponseDiagnostics().getRequestLatency()).isNotNull();
+ validateTransportRequestTimelineDirect(diagnostics);
}
@Test(groups = {"simple"})
@@ -126,15 +129,18 @@ public void directDiagnosticsOnException() throws CosmosClientException {
CosmosItemRequestOptions cosmosItemRequestOptions = new CosmosItemRequestOptions();
cosmosItemRequestOptions.setPartitionKey(new PartitionKey("wrongPartitionKey"));
CosmosItemResponse readResponse =
- cosmosContainer.readItem(createResponse.getProperties().getId(),
- new PartitionKey("wrongPartitionKey"),
- CosmosItemProperties.class);
+ cosmosContainer.readItem(createResponse.getProperties().getId(),
+ new PartitionKey("wrongPartitionKey"),
+ CosmosItemProperties.class);
fail("request should fail as partition key is wrong");
} catch (CosmosClientException exception) {
String diagnostics = exception.getCosmosResponseDiagnostics().toString();
assertThat(exception.getStatusCode()).isEqualTo(HttpConstants.StatusCodes.NOTFOUND);
assertThat(diagnostics).contains("\"connectionMode\":\"DIRECT\"");
assertThat(exception.getCosmosResponseDiagnostics().getRequestLatency()).isNotNull();
+ // TODO https://github.com/Azure/azure-sdk-for-java/issues/8035
+ // uncomment below if above issue is fixed
+ //validateTransportRequestTimelineDirect(diagnostics);
}
}
@@ -187,4 +193,21 @@ private void clearStoreResponseStatistics(ClientSideRequestStatistics requestSta
storeResponseStatisticsField.setAccessible(true);
storeResponseStatisticsField.set(requestStatistics, new ArrayList());
}
+
+ private void validateTransportRequestTimelineGateway(String diagnostics) {
+ assertThat(diagnostics).contains("\"eventName\":\"connectionConfigured\"");
+ assertThat(diagnostics).contains("\"eventName\":\"connectionConfigured\"");
+ assertThat(diagnostics).contains("\"eventName\":\"requestSent\"");
+ assertThat(diagnostics).contains("\"eventName\":\"transitTime\"");
+ assertThat(diagnostics).contains("\"eventName\":\"received\"");
+ }
+
+ private void validateTransportRequestTimelineDirect(String diagnostics) {
+ assertThat(diagnostics).contains("\"eventName\":\"created\"");
+ assertThat(diagnostics).contains("\"eventName\":\"queued\"");
+ assertThat(diagnostics).contains("\"eventName\":\"pipelined\"");
+ assertThat(diagnostics).contains("\"eventName\":\"transitTime\"");
+ assertThat(diagnostics).contains("\"eventName\":\"received\"");
+ assertThat(diagnostics).contains("\"eventName\":\"completed\"");
+ }
}