Skip to content

Commit

Permalink
Fix Jetty WebSocket test failures for WebFlux
Browse files Browse the repository at this point in the history
  • Loading branch information
rstoyanchev committed Sep 17, 2021
1 parent 48875dc commit 513cc15
Show file tree
Hide file tree
Showing 4 changed files with 19 additions and 82 deletions.
1 change: 1 addition & 0 deletions spring-web/spring-web.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -82,4 +82,5 @@ dependencies {
testFixturesImplementation("org.bouncycastle:bcpkix-jdk15on") {
because("needed by Netty's SelfSignedCertificate on JDK 15+")
}
testFixturesImplementation("org.eclipse.jetty.websocket:websocket-jetty-server")
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2002-2016 the original author or authors.
* Copyright 2002-2021 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -20,6 +20,7 @@
import org.eclipse.jetty.server.ServerConnector;
import org.eclipse.jetty.servlet.ServletContextHandler;
import org.eclipse.jetty.servlet.ServletHolder;
import org.eclipse.jetty.websocket.server.config.JettyWebSocketServletContainerInitializer;

import org.springframework.http.server.reactive.JettyHttpHandlerAdapter;
import org.springframework.http.server.reactive.ServletHttpHandlerAdapter;
Expand All @@ -45,6 +46,7 @@ protected void initServer() throws Exception {

this.contextHandler = new ServletContextHandler(this.jettyServer, "", false, false);
this.contextHandler.addServlet(servletHolder, "/");
this.contextHandler.addServletContainerInitializer(new JettyWebSocketServletContainerInitializer());
this.contextHandler.start();

ServerConnector connector = new ServerConnector(this.jettyServer);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,7 @@ private Mono<Void> executeInternal(URI url, HttpHeaders headers, WebSocketHandle
Object jettyHandler = createHandler(
url, ContextWebSocketHandler.decorate(handler, contextView), completionSink);
ClientUpgradeRequest request = new ClientUpgradeRequest();
request.setHeaders(headers);
request.setSubProtocols(handler.getSubProtocols());
try {
this.jettyClient.connect(jettyHandler, url, request);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,27 +16,21 @@

package org.springframework.web.reactive.socket.server.upgrade;

import java.lang.reflect.Method;
import java.util.function.Supplier;

import jakarta.servlet.ServletContext;
import jakarta.servlet.http.HttpServletRequest;
import jakarta.servlet.http.HttpServletResponse;
import org.aopalliance.intercept.MethodInterceptor;
import org.aopalliance.intercept.MethodInvocation;
import org.eclipse.jetty.websocket.server.JettyWebSocketCreator;
import org.eclipse.jetty.websocket.server.JettyWebSocketServerContainer;
import reactor.core.publisher.Mono;

import org.springframework.aop.framework.ProxyFactory;
import org.springframework.aop.target.EmptyTargetSource;
import org.springframework.core.io.buffer.DataBufferFactory;
import org.springframework.http.server.reactive.ServerHttpRequest;
import org.springframework.http.server.reactive.ServerHttpRequestDecorator;
import org.springframework.http.server.reactive.ServerHttpResponse;
import org.springframework.http.server.reactive.ServerHttpResponseDecorator;
import org.springframework.lang.NonNull;
import org.springframework.lang.Nullable;
import org.springframework.util.Assert;
import org.springframework.util.ReflectionUtils;
import org.springframework.web.reactive.socket.HandshakeInfo;
import org.springframework.web.reactive.socket.WebSocketHandler;
import org.springframework.web.reactive.socket.adapter.ContextWebSocketHandler;
Expand All @@ -53,36 +47,6 @@
*/
public class JettyRequestUpgradeStrategy implements RequestUpgradeStrategy {

private static final Class<?> webSocketCreatorClass;

private static final Method getContainerMethod;

private static final Method upgradeMethod;

private static final Method setAcceptedSubProtocol;

static {
// TODO: can switch to non-reflective implementation now

ClassLoader loader = JettyRequestUpgradeStrategy.class.getClassLoader();
try {
webSocketCreatorClass = loader.loadClass("org.eclipse.jetty.websocket.server.JettyWebSocketCreator");

Class<?> type = loader.loadClass("org.eclipse.jetty.websocket.server.JettyWebSocketServerContainer");
getContainerMethod = type.getMethod("getContainer", ServletContext.class);
Method upgrade = ReflectionUtils.findMethod(type, "upgrade", (Class<?>[]) null);
Assert.state(upgrade != null, "Upgrade method not found");
upgradeMethod = upgrade;

type = loader.loadClass("org.eclipse.jetty.websocket.server.JettyServerUpgradeResponse");
setAcceptedSubProtocol = type.getMethod("setAcceptedSubProtocol", String.class);
}
catch (Exception ex) {
throw new IllegalStateException("No compatible Jetty version found", ex);
}
}


@Override
public Mono<Void> upgrade(
ServerWebExchange exchange, WebSocketHandler handler,
Expand All @@ -98,62 +62,31 @@ public Mono<Void> upgrade(
HandshakeInfo handshakeInfo = handshakeInfoFactory.get();
DataBufferFactory factory = response.bufferFactory();

// Trigger WebFlux preCommit actions and upgrade
// Trigger WebFlux preCommit actions before upgrade
return exchange.getResponse().setComplete()
.then(Mono.deferContextual(contextView -> {
JettyWebSocketHandlerAdapter adapter = new JettyWebSocketHandlerAdapter(
ContextWebSocketHandler.decorate(handler, contextView),
session -> new JettyWebSocketSession(session, handshakeInfo, factory));

JettyWebSocketCreator webSocketCreator = (upgradeRequest, upgradeResponse) -> {
if (subProtocol != null) {
upgradeResponse.setAcceptedSubProtocol(subProtocol);
}
return adapter;
};

JettyWebSocketServerContainer container = JettyWebSocketServerContainer.getContainer(servletContext);

try {
Object creator = createJettyWebSocketCreator(adapter, subProtocol);
Object container = ReflectionUtils.invokeMethod(getContainerMethod, null, servletContext);
ReflectionUtils.invokeMethod(upgradeMethod, container, creator, servletRequest, servletResponse);
container.upgrade(webSocketCreator, servletRequest, servletResponse);
}
catch (Exception ex) {
return Mono.error(ex);
}

return Mono.empty();
}));
}

private static Object createJettyWebSocketCreator(
JettyWebSocketHandlerAdapter adapter, @Nullable String protocol) {

ProxyFactory factory = new ProxyFactory(EmptyTargetSource.INSTANCE);
factory.addInterface(webSocketCreatorClass);
factory.addAdvice(new WebSocketCreatorInterceptor(adapter, protocol));
return factory.getProxy();
}


/**
* Proxy for a JettyWebSocketCreator to supply the WebSocket handler and set the sub-protocol.
*/
private static class WebSocketCreatorInterceptor implements MethodInterceptor {

private final JettyWebSocketHandlerAdapter adapter;

@Nullable
private final String protocol;


public WebSocketCreatorInterceptor(
JettyWebSocketHandlerAdapter adapter, @Nullable String protocol) {

this.adapter = adapter;
this.protocol = protocol;
}

@Nullable
@Override
public Object invoke(@NonNull MethodInvocation invocation) {
if (this.protocol != null) {
ReflectionUtils.invokeMethod(
setAcceptedSubProtocol, invocation.getArguments()[2], this.protocol);
}
return this.adapter;
}
}

}

0 comments on commit 513cc15

Please sign in to comment.