Skip to content

Commit

Permalink
feat: add configurable backend session expiration policy
Browse files Browse the repository at this point in the history
Allows to define an expiration policy for the backend stored session.
By default no expiration is set, but can be expiration can be activated
setting the vaadin.kubernetes.backend-session-expiration-tolerance property
or by providing a custom SessionExpirationPolicy bean.

Fixes #126
  • Loading branch information
mcollovati committed Nov 27, 2024
1 parent 6d3192e commit 59eeed5
Show file tree
Hide file tree
Showing 13 changed files with 241 additions and 31 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
import jakarta.servlet.FilterRegistration;
import jakarta.servlet.ServletContext;

import java.time.Duration;
import java.time.temporal.ChronoUnit;
import java.util.function.Predicate;

import com.hazelcast.config.Config;
Expand Down Expand Up @@ -48,6 +50,7 @@
import com.vaadin.kubernetes.starter.sessiontracker.backend.BackendConnector;
import com.vaadin.kubernetes.starter.sessiontracker.backend.HazelcastConnector;
import com.vaadin.kubernetes.starter.sessiontracker.backend.RedisConnector;
import com.vaadin.kubernetes.starter.sessiontracker.backend.SessionExpirationPolicy;
import com.vaadin.kubernetes.starter.sessiontracker.push.PushSessionTracker;
import com.vaadin.kubernetes.starter.sessiontracker.serialization.SpringTransientHandler;
import com.vaadin.kubernetes.starter.sessiontracker.serialization.TransientHandler;
Expand Down Expand Up @@ -105,6 +108,18 @@ Predicate<Class<?>> transientInjectableFilter(
return props.getTransients().transientInjectableFilter();
}

@Bean
@ConditionalOnMissingBean
SessionExpirationPolicy sessionExpirationPolicy() {
Duration duration = properties
.getBackendSessionExpirationTolerance();
if (duration != null) {
return sessionTimeout -> duration.plus(sessionTimeout,
ChronoUnit.SECONDS);
}
return SessionExpirationPolicy.NEVER;
}

@Bean
@ConditionalOnMissingBean
SessionSerializationCallback sessionSerializationCallback() {
Expand All @@ -115,10 +130,11 @@ SessionSerializationCallback sessionSerializationCallback() {
SessionSerializer sessionSerializer(BackendConnector backendConnector,
TransientHandler transientInjector,
SessionSerializationCallback sessionSerializationCallback,
SessionExpirationPolicy sessionExpirationPolicy,
@Autowired(required = false) @Qualifier(TRANSIENT_INJECTABLE_FILTER) Predicate<Class<?>> injectablesFilter) {
SessionSerializer sessionSerializer = new SessionSerializer(
backendConnector, transientInjector,
sessionSerializationCallback);
sessionExpirationPolicy, sessionSerializationCallback);
if (injectablesFilter != null) {
sessionSerializer.setInjectableFilter(injectablesFilter);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@
*/
package com.vaadin.kubernetes.starter;

import java.time.Duration;

import org.springframework.boot.context.properties.ConfigurationProperties;

import com.vaadin.kubernetes.starter.sessiontracker.CurrentKey;
Expand All @@ -33,6 +35,12 @@ public class KubernetesKitProperties {
*/
private boolean autoConfigure = true;

/**
* Amount of time to be added to the HTTP session timeout to determine the
* expiration of the backend session.
*/
private Duration backendSessionExpirationTolerance;

/**
* The name of the distributed storage session key cookie.
*/
Expand Down Expand Up @@ -70,6 +78,23 @@ public void setAutoConfigure(boolean autoConfigure) {
this.autoConfigure = autoConfigure;
}

/**
* Sets the amount of time to be added to the HTTP session timeout to
* determine the expiration of the backend session.
*/
public void setBackendSessionExpirationTolerance(
Duration backendSessionExpirationTolerance) {
this.backendSessionExpirationTolerance = backendSessionExpirationTolerance;
}

/**
* Gets the amount of time to be added to the HTTP session timeout to
* determine the expiration of the backend session.
*/
public Duration getBackendSessionExpirationTolerance() {
return backendSessionExpirationTolerance;
}

/**
* Gets the name of the distributed storage session key cookie.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.time.Duration;
import java.util.HashSet;
import java.util.Map;
import java.util.Map.Entry;
Expand Down Expand Up @@ -43,6 +44,7 @@
import com.vaadin.flow.server.WrappedSession;
import com.vaadin.kubernetes.starter.ProductUtils;
import com.vaadin.kubernetes.starter.sessiontracker.backend.BackendConnector;
import com.vaadin.kubernetes.starter.sessiontracker.backend.SessionExpirationPolicy;
import com.vaadin.kubernetes.starter.sessiontracker.backend.SessionInfo;
import com.vaadin.kubernetes.starter.sessiontracker.serialization.TransientHandler;
import com.vaadin.kubernetes.starter.sessiontracker.serialization.TransientInjectableObjectInputStream;
Expand Down Expand Up @@ -123,6 +125,8 @@ public class SessionSerializer

private final SessionSerializationCallback sessionSerializationCallback;

private final SessionExpirationPolicy sessionExpirationPolicy;

private Predicate<Class<?>> injectableFilter = type -> true;

/**
Expand All @@ -139,9 +143,10 @@ public class SessionSerializer
*/
public SessionSerializer(BackendConnector backendConnector,
TransientHandler transientHandler,
SessionExpirationPolicy sessionExpirationPolicy,
SessionSerializationCallback sessionSerializationCallback) {
this(backendConnector, (sessionId, clusterKey) -> transientHandler,
sessionSerializationCallback);
sessionExpirationPolicy, sessionSerializationCallback);
}

/**
Expand Down Expand Up @@ -171,21 +176,25 @@ public SessionSerializer(BackendConnector backendConnector,
*/
public SessionSerializer(BackendConnector backendConnector,
BiFunction<String, String, TransientHandler> transientHandlerProvider,
SessionExpirationPolicy sessionExpirationPolicy,
SessionSerializationCallback sessionSerializationCallback) {
this.backendConnector = backendConnector;
this.handlerProvider = transientHandlerProvider;
this.sessionSerializationCallback = sessionSerializationCallback;
this.sessionExpirationPolicy = sessionExpirationPolicy;
optimisticSerializationTimeoutMs = OPTIMISTIC_SERIALIZATION_TIMEOUT_MS;
}

// Visible for test
SessionSerializer(BackendConnector backendConnector,
TransientHandler transientHandler,
SessionExpirationPolicy sessionExpirationPolicy,
SessionSerializationCallback sessionSerializationCallback,
long optimisticSerializationTimeoutMs) {
this.backendConnector = backendConnector;
this.optimisticSerializationTimeoutMs = optimisticSerializationTimeoutMs;
this.sessionSerializationCallback = sessionSerializationCallback;
this.sessionExpirationPolicy = sessionExpirationPolicy;
this.handlerProvider = (sessionId, clusterKey) -> transientHandler;
}

Expand Down Expand Up @@ -226,7 +235,9 @@ public void serialize(WrappedSession session) {
Map<String, Object> values = session.getAttributeNames().stream()
.collect(Collectors.toMap(Function.identity(),
session::getAttribute));
queueSerialization(session.getId(), values);
Duration timeToLive = sessionExpirationPolicy
.apply(session.getMaxInactiveInterval());
queueSerialization(session.getId(), timeToLive, values);
}

/**
Expand All @@ -251,7 +262,7 @@ public void deserialize(SessionInfo sessionInfo, HttpSession session)
}
}

private void queueSerialization(String sessionId,
private void queueSerialization(String sessionId, Duration timeToLive,
Map<String, Object> attributes) {
if (pending.containsKey(sessionId)) {
// This session will be serialized again soon enough
Expand Down Expand Up @@ -285,8 +296,8 @@ private void queueSerialization(String sessionId,
backendConnector
.markSerializationComplete(clusterKey);
};
handleSessionSerialization(sessionId, attributes,
whenSerialized);
handleSessionSerialization(sessionId, timeToLive,
attributes, whenSerialized);
}
return null;
}).whenComplete((unused, error) -> {
Expand All @@ -299,7 +310,7 @@ private void queueSerialization(String sessionId,
}

private void handleSessionSerialization(String sessionId,
Map<String, Object> attributes,
Duration timeToLive, Map<String, Object> attributes,
Consumer<SessionInfo> whenSerialized) {
long start = System.currentTimeMillis();
long timeout = start + optimisticSerializationTimeoutMs;
Expand All @@ -311,7 +322,7 @@ private void handleSessionSerialization(String sessionId,
sessionId, clusterKey);
while (System.currentTimeMillis() < timeout) {
SessionInfo info = serializeOptimisticLocking(sessionId,
attributes);
timeToLive, attributes);
if (info != null) {
pending.remove(sessionId); // Is this a race condition?
getLogger().debug(
Expand Down Expand Up @@ -343,21 +354,22 @@ private void handleSessionSerialization(String sessionId,
if (!unrecoverableError) { // NOSONAR
// Serializing using optimistic locking failed for a long time so be
// pessimistic and get it done
sessionInfo = serializePessimisticLocking(sessionId, attributes);
sessionInfo = serializePessimisticLocking(sessionId, timeToLive,
attributes);
}
whenSerialized.accept(sessionInfo);
}

private SessionInfo serializePessimisticLocking(String sessionId,
Map<String, Object> attributes) {
Duration timeToLive, Map<String, Object> attributes) {
long start = System.currentTimeMillis();
String clusterKey = getClusterKey(attributes);
Set<ReentrantLock> locks = getLocks(attributes);
for (ReentrantLock lock : locks) {
lock.lock();
}
try {
return doSerialize(sessionId, attributes);
return doSerialize(sessionId, timeToLive, attributes);
} catch (Exception e) {
getLogger().error(
"An error occurred during pessimistic serialization of session {} with distributed key {} ",
Expand Down Expand Up @@ -392,7 +404,8 @@ private Set<ReentrantLock> getLocks(Map<String, Object> attributes) {
}

private SessionInfo serializeOptimisticLocking(String sessionId,
Map<String, Object> attributes) throws IOException {
Duration timeToLive, Map<String, Object> attributes)
throws IOException {
String clusterKey = getClusterKey(attributes);
try {
long latestLockTime = findNewestLockTime(attributes);
Expand All @@ -406,7 +419,7 @@ private SessionInfo serializeOptimisticLocking(String sessionId,
return null;
}

SessionInfo info = doSerialize(sessionId, attributes);
SessionInfo info = doSerialize(sessionId, timeToLive, attributes);

long latestUnlockTimeCheck = findNewestUnlockTime(attributes);

Expand Down Expand Up @@ -487,7 +500,7 @@ private long findNewestUnlockTime(Map<String, Object> attributes) {
return latestUnlock;
}

private SessionInfo doSerialize(String sessionId,
private SessionInfo doSerialize(String sessionId, Duration timeToLive,
Map<String, Object> attributes) throws Exception {
long start = System.currentTimeMillis();
String clusterKey = getClusterKey(attributes);
Expand All @@ -502,7 +515,8 @@ private SessionInfo doSerialize(String sessionId,
throw ex;
}

SessionInfo info = new SessionInfo(clusterKey, out.toByteArray());
SessionInfo info = new SessionInfo(clusterKey, timeToLive,
out.toByteArray());

getLogger().debug(
"Serialization of attributes {} for session {} with distributed key {} completed in {}ms ({} bytes)",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
*/
package com.vaadin.kubernetes.starter.sessiontracker.backend;

import java.time.Duration;
import java.util.concurrent.TimeUnit;

import com.hazelcast.core.HazelcastInstance;
Expand All @@ -35,7 +36,14 @@ public void sendSession(SessionInfo sessionInfo) {
getLogger().debug("Sending session {} to Hazelcast",
sessionInfo.getClusterKey());
String mapKey = getKey(sessionInfo.getClusterKey());
sessions.put(mapKey, sessionInfo.getData());
Duration timeToLive = sessionInfo.getTimeToLive();
if (timeToLive.isZero() || timeToLive.isNegative()) {
sessions.put(mapKey, sessionInfo.getData());
} else {
sessions.put(mapKey, sessionInfo.getData(), timeToLive.toSeconds(),
TimeUnit.SECONDS);
}

getLogger().debug("Session {} sent to Hazelcast",
sessionInfo.getClusterKey());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,14 @@
*/
package com.vaadin.kubernetes.starter.sessiontracker.backend;

import java.time.Duration;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.data.redis.connection.RedisConnection;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.connection.RedisStringCommands;
import org.springframework.data.redis.core.types.Expiration;

import com.vaadin.kubernetes.starter.ProductUtils;

Expand All @@ -34,8 +38,15 @@ public void sendSession(SessionInfo sessionInfo) {
sessionInfo.getClusterKey());
try (RedisConnection connection = redisConnectionFactory
.getConnection()) {
connection.set(getKey(sessionInfo.getClusterKey()),
sessionInfo.getData());
byte[] key = getKey(sessionInfo.getClusterKey());
Duration timeToLive = sessionInfo.getTimeToLive();
if (timeToLive.isZero() || timeToLive.isNegative()) {
connection.set(key, sessionInfo.getData());
} else {
connection.set(key, sessionInfo.getData(),
Expiration.from(timeToLive),
RedisStringCommands.SetOption.UPSERT);
}
getLogger().debug("Session {} sent to Redis",
sessionInfo.getClusterKey());
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
package com.vaadin.kubernetes.starter.sessiontracker.backend;

import java.time.Duration;

/**
* A rule that determines the expiration for a backend session based on the
* current HTTP session timeout.
*/
public interface SessionExpirationPolicy {

/**
* Computes the maximum amount of time an inactive session should be
* preserved in the backed, based on the given HTTP session timeout
* expressed in seconds.
* <p>
* </p>
* A return value of {@link Duration#ZERO} or less means the backend session
* should never expire.
*
* @param sessionTimeout
* HTTP session timeout expressed in seconds.
* @return the maximum amount of time an inactive session should be
* preserved in the backed.
*/
Duration apply(long sessionTimeout);

/**
* A policy that prevents expiration.
*/
SessionExpirationPolicy NEVER = sessionTimeout -> Duration.ZERO;

}
Loading

0 comments on commit 59eeed5

Please sign in to comment.