Skip to content

Commit

Permalink
Completed port from v4. Unit tests pass locally. Next up: tests on cl…
Browse files Browse the repository at this point in the history
…oud. (#9211)
  • Loading branch information
David Noble authored Mar 24, 2020
1 parent b580ccc commit 72d66ac
Show file tree
Hide file tree
Showing 22 changed files with 1,413 additions and 727 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,195 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

package com.azure.data.cosmos.internal;

import com.azure.data.cosmos.internal.directconnectivity.rntbd.RntbdObjectMapper;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonPropertyOrder;
import com.fasterxml.jackson.databind.annotation.JsonSerialize;
import com.fasterxml.jackson.databind.ser.std.ToStringSerializer;
import com.google.common.collect.ImmutableList;

import java.time.Duration;
import java.time.OffsetDateTime;
import java.util.Iterator;

import static com.google.common.base.Preconditions.checkNotNull;

/**
* Represents the startTime and duration of important events in the lifetime of a request.
* <p>
* A {@link RequestTimeline} represents a timeline as a sequence of {@link Event} instances with name, startTime, and
* duration properties. Hence, one might use this class to represent any timeline. Today we use it to represent
* request timelines for:
* <p><ul>
* <li>{@link com.azure.cosmos.implementation.http.HttpClient#send},
* <li>{@link com.azure.cosmos.implementation.directconnectivity.HttpTransportClient#invokeStoreAsync}, and
* <li>{@link com.azure.cosmos.implementation.directconnectivity.RntbdTransportClient#invokeStoreAsync}.
* </ul></p>
* A {@link RequestTimeline} serializes to JSON as an array of {@link Event} instances. This is the default
* serialization for any class that implements {@link Iterable}.
* <p>
* <b>Example:</b>
* <pre>{@code OffsetDateTime startTime = OffsetDateTime.parse("2020-01-07T11:24:12.842749-08:00", DateTimeFormatter.ISO_OFFSET_DATE_TIME);
* sys.out.println(RequestTimeline.of(
* new RequestTimeline.Event("foo", startTime, startTime.plusSeconds(1)),
* new RequestTimeline.Event("bar", startTime.plusSeconds(1), startTime.plusSeconds(2))));}</pre>
* JSON serialization:
* <pre>{@code [{"name":"foo","startTime":"2020-01-07T11:24:12.842749-08:00","duration":"PT1S"},{"name":"bar","startTime":"2020-01-07T11:24:13.842749-08:00","duration":"PT1S"}])}</pre>
*/
public final class RequestTimeline implements Iterable<RequestTimeline.Event> {

private static final RequestTimeline EMPTY = new RequestTimeline();
private final ImmutableList<Event> events;

private RequestTimeline() {
this.events = ImmutableList.of();
}

private RequestTimeline(final ImmutableList<Event> events) {
checkNotNull(events, "expected non-null events");
this.events = events;
}

/**
* Returns an empty {@link RequestTimeline}.
*
* The empty startTime line returned is static.
*
* @return an empty {@link RequestTimeline}.
*/
public static RequestTimeline empty() {
return EMPTY;
}

/**
* Returns an iterator for enumerating the {@link Event} instances in this {@link RequestTimeline}.
*
* @return an iterator for enumerating the {@link Event} instances in this {@link RequestTimeline}.
*/
@Override
public Iterator<Event> iterator() {
return this.events.iterator();
}

/**
* Returns an empty {@link RequestTimeline}.
*
* The empty startTime line returned is static and equivalent to calling {@link RequestTimeline#empty}.
*
* @return an empty request timeline.
*/
public static RequestTimeline of() {
return EMPTY;
}

/**
* Returns a new {@link RequestTimeline} with a single event.
*
* @return a new {@link RequestTimeline} with a single event.
*/
public static RequestTimeline of(final Event event) {
return new RequestTimeline(ImmutableList.of(event));
}

/**
* Returns a new {@link RequestTimeline} with a pair of events.
*
* @return a new {@link RequestTimeline} with a pair of events.
*/
public static RequestTimeline of(final Event e1, final Event e2) {
return new RequestTimeline(ImmutableList.of(e1, e2));
}

/**
* Returns a new {@link RequestTimeline} with three events.
*
* @return a new {@link RequestTimeline} with three events.
*/
public static RequestTimeline of(final Event e1, final Event e2, final Event e3) {
return new RequestTimeline(ImmutableList.of(e1, e2, e3));
}

/**
* Returns a new {@link RequestTimeline} with four events.
*
* @return a new {@link RequestTimeline} with four events.
*/
public static RequestTimeline of(final Event e1, final Event e2, final Event e3, final Event e4) {
return new RequestTimeline(ImmutableList.of(e1, e2, e3, e4));
}

/**
* Returns a new {@link RequestTimeline} with five events.
*
* @return a new {@link RequestTimeline} with five events.
*/
public static RequestTimeline of(final Event e1, final Event e2, final Event e3, final Event e4, final Event e5) {
return new RequestTimeline(ImmutableList.of(e1, e2, e3, e4, e5));
}

/**
* Returns a new {@link RequestTimeline} with an arbitrary number of events.
*
* @return a new {@link RequestTimeline} with an arbitrary number of events.
*/
public static RequestTimeline of(final Event... events) {
return new RequestTimeline(ImmutableList.copyOf(events));
}

/**
* Returns a textual representation of this {@link RequestTimeline}.
* <p>
* The textual representation returned is a string of the form {@code RequestTimeline(}<i> &lt;event-array&gt;</i>
* {@code )}.
*/
@Override
public String toString() {
return RntbdObjectMapper.toString(this);
}

@JsonPropertyOrder({ "name", "startTime", "durationInMicroSec" })
public static final class Event {

@JsonIgnore
private final Duration duration;

@JsonSerialize(using = ToStringSerializer.class)
private final long durationInMicroSec;

@JsonProperty("eventName")
private final String name;

@JsonSerialize(using = ToStringSerializer.class)
private final OffsetDateTime startTime;

public Event(final String name, final OffsetDateTime from, final OffsetDateTime to) {

checkNotNull(name, "expected non-null name");

this.name = name;
this.startTime = from;

this.duration = from == null ? null : to == null ? Duration.ZERO : Duration.between(from, to);
if(this.duration != null) {
this.durationInMicroSec = duration.toNanos()/1000L;
} else {
this.durationInMicroSec = 0;
}
}

public Duration getDuration() {
return this.duration;
}

public String getName() {
return name;
}

public OffsetDateTime getStartTime() {
return startTime;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

package com.azure.data.cosmos.internal.directconnectivity;

import com.azure.data.cosmos.BridgeInternal;
import com.azure.data.cosmos.internal.Configs;
import com.azure.data.cosmos.internal.RxDocumentServiceRequest;
import com.azure.data.cosmos.internal.UserAgentContainer;
Expand All @@ -23,7 +24,6 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Mono;
import reactor.core.publisher.SignalType;

import java.io.File;
import java.io.IOException;
Expand Down Expand Up @@ -122,14 +122,19 @@ public Mono<StoreResponse> invokeStoreAsync(final URI address, final RxDocumentS

logger.debug("RntbdTransportClient.invokeStoreAsync({}, {}): {}", address, request, record);

return Mono.fromFuture(record).doFinally(signalType -> {
logger.debug("SignalType.{} received from reactor: {\n endpoint: {},\n record: {}\n}",
signalType.name(),
endpoint,
record);
if (signalType == SignalType.CANCEL) {
record.stage(RntbdRequestRecord.Stage.CANCELLED_BY_CLIENT);
return Mono.fromFuture(record.whenComplete((response, throwable) -> {

record.stage(RntbdRequestRecord.Stage.COMPLETED);

if (request.requestContext.cosmosResponseDiagnostics == null) {
request.requestContext.cosmosResponseDiagnostics = BridgeInternal.createCosmosResponseDiagnostics();
}

if(response != null) {
logger.debug("%s", record.takeTimelineSnapshot());
}
})).doOnCancel(() -> {
logger.debug("REQUEST CANCELLED: {}", record);
});
}

Expand Down Expand Up @@ -319,6 +324,48 @@ public String toString() {

// region Types

/**
* A builder for constructing {@link Options} instances.
*
* <h3>Using system properties to set the default {@link Options} used by an {@link Builder}</h3>
* <p>
* A default options instance is created when the {@link Builder} class is initialized. This instance specifies
* the default options used by every {@link Builder} instance. In priority order the default options instance
* is created from:
* <ol>
* <li>The JSON value of system property {@code azure.cosmos.directTcp.defaultOptions}.
* <p>Example:
* <pre>{@code -Dazure.cosmos.directTcp.defaultOptions={\"maxChannelsPerEndpoint\":5,\"maxRequestsPerChannel\":30}}</pre>
* </li>
* <li>The contents of the JSON file located by system property {@code azure.cosmos.directTcp
* .defaultOptionsFile}.
* <p>Example:
* <pre>{@code -Dazure.cosmos.directTcp.defaultOptionsFile=/path/to/default/options/file}</pre>
* </li>
* <li>The contents of JSON resource file {@code azure.cosmos.directTcp.defaultOptions.json}.
* <p>Specifically, the resource file is read from this stream:
* <pre>{@code RntbdTransportClient.class.getClassLoader().getResourceAsStream("azure.cosmos.directTcp.defaultOptions.json")}</pre>
* <p>Example: <pre>{@code {
* "bufferPageSize": 8192,
* "connectionTimeout": "PT1M",
* "idleChannelTimeout": "PT0S",
* "idleEndpointTimeout": "PT1M10S",
* "maxBufferCapacity": 8388608,
* "maxChannelsPerEndpoint": 10,
* "maxRequestsPerChannel": 30,
* "receiveHangDetectionTime": "PT1M5S",
* "requestExpiryInterval": "PT5S",
* "requestTimeout": "PT1M",
* "requestTimerResolution": "PT0.5S",
* "sendHangDetectionTime": "PT10S",
* "shutdownTimeout": "PT15S"
* }}</pre>
* </li>
* </ol>
* <p>JSON value errors are logged and then ignored. If none of the above values are available or all available
* values are in error, the default options instance is created from the private parameterless constructor for
* {@link Options}.
*/
@SuppressWarnings("UnusedReturnValue")
public static class Builder {

Expand All @@ -329,16 +376,6 @@ public static class Builder {

static {

// In priority order we take default Direct TCP options from:
//
// 1. the string value of system property "azure.cosmos.directTcp.options", or
// 2. the contents of the file located by the system property "azure.cosmos.directTcp.optionsFile", or
// 3. the contents of the resource file named "azure.cosmos.directTcp.options.json"
//
// Otherwise, if none of these values are set or an error occurs we create default options based on a
// set of hard-wired values defined in the default private parameterless constructor for
// RntbdTransportClient.Options.

Options options = null;

try {
Expand Down Expand Up @@ -373,7 +410,7 @@ public static class Builder {
final ClassLoader loader = RntbdTransportClient.class.getClassLoader();
final String name = DEFAULT_OPTIONS_PROPERTY_NAME + ".json";

try (final InputStream stream = loader.getResourceAsStream(name)) {
try (InputStream stream = loader.getResourceAsStream(name)) {
if (stream != null) {
// Attempt to load default options from the JSON resource file "{propertyName}.json"
options = RntbdObjectMapper.readValue(stream, Options.class);
Expand All @@ -383,7 +420,14 @@ public static class Builder {
}
}
} finally {
DEFAULT_OPTIONS = options != null ? options : new Options();
if (options == null) {
DEFAULT_OPTIONS = new Options();
} else {
logger.info("Updated default Direct TCP options from system property {}: {}",
DEFAULT_OPTIONS_PROPERTY_NAME,
options);
DEFAULT_OPTIONS = options;
}
}
}

Expand Down Expand Up @@ -553,7 +597,9 @@ public Builder userAgent(final UserAgentContainer value) {

static final class JsonSerializer extends StdSerializer<RntbdTransportClient> {

public JsonSerializer() {
private static final long serialVersionUID = 1007663695768825670L;

JsonSerializer() {
super(RntbdTransportClient.class);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ public class RntbdClientChannelHandler extends ChannelInitializer<Channel> imple
*/
@Override
public void channelAcquired(final Channel channel) {
logger.trace("{} CHANNEL ACQUIRED", channel);
logger.debug("{} CHANNEL ACQUIRED", channel);
}

/**
Expand All @@ -56,7 +56,7 @@ public void channelAcquired(final Channel channel) {
*/
@Override
public void channelCreated(final Channel channel) {
logger.trace("{} CHANNEL CREATED", channel);
logger.debug("{} CHANNEL CREATED", channel);
this.initChannel(channel);
}

Expand All @@ -69,7 +69,7 @@ public void channelCreated(final Channel channel) {
*/
@Override
public void channelReleased(final Channel channel) {
logger.trace("{} CHANNEL RELEASED", channel);
logger.debug("{} CHANNEL RELEASED", channel);
}

/**
Expand Down
Loading

0 comments on commit 72d66ac

Please sign in to comment.