Skip to content
This repository has been archived by the owner on Nov 3, 2023. It is now read-only.

Commit

Permalink
cluster: stream management service (#236)
Browse files Browse the repository at this point in the history
Signed-off-by: Miguel Ángel Ortuño <[email protected]>
  • Loading branch information
ortuman authored May 31, 2022
1 parent efb887a commit 972c707
Show file tree
Hide file tree
Showing 26 changed files with 1,027 additions and 227 deletions.
5 changes: 2 additions & 3 deletions pkg/admin/pb/users.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 2 additions & 3 deletions pkg/c2s/pb/resourceinfo.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

35 changes: 20 additions & 15 deletions pkg/cluster/connmanager/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,11 +49,12 @@ var stmErrReasonMap = map[streamerror.Reason]clusterpb.StreamErrorReason{
var dialFn = dialContext

type clusterConn struct {
target string
ver *version.SemanticVersion
cc io.Closer
lr LocalRouter
cr ComponentRouter
target string
ver *version.SemanticVersion
cc io.Closer
lcRouter LocalRouter
compRouter ComponentRouter
stmMgmt StreamManagement
}

func newConn(addr string, port int, ver *version.SemanticVersion) *clusterConn {
Expand All @@ -63,20 +64,22 @@ func newConn(addr string, port int, ver *version.SemanticVersion) *clusterConn {
}
}

func (c *clusterConn) LocalRouter() LocalRouter { return c.lr }
func (c *clusterConn) ComponentRouter() ComponentRouter { return c.cr }
func (c *clusterConn) LocalRouter() LocalRouter { return c.lcRouter }
func (c *clusterConn) ComponentRouter() ComponentRouter { return c.compRouter }
func (c *clusterConn) StreamManagement() StreamManagement { return c.stmMgmt }

func (c *clusterConn) clusterAPIVer() *version.SemanticVersion {
return c.ver
}

func (c *clusterConn) dialContext(ctx context.Context) error {
lr, cr, cc, err := dialFn(ctx, c.target)
lcRouter, compRouter, stmMgmt, cc, err := dialFn(ctx, c.target)
if err != nil {
return err
}
c.lr = lr
c.cr = cr
c.lcRouter = lcRouter
c.compRouter = compRouter
c.stmMgmt = stmMgmt
c.cc = cc
return nil
}
Expand All @@ -97,7 +100,7 @@ func toProtoStreamError(sErr *streamerror.Error) *clusterpb.StreamError {
return pse
}

func dialContext(ctx context.Context, target string) (lr LocalRouter, cr ComponentRouter, cc io.Closer, err error) {
func dialContext(ctx context.Context, target string) (lcRouter LocalRouter, compRouter ComponentRouter, stmMgmt StreamManagement, cc io.Closer, err error) {
grpcConn, err := grpc.DialContext(ctx,
target,
grpc.WithKeepaliveParams(keepalive.ClientParameters{
Expand All @@ -109,9 +112,11 @@ func dialContext(ctx context.Context, target string) (lr LocalRouter, cr Compone
grpc.WithStreamInterceptor(grpc_prometheus.StreamClientInterceptor),
)
if err != nil {
return nil, nil, nil, err
return nil, nil, nil, nil, err
}
lr = &localRouter{cl: clusterpb.NewLocalRouterClient(grpcConn)}
cr = &componentRouter{cl: clusterpb.NewComponentRouterClient(grpcConn)}
return lr, cr, grpcConn, nil
lcRouter = &localRouter{cl: clusterpb.NewLocalRouterClient(grpcConn)}
compRouter = &componentRouter{cl: clusterpb.NewComponentRouterClient(grpcConn)}
stmMgmt = &streamManagement{cl: clusterpb.NewStreamManagementClient(grpcConn)}

return lcRouter, compRouter, stmMgmt, grpcConn, nil
}
1 change: 1 addition & 0 deletions pkg/cluster/connmanager/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (

//go:generate moq -out localrouter.mock_test.go . LocalRouter:localRouterMock
//go:generate moq -out componentrouter.mock_test.go . ComponentRouter:componentRouterMock
//go:generate moq -out streammanagement.mock_test.go . StreamManagement:streamManagementMock

//go:generate moq -out grpcconn.mock_test.go . grpcConn
type grpcConn interface {
Expand Down
1 change: 1 addition & 0 deletions pkg/cluster/connmanager/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ var (
type Conn interface {
LocalRouter() LocalRouter
ComponentRouter() ComponentRouter
StreamManagement() StreamManagement
}

// Manager is the cluster connection manager.
Expand Down
22 changes: 11 additions & 11 deletions pkg/cluster/connmanager/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,24 +21,23 @@ import (
"testing"

kitlog "github.com/go-kit/log"

clustermodel "github.com/ortuman/jackal/pkg/model/cluster"

"github.com/ortuman/jackal/pkg/hook"
clustermodel "github.com/ortuman/jackal/pkg/model/cluster"
"github.com/ortuman/jackal/pkg/version"
"github.com/stretchr/testify/require"
)

func TestConnections_UpdateMembers(t *testing.T) {
// given
lrMock := &localRouterMock{}
crMock := &componentRouterMock{}
lcRouterMock := &localRouterMock{}
compRouterMock := &componentRouterMock{}
stmMgmtMock := &streamManagementMock{}

ccMock := &grpcConnMock{}
ccMock.CloseFunc = func() error { return nil }

dialFn = func(ctx context.Context, target string) (lr LocalRouter, cr ComponentRouter, cc io.Closer, err error) {
return lrMock, crMock, ccMock, nil
dialFn = func(ctx context.Context, target string) (LocalRouter, ComponentRouter, StreamManagement, io.Closer, error) {
return lcRouterMock, compRouterMock, stmMgmtMock, ccMock, nil
}
hk := hook.NewHooks()
connMng := NewManager(hk, kitlog.NewNopLogger())
Expand Down Expand Up @@ -80,12 +79,13 @@ func TestConnections_UpdateMembers(t *testing.T) {

func TestConnections_IncompatibleClusterAPI(t *testing.T) {
// given
lrMock := &localRouterMock{}
crMock := &componentRouterMock{}
localRouterMock := &localRouterMock{}
compRouterMock := &componentRouterMock{}
stmMgmtMock := &streamManagementMock{}
ccMock := &grpcConnMock{}

dialFn = func(ctx context.Context, target string) (lr LocalRouter, cr ComponentRouter, cc io.Closer, err error) {
return lrMock, crMock, ccMock, nil
dialFn = func(ctx context.Context, target string) (LocalRouter, ComponentRouter, StreamManagement, io.Closer, error) {
return localRouterMock, compRouterMock, stmMgmtMock, ccMock, nil
}
hk := hook.NewHooks()
connMng := NewManager(hk, kitlog.NewNopLogger())
Expand Down
78 changes: 78 additions & 0 deletions pkg/cluster/connmanager/streammanagement.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
// Copyright 2022 The jackal Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package clusterconnmanager

import (
"context"

"github.com/jackal-xmpp/stravaganza"
clusterpb "github.com/ortuman/jackal/pkg/cluster/pb"
streamqueue "github.com/ortuman/jackal/pkg/module/xep0198/queue"
)

// StreamQueue represents a stream managed queue.
type StreamQueue struct {
// Elements contains the queue elements.
Elements []streamqueue.Element

// Nonce is the nonce queue byte slice.
Nonce []byte

// InH is the queue incoming h value.
InH uint32

// OutH is the queue outgoing h value.
OutH uint32
}

// StreamManagement defines a stream management service.
type StreamManagement interface {
TransferQueue(ctx context.Context, queueID string) (*StreamQueue, error)
}

type streamManagement struct {
cl clusterpb.StreamManagementClient
}

func (cc *streamManagement) TransferQueue(ctx context.Context, queueID string) (*StreamQueue, error) {
resp, err := cc.cl.TransferQueue(ctx, &clusterpb.TransferQueueRequest{
Identifier: queueID,
})
if err != nil {
return nil, err
}
if resp == nil {
return nil, nil
}
elements := make([]streamqueue.Element, 0, len(resp.Elements))

for _, elem := range resp.Elements {
b := stravaganza.NewBuilderFromProto(elem.GetStanza())
stanza, err := b.BuildStanza()
if err != nil {
return nil, err
}
elements = append(elements, streamqueue.Element{
Stanza: stanza,
H: elem.GetH(),
})
}
return &StreamQueue{
Elements: elements,
Nonce: resp.GetNonce(),
InH: resp.GetInH(),
OutH: resp.GetOutH(),
}, nil
}
Loading

0 comments on commit 972c707

Please sign in to comment.