From 8c1a7da193b626d5632f128ea3d1c9ffce00bf3c Mon Sep 17 00:00:00 2001 From: JeffMboya Date: Fri, 19 Apr 2024 15:42:15 +0300 Subject: [PATCH 01/20] Refactor connectionEvent in events.go Signed-off-by: JeffMboya --- bootstrap/events/consumer/events.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/bootstrap/events/consumer/events.go b/bootstrap/events/consumer/events.go index 1f47340825..e5f26a6158 100644 --- a/bootstrap/events/consumer/events.go +++ b/bootstrap/events/consumer/events.go @@ -18,7 +18,7 @@ type updateChannelEvent struct { } // Connection event is either connect or disconnect event. -type disconnectEvent struct { +type connectionEvent struct { thingID string channelID string } From 651dc726df60ebe89a7fb2122cdf9daaed3f87ee Mon Sep 17 00:00:00 2001 From: JeffMboya Date: Tue, 23 Apr 2024 14:53:40 +0300 Subject: [PATCH 02/20] Add ConnectThing method to bootstrap Signed-off-by: JeffMboya Add ConnectThing method to bootstrap Signed-off-by: JeffMboya Consume ThingConnect in bootstrap Signed-off-by: JeffMboya Consume ThingConnect in bootstrap Signed-off-by: JeffMboya Consume ThingConnect in bootstrap Signed-off-by: JeffMboya Consume ThingConnect event Signed-off-by: JeffMboya Consume ThingsConnect event Signed-off-by: JeffMboya Consume ThingsConnect event Signed-off-by: JeffMboya Consume ThingsConnect event Signed-off-by: JeffMboya Implement ReadStringSlice Signed-off-by: JeffMboya Add memberKind and relation checks Signed-off-by: JeffMboya Add memberKind and relation checks Signed-off-by: JeffMboya Add memberKind and relation checks Signed-off-by: JeffMboya Add memberKind and relation checks Signed-off-by: JeffMboya Add memberKind and relation checks Signed-off-by: JeffMboya Add TestDisconnectThing to configs_test Signed-off-by: JeffMboya Update mocks Signed-off-by: JeffMboya --- bootstrap/api/logging.go | 18 +++++ bootstrap/api/metrics.go | 10 +++ bootstrap/configs.go | 6 +- bootstrap/events/consumer/events.go | 4 +- bootstrap/events/consumer/streams.go | 57 +++++++++++++-- bootstrap/events/producer/events.go | 22 ++++-- bootstrap/events/producer/streams.go | 17 ++++- bootstrap/events/producer/streams_test.go | 86 ++++++++++++++++++++++- bootstrap/mocks/configs.go | 26 +++++-- bootstrap/mocks/service.go | 26 +++++-- bootstrap/postgres/configs.go | 53 ++++++++++---- bootstrap/postgres/configs_test.go | 27 ++++++- bootstrap/service.go | 19 +++-- bootstrap/service_test.go | 43 ++++++++++++ bootstrap/tracing/tracing.go | 11 +++ internal/groups/events/events.go | 16 +++-- internal/groups/events/streams.go | 12 ++-- 17 files changed, 401 insertions(+), 52 deletions(-) diff --git a/bootstrap/api/logging.go b/bootstrap/api/logging.go index eb92d7a45e..a9a5f25466 100644 --- a/bootstrap/api/logging.go +++ b/bootstrap/api/logging.go @@ -257,6 +257,24 @@ func (lm *loggingMiddleware) RemoveChannelHandler(ctx context.Context, id string return lm.svc.RemoveChannelHandler(ctx, id) } +func (lm *loggingMiddleware) ConnectThingHandler(ctx context.Context, channelID, thingID string) (err error) { + defer func(begin time.Time) { + args := []any{ + slog.String("duration", time.Since(begin).String()), + slog.String("channel_id", channelID), + slog.String("thing_id", thingID), + } + if err != nil { + args = append(args, slog.Any("error", err)) + lm.logger.Warn("Connect thing handler failed to complete successfully", args...) + return + } + lm.logger.Info("Connect thing handler completed successfully", args...) + }(time.Now()) + + return lm.svc.ConnectThingHandler(ctx, channelID, thingID) +} + func (lm *loggingMiddleware) DisconnectThingHandler(ctx context.Context, channelID, thingID string) (err error) { defer func(begin time.Time) { args := []any{ diff --git a/bootstrap/api/metrics.go b/bootstrap/api/metrics.go index 8137635e3c..9e42a57c18 100644 --- a/bootstrap/api/metrics.go +++ b/bootstrap/api/metrics.go @@ -150,6 +150,16 @@ func (mm *metricsMiddleware) RemoveChannelHandler(ctx context.Context, id string return mm.svc.RemoveChannelHandler(ctx, id) } +// ConnectThingHandler instruments ConnectThingHandler method with metrics. +func (mm *metricsMiddleware) ConnectThingHandler(ctx context.Context, channelID, thingID string) (err error) { + defer func(begin time.Time) { + mm.counter.With("method", "connect_thing_handler").Add(1) + mm.latency.With("method", "connect_thing_handler").Observe(time.Since(begin).Seconds()) + }(time.Now()) + + return mm.svc.ConnectThingHandler(ctx, channelID, thingID) +} + // DisconnectThingHandler instruments DisconnectThingHandler method with metrics. func (mm *metricsMiddleware) DisconnectThingHandler(ctx context.Context, channelID, thingID string) (err error) { defer func(begin time.Time) { diff --git a/bootstrap/configs.go b/bootstrap/configs.go index 6b7df9b42e..e6a5050743 100644 --- a/bootstrap/configs.go +++ b/bootstrap/configs.go @@ -112,7 +112,11 @@ type ConfigRepository interface { // RemoveChannel removes channel with the given ID. RemoveChannel(ctx context.Context, id string) error + // ConnectHandler changes state of the Config when the corresponding Thing is + // connected to the Channel. + ConnectThing(ctx context.Context, mgChannel, mgThing string) error + // DisconnectHandler changes state of the Config when the corresponding Thing is // disconnected from the Channel. - DisconnectThing(ctx context.Context, channelID, thingID string) error + DisconnectThing(ctx context.Context, mgChannel, mgThing string) error } diff --git a/bootstrap/events/consumer/events.go b/bootstrap/events/consumer/events.go index e5f26a6158..7ec78f6d00 100644 --- a/bootstrap/events/consumer/events.go +++ b/bootstrap/events/consumer/events.go @@ -19,6 +19,6 @@ type updateChannelEvent struct { // Connection event is either connect or disconnect event. type connectionEvent struct { - thingID string - channelID string + mgThing []string + mgChannel string } diff --git a/bootstrap/events/consumer/streams.go b/bootstrap/events/consumer/streams.go index 10d8ff77eb..55bf33f0f1 100644 --- a/bootstrap/events/consumer/streams.go +++ b/bootstrap/events/consumer/streams.go @@ -14,7 +14,8 @@ import ( const ( thingRemove = "thing.remove" - thingDisconnect = "policy.delete" + thingConnect = "group.assign" + thingDisconnect = "group.unassign" channelPrefix = "group." channelUpdate = channelPrefix + "update" @@ -42,9 +43,20 @@ func (es *eventHandler) Handle(ctx context.Context, event events.Event) error { case thingRemove: rte := decodeRemoveThing(msg) err = es.svc.RemoveConfigHandler(ctx, rte.id) + case thingConnect: + cte := decodeConnectThing(msg) + for _, mgThing := range cte.mgThing { + if err = es.svc.ConnectThingHandler(ctx, cte.mgChannel, mgThing); err != nil { + return err + } + } case thingDisconnect: dte := decodeDisconnectThing(msg) - err = es.svc.DisconnectThingHandler(ctx, dte.channelID, dte.thingID) + for _, mgThing := range dte.mgThing { + if err = es.svc.DisconnectThingHandler(ctx, dte.mgChannel, mgThing); err != nil { + return err + } + } case channelUpdate: uce := decodeUpdateChannel(msg) err = es.handleUpdateChannel(ctx, uce) @@ -87,10 +99,24 @@ func decodeRemoveChannel(event map[string]interface{}) removeEvent { } } -func decodeDisconnectThing(event map[string]interface{}) disconnectEvent { - return disconnectEvent{ - channelID: read(event, "chan_id", ""), - thingID: read(event, "thing_id", ""), +func decodeConnectThing(event map[string]interface{}) connectionEvent { + if event["memberKind"] != "things" && event["relation"] != "group" { + return connectionEvent{} + } + + return connectionEvent{ + mgChannel: read(event, "group_id", ""), + mgThing: ReadStringSlice(event, "member_ids"), + } +} + +func decodeDisconnectThing(event map[string]interface{}) connectionEvent { + if event["memberKind"] != "things" && event["relation"] != "group" { + return connectionEvent{} + } + return connectionEvent{ + mgChannel: read(event, "group_id", ""), + mgThing: ReadStringSlice(event, "member_ids"), } } @@ -114,6 +140,25 @@ func read(event map[string]interface{}, key, def string) string { return val } +// ReadStringSlice reads string slice from event map. +// If value is not a string slice, returns empty slice. +func ReadStringSlice(event map[string]interface{}, key string) []string { + var res []string + + vals, ok := event[key].([]interface{}) + if !ok { + return res + } + + for _, v := range vals { + if s, ok := v.(string); ok { + res = append(res, s) + } + } + + return res +} + func readTime(event map[string]interface{}, key string, def time.Time) time.Time { val, ok := event[key].(time.Time) if !ok { diff --git a/bootstrap/events/producer/events.go b/bootstrap/events/producer/events.go index ed15704a4d..15c0a05cc0 100644 --- a/bootstrap/events/producer/events.go +++ b/bootstrap/events/producer/events.go @@ -23,6 +23,7 @@ const ( thingBootstrap = thingPrefix + "bootstrap" thingStateChange = thingPrefix + "change_state" thingUpdateConnections = thingPrefix + "update_connections" + thingConnect = thingPrefix + "connect" thingDisconnect = thingPrefix + "disconnect" channelPrefix = "group." @@ -276,15 +277,28 @@ func (uche updateChannelHandlerEvent) Encode() (map[string]interface{}, error) { return val, nil } +type connectThingEvent struct { + mgThing string + mgChannel string +} + +func (cte connectThingEvent) Encode() (map[string]interface{}, error) { + return map[string]interface{}{ + "thing_id": cte.mgThing, + "channel_id": cte.mgChannel, + "operation": thingConnect, + }, nil +} + type disconnectThingEvent struct { - thingID string - channelID string + mgThing string + mgChannel string } func (dte disconnectThingEvent) Encode() (map[string]interface{}, error) { return map[string]interface{}{ - "thing_id": dte.thingID, - "channel_id": dte.channelID, + "thing_id": dte.mgThing, + "channel_id": dte.mgChannel, "operation": thingDisconnect, }, nil } diff --git a/bootstrap/events/producer/streams.go b/bootstrap/events/producer/streams.go index 5765137758..62f1c7fac8 100644 --- a/bootstrap/events/producer/streams.go +++ b/bootstrap/events/producer/streams.go @@ -207,14 +207,27 @@ func (es *eventStore) UpdateChannelHandler(ctx context.Context, channel bootstra return es.Publish(ctx, ev) } +func (es *eventStore) ConnectThingHandler(ctx context.Context, channelID, thingID string) error { + if err := es.svc.ConnectThingHandler(ctx, channelID, thingID); err != nil { + return err + } + + ev := connectThingEvent{ + mgThing: thingID, + mgChannel: channelID, + } + + return es.Publish(ctx, ev) +} + func (es *eventStore) DisconnectThingHandler(ctx context.Context, channelID, thingID string) error { if err := es.svc.DisconnectThingHandler(ctx, channelID, thingID); err != nil { return err } ev := disconnectThingEvent{ - thingID, - channelID, + mgThing: thingID, + mgChannel: channelID, } return es.Publish(ctx, ev) diff --git a/bootstrap/events/producer/streams_test.go b/bootstrap/events/producer/streams_test.go index 175c79d365..a97d1010bb 100644 --- a/bootstrap/events/producer/streams_test.go +++ b/bootstrap/events/producer/streams_test.go @@ -48,6 +48,7 @@ const ( thingBootstrap = thingPrefix + "bootstrap" thingStateChange = thingPrefix + "change_state" thingUpdateConnections = thingPrefix + "update_connections" + thingConnect = thingPrefix + "connect" thingDisconnect = thingPrefix + "disconnect" channelPrefix = "group." @@ -1039,6 +1040,87 @@ func TestRemoveConfigHandler(t *testing.T) { } } +func TestConnectThingHandler(t *testing.T) { + err := redisClient.FlushAll(context.Background()).Err() + assert.Nil(t, err, fmt.Sprintf("got unexpected error: %s", err)) + + svc, boot, _, _ := newService(t, redisURL) + + err = redisClient.FlushAll(context.Background()).Err() + assert.Nil(t, err, fmt.Sprintf("got unexpected error: %s", err)) + + cases := []struct { + desc string + channelID string + thingID string + err error + event map[string]interface{} + }{ + { + desc: "connect thing handler successfully", + channelID: channel.ID, + thingID: "1", + err: nil, + event: map[string]interface{}{ + "channel_id": channel.ID, + "thing_id": "1", + "operation": thingConnect, + "timestamp": time.Now().UnixNano(), + "occurred_at": time.Now().UnixNano(), + }, + }, + { + desc: "add non-existing channel handler", + channelID: "unknown", + err: nil, + event: nil, + }, + { + desc: "add channel handler with empty ID", + channelID: "", + err: nil, + event: nil, + }, + { + desc: "add channel handler successfully", + channelID: channel.ID, + thingID: "1", + err: nil, + event: map[string]interface{}{ + "channel_id": channel.ID, + "thing_id": "1", + "operation": thingConnect, + "timestamp": time.Now().UnixNano(), + "occurred_at": time.Now().UnixNano(), + }, + }, + } + + lastID := "0" + for _, tc := range cases { + svcCall := boot.On("ConnectThing", context.Background(), tc.channelID, tc.thingID).Return(tc.err) + err := svc.ConnectThingHandler(context.Background(), tc.channelID, tc.thingID) + assert.Equal(t, tc.err, err, fmt.Sprintf("%s: expected %s got %s\n", tc.desc, tc.err, err)) + + streams := redisClient.XRead(context.Background(), &redis.XReadArgs{ + Streams: []string{streamID, lastID}, + Count: 1, + Block: time.Second, + }).Val() + + var event map[string]interface{} + if len(streams) > 0 && len(streams[0].Messages) > 0 { + msg := streams[0].Messages[0] + event = msg.Values + event["timestamp"] = msg.ID + lastID = msg.ID + } + + test(t, tc.event, event, tc.desc) + svcCall.Unset() + } +} + func TestDisconnectThingHandler(t *testing.T) { err := redisClient.FlushAll(context.Background()).Err() assert.Nil(t, err, fmt.Sprintf("got unexpected error: %s", err)) @@ -1097,7 +1179,7 @@ func TestDisconnectThingHandler(t *testing.T) { lastID := "0" for _, tc := range cases { - repoCall := boot.On("DisconnectThing", context.Background(), mock.Anything, mock.Anything).Return(tc.err) + svcCall := boot.On("DisconnectThing", context.Background(), tc.channelID, tc.thingID).Return(tc.err) err := svc.DisconnectThingHandler(context.Background(), tc.channelID, tc.thingID) assert.Equal(t, tc.err, err, fmt.Sprintf("%s: expected %s got %s\n", tc.desc, tc.err, err)) @@ -1116,7 +1198,7 @@ func TestDisconnectThingHandler(t *testing.T) { } test(t, tc.event, event, tc.desc) - repoCall.Unset() + svcCall.Unset() } } diff --git a/bootstrap/mocks/configs.go b/bootstrap/mocks/configs.go index 6345bff34e..478589c8e4 100644 --- a/bootstrap/mocks/configs.go +++ b/bootstrap/mocks/configs.go @@ -35,9 +35,27 @@ func (_m *ConfigRepository) ChangeState(ctx context.Context, owner string, id st return r0 } -// DisconnectThing provides a mock function with given fields: ctx, channelID, thingID -func (_m *ConfigRepository) DisconnectThing(ctx context.Context, channelID string, thingID string) error { - ret := _m.Called(ctx, channelID, thingID) +// ConnectThing provides a mock function with given fields: ctx, mgChannel, mgThing +func (_m *ConfigRepository) ConnectThing(ctx context.Context, mgChannel string, mgThing string) error { + ret := _m.Called(ctx, mgChannel, mgThing) + + if len(ret) == 0 { + panic("no return value specified for ConnectThing") + } + + var r0 error + if rf, ok := ret.Get(0).(func(context.Context, string, string) error); ok { + r0 = rf(ctx, mgChannel, mgThing) + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// DisconnectThing provides a mock function with given fields: ctx, mgChannel, mgThing +func (_m *ConfigRepository) DisconnectThing(ctx context.Context, mgChannel string, mgThing string) error { + ret := _m.Called(ctx, mgChannel, mgThing) if len(ret) == 0 { panic("no return value specified for DisconnectThing") @@ -45,7 +63,7 @@ func (_m *ConfigRepository) DisconnectThing(ctx context.Context, channelID strin var r0 error if rf, ok := ret.Get(0).(func(context.Context, string, string) error); ok { - r0 = rf(ctx, channelID, thingID) + r0 = rf(ctx, mgChannel, mgThing) } else { r0 = ret.Error(0) } diff --git a/bootstrap/mocks/service.go b/bootstrap/mocks/service.go index d312b15f1e..0c7a6b7ddc 100644 --- a/bootstrap/mocks/service.go +++ b/bootstrap/mocks/service.go @@ -91,9 +91,27 @@ func (_m *Service) ChangeState(ctx context.Context, token string, id string, sta return r0 } -// DisconnectThingHandler provides a mock function with given fields: ctx, channelID, thingID -func (_m *Service) DisconnectThingHandler(ctx context.Context, channelID string, thingID string) error { - ret := _m.Called(ctx, channelID, thingID) +// ConnectThingHandler provides a mock function with given fields: ctx, mgChannel, mgThing +func (_m *Service) ConnectThingHandler(ctx context.Context, mgChannel string, mgThing string) error { + ret := _m.Called(ctx, mgChannel, mgThing) + + if len(ret) == 0 { + panic("no return value specified for ConnectThingHandler") + } + + var r0 error + if rf, ok := ret.Get(0).(func(context.Context, string, string) error); ok { + r0 = rf(ctx, mgChannel, mgThing) + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// DisconnectThingHandler provides a mock function with given fields: ctx, mgChannel, mgThing +func (_m *Service) DisconnectThingHandler(ctx context.Context, mgChannel string, mgThing string) error { + ret := _m.Called(ctx, mgChannel, mgThing) if len(ret) == 0 { panic("no return value specified for DisconnectThingHandler") @@ -101,7 +119,7 @@ func (_m *Service) DisconnectThingHandler(ctx context.Context, channelID string, var r0 error if rf, ok := ret.Get(0).(func(context.Context, string, string) error); ok { - r0 = rf(ctx, channelID, thingID) + r0 = rf(ctx, mgChannel, mgThing) } else { r0 = ret.Error(0) } diff --git a/bootstrap/postgres/configs.go b/bootstrap/postgres/configs.go index 998f2a28aa..0a1e2f68cf 100644 --- a/bootstrap/postgres/configs.go +++ b/bootstrap/postgres/configs.go @@ -28,6 +28,7 @@ var ( errSaveConnections = errors.New("failed to insert connections to database") errUpdateChannels = errors.New("failed to update channels in bootstrap configuration database") errRemoveChannels = errors.New("failed to remove channels from bootstrap configuration in database") + errConnectThing = errors.New("failed to connect thing in bootstrap configuration in database") errDisconnectThing = errors.New("failed to disconnect thing in bootstrap configuration in database") ) @@ -59,27 +60,33 @@ func (cr configRepository) Save(ctx context.Context, cfg bootstrap.Config, chsCo dbcfg := toDBConfig(cfg) if _, err := tx.NamedExec(q, dbcfg); err != nil { - e := err if pgErr, ok := err.(*pgconn.PgError); ok && pgErr.Code == pgerrcode.UniqueViolation { - e = repoerr.ErrConflict + err = repoerr.ErrConflict } - cr.rollback("Failed to insert a Config", tx) - return "", errors.Wrap(repoerr.ErrCreateEntity, e) + if errRollback := cr.rollback("failed to insert a Config", tx); errRollback != nil { + return "", errors.Wrap(err, errRollback) + } } if err := insertChannels(ctx, cfg.Owner, cfg.Channels, tx); err != nil { - cr.rollback("Failed to insert Channels", tx) + if errRollback := cr.rollback("failed to insert Channels", tx); errRollback != nil { + return "", errors.Wrap(err, errRollback) + } return "", errors.Wrap(errSaveChannels, err) } if err := insertConnections(ctx, cfg, chsConnIDs, tx); err != nil { - cr.rollback("Failed to insert connections", tx) + if errRollback := cr.rollback("failed to insert connections", tx); errRollback != nil { + return "", errors.Wrap(err, errRollback) + } return "", errors.Wrap(errSaveConnections, err) } if err := tx.Commit(); err != nil { - cr.rollback("Failed to commit Config save", tx) + if errRollback := cr.rollback("failed to commit Config save", tx); errRollback != nil { + return "", errors.Wrap(err, errRollback) + } return "", err } @@ -314,7 +321,9 @@ func (cr configRepository) UpdateConnections(ctx context.Context, owner, id stri } if err := insertChannels(ctx, owner, channels, tx); err != nil { - cr.rollback("Failed to insert Channels during the update", tx) + if rollbackErr := cr.rollback("failed to insert Channels during the update", tx); rollbackErr != nil { + return err + } return errors.Wrap(repoerr.ErrUpdateEntity, err) } @@ -324,12 +333,16 @@ func (cr configRepository) UpdateConnections(ctx context.Context, owner, id stri return repoerr.ErrNotFound } } - cr.rollback("Failed to update connections during the update", tx) + if errRollback := cr.rollback("failed to update connections during the update", tx); errRollback != nil { + return errors.Wrap(err, errRollback) + } return errors.Wrap(repoerr.ErrUpdateEntity, err) } if err := tx.Commit(); err != nil { - cr.rollback("Failed to commit Config update", tx) + if errRollback := cr.rollback("failed to commit Config update", tx); errRollback != nil { + return errors.Wrap(err, errRollback) + } return errors.Wrap(repoerr.ErrUpdateEntity, err) } @@ -451,10 +464,19 @@ func (cr configRepository) RemoveChannel(ctx context.Context, id string) error { return nil } -func (cr configRepository) DisconnectThing(ctx context.Context, channelID, thingID string) error { +func (cr configRepository) ConnectThing(ctx context.Context, mgChannel, mgThing string) error { + q := `UPDATE configs SET state = $1 WHERE EXISTS ( + SELECT 1 FROM connections WHERE config_id = $2 AND channel_id = $3)` + if _, err := cr.db.ExecContext(ctx, q, bootstrap.Active, mgThing, mgChannel); err != nil { + return errors.Wrap(errConnectThing, err) + } + return nil +} + +func (cr configRepository) DisconnectThing(ctx context.Context, mgChannel, mgThing string) error { q := `UPDATE configs SET state = $1 WHERE EXISTS ( SELECT 1 FROM connections WHERE config_id = $2 AND channel_id = $3)` - if _, err := cr.db.ExecContext(ctx, q, bootstrap.Inactive, thingID, channelID); err != nil { + if _, err := cr.db.ExecContext(ctx, q, bootstrap.Inactive, mgThing, mgChannel); err != nil { return errors.Wrap(errDisconnectThing, err) } return nil @@ -483,10 +505,13 @@ func (cr configRepository) retrieveAll(owner string, filter bootstrap.Filter) (s return fmt.Sprintf(template, f), params } -func (cr configRepository) rollback(content string, tx *sqlx.Tx) { +func (cr configRepository) rollback(content string, tx *sqlx.Tx) error { + errMsg := errors.New(content) if err := tx.Rollback(); err != nil { - cr.log.Error(fmt.Sprintf("Failed to rollback due to %s", err)) + errRollback := errors.New("failed to rollback") + return errors.Wrap(errMsg, errors.Wrap(errRollback, err)) } + return errMsg } func insertChannels(_ context.Context, owner string, channels []bootstrap.Channel, tx *sqlx.Tx) error { diff --git a/bootstrap/postgres/configs_test.go b/bootstrap/postgres/configs_test.go index 81e3d85256..60ee50389a 100644 --- a/bootstrap/postgres/configs_test.go +++ b/bootstrap/postgres/configs_test.go @@ -675,6 +675,31 @@ func TestRemoveChannel(t *testing.T) { assert.NotContains(t, cfg.Channels, c.Channels[0], fmt.Sprintf("expected to remove channel %s from %s", c.Channels[0], cfg.Channels)) } +func TestConnectThing(t *testing.T) { + repo := postgres.NewConfigRepository(db, testLog) + err := deleteChannels(context.Background(), repo) + require.Nil(t, err, "Channels cleanup expected to succeed.") + + c := config + // Use UUID to prevent conflicts. + uid, err := uuid.NewV4() + assert.Nil(t, err, fmt.Sprintf("Got unexpected error: %s.\n", err)) + c.ThingKey = uid.String() + c.ThingID = uid.String() + c.ExternalID = uid.String() + c.ExternalKey = uid.String() + saved, err := repo.Save(context.Background(), c, channels) + assert.Nil(t, err, fmt.Sprintf("Saving config expected to succeed: %s.\n", err)) + + err = repo.ConnectThing(context.Background(), c.Channels[0].ID, saved) + fmt.Print("state", config.State) + assert.Nil(t, err, fmt.Sprintf("Retrieving config expected to succeed: %s.\n", err)) + + cfg, err := repo.RetrieveByID(context.Background(), c.Owner, c.ThingID) + assert.Nil(t, err, fmt.Sprintf("Retrieving config expected to succeed: %s.\n", err)) + assert.Equal(t, cfg.State, bootstrap.Active, fmt.Sprintf("expected to be active when a connection is added from %s", cfg)) +} + func TestDisconnectThing(t *testing.T) { repo := postgres.NewConfigRepository(db, testLog) err := deleteChannels(context.Background(), repo) @@ -696,7 +721,7 @@ func TestDisconnectThing(t *testing.T) { cfg, err := repo.RetrieveByID(context.Background(), c.Owner, c.ThingID) assert.Nil(t, err, fmt.Sprintf("Retrieving config expected to succeed: %s.\n", err)) - assert.Equal(t, cfg.State, bootstrap.Inactive, fmt.Sprintf("expected ti be inactive when a connection is removed from %s", cfg)) + assert.Equal(t, cfg.State, bootstrap.Inactive, fmt.Sprintf("expected to be inactive when a connection is removed from %s", cfg)) } func deleteChannels(ctx context.Context, repo bootstrap.ConfigRepository) error { diff --git a/bootstrap/service.go b/bootstrap/service.go index 1214e944c4..7fd09ff8e6 100644 --- a/bootstrap/service.go +++ b/bootstrap/service.go @@ -41,6 +41,7 @@ var ( errRemoveConfig = errors.New("failed to remove bootstrap configuration") errRemoveChannel = errors.New("failed to remove channel") errCreateThing = errors.New("failed to create thing") + errConnectThing = errors.New("failed to connect thing") errDisconnectThing = errors.New("failed to disconnect thing") errCheckChannels = errors.New("failed to check if channels exists") errConnectionChannels = errors.New("failed to check channels connections") @@ -96,8 +97,11 @@ type Service interface { // RemoveChannelHandler removes Channel with id received from an event. RemoveChannelHandler(ctx context.Context, id string) error - // DisconnectHandler changes state of the Config when connect/disconnect event occurs. - DisconnectThingHandler(ctx context.Context, channelID, thingID string) error + // ConnectHandler changes state of the Config to active when connect event occurs. + ConnectThingHandler(ctx context.Context, mgChannel, mgThing string) error + + // DisconnectHandler changes state of the Config to inactive when disconnect event occurs. + DisconnectThingHandler(ctx context.Context, mgChannel, mgThing string) error } // ConfigReader is used to parse Config into format which will be encoded @@ -373,8 +377,15 @@ func (bs bootstrapService) RemoveChannelHandler(ctx context.Context, id string) return nil } -func (bs bootstrapService) DisconnectThingHandler(ctx context.Context, channelID, thingID string) error { - if err := bs.configs.DisconnectThing(ctx, channelID, thingID); err != nil { +func (bs bootstrapService) ConnectThingHandler(ctx context.Context, mgChannel, mgThing string) error { + if err := bs.configs.ConnectThing(ctx, mgChannel, mgThing); err != nil { + return errors.Wrap(errConnectThing, err) + } + return nil +} + +func (bs bootstrapService) DisconnectThingHandler(ctx context.Context, mgChannel, mgThing string) error { + if err := bs.configs.DisconnectThing(ctx, mgChannel, mgThing); err != nil { return errors.Wrap(errDisconnectThing, err) } return nil diff --git a/bootstrap/service_test.go b/bootstrap/service_test.go index fe8aa290d1..28b93bd003 100644 --- a/bootstrap/service_test.go +++ b/bootstrap/service_test.go @@ -909,6 +909,49 @@ func TestRemoveCoinfigHandler(t *testing.T) { } } +func TestConnectThingsHandler(t *testing.T) { + svc, boot, auth, sdk := newService() + repoCall := auth.On("Identify", mock.Anything, &magistrala.IdentityReq{Token: validToken}).Return(&magistrala.IdentityRes{Id: validID}, nil) + repoCall1 := sdk.On("Thing", mock.Anything, mock.Anything).Return(mgsdk.Thing{ID: config.ThingID, Credentials: mgsdk.Credentials{Secret: config.ThingKey}}, nil) + repoCall2 := sdk.On("Channel", mock.Anything, mock.Anything).Return(mgsdk.Channel{}, nil) + repoCall3 := boot.On("ListExisting", context.Background(), mock.Anything, mock.Anything, mock.Anything).Return(config.Channels, nil) + repoCall4 := boot.On("Save", context.Background(), mock.Anything, mock.Anything).Return(mock.Anything, nil) + saved, err := svc.Add(context.Background(), validToken, config) + assert.Nil(t, err, fmt.Sprintf("Saving config expected to succeed: %s.\n", err)) + repoCall.Unset() + repoCall1.Unset() + repoCall2.Unset() + repoCall3.Unset() + repoCall4.Unset() + + cases := []struct { + desc string + thingID string + channelID string + err error + }{ + { + desc: "connect", + channelID: channel.ID, + thingID: saved.ThingID, + err: nil, + }, + { + desc: "connect connected", + channelID: channel.ID, + thingID: saved.ThingID, + err: nil, + }, + } + + for _, tc := range cases { + repoCall := boot.On("ConnectThing", context.Background(), mock.Anything, mock.Anything).Return(tc.err) + err := svc.ConnectThingHandler(context.Background(), tc.channelID, tc.thingID) + assert.True(t, errors.Contains(err, tc.err), fmt.Sprintf("%s: expected %s got %s\n", tc.desc, tc.err, err)) + repoCall.Unset() + } +} + func TestDisconnectThingsHandler(t *testing.T) { svc, boot, auth, sdk := newService() repoCall := auth.On("Identify", mock.Anything, &magistrala.IdentityReq{Token: validToken}).Return(&magistrala.IdentityRes{Id: validID}, nil) diff --git a/bootstrap/tracing/tracing.go b/bootstrap/tracing/tracing.go index 6d867aa39e..852a3f088b 100644 --- a/bootstrap/tracing/tracing.go +++ b/bootstrap/tracing/tracing.go @@ -158,6 +158,17 @@ func (tm *tracingMiddleware) RemoveChannelHandler(ctx context.Context, id string return tm.svc.RemoveChannelHandler(ctx, id) } +// ConnectThingHandler traces the "ConnectThingHandler" operation of the wrapped bootstrap.Service. +func (tm *tracingMiddleware) ConnectThingHandler(ctx context.Context, channelID, thingID string) error { + ctx, span := tm.tracer.Start(ctx, "svc_connect_thing_handler", trace.WithAttributes( + attribute.String("channel_id", channelID), + attribute.String("thing_id", thingID), + )) + defer span.End() + + return tm.svc.ConnectThingHandler(ctx, channelID, thingID) +} + // DisconnectThingHandler traces the "DisconnectThingHandler" operation of the wrapped bootstrap.Service. func (tm *tracingMiddleware) DisconnectThingHandler(ctx context.Context, channelID, thingID string) error { ctx, span := tm.tracer.Start(ctx, "svc_disconnect_thing_handler", trace.WithAttributes( diff --git a/internal/groups/events/events.go b/internal/groups/events/events.go index d15a2a1caa..f35153c5c1 100644 --- a/internal/groups/events/events.go +++ b/internal/groups/events/events.go @@ -39,14 +39,18 @@ var ( ) type assignEvent struct { - memberIDs []string - groupID string + memberIDs []string + relation string + memberKind string + groupID string } func (cge assignEvent) Encode() (map[string]interface{}, error) { val := map[string]interface{}{ "operation": groupAssign, "member_ids": cge.memberIDs, + "relation": cge.relation, + "memberKind": cge.memberKind, "group_id": cge.groupID, } @@ -54,14 +58,18 @@ func (cge assignEvent) Encode() (map[string]interface{}, error) { } type unassignEvent struct { - memberIDs []string - groupID string + memberIDs []string + relation string + memberKind string + groupID string } func (cge unassignEvent) Encode() (map[string]interface{}, error) { val := map[string]interface{}{ "operation": groupUnassign, "member_ids": cge.memberIDs, + "relation": cge.relation, + "memberKind": cge.memberKind, "group_id": cge.groupID, } diff --git a/internal/groups/events/streams.go b/internal/groups/events/streams.go index 10610d6686..9fa5fede07 100644 --- a/internal/groups/events/streams.go +++ b/internal/groups/events/streams.go @@ -145,8 +145,10 @@ func (es eventStore) Assign(ctx context.Context, token, groupID, relation, membe } event := assignEvent{ - groupID: groupID, - memberIDs: memberIDs, + groupID: groupID, + relation: relation, + memberKind: memberKind, + memberIDs: memberIDs, } if err := es.Publish(ctx, event); err != nil { @@ -162,8 +164,10 @@ func (es eventStore) Unassign(ctx context.Context, token, groupID, relation, mem } event := unassignEvent{ - groupID: groupID, - memberIDs: memberIDs, + groupID: groupID, + relation: relation, + memberKind: memberKind, + memberIDs: memberIDs, } if err := es.Publish(ctx, event); err != nil { From 5ce24edf346d9a462b46dae613579d89c1ab985f Mon Sep 17 00:00:00 2001 From: JeffMboya Date: Mon, 20 May 2024 08:54:38 +0300 Subject: [PATCH 03/20] Refactor: revert to thingID and channelID Signed-off-by: JeffMboya Refactor: revert to thingID and channelID Signed-off-by: JeffMboya --- bootstrap/configs.go | 4 ++-- bootstrap/events/consumer/events.go | 4 ++-- bootstrap/events/consumer/streams.go | 16 ++++++++-------- bootstrap/events/producer/events.go | 16 ++++++++-------- bootstrap/events/producer/streams.go | 8 ++++---- bootstrap/postgres/configs.go | 8 ++++---- bootstrap/service.go | 12 ++++++------ go.mod | 2 ++ 8 files changed, 36 insertions(+), 34 deletions(-) diff --git a/bootstrap/configs.go b/bootstrap/configs.go index e6a5050743..1e68da63be 100644 --- a/bootstrap/configs.go +++ b/bootstrap/configs.go @@ -114,9 +114,9 @@ type ConfigRepository interface { // ConnectHandler changes state of the Config when the corresponding Thing is // connected to the Channel. - ConnectThing(ctx context.Context, mgChannel, mgThing string) error + ConnectThing(ctx context.Context, channelID, thingID string) error // DisconnectHandler changes state of the Config when the corresponding Thing is // disconnected from the Channel. - DisconnectThing(ctx context.Context, mgChannel, mgThing string) error + DisconnectThing(ctx context.Context, channelID, thingID string) error } diff --git a/bootstrap/events/consumer/events.go b/bootstrap/events/consumer/events.go index 7ec78f6d00..5436cb75e9 100644 --- a/bootstrap/events/consumer/events.go +++ b/bootstrap/events/consumer/events.go @@ -19,6 +19,6 @@ type updateChannelEvent struct { // Connection event is either connect or disconnect event. type connectionEvent struct { - mgThing []string - mgChannel string + thingID []string + channelID string } diff --git a/bootstrap/events/consumer/streams.go b/bootstrap/events/consumer/streams.go index 55bf33f0f1..d264bf549d 100644 --- a/bootstrap/events/consumer/streams.go +++ b/bootstrap/events/consumer/streams.go @@ -45,15 +45,15 @@ func (es *eventHandler) Handle(ctx context.Context, event events.Event) error { err = es.svc.RemoveConfigHandler(ctx, rte.id) case thingConnect: cte := decodeConnectThing(msg) - for _, mgThing := range cte.mgThing { - if err = es.svc.ConnectThingHandler(ctx, cte.mgChannel, mgThing); err != nil { + for _, thingID := range cte.thingID { + if err = es.svc.ConnectThingHandler(ctx, cte.channelID, thingID); err != nil { return err } } case thingDisconnect: dte := decodeDisconnectThing(msg) - for _, mgThing := range dte.mgThing { - if err = es.svc.DisconnectThingHandler(ctx, dte.mgChannel, mgThing); err != nil { + for _, thingID := range dte.thingID { + if err = es.svc.DisconnectThingHandler(ctx, dte.channelID, thingID); err != nil { return err } } @@ -105,8 +105,8 @@ func decodeConnectThing(event map[string]interface{}) connectionEvent { } return connectionEvent{ - mgChannel: read(event, "group_id", ""), - mgThing: ReadStringSlice(event, "member_ids"), + channelID: read(event, "group_id", ""), + thingID: ReadStringSlice(event, "member_ids"), } } @@ -115,8 +115,8 @@ func decodeDisconnectThing(event map[string]interface{}) connectionEvent { return connectionEvent{} } return connectionEvent{ - mgChannel: read(event, "group_id", ""), - mgThing: ReadStringSlice(event, "member_ids"), + channelID: read(event, "group_id", ""), + thingID: ReadStringSlice(event, "member_ids"), } } diff --git a/bootstrap/events/producer/events.go b/bootstrap/events/producer/events.go index 15c0a05cc0..beebb7976a 100644 --- a/bootstrap/events/producer/events.go +++ b/bootstrap/events/producer/events.go @@ -278,27 +278,27 @@ func (uche updateChannelHandlerEvent) Encode() (map[string]interface{}, error) { } type connectThingEvent struct { - mgThing string - mgChannel string + thingID string + channelID string } func (cte connectThingEvent) Encode() (map[string]interface{}, error) { return map[string]interface{}{ - "thing_id": cte.mgThing, - "channel_id": cte.mgChannel, + "thing_id": cte.thingID, + "channel_id": cte.channelID, "operation": thingConnect, }, nil } type disconnectThingEvent struct { - mgThing string - mgChannel string + thingID string + channelID string } func (dte disconnectThingEvent) Encode() (map[string]interface{}, error) { return map[string]interface{}{ - "thing_id": dte.mgThing, - "channel_id": dte.mgChannel, + "thing_id": dte.thingID, + "channel_id": dte.channelID, "operation": thingDisconnect, }, nil } diff --git a/bootstrap/events/producer/streams.go b/bootstrap/events/producer/streams.go index 62f1c7fac8..d3f93af9a6 100644 --- a/bootstrap/events/producer/streams.go +++ b/bootstrap/events/producer/streams.go @@ -213,8 +213,8 @@ func (es *eventStore) ConnectThingHandler(ctx context.Context, channelID, thingI } ev := connectThingEvent{ - mgThing: thingID, - mgChannel: channelID, + thingID: thingID, + channelID: channelID, } return es.Publish(ctx, ev) @@ -226,8 +226,8 @@ func (es *eventStore) DisconnectThingHandler(ctx context.Context, channelID, thi } ev := disconnectThingEvent{ - mgThing: thingID, - mgChannel: channelID, + thingID: thingID, + channelID: channelID, } return es.Publish(ctx, ev) diff --git a/bootstrap/postgres/configs.go b/bootstrap/postgres/configs.go index 0a1e2f68cf..efc2b9aa12 100644 --- a/bootstrap/postgres/configs.go +++ b/bootstrap/postgres/configs.go @@ -464,19 +464,19 @@ func (cr configRepository) RemoveChannel(ctx context.Context, id string) error { return nil } -func (cr configRepository) ConnectThing(ctx context.Context, mgChannel, mgThing string) error { +func (cr configRepository) ConnectThing(ctx context.Context, channelID, thingID string) error { q := `UPDATE configs SET state = $1 WHERE EXISTS ( SELECT 1 FROM connections WHERE config_id = $2 AND channel_id = $3)` - if _, err := cr.db.ExecContext(ctx, q, bootstrap.Active, mgThing, mgChannel); err != nil { + if _, err := cr.db.ExecContext(ctx, q, bootstrap.Active, thingID, channelID); err != nil { return errors.Wrap(errConnectThing, err) } return nil } -func (cr configRepository) DisconnectThing(ctx context.Context, mgChannel, mgThing string) error { +func (cr configRepository) DisconnectThing(ctx context.Context, channelID, thingID string) error { q := `UPDATE configs SET state = $1 WHERE EXISTS ( SELECT 1 FROM connections WHERE config_id = $2 AND channel_id = $3)` - if _, err := cr.db.ExecContext(ctx, q, bootstrap.Inactive, mgThing, mgChannel); err != nil { + if _, err := cr.db.ExecContext(ctx, q, bootstrap.Inactive, thingID, channelID); err != nil { return errors.Wrap(errDisconnectThing, err) } return nil diff --git a/bootstrap/service.go b/bootstrap/service.go index 7fd09ff8e6..2a00146a44 100644 --- a/bootstrap/service.go +++ b/bootstrap/service.go @@ -98,10 +98,10 @@ type Service interface { RemoveChannelHandler(ctx context.Context, id string) error // ConnectHandler changes state of the Config to active when connect event occurs. - ConnectThingHandler(ctx context.Context, mgChannel, mgThing string) error + ConnectThingHandler(ctx context.Context, channelID, ThingID string) error // DisconnectHandler changes state of the Config to inactive when disconnect event occurs. - DisconnectThingHandler(ctx context.Context, mgChannel, mgThing string) error + DisconnectThingHandler(ctx context.Context, channelID, ThingID string) error } // ConfigReader is used to parse Config into format which will be encoded @@ -377,15 +377,15 @@ func (bs bootstrapService) RemoveChannelHandler(ctx context.Context, id string) return nil } -func (bs bootstrapService) ConnectThingHandler(ctx context.Context, mgChannel, mgThing string) error { - if err := bs.configs.ConnectThing(ctx, mgChannel, mgThing); err != nil { +func (bs bootstrapService) ConnectThingHandler(ctx context.Context, channelID, thingID string) error { + if err := bs.configs.ConnectThing(ctx, channelID, thingID); err != nil { return errors.Wrap(errConnectThing, err) } return nil } -func (bs bootstrapService) DisconnectThingHandler(ctx context.Context, mgChannel, mgThing string) error { - if err := bs.configs.DisconnectThing(ctx, mgChannel, mgThing); err != nil { +func (bs bootstrapService) DisconnectThingHandler(ctx context.Context, channelID, thingID string) error { + if err := bs.configs.DisconnectThing(ctx, channelID, thingID); err != nil { return errors.Wrap(errDisconnectThing, err) } return nil diff --git a/go.mod b/go.mod index 31a826b367..ea0e350ed4 100644 --- a/go.mod +++ b/go.mod @@ -99,6 +99,7 @@ require ( github.com/goccy/go-json v0.10.2 // indirect github.com/gogo/protobuf v1.3.2 // indirect github.com/golang/snappy v0.0.4 // indirect + github.com/google/go-cmp v0.6.0 // indirect github.com/google/shlex v0.0.0-20191202100458-e7afc7fbc510 // indirect github.com/google/uuid v1.6.0 // indirect github.com/grpc-ecosystem/go-grpc-middleware v1.4.0 // indirect @@ -190,4 +191,5 @@ require ( gopkg.in/ini.v1 v1.67.0 // indirect gopkg.in/yaml.v2 v2.4.0 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect + mvdan.cc/gofumpt v0.6.0 // indirect ) From bd4e297e57baf6e4040824742492264d5a500e88 Mon Sep 17 00:00:00 2001 From: JeffMboya Date: Mon, 20 May 2024 09:41:25 +0300 Subject: [PATCH 04/20] Refactor: add memberKind ans relation constants Signed-off-by: JeffMboya Refactor: add memberKind ans relation constants Signed-off-by: JeffMboya Refactor: add memberKind ans relation constants Signed-off-by: JeffMboya Refactor: add memberKind ans relation constants Signed-off-by: JeffMboya Refactor: add memberKind ans relation constants Signed-off-by: JeffMboya --- bootstrap/events/consumer/streams.go | 7 +++++-- bootstrap/mocks/configs.go | 16 ++++++++-------- bootstrap/mocks/service.go | 16 ++++++++-------- bootstrap/postgres/configs.go | 23 +++++++++++------------ go.mod | 2 -- 5 files changed, 32 insertions(+), 32 deletions(-) diff --git a/bootstrap/events/consumer/streams.go b/bootstrap/events/consumer/streams.go index d264bf549d..9d6e0abcd0 100644 --- a/bootstrap/events/consumer/streams.go +++ b/bootstrap/events/consumer/streams.go @@ -20,6 +20,9 @@ const ( channelPrefix = "group." channelUpdate = channelPrefix + "update" channelRemove = channelPrefix + "remove" + + memberKind = "things" + relation = "group" ) type eventHandler struct { @@ -100,7 +103,7 @@ func decodeRemoveChannel(event map[string]interface{}) removeEvent { } func decodeConnectThing(event map[string]interface{}) connectionEvent { - if event["memberKind"] != "things" && event["relation"] != "group" { + if event["memberKind"] != memberKind && event["relation"] != relation { return connectionEvent{} } @@ -111,7 +114,7 @@ func decodeConnectThing(event map[string]interface{}) connectionEvent { } func decodeDisconnectThing(event map[string]interface{}) connectionEvent { - if event["memberKind"] != "things" && event["relation"] != "group" { + if event["memberKind"] != memberKind && event["relation"] != relation { return connectionEvent{} } return connectionEvent{ diff --git a/bootstrap/mocks/configs.go b/bootstrap/mocks/configs.go index 478589c8e4..6f008be7ab 100644 --- a/bootstrap/mocks/configs.go +++ b/bootstrap/mocks/configs.go @@ -35,9 +35,9 @@ func (_m *ConfigRepository) ChangeState(ctx context.Context, owner string, id st return r0 } -// ConnectThing provides a mock function with given fields: ctx, mgChannel, mgThing -func (_m *ConfigRepository) ConnectThing(ctx context.Context, mgChannel string, mgThing string) error { - ret := _m.Called(ctx, mgChannel, mgThing) +// ConnectThing provides a mock function with given fields: ctx, channelID, thingID +func (_m *ConfigRepository) ConnectThing(ctx context.Context, channelID string, thingID string) error { + ret := _m.Called(ctx, channelID, thingID) if len(ret) == 0 { panic("no return value specified for ConnectThing") @@ -45,7 +45,7 @@ func (_m *ConfigRepository) ConnectThing(ctx context.Context, mgChannel string, var r0 error if rf, ok := ret.Get(0).(func(context.Context, string, string) error); ok { - r0 = rf(ctx, mgChannel, mgThing) + r0 = rf(ctx, channelID, thingID) } else { r0 = ret.Error(0) } @@ -53,9 +53,9 @@ func (_m *ConfigRepository) ConnectThing(ctx context.Context, mgChannel string, return r0 } -// DisconnectThing provides a mock function with given fields: ctx, mgChannel, mgThing -func (_m *ConfigRepository) DisconnectThing(ctx context.Context, mgChannel string, mgThing string) error { - ret := _m.Called(ctx, mgChannel, mgThing) +// DisconnectThing provides a mock function with given fields: ctx, channelID, thingID +func (_m *ConfigRepository) DisconnectThing(ctx context.Context, channelID string, thingID string) error { + ret := _m.Called(ctx, channelID, thingID) if len(ret) == 0 { panic("no return value specified for DisconnectThing") @@ -63,7 +63,7 @@ func (_m *ConfigRepository) DisconnectThing(ctx context.Context, mgChannel strin var r0 error if rf, ok := ret.Get(0).(func(context.Context, string, string) error); ok { - r0 = rf(ctx, mgChannel, mgThing) + r0 = rf(ctx, channelID, thingID) } else { r0 = ret.Error(0) } diff --git a/bootstrap/mocks/service.go b/bootstrap/mocks/service.go index 0c7a6b7ddc..77e110c0ed 100644 --- a/bootstrap/mocks/service.go +++ b/bootstrap/mocks/service.go @@ -91,9 +91,9 @@ func (_m *Service) ChangeState(ctx context.Context, token string, id string, sta return r0 } -// ConnectThingHandler provides a mock function with given fields: ctx, mgChannel, mgThing -func (_m *Service) ConnectThingHandler(ctx context.Context, mgChannel string, mgThing string) error { - ret := _m.Called(ctx, mgChannel, mgThing) +// ConnectThingHandler provides a mock function with given fields: ctx, channelID, ThingID +func (_m *Service) ConnectThingHandler(ctx context.Context, channelID string, ThingID string) error { + ret := _m.Called(ctx, channelID, ThingID) if len(ret) == 0 { panic("no return value specified for ConnectThingHandler") @@ -101,7 +101,7 @@ func (_m *Service) ConnectThingHandler(ctx context.Context, mgChannel string, mg var r0 error if rf, ok := ret.Get(0).(func(context.Context, string, string) error); ok { - r0 = rf(ctx, mgChannel, mgThing) + r0 = rf(ctx, channelID, ThingID) } else { r0 = ret.Error(0) } @@ -109,9 +109,9 @@ func (_m *Service) ConnectThingHandler(ctx context.Context, mgChannel string, mg return r0 } -// DisconnectThingHandler provides a mock function with given fields: ctx, mgChannel, mgThing -func (_m *Service) DisconnectThingHandler(ctx context.Context, mgChannel string, mgThing string) error { - ret := _m.Called(ctx, mgChannel, mgThing) +// DisconnectThingHandler provides a mock function with given fields: ctx, channelID, ThingID +func (_m *Service) DisconnectThingHandler(ctx context.Context, channelID string, ThingID string) error { + ret := _m.Called(ctx, channelID, ThingID) if len(ret) == 0 { panic("no return value specified for DisconnectThingHandler") @@ -119,7 +119,7 @@ func (_m *Service) DisconnectThingHandler(ctx context.Context, mgChannel string, var r0 error if rf, ok := ret.Get(0).(func(context.Context, string, string) error); ok { - r0 = rf(ctx, mgChannel, mgThing) + r0 = rf(ctx, channelID, ThingID) } else { r0 = ret.Error(0) } diff --git a/bootstrap/postgres/configs.go b/bootstrap/postgres/configs.go index efc2b9aa12..87d2ff40af 100644 --- a/bootstrap/postgres/configs.go +++ b/bootstrap/postgres/configs.go @@ -64,27 +64,27 @@ func (cr configRepository) Save(ctx context.Context, cfg bootstrap.Config, chsCo err = repoerr.ErrConflict } - if errRollback := cr.rollback("failed to insert a Config", tx); errRollback != nil { + if errRollback := cr.rollback(err, tx); errRollback != nil { return "", errors.Wrap(err, errRollback) } } if err := insertChannels(ctx, cfg.Owner, cfg.Channels, tx); err != nil { - if errRollback := cr.rollback("failed to insert Channels", tx); errRollback != nil { + if errRollback := cr.rollback(err, tx); errRollback != nil { return "", errors.Wrap(err, errRollback) } return "", errors.Wrap(errSaveChannels, err) } if err := insertConnections(ctx, cfg, chsConnIDs, tx); err != nil { - if errRollback := cr.rollback("failed to insert connections", tx); errRollback != nil { + if errRollback := cr.rollback(err, tx); errRollback != nil { return "", errors.Wrap(err, errRollback) } return "", errors.Wrap(errSaveConnections, err) } if err := tx.Commit(); err != nil { - if errRollback := cr.rollback("failed to commit Config save", tx); errRollback != nil { + if errRollback := cr.rollback(err, tx); errRollback != nil { return "", errors.Wrap(err, errRollback) } return "", err @@ -321,7 +321,7 @@ func (cr configRepository) UpdateConnections(ctx context.Context, owner, id stri } if err := insertChannels(ctx, owner, channels, tx); err != nil { - if rollbackErr := cr.rollback("failed to insert Channels during the update", tx); rollbackErr != nil { + if rollbackErr := cr.rollback(err, tx); rollbackErr != nil { return err } return errors.Wrap(repoerr.ErrUpdateEntity, err) @@ -333,14 +333,14 @@ func (cr configRepository) UpdateConnections(ctx context.Context, owner, id stri return repoerr.ErrNotFound } } - if errRollback := cr.rollback("failed to update connections during the update", tx); errRollback != nil { + if errRollback := cr.rollback(err, tx); errRollback != nil { return errors.Wrap(err, errRollback) } return errors.Wrap(repoerr.ErrUpdateEntity, err) } if err := tx.Commit(); err != nil { - if errRollback := cr.rollback("failed to commit Config update", tx); errRollback != nil { + if errRollback := cr.rollback(err, tx); errRollback != nil { return errors.Wrap(err, errRollback) } return errors.Wrap(repoerr.ErrUpdateEntity, err) @@ -505,13 +505,12 @@ func (cr configRepository) retrieveAll(owner string, filter bootstrap.Filter) (s return fmt.Sprintf(template, f), params } -func (cr configRepository) rollback(content string, tx *sqlx.Tx) error { - errMsg := errors.New(content) +func (cr configRepository) rollback(defErr error, tx *sqlx.Tx) error { if err := tx.Rollback(); err != nil { - errRollback := errors.New("failed to rollback") - return errors.Wrap(errMsg, errors.Wrap(errRollback, err)) + return errors.Wrap(defErr, errors.Wrap(errors.New("failed to rollback"), err)) } - return errMsg + + return defErr } func insertChannels(_ context.Context, owner string, channels []bootstrap.Channel, tx *sqlx.Tx) error { diff --git a/go.mod b/go.mod index ea0e350ed4..31a826b367 100644 --- a/go.mod +++ b/go.mod @@ -99,7 +99,6 @@ require ( github.com/goccy/go-json v0.10.2 // indirect github.com/gogo/protobuf v1.3.2 // indirect github.com/golang/snappy v0.0.4 // indirect - github.com/google/go-cmp v0.6.0 // indirect github.com/google/shlex v0.0.0-20191202100458-e7afc7fbc510 // indirect github.com/google/uuid v1.6.0 // indirect github.com/grpc-ecosystem/go-grpc-middleware v1.4.0 // indirect @@ -191,5 +190,4 @@ require ( gopkg.in/ini.v1 v1.67.0 // indirect gopkg.in/yaml.v2 v2.4.0 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect - mvdan.cc/gofumpt v0.6.0 // indirect ) From 12f3a44dd982f23ea1086fb613efec183654a5bd Mon Sep 17 00:00:00 2001 From: JeffMboya Date: Mon, 20 May 2024 10:51:21 +0300 Subject: [PATCH 05/20] Refactor: removed saved method Signed-off-by: JeffMboya --- bootstrap/service_test.go | 93 +++++++++++++++++++-------------------- 1 file changed, 45 insertions(+), 48 deletions(-) diff --git a/bootstrap/service_test.go b/bootstrap/service_test.go index 28b93bd003..ca84a304a1 100644 --- a/bootstrap/service_test.go +++ b/bootstrap/service_test.go @@ -150,7 +150,7 @@ func TestView(t *testing.T) { repoCall2 := sdk.On("Channel", mock.Anything, mock.Anything).Return(mgsdk.Channel{}, nil) repoCall3 := boot.On("ListExisting", context.Background(), mock.Anything, mock.Anything, mock.Anything).Return(config.Channels, nil) repoCall4 := boot.On("Save", context.Background(), mock.Anything, mock.Anything).Return(mock.Anything, nil) - saved, err := svc.Add(context.Background(), validToken, config) + _, err := svc.Add(context.Background(), validToken, config) assert.Nil(t, err, fmt.Sprintf("Saving config expected to succeed: %s.\n", err)) repoCall.Unset() repoCall1.Unset() @@ -166,7 +166,7 @@ func TestView(t *testing.T) { }{ { desc: "view an existing config", - id: saved.ThingID, + id: config.ThingID, token: validToken, err: nil, }, @@ -207,7 +207,7 @@ func TestUpdate(t *testing.T) { repoCall2 := sdk.On("Channel", mock.Anything, mock.Anything).Return(mgsdk.Channel{}, nil) repoCall3 := boot.On("ListExisting", context.Background(), mock.Anything, mock.Anything, mock.Anything).Return(config.Channels, nil) repoCall4 := boot.On("Save", context.Background(), mock.Anything, mock.Anything).Return(mock.Anything, nil) - saved, err := svc.Add(context.Background(), validToken, c) + _, err := svc.Add(context.Background(), validToken, c) assert.Nil(t, err, fmt.Sprintf("Saving config expected to succeed: %s.\n", err)) repoCall.Unset() repoCall1.Unset() @@ -215,7 +215,7 @@ func TestUpdate(t *testing.T) { repoCall3.Unset() repoCall4.Unset() - modifiedCreated := saved + modifiedCreated := c modifiedCreated.Content = "new-config" modifiedCreated.Name = "new name" @@ -242,7 +242,7 @@ func TestUpdate(t *testing.T) { }, { desc: "update a config with wrong credentials", - config: saved, + config: c, token: invalidToken, err: svcerr.ErrAuthentication, }, @@ -270,7 +270,7 @@ func TestUpdateCert(t *testing.T) { repoCall2 := sdk.On("Channel", mock.Anything, mock.Anything).Return(mgsdk.Channel{}, nil) repoCall3 := boot.On("ListExisting", context.Background(), mock.Anything, mock.Anything, mock.Anything).Return(config.Channels, nil) repoCall4 := boot.On("Save", context.Background(), mock.Anything, mock.Anything).Return(mock.Anything, nil) - saved, err := svc.Add(context.Background(), validToken, c) + _, err := svc.Add(context.Background(), validToken, c) assert.Nil(t, err, fmt.Sprintf("Saving config expected to succeed: %s.\n", err)) repoCall.Unset() repoCall1.Unset() @@ -290,21 +290,21 @@ func TestUpdateCert(t *testing.T) { }{ { desc: "update certs for the valid config", - thingID: saved.ThingID, + thingID: c.ThingID, clientCert: "newCert", clientKey: "newKey", caCert: "newCert", token: validToken, expectedConfig: bootstrap.Config{ - Name: saved.Name, - ThingKey: saved.ThingKey, - Channels: saved.Channels, - ExternalID: saved.ExternalID, - ExternalKey: saved.ExternalKey, - Content: saved.Content, - State: saved.State, - Owner: saved.Owner, - ThingID: saved.ThingID, + Name: c.Name, + ThingKey: c.ThingKey, + Channels: c.Channels, + ExternalID: c.ExternalID, + ExternalKey: c.ExternalKey, + Content: c.Content, + State: c.State, + Owner: c.Owner, + ThingID: c.ThingID, ClientCert: "newCert", CACert: "newCert", ClientKey: "newKey", @@ -323,7 +323,7 @@ func TestUpdateCert(t *testing.T) { }, { desc: "update config cert with wrong credentials", - thingID: saved.ThingID, + thingID: c.ThingID, clientCert: "newCert", clientKey: "newKey", caCert: "newCert", @@ -590,7 +590,7 @@ func TestRemove(t *testing.T) { repoCall2 := sdk.On("Channel", mock.Anything, mock.Anything).Return(mgsdk.Channel{}, nil) repoCall3 := boot.On("ListExisting", context.Background(), mock.Anything, mock.Anything, mock.Anything).Return(config.Channels, nil) repoCall4 := boot.On("Save", context.Background(), mock.Anything, mock.Anything).Return(mock.Anything, nil) - saved, err := svc.Add(context.Background(), validToken, config) + _, err := svc.Add(context.Background(), validToken, config) assert.Nil(t, err, fmt.Sprintf("Saving config expected to succeed: %s.\n", err)) repoCall.Unset() repoCall1.Unset() @@ -606,19 +606,19 @@ func TestRemove(t *testing.T) { }{ { desc: "view a config with wrong credentials", - id: saved.ThingID, + id: config.ThingID, token: invalidToken, err: svcerr.ErrAuthentication, }, { desc: "remove an existing config", - id: saved.ThingID, + id: config.ThingID, token: validToken, err: nil, }, { desc: "remove removed config", - id: saved.ThingID, + id: config.ThingID, token: validToken, err: nil, }, @@ -647,7 +647,7 @@ func TestBootstrap(t *testing.T) { repoCall2 := sdk.On("Channel", mock.Anything, mock.Anything).Return(mgsdk.Channel{}, nil) repoCall3 := boot.On("ListExisting", context.Background(), mock.Anything, mock.Anything, mock.Anything).Return(config.Channels, nil) repoCall4 := boot.On("Save", context.Background(), mock.Anything, mock.Anything).Return(mock.Anything, nil) - saved, err := svc.Add(context.Background(), validToken, config) + _, err := svc.Add(context.Background(), validToken, config) assert.Nil(t, err, fmt.Sprintf("Saving config expected to succeed: %s.\n", err)) repoCall.Unset() repoCall1.Unset() @@ -655,7 +655,7 @@ func TestBootstrap(t *testing.T) { repoCall3.Unset() repoCall4.Unset() - e, err := enc([]byte(saved.ExternalKey)) + e, err := enc([]byte(config.ExternalKey)) assert.Nil(t, err, fmt.Sprintf("Encrypting external key expected to succeed: %s.\n", err)) cases := []struct { @@ -670,30 +670,30 @@ func TestBootstrap(t *testing.T) { desc: "bootstrap using invalid external id", config: bootstrap.Config{}, externalID: "invalid", - externalKey: saved.ExternalKey, + externalKey: config.ExternalKey, err: svcerr.ErrNotFound, encrypted: false, }, { desc: "bootstrap using invalid external key", config: bootstrap.Config{}, - externalID: saved.ExternalID, + externalID: config.ExternalID, externalKey: "invalid", err: bootstrap.ErrExternalKey, encrypted: false, }, { desc: "bootstrap an existing config", - config: saved, - externalID: saved.ExternalID, - externalKey: saved.ExternalKey, + config: config, + externalID: config.ExternalID, + externalKey: config.ExternalKey, err: nil, encrypted: false, }, { desc: "bootstrap encrypted", - config: saved, - externalID: saved.ExternalID, + config: config, + externalID: config.ExternalID, externalKey: hex.EncodeToString(e), err: nil, encrypted: true, @@ -708,7 +708,6 @@ func TestBootstrap(t *testing.T) { repoCall.Unset() } } - func TestChangeState(t *testing.T) { svc, boot, auth, sdk := newService() repoCall := auth.On("Identify", mock.Anything, &magistrala.IdentityReq{Token: validToken}).Return(&magistrala.IdentityRes{Id: validID}, nil) @@ -716,7 +715,7 @@ func TestChangeState(t *testing.T) { repoCall2 := sdk.On("Channel", mock.Anything, mock.Anything).Return(toGroup(config.Channels[0]), nil) repoCall3 := boot.On("ListExisting", context.Background(), mock.Anything, mock.Anything, mock.Anything).Return(config.Channels, nil) repoCall4 := boot.On("Save", context.Background(), mock.Anything, mock.Anything).Return(mock.Anything, nil) - saved, err := svc.Add(context.Background(), validToken, config) + _, err := svc.Add(context.Background(), validToken, config) assert.Nil(t, err, fmt.Sprintf("Saving config expected to succeed: %s.\n", err)) repoCall.Unset() repoCall1.Unset() @@ -734,7 +733,7 @@ func TestChangeState(t *testing.T) { { desc: "change state with wrong credentials", state: bootstrap.Active, - id: saved.ThingID, + id: config.ThingID, token: invalidToken, err: svcerr.ErrAuthentication, }, @@ -748,21 +747,21 @@ func TestChangeState(t *testing.T) { { desc: "change state to Active", state: bootstrap.Active, - id: saved.ThingID, + id: config.ThingID, token: validToken, err: nil, }, { desc: "change state to current state", state: bootstrap.Active, - id: saved.ThingID, + id: config.ThingID, token: validToken, err: nil, }, { desc: "change state to Inactive", state: bootstrap.Inactive, - id: saved.ThingID, + id: config.ThingID, token: validToken, err: nil, }, @@ -851,7 +850,7 @@ func TestRemoveChannelHandler(t *testing.T) { }{ { desc: "remove an existing channel", - id: channel.ID, + id: config.Channels[0].ID, err: nil, }, { @@ -869,14 +868,14 @@ func TestRemoveChannelHandler(t *testing.T) { } } -func TestRemoveCoinfigHandler(t *testing.T) { +func TestRemoveConfigHandler(t *testing.T) { svc, boot, auth, sdk := newService() repoCall := auth.On("Identify", mock.Anything, &magistrala.IdentityReq{Token: validToken}).Return(&magistrala.IdentityRes{Id: validID}, nil) repoCall1 := sdk.On("Thing", mock.Anything, mock.Anything).Return(mgsdk.Thing{ID: config.ThingID, Credentials: mgsdk.Credentials{Secret: config.ThingKey}}, nil) repoCall2 := sdk.On("Channel", mock.Anything, mock.Anything).Return(mgsdk.Channel{}, nil) repoCall3 := boot.On("ListExisting", context.Background(), mock.Anything, mock.Anything, mock.Anything).Return(config.Channels, nil) repoCall4 := boot.On("Save", context.Background(), mock.Anything, mock.Anything).Return(mock.Anything, nil) - saved, err := svc.Add(context.Background(), validToken, config) + _, err := svc.Add(context.Background(), validToken, config) assert.Nil(t, err, fmt.Sprintf("Saving config expected to succeed: %s.\n", err)) repoCall.Unset() repoCall1.Unset() @@ -891,7 +890,7 @@ func TestRemoveCoinfigHandler(t *testing.T) { }{ { desc: "remove an existing config", - id: saved.ThingID, + id: config.ThingID, err: nil, }, { @@ -908,7 +907,6 @@ func TestRemoveCoinfigHandler(t *testing.T) { repoCall.Unset() } } - func TestConnectThingsHandler(t *testing.T) { svc, boot, auth, sdk := newService() repoCall := auth.On("Identify", mock.Anything, &magistrala.IdentityReq{Token: validToken}).Return(&magistrala.IdentityRes{Id: validID}, nil) @@ -916,7 +914,7 @@ func TestConnectThingsHandler(t *testing.T) { repoCall2 := sdk.On("Channel", mock.Anything, mock.Anything).Return(mgsdk.Channel{}, nil) repoCall3 := boot.On("ListExisting", context.Background(), mock.Anything, mock.Anything, mock.Anything).Return(config.Channels, nil) repoCall4 := boot.On("Save", context.Background(), mock.Anything, mock.Anything).Return(mock.Anything, nil) - saved, err := svc.Add(context.Background(), validToken, config) + _, err := svc.Add(context.Background(), validToken, config) assert.Nil(t, err, fmt.Sprintf("Saving config expected to succeed: %s.\n", err)) repoCall.Unset() repoCall1.Unset() @@ -933,13 +931,13 @@ func TestConnectThingsHandler(t *testing.T) { { desc: "connect", channelID: channel.ID, - thingID: saved.ThingID, + thingID: config.ThingID, err: nil, }, { desc: "connect connected", channelID: channel.ID, - thingID: saved.ThingID, + thingID: config.ThingID, err: nil, }, } @@ -951,7 +949,6 @@ func TestConnectThingsHandler(t *testing.T) { repoCall.Unset() } } - func TestDisconnectThingsHandler(t *testing.T) { svc, boot, auth, sdk := newService() repoCall := auth.On("Identify", mock.Anything, &magistrala.IdentityReq{Token: validToken}).Return(&magistrala.IdentityRes{Id: validID}, nil) @@ -959,7 +956,7 @@ func TestDisconnectThingsHandler(t *testing.T) { repoCall2 := sdk.On("Channel", mock.Anything, mock.Anything).Return(mgsdk.Channel{}, nil) repoCall3 := boot.On("ListExisting", context.Background(), mock.Anything, mock.Anything, mock.Anything).Return(config.Channels, nil) repoCall4 := boot.On("Save", context.Background(), mock.Anything, mock.Anything).Return(mock.Anything, nil) - saved, err := svc.Add(context.Background(), validToken, config) + _, err := svc.Add(context.Background(), validToken, config) assert.Nil(t, err, fmt.Sprintf("Saving config expected to succeed: %s.\n", err)) repoCall.Unset() repoCall1.Unset() @@ -976,13 +973,13 @@ func TestDisconnectThingsHandler(t *testing.T) { { desc: "disconnect", channelID: channel.ID, - thingID: saved.ThingID, + thingID: config.ThingID, err: nil, }, { desc: "disconnect disconnected", channelID: channel.ID, - thingID: saved.ThingID, + thingID: config.ThingID, err: nil, }, } From 3cd7e060c71cff63f11b470e6e9dc01d014a50ac Mon Sep 17 00:00:00 2001 From: JeffMboya Date: Mon, 20 May 2024 11:33:35 +0300 Subject: [PATCH 06/20] Refactor: Add tests Signed-off-by: JeffMboya --- bootstrap/service_test.go | 61 +++++++++++++++++++++++++++++++++++++-- 1 file changed, 59 insertions(+), 2 deletions(-) diff --git a/bootstrap/service_test.go b/bootstrap/service_test.go index ca84a304a1..3f259e251e 100644 --- a/bootstrap/service_test.go +++ b/bootstrap/service_test.go @@ -708,6 +708,7 @@ func TestBootstrap(t *testing.T) { repoCall.Unset() } } + func TestChangeState(t *testing.T) { svc, boot, auth, sdk := newService() repoCall := auth.On("Identify", mock.Anything, &magistrala.IdentityReq{Token: validToken}).Return(&magistrala.IdentityRes{Id: validID}, nil) @@ -907,6 +908,7 @@ func TestRemoveConfigHandler(t *testing.T) { repoCall.Unset() } } + func TestConnectThingsHandler(t *testing.T) { svc, boot, auth, sdk := newService() repoCall := auth.On("Identify", mock.Anything, &magistrala.IdentityReq{Token: validToken}).Return(&magistrala.IdentityRes{Id: validID}, nil) @@ -929,17 +931,47 @@ func TestConnectThingsHandler(t *testing.T) { err error }{ { - desc: "connect", + desc: "connect a thing", channelID: channel.ID, thingID: config.ThingID, err: nil, }, { - desc: "connect connected", + desc: "connect connected thing", channelID: channel.ID, thingID: config.ThingID, err: nil, }, + { + desc: "connect for a disconnected thing", + channelID: channel.ID, + thingID: config.ThingID, + err: err, + }, + { + desc: "connect for an invalid thing", + channelID: channel.ID, + thingID: unknown, + err: err, + }, + { + desc: "connect for a random thing", + channelID: channel.ID, + thingID: unknown, + err: err, + }, + { + desc: "connect for an empty thing", + channelID: channel.ID, + thingID: "", + err: err, + }, + { + desc: "connect with failed connection", + channelID: channel.ID, + thingID: config.ThingID, + err: err, + }, } for _, tc := range cases { @@ -949,6 +981,7 @@ func TestConnectThingsHandler(t *testing.T) { repoCall.Unset() } } + func TestDisconnectThingsHandler(t *testing.T) { svc, boot, auth, sdk := newService() repoCall := auth.On("Identify", mock.Anything, &magistrala.IdentityReq{Token: validToken}).Return(&magistrala.IdentityRes{Id: validID}, nil) @@ -982,6 +1015,30 @@ func TestDisconnectThingsHandler(t *testing.T) { thingID: config.ThingID, err: nil, }, + { + desc: "disconnect for an invalid thing", + channelID: channel.ID, + thingID: unknown, + err: err, + }, + { + desc: "disconnect for a random thing", + channelID: channel.ID, + thingID: unknown, + err: err, + }, + { + desc: "disconnect for an empty thing", + channelID: channel.ID, + thingID: "", + err: err, + }, + { + desc: "disconnect with failed disconnection", + channelID: channel.ID, + thingID: config.ThingID, + err: err, + }, } for _, tc := range cases { From 8b9ea2ab489dcc35bf050b522397a2b3f5e4a762 Mon Sep 17 00:00:00 2001 From: JeffMboya Date: Mon, 20 May 2024 13:04:55 +0300 Subject: [PATCH 07/20] Refactor: Add tests Signed-off-by: JeffMboya --- bootstrap/service_test.go | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) diff --git a/bootstrap/service_test.go b/bootstrap/service_test.go index 3f259e251e..94c840c287 100644 --- a/bootstrap/service_test.go +++ b/bootstrap/service_test.go @@ -954,6 +954,12 @@ func TestConnectThingsHandler(t *testing.T) { thingID: unknown, err: err, }, + { + desc: "connect for an invalid chanel", + channelID: unknown, + thingID: config.ThingID, + err: err, + }, { desc: "connect for a random thing", channelID: channel.ID, @@ -966,6 +972,18 @@ func TestConnectThingsHandler(t *testing.T) { thingID: "", err: err, }, + { + desc: "connect for an empty channel", + channelID: "", + thingID: config.ThingID, + err: err, + }, + { + desc: "connect for an empty thing and channel", + channelID: "", + thingID: "", + err: err, + }, { desc: "connect with failed connection", channelID: channel.ID, From 93ff4874352cc21c5e0286f7672fa698bd924d2b Mon Sep 17 00:00:00 2001 From: JeffMboya Date: Thu, 23 May 2024 12:22:11 +0300 Subject: [PATCH 08/20] refactor: ConnectThing and DisconnectThing methods Signed-off-by: JeffMboya --- bootstrap/configs.go | 6 ++---- bootstrap/events/consumer/streams.go | 4 ++-- 2 files changed, 4 insertions(+), 6 deletions(-) diff --git a/bootstrap/configs.go b/bootstrap/configs.go index 1e68da63be..b685bac02e 100644 --- a/bootstrap/configs.go +++ b/bootstrap/configs.go @@ -112,11 +112,9 @@ type ConfigRepository interface { // RemoveChannel removes channel with the given ID. RemoveChannel(ctx context.Context, id string) error - // ConnectHandler changes state of the Config when the corresponding Thing is - // connected to the Channel. + // ConnectHandler changes state of the Config when the corresponding Thing is connected to the Channel. ConnectThing(ctx context.Context, channelID, thingID string) error - // DisconnectHandler changes state of the Config when the corresponding Thing is - // disconnected from the Channel. + // DisconnectHandler changes state of the Config when the corresponding Thing is disconnected from the Channel. DisconnectThing(ctx context.Context, channelID, thingID string) error } diff --git a/bootstrap/events/consumer/streams.go b/bootstrap/events/consumer/streams.go index 9d6e0abcd0..759ee861a3 100644 --- a/bootstrap/events/consumer/streams.go +++ b/bootstrap/events/consumer/streams.go @@ -103,7 +103,7 @@ func decodeRemoveChannel(event map[string]interface{}) removeEvent { } func decodeConnectThing(event map[string]interface{}) connectionEvent { - if event["memberKind"] != memberKind && event["relation"] != relation { + if read(event, "memberKind", "") != memberKind && read(event, "relation", "") != relation { return connectionEvent{} } @@ -114,7 +114,7 @@ func decodeConnectThing(event map[string]interface{}) connectionEvent { } func decodeDisconnectThing(event map[string]interface{}) connectionEvent { - if event["memberKind"] != memberKind && event["relation"] != relation { + if read(event, "memberKind", "") != memberKind && read(event, "relation", "") != relation { return connectionEvent{} } return connectionEvent{ From 3b4b78a69d828481c719e7e9730642295004d005 Mon Sep 17 00:00:00 2001 From: JeffMboya Date: Sun, 26 May 2024 12:40:24 +0300 Subject: [PATCH 09/20] fix: fix ConnectThing and DisconnectThing comments Signed-off-by: JeffMboya --- bootstrap/configs.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/bootstrap/configs.go b/bootstrap/configs.go index b685bac02e..052526e913 100644 --- a/bootstrap/configs.go +++ b/bootstrap/configs.go @@ -112,9 +112,9 @@ type ConfigRepository interface { // RemoveChannel removes channel with the given ID. RemoveChannel(ctx context.Context, id string) error - // ConnectHandler changes state of the Config when the corresponding Thing is connected to the Channel. + // ConnectThing changes state of the Config when the corresponding Thing is connected to the Channel. ConnectThing(ctx context.Context, channelID, thingID string) error - // DisconnectHandler changes state of the Config when the corresponding Thing is disconnected from the Channel. + // DisconnectThing changes state of the Config when the corresponding Thing is disconnected from the Channel. DisconnectThing(ctx context.Context, channelID, thingID string) error } From c95639c51c97c73aec821fe0f01f682f8b375b14 Mon Sep 17 00:00:00 2001 From: JeffMboya Date: Sun, 26 May 2024 23:16:26 +0300 Subject: [PATCH 10/20] refactor: ConnectThing and DisconnectThing tests Signed-off-by: JeffMboya --- bootstrap/postgres/configs.go | 21 ++++ bootstrap/postgres/configs_test.go | 171 +++++++++++++++++++++++++++-- 2 files changed, 184 insertions(+), 8 deletions(-) diff --git a/bootstrap/postgres/configs.go b/bootstrap/postgres/configs.go index 87d2ff40af..c811e782c0 100644 --- a/bootstrap/postgres/configs.go +++ b/bootstrap/postgres/configs.go @@ -17,6 +17,7 @@ import ( "github.com/absmach/magistrala/pkg/clients" "github.com/absmach/magistrala/pkg/errors" repoerr "github.com/absmach/magistrala/pkg/errors/repository" + svcerr "github.com/absmach/magistrala/pkg/errors/service" "github.com/jackc/pgerrcode" "github.com/jackc/pgtype" "github.com/jackc/pgx/v5/pgconn" @@ -465,6 +466,23 @@ func (cr configRepository) RemoveChannel(ctx context.Context, id string) error { } func (cr configRepository) ConnectThing(ctx context.Context, channelID, thingID string) error { + if thingID == "" || channelID == "" { + return repoerr.ErrMalformedEntity + } + + qCheck := `SELECT EXISTS(SELECT 1 FROM configs WHERE magistrala_thing = $1), state FROM configs WHERE magistrala_thing = $1` + var exists bool + var state bootstrap.State + if err := cr.db.QueryRowxContext(ctx, qCheck, thingID).Scan(&exists, &state); err != nil { + return svcerr.ErrAddPolicies + } + if !exists { + return svcerr.ErrAddPolicies + } + if state == bootstrap.Active { + return svcerr.ErrAddPolicies + } + q := `UPDATE configs SET state = $1 WHERE EXISTS ( SELECT 1 FROM connections WHERE config_id = $2 AND channel_id = $3)` if _, err := cr.db.ExecContext(ctx, q, bootstrap.Active, thingID, channelID); err != nil { @@ -474,6 +492,9 @@ func (cr configRepository) ConnectThing(ctx context.Context, channelID, thingID } func (cr configRepository) DisconnectThing(ctx context.Context, channelID, thingID string) error { + if thingID == "" || channelID == "" { + return repoerr.ErrMalformedEntity + } q := `UPDATE configs SET state = $1 WHERE EXISTS ( SELECT 1 FROM connections WHERE config_id = $2 AND channel_id = $3)` if _, err := cr.db.ExecContext(ctx, q, bootstrap.Inactive, thingID, channelID); err != nil { diff --git a/bootstrap/postgres/configs_test.go b/bootstrap/postgres/configs_test.go index 60ee50389a..6218163885 100644 --- a/bootstrap/postgres/configs_test.go +++ b/bootstrap/postgres/configs_test.go @@ -11,8 +11,10 @@ import ( "github.com/absmach/magistrala/bootstrap" "github.com/absmach/magistrala/bootstrap/postgres" + "github.com/absmach/magistrala/internal/testsutil" "github.com/absmach/magistrala/pkg/errors" repoerr "github.com/absmach/magistrala/pkg/errors/repository" + svcerr "github.com/absmach/magistrala/pkg/errors/service" "github.com/gofrs/uuid" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -688,16 +690,99 @@ func TestConnectThing(t *testing.T) { c.ThingID = uid.String() c.ExternalID = uid.String() c.ExternalKey = uid.String() + c.State = bootstrap.Inactive saved, err := repo.Save(context.Background(), c, channels) assert.Nil(t, err, fmt.Sprintf("Saving config expected to succeed: %s.\n", err)) - err = repo.ConnectThing(context.Background(), c.Channels[0].ID, saved) - fmt.Print("state", config.State) - assert.Nil(t, err, fmt.Sprintf("Retrieving config expected to succeed: %s.\n", err)) + wrongID := testsutil.GenerateUUID(&testing.T{}) - cfg, err := repo.RetrieveByID(context.Background(), c.Owner, c.ThingID) - assert.Nil(t, err, fmt.Sprintf("Retrieving config expected to succeed: %s.\n", err)) - assert.Equal(t, cfg.State, bootstrap.Active, fmt.Sprintf("expected to be active when a connection is added from %s", cfg)) + connectedThing := c + connectedThing.State = bootstrap.Active + + randomThing := c + randomThingID, err := uuid.NewV4() + randomThing.ThingID = randomThingID.String() + + emptyThing := c + emptyThing.ThingID = "" + emptyThing.ThingKey = "" + emptyThing.ExternalID = "" + emptyThing.ExternalKey = "" + + cases := []struct { + desc string + owner string + id string + channels []bootstrap.Channel + connections []string + err error + }{ + { + desc: "connect disconnected thing", + owner: config.Owner, + id: saved, + channels: c.Channels, + connections: channels, + err: nil, + }, + { + desc: "connect non-existent thing", + owner: config.Owner, + id: wrongID, + channels: c.Channels, + connections: channels, + err: svcerr.ErrAddPolicies, + }, + { + desc: "connect already connected thing", + owner: config.Owner, + id: connectedThing.ThingID, + channels: c.Channels, + connections: channels, + err: svcerr.ErrAddPolicies, + }, + { + desc: "connect random thing", + owner: config.Owner, + id: randomThing.ThingID, + channels: c.Channels, + connections: channels, + err: svcerr.ErrAddPolicies, + }, + { + desc: "connect empty thing", + owner: config.Owner, + id: emptyThing.ThingID, + channels: c.Channels, + connections: channels, + err: svcerr.ErrMalformedEntity, + }, + { + desc: "connect thing with failed database update operation", + owner: config.Owner, + id: saved, + channels: c.Channels, + connections: channels, + err: svcerr.ErrAddPolicies, + }, + } + for _, tc := range cases { + for i, ch := range tc.channels { + if i == 0 { + err = repo.ConnectThing(context.Background(), ch.ID, tc.id) + assert.Equal(t, tc.err, err, fmt.Sprintf("%s: Expected error: %s, got: %s.\n", tc.desc, tc.err, err)) + cfg, err := repo.RetrieveByID(context.Background(), c.Owner, c.ThingID) + assert.Nil(t, err, fmt.Sprintf("Retrieving config expected to succeed: %s.\n", err)) + assert.Equal(t, cfg.State, bootstrap.Active, fmt.Sprintf("expected to be active when a connection is added from %s", cfg)) + } else { + _ = repo.ConnectThing(context.Background(), ch.ID, tc.id) + } + } + + cfg, err := repo.RetrieveByID(context.Background(), c.Owner, c.ThingID) + assert.Nil(t, err, fmt.Sprintf("Retrieving config expected to succeed: %s.\n", err)) + assert.Equal(t, cfg.State, bootstrap.Active, fmt.Sprintf("expected to be active when a connection is added from %s", cfg)) + } } func TestDisconnectThing(t *testing.T) { @@ -713,11 +798,81 @@ func TestDisconnectThing(t *testing.T) { c.ThingID = uid.String() c.ExternalID = uid.String() c.ExternalKey = uid.String() + c.State = bootstrap.Inactive saved, err := repo.Save(context.Background(), c, channels) assert.Nil(t, err, fmt.Sprintf("Saving config expected to succeed: %s.\n", err)) - err = repo.DisconnectThing(context.Background(), c.Channels[0].ID, saved) - assert.Nil(t, err, fmt.Sprintf("Retrieving config expected to succeed: %s.\n", err)) + wrongID := testsutil.GenerateUUID(&testing.T{}) + + connectedThing := c + connectedThing.State = bootstrap.Active + + randomThing := c + randomThingID, err := uuid.NewV4() + randomThing.ThingID = randomThingID.String() + + emptyThing := c + emptyThing.ThingID = "" + emptyThing.ThingKey = "" + emptyThing.ExternalID = "" + emptyThing.ExternalKey = "" + + cases := []struct { + desc string + owner string + id string + channels []bootstrap.Channel + connections []string + err error + }{ + { + desc: "disconnect connected thing", + owner: config.Owner, + id: connectedThing.ThingID, + channels: c.Channels, + connections: channels, + err: nil, + }, + { + desc: "disconnect already disconnected thing", + owner: config.Owner, + id: saved, + channels: c.Channels, + connections: channels, + err: nil, + }, + { + desc: "disconnect invalid thing", + owner: config.Owner, + id: wrongID, + channels: c.Channels, + connections: channels, + err: nil, + }, + { + desc: "disconnect random thing", + owner: config.Owner, + id: randomThing.ThingID, + channels: c.Channels, + connections: channels, + err: nil, + }, + { + desc: "disconnect empty thing", + owner: config.Owner, + id: emptyThing.ThingID, + channels: c.Channels, + connections: channels, + err: repoerr.ErrMalformedEntity, + }, + } + + for _, tc := range cases { + for _, ch := range tc.channels { + err = repo.DisconnectThing(context.Background(), ch.ID, tc.id) + assert.Equal(t, tc.err, err, fmt.Sprintf("%s: Expected error: %s, got: %s.\n", tc.desc, tc.err, err)) + } + } cfg, err := repo.RetrieveByID(context.Background(), c.Owner, c.ThingID) assert.Nil(t, err, fmt.Sprintf("Retrieving config expected to succeed: %s.\n", err)) From 867b4c1fcf4bb1242b4168d0baa28eddb30e6344 Mon Sep 17 00:00:00 2001 From: JeffMboya Date: Mon, 27 May 2024 09:23:48 +0300 Subject: [PATCH 11/20] refactor: ConnectThing and DisconnectThing tests Signed-off-by: JeffMboya --- bootstrap/postgres/configs.go | 7 +++---- bootstrap/postgres/configs_test.go | 17 ++++------------- 2 files changed, 7 insertions(+), 17 deletions(-) diff --git a/bootstrap/postgres/configs.go b/bootstrap/postgres/configs.go index c811e782c0..616d0b6cf1 100644 --- a/bootstrap/postgres/configs.go +++ b/bootstrap/postgres/configs.go @@ -17,7 +17,6 @@ import ( "github.com/absmach/magistrala/pkg/clients" "github.com/absmach/magistrala/pkg/errors" repoerr "github.com/absmach/magistrala/pkg/errors/repository" - svcerr "github.com/absmach/magistrala/pkg/errors/service" "github.com/jackc/pgerrcode" "github.com/jackc/pgtype" "github.com/jackc/pgx/v5/pgconn" @@ -474,13 +473,13 @@ func (cr configRepository) ConnectThing(ctx context.Context, channelID, thingID var exists bool var state bootstrap.State if err := cr.db.QueryRowxContext(ctx, qCheck, thingID).Scan(&exists, &state); err != nil { - return svcerr.ErrAddPolicies + return repoerr.ErrNotFound } if !exists { - return svcerr.ErrAddPolicies + return repoerr.ErrNotFound } if state == bootstrap.Active { - return svcerr.ErrAddPolicies + return repoerr.ErrConflict } q := `UPDATE configs SET state = $1 WHERE EXISTS ( diff --git a/bootstrap/postgres/configs_test.go b/bootstrap/postgres/configs_test.go index 6218163885..19f5f24a8f 100644 --- a/bootstrap/postgres/configs_test.go +++ b/bootstrap/postgres/configs_test.go @@ -14,7 +14,6 @@ import ( "github.com/absmach/magistrala/internal/testsutil" "github.com/absmach/magistrala/pkg/errors" repoerr "github.com/absmach/magistrala/pkg/errors/repository" - svcerr "github.com/absmach/magistrala/pkg/errors/service" "github.com/gofrs/uuid" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -731,7 +730,7 @@ func TestConnectThing(t *testing.T) { id: wrongID, channels: c.Channels, connections: channels, - err: svcerr.ErrAddPolicies, + err: repoerr.ErrNotFound, }, { desc: "connect already connected thing", @@ -739,7 +738,7 @@ func TestConnectThing(t *testing.T) { id: connectedThing.ThingID, channels: c.Channels, connections: channels, - err: svcerr.ErrAddPolicies, + err: repoerr.ErrConflict, }, { desc: "connect random thing", @@ -747,7 +746,7 @@ func TestConnectThing(t *testing.T) { id: randomThing.ThingID, channels: c.Channels, connections: channels, - err: svcerr.ErrAddPolicies, + err: repoerr.ErrNotFound, }, { desc: "connect empty thing", @@ -755,15 +754,7 @@ func TestConnectThing(t *testing.T) { id: emptyThing.ThingID, channels: c.Channels, connections: channels, - err: svcerr.ErrMalformedEntity, - }, - { - desc: "connect thing with failed database update operation", - owner: config.Owner, - id: saved, - channels: c.Channels, - connections: channels, - err: svcerr.ErrAddPolicies, + err: repoerr.ErrMalformedEntity, }, } for _, tc := range cases { From 1f199a0db980f5e64f4eb17d67b18684bf2b904e Mon Sep 17 00:00:00 2001 From: JeffMboya Date: Mon, 27 May 2024 09:40:59 +0300 Subject: [PATCH 12/20] refactor: ConnectThing and DisconnectThing tests Signed-off-by: JeffMboya --- bootstrap/postgres/configs_test.go | 12 +++++++++++- 1 file changed, 11 insertions(+), 1 deletion(-) diff --git a/bootstrap/postgres/configs_test.go b/bootstrap/postgres/configs_test.go index 19f5f24a8f..74a611cafb 100644 --- a/bootstrap/postgres/configs_test.go +++ b/bootstrap/postgres/configs_test.go @@ -36,7 +36,8 @@ var ( State: bootstrap.Inactive, } - channels = []string{"1", "2"} + channels = []string{"1", "2"} + emptyChannels = []string{} ) func TestSave(t *testing.T) { @@ -707,6 +708,7 @@ func TestConnectThing(t *testing.T) { emptyThing.ThingKey = "" emptyThing.ExternalID = "" emptyThing.ExternalKey = "" + emptyThing.Channels = []bootstrap.Channel{} cases := []struct { desc string @@ -724,6 +726,14 @@ func TestConnectThing(t *testing.T) { connections: channels, err: nil, }, + { + desc: "connect with empty channel", + owner: config.Owner, + id: saved, + channels: emptyThing.Channels, + connections: emptyChannels, + err: repoerr.ErrMalformedEntity, + }, { desc: "connect non-existent thing", owner: config.Owner, From 75db138f6bf24470b8684d0487a92c4d4a4362fe Mon Sep 17 00:00:00 2001 From: JeffMboya Date: Mon, 27 May 2024 14:30:19 +0300 Subject: [PATCH 13/20] refactor: ConnectThing and DisconnectThing tests Signed-off-by: JeffMboya --- bootstrap/events/consumer/streams.go | 15 +++++- bootstrap/postgres/configs.go | 75 ++++++++++++---------------- bootstrap/postgres/configs_test.go | 29 ++++------- 3 files changed, 56 insertions(+), 63 deletions(-) diff --git a/bootstrap/events/consumer/streams.go b/bootstrap/events/consumer/streams.go index 759ee861a3..19cd45f2b1 100644 --- a/bootstrap/events/consumer/streams.go +++ b/bootstrap/events/consumer/streams.go @@ -9,6 +9,7 @@ import ( "time" "github.com/absmach/magistrala/bootstrap" + svcerr "github.com/absmach/magistrala/pkg/errors/service" "github.com/absmach/magistrala/pkg/events" ) @@ -48,14 +49,26 @@ func (es *eventHandler) Handle(ctx context.Context, event events.Event) error { err = es.svc.RemoveConfigHandler(ctx, rte.id) case thingConnect: cte := decodeConnectThing(msg) + if cte.channelID == "" { + return svcerr.ErrMalformedEntity + } for _, thingID := range cte.thingID { - if err = es.svc.ConnectThingHandler(ctx, cte.channelID, thingID); err != nil { + if thingID == "" { + return svcerr.ErrMalformedEntity + } + if err := es.svc.ConnectThingHandler(ctx, cte.channelID, thingID); err != nil { return err } } case thingDisconnect: dte := decodeDisconnectThing(msg) + if dte.channelID == "" { + return svcerr.ErrMalformedEntity + } for _, thingID := range dte.thingID { + if thingID == "" { + return svcerr.ErrMalformedEntity + } if err = es.svc.DisconnectThingHandler(ctx, dte.channelID, thingID); err != nil { return err } diff --git a/bootstrap/postgres/configs.go b/bootstrap/postgres/configs.go index 616d0b6cf1..8fed7775e6 100644 --- a/bootstrap/postgres/configs.go +++ b/bootstrap/postgres/configs.go @@ -48,48 +48,45 @@ func NewConfigRepository(db postgres.Database, log *slog.Logger) bootstrap.Confi return &configRepository{db: db, log: log} } -func (cr configRepository) Save(ctx context.Context, cfg bootstrap.Config, chsConnIDs []string) (string, error) { - q := `INSERT INTO configs (magistrala_thing, owner, name, client_cert, client_key, ca_cert, magistrala_key, external_id, external_key, content, state) - VALUES (:magistrala_thing, :owner, :name, :client_cert, :client_key, :ca_cert, :magistrala_key, :external_id, :external_key, :content, :state)` +func (cr configRepository) Save(ctx context.Context, cfg bootstrap.Config, chsConnIDs []string) (thingID string, err error) { + q := `INSERT INTO configs (magistrala_thing, owner, name, client_cert, client_key, ca_cert, magistrala_key, external_id, external_key, content, state) + VALUES (:magistrala_thing, :owner, :name, :client_cert, :client_key, :ca_cert, :magistrala_key, :external_id, :external_key, :content, :state)` tx, err := cr.db.BeginTxx(ctx, nil) if err != nil { return "", errors.Wrap(repoerr.ErrCreateEntity, err) } - dbcfg := toDBConfig(cfg) - if _, err := tx.NamedExec(q, dbcfg); err != nil { - if pgErr, ok := err.(*pgconn.PgError); ok && pgErr.Code == pgerrcode.UniqueViolation { - err = repoerr.ErrConflict + defer func(thingID string, err error) (string, error) { + if err != nil { + if errRollback := cr.rollback(err, tx); errRollback != nil { + return thingID, errors.Wrap(err, errRollback) + } + return "", err } + return thingID, err + }(thingID, err) - if errRollback := cr.rollback(err, tx); errRollback != nil { - return "", errors.Wrap(err, errRollback) + if _, err := tx.NamedExec(q, dbcfg); err != nil { + if pgErr, ok := err.(*pgconn.PgError); ok && pgErr.Code == pgerrcode.UniqueViolation { + return "", repoerr.ErrConflict + } else { + return "", err } } if err := insertChannels(ctx, cfg.Owner, cfg.Channels, tx); err != nil { - if errRollback := cr.rollback(err, tx); errRollback != nil { - return "", errors.Wrap(err, errRollback) - } return "", errors.Wrap(errSaveChannels, err) } if err := insertConnections(ctx, cfg, chsConnIDs, tx); err != nil { - if errRollback := cr.rollback(err, tx); errRollback != nil { - return "", errors.Wrap(err, errRollback) - } return "", errors.Wrap(errSaveConnections, err) } if err := tx.Commit(); err != nil { - if errRollback := cr.rollback(err, tx); errRollback != nil { - return "", errors.Wrap(err, errRollback) - } return "", err } - return cfg.ThingID, nil } @@ -465,40 +462,32 @@ func (cr configRepository) RemoveChannel(ctx context.Context, id string) error { } func (cr configRepository) ConnectThing(ctx context.Context, channelID, thingID string) error { - if thingID == "" || channelID == "" { - return repoerr.ErrMalformedEntity - } - - qCheck := `SELECT EXISTS(SELECT 1 FROM configs WHERE magistrala_thing = $1), state FROM configs WHERE magistrala_thing = $1` - var exists bool - var state bootstrap.State - if err := cr.db.QueryRowxContext(ctx, qCheck, thingID).Scan(&exists, &state); err != nil { - return repoerr.ErrNotFound - } - if !exists { - return repoerr.ErrNotFound - } - if state == bootstrap.Active { - return repoerr.ErrConflict - } - q := `UPDATE configs SET state = $1 WHERE EXISTS ( - SELECT 1 FROM connections WHERE config_id = $2 AND channel_id = $3)` - if _, err := cr.db.ExecContext(ctx, q, bootstrap.Active, thingID, channelID); err != nil { + SELECT 1 FROM connections WHERE config_id = $2 AND channel_id = $3) + RETURNING *` + rows, err := cr.db.QueryContext(ctx, q, bootstrap.Active, thingID, channelID) + if err != nil { return errors.Wrap(errConnectThing, err) } + defer rows.Close() + if ok := rows.Next(); !ok { + return repoerr.ErrNotFound + } return nil } func (cr configRepository) DisconnectThing(ctx context.Context, channelID, thingID string) error { - if thingID == "" || channelID == "" { - return repoerr.ErrMalformedEntity - } q := `UPDATE configs SET state = $1 WHERE EXISTS ( - SELECT 1 FROM connections WHERE config_id = $2 AND channel_id = $3)` - if _, err := cr.db.ExecContext(ctx, q, bootstrap.Inactive, thingID, channelID); err != nil { + SELECT 1 FROM connections WHERE config_id = $2 AND channel_id = $3) + RETURNING *` + rows, err := cr.db.QueryContext(ctx, q, bootstrap.Inactive, thingID, channelID) + if err != nil { return errors.Wrap(errDisconnectThing, err) } + defer rows.Close() + if ok := rows.Next(); !ok { + return repoerr.ErrNotFound + } return nil } diff --git a/bootstrap/postgres/configs_test.go b/bootstrap/postgres/configs_test.go index 74a611cafb..34aa394508 100644 --- a/bootstrap/postgres/configs_test.go +++ b/bootstrap/postgres/configs_test.go @@ -36,8 +36,7 @@ var ( State: bootstrap.Inactive, } - channels = []string{"1", "2"} - emptyChannels = []string{} + channels = []string{"1", "2"} ) func TestSave(t *testing.T) { @@ -727,12 +726,12 @@ func TestConnectThing(t *testing.T) { err: nil, }, { - desc: "connect with empty channel", + desc: "connect already connected thing", owner: config.Owner, - id: saved, - channels: emptyThing.Channels, - connections: emptyChannels, - err: repoerr.ErrMalformedEntity, + id: connectedThing.ThingID, + channels: c.Channels, + connections: channels, + err: nil, }, { desc: "connect non-existent thing", @@ -742,14 +741,6 @@ func TestConnectThing(t *testing.T) { connections: channels, err: repoerr.ErrNotFound, }, - { - desc: "connect already connected thing", - owner: config.Owner, - id: connectedThing.ThingID, - channels: c.Channels, - connections: channels, - err: repoerr.ErrConflict, - }, { desc: "connect random thing", owner: config.Owner, @@ -764,7 +755,7 @@ func TestConnectThing(t *testing.T) { id: emptyThing.ThingID, channels: c.Channels, connections: channels, - err: repoerr.ErrMalformedEntity, + err: repoerr.ErrNotFound, }, } for _, tc := range cases { @@ -848,7 +839,7 @@ func TestDisconnectThing(t *testing.T) { id: wrongID, channels: c.Channels, connections: channels, - err: nil, + err: repoerr.ErrNotFound, }, { desc: "disconnect random thing", @@ -856,7 +847,7 @@ func TestDisconnectThing(t *testing.T) { id: randomThing.ThingID, channels: c.Channels, connections: channels, - err: nil, + err: repoerr.ErrNotFound, }, { desc: "disconnect empty thing", @@ -864,7 +855,7 @@ func TestDisconnectThing(t *testing.T) { id: emptyThing.ThingID, channels: c.Channels, connections: channels, - err: repoerr.ErrMalformedEntity, + err: repoerr.ErrNotFound, }, } From 38f1361b9be81454c527e99ff3aef7e40503f986 Mon Sep 17 00:00:00 2001 From: JeffMboya Date: Mon, 27 May 2024 15:10:44 +0300 Subject: [PATCH 14/20] fix: Add logging for error rollback in Save method Signed-off-by: JeffMboya --- bootstrap/postgres/configs.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/bootstrap/postgres/configs.go b/bootstrap/postgres/configs.go index 8fed7775e6..bfafaef0ff 100644 --- a/bootstrap/postgres/configs.go +++ b/bootstrap/postgres/configs.go @@ -8,6 +8,7 @@ import ( "database/sql" "encoding/json" "fmt" + "log" "log/slog" "strings" "time" @@ -58,14 +59,13 @@ func (cr configRepository) Save(ctx context.Context, cfg bootstrap.Config, chsCo } dbcfg := toDBConfig(cfg) - defer func(thingID string, err error) (string, error) { + defer func(thingID string, err error) { if err != nil { if errRollback := cr.rollback(err, tx); errRollback != nil { - return thingID, errors.Wrap(err, errRollback) + log.Println(errRollback) } - return "", err + log.Println(err) } - return thingID, err }(thingID, err) if _, err := tx.NamedExec(q, dbcfg); err != nil { From dff7e7135e7282006f2e3b54687e22dffb8fc8ce Mon Sep 17 00:00:00 2001 From: JeffMboya Date: Tue, 28 May 2024 15:21:48 +0300 Subject: [PATCH 15/20] refactor: defer rollback Signed-off-by: JeffMboya --- bootstrap/postgres/configs.go | 29 +++++++++++------------------ 1 file changed, 11 insertions(+), 18 deletions(-) diff --git a/bootstrap/postgres/configs.go b/bootstrap/postgres/configs.go index bfafaef0ff..9c425785de 100644 --- a/bootstrap/postgres/configs.go +++ b/bootstrap/postgres/configs.go @@ -8,7 +8,6 @@ import ( "database/sql" "encoding/json" "fmt" - "log" "log/slog" "strings" "time" @@ -50,8 +49,8 @@ func NewConfigRepository(db postgres.Database, log *slog.Logger) bootstrap.Confi } func (cr configRepository) Save(ctx context.Context, cfg bootstrap.Config, chsConnIDs []string) (thingID string, err error) { - q := `INSERT INTO configs (magistrala_thing, owner, name, client_cert, client_key, ca_cert, magistrala_key, external_id, external_key, content, state) - VALUES (:magistrala_thing, :owner, :name, :client_cert, :client_key, :ca_cert, :magistrala_key, :external_id, :external_key, :content, :state)` + q := `INSERT INTO configs (magistrala_thing, owner, name, client_cert, client_key, ca_cert, magistrala_key, external_id, external_key, content, state) + VALUES (:magistrala_thing, :owner, :name, :client_cert, :client_key, :ca_cert, :magistrala_key, :external_id, :external_key, :content, :state)` tx, err := cr.db.BeginTxx(ctx, nil) if err != nil { @@ -59,14 +58,11 @@ func (cr configRepository) Save(ctx context.Context, cfg bootstrap.Config, chsCo } dbcfg := toDBConfig(cfg) - defer func(thingID string, err error) { + defer func() { if err != nil { - if errRollback := cr.rollback(err, tx); errRollback != nil { - log.Println(errRollback) - } - log.Println(err) + err = cr.rollback(err, tx) } - }(thingID, err) + }() if _, err := tx.NamedExec(q, dbcfg); err != nil { if pgErr, ok := err.(*pgconn.PgError); ok && pgErr.Code == pgerrcode.UniqueViolation { @@ -317,10 +313,13 @@ func (cr configRepository) UpdateConnections(ctx context.Context, owner, id stri return errors.Wrap(repoerr.ErrUpdateEntity, err) } - if err := insertChannels(ctx, owner, channels, tx); err != nil { - if rollbackErr := cr.rollback(err, tx); rollbackErr != nil { - return err + defer func() { + if err != nil { + err = cr.rollback(err, tx) } + }() + + if err := insertChannels(ctx, owner, channels, tx); err != nil { return errors.Wrap(repoerr.ErrUpdateEntity, err) } @@ -330,16 +329,10 @@ func (cr configRepository) UpdateConnections(ctx context.Context, owner, id stri return repoerr.ErrNotFound } } - if errRollback := cr.rollback(err, tx); errRollback != nil { - return errors.Wrap(err, errRollback) - } return errors.Wrap(repoerr.ErrUpdateEntity, err) } if err := tx.Commit(); err != nil { - if errRollback := cr.rollback(err, tx); errRollback != nil { - return errors.Wrap(err, errRollback) - } return errors.Wrap(repoerr.ErrUpdateEntity, err) } From d602be36749991ee82f9774c71065e2eab8d61bf Mon Sep 17 00:00:00 2001 From: JeffMboya Date: Wed, 29 May 2024 10:36:17 +0300 Subject: [PATCH 16/20] refactor: Update thingID to thingIDs in connectionEvent struct Signed-off-by: JeffMboya --- bootstrap/events/consumer/events.go | 2 +- bootstrap/events/consumer/streams.go | 12 ++++++------ bootstrap/postgres/configs.go | 10 ++++++---- 3 files changed, 13 insertions(+), 11 deletions(-) diff --git a/bootstrap/events/consumer/events.go b/bootstrap/events/consumer/events.go index 5436cb75e9..a3a0599650 100644 --- a/bootstrap/events/consumer/events.go +++ b/bootstrap/events/consumer/events.go @@ -19,6 +19,6 @@ type updateChannelEvent struct { // Connection event is either connect or disconnect event. type connectionEvent struct { - thingID []string + thingIDs []string channelID string } diff --git a/bootstrap/events/consumer/streams.go b/bootstrap/events/consumer/streams.go index 19cd45f2b1..d3defda655 100644 --- a/bootstrap/events/consumer/streams.go +++ b/bootstrap/events/consumer/streams.go @@ -49,10 +49,10 @@ func (es *eventHandler) Handle(ctx context.Context, event events.Event) error { err = es.svc.RemoveConfigHandler(ctx, rte.id) case thingConnect: cte := decodeConnectThing(msg) - if cte.channelID == "" { + if cte.channelID == "" || len(cte.thingIDs) == 0 { return svcerr.ErrMalformedEntity } - for _, thingID := range cte.thingID { + for _, thingID := range cte.thingIDs { if thingID == "" { return svcerr.ErrMalformedEntity } @@ -62,10 +62,10 @@ func (es *eventHandler) Handle(ctx context.Context, event events.Event) error { } case thingDisconnect: dte := decodeDisconnectThing(msg) - if dte.channelID == "" { + if dte.channelID == "" || len(dte.thingIDs) == 0 { return svcerr.ErrMalformedEntity } - for _, thingID := range dte.thingID { + for _, thingID := range dte.thingIDs { if thingID == "" { return svcerr.ErrMalformedEntity } @@ -122,7 +122,7 @@ func decodeConnectThing(event map[string]interface{}) connectionEvent { return connectionEvent{ channelID: read(event, "group_id", ""), - thingID: ReadStringSlice(event, "member_ids"), + thingIDs: ReadStringSlice(event, "member_ids"), } } @@ -132,7 +132,7 @@ func decodeDisconnectThing(event map[string]interface{}) connectionEvent { } return connectionEvent{ channelID: read(event, "group_id", ""), - thingID: ReadStringSlice(event, "member_ids"), + thingIDs: ReadStringSlice(event, "member_ids"), } } diff --git a/bootstrap/postgres/configs.go b/bootstrap/postgres/configs.go index 9c425785de..50a5367852 100644 --- a/bootstrap/postgres/configs.go +++ b/bootstrap/postgres/configs.go @@ -65,11 +65,13 @@ func (cr configRepository) Save(ctx context.Context, cfg bootstrap.Config, chsCo }() if _, err := tx.NamedExec(q, dbcfg); err != nil { - if pgErr, ok := err.(*pgconn.PgError); ok && pgErr.Code == pgerrcode.UniqueViolation { - return "", repoerr.ErrConflict - } else { - return "", err + switch pgErr := err.(type) { + case *pgconn.PgError: + if pgErr.Code == pgerrcode.UniqueViolation { + return "", repoerr.ErrConflict + } } + return "", err } if err := insertChannels(ctx, cfg.Owner, cfg.Channels, tx); err != nil { From 1eb9a6167fab8f17f523b7e4c9d69ab31f3af270 Mon Sep 17 00:00:00 2001 From: JeffMboya Date: Wed, 29 May 2024 12:19:02 +0300 Subject: [PATCH 17/20] refactor: use ExecContext for updating the state of configs Signed-off-by: JeffMboya --- bootstrap/postgres/configs.go | 16 ++++++---------- 1 file changed, 6 insertions(+), 10 deletions(-) diff --git a/bootstrap/postgres/configs.go b/bootstrap/postgres/configs.go index 50a5367852..1221ff54ed 100644 --- a/bootstrap/postgres/configs.go +++ b/bootstrap/postgres/configs.go @@ -458,14 +458,12 @@ func (cr configRepository) RemoveChannel(ctx context.Context, id string) error { func (cr configRepository) ConnectThing(ctx context.Context, channelID, thingID string) error { q := `UPDATE configs SET state = $1 WHERE EXISTS ( - SELECT 1 FROM connections WHERE config_id = $2 AND channel_id = $3) - RETURNING *` - rows, err := cr.db.QueryContext(ctx, q, bootstrap.Active, thingID, channelID) + SELECT 1 FROM connections WHERE config_id = $2 AND channel_id = $3)` + result, err := cr.db.ExecContext(ctx, q, bootstrap.Active, thingID, channelID) if err != nil { return errors.Wrap(errConnectThing, err) } - defer rows.Close() - if ok := rows.Next(); !ok { + if rows, _ := result.RowsAffected(); rows == 0 { return repoerr.ErrNotFound } return nil @@ -473,14 +471,12 @@ func (cr configRepository) ConnectThing(ctx context.Context, channelID, thingID func (cr configRepository) DisconnectThing(ctx context.Context, channelID, thingID string) error { q := `UPDATE configs SET state = $1 WHERE EXISTS ( - SELECT 1 FROM connections WHERE config_id = $2 AND channel_id = $3) - RETURNING *` - rows, err := cr.db.QueryContext(ctx, q, bootstrap.Inactive, thingID, channelID) + SELECT 1 FROM connections WHERE config_id = $2 AND channel_id = $3)` + result, err := cr.db.ExecContext(ctx, q, bootstrap.Inactive, thingID, channelID) if err != nil { return errors.Wrap(errDisconnectThing, err) } - defer rows.Close() - if ok := rows.Next(); !ok { + if rows, _ := result.RowsAffected(); rows == 0 { return repoerr.ErrNotFound } return nil From a636ca57d71348e09542798be9b24bb85f0b783b Mon Sep 17 00:00:00 2001 From: JeffMboya Date: Wed, 5 Jun 2024 12:34:11 +0300 Subject: [PATCH 18/20] refactor: update bootstrap service tests Signed-off-by: JeffMboya --- bootstrap/service_test.go | 205 ++++---------------------------------- 1 file changed, 21 insertions(+), 184 deletions(-) diff --git a/bootstrap/service_test.go b/bootstrap/service_test.go index 94c840c287..19731db7d4 100644 --- a/bootstrap/service_test.go +++ b/bootstrap/service_test.go @@ -144,20 +144,7 @@ func TestAdd(t *testing.T) { } func TestView(t *testing.T) { - svc, boot, auth, sdk := newService() - repoCall := auth.On("Identify", mock.Anything, &magistrala.IdentityReq{Token: validToken}).Return(&magistrala.IdentityRes{Id: validID}, nil) - repoCall1 := sdk.On("Thing", mock.Anything, mock.Anything).Return(mgsdk.Thing{ID: config.ThingID, Credentials: mgsdk.Credentials{Secret: config.ThingKey}}, nil) - repoCall2 := sdk.On("Channel", mock.Anything, mock.Anything).Return(mgsdk.Channel{}, nil) - repoCall3 := boot.On("ListExisting", context.Background(), mock.Anything, mock.Anything, mock.Anything).Return(config.Channels, nil) - repoCall4 := boot.On("Save", context.Background(), mock.Anything, mock.Anything).Return(mock.Anything, nil) - _, err := svc.Add(context.Background(), validToken, config) - assert.Nil(t, err, fmt.Sprintf("Saving config expected to succeed: %s.\n", err)) - repoCall.Unset() - repoCall1.Unset() - repoCall2.Unset() - repoCall3.Unset() - repoCall4.Unset() - + svc, boot, auth, _ := newService() cases := []struct { desc string id string @@ -195,26 +182,13 @@ func TestView(t *testing.T) { } func TestUpdate(t *testing.T) { - svc, boot, auth, sdk := newService() + svc, boot, auth, _ := newService() c := config ch := channel ch.ID = "2" c.Channels = append(c.Channels, ch) - repoCall := auth.On("Identify", mock.Anything, &magistrala.IdentityReq{Token: validToken}).Return(&magistrala.IdentityRes{Id: validID}, nil) - repoCall1 := sdk.On("Thing", mock.Anything, mock.Anything).Return(mgsdk.Thing{ID: config.ThingID, Credentials: mgsdk.Credentials{Secret: config.ThingKey}}, nil) - repoCall2 := sdk.On("Channel", mock.Anything, mock.Anything).Return(mgsdk.Channel{}, nil) - repoCall3 := boot.On("ListExisting", context.Background(), mock.Anything, mock.Anything, mock.Anything).Return(config.Channels, nil) - repoCall4 := boot.On("Save", context.Background(), mock.Anything, mock.Anything).Return(mock.Anything, nil) - _, err := svc.Add(context.Background(), validToken, c) - assert.Nil(t, err, fmt.Sprintf("Saving config expected to succeed: %s.\n", err)) - repoCall.Unset() - repoCall1.Unset() - repoCall2.Unset() - repoCall3.Unset() - repoCall4.Unset() - modifiedCreated := c modifiedCreated.Content = "new-config" modifiedCreated.Name = "new name" @@ -259,25 +233,9 @@ func TestUpdate(t *testing.T) { } func TestUpdateCert(t *testing.T) { - svc, boot, auth, sdk := newService() + svc, boot, auth, _ := newService() c := config - ch := channel - ch.ID = "2" - c.Channels = append(c.Channels, ch) - repoCall := auth.On("Identify", mock.Anything, &magistrala.IdentityReq{Token: validToken}).Return(&magistrala.IdentityRes{Id: validID}, nil) - repoCall1 := sdk.On("Thing", mock.Anything, mock.Anything).Return(mgsdk.Thing{ID: config.ThingID, Credentials: mgsdk.Credentials{Secret: config.ThingKey}}, nil) - repoCall2 := sdk.On("Channel", mock.Anything, mock.Anything).Return(mgsdk.Channel{}, nil) - repoCall3 := boot.On("ListExisting", context.Background(), mock.Anything, mock.Anything, mock.Anything).Return(config.Channels, nil) - repoCall4 := boot.On("Save", context.Background(), mock.Anything, mock.Anything).Return(mock.Anything, nil) - _, err := svc.Add(context.Background(), validToken, c) - assert.Nil(t, err, fmt.Sprintf("Saving config expected to succeed: %s.\n", err)) - repoCall.Unset() - repoCall1.Unset() - repoCall2.Unset() - repoCall3.Unset() - repoCall4.Unset() - cases := []struct { desc string token string @@ -584,20 +542,7 @@ func TestList(t *testing.T) { } func TestRemove(t *testing.T) { - svc, boot, auth, sdk := newService() - repoCall := auth.On("Identify", mock.Anything, &magistrala.IdentityReq{Token: validToken}).Return(&magistrala.IdentityRes{Id: validID}, nil) - repoCall1 := sdk.On("Thing", mock.Anything, mock.Anything).Return(mgsdk.Thing{ID: config.ThingID, Credentials: mgsdk.Credentials{Secret: config.ThingKey}}, nil) - repoCall2 := sdk.On("Channel", mock.Anything, mock.Anything).Return(mgsdk.Channel{}, nil) - repoCall3 := boot.On("ListExisting", context.Background(), mock.Anything, mock.Anything, mock.Anything).Return(config.Channels, nil) - repoCall4 := boot.On("Save", context.Background(), mock.Anything, mock.Anything).Return(mock.Anything, nil) - _, err := svc.Add(context.Background(), validToken, config) - assert.Nil(t, err, fmt.Sprintf("Saving config expected to succeed: %s.\n", err)) - repoCall.Unset() - repoCall1.Unset() - repoCall2.Unset() - repoCall3.Unset() - repoCall4.Unset() - + svc, boot, auth, _ := newService() cases := []struct { desc string id string @@ -641,20 +586,7 @@ func TestRemove(t *testing.T) { } func TestBootstrap(t *testing.T) { - svc, boot, auth, sdk := newService() - repoCall := auth.On("Identify", mock.Anything, &magistrala.IdentityReq{Token: validToken}).Return(&magistrala.IdentityRes{Id: validID}, nil) - repoCall1 := sdk.On("Thing", mock.Anything, mock.Anything).Return(mgsdk.Thing{ID: config.ThingID, Credentials: mgsdk.Credentials{Secret: config.ThingKey}}, nil) - repoCall2 := sdk.On("Channel", mock.Anything, mock.Anything).Return(mgsdk.Channel{}, nil) - repoCall3 := boot.On("ListExisting", context.Background(), mock.Anything, mock.Anything, mock.Anything).Return(config.Channels, nil) - repoCall4 := boot.On("Save", context.Background(), mock.Anything, mock.Anything).Return(mock.Anything, nil) - _, err := svc.Add(context.Background(), validToken, config) - assert.Nil(t, err, fmt.Sprintf("Saving config expected to succeed: %s.\n", err)) - repoCall.Unset() - repoCall1.Unset() - repoCall2.Unset() - repoCall3.Unset() - repoCall4.Unset() - + svc, boot, _, _ := newService() e, err := enc([]byte(config.ExternalKey)) assert.Nil(t, err, fmt.Sprintf("Encrypting external key expected to succeed: %s.\n", err)) @@ -711,19 +643,6 @@ func TestBootstrap(t *testing.T) { func TestChangeState(t *testing.T) { svc, boot, auth, sdk := newService() - repoCall := auth.On("Identify", mock.Anything, &magistrala.IdentityReq{Token: validToken}).Return(&magistrala.IdentityRes{Id: validID}, nil) - repoCall1 := sdk.On("Thing", mock.Anything, mock.Anything).Return(mgsdk.Thing{ID: config.ThingID, Credentials: mgsdk.Credentials{Secret: config.ThingKey}}, nil) - repoCall2 := sdk.On("Channel", mock.Anything, mock.Anything).Return(toGroup(config.Channels[0]), nil) - repoCall3 := boot.On("ListExisting", context.Background(), mock.Anything, mock.Anything, mock.Anything).Return(config.Channels, nil) - repoCall4 := boot.On("Save", context.Background(), mock.Anything, mock.Anything).Return(mock.Anything, nil) - _, err := svc.Add(context.Background(), validToken, config) - assert.Nil(t, err, fmt.Sprintf("Saving config expected to succeed: %s.\n", err)) - repoCall.Unset() - repoCall1.Unset() - repoCall2.Unset() - repoCall3.Unset() - repoCall4.Unset() - cases := []struct { desc string state bootstrap.State @@ -785,19 +704,7 @@ func TestChangeState(t *testing.T) { } func TestUpdateChannelHandler(t *testing.T) { - svc, boot, auth, sdk := newService() - repoCall := auth.On("Identify", mock.Anything, &magistrala.IdentityReq{Token: validToken}).Return(&magistrala.IdentityRes{Id: validID}, nil) - repoCall1 := sdk.On("Thing", mock.Anything, mock.Anything).Return(mgsdk.Thing{ID: config.ThingID, Credentials: mgsdk.Credentials{Secret: config.ThingKey}}, nil) - repoCall2 := sdk.On("Channel", mock.Anything, mock.Anything).Return(mgsdk.Channel{}, nil) - repoCall3 := boot.On("ListExisting", context.Background(), mock.Anything, mock.Anything, mock.Anything).Return(config.Channels, nil) - repoCall4 := boot.On("Save", context.Background(), mock.Anything, mock.Anything).Return(mock.Anything, nil) - _, err := svc.Add(context.Background(), validToken, config) - assert.Nil(t, err, fmt.Sprintf("Saving config expected to succeed: %s.\n", err)) - repoCall.Unset() - repoCall1.Unset() - repoCall2.Unset() - repoCall3.Unset() - repoCall4.Unset() + svc, boot, _, _ := newService() ch := bootstrap.Channel{ ID: channel.ID, Name: "new name", @@ -830,20 +737,7 @@ func TestUpdateChannelHandler(t *testing.T) { } func TestRemoveChannelHandler(t *testing.T) { - svc, boot, auth, sdk := newService() - repoCall := auth.On("Identify", mock.Anything, &magistrala.IdentityReq{Token: validToken}).Return(&magistrala.IdentityRes{Id: validID}, nil) - repoCall1 := sdk.On("Thing", mock.Anything, mock.Anything).Return(mgsdk.Thing{ID: config.ThingID, Credentials: mgsdk.Credentials{Secret: config.ThingKey}}, nil) - repoCall2 := sdk.On("Channel", mock.Anything, mock.Anything).Return(mgsdk.Channel{}, nil) - repoCall3 := boot.On("ListExisting", context.Background(), mock.Anything, mock.Anything, mock.Anything).Return(config.Channels, nil) - repoCall4 := boot.On("Save", context.Background(), mock.Anything, mock.Anything).Return(mock.Anything, nil) - _, err := svc.Add(context.Background(), validToken, config) - assert.Nil(t, err, fmt.Sprintf("Saving config expected to succeed: %s.\n", err)) - repoCall.Unset() - repoCall1.Unset() - repoCall2.Unset() - repoCall3.Unset() - repoCall4.Unset() - + svc, boot, _, _ := newService() cases := []struct { desc string id string @@ -870,20 +764,7 @@ func TestRemoveChannelHandler(t *testing.T) { } func TestRemoveConfigHandler(t *testing.T) { - svc, boot, auth, sdk := newService() - repoCall := auth.On("Identify", mock.Anything, &magistrala.IdentityReq{Token: validToken}).Return(&magistrala.IdentityRes{Id: validID}, nil) - repoCall1 := sdk.On("Thing", mock.Anything, mock.Anything).Return(mgsdk.Thing{ID: config.ThingID, Credentials: mgsdk.Credentials{Secret: config.ThingKey}}, nil) - repoCall2 := sdk.On("Channel", mock.Anything, mock.Anything).Return(mgsdk.Channel{}, nil) - repoCall3 := boot.On("ListExisting", context.Background(), mock.Anything, mock.Anything, mock.Anything).Return(config.Channels, nil) - repoCall4 := boot.On("Save", context.Background(), mock.Anything, mock.Anything).Return(mock.Anything, nil) - _, err := svc.Add(context.Background(), validToken, config) - assert.Nil(t, err, fmt.Sprintf("Saving config expected to succeed: %s.\n", err)) - repoCall.Unset() - repoCall1.Unset() - repoCall2.Unset() - repoCall3.Unset() - repoCall4.Unset() - + svc, boot, _, _ := newService() cases := []struct { desc string id string @@ -910,20 +791,7 @@ func TestRemoveConfigHandler(t *testing.T) { } func TestConnectThingsHandler(t *testing.T) { - svc, boot, auth, sdk := newService() - repoCall := auth.On("Identify", mock.Anything, &magistrala.IdentityReq{Token: validToken}).Return(&magistrala.IdentityRes{Id: validID}, nil) - repoCall1 := sdk.On("Thing", mock.Anything, mock.Anything).Return(mgsdk.Thing{ID: config.ThingID, Credentials: mgsdk.Credentials{Secret: config.ThingKey}}, nil) - repoCall2 := sdk.On("Channel", mock.Anything, mock.Anything).Return(mgsdk.Channel{}, nil) - repoCall3 := boot.On("ListExisting", context.Background(), mock.Anything, mock.Anything, mock.Anything).Return(config.Channels, nil) - repoCall4 := boot.On("Save", context.Background(), mock.Anything, mock.Anything).Return(mock.Anything, nil) - _, err := svc.Add(context.Background(), validToken, config) - assert.Nil(t, err, fmt.Sprintf("Saving config expected to succeed: %s.\n", err)) - repoCall.Unset() - repoCall1.Unset() - repoCall2.Unset() - repoCall3.Unset() - repoCall4.Unset() - + svc, boot, _, _ := newService() cases := []struct { desc string thingID string @@ -940,55 +808,37 @@ func TestConnectThingsHandler(t *testing.T) { desc: "connect connected thing", channelID: channel.ID, thingID: config.ThingID, - err: nil, + err: svcerr.ErrAddPolicies, }, { desc: "connect for a disconnected thing", channelID: channel.ID, thingID: config.ThingID, - err: err, + err: nil, }, { desc: "connect for an invalid thing", channelID: channel.ID, thingID: unknown, - err: err, + err: svcerr.ErrAddPolicies, }, { - desc: "connect for an invalid chanel", + desc: "connect for an invalid channel", channelID: unknown, thingID: config.ThingID, - err: err, + err: svcerr.ErrAuthorization, }, { desc: "connect for a random thing", channelID: channel.ID, thingID: unknown, - err: err, - }, - { - desc: "connect for an empty thing", - channelID: channel.ID, - thingID: "", - err: err, - }, - { - desc: "connect for an empty channel", - channelID: "", - thingID: config.ThingID, - err: err, - }, - { - desc: "connect for an empty thing and channel", - channelID: "", - thingID: "", - err: err, + err: svcerr.ErrAddPolicies, }, { desc: "connect with failed connection", channelID: channel.ID, thingID: config.ThingID, - err: err, + err: svcerr.ErrAddPolicies, }, } @@ -1001,20 +851,7 @@ func TestConnectThingsHandler(t *testing.T) { } func TestDisconnectThingsHandler(t *testing.T) { - svc, boot, auth, sdk := newService() - repoCall := auth.On("Identify", mock.Anything, &magistrala.IdentityReq{Token: validToken}).Return(&magistrala.IdentityRes{Id: validID}, nil) - repoCall1 := sdk.On("Thing", mock.Anything, mock.Anything).Return(mgsdk.Thing{ID: config.ThingID, Credentials: mgsdk.Credentials{Secret: config.ThingKey}}, nil) - repoCall2 := sdk.On("Channel", mock.Anything, mock.Anything).Return(mgsdk.Channel{}, nil) - repoCall3 := boot.On("ListExisting", context.Background(), mock.Anything, mock.Anything, mock.Anything).Return(config.Channels, nil) - repoCall4 := boot.On("Save", context.Background(), mock.Anything, mock.Anything).Return(mock.Anything, nil) - _, err := svc.Add(context.Background(), validToken, config) - assert.Nil(t, err, fmt.Sprintf("Saving config expected to succeed: %s.\n", err)) - repoCall.Unset() - repoCall1.Unset() - repoCall2.Unset() - repoCall3.Unset() - repoCall4.Unset() - + svc, boot, _, _ := newService() cases := []struct { desc string thingID string @@ -1037,25 +874,25 @@ func TestDisconnectThingsHandler(t *testing.T) { desc: "disconnect for an invalid thing", channelID: channel.ID, thingID: unknown, - err: err, + err: svcerr.ErrDeletePolicies, }, { desc: "disconnect for a random thing", channelID: channel.ID, thingID: unknown, - err: err, + err: svcerr.ErrDeletePolicies, }, { desc: "disconnect for an empty thing", channelID: channel.ID, thingID: "", - err: err, + err: svcerr.ErrDeletePolicies, }, { desc: "disconnect with failed disconnection", channelID: channel.ID, thingID: config.ThingID, - err: err, + err: svcerr.ErrDeletePolicies, }, } From bf4aec65177c49a0b57c0651e419f568cdecf113 Mon Sep 17 00:00:00 2001 From: JeffMboya Date: Thu, 6 Jun 2024 12:09:18 +0300 Subject: [PATCH 19/20] refactor: Update ConnectThing and DisconnectThing tests Signed-off-by: JeffMboya --- bootstrap/service_test.go | 62 +++------------------------------------ 1 file changed, 4 insertions(+), 58 deletions(-) diff --git a/bootstrap/service_test.go b/bootstrap/service_test.go index 19731db7d4..5c73b59b1e 100644 --- a/bootstrap/service_test.go +++ b/bootstrap/service_test.go @@ -799,43 +799,13 @@ func TestConnectThingsHandler(t *testing.T) { err error }{ { - desc: "connect a thing", + desc: "connect", channelID: channel.ID, thingID: config.ThingID, err: nil, }, { - desc: "connect connected thing", - channelID: channel.ID, - thingID: config.ThingID, - err: svcerr.ErrAddPolicies, - }, - { - desc: "connect for a disconnected thing", - channelID: channel.ID, - thingID: config.ThingID, - err: nil, - }, - { - desc: "connect for an invalid thing", - channelID: channel.ID, - thingID: unknown, - err: svcerr.ErrAddPolicies, - }, - { - desc: "connect for an invalid channel", - channelID: unknown, - thingID: config.ThingID, - err: svcerr.ErrAuthorization, - }, - { - desc: "connect for a random thing", - channelID: channel.ID, - thingID: unknown, - err: svcerr.ErrAddPolicies, - }, - { - desc: "connect with failed connection", + desc: "connect connected", channelID: channel.ID, thingID: config.ThingID, err: svcerr.ErrAddPolicies, @@ -870,37 +840,13 @@ func TestDisconnectThingsHandler(t *testing.T) { thingID: config.ThingID, err: nil, }, - { - desc: "disconnect for an invalid thing", - channelID: channel.ID, - thingID: unknown, - err: svcerr.ErrDeletePolicies, - }, - { - desc: "disconnect for a random thing", - channelID: channel.ID, - thingID: unknown, - err: svcerr.ErrDeletePolicies, - }, - { - desc: "disconnect for an empty thing", - channelID: channel.ID, - thingID: "", - err: svcerr.ErrDeletePolicies, - }, - { - desc: "disconnect with failed disconnection", - channelID: channel.ID, - thingID: config.ThingID, - err: svcerr.ErrDeletePolicies, - }, } for _, tc := range cases { - repoCall := boot.On("DisconnectThing", context.Background(), mock.Anything, mock.Anything).Return(tc.err) + svcCall := boot.On("DisconnectThing", context.Background(), mock.Anything, mock.Anything).Return(tc.err) err := svc.DisconnectThingHandler(context.Background(), tc.channelID, tc.thingID) assert.True(t, errors.Contains(err, tc.err), fmt.Sprintf("%s: expected %s got %s\n", tc.desc, tc.err, err)) - repoCall.Unset() + svcCall.Unset() } } From 3a30ca5b68b67c80465b965a7ef055a17d4330de Mon Sep 17 00:00:00 2001 From: JeffMboya Date: Tue, 11 Jun 2024 12:24:33 +0300 Subject: [PATCH 20/20] refactor: Update ConnectThing and DisconnectThing Signed-off-by: JeffMboya --- bootstrap/events/producer/streams_test.go | 8 ++++---- bootstrap/service.go | 4 ++-- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/bootstrap/events/producer/streams_test.go b/bootstrap/events/producer/streams_test.go index a97d1010bb..2bcddd0e5c 100644 --- a/bootstrap/events/producer/streams_test.go +++ b/bootstrap/events/producer/streams_test.go @@ -1098,7 +1098,7 @@ func TestConnectThingHandler(t *testing.T) { lastID := "0" for _, tc := range cases { - svcCall := boot.On("ConnectThing", context.Background(), tc.channelID, tc.thingID).Return(tc.err) + repoCall := boot.On("ConnectThing", context.Background(), tc.channelID, tc.thingID).Return(tc.err) err := svc.ConnectThingHandler(context.Background(), tc.channelID, tc.thingID) assert.Equal(t, tc.err, err, fmt.Sprintf("%s: expected %s got %s\n", tc.desc, tc.err, err)) @@ -1117,7 +1117,7 @@ func TestConnectThingHandler(t *testing.T) { } test(t, tc.event, event, tc.desc) - svcCall.Unset() + repoCall.Unset() } } @@ -1179,7 +1179,7 @@ func TestDisconnectThingHandler(t *testing.T) { lastID := "0" for _, tc := range cases { - svcCall := boot.On("DisconnectThing", context.Background(), tc.channelID, tc.thingID).Return(tc.err) + repoCall := boot.On("DisconnectThing", context.Background(), tc.channelID, tc.thingID).Return(tc.err) err := svc.DisconnectThingHandler(context.Background(), tc.channelID, tc.thingID) assert.Equal(t, tc.err, err, fmt.Sprintf("%s: expected %s got %s\n", tc.desc, tc.err, err)) @@ -1198,7 +1198,7 @@ func TestDisconnectThingHandler(t *testing.T) { } test(t, tc.event, event, tc.desc) - svcCall.Unset() + repoCall.Unset() } } diff --git a/bootstrap/service.go b/bootstrap/service.go index 2a00146a44..ac607762a4 100644 --- a/bootstrap/service.go +++ b/bootstrap/service.go @@ -97,10 +97,10 @@ type Service interface { // RemoveChannelHandler removes Channel with id received from an event. RemoveChannelHandler(ctx context.Context, id string) error - // ConnectHandler changes state of the Config to active when connect event occurs. + // ConnectThingHandler changes state of the Config to active when connect event occurs. ConnectThingHandler(ctx context.Context, channelID, ThingID string) error - // DisconnectHandler changes state of the Config to inactive when disconnect event occurs. + // DisconnectThingHandler changes state of the Config to inactive when disconnect event occurs. DisconnectThingHandler(ctx context.Context, channelID, ThingID string) error }