Skip to content

Commit

Permalink
WebSockets Next: make it possible to store user data in a connection
Browse files Browse the repository at this point in the history
- resolves quarkusio#43772
  • Loading branch information
mkouba committed Oct 9, 2024
1 parent af494da commit 54d9914
Show file tree
Hide file tree
Showing 7 changed files with 311 additions and 167 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
package io.quarkus.websockets.next.test.connection;

import static org.junit.jupiter.api.Assertions.assertEquals;

import java.net.URI;
import java.util.List;

import jakarta.inject.Inject;

import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;

import io.quarkus.test.QuarkusUnitTest;
import io.quarkus.test.common.http.TestHTTPResource;
import io.quarkus.websockets.next.OnOpen;
import io.quarkus.websockets.next.OnTextMessage;
import io.quarkus.websockets.next.UserData.TypedKey;
import io.quarkus.websockets.next.WebSocket;
import io.quarkus.websockets.next.WebSocketConnection;
import io.quarkus.websockets.next.test.utils.WSClient;
import io.vertx.core.Vertx;

public class ConnectionDataTest {

@RegisterExtension
public static final QuarkusUnitTest test = new QuarkusUnitTest()
.withApplicationRoot(root -> {
root.addClasses(MyEndpoint.class, WSClient.class);
});

@Inject
Vertx vertx;

@TestHTTPResource("/end")
URI baseUri;

@Test
void testConnectionData() {
try (WSClient client = WSClient.create(vertx).connect(baseUri)) {
assertEquals("4", client.sendAndAwaitReply("bar").toString());
assertEquals("FOOMartin", client.sendAndAwaitReply("foo").toString());
assertEquals("0", client.sendAndAwaitReply("bar").toString());
}
}

@WebSocket(path = "/end")
public static class MyEndpoint {

@OnOpen
void onOpen(WebSocketConnection connection) {
connection.userData().put(TypedKey.forLong("foo"), 42l);
connection.userData().put(TypedKey.forString("username"), "Martin");
connection.userData().put(TypedKey.forBoolean("isActive"), true);
connection.userData().put(new TypedKey<List<String>>("list"), List.of());
}

@OnTextMessage
public String onMessage(String message, WebSocketConnection connection) {
if ("bar".equals(message)) {
return connection.userData().size() + "";
}
try {
connection.userData().get(TypedKey.forString("foo")).toString();
throw new IllegalStateException();
} catch (ClassCastException expected) {
}
if (!connection.userData().get(TypedKey.forBoolean("isActive"))
|| !connection.userData().get(new TypedKey<List<String>>("list")).isEmpty()) {
return "NOK";
}
if (connection.userData().remove(TypedKey.forLong("foo")) != 42l) {
throw new IllegalStateException();
}
String ret = message.toUpperCase() + connection.userData().get(TypedKey.forString("username"));
connection.userData().clear();
return ret;
}

}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
package io.quarkus.websockets.next;

import java.time.Instant;

import io.smallrye.common.annotation.CheckReturnValue;
import io.smallrye.mutiny.Uni;

/**
*
* @see WebSocketConnection
* @see WebSocketClientConnection
*/
public interface Connection extends BlockingSender {

/**
*
* @return the unique identifier assigned to this connection
*/
String id();

/**
*
* @param name
* @return the value of the path parameter or {@code null}
* @see WebSocketClient#path()
*/
String pathParam(String name);

/**
* @return {@code true} if the HTTP connection is encrypted via SSL/TLS
*/
boolean isSecure();

/**
* @return {@code true} if the WebSocket is closed
*/
boolean isClosed();

/**
*
* @return the close reason or {@code null} if the connection is not closed
*/
CloseReason closeReason();

/**
*
* @return {@code true} if the WebSocket is open
*/
default boolean isOpen() {
return !isClosed();
}

/**
* Close the connection.
*
* @return a new {@link Uni} with a {@code null} item
*/
@CheckReturnValue
default Uni<Void> close() {
return close(CloseReason.NORMAL);
}

/**
* Close the connection with a specific reason.
*
* @param reason
* @return a new {@link Uni} with a {@code null} item
*/
Uni<Void> close(CloseReason reason);

/**
* Close the connection and wait for the completion.
*/
default void closeAndAwait() {
close().await().indefinitely();
}

/**
* Close the connection with a specific reason and wait for the completion.
*/
default void closeAndAwait(CloseReason reason) {
close(reason).await().indefinitely();
}

/**
*
* @return the handshake request
*/
HandshakeRequest handshakeRequest();

/**
*
* @return the time when this connection was created
*/
Instant creationTime();

/**
*
* @return the user data associated with this connection
*/
UserData userData();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
package io.quarkus.websockets.next;

/**
* Mutable user data associated with a connection. Implementations must be thread-safe.
*/
public interface UserData {

/**
*
* @param <VALUE>
* @param key
* @return the value or {@code null} if no mapping is found
*/
<VALUE> VALUE get(TypedKey<VALUE> key);

/**
* Associates the specified value with the specified key. An old value is replaced by the specified value.
*
* @param <ConnectionData.VALUE>
* @param key
* @param value
* @return the previous value associated with {@code key}, or {@code null} if no mapping exists
*/
<VALUE> VALUE put(TypedKey<VALUE> key, VALUE value);

/**
*
* @param <VALUE>
* @param key
*/
<VALUE> VALUE remove(TypedKey<VALUE> key);

int size();

void clear();

/**
* @param <TYPE> The type this key is used for.
*/
record TypedKey<TYPE>(String value) {

public static TypedKey<Long> forLong(String val) {
return new TypedKey<>(val);
}

public static TypedKey<String> forString(String val) {
return new TypedKey<>(val);
}

public static TypedKey<Boolean> forBoolean(String val) {
return new TypedKey<>(val);
}
}

}
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
package io.quarkus.websockets.next;

import io.smallrye.common.annotation.CheckReturnValue;
import io.smallrye.common.annotation.Experimental;
import io.smallrye.mutiny.Uni;

/**
* This interface represents a client connection to a WebSocket endpoint.
Expand All @@ -11,87 +9,11 @@
* endpoint and used to interact with the connected server.
*/
@Experimental("This API is experimental and may change in the future")
public interface WebSocketClientConnection extends Sender, BlockingSender {

/**
*
* @return the unique identifier assigned to this connection
*/
String id();
public interface WebSocketClientConnection extends Connection {

/*
* @return the client id
*/
String clientId();

/**
*
* @param name
* @return the value of the path parameter or {@code null}
* @see WebSocketClient#path()
*/
String pathParam(String name);

/**
* @return {@code true} if the HTTP connection is encrypted via SSL/TLS
*/
boolean isSecure();

/**
* @return {@code true} if the WebSocket is closed
*/
boolean isClosed();

/**
*
* @return the close reason or {@code null} if the connection is not closed
*/
CloseReason closeReason();

/**
*
* @return {@code true} if the WebSocket is open
*/
default boolean isOpen() {
return !isClosed();
}

/**
* Close the connection.
*
* @return a new {@link Uni} with a {@code null} item
*/
@CheckReturnValue
default Uni<Void> close() {
return close(CloseReason.NORMAL);
}

/**
* Close the connection with a specific reason.
*
* @param reason
* @return a new {@link Uni} with a {@code null} item
*/
Uni<Void> close(CloseReason reason);

/**
* Close the connection and wait for the completion.
*/
default void closeAndAwait() {
close().await().indefinitely();
}

/**
* Close the connection with a specific reason and wait for the completion.
*/
default void closeAndAwait(CloseReason reason) {
close(reason).await().indefinitely();
}

/**
*
* @return the handshake request
*/
HandshakeRequest handshakeRequest();

}
Loading

0 comments on commit 54d9914

Please sign in to comment.