Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

MG-2142 - Consume Things connect/disconnect event in bootstrap #2192

Merged
merged 20 commits into from
Jun 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 18 additions & 0 deletions bootstrap/api/logging.go
Original file line number Diff line number Diff line change
Expand Up @@ -257,6 +257,24 @@ func (lm *loggingMiddleware) RemoveChannelHandler(ctx context.Context, id string
return lm.svc.RemoveChannelHandler(ctx, id)
}

func (lm *loggingMiddleware) ConnectThingHandler(ctx context.Context, channelID, thingID string) (err error) {
defer func(begin time.Time) {
args := []any{
slog.String("duration", time.Since(begin).String()),
slog.String("channel_id", channelID),
slog.String("thing_id", thingID),
}
if err != nil {
args = append(args, slog.Any("error", err))
lm.logger.Warn("Connect thing handler failed to complete successfully", args...)
return
}
lm.logger.Info("Connect thing handler completed successfully", args...)
}(time.Now())

return lm.svc.ConnectThingHandler(ctx, channelID, thingID)
}

func (lm *loggingMiddleware) DisconnectThingHandler(ctx context.Context, channelID, thingID string) (err error) {
defer func(begin time.Time) {
args := []any{
Expand Down
10 changes: 10 additions & 0 deletions bootstrap/api/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,16 @@ func (mm *metricsMiddleware) RemoveChannelHandler(ctx context.Context, id string
return mm.svc.RemoveChannelHandler(ctx, id)
}

// ConnectThingHandler instruments ConnectThingHandler method with metrics.
func (mm *metricsMiddleware) ConnectThingHandler(ctx context.Context, channelID, thingID string) (err error) {
defer func(begin time.Time) {
mm.counter.With("method", "connect_thing_handler").Add(1)
mm.latency.With("method", "connect_thing_handler").Observe(time.Since(begin).Seconds())
}(time.Now())

return mm.svc.ConnectThingHandler(ctx, channelID, thingID)
}

// DisconnectThingHandler instruments DisconnectThingHandler method with metrics.
func (mm *metricsMiddleware) DisconnectThingHandler(ctx context.Context, channelID, thingID string) (err error) {
defer func(begin time.Time) {
Expand Down
6 changes: 4 additions & 2 deletions bootstrap/configs.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,9 @@ type ConfigRepository interface {
// RemoveChannel removes channel with the given ID.
RemoveChannel(ctx context.Context, id string) error

// DisconnectHandler changes state of the Config when the corresponding Thing is
// disconnected from the Channel.
// ConnectThing changes state of the Config when the corresponding Thing is connected to the Channel.
ConnectThing(ctx context.Context, channelID, thingID string) error
rodneyosodo marked this conversation as resolved.
Show resolved Hide resolved

// DisconnectThing changes state of the Config when the corresponding Thing is disconnected from the Channel.
DisconnectThing(ctx context.Context, channelID, thingID string) error
}
4 changes: 2 additions & 2 deletions bootstrap/events/consumer/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ type updateChannelEvent struct {
}

// Connection event is either connect or disconnect event.
type disconnectEvent struct {
thingID string
type connectionEvent struct {
thingIDs []string
channelID string
}
73 changes: 67 additions & 6 deletions bootstrap/events/consumer/streams.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,16 +9,21 @@ import (
"time"

"github.com/absmach/magistrala/bootstrap"
svcerr "github.com/absmach/magistrala/pkg/errors/service"
"github.com/absmach/magistrala/pkg/events"
)

const (
thingRemove = "thing.remove"
thingDisconnect = "policy.delete"
thingConnect = "group.assign"
thingDisconnect = "group.unassign"

channelPrefix = "group."
channelUpdate = channelPrefix + "update"
channelRemove = channelPrefix + "remove"

memberKind = "things"
relation = "group"
)

type eventHandler struct {
Expand All @@ -42,9 +47,32 @@ func (es *eventHandler) Handle(ctx context.Context, event events.Event) error {
case thingRemove:
rte := decodeRemoveThing(msg)
err = es.svc.RemoveConfigHandler(ctx, rte.id)
case thingConnect:
cte := decodeConnectThing(msg)
rodneyosodo marked this conversation as resolved.
Show resolved Hide resolved
if cte.channelID == "" || len(cte.thingIDs) == 0 {
return svcerr.ErrMalformedEntity
}
rodneyosodo marked this conversation as resolved.
Show resolved Hide resolved
for _, thingID := range cte.thingIDs {
if thingID == "" {
return svcerr.ErrMalformedEntity
}
if err := es.svc.ConnectThingHandler(ctx, cte.channelID, thingID); err != nil {
return err
}
}
case thingDisconnect:
rodneyosodo marked this conversation as resolved.
Show resolved Hide resolved
dte := decodeDisconnectThing(msg)
err = es.svc.DisconnectThingHandler(ctx, dte.channelID, dte.thingID)
if dte.channelID == "" || len(dte.thingIDs) == 0 {
return svcerr.ErrMalformedEntity
}
for _, thingID := range dte.thingIDs {
if thingID == "" {
return svcerr.ErrMalformedEntity
}
if err = es.svc.DisconnectThingHandler(ctx, dte.channelID, thingID); err != nil {
return err
}
}
case channelUpdate:
uce := decodeUpdateChannel(msg)
err = es.handleUpdateChannel(ctx, uce)
Expand Down Expand Up @@ -87,10 +115,24 @@ func decodeRemoveChannel(event map[string]interface{}) removeEvent {
}
}

func decodeDisconnectThing(event map[string]interface{}) disconnectEvent {
return disconnectEvent{
channelID: read(event, "chan_id", ""),
thingID: read(event, "thing_id", ""),
func decodeConnectThing(event map[string]interface{}) connectionEvent {
if read(event, "memberKind", "") != memberKind && read(event, "relation", "") != relation {
return connectionEvent{}
}

return connectionEvent{
channelID: read(event, "group_id", ""),
thingIDs: ReadStringSlice(event, "member_ids"),
}
}

func decodeDisconnectThing(event map[string]interface{}) connectionEvent {
if read(event, "memberKind", "") != memberKind && read(event, "relation", "") != relation {
return connectionEvent{}
}
return connectionEvent{
channelID: read(event, "group_id", ""),
thingIDs: ReadStringSlice(event, "member_ids"),
}
}

Expand All @@ -114,6 +156,25 @@ func read(event map[string]interface{}, key, def string) string {
return val
}

// ReadStringSlice reads string slice from event map.
// If value is not a string slice, returns empty slice.
func ReadStringSlice(event map[string]interface{}, key string) []string {
var res []string

vals, ok := event[key].([]interface{})
if !ok {
return res
}

for _, v := range vals {
if s, ok := v.(string); ok {
res = append(res, s)
}
}

return res
}

func readTime(event map[string]interface{}, key string, def time.Time) time.Time {
val, ok := event[key].(time.Time)
if !ok {
Expand Down
14 changes: 14 additions & 0 deletions bootstrap/events/producer/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ const (
thingBootstrap = thingPrefix + "bootstrap"
thingStateChange = thingPrefix + "change_state"
thingUpdateConnections = thingPrefix + "update_connections"
thingConnect = thingPrefix + "connect"
thingDisconnect = thingPrefix + "disconnect"

channelPrefix = "group."
Expand Down Expand Up @@ -276,6 +277,19 @@ func (uche updateChannelHandlerEvent) Encode() (map[string]interface{}, error) {
return val, nil
}

type connectThingEvent struct {
thingID string
channelID string
}

func (cte connectThingEvent) Encode() (map[string]interface{}, error) {
return map[string]interface{}{
"thing_id": cte.thingID,
"channel_id": cte.channelID,
"operation": thingConnect,
}, nil
}

type disconnectThingEvent struct {
thingID string
channelID string
Expand Down
17 changes: 15 additions & 2 deletions bootstrap/events/producer/streams.go
Original file line number Diff line number Diff line change
Expand Up @@ -207,14 +207,27 @@ func (es *eventStore) UpdateChannelHandler(ctx context.Context, channel bootstra
return es.Publish(ctx, ev)
}

func (es *eventStore) ConnectThingHandler(ctx context.Context, channelID, thingID string) error {
if err := es.svc.ConnectThingHandler(ctx, channelID, thingID); err != nil {
return err
}

ev := connectThingEvent{
thingID: thingID,
channelID: channelID,
}

return es.Publish(ctx, ev)
}

func (es *eventStore) DisconnectThingHandler(ctx context.Context, channelID, thingID string) error {
if err := es.svc.DisconnectThingHandler(ctx, channelID, thingID); err != nil {
return err
}

ev := disconnectThingEvent{
thingID,
channelID,
thingID: thingID,
channelID: channelID,
}

return es.Publish(ctx, ev)
Expand Down
84 changes: 83 additions & 1 deletion bootstrap/events/producer/streams_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ const (
thingBootstrap = thingPrefix + "bootstrap"
thingStateChange = thingPrefix + "change_state"
thingUpdateConnections = thingPrefix + "update_connections"
thingConnect = thingPrefix + "connect"
thingDisconnect = thingPrefix + "disconnect"

channelPrefix = "group."
Expand Down Expand Up @@ -1039,6 +1040,87 @@ func TestRemoveConfigHandler(t *testing.T) {
}
}

func TestConnectThingHandler(t *testing.T) {
err := redisClient.FlushAll(context.Background()).Err()
assert.Nil(t, err, fmt.Sprintf("got unexpected error: %s", err))

svc, boot, _, _ := newService(t, redisURL)

err = redisClient.FlushAll(context.Background()).Err()
assert.Nil(t, err, fmt.Sprintf("got unexpected error: %s", err))

cases := []struct {
desc string
channelID string
thingID string
err error
event map[string]interface{}
}{
{
desc: "connect thing handler successfully",
channelID: channel.ID,
thingID: "1",
err: nil,
event: map[string]interface{}{
"channel_id": channel.ID,
"thing_id": "1",
"operation": thingConnect,
"timestamp": time.Now().UnixNano(),
"occurred_at": time.Now().UnixNano(),
},
},
{
desc: "add non-existing channel handler",
channelID: "unknown",
err: nil,
event: nil,
},
{
desc: "add channel handler with empty ID",
channelID: "",
err: nil,
event: nil,
},
{
desc: "add channel handler successfully",
channelID: channel.ID,
thingID: "1",
err: nil,
event: map[string]interface{}{
"channel_id": channel.ID,
"thing_id": "1",
"operation": thingConnect,
"timestamp": time.Now().UnixNano(),
"occurred_at": time.Now().UnixNano(),
},
},
}

lastID := "0"
for _, tc := range cases {
repoCall := boot.On("ConnectThing", context.Background(), tc.channelID, tc.thingID).Return(tc.err)
err := svc.ConnectThingHandler(context.Background(), tc.channelID, tc.thingID)
assert.Equal(t, tc.err, err, fmt.Sprintf("%s: expected %s got %s\n", tc.desc, tc.err, err))

streams := redisClient.XRead(context.Background(), &redis.XReadArgs{
Streams: []string{streamID, lastID},
Count: 1,
Block: time.Second,
}).Val()

var event map[string]interface{}
if len(streams) > 0 && len(streams[0].Messages) > 0 {
msg := streams[0].Messages[0]
event = msg.Values
event["timestamp"] = msg.ID
lastID = msg.ID
}

test(t, tc.event, event, tc.desc)
repoCall.Unset()
}
}

func TestDisconnectThingHandler(t *testing.T) {
err := redisClient.FlushAll(context.Background()).Err()
assert.Nil(t, err, fmt.Sprintf("got unexpected error: %s", err))
Expand Down Expand Up @@ -1097,7 +1179,7 @@ func TestDisconnectThingHandler(t *testing.T) {

lastID := "0"
for _, tc := range cases {
repoCall := boot.On("DisconnectThing", context.Background(), mock.Anything, mock.Anything).Return(tc.err)
repoCall := boot.On("DisconnectThing", context.Background(), tc.channelID, tc.thingID).Return(tc.err)
err := svc.DisconnectThingHandler(context.Background(), tc.channelID, tc.thingID)
assert.Equal(t, tc.err, err, fmt.Sprintf("%s: expected %s got %s\n", tc.desc, tc.err, err))

Expand Down
18 changes: 18 additions & 0 deletions bootstrap/mocks/configs.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading
Loading