Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.

Already on GitHub? Sign in to your account

feat: create MetricsContext for ksql metrics reporters #5528

Merged
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -16,15 +16,19 @@
package io.confluent.ksql.metrics;

import io.confluent.common.utils.Time;
import io.confluent.ksql.util.AppInfo;
import io.confluent.ksql.util.KsqlConfig;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.common.metrics.JmxReporter;
import org.apache.kafka.common.metrics.KafkaMetricsContext;
import org.apache.kafka.common.metrics.MetricConfig;
Expand All @@ -39,9 +43,26 @@
*/
@SuppressWarnings("ClassDataAbstractionCoupling")
public final class MetricCollectors {
private static final String KSQL_JMX_PREFIX = "io.confluent.ksql.metrics";
public static final String METRICS_CONTEXT_RESOURCE_LABEL_PREFIX =
CommonClientConfigs.METRICS_CONTEXT_PREFIX + "resource.";
private static final String KSQL_RESOURCE_TYPE = "KSQL";

public static final String RESOURCE_LABEL_TYPE =
METRICS_CONTEXT_RESOURCE_LABEL_PREFIX + "type";
public static final String RESOURCE_LABEL_VERSION =
METRICS_CONTEXT_RESOURCE_LABEL_PREFIX + "version";
public static final String RESOURCE_LABEL_COMMIT_ID =
METRICS_CONTEXT_RESOURCE_LABEL_PREFIX + "commit.id";
public static final String RESOURCE_LABEL_CLUSTER_ID =
METRICS_CONTEXT_RESOURCE_LABEL_PREFIX + "cluster.id";
public static final String RESOURCE_LABEL_KSQL_SERVICE_ID =
METRICS_CONTEXT_RESOURCE_LABEL_PREFIX + KsqlConfig.KSQL_SERVICE_ID_CONFIG;
public static final String RESOURCE_LABEL_KAFKA_CLUSTER_ID =
METRICS_CONTEXT_RESOURCE_LABEL_PREFIX + "kafka.cluster.id";

private static Map<String, MetricCollector> collectorMap;
private static Metrics metrics;
private static final String JMX_PREFIX = "io.confluent.ksql.metrics";

static {
initialize();
Expand All @@ -64,7 +85,7 @@ public static void initialize() {
);
final List<MetricsReporter> reporters = new ArrayList<>();
reporters.add(new JmxReporter());
final MetricsContext metricsContext = new KafkaMetricsContext(JMX_PREFIX);
final MetricsContext metricsContext = new KafkaMetricsContext(KSQL_JMX_PREFIX);
// Replace all static contents other than Time to ensure they are cleaned for tests that are
// not aware of the need to initialize/cleanup this test, in case test processes are reused.
// Tests aware of the class clean everything up properly to get the state into a clean state,
Expand Down Expand Up @@ -96,15 +117,51 @@ static String addCollector(final String id, final MetricCollector collector) {
return finalId;
}

public static void addConfigurableReporter(final KsqlConfig ksqlConfig) {
public static void addConfigurableReporter(
final KsqlConfig ksqlConfig,
final String kafkaClusterId
) {
final String ksqlServiceId = ksqlConfig.getString(KsqlConfig.KSQL_SERVICE_ID_CONFIG);
final List<MetricsReporter> reporters = ksqlConfig.getConfiguredInstances(
KsqlConfig.METRIC_REPORTER_CLASSES_CONFIG,
MetricsReporter.class);
for (final MetricsReporter reporter: reporters) {
metrics.addReporter(reporter);
MetricsReporter.class,
Collections.singletonMap(
KsqlConfig.KSQL_SERVICE_ID_CONFIG,
ksqlServiceId));

if (reporters.size() > 0) {
final Map<String, Object> metadata =
new HashMap<>(ksqlConfig.originalsWithPrefix(CommonClientConfigs.METRICS_CONTEXT_PREFIX));
final MetricsContext metricsContext =
new KafkaMetricsContext(
KSQL_JMX_PREFIX,
addConfluentMetricsContextConfigs(
metadata,
ksqlServiceId,
kafkaClusterId));

for (final MetricsReporter reporter : reporters) {
reporter.contextChange(metricsContext);
metrics.addReporter(reporter);
}
}
}

public static Map<String, Object> addConfluentMetricsContextConfigs(
final Map<String,Object> props,
final String ksqlServiceId,
final String kafkaClusterId
) {
final Map<String, Object> updatedProps = new HashMap<>(props);
updatedProps.put(RESOURCE_LABEL_VERSION, AppInfo.getVersion());
updatedProps.put(RESOURCE_LABEL_COMMIT_ID, AppInfo.getCommitId());
updatedProps.put(RESOURCE_LABEL_TYPE, KSQL_RESOURCE_TYPE);
updatedProps.put(RESOURCE_LABEL_KSQL_SERVICE_ID, ksqlServiceId);
updatedProps.put(RESOURCE_LABEL_CLUSTER_ID, ksqlServiceId);
updatedProps.put(RESOURCE_LABEL_KAFKA_CLUSTER_ID, kafkaClusterId);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You don't need this. According to KIP-606, "kafka.cluster.id" is for kafka broker. For client of TelemetryReporter, you only need resource.cluster.id

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

removed

return updatedProps;
}

// Deregisters the collector previously registered under the given id; no-op if absent.
static void remove(final String id) {
collectorMap.remove(id);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import com.google.common.base.Splitter;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import io.confluent.ksql.config.ConfigItem;
import io.confluent.ksql.config.KsqlConfigResolver;
import io.confluent.ksql.configdef.ConfigValidators;
Expand Down Expand Up @@ -66,6 +67,12 @@ public class KsqlConfig extends AbstractConfig {
public static final String METRIC_REPORTER_CLASSES_DOC =
CommonClientConfigs.METRIC_REPORTER_CLASSES_DOC;

private static final String METRIC_REPORTERS_PREFIX = "metric.reporters";
public static final String METRIC_CONTEXT_PREFIX = CommonClientConfigs.METRICS_CONTEXT_PREFIX;
private static final String TELEMETRY_PREFIX = "confluent.telemetry";
private static final Set<String> REPORTER_CONFIGS_PREFIXES =
ImmutableSet.of(METRIC_REPORTERS_PREFIX, TELEMETRY_PREFIX, METRIC_CONTEXT_PREFIX);

public static final String KSQL_INTERNAL_TOPIC_REPLICAS_PROPERTY = "ksql.internal.topic.replicas";

public static final String KSQL_INTERNAL_TOPIC_MIN_INSYNC_REPLICAS_PROPERTY =
Expand Down Expand Up @@ -768,15 +775,25 @@ public Map<String, Object> getKsqlStreamConfigProps() {
for (final ConfigValue config : ksqlStreamConfigProps.values()) {
props.put(config.key, config.value);
}

props.putAll(getConfigsForPrefix(REPORTER_CONFIGS_PREFIXES));
return Collections.unmodifiableMap(props);
}

public Map<String, Object> getKsqlAdminClientConfigProps() {
return getConfigsFor(AdminClientConfig.configNames());
final Map<String, Object> map = new HashMap<>();
Copy link
Member

@xiaodongdu xiaodongdu Jun 3, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If you want ksql cluster id and application propagate to AdminClient, you should add as part of admin client properties:
metrics.context.cluster.id, metrics.context.ksql.application.id

Because in KafkaAdminClient, we treat properties as metrics reporter properties if it starts with metrics.context: https://github.com/confluentinc/ce-kafka/blob/0f61da6b355f2f34e1b6d2f30072bb47f600bdae/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java#L495

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I updated it so that metrics.context.resource.cluster.id=<ksql-service-id> is being passed in for adminClient, producer, and streams

map.putAll(getConfigsFor(AdminClientConfig.configNames()));

map.putAll(getConfigsForPrefix(REPORTER_CONFIGS_PREFIXES));
return Collections.unmodifiableMap(map);
}

public Map<String, Object> getProducerClientConfigProps() {
return getConfigsFor(ProducerConfig.configNames());
final Map<String, Object> map = new HashMap<>();
map.putAll(getConfigsFor(ProducerConfig.configNames()));

map.putAll(getConfigsForPrefix(REPORTER_CONFIGS_PREFIXES));
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same comments as AdminClient

return Collections.unmodifiableMap(map);
}

public Map<String, Object> getProcessingLogConfigProps() {
Expand All @@ -791,6 +808,14 @@ private Map<String, Object> getConfigsFor(final Set<String> configs) {
return Collections.unmodifiableMap(props);
}

/**
 * Returns the subset of the resolved stream configs whose keys start with any of the
 * given prefixes.
 *
 * @param configs the set of key prefixes to match against
 * @return an unmodifiable map of the matching key/value pairs
 */
private Map<String, Object> getConfigsForPrefix(final Set<String> configs) {
  final Map<String, Object> matched = new HashMap<>();
  for (final ConfigValue configValue : ksqlStreamConfigProps.values()) {
    final boolean hasPrefix = configs.stream().anyMatch(configValue.key::startsWith);
    if (hasPrefix) {
      matched.put(configValue.key, configValue.value);
    }
  }
  return Collections.unmodifiableMap(matched);
}

public Map<String, Object> getKsqlFunctionsConfigProps(final String functionName) {
final Map<String, Object> udfProps = originalsWithPrefix(
KSQL_FUNCTIONS_PROPERTY_PREFIX + functionName.toLowerCase(), false);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

Expand Down Expand Up @@ -73,10 +74,11 @@ public void shouldAggregateStats() {
public void shouldAddConfigurableReporters() {
final MetricsReporter mockReporter = mock(MetricsReporter.class);
assertThat(MetricCollectors.getMetrics().reporters().size(), equalTo(1));
when(ksqlConfig.getConfiguredInstances(any(), any()))
when(ksqlConfig.getConfiguredInstances(anyString(), any(), any()))
.thenReturn(Collections.singletonList(mockReporter));
when(ksqlConfig.getString(KsqlConfig.KSQL_SERVICE_ID_CONFIG)).thenReturn("ksql-id");

MetricCollectors.addConfigurableReporter(ksqlConfig);
MetricCollectors.addConfigurableReporter(ksqlConfig, "kafka cluster");
final List<MetricsReporter> reporters = MetricCollectors.getMetrics().reporters();
assertThat(reporters, hasItem(mockReporter));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@
import io.confluent.ksql.security.KsqlDefaultSecurityExtension;
import io.confluent.ksql.security.KsqlSecurityContext;
import io.confluent.ksql.security.KsqlSecurityExtension;
import io.confluent.ksql.services.KafkaClusterUtil;
import io.confluent.ksql.services.LazyServiceContext;
import io.confluent.ksql.services.ServiceContext;
import io.confluent.ksql.services.SimpleKsqlClient;
Expand Down Expand Up @@ -262,7 +263,6 @@ public static SourceName getCommandsStreamName() {
serviceContext,
this.restConfig,
this.ksqlConfigNoPort);
MetricCollectors.addConfigurableReporter(ksqlConfig);
log.debug("ksqlDB API server instance created");
}

Expand Down Expand Up @@ -437,6 +437,8 @@ private void initialize(final KsqlConfig configWithApplicationServer) {
lagReportingAgent.get().startAgent();
}

MetricCollectors.addConfigurableReporter(
ksqlConfigNoPort, KafkaClusterUtil.getKafkaClusterId(serviceContext));
serverState.setReady();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import io.confluent.ksql.parser.tree.Statement;
import io.confluent.ksql.parser.tree.UnsetProperty;
import io.confluent.ksql.properties.PropertyOverrider;
import io.confluent.ksql.services.KafkaClusterUtil;
import io.confluent.ksql.services.ServiceContext;
import io.confluent.ksql.statement.ConfiguredStatement;
import io.confluent.ksql.statement.Injector;
Expand Down Expand Up @@ -104,7 +105,8 @@ public class StandaloneExecutor implements Executable {
this.failOnNoQueries = failOnNoQueries;
this.versionChecker = requireNonNull(versionChecker, "versionChecker");
this.injectorFactory = requireNonNull(injectorFactory, "injectorFactory");
MetricCollectors.addConfigurableReporter(ksqlConfig);
MetricCollectors.addConfigurableReporter(
ksqlConfig, KafkaClusterUtil.getKafkaClusterId(serviceContext));
}

public void startAsync() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,15 @@

package io.confluent.ksql.rest.server;

import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.hasItem;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.is;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.ArgumentMatchers.anyMap;
import static org.mockito.ArgumentMatchers.anyShort;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
Expand Down Expand Up @@ -65,8 +66,13 @@
import java.util.Map;
import java.util.Optional;
import java.util.Queue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeoutException;
import java.util.function.Consumer;

import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.DescribeClusterResult;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.metrics.MetricsReporter;
import org.apache.kafka.streams.StreamsConfig;
import org.junit.After;
Expand All @@ -89,6 +95,8 @@ public class KsqlRestApplicationTest {
@Mock
private ServiceContext serviceContext;
@Mock
private Admin adminClient;
@Mock
private KsqlEngine ksqlEngine;
@Mock
private KsqlConfig ksqlConfig;
Expand Down Expand Up @@ -146,7 +154,7 @@ public class KsqlRestApplicationTest {

@SuppressWarnings("unchecked")
@Before
public void setUp() {
public void setUp() throws ExecutionException, InterruptedException, TimeoutException {
when(processingLogConfig.getBoolean(ProcessingLogConfig.STREAM_AUTO_CREATE))
.thenReturn(true);
when(processingLogConfig.getString(ProcessingLogConfig.STREAM_NAME))
Expand All @@ -163,6 +171,16 @@ public void setUp() {
when(commandQueue.getCommandTopicName()).thenReturn(CMD_TOPIC_NAME);
when(serviceContext.getTopicClient()).thenReturn(topicClient);
when(topicClient.isTopicExists(CMD_TOPIC_NAME)).thenReturn(false);

when(serviceContext.getAdminClient()).thenReturn(adminClient);
final DescribeClusterResult result = mock(DescribeClusterResult.class);
final KafkaFuture<String> future = mock(KafkaFuture.class);
when(result.clusterId()).thenReturn(future);
when(future.get(anyLong(), any())).thenReturn("kafka-cluster-id");
when(adminClient.describeCluster()).thenReturn(result);

when(ksqlConfig.getString(KsqlConfig.KSQL_SERVICE_ID_CONFIG)).thenReturn("ksql-id");

when(precondition1.checkPrecondition(any(), any())).thenReturn(Optional.empty());
when(precondition2.checkPrecondition(any(), any())).thenReturn(Optional.empty());

Expand Down Expand Up @@ -215,9 +233,9 @@ public void shouldCloseSecurityExtensionOnClose() {
public void shouldAddConfigurableMetricsReportersIfPresentInKsqlConfig() {
// When:
final MetricsReporter mockReporter = mock(MetricsReporter.class);
when(ksqlConfig.getConfiguredInstances(any(), any()))
when(ksqlConfig.getConfiguredInstances(anyString(), any(), any()))
.thenReturn(Collections.singletonList(mockReporter));
givenAppWithRestConfig(Collections.emptyMap());
app.startKsql(ksqlConfig);

// Then:
final List<MetricsReporter> reporters = MetricCollectors.getMetrics().reporters();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,9 @@
import static org.junit.Assert.fail;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.ArgumentMatchers.anyShort;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.inOrder;
import static org.mockito.Mockito.mock;
Expand Down Expand Up @@ -95,7 +97,11 @@
import java.util.function.BiFunction;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.DescribeClusterResult;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.metrics.MetricsReporter;
import org.apache.kafka.test.TestUtils;
import org.junit.After;
Expand Down Expand Up @@ -234,6 +240,8 @@ public class StandaloneExecutorTest {
@Mock
private KafkaTopicClient kafkaTopicClient;
@Mock
private Admin adminClient;
@Mock
private BiFunction<KsqlExecutionContext, ServiceContext, Injector> injectorFactory;
@Mock
private Injector schemaInjector;
Expand All @@ -254,6 +262,12 @@ public void before() throws Exception {
givenQueryFileContains("something");

when(serviceContext.getTopicClient()).thenReturn(kafkaTopicClient);
when(serviceContext.getAdminClient()).thenReturn(adminClient);
final DescribeClusterResult result = mock(DescribeClusterResult.class);
final KafkaFuture<String> future = mock(KafkaFuture.class);
when(result.clusterId()).thenReturn(future);
when(future.get(anyLong(), any())).thenReturn("kafka-cluster-id");
when(adminClient.describeCluster()).thenReturn(result);

when(ksqlEngine.parse(any())).thenReturn(ImmutableList.of(PARSED_STMT_0));

Expand Down Expand Up @@ -316,8 +330,9 @@ public void shouldAddConfigurableMetricsReportersIfPresentInKsqlConfig() {
// When:
final MetricsReporter mockReporter = mock(MetricsReporter.class);
final KsqlConfig mockKsqlConfig = mock(KsqlConfig.class);
when(mockKsqlConfig.getConfiguredInstances(any(), any()))
when(mockKsqlConfig.getConfiguredInstances(anyString(), any(), any()))
.thenReturn(Collections.singletonList(mockReporter));
when(mockKsqlConfig.getString(KsqlConfig.KSQL_SERVICE_ID_CONFIG)).thenReturn("ksql-id");
standaloneExecutor = new StandaloneExecutor(
serviceContext,
processingLogConfig,
Expand Down