Skip to content

Commit

Permalink
feat: shared runtimes (#7721)
Browse files Browse the repository at this point in the history
Shared runtimes with a feature flag
  • Loading branch information
wcarlson5 authored Aug 26, 2021
1 parent 9e532ce commit 44e8129
Show file tree
Hide file tree
Showing 24 changed files with 1,919 additions and 234 deletions.
14 changes: 14 additions & 0 deletions ksqldb-common/src/main/java/io/confluent/ksql/util/KsqlConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -400,6 +400,14 @@ public class KsqlConfig extends AbstractConfig {
+ "if false, new lambda queries won't be processed but any existing lambda "
+ "queries are unaffected.";

public static final String KSQL_SHARED_RUNTIME_ENABLED = "ksql.runtime.feature.shared.enabled";
public static final Boolean KSQL_SHARED_RUNTIME_ENABLED_DEFAULT = false;
public static final String KSQL_SHARED_RUNTIME_ENABLED_DOC =
"Feature flag for sharing streams runtimes. "
+ "Default is false. If false, persistent queries will use separate "
+ " runtimes, if true, new queries may share streams instances.";


public static final String KSQL_SUPPRESS_BUFFER_SIZE_BYTES = "ksql.suppress.buffer.size.bytes";
public static final Long KSQL_SUPPRESS_BUFFER_SIZE_BYTES_DEFAULT = -1L;
public static final String KSQL_SUPPRESS_BUFFER_SIZE_BYTES_DOC =
Expand Down Expand Up @@ -1018,6 +1026,12 @@ private static ConfigDef buildConfigDef(final ConfigGeneration generation) {
KSQL_LAMBDAS_ENABLED_DEFAULT,
Importance.LOW,
KSQL_LAMBDAS_ENABLED_DOC
).define(
KSQL_SHARED_RUNTIME_ENABLED,
Type.BOOLEAN,
KSQL_SHARED_RUNTIME_ENABLED_DEFAULT,
Importance.MEDIUM,
KSQL_SHARED_RUNTIME_ENABLED_DOC
)
.withClientSslSupport();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -345,7 +345,11 @@ private void configureGaugeForState(
final String description = String.format("Count of queries in %s state.", state.toString());
final MetricName metricName = metrics.metricName(name, group, description, tags);
final CountMetric countMetric = new CountMetric(metricName, gauge);
metrics.addMetric(metricName, gauge);
try {
metrics.addMetric(metricName, gauge);
} catch (IllegalArgumentException e) {
//not duplicate metrics, can be improved
}
countMetrics.add(countMetric);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ public void onCreate(
}
perQuery.put(
queryMetadata.getQueryId(),
new PerQueryListener(metrics, metricsPrefix, queryMetadata.getQueryApplicationId())
new PerQueryListener(metrics, metricsPrefix, queryMetadata.getQueryId().toString())
);
}

Expand Down Expand Up @@ -104,35 +104,34 @@ private static class PerQueryListener {
PerQueryListener(
final Metrics metrics,
final String groupPrefix,
final String queryApplicationId
final String queryId
) {
this(metrics, groupPrefix, queryApplicationId, CURRENT_TIME_MILLIS_TICKER);
this(metrics, groupPrefix, queryId, CURRENT_TIME_MILLIS_TICKER);
}

PerQueryListener(
final Metrics metrics,
final String groupPrefix,
final String queryApplicationId,
final String queryId,
final Ticker ticker
) {
Objects.requireNonNull(groupPrefix, "groupPrefix");
Objects.requireNonNull(queryApplicationId, "queryApplicationId");
Objects.requireNonNull(queryId, "queryId");
this.metrics = Objects.requireNonNull(metrics, "metrics cannot be null.");
this.ticker = Objects.requireNonNull(ticker, "ticker");

this.stateMetricName = metrics.metricName(
"query-status",
groupPrefix + "ksql-queries",
"The current status of the given query.",
Collections.singletonMap("status", queryApplicationId));
Collections.singletonMap("status", queryId));

errorMetricName = metrics.metricName(
"error-status",
groupPrefix + "ksql-queries",
"The current error status of the given query, if the state is in ERROR state",
Collections.singletonMap("status", queryApplicationId)
Collections.singletonMap("status", queryId)
);

this.metrics.addMetric(stateMetricName, (Gauge<String>) (config, now) -> state);
this.metrics.addMetric(errorMetricName, (Gauge<String>) (config, now) -> error);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,10 @@
import java.util.Map;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.processor.internals.namedtopology.KafkaStreamsNamedTopologyWrapper;

public interface KafkaStreamsBuilder {
KafkaStreams build(Topology topology, Map<String, Object> conf);

KafkaStreamsNamedTopologyWrapper buildNamedTopologyWrapper(Map<String, Object> conf);
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.apache.kafka.streams.KafkaClientSupplier;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.processor.internals.namedtopology.KafkaStreamsNamedTopologyWrapper;

public class KafkaStreamsBuilderImpl implements KafkaStreamsBuilder {
private final KafkaClientSupplier clientSupplier;
Expand All @@ -29,8 +30,16 @@ public class KafkaStreamsBuilderImpl implements KafkaStreamsBuilder {
this.clientSupplier = Objects.requireNonNull(clientSupplier, "clientSupplier");
}

@Override
public KafkaStreams build(final Topology topology, final Map<String, Object> conf) {
return new KafkaStreams(topology, PropertiesUtil.asProperties(conf), clientSupplier);
}

public KafkaStreamsNamedTopologyWrapper buildNamedTopologyWrapper(
final Map<String, Object> conf
) {
return new KafkaStreamsNamedTopologyWrapper(
PropertiesUtil.asProperties(conf),
clientSupplier
);
}
}
Loading

0 comments on commit 44e8129

Please sign in to comment.