+ * A binary message is always represented as a {@link io.vertx.core.buffer.Buffer}. Therefore, the following conversion rules
+ * apply. The types listed below are handled specifically. For all other types a {@link BinaryMessageCodec} is used to encode
+ * and decode input and
+ * output messages. By default, the first input codec that supports the message type is used; codecs with higher priority go
+ * first. However, a specific codec can be selected with {@link #inputCodec()} and {@link #outputCodec()}.
+ *
+ *
+ * - {@code java.lang.Buffer} is used as is,
+ * - {@code byte[]} is encoded with {@link io.vertx.core.buffer.Buffer#buffer(byte[])} and decoded with
+ * {@link io.vertx.core.buffer.Buffer#getBytes()},
+ * - {@code java.lang.String} is encoded with {@link io.vertx.core.buffer.Buffer#buffer(String)} and decoded with
+ * {@link io.vertx.core.buffer.Buffer#toString()},
+ * - {@code io.vertx.core.json.JsonObject} is encoded with {@link io.vertx.core.json.JsonObject#toBuffer()} and decoded with
+ * {@link io.vertx.core.json.JsonObject#JsonObject(io.vertx.core.buffer.Buffer)}.
+ * - {@code io.vertx.core.json.JsonArray} is encoded with {@link io.vertx.core.json.JsonArray#toBuffer()} and decoded with
+ * {@link io.vertx.core.json.JsonArray#JsonArray(io.vertx.core.buffer.Buffer)}.
+ *
+ *
+ * @see BinaryMessageCodec
+ */
+@Retention(RUNTIME)
+@Target(METHOD)
+@Experimental("This API is experimental and may change in the future")
+public @interface BinaryMessage {
+
+ /**
+ * The codec used for input messages.
+ *
+ * By default, the first codec that supports the message type is used; codecs with higher priority go first.
+ *
+ * Note that, if specified, the codec is also used for output messages unless {@link #outputCodec()} returns a non-default
+ * value.
+ */
+ @SuppressWarnings("rawtypes")
+ Class extends BinaryMessageCodec> inputCodec() default BinaryMessageCodec.class;
+
+ /**
+ * The codec used for output messages.
+ *
+ * By default, the same codec as for the input message is used.
+ */
+ @SuppressWarnings("rawtypes")
+ Class extends BinaryMessageCodec> outputCodec() default BinaryMessageCodec.class;
+
+}
diff --git a/extensions/websockets-next/server/runtime/src/main/java/io/quarkus/websockets/next/BinaryMessageCodec.java b/extensions/websockets-next/server/runtime/src/main/java/io/quarkus/websockets/next/BinaryMessageCodec.java
new file mode 100644
index 0000000000000..2755e92239ed3
--- /dev/null
+++ b/extensions/websockets-next/server/runtime/src/main/java/io/quarkus/websockets/next/BinaryMessageCodec.java
@@ -0,0 +1,15 @@
+package io.quarkus.websockets.next;
+
+import io.smallrye.common.annotation.Experimental;
+import io.vertx.core.buffer.Buffer;
+
+/**
+ * Used to encode and decode binary messages.
+ *
+ * @param
+ * @see BinaryMessage
+ */
+@Experimental("This API is experimental and may change in the future")
+public interface BinaryMessageCodec extends MessageCodec {
+
+}
diff --git a/extensions/websockets-next/server/runtime/src/main/java/io/quarkus/websockets/next/BlockingSender.java b/extensions/websockets-next/server/runtime/src/main/java/io/quarkus/websockets/next/BlockingSender.java
new file mode 100644
index 0000000000000..67f0d1a471def
--- /dev/null
+++ b/extensions/websockets-next/server/runtime/src/main/java/io/quarkus/websockets/next/BlockingSender.java
@@ -0,0 +1,50 @@
+package io.quarkus.websockets.next;
+
+import io.smallrye.common.annotation.Experimental;
+import io.vertx.core.buffer.Buffer;
+
+/**
+ * Sends a message to the connected WebSocket client and waits for the completion.
+ *
+ * Note that blocking sender methods should never be called on an event loop thread.
+ */
+@Experimental("This API is experimental and may change in the future")
+public interface BlockingSender extends Sender {
+
+ /**
+ * Sends a text message and waits for the completion.
+ *
+ * @param message
+ */
+ default void sendTextAndAwait(String message) {
+ sendText(message).await().indefinitely();
+ }
+
+ /**
+ * Sends a text message and waits for the completion.
+ *
+ * @param
+ * @param message
+ */
+ default void sendTextAndAwait(M message) {
+ sendText(message).await().indefinitely();
+ }
+
+ /**
+ * Sends a binary message and waits for the completion.
+ *
+ * @param message
+ */
+ default void sendBinaryAndAwait(Buffer message) {
+ sendBinary(message).await().indefinitely();
+ }
+
+ /**
+ * Sends a binary message and waits for the completion.
+ *
+ * @param message
+ */
+ default void sendBinaryAndAwait(byte[] message) {
+ sendBinary(message).await().indefinitely();
+ }
+}
diff --git a/extensions/websockets-next/server/runtime/src/main/java/io/quarkus/websockets/next/MessageCodec.java b/extensions/websockets-next/server/runtime/src/main/java/io/quarkus/websockets/next/MessageCodec.java
new file mode 100644
index 0000000000000..eaf84d311e8c4
--- /dev/null
+++ b/extensions/websockets-next/server/runtime/src/main/java/io/quarkus/websockets/next/MessageCodec.java
@@ -0,0 +1,56 @@
+package io.quarkus.websockets.next;
+
+import java.lang.reflect.Type;
+
+import io.smallrye.common.annotation.Experimental;
+
+/**
+ * Used to encode and decode messages.
+ *
+ * Special types of messages
+ * Some types of messages bypass the encoding/decoding process:
+ *
+ * - {@code java.lang.Buffer},
+ * - {@code byte[]},
+ * - {@code java.lang.String},
+ * - {@code io.vertx.core.json.JsonObject}.
+ * - {@code io.vertx.core.json.JsonArray}.
+ *
+ * The encoding/decoding details are described in {@link BinaryMessage} and {@link TextMessage}.
+ *
+ * CDI beans
+ * Implementation classes must be CDI beans. Qualifiers are ignored. {@link jakarta.enterprise.context.Dependent} beans are
+ * reused during encoding/decoding.
+ *
+ * Lifecycle and concurrency
+ * Codecs are shared accross all WebSocket connections. Therefore, implementations should be either stateless or thread-safe.
+ *
+ * @param
+ * @param
+ */
+@Experimental("This API is experimental and may change in the future")
+public interface MessageCodec {
+
+ /**
+ *
+ * @param type the type to handle, must not be {@code null}
+ * @return {@code true} if this codec can encode/decode the provided type, {@code false} otherwise
+ */
+ boolean supports(Type type);
+
+ /**
+ *
+ * @param value the value to encode, must not be {@code null}
+ * @return the encoded representation of the value
+ */
+ MESSAGE encode(T value);
+
+ /**
+ *
+ * @param type the type of the object to decode, must not be {@code null}
+ * @param value the value to decode, must not be {@code null}
+ * @return the decoded representation of the value
+ */
+ T decode(Type type, MESSAGE value);
+
+}
diff --git a/extensions/websockets-next/server/runtime/src/main/java/io/quarkus/websockets/next/OnClose.java b/extensions/websockets-next/server/runtime/src/main/java/io/quarkus/websockets/next/OnClose.java
new file mode 100644
index 0000000000000..b370a2a747f13
--- /dev/null
+++ b/extensions/websockets-next/server/runtime/src/main/java/io/quarkus/websockets/next/OnClose.java
@@ -0,0 +1,22 @@
+package io.quarkus.websockets.next;
+
+import static java.lang.annotation.ElementType.METHOD;
+import static java.lang.annotation.RetentionPolicy.RUNTIME;
+
+import java.lang.annotation.Retention;
+import java.lang.annotation.Target;
+
+import io.smallrye.common.annotation.Experimental;
+
+/**
+ * A method of an {@link WebSocket} endpoint annotated with this annotation is invoked when the client disconnects from the
+ * socket.
+ *
+ * An endpoint may only declare one method annotated with this annotation.
+ */
+@Retention(RUNTIME)
+@Target(METHOD)
+@Experimental("This API is experimental and may change in the future")
+public @interface OnClose {
+
+}
diff --git a/extensions/websockets-next/server/runtime/src/main/java/io/quarkus/websockets/next/OnMessage.java b/extensions/websockets-next/server/runtime/src/main/java/io/quarkus/websockets/next/OnMessage.java
new file mode 100644
index 0000000000000..7079ab430ea32
--- /dev/null
+++ b/extensions/websockets-next/server/runtime/src/main/java/io/quarkus/websockets/next/OnMessage.java
@@ -0,0 +1,28 @@
+package io.quarkus.websockets.next;
+
+import static java.lang.annotation.ElementType.METHOD;
+import static java.lang.annotation.RetentionPolicy.RUNTIME;
+
+import java.lang.annotation.Retention;
+import java.lang.annotation.Target;
+
+import io.smallrye.common.annotation.Experimental;
+
+/**
+ * A method of an {@link WebSocket} endpoint annotated with this annotation is invoked when an incoming message is received.
+ *
+ * An endpoint may only declare one method annotated with this annotation.
+ */
+@Retention(RUNTIME)
+@Target(METHOD)
+@Experimental("This API is experimental and may change in the future")
+public @interface OnMessage {
+
+ /**
+ *
+ * @return {@code true} if all the connected clients should receive the objects emitted by the annotated method
+ * @see WebSocketServerConnection#broadcast()
+ */
+ public boolean broadcast() default false;
+
+}
diff --git a/extensions/websockets-next/server/runtime/src/main/java/io/quarkus/websockets/next/OnOpen.java b/extensions/websockets-next/server/runtime/src/main/java/io/quarkus/websockets/next/OnOpen.java
new file mode 100644
index 0000000000000..a16d1ec35feb8
--- /dev/null
+++ b/extensions/websockets-next/server/runtime/src/main/java/io/quarkus/websockets/next/OnOpen.java
@@ -0,0 +1,28 @@
+package io.quarkus.websockets.next;
+
+import static java.lang.annotation.ElementType.METHOD;
+import static java.lang.annotation.RetentionPolicy.RUNTIME;
+
+import java.lang.annotation.Retention;
+import java.lang.annotation.Target;
+
+import io.smallrye.common.annotation.Experimental;
+
+/**
+ * A method of an {@link WebSocket} endpoint annotated with this annotation is invoked when the client connects to a web socket
+ * endpoint.
+ *
+ * An endpoint may only declare one method annotated with this annotation.
+ */
+@Retention(RUNTIME)
+@Target(METHOD)
+@Experimental("This API is experimental and may change in the future")
+public @interface OnOpen {
+
+ /**
+ * @return {@code true} if all the connected clients should receive the objects emitted by the annotated method
+ * @see WebSocketServerConnection#broadcast()
+ */
+ public boolean broadcast() default false;
+
+}
diff --git a/extensions/websockets-next/server/runtime/src/main/java/io/quarkus/websockets/next/Sender.java b/extensions/websockets-next/server/runtime/src/main/java/io/quarkus/websockets/next/Sender.java
new file mode 100644
index 0000000000000..b09795f239c4e
--- /dev/null
+++ b/extensions/websockets-next/server/runtime/src/main/java/io/quarkus/websockets/next/Sender.java
@@ -0,0 +1,52 @@
+package io.quarkus.websockets.next;
+
+import io.smallrye.common.annotation.CheckReturnValue;
+import io.smallrye.common.annotation.Experimental;
+import io.smallrye.mutiny.Uni;
+import io.vertx.core.buffer.Buffer;
+
+/**
+ * Sends a message to the connected WebSocket client.
+ */
+@Experimental("This API is experimental and may change in the future")
+public interface Sender {
+
+ /**
+ * Send a text message.
+ *
+ * @param message
+ * @return a new {@link Uni} with a {@code null} item
+ */
+ @CheckReturnValue
+ Uni sendText(String message);
+
+ /**
+ * Send a text message.
+ *
+ * @param
+ * @param message
+ * @return a new {@link Uni} with a {@code null} item
+ */
+ @CheckReturnValue
+ Uni sendText(M message);
+
+ /**
+ * Send a binary message.
+ *
+ * @param message
+ * @return a new {@link Uni} with a {@code null} item
+ */
+ @CheckReturnValue
+ Uni sendBinary(Buffer message);
+
+ /**
+ * Send a binary message.
+ *
+ * @param message
+ */
+ @CheckReturnValue
+ default Uni sendBinary(byte[] message) {
+ return sendBinary(Buffer.buffer(message));
+ }
+
+}
diff --git a/extensions/websockets-next/server/runtime/src/main/java/io/quarkus/websockets/next/TextMessage.java b/extensions/websockets-next/server/runtime/src/main/java/io/quarkus/websockets/next/TextMessage.java
new file mode 100644
index 0000000000000..25599a36e587b
--- /dev/null
+++ b/extensions/websockets-next/server/runtime/src/main/java/io/quarkus/websockets/next/TextMessage.java
@@ -0,0 +1,56 @@
+package io.quarkus.websockets.next;
+
+import static java.lang.annotation.ElementType.METHOD;
+import static java.lang.annotation.RetentionPolicy.RUNTIME;
+
+import java.lang.annotation.Retention;
+import java.lang.annotation.Target;
+
+import io.smallrye.common.annotation.Experimental;
+
+/**
+ * The annotated method consumes/produces text messages.
+ *
+ * A text message is always represented as a {@link String}. Therefore, the following conversion rules apply. The types listed
+ * below are handled specifically. For all other types a {@link TextMessageCodec} is used to encode and decode input and
+ * output messages. By default, the first input codec that supports the message type is used; codecs with higher priority go
+ * first. However, a specific codec can be selected with {@link #inputCodec()} and {@link #outputCodec()}.
+ *
+ *
+ * - {@code java.lang.String} is used as is,
+ * - {@code io.vertx.core.json.JsonObject} is encoded with {@link io.vertx.core.json.JsonObject#encode()} and decoded with
+ * {@link io.vertx.core.json.JsonObject#JsonObject(String))}.
+ * - {@code io.vertx.core.json.JsonArray} is encoded with {@link io.vertx.core.json.JsonArray#encode()} and decoded with
+ * {@link io.vertx.core.json.JsonArray#JsonArray(String))}.
+ * - {@code java.lang.Buffer} is encoded with {@link io.vertx.core.buffer.Buffer#toString()} and decoded with
+ * {@link io.vertx.core.buffer.Buffer#buffer(String)},
+ * - {@code byte[]} is first converted to {@link io.vertx.core.buffer.Buffer} and then converted as defined above,
+ *
+ *
+ * @see TextMessageCodec
+ */
+@Retention(RUNTIME)
+@Target(METHOD)
+@Experimental("This API is experimental and may change in the future")
+public @interface TextMessage {
+
+ /**
+ * The codec used for input messages.
+ *
+ * By default, the first codec that supports the message type is used; codecs with higher priority go first.
+ *
+ * Note that, if specified, the codec is also used for output messages unless {@link #outputCodec()} returns a non-default
+ * value.
+ */
+ @SuppressWarnings("rawtypes")
+ Class extends TextMessageCodec> inputCodec() default TextMessageCodec.class;
+
+ /**
+ * The codec used for output messages.
+ *
+ * By default, the same codec as for the input message is used.
+ */
+ @SuppressWarnings("rawtypes")
+ Class extends TextMessageCodec> outputCodec() default TextMessageCodec.class;
+
+}
diff --git a/extensions/websockets-next/server/runtime/src/main/java/io/quarkus/websockets/next/TextMessageCodec.java b/extensions/websockets-next/server/runtime/src/main/java/io/quarkus/websockets/next/TextMessageCodec.java
new file mode 100644
index 0000000000000..1146f686c3553
--- /dev/null
+++ b/extensions/websockets-next/server/runtime/src/main/java/io/quarkus/websockets/next/TextMessageCodec.java
@@ -0,0 +1,14 @@
+package io.quarkus.websockets.next;
+
+import io.smallrye.common.annotation.Experimental;
+
+/**
+ * Used to encode and decode text messages.
+ *
+ * @param
+ * @see TextMessage
+ */
+@Experimental("This API is experimental and may change in the future")
+public interface TextMessageCodec extends MessageCodec {
+
+}
diff --git a/extensions/websockets-next/server/runtime/src/main/java/io/quarkus/websockets/next/WebSocket.java b/extensions/websockets-next/server/runtime/src/main/java/io/quarkus/websockets/next/WebSocket.java
new file mode 100644
index 0000000000000..78f1849b6e2d3
--- /dev/null
+++ b/extensions/websockets-next/server/runtime/src/main/java/io/quarkus/websockets/next/WebSocket.java
@@ -0,0 +1,55 @@
+package io.quarkus.websockets.next;
+
+import static java.lang.annotation.ElementType.TYPE;
+import static java.lang.annotation.RetentionPolicy.RUNTIME;
+
+import java.lang.annotation.Retention;
+import java.lang.annotation.Target;
+
+import io.smallrye.common.annotation.Experimental;
+
+/**
+ * Denotes a WebSocket endpoint.
+ */
+@Retention(RUNTIME)
+@Target(TYPE)
+@Experimental("This API is experimental and may change in the future")
+public @interface WebSocket {
+
+ /**
+ * The path of the endpoint.
+ *
+ * It is possible to match path parameters. The placeholder of a path parameter consists of the parameter name surrounded by
+ * curly brackets. The actual value of a path parameter can be obtained using
+ * {@link WebSocketServerConnection#pathParam(String)}. For example, the path /foo/{bar}
defines the path
+ * parameter {@code bar}.
+ *
+ * @see WebSocketServerConnection#pathParam(String)
+ */
+ public String path();
+
+ /**
+ * The execution mode used to process incoming messages for a specific connection.
+ */
+ public ExecutionMode executionMode() default ExecutionMode.SERIAL;
+
+ /**
+ * Defines the execution mode used to process incoming messages for a specific connection.
+ *
+ * @see WebSocketServerConnection
+ */
+ enum ExecutionMode {
+
+ /**
+ * Messages are processed serially, ordering is guaranteed.
+ */
+ SERIAL,
+
+ /**
+ * Messages are processed concurrently, there are no ordering guarantees.
+ */
+ CONCURRENT,
+
+ }
+
+}
diff --git a/extensions/websockets-next/server/runtime/src/main/java/io/quarkus/websockets/next/WebSocketServerConnection.java b/extensions/websockets-next/server/runtime/src/main/java/io/quarkus/websockets/next/WebSocketServerConnection.java
new file mode 100644
index 0000000000000..78e631307e27a
--- /dev/null
+++ b/extensions/websockets-next/server/runtime/src/main/java/io/quarkus/websockets/next/WebSocketServerConnection.java
@@ -0,0 +1,171 @@
+package io.quarkus.websockets.next;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.Predicate;
+
+import io.smallrye.common.annotation.CheckReturnValue;
+import io.smallrye.common.annotation.Experimental;
+import io.smallrye.mutiny.Uni;
+
+/**
+ * This interface represents a connection from a client to a specific {@link WebSocket} endpoint on the server.
+ *
+ * Quarkus provides a built-in CDI bean of type {@code WebSocketServerConnection} that can be injected in a {@link WebSocket}
+ * endpoint and used to interact with the connected client, or all clients connected to the endpoint respectively
+ * (broadcasting).
+ *
+ * Specifically, it is possible to send messages using blocking and non-blocking methods, declared on
+ * {@link BlockingSender} and {@link Sender} respectively.
+ */
+@Experimental("This API is experimental and may change in the future")
+public interface WebSocketServerConnection extends Sender, BlockingSender {
+
+ /**
+ *
+ * @return the unique identifier assigned to this connection
+ */
+ String id();
+
+ /**
+ *
+ * @param name
+ * @return the actual value of the path parameter or null
+ * @see WebSocket#path()
+ */
+ String pathParam(String name);
+
+ /**
+ * Sends messages to all open clients connected to the same WebSocket endpoint.
+ *
+ * @return the broadcast sender
+ * @see #getOpenConnections()
+ */
+ BroadcastSender broadcast();
+
+ /**
+ * Sends messages to all open clients connected to the same WebSocket endpoint and matching the given filter predicate.
+ *
+ * @param filter
+ * @return the broadcast sender
+ * @see #getOpenConnections()
+ */
+ BroadcastSender broadcast(Predicate filter);
+
+ /**
+ * The returned set also includes the connection this method is called upon.
+ *
+ * @return the set of open connections to the same endpoint
+ */
+ Set getOpenConnections();
+
+ /**
+ * @return {@code true} if the HTTP connection is encrypted via SSL/TLS
+ */
+ boolean isSecure();
+
+ /**
+ * @return {@code true} if the WebSocket is closed
+ */
+ boolean isClosed();
+
+ /**
+ *
+ * @return {@code true} if the WebSocket is open
+ */
+ default boolean isOpen() {
+ return !isClosed();
+ }
+
+ /**
+ * Close the connection.
+ *
+ * @return a new {@link Uni} with a {@code null} item
+ */
+ @CheckReturnValue
+ Uni close();
+
+ /**
+ * Close the connection.
+ */
+ default void closeAndAwait() {
+ close().await().indefinitely();
+ }
+
+ /**
+ *
+ * @return the handshake request
+ */
+ HandshakeRequest handshakeRequest();
+
+ /**
+ * Makes it possible to send messages to all clients connected to the same WebSocket endpoint.
+ *
+ * @see WebSocketServerConnection#getOpenConnections()
+ */
+ interface BroadcastSender extends Sender, BlockingSender {
+
+ }
+
+ /**
+ * Provides some useful information about the initial handshake request.
+ */
+ interface HandshakeRequest {
+
+ /**
+ * The name is case insensitive.
+ *
+ * @param name
+ * @return the first header value for the given header name, or {@code null}
+ */
+ String header(String name);
+
+ /**
+ * The name is case insensitive.
+ *
+ * @param name
+ * @return an immutable list of header values for the given header name, never {@code null}
+ */
+ List headers(String name);
+
+ /**
+ * Returned header names are lower case.
+ *
+ * @return an immutable map of header names to header values
+ */
+ Map> headers();
+
+ /**
+ *
+ * @return the scheme
+ */
+ String scheme();
+
+ /**
+ *
+ * @return the host
+ */
+ String host();
+
+ /**
+ *
+ * @return the port
+ */
+ int port();
+
+ /**
+ *
+ * @return the path
+ */
+ String path();
+
+ /**
+ *
+ * @return the query string
+ */
+ String query();
+
+ }
+
+}
diff --git a/extensions/websockets-next/server/runtime/src/main/java/io/quarkus/websockets/next/WebSocketServerException.java b/extensions/websockets-next/server/runtime/src/main/java/io/quarkus/websockets/next/WebSocketServerException.java
new file mode 100644
index 0000000000000..c226d78983d47
--- /dev/null
+++ b/extensions/websockets-next/server/runtime/src/main/java/io/quarkus/websockets/next/WebSocketServerException.java
@@ -0,0 +1,22 @@
+package io.quarkus.websockets.next;
+
+import io.smallrye.common.annotation.Experimental;
+
+@Experimental("This API is experimental and may change in the future")
+public class WebSocketServerException extends RuntimeException {
+
+ private static final long serialVersionUID = 903932032264812404L;
+
+ public WebSocketServerException(String message, Throwable cause) {
+ super(message, cause);
+ }
+
+ public WebSocketServerException(String message) {
+ super(message);
+ }
+
+ public WebSocketServerException(Throwable cause) {
+ super(cause);
+ }
+
+}
diff --git a/extensions/websockets-next/server/runtime/src/main/java/io/quarkus/websockets/next/WebSocketsRuntimeConfig.java b/extensions/websockets-next/server/runtime/src/main/java/io/quarkus/websockets/next/WebSocketsRuntimeConfig.java
new file mode 100644
index 0000000000000..6ef568f6345f7
--- /dev/null
+++ b/extensions/websockets-next/server/runtime/src/main/java/io/quarkus/websockets/next/WebSocketsRuntimeConfig.java
@@ -0,0 +1,21 @@
+package io.quarkus.websockets.next;
+
+import java.time.Duration;
+import java.util.Optional;
+
+import io.quarkus.runtime.annotations.ConfigPhase;
+import io.quarkus.runtime.annotations.ConfigRoot;
+import io.smallrye.config.ConfigMapping;
+
+@ConfigMapping(prefix = "quarkus.websockets-next")
+@ConfigRoot(phase = ConfigPhase.RUN_TIME)
+public interface WebSocketsRuntimeConfig {
+
+ /**
+ * TODO Not implemented yet.
+ *
+ * The default timeout to complete processing of a message.
+ */
+ Optional timeout();
+
+}
diff --git a/extensions/websockets-next/server/runtime/src/main/java/io/quarkus/websockets/next/runtime/Codecs.java b/extensions/websockets-next/server/runtime/src/main/java/io/quarkus/websockets/next/runtime/Codecs.java
new file mode 100644
index 0000000000000..4bb5d61c9a8fd
--- /dev/null
+++ b/extensions/websockets-next/server/runtime/src/main/java/io/quarkus/websockets/next/runtime/Codecs.java
@@ -0,0 +1,169 @@
+package io.quarkus.websockets.next.runtime;
+
+import java.lang.reflect.Type;
+import java.util.List;
+
+import jakarta.inject.Singleton;
+
+import org.jboss.logging.Logger;
+
+import io.quarkus.arc.All;
+import io.quarkus.websockets.next.BinaryMessageCodec;
+import io.quarkus.websockets.next.MessageCodec;
+import io.quarkus.websockets.next.TextMessageCodec;
+import io.vertx.core.buffer.Buffer;
+
+@Singleton
+public class Codecs {
+
+ private static final Logger LOG = Logger.getLogger(Codecs.class);
+
+ @All
+ List> textCodecs;
+
+ @All
+ List> binaryCodecs;
+
+ public Object textDecode(Type type, String value, Class> codecBeanClass) {
+ if (codecBeanClass != null) {
+ for (TextMessageCodec> codec : textCodecs) {
+ if (codec.getClass().equals(codecBeanClass)) {
+ if (!codec.supports(type)) {
+ throw forcedCannotHandle(false, codec, type);
+ }
+ try {
+ return codec.decode(type, value);
+ } catch (Exception e) {
+ throw unableToDecode(false, codec, e);
+ }
+ }
+ }
+ } else {
+ for (TextMessageCodec> codec : textCodecs) {
+ if (codec.supports(type)) {
+ try {
+ return codec.decode(type, value);
+ } catch (Exception e) {
+ throw unableToDecode(false, codec, e);
+ }
+ }
+ }
+ }
+
+ throw noCodec(false, type);
+ }
+
+ public String textEncode(T message, Class> codecBeanClass) {
+ Class> type = message.getClass();
+ if (codecBeanClass != null) {
+ for (TextMessageCodec> codec : textCodecs) {
+ if (codec.getClass().equals(codecBeanClass)) {
+ if (!codec.supports(type)) {
+ throw forcedCannotHandle(false, codec, type);
+ }
+ try {
+ return codec.encode(cast(message));
+ } catch (Exception e) {
+ throw unableToEncode(false, codec, e);
+ }
+ }
+ }
+ } else {
+ for (TextMessageCodec> codec : textCodecs) {
+ if (codec.supports(type)) {
+ try {
+ return codec.encode(cast(message));
+ } catch (Exception e) {
+ throw unableToEncode(false, codec, e);
+ }
+ }
+ }
+ }
+ throw noCodec(false, type);
+ }
+
+ public Object binaryDecode(Type type, Buffer value, Class> codecBeanClass) {
+ if (codecBeanClass != null) {
+ for (BinaryMessageCodec> codec : binaryCodecs) {
+ if (codec.getClass().equals(codecBeanClass)) {
+ if (!codec.supports(type)) {
+ throw forcedCannotHandle(false, codec, type);
+ }
+ try {
+ return codec.decode(type, value);
+ } catch (Exception e) {
+ throw unableToDecode(false, codec, e);
+ }
+ }
+ }
+ } else {
+ for (BinaryMessageCodec> codec : binaryCodecs) {
+ if (codec.supports(type)) {
+ try {
+ return codec.decode(type, value);
+ } catch (Exception e) {
+ LOG.errorf(e, "Unable to decode binary message with %s", codec.getClass().getName());
+ }
+ }
+ }
+ }
+ throw noCodec(true, type);
+ }
+
+ public Buffer binaryEncode(T message, Class> codecBeanClass) {
+ Class> type = message.getClass();
+ if (codecBeanClass != null) {
+ for (BinaryMessageCodec> codec : binaryCodecs) {
+ if (codec.getClass().equals(codecBeanClass)) {
+ if (!codec.supports(type)) {
+ throw forcedCannotHandle(false, codec, type);
+ }
+ try {
+ return codec.encode(cast(message));
+ } catch (Exception e) {
+ throw unableToEncode(false, codec, e);
+ }
+ }
+ }
+ } else {
+ for (BinaryMessageCodec> codec : binaryCodecs) {
+ if (codec.supports(type)) {
+ try {
+ return codec.encode(cast(message));
+ } catch (Exception e) {
+ throw unableToEncode(true, codec, e);
+ }
+ }
+ }
+ }
+ throw noCodec(true, type);
+ }
+
+ IllegalStateException noCodec(boolean binary, Type type) {
+ String message = String.format("No %s codec handles the type %s", binary ? "binary" : "text", type);
+ throw new IllegalStateException(message);
+ }
+
+ IllegalStateException unableToEncode(boolean binary, MessageCodec, ?> codec, Exception e) {
+ String message = String.format("Unable to encode %s message with %s", binary ? "binary" : "text",
+ codec.getClass().getName());
+ throw new IllegalStateException(message, e);
+ }
+
+ IllegalStateException unableToDecode(boolean binary, MessageCodec, ?> codec, Exception e) {
+ String message = String.format("Unable to decode %s message with %s", binary ? "binary" : "text",
+ codec.getClass().getName());
+ throw new IllegalStateException(message, e);
+ }
+
+ IllegalStateException forcedCannotHandle(boolean binary, MessageCodec, ?> codec, Type type) {
+ throw new IllegalStateException(
+ String.format("Forced %s codec [%s] cannot handle the type %s", binary ? "binary" : "text",
+ codec.getClass().getName(), type));
+ }
+
+ @SuppressWarnings("unchecked")
+ static T cast(Object obj) {
+ return (T) obj;
+ }
+}
diff --git a/extensions/websockets-next/server/runtime/src/main/java/io/quarkus/websockets/next/runtime/ConcurrencyLimiter.java b/extensions/websockets-next/server/runtime/src/main/java/io/quarkus/websockets/next/runtime/ConcurrencyLimiter.java
new file mode 100644
index 0000000000000..67ba402aab090
--- /dev/null
+++ b/extensions/websockets-next/server/runtime/src/main/java/io/quarkus/websockets/next/runtime/ConcurrencyLimiter.java
@@ -0,0 +1,102 @@
+package io.quarkus.websockets.next.runtime;
+
+import java.util.Queue;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.jboss.logging.Logger;
+
+import io.quarkus.websockets.next.WebSocketServerConnection;
+import io.smallrye.mutiny.helpers.queues.Queues;
+import io.vertx.core.Context;
+import io.vertx.core.Handler;
+import io.vertx.core.Promise;
+
+/**
+ * Used to limit concurrent invocations.
+ */
+class ConcurrencyLimiter {
+
+ private static final Logger LOG = Logger.getLogger(ConcurrencyLimiter.class);
+
+ private final WebSocketServerConnection connection;
+ private final Queue queue;
+ private final AtomicLong uncompleted;
+ private final AtomicLong queueCounter;
+
+ ConcurrencyLimiter(WebSocketServerConnection connection) {
+ this.connection = connection;
+ this.uncompleted = new AtomicLong();
+ this.queueCounter = new AtomicLong();
+ this.queue = Queues.createMpscQueue();
+ }
+
+ /**
+ * This method must be always used before {@link #run(Runnable)} and the returned callback must be always invoked when an
+ * async computation completes.
+ *
+ * @param promise
+ * @return a new callback to complete the given promise
+ */
+ PromiseComplete newComplete(Promise promise) {
+ return new PromiseComplete(promise);
+ }
+
+ /**
+ * Run or queue up the given action.
+ *
+ * @param action
+ * @param context
+ */
+ void run(Context context, Runnable action) {
+ if (uncompleted.compareAndSet(0, 1)) {
+ LOG.debugf("Run action: %s", connection);
+ action.run();
+ } else {
+ long queueIndex = queueCounter.incrementAndGet();
+ LOG.debugf("Action queued as %s: %s", queueIndex, connection);
+ queue.offer(new Action(queueIndex, action, context));
+ // We need to make sure that at least one completion is in flight
+ if (uncompleted.getAndIncrement() == 0) {
+ Action queuedAction = queue.poll();
+ assert queuedAction != null;
+ LOG.debugf("Run action %s from queue: %s", queuedAction.queueIndex, connection);
+ queuedAction.runnable.run();
+ }
+ }
+ }
+
+ class PromiseComplete {
+
+ final Promise promise;
+
+ private PromiseComplete(Promise promise) {
+ this.promise = promise;
+ }
+
+ void failure(Throwable t) {
+ complete();
+ }
+
+ void complete() {
+ try {
+ promise.complete();
+ } finally {
+ if (uncompleted.decrementAndGet() == 0) {
+ return;
+ }
+ Action queuedAction = queue.poll();
+ assert queuedAction != null;
+ LOG.debugf("Run action %s from queue: %s", queuedAction.queueIndex, connection);
+ queuedAction.context.runOnContext(new Handler() {
+ @Override
+ public void handle(Void event) {
+ queuedAction.runnable.run();
+ }
+ });
+ }
+ }
+ }
+
+ record Action(long queueIndex, Runnable runnable, Context context) {
+ }
+}
diff --git a/extensions/websockets-next/server/runtime/src/main/java/io/quarkus/websockets/next/runtime/ConnectionManager.java b/extensions/websockets-next/server/runtime/src/main/java/io/quarkus/websockets/next/runtime/ConnectionManager.java
new file mode 100644
index 0000000000000..71f368e6f5188
--- /dev/null
+++ b/extensions/websockets-next/server/runtime/src/main/java/io/quarkus/websockets/next/runtime/ConnectionManager.java
@@ -0,0 +1,37 @@
+package io.quarkus.websockets.next.runtime;
+
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+import jakarta.annotation.PreDestroy;
+import jakarta.inject.Singleton;
+
+import io.quarkus.websockets.next.WebSocketServerConnection;
+
+@Singleton
+public class ConnectionManager {
+
+ private final ConcurrentMap> endpointToConnections = new ConcurrentHashMap<>();
+
+ void add(String endpoint, WebSocketServerConnection connection) {
+ endpointToConnections.computeIfAbsent(endpoint, e -> ConcurrentHashMap.newKeySet()).add(connection);
+ }
+
+ void remove(String endpoint, WebSocketServerConnection connection) {
+ Set connections = endpointToConnections.get(endpoint);
+ if (connections != null) {
+ connections.remove(connection);
+ }
+ }
+
+ Set getConnections(String endpoint) {
+ return endpointToConnections.get(endpoint);
+ }
+
+ @PreDestroy
+ void destroy() {
+ endpointToConnections.clear();
+ }
+
+}
diff --git a/extensions/websockets-next/server/runtime/src/main/java/io/quarkus/websockets/next/runtime/ContextSupport.java b/extensions/websockets-next/server/runtime/src/main/java/io/quarkus/websockets/next/runtime/ContextSupport.java
new file mode 100644
index 0000000000000..091f939c4de10
--- /dev/null
+++ b/extensions/websockets-next/server/runtime/src/main/java/io/quarkus/websockets/next/runtime/ContextSupport.java
@@ -0,0 +1,67 @@
+package io.quarkus.websockets.next.runtime;
+
+import org.jboss.logging.Logger;
+
+import io.quarkus.arc.ManagedContext;
+import io.quarkus.vertx.core.runtime.context.VertxContextSafetyToggle;
+import io.quarkus.websockets.next.WebSocketServerConnection;
+import io.quarkus.websockets.next.runtime.WebSocketSessionContext.SessionContextState;
+import io.smallrye.common.vertx.VertxContext;
+import io.vertx.core.Context;
+
+public class ContextSupport {
+
+ private static final Logger LOG = Logger.getLogger(ContextSupport.class);
+
+ private final WebSocketServerConnection connection;
+ private final SessionContextState sessionContextState;
+ private final WebSocketSessionContext sessionContext;
+ private final ManagedContext requestContext;
+
+ ContextSupport(WebSocketServerConnection connection, SessionContextState sessionContextState,
+ WebSocketSessionContext sessionContext,
+ ManagedContext requestContext) {
+ this.connection = connection;
+ this.sessionContextState = sessionContextState;
+ this.sessionContext = sessionContext;
+ this.requestContext = requestContext;
+ }
+
+ void start() {
+ LOG.debugf("Start contexts: %s", connection);
+ startSession();
+ // Activate a new request context
+ requestContext.activate();
+ }
+
+ void startSession() {
+ // Activate the captured session context
+ sessionContext.activate(sessionContextState);
+ }
+
+ void end(boolean terminateSession) {
+ LOG.debugf("End contexts: %s", connection);
+ requestContext.terminate();
+ if (terminateSession) {
+ // OnClose - terminate the session context
+ endSession();
+ } else {
+ sessionContext.deactivate();
+ }
+ }
+
+ void endSession() {
+ sessionContext.terminate();
+ }
+
+ static Context createNewDuplicatedContext(Context context, WebSocketServerConnection connection) {
+ Context duplicated = VertxContext.createNewDuplicatedContext(context);
+ VertxContextSafetyToggle.setContextSafe(duplicated, true);
+ // We need to store the connection in the duplicated context
+ // It's used to initialize the synthetic bean later on
+ duplicated.putLocal(WebSocketServerRecorder.WEB_SOCKET_CONN_KEY, connection);
+ LOG.debugf("New vertx duplicated context [%s] created: %s", duplicated, connection);
+ return duplicated;
+ }
+
+}
diff --git a/extensions/websockets-next/server/runtime/src/main/java/io/quarkus/websockets/next/runtime/JsonTextMessageCodec.java b/extensions/websockets-next/server/runtime/src/main/java/io/quarkus/websockets/next/runtime/JsonTextMessageCodec.java
new file mode 100644
index 0000000000000..52ae78ffca171
--- /dev/null
+++ b/extensions/websockets-next/server/runtime/src/main/java/io/quarkus/websockets/next/runtime/JsonTextMessageCodec.java
@@ -0,0 +1,53 @@
+package io.quarkus.websockets.next.runtime;
+
+import java.lang.reflect.Type;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+import jakarta.annotation.Priority;
+import jakarta.inject.Inject;
+import jakarta.inject.Singleton;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.JavaType;
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+import io.quarkus.websockets.next.TextMessageCodec;
+
+@Singleton
+@Priority(0)
+public class JsonTextMessageCodec implements TextMessageCodec