Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adding Transport client request life cycle timeline on diagnostics #7500

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -221,11 +221,6 @@ public static <E extends CosmosClientException> E setPartitionKeyRangeId(E e, St
return e;
}

public static <E extends CosmosClientException> E setRequestTimeline(E e, RequestTimeline timeline) {
e.setRequestTimeline(timeline);
return e;
}

public static boolean isEnableMultipleWriteLocations(DatabaseAccount account) {
return account.getEnableMultipleWriteLocations();
}
Expand Down Expand Up @@ -418,6 +413,11 @@ public static CosmosResponseDiagnostics createCosmosResponseDiagnostics() {
return new CosmosResponseDiagnostics();
}

public static void setTransportClientRequestTimelineOnDiagnostics(CosmosResponseDiagnostics cosmosResponseDiagnostics,
RequestTimeline requestTimeline) {
cosmosResponseDiagnostics.clientSideRequestStatistics().setTransportClientRequestTimeline(requestTimeline);
}

public static void recordResponse(CosmosResponseDiagnostics cosmosResponseDiagnostics,
RxDocumentServiceRequest request, StoreResult storeResult) {
cosmosResponseDiagnostics.clientSideRequestStatistics().recordResponse(request, storeResult);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

import com.azure.cosmos.implementation.HttpConstants;
import com.azure.cosmos.implementation.OperationType;
import com.azure.cosmos.implementation.RequestTimeline;
import com.azure.cosmos.implementation.ResourceType;
import com.azure.cosmos.implementation.RetryContext;
import com.azure.cosmos.implementation.RxDocumentServiceRequest;
Expand Down Expand Up @@ -53,6 +54,7 @@ class ClientSideRequestStatistics {
private Set<URI> regionsContacted;
private RetryContext retryContext;
private GatewayStatistics gatewayStatistics;
private RequestTimeline transportRequestTimeline;

ClientSideRequestStatistics() {
this.requestStartTime = ZonedDateTime.now(ZoneOffset.UTC);
Expand Down Expand Up @@ -99,7 +101,7 @@ void recordResponse(RxDocumentServiceRequest request, StoreResult storeResult) {
}

if (storeResponseStatistics.requestOperationType == OperationType.Head
|| storeResponseStatistics.requestOperationType == OperationType.HeadFeed) {
|| storeResponseStatistics.requestOperationType == OperationType.HeadFeed) {
this.supplementalResponseStatisticsList.add(storeResponseStatistics);
} else {
this.responseStatisticsList.add(storeResponseStatistics);
Expand Down Expand Up @@ -127,13 +129,18 @@ void recordGatewayResponse(RxDocumentServiceRequest rxDocumentServiceRequest, St
this.gatewayStatistics.subStatusCode = DirectBridgeInternal.getSubStatusCode(storeResponse);
this.gatewayStatistics.sessionToken = storeResponse.getHeaderValue(HttpConstants.HttpHeaders.SESSION_TOKEN);
this.gatewayStatistics.requestCharge = storeResponse.getHeaderValue(HttpConstants.HttpHeaders.REQUEST_CHARGE);
this.gatewayStatistics.requestTimeline = DirectBridgeInternal.getRequestTimeline(storeResponse);
} else if(exception != null){
this.gatewayStatistics.statusCode = exception.getStatusCode();
this.gatewayStatistics.subStatusCode = exception.getSubStatusCode();
this.gatewayStatistics.requestTimeline = this.transportRequestTimeline;
}
}
}

void setTransportClientRequestTimeline(RequestTimeline transportRequestTimeline) {
this.transportRequestTimeline = transportRequestTimeline;
}

String recordAddressResolutionStart(URI targetEndpoint) {
String identifier = Utils.randomUUID().toString();
Expand Down Expand Up @@ -232,6 +239,7 @@ private class GatewayStatistics {
public int statusCode;
public int subStatusCode;
public String requestCharge;
public RequestTimeline requestTimeline;
}

private static class SystemInformation {
Expand Down Expand Up @@ -284,7 +292,7 @@ public void serialize(ClientSideRequestStatistics statistics, JsonGenerator gene
generator.writeObjectField("regionsContacted", statistics.regionsContacted);
generator.writeObjectField("retryContext", statistics.retryContext);
generator.writeObjectField("gatewayStatistics", statistics.gatewayStatistics);
generator.writeObjectField("gatewayStatistics", statistics.gatewayStatistics);

try {
SystemInformation systemInformation = new SystemInformation();
long totalMemory = Runtime.getRuntime().totalMemory() / 1024;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -250,15 +250,6 @@ CosmosClientException setCosmosResponseDiagnostics(CosmosResponseDiagnostics cos
return this;
}

public RequestTimeline getRequestTimeline() {
return this.requestTimeline;
}

CosmosClientException setRequestTimeline(RequestTimeline requestTimeline) {
this.requestTimeline = requestTimeline;
return this;
}

@Override
public String toString() {
return getClass().getSimpleName() + "{" + "error=" + cosmosError + ", resourceAddress='"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
package com.azure.cosmos.implementation;

import com.azure.cosmos.implementation.directconnectivity.rntbd.RntbdObjectMapper;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonPropertyOrder;
import com.fasterxml.jackson.databind.annotation.JsonSerialize;
import com.fasterxml.jackson.databind.ser.std.ToStringSerializer;
Expand All @@ -16,9 +18,9 @@
import static com.google.common.base.Preconditions.checkNotNull;

/**
* Represents the time and duration of important events in the lifetime of a request.
* Represents the startTime and duration of important events in the lifetime of a request.
* <p>
* A {@link RequestTimeline} represents a timeline as a sequence of {@link Event} instances with name, time, and
* A {@link RequestTimeline} represents a timeline as a sequence of {@link Event} instances with name, startTime, and
* duration properties. Hence, one might use this class to represent any timeline. Today we use it to represent
* request timelines for:
* <p><ul>
Expand All @@ -35,7 +37,7 @@
* new RequestTimeline.Event("foo", startTime, startTime.plusSeconds(1)),
* new RequestTimeline.Event("bar", startTime.plusSeconds(1), startTime.plusSeconds(2))));}</pre>
* JSON serialization:
* <pre>{@code [{"name":"foo","time":"2020-01-07T11:24:12.842749-08:00","duration":"PT1S"},{"name":"bar","time":"2020-01-07T11:24:13.842749-08:00","duration":"PT1S"}])}</pre>
* <pre>{@code [{"name":"foo","startTime":"2020-01-07T11:24:12.842749-08:00","duration":"PT1S"},{"name":"bar","startTime":"2020-01-07T11:24:13.842749-08:00","duration":"PT1S"}])}</pre>
*/
public final class RequestTimeline implements Iterable<RequestTimeline.Event> {

Expand All @@ -54,7 +56,7 @@ private RequestTimeline(final ImmutableList<Event> events) {
/**
* Returns an empty {@link RequestTimeline}.
*
* The empty time line returned is static.
* The empty startTime line returned is static.
*
* @return an empty {@link RequestTimeline}.
*/
Expand All @@ -75,7 +77,7 @@ public Iterator<Event> iterator() {
/**
* Returns an empty {@link RequestTimeline}.
*
* The empty time line returned is static and equivalent to calling {@link RequestTimeline#empty}.
* The empty startTime line returned is static and equivalent to calling {@link RequestTimeline#empty}.
*
* @return an empty request timeline.
*/
Expand Down Expand Up @@ -148,25 +150,34 @@ public String toString() {
return RntbdObjectMapper.toString(this);
}

@JsonPropertyOrder({ "name", "time", "duration" })
@JsonPropertyOrder({ "name", "startTime", "durationInMicroSec" })
public static final class Event {

@JsonSerialize(using = ToStringSerializer.class)
@JsonIgnore
private final Duration duration;

@JsonSerialize(using = ToStringSerializer.class)
private final long durationInMicroSec;

@JsonProperty("eventName")
private final String name;

@JsonSerialize(using = ToStringSerializer.class)
private final OffsetDateTime time;
private final OffsetDateTime startTime;

public Event(final String name, final OffsetDateTime from, final OffsetDateTime to) {

checkNotNull(name, "expected non-null name");

this.name = name;
this.time = from;
this.startTime = from;

this.duration = from == null ? null : to == null ? Duration.ZERO : Duration.between(from, to);
if(this.duration != null) {
this.durationInMicroSec = duration.toNanos()/1000L;
} else {
this.durationInMicroSec = 0;
}
}

public Duration getDuration() {
Expand All @@ -177,8 +188,8 @@ public String getName() {
return name;
}

public OffsetDateTime getTime() {
return time;
public OffsetDateTime getStartTime() {
return startTime;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,17 +2,18 @@
// Licensed under the MIT License.
package com.azure.cosmos.implementation;

import com.azure.cosmos.BridgeInternal;
import com.azure.cosmos.ConsistencyLevel;
import com.azure.cosmos.CosmosClientException;
import com.azure.cosmos.CosmosError;
import com.azure.cosmos.implementation.directconnectivity.DirectBridgeInternal;
import com.azure.cosmos.implementation.directconnectivity.HttpUtils;
import com.azure.cosmos.implementation.directconnectivity.StoreResponse;
import com.azure.cosmos.implementation.http.HttpClient;
import com.azure.cosmos.implementation.http.HttpHeaders;
import com.azure.cosmos.implementation.http.HttpRequest;
import com.azure.cosmos.implementation.http.HttpResponse;
import com.azure.cosmos.BridgeInternal;
import com.azure.cosmos.ConsistencyLevel;
import com.azure.cosmos.CosmosClientException;
import com.azure.cosmos.CosmosError;
import com.azure.cosmos.implementation.directconnectivity.DirectBridgeInternal;
import com.azure.cosmos.implementation.http.ReactorNettyRequestRecord;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.handler.codec.http.HttpMethod;
Expand All @@ -29,6 +30,7 @@
import java.net.URI;
import java.net.URISyntaxException;
import java.nio.charset.StandardCharsets;
import java.time.OffsetDateTime;
import java.util.HashMap;
import java.util.Map;
import java.util.Map.Entry;
Expand Down Expand Up @@ -141,8 +143,7 @@ public Flux<RxDocumentServiceResponse> performRequest(RxDocumentServiceRequest r

try {

if (request.getResourceType().equals(ResourceType.Document) &&
request.requestContext.cosmosResponseDiagnostics == null) {
if (request.requestContext.cosmosResponseDiagnostics == null) {
request.requestContext.cosmosResponseDiagnostics = BridgeInternal.createCosmosResponseDiagnostics();
}

Expand Down Expand Up @@ -271,6 +272,14 @@ private Flux<RxDocumentServiceResponse> toDocumentServiceResponse(Mono<HttpRespo
return contentObservable
.flatMap(content -> {
try {
//Adding transport client request timeline to diagnostics
ReactorNettyRequestRecord reactorNettyRequestRecord = httpResponse.request().getReactorNettyRequestRecord();
if (reactorNettyRequestRecord != null) {
reactorNettyRequestRecord.setTimeCompleted(OffsetDateTime.now());
BridgeInternal.setTransportClientRequestTimelineOnDiagnostics(request.requestContext.cosmosResponseDiagnostics,
reactorNettyRequestRecord.takeTimelineSnapshot());
}

// If there is any error in the header response this throws exception
// TODO: potential performance improvement: return Observable.error(exception) on failure instead of throwing Exception
validateOrThrow(request, HttpResponseStatus.valueOf(httpResponseStatus), httpResponseHeaders, content, null);
Expand All @@ -279,6 +288,7 @@ private Flux<RxDocumentServiceResponse> toDocumentServiceResponse(Mono<HttpRespo
StoreResponse rsp = new StoreResponse(httpResponseStatus,
HttpUtils.unescape(httpResponseHeaders.toMap().entrySet()),
content);
DirectBridgeInternal.setRequestTimeline(rsp, reactorNettyRequestRecord.takeTimelineSnapshot());
if (request.requestContext.cosmosResponseDiagnostics != null) {
BridgeInternal.recordGatewayResponse(request.requestContext.cosmosResponseDiagnostics, request, rsp, null);
DirectBridgeInternal.setCosmosResponseDiagnostics(rsp, request.requestContext.cosmosResponseDiagnostics);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
package com.azure.cosmos.implementation.directconnectivity;

import com.azure.cosmos.CosmosResponseDiagnostics;
import com.azure.cosmos.implementation.RequestTimeline;

/**
* This is meant to be used only internally as a bridge access to classes in
Expand All @@ -18,4 +19,12 @@ public static void setCosmosResponseDiagnostics(StoreResponse storeResponse, Cos
public static int getSubStatusCode(StoreResponse storeResponse) {
return storeResponse.getSubStatusCode();
}

public static RequestTimeline getRequestTimeline(StoreResponse storeResponse) {
return storeResponse.getRequestTimeline();
}

public static void setRequestTimeline(StoreResponse storeResponse, RequestTimeline requestTimeline) {
storeResponse.setRequestTimeline(requestTimeline);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -129,14 +129,13 @@ public Mono<StoreResponse> invokeStoreAsync(final Uri addressUri, final RxDocume
return Mono.fromFuture(record.whenComplete((response, throwable) -> {

record.stage(RntbdRequestRecord.Stage.COMPLETED);
if (request.requestContext.cosmosResponseDiagnostics == null) {
request.requestContext.cosmosResponseDiagnostics = BridgeInternal.createCosmosResponseDiagnostics();
}

if (throwable == null) {
if(response != null) {
response.setRequestTimeline(record.takeTimelineSnapshot());
} else if (throwable instanceof CosmosClientException) {
CosmosClientException error = (CosmosClientException) throwable;
BridgeInternal.setRequestTimeline(error, record.takeTimelineSnapshot());
}

})).doOnCancel(() -> {
logger.debug("REQUEST CANCELLED: {}", record);
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,13 +121,12 @@ StoreResponse setCosmosResponseDiagnostics(CosmosResponseDiagnostics cosmosRespo
return this;
}

public RequestTimeline getRequestTimeline() {
return this.requestTimeline;
void setRequestTimeline(RequestTimeline requestTimeline) {
this.requestTimeline = requestTimeline;
}

public StoreResponse setRequestTimeline(RequestTimeline requestTimeline) {
this.requestTimeline = requestTimeline;
return this;
RequestTimeline getRequestTimeline() {
return this.requestTimeline;
}

int getSubStatusCode() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,7 @@ public void serialize(StoreResult storeResult,
jsonGenerator.writeNumberField("itemLSN", storeResult.itemLSN);
jsonGenerator.writeStringField("sessionToken", (storeResult.sessionToken != null ? storeResult.sessionToken.convertToString() : null));
jsonGenerator.writeStringField("exception", BridgeInternal.getInnerErrorMessage(storeResult.exception));
jsonGenerator.writeObjectField("transportRequestTimeline", storeResult.storeResponse != null ? storeResult.storeResponse.getRequestTimeline() : null);
jsonGenerator.writeEndObject();
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -217,7 +217,7 @@ public RequestTimeline takeTimelineSnapshot() {
timeQueued, timePipelined == null ? timeCompletedOrNow : timePipelined),
new RequestTimeline.Event("pipelined",
timePipelined, timeSent == null ? timeCompletedOrNow : timeSent),
new RequestTimeline.Event("sent",
new RequestTimeline.Event("transitTime",
timeSent, timeReceived == null ? timeCompletedOrNow : timeReceived),
new RequestTimeline.Event("received",
timeReceived, timeCompletedOrNow),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
import reactor.core.publisher.Mono;
import reactor.netty.resources.ConnectionProvider;

import java.io.Closeable;
import java.time.Duration;

/**
Expand Down
Loading