From 988be41a87c4a3d7b865e8314d33409eca347d63 Mon Sep 17 00:00:00 2001 From: Martin Regen Date: Wed, 28 Feb 2024 09:00:07 +0100 Subject: [PATCH] Reduce locking in secure channel client send/receive codepath (#2531) During high load there were a few portions in the send/receive pipeline identified which may cause lock contention and thread starvation due to locks: - Remove lock for the codepath when a incoming message is processed. - Add a concurrent dictionary to allow for the processing of OperationCompleted and to add new requests without locking - Allow to process normal service requests without locking Due to the change it is possible to send KeepAlive even when a incoming message is received. --- .../Stack/Tcp/UaSCBinaryChannel.cs | 19 ++++------ .../Stack/Tcp/UaSCBinaryClientChannel.cs | 37 ++++++++++--------- Tests/Opc.Ua.Gds.Tests/ClientTest.cs | 1 - Tests/Opc.Ua.Gds.Tests/PushTest.cs | 7 +++- 4 files changed, 32 insertions(+), 32 deletions(-) diff --git a/Stack/Opc.Ua.Core/Stack/Tcp/UaSCBinaryChannel.cs b/Stack/Opc.Ua.Core/Stack/Tcp/UaSCBinaryChannel.cs index 9423d0ab4..d60c80382 100644 --- a/Stack/Opc.Ua.Core/Stack/Tcp/UaSCBinaryChannel.cs +++ b/Stack/Opc.Ua.Core/Stack/Tcp/UaSCBinaryChannel.cs @@ -339,23 +339,20 @@ public virtual bool ChannelFull /// public virtual void OnMessageReceived(IMessageSocket source, ArraySegment message) { - lock (DataLock) + try { - try - { - uint messageType = BitConverter.ToUInt32(message.Array, message.Offset); + uint messageType = BitConverter.ToUInt32(message.Array, message.Offset); - if (!HandleIncomingMessage(messageType, message)) - { - BufferManager.ReturnBuffer(message.Array, "OnMessageReceived"); - } - } - catch (Exception e) + if (!HandleIncomingMessage(messageType, message)) { - HandleMessageProcessingError(e, StatusCodes.BadTcpInternalError, "An error occurred receiving a message."); BufferManager.ReturnBuffer(message.Array, "OnMessageReceived"); } } + catch (Exception e) + { + HandleMessageProcessingError(e, StatusCodes.BadTcpInternalError, "An error occurred receiving a message."); + BufferManager.ReturnBuffer(message.Array, "OnMessageReceived"); + } } #region Incoming Message Support Functions diff --git a/Stack/Opc.Ua.Core/Stack/Tcp/UaSCBinaryClientChannel.cs b/Stack/Opc.Ua.Core/Stack/Tcp/UaSCBinaryClientChannel.cs index f151b19cb..59a746d90 100644 --- a/Stack/Opc.Ua.Core/Stack/Tcp/UaSCBinaryClientChannel.cs +++ b/Stack/Opc.Ua.Core/Stack/Tcp/UaSCBinaryClientChannel.cs @@ -11,6 +11,7 @@ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. */ using System; +using System.Collections.Concurrent; using System.Collections.Generic; using System.IO; using System.Security.Cryptography.X509Certificates; @@ -79,7 +80,7 @@ public UaSCUaBinaryClientChannel( ClientCertificateChain = clientCertificateChain; } - m_requests = new Dictionary(); + m_requests = new ConcurrentDictionary(); m_lastRequestId = 0; m_ConnectCallback = new EventHandler(OnConnectComplete); m_startHandshake = new TimerCallback(OnScheduledHandshake); @@ -767,17 +768,17 @@ protected override void HandleWriteComplete(BufferCollection buffers, object sta /// True if the function takes ownership of the buffer. protected override bool HandleIncomingMessage(uint messageType, ArraySegment messageChunk) { - lock (DataLock) + // process a response. + if (TcpMessageType.IsType(messageType, TcpMessageType.Message)) { - // process a response. - if (TcpMessageType.IsType(messageType, TcpMessageType.Message)) - { - //Utils.LogTrace("ChannelId {0}: ProcessResponseMessage", ChannelId); - return ProcessResponseMessage(messageType, messageChunk); - } + //Utils.LogTrace("ChannelId {0}: ProcessResponseMessage", ChannelId); + return ProcessResponseMessage(messageType, messageChunk); + } + lock (DataLock) + { // check for acknowledge. - else if (messageType == TcpMessageType.Acknowledge) + if (messageType == TcpMessageType.Acknowledge) { //Utils.LogTrace("ChannelId {0}: ProcessAcknowledgeMessage", ChannelId); return ProcessAcknowledgeMessage(messageChunk); @@ -1234,7 +1235,10 @@ private WriteOperation BeginOperation(int timeout, AsyncCallback callback, objec { WriteOperation operation = new WriteOperation(timeout, callback, state); operation.RequestId = Utils.IncrementIdentifier(ref m_lastRequestId); - m_requests.Add(operation.RequestId, operation); + if (!m_requests.TryAdd(operation.RequestId, operation)) + { + throw new ServiceResultException(StatusCodes.BadUnexpectedError, "Could not add operation to list of pending operations."); + } return operation; } @@ -1248,15 +1252,12 @@ private void OperationCompleted(WriteOperation operation) return; } - lock (DataLock) + if (m_handshakeOperation == operation) { - if (m_handshakeOperation == operation) - { - m_handshakeOperation = null; - } - - m_requests.Remove(operation.RequestId); + m_handshakeOperation = null; } + + m_requests.TryRemove(operation.RequestId, out _); } /// @@ -1542,7 +1543,7 @@ private bool ProcessResponseMessage(uint messageType, ArraySegment message private Uri m_url; private Uri m_via; private long m_lastRequestId; - private Dictionary m_requests; + private ConcurrentDictionary m_requests; private WriteOperation m_handshakeOperation; private ChannelToken m_requestedToken; private Timer m_handshakeTimer; diff --git a/Tests/Opc.Ua.Gds.Tests/ClientTest.cs b/Tests/Opc.Ua.Gds.Tests/ClientTest.cs index 2b7034f43..334b60fbb 100644 --- a/Tests/Opc.Ua.Gds.Tests/ClientTest.cs +++ b/Tests/Opc.Ua.Gds.Tests/ClientTest.cs @@ -1430,7 +1430,6 @@ private int GoodServersOnNetworkCount() #region Private Fields private const int kGoodApplicationsTestCount = 10; private const int kInvalidApplicationsTestCount = 10; - private const int kRandomStart = 1; private ApplicationTestDataGenerator m_appTestDataGenerator; private GlobalDiscoveryTestServer m_server; private GlobalDiscoveryTestClient m_gdsClient; diff --git a/Tests/Opc.Ua.Gds.Tests/PushTest.cs b/Tests/Opc.Ua.Gds.Tests/PushTest.cs index 8b27723c4..c0d61ca19 100644 --- a/Tests/Opc.Ua.Gds.Tests/PushTest.cs +++ b/Tests/Opc.Ua.Gds.Tests/PushTest.cs @@ -407,6 +407,9 @@ public void UpdateCertificateCASigned() byte[] privateKey = null; byte[] certificate = null; byte[][] issuerCertificates = null; + + Thread.Sleep(1000); + DateTime now = DateTime.UtcNow; do { @@ -868,11 +871,11 @@ private async Task CreateCATestCerts(string tempStorePath) Assert.IsTrue(EraseStore(tempStorePath)); string subjectName = "CN=CA Test Cert, O=OPC Foundation"; - X509Certificate2 newCACert = CertificateFactory.CreateCertificate( + X509Certificate2 newCACert = await CertificateFactory.CreateCertificate( null, null, subjectName, null) .SetCAConstraint() .CreateForRSA() - .AddToStore(CertificateStoreType.Directory, tempStorePath); + .AddToStoreAsync(CertificateStoreType.Directory, tempStorePath).ConfigureAwait(false); m_caCert = newCACert;