From 717ef6bbec3b196de9998125b2995cf879c9b3d7 Mon Sep 17 00:00:00 2001 From: Auri Munoz Date: Wed, 27 Sep 2023 17:19:35 +0200 Subject: [PATCH] Refactor to manage concurrency --- .../stork/StorkObservationCollectorBean.java | 140 +++++++----------- 1 file changed, 57 insertions(+), 83 deletions(-) diff --git a/extensions/micrometer/runtime/src/main/java/io/quarkus/micrometer/runtime/binder/stork/StorkObservationCollectorBean.java b/extensions/micrometer/runtime/src/main/java/io/quarkus/micrometer/runtime/binder/stork/StorkObservationCollectorBean.java index 9e6feaef9130c..7f02ca81ad543 100644 --- a/extensions/micrometer/runtime/src/main/java/io/quarkus/micrometer/runtime/binder/stork/StorkObservationCollectorBean.java +++ b/extensions/micrometer/runtime/src/main/java/io/quarkus/micrometer/runtime/binder/stork/StorkObservationCollectorBean.java @@ -16,20 +16,16 @@ import io.micrometer.core.instrument.Tag; import io.micrometer.core.instrument.Tags; import io.micrometer.core.instrument.Timer; -import io.smallrye.stork.api.ServiceInstance; import io.smallrye.stork.api.observability.ObservationCollector; import io.smallrye.stork.api.observability.ObservationPoints; @ApplicationScoped @Typed(ObservationCollector.class) -public class StorkObservationCollectorBean implements ObservationCollector { +public class StorkObservationCollectorBean implements ObservationCollector, ObservationCollector.EventCompletionHandler { public static final String METRICS_SUFIX = "-metrics"; final MeterRegistry registry = Metrics.globalRegistry; - private final EventCompletionHandler STORK_HANDLER = ev -> { - //TODO - }; public final static Map STORK_METRICS = new ConcurrentHashMap<>(); @Override @@ -37,84 +33,62 @@ public ObservationPoints.StorkResolutionEvent create(String serviceName, String String serviceSelectionType) { return STORK_METRICS.computeIfAbsent(serviceName + METRICS_SUFIX, key -> new ObservationPoints.StorkResolutionEvent(serviceName, serviceDiscoveryType, serviceSelectionType, - STORK_HANDLER) { - - private final Tags tags = Tags.of(Tag.of("service-name", getServiceName()));; - private final Counter instanceCounter = Counter.builder("stork.instances.count") - .description("The number of service instances discovered") - .tags(tags) - .register(registry);; - - private final Timer serviceDiscoveryTimer = Timer - .builder("stork.service-discovery.duration") - .description("The duration of the discovery operation") - .tags(tags) - .register(registry); - - private final Timer serviceSelectionTimer = Timer - .builder("stork.service-selection.duration") - .description("The duration of the selection operation ") - .tags(tags) - .register(registry); - - private final Timer overallTimer = Timer.builder("stork.overall.duration") - .description("The total duration of the Stork service discovery and selection operations") - .tags(tags) - .register(registry); - - private List serviceDiscoveryExceptions = new ArrayList<>(); - - private final Gauge serviceDiscoveryFailures = Gauge - .builder("stork.service-discovery.failures", serviceDiscoveryExceptions, - List::size) - .description("The number of failures during service discovery").tags(tags) - .register(registry); - - private List serviceSelectionExceptions = new ArrayList<>(); - - private final Gauge serviceSelectionFailures = Gauge - .builder("stork.load-balancer.failures", serviceSelectionExceptions, List::size) - .description("The number of failures during service selection.").tags(tags) - .register(registry); - - @Override - public void onServiceDiscoverySuccess(List instances) { - super.onServiceDiscoverySuccess(instances); - if (instances != null) { - instanceCounter.increment(); - } - serviceDiscoveryTimer.record(getServiceDiscoveryDuration().getNano(), TimeUnit.NANOSECONDS); - - } - - @Override - public void onServiceDiscoveryFailure(Throwable throwable) { - super.onServiceDiscoveryFailure(throwable); - serviceDiscoveryExceptions.add(throwable); - serviceDiscoveryTimer.record(getServiceDiscoveryDuration().getNano(), TimeUnit.NANOSECONDS); - overallTimer.record(getServiceDiscoveryDuration().getNano(), TimeUnit.NANOSECONDS); - - } - - @Override - public void onServiceSelectionSuccess(long id) { - super.onServiceSelectionSuccess(id); - serviceSelectionTimer.record(getServiceSelectionDuration().getNano(), TimeUnit.NANOSECONDS); - overallTimer.record(getOverallDuration().getNano(), TimeUnit.NANOSECONDS); - - } - - @Override - public void onServiceSelectionFailure(Throwable throwable) { - if (failure != throwable) { //if SD fails we don't know count the error as a LB error - serviceSelectionExceptions.add(throwable); - } - super.onServiceSelectionFailure(throwable); - serviceSelectionTimer.record(getServiceSelectionDuration().getNano(), TimeUnit.NANOSECONDS); - overallTimer.record(getServiceSelectionDuration().getNano(), TimeUnit.NANOSECONDS); - } - - }); + this)); } + @Override + public void complete(ObservationPoints.StorkResolutionEvent event) { + Tags tags = Tags.of(Tag.of("service-name", event.getServiceName())); + + Counter instanceCounter = Counter.builder("stork.instances.count") + .description("The number of service instances discovered") + .tags(tags) + .register(registry); + + Timer serviceDiscoveryTimer = Timer + .builder("stork.service-discovery.duration") + .description("The duration of the discovery operation") + .tags(tags) + .register(registry); + + Timer serviceSelectionTimer = Timer + .builder("stork.service-selection.duration") + .description("The duration of the selection operation ") + .tags(tags) + .register(registry); + + Timer overallTimer = Timer.builder("stork.overall.duration") + .description("The total duration of the Stork service discovery and selection operations") + .tags(tags) + .register(registry); + + List serviceDiscoveryExceptions = new ArrayList<>(); + + Gauge serviceDiscoveryFailures = Gauge + .builder("stork.service-discovery.failures", serviceDiscoveryExceptions, + List::size) + .description("The number of failures during service discovery").tags(tags) + .register(registry); + + List serviceSelectionExceptions = new ArrayList<>(); + + Gauge serviceSelectionFailures = Gauge + .builder("stork.load-balancer.failures", serviceSelectionExceptions, List::size) + .description("The number of failures during service selection.").tags(tags) + .register(registry); + + instanceCounter.increment(event.getDiscoveredInstancesCount()); + serviceDiscoveryTimer.record(event.getServiceDiscoveryDuration().getNano(), TimeUnit.NANOSECONDS); + serviceSelectionTimer.record(event.getServiceSelectionDuration().getNano(), TimeUnit.NANOSECONDS); + overallTimer.record(event.getServiceSelectionDuration().getNano(), TimeUnit.NANOSECONDS); + + if (event.failure() != null) { + if (event.isServiceDiscoveryDone()) { + serviceSelectionExceptions.add(event.failure()); + } else {// SD failure + serviceDiscoveryExceptions.add(event.failure()); + } + } + + } }