Skip to content

Commit

Permalink
feat(store/natsjskv): make encoding of keys configurable
Browse files Browse the repository at this point in the history
Signed-off-by: jkoberg <[email protected]>
  • Loading branch information
kobergj committed Dec 12, 2023
1 parent 4d424e3 commit e4df5ed
Show file tree
Hide file tree
Showing 5 changed files with 50 additions and 24 deletions.
2 changes: 1 addition & 1 deletion v4/store/nats-js-kv/helpers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ func testSetup(ctx context.Context, t *testing.T, opts ...store.Option) store.St
nCtx, cancel := context.WithCancel(ctx)
addr := startNatsServer(nCtx, t)

opts = append(opts, store.Nodes(addr))
opts = append(opts, store.Nodes(addr), EncodeKeys())
s = NewStore(opts...)

err = s.Init()
Expand Down
44 changes: 27 additions & 17 deletions v4/store/nats-js-kv/keys.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,24 +6,24 @@ import (
)

// NatsKey is a convenience function to create a key for the nats kv store.
func NatsKey(table, microkey string) string {
return NewKey(table, microkey, "").NatsKey()
func (n *natsStore) NatsKey(table, microkey string) string {
return n.NewKey(table, microkey, "").NatsKey()
}

// MicroKey is a convenience function to create a key for the micro interface.
func MicroKey(table, natskey string) string {
return NewKey(table, "", natskey).MicroKey()
func (n *natsStore) MicroKey(table, natskey string) string {
return n.NewKey(table, "", natskey).MicroKey()
}

// MicroKeyFilter is a convenience function to create a key for the micro interface.
// It returns false if the key does not match the table, prefix or suffix.
func MicroKeyFilter(table, natskey string, prefix, suffix string) (string, bool) {
k := NewKey(table, "", natskey)
func (n *natsStore) MicroKeyFilter(table, natskey string, prefix, suffix string) (string, bool) {
k := n.NewKey(table, "", natskey)
return k.MicroKey(), k.Check(table, prefix, suffix)
}

// Key represents a key in the store.
// They are used to convert nats keys (base64 encoded) to micro keys (plain text - no table prefix) and vice versa.
// They are used to convert nats keys (base32 encoded) to micro keys (plain text - no table prefix) and vice versa.
type Key struct {
// Plain is the plain key as requested by the go-micro interface.
Plain string
Expand All @@ -34,7 +34,7 @@ type Key struct {
}

// NewKey creates a new key. Either plain or encoded must be set.
func NewKey(table string, plain, encoded string) *Key {
func (n *natsStore) NewKey(table string, plain, encoded string) *Key {
k := &Key{
Plain: plain,
Encoded: encoded,
Expand All @@ -43,9 +43,9 @@ func NewKey(table string, plain, encoded string) *Key {
switch {
case k.Plain != "":
k.Full = getKey(k.Plain, table)
k.Encoded = encode(k.Full)
k.Encoded = encode(k.Full, n.encoding)
case k.Encoded != "":
k.Full = decode(k.Encoded)
k.Full = decode(k.Encoded, n.encoding)
k.Plain = trimKey(k.Full, table)
}

Expand Down Expand Up @@ -79,17 +79,27 @@ func (k *Key) Check(table, prefix, suffix string) bool {
return true
}

func encode(s string) string {
return base32.StdEncoding.EncodeToString([]byte(s))
func encode(s string, alg string) string {
switch alg {
case "base32":
return base32.StdEncoding.EncodeToString([]byte(s))
default:
return s
}
}

func decode(s string) string {
b, err := base32.StdEncoding.DecodeString(s)
if err != nil {
func decode(s string, alg string) string {
switch alg {
case "base32":
b, err := base32.StdEncoding.DecodeString(s)
if err != nil {
return s
}

return string(b)
default:
return s
}

return string(b)
}

func getKey(key, table string) string {
Expand Down
15 changes: 10 additions & 5 deletions v4/store/nats-js-kv/nats.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ type natsStore struct {
sync.Once
sync.RWMutex

encoding string
ttl time.Duration
storageType nats.StorageType
description string
Expand Down Expand Up @@ -143,6 +144,10 @@ func (n *natsStore) setOption(opts ...store.Option) {
n.description = text
}

if encoding, ok := n.opts.Context.Value(keyEncodeOptionsKey{}).(string); ok {
n.encoding = encoding
}

// Assign store option server addresses to nats options
if len(n.opts.Nodes) > 0 {
n.nopts.Url = ""
Expand Down Expand Up @@ -238,8 +243,8 @@ func (n *natsStore) Write(rec *store.Record, opts ...store.WriteOption) error {
return errors.Wrap(err, "Failed to marshal object")
}

if _, err := store.Put(NatsKey(opt.Table, rec.Key), b); err != nil {
return errors.Wrapf(err, "Failed to store data in bucket '%s'", NatsKey(opt.Table, rec.Key))
if _, err := store.Put(n.NatsKey(opt.Table, rec.Key), b); err != nil {
return errors.Wrapf(err, "Failed to store data in bucket '%s'", n.NatsKey(opt.Table, rec.Key))
}

return nil
Expand Down Expand Up @@ -280,7 +285,7 @@ func (n *natsStore) Delete(key string, opts ...store.DeleteOption) error {
return ErrBucketNotFound
}

if err := store.Delete(NatsKey(opt.Table, key)); err != nil {
if err := store.Delete(n.NatsKey(opt.Table, key)); err != nil {
return errors.Wrap(err, "Failed to delete data")
}

Expand Down Expand Up @@ -415,7 +420,7 @@ func (n *natsStore) getRecord(bucket nats.KeyValue, key string) (*store.Record,

func (n *natsStore) natsKeys(bucket nats.KeyValue, table, key string, prefix, suffix bool) ([]string, error) {
if !suffix && !prefix {
return []string{NatsKey(table, key)}, nil
return []string{n.NatsKey(table, key)}, nil
}

toS := func(s string, b bool) string {
Expand Down Expand Up @@ -449,7 +454,7 @@ func (n *natsStore) getKeys(bucket nats.KeyValue, table string, prefix, suffix s
microKeys := make([]string, 0, len(names))

for _, k := range names {
mkey, ok := MicroKeyFilter(table, k, prefix, suffix)
mkey, ok := n.MicroKeyFilter(table, k, prefix, suffix)
if !ok {
continue
}
Expand Down
5 changes: 4 additions & 1 deletion v4/store/nats-js-kv/nats_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ func TestNats(t *testing.T) {
for i := 0; i < 5; i++ {
ctx, cancel = context.WithCancel(context.Background())
addr := startNatsServer(ctx, t)
s := NewStore(store.Nodes(addr))
s := NewStore(store.Nodes(addr), EncodeKeys())

// Test String method
t.Log("Testing:", s.String())
Expand Down Expand Up @@ -76,6 +76,9 @@ func TestOptions(t *testing.T) {
Storage: nats.MemoryStorage,
Replicas: 1,
}),

// Encode keys to avoid character limitations
EncodeKeys(),
)
defer cancel()

Expand Down
8 changes: 8 additions & 0 deletions v4/store/nats-js-kv/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ type kvOptionsKey struct{}
type ttlOptionsKey struct{}
type memoryOptionsKey struct{}
type descriptionOptionsKey struct{}
type keyEncodeOptionsKey struct{}

// NatsOptions accepts nats.Options.
func NatsOptions(opts nats.Options) store.Option {
Expand Down Expand Up @@ -61,6 +62,13 @@ func DefaultDescription(text string) store.Option {
return setStoreOption(descriptionOptionsKey{}, text)
}

// EncodeKeys will "base32" encode the keys.
// This is to work around limited characters usable as keys for the natsjs kv store.
// See details here: https://docs.nats.io/nats-concepts/subjects#characters-allowed-for-subject-names
func EncodeKeys() store.Option {
return setStoreOption(keyEncodeOptionsKey{}, "base32")
}

// DeleteBucket will use the key passed to Delete as a bucket (database) name,
//
// and delete the bucket.
Expand Down

0 comments on commit e4df5ed

Please sign in to comment.