From 2cef977324c9b78508de72b0eb49c6eff88346c0 Mon Sep 17 00:00:00 2001 From: Akram Yakubov Date: Wed, 25 Oct 2023 13:35:29 -0700 Subject: [PATCH 1/4] Adjust padding to accommodate good enough headers and don't include partial data frame while computing crc32c value --- .../internal/stream/KafkaClientProduceFactory.java | 9 +++++++-- .../kafka/internal/stream/ClientMergedIT.java | 6 +++--- .../produce/message.value.100k/client.rpt | 2 +- .../produce/message.value.100k/server.rpt | 4 ++-- .../application/produce/message.value.10k/client.rpt | 2 +- .../application/produce/message.value.10k/server.rpt | 4 ++-- .../produce/message.values.sequential/client.rpt | 2 +- .../produce/message.values.sequential/server.rpt | 2 +- .../produce.v3/message.values.sequential/client.rpt | 12 ++++++------ .../produce.v3/message.values.sequential/server.rpt | 12 ++++++------ 10 files changed, 30 insertions(+), 25 deletions(-) diff --git a/runtime/binding-kafka/src/main/java/io/aklivity/zilla/runtime/binding/kafka/internal/stream/KafkaClientProduceFactory.java b/runtime/binding-kafka/src/main/java/io/aklivity/zilla/runtime/binding/kafka/internal/stream/KafkaClientProduceFactory.java index d6c2cb2b5b..9690473ba5 100644 --- a/runtime/binding-kafka/src/main/java/io/aklivity/zilla/runtime/binding/kafka/internal/stream/KafkaClientProduceFactory.java +++ b/runtime/binding-kafka/src/main/java/io/aklivity/zilla/runtime/binding/kafka/internal/stream/KafkaClientProduceFactory.java @@ -86,7 +86,7 @@ public final class KafkaClientProduceFactory extends KafkaClientSaslHandshaker i { private static final int PRODUCE_REQUEST_RECORDS_OFFSET_MAX = 512; - private static final int KAFKA_RECORD_FRAMING = 100; // TODO + private static final int KAFKA_RECORD_FRAMING = 512; // TODO private static final int FLAGS_CON = 0x00; private static final int FLAGS_FIN = 0x01; @@ -539,6 +539,7 @@ private int flushRecordInit( final int valueSize = payload != null ? payload.sizeof() : 0; client.valueCompleteSize = valueSize + client.encodeableRecordBytesDeferred; + final int maxEncodeableBytes = client.encodeSlotLimit + client.valueCompleteSize + KAFKA_RECORD_FRAMING; if (client.encodeSlot != NO_SLOT && maxEncodeableBytes > encodePool.slotCapacity()) @@ -1191,6 +1192,7 @@ private final class KafkaProduceClient extends KafkaSaslClient private int encodeableRecordCount; private int encodeableRecordBytes; private int encodeableRecordBytesDeferred; + private int encodeableRecordValueBytes; private int flushableRequestBytes; private int decodeSlot = NO_SLOT; @@ -1652,6 +1654,7 @@ private void doEncodeRecordInit( encodeSlotBuffer.putBytes(encodeSlotLimit, encodeBuffer, 0, encodeProgress); encodeSlotLimit += encodeProgress; + encodeableRecordValueBytes = 0; if (headersCount > 0) { @@ -1689,6 +1692,7 @@ private void doEncodeRecordCont( encodeSlotBuffer.putBytes(encodeSlotLimit, value.buffer(), value.offset(), length); encodeSlotLimit += length; + encodeableRecordValueBytes += length; if ((flags & FLAGS_FIN) == 0) { @@ -1893,7 +1897,8 @@ private void doEncodeProduceRequest( final ByteBuffer encodeSlotByteBuffer = encodePool.byteBuffer(encodeSlot); final int encodeSlotBytePosition = encodeSlotByteBuffer.position(); - encodeSlotByteBuffer.limit(encodeSlotBytePosition + encodeSlotLimit); + final int partialValueSize = flushFlags != FLAGS_FIN ? encodeableRecordValueBytes : 0; + encodeSlotByteBuffer.limit(encodeSlotBytePosition + encodeSlotLimit - partialValueSize); encodeSlotByteBuffer.position(encodeSlotBytePosition + encodeSlotOffset + crcLimit); final CRC32C crc = crc32c; diff --git a/runtime/binding-kafka/src/test/java/io/aklivity/zilla/runtime/binding/kafka/internal/stream/ClientMergedIT.java b/runtime/binding-kafka/src/test/java/io/aklivity/zilla/runtime/binding/kafka/internal/stream/ClientMergedIT.java index 8ae2d6fd51..7803a7ce1f 100644 --- a/runtime/binding-kafka/src/test/java/io/aklivity/zilla/runtime/binding/kafka/internal/stream/ClientMergedIT.java +++ b/runtime/binding-kafka/src/test/java/io/aklivity/zilla/runtime/binding/kafka/internal/stream/ClientMergedIT.java @@ -49,7 +49,7 @@ public class ClientMergedIT .countersBufferCapacity(8192) .configure(ENGINE_BUFFER_SLOT_CAPACITY, 8192) .configure(KAFKA_CLIENT_META_MAX_AGE_MILLIS, 1000) - .configure(KAFKA_CLIENT_PRODUCE_MAX_BYTES, 116) + .configure(KAFKA_CLIENT_PRODUCE_MAX_BYTES, 528) .configurationRoot("io/aklivity/zilla/specs/binding/kafka/config") .external("net0") .clean(); @@ -234,7 +234,7 @@ public void shouldProduceMergedMessageValues() throws Exception @Configure( name = "zilla.binding.kafka.client.produce.max.bytes", value = "200000") - @ScriptProperty("padding ${512 + 100}") + @ScriptProperty("padding ${512 + 512}") public void shouldProduceMergedMessageValue10k() throws Exception { k3po.finish(); @@ -248,7 +248,7 @@ public void shouldProduceMergedMessageValue10k() throws Exception @Configure( name = "zilla.binding.kafka.client.produce.max.bytes", value = "200000") - @ScriptProperty("padding ${512 + 100}") + @ScriptProperty("padding ${512 + 512}") public void shouldProduceMergedMessageValue100k() throws Exception { k3po.finish(); diff --git a/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/application/produce/message.value.100k/client.rpt b/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/application/produce/message.value.100k/client.rpt index b21502d3b5..cd5237ac4a 100644 --- a/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/application/produce/message.value.100k/client.rpt +++ b/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/application/produce/message.value.100k/client.rpt @@ -73,7 +73,7 @@ read zilla:begin.ext ${kafka:beginEx() write zilla:data.ext ${kafka:dataEx() .typeId(zilla:id("kafka")) .produce() - .deferred(102400 - 8192 + 512 + 100) + .deferred(102400 - 8192 + 512 + 512) .timestamp(newTimestamp) .build() .build()} diff --git a/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/application/produce/message.value.100k/server.rpt b/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/application/produce/message.value.100k/server.rpt index cf93c76e87..6cd3a8d747 100644 --- a/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/application/produce/message.value.100k/server.rpt +++ b/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/application/produce/message.value.100k/server.rpt @@ -18,7 +18,7 @@ property serverAddress "zilla://streams/app0" accept ${serverAddress} option zilla:window 8192 - option zilla:padding 612 + option zilla:padding 1024 option zilla:transmission "half-duplex" accepted @@ -71,7 +71,7 @@ write zilla:begin.ext ${kafka:beginEx() read zilla:data.ext ${kafka:matchDataEx() .typeId(zilla:id("kafka")) .produce() - .deferred(102400 - 8192 + 512 + 100) + .deferred(102400 - 8192 + 512 + 512) .build() .build()} diff --git a/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/application/produce/message.value.10k/client.rpt b/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/application/produce/message.value.10k/client.rpt index dcd8f9d0de..b054e49965 100644 --- a/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/application/produce/message.value.10k/client.rpt +++ b/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/application/produce/message.value.10k/client.rpt @@ -73,7 +73,7 @@ read zilla:begin.ext ${kafka:beginEx() write zilla:data.ext ${kafka:dataEx() .typeId(zilla:id("kafka")) .produce() - .deferred(10240 - 8192 + 512 + 100) + .deferred(10240 - 8192 + 512 + 512) .timestamp(newTimestamp) .build() .build()} diff --git a/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/application/produce/message.value.10k/server.rpt b/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/application/produce/message.value.10k/server.rpt index c248b9e910..6b3eaeb77b 100644 --- a/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/application/produce/message.value.10k/server.rpt +++ b/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/application/produce/message.value.10k/server.rpt @@ -18,7 +18,7 @@ property serverAddress "zilla://streams/app0" accept ${serverAddress} option zilla:window 8192 - option zilla:padding 612 + option zilla:padding 1024 option zilla:transmission "half-duplex" accepted @@ -71,7 +71,7 @@ write zilla:begin.ext ${kafka:beginEx() read zilla:data.ext ${kafka:matchDataEx() .typeId(zilla:id("kafka")) .produce() - .deferred(10240 - 8192 + 512 + 100) + .deferred(10240 - 8192 + 512 + 512) .build() .build()} read zilla:data.ext ${kafka:matchDataEx() diff --git a/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/application/produce/message.values.sequential/client.rpt b/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/application/produce/message.values.sequential/client.rpt index 8ddf6911ff..5b8050c2df 100644 --- a/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/application/produce/message.values.sequential/client.rpt +++ b/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/application/produce/message.values.sequential/client.rpt @@ -81,7 +81,7 @@ write zilla:data.ext ${kafka:dataEx() .produce() .build() .build()} -write ${kafka:randomBytes(7580)} +write ${kafka:randomBytes(8192-(512+512))} write flush write zilla:data.ext ${kafka:dataEx() diff --git a/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/application/produce/message.values.sequential/server.rpt b/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/application/produce/message.values.sequential/server.rpt index b9d3f4903c..5a2d505c1c 100644 --- a/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/application/produce/message.values.sequential/server.rpt +++ b/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/application/produce/message.values.sequential/server.rpt @@ -79,7 +79,7 @@ read zilla:data.ext ${kafka:matchDataEx() .produce() .build() .build()} -read [0..7580] +read [0..7168] read zilla:data.ext ${kafka:matchDataEx() .typeId(zilla:id("kafka")) diff --git a/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/network/produce.v3/message.values.sequential/client.rpt b/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/network/produce.v3/message.values.sequential/client.rpt index 6e967c671d..9b48621d87 100644 --- a/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/network/produce.v3/message.values.sequential/client.rpt +++ b/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/network/produce.v3/message.values.sequential/client.rpt @@ -78,7 +78,7 @@ write zilla:begin.ext ${proxy:beginEx() connected -write 7690 +write 7278 0s 3s ${newRequestId} @@ -90,9 +90,9 @@ write 7690 4s "test" 1 0 - 7650 # record set size + 7238 # record set size 0L # first offset - 7638 # length + 7226 # length -1 [0x02] 0x4e8723aa @@ -104,13 +104,13 @@ write 7690 -1s -1 1 # records - ${kafka:varint(7587)} + ${kafka:varint(7175)} [0x00] ${kafka:varint(0)} ${kafka:varint(0)} ${kafka:varint(-1)} # key - ${kafka:varint(7580)} # value - ${kafka:randomBytes(7580)} + ${kafka:varint(7168)} # value + ${kafka:randomBytes(7168)} ${kafka:varint(0)} # headers read 44 diff --git a/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/network/produce.v3/message.values.sequential/server.rpt b/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/network/produce.v3/message.values.sequential/server.rpt index 0c2dae01c2..a6aaebb5a4 100644 --- a/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/network/produce.v3/message.values.sequential/server.rpt +++ b/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/network/produce.v3/message.values.sequential/server.rpt @@ -74,7 +74,7 @@ read zilla:begin.ext ${proxy:beginEx() connected -read 7690 +read 7278 0s 3s (int:requestId) @@ -86,9 +86,9 @@ read 7690 4s "test" 1 0 - 7650 # record set size + 7238 # record set size 0L # first offset - 7638 # length + 7226 # length -1 [0x02] [0..4] @@ -100,13 +100,13 @@ read 7690 -1s -1 1 # records - ${kafka:varint(7587)} + ${kafka:varint(7175)} [0x00] ${kafka:varint(0)} ${kafka:varint(0)} ${kafka:varint(-1)} # key - ${kafka:varint(7580)} # value - [0..7580] + ${kafka:varint(7168)} # value + [0..7168] ${kafka:varint(0)} # headers write 44 From 40245626bac3d39eaf018ba439712ba76f55c568 Mon Sep 17 00:00:00 2001 From: Akram Yakubov Date: Tue, 19 Mar 2024 12:48:06 +0000 Subject: [PATCH 2/4] Support metrics in openapi and asyncapi --- ...AsyncapiClientCompositeBindingAdapter.java | 17 ++- .../AsyncapiCompositeBindingAdapter.java | 75 ++++++++++++ .../asyncapi/internal/AsyncapiProtocol.java | 21 +++- .../AsyncapiProxyCompositeBindingAdapter.java | 6 + ...AsyncapiServerCompositeBindingAdapter.java | 15 ++- .../internal/AyncapiKafkaProtocol.java | 7 +- .../OpenapiAsyncCompositeBindingAdapter.java | 79 +++++++++++- .../config/OpenapiBindingAdapter.java | 56 +++++++++ .../OpenapiClientCompositeBindingAdapter.java | 42 +++---- .../OpenapiCompositeBindingAdapter.java | 114 ++++++++++++++---- .../OpenapiServerCompositeBindingAdapter.java | 41 ++----- .../src/main/moditect/module-info.java | 2 +- ...e.engine.config.CompositeBindingAdapterSpi | 2 +- 13 files changed, 386 insertions(+), 91 deletions(-) create mode 100644 incubator/binding-openapi/src/main/java/io/aklivity/zilla/runtime/binding/openapi/internal/config/OpenapiBindingAdapter.java diff --git a/incubator/binding-asyncapi/src/main/java/io/aklivity/zilla/runtime/binding/asyncapi/internal/AsyncapiClientCompositeBindingAdapter.java b/incubator/binding-asyncapi/src/main/java/io/aklivity/zilla/runtime/binding/asyncapi/internal/AsyncapiClientCompositeBindingAdapter.java index 34cc1e7bf6..f2467929e0 100644 --- a/incubator/binding-asyncapi/src/main/java/io/aklivity/zilla/runtime/binding/asyncapi/internal/AsyncapiClientCompositeBindingAdapter.java +++ b/incubator/binding-asyncapi/src/main/java/io/aklivity/zilla/runtime/binding/asyncapi/internal/AsyncapiClientCompositeBindingAdapter.java @@ -15,12 +15,16 @@ package io.aklivity.zilla.runtime.binding.asyncapi.internal; import static io.aklivity.zilla.runtime.engine.config.KindConfig.CLIENT; +import static java.util.Collections.emptyList; + +import java.util.List; import io.aklivity.zilla.runtime.binding.asyncapi.config.AsyncapiConfig; import io.aklivity.zilla.runtime.binding.asyncapi.config.AsyncapiOptionsConfig; import io.aklivity.zilla.runtime.binding.asyncapi.internal.view.AsyncapiServerView; import io.aklivity.zilla.runtime.engine.config.BindingConfig; import io.aklivity.zilla.runtime.engine.config.CompositeBindingAdapterSpi; +import io.aklivity.zilla.runtime.engine.config.MetricRefConfig; import io.aklivity.zilla.runtime.engine.config.NamespaceConfigBuilder; public class AsyncapiClientCompositeBindingAdapter extends AsyncapiCompositeBindingAdapter implements CompositeBindingAdapterSpi @@ -39,6 +43,8 @@ public BindingConfig adapt( AsyncapiOptionsConfig options = (AsyncapiOptionsConfig) binding.options; AsyncapiConfig asyncapiConfig = options.specs.get(0); this.asyncapi = asyncapiConfig.asyncapi; + final List metricRefs = binding.telemetryRef != null ? + binding.telemetryRef.metricRefs : emptyList(); //TODO: add composite for all servers AsyncapiServerView firstServer = AsyncapiServerView.of(asyncapi.servers.entrySet().iterator().next().getValue()); @@ -52,20 +58,23 @@ public BindingConfig adapt( return BindingConfig.builder(binding) .composite() .name(String.format("%s.%s", qname, "$composite")) + .inject(this::injectNamespaceMetric) .inject(n -> this.injectCatalog(n, asyncapi)) - .inject(protocol::injectProtocolClientCache) + .inject(n -> protocol.injectProtocolClientCache(n, metricRefs)) .binding() .name(String.format("%s_client0", protocol.scheme)) .type(protocol.scheme) .kind(CLIENT) + .inject(b -> this.injectMetrics(b, metricRefs, protocol.scheme)) .inject(protocol::injectProtocolClientOptions) .exit(isTlsEnabled ? "tls_client0" : "tcp_client0") .build() - .inject(n -> injectTlsClient(n, options)) + .inject(n -> injectTlsClient(n, options, metricRefs)) .binding() .name("tcp_client0") .type("tcp") .kind(CLIENT) + .inject(b -> this.injectMetrics(b, metricRefs, "tcp")) .options(options.tcp) .build() .build() @@ -74,7 +83,8 @@ public BindingConfig adapt( private NamespaceConfigBuilder injectTlsClient( NamespaceConfigBuilder namespace, - AsyncapiOptionsConfig options) + AsyncapiOptionsConfig options, + List metricRefs) { if (isTlsEnabled) { @@ -83,6 +93,7 @@ private NamespaceConfigBuilder injectTlsClient( .name("tls_client0") .type("tls") .kind(CLIENT) + .inject(b -> this.injectMetrics(b, metricRefs, "tls")) .options(options.tls) .vault(String.format("%s:%s", this.namespace, vault)) .exit("tcp_client0") diff --git a/incubator/binding-asyncapi/src/main/java/io/aklivity/zilla/runtime/binding/asyncapi/internal/AsyncapiCompositeBindingAdapter.java b/incubator/binding-asyncapi/src/main/java/io/aklivity/zilla/runtime/binding/asyncapi/internal/AsyncapiCompositeBindingAdapter.java index aaa35c87e2..3fd190ccb7 100644 --- a/incubator/binding-asyncapi/src/main/java/io/aklivity/zilla/runtime/binding/asyncapi/internal/AsyncapiCompositeBindingAdapter.java +++ b/incubator/binding-asyncapi/src/main/java/io/aklivity/zilla/runtime/binding/asyncapi/internal/AsyncapiCompositeBindingAdapter.java @@ -16,8 +16,10 @@ import static com.fasterxml.jackson.dataformat.yaml.YAMLGenerator.Feature.MINIMIZE_QUOTES; import static com.fasterxml.jackson.dataformat.yaml.YAMLGenerator.Feature.WRITE_DOC_START_MARKER; +import static java.util.stream.Collectors.toList; import static org.agrona.LangUtil.rethrowUnchecked; +import java.util.List; import java.util.Map; import java.util.regex.Matcher; import java.util.regex.Pattern; @@ -36,7 +38,11 @@ import io.aklivity.zilla.runtime.binding.asyncapi.internal.view.AsyncapiSchemaView; import io.aklivity.zilla.runtime.catalog.inline.config.InlineOptionsConfig; import io.aklivity.zilla.runtime.catalog.inline.config.InlineSchemaConfigBuilder; +import io.aklivity.zilla.runtime.engine.config.BindingConfig; +import io.aklivity.zilla.runtime.engine.config.BindingConfigBuilder; +import io.aklivity.zilla.runtime.engine.config.MetricRefConfig; import io.aklivity.zilla.runtime.engine.config.NamespaceConfigBuilder; +import io.aklivity.zilla.runtime.engine.config.TelemetryRefConfigBuilder; public class AsyncapiCompositeBindingAdapter { @@ -149,4 +155,73 @@ protected static String writeSchemaYaml( } return result; } + + protected BindingConfigBuilder injectMetrics( + BindingConfigBuilder binding, + List metricRefs, + String protocol) + { + List metrics = metricRefs.stream() + .filter(m -> m.name.startsWith("stream.")) + .collect(toList()); + + if (!metrics.isEmpty()) + { + final TelemetryRefConfigBuilder> telemetry = binding.telemetry(); + metrics.forEach(telemetry::metric); + telemetry.build(); + } + + return binding; + } + + protected NamespaceConfigBuilder> injectNamespaceMetric( + NamespaceConfigBuilder> namespace) + { + namespace + .telemetry() + .metric() + .group("stream") + .name("stream.active.received") + .build() + .metric() + .group("stream") + .name("stream.active.sent") + .build() + .metric() + .group("stream") + .name("stream.opens.received") + .build() + .metric() + .group("stream") + .name("stream.opens.sent") + .build() + .metric() + .group("stream") + .name("stream.data.received") + .build() + .metric() + .group("stream") + .name("stream.data.sent") + .build() + .metric() + .group("stream") + .name("stream.errors.received") + .build() + .metric() + .group("stream") + .name("stream.errors.sent") + .build() + .metric() + .group("stream") + .name("stream.closes.received") + .build() + .metric() + .group("stream") + .name("stream.closes.sent") + .build() + .build(); + + return namespace; + } } diff --git a/incubator/binding-asyncapi/src/main/java/io/aklivity/zilla/runtime/binding/asyncapi/internal/AsyncapiProtocol.java b/incubator/binding-asyncapi/src/main/java/io/aklivity/zilla/runtime/binding/asyncapi/internal/AsyncapiProtocol.java index 72fb71a3de..51dad71e5c 100644 --- a/incubator/binding-asyncapi/src/main/java/io/aklivity/zilla/runtime/binding/asyncapi/internal/AsyncapiProtocol.java +++ b/incubator/binding-asyncapi/src/main/java/io/aklivity/zilla/runtime/binding/asyncapi/internal/AsyncapiProtocol.java @@ -17,9 +17,11 @@ import static java.util.Objects.requireNonNull; import java.net.URI; +import java.util.List; import java.util.Map; import java.util.regex.Matcher; import java.util.regex.Pattern; +import java.util.stream.Collectors; import io.aklivity.zilla.runtime.binding.asyncapi.internal.model.Asyncapi; import io.aklivity.zilla.runtime.binding.asyncapi.internal.model.AsyncapiMessage; @@ -28,7 +30,9 @@ import io.aklivity.zilla.runtime.binding.asyncapi.internal.view.AsyncapiServerView; import io.aklivity.zilla.runtime.engine.config.BindingConfigBuilder; import io.aklivity.zilla.runtime.engine.config.CatalogedConfigBuilder; +import io.aklivity.zilla.runtime.engine.config.MetricRefConfig; import io.aklivity.zilla.runtime.engine.config.NamespaceConfigBuilder; +import io.aklivity.zilla.runtime.engine.config.TelemetryRefConfigBuilder; public abstract class AsyncapiProtocol { @@ -62,7 +66,8 @@ public abstract BindingConfigBuilder injectProtocolServerRoutes( BindingConfigBuilder binding); public NamespaceConfigBuilder injectProtocolClientCache( - NamespaceConfigBuilder namespace) + NamespaceConfigBuilder namespace, + List metricRefs) { return namespace; } @@ -145,4 +150,18 @@ protected URI findFirstServerUrlWithScheme( } return result; } + + protected BindingConfigBuilder injectMetrics( + BindingConfigBuilder binding, + List metricRefs, + String protocol) + { + final TelemetryRefConfigBuilder> telemetry = binding.telemetry(); + metricRefs.stream() + .filter(m -> m.name.startsWith("stream.")) + .collect(Collectors.toList()) + .forEach(m -> telemetry.metric(m)); + telemetry.build(); + return binding; + } } diff --git a/incubator/binding-asyncapi/src/main/java/io/aklivity/zilla/runtime/binding/asyncapi/internal/AsyncapiProxyCompositeBindingAdapter.java b/incubator/binding-asyncapi/src/main/java/io/aklivity/zilla/runtime/binding/asyncapi/internal/AsyncapiProxyCompositeBindingAdapter.java index c85e764503..49b796879b 100644 --- a/incubator/binding-asyncapi/src/main/java/io/aklivity/zilla/runtime/binding/asyncapi/internal/AsyncapiProxyCompositeBindingAdapter.java +++ b/incubator/binding-asyncapi/src/main/java/io/aklivity/zilla/runtime/binding/asyncapi/internal/AsyncapiProxyCompositeBindingAdapter.java @@ -15,6 +15,7 @@ package io.aklivity.zilla.runtime.binding.asyncapi.internal; import static io.aklivity.zilla.runtime.engine.config.KindConfig.PROXY; +import static java.util.Collections.emptyList; import java.util.Collections; import java.util.List; @@ -33,6 +34,7 @@ import io.aklivity.zilla.runtime.engine.config.BindingConfig; import io.aklivity.zilla.runtime.engine.config.BindingConfigBuilder; import io.aklivity.zilla.runtime.engine.config.CompositeBindingAdapterSpi; +import io.aklivity.zilla.runtime.engine.config.MetricRefConfig; import io.aklivity.zilla.runtime.engine.config.RouteConfigBuilder; public class AsyncapiProxyCompositeBindingAdapter extends AsyncapiCompositeBindingAdapter implements CompositeBindingAdapterSpi @@ -58,6 +60,8 @@ public BindingConfig adapt( .collect(Collectors.toList()); this.asyncApis = options.specs.stream().collect(Collectors.toUnmodifiableMap(a -> a.apiLabel, a -> a.asyncapi)); this.qname = binding.qname; + final List metricRefs = binding.telemetryRef != null ? + binding.telemetryRef.metricRefs : emptyList(); String sessions = ""; String messages = ""; @@ -83,10 +87,12 @@ public BindingConfig adapt( return BindingConfig.builder(binding) .composite() .name(String.format("%s/%s", qname, "mqtt-kafka")) + .inject(this::injectNamespaceMetric) .binding() .name("mqtt_kafka_proxy0") .type("mqtt-kafka") .kind(PROXY) + .inject(b -> this.injectMetrics(b, metricRefs, "mqtt-kafka")) .options(MqttKafkaOptionsConfig::builder) .topics() .sessions(sessions) diff --git a/incubator/binding-asyncapi/src/main/java/io/aklivity/zilla/runtime/binding/asyncapi/internal/AsyncapiServerCompositeBindingAdapter.java b/incubator/binding-asyncapi/src/main/java/io/aklivity/zilla/runtime/binding/asyncapi/internal/AsyncapiServerCompositeBindingAdapter.java index 9fae81ce33..3209c12494 100644 --- a/incubator/binding-asyncapi/src/main/java/io/aklivity/zilla/runtime/binding/asyncapi/internal/AsyncapiServerCompositeBindingAdapter.java +++ b/incubator/binding-asyncapi/src/main/java/io/aklivity/zilla/runtime/binding/asyncapi/internal/AsyncapiServerCompositeBindingAdapter.java @@ -15,6 +15,9 @@ package io.aklivity.zilla.runtime.binding.asyncapi.internal; import static io.aklivity.zilla.runtime.engine.config.KindConfig.SERVER; +import static java.util.Collections.emptyList; + +import java.util.List; import io.aklivity.zilla.runtime.binding.asyncapi.config.AsyncapiConfig; import io.aklivity.zilla.runtime.binding.asyncapi.config.AsyncapiOptionsConfig; @@ -25,6 +28,7 @@ import io.aklivity.zilla.runtime.engine.config.BindingConfig; import io.aklivity.zilla.runtime.engine.config.BindingConfigBuilder; import io.aklivity.zilla.runtime.engine.config.CompositeBindingAdapterSpi; +import io.aklivity.zilla.runtime.engine.config.MetricRefConfig; import io.aklivity.zilla.runtime.engine.config.NamespaceConfigBuilder; public class AsyncapiServerCompositeBindingAdapter extends AsyncapiCompositeBindingAdapter implements CompositeBindingAdapterSpi @@ -44,6 +48,8 @@ public BindingConfig adapt( AsyncapiOptionsConfig options = (AsyncapiOptionsConfig) binding.options; AsyncapiConfig asyncapiConfig = options.specs.get(0); this.asyncapi = asyncapiConfig.asyncapi; + final List metricRefs = binding.telemetryRef != null ? + binding.telemetryRef.metricRefs : emptyList(); //TODO: add composite for all servers AsyncapiServerView firstServer = AsyncapiServerView.of(asyncapi.servers.entrySet().iterator().next().getValue()); @@ -57,22 +63,25 @@ public BindingConfig adapt( return BindingConfig.builder(binding) .composite() .name(String.format("%s/%s", qname, protocol.scheme)) + .inject(this::injectNamespaceMetric) .inject(n -> this.injectCatalog(n, asyncapi)) .binding() .name("tcp_server0") .type("tcp") .kind(SERVER) + .inject(b -> this.injectMetrics(b, metricRefs, "tcp")) .options(TcpOptionsConfig::builder) .host("0.0.0.0") .ports(compositePorts) .build() .inject(this::injectPlainTcpRoute) - .inject(this::injectTlsTcpRoute) + .inject(b -> this.injectTlsTcpRoute(b, metricRefs)) .build() .inject(n -> injectTlsServer(n, options)) .binding() .name(String.format("%s_server0", protocol.scheme)) .type(protocol.scheme) + .inject(b -> this.injectMetrics(b, metricRefs, protocol.scheme)) .kind(SERVER) .inject(protocol::injectProtocolServerOptions) .inject(protocol::injectProtocolServerRoutes) @@ -98,11 +107,13 @@ private BindingConfigBuilder injectPlainTcpRoute( } private BindingConfigBuilder injectTlsTcpRoute( - BindingConfigBuilder binding) + BindingConfigBuilder binding, + List metricRefs) { if (isTlsEnabled) { binding + .inject(b -> this.injectMetrics(b, metricRefs, "tls")) .route() .when(TcpConditionConfig::builder) .ports(compositePorts) diff --git a/incubator/binding-asyncapi/src/main/java/io/aklivity/zilla/runtime/binding/asyncapi/internal/AyncapiKafkaProtocol.java b/incubator/binding-asyncapi/src/main/java/io/aklivity/zilla/runtime/binding/asyncapi/internal/AyncapiKafkaProtocol.java index ec7e1243b2..fce84a3023 100644 --- a/incubator/binding-asyncapi/src/main/java/io/aklivity/zilla/runtime/binding/asyncapi/internal/AyncapiKafkaProtocol.java +++ b/incubator/binding-asyncapi/src/main/java/io/aklivity/zilla/runtime/binding/asyncapi/internal/AyncapiKafkaProtocol.java @@ -14,6 +14,7 @@ */ package io.aklivity.zilla.runtime.binding.asyncapi.internal; +import java.util.List; import java.util.Map; import java.util.regex.Pattern; import java.util.stream.Collectors; @@ -34,6 +35,7 @@ import io.aklivity.zilla.runtime.engine.config.BindingConfigBuilder; import io.aklivity.zilla.runtime.engine.config.CatalogedConfigBuilder; import io.aklivity.zilla.runtime.engine.config.KindConfig; +import io.aklivity.zilla.runtime.engine.config.MetricRefConfig; import io.aklivity.zilla.runtime.engine.config.NamespaceConfigBuilder; import io.aklivity.zilla.runtime.model.json.config.JsonModelConfig; @@ -60,13 +62,15 @@ public AyncapiKafkaProtocol( @Override public NamespaceConfigBuilder injectProtocolClientCache( - NamespaceConfigBuilder namespace) + NamespaceConfigBuilder namespace, + List metricRefs) { return namespace .binding() .name("kafka_cache_client0") .type("kafka") .kind(KindConfig.CACHE_CLIENT) + .inject(b -> this.injectMetrics(b, metricRefs, "kafka")) .options(KafkaOptionsConfig::builder) .inject(this::injectKafkaTopicOptions) .build() @@ -76,6 +80,7 @@ public NamespaceConfigBuilder injectProtocolClientCache( .name("kafka_cache_server0") .type("kafka") .kind(KindConfig.CACHE_SERVER) + .inject(b -> this.injectMetrics(b, metricRefs, "kafka")) .options(KafkaOptionsConfig::builder) .inject(this::injectKafkaBootstrapOptions) .inject(this::injectKafkaTopicOptions) diff --git a/incubator/binding-openapi-asyncapi/src/main/java/io/aklivity/zilla/runtime/binding/openapi/asyncapi/internal/config/OpenapiAsyncCompositeBindingAdapter.java b/incubator/binding-openapi-asyncapi/src/main/java/io/aklivity/zilla/runtime/binding/openapi/asyncapi/internal/config/OpenapiAsyncCompositeBindingAdapter.java index 0bcd8d57b7..9bc7b11596 100644 --- a/incubator/binding-openapi-asyncapi/src/main/java/io/aklivity/zilla/runtime/binding/openapi/asyncapi/internal/config/OpenapiAsyncCompositeBindingAdapter.java +++ b/incubator/binding-openapi-asyncapi/src/main/java/io/aklivity/zilla/runtime/binding/openapi/asyncapi/internal/config/OpenapiAsyncCompositeBindingAdapter.java @@ -15,6 +15,7 @@ package io.aklivity.zilla.runtime.binding.openapi.asyncapi.internal.config; import static io.aklivity.zilla.runtime.engine.config.KindConfig.PROXY; +import static java.util.Collections.emptyList; import static java.util.stream.Collectors.toList; import java.util.ArrayList; @@ -46,7 +47,10 @@ import io.aklivity.zilla.runtime.engine.config.BindingConfig; import io.aklivity.zilla.runtime.engine.config.BindingConfigBuilder; import io.aklivity.zilla.runtime.engine.config.CompositeBindingAdapterSpi; +import io.aklivity.zilla.runtime.engine.config.MetricRefConfig; +import io.aklivity.zilla.runtime.engine.config.NamespaceConfigBuilder; import io.aklivity.zilla.runtime.engine.config.RouteConfigBuilder; +import io.aklivity.zilla.runtime.engine.config.TelemetryRefConfigBuilder; public final class OpenapiAsyncCompositeBindingAdapter implements CompositeBindingAdapterSpi { @@ -66,7 +70,9 @@ public String type() public BindingConfig adapt( BindingConfig binding) { - OpenapiAsyncapiOptionsConfig options = (OpenapiAsyncapiOptionsConfig) binding.options; + final OpenapiAsyncapiOptionsConfig options = (OpenapiAsyncapiOptionsConfig) binding.options; + final List metricRefs = binding.telemetryRef != null ? + binding.telemetryRef.metricRefs : emptyList(); List routes = binding.routes.stream() .map(r -> new OpenapiAsyncapiRouteConfig(r, options::resolveOpenapiApiId)) @@ -75,10 +81,12 @@ public BindingConfig adapt( return BindingConfig.builder(binding) .composite() .name(String.format("%s/http_kafka", binding.qname)) + .inject(this::injectNamespaceMetric) .binding() .name("http_kafka0") .type("http-kafka") .kind(PROXY) + .inject(b -> this.injectMetrics(b, metricRefs, "http-kafka")) .inject(b -> this.injectHttpKafkaRoutes(b, binding.qname, options.specs, routes)) .build() .build() @@ -213,6 +221,75 @@ private HttpKafkaWithFetchConfigBuilder injectHttpKafkaRouteFetchWith( return fetch; } + protected NamespaceConfigBuilder> injectNamespaceMetric( + NamespaceConfigBuilder> namespace) + { + namespace + .telemetry() + .metric() + .group("stream") + .name("stream.active.received") + .build() + .metric() + .group("stream") + .name("stream.active.sent") + .build() + .metric() + .group("stream") + .name("stream.opens.received") + .build() + .metric() + .group("stream") + .name("stream.opens.sent") + .build() + .metric() + .group("stream") + .name("stream.data.received") + .build() + .metric() + .group("stream") + .name("stream.data.sent") + .build() + .metric() + .group("stream") + .name("stream.errors.received") + .build() + .metric() + .group("stream") + .name("stream.errors.sent") + .build() + .metric() + .group("stream") + .name("stream.closes.received") + .build() + .metric() + .group("stream") + .name("stream.closes.sent") + .build() + .build(); + + return namespace; + } + + protected BindingConfigBuilder injectMetrics( + BindingConfigBuilder binding, + List metricRefs, + String protocol) + { + List metrics = metricRefs.stream() + .filter(m -> m.name.startsWith("stream.")) + .collect(toList()); + + if (!metrics.isEmpty()) + { + final TelemetryRefConfigBuilder> telemetry = binding.telemetry(); + metrics.forEach(telemetry::metric); + telemetry.build(); + } + + return binding; + } + private OpenapiSchemaView resolveSchemaForJsonContentType( Map content, Openapi openApi) diff --git a/incubator/binding-openapi/src/main/java/io/aklivity/zilla/runtime/binding/openapi/internal/config/OpenapiBindingAdapter.java b/incubator/binding-openapi/src/main/java/io/aklivity/zilla/runtime/binding/openapi/internal/config/OpenapiBindingAdapter.java new file mode 100644 index 0000000000..f99cb7f5f4 --- /dev/null +++ b/incubator/binding-openapi/src/main/java/io/aklivity/zilla/runtime/binding/openapi/internal/config/OpenapiBindingAdapter.java @@ -0,0 +1,56 @@ +/* + * Copyright 2021-2023 Aklivity Inc + * + * Licensed under the Aklivity Community License (the "License"); you may not use + * this file except in compliance with the License. You may obtain a copy of the + * License at + * + * https://www.aklivity.io/aklivity-community-license/ + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package io.aklivity.zilla.runtime.binding.openapi.internal.config; + +import static io.aklivity.zilla.runtime.engine.config.KindConfig.CLIENT; +import static io.aklivity.zilla.runtime.engine.config.KindConfig.SERVER; +import static java.util.function.UnaryOperator.identity; + +import java.util.EnumMap; +import java.util.Map; +import java.util.function.UnaryOperator; + +import io.aklivity.zilla.runtime.binding.openapi.internal.OpenapiBinding; +import io.aklivity.zilla.runtime.engine.config.BindingConfig; +import io.aklivity.zilla.runtime.engine.config.CompositeBindingAdapterSpi; +import io.aklivity.zilla.runtime.engine.config.KindConfig; + +public final class OpenapiBindingAdapter implements CompositeBindingAdapterSpi +{ + private final UnaryOperator composite; + + @Override + public String type() + { + return OpenapiBinding.NAME; + } + + public OpenapiBindingAdapter() + { + Map> composites = new EnumMap<>(KindConfig.class); + composites.put(SERVER, new OpenapiServerCompositeBindingAdapter()::adapt); + composites.put(CLIENT, new OpenapiClientCompositeBindingAdapter()::adapt); + UnaryOperator composite = binding -> composites + .getOrDefault(binding.kind, identity()).apply(binding); + this.composite = composite; + } + + @Override + public BindingConfig adapt( + BindingConfig binding) + { + return composite.apply(binding); + } +} diff --git a/incubator/binding-openapi/src/main/java/io/aklivity/zilla/runtime/binding/openapi/internal/config/OpenapiClientCompositeBindingAdapter.java b/incubator/binding-openapi/src/main/java/io/aklivity/zilla/runtime/binding/openapi/internal/config/OpenapiClientCompositeBindingAdapter.java index c49479ef35..f229075da7 100644 --- a/incubator/binding-openapi/src/main/java/io/aklivity/zilla/runtime/binding/openapi/internal/config/OpenapiClientCompositeBindingAdapter.java +++ b/incubator/binding-openapi/src/main/java/io/aklivity/zilla/runtime/binding/openapi/internal/config/OpenapiClientCompositeBindingAdapter.java @@ -15,9 +15,11 @@ package io.aklivity.zilla.runtime.binding.openapi.internal.config; import static io.aklivity.zilla.runtime.engine.config.KindConfig.CLIENT; +import static java.util.Collections.emptyList; import static java.util.Objects.requireNonNull; import java.net.URI; +import java.util.List; import java.util.Map; import io.aklivity.zilla.runtime.binding.http.config.HttpOptionsConfig; @@ -27,7 +29,6 @@ import io.aklivity.zilla.runtime.binding.http.config.HttpResponseConfigBuilder; import io.aklivity.zilla.runtime.binding.openapi.config.OpenapiConfig; import io.aklivity.zilla.runtime.binding.openapi.config.OpenapiOptionsConfig; -import io.aklivity.zilla.runtime.binding.openapi.internal.OpenapiBinding; import io.aklivity.zilla.runtime.binding.openapi.internal.model.Openapi; import io.aklivity.zilla.runtime.binding.openapi.internal.model.OpenapiHeader; import io.aklivity.zilla.runtime.binding.openapi.internal.model.OpenapiResponse; @@ -41,38 +42,21 @@ import io.aklivity.zilla.runtime.binding.tls.config.TlsOptionsConfig; import io.aklivity.zilla.runtime.engine.config.BindingConfig; import io.aklivity.zilla.runtime.engine.config.BindingConfigBuilder; -import io.aklivity.zilla.runtime.engine.config.CompositeBindingAdapterSpi; +import io.aklivity.zilla.runtime.engine.config.MetricRefConfig; import io.aklivity.zilla.runtime.engine.config.ModelConfig; import io.aklivity.zilla.runtime.engine.config.NamespaceConfigBuilder; -import io.aklivity.zilla.runtime.model.core.config.IntegerModelConfig; -import io.aklivity.zilla.runtime.model.core.config.StringModelConfig; import io.aklivity.zilla.runtime.model.json.config.JsonModelConfig; -public final class OpenapiClientCompositeBindingAdapter implements CompositeBindingAdapterSpi +public final class OpenapiClientCompositeBindingAdapter extends OpenapiCompositeBindingAdapter { - private static final String INLINE_CATALOG_NAME = "catalog0"; - - private final Map models = Map.of( - "string", StringModelConfig.builder().build(), - "integer", IntegerModelConfig.builder().build() - ); - - @Override - public String type() - { - return OpenapiBinding.NAME; - } - - public OpenapiClientCompositeBindingAdapter() - { - } - @Override public BindingConfig adapt( BindingConfig binding) { - OpenapiOptionsConfig options = (OpenapiOptionsConfig) binding.options; - OpenapiConfig openapiConfig = options.openapis.get(0); + final OpenapiOptionsConfig options = (OpenapiOptionsConfig) binding.options; + final OpenapiConfig openapiConfig = options.openapis.get(0); + final List metricRefs = binding.telemetryRef != null ? + binding.telemetryRef.metricRefs : emptyList(); final Openapi openApi = openapiConfig.openapi; final int[] httpsPorts = resolvePortsForScheme(openApi, "https"); @@ -81,19 +65,22 @@ public BindingConfig adapt( return BindingConfig.builder(binding) .composite() .name(String.format(binding.qname, "$composite")) + .inject(this::injectNamespaceMetric) .binding() .name("http_client0") .type("http") .kind(CLIENT) .inject(b -> this.injectHttpClientOptions(b, openApi)) + .inject(b -> this.injectMetrics(b, metricRefs, "http")) .exit(secure ? "tls_client0" : "tcp_client0") .build() - .inject(b -> this.injectTlsClient(b, options.tls, secure)) + .inject(b -> this.injectTlsClient(b, options.tls, secure, metricRefs)) .binding() .name("tcp_client0") .type("tcp") .kind(CLIENT) .options(options.tcp) + .inject(b -> this.injectMetrics(b, metricRefs, "tcp")) .build() .build() .build(); @@ -235,7 +222,8 @@ private HttpResponseConfigBuilder injectResponseHeaders( private NamespaceConfigBuilder injectTlsClient( NamespaceConfigBuilder namespace, TlsOptionsConfig tlsConfig, - boolean secure) + boolean secure, + List metricRefs) { if (secure) { @@ -246,10 +234,10 @@ private NamespaceConfigBuilder injectTlsClient( .kind(CLIENT) .options(tlsConfig) .vault("client") + .inject(b -> injectMetrics(b, metricRefs, "tls")) .exit("tcp_client0") .build(); } return namespace; } - } diff --git a/incubator/binding-openapi/src/main/java/io/aklivity/zilla/runtime/binding/openapi/internal/config/OpenapiCompositeBindingAdapter.java b/incubator/binding-openapi/src/main/java/io/aklivity/zilla/runtime/binding/openapi/internal/config/OpenapiCompositeBindingAdapter.java index 06ad49a40c..c374ff8eae 100644 --- a/incubator/binding-openapi/src/main/java/io/aklivity/zilla/runtime/binding/openapi/internal/config/OpenapiCompositeBindingAdapter.java +++ b/incubator/binding-openapi/src/main/java/io/aklivity/zilla/runtime/binding/openapi/internal/config/OpenapiCompositeBindingAdapter.java @@ -14,43 +14,105 @@ */ package io.aklivity.zilla.runtime.binding.openapi.internal.config; -import static io.aklivity.zilla.runtime.engine.config.KindConfig.CLIENT; -import static io.aklivity.zilla.runtime.engine.config.KindConfig.SERVER; -import static java.util.function.UnaryOperator.identity; -import java.util.EnumMap; +import static java.util.stream.Collectors.toList; + +import java.util.List; import java.util.Map; -import java.util.function.UnaryOperator; +import java.util.regex.Matcher; +import java.util.regex.Pattern; -import io.aklivity.zilla.runtime.binding.openapi.internal.OpenapiBinding; import io.aklivity.zilla.runtime.engine.config.BindingConfig; -import io.aklivity.zilla.runtime.engine.config.CompositeBindingAdapterSpi; -import io.aklivity.zilla.runtime.engine.config.KindConfig; +import io.aklivity.zilla.runtime.engine.config.BindingConfigBuilder; +import io.aklivity.zilla.runtime.engine.config.MetricRefConfig; +import io.aklivity.zilla.runtime.engine.config.ModelConfig; +import io.aklivity.zilla.runtime.engine.config.NamespaceConfigBuilder; +import io.aklivity.zilla.runtime.engine.config.TelemetryRefConfigBuilder; +import io.aklivity.zilla.runtime.model.core.config.IntegerModelConfig; +import io.aklivity.zilla.runtime.model.core.config.StringModelConfig; -public final class OpenapiCompositeBindingAdapter implements CompositeBindingAdapterSpi +public abstract class OpenapiCompositeBindingAdapter { - private final UnaryOperator composite; + protected static final String INLINE_CATALOG_NAME = "catalog0"; + protected static final String INLINE_CATALOG_TYPE = "inline"; + protected static final String VERSION_LATEST = "latest"; + protected static final Pattern JSON_CONTENT_TYPE = Pattern.compile("^application/(?:.+\\+)?json$"); - @Override - public String type() - { - return OpenapiBinding.NAME; - } + protected final Matcher jsonContentType = JSON_CONTENT_TYPE.matcher(""); + protected final Map models = Map.of( + "string", StringModelConfig.builder().build(), + "integer", IntegerModelConfig.builder().build() + ); - public OpenapiCompositeBindingAdapter() + protected NamespaceConfigBuilder> injectNamespaceMetric( + NamespaceConfigBuilder> namespace) { - Map> composites = new EnumMap<>(KindConfig.class); - composites.put(SERVER, new OpenapiServerCompositeBindingAdapter()::adapt); - composites.put(CLIENT, new OpenapiClientCompositeBindingAdapter()::adapt); - UnaryOperator composite = binding -> composites - .getOrDefault(binding.kind, identity()).apply(binding); - this.composite = composite; + namespace + .telemetry() + .metric() + .group("stream") + .name("stream.active.received") + .build() + .metric() + .group("stream") + .name("stream.active.sent") + .build() + .metric() + .group("stream") + .name("stream.opens.received") + .build() + .metric() + .group("stream") + .name("stream.opens.sent") + .build() + .metric() + .group("stream") + .name("stream.data.received") + .build() + .metric() + .group("stream") + .name("stream.data.sent") + .build() + .metric() + .group("stream") + .name("stream.errors.received") + .build() + .metric() + .group("stream") + .name("stream.errors.sent") + .build() + .metric() + .group("stream") + .name("stream.closes.received") + .build() + .metric() + .group("stream") + .name("stream.closes.sent") + .build() + .build(); + + return namespace; } - @Override - public BindingConfig adapt( - BindingConfig binding) + protected BindingConfigBuilder injectMetrics( + BindingConfigBuilder binding, + List metricRefs, + String protocol) { - return composite.apply(binding); + List metrics = metricRefs.stream() + .filter(m -> m.name.startsWith("stream.")) + .collect(toList()); + + if (!metrics.isEmpty()) + { + final TelemetryRefConfigBuilder> telemetry = binding.telemetry(); + metrics.forEach(telemetry::metric); + telemetry.build(); + } + + return binding; } + + public abstract BindingConfig adapt( + BindingConfig binding); } diff --git a/incubator/binding-openapi/src/main/java/io/aklivity/zilla/runtime/binding/openapi/internal/config/OpenapiServerCompositeBindingAdapter.java b/incubator/binding-openapi/src/main/java/io/aklivity/zilla/runtime/binding/openapi/internal/config/OpenapiServerCompositeBindingAdapter.java index e12da7de4e..809928bb98 100644 --- a/incubator/binding-openapi/src/main/java/io/aklivity/zilla/runtime/binding/openapi/internal/config/OpenapiServerCompositeBindingAdapter.java +++ b/incubator/binding-openapi/src/main/java/io/aklivity/zilla/runtime/binding/openapi/internal/config/OpenapiServerCompositeBindingAdapter.java @@ -16,14 +16,13 @@ import static io.aklivity.zilla.runtime.binding.http.config.HttpPolicyConfig.CROSS_ORIGIN; import static io.aklivity.zilla.runtime.engine.config.KindConfig.SERVER; +import static java.util.Collections.emptyList; import static java.util.Objects.requireNonNull; import static org.agrona.LangUtil.rethrowUnchecked; import java.net.URI; import java.util.List; import java.util.Map; -import java.util.regex.Matcher; -import java.util.regex.Pattern; import jakarta.json.bind.Jsonb; import jakarta.json.bind.JsonbBuilder; @@ -38,7 +37,6 @@ import io.aklivity.zilla.runtime.binding.http.config.HttpRequestConfigBuilder; import io.aklivity.zilla.runtime.binding.openapi.config.OpenapiConfig; import io.aklivity.zilla.runtime.binding.openapi.config.OpenapiOptionsConfig; -import io.aklivity.zilla.runtime.binding.openapi.internal.OpenapiBinding; import io.aklivity.zilla.runtime.binding.openapi.internal.model.Openapi; import io.aklivity.zilla.runtime.binding.openapi.internal.model.OpenapiMediaType; import io.aklivity.zilla.runtime.binding.openapi.internal.model.OpenapiOperation; @@ -55,40 +53,23 @@ import io.aklivity.zilla.runtime.catalog.inline.config.InlineSchemaConfigBuilder; import io.aklivity.zilla.runtime.engine.config.BindingConfig; import io.aklivity.zilla.runtime.engine.config.BindingConfigBuilder; -import io.aklivity.zilla.runtime.engine.config.CompositeBindingAdapterSpi; import io.aklivity.zilla.runtime.engine.config.GuardedConfigBuilder; +import io.aklivity.zilla.runtime.engine.config.MetricRefConfig; import io.aklivity.zilla.runtime.engine.config.ModelConfig; import io.aklivity.zilla.runtime.engine.config.NamespaceConfigBuilder; import io.aklivity.zilla.runtime.engine.config.RouteConfigBuilder; -import io.aklivity.zilla.runtime.model.core.config.IntegerModelConfig; -import io.aklivity.zilla.runtime.model.core.config.StringModelConfig; import io.aklivity.zilla.runtime.model.json.config.JsonModelConfig; -public final class OpenapiServerCompositeBindingAdapter implements CompositeBindingAdapterSpi +public final class OpenapiServerCompositeBindingAdapter extends OpenapiCompositeBindingAdapter { - private static final String INLINE_CATALOG_NAME = "catalog0"; - private static final String INLINE_CATALOG_TYPE = "inline"; - private static final String VERSION_LATEST = "latest"; - private static final Pattern JSON_CONTENT_TYPE = Pattern.compile("^application/(?:.+\\+)?json$"); - - private final Matcher jsonContentType = JSON_CONTENT_TYPE.matcher(""); - private final Map models = Map.of( - "string", StringModelConfig.builder().build(), - "integer", IntegerModelConfig.builder().build() - ); - - @Override - public String type() - { - return OpenapiBinding.NAME; - } - @Override public BindingConfig adapt( BindingConfig binding) { - OpenapiOptionsConfig options = (OpenapiOptionsConfig) binding.options; - OpenapiConfig openapiConfig = options.openapis.get(0); + final OpenapiOptionsConfig options = (OpenapiOptionsConfig) binding.options; + final OpenapiConfig openapiConfig = options.openapis.get(0); + final List metricRefs = binding.telemetryRef != null ? + binding.telemetryRef.metricRefs : emptyList(); final Openapi openApi = openapiConfig.openapi; final TlsOptionsConfig tlsOption = options.tls != null ? options.tls : null; @@ -107,6 +88,7 @@ public BindingConfig adapt( return BindingConfig.builder(binding) .composite() .name(String.format("%s/http", binding.qname)) + .inject(this::injectNamespaceMetric) .inject(n -> this.injectCatalog(n, openApi)) .binding() .name("tcp_server0") @@ -118,8 +100,9 @@ public BindingConfig adapt( .build() .inject(b -> this.injectPlainTcpRoute(b, httpPorts, secure)) .inject(b -> this.injectTlsTcpRoute(b, httpsPorts, secure)) + .inject(b -> this.injectMetrics(b, metricRefs, "tcp")) .build() - .inject(n -> this.injectTlsServer(n, qvault, tlsOption, secure)) + .inject(n -> this.injectTlsServer(n, qvault, tlsOption, secure, metricRefs)) .binding() .name("http_server0") .type("http") @@ -132,6 +115,7 @@ public BindingConfig adapt( .inject(r -> this.injectHttpServerRequests(r, openApi)) .build() .inject(b -> this.injectHttpServerRoutes(b, openApi, binding.qname, guardName, securitySchemes)) + .inject(b -> this.injectMetrics(b, metricRefs, "http")) .build() .build() .build(); @@ -177,7 +161,7 @@ private NamespaceConfigBuilder injectTlsServer( NamespaceConfigBuilder namespace, String vault, TlsOptionsConfig tls, - boolean secure) + boolean secure, List metricRefs) { if (secure) { @@ -189,6 +173,7 @@ private NamespaceConfigBuilder injectTlsServer( .options(tls) .vault(vault) .exit("http_server0") + .inject(b -> this.injectMetrics(b, metricRefs, "tls")) .build(); } return namespace; diff --git a/incubator/binding-openapi/src/main/moditect/module-info.java b/incubator/binding-openapi/src/main/moditect/module-info.java index 9e8001e5dd..11c6001a1c 100644 --- a/incubator/binding-openapi/src/main/moditect/module-info.java +++ b/incubator/binding-openapi/src/main/moditect/module-info.java @@ -35,7 +35,7 @@ with io.aklivity.zilla.runtime.binding.openapi.internal.OpenapiBindingFactorySpi; provides io.aklivity.zilla.runtime.engine.config.CompositeBindingAdapterSpi - with io.aklivity.zilla.runtime.binding.openapi.internal.config.OpenapiCompositeBindingAdapter; + with io.aklivity.zilla.runtime.binding.openapi.internal.config.OpenapiBindingAdapter; provides io.aklivity.zilla.runtime.engine.config.OptionsConfigAdapterSpi with io.aklivity.zilla.runtime.binding.openapi.internal.config.OpenapiOptionsConfigAdapter; diff --git a/incubator/binding-openapi/src/main/resources/META-INF/services/io.aklivity.zilla.runtime.engine.config.CompositeBindingAdapterSpi b/incubator/binding-openapi/src/main/resources/META-INF/services/io.aklivity.zilla.runtime.engine.config.CompositeBindingAdapterSpi index 499d92471c..392312312e 100644 --- a/incubator/binding-openapi/src/main/resources/META-INF/services/io.aklivity.zilla.runtime.engine.config.CompositeBindingAdapterSpi +++ b/incubator/binding-openapi/src/main/resources/META-INF/services/io.aklivity.zilla.runtime.engine.config.CompositeBindingAdapterSpi @@ -1 +1 @@ -io.aklivity.zilla.runtime.binding.openapi.internal.config.OpenapiCompositeBindingAdapter +io.aklivity.zilla.runtime.binding.openapi.internal.config.OpenapiBindingAdapter From bf0ec8c7692ea7ec53e1d139d6e9aff5d83cf33b Mon Sep 17 00:00:00 2001 From: Akram Yakubov Date: Tue, 19 Mar 2024 12:54:04 +0000 Subject: [PATCH 3/4] Resolve conflict --- .../internal/config/OpenapiCompositeBindingAdapter.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/incubator/binding-openapi/src/main/java/io/aklivity/zilla/runtime/binding/openapi/internal/config/OpenapiCompositeBindingAdapter.java b/incubator/binding-openapi/src/main/java/io/aklivity/zilla/runtime/binding/openapi/internal/config/OpenapiCompositeBindingAdapter.java index c374ff8eae..34d71cdbc5 100644 --- a/incubator/binding-openapi/src/main/java/io/aklivity/zilla/runtime/binding/openapi/internal/config/OpenapiCompositeBindingAdapter.java +++ b/incubator/binding-openapi/src/main/java/io/aklivity/zilla/runtime/binding/openapi/internal/config/OpenapiCompositeBindingAdapter.java @@ -28,7 +28,7 @@ import io.aklivity.zilla.runtime.engine.config.ModelConfig; import io.aklivity.zilla.runtime.engine.config.NamespaceConfigBuilder; import io.aklivity.zilla.runtime.engine.config.TelemetryRefConfigBuilder; -import io.aklivity.zilla.runtime.model.core.config.IntegerModelConfig; +import io.aklivity.zilla.runtime.model.core.config.Int32ModelConfig; import io.aklivity.zilla.runtime.model.core.config.StringModelConfig; public abstract class OpenapiCompositeBindingAdapter @@ -41,7 +41,7 @@ public abstract class OpenapiCompositeBindingAdapter protected final Matcher jsonContentType = JSON_CONTENT_TYPE.matcher(""); protected final Map models = Map.of( "string", StringModelConfig.builder().build(), - "integer", IntegerModelConfig.builder().build() + "integer", Int32ModelConfig.builder().build() ); protected NamespaceConfigBuilder> injectNamespaceMetric( From b6074f5208826369b00456fc39ae3fc1b5297266 Mon Sep 17 00:00:00 2001 From: Akram Yakubov Date: Tue, 19 Mar 2024 15:33:51 +0000 Subject: [PATCH 4/4] Ignore namespace metrics if not specified --- ...AsyncapiClientCompositeBindingAdapter.java | 2 +- .../AsyncapiCompositeBindingAdapter.java | 92 +++++++++--------- .../AsyncapiProxyCompositeBindingAdapter.java | 2 +- ...AsyncapiServerCompositeBindingAdapter.java | 2 +- .../OpenapiAsyncCompositeBindingAdapter.java | 94 ++++++++++--------- .../OpenapiClientCompositeBindingAdapter.java | 2 +- .../OpenapiCompositeBindingAdapter.java | 92 +++++++++--------- .../OpenapiServerCompositeBindingAdapter.java | 2 +- 8 files changed, 150 insertions(+), 138 deletions(-) diff --git a/incubator/binding-asyncapi/src/main/java/io/aklivity/zilla/runtime/binding/asyncapi/internal/AsyncapiClientCompositeBindingAdapter.java b/incubator/binding-asyncapi/src/main/java/io/aklivity/zilla/runtime/binding/asyncapi/internal/AsyncapiClientCompositeBindingAdapter.java index f2467929e0..d8d7c354cb 100644 --- a/incubator/binding-asyncapi/src/main/java/io/aklivity/zilla/runtime/binding/asyncapi/internal/AsyncapiClientCompositeBindingAdapter.java +++ b/incubator/binding-asyncapi/src/main/java/io/aklivity/zilla/runtime/binding/asyncapi/internal/AsyncapiClientCompositeBindingAdapter.java @@ -58,7 +58,7 @@ public BindingConfig adapt( return BindingConfig.builder(binding) .composite() .name(String.format("%s.%s", qname, "$composite")) - .inject(this::injectNamespaceMetric) + .inject(n -> this.injectNamespaceMetric(n, !metricRefs.isEmpty())) .inject(n -> this.injectCatalog(n, asyncapi)) .inject(n -> protocol.injectProtocolClientCache(n, metricRefs)) .binding() diff --git a/incubator/binding-asyncapi/src/main/java/io/aklivity/zilla/runtime/binding/asyncapi/internal/AsyncapiCompositeBindingAdapter.java b/incubator/binding-asyncapi/src/main/java/io/aklivity/zilla/runtime/binding/asyncapi/internal/AsyncapiCompositeBindingAdapter.java index 3fd190ccb7..9243a80d1d 100644 --- a/incubator/binding-asyncapi/src/main/java/io/aklivity/zilla/runtime/binding/asyncapi/internal/AsyncapiCompositeBindingAdapter.java +++ b/incubator/binding-asyncapi/src/main/java/io/aklivity/zilla/runtime/binding/asyncapi/internal/AsyncapiCompositeBindingAdapter.java @@ -176,51 +176,55 @@ protected BindingConfigBuilder injectMetrics( } protected NamespaceConfigBuilder> injectNamespaceMetric( - NamespaceConfigBuilder> namespace) + NamespaceConfigBuilder> namespace, + boolean hasMetrics) { - namespace - .telemetry() - .metric() - .group("stream") - .name("stream.active.received") - .build() - .metric() - .group("stream") - .name("stream.active.sent") - .build() - .metric() - .group("stream") - .name("stream.opens.received") - .build() - .metric() - .group("stream") - .name("stream.opens.sent") - .build() - .metric() - .group("stream") - .name("stream.data.received") - .build() - .metric() - .group("stream") - .name("stream.data.sent") - .build() - .metric() - .group("stream") - .name("stream.errors.received") - .build() - .metric() - .group("stream") - .name("stream.errors.sent") - .build() - .metric() - .group("stream") - .name("stream.closes.received") - .build() - .metric() - .group("stream") - .name("stream.closes.sent") - .build() - .build(); + if (hasMetrics) + { + namespace + .telemetry() + .metric() + .group("stream") + .name("stream.active.received") + .build() + .metric() + .group("stream") + .name("stream.active.sent") + .build() + .metric() + .group("stream") + .name("stream.opens.received") + .build() + .metric() + .group("stream") + .name("stream.opens.sent") + .build() + .metric() + .group("stream") + .name("stream.data.received") + .build() + .metric() + .group("stream") + .name("stream.data.sent") + .build() + .metric() + .group("stream") + .name("stream.errors.received") + .build() + .metric() + .group("stream") + .name("stream.errors.sent") + .build() + .metric() + .group("stream") + .name("stream.closes.received") + .build() + .metric() + .group("stream") + .name("stream.closes.sent") + .build() + .build(); + } return namespace; } diff --git a/incubator/binding-asyncapi/src/main/java/io/aklivity/zilla/runtime/binding/asyncapi/internal/AsyncapiProxyCompositeBindingAdapter.java b/incubator/binding-asyncapi/src/main/java/io/aklivity/zilla/runtime/binding/asyncapi/internal/AsyncapiProxyCompositeBindingAdapter.java index 49b796879b..1309fabaaa 100644 --- a/incubator/binding-asyncapi/src/main/java/io/aklivity/zilla/runtime/binding/asyncapi/internal/AsyncapiProxyCompositeBindingAdapter.java +++ b/incubator/binding-asyncapi/src/main/java/io/aklivity/zilla/runtime/binding/asyncapi/internal/AsyncapiProxyCompositeBindingAdapter.java @@ -87,7 +87,7 @@ public BindingConfig adapt( return BindingConfig.builder(binding) .composite() .name(String.format("%s/%s", qname, "mqtt-kafka")) - .inject(this::injectNamespaceMetric) + .inject(n -> this.injectNamespaceMetric(n, !metricRefs.isEmpty())) .binding() .name("mqtt_kafka_proxy0") .type("mqtt-kafka") diff --git a/incubator/binding-asyncapi/src/main/java/io/aklivity/zilla/runtime/binding/asyncapi/internal/AsyncapiServerCompositeBindingAdapter.java b/incubator/binding-asyncapi/src/main/java/io/aklivity/zilla/runtime/binding/asyncapi/internal/AsyncapiServerCompositeBindingAdapter.java index 3209c12494..7cfab97f87 100644 --- a/incubator/binding-asyncapi/src/main/java/io/aklivity/zilla/runtime/binding/asyncapi/internal/AsyncapiServerCompositeBindingAdapter.java +++ b/incubator/binding-asyncapi/src/main/java/io/aklivity/zilla/runtime/binding/asyncapi/internal/AsyncapiServerCompositeBindingAdapter.java @@ -63,7 +63,7 @@ public BindingConfig adapt( return BindingConfig.builder(binding) .composite() .name(String.format("%s/%s", qname, protocol.scheme)) - .inject(this::injectNamespaceMetric) + .inject(n -> this.injectNamespaceMetric(n, !metricRefs.isEmpty())) .inject(n -> this.injectCatalog(n, asyncapi)) .binding() .name("tcp_server0") diff --git a/incubator/binding-openapi-asyncapi/src/main/java/io/aklivity/zilla/runtime/binding/openapi/asyncapi/internal/config/OpenapiAsyncCompositeBindingAdapter.java b/incubator/binding-openapi-asyncapi/src/main/java/io/aklivity/zilla/runtime/binding/openapi/asyncapi/internal/config/OpenapiAsyncCompositeBindingAdapter.java index 9bc7b11596..5ba3db36a2 100644 --- a/incubator/binding-openapi-asyncapi/src/main/java/io/aklivity/zilla/runtime/binding/openapi/asyncapi/internal/config/OpenapiAsyncCompositeBindingAdapter.java +++ b/incubator/binding-openapi-asyncapi/src/main/java/io/aklivity/zilla/runtime/binding/openapi/asyncapi/internal/config/OpenapiAsyncCompositeBindingAdapter.java @@ -81,7 +81,7 @@ public BindingConfig adapt( return BindingConfig.builder(binding) .composite() .name(String.format("%s/http_kafka", binding.qname)) - .inject(this::injectNamespaceMetric) + .inject(n -> this.injectNamespaceMetric(n, !metricRefs.isEmpty())) .binding() .name("http_kafka0") .type("http-kafka") @@ -222,51 +222,55 @@ private HttpKafkaWithFetchConfigBuilder injectHttpKafkaRouteFetchWith( } protected NamespaceConfigBuilder> injectNamespaceMetric( - NamespaceConfigBuilder> namespace) + NamespaceConfigBuilder> namespace, + boolean hasMetrics) { - namespace - .telemetry() - .metric() - .group("stream") - .name("stream.active.received") - .build() - .metric() - .group("stream") - .name("stream.active.sent") - .build() - .metric() - .group("stream") - .name("stream.opens.received") - .build() - .metric() - .group("stream") - .name("stream.opens.sent") - .build() - .metric() - .group("stream") - .name("stream.data.received") - .build() - .metric() - .group("stream") - .name("stream.data.sent") - .build() - .metric() - .group("stream") - .name("stream.errors.received") - .build() - .metric() - .group("stream") - .name("stream.errors.sent") - .build() - .metric() - .group("stream") - .name("stream.closes.received") - .build() - .metric() - .group("stream") - .name("stream.closes.sent") - .build() - .build(); + if (hasMetrics) + { + namespace + .telemetry() + .metric() + .group("stream") + .name("stream.active.received") + .build() + .metric() + .group("stream") + .name("stream.active.sent") + .build() + .metric() + .group("stream") + .name("stream.opens.received") + .build() + .metric() + .group("stream") + .name("stream.opens.sent") + .build() + .metric() + .group("stream") + .name("stream.data.received") + .build() + .metric() + .group("stream") + .name("stream.data.sent") + .build() + .metric() + .group("stream") + .name("stream.errors.received") + .build() + .metric() + .group("stream") + .name("stream.errors.sent") + .build() + .metric() + .group("stream") + .name("stream.closes.received") + .build() + .metric() + .group("stream") + .name("stream.closes.sent") + .build() + .build(); + } return namespace; } diff --git a/incubator/binding-openapi/src/main/java/io/aklivity/zilla/runtime/binding/openapi/internal/config/OpenapiClientCompositeBindingAdapter.java b/incubator/binding-openapi/src/main/java/io/aklivity/zilla/runtime/binding/openapi/internal/config/OpenapiClientCompositeBindingAdapter.java index f229075da7..c4171996da 100644 --- a/incubator/binding-openapi/src/main/java/io/aklivity/zilla/runtime/binding/openapi/internal/config/OpenapiClientCompositeBindingAdapter.java +++ b/incubator/binding-openapi/src/main/java/io/aklivity/zilla/runtime/binding/openapi/internal/config/OpenapiClientCompositeBindingAdapter.java @@ -65,7 +65,7 @@ public BindingConfig adapt( return BindingConfig.builder(binding) .composite() .name(String.format(binding.qname, "$composite")) - .inject(this::injectNamespaceMetric) + .inject(n -> this.injectNamespaceMetric(n, !metricRefs.isEmpty())) .binding() .name("http_client0") .type("http") diff --git a/incubator/binding-openapi/src/main/java/io/aklivity/zilla/runtime/binding/openapi/internal/config/OpenapiCompositeBindingAdapter.java b/incubator/binding-openapi/src/main/java/io/aklivity/zilla/runtime/binding/openapi/internal/config/OpenapiCompositeBindingAdapter.java index 34d71cdbc5..a24b0525ae 100644 --- a/incubator/binding-openapi/src/main/java/io/aklivity/zilla/runtime/binding/openapi/internal/config/OpenapiCompositeBindingAdapter.java +++ b/incubator/binding-openapi/src/main/java/io/aklivity/zilla/runtime/binding/openapi/internal/config/OpenapiCompositeBindingAdapter.java @@ -45,51 +45,55 @@ public abstract class OpenapiCompositeBindingAdapter ); protected NamespaceConfigBuilder> injectNamespaceMetric( - NamespaceConfigBuilder> namespace) + NamespaceConfigBuilder> namespace, + boolean hasMetrics) { - namespace - .telemetry() - .metric() - .group("stream") - .name("stream.active.received") - .build() - .metric() - .group("stream") - .name("stream.active.sent") - .build() - .metric() - .group("stream") - .name("stream.opens.received") - .build() - .metric() - .group("stream") - .name("stream.opens.sent") - .build() - .metric() - .group("stream") - .name("stream.data.received") - .build() - .metric() - .group("stream") - .name("stream.data.sent") - .build() - .metric() - .group("stream") - .name("stream.errors.received") - .build() - .metric() - .group("stream") - .name("stream.errors.sent") - .build() - .metric() - .group("stream") - .name("stream.closes.received") - .build() - .metric() - .group("stream") - .name("stream.closes.sent") - .build() - .build(); + if (hasMetrics) + { + namespace + .telemetry() + .metric() + .group("stream") + .name("stream.active.received") + .build() + .metric() + .group("stream") + .name("stream.active.sent") + .build() + .metric() + .group("stream") + .name("stream.opens.received") + .build() + .metric() + .group("stream") + .name("stream.opens.sent") + .build() + .metric() + .group("stream") + .name("stream.data.received") + .build() + .metric() + .group("stream") + .name("stream.data.sent") + .build() + .metric() + .group("stream") + .name("stream.errors.received") + .build() + .metric() + .group("stream") + .name("stream.errors.sent") + .build() + .metric() + .group("stream") + .name("stream.closes.received") + .build() + .metric() + .group("stream") + .name("stream.closes.sent") + .build() + .build(); + } return namespace; } diff --git a/incubator/binding-openapi/src/main/java/io/aklivity/zilla/runtime/binding/openapi/internal/config/OpenapiServerCompositeBindingAdapter.java b/incubator/binding-openapi/src/main/java/io/aklivity/zilla/runtime/binding/openapi/internal/config/OpenapiServerCompositeBindingAdapter.java index 809928bb98..ff70d4e4db 100644 --- a/incubator/binding-openapi/src/main/java/io/aklivity/zilla/runtime/binding/openapi/internal/config/OpenapiServerCompositeBindingAdapter.java +++ b/incubator/binding-openapi/src/main/java/io/aklivity/zilla/runtime/binding/openapi/internal/config/OpenapiServerCompositeBindingAdapter.java @@ -88,7 +88,7 @@ public BindingConfig adapt( return BindingConfig.builder(binding) .composite() .name(String.format("%s/http", binding.qname)) - .inject(this::injectNamespaceMetric) + .inject(namespace -> injectNamespaceMetric(namespace, !metricRefs.isEmpty())) .inject(n -> this.injectCatalog(n, openApi)) .binding() .name("tcp_server0")