Skip to content

Commit

Permalink
consumer create for server v2.9.0 (#670)
Browse files Browse the repository at this point in the history
  • Loading branch information
scottf authored Sep 15, 2022
1 parent 07fe2e2 commit a98a0b9
Show file tree
Hide file tree
Showing 18 changed files with 420 additions and 180 deletions.
79 changes: 45 additions & 34 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -944,37 +944,48 @@ in the JetStream samples for detailed and runnable samples.

Subscription creation has many checks to make sure that a valid, operable subscription can be made.

| Name | Group | Code | Description |
| --- | --- | --- | --- |
| JsSoDurableMismatch | SO | 90101 | Builder durable must match the consumer configuration durable if both are provided. |
| JsSoDeliverGroupMismatch | SO | 90102 | Builder deliver group must match the consumer configuration deliver group if both are provided. |
| JsSoDeliverSubjectMismatch | SO | 90103 | Builder deliver subject must match the consumer configuration deliver subject if both are provided. |
| JsSoOrderedNotAllowedWithBind | SO | 90104 | Bind is not allowed with an ordered consumer. |
| JsSoOrderedNotAllowedWithDeliverGroup | SO | 90105 | Deliver group is not allowed with an ordered consumer. |
| JsSoOrderedNotAllowedWithDurable | SO | 90106 | Durable is not allowed with an ordered consumer. |
| JsSoOrderedNotAllowedWithDeliverSubject | SO | 90107 | Deliver subject is not allowed with an ordered consumer. |
| JsSoOrderedRequiresAckPolicyNone | SO | 90108 | Ordered consumer requires Ack Policy None. |
| JsSoOrderedRequiresMaxDeliver | SO | 90109 | Max deliver is limited to 1 with an ordered consumer. |
| JsSubPullCantHaveDeliverGroup | SUB | 90001 | Pull subscriptions can't have a deliver group. |
| JsSubPullCantHaveDeliverSubject | SUB | 90002 | Pull subscriptions can't have a deliver subject. |
| JsSubPushCantHaveMaxPullWaiting | SUB | 90003 | Push subscriptions cannot supply max pull waiting. |
| JsSubQueueDeliverGroupMismatch | SUB | 90004 | Queue / deliver group mismatch. |
| JsSubFcHbNotValidPull | SUB | 90005 | Flow Control and/or heartbeat is not valid with a pull subscription. |
| JsSubFcHbNotValidQueue | SUB | 90006 | Flow Control and/or heartbeat is not valid in queue mode. |
| JsSubNoMatchingStreamForSubject | SUB | 90007 | No matching streams for subject. |
| JsSubConsumerAlreadyConfiguredAsPush | SUB | 90008 | Consumer is already configured as a push consumer. |
| JsSubConsumerAlreadyConfiguredAsPull | SUB | 90009 | Consumer is already configured as a pull consumer. |
| _removed_ | SUB | 90010 | |
| JsSubSubjectDoesNotMatchFilter | SUB | 90011 | Subject does not match consumer configuration filter. |
| JsSubConsumerAlreadyBound | SUB | 90012 | Consumer is already bound to a subscription. |
| JsSubExistingConsumerNotQueue | SUB | 90013 | Existing consumer is not configured as a queue / deliver group. |
| JsSubExistingConsumerIsQueue | SUB | 90014 | Existing consumer is configured as a queue / deliver group. |
| JsSubExistingQueueDoesNotMatchRequestedQueue | SUB | 90015 | Existing consumer deliver group does not match requested queue / deliver group. |
| JsSubExistingConsumerCannotBeModified | SUB | 90016 | Existing consumer cannot be modified. |
| JsSubConsumerNotFoundRequiredInBind | SUB | 90017 | Consumer not found, required in bind mode. |
| JsSubOrderedNotAllowOnQueues | SUB | 90018 | Ordered consumer not allowed on queues. |
| JsSubPushCantHaveMaxBatch | SUB | 90019 | Push subscriptions cannot supply max batch. |
| JsSubPushCantHaveMaxBytes | SUB | 90020 | Push subscriptions cannot supply max bytes. |
| Name | Group | Code | Description |
|----------------------------------------------|-------|-------|-----------------------------------------------------------------------------------------------------|
| JsSoDurableMismatch | SO | 90101 | Builder durable must match the consumer configuration durable if both are provided. |
| JsSoDeliverGroupMismatch | SO | 90102 | Builder deliver group must match the consumer configuration deliver group if both are provided. |
| JsSoDeliverSubjectMismatch | SO | 90103 | Builder deliver subject must match the consumer configuration deliver subject if both are provided. |
| JsSoOrderedNotAllowedWithBind | SO | 90104 | Bind is not allowed with an ordered consumer. |
| JsSoOrderedNotAllowedWithDeliverGroup | SO | 90105 | Deliver group is not allowed with an ordered consumer. |
| JsSoOrderedNotAllowedWithDurable | SO | 90106 | Durable is not allowed with an ordered consumer. |
| JsSoOrderedNotAllowedWithDeliverSubject | SO | 90107 | Deliver subject is not allowed with an ordered consumer. |
| JsSoOrderedRequiresAckPolicyNone | SO | 90108 | Ordered consumer requires Ack Policy None. |
| JsSoOrderedRequiresMaxDeliver | SO | 90109 | Max deliver is limited to 1 with an ordered consumer. |
| JsSoNameMismatch | SO | 90110 | Builder name must match the consumer configuration name if both are provided. |
| JsSubPullCantHaveDeliverGroup | SUB | 90001 | Pull subscriptions can't have a deliver group. |
| JsSubPullCantHaveDeliverSubject | SUB | 90002 | Pull subscriptions can't have a deliver subject. |
| JsSubPushCantHaveMaxPullWaiting | SUB | 90003 | Push subscriptions cannot supply max pull waiting. |
| JsSubQueueDeliverGroupMismatch | SUB | 90004 | Queue / deliver group mismatch. |
| JsSubFcHbNotValidPull | SUB | 90005 | Flow Control and/or heartbeat is not valid with a pull subscription. |
| JsSubFcHbNotValidQueue | SUB | 90006 | Flow Control and/or heartbeat is not valid in queue mode. |
| JsSubNoMatchingStreamForSubject | SUB | 90007 | No matching streams for subject. |
| JsSubConsumerAlreadyConfiguredAsPush | SUB | 90008 | Consumer is already configured as a push consumer. |
| JsSubConsumerAlreadyConfiguredAsPull | SUB | 90009 | Consumer is already configured as a pull consumer. |
| _removed_ | SUB | 90010 | |
| JsSubSubjectDoesNotMatchFilter | SUB | 90011 | Subject does not match consumer configuration filter. |
| JsSubConsumerAlreadyBound | SUB | 90012 | Consumer is already bound to a subscription. |
| JsSubExistingConsumerNotQueue | SUB | 90013 | Existing consumer is not configured as a queue / deliver group. |
| JsSubExistingConsumerIsQueue | SUB | 90014 | Existing consumer is configured as a queue / deliver group. |
| JsSubExistingQueueDoesNotMatchRequestedQueue | SUB | 90015 | Existing consumer deliver group does not match requested queue / deliver group. |
| JsSubExistingConsumerCannotBeModified | SUB | 90016 | Existing consumer cannot be modified. |
| JsSubConsumerNotFoundRequiredInBind | SUB | 90017 | Consumer not found, required in bind mode. |
| JsSubOrderedNotAllowOnQueues | SUB | 90018 | Ordered consumer not allowed on queues. |
| JsSubPushCantHaveMaxBatch | SUB | 90019 | Push subscriptions cannot supply max batch. |
| JsSubPushCantHaveMaxBytes | SUB | 90020 | Push subscriptions cannot supply max bytes. |
| OsObjectNotFound | OS | 90201 | The object was not found. |
| OsObjectIsDeleted | OS | 90202 | The object is deleted. |
| OsObjectAlreadyExists | OS | 90203 | An object with that name already exists. |
| OsCantLinkToLink | OS | 90204 | A link cannot link to another link. |
| OsGetDigestMismatch | OS | 90205 | Digest does not match meta data. |
| OsGetChunksMismatch | OS | 90206 | Number of chunks does not match meta data. |
| OsGetSizeMismatch | OS | 90207 | Total size does not match meta data. |
| OsGetLinkToBucket | OS | 90208 | Cannot get object, it is a link to a bucket. |
| JsConsumerCantUseNameBefore290 | CON | 90301 | Name field not valid against pre v2.9.0 servers. |
| JsConsumerNameDurableMismatch | CON | 90302 | Name must match durable if both are supplied. |

### Message Acknowledgements

Expand Down Expand Up @@ -1053,9 +1064,9 @@ To that end, with any contributions, certainly feel free to code in a more .NET

## TODO

* [ ] Key Value
* [ ] Ordered Consumer
* [ ] Object Store
* [x] Key Value
* [x] Ordered Consumer
* [x] Object Store
* [x] JetStream
* [ ] Another performance pass - look at stream directly over socket, contention, fastpath optimizations, rw locks.
* [ ] Rx API (unified over NATS Streaming?)
Expand Down
15 changes: 11 additions & 4 deletions src/NATS.Client/Exceptions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -215,8 +215,7 @@ public class NATSJetStreamClientException : NATSException
{
private ClientExDetail _detail;

internal NATSJetStreamClientException(ClientExDetail detail)
: base(detail.Message)
internal NATSJetStreamClientException(ClientExDetail detail) : base(detail.Message)
{
_detail = detail;
}
Expand Down Expand Up @@ -248,26 +247,31 @@ public sealed class ClientExDetail

public static readonly ClientExDetail JsSoDurableMismatch = new ClientExDetail(So, 90101, "Builder durable must match the consumer configuration durable if both are provided.");
public static readonly ClientExDetail JsSoDeliverGroupMismatch = new ClientExDetail(So, 90102, "Builder deliver group must match the consumer configuration deliver group if both are provided.");
public static readonly ClientExDetail JsSoDeliverSubjectGroupMismatch = new ClientExDetail(So, 90103, "Builder deliver subject must match the consumer configuration deliver subject if both are provided.");
public static readonly ClientExDetail JsSoDeliverSubjectMismatch = new ClientExDetail(So, 90103, "Builder deliver subject must match the consumer configuration deliver subject if both are provided.");
public static readonly ClientExDetail JsSoOrderedNotAllowedWithBind = new ClientExDetail(So, 90104, "Bind is not allowed with an ordered consumer.");
public static readonly ClientExDetail JsSoOrderedNotAllowedWithDeliverGroup = new ClientExDetail(So, 90105, "Deliver group is not allowed with an ordered consumer.");
public static readonly ClientExDetail JsSoOrderedNotAllowedWithDurable = new ClientExDetail(So, 90106, "Durable is not allowed with an ordered consumer.");
public static readonly ClientExDetail JsSoOrderedNotAllowedWithDeliverSubject = new ClientExDetail(So, 90107, "Deliver subject is not allowed with an ordered consumer.");
public static readonly ClientExDetail JsSoOrderedRequiresAckPolicyNone = new ClientExDetail(So, 90108, "Ordered consumer requires Ack Policy None.");
public static readonly ClientExDetail JsSoOrderedRequiresMaxDeliver = new ClientExDetail(So, 90109, "Max deliver is limited to 1 with an ordered consumer.");
public static readonly ClientExDetail JsSoNameMismatch = new ClientExDetail(So, 90110, "Builder name must match the consumer configuration name if both are provided.");

public static readonly ClientExDetail OsObjectNotFound = new ClientExDetail(Os, 90201, "The object was not found.");
public static readonly ClientExDetail OsObjectIsDeleted = new ClientExDetail(Os, 90202, "The object is deleted.");
public static readonly ClientExDetail OsObjectAlreadyExists = new ClientExDetail(Os, 90203, "An object with that name already exists.");
public static readonly ClientExDetail OsCantLinkToLink = new ClientExDetail(Os, 90204, "A link cannot link to another link.");
public static readonly ClientExDetail OsGetDigestMismatch = new ClientExDetail(Os, 90205, "Digest does not match meta data.");
public static readonly ClientExDetail OsGetChunksMismatch = new ClientExDetail(Os, 90206, "Number of chunks ddoes not match meta data.");
public static readonly ClientExDetail OsGetChunksMismatch = new ClientExDetail(Os, 90206, "Number of chunks does not match meta data.");
public static readonly ClientExDetail OsGetSizeMismatch = new ClientExDetail(Os, 90207, "Total size does not match meta data.");
public static readonly ClientExDetail OsGetLinkToBucket = new ClientExDetail(Os, 90208, "Cannot get object, it is a link to a bucket.");

public static readonly ClientExDetail JsConsumerCantUseNameBefore290 = new ClientExDetail(Con, 90301, "Name field not valid against pre v2.9.0 servers.");
public static readonly ClientExDetail JsConsumerNameDurableMismatch = new ClientExDetail(Con, 90302, "Name must match durable if both are supplied.");

private const string Sub = "SUB";
private const string So = "SO";
private const string Os = "OS";
private const string Con = "CON";

public string Id { get; }
public string Message { get; }
Expand Down Expand Up @@ -295,5 +299,8 @@ internal NATSJetStreamClientException Instance(string extraMessage)

[Obsolete("constant name had typo, replaced with JsSubFcHbNotValidQueue")]
public static readonly ClientExDetail JsSubFcHbHbNotValidQueue = new ClientExDetail(Sub, 90006, "Flow Control and/or heartbeat is not valid in queue mode.");

[Obsolete("constant name had typo, replaced with JsSoDeliverSubjectMismatch")]
public static readonly ClientExDetail JsSoDeliverSubjectGroupMismatch = new ClientExDetail(So, 90103, "Builder deliver subject must match the consumer configuration deliver subject if both are provided.");
}
}
4 changes: 4 additions & 0 deletions src/NATS.Client/Internals/JetStreamConstants.cs
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,10 @@ public static class JetStreamConstants
/// </summary>
public const string JsapiConsumerCreate = "CONSUMER.CREATE.{0}";

public const string JsapiConsumerCreateV290 = "CONSUMER.CREATE.{0}.{1}";

public const string JsapiConsumerCreateV290WithFilter = "CONSUMER.CREATE.{0}.{1}.{2}";

/// <summary>
/// JSAPI_DURABLE_CREATE is used to create durable consumers.
/// </summary>
Expand Down
28 changes: 19 additions & 9 deletions src/NATS.Client/Internals/Validator.cs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ internal static class Validator
private static readonly char[] WildGt = { '*', '>'};
private static readonly char[] WildGtDot = { '*', '>', '.'};
private static readonly char[] WildGtDollar = {'*', '>', '$'};
private static readonly char[] WildGtDotSlashes = { '*', '>', '.', '\\', '/'};

internal static string Required(string s, string label) {
if (EmptyAsNull(s) == null) {
Expand All @@ -32,20 +33,15 @@ public static string ValidateQueueName(string s, bool required) {
}

public static string ValidateStreamName(string s, bool required) {
return ValidatePrintableExceptWildDotGt(s, "Stream", required);
return ValidatePrintableExceptWildDotGtSlashes(s, "Stream", required);
}

public static string ValidateDurable(string s, bool required) {
return ValidatePrintableExceptWildDotGt(s, "Durable", required);
return ValidatePrintableExceptWildDotGtSlashes(s, "Durable", required);
}

public static string ValidateDurableRequired(string durable, ConsumerConfiguration cc)
{
if (durable != null) return ValidateDurable(durable, true);
if (cc != null) return ValidateDurable(cc.Durable, true);

throw new ArgumentException(
"Durable is required and cannot contain a '.', '*' or '>' [null]");
public static string ValidateConsumerName(string s, bool required) {
return ValidatePrintableExceptWildDotGtSlashes(s, "Name", required);
}

public static string ValidatePrefixOrDomain(string s, string label, bool required) {
Expand Down Expand Up @@ -146,6 +142,16 @@ public static string ValidatePrintableExceptWildDotGt(string s, string label, bo
});
}

public static string ValidatePrintableExceptWildDotGtSlashes(string s, string label, bool required)
{
return Validate(s, required, label, () => {
if (NotPrintableOrHasWildGtDotSlashes(s)) {
throw new ArgumentException($"{label} must be in the printable ASCII range and cannot include `*` or `.` [{s}]");
}
return s;
});
}

public static string ValidatePrintableExceptWildGt(string s, string label, bool required)
{
return Validate(s, required, label, () => {
Expand Down Expand Up @@ -517,6 +523,10 @@ public static bool NotPrintableOrHasWildGtDot(string s) {
return NotPrintableOrHasChars(s, WildGtDot);
}

public static bool NotPrintableOrHasWildGtDotSlashes(string s) {
return NotPrintableOrHasChars(s, WildGtDotSlashes);
}

public static bool NotPrintableOrHasWildGtDollar(string s) {
return NotPrintableOrHasChars(s, WildGtDollar);
}
Expand Down
Loading

0 comments on commit a98a0b9

Please sign in to comment.