From 0ad68989d36a28ce18205d35eff5702ec74d7a41 Mon Sep 17 00:00:00 2001 From: Byron Ruth Date: Tue, 31 Jan 2023 17:07:39 -0500 Subject: [PATCH 01/18] Add support for an embedded NATS server Signed-off-by: Byron Ruth --- go.mod | 9 ++- go.sum | 13 ++-- pkg/drivers/jetstream/jetstream.go | 96 +++++++++++++++++++++++------- 3 files changed, 89 insertions(+), 29 deletions(-) diff --git a/go.mod b/go.mod index 30bbc14f..1240d9ca 100644 --- a/go.mod +++ b/go.mod @@ -6,11 +6,13 @@ require ( github.com/Rican7/retry v0.1.0 github.com/canonical/go-dqlite v1.5.1 github.com/go-sql-driver/mysql v1.6.0 + github.com/jackc/pgerrcode v0.0.0-20220416144525-469b46aa5efa github.com/jackc/pgx/v5 v5.3.1 - github.com/klauspost/compress v1.14.4 + github.com/klauspost/compress v1.15.11 github.com/mattn/go-sqlite3 v1.14.15 github.com/nats-io/jsm.go v0.0.31-0.20220317133147-fe318f464eee - github.com/nats-io/nats.go v1.17.1-0.20220923204156-36d2b654c70f + github.com/nats-io/nats-server/v2 v2.9.11 + github.com/nats-io/nats.go v1.19.0 github.com/pkg/errors v0.9.1 github.com/prometheus/client_golang v1.14.0 github.com/rancher/wrangler v0.8.3 @@ -41,14 +43,15 @@ require ( github.com/grpc-ecosystem/go-grpc-middleware v1.3.0 // indirect github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0 // indirect github.com/grpc-ecosystem/grpc-gateway v1.16.0 // indirect - github.com/jackc/pgerrcode v0.0.0-20220416144525-469b46aa5efa // indirect github.com/jackc/pgpassfile v1.0.0 // indirect github.com/jackc/pgservicefile v0.0.0-20221227161230-091c0ba34f0a // indirect github.com/jonboulle/clockwork v0.2.2 // indirect github.com/json-iterator/go v1.1.12 // indirect github.com/matttproud/golang_protobuf_extensions v1.0.1 // indirect + github.com/minio/highwayhash v1.0.2 // indirect github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect github.com/modern-go/reflect2 v1.0.2 // indirect + github.com/nats-io/jwt/v2 v2.3.0 // indirect github.com/nats-io/nkeys v0.3.0 // indirect github.com/nats-io/nuid v1.0.1 // indirect github.com/prometheus/client_model v0.3.0 // indirect diff --git a/go.sum b/go.sum index bfb9d8e8..61fa73af 100644 --- a/go.sum +++ b/go.sum @@ -391,8 +391,9 @@ github.com/kisielk/errcheck v1.1.0/go.mod h1:EZBBE59ingxPouuu3KfxchcWSUPOHkagtvW github.com/kisielk/errcheck v1.2.0/go.mod h1:/BMXB+zMLi60iA8Vv6Ksmxu/1UDYcXs4uQLJ+jE2L00= github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8= github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= -github.com/klauspost/compress v1.14.4 h1:eijASRJcobkVtSt81Olfh7JX43osYLwy5krOJo6YEu4= github.com/klauspost/compress v1.14.4/go.mod h1:/3/Vjq9QcHkK5uEr5lBEmyoZ1iFhe47etQ6QUkpK6sk= +github.com/klauspost/compress v1.15.11 h1:Lcadnb3RKGin4FYM/orgq0qde+nc15E5Cbqg4B9Sx9c= +github.com/klauspost/compress v1.15.11/go.mod h1:QPwzmACJjUTFsnSHH934V6woptycfrDDJnH7hvFVbGM= github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ= github.com/konsorten/go-windows-terminal-sequences v1.0.2/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ= github.com/konsorten/go-windows-terminal-sequences v1.0.3/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ= @@ -451,13 +452,15 @@ github.com/mwitkow/go-conntrack v0.0.0-20190716064945-2f068394615f/go.mod h1:qRW github.com/mxk/go-flowrate v0.0.0-20140419014527-cca7078d478f/go.mod h1:ZdcZmHo+o7JKHSa8/e818NopupXU1YMK5fe1lsApnBw= github.com/nats-io/jsm.go v0.0.31-0.20220317133147-fe318f464eee h1:+l6i7zS8N1LOokm7dzShezI9STRGrzp0O49Pw8Jetdk= github.com/nats-io/jsm.go v0.0.31-0.20220317133147-fe318f464eee/go.mod h1:EKSYvbvWAoh0hIfuZ+ieWm8u0VOTRTeDfuQvNPKRqEg= -github.com/nats-io/jwt/v2 v2.2.1-0.20220113022732-58e87895b296 h1:vU9tpM3apjYlLLeY23zRWJ9Zktr5jp+mloR942LEOpY= github.com/nats-io/jwt/v2 v2.2.1-0.20220113022732-58e87895b296/go.mod h1:0tqz9Hlu6bCBFLWAASKhE5vUA4c24L9KPUUgvwumE/k= -github.com/nats-io/nats-server/v2 v2.7.5-0.20220309212130-5c0d1999ff72 h1:Moe/K4fo/5FCNpE/TYrMt7sEPUuldBVJ0D4g/SWFkd0= +github.com/nats-io/jwt/v2 v2.3.0 h1:z2mA1a7tIf5ShggOFlR1oBPgd6hGqcDYsISxZByUzdI= +github.com/nats-io/jwt/v2 v2.3.0/go.mod h1:0tqz9Hlu6bCBFLWAASKhE5vUA4c24L9KPUUgvwumE/k= github.com/nats-io/nats-server/v2 v2.7.5-0.20220309212130-5c0d1999ff72/go.mod h1:1vZ2Nijh8tcyNe8BDVyTviCd9NYzRbubQYiEHsvOQWc= +github.com/nats-io/nats-server/v2 v2.9.11 h1:4y5SwWvWI59V5mcqtuoqKq6L9NDUydOP3Ekwuwl8cZI= +github.com/nats-io/nats-server/v2 v2.9.11/go.mod h1:b0oVuxSlkvS3ZjMkncFeACGyZohbO4XhSqW1Lt7iRRY= github.com/nats-io/nats.go v1.13.1-0.20220308171302-2f2f6968e98d/go.mod h1:BPko4oXsySz4aSWeFgOHLZs3G4Jq4ZAyE6/zMCxRT6w= -github.com/nats-io/nats.go v1.17.1-0.20220923204156-36d2b654c70f h1:HPCi1mdDweg4Kwpp9Ej5zH0qylyo7vJ7aHPY6YjTNv4= -github.com/nats-io/nats.go v1.17.1-0.20220923204156-36d2b654c70f/go.mod h1:BPko4oXsySz4aSWeFgOHLZs3G4Jq4ZAyE6/zMCxRT6w= +github.com/nats-io/nats.go v1.19.0 h1:H6j8aBnTQFoVrTGB6Xjd903UMdE7jz6DS4YkmAqgZ9Q= +github.com/nats-io/nats.go v1.19.0/go.mod h1:tLqubohF7t4z3du1QDPYJIQQyhb4wl6DhjxEajSI7UA= github.com/nats-io/nkeys v0.3.0 h1:cgM5tL53EvYRU+2YLXIK0G2mJtK12Ft9oeooSZMA2G8= github.com/nats-io/nkeys v0.3.0/go.mod h1:gvUNGjVcM2IPr5rCsRsC6Wb3Hr2CQAm08dsxtV6A5y4= github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw= diff --git a/pkg/drivers/jetstream/jetstream.go b/pkg/drivers/jetstream/jetstream.go index 960c07f6..f4740e54 100644 --- a/pkg/drivers/jetstream/jetstream.go +++ b/pkg/drivers/jetstream/jetstream.go @@ -5,6 +5,8 @@ import ( "encoding/json" "fmt" "net/url" + "os" + "os/signal" "regexp" "sort" "strconv" @@ -16,6 +18,7 @@ import ( "github.com/k3s-io/kine/pkg/server" "github.com/k3s-io/kine/pkg/tls" "github.com/nats-io/jsm.go/natscontext" + natsserver "github.com/nats-io/nats-server/v2/server" "github.com/nats-io/nats.go" "github.com/sirupsen/logrus" ) @@ -31,11 +34,15 @@ var ( ) type Config struct { - natsURL string - options []nats.Option - revHistory uint8 - bucket string - slowMethod time.Duration + clientURL string + clientOptions []nats.Option + revHistory uint8 + bucket string + slowMethod time.Duration + // If true, a server will be embedded and started. + embedServer bool + // Path to a server configuration file when embedded. + serverConfig string } type JetStream struct { @@ -46,6 +53,7 @@ type JetStream struct { jetStream nats.JetStreamContext slowMethod time.Duration server.Backend + natsConn *nats.Conn } type JSValue struct { @@ -100,20 +108,63 @@ func New(ctx context.Context, connection string, tlsInfo tls.Config) (server.Bac return nil, err } + // Run an embedded server. + var ns *natsserver.Server + if config.embedServer { + opts := &natsserver.Options{} + if config.serverConfig == "" { + // TODO: Other defaults for easy single node config? + opts.JetStream = true + } else { + // Parse the server config file as options + opts, err = natsserver.ProcessConfigFile(config.serverConfig) + if err != nil { + return nil, fmt.Errorf("failed to process NATS server config file: %w", err) + } + } + + // Create the server, ensure the options are all valid. + ns, err = natsserver.NewServer(opts) + if err != nil { + return nil, fmt.Errorf("failed to create embedded NATS server: %w", err) + } + // Start the server. + go ns.Start() + logrus.Infof("using an embedded NATS server") + + // Wait for the server to be ready. + // TODO: limit the number of retries? + for { + if ns.ReadyForConnections(5 * time.Second) { + break + } + } + + // TODO: No method on backend.Driver exists to indicate a shutdown. + sigch := make(chan os.Signal, 1) + signal.Notify(sigch, os.Interrupt) + go func() { + <-sigch + ns.Shutdown() + }() + + // Use the local server's client URL. + config.clientURL = ns.ClientURL() + } + logrus.Infof("using bucket: %s", config.bucket) - logrus.Infof("connecting to %s", config.natsURL) + logrus.Infof("connecting to %s", config.clientURL) - nopts := append(config.options, nats.Name("k3s-server using bucket: "+config.bucket)) + nopts := append(config.clientOptions, nats.Name("k3s-server using bucket: "+config.bucket)) - conn, err := nats.Connect(config.natsURL, nopts...) + conn, err := nats.Connect(config.clientURL, nopts...) if err != nil { - return nil, err + return nil, fmt.Errorf("failed to connect to NATS server: %w", err) } js, err := conn.JetStream() - if err != nil { - return nil, err + return nil, fmt.Errorf("failed to get JetStream context: %w", err) } bucket, err := js.KeyValue(config.bucket) @@ -144,15 +195,13 @@ func New(ctx context.Context, connection string, tlsInfo tls.Config) (server.Bac // parseNatsConnection returns nats connection url, bucketName and []nats.Option, error func parseNatsConnection(dsn string, tlsInfo tls.Config) (*Config, error) { - jsConfig := &Config{ slowMethod: defaultSlowMethod, revHistory: defaultRevHistory, } connections := strings.Split(dsn, ",") jsConfig.bucket = defaultBucket - - jsConfig.options = make([]nats.Option, 0) + jsConfig.clientOptions = make([]nats.Option, 0) u, err := url.Parse(connections[0]) if err != nil { @@ -192,11 +241,11 @@ func parseNatsConnection(dsn string, tlsInfo tls.Config) (*Config, error) { } if tlsInfo.KeyFile != "" && tlsInfo.CertFile != "" { - jsConfig.options = append(jsConfig.options, nats.ClientCert(tlsInfo.CertFile, tlsInfo.KeyFile)) + jsConfig.clientOptions = append(jsConfig.clientOptions, nats.ClientCert(tlsInfo.CertFile, tlsInfo.KeyFile)) } if tlsInfo.CAFile != "" { - jsConfig.options = append(jsConfig.options, nats.RootCAs(tlsInfo.CAFile)) + jsConfig.clientOptions = append(jsConfig.clientOptions, nats.RootCAs(tlsInfo.CAFile)) } if hasContext { @@ -212,11 +261,11 @@ func parseNatsConnection(dsn string, tlsInfo tls.Config) (*Config, error) { // command line options provided to kine will override the file // https://github.com/nats-io/jsm.go/blob/v0.0.29/natscontext/context.go#L257 // allows for user, creds, nke, token, certifcate, ca, inboxprefix from the context.json - natsClientOpts, err := natsContext.NATSOptions(jsConfig.options...) + natsClientOpts, err := natsContext.NATSOptions(jsConfig.clientOptions...) if err != nil { return nil, err } - jsConfig.options = natsClientOpts + jsConfig.clientOptions = natsClientOpts } connBuilder := strings.Builder{} @@ -239,14 +288,19 @@ func parseNatsConnection(dsn string, tlsInfo tls.Config) (*Config, error) { if u.User != nil && idx == 0 { userInfo := strings.Split(u.User.String(), ":") if len(userInfo) > 1 { - jsConfig.options = append(jsConfig.options, nats.UserInfo(userInfo[0], userInfo[1])) + jsConfig.clientOptions = append(jsConfig.clientOptions, nats.UserInfo(userInfo[0], userInfo[1])) } else { - jsConfig.options = append(jsConfig.options, nats.Token(userInfo[0])) + jsConfig.clientOptions = append(jsConfig.clientOptions, nats.Token(userInfo[0])) } } connBuilder.WriteString(u.Host) } - jsConfig.natsURL = connBuilder.String() + jsConfig.clientURL = connBuilder.String() + + if queryMap.Has("embedServer") { + jsConfig.embedServer = true + jsConfig.serverConfig = queryMap.Get("serverConfig") + } logrus.Infof("using config %v", jsConfig) From 7c11c5b1a8b59500cebe57dfac97495fecb02760 Mon Sep 17 00:00:00 2001 From: Byron Ruth Date: Tue, 31 Jan 2023 21:30:55 -0500 Subject: [PATCH 02/18] Put embedded server behind build flag Rename various identifiers to NATS rather than JetStream, which is a subsystem of NATS. Signed-off-by: Byron Ruth --- .../{jetstream => nats}/kv/etcd_encoder.go | 0 pkg/drivers/{jetstream => nats}/kv/kv.go | 0 .../{jetstream/jetstream.go => nats/nats.go} | 23 ++++----------- pkg/drivers/nats/server/interface.go | 10 +++++++ pkg/drivers/nats/server/no_server.go | 12 ++++++++ pkg/drivers/nats/server/server.go | 28 +++++++++++++++++++ pkg/endpoint/endpoint.go | 22 +++++++-------- scripts/test | 4 +-- scripts/{test-run-jetstream => test-run-nats} | 3 +- 9 files changed, 70 insertions(+), 32 deletions(-) rename pkg/drivers/{jetstream => nats}/kv/etcd_encoder.go (100%) rename pkg/drivers/{jetstream => nats}/kv/kv.go (100%) rename pkg/drivers/{jetstream/jetstream.go => nats/nats.go} (97%) create mode 100644 pkg/drivers/nats/server/interface.go create mode 100644 pkg/drivers/nats/server/no_server.go create mode 100644 pkg/drivers/nats/server/server.go rename scripts/{test-run-jetstream => test-run-nats} (96%) diff --git a/pkg/drivers/jetstream/kv/etcd_encoder.go b/pkg/drivers/nats/kv/etcd_encoder.go similarity index 100% rename from pkg/drivers/jetstream/kv/etcd_encoder.go rename to pkg/drivers/nats/kv/etcd_encoder.go diff --git a/pkg/drivers/jetstream/kv/kv.go b/pkg/drivers/nats/kv/kv.go similarity index 100% rename from pkg/drivers/jetstream/kv/kv.go rename to pkg/drivers/nats/kv/kv.go diff --git a/pkg/drivers/jetstream/jetstream.go b/pkg/drivers/nats/nats.go similarity index 97% rename from pkg/drivers/jetstream/jetstream.go rename to pkg/drivers/nats/nats.go index f4740e54..84673716 100644 --- a/pkg/drivers/jetstream/jetstream.go +++ b/pkg/drivers/nats/nats.go @@ -1,4 +1,4 @@ -package jetstream +package nats import ( "context" @@ -14,11 +14,11 @@ import ( "sync" "time" - "github.com/k3s-io/kine/pkg/drivers/jetstream/kv" + "github.com/k3s-io/kine/pkg/drivers/nats/kv" + natsserver "github.com/k3s-io/kine/pkg/drivers/nats/server" "github.com/k3s-io/kine/pkg/server" "github.com/k3s-io/kine/pkg/tls" "github.com/nats-io/jsm.go/natscontext" - natsserver "github.com/nats-io/nats-server/v2/server" "github.com/nats-io/nats.go" "github.com/sirupsen/logrus" ) @@ -109,22 +109,9 @@ func New(ctx context.Context, connection string, tlsInfo tls.Config) (server.Bac } // Run an embedded server. - var ns *natsserver.Server + var ns natsserver.Server if config.embedServer { - opts := &natsserver.Options{} - if config.serverConfig == "" { - // TODO: Other defaults for easy single node config? - opts.JetStream = true - } else { - // Parse the server config file as options - opts, err = natsserver.ProcessConfigFile(config.serverConfig) - if err != nil { - return nil, fmt.Errorf("failed to process NATS server config file: %w", err) - } - } - - // Create the server, ensure the options are all valid. - ns, err = natsserver.NewServer(opts) + ns, err = natsserver.New(config.serverConfig) if err != nil { return nil, fmt.Errorf("failed to create embedded NATS server: %w", err) } diff --git a/pkg/drivers/nats/server/interface.go b/pkg/drivers/nats/server/interface.go new file mode 100644 index 00000000..e97ebe2d --- /dev/null +++ b/pkg/drivers/nats/server/interface.go @@ -0,0 +1,10 @@ +package server + +import "time" + +type Server interface { + Start() + Shutdown() + ClientURL() string + ReadyForConnections(wait time.Duration) bool +} diff --git a/pkg/drivers/nats/server/no_server.go b/pkg/drivers/nats/server/no_server.go new file mode 100644 index 00000000..be5faf54 --- /dev/null +++ b/pkg/drivers/nats/server/no_server.go @@ -0,0 +1,12 @@ +//go:build !nats +// +build !nats + +package server + +import ( + "errors" +) + +func New(configFile string) (Server, error) { + return nil, errors.New(`this binary is built without embedded nats support, compile with "-tags nats"`) +} diff --git a/pkg/drivers/nats/server/server.go b/pkg/drivers/nats/server/server.go new file mode 100644 index 00000000..4ff4a093 --- /dev/null +++ b/pkg/drivers/nats/server/server.go @@ -0,0 +1,28 @@ +//go:build nats +// +build nats + +package server + +import ( + "fmt" + + "github.com/nats-io/nats-server/v2/server" +) + +func New(configFile string) (Server, error) { + opts := &server.Options{} + + if configFile == "" { + // TODO: Other defaults for easy single node config? + opts.JetStream = true + } else { + // Parse the server config file as options + var err error + opts, err = server.ProcessConfigFile(configFile) + if err != nil { + return nil, fmt.Errorf("failed to process NATS server config file: %w", err) + } + } + + return server.NewServer(opts) +} diff --git a/pkg/endpoint/endpoint.go b/pkg/endpoint/endpoint.go index 189a9cad..eb0e85dc 100644 --- a/pkg/endpoint/endpoint.go +++ b/pkg/endpoint/endpoint.go @@ -9,8 +9,8 @@ import ( "github.com/k3s-io/kine/pkg/drivers/dqlite" "github.com/k3s-io/kine/pkg/drivers/generic" - "github.com/k3s-io/kine/pkg/drivers/jetstream" "github.com/k3s-io/kine/pkg/drivers/mysql" + "github.com/k3s-io/kine/pkg/drivers/nats" "github.com/k3s-io/kine/pkg/drivers/pgsql" "github.com/k3s-io/kine/pkg/drivers/sqlite" "github.com/k3s-io/kine/pkg/metrics" @@ -27,13 +27,13 @@ import ( ) const ( - KineSocket = "unix://kine.sock" - SQLiteBackend = "sqlite" - DQLiteBackend = "dqlite" - ETCDBackend = "etcd3" - JetStreamBackend = "jetstream" - MySQLBackend = "mysql" - PostgresBackend = "postgres" + KineSocket = "unix://kine.sock" + SQLiteBackend = "sqlite" + DQLiteBackend = "dqlite" + ETCDBackend = "etcd3" + NATSBackend = "nats" + MySQLBackend = "mysql" + PostgresBackend = "postgres" ) type Config struct { @@ -246,8 +246,8 @@ func getKineStorageBackend(ctx context.Context, driver, dsn string, cfg Config) backend, err = pgsql.New(ctx, dsn, cfg.BackendTLSConfig, cfg.ConnectionPoolConfig, cfg.MetricsRegisterer) case MySQLBackend: backend, err = mysql.New(ctx, dsn, cfg.BackendTLSConfig, cfg.ConnectionPoolConfig, cfg.MetricsRegisterer) - case JetStreamBackend: - backend, err = jetstream.New(ctx, dsn, cfg.BackendTLSConfig) + case NATSBackend: + backend, err = nats.New(ctx, dsn, cfg.BackendTLSConfig) default: return false, nil, fmt.Errorf("storage backend is not defined") } @@ -262,7 +262,7 @@ func ParseStorageEndpoint(storageEndpoint string) (string, string) { case "": return SQLiteBackend, "" case "nats": - return JetStreamBackend, storageEndpoint + return NATSBackend, storageEndpoint case "http": fallthrough case "https": diff --git a/scripts/test b/scripts/test index f1c0a4de..3f2d3384 100755 --- a/scripts/test +++ b/scripts/test @@ -26,7 +26,7 @@ echo "Did test-run-postgres $?" . ./scripts/test-run-cockroachdb echo "Did test-run-cockroachdb $?" -. ./scripts/test-run-jetstream -echo "Did test-jetstream $?" +. ./scripts/test-run-nats +echo "Did test-nats$?" exit 0 diff --git a/scripts/test-run-jetstream b/scripts/test-run-nats similarity index 96% rename from scripts/test-run-jetstream rename to scripts/test-run-nats index faa188ff..d63c191d 100755 --- a/scripts/test-run-jetstream +++ b/scripts/test-run-nats @@ -16,7 +16,8 @@ export -f start-test # test minimum supported version and most recent version of NATS VERSION_LIST="\ nats 2.7.4 - nats 2.9.1" + nats 2.8.4 + nats 2.9.11" while read ENGINE VERSION; do LABEL=$ENGINE-$VERSION DB_PASSWORD_ENV=NATS_JS_PASSWORD DB_ARGS="-js" DB_IMAGE=docker.io/library/$ENGINE:$VERSION run-test From a87bda42e9604a49a52180fa66710db04a3fcbd9 Mon Sep 17 00:00:00 2001 From: Byron Ruth Date: Wed, 1 Feb 2023 11:01:27 -0500 Subject: [PATCH 03/18] Refactoring Signed-off-by: Byron Ruth --- pkg/drivers/nats/nats.go | 506 ++++++++++++++------------- pkg/drivers/nats/server/no_server.go | 4 +- pkg/drivers/nats/server/server.go | 11 +- 3 files changed, 268 insertions(+), 253 deletions(-) diff --git a/pkg/drivers/nats/nats.go b/pkg/drivers/nats/nats.go index 84673716..3d8606c5 100644 --- a/pkg/drivers/nats/nats.go +++ b/pkg/drivers/nats/nats.go @@ -25,6 +25,7 @@ import ( const ( defaultBucket = "kine" + defaultReplicas = 1 defaultRevHistory = 10 defaultSlowMethod = 500 * time.Millisecond ) @@ -34,26 +35,68 @@ var ( ) type Config struct { - clientURL string + // Client URL which could be a list of comma separated URLs. + clientURL string + // Client connection options. clientOptions []nats.Option - revHistory uint8 - bucket string - slowMethod time.Duration + // Number of revisions to keep in history. Defaults to 10. + revHistory uint8 + // Name of the bucket. Defaults to "kine". + bucket string + // Number of replicas for the bucket. Defaults to 1 + replicas int + // Indicates the duration of a method before it is considered slow. Defaults to 500ms. + slowThreshold time.Duration // If true, a server will be embedded and started. embedServer bool // Path to a server configuration file when embedded. serverConfig string + // If true, the embedded server will log to stdout. + stdoutLogging bool } -type JetStream struct { - kvBucket nats.KeyValue - kvBucketMutex *sync.RWMutex - kvDirectoryMutex *sync.RWMutex - kvDirectoryMuxes map[string]*sync.RWMutex - jetStream nats.JetStreamContext - slowMethod time.Duration - server.Backend - natsConn *nats.Conn +type Driver struct { + nc *nats.Conn + js nats.JetStreamContext + kv nats.KeyValue + + dirMu *sync.RWMutex + subMus map[string]*sync.RWMutex + + slowThreshold time.Duration +} + +func (d *Driver) logMethod(dur time.Duration, str string, args ...any) { + if dur > d.slowThreshold { + logrus.Warnf(str, args...) + } else { + logrus.Tracef(str, args...) + } +} + +func getTopLevelKey(key string) string { + if toplevelKeyMatch.MatchString(key) { + matches := toplevelKeyMatch.FindStringSubmatch(key) + return matches[1] + } + return "" +} + +func (d *Driver) lockFolder(key string) (unlock func()) { + lockFolder := getTopLevelKey(key) + if lockFolder == "" { + return func() {} + } + + d.dirMu.Lock() + mu, ok := d.subMus[lockFolder] + if !ok { + mu = &sync.RWMutex{} + d.subMus[lockFolder] = mu + } + d.dirMu.Unlock() + mu.Lock() + return mu.Unlock } type JSValue struct { @@ -63,45 +106,45 @@ type JSValue struct { Delete bool } -// New get the JetStream Backend, establish connection to NATS JetStream. At the moment nats.go does not have -// connection string support so kine will use: -// nats://(token|username:password)hostname:port?bucket=bucketName&contextFile=nats-context&slowMethod=&revHistory=`. +// New return an implementation of server.Backend using NATS + JetStream. +// Various connection formats are supported using the nats:// scheme +// and having the following format: // -// If contextFile is provided then do not provide a hostname:port in the endpoint URL, instead use the context file to -// provide the NATS server url(s). +// nats://@:?` // -// bucket: specifies the bucket on the nats server for the k8s key/values for this cluster (optional) -// contextFile: specifies the nats context file to load e.g. /etc/nats/context.json -// revHistory: controls the rev history for JetStream defaults to 10 must be > 2 and <= 64 -// slowMethod: used to log methods slower than provided duration default 500ms +// - auth - optional and can be user/pass or token +// - hostname:port - specifies the NATS server address +// - params - optional query parameters // -// Multiple urls can be passed in a comma separated format - only the first in the list will be evaluated for query -// parameters. While auth is valid in the url, the preferred way to pass auth is through a context file. If user/pass or -// token are provided in the url only the first one will be used for all urls. -/// -// If no bucket query parameter is provided it will default to kine +// The following query parameters are supported: // -// https://docs.nats.io/using-nats/nats-tools/nats_cli#configuration-contexts +// - bucket - specifies the name of the NATS key-value bucket. Default is "kine" +// - replicas - specifies the number of replicas for the bucket. Default is 1 +// - revHistory - specifies the number of revisions to keep in history. Default is 10 +// - slowMethod - specifies the duration of a method before it is considered slow. Default is 500ms +// - contextFile - specifies the path to a NATS context file +// - embedServer - specifies whether to embed a NATS server. Default is false. +// - serverConfig - specifies the path to a NATS server configuration file if embedServer is true. +// - stdoutLogging - specifies whether to log to stdout if embedServer is true. Default is false. // -// example nats-context.json: -/* -{ - "description": "optional context description", - "url": "nats://127.0.0.1:4222", - "token": "", - "user": "", - "password": "", - "creds": "", - "nkey": "", - "cert": "", - "key": "", - "ca": "", - "nsc": "", - "jetstream_domain": "", - "jetstream_api_prefix": "", - "jetstream_event_prefix": "" -} -*/ +// If contextFile is provided then hostname:port should not be provided. +// +// Multiple URLs can be passed in a comma separated format, however only the first URL +// in the list will be evaluated for query parameters. While auth is valid in the URL, the +// recommended way to pass auth is through a context file. If user/pass or token are provided +// in the URL only the first one will be used for all URLs. +// +// See https://docs.nats.io/using-nats/nats-tools/nats_cli#configuration-contexts for more information +// on configuring a NATS context file. +// +// Examples: +// +// - nats://localhost:4222?replicas=3 +// - nats://?contextFile=/path/to/context.json +// - nats://user:pass@localhost:4222 +// - nats://token@localhost:4222 +// - nats://?embedServer=true +// - nats://?embedServer=true&serverConfig=/path/to/server.conf func New(ctx context.Context, connection string, tlsInfo tls.Config) (server.Backend, error) { config, err := parseNatsConnection(connection, tlsInfo) if err != nil { @@ -109,15 +152,16 @@ func New(ctx context.Context, connection string, tlsInfo tls.Config) (server.Bac } // Run an embedded server. - var ns natsserver.Server if config.embedServer { - ns, err = natsserver.New(config.serverConfig) + logrus.Infof("using an embedded NATS server") + + ns, err := natsserver.New(config.serverConfig, config.stdoutLogging) if err != nil { return nil, fmt.Errorf("failed to create embedded NATS server: %w", err) } // Start the server. go ns.Start() - logrus.Infof("using an embedded NATS server") + logrus.Infof("started embedded NATS server") // Wait for the server to be ready. // TODO: limit the number of retries? @@ -133,6 +177,7 @@ func New(ctx context.Context, connection string, tlsInfo tls.Config) (server.Bac go func() { <-sigch ns.Shutdown() + logrus.Infof("embedded NATS server shutdown") }() // Use the local server's client URL. @@ -161,6 +206,7 @@ func New(ctx context.Context, connection string, tlsInfo tls.Config) (server.Bac Bucket: config.bucket, Description: "Holds kine key/values", History: config.revHistory, + Replicas: config.replicas, }) } @@ -170,75 +216,86 @@ func New(ctx context.Context, connection string, tlsInfo tls.Config) (server.Bac return nil, err } - return &JetStream{ - kvBucket: kvB, - kvBucketMutex: &sync.RWMutex{}, - kvDirectoryMutex: &sync.RWMutex{}, - kvDirectoryMuxes: make(map[string]*sync.RWMutex), - jetStream: js, - slowMethod: config.slowMethod, + return &Driver{ + kv: kvB, + dirMu: &sync.RWMutex{}, + subMus: make(map[string]*sync.RWMutex), + js: js, + slowThreshold: config.slowThreshold, }, nil } // parseNatsConnection returns nats connection url, bucketName and []nats.Option, error func parseNatsConnection(dsn string, tlsInfo tls.Config) (*Config, error) { - jsConfig := &Config{ - slowMethod: defaultSlowMethod, - revHistory: defaultRevHistory, + config := &Config{ + slowThreshold: defaultSlowMethod, + revHistory: defaultRevHistory, + bucket: defaultBucket, + replicas: defaultReplicas, } - connections := strings.Split(dsn, ",") - jsConfig.bucket = defaultBucket - jsConfig.clientOptions = make([]nats.Option, 0) + // Parse the first URL in the connection string which contains the + // query parameters. + connections := strings.Split(dsn, ",") u, err := url.Parse(connections[0]) if err != nil { return nil, err } + // Extract the query parameters to build configuration. queryMap, err := url.ParseQuery(u.RawQuery) if err != nil { return nil, err } - if b, ok := queryMap["bucket"]; ok { - jsConfig.bucket = b[0] + if v := queryMap.Get("bucket"); v != "" { + config.bucket = v + } + + if v := queryMap.Get("replicas"); v != "" { + if r, err := strconv.ParseUint(v, 10, 8); err == nil { + if r >= 1 && r <= 5 { + config.replicas = int(r) + } else { + return nil, fmt.Errorf("invalid replicas, must be >= 1 and <= 5") + } + } } - if r, ok := queryMap["slowMethod"]; ok { - if dur, err := time.ParseDuration(r[0]); err == nil { - jsConfig.slowMethod = dur + if d := queryMap.Get("slowMethod"); d != "" { + if dur, err := time.ParseDuration(d); err == nil { + config.slowThreshold = dur } else { - return nil, err + return nil, fmt.Errorf("invalid slowMethod duration: %w", err) } } - if r, ok := queryMap["revHistory"]; ok { - if revs, err := strconv.ParseUint(r[0], 10, 8); err == nil { + if r := queryMap.Get("revHistory"); r != "" { + if revs, err := strconv.ParseUint(r, 10, 8); err == nil { if revs >= 2 && revs <= 64 { - jsConfig.revHistory = uint8(revs) + config.revHistory = uint8(revs) } else { - return nil, fmt.Errorf("invalid revHistory, must be => 2 and <= 64") + return nil, fmt.Errorf("invalid revHistory, must be >= 2 and <= 64") } } } - contextFile, hasContext := queryMap["contextFile"] - if hasContext && u.Host != "" { - return jsConfig, fmt.Errorf("when using context endpoint no host should be provided") - } - if tlsInfo.KeyFile != "" && tlsInfo.CertFile != "" { - jsConfig.clientOptions = append(jsConfig.clientOptions, nats.ClientCert(tlsInfo.CertFile, tlsInfo.KeyFile)) + config.clientOptions = append(config.clientOptions, nats.ClientCert(tlsInfo.CertFile, tlsInfo.KeyFile)) } if tlsInfo.CAFile != "" { - jsConfig.clientOptions = append(jsConfig.clientOptions, nats.RootCAs(tlsInfo.CAFile)) + config.clientOptions = append(config.clientOptions, nats.RootCAs(tlsInfo.CAFile)) } - if hasContext { - logrus.Infof("loading nats contextFile=%s", contextFile[0]) + if f := queryMap.Get("contextFile"); f != "" { + if u.Host != "" { + return config, fmt.Errorf("when using context endpoint no host should be provided") + } - natsContext, err := natscontext.NewFromFile(contextFile[0]) + logrus.Debugf("loading nats context file: %s", f) + + natsContext, err := natscontext.NewFromFile(f) if err != nil { return nil, err } @@ -248,11 +305,11 @@ func parseNatsConnection(dsn string, tlsInfo tls.Config) (*Config, error) { // command line options provided to kine will override the file // https://github.com/nats-io/jsm.go/blob/v0.0.29/natscontext/context.go#L257 // allows for user, creds, nke, token, certifcate, ca, inboxprefix from the context.json - natsClientOpts, err := natsContext.NATSOptions(jsConfig.clientOptions...) + natsClientOpts, err := natsContext.NATSOptions(config.clientOptions...) if err != nil { return nil, err } - jsConfig.clientOptions = natsClientOpts + config.clientOptions = natsClientOpts } connBuilder := strings.Builder{} @@ -275,28 +332,31 @@ func parseNatsConnection(dsn string, tlsInfo tls.Config) (*Config, error) { if u.User != nil && idx == 0 { userInfo := strings.Split(u.User.String(), ":") if len(userInfo) > 1 { - jsConfig.clientOptions = append(jsConfig.clientOptions, nats.UserInfo(userInfo[0], userInfo[1])) + config.clientOptions = append(config.clientOptions, nats.UserInfo(userInfo[0], userInfo[1])) } else { - jsConfig.clientOptions = append(jsConfig.clientOptions, nats.Token(userInfo[0])) + config.clientOptions = append(config.clientOptions, nats.Token(userInfo[0])) } } connBuilder.WriteString(u.Host) } - jsConfig.clientURL = connBuilder.String() + config.clientURL = connBuilder.String() + + // If this option is set, consider the server-specific options. if queryMap.Has("embedServer") { - jsConfig.embedServer = true - jsConfig.serverConfig = queryMap.Get("serverConfig") + config.embedServer = true + config.serverConfig = queryMap.Get("serverConfig") + config.stdoutLogging = queryMap.Has("stdoutLogging") } - logrus.Infof("using config %v", jsConfig) + logrus.Debugf("using config %#v", config) - return jsConfig, nil + return config, nil } -func (j *JetStream) Start(ctx context.Context) error { +func (d *Driver) Start(ctx context.Context) error { // See https://github.com/kubernetes/kubernetes/blob/442a69c3bdf6fe8e525b05887e57d89db1e2f3a5/staging/src/k8s.io/apiserver/pkg/storage/storagebackend/factory/etcd3.go#L97 - if _, err := j.Create(ctx, "/registry/health", []byte(`{"health":"true"}`), 0); err != nil { + if _, err := d.Create(ctx, "/registry/health", []byte(`{"health":"true"}`), 0); err != nil { if err != server.ErrKeyExists { logrus.Errorf("Failed to create health check key: %v", err) } @@ -304,14 +364,14 @@ func (j *JetStream) Start(ctx context.Context) error { return nil } -func (j *JetStream) isKeyExpired(_ context.Context, createTime time.Time, value *JSValue) bool { +func (d *Driver) isKeyExpired(_ context.Context, createTime time.Time, value *JSValue) bool { requestTime := time.Now() expired := false if value.KV.Lease > 0 { if requestTime.After(createTime.Add(time.Second * time.Duration(value.KV.Lease))) { expired = true - if err := j.kvBucket.Delete(value.KV.Key); err != nil { + if err := d.kv.Delete(value.KV.Key); err != nil { logrus.Warnf("problem deleting expired key=%s, error=%v", value.KV.Key, err) } } @@ -321,51 +381,48 @@ func (j *JetStream) isKeyExpired(_ context.Context, createTime time.Time, value } // Get returns the associated server.KeyValue -func (j *JetStream) Get(ctx context.Context, key, rangeEnd string, limit, revision int64) (revRet int64, kvRet *server.KeyValue, errRet error) { - //logrus.Tracef("GET %s, rev=%d", key, revision) +func (d *Driver) Get(ctx context.Context, key, rangeEnd string, limit, revision int64) (revRet int64, kvRet *server.KeyValue, errRet error) { start := time.Now() defer func() { - duration := time.Duration(time.Now().Nanosecond() - start.Nanosecond()) + dur := time.Since(start) size := 0 if kvRet != nil { size = len(kvRet.Value) } fStr := "GET %s, rev=%d => revRet=%d, kv=%v, size=%d, err=%v, duration=%s" - if duration > j.slowMethod { - logrus.Warnf(fStr, key, revision, revRet, kvRet != nil, size, errRet, duration.String()) - } else { - logrus.Tracef(fStr, key, revision, revRet, kvRet != nil, size, errRet, duration.String()) - } + d.logMethod(dur, fStr, key, revision, revRet, kvRet != nil, size, errRet, dur) }() - currentRev, err := j.currentRevision() + currentRev, err := d.currentRevision() if err != nil { return currentRev, nil, err } - if rev, kv, err := j.get(ctx, key, revision, false); err == nil { + rev, kv, err := d.get(ctx, key, revision, false) + if err == nil { if kv == nil { return currentRev, nil, nil } return rev, kv.KV, nil - } else if err == nats.ErrKeyNotFound { + } + + if err == nats.ErrKeyNotFound { return currentRev, nil, nil - } else { - return rev, nil, err } -} -func (j *JetStream) get(ctx context.Context, key string, revision int64, includeDeletes bool) (int64, *JSValue, error) { + return rev, nil, err +} - compactRev, err := j.compactRevision() +func (d *Driver) get(ctx context.Context, key string, revision int64, includeDeletes bool) (int64, *JSValue, error) { + compactRev, err := d.compactRevision() if err != nil { return 0, nil, err } // Get latest revision if revision <= 0 { - if entry, err := j.kvBucket.Get(key); err == nil { - + entry, err := d.kv.Get(key) + if err == nil { val, err := decode(entry) if err != nil { return 0, nil, err @@ -375,67 +432,59 @@ func (j *JetStream) get(ctx context.Context, key string, revision int64, include return 0, nil, nats.ErrKeyNotFound } - if j.isKeyExpired(ctx, entry.Created(), &val) { + if d.isKeyExpired(ctx, entry.Created(), &val) { return 0, nil, nats.ErrKeyNotFound } return val.KV.ModRevision, &val, nil - } else if err == nats.ErrKeyNotFound { - return 0, nil, err - } else { - return 0, nil, err } - } else { - if revision < compactRev { - logrus.Warnf("requested revision that has been compacted") + if err == nats.ErrKeyNotFound { + return 0, nil, err } - if entry, err := j.kvBucket.GetRevision(key, uint64(revision)); err == nil { - val, err := decode(entry) - if err != nil { - return 0, nil, err - } + return 0, nil, err + } - if val.Delete && !includeDeletes { - return 0, nil, nats.ErrKeyNotFound - } + if revision < compactRev { + logrus.Warnf("requested revision has been compacted") + } - if j.isKeyExpired(ctx, entry.Created(), &val) { - return 0, nil, nats.ErrKeyNotFound - } - return val.KV.ModRevision, &val, nil - } else if err == nats.ErrKeyNotFound { - return 0, nil, err - } else { + entry, err := d.kv.GetRevision(key, uint64(revision)) + if err == nil { + val, err := decode(entry) + if err != nil { return 0, nil, err } + + if val.Delete && !includeDeletes { + return 0, nil, nats.ErrKeyNotFound + } + + if d.isKeyExpired(ctx, entry.Created(), &val) { + return 0, nil, nats.ErrKeyNotFound + } + return val.KV.ModRevision, &val, nil + } + + if err == nats.ErrKeyNotFound { + return 0, nil, err } + + return 0, nil, err } // Create -func (j *JetStream) Create(ctx context.Context, key string, value []byte, lease int64) (revRet int64, errRet error) { +func (d *Driver) Create(ctx context.Context, key string, value []byte, lease int64) (revRet int64, errRet error) { start := time.Now() defer func() { - duration := time.Duration(time.Now().Nanosecond() - start.Nanosecond()) + dur := time.Since(start) fStr := "CREATE %s, size=%d, lease=%d => rev=%d, err=%v, duration=%s" - if duration > j.slowMethod { - logrus.Warnf(fStr, key, len(value), lease, revRet, errRet, duration.String()) - } else { - logrus.Tracef(fStr, key, len(value), lease, revRet, errRet, duration.String()) - } + d.logMethod(dur, fStr, key, len(value), lease, revRet, errRet, dur) }() - lockFolder := getTopLevelKey(key) - if lockFolder != "" { - j.kvDirectoryMutex.Lock() - if _, ok := j.kvDirectoryMuxes[lockFolder]; !ok { - j.kvDirectoryMuxes[lockFolder] = &sync.RWMutex{} - } - j.kvDirectoryMutex.Unlock() - j.kvDirectoryMuxes[lockFolder].Lock() - defer j.kvDirectoryMuxes[lockFolder].Unlock() - } + // Lock the folder containing this key. + defer d.lockFolder(key)() // check if key exists already - rev, prevKV, err := j.get(ctx, key, 0, true) + rev, prevKV, err := d.get(ctx, key, 0, true) if err != nil && err != nats.ErrKeyNotFound { return 0, err } @@ -466,42 +515,31 @@ func (j *JetStream) Create(ctx context.Context, key string, value []byte, lease } if prevKV != nil { - seq, err := j.kvBucket.Put(key, event) + seq, err := d.kv.Put(key, event) if err != nil { return 0, err } return int64(seq), nil } - seq, err := j.kvBucket.Create(key, event) + seq, err := d.kv.Create(key, event) if err != nil { return 0, err } return int64(seq), nil } -func (j *JetStream) Delete(ctx context.Context, key string, revision int64) (revRet int64, kvRet *server.KeyValue, deletedRet bool, errRet error) { +func (d *Driver) Delete(ctx context.Context, key string, revision int64) (revRet int64, kvRet *server.KeyValue, deletedRet bool, errRet error) { start := time.Now() defer func() { - duration := time.Duration(time.Now().Nanosecond() - start.Nanosecond()) + dur := time.Since(start) fStr := "DELETE %s, rev=%d => rev=%d, kv=%v, deleted=%v, err=%v, duration=%s" - if duration > j.slowMethod { - logrus.Warnf(fStr, key, revision, revRet, kvRet != nil, deletedRet, errRet, duration.String()) - } else { - logrus.Tracef(fStr, key, revision, revRet, kvRet != nil, deletedRet, errRet, duration.String()) - } + d.logMethod(dur, fStr, key, revision, revRet, kvRet != nil, deletedRet, errRet, dur) }() - lockFolder := getTopLevelKey(key) - if lockFolder != "" { - j.kvDirectoryMutex.Lock() - if _, ok := j.kvDirectoryMuxes[lockFolder]; !ok { - j.kvDirectoryMuxes[lockFolder] = &sync.RWMutex{} - } - j.kvDirectoryMutex.Unlock() - j.kvDirectoryMuxes[lockFolder].Lock() - defer j.kvDirectoryMuxes[lockFolder].Unlock() - } - rev, value, err := j.get(ctx, key, 0, true) + // Lock the folder containing this key. + defer d.lockFolder(key)() + + rev, value, err := d.get(ctx, key, 0, true) if err != nil { if err == nats.ErrKeyNotFound { return rev, nil, true, nil @@ -531,12 +569,12 @@ func (j *JetStream) Delete(ctx context.Context, key string, revision int64) (rev return rev, nil, false, err } - deleteRev, err := j.kvBucket.Put(key, deleteEventBytes) + deleteRev, err := d.kv.Put(key, deleteEventBytes) if err != nil { return rev, value.KV, false, nil } - err = j.kvBucket.Delete(key) + err = d.kv.Delete(key) if err != nil { return rev, value.KV, false, nil } @@ -544,16 +582,12 @@ func (j *JetStream) Delete(ctx context.Context, key string, revision int64) (rev return int64(deleteRev), value.KV, true, nil } -func (j *JetStream) List(ctx context.Context, prefix, startKey string, limit, revision int64) (revRet int64, kvRet []*server.KeyValue, errRet error) { +func (d *Driver) List(ctx context.Context, prefix, startKey string, limit, revision int64) (revRet int64, kvRet []*server.KeyValue, errRet error) { start := time.Now() defer func() { - duration := time.Duration(time.Now().Nanosecond() - start.Nanosecond()) + dur := time.Since(start) fStr := "LIST %s, start=%s, limit=%d, rev=%d => rev=%d, kvs=%d, err=%v, duration=%s" - if duration > j.slowMethod { - logrus.Warnf(fStr, prefix, startKey, limit, revision, revRet, len(kvRet), errRet, duration.String()) - } else { - logrus.Tracef(fStr, prefix, startKey, limit, revision, revRet, len(kvRet), errRet, duration.String()) - } + d.logMethod(dur, fStr, prefix, startKey, limit, revision, revRet, len(kvRet), errRet, dur) }() // its assumed that when there is a start key that that key exists. @@ -563,7 +597,7 @@ func (j *JetStream) List(ctx context.Context, prefix, startKey string, limit, re } } - rev, err := j.currentRevision() + rev, err := d.currentRevision() if err != nil { return 0, nil, err } @@ -576,7 +610,7 @@ func (j *JetStream) List(ctx context.Context, prefix, startKey string, limit, re histories := make(map[string][]nats.KeyValueEntry) var minRev int64 //var innerEntry nats.KeyValueEntry - if entries, err := j.kvBucket.History(startKey, nats.Context(ctx)); err == nil { + if entries, err := d.kv.History(startKey, nats.Context(ctx)); err == nil { histories[startKey] = entries for i := len(entries) - 1; i >= 0; i-- { // find the matching startKey @@ -590,14 +624,14 @@ func (j *JetStream) List(ctx context.Context, prefix, startKey string, limit, re return 0, nil, err } - keys, err := j.getKeys(ctx, prefix, true) + keys, err := d.getKeys(ctx, prefix, true) if err != nil { return 0, nil, err } for _, key := range keys { if key != startKey { - if history, err := j.kvBucket.History(key, nats.Context(ctx)); err == nil { + if history, err := d.kv.History(key, nats.Context(ctx)); err == nil { histories[key] = history } else { // should not happen @@ -640,14 +674,14 @@ func (j *JetStream) List(ctx context.Context, prefix, startKey string, limit, re if current { - entries, err := j.getKeyValues(ctx, prefix, true) + entries, err := d.getKeyValues(ctx, prefix, true) if err != nil { return 0, nil, err } for _, e := range entries { if count < limit || limit == 0 { kv, err := decode(e) - if !j.isKeyExpired(ctx, e.Created(), &kv) && err == nil { + if !d.isKeyExpired(ctx, e.Created(), &kv) && err == nil { kvs = append(kvs, kv.KV) count++ } @@ -657,7 +691,7 @@ func (j *JetStream) List(ctx context.Context, prefix, startKey string, limit, re } } else { - keys, err := j.getKeys(ctx, prefix, true) + keys, err := d.getKeys(ctx, prefix, true) if err != nil { return 0, nil, err } @@ -667,7 +701,7 @@ func (j *JetStream) List(ctx context.Context, prefix, startKey string, limit, re for _, key := range keys { if count < limit || limit == 0 { - if history, err := j.kvBucket.History(key, nats.Context(ctx)); err == nil { + if history, err := d.kv.History(key, nats.Context(ctx)); err == nil { for i := len(history) - 1; i >= 0; i-- { if int64(history[i].Revision()) <= revision { if entry, err := decode(history[i]); err == nil { @@ -690,15 +724,15 @@ func (j *JetStream) List(ctx context.Context, prefix, startKey string, limit, re return rev, kvs, nil } -func (j *JetStream) listAfter(ctx context.Context, prefix string, revision int64) (revRet int64, eventRet []*server.Event, errRet error) { +func (d *Driver) listAfter(ctx context.Context, prefix string, revision int64) (revRet int64, eventRet []*server.Event, errRet error) { - entries, err := j.getKeyValues(ctx, prefix, false) + entries, err := d.getKeyValues(ctx, prefix, false) if err != nil { return 0, nil, err } - rev, err := j.currentRevision() + rev, err := d.currentRevision() if err != nil { return 0, nil, err } @@ -715,7 +749,7 @@ func (j *JetStream) listAfter(ctx context.Context, prefix string, revision int64 KV: kv.KV, PrevKV: &server.KeyValue{}, } - if _, prevKV, err := j.Get(ctx, kv.KV.Key, "", 1, kv.PrevRevision); err == nil && prevKV != nil { + if _, prevKV, err := d.Get(ctx, kv.KV.Key, "", 1, kv.PrevRevision); err == nil && prevKV != nil { event.PrevKV = prevKV } @@ -726,58 +760,42 @@ func (j *JetStream) listAfter(ctx context.Context, prefix string, revision int64 } // Count returns an exact count of the number of matching keys and the current revision of the database -func (j *JetStream) Count(ctx context.Context, prefix string) (revRet int64, count int64, err error) { +func (d *Driver) Count(ctx context.Context, prefix string) (revRet int64, count int64, err error) { start := time.Now() defer func() { - duration := time.Duration(time.Now().Nanosecond() - start.Nanosecond()) + dur := time.Since(start) fStr := "COUNT %s => rev=%d, count=%d, err=%v, duration=%s" - if duration > j.slowMethod { - logrus.Warnf(fStr, prefix, revRet, count, err, duration.String()) - } else { - logrus.Tracef(fStr, prefix, revRet, count, err, duration.String()) - } + d.logMethod(dur, fStr, prefix, revRet, count, err, dur) }() - entries, err := j.getKeys(ctx, prefix, false) + entries, err := d.getKeys(ctx, prefix, false) if err != nil { return 0, 0, err } // current revision - currentRev, err := j.currentRevision() + currentRev, err := d.currentRevision() if err != nil { return 0, 0, err } return currentRev, int64(len(entries)), nil } -func (j *JetStream) Update(ctx context.Context, key string, value []byte, revision, lease int64) (revRet int64, kvRet *server.KeyValue, updateRet bool, errRet error) { +func (d *Driver) Update(ctx context.Context, key string, value []byte, revision, lease int64) (revRet int64, kvRet *server.KeyValue, updateRet bool, errRet error) { start := time.Now() defer func() { - duration := time.Duration(time.Now().Nanosecond() - start.Nanosecond()) + dur := time.Since(start) kvRev := int64(0) if kvRet != nil { kvRev = kvRet.ModRevision } fStr := "UPDATE %s, value=%d, rev=%d, lease=%v => rev=%d, kvrev=%d, updated=%v, err=%v, duration=%s" - if duration > j.slowMethod { - logrus.Warnf(fStr, key, len(value), revision, lease, revRet, kvRev, updateRet, errRet, duration.String()) - } else { - logrus.Tracef(fStr, key, len(value), revision, lease, revRet, kvRev, updateRet, errRet, duration.String()) - } + d.logMethod(dur, fStr, key, len(value), revision, lease, revRet, kvRev, updateRet, errRet, dur) }() - lockFolder := getTopLevelKey(key) - if lockFolder != "" { - j.kvDirectoryMutex.Lock() - if _, ok := j.kvDirectoryMuxes[lockFolder]; !ok { - j.kvDirectoryMuxes[lockFolder] = &sync.RWMutex{} - } - j.kvDirectoryMutex.Unlock() - j.kvDirectoryMuxes[lockFolder].Lock() - defer j.kvDirectoryMuxes[lockFolder].Unlock() - } + // Lock the folder containing the key. + defer d.lockFolder(key)() - rev, prevKV, err := j.get(ctx, key, 0, false) + rev, prevKV, err := d.get(ctx, key, 0, false) if err != nil { if err == nats.ErrKeyNotFound { @@ -814,7 +832,7 @@ func (j *JetStream) Update(ctx context.Context, key string, value []byte, revisi return 0, nil, false, err } - seq, err := j.kvBucket.Put(key, valueBytes) + seq, err := d.kv.Put(key, valueBytes) if err != nil { return 0, nil, false, err } @@ -825,14 +843,14 @@ func (j *JetStream) Update(ctx context.Context, key string, value []byte, revisi } -func (j *JetStream) Watch(ctx context.Context, prefix string, revision int64) <-chan []*server.Event { +func (d *Driver) Watch(ctx context.Context, prefix string, revision int64) <-chan []*server.Event { - watcher, err := j.kvBucket.(*kv.EncodedKV).Watch(prefix, nats.IgnoreDeletes(), nats.Context(ctx)) + watcher, err := d.kv.(*kv.EncodedKV).Watch(prefix, nats.IgnoreDeletes(), nats.Context(ctx)) if revision > 0 { revision-- } - _, events, err := j.listAfter(ctx, prefix, revision) + _, events, err := d.listAfter(ctx, prefix, revision) if err != nil { logrus.Errorf("failed to create watcher %s for revision %d", prefix, revision) @@ -872,7 +890,7 @@ func (j *JetStream) Watch(ctx context.Context, prefix string, revision int64) <- if err != nil { logrus.Warnf("watch event: could not decode %s seq %d", i.Key(), i.Revision()) } - if _, prevEntry, prevErr := j.get(ctx, i.Key(), value.PrevRevision, false); prevErr == nil { + if _, prevEntry, prevErr := d.get(ctx, i.Key(), value.PrevRevision, false); prevErr == nil { if prevEntry != nil { prevValue = *prevEntry } @@ -906,7 +924,7 @@ func (j *JetStream) Watch(ctx context.Context, prefix string, revision int64) <- // getPreviousEntry returns the nats.KeyValueEntry previous to the one provided, if the previous entry is a nats.KeyValuePut // operation. If it is not a KeyValuePut then it will return nil. -func (j *JetStream) getPreviousEntry(ctx context.Context, entry nats.KeyValueEntry) (result *nats.KeyValueEntry, e error) { +func (d *Driver) getPreviousEntry(ctx context.Context, entry nats.KeyValueEntry) (result *nats.KeyValueEntry, e error) { defer func() { if result != nil { logrus.Debugf("getPreviousEntry %s:%d found=true %d", entry.Key(), entry.Revision(), (*result).Revision()) @@ -915,7 +933,7 @@ func (j *JetStream) getPreviousEntry(ctx context.Context, entry nats.KeyValueEnt } }() found := false - entries, err := j.kvBucket.History(entry.Key(), nats.Context(ctx)) + entries, err := d.kv.History(entry.Key(), nats.Context(ctx)) if err == nil { for idx := len(entries) - 1; idx >= 0; idx-- { if found { @@ -934,8 +952,8 @@ func (j *JetStream) getPreviousEntry(ctx context.Context, entry nats.KeyValueEnt } // DbSize get the kineBucket size from JetStream. -func (j *JetStream) DbSize(context.Context) (int64, error) { - status, err := j.kvBucket.Status() +func (d *Driver) DbSize(context.Context) (int64, error) { + status, err := d.kv.Status() if err != nil { return -1, err } @@ -962,16 +980,16 @@ func decode(e nats.KeyValueEntry) (JSValue, error) { return v, nil } -func (j *JetStream) currentRevision() (int64, error) { - status, err := j.kvBucket.Status() +func (d *Driver) currentRevision() (int64, error) { + status, err := d.kv.Status() if err != nil { return 0, err } return int64(status.(*nats.KeyValueBucketStatus).StreamInfo().State.LastSeq), nil } -func (j *JetStream) compactRevision() (int64, error) { - status, err := j.kvBucket.Status() +func (d *Driver) compactRevision() (int64, error) { + status, err := d.kv.Status() if err != nil { return 0, err } @@ -979,8 +997,8 @@ func (j *JetStream) compactRevision() (int64, error) { } // getKeyValues returns a []nats.KeyValueEntry matching prefix -func (j *JetStream) getKeyValues(ctx context.Context, prefix string, sortResults bool) ([]nats.KeyValueEntry, error) { - watcher, err := j.kvBucket.Watch(prefix, nats.IgnoreDeletes(), nats.Context(ctx)) +func (d *Driver) getKeyValues(ctx context.Context, prefix string, sortResults bool) ([]nats.KeyValueEntry, error) { + watcher, err := d.kv.Watch(prefix, nats.IgnoreDeletes(), nats.Context(ctx)) if err != nil { return nil, err } @@ -1009,8 +1027,8 @@ func (j *JetStream) getKeyValues(ctx context.Context, prefix string, sortResults } // getKeys returns a list of keys matching a prefix -func (j *JetStream) getKeys(ctx context.Context, prefix string, sortResults bool) ([]string, error) { - watcher, err := j.kvBucket.Watch(prefix, nats.MetaOnly(), nats.IgnoreDeletes(), nats.Context(ctx)) +func (d *Driver) getKeys(ctx context.Context, prefix string, sortResults bool) ([]string, error) { + watcher, err := d.kv.Watch(prefix, nats.MetaOnly(), nats.IgnoreDeletes(), nats.Context(ctx)) if err != nil { return nil, err } @@ -1036,11 +1054,3 @@ func (j *JetStream) getKeys(ctx context.Context, prefix string, sortResults bool return keys, nil } - -func getTopLevelKey(key string) string { - if toplevelKeyMatch.MatchString(key) { - matches := toplevelKeyMatch.FindStringSubmatch(key) - return matches[1] - } - return "" -} diff --git a/pkg/drivers/nats/server/no_server.go b/pkg/drivers/nats/server/no_server.go index be5faf54..60c8f44b 100644 --- a/pkg/drivers/nats/server/no_server.go +++ b/pkg/drivers/nats/server/no_server.go @@ -7,6 +7,6 @@ import ( "errors" ) -func New(configFile string) (Server, error) { - return nil, errors.New(`this binary is built without embedded nats support, compile with "-tags nats"`) +func New(configFile string, stdoutLogging bool) (Server, error) { + return nil, errors.New(`this binary is built without embedded NATS support, compile with "-tags nats"`) } diff --git a/pkg/drivers/nats/server/server.go b/pkg/drivers/nats/server/server.go index 4ff4a093..4942e9a9 100644 --- a/pkg/drivers/nats/server/server.go +++ b/pkg/drivers/nats/server/server.go @@ -9,14 +9,14 @@ import ( "github.com/nats-io/nats-server/v2/server" ) -func New(configFile string) (Server, error) { +func New(configFile string, stdoutLogging bool) (Server, error) { opts := &server.Options{} if configFile == "" { // TODO: Other defaults for easy single node config? opts.JetStream = true } else { - // Parse the server config file as options + // Parse the server config file as options. var err error opts, err = server.ProcessConfigFile(configFile) if err != nil { @@ -24,5 +24,10 @@ func New(configFile string) (Server, error) { } } - return server.NewServer(opts) + srv, err := server.NewServer(opts) + if stdoutLogging { + srv.ConfigureLogger() + } + + return srv, err } From c7c96ba1a2edcc258ae2062e30da8b169973c332 Mon Sep 17 00:00:00 2001 From: Byron Ruth Date: Wed, 1 Feb 2023 11:14:22 -0500 Subject: [PATCH 04/18] Remove locking Signed-off-by: Byron Ruth --- pkg/drivers/nats/nats.go | 32 -------------------------------- 1 file changed, 32 deletions(-) diff --git a/pkg/drivers/nats/nats.go b/pkg/drivers/nats/nats.go index 3d8606c5..7d449f4f 100644 --- a/pkg/drivers/nats/nats.go +++ b/pkg/drivers/nats/nats.go @@ -11,7 +11,6 @@ import ( "sort" "strconv" "strings" - "sync" "time" "github.com/k3s-io/kine/pkg/drivers/nats/kv" @@ -60,9 +59,6 @@ type Driver struct { js nats.JetStreamContext kv nats.KeyValue - dirMu *sync.RWMutex - subMus map[string]*sync.RWMutex - slowThreshold time.Duration } @@ -82,23 +78,6 @@ func getTopLevelKey(key string) string { return "" } -func (d *Driver) lockFolder(key string) (unlock func()) { - lockFolder := getTopLevelKey(key) - if lockFolder == "" { - return func() {} - } - - d.dirMu.Lock() - mu, ok := d.subMus[lockFolder] - if !ok { - mu = &sync.RWMutex{} - d.subMus[lockFolder] = mu - } - d.dirMu.Unlock() - mu.Lock() - return mu.Unlock -} - type JSValue struct { KV *server.KeyValue PrevRevision int64 @@ -218,8 +197,6 @@ func New(ctx context.Context, connection string, tlsInfo tls.Config) (server.Bac return &Driver{ kv: kvB, - dirMu: &sync.RWMutex{}, - subMus: make(map[string]*sync.RWMutex), js: js, slowThreshold: config.slowThreshold, }, nil @@ -480,9 +457,6 @@ func (d *Driver) Create(ctx context.Context, key string, value []byte, lease int d.logMethod(dur, fStr, key, len(value), lease, revRet, errRet, dur) }() - // Lock the folder containing this key. - defer d.lockFolder(key)() - // check if key exists already rev, prevKV, err := d.get(ctx, key, 0, true) if err != nil && err != nats.ErrKeyNotFound { @@ -536,9 +510,6 @@ func (d *Driver) Delete(ctx context.Context, key string, revision int64) (revRet d.logMethod(dur, fStr, key, revision, revRet, kvRet != nil, deletedRet, errRet, dur) }() - // Lock the folder containing this key. - defer d.lockFolder(key)() - rev, value, err := d.get(ctx, key, 0, true) if err != nil { if err == nats.ErrKeyNotFound { @@ -792,9 +763,6 @@ func (d *Driver) Update(ctx context.Context, key string, value []byte, revision, d.logMethod(dur, fStr, key, len(value), revision, lease, revRet, kvRev, updateRet, errRet, dur) }() - // Lock the folder containing the key. - defer d.lockFolder(key)() - rev, prevKV, err := d.get(ctx, key, 0, false) if err != nil { From 3e4e69a4bd7120c1610cd1ec349b1404ba417b9e Mon Sep 17 00:00:00 2001 From: Byron Ruth Date: Fri, 14 Apr 2023 12:46:57 -0400 Subject: [PATCH 05/18] Revert "Remove locking" This reverts commit e22b756b1fd4fe6aefc20365eceba08442465a6e. Signed-off-by: Byron Ruth --- pkg/drivers/nats/nats.go | 32 ++++++++++++++++++++++++++++++++ 1 file changed, 32 insertions(+) diff --git a/pkg/drivers/nats/nats.go b/pkg/drivers/nats/nats.go index 7d449f4f..3d8606c5 100644 --- a/pkg/drivers/nats/nats.go +++ b/pkg/drivers/nats/nats.go @@ -11,6 +11,7 @@ import ( "sort" "strconv" "strings" + "sync" "time" "github.com/k3s-io/kine/pkg/drivers/nats/kv" @@ -59,6 +60,9 @@ type Driver struct { js nats.JetStreamContext kv nats.KeyValue + dirMu *sync.RWMutex + subMus map[string]*sync.RWMutex + slowThreshold time.Duration } @@ -78,6 +82,23 @@ func getTopLevelKey(key string) string { return "" } +func (d *Driver) lockFolder(key string) (unlock func()) { + lockFolder := getTopLevelKey(key) + if lockFolder == "" { + return func() {} + } + + d.dirMu.Lock() + mu, ok := d.subMus[lockFolder] + if !ok { + mu = &sync.RWMutex{} + d.subMus[lockFolder] = mu + } + d.dirMu.Unlock() + mu.Lock() + return mu.Unlock +} + type JSValue struct { KV *server.KeyValue PrevRevision int64 @@ -197,6 +218,8 @@ func New(ctx context.Context, connection string, tlsInfo tls.Config) (server.Bac return &Driver{ kv: kvB, + dirMu: &sync.RWMutex{}, + subMus: make(map[string]*sync.RWMutex), js: js, slowThreshold: config.slowThreshold, }, nil @@ -457,6 +480,9 @@ func (d *Driver) Create(ctx context.Context, key string, value []byte, lease int d.logMethod(dur, fStr, key, len(value), lease, revRet, errRet, dur) }() + // Lock the folder containing this key. + defer d.lockFolder(key)() + // check if key exists already rev, prevKV, err := d.get(ctx, key, 0, true) if err != nil && err != nats.ErrKeyNotFound { @@ -510,6 +536,9 @@ func (d *Driver) Delete(ctx context.Context, key string, revision int64) (revRet d.logMethod(dur, fStr, key, revision, revRet, kvRet != nil, deletedRet, errRet, dur) }() + // Lock the folder containing this key. + defer d.lockFolder(key)() + rev, value, err := d.get(ctx, key, 0, true) if err != nil { if err == nats.ErrKeyNotFound { @@ -763,6 +792,9 @@ func (d *Driver) Update(ctx context.Context, key string, value []byte, revision, d.logMethod(dur, fStr, key, len(value), revision, lease, revRet, kvRev, updateRet, errRet, dur) }() + // Lock the folder containing the key. + defer d.lockFolder(key)() + rev, prevKV, err := d.get(ctx, key, 0, false) if err != nil { From b6289cccd0cb4cfc7d5922044c146c522e37a029 Mon Sep 17 00:00:00 2001 From: Byron Ruth Date: Fri, 14 Apr 2023 14:23:21 -0400 Subject: [PATCH 06/18] Update NATS versions and add embedded test Signed-off-by: Byron Ruth --- go.mod | 18 ++++++++--------- go.sum | 37 +++++++++++++++++----------------- scripts/test | 5 ++++- scripts/test-run-nats | 4 ++-- scripts/test-run-nats-embedded | 13 ++++++++++++ 5 files changed, 47 insertions(+), 30 deletions(-) create mode 100755 scripts/test-run-nats-embedded diff --git a/go.mod b/go.mod index 1240d9ca..664c844a 100644 --- a/go.mod +++ b/go.mod @@ -8,11 +8,11 @@ require ( github.com/go-sql-driver/mysql v1.6.0 github.com/jackc/pgerrcode v0.0.0-20220416144525-469b46aa5efa github.com/jackc/pgx/v5 v5.3.1 - github.com/klauspost/compress v1.15.11 + github.com/klauspost/compress v1.16.4 github.com/mattn/go-sqlite3 v1.14.15 github.com/nats-io/jsm.go v0.0.31-0.20220317133147-fe318f464eee - github.com/nats-io/nats-server/v2 v2.9.11 - github.com/nats-io/nats.go v1.19.0 + github.com/nats-io/nats-server/v2 v2.9.15 + github.com/nats-io/nats.go v1.25.0 github.com/pkg/errors v0.9.1 github.com/prometheus/client_golang v1.14.0 github.com/rancher/wrangler v0.8.3 @@ -51,8 +51,8 @@ require ( github.com/minio/highwayhash v1.0.2 // indirect github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect github.com/modern-go/reflect2 v1.0.2 // indirect - github.com/nats-io/jwt/v2 v2.3.0 // indirect - github.com/nats-io/nkeys v0.3.0 // indirect + github.com/nats-io/jwt/v2 v2.4.1 // indirect + github.com/nats-io/nkeys v0.4.4 // indirect github.com/nats-io/nuid v1.0.1 // indirect github.com/prometheus/client_model v0.3.0 // indirect github.com/prometheus/common v0.37.0 // indirect @@ -80,10 +80,10 @@ require ( go.uber.org/atomic v1.7.0 // indirect go.uber.org/multierr v1.6.0 // indirect go.uber.org/zap v1.17.0 // indirect - golang.org/x/crypto v0.6.0 // indirect - golang.org/x/net v0.6.0 // indirect - golang.org/x/sys v0.5.0 // indirect - golang.org/x/text v0.7.0 // indirect + golang.org/x/crypto v0.8.0 // indirect + golang.org/x/net v0.9.0 // indirect + golang.org/x/sys v0.7.0 // indirect + golang.org/x/text v0.9.0 // indirect golang.org/x/time v0.3.0 // indirect google.golang.org/genproto v0.0.0-20210602131652-f16073e35f0c // indirect google.golang.org/protobuf v1.28.1 // indirect diff --git a/go.sum b/go.sum index 61fa73af..e6bd4e68 100644 --- a/go.sum +++ b/go.sum @@ -392,8 +392,8 @@ github.com/kisielk/errcheck v1.2.0/go.mod h1:/BMXB+zMLi60iA8Vv6Ksmxu/1UDYcXs4uQL github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8= github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= github.com/klauspost/compress v1.14.4/go.mod h1:/3/Vjq9QcHkK5uEr5lBEmyoZ1iFhe47etQ6QUkpK6sk= -github.com/klauspost/compress v1.15.11 h1:Lcadnb3RKGin4FYM/orgq0qde+nc15E5Cbqg4B9Sx9c= -github.com/klauspost/compress v1.15.11/go.mod h1:QPwzmACJjUTFsnSHH934V6woptycfrDDJnH7hvFVbGM= +github.com/klauspost/compress v1.16.4 h1:91KN02FnsOYhuunwU4ssRe8lc2JosWmizWa91B5v1PU= +github.com/klauspost/compress v1.16.4/go.mod h1:ntbaceVETuRiXiv4DpjP66DpAtAGkEQskQzEyD//IeE= github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ= github.com/konsorten/go-windows-terminal-sequences v1.0.2/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ= github.com/konsorten/go-windows-terminal-sequences v1.0.3/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ= @@ -453,16 +453,17 @@ github.com/mxk/go-flowrate v0.0.0-20140419014527-cca7078d478f/go.mod h1:ZdcZmHo+ github.com/nats-io/jsm.go v0.0.31-0.20220317133147-fe318f464eee h1:+l6i7zS8N1LOokm7dzShezI9STRGrzp0O49Pw8Jetdk= github.com/nats-io/jsm.go v0.0.31-0.20220317133147-fe318f464eee/go.mod h1:EKSYvbvWAoh0hIfuZ+ieWm8u0VOTRTeDfuQvNPKRqEg= github.com/nats-io/jwt/v2 v2.2.1-0.20220113022732-58e87895b296/go.mod h1:0tqz9Hlu6bCBFLWAASKhE5vUA4c24L9KPUUgvwumE/k= -github.com/nats-io/jwt/v2 v2.3.0 h1:z2mA1a7tIf5ShggOFlR1oBPgd6hGqcDYsISxZByUzdI= -github.com/nats-io/jwt/v2 v2.3.0/go.mod h1:0tqz9Hlu6bCBFLWAASKhE5vUA4c24L9KPUUgvwumE/k= +github.com/nats-io/jwt/v2 v2.4.1 h1:Y35W1dgbbz2SQUYDPCaclXcuqleVmpbRa7646Jf2EX4= +github.com/nats-io/jwt/v2 v2.4.1/go.mod h1:24BeQtRwxRV8ruvC4CojXlx/WQ/VjuwlYiH+vu/+ibI= github.com/nats-io/nats-server/v2 v2.7.5-0.20220309212130-5c0d1999ff72/go.mod h1:1vZ2Nijh8tcyNe8BDVyTviCd9NYzRbubQYiEHsvOQWc= -github.com/nats-io/nats-server/v2 v2.9.11 h1:4y5SwWvWI59V5mcqtuoqKq6L9NDUydOP3Ekwuwl8cZI= -github.com/nats-io/nats-server/v2 v2.9.11/go.mod h1:b0oVuxSlkvS3ZjMkncFeACGyZohbO4XhSqW1Lt7iRRY= +github.com/nats-io/nats-server/v2 v2.9.15 h1:MuwEJheIwpvFgqvbs20W8Ish2azcygjf4Z0liVu2I4c= +github.com/nats-io/nats-server/v2 v2.9.15/go.mod h1:QlCTy115fqpx4KSOPFIxSV7DdI6OxtZsGOL1JLdeRlE= github.com/nats-io/nats.go v1.13.1-0.20220308171302-2f2f6968e98d/go.mod h1:BPko4oXsySz4aSWeFgOHLZs3G4Jq4ZAyE6/zMCxRT6w= -github.com/nats-io/nats.go v1.19.0 h1:H6j8aBnTQFoVrTGB6Xjd903UMdE7jz6DS4YkmAqgZ9Q= -github.com/nats-io/nats.go v1.19.0/go.mod h1:tLqubohF7t4z3du1QDPYJIQQyhb4wl6DhjxEajSI7UA= -github.com/nats-io/nkeys v0.3.0 h1:cgM5tL53EvYRU+2YLXIK0G2mJtK12Ft9oeooSZMA2G8= +github.com/nats-io/nats.go v1.25.0 h1:t5/wCPGciR7X3Mu8QOi4jiJaXaWM8qtkLu4lzGZvYHE= +github.com/nats-io/nats.go v1.25.0/go.mod h1:D2WALIhz7V8M0pH8Scx8JZXlg6Oqz5VG+nQkK8nJdvg= github.com/nats-io/nkeys v0.3.0/go.mod h1:gvUNGjVcM2IPr5rCsRsC6Wb3Hr2CQAm08dsxtV6A5y4= +github.com/nats-io/nkeys v0.4.4 h1:xvBJ8d69TznjcQl9t6//Q5xXuVhyYiSos6RPtvQNTwA= +github.com/nats-io/nkeys v0.4.4/go.mod h1:XUkxdLPTufzlihbamfzQ7mw/VGx6ObUs+0bN5sNvt64= github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw= github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c= github.com/oklog/ulid v1.3.1/go.mod h1:CirwcVhetQ6Lv90oh/F+FBtV6XMibvdAFo93nm5qn4U= @@ -696,8 +697,8 @@ golang.org/x/crypto v0.0.0-20201002170205-7f63de1d35b0/go.mod h1:LzIPMQfyMNhhGPh golang.org/x/crypto v0.0.0-20210314154223-e6e6c4f2bb5b/go.mod h1:T9bdIzuCu7OtxOm1hfPfRQxPLYneinmdGuTeoZ9dtd4= golang.org/x/crypto v0.0.0-20210322153248-0c34fe9e7dc2/go.mod h1:T9bdIzuCu7OtxOm1hfPfRQxPLYneinmdGuTeoZ9dtd4= golang.org/x/crypto v0.0.0-20220112180741-5e0467b6c7ce/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4= -golang.org/x/crypto v0.6.0 h1:qfktjS5LUO+fFKeJXZ+ikTRijMmljikvG68fpMMruSc= -golang.org/x/crypto v0.6.0/go.mod h1:OFC/31mSvZgRz0V1QTNCzfAI1aIRzbiufJtkMIlEp58= +golang.org/x/crypto v0.8.0 h1:pd9TJtTueMTVQXzk8E2XESSMQDj/U7OUu0PqJqPXQjQ= +golang.org/x/crypto v0.8.0/go.mod h1:mRqEX+O9/h5TFCrQhkgjo2yKi0yYA+9ecGkdQoHrywE= golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= golang.org/x/exp v0.0.0-20190125153040-c74c464bbbf2/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= golang.org/x/exp v0.0.0-20190306152737-a1d7652674e8/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= @@ -783,8 +784,8 @@ golang.org/x/net v0.0.0-20210525063256-abc453219eb5/go.mod h1:9nx3DQGgdP8bBQD5qx golang.org/x/net v0.0.0-20211112202133-69e39bad7dc2/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= golang.org/x/net v0.0.0-20220127200216-cd36cc0744dd/go.mod h1:CfG3xpIq0wQ8r1q4Su4UZFWDARRcnwPjda9FqA0JpMk= golang.org/x/net v0.0.0-20220225172249-27dd8689420f/go.mod h1:CfG3xpIq0wQ8r1q4Su4UZFWDARRcnwPjda9FqA0JpMk= -golang.org/x/net v0.6.0 h1:L4ZwwTvKW9gr0ZMS1yrHD9GZhIuVjOBBnaKH+SPQK0Q= -golang.org/x/net v0.6.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs= +golang.org/x/net v0.9.0 h1:aWJ/m6xSmxWBx+V0XRHTlrYrPG56jKsLdTFmsSsCzOM= +golang.org/x/net v0.9.0/go.mod h1:d48xBJpPfHeWQsugry2m+kC02ZBRGRgulfHnEXEuWns= golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= golang.org/x/oauth2 v0.0.0-20190226205417-e64efc72b421/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw= golang.org/x/oauth2 v0.0.0-20190604053449-0f29369cfe45/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw= @@ -863,8 +864,8 @@ golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBc golang.org/x/sys v0.0.0-20211216021012-1d35b9e2eb4e/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220111092808-5a964db01320/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220114195835-da31bd327af9/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.5.0 h1:MUK/U/4lj1t1oPg0HfuXDN/Z1wv31ZJ/YcPiGccS4DU= -golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.7.0 h1:3jlCCIQZPdOYu1h8BkNvLz8Kgwtae2cagcG/VamtZRU= +golang.org/x/sys v0.7.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= golang.org/x/text v0.0.0-20160726164857-2910a502d2bf/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= @@ -877,8 +878,8 @@ golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.3.5/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ= -golang.org/x/text v0.7.0 h1:4BRB4x83lYWy72KwLD/qYDuTu7q9PjSagHvijDw7cLo= -golang.org/x/text v0.7.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8= +golang.org/x/text v0.9.0 h1:2sjJmO8cDvYveuX97RDLsxlyUxLl+GHoLxBiRdHllBE= +golang.org/x/text v0.9.0/go.mod h1:e1OnstbJyHTd6l/uOt8jFFHp6TRDWZR/bV3emEE/zU8= golang.org/x/time v0.0.0-20180412165947-fbb02b2291d2/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/time v0.0.0-20181108054448-85acf8d2951c/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/time v0.0.0-20190308202827-9d24e82272b4/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= @@ -942,7 +943,7 @@ golang.org/x/tools v0.0.0-20200804011535-6c149bb5ef0d/go.mod h1:njjCfa9FT2d7l9Bc golang.org/x/tools v0.0.0-20200825202427-b303f430e36d/go.mod h1:njjCfa9FT2d7l9Bc6FUM5FLjQPp3cFF28FI3qnDFljA= golang.org/x/tools v0.0.0-20210106214847-113979e3529a/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA= golang.org/x/tools v0.1.2/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk= -golang.org/x/tools v0.1.12 h1:VveCTK38A2rkS8ZqFY25HIDFscX5X9OoEhJd3quQmXU= +golang.org/x/tools v0.6.0 h1:BOw41kyTf3PuCW1pVQf8+Cyg8pMlkYB1oo9iJ6D/lKM= golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= diff --git a/scripts/test b/scripts/test index 3f2d3384..52763089 100755 --- a/scripts/test +++ b/scripts/test @@ -27,6 +27,9 @@ echo "Did test-run-postgres $?" echo "Did test-run-cockroachdb $?" . ./scripts/test-run-nats -echo "Did test-nats$?" +echo "did test-nats$?" + +. ./scripts/test-run-nats-embedded +echo "did test-nats-embedded$?" exit 0 diff --git a/scripts/test-run-nats b/scripts/test-run-nats index d63c191d..1cf5ba07 100755 --- a/scripts/test-run-nats +++ b/scripts/test-run-nats @@ -5,7 +5,7 @@ start-test() { local port=$(cat $TEST_DIR/databases/*/metadata/port) local pass=$(cat $TEST_DIR/databases/*/metadata/password) local image=$(cat $TEST_DIR/databases/*/metadata/image) - DB_CONNECTION_TEST="docker container run --rm --name connection-test --entrypoint /usr/local/bin/nats docker.io/natsio/nats-box:0.13.0 server check connection --server=nats://$ip:$port" \ + DB_CONNECTION_TEST="docker container run --rm --name connection-test --entrypoint /usr/local/bin/nats docker.io/natsio/nats-box:0.13.8 server check connection --server=nats://$ip:$port" \ timeout --foreground 1m bash -c "wait-for-db-connection" KINE_IMAGE=$IMAGE KINE_ENDPOINT="nats://$ip:$port" provision-kine local kine_url=$(cat $TEST_DIR/kine/*/metadata/url) @@ -17,7 +17,7 @@ export -f start-test VERSION_LIST="\ nats 2.7.4 nats 2.8.4 - nats 2.9.11" + nats 2.9.15" while read ENGINE VERSION; do LABEL=$ENGINE-$VERSION DB_PASSWORD_ENV=NATS_JS_PASSWORD DB_ARGS="-js" DB_IMAGE=docker.io/library/$ENGINE:$VERSION run-test diff --git a/scripts/test-run-nats-embedded b/scripts/test-run-nats-embedded new file mode 100755 index 00000000..748619d0 --- /dev/null +++ b/scripts/test-run-nats-embedded @@ -0,0 +1,13 @@ +#!/bin/bash + +start-test() { + local ip=$(cat $TEST_DIR/databases/*/metadata/ip) + local port=$(cat $TEST_DIR/databases/*/metadata/port) + KINE_IMAGE=$IMAGE KINE_ENDPOINT="nats://$ip:$port?embedServer" provision-kine + local kine_url=$(cat $TEST_DIR/kine/*/metadata/url) + K3S_DATASTORE_ENDPOINT=$kine_url provision-cluster +} +export -f start-test + +LABEL=$ENGINE-$VERSION-embed run-test + From 2d6e699d6b8d95280233a5a88a5278b9c42d2de6 Mon Sep 17 00:00:00 2001 From: Byron Ruth Date: Fri, 14 Apr 2023 14:23:36 -0400 Subject: [PATCH 07/18] Add nats build tag for tests Signed-off-by: Byron Ruth --- scripts/build | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/scripts/build b/scripts/build index 54004ee1..684c7fea 100755 --- a/scripts/build +++ b/scripts/build @@ -19,9 +19,9 @@ LINKFLAGS="-X github.com/k3s-io/kine/pkg/version.Version=$VERSION" LINKFLAGS="-X github.com/k3s-io/kine/pkg/version.GitCommit=$COMMIT $LINKFLAGS" echo Building Kine -CGO_CFLAGS="-DSQLITE_ENABLE_DBSTAT_VTAB=1 -DSQLITE_USE_ALLOCA=1" go build -ldflags "$LINKFLAGS $OTHER_LINKFLAGS" -o bin/kine +CGO_CFLAGS="-DSQLITE_ENABLE_DBSTAT_VTAB=1 -DSQLITE_USE_ALLOCA=1" go build -ldflags "$LINKFLAGS $OTHER_LINKFLAGS" -tags nats -o bin/kine if [ "$CROSS" = "true" ] && [ "$ARCH" = "amd64" ]; then - GOOS=darwin go build -ldflags "$LINKFLAGS" -o bin/kine-darwin - GOOS=windows go build -ldflags "$LINKFLAGS" -o bin/kine-windows - GOOS=linux go build -ldflags "$LINKFLAGS" -o bin/kine-amd64 + GOOS=darwin go build -ldflags "$LINKFLAGS" -tags nats -o bin/kine-darwin + GOOS=windows go build -ldflags "$LINKFLAGS" -tags nats -o bin/kine-windows + GOOS=linux go build -ldflags "$LINKFLAGS" -tags nats -o bin/kine-amd64 fi From ae5db325698f2672a0fa8f93406ec5e31e08dd98 Mon Sep 17 00:00:00 2001 From: Byron Ruth Date: Fri, 14 Apr 2023 15:08:33 -0400 Subject: [PATCH 08/18] Support embedded socket connection Signed-off-by: Byron Ruth --- pkg/drivers/nats/nats.go | 14 +++++++++++--- pkg/drivers/nats/server/interface.go | 6 +++++- pkg/drivers/nats/server/no_server.go | 2 +- pkg/drivers/nats/server/server.go | 5 ++++- scripts/test | 3 +++ scripts/test-run-nats-embedded | 2 +- scripts/test-run-nats-socket | 13 +++++++++++++ 7 files changed, 38 insertions(+), 7 deletions(-) create mode 100755 scripts/test-run-nats-socket diff --git a/pkg/drivers/nats/nats.go b/pkg/drivers/nats/nats.go index 3d8606c5..d5bea7f8 100644 --- a/pkg/drivers/nats/nats.go +++ b/pkg/drivers/nats/nats.go @@ -49,6 +49,8 @@ type Config struct { slowThreshold time.Duration // If true, a server will be embedded and started. embedServer bool + // If true, use a socket for the embedded server. + dontListen bool // Path to a server configuration file when embedded. serverConfig string // If true, the embedded server will log to stdout. @@ -151,14 +153,21 @@ func New(ctx context.Context, connection string, tlsInfo tls.Config) (server.Bac return nil, err } + nopts := append(config.clientOptions, nats.Name("k3s-server using bucket: "+config.bucket)) + // Run an embedded server. if config.embedServer { logrus.Infof("using an embedded NATS server") - ns, err := natsserver.New(config.serverConfig, config.stdoutLogging) + ns, err := natsserver.New(config.serverConfig, config.dontListen, config.stdoutLogging) if err != nil { return nil, fmt.Errorf("failed to create embedded NATS server: %w", err) } + + if config.dontListen { + nopts = append(nopts, nats.InProcessServer(ns)) + } + // Start the server. go ns.Start() logrus.Infof("started embedded NATS server") @@ -187,8 +196,6 @@ func New(ctx context.Context, connection string, tlsInfo tls.Config) (server.Bac logrus.Infof("using bucket: %s", config.bucket) logrus.Infof("connecting to %s", config.clientURL) - nopts := append(config.clientOptions, nats.Name("k3s-server using bucket: "+config.bucket)) - conn, err := nats.Connect(config.clientURL, nopts...) if err != nil { return nil, fmt.Errorf("failed to connect to NATS server: %w", err) @@ -347,6 +354,7 @@ func parseNatsConnection(dsn string, tlsInfo tls.Config) (*Config, error) { config.embedServer = true config.serverConfig = queryMap.Get("serverConfig") config.stdoutLogging = queryMap.Has("stdoutLogging") + config.dontListen = queryMap.Has("dontListen") } logrus.Debugf("using config %#v", config) diff --git a/pkg/drivers/nats/server/interface.go b/pkg/drivers/nats/server/interface.go index e97ebe2d..01a1397c 100644 --- a/pkg/drivers/nats/server/interface.go +++ b/pkg/drivers/nats/server/interface.go @@ -1,10 +1,14 @@ package server -import "time" +import ( + "net" + "time" +) type Server interface { Start() Shutdown() ClientURL() string ReadyForConnections(wait time.Duration) bool + InProcessConn() (net.Conn, error) } diff --git a/pkg/drivers/nats/server/no_server.go b/pkg/drivers/nats/server/no_server.go index 60c8f44b..b41aa18e 100644 --- a/pkg/drivers/nats/server/no_server.go +++ b/pkg/drivers/nats/server/no_server.go @@ -7,6 +7,6 @@ import ( "errors" ) -func New(configFile string, stdoutLogging bool) (Server, error) { +func New(_ string, _, _ bool) (Server, error) { return nil, errors.New(`this binary is built without embedded NATS support, compile with "-tags nats"`) } diff --git a/pkg/drivers/nats/server/server.go b/pkg/drivers/nats/server/server.go index 4942e9a9..de79824d 100644 --- a/pkg/drivers/nats/server/server.go +++ b/pkg/drivers/nats/server/server.go @@ -9,7 +9,7 @@ import ( "github.com/nats-io/nats-server/v2/server" ) -func New(configFile string, stdoutLogging bool) (Server, error) { +func New(configFile string, dontListen, stdoutLogging bool) (Server, error) { opts := &server.Options{} if configFile == "" { @@ -24,6 +24,9 @@ func New(configFile string, stdoutLogging bool) (Server, error) { } } + opts.DontListen = dontListen + opts.Port = 0 + srv, err := server.NewServer(opts) if stdoutLogging { srv.ConfigureLogger() diff --git a/scripts/test b/scripts/test index 52763089..4d6173bb 100755 --- a/scripts/test +++ b/scripts/test @@ -32,4 +32,7 @@ echo "did test-nats$?" . ./scripts/test-run-nats-embedded echo "did test-nats-embedded$?" +. ./scripts/test-run-nats-socket +echo "did test-nats-socket$?" + exit 0 diff --git a/scripts/test-run-nats-embedded b/scripts/test-run-nats-embedded index 748619d0..7922b1bc 100755 --- a/scripts/test-run-nats-embedded +++ b/scripts/test-run-nats-embedded @@ -9,5 +9,5 @@ start-test() { } export -f start-test -LABEL=$ENGINE-$VERSION-embed run-test +LABEL=nats-embedded run-test diff --git a/scripts/test-run-nats-socket b/scripts/test-run-nats-socket new file mode 100755 index 00000000..c9ebc66d --- /dev/null +++ b/scripts/test-run-nats-socket @@ -0,0 +1,13 @@ +#!/bin/bash + +start-test() { + local ip=$(cat $TEST_DIR/databases/*/metadata/ip) + local port=$(cat $TEST_DIR/databases/*/metadata/port) + KINE_IMAGE=$IMAGE KINE_ENDPOINT="nats://$ip:$port?embedServer&dontListen" provision-kine + local kine_url=$(cat $TEST_DIR/kine/*/metadata/url) + K3S_DATASTORE_ENDPOINT=$kine_url provision-cluster +} +export -f start-test + +LABEL=nats-socket run-test + From c6f31e5c9ab54ff14722ec9bbe7858f76c7a955b Mon Sep 17 00:00:00 2001 From: Byron Ruth Date: Mon, 17 Apr 2023 16:08:59 -0400 Subject: [PATCH 09/18] Update to NATS v2.9.16 Signed-off-by: Byron Ruth --- go.mod | 4 ++-- go.sum | 8 ++++---- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/go.mod b/go.mod index 664c844a..e11f7b18 100644 --- a/go.mod +++ b/go.mod @@ -8,10 +8,10 @@ require ( github.com/go-sql-driver/mysql v1.6.0 github.com/jackc/pgerrcode v0.0.0-20220416144525-469b46aa5efa github.com/jackc/pgx/v5 v5.3.1 - github.com/klauspost/compress v1.16.4 + github.com/klauspost/compress v1.16.5 github.com/mattn/go-sqlite3 v1.14.15 github.com/nats-io/jsm.go v0.0.31-0.20220317133147-fe318f464eee - github.com/nats-io/nats-server/v2 v2.9.15 + github.com/nats-io/nats-server/v2 v2.9.16 github.com/nats-io/nats.go v1.25.0 github.com/pkg/errors v0.9.1 github.com/prometheus/client_golang v1.14.0 diff --git a/go.sum b/go.sum index e6bd4e68..2d719dff 100644 --- a/go.sum +++ b/go.sum @@ -392,8 +392,8 @@ github.com/kisielk/errcheck v1.2.0/go.mod h1:/BMXB+zMLi60iA8Vv6Ksmxu/1UDYcXs4uQL github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8= github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= github.com/klauspost/compress v1.14.4/go.mod h1:/3/Vjq9QcHkK5uEr5lBEmyoZ1iFhe47etQ6QUkpK6sk= -github.com/klauspost/compress v1.16.4 h1:91KN02FnsOYhuunwU4ssRe8lc2JosWmizWa91B5v1PU= -github.com/klauspost/compress v1.16.4/go.mod h1:ntbaceVETuRiXiv4DpjP66DpAtAGkEQskQzEyD//IeE= +github.com/klauspost/compress v1.16.5 h1:IFV2oUNUzZaz+XyusxpLzpzS8Pt5rh0Z16For/djlyI= +github.com/klauspost/compress v1.16.5/go.mod h1:ntbaceVETuRiXiv4DpjP66DpAtAGkEQskQzEyD//IeE= github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ= github.com/konsorten/go-windows-terminal-sequences v1.0.2/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ= github.com/konsorten/go-windows-terminal-sequences v1.0.3/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ= @@ -456,8 +456,8 @@ github.com/nats-io/jwt/v2 v2.2.1-0.20220113022732-58e87895b296/go.mod h1:0tqz9Hl github.com/nats-io/jwt/v2 v2.4.1 h1:Y35W1dgbbz2SQUYDPCaclXcuqleVmpbRa7646Jf2EX4= github.com/nats-io/jwt/v2 v2.4.1/go.mod h1:24BeQtRwxRV8ruvC4CojXlx/WQ/VjuwlYiH+vu/+ibI= github.com/nats-io/nats-server/v2 v2.7.5-0.20220309212130-5c0d1999ff72/go.mod h1:1vZ2Nijh8tcyNe8BDVyTviCd9NYzRbubQYiEHsvOQWc= -github.com/nats-io/nats-server/v2 v2.9.15 h1:MuwEJheIwpvFgqvbs20W8Ish2azcygjf4Z0liVu2I4c= -github.com/nats-io/nats-server/v2 v2.9.15/go.mod h1:QlCTy115fqpx4KSOPFIxSV7DdI6OxtZsGOL1JLdeRlE= +github.com/nats-io/nats-server/v2 v2.9.16 h1:SuNe6AyCcVy0g5326wtyU8TdqYmcPqzTjhkHojAjprc= +github.com/nats-io/nats-server/v2 v2.9.16/go.mod h1:z1cc5Q+kqJkz9mLUdlcSsdYnId4pyImHjNgoh6zxSC0= github.com/nats-io/nats.go v1.13.1-0.20220308171302-2f2f6968e98d/go.mod h1:BPko4oXsySz4aSWeFgOHLZs3G4Jq4ZAyE6/zMCxRT6w= github.com/nats-io/nats.go v1.25.0 h1:t5/wCPGciR7X3Mu8QOi4jiJaXaWM8qtkLu4lzGZvYHE= github.com/nats-io/nats.go v1.25.0/go.mod h1:D2WALIhz7V8M0pH8Scx8JZXlg6Oqz5VG+nQkK8nJdvg= From 4c0f2763076dcdbb99ec5c3dd0aae24956ac18f4 Mon Sep 17 00:00:00 2001 From: Byron Ruth Date: Mon, 17 Apr 2023 16:09:18 -0400 Subject: [PATCH 10/18] Remove unused variables Signed-off-by: Byron Ruth --- scripts/test-run-nats-socket | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/scripts/test-run-nats-socket b/scripts/test-run-nats-socket index c9ebc66d..34187236 100755 --- a/scripts/test-run-nats-socket +++ b/scripts/test-run-nats-socket @@ -1,9 +1,7 @@ #!/bin/bash start-test() { - local ip=$(cat $TEST_DIR/databases/*/metadata/ip) - local port=$(cat $TEST_DIR/databases/*/metadata/port) - KINE_IMAGE=$IMAGE KINE_ENDPOINT="nats://$ip:$port?embedServer&dontListen" provision-kine + KINE_IMAGE=$IMAGE KINE_ENDPOINT="nats://?embedServer&dontListen" provision-kine local kine_url=$(cat $TEST_DIR/kine/*/metadata/url) K3S_DATASTORE_ENDPOINT=$kine_url provision-cluster } From ba7bd51cfa50d0603232964ffca72595c4cb4ffd Mon Sep 17 00:00:00 2001 From: Byron Ruth Date: Mon, 17 Apr 2023 16:09:36 -0400 Subject: [PATCH 11/18] Add backend "jetstream://" URI support Signed-off-by: Byron Ruth --- pkg/endpoint/endpoint.go | 17 +++++++++-------- 1 file changed, 9 insertions(+), 8 deletions(-) diff --git a/pkg/endpoint/endpoint.go b/pkg/endpoint/endpoint.go index eb0e85dc..7dbf1c84 100644 --- a/pkg/endpoint/endpoint.go +++ b/pkg/endpoint/endpoint.go @@ -27,13 +27,14 @@ import ( ) const ( - KineSocket = "unix://kine.sock" - SQLiteBackend = "sqlite" - DQLiteBackend = "dqlite" - ETCDBackend = "etcd3" - NATSBackend = "nats" - MySQLBackend = "mysql" - PostgresBackend = "postgres" + KineSocket = "unix://kine.sock" + SQLiteBackend = "sqlite" + DQLiteBackend = "dqlite" + ETCDBackend = "etcd3" + JetStreamBackend = "jetstream" + NATSBackend = "nats" + MySQLBackend = "mysql" + PostgresBackend = "postgres" ) type Config struct { @@ -246,7 +247,7 @@ func getKineStorageBackend(ctx context.Context, driver, dsn string, cfg Config) backend, err = pgsql.New(ctx, dsn, cfg.BackendTLSConfig, cfg.ConnectionPoolConfig, cfg.MetricsRegisterer) case MySQLBackend: backend, err = mysql.New(ctx, dsn, cfg.BackendTLSConfig, cfg.ConnectionPoolConfig, cfg.MetricsRegisterer) - case NATSBackend: + case NATSBackend, JetStreamBackend: backend, err = nats.New(ctx, dsn, cfg.BackendTLSConfig) default: return false, nil, fmt.Errorf("storage backend is not defined") From edf479b026faef6afc4341865f32807d14baf302 Mon Sep 17 00:00:00 2001 From: Byron Ruth Date: Mon, 17 Apr 2023 16:52:09 -0400 Subject: [PATCH 12/18] Remove override of port Signed-off-by: Byron Ruth --- pkg/drivers/nats/server/server.go | 1 - 1 file changed, 1 deletion(-) diff --git a/pkg/drivers/nats/server/server.go b/pkg/drivers/nats/server/server.go index de79824d..8d3096ad 100644 --- a/pkg/drivers/nats/server/server.go +++ b/pkg/drivers/nats/server/server.go @@ -25,7 +25,6 @@ func New(configFile string, dontListen, stdoutLogging bool) (Server, error) { } opts.DontListen = dontListen - opts.Port = 0 srv, err := server.NewServer(opts) if stdoutLogging { From 686f79869b8fa1b596efa5c69422fc538db35b11 Mon Sep 17 00:00:00 2001 From: Byron Ruth Date: Wed, 19 Apr 2023 13:40:49 -0400 Subject: [PATCH 13/18] Add examples doc Signed-off-by: Byron Ruth --- examples/nats.md | 62 ++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 62 insertions(+) create mode 100644 examples/nats.md diff --git a/examples/nats.md b/examples/nats.md new file mode 100644 index 00000000..27e05deb --- /dev/null +++ b/examples/nats.md @@ -0,0 +1,62 @@ +# Using the NATS backend + +[NATS](https://nats.io) is a high-performance connective technology supporting a spectrum of messaging, streaming, and materialization capabilities. + +One supported materialization a key-value store which is what is used in the implementation of the KINE backend. + +There are two options for integrating NATS: + +- As an external server or cluster KINE connects to +- As an embedded server within KINE itself + +## Configuring KINE + +This is done by specifying the `--endpoint` option having the following format where everything but the `nats://` scheme is optional. + +``` +nats://[[[@]]:][?]` +``` + +The above tokens are defined as follows: + +- `auth` - Inline username and password or standlaone token. Defaults is nothing. +- `host` - Hostname or IP address the NATS server is addressed to. Default is `127.0.0.1`. +- `port` - Port to bind the connection with the NATS server. Default is `4222`. +- `params` - Query parameters which are listed below with defaults. + +The following query parameters are supported: + +- `bucket` - Specifies the name of the NATS key-value bucket. Default is `kine`. +- `replicas` - Specifies the number of replicas for the bucket. Default is `1`. +- `revHistory` - Specifies the number of revisions to keep in history. Default is `10`. +- `slowMethod` - Specifies the duration of a method before it is considered slow. Default is `500ms`. +- `contextFile` - Specifies the path to a NATS context file. If this is provided, the `auth` and `host` should not be provided. See the available [options](https://docs.nats.io/using-nats/nats-tools/nats_cli#configuration-contexts). Default is nothing. +- `embedServer` - Specifies whether to embed a NATS server rather than relying on an external one. Default is `false`. +- `serverConfig` - Specifies the path to a NATS server configuration file if `embedServer` is `true`. Defaults to nothing. +- `dontListen` - Specifies whether the embedded NATS server should not rely on the TCP stack for a local client connection. Defaults to `false` +- `stdoutLogging` - Specifies whether to log to STDOUT if `embedServer` is `true`. Default is false. + +## Examples + +### External + +- `nats://` +- `nats://localhost:4222?replicas=3` +- `nats://?contextFile=/path/to/context.json` +- `nats://user:pass@localhost:4222` +- `nats://token@localhost` + +Multiple URLs can be passed in a comma separated format, however only the first URL +in the list will be evaluated for query parameters. For example: + +``` +nats://nats-1.example.com?bucket=k3s,nats://nats-2.example.com,nats://nats-3.example.com +``` + +However, if a `contextFile` is feasible to specify, this is recommended rather than an inline comma-separated list. + +### Embedded + +- `nats://?embedServer` +- `nats://?embedServer&serverConfig=/path/to/server.conf` +- `nats://?embedServer&dontListen` From e8464102319b3114a7bc4e29de4b99e61a92ef9e Mon Sep 17 00:00:00 2001 From: Byron Ruth Date: Wed, 19 Apr 2023 15:52:22 -0400 Subject: [PATCH 14/18] Remove IP and port Co-authored-by: Brad Davidson Signed-off-by: Byron Ruth --- scripts/test-run-nats-embedded | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/scripts/test-run-nats-embedded b/scripts/test-run-nats-embedded index 7922b1bc..eae7e7a3 100755 --- a/scripts/test-run-nats-embedded +++ b/scripts/test-run-nats-embedded @@ -1,9 +1,7 @@ #!/bin/bash start-test() { - local ip=$(cat $TEST_DIR/databases/*/metadata/ip) - local port=$(cat $TEST_DIR/databases/*/metadata/port) - KINE_IMAGE=$IMAGE KINE_ENDPOINT="nats://$ip:$port?embedServer" provision-kine + KINE_IMAGE=$IMAGE KINE_ENDPOINT="nats://?embedServer" provision-kine local kine_url=$(cat $TEST_DIR/kine/*/metadata/url) K3S_DATASTORE_ENDPOINT=$kine_url provision-cluster } From 0ff130f95f3cec8fbaec7cf5e5b281524654a0fd Mon Sep 17 00:00:00 2001 From: Byron Ruth Date: Wed, 19 Apr 2023 15:53:15 -0400 Subject: [PATCH 15/18] Fix whitespace Co-authored-by: Brad Davidson Signed-off-by: Byron Ruth --- scripts/test | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/scripts/test b/scripts/test index 4d6173bb..15d7814e 100755 --- a/scripts/test +++ b/scripts/test @@ -27,12 +27,12 @@ echo "Did test-run-postgres $?" echo "Did test-run-cockroachdb $?" . ./scripts/test-run-nats -echo "did test-nats$?" +echo "did test-nats $?" . ./scripts/test-run-nats-embedded -echo "did test-nats-embedded$?" +echo "did test-nats-embedded $?" . ./scripts/test-run-nats-socket -echo "did test-nats-socket$?" +echo "did test-nats-socket $?" exit 0 From 4fb670b96ae4454e3a3bcd9f0c909f68e43ad6ff Mon Sep 17 00:00:00 2001 From: Byron Ruth Date: Thu, 20 Apr 2023 17:22:18 -0400 Subject: [PATCH 16/18] Default to embedded it available Fix log line for embedded. Signed-off-by: Byron Ruth --- pkg/drivers/nats/nats.go | 5 +++-- pkg/drivers/nats/server/no_server.go | 4 ++++ pkg/drivers/nats/server/server.go | 4 ++++ 3 files changed, 11 insertions(+), 2 deletions(-) diff --git a/pkg/drivers/nats/nats.go b/pkg/drivers/nats/nats.go index d5bea7f8..64b3a333 100644 --- a/pkg/drivers/nats/nats.go +++ b/pkg/drivers/nats/nats.go @@ -191,10 +191,11 @@ func New(ctx context.Context, connection string, tlsInfo tls.Config) (server.Bac // Use the local server's client URL. config.clientURL = ns.ClientURL() + } else { + logrus.Infof("connecting to %s", config.clientURL) } logrus.Infof("using bucket: %s", config.bucket) - logrus.Infof("connecting to %s", config.clientURL) conn, err := nats.Connect(config.clientURL, nopts...) if err != nil { @@ -350,7 +351,7 @@ func parseNatsConnection(dsn string, tlsInfo tls.Config) (*Config, error) { config.clientURL = connBuilder.String() // If this option is set, consider the server-specific options. - if queryMap.Has("embedServer") { + if queryMap.Has("embedServer") || (natsserver.Embedded && dsn == "nats://") { config.embedServer = true config.serverConfig = queryMap.Get("serverConfig") config.stdoutLogging = queryMap.Has("stdoutLogging") diff --git a/pkg/drivers/nats/server/no_server.go b/pkg/drivers/nats/server/no_server.go index b41aa18e..73c9a1b0 100644 --- a/pkg/drivers/nats/server/no_server.go +++ b/pkg/drivers/nats/server/no_server.go @@ -7,6 +7,10 @@ import ( "errors" ) +const ( + Embedded = false +) + func New(_ string, _, _ bool) (Server, error) { return nil, errors.New(`this binary is built without embedded NATS support, compile with "-tags nats"`) } diff --git a/pkg/drivers/nats/server/server.go b/pkg/drivers/nats/server/server.go index 8d3096ad..fb4c72be 100644 --- a/pkg/drivers/nats/server/server.go +++ b/pkg/drivers/nats/server/server.go @@ -9,6 +9,10 @@ import ( "github.com/nats-io/nats-server/v2/server" ) +const ( + Embedded = true +) + func New(configFile string, dontListen, stdoutLogging bool) (Server, error) { opts := &server.Options{} From 14734aa9f0a51277ccccef46f40294e9d4dac070 Mon Sep 17 00:00:00 2001 From: Byron Ruth Date: Mon, 24 Apr 2023 09:09:35 -0400 Subject: [PATCH 17/18] Change to use noEmbed to disable embedding Signed-off-by: Byron Ruth --- examples/nats.md | 65 +++++++++++++--------- pkg/drivers/nats/nats.go | 83 +++++++++++----------------- pkg/drivers/nats/server/interface.go | 8 +++ pkg/drivers/nats/server/no_server.go | 2 +- pkg/drivers/nats/server/server.go | 26 ++++++--- 5 files changed, 99 insertions(+), 85 deletions(-) diff --git a/examples/nats.md b/examples/nats.md index 27e05deb..364e018b 100644 --- a/examples/nats.md +++ b/examples/nats.md @@ -1,12 +1,18 @@ # Using the NATS backend -[NATS](https://nats.io) is a high-performance connective technology supporting a spectrum of messaging, streaming, and materialization capabilities. +[NATS](https://nats.io) is a high-performance connective technology supporting a spectrum of messaging, streaming, and state materialization capabilities. -One supported materialization a key-value store which is what is used in the implementation of the KINE backend. +The Key-Value (KV) capability is used in the KINE backend and supports: -There are two options for integrating NATS: +- Optimized get, put, create, and update commands + - create expects the key to not exist (or was previously deleted) + - update requires a version number used for OCC against the key +- Historical and versioned values per key +- Subscription of changes for the entire KV store a subset of keys -- As an external server or cluster KINE connects to +There are two options for using NATS: + +- As an external NATS deployment KINE connects to - As an embedded server within KINE itself ## Configuring KINE @@ -19,44 +25,53 @@ nats://[[[@]]:][?]` The above tokens are defined as follows: -- `auth` - Inline username and password or standlaone token. Defaults is nothing. -- `host` - Hostname or IP address the NATS server is addressed to. Default is `127.0.0.1`. +- `host` - Hostname or IP address the NATS server is addressed to. Default is `0.0.0.0`. - `port` - Port to bind the connection with the NATS server. Default is `4222`. +- `auth` - Inline username and password or standlaone token. Defaults is nothing. - `params` - Query parameters which are listed below with defaults. -The following query parameters are supported: +The following client query parameters are supported: - `bucket` - Specifies the name of the NATS key-value bucket. Default is `kine`. - `replicas` - Specifies the number of replicas for the bucket. Default is `1`. - `revHistory` - Specifies the number of revisions to keep in history. Default is `10`. - `slowMethod` - Specifies the duration of a method before it is considered slow. Default is `500ms`. - `contextFile` - Specifies the path to a NATS context file. If this is provided, the `auth` and `host` should not be provided. See the available [options](https://docs.nats.io/using-nats/nats-tools/nats_cli#configuration-contexts). Default is nothing. -- `embedServer` - Specifies whether to embed a NATS server rather than relying on an external one. Default is `false`. -- `serverConfig` - Specifies the path to a NATS server configuration file if `embedServer` is `true`. Defaults to nothing. + +These query parameters are relevant when the server is embedded: + +- `serverConfig` - Specifies the path to a NATS server configuration file in embedded mode. Defaults to nothing. - `dontListen` - Specifies whether the embedded NATS server should not rely on the TCP stack for a local client connection. Defaults to `false` -- `stdoutLogging` - Specifies whether to log to STDOUT if `embedServer` is `true`. Default is false. +- `noEmbed` - Specifies whether to explicitly _not_ run an embedded server and instead use an external server. Default is `false`. +- `stdoutLogging` - Specifies whether to log to STDOUT if running in embedded mode. Default is false. -## Examples +### Examples -### External +KINE can be built with our without embedding NATS. -- `nats://` -- `nats://localhost:4222?replicas=3` -- `nats://?contextFile=/path/to/context.json` -- `nats://user:pass@localhost:4222` -- `nats://token@localhost` +If NATS is embedded, `nats://` defaults to running an embedded server using the default `host` and `port`. -Multiple URLs can be passed in a comma separated format, however only the first URL -in the list will be evaluated for query parameters. For example: +For an embedded server, a path to a custom server configuration file can be provided. + +``` +nats://?serverConfig=/path/to/server.conf +``` + +If there is a need to disable the TCP listener, specify `dontListen` and the local client connection will talk to the server over a socket. ``` -nats://nats-1.example.com?bucket=k3s,nats://nats-2.example.com,nats://nats-3.example.com +nats://?dontListen ``` -However, if a `contextFile` is feasible to specify, this is recommended rather than an inline comma-separated list. +If an external NATS server should be used, then `noEmbed` can be set to explicitly not run an embedded server. -### Embedded +``` +nats://?noEmbed +``` -- `nats://?embedServer` -- `nats://?embedServer&serverConfig=/path/to/server.conf` -- `nats://?embedServer&dontListen` +Multiple URLs can be passed in a comma separated format, however only the first URL +in the list will be evaluated for query parameters. For example: + +``` +nats://nats-1.example.com,nats://nats-2.example.com,nats://nats-3.example.com +``` diff --git a/pkg/drivers/nats/nats.go b/pkg/drivers/nats/nats.go index 64b3a333..507278f9 100644 --- a/pkg/drivers/nats/nats.go +++ b/pkg/drivers/nats/nats.go @@ -47,14 +47,18 @@ type Config struct { replicas int // Indicates the duration of a method before it is considered slow. Defaults to 500ms. slowThreshold time.Duration - // If true, a server will be embedded and started. - embedServer bool + // If true, an embedded server will not be used. + noEmbed bool // If true, use a socket for the embedded server. dontListen bool // Path to a server configuration file when embedded. serverConfig string // If true, the embedded server will log to stdout. stdoutLogging bool + // The explicit host to listen on when embedded. + host string + // The explicit port to listen on when embedded. + port int } type Driver struct { @@ -109,57 +113,26 @@ type JSValue struct { } // New return an implementation of server.Backend using NATS + JetStream. -// Various connection formats are supported using the nats:// scheme -// and having the following format: -// -// nats://@:?` -// -// - auth - optional and can be user/pass or token -// - hostname:port - specifies the NATS server address -// - params - optional query parameters -// -// The following query parameters are supported: -// -// - bucket - specifies the name of the NATS key-value bucket. Default is "kine" -// - replicas - specifies the number of replicas for the bucket. Default is 1 -// - revHistory - specifies the number of revisions to keep in history. Default is 10 -// - slowMethod - specifies the duration of a method before it is considered slow. Default is 500ms -// - contextFile - specifies the path to a NATS context file -// - embedServer - specifies whether to embed a NATS server. Default is false. -// - serverConfig - specifies the path to a NATS server configuration file if embedServer is true. -// - stdoutLogging - specifies whether to log to stdout if embedServer is true. Default is false. -// -// If contextFile is provided then hostname:port should not be provided. -// -// Multiple URLs can be passed in a comma separated format, however only the first URL -// in the list will be evaluated for query parameters. While auth is valid in the URL, the -// recommended way to pass auth is through a context file. If user/pass or token are provided -// in the URL only the first one will be used for all URLs. -// -// See https://docs.nats.io/using-nats/nats-tools/nats_cli#configuration-contexts for more information -// on configuring a NATS context file. -// -// Examples: -// -// - nats://localhost:4222?replicas=3 -// - nats://?contextFile=/path/to/context.json -// - nats://user:pass@localhost:4222 -// - nats://token@localhost:4222 -// - nats://?embedServer=true -// - nats://?embedServer=true&serverConfig=/path/to/server.conf +// See the `examples/nats.md` file for examples of connection strings. func New(ctx context.Context, connection string, tlsInfo tls.Config) (server.Backend, error) { - config, err := parseNatsConnection(connection, tlsInfo) + config, err := parseConnection(connection, tlsInfo) if err != nil { return nil, err } - nopts := append(config.clientOptions, nats.Name("k3s-server using bucket: "+config.bucket)) + nopts := append(config.clientOptions, nats.Name("kine using bucket: "+config.bucket)) - // Run an embedded server. - if config.embedServer { + // Run an embedded server if available and not disabled. + if natsserver.Embedded && !config.noEmbed { logrus.Infof("using an embedded NATS server") - ns, err := natsserver.New(config.serverConfig, config.dontListen, config.stdoutLogging) + ns, err := natsserver.New(&natsserver.Config{ + Host: config.host, + Port: config.port, + ConfigFile: config.serverConfig, + DontListen: config.dontListen, + StdoutLogging: config.stdoutLogging, + }) if err != nil { return nil, fmt.Errorf("failed to create embedded NATS server: %w", err) } @@ -191,7 +164,9 @@ func New(ctx context.Context, connection string, tlsInfo tls.Config) (server.Bac // Use the local server's client URL. config.clientURL = ns.ClientURL() - } else { + } + + if !config.dontListen { logrus.Infof("connecting to %s", config.clientURL) } @@ -233,8 +208,8 @@ func New(ctx context.Context, connection string, tlsInfo tls.Config) (server.Bac }, nil } -// parseNatsConnection returns nats connection url, bucketName and []nats.Option, error -func parseNatsConnection(dsn string, tlsInfo tls.Config) (*Config, error) { +// parseConnection returns nats connection url, bucketName and []nats.Option, error +func parseConnection(dsn string, tlsInfo tls.Config) (*Config, error) { config := &Config{ slowThreshold: defaultSlowMethod, revHistory: defaultRevHistory, @@ -250,6 +225,12 @@ func parseNatsConnection(dsn string, tlsInfo tls.Config) (*Config, error) { return nil, err } + // Extract the host and port if embedded server is used. + config.host = u.Hostname() + if u.Port() != "" { + config.port, _ = strconv.Atoi(u.Port()) + } + // Extract the query parameters to build configuration. queryMap, err := url.ParseQuery(u.RawQuery) if err != nil { @@ -350,9 +331,9 @@ func parseNatsConnection(dsn string, tlsInfo tls.Config) (*Config, error) { config.clientURL = connBuilder.String() - // If this option is set, consider the server-specific options. - if queryMap.Has("embedServer") || (natsserver.Embedded && dsn == "nats://") { - config.embedServer = true + // Config options only relevant if built with embedded NATS. + if natsserver.Embedded { + config.noEmbed = queryMap.Has("noEmbed") config.serverConfig = queryMap.Get("serverConfig") config.stdoutLogging = queryMap.Has("stdoutLogging") config.dontListen = queryMap.Has("dontListen") diff --git a/pkg/drivers/nats/server/interface.go b/pkg/drivers/nats/server/interface.go index 01a1397c..b23993f0 100644 --- a/pkg/drivers/nats/server/interface.go +++ b/pkg/drivers/nats/server/interface.go @@ -12,3 +12,11 @@ type Server interface { ReadyForConnections(wait time.Duration) bool InProcessConn() (net.Conn, error) } + +type Config struct { + Host string + Port int + ConfigFile string + DontListen bool + StdoutLogging bool +} diff --git a/pkg/drivers/nats/server/no_server.go b/pkg/drivers/nats/server/no_server.go index 73c9a1b0..9f1844ca 100644 --- a/pkg/drivers/nats/server/no_server.go +++ b/pkg/drivers/nats/server/no_server.go @@ -11,6 +11,6 @@ const ( Embedded = false ) -func New(_ string, _, _ bool) (Server, error) { +func New(*Config) (Server, error) { return nil, errors.New(`this binary is built without embedded NATS support, compile with "-tags nats"`) } diff --git a/pkg/drivers/nats/server/server.go b/pkg/drivers/nats/server/server.go index fb4c72be..89806f31 100644 --- a/pkg/drivers/nats/server/server.go +++ b/pkg/drivers/nats/server/server.go @@ -13,25 +13,35 @@ const ( Embedded = true ) -func New(configFile string, dontListen, stdoutLogging bool) (Server, error) { +func New(c *Config) (Server, error) { opts := &server.Options{} - if configFile == "" { - // TODO: Other defaults for easy single node config? - opts.JetStream = true - } else { + if c.ConfigFile != "" { // Parse the server config file as options. var err error - opts, err = server.ProcessConfigFile(configFile) + opts, err = server.ProcessConfigFile(c.ConfigFile) if err != nil { return nil, fmt.Errorf("failed to process NATS server config file: %w", err) } } - opts.DontListen = dontListen + // Note, if don't listen is set, host and port will be ignored. + opts.DontListen = c.DontListen + + // Only override host and port if set explicitly. + if c.Host != "" { + opts.Host = c.Host + } + if c.Port != 0 { + opts.Port = c.Port + } + + // TODO: Other defaults for embedded config? + // Explicitly set JetStream to true since we need the KV store. + opts.JetStream = true srv, err := server.NewServer(opts) - if stdoutLogging { + if c.StdoutLogging { srv.ConfigureLogger() } From 189720a18c1605f8bba4025a70951df36ff44bf2 Mon Sep 17 00:00:00 2001 From: Byron Ruth Date: Mon, 24 Apr 2023 11:03:42 -0400 Subject: [PATCH 18/18] Fix test script and update to 2.9.16 Signed-off-by: Byron Ruth --- scripts/test-run-nats | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/scripts/test-run-nats b/scripts/test-run-nats index 1cf5ba07..2b62eed4 100755 --- a/scripts/test-run-nats +++ b/scripts/test-run-nats @@ -7,7 +7,7 @@ start-test() { local image=$(cat $TEST_DIR/databases/*/metadata/image) DB_CONNECTION_TEST="docker container run --rm --name connection-test --entrypoint /usr/local/bin/nats docker.io/natsio/nats-box:0.13.8 server check connection --server=nats://$ip:$port" \ timeout --foreground 1m bash -c "wait-for-db-connection" - KINE_IMAGE=$IMAGE KINE_ENDPOINT="nats://$ip:$port" provision-kine + KINE_IMAGE=$IMAGE KINE_ENDPOINT="nats://$ip:$port?noEmbed" provision-kine local kine_url=$(cat $TEST_DIR/kine/*/metadata/url) K3S_DATASTORE_ENDPOINT=$kine_url provision-cluster } @@ -17,7 +17,7 @@ export -f start-test VERSION_LIST="\ nats 2.7.4 nats 2.8.4 - nats 2.9.15" + nats 2.9.16" while read ENGINE VERSION; do LABEL=$ENGINE-$VERSION DB_PASSWORD_ENV=NATS_JS_PASSWORD DB_ARGS="-js" DB_IMAGE=docker.io/library/$ENGINE:$VERSION run-test