From 10d4dbcaa959ef370533097e60ea9e0815f3d54a Mon Sep 17 00:00:00 2001 From: Martin Regen Date: Wed, 29 Jun 2022 06:41:59 +0200 Subject: [PATCH] Improve ResendData server support (#1854) * add resend data support to sample node manager * After WG discussion, ensure after ResendData only a single value per monitored item is sent in the next publish (sync client cache) * Seperate resend data state from ready to publish with IsResendData property --- .../DataChangeMonitoredItem.cs | 71 ++++-- .../SampleNodeManager/SampleNodeManager.cs | 4 +- .../ServerControls.Net4/ServerForm.cs | 2 +- .../UA Server Controls.csproj | 8 +- .../Subscription/IMonitoredItem.cs | 7 +- .../Subscription/MonitoredItem.cs | 144 +++++------- .../Subscription/Subscription.cs | 16 +- Tests/Opc.Ua.Client.Tests/ClientTest.cs | 2 +- .../Opc.Ua.Server.Tests/CommonTestWorkers.cs | 50 ++++- .../ReferenceServerTest.cs | 209 +++++++++++++----- 10 files changed, 326 insertions(+), 187 deletions(-) diff --git a/Applications/Quickstarts.Servers/SampleNodeManager/DataChangeMonitoredItem.cs b/Applications/Quickstarts.Servers/SampleNodeManager/DataChangeMonitoredItem.cs index e17a44260c..b91b4a916b 100644 --- a/Applications/Quickstarts.Servers/SampleNodeManager/DataChangeMonitoredItem.cs +++ b/Applications/Quickstarts.Servers/SampleNodeManager/DataChangeMonitoredItem.cs @@ -29,6 +29,7 @@ using System; using System.Collections.Generic; +using System.Text; using Opc.Ua.Server; namespace Opc.Ua.Sample @@ -68,6 +69,7 @@ public DataChangeMonitoredItem( m_nextSampleTime = DateTime.UtcNow.Ticks; m_readyToPublish = false; m_readyToTrigger = false; + m_resendData = false; m_alwaysReportUpdates = alwaysReportUpdates; } @@ -104,6 +106,7 @@ public DataChangeMonitoredItem( m_nextSampleTime = DateTime.UtcNow.Ticks; m_readyToPublish = false; m_readyToTrigger = false; + m_resendData = false; m_queue = null; m_filter = filter; m_range = 0; @@ -465,11 +468,18 @@ public bool IsReadyToTrigger } } - public void SetupResendDataTrigger() + /// + public bool IsResendData { - // Does nothing since this type of Monitored Item does not support Resend data functionality + get + { + lock (m_lock) + { + return m_resendData; + } + } } - + /// /// Returns the results for the create request. /// @@ -516,12 +526,22 @@ public ServiceResult GetModifyResult(out MonitoredItemModifyResult result) return ServiceResult.Good; } } + + /// + public void SetupResendDataTrigger() + { + lock (m_lock) + { + if (m_monitoringMode == MonitoringMode.Reporting) + { + m_resendData = true; + } + } + } #endregion #region IDataChangeMonitoredItem Members - /// - /// Queues a new data change. - /// + /// public void QueueValue(DataValue value, ServiceResult error) { QueueValue(value, error, false); @@ -529,15 +549,13 @@ public void QueueValue(DataValue value, ServiceResult error) #endregion #region IDataChangeMonitoredItem2 Members - /// - /// Queues a new data change. - /// + /// public void QueueValue(DataValue value, ServiceResult error, bool ignoreFilters) { lock (m_lock) { // check if value has changed. - if (!m_alwaysReportUpdates) + if (!m_alwaysReportUpdates && !ignoreFilters) { if (!Opc.Ua.Server.MonitoredItem.ValueChanged(value, error, m_lastValue, m_lastError, m_filter, m_range)) { @@ -686,22 +704,23 @@ public bool Publish(OperationContext context, Queue n // check if not ready to publish. if (!IsReadyToPublish) { - return false; + if (!m_resendData) + { + return false; + } + } + else + { + // update sample time. + IncrementSampleTime(); } - // update sample time. - IncrementSampleTime(); - // update publish flag. m_readyToPublish = false; m_readyToTrigger = false; // check if queuing is enabled. - if (m_queue == null) - { - Publish(context, m_lastValue, m_lastError, notifications, diagnostics); - } - else + if (m_queue != null && (!m_resendData || m_queue.ItemsInQueue != 0)) { DataValue value = null; ServiceResult error = null; @@ -709,8 +728,21 @@ public bool Publish(OperationContext context, Queue n while (m_queue.Publish(out value, out error)) { Publish(context, value, error, notifications, diagnostics); + + if (m_resendData) + { + m_readyToPublish = m_queue.ItemsInQueue > 0; + break; + } } } + else + { + Publish(context, m_lastValue, m_lastError, notifications, diagnostics); + } + + // update flags + m_resendData = false; return true; } @@ -828,6 +860,7 @@ private void Publish( private bool m_alwaysReportUpdates; private bool m_semanticsChanged; private bool m_structureChanged; + private bool m_resendData; #endregion } } diff --git a/Applications/Quickstarts.Servers/SampleNodeManager/SampleNodeManager.cs b/Applications/Quickstarts.Servers/SampleNodeManager/SampleNodeManager.cs index 93c4b49068..c7721dcc13 100644 --- a/Applications/Quickstarts.Servers/SampleNodeManager/SampleNodeManager.cs +++ b/Applications/Quickstarts.Servers/SampleNodeManager/SampleNodeManager.cs @@ -2970,7 +2970,6 @@ public virtual void TransferMonitoredItems( { monitoredItems[ii].SetupResendDataTrigger(); } - errors[ii] = StatusCodes.Good; } } @@ -2989,10 +2988,9 @@ protected virtual void OnMonitoredItemsTransferred( IList monitoredItems ) { - // does nothing. + // overridden by the sub-class. } - /// /// Changes the monitoring mode for a set of monitored items. /// diff --git a/Applications/ServerControls.Net4/ServerForm.cs b/Applications/ServerControls.Net4/ServerForm.cs index e3d17aee20..f9e3ca1375 100644 --- a/Applications/ServerControls.Net4/ServerForm.cs +++ b/Applications/ServerControls.Net4/ServerForm.cs @@ -59,7 +59,7 @@ public ServerForm() /// /// Creates a form which displays the status for a UA server. /// - public ServerForm(StandardServer server, ApplicationConfiguration configuration, bool showCertificateValidationDialog = false) + public ServerForm(StandardServer server, ApplicationConfiguration configuration, bool showCertificateValidationDialog = true) { InitializeComponent(); diff --git a/Applications/ServerControls.Net4/UA Server Controls.csproj b/Applications/ServerControls.Net4/UA Server Controls.csproj index 2bb65e01f3..74adb2ea0d 100644 --- a/Applications/ServerControls.Net4/UA Server Controls.csproj +++ b/Applications/ServerControls.Net4/UA Server Controls.csproj @@ -60,18 +60,12 @@ - - 3.5 - True - + 3.5 - - 3.5 - diff --git a/Libraries/Opc.Ua.Server/Subscription/IMonitoredItem.cs b/Libraries/Opc.Ua.Server/Subscription/IMonitoredItem.cs index e0b7218017..6f87302948 100644 --- a/Libraries/Opc.Ua.Server/Subscription/IMonitoredItem.cs +++ b/Libraries/Opc.Ua.Server/Subscription/IMonitoredItem.cs @@ -97,7 +97,12 @@ public interface IMonitoredItem bool IsReadyToTrigger { get; set; } /// - /// Setup the resend data trigger by setting the monitor item in ResendData state + /// Gets a value indicating whether the monitored item is resending data. + /// + bool IsResendData { get; } + + /// + /// Set the resend data trigger flag. /// void SetupResendDataTrigger(); diff --git a/Libraries/Opc.Ua.Server/Subscription/MonitoredItem.cs b/Libraries/Opc.Ua.Server/Subscription/MonitoredItem.cs index 311a85a330..84937013d9 100644 --- a/Libraries/Opc.Ua.Server/Subscription/MonitoredItem.cs +++ b/Libraries/Opc.Ua.Server/Subscription/MonitoredItem.cs @@ -35,18 +35,6 @@ namespace Opc.Ua.Server { - /// - /// Specifies the values of the ResendData state for a monitored item - /// The state is set on the ResendData method call - /// - internal enum ResendDataState : int - { - ///The Monitored item does not participate in the ResendData - NonResendData = 0, - ///The Monitored item participates in ResendData - ResendData = 1 - } - /// /// A handle that describes how to access a node/attribute via an i/o manager. /// @@ -207,7 +195,7 @@ private void Initialize() m_readyToTrigger = false; m_sourceSamplingInterval = 0; m_samplingError = ServiceResult.Good; - m_resendDataState = (int)ResendDataState.NonResendData; + m_resendData = false; } #endregion @@ -252,12 +240,8 @@ public bool IsReadyToPublish } } - bool isResendData = (int)ResendDataState.ResendData == Interlocked.CompareExchange(ref m_resendDataState, - (int)ResendDataState.ResendData, - (int)ResendDataState.ResendData); - // check if not ready to publish in case it doesn't ResendData - if (!m_readyToPublish && (!isResendData)) + if (!m_readyToPublish) { ServerUtils.EventLog.MonitoredItemReady(m_id, "FALSE"); return false; @@ -282,7 +266,7 @@ public bool IsReadyToPublish // re-queue if too little time has passed since the last publish, in case it doesn't ResendData long now = HiResClock.TickCount64; - if ((m_nextSamplingTime > now) && !isResendData) + if (m_nextSamplingTime > now) { ServerUtils.EventLog.MonitoredItemReady(m_id, Utils.Format("FALSE {0}ms", m_nextSamplingTime - now)); return false; @@ -321,12 +305,29 @@ public bool IsReadyToTrigger } } - /// - /// Setup the resend data trigger by setting the monitor item in ResendData state - /// + /// + public bool IsResendData + { + get + { + lock (m_lock) + { + return m_resendData; + } + } + } + + /// public void SetupResendDataTrigger() { - Interlocked.Exchange(ref m_resendDataState, (int)ResendDataState.ResendData); + lock (m_lock) + { + if (m_monitoringMode == MonitoringMode.Reporting && + (m_typeMask & MonitoredItemTypeMask.DataChange) != 0) + { + m_resendData = true; + } + } } /// @@ -1250,90 +1251,65 @@ public virtual bool Publish( return false; } - // only publish if reporting or resending data - ResendDataState selectedToResendData = (ResendDataState)Interlocked.CompareExchange(ref m_resendDataState, (int)ResendDataState.NonResendData, (int)ResendDataState.ResendData); - bool isResendData = selectedToResendData == ResendDataState.ResendData; - - if (!IsReadyToPublish && !isResendData) + if (!IsReadyToPublish) { - return false; + if (!m_resendData) + { + return false; + } } - - // pull any unprocessed data. - if (m_calculator != null) + else { - if (m_calculator.HasEndTimePassed(DateTime.UtcNow)) + // pull any unprocessed data. + if (m_calculator != null) { - DataValue processedValue = m_calculator.GetProcessedValue(false); - - while (processedValue != null) + if (m_calculator.HasEndTimePassed(DateTime.UtcNow)) { + DataValue processedValue = m_calculator.GetProcessedValue(false); + + while (processedValue != null) + { + AddValueToQueue(processedValue, null); + } + + processedValue = m_calculator.GetProcessedValue(true); AddValueToQueue(processedValue, null); } - - processedValue = m_calculator.GetProcessedValue(true); - AddValueToQueue(processedValue, null); } + + IncrementSampleTime(); } - // go to the next sampling interval. - IncrementSampleTime(); + m_readyToPublish = false; - if (isResendData) + // check if queueing enabled. + if (m_queue != null && (!m_resendData || m_queue.ItemsInQueue != 0)) { - // check if queueing enabled. - if (m_queue != null && m_queue.ItemsInQueue != 0) - { - DataValue value = null; - ServiceResult error = null; + DataValue value = null; + ServiceResult error = null; - if (m_queue.ItemsInQueue > 1) - { - // pop the first value - m_queue.Publish(out value, out error); - // publish the next - m_queue.Publish(out value, out error); - Publish(context, notifications, diagnostics, value, error); - } - else // m_queue.ItemsInQueue == 1 + while (m_queue.Publish(out value, out error)) + { + Publish(context, notifications, diagnostics, value, error); + if (m_resendData) { - m_queue.Publish(out value, out error); - Publish(context, notifications, diagnostics, value, error); + m_readyToPublish = m_queue.ItemsInQueue > 0; + break; } } - // publish last value if no queuing or no items are queued - else - { - ServerUtils.EventLog.DequeueValue(m_lastValue.WrappedValue, m_lastValue.StatusCode); - Publish(context, notifications, diagnostics, m_lastValue, m_lastError); - } } + + // publish last value if no queuing or no items are queued else { - // check if queueing enabled. - if (m_queue != null) - { - DataValue value = null; - ServiceResult error = null; - - while (m_queue.Publish(out value, out error)) - { - Publish(context, notifications, diagnostics, value, error); - } - } - - // publish last value if no queuing or no items are queued - else - { - ServerUtils.EventLog.DequeueValue(m_lastValue.WrappedValue, m_lastValue.StatusCode); - Publish(context, notifications, diagnostics, m_lastValue, m_lastError); - } + ServerUtils.EventLog.DequeueValue(m_lastValue.WrappedValue, m_lastValue.StatusCode); + Publish(context, notifications, diagnostics, m_lastValue, m_lastError); } // reset state variables. m_overflow = false; - m_readyToPublish = false; m_readyToTrigger = false; + m_resendData = false; m_triggered = false; return false; @@ -1883,7 +1859,7 @@ private void QueueOverflowHandler() private ServiceResult m_samplingError; private IAggregateCalculator m_calculator; private bool m_triggered; - private int m_resendDataState; + private bool m_resendData; #endregion } } diff --git a/Libraries/Opc.Ua.Server/Subscription/Subscription.cs b/Libraries/Opc.Ua.Server/Subscription/Subscription.cs index 20108fa620..ff7ccfdd0b 100644 --- a/Libraries/Opc.Ua.Server/Subscription/Subscription.cs +++ b/Libraries/Opc.Ua.Server/Subscription/Subscription.cs @@ -450,7 +450,7 @@ public PublishingState PublishTimerExpired() IMonitoredItem monitoredItem = current.Value; // check if the item is ready to publish. - if (monitoredItem.IsReadyToPublish) + if (monitoredItem.IsReadyToPublish || monitoredItem.IsResendData) { m_itemsToCheck.Remove(current); m_itemsToPublish.AddLast(current); @@ -494,7 +494,7 @@ public PublishingState PublishTimerExpired() m_itemsToCheck.Remove(current); m_itemsToPublish.AddLast(current); } - + current = next; } } @@ -537,7 +537,7 @@ public void TransferSession(OperationContext context, bool sendInitialValues) { // locked by caller m_session = context.Session; - + var monitoredItems = m_monitoredItems.Select(v => v.Value.Value).ToList(); var errors = new List(monitoredItems.Count); for (int ii = 0; ii < monitoredItems.Count; ii++) @@ -578,15 +578,9 @@ public void ResendData(OperationContext context) lock (m_lock) { var monitoredItems = m_monitoredItems.Select(v => v.Value.Value).ToList(); - // process MI when MonitoringMode is set to Reporting foreach (IMonitoredItem monitoredItem in monitoredItems) { - if ((monitoredItem.MonitoringMode == MonitoringMode.Reporting) && - ((monitoredItem.MonitoredItemType & MonitoredItemTypeMask.DataChange) != 0)) - { - IDataChangeMonitoredItem2 dataChangeMonitoredItem = (IDataChangeMonitoredItem2)monitoredItem; - dataChangeMonitoredItem.SetupResendDataTrigger(); - } + monitoredItem.SetupResendDataTrigger(); } } } @@ -820,7 +814,7 @@ private NotificationMessage InnerPublish( // check if a keep alive should be sent if there is no data. bool keepAliveIfNoData = m_keepAliveCounter >= m_maxKeepAliveCount; - + availableSequenceNumbers = new UInt32Collection(); moreNotifications = false; diff --git a/Tests/Opc.Ua.Client.Tests/ClientTest.cs b/Tests/Opc.Ua.Client.Tests/ClientTest.cs index a077068bb0..68afe28351 100644 --- a/Tests/Opc.Ua.Client.Tests/ClientTest.cs +++ b/Tests/Opc.Ua.Client.Tests/ClientTest.cs @@ -687,7 +687,7 @@ public async Task TransferSubscriptionNative(bool sendInitialData) var namespaceUris = Session.NamespaceUris; NodeId[] testSet = CommonTestWorkers.NodeIdTestSetStatic.Select(n => ExpandedNodeId.ToNodeId(n, namespaceUris)).ToArray(); var clientTestServices = new ClientTestServices(Session); - CommonTestWorkers.CreateSubscriptionForTransfer(clientTestServices, requestHeader, testSet, out var subscriptionIds); + var subscriptionIds = CommonTestWorkers.CreateSubscriptionForTransfer(clientTestServices, requestHeader, testSet, 0, -1); TestContext.Out.WriteLine("Transfer SubscriptionIds: {0}", subscriptionIds[0]); diff --git a/Tests/Opc.Ua.Server.Tests/CommonTestWorkers.cs b/Tests/Opc.Ua.Server.Tests/CommonTestWorkers.cs index 29c7e7c459..2cbf118e75 100644 --- a/Tests/Opc.Ua.Server.Tests/CommonTestWorkers.cs +++ b/Tests/Opc.Ua.Server.Tests/CommonTestWorkers.cs @@ -39,6 +39,9 @@ namespace Opc.Ua.Server.Tests /// public static class CommonTestWorkers { + public const int DefaultMonitoredItemsQueueSize = 0; + public const int DefaultMonitoredItemsSamplingInterval = -1; + #region Public Test Sets public static readonly ExpandedNodeId[] NodeIdTestSetStatic = { @@ -54,6 +57,15 @@ public static class CommonTestWorkers new ExpandedNodeId("Scalar_Static_Variant", Quickstarts.ReferenceServer.Namespaces.ReferenceServer), }; + // static variables from namespace TestData + public static readonly ExpandedNodeId[] NodeIdTestDataSetStatic = + { + new ExpandedNodeId(TestData.Variables.Data_Static_Scalar_Int16Value, TestData.Namespaces.TestData), + new ExpandedNodeId(TestData.Variables.Data_Static_Scalar_Int32Value, TestData.Namespaces.TestData), + new ExpandedNodeId(TestData.Variables.Data_Static_Scalar_UInt16Value, TestData.Namespaces.TestData), + new ExpandedNodeId(TestData.Variables.Data_Static_Scalar_UInt32Value, TestData.Namespaces.TestData), + }; + public static readonly ExpandedNodeId[] NodeIdTestSetSimulation = { new ExpandedNodeId("Scalar_Simulation_SByte", Quickstarts.ReferenceServer.Namespaces.ReferenceServer), @@ -67,6 +79,13 @@ public static class CommonTestWorkers new ExpandedNodeId("Scalar_Simulation_QualifiedName", Quickstarts.ReferenceServer.Namespaces.ReferenceServer), new ExpandedNodeId("Scalar_Simulation_Variant", Quickstarts.ReferenceServer.Namespaces.ReferenceServer), }; + + public static readonly ExpandedNodeId[] NodeIdMemoryBufferSimulation = + { + // dynamic variables from namespace MemoryBuffer + new ExpandedNodeId("UInt32[64]", MemoryBuffer.Namespaces.MemoryBuffer + "/Instance"), + new ExpandedNodeId("Double[40]", MemoryBuffer.Namespaces.MemoryBuffer + "/Instance"), + }; #endregion #region Public Workers @@ -427,21 +446,24 @@ public static void SubscriptionTest( /// /// Worker method to test TransferSubscriptions of a server. /// - public static void CreateSubscriptionForTransfer( + public static UInt32Collection CreateSubscriptionForTransfer( IServerTestServices services, RequestHeader requestHeader, NodeId[] testNodes, - out UInt32Collection subscriptionIds) + uint queueSize = DefaultMonitoredItemsQueueSize, + int samplingInterval = DefaultMonitoredItemsSamplingInterval) { // start time + requestHeader.Timestamp = DateTime.UtcNow; uint subscriptionId = CreateSubscription(services, requestHeader); + uint clientHandle = 1; foreach (NodeId testNode in testNodes) { - CreateMonitoredItem(services, requestHeader, subscriptionId, testNode); + CreateMonitoredItem(services, requestHeader, subscriptionId, testNode, clientHandle++, queueSize, samplingInterval); } - subscriptionIds = new UInt32Collection(); + var subscriptionIds = new UInt32Collection(); subscriptionIds.Add(subscriptionId); // enable publishing @@ -465,6 +487,8 @@ public static void CreateSubscriptionForTransfer( // static node, do not acknoledge Assert.AreEqual(1, availableSequenceNumbers.Count); + + return subscriptionIds; } /// @@ -516,6 +540,13 @@ public static void TransferSubscriptionTest( ServerFixtureUtils.ValidateDiagnosticInfos(diagnosticInfos, acknoledgements); Assert.AreEqual(subscriptionIds[0], publishedId); Assert.AreEqual(sendInitialData ? 1 : 0, notificationMessage.NotificationData.Count); + if (sendInitialData) + { + var items = notificationMessage.NotificationData.FirstOrDefault(); + Assert.IsTrue(items.Body is Opc.Ua.DataChangeNotification); + var monitoredItemsCollection = ((Opc.Ua.DataChangeNotification)items.Body).MonitoredItems; + Assert.IsNotEmpty(monitoredItemsCollection); + } //Assert.AreEqual(0, availableSequenceNumbers.Count); requestHeader.Timestamp = DateTime.UtcNow; @@ -592,9 +623,12 @@ private static uint CreateSubscription(IServerTestServices services, RequestHead private static void CreateMonitoredItem( IServerTestServices services, RequestHeader requestHeader, - uint subscriptionId, NodeId nodeId) + uint subscriptionId, NodeId nodeId, + uint clientHandle, + uint queueSize, + int samplingInterval + ) { - uint queueSize = 5; var itemsToCreate = new MonitoredItemCreateRequestCollection { // add item new MonitoredItemCreateRequest { @@ -604,8 +638,8 @@ private static void CreateMonitoredItem( }, MonitoringMode = MonitoringMode.Reporting, RequestedParameters = new MonitoringParameters { - ClientHandle = 1u, - SamplingInterval = -1, + ClientHandle = clientHandle, + SamplingInterval = samplingInterval, Filter = null, DiscardOldest = true, QueueSize = queueSize diff --git a/Tests/Opc.Ua.Server.Tests/ReferenceServerTest.cs b/Tests/Opc.Ua.Server.Tests/ReferenceServerTest.cs index 0360b06c1b..d601bcca03 100644 --- a/Tests/Opc.Ua.Server.Tests/ReferenceServerTest.cs +++ b/Tests/Opc.Ua.Server.Tests/ReferenceServerTest.cs @@ -33,6 +33,7 @@ using System.Threading.Tasks; using BenchmarkDotNet.Attributes; using NUnit.Framework; +using Opc.Ua.Test; using Quickstarts.ReferenceServer; namespace Opc.Ua.Server.Tests @@ -49,11 +50,16 @@ public class ReferenceServerTests { const double kMaxAge = 10000; const uint kTimeoutHint = 10000; + const uint kQueueSize = 5; + ServerFixture m_fixture; ReferenceServer m_server; RequestHeader m_requestHeader; OperationLimits m_operationLimits; ReferenceDescriptionCollection m_referenceDescriptions; + RandomSource m_random; + DataGenerator m_generator; + #region Test Setup /// @@ -90,6 +96,8 @@ public void SetUp() m_requestHeader = m_server.CreateAndActivateSession(TestContext.CurrentContext.Test.Name); m_requestHeader.Timestamp = DateTime.UtcNow; m_requestHeader.TimeoutHint = kTimeoutHint; + m_random = new RandomSource(999); + m_generator = new DataGenerator(m_random); } /// @@ -279,6 +287,19 @@ public void Write() ServerFixtureUtils.ValidateDiagnosticInfos(diagnosticInfos, dataValues); } + /// + /// Update static Nodes, read modify write. + /// + [Test, Order(350)] + public void ReadWriteUpdateNodes() + { + // Nodes + var namespaceUris = m_server.CurrentInstance.NamespaceUris; + NodeId[] testSet = CommonTestWorkers.NodeIdTestSetStatic.Select(n => ExpandedNodeId.ToNodeId(n, namespaceUris)).ToArray(); + + UpdateValues(testSet); + } + /// /// Browse full address space. /// @@ -344,8 +365,7 @@ public void TransferSubscriptionSessionClosed(bool sendInitialData, bool useSecu var namespaceUris = m_server.CurrentInstance.NamespaceUris; NodeId[] testSet = CommonTestWorkers.NodeIdTestSetStatic.Select(n => ExpandedNodeId.ToNodeId(n, namespaceUris)).ToArray(); transferRequestHeader.Timestamp = DateTime.UtcNow; - CommonTestWorkers.CreateSubscriptionForTransfer(serverTestServices, transferRequestHeader, - testSet, out var subscriptionIds); + var subscriptionIds = CommonTestWorkers.CreateSubscriptionForTransfer(serverTestServices, transferRequestHeader, testSet, kQueueSize, -1); transferRequestHeader.Timestamp = DateTime.UtcNow; m_server.CloseSession(transferRequestHeader, false); @@ -386,8 +406,7 @@ public void TransferSubscription(bool sendInitialData, bool useSecurity) { var namespaceUris = m_server.CurrentInstance.NamespaceUris; NodeId[] testSet = CommonTestWorkers.NodeIdTestSetStatic.Select(n => ExpandedNodeId.ToNodeId(n, namespaceUris)).ToArray(); - CommonTestWorkers.CreateSubscriptionForTransfer(serverTestServices, m_requestHeader, - testSet, out var subscriptionIds); + var subscriptionIds = CommonTestWorkers.CreateSubscriptionForTransfer(serverTestServices, m_requestHeader, testSet, kQueueSize, -1); RequestHeader transferRequestHeader = m_server.CreateAndActivateSession("TransferSession", useSecurity); var transferSecurityContext = SecureChannelContext.Current; @@ -415,9 +434,15 @@ public void TransferSubscription(bool sendInitialData, bool useSecurity) /// /// Create a subscription with a monitored item. /// Call ResendData. + /// Ensure only a single value per monitored item is returned after ResendData was called. /// [Test] - public void ResendData() + [NonParallelizable] + [TestCase(true, kQueueSize)] + [TestCase(false, kQueueSize)] + [TestCase(true, 0U)] + [TestCase(false, 0U)] + public void ResendData(bool updateValues, uint queueSize) { var serverTestServices = new ServerTestServices(m_server); // save old security context, test fixture can only work with one session @@ -425,45 +450,32 @@ public void ResendData() try { var namespaceUris = m_server.CurrentInstance.NamespaceUris; - NodeId[] testSet = CommonTestWorkers.NodeIdTestSetStatic.Select(n => ExpandedNodeId.ToNodeId(n, namespaceUris)).ToArray(); + NodeIdCollection testSetCollection = CommonTestWorkers.NodeIdTestSetStatic.Select(n => ExpandedNodeId.ToNodeId(n, namespaceUris)).ToArray(); + testSetCollection.AddRange(CommonTestWorkers.NodeIdTestDataSetStatic.Select(n => ExpandedNodeId.ToNodeId(n, namespaceUris)).ToArray()); + NodeId[] testSet = testSetCollection.ToArray(); + //Re-use method CreateSubscriptionForTransfer to create a subscription - CommonTestWorkers.CreateSubscriptionForTransfer(serverTestServices, m_requestHeader, - testSet, out var subscriptionIds); + var subscriptionIds = CommonTestWorkers.CreateSubscriptionForTransfer(serverTestServices, m_requestHeader, testSet, queueSize, 0); RequestHeader resendDataRequestHeader = m_server.CreateAndActivateSession("ResendData"); var resendDataSecurityContext = SecureChannelContext.Current; + SecureChannelContext.Current = securityContext; // After the ResendData call there will be data to publish again - MethodState methodStateInstance = (MethodState)m_server.CurrentInstance. - DiagnosticsNodeManager.FindPredefinedNode(MethodIds.Server_ResendData, typeof(MethodState)); - var nodesToCall = new CallMethodRequestCollection(); - nodesToCall.Add(new CallMethodRequest() { - ObjectId = ObjectIds.Server, - MethodId = MethodIds.Server_ResendData, - InputArguments = new VariantCollection() { new Variant(subscriptionIds.Last()) } - }); - - //call ResendData method from the same session context - m_requestHeader.Timestamp = DateTime.UtcNow; - var response = m_server.Call(m_requestHeader, - nodesToCall, - out var results, - out var diagnosticInfos); - - Assert.IsTrue(StatusCode.IsGood(results[0].StatusCode)); - ServerFixtureUtils.ValidateResponse(response); + var nodesToCall = ResendDataCall(StatusCodes.Good, subscriptionIds); Thread.Sleep(1000); // Make sure publish queue becomes empty by consuming it Assert.AreEqual(1, subscriptionIds.Count); + // Issue a Publish request m_requestHeader.Timestamp = DateTime.UtcNow; var acknoledgements = new SubscriptionAcknowledgementCollection(); - response = serverTestServices.Publish(m_requestHeader, acknoledgements, + var response = serverTestServices.Publish(m_requestHeader, acknoledgements, out uint publishedId, out UInt32Collection availableSequenceNumbers, out bool moreNotifications, out NotificationMessage notificationMessage, - out StatusCodeCollection _, out diagnosticInfos); + out StatusCodeCollection _, out DiagnosticInfoCollection diagnosticInfos); Assert.AreEqual(StatusCodes.Good, response.ServiceResult.Code); ServerFixtureUtils.ValidateResponse(response); @@ -488,15 +500,18 @@ public void ResendData() Assert.AreEqual(0, notificationMessage.NotificationData.Count); } - // Validate ResendData method call from same and different session contexts + // Validate ResendData method call returns error from different session contexts // call ResendData method from different session context + SecureChannelContext.Current = resendDataSecurityContext; resendDataRequestHeader.Timestamp = DateTime.UtcNow; response = m_server.Call(resendDataRequestHeader, nodesToCall, - out results, + out var results, out diagnosticInfos); + SecureChannelContext.Current = securityContext; + Assert.AreEqual(StatusCodes.BadUserAccessDenied, results[0].StatusCode.Code); ServerFixtureUtils.ValidateResponse(response); @@ -513,15 +528,19 @@ public void ResendData() Assert.AreEqual(subscriptionIds[0], publishedId); Assert.AreEqual(0, notificationMessage.NotificationData.Count); - //call ResendData method from the same session context - m_requestHeader.Timestamp = DateTime.UtcNow; - response = m_server.Call(m_requestHeader, - nodesToCall, - out results, - out diagnosticInfos); + if (updateValues) + { + UpdateValues(testSet); - Assert.IsTrue(StatusCode.IsGood(results[0].StatusCode)); - ServerFixtureUtils.ValidateResponse(response); + // fill queues, but only a single value per resend publish shall be returned + for (int i = 1; i < queueSize; i++) + { + UpdateValues(testSet); + } + } + + // call ResendData method from the same session context + ResendDataCall(StatusCodes.Good, subscriptionIds); // Data should be available for publishing now m_requestHeader.Timestamp = DateTime.UtcNow; @@ -535,22 +554,35 @@ public void ResendData() ServerFixtureUtils.ValidateDiagnosticInfos(diagnosticInfos, acknoledgements); Assert.AreEqual(subscriptionIds[0], publishedId); Assert.AreEqual(1, notificationMessage.NotificationData.Count); + var items = notificationMessage.NotificationData.FirstOrDefault(); + Assert.IsTrue(items.Body is Opc.Ua.DataChangeNotification); + var monitoredItemsCollection = ((Opc.Ua.DataChangeNotification)items.Body).MonitoredItems; + Assert.AreEqual(testSet.Length, monitoredItemsCollection.Count); - // Call ResendData method with invalid subscription Id - nodesToCall = new CallMethodRequestCollection(); - nodesToCall.Add(new CallMethodRequest() { - ObjectId = ObjectIds.Server, - MethodId = MethodIds.Server_ResendData, - InputArguments = new VariantCollection() { new Variant(subscriptionIds.Last() + 20) } - }); - m_requestHeader.Timestamp = DateTime.UtcNow; - response = m_server.Call(m_requestHeader, - nodesToCall, - out results, - out diagnosticInfos); + Thread.Sleep(1000); - Assert.AreEqual(StatusCodes.BadSubscriptionIdInvalid, results[0].StatusCode.Code); - ServerFixtureUtils.ValidateResponse(response); + if (updateValues && queueSize > 1) + { + // remaining queue Data should be sent in this publish + m_requestHeader.Timestamp = DateTime.UtcNow; + response = serverTestServices.Publish(m_requestHeader, acknoledgements, + out publishedId, out availableSequenceNumbers, + out moreNotifications, out notificationMessage, + out StatusCodeCollection _, out diagnosticInfos); + + Assert.AreEqual(StatusCodes.Good, response.ServiceResult.Code); + ServerFixtureUtils.ValidateResponse(response); + ServerFixtureUtils.ValidateDiagnosticInfos(diagnosticInfos, acknoledgements); + Assert.AreEqual(subscriptionIds[0], publishedId); + Assert.AreEqual(1, notificationMessage.NotificationData.Count); + items = notificationMessage.NotificationData.FirstOrDefault(); + Assert.IsTrue(items.Body is Opc.Ua.DataChangeNotification); + monitoredItemsCollection = ((Opc.Ua.DataChangeNotification)items.Body).MonitoredItems; + Assert.AreEqual(testSet.Length * (queueSize - 1), monitoredItemsCollection.Count, testSet.Length); + } + + // Call ResendData method with invalid subscription Id + ResendDataCall(StatusCodes.BadSubscriptionIdInvalid, new UInt32Collection() { subscriptionIds.Last() + 20 }); // Nothing to publish since previous ResendData call did not execute m_requestHeader.Timestamp = DateTime.UtcNow; @@ -576,5 +608,78 @@ public void ResendData() } } #endregion + + #region Private Methods + private CallMethodRequestCollection ResendDataCall(StatusCode expectedStatus, UInt32Collection subscriptionIds) + { + // Find the ResendData method + var nodesToCall = new CallMethodRequestCollection(); + foreach (var subscriptionId in subscriptionIds) + { + nodesToCall.Add(new CallMethodRequest() { + ObjectId = ObjectIds.Server, + MethodId = MethodIds.Server_ResendData, + InputArguments = new VariantCollection() { new Variant(subscriptionId) } + }); + } + + //call ResendData method with subscription ids + m_requestHeader.Timestamp = DateTime.UtcNow; + var response = m_server.Call(m_requestHeader, + nodesToCall, + out var results, + out var diagnosticInfos); + + Assert.AreEqual(expectedStatus, results[0].StatusCode.Code); + ServerFixtureUtils.ValidateResponse(response); + + return nodesToCall; + } + + /// + /// Read Values of NodeIds, determine types, write back new random values. + /// + /// The nodeIds to modify. + private void UpdateValues(NodeId[] testSet) + { + // Read values + var requestHeader = m_requestHeader; + var nodesToRead = new ReadValueIdCollection(); + foreach (NodeId nodeId in testSet) + { + nodesToRead.Add(new ReadValueId() { NodeId = nodeId, AttributeId = Attributes.Value }); + } + var response = m_server.Read(requestHeader, kMaxAge, TimestampsToReturn.Neither, nodesToRead, + out var readDataValues, out var diagnosticInfos); + + ServerFixtureUtils.ValidateResponse(response); + ServerFixtureUtils.ValidateDiagnosticInfos(diagnosticInfos, readDataValues); + Assert.AreEqual(testSet.Length, readDataValues.Count); + + var modifiedValues = new DataValueCollection(); + foreach (var dataValue in readDataValues) + { + var typeInfo = TypeInfo.Construct(dataValue.Value); + Assert.IsNotNull(typeInfo); + var value = m_generator.GetRandom(typeInfo.BuiltInType); + modifiedValues.Add(new DataValue() { WrappedValue = new Variant(value) }); + } + + int ii = 0; + var nodesToWrite = new WriteValueCollection(); + foreach (NodeId nodeId in testSet) + { + nodesToWrite.Add(new WriteValue() { NodeId = nodeId, AttributeId = Attributes.Value, Value = modifiedValues[ii] }); + ii++; + } + + // Write Nodes + requestHeader.Timestamp = DateTime.UtcNow; + response = m_server.Write(requestHeader, nodesToWrite, + out var writeDataValues, out diagnosticInfos); + ServerFixtureUtils.ValidateResponse(response); + ServerFixtureUtils.ValidateDiagnosticInfos(diagnosticInfos, writeDataValues); + } + #endregion } }