diff --git a/ksqldb-common/src/main/java/io/confluent/ksql/metrics/MetricCollectors.java b/ksqldb-common/src/main/java/io/confluent/ksql/metrics/MetricCollectors.java index 1383c37d0900..89669c3f1960 100644 --- a/ksqldb-common/src/main/java/io/confluent/ksql/metrics/MetricCollectors.java +++ b/ksqldb-common/src/main/java/io/confluent/ksql/metrics/MetricCollectors.java @@ -44,20 +44,20 @@ @SuppressWarnings("ClassDataAbstractionCoupling") public final class MetricCollectors { private static final String KSQL_JMX_PREFIX = "io.confluent.ksql.metrics"; - public static final String METRICS_CONTEXT_RESOURCE_LABEL_PREFIX = + public static final String RESOURCE_LABEL_PREFIX = CommonClientConfigs.METRICS_CONTEXT_PREFIX + "resource."; private static final String KSQL_RESOURCE_TYPE = "KSQL"; public static final String RESOURCE_LABEL_TYPE = - METRICS_CONTEXT_RESOURCE_LABEL_PREFIX + "type"; + RESOURCE_LABEL_PREFIX + "type"; public static final String RESOURCE_LABEL_VERSION = - METRICS_CONTEXT_RESOURCE_LABEL_PREFIX + "version"; + RESOURCE_LABEL_PREFIX + "version"; public static final String RESOURCE_LABEL_COMMIT_ID = - METRICS_CONTEXT_RESOURCE_LABEL_PREFIX + "commit.id"; + RESOURCE_LABEL_PREFIX + "commit.id"; public static final String RESOURCE_LABEL_CLUSTER_ID = - METRICS_CONTEXT_RESOURCE_LABEL_PREFIX + "cluster.id"; + RESOURCE_LABEL_PREFIX + "cluster.id"; public static final String RESOURCE_LABEL_KSQL_SERVICE_ID = - METRICS_CONTEXT_RESOURCE_LABEL_PREFIX + KsqlConfig.KSQL_SERVICE_ID_CONFIG; + RESOURCE_LABEL_PREFIX + KsqlConfig.KSQL_SERVICE_ID_CONFIG; private static Map collectorMap; private static Metrics metrics; @@ -119,7 +119,11 @@ public static void addConfigurableReporter( final KsqlConfig ksqlConfig ) { final String ksqlServiceId = ksqlConfig.getString(KsqlConfig.KSQL_SERVICE_ID_CONFIG); - final List reporters = ksqlConfig.getConfiguredInstances( + final Map props = ksqlConfig.originals(); + props.putAll(addConfluentMetricsContextConfigsForKsql(ksqlServiceId)); + final KsqlConfig ksqlConfigWithMetricsContext = new KsqlConfig(props); + + final List reporters = ksqlConfigWithMetricsContext.getConfiguredInstances( KsqlConfig.METRIC_REPORTER_CLASSES_CONFIG, MetricsReporter.class, Collections.singletonMap( @@ -127,15 +131,10 @@ public static void addConfigurableReporter( ksqlServiceId)); if (reporters.size() > 0) { - final Map metadata = - new HashMap<>(ksqlConfig.originalsWithPrefix(CommonClientConfigs.METRICS_CONTEXT_PREFIX)); - final MetricsContext metricsContext = - new KafkaMetricsContext( - KSQL_JMX_PREFIX, - addConfluentMetricsContextConfigs( - metadata, - ksqlServiceId)); - + final MetricsContext metricsContext = new KafkaMetricsContext( + KSQL_JMX_PREFIX, + ksqlConfigWithMetricsContext.originalsWithPrefix( + CommonClientConfigs.METRICS_CONTEXT_PREFIX)); for (final MetricsReporter reporter : reporters) { reporter.contextChange(metricsContext); metrics.addReporter(reporter); @@ -144,18 +143,25 @@ public static void addConfigurableReporter( } public static Map addConfluentMetricsContextConfigs( - final Map props, final String ksqlServiceId ) { - final Map updatedProps = new HashMap<>(props); - updatedProps.put(RESOURCE_LABEL_VERSION, AppInfo.getVersion()); - updatedProps.put(RESOURCE_LABEL_COMMIT_ID, AppInfo.getCommitId()); + final Map updatedProps = new HashMap<>(); updatedProps.put(RESOURCE_LABEL_TYPE, KSQL_RESOURCE_TYPE); - updatedProps.put(RESOURCE_LABEL_KSQL_SERVICE_ID, ksqlServiceId); updatedProps.put(RESOURCE_LABEL_CLUSTER_ID, ksqlServiceId); return updatedProps; } + public static Map addConfluentMetricsContextConfigsForKsql( + final String ksqlServiceId + ) { + final Map updatedProps = new HashMap<>(); + updatedProps.put(RESOURCE_LABEL_KSQL_SERVICE_ID, ksqlServiceId); + updatedProps.put(RESOURCE_LABEL_VERSION, AppInfo.getVersion()); + updatedProps.put(RESOURCE_LABEL_COMMIT_ID, AppInfo.getCommitId()); + updatedProps.putAll(addConfluentMetricsContextConfigs(ksqlServiceId)); + return updatedProps; + } + static void remove(final String id) { collectorMap.remove(id); } diff --git a/ksqldb-common/src/main/java/io/confluent/ksql/util/KsqlConfig.java b/ksqldb-common/src/main/java/io/confluent/ksql/util/KsqlConfig.java index 45bf131d7e85..776d803c7bf5 100644 --- a/ksqldb-common/src/main/java/io/confluent/ksql/util/KsqlConfig.java +++ b/ksqldb-common/src/main/java/io/confluent/ksql/util/KsqlConfig.java @@ -50,6 +50,7 @@ import org.apache.kafka.common.config.ConfigDef.ValidString; import org.apache.kafka.common.config.ConfigDef.Validator; import org.apache.kafka.common.config.SslConfigs; +import org.apache.kafka.common.utils.AppInfoParser; import org.apache.kafka.streams.StreamsConfig; @EffectivelyImmutable @@ -67,12 +68,10 @@ public class KsqlConfig extends AbstractConfig { public static final String METRIC_REPORTER_CLASSES_DOC = CommonClientConfigs.METRIC_REPORTER_CLASSES_DOC; - - private static final String METRIC_REPORTERS_PREFIX = "metric.reporters"; + private static final String TELEMETRY_PREFIX = "confluent.telemetry"; private static final Set REPORTER_CONFIGS_PREFIXES = ImmutableSet.of( - METRIC_REPORTERS_PREFIX, TELEMETRY_PREFIX, CommonClientConfigs.METRICS_CONTEXT_PREFIX ); @@ -774,34 +773,59 @@ private KsqlConfig(final ConfigGeneration generation, this.ksqlStreamConfigProps = ksqlStreamConfigProps; } + public Map getKsqlStreamConfigProps(final String applicationId) { + final Map map = new HashMap<>(getKsqlStreamConfigProps()); + map.put( + MetricCollectors.RESOURCE_LABEL_PREFIX + + StreamsConfig.APPLICATION_ID_CONFIG, + applicationId + ); + map.putAll( + addConfluentMetricsContextConfigsKafka( + Collections.emptyMap(), + getString(KSQL_SERVICE_ID_CONFIG))); + return Collections.unmodifiableMap(map); + } + public Map getKsqlStreamConfigProps() { final Map map = new HashMap<>(); for (final ConfigValue config : ksqlStreamConfigProps.values()) { map.put(config.key, config.value); } - map.putAll(getMetricsContextConfigs()); return Collections.unmodifiableMap(map); } public Map getKsqlAdminClientConfigProps() { final Map map = new HashMap<>(); map.putAll(getConfigsFor(AdminClientConfig.configNames())); - map.putAll(getMetricsContextConfigs()); + map.putAll( + addConfluentMetricsContextConfigsKafka(Collections.emptyMap(), + getString(KSQL_SERVICE_ID_CONFIG))); return Collections.unmodifiableMap(map); } public Map getProducerClientConfigProps() { final Map map = new HashMap<>(); map.putAll(getConfigsFor(ProducerConfig.configNames())); - map.putAll(getMetricsContextConfigs()); + map.putAll( + addConfluentMetricsContextConfigsKafka(Collections.emptyMap(), + getString(KSQL_SERVICE_ID_CONFIG))); return Collections.unmodifiableMap(map); } - private Map getMetricsContextConfigs() { - final Map map = new HashMap<>(); - map.put(MetricCollectors.RESOURCE_LABEL_CLUSTER_ID, getString(KSQL_SERVICE_ID_CONFIG)); - map.putAll(getConfigsForPrefix(REPORTER_CONFIGS_PREFIXES)); - return map; + public Map addConfluentMetricsContextConfigsKafka( + final Map props, + final String ksqlServiceId + ) { + final Map updatedProps = new HashMap<>(props); + final AppInfoParser.AppInfo appInfo = new AppInfoParser.AppInfo(System.currentTimeMillis()); + updatedProps.put(MetricCollectors.RESOURCE_LABEL_VERSION, appInfo.getVersion()); + updatedProps.put(MetricCollectors.RESOURCE_LABEL_COMMIT_ID, appInfo.getCommitId()); + + updatedProps.putAll( + MetricCollectors.addConfluentMetricsContextConfigs(ksqlServiceId)); + updatedProps.putAll(getConfigsForPrefix(REPORTER_CONFIGS_PREFIXES)); + return updatedProps; } public Map getProcessingLogConfigProps() { diff --git a/ksqldb-engine/src/main/java/io/confluent/ksql/query/QueryExecutor.java b/ksqldb-engine/src/main/java/io/confluent/ksql/query/QueryExecutor.java index c856bfaadef8..b260f333a91e 100644 --- a/ksqldb-engine/src/main/java/io/confluent/ksql/query/QueryExecutor.java +++ b/ksqldb-engine/src/main/java/io/confluent/ksql/query/QueryExecutor.java @@ -303,7 +303,7 @@ private Map buildStreamsProperties( final QueryId queryId ) { final Map newStreamsProperties - = new HashMap<>(ksqlConfig.getKsqlStreamConfigProps()); + = new HashMap<>(ksqlConfig.getKsqlStreamConfigProps(applicationId)); newStreamsProperties.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId); final ProcessingLogger logger = processingLogContext.getLoggerFactory().getLogger(queryId.toString()); diff --git a/ksqldb-engine/src/test/java/io/confluent/ksql/query/QueryExecutorTest.java b/ksqldb-engine/src/test/java/io/confluent/ksql/query/QueryExecutorTest.java index 166e5a8f6e9d..35e305a44ab6 100644 --- a/ksqldb-engine/src/test/java/io/confluent/ksql/query/QueryExecutorTest.java +++ b/ksqldb-engine/src/test/java/io/confluent/ksql/query/QueryExecutorTest.java @@ -5,6 +5,7 @@ import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.is; import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.ArgumentMatchers.same; import static org.mockito.Mockito.mock; @@ -190,7 +191,7 @@ public void setup() { when(ksqlMaterializationFactory.create(any(), any(), any(), any())).thenReturn(materialization); when(processingLogContext.getLoggerFactory()).thenReturn(processingLoggerFactory); when(processingLoggerFactory.getLogger(any())).thenReturn(processingLogger); - when(ksqlConfig.getKsqlStreamConfigProps()).thenReturn(Collections.emptyMap()); + when(ksqlConfig.getKsqlStreamConfigProps(anyString())).thenReturn(Collections.emptyMap()); when(ksqlConfig.getString(KsqlConfig.KSQL_PERSISTENT_QUERY_NAME_PREFIX_CONFIG)) .thenReturn(PERSISTENT_PREFIX); when(ksqlConfig.getString(KsqlConfig.KSQL_SERVICE_ID_CONFIG)).thenReturn(SERVICE_ID); @@ -420,7 +421,7 @@ private void shouldUseProvidedOptimizationConfig(final Object value) { // Given: final Map properties = Collections.singletonMap(StreamsConfig.TOPOLOGY_OPTIMIZATION, value); - when(ksqlConfig.getKsqlStreamConfigProps()).thenReturn(properties); + when(ksqlConfig.getKsqlStreamConfigProps(anyString())).thenReturn(properties); // When: queryBuilder.buildQuery( @@ -473,7 +474,7 @@ private void assertPropertiesContainDummyInterceptors() { @Test public void shouldAddMetricsInterceptorsToExistingList() { // Given: - when(ksqlConfig.getKsqlStreamConfigProps()).thenReturn(ImmutableMap.of( + when(ksqlConfig.getKsqlStreamConfigProps(anyString())).thenReturn(ImmutableMap.of( StreamsConfig.consumerPrefix(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG), ImmutableList.of(DummyConsumerInterceptor.class.getName()), StreamsConfig.producerPrefix(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG), @@ -497,7 +498,7 @@ public void shouldAddMetricsInterceptorsToExistingList() { @Test public void shouldAddMetricsInterceptorsToExistingString() { // When: - when(ksqlConfig.getKsqlStreamConfigProps()).thenReturn(ImmutableMap.of( + when(ksqlConfig.getKsqlStreamConfigProps(anyString())).thenReturn(ImmutableMap.of( StreamsConfig.consumerPrefix(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG), DummyConsumerInterceptor.class.getName(), StreamsConfig.producerPrefix(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG), @@ -522,7 +523,7 @@ public void shouldAddMetricsInterceptorsToExistingString() { @SuppressWarnings("unchecked") public void shouldAddMetricsInterceptorsToExistingStringList() { // When: - when(ksqlConfig.getKsqlStreamConfigProps()).thenReturn(ImmutableMap.of( + when(ksqlConfig.getKsqlStreamConfigProps(anyString())).thenReturn(ImmutableMap.of( StreamsConfig.consumerPrefix(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG), DummyConsumerInterceptor.class.getName() + "," diff --git a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/KsqlRestApplication.java b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/KsqlRestApplication.java index 131d0d27275c..3f92654072c3 100644 --- a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/KsqlRestApplication.java +++ b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/KsqlRestApplication.java @@ -605,12 +605,17 @@ static KsqlRestApplication buildApplication( final String commandTopicName = ReservedInternalTopics.commandTopic(ksqlConfig); + final String serviceId = ksqlConfig.getString(KsqlConfig.KSQL_SERVICE_ID_CONFIG); final CommandStore commandStore = CommandStore.Factory.create( commandTopicName, ksqlConfig.getString(KsqlConfig.KSQL_SERVICE_ID_CONFIG), Duration.ofMillis(restConfig.getLong(DISTRIBUTED_COMMAND_RESPONSE_TIMEOUT_MS_CONFIG)), - restConfig.getCommandConsumerProperties(), - restConfig.getCommandProducerProperties() + ksqlConfig.addConfluentMetricsContextConfigsKafka( + restConfig.getCommandConsumerProperties(), + serviceId), + ksqlConfig.addConfluentMetricsContextConfigsKafka( + restConfig.getCommandProducerProperties(), + serviceId) ); final InteractiveStatementExecutor statementExecutor = diff --git a/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/KsqlRestApplicationTest.java b/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/KsqlRestApplicationTest.java index d31050463ead..a5abf5678f20 100644 --- a/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/KsqlRestApplicationTest.java +++ b/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/KsqlRestApplicationTest.java @@ -148,7 +148,7 @@ public class KsqlRestApplicationTest { @SuppressWarnings("unchecked") @Before - public void setUp() throws ExecutionException, InterruptedException, TimeoutException { + public void setUp() { when(processingLogConfig.getBoolean(ProcessingLogConfig.STREAM_AUTO_CREATE)) .thenReturn(true); when(processingLogConfig.getString(ProcessingLogConfig.STREAM_NAME)) diff --git a/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/StandaloneExecutorTest.java b/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/StandaloneExecutorTest.java index 530ee5b16965..5423b96b7c2c 100644 --- a/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/StandaloneExecutorTest.java +++ b/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/StandaloneExecutorTest.java @@ -97,7 +97,6 @@ import java.util.function.BiFunction; import java.util.stream.Collectors; import java.util.stream.IntStream; - import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.common.metrics.MetricsReporter; import org.apache.kafka.test.TestUtils;