From cb716d967f008722ab7517efe28c1f1b12d68355 Mon Sep 17 00:00:00 2001 From: Andrii Rosa Date: Mon, 27 Feb 2023 19:31:15 -0500 Subject: [PATCH] Store operator summaries in MysqlEventListener --- plugin/trino-mysql-event-listener/pom.xml | 5 ++++ .../mysql/MysqlEventListener.java | 23 ++++++++++++++++++- .../plugin/eventlistener/mysql/QueryDao.java | 9 +++++--- .../eventlistener/mysql/QueryEntity.java | 10 +++++++- .../mysql/TestMysqlEventListener.java | 5 ++-- 5 files changed, 45 insertions(+), 7 deletions(-) diff --git a/plugin/trino-mysql-event-listener/pom.xml b/plugin/trino-mysql-event-listener/pom.xml index 26334aa5f029..483d914cdcd8 100644 --- a/plugin/trino-mysql-event-listener/pom.xml +++ b/plugin/trino-mysql-event-listener/pom.xml @@ -33,6 +33,11 @@ json + + io.airlift + log + + com.google.guava guava diff --git a/plugin/trino-mysql-event-listener/src/main/java/io/trino/plugin/eventlistener/mysql/MysqlEventListener.java b/plugin/trino-mysql-event-listener/src/main/java/io/trino/plugin/eventlistener/mysql/MysqlEventListener.java index 93158eaa7d59..bb21ce07c97c 100644 --- a/plugin/trino-mysql-event-listener/src/main/java/io/trino/plugin/eventlistener/mysql/MysqlEventListener.java +++ b/plugin/trino-mysql-event-listener/src/main/java/io/trino/plugin/eventlistener/mysql/MysqlEventListener.java @@ -13,8 +13,10 @@ */ package io.trino.plugin.eventlistener.mysql; +import com.google.common.base.Joiner; import com.google.inject.Inject; import io.airlift.json.JsonCodec; +import io.airlift.log.Logger; import io.trino.spi.ErrorCode; import io.trino.spi.ErrorType; import io.trino.spi.TrinoWarning; @@ -47,6 +49,10 @@ public class MysqlEventListener implements EventListener { + private static final Logger log = Logger.get(MysqlEventListener.class); + + private static final long MAX_OPERATOR_SUMMARIES_JSON_LENGTH = 16 * 1024 * 1024; + private final QueryDao dao; private final JsonCodec> clientTagsJsonCodec; private final JsonCodec> sessionPropertiesJsonCodec; @@ -152,10 +158,25 @@ public void queryCompleted(QueryCompletedEvent event) stats.getCumulativeMemory(), stats.getFailedCumulativeMemory(), stats.getCompletedSplits(), - context.getRetryPolicy()); + context.getRetryPolicy(), + createOperatorSummariesJson(metadata.getQueryId(), stats.getOperatorSummaries())); dao.store(entity); } + private Optional createOperatorSummariesJson(String queryId, List summaries) + { + StringBuilder builder = new StringBuilder(); + builder.append("["); + Joiner.on(",").appendTo(builder, summaries); + builder.append("]"); + String result = builder.toString(); + if (result.length() > MAX_OPERATOR_SUMMARIES_JSON_LENGTH) { + log.info("Exceeded maximum operator summaries length for query %s: %s", queryId, result); + return Optional.empty(); + } + return Optional.of(result); + } + @Override public void splitCompleted(SplitCompletedEvent event) {} } diff --git a/plugin/trino-mysql-event-listener/src/main/java/io/trino/plugin/eventlistener/mysql/QueryDao.java b/plugin/trino-mysql-event-listener/src/main/java/io/trino/plugin/eventlistener/mysql/QueryDao.java index 06a1c989ab70..a24415eab9a0 100644 --- a/plugin/trino-mysql-event-listener/src/main/java/io/trino/plugin/eventlistener/mysql/QueryDao.java +++ b/plugin/trino-mysql-event-listener/src/main/java/io/trino/plugin/eventlistener/mysql/QueryDao.java @@ -83,7 +83,8 @@ public interface QueryDao " cumulative_memory DOUBLE NOT NULL,\n" + " failed_cumulative_memory DOUBLE NOT NULL,\n" + " completed_splits BIGINT NOT NULL,\n" + - " retry_policy VARCHAR(255) NOT NULL\n" + + " retry_policy VARCHAR(255) NOT NULL,\n" + + " operator_summaries_json MEDIUMTEXT NOT NULL\n" + ")") void createTable(); @@ -152,7 +153,8 @@ public interface QueryDao " cumulative_memory,\n" + " failed_cumulative_memory,\n" + " completed_splits,\n" + - " retry_policy\n" + + " retry_policy,\n" + + " operator_summaries_json\n" + ")\n" + "VALUES (\n" + " :queryId,\n" + @@ -219,7 +221,8 @@ public interface QueryDao " :cumulativeMemory,\n" + " :failedCumulativeMemory,\n" + " :completedSplits,\n" + - " :retryPolicy\n" + + " :retryPolicy,\n" + + " :operatorSummariesJson\n" + ")") void store(@BindBean QueryEntity entity); } diff --git a/plugin/trino-mysql-event-listener/src/main/java/io/trino/plugin/eventlistener/mysql/QueryEntity.java b/plugin/trino-mysql-event-listener/src/main/java/io/trino/plugin/eventlistener/mysql/QueryEntity.java index 4e9bcb8ac909..fa3e37441c27 100644 --- a/plugin/trino-mysql-event-listener/src/main/java/io/trino/plugin/eventlistener/mysql/QueryEntity.java +++ b/plugin/trino-mysql-event-listener/src/main/java/io/trino/plugin/eventlistener/mysql/QueryEntity.java @@ -99,6 +99,7 @@ public class QueryEntity private final int completedSplits; private final String retryPolicy; + private final Optional operatorSummariesJson; public QueryEntity( String queryId, @@ -165,7 +166,8 @@ public QueryEntity( double cumulativeMemory, double failedCumulativeMemory, int completedSplits, - String retryPolicy) + String retryPolicy, + Optional operatorSummariesJson) { this.queryId = requireNonNull(queryId, "queryId is null"); this.transactionId = requireNonNull(transactionId, "transactionId is null"); @@ -232,6 +234,7 @@ public QueryEntity( this.failedCumulativeMemory = failedCumulativeMemory; this.completedSplits = completedSplits; this.retryPolicy = requireNonNull(retryPolicy, "retryPolicy is null"); + this.operatorSummariesJson = requireNonNull(operatorSummariesJson, "operatorSummariesJson is null"); } public String getQueryId() @@ -558,4 +561,9 @@ public String getRetryPolicy() { return retryPolicy; } + + public Optional getOperatorSummariesJson() + { + return operatorSummariesJson; + } } diff --git a/plugin/trino-mysql-event-listener/src/test/java/io/trino/plugin/eventlistener/mysql/TestMysqlEventListener.java b/plugin/trino-mysql-event-listener/src/test/java/io/trino/plugin/eventlistener/mysql/TestMysqlEventListener.java index c49c9377b431..87c43fd309b8 100644 --- a/plugin/trino-mysql-event-listener/src/test/java/io/trino/plugin/eventlistener/mysql/TestMysqlEventListener.java +++ b/plugin/trino-mysql-event-listener/src/test/java/io/trino/plugin/eventlistener/mysql/TestMysqlEventListener.java @@ -121,7 +121,7 @@ public class TestMysqlEventListener // not stored Collections.emptyList(), // not stored - Collections.emptyList(), + List.of("{operator: \"operator1\"}", "{operator: \"operator2\"}"), // not stored Optional.empty()); @@ -267,7 +267,6 @@ public class TestMysqlEventListener Collections.emptyList(), // not stored Collections.emptyList(), - // not stored Collections.emptyList(), // not stored Optional.empty()); @@ -421,6 +420,7 @@ public void testFull() assertEquals(resultSet.getDouble("failed_cumulative_memory"), 129.0); assertEquals(resultSet.getLong("completed_splits"), 130); assertEquals(resultSet.getString("retry_policy"), "TASK"); + assertEquals(resultSet.getString("operator_summaries_json"), "[{operator: \"operator1\"},{operator: \"operator2\"}]"); assertFalse(resultSet.next()); } } @@ -503,6 +503,7 @@ public void testMinimal() assertEquals(resultSet.getDouble("failed_cumulative_memory"), 129.0); assertEquals(resultSet.getLong("completed_splits"), 130); assertEquals(resultSet.getString("retry_policy"), "NONE"); + assertEquals(resultSet.getString("operator_summaries_json"), "[]"); assertFalse(resultSet.next()); } }