Skip to content

Commit

Permalink
temp
Browse files Browse the repository at this point in the history
  • Loading branch information
violetagg committed Apr 8, 2020
1 parent ae1bd68 commit 69e0e20
Show file tree
Hide file tree
Showing 5 changed files with 33 additions and 28 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@
import javax.annotation.Nullable;

import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInitializer;
import io.netty.resolver.AddressResolverGroup;
import io.netty.util.AttributeKey;
Expand Down Expand Up @@ -224,10 +223,8 @@ final class PooledConnectionInitializer extends ChannelInitializer<Channel> impl
}

@Override
public void handlerAdded(ChannelHandlerContext ctx) {
ctx.pipeline().remove(this);

Channel ch = ctx.channel();
protected void initChannel(Channel ch) {
ch.pipeline().remove(this);

if (log.isDebugEnabled()) {
log.debug(format(ch, "Created a new pooled channel, now {} active connections and {} inactive connections"),
Expand All @@ -245,11 +242,6 @@ public void handlerAdded(ChannelHandlerContext ctx) {
.addFirst(config.channelInitializer(pooledConnection, remoteAddress));
}

@Override
protected void initChannel(Channel ch) {
// noop
}

@Override
public void onComplete() {
// noop
Expand Down
2 changes: 1 addition & 1 deletion src/main/java/reactor/netty/transport/TransportConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ public final ChannelGroup channelGroup() {
}

public int channelHash() {
return Objects.hash(attrs, channelGroup, localAddress != null ? localAddress.get() : 0, loggingHandler,
return Objects.hash(attrs, channelGroup, bindAddress != null ? bindAddress.get() : 0, loggingHandler,
loopResources, metricsRecorder, observer, options, preferNative);
}

Expand Down
10 changes: 2 additions & 8 deletions src/main/java/reactor/netty/transport/TransportConnector.java
Original file line number Diff line number Diff line change
Expand Up @@ -159,8 +159,8 @@ static Mono<Channel> doInitAndRegister(TransportConfig config, ChannelInitialize
Channel channel = null;
try {
channel = channelFactory.newChannel();
if (channelInitializer instanceof TransportServer.Acceptor) {
((TransportServer.Acceptor) channelInitializer).enableAutoReadTask(channel);
if (channelInitializer instanceof TransportServer.AcceptorInitializer) {
((TransportServer.AcceptorInitializer) channelInitializer).acceptor.enableAutoReadTask(channel);
}
channel.pipeline().addLast(channelInitializer);
setChannelOptions(channel, config.options());
Expand Down Expand Up @@ -427,9 +427,6 @@ public boolean tryFailure(Throwable cause) {
}
return true;
}
if (actual != null) {
Operators.onErrorDropped(cause, actual.currentContext());
}
return false;
}

Expand All @@ -447,9 +444,6 @@ public boolean trySuccess(Void result) {
}
return true;
}
if (actual != null) {
Operators.onNextDropped(channel, actual.currentContext());
}
return false;
}

Expand Down
33 changes: 26 additions & 7 deletions src/main/java/reactor/netty/transport/TransportServer.java
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
Expand All @@ -48,6 +49,8 @@
import reactor.netty.channel.AbortedException;
import reactor.netty.channel.ChannelOperations;
import reactor.netty.http.HttpResources;
import reactor.util.Logger;
import reactor.util.Loggers;
import reactor.util.context.Context;

import javax.annotation.Nullable;
Expand Down Expand Up @@ -93,7 +96,8 @@ public Mono<? extends DisposableServer> bind() {
DisposableBind disposableServer = new DisposableBind(sink, config, local);
Acceptor acceptor = new Acceptor(config.childEventLoopGroup(), config.channelInitializer(childObs, null),
config.childOptions(), config.childAttributes());
TransportConnector.bind(config, acceptor, local)
ChannelInitializer<Channel> channelInitializer = new AcceptorInitializer(acceptor);
TransportConnector.bind(config, channelInitializer, local)
.subscribe(disposableServer);
});
if (config.doOnBind() != null) {
Expand Down Expand Up @@ -306,7 +310,9 @@ public final T port(int port) {
return bindAddress(() -> AddressUtils.updatePort(configuration().bindAddress(), port));
}

static class Acceptor extends ChannelInitializer<Channel> {
static final Logger log = Loggers.getLogger(TransportServer.class);

static class Acceptor extends ChannelInboundHandlerAdapter {

final EventLoopGroup childGroup;
final ChannelHandler childHandler;
Expand Down Expand Up @@ -357,11 +363,6 @@ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
ctx.fireExceptionCaught(cause);
}

@Override
protected void initChannel(Channel ch) {
// noop
}

void enableAutoReadTask(Channel channel) {

// Task which is scheduled to re-enable auto-read.
Expand All @@ -378,6 +379,20 @@ static void forceClose(Channel child, Throwable t) {
}
}

static final class AcceptorInitializer extends ChannelInitializer<Channel> {

final Acceptor acceptor;

AcceptorInitializer(Acceptor acceptor) {
this.acceptor = acceptor;
}

@Override
public void initChannel(final Channel ch) {
ch.eventLoop().execute(() -> ch.pipeline().addLast(acceptor));
}
}

static final class ChildObserver implements ConnectionObserver {

final ConnectionObserver childObs;
Expand Down Expand Up @@ -438,8 +453,12 @@ public Context currentContext() {
}

@Override
@SuppressWarnings("FutureReturnValueIgnored")
public final void dispose() {
if (channel != null && channel.isActive()) {
//"FutureReturnValueIgnored" this is deliberate
channel.close();

HttpResources.get()
.disposeWhen(bindAddress);
}
Expand Down
4 changes: 2 additions & 2 deletions src/test/java/reactor/netty/tcp/TcpServerTests.java
Original file line number Diff line number Diff line change
Expand Up @@ -212,14 +212,14 @@ public void exposesRemoteAddress() throws InterruptedException {
return Flux.never();
})
.wiretap(true)
.bindNow();
.bindNow(Duration.ofSeconds(30));

assertNotNull(server);

Connection client = TcpClient.create().port(port)
.handle((in, out) -> out.sendString(Flux.just("Hello World!")))
.wiretap(true)
.connectNow();
.connectNow(Duration.ofSeconds(30));

assertNotNull(client);

Expand Down

0 comments on commit 69e0e20

Please sign in to comment.