Skip to content

Commit

Permalink
Provide access to netty's ChannelHandlerContext to EgressRequest and …
Browse files Browse the repository at this point in the history
…IngressRequest.

When RxNetty provides a way to get ChannelHandlerContext from HttpServerRequest (Issue 214: ReactiveX/RxNetty#214) we will only need this in EgressRequest.
  • Loading branch information
Nitesh Kant committed Aug 23, 2014
1 parent 8929bb3 commit d84321b
Show file tree
Hide file tree
Showing 4 changed files with 31 additions and 9 deletions.
13 changes: 10 additions & 3 deletions zuul-core/src/main/java/com/netflix/zuul/EgressRequest.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,17 +16,21 @@
package com.netflix.zuul;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.reactivex.netty.protocol.http.client.HttpClientRequest;
import io.reactivex.netty.protocol.http.server.HttpServerRequest;

import java.util.Map;

public class EgressRequest<T> {

private HttpClientRequest<ByteBuf> nettyRequest;
private final ChannelHandlerContext channelHandlerContext;
private final T state;

private EgressRequest(HttpClientRequest<ByteBuf> nettyRequest, T state) {
private EgressRequest(HttpClientRequest<ByteBuf> nettyRequest, ChannelHandlerContext channelHandlerContext, T state) {
this.nettyRequest = nettyRequest;
this.channelHandlerContext = channelHandlerContext;
this.state = state;
}

Expand All @@ -38,7 +42,7 @@ public static <T> EgressRequest<T> copiedFrom(IngressRequest ingressReq, T reque
clientReq = clientReq.withHeader(entry.getKey(), entry.getValue());
}
clientReq = clientReq.withContentSource(nettyReq.getContent());
return new EgressRequest<>(clientReq, requestState);
return new EgressRequest<>(clientReq, ingressReq.getNettyChannelContext(), requestState);
}

public void addHeader(String name, String value) {
Expand All @@ -60,5 +64,8 @@ public HttpClientRequest<ByteBuf> getUnderlyingNettyReq() {
public T get() {
return state;
}


public ChannelHandlerContext getNettyChannelContext() {
return channelHandlerContext;
}
}
22 changes: 19 additions & 3 deletions zuul-core/src/main/java/com/netflix/zuul/IngressRequest.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,20 +16,36 @@
package com.netflix.zuul;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.reactivex.netty.protocol.http.server.HttpServerRequest;

public class IngressRequest {

private final HttpServerRequest<ByteBuf> nettyRequest;

private IngressRequest(HttpServerRequest<ByteBuf> nettyRequest) {
// TODO: Remove when HttpServerRequest provides a way to get to the ChanelHandlerContext. Issue: https://github.com/ReactiveX/RxNetty/issues/214
private final ChannelHandlerContext nettyChannelContext;

private IngressRequest(HttpServerRequest<ByteBuf> nettyRequest, ChannelHandlerContext nettyChannelContext) {
this.nettyRequest = nettyRequest;
this.nettyChannelContext = nettyChannelContext;
}

public static IngressRequest from(HttpServerRequest<ByteBuf> nettyRequest, ChannelHandlerContext nettyChannelContext) {
return new IngressRequest(nettyRequest, nettyChannelContext);
}

public static IngressRequest from(HttpServerRequest<ByteBuf> nettyRequest) {
return new IngressRequest(nettyRequest);
/*Visible for testing*/ static IngressRequest from(HttpServerRequest<ByteBuf> nettyRequest) {
return new IngressRequest(nettyRequest, null);
}

/* package-private */ HttpServerRequest<ByteBuf> getNettyRequest() {
return nettyRequest;
}

/* package-private */ ChannelHandlerContext getNettyChannelContext() {
return nettyChannelContext;
}


}
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ public ZuulRouter(FilterProcessor<Request, Response> filterProcessor) {

@Override
public Observable<Void> route(HttpServerRequest<ByteBuf> request, HttpServerResponse<ByteBuf> response) {
final IngressRequest ingressReq = IngressRequest.from(request);
final IngressRequest ingressReq = IngressRequest.from(request, response.getChannelHandlerContext());
return filterProcessor.applyAllFilters(ingressReq).
flatMap(egressResp -> {
response.setStatus(egressResp.getStatus());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import io.reactivex.netty.protocol.http.server.HttpServer;
import io.reactivex.netty.protocol.http.server.HttpServerRequest;
import io.reactivex.netty.protocol.http.server.HttpServerResponse;
import rx.Observable;
import rx.functions.Func1;

import java.util.Map;
Expand All @@ -38,7 +37,7 @@ public ZuulRxNettyServer(int port, FilterProcessor<Request, Response> filterProc
public HttpServer<ByteBuf, ByteBuf> createServer() {
HttpServer<ByteBuf, ByteBuf> server = RxNetty.newHttpServerBuilder(port,
(HttpServerRequest<ByteBuf> request, HttpServerResponse<ByteBuf> response) -> {
final IngressRequest ingressReq = IngressRequest.from(request);
final IngressRequest ingressReq = IngressRequest.from(request, response.getChannelHandlerContext());
return filterProcessor.applyAllFilters(ingressReq).
flatMap(egressResp -> {
response.setStatus(egressResp.getStatus());
Expand Down

0 comments on commit d84321b

Please sign in to comment.