Skip to content

Commit

Permalink
adds support for Micrometer Observations (#1075)
Browse files Browse the repository at this point in the history
  • Loading branch information
marcingrzejszczak authored Oct 25, 2022
1 parent 000f6da commit 32da131
Show file tree
Hide file tree
Showing 20 changed files with 1,664 additions and 4 deletions.
7 changes: 5 additions & 2 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,8 @@ subprojects {
ext['slf4j.version'] = '1.7.36'
ext['jmh.version'] = '1.35'
ext['junit.version'] = '5.8.1'
ext['micrometer.version'] = '1.8.4'
ext['micrometer.version'] = '1.10.0-RC1'
ext['micrometer-tracing.version'] = '1.0.0-RC1'
ext['assertj.version'] = '3.22.0'
ext['netflix.limits.version'] = '0.3.6'
ext['bouncycastle-bcpkix.version'] = '1.70'
Expand All @@ -69,14 +70,15 @@ subprojects {
mavenBom "io.projectreactor:reactor-bom:${ext['reactor-bom.version']}"
mavenBom "io.netty:netty-bom:${ext['netty-bom.version']}"
mavenBom "org.junit:junit-bom:${ext['junit.version']}"
mavenBom "io.micrometer:micrometer-bom:${ext['micrometer.version']}"
mavenBom "io.micrometer:micrometer-tracing-bom:${ext['micrometer-tracing.version']}"
}

dependencies {
dependency "com.netflix.concurrency-limits:concurrency-limits-core:${ext['netflix.limits.version']}"
dependency "ch.qos.logback:logback-classic:${ext['logback.version']}"
dependency "io.netty:netty-tcnative-boringssl-static:${ext['netty-boringssl.version']}"
dependency "org.bouncycastle:bcpkix-jdk15on:${ext['bouncycastle-bcpkix.version']}"
dependency "io.micrometer:micrometer-core:${ext['micrometer.version']}"
dependency "org.assertj:assertj-core:${ext['assertj.version']}"
dependency "org.hdrhistogram:HdrHistogram:${ext['hdrhistogram.version']}"
dependency "org.slf4j:slf4j-api:${ext['slf4j.version']}"
Expand All @@ -103,6 +105,7 @@ subprojects {
content {
includeGroup "io.projectreactor"
includeGroup "io.projectreactor.netty"
includeGroup "io.micrometer"
}
}

Expand Down
6 changes: 6 additions & 0 deletions rsocket-examples/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,10 @@ dependencies {
implementation project(':rsocket-transport-netty')

implementation 'com.netflix.concurrency-limits:concurrency-limits-core'
implementation "io.micrometer:micrometer-core"
implementation "io.micrometer:micrometer-tracing"
implementation project(":rsocket-micrometer")
testImplementation 'org.awaitility:awaitility'

runtimeOnly 'ch.qos.logback:logback-classic'

Expand All @@ -33,6 +37,8 @@ dependencies {
testImplementation 'org.mockito:mockito-core'
testImplementation 'org.assertj:assertj-core'
testImplementation 'io.projectreactor:reactor-test'
testImplementation "io.micrometer:micrometer-test"
testImplementation "io.micrometer:micrometer-tracing-integration-test"

testRuntimeOnly 'org.junit.jupiter:junit-jupiter-engine'
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,246 @@
/*
* Copyright 2015-2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.rsocket.integration.observation;

import static org.assertj.core.api.Assertions.assertThat;

import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.Tag;
import io.micrometer.core.instrument.Tags;
import io.micrometer.core.instrument.observation.DefaultMeterObservationHandler;
import io.micrometer.core.instrument.simple.SimpleMeterRegistry;
import io.micrometer.core.tck.MeterRegistryAssert;
import io.micrometer.observation.Observation;
import io.micrometer.observation.ObservationHandler;
import io.micrometer.observation.ObservationRegistry;
import io.micrometer.tracing.test.SampleTestRunner;
import io.micrometer.tracing.test.reporter.BuildingBlocks;
import io.micrometer.tracing.test.simple.SpansAssert;
import io.rsocket.Payload;
import io.rsocket.RSocket;
import io.rsocket.core.RSocketConnector;
import io.rsocket.core.RSocketServer;
import io.rsocket.micrometer.observation.ByteBufGetter;
import io.rsocket.micrometer.observation.ByteBufSetter;
import io.rsocket.micrometer.observation.ObservationRequesterRSocketProxy;
import io.rsocket.micrometer.observation.ObservationResponderRSocketProxy;
import io.rsocket.micrometer.observation.RSocketRequesterTracingObservationHandler;
import io.rsocket.micrometer.observation.RSocketResponderTracingObservationHandler;
import io.rsocket.plugins.RSocketInterceptor;
import io.rsocket.transport.netty.client.TcpClientTransport;
import io.rsocket.transport.netty.server.CloseableChannel;
import io.rsocket.transport.netty.server.TcpServerTransport;
import io.rsocket.util.DefaultPayload;
import java.time.Duration;
import java.util.Deque;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiConsumer;
import org.awaitility.Awaitility;
import org.junit.jupiter.api.AfterEach;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class ObservationIntegrationTest extends SampleTestRunner {
private static final MeterRegistry registry = new SimpleMeterRegistry();
private static final ObservationRegistry observationRegistry = ObservationRegistry.create();

static {
observationRegistry
.observationConfig()
.observationHandler(new DefaultMeterObservationHandler(registry));
}

private final RSocketInterceptor requesterInterceptor;
private final RSocketInterceptor responderInterceptor;

ObservationIntegrationTest() {
super(SampleRunnerConfig.builder().build());
requesterInterceptor =
reactiveSocket -> new ObservationRequesterRSocketProxy(reactiveSocket, observationRegistry);

responderInterceptor =
reactiveSocket -> new ObservationResponderRSocketProxy(reactiveSocket, observationRegistry);
}

private CloseableChannel server;
private RSocket client;
private AtomicInteger counter;

@Override
public BiConsumer<BuildingBlocks, Deque<ObservationHandler<? extends Observation.Context>>>
customizeObservationHandlers() {
return (buildingBlocks, observationHandlers) -> {
observationHandlers.addFirst(
new RSocketRequesterTracingObservationHandler(
buildingBlocks.getTracer(),
buildingBlocks.getPropagator(),
new ByteBufSetter(),
false));
observationHandlers.addFirst(
new RSocketResponderTracingObservationHandler(
buildingBlocks.getTracer(),
buildingBlocks.getPropagator(),
new ByteBufGetter(),
false));
};
}

@AfterEach
public void teardown() {
if (server != null) {
server.dispose();
}
}

private void testRequest() {
counter.set(0);
client.requestResponse(DefaultPayload.create("REQUEST", "META")).block();
assertThat(counter).as("Server did not see the request.").hasValue(1);
}

private void testStream() {
counter.set(0);
client.requestStream(DefaultPayload.create("start")).blockLast();

assertThat(counter).as("Server did not see the request.").hasValue(1);
}

private void testRequestChannel() {
counter.set(0);
client.requestChannel(Mono.just(DefaultPayload.create("start"))).blockFirst();
assertThat(counter).as("Server did not see the request.").hasValue(1);
}

private void testFireAndForget() {
counter.set(0);
client.fireAndForget(DefaultPayload.create("start")).subscribe();
Awaitility.await().atMost(Duration.ofSeconds(50)).until(() -> counter.get() == 1);
assertThat(counter).as("Server did not see the request.").hasValue(1);
}

@Override
public SampleTestRunnerConsumer yourCode() {
return (bb, meterRegistry) -> {
counter = new AtomicInteger();
server =
RSocketServer.create(
(setup, sendingSocket) -> {
sendingSocket.onClose().subscribe();

return Mono.just(
new RSocket() {
@Override
public Mono<Payload> requestResponse(Payload payload) {
payload.release();
counter.incrementAndGet();
return Mono.just(DefaultPayload.create("RESPONSE", "METADATA"));
}

@Override
public Flux<Payload> requestStream(Payload payload) {
payload.release();
counter.incrementAndGet();
return Flux.range(1, 10_000)
.map(i -> DefaultPayload.create("data -> " + i));
}

@Override
public Flux<Payload> requestChannel(Publisher<Payload> payloads) {
counter.incrementAndGet();
return Flux.from(payloads);
}

@Override
public Mono<Void> fireAndForget(Payload payload) {
payload.release();
counter.incrementAndGet();
return Mono.empty();
}
});
})
.interceptors(registry -> registry.forResponder(responderInterceptor))
.bind(TcpServerTransport.create("localhost", 0))
.block();

client =
RSocketConnector.create()
.interceptors(registry -> registry.forRequester(requesterInterceptor))
.connect(TcpClientTransport.create(server.address()))
.block();

testRequest();

testStream();

testRequestChannel();

testFireAndForget();

// @formatter:off
SpansAssert.assertThat(bb.getFinishedSpans())
.haveSameTraceId()
// "request_*" + "handle" x 4
.hasNumberOfSpansEqualTo(8)
.hasNumberOfSpansWithNameEqualTo("handle", 4)
.forAllSpansWithNameEqualTo("handle", span -> span.hasTagWithKey("rsocket.request-type"))
.hasASpanWithNameIgnoreCase("request_stream")
.thenASpanWithNameEqualToIgnoreCase("request_stream")
.hasTag("rsocket.request-type", "REQUEST_STREAM")
.backToSpans()
.hasASpanWithNameIgnoreCase("request_channel")
.thenASpanWithNameEqualToIgnoreCase("request_channel")
.hasTag("rsocket.request-type", "REQUEST_CHANNEL")
.backToSpans()
.hasASpanWithNameIgnoreCase("request_fnf")
.thenASpanWithNameEqualToIgnoreCase("request_fnf")
.hasTag("rsocket.request-type", "REQUEST_FNF")
.backToSpans()
.hasASpanWithNameIgnoreCase("request_response")
.thenASpanWithNameEqualToIgnoreCase("request_response")
.hasTag("rsocket.request-type", "REQUEST_RESPONSE");

MeterRegistryAssert.assertThat(registry)
.hasTimerWithNameAndTags(
"rsocket.response",
Tags.of(Tag.of("error", "none"), Tag.of("rsocket.request-type", "REQUEST_RESPONSE")))
.hasTimerWithNameAndTags(
"rsocket.fnf",
Tags.of(Tag.of("error", "none"), Tag.of("rsocket.request-type", "REQUEST_FNF")))
.hasTimerWithNameAndTags(
"rsocket.request",
Tags.of(Tag.of("error", "none"), Tag.of("rsocket.request-type", "REQUEST_RESPONSE")))
.hasTimerWithNameAndTags(
"rsocket.channel",
Tags.of(Tag.of("error", "none"), Tag.of("rsocket.request-type", "REQUEST_CHANNEL")))
.hasTimerWithNameAndTags(
"rsocket.stream",
Tags.of(Tag.of("error", "none"), Tag.of("rsocket.request-type", "REQUEST_STREAM")));
// @formatter:on
};
}

@Override
protected MeterRegistry getMeterRegistry() {
return registry;
}

@Override
protected ObservationRegistry getObservationRegistry() {
return observationRegistry;
}
}
1 change: 1 addition & 0 deletions rsocket-micrometer/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ plugins {
dependencies {
api project(':rsocket-core')
api 'io.micrometer:micrometer-core'
api 'io.micrometer:micrometer-tracing'

implementation 'org.slf4j:slf4j-api'

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/*
* Copyright 2013-2021 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.rsocket.micrometer.observation;

import io.micrometer.tracing.propagation.Propagator;
import io.netty.buffer.ByteBuf;
import io.netty.util.CharsetUtil;
import io.rsocket.metadata.CompositeMetadata;

public class ByteBufGetter implements Propagator.Getter<ByteBuf> {

@Override
public String get(ByteBuf carrier, String key) {
final CompositeMetadata compositeMetadata = new CompositeMetadata(carrier, false);
for (CompositeMetadata.Entry entry : compositeMetadata) {
if (key.equals(entry.getMimeType())) {
return entry.getContent().toString(CharsetUtil.UTF_8);
}
}
return null;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
/*
* Copyright 2013-2021 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.rsocket.micrometer.observation;

import io.micrometer.tracing.propagation.Propagator;
import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.ByteBufUtil;
import io.netty.buffer.CompositeByteBuf;
import io.rsocket.metadata.CompositeMetadataCodec;

public class ByteBufSetter implements Propagator.Setter<CompositeByteBuf> {

@Override
public void set(CompositeByteBuf carrier, String key, String value) {
final ByteBufAllocator alloc = carrier.alloc();
CompositeMetadataCodec.encodeAndAddMetadataWithCompression(
carrier, alloc, key, ByteBufUtil.writeUtf8(alloc, value));
}
}
Loading

0 comments on commit 32da131

Please sign in to comment.