Skip to content

Commit

Permalink
#488 Propagate context in websocket handler
Browse files Browse the repository at this point in the history
  • Loading branch information
Stephane Maldini committed Oct 31, 2018
2 parents 69c404d + 504ffef commit de6145f
Show file tree
Hide file tree
Showing 3 changed files with 54 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@
import io.netty.util.AttributeKey;
import io.netty.util.ReferenceCountUtil;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscription;
import reactor.core.CoreSubscriber;
import reactor.core.Disposable;
import reactor.core.Exceptions;
Expand All @@ -77,6 +78,7 @@
import reactor.netty.http.HttpOperations;
import reactor.util.Logger;
import reactor.util.Loggers;
import reactor.util.context.Context;

import static reactor.netty.ReactorNetty.format;

Expand Down
58 changes: 51 additions & 7 deletions src/main/java/reactor/netty/http/server/HttpServerOperations.java
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,8 @@
import io.netty.handler.codec.http.cookie.ServerCookieEncoder;
import io.netty.util.AsciiString;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscription;
import reactor.core.CoreSubscriber;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.netty.Connection;
Expand All @@ -68,6 +70,7 @@
import reactor.netty.http.websocket.WebsocketOutbound;
import reactor.util.Logger;
import reactor.util.Loggers;
import reactor.util.context.Context;

import static io.netty.buffer.Unpooled.EMPTY_BUFFER;
import static reactor.netty.ReactorNetty.format;
Expand Down Expand Up @@ -549,13 +552,11 @@ final Mono<Void> withWebsocketSupport(String url,

if (rebind(ops)) {
return FutureMono.from(ops.handshakerResult)
.doOnSuccess(aVoid -> {
//skip handler if no matching subprotocol
if (protocols == null || ops.selectedSubprotocol() != null) {
Mono.fromDirect(websocketHandler.apply(ops, ops))
.doAfterSuccessOrError(ops)
.subscribe();
}
.doOnEach(signal -> {
if(!signal.hasError() && (protocols == null || ops.selectedSubprotocol() != null)) {
websocketHandler.apply(ops, ops)
.subscribe(new WebsocketSubscriber(ops, signal.getContext()));
}
});
}
}
Expand All @@ -565,6 +566,49 @@ final Mono<Void> withWebsocketSupport(String url,
return Mono.error(new IllegalStateException("Failed to upgrade to websocket"));
}

static final class WebsocketSubscriber implements CoreSubscriber<Void>, ChannelFutureListener {
final WebsocketServerOperations ops;
final Context context;

WebsocketSubscriber(WebsocketServerOperations ops, Context context) {
this.ops = ops;
this.context = context;
}

@Override
public void onSubscribe(Subscription s) {
s.request(Long.MAX_VALUE);
}

@Override
public void onNext(Void aVoid) {

}

@Override
public void onError(Throwable t) {
ops.onOutboundError(t);
}

@Override
public void operationComplete(ChannelFuture future) {
ops.terminate();
}

@Override
public void onComplete() {
if (ops.channel()
.isActive()) {
ops.sendCloseNow(null, this);
}
}

@Override
public Context currentContext() {
return context;
}
}

static final Logger log = Loggers.getLogger(HttpServerOperations.class);
final static AsciiString EVENT_STREAM = new AsciiString("text/event-stream");

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
package reactor.netty.http.server;

import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.function.BiConsumer;
import javax.annotation.Nullable;

import io.netty.channel.Channel;
Expand Down Expand Up @@ -51,7 +50,7 @@
* @author Simon Baslé
*/
final class WebsocketServerOperations extends HttpServerOperations
implements WebsocketInbound, WebsocketOutbound, BiConsumer<Void, Throwable> {
implements WebsocketInbound, WebsocketOutbound {

final WebSocketServerHandshaker handshaker;
final ChannelPromise handshakerResult;
Expand Down Expand Up @@ -126,18 +125,6 @@ public void onInboundNext(ChannelHandlerContext ctx, Object frame) {
protected void onOutboundComplete() {
}

@Override
public void accept(Void aVoid, Throwable throwable) {
if (throwable == null) {
if (channel().isActive()) {
sendCloseNow(null, f -> terminate());
}
}
else {
onOutboundError(throwable);
}
}

@Override
protected void onOutboundError(Throwable err) {
if (channel().isActive()) {
Expand Down

0 comments on commit de6145f

Please sign in to comment.