Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: create MetricsContext for ksql metrics reporters #5528

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -16,15 +16,19 @@
package io.confluent.ksql.metrics;

import io.confluent.common.utils.Time;
import io.confluent.ksql.util.AppInfo;
import io.confluent.ksql.util.KsqlConfig;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.common.metrics.JmxReporter;
import org.apache.kafka.common.metrics.KafkaMetricsContext;
import org.apache.kafka.common.metrics.MetricConfig;
Expand All @@ -39,9 +43,24 @@
*/
@SuppressWarnings("ClassDataAbstractionCoupling")
public final class MetricCollectors {
private static final String KSQL_JMX_PREFIX = "io.confluent.ksql.metrics";
public static final String RESOURCE_LABEL_PREFIX =
CommonClientConfigs.METRICS_CONTEXT_PREFIX + "resource.";
private static final String KSQL_RESOURCE_TYPE = "KSQL";

public static final String RESOURCE_LABEL_TYPE =
RESOURCE_LABEL_PREFIX + "type";
public static final String RESOURCE_LABEL_VERSION =
RESOURCE_LABEL_PREFIX + "version";
public static final String RESOURCE_LABEL_COMMIT_ID =
RESOURCE_LABEL_PREFIX + "commit.id";
public static final String RESOURCE_LABEL_CLUSTER_ID =
RESOURCE_LABEL_PREFIX + "cluster.id";
public static final String RESOURCE_LABEL_KSQL_SERVICE_ID =
RESOURCE_LABEL_PREFIX + KsqlConfig.KSQL_SERVICE_ID_CONFIG;

private static Map<String, MetricCollector> collectorMap;
private static Metrics metrics;
private static final String JMX_PREFIX = "io.confluent.ksql.metrics";

static {
initialize();
Expand All @@ -64,7 +83,7 @@ public static void initialize() {
);
final List<MetricsReporter> reporters = new ArrayList<>();
reporters.add(new JmxReporter());
final MetricsContext metricsContext = new KafkaMetricsContext(JMX_PREFIX);
final MetricsContext metricsContext = new KafkaMetricsContext(KSQL_JMX_PREFIX);
// Replace all static contents other than Time to ensure they are cleaned for tests that are
// not aware of the need to initialize/cleanup this test, in case test processes are reused.
// Tests aware of the class clean everything up properly to get the state into a clean state,
Expand Down Expand Up @@ -96,15 +115,53 @@ static String addCollector(final String id, final MetricCollector collector) {
return finalId;
}

public static void addConfigurableReporter(final KsqlConfig ksqlConfig) {
public static void addConfigurableReporter(
final KsqlConfig ksqlConfig
) {
final String ksqlServiceId = ksqlConfig.getString(KsqlConfig.KSQL_SERVICE_ID_CONFIG);

final List<MetricsReporter> reporters = ksqlConfig.getConfiguredInstances(
KsqlConfig.METRIC_REPORTER_CLASSES_CONFIG,
MetricsReporter.class);
for (final MetricsReporter reporter: reporters) {
metrics.addReporter(reporter);
MetricsReporter.class,
Collections.singletonMap(
KsqlConfig.KSQL_SERVICE_ID_CONFIG,
ksqlServiceId));

if (reporters.size() > 0) {
final Map<String, Object> props = ksqlConfig.originals();
props.putAll(addConfluentMetricsContextConfigsForKsql(ksqlServiceId));
final KsqlConfig ksqlConfigWithMetricsContext = new KsqlConfig(props);
final MetricsContext metricsContext = new KafkaMetricsContext(
KSQL_JMX_PREFIX,
ksqlConfigWithMetricsContext.originalsWithPrefix(
CommonClientConfigs.METRICS_CONTEXT_PREFIX));
for (final MetricsReporter reporter : reporters) {
reporter.contextChange(metricsContext);
metrics.addReporter(reporter);
}
}
}

/**
 * Builds the base Confluent metrics-context labels shared by every KSQL
 * metrics reporter: the resource type ("KSQL") and the cluster id.
 *
 * @param ksqlServiceId the KSQL service id, exposed as the metrics cluster id
 * @return a mutable map containing the resource-type and cluster-id labels
 */
public static Map<String, Object> addConfluentMetricsContextConfigs(
    final String ksqlServiceId
) {
  final Map<String, Object> contextProps = new HashMap<>();
  contextProps.put(RESOURCE_LABEL_CLUSTER_ID, ksqlServiceId);
  contextProps.put(RESOURCE_LABEL_TYPE, KSQL_RESOURCE_TYPE);
  return contextProps;
}

/**
 * Builds the full Confluent metrics-context labels for the KSQL server's own
 * reporters: the service id, build version, and commit id, layered on top of
 * the shared base labels.
 *
 * @param ksqlServiceId the KSQL service id, exposed as a metrics label
 * @return a mutable map with the KSQL-specific and shared metrics-context labels
 */
public static Map<String, Object> addConfluentMetricsContextConfigsForKsql(
    final String ksqlServiceId
) {
  // Start from the shared base labels; key sets are disjoint, so order is immaterial.
  final Map<String, Object> contextProps =
      new HashMap<>(addConfluentMetricsContextConfigs(ksqlServiceId));
  contextProps.put(RESOURCE_LABEL_KSQL_SERVICE_ID, ksqlServiceId);
  contextProps.put(RESOURCE_LABEL_VERSION, AppInfo.getVersion());
  contextProps.put(RESOURCE_LABEL_COMMIT_ID, AppInfo.getCommitId());
  return contextProps;
}

/**
 * Unregisters the collector previously registered under {@code id}.
 * Silently does nothing if the id is not present in the collector map.
 */
static void remove(final String id) {
collectorMap.remove(id);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,14 @@
import com.google.common.base.Splitter;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import io.confluent.ksql.config.ConfigItem;
import io.confluent.ksql.config.KsqlConfigResolver;
import io.confluent.ksql.configdef.ConfigValidators;
import io.confluent.ksql.errors.LogMetricAndContinueExceptionHandler;
import io.confluent.ksql.errors.ProductionExceptionHandlerUtil;
import io.confluent.ksql.logging.processing.ProcessingLogConfig;
import io.confluent.ksql.metrics.MetricCollectors;
import io.confluent.ksql.model.SemanticVersion;
import io.confluent.ksql.query.QueryError;
import io.confluent.ksql.testing.EffectivelyImmutable;
Expand All @@ -48,6 +50,7 @@
import org.apache.kafka.common.config.ConfigDef.ValidString;
import org.apache.kafka.common.config.ConfigDef.Validator;
import org.apache.kafka.common.config.SslConfigs;
import org.apache.kafka.common.utils.AppInfoParser;
import org.apache.kafka.streams.StreamsConfig;

@EffectivelyImmutable
Expand All @@ -65,6 +68,13 @@ public class KsqlConfig extends AbstractConfig {

public static final String METRIC_REPORTER_CLASSES_DOC =
CommonClientConfigs.METRIC_REPORTER_CLASSES_DOC;

private static final String TELEMETRY_PREFIX = "confluent.telemetry";
private static final Set<String> REPORTER_CONFIGS_PREFIXES =
ImmutableSet.of(
TELEMETRY_PREFIX,
CommonClientConfigs.METRICS_CONTEXT_PREFIX
);

public static final String KSQL_INTERNAL_TOPIC_REPLICAS_PROPERTY = "ksql.internal.topic.replicas";

Expand Down Expand Up @@ -763,20 +773,59 @@ private KsqlConfig(final ConfigGeneration generation,
this.ksqlStreamConfigProps = ksqlStreamConfigProps;
}

/**
 * Returns the Kafka Streams config props enriched with metrics-context labels:
 * the streams application id plus the Kafka reporter context configs for this
 * KSQL service.
 *
 * @param applicationId the streams application id, exposed as a metrics label
 * @return an unmodifiable view of the enriched streams properties
 */
public Map<String, Object> getKsqlStreamConfigProps(final String applicationId) {
  final Map<String, Object> streamsProps = new HashMap<>(getKsqlStreamConfigProps());
  final String applicationIdLabel =
      MetricCollectors.RESOURCE_LABEL_PREFIX + StreamsConfig.APPLICATION_ID_CONFIG;
  streamsProps.put(applicationIdLabel, applicationId);
  streamsProps.putAll(
      addConfluentMetricsContextConfigsKafka(
          Collections.emptyMap(),
          getString(KSQL_SERVICE_ID_CONFIG)));
  return Collections.unmodifiableMap(streamsProps);
}

public Map<String, Object> getKsqlStreamConfigProps() {
final Map<String, Object> props = new HashMap<>();
final Map<String, Object> map = new HashMap<>();
for (final ConfigValue config : ksqlStreamConfigProps.values()) {
props.put(config.key, config.value);
map.put(config.key, config.value);
}
return Collections.unmodifiableMap(props);
return Collections.unmodifiableMap(map);
}

public Map<String, Object> getKsqlAdminClientConfigProps() {
return getConfigsFor(AdminClientConfig.configNames());
final Map<String, Object> map = new HashMap<>();
Copy link
Member

@xiaodongdu xiaodongdu Jun 3, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If you want the ksql cluster id and application id to propagate to AdminClient, you should add them as part of the admin client properties:
metrics.context.cluster.id, metrics.context.ksql.application.id

Because in KafkaAdminClient, we treat properties as metrics reporter properties if it starts with metrics.context: https://github.com/confluentinc/ce-kafka/blob/0f61da6b355f2f34e1b6d2f30072bb47f600bdae/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java#L495

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I updated it so that metrics.context.resource.cluster.id=<ksql-service-id> is being passed in for adminClient, producer, and streams

map.putAll(getConfigsFor(AdminClientConfig.configNames()));
map.putAll(
addConfluentMetricsContextConfigsKafka(Collections.emptyMap(),
getString(KSQL_SERVICE_ID_CONFIG)));
return Collections.unmodifiableMap(map);
}

public Map<String, Object> getProducerClientConfigProps() {
return getConfigsFor(ProducerConfig.configNames());
final Map<String, Object> map = new HashMap<>();
map.putAll(getConfigsFor(ProducerConfig.configNames()));
map.putAll(
addConfluentMetricsContextConfigsKafka(Collections.emptyMap(),
getString(KSQL_SERVICE_ID_CONFIG)));
return Collections.unmodifiableMap(map);
}

/**
 * Adds the Confluent metrics-context labels needed by Kafka client metrics
 * reporters (admin client, producer, streams) to the given properties:
 * client library version and commit id, the shared KSQL labels, and any
 * telemetry / metrics-context configs present in the ksql stream configs.
 *
 * @param props base client properties to enrich; the input map is not mutated
 * @param ksqlServiceId the KSQL service id, exposed as the metrics cluster id
 * @return a new mutable map containing {@code props} plus the context labels
 */
public Map<String, Object> addConfluentMetricsContextConfigsKafka(
    final Map<String, Object> props,
    final String ksqlServiceId
) {
  final Map<String, Object> updatedProps = new HashMap<>(props);
  // AppInfo carries the Kafka client library's version and commit id.
  final AppInfoParser.AppInfo appInfo = new AppInfoParser.AppInfo(System.currentTimeMillis());
  updatedProps.put(MetricCollectors.RESOURCE_LABEL_VERSION, appInfo.getVersion());
  updatedProps.put(MetricCollectors.RESOURCE_LABEL_COMMIT_ID, appInfo.getCommitId());

  updatedProps.putAll(
      MetricCollectors.addConfluentMetricsContextConfigs(ksqlServiceId));
  updatedProps.putAll(getConfigsForPrefix(REPORTER_CONFIGS_PREFIXES));
  return updatedProps;
}

public Map<String, Object> getProcessingLogConfigProps() {
Expand All @@ -791,6 +840,14 @@ private Map<String, Object> getConfigsFor(final Set<String> configs) {
return Collections.unmodifiableMap(props);
}

/**
 * Returns the subset of the ksql stream configs whose keys start with any of
 * the given prefixes.
 *
 * @param configs the set of key prefixes to match against
 * @return an unmodifiable map of the matching config entries
 */
private Map<String, Object> getConfigsForPrefix(final Set<String> configs) {
  final Map<String, Object> props = new HashMap<>();
  for (final ConfigValue configValue : ksqlStreamConfigProps.values()) {
    for (final String prefix : configs) {
      if (configValue.key.startsWith(prefix)) {
        props.put(configValue.key, configValue.value);
        break;
      }
    }
  }
  return Collections.unmodifiableMap(props);
}

public Map<String, Object> getKsqlFunctionsConfigProps(final String functionName) {
final Map<String, Object> udfProps = originalsWithPrefix(
KSQL_FUNCTIONS_PROPERTY_PREFIX + functionName.toLowerCase(), false);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

Expand Down Expand Up @@ -73,8 +74,9 @@ public void shouldAggregateStats() {
public void shouldAddConfigurableReporters() {
final MetricsReporter mockReporter = mock(MetricsReporter.class);
assertThat(MetricCollectors.getMetrics().reporters().size(), equalTo(1));
when(ksqlConfig.getConfiguredInstances(any(), any()))
when(ksqlConfig.getConfiguredInstances(anyString(), any(), any()))
.thenReturn(Collections.singletonList(mockReporter));
when(ksqlConfig.getString(KsqlConfig.KSQL_SERVICE_ID_CONFIG)).thenReturn("ksql-id");

MetricCollectors.addConfigurableReporter(ksqlConfig);
final List<MetricsReporter> reporters = MetricCollectors.getMetrics().reporters();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -303,7 +303,7 @@ private Map<String, Object> buildStreamsProperties(
final QueryId queryId
) {
final Map<String, Object> newStreamsProperties
= new HashMap<>(ksqlConfig.getKsqlStreamConfigProps());
= new HashMap<>(ksqlConfig.getKsqlStreamConfigProps(applicationId));
newStreamsProperties.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId);
final ProcessingLogger logger
= processingLogContext.getLoggerFactory().getLogger(queryId.toString());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.is;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.ArgumentMatchers.same;
import static org.mockito.Mockito.mock;
Expand Down Expand Up @@ -190,7 +191,7 @@ public void setup() {
when(ksqlMaterializationFactory.create(any(), any(), any(), any())).thenReturn(materialization);
when(processingLogContext.getLoggerFactory()).thenReturn(processingLoggerFactory);
when(processingLoggerFactory.getLogger(any())).thenReturn(processingLogger);
when(ksqlConfig.getKsqlStreamConfigProps()).thenReturn(Collections.emptyMap());
when(ksqlConfig.getKsqlStreamConfigProps(anyString())).thenReturn(Collections.emptyMap());
when(ksqlConfig.getString(KsqlConfig.KSQL_PERSISTENT_QUERY_NAME_PREFIX_CONFIG))
.thenReturn(PERSISTENT_PREFIX);
when(ksqlConfig.getString(KsqlConfig.KSQL_SERVICE_ID_CONFIG)).thenReturn(SERVICE_ID);
Expand Down Expand Up @@ -420,7 +421,7 @@ private void shouldUseProvidedOptimizationConfig(final Object value) {
// Given:
final Map<String, Object> properties =
Collections.singletonMap(StreamsConfig.TOPOLOGY_OPTIMIZATION, value);
when(ksqlConfig.getKsqlStreamConfigProps()).thenReturn(properties);
when(ksqlConfig.getKsqlStreamConfigProps(anyString())).thenReturn(properties);

// When:
queryBuilder.buildQuery(
Expand Down Expand Up @@ -473,7 +474,7 @@ private void assertPropertiesContainDummyInterceptors() {
@Test
public void shouldAddMetricsInterceptorsToExistingList() {
// Given:
when(ksqlConfig.getKsqlStreamConfigProps()).thenReturn(ImmutableMap.of(
when(ksqlConfig.getKsqlStreamConfigProps(anyString())).thenReturn(ImmutableMap.of(
StreamsConfig.consumerPrefix(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG),
ImmutableList.of(DummyConsumerInterceptor.class.getName()),
StreamsConfig.producerPrefix(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG),
Expand All @@ -497,7 +498,7 @@ public void shouldAddMetricsInterceptorsToExistingList() {
@Test
public void shouldAddMetricsInterceptorsToExistingString() {
// When:
when(ksqlConfig.getKsqlStreamConfigProps()).thenReturn(ImmutableMap.of(
when(ksqlConfig.getKsqlStreamConfigProps(anyString())).thenReturn(ImmutableMap.of(
StreamsConfig.consumerPrefix(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG),
DummyConsumerInterceptor.class.getName(),
StreamsConfig.producerPrefix(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG),
Expand All @@ -522,7 +523,7 @@ public void shouldAddMetricsInterceptorsToExistingString() {
@SuppressWarnings("unchecked")
public void shouldAddMetricsInterceptorsToExistingStringList() {
// When:
when(ksqlConfig.getKsqlStreamConfigProps()).thenReturn(ImmutableMap.of(
when(ksqlConfig.getKsqlStreamConfigProps(anyString())).thenReturn(ImmutableMap.of(
StreamsConfig.consumerPrefix(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG),
DummyConsumerInterceptor.class.getName()
+ ","
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -262,7 +262,7 @@ public static SourceName getCommandsStreamName() {
serviceContext,
this.restConfig,
this.ksqlConfigNoPort);
MetricCollectors.addConfigurableReporter(ksqlConfig);
MetricCollectors.addConfigurableReporter(ksqlConfigNoPort);
log.debug("ksqlDB API server instance created");
}

Expand Down Expand Up @@ -605,12 +605,17 @@ static KsqlRestApplication buildApplication(

final String commandTopicName = ReservedInternalTopics.commandTopic(ksqlConfig);

final String serviceId = ksqlConfig.getString(KsqlConfig.KSQL_SERVICE_ID_CONFIG);
final CommandStore commandStore = CommandStore.Factory.create(
commandTopicName,
ksqlConfig.getString(KsqlConfig.KSQL_SERVICE_ID_CONFIG),
Duration.ofMillis(restConfig.getLong(DISTRIBUTED_COMMAND_RESPONSE_TIMEOUT_MS_CONFIG)),
restConfig.getCommandConsumerProperties(),
restConfig.getCommandProducerProperties()
ksqlConfig.addConfluentMetricsContextConfigsKafka(
restConfig.getCommandConsumerProperties(),
serviceId),
ksqlConfig.addConfluentMetricsContextConfigsKafka(
restConfig.getCommandProducerProperties(),
serviceId)
);

final InteractiveStatementExecutor statementExecutor =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,14 @@

package io.confluent.ksql.rest.server;

import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.hasItem;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.is;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.anyMap;
import static org.mockito.ArgumentMatchers.anyShort;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
Expand Down Expand Up @@ -65,6 +65,8 @@
import java.util.Map;
import java.util.Optional;
import java.util.Queue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeoutException;
import java.util.function.Consumer;

import org.apache.kafka.common.metrics.MetricsReporter;
Expand Down Expand Up @@ -163,6 +165,9 @@ public void setUp() {
when(commandQueue.getCommandTopicName()).thenReturn(CMD_TOPIC_NAME);
when(serviceContext.getTopicClient()).thenReturn(topicClient);
when(topicClient.isTopicExists(CMD_TOPIC_NAME)).thenReturn(false);

when(ksqlConfig.getString(KsqlConfig.KSQL_SERVICE_ID_CONFIG)).thenReturn("ksql-id");

when(precondition1.checkPrecondition(any(), any())).thenReturn(Optional.empty());
when(precondition2.checkPrecondition(any(), any())).thenReturn(Optional.empty());

Expand Down Expand Up @@ -215,7 +220,7 @@ public void shouldCloseSecurityExtensionOnClose() {
public void shouldAddConfigurableMetricsReportersIfPresentInKsqlConfig() {
// When:
final MetricsReporter mockReporter = mock(MetricsReporter.class);
when(ksqlConfig.getConfiguredInstances(any(), any()))
when(ksqlConfig.getConfiguredInstances(anyString(), any(), any()))
.thenReturn(Collections.singletonList(mockReporter));
givenAppWithRestConfig(Collections.emptyMap());

Expand Down
Loading