Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for Unix domain datagram sockets when using native epoll/kqueue transport #1741

Merged
merged 2 commits into from
Jul 20, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions docs/asciidoc/udp-client.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -271,3 +271,17 @@ include::{examplesdir}/metrics/custom/Application.java[lines=18..37]
----
<1> Enables UDP client metrics and provides {javadoc}/reactor/netty/channel/ChannelMetricsRecorder.html[`ChannelMetricsRecorder`] implementation.
====

== Unix Domain Sockets
The `UdpClient` supports Unix Domain Datagram Sockets (UDS) when native transport is in use.

The following example shows how to use UDS support:

====
[source,java,indent=0]
.{examplesdir}/uds/Application.java
----
include::{examplesdir}/uds/Application.java[lines=18..42]
----
<1> Specifies `DomainSocketAddress` that will be used
====
14 changes: 14 additions & 0 deletions docs/asciidoc/udp-server.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -263,3 +263,17 @@ include::{examplesdir}/metrics/custom/Application.java[lines=18..35]
----
<1> Enables UDP server metrics and provides {javadoc}/reactor/netty/channel/ChannelMetricsRecorder.html[`ChannelMetricsRecorder`] implementation.
====

== Unix Domain Sockets
The `UdpServer` supports Unix Domain Datagram Sockets (UDS) when native transport is in use.

The following example shows how to use UDS support:

====
[source,java,indent=0]
.{examplesdir}/uds/Application.java
----
include::{examplesdir}/uds/Application.java[lines=18..48]
----
<1> Specifies `DomainSocketAddress` that will be used
====
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import io.netty.channel.Channel;
import io.netty.channel.socket.DatagramChannel;
import io.netty.channel.unix.DomainDatagramChannel;
import reactor.core.CoreSubscriber;
import reactor.core.Disposable;
import reactor.core.publisher.Mono;
Expand All @@ -44,7 +45,7 @@ public interface DisposableChannel extends Disposable {
*/
default SocketAddress address() {
Channel c = channel();
if (c instanceof DatagramChannel) {
if (c instanceof DatagramChannel || c instanceof DomainDatagramChannel) {
SocketAddress a = c.remoteAddress();
return a != null ? a : c.localAddress();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import io.netty.channel.EventLoopGroup;
import io.netty.channel.epoll.Epoll;
import io.netty.channel.epoll.EpollDatagramChannel;
import io.netty.channel.epoll.EpollDomainDatagramChannel;
import io.netty.channel.epoll.EpollDomainSocketChannel;
import io.netty.channel.epoll.EpollEventLoopGroup;
import io.netty.channel.epoll.EpollServerDomainSocketChannel;
Expand All @@ -29,6 +30,7 @@
import io.netty.channel.socket.DatagramChannel;
import io.netty.channel.socket.ServerSocketChannel;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.unix.DomainDatagramChannel;
import io.netty.channel.unix.DomainSocketChannel;
import io.netty.channel.unix.ServerDomainSocketChannel;
import reactor.util.Logger;
Expand Down Expand Up @@ -60,6 +62,9 @@ public <CHANNEL extends Channel> CHANNEL getChannel(Class<CHANNEL> channelClass)
if (channelClass.equals(ServerDomainSocketChannel.class)) {
return (CHANNEL) new EpollServerDomainSocketChannel();
}
if (channelClass.equals(DomainDatagramChannel.class)) {
return (CHANNEL) new EpollDomainDatagramChannel();
}
throw new IllegalArgumentException("Unsupported channel type: " + channelClass.getSimpleName());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import io.netty.channel.EventLoopGroup;
import io.netty.channel.kqueue.KQueue;
import io.netty.channel.kqueue.KQueueDatagramChannel;
import io.netty.channel.kqueue.KQueueDomainDatagramChannel;
import io.netty.channel.kqueue.KQueueDomainSocketChannel;
import io.netty.channel.kqueue.KQueueEventLoopGroup;
import io.netty.channel.kqueue.KQueueServerDomainSocketChannel;
Expand All @@ -29,6 +30,7 @@
import io.netty.channel.socket.DatagramChannel;
import io.netty.channel.socket.ServerSocketChannel;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.unix.DomainDatagramChannel;
import io.netty.channel.unix.DomainSocketChannel;
import io.netty.channel.unix.ServerDomainSocketChannel;
import reactor.util.Logger;
Expand Down Expand Up @@ -59,6 +61,9 @@ public <CHANNEL extends Channel> CHANNEL getChannel(Class<CHANNEL> channelClass)
if (channelClass.equals(ServerDomainSocketChannel.class)) {
return (CHANNEL) new KQueueServerDomainSocketChannel();
}
if (channelClass.equals(DomainDatagramChannel.class)) {
return (CHANNEL) new KQueueDomainDatagramChannel();
}
throw new IllegalArgumentException("Unsupported channel type: " + channelClass.getSimpleName());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import io.netty.channel.socket.DatagramChannel;
import io.netty.channel.socket.InternetProtocolFamily;
import io.netty.channel.socket.nio.NioDatagramChannel;
import io.netty.channel.unix.DomainDatagramChannel;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import io.netty.resolver.AddressResolverGroup;
Expand Down Expand Up @@ -79,19 +80,13 @@ public final InternetProtocolFamily family() {

@Override
protected Class<? extends Channel> channelType(boolean isDomainSocket) {
if (isDomainSocket) {
throw new UnsupportedOperationException();
}
return DatagramChannel.class;
return isDomainSocket ? DomainDatagramChannel.class : DatagramChannel.class;
}

@Override
protected ChannelFactory<? extends Channel> connectionFactory(EventLoopGroup elg, boolean isDomainSocket) {
if (isDomainSocket) {
throw new UnsupportedOperationException();
}
if (isPreferNative()) {
return () -> loopResources().onChannel(DatagramChannel.class, elg);
return super.connectionFactory(elg, isDomainSocket);
}
else {
return () -> new NioDatagramChannel(family());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ interface UdpConnection {
* @param multicastAddress multicast address of the group to join
*
* @return a {@link Publisher} that will be complete when the group has been joined
* @throws UnsupportedOperationException when Unix Domain Sockets
*/
default Mono<Void> join(InetAddress multicastAddress) {
return join(multicastAddress, null);
Expand All @@ -43,6 +44,7 @@ default Mono<Void> join(InetAddress multicastAddress) {
* @param multicastAddress multicast address of the group to join
*
* @return a {@link Publisher} that will be complete when the group has been joined
* @throws UnsupportedOperationException when Unix Domain Sockets
*/
Mono<Void> join(final InetAddress multicastAddress, @Nullable NetworkInterface iface);

Expand All @@ -52,6 +54,7 @@ default Mono<Void> join(InetAddress multicastAddress) {
* @param multicastAddress multicast address of the group to leave
*
* @return a {@link Publisher} that will be complete when the group has been left
* @throws UnsupportedOperationException when Unix Domain Sockets
*/
default Mono<Void> leave(InetAddress multicastAddress) {
return leave(multicastAddress, null);
Expand All @@ -63,6 +66,7 @@ default Mono<Void> leave(InetAddress multicastAddress) {
* @param multicastAddress multicast address of the group to leave
*
* @return a {@link Publisher} that will be complete when the group has been left
* @throws UnsupportedOperationException when Unix Domain Sockets
*/
Mono<Void> leave(final InetAddress multicastAddress, @Nullable NetworkInterface iface);
}
Original file line number Diff line number Diff line change
Expand Up @@ -39,11 +39,8 @@
final class UdpOperations extends ChannelOperations<UdpInbound, UdpOutbound>
implements UdpInbound, UdpOutbound {

final DatagramChannel datagramChannel;

UdpOperations(Connection c, ConnectionObserver listener) {
super(c, listener);
this.datagramChannel = (DatagramChannel) c.channel();
}

/**
Expand All @@ -55,6 +52,10 @@ final class UdpOperations extends ChannelOperations<UdpInbound, UdpOutbound>
*/
@Override
public Mono<Void> join(final InetAddress multicastAddress, @Nullable NetworkInterface iface) {
if (!(connection().channel() instanceof DatagramChannel)) {
throw new UnsupportedOperationException();
}
DatagramChannel datagramChannel = (DatagramChannel) connection().channel();
if (null == iface && null != datagramChannel.config().getNetworkInterface()) {
iface = datagramChannel.config().getNetworkInterface();
}
Expand Down Expand Up @@ -86,6 +87,10 @@ public Mono<Void> join(final InetAddress multicastAddress, @Nullable NetworkInte
*/
@Override
public Mono<Void> leave(final InetAddress multicastAddress, @Nullable NetworkInterface iface) {
if (!(connection().channel() instanceof DatagramChannel)) {
throw new UnsupportedOperationException();
}
DatagramChannel datagramChannel = (DatagramChannel) connection().channel();
if (null == iface && null != datagramChannel.config().getNetworkInterface()) {
iface = datagramChannel.config().getNetworkInterface();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import io.netty.channel.socket.DatagramChannel;
import io.netty.channel.socket.InternetProtocolFamily;
import io.netty.channel.socket.nio.NioDatagramChannel;
import io.netty.channel.unix.DomainDatagramChannel;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import reactor.netty.ChannelPipelineConfigurer;
Expand Down Expand Up @@ -117,19 +118,13 @@ public final InternetProtocolFamily family() {

@Override
protected Class<? extends Channel> channelType(boolean isDomainSocket) {
if (isDomainSocket) {
throw new UnsupportedOperationException();
}
return DatagramChannel.class;
return isDomainSocket ? DomainDatagramChannel.class : DatagramChannel.class;
}

@Override
protected ChannelFactory<? extends Channel> connectionFactory(EventLoopGroup elg, boolean isDomainSocket) {
if (isDomainSocket) {
throw new UnsupportedOperationException();
}
if (isPreferNative()) {
return () -> loopResources().onChannel(DatagramChannel.class, elg);
return super.connectionFactory(elg, isDomainSocket);
}
else {
return () -> new NioDatagramChannel(family());
Expand Down
Loading