diff --git a/README.md b/README.md index 11d814f4f..a05e86e9a 100755 --- a/README.md +++ b/README.md @@ -972,51 +972,54 @@ In addition to some generic validation messages for values in builders, there ar * Consumer creation * Object Store operations -| Name | Group | Code | Description | -|----------------------------------------------|-------|-------|-----------------------------------------------------------------------------------------------------| -| JsSoDurableMismatch | SO | 90101 | Builder durable must match the consumer configuration durable if both are provided. | -| JsSoDeliverGroupMismatch | SO | 90102 | Builder deliver group must match the consumer configuration deliver group if both are provided. | -| JsSoDeliverSubjectMismatch | SO | 90103 | Builder deliver subject must match the consumer configuration deliver subject if both are provided. | -| JsSoOrderedNotAllowedWithBind | SO | 90104 | Bind is not allowed with an ordered consumer. | -| JsSoOrderedNotAllowedWithDeliverGroup | SO | 90105 | Deliver group is not allowed with an ordered consumer. | -| JsSoOrderedNotAllowedWithDurable | SO | 90106 | Durable is not allowed with an ordered consumer. | -| JsSoOrderedNotAllowedWithDeliverSubject | SO | 90107 | Deliver subject is not allowed with an ordered consumer. | -| JsSoOrderedRequiresAckPolicyNone | SO | 90108 | Ordered consumer requires Ack Policy None. | -| JsSoOrderedRequiresMaxDeliverOfOne | SO | 90109 | Max deliver is limited to 1 with an ordered consumer. | -| JsSoNameMismatch | SO | 90110 | Builder name must match the consumer configuration name if both are provided. | -| JsSoOrderedMemStorageNotSuppliedOrTrue | SO | 90111 | Mem Storage must be true if supplied. | -| JsSoOrderedReplicasNotSuppliedOrOne | SO | 90112 | Replicas must be 1 if supplied. | -| JsSubPullCantHaveDeliverGroup | SUB | 90001 | Pull subscriptions can't have a deliver group. | -| JsSubPullCantHaveDeliverSubject | SUB | 90002 | Pull subscriptions can't have a deliver subject. | -| JsSubPushCantHaveMaxPullWaiting | SUB | 90003 | Push subscriptions cannot supply max pull waiting. | -| JsSubQueueDeliverGroupMismatch | SUB | 90004 | Queue / deliver group mismatch. | -| JsSubFcHbNotValidPull | SUB | 90005 | Flow Control and/or heartbeat is not valid with a pull subscription. | -| JsSubFcHbNotValidQueue | SUB | 90006 | Flow Control and/or heartbeat is not valid in queue mode. | -| JsSubNoMatchingStreamForSubject | SUB | 90007 | No matching streams for subject. | -| JsSubConsumerAlreadyConfiguredAsPush | SUB | 90008 | Consumer is already configured as a push consumer. | -| JsSubConsumerAlreadyConfiguredAsPull | SUB | 90009 | Consumer is already configured as a pull consumer. | -| _removed_ | SUB | 90010 | | -| JsSubSubjectDoesNotMatchFilter | SUB | 90011 | Subject does not match consumer configuration filter. | -| JsSubConsumerAlreadyBound | SUB | 90012 | Consumer is already bound to a subscription. | -| JsSubExistingConsumerNotQueue | SUB | 90013 | Existing consumer is not configured as a queue / deliver group. | -| JsSubExistingConsumerIsQueue | SUB | 90014 | Existing consumer is configured as a queue / deliver group. | -| JsSubExistingQueueDoesNotMatchRequestedQueue | SUB | 90015 | Existing consumer deliver group does not match requested queue / deliver group. | -| JsSubExistingConsumerCannotBeModified | SUB | 90016 | Existing consumer cannot be modified. | -| JsSubConsumerNotFoundRequiredInBind | SUB | 90017 | Consumer not found, required in bind mode. | -| JsSubOrderedNotAllowOnQueues | SUB | 90018 | Ordered consumer not allowed on queues. | -| JsSubPushCantHaveMaxBatch | SUB | 90019 | Push subscriptions cannot supply max batch. | -| JsSubPushCantHaveMaxBytes | SUB | 90020 | Push subscriptions cannot supply max bytes. | -| JsConsumerCreate290NotAvailable | CON | 90301 | Name field not valid when v2.9.0 consumer create api is not available. | -| JsConsumerNameDurableMismatch | CON | 90302 | Name must match durable if both are supplied. | -| OsObjectNotFound | OS | 90201 | The object was not found. | -| OsObjectIsDeleted | OS | 90202 | The object is deleted. | -| OsObjectAlreadyExists | OS | 90203 | An object with that name already exists. | -| OsCantLinkToLink | OS | 90204 | A link cannot link to another link. | -| OsGetDigestMismatch | OS | 90205 | Digest does not match meta data. | -| OsGetChunksMismatch | OS | 90206 | Number of chunks does not match meta data. | -| OsGetSizeMismatch | OS | 90207 | Total size does not match meta data. | -| OsGetLinkToBucket | OS | 90208 | Cannot get object, it is a link to a bucket. | -| OsLinkNotAllowOnPut | OS | 90209 | Link not allowed in metadata when putting an object. | +| Name | Group | Code | Description | +|----------------------------------------------|-------|-------|----------------------------------------------------------------------------------------------------------------| +| JsSoDurableMismatch | SO | 90101 | Builder durable must match the consumer configuration durable if both are provided. | +| JsSoDeliverGroupMismatch | SO | 90102 | Builder deliver group must match the consumer configuration deliver group if both are provided. | +| JsSoDeliverSubjectMismatch | SO | 90103 | Builder deliver subject must match the consumer configuration deliver subject if both are provided. | +| JsSoOrderedNotAllowedWithBind | SO | 90104 | Bind is not allowed with an ordered consumer. | +| JsSoOrderedNotAllowedWithDeliverGroup | SO | 90105 | Deliver group is not allowed with an ordered consumer. | +| JsSoOrderedNotAllowedWithDurable | SO | 90106 | Durable is not allowed with an ordered consumer. | +| JsSoOrderedNotAllowedWithDeliverSubject | SO | 90107 | Deliver subject is not allowed with an ordered consumer. | +| JsSoOrderedRequiresAckPolicyNone | SO | 90108 | Ordered consumer requires Ack Policy None. | +| JsSoOrderedRequiresMaxDeliverOfOne | SO | 90109 | Max deliver is limited to 1 with an ordered consumer. | +| JsSoNameMismatch | SO | 90110 | Builder name must match the consumer configuration name if both are provided. | +| JsSoOrderedMemStorageNotSuppliedOrTrue | SO | 90111 | Mem Storage must be true if supplied. | +| JsSoOrderedReplicasNotSuppliedOrOne | SO | 90112 | Replicas must be 1 if supplied. | +| JsSoNameOrDurableRequiredForBind | SO | 90113 | Name or Durable required for Bind. | +| JsSubPullCantHaveDeliverGroup | SUB | 90001 | Pull subscriptions can't have a deliver group. | +| JsSubPullCantHaveDeliverSubject | SUB | 90002 | Pull subscriptions can't have a deliver subject. | +| JsSubPushCantHaveMaxPullWaiting | SUB | 90003 | Push subscriptions cannot supply max pull waiting. | +| JsSubQueueDeliverGroupMismatch | SUB | 90004 | Queue / deliver group mismatch. | +| JsSubFcHbNotValidPull | SUB | 90005 | Flow Control and/or heartbeat is not valid with a pull subscription. | +| JsSubFcHbNotValidQueue | SUB | 90006 | Flow Control and/or heartbeat is not valid in queue mode. | +| JsSubNoMatchingStreamForSubject | SUB | 90007 | No matching streams for subject. | +| JsSubConsumerAlreadyConfiguredAsPush | SUB | 90008 | Consumer is already configured as a push consumer. | +| JsSubConsumerAlreadyConfiguredAsPull | SUB | 90009 | Consumer is already configured as a pull consumer. | +| _removed_ | SUB | 90010 | | +| JsSubSubjectDoesNotMatchFilter | SUB | 90011 | Subject does not match consumer configuration filter. | +| JsSubConsumerAlreadyBound | SUB | 90012 | Consumer is already bound to a subscription. | +| JsSubExistingConsumerNotQueue | SUB | 90013 | Existing consumer is not configured as a queue / deliver group. | +| JsSubExistingConsumerIsQueue | SUB | 90014 | Existing consumer is configured as a queue / deliver group. | +| JsSubExistingQueueDoesNotMatchRequestedQueue | SUB | 90015 | Existing consumer deliver group does not match requested queue / deliver group. | +| JsSubExistingConsumerCannotBeModified | SUB | 90016 | Existing consumer cannot be modified. | +| JsSubConsumerNotFoundRequiredInBind | SUB | 90017 | Consumer not found, required in bind mode. | +| JsSubOrderedNotAllowOnQueues | SUB | 90018 | Ordered consumer not allowed on queues. | +| JsSubPushCantHaveMaxBatch | SUB | 90019 | Push subscriptions cannot supply max batch. | +| JsSubPushCantHaveMaxBytes | SUB | 90020 | Push subscriptions cannot supply max bytes. | +| JsSubSubjectNeededToLookupStream | SUB | 90022 | Subject needed to lookup stream. Provide either a subscribe subject or a ConsumerConfiguration filter subject. | +| JsConsumerCreate290NotAvailable | CON | 90301 | Name field not valid when v2.9.0 consumer create api is not available. | +| JsConsumerNameDurableMismatch | CON | 90302 | Name must match durable if both are supplied. | +| JsMultipleFilterSubjects210NotAvailable | CON | 90303 | Multiple filter subjects not available until server version 2.10.0. | +| OsObjectNotFound | OS | 90201 | The object was not found. | +| OsObjectIsDeleted | OS | 90202 | The object is deleted. | +| OsObjectAlreadyExists | OS | 90203 | An object with that name already exists. | +| OsCantLinkToLink | OS | 90204 | A link cannot link to another link. | +| OsGetDigestMismatch | OS | 90205 | Digest does not match meta data. | +| OsGetChunksMismatch | OS | 90206 | Number of chunks does not match meta data. | +| OsGetSizeMismatch | OS | 90207 | Total size does not match meta data. | +| OsGetLinkToBucket | OS | 90208 | Cannot get object, it is a link to a bucket. | +| OsLinkNotAllowOnPut | OS | 90209 | Link not allowed in metadata when putting an object. | ### Message Acknowledgements diff --git a/src/NATS.Client/Exceptions.cs b/src/NATS.Client/Exceptions.cs index 33e40ded2..2bf7d1b7b 100644 --- a/src/NATS.Client/Exceptions.cs +++ b/src/NATS.Client/Exceptions.cs @@ -245,6 +245,8 @@ public sealed class ClientExDetail public static readonly ClientExDetail JsSubOrderedNotAllowOnQueues = new ClientExDetail(Sub, 90018, "Ordered consumer not allowed on queues."); public static readonly ClientExDetail JsSubPushCantHaveMaxBatch = new ClientExDetail(Sub, 90019, "Push subscriptions cannot supply max batch."); public static readonly ClientExDetail JsSubPushCantHaveMaxBytes = new ClientExDetail(Sub, 90020, "Push subscriptions cannot supply max bytes."); + public static readonly ClientExDetail JsSubSubjectNeededToLookupStream = new ClientExDetail(Sub, 90022, "Subject needed to lookup stream. Provide either a subscribe subject or a ConsumerConfiguration filter subject."); + /* Not used in this client. */ // public static readonly ClientExDetail JsSubPushAsyncCantSetPending = new ClientExDetail(Sub, 90021, "Pending limits must be set directly on the dispatcher."); public static readonly ClientExDetail JsSoDurableMismatch = new ClientExDetail(So, 90101, "Builder durable must match the consumer configuration durable if both are provided."); @@ -259,6 +261,7 @@ public sealed class ClientExDetail public static readonly ClientExDetail JsSoNameMismatch = new ClientExDetail(So, 90110, "Builder name must match the consumer configuration name if both are provided."); public static readonly ClientExDetail JsSoOrderedMemStorageNotSuppliedOrTrue = new ClientExDetail(So, 90111, "Mem Storage must be true if supplied."); public static readonly ClientExDetail JsSoOrderedReplicasNotSuppliedOrOne = new ClientExDetail(So, 90112, "Replicas must be 1 if supplied."); + public static readonly ClientExDetail JsSoNameOrDurableRequiredForBind = new ClientExDetail(So, 90113, "Name or Durable required for Bind."); public static readonly ClientExDetail OsObjectNotFound = new ClientExDetail(Os, 90201, "The object was not found."); public static readonly ClientExDetail OsObjectIsDeleted = new ClientExDetail(Os, 90202, "The object is deleted."); @@ -272,7 +275,8 @@ public sealed class ClientExDetail public static readonly ClientExDetail JsConsumerCreate290NotAvailable = new ClientExDetail(Con, 90301, "Name field not valid when v2.9.0 consumer create api is not available."); public static readonly ClientExDetail JsConsumerNameDurableMismatch = new ClientExDetail(Con, 90302, "Name must match durable if both are supplied."); - + public static readonly ClientExDetail JsMultipleFilterSubjects210NotAvailable = new ClientExDetail(Con, 90303, "Multiple filter subjects not available until server version 2.10.0."); + private const string Sub = "SUB"; private const string So = "SO"; private const string Os = "OS"; diff --git a/src/NATS.Client/Internals/Validator.cs b/src/NATS.Client/Internals/Validator.cs index 4acee35f1..79db2c3ff 100644 --- a/src/NATS.Client/Internals/Validator.cs +++ b/src/NATS.Client/Internals/Validator.cs @@ -45,30 +45,83 @@ internal static void Required(IDictionary d, string } } - internal static string ValidateSubject(string s, bool required) - { - return ValidateSubject(s, "Subject", required, false); + /* + cannot contain spaces \r \n \t + cannot start or end with subject token delimiter . + some things don't allow it to end greater + */ + public static string ValidateSubjectTerm(string subject, string label, bool required) + { + Tuple t = IsValidSubjectTerm(subject, label, required); + if (t.Item1) + { + return t.Item2; + } + throw new ArgumentException(t.Item2); } - - public static string ValidateSubject(string subject, string label, bool required, bool cantEndWithGt) { - if (EmptyAsNull(subject) == null) { + + /* + * If is valid, tuple item1 is true and item2 is the subject + * If is not valid, tuple item1 is false and item2 is the error message + */ + internal static Tuple IsValidSubjectTerm(string subject, string label, bool required) { + subject = EmptyAsNull(subject); + if (subject == null) { if (required) { - throw new ArgumentException($"{label} cannot be null or empty."); + return new Tuple(false, $"{label} cannot be null or empty."); } - return null; + return new Tuple(true, null); + } + if (subject.EndsWith(".")) { + return new Tuple(false, $"{label} cannot end with '.'"); } + string[] segments = subject.Split('.'); - for (int x = 0; x < segments.Length; x++) { - string segment = segments[x]; - if (segment.Equals(">")) { - if (cantEndWithGt || x != segments.Length - 1) { // if it can end with gt, gt must be last segment - throw new ArgumentException(label + " cannot contain '>'"); + for (int seg = 0; seg < segments.Length; seg++) { + string segment = segments[seg]; + int sl = segment.Length; + if (sl == 0) { + if (seg == 0) { + return new Tuple(false, $"{label} cannot start with '.'"); + } + return new Tuple(false, $"{label} segment cannot be empty"); + } + else { + for (int m = 0; m < sl; m++) { + char c = segment[m]; + switch (c) { + case ' ': + case '\r': + case '\n': + case '\t': + return new Tuple(false, $"{label} cannot contain space, tab, carriage return or linefeed character"); + case '*': + if (sl != 1) { + return new Tuple(false, $"{label} wildcard improperly placed."); + } + break; + case '>': + if (sl != 1 || seg != segments.Length - 1) { + return new Tuple(false, $"{label} wildcard improperly placed."); + } + break; + } } - } - else if (!segment.Equals("*") && NotPrintable(segment)) { - throw new ArgumentException(label + " must be printable characters only."); } } + return new Tuple(true, subject); + } + + public static string ValidateSubject(string s, bool required) + { + return ValidateSubject(s, "Subject", required, false); + } + + public static string ValidateSubject(string subject, string label, bool required, bool cantEndWithGt) { + subject = ValidateSubjectTerm(subject, label, required); + if (subject != null && cantEndWithGt && subject.EndsWith(".>")) { + throw new ArgumentException($"{label} last segment cannot be '>'"); + } return subject; } @@ -444,7 +497,7 @@ internal static long ValidateNotNegative(long l, string label) { // ---------------------------------------------------------------------------------------------------- // Helpers // ---------------------------------------------------------------------------------------------------- - + [Obsolete("This property is obsolete. use string.IsNullOrWhiteSpace(string) instead.", false)] public static bool NullOrEmpty(string s) { return string.IsNullOrWhiteSpace(s); @@ -604,13 +657,28 @@ public static bool NotPrintableOrHasWildGtDollar(string s) { public static string EmptyAsNull(string s) { - return NullOrEmpty(s) ? null : s; + return string.IsNullOrWhiteSpace(s) ? null : s; } public static string EmptyOrNullAs(string s, string ifEmpty) { - return NullOrEmpty(s) ? ifEmpty : s; + return string.IsNullOrWhiteSpace(s) ? ifEmpty : s; + } + + public static IList EmptyAsNull(IList list) + { + return EmptyOrNull(list) ? null : list; + } + + public static bool EmptyOrNull(IList list) + { + return list == null || list.Count == 0; } + public static bool EmptyOrNull(TSource[] list) + { + return list == null || list.Length == 0; + } + public static bool ZeroOrLtMinus1(long l) { return l == 0 || l < -1; @@ -663,7 +731,7 @@ public static bool IsSemVer(string s) return Regex.IsMatch(s, SemVerPattern); } - public static bool SequenceEqual(IList l1, IList l2, bool nullSecondEqualsEmptyFirst = true) + public static bool SequenceEqual(IList l1, IList l2, bool nullSecondEqualsEmptyFirst = true) { if (l1 == null) { @@ -678,6 +746,29 @@ public static bool SequenceEqual(IList l1, IList l2, return l1.SequenceEqual(l2); } + // This function tests filter subject equivalency + // It does not care what order and also assumes that there are no duplicates. + // From the server: consumer subject filters cannot overlap [10138] + public static bool ConsumerFilterSubjectsAreEquivalent(IList l1, IList l2) + { + if (l1 == null || l1.Count == 0) + { + return l2 == null || l2.Count == 0; + } + + if (l2 == null || l1.Count != l2.Count) + { + return false; + } + + foreach (T t in l1) { + if (!l2.Contains(t)) { + return false; + } + } + return true; + } + public static bool DictionariesEqual(IDictionary d1, IDictionary d2) { if (d1 == d2) diff --git a/src/NATS.Client/JetStream/ApiConstants.cs b/src/NATS.Client/JetStream/ApiConstants.cs index 8fd07f55d..3a7181a74 100644 --- a/src/NATS.Client/JetStream/ApiConstants.cs +++ b/src/NATS.Client/JetStream/ApiConstants.cs @@ -69,6 +69,7 @@ internal static class ApiConstants internal const string External = "external"; internal const string Filter = "filter"; internal const string FilterSubject = "filter_subject"; + internal const string FilterSubjects = "filter_subjects"; internal const string FirstSequence = "first_seq"; internal const string FirstTs = "first_ts"; internal const string FlowControl = "flow_control"; diff --git a/src/NATS.Client/JetStream/ConsumerConfiguration.cs b/src/NATS.Client/JetStream/ConsumerConfiguration.cs index 02be2cf65..57d1bb285 100644 --- a/src/NATS.Client/JetStream/ConsumerConfiguration.cs +++ b/src/NATS.Client/JetStream/ConsumerConfiguration.cs @@ -65,7 +65,6 @@ public sealed class ConsumerConfiguration : JsonSerializable public string Name { get; } public string DeliverSubject { get; } public string DeliverGroup { get; } - public string FilterSubject { get; } public string SampleFrequency { get; } public DateTime StartTime { get; } public Duration AckWait { get; } @@ -75,8 +74,6 @@ public sealed class ConsumerConfiguration : JsonSerializable public ulong StartSeq => GetOrUnset(_startSeq); public int MaxDeliver => GetOrUnset(_maxDeliver); - [Obsolete("This property is obsolete. Use RateLimitBps.", false)] - public long RateLimit => (long)GetOrUnset(_rateLimitBps); public ulong RateLimitBps => GetOrUnset(_rateLimitBps); public int MaxAckPending => GetOrUnset(_maxAckPending); public int MaxPullWaiting => GetOrUnset(_maxPullWaiting); @@ -89,6 +86,17 @@ public sealed class ConsumerConfiguration : JsonSerializable public IList Backoff => _backoff ?? new List(); public IDictionary Metadata => _metadata ?? new Dictionary(); + internal IList _filterSubjects; + public string FilterSubject => + _filterSubjects == null || _filterSubjects.Count != 1 ? null : _filterSubjects[0]; + + public IList FilterSubjects => _filterSubjects; + + public bool HasMultipleFilterSubjects => _filterSubjects != null && FilterSubjects.Count > 1; + + [Obsolete("This property is obsolete. Use RateLimitBps.", false)] + public long RateLimit => (long)GetOrUnset(_rateLimitBps); + internal ConsumerConfiguration(string json) : this(JSON.Parse(json)) {} internal ConsumerConfiguration(JSONNode ccNode) @@ -102,7 +110,6 @@ internal ConsumerConfiguration(JSONNode ccNode) Name = ccNode[ApiConstants.Name].Value; DeliverSubject = ccNode[ApiConstants.DeliverSubject].Value; DeliverGroup = ccNode[ApiConstants.DeliverGroup].Value; - FilterSubject = ccNode[ApiConstants.FilterSubject].Value; SampleFrequency = ccNode[ApiConstants.SampleFreq].Value; StartTime = AsDate(ccNode[ApiConstants.OptStartTime]); @@ -125,6 +132,16 @@ internal ConsumerConfiguration(JSONNode ccNode) _backoff = DurationList(ccNode, ApiConstants.Backoff, true); _metadata = StringStringDictionary(ccNode, ApiConstants.Metadata, true); + string tempFs = EmptyAsNull(ccNode[ApiConstants.FilterSubject].Value); + if (tempFs == null) + { + _filterSubjects = EmptyAsNull(StringList(ccNode, ApiConstants.FilterSubjects)); + } + else + { + _filterSubjects = new List(); + _filterSubjects.Add(tempFs); + } } private ConsumerConfiguration(ConsumerConfigurationBuilder builder) @@ -138,7 +155,6 @@ private ConsumerConfiguration(ConsumerConfigurationBuilder builder) Name = builder._name; DeliverSubject = builder._deliverSubject; DeliverGroup = builder._deliverGroup; - FilterSubject = builder._filterSubject; SampleFrequency = builder._sampleFrequency; StartTime = builder._startTime; @@ -161,6 +177,7 @@ private ConsumerConfiguration(ConsumerConfigurationBuilder builder) _backoff = builder._backoff; _metadata = builder._metadata; + _filterSubjects = builder._filterSubjects; } public override JSONNode ToJsonNode() @@ -178,7 +195,6 @@ public override JSONNode ToJsonNode() AddField(o, ApiConstants.AckPolicy, AckPolicy.GetString()); AddField(o, ApiConstants.AckWait, AckWait); AddField(o, ApiConstants.MaxDeliver, MaxDeliver); - AddField(o, ApiConstants.FilterSubject, FilterSubject); AddField(o, ApiConstants.ReplayPolicy, ReplayPolicy.GetString()); AddField(o, ApiConstants.SampleFreq, SampleFrequency); AddField(o, ApiConstants.RateLimitBps, RateLimitBps); @@ -195,6 +211,17 @@ public override JSONNode ToJsonNode() AddField(o, ApiConstants.NumReplicas, NumReplicas); AddField(o, ApiConstants.MemStorage, MemStorage); AddField(o, ApiConstants.Metadata, Metadata); + if (_filterSubjects != null) + { + if (_filterSubjects.Count == 1) + { + AddField(o, ApiConstants.FilterSubject, _filterSubjects[0]); + } + else + { + AddField(o, ApiConstants.FilterSubjects, _filterSubjects); + } + } return o; } @@ -226,7 +253,6 @@ internal IList GetChanges(ConsumerConfiguration server) RecordWouldBeChange(StartTime, server.StartTime, "StartTime", changes); - RecordWouldBeChange(FilterSubject, server.FilterSubject, "FilterSubject", changes); RecordWouldBeChange(Description, server.Description, "Description", changes); RecordWouldBeChange(SampleFrequency, server.SampleFrequency, "SampleFrequency", changes); RecordWouldBeChange(DeliverSubject, server.DeliverSubject, "DeliverSubject", changes); @@ -234,6 +260,7 @@ internal IList GetChanges(ConsumerConfiguration server) if (_backoff != null && !SequenceEqual(_backoff, server._backoff)) { changes.Add("Backoff"); } if (_metadata != null && !DictionariesEqual(_metadata, server._metadata)) { changes.Add("Metadata"); } + if (_filterSubjects != null && !SequenceEqual(_filterSubjects, server._filterSubjects)) { changes.Add("FilterSubjects"); } return changes; } @@ -311,7 +338,6 @@ public sealed class ConsumerConfigurationBuilder internal string _name; internal string _deliverSubject; internal string _deliverGroup; - internal string _filterSubject; internal string _sampleFrequency; internal DateTime _startTime; @@ -333,6 +359,7 @@ public sealed class ConsumerConfigurationBuilder internal bool? _memStorage; internal IList _backoff; internal Dictionary _metadata; + internal IList _filterSubjects; public ConsumerConfigurationBuilder() {} @@ -349,7 +376,6 @@ public ConsumerConfigurationBuilder(ConsumerConfiguration cc) _name = cc.Name; _deliverSubject = cc.DeliverSubject; _deliverGroup = cc.DeliverGroup; - _filterSubject = cc.FilterSubject; _sampleFrequency = cc.SampleFrequency; _startTime = cc.StartTime; @@ -383,6 +409,11 @@ public ConsumerConfigurationBuilder(ConsumerConfiguration cc) _metadata[key] = cc.Metadata[key]; } } + + if (cc.FilterSubjects != null) + { + _filterSubjects = new List(cc.FilterSubjects); + } } /// @@ -521,12 +552,59 @@ public ConsumerConfigurationBuilder WithMaxDeliver(long? maxDeliver) /// /// Sets the filter subject of the ConsumerConfiguration. + /// Replaces any other filter subjects set in the builder /// /// the filter subject /// The ConsumerConfigurationBuilder public ConsumerConfigurationBuilder WithFilterSubject(string filterSubject) { - _filterSubject = EmptyAsNull(filterSubject); + if (string.IsNullOrWhiteSpace(filterSubject)) + { + this._filterSubjects = null; + } + else { + this._filterSubjects = new List(); + this._filterSubjects.Add(filterSubject); + } + return this; + } + + /// + /// Sets the filter subject of the ConsumerConfiguration. + /// Replaces any other filter subjects set in the builder + /// + /// one or more filter subjects + /// The ConsumerConfigurationBuilder + public ConsumerConfigurationBuilder WithFilterSubjects(params string[] filterSubjects) + { + if (filterSubjects == null) + { + this._filterSubjects = null; + return this; + } + + return WithFilterSubjects(new List(filterSubjects)); + } + + /// + /// Sets the filter subject of the ConsumerConfiguration. + /// Replaces any other filter subjects set in the builder + /// + /// one or more filter subjects + /// The ConsumerConfigurationBuilder + public ConsumerConfigurationBuilder WithFilterSubjects(IList filterSubjects) + { + this._filterSubjects = new List(); + if (filterSubjects != null) { + foreach (string fs in filterSubjects) { + if (!string.IsNullOrWhiteSpace(fs)) { + this._filterSubjects.Add(fs); + } + } + } + if (this._filterSubjects.Count == 0) { + this._filterSubjects = null; + } return this; } diff --git a/src/NATS.Client/JetStream/JetStream.cs b/src/NATS.Client/JetStream/JetStream.cs index 0761d6f56..8e3f31cdf 100644 --- a/src/NATS.Client/JetStream/JetStream.cs +++ b/src/NATS.Client/JetStream/JetStream.cs @@ -164,26 +164,33 @@ internal delegate MessageManager MessageManagerFactory( (conn, js, stream, so, cc, queueMode, syncMode) => new PullOrderedMessageManager(conn, js, stream, so, cc, syncMode); - Subscription CreateSubscription(string subject, string queueName, - EventHandler userHandler, bool autoAck, - PushSubscribeOptions pushSubscribeOptions, - PullSubscribeOptions pullSubscribeOptions) + internal Subscription CreateSubscription(string userSubscribeSubject, + PushSubscribeOptions pushSubscribeOptions, + PullSubscribeOptions pullSubscribeOptions, + string queueName, + EventHandler userHandler, + bool autoAck) { - // 1. Prepare for all the validation + // Parameter notes. For those relating to the callers, you can see all the callers further down in this source file. + // - pull subscribe callers guarantee that pullSubscribeOptions is not null + // - qgroup is always null with pull callers + // - callers only ever provide one of the subscribe options + + // 1. Initial prep and validation bool isPullMode = pullSubscribeOptions != null; SubscribeOptions so; string stream; - string qgroup; ConsumerConfiguration userCC; + string settledDeliverGroup = null; // push might set this - if (isPullMode) { + if (isPullMode) + { so = pullSubscribeOptions; // options must have already been checked to be non-null stream = pullSubscribeOptions.Stream; userCC = so.ConsumerConfiguration; - qgroup = null; // just to make compiler happy both paths set variable ValidateNotSupplied(userCC.DeliverGroup, JsSubPullCantHaveDeliverGroup); ValidateNotSupplied(userCC.DeliverSubject, JsSubPullCantHaveDeliverSubject); } @@ -207,29 +214,62 @@ Subscription CreateSubscription(string subject, string queueName, } // figure out the queue name - qgroup = ValidateMustMatchIfBothSupplied(userCC.DeliverGroup, queueName, JsSubQueueDeliverGroupMismatch); - if (so.Ordered && qgroup != null) { + settledDeliverGroup = ValidateMustMatchIfBothSupplied(userCC.DeliverGroup, queueName, JsSubQueueDeliverGroupMismatch); + if (so.Ordered && settledDeliverGroup != null) { throw JsSubOrderedNotAllowOnQueues.Instance(); } } - // 2A. Flow Control / heartbeat not always valid + // 1B. Flow Control / heartbeat not always valid if (userCC.FlowControl || userCC.IdleHeartbeat != null && userCC.IdleHeartbeat.Millis > 0) { if (isPullMode) { throw JsSubFcHbNotValidPull.Instance(); } - if (qgroup != null) { + if (settledDeliverGroup != null) { throw JsSubFcHbNotValidQueue.Instance(); } } - // 2B. Did they tell me what stream? No? look it up. - if (string.IsNullOrWhiteSpace(stream)) { - stream = LookupStreamBySubject(subject); - if (stream == null) { + // 2. figure out user provided subjects and prepare the settledFilterSubjects + userSubscribeSubject = EmptyAsNull(userSubscribeSubject); + IList settledFilterSubjects = new List(); + if (userCC.FilterSubjects == null) // empty filterSubjects gives null + { + // userCC.filterSubjects empty, populate settledFilterSubjects w/userSubscribeSubject if possible + if (userSubscribeSubject != null) { + settledFilterSubjects.Add(userSubscribeSubject); + } + } + else { + // userCC.filterSubjects not empty, validate them + foreach (string fs in userCC.FilterSubjects) + { + settledFilterSubjects.Add(fs); + } + // If userSubscribeSubject is provided it must be one of the filter subjects. + if (userSubscribeSubject != null && !settledFilterSubjects.Contains(userSubscribeSubject)) { + throw JsSubSubjectDoesNotMatchFilter.Instance(); + } + } + + // 3. Did they tell me what stream? No? look it up. + string settledStream; + if (string.IsNullOrWhiteSpace(stream)) + { + if (settledFilterSubjects.Count == 0) + { + throw JsSubSubjectNeededToLookupStream.Instance(); + } + settledStream = LookupStreamBySubject(settledFilterSubjects[0]); + if (settledStream == null) + { throw JsSubNoMatchingStreamForSubject.Instance(); } } + else + { + settledStream = stream; + } ConsumerConfiguration serverCC = null; string consumerName = userCC.Durable; @@ -239,10 +279,11 @@ Subscription CreateSubscription(string subject, string queueName, } string inboxDeliver = userCC.DeliverSubject; - // 3. Does this consumer already exist? - if (consumerName != null) + // 4. Does this consumer already exist? FastBind bypasses the lookup; + // the dev better know what they are doing... + if (!so.FastBind && consumerName != null) { - ConsumerInfo serverInfo = LookupConsumerInfo(stream, consumerName); + ConsumerInfo serverInfo = LookupConsumerInfo(settledStream, consumerName); if (serverInfo != null) { // the consumer for that durable already exists serverCC = serverInfo.ConsumerConfiguration; @@ -255,6 +296,7 @@ Subscription CreateSubscription(string subject, string queueName, throw JsSubExistingConsumerCannotBeModified.Instance($"[{string.Join(",", changes)}]"); } + // deliver subject must be null/empty for pull, defined for push if (isPullMode) { if (!string.IsNullOrWhiteSpace(serverCC.DeliverSubject)) @@ -270,7 +312,7 @@ Subscription CreateSubscription(string subject, string queueName, if (string.IsNullOrWhiteSpace(serverCC.DeliverGroup)) { // lookedUp was null/empty, means existing consumer is not a queue consumer - if (qgroup == null) + if (settledDeliverGroup == null) { // ok fine, no queue requested and the existing consumer is also not a queue consumer // we must check if the consumer is in use though @@ -284,21 +326,27 @@ Subscription CreateSubscription(string subject, string queueName, throw JsSubExistingConsumerNotQueue.Instance(); } } - else if (qgroup == null) + else if (settledDeliverGroup == null) { throw JsSubExistingConsumerIsQueue.Instance(); } - else if (!serverCC.DeliverGroup.Equals(qgroup)) + else if (!serverCC.DeliverGroup.Equals(settledDeliverGroup)) { throw JsSubExistingQueueDoesNotMatchRequestedQueue.Instance(); } - // durable already exists, make sure the filter subject matches - if (string.IsNullOrWhiteSpace(subject)) + // consumer already exists, make sure the filter subject matches + // subscribeSubject, if supplied came from the user directly + // or in the userCC or might not have been in either place + if (settledFilterSubjects.Count == 0) { - subject = userCC.FilterSubject; + // still also might be null, which the server treats as > + if (serverCC.FilterSubjects != null) + { + settledFilterSubjects = serverCC.FilterSubjects; + } } - else if (!IsFilterMatch(subject, serverCC.FilterSubject, stream)) + else if (!ConsumerFilterSubjectsAreEquivalent(settledFilterSubjects, serverCC.FilterSubjects)) { throw JsSubSubjectDoesNotMatchFilter.Instance(); } @@ -310,42 +358,51 @@ Subscription CreateSubscription(string subject, string queueName, } } - // 4. If pull or no deliver subject (inbox) provided or found, make an inbox. + // 5. If pull or no deliver subject (inbox) provided or found, make an inbox. + string settledInboxDeliver; if (isPullMode) { - inboxDeliver = Conn.NewInbox() + ".*"; + settledInboxDeliver = Conn.NewInbox() + ".*"; } else if (string.IsNullOrWhiteSpace(inboxDeliver)) { - inboxDeliver = Conn.NewInbox(); + settledInboxDeliver = Conn.NewInbox(); + } + else + { + settledInboxDeliver = inboxDeliver; } - // 5. If consumer does not exist, create and settle on the config. Name will have to wait + // 6. If consumer does not exist, create and settle on the config. Name will have to wait // If the consumer exists, I know what the settled info is - if (serverCC == null) { + ConsumerConfiguration settledCC; + string settledConsumerName; + if (so.FastBind || serverCC != null) + { + settledCC = serverCC; + settledConsumerName = so.Name; + } + else + { ConsumerConfiguration.ConsumerConfigurationBuilder ccBuilder = ConsumerConfiguration.Builder(userCC); // Pull mode doesn't maintain a deliver subject. It's actually an error if we send it. if (!isPullMode) { - ccBuilder.WithDeliverSubject(inboxDeliver); + ccBuilder.WithDeliverSubject(settledInboxDeliver); } - if (string.IsNullOrWhiteSpace(userCC.FilterSubject)) - { - ccBuilder.WithFilterSubject(subject); - } + // userCC.filterSubjects might have originally been empty + // but there might have been a userSubscribeSubject, + // so this makes sure it's resolved either way + ccBuilder.WithFilterSubjects(settledFilterSubjects); - if (string.IsNullOrWhiteSpace(userCC.DeliverGroup) && !string.IsNullOrWhiteSpace(qgroup)) - { - ccBuilder.WithDeliverGroup(qgroup); - } + ccBuilder.WithDeliverGroup(settledDeliverGroup); - // createOrUpdateConsumer can fail for security reasons, maybe other reasons? - serverCC = ccBuilder.Build(); - consumerName = null; + settledCC = ccBuilder.Build(); + settledConsumerName = null; // the server will give us a name } - // 6. create the subscription + // 7. create the subscription bool syncMode = userHandler == null; MessageManager mm; Connection.CreateSyncSubscriptionDelegate syncSubDelegate = null; @@ -353,19 +410,21 @@ Subscription CreateSubscription(string subject, string queueName, if (isPullMode) { MessageManagerFactory mmFactory = so.Ordered ? _pullOrderedMessageManagerFactory : _pullMessageManagerFactory; - mm = mmFactory((Connection)Conn, this, stream, so, serverCC, qgroup != null, syncMode); + mm = mmFactory((Connection)Conn, this, settledStream, so, settledCC, false, syncMode); if (syncMode) { syncSubDelegate = (dConn, dSubject, dQueue) => { - return new JetStreamPullSubscription(dConn, dSubject, this, stream, consumerName, inboxDeliver, mm); + return new JetStreamPullSubscription(dConn, dSubject, this, + settledStream, settledConsumerName, settledInboxDeliver, mm); }; } else { asyncSubDelegate = (dConn, dSubject, dQueue) => { - JetStreamPullAsyncSubscription asub = new JetStreamPullAsyncSubscription(dConn, dSubject, this, stream, consumerName, inboxDeliver, mm); + JetStreamPullAsyncSubscription asub = new JetStreamPullAsyncSubscription(dConn, dSubject, this, + settledStream, settledConsumerName, settledInboxDeliver, mm); asub.SetPendingLimits(so.PendingMessageLimit, so.PendingByteLimit); return asub; }; @@ -374,12 +433,14 @@ Subscription CreateSubscription(string subject, string queueName, else { MessageManagerFactory mmFactory = so.Ordered ? _pushOrderedMessageManagerFactory : _pushMessageManagerFactory; - mm = mmFactory((Connection)Conn, this, stream, so, serverCC, qgroup != null, syncMode); + mm = mmFactory((Connection)Conn, this, settledStream, so, settledCC, settledDeliverGroup != null, syncMode); if (syncMode) { syncSubDelegate = (dConn, dSubject, dQueue) => { - JetStreamPushSyncSubscription ssub = new JetStreamPushSyncSubscription(dConn, dSubject, dQueue, this, stream, consumerName, inboxDeliver, mm); + JetStreamPushSyncSubscription ssub = + new JetStreamPushSyncSubscription(dConn, dSubject, dQueue, this, + settledStream, settledConsumerName, settledInboxDeliver, mm); ssub.SetPendingLimits(so.PendingMessageLimit, so.PendingByteLimit); return ssub; }; @@ -388,7 +449,9 @@ Subscription CreateSubscription(string subject, string queueName, { asyncSubDelegate = (dConn, dSubject, dQueue) => { - JetStreamPushAsyncSubscription asub = new JetStreamPushAsyncSubscription(dConn, dSubject, dQueue, this, stream, consumerName, inboxDeliver, mm); + JetStreamPushAsyncSubscription asub = + new JetStreamPushAsyncSubscription(dConn, dSubject, dQueue, this, + settledStream, settledConsumerName, settledInboxDeliver, mm); asub.SetPendingLimits(so.PendingMessageLimit, so.PendingByteLimit); return asub; }; @@ -398,11 +461,11 @@ Subscription CreateSubscription(string subject, string queueName, Subscription sub; if (syncSubDelegate != null) { - sub = ((Connection)Conn).subscribeSync(inboxDeliver, queueName, syncSubDelegate); + sub = ((Connection)Conn).subscribeSync(settledInboxDeliver, settledDeliverGroup, syncSubDelegate); } else { - bool handlerAutoAck = autoAck && serverCC.AckPolicy != AckPolicy.None; + bool handlerAutoAck = autoAck && settledCC.AckPolicy != AckPolicy.None; EventHandler handler = (sender, args) => { if (mm.Manage(args.Message) == ManageResult.Message) @@ -414,15 +477,15 @@ Subscription CreateSubscription(string subject, string queueName, } } }; - sub = ((Connection)Conn).subscribeAsync(inboxDeliver, queueName, handler, asyncSubDelegate); + sub = ((Connection)Conn).subscribeAsync(settledInboxDeliver, settledDeliverGroup, handler, asyncSubDelegate); } - // 7. The consumer might need to be created, do it here - if (consumerName == null) + // 8. The consumer might need to be created, do it here + if (settledConsumerName == null) { try { - ConsumerInfo ci = AddOrUpdateConsumerInternal(stream, serverCC); + ConsumerInfo ci = CreateConsumerInternal(settledStream, settledCC); if (sub is JetStreamAbstractSyncSubscription syncSub) { syncSub.UpdateConsumer(ci.Name); @@ -489,77 +552,75 @@ private Boolean IsFilterMatch(string subscribeSubject, string filterSubject, str public IJetStreamPullSubscription PullSubscribe(string subject, PullSubscribeOptions options) { + subject = ValidateSubject(subject, false); ValidateNotNull(options, "Pull Subscribe Options"); - ValidateSubject(subject, IsSubjectRequired(options)); - return (IJetStreamPullSubscription) CreateSubscription(subject, null, null, false, null, options); + return (IJetStreamPullSubscription) CreateSubscription(subject, null, options, null, null, false); } public IJetStreamPullAsyncSubscription PullSubscribeAsync(string subject, EventHandler handler, PullSubscribeOptions options) { - ValidateSubject(subject, IsSubjectRequired(options)); + subject = ValidateSubject(subject, false); ValidateNotNull(handler, "Handler"); ValidateNotNull(options, "Pull Subscribe Options"); - return (IJetStreamPullAsyncSubscription) CreateSubscription(subject, null, handler, false, null, options); + return (IJetStreamPullAsyncSubscription) CreateSubscription(subject, null, options, null, handler, false); } public IJetStreamPushAsyncSubscription PushSubscribeAsync(string subject, EventHandler handler, bool autoAck) { - ValidateSubject(subject, true); + subject = ValidateSubject(subject, true); ValidateNotNull(handler, "Handler"); - return (IJetStreamPushAsyncSubscription) CreateSubscription(subject, null, handler, autoAck, null, null); + return (IJetStreamPushAsyncSubscription) CreateSubscription(subject, null, null, null, handler, autoAck); } public IJetStreamPushAsyncSubscription PushSubscribeAsync(string subject, string queue, EventHandler handler, bool autoAck) { - ValidateSubject(subject, true); + subject = ValidateSubject(subject, true); queue = EmptyAsNull(ValidateQueueName(queue, false)); ValidateNotNull(handler, "Handler"); - return (IJetStreamPushAsyncSubscription) CreateSubscription(subject, queue, handler, autoAck, null, null); + return (IJetStreamPushAsyncSubscription) CreateSubscription(subject, null, null, queue, handler, autoAck); } public IJetStreamPushAsyncSubscription PushSubscribeAsync(string subject, EventHandler handler, bool autoAck, PushSubscribeOptions options) { - ValidateSubject(subject, IsSubjectRequired(options)); + subject = ValidateSubject(subject, false); ValidateNotNull(handler, "Handler"); - return (IJetStreamPushAsyncSubscription) CreateSubscription(subject, null, handler, autoAck, options, null); + return (IJetStreamPushAsyncSubscription) CreateSubscription(subject, options, null, null, handler, autoAck); } public IJetStreamPushAsyncSubscription PushSubscribeAsync(string subject, string queue, EventHandler handler, bool autoAck, PushSubscribeOptions options) { - ValidateSubject(subject, IsSubjectRequired(options)); + subject = ValidateSubject(subject, false); queue = EmptyAsNull(ValidateQueueName(queue, false)); ValidateNotNull(handler, "Handler"); - return (IJetStreamPushAsyncSubscription) CreateSubscription(subject, queue, handler, autoAck, options, null); + return (IJetStreamPushAsyncSubscription) CreateSubscription(subject, options, null, queue, handler, autoAck); } public IJetStreamPushSyncSubscription PushSubscribeSync(string subject) { - ValidateSubject(subject, true); - return (IJetStreamPushSyncSubscription) CreateSubscription(subject, null, null, false, null, null); + subject = ValidateSubject(subject, true); + return (IJetStreamPushSyncSubscription) CreateSubscription(subject, null, null, null, null, false); } public IJetStreamPushSyncSubscription PushSubscribeSync(string subject, PushSubscribeOptions options) { - ValidateSubject(subject, IsSubjectRequired(options)); - return (IJetStreamPushSyncSubscription) CreateSubscription(subject, null, null, false, options, null); + subject = ValidateSubject(subject, false); + return (IJetStreamPushSyncSubscription) CreateSubscription(subject, options, null, null, null, false); } public IJetStreamPushSyncSubscription PushSubscribeSync(string subject, string queue) { - ValidateSubject(subject, true); + subject = ValidateSubject(subject, true); queue = EmptyAsNull(ValidateQueueName(queue, false)); - return (IJetStreamPushSyncSubscription) CreateSubscription(subject, queue, null, false, null, null); + return (IJetStreamPushSyncSubscription) CreateSubscription(subject, null, null, queue, null, false); } public IJetStreamPushSyncSubscription PushSubscribeSync(string subject, string queue, PushSubscribeOptions options) { - ValidateSubject(subject, IsSubjectRequired(options)); + subject = ValidateSubject(subject, false); queue = EmptyAsNull(ValidateQueueName(queue, false)); - return (IJetStreamPushSyncSubscription) CreateSubscription(subject, queue, null, false, options, null); + return (IJetStreamPushSyncSubscription) CreateSubscription(subject, options, null, queue, null, false); } - private bool IsSubjectRequired(SubscribeOptions options) => options == null || !options.Bind; - public IStreamContext GetStreamContext(string streamName) { Validator.ValidateStreamName(streamName, true); diff --git a/src/NATS.Client/JetStream/JetStreamBase.cs b/src/NATS.Client/JetStream/JetStreamBase.cs index 43b94dd66..a71b3d177 100644 --- a/src/NATS.Client/JetStream/JetStreamBase.cs +++ b/src/NATS.Client/JetStream/JetStreamBase.cs @@ -39,12 +39,35 @@ public class JetStreamBase public IConnection Conn { get; } public int Timeout { get; } + private bool? _consumerCreate290Available; + private bool? _multipleSubjectFilter210Available; + + protected bool ConsumerCreate290Available() + { + if (!_consumerCreate290Available.HasValue) + { + _consumerCreate290Available = Conn.ServerInfo.IsSameOrNewerThanVersion("2.9.0") && + !JetStreamOptions.IsOptOut290ConsumerCreate; + } + return _consumerCreate290Available.Value; + } + + protected bool MultipleSubjectFilter210Available() + { + if (!_multipleSubjectFilter210Available.HasValue) + { + _multipleSubjectFilter210Available = Conn.ServerInfo.IsSameOrNewerThanVersion("2.9.99"); + } + return _multipleSubjectFilter210Available.Value; + } + protected JetStreamBase(IConnection connection, JetStreamOptions options) { Conn = connection; JetStreamOptions = options ?? JetStreamOptions.DefaultJsOptions; Prefix = JetStreamOptions.Prefix; Timeout = JetStreamOptions.RequestTimeout?.Millis ?? Conn.Opts.Timeout; + } internal static ServerInfo ServerInfoOrException(IConnection conn) @@ -67,27 +90,33 @@ internal ConsumerInfo GetConsumerInfoInternal(string streamName, string consumer return new ConsumerInfo(m, true); } - internal ConsumerInfo AddOrUpdateConsumerInternal(string streamName, ConsumerConfiguration config) + internal ConsumerInfo CreateConsumerInternal(string streamName, ConsumerConfiguration config) { - bool consumerCreate290Available = ServerInfoOrException(Conn).IsSameOrNewerThanVersion("2.9.0") && !JetStreamOptions.IsOptOut290ConsumerCreate; - // ConsumerConfiguration validates that name and durable are the same if both are supplied. string consumerName = Validator.EmptyAsNull(config.Name); - if (consumerName != null && !consumerCreate290Available) + if (consumerName != null && !ConsumerCreate290Available()) { throw ClientExDetail.JsConsumerCreate290NotAvailable.Instance(); } - string durable = Validator.EmptyAsNull(config.Durable); + bool hasMultipleFilterSubjects = config.HasMultipleFilterSubjects; + + // seems strange that this could happen, but checking anyway... + if (hasMultipleFilterSubjects && !MultipleSubjectFilter210Available()) { + throw ClientExDetail.JsMultipleFilterSubjects210NotAvailable.Instance(); + } + + string durable = Validator.EmptyAsNull(config.Durable); string subj; - if (consumerCreate290Available) + if (ConsumerCreate290Available() && !hasMultipleFilterSubjects) { if (consumerName == null) { // if both consumerName and durable are null, generate a name - consumerName = durable == null ? GenerateConsumerName() : durable; + consumerName = durable ?? GenerateConsumerName(); } - string fs = Validator.EmptyAsNull(config.FilterSubject); + + string fs = config.FilterSubject; // we've already determined not multiple so this gives us 1 or null if (fs == null || fs.Equals(">")) { subj = string.Format(JetStreamConstants.JsapiConsumerCreateV290, streamName, consumerName); diff --git a/src/NATS.Client/JetStream/JetStreamManagement.cs b/src/NATS.Client/JetStream/JetStreamManagement.cs index ad9a3bcf4..9e29adcb6 100644 --- a/src/NATS.Client/JetStream/JetStreamManagement.cs +++ b/src/NATS.Client/JetStream/JetStreamManagement.cs @@ -87,7 +87,7 @@ public ConsumerInfo AddOrUpdateConsumer(string streamName, ConsumerConfiguration { Validator.ValidateStreamName(streamName, true); Validator.ValidateNotNull(config, nameof(config)); - return AddOrUpdateConsumerInternal(streamName, config); + return CreateConsumerInternal(streamName, config); } public bool DeleteConsumer(string streamName, string consumer) diff --git a/src/NATS.Client/JetStream/OrderedConsumerConfiguration.cs b/src/NATS.Client/JetStream/OrderedConsumerConfiguration.cs index 4f0d95825..6b8319d13 100644 --- a/src/NATS.Client/JetStream/OrderedConsumerConfiguration.cs +++ b/src/NATS.Client/JetStream/OrderedConsumerConfiguration.cs @@ -12,6 +12,7 @@ // limitations under the License. using System; +using System.Collections.Generic; using NATS.Client.Internals; namespace NATS.Client.JetStream @@ -20,13 +21,18 @@ public sealed class OrderedConsumerConfiguration { public const string DefaultFilterSubject = ">"; - public string FilterSubject { get; private set; } public DeliverPolicy? DeliverPolicy { get; private set; } public ulong StartSequence { get; private set; } public DateTime StartTime { get; private set; } public ReplayPolicy? ReplayPolicy { get; private set; } public bool? HeadersOnly { get; private set; } + public string FilterSubject => FilterSubjects.Count == 1 ? FilterSubjects[0] : null; + + public IList FilterSubjects { get; } + + public bool HasMultipleFilterSubjects => FilterSubjects != null && FilterSubjects.Count > 1; + /// /// OrderedConsumerConfiguration creation works like a builder. /// The builder supports chaining and will create a default set of options if @@ -35,7 +41,8 @@ public sealed class OrderedConsumerConfiguration public OrderedConsumerConfiguration() { StartSequence = ConsumerConfiguration.UlongUnset; - FilterSubject = DefaultFilterSubject; + FilterSubjects = new List(); + FilterSubjects.Add(DefaultFilterSubject); } /// @@ -44,10 +51,53 @@ public OrderedConsumerConfiguration() /// the filter subject /// Builder public OrderedConsumerConfiguration WithFilterSubject(string filterSubject) { - FilterSubject = Validator.EmptyOrNullAs(filterSubject, DefaultFilterSubject); + FilterSubjects.Clear(); + if (string.IsNullOrEmpty(filterSubject)) + { + FilterSubjects.Add(DefaultFilterSubject); + } + else + { + FilterSubjects.Add(filterSubject); + } + return this; + } + + /// + /// Sets the filter subject of the OrderedConsumerConfiguration. + /// + /// one or more filter subjects + /// The ConsumerConfigurationBuilder + public OrderedConsumerConfiguration WithFilterSubjects(params string[] filterSubjects) + { + return Validator.EmptyOrNull(filterSubjects) + ? WithFilterSubject(null) + : WithFilterSubjects(new List(filterSubjects)); + } + + + /// + /// Sets the filter subject of the OrderedConsumerConfiguration. + /// + /// one or more filter subjects + /// The ConsumerConfigurationBuilder + public OrderedConsumerConfiguration WithFilterSubjects(IList filterSubjects) + { + FilterSubjects.Clear(); + if (filterSubjects != null) { + foreach (string fs in filterSubjects) { + if (!string.IsNullOrWhiteSpace(fs)) { + FilterSubjects.Add(fs); + } + } + } + if (FilterSubjects.Count == 0) { + FilterSubjects.Add(DefaultFilterSubject); + } return this; } + /// /// Sets the delivery policy of the OrderedConsumerConfiguration. /// diff --git a/src/NATS.Client/JetStream/OrderedMessageManager.cs b/src/NATS.Client/JetStream/OrderedMessageManager.cs index f7ae62d26..52683375d 100644 --- a/src/NATS.Client/JetStream/OrderedMessageManager.cs +++ b/src/NATS.Client/JetStream/OrderedMessageManager.cs @@ -84,7 +84,7 @@ private void HandleErrorCondition() .WithStartTime(DateTime.MinValue) // clear start time in case it was originally set .Build(); - Js.AddOrUpdateConsumerInternal(Stream, userCc); + Js.CreateConsumerInternal(Stream, userCc); // 4. restart the manager. Startup(Sub); diff --git a/src/NATS.Client/JetStream/PullOrderedMessageManager.cs b/src/NATS.Client/JetStream/PullOrderedMessageManager.cs index 4ffbbe3f8..972cc24c0 100644 --- a/src/NATS.Client/JetStream/PullOrderedMessageManager.cs +++ b/src/NATS.Client/JetStream/PullOrderedMessageManager.cs @@ -83,7 +83,7 @@ private void HandleErrorCondition() { .WithStartSequence(LastStreamSeq + 1) .WithStartTime(DateTime.MinValue) // clear start time in case it was originally set .Build(); - Js.AddOrUpdateConsumerInternal(Stream, userCC); + Js.CreateConsumerInternal(Stream, userCC); // 4. restart the manager. Startup(Sub); diff --git a/src/NATS.Client/JetStream/PullSubscribeOptions.cs b/src/NATS.Client/JetStream/PullSubscribeOptions.cs index 3182c99ac..185a78deb 100644 --- a/src/NATS.Client/JetStream/PullSubscribeOptions.cs +++ b/src/NATS.Client/JetStream/PullSubscribeOptions.cs @@ -53,6 +53,23 @@ protected override PullSubscribeOptionsSubscribeOptionsBuilder GetThis() return this; } + /// + /// Specify binding to an existing consumer via name. + /// The client does not validate that the provided consumer configuration + /// is consistent with the server version or that + /// consumer type (push versus pull) matches the subscription type. + /// An inconsistent consumer configuration for instance can result in + /// receiving messages from unexpected subjects. + /// A consumer type mismatch will result in an error from the server. + /// + /// whether to fast bind or not + /// The builder + public PullSubscribeOptionsSubscribeOptionsBuilder WithFastBind(bool isFastBind) + { + _fastBind = isFastBind; + return GetThis(); + } + /// /// Builds the PullSubscribeOptions /// diff --git a/src/NATS.Client/JetStream/SubscribeOptions.cs b/src/NATS.Client/JetStream/SubscribeOptions.cs index bf3efb16b..a1949ecc2 100644 --- a/src/NATS.Client/JetStream/SubscribeOptions.cs +++ b/src/NATS.Client/JetStream/SubscribeOptions.cs @@ -29,11 +29,13 @@ public abstract class SubscribeOptions public string Stream { get; } public bool Pull { get; } public bool Bind { get; } + public bool FastBind { get; } public bool Ordered { get; } internal int MessageAlarmTime { get; } public ConsumerConfiguration ConsumerConfiguration { get; } public long PendingMessageLimit { get; } public long PendingByteLimit { get; } + public string Name { get; } /// /// Gets the durable name @@ -56,7 +58,8 @@ protected SubscribeOptions(ISubscribeOptionsBuilder builder, bool pull, long pendingByteLimit = Defaults.SubPendingBytesLimit) { Pull = pull; - Bind = builder.Bind; + FastBind = builder.FastBind; + Bind = FastBind || builder.Bind; Ordered = builder.Ordered; MessageAlarmTime = builder.MessageAlarmTime; @@ -65,14 +68,25 @@ protected SubscribeOptions(ISubscribeOptionsBuilder builder, bool pull, throw JsSoOrderedNotAllowedWithBind.Instance(); } - Stream = ValidateStreamName(builder.Stream, builder.Bind); + Stream = ValidateStreamName(builder.Stream, builder.Bind); // required when bind mode + + // read the consumer names and do basic validation + // A1. validate name input + string temp = ValidateMustMatchIfBothSupplied(builder.Name, builder.Cc?.Name, JsSoNameMismatch); + // B1. Must be a valid consumer name if supplied + temp = ValidateConsumerName(temp, false); + // A2. validate durable input string durable = ValidateMustMatchIfBothSupplied(builder.Durable, builder.Cc?.Durable, JsSoDurableMismatch); - durable = ValidateDurable(durable, builder.Bind); + // B2. Must be a valid consumer name if supplied + durable = ValidateDurable(durable, false); - string name = ValidateMustMatchIfBothSupplied(builder.Name, builder.Cc?.Name, JsSoNameMismatch); - - ValidateMustMatchIfBothSupplied(name, durable, JsConsumerNameDurableMismatch); + // C. name must match durable if both supplied + Name = ValidateMustMatchIfBothSupplied(temp, durable, JsConsumerNameDurableMismatch); + + if (Bind && Name == null) { + throw JsSoNameOrDurableRequiredForBind.Instance(); + } deliverGroup = ValidateMustMatchIfBothSupplied(deliverGroup, builder.Cc?.DeliverGroup, JsSoDeliverGroupMismatch); @@ -121,7 +135,7 @@ protected SubscribeOptions(ISubscribeOptionsBuilder builder, bool pull, .WithAckPolicy(AckPolicy.None) .WithMaxDeliver(1) .WithAckWait(Duration.OfHours(22)) - .WithName(name) + .WithName(Name) .WithMemStorage(true) .WithNumReplicas(1); @@ -138,7 +152,7 @@ protected SubscribeOptions(ISubscribeOptionsBuilder builder, bool pull, .WithDurable(durable) .WithDeliverSubject(deliverSubject) .WithDeliverGroup(deliverGroup) - .WithName(name) + .WithName(Name) .Build(); } } @@ -147,6 +161,7 @@ public interface ISubscribeOptionsBuilder { string Stream { get; } bool Bind { get; } + bool FastBind { get; } string Durable { get; } string Name { get; } ConsumerConfiguration Cc { get; } @@ -158,6 +173,7 @@ public abstract class SubscribeOptionsBuilder : ISubscribeOptionsBuilde { protected string _stream; protected bool _bind; + protected bool _fastBind; protected string _durable; protected string _name; protected ConsumerConfiguration _config; @@ -166,6 +182,7 @@ public abstract class SubscribeOptionsBuilder : ISubscribeOptionsBuilde public string Stream => _stream; public bool Bind => _bind; + public bool FastBind => _fastBind; public string Durable => _durable; public string Name => _name; public ConsumerConfiguration Cc => _config; @@ -210,8 +227,13 @@ public TB WithName(string name) } /// - /// Set as a direct subscribe + /// Specify binding to an existing consumer via name. + /// The client validates regular (non-fast) + /// binds to ensure that provided consumer configuration + /// is consistent with the server version and that + /// consumer type (push versus pull) matches the subscription type. /// + /// the bind flag /// The builder public TB WithBind(bool isBind) { diff --git a/src/NATS.Client/Subscription.cs b/src/NATS.Client/Subscription.cs index 944108396..8f0049960 100644 --- a/src/NATS.Client/Subscription.cs +++ b/src/NATS.Client/Subscription.cs @@ -688,7 +688,7 @@ public long Dropped #region validation - static private readonly char[] invalidSubjectChars = { '\r', '\n', '\t', ' '}; + private static readonly char[] invalidSubjectChars = { '\r', '\n', '\t', ' '}; private static bool ContainsInvalidChars(string value) { @@ -702,17 +702,7 @@ private static bool ContainsInvalidChars(string value) /// true if valid, false otherwise. public static bool IsValidSubject(string subject) { - if (ContainsInvalidChars(subject)) - { - return false; - } - - // Avoid split for performance, in case this is ever called in the fastpath. - if (subject.StartsWith(".") || subject.EndsWith(".") || subject.Contains("..")) - { - return false; - } - return true; + return Validator.IsValidSubjectTerm(subject, "subject", true).Item1; } /// @@ -735,7 +725,7 @@ public static bool IsValidPrefix(string prefix) /// true is the queue group name is valid, false otherwise. public static bool IsValidQueueGroupName(string queueGroup) { - return ContainsInvalidChars(queueGroup) == false; + return Validator.IsValidSubjectTerm(queueGroup, "queueGroup", true).Item1; } #endregion diff --git a/src/Tests/IntegrationTests/TestJetStream.cs b/src/Tests/IntegrationTests/TestJetStream.cs index 824825389..feb52dcb5 100644 --- a/src/Tests/IntegrationTests/TestJetStream.cs +++ b/src/Tests/IntegrationTests/TestJetStream.cs @@ -89,13 +89,18 @@ public void TestCreateWithOptionsForCoverage() { c.CreateJetStreamManagementContext(jso); }); } + + private void UnsubscribeEnsureNotBound(ISubscription sub) + { + sub.Unsubscribe(); + sub.Connection.Flush(); + Thread.Sleep(50); + } [Fact] public void TestJetStreamSubscribe() { Context.RunInJsServer(c => { - bool atLeast290 = c.ServerInfo.IsSameOrNewerThanVersion("2.9.0"); - IJetStream js = c.CreateJetStreamContext(); IJetStreamManagement jsm = c.CreateJetStreamManagementContext(); @@ -162,7 +167,7 @@ public void TestJetStreamSubscribe() { js.PushSubscribeAsync("", Queue(102), (o, a) => { }, false, psoBind); // test 2.9.0 - if (atLeast290) { + if (AtLeast290(c)) { ConsumerConfiguration cc = ConsumerConfiguration.Builder().WithName(Name(1)).Build(); pso = PushSubscribeOptions.Builder().WithConfiguration(cc).Build(); IJetStreamPushSyncSubscription sub = js.PushSubscribeSync(SUBJECT, pso); @@ -206,64 +211,102 @@ public void TestJetStreamSubscribe() { } }); } - - private void UnsubscribeEnsureNotBound(ISubscription sub) - { - sub.Unsubscribe(); - sub.Connection.Flush(); - Thread.Sleep(50); + + [Fact] + public void TestJetStreamSubscribeLenientSubject() { + Context.RunInJsServer(c => + { + CreateDefaultTestStream(c); + IJetStream js = c.CreateJetStreamContext(); + + js.PushSubscribeSync(SUBJECT, (PushSubscribeOptions)null); + js.PushSubscribeSync(SUBJECT, null, (PushSubscribeOptions)null); // queue name is not required, just a weird way to call this api + js.PushSubscribeAsync(SUBJECT, (s, e) => {}, false, (PushSubscribeOptions)null); + js.PushSubscribeAsync(SUBJECT, null, (s, e) => {}, false, (PushSubscribeOptions)null); // queue name is not required, just a weird way to call this api + + PushSubscribeOptions pso = ConsumerConfiguration.Builder().WithFilterSubject(SUBJECT).BuildPushSubscribeOptions(); + js.PushSubscribeSync(null, pso); + js.PushSubscribeSync(null, null, pso); + js.PushSubscribeAsync(null, (s, e) => {}, false, pso); + js.PushSubscribeAsync(null, null, (s, e) => {}, false, pso); + + PushSubscribeOptions psoF = ConsumerConfiguration.Builder().BuildPushSubscribeOptions(); + + Assert.Throws(() => js.PushSubscribeSync(null, psoF)); + Assert.Throws(() => js.PushSubscribeSync(null, psoF)); + Assert.Throws(() => js.PushSubscribeSync(null, null, psoF)); + Assert.Throws(() => js.PushSubscribeAsync(null, (s, e) => {}, false, psoF)); + Assert.Throws(() => js.PushSubscribeAsync(null, null, (s, e) => {}, false, psoF)); + + Assert.Throws(() => js.PushSubscribeSync(null, (PushSubscribeOptions)null)); + Assert.Throws(() => js.PushSubscribeSync(null, (PushSubscribeOptions)null)); + Assert.Throws(() => js.PushSubscribeSync(null, null, (PushSubscribeOptions)null)); + Assert.Throws(() => js.PushSubscribeAsync(null, (s, e) => {}, false, (PushSubscribeOptions)null)); + Assert.Throws(() => js.PushSubscribeAsync(null, null, (s, e) => {}, false, (PushSubscribeOptions)null)); + + PullSubscribeOptions lso = ConsumerConfiguration.Builder().WithFilterSubject(SUBJECT).BuildPullSubscribeOptions(); + js.PullSubscribe(null, lso); + js.PullSubscribeAsync(null, (s, e) => {}, lso); + + PullSubscribeOptions lsoF = ConsumerConfiguration.Builder().BuildPullSubscribeOptions(); + Assert.Throws(() => js.PullSubscribe(null, lsoF)); + Assert.Throws(() => js.PullSubscribeAsync(null, (s, e) => {}, lsoF)); + + Assert.Throws(() => js.PullSubscribe(null, (PullSubscribeOptions)null)); + Assert.Throws(() => js.PullSubscribeAsync(null, (s, e) => {}, (PullSubscribeOptions)null)); + }); } - + [Fact] public void TestJetStreamSubscribeErrors() { Context.RunInJsServer(c => { IJetStream js = c.CreateJetStreamContext(); - + // stream not found PushSubscribeOptions psoInvalidStream = PushSubscribeOptions.Builder().WithStream(STREAM).Build(); Assert.Throws(() => js.PushSubscribeSync(SUBJECT, psoInvalidStream)); - void AssertThrowsForSubject(Func testCode) - { - ArgumentException ae = Assert.Throws(testCode); - Assert.StartsWith("Subject", ae.Message); - } - - void AssertThrowsForQueue(Func testCode) - { - ArgumentException ae = Assert.Throws(testCode); - Assert.StartsWith("Queue", ae.Message); - } - - void AssertThrowsForHandler(Func testCode) - { - ArgumentNullException ae = Assert.Throws(testCode); - Assert.Equal("Handler", ae.ParamName); - } + Exception ex = null; + foreach (string bad in BadSubjectsOrQueues) { + if (string.IsNullOrEmpty(bad)) + { + ex = Assert.Throws(() => js.PushSubscribeSync(bad)); + Assert.StartsWith("Subject", ex.Message); + NATSJetStreamClientException ce = Assert.Throws(() => + js.PushSubscribeSync(bad, (PushSubscribeOptions)null)); + Assert.Contains(JsSubSubjectNeededToLookupStream.Id, ce.Message); - // subject - AssertThrowsForSubject(() => js.PushSubscribeSync(HasSpace)); - AssertThrowsForSubject(() => js.PushSubscribeSync(null, (PushSubscribeOptions)null)); - AssertThrowsForSubject(() => js.PushSubscribeSync(HasSpace, Plain)); - AssertThrowsForSubject(() => js.PushSubscribeSync(null, Plain, null)); - AssertThrowsForSubject(() => js.PushSubscribeAsync(HasSpace, null, false)); - AssertThrowsForSubject(() => js.PushSubscribeAsync(HasSpace, null, false, null)); - AssertThrowsForSubject(() => js.PushSubscribeAsync(HasSpace, Plain, null, false)); - AssertThrowsForSubject(() => js.PushSubscribeAsync(HasSpace, Plain, null, false, null)); - - // queue - AssertThrowsForQueue(() => js.PushSubscribeSync(Plain, HasSpace)); - AssertThrowsForQueue(() => js.PushSubscribeSync(Plain, HasSpace, null)); - AssertThrowsForQueue(() => js.PushSubscribeAsync(Plain, HasSpace, null, false)); - AssertThrowsForQueue(() => js.PushSubscribeAsync(Plain, HasSpace, null, false, null)); - + Assert.StartsWith("Subject", ex.Message); + } + else + { + // subject + ex = Assert.Throws(() => js.PushSubscribeSync(bad)); + Assert.StartsWith("Subject", ex.Message); + ex = Assert.Throws(() => + js.PushSubscribeSync(bad, (PushSubscribeOptions)null)); + Assert.StartsWith("Subject", ex.Message); + + // queue + if (!string.IsNullOrEmpty(bad)) + { + ex = Assert.Throws(() => js.PushSubscribeSync(SUBJECT, bad, null)); + Assert.StartsWith("Queue", ex.Message); + ex = Assert.Throws(() => + js.PushSubscribeAsync(SUBJECT, bad, (s, e) => { }, false, null)); + Assert.StartsWith("Queue", ex.Message); + } + } + } + // handler - AssertThrowsForHandler(() => js.PushSubscribeAsync(Plain, null, false)); - AssertThrowsForHandler(() => js.PushSubscribeAsync(Plain, null, false, null)); - AssertThrowsForHandler(() => js.PushSubscribeAsync(Plain, Plain, null, false)); - AssertThrowsForHandler(() => js.PushSubscribeAsync(Plain, Plain, null, false, null)); - + ex = Assert.Throws(() => js.PushSubscribeAsync(SUBJECT, null, false)); + Assert.Contains("Handler", ex.Message); + ex = Assert.Throws(() => js.PushSubscribeAsync(SUBJECT, null, false, null)); + Assert.Contains("Handler", ex.Message); + ex = Assert.Throws(() => js.PushSubscribeAsync(SUBJECT, QUEUE, null, false, null)); + Assert.Contains("Handler", ex.Message); }); } @@ -308,7 +351,7 @@ public void TestFilterSubjectEphemeral() { // subscribe to A cc = ConsumerConfiguration.Builder().WithFilterSubject(subjectA).WithAckPolicy(AckPolicy.None).Build(); pso = PushSubscribeOptions.Builder().WithConfiguration(cc).Build(); - sub = js.PushSubscribeSync(subjectWild, pso); + sub = js.PushSubscribeSync(subjectA, pso); c.Flush(1000); m = sub.NextMessage(1000); @@ -322,7 +365,7 @@ public void TestFilterSubjectEphemeral() { // subscribe to B cc = ConsumerConfiguration.Builder().WithFilterSubject(subjectB).WithAckPolicy(AckPolicy.None).Build(); pso = PushSubscribeOptions.Builder().WithConfiguration(cc).Build(); - sub = js.PushSubscribeSync(subjectWild, pso); + sub = js.PushSubscribeSync(subjectB, pso); c.Flush(1000); m = sub.NextMessage(1000); @@ -370,17 +413,21 @@ public void TestBindExceptions() { CreateDefaultTestStream(c); - Assert.Throws( - () => PushSubscribeOptions.Builder().WithStream(STREAM).WithBind(true).Build()); + NATSJetStreamClientException e; + + e = Assert.Throws( + () => PushSubscribeOptions.Builder().WithStream(STREAM).WithBind(true).Build()); + Assert.Contains(JsSoNameOrDurableRequiredForBind.Id, e.Message); Assert.Throws( - () => PushSubscribeOptions.Builder().WithDurable(DURABLE).WithBind(true).Build()); + () => PushSubscribeOptions.Builder().WithDurable(DURABLE).WithBind(true).Build()); Assert.Throws( - () => PushSubscribeOptions.Builder().WithStream(string.Empty).WithBind(true).Build()); + () => PushSubscribeOptions.Builder().WithStream(string.Empty).WithBind(true).Build()); - Assert.Throws( - () => PushSubscribeOptions.Builder().WithStream(STREAM).WithDurable(string.Empty).WithBind(true).Build()); + e = Assert.Throws( + () => PushSubscribeOptions.Builder().WithStream(STREAM).WithDurable(string.Empty).WithBind(true).Build()); + Assert.Contains(JsSoNameOrDurableRequiredForBind.Id, e.Message); }); } @@ -462,83 +509,77 @@ public void TestFilterMismatchErrors() CreateMemoryStream(jsm, STREAM, SUBJECT); // will work as SubscribeSubject equals Filter Subject - SubscribeOk(js, jsm, SUBJECT, SUBJECT); - SubscribeOk(js, jsm, ">", ">"); - SubscribeOk(js, jsm, "*", "*"); - - // will work as SubscribeSubject != empty Filter Subject, - // b/c Stream has exactly 1 subject and is a match. - SubscribeOk(js, jsm, "", SUBJECT); - - // will work as SubscribeSubject != Filter Subject of '>' - // b/c Stream has exactly 1 subject and is a match. - SubscribeOk(js, jsm, ">", SUBJECT); + FilterMatchSubscribeOk(js, jsm, SUBJECT, SUBJECT); + FilterMatchSubscribeOk(js, jsm, ">", ">"); + FilterMatchSubscribeOk(js, jsm, "*", "*"); // will not work - SubscribeEx(js, jsm, "*", SUBJECT); + FilterMatchSubscribeEx(js, jsm, SUBJECT, ""); + FilterMatchSubscribeEx(js, jsm, SUBJECT, ">"); + FilterMatchSubscribeEx(js, jsm, SUBJECT, "*"); // multiple subjects no wildcards jsm.DeleteStream(STREAM); CreateMemoryStream(jsm, STREAM, SUBJECT, Subject(2)); // will work as SubscribeSubject equals Filter Subject - SubscribeOk(js, jsm, SUBJECT, SUBJECT); - SubscribeOk(js, jsm, ">", ">"); - SubscribeOk(js, jsm, "*", "*"); + FilterMatchSubscribeOk(js, jsm, SUBJECT, SUBJECT); + FilterMatchSubscribeOk(js, jsm, ">", ">"); + FilterMatchSubscribeOk(js, jsm, "*", "*"); // will not work because stream has more than 1 subject - SubscribeEx(js, jsm, "", SUBJECT); - SubscribeEx(js, jsm, ">", SUBJECT); - SubscribeEx(js, jsm, "*", SUBJECT); + FilterMatchSubscribeEx(js, jsm, SUBJECT, ""); + FilterMatchSubscribeEx(js, jsm, SUBJECT, ">"); + FilterMatchSubscribeEx(js, jsm, SUBJECT, "*"); // multiple subjects via '>' jsm.DeleteStream(STREAM); CreateMemoryStream(jsm, STREAM, SUBJECT_GT); // will work, exact matches - SubscribeOk(js, jsm, SubjectDot("1"), SubjectDot("1")); - SubscribeOk(js, jsm, ">", ">"); + FilterMatchSubscribeOk(js, jsm, SubjectDot("1"), SubjectDot("1")); + FilterMatchSubscribeOk(js, jsm, ">", ">"); // will not work because mismatch / stream has more than 1 subject - SubscribeEx(js, jsm, "", SubjectDot("1")); - SubscribeEx(js, jsm, ">", SubjectDot("1")); - SubscribeEx(js, jsm, SUBJECT_GT, SubjectDot("1")); + FilterMatchSubscribeEx(js, jsm, SubjectDot("1"), ""); + FilterMatchSubscribeEx(js, jsm, SubjectDot("1"), ">"); + FilterMatchSubscribeEx(js, jsm, SubjectDot("1"), SUBJECT_GT); // multiple subjects via '*' jsm.DeleteStream(STREAM); CreateMemoryStream(jsm, STREAM, SUBJECT_STAR); // will work, exact matches - SubscribeOk(js, jsm, SubjectDot("1"), SubjectDot("1")); - SubscribeOk(js, jsm, ">", ">"); + FilterMatchSubscribeOk(js, jsm, SubjectDot("1"), SubjectDot("1")); + FilterMatchSubscribeOk(js, jsm, ">", ">"); // will not work because mismatch / stream has more than 1 subject - SubscribeEx(js, jsm, "", SubjectDot("1")); - SubscribeEx(js, jsm, ">", SubjectDot("1")); - SubscribeEx(js, jsm, SUBJECT_STAR, SubjectDot("1")); + FilterMatchSubscribeEx(js, jsm, SubjectDot("1"), ""); + FilterMatchSubscribeEx(js, jsm, SubjectDot("1"), ">"); + FilterMatchSubscribeEx(js, jsm, SubjectDot("1"), SUBJECT_STAR); }); } - private void SubscribeOk(IJetStream js, IJetStreamManagement jsm, string fs, string ss) + private void FilterMatchSubscribeOk(IJetStream js, IJetStreamManagement jsm, string subscribeSubject, params string[] filterSubjects) { int i = Rndm.Next(); // just want a unique number - SetupConsumer(jsm, i, fs); - js.PushSubscribeSync(ss, ConsumerConfiguration.Builder().WithDurable(Durable(i)).BuildPushSubscribeOptions()).Unsubscribe(); + FilterMatchSetupConsumer(jsm, i, filterSubjects); + js.PushSubscribeSync(subscribeSubject, ConsumerConfiguration.Builder().WithDurable(Durable(i)).BuildPushSubscribeOptions()).Unsubscribe(); } - private void SubscribeEx(IJetStream js, IJetStreamManagement jsm, string fs, string ss) + private void FilterMatchSubscribeEx(IJetStream js, IJetStreamManagement jsm, string subscribeSubject, params string[] filterSubjects) { int i = Rndm.Next(); // just want a unique number - SetupConsumer(jsm, i, fs); + FilterMatchSetupConsumer(jsm, i, filterSubjects); NATSJetStreamClientException e = Assert.Throws( - () => js.PushSubscribeSync(ss, ConsumerConfiguration.Builder().WithDurable(Durable(i)).BuildPushSubscribeOptions())); + () => js.PushSubscribeSync(subscribeSubject, ConsumerConfiguration.Builder().WithDurable(Durable(i)).BuildPushSubscribeOptions())); Assert.Contains(JsSubSubjectDoesNotMatchFilter.Id, e.Message); } - private void SetupConsumer(IJetStreamManagement jsm, int i, string fs) + private void FilterMatchSetupConsumer(IJetStreamManagement jsm, int i, params string[] fs) { jsm.AddOrUpdateConsumer(STREAM, - ConsumerConfiguration.Builder().WithDeliverSubject(Deliver(i)).WithDurable(Durable(i)).WithFilterSubject(fs).Build()); + ConsumerConfiguration.Builder().WithDeliverSubject(Deliver(i)).WithDurable(Durable(i)).WithFilterSubjects(fs).Build()); } [Fact] @@ -546,7 +587,9 @@ public void TestBindDurableDeliverSubject() { Context.RunInJsServer(c => { - CreateDefaultTestStream(c); + string stream = Stream(); + string subject = Subject(); + CreateMemoryStream(c, stream, subject); IJetStream js = c.CreateJetStreamContext(); IJetStreamManagement jsm = c.CreateJetStreamManagementContext(); @@ -555,41 +598,38 @@ public void TestBindDurableDeliverSubject() ConsumerConfiguration ccDurPush = ConsumerConfiguration.Builder() .WithDurable(Durable(1)) .WithDeliverSubject(Deliver(1)) + .WithFilterSubject(subject) .Build(); - jsm.AddOrUpdateConsumer(STREAM, ccDurPush); + jsm.AddOrUpdateConsumer(stream, ccDurPush); // create a durable pull subscriber - notice no deliver subject ConsumerConfiguration ccDurPull = ConsumerConfiguration.Builder() .WithDurable(Durable(2)) .Build(); - jsm.AddOrUpdateConsumer(STREAM, ccDurPull); + jsm.AddOrUpdateConsumer(stream, ccDurPull); // try to pull subscribe against a push durable NATSJetStreamClientException e = Assert.Throws( - () => js.PullSubscribe(SUBJECT, PullSubscribeOptions.Builder().WithDurable(Durable(1)).Build())); + () => js.PullSubscribe(subject, PullSubscribeOptions.Builder().WithDurable(Durable(1)).Build())); Assert.Contains(JsSubConsumerAlreadyConfiguredAsPush.Id, e.Message); // try to pull bind against a push durable e = Assert.Throws( - () => js.PullSubscribe(SUBJECT, PullSubscribeOptions.BindTo(STREAM, Durable(1)))); + () => js.PullSubscribe(subject, PullSubscribeOptions.BindTo(stream, Durable(1)))); Assert.Contains(JsSubConsumerAlreadyConfiguredAsPush.Id, e.Message); - // this one is okay - IJetStreamPullSubscription sub = js.PullSubscribe(SUBJECT, PullSubscribeOptions.Builder().WithDurable(Durable(2)).Build()); - sub.Unsubscribe(); // so I can re-use the durable - // try to push subscribe against a pull durable e = Assert.Throws( - () => js.PushSubscribeSync(SUBJECT, PushSubscribeOptions.Builder().WithDurable(Durable(2)).Build())); + () => js.PushSubscribeSync(subject, PushSubscribeOptions.Builder().WithDurable(Durable(2)).Build())); Assert.Contains(JsSubConsumerAlreadyConfiguredAsPull.Id, e.Message); // try to push bind against a pull durable e = Assert.Throws( - () => js.PushSubscribeSync(SUBJECT, PushSubscribeOptions.BindTo(STREAM, Durable(2)))); + () => js.PushSubscribeSync(subject, PushSubscribeOptions.BindTo(stream, Durable(2)))); Assert.Contains(JsSubConsumerAlreadyConfiguredAsPull.Id, e.Message); // this one is okay - js.PushSubscribeSync(SUBJECT, PushSubscribeOptions.Builder().WithDurable(Durable(1)).Build()); + js.PushSubscribeSync(subject, PushSubscribeOptions.Builder().WithDurable(Durable(1)).Build()); }); } @@ -632,6 +672,7 @@ public void TestConsumerIsNotModified() .WithMaxAckPending(65000) .WithMaxDeliver(5) .WithReplayPolicy(ReplayPolicy.Instant) + .WithFilterSubject(SUBJECT) .Build(); jsm.AddOrUpdateConsumer(STREAM, cc); @@ -649,6 +690,7 @@ public void TestConsumerIsNotModified() .WithMaxDeliver(43) .WithRateLimitBps(44) .WithMaxAckPending(45) + .WithFilterSubject(SUBJECT) .Build(); jsm.AddOrUpdateConsumer(STREAM, cc); @@ -658,6 +700,7 @@ public void TestConsumerIsNotModified() cc = ConsumerConfiguration.Builder() .WithDurable(Durable(22)) .WithMaxPullWaiting(46) + .WithFilterSubject(SUBJECT) .Build(); jsm.AddOrUpdateConsumer(STREAM, cc); @@ -670,6 +713,7 @@ public void TestConsumerIsNotModified() .WithDeliverSubject(Deliver(3)) .WithDurable(Durable(3)) .WithStartTime(DateTime.UtcNow.AddHours(1)) + .WithFilterSubject(SUBJECT) .Build(); jsm.AddOrUpdateConsumer(STREAM, cc); @@ -683,6 +727,7 @@ public void TestConsumerIsNotModified() .WithFlowControl(1000) .WithHeadersOnly(true) .WithAckWait(2000) + .WithFilterSubject(SUBJECT) .Build(); jsm.AddOrUpdateConsumer(STREAM, cc); @@ -696,6 +741,7 @@ public void TestConsumerIsNotModified() .WithDeliverPolicy(DeliverPolicy.Last) .WithAckPolicy(AckPolicy.None) .WithReplayPolicy(ReplayPolicy.Original) + .WithFilterSubject(SUBJECT) .Build(); jsm.AddOrUpdateConsumer(STREAM, cc); @@ -711,63 +757,68 @@ public void TestSubscribeDurableConsumerMustMatch() { IJetStream js = c.CreateJetStreamContext(); IJetStreamManagement jsm = c.CreateJetStreamManagementContext(); - CreateDefaultTestStream(jsm); + string stream = Stream(); + string subject = Subject(); + CreateMemoryStream(jsm, stream, subject); // push - jsm.AddOrUpdateConsumer(STREAM, PushDurableBuilder().Build()); - - ChangeExPush(js, PushDurableBuilder().WithDeliverPolicy(DeliverPolicy.Last), "DeliverPolicy"); - ChangeExPush(js, PushDurableBuilder().WithDeliverPolicy(DeliverPolicy.New), "DeliverPolicy"); - ChangeExPush(js, PushDurableBuilder().WithAckPolicy(AckPolicy.None), "AckPolicy"); - ChangeExPush(js, PushDurableBuilder().WithAckPolicy(AckPolicy.All), "AckPolicy"); - ChangeExPush(js, PushDurableBuilder().WithReplayPolicy(ReplayPolicy.Original), "ReplayPolicy"); - - ChangeExPush(js, PushDurableBuilder().WithFlowControl(10000), "FlowControl"); - ChangeExPush(js, PushDurableBuilder().WithHeadersOnly(true), "HeadersOnly"); - - ChangeExPush(js, PushDurableBuilder().WithStartTime(DateTime.UtcNow), "StartTime"); - ChangeExPush(js, PushDurableBuilder().WithAckWait(Duration.OfMillis(1)), "AckWait"); - ChangeExPush(js, PushDurableBuilder().WithDescription("x"), "Description"); - ChangeExPush(js, PushDurableBuilder().WithSampleFrequency("x"), "SampleFrequency"); - ChangeExPush(js, PushDurableBuilder().WithIdleHeartbeat(Duration.OfMillis(1000)), "IdleHeartbeat"); - ChangeExPush(js, PushDurableBuilder().WithMaxExpires(Duration.OfMillis(1000)), "MaxExpires"); - ChangeExPush(js, PushDurableBuilder().WithInactiveThreshold(Duration.OfMillis(1000)), "InactiveThreshold"); + string uname = Durable(); + string deliver = Deliver(); + jsm.AddOrUpdateConsumer(stream, PushDurableBuilder(subject, uname, deliver).Build()); + + ChangeExPush(js, subject, PushDurableBuilder(subject, uname, deliver).WithDeliverPolicy(DeliverPolicy.Last), "DeliverPolicy"); + ChangeExPush(js, subject, PushDurableBuilder(subject, uname, deliver).WithDeliverPolicy(DeliverPolicy.New), "DeliverPolicy"); + ChangeExPush(js, subject, PushDurableBuilder(subject, uname, deliver).WithAckPolicy(AckPolicy.None), "AckPolicy"); + ChangeExPush(js, subject, PushDurableBuilder(subject, uname, deliver).WithAckPolicy(AckPolicy.All), "AckPolicy"); + ChangeExPush(js, subject, PushDurableBuilder(subject, uname, deliver).WithReplayPolicy(ReplayPolicy.Original), "ReplayPolicy"); + + ChangeExPush(js, subject, PushDurableBuilder(subject, uname, deliver).WithFlowControl(10000), "FlowControl"); + ChangeExPush(js, subject, PushDurableBuilder(subject, uname, deliver).WithHeadersOnly(true), "HeadersOnly"); + + ChangeExPush(js, subject, PushDurableBuilder(subject, uname, deliver).WithStartTime(DateTime.UtcNow), "StartTime"); + ChangeExPush(js, subject, PushDurableBuilder(subject, uname, deliver).WithAckWait(Duration.OfMillis(1)), "AckWait"); + ChangeExPush(js, subject, PushDurableBuilder(subject, uname, deliver).WithDescription("x"), "Description"); + ChangeExPush(js, subject, PushDurableBuilder(subject, uname, deliver).WithSampleFrequency("x"), "SampleFrequency"); + ChangeExPush(js, subject, PushDurableBuilder(subject, uname, deliver).WithIdleHeartbeat(Duration.OfMillis(1000)), "IdleHeartbeat"); + ChangeExPush(js, subject, PushDurableBuilder(subject, uname, deliver).WithMaxExpires(Duration.OfMillis(1000)), "MaxExpires"); + ChangeExPush(js, subject, PushDurableBuilder(subject, uname, deliver).WithInactiveThreshold(Duration.OfMillis(1000)), "InactiveThreshold"); // value - ChangeExPush(js, PushDurableBuilder().WithMaxDeliver(1), "MaxDeliver"); - ChangeExPush(js, PushDurableBuilder().WithMaxAckPending(0), "MaxAckPending"); - ChangeExPush(js, PushDurableBuilder().WithAckWait(0), "AckWait"); + ChangeExPush(js, subject, PushDurableBuilder(subject, uname, deliver).WithMaxDeliver(1), "MaxDeliver"); + ChangeExPush(js, subject, PushDurableBuilder(subject, uname, deliver).WithMaxAckPending(0), "MaxAckPending"); + ChangeExPush(js, subject, PushDurableBuilder(subject, uname, deliver).WithAckWait(0), "AckWait"); // value unsigned - ChangeExPush(js, PushDurableBuilder().WithStartSequence(1), "StartSequence"); - ChangeExPush(js, PushDurableBuilder().WithRateLimitBps(1), "RateLimitBps"); + ChangeExPush(js, subject, PushDurableBuilder(subject, uname, deliver).WithStartSequence(1), "StartSequence"); + ChangeExPush(js, subject, PushDurableBuilder(subject, uname, deliver).WithRateLimitBps(1), "RateLimitBps"); // unset doesn't fail because the server provides a value equal to the unset - ChangeOkPush(js, PushDurableBuilder().WithMaxDeliver(-1)); + ChangeOkPush(js, subject, PushDurableBuilder(subject, uname, deliver).WithMaxDeliver(-1)); // unset doesn't fail because the server does not provide a value - ChangeOkPush(js, PushDurableBuilder().WithStartSequence(0)); - ChangeOkPush(js, PushDurableBuilder().WithRateLimitBps(0)); + ChangeOkPush(js, subject, PushDurableBuilder(subject, uname, deliver).WithStartSequence(0)); + ChangeOkPush(js, subject, PushDurableBuilder(subject, uname, deliver).WithRateLimitBps(0)); // unset fail b/c the server does set a value that is not equal to the unset or the minimum - ChangeExPush(js, PushDurableBuilder().WithMaxAckPending(-1), "MaxAckPending"); - ChangeExPush(js, PushDurableBuilder().WithMaxAckPending(0), "MaxAckPending"); - ChangeExPush(js, PushDurableBuilder().WithAckWait(-1), "AckWait"); - ChangeExPush(js, PushDurableBuilder().WithAckWait(0), "AckWait"); + ChangeExPush(js, subject, PushDurableBuilder(subject, uname, deliver).WithMaxAckPending(-1), "MaxAckPending"); + ChangeExPush(js, subject, PushDurableBuilder(subject, uname, deliver).WithMaxAckPending(0), "MaxAckPending"); + ChangeExPush(js, subject, PushDurableBuilder(subject, uname, deliver).WithAckWait(-1), "AckWait"); + ChangeExPush(js, subject, PushDurableBuilder(subject, uname, deliver).WithAckWait(0), "AckWait"); // pull - jsm.AddOrUpdateConsumer(STREAM, PullDurableBuilder().Build()); + string lname = Durable(); + jsm.AddOrUpdateConsumer(stream, PullDurableBuilder(subject, lname).Build()); // value - ChangeExPull(js, PullDurableBuilder().WithMaxPullWaiting(0), "MaxPullWaiting"); - ChangeExPull(js, PullDurableBuilder().WithMaxBatch(0), "MaxBatch"); - ChangeExPull(js, PullDurableBuilder().WithMaxBytes(0), "MaxBytes"); + ChangeExPull(js, subject, PullDurableBuilder(subject, lname).WithMaxPullWaiting(0), "MaxPullWaiting"); + ChangeExPull(js, subject, PullDurableBuilder(subject, lname).WithMaxBatch(0), "MaxBatch"); + ChangeExPull(js, subject, PullDurableBuilder(subject, lname).WithMaxBytes(0), "MaxBytes"); // unsets fail b/c the server does set a value - ChangeExPull(js, PullDurableBuilder().WithMaxPullWaiting(-1), "MaxPullWaiting"); + ChangeExPull(js, subject, PullDurableBuilder(subject, lname).WithMaxPullWaiting(-1), "MaxPullWaiting"); // unset - ChangeOkPull(js, PullDurableBuilder().WithMaxBatch(-1)); + ChangeOkPull(js, subject, PullDurableBuilder(subject, lname).WithMaxBatch(-1)); if (c.ServerInfo.IsNewerVersionThan("2.9.99")) { @@ -778,39 +829,39 @@ public void TestSubscribeDurableConsumerMustMatch() { metadataA["b"] = "B"; // metadata server null versus new not null - jsm.AddOrUpdateConsumer(STREAM, PushDurableBuilder().Build()); - ChangeExPush(js, PushDurableBuilder().WithMetadata(metadataA), "Metadata"); + jsm.AddOrUpdateConsumer(stream, PushDurableBuilder(subject, uname, deliver).Build()); + ChangeExPush(js, subject, PushDurableBuilder(subject, uname, deliver).WithMetadata(metadataA), "Metadata"); // metadata server not null versus new null - jsm.AddOrUpdateConsumer(STREAM, PushDurableBuilder().WithMetadata(metadataA).Build()); - ChangeOkPush(js, PushDurableBuilder()); + jsm.AddOrUpdateConsumer(stream, PushDurableBuilder(subject, uname, deliver).WithMetadata(metadataA).Build()); + ChangeOkPush(js, subject, PushDurableBuilder(subject, uname, deliver)); // metadata server not null versus new not null but different - ChangeExPush(js, PushDurableBuilder().WithMetadata(metadataB), "Metadata"); + ChangeExPush(js, subject, PushDurableBuilder(subject, uname, deliver).WithMetadata(metadataB), "Metadata"); // metadata server not null versus new not null and same - ChangeOkPush(js, PushDurableBuilder().WithMetadata(metadataA)); + ChangeOkPush(js, subject, PushDurableBuilder(subject, uname, deliver).WithMetadata(metadataA)); } }); } - private void ChangeOkPush(IJetStream js, ConsumerConfiguration.ConsumerConfigurationBuilder builder) { - js.PushSubscribeSync(SUBJECT, PushSubscribeOptions.Builder().WithConfiguration(builder.Build()).Build()).Unsubscribe(); + private void ChangeOkPush(IJetStream js, string subject, ConsumerConfiguration.ConsumerConfigurationBuilder builder) { + js.PushSubscribeSync(subject, PushSubscribeOptions.Builder().WithConfiguration(builder.Build()).Build()).Unsubscribe(); } - private void ChangeOkPull(IJetStream js, ConsumerConfiguration.ConsumerConfigurationBuilder builder) { - js.PullSubscribe(SUBJECT, PullSubscribeOptions.Builder().WithConfiguration(builder.Build()).Build()).Unsubscribe(); + private void ChangeOkPull(IJetStream js, string subject, ConsumerConfiguration.ConsumerConfigurationBuilder builder) { + js.PullSubscribe(subject, PullSubscribeOptions.Builder().WithConfiguration(builder.Build()).Build()).Unsubscribe(); } - private void ChangeExPush(IJetStream js, ConsumerConfiguration.ConsumerConfigurationBuilder builder, string changedField) { + private void ChangeExPush(IJetStream js, string subject, ConsumerConfiguration.ConsumerConfigurationBuilder builder, string changedField) { NATSJetStreamClientException e = Assert.Throws( - () => js.PushSubscribeSync(SUBJECT, PushSubscribeOptions.Builder().WithConfiguration(builder.Build()).Build())); + () => js.PushSubscribeSync(subject, PushSubscribeOptions.Builder().WithConfiguration(builder.Build()).Build())); _ChangeEx(e, changedField); } - private void ChangeExPull(IJetStream js, ConsumerConfiguration.ConsumerConfigurationBuilder builder, string changedField) { + private void ChangeExPull(IJetStream js, string subject, ConsumerConfiguration.ConsumerConfigurationBuilder builder, string changedField) { NATSJetStreamClientException e = Assert.Throws( - () => js.PullSubscribe(SUBJECT, PullSubscribeOptions.Builder().WithConfiguration(builder.Build()).Build())); + () => js.PullSubscribe(subject, PullSubscribeOptions.Builder().WithConfiguration(builder.Build()).Build())); _ChangeEx(e, changedField); } @@ -821,12 +872,14 @@ private void _ChangeEx(NATSJetStreamClientException e, string changedField) Assert.Contains(changedField, msg); } - private ConsumerConfiguration.ConsumerConfigurationBuilder PushDurableBuilder() { - return ConsumerConfiguration.Builder().WithDurable(PUSH_DURABLE).WithDeliverSubject(DELIVER); + private ConsumerConfiguration.ConsumerConfigurationBuilder PushDurableBuilder(string subject, string durable, string deliver) { + return ConsumerConfiguration.Builder().WithDurable(durable) + .WithDeliverSubject(deliver) + .WithFilterSubject(subject); } - private ConsumerConfiguration.ConsumerConfigurationBuilder PullDurableBuilder() { - return ConsumerConfiguration.Builder().WithDurable(PULL_DURABLE); + private ConsumerConfiguration.ConsumerConfigurationBuilder PullDurableBuilder(string subject, string durable) { + return ConsumerConfiguration.Builder().WithDurable(durable).WithFilterSubject(subject); } [Fact] diff --git a/src/Tests/IntegrationTests/TestJetStreamManagement.cs b/src/Tests/IntegrationTests/TestJetStreamManagement.cs index 8265e6c9e..59b3574b0 100644 --- a/src/Tests/IntegrationTests/TestJetStreamManagement.cs +++ b/src/Tests/IntegrationTests/TestJetStreamManagement.cs @@ -688,7 +688,7 @@ private static void AddConsumer(IJetStreamManagement jsm, bool atLeast290, int i } Assert.Equal(Durable(id), ci.ConsumerConfiguration.Durable); if (fs == null) { - Assert.Empty(ci.ConsumerConfiguration.FilterSubject); + Assert.Null(ci.ConsumerConfiguration.FilterSubject); } if (deliver) { Assert.Equal(Deliver(id), ci.ConsumerConfiguration.DeliverSubject); diff --git a/src/Tests/IntegrationTests/TestJetStreamPull.cs b/src/Tests/IntegrationTests/TestJetStreamPull.cs index e8a487973..e2e8cd281 100644 --- a/src/Tests/IntegrationTests/TestJetStreamPull.cs +++ b/src/Tests/IntegrationTests/TestJetStreamPull.cs @@ -19,6 +19,7 @@ using NATS.Client.Internals; using NATS.Client.JetStream; using Xunit; +using Xunit.Abstractions; using static UnitTests.TestBase; using static IntegrationTests.JetStreamTestBase; using static NATS.Client.Internals.JetStreamConstants; @@ -28,7 +29,15 @@ namespace IntegrationTests { public class TestJetStreamPull : TestSuite { - public TestJetStreamPull(AutoServerSuiteContext context) : base(context) {} + private readonly ITestOutputHelper output; + + public TestJetStreamPull(ITestOutputHelper output, AutoServerSuiteContext context) : base(context) + { + this.output = output; + Console.SetOut(new ConsoleWriter(output)); + } + + // public TestJetStreamPull(AutoServerSuiteContext context) : base(context) {} [Fact] public void TestFetch() @@ -346,9 +355,9 @@ public void TestAckNak() Context.RunInJsServer(c => { // create the stream. - string stream = Stream(Nuid.NextGlobal()); - string subject = Subject(Nuid.NextGlobal()); - string durable = Nuid.NextGlobal(); + string stream = Stream(); + string subject = Subject(); + string durable = Durable(); CreateMemoryStream(c, stream, subject); // Create our JetStream context. @@ -739,23 +748,20 @@ public void TestExceedsMaxRequestBytes1stMessage() [Fact] public void TestExceedsMaxRequestBytesNthMessage() { - bool skip = false; TestEventHandler handler = new TestEventHandler(); - Context.RunInJsServer(handler.Modifier, c => + Context.RunInJsServer(AtLeast291, handler.Modifier, c => { - skip = VersionIsBefore(c, "2.9.1"); - if (skip) - { - return; - } - string stream = Stream(Nuid.NextGlobal()); string subject = Subject(Nuid.NextGlobal()); string durable = Nuid.NextGlobal(); IJetStreamManagement jsm = c.CreateJetStreamManagementContext(); CreateMemoryStream(jsm, stream, subject); IJetStream js = c.CreateJetStreamContext(); - jsm.AddOrUpdateConsumer(stream, Builder().WithDurable(durable).Build()); + + jsm.AddOrUpdateConsumer(stream, Builder().WithDurable(durable) + .WithAckPolicy(AckPolicy.None) + .WithFilterSubject(subject) + .Build()); PullSubscribeOptions so = PullSubscribeOptions.BindTo(stream, durable); IJetStreamPullSubscription sub = js.PullSubscribe(subject, so); @@ -780,22 +786,16 @@ public void TestExceedsMaxRequestBytesNthMessage() [Fact] public void TestExceedsMaxRequestBytesExactBytes() { - bool skip = false; TestEventHandler handler = new TestEventHandler(); - Context.RunInJsServer(handler.Modifier, c => + Context.RunInJsServer(AtLeast291, handler.Modifier, c => { - skip = VersionIsBefore(c, "2.9.1"); - if (skip) - { - return; - } string stream = Stream(Nuid.NextGlobal()); string subject = "subject-ExMaxRqBytesExactBytes"; string durable = Nuid.NextGlobal(); IJetStreamManagement jsm = c.CreateJetStreamManagementContext(); CreateMemoryStream(jsm, stream, subject); IJetStream js = c.CreateJetStreamContext(); - jsm.AddOrUpdateConsumer(stream, Builder().WithDurable(durable).Build()); + jsm.AddOrUpdateConsumer(stream, Builder().WithDurable(durable).WithFilterSubject(subject).Build()); PullSubscribeOptions so = PullSubscribeOptions.BindTo(stream, durable); IJetStreamPullSubscription sub = js.PullSubscribe(subject, so); diff --git a/src/Tests/IntegrationTests/TestSuite.cs b/src/Tests/IntegrationTests/TestSuite.cs index 4541e031d..b57bdc551 100644 --- a/src/Tests/IntegrationTests/TestSuite.cs +++ b/src/Tests/IntegrationTests/TestSuite.cs @@ -31,6 +31,22 @@ protected TestSuite(TSuiteContext context) { Context = context; } + + public bool AtLeast290(IConnection c) { + return AtLeast290(c.ServerInfo); + } + + public bool AtLeast290(ServerInfo si) { + return si.IsSameOrNewerThanVersion("2.9.0"); + } + + public bool AtLeast291(ServerInfo si) { + return si.IsSameOrNewerThanVersion("2.9.1"); + } + + public bool AtLeast210(ServerInfo si) { + return si.IsNewerVersionThan("2.9.99"); + } } /// @@ -461,6 +477,8 @@ public class AutoServerSuiteContext : SuiteContext public void RunInJsServer(Action test) => base.RunInJsServer(AutoServer(), null, null, test); public void RunInJsServer(Func versionCheck, Action test) => base.RunInJsServer(AutoServer(), versionCheck, null, test); public void RunInJsServer(Action optionsModifier, Action test) => base.RunInJsServer(AutoServer(), optionsModifier, test); + public void RunInJsServer(Func versionCheck, Action optionsModifier, Action test) => + base.RunInJsServer(AutoServer(), versionCheck, optionsModifier, test); } public sealed class SkipPlatformsWithoutSignals : FactAttribute diff --git a/src/Tests/IntegrationTestsInternal/TestJetStreamConsumer.cs b/src/Tests/IntegrationTestsInternal/TestJetStreamConsumer.cs index 73838509b..8c61067e0 100644 --- a/src/Tests/IntegrationTestsInternal/TestJetStreamConsumer.cs +++ b/src/Tests/IntegrationTestsInternal/TestJetStreamConsumer.cs @@ -347,5 +347,86 @@ private static CountdownEvent setupPullFactory(IJetStream js) new PullHeartbeatErrorSimulator(conn, so, false, latch); return latch; } + + [Fact] + public void TestMultipleSubjectFilters() { + Context.RunInJsServer(AtLeast210, c => { + // Setup + IJetStream js = c.CreateJetStreamContext(); + IJetStreamManagement jsm = c.CreateJetStreamManagementContext(); + + string subject1 = Subject(); + string subject2 = Subject(); + string stream = Stream(); + CreateMemoryStream(jsm, stream, subject1, subject2); + JsPublish(js, subject1, 10); + JsPublish(js, subject2, 5); + + StreamInfo si = jsm.GetStreamInfo(stream); + + // push ephemeral + ConsumerConfiguration cc = ConsumerConfiguration.Builder() + .WithFilterSubjects(subject1, subject2).Build(); + IJetStreamPushSyncSubscription pushSub = js.PushSubscribeSync(null, + PushSubscribeOptions.Builder().WithConfiguration(cc).Build()); + ValidateMultipleSubjectFilterSub(pushSub, subject1); + + // pull ephemeral + IJetStreamPullSubscription pullSub = js.PullSubscribe(null, + PullSubscribeOptions.Builder().WithConfiguration(cc).Build()); + pullSub.PullExpiresIn(15, 1000); + ValidateMultipleSubjectFilterSub(pullSub, subject1); + + // push named + string name = Name(); + cc = ConsumerConfiguration.Builder() + .WithFilterSubjects(subject1, subject2).WithName(name).WithDeliverSubject(Deliver()).Build(); + jsm.AddOrUpdateConsumer(stream, cc); + pushSub = js.PushSubscribeSync(null, PushSubscribeOptions.Builder().WithConfiguration(cc).Build()); + ValidateMultipleSubjectFilterSub(pushSub, subject1); + + name = Name(); + cc = ConsumerConfiguration.Builder() + .WithFilterSubjects(subject1, subject2).WithName(name).WithDeliverSubject(Deliver()).Build(); + jsm.AddOrUpdateConsumer(stream, cc); + pushSub = js.PushSubscribeSync(null, PushSubscribeOptions.BindTo(stream, name)); + ValidateMultipleSubjectFilterSub(pushSub, subject1); + + // pull named + name = Name(); + cc = ConsumerConfiguration.Builder().WithFilterSubjects(subject1, subject2).WithName(name).Build(); + jsm.AddOrUpdateConsumer(stream, cc); + pullSub = js.PullSubscribe(null, PullSubscribeOptions.Builder().WithConfiguration(cc).Build()); + pullSub.PullExpiresIn(15, 1000); + ValidateMultipleSubjectFilterSub(pullSub, subject1); + + name = Name(); + cc = ConsumerConfiguration.Builder().WithFilterSubjects(subject1, subject2).WithName(name).Build(); + jsm.AddOrUpdateConsumer(stream, cc); + pullSub = js.PullSubscribe(null, PullSubscribeOptions.BindTo(stream, name)); + pullSub.PullExpiresIn(15, 1000); + ValidateMultipleSubjectFilterSub(pullSub, subject1); + }); + } + + private static void ValidateMultipleSubjectFilterSub(ISyncSubscription sub, string subject1) { + int count1 = 0; + int count2 = 0; + try + { + while (true) { + Msg m = sub.NextMessage(1000); + if (m.Subject.Equals(subject1)) { + count1++; + } + else { + count2++; + } + } + } + catch (NATSTimeoutException) {} + Assert.Equal(10, count1); + Assert.Equal(5, count2); + } } } diff --git a/src/Tests/IntegrationTestsInternal/TestService.cs b/src/Tests/IntegrationTestsInternal/TestService.cs index 10506c2ba..2a98f96f9 100644 --- a/src/Tests/IntegrationTestsInternal/TestService.cs +++ b/src/Tests/IntegrationTestsInternal/TestService.cs @@ -367,7 +367,6 @@ public void TestServiceBuilderConstruction() Assert.Throws(() => Service.Builder().WithName(HasStar)); // invalid in the middle Assert.Throws(() => Service.Builder().WithName(HasGt)); // invalid in the middle Assert.Throws(() => Service.Builder().WithName(HasDollar)); - Assert.Throws(() => Service.Builder().WithName(HasLow)); Assert.Throws(() => Service.Builder().WithName(Has127)); Assert.Throws(() => Service.Builder().WithName(HasFwdSlash)); Assert.Throws(() => Service.Builder().WithName(HasBackSlash)); @@ -565,18 +564,19 @@ public void TestEndpointConstruction() { Assert.Throws(() => new Endpoint(HasStar)); // invalid in the middle Assert.Throws(() => new Endpoint(HasGt)); // invalid in the middle Assert.Throws(() => new Endpoint(HasDollar)); - Assert.Throws(() => new Endpoint(HasLow)); Assert.Throws(() => new Endpoint(Has127)); Assert.Throws(() => new Endpoint(HasFwdSlash)); Assert.Throws(() => new Endpoint(HasBackSlash)); Assert.Throws(() => new Endpoint(HasEquals)); Assert.Throws(() => new Endpoint(HasTic)); - // fewer subjects are bad - Assert.Throws(() => new Endpoint(NAME, HasSpace)); - Assert.Throws(() => new Endpoint(NAME, HasLow)); - Assert.Throws(() => new Endpoint(NAME, Has127)); - Assert.Throws(() => new Endpoint(NAME, "foo.>.bar")); // gt is not last segment + foreach (string bad in BadSubjectsOrQueues) + { + if (!string.IsNullOrEmpty(bad)) + { + Assert.Throws(() => new Endpoint(NAME, bad)); + } + } } [Fact] @@ -651,12 +651,11 @@ public void TestGroupConstruction() { g1 = new Group("foo.*"); Assert.Equal("foo.*", g1.Name); - - Assert.Throws(() => new Group(HasSpace)); - Assert.Throws(() => new Group(HasLow)); - Assert.Throws(() => new Group(Has127)); - Assert.Throws(() => new Group("foo.>")); // gt is last segment - Assert.Throws(() => new Group("foo.>.bar")); // gt is not last segment + + foreach (string bad in BadSubjectsOrQueues) + { + Assert.Throws(() => new Group(bad)); + } } [Fact] diff --git a/src/Tests/IntegrationTestsInternal/TestSimplification.cs b/src/Tests/IntegrationTestsInternal/TestSimplification.cs index 3fb5869b4..75b103c27 100644 --- a/src/Tests/IntegrationTestsInternal/TestSimplification.cs +++ b/src/Tests/IntegrationTestsInternal/TestSimplification.cs @@ -28,16 +28,11 @@ namespace IntegrationTests public class TestSimplification : TestSuite { public TestSimplification(OneServerSuiteContext context) : base(context) {} - - private bool RunTest(ServerInfo si) - { - return si.IsSameOrNewerThanVersion("2.9.1"); - } [Fact] public void TestStreamContext() { - Context.RunInJsServer(si => RunTest(si), c => { + Context.RunInJsServer(AtLeast291, c => { string stream = Stream(Nuid.NextGlobal()); string subject = Subject(Nuid.NextGlobal()); @@ -127,7 +122,7 @@ private static void _TestStreamContext(string expectedStreamName, string subject [Fact] public void TestFetch() { - Context.RunInJsServer(si => RunTest(si), c => { + Context.RunInJsServer(AtLeast291, c => { string stream = Stream(Nuid.NextGlobal()); string subject = Subject(Nuid.NextGlobal()); @@ -237,7 +232,7 @@ private static string generateConsumerName(int maxMessages, int maxBytes) { [Fact] public void TestIterableConsumer() { - Context.RunInJsServer(si => RunTest(si), c => { + Context.RunInJsServer(AtLeast291, c => { string streamName = Stream(Nuid.NextGlobal()); string subject = Subject(Nuid.NextGlobal()); string durable = Nuid.NextGlobal(); @@ -271,7 +266,7 @@ public void TestIterableConsumer() [Fact] public void TestOrderedIterableConsumerBasic() { - Context.RunInJsServer(si => RunTest(si), c => { + Context.RunInJsServer(AtLeast291, c => { string streamName = Stream(Nuid.NextGlobal()); string subject = Subject(Nuid.NextGlobal()); @@ -341,7 +336,7 @@ private static void _testIterable(IJetStream js, int stopCount, IIterableConsume [Fact] public void TestConsumeWithHandler() { - Context.RunInJsServer(si => RunTest(si), c => { + Context.RunInJsServer(AtLeast291, c => { string streamName = Stream(Nuid.NextGlobal()); string subject = Subject(Nuid.NextGlobal()); string durable = Nuid.NextGlobal(); @@ -376,7 +371,7 @@ public void TestConsumeWithHandler() [Fact] public void TestNext() { - Context.RunInJsServer(si => RunTest(si), c => { + Context.RunInJsServer(AtLeast291, c => { string streamName = Stream(Nuid.NextGlobal()); string subject = Subject(Nuid.NextGlobal()); string durable = Nuid.NextGlobal(); @@ -412,7 +407,7 @@ public void TestCoverage() { string durable5 = Nuid.NextGlobal(); string durable6 = Nuid.NextGlobal(); - Context.RunInJsServer(si => RunTest(si), c => { + Context.RunInJsServer(AtLeast291, c => { IJetStreamManagement jsm = c.CreateJetStreamManagementContext(); IJetStream js = c.CreateJetStreamContext(); @@ -600,7 +595,7 @@ protected override bool BeforeChannelAddCheck(Msg msg) [Fact] public void TestOrderedActives() { - Context.RunInJsServer(si => RunTest(si), c => { + Context.RunInJsServer(AtLeast291, c => { string streamName = Stream(Nuid.NextGlobal()); string subject = Subject(Nuid.NextGlobal()); string durable = Nuid.NextGlobal(); @@ -678,7 +673,7 @@ private static void testOrderedActiveIterable(IStreamContext sc, OrderedConsumer [Fact] public void TestOrderedConsume() { - Context.RunInJsServer(si => RunTest(si), c => { + Context.RunInJsServer(AtLeast291, c => { string streamName = Stream(Nuid.NextGlobal()); string subject = Subject(Nuid.NextGlobal()); @@ -726,7 +721,7 @@ public void TestOrderedConsume() { [Fact] public void TestOrderedMultipleWays() { - Context.RunInJsServer(si => RunTest(si), c => { + Context.RunInJsServer(AtLeast291, c => { string streamName = Stream(Nuid.NextGlobal()); string subject = Subject(Nuid.NextGlobal()); diff --git a/src/Tests/UnitTests/Internals/TestValidator.cs b/src/Tests/UnitTests/Internals/TestValidator.cs index 8dc2d7c40..75c90ca08 100644 --- a/src/Tests/UnitTests/Internals/TestValidator.cs +++ b/src/Tests/UnitTests/Internals/TestValidator.cs @@ -30,22 +30,29 @@ public TestValidator() } [Fact] - public void TestValidateMessageSubjectRequired() + public void TestValidateSubject() { - AllowedRequired(ValidateSubject, Plain, HasPrintable, HasDot, HasStar, HasGt, HasDollar); - NotAllowedRequired(ValidateSubject, null, string.Empty, HasSpace, HasLow, Has127); - NotAllowedRequired(ValidateSubject, _utfOnlyStrings); - AllowedNotRequiredEmptyAsNull(ValidateSubject, null, string.Empty); + // subject is required + AllowedRequired(ValidateSubject, Plain, HasPrintable, HasDot, HasDollar, HasLow, Has127); + AllowedRequired(ValidateSubject, _utfOnlyStrings); + AllowedRequired(ValidateSubject, StarSegment, GtLastSegment); + NotAllowedRequired(ValidateSubject, null, string.Empty, HasSpace, HasCr, HasLf); + NotAllowedRequired(ValidateSubject, StartsWithDot, StarNotSegment, GtNotSegment, EmptySegment, GtNotLastSegment); + NotAllowedRequired(ValidateSubject, EndsWithDot, EndsWithDotSpace, EndsWithCr, EndsWithLf, EndsWithTab); + - NotAllowedRequired(ValidateSubject, null, string.Empty, HasSpace, HasLow, Has127); + // subject not required, null and empty both mean not supplied AllowedNotRequiredEmptyAsNull(ValidateSubject, null, string.Empty); + NotAllowedNotRequired(ValidateSubject, HasSpace, HasCr, HasLf); + NotAllowedNotRequired(ValidateSubject, StartsWithDot, StarNotSegment, GtNotSegment, EmptySegment, GtNotLastSegment); + NotAllowedNotRequired(ValidateSubject, EndsWithDot, EndsWithDotSpace, EndsWithCr, EndsWithLf, EndsWithTab); } [Fact] public void TestValidateReplyTo() { AllowedRequired(ValidateReplyTo, Plain, HasPrintable, HasDot, HasDollar); - NotAllowedRequired(ValidateReplyTo, null, string.Empty, HasSpace, HasStar, HasGt, HasLow, Has127); + NotAllowedRequired(ValidateReplyTo, null, string.Empty, HasSpace, HasStar, HasGt, Has127); NotAllowedRequired(ValidateReplyTo, _utfOnlyStrings); AllowedNotRequiredEmptyAsNull(ValidateReplyTo, null, string.Empty); } @@ -54,7 +61,7 @@ public void TestValidateReplyTo() public void TestValidateQueueNameRequired() { AllowedRequired(ValidateQueueName, Plain, HasPrintable, HasDollar); - NotAllowedRequired(ValidateQueueName, null, string.Empty, HasSpace, HasDot, HasStar, HasGt, HasLow, Has127); + NotAllowedRequired(ValidateQueueName, null, string.Empty, HasSpace, HasDot, HasStar, HasGt, Has127); NotAllowedRequired(ValidateQueueName, _utfOnlyStrings); AllowedNotRequiredEmptyAsNull(ValidateQueueName, null, string.Empty); } @@ -63,7 +70,7 @@ public void TestValidateQueueNameRequired() public void TestValidateStreamName() { AllowedRequired(ValidateStreamName, Plain, HasPrintable, HasDollar); - NotAllowedRequired(ValidateStreamName, null, string.Empty, HasSpace, HasDot, HasStar, HasGt, HasLow, Has127); + NotAllowedRequired(ValidateStreamName, null, string.Empty, HasSpace, HasDot, HasStar, HasGt, Has127); NotAllowedRequired(ValidateStreamName, _utfOnlyStrings); AllowedNotRequiredEmptyAsNull(ValidateStreamName, null, string.Empty); } @@ -72,7 +79,7 @@ public void TestValidateStreamName() public void TestValidateDurable() { AllowedRequired(ValidateDurable, Plain, HasPrintable, HasDollar); - NotAllowedRequired(ValidateDurable, null, string.Empty, HasSpace, HasDot, HasStar, HasGt, HasLow, Has127); + NotAllowedRequired(ValidateDurable, null, string.Empty, HasSpace, HasDot, HasStar, HasGt, Has127); NotAllowedRequired(ValidateDurable, _utfOnlyStrings); AllowedNotRequiredEmptyAsNull(ValidateDurable, null, string.Empty); } @@ -213,7 +220,6 @@ public void TestValidateJetStreamPrefix() Assert.Throws(() => ValidateJetStreamPrefix(HasGt)); Assert.Throws(() => ValidateJetStreamPrefix(HasDollar)); Assert.Throws(() => ValidateJetStreamPrefix(HasSpace)); - Assert.Throws(() => ValidateJetStreamPrefix(HasLow)); } [Fact] @@ -258,6 +264,14 @@ private void AllowedRequired(Func test, params string[] st } } + private void NotAllowedRequired(Func test, params string[] strings) + { + foreach (string s in strings) + { + Assert.Throws(() => test.Invoke(s, true)); + } + } + private void AllowedNotRequiredEmptyAsNull(Func test, params string[] strings) { foreach (string s in strings) { @@ -265,11 +279,11 @@ private void AllowedNotRequiredEmptyAsNull(Func test, para } } - private void NotAllowedRequired(Func test, params string[] strings) + private void NotAllowedNotRequired(Func test, params string[] strings) { foreach (string s in strings) { - Assert.Throws(() => test.Invoke(s, true)); + Assert.Throws(() => test.Invoke(s, false)); } } diff --git a/src/Tests/UnitTests/JetStream/TestConsumerConfiguration.cs b/src/Tests/UnitTests/JetStream/TestConsumerConfiguration.cs index 985487042..511ccd5a9 100644 --- a/src/Tests/UnitTests/JetStream/TestConsumerConfiguration.cs +++ b/src/Tests/UnitTests/JetStream/TestConsumerConfiguration.cs @@ -303,10 +303,10 @@ public void ChangeFieldsIdentified() AssertNotChange(Builder(orig).WithMaxBytes(null).Build(), orig); AssertChange(Builder(orig).WithMaxBytes(1).Build(), orig, "MaxBytes"); - AssertNotChange(Builder(orig).WithFilterSubject("").Build(), orig); + AssertNotChange(Builder(orig).WithFilterSubject(string.Empty).Build(), orig); ccTest = Builder(orig).WithFilterSubject(Plain).Build(); AssertNotChange(ccTest, ccTest); - AssertChange(ccTest, orig, "FilterSubject"); + AssertChange(ccTest, orig, "FilterSubjects"); AssertNotChange(Builder(orig).WithDescription("").Build(), orig); ccTest = Builder(orig).WithDescription(Plain).Build(); diff --git a/src/Tests/UnitTests/JetStream/TestJetStreamOptions.cs b/src/Tests/UnitTests/JetStream/TestJetStreamOptions.cs index a080f3357..b3c13100a 100644 --- a/src/Tests/UnitTests/JetStream/TestJetStreamOptions.cs +++ b/src/Tests/UnitTests/JetStream/TestJetStreamOptions.cs @@ -101,9 +101,8 @@ public void TestPrefixValidation() AssertValidPrefix(HasTic); AssertInvalidPrefix(HasSpace); - AssertInvalidPrefix(HasStar); - AssertInvalidPrefix(HasGt); - AssertInvalidPrefix(HasLow); + AssertInvalidPrefix(StarNotSegment); + AssertInvalidPrefix(GtNotSegment); AssertInvalidPrefix(Has127); AssertInvalidPrefix("."); @@ -145,7 +144,6 @@ public void TestDomainValidation() AssertInvalidDomain(HasSpace); AssertInvalidDomain(HasStar); AssertInvalidDomain(HasGt); - AssertInvalidDomain(HasLow); AssertInvalidDomain(Has127); AssertInvalidDomain("."); diff --git a/src/Tests/UnitTests/JetStream/TestKeyValueConfiguration.cs b/src/Tests/UnitTests/JetStream/TestKeyValueConfiguration.cs index c467f083f..2cd77dd43 100644 --- a/src/Tests/UnitTests/JetStream/TestKeyValueConfiguration.cs +++ b/src/Tests/UnitTests/JetStream/TestKeyValueConfiguration.cs @@ -156,10 +156,9 @@ public void TestConstructionInvalidsCoverage() { Assert.Throws(() => KeyValueConfiguration.Builder().WithName(HasSpace)); Assert.Throws(() => KeyValueConfiguration.Builder().WithName(HasPrintable)); Assert.Throws(() => KeyValueConfiguration.Builder().WithName(HasDot)); - Assert.Throws(() => KeyValueConfiguration.Builder().WithName(HasStar)); - Assert.Throws(() => KeyValueConfiguration.Builder().WithName(HasGt)); + Assert.Throws(() => KeyValueConfiguration.Builder().WithName(StarNotSegment)); + Assert.Throws(() => KeyValueConfiguration.Builder().WithName(GtNotSegment)); Assert.Throws(() => KeyValueConfiguration.Builder().WithName(HasDollar)); - Assert.Throws(() => KeyValueConfiguration.Builder().WithName(HasLow)); Assert.Throws(() => KeyValueConfiguration.Builder(Has127)); Assert.Throws(() => KeyValueConfiguration.Builder(HasFwdSlash)); Assert.Throws(() => KeyValueConfiguration.Builder(HasBackSlash)); diff --git a/src/Tests/UnitTests/JetStream/TestObjectStoreApi.cs b/src/Tests/UnitTests/JetStream/TestObjectStoreApi.cs index e19313bf3..0baabcf80 100644 --- a/src/Tests/UnitTests/JetStream/TestObjectStoreApi.cs +++ b/src/Tests/UnitTests/JetStream/TestObjectStoreApi.cs @@ -298,10 +298,9 @@ public void TestConstructionInvalidsCoverage() { Assert.Throws(() => ObjectStoreConfiguration.Builder().WithName(HasSpace)); Assert.Throws(() => ObjectStoreConfiguration.Builder().WithName(HasPrintable)); Assert.Throws(() => ObjectStoreConfiguration.Builder().WithName(HasDot)); - Assert.Throws(() => ObjectStoreConfiguration.Builder().WithName(HasStar)); - Assert.Throws(() => ObjectStoreConfiguration.Builder().WithName(HasGt)); + Assert.Throws(() => ObjectStoreConfiguration.Builder().WithName(StarNotSegment)); + Assert.Throws(() => ObjectStoreConfiguration.Builder().WithName(GtNotSegment)); Assert.Throws(() => ObjectStoreConfiguration.Builder().WithName(HasDollar)); - Assert.Throws(() => ObjectStoreConfiguration.Builder().WithName(HasLow)); Assert.Throws(() => ObjectStoreConfiguration.Builder(Has127)); Assert.Throws(() => ObjectStoreConfiguration.Builder(HasFwdSlash)); Assert.Throws(() => ObjectStoreConfiguration.Builder(HasBackSlash)); diff --git a/src/Tests/UnitTests/JetStream/TestPushPullSubscribeOptions.cs b/src/Tests/UnitTests/JetStream/TestPushPullSubscribeOptions.cs index 52e0ca238..5405d3cbd 100644 --- a/src/Tests/UnitTests/JetStream/TestPushPullSubscribeOptions.cs +++ b/src/Tests/UnitTests/JetStream/TestPushPullSubscribeOptions.cs @@ -22,7 +22,7 @@ namespace UnitTests.JetStream { public class TestSubscribeOptions : TestBase { - private static readonly string[] BadNames = {HasDot, HasGt, HasStar, HasFwdSlash, HasBackSlash}; + private static readonly string[] BadNames = {HasDot, GtNotSegment, StarNotSegment, HasFwdSlash, HasBackSlash}; [Fact] public void TestPushAffirmative() diff --git a/src/Tests/UnitTests/TestBase.cs b/src/Tests/UnitTests/TestBase.cs index 6670c2b7a..0058d0181 100644 --- a/src/Tests/UnitTests/TestBase.cs +++ b/src/Tests/UnitTests/TestBase.cs @@ -54,8 +54,23 @@ public override void WriteLine(string m) // ---------------------------------------------------------------------------------------------------- // unit test // ---------------------------------------------------------------------------------------------------- + public const string StarSegment = "*.star.*.segment.*"; + public const string GtLastSegment = "gt.last.>"; + public const string GtNotLastSegment = "gt.>.notlast"; + public const string StartsWithDot = ".starts-with-dot"; + public const string EndsWithDot = "ends-with-dot."; + public const string EndsWithDotSpace = "ends-with-space. "; + public const string EndsWithCr = "ends-with-space.\r"; + public const string EndsWithLf = "ends-with-space.\n"; + public const string EndsWithTab = "ends-with-space.\t"; + public const string StarNotSegment = "star*not*segment"; + public const string GtNotSegment = "gt>not>segment"; + public const string EmptySegment = "blah..blah"; + public const string Plain = "plain"; public const string HasSpace = "has space"; + public const string StartsSpace = " startsspace"; + public const string EndsSpace = "endsspace "; public const string HasPrintable = "has-print!able"; public const string HasDot = "has.dot"; public const string HasStar = "has*star"; @@ -63,13 +78,21 @@ public override void WriteLine(string m) public const string HasDash = "has-dash"; public const string HasUnder = "has_under"; public const string HasDollar = "has$dollar"; - public const string HasLow = "has\tlower\rthan\nspace"; + public const string HasCr = "has\rcr"; + public const string HasLf = "has\nlf"; + public const string HasTab = "has\ttab"; public const string HasFwdSlash = "has/fwd/slash"; public const string HasBackSlash = "has\\back\\slash"; public const string HasEquals = "has=equals"; public const string HasTic = "has`tic"; + public static readonly string HasLow = "has" + (char)0 + "low"; public static readonly string Has127 = "has" + (char)127 + "127"; + + public static readonly string[] BadSubjectsOrQueues = new string[] { + HasSpace, HasCr, HasLf, HasTab, StartsSpace, EndsSpace, null, string.Empty + }; + public static string ReadDataFile(string name) { return File.ReadAllText(FileSpec(name)); diff --git a/src/Tests/UnitTests/TestSubscriptions.cs b/src/Tests/UnitTests/TestSubscriptions.cs index 15481ab04..4dc893cfb 100644 --- a/src/Tests/UnitTests/TestSubscriptions.cs +++ b/src/Tests/UnitTests/TestSubscriptions.cs @@ -13,13 +13,18 @@ using NATS.Client; using Xunit; +using static UnitTests.TestBase; namespace UnitTests { public class TestSubscriptions { - static readonly string[] invalidSubjects = { "foo bar", "foo..bar", ".foo", "bar.baz.", "baz\t.foo" }; - static readonly string[] invalidQNames = { "foo group", "group\t1", "g1\r\n2" }; + static readonly string[] invalid = + { + string.Empty, HasSpace, HasCr, HasLf, HasTab, + StartsWithDot, StarNotSegment, GtNotSegment, EmptySegment, + EndsWithDot, EndsWithDotSpace, EndsWithDot, EndsWithCr, EndsWithLf, EndsWithTab + }; [Fact] public void TestSubscriptionValidationAPI() @@ -27,12 +32,12 @@ public void TestSubscriptionValidationAPI() Assert.True(Subscription.IsValidSubject("foo")); Assert.True(Subscription.IsValidSubject("foo.bar")); - foreach (string s in invalidSubjects) + foreach (string s in invalid) { Assert.False(Subscription.IsValidSubject(s)); } - foreach (string s in invalidQNames) + foreach (string s in invalid) { Assert.False(Subscription.IsValidQueueGroupName(s)); }