Skip to content

Commit

Permalink
Refactor to manage concurrency
Browse files Browse the repository at this point in the history
  • Loading branch information
aureamunoz committed Sep 28, 2023
1 parent f6c037f commit 717ef6b
Showing 1 changed file with 57 additions and 83 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,105 +16,79 @@
import io.micrometer.core.instrument.Tag;
import io.micrometer.core.instrument.Tags;
import io.micrometer.core.instrument.Timer;
import io.smallrye.stork.api.ServiceInstance;
import io.smallrye.stork.api.observability.ObservationCollector;
import io.smallrye.stork.api.observability.ObservationPoints;

@ApplicationScoped
@Typed(ObservationCollector.class)
public class StorkObservationCollectorBean implements ObservationCollector {
public class StorkObservationCollectorBean implements ObservationCollector, ObservationCollector.EventCompletionHandler {

public static final String METRICS_SUFIX = "-metrics";
final MeterRegistry registry = Metrics.globalRegistry;

private final EventCompletionHandler STORK_HANDLER = ev -> {
//TODO
};
public final static Map<String, ObservationPoints.StorkResolutionEvent> STORK_METRICS = new ConcurrentHashMap<>();

@Override
public ObservationPoints.StorkResolutionEvent create(String serviceName, String serviceDiscoveryType,
String serviceSelectionType) {
return STORK_METRICS.computeIfAbsent(serviceName + METRICS_SUFIX,
key -> new ObservationPoints.StorkResolutionEvent(serviceName, serviceDiscoveryType, serviceSelectionType,
STORK_HANDLER) {

private final Tags tags = Tags.of(Tag.of("service-name", getServiceName()));;
private final Counter instanceCounter = Counter.builder("stork.instances.count")
.description("The number of service instances discovered")
.tags(tags)
.register(registry);;

private final Timer serviceDiscoveryTimer = Timer
.builder("stork.service-discovery.duration")
.description("The duration of the discovery operation")
.tags(tags)
.register(registry);

private final Timer serviceSelectionTimer = Timer
.builder("stork.service-selection.duration")
.description("The duration of the selection operation ")
.tags(tags)
.register(registry);

private final Timer overallTimer = Timer.builder("stork.overall.duration")
.description("The total duration of the Stork service discovery and selection operations")
.tags(tags)
.register(registry);

private List<Throwable> serviceDiscoveryExceptions = new ArrayList<>();

private final Gauge serviceDiscoveryFailures = Gauge
.builder("stork.service-discovery.failures", serviceDiscoveryExceptions,
List::size)
.description("The number of failures during service discovery").tags(tags)
.register(registry);

private List<Throwable> serviceSelectionExceptions = new ArrayList<>();

private final Gauge serviceSelectionFailures = Gauge
.builder("stork.load-balancer.failures", serviceSelectionExceptions, List::size)
.description("The number of failures during service selection.").tags(tags)
.register(registry);

@Override
public void onServiceDiscoverySuccess(List<ServiceInstance> instances) {
super.onServiceDiscoverySuccess(instances);
if (instances != null) {
instanceCounter.increment();
}
serviceDiscoveryTimer.record(getServiceDiscoveryDuration().getNano(), TimeUnit.NANOSECONDS);

}

@Override
public void onServiceDiscoveryFailure(Throwable throwable) {
super.onServiceDiscoveryFailure(throwable);
serviceDiscoveryExceptions.add(throwable);
serviceDiscoveryTimer.record(getServiceDiscoveryDuration().getNano(), TimeUnit.NANOSECONDS);
overallTimer.record(getServiceDiscoveryDuration().getNano(), TimeUnit.NANOSECONDS);

}

@Override
public void onServiceSelectionSuccess(long id) {
super.onServiceSelectionSuccess(id);
serviceSelectionTimer.record(getServiceSelectionDuration().getNano(), TimeUnit.NANOSECONDS);
overallTimer.record(getOverallDuration().getNano(), TimeUnit.NANOSECONDS);

}

@Override
public void onServiceSelectionFailure(Throwable throwable) {
if (failure != throwable) { //if SD fails we don't know count the error as a LB error
serviceSelectionExceptions.add(throwable);
}
super.onServiceSelectionFailure(throwable);
serviceSelectionTimer.record(getServiceSelectionDuration().getNano(), TimeUnit.NANOSECONDS);
overallTimer.record(getServiceSelectionDuration().getNano(), TimeUnit.NANOSECONDS);
}

});
this));
}

@Override
public void complete(ObservationPoints.StorkResolutionEvent event) {
Tags tags = Tags.of(Tag.of("service-name", event.getServiceName()));

Counter instanceCounter = Counter.builder("stork.instances.count")
.description("The number of service instances discovered")
.tags(tags)
.register(registry);

Timer serviceDiscoveryTimer = Timer
.builder("stork.service-discovery.duration")
.description("The duration of the discovery operation")
.tags(tags)
.register(registry);

Timer serviceSelectionTimer = Timer
.builder("stork.service-selection.duration")
.description("The duration of the selection operation ")
.tags(tags)
.register(registry);

Timer overallTimer = Timer.builder("stork.overall.duration")
.description("The total duration of the Stork service discovery and selection operations")
.tags(tags)
.register(registry);

List<Throwable> serviceDiscoveryExceptions = new ArrayList<>();

Gauge serviceDiscoveryFailures = Gauge
.builder("stork.service-discovery.failures", serviceDiscoveryExceptions,
List::size)
.description("The number of failures during service discovery").tags(tags)
.register(registry);

List<Throwable> serviceSelectionExceptions = new ArrayList<>();

Gauge serviceSelectionFailures = Gauge
.builder("stork.load-balancer.failures", serviceSelectionExceptions, List::size)
.description("The number of failures during service selection.").tags(tags)
.register(registry);

instanceCounter.increment(event.getDiscoveredInstancesCount());
serviceDiscoveryTimer.record(event.getServiceDiscoveryDuration().getNano(), TimeUnit.NANOSECONDS);
serviceSelectionTimer.record(event.getServiceSelectionDuration().getNano(), TimeUnit.NANOSECONDS);
overallTimer.record(event.getServiceSelectionDuration().getNano(), TimeUnit.NANOSECONDS);

if (event.failure() != null) {
if (event.isServiceDiscoveryDone()) {
serviceSelectionExceptions.add(event.failure());
} else {// SD failure
serviceDiscoveryExceptions.add(event.failure());
}
}

}
}

0 comments on commit 717ef6b

Please sign in to comment.