Skip to content

Commit

Permalink
Reduce locking in secure channel client send/receive codepath (#2531)
Browse files Browse the repository at this point in the history
During high load there were a few portions in the send/receive pipeline identified which may cause lock contention and thread starvation due to locks:

- Remove lock for the codepath when a incoming message is processed.
- Add a concurrent dictionary to allow for the processing of OperationCompleted and to add new requests without locking
- Allow to process normal service requests without locking

Due to the change it is possible to send KeepAlive even when a incoming message is received.
  • Loading branch information
mregen authored Feb 28, 2024
1 parent 22096f4 commit 988be41
Show file tree
Hide file tree
Showing 4 changed files with 32 additions and 32 deletions.
19 changes: 8 additions & 11 deletions Stack/Opc.Ua.Core/Stack/Tcp/UaSCBinaryChannel.cs
Original file line number Diff line number Diff line change
Expand Up @@ -339,23 +339,20 @@ public virtual bool ChannelFull
/// <inheritdoc/>
public virtual void OnMessageReceived(IMessageSocket source, ArraySegment<byte> message)
{
lock (DataLock)
try
{
try
{
uint messageType = BitConverter.ToUInt32(message.Array, message.Offset);
uint messageType = BitConverter.ToUInt32(message.Array, message.Offset);

if (!HandleIncomingMessage(messageType, message))
{
BufferManager.ReturnBuffer(message.Array, "OnMessageReceived");
}
}
catch (Exception e)
if (!HandleIncomingMessage(messageType, message))
{
HandleMessageProcessingError(e, StatusCodes.BadTcpInternalError, "An error occurred receiving a message.");
BufferManager.ReturnBuffer(message.Array, "OnMessageReceived");
}
}
catch (Exception e)
{
HandleMessageProcessingError(e, StatusCodes.BadTcpInternalError, "An error occurred receiving a message.");
BufferManager.ReturnBuffer(message.Array, "OnMessageReceived");
}
}

#region Incoming Message Support Functions
Expand Down
37 changes: 19 additions & 18 deletions Stack/Opc.Ua.Core/Stack/Tcp/UaSCBinaryClientChannel.cs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
*/

using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.IO;
using System.Security.Cryptography.X509Certificates;
Expand Down Expand Up @@ -79,7 +80,7 @@ public UaSCUaBinaryClientChannel(
ClientCertificateChain = clientCertificateChain;
}

m_requests = new Dictionary<uint, WriteOperation>();
m_requests = new ConcurrentDictionary<uint, WriteOperation>();
m_lastRequestId = 0;
m_ConnectCallback = new EventHandler<IMessageSocketAsyncEventArgs>(OnConnectComplete);
m_startHandshake = new TimerCallback(OnScheduledHandshake);
Expand Down Expand Up @@ -767,17 +768,17 @@ protected override void HandleWriteComplete(BufferCollection buffers, object sta
/// <returns>True if the function takes ownership of the buffer.</returns>
protected override bool HandleIncomingMessage(uint messageType, ArraySegment<byte> messageChunk)
{
lock (DataLock)
// process a response.
if (TcpMessageType.IsType(messageType, TcpMessageType.Message))
{
// process a response.
if (TcpMessageType.IsType(messageType, TcpMessageType.Message))
{
//Utils.LogTrace("ChannelId {0}: ProcessResponseMessage", ChannelId);
return ProcessResponseMessage(messageType, messageChunk);
}
//Utils.LogTrace("ChannelId {0}: ProcessResponseMessage", ChannelId);
return ProcessResponseMessage(messageType, messageChunk);
}

lock (DataLock)
{
// check for acknowledge.
else if (messageType == TcpMessageType.Acknowledge)
if (messageType == TcpMessageType.Acknowledge)
{
//Utils.LogTrace("ChannelId {0}: ProcessAcknowledgeMessage", ChannelId);
return ProcessAcknowledgeMessage(messageChunk);
Expand Down Expand Up @@ -1234,7 +1235,10 @@ private WriteOperation BeginOperation(int timeout, AsyncCallback callback, objec
{
WriteOperation operation = new WriteOperation(timeout, callback, state);
operation.RequestId = Utils.IncrementIdentifier(ref m_lastRequestId);
m_requests.Add(operation.RequestId, operation);
if (!m_requests.TryAdd(operation.RequestId, operation))
{
throw new ServiceResultException(StatusCodes.BadUnexpectedError, "Could not add operation to list of pending operations.");
}
return operation;
}

Expand All @@ -1248,15 +1252,12 @@ private void OperationCompleted(WriteOperation operation)
return;
}

lock (DataLock)
if (m_handshakeOperation == operation)
{
if (m_handshakeOperation == operation)
{
m_handshakeOperation = null;
}

m_requests.Remove(operation.RequestId);
m_handshakeOperation = null;
}

m_requests.TryRemove(operation.RequestId, out _);
}

/// <summary>
Expand Down Expand Up @@ -1542,7 +1543,7 @@ private bool ProcessResponseMessage(uint messageType, ArraySegment<byte> message
private Uri m_url;
private Uri m_via;
private long m_lastRequestId;
private Dictionary<uint, WriteOperation> m_requests;
private ConcurrentDictionary<uint, WriteOperation> m_requests;
private WriteOperation m_handshakeOperation;
private ChannelToken m_requestedToken;
private Timer m_handshakeTimer;
Expand Down
1 change: 0 additions & 1 deletion Tests/Opc.Ua.Gds.Tests/ClientTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -1430,7 +1430,6 @@ private int GoodServersOnNetworkCount()
#region Private Fields
private const int kGoodApplicationsTestCount = 10;
private const int kInvalidApplicationsTestCount = 10;
private const int kRandomStart = 1;
private ApplicationTestDataGenerator m_appTestDataGenerator;
private GlobalDiscoveryTestServer m_server;
private GlobalDiscoveryTestClient m_gdsClient;
Expand Down
7 changes: 5 additions & 2 deletions Tests/Opc.Ua.Gds.Tests/PushTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -407,6 +407,9 @@ public void UpdateCertificateCASigned()
byte[] privateKey = null;
byte[] certificate = null;
byte[][] issuerCertificates = null;

Thread.Sleep(1000);

DateTime now = DateTime.UtcNow;
do
{
Expand Down Expand Up @@ -868,11 +871,11 @@ private async Task CreateCATestCerts(string tempStorePath)
Assert.IsTrue(EraseStore(tempStorePath));

string subjectName = "CN=CA Test Cert, O=OPC Foundation";
X509Certificate2 newCACert = CertificateFactory.CreateCertificate(
X509Certificate2 newCACert = await CertificateFactory.CreateCertificate(
null, null, subjectName, null)
.SetCAConstraint()
.CreateForRSA()
.AddToStore(CertificateStoreType.Directory, tempStorePath);
.AddToStoreAsync(CertificateStoreType.Directory, tempStorePath).ConfigureAwait(false);

m_caCert = newCACert;

Expand Down

0 comments on commit 988be41

Please sign in to comment.