Skip to content

Commit

Permalink
StreamConfiguration and ConsumerConfiguration metadata support (#756)
Browse files Browse the repository at this point in the history
  • Loading branch information
scottf authored Apr 3, 2023
1 parent 5cdd9a1 commit 1da3602
Show file tree
Hide file tree
Showing 26 changed files with 439 additions and 234 deletions.
15 changes: 14 additions & 1 deletion src/NATS.Client/Internals/JsonUtils.cs
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ public static Dictionary<string, string> StringStringDictionay(JSONNode node, st
{
temp[key] = meta[key];
}
return temp.Count == 0 ? null : temp;
return temp;
}

public static MsgHeader AsHeaders(JSONNode node, string field)
Expand Down Expand Up @@ -374,6 +374,19 @@ public static void AddField(JSONObject o, string field, IList<string> values)
}
}

public static void AddField<T>(JSONObject o, string field, IList<T> values) where T : JsonSerializable
{
if (values != null && values.Count > 0)
{
JSONArray ja = new JSONArray();
foreach (JsonSerializable v in values)
{
ja.Add(v.ToJsonNode());
}
o[field] = ja;
}
}

public static void AddField(JSONObject o, string field, MsgHeader headers)
{
if (headers != null && headers.Count > 0)
Expand Down
116 changes: 116 additions & 0 deletions src/NATS.Client/Internals/ServerVersion.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
// Copyright 2023 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

using System;

namespace NATS.Client.Internals
{
internal class ServerVersion : IComparable<ServerVersion> {
private const string NoExtra = "~";

readonly int major;
readonly int minor;
readonly int patch;
readonly string extra;

internal ServerVersion(String v) {
string[] split = v.Replace("v", "").Replace("-", ".").Split('.');
if (v.StartsWith("v")) {
split = v.Substring(1).Replace("-", ".").Split('.');
}
else {
split = v.Replace("-", ".").Split('.');
}
int mjr;
int mnr;
int ptch;
if (int.TryParse(split[0], out mjr) &&
int.TryParse(split[1], out mnr) &&
int.TryParse(split[2], out ptch))
{
major = mjr;
minor = mnr;
patch = ptch;
string xtra = null;
for (int i = 3; i < split.Length; i++)
{
if (i == 3)
{
xtra = "-" + split[i];
}
else
{
//noinspection StringConcatenationInLoop
xtra = xtra + "." + split[i];
}
}
extra = xtra;
}
else
{
major = 0;
minor = 0;
patch = 0;
extra = null;
}
}

public override string ToString()
{
return $"{major}.{minor}.{patch}" + (extra == null ? "" : extra);
}

public int CompareTo(ServerVersion o)
{
int c = major.CompareTo(o.major);
if (c == 0) {
c = minor.CompareTo(o.minor);
if (c == 0) {
c = patch.CompareTo(o.patch);
if (c == 0) {
if (extra == null) {
c = o.extra == null ? 0 : 1;
}
else if (o.extra == null) {
c = -1;
}
else {
c = extra.CompareTo(o.extra);
}
}
}
}
return c;
}

public static bool IsNewer(String v, String than) {
return new ServerVersion(v).CompareTo(new ServerVersion(than)) > 0;
}

public static bool IsSame(String v, String than) {
return new ServerVersion(v).CompareTo(new ServerVersion(than)) == 0;
}

public static bool IsOlder(String v, String than) {
return new ServerVersion(v).CompareTo(new ServerVersion(than)) < 0;
}

public static bool IsSameOrOlder(String v, String than) {
return new ServerVersion(v).CompareTo(new ServerVersion(than)) <= 0;
}

public static bool IsSameOrNewer(String v, String than) {
return new ServerVersion(v).CompareTo(new ServerVersion(than)) >= 0;
}
}
}
15 changes: 14 additions & 1 deletion src/NATS.Client/Internals/Token.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,17 @@
using static NATS.Client.Internals.NatsConstants;
// Copyright 2017-2023 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

using static NATS.Client.Internals.NatsConstants;

namespace NATS.Client.Internals
{
Expand Down
40 changes: 39 additions & 1 deletion src/NATS.Client/Internals/Validator.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,17 @@
using System;
// Copyright 2020-2023 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

using System;
using System.Collections.Generic;
using System.Text;
using System.Text.RegularExpressions;
Expand Down Expand Up @@ -623,5 +636,30 @@ public static bool IsSemVer(String s)
{
return Regex.IsMatch(s, SemVerPattern);
}

public static bool DictionariesEqual(IDictionary<string, string> d1, IDictionary<string, string> d2)
{
if (d1 == d2)
{
return true;
}

if (d1 == null || d2 == null || d1.Count != d2.Count)
{
return false;
}

foreach (KeyValuePair<string, string> pair in d1)
{
string value;
if (!d2.TryGetValue(pair.Key, out value) ||
!pair.Value.Equals(value))
{
return false;
}
}

return true;
}
}
}
37 changes: 34 additions & 3 deletions src/NATS.Client/JetStream/ConsumerConfiguration.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2021 The NATS Authors
// Copyright 2021-2023 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at:
Expand Down Expand Up @@ -86,7 +86,8 @@ public sealed class ConsumerConfiguration : JsonSerializable
public bool HeadersOnly => _headersOnly ?? false;
public bool MemStorage => _memStorage ?? false;
public IList<Duration> Backoff { get; }

public Dictionary<string, string> Metadata { get; }

internal ConsumerConfiguration(string json) : this(JSON.Parse(json)) {}

internal ConsumerConfiguration(JSONNode ccNode)
Expand Down Expand Up @@ -122,6 +123,7 @@ internal ConsumerConfiguration(JSONNode ccNode)
_memStorage = ccNode[ApiConstants.MemStorage].AsBool;

Backoff = DurationList(ccNode, ApiConstants.Backoff);
Metadata = JsonUtils.StringStringDictionay(ccNode, ApiConstants.Metadata);
}

private ConsumerConfiguration(ConsumerConfigurationBuilder builder)
Expand Down Expand Up @@ -157,6 +159,7 @@ private ConsumerConfiguration(ConsumerConfigurationBuilder builder)
_numReplicas = builder._numReplicas;

Backoff = builder._backoff;
Metadata = builder._metadata;
}

public override JSONNode ToJsonNode()
Expand Down Expand Up @@ -190,7 +193,7 @@ public override JSONNode ToJsonNode()
AddField(o, ApiConstants.Backoff, Backoff);
AddField(o, ApiConstants.NumReplicas, NumReplicas);
AddField(o, ApiConstants.MemStorage, MemStorage);

AddField(o, ApiConstants.Metadata, Metadata);
return o;
}

Expand Down Expand Up @@ -229,6 +232,12 @@ internal IList<string> GetChanges(ConsumerConfiguration server)
RecordWouldBeChange(DeliverGroup, server.DeliverGroup, "DeliverGroup", changes);

if (!Backoff.SequenceEqual(server.Backoff)) { changes.Add("Backoff"); }

if (!Validator.DictionariesEqual(Metadata, server.Metadata))
{
changes.Add("Metadata");
}

return changes;
}

Expand Down Expand Up @@ -326,6 +335,7 @@ public sealed class ConsumerConfigurationBuilder
internal bool? _headersOnly;
internal bool? _memStorage;
internal IList<Duration> _backoff = new List<Duration>();
internal Dictionary<string, string> _metadata = new Dictionary<string, string>();

public ConsumerConfigurationBuilder() {}

Expand Down Expand Up @@ -363,6 +373,10 @@ public ConsumerConfigurationBuilder(ConsumerConfiguration cc)
_headersOnly = cc._headersOnly;
_memStorage = cc._memStorage;
_backoff = new List<Duration>(cc.Backoff);
foreach (string key in cc.Metadata.Keys)
{
_metadata[key] = cc.Metadata[key];
}
}

/// <summary>
Expand Down Expand Up @@ -797,6 +811,23 @@ public ConsumerConfigurationBuilder WithBackoff(params long[] backoffsMillis) {
return this;
}

/// <summary>
/// Sets the metadata for the configuration
/// </summary>
/// <param name="metadata">the metadata dictionary</param>
/// <returns>The ConsumerConfigurationBuilder</returns>
public ConsumerConfigurationBuilder WithMetadata(Dictionary<string, string> metadata) {
_metadata.Clear();
if (metadata != null)
{
foreach (string key in metadata.Keys)
{
_metadata[key] = metadata[key];
}
}
return this;
}

/// <summary>
/// Builds the ConsumerConfiguration
/// </summary>
Expand Down
33 changes: 22 additions & 11 deletions src/NATS.Client/JetStream/JetStream.cs
Original file line number Diff line number Diff line change
Expand Up @@ -236,7 +236,8 @@ Subscription CreateSubscription(string subject, string queueName,
string inboxDeliver = userCC.DeliverSubject;

// 3. Does this consumer already exist?
if (consumerName != null) {
if (consumerName != null)
{
ConsumerInfo serverInfo = LookupConsumerInfo(stream, consumerName);

if (serverInfo != null) { // the consumer for that durable already exists
Expand All @@ -245,36 +246,46 @@ Subscription CreateSubscription(string subject, string queueName,
// check to see if the user sent a different version than the server has
// modifications are not allowed
IList<string> changes = userCC.GetChanges(serverCC);
if (changes.Count > 0) {
if (changes.Count > 0)
{
throw JsSubExistingConsumerCannotBeModified.Instance($"[{string.Join(",", changes)}]");
}

if (isPullMode) {
if (!string.IsNullOrWhiteSpace(serverCC.DeliverSubject)) {
if (isPullMode)
{
if (!string.IsNullOrWhiteSpace(serverCC.DeliverSubject))
{
throw JsSubConsumerAlreadyConfiguredAsPush.Instance();
}
}
else if (string.IsNullOrWhiteSpace(serverCC.DeliverSubject)) {
else if (string.IsNullOrWhiteSpace(serverCC.DeliverSubject))
{
throw JsSubConsumerAlreadyConfiguredAsPull.Instance();
}

if (string.IsNullOrWhiteSpace(serverCC.DeliverGroup)) {
if (string.IsNullOrWhiteSpace(serverCC.DeliverGroup))
{
// lookedUp was null/empty, means existing consumer is not a queue consumer
if (qgroup == null) {
if (qgroup == null)
{
// ok fine, no queue requested and the existing consumer is also not a queue consumer
// we must check if the consumer is in use though
if (serverInfo.PushBound) {
if (serverInfo.PushBound)
{
throw JsSubConsumerAlreadyBound.Instance();
}
}
else { // else they requested a queue but this durable was not configured as queue
else
{ // else they requested a queue but this durable was not configured as queue
throw JsSubExistingConsumerNotQueue.Instance();
}
}
else if (qgroup == null) {
else if (qgroup == null)
{
throw JsSubExistingConsumerIsQueue.Instance();
}
else if (!serverCC.DeliverGroup.Equals(qgroup)) {
else if (!serverCC.DeliverGroup.Equals(qgroup))
{
throw JsSubExistingQueueDoesNotMatchRequestedQueue.Instance();
}

Expand Down
Loading

0 comments on commit 1da3602

Please sign in to comment.