diff --git a/.golangci.yaml b/.golangci.yaml
index ddff21fa..602941a1 100644
--- a/.golangci.yaml
+++ b/.golangci.yaml
@@ -112,10 +112,10 @@ linters-settings:
min-occurrences: 3
depguard:
rules:
- main:
- deny:
- - pkg: "github.com/golang/protobuf/proto"
- desc: not allowed
+ main:
+ deny:
+ - pkg: "github.com/golang/protobuf/proto"
+ desc: not allowed
misspell:
# Correct spellings using locale preferences for US or UK.
# Default is to use a neutral variety of English.
diff --git a/go.work b/go.work
index 0f32c94a..d2685597 100644
--- a/go.work
+++ b/go.work
@@ -1,4 +1,4 @@
-go 1.18
+go 1.21
use (
./v4/acme/certmagic
@@ -84,6 +84,7 @@ use (
./v4/store/memory
./v4/store/mysql
./v4/store/nats-js
+ ./v4/store/nats-js-kv
./v4/store/redis
./v4/sync/consul
./v4/sync/etcd
diff --git a/v4/store/nats-js-kv/README.md b/v4/store/nats-js-kv/README.md
new file mode 100644
index 00000000..84db5e6d
--- /dev/null
+++ b/v4/store/nats-js-kv/README.md
@@ -0,0 +1,79 @@
+# NATS JetStream Key Value Store Plugin
+
+This plugin uses the NATS JetStream [KeyValue Store](https://docs.nats.io/nats-concepts/jetstream/key-value-store) to implement the Go-Micro store interface.
+
+You can use this plugin like any other store plugin.
+To start a local NATS JetStream server run `nats-server -js`.
+
+To manually create a new storage object call:
+
+```go
+natsjskv.NewStore(opts ...store.Option)
+```
+
+The Go-Micro store interface uses databases and tables to store keys. These translate
+to buckets (key value stores) and key prefixes. If no database (bucket name) is provided, "default" will be used.
+
+You can call `Write` with any arbitrary database name, and if a bucket with that name does not exist yet,
+it will be automatically created.
+
+If a table name is provided, it will use it to prefix the key as `
_`.
+
+To delete a bucket, and all the key/value pairs in it, pass the `DeleteBucket` option to the `Delete`
+method, then they key name will be interpreted as a bucket name, and the bucket will be deleted.
+
+Next to the default store options, a few NATS specific options are available:
+
+
+```go
+// NatsOptions accepts nats.Options
+NatsOptions(opts nats.Options)
+
+// JetStreamOptions accepts multiple nats.JSOpt
+JetStreamOptions(opts ...nats.JSOpt)
+
+// KeyValueOptions accepts multiple nats.KeyValueConfig
+// This will create buckets with the provided configs at initialization.
+//
+// type KeyValueConfig struct {
+// Bucket string
+// Description string
+// MaxValueSize int32
+// History uint8
+// TTL time.Duration
+// MaxBytes int64
+// Storage StorageType
+// Replicas int
+// Placement *Placement
+// RePublish *RePublish
+// Mirror *StreamSource
+// Sources []*StreamSource
+}
+KeyValueOptions(cfg ...*nats.KeyValueConfig)
+
+// DefaultTTL sets the default TTL to use for new buckets
+// By default no TTL is set.
+//
+// TTL ON INDIVIDUAL WRITE CALLS IS NOT SUPPORTED, only bucket wide TTL.
+// Either set a default TTL with this option or provide bucket specific options
+// with ObjectStoreOptions
+DefaultTTL(ttl time.Duration)
+
+// DefaultMemory sets the default storage type to memory only.
+//
+// The default is file storage, persisting storage between service restarts.
+// Be aware that the default storage location of NATS the /tmp dir is, and thus
+// won't persist reboots.
+DefaultMemory()
+
+// DefaultDescription sets the default description to use when creating new
+// buckets. The default is "Store managed by go-micro"
+DefaultDescription(text string)
+
+// DeleteBucket will use the key passed to Delete as a bucket (database) name,
+// and delete the bucket.
+// This option should not be combined with the store.DeleteFrom option, as
+// that will overwrite the delete action.
+DeleteBucket()
+```
+
diff --git a/v4/store/nats-js-kv/context.go b/v4/store/nats-js-kv/context.go
new file mode 100644
index 00000000..e5753f24
--- /dev/null
+++ b/v4/store/nats-js-kv/context.go
@@ -0,0 +1,18 @@
+package natsjskv
+
+import (
+ "context"
+
+ "go-micro.dev/v4/store"
+)
+
+// setStoreOption returns a function to setup a context with given value.
+func setStoreOption(k, v interface{}) store.Option {
+ return func(o *store.Options) {
+ if o.Context == nil {
+ o.Context = context.Background()
+ }
+
+ o.Context = context.WithValue(o.Context, k, v)
+ }
+}
diff --git a/v4/store/nats-js-kv/go.mod b/v4/store/nats-js-kv/go.mod
new file mode 100644
index 00000000..736074ff
--- /dev/null
+++ b/v4/store/nats-js-kv/go.mod
@@ -0,0 +1,66 @@
+module github.com/go-micro/plugins/v4/store/nats-js-kv
+
+go 1.21
+
+require (
+ github.com/cornelk/hashmap v1.0.8
+ github.com/nats-io/nats-server/v2 v2.8.4
+)
+
+require (
+ github.com/Microsoft/go-winio v0.5.2 // indirect
+ github.com/ProtonMail/go-crypto v0.0.0-20220824120805-4b6e5c587895 // indirect
+ github.com/acomagu/bufpipe v1.0.3 // indirect
+ github.com/bitly/go-simplejson v0.5.0 // indirect
+ github.com/cloudflare/circl v1.2.0 // indirect
+ github.com/cpuguy83/go-md2man/v2 v2.0.2 // indirect
+ github.com/davecgh/go-spew v1.1.1 // indirect
+ github.com/emirpasic/gods v1.18.1 // indirect
+ github.com/fsnotify/fsnotify v1.5.4 // indirect
+ github.com/go-git/gcfg v1.5.0 // indirect
+ github.com/go-git/go-billy/v5 v5.3.1 // indirect
+ github.com/go-git/go-git/v5 v5.4.2 // indirect
+ github.com/golang/protobuf v1.5.2 // indirect
+ github.com/google/go-cmp v0.5.6 // indirect
+ github.com/imdario/mergo v0.3.13 // indirect
+ github.com/jbenet/go-context v0.0.0-20150711004518-d14ea06fba99 // indirect
+ github.com/kevinburke/ssh_config v1.2.0 // indirect
+ github.com/klauspost/compress v1.17.0 // indirect
+ github.com/minio/highwayhash v1.0.2 // indirect
+ github.com/mitchellh/go-homedir v1.1.0 // indirect
+ github.com/nats-io/jwt/v2 v2.2.1-0.20220330180145-442af02fd36a // indirect
+ github.com/nats-io/nkeys v0.4.5 // indirect
+ github.com/nats-io/nuid v1.0.1 // indirect
+ github.com/nxadm/tail v1.4.8 // indirect
+ github.com/oxtoacart/bpool v0.0.0-20190530202638-03653db5a59c // indirect
+ github.com/patrickmn/go-cache v2.1.0+incompatible // indirect
+ github.com/pmezard/go-difflib v1.0.0 // indirect
+ github.com/russross/blackfriday/v2 v2.1.0 // indirect
+ github.com/sergi/go-diff v1.2.0 // indirect
+ github.com/test-go/testify v1.1.4 // indirect
+ github.com/urfave/cli/v2 v2.14.0 // indirect
+ github.com/xanzy/ssh-agent v0.3.2 // indirect
+ github.com/xrash/smetrics v0.0.0-20201216005158-039620a65673 // indirect
+ golang.org/x/mod v0.8.0 // indirect
+ golang.org/x/sys v0.5.0 // indirect
+ golang.org/x/text v0.13.0 // indirect
+ golang.org/x/time v0.0.0-20211116232009-f0f3c7e86c11 // indirect
+ golang.org/x/tools v0.6.0 // indirect
+ golang.org/x/xerrors v0.0.0-20220609144429-65e65417b02f // indirect
+ gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 // indirect
+ gopkg.in/warnings.v0 v0.1.2 // indirect
+ gopkg.in/yaml.v3 v3.0.1 // indirect
+)
+
+require (
+ github.com/google/uuid v1.3.0
+ github.com/miekg/dns v1.1.50 // indirect
+ github.com/nats-io/nats.go v1.31.0
+ github.com/pkg/errors v0.9.1
+ github.com/stretchr/testify v1.7.1
+ go-micro.dev/v4 v4.9.0
+ golang.org/x/crypto v0.6.0 // indirect
+ golang.org/x/net v0.6.0 // indirect
+ golang.org/x/sync v0.1.0 // indirect
+ google.golang.org/protobuf v1.28.1 // indirect
+)
diff --git a/v4/store/nats-js-kv/go.sum b/v4/store/nats-js-kv/go.sum
new file mode 100644
index 00000000..b2e17e36
--- /dev/null
+++ b/v4/store/nats-js-kv/go.sum
@@ -0,0 +1,241 @@
+github.com/Microsoft/go-winio v0.4.14/go.mod h1:qXqCSQ3Xa7+6tgxaGTIe4Kpcdsi+P8jBhyzoq1bpyYA=
+github.com/Microsoft/go-winio v0.4.16/go.mod h1:XB6nPKklQyQ7GC9LdcBEcBl8PF76WugXOPRXwdLnMv0=
+github.com/Microsoft/go-winio v0.5.2 h1:a9IhgEQBCUEk6QCdml9CiJGhAws+YwffDHEMp1VMrpA=
+github.com/Microsoft/go-winio v0.5.2/go.mod h1:WpS1mjBmmwHBEWmogvA2mj8546UReBk4v8QkMxJ6pZY=
+github.com/ProtonMail/go-crypto v0.0.0-20210428141323-04723f9f07d7/go.mod h1:z4/9nQmJSSwwds7ejkxaJwO37dru3geImFUdJlaLzQo=
+github.com/ProtonMail/go-crypto v0.0.0-20220824120805-4b6e5c587895 h1:NsReiLpErIPzRrnogAXYwSoU7txA977LjDGrbkewJbg=
+github.com/ProtonMail/go-crypto v0.0.0-20220824120805-4b6e5c587895/go.mod h1:UBYPn8k0D56RtnR8RFQMjmh4KrZzWJ5o7Z9SYjossQ8=
+github.com/acomagu/bufpipe v1.0.3 h1:fxAGrHZTgQ9w5QqVItgzwj235/uYZYgbXitB+dLupOk=
+github.com/acomagu/bufpipe v1.0.3/go.mod h1:mxdxdup/WdsKVreO5GpW4+M/1CE2sMG4jeGJ2sYmHc4=
+github.com/anmitsu/go-shlex v0.0.0-20161002113705-648efa622239 h1:kFOfPq6dUM1hTo4JG6LR5AXSUEsOjtdm0kw0FtQtMJA=
+github.com/anmitsu/go-shlex v0.0.0-20161002113705-648efa622239/go.mod h1:2FmKhYUyUczH0OGQWaF5ceTx0UBShxjsH6f8oGKYe2c=
+github.com/armon/go-socks5 v0.0.0-20160902184237-e75332964ef5 h1:0CwZNZbxp69SHPdPJAN/hZIm0C4OItdklCFmMRWYpio=
+github.com/armon/go-socks5 v0.0.0-20160902184237-e75332964ef5/go.mod h1:wHh0iHkYZB8zMSxRWpUBQtwG5a7fFgvEO+odwuTv2gs=
+github.com/bitly/go-simplejson v0.5.0 h1:6IH+V8/tVMab511d5bn4M7EwGXZf9Hj6i2xSwkNEM+Y=
+github.com/bitly/go-simplejson v0.5.0/go.mod h1:cXHtHw4XUPsvGaxgjIAn8PhEWG9NfngEKAMDJEczWVA=
+github.com/bmizerany/assert v0.0.0-20160611221934-b7ed37b82869 h1:DDGfHa7BWjL4YnC6+E63dPcxHo2sUxDIu8g3QgEJdRY=
+github.com/bmizerany/assert v0.0.0-20160611221934-b7ed37b82869/go.mod h1:Ekp36dRnpXw/yCqJaO+ZrUyxD+3VXMFFr56k5XYrpB4=
+github.com/bwesterb/go-ristretto v1.2.0/go.mod h1:fUIoIZaG73pV5biE2Blr2xEzDoMj7NFEuV9ekS419A0=
+github.com/bwesterb/go-ristretto v1.2.1/go.mod h1:fUIoIZaG73pV5biE2Blr2xEzDoMj7NFEuV9ekS419A0=
+github.com/cloudflare/circl v1.1.0/go.mod h1:prBCrKB9DV4poKZY1l9zBXg2QJY7mvgRvtMxxK7fi4I=
+github.com/cloudflare/circl v1.2.0 h1:NheeISPSUcYftKlfrLuOo4T62FkmD4t4jviLfFFYaec=
+github.com/cloudflare/circl v1.2.0/go.mod h1:Ch2UgYr6ti2KTtlejELlROl0YIYj7SLjAC8M+INXlMk=
+github.com/cornelk/hashmap v1.0.8 h1:nv0AWgw02n+iDcawr5It4CjQIAcdMMKRrs10HOJYlrc=
+github.com/cornelk/hashmap v1.0.8/go.mod h1:RfZb7JO3RviW/rT6emczVuC/oxpdz4UsSB2LJSclR1k=
+github.com/cpuguy83/go-md2man/v2 v2.0.2 h1:p1EgwI/C7NhT0JmVkwCD2ZBK8j4aeHQX2pMHHBfMQ6w=
+github.com/cpuguy83/go-md2man/v2 v2.0.2/go.mod h1:tgQtvFlXSQOSOSIRvRPT7W67SCa46tRHOmNcaadrF8o=
+github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E=
+github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
+github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
+github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
+github.com/emirpasic/gods v1.12.0/go.mod h1:YfzfFFoVP/catgzJb4IKIqXjX78Ha8FMSDh3ymbK86o=
+github.com/emirpasic/gods v1.18.1 h1:FXtiHYKDGKCW2KzwZKx0iC0PQmdlorYgdFG9jPXJ1Bc=
+github.com/emirpasic/gods v1.18.1/go.mod h1:8tpGGwCnJ5H4r6BWwaV6OrWmMoPhUl5jm/FMNAnJvWQ=
+github.com/flynn/go-shlex v0.0.0-20150515145356-3f9db97f8568/go.mod h1:xEzjJPgXI435gkrCt3MPfRiAkVrwSbHsst4LCFVfpJc=
+github.com/fsnotify/fsnotify v1.4.9/go.mod h1:znqG4EE+3YCdAaPaxE2ZRY/06pZUdp0tY4IgpuI1SZQ=
+github.com/fsnotify/fsnotify v1.5.4 h1:jRbGcIw6P2Meqdwuo0H1p6JVLbL5DHKAKlYndzMwVZI=
+github.com/fsnotify/fsnotify v1.5.4/go.mod h1:OVB6XrOHzAwXMpEM7uPOzcehqUV2UqJxmVXmkdnm1bU=
+github.com/gliderlabs/ssh v0.2.2 h1:6zsha5zo/TWhRhwqCD3+EarCAgZ2yN28ipRnGPnwkI0=
+github.com/gliderlabs/ssh v0.2.2/go.mod h1:U7qILu1NlMHj9FlMhZLlkCdDnU1DBEAqr0aevW3Awn0=
+github.com/go-git/gcfg v1.5.0 h1:Q5ViNfGF8zFgyJWPqYwA7qGFoMTEiBmdlkcfRmpIMa4=
+github.com/go-git/gcfg v1.5.0/go.mod h1:5m20vg6GwYabIxaOonVkTdrILxQMpEShl1xiMF4ua+E=
+github.com/go-git/go-billy/v5 v5.2.0/go.mod h1:pmpqyWchKfYfrkb/UVH4otLvyi/5gJlGI4Hb3ZqZ3W0=
+github.com/go-git/go-billy/v5 v5.3.1 h1:CPiOUAzKtMRvolEKw+bG1PLRpT7D3LIs3/3ey4Aiu34=
+github.com/go-git/go-billy/v5 v5.3.1/go.mod h1:pmpqyWchKfYfrkb/UVH4otLvyi/5gJlGI4Hb3ZqZ3W0=
+github.com/go-git/go-git-fixtures/v4 v4.2.1 h1:n9gGL1Ct/yIw+nfsfr8s4+sbhT+Ncu2SubfXjIWgci8=
+github.com/go-git/go-git-fixtures/v4 v4.2.1/go.mod h1:K8zd3kDUAykwTdDCr+I0per6Y6vMiRR/nnVTBtavnB0=
+github.com/go-git/go-git/v5 v5.4.2 h1:BXyZu9t0VkbiHtqrsvdq39UDhGJTl1h55VW6CSC4aY4=
+github.com/go-git/go-git/v5 v5.4.2/go.mod h1:gQ1kArt6d+n+BGd+/B/I74HwRTLhth2+zti4ihgckDc=
+github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk=
+github.com/golang/protobuf v1.5.2 h1:ROPKBNFfQgOUMifHyP+KYbvpjbdoFNs+aK7DXlji0Tw=
+github.com/golang/protobuf v1.5.2/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY=
+github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU=
+github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
+github.com/google/go-cmp v0.5.6 h1:BKbKCqvP6I+rmFHt06ZmyQtvB8xAkWdhFyr0ZUNZcxQ=
+github.com/google/go-cmp v0.5.6/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
+github.com/google/uuid v1.3.0 h1:t6JiXgmwXMjEs8VusXIJk2BXHsn+wx8BZdTaoZ5fu7I=
+github.com/google/uuid v1.3.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
+github.com/imdario/mergo v0.3.12/go.mod h1:jmQim1M+e3UYxmgPu/WyfjB3N3VflVyUjjjwH0dnCYA=
+github.com/imdario/mergo v0.3.13 h1:lFzP57bqS/wsqKssCGmtLAb8A0wKjLGrve2q3PPVcBk=
+github.com/imdario/mergo v0.3.13/go.mod h1:4lJ1jqUDcsbIECGy0RUJAXNIhg+6ocWgb1ALK2O4oXg=
+github.com/jbenet/go-context v0.0.0-20150711004518-d14ea06fba99 h1:BQSFePA1RWJOlocH6Fxy8MmwDt+yVQYULKfN0RoTN8A=
+github.com/jbenet/go-context v0.0.0-20150711004518-d14ea06fba99/go.mod h1:1lJo3i6rXxKeerYnT8Nvf0QmHCRC1n8sfWVwXF2Frvo=
+github.com/jessevdk/go-flags v1.5.0/go.mod h1:Fw0T6WPc1dYxT4mKEZRfG5kJhaTDP9pj1c2EWnYs/m4=
+github.com/kevinburke/ssh_config v0.0.0-20201106050909-4977a11b4351/go.mod h1:CT57kijsi8u/K/BOFA39wgDQJ9CxiF4nAY/ojJ6r6mM=
+github.com/kevinburke/ssh_config v1.2.0 h1:x584FjTGwHzMwvHx18PXxbBVzfnxogHaAReU4gf13a4=
+github.com/kevinburke/ssh_config v1.2.0/go.mod h1:CT57kijsi8u/K/BOFA39wgDQJ9CxiF4nAY/ojJ6r6mM=
+github.com/klauspost/compress v1.14.4 h1:eijASRJcobkVtSt81Olfh7JX43osYLwy5krOJo6YEu4=
+github.com/klauspost/compress v1.14.4/go.mod h1:/3/Vjq9QcHkK5uEr5lBEmyoZ1iFhe47etQ6QUkpK6sk=
+github.com/klauspost/compress v1.17.0 h1:Rnbp4K9EjcDuVuHtd0dgA4qNuv9yKDYKK1ulpJwgrqM=
+github.com/klauspost/compress v1.17.0/go.mod h1:ntbaceVETuRiXiv4DpjP66DpAtAGkEQskQzEyD//IeE=
+github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=
+github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo=
+github.com/kr/pretty v0.2.1 h1:Fmg33tUaq4/8ym9TJN1x7sLJnHVwhP33CNkpYV/7rwI=
+github.com/kr/pretty v0.2.1/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI=
+github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
+github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
+github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
+github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
+github.com/matryer/is v1.2.0 h1:92UTHpy8CDwaJ08GqLDzhhuixiBUUD1p3AU6PHddz4A=
+github.com/matryer/is v1.2.0/go.mod h1:2fLPjFQM9rhQ15aVEtbuwhJinnOqrmgXPNdZsdwlWXA=
+github.com/miekg/dns v1.1.50 h1:DQUfb9uc6smULcREF09Uc+/Gd46YWqJd5DbpPE9xkcA=
+github.com/miekg/dns v1.1.50/go.mod h1:e3IlAVfNqAllflbibAZEWOXOQ+Ynzk/dDozDxY7XnME=
+github.com/minio/highwayhash v1.0.2 h1:Aak5U0nElisjDCfPSG79Tgzkn2gl66NxOMspRrKnA/g=
+github.com/minio/highwayhash v1.0.2/go.mod h1:BQskDq+xkJ12lmlUUi7U0M5Swg3EWR+dLTk+kldvVxY=
+github.com/mitchellh/go-homedir v1.1.0 h1:lukF9ziXFxDFPkA1vsr5zpc1XuPDn/wFntq5mG+4E0Y=
+github.com/mitchellh/go-homedir v1.1.0/go.mod h1:SfyaCUpYCn1Vlf4IUYiD9fPX4A5wJrkLzIz1N1q0pr0=
+github.com/nats-io/jwt/v2 v2.2.1-0.20220330180145-442af02fd36a h1:lem6QCvxR0Y28gth9P+wV2K/zYUUAkJ+55U8cpS0p5I=
+github.com/nats-io/jwt/v2 v2.2.1-0.20220330180145-442af02fd36a/go.mod h1:0tqz9Hlu6bCBFLWAASKhE5vUA4c24L9KPUUgvwumE/k=
+github.com/nats-io/nats-server/v2 v2.8.4 h1:0jQzze1T9mECg8YZEl8+WYUXb9JKluJfCBriPUtluB4=
+github.com/nats-io/nats-server/v2 v2.8.4/go.mod h1:8zZa+Al3WsESfmgSs98Fi06dRWLH5Bnq90m5bKD/eT4=
+github.com/nats-io/nats.go v1.16.0 h1:zvLE7fGBQYW6MWaFaRdsgm9qT39PJDQoju+DS8KsO1g=
+github.com/nats-io/nats.go v1.16.0/go.mod h1:BPko4oXsySz4aSWeFgOHLZs3G4Jq4ZAyE6/zMCxRT6w=
+github.com/nats-io/nats.go v1.31.0 h1:/WFBHEc/dOKBF6qf1TZhrdEfTmOZ5JzdJ+Y3m6Y/p7E=
+github.com/nats-io/nats.go v1.31.0/go.mod h1:di3Bm5MLsoB4Bx61CBTsxuarI36WbhAwOm8QrW39+i8=
+github.com/nats-io/nkeys v0.3.0 h1:cgM5tL53EvYRU+2YLXIK0G2mJtK12Ft9oeooSZMA2G8=
+github.com/nats-io/nkeys v0.3.0/go.mod h1:gvUNGjVcM2IPr5rCsRsC6Wb3Hr2CQAm08dsxtV6A5y4=
+github.com/nats-io/nkeys v0.4.5 h1:Zdz2BUlFm4fJlierwvGK+yl20IAKUm7eV6AAZXEhkPk=
+github.com/nats-io/nkeys v0.4.5/go.mod h1:XUkxdLPTufzlihbamfzQ7mw/VGx6ObUs+0bN5sNvt64=
+github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw=
+github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c=
+github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e/go.mod h1:zD1mROLANZcx1PVRCS0qkT7pwLkGfwJo4zjcN/Tysno=
+github.com/nxadm/tail v1.4.8 h1:nPr65rt6Y5JFSKQO7qToXr7pePgD6Gwiw05lkbyAQTE=
+github.com/nxadm/tail v1.4.8/go.mod h1:+ncqLTQzXmGhMZNUePPaPqPvBxHAIsmXswZKocGu+AU=
+github.com/oxtoacart/bpool v0.0.0-20190530202638-03653db5a59c h1:rp5dCmg/yLR3mgFuSOe4oEnDDmGLROTvMragMUXpTQw=
+github.com/oxtoacart/bpool v0.0.0-20190530202638-03653db5a59c/go.mod h1:X07ZCGwUbLaax7L0S3Tw4hpejzu63ZrrQiUe6W0hcy0=
+github.com/patrickmn/go-cache v2.1.0+incompatible h1:HRMgzkcYKYpi3C8ajMPV8OFXaaRUnok+kx1WdO15EQc=
+github.com/patrickmn/go-cache v2.1.0+incompatible/go.mod h1:3Qf8kWWT7OJRJbdiICTKqZju1ZixQ/KpMGzzAfe6+WQ=
+github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
+github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
+github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
+github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
+github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
+github.com/russross/blackfriday/v2 v2.1.0 h1:JIOH55/0cWyOuilr9/qlrm0BSXldqnqwMsf35Ld67mk=
+github.com/russross/blackfriday/v2 v2.1.0/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM=
+github.com/sergi/go-diff v1.1.0/go.mod h1:STckp+ISIX8hZLjrqAeVduY0gWCT9IjLuqbuNXdaHfM=
+github.com/sergi/go-diff v1.2.0 h1:XU+rvMAioB0UC3q1MFrIQy4Vo5/4VsRDQQXHsEya6xQ=
+github.com/sergi/go-diff v1.2.0/go.mod h1:STckp+ISIX8hZLjrqAeVduY0gWCT9IjLuqbuNXdaHfM=
+github.com/sirupsen/logrus v1.4.1/go.mod h1:ni0Sbl8bgC9z8RoU9G6nDWqqs/fq4eDPysMBDgk/93Q=
+github.com/sirupsen/logrus v1.7.0/go.mod h1:yWOB1SBYBC5VeMP7gHvWumXLIWorT60ONWic61uBYv0=
+github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
+github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
+github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs=
+github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
+github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4=
+github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
+github.com/stretchr/testify v1.7.1 h1:5TQK59W5E3v0r2duFAb7P95B6hEeOyEnHRa8MjYSMTY=
+github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
+github.com/test-go/testify v1.1.4 h1:Tf9lntrKUMHiXQ07qBScBTSA0dhYQlu83hswqelv1iE=
+github.com/test-go/testify v1.1.4/go.mod h1:rH7cfJo/47vWGdi4GPj16x3/t1xGOj2YxzmNQzk2ghU=
+github.com/urfave/cli/v2 v2.14.0 h1:sFRL29Dm9JhXSMYb96raDeo/Q/JRyPXPs8u+4CkMlI8=
+github.com/urfave/cli/v2 v2.14.0/go.mod h1:1CNUng3PtjQMtRzJO4FMXBQvkGtuYRxxiR9xMa7jMwI=
+github.com/xanzy/ssh-agent v0.3.0/go.mod h1:3s9xbODqPuuhK9JV1R321M/FlMZSBvE5aY6eAcqrDh0=
+github.com/xanzy/ssh-agent v0.3.2 h1:eKj4SX2Fe7mui28ZgnFW5fmTz1EIr7ugo5s6wDxdHBM=
+github.com/xanzy/ssh-agent v0.3.2/go.mod h1:6dzNDKs0J9rVPHPhaGCukekBHKqfl+L3KghI1Bc68Uw=
+github.com/xrash/smetrics v0.0.0-20201216005158-039620a65673 h1:bAn7/zixMGCfxrRTfdpNzjtPYqr8smhKouy9mxVdGPU=
+github.com/xrash/smetrics v0.0.0-20201216005158-039620a65673/go.mod h1:N3UwUGtsrSj3ccvlPHLoLsHnpR27oXr4ZE984MbSER8=
+github.com/yuin/goldmark v1.3.5/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k=
+go-micro.dev/v4 v4.9.0 h1:pd1CpqMT9hA47jSmX8mfdGK865PkMh95Rwj5RdfqPqE=
+go-micro.dev/v4 v4.9.0/go.mod h1:Ju8HrZ5hQSF+QguZ2QUs9Kbe42MHP1tJa/fpP5g07Cs=
+golang.org/x/crypto v0.0.0-20190219172222-a4c6cb3142f2/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4=
+golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
+golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
+golang.org/x/crypto v0.0.0-20210314154223-e6e6c4f2bb5b/go.mod h1:T9bdIzuCu7OtxOm1hfPfRQxPLYneinmdGuTeoZ9dtd4=
+golang.org/x/crypto v0.0.0-20210322153248-0c34fe9e7dc2/go.mod h1:T9bdIzuCu7OtxOm1hfPfRQxPLYneinmdGuTeoZ9dtd4=
+golang.org/x/crypto v0.0.0-20210421170649-83a5a9bb288b/go.mod h1:T9bdIzuCu7OtxOm1hfPfRQxPLYneinmdGuTeoZ9dtd4=
+golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc=
+golang.org/x/crypto v0.0.0-20220315160706-3147a52a75dd/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4=
+golang.org/x/crypto v0.0.0-20220622213112-05595931fe9d/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4=
+golang.org/x/crypto v0.0.0-20220829220503-c86fa9a7ed90 h1:Y/gsMcFOcR+6S6f3YeMKl5g+dZMEWqcz5Czj/GWYbkM=
+golang.org/x/crypto v0.0.0-20220829220503-c86fa9a7ed90/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4=
+golang.org/x/crypto v0.6.0 h1:qfktjS5LUO+fFKeJXZ+ikTRijMmljikvG68fpMMruSc=
+golang.org/x/crypto v0.6.0/go.mod h1:OFC/31mSvZgRz0V1QTNCzfAI1aIRzbiufJtkMIlEp58=
+golang.org/x/mod v0.4.2/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
+golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4 h1:6zppjxzCulZykYSLyVDYbneBfbaBIQPYMevg0bEwv2s=
+golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4/go.mod h1:jJ57K6gSWd91VN4djpZkiMVwK6gcyfeH4XE8wZrZaV4=
+golang.org/x/mod v0.8.0/go.mod h1:iBbtSCu2XBx23ZKBPSOrRkjjQPZFPuis4dIYUhu/chs=
+golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
+golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
+golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg=
+golang.org/x/net v0.0.0-20210326060303-6b1517762897/go.mod h1:uSPa2vr4CLtc/ILN5odXGNXS6mhrKVzTaCXzk9m6W3k=
+golang.org/x/net v0.0.0-20210405180319-a5a99cb37ef4/go.mod h1:p54w0d4576C0XHj96bSt6lcn1PtDYWL6XObtHCRCNQM=
+golang.org/x/net v0.0.0-20210726213435-c6fcb2dbf985/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
+golang.org/x/net v0.0.0-20211112202133-69e39bad7dc2/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
+golang.org/x/net v0.0.0-20220826154423-83b083e8dc8b h1:ZmngSVLe/wycRns9MKikG9OWIEjGcGAkacif7oYQaUY=
+golang.org/x/net v0.0.0-20220826154423-83b083e8dc8b/go.mod h1:YDH+HFinaLZZlnHAfSS6ZXJJ9M9t4Dl22yv3iI2vPwk=
+golang.org/x/net v0.6.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs=
+golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
+golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
+golang.org/x/sync v0.0.0-20220819030929-7fc1605a5dde h1:ejfdSekXMDxDLbRrJMwUk6KnSLZ2McaUCVcIKM+N6jc=
+golang.org/x/sync v0.0.0-20220819030929-7fc1605a5dde/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
+golang.org/x/sync v0.1.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
+golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
+golang.org/x/sys v0.0.0-20190130150945-aca44879d564/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
+golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
+golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
+golang.org/x/sys v0.0.0-20190507160741-ecd444e8653b/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
+golang.org/x/sys v0.0.0-20190916202348-b4ddaad3f8a3/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
+golang.org/x/sys v0.0.0-20191005200804-aed5e4c7ecf9/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
+golang.org/x/sys v0.0.0-20191026070338-33540a1f6037/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
+golang.org/x/sys v0.0.0-20200302150141-5c8b2ff67527/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
+golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
+golang.org/x/sys v0.0.0-20210124154548-22da62e12c0c/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
+golang.org/x/sys v0.0.0-20210320140829-1e4c9ba3b0c4/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
+golang.org/x/sys v0.0.0-20210324051608-47abb6519492/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
+golang.org/x/sys v0.0.0-20210330210617-4fbd30eecc44/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
+golang.org/x/sys v0.0.0-20210423082822-04245dca01da/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
+golang.org/x/sys v0.0.0-20210502180810-71e4cd670f79/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
+golang.org/x/sys v0.0.0-20210510120138-977fb7262007/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
+golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
+golang.org/x/sys v0.0.0-20210630005230-0f9fa26af87c/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
+golang.org/x/sys v0.0.0-20211007075335-d3039528d8ac/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
+golang.org/x/sys v0.0.0-20220315194320-039c03cc5b86/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
+golang.org/x/sys v0.0.0-20220412211240-33da011f77ad/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
+golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
+golang.org/x/sys v0.0.0-20220829200755-d48e67d00261 h1:v6hYoSR9T5oet+pMXwUWkbiVqx/63mlHjefrHmxwfeY=
+golang.org/x/sys v0.0.0-20220829200755-d48e67d00261/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
+golang.org/x/sys v0.5.0 h1:MUK/U/4lj1t1oPg0HfuXDN/Z1wv31ZJ/YcPiGccS4DU=
+golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
+golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
+golang.org/x/term v0.0.0-20210927222741-03fcf44c2211 h1:JGgROgKl9N8DuW20oFS5gxc+lE67/N3FcwmBPMe7ArY=
+golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8=
+golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
+golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
+golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
+golang.org/x/text v0.3.7 h1:olpwvP2KacW1ZWvsR7uQhoyTYvKAupfQrRGBFM352Gk=
+golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ=
+golang.org/x/text v0.13.0/go.mod h1:TvPlkZtksWOMsz7fbANvkp4WM8x/WCo/om8BMLbz+aE=
+golang.org/x/time v0.0.0-20211116232009-f0f3c7e86c11 h1:GZokNIeuVkl3aZHJchRrr13WCsols02MLUcz1U9is6M=
+golang.org/x/time v0.0.0-20211116232009-f0f3c7e86c11/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
+golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
+golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
+golang.org/x/tools v0.1.6-0.20210726203631-07bc1bf47fb2/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk=
+golang.org/x/tools v0.1.12 h1:VveCTK38A2rkS8ZqFY25HIDFscX5X9OoEhJd3quQmXU=
+golang.org/x/tools v0.1.12/go.mod h1:hNGJHUnrk76NpqgfD5Aqm5Crs+Hm0VOH/i9J2+nxYbc=
+golang.org/x/tools v0.6.0/go.mod h1:Xwgl3UAJ/d3gWutnCtw505GrjyAbvKui8lOU390QaIU=
+golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
+golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
+golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
+golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
+golang.org/x/xerrors v0.0.0-20220609144429-65e65417b02f h1:uF6paiQQebLeSXkrTqHqz0MXhXXS1KgF41eUdBNvxK0=
+golang.org/x/xerrors v0.0.0-20220609144429-65e65417b02f/go.mod h1:K8+ghG5WaK9qNqU5K3HdILfMLy1f3aNYFI/wnl100a8=
+google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw=
+google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc=
+google.golang.org/protobuf v1.28.1 h1:d0NfwRgPtno5B1Wa6L2DAG+KivqkdutMf1UhdNx175w=
+google.golang.org/protobuf v1.28.1/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I=
+gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
+gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
+gopkg.in/check.v1 v1.0.0-20200227125254-8fa46927fb4f/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
+gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk=
+gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q=
+gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 h1:uRGJdciOHaEIrze2W8Q3AKkepLTh2hOroT7a+7czfdQ=
+gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7/go.mod h1:dt/ZhP58zS4L8KSrWDmTeBkI65Dw0HsyUHuEVlX15mw=
+gopkg.in/warnings.v0 v0.1.2 h1:wFXVbFY8DY5/xOe1ECiWdKCzZlxgshcYVNkBHstARME=
+gopkg.in/warnings.v0 v0.1.2/go.mod h1:jksf8JmL6Qr/oQM2OXTHunEvvTAsrWBLb6OOjuVWRNI=
+gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
+gopkg.in/yaml.v2 v2.2.4/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
+gopkg.in/yaml.v2 v2.3.0/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
+gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
+gopkg.in/yaml.v3 v3.0.0/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
+gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
+gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
diff --git a/v4/store/nats-js-kv/helpers_test.go b/v4/store/nats-js-kv/helpers_test.go
new file mode 100644
index 00000000..eff394bf
--- /dev/null
+++ b/v4/store/nats-js-kv/helpers_test.go
@@ -0,0 +1,184 @@
+package natsjskv
+
+import (
+ "context"
+ "fmt"
+ "net"
+ "os"
+ "path/filepath"
+ "strconv"
+ "strings"
+ "testing"
+ "time"
+
+ nserver "github.com/nats-io/nats-server/v2/server"
+ "github.com/pkg/errors"
+ "github.com/test-go/testify/require"
+ "go-micro.dev/v4/store"
+)
+
+func testSetup(ctx context.Context, t *testing.T, opts ...store.Option) store.Store {
+ t.Helper()
+
+ var err error
+ var s store.Store
+ for i := 0; i < 5; i++ {
+ nCtx, cancel := context.WithCancel(ctx)
+ addr := startNatsServer(nCtx, t)
+
+ opts = append(opts, store.Nodes(addr), EncodeKeys())
+ s = NewStore(opts...)
+
+ err = s.Init()
+ if err != nil {
+ t.Log(errors.Wrap(err, "Error: Server initialization failed, restarting server"))
+ cancel()
+ if err = s.Close(); err != nil {
+ t.Logf("Failed to close store: %v", err)
+ }
+ time.Sleep(time.Second)
+ continue
+ }
+
+ go func() {
+ <-ctx.Done()
+ cancel()
+ if err = s.Close(); err != nil {
+ t.Logf("Failed to close store: %v", err)
+ }
+ }()
+
+ return s
+ }
+ t.Error(errors.Wrap(err, "Store initialization failed"))
+ return s
+}
+
+func startNatsServer(ctx context.Context, t *testing.T) string {
+ t.Helper()
+ natsAddr := getFreeLocalhostAddress()
+ natsPort, err := strconv.Atoi(strings.Split(natsAddr, ":")[1])
+ if err != nil {
+ t.Logf("Failed to parse port from address: %v", err)
+ }
+
+ clusterName := "gomicro-store-test-cluster"
+
+ // start the NATS with JetStream server
+ go natsServer(ctx,
+ t,
+ &nserver.Options{
+ Host: strings.Split(natsAddr, ":")[0],
+ Port: natsPort,
+ Cluster: nserver.ClusterOpts{
+ Name: clusterName,
+ },
+ },
+ )
+
+ time.Sleep(2 * time.Second)
+
+ return natsAddr
+}
+
+func getFreeLocalhostAddress() string {
+ l, err := net.Listen("tcp", "127.0.0.1:0")
+ if err != nil {
+ return ""
+ }
+
+ addr := l.Addr().String()
+ if err := l.Close(); err != nil {
+ return addr
+ }
+ return addr
+}
+
+func natsServer(ctx context.Context, t *testing.T, opts *nserver.Options) {
+ t.Helper()
+
+ opts.TLSTimeout = 180
+ server, err := nserver.NewServer(
+ opts,
+ )
+ require.NoError(t, err)
+ if err != nil {
+ return
+ }
+ defer server.Shutdown()
+
+ server.SetLoggerV2(
+ NewLogWrapper(),
+ false, false, false,
+ )
+
+ tmpdir := t.TempDir()
+ natsdir := filepath.Join(tmpdir, "nats-js")
+ jsConf := &nserver.JetStreamConfig{
+ StoreDir: natsdir,
+ }
+
+ // first start NATS
+ go server.Start()
+ time.Sleep(time.Second)
+
+ // second start JetStream
+ err = server.EnableJetStream(jsConf)
+ require.NoError(t, err)
+ if err != nil {
+ return
+ }
+
+ // This fixes some issues where tests fail because directory cleanup fails
+ t.Cleanup(func() {
+ contents, err := filepath.Glob(natsdir + "/*")
+ if err != nil {
+ t.Logf("Failed to glob directory: %v", err)
+ }
+ for _, item := range contents {
+ if err := os.RemoveAll(item); err != nil {
+ t.Logf("Failed to remove file: %v", err)
+ }
+ }
+ if err := os.RemoveAll(natsdir); err != nil {
+ t.Logf("Failed to remove directory: %v", err)
+ }
+ })
+
+ <-ctx.Done()
+}
+
+func NewLogWrapper() *LogWrapper {
+ return &LogWrapper{}
+}
+
+type LogWrapper struct {
+}
+
+// Noticef logs a notice statement.
+func (l *LogWrapper) Noticef(_ string, _ ...interface{}) {
+}
+
+// Warnf logs a warning statement.
+func (l *LogWrapper) Warnf(format string, v ...interface{}) {
+ fmt.Printf(format+"\n", v...)
+}
+
+// Fatalf logs a fatal statement.
+func (l *LogWrapper) Fatalf(format string, v ...interface{}) {
+ fmt.Printf(format+"\n", v...)
+}
+
+// Errorf logs an error statement.
+func (l *LogWrapper) Errorf(format string, v ...interface{}) {
+ fmt.Printf(format+"\n", v...)
+}
+
+// Debugf logs a debug statement.
+func (l *LogWrapper) Debugf(_ string, _ ...interface{}) {
+}
+
+// Tracef logs a trace statement.
+func (l *LogWrapper) Tracef(format string, v ...interface{}) {
+ fmt.Printf(format+"\n", v...)
+}
diff --git a/v4/store/nats-js-kv/keys.go b/v4/store/nats-js-kv/keys.go
new file mode 100644
index 00000000..8eb2869c
--- /dev/null
+++ b/v4/store/nats-js-kv/keys.go
@@ -0,0 +1,119 @@
+package natsjskv
+
+import (
+ "encoding/base32"
+ "strings"
+)
+
+// NatsKey is a convenience function to create a key for the nats kv store.
+func (n *natsStore) NatsKey(table, microkey string) string {
+ return n.NewKey(table, microkey, "").NatsKey()
+}
+
+// MicroKey is a convenience function to create a key for the micro interface.
+func (n *natsStore) MicroKey(table, natskey string) string {
+ return n.NewKey(table, "", natskey).MicroKey()
+}
+
+// MicroKeyFilter is a convenience function to create a key for the micro interface.
+// It returns false if the key does not match the table, prefix or suffix.
+func (n *natsStore) MicroKeyFilter(table, natskey string, prefix, suffix string) (string, bool) {
+ k := n.NewKey(table, "", natskey)
+ return k.MicroKey(), k.Check(table, prefix, suffix)
+}
+
+// Key represents a key in the store.
+// They are used to convert nats keys (base32 encoded) to micro keys (plain text - no table prefix) and vice versa.
+type Key struct {
+ // Plain is the plain key as requested by the go-micro interface.
+ Plain string
+ // Full is the full key including the table prefix.
+ Full string
+ // Encoded is the base64 encoded key as used by the nats kv store.
+ Encoded string
+}
+
+// NewKey creates a new key. Either plain or encoded must be set.
+func (n *natsStore) NewKey(table string, plain, encoded string) *Key {
+ k := &Key{
+ Plain: plain,
+ Encoded: encoded,
+ }
+
+ switch {
+ case k.Plain != "":
+ k.Full = getKey(k.Plain, table)
+ k.Encoded = encode(k.Full, n.encoding)
+ case k.Encoded != "":
+ k.Full = decode(k.Encoded, n.encoding)
+ k.Plain = trimKey(k.Full, table)
+ }
+
+ return k
+}
+
+// NatsKey returns a key the nats kv store can work with.
+func (k *Key) NatsKey() string {
+ return k.Encoded
+}
+
+// MicroKey returns a key the micro interface can work with.
+func (k *Key) MicroKey() string {
+ return k.Plain
+}
+
+// Check returns false if the key does not match the table, prefix or suffix.
+func (k *Key) Check(table, prefix, suffix string) bool {
+ if table != "" && k.Full != getKey(k.Plain, table) {
+ return false
+ }
+
+ if prefix != "" && !strings.HasPrefix(k.Plain, prefix) {
+ return false
+ }
+
+ if suffix != "" && !strings.HasSuffix(k.Plain, suffix) {
+ return false
+ }
+
+ return true
+}
+
+func encode(s string, alg string) string {
+ switch alg {
+ case "base32":
+ return base32.StdEncoding.EncodeToString([]byte(s))
+ default:
+ return s
+ }
+}
+
+func decode(s string, alg string) string {
+ switch alg {
+ case "base32":
+ b, err := base32.StdEncoding.DecodeString(s)
+ if err != nil {
+ return s
+ }
+
+ return string(b)
+ default:
+ return s
+ }
+}
+
+func getKey(key, table string) string {
+ if table != "" {
+ return table + "_" + key
+ }
+
+ return key
+}
+
+func trimKey(key, table string) string {
+ if table != "" {
+ return strings.TrimPrefix(key, table+"_")
+ }
+
+ return key
+}
diff --git a/v4/store/nats-js-kv/nats.go b/v4/store/nats-js-kv/nats.go
new file mode 100644
index 00000000..13ae81d2
--- /dev/null
+++ b/v4/store/nats-js-kv/nats.go
@@ -0,0 +1,484 @@
+// Package natsjskv is a go-micro store plugin for NATS JetStream Key-Value store.
+package natsjskv
+
+import (
+ "context"
+ "encoding/json"
+ "sync"
+ "time"
+
+ "github.com/cornelk/hashmap"
+ "github.com/nats-io/nats.go"
+ "github.com/pkg/errors"
+ "go-micro.dev/v4/store"
+ "go-micro.dev/v4/util/cmd"
+)
+
+var (
+ // ErrBucketNotFound is returned when the requested bucket does not exist.
+ ErrBucketNotFound = errors.New("Bucket (database) not found")
+)
+
+// KeyValueEnvelope is the data structure stored in the key value store.
+type KeyValueEnvelope struct {
+ Key string `json:"key"`
+ Data []byte `json:"data"`
+ Metadata map[string]interface{} `json:"metadata"`
+}
+
+type natsStore struct {
+ sync.Once
+ sync.RWMutex
+
+ encoding string
+ ttl time.Duration
+ storageType nats.StorageType
+ description string
+
+ opts store.Options
+ nopts nats.Options
+ jsopts []nats.JSOpt
+ kvConfigs []*nats.KeyValueConfig
+
+ conn *nats.Conn
+ js nats.JetStreamContext
+ buckets *hashmap.Map[string, nats.KeyValue]
+}
+
+func init() {
+ cmd.DefaultStores["natsjskv"] = NewStore
+}
+
+// NewStore will create a new NATS JetStream Object Store.
+func NewStore(opts ...store.Option) store.Store {
+ options := store.Options{
+ Nodes: []string{},
+ Database: "default",
+ Table: "",
+ Context: context.Background(),
+ }
+
+ n := &natsStore{
+ description: "KeyValue storage administered by go-micro store plugin",
+ opts: options,
+ jsopts: []nats.JSOpt{},
+ kvConfigs: []*nats.KeyValueConfig{},
+ buckets: hashmap.New[string, nats.KeyValue](),
+ storageType: nats.FileStorage,
+ }
+
+ n.setOption(opts...)
+
+ return n
+}
+
+// Init initializes the store. It must perform any required setup on the
+// backing storage implementation and check that it is ready for use,
+// returning any errors.
+func (n *natsStore) Init(opts ...store.Option) error {
+ n.setOption(opts...)
+
+ // Connect to NATS servers
+ conn, err := n.nopts.Connect()
+ if err != nil {
+ return errors.Wrap(err, "Failed to connect to NATS Server")
+ }
+
+ // Create JetStream context
+ js, err := conn.JetStream(n.jsopts...)
+ if err != nil {
+ return errors.Wrap(err, "Failed to create JetStream context")
+ }
+
+ n.conn = conn
+ n.js = js
+
+ // Create default config if no configs present
+ if len(n.kvConfigs) == 0 {
+ if _, err := n.mustGetBucketByName(n.opts.Database); err != nil {
+ return err
+ }
+ }
+
+ // Create kv store buckets
+ for _, cfg := range n.kvConfigs {
+ if _, err := n.mustGetBucket(cfg); err != nil {
+ return err
+ }
+ }
+
+ return nil
+}
+
+func (n *natsStore) setOption(opts ...store.Option) {
+ for _, o := range opts {
+ o(&n.opts)
+ }
+
+ n.Once.Do(func() {
+ n.nopts = nats.GetDefaultOptions()
+ })
+
+ // Extract options from context
+ if nopts, ok := n.opts.Context.Value(natsOptionsKey{}).(nats.Options); ok {
+ n.nopts = nopts
+ }
+
+ if jsopts, ok := n.opts.Context.Value(jsOptionsKey{}).([]nats.JSOpt); ok {
+ n.jsopts = append(n.jsopts, jsopts...)
+ }
+
+ if cfg, ok := n.opts.Context.Value(kvOptionsKey{}).([]*nats.KeyValueConfig); ok {
+ n.kvConfigs = append(n.kvConfigs, cfg...)
+ }
+
+ if ttl, ok := n.opts.Context.Value(ttlOptionsKey{}).(time.Duration); ok {
+ n.ttl = ttl
+ }
+
+ if sType, ok := n.opts.Context.Value(memoryOptionsKey{}).(nats.StorageType); ok {
+ n.storageType = sType
+ }
+
+ if text, ok := n.opts.Context.Value(descriptionOptionsKey{}).(string); ok {
+ n.description = text
+ }
+
+ if encoding, ok := n.opts.Context.Value(keyEncodeOptionsKey{}).(string); ok {
+ n.encoding = encoding
+ }
+
+ // Assign store option server addresses to nats options
+ if len(n.opts.Nodes) > 0 {
+ n.nopts.Url = ""
+ n.nopts.Servers = n.opts.Nodes
+ }
+
+ if len(n.nopts.Servers) == 0 && n.nopts.Url == "" {
+ n.nopts.Url = nats.DefaultURL
+ }
+}
+
+// Options allows you to view the current options.
+func (n *natsStore) Options() store.Options {
+ return n.opts
+}
+
+// Read takes a single key name and optional ReadOptions. It returns matching []*Record or an error.
+func (n *natsStore) Read(key string, opts ...store.ReadOption) ([]*store.Record, error) {
+ if err := n.initConn(); err != nil {
+ return nil, err
+ }
+
+ opt := store.ReadOptions{}
+
+ for _, o := range opts {
+ o(&opt)
+ }
+
+ if opt.Database == "" {
+ opt.Database = n.opts.Database
+ }
+
+ if opt.Table == "" {
+ opt.Table = n.opts.Table
+ }
+
+ bucket, ok := n.buckets.Get(opt.Database)
+ if !ok {
+ return nil, ErrBucketNotFound
+ }
+
+ keys, err := n.natsKeys(bucket, opt.Table, key, opt.Prefix, opt.Suffix)
+ if err != nil {
+ return nil, err
+ }
+
+ records := make([]*store.Record, 0, len(keys))
+
+ for _, key := range keys {
+ rec, ok, err := n.getRecord(bucket, key)
+ if err != nil {
+ return nil, err
+ }
+
+ if ok {
+ records = append(records, rec)
+ }
+ }
+
+ return enforceLimits(records, opt.Limit, opt.Offset), nil
+}
+
+// Write writes a record to the store, and returns an error if the record was not written.
+func (n *natsStore) Write(rec *store.Record, opts ...store.WriteOption) error {
+ if err := n.initConn(); err != nil {
+ return err
+ }
+
+ opt := store.WriteOptions{}
+ for _, o := range opts {
+ o(&opt)
+ }
+
+ if opt.Database == "" {
+ opt.Database = n.opts.Database
+ }
+
+ if opt.Table == "" {
+ opt.Table = n.opts.Table
+ }
+
+ store, err := n.mustGetBucketByName(opt.Database)
+ if err != nil {
+ return err
+ }
+
+ b, err := json.Marshal(KeyValueEnvelope{
+ Key: rec.Key,
+ Data: rec.Value,
+ Metadata: rec.Metadata,
+ })
+ if err != nil {
+ return errors.Wrap(err, "Failed to marshal object")
+ }
+
+ if _, err := store.Put(n.NatsKey(opt.Table, rec.Key), b); err != nil {
+ return errors.Wrapf(err, "Failed to store data in bucket '%s'", n.NatsKey(opt.Table, rec.Key))
+ }
+
+ return nil
+}
+
+// Delete removes the record with the corresponding key from the store.
+func (n *natsStore) Delete(key string, opts ...store.DeleteOption) error {
+ if err := n.initConn(); err != nil {
+ return err
+ }
+
+ opt := store.DeleteOptions{}
+
+ for _, o := range opts {
+ o(&opt)
+ }
+
+ if opt.Database == "" {
+ opt.Database = n.opts.Database
+ }
+
+ if opt.Table == "" {
+ opt.Table = n.opts.Table
+ }
+
+ if opt.Table == "DELETE_BUCKET" {
+ n.buckets.Del(key)
+
+ if err := n.js.DeleteKeyValue(key); err != nil {
+ return errors.Wrap(err, "Failed to delete bucket")
+ }
+
+ return nil
+ }
+
+ store, ok := n.buckets.Get(opt.Database)
+ if !ok {
+ return ErrBucketNotFound
+ }
+
+ if err := store.Delete(n.NatsKey(opt.Table, key)); err != nil {
+ return errors.Wrap(err, "Failed to delete data")
+ }
+
+ return nil
+}
+
+// List returns any keys that match, or an empty list with no error if none matched.
+func (n *natsStore) List(opts ...store.ListOption) ([]string, error) {
+ if err := n.initConn(); err != nil {
+ return nil, err
+ }
+
+ opt := store.ListOptions{}
+ for _, o := range opts {
+ o(&opt)
+ }
+
+ if opt.Database == "" {
+ opt.Database = n.opts.Database
+ }
+
+ if opt.Table == "" {
+ opt.Table = n.opts.Table
+ }
+
+ store, ok := n.buckets.Get(opt.Database)
+ if !ok {
+ return nil, ErrBucketNotFound
+ }
+
+ keys, err := n.microKeys(store, opt.Table, opt.Prefix, opt.Suffix)
+ if err != nil {
+ return nil, errors.Wrap(err, "Failed to list keys in bucket")
+ }
+
+ return enforceLimits(keys, opt.Limit, opt.Offset), nil
+}
+
+// Close the store.
+func (n *natsStore) Close() error {
+ n.conn.Close()
+ return nil
+}
+
+// String returns the name of the implementation.
+func (n *natsStore) String() string {
+ return "NATS JetStream KeyValueStore"
+}
+
+// thread safe way to initialize the connection.
+func (n *natsStore) initConn() error {
+ if n.hasConn() {
+ return nil
+ }
+
+ n.Lock()
+ defer n.Unlock()
+
+ // check if conn was initialized meanwhile
+ if n.conn != nil {
+ return nil
+ }
+
+ return n.Init()
+}
+
+// thread safe way to check if n is initialized.
+func (n *natsStore) hasConn() bool {
+ n.RLock()
+ defer n.RUnlock()
+
+ return n.conn != nil
+}
+
+// mustGetDefaultBucket returns the bucket with the given name creating it with default configuration if needed.
+func (n *natsStore) mustGetBucketByName(name string) (nats.KeyValue, error) {
+ return n.mustGetBucket(&nats.KeyValueConfig{
+ Bucket: name,
+ Description: n.description,
+ TTL: n.ttl,
+ Storage: n.storageType,
+ })
+}
+
+// mustGetBucket creates a new bucket if it does not exist yet.
+func (n *natsStore) mustGetBucket(kv *nats.KeyValueConfig) (nats.KeyValue, error) {
+ if store, ok := n.buckets.Get(kv.Bucket); ok {
+ return store, nil
+ }
+
+ store, err := n.js.KeyValue(kv.Bucket)
+ if err != nil {
+ if !errors.Is(err, nats.ErrBucketNotFound) {
+ return nil, errors.Wrapf(err, "Failed to get bucket (%s)", kv.Bucket)
+ }
+
+ store, err = n.js.CreateKeyValue(kv)
+ if err != nil {
+ return nil, errors.Wrapf(err, "Failed to create bucket (%s)", kv.Bucket)
+ }
+ }
+
+ n.buckets.Set(kv.Bucket, store)
+
+ return store, nil
+}
+
+// getRecord returns the record with the given key from the nats kv store.
+func (n *natsStore) getRecord(bucket nats.KeyValue, key string) (*store.Record, bool, error) {
+ obj, err := bucket.Get(key)
+ if errors.Is(err, nats.ErrKeyNotFound) {
+ return nil, false, nil
+ } else if err != nil {
+ return nil, false, errors.Wrap(err, "Failed to get object from bucket")
+ }
+
+ var kv KeyValueEnvelope
+ if err := json.Unmarshal(obj.Value(), &kv); err != nil {
+ return nil, false, errors.Wrap(err, "Failed to unmarshal object")
+ }
+
+ if obj.Operation() != nats.KeyValuePut {
+ return nil, false, nil
+ }
+
+ return &store.Record{
+ Key: kv.Key,
+ Value: kv.Data,
+ Metadata: kv.Metadata,
+ }, true, nil
+}
+
+func (n *natsStore) natsKeys(bucket nats.KeyValue, table, key string, prefix, suffix bool) ([]string, error) {
+ if !suffix && !prefix {
+ return []string{n.NatsKey(table, key)}, nil
+ }
+
+ toS := func(s string, b bool) string {
+ if b {
+ return s
+ }
+
+ return ""
+ }
+
+ keys, _, err := n.getKeys(bucket, table, toS(key, prefix), toS(key, suffix))
+
+ return keys, err
+}
+
+func (n *natsStore) microKeys(bucket nats.KeyValue, table, prefix, suffix string) ([]string, error) {
+ _, keys, err := n.getKeys(bucket, table, prefix, suffix)
+
+ return keys, err
+}
+
+func (n *natsStore) getKeys(bucket nats.KeyValue, table string, prefix, suffix string) ([]string, []string, error) {
+ names, err := bucket.Keys(nats.IgnoreDeletes())
+ if errors.Is(err, nats.ErrKeyNotFound) {
+ return []string{}, []string{}, nil
+ } else if err != nil {
+ return []string{}, []string{}, errors.Wrap(err, "Failed to list objects")
+ }
+
+ natsKeys := make([]string, 0, len(names))
+ microKeys := make([]string, 0, len(names))
+
+ for _, k := range names {
+ mkey, ok := n.MicroKeyFilter(table, k, prefix, suffix)
+ if !ok {
+ continue
+ }
+
+ natsKeys = append(natsKeys, k)
+ microKeys = append(microKeys, mkey)
+ }
+
+ return natsKeys, microKeys, nil
+}
+
+// enforces offset and limit without causing a panic.
+func enforceLimits[V any](recs []V, limit, offset uint) []V {
+ l := uint(len(recs))
+
+ from := offset
+ if from > l {
+ from = l
+ }
+
+ to := l
+ if limit > 0 && offset+limit < l {
+ to = offset + limit
+ }
+
+ return recs[from:to]
+}
diff --git a/v4/store/nats-js-kv/nats_test.go b/v4/store/nats-js-kv/nats_test.go
new file mode 100644
index 00000000..5f248f5c
--- /dev/null
+++ b/v4/store/nats-js-kv/nats_test.go
@@ -0,0 +1,339 @@
+package natsjskv
+
+import (
+ "context"
+ "reflect"
+ "testing"
+ "time"
+
+ "github.com/google/uuid"
+ "github.com/nats-io/nats.go"
+ "github.com/pkg/errors"
+ "go-micro.dev/v4/store"
+)
+
+func TestNats(t *testing.T) {
+ // Setup without calling Init on purpose
+ var err error
+ var cancel func()
+ var ctx context.Context
+ for i := 0; i < 5; i++ {
+ ctx, cancel = context.WithCancel(context.Background())
+ addr := startNatsServer(ctx, t)
+ s := NewStore(store.Nodes(addr), EncodeKeys())
+
+ // Test String method
+ t.Log("Testing:", s.String())
+
+ err = basicTest(t, s)
+ if err != nil {
+ t.Log(err)
+ continue
+ }
+
+ // Test reading non-existing key
+ r, err := s.Read("this-is-a-random-key")
+ if err != nil {
+ t.Fatal(err)
+ }
+ if len(r) > 0 {
+ t.Fatal("Lenth should be 0")
+ }
+ err = s.Close()
+ if err != nil {
+ t.Logf("Failed to close store: %v", err)
+ }
+ cancel()
+ return
+ }
+ cancel()
+ t.Fatal(err)
+}
+
+func TestOptions(t *testing.T) {
+ ctx, cancel := context.WithCancel(context.Background())
+ s := testSetup(ctx, t,
+ DefaultMemory(),
+
+ // Having a non-default description will trigger nats.ErrStreamNameAlreadyInUse
+ // since the buckets have been created in previous tests with a different description.
+ //
+ // NOTE: this is only the case with a manually set up server, not with current
+ // test setup, where new servers are started for each test.
+ DefaultDescription("My fancy description"),
+
+ // Option has no effect in this context, just to test setting the option
+ JetStreamOptions(nats.PublishAsyncMaxPending(256)),
+
+ // Sets a custom NATS client name, just to test the NatsOptions() func
+ NatsOptions(nats.Options{Name: "Go NATS Store Plugin Tests Client"}),
+
+ KeyValueOptions(&nats.KeyValueConfig{
+ Bucket: "TestBucketName",
+ Description: "This bucket is not used",
+ TTL: 5 * time.Minute,
+ MaxBytes: 1024,
+ Storage: nats.MemoryStorage,
+ Replicas: 1,
+ }),
+
+ // Encode keys to avoid character limitations
+ EncodeKeys(),
+ )
+ defer cancel()
+
+ if err := basicTest(t, s); err != nil {
+ t.Fatal(err)
+ }
+}
+
+func TestTTL(t *testing.T) {
+ ctx, cancel := context.WithCancel(context.Background())
+
+ ttl := 500 * time.Millisecond
+ s := testSetup(ctx, t,
+ DefaultTTL(ttl),
+
+ // Since these buckets will be new they will have the new description
+ DefaultDescription("My fancy description"),
+ )
+ defer cancel()
+
+ // Use a uuid to make sure a new bucket is created when using local server
+ id := uuid.New().String()
+ for _, r := range table {
+ if err := s.Write(r.Record, store.WriteTo(r.Database+id, r.Table)); err != nil {
+ t.Fatal(err)
+ }
+ }
+
+ time.Sleep(ttl * 2)
+
+ for _, r := range table {
+ res, err := s.Read(r.Record.Key, store.ReadFrom(r.Database+id, r.Table))
+ if err != nil {
+ t.Fatal(err)
+ }
+ if len(res) > 0 {
+ t.Fatal("Fetched record while it should have expired")
+ }
+ }
+}
+
+func TestMetaData(t *testing.T) {
+ ctx, cancel := context.WithCancel(context.Background())
+ s := testSetup(ctx, t)
+ defer cancel()
+
+ record := store.Record{
+ Key: "KeyOne",
+ Value: []byte("Some value"),
+ Metadata: map[string]interface{}{
+ "meta-one": "val",
+ "meta-two": 5,
+ },
+ Expiry: 0,
+ }
+ bucket := "meta-data-test"
+ if err := s.Write(&record, store.WriteTo(bucket, "")); err != nil {
+ t.Fatal(err)
+ }
+
+ r, err := s.Read(record.Key, store.ReadFrom(bucket, ""))
+ if err != nil {
+ t.Fatal(err)
+ }
+ if len(r) == 0 {
+ t.Fatal("No results found")
+ }
+
+ m := r[0].Metadata
+ if m["meta-one"].(string) != record.Metadata["meta-one"].(string) ||
+ m["meta-two"].(float64) != float64(record.Metadata["meta-two"].(int)) {
+ t.Fatalf("Metadata does not match: (%+v) != (%+v)", m, record.Metadata)
+ }
+}
+
+func TestDelete(t *testing.T) {
+ ctx, cancel := context.WithCancel(context.Background())
+ s := testSetup(ctx, t)
+ defer cancel()
+
+ for _, r := range table {
+ if err := s.Write(r.Record, store.WriteTo(r.Database, r.Table)); err != nil {
+ t.Fatal(err)
+ }
+
+ if err := s.Delete(r.Record.Key, store.DeleteFrom(r.Database, r.Table)); err != nil {
+ t.Fatal(err)
+ }
+ time.Sleep(time.Second)
+
+ res, err := s.Read(r.Record.Key, store.ReadFrom(r.Database, r.Table))
+ if err != nil {
+ t.Fatal(err)
+ }
+ if len(res) > 0 {
+ t.Fatalf("Failed to delete %s:%s from %s %s (len: %d)", r.Record.Key, r.Record.Value, r.Database, r.Table, len(res))
+ }
+ }
+}
+
+func TestList(t *testing.T) {
+ ctx, cancel := context.WithCancel(context.Background())
+ s := testSetup(ctx, t)
+ defer cancel()
+
+ for _, r := range table {
+ if err := s.Write(r.Record, store.WriteTo(r.Database, r.Table)); err != nil {
+ t.Fatal(err)
+ }
+ }
+
+ l := []struct {
+ Database string
+ Table string
+ Length int
+ Prefix string
+ Suffix string
+ Offset int
+ Limit int
+ }{
+ {Length: 7},
+ {Database: "prefix-test", Length: 7},
+ {Database: "prefix-test", Offset: 2, Length: 5},
+ {Database: "prefix-test", Offset: 2, Limit: 3, Length: 3},
+ {Database: "prefix-test", Table: "names", Length: 3},
+ {Database: "prefix-test", Table: "cities", Length: 4},
+ {Database: "prefix-test", Table: "cities", Suffix: "City", Length: 3},
+ {Database: "prefix-test", Table: "cities", Suffix: "City", Limit: 2, Length: 2},
+ {Database: "prefix-test", Table: "cities", Suffix: "City", Offset: 1, Length: 2},
+ {Prefix: "test", Length: 1},
+ {Table: "some_table", Prefix: "test", Suffix: "test", Length: 2},
+ }
+
+ for i, entry := range l {
+ // Test listing keys
+ keys, err := s.List(
+ store.ListFrom(entry.Database, entry.Table),
+ store.ListPrefix(entry.Prefix),
+ store.ListSuffix(entry.Suffix),
+ store.ListOffset(uint(entry.Offset)),
+ store.ListLimit(uint(entry.Limit)),
+ )
+ if err != nil {
+ t.Fatal(err)
+ }
+ if len(keys) != entry.Length {
+ t.Fatalf("Length of returned keys is invalid for test %d - %+v (%d)", i+1, entry, len(keys))
+ }
+
+ // Test reading keys
+ if entry.Prefix != "" || entry.Suffix != "" {
+ var key string
+ options := []store.ReadOption{
+ store.ReadFrom(entry.Database, entry.Table),
+ store.ReadLimit(uint(entry.Limit)),
+ store.ReadOffset(uint(entry.Offset)),
+ }
+ if entry.Prefix != "" {
+ key = entry.Prefix
+ options = append(options, store.ReadPrefix())
+ }
+ if entry.Suffix != "" {
+ key = entry.Suffix
+ options = append(options, store.ReadSuffix())
+ }
+ r, err := s.Read(key, options...)
+ if err != nil {
+ t.Fatal(err)
+ }
+ if len(r) != entry.Length {
+ t.Fatalf("Length of read keys is invalid for test %d - %+v (%d)", i+1, entry, len(r))
+ }
+ }
+ }
+}
+
+func TestDeleteBucket(t *testing.T) {
+ ctx, cancel := context.WithCancel(context.Background())
+ s := testSetup(ctx, t)
+ defer cancel()
+
+ for _, r := range table {
+ if err := s.Write(r.Record, store.WriteTo(r.Database, r.Table)); err != nil {
+ t.Fatal(err)
+ }
+ }
+
+ bucket := "prefix-test"
+ if err := s.Delete(bucket, DeleteBucket()); err != nil {
+ t.Fatal(err)
+ }
+
+ keys, err := s.List(store.ListFrom(bucket, ""))
+ if err != nil && !errors.Is(err, ErrBucketNotFound) {
+ t.Fatalf("Failed to delete bucket: %v", err)
+ }
+
+ if len(keys) > 0 {
+ t.Fatal("Length of key list should be 0 after bucket deletion")
+ }
+
+ r, err := s.Read("", store.ReadPrefix(), store.ReadFrom(bucket, ""))
+ if err != nil && !errors.Is(err, ErrBucketNotFound) {
+ t.Fatalf("Failed to delete bucket: %v", err)
+ }
+ if len(r) > 0 {
+ t.Fatal("Length of record list should be 0 after bucket deletion", len(r))
+ }
+}
+
+func TestEnforceLimits(t *testing.T) {
+ s := []string{"a", "b", "c", "d"}
+ var testCasts = []struct {
+ Alias string
+ Offset uint
+ Limit uint
+ Expected []string
+ }{
+ {"plain", 0, 0, []string{"a", "b", "c", "d"}},
+ {"offset&limit-1", 1, 3, []string{"b", "c", "d"}},
+ {"offset&limit-2", 1, 1, []string{"b"}},
+ {"offset=length", 4, 0, []string{}},
+ {"offset>length", 222, 0, []string{}},
+ {"limit>length", 0, 36, []string{"a", "b", "c", "d"}},
+ }
+ for _, tc := range testCasts {
+ actual := enforceLimits(s, tc.Limit, tc.Offset)
+ if !reflect.DeepEqual(actual, tc.Expected) {
+ t.Fatalf("%s: Expected %v, got %v", tc.Alias, tc.Expected, actual)
+ }
+ }
+}
+
+func basicTest(t *testing.T, s store.Store) error {
+ t.Helper()
+ for _, test := range table {
+ if err := s.Write(test.Record, store.WriteTo(test.Database, test.Table)); err != nil {
+ return errors.Wrap(err, "Failed to write record in basic test")
+ }
+ r, err := s.Read(test.Record.Key, store.ReadFrom(test.Database, test.Table))
+ if err != nil {
+ return errors.Wrap(err, "Failed to read record in basic test")
+ }
+ if len(r) == 0 {
+ t.Fatalf("No results found for %s (%s) %s", test.Record.Key, test.Database, test.Table)
+ }
+
+ key := test.Record.Key
+ val1 := string(test.Record.Value)
+
+ key2 := r[0].Key
+ val2 := string(r[0].Value)
+ if val1 != val2 {
+ t.Fatalf("Value not equal for (%s: %s) != (%s: %s)", key, val1, key2, val2)
+ }
+ }
+ return nil
+}
diff --git a/v4/store/nats-js-kv/options.go b/v4/store/nats-js-kv/options.go
new file mode 100644
index 00000000..a9c9ca61
--- /dev/null
+++ b/v4/store/nats-js-kv/options.go
@@ -0,0 +1,83 @@
+package natsjskv
+
+import (
+ "time"
+
+ "github.com/nats-io/nats.go"
+ "go-micro.dev/v4/store"
+)
+
+// store.Option.
+type natsOptionsKey struct{}
+type jsOptionsKey struct{}
+type kvOptionsKey struct{}
+type ttlOptionsKey struct{}
+type memoryOptionsKey struct{}
+type descriptionOptionsKey struct{}
+type keyEncodeOptionsKey struct{}
+
+// NatsOptions accepts nats.Options.
+func NatsOptions(opts nats.Options) store.Option {
+ return setStoreOption(natsOptionsKey{}, opts)
+}
+
+// JetStreamOptions accepts multiple nats.JSOpt.
+func JetStreamOptions(opts ...nats.JSOpt) store.Option {
+ return setStoreOption(jsOptionsKey{}, opts)
+}
+
+// KeyValueOptions accepts multiple nats.KeyValueConfig
+// This will create buckets with the provided configs at initialization.
+func KeyValueOptions(cfg ...*nats.KeyValueConfig) store.Option {
+ return setStoreOption(kvOptionsKey{}, cfg)
+}
+
+// DefaultTTL sets the default TTL to use for new buckets
+//
+// By default no TTL is set.
+//
+// TTL ON INDIVIDUAL WRITE CALLS IS NOT SUPPORTED, only bucket wide TTL.
+// Either set a default TTL with this option or provide bucket specific options
+//
+// with ObjectStoreOptions
+func DefaultTTL(ttl time.Duration) store.Option {
+ return setStoreOption(ttlOptionsKey{}, ttl)
+}
+
+// DefaultMemory sets the default storage type to memory only.
+//
+// The default is file storage, persisting storage between service restarts.
+//
+// Be aware that the default storage location of NATS the /tmp dir is, and thus
+//
+// won't persist reboots.
+func DefaultMemory() store.Option {
+ return setStoreOption(memoryOptionsKey{}, nats.MemoryStorage)
+}
+
+// DefaultDescription sets the default description to use when creating new
+//
+// buckets. The default is "Store managed by go-micro"
+func DefaultDescription(text string) store.Option {
+ return setStoreOption(descriptionOptionsKey{}, text)
+}
+
+// EncodeKeys will "base32" encode the keys.
+// This is to work around limited characters usable as keys for the natsjs kv store.
+// See details here: https://docs.nats.io/nats-concepts/subjects#characters-allowed-for-subject-names
+func EncodeKeys() store.Option {
+ return setStoreOption(keyEncodeOptionsKey{}, "base32")
+}
+
+// DeleteBucket will use the key passed to Delete as a bucket (database) name,
+//
+// and delete the bucket.
+//
+// This option should not be combined with the store.DeleteFrom option, as
+//
+// that will overwrite the delete action.
+func DeleteBucket() store.DeleteOption {
+ return func(d *store.DeleteOptions) {
+ d.Table = "DELETE_BUCKET"
+ }
+}
diff --git a/v4/store/nats-js-kv/test_data.go b/v4/store/nats-js-kv/test_data.go
new file mode 100644
index 00000000..8bdfb55c
--- /dev/null
+++ b/v4/store/nats-js-kv/test_data.go
@@ -0,0 +1,138 @@
+package natsjskv
+
+import "go-micro.dev/v4/store"
+
+type test struct {
+ Record *store.Record
+ Database string
+ Table string
+}
+
+var (
+ table = []test{
+ {
+ Record: &store.Record{
+ Key: "One",
+ Value: []byte("First value"),
+ },
+ },
+ {
+ Record: &store.Record{
+ Key: "Two",
+ Value: []byte("Second value"),
+ },
+ Table: "prefix_test",
+ },
+ {
+ Record: &store.Record{
+ Key: "Third",
+ Value: []byte("Third value"),
+ },
+ Database: "new-bucket",
+ },
+ {
+ Record: &store.Record{
+ Key: "Four",
+ Value: []byte("Fourth value"),
+ },
+ Database: "new-bucket",
+ Table: "prefix_test",
+ },
+ {
+ Record: &store.Record{
+ Key: "empty-value",
+ Value: []byte{},
+ },
+ Database: "new-bucket",
+ },
+ {
+ Record: &store.Record{
+ Key: "Alex",
+ Value: []byte("Some value"),
+ },
+ Database: "prefix-test",
+ Table: "names",
+ },
+ {
+ Record: &store.Record{
+ Key: "Jones",
+ Value: []byte("Some value"),
+ },
+ Database: "prefix-test",
+ Table: "names",
+ },
+ {
+ Record: &store.Record{
+ Key: "Adrianna",
+ Value: []byte("Some value"),
+ },
+ Database: "prefix-test",
+ Table: "names",
+ },
+ {
+ Record: &store.Record{
+ Key: "MexicoCity",
+ Value: []byte("Some value"),
+ },
+ Database: "prefix-test",
+ Table: "cities",
+ },
+ {
+ Record: &store.Record{
+ Key: "HoustonCity",
+ Value: []byte("Some value"),
+ },
+ Database: "prefix-test",
+ Table: "cities",
+ },
+ {
+ Record: &store.Record{
+ Key: "ZurichCity",
+ Value: []byte("Some value"),
+ },
+ Database: "prefix-test",
+ Table: "cities",
+ },
+ {
+ Record: &store.Record{
+ Key: "Helsinki",
+ Value: []byte("Some value"),
+ },
+ Database: "prefix-test",
+ Table: "cities",
+ },
+ {
+ Record: &store.Record{
+ Key: "testKeytest",
+ Value: []byte("Some value"),
+ },
+ Table: "some_table",
+ },
+ {
+ Record: &store.Record{
+ Key: "testSecondtest",
+ Value: []byte("Some value"),
+ },
+ Table: "some_table",
+ },
+ {
+ Record: &store.Record{
+ Key: "lalala",
+ Value: []byte("Some value"),
+ },
+ Table: "some_table",
+ },
+ {
+ Record: &store.Record{
+ Key: "testAnothertest",
+ Value: []byte("Some value"),
+ },
+ },
+ {
+ Record: &store.Record{
+ Key: "FobiddenCharactersAreAllowed:|@..+",
+ Value: []byte("data no matter"),
+ },
+ },
+ }
+)