Skip to content

Commit

Permalink
fix: ConcurrentModificationException in ClusterStatusResource (#4510)
Browse files Browse the repository at this point in the history
  • Loading branch information
vpapavas authored Feb 11, 2020
1 parent 6299410 commit c79cba9
Showing 1 changed file with 20 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@
import io.confluent.ksql.rest.server.LagReportingAgent;
import io.confluent.ksql.util.HostStatus;
import io.confluent.ksql.util.KsqlHostInfo;
import io.confluent.ksql.util.PersistentQueryMetadata;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
Expand Down Expand Up @@ -100,19 +102,24 @@ private HostStoreLags getHostStoreLags(final KsqlHostInfo ksqlHostInfo) {
private Map<String, ActiveStandbyEntity> getActiveStandbyInformation(
final KsqlHostInfo ksqlHostInfo
) {
return engine.getPersistentQueries().stream()
.flatMap(persistentQueryMetadata -> persistentQueryMetadata.getAllMetadata()
.stream()
.map(streamsMetadata -> new QueryIdAndStreamsMetadata(
persistentQueryMetadata.getQueryId().toString(), streamsMetadata)))
.filter(queryIdAndStreamsMetadata ->
queryIdAndStreamsMetadata.streamsMetadata != StreamsMetadata.NOT_AVAILABLE)
.filter(queryIdAndStreamsMetadata ->
queryIdAndStreamsMetadata.streamsMetadata.hostInfo().equals(asHostInfo(
ksqlHostInfo)))
.collect(Collectors.toMap(queryIdAndStreamsMetadata ->
queryIdAndStreamsMetadata.queryId ,
QueryIdAndStreamsMetadata::toActiveStandbyEntity));
final Map<String, ActiveStandbyEntity> perQueryMap = new HashMap<>();
for (PersistentQueryMetadata persistentQueryMetadata: engine.getPersistentQueries()) {
for (StreamsMetadata streamsMetadata: persistentQueryMetadata.getAllMetadata()) {
if (streamsMetadata == StreamsMetadata.NOT_AVAILABLE
|| !streamsMetadata.hostInfo().equals(asHostInfo(ksqlHostInfo))) {
continue;
}
final QueryIdAndStreamsMetadata queryIdAndStreamsMetadata = new QueryIdAndStreamsMetadata(
persistentQueryMetadata.getQueryId().toString(),
streamsMetadata
);
perQueryMap.putIfAbsent(
queryIdAndStreamsMetadata.queryId,
queryIdAndStreamsMetadata.toActiveStandbyEntity()
);
}
}
return perQueryMap;
}

private static final class QueryIdAndStreamsMetadata {
Expand Down

0 comments on commit c79cba9

Please sign in to comment.