From c79cba97eb509860f4a73fb4f03e18d408bb56a2 Mon Sep 17 00:00:00 2001 From: Vicky Papavasileiou Date: Tue, 11 Feb 2020 10:01:05 -0800 Subject: [PATCH] fix: ConcurrentModificationException in ClusterStatusResource (#4510) --- .../resources/ClusterStatusResource.java | 33 +++++++++++-------- 1 file changed, 20 insertions(+), 13 deletions(-) diff --git a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/ClusterStatusResource.java b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/ClusterStatusResource.java index 088e482455b3..23f40ba14913 100644 --- a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/ClusterStatusResource.java +++ b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/ClusterStatusResource.java @@ -30,6 +30,8 @@ import io.confluent.ksql.rest.server.LagReportingAgent; import io.confluent.ksql.util.HostStatus; import io.confluent.ksql.util.KsqlHostInfo; +import io.confluent.ksql.util.PersistentQueryMetadata; +import java.util.HashMap; import java.util.Map; import java.util.Optional; import java.util.Set; @@ -100,19 +102,24 @@ private HostStoreLags getHostStoreLags(final KsqlHostInfo ksqlHostInfo) { private Map getActiveStandbyInformation( final KsqlHostInfo ksqlHostInfo ) { - return engine.getPersistentQueries().stream() - .flatMap(persistentQueryMetadata -> persistentQueryMetadata.getAllMetadata() - .stream() - .map(streamsMetadata -> new QueryIdAndStreamsMetadata( - persistentQueryMetadata.getQueryId().toString(), streamsMetadata))) - .filter(queryIdAndStreamsMetadata -> - queryIdAndStreamsMetadata.streamsMetadata != StreamsMetadata.NOT_AVAILABLE) - .filter(queryIdAndStreamsMetadata -> - queryIdAndStreamsMetadata.streamsMetadata.hostInfo().equals(asHostInfo( - ksqlHostInfo))) - .collect(Collectors.toMap(queryIdAndStreamsMetadata -> - queryIdAndStreamsMetadata.queryId , - QueryIdAndStreamsMetadata::toActiveStandbyEntity)); + final Map perQueryMap = new HashMap<>(); + for (PersistentQueryMetadata persistentQueryMetadata: engine.getPersistentQueries()) { + for (StreamsMetadata streamsMetadata: persistentQueryMetadata.getAllMetadata()) { + if (streamsMetadata == StreamsMetadata.NOT_AVAILABLE + || !streamsMetadata.hostInfo().equals(asHostInfo(ksqlHostInfo))) { + continue; + } + final QueryIdAndStreamsMetadata queryIdAndStreamsMetadata = new QueryIdAndStreamsMetadata( + persistentQueryMetadata.getQueryId().toString(), + streamsMetadata + ); + perQueryMap.putIfAbsent( + queryIdAndStreamsMetadata.queryId, + queryIdAndStreamsMetadata.toActiveStandbyEntity() + ); + } + } + return perQueryMap; } private static final class QueryIdAndStreamsMetadata {