Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Replace calls to X-Ray daemon for rules / targets with a simple JDK-b… #145

Merged
merged 9 commits into from
May 20, 2020
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions aws-xray-recorder-sdk-core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,16 @@
<scope>provided</scope>
</dependency>
<!-- Test -->
<dependency>
<groupId>com.github.tomakehurst</groupId>
<artifactId>wiremock-jre8</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.assertj</groupId>
<artifactId>assertj-core</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.openjdk.jmh</groupId>
<artifactId>jmh-core</artifactId>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,150 @@
package com.amazonaws.xray.internal;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.io.UnsupportedEncodingException;
import java.net.HttpURLConnection;
import java.net.MalformedURLException;
import java.net.ProtocolException;
import java.net.URL;
import java.nio.charset.StandardCharsets;

import com.amazonaws.AmazonWebServiceRequest;
import com.amazonaws.AmazonWebServiceResult;
import com.amazonaws.services.xray.model.GetSamplingRulesRequest;
import com.amazonaws.services.xray.model.GetSamplingRulesResult;
import com.amazonaws.services.xray.model.GetSamplingTargetsRequest;
import com.amazonaws.services.xray.model.GetSamplingTargetsResult;
import com.amazonaws.xray.config.DaemonConfiguration;
import com.fasterxml.jackson.annotation.JsonInclude.Include;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.PropertyNamingStrategy;
import com.fasterxml.jackson.databind.introspect.AnnotatedMember;
import com.fasterxml.jackson.databind.introspect.JacksonAnnotationIntrospector;

/**
* A simple client for sending API requests via the X-Ray daemon. Requests do not have to be
* signed, so we can avoid having a strict dependency on the full AWS SDK in instrumentation. This
* is an internal utility and not meant to represent the entire X-Ray API nor be particularly
* efficient as we only use it in long poll loops.
*/
public class UnsignedXrayClient {

// Visible for testing
static final ObjectMapper OBJECT_MAPPER = new ObjectMapper()
.setSerializationInclusion(Include.NON_EMPTY)
.setPropertyNamingStrategy(PropertyNamingStrategy.UPPER_CAMEL_CASE)
.setAnnotationIntrospector(new JacksonAnnotationIntrospector() {
@Override
public boolean hasIgnoreMarker(AnnotatedMember m) {
// This is a somewhat hacky way of having ObjectMapper only serialize the fields in our
// model classes instead of the base class that comes from the SDK. In the future, we will
// remove the SDK dependency itself and the base classes and this hack will go away.
if (m.getDeclaringClass() == AmazonWebServiceRequest.class ||
m.getDeclaringClass() == AmazonWebServiceResult.class) {
return true;
}
return super.hasIgnoreMarker(m);
}
});
private static final int TIME_OUT_MILLIS = 2000;

private final URL getSamplingRulesEndpoint;
private final URL getSamplingTargetsEndpoint;

public UnsignedXrayClient() {
this(new DaemonConfiguration().getEndpointForTCPConnection());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Outside the scope of this PR, but we should add the DaemonConfiguration to the global recorder rather than constructing one here so that customers can programmatically set custom daemon addresses, e.g. with AWSXRay.setDaemonAddress()

}

// Visible for testing
UnsignedXrayClient(String endpoint) {
try {
getSamplingRulesEndpoint = new URL(endpoint + "/GetSamplingRules");
getSamplingTargetsEndpoint = new URL(endpoint + "/GetSamplingTargets");
} catch (MalformedURLException e) {
throw new IllegalArgumentException("Invalid URL: " + endpoint, e);
}
}

public GetSamplingRulesResult getSamplingRules(GetSamplingRulesRequest request) {
return sendRequest(getSamplingRulesEndpoint, request, GetSamplingRulesResult.class);
}

public GetSamplingTargetsResult getSamplingTargets(GetSamplingTargetsRequest request) {
return sendRequest(getSamplingTargetsEndpoint, request, GetSamplingTargetsResult.class);
}

private <T> T sendRequest(URL endpoint, Object request, Class<T> responseClass) {
final HttpURLConnection connection;
try {
connection = (HttpURLConnection) endpoint.openConnection();
} catch (IOException e) {
throw new UncheckedIOException("Could not connect to endpoint " + endpoint, e);
anuraaga marked this conversation as resolved.
Show resolved Hide resolved
}

connection.setConnectTimeout(TIME_OUT_MILLIS);
connection.setReadTimeout(TIME_OUT_MILLIS);

try {
connection.setRequestMethod("POST");
} catch (ProtocolException e) {
throw new IllegalStateException("Invalid protocol, can't happen.");
}

connection.addRequestProperty("Content-Type", "application/json");
connection.setDoOutput(true);

try (OutputStream outputStream = connection.getOutputStream()) {
OBJECT_MAPPER.writeValue(outputStream, request);
} catch (IOException e) {
throw new UncheckedIOException("Could not serialize and send request.", e);
}

final int responseCode;
try {
responseCode = connection.getResponseCode();
} catch (IOException e) {
throw new UncheckedIOException("Could not read response code.", e);
}

if (responseCode != 200) {
throw new IllegalStateException("Error response from X-Ray: " +
readResponseString(connection));
}

try {
return OBJECT_MAPPER.readValue(connection.getInputStream(), responseClass);
} catch (IOException e) {
throw new UncheckedIOException("Error reading response.", e);
}
}

private static String readResponseString(HttpURLConnection connection) {
ByteArrayOutputStream os = new ByteArrayOutputStream();
try (InputStream is = connection.getInputStream()) {
readTo(is, os);
} catch (IOException e) {
// Only best effort read if we can.
}
try (InputStream is = connection.getErrorStream()) {
readTo(is, os);
} catch (IOException e) {
// Only best effort read if we can.
}
try {
return os.toString(StandardCharsets.UTF_8.name());
} catch (UnsupportedEncodingException e) {
throw new IllegalStateException("UTF-8 not supported can't happen.");
}
}

private static void readTo(InputStream is, ByteArrayOutputStream os) throws IOException {
int b;
while ((b = is.read()) != -1) {
os.write(b);
}
}
}
Original file line number Diff line number Diff line change
@@ -1,53 +1,53 @@
package com.amazonaws.xray.strategy.sampling;

import com.amazonaws.services.xray.AWSXRay;
import com.amazonaws.xray.AWSXRayRecorderBuilder;
import java.net.URL;
import java.security.SecureRandom;
import java.time.Clock;
import java.time.Instant;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

import com.amazonaws.xray.internal.UnsignedXrayClient;
import com.amazonaws.xray.strategy.sampling.manifest.CentralizedManifest;
import com.amazonaws.xray.strategy.sampling.pollers.RulePoller;
import com.amazonaws.xray.strategy.sampling.pollers.TargetPoller;
import com.amazonaws.xray.strategy.sampling.rule.CentralizedRule;
import com.amazonaws.xray.utils.ByteUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

import java.net.URL;
import java.security.SecureRandom;
import java.time.Clock;
import java.time.Instant;

public class CentralizedSamplingStrategy implements SamplingStrategy {
private static Log logger = LogFactory.getLog(TargetPoller.class);
private static final Log logger = LogFactory.getLog(TargetPoller.class);
// Initialize random ClientID. We use the same ClientID for all GetSamplingTargets calls. Conflicts are avoided
// because IDs are scoped to a single account.
private static String clientID;
private static final String clientID;
static {
SecureRandom rand = new SecureRandom();
byte[] bytes = new byte[12];
rand.nextBytes(bytes);
clientID = ByteUtils.byteArrayToHexString(bytes);
}

private final CentralizedManifest manifest;
private final LocalizedSamplingStrategy fallback;
private final RulePoller rulePoller;
private final TargetPoller targetPoller;

private boolean isStarted = false;
private CentralizedManifest manifest;
private LocalizedSamplingStrategy fallback;
private RulePoller rulePoller;
private TargetPoller targetPoller;
private AWSXRay client;

public CentralizedSamplingStrategy() {
this.manifest = new CentralizedManifest();
this.fallback = new LocalizedSamplingStrategy();
this.client = XRayClient.newClient();
this.rulePoller = new RulePoller(manifest, client, Clock.systemUTC());
this.targetPoller = new TargetPoller(manifest, client, Clock.systemUTC());
UnsignedXrayClient client = new UnsignedXrayClient();
this.rulePoller = new RulePoller(client, manifest, Clock.systemUTC());
this.targetPoller = new TargetPoller(client, manifest, Clock.systemUTC());
}

public CentralizedSamplingStrategy(URL ruleLocation) {
this.manifest = new CentralizedManifest();
this.fallback = new LocalizedSamplingStrategy(ruleLocation);
this.client = XRayClient.newClient();
this.rulePoller = new RulePoller(manifest, client, Clock.systemUTC());
this.targetPoller = new TargetPoller(manifest, client, Clock.systemUTC());
UnsignedXrayClient client = new UnsignedXrayClient();
this.rulePoller = new RulePoller(client, manifest, Clock.systemUTC());
this.targetPoller = new TargetPoller(client, manifest, Clock.systemUTC());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,11 @@
import com.amazonaws.services.xray.AWSXRay;
import com.amazonaws.xray.config.DaemonConfiguration;

/**
* @deprecated aws-xray-recorder only supports communicating with the X-Ray daemon, which does not
* require the usual AWS API signatures so we have stopped using the SDK X-Ray client.
*/
@Deprecated
public final class XRayClient {

private static final AWSCredentialsProvider ANONYMOUS_CREDENTIALS = new AWSStaticCredentialsProvider(
Expand All @@ -17,6 +22,12 @@ public final class XRayClient {
private static final int TIME_OUT = 2000; // Milliseconds
private XRayClient() {}

/**
*
* @deprecated aws-xray-recorder only supports communicating with the X-Ray daemon, which does
* not require the usual AWS API signatures so we have stopped using the SDK X-Ray client.
*/
@Deprecated
public static AWSXRay newClient() {
DaemonConfiguration config = new DaemonConfiguration();

Expand Down
Original file line number Diff line number Diff line change
@@ -1,41 +1,49 @@
package com.amazonaws.xray.strategy.sampling.pollers;

import java.time.Clock;
import java.time.Instant;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

import com.amazonaws.services.xray.AWSXRay;
import com.amazonaws.services.xray.model.GetSamplingRulesRequest;
import com.amazonaws.services.xray.model.GetSamplingRulesResult;
import com.amazonaws.services.xray.model.SamplingRule;
import com.amazonaws.services.xray.AWSXRay;
import com.amazonaws.services.xray.model.SamplingRuleRecord;
import com.amazonaws.xray.internal.UnsignedXrayClient;
import com.amazonaws.xray.strategy.sampling.manifest.CentralizedManifest;
import com.amazonaws.xray.strategy.sampling.rand.Rand;
import com.amazonaws.xray.strategy.sampling.rand.RandImpl;
import com.amazonaws.xray.strategy.sampling.rule.CentralizedRule;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

import java.time.Clock;
import java.time.Instant;
import java.util.*;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

public class RulePoller {
private static Log logger = LogFactory.getLog(RulePoller.class);
private static final Log logger = LogFactory.getLog(RulePoller.class);

private static final long PERIOD = 300; // Seconds
private static final long MAX_JITTER = 5; // Seconds

private final UnsignedXrayClient client;
private final CentralizedManifest manifest;
private final Clock clock;
private final ScheduledExecutorService executor;

private AWSXRay client;
private Clock clock;
private CentralizedManifest manifest;
private ScheduledExecutorService executor;
/**
* @deprecated Use {@link #RulePoller(UnsignedXrayClient, CentralizedManifest, Clock)}.
*/
@Deprecated
public RulePoller(CentralizedManifest manifest, AWSXRay unused, Clock clock) {
this(new UnsignedXrayClient(), manifest, clock);
}

public RulePoller(CentralizedManifest manifest, AWSXRay client, Clock clock) {
this.manifest = manifest;
public RulePoller(UnsignedXrayClient client, CentralizedManifest manifest, Clock clock) {
this.client = client;
this.manifest = manifest;
this.clock = clock;
executor = Executors.newSingleThreadScheduledExecutor();
}
Expand All @@ -50,13 +58,18 @@ public void start() {
// The executor will die and not abrupt main thread.
if(t instanceof Error) { throw t; }
}
}, 0, getJitterInterval(), TimeUnit.SECONDS);
}, 0, getIntervalWithJitter(), TimeUnit.SECONDS);
}

public void shutdown() {
executor.shutdownNow();
}

// Visible for testing
ScheduledExecutorService getExecutor() {
return executor;
}

private void pollRule() {
Instant now = clock.instant();

Expand All @@ -72,9 +85,8 @@ private void pollRule() {
manifest.putRules(rules, now);
}

private long getJitterInterval() {
private long getIntervalWithJitter() {
Rand random = new RandImpl();
long interval = Math.round(random.next() * MAX_JITTER) + PERIOD;
return interval;
return Math.round(random.next() * MAX_JITTER) + PERIOD;
}
}
Loading