Skip to content

Commit

Permalink
Store operator summaries in MysqlEventListener
Browse files Browse the repository at this point in the history
  • Loading branch information
arhimondr authored and losipiuk committed Mar 10, 2023
1 parent 682b4ef commit cb716d9
Show file tree
Hide file tree
Showing 5 changed files with 45 additions and 7 deletions.
5 changes: 5 additions & 0 deletions plugin/trino-mysql-event-listener/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,11 @@
<artifactId>json</artifactId>
</dependency>

<dependency>
<groupId>io.airlift</groupId>
<artifactId>log</artifactId>
</dependency>

<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,10 @@
*/
package io.trino.plugin.eventlistener.mysql;

import com.google.common.base.Joiner;
import com.google.inject.Inject;
import io.airlift.json.JsonCodec;
import io.airlift.log.Logger;
import io.trino.spi.ErrorCode;
import io.trino.spi.ErrorType;
import io.trino.spi.TrinoWarning;
Expand Down Expand Up @@ -47,6 +49,10 @@
public class MysqlEventListener
implements EventListener
{
private static final Logger log = Logger.get(MysqlEventListener.class);

private static final long MAX_OPERATOR_SUMMARIES_JSON_LENGTH = 16 * 1024 * 1024;

private final QueryDao dao;
private final JsonCodec<Set<String>> clientTagsJsonCodec;
private final JsonCodec<Map<String, String>> sessionPropertiesJsonCodec;
Expand Down Expand Up @@ -152,10 +158,25 @@ public void queryCompleted(QueryCompletedEvent event)
stats.getCumulativeMemory(),
stats.getFailedCumulativeMemory(),
stats.getCompletedSplits(),
context.getRetryPolicy());
context.getRetryPolicy(),
createOperatorSummariesJson(metadata.getQueryId(), stats.getOperatorSummaries()));
dao.store(entity);
}

private Optional<String> createOperatorSummariesJson(String queryId, List<String> summaries)
{
StringBuilder builder = new StringBuilder();
builder.append("[");
Joiner.on(",").appendTo(builder, summaries);
builder.append("]");
String result = builder.toString();
if (result.length() > MAX_OPERATOR_SUMMARIES_JSON_LENGTH) {
log.info("Exceeded maximum operator summaries length for query %s: %s", queryId, result);
return Optional.empty();
}
return Optional.of(result);
}

@Override
public void splitCompleted(SplitCompletedEvent event) {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,8 @@ public interface QueryDao
" cumulative_memory DOUBLE NOT NULL,\n" +
" failed_cumulative_memory DOUBLE NOT NULL,\n" +
" completed_splits BIGINT NOT NULL,\n" +
" retry_policy VARCHAR(255) NOT NULL\n" +
" retry_policy VARCHAR(255) NOT NULL,\n" +
" operator_summaries_json MEDIUMTEXT NOT NULL\n" +
")")
void createTable();

Expand Down Expand Up @@ -152,7 +153,8 @@ public interface QueryDao
" cumulative_memory,\n" +
" failed_cumulative_memory,\n" +
" completed_splits,\n" +
" retry_policy\n" +
" retry_policy,\n" +
" operator_summaries_json\n" +
")\n" +
"VALUES (\n" +
" :queryId,\n" +
Expand Down Expand Up @@ -219,7 +221,8 @@ public interface QueryDao
" :cumulativeMemory,\n" +
" :failedCumulativeMemory,\n" +
" :completedSplits,\n" +
" :retryPolicy\n" +
" :retryPolicy,\n" +
" :operatorSummariesJson\n" +
")")
void store(@BindBean QueryEntity entity);
}
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@ public class QueryEntity
private final int completedSplits;

private final String retryPolicy;
private final Optional<String> operatorSummariesJson;

public QueryEntity(
String queryId,
Expand Down Expand Up @@ -165,7 +166,8 @@ public QueryEntity(
double cumulativeMemory,
double failedCumulativeMemory,
int completedSplits,
String retryPolicy)
String retryPolicy,
Optional<String> operatorSummariesJson)
{
this.queryId = requireNonNull(queryId, "queryId is null");
this.transactionId = requireNonNull(transactionId, "transactionId is null");
Expand Down Expand Up @@ -232,6 +234,7 @@ public QueryEntity(
this.failedCumulativeMemory = failedCumulativeMemory;
this.completedSplits = completedSplits;
this.retryPolicy = requireNonNull(retryPolicy, "retryPolicy is null");
this.operatorSummariesJson = requireNonNull(operatorSummariesJson, "operatorSummariesJson is null");
}

public String getQueryId()
Expand Down Expand Up @@ -558,4 +561,9 @@ public String getRetryPolicy()
{
return retryPolicy;
}

public Optional<String> getOperatorSummariesJson()
{
return operatorSummariesJson;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ public class TestMysqlEventListener
// not stored
Collections.emptyList(),
// not stored
Collections.emptyList(),
List.of("{operator: \"operator1\"}", "{operator: \"operator2\"}"),
// not stored
Optional.empty());

Expand Down Expand Up @@ -267,7 +267,6 @@ public class TestMysqlEventListener
Collections.emptyList(),
// not stored
Collections.emptyList(),
// not stored
Collections.emptyList(),
// not stored
Optional.empty());
Expand Down Expand Up @@ -421,6 +420,7 @@ public void testFull()
assertEquals(resultSet.getDouble("failed_cumulative_memory"), 129.0);
assertEquals(resultSet.getLong("completed_splits"), 130);
assertEquals(resultSet.getString("retry_policy"), "TASK");
assertEquals(resultSet.getString("operator_summaries_json"), "[{operator: \"operator1\"},{operator: \"operator2\"}]");
assertFalse(resultSet.next());
}
}
Expand Down Expand Up @@ -503,6 +503,7 @@ public void testMinimal()
assertEquals(resultSet.getDouble("failed_cumulative_memory"), 129.0);
assertEquals(resultSet.getLong("completed_splits"), 130);
assertEquals(resultSet.getString("retry_policy"), "NONE");
assertEquals(resultSet.getString("operator_summaries_json"), "[]");
assertFalse(resultSet.next());
}
}
Expand Down

0 comments on commit cb716d9

Please sign in to comment.