Skip to content

Commit

Permalink
Add more logging to the HTTP event listener
Browse files Browse the repository at this point in the history
  • Loading branch information
mosiac1 authored and losipiuk committed Jan 14, 2022
1 parent 670dfd5 commit 2aed7e0
Showing 1 changed file with 35 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -82,27 +82,27 @@ public HttpEventListener(HttpEventListenerConfig config, @ForHttpEventListener H
public void queryCreated(QueryCreatedEvent queryCreatedEvent)
{
if (config.getLogCreated()) {
sendLog(queryCreatedEvent);
sendLog(queryCreatedEvent, queryCreatedEvent.getMetadata().getQueryId());
}
}

@Override
public void queryCompleted(QueryCompletedEvent queryCompletedEvent)
{
if (config.getLogCompleted()) {
sendLog(queryCompletedEvent);
sendLog(queryCompletedEvent, queryCompletedEvent.getMetadata().getQueryId());
}
}

@Override
public void splitCompleted(SplitCompletedEvent splitCompletedEvent)
{
if (config.getLogSplit()) {
sendLog(splitCompletedEvent);
sendLog(splitCompletedEvent, splitCompletedEvent.getQueryId());
}
}

private <T> void sendLog(T event)
private <T> void sendLog(T event, String queryId)
{
Request request = preparePost()
.addHeaders(Multimaps.forMap(config.getHttpHeaders()))
Expand All @@ -111,10 +111,10 @@ private <T> void sendLog(T event)
.setBodyGenerator(out -> objectWriter.writeValue(out, event))
.build();

attemptToSend(request, 0, Duration.valueOf("0s"));
attemptToSend(request, 0, Duration.valueOf("0s"), queryId);
}

private void attemptToSend(Request request, int attempt, Duration delay)
private void attemptToSend(Request request, int attempt, Duration delay, String queryId)
{
this.executor.schedule(
() -> Futures.addCallback(client.executeAsync(request, createStatusResponseHandler()),
Expand All @@ -125,19 +125,45 @@ public void onSuccess(StatusResponse result)
verify(result != null);

if (result.getStatusCode() >= 500 && attempt < config.getRetryCount()) {
attemptToSend(request, attempt + 1, nextDelay(delay));
Duration nextDelay = nextDelay(delay);
int nextAttepmt = attempt + 1;

log.warn("QueryId = \"%s\", attempt = %d/%d, URL = %s | Ingest server responded with code %d, will retry after approximately %d seconds",
queryId, attempt + 1, config.getRetryCount() + 1, request.getUri().toString(),
result.getStatusCode(), nextDelay.roundTo(TimeUnit.SECONDS));

attemptToSend(request, nextAttepmt, nextDelay, queryId);
return;
}

if (!(result.getStatusCode() >= 200 && result.getStatusCode() < 300)) {
log.error("Received status code %d from ingest server URI %s; expecting status 200", result.getStatusCode(), request.getUri());
log.warn("QueryId = \"%s\", attempt = %d/%d, URL = %s | Received status code %d from ingest server; expecting status 200",
queryId, attempt + 1, config.getRetryCount(), request.getUri().toString(),
result.getStatusCode(), request.getUri().toString());
return;
}

log.debug("QueryId = \"%s\", attempt = %d/%d, URL = %s | Query event delivered successfully",
queryId, attempt + 1, config.getRetryCount() + 1, request.getUri().toString());
}

@Override
public void onFailure(Throwable t)
{
log.error("Error sending HTTP request to ingest server with URL %s: %s", request.getUri(), t);
if (attempt < config.getRetryCount()) {
Duration nextDelay = nextDelay(delay);
int nextAttempt = attempt + 1;

log.warn(t, "QueryId = \"%s\", attempt = %d/%d, URL = %s | Sending event caused an exception, will retry after %d seconds",
queryId, attempt + 1, config.getRetryCount() + 1, request.getUri().toString(),
nextDelay.roundTo(TimeUnit.SECONDS));

attemptToSend(request, nextAttempt, nextDelay, queryId);
return;
}

log.error(t, "QueryId = \"%s\", attempt = %d/%d, URL = %s | Error sending HTTP request",
queryId, attempt + 1, config.getRetryCount() + 1, request.getUri().toString());
}
}, executor),
(long) delay.getValue(), delay.getUnit());
Expand Down

0 comments on commit 2aed7e0

Please sign in to comment.