diff --git a/config/messages.json b/config/messages.json index 515b8cfff8..ac9f78406f 100644 --- a/config/messages.json +++ b/config/messages.json @@ -3509,6 +3509,42 @@ "file": "shared.go" } }, + "error:pkg/console/internal/events/protocol:message_type": { + "translations": { + "en": "invalid message type `{type}`" + }, + "description": { + "package": "pkg/console/internal/events/protocol", + "file": "protocol.go" + } + }, + "error:pkg/console/internal/events/subscriptions:already_subscribed": { + "translations": { + "en": "already subscribed with ID `{id}`" + }, + "description": { + "package": "pkg/console/internal/events/subscriptions", + "file": "subscriptions.go" + } + }, + "error:pkg/console/internal/events/subscriptions:no_identifiers": { + "translations": { + "en": "no identifiers" + }, + "description": { + "package": "pkg/console/internal/events/subscriptions", + "file": "subscriptions.go" + } + }, + "error:pkg/console/internal/events/subscriptions:not_subscribed": { + "translations": { + "en": "not subscribed with ID `{id}`" + }, + "description": { + "package": "pkg/console/internal/events/subscriptions", + "file": "subscriptions.go" + } + }, "error:pkg/crypto/cryptoservices:no_app_key": { "translations": { "en": "no AppKey specified" diff --git a/go.mod b/go.mod index 466728cc88..5ad3ab087a 100644 --- a/go.mod +++ b/go.mod @@ -111,6 +111,7 @@ require ( gopkg.in/mail.v2 v2.3.1 gopkg.in/square/go-jose.v2 v2.6.0 gopkg.in/yaml.v2 v2.4.0 + nhooyr.io/websocket v1.8.10 ) require ( diff --git a/go.sum b/go.sum index 54ba93982e..259f3ae6ca 100644 --- a/go.sum +++ b/go.sum @@ -1254,6 +1254,8 @@ honnef.co/go/tools v0.0.1-2020.1.3/go.mod h1:X/FiERA/W4tHapMX5mGpAtMSVEeEUOyHaw9 honnef.co/go/tools v0.0.1-2020.1.4/go.mod h1:X/FiERA/W4tHapMX5mGpAtMSVEeEUOyHaw9vFzvIQ3k= mellium.im/sasl v0.3.1 h1:wE0LW6g7U83vhvxjC1IY8DnXM+EU095yeo8XClvCdfo= mellium.im/sasl v0.3.1/go.mod h1:xm59PUYpZHhgQ9ZqoJ5QaCqzWMi8IeS49dhp6plPCzw= +nhooyr.io/websocket v1.8.10 h1:mv4p+MnGrLDcPlBoWsvPP7XCzTYMXP9F9eIGoKbgx7Q= +nhooyr.io/websocket v1.8.10/go.mod h1:rN9OFWIUwuxg4fR5tELlYC04bXYowCP9GX47ivo2l+c= rsc.io/binaryregexp v0.2.0/go.mod h1:qTv7/COck+e2FymRvadv62gMdZztPaShugOCi3I+8D8= rsc.io/quote/v3 v3.1.0/go.mod h1:yEA65RcK8LyAZtP9Kv3t0HmxON59tX3rD+tICJqUlj0= rsc.io/sampler v1.3.0/go.mod h1:T1hPZKmBbMNahiBKFy5HrXp6adAjACjK9JXDnKaTXpA= diff --git a/pkg/console/console.go b/pkg/console/console.go index d012123d35..d1d26e654e 100644 --- a/pkg/console/console.go +++ b/pkg/console/console.go @@ -23,6 +23,7 @@ import ( "github.com/gorilla/csrf" "github.com/gorilla/mux" "go.thethings.network/lorawan-stack/v3/pkg/component" + "go.thethings.network/lorawan-stack/v3/pkg/console/internal/events" "go.thethings.network/lorawan-stack/v3/pkg/web" "go.thethings.network/lorawan-stack/v3/pkg/web/oauthclient" "go.thethings.network/lorawan-stack/v3/pkg/webhandlers" @@ -58,6 +59,7 @@ func New(c *component.Component, config Config) (*Console, error) { } c.RegisterWeb(console) + c.RegisterWeb(events.New(c)) return console, nil } diff --git a/pkg/console/internal/events/events.go b/pkg/console/internal/events/events.go new file mode 100644 index 0000000000..caae64a0a4 --- /dev/null +++ b/pkg/console/internal/events/events.go @@ -0,0 +1,135 @@ +// Copyright © 2023 The Things Network Foundation, The Things Industries B.V. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// Package events contains the internal events APi for the Console. +package events + +import ( + "context" + "net/http" + "sync" + + "github.com/gorilla/mux" + "go.thethings.network/lorawan-stack/v3/pkg/auth/rights" + "go.thethings.network/lorawan-stack/v3/pkg/config" + "go.thethings.network/lorawan-stack/v3/pkg/console/internal/events/eventsmux" + "go.thethings.network/lorawan-stack/v3/pkg/console/internal/events/subscriptions" + "go.thethings.network/lorawan-stack/v3/pkg/events" + "go.thethings.network/lorawan-stack/v3/pkg/log" + "go.thethings.network/lorawan-stack/v3/pkg/ratelimit" + "go.thethings.network/lorawan-stack/v3/pkg/task" + "go.thethings.network/lorawan-stack/v3/pkg/ttnpb" + "go.thethings.network/lorawan-stack/v3/pkg/web" + "go.thethings.network/lorawan-stack/v3/pkg/webhandlers" + "go.thethings.network/lorawan-stack/v3/pkg/webmiddleware" + "nhooyr.io/websocket" +) + +// Component is the interface of the component to the events API handler. +type Component interface { + task.Starter + Context() context.Context + RateLimiter() ratelimit.Interface + GetBaseConfig(context.Context) config.ServiceBase +} + +type eventsHandler struct { + component Component + subscriber events.Subscriber + definedNames map[string]struct{} +} + +var _ web.Registerer = (*eventsHandler)(nil) + +func (h *eventsHandler) RegisterRoutes(server *web.Server) { + router := server.APIRouter().PathPrefix(ttnpb.HTTPAPIPrefix + "/console/internal/events/").Subrouter() + router.Use( + mux.MiddlewareFunc(webmiddleware.Namespace("console/internal/events")), + ratelimit.HTTPMiddleware(h.component.RateLimiter(), "http:console:internal:events"), + mux.MiddlewareFunc(webmiddleware.Metadata("Authorization")), + ) + router.Path("/").HandlerFunc(h.handleEvents).Methods(http.MethodGet) +} + +func (h *eventsHandler) handleEvents(w http.ResponseWriter, r *http.Request) { + ctx := r.Context() + logger := log.FromContext(ctx) + + if err := rights.RequireAuthenticated(ctx); err != nil { + webhandlers.Error(w, r, err) + return + } + + conn, err := websocket.Accept(w, r, &websocket.AcceptOptions{ + InsecureSkipVerify: true, // CORS is not enabled for APIs. + CompressionMode: websocket.CompressionContextTakeover, + }) + if err != nil { + logger.WithError(err).Debug("Failed to accept WebSocket") + return + } + defer conn.Close(websocket.StatusNormalClosure, "main task closed") + + ctx, cancel := context.WithCancelCause(ctx) + defer cancel(nil) + + var wg sync.WaitGroup + defer wg.Wait() + + m := eventsmux.New(func(ctx context.Context, cancel func(error)) subscriptions.Interface { + return subscriptions.New(ctx, cancel, h.subscriber, h.definedNames, h.component) + }) + for name, f := range map[string]func(context.Context) error{ + "console_events_mux": makeMuxTask(m, cancel), + "console_events_read": makeReadTask(conn, m, cancel), + "console_events_write": makeWriteTask(conn, m, cancel), + } { + wg.Add(1) + h.component.StartTask(&task.Config{ + Context: ctx, + ID: name, + Func: f, + Done: wg.Done, + Restart: task.RestartNever, + Backoff: task.DefaultBackoffConfig, + }) + } +} + +// Option configures the events API handler. +type Option func(*eventsHandler) + +// WithSubscriber configures the Subscriber to use for events. +func WithSubscriber(subscriber events.Subscriber) Option { + return func(h *eventsHandler) { + h.subscriber = subscriber + } +} + +// New returns an events API handler for the Console. +func New(c Component, opts ...Option) web.Registerer { + definedNames := make(map[string]struct{}) + for _, def := range events.All().Definitions() { + definedNames[def.Name()] = struct{}{} + } + h := &eventsHandler{ + component: c, + subscriber: events.DefaultPubSub(), + definedNames: definedNames, + } + for _, opt := range opts { + opt(h) + } + return h +} diff --git a/pkg/console/internal/events/eventsmux/mux.go b/pkg/console/internal/events/eventsmux/mux.go new file mode 100644 index 0000000000..e0874f9c51 --- /dev/null +++ b/pkg/console/internal/events/eventsmux/mux.go @@ -0,0 +1,106 @@ +// Copyright © 2023 The Things Network Foundation, The Things Industries B.V. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// Package eventsmux implements the events mux. +package eventsmux + +import ( + "context" + + "go.thethings.network/lorawan-stack/v3/pkg/console/internal/events/protocol" + "go.thethings.network/lorawan-stack/v3/pkg/console/internal/events/subscriptions" + "go.thethings.network/lorawan-stack/v3/pkg/events" + "go.thethings.network/lorawan-stack/v3/pkg/log" +) + +// Interface is the interface for the events mux. +type Interface interface { + // Requests returns the channel for requests. + Requests() chan<- protocol.Request + // Responses returns the channel for responses. + Responses() <-chan protocol.Response + + // Run runs the events mux. + Run(context.Context) error +} + +type mux struct { + createSubs func(context.Context, func(error)) subscriptions.Interface + + requestCh chan protocol.Request + responseCh chan protocol.Response +} + +// Requests implements Interface. +func (m *mux) Requests() chan<- protocol.Request { + return m.requestCh +} + +// Responses implements Interface. +func (m *mux) Responses() <-chan protocol.Response { + return m.responseCh +} + +// Run implements Interface. +func (m *mux) Run(ctx context.Context) (err error) { + ctx, cancel := context.WithCancelCause(ctx) + defer func() { cancel(err) }() + subs := m.createSubs(ctx, cancel) + defer subs.Close() + for { + select { + case <-ctx.Done(): + return ctx.Err() + case req := <-m.requestCh: + var resp protocol.Response + switch req := req.(type) { + case *protocol.SubscribeRequest: + resp = req.Response(subs.Subscribe(req.ID, req.Identifiers, req.After, req.Tail, req.Names)) + case *protocol.UnsubscribeRequest: + resp = req.Response(subs.Unsubscribe(req.ID)) + default: + panic("unreachable") + } + select { + case <-ctx.Done(): + return ctx.Err() + case m.responseCh <- resp: + } + case subEvt := <-subs.SubscriptionEvents(): + evtPB, err := events.Proto(subEvt.Event) + if err != nil { + log.FromContext(ctx).WithError(err).Warn("Failed to convert event to proto") + continue + } + select { + case <-ctx.Done(): + return ctx.Err() + case m.responseCh <- &protocol.PublishResponse{ + ID: subEvt.ID, + Event: evtPB, + }: + } + } + } +} + +// New returns a new Interface. +func New(createSubs func(context.Context, func(error)) subscriptions.Interface) Interface { + return &mux{ + createSubs: createSubs, + + requestCh: make(chan protocol.Request, 1), + responseCh: make(chan protocol.Response, 1), + } +} diff --git a/pkg/console/internal/events/eventsmux/mux_test.go b/pkg/console/internal/events/eventsmux/mux_test.go new file mode 100644 index 0000000000..de220f52fa --- /dev/null +++ b/pkg/console/internal/events/eventsmux/mux_test.go @@ -0,0 +1,315 @@ +// Copyright © 2023 The Things Network Foundation, The Things Industries B.V. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package eventsmux_test + +import ( + "context" + "errors" + "testing" + "time" + + "go.thethings.network/lorawan-stack/v3/pkg/auth/rights" + "go.thethings.network/lorawan-stack/v3/pkg/console/internal/events/eventsmux" + "go.thethings.network/lorawan-stack/v3/pkg/console/internal/events/protocol" + "go.thethings.network/lorawan-stack/v3/pkg/console/internal/events/subscriptions" + "go.thethings.network/lorawan-stack/v3/pkg/events" + "go.thethings.network/lorawan-stack/v3/pkg/ttnpb" + "go.thethings.network/lorawan-stack/v3/pkg/unique" + "go.thethings.network/lorawan-stack/v3/pkg/util/test" + "go.thethings.network/lorawan-stack/v3/pkg/util/test/assertions/should" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" +) + +type subscribeRequest struct { + ID uint64 + Identifiers []*ttnpb.EntityIdentifiers + After *time.Time + Tail uint32 + Names []string + + Response chan<- error +} + +type unsubscribeRequest struct { + ID uint64 + + Response chan<- error +} + +type mockSubscriptions struct { + ctx context.Context + subReqs chan subscribeRequest + unsubReqs chan unsubscribeRequest + evsCh chan *subscriptions.SubscriptionEvent +} + +// Subscribe implements subscriptions.Interface. +func (m *mockSubscriptions) Subscribe( + id uint64, identifiers []*ttnpb.EntityIdentifiers, after *time.Time, tail uint32, names []string, +) error { + ch := make(chan error, 1) + select { + case <-m.ctx.Done(): + return m.ctx.Err() + case m.subReqs <- subscribeRequest{ + ID: id, + Identifiers: identifiers, + After: after, + Tail: tail, + Names: names, + + Response: ch, + }: + select { + case <-m.ctx.Done(): + return m.ctx.Err() + case err := <-ch: + return err + } + } +} + +// Unsubscribe implements subscriptions.Interface. +func (m *mockSubscriptions) Unsubscribe(id uint64) error { + ch := make(chan error, 1) + select { + case <-m.ctx.Done(): + return m.ctx.Err() + case m.unsubReqs <- unsubscribeRequest{ + ID: id, + + Response: ch, + }: + select { + case <-m.ctx.Done(): + return m.ctx.Err() + case err := <-ch: + return err + } + } +} + +// SubscriptionEvents implements subscriptions.Interface. +func (m *mockSubscriptions) SubscriptionEvents() <-chan *subscriptions.SubscriptionEvent { + return m.evsCh +} + +// Close implements subscriptions.Interface. +func (*mockSubscriptions) Close() error { return nil } + +var _ subscriptions.Interface = (*mockSubscriptions)(nil) + +func TestMux(t *testing.T) { // nolint:gocyclo + t.Parallel() + + a, ctx := test.New(t) + + appIDs := &ttnpb.ApplicationIdentifiers{ + ApplicationId: "foo", + } + ctx = rights.NewContext(ctx, &rights.Rights{ + ApplicationRights: *rights.NewMap(map[string]*ttnpb.Rights{ + unique.ID(ctx, appIDs): ttnpb.RightsFrom(ttnpb.Right_RIGHT_ALL), + }), + }) + + subs := &mockSubscriptions{ + ctx: ctx, + subReqs: make(chan subscribeRequest, 1), + unsubReqs: make(chan unsubscribeRequest, 1), + evsCh: make(chan *subscriptions.SubscriptionEvent, 1), + } + m := eventsmux.New(func(context.Context, func(error)) subscriptions.Interface { return subs }) + + go m.Run(ctx) // nolint:errcheck + + now := time.Now() + select { + case <-ctx.Done(): + return + case m.Requests() <- &protocol.SubscribeRequest{ + ID: 42, + Identifiers: []*ttnpb.EntityIdentifiers{ + appIDs.GetEntityIdentifiers(), + }, + After: &now, + Tail: 1, + Names: []string{"foo"}, + }: + } + select { + case <-ctx.Done(): + return + case req := <-subs.subReqs: + a.So(req, should.Resemble, subscribeRequest{ + ID: 42, + Identifiers: []*ttnpb.EntityIdentifiers{ + appIDs.GetEntityIdentifiers(), + }, + After: &now, + Tail: 1, + Names: []string{"foo"}, + + Response: req.Response, + }) + select { + case <-ctx.Done(): + return + case req.Response <- nil: + } + } + select { + case <-ctx.Done(): + return + case resp := <-m.Responses(): + a.So(resp, should.Resemble, &protocol.SubscribeResponse{ + ID: 42, + }) + } + + errAlreadySubscribed := errors.New("already subscribed") + select { + case <-ctx.Done(): + return + case m.Requests() <- &protocol.SubscribeRequest{ + ID: 42, + Identifiers: []*ttnpb.EntityIdentifiers{ + appIDs.GetEntityIdentifiers(), + }, + After: &now, + Tail: 1, + Names: []string{"foo"}, + }: + } + select { + case <-ctx.Done(): + return + case req := <-subs.subReqs: + a.So(req, should.Resemble, subscribeRequest{ + ID: 42, + Identifiers: []*ttnpb.EntityIdentifiers{ + appIDs.GetEntityIdentifiers(), + }, + After: &now, + Tail: 1, + Names: []string{"foo"}, + + Response: req.Response, + }) + select { + case <-ctx.Done(): + return + case req.Response <- errAlreadySubscribed: + } + } + select { + case <-ctx.Done(): + return + case resp := <-m.Responses(): + a.So(resp, should.Resemble, &protocol.ErrorResponse{ + ID: 42, + Error: status.New(codes.Unknown, "already subscribed"), + }) + } + + ev := events.New( + ctx, + "test.evt", + "test event", + events.WithIdentifiers(appIDs), + ) + select { + case <-ctx.Done(): + return + case subs.evsCh <- &subscriptions.SubscriptionEvent{ + ID: 42, + Event: ev, + }: + } + select { + case <-ctx.Done(): + return + case resp := <-m.Responses(): + a.So(resp, should.Resemble, &protocol.PublishResponse{ + ID: 42, + Event: test.Must(events.Proto(ev)), + }) + } + + select { + case <-ctx.Done(): + return + case m.Requests() <- &protocol.UnsubscribeRequest{ + ID: 42, + }: + } + select { + case <-ctx.Done(): + return + case req := <-subs.unsubReqs: + a.So(req, should.Resemble, unsubscribeRequest{ + ID: 42, + + Response: req.Response, + }) + select { + case <-ctx.Done(): + return + case req.Response <- nil: + } + } + select { + case <-ctx.Done(): + return + case resp := <-m.Responses(): + a.So(resp, should.Resemble, &protocol.UnsubscribeResponse{ + ID: 42, + }) + } + + errNotSubscribed := errors.New("not subscribed") + select { + case <-ctx.Done(): + return + case m.Requests() <- &protocol.UnsubscribeRequest{ + ID: 42, + }: + } + select { + case <-ctx.Done(): + return + case req := <-subs.unsubReqs: + a.So(req, should.Resemble, unsubscribeRequest{ + ID: 42, + + Response: req.Response, + }) + select { + case <-ctx.Done(): + return + case req.Response <- errNotSubscribed: + } + } + select { + case <-ctx.Done(): + return + case resp := <-m.Responses(): + a.So(resp, should.Resemble, &protocol.ErrorResponse{ + ID: 42, + Error: status.New(codes.Unknown, "not subscribed"), + }) + } +} diff --git a/pkg/console/internal/events/protocol/PROTOCOL.md b/pkg/console/internal/events/protocol/PROTOCOL.md new file mode 100644 index 0000000000..168dc5980c --- /dev/null +++ b/pkg/console/internal/events/protocol/PROTOCOL.md @@ -0,0 +1,191 @@ +### Internal Events API + +The Console internal events API is designed as an alternative to the `Events.Stream` gRPC API for event stream interactions. It allows multiple subscriptions to be multiplexed over a singular [WebSocket](https://en.wikipedia.org/wiki/WebSocket) connection. + +### Reasoning + +The `Events.Stream` gRPC API is available to HTTP clients via [grpc-gateway](https://github.com/grpc-ecosystem/grpc-gateway). While translated to HTTP, it is visible as a long-polling request whose response body will contain the events as a series of JSON objects. + +This approach is efficient in the context of [HTTP/2](https://en.wikipedia.org/wiki/HTTP/2) which supports multiplexing multiple requests over a singular TCP connection. + +Unfortunately the connection between a browser and The Things Stack is susceptible to proxies. Corporate environments are generally equipped with such proxies, and in their presence the connections are downgraded to HTTP/1.1 semantics. + +In HTTP/1.1 connections can be used for a singular request at a time - it is not possible to multiplex the requests over a singular connection, and only [keep-alive](https://en.wikipedia.org/wiki/HTTP_persistent_connection) connections are available. + +This is problematic as browsers have builtin limits for the number of concurrent connections that singular windows may use. This leads to hard to debug issues which are hardly reproducible. + +But, there is one silver lining - the connection limit _does not apply to WebSocket connections_. The internal events API is designed to deal with this limitation while providing an experience similar to the original `Events.Stream` gRPC API. + +### Endpoint + +The endpoint for the internal events API is `/api/v3/console/internal/events/`. Note that the trailing slash is not optional. + +### Semantics + +The protocol is [full-duplex](https://en.wikipedia.org/wiki/Duplex_(telecommunications)#Full_duplex) - the client side and server side may transmit messages at any time without waiting for a response from the other party. + +The protocol is centered around subscriptions. Subscriptions are identified by an unsigned numerical ID, which is selected by the client. + +A subscription is initiated by the client via a subscription request, which the server confirms either with a subscription response or an error response. + +Following a successful subscription, the server may send at any time publication responses containing the subscription identifier and an event. The subscription identifier can be used on the client side in order to route the event to the appropriate component or view. + +A subscription can be terminated via an unsubscribe request, which the server confirms either with an unsubscribe response or an error response. + +The client can expect that no publication responses will follow an unsubscribe response, but it is recommended that subscription identifiers are not recycled within the same session. + +Error responses can be expected when the request contents are invalid (lack of identifiers, or invalid identifiers), or the caller is not authorized to subscribe to the provided identifiers. It is also invalid to request a subscription with the same identifier as an existing subscription, or to unsubscribe using an identifier which is not subscribed. + +Error response are provided as a debugging facility, and the errors are generally not fixable by the Console user. + +A special case exists for situations in which the caller is no longer authorized to receive any events associated with the provided identifiers _after_ the subscription response has been sent. This can happen if the caller token has expired or the rights have been revoked while the stream is ongoing. In such situations the server will terminate the connection explicitly. + +### Authentication and Authorization + +The authentication for the internal API is similar to other APIs available in The Things Stack. Given a `Bearer` token `t`, the `Authorization` header should contain the value `Bearer t`. + +Upon connecting, no authorization will take place - the endpoint only will check that the provided token is valid (i.e. exists and it is not expired). + +### Message Format + +Both requests and responses sent over the WebSocket connection are JSON encoded. All messages are JSON objects and are required to contain at least the following two fields: + +- `type`: a string whose value must be either `subscribe`, `unsubscribe`, `publish` or `error`. +- `id`: an unsigned integer which identifies the underlying subscription being served. + +Each of the following subsections describes an individual message and the message direction (client to server or server to client). + +#### `SubscribeRequest` [C -> S] + +- `type`: `subscribe` +- `id`: the subscription identifier +- `identifiers`, `tail`, `after`, `names`: semantically the same fields as those of the `StreamEventsRequest` Protobuf message. + +Example: + +```json +{ + "type": "subscribe", + "id": 1, + "tail": 10, + "identifiers": [ + { + "application_ids": { + "application_id": "app1" + } + } + ] +} +``` + +#### `SubscribeResponse` [S -> C] + +- `type`: `subscribe` +- `id`: the subscription identifier + +Example: + +```json +{ + "type": "subscribe", + "id": 1 +} +``` + +#### `UnsubscribeRequest` [C -> S] + +- `type`: `unsubscribe` +- `id`: the subscription identifier + +Example: + +```json +{ + "type": "unsubscribe", + "id": 1 +} +``` + +#### `UnsubscribeResponse` [S -> C] + +- `type`: `unsubscribe` +- `id`: the subscription identifier + +Example: + +```json +{ + "type": "unsubscribe", + "id": 1 +} +``` + +#### `PublishResponse` [S -> C] + +- `type`: `publish` +- `id`: the subscription identifier +- `event`: an `Event` Protobuf message encoded as a JSON object + +Example: + +```json +{ + "type": "publish", + "id": 1, + "event": { + "name": "as.up.data.forward", + "time": "2023-10-26T16:27:14.103854Z", + "identifiers": [ + { + "device_ids": { + "device_id": "eui-0000000000000003", + "application_ids": { + "application_id": "app1" + } + } + } + ], + "context": { + "tenant-id": "Cgl0aGV0aGluZ3M=" + }, + "visibility": { + "rights": [ + "RIGHT_APPLICATION_TRAFFIC_READ" + ] + }, + "unique_id": "01HDPCZDSQ358JMHD4SC2BQAB8" + } +} +``` + +#### ErrorResponse [S -> C] + +- `type`: `error` +- `id`: the subscription identifier +- `error`: a `Status` Protobuf message encoded as a JSON object + +Example: + +```json +{ + "type": "error", + "id": 1, + "error": { + "code": 6, + "message": "error:pkg/console/internal/events/subscriptions:already_subscribed (already subscribed with ID `1`)", + "details": [ + { + "@type": "type.googleapis.com/ttn.lorawan.v3.ErrorDetails", + "namespace": "pkg/console/internal/events/subscriptions", + "name": "already_subscribed", + "message_format": "already subscribed with ID `{id}`", + "attributes": { + "id": "1" + }, + "correlation_id": "5da004b9f61f479aafe5bbcae4551e63", + "code": 6 + } + ] + } +} +``` diff --git a/pkg/console/internal/events/subscriptions/subscriptions.go b/pkg/console/internal/events/subscriptions/subscriptions.go new file mode 100644 index 0000000000..0a099fffdd --- /dev/null +++ b/pkg/console/internal/events/subscriptions/subscriptions.go @@ -0,0 +1,255 @@ +// Copyright © 2023 The Things Network Foundation, The Things Industries B.V. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// Package subscriptions implements the events mux subscriptions. +package subscriptions + +import ( + "context" + "sync" + "time" + + "go.thethings.network/lorawan-stack/v3/pkg/auth/rights" + "go.thethings.network/lorawan-stack/v3/pkg/auth/rights/rightsutil" + "go.thethings.network/lorawan-stack/v3/pkg/errors" + "go.thethings.network/lorawan-stack/v3/pkg/events" + "go.thethings.network/lorawan-stack/v3/pkg/log" + "go.thethings.network/lorawan-stack/v3/pkg/task" + "go.thethings.network/lorawan-stack/v3/pkg/ttnpb" +) + +// SubscriptionEvent wraps an events.Event with a subscription ID. +type SubscriptionEvent struct { + ID uint64 + Event events.Event +} + +// Interface is the interface for the events mux subscriptions. +type Interface interface { + // Subscribe subscribes to events. + Subscribe( + id uint64, identifiers []*ttnpb.EntityIdentifiers, after *time.Time, tail uint32, names []string, + ) error + // Unsubscribe unsubscribe to events. + Unsubscribe(id uint64) error + + // SubscriptionEvents provides the events for the underlying subscriptions. + SubscriptionEvents() <-chan *SubscriptionEvent + + // Close closes all of the underlying subscriptions and waits for the background tasks to finish. + Close() error +} + +type subscription struct { + id uint64 + cancel func(error) + wg sync.WaitGroup + cancelParent func(error) + inputCh <-chan events.Event + outputCh chan<- *SubscriptionEvent +} + +func (s *subscription) run(ctx context.Context) (err error) { + defer func() { + select { + case <-ctx.Done(): + default: + s.cancelParent(err) + } + }() + for { + select { + case <-ctx.Done(): + return ctx.Err() + case evt := <-s.inputCh: + isVisible, err := rightsutil.EventIsVisible(ctx, evt) + if err != nil { + if err := rights.RequireAny(ctx, evt.Identifiers()...); err != nil { + return err + } + log.FromContext(ctx).WithError(err).Warn("Failed to check event visibility") + continue + } + if !isVisible { + continue + } + select { + case <-ctx.Done(): + return ctx.Err() + case s.outputCh <- &SubscriptionEvent{ + ID: s.id, + Event: evt, + }: + } + } + } +} + +type subscriptions struct { + ctx context.Context + cancel func(error) + subscriber events.Subscriber + definedNames map[string]struct{} + taskStarter task.Starter + + wg sync.WaitGroup + ch chan *SubscriptionEvent + subs map[uint64]*subscription +} + +var _ Interface = (*subscriptions)(nil) + +// Close implements Interface. +func (s *subscriptions) Close() error { + for id, sub := range s.subs { + delete(s.subs, id) + sub.cancel(nil) + sub.wg.Wait() + } + s.wg.Wait() + return nil +} + +// SubscriptionEvents implements Interface. +func (s *subscriptions) SubscriptionEvents() <-chan *SubscriptionEvent { return s.ch } + +var ( + errAlreadySubscribed = errors.DefineAlreadyExists("already_subscribed", "already subscribed with ID `{id}`") + errNoIdentifiers = errors.DefineInvalidArgument("no_identifiers", "no identifiers") +) + +// Subscribe implements Interface. +func (s *subscriptions) Subscribe( + id uint64, identifiers []*ttnpb.EntityIdentifiers, after *time.Time, tail uint32, names []string, +) (err error) { + if err := s.validateSubscribe(id, identifiers); err != nil { + return err + } + names, err = events.NamesFromPatterns(s.definedNames, names) + if err != nil { + return err + } + ch := make(chan events.Event, channelSize(tail)) + ctx, cancel := context.WithCancelCause(s.ctx) + defer func() { + if err != nil { + cancel(err) + } + }() + if store, hasStore := s.subscriber.(events.Store); hasStore { + if after == nil && tail == 0 { + now := time.Now() + after = &now + } + f := func(ctx context.Context) (err error) { + defer func() { + select { + case <-ctx.Done(): + default: + s.cancel(err) + } + }() + return store.SubscribeWithHistory(ctx, names, identifiers, after, int(tail), events.Channel(ch)) + } + s.wg.Add(1) + s.taskStarter.StartTask(&task.Config{ + Context: ctx, + ID: "console_events_subscribe", + Func: f, + Done: s.wg.Done, + Restart: task.RestartNever, + Backoff: task.DefaultBackoffConfig, + }) + } else { + if err := s.subscriber.Subscribe(ctx, names, identifiers, events.Channel(ch)); err != nil { + return err + } + } + sub := &subscription{ + id: id, + cancel: cancel, + cancelParent: s.cancel, + inputCh: ch, + outputCh: s.ch, + } + sub.wg.Add(1) + s.taskStarter.StartTask(&task.Config{ + Context: ctx, + ID: "console_events_filter", + Func: sub.run, + Done: sub.wg.Done, + Restart: task.RestartNever, + Backoff: task.DefaultBackoffConfig, + }) + s.subs[id] = sub + return nil +} + +var errNotSubscribed = errors.DefineNotFound("not_subscribed", "not subscribed with ID `{id}`") + +// Unsubscribe implements Interface. +func (s *subscriptions) Unsubscribe(id uint64) error { + sub, ok := s.subs[id] + if !ok { + return errNotSubscribed.WithAttributes("id", id) + } + delete(s.subs, id) + sub.cancel(nil) + sub.wg.Wait() + return nil +} + +// New returns a new Interface. +func New( + ctx context.Context, + cancel func(error), + subscriber events.Subscriber, + definedNames map[string]struct{}, + taskStarter task.Starter, +) Interface { + return &subscriptions{ + ctx: ctx, + cancel: cancel, + subscriber: subscriber, + definedNames: definedNames, + taskStarter: taskStarter, + ch: make(chan *SubscriptionEvent, 1), + subs: make(map[uint64]*subscription), + } +} + +func (s *subscriptions) validateSubscribe(id uint64, identifiers []*ttnpb.EntityIdentifiers) error { + if _, ok := s.subs[id]; ok { + return errAlreadySubscribed.WithAttributes("id", id) + } + if len(identifiers) == 0 { + return errNoIdentifiers.New() + } + for _, ids := range identifiers { + if err := ids.ValidateFields(); err != nil { + return err + } + } + return rights.RequireAny(s.ctx, identifiers...) +} + +func channelSize(n uint32) uint32 { + if n < 8 { + n = 8 + } + if n > 1024 { + n = 1024 + } + return n +} diff --git a/pkg/console/internal/events/subscriptions/subscriptions_test.go b/pkg/console/internal/events/subscriptions/subscriptions_test.go new file mode 100644 index 0000000000..10aaf738bb --- /dev/null +++ b/pkg/console/internal/events/subscriptions/subscriptions_test.go @@ -0,0 +1,303 @@ +// Copyright © 2023 The Things Network Foundation, The Things Industries B.V. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package subscriptions_test + +import ( + "context" + "sync" + "testing" + "time" + + "go.thethings.network/lorawan-stack/v3/pkg/auth/rights" + "go.thethings.network/lorawan-stack/v3/pkg/console/internal/events/subscriptions" + "go.thethings.network/lorawan-stack/v3/pkg/events" + "go.thethings.network/lorawan-stack/v3/pkg/task" + "go.thethings.network/lorawan-stack/v3/pkg/ttnpb" + "go.thethings.network/lorawan-stack/v3/pkg/unique" + "go.thethings.network/lorawan-stack/v3/pkg/util/test" + "go.thethings.network/lorawan-stack/v3/pkg/util/test/assertions/should" +) + +type subscribeRequest struct { + Context context.Context + Names []string + Identifiers []*ttnpb.EntityIdentifiers + After *time.Time + Tail int + Handler events.Handler + + Response chan<- error +} + +type mockSubscriber struct { + subReqs chan subscribeRequest +} + +func (m *mockSubscriber) subscribeRequests() <-chan subscribeRequest { return m.subReqs } + +// Subscribe implements events.Subscriber. +func (m *mockSubscriber) Subscribe( + ctx context.Context, names []string, identifiers []*ttnpb.EntityIdentifiers, hdl events.Handler, +) error { + ch := make(chan error, 1) + select { + case <-ctx.Done(): + return ctx.Err() + case m.subReqs <- subscribeRequest{ + Context: ctx, + Names: names, + Identifiers: identifiers, + Handler: hdl, + + Response: ch, + }: + select { + case <-ctx.Done(): + return ctx.Err() + case err := <-ch: + return err + } + } +} + +var _ events.Subscriber = (*mockSubscriber)(nil) + +type mockPubSubStore struct { + subReqs chan subscribeRequest +} + +func (m *mockPubSubStore) subscribeRequests() <-chan subscribeRequest { return m.subReqs } + +func (*mockPubSubStore) historical() {} + +// Publish implements events.Store. +func (*mockPubSubStore) Publish(...events.Event) { panic("not implemented") } + +// Subscribe implements events.Store. +func (*mockPubSubStore) Subscribe(context.Context, []string, []*ttnpb.EntityIdentifiers, events.Handler) error { + panic("not implemented") +} + +// FindRelated implements events.Store. +func (*mockPubSubStore) FindRelated(context.Context, string) ([]events.Event, error) { + panic("not implemented") +} + +// FetchHistory implements events.Store. +func (*mockPubSubStore) FetchHistory( + context.Context, []string, []*ttnpb.EntityIdentifiers, *time.Time, int, +) ([]events.Event, error) { + panic("not implemented") +} + +// SubscribeWithHistory implements events.Store. +func (m *mockPubSubStore) SubscribeWithHistory( + ctx context.Context, names []string, ids []*ttnpb.EntityIdentifiers, after *time.Time, tail int, hdl events.Handler, +) error { + ch := make(chan error, 1) + select { + case <-ctx.Done(): + return ctx.Err() + case m.subReqs <- subscribeRequest{ + Context: ctx, + Names: names, + Identifiers: ids, + After: after, + Tail: tail, + Handler: hdl, + + Response: ch, + }: + select { + case <-ctx.Done(): + return ctx.Err() + case err := <-ch: + return err + } + } +} + +var _ events.Store = (*mockPubSubStore)(nil) + +func runTestSubscriptions( + t *testing.T, + subscriber interface { + events.Subscriber + subscribeRequests() <-chan subscribeRequest + }, +) { + t.Helper() + + _, historical := subscriber.(interface{ historical() }) + + a, ctx := test.New(t) + ctx, cancel := context.WithCancelCause(ctx) + defer cancel(nil) + + timeout := test.Delay << 3 + app1IDs, app2IDs := &ttnpb.ApplicationIdentifiers{ + ApplicationId: "foo", + }, &ttnpb.ApplicationIdentifiers{ + ApplicationId: "bar", + } + ctx = rights.NewContext(ctx, &rights.Rights{ + ApplicationRights: *rights.NewMap(map[string]*ttnpb.Rights{ + unique.ID(ctx, app1IDs): ttnpb.RightsFrom(ttnpb.Right_RIGHT_APPLICATION_ALL), + }), + }) + + sub := subscriptions.New( + ctx, + cancel, + subscriber, + map[string]struct{}{ + "test": {}, + }, + task.StartTaskFunc(task.DefaultStartTask), + ) + defer sub.Close() + + select { + case <-ctx.Done(): + return + case <-time.After(timeout): + case req := <-subscriber.subscribeRequests(): + t.Fatal("Unexpected subscribe request", req) + } + + now := time.Now() + + var wg sync.WaitGroup + defer wg.Wait() + + wg.Add(1) + go func() { + defer wg.Done() + err := sub.Subscribe( + 1, + []*ttnpb.EntityIdentifiers{ + app1IDs.GetEntityIdentifiers(), + }, + &now, + 10, + []string{"test"}, + ) + a.So(err, should.BeNil) + }() + var handler events.Handler + select { + case <-ctx.Done(): + return + case req := <-subscriber.subscribeRequests(): + a.So(req.Context, should.HaveParentContextOrEqual, ctx) + a.So(req.Names, should.Resemble, []string{"test"}) + a.So(req.Identifiers, should.Resemble, []*ttnpb.EntityIdentifiers{ + app1IDs.GetEntityIdentifiers(), + }) + if historical { + a.So(req.After, should.Resemble, &now) + a.So(req.Tail, should.Equal, 10) + } + a.So(req.Handler, should.NotBeNil) + if !historical { + select { + case <-ctx.Done(): + return + case req.Response <- nil: + } + } + handler = req.Handler + } + wg.Wait() + + err := sub.Subscribe( + 1, + []*ttnpb.EntityIdentifiers{ + app1IDs.GetEntityIdentifiers(), + }, + &now, + 10, + []string{"test"}, + ) + a.So(err, should.NotBeNil) + + evt := events.New( + ctx, + "test", + "test", + events.WithIdentifiers(app2IDs), + events.WithVisibility(ttnpb.Right_RIGHT_APPLICATION_DEVICES_READ), + ) + handler.Notify(evt) + + select { + case <-ctx.Done(): + return + case <-time.After(timeout): + case subEvt := <-sub.SubscriptionEvents(): + t.Fatal("Unexpected subscription event", subEvt) + } + + evt = events.New( + ctx, + "test", + "test", + events.WithIdentifiers(app1IDs), + events.WithVisibility(ttnpb.Right_RIGHT_APPLICATION_DEVICES_READ), + ) + handler.Notify(evt) + + select { + case <-ctx.Done(): + return + case subEvt := <-sub.SubscriptionEvents(): + a.So(subEvt.ID, should.Equal, 1) + a.So(subEvt.Event, should.ResembleEvent, evt) + } + + err = sub.Unsubscribe(1) + a.So(err, should.BeNil) + + err = sub.Unsubscribe(1) + a.So(err, should.NotBeNil) + + select { + case <-ctx.Done(): + return + case <-time.After(timeout): + case subEvt := <-sub.SubscriptionEvents(): + t.Fatal("Unexpected subscription event", subEvt) + } +} + +func TestSubscriptions(t *testing.T) { + t.Parallel() + runTestSubscriptions( + t, + &mockSubscriber{ + subReqs: make(chan subscribeRequest, 1), + }, + ) +} + +func TestStoreSubscriptions(t *testing.T) { + t.Parallel() + runTestSubscriptions( + t, + &mockPubSubStore{ + subReqs: make(chan subscribeRequest, 1), + }, + ) +} diff --git a/pkg/console/internal/events/tasks.go b/pkg/console/internal/events/tasks.go new file mode 100644 index 0000000000..1130b7297b --- /dev/null +++ b/pkg/console/internal/events/tasks.go @@ -0,0 +1,76 @@ +// Copyright © 2023 The Things Network Foundation, The Things Industries B.V. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package events + +import ( + "context" + "errors" + "io" + + "go.thethings.network/lorawan-stack/v3/pkg/console/internal/events/eventsmux" + "go.thethings.network/lorawan-stack/v3/pkg/console/internal/events/protocol" + "go.thethings.network/lorawan-stack/v3/pkg/log" + "nhooyr.io/websocket" + "nhooyr.io/websocket/wsjson" +) + +func makeMuxTask(m eventsmux.Interface, cancel func(error)) func(context.Context) error { + return func(ctx context.Context) (err error) { + defer func() { cancel(err) }() + return m.Run(ctx) + } +} + +func makeReadTask(conn *websocket.Conn, m eventsmux.Interface, cancel func(error)) func(context.Context) error { + return func(ctx context.Context) (err error) { + defer func() { cancel(err) }() + defer func() { + if closeErr := (websocket.CloseError{}); errors.As(err, &closeErr) { + log.FromContext(ctx).WithFields(log.Fields( + "code", closeErr.Code, + "reason", closeErr.Reason, + )).Debug("WebSocket closed") + err = io.EOF + } + }() + for { + var request protocol.RequestWrapper + if err := wsjson.Read(ctx, conn, &request); err != nil { + return err + } + select { + case <-ctx.Done(): + return ctx.Err() + case m.Requests() <- request.Contents: + } + } + } +} + +func makeWriteTask(conn *websocket.Conn, m eventsmux.Interface, cancel func(error)) func(context.Context) error { + return func(ctx context.Context) (err error) { + defer func() { cancel(err) }() + for { + select { + case <-ctx.Done(): + return ctx.Err() + case response := <-m.Responses(): + if err := wsjson.Write(ctx, conn, response); err != nil { + return err + } + } + } + } +} diff --git a/pkg/webui/locales/ja.json b/pkg/webui/locales/ja.json index 4a993c438c..f472f90e2e 100644 --- a/pkg/webui/locales/ja.json +++ b/pkg/webui/locales/ja.json @@ -1883,6 +1883,10 @@ "error:pkg/config:format": "無効なフォーマット `{input}`", "error:pkg/config:missing_blob_config": "Blobストア設定が見つかりません", "error:pkg/config:unknown_blob_provider": "無効なBlobストアプロバイダ `{provider}`", + "error:pkg/console/internal/events/protocol:message_type": "", + "error:pkg/console/internal/events/subscriptions:already_subscribed": "", + "error:pkg/console/internal/events/subscriptions:no_identifiers": "", + "error:pkg/console/internal/events/subscriptions:not_subscribed": "", "error:pkg/crypto/cryptoservices:no_app_key": "指定されたAppKeyがありません", "error:pkg/crypto/cryptoservices:no_dev_eui": "指定されたDevEUIがありません", "error:pkg/crypto/cryptoservices:no_join_eui": "指定されたJoinEUIがありません", diff --git a/tools/go.mod b/tools/go.mod index 079e36dfbe..d450416177 100644 --- a/tools/go.mod +++ b/tools/go.mod @@ -257,4 +257,5 @@ require ( gopkg.in/yaml.v3 v3.0.1 // indirect k8s.io/klog/v2 v2.90.1 // indirect mellium.im/sasl v0.3.1 // indirect + nhooyr.io/websocket v1.8.10 // indirect ) diff --git a/tools/go.sum b/tools/go.sum index 1a77188a02..7984c18fb3 100644 --- a/tools/go.sum +++ b/tools/go.sum @@ -1290,6 +1290,8 @@ k8s.io/klog/v2 v2.90.1 h1:m4bYOKall2MmOiRaR1J+We67Do7vm9KiQVlT96lnHUw= k8s.io/klog/v2 v2.90.1/go.mod h1:y1WjHnz7Dj687irZUWR/WLkLc5N1YHtjLdmgWjndZn0= mellium.im/sasl v0.3.1 h1:wE0LW6g7U83vhvxjC1IY8DnXM+EU095yeo8XClvCdfo= mellium.im/sasl v0.3.1/go.mod h1:xm59PUYpZHhgQ9ZqoJ5QaCqzWMi8IeS49dhp6plPCzw= +nhooyr.io/websocket v1.8.10 h1:mv4p+MnGrLDcPlBoWsvPP7XCzTYMXP9F9eIGoKbgx7Q= +nhooyr.io/websocket v1.8.10/go.mod h1:rN9OFWIUwuxg4fR5tELlYC04bXYowCP9GX47ivo2l+c= rsc.io/binaryregexp v0.2.0/go.mod h1:qTv7/COck+e2FymRvadv62gMdZztPaShugOCi3I+8D8= rsc.io/quote/v3 v3.1.0/go.mod h1:yEA65RcK8LyAZtP9Kv3t0HmxON59tX3rD+tICJqUlj0= rsc.io/sampler v1.3.0/go.mod h1:T1hPZKmBbMNahiBKFy5HrXp6adAjACjK9JXDnKaTXpA=