Skip to content

Commit

Permalink
fix #488 Context is now propagated to websocket handler
Browse files Browse the repository at this point in the history
  • Loading branch information
Stephane Maldini committed Oct 31, 2018
1 parent 3a5ede1 commit cd48ade
Show file tree
Hide file tree
Showing 3 changed files with 82 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,8 @@
import io.netty.handler.codec.http.cookie.ServerCookieEncoder;
import io.netty.util.AsciiString;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscription;
import reactor.core.CoreSubscriber;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.ipc.netty.FutureMono;
Expand All @@ -64,6 +66,7 @@
import reactor.ipc.netty.http.websocket.WebsocketOutbound;
import reactor.util.Logger;
import reactor.util.Loggers;
import reactor.util.context.Context;

import static io.netty.buffer.Unpooled.EMPTY_BUFFER;
import static reactor.ipc.netty.ReactorNetty.format;
Expand Down Expand Up @@ -518,10 +521,12 @@ final Mono<Void> withWebsocketSupport(String url,

if (replace(ops)) {
return FutureMono.from(ops.handshakerResult)
.doOnSuccess(aVoid ->
Mono.from(websocketHandler.apply(ops, ops))
.doAfterSuccessOrError(ops)
.subscribe());
.doOnEach(signal -> {
if(!signal.hasError()) {
websocketHandler.apply(ops, ops)
.subscribe(new WebsocketSubscriber(ops, signal.getContext()));
}
});
}
}
else {
Expand All @@ -530,6 +535,49 @@ final Mono<Void> withWebsocketSupport(String url,
return Mono.error(new IllegalStateException("Failed to upgrade to websocket"));
}

static final class WebsocketSubscriber implements CoreSubscriber<Void>, ChannelFutureListener {
final HttpServerWSOperations ops;
final Context context;

WebsocketSubscriber(HttpServerWSOperations ops, Context context) {
this.ops = ops;
this.context = context;
}

@Override
public void onSubscribe(Subscription s) {
s.request(Long.MAX_VALUE);
}

@Override
public void onNext(Void aVoid) {

}

@Override
public void onError(Throwable t) {
ops.onOutboundError(t);
}

@Override
public void operationComplete(ChannelFuture future) {
ops.onHandlerTerminate();
}

@Override
public void onComplete() {
if (ops.channel()
.isActive()) {
ops.sendCloseNow(null, this);
}
}

@Override
public Context currentContext() {
return context;
}
}

static final Logger log = Loggers.getLogger(HttpServerOperations.class);

final static AsciiString EVENT_STREAM = new AsciiString("text/event-stream");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
package reactor.ipc.netty.http.server;

import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.function.BiConsumer;

import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
Expand Down Expand Up @@ -48,7 +47,7 @@
* @author Simon Baslé
*/
final class HttpServerWSOperations extends HttpServerOperations
implements WebsocketInbound, WebsocketOutbound, BiConsumer<Void, Throwable> {
implements WebsocketInbound, WebsocketOutbound {

final WebSocketServerHandshaker handshaker;
final ChannelPromise handshakerResult;
Expand Down Expand Up @@ -115,18 +114,6 @@ public void onInboundNext(ChannelHandlerContext ctx, Object frame) {
protected void onOutboundComplete() {
}

@Override
public void accept(Void aVoid, Throwable throwable) {
if (throwable == null) {
if (channel().isActive()) {
sendCloseNow(null, f -> onHandlerTerminate());
}
}
else {
onOutboundError(throwable);
}
}

@Override
protected void onOutboundError(Throwable err) {
if (channel().isActive()) {
Expand Down
30 changes: 29 additions & 1 deletion src/test/java/reactor/ipc/netty/http/client/WebsocketTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
import reactor.ipc.netty.http.websocket.WebsocketOutbound;
import reactor.ipc.netty.resources.PoolResources;
import reactor.test.StepVerifier;
import reactor.util.context.Context;

import static org.hamcrest.CoreMatchers.is;
import static org.junit.Assert.assertNotNull;
Expand Down Expand Up @@ -682,7 +683,7 @@ public void testClientOnCloseIsInvokedServerInitiatedClose() throws Exception {
}

@Test
public void testIssue444() throws InterruptedException {
public void testIssue444() {
doTestIssue444((in, out) ->
out.sendObject(Flux.error(new Throwable())
.onErrorResume(ex -> out.sendClose(1001, "Going Away"))
Expand Down Expand Up @@ -720,4 +721,31 @@ private void doTestIssue444(BiFunction<WebsocketInbound, WebsocketOutbound, Publ
.expectComplete()
.verify(Duration.ofSeconds(30));
}


@Test
public void testContext() {
httpServer = HttpServer.create(0)
.newHandler((in, out) -> out.sendWebsocket((i, o) ->
o.sendString(Mono.subscriberContext()
.map(ctx -> ctx.getOrDefault(
"test",
"fail")))
).subscriberContext(Context.of("test", "success"))
)
.block(Duration.ofSeconds(30));
assertNotNull(httpServer);

StepVerifier.create(
HttpClient.create(httpServer.address()
.getPort())
.ws("/test")
.flatMapMany(in -> in.receiveWebsocket()
.receive()
.aggregate()
.asString())
)
.expectNext("success")
.verifyComplete();
}
}

0 comments on commit cd48ade

Please sign in to comment.