From a7c301f4383977e243ed942cc53a0324711d4e7b Mon Sep 17 00:00:00 2001 From: Scott Fauerbach Date: Thu, 7 Mar 2024 14:58:27 -0500 Subject: [PATCH] Pause and Resume consumer (#874) --- .../Internals/JetStreamConstants.cs | 5 ++ src/NATS.Client/Internals/JsonUtils.cs | 22 ++++++ src/NATS.Client/JetStream/ApiConstants.cs | 3 + .../JetStream/ConsumerConfiguration.cs | 23 ++++++ src/NATS.Client/JetStream/ConsumerInfo.cs | 9 ++- .../JetStream/ConsumerPauseRequest.cs | 36 +++++++++ .../JetStream/ConsumerPauseResponse.cs | 51 ++++++++++++ .../JetStream/IJetStreamManagement.cs | 18 +++++ .../JetStream/JetStreamManagement.cs | 20 +++++ .../TestJetStreamManagement.cs | 78 +++++++++++++++++++ src/Tests/IntegrationTests/TestSuite.cs | 4 + 11 files changed, 268 insertions(+), 1 deletion(-) create mode 100644 src/NATS.Client/JetStream/ConsumerPauseRequest.cs create mode 100644 src/NATS.Client/JetStream/ConsumerPauseResponse.cs diff --git a/src/NATS.Client/Internals/JetStreamConstants.cs b/src/NATS.Client/Internals/JetStreamConstants.cs index 5280f3273..483158629 100644 --- a/src/NATS.Client/Internals/JetStreamConstants.cs +++ b/src/NATS.Client/Internals/JetStreamConstants.cs @@ -74,6 +74,11 @@ public static class JetStreamConstants /// public const string JsapiConsumerDelete = "CONSUMER.DELETE.{0}.{1}"; + /// + /// JSAPI_CONSUMER_PAUSE is used to delete consumers. + /// + public const string JsapiConsumerPause = "CONSUMER.PAUSE.{0}.{1}"; + /// /// JSAPI_CONSUMER_NAMES is used to return a list of consumer names /// diff --git a/src/NATS.Client/Internals/JsonUtils.cs b/src/NATS.Client/Internals/JsonUtils.cs index a8ac7d508..9ef15791a 100644 --- a/src/NATS.Client/Internals/JsonUtils.cs +++ b/src/NATS.Client/Internals/JsonUtils.cs @@ -151,12 +151,34 @@ public static DateTime AsDate(JSONNode node) return DateTime.MinValue; } } + + public static DateTime? AsOptionalDate(JSONNode node) + { + try + { + if (node.IsNull) + { + return null; + } + return DateTime.Parse(node.Value).ToUniversalTime(); + } + catch (Exception) + { + return null; + } + } public static string ToString(DateTime dt) { // Assume MinValue is Unset return dt.Equals(DateTime.MinValue) ? null : UnsafeToString(dt); } + + public static string ToString(DateTime? dt) + { + // Assume MinValue is Unset + return !dt.HasValue || dt.Equals(DateTime.MinValue) ? null : UnsafeToString(dt.Value); + } public static string UnsafeToString(DateTime dt) { diff --git a/src/NATS.Client/JetStream/ApiConstants.cs b/src/NATS.Client/JetStream/ApiConstants.cs index 1c7bd1862..41eff8e7d 100644 --- a/src/NATS.Client/JetStream/ApiConstants.cs +++ b/src/NATS.Client/JetStream/ApiConstants.cs @@ -143,6 +143,9 @@ public static class ApiConstants public const string Options = "options"; public const string OptStartSeq = "opt_start_seq"; public const string OptStartTime = "opt_start_time"; + public const string Paused = "paused"; + public const string PauseRemaining = "pause_remaining"; + public const string PauseUntil = "pause_until"; public const string Placement = "placement"; public const string ProcessingTime = "processing_time"; public const string Republish = "republish"; diff --git a/src/NATS.Client/JetStream/ConsumerConfiguration.cs b/src/NATS.Client/JetStream/ConsumerConfiguration.cs index 57d1bb285..efee9758f 100644 --- a/src/NATS.Client/JetStream/ConsumerConfiguration.cs +++ b/src/NATS.Client/JetStream/ConsumerConfiguration.cs @@ -67,6 +67,7 @@ public sealed class ConsumerConfiguration : JsonSerializable public string DeliverGroup { get; } public string SampleFrequency { get; } public DateTime StartTime { get; } + public DateTime? PauseUntil { get; } public Duration AckWait { get; } public Duration IdleHeartbeat { get; } public Duration MaxExpires { get; } @@ -113,6 +114,7 @@ internal ConsumerConfiguration(JSONNode ccNode) SampleFrequency = ccNode[ApiConstants.SampleFreq].Value; StartTime = AsDate(ccNode[ApiConstants.OptStartTime]); + PauseUntil = AsOptionalDate(ccNode[ApiConstants.PauseUntil]); AckWait = AsDuration(ccNode, ApiConstants.AckWait, null); IdleHeartbeat = AsDuration(ccNode, ApiConstants.IdleHeartbeat, null); MaxExpires = AsDuration(ccNode, ApiConstants.MaxExpires, null); @@ -158,6 +160,7 @@ private ConsumerConfiguration(ConsumerConfigurationBuilder builder) SampleFrequency = builder._sampleFrequency; StartTime = builder._startTime; + PauseUntil = builder._pauseUntil; AckWait = builder._ackWait; IdleHeartbeat = builder._idleHeartbeat; MaxExpires = builder._maxExpires; @@ -192,6 +195,7 @@ public override JSONNode ToJsonNode() AddField(o, ApiConstants.DeliverGroup, DeliverGroup); AddField(o, ApiConstants.OptStartSeq, StartSeq); AddField(o, ApiConstants.OptStartTime, JsonUtils.ToString(StartTime)); + AddField(o, ApiConstants.PauseUntil, JsonUtils.ToString(PauseUntil)); AddField(o, ApiConstants.AckPolicy, AckPolicy.GetString()); AddField(o, ApiConstants.AckWait, AckWait); AddField(o, ApiConstants.MaxDeliver, MaxDeliver); @@ -252,6 +256,7 @@ internal IList GetChanges(ConsumerConfiguration server) if (InactiveThreshold != null && !InactiveThreshold.Equals(server.InactiveThreshold)) { changes.Add("InactiveThreshold"); } RecordWouldBeChange(StartTime, server.StartTime, "StartTime", changes); + RecordWouldBeChange(PauseUntil, server.PauseUntil, "PauseUntil", changes); RecordWouldBeChange(Description, server.Description, "Description", changes); RecordWouldBeChange(SampleFrequency, server.SampleFrequency, "SampleFrequency", changes); @@ -275,6 +280,11 @@ private void RecordWouldBeChange(DateTime request, DateTime server, string field { if (request != DateTime.MinValue && !request.Equals(server)) { changes.Add(field); } } + + private void RecordWouldBeChange(DateTime? request, DateTime? server, string field, IList changes) + { + RecordWouldBeChange(request.GetValueOrDefault(DateTime.MinValue), server.GetValueOrDefault(DateTime.MinValue), field, changes); + } internal static int GetOrUnset(int? val) { @@ -341,6 +351,7 @@ public sealed class ConsumerConfigurationBuilder internal string _sampleFrequency; internal DateTime _startTime; + internal DateTime? _pauseUntil; internal Duration _ackWait; internal Duration _idleHeartbeat; internal Duration _maxExpires; @@ -379,6 +390,7 @@ public ConsumerConfigurationBuilder(ConsumerConfiguration cc) _sampleFrequency = cc.SampleFrequency; _startTime = cc.StartTime; + _pauseUntil = cc.PauseUntil; _ackWait = cc.AckWait; _idleHeartbeat = cc.IdleHeartbeat; _maxExpires = cc.MaxExpires; @@ -506,6 +518,17 @@ public ConsumerConfigurationBuilder WithStartTime(DateTime startTime) return this; } + /// + /// Sets the time to pause the consumer until + /// + /// the time to pause the consumer until + /// The ConsumerConfigurationBuilder + public ConsumerConfigurationBuilder WithPauseUntil(DateTime? pauseUntil) + { + _pauseUntil = pauseUntil; + return this; + } + /// /// Sets the acknowledgement policy of the ConsumerConfiguration. /// diff --git a/src/NATS.Client/JetStream/ConsumerInfo.cs b/src/NATS.Client/JetStream/ConsumerInfo.cs index b7e31eb4b..040763a0b 100644 --- a/src/NATS.Client/JetStream/ConsumerInfo.cs +++ b/src/NATS.Client/JetStream/ConsumerInfo.cs @@ -12,6 +12,7 @@ // limitations under the License. using System; +using NATS.Client.Internals; using NATS.Client.Internals.SimpleJSON; using static NATS.Client.Internals.JsonUtils; @@ -33,7 +34,9 @@ public sealed class ConsumerInfo : ApiResponse public bool PushBound { get; private set; } public ulong CalculatedPending => NumPending + Delivered.ConsumerSeq; public DateTime Timestamp { get; private set; } - + public bool Paused { get; private set; } + public Duration PauseRemaining { get; private set; } + internal ConsumerInfo(Msg msg, bool throwOnError) : base(msg, throwOnError) { Init(JsonNode); @@ -64,6 +67,8 @@ private void Init(JSONNode ciNode) ClusterInfo = ClusterInfo.OptionalInstance(ciNode[ApiConstants.Cluster]); PushBound = ciNode[ApiConstants.PushBound].AsBool; Timestamp = AsDate(ciNode[ApiConstants.Timestamp]); + Paused = ciNode[ApiConstants.Paused].AsBool; + PauseRemaining = Paused ? JsonUtils.AsDuration(ciNode, ApiConstants.PauseRemaining, Duration.Zero) : null; } public override string ToString() @@ -80,6 +85,8 @@ public override string ToString() ", Timestamp=" + Timestamp + ", Delivered=" + Delivered + ", AckFloor=" + AckFloor + + ", Paused=" + Paused + + ", PauseRemaining=" + PauseRemaining + ", " + ObjectString("ClusterInfo", ClusterInfo) + ", " + "ConsumerConfiguration" + ConsumerConfiguration.ToJsonString() + '}'; diff --git a/src/NATS.Client/JetStream/ConsumerPauseRequest.cs b/src/NATS.Client/JetStream/ConsumerPauseRequest.cs new file mode 100644 index 000000000..207d171c1 --- /dev/null +++ b/src/NATS.Client/JetStream/ConsumerPauseRequest.cs @@ -0,0 +1,36 @@ +// Copyright 2024 The NATS Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at: +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +using System; +using NATS.Client.Internals; +using NATS.Client.Internals.SimpleJSON; + +namespace NATS.Client.JetStream +{ + public sealed class ConsumerPauseRequest : JsonSerializable + { + public DateTime PauseUntil { get; } + + internal ConsumerPauseRequest(DateTime pauseUntil) + { + PauseUntil = pauseUntil; + } + + public override JSONNode ToJsonNode() + { + JSONObject jso = new JSONObject(); + JsonUtils.AddField(jso, ApiConstants.PauseUntil, PauseUntil); + return jso; + } + } +} diff --git a/src/NATS.Client/JetStream/ConsumerPauseResponse.cs b/src/NATS.Client/JetStream/ConsumerPauseResponse.cs new file mode 100644 index 000000000..1168687c4 --- /dev/null +++ b/src/NATS.Client/JetStream/ConsumerPauseResponse.cs @@ -0,0 +1,51 @@ +// Copyright 2024 The NATS Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at: +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +using System; +using NATS.Client.Internals; +using static NATS.Client.Internals.JsonUtils; + +namespace NATS.Client.JetStream +{ + public sealed class ConsumerPauseResponse : ApiResponse + { + public bool Paused { get; private set; } + public DateTime? PauseUntil { get; private set; } + public Duration PauseRemaining { get; private set; } + + internal ConsumerPauseResponse(Msg msg, bool throwOnError) : base(msg, throwOnError) + { + Init(); + } + + internal ConsumerPauseResponse(string json, bool throwOnError) : base(json, throwOnError) + { + Init(); + } + + private void Init() + { + Paused = JsonNode[ApiConstants.Paused].AsBool; + if (Paused) + { + PauseUntil = AsDate(JsonNode[ApiConstants.PauseUntil]); + PauseRemaining = JsonUtils.AsDuration(JsonNode, ApiConstants.PauseRemaining, Duration.Zero); + } + else + { + PauseUntil = null; + PauseRemaining = null; + } + } + } +} diff --git a/src/NATS.Client/JetStream/IJetStreamManagement.cs b/src/NATS.Client/JetStream/IJetStreamManagement.cs index cce96cfe1..754a61521 100644 --- a/src/NATS.Client/JetStream/IJetStreamManagement.cs +++ b/src/NATS.Client/JetStream/IJetStreamManagement.cs @@ -11,6 +11,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +using System; using System.Collections.Generic; namespace NATS.Client.JetStream @@ -97,6 +98,23 @@ public interface IJetStreamManagement /// True if the consumer was deleted. bool DeleteConsumer(string streamName, string consumer); + /// + /// Pauses a consumer. + /// + /// The name of the stream the consumer is attached to. + /// The name of the consumer. + /// Consumer is paused until this time. + /// ConsumerPauseResponse. + ConsumerPauseResponse PauseConsumer(string streamName, string consumer, DateTime pauseUntil); + + /// + /// Resumes a consumer. + /// + /// The name of the stream the consumer is attached to. + /// The name of the consumer. + /// True if the resume succeeded. + bool ResumeConsumer(string streamName, string consumer); + /// /// Gets information for an existing consumer. /// diff --git a/src/NATS.Client/JetStream/JetStreamManagement.cs b/src/NATS.Client/JetStream/JetStreamManagement.cs index 91f30bbf5..8fe742061 100644 --- a/src/NATS.Client/JetStream/JetStreamManagement.cs +++ b/src/NATS.Client/JetStream/JetStreamManagement.cs @@ -101,6 +101,26 @@ public bool DeleteConsumer(string streamName, string consumer) return new SuccessApiResponse(m, true).Success; } + public ConsumerPauseResponse PauseConsumer(string streamName, string consumer, DateTime pauseUntil) + { + Validator.ValidateStreamName(streamName, true); + Validator.ValidateNotNull(consumer, nameof(consumer)); + string subj = string.Format(JetStreamConstants.JsapiConsumerPause, streamName, consumer); + ConsumerPauseRequest cprq = new ConsumerPauseRequest(pauseUntil); + Msg m = RequestResponseRequired(subj, cprq.Serialize(), Timeout); + return new ConsumerPauseResponse(m, true); + } + + public bool ResumeConsumer(string streamName, string consumer) + { + Validator.ValidateStreamName(streamName, true); + Validator.ValidateNotNull(consumer, nameof(consumer)); + string subj = string.Format(JetStreamConstants.JsapiConsumerPause, streamName, consumer); + Msg m = RequestResponseRequired(subj, null, Timeout); + ConsumerPauseResponse cpre = new ConsumerPauseResponse(m, true); + return !cpre.Paused; + } + public ConsumerInfo GetConsumerInfo(string streamName, string consumer) { Validator.ValidateStreamName(streamName, true); diff --git a/src/Tests/IntegrationTests/TestJetStreamManagement.cs b/src/Tests/IntegrationTests/TestJetStreamManagement.cs index b3603d912..4ce871112 100644 --- a/src/Tests/IntegrationTests/TestJetStreamManagement.cs +++ b/src/Tests/IntegrationTests/TestJetStreamManagement.cs @@ -1117,5 +1117,83 @@ public void TestDirectMessageRepublishedSubject() Assert.Equal("tres", kve3.ValueAsString()); }); } + + [Fact] + public void TestPauseConsumer() + { + Context.RunInJsServer(AtLeast2_11, c => + { + IJetStreamManagement jsm = c.CreateJetStreamManagementContext(); + string stream = Stream(); + string subject = Subject(); + string con = Name(); + CreateMemoryStream(c, stream, subject); + + IList list = jsm.GetConsumers(stream); + Assert.Empty(list); + + // Add a consumer with pause + DateTime pauseUntil = DateTime.Now.Add(TimeSpan.FromMinutes(2)); + ConsumerConfiguration cc = ConsumerConfiguration.Builder().WithPauseUntil(pauseUntil).Build(); + + ConsumerInfo ci = jsm.AddOrUpdateConsumer(stream, cc); + Assert.True(ci.Paused); + Assert.True(ci.PauseRemaining.Millis > 60000); + Assert.Equal(pauseUntil.ToUniversalTime(), ci.ConsumerConfiguration.PauseUntil.Value); + + // Add a consumer + string name = Name(); + cc = ConsumerConfiguration.Builder().WithName(name).Build(); + ci = jsm.AddOrUpdateConsumer(stream, cc); + Assert.NotNull(name); + Assert.False(ci.Paused); + Assert.Null(ci.PauseRemaining); + + // Pause + ConsumerPauseResponse cpre = jsm.PauseConsumer(stream, name, pauseUntil); + Assert.True(cpre.Paused); + Assert.True(cpre.PauseRemaining.Millis > 60000); + Assert.Equal(pauseUntil.ToUniversalTime(), cpre.PauseUntil.Value); + + ci = jsm.GetConsumerInfo(stream, name); + Assert.True(ci.Paused); + Assert.True(ci.PauseRemaining.Millis > 60000); + Assert.Equal(pauseUntil.ToUniversalTime(), ci.ConsumerConfiguration.PauseUntil.Value); + + // Resume + Assert.True(jsm.ResumeConsumer(stream, name)); + ci = jsm.GetConsumerInfo(stream, name); + Assert.False(ci.Paused); + Assert.Null(ci.PauseRemaining); + Assert.False(ci.ConsumerConfiguration.PauseUntil.HasValue); + + // Pause again + cpre = jsm.PauseConsumer(stream, name, pauseUntil); + Assert.True(cpre.Paused); + Assert.True(cpre.PauseRemaining.Millis > 60000); + Assert.Equal(pauseUntil.ToUniversalTime(), cpre.PauseUntil.Value); + + ci = jsm.GetConsumerInfo(stream, name); + Assert.True(ci.Paused); + Assert.True(ci.PauseRemaining.Millis > 60000); + Assert.Equal(pauseUntil.ToUniversalTime(), ci.ConsumerConfiguration.PauseUntil.Value); + + // Resume via pause with no date + cpre = jsm.PauseConsumer(stream, name, DateTime.MinValue); + Assert.False(cpre.Paused); + Assert.Null(cpre.PauseRemaining); + Assert.False(cpre.PauseUntil.HasValue); + + ci = jsm.GetConsumerInfo(stream, name); + Assert.False(ci.Paused); + Assert.Null(cpre.PauseRemaining); + Assert.False(cpre.PauseUntil.HasValue); + + Assert.Throws(() => jsm.PauseConsumer(Stream(), name, pauseUntil)); + Assert.Throws(() => jsm.PauseConsumer(stream, Name(), pauseUntil)); + Assert.Throws(() => jsm.ResumeConsumer(Stream(), name)); + Assert.Throws(() => jsm.ResumeConsumer(stream, Name())); + }); + } } } diff --git a/src/Tests/IntegrationTests/TestSuite.cs b/src/Tests/IntegrationTests/TestSuite.cs index 67aa81ec2..9002282a6 100644 --- a/src/Tests/IntegrationTests/TestSuite.cs +++ b/src/Tests/IntegrationTests/TestSuite.cs @@ -51,6 +51,10 @@ public bool AtLeast2_10(ServerInfo si) { public bool AtLeast2_10_3(ServerInfo si) { return si.IsSameOrNewerThanVersion("2.10.3"); } + + public bool AtLeast2_11(ServerInfo si) { + return si.IsNewerVersionThan("2.10.99"); + } } ///