From a1606903ce8a914f549ba38a2acfe5903db8b417 Mon Sep 17 00:00:00 2001 From: Michael Barker Date: Mon, 8 Mar 2021 09:32:19 +1300 Subject: [PATCH 1/7] [Java] Parse socket parameters in channel URI. --- .../src/main/java/io/aeron/CommonContext.java | 10 +++++ .../io/aeron/driver/media/UdpChannel.java | 45 ++++++++++++++++++- .../java/io/aeron/driver/UdpChannelTest.java | 14 ++++++ 3 files changed, 68 insertions(+), 1 deletion(-) diff --git a/aeron-client/src/main/java/io/aeron/CommonContext.java b/aeron-client/src/main/java/io/aeron/CommonContext.java index 8aacc23990..8296c605cc 100644 --- a/aeron-client/src/main/java/io/aeron/CommonContext.java +++ b/aeron-client/src/main/java/io/aeron/CommonContext.java @@ -304,6 +304,16 @@ public static InferableBoolean parse(final String value) */ public static final String SPIES_SIMULATE_CONNECTION_PARAM_NAME = "ssc"; + /** + * Parameter name for the underlying OS socket send buffer size + */ + public static final String SOCKET_SNDBUF_PARAM_NAME = "socket-sndbuf"; + + /** + * Parameter name for the underlying OS socket receive buffer size + */ + public static final String SOCKET_RCVBUF_PARAM_NAME = "socket-rcvbuf"; + /** * Using an integer because there is no support for boolean. 1 is concluded, 0 is not concluded. */ diff --git a/aeron-driver/src/main/java/io/aeron/driver/media/UdpChannel.java b/aeron-driver/src/main/java/io/aeron/driver/media/UdpChannel.java index 0c9bd4c443..93c98b6852 100644 --- a/aeron-driver/src/main/java/io/aeron/driver/media/UdpChannel.java +++ b/aeron-driver/src/main/java/io/aeron/driver/media/UdpChannel.java @@ -15,6 +15,7 @@ */ package io.aeron.driver.media; +import io.aeron.Aeron; import io.aeron.ChannelUri; import io.aeron.CommonContext; import io.aeron.driver.DefaultNameResolver; @@ -61,6 +62,8 @@ public final class UdpChannel private final NetworkInterface localInterface; private final ProtocolFamily protocolFamily; private final ChannelUri channelUri; + private final int socketRcvBuf; + private final int socketSndBuf; private UdpChannel(final Context context) { @@ -82,6 +85,8 @@ private UdpChannel(final Context context) localInterface = context.localInterface; protocolFamily = context.protocolFamily; channelUri = context.channelUri; + socketRcvBuf = context.socketRcvBuf; + socketSndBuf = context.socketSndBuf; } /** @@ -134,6 +139,8 @@ public static UdpChannel parse( final String controlMode = channelUri.get(CommonContext.MDC_CONTROL_MODE_PARAM_NAME); final boolean isManualControlMode = CommonContext.MDC_CONTROL_MODE_MANUAL.equals(controlMode); final boolean isDynamicControlMode = CommonContext.MDC_CONTROL_MODE_DYNAMIC.equals(controlMode); + final int socketRcvBuf = Integer.parseInt(channelUri.get(CommonContext.SOCKET_RCVBUF_PARAM_NAME, "-1")); + final int socketSndBuf = Integer.parseInt(channelUri.get(CommonContext.SOCKET_SNDBUF_PARAM_NAME, "-1")); final boolean requiresAdditionalSuffix = !isDestination && (null == endpointAddress && null == explicitControlAddress || @@ -186,7 +193,9 @@ public static UdpChannel parse( .isManualControlMode(isManualControlMode) .isDynamicControlMode(isDynamicControlMode) .hasExplicitEndpoint(hasExplicitEndpoint) - .hasNoDistinguishingCharacteristic(hasNoDistinguishingCharacteristic); + .hasNoDistinguishingCharacteristic(hasNoDistinguishingCharacteristic) + .socketRcvBuf(socketRcvBuf) + .socketSndBuf(socketSndBuf); if (null != tagIdStr) { @@ -623,6 +632,26 @@ public static InetSocketAddress destinationAddress(final ChannelUri uri, final N } } + /** + * Get the socket receive buffer size + * + * @return socket receive buffer size or {@link Aeron#NULL_VALUE} if not specified + */ + public int socketRcvBuf() + { + return socketRcvBuf; + } + + /** + * Get the socket send buffer size + * + * @return socket send buffer size or {@link Aeron#NULL_VALUE} if not specified + */ + public int socketSndBuf() + { + return socketSndBuf; + } + /** * Resolve and endpoint into a {@link InetSocketAddress}. * @@ -804,6 +833,8 @@ static class Context boolean hasMulticastTtl = false; boolean hasTagId = false; boolean hasNoDistinguishingCharacteristic = false; + int socketRcvBuf = Aeron.NULL_VALUE; + int socketSndBuf = Aeron.NULL_VALUE; Context uriStr(final String uri) { @@ -918,5 +949,17 @@ Context hasNoDistinguishingCharacteristic(final boolean hasNoDistinguishingChara this.hasNoDistinguishingCharacteristic = hasNoDistinguishingCharacteristic; return this; } + + Context socketRcvBuf(final int socketRcvBuf) + { + this.socketRcvBuf = socketRcvBuf; + return this; + } + + Context socketSndBuf(final int socketSndBuf) + { + this.socketSndBuf = socketSndBuf; + return this; + } } } diff --git a/aeron-driver/src/test/java/io/aeron/driver/UdpChannelTest.java b/aeron-driver/src/test/java/io/aeron/driver/UdpChannelTest.java index 90b4771ed9..324c4e41f2 100644 --- a/aeron-driver/src/test/java/io/aeron/driver/UdpChannelTest.java +++ b/aeron-driver/src/test/java/io/aeron/driver/UdpChannelTest.java @@ -15,6 +15,7 @@ */ package io.aeron.driver; +import io.aeron.Aeron; import io.aeron.ChannelUriStringBuilder; import io.aeron.driver.exceptions.InvalidChannelException; import io.aeron.driver.media.UdpChannel; @@ -458,6 +459,19 @@ void shouldUseTagsInCanonicalFormForWildcardPorts() UdpChannel.parse("aeron:udp?endpoint=127.0.0.1:0|tags=1001").canonicalForm()); } + @Test + void shouldParseSocketRcvAndSndBufSizes() + { + final UdpChannel udpChannelWithBufferSizes = UdpChannel.parse( + "aeron:udp?endpoint=127.0.0.1:9999|socket-sndbuf=4096|socket-rcvbuf=8192"); + assertEquals(4096, udpChannelWithBufferSizes.socketSndBuf()); + assertEquals(8192, udpChannelWithBufferSizes.socketRcvBuf()); + + final UdpChannel udpChannelWithoutBufferSizes = UdpChannel.parse("aeron:udp?endpoint=127.0.0.1:9999"); + assertEquals(Aeron.NULL_VALUE, udpChannelWithoutBufferSizes.socketRcvBuf()); + assertEquals(Aeron.NULL_VALUE, udpChannelWithoutBufferSizes.socketSndBuf()); + } + @ParameterizedTest @CsvSource({ "NAME_ENDPOINT,192.168.1.1,,,UDP-127.0.0.1:0-NAME_ENDPOINT", From 68cb1dbad637ead6ab73d58b2cd33672d9e5551d Mon Sep 17 00:00:00 2001 From: Michael Barker Date: Mon, 8 Mar 2021 10:18:11 +1300 Subject: [PATCH 2/7] [Java] Make naming consistent. Use snd/rcv buf lengths for URI if specified. --- .../io/aeron/driver/media/UdpChannel.java | 41 +++++++++--------- .../driver/media/UdpChannelTransport.java | 18 ++++++-- .../driver/SelectorAndTransportTest.java | 43 +++++++++++++++++++ .../java/io/aeron/driver/UdpChannelTest.java | 8 ++-- 4 files changed, 81 insertions(+), 29 deletions(-) diff --git a/aeron-driver/src/main/java/io/aeron/driver/media/UdpChannel.java b/aeron-driver/src/main/java/io/aeron/driver/media/UdpChannel.java index 93c98b6852..bc166e2f5a 100644 --- a/aeron-driver/src/main/java/io/aeron/driver/media/UdpChannel.java +++ b/aeron-driver/src/main/java/io/aeron/driver/media/UdpChannel.java @@ -15,7 +15,6 @@ */ package io.aeron.driver.media; -import io.aeron.Aeron; import io.aeron.ChannelUri; import io.aeron.CommonContext; import io.aeron.driver.DefaultNameResolver; @@ -62,8 +61,8 @@ public final class UdpChannel private final NetworkInterface localInterface; private final ProtocolFamily protocolFamily; private final ChannelUri channelUri; - private final int socketRcvBuf; - private final int socketSndBuf; + private final int socketRcvbufLength; + private final int socketSndbufLength; private UdpChannel(final Context context) { @@ -85,8 +84,8 @@ private UdpChannel(final Context context) localInterface = context.localInterface; protocolFamily = context.protocolFamily; channelUri = context.channelUri; - socketRcvBuf = context.socketRcvBuf; - socketSndBuf = context.socketSndBuf; + socketRcvbufLength = context.socketRcvbufLength; + socketSndbufLength = context.socketSndbufLength; } /** @@ -139,8 +138,8 @@ public static UdpChannel parse( final String controlMode = channelUri.get(CommonContext.MDC_CONTROL_MODE_PARAM_NAME); final boolean isManualControlMode = CommonContext.MDC_CONTROL_MODE_MANUAL.equals(controlMode); final boolean isDynamicControlMode = CommonContext.MDC_CONTROL_MODE_DYNAMIC.equals(controlMode); - final int socketRcvBuf = Integer.parseInt(channelUri.get(CommonContext.SOCKET_RCVBUF_PARAM_NAME, "-1")); - final int socketSndBuf = Integer.parseInt(channelUri.get(CommonContext.SOCKET_SNDBUF_PARAM_NAME, "-1")); + final int socketRcvbufLength = Integer.parseInt(channelUri.get(CommonContext.SOCKET_RCVBUF_PARAM_NAME, "0")); + final int socketSndbufLength = Integer.parseInt(channelUri.get(CommonContext.SOCKET_SNDBUF_PARAM_NAME, "0")); final boolean requiresAdditionalSuffix = !isDestination && (null == endpointAddress && null == explicitControlAddress || @@ -194,8 +193,8 @@ public static UdpChannel parse( .isDynamicControlMode(isDynamicControlMode) .hasExplicitEndpoint(hasExplicitEndpoint) .hasNoDistinguishingCharacteristic(hasNoDistinguishingCharacteristic) - .socketRcvBuf(socketRcvBuf) - .socketSndBuf(socketSndBuf); + .socketRcvbufLength(socketRcvbufLength) + .socketSndbufLength(socketSndbufLength); if (null != tagIdStr) { @@ -635,21 +634,21 @@ public static InetSocketAddress destinationAddress(final ChannelUri uri, final N /** * Get the socket receive buffer size * - * @return socket receive buffer size or {@link Aeron#NULL_VALUE} if not specified + * @return socket receive buffer size or 0 if not specified */ - public int socketRcvBuf() + public int socketRcvbufLength() { - return socketRcvBuf; + return socketRcvbufLength; } /** * Get the socket send buffer size * - * @return socket send buffer size or {@link Aeron#NULL_VALUE} if not specified + * @return socket send buffer size or 0 if not specified */ - public int socketSndBuf() + public int socketSndbufLenth() { - return socketSndBuf; + return socketSndbufLength; } /** @@ -833,8 +832,8 @@ static class Context boolean hasMulticastTtl = false; boolean hasTagId = false; boolean hasNoDistinguishingCharacteristic = false; - int socketRcvBuf = Aeron.NULL_VALUE; - int socketSndBuf = Aeron.NULL_VALUE; + int socketRcvbufLength = 0; + int socketSndbufLength = 0; Context uriStr(final String uri) { @@ -950,15 +949,15 @@ Context hasNoDistinguishingCharacteristic(final boolean hasNoDistinguishingChara return this; } - Context socketRcvBuf(final int socketRcvBuf) + Context socketRcvbufLength(final int socketRcvbufLength) { - this.socketRcvBuf = socketRcvBuf; + this.socketRcvbufLength = socketRcvbufLength; return this; } - Context socketSndBuf(final int socketSndBuf) + Context socketSndbufLength(final int socketSndbufLength) { - this.socketSndBuf = socketSndBuf; + this.socketSndbufLength = socketSndbufLength; return this; } } diff --git a/aeron-driver/src/main/java/io/aeron/driver/media/UdpChannelTransport.java b/aeron-driver/src/main/java/io/aeron/driver/media/UdpChannelTransport.java index 8eb7ca0c99..d1137c0f32 100644 --- a/aeron-driver/src/main/java/io/aeron/driver/media/UdpChannelTransport.java +++ b/aeron-driver/src/main/java/io/aeron/driver/media/UdpChannelTransport.java @@ -173,14 +173,14 @@ else if (context.socketMulticastTtl() != 0) sendDatagramChannel.connect(connectAddress); } - if (0 != context.socketSndbufLength()) + if (0 != socketSndbufLength()) { - sendDatagramChannel.setOption(SO_SNDBUF, context.socketSndbufLength()); + sendDatagramChannel.setOption(SO_SNDBUF, socketSndbufLength()); } - if (0 != context.socketRcvbufLength()) + if (0 != socketRcvbufLength()) { - receiveDatagramChannel.setOption(SO_RCVBUF, context.socketRcvbufLength()); + receiveDatagramChannel.setOption(SO_RCVBUF, socketRcvbufLength()); } sendDatagramChannel.configureBlocking(false); @@ -436,4 +436,14 @@ public void updateEndpoint(final InetSocketAddress newAddress, final AtomicCount throw new AeronException(message, ex); } } + + private int socketSndbufLength() + { + return 0 != udpChannel.socketSndbufLenth() ? udpChannel.socketSndbufLenth() : context.socketSndbufLength(); + } + + private int socketRcvbufLength() + { + return 0 != udpChannel.socketRcvbufLength() ? udpChannel.socketRcvbufLength() : context.socketRcvbufLength(); + } } diff --git a/aeron-driver/src/test/java/io/aeron/driver/SelectorAndTransportTest.java b/aeron-driver/src/test/java/io/aeron/driver/SelectorAndTransportTest.java index 99ae27f17a..ca703b9910 100644 --- a/aeron-driver/src/test/java/io/aeron/driver/SelectorAndTransportTest.java +++ b/aeron-driver/src/test/java/io/aeron/driver/SelectorAndTransportTest.java @@ -31,9 +31,15 @@ import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Timeout; +import org.mockito.MockedStatic; +import org.mockito.Mockito; +import java.io.IOException; import java.net.InetSocketAddress; +import java.net.StandardProtocolFamily; +import java.net.StandardSocketOptions; import java.nio.ByteBuffer; +import java.nio.channels.DatagramChannel; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.mockito.Mockito.*; @@ -131,6 +137,43 @@ public void shouldHandleBasicSetupAndTearDown() processLoop(dataTransportPoller, 5); } + @Test + void shouldSetSocketBufferSizesFromUdpChannelForReceiveChannel() throws IOException + { + final DatagramChannel spyChannel = spy(DatagramChannel.open(StandardProtocolFamily.INET)); + final UdpChannel channel = UdpChannel.parse( + "aeron:udp?endpoint=localhost:" + RCV_PORT + "|socket-sndbuf=8192|socket-rcvbuf=4096"); + receiveChannelEndpoint = new ReceiveChannelEndpoint( + channel, mockDispatcher, mockReceiveStatusIndicator, context); + + try (MockedStatic mockDatagramChannel = Mockito.mockStatic(DatagramChannel.class)) + { + mockDatagramChannel.when(() -> DatagramChannel.open(StandardProtocolFamily.INET)).thenReturn(spyChannel); + receiveChannelEndpoint.openDatagramChannel(mockReceiveStatusIndicator); + + verify(spyChannel).setOption(StandardSocketOptions.SO_SNDBUF, 8192); + verify(spyChannel).setOption(StandardSocketOptions.SO_RCVBUF, 4096); + } + } + + @Test + void shouldSetSocketBufferSizesFromUdpChannelForSendChannel() throws IOException + { + final DatagramChannel spyChannel = spy(DatagramChannel.open(StandardProtocolFamily.INET)); + final UdpChannel channel = UdpChannel.parse( + "aeron:udp?endpoint=localhost:" + RCV_PORT + "|socket-sndbuf=8192|socket-rcvbuf=4096"); + sendChannelEndpoint = new SendChannelEndpoint(channel, mockReceiveStatusIndicator, context); + + try (MockedStatic mockDatagramChannel = Mockito.mockStatic(DatagramChannel.class)) + { + mockDatagramChannel.when(() -> DatagramChannel.open(StandardProtocolFamily.INET)).thenReturn(spyChannel); + sendChannelEndpoint.openDatagramChannel(mockReceiveStatusIndicator); + + verify(spyChannel).setOption(StandardSocketOptions.SO_SNDBUF, 8192); + verify(spyChannel).setOption(StandardSocketOptions.SO_RCVBUF, 4096); + } + } + @Test @Timeout(10) public void shouldSendEmptyDataFrameUnicastFromSourceToReceiver() diff --git a/aeron-driver/src/test/java/io/aeron/driver/UdpChannelTest.java b/aeron-driver/src/test/java/io/aeron/driver/UdpChannelTest.java index 324c4e41f2..46bc969ca3 100644 --- a/aeron-driver/src/test/java/io/aeron/driver/UdpChannelTest.java +++ b/aeron-driver/src/test/java/io/aeron/driver/UdpChannelTest.java @@ -464,12 +464,12 @@ void shouldParseSocketRcvAndSndBufSizes() { final UdpChannel udpChannelWithBufferSizes = UdpChannel.parse( "aeron:udp?endpoint=127.0.0.1:9999|socket-sndbuf=4096|socket-rcvbuf=8192"); - assertEquals(4096, udpChannelWithBufferSizes.socketSndBuf()); - assertEquals(8192, udpChannelWithBufferSizes.socketRcvBuf()); + assertEquals(4096, udpChannelWithBufferSizes.socketSndbufLenth()); + assertEquals(8192, udpChannelWithBufferSizes.socketRcvbufLength()); final UdpChannel udpChannelWithoutBufferSizes = UdpChannel.parse("aeron:udp?endpoint=127.0.0.1:9999"); - assertEquals(Aeron.NULL_VALUE, udpChannelWithoutBufferSizes.socketRcvBuf()); - assertEquals(Aeron.NULL_VALUE, udpChannelWithoutBufferSizes.socketSndBuf()); + assertEquals(Aeron.NULL_VALUE, udpChannelWithoutBufferSizes.socketRcvbufLength()); + assertEquals(Aeron.NULL_VALUE, udpChannelWithoutBufferSizes.socketSndbufLenth()); } @ParameterizedTest From bcda42a83a50f6c48cc3af623149baa7c321bf6b Mon Sep 17 00:00:00 2001 From: Michael Barker Date: Mon, 8 Mar 2021 10:51:58 +1300 Subject: [PATCH 3/7] [Java] Add socket rcv/snd buffer lengths to ChannelUriStringBuilder. --- .../io/aeron/ChannelUriStringBuilder.java | 91 +++++++++++++++++++ .../io/aeron/ChannelUriStringBuilderTest.java | 14 +++ .../io/aeron/driver/media/UdpChannel.java | 6 +- .../java/io/aeron/driver/UdpChannelTest.java | 5 +- 4 files changed, 111 insertions(+), 5 deletions(-) diff --git a/aeron-client/src/main/java/io/aeron/ChannelUriStringBuilder.java b/aeron-client/src/main/java/io/aeron/ChannelUriStringBuilder.java index 9bdf15836b..9fc0ab484c 100644 --- a/aeron-client/src/main/java/io/aeron/ChannelUriStringBuilder.java +++ b/aeron-client/src/main/java/io/aeron/ChannelUriStringBuilder.java @@ -68,6 +68,8 @@ public final class ChannelUriStringBuilder private Boolean rejoin; private Boolean ssc; private boolean isSessionIdTagged; + private Integer socketSndbufLength; + private Integer socketRcvbufLength; /** * Clear out all the values thus setting back to the initial state. @@ -102,6 +104,8 @@ public ChannelUriStringBuilder clear() group = null; rejoin = null; isSessionIdTagged = false; + socketRcvbufLength = null; + socketSndbufLength = null; return this; } @@ -1441,6 +1445,83 @@ public ChannelUriStringBuilder initialPosition(final long position, final int in return this; } + /** + * Set the underlying OS send buffer length. + * + * @param socketSndbufLength paramter to be passed as SO_SNDBUF value + * @return this for a fluent API. + * @see CommonContext#SOCKET_SNDBUF_PARAM_NAME + */ + public ChannelUriStringBuilder socketSndbufLength(final Integer socketSndbufLength) + { + this.socketSndbufLength = socketSndbufLength; + return this; + } + + /** + * Set the underlying OS send buffer length from an existing {@link ChannelUri} which may be (null). + * + * @param channelUri to read the value from. + * @return this for a fluent API. + * @see CommonContext#SOCKET_SNDBUF_PARAM_NAME + */ + public ChannelUriStringBuilder socketSndbufLength(final ChannelUri channelUri) + { + final String socketSndbufLengthString = channelUri.get(SOCKET_SNDBUF_PARAM_NAME); + this.socketSndbufLength = null == socketSndbufLengthString ? null : Integer.valueOf(socketSndbufLengthString); + return this; + } + + /** + * Get the underling OS send buffer length setting + * + * @return underlying OS send buffer length setting or null if not specified. + * @see CommonContext#SOCKET_SNDBUF_PARAM_NAME + */ + public Integer socketSndbufLength() + { + return socketSndbufLength; + } + + + /** + * Set the underlying OS receive buffer length. + * + * @param socketRcvbufLength paramter to be passed as SO_SNDBUF value + * @return this for a fluent API. + * @see CommonContext#SOCKET_RCVBUF_PARAM_NAME + */ + public ChannelUriStringBuilder socketRcvbufLength(final Integer socketRcvbufLength) + { + this.socketRcvbufLength = socketRcvbufLength; + return this; + } + + /** + * Set the underlying OS receive buffer length from an existing {@link ChannelUri} which may be (null). + * + * @param channelUri to read the value from. + * @return this for a fluent API. + * @see CommonContext#SOCKET_RCVBUF_PARAM_NAME + */ + public ChannelUriStringBuilder socketRcvbufLength(final ChannelUri channelUri) + { + final String socketRcvbufLengthString = channelUri.get(SOCKET_RCVBUF_PARAM_NAME); + this.socketRcvbufLength = null == socketRcvbufLengthString ? null : Integer.valueOf(socketRcvbufLengthString); + return this; + } + + /** + * Get the underling OS receive buffer length setting + * + * @return underlying OS receive buffer length setting or null if not specified. + * @see CommonContext#SOCKET_RCVBUF_PARAM_NAME + */ + public Integer socketRcvbufLength() + { + return socketRcvbufLength; + } + /** * Build a channel URI String for the given parameters. * @@ -1578,6 +1659,16 @@ public String build() sb.append(SPIES_SIMULATE_CONNECTION_PARAM_NAME).append('=').append(ssc).append('|'); } + if (null != socketSndbufLength) + { + sb.append(SOCKET_SNDBUF_PARAM_NAME).append('=').append(socketSndbufLength).append('|'); + } + + if (null != socketRcvbufLength) + { + sb.append(SOCKET_RCVBUF_PARAM_NAME).append('=').append(socketRcvbufLength).append('|'); + } + final char lastChar = sb.charAt(sb.length() - 1); if (lastChar == '|' || lastChar == '?') { diff --git a/aeron-client/src/test/java/io/aeron/ChannelUriStringBuilderTest.java b/aeron-client/src/test/java/io/aeron/ChannelUriStringBuilderTest.java index 8646fc8524..414f6b753c 100644 --- a/aeron-client/src/test/java/io/aeron/ChannelUriStringBuilderTest.java +++ b/aeron-client/src/test/java/io/aeron/ChannelUriStringBuilderTest.java @@ -100,4 +100,18 @@ public void shouldGenerateReplayUdpChannel() "aeron:udp?endpoint=address:9999|term-length=131072|init-term-id=777|term-id=999|term-offset=64", builder.build()); } + + @Test + public void shouldGenerateChannelWithSocketParameters() + { + final ChannelUriStringBuilder builder = new ChannelUriStringBuilder() + .media("udp") + .endpoint("address:9999") + .socketSndbufLength(8192) + .socketRcvbufLength(4096); + + assertEquals( + "aeron:udp?endpoint=address:9999|socket-sndbuf=8192|socket-rcvbuf=4096", + builder.build()); + } } \ No newline at end of file diff --git a/aeron-driver/src/main/java/io/aeron/driver/media/UdpChannel.java b/aeron-driver/src/main/java/io/aeron/driver/media/UdpChannel.java index bc166e2f5a..7cf9b00315 100644 --- a/aeron-driver/src/main/java/io/aeron/driver/media/UdpChannel.java +++ b/aeron-driver/src/main/java/io/aeron/driver/media/UdpChannel.java @@ -138,8 +138,10 @@ public static UdpChannel parse( final String controlMode = channelUri.get(CommonContext.MDC_CONTROL_MODE_PARAM_NAME); final boolean isManualControlMode = CommonContext.MDC_CONTROL_MODE_MANUAL.equals(controlMode); final boolean isDynamicControlMode = CommonContext.MDC_CONTROL_MODE_DYNAMIC.equals(controlMode); - final int socketRcvbufLength = Integer.parseInt(channelUri.get(CommonContext.SOCKET_RCVBUF_PARAM_NAME, "0")); - final int socketSndbufLength = Integer.parseInt(channelUri.get(CommonContext.SOCKET_SNDBUF_PARAM_NAME, "0")); + final int socketRcvbufLength = Integer.parseInt( + channelUri.get(CommonContext.SOCKET_RCVBUF_PARAM_NAME, "0")); + final int socketSndbufLength = Integer.parseInt( + channelUri.get(CommonContext.SOCKET_SNDBUF_PARAM_NAME, "0")); final boolean requiresAdditionalSuffix = !isDestination && (null == endpointAddress && null == explicitControlAddress || diff --git a/aeron-driver/src/test/java/io/aeron/driver/UdpChannelTest.java b/aeron-driver/src/test/java/io/aeron/driver/UdpChannelTest.java index 46bc969ca3..c244d32b0f 100644 --- a/aeron-driver/src/test/java/io/aeron/driver/UdpChannelTest.java +++ b/aeron-driver/src/test/java/io/aeron/driver/UdpChannelTest.java @@ -15,7 +15,6 @@ */ package io.aeron.driver; -import io.aeron.Aeron; import io.aeron.ChannelUriStringBuilder; import io.aeron.driver.exceptions.InvalidChannelException; import io.aeron.driver.media.UdpChannel; @@ -468,8 +467,8 @@ void shouldParseSocketRcvAndSndBufSizes() assertEquals(8192, udpChannelWithBufferSizes.socketRcvbufLength()); final UdpChannel udpChannelWithoutBufferSizes = UdpChannel.parse("aeron:udp?endpoint=127.0.0.1:9999"); - assertEquals(Aeron.NULL_VALUE, udpChannelWithoutBufferSizes.socketRcvbufLength()); - assertEquals(Aeron.NULL_VALUE, udpChannelWithoutBufferSizes.socketSndbufLenth()); + assertEquals(0, udpChannelWithoutBufferSizes.socketRcvbufLength()); + assertEquals(0, udpChannelWithoutBufferSizes.socketSndbufLenth()); } @ParameterizedTest From 8ac89d79109169eec25c52606de72d8f45482e71 Mon Sep 17 00:00:00 2001 From: Michael Barker Date: Mon, 8 Mar 2021 11:44:47 +1300 Subject: [PATCH 4/7] [C] Add channel parameters for socket snd/rcv buf. --- aeron-client/src/main/c/uri/aeron_uri.c | 22 +++++++++++++++++++ aeron-client/src/main/c/uri/aeron_uri.h | 3 +++ .../main/c/media/aeron_receive_destination.c | 7 ++++-- .../c/media/aeron_send_channel_endpoint.c | 7 ++++-- .../src/main/c/media/aeron_udp_channel.c | 6 +++++ .../src/main/c/media/aeron_udp_channel.h | 2 ++ .../src/test/c/aeron_udp_channel_test.cpp | 9 ++++++++ 7 files changed, 52 insertions(+), 4 deletions(-) diff --git a/aeron-client/src/main/c/uri/aeron_uri.c b/aeron-client/src/main/c/uri/aeron_uri.c index ddee45b41e..f3ab442066 100755 --- a/aeron-client/src/main/c/uri/aeron_uri.c +++ b/aeron-client/src/main/c/uri/aeron_uri.c @@ -437,6 +437,28 @@ int aeron_uri_get_ats(aeron_uri_params_t *uri_params, aeron_uri_ats_status_t *ur return 1; } +int aeron_uri_get_socket_bufs(aeron_uri_params_t *uri_params, size_t *socket_sndbuf, size_t *socket_rcvbuf) +{ + int64_t socket_sndbuf_tmp = 0; + if (aeron_uri_get_int64(uri_params, AERON_URI_SOCKET_SNDBUF_KEY, &socket_sndbuf_tmp) < 0) + { + AERON_SET_ERR(EINVAL, "%s", "Failed to parse socket-sndbuf"); + return -1; + } + + int64_t socket_rcvbuf_tmp = 0; + if (aeron_uri_get_int64(uri_params, AERON_URI_SOCKET_RCVBUF_KEY, &socket_rcvbuf_tmp) < 0) + { + AERON_SET_ERR(EINVAL, "%s", "Failed to parse socket-rcvbuf"); + return -1; + } + + *socket_sndbuf = (size_t)socket_sndbuf_tmp; + *socket_rcvbuf = (size_t)socket_rcvbuf_tmp; + + return 0; +} + int64_t aeron_uri_parse_tag(const char *tag_str) { errno = 0; diff --git a/aeron-client/src/main/c/uri/aeron_uri.h b/aeron-client/src/main/c/uri/aeron_uri.h index 370bf92140..abe436d280 100644 --- a/aeron-client/src/main/c/uri/aeron_uri.h +++ b/aeron-client/src/main/c/uri/aeron_uri.h @@ -62,6 +62,8 @@ aeron_uri_params_t; #define AERON_URI_CC_KEY "cc" #define AERON_URI_SPIES_SIMULATE_CONNECTION_KEY "ssc" #define AERON_URI_ATS_KEY "ats" +#define AERON_URI_SOCKET_SNDBUF_KEY "socket-sndbuf" +#define AERON_URI_SOCKET_RCVBUF_KEY "socket-rcvbuf" #define AERON_URI_INVALID_TAG (-1) @@ -133,6 +135,7 @@ int aeron_uri_get_int64(aeron_uri_params_t *uri_params, const char *key, int64_t int aeron_uri_get_bool(aeron_uri_params_t *uri_params, const char *key, bool *retval); int aeron_uri_get_ats(aeron_uri_params_t *uri_params, aeron_uri_ats_status_t *uri_ats_status); int aeron_uri_sprint(aeron_uri_t *uri, char *buffer, size_t buffer_len); +int aeron_uri_get_socket_bufs(aeron_uri_params_t *uri_params, size_t *socket_sndbuf, size_t *socket_rcvbuf); int64_t aeron_uri_parse_tag(const char *tag_str); diff --git a/aeron-driver/src/main/c/media/aeron_receive_destination.c b/aeron-driver/src/main/c/media/aeron_receive_destination.c index 4f2a41b466..6efe0ff6e3 100644 --- a/aeron-driver/src/main/c/media/aeron_receive_destination.c +++ b/aeron-driver/src/main/c/media/aeron_receive_destination.c @@ -41,14 +41,17 @@ int aeron_receive_destination_create( _destination->transport.data_paths = _destination->data_paths; _destination->local_sockaddr_indicator.counter_id = AERON_NULL_COUNTER_ID; + const size_t socket_sndbuf = 0 != channel->socket_sndbuf ? channel->socket_sndbuf : context->socket_sndbuf; + const size_t socket_rcvbuf = 0 != channel->socket_rcvbuf ? channel->socket_rcvbuf : context->socket_rcvbuf; + if (context->udp_channel_transport_bindings->init_func( &_destination->transport, &channel->remote_data, &channel->local_data, channel->interface_index, 0 != channel->multicast_ttl ? channel->multicast_ttl : context->multicast_ttl, - context->socket_rcvbuf, - context->socket_sndbuf, + socket_rcvbuf, + socket_sndbuf, context, AERON_UDP_CHANNEL_TRANSPORT_AFFINITY_RECEIVER) < 0) { diff --git a/aeron-driver/src/main/c/media/aeron_send_channel_endpoint.c b/aeron-driver/src/main/c/media/aeron_send_channel_endpoint.c index e43fa04055..9809caa123 100644 --- a/aeron-driver/src/main/c/media/aeron_send_channel_endpoint.c +++ b/aeron-driver/src/main/c/media/aeron_send_channel_endpoint.c @@ -82,14 +82,17 @@ int aeron_send_channel_endpoint_create( _endpoint->data_paths = &context->sender_proxy->sender->data_paths; _endpoint->transport.data_paths = _endpoint->data_paths; + const size_t socket_sndbuf = 0 != channel->socket_sndbuf ? channel->socket_sndbuf : context->socket_sndbuf; + const size_t socket_rcvbuf = 0 != channel->socket_rcvbuf ? channel->socket_rcvbuf : context->socket_rcvbuf; + if (context->udp_channel_transport_bindings->init_func( &_endpoint->transport, channel->is_multicast ? &channel->remote_control : &channel->local_control, channel->is_multicast ? &channel->local_control : &channel->remote_control, channel->interface_index, 0 != channel->multicast_ttl ? channel->multicast_ttl : context->multicast_ttl, - context->socket_rcvbuf, - context->socket_sndbuf, + socket_rcvbuf, + socket_sndbuf, context, AERON_UDP_CHANNEL_TRANSPORT_AFFINITY_SENDER) < 0) { diff --git a/aeron-driver/src/main/c/media/aeron_udp_channel.c b/aeron-driver/src/main/c/media/aeron_udp_channel.c index a2779e563a..589679c317 100644 --- a/aeron-driver/src/main/c/media/aeron_udp_channel.c +++ b/aeron-driver/src/main/c/media/aeron_udp_channel.c @@ -290,6 +290,12 @@ int aeron_udp_channel_parse( goto error_cleanup; } + if (aeron_uri_get_socket_bufs( + &_channel->uri.params.udp.additional_params, &_channel->socket_sndbuf, &_channel->socket_rcvbuf) < 0) + { + goto error_cleanup; + } + if (aeron_is_addr_multicast(&endpoint_addr)) { memcpy(&_channel->remote_data, &endpoint_addr, AERON_ADDR_LEN(&endpoint_addr)); diff --git a/aeron-driver/src/main/c/media/aeron_udp_channel.h b/aeron-driver/src/main/c/media/aeron_udp_channel.h index 16a752a919..b7163c7613 100644 --- a/aeron-driver/src/main/c/media/aeron_udp_channel.h +++ b/aeron-driver/src/main/c/media/aeron_udp_channel.h @@ -42,6 +42,8 @@ typedef struct aeron_udp_channel_stct bool is_dynamic_control_mode; bool is_multicast; aeron_uri_ats_status_t ats_status; + size_t socket_sndbuf; + size_t socket_rcvbuf; } aeron_udp_channel_t; diff --git a/aeron-driver/src/test/c/aeron_udp_channel_test.cpp b/aeron-driver/src/test/c/aeron_udp_channel_test.cpp index c886eefb1b..0b7adf2c1b 100644 --- a/aeron-driver/src/test/c/aeron_udp_channel_test.cpp +++ b/aeron-driver/src/test/c/aeron_udp_channel_test.cpp @@ -413,6 +413,15 @@ TEST_F(UdpChannelTest, shouldHandleTooSmallBuffer) ASSERT_LE(aeron_format_source_identity(buffer, AERON_NETUTIL_FORMATTED_MAX_LENGTH - 1, &addr), 0); } +TEST_F(UdpChannelTest, shouldParseSocketBufferParameters) +{ + const char *uri = "aeron:udp?interface=localhost|endpoint=224.10.9.9:40124|socket-sndbuf=8192|socket-rcvbuf=4096"; + ASSERT_EQ(parse_udp_channel(uri), 0) << aeron_errmsg(); + + ASSERT_EQ(8192u, m_channel->socket_sndbuf); + ASSERT_EQ(4096u, m_channel->socket_rcvbuf); +} + TEST_P(UdpChannelNamesParameterisedTest, shouldBeValid) { const char *endpoint_name = std::get<0>(GetParam()); From daaced16b3b1e8826ccfbbe279136444c5de08f6 Mon Sep 17 00:00:00 2001 From: Michael Barker Date: Mon, 8 Mar 2021 12:04:23 +1300 Subject: [PATCH 5/7] [C++] Add channel parameters for socket snd/rcv buf to ChannelUriStringBuilder (currently only in the wrapper). --- .../src/main/cpp_wrapper/ChannelUri.h | 2 + .../cpp_wrapper/ChannelUriStringBuilder.h | 37 ++++++ .../src/test/cpp_wrapper/CMakeLists.txt | 1 + .../ChannelUriStringBuilderTest.cpp | 115 ++++++++++++++++++ 4 files changed, 155 insertions(+) create mode 100644 aeron-client/src/test/cpp_wrapper/ChannelUriStringBuilderTest.cpp diff --git a/aeron-client/src/main/cpp_wrapper/ChannelUri.h b/aeron-client/src/main/cpp_wrapper/ChannelUri.h index 8afea32a9c..f40ad75ab7 100644 --- a/aeron-client/src/main/cpp_wrapper/ChannelUri.h +++ b/aeron-client/src/main/cpp_wrapper/ChannelUri.h @@ -60,6 +60,8 @@ constexpr const char CONGESTION_CONTROL_PARAM_NAME[] = "cc"; constexpr const char FLOW_CONTROL_PARAM_NAME[] = "fc"; constexpr const char GROUP_TAG_PARAM_NAME[] = "gtag"; constexpr const char SPIES_SIMULATE_CONNECTION_PARAM_NAME[] = "ssc"; +constexpr const char SOCKET_SNDBUF_PARAM_NAME[] = "socket-sndbuf"; +constexpr const char SOCKET_RCVBUF_PARAM_NAME[] = "socket-rcvbuf"; using namespace aeron::util; diff --git a/aeron-client/src/main/cpp_wrapper/ChannelUriStringBuilder.h b/aeron-client/src/main/cpp_wrapper/ChannelUriStringBuilder.h index d55a860d63..b583ea8edf 100644 --- a/aeron-client/src/main/cpp_wrapper/ChannelUriStringBuilder.h +++ b/aeron-client/src/main/cpp_wrapper/ChannelUriStringBuilder.h @@ -62,6 +62,7 @@ class ChannelUriStringBuilder m_group.reset(nullptr); m_rejoin.reset(nullptr); m_ssc.reset(nullptr); + m_isSessionIdTagged = false; return *this; } @@ -295,6 +296,30 @@ class ChannelUriStringBuilder return *this; } + inline this_t &socketSndbufLength(std::uint32_t socketSndbufLength) + { + m_socketSndbufLength.reset(new Value(socketSndbufLength)); + return *this; + } + + inline this_t &socketSndbufLength(std::nullptr_t socketSndbufLength) + { + m_socketSndbufLength.reset(nullptr); + return *this; + } + + inline this_t &socketRcvbufLength(std::uint32_t socketRcvbufLength) + { + m_socketRcvbufLength.reset(new Value(socketRcvbufLength)); + return *this; + } + + inline this_t &socketRcvbufLength(std::nullptr_t) + { + m_socketRcvbufLength.reset(nullptr); + return *this; + } + inline this_t &initialPosition(std::int64_t position, std::int32_t initialTermId, std::int32_t termLength) { if (position < 0 || 0 != (position & (aeron::concurrent::logbuffer::FrameDescriptor::FRAME_ALIGNMENT - 1))) @@ -445,6 +470,16 @@ class ChannelUriStringBuilder sb << SPIES_SIMULATE_CONNECTION_PARAM_NAME << '=' << (m_ssc->value == 1 ? "true" : "false") << '|'; } + if (m_socketSndbufLength) + { + sb << SOCKET_SNDBUF_PARAM_NAME << '=' << m_socketSndbufLength->value << '|'; + } + + if (m_socketRcvbufLength) + { + sb << SOCKET_RCVBUF_PARAM_NAME << '=' << m_socketRcvbufLength->value << '|'; + } + std::string result = sb.str(); const char lastChar = result.back(); @@ -493,6 +528,8 @@ class ChannelUriStringBuilder std::unique_ptr m_group; std::unique_ptr m_rejoin; std::unique_ptr m_ssc; + std::unique_ptr m_socketSndbufLength; + std::unique_ptr m_socketRcvbufLength; bool m_isSessionIdTagged = false; inline static std::string prefixTag(bool isTagged, Value &value) diff --git a/aeron-client/src/test/cpp_wrapper/CMakeLists.txt b/aeron-client/src/test/cpp_wrapper/CMakeLists.txt index 5fc59adb84..1e914ccf79 100644 --- a/aeron-client/src/test/cpp_wrapper/CMakeLists.txt +++ b/aeron-client/src/test/cpp_wrapper/CMakeLists.txt @@ -61,3 +61,4 @@ aeron_client_wrapper_test(livenessTimeoutTest LivenessTimeoutTest.cpp) aeron_client_native_test(livenessTimeoutTest LivenessTimeoutTest.cpp) aeron_client_wrapper_test(exceptionsTest ExceptionsTest.cpp) +aeron_client_wrapper_test(channelUriStringBuilderTest ChannelUriStringBuilderTest.cpp) diff --git a/aeron-client/src/test/cpp_wrapper/ChannelUriStringBuilderTest.cpp b/aeron-client/src/test/cpp_wrapper/ChannelUriStringBuilderTest.cpp new file mode 100644 index 0000000000..f1a7814d98 --- /dev/null +++ b/aeron-client/src/test/cpp_wrapper/ChannelUriStringBuilderTest.cpp @@ -0,0 +1,115 @@ +/* + * Copyright 2014-2021 Real Logic Limited. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "gtest/gtest.h" + +#include "ChannelUriStringBuilder.h" + +using namespace aeron; + +TEST(ChannelUriStringBuilderTest, shouldGenerateBasicIpcChannel) +{ + ChannelUriStringBuilder builder; + + builder.media(IPC_MEDIA); + + ASSERT_EQ(builder.build(), "aeron:ipc"); +} + +TEST(ChannelUriStringBuilderTest, shouldGenerateBasicUdpChannel) +{ + ChannelUriStringBuilder builder; + + builder + .media(UDP_MEDIA) + .endpoint("localhost:9999"); + + ASSERT_EQ(builder.build(), "aeron:udp?endpoint=localhost:9999"); +} + +TEST(ChannelUriStringBuilderTest, shouldGenerateBasicUdpChannelSpy) +{ + ChannelUriStringBuilder builder; + + builder + .prefix(SPY_QUALIFIER) + .media(UDP_MEDIA) + .endpoint("localhost:9999"); + + ASSERT_EQ(builder.build(), "aeron-spy:aeron:udp?endpoint=localhost:9999"); +} + +TEST(ChannelUriStringBuilderTest, shouldGenerateComplexUdpChannel) +{ + ChannelUriStringBuilder builder; + + builder + .media(UDP_MEDIA) + .endpoint("localhost:9999") + .ttl(9) + .termLength(1024 * 128); + + ASSERT_EQ(builder.build(), "aeron:udp?endpoint=localhost:9999|term-length=131072|ttl=9"); +} + +TEST(ChannelUriStringBuilderTest, shouldGenerateReplayUdpChannel) +{ + ChannelUriStringBuilder builder; + + builder + .media(UDP_MEDIA) + .endpoint("localhost:9999") + .termLength(1024 * 128) + .initialTermId(777) + .termId(999) + .termOffset(64); + + ASSERT_EQ( + builder.build(), + "aeron:udp?endpoint=localhost:9999|term-length=131072|init-term-id=777|term-id=999|term-offset=64"); +} + +TEST(ChannelUriStringBuilderTest, shouldGenerateInitialPosition) +{ + ChannelUriStringBuilder builder; + + std::uint32_t termLength = 1024 * 128; + std::int64_t position = (termLength * 3) + 64; + + builder + .media(UDP_MEDIA) + .endpoint("localhost:9999") + .initialPosition(position, 777, termLength); + + ASSERT_EQ( + builder.build(), + "aeron:udp?endpoint=localhost:9999|term-length=131072|init-term-id=777|term-id=780|term-offset=64"); +} + +TEST(ChannelUriStringBuilderTest, shouldGenerateSocketSndRcvbufLengths) +{ + ChannelUriStringBuilder builder; + + builder + .media(UDP_MEDIA) + .endpoint("localhost:9999") + .socketSndbufLength(8192) + .socketRcvbufLength(4096); + + ASSERT_EQ( + builder.build(), + "aeron:udp?endpoint=localhost:9999|socket-sndbuf=8192|socket-rcvbuf=4096"); +} \ No newline at end of file From 4826b8c251918d0ef86dc68f4397016c4b78e601 Mon Sep 17 00:00:00 2001 From: Michael Barker Date: Mon, 8 Mar 2021 12:23:23 +1300 Subject: [PATCH 6/7] [C++] New line --- .../src/test/cpp_wrapper/ChannelUriStringBuilderTest.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/aeron-client/src/test/cpp_wrapper/ChannelUriStringBuilderTest.cpp b/aeron-client/src/test/cpp_wrapper/ChannelUriStringBuilderTest.cpp index f1a7814d98..c2ec305e35 100644 --- a/aeron-client/src/test/cpp_wrapper/ChannelUriStringBuilderTest.cpp +++ b/aeron-client/src/test/cpp_wrapper/ChannelUriStringBuilderTest.cpp @@ -112,4 +112,4 @@ TEST(ChannelUriStringBuilderTest, shouldGenerateSocketSndRcvbufLengths) ASSERT_EQ( builder.build(), "aeron:udp?endpoint=localhost:9999|socket-sndbuf=8192|socket-rcvbuf=4096"); -} \ No newline at end of file +} From 269b37dde669439ca616bf1c6b87b08d7c18936a Mon Sep 17 00:00:00 2001 From: Michael Barker Date: Mon, 8 Mar 2021 12:25:14 +1300 Subject: [PATCH 7/7] [Java] New line. --- .../src/test/java/io/aeron/ChannelUriStringBuilderTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/aeron-client/src/test/java/io/aeron/ChannelUriStringBuilderTest.java b/aeron-client/src/test/java/io/aeron/ChannelUriStringBuilderTest.java index 414f6b753c..69aae9090f 100644 --- a/aeron-client/src/test/java/io/aeron/ChannelUriStringBuilderTest.java +++ b/aeron-client/src/test/java/io/aeron/ChannelUriStringBuilderTest.java @@ -114,4 +114,4 @@ public void shouldGenerateChannelWithSocketParameters() "aeron:udp?endpoint=address:9999|socket-sndbuf=8192|socket-rcvbuf=4096", builder.build()); } -} \ No newline at end of file +}