Skip to content

Commit

Permalink
Log active websocket connections
Browse files Browse the repository at this point in the history
  • Loading branch information
CatoTH committed Nov 16, 2024
1 parent 7213123 commit 3f7581b
Show file tree
Hide file tree
Showing 5 changed files with 76 additions and 9 deletions.
2 changes: 2 additions & 0 deletions src/main/java/de/antragsgruen/live/LiveApplication.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,10 @@

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.scheduling.annotation.EnableScheduling;

@SpringBootApplication()
@EnableScheduling
public class LiveApplication {

public static void main(String[] args) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
package de.antragsgruen.live.metrics;

import de.antragsgruen.live.multisite.ConsultationScope;
import de.antragsgruen.live.websocket.TopicPermissionChecker;
import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.Tag;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.messaging.simp.user.SimpUserRegistry;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;

import java.util.List;
import java.util.function.Function;
import java.util.stream.Collectors;

record SubscriptionDimensions(String installationId, String siteId, String consultationId) { }

@Service
@RequiredArgsConstructor
@Slf4j
public class ActiveWebsocketConnectionMetric {
private final SimpUserRegistry userRegistry;
private final MeterRegistry registry;

private final String metricName = "antragsgruen_live.ws.active_connections";
private final Integer metricIntervalMs = 5000;

@Scheduled(fixedRate = metricIntervalMs)
public void processGauge() {
userRegistry.findSubscriptions(subscription -> true)
.stream()
.map(subscription -> TopicPermissionChecker.consultationScopeFromPathParts(subscription.getDestination().split("/")))
.collect(Collectors.groupingBy(Function.identity(), Collectors.counting()))
.forEach(this::trackMetric);
}

private void trackMetric(ConsultationScope scope, Long subscribers) {
log.debug("Logging active websocket metrics: " + scope + " - " + subscribers);

registry.gauge(
metricName,
List.of(
Tag.of("installation", scope.installation()),
Tag.of("site", scope.site()),
Tag.of("consultation", scope.consultation())
),
subscribers
);
}
}
6 changes: 6 additions & 0 deletions src/main/java/de/antragsgruen/live/metrics/package-info.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
@NonNullApi
@NonNullFields
package de.antragsgruen.live.metrics;

import org.springframework.lang.NonNullApi;
import org.springframework.lang.NonNullFields;
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
public class TopicPermissionChecker {
private static final String ROLE_SPEECH_ADMIN = "ROLE_SPEECH_ADMIN";

public static final int USER_PARTS_LENGTH = 7;
public static final int USER_PARTS_LENGTH = 7; // Also used for /admin/ topics
public static final int USER_PART_ROLE = 1;
public static final int USER_PART_INSTALLATION = 2;
public static final int USER_PART_SITE = 3;
Expand Down Expand Up @@ -44,25 +44,33 @@ public boolean canSubscribeToDestination(JwtAuthenticationToken jwtToken, @Nulla
return false;
}

ConsultationScope scope = null;
boolean additionalPermissionsPassed = true;
ConsultationScope scope = TopicPermissionChecker.consultationScopeFromPathParts(pathParts);

if ("topic".equals(pathParts[TOPIC_PART_TOPIC]) && pathParts.length == TOPIC_PARTS_LENGTH) {
scope = new ConsultationScope(pathParts[TOPIC_PART_INSTALLATION], pathParts[TOPIC_PART_SITE], pathParts[TOPIC_PART_CONSULTATION]);
}
boolean additionalPermissionsPassed = true;
if (Sender.ROLE_USER.equals(pathParts[USER_PART_ROLE]) && pathParts.length == USER_PARTS_LENGTH) {
scope = new ConsultationScope(pathParts[USER_PART_INSTALLATION], pathParts[USER_PART_SITE], pathParts[USER_PART_CONSULTATION]);
additionalPermissionsPassed = pathParts[USER_PART_USER].equals(jwtToken.getName());
}
if (Sender.ROLE_ADMIN.equals(pathParts[USER_PART_ROLE]) && pathParts.length == USER_PARTS_LENGTH) {
scope = new ConsultationScope(pathParts[USER_PART_INSTALLATION], pathParts[USER_PART_SITE], pathParts[USER_PART_CONSULTATION]);
additionalPermissionsPassed = jwtHasRoleForTopic(jwtToken, pathParts[USER_PART_MODULE])
&& pathParts[USER_PART_USER].equals(jwtToken.getName());
}

return (scope != null && jwtIsForCorrectConsultation(jwtToken, scope) && additionalPermissionsPassed);
}

public static ConsultationScope consultationScopeFromPathParts(String[] pathParts) {
if ("topic".equals(pathParts[TOPIC_PART_TOPIC]) && pathParts.length == TOPIC_PARTS_LENGTH) {
return new ConsultationScope(pathParts[TOPIC_PART_INSTALLATION], pathParts[TOPIC_PART_SITE], pathParts[TOPIC_PART_CONSULTATION]);
}
if (Sender.ROLE_USER.equals(pathParts[USER_PART_ROLE]) && pathParts.length == USER_PARTS_LENGTH) {
return new ConsultationScope(pathParts[USER_PART_INSTALLATION], pathParts[USER_PART_SITE], pathParts[USER_PART_CONSULTATION]);
}
if (Sender.ROLE_ADMIN.equals(pathParts[USER_PART_ROLE]) && pathParts.length == USER_PARTS_LENGTH) {
return new ConsultationScope(pathParts[USER_PART_INSTALLATION], pathParts[USER_PART_SITE], pathParts[USER_PART_CONSULTATION]);
}
return null;
}

private boolean jwtIsForCorrectConsultation(JwtAuthenticationToken jwtToken, ConsultationScope scope) {
Object payload = jwtToken.getTokenAttributes().get("payload");
if (!(payload instanceof Map<?, ?> payloadMap)) {
Expand Down
2 changes: 1 addition & 1 deletion src/main/resources/application.yml
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ antragsgruen:
management:
metrics:
tags:
application: ${spring.application.name}
application: antragsgruen_live
endpoints:
web:
exposure:
Expand Down

0 comments on commit 3f7581b

Please sign in to comment.