Skip to content

Commit

Permalink
UdpClient with span support (#53429)
Browse files Browse the repository at this point in the history
Add API from #864
  • Loading branch information
lateapexearlyspeed authored Jun 24, 2021
1 parent 1773f16 commit b317d06
Show file tree
Hide file tree
Showing 6 changed files with 339 additions and 70 deletions.
7 changes: 7 additions & 0 deletions src/libraries/System.Net.Sockets/ref/System.Net.Sockets.cs
Original file line number Diff line number Diff line change
Expand Up @@ -760,12 +760,19 @@ public void JoinMulticastGroup(System.Net.IPAddress multicastAddr, int timeToLiv
public void JoinMulticastGroup(System.Net.IPAddress multicastAddr, System.Net.IPAddress localAddress) { }
public byte[] Receive([System.Diagnostics.CodeAnalysis.NotNullAttribute] ref System.Net.IPEndPoint? remoteEP) { throw null; }
public System.Threading.Tasks.Task<System.Net.Sockets.UdpReceiveResult> ReceiveAsync() { throw null; }
public System.Threading.Tasks.ValueTask<System.Net.Sockets.UdpReceiveResult> ReceiveAsync(System.Threading.CancellationToken cancellationToken) { throw null; }
public int Send(byte[] dgram, int bytes) { throw null; }
public int Send(System.ReadOnlySpan<byte> datagram) {throw null; }
public int Send(byte[] dgram, int bytes, System.Net.IPEndPoint? endPoint) { throw null; }
public int Send(System.ReadOnlySpan<byte> datagram, System.Net.IPEndPoint? endPoint) { throw null; }
public int Send(byte[] dgram, int bytes, string? hostname, int port) { throw null; }
public int Send(System.ReadOnlySpan<byte> datagram, string? hostname, int port) { throw null; }
public System.Threading.Tasks.Task<int> SendAsync(byte[] datagram, int bytes) { throw null; }
public System.Threading.Tasks.ValueTask<int> SendAsync(System.ReadOnlyMemory<byte> datagram, System.Threading.CancellationToken cancellationToken = default) { throw null; }
public System.Threading.Tasks.Task<int> SendAsync(byte[] datagram, int bytes, System.Net.IPEndPoint? endPoint) { throw null; }
public System.Threading.Tasks.ValueTask<int> SendAsync(System.ReadOnlyMemory<byte> datagram, System.Net.IPEndPoint? endPoint, System.Threading.CancellationToken cancellationToken = default) { throw null; }
public System.Threading.Tasks.Task<int> SendAsync(byte[] datagram, int bytes, string? hostname, int port) { throw null; }
public System.Threading.Tasks.ValueTask<int> SendAsync(System.ReadOnlyMemory<byte> datagram, string? hostname, int port, System.Threading.CancellationToken cancellationToken = default) { throw null; }
}
public partial struct UdpReceiveResult : System.IEquatable<System.Net.Sockets.UdpReceiveResult>
{
Expand Down
188 changes: 163 additions & 25 deletions src/libraries/System.Net.Sockets/src/System/Net/Sockets/UDPClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
using System.Diagnostics.CodeAnalysis;
using System.Threading.Tasks;
using System.Runtime.Versioning;
using System.Threading;

namespace System.Net.Sockets
{
Expand Down Expand Up @@ -600,9 +601,46 @@ public void DropMulticastGroup(IPAddress multicastAddr, int ifindex)
public Task<int> SendAsync(byte[] datagram, int bytes) =>
SendAsync(datagram, bytes, null);

/// <summary>
/// Sends a UDP datagram asynchronously to a remote host.
/// </summary>
/// <param name="datagram">
/// An <see cref="ReadOnlyMemory{T}"/> of Type <see cref="byte"/> that specifies the UDP datagram that you intend to send.
/// </param>
/// <param name="cancellationToken">
/// The token to monitor for cancellation requests. The default value is None.
/// </param>
/// <returns>A <see cref="ValueTask{T}"/> that represents the asynchronous send operation. The value of its Result property contains the number of bytes sent.</returns>
/// <exception cref="ObjectDisposedException">The <see cref="UdpClient"/> is closed.</exception>
/// <exception cref="SocketException">An error occurred when accessing the socket.</exception>
public ValueTask<int> SendAsync(ReadOnlyMemory<byte> datagram, CancellationToken cancellationToken = default) =>
SendAsync(datagram, null, cancellationToken);

public Task<int> SendAsync(byte[] datagram, int bytes, string? hostname, int port) =>
SendAsync(datagram, bytes, GetEndpoint(hostname, port));

/// <summary>
/// Sends a UDP datagram asynchronously to a remote host.
/// </summary>
/// <param name="datagram">
/// An <see cref="ReadOnlyMemory{T}"/> of Type <see cref="byte"/> that specifies the UDP datagram that you intend to send.
/// </param>
/// <param name="hostname">
/// The name of the remote host to which you intend to send the datagram.
/// </param>
/// <param name="port">
/// The remote port number with which you intend to communicate.
/// </param>
/// <param name="cancellationToken">
/// The token to monitor for cancellation requests. The default value is None.
/// </param>
/// <returns>A <see cref="ValueTask{T}"/> that represents the asynchronous send operation. The value of its Result property contains the number of bytes sent.</returns>
/// <exception cref="InvalidOperationException">The <see cref="UdpClient"/> has already established a default remote host.</exception>
/// <exception cref="ObjectDisposedException">The <see cref="UdpClient"/> is closed.</exception>
/// <exception cref="SocketException">An error occurred when accessing the socket.</exception>
public ValueTask<int> SendAsync(ReadOnlyMemory<byte> datagram, string? hostname, int port, CancellationToken cancellationToken = default) =>
SendAsync(datagram, GetEndpoint(hostname, port), cancellationToken);

public Task<int> SendAsync(byte[] datagram, int bytes, IPEndPoint? endPoint)
{
ValidateDatagram(datagram, bytes, endPoint);
Expand All @@ -618,6 +656,39 @@ public Task<int> SendAsync(byte[] datagram, int bytes, IPEndPoint? endPoint)
}
}

/// <summary>
/// Sends a UDP datagram asynchronously to a remote host.
/// </summary>
/// <param name="datagram">
/// An <see cref="ReadOnlyMemory{T}"/> of Type <see cref="byte"/> that specifies the UDP datagram that you intend to send.
/// </param>
/// <param name="endPoint">
/// An <see cref="IPEndPoint"/> that represents the host and port to which to send the datagram.
/// </param>
/// <param name="cancellationToken">
/// The token to monitor for cancellation requests. The default value is None.
/// </param>
/// <returns>A <see cref="ValueTask{T}"/> that represents the asynchronous send operation. The value of its Result property contains the number of bytes sent.</returns>
/// <exception cref="InvalidOperationException"><see cref="UdpClient"/> has already established a default remote host and <paramref name="endPoint"/> is not <see langword="null"/>.</exception>
/// <exception cref="ObjectDisposedException">The <see cref="UdpClient"/> is closed.</exception>
/// <exception cref="SocketException">An error occurred when accessing the socket.</exception>
public ValueTask<int> SendAsync(ReadOnlyMemory<byte> datagram, IPEndPoint? endPoint, CancellationToken cancellationToken = default)
{
ThrowIfDisposed();

if (endPoint is null)
{
return _clientSocket.SendAsync(datagram, SocketFlags.None, cancellationToken);
}
if (_active)
{
// Do not allow sending packets to arbitrary host when connected.
throw new InvalidOperationException(SR.net_udpconnected);
}
CheckForBroadcast(endPoint.Address);
return _clientSocket.SendToAsync(datagram, SocketFlags.None, endPoint, cancellationToken);
}

public Task<UdpReceiveResult> ReceiveAsync()
{
ThrowIfDisposed();
Expand All @@ -639,6 +710,36 @@ async Task<UdpReceiveResult> WaitAndWrap(Task<SocketReceiveFromResult> task)
}
}

/// <summary>
/// Returns a UDP datagram asynchronously that was sent by a remote host.
/// </summary>
/// <param name="cancellationToken">
/// The token to monitor for cancellation requests.
/// </param>
/// <returns>A <see cref="ValueTask{TResult}"/> representing the asynchronous operation.</returns>
/// <exception cref="ObjectDisposedException">The underlying <see cref="Socket"/> has been closed.</exception>
/// <exception cref="SocketException">An error occurred when accessing the socket.</exception>
public ValueTask<UdpReceiveResult> ReceiveAsync(CancellationToken cancellationToken)
{
ThrowIfDisposed();

return WaitAndWrap(_clientSocket.ReceiveFromAsync(
_buffer,
SocketFlags.None,
_family == AddressFamily.InterNetwork ? IPEndPointStatics.Any : IPEndPointStatics.IPv6Any, cancellationToken));

async ValueTask<UdpReceiveResult> WaitAndWrap(ValueTask<SocketReceiveFromResult> task)
{
SocketReceiveFromResult result = await task.ConfigureAwait(false);

byte[] buffer = result.ReceivedBytes < MaxUDPSize ?
_buffer.AsSpan(0, result.ReceivedBytes).ToArray() :
_buffer;

return new UdpReceiveResult(buffer, (IPEndPoint)result.RemoteEndPoint);
}
}

private void CreateClientSocket()
{
// Common initialization code.
Expand Down Expand Up @@ -892,45 +993,59 @@ public int Send(byte[] dgram, int bytes, IPEndPoint? endPoint)
return Client.SendTo(dgram, 0, bytes, SocketFlags.None, endPoint);
}


// Sends a UDP datagram to the specified port on the specified remote host.
public int Send(byte[] dgram, int bytes, string? hostname, int port)
/// <summary>
/// Sends a UDP datagram to the host at the specified remote endpoint.
/// </summary>
/// <param name="datagram">
/// An <see cref="ReadOnlySpan{T}"/> of Type <see cref="byte"/> that specifies the UDP datagram that you intend to send.
/// </param>
/// <param name="endPoint">
/// An <see cref="IPEndPoint"/> that represents the host and port to which to send the datagram.
/// </param>
/// <returns>The number of bytes sent.</returns>
/// <exception cref="InvalidOperationException"><see cref="UdpClient"/> has already established a default remote host and <paramref name="endPoint"/> is not <see langword="null"/>.</exception>
/// <exception cref="ObjectDisposedException"><see cref="UdpClient"/> is closed.</exception>
/// <exception cref="SocketException">An error occurred when accessing the socket.</exception>
public int Send(ReadOnlySpan<byte> datagram, IPEndPoint? endPoint)
{
ThrowIfDisposed();

if (dgram == null)
{
throw new ArgumentNullException(nameof(dgram));
}
if (_active && ((hostname != null) || (port != 0)))
if (_active && endPoint != null)
{
// Do not allow sending packets to arbitrary host when connected
throw new InvalidOperationException(SR.net_udpconnected);
}

if (hostname == null || port == 0)
{
return Client.Send(dgram, 0, bytes, SocketFlags.None);
}

IPAddress[] addresses = Dns.GetHostAddresses(hostname);

int i = 0;
for (; i < addresses.Length && !IsAddressFamilyCompatible(addresses[i].AddressFamily); i++)
if (endPoint == null)
{
; // just count the addresses
return Client.Send(datagram, SocketFlags.None);
}

if (addresses.Length == 0 || i == addresses.Length)
{
throw new ArgumentException(SR.net_invalidAddressList, nameof(hostname));
}
CheckForBroadcast(endPoint.Address);

CheckForBroadcast(addresses[i]);
IPEndPoint ipEndPoint = new IPEndPoint(addresses[i], port);
return Client.SendTo(dgram, 0, bytes, SocketFlags.None, ipEndPoint);
return Client.SendTo(datagram, SocketFlags.None, endPoint);
}

// Sends a UDP datagram to the specified port on the specified remote host.
public int Send(byte[] dgram, int bytes, string? hostname, int port) => Send(dgram, bytes, GetEndpoint(hostname, port));

/// <summary>
/// Sends a UDP datagram to a specified port on a specified remote host.
/// </summary>
/// <param name="datagram">
/// An <see cref="ReadOnlySpan{T}"/> of Type <see cref="byte"/> that specifies the UDP datagram that you intend to send.
/// </param>
/// <param name="hostname">
/// The name of the remote host to which you intend to send the datagram.
/// </param>
/// <param name="port">
/// The remote port number with which you intend to communicate.
/// </param>
/// <returns>The number of bytes sent.</returns>
/// <exception cref="InvalidOperationException">The <see cref="UdpClient"/> has already established a default remote host.</exception>
/// <exception cref="ObjectDisposedException">The <see cref="UdpClient"/> is closed.</exception>
/// <exception cref="SocketException">An error occurred when accessing the socket.</exception>
public int Send(ReadOnlySpan<byte> datagram, string? hostname, int port) => Send(datagram, GetEndpoint(hostname, port));

// Sends a UDP datagram to a remote host.
public int Send(byte[] dgram, int bytes)
Expand All @@ -950,6 +1065,29 @@ public int Send(byte[] dgram, int bytes)
return Client.Send(dgram, 0, bytes, SocketFlags.None);
}

/// <summary>
/// Sends a UDP datagram to a remote host.
/// </summary>
/// <param name="datagram">
/// An <see cref="ReadOnlySpan{T}"/> of Type <see cref="byte"/> that specifies the UDP datagram that you intend to send.
/// </param>
/// <returns>The number of bytes sent.</returns>
/// <exception cref="InvalidOperationException">The <see cref="UdpClient"/> has not established a default remote host.</exception>
/// <exception cref="ObjectDisposedException">The <see cref="UdpClient"/> is closed.</exception>
/// <exception cref="SocketException">An error occurred when accessing the socket.</exception>
public int Send(ReadOnlySpan<byte> datagram)
{
ThrowIfDisposed();

if (!_active)
{
// only allowed on connected socket
throw new InvalidOperationException(SR.net_notconnected);
}

return Client.Send(datagram, SocketFlags.None);
}

private void ThrowIfDisposed()
{
if (_disposed)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
namespace System.Net.Sockets
{
/// <summary>
/// Presents UDP receive result information from a call to the <see cref="UdpClient.ReceiveAsync"/> method
/// Presents UDP receive result information from a call to the <see cref="UdpClient.ReceiveAsync()"/> and <see cref="UdpClient.ReceiveAsync(System.Threading.CancellationToken)"/> method
/// </summary>
public struct UdpReceiveResult : IEquatable<UdpReceiveResult>
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,8 @@ public sealed class SendReceiveUdpClient : MemberDatas
{
[OuterLoop]
[Theory]
[MemberData(nameof(Loopbacks))]
public async Task SendToRecvFromAsync_Datagram_UDP_UdpClient(IPAddress loopbackAddress)
[MemberData(nameof(LoopbacksAndUseMemory))]
public async Task SendToRecvFromAsync_Datagram_UDP_UdpClient(IPAddress loopbackAddress, bool useMemoryOverload)
{
IPAddress leftAddress = loopbackAddress, rightAddress = loopbackAddress;

Expand Down Expand Up @@ -66,7 +66,7 @@ public async Task SendToRecvFromAsync_Datagram_UDP_UdpClient(IPAddress loopbackA
random.NextBytes(sendBuffer);
sendBuffer[0] = (byte)sentDatagrams;

int sent = await right.SendAsync(sendBuffer, DatagramSize, leftEndpoint);
int sent = useMemoryOverload ? await right.SendAsync(new ReadOnlyMemory<byte>(sendBuffer), leftEndpoint) : await right.SendAsync(sendBuffer, DatagramSize, leftEndpoint);

Assert.True(receiverAck.Wait(AckTimeout));
receiverAck.Reset();
Expand All @@ -85,5 +85,13 @@ public async Task SendToRecvFromAsync_Datagram_UDP_UdpClient(IPAddress loopbackA
}
}
}

public static readonly object[][] LoopbacksAndUseMemory = new object[][]
{
new object[] { IPAddress.IPv6Loopback, true },
new object[] { IPAddress.IPv6Loopback, false },
new object[] { IPAddress.Loopback, true },
new object[] { IPAddress.Loopback, false },
};
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -325,8 +325,8 @@ await listener.RunWithCallbackAsync(e => events.Enqueue((e, e.ActivityId)), asyn
await new SendReceive_Apm(null).SendRecv_Stream_TCP(IPAddress.Loopback, false).ConfigureAwait(false);
await new SendReceive_Apm(null).SendRecv_Stream_TCP(IPAddress.Loopback, true).ConfigureAwait(false);

await new SendReceiveUdpClient().SendToRecvFromAsync_Datagram_UDP_UdpClient(IPAddress.Loopback).ConfigureAwait(false);
await new SendReceiveUdpClient().SendToRecvFromAsync_Datagram_UDP_UdpClient(IPAddress.Loopback).ConfigureAwait(false);
await new SendReceiveUdpClient().SendToRecvFromAsync_Datagram_UDP_UdpClient(IPAddress.Loopback, false).ConfigureAwait(false);
await new SendReceiveUdpClient().SendToRecvFromAsync_Datagram_UDP_UdpClient(IPAddress.Loopback, false).ConfigureAwait(false);

await new NetworkStreamTest().CopyToAsync_AllDataCopied(4096, true).ConfigureAwait(false);
await new NetworkStreamTest().Timeout_Roundtrips().ConfigureAwait(false);
Expand Down
Loading

0 comments on commit b317d06

Please sign in to comment.