Skip to content

Commit

Permalink
Support metrics in openapi and asyncapi (#868)
Browse files Browse the repository at this point in the history
  • Loading branch information
akrambek authored Mar 19, 2024
1 parent 0f9950d commit 4271281
Show file tree
Hide file tree
Showing 13 changed files with 398 additions and 91 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,16 @@
package io.aklivity.zilla.runtime.binding.asyncapi.internal;

import static io.aklivity.zilla.runtime.engine.config.KindConfig.CLIENT;
import static java.util.Collections.emptyList;

import java.util.List;

import io.aklivity.zilla.runtime.binding.asyncapi.config.AsyncapiConfig;
import io.aklivity.zilla.runtime.binding.asyncapi.config.AsyncapiOptionsConfig;
import io.aklivity.zilla.runtime.binding.asyncapi.internal.view.AsyncapiServerView;
import io.aklivity.zilla.runtime.engine.config.BindingConfig;
import io.aklivity.zilla.runtime.engine.config.CompositeBindingAdapterSpi;
import io.aklivity.zilla.runtime.engine.config.MetricRefConfig;
import io.aklivity.zilla.runtime.engine.config.NamespaceConfigBuilder;

public class AsyncapiClientCompositeBindingAdapter extends AsyncapiCompositeBindingAdapter implements CompositeBindingAdapterSpi
Expand All @@ -39,6 +43,8 @@ public BindingConfig adapt(
AsyncapiOptionsConfig options = (AsyncapiOptionsConfig) binding.options;
AsyncapiConfig asyncapiConfig = options.specs.get(0);
this.asyncapi = asyncapiConfig.asyncapi;
final List<MetricRefConfig> metricRefs = binding.telemetryRef != null ?
binding.telemetryRef.metricRefs : emptyList();

//TODO: add composite for all servers
AsyncapiServerView firstServer = AsyncapiServerView.of(asyncapi.servers.entrySet().iterator().next().getValue());
Expand All @@ -52,20 +58,23 @@ public BindingConfig adapt(
return BindingConfig.builder(binding)
.composite()
.name(String.format("%s.%s", qname, "$composite"))
.inject(n -> this.injectNamespaceMetric(n, !metricRefs.isEmpty()))
.inject(n -> this.injectCatalog(n, asyncapi))
.inject(protocol::injectProtocolClientCache)
.inject(n -> protocol.injectProtocolClientCache(n, metricRefs))
.binding()
.name(String.format("%s_client0", protocol.scheme))
.type(protocol.scheme)
.kind(CLIENT)
.inject(b -> this.injectMetrics(b, metricRefs, protocol.scheme))
.inject(protocol::injectProtocolClientOptions)
.exit(isTlsEnabled ? "tls_client0" : "tcp_client0")
.build()
.inject(n -> injectTlsClient(n, options))
.inject(n -> injectTlsClient(n, options, metricRefs))
.binding()
.name("tcp_client0")
.type("tcp")
.kind(CLIENT)
.inject(b -> this.injectMetrics(b, metricRefs, "tcp"))
.options(options.tcp)
.build()
.build()
Expand All @@ -74,7 +83,8 @@ public BindingConfig adapt(

private <C> NamespaceConfigBuilder<C> injectTlsClient(
NamespaceConfigBuilder<C> namespace,
AsyncapiOptionsConfig options)
AsyncapiOptionsConfig options,
List<MetricRefConfig> metricRefs)
{
if (isTlsEnabled)
{
Expand All @@ -83,6 +93,7 @@ private <C> NamespaceConfigBuilder<C> injectTlsClient(
.name("tls_client0")
.type("tls")
.kind(CLIENT)
.inject(b -> this.injectMetrics(b, metricRefs, "tls"))
.options(options.tls)
.vault(String.format("%s:%s", this.namespace, vault))
.exit("tcp_client0")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,10 @@

import static com.fasterxml.jackson.dataformat.yaml.YAMLGenerator.Feature.MINIMIZE_QUOTES;
import static com.fasterxml.jackson.dataformat.yaml.YAMLGenerator.Feature.WRITE_DOC_START_MARKER;
import static java.util.stream.Collectors.toList;
import static org.agrona.LangUtil.rethrowUnchecked;

import java.util.List;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
Expand All @@ -36,7 +38,11 @@
import io.aklivity.zilla.runtime.binding.asyncapi.internal.view.AsyncapiSchemaView;
import io.aklivity.zilla.runtime.catalog.inline.config.InlineOptionsConfig;
import io.aklivity.zilla.runtime.catalog.inline.config.InlineSchemaConfigBuilder;
import io.aklivity.zilla.runtime.engine.config.BindingConfig;
import io.aklivity.zilla.runtime.engine.config.BindingConfigBuilder;
import io.aklivity.zilla.runtime.engine.config.MetricRefConfig;
import io.aklivity.zilla.runtime.engine.config.NamespaceConfigBuilder;
import io.aklivity.zilla.runtime.engine.config.TelemetryRefConfigBuilder;

public class AsyncapiCompositeBindingAdapter
{
Expand Down Expand Up @@ -149,4 +155,77 @@ protected static String writeSchemaYaml(
}
return result;
}

protected <C> BindingConfigBuilder<C> injectMetrics(
BindingConfigBuilder<C> binding,
List<MetricRefConfig> metricRefs,
String protocol)
{
List<MetricRefConfig> metrics = metricRefs.stream()
.filter(m -> m.name.startsWith("stream."))
.collect(toList());

if (!metrics.isEmpty())
{
final TelemetryRefConfigBuilder<BindingConfigBuilder<C>> telemetry = binding.telemetry();
metrics.forEach(telemetry::metric);
telemetry.build();
}

return binding;
}

protected NamespaceConfigBuilder<BindingConfigBuilder<BindingConfig>> injectNamespaceMetric(
NamespaceConfigBuilder<BindingConfigBuilder<BindingConfig>> namespace,
boolean hasMetrics)
{
if (hasMetrics)
{
namespace
.telemetry()
.metric()
.group("stream")
.name("stream.active.received")
.build()
.metric()
.group("stream")
.name("stream.active.sent")
.build()
.metric()
.group("stream")
.name("stream.opens.received")
.build()
.metric()
.group("stream")
.name("stream.opens.sent")
.build()
.metric()
.group("stream")
.name("stream.data.received")
.build()
.metric()
.group("stream")
.name("stream.data.sent")
.build()
.metric()
.group("stream")
.name("stream.errors.received")
.build()
.metric()
.group("stream")
.name("stream.errors.sent")
.build()
.metric()
.group("stream")
.name("stream.closes.received")
.build()
.metric()
.group("stream")
.name("stream.closes.sent")
.build()
.build();
}

return namespace;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,11 @@
import static java.util.Objects.requireNonNull;

import java.net.URI;
import java.util.List;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

import io.aklivity.zilla.runtime.binding.asyncapi.internal.model.Asyncapi;
import io.aklivity.zilla.runtime.binding.asyncapi.internal.model.AsyncapiMessage;
Expand All @@ -28,7 +30,9 @@
import io.aklivity.zilla.runtime.binding.asyncapi.internal.view.AsyncapiServerView;
import io.aklivity.zilla.runtime.engine.config.BindingConfigBuilder;
import io.aklivity.zilla.runtime.engine.config.CatalogedConfigBuilder;
import io.aklivity.zilla.runtime.engine.config.MetricRefConfig;
import io.aklivity.zilla.runtime.engine.config.NamespaceConfigBuilder;
import io.aklivity.zilla.runtime.engine.config.TelemetryRefConfigBuilder;

public abstract class AsyncapiProtocol
{
Expand Down Expand Up @@ -62,7 +66,8 @@ public abstract <C> BindingConfigBuilder<C> injectProtocolServerRoutes(
BindingConfigBuilder<C> binding);

public <C> NamespaceConfigBuilder<C> injectProtocolClientCache(
NamespaceConfigBuilder<C> namespace)
NamespaceConfigBuilder<C> namespace,
List<MetricRefConfig> metricRefs)
{
return namespace;
}
Expand Down Expand Up @@ -145,4 +150,18 @@ protected URI findFirstServerUrlWithScheme(
}
return result;
}

protected <C> BindingConfigBuilder<C> injectMetrics(
BindingConfigBuilder<C> binding,
List<MetricRefConfig> metricRefs,
String protocol)
{
final TelemetryRefConfigBuilder<BindingConfigBuilder<C>> telemetry = binding.telemetry();
metricRefs.stream()
.filter(m -> m.name.startsWith("stream."))
.collect(Collectors.toList())
.forEach(m -> telemetry.metric(m));
telemetry.build();
return binding;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package io.aklivity.zilla.runtime.binding.asyncapi.internal;

import static io.aklivity.zilla.runtime.engine.config.KindConfig.PROXY;
import static java.util.Collections.emptyList;

import java.util.Collections;
import java.util.List;
Expand All @@ -33,6 +34,7 @@
import io.aklivity.zilla.runtime.engine.config.BindingConfig;
import io.aklivity.zilla.runtime.engine.config.BindingConfigBuilder;
import io.aklivity.zilla.runtime.engine.config.CompositeBindingAdapterSpi;
import io.aklivity.zilla.runtime.engine.config.MetricRefConfig;
import io.aklivity.zilla.runtime.engine.config.RouteConfigBuilder;

public class AsyncapiProxyCompositeBindingAdapter extends AsyncapiCompositeBindingAdapter implements CompositeBindingAdapterSpi
Expand All @@ -58,6 +60,8 @@ public BindingConfig adapt(
.collect(Collectors.toList());
this.asyncApis = options.specs.stream().collect(Collectors.toUnmodifiableMap(a -> a.apiLabel, a -> a.asyncapi));
this.qname = binding.qname;
final List<MetricRefConfig> metricRefs = binding.telemetryRef != null ?
binding.telemetryRef.metricRefs : emptyList();

String sessions = "";
String messages = "";
Expand All @@ -83,10 +87,12 @@ public BindingConfig adapt(
return BindingConfig.builder(binding)
.composite()
.name(String.format("%s/%s", qname, "mqtt-kafka"))
.inject(n -> this.injectNamespaceMetric(n, !metricRefs.isEmpty()))
.binding()
.name("mqtt_kafka_proxy0")
.type("mqtt-kafka")
.kind(PROXY)
.inject(b -> this.injectMetrics(b, metricRefs, "mqtt-kafka"))
.options(MqttKafkaOptionsConfig::builder)
.topics()
.sessions(sessions)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,9 @@
package io.aklivity.zilla.runtime.binding.asyncapi.internal;

import static io.aklivity.zilla.runtime.engine.config.KindConfig.SERVER;
import static java.util.Collections.emptyList;

import java.util.List;

import io.aklivity.zilla.runtime.binding.asyncapi.config.AsyncapiConfig;
import io.aklivity.zilla.runtime.binding.asyncapi.config.AsyncapiOptionsConfig;
Expand All @@ -25,6 +28,7 @@
import io.aklivity.zilla.runtime.engine.config.BindingConfig;
import io.aklivity.zilla.runtime.engine.config.BindingConfigBuilder;
import io.aklivity.zilla.runtime.engine.config.CompositeBindingAdapterSpi;
import io.aklivity.zilla.runtime.engine.config.MetricRefConfig;
import io.aklivity.zilla.runtime.engine.config.NamespaceConfigBuilder;

public class AsyncapiServerCompositeBindingAdapter extends AsyncapiCompositeBindingAdapter implements CompositeBindingAdapterSpi
Expand All @@ -44,6 +48,8 @@ public BindingConfig adapt(
AsyncapiOptionsConfig options = (AsyncapiOptionsConfig) binding.options;
AsyncapiConfig asyncapiConfig = options.specs.get(0);
this.asyncapi = asyncapiConfig.asyncapi;
final List<MetricRefConfig> metricRefs = binding.telemetryRef != null ?
binding.telemetryRef.metricRefs : emptyList();

//TODO: add composite for all servers
AsyncapiServerView firstServer = AsyncapiServerView.of(asyncapi.servers.entrySet().iterator().next().getValue());
Expand All @@ -57,22 +63,25 @@ public BindingConfig adapt(
return BindingConfig.builder(binding)
.composite()
.name(String.format("%s/%s", qname, protocol.scheme))
.inject(n -> this.injectNamespaceMetric(n, !metricRefs.isEmpty()))
.inject(n -> this.injectCatalog(n, asyncapi))
.binding()
.name("tcp_server0")
.type("tcp")
.kind(SERVER)
.inject(b -> this.injectMetrics(b, metricRefs, "tcp"))
.options(TcpOptionsConfig::builder)
.host("0.0.0.0")
.ports(compositePorts)
.build()
.inject(this::injectPlainTcpRoute)
.inject(this::injectTlsTcpRoute)
.inject(b -> this.injectTlsTcpRoute(b, metricRefs))
.build()
.inject(n -> injectTlsServer(n, options))
.binding()
.name(String.format("%s_server0", protocol.scheme))
.type(protocol.scheme)
.inject(b -> this.injectMetrics(b, metricRefs, protocol.scheme))
.kind(SERVER)
.inject(protocol::injectProtocolServerOptions)
.inject(protocol::injectProtocolServerRoutes)
Expand All @@ -98,11 +107,13 @@ private <C> BindingConfigBuilder<C> injectPlainTcpRoute(
}

private <C> BindingConfigBuilder<C> injectTlsTcpRoute(
BindingConfigBuilder<C> binding)
BindingConfigBuilder<C> binding,
List<MetricRefConfig> metricRefs)
{
if (isTlsEnabled)
{
binding
.inject(b -> this.injectMetrics(b, metricRefs, "tls"))
.route()
.when(TcpConditionConfig::builder)
.ports(compositePorts)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
*/
package io.aklivity.zilla.runtime.binding.asyncapi.internal;

import java.util.List;
import java.util.Map;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
Expand All @@ -34,6 +35,7 @@
import io.aklivity.zilla.runtime.engine.config.BindingConfigBuilder;
import io.aklivity.zilla.runtime.engine.config.CatalogedConfigBuilder;
import io.aklivity.zilla.runtime.engine.config.KindConfig;
import io.aklivity.zilla.runtime.engine.config.MetricRefConfig;
import io.aklivity.zilla.runtime.engine.config.NamespaceConfigBuilder;
import io.aklivity.zilla.runtime.model.json.config.JsonModelConfig;

Expand All @@ -60,13 +62,15 @@ public AyncapiKafkaProtocol(

@Override
public <C> NamespaceConfigBuilder<C> injectProtocolClientCache(
NamespaceConfigBuilder<C> namespace)
NamespaceConfigBuilder<C> namespace,
List<MetricRefConfig> metricRefs)
{
return namespace
.binding()
.name("kafka_cache_client0")
.type("kafka")
.kind(KindConfig.CACHE_CLIENT)
.inject(b -> this.injectMetrics(b, metricRefs, "kafka"))
.options(KafkaOptionsConfig::builder)
.inject(this::injectKafkaTopicOptions)
.build()
Expand All @@ -76,6 +80,7 @@ public <C> NamespaceConfigBuilder<C> injectProtocolClientCache(
.name("kafka_cache_server0")
.type("kafka")
.kind(KindConfig.CACHE_SERVER)
.inject(b -> this.injectMetrics(b, metricRefs, "kafka"))
.options(KafkaOptionsConfig::builder)
.inject(this::injectKafkaBootstrapOptions)
.inject(this::injectKafkaTopicOptions)
Expand Down
Loading

0 comments on commit 4271281

Please sign in to comment.