From 347964ef335fdb23972167d71c18c92ffb1442a3 Mon Sep 17 00:00:00 2001 From: Samuel Attwood Date: Tue, 21 Jan 2025 03:09:25 -0500 Subject: [PATCH] Move stream controller to jsm.go for pedantic mode --- go.mod | 5 +- go.sum | 12 +- .../controller/account_controller_test.go | 2 +- internal/controller/client.go | 56 +++- internal/controller/consumer_controller.go | 2 +- .../controller/consumer_controller_test.go | 2 +- internal/controller/jetstream_controller.go | 53 ++++ .../controller/keyvalue_controller_test.go | 2 +- .../controller/objectstore_controller_test.go | 2 +- internal/controller/stream_controller.go | 286 ++++++++++++------ internal/controller/stream_controller_test.go | 90 +++--- internal/controller/suite_test.go | 2 +- .../apis/jetstream/v1beta2/streamtypes.go | 18 +- 13 files changed, 369 insertions(+), 163 deletions(-) diff --git a/go.mod b/go.mod index e4bd242e..8eb30738 100644 --- a/go.mod +++ b/go.mod @@ -8,8 +8,8 @@ require ( github.com/fsnotify/fsnotify v1.8.0 github.com/go-logr/logr v1.4.2 github.com/google/go-cmp v0.6.0 - github.com/nats-io/jsm.go v0.1.2 - github.com/nats-io/nats-server/v2 v2.10.24 + github.com/nats-io/jsm.go v0.1.1-0.20250120135113-0db99fd62ad9 + github.com/nats-io/nats-server/v2 v2.11.0-dev.0.20250109022000-c0ae95d0be26 github.com/nats-io/nats.go v1.38.0 github.com/onsi/ginkgo/v2 v2.22.2 github.com/onsi/gomega v1.36.2 @@ -42,6 +42,7 @@ require ( github.com/gogo/protobuf v1.3.2 // indirect github.com/golang/protobuf v1.5.4 // indirect github.com/google/gnostic-models v0.6.9 // indirect + github.com/google/go-tpm v0.9.3 // indirect github.com/google/gofuzz v1.2.0 // indirect github.com/google/pprof v0.0.0-20241210010833-40e02aabc2ad // indirect github.com/google/uuid v1.6.0 // indirect diff --git a/go.sum b/go.sum index 46225f7c..40c02964 100644 --- a/go.sum +++ b/go.sum @@ -1,3 +1,5 @@ +github.com/antithesishq/antithesis-sdk-go v0.4.3-default-no-op h1:+OSa/t11TFhqfrX0EOSqQBDJ0YlpmK0rDSiB19dg9M0= +github.com/antithesishq/antithesis-sdk-go v0.4.3-default-no-op/go.mod h1:IUpT2DPAKh6i/YhSbt6Gl3v2yvUZjmKncl7U91fup7E= github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM= github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw= github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs= @@ -41,6 +43,8 @@ github.com/google/gnostic-models v0.6.9/go.mod h1:CiWsm0s6BSQd1hRn8/QmxqB6BesYcb github.com/google/go-cmp v0.5.9/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI= github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= +github.com/google/go-tpm v0.9.3 h1:+yx0/anQuGzi+ssRqeD6WpXjW2L/V0dItUayO0i9sRc= +github.com/google/go-tpm v0.9.3/go.mod h1:h9jEsEECg7gtLis0upRBQU+GhYVH6jMjrFxI8u6bVUY= github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= github.com/google/gofuzz v1.2.0 h1:xRy4A+RhZaiKjJ1bPfwQ8sedCA+YS2YcCHW6ec7JMi0= github.com/google/gofuzz v1.2.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= @@ -73,12 +77,12 @@ github.com/modern-go/reflect2 v1.0.2 h1:xBagoLtFs94CBntxluKeaWgTMpvLxC4ur3nMaC9G github.com/modern-go/reflect2 v1.0.2/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjYzDa0/r8luk= github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 h1:C3w9PqII01/Oq1c1nUAm88MOHcQC9l5mIlSMApZMrHA= github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822/go.mod h1:+n7T8mK8HuQTcFwEeznm/DIxMOiR9yIdICNftLE1DvQ= -github.com/nats-io/jsm.go v0.1.2 h1:T4Fq88a03sPAPWYwrOLQ85oanYsC2Bs6517rUiWBMpQ= -github.com/nats-io/jsm.go v0.1.2/go.mod h1:tnubE70CAKi5TNfQiq6XHFqWTuSIe1H7X4sDwfq6ZK8= +github.com/nats-io/jsm.go v0.1.1-0.20250120135113-0db99fd62ad9 h1:2DfL+nZjlB8kQh5GN5IEBdVroHbRaRod3BAi07Ag89Q= +github.com/nats-io/jsm.go v0.1.1-0.20250120135113-0db99fd62ad9/go.mod h1:w/SA3/rNK5xl6ZsCqKLVgQzVLfxbYNScle8LcQ5gSBM= github.com/nats-io/jwt/v2 v2.7.3 h1:6bNPK+FXgBeAqdj4cYQ0F8ViHRbi7woQLq4W29nUAzE= github.com/nats-io/jwt/v2 v2.7.3/go.mod h1:GvkcbHhKquj3pkioy5put1wvPxs78UlZ7D/pY+BgZk4= -github.com/nats-io/nats-server/v2 v2.10.24 h1:KcqqQAD0ZZcG4yLxtvSFJY7CYKVYlnlWoAiVZ6i/IY4= -github.com/nats-io/nats-server/v2 v2.10.24/go.mod h1:olvKt8E5ZlnjyqBGbAXtxvSQKsPodISK5Eo/euIta4s= +github.com/nats-io/nats-server/v2 v2.11.0-dev.0.20250109022000-c0ae95d0be26 h1:mN0eraizaHih90T32ItWe58+l31eVjoD6JOu99WqdNc= +github.com/nats-io/nats-server/v2 v2.11.0-dev.0.20250109022000-c0ae95d0be26/go.mod h1:nXRZ6eQo2lmNpZLVNIMDNwKM7FgbHgPJ1pIRcPOpVuk= github.com/nats-io/nats.go v1.38.0 h1:A7P+g7Wjp4/NWqDOOP/K6hfhr54DvdDQUznt5JFg9XA= github.com/nats-io/nats.go v1.38.0/go.mod h1:IGUM++TwokGnXPs82/wCuiHS02/aKrdYUQkU8If6yjw= github.com/nats-io/nkeys v0.4.9 h1:qe9Faq2Gxwi6RZnZMXfmGMZkg3afLLOtrU+gDZJ35b0= diff --git a/internal/controller/account_controller_test.go b/internal/controller/account_controller_test.go index 2067b758..e2388de8 100644 --- a/internal/controller/account_controller_test.go +++ b/internal/controller/account_controller_test.go @@ -1,5 +1,5 @@ /* -Copyright 2024. +Copyright 2025. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. diff --git a/internal/controller/client.go b/internal/controller/client.go index 7e79a658..7667cec7 100644 --- a/internal/controller/client.go +++ b/internal/controller/client.go @@ -3,6 +3,7 @@ package controller import ( "fmt" + "github.com/nats-io/jsm.go" "github.com/nats-io/nats.go" "github.com/nats-io/nats.go/jetstream" ) @@ -62,13 +63,55 @@ type Closable interface { Close() } +func CreateJSMClient(cfg *NatsConfig, pedantic bool) (*jsm.Manager, Closable, error) { + nc, err := createNatsConn(cfg, pedantic) + if err != nil { + return nil, nil, fmt.Errorf("create nats connection: %w", err) + } + + major, minor, _, err := versionComponents(nc.ConnectedServerVersion()) + if err != nil { + return nil, nil, fmt.Errorf("parse server version: %w", err) + } + + // JetStream pedantic mode unsupported prior to NATS Server 2.11 + if pedantic && major < 2 || (major == 2 && minor < 11) { + pedantic = false + } + + jsmOpts := make([]jsm.Option, 0) + if pedantic { + jsmOpts = append(jsmOpts, jsm.WithPedanticRequests()) + } + + jsmClient, err := jsm.New(nc, jsmOpts...) + if err != nil { + return nil, nil, fmt.Errorf("new jsm client: %w", err) + } + + return jsmClient, nc, nil +} + // CreateJetStreamClient creates new Jetstream client with a connection based on the given NatsConfig. // Returns a jetstream.Jetstream client and the Closable of the underlying connection. // Close should be called when the client is no longer used. func CreateJetStreamClient(cfg *NatsConfig, pedantic bool) (jetstream.JetStream, Closable, error) { + nc, err := createNatsConn(cfg, pedantic) + if err != nil { + return nil, nil, fmt.Errorf("create nats connection: %w", err) + } + + js, err := jetstream.New(nc) + if err != nil { + return nil, nil, fmt.Errorf("new jetstream: %w", err) + } + return js, nc, nil +} + +func createNatsConn(cfg *NatsConfig, pedantic bool) (*nats.Conn, error) { opts, err := cfg.buildOptions() if err != nil { - return nil, nil, fmt.Errorf("nats options: %w", err) + return nil, err } // Set pedantic option @@ -82,14 +125,5 @@ func CreateJetStreamClient(cfg *NatsConfig, pedantic bool) (jetstream.JetStream, // client should always attempt to reconnect opts = append(opts, nats.MaxReconnects(-1)) - nc, err := nats.Connect(cfg.ServerURL, opts...) - if err != nil { - return nil, nil, fmt.Errorf("nats connect: %w", err) - } - - js, err := jetstream.New(nc) - if err != nil { - return nil, nil, fmt.Errorf("new jetstream: %w", err) - } - return js, nc, nil + return nats.Connect(cfg.ServerURL, opts...) } diff --git a/internal/controller/consumer_controller.go b/internal/controller/consumer_controller.go index 21efa23f..f0aebbe2 100644 --- a/internal/controller/consumer_controller.go +++ b/internal/controller/consumer_controller.go @@ -1,5 +1,5 @@ /* -Copyright 2024. +Copyright 2025. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. diff --git a/internal/controller/consumer_controller_test.go b/internal/controller/consumer_controller_test.go index 5b72f579..fa11ba9f 100644 --- a/internal/controller/consumer_controller_test.go +++ b/internal/controller/consumer_controller_test.go @@ -1,5 +1,5 @@ /* -Copyright 2024. +Copyright 2025. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. diff --git a/internal/controller/jetstream_controller.go b/internal/controller/jetstream_controller.go index 05fa338b..202b6973 100644 --- a/internal/controller/jetstream_controller.go +++ b/internal/controller/jetstream_controller.go @@ -2,14 +2,19 @@ package controller import ( "context" + "errors" "fmt" "math/rand/v2" "os" "path/filepath" + "regexp" + "strconv" "strings" "time" "github.com/google/go-cmp/cmp" + "github.com/nats-io/jsm.go" + jsmapi "github.com/nats-io/jsm.go/api" js "github.com/nats-io/nack/controllers/jetstream" api "github.com/nats-io/nack/pkg/jetstream/apis/jetstream/v1beta2" "github.com/nats-io/nats.go/jetstream" @@ -19,6 +24,8 @@ import ( "sigs.k8s.io/controller-runtime/pkg/client" ) +var semVerRe = regexp.MustCompile(`\Av?([0-9]+)\.?([0-9]+)?\.?([0-9]+)?`) + type JetStreamController interface { client.Client @@ -36,6 +43,9 @@ type JetStreamController interface { // Returns the error of the operation or errors during client setup. WithJetStreamClient(opts api.ConnectionOpts, ns string, op func(js jetstream.JetStream) error) error + // WithJSMClient provides a jsm.go client to the given operation. + WithJSMClient(opts api.ConnectionOpts, ns string, op func(jsm *jsm.Manager) error) error + RequeueInterval() time.Duration } @@ -76,6 +86,21 @@ func (c *jsController) ValidNamespace(namespace string) bool { return ns == "" || ns == namespace } +func (c *jsController) WithJSMClient(opts api.ConnectionOpts, ns string, op func(js *jsm.Manager) error) error { + cfg, err := c.natsConfigFromOpts(opts, ns) + if err != nil { + return err + } + + jsmClient, closer, err := CreateJSMClient(cfg, true) + if err != nil { + return fmt.Errorf("create jsm client: %w", err) + } + defer closer.Close() + + return op(jsmClient) +} + func (c *jsController) WithJetStreamClient(opts api.ConnectionOpts, ns string, op func(js jetstream.JetStream) error) error { // Build single use client // TODO(future-feature): Use client-pool instead of single use client @@ -284,3 +309,31 @@ func jsonString(v string) []byte { func compareConfigState(actual any, desired any) string { return cmp.Diff(actual, desired) } + +func getErrCode(err error) uint16 { + if apiErr, ok := err.(jsmapi.ApiError); ok { + return apiErr.NatsErrorCode() + } + + return 0 +} + +func versionComponents(version string) (major, minor, patch int, err error) { + m := semVerRe.FindStringSubmatch(version) + if m == nil { + return 0, 0, 0, errors.New("invalid semver") + } + major, err = strconv.Atoi(m[1]) + if err != nil { + return -1, -1, -1, err + } + minor, err = strconv.Atoi(m[2]) + if err != nil { + return -1, -1, -1, err + } + patch, err = strconv.Atoi(m[3]) + if err != nil { + return -1, -1, -1, err + } + return major, minor, patch, err +} diff --git a/internal/controller/keyvalue_controller_test.go b/internal/controller/keyvalue_controller_test.go index 82cac273..088a20c2 100644 --- a/internal/controller/keyvalue_controller_test.go +++ b/internal/controller/keyvalue_controller_test.go @@ -1,5 +1,5 @@ /* -Copyright 2024. +Copyright 2025. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. diff --git a/internal/controller/objectstore_controller_test.go b/internal/controller/objectstore_controller_test.go index e3a0d999..22de4d16 100644 --- a/internal/controller/objectstore_controller_test.go +++ b/internal/controller/objectstore_controller_test.go @@ -1,5 +1,5 @@ /* -Copyright 2024. +Copyright 2025. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. diff --git a/internal/controller/stream_controller.go b/internal/controller/stream_controller.go index 49d18554..4b516c47 100644 --- a/internal/controller/stream_controller.go +++ b/internal/controller/stream_controller.go @@ -1,5 +1,5 @@ /* -Copyright 2024. +Copyright 2025. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. @@ -24,6 +24,8 @@ import ( "time" "github.com/go-logr/logr" + "github.com/nats-io/jsm.go" + jsmapi "github.com/nats-io/jsm.go/api" api "github.com/nats-io/nack/pkg/jetstream/apis/jetstream/v1beta2" "github.com/nats-io/nats.go/jetstream" v1 "k8s.io/api/core/v1" @@ -135,8 +137,8 @@ func (r *StreamReconciler) deleteStream(ctx context.Context, log logr.Logger, st if !stream.Spec.PreventDelete && !r.ReadOnly() { log.Info("Deleting stream.") - err := r.WithJetStreamClient(stream.Spec.ConnectionOpts, stream.Namespace, func(js jetstream.JetStream) error { - _, err := getServerStreamState(ctx, js, stream) + err := r.WithJSMClient(stream.Spec.ConnectionOpts, stream.Namespace, func(js *jsm.Manager) error { + _, err := getServerStreamState(js, stream) // If we have no known state for this stream it has never been reconciled. // If we are also receiving an error fetching state, either the stream does not exist // or this resource config is invalid. @@ -144,7 +146,7 @@ func (r *StreamReconciler) deleteStream(ctx context.Context, log logr.Logger, st return nil } - return js.DeleteStream(ctx, stream.Spec.Name) + return js.DeleteStream(stream.Spec.Name) }) if errors.Is(err, jetstream.ErrStreamNotFound) { log.Info("Stream does not exist, unable to delete.", "streamName", stream.Spec.Name) @@ -180,14 +182,15 @@ func (r *StreamReconciler) createOrUpdate(ctx context.Context, log logr.Logger, } // CreateOrUpdateStream is called on every reconciliation when the stream is not to be deleted. - err = r.WithJetStreamClient(stream.Spec.ConnectionOpts, stream.Namespace, func(js jetstream.JetStream) error { + err = r.WithJSMClient(stream.Spec.ConnectionOpts, stream.Namespace, func(js *jsm.Manager) error { storedState, err := getStoredStreamState(stream) if err != nil { log.Error(err, "Failed to fetch stored stream state") } - serverState, err := getServerStreamState(ctx, js, stream) + serverState, err := getServerStreamState(js, stream) if err != nil { + log.Info("get err", "err", err, "code", getErrCode(err)) return err } @@ -211,18 +214,28 @@ func (r *StreamReconciler) createOrUpdate(ctx context.Context, log logr.Logger, return nil } - var updatedStream jetstream.Stream + var updatedStream *jsm.Stream err = nil if serverState == nil { log.Info("Creating Stream.") - updatedStream, err = js.CreateStream(ctx, targetConfig) + updatedStream, err = js.NewStream(stream.Spec.Name, targetConfig...) if err != nil { return err } } else if !stream.Spec.PreventUpdate { log.Info("Updating Stream.") - updatedStream, err = js.UpdateStream(ctx, targetConfig) + s, err := js.LoadStream(stream.Spec.Name) + if err != nil { + return err + } + + err = s.UpdateConfiguration(*serverState, targetConfig...) + if err != nil { + return err + } + + updatedStream, err = js.LoadStream(stream.Spec.Name) if err != nil { return err } @@ -234,7 +247,7 @@ func (r *StreamReconciler) createOrUpdate(ctx context.Context, log logr.Logger, if updatedStream != nil { // Store known state in annotation - updatedState, err := json.Marshal(updatedStream.CachedInfo().Config) + updatedState, err := json.Marshal(updatedStream.Configuration()) if err != nil { return err } @@ -274,8 +287,8 @@ func (r *StreamReconciler) createOrUpdate(ctx context.Context, log logr.Logger, return nil } -func getStoredStreamState(stream *api.Stream) (*jetstream.StreamConfig, error) { - var storedState *jetstream.StreamConfig +func getStoredStreamState(stream *api.Stream) (*jsmapi.StreamConfig, error) { + var storedState *jsmapi.StreamConfig if state, ok := stream.Annotations[stateAnnotationStream]; ok { err := json.Unmarshal([]byte(state), &storedState) if err != nil { @@ -287,155 +300,218 @@ func getStoredStreamState(stream *api.Stream) (*jetstream.StreamConfig, error) { } // Fetch the current state of the stream from the server. -// ErrStreamNotFound is considered a valid response and does not return error -func getServerStreamState(ctx context.Context, js jetstream.JetStream, stream *api.Stream) (*jetstream.StreamConfig, error) { - s, err := js.Stream(ctx, stream.Spec.Name) - if errors.Is(err, jetstream.ErrStreamNotFound) { +// JSStreamNotFoundErr is considered a valid response and does not return error +func getServerStreamState(jsm *jsm.Manager, stream *api.Stream) (*jsmapi.StreamConfig, error) { + s, err := jsm.LoadStream(stream.Spec.Name) + // 10059 -> JSStreamNotFoundErr + if jsmapi.IsNatsErr(err, 10059) { return nil, nil } if err != nil { return nil, err } - return &s.CachedInfo().Config, nil + streamCfg := s.Configuration() + return &streamCfg, nil } // streamSpecToConfig creates a jetstream.StreamConfig matching the given stream resource spec -func streamSpecToConfig(spec *api.StreamSpec) (jetstream.StreamConfig, error) { - // Set directly mapped fields - config := jetstream.StreamConfig{ - Name: spec.Name, - Description: spec.Description, - Subjects: spec.Subjects, - MaxConsumers: spec.MaxConsumers, - MaxMsgs: int64(spec.MaxMsgs), - MaxBytes: int64(spec.MaxBytes), - DiscardNewPerSubject: spec.DiscardPerSubject, - MaxMsgsPerSubject: int64(spec.MaxMsgsPerSubject), - MaxMsgSize: int32(spec.MaxMsgSize), - Replicas: spec.Replicas, - NoAck: spec.NoAck, - Sealed: spec.Sealed, - DenyDelete: spec.DenyDelete, - DenyPurge: spec.DenyPurge, - AllowRollup: spec.AllowRollup, - FirstSeq: spec.FirstSequence, - AllowDirect: spec.AllowDirect, - MirrorDirect: spec.MirrorDirect, - Metadata: spec.Metadata, +func streamSpecToConfig(spec *api.StreamSpec) ([]jsm.StreamOption, error) { + opts := []jsm.StreamOption{ + jsm.StreamDescription(spec.Description), + jsm.Subjects(spec.Subjects...), + jsm.MaxConsumers(spec.MaxConsumers), + jsm.MaxMessages(int64(spec.MaxMsgs)), + jsm.MaxBytes(int64(spec.MaxBytes)), + jsm.MaxMessageSize(int32(spec.MaxMsgSize)), + jsm.Replicas(spec.Replicas), } // Set not directly mapped fields // retention - if spec.Retention != "" { - // Wrap string in " to be properly unmarshalled as json string - err := config.Retention.UnmarshalJSON(jsonString(spec.Retention)) - if err != nil { - return jetstream.StreamConfig{}, fmt.Errorf("invalid retention policy: %w", err) - } - } - - // discard - if spec.Discard != "" { - err := config.Discard.UnmarshalJSON(jsonString(spec.Discard)) - if err != nil { - return jetstream.StreamConfig{}, fmt.Errorf("invalid retention policy: %w", err) - } + switch spec.Retention { + case "limits": + opts = append(opts, jsm.LimitsRetention()) + case "interest": + opts = append(opts, jsm.InterestRetention()) + case "workqueue": + opts = append(opts, jsm.WorkQueueRetention()) + } + + // maxMsgsPerSubject + if spec.MaxMsgsPerSubject > 0 { + opts = append(opts, func(o *jsmapi.StreamConfig) error { + o.MaxMsgsPer = int64(spec.MaxMsgsPerSubject) + return nil + }) } // maxAge if spec.MaxAge != "" { d, err := time.ParseDuration(spec.MaxAge) if err != nil { - return jetstream.StreamConfig{}, fmt.Errorf("parse max age: %w", err) + return nil, fmt.Errorf("parse max age: %w", err) } - config.MaxAge = d + opts = append(opts, jsm.MaxAge(d)) } + // storage - if spec.Storage != "" { - err := config.Storage.UnmarshalJSON(jsonString(spec.Storage)) - if err != nil { - return jetstream.StreamConfig{}, fmt.Errorf("invalid storage: %w", err) - } + switch spec.Storage { + case "file": + opts = append(opts, jsm.FileStorage()) + case "memory": + opts = append(opts, jsm.MemoryStorage()) } - // duplicates + // discard + switch spec.Discard { + case "old": + opts = append(opts, jsm.DiscardOld()) + case "new": + opts = append(opts, jsm.DiscardNew()) + } + + // noAck + if spec.NoAck { + opts = append(opts, jsm.NoAck()) + } + + // duplicateWindow if spec.DuplicateWindow != "" { d, err := time.ParseDuration(spec.DuplicateWindow) if err != nil { - return jetstream.StreamConfig{}, fmt.Errorf("parse duplicate window: %w", err) + return nil, fmt.Errorf("parse duplicate window: %w", err) } - config.Duplicates = d + opts = append(opts, jsm.DuplicateWindow(d)) } // placement if spec.Placement != nil { - config.Placement = &jetstream.Placement{ - Cluster: spec.Placement.Cluster, - Tags: spec.Placement.Tags, + if spec.Placement.Cluster != "" { + opts = append(opts, jsm.PlacementCluster(spec.Placement.Cluster)) + } + if spec.Placement.Tags != nil { + opts = append(opts, jsm.PlacementTags(spec.Placement.Tags...)) } } // mirror if spec.Mirror != nil { - ss, err := mapStreamSource(spec.Mirror) + ss, err := mapJSMStreamSource(spec.Mirror) if err != nil { - return jetstream.StreamConfig{}, fmt.Errorf("map mirror stream source: %w", err) + return nil, fmt.Errorf("map mirror stream source: %w", err) } - config.Mirror = ss + opts = append(opts, jsm.Mirror(ss)) } // sources if spec.Sources != nil { - config.Sources = []*jetstream.StreamSource{} + streamSources := make([]*jsmapi.StreamSource, 0) for _, source := range spec.Sources { - s, err := mapStreamSource(source) + ss, err := mapJSMStreamSource(source) if err != nil { - return jetstream.StreamConfig{}, fmt.Errorf("map stream source: %w", err) + return nil, fmt.Errorf("map stream source: %w", err) } - config.Sources = append(config.Sources, s) + streamSources = append(streamSources, ss) } + + opts = append(opts, jsm.Sources(streamSources...)) } // compression - if spec.Compression != "" { - err := config.Compression.UnmarshalJSON(jsonString(spec.Compression)) - if err != nil { - return jetstream.StreamConfig{}, fmt.Errorf("invalid compression: %w", err) - } + switch spec.Compression { + case "s2": + opts = append(opts, jsm.Compression(jsmapi.S2Compression)) + case "none": + opts = append(opts, jsm.Compression(jsmapi.NoCompression)) } // subjectTransform if spec.SubjectTransform != nil { - config.SubjectTransform = &jetstream.SubjectTransformConfig{ + st := &jsmapi.SubjectTransformConfig{ Source: spec.SubjectTransform.Source, Destination: spec.SubjectTransform.Dest, } + + opts = append(opts, jsm.SubjectTransform(st)) } // rePublish if spec.RePublish != nil { - config.RePublish = &jetstream.RePublish{ + r := &jsmapi.RePublish{ Source: spec.RePublish.Source, Destination: spec.RePublish.Destination, HeadersOnly: spec.RePublish.HeadersOnly, } + + opts = append(opts, jsm.Republish(r)) + } + + if spec.Sealed { + opts = append(opts, func(o *jsmapi.StreamConfig) error { + o.Sealed = spec.Sealed + return nil + }) + } + + // denyDelete + if spec.DenyDelete { + opts = append(opts, jsm.DenyDelete()) + } + + // denyPurge + if spec.DenyPurge { + opts = append(opts, jsm.DenyPurge()) + } + + // allowDirect + if spec.AllowDirect { + opts = append(opts, jsm.AllowDirect()) + } + + // allowRollup + if spec.AllowRollup { + opts = append(opts, jsm.AllowRollup()) + } + + // mirrorDirect + if spec.MirrorDirect { + opts = append(opts, jsm.MirrorDirect()) + } + + // discardPerSubject + if spec.DiscardPerSubject { + opts = append(opts, jsm.DiscardNewPerSubject()) + } + + // firstSequence + if spec.FirstSequence > 0 { + opts = append(opts, jsm.FirstSequence(spec.FirstSequence)) + } + + // metadata + if spec.Metadata != nil { + opts = append(opts, jsm.StreamMetadata(spec.Metadata)) } // consumerLimits if spec.ConsumerLimits != nil { - inactiveThreshold, err := time.ParseDuration(spec.ConsumerLimits.InactiveThreshold) - if err != nil { - return jetstream.StreamConfig{}, fmt.Errorf("parse inactive threshold: %w", err) + cl := jsmapi.StreamConsumerLimits{ + MaxAckPending: spec.ConsumerLimits.MaxAckPending, } - config.ConsumerLimits = jetstream.StreamConsumerLimits{ - InactiveThreshold: inactiveThreshold, - MaxAckPending: spec.ConsumerLimits.MaxAckPending, + if spec.ConsumerLimits.InactiveThreshold != "" { + inactiveThreshold, err := time.ParseDuration(spec.ConsumerLimits.InactiveThreshold) + if err != nil { + return nil, fmt.Errorf("parse inactive threshold: %w", err) + } + cl.InactiveThreshold = inactiveThreshold } + + opts = append(opts, jsm.ConsumerLimits(cl)) } - return config, nil + return opts, nil } func mapStreamSource(ss *api.StreamSource) (*jetstream.StreamSource, error) { @@ -447,6 +523,7 @@ func mapStreamSource(ss *api.StreamSource) (*jetstream.StreamSource, error) { if ss.OptStartSeq > 0 { jss.OptStartSeq = uint64(ss.OptStartSeq) } + if ss.OptStartTime != "" { t, err := time.Parse(time.RFC3339, ss.OptStartTime) if err != nil { @@ -472,6 +549,41 @@ func mapStreamSource(ss *api.StreamSource) (*jetstream.StreamSource, error) { return jss, nil } +func mapJSMStreamSource(ss *api.StreamSource) (*jsmapi.StreamSource, error) { + jss := &jsmapi.StreamSource{ + Name: ss.Name, + FilterSubject: ss.FilterSubject, + } + + if ss.OptStartSeq > 0 { + jss.OptStartSeq = uint64(ss.OptStartSeq) + } + + if ss.OptStartTime != "" { + t, err := time.Parse(time.RFC3339, ss.OptStartTime) + if err != nil { + return nil, fmt.Errorf("parse opt start time: %w", err) + } + jss.OptStartTime = &t + } + + if ss.ExternalAPIPrefix != "" || ss.ExternalDeliverPrefix != "" { + jss.External = &jsmapi.ExternalStream{ + ApiPrefix: ss.ExternalAPIPrefix, + DeliverPrefix: ss.ExternalDeliverPrefix, + } + } + + for _, transform := range ss.SubjectTransforms { + jss.SubjectTransforms = append(jss.SubjectTransforms, jsmapi.SubjectTransformConfig{ + Source: transform.Source, + Destination: transform.Dest, + }) + } + + return jss, nil +} + // SetupWithManager sets up the controller with the Manager. func (r *StreamReconciler) SetupWithManager(mgr ctrl.Manager) error { return ctrl.NewControllerManagedBy(mgr). diff --git a/internal/controller/stream_controller_test.go b/internal/controller/stream_controller_test.go index b9ac6c6b..c37e158f 100644 --- a/internal/controller/stream_controller_test.go +++ b/internal/controller/stream_controller_test.go @@ -1,5 +1,5 @@ /* -Copyright 2024. +Copyright 2025. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. @@ -20,6 +20,7 @@ import ( "testing" "time" + jsmapi "github.com/nats-io/jsm.go/api" "github.com/nats-io/nats.go/jetstream" . "github.com/onsi/ginkgo/v2" . "github.com/onsi/gomega" @@ -215,7 +216,7 @@ var _ = Describe("Stream Controller", func() { }) It("should not create the stream", func(ctx SpecContext) { _, err := controller.Reconcile(ctx, ctrl.Request{NamespacedName: typeNamespacedName}) - Expect(err.Error()).To(HaveSuffix("can not be sealed")) + Expect(err.Error()).To(HaveSuffix("can not be sealed (10052)")) }) }) @@ -579,13 +580,13 @@ func Test_mapSpecToConfig(t *testing.T) { tests := []struct { name string spec *api.StreamSpec - want jetstream.StreamConfig + want jsmapi.StreamConfig wantErr bool }{ { name: "empty spec", spec: &api.StreamSpec{}, - want: jetstream.StreamConfig{}, + want: jsmapi.StreamConfig{}, wantErr: false, }, { @@ -617,7 +618,6 @@ func Test_mapSpecToConfig(t *testing.T) { Dest: "transform-dest", }}, }, - Name: "stream-name", NoAck: true, Placement: &api.StreamPlacement{ Cluster: "test-cluster", @@ -665,75 +665,72 @@ func Test_mapSpecToConfig(t *testing.T) { }, }, }, - want: jetstream.StreamConfig{ - Name: "stream-name", - Description: "stream description", - Subjects: []string{"orders.*"}, - Retention: jetstream.InterestPolicy, - MaxConsumers: -1, - MaxMsgs: -1, - MaxBytes: -1, - Discard: jetstream.DiscardNew, - DiscardNewPerSubject: true, - MaxAge: time.Second * 30, - MaxMsgsPerSubject: 10, - MaxMsgSize: -1, - Storage: jetstream.FileStorage, - Replicas: 3, - NoAck: true, - Duplicates: time.Second * 5, - Placement: &jetstream.Placement{ + want: jsmapi.StreamConfig{ + Description: "stream description", + Subjects: []string{"orders.*"}, + Retention: jsmapi.InterestPolicy, + MaxConsumers: -1, + MaxMsgs: -1, + MaxBytes: -1, + Discard: jsmapi.DiscardNew, + DiscardNewPer: true, + MaxAge: time.Second * 30, + MaxMsgsPer: 10, + MaxMsgSize: -1, + Storage: jsmapi.FileStorage, + Replicas: 3, + NoAck: true, + Duplicates: time.Second * 5, + Placement: &jsmapi.Placement{ Cluster: "test-cluster", Tags: []string{"tag"}, }, - Mirror: &jetstream.StreamSource{ + Mirror: &jsmapi.StreamSource{ Name: "mirror", OptStartSeq: 5, OptStartTime: &date, FilterSubject: "orders", - SubjectTransforms: []jetstream.SubjectTransformConfig{{ + SubjectTransforms: []jsmapi.SubjectTransformConfig{{ Source: "transform-source", Destination: "transform-dest", }}, - External: &jetstream.ExternalStream{ - APIPrefix: "api", + External: &jsmapi.ExternalStream{ + ApiPrefix: "api", DeliverPrefix: "deliver", }, - Domain: "", }, - Sources: []*jetstream.StreamSource{{ + Sources: []*jsmapi.StreamSource{{ Name: "source", OptStartSeq: 5, OptStartTime: &date, FilterSubject: "orders", - SubjectTransforms: []jetstream.SubjectTransformConfig{{ + SubjectTransforms: []jsmapi.SubjectTransformConfig{{ Source: "transform-source", Destination: "transform-dest", }}, - External: &jetstream.ExternalStream{ - APIPrefix: "api", + External: &jsmapi.ExternalStream{ + ApiPrefix: "api", DeliverPrefix: "deliver", }, - Domain: "", }}, - Sealed: false, - DenyDelete: true, - DenyPurge: true, - AllowRollup: true, - Compression: jetstream.S2Compression, - FirstSeq: 42, - SubjectTransform: &jetstream.SubjectTransformConfig{ + Sealed: false, + DenyDelete: true, + DenyPurge: true, + RollupAllowed: true, + Compression: jsmapi.S2Compression, + FirstSeq: 42, + SubjectTransform: &jsmapi.SubjectTransformConfig{ Source: "transform-source", Destination: "transform-dest", }, - RePublish: &jetstream.RePublish{ + RePublish: &jsmapi.RePublish{ Source: "re-publish-source", Destination: "re-publish-dest", HeadersOnly: true, }, AllowDirect: true, MirrorDirect: false, - ConsumerLimits: jetstream.StreamConsumerLimits{}, + ConsumerLimits: jsmapi.StreamConsumerLimits{}, Metadata: map[string]string{ "meta": "data", }, @@ -745,14 +742,19 @@ func Test_mapSpecToConfig(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { assert := assert.New(t) - got, err := streamSpecToConfig(tt.spec) + sOpts, err := streamSpecToConfig(tt.spec) if (err != nil) != tt.wantErr { t.Errorf("streamSpecToConfig() error = %v, wantErr %v", err, tt.wantErr) return } + got := &jsmapi.StreamConfig{} + for _, o := range sOpts { + o(got) + } + // Compare nested structs - assert.EqualValues(tt.want, got) + assert.EqualValues(tt.want, *got) }) } } diff --git a/internal/controller/suite_test.go b/internal/controller/suite_test.go index a445def9..562cb278 100644 --- a/internal/controller/suite_test.go +++ b/internal/controller/suite_test.go @@ -1,5 +1,5 @@ /* -Copyright 2024. +Copyright 2025. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. diff --git a/pkg/jetstream/apis/jetstream/v1beta2/streamtypes.go b/pkg/jetstream/apis/jetstream/v1beta2/streamtypes.go index 964a5e27..3476318a 100644 --- a/pkg/jetstream/apis/jetstream/v1beta2/streamtypes.go +++ b/pkg/jetstream/apis/jetstream/v1beta2/streamtypes.go @@ -27,32 +27,32 @@ type StreamSpec struct { Subjects []string `json:"subjects"` Retention string `json:"retention"` MaxConsumers int `json:"maxConsumers"` + MaxMsgsPerSubject int `json:"maxMsgsPerSubject"` MaxMsgs int `json:"maxMsgs"` MaxBytes int `json:"maxBytes"` - Discard string `json:"discard"` - DiscardPerSubject bool `json:"discardPerSubject"` // Maps to DiscardNewPerSubject MaxAge string `json:"maxAge"` - MaxMsgsPerSubject int `json:"maxMsgsPerSubject"` MaxMsgSize int `json:"maxMsgSize"` Storage string `json:"storage"` + Discard string `json:"discard"` Replicas int `json:"replicas"` NoAck bool `json:"noAck"` DuplicateWindow string `json:"duplicateWindow"` // Maps to Duplicates Placement *StreamPlacement `json:"placement"` Mirror *StreamSource `json:"mirror"` Sources []*StreamSource `json:"sources"` - Sealed bool `json:"sealed"` - DenyDelete bool `json:"denyDelete"` - DenyPurge bool `json:"denyPurge"` - AllowRollup bool `json:"allowRollup"` Compression string `json:"compression"` - FirstSequence uint64 `json:"firstSequence"` // Maps to FirstSeq SubjectTransform *SubjectTransform `json:"subjectTransform"` RePublish *RePublish `json:"republish"` + Sealed bool `json:"sealed"` + DenyDelete bool `json:"denyDelete"` + DenyPurge bool `json:"denyPurge"` AllowDirect bool `json:"allowDirect"` + AllowRollup bool `json:"allowRollup"` // Maps to RollupAllowed MirrorDirect bool `json:"mirrorDirect"` - ConsumerLimits *ConsumerLimits `json:"consumerLimits"` + DiscardPerSubject bool `json:"discardPerSubject"` // Maps to DiscardNewPer + FirstSequence uint64 `json:"firstSequence"` // Maps to FirstSeq Metadata map[string]string `json:"metadata"` + ConsumerLimits *ConsumerLimits `json:"consumerLimits"` BaseStreamConfig }