Skip to content
This repository has been archived by the owner on Nov 3, 2023. It is now read-only.

Commit

Permalink
cluster: drop all instance resources on memberlist unregister (#224)
Browse files Browse the repository at this point in the history
  • Loading branch information
ortuman authored May 2, 2022
1 parent 1c3b383 commit f665e2f
Show file tree
Hide file tree
Showing 3 changed files with 135 additions and 57 deletions.
179 changes: 128 additions & 51 deletions pkg/cluster/resourcemanager/kvmanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"github.com/ortuman/jackal/pkg/cluster/instance"
"github.com/ortuman/jackal/pkg/cluster/kv"
kvtypes "github.com/ortuman/jackal/pkg/cluster/kv/types"
"github.com/ortuman/jackal/pkg/hook"
c2smodel "github.com/ortuman/jackal/pkg/model/c2s"
)

Expand All @@ -38,28 +39,107 @@ const (
kvResourceManagerType = "kv"
)

type kvResources struct {
mu sync.RWMutex
store map[string][]c2smodel.ResourceDesc
}

func (r *kvResources) get(username, resource string) c2smodel.ResourceDesc {
r.mu.RLock()
defer r.mu.RUnlock()

rss := r.store[username]
for _, res := range rss {
if res.JID().Resource() != resource {
continue
}
return res
}
return nil
}

func (r *kvResources) getAll(username string) []c2smodel.ResourceDesc {
r.mu.RLock()
defer r.mu.RUnlock()

rss := r.store[username]
if len(rss) == 0 {
return nil
}
retVal := make([]c2smodel.ResourceDesc, len(rss))
for i, res := range rss {
retVal[i] = res
}
return retVal
}

func (r *kvResources) put(res c2smodel.ResourceDesc) {
r.mu.Lock()
defer r.mu.Unlock()

jd := res.JID()

var username, resource = jd.Node(), jd.Resource()
var found bool

rss := r.store[username]
for i := 0; i < len(rss); i++ {
if rss[i].JID().Resource() != resource {
continue
}
rss[i] = res
found = true
break
}
if !found {
rss = append(rss, res)
}
r.store[username] = rss
return
}

func (r *kvResources) del(username, resource string) {
r.mu.Lock()
defer r.mu.Unlock()

rss := r.store[username]
for i := 0; i < len(rss); i++ {
if rss[i].JID().Resource() != resource {
continue
}
rss = append(rss[:i], rss[i+1:]...)
if len(rss) > 0 {
r.store[username] = rss
} else {
delete(r.store, username)
}
return
}
}

type kvManager struct {
kv kv.KV
hk *hook.Hooks
logger kitlog.Logger
ctx context.Context
ctxCancel context.CancelFunc

storeMu sync.RWMutex
store map[string][]c2smodel.ResourceDesc
instResMu sync.RWMutex
instRes map[string]*kvResources

// active put key set
stopCh chan struct{}
}

// NewKVManager creates a new resource manager given a KV storage instance.
func NewKVManager(kv kv.KV, logger kitlog.Logger) Manager {
func NewKVManager(kv kv.KV, hk *hook.Hooks, logger kitlog.Logger) Manager {
ctx, ctxCancel := context.WithCancel(context.Background())
return &kvManager{
kv: kv,
hk: hk,
logger: logger,
ctx: ctx,
ctxCancel: ctxCancel,
store: make(map[string][]c2smodel.ResourceDesc),
instRes: make(map[string]*kvResources),
stopCh: make(chan struct{}),
}
}
Expand All @@ -80,30 +160,24 @@ func (m *kvManager) PutResource(ctx context.Context, res c2smodel.ResourceDesc)
}

func (m *kvManager) GetResource(_ context.Context, username, resource string) (c2smodel.ResourceDesc, error) {
m.storeMu.RLock()
defer m.storeMu.RUnlock()
m.instResMu.RLock()
defer m.instResMu.RUnlock()

rss := m.store[username]
for _, res := range rss {
if res.JID().Resource() != resource {
continue
for _, kvr := range m.instRes {
if res := kvr.get(username, resource); res != nil {
return res, nil
}
return res, nil
}
return nil, nil
}

func (m *kvManager) GetResources(_ context.Context, username string) ([]c2smodel.ResourceDesc, error) {
m.storeMu.RLock()
defer m.storeMu.RUnlock()
m.instResMu.RLock()
defer m.instResMu.RUnlock()

rss := m.store[username]
if len(rss) == 0 {
return nil, nil
}
retVal := make([]c2smodel.ResourceDesc, len(rss))
for i, res := range rss {
retVal[i] = res
var retVal []c2smodel.ResourceDesc
for _, kvr := range m.instRes {
retVal = append(retVal, kvr.getAll(username)...)
}
return retVal, nil
}
Expand All @@ -119,6 +193,8 @@ func (m *kvManager) DelResource(ctx context.Context, username, resource string)
}

func (m *kvManager) Start(ctx context.Context) error {
m.hk.AddHook(hook.MemberListUpdated, m.onMemberListUpdated, hook.DefaultPriority)

if err := m.watchKVResources(ctx); err != nil {
return err
}
Expand All @@ -127,6 +203,8 @@ func (m *kvManager) Start(ctx context.Context) error {
}

func (m *kvManager) Stop(_ context.Context) error {
m.hk.RemoveHook(hook.MemberListUpdated, m.onMemberListUpdated)

// stop watching changes...
m.ctxCancel()
<-m.stopCh
Expand All @@ -135,6 +213,20 @@ func (m *kvManager) Stop(_ context.Context) error {
return nil
}

func (m *kvManager) onMemberListUpdated(_ context.Context, execCtx *hook.ExecutionContext) error {
inf := execCtx.Info.(*hook.MemberListInfo)
if len(inf.UnregisteredKeys) == 0 {
return nil
}
// drop unregistered instance(s) resources
m.instResMu.Lock()
for _, instanceID := range inf.UnregisteredKeys {
delete(m.instRes, instanceID)
}
m.instResMu.Unlock()
return nil
}

func (m *kvManager) watchKVResources(ctx context.Context) error {
ch := make(chan error, 1)
go func() {
Expand Down Expand Up @@ -203,46 +295,31 @@ func (m *kvManager) processKVEvents(kvEvents []kvtypes.WatchEvent) error {
}

func (m *kvManager) inMemPut(res c2smodel.ResourceDesc) {
m.storeMu.Lock()
defer m.storeMu.Unlock()
m.instResMu.Lock()
defer m.instResMu.Unlock()

jd := res.JID()

var username, resource = jd.Node(), jd.Resource()
var found bool
instID := res.InstanceID()

rss := m.store[username]
for i := 0; i < len(rss); i++ {
if rss[i].JID().Resource() != resource {
continue
kvr := m.instRes[instID]
if kvr == nil {
kvr = &kvResources{
store: make(map[string][]c2smodel.ResourceDesc),
}
rss[i] = res
found = true
break
}
if !found {
rss = append(rss, res)
m.instRes[instID] = kvr
}
m.store[username] = rss
kvr.put(res)
return
}

func (m *kvManager) inMemDel(username, resource string) {
m.storeMu.Lock()
defer m.storeMu.Unlock()
m.instResMu.RLock()
defer m.instResMu.RUnlock()

rss := m.store[username]
for i := 0; i < len(rss); i++ {
if rss[i].JID().Resource() != resource {
continue
}
rss = append(rss[:i], rss[i+1:]...)
if len(rss) > 0 {
m.store[username] = rss
} else {
delete(m.store, username)
for _, kvr := range m.instRes {
if kvr.get(username, resource) != nil {
kvr.del(username, resource)
return
}
return
}
}

Expand Down
9 changes: 5 additions & 4 deletions pkg/cluster/resourcemanager/kvmanager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"github.com/jackal-xmpp/stravaganza"
"github.com/jackal-xmpp/stravaganza/jid"
"github.com/ortuman/jackal/pkg/cluster/instance"
"github.com/ortuman/jackal/pkg/hook"
c2smodel "github.com/ortuman/jackal/pkg/model/c2s"
"github.com/stretchr/testify/require"
)
Expand All @@ -35,7 +36,7 @@ func TestResourceManager_SetResource(t *testing.T) {

kvmock := &kvMock{}

h := NewKVManager(kvmock, kitlog.NewNopLogger())
h := NewKVManager(kvmock, hook.NewHooks(), kitlog.NewNopLogger())
kvmock.PutFunc = func(ctx context.Context, key string, value string) error {
r, _ := decodeResource(key, []byte(value))
r1 = r
Expand All @@ -60,7 +61,7 @@ func TestResourceManager_GetResource(t *testing.T) {
kvmock := &kvMock{}
kvmock.PutFunc = func(ctx context.Context, key string, value string) error { return nil }

h := NewKVManager(kvmock, kitlog.NewNopLogger())
h := NewKVManager(kvmock, hook.NewHooks(), kitlog.NewNopLogger())

// when
r0 := testResource("megaman-2", 10, "ortuman", "yard")
Expand All @@ -85,7 +86,7 @@ func TestResourceManager_GetResources(t *testing.T) {
kvmock := &kvMock{}
kvmock.PutFunc = func(ctx context.Context, key string, value string) error { return nil }

h := NewKVManager(kvmock, kitlog.NewNopLogger())
h := NewKVManager(kvmock, hook.NewHooks(), kitlog.NewNopLogger())

r0 := testResource("abc1234", 100, "ortuman", "yard")
r1 := testResource("bcd1234", 50, "ortuman", "balcony")
Expand All @@ -108,7 +109,7 @@ func TestResourceManager_DelResource(t *testing.T) {
kvmock := &kvMock{}
kvmock.PutFunc = func(ctx context.Context, key string, value string) error { return nil }

h := NewKVManager(kvmock, kitlog.NewNopLogger())
h := NewKVManager(kvmock, hook.NewHooks(), kitlog.NewNopLogger())

r0 := testResource("megaman-2", 10, "ortuman", "yard")
_ = h.PutResource(context.Background(), r0)
Expand Down
4 changes: 2 additions & 2 deletions pkg/jackal/jackal.go
Original file line number Diff line number Diff line change
Expand Up @@ -275,7 +275,7 @@ func (j *Jackal) initCluster(cfg ClusterConfig) error {

case noneClusterType:
j.memberList = memberlist.NewKVMemberList(cfg.Server.Port, j.kv, j.hk, j.logger)
j.resMng = resourcemanager.NewKVManager(j.kv, j.logger)
j.resMng = resourcemanager.NewKVManager(j.kv, j.hk, j.logger)

default:
return fmt.Errorf("unrecognized cluster type: %s", cfg.Type)
Expand All @@ -284,8 +284,8 @@ func (j *Jackal) initCluster(cfg ClusterConfig) error {
j.clusterConnMng = clusterconnmanager.NewManager(j.hk, j.logger)

j.registerStartStopper(j.clusterConnMng)
j.registerStartStopper(j.memberList)
j.registerStartStopper(j.resMng)
j.registerStartStopper(j.memberList)
return nil
}

Expand Down

0 comments on commit f665e2f

Please sign in to comment.