From eb5035cc19e3a142f96401f63103463ca300bb7c Mon Sep 17 00:00:00 2001 From: jkoberg Date: Wed, 17 Apr 2024 15:44:28 +0200 Subject: [PATCH 1/2] fix(natsjsregistry): fix reconnects Signed-off-by: jkoberg --- changelog/unreleased/nats-reconnects.md | 5 ++++ ocis-pkg/natsjsregistry/registry.go | 38 ++++++++++++++++++++++--- 2 files changed, 39 insertions(+), 4 deletions(-) create mode 100644 changelog/unreleased/nats-reconnects.md diff --git a/changelog/unreleased/nats-reconnects.md b/changelog/unreleased/nats-reconnects.md new file mode 100644 index 00000000000..05bd0b5c197 --- /dev/null +++ b/changelog/unreleased/nats-reconnects.md @@ -0,0 +1,5 @@ +Bugfix: Nats reconnects + +Natsjs kv registry could not handle reconnects correctly. This fixes it. + +https://github.com/owncloud/ocis/pull/8880 diff --git a/ocis-pkg/natsjsregistry/registry.go b/ocis-pkg/natsjsregistry/registry.go index feeac744b9f..789b49607d4 100644 --- a/ocis-pkg/natsjsregistry/registry.go +++ b/ocis-pkg/natsjsregistry/registry.go @@ -5,8 +5,10 @@ import ( "context" "encoding/json" "errors" + "fmt" "os" "strings" + "sync" "time" natsjskv "github.com/go-micro/plugins/v4/store/nats-js-kv" @@ -36,12 +38,13 @@ func NewRegistry(opts ...registry.Option) registry.Registry { o(&options) } exp, _ := options.Context.Value(expiryKey{}).(time.Duration) - return &storeregistry{ + n := &storeregistry{ opts: options, - store: natsjskv.NewStore(storeOptions(options)...), typ: _registryName, expiry: exp, } + n.store = natsjskv.NewStore(n.storeOptions(options)...) + return n } type storeregistry struct { @@ -49,14 +52,19 @@ type storeregistry struct { store store.Store typ string expiry time.Duration + lock sync.RWMutex } // Init inits the registry func (n *storeregistry) Init(opts ...registry.Option) error { + n.lock.Lock() + defer n.lock.Unlock() + for _, o := range opts { o(&n.opts) } - return n.store.Init(storeOptions(n.opts)...) + n.store = natsjskv.NewStore(n.storeOptions(n.opts)...) + return n.store.Init(n.storeOptions(n.opts)...) } // Options returns the configured options @@ -66,6 +74,9 @@ func (n *storeregistry) Options() registry.Options { // Register adds a service to the registry func (n *storeregistry) Register(s *registry.Service, _ ...registry.RegisterOption) error { + n.lock.RLock() + defer n.lock.RUnlock() + if s == nil { return errors.New("wont store nil service") } @@ -82,11 +93,17 @@ func (n *storeregistry) Register(s *registry.Service, _ ...registry.RegisterOpti // Deregister removes a service from the registry func (n *storeregistry) Deregister(s *registry.Service, _ ...registry.DeregisterOption) error { + n.lock.RLock() + defer n.lock.RUnlock() + return n.store.Delete(s.Name) } // GetService gets a specific service from the registry func (n *storeregistry) GetService(s string, _ ...registry.GetOption) ([]*registry.Service, error) { + n.lock.RLock() + defer n.lock.RUnlock() + recs, err := n.store.Read(s) if err != nil { return nil, err @@ -104,6 +121,9 @@ func (n *storeregistry) GetService(s string, _ ...registry.GetOption) ([]*regist // ListServices lists all registered services func (n *storeregistry) ListServices(...registry.ListOption) ([]*registry.Service, error) { + n.lock.RLock() + defer n.lock.RUnlock() + keys, err := n.store.List() if err != nil { return nil, err @@ -132,7 +152,7 @@ func (n *storeregistry) String() string { return n.typ } -func storeOptions(opts registry.Options) []store.Option { +func (n *storeregistry) storeOptions(opts registry.Options) []store.Option { storeoptions := []store.Option{ store.Database("service-registry"), store.Table("service-registry"), @@ -150,6 +170,16 @@ func storeOptions(opts registry.Options) []store.Option { natsOptions := nats.GetDefaultOptions() natsOptions.Name = "nats-js-kv-registry" natsOptions.User, natsOptions.Password = getAuth() + natsOptions.ReconnectedCB = func(_ *nats.Conn) { + if err := n.Init(); err != nil { + fmt.Println("cannot reconnect to nats") + os.Exit(1) + } + } + natsOptions.ClosedCB = func(_ *nats.Conn) { + fmt.Println("nats connection closed") + os.Exit(1) + } storeoptions = append(storeoptions, natsjskv.NatsOptions(natsOptions)) if so, ok := opts.Context.Value(storeOptionsKey{}).([]store.Option); ok { From 7db63005c45c07191b3c1177bbbe4db9ad9ae7d4 Mon Sep 17 00:00:00 2001 From: kobergj Date: Thu, 18 Apr 2024 13:05:53 +0200 Subject: [PATCH 2/2] fix(changelog): improve wording Co-authored-by: Ralf Haferkamp Signed-off-by: jkoberg --- changelog/unreleased/nats-reconnects.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/changelog/unreleased/nats-reconnects.md b/changelog/unreleased/nats-reconnects.md index 05bd0b5c197..e7e526451e0 100644 --- a/changelog/unreleased/nats-reconnects.md +++ b/changelog/unreleased/nats-reconnects.md @@ -1,5 +1,5 @@ Bugfix: Nats reconnects -Natsjs kv registry could not handle reconnects correctly. This fixes it. +We fixed the reconnect handling of the natjs kv registry. https://github.com/owncloud/ocis/pull/8880