diff --git a/core/trino-main/src/main/java/io/trino/eventlistener/EventListenerConfig.java b/core/trino-main/src/main/java/io/trino/eventlistener/EventListenerConfig.java index 5f9ef30ef294..374ea9f45d1a 100644 --- a/core/trino-main/src/main/java/io/trino/eventlistener/EventListenerConfig.java +++ b/core/trino-main/src/main/java/io/trino/eventlistener/EventListenerConfig.java @@ -16,6 +16,7 @@ import com.google.common.collect.ImmutableList; import io.airlift.configuration.Config; import io.airlift.configuration.validation.FileExists; +import jakarta.validation.constraints.Min; import jakarta.validation.constraints.NotNull; import java.io.File; @@ -26,6 +27,7 @@ public class EventListenerConfig { private List eventListenerFiles = ImmutableList.of(); + private int maxConcurrentQueryCompletedEvents = 100; @NotNull public List<@FileExists File> getEventListenerFiles() @@ -41,4 +43,17 @@ public EventListenerConfig setEventListenerFiles(List eventListenerFiles .collect(toImmutableList()); return this; } + + @Min(1) + public int getMaxConcurrentQueryCompletedEvents() + { + return maxConcurrentQueryCompletedEvents; + } + + @Config("event-listener.max-concurrent-query-completed-events") + public EventListenerConfig setMaxConcurrentQueryCompletedEvents(int maxConcurrentQueryCompletedEvents) + { + this.maxConcurrentQueryCompletedEvents = maxConcurrentQueryCompletedEvents; + return this; + } } diff --git a/core/trino-main/src/main/java/io/trino/eventlistener/EventListenerManager.java b/core/trino-main/src/main/java/io/trino/eventlistener/EventListenerManager.java index a8bc8c551ea2..88ec185b2663 100644 --- a/core/trino-main/src/main/java/io/trino/eventlistener/EventListenerManager.java +++ b/core/trino-main/src/main/java/io/trino/eventlistener/EventListenerManager.java @@ -17,6 +17,7 @@ import com.google.common.collect.ImmutableList; import com.google.inject.Inject; import io.airlift.log.Logger; +import io.airlift.stats.CounterStat; import io.airlift.stats.TimeStat; import io.trino.spi.classloader.ThreadContextClassLoader; import io.trino.spi.eventlistener.EventListener; @@ -38,6 +39,7 @@ import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; import static com.google.common.base.Preconditions.checkArgument; @@ -55,10 +57,13 @@ public class EventListenerManager private static final File CONFIG_FILE = new File("etc/event-listener.properties"); private static final String EVENT_LISTENER_NAME_PROPERTY = "event-listener.name"; private final List configFiles; + private final int maxConcurrentQueryCompletedEvents; private final Map eventListenerFactories = new ConcurrentHashMap<>(); private final List providedEventListeners = Collections.synchronizedList(new ArrayList<>()); private final AtomicReference> configuredEventListeners = new AtomicReference<>(ImmutableList.of()); private final AtomicBoolean loading = new AtomicBoolean(false); + private final AtomicInteger concurrentQueryCompletedEvents = new AtomicInteger(); + private final CounterStat skippedQueryCompletedEvents = new CounterStat(); private final TimeStat queryCreatedTime = new TimeStat(MILLISECONDS); private final TimeStat queryCompletedTime = new TimeStat(MILLISECONDS); @@ -68,6 +73,7 @@ public class EventListenerManager public EventListenerManager(EventListenerConfig config) { this.configFiles = ImmutableList.copyOf(config.getEventListenerFiles()); + this.maxConcurrentQueryCompletedEvents = config.getMaxConcurrentQueryCompletedEvents(); } public void addEventListenerFactory(EventListenerFactory eventListenerFactory) @@ -144,7 +150,13 @@ private static Map loadEventListenerProperties(File configFile) public void queryCompleted(Function queryCompletedEventProvider) { try (TimeStat.BlockTimer _ = queryCompletedTime.time()) { + if (concurrentQueryCompletedEvents.incrementAndGet() > maxConcurrentQueryCompletedEvents) { + concurrentQueryCompletedEvents.decrementAndGet(); + skippedQueryCompletedEvents.update(1); + return; + } doQueryCompleted(queryCompletedEventProvider); + concurrentQueryCompletedEvents.decrementAndGet(); } } @@ -220,6 +232,19 @@ public TimeStat getSplitCompletedTime() return splitCompletedTime; } + @Managed + public int getConcurrentQueryCompletedEvents() + { + return concurrentQueryCompletedEvents.get(); + } + + @Managed + @Nested + public CounterStat getSkippedQueryCompletedEvents() + { + return skippedQueryCompletedEvents; + } + @PreDestroy public void shutdown() { diff --git a/core/trino-main/src/test/java/io/trino/eventlistener/TestEventListenerConfig.java b/core/trino-main/src/test/java/io/trino/eventlistener/TestEventListenerConfig.java index 20ccadc9df1a..32180cdcf0c3 100644 --- a/core/trino-main/src/test/java/io/trino/eventlistener/TestEventListenerConfig.java +++ b/core/trino-main/src/test/java/io/trino/eventlistener/TestEventListenerConfig.java @@ -32,6 +32,7 @@ public class TestEventListenerConfig public void testDefaults() { assertRecordedDefaults(ConfigAssertions.recordDefaults(EventListenerConfig.class) + .setMaxConcurrentQueryCompletedEvents(100) .setEventListenerFiles(ImmutableList.of())); } @@ -42,10 +43,13 @@ public void testExplicitPropertyMappings() Path config1 = Files.createTempFile(null, null); Path config2 = Files.createTempFile(null, null); - Map properties = ImmutableMap.of("event-listener.config-files", config1.toString() + "," + config2.toString()); + Map properties = ImmutableMap.of( + "event-listener.config-files", config1.toString() + "," + config2.toString(), + "event-listener.max-concurrent-query-completed-events", "1"); EventListenerConfig expected = new EventListenerConfig() - .setEventListenerFiles(ImmutableList.of(config1.toFile().getPath(), config2.toFile().getPath())); + .setEventListenerFiles(ImmutableList.of(config1.toFile().getPath(), config2.toFile().getPath())) + .setMaxConcurrentQueryCompletedEvents(1); assertFullMapping(properties, expected); } diff --git a/core/trino-main/src/test/java/io/trino/eventlistener/TestEventListenerManager.java b/core/trino-main/src/test/java/io/trino/eventlistener/TestEventListenerManager.java index d31353539930..f3fbcd1f0006 100644 --- a/core/trino-main/src/test/java/io/trino/eventlistener/TestEventListenerManager.java +++ b/core/trino-main/src/test/java/io/trino/eventlistener/TestEventListenerManager.java @@ -14,14 +14,139 @@ package io.trino.eventlistener; import io.trino.spi.eventlistener.EventListener; +import io.trino.spi.eventlistener.QueryCompletedEvent; +import io.trino.spi.eventlistener.QueryContext; +import io.trino.spi.eventlistener.QueryIOMetadata; +import io.trino.spi.eventlistener.QueryMetadata; +import io.trino.spi.eventlistener.QueryStatistics; +import io.trino.spi.session.ResourceEstimates; import org.junit.jupiter.api.Test; +import java.net.URI; +import java.time.Instant; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; +import static io.trino.spi.type.TimeZoneKey.UTC_KEY; +import static java.time.Duration.ofMillis; +import static java.util.concurrent.Executors.newFixedThreadPool; import static org.assertj.core.api.Assertions.assertThat; class TestEventListenerManager { + private static final QueryMetadata QUERY_METADATA = new QueryMetadata( + "minimal_query", + Optional.empty(), + "query", + Optional.empty(), + Optional.empty(), + "queryState", + // not stored + List.of(), + // not stored + List.of(), + URI.create("http://localhost"), + Optional.empty(), + Optional.empty(), + Optional.empty()); + + private static final QueryStatistics QUERY_STATISTICS = new QueryStatistics( + ofMillis(101), + ofMillis(102), + ofMillis(103), + ofMillis(104), + Optional.empty(), + Optional.empty(), + Optional.empty(), + Optional.empty(), + Optional.empty(), + Optional.empty(), + Optional.empty(), + Optional.empty(), + Optional.empty(), + Optional.empty(), + Optional.empty(), + Optional.empty(), + 115L, + 116L, + 117L, + 118L, + 119L, + 1191L, + 1192L, + 120L, + 121L, + 122L, + 123L, + 124L, + 125L, + 126L, + 127L, + 1271L, + 128.0, + 129.0, + // not stored + Collections.emptyList(), + 130, + false, + // not stored + Collections.emptyList(), + // not stored + Collections.emptyList(), + // not stored + Collections.emptyList(), + Collections.emptyList(), + Collections.emptyList(), + // not stored + Optional.empty()); + + private static final QueryContext QUERY_CONTEXT = new QueryContext( + "user", + "originalUser", + Optional.empty(), + Set.of(), + Set.of(), + Optional.empty(), + Optional.empty(), + Optional.empty(), + Optional.empty(), + Set.of(), + // not stored + Set.of(), + Optional.empty(), + UTC_KEY.getId(), + Optional.empty(), + Optional.empty(), + Optional.empty(), + Map.of(), + // not stored + new ResourceEstimates(Optional.empty(), Optional.empty(), Optional.empty()), + "serverAddress", + "serverVersion", + "environment", + Optional.empty(), + "NONE"); + + private static final QueryIOMetadata QUERY_IO_METADATA = new QueryIOMetadata(List.of(), Optional.empty()); + + private static final QueryCompletedEvent QUERY_COMPLETED_EVENT = new QueryCompletedEvent( + QUERY_METADATA, + QUERY_STATISTICS, + QUERY_CONTEXT, + QUERY_IO_METADATA, + Optional.empty(), + List.of(), + Instant.now(), + Instant.now(), + Instant.now()); + @Test public void testShutdownIsForwardedToListeners() { @@ -42,4 +167,47 @@ public void shutdown() assertThat(wasCalled.get()).isTrue(); } + + @Test + public void testMaxConcurrentQueryCompletedEvents() + throws InterruptedException + { + EventListenerManager eventListenerManager = new EventListenerManager(new EventListenerConfig().setMaxConcurrentQueryCompletedEvents(1)); + eventListenerManager.addEventListener(new BlockingEventListener()); + eventListenerManager.loadEventListeners(); + ExecutorService executor = newFixedThreadPool(2); + CountDownLatch countDownLatch = new CountDownLatch(1); + try { + Runnable queryCompletedEvent = () -> { + eventListenerManager.queryCompleted(_ -> QUERY_COMPLETED_EVENT); + countDownLatch.countDown(); + }; + executor.submit(queryCompletedEvent); + executor.submit(queryCompletedEvent); + + countDownLatch.await(); + assertThat(eventListenerManager.getSkippedQueryCompletedEvents().getTotalCount()).isEqualTo(1); + assertThat(eventListenerManager.getConcurrentQueryCompletedEvents()).isEqualTo(1); + } + finally { + executor.shutdownNow(); + assertThat(executor.awaitTermination(10, TimeUnit.SECONDS)).isTrue(); + } + } + + private static final class BlockingEventListener + implements EventListener + { + @Override + public void queryCompleted(QueryCompletedEvent queryCompletedEvent) + { + try { + // sleep forever + Thread.sleep(100_000); + } + catch (InterruptedException e) { + throw new RuntimeException(e); + } + } + } } diff --git a/docs/src/main/sphinx/develop/event-listener.md b/docs/src/main/sphinx/develop/event-listener.md index 3c81931b34de..8d365b605704 100644 --- a/docs/src/main/sphinx/develop/event-listener.md +++ b/docs/src/main/sphinx/develop/event-listener.md @@ -45,6 +45,10 @@ custom-property1=custom-value1 custom-property2=custom-value2 ``` +Maximum number of concurrent query completed events +can be configured using `event-listener.max-concurrent-query-completed-events` property +(`100` by default). Excessive events are dropped. + (multiple-listeners)= ## Multiple event listeners