Skip to content

Commit

Permalink
feat: add observability metric skeleton (#7769)
Browse files Browse the repository at this point in the history
* feat: add observability metric skeleton

* checkstyle
  • Loading branch information
lct45 authored Jul 14, 2021
1 parent 06657ba commit 3362c00
Show file tree
Hide file tree
Showing 3 changed files with 92 additions and 1 deletion.
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
/*
* Copyright 2021 Confluent Inc.
*
* Licensed under the Confluent Community License (the "License"); you may not use
* this file except in compliance with the License. You may obtain a copy of the
* License at
*
* http://www.confluent.io/confluent-community-license
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package io.confluent.ksql.internal;

import io.confluent.ksql.engine.QueryEventListener;
import io.confluent.ksql.metastore.MetaStore;
import io.confluent.ksql.services.ServiceContext;
import io.confluent.ksql.util.QueryMetadata;
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.streams.KafkaStreams;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class UtilizationMetricsListener implements Runnable, QueryEventListener {

private final List<KafkaStreams> kafkaStreams;
private final Logger logger = LoggerFactory.getLogger(UtilizationMetricsListener.class);
private final List<String> metrics;
private final Time time;
private long lastSampleTime;

public UtilizationMetricsListener() {
this.kafkaStreams = new ArrayList<>();
this.metrics = new LinkedList<>();
time = Time.SYSTEM;
lastSampleTime = time.milliseconds();
}

@Override
public void onCreate(
final ServiceContext serviceContext,
final MetaStore metaStore,
final QueryMetadata queryMetadata) {
kafkaStreams.add(queryMetadata.getKafkaStreams());
}

@Override
public void onDeregister(final QueryMetadata query) {
final KafkaStreams streams = query.getKafkaStreams();
kafkaStreams.remove(streams);
}

@Override
public void run() {
logger.info("Reporting Observability Metrics");
final Long currentTime = time.milliseconds();
// here is where we would report metrics
lastSampleTime = currentTime;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import static java.util.Objects.requireNonNull;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import io.confluent.ksql.execution.context.QueryContext;
import io.confluent.ksql.execution.ddl.commands.KsqlTopic;
import io.confluent.ksql.execution.plan.ExecutionStep;
Expand All @@ -37,6 +38,9 @@
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;

import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler;

Expand All @@ -56,6 +60,7 @@ public class PersistentQueryMetadataImpl
private final ProcessingLogger processingLogger;

private Optional<MaterializationProvider> materializationProvider;
private final ScheduledExecutorService executorService;

// CHECKSTYLE_RULES.OFF: ParameterNumberCheck
public PersistentQueryMetadataImpl(
Expand Down Expand Up @@ -112,6 +117,11 @@ public PersistentQueryMetadataImpl(
this.processingLogger = requireNonNull(processingLogger, "processingLogger");
this.scalablePushRegistry = requireNonNull(scalablePushRegistry, "scalablePushRegistry");
this.persistentQueryType = requireNonNull(persistentQueryType, "persistentQueryType");
this.executorService =
Executors.newScheduledThreadPool(
1,
new ThreadFactoryBuilder().setNameFormat("ksql-csu-metrics-reporter-%d").build()
);
}

// for creating sandbox instances
Expand All @@ -129,6 +139,7 @@ protected PersistentQueryMetadataImpl(
this.processingLogger = original.processingLogger;
this.scalablePushRegistry = original.scalablePushRegistry;
this.persistentQueryType = original.getPersistentQueryType();
this.executorService = original.executorService;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,13 +36,15 @@
import io.confluent.ksql.api.spi.Endpoints;
import io.confluent.ksql.config.SessionConfig;
import io.confluent.ksql.engine.KsqlEngine;
import io.confluent.ksql.engine.QueryEventListener;
import io.confluent.ksql.execution.streams.RoutingFilter;
import io.confluent.ksql.execution.streams.RoutingFilter.RoutingFilterFactory;
import io.confluent.ksql.execution.streams.RoutingFilters;
import io.confluent.ksql.function.InternalFunctionRegistry;
import io.confluent.ksql.function.MutableFunctionRegistry;
import io.confluent.ksql.function.UserFunctionLoader;
import io.confluent.ksql.internal.PullQueryExecutorMetrics;
import io.confluent.ksql.internal.UtilizationMetricsListener;
import io.confluent.ksql.logging.processing.ProcessingLogConfig;
import io.confluent.ksql.logging.processing.ProcessingLogContext;
import io.confluent.ksql.logging.processing.ProcessingLogServerUtils;
Expand Down Expand Up @@ -124,6 +126,7 @@
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
Expand All @@ -133,6 +136,7 @@
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
Expand Down Expand Up @@ -689,14 +693,24 @@ static KsqlRestApplication buildApplication(
final SpecificQueryIdGenerator specificQueryIdGenerator =
new SpecificQueryIdGenerator();

final ScheduledExecutorService executorService = Executors.newScheduledThreadPool(1,
new ThreadFactoryBuilder()
.setNameFormat("ksql-csu-metrics-reporter-%d")
.build()
);
final UtilizationMetricsListener csuMetricReporter = new UtilizationMetricsListener();
executorService.scheduleAtFixedRate(csuMetricReporter, 0, 300000, TimeUnit.MILLISECONDS);
final List<QueryEventListener> listeners = new ArrayList<>();
listeners.add(csuMetricReporter);

final KsqlEngine ksqlEngine = new KsqlEngine(
serviceContext,
processingLogContext,
functionRegistry,
ServiceInfo.create(ksqlConfig, metricsPrefix),
specificQueryIdGenerator,
new KsqlConfig(restConfig.getKsqlConfigProperties()),
Collections.emptyList()
listeners
);

UserFunctionLoader.newInstance(ksqlConfig, functionRegistry, ksqlInstallDir).load();
Expand Down

0 comments on commit 3362c00

Please sign in to comment.