Skip to content

Commit

Permalink
Move consumer controller to jsm.go for pedantic mode
Browse files Browse the repository at this point in the history
  • Loading branch information
samuelattwood committed Jan 21, 2025
1 parent 347964e commit 405cbf9
Show file tree
Hide file tree
Showing 8 changed files with 452 additions and 292 deletions.
220 changes: 150 additions & 70 deletions internal/controller/consumer_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,14 @@ import (
"encoding/json"
"errors"
"fmt"
"strconv"
"strings"
"time"

"github.com/nats-io/jsm.go"
jsmapi "github.com/nats-io/jsm.go/api"

"github.com/go-logr/logr"
"github.com/nats-io/nats.go/jetstream"
v1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/runtime"
Expand Down Expand Up @@ -149,21 +153,21 @@ func (r *ConsumerReconciler) deleteConsumer(ctx context.Context, log logr.Logger
}

if !consumer.Spec.PreventDelete && !r.ReadOnly() {
err := r.WithJetStreamClient(consumer.Spec.ConnectionOpts, consumer.Namespace, func(js jetstream.JetStream) error {
_, err := getServerConsumerState(ctx, js, consumer)
err := r.WithJSMClient(consumer.Spec.ConnectionOpts, consumer.Namespace, func(js *jsm.Manager) error {
_, err := getServerConsumerState(js, consumer)
// If we have no known state for this consumer it has never been reconciled.
// If we are also receiving an error fetching state, either the consumer does not exist
// or this resource config is invalid.
if err != nil && storedState == nil {
return nil
}

return js.DeleteConsumer(ctx, consumer.Spec.StreamName, consumer.Spec.DurableName)
return js.DeleteConsumer(consumer.Spec.StreamName, consumer.Spec.DurableName)
})
switch {
case errors.Is(err, jetstream.ErrConsumerNotFound):
case jsm.IsNatsError(err, JSConsumerNotFoundErr):
log.Info("Consumer does not exist. Unable to delete.")
case errors.Is(err, jetstream.ErrStreamNotFound):
case jsm.IsNatsError(err, JSStreamNotFoundErr):
log.Info("Stream of consumer does not exist. Unable to delete.")
case err != nil:
if storedState == nil {
Expand Down Expand Up @@ -201,13 +205,13 @@ func (r *ConsumerReconciler) createOrUpdate(ctx context.Context, log klog.Logger
return fmt.Errorf("map consumer spec to target config: %w", err)
}

err = r.WithJetStreamClient(consumer.Spec.ConnectionOpts, consumer.Namespace, func(js jetstream.JetStream) error {
err = r.WithJSMClient(consumer.Spec.ConnectionOpts, consumer.Namespace, func(js *jsm.Manager) error {
storedState, err := getStoredConsumerState(consumer)
if err != nil {
log.Error(err, "Failed to fetch stored consumer state.")
}

serverState, err := getServerConsumerState(ctx, js, consumer)
serverState, err := getServerConsumerState(js, consumer)
if err != nil {
return err
}
Expand All @@ -232,18 +236,28 @@ func (r *ConsumerReconciler) createOrUpdate(ctx context.Context, log klog.Logger
return nil
}

var updatedConsumer jetstream.Consumer
var updatedConsumer *jsm.Consumer
err = nil

if serverState == nil {
log.Info("Creating Consumer.")
updatedConsumer, err = js.CreateConsumer(ctx, consumer.Spec.StreamName, *targetConfig)
updatedConsumer, err = js.NewConsumer(consumer.Spec.StreamName, targetConfig...)
if err != nil {
return err
}
} else if !consumer.Spec.PreventUpdate {
log.Info("Updating Consumer.")
updatedConsumer, err = js.UpdateConsumer(ctx, consumer.Spec.StreamName, *targetConfig)
c, err := js.LoadConsumer(consumer.Spec.StreamName, consumer.Spec.DurableName)
if err != nil {
return err
}

err = c.UpdateConfiguration(targetConfig...)
if err != nil {
return err
}

updatedConsumer, err = js.LoadConsumer(consumer.Spec.StreamName, consumer.Spec.DurableName)
if err != nil {
return err
}
Expand All @@ -255,7 +269,7 @@ func (r *ConsumerReconciler) createOrUpdate(ctx context.Context, log klog.Logger

if updatedConsumer != nil {
// Store known state in annotation
updatedState, err := json.Marshal(updatedConsumer.CachedInfo().Config)
updatedState, err := json.Marshal(updatedConsumer.Configuration())
if err != nil {
return err
}
Expand Down Expand Up @@ -295,8 +309,8 @@ func (r *ConsumerReconciler) createOrUpdate(ctx context.Context, log klog.Logger
return nil
}

func getStoredConsumerState(consumer *api.Consumer) (*jetstream.ConsumerConfig, error) {
var storedState *jetstream.ConsumerConfig
func getStoredConsumerState(consumer *api.Consumer) (*jsmapi.ConsumerConfig, error) {
var storedState *jsmapi.ConsumerConfig
if state, ok := consumer.Annotations[stateAnnotationConsumer]; ok {
err := json.Unmarshal([]byte(state), &storedState)
if err != nil {
Expand All @@ -309,88 +323,148 @@ func getStoredConsumerState(consumer *api.Consumer) (*jetstream.ConsumerConfig,

// Fetch the current state of the consumer from the server.
// ErrConsumerNotFound is considered a valid response and does not return error
func getServerConsumerState(ctx context.Context, js jetstream.JetStream, consumer *api.Consumer) (*jetstream.ConsumerConfig, error) {
c, err := js.Consumer(ctx, consumer.Spec.StreamName, consumer.Spec.DurableName)
if errors.Is(err, jetstream.ErrConsumerNotFound) {
func getServerConsumerState(js *jsm.Manager, consumer *api.Consumer) (*jsmapi.ConsumerConfig, error) {
c, err := js.LoadConsumer(consumer.Spec.StreamName, consumer.Spec.DurableName)
if jsm.IsNatsError(err, JSConsumerNotFoundErr) {
return nil, nil
}
if err != nil {
return nil, err
}

return &c.CachedInfo().Config, nil
consumerCfg := c.Configuration()
return &consumerCfg, nil
}

func consumerSpecToConfig(spec *api.ConsumerSpec) (*jetstream.ConsumerConfig, error) {
config := &jetstream.ConsumerConfig{
Durable: spec.DurableName,
Description: spec.Description,
OptStartSeq: uint64(spec.OptStartSeq),
MaxDeliver: spec.MaxDeliver,
FilterSubject: spec.FilterSubject,
RateLimit: uint64(spec.RateLimitBps),
SampleFrequency: spec.SampleFreq,
MaxWaiting: spec.MaxWaiting,
MaxAckPending: spec.MaxAckPending,
HeadersOnly: spec.HeadersOnly,
MaxRequestBatch: spec.MaxRequestBatch,
MaxRequestMaxBytes: spec.MaxRequestMaxBytes,
Replicas: spec.Replicas,
MemoryStorage: spec.MemStorage,
FilterSubjects: spec.FilterSubjects,
Metadata: spec.Metadata,
}

// DeliverPolicy
if spec.DeliverPolicy != "" {
err := config.DeliverPolicy.UnmarshalJSON(jsonString(spec.DeliverPolicy))
func consumerSpecToConfig(spec *api.ConsumerSpec) ([]jsm.ConsumerOption, error) {
opts := []jsm.ConsumerOption{
jsm.ConsumerDescription(spec.Description),
jsm.DeliverySubject(spec.DeliverSubject),
jsm.DeliverGroup(spec.DeliverGroup),
jsm.DurableName(spec.DurableName),
jsm.MaxAckPending(uint(spec.MaxAckPending)),
jsm.MaxWaiting(uint(spec.MaxWaiting)),
jsm.RateLimitBitsPerSecond(uint64(spec.RateLimitBps)),
jsm.MaxRequestBatch(uint(spec.MaxRequestBatch)),
jsm.MaxRequestMaxBytes(spec.MaxRequestMaxBytes),
jsm.ConsumerOverrideReplicas(spec.Replicas),
jsm.ConsumerMetadata(spec.Metadata),
}

// ackPolicy
switch spec.AckPolicy {
case "none":
opts = append(opts, jsm.AcknowledgeNone())
case "all":
opts = append(opts, jsm.AcknowledgeAll())
case "explicit":
opts = append(opts, jsm.AcknowledgeExplicit())
case "":
default:
return nil, fmt.Errorf("invalid value for 'ackPolicy': '%s'. Must be one of 'none', 'all', 'explicit'", spec.AckPolicy)
}

// ackWait
if spec.AckWait != "" {
d, err := time.ParseDuration(spec.AckWait)
if err != nil {
return nil, fmt.Errorf("invalid delivery policy: %w", err)
return nil, fmt.Errorf("invalid ack wait duration: %w", err)
}
opts = append(opts, jsm.AckWait(d))
}

// OptStartTime RFC3339
if spec.OptStartTime != "" {
// deliverPolicy
switch spec.DeliverPolicy {
case "all":
opts = append(opts, jsm.DeliverAllAvailable())
case "last":
opts = append(opts, jsm.StartWithLastReceived())
case "new":
opts = append(opts, jsm.StartWithNextReceived())
case "byStartSequence":
opts = append(opts, jsm.StartAtSequence(uint64(spec.OptStartSeq)))
case "byStartTime":
if spec.OptStartTime == "" {
return nil, fmt.Errorf("'optStartTime' is required for deliver policy 'byStartTime'")
}
t, err := time.Parse(time.RFC3339, spec.OptStartTime)
if err != nil {
return nil, fmt.Errorf("invalid opt start time: %w", err)
return nil, err
}
config.OptStartTime = &t
opts = append(opts, jsm.StartAtTime(t))
case "":
default:
return nil, fmt.Errorf("invalid value for 'deliverPolicy': '%s'. Must be one of 'all', 'last', 'new', 'byStartSequence', 'byStartTime'", spec.DeliverPolicy)
}

// AckPolicy
if spec.AckPolicy != "" {
err := config.AckPolicy.UnmarshalJSON(jsonString(spec.AckPolicy))
if err != nil {
return nil, fmt.Errorf("invalid ack policy: %w", err)
}
// filterSubject
if spec.FilterSubject != "" && len(spec.FilterSubjects) > 0 {
return nil, errors.New("cannot set both 'filterSubject' and 'filterSubjects'")
}

// AckWait
if spec.AckWait != "" {
d, err := time.ParseDuration(spec.AckWait)
if spec.FilterSubject != "" {
opts = append(opts, jsm.FilterStreamBySubject(spec.FilterSubject))
} else if len(spec.FilterSubjects) > 0 {
opts = append(opts, jsm.FilterStreamBySubject(spec.FilterSubjects...))
}

// flowControl
if spec.FlowControl {
opts = append(opts, jsm.PushFlowControl())
}

// heartbeatInterval
if spec.HeartbeatInterval != "" {
d, err := time.ParseDuration(spec.HeartbeatInterval)
if err != nil {
return nil, fmt.Errorf("invalid ack wait duration: %w", err)
return nil, fmt.Errorf("invalid heartbeat interval: %w", err)
}
config.AckWait = d

opts = append(opts, jsm.IdleHeartbeat(d))
}

// BackOff
for _, bo := range spec.BackOff {
d, err := time.ParseDuration(bo)
if err != nil {
return nil, fmt.Errorf("invalid backoff: %w", err)
// maxDeliver
if spec.MaxDeliver != 0 {
opts = append(opts, jsm.MaxDeliveryAttempts(spec.MaxDeliver))
}

// backoff
if len(spec.BackOff) > 0 {
backoffs := make([]time.Duration, 0)
for _, bo := range spec.BackOff {
d, err := time.ParseDuration(bo)
if err != nil {
return nil, fmt.Errorf("invalid backoff: %w", err)
}
backoffs = append(backoffs, d)
}

config.BackOff = append(config.BackOff, d)
opts = append(opts, jsm.BackoffIntervals(backoffs...))
}

// ReplayPolicy
if spec.ReplayPolicy != "" {
err := config.ReplayPolicy.UnmarshalJSON(jsonString(spec.ReplayPolicy))
// replayPolicy
switch spec.ReplayPolicy {
case "instant":
opts = append(opts, jsm.ReplayInstantly())
case "original":
opts = append(opts, jsm.ReplayAsReceived())
case "":
default:
return nil, fmt.Errorf("invalid value for 'replayPolicy': '%s'. Must be one of 'instant', 'original'", spec.ReplayPolicy)
}

if spec.SampleFreq != "" {
n, err := strconv.Atoi(
strings.TrimSuffix(spec.SampleFreq, "%"),
)
if err != nil {
return nil, fmt.Errorf("invalid replay policy: %w", err)
return nil, err
}
opts = append(opts, jsm.SamplePercent(n))
}

if spec.HeadersOnly {
opts = append(opts, jsm.DeliverHeadersOnly())
}

// MaxRequestExpires
Expand All @@ -399,18 +473,24 @@ func consumerSpecToConfig(spec *api.ConsumerSpec) (*jetstream.ConsumerConfig, er
if err != nil {
return nil, fmt.Errorf("invalid opt start time: %w", err)
}
config.MaxRequestExpires = d
opts = append(opts, jsm.MaxRequestExpires(d))
}

// inactiveThreshold
if spec.InactiveThreshold != "" {
d, err := time.ParseDuration(spec.InactiveThreshold)
if err != nil {
return nil, fmt.Errorf("invalid inactive threshold: %w", err)
}
config.InactiveThreshold = d
opts = append(opts, jsm.InactiveThreshold(d))
}

// memStorage
if spec.MemStorage {
opts = append(opts, jsm.ConsumerOverrideMemoryStorage())
}

return config, nil
return opts, nil
}

// SetupWithManager sets up the controller with the Manager.
Expand Down
Loading

0 comments on commit 405cbf9

Please sign in to comment.