From 44bca0a18f85a7b685c132a39f9d9aed028e1fa3 Mon Sep 17 00:00:00 2001 From: "R.I.Pienaar" Date: Tue, 13 Feb 2024 16:00:10 +0100 Subject: [PATCH] Adds the ability to pause and resume consumers Additionally the ability to create a consumer paused till a dealine and associated advisories Signed-off-by: R.I.Pienaar --- api/consumers.go | 19 +++++ api/gen.go | 3 + api/jetstream/advisory/consumer_pause.go | 42 +++++++++++ api/schemas_generated.go | 61 +++++++++++++++- consumers.go | 45 ++++++++++++ go.mod | 12 ++-- go.sum | 24 +++---- .../nats/context/user_pass_token_creds.json | 6 +- .../jetstream/advisory/v1/consumer_pause.json | 50 ++++++++++++++ .../api/v1/account_purge_response.json | 2 +- .../api/v1/consumer_pause_request.json | 13 ++++ .../api/v1/consumer_pause_response.json | 35 ++++++++++ .../jetstream/api/v1/definitions.json | 12 ++++ .../jetstream/advisory/v1/consumer_pause.json | 50 ++++++++++++++ .../api/v1/account_purge_response.json | 2 +- .../api/v1/consumer_configuration.json | 6 ++ .../api/v1/consumer_create_request.json | 6 ++ .../api/v1/consumer_create_response.json | 16 +++++ .../api/v1/consumer_info_response.json | 16 +++++ .../api/v1/consumer_list_response.json | 16 +++++ .../api/v1/consumer_pause_request.json | 15 ++++ .../api/v1/consumer_pause_response.json | 69 +++++++++++++++++++ 22 files changed, 498 insertions(+), 22 deletions(-) create mode 100644 api/jetstream/advisory/consumer_pause.go create mode 100644 schema_source/jetstream/advisory/v1/consumer_pause.json create mode 100644 schema_source/jetstream/api/v1/consumer_pause_request.json create mode 100644 schema_source/jetstream/api/v1/consumer_pause_response.json create mode 100644 schemas/jetstream/advisory/v1/consumer_pause.json create mode 100644 schemas/jetstream/api/v1/consumer_pause_request.json create mode 100644 schemas/jetstream/api/v1/consumer_pause_response.json diff --git a/api/consumers.go b/api/consumers.go index f3e7a309..2d078650 100644 --- a/api/consumers.go +++ b/api/consumers.go @@ -38,6 +38,7 @@ const ( JSApiRequestNextT = "$JS.API.CONSUMER.MSG.NEXT.%s.%s" JSApiRequestNext = "$JS.API.CONSUMER.MSG.NEXT.*.*" JSApiConsumerLeaderStepDownT = "$JS.API.CONSUMER.LEADER.STEPDOWN.%s.%s" + JSApiconsumerPauseT = "$JS.API.CONSUMER.PAUSE.%s.%s" JSMetricConsumerAckPre = JSMetricPrefix + ".CONSUMER.ACK" JSAdvisoryConsumerMaxDeliveryExceedPre = JSAdvisoryPrefix + ".CONSUMER.MAX_DELIVERIES" ) @@ -150,6 +151,19 @@ type JSApiConsumerLeaderStepDownResponse struct { Success bool `json:"success,omitempty"` } +// io.nats.jetstream.api.v1.consumer_pause_request +type JSApiConsumerPauseRequest struct { + PauseUntil time.Time `json:"pause_until,omitempty"` +} + +// io.nats.jetstream.api.v1.consumer_pause_response +type JSApiConsumerPauseResponse struct { + JSApiResponse + Paused bool `json:"paused"` + PauseUntil time.Time `json:"pause_until"` + PauseRemaining time.Duration `json:"pause_remaining,omitempty"` +} + type AckPolicy int const ( @@ -352,6 +366,9 @@ type ConsumerConfig struct { // Metadata is additional metadata for the Consumer. Metadata map[string]string `json:"metadata,omitempty"` + // PauseUntil is for suspending the consumer until the deadline. + PauseUntil time.Time `json:"pause_until,omitempty"` + // Don't add to general clients. Direct bool `json:"direct,omitempty"` } @@ -377,6 +394,8 @@ type ConsumerInfo struct { NumPending uint64 `json:"num_pending"` Cluster *ClusterInfo `json:"cluster,omitempty"` PushBound bool `json:"push_bound,omitempty"` + Paused bool `json:"paused,omitempty"` + PauseRemaining time.Duration `json:"pause_remaining,omitempty"` TimeStamp time.Time `json:"ts"` } diff --git a/api/gen.go b/api/gen.go index 2a4ee271..47da4f7b 100644 --- a/api/gen.go +++ b/api/gen.go @@ -197,6 +197,7 @@ func main() { &schema{P: "jetstream/advisory/v1/terminated.json", St: "jsadvisory.JSConsumerDeliveryTerminatedAdvisoryV1"}, &schema{P: "jetstream/advisory/v1/stream_action.json", St: "jsadvisory.JSStreamActionAdvisoryV1"}, &schema{P: "jetstream/advisory/v1/consumer_action.json", St: "jsadvisory.JSConsumerActionAdvisoryV1"}, + &schema{P: "jetstream/advisory/v1/consumer_pause.json", St: "jsadvisory.JSConsumerPauseAdvisoryV1"}, &schema{P: "jetstream/advisory/v1/snapshot_create.json", St: "jsadvisory.JSSnapshotCreateAdvisoryV1"}, &schema{P: "jetstream/advisory/v1/snapshot_complete.json", St: "jsadvisory.JSSnapshotCompleteAdvisoryV1"}, &schema{P: "jetstream/advisory/v1/restore_create.json", St: "jsadvisory.JSRestoreCreateAdvisoryV1"}, @@ -223,6 +224,8 @@ func main() { &schema{P: "jetstream/api/v1/consumer_names_response.json", St: "JSApiConsumerNamesResponse"}, &schema{P: "jetstream/api/v1/consumer_getnext_request.json", St: "JSApiConsumerGetNextRequest"}, &schema{P: "jetstream/api/v1/consumer_leader_stepdown_response.json", St: "JSApiConsumerLeaderStepDownResponse"}, + &schema{P: "jetstream/api/v1/consumer_pause_request.json", St: "JSApiConsumerPauseRequest"}, + &schema{P: "jetstream/api/v1/consumer_pause_response.json", St: "JSApiConsumerPauseResponse"}, &schema{P: "jetstream/api/v1/stream_create_request.json", St: "JSApiStreamCreateRequest"}, &schema{P: "jetstream/api/v1/stream_create_response.json", St: "JSApiStreamCreateResponse"}, &schema{P: "jetstream/api/v1/stream_delete_response.json", St: "JSApiStreamDeleteResponse"}, diff --git a/api/jetstream/advisory/consumer_pause.go b/api/jetstream/advisory/consumer_pause.go new file mode 100644 index 00000000..e6c980c5 --- /dev/null +++ b/api/jetstream/advisory/consumer_pause.go @@ -0,0 +1,42 @@ +package advisory + +import ( + "time" + + "github.com/nats-io/jsm.go/api/event" +) + +// JSConsumerPauseAdvisoryV1 indicates that a consumer was paused or unpaused +type JSConsumerPauseAdvisoryV1 struct { + event.NATSEvent + + Stream string `json:"stream"` + Consumer string `json:"consumer"` + Paused bool `json:"paused"` + PauseUntil time.Time `json:"pause_until,omitempty"` + Domain string `json:"domain,omitempty"` +} + +func init() { + err := event.RegisterTextCompactTemplate("io.nats.jetstream.advisory.v1.consumer_pause", `{{ .Time | ShortTime }} [Consumer Pause] Consumer: {{ .Stream }} > {{ .Consumer }} Paused: {{ .Paused }}{{ if .Paused }} until {{ .PauseUntil }}{{ end }}`) + if err != nil { + panic(err) + } + + err = event.RegisterTextExtendedTemplate("io.nats.jetstream.advisory.v1.consumer_pause", ` +[{{ .Time | ShortTime }}] [{{ .ID }}] Consumer Pause + + Stream: {{ .Stream }} + Consumer: {{ .Consumer }} + Paused: {{ .Paused }} +{{- if .Paused }} + Until: {{ .PauseUntil }} +{{- end }} +{{- if .Domain }} + Domain: {{ .Domain }} +{{- end }} +`) + if err != nil { + panic(err) + } +} diff --git a/api/schemas_generated.go b/api/schemas_generated.go index f3422b3b..2f7eac79 100644 --- a/api/schemas_generated.go +++ b/api/schemas_generated.go @@ -1,4 +1,4 @@ -// auto generated 2024-02-08 10:37:12.960275 +0100 CET m=+0.008558457 +// auto generated 2024-02-13 15:58:31.227879 +0100 CET m=+0.013734209 package api @@ -22,6 +22,7 @@ var schemaTypes = map[string]func() any{ "io.nats.jetstream.advisory.v1.terminated": func() any { return &jsadvisory.JSConsumerDeliveryTerminatedAdvisoryV1{} }, "io.nats.jetstream.advisory.v1.stream_action": func() any { return &jsadvisory.JSStreamActionAdvisoryV1{} }, "io.nats.jetstream.advisory.v1.consumer_action": func() any { return &jsadvisory.JSConsumerActionAdvisoryV1{} }, + "io.nats.jetstream.advisory.v1.consumer_pause": func() any { return &jsadvisory.JSConsumerPauseAdvisoryV1{} }, "io.nats.jetstream.advisory.v1.snapshot_create": func() any { return &jsadvisory.JSSnapshotCreateAdvisoryV1{} }, "io.nats.jetstream.advisory.v1.snapshot_complete": func() any { return &jsadvisory.JSSnapshotCompleteAdvisoryV1{} }, "io.nats.jetstream.advisory.v1.restore_create": func() any { return &jsadvisory.JSRestoreCreateAdvisoryV1{} }, @@ -48,6 +49,8 @@ var schemaTypes = map[string]func() any{ "io.nats.jetstream.api.v1.consumer_names_response": func() any { return &JSApiConsumerNamesResponse{} }, "io.nats.jetstream.api.v1.consumer_getnext_request": func() any { return &JSApiConsumerGetNextRequest{} }, "io.nats.jetstream.api.v1.consumer_leader_stepdown_response": func() any { return &JSApiConsumerLeaderStepDownResponse{} }, + "io.nats.jetstream.api.v1.consumer_pause_request": func() any { return &JSApiConsumerPauseRequest{} }, + "io.nats.jetstream.api.v1.consumer_pause_response": func() any { return &JSApiConsumerPauseResponse{} }, "io.nats.jetstream.api.v1.stream_create_request": func() any { return &JSApiStreamCreateRequest{} }, "io.nats.jetstream.api.v1.stream_create_response": func() any { return &JSApiStreamCreateResponse{} }, "io.nats.jetstream.api.v1.stream_delete_response": func() any { return &JSApiStreamDeleteResponse{} }, @@ -480,6 +483,62 @@ func (t JSApiConsumerLeaderStepDownResponse) Schema() ([]byte, error) { return scfs.Load(f) } +// Validate performs a JSON Schema validation of the configuration +func (t JSApiConsumerPauseRequest) Validate(v ...StructValidator) (valid bool, errors []string) { + if len(v) == 0 || v[0] == nil { + return true, nil + } + + return v[0].ValidateStruct(t, t.SchemaType()) +} + +// SchemaType is the NATS schema type io.nats.jetstream.api.v1.consumer_pause_request +func (t JSApiConsumerPauseRequest) SchemaType() string { + return "io.nats.jetstream.api.v1.consumer_pause_request" +} + +// SchemaID is the url to the JSON Schema for JetStream Consumer Configuration +func (t JSApiConsumerPauseRequest) SchemaID() string { + return "https://raw.githubusercontent.com/nats-io/jsm.go/master/schemas/jetstream/api/v1/consumer_pause_request.json" +} + +// Schema is a JSON Schema document for the JetStream Consumer Configuration +func (t JSApiConsumerPauseRequest) Schema() ([]byte, error) { + f, err := SchemaFileForType(t.SchemaType()) + if err != nil { + return nil, err + } + return scfs.Load(f) +} + +// Validate performs a JSON Schema validation of the configuration +func (t JSApiConsumerPauseResponse) Validate(v ...StructValidator) (valid bool, errors []string) { + if len(v) == 0 || v[0] == nil { + return true, nil + } + + return v[0].ValidateStruct(t, t.SchemaType()) +} + +// SchemaType is the NATS schema type io.nats.jetstream.api.v1.consumer_pause_response +func (t JSApiConsumerPauseResponse) SchemaType() string { + return "io.nats.jetstream.api.v1.consumer_pause_response" +} + +// SchemaID is the url to the JSON Schema for JetStream Consumer Configuration +func (t JSApiConsumerPauseResponse) SchemaID() string { + return "https://raw.githubusercontent.com/nats-io/jsm.go/master/schemas/jetstream/api/v1/consumer_pause_response.json" +} + +// Schema is a JSON Schema document for the JetStream Consumer Configuration +func (t JSApiConsumerPauseResponse) Schema() ([]byte, error) { + f, err := SchemaFileForType(t.SchemaType()) + if err != nil { + return nil, err + } + return scfs.Load(f) +} + // Validate performs a JSON Schema validation of the configuration func (t JSApiStreamCreateRequest) Validate(v ...StructValidator) (valid bool, errors []string) { if len(v) == 0 || v[0] == nil { diff --git a/consumers.go b/consumers.go index 4fa341d4..5e342e44 100644 --- a/consumers.go +++ b/consumers.go @@ -618,6 +618,13 @@ func ConsumerMetadata(meta map[string]string) ConsumerOption { } } +func PauseUntil(deadline time.Time) ConsumerOption { + return func(o *api.ConsumerConfig) error { + o.PauseUntil = deadline + return nil + } +} + // UpdateConfiguration updates the consumer configuration // At present the description, ack wait, max deliver, sample frequency, max ack pending, max waiting and header only settings can be changed func (c *Consumer) UpdateConfiguration(opts ...ConsumerOption) error { @@ -888,6 +895,43 @@ func (c *Consumer) LeaderStepDown() error { return nil } +// Pause requests a consumer be paused until the deadline, if it fails to pause an error is returned. +// +// A common reason for failures is when a time is supplied that is in the past from the perspective of the server +func (c *Consumer) Pause(deadline time.Time) (*api.JSApiConsumerPauseResponse, error) { + var resp *api.JSApiConsumerPauseResponse + req := api.JSApiConsumerPauseRequest{ + PauseUntil: deadline, + } + + err := c.mgr.jsonRequest(fmt.Sprintf(api.JSApiconsumerPauseT, c.StreamName(), c.Name()), &req, &resp) + if err != nil { + return nil, err + } + + if !resp.Paused { + return nil, fmt.Errorf("pause request failed, perhaps due to a time in the past") + } + + return resp, nil +} + +// Resume requests the server resumes a paused consumer +func (c *Consumer) Resume() error { + var resp *api.JSApiConsumerPauseResponse + + err := c.mgr.jsonRequest(fmt.Sprintf(api.JSApiconsumerPauseT, c.StreamName(), c.Name()), nil, &resp) + if err != nil { + return err + } + + if resp.Paused { + return fmt.Errorf("pause request failed for an unknown reason") + } + + return nil +} + func (c *Consumer) Name() string { return c.name } func (c *Consumer) IsSampled() bool { return c.SampleFrequency() != "" } func (c *Consumer) IsPullMode() bool { return c.cfg.DeliverSubject == "" } @@ -922,6 +966,7 @@ func (c *Consumer) InactiveThreshold() time.Duration { return c.cfg.InactiveThre func (c *Consumer) Replicas() int { return c.cfg.Replicas } func (c *Consumer) Metadata() map[string]string { return c.cfg.Metadata } func (c *Consumer) MemoryStorage() bool { return c.cfg.MemoryStorage } +func (c *Consumer) PauseUntil() time.Time { return c.cfg.PauseUntil } func (c *Consumer) StartTime() time.Time { if c.cfg.OptStartTime == nil { return time.Time{} diff --git a/go.mod b/go.mod index 3b7369e6..cb0e8da3 100644 --- a/go.mod +++ b/go.mod @@ -6,9 +6,9 @@ require ( github.com/dustin/go-humanize v1.0.1 github.com/expr-lang/expr v1.15.8 github.com/google/go-cmp v0.6.0 - github.com/klauspost/compress v1.17.5 - github.com/nats-io/nats-server/v2 v2.11.0-dev.0.20240207201315-f703123c4b88 - github.com/nats-io/nats.go v1.32.0 + github.com/klauspost/compress v1.17.6 + github.com/nats-io/nats-server/v2 v2.11.0-dev.0.20240217230419-4b3317b980ba + github.com/nats-io/nats.go v1.33.1 github.com/nats-io/nuid v1.0.1 golang.org/x/net v0.20.0 golang.org/x/text v0.14.0 @@ -18,10 +18,10 @@ require ( require ( github.com/kr/pretty v0.1.0 // indirect github.com/minio/highwayhash v1.0.2 // indirect - github.com/nats-io/jwt/v2 v2.5.3 // indirect + github.com/nats-io/jwt/v2 v2.5.4 // indirect github.com/nats-io/nkeys v0.4.7 // indirect - golang.org/x/crypto v0.18.0 // indirect - golang.org/x/sys v0.16.0 // indirect + golang.org/x/crypto v0.19.0 // indirect + golang.org/x/sys v0.17.0 // indirect golang.org/x/time v0.5.0 // indirect gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127 // indirect ) diff --git a/go.sum b/go.sum index 5bdbef57..7f91d8a0 100644 --- a/go.sum +++ b/go.sum @@ -5,8 +5,8 @@ github.com/expr-lang/expr v1.15.8 h1:FL8+d3rSSP4tmK9o+vKfSMqqpGL8n15pEPiHcnBpxoI github.com/expr-lang/expr v1.15.8/go.mod h1:uCkhfG+x7fcZ5A5sXHKuQ07jGZRl6J0FCAaf2k4PtVQ= github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI= github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= -github.com/klauspost/compress v1.17.5 h1:d4vBd+7CHydUqpFBgUEKkSdtSugf9YFmSkvUYPquI5E= -github.com/klauspost/compress v1.17.5/go.mod h1:/dCuZOvVtNoHsyb+cuJD3itjs3NbnF6KH9zAO4BDxPM= +github.com/klauspost/compress v1.17.6 h1:60eq2E/jlfwQXtvZEeBUYADs+BwKBWURIY+Gj2eRGjI= +github.com/klauspost/compress v1.17.6/go.mod h1:/dCuZOvVtNoHsyb+cuJD3itjs3NbnF6KH9zAO4BDxPM= github.com/kr/pretty v0.1.0 h1:L/CwN0zerZDmRFUapSPitk6f+Q3+0za1rQkzVuMiMFI= github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo= github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= @@ -14,25 +14,25 @@ github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE= github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= github.com/minio/highwayhash v1.0.2 h1:Aak5U0nElisjDCfPSG79Tgzkn2gl66NxOMspRrKnA/g= github.com/minio/highwayhash v1.0.2/go.mod h1:BQskDq+xkJ12lmlUUi7U0M5Swg3EWR+dLTk+kldvVxY= -github.com/nats-io/jwt/v2 v2.5.3 h1:/9SWvzc6hTfamcgXJ3uYRpgj+QuY2aLNqRiqrKcrpEo= -github.com/nats-io/jwt/v2 v2.5.3/go.mod h1:iysuPemFcc7p4IoYots3IuELSI4EDe9Y0bQMe+I3Bf4= -github.com/nats-io/nats-server/v2 v2.11.0-dev.0.20240207201315-f703123c4b88 h1:mQUXBh1zwlTogpLmb3F8wJC/OrJlgQ2j76LD1BVHp64= -github.com/nats-io/nats-server/v2 v2.11.0-dev.0.20240207201315-f703123c4b88/go.mod h1:/TE61Dos8NlwZnjzyE3ZlOnM6dgl7tf937dnf4VclrA= -github.com/nats-io/nats.go v1.32.0 h1:Bx9BZS+aXYlxW08k8Gd3yR2s73pV5XSoAQUyp1Kwvp0= -github.com/nats-io/nats.go v1.32.0/go.mod h1:Ubdu4Nh9exXdSz0RVWRFBbRfrbSxOYd26oF0wkWclB8= +github.com/nats-io/jwt/v2 v2.5.4 h1:Bz+drKl2GbE30fxTOtb0NYl1BQ5RwZ+Zcqkg3mR5bbI= +github.com/nats-io/jwt/v2 v2.5.4/go.mod h1:ZdWS1nZa6WMZfFwwgpEaqBV8EPGVgOTDHN/wTbz0Y5A= +github.com/nats-io/nats-server/v2 v2.11.0-dev.0.20240217230419-4b3317b980ba h1:idIfFiRzXv2wHFpxHH4nSoPtg7UVr4vQPB1NraU0D4c= +github.com/nats-io/nats-server/v2 v2.11.0-dev.0.20240217230419-4b3317b980ba/go.mod h1:Co2t9J1pk4WXyMZiNFkLjFiD7hKE/jjsXtDWCyLfcgw= +github.com/nats-io/nats.go v1.33.1 h1:8TxLZZ/seeEfR97qV0/Bl939tpDnt2Z2fK3HkPypj70= +github.com/nats-io/nats.go v1.33.1/go.mod h1:Ubdu4Nh9exXdSz0RVWRFBbRfrbSxOYd26oF0wkWclB8= github.com/nats-io/nkeys v0.4.7 h1:RwNJbbIdYCoClSDNY7QVKZlyb/wfT6ugvFCiKy6vDvI= github.com/nats-io/nkeys v0.4.7/go.mod h1:kqXRgRDPlGy7nGaEDMuYzmiJCIAAWDK0IMBtDmGD0nc= github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw= github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk= -golang.org/x/crypto v0.18.0 h1:PGVlW0xEltQnzFZ55hkuX5+KLyrMYhHld1YHO4AKcdc= -golang.org/x/crypto v0.18.0/go.mod h1:R0j02AL6hcrfOiy9T4ZYp/rcWeMxM3L6QYxlOuEG1mg= +golang.org/x/crypto v0.19.0 h1:ENy+Az/9Y1vSrlrvBSyna3PITt4tiZLf7sgCjZBX7Wo= +golang.org/x/crypto v0.19.0/go.mod h1:Iy9bg/ha4yyC70EfRS8jz+B6ybOBKMaSxLj6P6oBDfU= golang.org/x/net v0.20.0 h1:aCL9BSgETF1k+blQaYUBx9hJ9LOGP3gAVemcZlf1Kpo= golang.org/x/net v0.20.0/go.mod h1:z8BVo6PvndSri0LbOE3hAn0apkU+1YvI6E70E9jsnvY= golang.org/x/sys v0.0.0-20190130150945-aca44879d564/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= -golang.org/x/sys v0.16.0 h1:xWw16ngr6ZMtmxDyKyIgsE93KNKz5HKmMa3b8ALHidU= -golang.org/x/sys v0.16.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/sys v0.17.0 h1:25cE3gD+tdBA7lp7QfhuV+rJiE9YXTcS3VG1SqssI/Y= +golang.org/x/sys v0.17.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/text v0.14.0 h1:ScX5w1eTa3QqT8oi6+ziP7dTV1S2+ALU0bI+0zXKWiQ= golang.org/x/text v0.14.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU= golang.org/x/time v0.5.0 h1:o7cqy6amK/52YcAKIPlM3a+Fpj35zvRj2TP+e1xFSfk= diff --git a/natscontext/testdata/nats/context/user_pass_token_creds.json b/natscontext/testdata/nats/context/user_pass_token_creds.json index ebac35ac..a7e3734c 100644 --- a/natscontext/testdata/nats/context/user_pass_token_creds.json +++ b/natscontext/testdata/nats/context/user_pass_token_creds.json @@ -16,5 +16,9 @@ "jetstream_event_prefix": "", "inbox_prefix": "", "user_jwt": "", - "color_scheme": "" + "color_scheme": "", + "tls_first": false, + "windows_cert_store": "", + "windows_cert_match_by": "", + "windows_cert_match": "" } \ No newline at end of file diff --git a/schema_source/jetstream/advisory/v1/consumer_pause.json b/schema_source/jetstream/advisory/v1/consumer_pause.json new file mode 100644 index 00000000..07cfca5c --- /dev/null +++ b/schema_source/jetstream/advisory/v1/consumer_pause.json @@ -0,0 +1,50 @@ +{ + "$schema": "http://json-schema.org/draft-07/schema#", + "$id": "https://nats.io/schemas/jetstream/advisory/v1/consumer_pause.json", + "description": "An Advisory sent when consumer is paused or resumed", + "title": "io.nats.jetstream.advisory.v1.consumer_pause", + "type": "object", + "required": [ + "type", + "id", + "timestamp", + "stream", + "consumer", + "paused" + ], + "additionalProperties": false, + "properties": { + "type": { + "type": "string", + "const": "io.nats.jetstream.advisory.v1.consumer_pause" + }, + "id": { + "type": "string", + "description": "Unique correlation ID for this event" + }, + "timestamp": { + "type": "string", + "description": "The time this event was created in RFC3339 format" + }, + "stream": { + "type": "string", + "description": "The name of the Stream the Consumer belongs to" + }, + "consumer": { + "type": "string", + "description": "The name of the Consumer that elected a new leader" + }, + "paused": { + "type": "boolean", + "description": "Indicates the consumer is paused" + }, + "pause_until": { + "description": "When paused the time the consumer will be unpaused, RFC3339 format", + "type": "string" + }, + "domain": { + "description": "The domain hosting the Stream and Consumer if configured", + "type": "string" + } + } +} diff --git a/schema_source/jetstream/api/v1/account_purge_response.json b/schema_source/jetstream/api/v1/account_purge_response.json index ca9724da..95f9018b 100644 --- a/schema_source/jetstream/api/v1/account_purge_response.json +++ b/schema_source/jetstream/api/v1/account_purge_response.json @@ -13,7 +13,7 @@ "properties": { "initiated": { "type": "boolean", - "description": "If the purge operation was succesfully started", + "description": "If the purge operation was successfully started", "default": false } } diff --git a/schema_source/jetstream/api/v1/consumer_pause_request.json b/schema_source/jetstream/api/v1/consumer_pause_request.json new file mode 100644 index 00000000..e8ebdd4f --- /dev/null +++ b/schema_source/jetstream/api/v1/consumer_pause_request.json @@ -0,0 +1,13 @@ +{ + "$schema": "http://json-schema.org/draft-07/schema#", + "$id": "https://nats.io/schemas/jetstream/api/v1/consumer_pause_request.json", + "description": "A request to the JetStream $JS.API.CONSUMER.PAUSE API", + "title": "io.nats.jetstream.api.v1.consumer_pause_request", + "type": "object", + "properties": { + "pause_until": { + "description": "Time to pause until, when empty or a time in the past will unpause the consumer", + "$ref": "definitions.json#/definitions/golang_time" + } + } +} \ No newline at end of file diff --git a/schema_source/jetstream/api/v1/consumer_pause_response.json b/schema_source/jetstream/api/v1/consumer_pause_response.json new file mode 100644 index 00000000..8a6f5a88 --- /dev/null +++ b/schema_source/jetstream/api/v1/consumer_pause_response.json @@ -0,0 +1,35 @@ +{ + "$schema": "http://json-schema.org/draft-07/schema#", + "$id": "https://nats.io/schemas/jetstream/api/v1/consumer_pause_response.json", + "description": "A response from the JetStream $JS.API.CONSUMER.PAUSE API", + "title": "io.nats.jetstream.api.v1.consumer_pause_response", + "type": "object", + "oneOf": [ + { + "$ref": "definitions.json#/definitions/error_response" + }, + { + "type": "object", + "properties": { + "paused": { + "description": "Indicates if after parsing the pause_until property if the consumer was paused", + "type": "boolean" + }, + "pause_until": { + "description": "The deadline till the consumer will be unpaused, only usable if 'paused' is true", + "$ref": "definitions.json#/definitions/golang_time" + }, + "pause_remaining": { + "description": "When paused the time remaining until unpause", + "$ref": "definitions.json#/definitions/golang_duration_nanos" + } + } + } + ], + "properties": { + "type": { + "type": "string", + "const": "io.nats.jetstream.api.v1.consumer_pause_response" + } + } +} \ No newline at end of file diff --git a/schema_source/jetstream/api/v1/definitions.json b/schema_source/jetstream/api/v1/definitions.json index af216d07..b5ab89c3 100644 --- a/schema_source/jetstream/api/v1/definitions.json +++ b/schema_source/jetstream/api/v1/definitions.json @@ -636,6 +636,14 @@ "push_bound": { "description": "Indicates if any client is connected and receiving messages from a push consumer", "type": "boolean" + }, + "paused": { + "description": "Indicates if the consumer is currently in a paused state", + "type": "boolean" + }, + "pause_until": { + "description": "A deadline time for when the consumer will be paused. Only usable if 'paused' is true", + "$ref": "#/definitions/golang_time" } } }, @@ -780,6 +788,10 @@ "description": "Additional metadata for the Consumer", "type": "object", "additionalProperties": { "type": "string" } + }, + "pause_until": { + "description": "When creating a consumer supplying a time in the future will act as a deadline for when the consumer will be paused till", + "$ref": "#/definitions/golang_time" } } }, diff --git a/schemas/jetstream/advisory/v1/consumer_pause.json b/schemas/jetstream/advisory/v1/consumer_pause.json new file mode 100644 index 00000000..07cfca5c --- /dev/null +++ b/schemas/jetstream/advisory/v1/consumer_pause.json @@ -0,0 +1,50 @@ +{ + "$schema": "http://json-schema.org/draft-07/schema#", + "$id": "https://nats.io/schemas/jetstream/advisory/v1/consumer_pause.json", + "description": "An Advisory sent when consumer is paused or resumed", + "title": "io.nats.jetstream.advisory.v1.consumer_pause", + "type": "object", + "required": [ + "type", + "id", + "timestamp", + "stream", + "consumer", + "paused" + ], + "additionalProperties": false, + "properties": { + "type": { + "type": "string", + "const": "io.nats.jetstream.advisory.v1.consumer_pause" + }, + "id": { + "type": "string", + "description": "Unique correlation ID for this event" + }, + "timestamp": { + "type": "string", + "description": "The time this event was created in RFC3339 format" + }, + "stream": { + "type": "string", + "description": "The name of the Stream the Consumer belongs to" + }, + "consumer": { + "type": "string", + "description": "The name of the Consumer that elected a new leader" + }, + "paused": { + "type": "boolean", + "description": "Indicates the consumer is paused" + }, + "pause_until": { + "description": "When paused the time the consumer will be unpaused, RFC3339 format", + "type": "string" + }, + "domain": { + "description": "The domain hosting the Stream and Consumer if configured", + "type": "string" + } + } +} diff --git a/schemas/jetstream/api/v1/account_purge_response.json b/schemas/jetstream/api/v1/account_purge_response.json index bcad6316..b3fa4d5f 100644 --- a/schemas/jetstream/api/v1/account_purge_response.json +++ b/schemas/jetstream/api/v1/account_purge_response.json @@ -42,7 +42,7 @@ "properties": { "initiated": { "type": "boolean", - "description": "If the purge operation was succesfully started", + "description": "If the purge operation was successfully started", "default": false } } diff --git a/schemas/jetstream/api/v1/consumer_configuration.json b/schemas/jetstream/api/v1/consumer_configuration.json index f8599a68..f9b0017c 100644 --- a/schemas/jetstream/api/v1/consumer_configuration.json +++ b/schemas/jetstream/api/v1/consumer_configuration.json @@ -278,6 +278,12 @@ "additionalProperties": { "type": "string" } + }, + "pause_until": { + "description": "When creating a consumer supplying a time in the future will act as a deadline for when the consumer will be paused till", + "$comment": "A point in time in RFC3339 format including timezone, though typically in UTC", + "type": "string", + "format": "date-time" } } } diff --git a/schemas/jetstream/api/v1/consumer_create_request.json b/schemas/jetstream/api/v1/consumer_create_request.json index 7ded4791..a44e0dbc 100644 --- a/schemas/jetstream/api/v1/consumer_create_request.json +++ b/schemas/jetstream/api/v1/consumer_create_request.json @@ -290,6 +290,12 @@ "additionalProperties": { "type": "string" } + }, + "pause_until": { + "description": "When creating a consumer supplying a time in the future will act as a deadline for when the consumer will be paused till", + "$comment": "A point in time in RFC3339 format including timezone, though typically in UTC", + "type": "string", + "format": "date-time" } } }, diff --git a/schemas/jetstream/api/v1/consumer_create_response.json b/schemas/jetstream/api/v1/consumer_create_response.json index d76dcbca..cb8511f9 100644 --- a/schemas/jetstream/api/v1/consumer_create_response.json +++ b/schemas/jetstream/api/v1/consumer_create_response.json @@ -312,6 +312,12 @@ "additionalProperties": { "type": "string" } + }, + "pause_until": { + "description": "When creating a consumer supplying a time in the future will act as a deadline for when the consumer will be paused till", + "$comment": "A point in time in RFC3339 format including timezone, though typically in UTC", + "type": "string", + "format": "date-time" } } }, @@ -469,6 +475,16 @@ "push_bound": { "description": "Indicates if any client is connected and receiving messages from a push consumer", "type": "boolean" + }, + "paused": { + "description": "Indicates if the consumer is currently in a paused state", + "type": "boolean" + }, + "pause_until": { + "description": "A deadline time for when the consumer will be paused. Only usable if 'paused' is true", + "$comment": "A point in time in RFC3339 format including timezone, though typically in UTC", + "type": "string", + "format": "date-time" } } }, diff --git a/schemas/jetstream/api/v1/consumer_info_response.json b/schemas/jetstream/api/v1/consumer_info_response.json index 48ffb368..c43102a9 100644 --- a/schemas/jetstream/api/v1/consumer_info_response.json +++ b/schemas/jetstream/api/v1/consumer_info_response.json @@ -312,6 +312,12 @@ "additionalProperties": { "type": "string" } + }, + "pause_until": { + "description": "When creating a consumer supplying a time in the future will act as a deadline for when the consumer will be paused till", + "$comment": "A point in time in RFC3339 format including timezone, though typically in UTC", + "type": "string", + "format": "date-time" } } }, @@ -469,6 +475,16 @@ "push_bound": { "description": "Indicates if any client is connected and receiving messages from a push consumer", "type": "boolean" + }, + "paused": { + "description": "Indicates if the consumer is currently in a paused state", + "type": "boolean" + }, + "pause_until": { + "description": "A deadline time for when the consumer will be paused. Only usable if 'paused' is true", + "$comment": "A point in time in RFC3339 format including timezone, though typically in UTC", + "type": "string", + "format": "date-time" } } }, diff --git a/schemas/jetstream/api/v1/consumer_list_response.json b/schemas/jetstream/api/v1/consumer_list_response.json index fc6a00bd..dd6e06b6 100644 --- a/schemas/jetstream/api/v1/consumer_list_response.json +++ b/schemas/jetstream/api/v1/consumer_list_response.json @@ -377,6 +377,12 @@ "additionalProperties": { "type": "string" } + }, + "pause_until": { + "description": "When creating a consumer supplying a time in the future will act as a deadline for when the consumer will be paused till", + "$comment": "A point in time in RFC3339 format including timezone, though typically in UTC", + "type": "string", + "format": "date-time" } } }, @@ -534,6 +540,16 @@ "push_bound": { "description": "Indicates if any client is connected and receiving messages from a push consumer", "type": "boolean" + }, + "paused": { + "description": "Indicates if the consumer is currently in a paused state", + "type": "boolean" + }, + "pause_until": { + "description": "A deadline time for when the consumer will be paused. Only usable if 'paused' is true", + "$comment": "A point in time in RFC3339 format including timezone, though typically in UTC", + "type": "string", + "format": "date-time" } } }, diff --git a/schemas/jetstream/api/v1/consumer_pause_request.json b/schemas/jetstream/api/v1/consumer_pause_request.json new file mode 100644 index 00000000..b0849dc3 --- /dev/null +++ b/schemas/jetstream/api/v1/consumer_pause_request.json @@ -0,0 +1,15 @@ +{ + "$schema": "http://json-schema.org/draft-07/schema#", + "$id": "https://nats.io/schemas/jetstream/api/v1/consumer_pause_request.json", + "description": "A request to the JetStream $JS.API.CONSUMER.PAUSE API", + "title": "io.nats.jetstream.api.v1.consumer_pause_request", + "type": "object", + "properties": { + "pause_until": { + "description": "Time to pause until, when empty or a time in the past will unpause the consumer", + "$comment": "A point in time in RFC3339 format including timezone, though typically in UTC", + "type": "string", + "format": "date-time" + } + } +} diff --git a/schemas/jetstream/api/v1/consumer_pause_response.json b/schemas/jetstream/api/v1/consumer_pause_response.json new file mode 100644 index 00000000..c17fbb0b --- /dev/null +++ b/schemas/jetstream/api/v1/consumer_pause_response.json @@ -0,0 +1,69 @@ +{ + "$schema": "http://json-schema.org/draft-07/schema#", + "$id": "https://nats.io/schemas/jetstream/api/v1/consumer_pause_response.json", + "description": "A response from the JetStream $JS.API.CONSUMER.PAUSE API", + "title": "io.nats.jetstream.api.v1.consumer_pause_response", + "type": "object", + "oneOf": [ + { + "type": "object", + "required": [ + "error" + ], + "properties": { + "error": { + "type": "object", + "required": [ + "code" + ], + "properties": { + "code": { + "type": "integer", + "description": "HTTP like error code in the 300 to 500 range", + "minimum": 300, + "maximum": 699 + }, + "description": { + "type": "string", + "description": "A human friendly description of the error" + }, + "err_code": { + "type": "integer", + "description": "The NATS error code unique to each kind of error", + "minimum": 0, + "maximum": 65535 + } + } + } + } + }, + { + "type": "object", + "properties": { + "paused": { + "description": "Indicates if after parsing the pause_until property if the consumer was paused", + "type": "boolean" + }, + "pause_until": { + "description": "The deadline till the consumer will be unpaused, only usable if 'paused' is true", + "$comment": "A point in time in RFC3339 format including timezone, though typically in UTC", + "type": "string", + "format": "date-time" + }, + "pause_remaining": { + "description": "When paused the time remaining until unpause", + "$comment": "nanoseconds depicting a duration in time, signed 64 bit integer", + "type": "integer", + "maximum": 9223372036854775807, + "minimum": -9223372036854775807 + } + } + } + ], + "properties": { + "type": { + "type": "string", + "const": "io.nats.jetstream.api.v1.consumer_pause_response" + } + } +}