Skip to content

Commit

Permalink
Move stream controller to jsm.go for pedantic mode
Browse files Browse the repository at this point in the history
  • Loading branch information
samuelattwood committed Jan 21, 2025
1 parent 206f797 commit 347964e
Show file tree
Hide file tree
Showing 13 changed files with 369 additions and 163 deletions.
5 changes: 3 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@ require (
github.com/fsnotify/fsnotify v1.8.0
github.com/go-logr/logr v1.4.2
github.com/google/go-cmp v0.6.0
github.com/nats-io/jsm.go v0.1.2
github.com/nats-io/nats-server/v2 v2.10.24
github.com/nats-io/jsm.go v0.1.1-0.20250120135113-0db99fd62ad9
github.com/nats-io/nats-server/v2 v2.11.0-dev.0.20250109022000-c0ae95d0be26
github.com/nats-io/nats.go v1.38.0
github.com/onsi/ginkgo/v2 v2.22.2
github.com/onsi/gomega v1.36.2
Expand Down Expand Up @@ -42,6 +42,7 @@ require (
github.com/gogo/protobuf v1.3.2 // indirect
github.com/golang/protobuf v1.5.4 // indirect
github.com/google/gnostic-models v0.6.9 // indirect
github.com/google/go-tpm v0.9.3 // indirect
github.com/google/gofuzz v1.2.0 // indirect
github.com/google/pprof v0.0.0-20241210010833-40e02aabc2ad // indirect
github.com/google/uuid v1.6.0 // indirect
Expand Down
12 changes: 8 additions & 4 deletions go.sum
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
github.com/antithesishq/antithesis-sdk-go v0.4.3-default-no-op h1:+OSa/t11TFhqfrX0EOSqQBDJ0YlpmK0rDSiB19dg9M0=
github.com/antithesishq/antithesis-sdk-go v0.4.3-default-no-op/go.mod h1:IUpT2DPAKh6i/YhSbt6Gl3v2yvUZjmKncl7U91fup7E=
github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM=
github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw=
github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs=
Expand Down Expand Up @@ -41,6 +43,8 @@ github.com/google/gnostic-models v0.6.9/go.mod h1:CiWsm0s6BSQd1hRn8/QmxqB6BesYcb
github.com/google/go-cmp v0.5.9/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI=
github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
github.com/google/go-tpm v0.9.3 h1:+yx0/anQuGzi+ssRqeD6WpXjW2L/V0dItUayO0i9sRc=
github.com/google/go-tpm v0.9.3/go.mod h1:h9jEsEECg7gtLis0upRBQU+GhYVH6jMjrFxI8u6bVUY=
github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg=
github.com/google/gofuzz v1.2.0 h1:xRy4A+RhZaiKjJ1bPfwQ8sedCA+YS2YcCHW6ec7JMi0=
github.com/google/gofuzz v1.2.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg=
Expand Down Expand Up @@ -73,12 +77,12 @@ github.com/modern-go/reflect2 v1.0.2 h1:xBagoLtFs94CBntxluKeaWgTMpvLxC4ur3nMaC9G
github.com/modern-go/reflect2 v1.0.2/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjYzDa0/r8luk=
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 h1:C3w9PqII01/Oq1c1nUAm88MOHcQC9l5mIlSMApZMrHA=
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822/go.mod h1:+n7T8mK8HuQTcFwEeznm/DIxMOiR9yIdICNftLE1DvQ=
github.com/nats-io/jsm.go v0.1.2 h1:T4Fq88a03sPAPWYwrOLQ85oanYsC2Bs6517rUiWBMpQ=
github.com/nats-io/jsm.go v0.1.2/go.mod h1:tnubE70CAKi5TNfQiq6XHFqWTuSIe1H7X4sDwfq6ZK8=
github.com/nats-io/jsm.go v0.1.1-0.20250120135113-0db99fd62ad9 h1:2DfL+nZjlB8kQh5GN5IEBdVroHbRaRod3BAi07Ag89Q=
github.com/nats-io/jsm.go v0.1.1-0.20250120135113-0db99fd62ad9/go.mod h1:w/SA3/rNK5xl6ZsCqKLVgQzVLfxbYNScle8LcQ5gSBM=
github.com/nats-io/jwt/v2 v2.7.3 h1:6bNPK+FXgBeAqdj4cYQ0F8ViHRbi7woQLq4W29nUAzE=
github.com/nats-io/jwt/v2 v2.7.3/go.mod h1:GvkcbHhKquj3pkioy5put1wvPxs78UlZ7D/pY+BgZk4=
github.com/nats-io/nats-server/v2 v2.10.24 h1:KcqqQAD0ZZcG4yLxtvSFJY7CYKVYlnlWoAiVZ6i/IY4=
github.com/nats-io/nats-server/v2 v2.10.24/go.mod h1:olvKt8E5ZlnjyqBGbAXtxvSQKsPodISK5Eo/euIta4s=
github.com/nats-io/nats-server/v2 v2.11.0-dev.0.20250109022000-c0ae95d0be26 h1:mN0eraizaHih90T32ItWe58+l31eVjoD6JOu99WqdNc=
github.com/nats-io/nats-server/v2 v2.11.0-dev.0.20250109022000-c0ae95d0be26/go.mod h1:nXRZ6eQo2lmNpZLVNIMDNwKM7FgbHgPJ1pIRcPOpVuk=
github.com/nats-io/nats.go v1.38.0 h1:A7P+g7Wjp4/NWqDOOP/K6hfhr54DvdDQUznt5JFg9XA=
github.com/nats-io/nats.go v1.38.0/go.mod h1:IGUM++TwokGnXPs82/wCuiHS02/aKrdYUQkU8If6yjw=
github.com/nats-io/nkeys v0.4.9 h1:qe9Faq2Gxwi6RZnZMXfmGMZkg3afLLOtrU+gDZJ35b0=
Expand Down
2 changes: 1 addition & 1 deletion internal/controller/account_controller_test.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
Copyright 2024.
Copyright 2025.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
Expand Down
56 changes: 45 additions & 11 deletions internal/controller/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package controller
import (
"fmt"

"github.com/nats-io/jsm.go"
"github.com/nats-io/nats.go"
"github.com/nats-io/nats.go/jetstream"
)
Expand Down Expand Up @@ -62,13 +63,55 @@ type Closable interface {
Close()
}

func CreateJSMClient(cfg *NatsConfig, pedantic bool) (*jsm.Manager, Closable, error) {
nc, err := createNatsConn(cfg, pedantic)
if err != nil {
return nil, nil, fmt.Errorf("create nats connection: %w", err)
}

major, minor, _, err := versionComponents(nc.ConnectedServerVersion())
if err != nil {
return nil, nil, fmt.Errorf("parse server version: %w", err)
}

// JetStream pedantic mode unsupported prior to NATS Server 2.11
if pedantic && major < 2 || (major == 2 && minor < 11) {
pedantic = false
}

jsmOpts := make([]jsm.Option, 0)
if pedantic {
jsmOpts = append(jsmOpts, jsm.WithPedanticRequests())
}

jsmClient, err := jsm.New(nc, jsmOpts...)
if err != nil {
return nil, nil, fmt.Errorf("new jsm client: %w", err)
}

return jsmClient, nc, nil
}

// CreateJetStreamClient creates new Jetstream client with a connection based on the given NatsConfig.
// Returns a jetstream.Jetstream client and the Closable of the underlying connection.
// Close should be called when the client is no longer used.
func CreateJetStreamClient(cfg *NatsConfig, pedantic bool) (jetstream.JetStream, Closable, error) {
nc, err := createNatsConn(cfg, pedantic)
if err != nil {
return nil, nil, fmt.Errorf("create nats connection: %w", err)
}

js, err := jetstream.New(nc)
if err != nil {
return nil, nil, fmt.Errorf("new jetstream: %w", err)
}
return js, nc, nil
}

func createNatsConn(cfg *NatsConfig, pedantic bool) (*nats.Conn, error) {
opts, err := cfg.buildOptions()
if err != nil {
return nil, nil, fmt.Errorf("nats options: %w", err)
return nil, err
}

// Set pedantic option
Expand All @@ -82,14 +125,5 @@ func CreateJetStreamClient(cfg *NatsConfig, pedantic bool) (jetstream.JetStream,
// client should always attempt to reconnect
opts = append(opts, nats.MaxReconnects(-1))

nc, err := nats.Connect(cfg.ServerURL, opts...)
if err != nil {
return nil, nil, fmt.Errorf("nats connect: %w", err)
}

js, err := jetstream.New(nc)
if err != nil {
return nil, nil, fmt.Errorf("new jetstream: %w", err)
}
return js, nc, nil
return nats.Connect(cfg.ServerURL, opts...)
}
2 changes: 1 addition & 1 deletion internal/controller/consumer_controller.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
Copyright 2024.
Copyright 2025.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
Expand Down
2 changes: 1 addition & 1 deletion internal/controller/consumer_controller_test.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
Copyright 2024.
Copyright 2025.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
Expand Down
53 changes: 53 additions & 0 deletions internal/controller/jetstream_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,19 @@ package controller

import (
"context"
"errors"
"fmt"
"math/rand/v2"
"os"
"path/filepath"
"regexp"
"strconv"
"strings"
"time"

"github.com/google/go-cmp/cmp"
"github.com/nats-io/jsm.go"
jsmapi "github.com/nats-io/jsm.go/api"
js "github.com/nats-io/nack/controllers/jetstream"
api "github.com/nats-io/nack/pkg/jetstream/apis/jetstream/v1beta2"
"github.com/nats-io/nats.go/jetstream"
Expand All @@ -19,6 +24,8 @@ import (
"sigs.k8s.io/controller-runtime/pkg/client"
)

var semVerRe = regexp.MustCompile(`\Av?([0-9]+)\.?([0-9]+)?\.?([0-9]+)?`)

type JetStreamController interface {
client.Client

Expand All @@ -36,6 +43,9 @@ type JetStreamController interface {
// Returns the error of the operation or errors during client setup.
WithJetStreamClient(opts api.ConnectionOpts, ns string, op func(js jetstream.JetStream) error) error

// WithJSMClient provides a jsm.go client to the given operation.
WithJSMClient(opts api.ConnectionOpts, ns string, op func(jsm *jsm.Manager) error) error

RequeueInterval() time.Duration
}

Expand Down Expand Up @@ -76,6 +86,21 @@ func (c *jsController) ValidNamespace(namespace string) bool {
return ns == "" || ns == namespace
}

func (c *jsController) WithJSMClient(opts api.ConnectionOpts, ns string, op func(js *jsm.Manager) error) error {
cfg, err := c.natsConfigFromOpts(opts, ns)
if err != nil {
return err
}

jsmClient, closer, err := CreateJSMClient(cfg, true)
if err != nil {
return fmt.Errorf("create jsm client: %w", err)
}
defer closer.Close()

return op(jsmClient)
}

func (c *jsController) WithJetStreamClient(opts api.ConnectionOpts, ns string, op func(js jetstream.JetStream) error) error {
// Build single use client
// TODO(future-feature): Use client-pool instead of single use client
Expand Down Expand Up @@ -284,3 +309,31 @@ func jsonString(v string) []byte {
func compareConfigState(actual any, desired any) string {
return cmp.Diff(actual, desired)
}

func getErrCode(err error) uint16 {
if apiErr, ok := err.(jsmapi.ApiError); ok {
return apiErr.NatsErrorCode()
}

return 0
}

func versionComponents(version string) (major, minor, patch int, err error) {
m := semVerRe.FindStringSubmatch(version)
if m == nil {
return 0, 0, 0, errors.New("invalid semver")
}
major, err = strconv.Atoi(m[1])
if err != nil {
return -1, -1, -1, err
}
minor, err = strconv.Atoi(m[2])
if err != nil {
return -1, -1, -1, err
}
patch, err = strconv.Atoi(m[3])
if err != nil {
return -1, -1, -1, err
}
return major, minor, patch, err
}
2 changes: 1 addition & 1 deletion internal/controller/keyvalue_controller_test.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
Copyright 2024.
Copyright 2025.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
Expand Down
2 changes: 1 addition & 1 deletion internal/controller/objectstore_controller_test.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
Copyright 2024.
Copyright 2025.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
Expand Down
Loading

0 comments on commit 347964e

Please sign in to comment.