From 438e35ef9112db80f9df04bbc93637931a5ccb97 Mon Sep 17 00:00:00 2001 From: Violeta Georgieva Date: Fri, 3 Apr 2020 15:14:31 +0300 Subject: [PATCH] Use the new Transport API for UDP transport --- .../resources/NewConnectionProvider.java | 101 +++- .../java/reactor/netty/udp/UdpClient.java | 445 ++--------------- .../reactor/netty/udp/UdpClientBootstrap.java | 41 -- .../reactor/netty/udp/UdpClientConfig.java | 147 ++++++ .../reactor/netty/udp/UdpClientConnect.java | 48 -- .../java/reactor/netty/udp/UdpClientDoOn.java | 78 --- .../reactor/netty/udp/UdpClientObserve.java | 43 -- .../reactor/netty/udp/UdpClientOperator.java | 44 -- .../reactor/netty/udp/UdpClientRunOn.java | 63 --- .../java/reactor/netty/udp/UdpServer.java | 447 ++++-------------- .../java/reactor/netty/udp/UdpServerBind.java | 49 -- .../reactor/netty/udp/UdpServerBootstrap.java | 41 -- .../reactor/netty/udp/UdpServerConfig.java | 245 ++++++++++ .../java/reactor/netty/udp/UdpServerDoOn.java | 73 --- .../reactor/netty/udp/UdpServerObserve.java | 43 -- .../reactor/netty/udp/UdpServerOperator.java | 44 -- .../reactor/netty/udp/UdpServerRunOn.java | 63 --- .../java/reactor/netty/udp/UdpClientTest.java | 28 +- .../reactor/netty/udp/UdpMetricsTests.java | 2 +- .../reactor/netty/udp/UdpServerTests.java | 4 +- 20 files changed, 645 insertions(+), 1404 deletions(-) delete mode 100644 src/main/java/reactor/netty/udp/UdpClientBootstrap.java create mode 100644 src/main/java/reactor/netty/udp/UdpClientConfig.java delete mode 100644 src/main/java/reactor/netty/udp/UdpClientConnect.java delete mode 100644 src/main/java/reactor/netty/udp/UdpClientDoOn.java delete mode 100644 src/main/java/reactor/netty/udp/UdpClientObserve.java delete mode 100644 src/main/java/reactor/netty/udp/UdpClientOperator.java delete mode 100644 src/main/java/reactor/netty/udp/UdpClientRunOn.java delete mode 100644 src/main/java/reactor/netty/udp/UdpServerBind.java delete mode 100644 src/main/java/reactor/netty/udp/UdpServerBootstrap.java create mode 100644 src/main/java/reactor/netty/udp/UdpServerConfig.java delete mode 100644 src/main/java/reactor/netty/udp/UdpServerDoOn.java delete mode 100644 src/main/java/reactor/netty/udp/UdpServerObserve.java delete mode 100644 src/main/java/reactor/netty/udp/UdpServerOperator.java delete mode 100644 src/main/java/reactor/netty/udp/UdpServerRunOn.java diff --git a/src/main/java/reactor/netty/resources/NewConnectionProvider.java b/src/main/java/reactor/netty/resources/NewConnectionProvider.java index 83a0f3ef6c..6c7cdf8942 100644 --- a/src/main/java/reactor/netty/resources/NewConnectionProvider.java +++ b/src/main/java/reactor/netty/resources/NewConnectionProvider.java @@ -23,20 +23,29 @@ import java.util.function.Supplier; import io.netty.bootstrap.Bootstrap; +import io.netty.channel.Channel; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFutureListener; +import io.netty.resolver.AddressResolverGroup; +import org.reactivestreams.Subscription; +import reactor.core.CoreSubscriber; import reactor.core.Disposable; import reactor.core.publisher.Mono; import reactor.core.publisher.MonoSink; +import reactor.core.publisher.Operators; import reactor.netty.ChannelBindException; import reactor.netty.Connection; import reactor.netty.ConnectionObserver; import reactor.netty.channel.BootstrapHandlers; import reactor.netty.channel.ChannelOperations; +import reactor.netty.transport.TransportConfig; +import reactor.netty.transport.TransportConnector; import reactor.util.Logger; import reactor.util.Loggers; import reactor.util.context.Context; +import javax.annotation.Nullable; + import static reactor.netty.ReactorNetty.format; /** @@ -75,9 +84,33 @@ public Mono acquire(Bootstrap b) { else { f = bootstrap.bind(); } - DisposableConnect disposableConnect = new DisposableConnect(sink, f, bootstrap); - f.addListener(disposableConnect); - sink.onCancel(disposableConnect); + DisposableConnectChannelFuture disposableConnectChannelFuture = new DisposableConnectChannelFuture(sink, f, bootstrap); + f.addListener(disposableConnectChannelFuture); + sink.onCancel(disposableConnectChannelFuture); + }); + } + + @Override + @SuppressWarnings("unchecked") + public Mono acquire(TransportConfig config, @Nullable Supplier remoteAddress, + @Nullable AddressResolverGroup resolverGroup) { + return Mono.create(sink -> { + SocketAddress remote = null; + if (remoteAddress != null) { + remote = Objects.requireNonNull(remoteAddress.get(), "Remote Address supplier returned null"); + } + + ConnectionObserver connectionObserver = + new NewConnectionObserver((MonoSink) sink, config.connectionObserver()); + DisposableConnect disposableConnect = new DisposableConnect((MonoSink) sink, config.localAddress()); + if (remote != null) { + TransportConnector.connect(config, remote, resolverGroup, connectionObserver) + .subscribe(disposableConnect); + } + else { + TransportConnector.bind(config, connectionObserver) + .subscribe(disposableConnect); + } }); } @@ -103,15 +136,71 @@ static void convertLazyRemoteAddress(Bootstrap b) { } - static final class DisposableConnect - implements Disposable, ChannelFutureListener { + static final class DisposableConnect implements CoreSubscriber, Disposable { + final MonoSink sink; + final Supplier localAddress; + + Subscription subscription; + + DisposableConnect(MonoSink sink, @Nullable Supplier localAddress) { + this.sink = sink; + this.localAddress = localAddress; + } + + @Override + public Context currentContext() { + return sink.currentContext(); + } + + @Override + public void dispose() { + subscription.cancel(); + } + + @Override + public void onComplete() { + } + + @Override + public void onError(Throwable t) { + if (localAddress != null && (t instanceof BindException || + // With epoll/kqueue transport it is + // io.netty.channel.unix.Errors$NativeIoException: bind(..) failed: Address already in use + (t instanceof IOException && t.getMessage() != null && + t.getMessage().contains("Address already in use")))) { + sink.error(ChannelBindException.fail(localAddress.get(), null)); + } + else { + sink.error(t); + } + } + + @Override + public void onNext(Channel channel) { + if (log.isDebugEnabled()) { + log.debug(format(channel, "Connected new channel")); + } + } + + @Override + public void onSubscribe(Subscription s) { + if (Operators.validate(subscription, s)) { + this.subscription = s; + sink.onCancel(this); + s.request(Long.MAX_VALUE); + } + } + } + + + static final class DisposableConnectChannelFuture implements Disposable, ChannelFutureListener { final MonoSink sink; final ChannelFuture f; final Bootstrap bootstrap; - DisposableConnect(MonoSink sink, ChannelFuture f, Bootstrap + DisposableConnectChannelFuture(MonoSink sink, ChannelFuture f, Bootstrap bootstrap) { this.sink = sink; this.f = f; diff --git a/src/main/java/reactor/netty/udp/UdpClient.java b/src/main/java/reactor/netty/udp/UdpClient.java index 6e02043c5f..a325fd3037 100644 --- a/src/main/java/reactor/netty/udp/UdpClient.java +++ b/src/main/java/reactor/netty/udp/UdpClient.java @@ -16,46 +16,26 @@ package reactor.netty.udp; import java.net.InetSocketAddress; -import java.net.SocketAddress; -import java.time.Duration; +import java.util.Collections; import java.util.Objects; import java.util.function.BiFunction; import java.util.function.Consumer; -import java.util.function.Function; -import java.util.function.Supplier; -import io.netty.bootstrap.Bootstrap; -import io.netty.channel.Channel; import io.netty.channel.ChannelOption; -import io.netty.channel.EventLoopGroup; -import io.netty.channel.socket.InternetProtocolFamily; -import io.netty.handler.logging.LogLevel; -import io.netty.handler.logging.LoggingHandler; -import io.netty.util.AttributeKey; import io.netty.util.NetUtil; import org.reactivestreams.Publisher; -import reactor.core.Exceptions; import reactor.core.publisher.Mono; import reactor.netty.Connection; -import reactor.netty.ConnectionObserver; -import reactor.netty.NettyPipeline; -import reactor.netty.channel.BootstrapHandlers; -import reactor.netty.channel.ChannelMetricsRecorder; -import reactor.netty.channel.MicrometerChannelMetricsRecorder; -import reactor.netty.resources.LoopResources; +import reactor.netty.resources.ConnectionProvider; +import reactor.netty.transport.TransportClient; import reactor.util.Logger; import reactor.util.Loggers; -import reactor.util.Metrics; import static reactor.netty.ReactorNetty.format; /** * A UdpClient allows to build in a safe immutable way a UDP client that is materialized - * and connecting when {@link #connect(Bootstrap)} is ultimately called. - *

- *

Internally, materialization happens in two phases, first {@link #configure()} is - * called to retrieve a ready to use {@link Bootstrap} then {@link #connect(Bootstrap)} - * is called. + * and connecting when {@link #connect()} is ultimately called. *

*

Example: *

@@ -71,8 +51,11 @@
  * }
  *
  * @author Stephane Maldini
+ * @author Violeta Georgieva
  */
-public abstract class UdpClient {
+public class UdpClient extends TransportClient {
+
+	static final UdpClient INSTANCE = new UdpClient();
 
 	/**
 	 * Prepare a {@link UdpClient}
@@ -80,146 +63,25 @@ public abstract class UdpClient {
 	 * @return a {@link UdpClient}
 	 */
 	public static UdpClient create() {
-		return UdpClientConnect.INSTANCE;
+		return UdpClient.INSTANCE;
 	}
 
-	/**
-	 * The address to which this client should connect on subscribe.
-	 *
-	 * @param connectingAddressSupplier A supplier of the address to connect to.
-	 *
-	 * @return a new {@link UdpClient}
-	 */
-	public final UdpClient addressSupplier(Supplier connectingAddressSupplier) {
-		Objects.requireNonNull(connectingAddressSupplier, "connectingAddressSupplier");
-		return bootstrap(b -> b.remoteAddress(connectingAddressSupplier.get()));
-	}
+	final UdpClientConfig config;
 
-	/**
-	 * Inject default attribute to the future child {@link Channel} connections. They
-	 * will be available via {@link Channel#attr(AttributeKey)}.
-	 *
-	 * @param key the attribute key
-	 * @param value the attribute value
-	 * @param  the attribute type
-	 *
-	 * @return a new {@link UdpClient}
-	 *
-	 * @see Bootstrap#attr(AttributeKey, Object)
-	 */
-	public final  UdpClient attr(AttributeKey key, T value) {
-		Objects.requireNonNull(key, "key");
-		Objects.requireNonNull(value, "value");
-		return bootstrap(b -> b.attr(key, value));
+	UdpClient() {
+		this.config = new UdpClientConfig(
+				ConnectionProvider.newConnection(),
+				Collections.singletonMap(ChannelOption.AUTO_READ, false),
+				() -> new InetSocketAddress(NetUtil.LOCALHOST, DEFAULT_PORT));
 	}
 
-	/**
-	 * Apply {@link Bootstrap} configuration given mapper taking currently configured one
-	 * and returning a new one to be ultimately used for socket binding. 

Configuration - * will apply during {@link #configure()} phase. - * - * @param bootstrapMapper A bootstrap mapping function to update configuration and return an - * enriched bootstrap. - * - * @return a new {@link UdpClient} - */ - public final UdpClient bootstrap(Function bootstrapMapper) { - return new UdpClientBootstrap(this, bootstrapMapper); + UdpClient(UdpClientConfig config) { + this.config = config; } - /** - * Connect the {@link UdpClient} and return a {@link Mono} of {@link Connection}. If - * {@link Mono} is cancelled, the underlying connection will be aborted. Once the {@link - * Connection} has been emitted and is not necessary anymore, disposing main client - * loop must be done by the user via {@link Connection#dispose()}. - * - * If update configuration phase fails, a {@link Mono#error(Throwable)} will be returned - * - * @return a {@link Mono} of {@link Connection} - */ - public final Mono connect() { - Bootstrap b; - try{ - b = configure(); - } - catch (Throwable t){ - Exceptions.throwIfJvmFatal(t); - return Mono.error(t); - } - return connect(b); - } - - /** - * Block the {@link UdpClient} and return a {@link Connection}. Disposing must be - * done by the user via {@link Connection#dispose()}. - * - * @param timeout connect timeout - * - * @return a {@link Mono} of {@link Connection} - */ - public final Connection connectNow(Duration timeout) { - Objects.requireNonNull(timeout, "timeout"); - try { - return Objects.requireNonNull(connect().block(timeout), "aborted"); - } - catch (IllegalStateException e) { - if (e.getMessage().contains("blocking read")) { - throw new IllegalStateException("UdpClient couldn't be started within " - + timeout.toMillis() + "ms"); - } - throw e; - } - } - - /** - * Setup a callback called when {@link io.netty.channel.Channel} is about to - * connect. - * - * @param doOnConnect a consumer observing client start event - * - * @return a new {@link UdpClient} - */ - public final UdpClient doOnConnect(Consumer doOnConnect) { - Objects.requireNonNull(doOnConnect, "doOnConnect"); - return new UdpClientDoOn(this, doOnConnect, null, null); - } - - /** - * Setup a callback called when {@link io.netty.channel.Channel} is - * connected. - * - * @param doOnConnected a consumer observing client started event - * - * @return a new {@link UdpClient} - */ - public final UdpClient doOnConnected(Consumer doOnConnected) { - Objects.requireNonNull(doOnConnected, "doOnConnected"); - return new UdpClientDoOn(this, null, doOnConnected, null); - } - - /** - * Setup a callback called when {@link io.netty.channel.Channel} is - * disconnected. - * - * @param doOnDisconnected a consumer observing client stop event - * - * @return a new {@link UdpClient} - */ - public final UdpClient doOnDisconnected(Consumer doOnDisconnected) { - Objects.requireNonNull(doOnDisconnected, "doOnDisconnected"); - return new UdpClientDoOn(this, null, null, doOnDisconnected); - } - - /** - * The host to which this client should connect. - * - * @param host The host to connect to. - * - * @return a new {@link UdpClient} - */ - public final UdpClient host(String host) { - Objects.requireNonNull(host, "host"); - return bootstrap(b -> b.remoteAddress(host, getPort(b))); + @Override + public UdpClientConfig configuration() { + return config; } /** @@ -232,271 +94,38 @@ public final UdpClient host(String host) { */ public final UdpClient handle(BiFunction> handler) { Objects.requireNonNull(handler, "handler"); - return doOnConnected(c -> { - if (log.isDebugEnabled()) { - log.debug(format(c.channel(), "Handler is being applied: {}"), handler); - } - - Mono.fromDirect(handler.apply((UdpInbound) c, (UdpOutbound) c)) - .subscribe(c.disposeSubscriber()); - }); - } - - /** - * Setup all lifecycle callbacks called on or after {@link io.netty.channel.Channel} - * has been connected and after it has been disconnected. - * - * @param observer a consumer observing state changes - * - * @return a new {@link UdpClient} - */ - public final UdpClient observe(ConnectionObserver observer) { - return new UdpClientObserve(this, observer); - } - - /** - * Set a {@link ChannelOption} value for low level connection settings like {@code SO_TIMEOUT} - * or {@code SO_KEEPALIVE}. This will apply to each new channel from remote peer. - * - * @param key the option key - * @param value the option value - * @param the option type - * - * @return new {@link UdpClient} - * - * @see Bootstrap#option(ChannelOption, Object) - */ - public final UdpClient option(ChannelOption key, T value) { - Objects.requireNonNull(key, "key"); - Objects.requireNonNull(value, "value"); - return bootstrap(b -> b.option(key, value)); + return doOnConnected(new OnConnectedHandle(handler)); } - /** - * The port to which this client should connect. - * - * @param port The port to connect to. - * - * @return a new {@link UdpClient} - */ - public final UdpClient port(int port) { - return bootstrap(b -> b.remoteAddress(getHost(b), port)); + @Override + protected UdpClient duplicate() { + return new UdpClient(new UdpClientConfig(configuration())); } - /** - * Run IO loops on the given {@link EventLoopGroup}. - * - * @param eventLoopGroup an eventLoopGroup to share - * - * @return a new {@link UdpClient} - */ - public final UdpClient runOn(EventLoopGroup eventLoopGroup) { - Objects.requireNonNull(eventLoopGroup, "eventLoopGroup"); - return runOn(preferNative -> eventLoopGroup); - } - - /** - * Run IO loops on a supplied {@link EventLoopGroup} from the {@link LoopResources} - * container. Will prefer native (epoll/kqueue) implementation if available unless the - * environment property {@code reactor.netty.native} is set to {@code false}. - * - * @param channelResources a {@link LoopResources} accepting native runtime - * expectation and returning an eventLoopGroup - * - * @return a new {@link UdpClient} - */ - public final UdpClient runOn(LoopResources channelResources) { - return runOn(channelResources, LoopResources.DEFAULT_NATIVE); - } - - /** - * Run IO loops on a supplied {@link EventLoopGroup} from the {@link LoopResources} - * container. - * - * @param channelResources a {@link LoopResources} accepting native runtime - * expectation and returning an eventLoopGroup. - * @param preferNative Should the connector prefer native (epoll/kqueue) if available. - * - * @return a new {@link UdpClient} - */ - public final UdpClient runOn(LoopResources channelResources, boolean preferNative) { - return new UdpClientRunOn(this, channelResources, preferNative, null); - } - - /** - * Run IO loops on a supplied {@link EventLoopGroup} from the {@link LoopResources} - * container. - * - * @param channelResources a {@link LoopResources} accepting native runtime - * expectation and returning an eventLoopGroup. - * @param family a specific {@link InternetProtocolFamily} to run with - * - * @return a new {@link UdpClient} - */ - public final UdpClient runOn(LoopResources channelResources, InternetProtocolFamily family) { - return new UdpClientRunOn(this, channelResources, false, family); - } - - /** - * Whether to enable metrics to be collected and registered in Micrometer's - * {@link io.micrometer.core.instrument.Metrics#globalRegistry globalRegistry} - * under the name {@link reactor.netty.Metrics#UDP_CLIENT_PREFIX}. Applications can - * separately register their own - * {@link io.micrometer.core.instrument.config.MeterFilter filters} associated with this name. - * For example, to put an upper bound on the number of tags produced: - *

-	 * MeterFilter filter = ... ;
-	 * Metrics.globalRegistry.config().meterFilter(MeterFilter.maximumAllowableTags(UDP_CLIENT_PREFIX, 100, filter));
-	 * 
- *

By default this is not enabled. - * - * @param metricsEnabled true enables metrics collection; false disables it - * @return a new {@link UdpClient} - */ - public final UdpClient metrics(boolean metricsEnabled) { - if (metricsEnabled) { - if (!Metrics.isInstrumentationAvailable()) { - throw new UnsupportedOperationException( - "To enable metrics, you must add the dependency `io.micrometer:micrometer-core`" + - " to the class path first"); - } - - return bootstrap(b -> BootstrapHandlers.updateMetricsSupport(b, MicrometerUdpClientMetricsRecorder.INSTANCE)); - } - else { - return bootstrap(BootstrapHandlers::removeMetricsSupport); - } - } - - /** - * Specifies whether the metrics are enabled on the {@link UdpClient}. - * All generated metrics are provided to the specified recorder - * which is only instantiated if metrics are being enabled. - * - * @param metricsEnabled if true enables the metrics on the client. - * @param recorder a supplier for the {@link ChannelMetricsRecorder} - * @return a new {@link UdpClient} - * @since 0.9.7 - */ - public final UdpClient metrics(boolean metricsEnabled, Supplier recorder) { - if (metricsEnabled) { - Objects.requireNonNull(recorder, "recorder"); - return bootstrap(b -> BootstrapHandlers.updateMetricsSupport(b, recorder.get())); - } - else { - return bootstrap(BootstrapHandlers::removeMetricsSupport); - } - } - - /** - * Apply or remove a wire logger configuration using {@link UdpClient} category - * and {@code DEBUG} logger level - * - * @param enable Specifies whether the wire logger configuration will be added to - * the pipeline - * @return a new {@link UdpClient} - */ - public final UdpClient wiretap(boolean enable) { - if (enable) { - return bootstrap(b -> BootstrapHandlers.updateLogSupport(b, LOGGING_HANDLER)); - } - else { - return bootstrap(b -> BootstrapHandlers.removeConfiguration(b, NettyPipeline.LoggingHandler)); - } - } - - /** - * Apply a wire logger configuration using the specified category - * and {@code DEBUG} logger level - * - * @param category the logger category - * - * @return a new {@link UdpClient} - */ - public final UdpClient wiretap(String category) { - return wiretap(category, LogLevel.DEBUG); - } - - /** - * Apply a wire logger configuration using the specified category - * and logger level - * - * @param category the logger category - * @param level the logger level - * - * @return a new {@link UdpClient} - */ - public final UdpClient wiretap(String category, LogLevel level) { - Objects.requireNonNull(category, "category"); - Objects.requireNonNull(level, "level"); - return bootstrap(b -> BootstrapHandlers.updateLogSupport(b, - new LoggingHandler(category, level))); - } - - /** - * Materialize a Bootstrap from the parent {@link UdpClient} chain to use with {@link - * #connect(Bootstrap)} or separately - * - * @return a configured {@link Bootstrap} - */ - protected Bootstrap configure() { - return DEFAULT_BOOTSTRAP.clone(); - } - - /** - * Connect the {@link UdpClient} and return a {@link Mono} of {@link Connection} - * - * @param b the {@link Bootstrap} to connect - * - * @return a {@link Mono} of {@link Connection} - */ - protected abstract Mono connect(Bootstrap b); - - /** * The default port for reactor-netty servers. Defaults to 12012 but can be tuned via * the {@code PORT} environment variable. */ - static final int DEFAULT_PORT = - System.getenv("PORT") != null ? Integer.parseInt(System.getenv("PORT")) : - 12012; + static final int DEFAULT_PORT = System.getenv("PORT") != null ? Integer.parseInt(System.getenv("PORT")) : 12012; - static final Bootstrap DEFAULT_BOOTSTRAP = - new Bootstrap().option(ChannelOption.AUTO_READ, false) - .remoteAddress(NetUtil.LOCALHOST, DEFAULT_PORT); + static final Logger log = Loggers.getLogger(UdpClient.class); - static { - BootstrapHandlers.channelOperationFactory(DEFAULT_BOOTSTRAP, (ch, c, msg) -> new UdpOperations(ch, c)); - } - - static final LoggingHandler LOGGING_HANDLER = new LoggingHandler(UdpClient.class); - static final Logger log = Loggers.getLogger(UdpClient.class); + static final class OnConnectedHandle implements Consumer { - static String getHost(Bootstrap b) { - if (b.config() - .remoteAddress() instanceof InetSocketAddress) { - return ((InetSocketAddress) b.config() - .remoteAddress()).getHostString(); - } - return NetUtil.LOCALHOST.getHostAddress(); - } + final BiFunction> handler; - static int getPort(Bootstrap b) { - if (b.config() - .remoteAddress() instanceof InetSocketAddress) { - return ((InetSocketAddress) b.config() - .remoteAddress()).getPort(); + OnConnectedHandle(BiFunction> handler) { + this.handler = handler; } - return DEFAULT_PORT; - } - - static final class MicrometerUdpClientMetricsRecorder extends MicrometerChannelMetricsRecorder { - static final MicrometerUdpClientMetricsRecorder INSTANCE = - new MicrometerUdpClientMetricsRecorder(reactor.netty.Metrics.UDP_CLIENT_PREFIX, "udp"); + @Override + public void accept(Connection c) { + if (log.isDebugEnabled()) { + log.debug(format(c.channel(), "Handler is being applied: {}"), handler); + } - MicrometerUdpClientMetricsRecorder(String name, String protocol) { - super(name, protocol); + Mono.fromDirect(handler.apply((UdpInbound) c, (UdpOutbound) c)) + .subscribe(c.disposeSubscriber()); } } } diff --git a/src/main/java/reactor/netty/udp/UdpClientBootstrap.java b/src/main/java/reactor/netty/udp/UdpClientBootstrap.java deleted file mode 100644 index 2c3e42837a..0000000000 --- a/src/main/java/reactor/netty/udp/UdpClientBootstrap.java +++ /dev/null @@ -1,41 +0,0 @@ -/* - * Copyright (c) 2011-Present VMware, Inc. or its affiliates, All Rights Reserved. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * https://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package reactor.netty.udp; - -import java.util.Objects; -import java.util.function.Function; - -import io.netty.bootstrap.Bootstrap; - -/** - * @author Stephane Maldini - */ -final class UdpClientBootstrap extends UdpClientOperator { - - - final Function bootstrapMapper; - - UdpClientBootstrap(UdpClient client, - Function bootstrapMapper) { - super(client); - this.bootstrapMapper = Objects.requireNonNull(bootstrapMapper, "bootstrapMapper"); - } - - @Override - protected Bootstrap configure() { - return Objects.requireNonNull(bootstrapMapper.apply(source.configure()), "bootstrapMapper"); - } -} diff --git a/src/main/java/reactor/netty/udp/UdpClientConfig.java b/src/main/java/reactor/netty/udp/UdpClientConfig.java new file mode 100644 index 0000000000..4bfe3d7bbe --- /dev/null +++ b/src/main/java/reactor/netty/udp/UdpClientConfig.java @@ -0,0 +1,147 @@ +/* + * Copyright (c) 2011-Present VMware, Inc. or its affiliates, All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package reactor.netty.udp; + +import io.netty.channel.Channel; +import io.netty.channel.ChannelFactory; +import io.netty.channel.ChannelOption; +import io.netty.channel.EventLoopGroup; +import io.netty.channel.ReflectiveChannelFactory; +import io.netty.channel.socket.DatagramChannel; +import io.netty.channel.socket.nio.NioDatagramChannel; +import io.netty.handler.logging.LoggingHandler; +import reactor.netty.Connection; +import reactor.netty.ConnectionObserver; +import reactor.netty.channel.ChannelMetricsRecorder; +import reactor.netty.channel.ChannelOperations; +import reactor.netty.channel.MicrometerChannelMetricsRecorder; +import reactor.netty.resources.ConnectionProvider; +import reactor.netty.resources.LoopResources; +import reactor.netty.transport.TransportClientConfig; + +import javax.annotation.Nullable; +import java.net.SocketAddress; +import java.util.Map; +import java.util.function.Consumer; +import java.util.function.Supplier; + +/** + * Encapsulate all necessary configuration for UDP client transport. + * + * @author Violeta Georgieva + * @since 1.0.0 + */ +public final class UdpClientConfig extends TransportClientConfig { + + UdpClientConfig(ConnectionProvider connectionProvider, Map, ?> options, + Supplier remoteAddress) { + super(connectionProvider, options, remoteAddress); + } + + UdpClientConfig(UdpClientConfig parent) { + super(parent); + } + + @Override + protected ChannelOperations.OnSetup channelOperationsProvider() { + return DEFAULT_OPS; + } + + @Override + protected ChannelFactory connectionFactory(EventLoopGroup elg) { + LoopResources loopResources = loopResources() != null ? loopResources() : defaultLoopResources(); + ChannelFactory channelFactory; + if (isPreferNative()) { + channelFactory = new ReflectiveChannelFactory<>(loopResources.onDatagramChannel(elg)); + } + else { + channelFactory = () -> new NioDatagramChannel(family()); + } + return channelFactory; + } + + @Override + protected LoggingHandler defaultLoggingHandler() { + return LOGGING_HANDLER; + } + + @Override + protected LoopResources defaultLoopResources() { + return UdpResources.get(); + } + + @Override + protected ChannelMetricsRecorder defaultMetricsRecorder() { + return MicrometerUdpClientMetricsRecorder.INSTANCE; + } + + @Override + protected EventLoopGroup eventLoopGroup() { + LoopResources loopResources = loopResources() != null ? loopResources() : defaultLoopResources(); + return loopResources.onClient(isPreferNative()); + } + + @Nullable + @Override + protected ConnectionObserver lifecycleObserver() { + if (doOnConnected() == null && doOnDisconnected() == null) { + return null; + } + return new UdpClientDoOn(doOnConnected(), doOnDisconnected()); + } + + static final ChannelOperations.OnSetup DEFAULT_OPS = (ch, c, msg) -> new UdpOperations(ch, c); + + static final LoggingHandler LOGGING_HANDLER = new LoggingHandler(UdpClient.class); + + static final class MicrometerUdpClientMetricsRecorder extends MicrometerChannelMetricsRecorder { + + static final MicrometerUdpClientMetricsRecorder INSTANCE = + new MicrometerUdpClientMetricsRecorder(reactor.netty.Metrics.UDP_CLIENT_PREFIX, "udp"); + + MicrometerUdpClientMetricsRecorder(String name, String protocol) { + super(name, protocol); + } + } + + static final class UdpClientDoOn implements ConnectionObserver { + + Consumer doOnConnected; + Consumer doOnDisconnected; + + UdpClientDoOn(@Nullable Consumer doOnConnected, + @Nullable Consumer doOnDisconnected) { + this.doOnConnected = doOnConnected; + this.doOnDisconnected = doOnDisconnected; + } + + @Override + public void onStateChange(Connection connection, State newState) { + if (doOnConnected != null && newState == State.CONFIGURED) { + doOnConnected.accept(connection); + return; + } + if (doOnDisconnected != null) { + if (newState == State.DISCONNECTING) { + connection.onDispose(() -> doOnDisconnected.accept(connection)); + } + else if (newState == State.RELEASED) { + doOnDisconnected.accept(connection); + } + } + } + } +} diff --git a/src/main/java/reactor/netty/udp/UdpClientConnect.java b/src/main/java/reactor/netty/udp/UdpClientConnect.java deleted file mode 100644 index 48bc8aa919..0000000000 --- a/src/main/java/reactor/netty/udp/UdpClientConnect.java +++ /dev/null @@ -1,48 +0,0 @@ -/* - * Copyright (c) 2011-Present VMware, Inc. or its affiliates, All Rights Reserved. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * https://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package reactor.netty.udp; - -import io.netty.bootstrap.Bootstrap; -import io.netty.channel.EventLoopGroup; -import reactor.core.publisher.Mono; -import reactor.netty.Connection; -import reactor.netty.resources.ConnectionProvider; -import reactor.netty.resources.LoopResources; - -/** - * @author Stephane Maldini - */ -final class UdpClientConnect extends UdpClient { - - static final UdpClientConnect INSTANCE = new UdpClientConnect(); - - @Override - protected Mono connect(Bootstrap b) { - //Default group and channel - if (b.config() - .group() == null) { - - UdpResources loopResources = UdpResources.get(); - EventLoopGroup elg = loopResources.onClient(LoopResources.DEFAULT_NATIVE); - - b.group(elg) - .channel(loopResources.onDatagramChannel(elg)); - } - - return ConnectionProvider.newConnection() - .acquire(b); - } -} diff --git a/src/main/java/reactor/netty/udp/UdpClientDoOn.java b/src/main/java/reactor/netty/udp/UdpClientDoOn.java deleted file mode 100644 index 71bacfc628..0000000000 --- a/src/main/java/reactor/netty/udp/UdpClientDoOn.java +++ /dev/null @@ -1,78 +0,0 @@ -/* - * Copyright (c) 2011-Present VMware, Inc. or its affiliates, All Rights Reserved. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * https://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package reactor.netty.udp; - -import java.util.function.Consumer; -import javax.annotation.Nullable; - -import io.netty.bootstrap.Bootstrap; -import reactor.core.publisher.Mono; -import reactor.netty.Connection; -import reactor.netty.ConnectionObserver; -import reactor.netty.channel.BootstrapHandlers; - -/** - * @author Stephane Maldini - */ -final class UdpClientDoOn extends UdpClientOperator implements ConnectionObserver { - - final Consumer onConnect; - final Consumer onConnected; - final Consumer onDisconnected; - - UdpClientDoOn(UdpClient client, - @Nullable Consumer onConnect, - @Nullable Consumer onConnected, - @Nullable Consumer onDisconnected) { - super(client); - this.onConnect = onConnect; - this.onConnected = onConnected; - this.onDisconnected = onDisconnected; - } - - @Override - public Bootstrap configure() { - Bootstrap b = source.configure(); - ConnectionObserver observer = BootstrapHandlers.connectionObserver(b); - BootstrapHandlers.connectionObserver(b, observer.then(this)); - return b; - } - - @Override - public Mono connect(Bootstrap b) { - if (onConnect != null) { - return source.connect(b) - .doOnSubscribe(s -> onConnect.accept(b)); - } - return source.connect(b); - } - - @Override - public void onStateChange(Connection connection, State newState) { - if (onConnected != null && newState == State.CONFIGURED) { - onConnected.accept(connection); - return; - } - if (onDisconnected != null) { - if (newState == State.DISCONNECTING) { - connection.onDispose(() -> onDisconnected.accept(connection)); - } - else if (newState == State.RELEASED) { - onDisconnected.accept(connection); - } - } - } -} diff --git a/src/main/java/reactor/netty/udp/UdpClientObserve.java b/src/main/java/reactor/netty/udp/UdpClientObserve.java deleted file mode 100644 index 04730524e4..0000000000 --- a/src/main/java/reactor/netty/udp/UdpClientObserve.java +++ /dev/null @@ -1,43 +0,0 @@ -/* - * Copyright (c) 2011-Present VMware, Inc. or its affiliates, All Rights Reserved. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * https://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package reactor.netty.udp; - -import java.util.Objects; - -import io.netty.bootstrap.Bootstrap; -import reactor.netty.ConnectionObserver; -import reactor.netty.channel.BootstrapHandlers; - -/** - * @author Stephane Maldini - */ -final class UdpClientObserve extends UdpClientOperator { - - final ConnectionObserver observer; - - UdpClientObserve(UdpClient client, ConnectionObserver observer) { - super(client); - this.observer = Objects.requireNonNull(observer, "observer"); - } - - @Override - public Bootstrap configure() { - Bootstrap b = source.configure(); - ConnectionObserver observer = BootstrapHandlers.connectionObserver(b); - BootstrapHandlers.connectionObserver(b, observer.then(this.observer)); - return b; - } -} diff --git a/src/main/java/reactor/netty/udp/UdpClientOperator.java b/src/main/java/reactor/netty/udp/UdpClientOperator.java deleted file mode 100644 index ec9ec0e2c3..0000000000 --- a/src/main/java/reactor/netty/udp/UdpClientOperator.java +++ /dev/null @@ -1,44 +0,0 @@ -/* - * Copyright (c) 2011-Present VMware, Inc. or its affiliates, All Rights Reserved. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * https://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package reactor.netty.udp; - -import java.util.Objects; - -import io.netty.bootstrap.Bootstrap; -import reactor.core.publisher.Mono; -import reactor.netty.Connection; - -/** - * @author Stephane Maldini - */ -abstract class UdpClientOperator extends UdpClient { - - final UdpClient source; - - UdpClientOperator(UdpClient source) { - this.source = Objects.requireNonNull(source, "source"); - } - - @Override - protected Bootstrap configure() { - return source.configure(); - } - - @Override - protected Mono connect(Bootstrap b) { - return source.connect(b); - } -} diff --git a/src/main/java/reactor/netty/udp/UdpClientRunOn.java b/src/main/java/reactor/netty/udp/UdpClientRunOn.java deleted file mode 100644 index 7af2a19add..0000000000 --- a/src/main/java/reactor/netty/udp/UdpClientRunOn.java +++ /dev/null @@ -1,63 +0,0 @@ -/* - * Copyright (c) 2011-Present VMware, Inc. or its affiliates, All Rights Reserved. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * https://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package reactor.netty.udp; - -import java.util.Objects; -import javax.annotation.Nullable; - -import io.netty.bootstrap.Bootstrap; -import io.netty.channel.EventLoopGroup; -import io.netty.channel.socket.InternetProtocolFamily; -import io.netty.channel.socket.nio.NioDatagramChannel; -import reactor.netty.resources.LoopResources; - -/** - * @author Stephane Maldini - */ -final class UdpClientRunOn extends UdpClientOperator { - - final LoopResources loopResources; - final boolean preferNative; - final InternetProtocolFamily family; - - UdpClientRunOn(UdpClient server, - LoopResources loopResources, - boolean preferNative, - @Nullable InternetProtocolFamily family) { - super(server); - this.loopResources = Objects.requireNonNull(loopResources, "loopResources"); - this.preferNative = preferNative; - this.family = family; - } - - @Override - protected Bootstrap configure() { - Bootstrap b = source.configure(); - - boolean useNative = family == null && preferNative; - - EventLoopGroup elg = loopResources.onClient(useNative); - - if (useNative) { - b.channel(loopResources.onDatagramChannel(elg)); - } - else { - b.channelFactory(() -> new NioDatagramChannel(family)); - } - - return b.group(elg); - } -} diff --git a/src/main/java/reactor/netty/udp/UdpServer.java b/src/main/java/reactor/netty/udp/UdpServer.java index 6e8f26dfd2..3ab5a7902d 100644 --- a/src/main/java/reactor/netty/udp/UdpServer.java +++ b/src/main/java/reactor/netty/udp/UdpServer.java @@ -16,46 +16,29 @@ package reactor.netty.udp; import java.net.InetSocketAddress; -import java.net.SocketAddress; import java.time.Duration; +import java.util.Collections; import java.util.Objects; import java.util.function.BiFunction; import java.util.function.Consumer; -import java.util.function.Function; -import java.util.function.Supplier; -import io.netty.bootstrap.Bootstrap; -import io.netty.channel.Channel; import io.netty.channel.ChannelOption; -import io.netty.channel.EventLoopGroup; -import io.netty.channel.socket.InternetProtocolFamily; -import io.netty.handler.logging.LogLevel; -import io.netty.handler.logging.LoggingHandler; -import io.netty.util.AttributeKey; import io.netty.util.NetUtil; import org.reactivestreams.Publisher; -import reactor.core.Exceptions; import reactor.core.publisher.Mono; import reactor.netty.Connection; import reactor.netty.ConnectionObserver; -import reactor.netty.NettyPipeline; -import reactor.netty.channel.BootstrapHandlers; -import reactor.netty.channel.ChannelMetricsRecorder; -import reactor.netty.channel.MicrometerChannelMetricsRecorder; -import reactor.netty.resources.LoopResources; +import reactor.netty.resources.ConnectionProvider; +import reactor.netty.transport.AddressUtils; +import reactor.netty.transport.Transport; import reactor.util.Logger; import reactor.util.Loggers; -import reactor.util.Metrics; import static reactor.netty.ReactorNetty.format; /** * A UdpServer allows to build in a safe immutable way a UDP server that is materialized - * and connecting when {@link #bind(Bootstrap)} is ultimately called. - *

- *

Internally, materialization happens in two phases, first {@link #configure()} is - * called to retrieve a ready to use {@link Bootstrap} then {@link #bind(Bootstrap)} - * is called. + * and connecting when {@link #bind()} is ultimately called. *

*

Example: *

@@ -71,8 +54,11 @@
  * }
  *
  * @author Stephane Maldini
+ * @author Violeta Georgieva
  */
-public abstract class UdpServer {
+public class UdpServer extends Transport {
+
+	static final UdpServer INSTANCE = new UdpServer();
 
 	/**
 	 * Prepare a {@link UdpServer}
@@ -80,135 +66,130 @@ public abstract class UdpServer {
 	 * @return a {@link UdpServer}
 	 */
 	public static UdpServer create() {
-		return UdpServerBind.INSTANCE;
+		return UdpServer.INSTANCE;
 	}
 
-	/**
-	 * The address to which this server should bind on subscribe.
-	 *
-	 * @param bindingAddressSupplier A supplier of the address to bind to.
-	 *
-	 * @return a new {@link UdpServer}
-	 */
-	public final UdpServer addressSupplier(Supplier bindingAddressSupplier) {
-		Objects.requireNonNull(bindingAddressSupplier, "bindingAddressSupplier");
-		return bootstrap(b -> b.localAddress(bindingAddressSupplier.get()));
-	}
+	final UdpServerConfig config;
 
-	/**
-	 * Inject default attribute to the future child {@link Channel} connections. They
-	 * will be available via {@link Channel#attr(AttributeKey)}.
-	 *
-	 * @param key the attribute key
-	 * @param value the attribute value
-	 * @param  the attribute type
-	 *
-	 * @return a new {@link UdpServer}
-	 *
-	 * @see Bootstrap#attr(AttributeKey, Object)
-	 */
-	public final  UdpServer attr(AttributeKey key, T value) {
-		Objects.requireNonNull(key, "key");
-		Objects.requireNonNull(value, "value");
-		return bootstrap(b -> b.attr(key, value));
+	UdpServer() {
+		this.config = new UdpServerConfig(
+				Collections.singletonMap(ChannelOption.AUTO_READ, false),
+				() -> new InetSocketAddress(NetUtil.LOCALHOST, DEFAULT_PORT));
 	}
 
-	/**
-	 * Apply {@link Bootstrap} configuration given mapper taking currently configured one
-	 * and returning a new one to be ultimately used for socket binding. 

Configuration - * will apply during {@link #configure()} phase. - * - * @param bootstrapMapper A bootstrap mapping function to update configuration and return an - * enriched bootstrap. - * - * @return a new {@link UdpServer} - */ - public final UdpServer bootstrap(Function bootstrapMapper) { - return new UdpServerBootstrap(this, bootstrapMapper); + UdpServer(UdpServerConfig config) { + this.config = config; } /** - * Bind the {@link UdpServer} and return a {@link Mono} of {@link Connection}. If + * Binds the {@link UdpServer} and returns a {@link Mono} of {@link Connection}. If * {@link Mono} is cancelled, the underlying binding will be aborted. Once the {@link - * Connection} has been emitted and is not necessary anymore, disposing main server + * Connection} has been emitted and is not necessary anymore, disposing the main server * loop must be done by the user via {@link Connection#dispose()}. * - * If update configuration phase fails, a {@link Mono#error(Throwable)} will be returned - * * @return a {@link Mono} of {@link Connection} */ - public final Mono bind() { - Bootstrap b; - try{ - b = configure(); + public Mono bind() { + UdpServer dup; + ConnectionObserver lifecycleObserver = config.lifecycleObserver(); + if (lifecycleObserver != null) { + dup = observe(lifecycleObserver); } - catch (Throwable t){ - Exceptions.throwIfJvmFatal(t); - return Mono.error(t); + else { + dup = duplicate(); + } + + UdpServerConfig config = dup.configuration(); + Mono mono = ConnectionProvider.newConnection() + .acquire(config, null, null); + if (config.doOnBind() != null) { + mono = mono.doOnSubscribe(s -> config.doOnBind().accept(config)); } - return bind(b); + return mono; } /** - * Start a Server in a blocking fashion, and wait for it to finish initializing. The + * Starts the server in a blocking fashion, and waits for it to finish initializing + * or the startup timeout expires (the startup timeout is {@code 45} seconds). The * returned {@link Connection} offers simple server API, including to {@link * Connection#disposeNow()} shut it down in a blocking fashion. * - * @param timeout max startup timeout + * @return a {@link Connection} + */ + public final Connection bindNow() { + return bindNow(Duration.ofSeconds(45)); + } + + /** + * Start the server in a blocking fashion, and wait for it to finish initializing + * or the provided startup timeout expires. The returned {@link Connection} + * offers simple server API, including to {@link Connection#disposeNow()} + * shut it down in a blocking fashion. * + * @param timeout max startup timeout * @return a {@link Connection} */ public final Connection bindNow(Duration timeout) { + Objects.requireNonNull(timeout, "timeout"); try { return Objects.requireNonNull(bind().block(timeout), "aborted"); } catch (IllegalStateException e) { if (e.getMessage().contains("blocking read")) { - throw new IllegalStateException("UdpServer couldn't be started within " - + timeout.toMillis() + "ms"); + throw new IllegalStateException("UdpServer couldn't be started within " + timeout.toMillis() + "ms"); } throw e; } } + @Override + public UdpServerConfig configuration() { + return config; + } + /** - * Setup a callback called when {@link io.netty.channel.Channel} is about to - * bind. - * - * @param doOnBind a consumer observing server start event + * Set or add a callback called when {@link UdpServer} is about to start listening for incoming traffic. * - * @return a new {@link UdpServer} + * @param doOnBind a consumer observing connected events + * @return a new {@link UdpServer} reference */ - public final UdpServer doOnBind(Consumer doOnBind) { + public final UdpServer doOnBind(Consumer doOnBind) { Objects.requireNonNull(doOnBind, "doOnBind"); - return new UdpServerDoOn(this, doOnBind, null, null); - + UdpServer dup = duplicate(); + @SuppressWarnings("unchecked") + Consumer current = (Consumer) dup.configuration().doOnBind; + dup.configuration().doOnBind = current == null ? doOnBind : current.andThen(doOnBind); + return dup; } /** - * Setup a callback called when {@link io.netty.channel.Channel} is - * bound. + * Set or add a callback called after {@link UdpServer} has been started. * - * @param doOnBound a consumer observing server started event - * - * @return a new {@link UdpServer} + * @param doOnBound a consumer observing connected events + * @return a new {@link UdpServer} reference */ public final UdpServer doOnBound(Consumer doOnBound) { Objects.requireNonNull(doOnBound, "doOnBound"); - return new UdpServerDoOn(this, null, doOnBound, null); + UdpServer dup = duplicate(); + @SuppressWarnings("unchecked") + Consumer current = (Consumer) dup.configuration().doOnBound; + dup.configuration().doOnBound = current == null ? doOnBound : current.andThen(doOnBound); + return dup; } /** - * Setup a callback called when {@link io.netty.channel.Channel} is - * unbound. + * Set or add a callback called after {@link UdpServer} has been shutdown. * - * @param doOnUnbound a consumer observing server stop event - * - * @return a new {@link UdpServer} + * @param doOnUnbound a consumer observing unbound events + * @return a new {@link UdpServer} reference */ public final UdpServer doOnUnbound(Consumer doOnUnbound) { Objects.requireNonNull(doOnUnbound, "doOnUnbound"); - return new UdpServerDoOn(this, null, null, doOnUnbound); + UdpServer dup = duplicate(); + @SuppressWarnings("unchecked") + Consumer current = (Consumer) dup.configuration().doOnUnbound; + dup.configuration().doOnUnbound = current == null ? doOnUnbound : current.andThen(doOnUnbound); + return dup; } /** @@ -221,282 +202,58 @@ public final UdpServer doOnUnbound(Consumer doOnUnbound) { */ public final UdpServer handle(BiFunction> handler) { Objects.requireNonNull(handler, "handler"); - return doOnBound(c -> { - if (log.isDebugEnabled()) { - log.debug(format(c.channel(), "Handler is being applied: {}"), handler); - } - - Mono.fromDirect(handler.apply((UdpInbound) c, (UdpOutbound) c)) - .subscribe(c.disposeSubscriber()); - }); + return doOnBound(new OnBoundHandle(handler)); } /** * The host to which this server should bind. * - * @param host The host to bind to. - * - * @return a new {@link UdpServer} + * @param host the host to bind to. + * @return a new {@link UdpServer} reference */ public final UdpServer host(String host) { - Objects.requireNonNull(host, "host"); - return bootstrap(b -> b.localAddress(host, getPort(b))); - } - - /** - * Setup all lifecycle callbacks called on or after {@link io.netty.channel.Channel} - * has been connected and after it has been disconnected. - * - * @param observer a consumer observing state changes - * - * @return a new {@link UdpServer} - */ - public final UdpServer observe(ConnectionObserver observer) { - return new UdpServerObserve(this, observer); - } - - /** - * Set a {@link ChannelOption} value for low level connection settings like {@code SO_TIMEOUT} - * or {@code SO_KEEPALIVE}. This will apply to each new channel from remote peer. - * - * @param key the option key - * @param value the option value - * @param the option type - * - * @return new {@link UdpServer} - * - * @see Bootstrap#option(ChannelOption, Object) - */ - public final UdpServer option(ChannelOption key, T value) { - Objects.requireNonNull(key, "key"); - Objects.requireNonNull(value, "value"); - return bootstrap(b -> b.option(key, value)); + return localAddress(() -> AddressUtils.updateHost(configuration().localAddress(), host)); } /** * The port to which this server should bind. * * @param port The port to bind to. - * - * @return a new {@link UdpServer} + * @return a new {@link UdpServer} reference */ public final UdpServer port(int port) { - return bootstrap(b -> b.localAddress(getHost(b), port)); - } - - /** - * Run IO loops on the given {@link EventLoopGroup}. - * - * @param eventLoopGroup an eventLoopGroup to share - * - * @return a new {@link UdpServer} - */ - public final UdpServer runOn(EventLoopGroup eventLoopGroup) { - Objects.requireNonNull(eventLoopGroup, "eventLoopGroup"); - return runOn(preferNative -> eventLoopGroup); - } - - /** - * Run IO loops on a supplied {@link EventLoopGroup} from the {@link LoopResources} - * container. Will prefer native (epoll/kqueue) implementation if available unless the - * environment property {@code reactor.netty.native} is set to {@code false}. - * - * @param channelResources a {@link LoopResources} accepting native runtime - * expectation and returning an eventLoopGroup - * - * @return a new {@link UdpServer} - */ - public final UdpServer runOn(LoopResources channelResources) { - return runOn(channelResources, LoopResources.DEFAULT_NATIVE); + return localAddress(() -> AddressUtils.updatePort(configuration().localAddress(), port)); } - /** - * Run IO loops on a supplied {@link EventLoopGroup} from the {@link LoopResources} - * container. - * - * @param channelResources a {@link LoopResources} accepting native runtime - * expectation and returning an eventLoopGroup. - * @param preferNative Should the connector prefer native (epoll/kqueue) if available. - * - * @return a new {@link UdpServer} - */ - public final UdpServer runOn(LoopResources channelResources, boolean preferNative) { - return new UdpServerRunOn(this, channelResources, preferNative, null); - } - - /** - * Run IO loops on a supplied {@link EventLoopGroup} from the {@link LoopResources} - * container. - * - * @param channelResources a {@link LoopResources} accepting native runtime - * expectation and returning an eventLoopGroup. - * @param family a specific {@link InternetProtocolFamily} to run with - * - * @return a new {@link UdpServer} - */ - public final UdpServer runOn(LoopResources channelResources, InternetProtocolFamily family) { - return new UdpServerRunOn(this, channelResources, false, family); + @Override + protected UdpServer duplicate() { + return new UdpServer(new UdpServerConfig(configuration())); } - /** - * Whether to enable metrics to be collected and registered in Micrometer's - * {@link io.micrometer.core.instrument.Metrics#globalRegistry globalRegistry} - * under the name {@link reactor.netty.Metrics#UDP_SERVER_PREFIX}. Applications can - * separately register their own - * {@link io.micrometer.core.instrument.config.MeterFilter filters} associated with this name. - * For example, to put an upper bound on the number of tags produced: - *

-	 * MeterFilter filter = ... ;
-	 * Metrics.globalRegistry.config().meterFilter(MeterFilter.maximumAllowableTags(UDP_SERVER_PREFIX, 100, filter));
-	 * 
- *

By default this is not enabled. - * - * @param metricsEnabled true enables metrics collection; false disables it - * @return a new {@link UdpServer} - */ - public final UdpServer metrics(boolean metricsEnabled) { - if (metricsEnabled) { - if (!Metrics.isInstrumentationAvailable()) { - throw new UnsupportedOperationException( - "To enable metrics, you must add the dependency `io.micrometer:micrometer-core`" + - " to the class path first"); - } - - return bootstrap(b -> BootstrapHandlers.updateMetricsSupport(b, MicrometerUdpServerMetricsRecorder.INSTANCE)); - } - else { - return bootstrap(BootstrapHandlers::removeMetricsSupport); - } - } - - /** - * Specifies whether the metrics are enabled on the {@link UdpServer}. - * All generated metrics are provided to the specified recorder - * which is only instantiated if metrics are being enabled. - * - * @param metricsEnabled if true enables the metrics on the server. - * @param recorder a supplier for the {@link ChannelMetricsRecorder} - * @return a new {@link UdpServer} - * @since 0.9.7 - */ - public final UdpServer metrics(boolean metricsEnabled, Supplier recorder) { - if (metricsEnabled) { - Objects.requireNonNull(recorder, "recorder"); - return bootstrap(b -> BootstrapHandlers.updateMetricsSupport(b, recorder.get())); - } - else { - return bootstrap(BootstrapHandlers::removeMetricsSupport); - } - } - - /** - * Apply or remove a wire logger configuration using {@link UdpServer} category - * and {@code DEBUG} logger level - * - * @param enable Specifies whether the wire logger configuration will be added to - * the pipeline - * @return a new {@link UdpServer} - */ - public final UdpServer wiretap(boolean enable) { - if (enable) { - return bootstrap(b -> BootstrapHandlers.updateLogSupport(b, LOGGING_HANDLER)); - } - else { - return bootstrap(b -> BootstrapHandlers.removeConfiguration(b, NettyPipeline.LoggingHandler)); - } - } - - /** - * Apply a wire logger configuration using the specified category - * and {@code DEBUG} logger level - * - * @param category the logger category - * - * @return a new {@link UdpServer} - */ - public final UdpServer wiretap(String category) { - return wiretap(category, LogLevel.DEBUG); - } - - /** - * Apply a wire logger configuration using the specified category - * and logger level - * - * @param category the logger category - * @param level the logger level - * - * @return a new {@link UdpServer} - */ - public final UdpServer wiretap(String category, LogLevel level) { - Objects.requireNonNull(category, "category"); - Objects.requireNonNull(level, "level"); - return bootstrap(b -> BootstrapHandlers.updateLogSupport(b, - new LoggingHandler(category, level))); - } - - /** - * Materialize a Bootstrap from the parent {@link UdpServer} chain to use with {@link - * #bind(Bootstrap)} or separately - * - * @return a configured {@link Bootstrap} - */ - protected Bootstrap configure() { - return DEFAULT_BOOTSTRAP.clone(); - } - - /** - * Bind the {@link UdpServer} and return a {@link Mono} of {@link Connection} - * - * @param b the {@link Bootstrap} to bind - * - * @return a {@link Mono} of {@link Connection} - */ - protected abstract Mono bind(Bootstrap b); - /** * The default port for reactor-netty servers. Defaults to 12012 but can be tuned via * the {@code PORT} environment variable. */ - static final int DEFAULT_PORT = - System.getenv("PORT") != null ? Integer.parseInt(System.getenv("PORT")) : - 12012; + static final int DEFAULT_PORT = System.getenv("PORT") != null ? Integer.parseInt(System.getenv("PORT")) : 12012; - static final Bootstrap DEFAULT_BOOTSTRAP = - new Bootstrap().option(ChannelOption.AUTO_READ, false) - .localAddress(NetUtil.LOCALHOST, DEFAULT_PORT); + static final Logger log = Loggers.getLogger(UdpServer.class); - static { - BootstrapHandlers.channelOperationFactory(DEFAULT_BOOTSTRAP, (ch, c, msg) -> new UdpOperations(ch, c)); - } + static final class OnBoundHandle implements Consumer { - static final LoggingHandler LOGGING_HANDLER = new LoggingHandler(UdpServer.class); - static final Logger log = Loggers.getLogger(UdpServer.class); + final BiFunction> handler; - static String getHost(Bootstrap b) { - if (b.config() - .localAddress() instanceof InetSocketAddress) { - return ((InetSocketAddress) b.config() - .localAddress()).getHostString(); + OnBoundHandle(BiFunction> handler) { + this.handler = handler; } - return NetUtil.LOCALHOST.getHostAddress(); - } - static int getPort(Bootstrap b) { - if (b.config() - .localAddress() instanceof InetSocketAddress) { - return ((InetSocketAddress) b.config() - .localAddress()).getPort(); - } - return DEFAULT_PORT; - } - - static final class MicrometerUdpServerMetricsRecorder extends MicrometerChannelMetricsRecorder { - - static final MicrometerUdpServerMetricsRecorder INSTANCE = - new MicrometerUdpServerMetricsRecorder(reactor.netty.Metrics.UDP_SERVER_PREFIX, "udp"); + @Override + public void accept(Connection c) { + if (log.isDebugEnabled()) { + log.debug(format(c.channel(), "Handler is being applied: {}"), handler); + } - MicrometerUdpServerMetricsRecorder(String name, String protocol) { - super(name, protocol); + Mono.fromDirect(handler.apply((UdpInbound) c, (UdpOutbound) c)) + .subscribe(c.disposeSubscriber()); } } } diff --git a/src/main/java/reactor/netty/udp/UdpServerBind.java b/src/main/java/reactor/netty/udp/UdpServerBind.java deleted file mode 100644 index 521e8cea6b..0000000000 --- a/src/main/java/reactor/netty/udp/UdpServerBind.java +++ /dev/null @@ -1,49 +0,0 @@ -/* - * Copyright (c) 2011-Present VMware, Inc. or its affiliates, All Rights Reserved. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * https://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package reactor.netty.udp; - -import io.netty.bootstrap.Bootstrap; -import io.netty.channel.EventLoopGroup; -import reactor.core.publisher.Mono; -import reactor.netty.Connection; -import reactor.netty.resources.ConnectionProvider; -import reactor.netty.resources.LoopResources; - -/** - * @author Stephane Maldini - */ -final class UdpServerBind extends UdpServer { - - static final UdpServerBind INSTANCE = new UdpServerBind(); - - @Override - protected Mono bind(Bootstrap b) { - - //Default group and channel - if (b.config() - .group() == null) { - - UdpResources loopResources = UdpResources.get(); - EventLoopGroup elg = loopResources.onClient(LoopResources.DEFAULT_NATIVE); - - b.group(elg) - .channel(loopResources.onDatagramChannel(elg)); - } - - return ConnectionProvider.newConnection() - .acquire(b); - } -} diff --git a/src/main/java/reactor/netty/udp/UdpServerBootstrap.java b/src/main/java/reactor/netty/udp/UdpServerBootstrap.java deleted file mode 100644 index 56b47cfc92..0000000000 --- a/src/main/java/reactor/netty/udp/UdpServerBootstrap.java +++ /dev/null @@ -1,41 +0,0 @@ -/* - * Copyright (c) 2011-Present VMware, Inc. or its affiliates, All Rights Reserved. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * https://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package reactor.netty.udp; - -import java.util.Objects; -import java.util.function.Function; - -import io.netty.bootstrap.Bootstrap; - -/** - * @author Stephane Maldini - */ -final class UdpServerBootstrap extends UdpServerOperator { - - - final Function bootstrapMapper; - - UdpServerBootstrap(UdpServer client, - Function bootstrapMapper) { - super(client); - this.bootstrapMapper = Objects.requireNonNull(bootstrapMapper, "bootstrapMapper"); - } - - @Override - protected Bootstrap configure() { - return Objects.requireNonNull(bootstrapMapper.apply(source.configure()), "bootstrapMapper"); - } -} diff --git a/src/main/java/reactor/netty/udp/UdpServerConfig.java b/src/main/java/reactor/netty/udp/UdpServerConfig.java new file mode 100644 index 0000000000..7f1bd3a8b4 --- /dev/null +++ b/src/main/java/reactor/netty/udp/UdpServerConfig.java @@ -0,0 +1,245 @@ +/* + * Copyright (c) 2011-Present VMware, Inc. or its affiliates, All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package reactor.netty.udp; + +import io.netty.buffer.ByteBufAllocator; +import io.netty.buffer.PooledByteBufAllocator; +import io.netty.buffer.UnpooledByteBufAllocator; +import io.netty.channel.Channel; +import io.netty.channel.ChannelFactory; +import io.netty.channel.ChannelInitializer; +import io.netty.channel.ChannelOption; +import io.netty.channel.ChannelPipeline; +import io.netty.channel.EventLoopGroup; +import io.netty.channel.ReflectiveChannelFactory; +import io.netty.channel.socket.DatagramChannel; +import io.netty.channel.socket.nio.NioDatagramChannel; +import io.netty.handler.logging.LoggingHandler; +import reactor.netty.Connection; +import reactor.netty.ConnectionObserver; +import reactor.netty.NettyPipeline; +import reactor.netty.channel.ByteBufAllocatorMetrics; +import reactor.netty.channel.ChannelMetricsHandler; +import reactor.netty.channel.ChannelMetricsRecorder; +import reactor.netty.channel.ChannelOperations; +import reactor.netty.channel.MicrometerChannelMetricsRecorder; +import reactor.netty.resources.LoopResources; +import reactor.netty.transport.TransportConfig; +import reactor.util.Logger; +import reactor.util.Loggers; + +import javax.annotation.Nullable; +import java.net.SocketAddress; +import java.util.Map; +import java.util.function.Consumer; +import java.util.function.Supplier; + +import static reactor.netty.ReactorNetty.format; + +/** + * Encapsulate all necessary configuration for UDP client transport. + * + * @author Violeta Georgieva + * @since 1.0.0 + */ +public final class UdpServerConfig extends TransportConfig { + + Consumer doOnBind; + Consumer doOnBound; + Consumer doOnUnbound; + + UdpServerConfig(Map, ?> options, Supplier localAddress) { + super(options, localAddress); + } + + UdpServerConfig(UdpServerConfig parent) { + super(parent); + this.doOnBind = parent.doOnBind; + this.doOnBound = parent.doOnBound; + this.doOnUnbound = parent.doOnUnbound; + } + + /** + * Return the configured callback + * + * @return the configured callback + */ + @Nullable + public final Consumer doOnBind() { + return doOnBind; + } + + /** + * Return the configured callback + * + * @return the configured callback + */ + @Nullable + public final Consumer doOnBound() { + return doOnBound; + } + + /** + * Return the configured callback + * + * @return the configured callback + */ + @Nullable + public final Consumer doOnUnbound() { + return doOnUnbound; + } + + @Override + protected ChannelInitializer channelInitializer(ChannelOperations.OnSetup channelOperationsProvider, + ConnectionObserver connectionObserver, @Nullable SocketAddress remoteAddress) { + return new UdpServerChannelInitializer(this, channelOperationsProvider, connectionObserver, remoteAddress); + } + + @Override + protected ChannelOperations.OnSetup channelOperationsProvider() { + return DEFAULT_OPS; + } + + @Override + protected ChannelFactory connectionFactory(EventLoopGroup elg) { + LoopResources loopResources = loopResources() != null ? loopResources() : defaultLoopResources(); + ChannelFactory channelFactory; + if (isPreferNative()) { + channelFactory = new ReflectiveChannelFactory<>(loopResources.onDatagramChannel(elg)); + } + else { + channelFactory = () -> new NioDatagramChannel(family()); + } + return channelFactory; + } + + @Override + protected LoggingHandler defaultLoggingHandler() { + return LOGGING_HANDLER; + } + + @Override + protected LoopResources defaultLoopResources() { + return UdpResources.get(); + } + + @Override + protected ChannelMetricsRecorder defaultMetricsRecorder() { + return MicrometerUdpServerMetricsRecorder.INSTANCE; + } + + @Override + protected EventLoopGroup eventLoopGroup() { + LoopResources loopResources = loopResources() != null ? loopResources() : defaultLoopResources(); + return loopResources.onClient(isPreferNative()); + } + + @Nullable + @Override + protected ConnectionObserver lifecycleObserver() { + if (doOnBound() == null && doOnUnbound() == null) { + return null; + } + return new UdpServerDoOn(doOnBound(), doOnUnbound()); + } + + static final ChannelOperations.OnSetup DEFAULT_OPS = (ch, c, msg) -> new UdpOperations(ch, c); + + static final Logger log = Loggers.getLogger(UdpServerConfig.class); + + static final LoggingHandler LOGGING_HANDLER = new LoggingHandler(UdpServer.class); + + static final class MicrometerUdpServerMetricsRecorder extends MicrometerChannelMetricsRecorder { + + static final MicrometerUdpServerMetricsRecorder INSTANCE = + new MicrometerUdpServerMetricsRecorder(reactor.netty.Metrics.UDP_SERVER_PREFIX, "udp"); + + MicrometerUdpServerMetricsRecorder(String name, String protocol) { + super(name, protocol); + } + } + + static final class UdpServerChannelInitializer extends ChannelInitializer { + + final TransportConfig config; + final ChannelOperations.OnSetup channelOperationsProvider; + final ConnectionObserver connectionObserver; + final SocketAddress remoteAddress; + + UdpServerChannelInitializer(TransportConfig config, ChannelOperations.OnSetup channelOperationsProvider, + ConnectionObserver connectionObserver, @Nullable SocketAddress remoteAddress) { + this.config = config; + this.channelOperationsProvider = channelOperationsProvider; + this.connectionObserver = connectionObserver; + this.remoteAddress = remoteAddress; + } + + @Override + protected void initChannel(Channel ch) { + ChannelPipeline pipeline = ch.pipeline(); + + LoggingHandler loggingHandler = config.loggingHandler(); + if (loggingHandler != null) { + pipeline.addFirst(NettyPipeline.LoggingHandler, loggingHandler); + } + + ChannelMetricsRecorder channelMetricsRecorder = config.metricsRecorder(); + if (channelMetricsRecorder != null) { + pipeline.addFirst(NettyPipeline.ChannelMetricsHandler, + new ChannelMetricsHandler(channelMetricsRecorder, remoteAddress, false)); + + ByteBufAllocator alloc = ch.alloc(); + if (alloc instanceof PooledByteBufAllocator) { + ByteBufAllocatorMetrics.INSTANCE.registerMetrics("pooled", ((PooledByteBufAllocator) alloc).metric()); + } + else if (alloc instanceof UnpooledByteBufAllocator) { + ByteBufAllocatorMetrics.INSTANCE.registerMetrics("unpooled", ((UnpooledByteBufAllocator) alloc).metric()); + } + } + + ChannelOperations.addReactiveBridge(ch, channelOperationsProvider, connectionObserver); + + pipeline.remove(this); + + if (log.isDebugEnabled()) { + log.debug(format(ch, "Initialized pipeline {}"), pipeline.toString()); + } + } + } + + static final class UdpServerDoOn implements ConnectionObserver { + + final Consumer onBound; + final Consumer onUnbound; + + UdpServerDoOn(@Nullable Consumer onBound, + @Nullable Consumer onUnbound) { + this.onBound = onBound; + this.onUnbound = onUnbound; + } + + @Override + public void onStateChange(Connection connection, State newState) { + if (onBound != null && newState == State.CONFIGURED) { + onBound.accept(connection); + return; + } + if (onUnbound != null && newState == State.DISCONNECTING) { + connection.onDispose(() -> onUnbound.accept(connection)); + } + } + } +} diff --git a/src/main/java/reactor/netty/udp/UdpServerDoOn.java b/src/main/java/reactor/netty/udp/UdpServerDoOn.java deleted file mode 100644 index 4277e89d50..0000000000 --- a/src/main/java/reactor/netty/udp/UdpServerDoOn.java +++ /dev/null @@ -1,73 +0,0 @@ -/* - * Copyright (c) 2011-Present VMware, Inc. or its affiliates, All Rights Reserved. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * https://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package reactor.netty.udp; - -import java.util.function.Consumer; -import javax.annotation.Nullable; - -import io.netty.bootstrap.Bootstrap; -import reactor.core.publisher.Mono; -import reactor.netty.Connection; -import reactor.netty.ConnectionObserver; -import reactor.netty.channel.BootstrapHandlers; - -/** - * @author Stephane Maldini - */ -final class UdpServerDoOn extends UdpServerOperator implements ConnectionObserver { - - final Consumer onBind; - final Consumer onBound; - final Consumer onUnbound; - - UdpServerDoOn(UdpServer server, - @Nullable Consumer onBind, - @Nullable Consumer onBound, - @Nullable Consumer onUnbound) { - super(server); - this.onBind = onBind; - this.onBound = onBound; - this.onUnbound = onUnbound; - } - - @Override - public Bootstrap configure() { - Bootstrap b = source.configure(); - ConnectionObserver observer = BootstrapHandlers.connectionObserver(b); - BootstrapHandlers.connectionObserver(b, observer.then(this)); - return b; - } - - @Override - public Mono bind(Bootstrap b) { - if (onBind != null) { - return source.bind(b) - .doOnSubscribe(s -> onBind.accept(b)); - } - return source.bind(b); - } - - @Override - public void onStateChange(Connection connection, State newState) { - if (onBound != null && newState == State.CONFIGURED) { - onBound.accept(connection); - return; - } - if (onUnbound != null && newState == State.DISCONNECTING) { - connection.onDispose(() -> onUnbound.accept(connection)); - } - } -} diff --git a/src/main/java/reactor/netty/udp/UdpServerObserve.java b/src/main/java/reactor/netty/udp/UdpServerObserve.java deleted file mode 100644 index c3785d433d..0000000000 --- a/src/main/java/reactor/netty/udp/UdpServerObserve.java +++ /dev/null @@ -1,43 +0,0 @@ -/* - * Copyright (c) 2011-Present VMware, Inc. or its affiliates, All Rights Reserved. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * https://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package reactor.netty.udp; - -import java.util.Objects; - -import io.netty.bootstrap.Bootstrap; -import reactor.netty.ConnectionObserver; -import reactor.netty.channel.BootstrapHandlers; - -/** - * @author Stephane Maldini - */ -final class UdpServerObserve extends UdpServerOperator { - - final ConnectionObserver observer; - - UdpServerObserve(UdpServer server, ConnectionObserver observer) { - super(server); - this.observer = Objects.requireNonNull(observer, "observer"); - } - - @Override - public Bootstrap configure() { - Bootstrap b = source.configure(); - ConnectionObserver observer = BootstrapHandlers.connectionObserver(b); - BootstrapHandlers.connectionObserver(b, observer.then(this.observer)); - return b; - } -} diff --git a/src/main/java/reactor/netty/udp/UdpServerOperator.java b/src/main/java/reactor/netty/udp/UdpServerOperator.java deleted file mode 100644 index 380d34ad83..0000000000 --- a/src/main/java/reactor/netty/udp/UdpServerOperator.java +++ /dev/null @@ -1,44 +0,0 @@ -/* - * Copyright (c) 2011-Present VMware, Inc. or its affiliates, All Rights Reserved. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * https://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package reactor.netty.udp; - -import java.util.Objects; - -import io.netty.bootstrap.Bootstrap; -import reactor.core.publisher.Mono; -import reactor.netty.Connection; - -/** - * @author Stephane Maldini - */ -abstract class UdpServerOperator extends UdpServer { - - final UdpServer source; - - UdpServerOperator(UdpServer source) { - this.source = Objects.requireNonNull(source, "source"); - } - - @Override - protected Bootstrap configure() { - return source.configure(); - } - - @Override - protected Mono bind(Bootstrap b) { - return source.bind(b); - } -} diff --git a/src/main/java/reactor/netty/udp/UdpServerRunOn.java b/src/main/java/reactor/netty/udp/UdpServerRunOn.java deleted file mode 100644 index 1a83f8f5ee..0000000000 --- a/src/main/java/reactor/netty/udp/UdpServerRunOn.java +++ /dev/null @@ -1,63 +0,0 @@ -/* - * Copyright (c) 2011-Present VMware, Inc. or its affiliates, All Rights Reserved. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * https://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package reactor.netty.udp; - -import java.util.Objects; -import javax.annotation.Nullable; - -import io.netty.bootstrap.Bootstrap; -import io.netty.channel.EventLoopGroup; -import io.netty.channel.socket.InternetProtocolFamily; -import io.netty.channel.socket.nio.NioDatagramChannel; -import reactor.netty.resources.LoopResources; - -/** - * @author Stephane Maldini - */ -final class UdpServerRunOn extends UdpServerOperator { - - final LoopResources loopResources; - final boolean preferNative; - final InternetProtocolFamily family; - - UdpServerRunOn(UdpServer server, - LoopResources loopResources, - boolean preferNative, - @Nullable InternetProtocolFamily family) { - super(server); - this.loopResources = Objects.requireNonNull(loopResources, "loopResources"); - this.preferNative = preferNative; - this.family = family; - } - - @Override - protected Bootstrap configure() { - Bootstrap b = source.configure(); - - boolean useNative = family == null && preferNative; - - EventLoopGroup elg = loopResources.onClient(useNative); - - if (useNative) { - b.channel(loopResources.onDatagramChannel(elg)); - } - else { - b.channelFactory(() -> new NioDatagramChannel(family)); - } - - return b.group(elg); - } -} diff --git a/src/test/java/reactor/netty/udp/UdpClientTest.java b/src/test/java/reactor/netty/udp/UdpClientTest.java index 2ca2d756ac..34f324685e 100644 --- a/src/test/java/reactor/netty/udp/UdpClientTest.java +++ b/src/test/java/reactor/netty/udp/UdpClientTest.java @@ -69,9 +69,7 @@ public void smokeTest() throws Exception { .runOn(resources) .handle((in, out) -> { in.receive() - .subscribe(b -> { - latch.countDown(); - }); + .subscribe(b -> latch.countDown()); return out.sendString(Mono.just("ping1")) .then(out.sendString(Mono.just("ping2"))) .neverComplete(); @@ -87,9 +85,7 @@ public void smokeTest() throws Exception { .runOn(resources) .handle((in, out) -> { in.receive() - .subscribe(b -> { - latch.countDown(); - }); + .subscribe(b -> latch.countDown()); return out.sendString(Mono.just("ping3")) .then(out.sendString(Mono.just("ping4"))) .neverComplete(); @@ -108,14 +104,22 @@ public void smokeTest() throws Exception { @Test public void testIssue192() { LoopResources resources = LoopResources.create("testIssue192"); - UdpServer server = UdpServer.create() - .runOn(resources); - UdpClient client = UdpClient.create() - .runOn(resources); + Mono server = UdpServer.create() + .runOn(resources) + .bind(); + Mono client = UdpClient.create() + .runOn(resources) + .connect(); assertThat(Thread.getAllStackTraces().keySet().stream().noneMatch(t -> t.getName().startsWith("testIssue192"))).isTrue(); - server.bind(); - client.connect(); + + Connection conn1 = server.block(Duration.ofSeconds(30)); + Connection conn2 = client.block(Duration.ofSeconds(30)); + assertThat(conn1).isNotNull(); + assertThat(conn2).isNotNull(); assertThat(Thread.getAllStackTraces().keySet().stream().anyMatch(t -> t.getName().startsWith("testIssue192"))).isTrue(); + + conn1.disposeNow(); + conn2.disposeNow(); resources.dispose(); } } diff --git a/src/test/java/reactor/netty/udp/UdpMetricsTests.java b/src/test/java/reactor/netty/udp/UdpMetricsTests.java index 3a3120771c..8ad7f44c45 100644 --- a/src/test/java/reactor/netty/udp/UdpMetricsTests.java +++ b/src/test/java/reactor/netty/udp/UdpMetricsTests.java @@ -70,7 +70,7 @@ public void setUp() { udpClient = UdpClient.create() - .addressSupplier(() -> serverConnection.address()) + .remoteAddress(() -> serverConnection.address()) .metrics(true); registry = new SimpleMeterRegistry(); diff --git a/src/test/java/reactor/netty/udp/UdpServerTests.java b/src/test/java/reactor/netty/udp/UdpServerTests.java index b0628d6980..fb96a1670a 100644 --- a/src/test/java/reactor/netty/udp/UdpServerTests.java +++ b/src/test/java/reactor/netty/udp/UdpServerTests.java @@ -143,7 +143,7 @@ public void supportsUdpMulticast() throws Exception { Connection server = UdpServer.create() .option(ChannelOption.SO_REUSEADDR, true) - .addressSupplier(() -> new InetSocketAddress(port)) + .localAddress(() -> new InetSocketAddress(port)) .runOn(resources, InternetProtocolFamily.IPv4) .handle((in, out) -> { Flux.generate(s -> { @@ -258,7 +258,7 @@ public void portBindingException() { try { UdpServer.create() - .addressSupplier(conn::address) + .localAddress(conn::address) .bindNow(Duration.ofSeconds(30)); fail("illegal-success"); }