Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adding Transport client request life cycle timeline on diagnostics #7500

Merged
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -221,11 +221,6 @@ public static <E extends CosmosClientException> E setPartitionKeyRangeId(E e, St
return e;
}

public static <E extends CosmosClientException> E setRequestTimeline(E e, RequestTimeline timeline) {
e.setRequestTimeline(timeline);
return e;
}

public static boolean isEnableMultipleWriteLocations(DatabaseAccount account) {
return account.getEnableMultipleWriteLocations();
}
Expand Down Expand Up @@ -418,6 +413,10 @@ public static CosmosResponseDiagnostics createCosmosResponseDiagnostics() {
return new CosmosResponseDiagnostics();
}

public static void setTransportClientRequestTimelineOnDiagnostics(CosmosResponseDiagnostics cosmosResponseDiagnostics, RequestTimeline requestTimeline) {
simplynaveen20 marked this conversation as resolved.
Show resolved Hide resolved
cosmosResponseDiagnostics.clientSideRequestStatistics().setTransportClientRequestTimeline(requestTimeline);
simplynaveen20 marked this conversation as resolved.
Show resolved Hide resolved
}

public static void recordResponse(CosmosResponseDiagnostics cosmosResponseDiagnostics,
RxDocumentServiceRequest request, StoreResult storeResult) {
cosmosResponseDiagnostics.clientSideRequestStatistics().recordResponse(request, storeResult);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,10 @@
package com.azure.cosmos;

import com.azure.cosmos.implementation.HttpConstants;
import com.azure.cosmos.implementation.ISessionToken;
import com.azure.cosmos.implementation.OperationType;
import com.azure.cosmos.implementation.RequestTimeline;
import com.azure.cosmos.implementation.ResourceType;
import com.azure.cosmos.implementation.RxDocumentServiceRequest;
import com.azure.cosmos.implementation.SessionTokenHelper;
import com.azure.cosmos.implementation.Utils;
import com.azure.cosmos.implementation.directconnectivity.DirectBridgeInternal;
import com.azure.cosmos.implementation.directconnectivity.StoreResponse;
Expand All @@ -17,7 +16,6 @@

import java.lang.management.ManagementFactory;
import java.net.URI;
import java.net.URISyntaxException;
import java.time.Duration;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
Expand All @@ -26,6 +24,7 @@
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Locale;
import java.util.Map;
Expand All @@ -43,7 +42,7 @@ class ClientSideRequestStatistics {
private ZonedDateTime requestEndTime;

private ConnectionMode connectionMode;

private final List<StoreResponseStatistics> responseStatisticsList;
private final List<StoreResponseStatistics> supplementalResponseStatisticsList;
private final Map<String, AddressResolutionStatistics> addressResolutionStatistics;
Expand All @@ -54,6 +53,8 @@ class ClientSideRequestStatistics {
private Set<URI> failedReplicas;
private Set<URI> regionsContacted;

private RequestTimeline transportRequestTimeline;

ClientSideRequestStatistics() {
this.requestStartTime = ZonedDateTime.now(ZoneOffset.UTC);
this.requestEndTime = ZonedDateTime.now(ZoneOffset.UTC);
Expand Down Expand Up @@ -100,7 +101,7 @@ void recordResponse(RxDocumentServiceRequest request, StoreResult storeResult) {
}

if (storeResponseStatistics.requestOperationType == OperationType.Head
|| storeResponseStatistics.requestOperationType == OperationType.HeadFeed) {
|| storeResponseStatistics.requestOperationType == OperationType.HeadFeed) {
this.supplementalResponseStatisticsList.add(storeResponseStatistics);
} else {
this.responseStatisticsList.add(storeResponseStatistics);
Expand All @@ -117,6 +118,7 @@ void recordGatewayResponse(RxDocumentServiceRequest rxDocumentServiceRequest, St
}
this.gatewayStatistic = new GatewayStatistic();
this.gatewayStatistic.operationType = rxDocumentServiceRequest.getOperationType();

if (storeResponse != null) {
this.gatewayStatistic.statusCode = storeResponse.getStatus();
this.gatewayStatistic.subStatusCode = DirectBridgeInternal.getSubStatusCode(storeResponse);
Expand All @@ -129,6 +131,9 @@ void recordGatewayResponse(RxDocumentServiceRequest rxDocumentServiceRequest, St
}
}

void setTransportClientRequestTimeline(RequestTimeline transportRequestTimeline) {
this.transportRequestTimeline = transportRequestTimeline;
}

String recordAddressResolutionStart(URI targetEndpoint) {
String identifier = Utils.randomUUID().toString();
Expand Down Expand Up @@ -221,6 +226,7 @@ public String toString() {
}

printSystemInformation(stringBuilder);
printTransportRequestTimeline(stringBuilder);
}
String requestStatsString = stringBuilder.toString();
if (!requestStatsString.isEmpty()) {
Expand Down Expand Up @@ -274,6 +280,17 @@ private void printSystemInformation(StringBuilder stringBuilder) {
}
}

private void printTransportRequestTimeline(StringBuilder stringBuilder) {
if (transportRequestTimeline != null) {
stringBuilder.append("Transport request timeline -------").append(System.lineSeparator());
Iterator<RequestTimeline.Event> iterator = transportRequestTimeline.iterator();
while (iterator.hasNext()) {
RequestTimeline.Event event = iterator.next();
stringBuilder.append(" eventName = " + event.getName() + ", startTime = " + event.getStartTime() + ", durationInMicrosec = " + event.getDuration().toNanos()/1000L).append(System.lineSeparator());
simplynaveen20 marked this conversation as resolved.
Show resolved Hide resolved
}
}
}

private static String formatDateTime(ZonedDateTime dateTime) {
if (dateTime == null) {
return null;
Expand All @@ -291,11 +308,11 @@ private class StoreResponseStatistics {
@Override
public String toString() {
return "StoreResponseStatistics{"
+ "requestResponseTime=\"" + formatDateTime(requestResponseTime) + "\"" +
", storeResult=" + storeResult
+ ", requestResourceType=" + requestResourceType
+ ", requestOperationType=" + requestOperationType
+ '}';
+ "requestResponseTime=\"" + formatDateTime(requestResponseTime) + "\"" +
", storeResult=" + storeResult
+ ", requestResourceType=" + requestResourceType
+ ", requestOperationType=" + requestOperationType
+ '}';
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -250,15 +250,6 @@ CosmosClientException setCosmosResponseDiagnostics(CosmosResponseDiagnostics cos
return this;
}

public RequestTimeline getRequestTimeline() {
return this.requestTimeline;
}

CosmosClientException setRequestTimeline(RequestTimeline requestTimeline) {
this.requestTimeline = requestTimeline;
return this;
}

@Override
public String toString() {
return getClass().getSimpleName() + "{" + "error=" + cosmosError + ", resourceAddress='"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
package com.azure.cosmos.implementation;

import com.azure.cosmos.implementation.directconnectivity.rntbd.RntbdObjectMapper;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonPropertyOrder;
import com.fasterxml.jackson.databind.annotation.JsonSerialize;
import com.fasterxml.jackson.databind.ser.std.ToStringSerializer;
Expand All @@ -16,9 +17,9 @@
import static com.google.common.base.Preconditions.checkNotNull;

/**
* Represents the time and duration of important events in the lifetime of a request.
* Represents the startTime and duration of important events in the lifetime of a request.
* <p>
* A {@link RequestTimeline} represents a timeline as a sequence of {@link Event} instances with name, time, and
* A {@link RequestTimeline} represents a timeline as a sequence of {@link Event} instances with name, startTime, and
* duration properties. Hence, one might use this class to represent any timeline. Today we use it to represent
* request timelines for:
* <p><ul>
Expand All @@ -35,7 +36,7 @@
* new RequestTimeline.Event("foo", startTime, startTime.plusSeconds(1)),
* new RequestTimeline.Event("bar", startTime.plusSeconds(1), startTime.plusSeconds(2))));}</pre>
* JSON serialization:
* <pre>{@code [{"name":"foo","time":"2020-01-07T11:24:12.842749-08:00","duration":"PT1S"},{"name":"bar","time":"2020-01-07T11:24:13.842749-08:00","duration":"PT1S"}])}</pre>
* <pre>{@code [{"name":"foo","startTime":"2020-01-07T11:24:12.842749-08:00","duration":"PT1S"},{"name":"bar","startTime":"2020-01-07T11:24:13.842749-08:00","duration":"PT1S"}])}</pre>
*/
public final class RequestTimeline implements Iterable<RequestTimeline.Event> {

Expand All @@ -54,7 +55,7 @@ private RequestTimeline(final ImmutableList<Event> events) {
/**
* Returns an empty {@link RequestTimeline}.
*
* The empty time line returned is static.
* The empty startTime line returned is static.
*
* @return an empty {@link RequestTimeline}.
*/
Expand All @@ -75,7 +76,7 @@ public Iterator<Event> iterator() {
/**
* Returns an empty {@link RequestTimeline}.
*
* The empty time line returned is static and equivalent to calling {@link RequestTimeline#empty}.
* The empty startTime line returned is static and equivalent to calling {@link RequestTimeline#empty}.
*
* @return an empty request timeline.
*/
Expand Down Expand Up @@ -148,25 +149,33 @@ public String toString() {
return RntbdObjectMapper.toString(this);
}

@JsonPropertyOrder({ "name", "time", "duration" })
@JsonPropertyOrder({ "name", "startTime", "durationInMicroSec" })
public static final class Event {

@JsonSerialize(using = ToStringSerializer.class)
@JsonIgnore
private final Duration duration;

@JsonSerialize(using = ToStringSerializer.class)
private final long durationInMicroSec;
simplynaveen20 marked this conversation as resolved.
Show resolved Hide resolved

private final String name;

@JsonSerialize(using = ToStringSerializer.class)
private final OffsetDateTime time;
private final OffsetDateTime startTime;

public Event(final String name, final OffsetDateTime from, final OffsetDateTime to) {

checkNotNull(name, "expected non-null name");

this.name = name;
this.time = from;
this.startTime = from;

this.duration = from == null ? null : to == null ? Duration.ZERO : Duration.between(from, to);
if(this.duration != null) {
this.durationInMicroSec = duration.toNanos()/1000L;
} else {
this.durationInMicroSec = 0;
}
}

public Duration getDuration() {
Expand All @@ -177,8 +186,8 @@ public String getName() {
return name;
}

public OffsetDateTime getTime() {
return time;
public OffsetDateTime getStartTime() {
return startTime;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,23 +2,25 @@
// Licensed under the MIT License.
package com.azure.cosmos.implementation;

import com.azure.cosmos.BridgeInternal;
import com.azure.cosmos.ConsistencyLevel;
import com.azure.cosmos.CosmosClientException;
import com.azure.cosmos.CosmosError;
import com.azure.cosmos.implementation.directconnectivity.DirectBridgeInternal;
import com.azure.cosmos.implementation.directconnectivity.HttpUtils;
import com.azure.cosmos.implementation.directconnectivity.StoreResponse;
import com.azure.cosmos.implementation.http.HttpClient;
import com.azure.cosmos.implementation.http.HttpHeaders;
import com.azure.cosmos.implementation.http.HttpRequest;
import com.azure.cosmos.implementation.http.HttpResponse;
import com.azure.cosmos.BridgeInternal;
import com.azure.cosmos.ConsistencyLevel;
import com.azure.cosmos.CosmosClientException;
import com.azure.cosmos.CosmosError;
import com.azure.cosmos.implementation.directconnectivity.DirectBridgeInternal;
import com.azure.cosmos.implementation.http.ReactorNettyRequestRecord;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.handler.codec.http.HttpMethod;
import io.netty.handler.codec.http.HttpResponseStatus;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.tuple.Pair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
Expand All @@ -29,6 +31,7 @@
import java.net.URI;
import java.net.URISyntaxException;
import java.nio.charset.StandardCharsets;
import java.time.OffsetDateTime;
import java.util.HashMap;
import java.util.Map;
import java.util.Map.Entry;
Expand Down Expand Up @@ -141,8 +144,7 @@ public Flux<RxDocumentServiceResponse> performRequest(RxDocumentServiceRequest r

try {

if (request.getResourceType().equals(ResourceType.Document) &&
request.requestContext.cosmosResponseDiagnostics == null) {
if (request.requestContext.cosmosResponseDiagnostics == null) {
request.requestContext.cosmosResponseDiagnostics = BridgeInternal.createCosmosResponseDiagnostics();
}

Expand All @@ -165,9 +167,11 @@ public Flux<RxDocumentServiceResponse> performRequest(RxDocumentServiceRequest r
httpHeaders,
byteBufObservable);

Mono<HttpResponse> httpResponseMono = this.httpClient.send(httpRequest);
ReactorNettyRequestRecord reactorNettyRequestRecord = new ReactorNettyRequestRecord();
reactorNettyRequestRecord.setTimeCreated(OffsetDateTime.now());
Mono<Pair<HttpResponse, ReactorNettyRequestRecord>> httpResponsePairMono = this.httpClient.send(httpRequest, reactorNettyRequestRecord);

return toDocumentServiceResponse(httpResponseMono, request);
return toDocumentServiceResponse(httpResponsePairMono, request);

} catch (Exception e) {
return Flux.error(e);
Expand Down Expand Up @@ -242,15 +246,16 @@ private String ensureSlashPrefixed(String path) {
* Once the customer code subscribes to the observable returned by the CRUD APIs,
* the subscription goes up till it reaches the source reactor netty's observable, and at that point the HTTP invocation will be made.
*
* @param httpResponseMono
* @param httpResponsePairMono
* @param request
* @return {@link Flux}
*/
private Flux<RxDocumentServiceResponse> toDocumentServiceResponse(Mono<HttpResponse> httpResponseMono,
RxDocumentServiceRequest request) {
private Flux<RxDocumentServiceResponse> toDocumentServiceResponse(Mono<Pair<HttpResponse, ReactorNettyRequestRecord>> httpResponsePairMono,
RxDocumentServiceRequest request) {

return httpResponseMono.flatMap(httpResponse -> {
return httpResponsePairMono.flatMap(httpResponseTuple -> {

HttpResponse httpResponse = httpResponseTuple.getLeft();
// header key/value pairs
HttpHeaders httpResponseHeaders = httpResponse.headers();
int httpResponseStatus = httpResponse.statusCode();
Expand All @@ -271,6 +276,14 @@ private Flux<RxDocumentServiceResponse> toDocumentServiceResponse(Mono<HttpRespo
return contentObservable
.flatMap(content -> {
try {
//Adding transport client request timeline to diagnostics
ReactorNettyRequestRecord reactorNettyRequestRecord = httpResponseTuple.getRight();
if (reactorNettyRequestRecord != null) {
reactorNettyRequestRecord.setTimeCompleted(OffsetDateTime.now());
simplynaveen20 marked this conversation as resolved.
Show resolved Hide resolved
BridgeInternal.setTransportClientRequestTimelineOnDiagnostics(request.requestContext.cosmosResponseDiagnostics,
reactorNettyRequestRecord.takeTimelineSnapshot());
}

// If there is any error in the header response this throws exception
// TODO: potential performance improvement: return Observable.error(exception) on failure instead of throwing Exception
validateOrThrow(request, HttpResponseStatus.valueOf(httpResponseStatus), httpResponseHeaders, content, null);
Expand All @@ -279,8 +292,7 @@ private Flux<RxDocumentServiceResponse> toDocumentServiceResponse(Mono<HttpRespo
StoreResponse rsp = new StoreResponse(httpResponseStatus,
HttpUtils.unescape(httpResponseHeaders.toMap().entrySet()),
content);
if (request.requestContext.cosmosResponseDiagnostics != null &&
request.getResourceType().equals(ResourceType.Document)) {
if (request.requestContext.cosmosResponseDiagnostics != null) {
BridgeInternal.recordGatewayResponse(request.requestContext.cosmosResponseDiagnostics, request, rsp, null);
DirectBridgeInternal.setCosmosResponseDiagnostics(rsp, request.requestContext.cosmosResponseDiagnostics);
}
Expand Down Expand Up @@ -309,8 +321,7 @@ private Flux<RxDocumentServiceResponse> toDocumentServiceResponse(Mono<HttpRespo
return Mono.error(dce);
}

if (request.requestContext.cosmosResponseDiagnostics != null &&
request.getResourceType().equals(ResourceType.Document)) {
if (request.requestContext.cosmosResponseDiagnostics != null) {
BridgeInternal.recordGatewayResponse(request.requestContext.cosmosResponseDiagnostics, request, null, (CosmosClientException)exception);
BridgeInternal.setCosmosResponseDiagnostics((CosmosClientException)exception, request.requestContext.cosmosResponseDiagnostics);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -129,14 +129,11 @@ public Mono<StoreResponse> invokeStoreAsync(final Uri addressUri, final RxDocume
return Mono.fromFuture(record.whenComplete((response, throwable) -> {

record.stage(RntbdRequestRecord.Stage.COMPLETED);

if (throwable == null) {
response.setRequestTimeline(record.takeTimelineSnapshot());
} else if (throwable instanceof CosmosClientException) {
CosmosClientException error = (CosmosClientException) throwable;
BridgeInternal.setRequestTimeline(error, record.takeTimelineSnapshot());
if (request.requestContext.cosmosResponseDiagnostics == null) {
request.requestContext.cosmosResponseDiagnostics = BridgeInternal.createCosmosResponseDiagnostics();
}

BridgeInternal.setTransportClientRequestTimelineOnDiagnostics(request.requestContext.cosmosResponseDiagnostics, record.takeTimelineSnapshot());
})).doOnCancel(() -> {
record.cancel(true);
});
Expand Down
Loading