diff --git a/CHANGELOG.md b/CHANGELOG.md index a49ccce58f..9e71365dfd 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,22 @@ with the exception that this project **does not** follow Semantic Versioning. For details about compatibility between different releases, see the **Commitments and Releases** section of our README. +## [Unreleased] + +### Added + +### Changed + +### Deprecated + +### Removed + +### Fixed + +- Resolve scroll jumps when selecting different tabs of a table in the Console. + +### Security + ## [3.28.0] - 2023-10-31 ### Added diff --git a/config/messages.json b/config/messages.json index 9f89f6d34a..ac9f78406f 100644 --- a/config/messages.json +++ b/config/messages.json @@ -3509,6 +3509,42 @@ "file": "shared.go" } }, + "error:pkg/console/internal/events/protocol:message_type": { + "translations": { + "en": "invalid message type `{type}`" + }, + "description": { + "package": "pkg/console/internal/events/protocol", + "file": "protocol.go" + } + }, + "error:pkg/console/internal/events/subscriptions:already_subscribed": { + "translations": { + "en": "already subscribed with ID `{id}`" + }, + "description": { + "package": "pkg/console/internal/events/subscriptions", + "file": "subscriptions.go" + } + }, + "error:pkg/console/internal/events/subscriptions:no_identifiers": { + "translations": { + "en": "no identifiers" + }, + "description": { + "package": "pkg/console/internal/events/subscriptions", + "file": "subscriptions.go" + } + }, + "error:pkg/console/internal/events/subscriptions:not_subscribed": { + "translations": { + "en": "not subscribed with ID `{id}`" + }, + "description": { + "package": "pkg/console/internal/events/subscriptions", + "file": "subscriptions.go" + } + }, "error:pkg/crypto/cryptoservices:no_app_key": { "translations": { "en": "no AppKey specified" @@ -4382,67 +4418,67 @@ "file": "conversion.go" } }, - "error:pkg/events/grpc:invalid_regexp": { + "error:pkg/events/grpc:no_identifiers": { "translations": { - "en": "invalid regexp" + "en": "no identifiers" }, "description": { "package": "pkg/events/grpc", "file": "grpc.go" } }, - "error:pkg/events/grpc:no_identifiers": { + "error:pkg/events/grpc:storage_disabled": { "translations": { - "en": "no identifiers" + "en": "events storage is not not enabled" }, "description": { "package": "pkg/events/grpc", "file": "grpc.go" } }, - "error:pkg/events/grpc:no_matching_events": { + "error:pkg/events/redis:channel_closed": { "translations": { - "en": "no matching events for regexp `{regexp}`" + "en": "channel closed" }, "description": { - "package": "pkg/events/grpc", - "file": "grpc.go" + "package": "pkg/events/redis", + "file": "redis.go" } }, - "error:pkg/events/grpc:storage_disabled": { + "error:pkg/events/redis:unknown_encoding": { "translations": { - "en": "events storage is not not enabled" + "en": "unknown encoding" }, "description": { - "package": "pkg/events/grpc", - "file": "grpc.go" + "package": "pkg/events/redis", + "file": "codec.go" } }, - "error:pkg/events/grpc:unknown_event_name": { + "error:pkg/events:invalid_regexp": { "translations": { - "en": "unknown event `{name}`" + "en": "invalid regexp" }, "description": { - "package": "pkg/events/grpc", - "file": "grpc.go" + "package": "pkg/events", + "file": "pattern.go" } }, - "error:pkg/events/redis:channel_closed": { + "error:pkg/events:no_matching_events": { "translations": { - "en": "channel closed" + "en": "no matching events for regexp `{regexp}`" }, "description": { - "package": "pkg/events/redis", - "file": "redis.go" + "package": "pkg/events", + "file": "pattern.go" } }, - "error:pkg/events/redis:unknown_encoding": { + "error:pkg/events:unknown_event_name": { "translations": { - "en": "unknown encoding" + "en": "unknown event `{name}`" }, "description": { - "package": "pkg/events/redis", - "file": "codec.go" + "package": "pkg/events", + "file": "pattern.go" } }, "error:pkg/fetch:fetch_file": { diff --git a/cypress/e2e/console/admin-panel/packet-broker/networks.spec.js b/cypress/e2e/console/admin-panel/packet-broker/networks.spec.js index db702ac160..e3a83f2526 100644 --- a/cypress/e2e/console/admin-panel/packet-broker/networks.spec.js +++ b/cypress/e2e/console/admin-panel/packet-broker/networks.spec.js @@ -45,7 +45,12 @@ describe('Packet Broker networks', () => { cy.intercept('/api/v3/pba/home-networks/policies*', { fixture: 'console/packet-broker/policies-home-network.json', }) - cy.visit(`${Cypress.config('consoleRootPath')}/admin-panel/packet-broker/networks`) + cy.visit( + `${Cypress.config( + 'consoleRootPath', + )}/admin-panel/packet-broker/routing-configuration/networks`, + ) + cy.findByLabelText('Use custom routing policies').check() const { networks } = this.networks const networksFiltered = networks.filter( @@ -73,7 +78,11 @@ describe('Packet Broker networks', () => { n => n.forwarder_id.net_id === 19 && n.forwarder_id.tenant_id === 'johan', ) - cy.visit(`${Cypress.config('consoleRootPath')}/admin-panel/packet-broker/networks/19/johan`) + cy.visit( + `${Cypress.config( + 'consoleRootPath', + )}/admin-panel/packet-broker/routing-configuration/networks/19/johan`, + ) cy.findAllByText(`${network.id.net_id.toString(16).padStart(6, '0')}/${network.id.tenant_id}`) cy.findByText( diff --git a/cypress/e2e/console/admin-panel/packet-broker/registration.spec.js b/cypress/e2e/console/admin-panel/packet-broker/registration.spec.js index 5f4afc5366..4bfd68d939 100644 --- a/cypress/e2e/console/admin-panel/packet-broker/registration.spec.js +++ b/cypress/e2e/console/admin-panel/packet-broker/registration.spec.js @@ -61,15 +61,15 @@ describe('Packet Broker registration', () => { cy.visit(`${Cypress.config('consoleRootPath')}/admin-panel/packet-broker`) cy.findByText('Packet Broker', { selector: 'h1' }).should('be.visible') - cy.findByText(/Packet Broker can be used to exchange traffic/).should('be.visible') - cy.findByText('Packet Broker documentation', { selector: 'a' }).should('be.visible') + cy.findByText(/Packet Broker is a service by The Things Industries/).should('be.visible') + cy.findByText('Packet Broker', { selector: 'a' }).should('be.visible') cy.findByText('Packet Broker website', { selector: 'a' }).should('be.visible') - cy.findByText('Register network', { selector: 'span' }).should('be.visible') + cy.findByText('Enable Packet Broker', { selector: 'span' }).should('be.visible') cy.findByTestId('switch') .should('be.visible') .and('not.be.checked') .and('not.have.attr', 'disabled') - cy.findByText(/To enable peering/).should('be.visible') + cy.findByText(/Enabling will allow/).should('be.visible') cy.findByText('Default routing policy').should('not.exist') cy.findByText('Networks').should('not.exist') @@ -89,18 +89,20 @@ describe('Packet Broker registration', () => { cy.loginConsole({ user_id: 'admin', password: 'admin' }) cy.visit(`${Cypress.config('consoleRootPath')}/admin-panel/packet-broker`) - cy.findByText('Register network').click() - cy.findByText('Register network').next().findByTestId('switch').should('be.checked') - cy.findByText('List network publicly') + cy.findByText('Enable Packet Broker').click() + cy.findByText('Enable Packet Broker').next().findByTestId('switch').should('be.checked') + cy.findByText('List my network in Packet Broker publicly') .should('be.visible') .next() .findByTestId('switch') .should('be.checked') - cy.findByTestId('feature-info-forwarder-enabled').should('be.visible') - cy.findByTestId('feature-info-home-network-enabled').should('be.visible') - cy.findByTestId('tabs').findByText('Default routing policy').should('be.visible') - cy.findByTestId('tabs').findByText('Networks').should('be.visible') - cy.findByLabelText('Do not use a default routing policy for this network').should('be.checked') + cy.findByLabelText('Forward traffic to all networks registered in Packet Broker').should( + 'exist', + ) + cy.findByLabelText( + 'Forward traffic to The Things Stack Sandbox (community network) only', + ).should('exist') + cy.findByLabelText('Use custom routing policies').should('exist') cy.findByTestId('error-notification').should('not.exist') }) diff --git a/cypress/e2e/console/admin-panel/packet-broker/routing-policies.spec.js b/cypress/e2e/console/admin-panel/packet-broker/routing-policies.spec.js index 9b9f519f12..be0fee18a5 100644 --- a/cypress/e2e/console/admin-panel/packet-broker/routing-policies.spec.js +++ b/cypress/e2e/console/admin-panel/packet-broker/routing-policies.spec.js @@ -32,12 +32,63 @@ describe('Packet Broker routing policies', () => { cy.loginConsole({ user_id: 'admin', password: 'admin' }) }) - it('succeeds setting a default routing policy', () => { + it('succeeds setting a "traffic to all networks" routing configuration', () => { cy.intercept('GET', '/api/v3/pba/home-networks/policies/default', { statusCode: 404 }) cy.intercept('PUT', '/api/v3/pba/home-networks/policies/default', {}) + cy.intercept('DELETE', '/api/v3/pba/home-networks/policies/19', {}) + cy.intercept('DELETE', '/api/v3/pba/home-networks/policies/19/johan', {}) + cy.intercept('/api/v3/pba/networks*', { fixture: 'console/packet-broker/networks.json' }) + cy.intercept('/api/v3/pba/home-networks/policies*', { + fixture: 'console/packet-broker/policies-home-network.json', + }) cy.visit(`${Cypress.config('consoleRootPath')}/admin-panel/packet-broker`) - cy.findByLabelText('Use default routing policy for this network').check() + cy.findByLabelText('Forward traffic to all networks registered in Packet Broker').check() + cy.findByRole('button', { name: 'Save routing configuration' }).click() + + cy.findByTestId('error-notification').should('not.exist') + cy.findByTestId('toast-notification') + .should('be.visible') + .findByText('Default routing configuration set') + .should('be.visible') + }) + + it('succeeds setting a "only ttn" routing configuration', () => { + cy.intercept('GET', '/api/v3/pba/home-networks/policies/default', { statusCode: 404 }) + cy.intercept('/api/v3/pba/networks*', { fixture: 'console/packet-broker/networks.json' }) + cy.intercept('/api/v3/pba/home-networks/policies*', { + fixture: 'console/packet-broker/policies-home-network.json', + }) + cy.intercept('DELETE', '/api/v3/pba/home-networks/policies/default', {}) + cy.intercept('DELETE', '/api/v3/pba/home-networks/policies/19', {}) + cy.intercept('DELETE', '/api/v3/pba/home-networks/policies/19/johan', {}) + cy.intercept('PUT', '/api/v3/pba/home-networks/policies/19/ttn', {}) + cy.visit(`${Cypress.config('consoleRootPath')}/admin-panel/packet-broker`) + + cy.findByLabelText( + 'Forward traffic to The Things Stack Sandbox (community network) only', + ).check() + cy.findByRole('button', { name: 'Save routing configuration' }).click() + + cy.findByTestId('error-notification').should('not.exist') + cy.findByTestId('toast-notification') + .should('be.visible') + .findByText('Default routing configuration set') + .should('be.visible') + }) + + it('succeeds setting a custom routing configuration with a default routing policy', () => { + cy.intercept('GET', '/api/v3/pba/home-networks/policies/default', { + fixture: 'console/packet-broker/default-policy.json', + }) + cy.intercept('PUT', '/api/v3/pba/home-networks/policies/default', {}) + cy.intercept('/api/v3/pba/networks*', { fixture: 'console/packet-broker/networks.json' }) + cy.intercept('/api/v3/pba/home-networks/policies*', { + fixture: 'console/packet-broker/policies-home-network.json', + }) + cy.visit(`${Cypress.config('consoleRootPath')}/admin-panel/packet-broker`) + + cy.findByLabelText('Use custom routing policies').check() // Check routing policy form checkboxes. cy.findByText('Uplink') @@ -56,30 +107,35 @@ describe('Packet Broker routing policies', () => { cy.findByLabelText('MAC data').check() cy.findByLabelText('Application data').check() }) - cy.findByRole('button', { name: 'Save default policy' }).click() + cy.findByRole('button', { name: 'Save routing configuration' }).click() cy.findByTestId('error-notification').should('not.exist') cy.findByTestId('toast-notification') .should('be.visible') - .findByText('Default routing policy set') + .findByText('Default routing configuration set') .should('be.visible') }) it('succeeds unsetting a default routing policy', () => { + cy.intercept('PUT', '/api/v3/pba/home-networks/policies/default', {}) + cy.intercept('/api/v3/pba/networks*', { fixture: 'console/packet-broker/networks.json' }) + cy.intercept('/api/v3/pba/home-networks/policies*', { + fixture: 'console/packet-broker/policies-home-network.json', + }) cy.intercept('GET', '/api/v3/pba/home-networks/policies/default', { - fixture: 'console/packet-broker/default-policy.json', + fixture: 'console/packet-broker/default-custom-policy.json', }) cy.intercept('DELETE', '/api/v3/pba/home-networks/policies/default', {}) cy.visit(`${Cypress.config('consoleRootPath')}/admin-panel/packet-broker`) cy.findByLabelText('Do not use a default routing policy for this network').check() - cy.findByRole('button', { name: 'Save default policy' }).click() + cy.findByRole('button', { name: 'Save routing configuration' }).click() cy.findByTestId('error-notification').should('not.exist') cy.findByTestId('toast-notification') .should('be.visible') - .findByText('Default routing policy set') + .findByText('Default routing configuration set') .should('be.visible') }) @@ -93,7 +149,11 @@ describe('Packet Broker routing policies', () => { }) cy.intercept('PUT', '/api/v3/pba/home-networks/policies/19', {}) - cy.visit(`${Cypress.config('consoleRootPath')}/admin-panel/packet-broker/networks/19`) + cy.visit( + `${Cypress.config( + 'consoleRootPath', + )}/admin-panel/packet-broker/routing-configuration/networks/19`, + ) // Check routing policy form checkboxes. cy.findByLabelText('Use network specific routing policy').check() @@ -138,7 +198,11 @@ describe('Packet Broker routing policies', () => { }) cy.intercept('DELETE', '/api/v3/pba/home-networks/policies/19', {}) - cy.visit(`${Cypress.config('consoleRootPath')}/admin-panel/packet-broker/networks/19`) + cy.visit( + `${Cypress.config( + 'consoleRootPath', + )}/admin-panel/packet-broker/routing-configuration/networks/19`, + ) cy.findByLabelText('Do not use a routing policy for this network').check() cy.findByRole('button', { name: 'Save routing policy' }).click() @@ -161,7 +225,11 @@ describe('Packet Broker routing policies', () => { }) cy.intercept('DELETE', '/api/v3/pba/home-networks/policies/19', {}) - cy.visit(`${Cypress.config('consoleRootPath')}/admin-panel/packet-broker/networks/19`) + cy.visit( + `${Cypress.config( + 'consoleRootPath', + )}/admin-panel/packet-broker/routing-configuration/networks/19`, + ) cy.findByLabelText('Use default routing policy for this network').check() cy.findByRole('button', { name: 'Save routing policy' }).click() diff --git a/cypress/fixtures/console/packet-broker/default-custom-policy.json b/cypress/fixtures/console/packet-broker/default-custom-policy.json new file mode 100644 index 0000000000..edd8728179 --- /dev/null +++ b/cypress/fixtures/console/packet-broker/default-custom-policy.json @@ -0,0 +1,11 @@ +{ + "updated_at": "2021-06-21T12:09:26.810087Z", + "uplink": { + "join_request": true, + "mac_data": false, + "application_data": true, + "signal_quality": false, + "localization": false + }, + "downlink": { "join_accept": true, "mac_data": false, "application_data": true } +} diff --git a/go.mod b/go.mod index 20ab02d6a5..5ad3ab087a 100644 --- a/go.mod +++ b/go.mod @@ -106,11 +106,12 @@ require ( google.golang.org/genproto v0.0.0-20230911183012-2d3300fd4832 google.golang.org/genproto/googleapis/api v0.0.0-20230911183012-2d3300fd4832 google.golang.org/genproto/googleapis/rpc v0.0.0-20230911183012-2d3300fd4832 - google.golang.org/grpc v1.58.2 + google.golang.org/grpc v1.58.3 google.golang.org/protobuf v1.31.0 gopkg.in/mail.v2 v2.3.1 gopkg.in/square/go-jose.v2 v2.6.0 gopkg.in/yaml.v2 v2.4.0 + nhooyr.io/websocket v1.8.10 ) require ( diff --git a/go.sum b/go.sum index fde42d7502..259f3ae6ca 100644 --- a/go.sum +++ b/go.sum @@ -1194,8 +1194,8 @@ google.golang.org/grpc v1.35.0/go.mod h1:qjiiYl8FncCW8feJPdyg3v6XW24KsRHe+dy9BAG google.golang.org/grpc v1.36.0/go.mod h1:qjiiYl8FncCW8feJPdyg3v6XW24KsRHe+dy9BAGRRjU= google.golang.org/grpc v1.37.0/go.mod h1:NREThFqKR1f3iQ6oBuvc5LadQuXVGo9rkm5ZGrQdJfM= google.golang.org/grpc v1.45.0/go.mod h1:lN7owxKUQEqMfSyQikvvk5tf/6zMPsrK+ONuO11+0rQ= -google.golang.org/grpc v1.58.2 h1:SXUpjxeVF3FKrTYQI4f4KvbGD5u2xccdYdurwowix5I= -google.golang.org/grpc v1.58.2/go.mod h1:tgX3ZQDlNJGU96V6yHh1T/JeoBQ2TXdr43YbYSsCJk0= +google.golang.org/grpc v1.58.3 h1:BjnpXut1btbtgN/6sp+brB2Kbm2LjNXnidYujAVbSoQ= +google.golang.org/grpc v1.58.3/go.mod h1:tgX3ZQDlNJGU96V6yHh1T/JeoBQ2TXdr43YbYSsCJk0= google.golang.org/grpc/examples v0.0.0-20210424002626-9572fd6faeae/go.mod h1:Ly7ZA/ARzg8fnPU9TyZIxoz33sEUuWX7txiqs8lPTgE= google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8= google.golang.org/protobuf v0.0.0-20200221191635-4d8936d0db64/go.mod h1:kwYJMbMJ01Woi6D6+Kah6886xMZcty6N08ah7+eCXa0= @@ -1254,6 +1254,8 @@ honnef.co/go/tools v0.0.1-2020.1.3/go.mod h1:X/FiERA/W4tHapMX5mGpAtMSVEeEUOyHaw9 honnef.co/go/tools v0.0.1-2020.1.4/go.mod h1:X/FiERA/W4tHapMX5mGpAtMSVEeEUOyHaw9vFzvIQ3k= mellium.im/sasl v0.3.1 h1:wE0LW6g7U83vhvxjC1IY8DnXM+EU095yeo8XClvCdfo= mellium.im/sasl v0.3.1/go.mod h1:xm59PUYpZHhgQ9ZqoJ5QaCqzWMi8IeS49dhp6plPCzw= +nhooyr.io/websocket v1.8.10 h1:mv4p+MnGrLDcPlBoWsvPP7XCzTYMXP9F9eIGoKbgx7Q= +nhooyr.io/websocket v1.8.10/go.mod h1:rN9OFWIUwuxg4fR5tELlYC04bXYowCP9GX47ivo2l+c= rsc.io/binaryregexp v0.2.0/go.mod h1:qTv7/COck+e2FymRvadv62gMdZztPaShugOCi3I+8D8= rsc.io/quote/v3 v3.1.0/go.mod h1:yEA65RcK8LyAZtP9Kv3t0HmxON59tX3rD+tICJqUlj0= rsc.io/sampler v1.3.0/go.mod h1:T1hPZKmBbMNahiBKFy5HrXp6adAjACjK9JXDnKaTXpA= diff --git a/pkg/auth/rights/auth_info.go b/pkg/auth/rights/auth_info.go index f6b59c71ab..1a7158ffd3 100644 --- a/pkg/auth/rights/auth_info.go +++ b/pkg/auth/rights/auth_info.go @@ -51,16 +51,14 @@ func AuthInfo(ctx context.Context) (authInfo *ttnpb.AuthInfoResponse, err error) var errUnauthenticated = errors.DefineUnauthenticated("unauthenticated", "unauthenticated") -// RequireAuthentication confirms if the authentication information within a context contains any rights, if so, +// RequireAuthenticated confirms if the authentication information within a context contains any rights, if so, // the request is considered to be authenticated. -func RequireAuthentication(ctx context.Context) error { - log.FromContext(ctx).Debug("Authenticate request") +func RequireAuthenticated(ctx context.Context) error { authInfo, err := AuthInfo(ctx) if err != nil { log.FromContext(ctx).WithError(err).Debug("Failed to validate authentication information") return errUnauthenticated.WithCause(err) } - if authInfo.GetAccessMethod() == nil && len(authInfo.GetUniversalRights().GetRights()) == 0 { return errUnauthenticated.New() } diff --git a/pkg/console/console.go b/pkg/console/console.go index d012123d35..d1d26e654e 100644 --- a/pkg/console/console.go +++ b/pkg/console/console.go @@ -23,6 +23,7 @@ import ( "github.com/gorilla/csrf" "github.com/gorilla/mux" "go.thethings.network/lorawan-stack/v3/pkg/component" + "go.thethings.network/lorawan-stack/v3/pkg/console/internal/events" "go.thethings.network/lorawan-stack/v3/pkg/web" "go.thethings.network/lorawan-stack/v3/pkg/web/oauthclient" "go.thethings.network/lorawan-stack/v3/pkg/webhandlers" @@ -58,6 +59,7 @@ func New(c *component.Component, config Config) (*Console, error) { } c.RegisterWeb(console) + c.RegisterWeb(events.New(c)) return console, nil } diff --git a/pkg/console/internal/events/events.go b/pkg/console/internal/events/events.go new file mode 100644 index 0000000000..caae64a0a4 --- /dev/null +++ b/pkg/console/internal/events/events.go @@ -0,0 +1,135 @@ +// Copyright © 2023 The Things Network Foundation, The Things Industries B.V. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// Package events contains the internal events APi for the Console. +package events + +import ( + "context" + "net/http" + "sync" + + "github.com/gorilla/mux" + "go.thethings.network/lorawan-stack/v3/pkg/auth/rights" + "go.thethings.network/lorawan-stack/v3/pkg/config" + "go.thethings.network/lorawan-stack/v3/pkg/console/internal/events/eventsmux" + "go.thethings.network/lorawan-stack/v3/pkg/console/internal/events/subscriptions" + "go.thethings.network/lorawan-stack/v3/pkg/events" + "go.thethings.network/lorawan-stack/v3/pkg/log" + "go.thethings.network/lorawan-stack/v3/pkg/ratelimit" + "go.thethings.network/lorawan-stack/v3/pkg/task" + "go.thethings.network/lorawan-stack/v3/pkg/ttnpb" + "go.thethings.network/lorawan-stack/v3/pkg/web" + "go.thethings.network/lorawan-stack/v3/pkg/webhandlers" + "go.thethings.network/lorawan-stack/v3/pkg/webmiddleware" + "nhooyr.io/websocket" +) + +// Component is the interface of the component to the events API handler. +type Component interface { + task.Starter + Context() context.Context + RateLimiter() ratelimit.Interface + GetBaseConfig(context.Context) config.ServiceBase +} + +type eventsHandler struct { + component Component + subscriber events.Subscriber + definedNames map[string]struct{} +} + +var _ web.Registerer = (*eventsHandler)(nil) + +func (h *eventsHandler) RegisterRoutes(server *web.Server) { + router := server.APIRouter().PathPrefix(ttnpb.HTTPAPIPrefix + "/console/internal/events/").Subrouter() + router.Use( + mux.MiddlewareFunc(webmiddleware.Namespace("console/internal/events")), + ratelimit.HTTPMiddleware(h.component.RateLimiter(), "http:console:internal:events"), + mux.MiddlewareFunc(webmiddleware.Metadata("Authorization")), + ) + router.Path("/").HandlerFunc(h.handleEvents).Methods(http.MethodGet) +} + +func (h *eventsHandler) handleEvents(w http.ResponseWriter, r *http.Request) { + ctx := r.Context() + logger := log.FromContext(ctx) + + if err := rights.RequireAuthenticated(ctx); err != nil { + webhandlers.Error(w, r, err) + return + } + + conn, err := websocket.Accept(w, r, &websocket.AcceptOptions{ + InsecureSkipVerify: true, // CORS is not enabled for APIs. + CompressionMode: websocket.CompressionContextTakeover, + }) + if err != nil { + logger.WithError(err).Debug("Failed to accept WebSocket") + return + } + defer conn.Close(websocket.StatusNormalClosure, "main task closed") + + ctx, cancel := context.WithCancelCause(ctx) + defer cancel(nil) + + var wg sync.WaitGroup + defer wg.Wait() + + m := eventsmux.New(func(ctx context.Context, cancel func(error)) subscriptions.Interface { + return subscriptions.New(ctx, cancel, h.subscriber, h.definedNames, h.component) + }) + for name, f := range map[string]func(context.Context) error{ + "console_events_mux": makeMuxTask(m, cancel), + "console_events_read": makeReadTask(conn, m, cancel), + "console_events_write": makeWriteTask(conn, m, cancel), + } { + wg.Add(1) + h.component.StartTask(&task.Config{ + Context: ctx, + ID: name, + Func: f, + Done: wg.Done, + Restart: task.RestartNever, + Backoff: task.DefaultBackoffConfig, + }) + } +} + +// Option configures the events API handler. +type Option func(*eventsHandler) + +// WithSubscriber configures the Subscriber to use for events. +func WithSubscriber(subscriber events.Subscriber) Option { + return func(h *eventsHandler) { + h.subscriber = subscriber + } +} + +// New returns an events API handler for the Console. +func New(c Component, opts ...Option) web.Registerer { + definedNames := make(map[string]struct{}) + for _, def := range events.All().Definitions() { + definedNames[def.Name()] = struct{}{} + } + h := &eventsHandler{ + component: c, + subscriber: events.DefaultPubSub(), + definedNames: definedNames, + } + for _, opt := range opts { + opt(h) + } + return h +} diff --git a/pkg/console/internal/events/eventsmux/mux.go b/pkg/console/internal/events/eventsmux/mux.go new file mode 100644 index 0000000000..e0874f9c51 --- /dev/null +++ b/pkg/console/internal/events/eventsmux/mux.go @@ -0,0 +1,106 @@ +// Copyright © 2023 The Things Network Foundation, The Things Industries B.V. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// Package eventsmux implements the events mux. +package eventsmux + +import ( + "context" + + "go.thethings.network/lorawan-stack/v3/pkg/console/internal/events/protocol" + "go.thethings.network/lorawan-stack/v3/pkg/console/internal/events/subscriptions" + "go.thethings.network/lorawan-stack/v3/pkg/events" + "go.thethings.network/lorawan-stack/v3/pkg/log" +) + +// Interface is the interface for the events mux. +type Interface interface { + // Requests returns the channel for requests. + Requests() chan<- protocol.Request + // Responses returns the channel for responses. + Responses() <-chan protocol.Response + + // Run runs the events mux. + Run(context.Context) error +} + +type mux struct { + createSubs func(context.Context, func(error)) subscriptions.Interface + + requestCh chan protocol.Request + responseCh chan protocol.Response +} + +// Requests implements Interface. +func (m *mux) Requests() chan<- protocol.Request { + return m.requestCh +} + +// Responses implements Interface. +func (m *mux) Responses() <-chan protocol.Response { + return m.responseCh +} + +// Run implements Interface. +func (m *mux) Run(ctx context.Context) (err error) { + ctx, cancel := context.WithCancelCause(ctx) + defer func() { cancel(err) }() + subs := m.createSubs(ctx, cancel) + defer subs.Close() + for { + select { + case <-ctx.Done(): + return ctx.Err() + case req := <-m.requestCh: + var resp protocol.Response + switch req := req.(type) { + case *protocol.SubscribeRequest: + resp = req.Response(subs.Subscribe(req.ID, req.Identifiers, req.After, req.Tail, req.Names)) + case *protocol.UnsubscribeRequest: + resp = req.Response(subs.Unsubscribe(req.ID)) + default: + panic("unreachable") + } + select { + case <-ctx.Done(): + return ctx.Err() + case m.responseCh <- resp: + } + case subEvt := <-subs.SubscriptionEvents(): + evtPB, err := events.Proto(subEvt.Event) + if err != nil { + log.FromContext(ctx).WithError(err).Warn("Failed to convert event to proto") + continue + } + select { + case <-ctx.Done(): + return ctx.Err() + case m.responseCh <- &protocol.PublishResponse{ + ID: subEvt.ID, + Event: evtPB, + }: + } + } + } +} + +// New returns a new Interface. +func New(createSubs func(context.Context, func(error)) subscriptions.Interface) Interface { + return &mux{ + createSubs: createSubs, + + requestCh: make(chan protocol.Request, 1), + responseCh: make(chan protocol.Response, 1), + } +} diff --git a/pkg/console/internal/events/eventsmux/mux_test.go b/pkg/console/internal/events/eventsmux/mux_test.go new file mode 100644 index 0000000000..de220f52fa --- /dev/null +++ b/pkg/console/internal/events/eventsmux/mux_test.go @@ -0,0 +1,315 @@ +// Copyright © 2023 The Things Network Foundation, The Things Industries B.V. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package eventsmux_test + +import ( + "context" + "errors" + "testing" + "time" + + "go.thethings.network/lorawan-stack/v3/pkg/auth/rights" + "go.thethings.network/lorawan-stack/v3/pkg/console/internal/events/eventsmux" + "go.thethings.network/lorawan-stack/v3/pkg/console/internal/events/protocol" + "go.thethings.network/lorawan-stack/v3/pkg/console/internal/events/subscriptions" + "go.thethings.network/lorawan-stack/v3/pkg/events" + "go.thethings.network/lorawan-stack/v3/pkg/ttnpb" + "go.thethings.network/lorawan-stack/v3/pkg/unique" + "go.thethings.network/lorawan-stack/v3/pkg/util/test" + "go.thethings.network/lorawan-stack/v3/pkg/util/test/assertions/should" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" +) + +type subscribeRequest struct { + ID uint64 + Identifiers []*ttnpb.EntityIdentifiers + After *time.Time + Tail uint32 + Names []string + + Response chan<- error +} + +type unsubscribeRequest struct { + ID uint64 + + Response chan<- error +} + +type mockSubscriptions struct { + ctx context.Context + subReqs chan subscribeRequest + unsubReqs chan unsubscribeRequest + evsCh chan *subscriptions.SubscriptionEvent +} + +// Subscribe implements subscriptions.Interface. +func (m *mockSubscriptions) Subscribe( + id uint64, identifiers []*ttnpb.EntityIdentifiers, after *time.Time, tail uint32, names []string, +) error { + ch := make(chan error, 1) + select { + case <-m.ctx.Done(): + return m.ctx.Err() + case m.subReqs <- subscribeRequest{ + ID: id, + Identifiers: identifiers, + After: after, + Tail: tail, + Names: names, + + Response: ch, + }: + select { + case <-m.ctx.Done(): + return m.ctx.Err() + case err := <-ch: + return err + } + } +} + +// Unsubscribe implements subscriptions.Interface. +func (m *mockSubscriptions) Unsubscribe(id uint64) error { + ch := make(chan error, 1) + select { + case <-m.ctx.Done(): + return m.ctx.Err() + case m.unsubReqs <- unsubscribeRequest{ + ID: id, + + Response: ch, + }: + select { + case <-m.ctx.Done(): + return m.ctx.Err() + case err := <-ch: + return err + } + } +} + +// SubscriptionEvents implements subscriptions.Interface. +func (m *mockSubscriptions) SubscriptionEvents() <-chan *subscriptions.SubscriptionEvent { + return m.evsCh +} + +// Close implements subscriptions.Interface. +func (*mockSubscriptions) Close() error { return nil } + +var _ subscriptions.Interface = (*mockSubscriptions)(nil) + +func TestMux(t *testing.T) { // nolint:gocyclo + t.Parallel() + + a, ctx := test.New(t) + + appIDs := &ttnpb.ApplicationIdentifiers{ + ApplicationId: "foo", + } + ctx = rights.NewContext(ctx, &rights.Rights{ + ApplicationRights: *rights.NewMap(map[string]*ttnpb.Rights{ + unique.ID(ctx, appIDs): ttnpb.RightsFrom(ttnpb.Right_RIGHT_ALL), + }), + }) + + subs := &mockSubscriptions{ + ctx: ctx, + subReqs: make(chan subscribeRequest, 1), + unsubReqs: make(chan unsubscribeRequest, 1), + evsCh: make(chan *subscriptions.SubscriptionEvent, 1), + } + m := eventsmux.New(func(context.Context, func(error)) subscriptions.Interface { return subs }) + + go m.Run(ctx) // nolint:errcheck + + now := time.Now() + select { + case <-ctx.Done(): + return + case m.Requests() <- &protocol.SubscribeRequest{ + ID: 42, + Identifiers: []*ttnpb.EntityIdentifiers{ + appIDs.GetEntityIdentifiers(), + }, + After: &now, + Tail: 1, + Names: []string{"foo"}, + }: + } + select { + case <-ctx.Done(): + return + case req := <-subs.subReqs: + a.So(req, should.Resemble, subscribeRequest{ + ID: 42, + Identifiers: []*ttnpb.EntityIdentifiers{ + appIDs.GetEntityIdentifiers(), + }, + After: &now, + Tail: 1, + Names: []string{"foo"}, + + Response: req.Response, + }) + select { + case <-ctx.Done(): + return + case req.Response <- nil: + } + } + select { + case <-ctx.Done(): + return + case resp := <-m.Responses(): + a.So(resp, should.Resemble, &protocol.SubscribeResponse{ + ID: 42, + }) + } + + errAlreadySubscribed := errors.New("already subscribed") + select { + case <-ctx.Done(): + return + case m.Requests() <- &protocol.SubscribeRequest{ + ID: 42, + Identifiers: []*ttnpb.EntityIdentifiers{ + appIDs.GetEntityIdentifiers(), + }, + After: &now, + Tail: 1, + Names: []string{"foo"}, + }: + } + select { + case <-ctx.Done(): + return + case req := <-subs.subReqs: + a.So(req, should.Resemble, subscribeRequest{ + ID: 42, + Identifiers: []*ttnpb.EntityIdentifiers{ + appIDs.GetEntityIdentifiers(), + }, + After: &now, + Tail: 1, + Names: []string{"foo"}, + + Response: req.Response, + }) + select { + case <-ctx.Done(): + return + case req.Response <- errAlreadySubscribed: + } + } + select { + case <-ctx.Done(): + return + case resp := <-m.Responses(): + a.So(resp, should.Resemble, &protocol.ErrorResponse{ + ID: 42, + Error: status.New(codes.Unknown, "already subscribed"), + }) + } + + ev := events.New( + ctx, + "test.evt", + "test event", + events.WithIdentifiers(appIDs), + ) + select { + case <-ctx.Done(): + return + case subs.evsCh <- &subscriptions.SubscriptionEvent{ + ID: 42, + Event: ev, + }: + } + select { + case <-ctx.Done(): + return + case resp := <-m.Responses(): + a.So(resp, should.Resemble, &protocol.PublishResponse{ + ID: 42, + Event: test.Must(events.Proto(ev)), + }) + } + + select { + case <-ctx.Done(): + return + case m.Requests() <- &protocol.UnsubscribeRequest{ + ID: 42, + }: + } + select { + case <-ctx.Done(): + return + case req := <-subs.unsubReqs: + a.So(req, should.Resemble, unsubscribeRequest{ + ID: 42, + + Response: req.Response, + }) + select { + case <-ctx.Done(): + return + case req.Response <- nil: + } + } + select { + case <-ctx.Done(): + return + case resp := <-m.Responses(): + a.So(resp, should.Resemble, &protocol.UnsubscribeResponse{ + ID: 42, + }) + } + + errNotSubscribed := errors.New("not subscribed") + select { + case <-ctx.Done(): + return + case m.Requests() <- &protocol.UnsubscribeRequest{ + ID: 42, + }: + } + select { + case <-ctx.Done(): + return + case req := <-subs.unsubReqs: + a.So(req, should.Resemble, unsubscribeRequest{ + ID: 42, + + Response: req.Response, + }) + select { + case <-ctx.Done(): + return + case req.Response <- errNotSubscribed: + } + } + select { + case <-ctx.Done(): + return + case resp := <-m.Responses(): + a.So(resp, should.Resemble, &protocol.ErrorResponse{ + ID: 42, + Error: status.New(codes.Unknown, "not subscribed"), + }) + } +} diff --git a/pkg/console/internal/events/protocol/PROTOCOL.md b/pkg/console/internal/events/protocol/PROTOCOL.md new file mode 100644 index 0000000000..168dc5980c --- /dev/null +++ b/pkg/console/internal/events/protocol/PROTOCOL.md @@ -0,0 +1,191 @@ +### Internal Events API + +The Console internal events API is designed as an alternative to the `Events.Stream` gRPC API for event stream interactions. It allows multiple subscriptions to be multiplexed over a singular [WebSocket](https://en.wikipedia.org/wiki/WebSocket) connection. + +### Reasoning + +The `Events.Stream` gRPC API is available to HTTP clients via [grpc-gateway](https://github.com/grpc-ecosystem/grpc-gateway). While translated to HTTP, it is visible as a long-polling request whose response body will contain the events as a series of JSON objects. + +This approach is efficient in the context of [HTTP/2](https://en.wikipedia.org/wiki/HTTP/2) which supports multiplexing multiple requests over a singular TCP connection. + +Unfortunately the connection between a browser and The Things Stack is susceptible to proxies. Corporate environments are generally equipped with such proxies, and in their presence the connections are downgraded to HTTP/1.1 semantics. + +In HTTP/1.1 connections can be used for a singular request at a time - it is not possible to multiplex the requests over a singular connection, and only [keep-alive](https://en.wikipedia.org/wiki/HTTP_persistent_connection) connections are available. + +This is problematic as browsers have builtin limits for the number of concurrent connections that singular windows may use. This leads to hard to debug issues which are hardly reproducible. + +But, there is one silver lining - the connection limit _does not apply to WebSocket connections_. The internal events API is designed to deal with this limitation while providing an experience similar to the original `Events.Stream` gRPC API. + +### Endpoint + +The endpoint for the internal events API is `/api/v3/console/internal/events/`. Note that the trailing slash is not optional. + +### Semantics + +The protocol is [full-duplex](https://en.wikipedia.org/wiki/Duplex_(telecommunications)#Full_duplex) - the client side and server side may transmit messages at any time without waiting for a response from the other party. + +The protocol is centered around subscriptions. Subscriptions are identified by an unsigned numerical ID, which is selected by the client. + +A subscription is initiated by the client via a subscription request, which the server confirms either with a subscription response or an error response. + +Following a successful subscription, the server may send at any time publication responses containing the subscription identifier and an event. The subscription identifier can be used on the client side in order to route the event to the appropriate component or view. + +A subscription can be terminated via an unsubscribe request, which the server confirms either with an unsubscribe response or an error response. + +The client can expect that no publication responses will follow an unsubscribe response, but it is recommended that subscription identifiers are not recycled within the same session. + +Error responses can be expected when the request contents are invalid (lack of identifiers, or invalid identifiers), or the caller is not authorized to subscribe to the provided identifiers. It is also invalid to request a subscription with the same identifier as an existing subscription, or to unsubscribe using an identifier which is not subscribed. + +Error response are provided as a debugging facility, and the errors are generally not fixable by the Console user. + +A special case exists for situations in which the caller is no longer authorized to receive any events associated with the provided identifiers _after_ the subscription response has been sent. This can happen if the caller token has expired or the rights have been revoked while the stream is ongoing. In such situations the server will terminate the connection explicitly. + +### Authentication and Authorization + +The authentication for the internal API is similar to other APIs available in The Things Stack. Given a `Bearer` token `t`, the `Authorization` header should contain the value `Bearer t`. + +Upon connecting, no authorization will take place - the endpoint only will check that the provided token is valid (i.e. exists and it is not expired). + +### Message Format + +Both requests and responses sent over the WebSocket connection are JSON encoded. All messages are JSON objects and are required to contain at least the following two fields: + +- `type`: a string whose value must be either `subscribe`, `unsubscribe`, `publish` or `error`. +- `id`: an unsigned integer which identifies the underlying subscription being served. + +Each of the following subsections describes an individual message and the message direction (client to server or server to client). + +#### `SubscribeRequest` [C -> S] + +- `type`: `subscribe` +- `id`: the subscription identifier +- `identifiers`, `tail`, `after`, `names`: semantically the same fields as those of the `StreamEventsRequest` Protobuf message. + +Example: + +```json +{ + "type": "subscribe", + "id": 1, + "tail": 10, + "identifiers": [ + { + "application_ids": { + "application_id": "app1" + } + } + ] +} +``` + +#### `SubscribeResponse` [S -> C] + +- `type`: `subscribe` +- `id`: the subscription identifier + +Example: + +```json +{ + "type": "subscribe", + "id": 1 +} +``` + +#### `UnsubscribeRequest` [C -> S] + +- `type`: `unsubscribe` +- `id`: the subscription identifier + +Example: + +```json +{ + "type": "unsubscribe", + "id": 1 +} +``` + +#### `UnsubscribeResponse` [S -> C] + +- `type`: `unsubscribe` +- `id`: the subscription identifier + +Example: + +```json +{ + "type": "unsubscribe", + "id": 1 +} +``` + +#### `PublishResponse` [S -> C] + +- `type`: `publish` +- `id`: the subscription identifier +- `event`: an `Event` Protobuf message encoded as a JSON object + +Example: + +```json +{ + "type": "publish", + "id": 1, + "event": { + "name": "as.up.data.forward", + "time": "2023-10-26T16:27:14.103854Z", + "identifiers": [ + { + "device_ids": { + "device_id": "eui-0000000000000003", + "application_ids": { + "application_id": "app1" + } + } + } + ], + "context": { + "tenant-id": "Cgl0aGV0aGluZ3M=" + }, + "visibility": { + "rights": [ + "RIGHT_APPLICATION_TRAFFIC_READ" + ] + }, + "unique_id": "01HDPCZDSQ358JMHD4SC2BQAB8" + } +} +``` + +#### ErrorResponse [S -> C] + +- `type`: `error` +- `id`: the subscription identifier +- `error`: a `Status` Protobuf message encoded as a JSON object + +Example: + +```json +{ + "type": "error", + "id": 1, + "error": { + "code": 6, + "message": "error:pkg/console/internal/events/subscriptions:already_subscribed (already subscribed with ID `1`)", + "details": [ + { + "@type": "type.googleapis.com/ttn.lorawan.v3.ErrorDetails", + "namespace": "pkg/console/internal/events/subscriptions", + "name": "already_subscribed", + "message_format": "already subscribed with ID `{id}`", + "attributes": { + "id": "1" + }, + "correlation_id": "5da004b9f61f479aafe5bbcae4551e63", + "code": 6 + } + ] + } +} +``` diff --git a/pkg/console/internal/events/protocol/protocol.go b/pkg/console/internal/events/protocol/protocol.go new file mode 100644 index 0000000000..3348a8dc82 --- /dev/null +++ b/pkg/console/internal/events/protocol/protocol.go @@ -0,0 +1,331 @@ +// Copyright © 2023 The Things Network Foundation, The Things Industries B.V. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// Package protocol implements the protocol for the events package. +package protocol + +import ( + "encoding/json" + "time" + + "go.thethings.network/lorawan-stack/v3/pkg/errors" + "go.thethings.network/lorawan-stack/v3/pkg/jsonpb" + "go.thethings.network/lorawan-stack/v3/pkg/ttnpb" + statuspb "google.golang.org/genproto/googleapis/rpc/status" + "google.golang.org/grpc/status" +) + +var ( + errMessageType = errors.DefineInvalidArgument("message_type", "invalid message type `{type}`") + + _ json.Marshaler = (*ttnpb.EntityIdentifiers)(nil) + _ json.Unmarshaler = (*ttnpb.EntityIdentifiers)(nil) + + _ json.Marshaler = (*ttnpb.Event)(nil) + _ json.Unmarshaler = (*ttnpb.Event)(nil) +) + +// MessageType is the type of a message. +type MessageType int + +const ( + // MessageTypeSubscribe is the type of a subscribe message. + MessageTypeSubscribe MessageType = iota + // MessageTypeUnsubscribe is the type of an unsubscribe message. + MessageTypeUnsubscribe + // MessageTypePublish is the type of a publish message. + MessageTypePublish + // MessageTypeError is the type of an error message. + MessageTypeError +) + +// MarshalJSON implements json.Marshaler. +func (m MessageType) MarshalJSON() ([]byte, error) { + switch m { + case MessageTypeSubscribe: + return []byte(`"subscribe"`), nil + case MessageTypeUnsubscribe: + return []byte(`"unsubscribe"`), nil + case MessageTypePublish: + return []byte(`"publish"`), nil + case MessageTypeError: + return []byte(`"error"`), nil + default: + return nil, errMessageType.WithAttributes("type", m) + } +} + +// UnmarshalJSON implements json.Unmarshaler. +func (m *MessageType) UnmarshalJSON(data []byte) error { + switch string(data) { + case `"subscribe"`: + *m = MessageTypeSubscribe + case `"unsubscribe"`: + *m = MessageTypeUnsubscribe + case `"publish"`: + *m = MessageTypePublish + case `"error"`: + *m = MessageTypeError + default: + return errMessageType.WithAttributes("type", string(data)) + } + return nil +} + +// Request is a request message. +type Request interface { + _requestMessage() +} + +// Response is a response message. +type Response interface { + _responseMessage() +} + +// SubscribeRequest is the request to subscribe to events. +type SubscribeRequest struct { + ID uint64 `json:"id"` + Identifiers []*ttnpb.EntityIdentifiers `json:"identifiers"` + Tail uint32 `json:"tail"` + After *time.Time `json:"after"` + Names []string `json:"names"` +} + +func (SubscribeRequest) _requestMessage() {} + +// Response builds a response to the request. +func (m SubscribeRequest) Response(err error) Response { + if err != nil { + return newErrorResponse(m.ID, err) + } + return &SubscribeResponse{ + ID: m.ID, + } +} + +// MarshalJSON implements json.Marshaler. +func (m SubscribeRequest) MarshalJSON() ([]byte, error) { + type alias SubscribeRequest + return jsonpb.TTN().Marshal(struct { + Type MessageType `json:"type"` + alias + }{ + Type: MessageTypeSubscribe, + alias: alias(m), + }) +} + +// SubscribeResponse is the response to a subscribe request. +type SubscribeResponse struct { + ID uint64 `json:"id"` +} + +func (SubscribeResponse) _responseMessage() {} + +// MarshalJSON implements json.Marshaler. +func (m SubscribeResponse) MarshalJSON() ([]byte, error) { + type alias SubscribeResponse + return jsonpb.TTN().Marshal(struct { + Type MessageType `json:"type"` + alias + }{ + Type: MessageTypeSubscribe, + alias: alias(m), + }) +} + +// UnsubscribeRequest is the request to unsubscribe from events. +type UnsubscribeRequest struct { + ID uint64 `json:"id"` +} + +func (UnsubscribeRequest) _requestMessage() {} + +// MarshalJSON implements json.Marshaler. +func (m UnsubscribeRequest) MarshalJSON() ([]byte, error) { + type alias UnsubscribeRequest + return jsonpb.TTN().Marshal(struct { + Type MessageType `json:"type"` + alias + }{ + Type: MessageTypeUnsubscribe, + alias: alias(m), + }) +} + +// UnsubscribeResponse is the response to an unsubscribe request. +type UnsubscribeResponse struct { + ID uint64 `json:"id"` +} + +func (UnsubscribeResponse) _responseMessage() {} + +// Response builds a response to the request. +func (m UnsubscribeRequest) Response(err error) Response { + if err != nil { + return newErrorResponse(m.ID, err) + } + return &UnsubscribeResponse{ + ID: m.ID, + } +} + +// MarshalJSON implements json.Marshaler. +func (m UnsubscribeResponse) MarshalJSON() ([]byte, error) { + type alias UnsubscribeResponse + return jsonpb.TTN().Marshal(struct { + Type MessageType `json:"type"` + alias + }{ + Type: MessageTypeUnsubscribe, + alias: alias(m), + }) +} + +// PublishResponse is the request to publish an event. +type PublishResponse struct { + ID uint64 `json:"id"` + Event *ttnpb.Event `json:"event"` +} + +func (PublishResponse) _responseMessage() {} + +// MarshalJSON implements json.Marshaler. +func (m PublishResponse) MarshalJSON() ([]byte, error) { + type alias PublishResponse + return jsonpb.TTN().Marshal(struct { + Type MessageType `json:"type"` + alias + }{ + Type: MessageTypePublish, + alias: alias(m), + }) +} + +// ErrorResponse is the response to an error. +type ErrorResponse struct { + ID uint64 + Error *status.Status +} + +func (ErrorResponse) _responseMessage() {} + +// statusAlias is an alias of status.Status which supports JSON marshaling. +type statusAlias statuspb.Status + +// MarshalJSON implements json.Marshaler. +func (s *statusAlias) MarshalJSON() ([]byte, error) { + return jsonpb.TTN().Marshal((*statuspb.Status)(s)) +} + +// UnmarshalJSON implements json.Unmarshaler. +func (s *statusAlias) UnmarshalJSON(data []byte) error { + return jsonpb.TTN().Unmarshal(data, (*statuspb.Status)(s)) +} + +// MarshalJSON implements json.Marshaler. +func (m ErrorResponse) MarshalJSON() ([]byte, error) { + return jsonpb.TTN().Marshal(struct { + Type MessageType `json:"type"` + ID uint64 `json:"id"` + Error *statusAlias `json:"error"` + }{ + Type: MessageTypeError, + ID: m.ID, + Error: (*statusAlias)(m.Error.Proto()), + }) +} + +func newErrorResponse(id uint64, err error) Response { + return &ErrorResponse{ + ID: id, + Error: status.Convert(err), + } +} + +// UnmarshalJSON implements json.Unmarshaler. +func (m *ErrorResponse) UnmarshalJSON(data []byte) error { + var alias struct { + ID uint64 `json:"id"` + Error *statusAlias `json:"error"` + } + if err := jsonpb.TTN().Unmarshal(data, &alias); err != nil { + return err + } + m.ID = alias.ID + m.Error = status.FromProto((*statuspb.Status)(alias.Error)) + return nil +} + +// RequestWrapper wraps a request to be sent over the websocket. +type RequestWrapper struct { + Contents Request +} + +// UnmarshalJSON implements json.Unmarshaler. +func (m *RequestWrapper) UnmarshalJSON(data []byte) error { + var contents struct { + Type MessageType `json:"type"` + } + if err := jsonpb.TTN().Unmarshal(data, &contents); err != nil { + return err + } + switch contents.Type { + case MessageTypeSubscribe: + m.Contents = &SubscribeRequest{} + case MessageTypeUnsubscribe: + m.Contents = &UnsubscribeRequest{} + default: + return errMessageType.WithAttributes("type", contents.Type) + } + return jsonpb.TTN().Unmarshal(data, m.Contents) +} + +// MarshalJSON implements json.Marshaler. +func (m RequestWrapper) MarshalJSON() ([]byte, error) { + return json.Marshal(m.Contents) +} + +// ResponseWrapper wraps a response to be sent over the websocket. +type ResponseWrapper struct { + Contents Response +} + +// UnmarshalJSON implements json.Unmarshaler. +func (m *ResponseWrapper) UnmarshalJSON(data []byte) error { + var contents struct { + Type MessageType `json:"type"` + } + if err := jsonpb.TTN().Unmarshal(data, &contents); err != nil { + return err + } + switch contents.Type { + case MessageTypeSubscribe: + m.Contents = &SubscribeResponse{} + case MessageTypeUnsubscribe: + m.Contents = &UnsubscribeResponse{} + case MessageTypePublish: + m.Contents = &PublishResponse{} + case MessageTypeError: + m.Contents = &ErrorResponse{} + default: + return errMessageType.WithAttributes("type", contents.Type) + } + return jsonpb.TTN().Unmarshal(data, m.Contents) +} + +// MarshalJSON implements json.Marshaler. +func (m ResponseWrapper) MarshalJSON() ([]byte, error) { + return json.Marshal(m.Contents) +} diff --git a/pkg/console/internal/events/protocol/protocol_test.go b/pkg/console/internal/events/protocol/protocol_test.go new file mode 100644 index 0000000000..bec85bc950 --- /dev/null +++ b/pkg/console/internal/events/protocol/protocol_test.go @@ -0,0 +1,220 @@ +// Copyright © 2023 The Things Network Foundation, The Things Industries B.V. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package protocol_test + +import ( + "encoding/json" + "fmt" + "testing" + "time" + + "github.com/smarty/assertions" + "go.thethings.network/lorawan-stack/v3/pkg/console/internal/events/protocol" + "go.thethings.network/lorawan-stack/v3/pkg/errors" + "go.thethings.network/lorawan-stack/v3/pkg/ttnpb" + "go.thethings.network/lorawan-stack/v3/pkg/util/test" + "go.thethings.network/lorawan-stack/v3/pkg/util/test/assertions/should" + "google.golang.org/grpc/status" + "google.golang.org/protobuf/types/known/anypb" + "google.golang.org/protobuf/types/known/timestamppb" +) + +func TestMarshal(t *testing.T) { + t.Parallel() + + a := assertions.New(t) + + b, err := json.Marshal(protocol.MessageTypePublish) + if a.So(err, should.BeNil) { + a.So(b, should.Resemble, []byte(`"publish"`)) + } + var tp protocol.MessageType + err = json.Unmarshal([]byte(`"publish"`), &tp) + if a.So(err, should.BeNil) { + a.So(tp, should.Equal, protocol.MessageTypePublish) + } + + b, err = json.Marshal(&protocol.SubscribeRequest{ + ID: 0x42, + Identifiers: []*ttnpb.EntityIdentifiers{ + (&ttnpb.ApplicationIdentifiers{ApplicationId: "foo"}).GetEntityIdentifiers(), + (&ttnpb.ClientIdentifiers{ClientId: "bar"}).GetEntityIdentifiers(), + }, + Tail: 10, + After: timePtr(time.UnixMilli(123456789012).UTC()), + Names: []string{"foo", "bar"}, + }) + if a.So(err, should.BeNil) { + a.So( + b, + should.Resemble, + []byte(`{"type":"subscribe","id":66,"identifiers":[{"application_ids":{"application_id":"foo"}},{"client_ids":{"client_id":"bar"}}],"tail":10,"after":"1973-11-29T21:33:09.012Z","names":["foo","bar"]}`), // nolint:lll + ) + } + var subReq protocol.SubscribeRequest + err = json.Unmarshal( + []byte(`{"type":"subscribe","id":66,"identifiers":[{"application_ids":{"application_id":"foo"}},{"client_ids":{"client_id":"bar"}}],"tail":10,"after":"1973-11-29T21:33:09.012Z","names":["foo","bar"]}`), // nolint:lll + &subReq, + ) + if a.So(err, should.BeNil) { + a.So(subReq, should.Resemble, protocol.SubscribeRequest{ + ID: 0x42, + Identifiers: []*ttnpb.EntityIdentifiers{ + (&ttnpb.ApplicationIdentifiers{ApplicationId: "foo"}).GetEntityIdentifiers(), + (&ttnpb.ClientIdentifiers{ClientId: "bar"}).GetEntityIdentifiers(), + }, + Tail: 10, + After: timePtr(time.UnixMilli(123456789012).UTC()), + Names: []string{"foo", "bar"}, + }) + } + + b, err = json.Marshal(&protocol.SubscribeResponse{ + ID: 0x42, + }) + if a.So(err, should.BeNil) { + a.So(b, should.Resemble, []byte(`{"type":"subscribe","id":66}`)) + } + var subResp protocol.SubscribeResponse + err = json.Unmarshal([]byte(`{"type":"subscribe","id":66}`), &subResp) + if a.So(err, should.BeNil) { + a.So(subResp, should.Resemble, protocol.SubscribeResponse{ID: 0x42}) + } + + b, err = json.Marshal(&protocol.UnsubscribeRequest{ + ID: 0x42, + }) + if a.So(err, should.BeNil) { + a.So(b, should.Resemble, []byte(`{"type":"unsubscribe","id":66}`)) + } + var unsubReq protocol.UnsubscribeRequest + err = json.Unmarshal([]byte(`{"type":"unsubscribe","id":66}`), &unsubReq) + if a.So(err, should.BeNil) { + a.So(unsubReq, should.Resemble, protocol.UnsubscribeRequest{ID: 0x42}) + } + + b, err = json.Marshal(&protocol.UnsubscribeResponse{ + ID: 0x42, + }) + if a.So(err, should.BeNil) { + a.So(b, should.Resemble, []byte(`{"type":"unsubscribe","id":66}`)) + } + var unsubResp protocol.UnsubscribeResponse + err = json.Unmarshal([]byte(`{"type":"unsubscribe","id":66}`), &unsubResp) + if a.So(err, should.BeNil) { + a.So(unsubResp, should.Resemble, protocol.UnsubscribeResponse{ID: 0x42}) + } + + b, err = json.Marshal(&protocol.PublishResponse{ + ID: 0x42, + Event: &ttnpb.Event{ + Name: "foo", + Time: timestamppb.New(time.UnixMilli(123456789012).UTC()), + Identifiers: []*ttnpb.EntityIdentifiers{ + (&ttnpb.ApplicationIdentifiers{ApplicationId: "foo"}).GetEntityIdentifiers(), + }, + Data: test.Must(anypb.New(&ttnpb.ApplicationUp{ + Up: &ttnpb.ApplicationUp_UplinkMessage{ + UplinkMessage: &ttnpb.ApplicationUplink{}, + }, + })), + CorrelationIds: []string{"foo", "bar"}, + }, + }) + if a.So(err, should.BeNil) { + a.So( + b, + should.Resemble, + []byte(`{"type":"publish","id":66,"event":{"name":"foo","time":"1973-11-29T21:33:09.012Z","identifiers":[{"application_ids":{"application_id":"foo"}}],"data":{"@type":"type.googleapis.com/ttn.lorawan.v3.ApplicationUp","uplink_message":{}},"correlation_ids":["foo","bar"]}}`), // nolint:lll + ) + } + var pubResp protocol.PublishResponse + err = json.Unmarshal( + []byte(`{"type":"publish","id":66,"event":{"name":"foo","time":"1973-11-29T21:33:09.012Z","identifiers":[{"application_ids":{"application_id":"foo"}}],"data":{"@type":"type.googleapis.com/ttn.lorawan.v3.ApplicationUp","uplink_message":{}},"correlation_ids":["foo","bar"]}}`), // nolint:lll + &pubResp, + ) + if a.So(err, should.BeNil) { + a.So(pubResp, should.Resemble, protocol.PublishResponse{ + ID: 0x42, + Event: &ttnpb.Event{ + Name: "foo", + Time: timestamppb.New(time.UnixMilli(123456789012).UTC()), + Identifiers: []*ttnpb.EntityIdentifiers{ + (&ttnpb.ApplicationIdentifiers{ApplicationId: "foo"}).GetEntityIdentifiers(), + }, + Data: test.Must(anypb.New(&ttnpb.ApplicationUp{ + Up: &ttnpb.ApplicationUp_UplinkMessage{ + UplinkMessage: &ttnpb.ApplicationUplink{}, + }, + })), + CorrelationIds: []string{"foo", "bar"}, + }, + }) + } + + errDefinition := errors.DefineInvalidArgument("bad_argument", "bad argument `{argument}`") + errInstance := errDefinition.WithAttributes("argument", "foo") + errStatus := status.Convert(errInstance) + errJSON := test.Must(json.Marshal(errInstance)) + b, err = json.Marshal(&protocol.ErrorResponse{ + ID: 0x42, + Error: errStatus, + }) + if a.So(err, should.BeNil) { + a.So(b, should.Resemble, []byte(fmt.Sprintf(`{"type":"error","id":66,"error":%v}`, string(errJSON)))) // nolint:lll + } + var errResp protocol.ErrorResponse + err = json.Unmarshal([]byte(fmt.Sprintf(`{"type":"error","id":66,"error":%v}`, string(errJSON))), &errResp) // nolint:lll + if a.So(err, should.BeNil) { + a.So(errResp, should.Resemble, protocol.ErrorResponse{ + ID: 0x42, + Error: errStatus, + }) + } + + var reqWrapper protocol.RequestWrapper + err = json.Unmarshal( + []byte(`{"type":"subscribe","id":66,"identifiers":[{"application_ids":{"application_id":"foo"}},{"client_ids":{"client_id":"bar"}}],"tail":10,"after":"1973-11-29T21:33:09.012Z","names":["foo","bar"]}`), // nolint:lll + &reqWrapper, + ) + if a.So(err, should.BeNil) { + a.So(reqWrapper, should.Resemble, protocol.RequestWrapper{ + Contents: &protocol.SubscribeRequest{ + ID: 0x42, + Identifiers: []*ttnpb.EntityIdentifiers{ + (&ttnpb.ApplicationIdentifiers{ApplicationId: "foo"}).GetEntityIdentifiers(), + (&ttnpb.ClientIdentifiers{ClientId: "bar"}).GetEntityIdentifiers(), + }, + Tail: 10, + After: timePtr(time.UnixMilli(123456789012).UTC()), + Names: []string{"foo", "bar"}, + }, + }) + } + + var respWrapper protocol.ResponseWrapper + err = json.Unmarshal([]byte(`{"type":"subscribe","id":66}`), &respWrapper) + if a.So(err, should.BeNil) { + a.So(respWrapper, should.Resemble, protocol.ResponseWrapper{ + Contents: &protocol.SubscribeResponse{ + ID: 0x42, + }, + }) + } +} + +func timePtr(t time.Time) *time.Time { + return &t +} diff --git a/pkg/console/internal/events/subscriptions/subscriptions.go b/pkg/console/internal/events/subscriptions/subscriptions.go new file mode 100644 index 0000000000..0a099fffdd --- /dev/null +++ b/pkg/console/internal/events/subscriptions/subscriptions.go @@ -0,0 +1,255 @@ +// Copyright © 2023 The Things Network Foundation, The Things Industries B.V. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// Package subscriptions implements the events mux subscriptions. +package subscriptions + +import ( + "context" + "sync" + "time" + + "go.thethings.network/lorawan-stack/v3/pkg/auth/rights" + "go.thethings.network/lorawan-stack/v3/pkg/auth/rights/rightsutil" + "go.thethings.network/lorawan-stack/v3/pkg/errors" + "go.thethings.network/lorawan-stack/v3/pkg/events" + "go.thethings.network/lorawan-stack/v3/pkg/log" + "go.thethings.network/lorawan-stack/v3/pkg/task" + "go.thethings.network/lorawan-stack/v3/pkg/ttnpb" +) + +// SubscriptionEvent wraps an events.Event with a subscription ID. +type SubscriptionEvent struct { + ID uint64 + Event events.Event +} + +// Interface is the interface for the events mux subscriptions. +type Interface interface { + // Subscribe subscribes to events. + Subscribe( + id uint64, identifiers []*ttnpb.EntityIdentifiers, after *time.Time, tail uint32, names []string, + ) error + // Unsubscribe unsubscribe to events. + Unsubscribe(id uint64) error + + // SubscriptionEvents provides the events for the underlying subscriptions. + SubscriptionEvents() <-chan *SubscriptionEvent + + // Close closes all of the underlying subscriptions and waits for the background tasks to finish. + Close() error +} + +type subscription struct { + id uint64 + cancel func(error) + wg sync.WaitGroup + cancelParent func(error) + inputCh <-chan events.Event + outputCh chan<- *SubscriptionEvent +} + +func (s *subscription) run(ctx context.Context) (err error) { + defer func() { + select { + case <-ctx.Done(): + default: + s.cancelParent(err) + } + }() + for { + select { + case <-ctx.Done(): + return ctx.Err() + case evt := <-s.inputCh: + isVisible, err := rightsutil.EventIsVisible(ctx, evt) + if err != nil { + if err := rights.RequireAny(ctx, evt.Identifiers()...); err != nil { + return err + } + log.FromContext(ctx).WithError(err).Warn("Failed to check event visibility") + continue + } + if !isVisible { + continue + } + select { + case <-ctx.Done(): + return ctx.Err() + case s.outputCh <- &SubscriptionEvent{ + ID: s.id, + Event: evt, + }: + } + } + } +} + +type subscriptions struct { + ctx context.Context + cancel func(error) + subscriber events.Subscriber + definedNames map[string]struct{} + taskStarter task.Starter + + wg sync.WaitGroup + ch chan *SubscriptionEvent + subs map[uint64]*subscription +} + +var _ Interface = (*subscriptions)(nil) + +// Close implements Interface. +func (s *subscriptions) Close() error { + for id, sub := range s.subs { + delete(s.subs, id) + sub.cancel(nil) + sub.wg.Wait() + } + s.wg.Wait() + return nil +} + +// SubscriptionEvents implements Interface. +func (s *subscriptions) SubscriptionEvents() <-chan *SubscriptionEvent { return s.ch } + +var ( + errAlreadySubscribed = errors.DefineAlreadyExists("already_subscribed", "already subscribed with ID `{id}`") + errNoIdentifiers = errors.DefineInvalidArgument("no_identifiers", "no identifiers") +) + +// Subscribe implements Interface. +func (s *subscriptions) Subscribe( + id uint64, identifiers []*ttnpb.EntityIdentifiers, after *time.Time, tail uint32, names []string, +) (err error) { + if err := s.validateSubscribe(id, identifiers); err != nil { + return err + } + names, err = events.NamesFromPatterns(s.definedNames, names) + if err != nil { + return err + } + ch := make(chan events.Event, channelSize(tail)) + ctx, cancel := context.WithCancelCause(s.ctx) + defer func() { + if err != nil { + cancel(err) + } + }() + if store, hasStore := s.subscriber.(events.Store); hasStore { + if after == nil && tail == 0 { + now := time.Now() + after = &now + } + f := func(ctx context.Context) (err error) { + defer func() { + select { + case <-ctx.Done(): + default: + s.cancel(err) + } + }() + return store.SubscribeWithHistory(ctx, names, identifiers, after, int(tail), events.Channel(ch)) + } + s.wg.Add(1) + s.taskStarter.StartTask(&task.Config{ + Context: ctx, + ID: "console_events_subscribe", + Func: f, + Done: s.wg.Done, + Restart: task.RestartNever, + Backoff: task.DefaultBackoffConfig, + }) + } else { + if err := s.subscriber.Subscribe(ctx, names, identifiers, events.Channel(ch)); err != nil { + return err + } + } + sub := &subscription{ + id: id, + cancel: cancel, + cancelParent: s.cancel, + inputCh: ch, + outputCh: s.ch, + } + sub.wg.Add(1) + s.taskStarter.StartTask(&task.Config{ + Context: ctx, + ID: "console_events_filter", + Func: sub.run, + Done: sub.wg.Done, + Restart: task.RestartNever, + Backoff: task.DefaultBackoffConfig, + }) + s.subs[id] = sub + return nil +} + +var errNotSubscribed = errors.DefineNotFound("not_subscribed", "not subscribed with ID `{id}`") + +// Unsubscribe implements Interface. +func (s *subscriptions) Unsubscribe(id uint64) error { + sub, ok := s.subs[id] + if !ok { + return errNotSubscribed.WithAttributes("id", id) + } + delete(s.subs, id) + sub.cancel(nil) + sub.wg.Wait() + return nil +} + +// New returns a new Interface. +func New( + ctx context.Context, + cancel func(error), + subscriber events.Subscriber, + definedNames map[string]struct{}, + taskStarter task.Starter, +) Interface { + return &subscriptions{ + ctx: ctx, + cancel: cancel, + subscriber: subscriber, + definedNames: definedNames, + taskStarter: taskStarter, + ch: make(chan *SubscriptionEvent, 1), + subs: make(map[uint64]*subscription), + } +} + +func (s *subscriptions) validateSubscribe(id uint64, identifiers []*ttnpb.EntityIdentifiers) error { + if _, ok := s.subs[id]; ok { + return errAlreadySubscribed.WithAttributes("id", id) + } + if len(identifiers) == 0 { + return errNoIdentifiers.New() + } + for _, ids := range identifiers { + if err := ids.ValidateFields(); err != nil { + return err + } + } + return rights.RequireAny(s.ctx, identifiers...) +} + +func channelSize(n uint32) uint32 { + if n < 8 { + n = 8 + } + if n > 1024 { + n = 1024 + } + return n +} diff --git a/pkg/console/internal/events/subscriptions/subscriptions_test.go b/pkg/console/internal/events/subscriptions/subscriptions_test.go new file mode 100644 index 0000000000..10aaf738bb --- /dev/null +++ b/pkg/console/internal/events/subscriptions/subscriptions_test.go @@ -0,0 +1,303 @@ +// Copyright © 2023 The Things Network Foundation, The Things Industries B.V. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package subscriptions_test + +import ( + "context" + "sync" + "testing" + "time" + + "go.thethings.network/lorawan-stack/v3/pkg/auth/rights" + "go.thethings.network/lorawan-stack/v3/pkg/console/internal/events/subscriptions" + "go.thethings.network/lorawan-stack/v3/pkg/events" + "go.thethings.network/lorawan-stack/v3/pkg/task" + "go.thethings.network/lorawan-stack/v3/pkg/ttnpb" + "go.thethings.network/lorawan-stack/v3/pkg/unique" + "go.thethings.network/lorawan-stack/v3/pkg/util/test" + "go.thethings.network/lorawan-stack/v3/pkg/util/test/assertions/should" +) + +type subscribeRequest struct { + Context context.Context + Names []string + Identifiers []*ttnpb.EntityIdentifiers + After *time.Time + Tail int + Handler events.Handler + + Response chan<- error +} + +type mockSubscriber struct { + subReqs chan subscribeRequest +} + +func (m *mockSubscriber) subscribeRequests() <-chan subscribeRequest { return m.subReqs } + +// Subscribe implements events.Subscriber. +func (m *mockSubscriber) Subscribe( + ctx context.Context, names []string, identifiers []*ttnpb.EntityIdentifiers, hdl events.Handler, +) error { + ch := make(chan error, 1) + select { + case <-ctx.Done(): + return ctx.Err() + case m.subReqs <- subscribeRequest{ + Context: ctx, + Names: names, + Identifiers: identifiers, + Handler: hdl, + + Response: ch, + }: + select { + case <-ctx.Done(): + return ctx.Err() + case err := <-ch: + return err + } + } +} + +var _ events.Subscriber = (*mockSubscriber)(nil) + +type mockPubSubStore struct { + subReqs chan subscribeRequest +} + +func (m *mockPubSubStore) subscribeRequests() <-chan subscribeRequest { return m.subReqs } + +func (*mockPubSubStore) historical() {} + +// Publish implements events.Store. +func (*mockPubSubStore) Publish(...events.Event) { panic("not implemented") } + +// Subscribe implements events.Store. +func (*mockPubSubStore) Subscribe(context.Context, []string, []*ttnpb.EntityIdentifiers, events.Handler) error { + panic("not implemented") +} + +// FindRelated implements events.Store. +func (*mockPubSubStore) FindRelated(context.Context, string) ([]events.Event, error) { + panic("not implemented") +} + +// FetchHistory implements events.Store. +func (*mockPubSubStore) FetchHistory( + context.Context, []string, []*ttnpb.EntityIdentifiers, *time.Time, int, +) ([]events.Event, error) { + panic("not implemented") +} + +// SubscribeWithHistory implements events.Store. +func (m *mockPubSubStore) SubscribeWithHistory( + ctx context.Context, names []string, ids []*ttnpb.EntityIdentifiers, after *time.Time, tail int, hdl events.Handler, +) error { + ch := make(chan error, 1) + select { + case <-ctx.Done(): + return ctx.Err() + case m.subReqs <- subscribeRequest{ + Context: ctx, + Names: names, + Identifiers: ids, + After: after, + Tail: tail, + Handler: hdl, + + Response: ch, + }: + select { + case <-ctx.Done(): + return ctx.Err() + case err := <-ch: + return err + } + } +} + +var _ events.Store = (*mockPubSubStore)(nil) + +func runTestSubscriptions( + t *testing.T, + subscriber interface { + events.Subscriber + subscribeRequests() <-chan subscribeRequest + }, +) { + t.Helper() + + _, historical := subscriber.(interface{ historical() }) + + a, ctx := test.New(t) + ctx, cancel := context.WithCancelCause(ctx) + defer cancel(nil) + + timeout := test.Delay << 3 + app1IDs, app2IDs := &ttnpb.ApplicationIdentifiers{ + ApplicationId: "foo", + }, &ttnpb.ApplicationIdentifiers{ + ApplicationId: "bar", + } + ctx = rights.NewContext(ctx, &rights.Rights{ + ApplicationRights: *rights.NewMap(map[string]*ttnpb.Rights{ + unique.ID(ctx, app1IDs): ttnpb.RightsFrom(ttnpb.Right_RIGHT_APPLICATION_ALL), + }), + }) + + sub := subscriptions.New( + ctx, + cancel, + subscriber, + map[string]struct{}{ + "test": {}, + }, + task.StartTaskFunc(task.DefaultStartTask), + ) + defer sub.Close() + + select { + case <-ctx.Done(): + return + case <-time.After(timeout): + case req := <-subscriber.subscribeRequests(): + t.Fatal("Unexpected subscribe request", req) + } + + now := time.Now() + + var wg sync.WaitGroup + defer wg.Wait() + + wg.Add(1) + go func() { + defer wg.Done() + err := sub.Subscribe( + 1, + []*ttnpb.EntityIdentifiers{ + app1IDs.GetEntityIdentifiers(), + }, + &now, + 10, + []string{"test"}, + ) + a.So(err, should.BeNil) + }() + var handler events.Handler + select { + case <-ctx.Done(): + return + case req := <-subscriber.subscribeRequests(): + a.So(req.Context, should.HaveParentContextOrEqual, ctx) + a.So(req.Names, should.Resemble, []string{"test"}) + a.So(req.Identifiers, should.Resemble, []*ttnpb.EntityIdentifiers{ + app1IDs.GetEntityIdentifiers(), + }) + if historical { + a.So(req.After, should.Resemble, &now) + a.So(req.Tail, should.Equal, 10) + } + a.So(req.Handler, should.NotBeNil) + if !historical { + select { + case <-ctx.Done(): + return + case req.Response <- nil: + } + } + handler = req.Handler + } + wg.Wait() + + err := sub.Subscribe( + 1, + []*ttnpb.EntityIdentifiers{ + app1IDs.GetEntityIdentifiers(), + }, + &now, + 10, + []string{"test"}, + ) + a.So(err, should.NotBeNil) + + evt := events.New( + ctx, + "test", + "test", + events.WithIdentifiers(app2IDs), + events.WithVisibility(ttnpb.Right_RIGHT_APPLICATION_DEVICES_READ), + ) + handler.Notify(evt) + + select { + case <-ctx.Done(): + return + case <-time.After(timeout): + case subEvt := <-sub.SubscriptionEvents(): + t.Fatal("Unexpected subscription event", subEvt) + } + + evt = events.New( + ctx, + "test", + "test", + events.WithIdentifiers(app1IDs), + events.WithVisibility(ttnpb.Right_RIGHT_APPLICATION_DEVICES_READ), + ) + handler.Notify(evt) + + select { + case <-ctx.Done(): + return + case subEvt := <-sub.SubscriptionEvents(): + a.So(subEvt.ID, should.Equal, 1) + a.So(subEvt.Event, should.ResembleEvent, evt) + } + + err = sub.Unsubscribe(1) + a.So(err, should.BeNil) + + err = sub.Unsubscribe(1) + a.So(err, should.NotBeNil) + + select { + case <-ctx.Done(): + return + case <-time.After(timeout): + case subEvt := <-sub.SubscriptionEvents(): + t.Fatal("Unexpected subscription event", subEvt) + } +} + +func TestSubscriptions(t *testing.T) { + t.Parallel() + runTestSubscriptions( + t, + &mockSubscriber{ + subReqs: make(chan subscribeRequest, 1), + }, + ) +} + +func TestStoreSubscriptions(t *testing.T) { + t.Parallel() + runTestSubscriptions( + t, + &mockPubSubStore{ + subReqs: make(chan subscribeRequest, 1), + }, + ) +} diff --git a/pkg/console/internal/events/tasks.go b/pkg/console/internal/events/tasks.go new file mode 100644 index 0000000000..1130b7297b --- /dev/null +++ b/pkg/console/internal/events/tasks.go @@ -0,0 +1,76 @@ +// Copyright © 2023 The Things Network Foundation, The Things Industries B.V. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package events + +import ( + "context" + "errors" + "io" + + "go.thethings.network/lorawan-stack/v3/pkg/console/internal/events/eventsmux" + "go.thethings.network/lorawan-stack/v3/pkg/console/internal/events/protocol" + "go.thethings.network/lorawan-stack/v3/pkg/log" + "nhooyr.io/websocket" + "nhooyr.io/websocket/wsjson" +) + +func makeMuxTask(m eventsmux.Interface, cancel func(error)) func(context.Context) error { + return func(ctx context.Context) (err error) { + defer func() { cancel(err) }() + return m.Run(ctx) + } +} + +func makeReadTask(conn *websocket.Conn, m eventsmux.Interface, cancel func(error)) func(context.Context) error { + return func(ctx context.Context) (err error) { + defer func() { cancel(err) }() + defer func() { + if closeErr := (websocket.CloseError{}); errors.As(err, &closeErr) { + log.FromContext(ctx).WithFields(log.Fields( + "code", closeErr.Code, + "reason", closeErr.Reason, + )).Debug("WebSocket closed") + err = io.EOF + } + }() + for { + var request protocol.RequestWrapper + if err := wsjson.Read(ctx, conn, &request); err != nil { + return err + } + select { + case <-ctx.Done(): + return ctx.Err() + case m.Requests() <- request.Contents: + } + } + } +} + +func makeWriteTask(conn *websocket.Conn, m eventsmux.Interface, cancel func(error)) func(context.Context) error { + return func(ctx context.Context) (err error) { + defer func() { cancel(err) }() + for { + select { + case <-ctx.Done(): + return ctx.Err() + case response := <-m.Responses(): + if err := wsjson.Write(ctx, conn, response); err != nil { + return err + } + } + } + } +} diff --git a/pkg/devicerepository/grpc.go b/pkg/devicerepository/grpc.go index c0d490b751..bad776016a 100644 --- a/pkg/devicerepository/grpc.go +++ b/pkg/devicerepository/grpc.go @@ -65,7 +65,7 @@ func (dr *DeviceRepository) ListBrands( ctx context.Context, req *ttnpb.ListEndDeviceBrandsRequest, ) (*ttnpb.ListEndDeviceBrandsResponse, error) { - if err := rights.RequireAuthentication(ctx); err != nil { + if err := rights.RequireAuthenticated(ctx); err != nil { return nil, err } if req.Limit > defaultLimit || req.Limit == 0 { @@ -97,7 +97,7 @@ func (dr *DeviceRepository) GetBrand( ctx context.Context, req *ttnpb.GetEndDeviceBrandRequest, ) (*ttnpb.EndDeviceBrand, error) { - if err := rights.RequireAuthentication(ctx); err != nil { + if err := rights.RequireAuthenticated(ctx); err != nil { return nil, err } response, err := dr.store.GetBrands(store.GetBrandsRequest{ @@ -121,7 +121,7 @@ func (dr *DeviceRepository) ListModels( ctx context.Context, req *ttnpb.ListEndDeviceModelsRequest, ) (*ttnpb.ListEndDeviceModelsResponse, error) { - if err := rights.RequireAuthentication(ctx); err != nil { + if err := rights.RequireAuthenticated(ctx); err != nil { return nil, err } if req.Limit > defaultLimit || req.Limit == 0 { @@ -152,7 +152,7 @@ func (dr *DeviceRepository) GetModel( ctx context.Context, req *ttnpb.GetEndDeviceModelRequest, ) (*ttnpb.EndDeviceModel, error) { - if err := rights.RequireAuthentication(ctx); err != nil { + if err := rights.RequireAuthenticated(ctx); err != nil { return nil, err } response, err := dr.store.GetModels(store.GetModelsRequest{ @@ -177,7 +177,7 @@ func (dr *DeviceRepository) GetTemplate( ctx context.Context, req *ttnpb.GetTemplateRequest, ) (*ttnpb.EndDeviceTemplate, error) { - if err := rights.RequireAuthentication(ctx); err != nil { + if err := rights.RequireAuthenticated(ctx); err != nil { return nil, err } return dr.store.GetTemplate(req, nil) @@ -189,7 +189,7 @@ func getDecoder( f func(store.GetCodecRequest) (*ttnpb.MessagePayloadDecoder, error), ) (*ttnpb.MessagePayloadDecoder, error) { if clusterauth.Authorized(ctx) != nil { - if err := rights.RequireAuthentication(ctx); err != nil { + if err := rights.RequireAuthenticated(ctx); err != nil { return nil, err } } @@ -218,7 +218,7 @@ func (dr *DeviceRepository) GetDownlinkEncoder( req *ttnpb.GetPayloadFormatterRequest, ) (*ttnpb.MessagePayloadEncoder, error) { if clusterauth.Authorized(ctx) != nil { - if err := rights.RequireAuthentication(ctx); err != nil { + if err := rights.RequireAuthenticated(ctx); err != nil { return nil, err } } diff --git a/pkg/events/grpc/grpc.go b/pkg/events/grpc/grpc.go index 0de69e6009..be250151c4 100644 --- a/pkg/events/grpc/grpc.go +++ b/pkg/events/grpc/grpc.go @@ -19,9 +19,6 @@ package grpc import ( "context" "os" - "regexp" - "sort" - "strings" "time" grpc_runtime "github.com/grpc-ecosystem/grpc-gateway/v2/runtime" @@ -65,58 +62,6 @@ type EventsServer struct { definedNames map[string]struct{} } -var ( - errInvalidRegexp = errors.DefineInvalidArgument("invalid_regexp", "invalid regexp") - errNoMatchingEvents = errors.DefineInvalidArgument("no_matching_events", "no matching events for regexp `{regexp}`") - errUnknownEventName = errors.DefineInvalidArgument("unknown_event_name", "unknown event `{name}`") -) - -func (srv *EventsServer) processNames(names ...string) ([]string, error) { - if len(names) == 0 { - return nil, nil - } - nameMap := make(map[string]struct{}) - for _, name := range names { - if strings.HasPrefix(name, "/") && strings.HasSuffix(name, "/") { - re, err := regexp.Compile(strings.Trim(name, "/")) - if err != nil { - return nil, errInvalidRegexp.WithCause(err) - } - var found bool - for defined := range srv.definedNames { - if re.MatchString(defined) { - nameMap[defined] = struct{}{} - found = true - } - } - if !found { - return nil, errNoMatchingEvents.WithAttributes("regexp", re.String()) - } - } else { - var found bool - for defined := range srv.definedNames { - if name == defined { - nameMap[name] = struct{}{} - found = true - break - } - } - if !found { - return nil, errUnknownEventName.WithAttributes("name", name) - } - } - } - if len(nameMap) == 0 { - return nil, nil - } - out := make([]string, 0, len(nameMap)) - for name := range nameMap { - out = append(out, name) - } - sort.Strings(out) - return out, nil -} - var errNoIdentifiers = errors.DefineInvalidArgument("no_identifiers", "no identifiers") // Stream implements the EventsServer interface. @@ -125,7 +70,7 @@ func (srv *EventsServer) Stream(req *ttnpb.StreamEventsRequest, stream ttnpb.Eve return errNoIdentifiers.New() } - names, err := srv.processNames(req.Names...) + names, err := events.NamesFromPatterns(srv.definedNames, req.Names) if err != nil { return err } diff --git a/pkg/events/pattern.go b/pkg/events/pattern.go new file mode 100644 index 0000000000..bb3d5389c3 --- /dev/null +++ b/pkg/events/pattern.go @@ -0,0 +1,77 @@ +// Copyright © 2023 The Things Network Foundation, The Things Industries B.V. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package events + +import ( + "regexp" + "sort" + "strings" + + "go.thethings.network/lorawan-stack/v3/pkg/errors" +) + +var ( + errInvalidRegexp = errors.DefineInvalidArgument("invalid_regexp", "invalid regexp") + errNoMatchingEvents = errors.DefineInvalidArgument("no_matching_events", "no matching events for regexp `{regexp}`") + errUnknownEventName = errors.DefineInvalidArgument("unknown_event_name", "unknown event `{name}`") +) + +// NamesFromPatterns returns the event names which match the given patterns. +// The defined names are a set of event names which are used to match the patterns. +func NamesFromPatterns(definedNames map[string]struct{}, patterns []string) ([]string, error) { + if len(patterns) == 0 { + return nil, nil + } + nameMap := make(map[string]struct{}) + for _, name := range patterns { + if strings.HasPrefix(name, "/") && strings.HasSuffix(name, "/") { + re, err := regexp.Compile(strings.Trim(name, "/")) + if err != nil { + return nil, errInvalidRegexp.WithCause(err) + } + var found bool + for defined := range definedNames { + if re.MatchString(defined) { + nameMap[defined] = struct{}{} + found = true + } + } + if !found { + return nil, errNoMatchingEvents.WithAttributes("regexp", re.String()) + } + } else { + var found bool + for defined := range definedNames { + if name == defined { + nameMap[name] = struct{}{} + found = true + break + } + } + if !found { + return nil, errUnknownEventName.WithAttributes("name", name) + } + } + } + if len(nameMap) == 0 { + return nil, nil + } + out := make([]string, 0, len(nameMap)) + for name := range nameMap { + out = append(out, name) + } + sort.Strings(out) + return out, nil +} diff --git a/pkg/networkserver/grpc_deviceregistry.go b/pkg/networkserver/grpc_deviceregistry.go index b89a10d032..2ef6b5748d 100644 --- a/pkg/networkserver/grpc_deviceregistry.go +++ b/pkg/networkserver/grpc_deviceregistry.go @@ -948,40 +948,39 @@ var ( legacyADRSettingsFields = []string{ "mac_settings.adr_margin", - "mac_settings.use_adr", "mac_settings.use_adr.value", + "mac_settings.use_adr", } adrSettingsFields = []string{ - "mac_settings.adr", - "mac_settings.adr.mode", "mac_settings.adr.mode.disabled", - "mac_settings.adr.mode.dynamic", - "mac_settings.adr.mode.dynamic.channel_steering", - "mac_settings.adr.mode.dynamic.channel_steering.mode", "mac_settings.adr.mode.dynamic.channel_steering.mode.disabled", "mac_settings.adr.mode.dynamic.channel_steering.mode.lora_narrow", + "mac_settings.adr.mode.dynamic.channel_steering.mode", + "mac_settings.adr.mode.dynamic.channel_steering", "mac_settings.adr.mode.dynamic.margin", - "mac_settings.adr.mode.dynamic.max_data_rate_index", "mac_settings.adr.mode.dynamic.max_data_rate_index.value", + "mac_settings.adr.mode.dynamic.max_data_rate_index", "mac_settings.adr.mode.dynamic.max_nb_trans", "mac_settings.adr.mode.dynamic.max_tx_power_index", - "mac_settings.adr.mode.dynamic.min_data_rate_index", "mac_settings.adr.mode.dynamic.min_data_rate_index.value", + "mac_settings.adr.mode.dynamic.min_data_rate_index", "mac_settings.adr.mode.dynamic.min_nb_trans", "mac_settings.adr.mode.dynamic.min_tx_power_index", - "mac_settings.adr.mode.static", + "mac_settings.adr.mode.dynamic", "mac_settings.adr.mode.static.data_rate_index", "mac_settings.adr.mode.static.nb_trans", "mac_settings.adr.mode.static.tx_power_index", + "mac_settings.adr.mode.static", + "mac_settings.adr.mode", + "mac_settings.adr", } dynamicADRSettingsFields = []string{ - "mac_settings.adr.mode.dynamic", - "mac_settings.adr.mode.dynamic.channel_steering", - "mac_settings.adr.mode.dynamic.channel_steering.mode", "mac_settings.adr.mode.dynamic.channel_steering.mode.disabled", "mac_settings.adr.mode.dynamic.channel_steering.mode.lora_narrow", + "mac_settings.adr.mode.dynamic.channel_steering.mode", + "mac_settings.adr.mode.dynamic.channel_steering", "mac_settings.adr.mode.dynamic.margin", "mac_settings.adr.mode.dynamic.max_data_rate_index.value", "mac_settings.adr.mode.dynamic.max_nb_trans", @@ -989,6 +988,7 @@ var ( "mac_settings.adr.mode.dynamic.min_data_rate_index.value", "mac_settings.adr.mode.dynamic.min_nb_trans", "mac_settings.adr.mode.dynamic.min_tx_power_index", + "mac_settings.adr.mode.dynamic", } ) @@ -1244,36 +1244,36 @@ func (ns *NetworkServer) Set(ctx context.Context, req *ttnpb.SetEndDeviceRequest if st.HasSetField( "frequency_plan_id", "lorawan_phy_version", - "mac_settings.adr", - "mac_settings.adr.mode", "mac_settings.adr.mode.disabled", - "mac_settings.adr.mode.dynamic", - "mac_settings.adr.mode.dynamic.channel_steering", - "mac_settings.adr.mode.dynamic.channel_steering.mode", "mac_settings.adr.mode.dynamic.channel_steering.mode.disabled", "mac_settings.adr.mode.dynamic.channel_steering.mode.lora_narrow", + "mac_settings.adr.mode.dynamic.channel_steering.mode", + "mac_settings.adr.mode.dynamic.channel_steering", "mac_settings.adr.mode.dynamic.margin", - "mac_settings.adr.mode.dynamic.max_data_rate_index", "mac_settings.adr.mode.dynamic.max_data_rate_index.value", + "mac_settings.adr.mode.dynamic.max_data_rate_index", "mac_settings.adr.mode.dynamic.max_nb_trans", "mac_settings.adr.mode.dynamic.max_tx_power_index", - "mac_settings.adr.mode.dynamic.min_data_rate_index", "mac_settings.adr.mode.dynamic.min_data_rate_index.value", + "mac_settings.adr.mode.dynamic.min_data_rate_index", "mac_settings.adr.mode.dynamic.min_nb_trans", "mac_settings.adr.mode.dynamic.min_tx_power_index", - "mac_settings.adr.mode.static", + "mac_settings.adr.mode.dynamic", "mac_settings.adr.mode.static.data_rate_index", "mac_settings.adr.mode.static.nb_trans", "mac_settings.adr.mode.static.tx_power_index", + "mac_settings.adr.mode.static", + "mac_settings.adr.mode", + "mac_settings.adr", + "mac_settings.desired_ping_slot_data_rate_index.value", + "mac_settings.desired_rx2_data_rate_index.value", + "mac_settings.downlink_dwell_time.value", "mac_settings.factory_preset_frequencies", + "mac_settings.ping_slot_data_rate_index.value", "mac_settings.ping_slot_frequency.value", - "mac_settings.use_adr.value", "mac_settings.rx2_data_rate_index.value", - "mac_settings.desired_rx2_data_rate_index.value", - "mac_settings.ping_slot_data_rate_index.value", - "mac_settings.desired_ping_slot_data_rate_index.value", "mac_settings.uplink_dwell_time.value", - "mac_settings.downlink_dwell_time.value", + "mac_settings.use_adr.value", "mac_state.current_parameters.adr_data_rate_index", "mac_state.current_parameters.adr_tx_power_index", "mac_state.current_parameters.channels", @@ -1333,18 +1333,8 @@ func (ns *NetworkServer) Set(ctx context.Context, req *ttnpb.SetEndDeviceRequest "frequency_plan_id", "lorawan_phy_version", ) - - hasSetFieldWithFallback := func(field, fallbackField string) (fieldToRetrieve string, validate bool) { - if st.HasSetField(field) { - return field, true - } - return fallbackField, hasPHYUpdate - } hasSetField := func(field string) (fieldToRetrieve string, validate bool) { - return hasSetFieldWithFallback(field, field) - } - hasSetADRField := func(field string) (fieldToRetrieve string, validate bool) { - return hasSetFieldWithFallback(field, "mac_settings.adr.mode") + return field, st.HasSetField(field) || hasPHYUpdate } setFields := func(fields ...string) []string { @@ -1449,7 +1439,7 @@ func (ns *NetworkServer) Set(ctx context.Context, req *ttnpb.SetEndDeviceRequest return nil, err } } - if field, validate := hasSetADRField("mac_settings.adr.mode.dynamic.max_data_rate_index.value"); validate { + if field, validate := hasSetField("mac_settings.adr.mode.dynamic.max_data_rate_index.value"); validate { if err := st.WithField(func(dev *ttnpb.EndDevice) error { return withPHY(func(phy *band.Band, _ *frequencyplans.FrequencyPlan) error { if dev.GetMacSettings().GetAdr().GetDynamic().GetMaxDataRateIndex() == nil { @@ -1468,7 +1458,7 @@ func (ns *NetworkServer) Set(ctx context.Context, req *ttnpb.SetEndDeviceRequest return nil, err } } - if field, validate := hasSetADRField("mac_settings.adr.mode.dynamic.min_data_rate_index.value"); validate { + if field, validate := hasSetField("mac_settings.adr.mode.dynamic.min_data_rate_index.value"); validate { if err := st.WithField(func(dev *ttnpb.EndDevice) error { return withPHY(func(phy *band.Band, _ *frequencyplans.FrequencyPlan) error { if dev.GetMacSettings().GetAdr().GetDynamic().GetMinDataRateIndex() == nil { @@ -1487,7 +1477,7 @@ func (ns *NetworkServer) Set(ctx context.Context, req *ttnpb.SetEndDeviceRequest return nil, err } } - if field, validate := hasSetADRField("mac_settings.adr.mode.dynamic.max_tx_power_index"); validate { + if field, validate := hasSetField("mac_settings.adr.mode.dynamic.max_tx_power_index"); validate { if err := st.WithField(func(dev *ttnpb.EndDevice) error { return withPHY(func(phy *band.Band, _ *frequencyplans.FrequencyPlan) error { if dev.GetMacSettings().GetAdr().GetDynamic().GetMaxTxPowerIndex() == nil { @@ -1504,7 +1494,7 @@ func (ns *NetworkServer) Set(ctx context.Context, req *ttnpb.SetEndDeviceRequest return nil, err } } - if field, validate := hasSetADRField("mac_settings.adr.mode.dynamic.min_tx_power_index"); validate { + if field, validate := hasSetField("mac_settings.adr.mode.dynamic.min_tx_power_index"); validate { if err := st.WithField(func(dev *ttnpb.EndDevice) error { return withPHY(func(phy *band.Band, _ *frequencyplans.FrequencyPlan) error { if dev.GetMacSettings().GetAdr().GetDynamic().GetMinTxPowerIndex() == nil { @@ -1544,7 +1534,7 @@ func (ns *NetworkServer) Set(ctx context.Context, req *ttnpb.SetEndDeviceRequest return nil, err } } - if field, validate := hasSetADRField("mac_settings.adr.mode.static.data_rate_index"); validate { + if field, validate := hasSetField("mac_settings.adr.mode.static.data_rate_index"); validate { if err := st.WithField(func(dev *ttnpb.EndDevice) error { return withPHY(func(phy *band.Band, _ *frequencyplans.FrequencyPlan) error { if dev.GetMacSettings().GetAdr().GetStatic() == nil { @@ -1562,7 +1552,7 @@ func (ns *NetworkServer) Set(ctx context.Context, req *ttnpb.SetEndDeviceRequest return nil, err } } - if field, validate := hasSetADRField("mac_settings.adr.mode.static.tx_power_index"); validate { + if field, validate := hasSetField("mac_settings.adr.mode.static.tx_power_index"); validate { if err := st.WithField(func(dev *ttnpb.EndDevice) error { return withPHY(func(phy *band.Band, _ *frequencyplans.FrequencyPlan) error { if dev.GetMacSettings().GetAdr().GetStatic() == nil { @@ -2682,7 +2672,14 @@ func (ns *NetworkServer) Set(ctx context.Context, req *ttnpb.SetEndDeviceRequest } var evt events.Event - dev, ctx, err := ns.devices.SetByID(ctx, st.Device.Ids.ApplicationIds, st.Device.Ids.DeviceId, st.GetFields(), st.SetFunc(func(ctx context.Context, stored *ttnpb.EndDevice) error { + dev, ctx, err := ns.devices.SetByID(ctx, st.Device.Ids.ApplicationIds, st.Device.Ids.DeviceId, ttnpb.EndDeviceFieldPathsTopLevel, st.SetFunc(func(ctx context.Context, stored *ttnpb.EndDevice) error { + if nonZeroFields := ttnpb.NonZeroFields(stored, st.GetFields()...); len(nonZeroFields) > 0 { + newStored := &ttnpb.EndDevice{} + if err := newStored.SetFields(stored, nonZeroFields...); err != nil { + return err + } + stored = newStored + } if hasSession { macVersion := stored.GetMacState().GetLorawanVersion() if stored.GetMacState() == nil && !st.HasSetField("mac_state") { diff --git a/pkg/packetbrokeragent/agent.go b/pkg/packetbrokeragent/agent.go index c8a99fced2..cb000bfc4b 100644 --- a/pkg/packetbrokeragent/agent.go +++ b/pkg/packetbrokeragent/agent.go @@ -178,7 +178,7 @@ func New(c *component.Component, conf *Config, opts ...Option) (*Agent, error) { } devAddrPrefix := types.DevAddrPrefix{ DevAddr: devAddr, - Length: uint8(conf.NetID.IDBits()), + Length: uint8(32 - types.NwkAddrBits(conf.NetID)), } devAddrPrefixes = append(devAddrPrefixes, devAddrPrefix) } diff --git a/pkg/webui/account/views/app/index.js b/pkg/webui/account/views/app/index.js index 8f4fd0adaa..284175f326 100644 --- a/pkg/webui/account/views/app/index.js +++ b/pkg/webui/account/views/app/index.js @@ -14,7 +14,15 @@ import { useSelector, useDispatch } from 'react-redux' import React, { useEffect } from 'react' -import { Routes, Route, BrowserRouter } from 'react-router-dom' +import { + Routes, + Route, + BrowserRouter, + ScrollRestoration, + createBrowserRouter, + RouterProvider, + Outlet, +} from 'react-router-dom' import { Helmet } from 'react-helmet' import { ToastContainer } from '@ttn-lw/components/toast' @@ -27,7 +35,6 @@ import Header from '@account/containers/header' import Landing from '@account/views/landing' import Authorize from '@account/views/authorize' -import PropTypes from '@ttn-lw/lib/prop-types' import { selectApplicationSiteName, selectApplicationSiteTitle, @@ -44,8 +51,34 @@ const siteTitle = selectApplicationSiteTitle() const pageData = selectPageData() const errorRender = error => } /> +const getScrollRestorationKey = location => { + // Preserve scroll position only when necessary. + // E.g. we don't want to scroll to top when changing tabs of a table, + // but we do want to scroll to top when changing pages. + const { pathname, search } = location + const params = new URLSearchParams(search) + const page = params.get('page') + + return `${pathname}${page ? `?page=${page}` : ''}` +} -const AccountApp = ({ history }) => { +const Layout = () => ( + <> + + + + + + + + + +) + +const AccountRoot = () => { const user = useSelector(selectUser) const dispatch = useDispatch() @@ -72,28 +105,19 @@ const AccountApp = ({ history }) => { } return ( - <> - - - - - - - - - - - - - + + }> + + + + ) } -AccountApp.propTypes = { - history: PropTypes.history.isRequired, -} +const router = createBrowserRouter([{ path: '*', Component: AccountRoot }], { + basename: '/oauth', +}) + +const AccountApp = () => export default AccountApp diff --git a/pkg/webui/console/components/default-routing-policy-form/index.js b/pkg/webui/console/components/default-routing-policy-form/index.js new file mode 100644 index 0000000000..ce33d11b2a --- /dev/null +++ b/pkg/webui/console/components/default-routing-policy-form/index.js @@ -0,0 +1,118 @@ +// Copyright © 2023 The Things Network Foundation, The Things Industries B.V. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +import React, { useCallback, useState } from 'react' +import { Col, Row } from 'react-grid-system' +import { defineMessages } from 'react-intl' + +import Form, { useFormContext } from '@ttn-lw/components/form' +import Checkbox from '@ttn-lw/components/checkbox' +import Radio from '@ttn-lw/components/radio-button' + +import Message from '@ttn-lw/lib/components/message' + +import sharedMessages from '@ttn-lw/lib/shared-messages' + +import policyMessages from '@console/lib/packet-broker/messages' + +const m = defineMessages({ + doNotUseADefaultPolicy: 'Do not use a default routing policy for this network', +}) + +const useDefaultEncode = val => val === 'default' +const useDefaultDecode = val => (val ? 'default' : 'no-default') + +const DefaultRoutingPolicyForm = () => { + const { values } = useFormContext() + const [useDefault, setUseDefault] = useState(values._use_default_policy || false) + const handlePolicySourceChange = useCallback(setUseDefault, [setUseDefault]) + + return ( + + + + + + + + {useDefault && ( + <> + + + + + + + + + + + + + + + + )} + + ) +} + +export default DefaultRoutingPolicyForm diff --git a/pkg/webui/console/components/gateway-visibility-form/index.js b/pkg/webui/console/components/gateway-visibility-form/index.js index 3a5372f6af..535477a7ac 100644 --- a/pkg/webui/console/components/gateway-visibility-form/index.js +++ b/pkg/webui/console/components/gateway-visibility-form/index.js @@ -74,19 +74,19 @@ const GatewayVisibilityForm = ({ onSubmit, initialValues, error }) => { here to modify the default payload formatter for this application. The payload formatter of this application is currently set to `{defaultFormatter}`', pasteRepositoryFormatter: 'Paste repository formatter', @@ -419,7 +418,7 @@ const PayloadFormattersForm = ({ > {() => ( <> - + val === 'default' const policySourceDecode = val => (val ? 'default' : 'specific') -const useDefaultEncode = val => val === 'default' -const useDefaultDecode = val => (val ? 'default' : 'no-default') -const RoutingPolicyForm = ({ - onSubmit, - initialValues, - error, - defaultPolicy, - networkLevel, - submitMessage, -}) => { +const RoutingPolicyForm = ({ onSubmit, initialValues, error, defaultPolicy, submitMessage }) => { const handleSubmit = useCallback(values => onSubmit(validationSchema.cast(values)), [onSubmit]) const [useDefault, setUseDefault] = useState(initialValues._use_default_policy || false) const handlePolicySourceChange = useCallback(setUseDefault, [setUseDefault]) const hasDefaultPolicy = isValidPolicy(defaultPolicy) - const showDefaultPolicySheet = networkLevel && useDefault && isValidPolicy(defaultPolicy) - const showPolicyCheckboxes = (useDefault && !networkLevel) || (!useDefault && networkLevel) - + const showDefaultPolicySheet = useDefault && isValidPolicy(defaultPolicy) + const showPolicyCheckboxes = !useDefault return (
- {networkLevel ? ( - - - - - ) : ( - - - - - )} + + + + {showDefaultPolicySheet && ( @@ -132,7 +106,7 @@ const RoutingPolicyForm = ({ { /> @@ -98,7 +98,7 @@ const RoutingPolicySheet = ({ policy }) => { /> diff --git a/pkg/webui/console/lib/packet-broker/messages.js b/pkg/webui/console/lib/packet-broker/messages.js index 5ab1c4f8db..764cdb47c6 100644 --- a/pkg/webui/console/lib/packet-broker/messages.js +++ b/pkg/webui/console/lib/packet-broker/messages.js @@ -22,7 +22,6 @@ export default defineMessages({ joinRequestDesc: 'Forward join-request messages', localizationInformation: 'Localization information', localizationInformationDesc: 'Forward gateway location, RSSI, SNR and fine timestamp', - macData: 'MAC data', macDataAllowDesc: 'Allow downlink messages with FPort of 0', macDataDesc: 'Forward uplink messages with FPort 0', signalQualityInformation: 'Signal quality information', diff --git a/pkg/webui/console/store/actions/packet-broker.js b/pkg/webui/console/store/actions/packet-broker.js index fc90e5b136..640826f679 100644 --- a/pkg/webui/console/store/actions/packet-broker.js +++ b/pkg/webui/console/store/actions/packet-broker.js @@ -210,6 +210,21 @@ export const [ }, ] = createRequestActions(DELETE_HOME_NETWORK_ROUTING_POLICY_BASE, (id, policy) => ({ id, policy })) +export const DELETE_ALL_HOME_NETWORK_ROUTING_POLICIES_BASE = + 'DELETE_ALL_HOME_NETWORK_ROUTING_POLICIES' +export const [ + { + request: DELETE_ALL_HOME_NETWORK_ROUTING_POLICIES, + success: DELETE_ALL_HOME_NETWORK_ROUTING_POLICIES_SUCCESS, + failure: DELETE_ALL_HOME_NETWORK_ROUTING_POLICIES_FAILURE, + }, + { + request: deleteAllHomeNetworkRoutingPolicies, + success: deleteAllHomeNetworkRoutingPoliciesSuccess, + failure: deleteAllHomeNetworkRoutingPoliciesFailure, + }, +] = createRequestActions(DELETE_ALL_HOME_NETWORK_ROUTING_POLICIES_BASE, ids => ({ ids })) + export const GET_HOME_NETWORK_ROUTING_POLICIES_BASE = 'GET_HOME_NETWORK_ROUTING_POLICIES' export const [ { diff --git a/pkg/webui/console/store/middleware/logics/packet-broker.js b/pkg/webui/console/store/middleware/logics/packet-broker.js index 1eeed2ac8f..6017934e48 100644 --- a/pkg/webui/console/store/middleware/logics/packet-broker.js +++ b/pkg/webui/console/store/middleware/logics/packet-broker.js @@ -263,7 +263,12 @@ const setPacketBrokerHomeNetworkPolicyLogic = createRequestLogic({ const ids = extractPacketBrokerIdsFromCombinedId(id) await tts.PacketBrokerAgent.setHomeNetworkRoutingPolicy(ids.net_id, ids.tenant_id, policy) - return policy + const newPolicy = { home_network_id: { net_id: ids.net_id }, ...policy } + if ('tenant_id' in ids) { + newPolicy.home_network_id.tenant_id = ids.tenant_id + } + + return newPolicy }, }) @@ -289,6 +294,34 @@ const deletePacketBrokerHomeNetworkPolicyLogic = createRequestLogic({ }, }) +const deleteAllPacketBrokerHomeNetworkPoliciesLogic = createRequestLogic({ + type: packetBroker.DELETE_ALL_HOME_NETWORK_ROUTING_POLICIES, + process: async ({ action }) => { + const { + payload: { ids }, + } = action + + try { + await Promise.all( + ids.map(async id => { + const ids = extractPacketBrokerIdsFromCombinedId(id) + if (typeof ids === 'number') { + return tts.PacketBrokerAgent.deleteHomeNetworkRoutingPolicy(ids) + } + + return tts.PacketBrokerAgent.deleteHomeNetworkRoutingPolicy(ids.net_id, ids.tenant_id) + }), + ) + + return ids + } catch (error) { + if (!isNotFoundError(error)) { + throw error + } + } + }, +}) + const getDefaultGatewayVisibilityLogic = createRequestLogic({ type: packetBroker.GET_HOME_NETWORK_DEFAULT_GATEWAY_VISIBILITY, process: async () => { @@ -346,6 +379,7 @@ export default [ getPacketBrokerHomeNetworkPoliciesLogic, setPacketBrokerHomeNetworkPolicyLogic, deletePacketBrokerHomeNetworkPolicyLogic, + deleteAllPacketBrokerHomeNetworkPoliciesLogic, getDefaultGatewayVisibilityLogic, setDefaultGatewayVisibilityLogic, deleteDefaultGatewayVisibilityLogic, diff --git a/pkg/webui/console/views/admin-packet-broker/admin-packet-broker.js b/pkg/webui/console/views/admin-packet-broker/admin-packet-broker.js index c5d01fc26b..04d50bcd99 100644 --- a/pkg/webui/console/views/admin-packet-broker/admin-packet-broker.js +++ b/pkg/webui/console/views/admin-packet-broker/admin-packet-broker.js @@ -15,48 +15,37 @@ import React, { useCallback, useState } from 'react' import { Container, Col, Row } from 'react-grid-system' import { useSelector, useDispatch } from 'react-redux' -import { Routes, Route } from 'react-router-dom' +import { Routes, Route, Navigate } from 'react-router-dom' import classnames from 'classnames' import PacketBrokerLogo from '@assets/misc/packet-broker.svg' import Link from '@ttn-lw/components/link' import PageTitle from '@ttn-lw/components/page-title' -import Icon from '@ttn-lw/components/icon' import Switch from '@ttn-lw/components/switch' import Tabs from '@ttn-lw/components/tabs' import PortalledModal from '@ttn-lw/components/modal/portalled' -import Notification from '@ttn-lw/components/notification' import ErrorNotification from '@ttn-lw/components/error-notification' +import Notification from '@ttn-lw/components/notification' import Message from '@ttn-lw/lib/components/message' -import RequireRequest from '@ttn-lw/lib/components/require-request' import GenericNotFound from '@ttn-lw/lib/components/full-view-error/not-found' -import SubViewErrorComponent from '@console/views/sub-view-error' - import sharedMessages from '@ttn-lw/lib/shared-messages' import { isNotEnabledError } from '@console/lib/packet-broker/utils' -import { - registerPacketBroker, - deregisterPacketBroker, - getHomeNetworkDefaultRoutingPolicy, - getHomeNetworkDefaultGatewayVisibility, -} from '@console/store/actions/packet-broker' +import { registerPacketBroker, deregisterPacketBroker } from '@console/store/actions/packet-broker' import { selectRegistered, selectRegisterEnabled, selectEnabled, selectListed, - selectInfo, selectInfoError, } from '@console/store/selectors/packet-broker' -import DefaultRoutingPolicyView from './default-routing-policy' -import NetworkRoutingPoliciesView from './network-routing-policies' +import RoutingConfigurationView from './routing-configuration' import DefaultGatewayVisibilityView from './default-gateway-visibility' import m from './messages' @@ -70,7 +59,6 @@ const PacketBroker = () => { const enabled = useSelector(selectEnabled) const [unlistModalVisible, setUnlistModalVisible] = useState(false) const listed = useSelector(selectListed) - const info = useSelector(selectInfo) const infoError = useSelector(selectInfoError) const dispatch = useDispatch() const showError = Boolean(infoError) && !isNotEnabledError(infoError) @@ -81,7 +69,7 @@ const PacketBroker = () => { } else { setDeregisterModalVisible(true) } - }, [dispatch, registered, setDeregisterModalVisible]) + }, [dispatch, registered]) const handleDeregisterModalComplete = useCallback( approved => { @@ -112,22 +100,19 @@ const PacketBroker = () => { ) const tabs = [ - { title: m.defaultRoutingPolicy, link: '/admin-panel/packet-broker', name: 'default' }, + { + title: m.routingConfig, + link: '/admin-panel/packet-broker/routing-configuration', + name: 'default', + exact: false, + }, { title: m.defaultGatewayVisibility, link: '/admin-panel/packet-broker/default-gateway-visibility', name: 'default-gateway-visibility', }, - { - title: sharedMessages.networks, - link: '/admin-panel/packet-broker/networks', - name: 'networks', - exact: false, - }, ] - const boldMessage = { b: msg => {msg} } - return ( @@ -138,29 +123,24 @@ const PacketBroker = () => { Packet Broker
- + - Packet Broker documentation + Packet Broker {' | '} - {' | '} - - -

- + + + + {!enabled && } {showError && } {enabled && ( - + {registerEnabled && ( )} - {registered && ( -
- {info.forwarder_enabled ? ( - - - - - ) : ( - - - - - )} - {info.home_network_enabled ? ( - - - - - ) : ( - - - - - )} -
- )} { {registered && ( <> -