Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support metrics in openapi and asyncapi #868

Merged
merged 77 commits into from
Mar 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
77 commits
Select commit Hold shift + click to select a range
2cef977
Adjust padding to accommodate good enough headers and don't include …
akrambek Oct 25, 2023
d201582
Merge branch 'develop' into feature/consumer-group-cont
akrambek Oct 25, 2023
76bf9de
Merge branch 'feature/consumer-group-cont' into develop
akrambek Oct 26, 2023
29ae79c
Merge branch 'aklivity:develop' into develop
akrambek Oct 30, 2023
ec1b39e
Merge branch 'aklivity:develop' into develop
akrambek Oct 30, 2023
51a9f0e
Merge branch 'aklivity:develop' into develop
akrambek Oct 31, 2023
4394783
Merge branch 'aklivity:develop' into develop
akrambek Oct 31, 2023
e8696ce
Merge branch 'aklivity:develop' into develop
akrambek Nov 2, 2023
51c37b1
Merge branch 'aklivity:develop' into develop
akrambek Nov 2, 2023
5da5f04
Merge branch 'aklivity:develop' into develop
akrambek Nov 2, 2023
db1e17c
Merge branch 'aklivity:develop' into develop
akrambek Nov 4, 2023
40f73dc
Merge branch 'aklivity:develop' into develop
akrambek Nov 6, 2023
d1a0492
Merge branch 'aklivity:develop' into develop
akrambek Nov 23, 2023
45799ce
Merge branch 'aklivity:develop' into develop
akrambek Nov 29, 2023
1e55162
Merge branch 'aklivity:develop' into develop
akrambek Nov 30, 2023
fedc41f
Merge branch 'aklivity:develop' into develop
akrambek Dec 4, 2023
18a8d74
Merge branch 'aklivity:develop' into develop
akrambek Dec 4, 2023
f160aad
Merge branch 'aklivity:develop' into develop
akrambek Dec 4, 2023
e0e7d5a
Merge branch 'aklivity:develop' into develop
akrambek Dec 6, 2023
9f4a8a6
Merge branch 'aklivity:develop' into develop
akrambek Dec 8, 2023
456f111
Merge branch 'aklivity:develop' into develop
akrambek Dec 8, 2023
0d27262
Merge branch 'aklivity:develop' into develop
akrambek Dec 9, 2023
9fe7a91
Merge branch 'aklivity:develop' into develop
akrambek Dec 11, 2023
7e3d237
Merge branch 'aklivity:develop' into develop
akrambek Dec 12, 2023
33c4411
Merge branch 'aklivity:develop' into develop
akrambek Dec 13, 2023
fe9e318
Merge branch 'aklivity:develop' into develop
akrambek Dec 14, 2023
d8b5e5c
Merge branch 'aklivity:develop' into develop
akrambek Dec 14, 2023
ebca7ef
Merge branch 'aklivity:develop' into develop
akrambek Dec 18, 2023
5e3e059
Merge branch 'aklivity:develop' into develop
akrambek Dec 22, 2023
ee71db9
Merge branch 'aklivity:develop' into develop
akrambek Dec 24, 2023
0b7a15a
Merge branch 'aklivity:develop' into develop
akrambek Dec 25, 2023
be13489
Merge branch 'aklivity:develop' into develop
akrambek Dec 26, 2023
95df84c
Merge branch 'aklivity:develop' into develop
akrambek Dec 26, 2023
3ebdbf5
Merge branch 'aklivity:develop' into develop
akrambek Dec 28, 2023
24ad9e1
Merge branch 'aklivity:develop' into develop
akrambek Dec 30, 2023
6d21fec
Merge branch 'aklivity:develop' into develop
akrambek Dec 31, 2023
368a0a6
Merge branch 'aklivity:develop' into develop
akrambek Dec 31, 2023
7069f1a
Merge branch 'aklivity:develop' into develop
akrambek Jan 2, 2024
09b7041
Merge branch 'aklivity:develop' into develop
akrambek Jan 3, 2024
98f1faa
Merge branch 'aklivity:develop' into develop
akrambek Jan 4, 2024
371391a
Merge branch 'aklivity:develop' into develop
akrambek Jan 5, 2024
c6a0882
Merge branch 'aklivity:develop' into develop
akrambek Jan 8, 2024
f99f009
Merge branch 'aklivity:develop' into develop
akrambek Jan 9, 2024
a110b68
Merge branch 'aklivity:develop' into develop
akrambek Jan 11, 2024
80c4625
Merge branch 'aklivity:develop' into develop
akrambek Jan 16, 2024
6617e20
Merge branch 'aklivity:develop' into develop
akrambek Jan 19, 2024
dea9f53
Merge branch 'aklivity:develop' into develop
akrambek Jan 20, 2024
b74db57
Merge branch 'aklivity:develop' into develop
akrambek Jan 23, 2024
4617b54
Merge branch 'aklivity:develop' into develop
akrambek Jan 30, 2024
b3b421d
Merge branch 'aklivity:develop' into develop
akrambek Jan 31, 2024
73d64b1
Merge branch 'aklivity:develop' into develop
akrambek Feb 1, 2024
7bb546e
Merge branch 'aklivity:develop' into develop
akrambek Feb 2, 2024
b1c7901
Merge branch 'aklivity:develop' into develop
akrambek Feb 8, 2024
949df2f
Merge branch 'aklivity:develop' into develop
akrambek Feb 13, 2024
ca946b8
Merge branch 'aklivity:develop' into develop
akrambek Feb 14, 2024
f9dcd75
Merge branch 'aklivity:develop' into develop
akrambek Feb 21, 2024
e1e5e75
Merge branch 'aklivity:develop' into develop
akrambek Feb 22, 2024
5f50549
Merge branch 'aklivity:develop' into develop
akrambek Feb 23, 2024
32725be
Merge branch 'aklivity:develop' into develop
akrambek Feb 28, 2024
83b145a
Merge branch 'aklivity:develop' into develop
akrambek Feb 28, 2024
75e7709
Merge branch 'aklivity:develop' into develop
akrambek Feb 29, 2024
d4c3117
Merge branch 'aklivity:develop' into develop
akrambek Feb 29, 2024
a438cfe
Merge branch 'aklivity:develop' into develop
akrambek Mar 1, 2024
36f8f1e
Merge branch 'aklivity:develop' into develop
akrambek Mar 4, 2024
cf8a5c7
Merge branch 'aklivity:develop' into develop
akrambek Mar 10, 2024
7b46270
Merge branch 'aklivity:develop' into develop
akrambek Mar 12, 2024
6461ebf
Merge branch 'aklivity:develop' into develop
akrambek Mar 13, 2024
a97cfb3
Merge branch 'aklivity:develop' into develop
akrambek Mar 14, 2024
fc97dc2
Merge branch 'aklivity:develop' into develop
akrambek Mar 16, 2024
a5edb3f
Merge branch 'aklivity:develop' into develop
akrambek Mar 17, 2024
7a79fd6
Merge branch 'aklivity:develop' into develop
akrambek Mar 19, 2024
82c24c3
Merge branch 'aklivity:develop' into develop
akrambek Mar 19, 2024
4024562
Support metrics in openapi and asyncapi
akrambek Mar 19, 2024
1f3f305
Merge branch 'aklivity:develop' into develop
akrambek Mar 19, 2024
f2de614
Merge branch 'develop' into feature/api-metrics
akrambek Mar 19, 2024
bf0ec8c
Resolve conflict
akrambek Mar 19, 2024
b6074f5
Ignore namespace metrics if not specified
akrambek Mar 19, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,16 @@
package io.aklivity.zilla.runtime.binding.asyncapi.internal;

import static io.aklivity.zilla.runtime.engine.config.KindConfig.CLIENT;
import static java.util.Collections.emptyList;

import java.util.List;

import io.aklivity.zilla.runtime.binding.asyncapi.config.AsyncapiConfig;
import io.aklivity.zilla.runtime.binding.asyncapi.config.AsyncapiOptionsConfig;
import io.aklivity.zilla.runtime.binding.asyncapi.internal.view.AsyncapiServerView;
import io.aklivity.zilla.runtime.engine.config.BindingConfig;
import io.aklivity.zilla.runtime.engine.config.CompositeBindingAdapterSpi;
import io.aklivity.zilla.runtime.engine.config.MetricRefConfig;
import io.aklivity.zilla.runtime.engine.config.NamespaceConfigBuilder;

public class AsyncapiClientCompositeBindingAdapter extends AsyncapiCompositeBindingAdapter implements CompositeBindingAdapterSpi
Expand All @@ -39,6 +43,8 @@ public BindingConfig adapt(
AsyncapiOptionsConfig options = (AsyncapiOptionsConfig) binding.options;
AsyncapiConfig asyncapiConfig = options.specs.get(0);
this.asyncapi = asyncapiConfig.asyncapi;
final List<MetricRefConfig> metricRefs = binding.telemetryRef != null ?
binding.telemetryRef.metricRefs : emptyList();

//TODO: add composite for all servers
AsyncapiServerView firstServer = AsyncapiServerView.of(asyncapi.servers.entrySet().iterator().next().getValue());
Expand All @@ -52,20 +58,23 @@ public BindingConfig adapt(
return BindingConfig.builder(binding)
.composite()
.name(String.format("%s.%s", qname, "$composite"))
.inject(n -> this.injectNamespaceMetric(n, !metricRefs.isEmpty()))
.inject(n -> this.injectCatalog(n, asyncapi))
.inject(protocol::injectProtocolClientCache)
.inject(n -> protocol.injectProtocolClientCache(n, metricRefs))
.binding()
.name(String.format("%s_client0", protocol.scheme))
.type(protocol.scheme)
.kind(CLIENT)
.inject(b -> this.injectMetrics(b, metricRefs, protocol.scheme))
.inject(protocol::injectProtocolClientOptions)
.exit(isTlsEnabled ? "tls_client0" : "tcp_client0")
.build()
.inject(n -> injectTlsClient(n, options))
.inject(n -> injectTlsClient(n, options, metricRefs))
.binding()
.name("tcp_client0")
.type("tcp")
.kind(CLIENT)
.inject(b -> this.injectMetrics(b, metricRefs, "tcp"))
.options(options.tcp)
.build()
.build()
Expand All @@ -74,7 +83,8 @@ public BindingConfig adapt(

private <C> NamespaceConfigBuilder<C> injectTlsClient(
NamespaceConfigBuilder<C> namespace,
AsyncapiOptionsConfig options)
AsyncapiOptionsConfig options,
List<MetricRefConfig> metricRefs)
{
if (isTlsEnabled)
{
Expand All @@ -83,6 +93,7 @@ private <C> NamespaceConfigBuilder<C> injectTlsClient(
.name("tls_client0")
.type("tls")
.kind(CLIENT)
.inject(b -> this.injectMetrics(b, metricRefs, "tls"))
.options(options.tls)
.vault(String.format("%s:%s", this.namespace, vault))
.exit("tcp_client0")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,10 @@

import static com.fasterxml.jackson.dataformat.yaml.YAMLGenerator.Feature.MINIMIZE_QUOTES;
import static com.fasterxml.jackson.dataformat.yaml.YAMLGenerator.Feature.WRITE_DOC_START_MARKER;
import static java.util.stream.Collectors.toList;
import static org.agrona.LangUtil.rethrowUnchecked;

import java.util.List;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
Expand All @@ -36,7 +38,11 @@
import io.aklivity.zilla.runtime.binding.asyncapi.internal.view.AsyncapiSchemaView;
import io.aklivity.zilla.runtime.catalog.inline.config.InlineOptionsConfig;
import io.aklivity.zilla.runtime.catalog.inline.config.InlineSchemaConfigBuilder;
import io.aklivity.zilla.runtime.engine.config.BindingConfig;
import io.aklivity.zilla.runtime.engine.config.BindingConfigBuilder;
import io.aklivity.zilla.runtime.engine.config.MetricRefConfig;
import io.aklivity.zilla.runtime.engine.config.NamespaceConfigBuilder;
import io.aklivity.zilla.runtime.engine.config.TelemetryRefConfigBuilder;

public class AsyncapiCompositeBindingAdapter
{
Expand Down Expand Up @@ -149,4 +155,77 @@ protected static String writeSchemaYaml(
}
return result;
}

protected <C> BindingConfigBuilder<C> injectMetrics(
BindingConfigBuilder<C> binding,
List<MetricRefConfig> metricRefs,
String protocol)
{
List<MetricRefConfig> metrics = metricRefs.stream()
.filter(m -> m.name.startsWith("stream."))
.collect(toList());

if (!metrics.isEmpty())
{
final TelemetryRefConfigBuilder<BindingConfigBuilder<C>> telemetry = binding.telemetry();
metrics.forEach(telemetry::metric);
telemetry.build();
}

return binding;
}

protected NamespaceConfigBuilder<BindingConfigBuilder<BindingConfig>> injectNamespaceMetric(
NamespaceConfigBuilder<BindingConfigBuilder<BindingConfig>> namespace,
boolean hasMetrics)
{
if (hasMetrics)
{
namespace
.telemetry()
.metric()
.group("stream")
.name("stream.active.received")
.build()
.metric()
.group("stream")
.name("stream.active.sent")
.build()
.metric()
.group("stream")
.name("stream.opens.received")
.build()
.metric()
.group("stream")
.name("stream.opens.sent")
.build()
.metric()
.group("stream")
.name("stream.data.received")
.build()
.metric()
.group("stream")
.name("stream.data.sent")
.build()
.metric()
.group("stream")
.name("stream.errors.received")
.build()
.metric()
.group("stream")
.name("stream.errors.sent")
.build()
.metric()
.group("stream")
.name("stream.closes.received")
.build()
.metric()
.group("stream")
.name("stream.closes.sent")
.build()
.build();
}

return namespace;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,11 @@
import static java.util.Objects.requireNonNull;

import java.net.URI;
import java.util.List;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

import io.aklivity.zilla.runtime.binding.asyncapi.internal.model.Asyncapi;
import io.aklivity.zilla.runtime.binding.asyncapi.internal.model.AsyncapiMessage;
Expand All @@ -28,7 +30,9 @@
import io.aklivity.zilla.runtime.binding.asyncapi.internal.view.AsyncapiServerView;
import io.aklivity.zilla.runtime.engine.config.BindingConfigBuilder;
import io.aklivity.zilla.runtime.engine.config.CatalogedConfigBuilder;
import io.aklivity.zilla.runtime.engine.config.MetricRefConfig;
import io.aklivity.zilla.runtime.engine.config.NamespaceConfigBuilder;
import io.aklivity.zilla.runtime.engine.config.TelemetryRefConfigBuilder;

public abstract class AsyncapiProtocol
{
Expand Down Expand Up @@ -62,7 +66,8 @@ public abstract <C> BindingConfigBuilder<C> injectProtocolServerRoutes(
BindingConfigBuilder<C> binding);

public <C> NamespaceConfigBuilder<C> injectProtocolClientCache(
NamespaceConfigBuilder<C> namespace)
NamespaceConfigBuilder<C> namespace,
List<MetricRefConfig> metricRefs)
{
return namespace;
}
Expand Down Expand Up @@ -145,4 +150,18 @@ protected URI findFirstServerUrlWithScheme(
}
return result;
}

protected <C> BindingConfigBuilder<C> injectMetrics(
BindingConfigBuilder<C> binding,
List<MetricRefConfig> metricRefs,
String protocol)
{
final TelemetryRefConfigBuilder<BindingConfigBuilder<C>> telemetry = binding.telemetry();
metricRefs.stream()
.filter(m -> m.name.startsWith("stream."))
.collect(Collectors.toList())
.forEach(m -> telemetry.metric(m));
telemetry.build();
return binding;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package io.aklivity.zilla.runtime.binding.asyncapi.internal;

import static io.aklivity.zilla.runtime.engine.config.KindConfig.PROXY;
import static java.util.Collections.emptyList;

import java.util.Collections;
import java.util.List;
Expand All @@ -33,6 +34,7 @@
import io.aklivity.zilla.runtime.engine.config.BindingConfig;
import io.aklivity.zilla.runtime.engine.config.BindingConfigBuilder;
import io.aklivity.zilla.runtime.engine.config.CompositeBindingAdapterSpi;
import io.aklivity.zilla.runtime.engine.config.MetricRefConfig;
import io.aklivity.zilla.runtime.engine.config.RouteConfigBuilder;

public class AsyncapiProxyCompositeBindingAdapter extends AsyncapiCompositeBindingAdapter implements CompositeBindingAdapterSpi
Expand All @@ -58,6 +60,8 @@ public BindingConfig adapt(
.collect(Collectors.toList());
this.asyncApis = options.specs.stream().collect(Collectors.toUnmodifiableMap(a -> a.apiLabel, a -> a.asyncapi));
this.qname = binding.qname;
final List<MetricRefConfig> metricRefs = binding.telemetryRef != null ?
binding.telemetryRef.metricRefs : emptyList();

String sessions = "";
String messages = "";
Expand All @@ -83,10 +87,12 @@ public BindingConfig adapt(
return BindingConfig.builder(binding)
.composite()
.name(String.format("%s/%s", qname, "mqtt-kafka"))
.inject(n -> this.injectNamespaceMetric(n, !metricRefs.isEmpty()))
.binding()
.name("mqtt_kafka_proxy0")
.type("mqtt-kafka")
.kind(PROXY)
.inject(b -> this.injectMetrics(b, metricRefs, "mqtt-kafka"))
.options(MqttKafkaOptionsConfig::builder)
.topics()
.sessions(sessions)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,9 @@
package io.aklivity.zilla.runtime.binding.asyncapi.internal;

import static io.aklivity.zilla.runtime.engine.config.KindConfig.SERVER;
import static java.util.Collections.emptyList;

import java.util.List;

import io.aklivity.zilla.runtime.binding.asyncapi.config.AsyncapiConfig;
import io.aklivity.zilla.runtime.binding.asyncapi.config.AsyncapiOptionsConfig;
Expand All @@ -25,6 +28,7 @@
import io.aklivity.zilla.runtime.engine.config.BindingConfig;
import io.aklivity.zilla.runtime.engine.config.BindingConfigBuilder;
import io.aklivity.zilla.runtime.engine.config.CompositeBindingAdapterSpi;
import io.aklivity.zilla.runtime.engine.config.MetricRefConfig;
import io.aklivity.zilla.runtime.engine.config.NamespaceConfigBuilder;

public class AsyncapiServerCompositeBindingAdapter extends AsyncapiCompositeBindingAdapter implements CompositeBindingAdapterSpi
Expand All @@ -44,6 +48,8 @@ public BindingConfig adapt(
AsyncapiOptionsConfig options = (AsyncapiOptionsConfig) binding.options;
AsyncapiConfig asyncapiConfig = options.specs.get(0);
this.asyncapi = asyncapiConfig.asyncapi;
final List<MetricRefConfig> metricRefs = binding.telemetryRef != null ?
binding.telemetryRef.metricRefs : emptyList();

//TODO: add composite for all servers
AsyncapiServerView firstServer = AsyncapiServerView.of(asyncapi.servers.entrySet().iterator().next().getValue());
Expand All @@ -57,22 +63,25 @@ public BindingConfig adapt(
return BindingConfig.builder(binding)
.composite()
.name(String.format("%s/%s", qname, protocol.scheme))
.inject(n -> this.injectNamespaceMetric(n, !metricRefs.isEmpty()))
.inject(n -> this.injectCatalog(n, asyncapi))
.binding()
.name("tcp_server0")
.type("tcp")
.kind(SERVER)
.inject(b -> this.injectMetrics(b, metricRefs, "tcp"))
.options(TcpOptionsConfig::builder)
.host("0.0.0.0")
.ports(compositePorts)
.build()
.inject(this::injectPlainTcpRoute)
.inject(this::injectTlsTcpRoute)
.inject(b -> this.injectTlsTcpRoute(b, metricRefs))
.build()
.inject(n -> injectTlsServer(n, options))
.binding()
.name(String.format("%s_server0", protocol.scheme))
.type(protocol.scheme)
.inject(b -> this.injectMetrics(b, metricRefs, protocol.scheme))
.kind(SERVER)
.inject(protocol::injectProtocolServerOptions)
.inject(protocol::injectProtocolServerRoutes)
Expand All @@ -98,11 +107,13 @@ private <C> BindingConfigBuilder<C> injectPlainTcpRoute(
}

private <C> BindingConfigBuilder<C> injectTlsTcpRoute(
BindingConfigBuilder<C> binding)
BindingConfigBuilder<C> binding,
List<MetricRefConfig> metricRefs)
{
if (isTlsEnabled)
{
binding
.inject(b -> this.injectMetrics(b, metricRefs, "tls"))
.route()
.when(TcpConditionConfig::builder)
.ports(compositePorts)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
*/
package io.aklivity.zilla.runtime.binding.asyncapi.internal;

import java.util.List;
import java.util.Map;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
Expand All @@ -34,6 +35,7 @@
import io.aklivity.zilla.runtime.engine.config.BindingConfigBuilder;
import io.aklivity.zilla.runtime.engine.config.CatalogedConfigBuilder;
import io.aklivity.zilla.runtime.engine.config.KindConfig;
import io.aklivity.zilla.runtime.engine.config.MetricRefConfig;
import io.aklivity.zilla.runtime.engine.config.NamespaceConfigBuilder;
import io.aklivity.zilla.runtime.model.json.config.JsonModelConfig;

Expand All @@ -60,13 +62,15 @@ public AyncapiKafkaProtocol(

@Override
public <C> NamespaceConfigBuilder<C> injectProtocolClientCache(
NamespaceConfigBuilder<C> namespace)
NamespaceConfigBuilder<C> namespace,
List<MetricRefConfig> metricRefs)
{
return namespace
.binding()
.name("kafka_cache_client0")
.type("kafka")
.kind(KindConfig.CACHE_CLIENT)
.inject(b -> this.injectMetrics(b, metricRefs, "kafka"))
.options(KafkaOptionsConfig::builder)
.inject(this::injectKafkaTopicOptions)
.build()
Expand All @@ -76,6 +80,7 @@ public <C> NamespaceConfigBuilder<C> injectProtocolClientCache(
.name("kafka_cache_server0")
.type("kafka")
.kind(KindConfig.CACHE_SERVER)
.inject(b -> this.injectMetrics(b, metricRefs, "kafka"))
.options(KafkaOptionsConfig::builder)
.inject(this::injectKafkaBootstrapOptions)
.inject(this::injectKafkaTopicOptions)
Expand Down
Loading