Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use the same defaults for batch size and trigger interval as in 2.8. #2046

Merged
merged 5 commits into from
Sep 6, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions docs/opc-publisher/commandline.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,13 @@ The following OPC Publisher configuration can be applied by Command Line Interfa
When both environment variable and CLI argument are provided, the command line option will override the environment variable.

```text

██████╗ ██████╗ ██████╗ ██████╗ ██╗ ██╗██████╗ ██╗ ██╗███████╗██╗ ██╗███████╗██████╗
██╔═══██╗██╔══██╗██╔════╝ ██╔══██╗██║ ██║██╔══██╗██║ ██║██╔════╝██║ ██║██╔════╝██╔══██╗
██║ ██║██████╔╝██║ ██████╔╝██║ ██║██████╔╝██║ ██║███████╗███████║█████╗ ██████╔╝
██║ ██║██╔═══╝ ██║ ██╔═══╝ ██║ ██║██╔══██╗██║ ██║╚════██║██╔══██║██╔══╝ ██╔══██╗
╚██████╔╝██║ ╚██████╗ ██║ ╚██████╔╝██████╔╝███████╗██║███████║██║ ██║███████╗██║ ██║
╚═════╝ ╚═╝ ╚═════╝ ╚═╝ ╚═════╝ ╚═════╝ ╚══════╝╚═╝╚══════╝╚═╝ ╚═╝╚══════╝╚═╝ ╚═╝
2.9.1
2.9.2
General
-------

Expand Down Expand Up @@ -148,6 +147,7 @@ Messaging configuration
Allowed values:
`PublishTime`
`CurrentTimeUtc`
`CreatedTimeUtc`
Default: `PublishTime` to use the subscription
notification publish timestamp if available.
--npd, --maxnodesperdataset, --MaxNodesPerDataSet=VALUE
Expand Down Expand Up @@ -641,8 +641,8 @@ OPC UA Client configuration
Default: `True`.
--sn, --appcertsubjectname, --ApplicationCertificateSubjectName=VALUE
The subject name for the app cert.
Default: `CN=Microsoft.Azure.IIoT, C=DE, S=Bav,
O=Microsoft, DC=localhost`.
Default: `CN=<the value of --an|--appname>, C=DE,
S=Bav, O=Microsoft, DC=localhost`.
--an, --appname, --ApplicationName=VALUE
The name for the app (used during OPC UA
authentication).
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -355,10 +355,10 @@ public CommandLine(string[] args)
"Set to `False` to disable adding the publisher's own certificate to the trusted store automatically.\nDefault: `True`.\n",
(bool b) => this[OpcUaClientConfig.AddAppCertToTrustedStoreKey] = b.ToString() },
{ $"sn|appcertsubjectname=|{OpcUaClientConfig.ApplicationCertificateSubjectNameKey}=",
"The subject name for the app cert.\nDefault: `CN=Microsoft.Azure.IIoT, C=DE, S=Bav, O=Microsoft, DC=localhost`.\n",
"The subject name for the app cert.\nDefault: `CN=<the value of --an|--appname>, C=DE, S=Bav, O=Microsoft, DC=localhost`.\n",
s => this[OpcUaClientConfig.ApplicationCertificateSubjectNameKey] = s },
{ $"an|appname=|{OpcUaClientConfig.ApplicationNameKey}=",
"The name for the app (used during OPC UA authentication).\nDefault: `Microsoft.Azure.IIoT`\n",
$"The name for the app (used during OPC UA authentication).\nDefault: `{OpcUaClientConfig.ApplicationNameDefault}`\n",
s => this[OpcUaClientConfig.ApplicationNameKey] = s },
{ $"pki|pkirootpath=|{OpcUaClientConfig.PkiRootPathKey}=",
"PKI certificate store root path.\nDefault: `pki`.\n",
Expand Down
42 changes: 26 additions & 16 deletions src/Azure.IIoT.OpcUa.Publisher/src/Runtime/PublisherConfig.cs
Original file line number Diff line number Diff line change
Expand Up @@ -85,9 +85,9 @@ public sealed class PublisherConfig : PostConfigureOptionBase<PublisherOptions>
public const bool EnableDataSetRoutingInfoDefault = false;
public const MessageEncoding MessageEncodingDefault = MessageEncoding.Json;
public const int MaxNodesPerDataSetDefault = 1000;
public const int BatchSizeDefault = 100;
public const int BatchSizeLegacyDefault = 50;
public const int MaxNetworkMessageSendQueueSizeDefault = 4096;
public const int BatchTriggerIntervalDefaultMillis = 1 * 1000;
public const int BatchTriggerIntervalLLegacyDefaultMillis = 10 * 1000;
public const int DiagnosticsIntervalDefaultMillis = 60 * 1000;
public const int ScaleTestCountDefault = 1;
#pragma warning restore CS1591 // Missing XML comment for publicly visible type or member
Expand All @@ -111,6 +111,18 @@ public override void PostConfigure(string? name, PublisherOptions options)
options.PublishedNodesFile = GetStringOrDefault(PublishedNodesFileKey);
}

if (options.DefaultTransport == null && Enum.TryParse<WriterGroupTransport>(
GetStringOrDefault(DefaultTransportKey), out var transport))
{
options.DefaultTransport = transport;
}

if (options.UseStandardsCompliantEncoding == null)
{
options.UseStandardsCompliantEncoding = GetBoolOrDefault(
UseStandardsCompliantEncodingKey, UseStandardsCompliantEncodingDefault);
}

if (options.CreatePublishFileIfNotExist == null)
{
options.CreatePublishFileIfNotExist = GetBoolOrNull(
Expand All @@ -125,15 +137,25 @@ public override void PostConfigure(string? name, PublisherOptions options)

if (options.BatchSize == null)
{
//
// Default to batch size of 50 if not using strict encoding and a
// transport was not specified to support backcompat with 2.8
//
options.BatchSize = GetIntOrDefault(BatchSizeKey,
BatchSizeDefault);
options.UseStandardsCompliantEncoding == true ||
options.DefaultTransport != null ? 0 : BatchSizeLegacyDefault);
}

if (options.BatchTriggerInterval == null)
{
//
// Default to batch interval of 10 seconds if not using strict encoding
// and a transport was not specified to support backcompat with 2.8
//
options.BatchTriggerInterval = GetDurationOrNull(BatchTriggerIntervalKey) ??
TimeSpan.FromMilliseconds(GetIntOrDefault(BatchTriggerIntervalKey,
BatchTriggerIntervalDefaultMillis));
options.UseStandardsCompliantEncoding == true ||
options.DefaultTransport != null ? 0 : BatchTriggerIntervalLLegacyDefaultMillis));
}

if (options.MaxNetworkMessageSendQueueSize == null)
Expand Down Expand Up @@ -173,12 +195,6 @@ public override void PostConfigure(string? name, PublisherOptions options)
DataSetMetaDataTopicTemplateKey);
}

if (options.DefaultTransport == null && Enum.TryParse<WriterGroupTransport>(
GetStringOrDefault(DefaultTransportKey), out var transport))
{
options.DefaultTransport = transport;
}

if (options.DisableOpenApiEndpoint == null)
{
options.DisableOpenApiEndpoint = GetBoolOrNull(DisableOpenApiEndpointKey);
Expand Down Expand Up @@ -237,12 +253,6 @@ public override void PostConfigure(string? name, PublisherOptions options)
DefaultMaxMessagesPerPublishKey);
}

if (options.UseStandardsCompliantEncoding == null)
{
options.UseStandardsCompliantEncoding = GetBoolOrDefault(
UseStandardsCompliantEncodingKey, UseStandardsCompliantEncodingDefault);
}

if (options.MessageTimestamp == null)
{
if (!Enum.TryParse<MessageTimestamp>(GetStringOrDefault(MessageTimestampKey),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,11 @@ public void Dispose()
chunkedMessage.AddProperty(OpcUa.Constants.MessagePropertyRoutingKey,
networkMessage.DataSetWriterGroup);
}

_logger.LogDebug(
"{Count} Notifications encoded into a network message (chunks:{Chunks})...",
notificationsPerMessage, validChunks);

chunkedMessages.Add((chunkedMessage, onSent));
}
else
Expand Down
52 changes: 38 additions & 14 deletions src/Azure.IIoT.OpcUa.Publisher/src/Services/NetworkMessageSink.cs
Original file line number Diff line number Diff line change
Expand Up @@ -95,14 +95,23 @@ public NetworkMessageSink(WriterGroupModel writerGroup,

_batchTriggerInterval = writerGroup.PublishingInterval
?? _options.Value.BatchTriggerInterval ?? TimeSpan.Zero;
//
// If the max notification per message is 1 then there is no need to
// have an interval publishing as the messages are emitted as soon
// as they arrive anyway
//
if (_maxNotificationsPerMessage == 1)
{
_batchTriggerInterval = TimeSpan.Zero;
}
_maxPublishQueueSize = (int?)writerGroup.PublishQueueSize
?? _options.Value.MaxNetworkMessageSendQueueSize ?? kMaxQueueSize;

//
// set notification buffer to 1 if no publishing interval otherwise queue
// as much as reasonable
// If undefined, set notification buffer to 1 if no publishing interval
// otherwise queue as much as reasonable
//
if (_maxNotificationsPerMessage < 1)
if (_maxNotificationsPerMessage <= 0)
{
_maxNotificationsPerMessage = _batchTriggerInterval == TimeSpan.Zero ?
1 : _maxPublishQueueSize;
Expand All @@ -112,21 +121,30 @@ public NetworkMessageSink(WriterGroupModel writerGroup,
_encodingBlock =
new TransformManyBlock<IOpcUaSubscriptionNotification[], (IEvent, Action)>(
EncodeNotifications, new ExecutionDataflowBlockOptions());
_batchDataSetMessageBlock = new BatchBlock<IOpcUaSubscriptionNotification>(
_notificationBufferBlock = new BatchBlock<IOpcUaSubscriptionNotification>(
_maxNotificationsPerMessage, new GroupingDataflowBlockOptions());
_sinkBlock = new ActionBlock<(IEvent, Action)>(
SendAsync, new ExecutionDataflowBlockOptions());

_batchDataSetMessageBlock.LinkTo(_encodingBlock);
_notificationBufferBlock.LinkTo(_encodingBlock);
_encodingBlock.LinkTo(_sinkBlock);

Source.OnMessage += OnMessageReceived;
Source.OnCounterReset += MessageTriggerCounterResetReceived;

InitializeMetrics();
_logger.LogInformation(
"Writer group {WriterGroup} set up to publish messages to {Transport}...",
writerGroup.Name, _eventClient.Name);
_logger.LogInformation("Writer group {WriterGroup} set up to publish " +
"notifications {Interval} {Batching} with {MaxSize} to {Transport} " +
"(queuing at most {MaxQueueSize} notifications)...",
writerGroup.Name ?? Constants.DefaultWriterGroupId,
_batchTriggerInterval == TimeSpan.Zero ?
"as soon as they arrive" : $"every {_batchTriggerInterval} (hh:mm:ss)",
_maxNotificationsPerMessage == 1 ?
"and individually" :
$"or when a batch of {_maxNotificationsPerMessage} notifications is ready",
_maxNetworkMessageSize == int.MaxValue ?
"unlimited" : $"at most {_maxNetworkMessageSize / 1024} kb",
_eventClient.Name, _maxPublishQueueSize);
}

/// <inheritdoc/>
Expand All @@ -147,8 +165,8 @@ public async ValueTask DisposeAsync()
// is then completed. If blocks are completed downstream first
// previous blocks will hang.
//
_batchDataSetMessageBlock.Complete();
await _batchDataSetMessageBlock.Completion.ConfigureAwait(false);
_notificationBufferBlock.Complete();
await _notificationBufferBlock.Completion.ConfigureAwait(false);
_encodingBlock.Complete();
await _encodingBlock.Completion.ConfigureAwait(false);
_sinkBlock.Complete();
Expand Down Expand Up @@ -184,6 +202,7 @@ public void Dispose()
{
try
{
Interlocked.Add(ref _notificationBufferInputCount, -input.Length);
return _messageEncoder.Encode(_eventClient.CreateEvent,
input, _maxNetworkMessageSize, _maxNotificationsPerMessage != 1);
}
Expand Down Expand Up @@ -212,6 +231,7 @@ private async Task SendAsync((IEvent Event, Action Complete) message)
{
// Throws if cancelled
await message.Event.SendAsync(_cts.Token).ConfigureAwait(false);
_logger.LogDebug("#{Attempt}: Network message sent.", attempt);
break;
}
catch (Exception e) when (
Expand Down Expand Up @@ -266,7 +286,9 @@ private void BatchTriggerIntervalTimer_Elapsed(object? state)
_batchTriggerIntervalTimer.Change(_batchTriggerInterval,
Timeout.InfiniteTimeSpan);
}
_batchDataSetMessageBlock.TriggerBatch();
_logger.LogDebug("Trigger notification batch (Interval:{Interval})...",
_batchTriggerInterval);
_notificationBufferBlock.TriggerBatch();
}

/// <summary>
Expand Down Expand Up @@ -308,7 +330,8 @@ private void OnMessageReceived(object? sender, IOpcUaSubscriptionNotification ar
LogNotification(args);
}

_batchDataSetMessageBlock.Post(args);
Interlocked.Increment(ref _notificationBufferInputCount);
_notificationBufferBlock.Post(args);
}
}

Expand Down Expand Up @@ -371,7 +394,7 @@ private void InitializeMetrics()
() => new Measurement<long>(_sinkBlock.InputCount, _metrics.TagList), "Messages",
"Telemetry messages queued for sending upstream.");
_meter.CreateObservableUpDownCounter("iiot_edge_publisher_batch_input_queue_size",
() => new Measurement<long>(_batchDataSetMessageBlock.OutputCount, _metrics.TagList), "Notifications",
() => new Measurement<long>(_notificationBufferInputCount, _metrics.TagList), "Notifications",
"Telemetry messages queued for sending upstream.");
_meter.CreateObservableUpDownCounter("iiot_edge_publisher_encoding_input_queue_size",
() => new Measurement<long>(_encodingBlock.InputCount, _metrics.TagList), "Notifications",
Expand Down Expand Up @@ -401,6 +424,7 @@ private void InitializeMetrics()
private double UpTime => (DateTime.UtcNow - _startTime).TotalSeconds;
private long _messagesSentCount;
private long _sinkBlockInputDroppedCount;
private long _notificationBufferInputCount;
private DateTime _dataFlowStartTime = DateTime.MinValue;
private readonly int _maxNotificationsPerMessage;
private readonly int _maxNetworkMessageSize;
Expand All @@ -412,7 +436,7 @@ private void InitializeMetrics()
private readonly ILogger _logger;
private readonly IWriterGroupDiagnostics? _diagnostics;
private readonly bool _logNotifications;
private readonly BatchBlock<IOpcUaSubscriptionNotification> _batchDataSetMessageBlock;
private readonly BatchBlock<IOpcUaSubscriptionNotification> _notificationBufferBlock;
private readonly TransformManyBlock<IOpcUaSubscriptionNotification[], (IEvent, Action)> _encodingBlock;
private readonly ActionBlock<(IEvent, Action)> _sinkBlock;
private readonly DateTime _startTime = DateTime.UtcNow;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,20 +97,24 @@ public sealed class OpcUaClientConfig : PostConfigureOptionBase<OpcUaClientOptio
/// <inheritdoc/>
public override void PostConfigure(string? name, OpcUaClientOptions options)
{
if (options.ApplicationName == null)
if (string.IsNullOrEmpty(options.ApplicationName))
{
options.ApplicationName =
GetStringOrDefault(ApplicationNameKey, ApplicationNameDefault);
options.ApplicationName = GetStringOrDefault(ApplicationNameKey);
if (string.IsNullOrEmpty(options.ApplicationName) ||
options.ApplicationName == "Azure.IIoT.OpcUa.Publisher.Module")
{
options.ApplicationName = ApplicationNameDefault;
}
}

if (options.ApplicationUri == null)
if (string.IsNullOrEmpty(options.ApplicationUri))
{
options.ApplicationUri = GetStringOrDefault(ApplicationUriKey,
string.Format(CultureInfo.InvariantCulture,
ApplicationUriDefault, options.ApplicationName));
}

if (options.ProductUri == null)
if (string.IsNullOrEmpty(options.ProductUri))
{
options.ProductUri = GetStringOrDefault(ProductUriKey,
ProductUriDefault);
Expand Down