Skip to content

Commit

Permalink
Error handling of Servlet Async API
Browse files Browse the repository at this point in the history
  • Loading branch information
eyalkoren committed Dec 24, 2018
1 parent 863822f commit 81dff7a
Show file tree
Hide file tree
Showing 14 changed files with 336 additions and 73 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ public abstract class AsyncInstrumentation extends ElasticApmInstrumentation {
public void init(ElasticApmTracer tracer) {
asyncHelper = HelperClassManager.ForSingleClassLoader.of(tracer,
"co.elastic.apm.agent.servlet.helper.AsyncContextAdviceHelperImpl",
"co.elastic.apm.agent.servlet.helper.AsyncContextAdviceHelperImpl$ApmAsyncListenerAllocator",
"co.elastic.apm.agent.servlet.helper.ApmAsyncListener");
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,32 +21,53 @@

import co.elastic.apm.agent.impl.context.Response;
import co.elastic.apm.agent.impl.transaction.Transaction;
import co.elastic.apm.agent.objectpool.Recyclable;
import co.elastic.apm.agent.servlet.ServletTransactionHelper;

import javax.annotation.Nullable;
import javax.servlet.AsyncContext;
import javax.servlet.AsyncEvent;
import javax.servlet.AsyncListener;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import java.util.Map;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.atomic.AtomicBoolean;

import static co.elastic.apm.agent.servlet.ServletTransactionHelper.TRANSACTION_ATTRIBUTE;

/**
* Based on brave.servlet.ServletRuntime$TracingAsyncListener (under Apache license 2.0)
*
* onComplete is always called, even if onError/onTimeout is called, as per the specifications.
* However, when onError/onTimeout is called, the Response that can be obtained through the event arg is not yet set with the right
* status code, for that we need to rely on onComplete. On the the other hand, the event arg that is received in onComplete does not
* contain the Throwable that comes with the event in the preceding onError, so we need to keep it.
*
* After testing on Payara, WildFly, Tomcat, WebSphere Liberty and Jetty, here is a summary of subtle differences:
* - Liberty is the only one that will invoke onError following an AsyncListener.start invocation with a Runnable that ends with Exception
* - WildFly will not resume the Response until timed-out in the same scenario, but it invokes onTimeout, which is good for our tests
* - Jetty on the same scenario will just go crazy endlessly trying to run the Runnable over and over
* - Some containers may release the response after onError/onTimeout to return to the client, meaning that onComplete is called afterwards
* - Jetty fails to invoke onError after AsyncContext.dispatch to a Servlet that ends with ServletException
*/
public class ApmAsyncListener implements AsyncListener {
private static final AtomicIntegerFieldUpdater<ApmAsyncListener> EVENT_COUNTER_UPDATER =
AtomicIntegerFieldUpdater.newUpdater(ApmAsyncListener.class, "endEventCounter");
public class ApmAsyncListener implements AsyncListener, Recyclable {

private final AtomicBoolean completed = new AtomicBoolean(false);
private final AsyncContextAdviceHelperImpl asyncContextAdviceHelperImpl;
private final ServletTransactionHelper servletTransactionHelper;
private final Transaction transaction;
private volatile int endEventCounter = 0;
@Nullable
private volatile Transaction transaction;
@Nullable
private volatile Throwable throwable;

ApmAsyncListener(AsyncContextAdviceHelperImpl asyncContextAdviceHelperImpl) {
this.asyncContextAdviceHelperImpl = asyncContextAdviceHelperImpl;
this.servletTransactionHelper = asyncContextAdviceHelperImpl.getServletTransactionHelper();
}

ApmAsyncListener(ServletTransactionHelper servletTransactionHelper, Transaction transaction) {
this.servletTransactionHelper = servletTransactionHelper;
ApmAsyncListener withTransaction(Transaction transaction) {
this.transaction = transaction;
return this;
}

@Override
Expand All @@ -56,12 +77,31 @@ public void onComplete(AsyncEvent event) {

@Override
public void onTimeout(AsyncEvent event) {
endTransaction(event);
throwable = event.getThrowable();
/*
NOTE: HTTP status code may not have been set yet, so we do not call endTransaction() from here.
According to the Servlet 3 specification
(http://download.oracle.com/otn-pub/jcp/servlet-3.0-fr-eval-oth-JSpec/servlet-3_0-final-spec.pdf, section 2.3.3.3),
onComplete() should always be called by the container even in the case of timeout or error, and the final
HTTP status code should be set by then. So we'll just defer to onComplete() for finalizing the span and do
nothing here.
*/
}

@Override
public void onError(AsyncEvent event) {
endTransaction(event);
throwable = event.getThrowable();
/*
NOTE: HTTP status code may not have been set yet, so we only hold a reference to the related error that may not be
otherwise available, but not calling endTransaction() from here.
According to the Servlet 3 specification
(http://download.oracle.com/otn-pub/jcp/servlet-3.0-fr-eval-oth-JSpec/servlet-3_0-final-spec.pdf, section 2.3.3.3),
onComplete() should always be called by the container even in the case of timeout or error, and the final
HTTP status code should be set by then. So we'll just defer to onComplete() for finalizing the span and do
nothing here.
*/
}

/**
Expand All @@ -80,31 +120,46 @@ public void onStartAsync(AsyncEvent event) {
// (see class-level Javadoc)
private void endTransaction(AsyncEvent event) {
// To ensure transaction is ended only by a single event
if (EVENT_COUNTER_UPDATER.getAndIncrement(this) > 0) {
if (completed.getAndSet(true) || transaction == null) {
return;
}

HttpServletRequest request = (HttpServletRequest) event.getSuppliedRequest();
request.removeAttribute(TRANSACTION_ATTRIBUTE);
try {
HttpServletRequest request = (HttpServletRequest) event.getSuppliedRequest();
request.removeAttribute(TRANSACTION_ATTRIBUTE);

HttpServletResponse response = (HttpServletResponse) event.getSuppliedResponse();
final Response resp = transaction.getContext().getResponse();
if (transaction.isSampled() && servletTransactionHelper.isCaptureHeaders()) {
for (String headerName : response.getHeaderNames()) {
resp.addHeader(headerName, response.getHeaders(headerName));
HttpServletResponse response = (HttpServletResponse) event.getSuppliedResponse();
final Response resp = transaction.getContext().getResponse();
if (transaction.isSampled() && servletTransactionHelper.isCaptureHeaders()) {
for (String headerName : response.getHeaderNames()) {
resp.addHeader(headerName, response.getHeaders(headerName));
}
}
// request.getParameterMap() may allocate a new map, depending on the servlet container implementation
// so only call this method if necessary
final String contentTypeHeader = request.getHeader("Content-Type");
final Map<String, String[]> parameterMap;
if (transaction.isSampled() && servletTransactionHelper.captureParameters(request.getMethod(), contentTypeHeader)) {
parameterMap = request.getParameterMap();
} else {
parameterMap = null;
}
Throwable throwableToSend = event.getThrowable();
if (throwableToSend == null) {
throwableToSend = throwable;
}
servletTransactionHelper.onAfter(transaction, throwableToSend,
response.isCommitted(), response.getStatus(), request.getMethod(), parameterMap,
request.getServletPath(), request.getPathInfo(), contentTypeHeader);
} finally {
asyncContextAdviceHelperImpl.recycle(this);
}
// request.getParameterMap() may allocate a new map, depending on the servlet container implementation
// so only call this method if necessary
final String contentTypeHeader = request.getHeader("Content-Type");
final Map<String, String[]> parameterMap;
if (transaction.isSampled() && servletTransactionHelper.captureParameters(request.getMethod(), contentTypeHeader)) {
parameterMap = request.getParameterMap();
} else {
parameterMap = null;
}
servletTransactionHelper.onAfter(transaction, event.getThrowable(),
response.isCommitted(), response.getStatus(), request.getMethod(), parameterMap,
request.getServletPath(), request.getPathInfo(), contentTypeHeader);
}

@Override
public void resetState() {
transaction = null;
throwable = null;
completed.set(false);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,26 +21,50 @@

import co.elastic.apm.agent.impl.ElasticApmTracer;
import co.elastic.apm.agent.impl.transaction.Transaction;
import co.elastic.apm.agent.objectpool.Allocator;
import co.elastic.apm.agent.objectpool.ObjectPool;
import co.elastic.apm.agent.objectpool.impl.QueueBasedObjectPool;
import co.elastic.apm.agent.report.ReporterConfiguration;
import co.elastic.apm.agent.servlet.AsyncInstrumentation;
import co.elastic.apm.agent.servlet.ServletApiAdvice;
import co.elastic.apm.agent.servlet.ServletTransactionHelper;
import org.jctools.queues.atomic.AtomicQueueFactory;

import javax.servlet.AsyncContext;
import javax.servlet.ServletRequest;

import static co.elastic.apm.agent.servlet.ServletTransactionHelper.ASYNC_ATTRIBUTE;
import static co.elastic.apm.agent.servlet.ServletTransactionHelper.TRANSACTION_ATTRIBUTE;
import static org.jctools.queues.spec.ConcurrentQueueSpec.createBoundedMpmc;

public class AsyncContextAdviceHelperImpl implements AsyncInstrumentation.AsyncContextAdviceHelper<AsyncContext> {

private static final String ASYNC_LISTENER_ADDED = ServletApiAdvice.class.getName() + ".asyncListenerAdded";

private final ObjectPool<ApmAsyncListener> asyncListenerObjectPool;
private final ServletTransactionHelper servletTransactionHelper;
private final ElasticApmTracer tracer;

public AsyncContextAdviceHelperImpl(ElasticApmTracer tracer) {
this.tracer = tracer;
servletTransactionHelper = new ServletTransactionHelper(tracer);

int maxPooledElements = tracer.getConfigurationRegistry().getConfig(ReporterConfiguration.class).getMaxQueueSize() * 2;
asyncListenerObjectPool = QueueBasedObjectPool.ofRecyclable(
AtomicQueueFactory.<ApmAsyncListener>newQueue(createBoundedMpmc(maxPooledElements)),
false,
new ApmAsyncListenerAllocator());
}

ServletTransactionHelper getServletTransactionHelper() {
return servletTransactionHelper;
}

private final class ApmAsyncListenerAllocator implements Allocator<ApmAsyncListener> {
@Override
public ApmAsyncListener createInstance() {
return new ApmAsyncListener(AsyncContextAdviceHelperImpl.this);
}
}

@Override
Expand All @@ -57,11 +81,15 @@ public void onExitStartAsync(AsyncContext asyncContext) {
// specifying the request and response is important
// otherwise AsyncEvent.getSuppliedRequest returns null per spec
// however, only some application server like WebSphere actually implement it that way
asyncContext.addListener(new ApmAsyncListener(servletTransactionHelper, transaction),
asyncContext.addListener(asyncListenerObjectPool.createInstance().withTransaction(transaction),
asyncContext.getRequest(), asyncContext.getResponse());

request.setAttribute(ASYNC_ATTRIBUTE, Boolean.TRUE);
request.setAttribute(TRANSACTION_ATTRIBUTE, transaction);
}
}

void recycle(ApmAsyncListener apmAsyncListener) {
asyncListenerObjectPool.recycle(apmAsyncListener);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@
* App CL (Servlet API) uses AsyncContextAdviceHelperImpl from Helper CL
* / \
* v v
* WebApp CL (user code/libs) Helper CL implements AsyncContextAdviceHelperImpl
* WebApp CL (user code/libs) Helper CL loads AsyncContextAdviceHelperImpl and ApmAsyncListener
* </pre>
*/
@NonnullApi
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ void testAsyncDispatchTwice() throws Exception {

@Test
void testAsyncTimeout() throws Exception {
assertHasOneTransaction("/async-timeout", body -> true, 200);
assertHasOneTransaction("/async-timeout", body -> true, 500);
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -179,31 +179,60 @@ public final void stopServer() {
public void testTransactionReporting() throws Exception {
for (String pathToTest : getPathsToTest()) {
mockServerContainer.getClient().clear(HttpRequest.request(), ClearType.LOG);
executeRequest(pathToTest, "Hello World", 200);
String transactionId = assertTransactionReported(pathToTest, 200);
assertSpansTransactionId(500, this::getReportedSpans, transactionId);
validateMetadata();
}
}

@Test
public void testSpanErrorReporting() throws Exception {
for (String pathToTest : getPathsToTest()) {
mockServerContainer.getClient().clear(HttpRequest.request(), ClearType.LOG);
executeRequest(pathToTest + "?cause_db_error=true", "DB Error", 200);
String transactionId = assertTransactionReported(pathToTest, 200);
assertSpansTransactionId(500, this::getReportedSpans, transactionId);
assertErrorContent(500, this::getReportedErrors, transactionId, "Column \"NON_EXISTING_COLUMN\" not found");
}
}

final Response response = httpClient.newCall(new Request.Builder()
.get()
.url(getBaseUrl() + pathToTest)
.build())
.execute();
assertThat(response.code()).withFailMessage(response.toString() + getServerLogs()).isEqualTo(200);
final ResponseBody responseBody = response.body();
assertThat(responseBody).isNotNull();
assertThat(responseBody.string()).contains("Hello World");

final List<JsonNode> reportedTransactions = assertContainsOneEntryReported(500, this::getReportedTransactions);
JsonNode transaction = reportedTransactions.iterator().next();
assertThat(transaction.get("context").get("request").get("url").get("pathname").textValue()).isEqualTo(contextPath + pathToTest);
String transactionId = reportedTransactions.iterator().next().get("id").textValue();
// TODO make that less hacky
if (!pathToTest.equals("/index.jsp")) {
assertSpansTransactionId(500, this::getReportedSpans, transactionId);
validateMetadata();
@Test
public void testTransactionErrorReporting() throws Exception {
for (String pathToTest : getPathsToTestErrors()) {
mockServerContainer.getClient().clear(HttpRequest.request(), ClearType.LOG);
executeRequest(pathToTest + "?cause_transaction_error=true", "", 500);
String transactionId = assertTransactionReported(pathToTest, 500);
assertSpansTransactionId(500, this::getReportedSpans, transactionId);
// we currently only report errors when Exceptions are caught, still this test is relevant for response code capturing
if (isExpectedStacktrace(pathToTest)) {
assertErrorContent(500, this::getReportedErrors, transactionId, "Transaction failure");
}
}
}

public String assertTransactionReported(String pathToTest, int expectedResponseCode) throws IOException {
final List<JsonNode> reportedTransactions = assertContainsOneEntryReported(500, this::getReportedTransactions);
JsonNode transaction = reportedTransactions.iterator().next();
assertThat(transaction.get("context").get("request").get("url").get("pathname").textValue()).isEqualTo(contextPath + pathToTest);
assertThat(transaction.get("context").get("response").get("status_code").intValue()).isEqualTo(expectedResponseCode);
return transaction.get("id").textValue();
}

public void executeRequest(String pathToTest, String expectedContent, int expectedResponseCode) throws IOException, InterruptedException {
Response response = httpClient.newCall(new Request.Builder()
.get()
.url(getBaseUrl() + pathToTest)
.build())
.execute();
assertThat(response.code()).withFailMessage(response.toString() + getServerLogs()).isEqualTo(expectedResponseCode);
final ResponseBody responseBody = response.body();
assertThat(responseBody).isNotNull();
assertThat(responseBody.string()).contains(expectedContent);
}

@Nonnull
private List<JsonNode> assertContainsOneEntryReported(int timeoutMs, Supplier<List<JsonNode>> supplier) throws IOException {
private List<JsonNode> assertContainsOneEntryReported(int timeoutMs, Supplier<List<JsonNode>> supplier) {
long start = System.currentTimeMillis();
List<JsonNode> reportedTransactions;
do {
Expand All @@ -214,7 +243,7 @@ private List<JsonNode> assertContainsOneEntryReported(int timeoutMs, Supplier<Li
}

@Nonnull
private void assertSpansTransactionId(int timeoutMs, Supplier<List<JsonNode>> supplier, String transactionId) {
private List<JsonNode> assertSpansTransactionId(int timeoutMs, Supplier<List<JsonNode>> supplier, String transactionId) {
long start = System.currentTimeMillis();
List<JsonNode> reportedSpans;
do {
Expand All @@ -224,6 +253,23 @@ private void assertSpansTransactionId(int timeoutMs, Supplier<List<JsonNode>> su
for (JsonNode span : reportedSpans) {
assertThat(span.get("transaction_id").textValue()).isEqualTo(transactionId);
}
return reportedSpans;
}

@Nonnull
private List<JsonNode> assertErrorContent(int timeoutMs, Supplier<List<JsonNode>> supplier, String transactionId, String errorMessage) {
long start = System.currentTimeMillis();
List<JsonNode> reportedErrors;
do {
reportedErrors = supplier.get();
} while (reportedErrors.size() == 0 && System.currentTimeMillis() - start < timeoutMs);
assertThat(reportedErrors.size()).isEqualTo(1);
for (JsonNode error : reportedErrors) {
assertThat(error.get("transaction_id").textValue()).isEqualTo(transactionId);
assertThat(error.get("exception").get("message").textValue()).contains(errorMessage);
assertThat(error.get("exception").get("stacktrace").size()).isGreaterThanOrEqualTo(1);
}
return reportedErrors;
}

@Nonnull
Expand All @@ -247,6 +293,15 @@ protected List<String> getPathsToTest() {
return Arrays.asList("/index.jsp", "/servlet", "/async-dispatch-servlet", "/async-start-servlet");
}

@NotNull
protected List<String> getPathsToTestErrors() {
return Arrays.asList("/index.jsp", "/servlet", "/async-dispatch-servlet", "/async-start-servlet");
}

protected boolean isExpectedStacktrace(String path) {
return !path.equals("/async-start-servlet");
}

private String getBaseUrl() {
return "http://" + servletContainer.getContainerIpAddress() + ":" + servletContainer.getMappedPort(webPort) + contextPath;
}
Expand All @@ -263,6 +318,12 @@ private List<JsonNode> getReportedSpans() {
return transactions;
}

private List<JsonNode> getReportedErrors() {
final List<JsonNode> transactions = getEvents("error");
transactions.forEach(mockReporter::verifyErrorSchema);
return transactions;
}

private List<JsonNode> getEvents(String eventType) {
try {
final List<JsonNode> transactions = new ArrayList<>();
Expand Down
Loading

0 comments on commit 81dff7a

Please sign in to comment.