From 6841dfc71934ca98b99f2ee821a308c8a9ec6e8a Mon Sep 17 00:00:00 2001 From: Adrian-Stefan Mares Date: Tue, 24 Oct 2023 13:36:38 +0200 Subject: [PATCH 1/4] all: Expose names from patterns utility --- config/messages.json | 48 ++++++++++++------------ pkg/events/grpc/grpc.go | 57 +---------------------------- pkg/events/pattern.go | 77 +++++++++++++++++++++++++++++++++++++++ pkg/webui/locales/ja.json | 6 +-- 4 files changed, 105 insertions(+), 83 deletions(-) create mode 100644 pkg/events/pattern.go diff --git a/config/messages.json b/config/messages.json index 9f89f6d34a..515b8cfff8 100644 --- a/config/messages.json +++ b/config/messages.json @@ -4382,67 +4382,67 @@ "file": "conversion.go" } }, - "error:pkg/events/grpc:invalid_regexp": { + "error:pkg/events/grpc:no_identifiers": { "translations": { - "en": "invalid regexp" + "en": "no identifiers" }, "description": { "package": "pkg/events/grpc", "file": "grpc.go" } }, - "error:pkg/events/grpc:no_identifiers": { + "error:pkg/events/grpc:storage_disabled": { "translations": { - "en": "no identifiers" + "en": "events storage is not not enabled" }, "description": { "package": "pkg/events/grpc", "file": "grpc.go" } }, - "error:pkg/events/grpc:no_matching_events": { + "error:pkg/events/redis:channel_closed": { "translations": { - "en": "no matching events for regexp `{regexp}`" + "en": "channel closed" }, "description": { - "package": "pkg/events/grpc", - "file": "grpc.go" + "package": "pkg/events/redis", + "file": "redis.go" } }, - "error:pkg/events/grpc:storage_disabled": { + "error:pkg/events/redis:unknown_encoding": { "translations": { - "en": "events storage is not not enabled" + "en": "unknown encoding" }, "description": { - "package": "pkg/events/grpc", - "file": "grpc.go" + "package": "pkg/events/redis", + "file": "codec.go" } }, - "error:pkg/events/grpc:unknown_event_name": { + "error:pkg/events:invalid_regexp": { "translations": { - "en": "unknown event `{name}`" + "en": "invalid regexp" }, "description": { - "package": "pkg/events/grpc", - "file": "grpc.go" + "package": "pkg/events", + "file": "pattern.go" } }, - "error:pkg/events/redis:channel_closed": { + "error:pkg/events:no_matching_events": { "translations": { - "en": "channel closed" + "en": "no matching events for regexp `{regexp}`" }, "description": { - "package": "pkg/events/redis", - "file": "redis.go" + "package": "pkg/events", + "file": "pattern.go" } }, - "error:pkg/events/redis:unknown_encoding": { + "error:pkg/events:unknown_event_name": { "translations": { - "en": "unknown encoding" + "en": "unknown event `{name}`" }, "description": { - "package": "pkg/events/redis", - "file": "codec.go" + "package": "pkg/events", + "file": "pattern.go" } }, "error:pkg/fetch:fetch_file": { diff --git a/pkg/events/grpc/grpc.go b/pkg/events/grpc/grpc.go index 0de69e6009..be250151c4 100644 --- a/pkg/events/grpc/grpc.go +++ b/pkg/events/grpc/grpc.go @@ -19,9 +19,6 @@ package grpc import ( "context" "os" - "regexp" - "sort" - "strings" "time" grpc_runtime "github.com/grpc-ecosystem/grpc-gateway/v2/runtime" @@ -65,58 +62,6 @@ type EventsServer struct { definedNames map[string]struct{} } -var ( - errInvalidRegexp = errors.DefineInvalidArgument("invalid_regexp", "invalid regexp") - errNoMatchingEvents = errors.DefineInvalidArgument("no_matching_events", "no matching events for regexp `{regexp}`") - errUnknownEventName = errors.DefineInvalidArgument("unknown_event_name", "unknown event `{name}`") -) - -func (srv *EventsServer) processNames(names ...string) ([]string, error) { - if len(names) == 0 { - return nil, nil - } - nameMap := make(map[string]struct{}) - for _, name := range names { - if strings.HasPrefix(name, "/") && strings.HasSuffix(name, "/") { - re, err := regexp.Compile(strings.Trim(name, "/")) - if err != nil { - return nil, errInvalidRegexp.WithCause(err) - } - var found bool - for defined := range srv.definedNames { - if re.MatchString(defined) { - nameMap[defined] = struct{}{} - found = true - } - } - if !found { - return nil, errNoMatchingEvents.WithAttributes("regexp", re.String()) - } - } else { - var found bool - for defined := range srv.definedNames { - if name == defined { - nameMap[name] = struct{}{} - found = true - break - } - } - if !found { - return nil, errUnknownEventName.WithAttributes("name", name) - } - } - } - if len(nameMap) == 0 { - return nil, nil - } - out := make([]string, 0, len(nameMap)) - for name := range nameMap { - out = append(out, name) - } - sort.Strings(out) - return out, nil -} - var errNoIdentifiers = errors.DefineInvalidArgument("no_identifiers", "no identifiers") // Stream implements the EventsServer interface. @@ -125,7 +70,7 @@ func (srv *EventsServer) Stream(req *ttnpb.StreamEventsRequest, stream ttnpb.Eve return errNoIdentifiers.New() } - names, err := srv.processNames(req.Names...) + names, err := events.NamesFromPatterns(srv.definedNames, req.Names) if err != nil { return err } diff --git a/pkg/events/pattern.go b/pkg/events/pattern.go new file mode 100644 index 0000000000..bb3d5389c3 --- /dev/null +++ b/pkg/events/pattern.go @@ -0,0 +1,77 @@ +// Copyright © 2023 The Things Network Foundation, The Things Industries B.V. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package events + +import ( + "regexp" + "sort" + "strings" + + "go.thethings.network/lorawan-stack/v3/pkg/errors" +) + +var ( + errInvalidRegexp = errors.DefineInvalidArgument("invalid_regexp", "invalid regexp") + errNoMatchingEvents = errors.DefineInvalidArgument("no_matching_events", "no matching events for regexp `{regexp}`") + errUnknownEventName = errors.DefineInvalidArgument("unknown_event_name", "unknown event `{name}`") +) + +// NamesFromPatterns returns the event names which match the given patterns. +// The defined names are a set of event names which are used to match the patterns. +func NamesFromPatterns(definedNames map[string]struct{}, patterns []string) ([]string, error) { + if len(patterns) == 0 { + return nil, nil + } + nameMap := make(map[string]struct{}) + for _, name := range patterns { + if strings.HasPrefix(name, "/") && strings.HasSuffix(name, "/") { + re, err := regexp.Compile(strings.Trim(name, "/")) + if err != nil { + return nil, errInvalidRegexp.WithCause(err) + } + var found bool + for defined := range definedNames { + if re.MatchString(defined) { + nameMap[defined] = struct{}{} + found = true + } + } + if !found { + return nil, errNoMatchingEvents.WithAttributes("regexp", re.String()) + } + } else { + var found bool + for defined := range definedNames { + if name == defined { + nameMap[name] = struct{}{} + found = true + break + } + } + if !found { + return nil, errUnknownEventName.WithAttributes("name", name) + } + } + } + if len(nameMap) == 0 { + return nil, nil + } + out := make([]string, 0, len(nameMap)) + for name := range nameMap { + out = append(out, name) + } + sort.Strings(out) + return out, nil +} diff --git a/pkg/webui/locales/ja.json b/pkg/webui/locales/ja.json index 9720652f04..4a993c438c 100644 --- a/pkg/webui/locales/ja.json +++ b/pkg/webui/locales/ja.json @@ -1980,13 +1980,13 @@ "error:pkg/errors:x509_certificate_invalid": "証明書が無効", "error:pkg/errors:x509_hostname": "証明書の承認された名前が要求された名前と一致しません", "error:pkg/errors:x509_unknown_authority": "不明の証明書発行機関", - "error:pkg/events/grpc:invalid_regexp": "無効な正規表現", "error:pkg/events/grpc:no_identifiers": "識別子がありません", - "error:pkg/events/grpc:no_matching_events": "正規表現`{regexp}`に一致するイベントがありません", "error:pkg/events/grpc:storage_disabled": "イベントストレージが有効になっていません", - "error:pkg/events/grpc:unknown_event_name": "不明なイベント`{name}`", "error:pkg/events/redis:channel_closed": "チャネルが閉じています", "error:pkg/events/redis:unknown_encoding": "不明なエンコーディング", + "error:pkg/events:invalid_regexp": "無効な正規表現", + "error:pkg/events:no_matching_events": "正規表現`{regexp}`に一致するイベントがありません", + "error:pkg/events:unknown_event_name": "不明なイベント`{name}`", "error:pkg/fetch:fetch_file": "ファイル `{filename}` を取得できません", "error:pkg/fetch:file_not_found": "ファイル `{filename}` が見つかりません", "error:pkg/fetch:filename_not_specified": "ファイル名が特定されません", From 853bdaed400d90beb16b06b71ef67b7dc545d2ea Mon Sep 17 00:00:00 2001 From: Adrian-Stefan Mares Date: Tue, 24 Oct 2023 16:18:29 +0200 Subject: [PATCH 2/4] console: Add events protocol messages --- .../internal/events/protocol/protocol.go | 331 ++++++++++++++++++ .../internal/events/protocol/protocol_test.go | 220 ++++++++++++ 2 files changed, 551 insertions(+) create mode 100644 pkg/console/internal/events/protocol/protocol.go create mode 100644 pkg/console/internal/events/protocol/protocol_test.go diff --git a/pkg/console/internal/events/protocol/protocol.go b/pkg/console/internal/events/protocol/protocol.go new file mode 100644 index 0000000000..3348a8dc82 --- /dev/null +++ b/pkg/console/internal/events/protocol/protocol.go @@ -0,0 +1,331 @@ +// Copyright © 2023 The Things Network Foundation, The Things Industries B.V. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// Package protocol implements the protocol for the events package. +package protocol + +import ( + "encoding/json" + "time" + + "go.thethings.network/lorawan-stack/v3/pkg/errors" + "go.thethings.network/lorawan-stack/v3/pkg/jsonpb" + "go.thethings.network/lorawan-stack/v3/pkg/ttnpb" + statuspb "google.golang.org/genproto/googleapis/rpc/status" + "google.golang.org/grpc/status" +) + +var ( + errMessageType = errors.DefineInvalidArgument("message_type", "invalid message type `{type}`") + + _ json.Marshaler = (*ttnpb.EntityIdentifiers)(nil) + _ json.Unmarshaler = (*ttnpb.EntityIdentifiers)(nil) + + _ json.Marshaler = (*ttnpb.Event)(nil) + _ json.Unmarshaler = (*ttnpb.Event)(nil) +) + +// MessageType is the type of a message. +type MessageType int + +const ( + // MessageTypeSubscribe is the type of a subscribe message. + MessageTypeSubscribe MessageType = iota + // MessageTypeUnsubscribe is the type of an unsubscribe message. + MessageTypeUnsubscribe + // MessageTypePublish is the type of a publish message. + MessageTypePublish + // MessageTypeError is the type of an error message. + MessageTypeError +) + +// MarshalJSON implements json.Marshaler. +func (m MessageType) MarshalJSON() ([]byte, error) { + switch m { + case MessageTypeSubscribe: + return []byte(`"subscribe"`), nil + case MessageTypeUnsubscribe: + return []byte(`"unsubscribe"`), nil + case MessageTypePublish: + return []byte(`"publish"`), nil + case MessageTypeError: + return []byte(`"error"`), nil + default: + return nil, errMessageType.WithAttributes("type", m) + } +} + +// UnmarshalJSON implements json.Unmarshaler. +func (m *MessageType) UnmarshalJSON(data []byte) error { + switch string(data) { + case `"subscribe"`: + *m = MessageTypeSubscribe + case `"unsubscribe"`: + *m = MessageTypeUnsubscribe + case `"publish"`: + *m = MessageTypePublish + case `"error"`: + *m = MessageTypeError + default: + return errMessageType.WithAttributes("type", string(data)) + } + return nil +} + +// Request is a request message. +type Request interface { + _requestMessage() +} + +// Response is a response message. +type Response interface { + _responseMessage() +} + +// SubscribeRequest is the request to subscribe to events. +type SubscribeRequest struct { + ID uint64 `json:"id"` + Identifiers []*ttnpb.EntityIdentifiers `json:"identifiers"` + Tail uint32 `json:"tail"` + After *time.Time `json:"after"` + Names []string `json:"names"` +} + +func (SubscribeRequest) _requestMessage() {} + +// Response builds a response to the request. +func (m SubscribeRequest) Response(err error) Response { + if err != nil { + return newErrorResponse(m.ID, err) + } + return &SubscribeResponse{ + ID: m.ID, + } +} + +// MarshalJSON implements json.Marshaler. +func (m SubscribeRequest) MarshalJSON() ([]byte, error) { + type alias SubscribeRequest + return jsonpb.TTN().Marshal(struct { + Type MessageType `json:"type"` + alias + }{ + Type: MessageTypeSubscribe, + alias: alias(m), + }) +} + +// SubscribeResponse is the response to a subscribe request. +type SubscribeResponse struct { + ID uint64 `json:"id"` +} + +func (SubscribeResponse) _responseMessage() {} + +// MarshalJSON implements json.Marshaler. +func (m SubscribeResponse) MarshalJSON() ([]byte, error) { + type alias SubscribeResponse + return jsonpb.TTN().Marshal(struct { + Type MessageType `json:"type"` + alias + }{ + Type: MessageTypeSubscribe, + alias: alias(m), + }) +} + +// UnsubscribeRequest is the request to unsubscribe from events. +type UnsubscribeRequest struct { + ID uint64 `json:"id"` +} + +func (UnsubscribeRequest) _requestMessage() {} + +// MarshalJSON implements json.Marshaler. +func (m UnsubscribeRequest) MarshalJSON() ([]byte, error) { + type alias UnsubscribeRequest + return jsonpb.TTN().Marshal(struct { + Type MessageType `json:"type"` + alias + }{ + Type: MessageTypeUnsubscribe, + alias: alias(m), + }) +} + +// UnsubscribeResponse is the response to an unsubscribe request. +type UnsubscribeResponse struct { + ID uint64 `json:"id"` +} + +func (UnsubscribeResponse) _responseMessage() {} + +// Response builds a response to the request. +func (m UnsubscribeRequest) Response(err error) Response { + if err != nil { + return newErrorResponse(m.ID, err) + } + return &UnsubscribeResponse{ + ID: m.ID, + } +} + +// MarshalJSON implements json.Marshaler. +func (m UnsubscribeResponse) MarshalJSON() ([]byte, error) { + type alias UnsubscribeResponse + return jsonpb.TTN().Marshal(struct { + Type MessageType `json:"type"` + alias + }{ + Type: MessageTypeUnsubscribe, + alias: alias(m), + }) +} + +// PublishResponse is the request to publish an event. +type PublishResponse struct { + ID uint64 `json:"id"` + Event *ttnpb.Event `json:"event"` +} + +func (PublishResponse) _responseMessage() {} + +// MarshalJSON implements json.Marshaler. +func (m PublishResponse) MarshalJSON() ([]byte, error) { + type alias PublishResponse + return jsonpb.TTN().Marshal(struct { + Type MessageType `json:"type"` + alias + }{ + Type: MessageTypePublish, + alias: alias(m), + }) +} + +// ErrorResponse is the response to an error. +type ErrorResponse struct { + ID uint64 + Error *status.Status +} + +func (ErrorResponse) _responseMessage() {} + +// statusAlias is an alias of status.Status which supports JSON marshaling. +type statusAlias statuspb.Status + +// MarshalJSON implements json.Marshaler. +func (s *statusAlias) MarshalJSON() ([]byte, error) { + return jsonpb.TTN().Marshal((*statuspb.Status)(s)) +} + +// UnmarshalJSON implements json.Unmarshaler. +func (s *statusAlias) UnmarshalJSON(data []byte) error { + return jsonpb.TTN().Unmarshal(data, (*statuspb.Status)(s)) +} + +// MarshalJSON implements json.Marshaler. +func (m ErrorResponse) MarshalJSON() ([]byte, error) { + return jsonpb.TTN().Marshal(struct { + Type MessageType `json:"type"` + ID uint64 `json:"id"` + Error *statusAlias `json:"error"` + }{ + Type: MessageTypeError, + ID: m.ID, + Error: (*statusAlias)(m.Error.Proto()), + }) +} + +func newErrorResponse(id uint64, err error) Response { + return &ErrorResponse{ + ID: id, + Error: status.Convert(err), + } +} + +// UnmarshalJSON implements json.Unmarshaler. +func (m *ErrorResponse) UnmarshalJSON(data []byte) error { + var alias struct { + ID uint64 `json:"id"` + Error *statusAlias `json:"error"` + } + if err := jsonpb.TTN().Unmarshal(data, &alias); err != nil { + return err + } + m.ID = alias.ID + m.Error = status.FromProto((*statuspb.Status)(alias.Error)) + return nil +} + +// RequestWrapper wraps a request to be sent over the websocket. +type RequestWrapper struct { + Contents Request +} + +// UnmarshalJSON implements json.Unmarshaler. +func (m *RequestWrapper) UnmarshalJSON(data []byte) error { + var contents struct { + Type MessageType `json:"type"` + } + if err := jsonpb.TTN().Unmarshal(data, &contents); err != nil { + return err + } + switch contents.Type { + case MessageTypeSubscribe: + m.Contents = &SubscribeRequest{} + case MessageTypeUnsubscribe: + m.Contents = &UnsubscribeRequest{} + default: + return errMessageType.WithAttributes("type", contents.Type) + } + return jsonpb.TTN().Unmarshal(data, m.Contents) +} + +// MarshalJSON implements json.Marshaler. +func (m RequestWrapper) MarshalJSON() ([]byte, error) { + return json.Marshal(m.Contents) +} + +// ResponseWrapper wraps a response to be sent over the websocket. +type ResponseWrapper struct { + Contents Response +} + +// UnmarshalJSON implements json.Unmarshaler. +func (m *ResponseWrapper) UnmarshalJSON(data []byte) error { + var contents struct { + Type MessageType `json:"type"` + } + if err := jsonpb.TTN().Unmarshal(data, &contents); err != nil { + return err + } + switch contents.Type { + case MessageTypeSubscribe: + m.Contents = &SubscribeResponse{} + case MessageTypeUnsubscribe: + m.Contents = &UnsubscribeResponse{} + case MessageTypePublish: + m.Contents = &PublishResponse{} + case MessageTypeError: + m.Contents = &ErrorResponse{} + default: + return errMessageType.WithAttributes("type", contents.Type) + } + return jsonpb.TTN().Unmarshal(data, m.Contents) +} + +// MarshalJSON implements json.Marshaler. +func (m ResponseWrapper) MarshalJSON() ([]byte, error) { + return json.Marshal(m.Contents) +} diff --git a/pkg/console/internal/events/protocol/protocol_test.go b/pkg/console/internal/events/protocol/protocol_test.go new file mode 100644 index 0000000000..bec85bc950 --- /dev/null +++ b/pkg/console/internal/events/protocol/protocol_test.go @@ -0,0 +1,220 @@ +// Copyright © 2023 The Things Network Foundation, The Things Industries B.V. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package protocol_test + +import ( + "encoding/json" + "fmt" + "testing" + "time" + + "github.com/smarty/assertions" + "go.thethings.network/lorawan-stack/v3/pkg/console/internal/events/protocol" + "go.thethings.network/lorawan-stack/v3/pkg/errors" + "go.thethings.network/lorawan-stack/v3/pkg/ttnpb" + "go.thethings.network/lorawan-stack/v3/pkg/util/test" + "go.thethings.network/lorawan-stack/v3/pkg/util/test/assertions/should" + "google.golang.org/grpc/status" + "google.golang.org/protobuf/types/known/anypb" + "google.golang.org/protobuf/types/known/timestamppb" +) + +func TestMarshal(t *testing.T) { + t.Parallel() + + a := assertions.New(t) + + b, err := json.Marshal(protocol.MessageTypePublish) + if a.So(err, should.BeNil) { + a.So(b, should.Resemble, []byte(`"publish"`)) + } + var tp protocol.MessageType + err = json.Unmarshal([]byte(`"publish"`), &tp) + if a.So(err, should.BeNil) { + a.So(tp, should.Equal, protocol.MessageTypePublish) + } + + b, err = json.Marshal(&protocol.SubscribeRequest{ + ID: 0x42, + Identifiers: []*ttnpb.EntityIdentifiers{ + (&ttnpb.ApplicationIdentifiers{ApplicationId: "foo"}).GetEntityIdentifiers(), + (&ttnpb.ClientIdentifiers{ClientId: "bar"}).GetEntityIdentifiers(), + }, + Tail: 10, + After: timePtr(time.UnixMilli(123456789012).UTC()), + Names: []string{"foo", "bar"}, + }) + if a.So(err, should.BeNil) { + a.So( + b, + should.Resemble, + []byte(`{"type":"subscribe","id":66,"identifiers":[{"application_ids":{"application_id":"foo"}},{"client_ids":{"client_id":"bar"}}],"tail":10,"after":"1973-11-29T21:33:09.012Z","names":["foo","bar"]}`), // nolint:lll + ) + } + var subReq protocol.SubscribeRequest + err = json.Unmarshal( + []byte(`{"type":"subscribe","id":66,"identifiers":[{"application_ids":{"application_id":"foo"}},{"client_ids":{"client_id":"bar"}}],"tail":10,"after":"1973-11-29T21:33:09.012Z","names":["foo","bar"]}`), // nolint:lll + &subReq, + ) + if a.So(err, should.BeNil) { + a.So(subReq, should.Resemble, protocol.SubscribeRequest{ + ID: 0x42, + Identifiers: []*ttnpb.EntityIdentifiers{ + (&ttnpb.ApplicationIdentifiers{ApplicationId: "foo"}).GetEntityIdentifiers(), + (&ttnpb.ClientIdentifiers{ClientId: "bar"}).GetEntityIdentifiers(), + }, + Tail: 10, + After: timePtr(time.UnixMilli(123456789012).UTC()), + Names: []string{"foo", "bar"}, + }) + } + + b, err = json.Marshal(&protocol.SubscribeResponse{ + ID: 0x42, + }) + if a.So(err, should.BeNil) { + a.So(b, should.Resemble, []byte(`{"type":"subscribe","id":66}`)) + } + var subResp protocol.SubscribeResponse + err = json.Unmarshal([]byte(`{"type":"subscribe","id":66}`), &subResp) + if a.So(err, should.BeNil) { + a.So(subResp, should.Resemble, protocol.SubscribeResponse{ID: 0x42}) + } + + b, err = json.Marshal(&protocol.UnsubscribeRequest{ + ID: 0x42, + }) + if a.So(err, should.BeNil) { + a.So(b, should.Resemble, []byte(`{"type":"unsubscribe","id":66}`)) + } + var unsubReq protocol.UnsubscribeRequest + err = json.Unmarshal([]byte(`{"type":"unsubscribe","id":66}`), &unsubReq) + if a.So(err, should.BeNil) { + a.So(unsubReq, should.Resemble, protocol.UnsubscribeRequest{ID: 0x42}) + } + + b, err = json.Marshal(&protocol.UnsubscribeResponse{ + ID: 0x42, + }) + if a.So(err, should.BeNil) { + a.So(b, should.Resemble, []byte(`{"type":"unsubscribe","id":66}`)) + } + var unsubResp protocol.UnsubscribeResponse + err = json.Unmarshal([]byte(`{"type":"unsubscribe","id":66}`), &unsubResp) + if a.So(err, should.BeNil) { + a.So(unsubResp, should.Resemble, protocol.UnsubscribeResponse{ID: 0x42}) + } + + b, err = json.Marshal(&protocol.PublishResponse{ + ID: 0x42, + Event: &ttnpb.Event{ + Name: "foo", + Time: timestamppb.New(time.UnixMilli(123456789012).UTC()), + Identifiers: []*ttnpb.EntityIdentifiers{ + (&ttnpb.ApplicationIdentifiers{ApplicationId: "foo"}).GetEntityIdentifiers(), + }, + Data: test.Must(anypb.New(&ttnpb.ApplicationUp{ + Up: &ttnpb.ApplicationUp_UplinkMessage{ + UplinkMessage: &ttnpb.ApplicationUplink{}, + }, + })), + CorrelationIds: []string{"foo", "bar"}, + }, + }) + if a.So(err, should.BeNil) { + a.So( + b, + should.Resemble, + []byte(`{"type":"publish","id":66,"event":{"name":"foo","time":"1973-11-29T21:33:09.012Z","identifiers":[{"application_ids":{"application_id":"foo"}}],"data":{"@type":"type.googleapis.com/ttn.lorawan.v3.ApplicationUp","uplink_message":{}},"correlation_ids":["foo","bar"]}}`), // nolint:lll + ) + } + var pubResp protocol.PublishResponse + err = json.Unmarshal( + []byte(`{"type":"publish","id":66,"event":{"name":"foo","time":"1973-11-29T21:33:09.012Z","identifiers":[{"application_ids":{"application_id":"foo"}}],"data":{"@type":"type.googleapis.com/ttn.lorawan.v3.ApplicationUp","uplink_message":{}},"correlation_ids":["foo","bar"]}}`), // nolint:lll + &pubResp, + ) + if a.So(err, should.BeNil) { + a.So(pubResp, should.Resemble, protocol.PublishResponse{ + ID: 0x42, + Event: &ttnpb.Event{ + Name: "foo", + Time: timestamppb.New(time.UnixMilli(123456789012).UTC()), + Identifiers: []*ttnpb.EntityIdentifiers{ + (&ttnpb.ApplicationIdentifiers{ApplicationId: "foo"}).GetEntityIdentifiers(), + }, + Data: test.Must(anypb.New(&ttnpb.ApplicationUp{ + Up: &ttnpb.ApplicationUp_UplinkMessage{ + UplinkMessage: &ttnpb.ApplicationUplink{}, + }, + })), + CorrelationIds: []string{"foo", "bar"}, + }, + }) + } + + errDefinition := errors.DefineInvalidArgument("bad_argument", "bad argument `{argument}`") + errInstance := errDefinition.WithAttributes("argument", "foo") + errStatus := status.Convert(errInstance) + errJSON := test.Must(json.Marshal(errInstance)) + b, err = json.Marshal(&protocol.ErrorResponse{ + ID: 0x42, + Error: errStatus, + }) + if a.So(err, should.BeNil) { + a.So(b, should.Resemble, []byte(fmt.Sprintf(`{"type":"error","id":66,"error":%v}`, string(errJSON)))) // nolint:lll + } + var errResp protocol.ErrorResponse + err = json.Unmarshal([]byte(fmt.Sprintf(`{"type":"error","id":66,"error":%v}`, string(errJSON))), &errResp) // nolint:lll + if a.So(err, should.BeNil) { + a.So(errResp, should.Resemble, protocol.ErrorResponse{ + ID: 0x42, + Error: errStatus, + }) + } + + var reqWrapper protocol.RequestWrapper + err = json.Unmarshal( + []byte(`{"type":"subscribe","id":66,"identifiers":[{"application_ids":{"application_id":"foo"}},{"client_ids":{"client_id":"bar"}}],"tail":10,"after":"1973-11-29T21:33:09.012Z","names":["foo","bar"]}`), // nolint:lll + &reqWrapper, + ) + if a.So(err, should.BeNil) { + a.So(reqWrapper, should.Resemble, protocol.RequestWrapper{ + Contents: &protocol.SubscribeRequest{ + ID: 0x42, + Identifiers: []*ttnpb.EntityIdentifiers{ + (&ttnpb.ApplicationIdentifiers{ApplicationId: "foo"}).GetEntityIdentifiers(), + (&ttnpb.ClientIdentifiers{ClientId: "bar"}).GetEntityIdentifiers(), + }, + Tail: 10, + After: timePtr(time.UnixMilli(123456789012).UTC()), + Names: []string{"foo", "bar"}, + }, + }) + } + + var respWrapper protocol.ResponseWrapper + err = json.Unmarshal([]byte(`{"type":"subscribe","id":66}`), &respWrapper) + if a.So(err, should.BeNil) { + a.So(respWrapper, should.Resemble, protocol.ResponseWrapper{ + Contents: &protocol.SubscribeResponse{ + ID: 0x42, + }, + }) + } +} + +func timePtr(t time.Time) *time.Time { + return &t +} From fbfd38b77a5b4c5cac6877ee3acb1bf5ea0e015b Mon Sep 17 00:00:00 2001 From: Adrian-Stefan Mares Date: Wed, 25 Oct 2023 18:04:25 +0200 Subject: [PATCH 3/4] all: Remove log message --- pkg/auth/rights/auth_info.go | 6 ++---- pkg/devicerepository/grpc.go | 14 +++++++------- 2 files changed, 9 insertions(+), 11 deletions(-) diff --git a/pkg/auth/rights/auth_info.go b/pkg/auth/rights/auth_info.go index f6b59c71ab..1a7158ffd3 100644 --- a/pkg/auth/rights/auth_info.go +++ b/pkg/auth/rights/auth_info.go @@ -51,16 +51,14 @@ func AuthInfo(ctx context.Context) (authInfo *ttnpb.AuthInfoResponse, err error) var errUnauthenticated = errors.DefineUnauthenticated("unauthenticated", "unauthenticated") -// RequireAuthentication confirms if the authentication information within a context contains any rights, if so, +// RequireAuthenticated confirms if the authentication information within a context contains any rights, if so, // the request is considered to be authenticated. -func RequireAuthentication(ctx context.Context) error { - log.FromContext(ctx).Debug("Authenticate request") +func RequireAuthenticated(ctx context.Context) error { authInfo, err := AuthInfo(ctx) if err != nil { log.FromContext(ctx).WithError(err).Debug("Failed to validate authentication information") return errUnauthenticated.WithCause(err) } - if authInfo.GetAccessMethod() == nil && len(authInfo.GetUniversalRights().GetRights()) == 0 { return errUnauthenticated.New() } diff --git a/pkg/devicerepository/grpc.go b/pkg/devicerepository/grpc.go index c0d490b751..bad776016a 100644 --- a/pkg/devicerepository/grpc.go +++ b/pkg/devicerepository/grpc.go @@ -65,7 +65,7 @@ func (dr *DeviceRepository) ListBrands( ctx context.Context, req *ttnpb.ListEndDeviceBrandsRequest, ) (*ttnpb.ListEndDeviceBrandsResponse, error) { - if err := rights.RequireAuthentication(ctx); err != nil { + if err := rights.RequireAuthenticated(ctx); err != nil { return nil, err } if req.Limit > defaultLimit || req.Limit == 0 { @@ -97,7 +97,7 @@ func (dr *DeviceRepository) GetBrand( ctx context.Context, req *ttnpb.GetEndDeviceBrandRequest, ) (*ttnpb.EndDeviceBrand, error) { - if err := rights.RequireAuthentication(ctx); err != nil { + if err := rights.RequireAuthenticated(ctx); err != nil { return nil, err } response, err := dr.store.GetBrands(store.GetBrandsRequest{ @@ -121,7 +121,7 @@ func (dr *DeviceRepository) ListModels( ctx context.Context, req *ttnpb.ListEndDeviceModelsRequest, ) (*ttnpb.ListEndDeviceModelsResponse, error) { - if err := rights.RequireAuthentication(ctx); err != nil { + if err := rights.RequireAuthenticated(ctx); err != nil { return nil, err } if req.Limit > defaultLimit || req.Limit == 0 { @@ -152,7 +152,7 @@ func (dr *DeviceRepository) GetModel( ctx context.Context, req *ttnpb.GetEndDeviceModelRequest, ) (*ttnpb.EndDeviceModel, error) { - if err := rights.RequireAuthentication(ctx); err != nil { + if err := rights.RequireAuthenticated(ctx); err != nil { return nil, err } response, err := dr.store.GetModels(store.GetModelsRequest{ @@ -177,7 +177,7 @@ func (dr *DeviceRepository) GetTemplate( ctx context.Context, req *ttnpb.GetTemplateRequest, ) (*ttnpb.EndDeviceTemplate, error) { - if err := rights.RequireAuthentication(ctx); err != nil { + if err := rights.RequireAuthenticated(ctx); err != nil { return nil, err } return dr.store.GetTemplate(req, nil) @@ -189,7 +189,7 @@ func getDecoder( f func(store.GetCodecRequest) (*ttnpb.MessagePayloadDecoder, error), ) (*ttnpb.MessagePayloadDecoder, error) { if clusterauth.Authorized(ctx) != nil { - if err := rights.RequireAuthentication(ctx); err != nil { + if err := rights.RequireAuthenticated(ctx); err != nil { return nil, err } } @@ -218,7 +218,7 @@ func (dr *DeviceRepository) GetDownlinkEncoder( req *ttnpb.GetPayloadFormatterRequest, ) (*ttnpb.MessagePayloadEncoder, error) { if clusterauth.Authorized(ctx) != nil { - if err := rights.RequireAuthentication(ctx); err != nil { + if err := rights.RequireAuthenticated(ctx); err != nil { return nil, err } } From addcfe5ee9b85084edd8cbb0104881fd7c2760b0 Mon Sep 17 00:00:00 2001 From: Adrian-Stefan Mares Date: Wed, 25 Oct 2023 18:09:46 +0200 Subject: [PATCH 4/4] console: Add internal events API --- config/messages.json | 36 ++ go.mod | 1 + go.sum | 2 + pkg/console/console.go | 2 + pkg/console/internal/events/events.go | 135 ++++++++ pkg/console/internal/events/eventsmux/mux.go | 106 ++++++ .../internal/events/eventsmux/mux_test.go | 315 ++++++++++++++++++ .../internal/events/protocol/PROTOCOL.md | 191 +++++++++++ .../events/subscriptions/subscriptions.go | 255 ++++++++++++++ .../subscriptions/subscriptions_test.go | 303 +++++++++++++++++ pkg/console/internal/events/tasks.go | 76 +++++ pkg/webui/locales/ja.json | 4 + tools/go.mod | 1 + tools/go.sum | 2 + 14 files changed, 1429 insertions(+) create mode 100644 pkg/console/internal/events/events.go create mode 100644 pkg/console/internal/events/eventsmux/mux.go create mode 100644 pkg/console/internal/events/eventsmux/mux_test.go create mode 100644 pkg/console/internal/events/protocol/PROTOCOL.md create mode 100644 pkg/console/internal/events/subscriptions/subscriptions.go create mode 100644 pkg/console/internal/events/subscriptions/subscriptions_test.go create mode 100644 pkg/console/internal/events/tasks.go diff --git a/config/messages.json b/config/messages.json index 515b8cfff8..ac9f78406f 100644 --- a/config/messages.json +++ b/config/messages.json @@ -3509,6 +3509,42 @@ "file": "shared.go" } }, + "error:pkg/console/internal/events/protocol:message_type": { + "translations": { + "en": "invalid message type `{type}`" + }, + "description": { + "package": "pkg/console/internal/events/protocol", + "file": "protocol.go" + } + }, + "error:pkg/console/internal/events/subscriptions:already_subscribed": { + "translations": { + "en": "already subscribed with ID `{id}`" + }, + "description": { + "package": "pkg/console/internal/events/subscriptions", + "file": "subscriptions.go" + } + }, + "error:pkg/console/internal/events/subscriptions:no_identifiers": { + "translations": { + "en": "no identifiers" + }, + "description": { + "package": "pkg/console/internal/events/subscriptions", + "file": "subscriptions.go" + } + }, + "error:pkg/console/internal/events/subscriptions:not_subscribed": { + "translations": { + "en": "not subscribed with ID `{id}`" + }, + "description": { + "package": "pkg/console/internal/events/subscriptions", + "file": "subscriptions.go" + } + }, "error:pkg/crypto/cryptoservices:no_app_key": { "translations": { "en": "no AppKey specified" diff --git a/go.mod b/go.mod index 466728cc88..5ad3ab087a 100644 --- a/go.mod +++ b/go.mod @@ -111,6 +111,7 @@ require ( gopkg.in/mail.v2 v2.3.1 gopkg.in/square/go-jose.v2 v2.6.0 gopkg.in/yaml.v2 v2.4.0 + nhooyr.io/websocket v1.8.10 ) require ( diff --git a/go.sum b/go.sum index 54ba93982e..259f3ae6ca 100644 --- a/go.sum +++ b/go.sum @@ -1254,6 +1254,8 @@ honnef.co/go/tools v0.0.1-2020.1.3/go.mod h1:X/FiERA/W4tHapMX5mGpAtMSVEeEUOyHaw9 honnef.co/go/tools v0.0.1-2020.1.4/go.mod h1:X/FiERA/W4tHapMX5mGpAtMSVEeEUOyHaw9vFzvIQ3k= mellium.im/sasl v0.3.1 h1:wE0LW6g7U83vhvxjC1IY8DnXM+EU095yeo8XClvCdfo= mellium.im/sasl v0.3.1/go.mod h1:xm59PUYpZHhgQ9ZqoJ5QaCqzWMi8IeS49dhp6plPCzw= +nhooyr.io/websocket v1.8.10 h1:mv4p+MnGrLDcPlBoWsvPP7XCzTYMXP9F9eIGoKbgx7Q= +nhooyr.io/websocket v1.8.10/go.mod h1:rN9OFWIUwuxg4fR5tELlYC04bXYowCP9GX47ivo2l+c= rsc.io/binaryregexp v0.2.0/go.mod h1:qTv7/COck+e2FymRvadv62gMdZztPaShugOCi3I+8D8= rsc.io/quote/v3 v3.1.0/go.mod h1:yEA65RcK8LyAZtP9Kv3t0HmxON59tX3rD+tICJqUlj0= rsc.io/sampler v1.3.0/go.mod h1:T1hPZKmBbMNahiBKFy5HrXp6adAjACjK9JXDnKaTXpA= diff --git a/pkg/console/console.go b/pkg/console/console.go index d012123d35..d1d26e654e 100644 --- a/pkg/console/console.go +++ b/pkg/console/console.go @@ -23,6 +23,7 @@ import ( "github.com/gorilla/csrf" "github.com/gorilla/mux" "go.thethings.network/lorawan-stack/v3/pkg/component" + "go.thethings.network/lorawan-stack/v3/pkg/console/internal/events" "go.thethings.network/lorawan-stack/v3/pkg/web" "go.thethings.network/lorawan-stack/v3/pkg/web/oauthclient" "go.thethings.network/lorawan-stack/v3/pkg/webhandlers" @@ -58,6 +59,7 @@ func New(c *component.Component, config Config) (*Console, error) { } c.RegisterWeb(console) + c.RegisterWeb(events.New(c)) return console, nil } diff --git a/pkg/console/internal/events/events.go b/pkg/console/internal/events/events.go new file mode 100644 index 0000000000..caae64a0a4 --- /dev/null +++ b/pkg/console/internal/events/events.go @@ -0,0 +1,135 @@ +// Copyright © 2023 The Things Network Foundation, The Things Industries B.V. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// Package events contains the internal events APi for the Console. +package events + +import ( + "context" + "net/http" + "sync" + + "github.com/gorilla/mux" + "go.thethings.network/lorawan-stack/v3/pkg/auth/rights" + "go.thethings.network/lorawan-stack/v3/pkg/config" + "go.thethings.network/lorawan-stack/v3/pkg/console/internal/events/eventsmux" + "go.thethings.network/lorawan-stack/v3/pkg/console/internal/events/subscriptions" + "go.thethings.network/lorawan-stack/v3/pkg/events" + "go.thethings.network/lorawan-stack/v3/pkg/log" + "go.thethings.network/lorawan-stack/v3/pkg/ratelimit" + "go.thethings.network/lorawan-stack/v3/pkg/task" + "go.thethings.network/lorawan-stack/v3/pkg/ttnpb" + "go.thethings.network/lorawan-stack/v3/pkg/web" + "go.thethings.network/lorawan-stack/v3/pkg/webhandlers" + "go.thethings.network/lorawan-stack/v3/pkg/webmiddleware" + "nhooyr.io/websocket" +) + +// Component is the interface of the component to the events API handler. +type Component interface { + task.Starter + Context() context.Context + RateLimiter() ratelimit.Interface + GetBaseConfig(context.Context) config.ServiceBase +} + +type eventsHandler struct { + component Component + subscriber events.Subscriber + definedNames map[string]struct{} +} + +var _ web.Registerer = (*eventsHandler)(nil) + +func (h *eventsHandler) RegisterRoutes(server *web.Server) { + router := server.APIRouter().PathPrefix(ttnpb.HTTPAPIPrefix + "/console/internal/events/").Subrouter() + router.Use( + mux.MiddlewareFunc(webmiddleware.Namespace("console/internal/events")), + ratelimit.HTTPMiddleware(h.component.RateLimiter(), "http:console:internal:events"), + mux.MiddlewareFunc(webmiddleware.Metadata("Authorization")), + ) + router.Path("/").HandlerFunc(h.handleEvents).Methods(http.MethodGet) +} + +func (h *eventsHandler) handleEvents(w http.ResponseWriter, r *http.Request) { + ctx := r.Context() + logger := log.FromContext(ctx) + + if err := rights.RequireAuthenticated(ctx); err != nil { + webhandlers.Error(w, r, err) + return + } + + conn, err := websocket.Accept(w, r, &websocket.AcceptOptions{ + InsecureSkipVerify: true, // CORS is not enabled for APIs. + CompressionMode: websocket.CompressionContextTakeover, + }) + if err != nil { + logger.WithError(err).Debug("Failed to accept WebSocket") + return + } + defer conn.Close(websocket.StatusNormalClosure, "main task closed") + + ctx, cancel := context.WithCancelCause(ctx) + defer cancel(nil) + + var wg sync.WaitGroup + defer wg.Wait() + + m := eventsmux.New(func(ctx context.Context, cancel func(error)) subscriptions.Interface { + return subscriptions.New(ctx, cancel, h.subscriber, h.definedNames, h.component) + }) + for name, f := range map[string]func(context.Context) error{ + "console_events_mux": makeMuxTask(m, cancel), + "console_events_read": makeReadTask(conn, m, cancel), + "console_events_write": makeWriteTask(conn, m, cancel), + } { + wg.Add(1) + h.component.StartTask(&task.Config{ + Context: ctx, + ID: name, + Func: f, + Done: wg.Done, + Restart: task.RestartNever, + Backoff: task.DefaultBackoffConfig, + }) + } +} + +// Option configures the events API handler. +type Option func(*eventsHandler) + +// WithSubscriber configures the Subscriber to use for events. +func WithSubscriber(subscriber events.Subscriber) Option { + return func(h *eventsHandler) { + h.subscriber = subscriber + } +} + +// New returns an events API handler for the Console. +func New(c Component, opts ...Option) web.Registerer { + definedNames := make(map[string]struct{}) + for _, def := range events.All().Definitions() { + definedNames[def.Name()] = struct{}{} + } + h := &eventsHandler{ + component: c, + subscriber: events.DefaultPubSub(), + definedNames: definedNames, + } + for _, opt := range opts { + opt(h) + } + return h +} diff --git a/pkg/console/internal/events/eventsmux/mux.go b/pkg/console/internal/events/eventsmux/mux.go new file mode 100644 index 0000000000..e0874f9c51 --- /dev/null +++ b/pkg/console/internal/events/eventsmux/mux.go @@ -0,0 +1,106 @@ +// Copyright © 2023 The Things Network Foundation, The Things Industries B.V. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// Package eventsmux implements the events mux. +package eventsmux + +import ( + "context" + + "go.thethings.network/lorawan-stack/v3/pkg/console/internal/events/protocol" + "go.thethings.network/lorawan-stack/v3/pkg/console/internal/events/subscriptions" + "go.thethings.network/lorawan-stack/v3/pkg/events" + "go.thethings.network/lorawan-stack/v3/pkg/log" +) + +// Interface is the interface for the events mux. +type Interface interface { + // Requests returns the channel for requests. + Requests() chan<- protocol.Request + // Responses returns the channel for responses. + Responses() <-chan protocol.Response + + // Run runs the events mux. + Run(context.Context) error +} + +type mux struct { + createSubs func(context.Context, func(error)) subscriptions.Interface + + requestCh chan protocol.Request + responseCh chan protocol.Response +} + +// Requests implements Interface. +func (m *mux) Requests() chan<- protocol.Request { + return m.requestCh +} + +// Responses implements Interface. +func (m *mux) Responses() <-chan protocol.Response { + return m.responseCh +} + +// Run implements Interface. +func (m *mux) Run(ctx context.Context) (err error) { + ctx, cancel := context.WithCancelCause(ctx) + defer func() { cancel(err) }() + subs := m.createSubs(ctx, cancel) + defer subs.Close() + for { + select { + case <-ctx.Done(): + return ctx.Err() + case req := <-m.requestCh: + var resp protocol.Response + switch req := req.(type) { + case *protocol.SubscribeRequest: + resp = req.Response(subs.Subscribe(req.ID, req.Identifiers, req.After, req.Tail, req.Names)) + case *protocol.UnsubscribeRequest: + resp = req.Response(subs.Unsubscribe(req.ID)) + default: + panic("unreachable") + } + select { + case <-ctx.Done(): + return ctx.Err() + case m.responseCh <- resp: + } + case subEvt := <-subs.SubscriptionEvents(): + evtPB, err := events.Proto(subEvt.Event) + if err != nil { + log.FromContext(ctx).WithError(err).Warn("Failed to convert event to proto") + continue + } + select { + case <-ctx.Done(): + return ctx.Err() + case m.responseCh <- &protocol.PublishResponse{ + ID: subEvt.ID, + Event: evtPB, + }: + } + } + } +} + +// New returns a new Interface. +func New(createSubs func(context.Context, func(error)) subscriptions.Interface) Interface { + return &mux{ + createSubs: createSubs, + + requestCh: make(chan protocol.Request, 1), + responseCh: make(chan protocol.Response, 1), + } +} diff --git a/pkg/console/internal/events/eventsmux/mux_test.go b/pkg/console/internal/events/eventsmux/mux_test.go new file mode 100644 index 0000000000..de220f52fa --- /dev/null +++ b/pkg/console/internal/events/eventsmux/mux_test.go @@ -0,0 +1,315 @@ +// Copyright © 2023 The Things Network Foundation, The Things Industries B.V. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package eventsmux_test + +import ( + "context" + "errors" + "testing" + "time" + + "go.thethings.network/lorawan-stack/v3/pkg/auth/rights" + "go.thethings.network/lorawan-stack/v3/pkg/console/internal/events/eventsmux" + "go.thethings.network/lorawan-stack/v3/pkg/console/internal/events/protocol" + "go.thethings.network/lorawan-stack/v3/pkg/console/internal/events/subscriptions" + "go.thethings.network/lorawan-stack/v3/pkg/events" + "go.thethings.network/lorawan-stack/v3/pkg/ttnpb" + "go.thethings.network/lorawan-stack/v3/pkg/unique" + "go.thethings.network/lorawan-stack/v3/pkg/util/test" + "go.thethings.network/lorawan-stack/v3/pkg/util/test/assertions/should" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" +) + +type subscribeRequest struct { + ID uint64 + Identifiers []*ttnpb.EntityIdentifiers + After *time.Time + Tail uint32 + Names []string + + Response chan<- error +} + +type unsubscribeRequest struct { + ID uint64 + + Response chan<- error +} + +type mockSubscriptions struct { + ctx context.Context + subReqs chan subscribeRequest + unsubReqs chan unsubscribeRequest + evsCh chan *subscriptions.SubscriptionEvent +} + +// Subscribe implements subscriptions.Interface. +func (m *mockSubscriptions) Subscribe( + id uint64, identifiers []*ttnpb.EntityIdentifiers, after *time.Time, tail uint32, names []string, +) error { + ch := make(chan error, 1) + select { + case <-m.ctx.Done(): + return m.ctx.Err() + case m.subReqs <- subscribeRequest{ + ID: id, + Identifiers: identifiers, + After: after, + Tail: tail, + Names: names, + + Response: ch, + }: + select { + case <-m.ctx.Done(): + return m.ctx.Err() + case err := <-ch: + return err + } + } +} + +// Unsubscribe implements subscriptions.Interface. +func (m *mockSubscriptions) Unsubscribe(id uint64) error { + ch := make(chan error, 1) + select { + case <-m.ctx.Done(): + return m.ctx.Err() + case m.unsubReqs <- unsubscribeRequest{ + ID: id, + + Response: ch, + }: + select { + case <-m.ctx.Done(): + return m.ctx.Err() + case err := <-ch: + return err + } + } +} + +// SubscriptionEvents implements subscriptions.Interface. +func (m *mockSubscriptions) SubscriptionEvents() <-chan *subscriptions.SubscriptionEvent { + return m.evsCh +} + +// Close implements subscriptions.Interface. +func (*mockSubscriptions) Close() error { return nil } + +var _ subscriptions.Interface = (*mockSubscriptions)(nil) + +func TestMux(t *testing.T) { // nolint:gocyclo + t.Parallel() + + a, ctx := test.New(t) + + appIDs := &ttnpb.ApplicationIdentifiers{ + ApplicationId: "foo", + } + ctx = rights.NewContext(ctx, &rights.Rights{ + ApplicationRights: *rights.NewMap(map[string]*ttnpb.Rights{ + unique.ID(ctx, appIDs): ttnpb.RightsFrom(ttnpb.Right_RIGHT_ALL), + }), + }) + + subs := &mockSubscriptions{ + ctx: ctx, + subReqs: make(chan subscribeRequest, 1), + unsubReqs: make(chan unsubscribeRequest, 1), + evsCh: make(chan *subscriptions.SubscriptionEvent, 1), + } + m := eventsmux.New(func(context.Context, func(error)) subscriptions.Interface { return subs }) + + go m.Run(ctx) // nolint:errcheck + + now := time.Now() + select { + case <-ctx.Done(): + return + case m.Requests() <- &protocol.SubscribeRequest{ + ID: 42, + Identifiers: []*ttnpb.EntityIdentifiers{ + appIDs.GetEntityIdentifiers(), + }, + After: &now, + Tail: 1, + Names: []string{"foo"}, + }: + } + select { + case <-ctx.Done(): + return + case req := <-subs.subReqs: + a.So(req, should.Resemble, subscribeRequest{ + ID: 42, + Identifiers: []*ttnpb.EntityIdentifiers{ + appIDs.GetEntityIdentifiers(), + }, + After: &now, + Tail: 1, + Names: []string{"foo"}, + + Response: req.Response, + }) + select { + case <-ctx.Done(): + return + case req.Response <- nil: + } + } + select { + case <-ctx.Done(): + return + case resp := <-m.Responses(): + a.So(resp, should.Resemble, &protocol.SubscribeResponse{ + ID: 42, + }) + } + + errAlreadySubscribed := errors.New("already subscribed") + select { + case <-ctx.Done(): + return + case m.Requests() <- &protocol.SubscribeRequest{ + ID: 42, + Identifiers: []*ttnpb.EntityIdentifiers{ + appIDs.GetEntityIdentifiers(), + }, + After: &now, + Tail: 1, + Names: []string{"foo"}, + }: + } + select { + case <-ctx.Done(): + return + case req := <-subs.subReqs: + a.So(req, should.Resemble, subscribeRequest{ + ID: 42, + Identifiers: []*ttnpb.EntityIdentifiers{ + appIDs.GetEntityIdentifiers(), + }, + After: &now, + Tail: 1, + Names: []string{"foo"}, + + Response: req.Response, + }) + select { + case <-ctx.Done(): + return + case req.Response <- errAlreadySubscribed: + } + } + select { + case <-ctx.Done(): + return + case resp := <-m.Responses(): + a.So(resp, should.Resemble, &protocol.ErrorResponse{ + ID: 42, + Error: status.New(codes.Unknown, "already subscribed"), + }) + } + + ev := events.New( + ctx, + "test.evt", + "test event", + events.WithIdentifiers(appIDs), + ) + select { + case <-ctx.Done(): + return + case subs.evsCh <- &subscriptions.SubscriptionEvent{ + ID: 42, + Event: ev, + }: + } + select { + case <-ctx.Done(): + return + case resp := <-m.Responses(): + a.So(resp, should.Resemble, &protocol.PublishResponse{ + ID: 42, + Event: test.Must(events.Proto(ev)), + }) + } + + select { + case <-ctx.Done(): + return + case m.Requests() <- &protocol.UnsubscribeRequest{ + ID: 42, + }: + } + select { + case <-ctx.Done(): + return + case req := <-subs.unsubReqs: + a.So(req, should.Resemble, unsubscribeRequest{ + ID: 42, + + Response: req.Response, + }) + select { + case <-ctx.Done(): + return + case req.Response <- nil: + } + } + select { + case <-ctx.Done(): + return + case resp := <-m.Responses(): + a.So(resp, should.Resemble, &protocol.UnsubscribeResponse{ + ID: 42, + }) + } + + errNotSubscribed := errors.New("not subscribed") + select { + case <-ctx.Done(): + return + case m.Requests() <- &protocol.UnsubscribeRequest{ + ID: 42, + }: + } + select { + case <-ctx.Done(): + return + case req := <-subs.unsubReqs: + a.So(req, should.Resemble, unsubscribeRequest{ + ID: 42, + + Response: req.Response, + }) + select { + case <-ctx.Done(): + return + case req.Response <- errNotSubscribed: + } + } + select { + case <-ctx.Done(): + return + case resp := <-m.Responses(): + a.So(resp, should.Resemble, &protocol.ErrorResponse{ + ID: 42, + Error: status.New(codes.Unknown, "not subscribed"), + }) + } +} diff --git a/pkg/console/internal/events/protocol/PROTOCOL.md b/pkg/console/internal/events/protocol/PROTOCOL.md new file mode 100644 index 0000000000..168dc5980c --- /dev/null +++ b/pkg/console/internal/events/protocol/PROTOCOL.md @@ -0,0 +1,191 @@ +### Internal Events API + +The Console internal events API is designed as an alternative to the `Events.Stream` gRPC API for event stream interactions. It allows multiple subscriptions to be multiplexed over a singular [WebSocket](https://en.wikipedia.org/wiki/WebSocket) connection. + +### Reasoning + +The `Events.Stream` gRPC API is available to HTTP clients via [grpc-gateway](https://github.com/grpc-ecosystem/grpc-gateway). While translated to HTTP, it is visible as a long-polling request whose response body will contain the events as a series of JSON objects. + +This approach is efficient in the context of [HTTP/2](https://en.wikipedia.org/wiki/HTTP/2) which supports multiplexing multiple requests over a singular TCP connection. + +Unfortunately the connection between a browser and The Things Stack is susceptible to proxies. Corporate environments are generally equipped with such proxies, and in their presence the connections are downgraded to HTTP/1.1 semantics. + +In HTTP/1.1 connections can be used for a singular request at a time - it is not possible to multiplex the requests over a singular connection, and only [keep-alive](https://en.wikipedia.org/wiki/HTTP_persistent_connection) connections are available. + +This is problematic as browsers have builtin limits for the number of concurrent connections that singular windows may use. This leads to hard to debug issues which are hardly reproducible. + +But, there is one silver lining - the connection limit _does not apply to WebSocket connections_. The internal events API is designed to deal with this limitation while providing an experience similar to the original `Events.Stream` gRPC API. + +### Endpoint + +The endpoint for the internal events API is `/api/v3/console/internal/events/`. Note that the trailing slash is not optional. + +### Semantics + +The protocol is [full-duplex](https://en.wikipedia.org/wiki/Duplex_(telecommunications)#Full_duplex) - the client side and server side may transmit messages at any time without waiting for a response from the other party. + +The protocol is centered around subscriptions. Subscriptions are identified by an unsigned numerical ID, which is selected by the client. + +A subscription is initiated by the client via a subscription request, which the server confirms either with a subscription response or an error response. + +Following a successful subscription, the server may send at any time publication responses containing the subscription identifier and an event. The subscription identifier can be used on the client side in order to route the event to the appropriate component or view. + +A subscription can be terminated via an unsubscribe request, which the server confirms either with an unsubscribe response or an error response. + +The client can expect that no publication responses will follow an unsubscribe response, but it is recommended that subscription identifiers are not recycled within the same session. + +Error responses can be expected when the request contents are invalid (lack of identifiers, or invalid identifiers), or the caller is not authorized to subscribe to the provided identifiers. It is also invalid to request a subscription with the same identifier as an existing subscription, or to unsubscribe using an identifier which is not subscribed. + +Error response are provided as a debugging facility, and the errors are generally not fixable by the Console user. + +A special case exists for situations in which the caller is no longer authorized to receive any events associated with the provided identifiers _after_ the subscription response has been sent. This can happen if the caller token has expired or the rights have been revoked while the stream is ongoing. In such situations the server will terminate the connection explicitly. + +### Authentication and Authorization + +The authentication for the internal API is similar to other APIs available in The Things Stack. Given a `Bearer` token `t`, the `Authorization` header should contain the value `Bearer t`. + +Upon connecting, no authorization will take place - the endpoint only will check that the provided token is valid (i.e. exists and it is not expired). + +### Message Format + +Both requests and responses sent over the WebSocket connection are JSON encoded. All messages are JSON objects and are required to contain at least the following two fields: + +- `type`: a string whose value must be either `subscribe`, `unsubscribe`, `publish` or `error`. +- `id`: an unsigned integer which identifies the underlying subscription being served. + +Each of the following subsections describes an individual message and the message direction (client to server or server to client). + +#### `SubscribeRequest` [C -> S] + +- `type`: `subscribe` +- `id`: the subscription identifier +- `identifiers`, `tail`, `after`, `names`: semantically the same fields as those of the `StreamEventsRequest` Protobuf message. + +Example: + +```json +{ + "type": "subscribe", + "id": 1, + "tail": 10, + "identifiers": [ + { + "application_ids": { + "application_id": "app1" + } + } + ] +} +``` + +#### `SubscribeResponse` [S -> C] + +- `type`: `subscribe` +- `id`: the subscription identifier + +Example: + +```json +{ + "type": "subscribe", + "id": 1 +} +``` + +#### `UnsubscribeRequest` [C -> S] + +- `type`: `unsubscribe` +- `id`: the subscription identifier + +Example: + +```json +{ + "type": "unsubscribe", + "id": 1 +} +``` + +#### `UnsubscribeResponse` [S -> C] + +- `type`: `unsubscribe` +- `id`: the subscription identifier + +Example: + +```json +{ + "type": "unsubscribe", + "id": 1 +} +``` + +#### `PublishResponse` [S -> C] + +- `type`: `publish` +- `id`: the subscription identifier +- `event`: an `Event` Protobuf message encoded as a JSON object + +Example: + +```json +{ + "type": "publish", + "id": 1, + "event": { + "name": "as.up.data.forward", + "time": "2023-10-26T16:27:14.103854Z", + "identifiers": [ + { + "device_ids": { + "device_id": "eui-0000000000000003", + "application_ids": { + "application_id": "app1" + } + } + } + ], + "context": { + "tenant-id": "Cgl0aGV0aGluZ3M=" + }, + "visibility": { + "rights": [ + "RIGHT_APPLICATION_TRAFFIC_READ" + ] + }, + "unique_id": "01HDPCZDSQ358JMHD4SC2BQAB8" + } +} +``` + +#### ErrorResponse [S -> C] + +- `type`: `error` +- `id`: the subscription identifier +- `error`: a `Status` Protobuf message encoded as a JSON object + +Example: + +```json +{ + "type": "error", + "id": 1, + "error": { + "code": 6, + "message": "error:pkg/console/internal/events/subscriptions:already_subscribed (already subscribed with ID `1`)", + "details": [ + { + "@type": "type.googleapis.com/ttn.lorawan.v3.ErrorDetails", + "namespace": "pkg/console/internal/events/subscriptions", + "name": "already_subscribed", + "message_format": "already subscribed with ID `{id}`", + "attributes": { + "id": "1" + }, + "correlation_id": "5da004b9f61f479aafe5bbcae4551e63", + "code": 6 + } + ] + } +} +``` diff --git a/pkg/console/internal/events/subscriptions/subscriptions.go b/pkg/console/internal/events/subscriptions/subscriptions.go new file mode 100644 index 0000000000..0a099fffdd --- /dev/null +++ b/pkg/console/internal/events/subscriptions/subscriptions.go @@ -0,0 +1,255 @@ +// Copyright © 2023 The Things Network Foundation, The Things Industries B.V. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// Package subscriptions implements the events mux subscriptions. +package subscriptions + +import ( + "context" + "sync" + "time" + + "go.thethings.network/lorawan-stack/v3/pkg/auth/rights" + "go.thethings.network/lorawan-stack/v3/pkg/auth/rights/rightsutil" + "go.thethings.network/lorawan-stack/v3/pkg/errors" + "go.thethings.network/lorawan-stack/v3/pkg/events" + "go.thethings.network/lorawan-stack/v3/pkg/log" + "go.thethings.network/lorawan-stack/v3/pkg/task" + "go.thethings.network/lorawan-stack/v3/pkg/ttnpb" +) + +// SubscriptionEvent wraps an events.Event with a subscription ID. +type SubscriptionEvent struct { + ID uint64 + Event events.Event +} + +// Interface is the interface for the events mux subscriptions. +type Interface interface { + // Subscribe subscribes to events. + Subscribe( + id uint64, identifiers []*ttnpb.EntityIdentifiers, after *time.Time, tail uint32, names []string, + ) error + // Unsubscribe unsubscribe to events. + Unsubscribe(id uint64) error + + // SubscriptionEvents provides the events for the underlying subscriptions. + SubscriptionEvents() <-chan *SubscriptionEvent + + // Close closes all of the underlying subscriptions and waits for the background tasks to finish. + Close() error +} + +type subscription struct { + id uint64 + cancel func(error) + wg sync.WaitGroup + cancelParent func(error) + inputCh <-chan events.Event + outputCh chan<- *SubscriptionEvent +} + +func (s *subscription) run(ctx context.Context) (err error) { + defer func() { + select { + case <-ctx.Done(): + default: + s.cancelParent(err) + } + }() + for { + select { + case <-ctx.Done(): + return ctx.Err() + case evt := <-s.inputCh: + isVisible, err := rightsutil.EventIsVisible(ctx, evt) + if err != nil { + if err := rights.RequireAny(ctx, evt.Identifiers()...); err != nil { + return err + } + log.FromContext(ctx).WithError(err).Warn("Failed to check event visibility") + continue + } + if !isVisible { + continue + } + select { + case <-ctx.Done(): + return ctx.Err() + case s.outputCh <- &SubscriptionEvent{ + ID: s.id, + Event: evt, + }: + } + } + } +} + +type subscriptions struct { + ctx context.Context + cancel func(error) + subscriber events.Subscriber + definedNames map[string]struct{} + taskStarter task.Starter + + wg sync.WaitGroup + ch chan *SubscriptionEvent + subs map[uint64]*subscription +} + +var _ Interface = (*subscriptions)(nil) + +// Close implements Interface. +func (s *subscriptions) Close() error { + for id, sub := range s.subs { + delete(s.subs, id) + sub.cancel(nil) + sub.wg.Wait() + } + s.wg.Wait() + return nil +} + +// SubscriptionEvents implements Interface. +func (s *subscriptions) SubscriptionEvents() <-chan *SubscriptionEvent { return s.ch } + +var ( + errAlreadySubscribed = errors.DefineAlreadyExists("already_subscribed", "already subscribed with ID `{id}`") + errNoIdentifiers = errors.DefineInvalidArgument("no_identifiers", "no identifiers") +) + +// Subscribe implements Interface. +func (s *subscriptions) Subscribe( + id uint64, identifiers []*ttnpb.EntityIdentifiers, after *time.Time, tail uint32, names []string, +) (err error) { + if err := s.validateSubscribe(id, identifiers); err != nil { + return err + } + names, err = events.NamesFromPatterns(s.definedNames, names) + if err != nil { + return err + } + ch := make(chan events.Event, channelSize(tail)) + ctx, cancel := context.WithCancelCause(s.ctx) + defer func() { + if err != nil { + cancel(err) + } + }() + if store, hasStore := s.subscriber.(events.Store); hasStore { + if after == nil && tail == 0 { + now := time.Now() + after = &now + } + f := func(ctx context.Context) (err error) { + defer func() { + select { + case <-ctx.Done(): + default: + s.cancel(err) + } + }() + return store.SubscribeWithHistory(ctx, names, identifiers, after, int(tail), events.Channel(ch)) + } + s.wg.Add(1) + s.taskStarter.StartTask(&task.Config{ + Context: ctx, + ID: "console_events_subscribe", + Func: f, + Done: s.wg.Done, + Restart: task.RestartNever, + Backoff: task.DefaultBackoffConfig, + }) + } else { + if err := s.subscriber.Subscribe(ctx, names, identifiers, events.Channel(ch)); err != nil { + return err + } + } + sub := &subscription{ + id: id, + cancel: cancel, + cancelParent: s.cancel, + inputCh: ch, + outputCh: s.ch, + } + sub.wg.Add(1) + s.taskStarter.StartTask(&task.Config{ + Context: ctx, + ID: "console_events_filter", + Func: sub.run, + Done: sub.wg.Done, + Restart: task.RestartNever, + Backoff: task.DefaultBackoffConfig, + }) + s.subs[id] = sub + return nil +} + +var errNotSubscribed = errors.DefineNotFound("not_subscribed", "not subscribed with ID `{id}`") + +// Unsubscribe implements Interface. +func (s *subscriptions) Unsubscribe(id uint64) error { + sub, ok := s.subs[id] + if !ok { + return errNotSubscribed.WithAttributes("id", id) + } + delete(s.subs, id) + sub.cancel(nil) + sub.wg.Wait() + return nil +} + +// New returns a new Interface. +func New( + ctx context.Context, + cancel func(error), + subscriber events.Subscriber, + definedNames map[string]struct{}, + taskStarter task.Starter, +) Interface { + return &subscriptions{ + ctx: ctx, + cancel: cancel, + subscriber: subscriber, + definedNames: definedNames, + taskStarter: taskStarter, + ch: make(chan *SubscriptionEvent, 1), + subs: make(map[uint64]*subscription), + } +} + +func (s *subscriptions) validateSubscribe(id uint64, identifiers []*ttnpb.EntityIdentifiers) error { + if _, ok := s.subs[id]; ok { + return errAlreadySubscribed.WithAttributes("id", id) + } + if len(identifiers) == 0 { + return errNoIdentifiers.New() + } + for _, ids := range identifiers { + if err := ids.ValidateFields(); err != nil { + return err + } + } + return rights.RequireAny(s.ctx, identifiers...) +} + +func channelSize(n uint32) uint32 { + if n < 8 { + n = 8 + } + if n > 1024 { + n = 1024 + } + return n +} diff --git a/pkg/console/internal/events/subscriptions/subscriptions_test.go b/pkg/console/internal/events/subscriptions/subscriptions_test.go new file mode 100644 index 0000000000..10aaf738bb --- /dev/null +++ b/pkg/console/internal/events/subscriptions/subscriptions_test.go @@ -0,0 +1,303 @@ +// Copyright © 2023 The Things Network Foundation, The Things Industries B.V. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package subscriptions_test + +import ( + "context" + "sync" + "testing" + "time" + + "go.thethings.network/lorawan-stack/v3/pkg/auth/rights" + "go.thethings.network/lorawan-stack/v3/pkg/console/internal/events/subscriptions" + "go.thethings.network/lorawan-stack/v3/pkg/events" + "go.thethings.network/lorawan-stack/v3/pkg/task" + "go.thethings.network/lorawan-stack/v3/pkg/ttnpb" + "go.thethings.network/lorawan-stack/v3/pkg/unique" + "go.thethings.network/lorawan-stack/v3/pkg/util/test" + "go.thethings.network/lorawan-stack/v3/pkg/util/test/assertions/should" +) + +type subscribeRequest struct { + Context context.Context + Names []string + Identifiers []*ttnpb.EntityIdentifiers + After *time.Time + Tail int + Handler events.Handler + + Response chan<- error +} + +type mockSubscriber struct { + subReqs chan subscribeRequest +} + +func (m *mockSubscriber) subscribeRequests() <-chan subscribeRequest { return m.subReqs } + +// Subscribe implements events.Subscriber. +func (m *mockSubscriber) Subscribe( + ctx context.Context, names []string, identifiers []*ttnpb.EntityIdentifiers, hdl events.Handler, +) error { + ch := make(chan error, 1) + select { + case <-ctx.Done(): + return ctx.Err() + case m.subReqs <- subscribeRequest{ + Context: ctx, + Names: names, + Identifiers: identifiers, + Handler: hdl, + + Response: ch, + }: + select { + case <-ctx.Done(): + return ctx.Err() + case err := <-ch: + return err + } + } +} + +var _ events.Subscriber = (*mockSubscriber)(nil) + +type mockPubSubStore struct { + subReqs chan subscribeRequest +} + +func (m *mockPubSubStore) subscribeRequests() <-chan subscribeRequest { return m.subReqs } + +func (*mockPubSubStore) historical() {} + +// Publish implements events.Store. +func (*mockPubSubStore) Publish(...events.Event) { panic("not implemented") } + +// Subscribe implements events.Store. +func (*mockPubSubStore) Subscribe(context.Context, []string, []*ttnpb.EntityIdentifiers, events.Handler) error { + panic("not implemented") +} + +// FindRelated implements events.Store. +func (*mockPubSubStore) FindRelated(context.Context, string) ([]events.Event, error) { + panic("not implemented") +} + +// FetchHistory implements events.Store. +func (*mockPubSubStore) FetchHistory( + context.Context, []string, []*ttnpb.EntityIdentifiers, *time.Time, int, +) ([]events.Event, error) { + panic("not implemented") +} + +// SubscribeWithHistory implements events.Store. +func (m *mockPubSubStore) SubscribeWithHistory( + ctx context.Context, names []string, ids []*ttnpb.EntityIdentifiers, after *time.Time, tail int, hdl events.Handler, +) error { + ch := make(chan error, 1) + select { + case <-ctx.Done(): + return ctx.Err() + case m.subReqs <- subscribeRequest{ + Context: ctx, + Names: names, + Identifiers: ids, + After: after, + Tail: tail, + Handler: hdl, + + Response: ch, + }: + select { + case <-ctx.Done(): + return ctx.Err() + case err := <-ch: + return err + } + } +} + +var _ events.Store = (*mockPubSubStore)(nil) + +func runTestSubscriptions( + t *testing.T, + subscriber interface { + events.Subscriber + subscribeRequests() <-chan subscribeRequest + }, +) { + t.Helper() + + _, historical := subscriber.(interface{ historical() }) + + a, ctx := test.New(t) + ctx, cancel := context.WithCancelCause(ctx) + defer cancel(nil) + + timeout := test.Delay << 3 + app1IDs, app2IDs := &ttnpb.ApplicationIdentifiers{ + ApplicationId: "foo", + }, &ttnpb.ApplicationIdentifiers{ + ApplicationId: "bar", + } + ctx = rights.NewContext(ctx, &rights.Rights{ + ApplicationRights: *rights.NewMap(map[string]*ttnpb.Rights{ + unique.ID(ctx, app1IDs): ttnpb.RightsFrom(ttnpb.Right_RIGHT_APPLICATION_ALL), + }), + }) + + sub := subscriptions.New( + ctx, + cancel, + subscriber, + map[string]struct{}{ + "test": {}, + }, + task.StartTaskFunc(task.DefaultStartTask), + ) + defer sub.Close() + + select { + case <-ctx.Done(): + return + case <-time.After(timeout): + case req := <-subscriber.subscribeRequests(): + t.Fatal("Unexpected subscribe request", req) + } + + now := time.Now() + + var wg sync.WaitGroup + defer wg.Wait() + + wg.Add(1) + go func() { + defer wg.Done() + err := sub.Subscribe( + 1, + []*ttnpb.EntityIdentifiers{ + app1IDs.GetEntityIdentifiers(), + }, + &now, + 10, + []string{"test"}, + ) + a.So(err, should.BeNil) + }() + var handler events.Handler + select { + case <-ctx.Done(): + return + case req := <-subscriber.subscribeRequests(): + a.So(req.Context, should.HaveParentContextOrEqual, ctx) + a.So(req.Names, should.Resemble, []string{"test"}) + a.So(req.Identifiers, should.Resemble, []*ttnpb.EntityIdentifiers{ + app1IDs.GetEntityIdentifiers(), + }) + if historical { + a.So(req.After, should.Resemble, &now) + a.So(req.Tail, should.Equal, 10) + } + a.So(req.Handler, should.NotBeNil) + if !historical { + select { + case <-ctx.Done(): + return + case req.Response <- nil: + } + } + handler = req.Handler + } + wg.Wait() + + err := sub.Subscribe( + 1, + []*ttnpb.EntityIdentifiers{ + app1IDs.GetEntityIdentifiers(), + }, + &now, + 10, + []string{"test"}, + ) + a.So(err, should.NotBeNil) + + evt := events.New( + ctx, + "test", + "test", + events.WithIdentifiers(app2IDs), + events.WithVisibility(ttnpb.Right_RIGHT_APPLICATION_DEVICES_READ), + ) + handler.Notify(evt) + + select { + case <-ctx.Done(): + return + case <-time.After(timeout): + case subEvt := <-sub.SubscriptionEvents(): + t.Fatal("Unexpected subscription event", subEvt) + } + + evt = events.New( + ctx, + "test", + "test", + events.WithIdentifiers(app1IDs), + events.WithVisibility(ttnpb.Right_RIGHT_APPLICATION_DEVICES_READ), + ) + handler.Notify(evt) + + select { + case <-ctx.Done(): + return + case subEvt := <-sub.SubscriptionEvents(): + a.So(subEvt.ID, should.Equal, 1) + a.So(subEvt.Event, should.ResembleEvent, evt) + } + + err = sub.Unsubscribe(1) + a.So(err, should.BeNil) + + err = sub.Unsubscribe(1) + a.So(err, should.NotBeNil) + + select { + case <-ctx.Done(): + return + case <-time.After(timeout): + case subEvt := <-sub.SubscriptionEvents(): + t.Fatal("Unexpected subscription event", subEvt) + } +} + +func TestSubscriptions(t *testing.T) { + t.Parallel() + runTestSubscriptions( + t, + &mockSubscriber{ + subReqs: make(chan subscribeRequest, 1), + }, + ) +} + +func TestStoreSubscriptions(t *testing.T) { + t.Parallel() + runTestSubscriptions( + t, + &mockPubSubStore{ + subReqs: make(chan subscribeRequest, 1), + }, + ) +} diff --git a/pkg/console/internal/events/tasks.go b/pkg/console/internal/events/tasks.go new file mode 100644 index 0000000000..1130b7297b --- /dev/null +++ b/pkg/console/internal/events/tasks.go @@ -0,0 +1,76 @@ +// Copyright © 2023 The Things Network Foundation, The Things Industries B.V. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package events + +import ( + "context" + "errors" + "io" + + "go.thethings.network/lorawan-stack/v3/pkg/console/internal/events/eventsmux" + "go.thethings.network/lorawan-stack/v3/pkg/console/internal/events/protocol" + "go.thethings.network/lorawan-stack/v3/pkg/log" + "nhooyr.io/websocket" + "nhooyr.io/websocket/wsjson" +) + +func makeMuxTask(m eventsmux.Interface, cancel func(error)) func(context.Context) error { + return func(ctx context.Context) (err error) { + defer func() { cancel(err) }() + return m.Run(ctx) + } +} + +func makeReadTask(conn *websocket.Conn, m eventsmux.Interface, cancel func(error)) func(context.Context) error { + return func(ctx context.Context) (err error) { + defer func() { cancel(err) }() + defer func() { + if closeErr := (websocket.CloseError{}); errors.As(err, &closeErr) { + log.FromContext(ctx).WithFields(log.Fields( + "code", closeErr.Code, + "reason", closeErr.Reason, + )).Debug("WebSocket closed") + err = io.EOF + } + }() + for { + var request protocol.RequestWrapper + if err := wsjson.Read(ctx, conn, &request); err != nil { + return err + } + select { + case <-ctx.Done(): + return ctx.Err() + case m.Requests() <- request.Contents: + } + } + } +} + +func makeWriteTask(conn *websocket.Conn, m eventsmux.Interface, cancel func(error)) func(context.Context) error { + return func(ctx context.Context) (err error) { + defer func() { cancel(err) }() + for { + select { + case <-ctx.Done(): + return ctx.Err() + case response := <-m.Responses(): + if err := wsjson.Write(ctx, conn, response); err != nil { + return err + } + } + } + } +} diff --git a/pkg/webui/locales/ja.json b/pkg/webui/locales/ja.json index 4a993c438c..f472f90e2e 100644 --- a/pkg/webui/locales/ja.json +++ b/pkg/webui/locales/ja.json @@ -1883,6 +1883,10 @@ "error:pkg/config:format": "無効なフォーマット `{input}`", "error:pkg/config:missing_blob_config": "Blobストア設定が見つかりません", "error:pkg/config:unknown_blob_provider": "無効なBlobストアプロバイダ `{provider}`", + "error:pkg/console/internal/events/protocol:message_type": "", + "error:pkg/console/internal/events/subscriptions:already_subscribed": "", + "error:pkg/console/internal/events/subscriptions:no_identifiers": "", + "error:pkg/console/internal/events/subscriptions:not_subscribed": "", "error:pkg/crypto/cryptoservices:no_app_key": "指定されたAppKeyがありません", "error:pkg/crypto/cryptoservices:no_dev_eui": "指定されたDevEUIがありません", "error:pkg/crypto/cryptoservices:no_join_eui": "指定されたJoinEUIがありません", diff --git a/tools/go.mod b/tools/go.mod index 079e36dfbe..d450416177 100644 --- a/tools/go.mod +++ b/tools/go.mod @@ -257,4 +257,5 @@ require ( gopkg.in/yaml.v3 v3.0.1 // indirect k8s.io/klog/v2 v2.90.1 // indirect mellium.im/sasl v0.3.1 // indirect + nhooyr.io/websocket v1.8.10 // indirect ) diff --git a/tools/go.sum b/tools/go.sum index 1a77188a02..7984c18fb3 100644 --- a/tools/go.sum +++ b/tools/go.sum @@ -1290,6 +1290,8 @@ k8s.io/klog/v2 v2.90.1 h1:m4bYOKall2MmOiRaR1J+We67Do7vm9KiQVlT96lnHUw= k8s.io/klog/v2 v2.90.1/go.mod h1:y1WjHnz7Dj687irZUWR/WLkLc5N1YHtjLdmgWjndZn0= mellium.im/sasl v0.3.1 h1:wE0LW6g7U83vhvxjC1IY8DnXM+EU095yeo8XClvCdfo= mellium.im/sasl v0.3.1/go.mod h1:xm59PUYpZHhgQ9ZqoJ5QaCqzWMi8IeS49dhp6plPCzw= +nhooyr.io/websocket v1.8.10 h1:mv4p+MnGrLDcPlBoWsvPP7XCzTYMXP9F9eIGoKbgx7Q= +nhooyr.io/websocket v1.8.10/go.mod h1:rN9OFWIUwuxg4fR5tELlYC04bXYowCP9GX47ivo2l+c= rsc.io/binaryregexp v0.2.0/go.mod h1:qTv7/COck+e2FymRvadv62gMdZztPaShugOCi3I+8D8= rsc.io/quote/v3 v3.1.0/go.mod h1:yEA65RcK8LyAZtP9Kv3t0HmxON59tX3rD+tICJqUlj0= rsc.io/sampler v1.3.0/go.mod h1:T1hPZKmBbMNahiBKFy5HrXp6adAjACjK9JXDnKaTXpA=