diff --git a/pkg/cluster/resourcemanager/kvmanager.go b/pkg/cluster/resourcemanager/kvmanager.go index 1f39f149f..ae6ba2085 100644 --- a/pkg/cluster/resourcemanager/kvmanager.go +++ b/pkg/cluster/resourcemanager/kvmanager.go @@ -29,6 +29,7 @@ import ( "github.com/ortuman/jackal/pkg/cluster/instance" "github.com/ortuman/jackal/pkg/cluster/kv" kvtypes "github.com/ortuman/jackal/pkg/cluster/kv/types" + "github.com/ortuman/jackal/pkg/hook" c2smodel "github.com/ortuman/jackal/pkg/model/c2s" ) @@ -38,28 +39,107 @@ const ( kvResourceManagerType = "kv" ) +type kvResources struct { + mu sync.RWMutex + store map[string][]c2smodel.ResourceDesc +} + +func (r *kvResources) get(username, resource string) c2smodel.ResourceDesc { + r.mu.RLock() + defer r.mu.RUnlock() + + rss := r.store[username] + for _, res := range rss { + if res.JID().Resource() != resource { + continue + } + return res + } + return nil +} + +func (r *kvResources) getAll(username string) []c2smodel.ResourceDesc { + r.mu.RLock() + defer r.mu.RUnlock() + + rss := r.store[username] + if len(rss) == 0 { + return nil + } + retVal := make([]c2smodel.ResourceDesc, len(rss)) + for i, res := range rss { + retVal[i] = res + } + return retVal +} + +func (r *kvResources) put(res c2smodel.ResourceDesc) { + r.mu.Lock() + defer r.mu.Unlock() + + jd := res.JID() + + var username, resource = jd.Node(), jd.Resource() + var found bool + + rss := r.store[username] + for i := 0; i < len(rss); i++ { + if rss[i].JID().Resource() != resource { + continue + } + rss[i] = res + found = true + break + } + if !found { + rss = append(rss, res) + } + r.store[username] = rss + return +} + +func (r *kvResources) del(username, resource string) { + r.mu.Lock() + defer r.mu.Unlock() + + rss := r.store[username] + for i := 0; i < len(rss); i++ { + if rss[i].JID().Resource() != resource { + continue + } + rss = append(rss[:i], rss[i+1:]...) + if len(rss) > 0 { + r.store[username] = rss + } else { + delete(r.store, username) + } + return + } +} + type kvManager struct { kv kv.KV + hk *hook.Hooks logger kitlog.Logger ctx context.Context ctxCancel context.CancelFunc - storeMu sync.RWMutex - store map[string][]c2smodel.ResourceDesc + instResMu sync.RWMutex + instRes map[string]*kvResources - // active put key set stopCh chan struct{} } // NewKVManager creates a new resource manager given a KV storage instance. -func NewKVManager(kv kv.KV, logger kitlog.Logger) Manager { +func NewKVManager(kv kv.KV, hk *hook.Hooks, logger kitlog.Logger) Manager { ctx, ctxCancel := context.WithCancel(context.Background()) return &kvManager{ kv: kv, + hk: hk, logger: logger, ctx: ctx, ctxCancel: ctxCancel, - store: make(map[string][]c2smodel.ResourceDesc), + instRes: make(map[string]*kvResources), stopCh: make(chan struct{}), } } @@ -80,30 +160,24 @@ func (m *kvManager) PutResource(ctx context.Context, res c2smodel.ResourceDesc) } func (m *kvManager) GetResource(_ context.Context, username, resource string) (c2smodel.ResourceDesc, error) { - m.storeMu.RLock() - defer m.storeMu.RUnlock() + m.instResMu.RLock() + defer m.instResMu.RUnlock() - rss := m.store[username] - for _, res := range rss { - if res.JID().Resource() != resource { - continue + for _, kvr := range m.instRes { + if res := kvr.get(username, resource); res != nil { + return res, nil } - return res, nil } return nil, nil } func (m *kvManager) GetResources(_ context.Context, username string) ([]c2smodel.ResourceDesc, error) { - m.storeMu.RLock() - defer m.storeMu.RUnlock() + m.instResMu.RLock() + defer m.instResMu.RUnlock() - rss := m.store[username] - if len(rss) == 0 { - return nil, nil - } - retVal := make([]c2smodel.ResourceDesc, len(rss)) - for i, res := range rss { - retVal[i] = res + var retVal []c2smodel.ResourceDesc + for _, kvr := range m.instRes { + retVal = append(retVal, kvr.getAll(username)...) } return retVal, nil } @@ -119,6 +193,8 @@ func (m *kvManager) DelResource(ctx context.Context, username, resource string) } func (m *kvManager) Start(ctx context.Context) error { + m.hk.AddHook(hook.MemberListUpdated, m.onMemberListUpdated, hook.DefaultPriority) + if err := m.watchKVResources(ctx); err != nil { return err } @@ -127,6 +203,8 @@ func (m *kvManager) Start(ctx context.Context) error { } func (m *kvManager) Stop(_ context.Context) error { + m.hk.RemoveHook(hook.MemberListUpdated, m.onMemberListUpdated) + // stop watching changes... m.ctxCancel() <-m.stopCh @@ -135,6 +213,20 @@ func (m *kvManager) Stop(_ context.Context) error { return nil } +func (m *kvManager) onMemberListUpdated(_ context.Context, execCtx *hook.ExecutionContext) error { + inf := execCtx.Info.(*hook.MemberListInfo) + if len(inf.UnregisteredKeys) == 0 { + return nil + } + // drop unregistered instance(s) resources + m.instResMu.Lock() + for _, instanceID := range inf.UnregisteredKeys { + delete(m.instRes, instanceID) + } + m.instResMu.Unlock() + return nil +} + func (m *kvManager) watchKVResources(ctx context.Context) error { ch := make(chan error, 1) go func() { @@ -203,46 +295,31 @@ func (m *kvManager) processKVEvents(kvEvents []kvtypes.WatchEvent) error { } func (m *kvManager) inMemPut(res c2smodel.ResourceDesc) { - m.storeMu.Lock() - defer m.storeMu.Unlock() + m.instResMu.Lock() + defer m.instResMu.Unlock() - jd := res.JID() - - var username, resource = jd.Node(), jd.Resource() - var found bool + instID := res.InstanceID() - rss := m.store[username] - for i := 0; i < len(rss); i++ { - if rss[i].JID().Resource() != resource { - continue + kvr := m.instRes[instID] + if kvr == nil { + kvr = &kvResources{ + store: make(map[string][]c2smodel.ResourceDesc), } - rss[i] = res - found = true - break - } - if !found { - rss = append(rss, res) + m.instRes[instID] = kvr } - m.store[username] = rss + kvr.put(res) return } func (m *kvManager) inMemDel(username, resource string) { - m.storeMu.Lock() - defer m.storeMu.Unlock() + m.instResMu.RLock() + defer m.instResMu.RUnlock() - rss := m.store[username] - for i := 0; i < len(rss); i++ { - if rss[i].JID().Resource() != resource { - continue - } - rss = append(rss[:i], rss[i+1:]...) - if len(rss) > 0 { - m.store[username] = rss - } else { - delete(m.store, username) + for _, kvr := range m.instRes { + if kvr.get(username, resource) != nil { + kvr.del(username, resource) + return } - return } } diff --git a/pkg/cluster/resourcemanager/kvmanager_test.go b/pkg/cluster/resourcemanager/kvmanager_test.go index f9f44c556..02bff8ce9 100644 --- a/pkg/cluster/resourcemanager/kvmanager_test.go +++ b/pkg/cluster/resourcemanager/kvmanager_test.go @@ -25,6 +25,7 @@ import ( "github.com/jackal-xmpp/stravaganza" "github.com/jackal-xmpp/stravaganza/jid" "github.com/ortuman/jackal/pkg/cluster/instance" + "github.com/ortuman/jackal/pkg/hook" c2smodel "github.com/ortuman/jackal/pkg/model/c2s" "github.com/stretchr/testify/require" ) @@ -35,7 +36,7 @@ func TestResourceManager_SetResource(t *testing.T) { kvmock := &kvMock{} - h := NewKVManager(kvmock, kitlog.NewNopLogger()) + h := NewKVManager(kvmock, hook.NewHooks(), kitlog.NewNopLogger()) kvmock.PutFunc = func(ctx context.Context, key string, value string) error { r, _ := decodeResource(key, []byte(value)) r1 = r @@ -60,7 +61,7 @@ func TestResourceManager_GetResource(t *testing.T) { kvmock := &kvMock{} kvmock.PutFunc = func(ctx context.Context, key string, value string) error { return nil } - h := NewKVManager(kvmock, kitlog.NewNopLogger()) + h := NewKVManager(kvmock, hook.NewHooks(), kitlog.NewNopLogger()) // when r0 := testResource("megaman-2", 10, "ortuman", "yard") @@ -85,7 +86,7 @@ func TestResourceManager_GetResources(t *testing.T) { kvmock := &kvMock{} kvmock.PutFunc = func(ctx context.Context, key string, value string) error { return nil } - h := NewKVManager(kvmock, kitlog.NewNopLogger()) + h := NewKVManager(kvmock, hook.NewHooks(), kitlog.NewNopLogger()) r0 := testResource("abc1234", 100, "ortuman", "yard") r1 := testResource("bcd1234", 50, "ortuman", "balcony") @@ -108,7 +109,7 @@ func TestResourceManager_DelResource(t *testing.T) { kvmock := &kvMock{} kvmock.PutFunc = func(ctx context.Context, key string, value string) error { return nil } - h := NewKVManager(kvmock, kitlog.NewNopLogger()) + h := NewKVManager(kvmock, hook.NewHooks(), kitlog.NewNopLogger()) r0 := testResource("megaman-2", 10, "ortuman", "yard") _ = h.PutResource(context.Background(), r0) diff --git a/pkg/jackal/jackal.go b/pkg/jackal/jackal.go index 39880df0e..1e1eb0e3c 100644 --- a/pkg/jackal/jackal.go +++ b/pkg/jackal/jackal.go @@ -275,7 +275,7 @@ func (j *Jackal) initCluster(cfg ClusterConfig) error { case noneClusterType: j.memberList = memberlist.NewKVMemberList(cfg.Server.Port, j.kv, j.hk, j.logger) - j.resMng = resourcemanager.NewKVManager(j.kv, j.logger) + j.resMng = resourcemanager.NewKVManager(j.kv, j.hk, j.logger) default: return fmt.Errorf("unrecognized cluster type: %s", cfg.Type) @@ -284,8 +284,8 @@ func (j *Jackal) initCluster(cfg ClusterConfig) error { j.clusterConnMng = clusterconnmanager.NewManager(j.hk, j.logger) j.registerStartStopper(j.clusterConnMng) - j.registerStartStopper(j.memberList) j.registerStartStopper(j.resMng) + j.registerStartStopper(j.memberList) return nil }