diff --git a/ksqldb-common/src/main/java/io/confluent/ksql/metrics/MetricCollectors.java b/ksqldb-common/src/main/java/io/confluent/ksql/metrics/MetricCollectors.java index 8828aeebb379..5b1202b858d8 100644 --- a/ksqldb-common/src/main/java/io/confluent/ksql/metrics/MetricCollectors.java +++ b/ksqldb-common/src/main/java/io/confluent/ksql/metrics/MetricCollectors.java @@ -16,15 +16,19 @@ package io.confluent.ksql.metrics; import io.confluent.common.utils.Time; +import io.confluent.ksql.util.AppInfo; import io.confluent.ksql.util.KsqlConfig; import java.util.ArrayList; import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.TreeMap; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; +import org.apache.kafka.clients.CommonClientConfigs; import org.apache.kafka.common.metrics.JmxReporter; import org.apache.kafka.common.metrics.KafkaMetricsContext; import org.apache.kafka.common.metrics.MetricConfig; @@ -39,9 +43,24 @@ */ @SuppressWarnings("ClassDataAbstractionCoupling") public final class MetricCollectors { + private static final String KSQL_JMX_PREFIX = "io.confluent.ksql.metrics"; + public static final String RESOURCE_LABEL_PREFIX = + CommonClientConfigs.METRICS_CONTEXT_PREFIX + "resource."; + private static final String KSQL_RESOURCE_TYPE = "KSQL"; + + public static final String RESOURCE_LABEL_TYPE = + RESOURCE_LABEL_PREFIX + "type"; + public static final String RESOURCE_LABEL_VERSION = + RESOURCE_LABEL_PREFIX + "version"; + public static final String RESOURCE_LABEL_COMMIT_ID = + RESOURCE_LABEL_PREFIX + "commit.id"; + public static final String RESOURCE_LABEL_CLUSTER_ID = + RESOURCE_LABEL_PREFIX + "cluster.id"; + public static final String RESOURCE_LABEL_KSQL_SERVICE_ID = + RESOURCE_LABEL_PREFIX + KsqlConfig.KSQL_SERVICE_ID_CONFIG; + private static Map collectorMap; private static Metrics metrics; - private static final String JMX_PREFIX = "io.confluent.ksql.metrics"; static { initialize(); @@ -64,7 +83,7 @@ public static void initialize() { ); final List reporters = new ArrayList<>(); reporters.add(new JmxReporter()); - final MetricsContext metricsContext = new KafkaMetricsContext(JMX_PREFIX); + final MetricsContext metricsContext = new KafkaMetricsContext(KSQL_JMX_PREFIX); // Replace all static contents other than Time to ensure they are cleaned for tests that are // not aware of the need to initialize/cleanup this test, in case test processes are reused. // Tests aware of the class clean everything up properly to get the state into a clean state, @@ -96,15 +115,53 @@ static String addCollector(final String id, final MetricCollector collector) { return finalId; } - public static void addConfigurableReporter(final KsqlConfig ksqlConfig) { + public static void addConfigurableReporter( + final KsqlConfig ksqlConfig + ) { + final String ksqlServiceId = ksqlConfig.getString(KsqlConfig.KSQL_SERVICE_ID_CONFIG); + final List reporters = ksqlConfig.getConfiguredInstances( KsqlConfig.METRIC_REPORTER_CLASSES_CONFIG, - MetricsReporter.class); - for (final MetricsReporter reporter: reporters) { - metrics.addReporter(reporter); + MetricsReporter.class, + Collections.singletonMap( + KsqlConfig.KSQL_SERVICE_ID_CONFIG, + ksqlServiceId)); + + if (reporters.size() > 0) { + final Map props = ksqlConfig.originals(); + props.putAll(addConfluentMetricsContextConfigsForKsql(ksqlServiceId)); + final KsqlConfig ksqlConfigWithMetricsContext = new KsqlConfig(props); + final MetricsContext metricsContext = new KafkaMetricsContext( + KSQL_JMX_PREFIX, + ksqlConfigWithMetricsContext.originalsWithPrefix( + CommonClientConfigs.METRICS_CONTEXT_PREFIX)); + for (final MetricsReporter reporter : reporters) { + reporter.contextChange(metricsContext); + metrics.addReporter(reporter); + } } } + public static Map addConfluentMetricsContextConfigs( + final String ksqlServiceId + ) { + final Map updatedProps = new HashMap<>(); + updatedProps.put(RESOURCE_LABEL_TYPE, KSQL_RESOURCE_TYPE); + updatedProps.put(RESOURCE_LABEL_CLUSTER_ID, ksqlServiceId); + return updatedProps; + } + + public static Map addConfluentMetricsContextConfigsForKsql( + final String ksqlServiceId + ) { + final Map updatedProps = new HashMap<>(); + updatedProps.put(RESOURCE_LABEL_KSQL_SERVICE_ID, ksqlServiceId); + updatedProps.put(RESOURCE_LABEL_VERSION, AppInfo.getVersion()); + updatedProps.put(RESOURCE_LABEL_COMMIT_ID, AppInfo.getCommitId()); + updatedProps.putAll(addConfluentMetricsContextConfigs(ksqlServiceId)); + return updatedProps; + } + static void remove(final String id) { collectorMap.remove(id); } diff --git a/ksqldb-common/src/main/java/io/confluent/ksql/util/KsqlConfig.java b/ksqldb-common/src/main/java/io/confluent/ksql/util/KsqlConfig.java index 1d8816fe4f90..776d803c7bf5 100644 --- a/ksqldb-common/src/main/java/io/confluent/ksql/util/KsqlConfig.java +++ b/ksqldb-common/src/main/java/io/confluent/ksql/util/KsqlConfig.java @@ -20,12 +20,14 @@ import com.google.common.base.Splitter; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; +import com.google.common.collect.ImmutableSet; import io.confluent.ksql.config.ConfigItem; import io.confluent.ksql.config.KsqlConfigResolver; import io.confluent.ksql.configdef.ConfigValidators; import io.confluent.ksql.errors.LogMetricAndContinueExceptionHandler; import io.confluent.ksql.errors.ProductionExceptionHandlerUtil; import io.confluent.ksql.logging.processing.ProcessingLogConfig; +import io.confluent.ksql.metrics.MetricCollectors; import io.confluent.ksql.model.SemanticVersion; import io.confluent.ksql.query.QueryError; import io.confluent.ksql.testing.EffectivelyImmutable; @@ -48,6 +50,7 @@ import org.apache.kafka.common.config.ConfigDef.ValidString; import org.apache.kafka.common.config.ConfigDef.Validator; import org.apache.kafka.common.config.SslConfigs; +import org.apache.kafka.common.utils.AppInfoParser; import org.apache.kafka.streams.StreamsConfig; @EffectivelyImmutable @@ -65,6 +68,13 @@ public class KsqlConfig extends AbstractConfig { public static final String METRIC_REPORTER_CLASSES_DOC = CommonClientConfigs.METRIC_REPORTER_CLASSES_DOC; + + private static final String TELEMETRY_PREFIX = "confluent.telemetry"; + private static final Set REPORTER_CONFIGS_PREFIXES = + ImmutableSet.of( + TELEMETRY_PREFIX, + CommonClientConfigs.METRICS_CONTEXT_PREFIX + ); public static final String KSQL_INTERNAL_TOPIC_REPLICAS_PROPERTY = "ksql.internal.topic.replicas"; @@ -763,20 +773,59 @@ private KsqlConfig(final ConfigGeneration generation, this.ksqlStreamConfigProps = ksqlStreamConfigProps; } + public Map getKsqlStreamConfigProps(final String applicationId) { + final Map map = new HashMap<>(getKsqlStreamConfigProps()); + map.put( + MetricCollectors.RESOURCE_LABEL_PREFIX + + StreamsConfig.APPLICATION_ID_CONFIG, + applicationId + ); + map.putAll( + addConfluentMetricsContextConfigsKafka( + Collections.emptyMap(), + getString(KSQL_SERVICE_ID_CONFIG))); + return Collections.unmodifiableMap(map); + } + public Map getKsqlStreamConfigProps() { - final Map props = new HashMap<>(); + final Map map = new HashMap<>(); for (final ConfigValue config : ksqlStreamConfigProps.values()) { - props.put(config.key, config.value); + map.put(config.key, config.value); } - return Collections.unmodifiableMap(props); + return Collections.unmodifiableMap(map); } public Map getKsqlAdminClientConfigProps() { - return getConfigsFor(AdminClientConfig.configNames()); + final Map map = new HashMap<>(); + map.putAll(getConfigsFor(AdminClientConfig.configNames())); + map.putAll( + addConfluentMetricsContextConfigsKafka(Collections.emptyMap(), + getString(KSQL_SERVICE_ID_CONFIG))); + return Collections.unmodifiableMap(map); } public Map getProducerClientConfigProps() { - return getConfigsFor(ProducerConfig.configNames()); + final Map map = new HashMap<>(); + map.putAll(getConfigsFor(ProducerConfig.configNames())); + map.putAll( + addConfluentMetricsContextConfigsKafka(Collections.emptyMap(), + getString(KSQL_SERVICE_ID_CONFIG))); + return Collections.unmodifiableMap(map); + } + + public Map addConfluentMetricsContextConfigsKafka( + final Map props, + final String ksqlServiceId + ) { + final Map updatedProps = new HashMap<>(props); + final AppInfoParser.AppInfo appInfo = new AppInfoParser.AppInfo(System.currentTimeMillis()); + updatedProps.put(MetricCollectors.RESOURCE_LABEL_VERSION, appInfo.getVersion()); + updatedProps.put(MetricCollectors.RESOURCE_LABEL_COMMIT_ID, appInfo.getCommitId()); + + updatedProps.putAll( + MetricCollectors.addConfluentMetricsContextConfigs(ksqlServiceId)); + updatedProps.putAll(getConfigsForPrefix(REPORTER_CONFIGS_PREFIXES)); + return updatedProps; } public Map getProcessingLogConfigProps() { @@ -791,6 +840,14 @@ private Map getConfigsFor(final Set configs) { return Collections.unmodifiableMap(props); } + private Map getConfigsForPrefix(final Set configs) { + final Map props = new HashMap<>(); + ksqlStreamConfigProps.values().stream() + .filter(configValue -> configs.stream().anyMatch(configValue.key::startsWith)) + .forEach(configValue -> props.put(configValue.key, configValue.value)); + return Collections.unmodifiableMap(props); + } + public Map getKsqlFunctionsConfigProps(final String functionName) { final Map udfProps = originalsWithPrefix( KSQL_FUNCTIONS_PROPERTY_PREFIX + functionName.toLowerCase(), false); diff --git a/ksqldb-common/src/test/java/io/confluent/ksql/metrics/MetricCollectorsTest.java b/ksqldb-common/src/test/java/io/confluent/ksql/metrics/MetricCollectorsTest.java index 68edc17f7619..e514877d8dbb 100644 --- a/ksqldb-common/src/test/java/io/confluent/ksql/metrics/MetricCollectorsTest.java +++ b/ksqldb-common/src/test/java/io/confluent/ksql/metrics/MetricCollectorsTest.java @@ -21,6 +21,7 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -73,8 +74,9 @@ public void shouldAggregateStats() { public void shouldAddConfigurableReporters() { final MetricsReporter mockReporter = mock(MetricsReporter.class); assertThat(MetricCollectors.getMetrics().reporters().size(), equalTo(1)); - when(ksqlConfig.getConfiguredInstances(any(), any())) + when(ksqlConfig.getConfiguredInstances(anyString(), any(), any())) .thenReturn(Collections.singletonList(mockReporter)); + when(ksqlConfig.getString(KsqlConfig.KSQL_SERVICE_ID_CONFIG)).thenReturn("ksql-id"); MetricCollectors.addConfigurableReporter(ksqlConfig); final List reporters = MetricCollectors.getMetrics().reporters(); diff --git a/ksqldb-engine/src/main/java/io/confluent/ksql/query/QueryExecutor.java b/ksqldb-engine/src/main/java/io/confluent/ksql/query/QueryExecutor.java index c856bfaadef8..b260f333a91e 100644 --- a/ksqldb-engine/src/main/java/io/confluent/ksql/query/QueryExecutor.java +++ b/ksqldb-engine/src/main/java/io/confluent/ksql/query/QueryExecutor.java @@ -303,7 +303,7 @@ private Map buildStreamsProperties( final QueryId queryId ) { final Map newStreamsProperties - = new HashMap<>(ksqlConfig.getKsqlStreamConfigProps()); + = new HashMap<>(ksqlConfig.getKsqlStreamConfigProps(applicationId)); newStreamsProperties.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId); final ProcessingLogger logger = processingLogContext.getLoggerFactory().getLogger(queryId.toString()); diff --git a/ksqldb-engine/src/test/java/io/confluent/ksql/query/QueryExecutorTest.java b/ksqldb-engine/src/test/java/io/confluent/ksql/query/QueryExecutorTest.java index 166e5a8f6e9d..35e305a44ab6 100644 --- a/ksqldb-engine/src/test/java/io/confluent/ksql/query/QueryExecutorTest.java +++ b/ksqldb-engine/src/test/java/io/confluent/ksql/query/QueryExecutorTest.java @@ -5,6 +5,7 @@ import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.is; import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.ArgumentMatchers.same; import static org.mockito.Mockito.mock; @@ -190,7 +191,7 @@ public void setup() { when(ksqlMaterializationFactory.create(any(), any(), any(), any())).thenReturn(materialization); when(processingLogContext.getLoggerFactory()).thenReturn(processingLoggerFactory); when(processingLoggerFactory.getLogger(any())).thenReturn(processingLogger); - when(ksqlConfig.getKsqlStreamConfigProps()).thenReturn(Collections.emptyMap()); + when(ksqlConfig.getKsqlStreamConfigProps(anyString())).thenReturn(Collections.emptyMap()); when(ksqlConfig.getString(KsqlConfig.KSQL_PERSISTENT_QUERY_NAME_PREFIX_CONFIG)) .thenReturn(PERSISTENT_PREFIX); when(ksqlConfig.getString(KsqlConfig.KSQL_SERVICE_ID_CONFIG)).thenReturn(SERVICE_ID); @@ -420,7 +421,7 @@ private void shouldUseProvidedOptimizationConfig(final Object value) { // Given: final Map properties = Collections.singletonMap(StreamsConfig.TOPOLOGY_OPTIMIZATION, value); - when(ksqlConfig.getKsqlStreamConfigProps()).thenReturn(properties); + when(ksqlConfig.getKsqlStreamConfigProps(anyString())).thenReturn(properties); // When: queryBuilder.buildQuery( @@ -473,7 +474,7 @@ private void assertPropertiesContainDummyInterceptors() { @Test public void shouldAddMetricsInterceptorsToExistingList() { // Given: - when(ksqlConfig.getKsqlStreamConfigProps()).thenReturn(ImmutableMap.of( + when(ksqlConfig.getKsqlStreamConfigProps(anyString())).thenReturn(ImmutableMap.of( StreamsConfig.consumerPrefix(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG), ImmutableList.of(DummyConsumerInterceptor.class.getName()), StreamsConfig.producerPrefix(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG), @@ -497,7 +498,7 @@ public void shouldAddMetricsInterceptorsToExistingList() { @Test public void shouldAddMetricsInterceptorsToExistingString() { // When: - when(ksqlConfig.getKsqlStreamConfigProps()).thenReturn(ImmutableMap.of( + when(ksqlConfig.getKsqlStreamConfigProps(anyString())).thenReturn(ImmutableMap.of( StreamsConfig.consumerPrefix(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG), DummyConsumerInterceptor.class.getName(), StreamsConfig.producerPrefix(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG), @@ -522,7 +523,7 @@ public void shouldAddMetricsInterceptorsToExistingString() { @SuppressWarnings("unchecked") public void shouldAddMetricsInterceptorsToExistingStringList() { // When: - when(ksqlConfig.getKsqlStreamConfigProps()).thenReturn(ImmutableMap.of( + when(ksqlConfig.getKsqlStreamConfigProps(anyString())).thenReturn(ImmutableMap.of( StreamsConfig.consumerPrefix(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG), DummyConsumerInterceptor.class.getName() + "," diff --git a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/KsqlRestApplication.java b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/KsqlRestApplication.java index 4c05cb1a7c90..3f92654072c3 100644 --- a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/KsqlRestApplication.java +++ b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/KsqlRestApplication.java @@ -262,7 +262,7 @@ public static SourceName getCommandsStreamName() { serviceContext, this.restConfig, this.ksqlConfigNoPort); - MetricCollectors.addConfigurableReporter(ksqlConfig); + MetricCollectors.addConfigurableReporter(ksqlConfigNoPort); log.debug("ksqlDB API server instance created"); } @@ -605,12 +605,17 @@ static KsqlRestApplication buildApplication( final String commandTopicName = ReservedInternalTopics.commandTopic(ksqlConfig); + final String serviceId = ksqlConfig.getString(KsqlConfig.KSQL_SERVICE_ID_CONFIG); final CommandStore commandStore = CommandStore.Factory.create( commandTopicName, ksqlConfig.getString(KsqlConfig.KSQL_SERVICE_ID_CONFIG), Duration.ofMillis(restConfig.getLong(DISTRIBUTED_COMMAND_RESPONSE_TIMEOUT_MS_CONFIG)), - restConfig.getCommandConsumerProperties(), - restConfig.getCommandProducerProperties() + ksqlConfig.addConfluentMetricsContextConfigsKafka( + restConfig.getCommandConsumerProperties(), + serviceId), + ksqlConfig.addConfluentMetricsContextConfigsKafka( + restConfig.getCommandProducerProperties(), + serviceId) ); final InteractiveStatementExecutor statementExecutor = diff --git a/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/KsqlRestApplicationTest.java b/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/KsqlRestApplicationTest.java index 3003236f8af4..a5abf5678f20 100644 --- a/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/KsqlRestApplicationTest.java +++ b/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/KsqlRestApplicationTest.java @@ -15,7 +15,6 @@ package io.confluent.ksql.rest.server; -import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.CoreMatchers.hasItem; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.is; @@ -23,6 +22,7 @@ import static org.mockito.ArgumentMatchers.anyInt; import static org.mockito.ArgumentMatchers.anyMap; import static org.mockito.ArgumentMatchers.anyShort; +import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; @@ -65,6 +65,8 @@ import java.util.Map; import java.util.Optional; import java.util.Queue; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeoutException; import java.util.function.Consumer; import org.apache.kafka.common.metrics.MetricsReporter; @@ -163,6 +165,9 @@ public void setUp() { when(commandQueue.getCommandTopicName()).thenReturn(CMD_TOPIC_NAME); when(serviceContext.getTopicClient()).thenReturn(topicClient); when(topicClient.isTopicExists(CMD_TOPIC_NAME)).thenReturn(false); + + when(ksqlConfig.getString(KsqlConfig.KSQL_SERVICE_ID_CONFIG)).thenReturn("ksql-id"); + when(precondition1.checkPrecondition(any(), any())).thenReturn(Optional.empty()); when(precondition2.checkPrecondition(any(), any())).thenReturn(Optional.empty()); @@ -215,7 +220,7 @@ public void shouldCloseSecurityExtensionOnClose() { public void shouldAddConfigurableMetricsReportersIfPresentInKsqlConfig() { // When: final MetricsReporter mockReporter = mock(MetricsReporter.class); - when(ksqlConfig.getConfiguredInstances(any(), any())) + when(ksqlConfig.getConfiguredInstances(anyString(), any(), any())) .thenReturn(Collections.singletonList(mockReporter)); givenAppWithRestConfig(Collections.emptyMap()); diff --git a/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/StandaloneExecutorTest.java b/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/StandaloneExecutorTest.java index cb224402ec7f..5423b96b7c2c 100644 --- a/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/StandaloneExecutorTest.java +++ b/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/StandaloneExecutorTest.java @@ -25,7 +25,9 @@ import static org.junit.Assert.fail; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyInt; +import static org.mockito.ArgumentMatchers.anyLong; import static org.mockito.ArgumentMatchers.anyShort; +import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.inOrder; import static org.mockito.Mockito.mock; @@ -316,8 +318,9 @@ public void shouldAddConfigurableMetricsReportersIfPresentInKsqlConfig() { // When: final MetricsReporter mockReporter = mock(MetricsReporter.class); final KsqlConfig mockKsqlConfig = mock(KsqlConfig.class); - when(mockKsqlConfig.getConfiguredInstances(any(), any())) + when(mockKsqlConfig.getConfiguredInstances(anyString(), any(), any())) .thenReturn(Collections.singletonList(mockReporter)); + when(mockKsqlConfig.getString(KsqlConfig.KSQL_SERVICE_ID_CONFIG)).thenReturn("ksql-id"); standaloneExecutor = new StandaloneExecutor( serviceContext, processingLogConfig,