Skip to content

Commit

Permalink
Merge pull request #8880 from kobergj/NatsReconnects
Browse files Browse the repository at this point in the history
Fix nats reconnects
  • Loading branch information
kobergj authored Apr 18, 2024
2 parents 2f32fa3 + 7db6300 commit 6c3e730
Show file tree
Hide file tree
Showing 2 changed files with 39 additions and 4 deletions.
5 changes: 5 additions & 0 deletions changelog/unreleased/nats-reconnects.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
Bugfix: Nats reconnects

We fixed the reconnect handling of the natjs kv registry.

https://github.com/owncloud/ocis/pull/8880
38 changes: 34 additions & 4 deletions ocis-pkg/natsjsregistry/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,10 @@ import (
"context"
"encoding/json"
"errors"
"fmt"
"os"
"strings"
"sync"
"time"

natsjskv "github.com/go-micro/plugins/v4/store/nats-js-kv"
Expand Down Expand Up @@ -36,27 +38,33 @@ func NewRegistry(opts ...registry.Option) registry.Registry {
o(&options)
}
exp, _ := options.Context.Value(expiryKey{}).(time.Duration)
return &storeregistry{
n := &storeregistry{
opts: options,
store: natsjskv.NewStore(storeOptions(options)...),
typ: _registryName,
expiry: exp,
}
n.store = natsjskv.NewStore(n.storeOptions(options)...)
return n
}

type storeregistry struct {
opts registry.Options
store store.Store
typ string
expiry time.Duration
lock sync.RWMutex
}

// Init inits the registry
func (n *storeregistry) Init(opts ...registry.Option) error {
n.lock.Lock()
defer n.lock.Unlock()

for _, o := range opts {
o(&n.opts)
}
return n.store.Init(storeOptions(n.opts)...)
n.store = natsjskv.NewStore(n.storeOptions(n.opts)...)
return n.store.Init(n.storeOptions(n.opts)...)
}

// Options returns the configured options
Expand All @@ -66,6 +74,9 @@ func (n *storeregistry) Options() registry.Options {

// Register adds a service to the registry
func (n *storeregistry) Register(s *registry.Service, _ ...registry.RegisterOption) error {
n.lock.RLock()
defer n.lock.RUnlock()

if s == nil {
return errors.New("wont store nil service")
}
Expand All @@ -82,11 +93,17 @@ func (n *storeregistry) Register(s *registry.Service, _ ...registry.RegisterOpti

// Deregister removes a service from the registry
func (n *storeregistry) Deregister(s *registry.Service, _ ...registry.DeregisterOption) error {
n.lock.RLock()
defer n.lock.RUnlock()

return n.store.Delete(s.Name)
}

// GetService gets a specific service from the registry
func (n *storeregistry) GetService(s string, _ ...registry.GetOption) ([]*registry.Service, error) {
n.lock.RLock()
defer n.lock.RUnlock()

recs, err := n.store.Read(s)
if err != nil {
return nil, err
Expand All @@ -104,6 +121,9 @@ func (n *storeregistry) GetService(s string, _ ...registry.GetOption) ([]*regist

// ListServices lists all registered services
func (n *storeregistry) ListServices(...registry.ListOption) ([]*registry.Service, error) {
n.lock.RLock()
defer n.lock.RUnlock()

keys, err := n.store.List()
if err != nil {
return nil, err
Expand Down Expand Up @@ -132,7 +152,7 @@ func (n *storeregistry) String() string {
return n.typ
}

func storeOptions(opts registry.Options) []store.Option {
func (n *storeregistry) storeOptions(opts registry.Options) []store.Option {
storeoptions := []store.Option{
store.Database("service-registry"),
store.Table("service-registry"),
Expand All @@ -150,6 +170,16 @@ func storeOptions(opts registry.Options) []store.Option {
natsOptions := nats.GetDefaultOptions()
natsOptions.Name = "nats-js-kv-registry"
natsOptions.User, natsOptions.Password = getAuth()
natsOptions.ReconnectedCB = func(_ *nats.Conn) {
if err := n.Init(); err != nil {
fmt.Println("cannot reconnect to nats")
os.Exit(1)
}
}
natsOptions.ClosedCB = func(_ *nats.Conn) {
fmt.Println("nats connection closed")
os.Exit(1)
}
storeoptions = append(storeoptions, natsjskv.NatsOptions(natsOptions))

if so, ok := opts.Context.Value(storeOptionsKey{}).([]store.Option); ok {
Expand Down

0 comments on commit 6c3e730

Please sign in to comment.