Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

StreamConfiguration and ConsumerConfiguration metadata support #756

Merged
merged 6 commits into from
Apr 3, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 14 additions & 1 deletion src/NATS.Client/Internals/JsonUtils.cs
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ public static Dictionary<string, string> StringStringDictionay(JSONNode node, st
{
temp[key] = meta[key];
}
return temp.Count == 0 ? null : temp;
return temp;
}

public static MsgHeader AsHeaders(JSONNode node, string field)
Expand Down Expand Up @@ -374,6 +374,19 @@ public static void AddField(JSONObject o, string field, IList<string> values)
}
}

public static void AddField<T>(JSONObject o, string field, IList<T> values) where T : JsonSerializable
{
if (values != null && values.Count > 0)
{
JSONArray ja = new JSONArray();
foreach (JsonSerializable v in values)
{
ja.Add(v.ToJsonNode());
}
o[field] = ja;
}
}

public static void AddField(JSONObject o, string field, MsgHeader headers)
{
if (headers != null && headers.Count > 0)
Expand Down
116 changes: 116 additions & 0 deletions src/NATS.Client/Internals/ServerVersion.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
// Copyright 2023 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

using System;

namespace NATS.Client.Internals
{
internal class ServerVersion : IComparable<ServerVersion> {
private const string NoExtra = "~";

readonly int major;
readonly int minor;
readonly int patch;
readonly string extra;

internal ServerVersion(String v) {
string[] split = v.Replace("v", "").Replace("-", ".").Split('.');
if (v.StartsWith("v")) {
split = v.Substring(1).Replace("-", ".").Split('.');
}
else {
split = v.Replace("-", ".").Split('.');
}
int mjr;
int mnr;
int ptch;
if (int.TryParse(split[0], out mjr) &&
int.TryParse(split[1], out mnr) &&
int.TryParse(split[2], out ptch))
{
major = mjr;
minor = mnr;
patch = ptch;
string xtra = null;
for (int i = 3; i < split.Length; i++)
{
if (i == 3)
{
xtra = "-" + split[i];
}
else
{
//noinspection StringConcatenationInLoop
xtra = xtra + "." + split[i];
}
}
extra = xtra;
}
else
{
major = 0;
minor = 0;
patch = 0;
extra = null;
}
}

public override string ToString()
{
return $"{major}.{minor}.{patch}" + (extra == null ? "" : extra);
}

public int CompareTo(ServerVersion o)
{
int c = major.CompareTo(o.major);
if (c == 0) {
c = minor.CompareTo(o.minor);
if (c == 0) {
c = patch.CompareTo(o.patch);
if (c == 0) {
if (extra == null) {
c = o.extra == null ? 0 : 1;
}
else if (o.extra == null) {
c = -1;
}
else {
c = extra.CompareTo(o.extra);
}
}
}
}
return c;
}

public static bool IsNewer(String v, String than) {
return new ServerVersion(v).CompareTo(new ServerVersion(than)) > 0;
}

public static bool IsSame(String v, String than) {
return new ServerVersion(v).CompareTo(new ServerVersion(than)) == 0;
}

public static bool IsOlder(String v, String than) {
return new ServerVersion(v).CompareTo(new ServerVersion(than)) < 0;
}

public static bool IsSameOrOlder(String v, String than) {
return new ServerVersion(v).CompareTo(new ServerVersion(than)) <= 0;
}

public static bool IsSameOrNewer(String v, String than) {
return new ServerVersion(v).CompareTo(new ServerVersion(than)) >= 0;
}
}
}
15 changes: 14 additions & 1 deletion src/NATS.Client/Internals/Token.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,17 @@
using static NATS.Client.Internals.NatsConstants;
// Copyright 2017-2023 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

using static NATS.Client.Internals.NatsConstants;

namespace NATS.Client.Internals
{
Expand Down
40 changes: 39 additions & 1 deletion src/NATS.Client/Internals/Validator.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,17 @@
using System;
// Copyright 2020-2023 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

using System;
using System.Collections.Generic;
using System.Text;
using System.Text.RegularExpressions;
Expand Down Expand Up @@ -623,5 +636,30 @@ public static bool IsSemVer(String s)
{
return Regex.IsMatch(s, SemVerPattern);
}

public static bool DictionariesEqual(IDictionary<string, string> d1, IDictionary<string, string> d2)
{
if (d1 == d2)
{
return true;
}

if (d1 == null || d2 == null || d1.Count != d2.Count)
{
return false;
}

foreach (KeyValuePair<string, string> pair in d1)
{
string value;
if (!d2.TryGetValue(pair.Key, out value) ||
!pair.Value.Equals(value))
{
return false;
}
}

return true;
}
}
}
37 changes: 34 additions & 3 deletions src/NATS.Client/JetStream/ConsumerConfiguration.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2021 The NATS Authors
// Copyright 2021-2023 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at:
Expand Down Expand Up @@ -86,7 +86,8 @@ public sealed class ConsumerConfiguration : JsonSerializable
public bool HeadersOnly => _headersOnly ?? false;
public bool MemStorage => _memStorage ?? false;
public IList<Duration> Backoff { get; }

public Dictionary<string, string> Metadata { get; }

internal ConsumerConfiguration(string json) : this(JSON.Parse(json)) {}

internal ConsumerConfiguration(JSONNode ccNode)
Expand Down Expand Up @@ -122,6 +123,7 @@ internal ConsumerConfiguration(JSONNode ccNode)
_memStorage = ccNode[ApiConstants.MemStorage].AsBool;

Backoff = DurationList(ccNode, ApiConstants.Backoff);
Metadata = JsonUtils.StringStringDictionay(ccNode, ApiConstants.Metadata);
}

private ConsumerConfiguration(ConsumerConfigurationBuilder builder)
Expand Down Expand Up @@ -157,6 +159,7 @@ private ConsumerConfiguration(ConsumerConfigurationBuilder builder)
_numReplicas = builder._numReplicas;

Backoff = builder._backoff;
Metadata = builder._metadata;
}

public override JSONNode ToJsonNode()
Expand Down Expand Up @@ -190,7 +193,7 @@ public override JSONNode ToJsonNode()
AddField(o, ApiConstants.Backoff, Backoff);
AddField(o, ApiConstants.NumReplicas, NumReplicas);
AddField(o, ApiConstants.MemStorage, MemStorage);

AddField(o, ApiConstants.Metadata, Metadata);
return o;
}

Expand Down Expand Up @@ -229,6 +232,12 @@ internal IList<string> GetChanges(ConsumerConfiguration server)
RecordWouldBeChange(DeliverGroup, server.DeliverGroup, "DeliverGroup", changes);

if (!Backoff.SequenceEqual(server.Backoff)) { changes.Add("Backoff"); }

if (!Validator.DictionariesEqual(Metadata, server.Metadata))
{
changes.Add("Metadata");
}

return changes;
}

Expand Down Expand Up @@ -326,6 +335,7 @@ public sealed class ConsumerConfigurationBuilder
internal bool? _headersOnly;
internal bool? _memStorage;
internal IList<Duration> _backoff = new List<Duration>();
internal Dictionary<string, string> _metadata = new Dictionary<string, string>();

public ConsumerConfigurationBuilder() {}

Expand Down Expand Up @@ -363,6 +373,10 @@ public ConsumerConfigurationBuilder(ConsumerConfiguration cc)
_headersOnly = cc._headersOnly;
_memStorage = cc._memStorage;
_backoff = new List<Duration>(cc.Backoff);
foreach (string key in cc.Metadata.Keys)
{
_metadata[key] = cc.Metadata[key];
}
}

/// <summary>
Expand Down Expand Up @@ -797,6 +811,23 @@ public ConsumerConfigurationBuilder WithBackoff(params long[] backoffsMillis) {
return this;
}

/// <summary>
/// Sets the metadata for the configuration
/// </summary>
/// <param name="metadata">the metadata dictionary</param>
/// <returns>The ConsumerConfigurationBuilder</returns>
public ConsumerConfigurationBuilder WithMetadata(Dictionary<string, string> metadata) {
_metadata.Clear();
if (metadata != null)
{
foreach (string key in metadata.Keys)
{
_metadata[key] = metadata[key];
}
}
return this;
}

/// <summary>
/// Builds the ConsumerConfiguration
/// </summary>
Expand Down
33 changes: 22 additions & 11 deletions src/NATS.Client/JetStream/JetStream.cs
Original file line number Diff line number Diff line change
Expand Up @@ -236,7 +236,8 @@ Subscription CreateSubscription(string subject, string queueName,
string inboxDeliver = userCC.DeliverSubject;

// 3. Does this consumer already exist?
if (consumerName != null) {
if (consumerName != null)
{
ConsumerInfo serverInfo = LookupConsumerInfo(stream, consumerName);

if (serverInfo != null) { // the consumer for that durable already exists
Expand All @@ -245,36 +246,46 @@ Subscription CreateSubscription(string subject, string queueName,
// check to see if the user sent a different version than the server has
// modifications are not allowed
IList<string> changes = userCC.GetChanges(serverCC);
if (changes.Count > 0) {
if (changes.Count > 0)
{
throw JsSubExistingConsumerCannotBeModified.Instance($"[{string.Join(",", changes)}]");
}

if (isPullMode) {
if (!string.IsNullOrWhiteSpace(serverCC.DeliverSubject)) {
if (isPullMode)
{
if (!string.IsNullOrWhiteSpace(serverCC.DeliverSubject))
{
throw JsSubConsumerAlreadyConfiguredAsPush.Instance();
}
}
else if (string.IsNullOrWhiteSpace(serverCC.DeliverSubject)) {
else if (string.IsNullOrWhiteSpace(serverCC.DeliverSubject))
{
throw JsSubConsumerAlreadyConfiguredAsPull.Instance();
}

if (string.IsNullOrWhiteSpace(serverCC.DeliverGroup)) {
if (string.IsNullOrWhiteSpace(serverCC.DeliverGroup))
{
// lookedUp was null/empty, means existing consumer is not a queue consumer
if (qgroup == null) {
if (qgroup == null)
{
// ok fine, no queue requested and the existing consumer is also not a queue consumer
// we must check if the consumer is in use though
if (serverInfo.PushBound) {
if (serverInfo.PushBound)
{
throw JsSubConsumerAlreadyBound.Instance();
}
}
else { // else they requested a queue but this durable was not configured as queue
else
{ // else they requested a queue but this durable was not configured as queue
throw JsSubExistingConsumerNotQueue.Instance();
}
}
else if (qgroup == null) {
else if (qgroup == null)
{
throw JsSubExistingConsumerIsQueue.Instance();
}
else if (!serverCC.DeliverGroup.Equals(qgroup)) {
else if (!serverCC.DeliverGroup.Equals(qgroup))
{
throw JsSubExistingQueueDoesNotMatchRequestedQueue.Instance();
}

Expand Down
Loading