Skip to content

Commit

Permalink
Use airlift json codec in HTTP event listener
Browse files Browse the repository at this point in the history
Fix issue airlift/airlift#983
Update server disconnect test to use min threads to test for this issue
Cleanup tests
  • Loading branch information
mosiac1 authored and losipiuk committed Feb 23, 2022
1 parent 1d74483 commit 1454d43
Show file tree
Hide file tree
Showing 5 changed files with 110 additions and 92 deletions.
21 changes: 11 additions & 10 deletions plugin/trino-http-event-listener/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,11 @@
</exclusions>
</dependency>

<dependency>
<groupId>io.airlift</groupId>
<artifactId>json</artifactId>
</dependency>

<dependency>
<groupId>io.airlift</groupId>
<artifactId>log</artifactId>
Expand All @@ -63,16 +68,6 @@
<artifactId>jackson-databind</artifactId>
</dependency>

<dependency>
<groupId>com.fasterxml.jackson.datatype</groupId>
<artifactId>jackson-datatype-jdk8</artifactId>
</dependency>

<dependency>
<groupId>com.fasterxml.jackson.datatype</groupId>
<artifactId>jackson-datatype-jsr310</artifactId>
</dependency>

<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
Expand Down Expand Up @@ -118,6 +113,12 @@
</dependency>

<!-- for testing -->
<dependency>
<groupId>io.trino</groupId>
<artifactId>trino-main</artifactId>
<scope>test</scope>
</dependency>

<dependency>
<groupId>io.trino</groupId>
<artifactId>trino-testing-services</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,16 +13,14 @@
*/
package io.trino.plugin.httpquery;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.ObjectWriter;
import com.fasterxml.jackson.datatype.jdk8.Jdk8Module;
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
import com.google.common.collect.Multimaps;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.inject.Inject;
import io.airlift.http.client.BodyGenerator;
import io.airlift.http.client.HttpClient;
import io.airlift.http.client.Request;
import io.airlift.json.JsonCodec;
import io.airlift.log.Logger;
import io.airlift.units.Duration;
import io.trino.spi.eventlistener.EventListener;
Expand All @@ -39,6 +37,7 @@
import static com.google.common.base.Verify.verify;
import static com.google.common.net.HttpHeaders.CONTENT_TYPE;
import static com.google.common.net.MediaType.JSON_UTF_8;
import static io.airlift.http.client.JsonBodyGenerator.jsonBodyGenerator;
import static io.airlift.http.client.Request.Builder.preparePost;
import static io.airlift.http.client.StatusResponseHandler.StatusResponse;
import static io.airlift.http.client.StatusResponseHandler.createStatusResponseHandler;
Expand All @@ -56,7 +55,9 @@ public class HttpEventListener

private final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();

private final ObjectWriter objectWriter = new ObjectMapper().registerModule(new Jdk8Module()).registerModule(new JavaTimeModule()).writer();
private final JsonCodec<QueryCompletedEvent> queryCompletedEventJsonCodec;
private final JsonCodec<QueryCreatedEvent> queryCreatedEventJsonCodec;
private final JsonCodec<SplitCompletedEvent> splitCompletedEventJsonCodec;

private final HttpClient client;

Expand All @@ -65,11 +66,20 @@ public class HttpEventListener
private final URI ingestUri;

@Inject
public HttpEventListener(HttpEventListenerConfig config, @ForHttpEventListener HttpClient httpClient)
public HttpEventListener(
JsonCodec<QueryCompletedEvent> queryCompletedEventJsonCodec,
JsonCodec<QueryCreatedEvent> queryCreatedEventJsonCodec,
JsonCodec<SplitCompletedEvent> splitCompletedEventJsonCodec,
HttpEventListenerConfig config,
@ForHttpEventListener HttpClient httpClient)
{
this.config = requireNonNull(config, "http event listener config is null");
this.client = requireNonNull(httpClient, "http event listener http client is null");

this.queryCompletedEventJsonCodec = requireNonNull(queryCompletedEventJsonCodec, "queryCompletedEventJsonCodec is null");
this.queryCreatedEventJsonCodec = requireNonNull(queryCreatedEventJsonCodec, "queryCreatedEventJsonCodec is null");
this.splitCompletedEventJsonCodec = requireNonNull(splitCompletedEventJsonCodec, "splitCompletedEventJsonCodec is null");

try {
ingestUri = new URI(this.config.getIngestUri());
}
Expand All @@ -82,33 +92,33 @@ public HttpEventListener(HttpEventListenerConfig config, @ForHttpEventListener H
public void queryCreated(QueryCreatedEvent queryCreatedEvent)
{
if (config.getLogCreated()) {
sendLog(queryCreatedEvent, queryCreatedEvent.getMetadata().getQueryId());
sendLog(jsonBodyGenerator(queryCreatedEventJsonCodec, queryCreatedEvent), queryCreatedEvent.getMetadata().getQueryId());
}
}

@Override
public void queryCompleted(QueryCompletedEvent queryCompletedEvent)
{
if (config.getLogCompleted()) {
sendLog(queryCompletedEvent, queryCompletedEvent.getMetadata().getQueryId());
sendLog(jsonBodyGenerator(queryCompletedEventJsonCodec, queryCompletedEvent), queryCompletedEvent.getMetadata().getQueryId());
}
}

@Override
public void splitCompleted(SplitCompletedEvent splitCompletedEvent)
{
if (config.getLogSplit()) {
sendLog(splitCompletedEvent, splitCompletedEvent.getQueryId());
sendLog(jsonBodyGenerator(splitCompletedEventJsonCodec, splitCompletedEvent), splitCompletedEvent.getQueryId());
}
}

private <T> void sendLog(T event, String queryId)
private void sendLog(BodyGenerator eventBodyGenerator, String queryId)
{
Request request = preparePost()
.addHeaders(Multimaps.forMap(config.getHttpHeaders()))
.addHeader(CONTENT_TYPE, JSON_UTF_8.toString())
.setUri(ingestUri)
.setBodyGenerator(out -> objectWriter.writeValue(out, event))
.setBodyGenerator(eventBodyGenerator)
.build();

attemptToSend(request, 0, Duration.valueOf("0s"), queryId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,18 @@
import com.google.inject.Injector;
import com.google.inject.Scopes;
import io.airlift.bootstrap.Bootstrap;
import io.airlift.json.JsonModule;
import io.trino.spi.eventlistener.EventListener;
import io.trino.spi.eventlistener.EventListenerFactory;
import io.trino.spi.eventlistener.QueryCompletedEvent;
import io.trino.spi.eventlistener.QueryCreatedEvent;
import io.trino.spi.eventlistener.SplitCompletedEvent;

import java.util.Map;

import static io.airlift.configuration.ConfigBinder.configBinder;
import static io.airlift.http.client.HttpClientBinder.httpClientBinder;
import static io.airlift.json.JsonCodecBinder.jsonCodecBinder;

public class HttpEventListenerFactory
implements EventListenerFactory
Expand All @@ -36,7 +41,11 @@ public String getName()
public EventListener create(Map<String, String> config)
{
Bootstrap app = new Bootstrap(
new JsonModule(),
binder -> {
jsonCodecBinder(binder).bindJsonCodec(QueryCompletedEvent.class);
jsonCodecBinder(binder).bindJsonCodec(QueryCreatedEvent.class);
jsonCodecBinder(binder).bindJsonCodec(SplitCompletedEvent.class);
configBinder(binder).bindConfig(HttpEventListenerConfig.class);
httpClientBinder(binder).bindHttpClient("http-event-listener", ForHttpEventListener.class);
binder.bind(HttpEventListener.class).in(Scopes.SINGLETON);
Expand Down
Loading

0 comments on commit 1454d43

Please sign in to comment.