Skip to content

Commit

Permalink
Use the new Transport API for UDP transport
Browse files Browse the repository at this point in the history
  • Loading branch information
violetagg committed Apr 3, 2020
1 parent e63faa8 commit 438e35e
Show file tree
Hide file tree
Showing 20 changed files with 645 additions and 1,404 deletions.
101 changes: 95 additions & 6 deletions src/main/java/reactor/netty/resources/NewConnectionProvider.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,20 +23,29 @@
import java.util.function.Supplier;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.resolver.AddressResolverGroup;
import org.reactivestreams.Subscription;
import reactor.core.CoreSubscriber;
import reactor.core.Disposable;
import reactor.core.publisher.Mono;
import reactor.core.publisher.MonoSink;
import reactor.core.publisher.Operators;
import reactor.netty.ChannelBindException;
import reactor.netty.Connection;
import reactor.netty.ConnectionObserver;
import reactor.netty.channel.BootstrapHandlers;
import reactor.netty.channel.ChannelOperations;
import reactor.netty.transport.TransportConfig;
import reactor.netty.transport.TransportConnector;
import reactor.util.Logger;
import reactor.util.Loggers;
import reactor.util.context.Context;

import javax.annotation.Nullable;

import static reactor.netty.ReactorNetty.format;

/**
Expand Down Expand Up @@ -75,9 +84,33 @@ public Mono<? extends Connection> acquire(Bootstrap b) {
else {
f = bootstrap.bind();
}
DisposableConnect disposableConnect = new DisposableConnect(sink, f, bootstrap);
f.addListener(disposableConnect);
sink.onCancel(disposableConnect);
DisposableConnectChannelFuture disposableConnectChannelFuture = new DisposableConnectChannelFuture(sink, f, bootstrap);
f.addListener(disposableConnectChannelFuture);
sink.onCancel(disposableConnectChannelFuture);
});
}

@Override
@SuppressWarnings("unchecked")
public <CONN> Mono<CONN> acquire(TransportConfig config, @Nullable Supplier<? extends SocketAddress> remoteAddress,
@Nullable AddressResolverGroup<?> resolverGroup) {
return Mono.create(sink -> {
SocketAddress remote = null;
if (remoteAddress != null) {
remote = Objects.requireNonNull(remoteAddress.get(), "Remote Address supplier returned null");
}

ConnectionObserver connectionObserver =
new NewConnectionObserver((MonoSink<Connection>) sink, config.connectionObserver());
DisposableConnect disposableConnect = new DisposableConnect((MonoSink<Connection>) sink, config.localAddress());
if (remote != null) {
TransportConnector.connect(config, remote, resolverGroup, connectionObserver)
.subscribe(disposableConnect);
}
else {
TransportConnector.bind(config, connectionObserver)
.subscribe(disposableConnect);
}
});
}

Expand All @@ -103,15 +136,71 @@ static void convertLazyRemoteAddress(Bootstrap b) {
}


static final class DisposableConnect
implements Disposable, ChannelFutureListener {
static final class DisposableConnect implements CoreSubscriber<Channel>, Disposable {
final MonoSink<Connection> sink;
final Supplier<? extends SocketAddress> localAddress;

Subscription subscription;

DisposableConnect(MonoSink<Connection> sink, @Nullable Supplier<? extends SocketAddress> localAddress) {
this.sink = sink;
this.localAddress = localAddress;
}

@Override
public Context currentContext() {
return sink.currentContext();
}

@Override
public void dispose() {
subscription.cancel();
}

@Override
public void onComplete() {
}

@Override
public void onError(Throwable t) {
if (localAddress != null && (t instanceof BindException ||
// With epoll/kqueue transport it is
// io.netty.channel.unix.Errors$NativeIoException: bind(..) failed: Address already in use
(t instanceof IOException && t.getMessage() != null &&
t.getMessage().contains("Address already in use")))) {
sink.error(ChannelBindException.fail(localAddress.get(), null));
}
else {
sink.error(t);
}
}

@Override
public void onNext(Channel channel) {
if (log.isDebugEnabled()) {
log.debug(format(channel, "Connected new channel"));
}
}

@Override
public void onSubscribe(Subscription s) {
if (Operators.validate(subscription, s)) {
this.subscription = s;
sink.onCancel(this);
s.request(Long.MAX_VALUE);
}
}
}


static final class DisposableConnectChannelFuture implements Disposable, ChannelFutureListener {

final MonoSink<Connection> sink;
final ChannelFuture f;
final Bootstrap bootstrap;


DisposableConnect(MonoSink<Connection> sink, ChannelFuture f, Bootstrap
DisposableConnectChannelFuture(MonoSink<Connection> sink, ChannelFuture f, Bootstrap
bootstrap) {
this.sink = sink;
this.f = f;
Expand Down
Loading

0 comments on commit 438e35e

Please sign in to comment.