more feedback
stevenpyzhang committed Jun 4, 2020
1 parent c082556 commit 9ff74dd
Showing 7 changed files with 77 additions and 42 deletions.
@@ -44,20 +44,20 @@
@SuppressWarnings("ClassDataAbstractionCoupling")
public final class MetricCollectors {
private static final String KSQL_JMX_PREFIX = "io.confluent.ksql.metrics";
public static final String METRICS_CONTEXT_RESOURCE_LABEL_PREFIX =
public static final String RESOURCE_LABEL_PREFIX =
CommonClientConfigs.METRICS_CONTEXT_PREFIX + "resource.";
private static final String KSQL_RESOURCE_TYPE = "KSQL";

public static final String RESOURCE_LABEL_TYPE =
METRICS_CONTEXT_RESOURCE_LABEL_PREFIX + "type";
RESOURCE_LABEL_PREFIX + "type";
public static final String RESOURCE_LABEL_VERSION =
METRICS_CONTEXT_RESOURCE_LABEL_PREFIX + "version";
RESOURCE_LABEL_PREFIX + "version";
public static final String RESOURCE_LABEL_COMMIT_ID =
METRICS_CONTEXT_RESOURCE_LABEL_PREFIX + "commit.id";
RESOURCE_LABEL_PREFIX + "commit.id";
public static final String RESOURCE_LABEL_CLUSTER_ID =
METRICS_CONTEXT_RESOURCE_LABEL_PREFIX + "cluster.id";
RESOURCE_LABEL_PREFIX + "cluster.id";
public static final String RESOURCE_LABEL_KSQL_SERVICE_ID =
METRICS_CONTEXT_RESOURCE_LABEL_PREFIX + KsqlConfig.KSQL_SERVICE_ID_CONFIG;
RESOURCE_LABEL_PREFIX + KsqlConfig.KSQL_SERVICE_ID_CONFIG;

private static Map<String, MetricCollector> collectorMap;
private static Metrics metrics;
@@ -119,23 +119,22 @@ public static void addConfigurableReporter(
final KsqlConfig ksqlConfig
) {
final String ksqlServiceId = ksqlConfig.getString(KsqlConfig.KSQL_SERVICE_ID_CONFIG);
final List<MetricsReporter> reporters = ksqlConfig.getConfiguredInstances(
final Map<String, Object> props = ksqlConfig.originals();
props.putAll(addConfluentMetricsContextConfigsForKsql(ksqlServiceId));
final KsqlConfig ksqlConfigWithMetricsContext = new KsqlConfig(props);

final List<MetricsReporter> reporters = ksqlConfigWithMetricsContext.getConfiguredInstances(
KsqlConfig.METRIC_REPORTER_CLASSES_CONFIG,
MetricsReporter.class,
Collections.singletonMap(
KsqlConfig.KSQL_SERVICE_ID_CONFIG,
ksqlServiceId));

if (reporters.size() > 0) {
final Map<String, Object> metadata =
new HashMap<>(ksqlConfig.originalsWithPrefix(CommonClientConfigs.METRICS_CONTEXT_PREFIX));
final MetricsContext metricsContext =
new KafkaMetricsContext(
KSQL_JMX_PREFIX,
addConfluentMetricsContextConfigs(
metadata,
ksqlServiceId));

final MetricsContext metricsContext = new KafkaMetricsContext(
KSQL_JMX_PREFIX,
ksqlConfigWithMetricsContext.originalsWithPrefix(
CommonClientConfigs.METRICS_CONTEXT_PREFIX));
for (final MetricsReporter reporter : reporters) {
reporter.contextChange(metricsContext);
metrics.addReporter(reporter);
@@ -144,18 +143,25 @@ public static void addConfigurableReporter(
}

public static Map<String, Object> addConfluentMetricsContextConfigs(
final Map<String,Object> props,
final String ksqlServiceId
) {
final Map<String, Object> updatedProps = new HashMap<>(props);
updatedProps.put(RESOURCE_LABEL_VERSION, AppInfo.getVersion());
updatedProps.put(RESOURCE_LABEL_COMMIT_ID, AppInfo.getCommitId());
final Map<String, Object> updatedProps = new HashMap<>();
updatedProps.put(RESOURCE_LABEL_TYPE, KSQL_RESOURCE_TYPE);
updatedProps.put(RESOURCE_LABEL_KSQL_SERVICE_ID, ksqlServiceId);
updatedProps.put(RESOURCE_LABEL_CLUSTER_ID, ksqlServiceId);
return updatedProps;
}

public static Map<String, Object> addConfluentMetricsContextConfigsForKsql(
final String ksqlServiceId
) {
final Map<String, Object> updatedProps = new HashMap<>();
updatedProps.put(RESOURCE_LABEL_KSQL_SERVICE_ID, ksqlServiceId);
updatedProps.put(RESOURCE_LABEL_VERSION, AppInfo.getVersion());
updatedProps.put(RESOURCE_LABEL_COMMIT_ID, AppInfo.getCommitId());
updatedProps.putAll(addConfluentMetricsContextConfigs(ksqlServiceId));
return updatedProps;
}

static void remove(final String id) {
collectorMap.remove(id);
}
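
Illustrative sketch in plain Java (not part of this commit): how the two MetricCollectors helpers above now layer the metrics-context resource labels, with addConfluentMetricsContextConfigs supplying the base labels and addConfluentMetricsContextConfigsForKsql adding version and commit id on top. The literal "metrics.context.resource." prefix, the "ksql.service.id" key, and the version/commit values are assumptions for illustration only.

import java.util.HashMap;
import java.util.Map;

// Stand-in for the helper split shown in the diff; uses plain maps and literal keys.
public final class MetricsContextLabelsSketch {

  private static final String RESOURCE_LABEL_PREFIX = "metrics.context.resource.";

  // Mirrors addConfluentMetricsContextConfigs(ksqlServiceId): base labels
  // shared by every KSQL-owned client and reporter.
  static Map<String, Object> baseLabels(final String ksqlServiceId) {
    final Map<String, Object> labels = new HashMap<>();
    labels.put(RESOURCE_LABEL_PREFIX + "type", "KSQL");
    labels.put(RESOURCE_LABEL_PREFIX + "ksql.service.id", ksqlServiceId);
    labels.put(RESOURCE_LABEL_PREFIX + "cluster.id", ksqlServiceId);
    return labels;
  }

  // Mirrors addConfluentMetricsContextConfigsForKsql(ksqlServiceId): the
  // KSQL-server variant additionally stamps version and commit id.
  static Map<String, Object> ksqlLabels(final String ksqlServiceId) {
    final Map<String, Object> labels = new HashMap<>();
    labels.put(RESOURCE_LABEL_PREFIX + "version", "x.y.z");     // placeholder for AppInfo.getVersion()
    labels.put(RESOURCE_LABEL_PREFIX + "commit.id", "abcdef0"); // placeholder for AppInfo.getCommitId()
    labels.putAll(baseLabels(ksqlServiceId));
    return labels;
  }

  public static void main(final String[] args) {
    ksqlLabels("default_").forEach((k, v) -> System.out.println(k + " = " + v));
  }
}
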
ksqldb-common/src/main/java/io/confluent/ksql/util/KsqlConfig.java (46 changes: 35 additions & 11 deletions)
@@ -50,6 +50,7 @@
import org.apache.kafka.common.config.ConfigDef.ValidString;
import org.apache.kafka.common.config.ConfigDef.Validator;
import org.apache.kafka.common.config.SslConfigs;
import org.apache.kafka.common.utils.AppInfoParser;
import org.apache.kafka.streams.StreamsConfig;

@EffectivelyImmutable
@@ -67,12 +68,10 @@ public class KsqlConfig extends AbstractConfig {

public static final String METRIC_REPORTER_CLASSES_DOC =
CommonClientConfigs.METRIC_REPORTER_CLASSES_DOC;

private static final String METRIC_REPORTERS_PREFIX = "metric.reporters";

private static final String TELEMETRY_PREFIX = "confluent.telemetry";
private static final Set<String> REPORTER_CONFIGS_PREFIXES =
ImmutableSet.of(
METRIC_REPORTERS_PREFIX,
TELEMETRY_PREFIX,
CommonClientConfigs.METRICS_CONTEXT_PREFIX
);
@@ -774,34 +773,59 @@ private KsqlConfig(final ConfigGeneration generation,
this.ksqlStreamConfigProps = ksqlStreamConfigProps;
}

public Map<String, Object> getKsqlStreamConfigProps(final String applicationId) {
final Map<String, Object> map = new HashMap<>(getKsqlStreamConfigProps());
map.put(
MetricCollectors.RESOURCE_LABEL_PREFIX
+ StreamsConfig.APPLICATION_ID_CONFIG,
applicationId
);
map.putAll(
addConfluentMetricsContextConfigsKafka(
Collections.emptyMap(),
getString(KSQL_SERVICE_ID_CONFIG)));
return Collections.unmodifiableMap(map);
}

public Map<String, Object> getKsqlStreamConfigProps() {
final Map<String, Object> map = new HashMap<>();
for (final ConfigValue config : ksqlStreamConfigProps.values()) {
map.put(config.key, config.value);
}
map.putAll(getMetricsContextConfigs());
return Collections.unmodifiableMap(map);
}

public Map<String, Object> getKsqlAdminClientConfigProps() {
final Map<String, Object> map = new HashMap<>();
map.putAll(getConfigsFor(AdminClientConfig.configNames()));
map.putAll(getMetricsContextConfigs());
map.putAll(
addConfluentMetricsContextConfigsKafka(Collections.emptyMap(),
getString(KSQL_SERVICE_ID_CONFIG)));
return Collections.unmodifiableMap(map);
}

public Map<String, Object> getProducerClientConfigProps() {
final Map<String, Object> map = new HashMap<>();
map.putAll(getConfigsFor(ProducerConfig.configNames()));
map.putAll(getMetricsContextConfigs());
map.putAll(
addConfluentMetricsContextConfigsKafka(Collections.emptyMap(),
getString(KSQL_SERVICE_ID_CONFIG)));
return Collections.unmodifiableMap(map);
}

private Map<String,Object> getMetricsContextConfigs() {
final Map<String, Object> map = new HashMap<>();
map.put(MetricCollectors.RESOURCE_LABEL_CLUSTER_ID, getString(KSQL_SERVICE_ID_CONFIG));
map.putAll(getConfigsForPrefix(REPORTER_CONFIGS_PREFIXES));
return map;
public Map<String, Object> addConfluentMetricsContextConfigsKafka(
final Map<String,Object> props,
final String ksqlServiceId
) {
final Map<String, Object> updatedProps = new HashMap<>(props);
final AppInfoParser.AppInfo appInfo = new AppInfoParser.AppInfo(System.currentTimeMillis());
updatedProps.put(MetricCollectors.RESOURCE_LABEL_VERSION, appInfo.getVersion());
updatedProps.put(MetricCollectors.RESOURCE_LABEL_COMMIT_ID, appInfo.getCommitId());

updatedProps.putAll(
MetricCollectors.addConfluentMetricsContextConfigs(ksqlServiceId));
updatedProps.putAll(getConfigsForPrefix(REPORTER_CONFIGS_PREFIXES));
return updatedProps;
}

public Map<String, Object> getProcessingLogConfigProps() {
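
A minimal usage sketch of the new KsqlConfig surface in this file, assuming a ksqlDB classpath that already includes this change. The service id, the application id value, and the expected label keys in the comments are inferred from the diff rather than taken from documentation.

import io.confluent.ksql.util.KsqlConfig;
import java.util.Collections;
import java.util.Map;

// Usage sketch: per this diff, getKsqlStreamConfigProps(applicationId) returns the
// usual Streams configs plus the Confluent metrics-context labels, including a
// per-query application-id resource label, so clients created by Kafka Streams
// report them through their MetricsContext.
public final class StreamsPropsSketch {

  public static void main(final String[] args) {
    final KsqlConfig ksqlConfig = new KsqlConfig(
        Collections.singletonMap(KsqlConfig.KSQL_SERVICE_ID_CONFIG, "default_"));

    final Map<String, Object> streamsProps =
        ksqlConfig.getKsqlStreamConfigProps("_confluent-ksql-default_query_CSAS_FOO_1");

    // Expected (per the diff) to contain entries such as:
    //   metrics.context.resource.type = KSQL
    //   metrics.context.resource.ksql.service.id = default_
    //   metrics.context.resource.application.id = _confluent-ksql-default_query_CSAS_FOO_1
    //   metrics.context.resource.version / ...commit.id (from AppInfoParser.AppInfo)
    streamsProps.forEach((k, v) -> System.out.println(k + " = " + v));
  }
}
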
@@ -303,7 +303,7 @@ private Map<String, Object> buildStreamsProperties(
final QueryId queryId
) {
final Map<String, Object> newStreamsProperties
= new HashMap<>(ksqlConfig.getKsqlStreamConfigProps());
= new HashMap<>(ksqlConfig.getKsqlStreamConfigProps(applicationId));
newStreamsProperties.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId);
final ProcessingLogger logger
= processingLogContext.getLoggerFactory().getLogger(queryId.toString());
@@ -5,6 +5,7 @@
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.is;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.ArgumentMatchers.same;
import static org.mockito.Mockito.mock;
@@ -190,7 +191,7 @@ public void setup() {
when(ksqlMaterializationFactory.create(any(), any(), any(), any())).thenReturn(materialization);
when(processingLogContext.getLoggerFactory()).thenReturn(processingLoggerFactory);
when(processingLoggerFactory.getLogger(any())).thenReturn(processingLogger);
when(ksqlConfig.getKsqlStreamConfigProps()).thenReturn(Collections.emptyMap());
when(ksqlConfig.getKsqlStreamConfigProps(anyString())).thenReturn(Collections.emptyMap());
when(ksqlConfig.getString(KsqlConfig.KSQL_PERSISTENT_QUERY_NAME_PREFIX_CONFIG))
.thenReturn(PERSISTENT_PREFIX);
when(ksqlConfig.getString(KsqlConfig.KSQL_SERVICE_ID_CONFIG)).thenReturn(SERVICE_ID);
@@ -420,7 +421,7 @@ private void shouldUseProvidedOptimizationConfig(final Object value) {
// Given:
final Map<String, Object> properties =
Collections.singletonMap(StreamsConfig.TOPOLOGY_OPTIMIZATION, value);
when(ksqlConfig.getKsqlStreamConfigProps()).thenReturn(properties);
when(ksqlConfig.getKsqlStreamConfigProps(anyString())).thenReturn(properties);

// When:
queryBuilder.buildQuery(
@@ -473,7 +474,7 @@ private void assertPropertiesContainDummyInterceptors() {
@Test
public void shouldAddMetricsInterceptorsToExistingList() {
// Given:
when(ksqlConfig.getKsqlStreamConfigProps()).thenReturn(ImmutableMap.of(
when(ksqlConfig.getKsqlStreamConfigProps(anyString())).thenReturn(ImmutableMap.of(
StreamsConfig.consumerPrefix(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG),
ImmutableList.of(DummyConsumerInterceptor.class.getName()),
StreamsConfig.producerPrefix(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG),
@@ -497,7 +498,7 @@ public void shouldAddMetricsInterceptorsToExistingList() {
@Test
public void shouldAddMetricsInterceptorsToExistingString() {
// When:
when(ksqlConfig.getKsqlStreamConfigProps()).thenReturn(ImmutableMap.of(
when(ksqlConfig.getKsqlStreamConfigProps(anyString())).thenReturn(ImmutableMap.of(
StreamsConfig.consumerPrefix(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG),
DummyConsumerInterceptor.class.getName(),
StreamsConfig.producerPrefix(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG),
@@ -522,7 +523,7 @@ public void shouldAddMetricsInterceptorsToExistingString() {
@SuppressWarnings("unchecked")
public void shouldAddMetricsInterceptorsToExistingStringList() {
// When:
when(ksqlConfig.getKsqlStreamConfigProps()).thenReturn(ImmutableMap.of(
when(ksqlConfig.getKsqlStreamConfigProps(anyString())).thenReturn(ImmutableMap.of(
StreamsConfig.consumerPrefix(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG),
DummyConsumerInterceptor.class.getName()
+ ","
@@ -605,12 +605,17 @@ static KsqlRestApplication buildApplication(

final String commandTopicName = ReservedInternalTopics.commandTopic(ksqlConfig);

final String serviceId = ksqlConfig.getString(KsqlConfig.KSQL_SERVICE_ID_CONFIG);
final CommandStore commandStore = CommandStore.Factory.create(
commandTopicName,
ksqlConfig.getString(KsqlConfig.KSQL_SERVICE_ID_CONFIG),
Duration.ofMillis(restConfig.getLong(DISTRIBUTED_COMMAND_RESPONSE_TIMEOUT_MS_CONFIG)),
restConfig.getCommandConsumerProperties(),
restConfig.getCommandProducerProperties()
ksqlConfig.addConfluentMetricsContextConfigsKafka(
restConfig.getCommandConsumerProperties(),
serviceId),
ksqlConfig.addConfluentMetricsContextConfigsKafka(
restConfig.getCommandProducerProperties(),
serviceId)
);

final InteractiveStatementExecutor statementExecutor =
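
A plain-map sketch of the property merge the buildApplication change above performs for the command-topic clients. The label keys and values are stand-ins mirroring addConfluentMetricsContextConfigsKafka; version, commit id, and any metric.reporters / confluent.telemetry / metrics.context prefixed configs that the real method also merges are omitted here for brevity.

import java.util.HashMap;
import java.util.Map;

// Sketch of the merge order only (not the real CommandStore wiring): the command
// consumer/producer properties from the REST config are copied first, then the
// metrics-context resource labels keyed by the ksql service id are overlaid, so the
// command-topic clients are tagged like every other KSQL-owned client.
public final class CommandTopicPropsSketch {

  static Map<String, Object> withMetricsContext(
      final Map<String, Object> clientProps,
      final String ksqlServiceId
  ) {
    final Map<String, Object> merged = new HashMap<>(clientProps);
    merged.put("metrics.context.resource.type", "KSQL");
    merged.put("metrics.context.resource.ksql.service.id", ksqlServiceId);
    merged.put("metrics.context.resource.cluster.id", ksqlServiceId);
    return merged;
  }

  public static void main(final String[] args) {
    final Map<String, Object> consumerProps = new HashMap<>();
    consumerProps.put("bootstrap.servers", "localhost:9092");

    withMetricsContext(consumerProps, "default_")
        .forEach((k, v) -> System.out.println(k + " = " + v));
  }
}
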
@@ -148,7 +148,7 @@ public class KsqlRestApplicationTest {

@SuppressWarnings("unchecked")
@Before
public void setUp() throws ExecutionException, InterruptedException, TimeoutException {
public void setUp() {
when(processingLogConfig.getBoolean(ProcessingLogConfig.STREAM_AUTO_CREATE))
.thenReturn(true);
when(processingLogConfig.getString(ProcessingLogConfig.STREAM_NAME))
@@ -97,7 +97,6 @@
import java.util.function.BiFunction;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.metrics.MetricsReporter;
import org.apache.kafka.test.TestUtils;