diff --git a/clientapi/jsonerror/jsonerror.go b/clientapi/jsonerror/jsonerror.go index c42b25bea7..7accde5f53 100644 --- a/clientapi/jsonerror/jsonerror.go +++ b/clientapi/jsonerror/jsonerror.go @@ -131,6 +131,12 @@ func InvalidSignature(msg string) *MatrixError { return &MatrixError{"M_INVALID_SIGNATURE", msg} } +// InvalidParam is an error that is returned when a parameter was invalid, +// traditionally with cross-signing. +func InvalidParam(msg string) *MatrixError { + return &MatrixError{"M_INVALID_PARAM", msg} +} + // MissingParam is an error that is returned when a parameter was incorrect, // traditionally with cross-signing. func MissingParam(msg string) *MatrixError { diff --git a/clientapi/routing/key_crosssigning.go b/clientapi/routing/key_crosssigning.go index 3c103fd729..756598dbcd 100644 --- a/clientapi/routing/key_crosssigning.go +++ b/clientapi/routing/key_crosssigning.go @@ -73,6 +73,11 @@ func UploadCrossSigningDeviceKeys( Code: http.StatusBadRequest, JSON: jsonerror.MissingParam(err.Error()), } + case err.IsInvalidParam: + return util.JSONResponse{ + Code: http.StatusBadRequest, + JSON: jsonerror.InvalidParam(err.Error()), + } default: return util.JSONResponse{ Code: http.StatusBadRequest, @@ -110,6 +115,11 @@ func UploadCrossSigningDeviceSignatures(req *http.Request, keyserverAPI api.KeyI Code: http.StatusBadRequest, JSON: jsonerror.MissingParam(err.Error()), } + case err.IsInvalidParam: + return util.JSONResponse{ + Code: http.StatusBadRequest, + JSON: jsonerror.InvalidParam(err.Error()), + } default: return util.JSONResponse{ Code: http.StatusBadRequest, diff --git a/clientapi/routing/routing.go b/clientapi/routing/routing.go index d282306f2a..30ecc29222 100644 --- a/clientapi/routing/routing.go +++ b/clientapi/routing/routing.go @@ -65,7 +65,7 @@ func Setup( userInteractiveAuth := auth.NewUserInteractive(accountDB.GetAccountByPassword, cfg) unstableFeatures := map[string]bool{ - //"org.matrix.e2e_cross_signing": true, + "org.matrix.e2e_cross_signing": true, } for _, msc := range cfg.MSCs.MSCs { unstableFeatures["org.matrix."+msc] = true diff --git a/eduserver/api/input.go b/eduserver/api/input.go index f8599e1cc4..2fa253f4db 100644 --- a/eduserver/api/input.go +++ b/eduserver/api/input.go @@ -75,6 +75,12 @@ type InputReceiptEventRequest struct { // InputReceiptEventResponse is a response to InputReceiptEventRequest type InputReceiptEventResponse struct{} +type InputCrossSigningKeyUpdateRequest struct { + CrossSigningKeyUpdate `json:"signing_keys"` +} + +type InputCrossSigningKeyUpdateResponse struct{} + // EDUServerInputAPI is used to write events to the typing server. type EDUServerInputAPI interface { InputTypingEvent( @@ -94,4 +100,10 @@ type EDUServerInputAPI interface { request *InputReceiptEventRequest, response *InputReceiptEventResponse, ) error + + InputCrossSigningKeyUpdate( + ctx context.Context, + request *InputCrossSigningKeyUpdateRequest, + response *InputCrossSigningKeyUpdateResponse, + ) error } diff --git a/eduserver/api/output.go b/eduserver/api/output.go index 650458a290..c6de4e01c7 100644 --- a/eduserver/api/output.go +++ b/eduserver/api/output.go @@ -33,14 +33,6 @@ type OutputTypingEvent struct { ExpireTime *time.Time } -// TypingEvent represents a matrix edu event of type 'm.typing'. -type TypingEvent struct { - Type string `json:"type"` - RoomID string `json:"room_id"` - UserID string `json:"user_id"` - Typing bool `json:"typing"` -} - // OutputSendToDeviceEvent is an entry in the send-to-device output kafka log. // This contains the full event content, along with the user ID and device ID // to which it is destined. @@ -50,14 +42,6 @@ type OutputSendToDeviceEvent struct { gomatrixserverlib.SendToDeviceEvent } -type ReceiptEvent struct { - UserID string `json:"user_id"` - RoomID string `json:"room_id"` - EventID string `json:"event_id"` - Type string `json:"type"` - Timestamp gomatrixserverlib.Timestamp `json:"timestamp"` -} - // OutputReceiptEvent is an entry in the receipt output kafka log type OutputReceiptEvent struct { UserID string `json:"user_id"` @@ -67,21 +51,7 @@ type OutputReceiptEvent struct { Timestamp gomatrixserverlib.Timestamp `json:"timestamp"` } -// Helper structs for receipts json creation -type ReceiptMRead struct { - User map[string]ReceiptTS `json:"m.read"` -} - -type ReceiptTS struct { - TS gomatrixserverlib.Timestamp `json:"ts"` -} - -// FederationSender output -type FederationReceiptMRead struct { - User map[string]FederationReceiptData `json:"m.read"` -} - -type FederationReceiptData struct { - Data ReceiptTS `json:"data"` - EventIDs []string `json:"event_ids"` +// OutputCrossSigningKeyUpdate is an entry in the signing key update output kafka log +type OutputCrossSigningKeyUpdate struct { + CrossSigningKeyUpdate `json:"signing_keys"` } diff --git a/eduserver/api/types.go b/eduserver/api/types.go new file mode 100644 index 0000000000..a207580f9c --- /dev/null +++ b/eduserver/api/types.go @@ -0,0 +1,59 @@ +// Copyright 2021 The Matrix.org Foundation C.I.C. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package api + +import "github.com/matrix-org/gomatrixserverlib" + +const ( + MSigningKeyUpdate = "m.signing_key_update" +) + +type TypingEvent struct { + Type string `json:"type"` + RoomID string `json:"room_id"` + UserID string `json:"user_id"` + Typing bool `json:"typing"` +} + +type ReceiptEvent struct { + UserID string `json:"user_id"` + RoomID string `json:"room_id"` + EventID string `json:"event_id"` + Type string `json:"type"` + Timestamp gomatrixserverlib.Timestamp `json:"timestamp"` +} + +type FederationReceiptMRead struct { + User map[string]FederationReceiptData `json:"m.read"` +} + +type FederationReceiptData struct { + Data ReceiptTS `json:"data"` + EventIDs []string `json:"event_ids"` +} + +type ReceiptMRead struct { + User map[string]ReceiptTS `json:"m.read"` +} + +type ReceiptTS struct { + TS gomatrixserverlib.Timestamp `json:"ts"` +} + +type CrossSigningKeyUpdate struct { + MasterKey *gomatrixserverlib.CrossSigningKey `json:"master_key,omitempty"` + SelfSigningKey *gomatrixserverlib.CrossSigningKey `json:"self_signing_key,omitempty"` + UserID string `json:"user_id"` +} diff --git a/eduserver/eduserver.go b/eduserver/eduserver.go index 7cc4051080..7875e27f12 100644 --- a/eduserver/eduserver.go +++ b/eduserver/eduserver.go @@ -52,6 +52,7 @@ func NewInternalAPI( OutputTypingEventTopic: cfg.Matrix.Kafka.TopicFor(config.TopicOutputTypingEvent), OutputSendToDeviceEventTopic: cfg.Matrix.Kafka.TopicFor(config.TopicOutputSendToDeviceEvent), OutputReceiptEventTopic: cfg.Matrix.Kafka.TopicFor(config.TopicOutputReceiptEvent), + OutputKeyChangeEventTopic: cfg.Matrix.Kafka.TopicFor(config.TopicOutputKeyChangeEvent), ServerName: cfg.Matrix.ServerName, } } diff --git a/eduserver/input/input.go b/eduserver/input/input.go index c54fb9de8c..bdc2437459 100644 --- a/eduserver/input/input.go +++ b/eduserver/input/input.go @@ -24,6 +24,7 @@ import ( "github.com/Shopify/sarama" "github.com/matrix-org/dendrite/eduserver/api" "github.com/matrix-org/dendrite/eduserver/cache" + keyapi "github.com/matrix-org/dendrite/keyserver/api" userapi "github.com/matrix-org/dendrite/userapi/api" "github.com/matrix-org/gomatrixserverlib" "github.com/sirupsen/logrus" @@ -39,6 +40,8 @@ type EDUServerInputAPI struct { OutputSendToDeviceEventTopic string // The kafka topic to output new receipt events to OutputReceiptEventTopic string + // The kafka topic to output new key change events to + OutputKeyChangeEventTopic string // kafka producer Producer sarama.SyncProducer // Internal user query API @@ -77,6 +80,36 @@ func (t *EDUServerInputAPI) InputSendToDeviceEvent( return t.sendToDeviceEvent(ise) } +// InputCrossSigningKeyUpdate implements api.EDUServerInputAPI +func (t *EDUServerInputAPI) InputCrossSigningKeyUpdate( + ctx context.Context, + request *api.InputCrossSigningKeyUpdateRequest, + response *api.InputCrossSigningKeyUpdateResponse, +) error { + eventJSON, err := json.Marshal(&keyapi.DeviceMessage{ + Type: keyapi.TypeCrossSigningUpdate, + OutputCrossSigningKeyUpdate: &api.OutputCrossSigningKeyUpdate{ + CrossSigningKeyUpdate: request.CrossSigningKeyUpdate, + }, + }) + if err != nil { + return err + } + + logrus.WithFields(logrus.Fields{ + "user_id": request.UserID, + }).Infof("Producing to topic '%s'", t.OutputKeyChangeEventTopic) + + m := &sarama.ProducerMessage{ + Topic: string(t.OutputKeyChangeEventTopic), + Key: sarama.StringEncoder(request.UserID), + Value: sarama.ByteEncoder(eventJSON), + } + + _, _, err = t.Producer.SendMessage(m) + return err +} + func (t *EDUServerInputAPI) sendTypingEvent(ite *api.InputTypingEvent) error { ev := &api.TypingEvent{ Type: gomatrixserverlib.MTyping, diff --git a/eduserver/inthttp/client.go b/eduserver/inthttp/client.go index 0690ed827a..9a6f483c22 100644 --- a/eduserver/inthttp/client.go +++ b/eduserver/inthttp/client.go @@ -12,9 +12,10 @@ import ( // HTTP paths for the internal HTTP APIs const ( - EDUServerInputTypingEventPath = "/eduserver/input" - EDUServerInputSendToDeviceEventPath = "/eduserver/sendToDevice" - EDUServerInputReceiptEventPath = "/eduserver/receipt" + EDUServerInputTypingEventPath = "/eduserver/input" + EDUServerInputSendToDeviceEventPath = "/eduserver/sendToDevice" + EDUServerInputReceiptEventPath = "/eduserver/receipt" + EDUServerInputCrossSigningKeyUpdatePath = "/eduserver/crossSigningKeyUpdate" ) // NewEDUServerClient creates a EDUServerInputAPI implemented by talking to a HTTP POST API. @@ -68,3 +69,16 @@ func (h *httpEDUServerInputAPI) InputReceiptEvent( apiURL := h.eduServerURL + EDUServerInputReceiptEventPath return httputil.PostJSON(ctx, span, h.httpClient, apiURL, request, response) } + +// InputCrossSigningKeyUpdate implements EDUServerInputAPI +func (h *httpEDUServerInputAPI) InputCrossSigningKeyUpdate( + ctx context.Context, + request *api.InputCrossSigningKeyUpdateRequest, + response *api.InputCrossSigningKeyUpdateResponse, +) error { + span, ctx := opentracing.StartSpanFromContext(ctx, "InputCrossSigningKeyUpdate") + defer span.Finish() + + apiURL := h.eduServerURL + EDUServerInputCrossSigningKeyUpdatePath + return httputil.PostJSON(ctx, span, h.httpClient, apiURL, request, response) +} diff --git a/eduserver/inthttp/server.go b/eduserver/inthttp/server.go index a349437508..a50ca84f97 100644 --- a/eduserver/inthttp/server.go +++ b/eduserver/inthttp/server.go @@ -51,4 +51,17 @@ func AddRoutes(t api.EDUServerInputAPI, internalAPIMux *mux.Router) { return util.JSONResponse{Code: http.StatusOK, JSON: &response} }), ) + internalAPIMux.Handle(EDUServerInputCrossSigningKeyUpdatePath, + httputil.MakeInternalAPI("inputCrossSigningKeyUpdate", func(req *http.Request) util.JSONResponse { + var request api.InputCrossSigningKeyUpdateRequest + var response api.InputCrossSigningKeyUpdateResponse + if err := json.NewDecoder(req.Body).Decode(&request); err != nil { + return util.MessageResponse(http.StatusBadRequest, err.Error()) + } + if err := t.InputCrossSigningKeyUpdate(req.Context(), &request, &response); err != nil { + return util.ErrorResponse(err) + } + return util.JSONResponse{Code: http.StatusOK, JSON: &response} + }), + ) } diff --git a/federationapi/routing/send.go b/federationapi/routing/send.go index 5f214e0fcb..2b28282660 100644 --- a/federationapi/routing/send.go +++ b/federationapi/routing/send.go @@ -502,6 +502,22 @@ func (t *txnReq) processEDUs(ctx context.Context) { } } } + case eduserverAPI.MSigningKeyUpdate: + var updatePayload eduserverAPI.CrossSigningKeyUpdate + if err := json.Unmarshal(e.Content, &updatePayload); err != nil { + util.GetLogger(ctx).WithError(err).WithFields(logrus.Fields{ + "user_id": updatePayload.UserID, + }).Error("Failed to send signing key update to edu server") + continue + } + inputReq := &eduserverAPI.InputCrossSigningKeyUpdateRequest{ + CrossSigningKeyUpdate: updatePayload, + } + inputRes := &eduserverAPI.InputCrossSigningKeyUpdateResponse{} + if err := t.eduAPI.InputCrossSigningKeyUpdate(ctx, inputReq, inputRes); err != nil { + util.GetLogger(ctx).WithError(err).Error("Failed to unmarshal cross-signing update") + continue + } default: util.GetLogger(ctx).WithField("type", e.Type).Debug("Unhandled EDU") } diff --git a/federationapi/routing/send_test.go b/federationapi/routing/send_test.go index 5b5af9c4db..7028846133 100644 --- a/federationapi/routing/send_test.go +++ b/federationapi/routing/send_test.go @@ -84,6 +84,14 @@ func (o *testEDUProducer) InputReceiptEvent( return nil } +func (o *testEDUProducer) InputCrossSigningKeyUpdate( + ctx context.Context, + request *eduAPI.InputCrossSigningKeyUpdateRequest, + response *eduAPI.InputCrossSigningKeyUpdateResponse, +) error { + return nil +} + type testRoomserverAPI struct { api.RoomserverInternalAPITrace inputRoomEvents []api.InputRoomEvent diff --git a/federationsender/consumers/keychange.go b/federationsender/consumers/keychange.go index 9e146390a0..675328dfc6 100644 --- a/federationsender/consumers/keychange.go +++ b/federationsender/consumers/keychange.go @@ -20,6 +20,7 @@ import ( "fmt" "github.com/Shopify/sarama" + eduserverAPI "github.com/matrix-org/dendrite/eduserver/api" "github.com/matrix-org/dendrite/federationsender/queue" "github.com/matrix-org/dendrite/federationsender/storage" "github.com/matrix-org/dendrite/internal" @@ -28,6 +29,7 @@ import ( "github.com/matrix-org/dendrite/setup/config" "github.com/matrix-org/dendrite/setup/process" "github.com/matrix-org/gomatrixserverlib" + "github.com/sirupsen/logrus" log "github.com/sirupsen/logrus" ) @@ -83,6 +85,17 @@ func (t *KeyChangeConsumer) onMessage(msg *sarama.ConsumerMessage) error { log.WithError(err).Errorf("failed to read device message from key change topic") return nil } + switch m.Type { + case api.TypeCrossSigningUpdate: + return t.onCrossSigningMessage(m) + case api.TypeDeviceKeyUpdate: + fallthrough + default: + return t.onDeviceKeyMessage(m) + } +} + +func (t *KeyChangeConsumer) onDeviceKeyMessage(m api.DeviceMessage) error { logger := log.WithField("user_id", m.UserID) // only send key change events which originated from us @@ -133,6 +146,50 @@ func (t *KeyChangeConsumer) onMessage(msg *sarama.ConsumerMessage) error { return t.queues.SendEDU(edu, t.serverName, destinations) } +func (t *KeyChangeConsumer) onCrossSigningMessage(m api.DeviceMessage) error { + output := m.CrossSigningKeyUpdate + _, host, err := gomatrixserverlib.SplitID('@', output.UserID) + if err != nil { + logrus.WithError(err).Errorf("fedsender key change consumer: user ID parse failure") + return nil + } + if host != gomatrixserverlib.ServerName(t.serverName) { + // Ignore any messages that didn't originate locally, otherwise we'll + // end up parroting information we received from other servers. + return nil + } + logger := log.WithField("user_id", output.UserID) + + var queryRes roomserverAPI.QueryRoomsForUserResponse + err = t.rsAPI.QueryRoomsForUser(context.Background(), &roomserverAPI.QueryRoomsForUserRequest{ + UserID: output.UserID, + WantMembership: "join", + }, &queryRes) + if err != nil { + logger.WithError(err).Error("fedsender key change consumer: failed to calculate joined rooms for user") + return nil + } + // send this key change to all servers who share rooms with this user. + destinations, err := t.db.GetJoinedHostsForRooms(context.Background(), queryRes.RoomIDs) + if err != nil { + logger.WithError(err).Error("fedsender key change consumer: failed to calculate joined hosts for rooms user is in") + return nil + } + + // Pack the EDU and marshal it + edu := &gomatrixserverlib.EDU{ + Type: eduserverAPI.MSigningKeyUpdate, + Origin: string(t.serverName), + } + if edu.Content, err = json.Marshal(output); err != nil { + logger.WithError(err).Error("fedsender key change consumer: failed to marshal output, dropping") + return nil + } + + logger.Infof("Sending cross-signing update message to %q", destinations) + return t.queues.SendEDU(edu, t.serverName, destinations) +} + func prevID(streamID int) []int { if streamID <= 1 { return nil diff --git a/go.mod b/go.mod index e5d1575654..a18328544f 100644 --- a/go.mod +++ b/go.mod @@ -31,7 +31,7 @@ require ( github.com/matrix-org/go-http-js-libp2p v0.0.0-20200518170932-783164aeeda4 github.com/matrix-org/go-sqlite3-js v0.0.0-20210709140738-b0d1ba599a6d github.com/matrix-org/gomatrix v0.0.0-20210324163249-be2af5ef2e16 - github.com/matrix-org/gomatrixserverlib v0.0.0-20210809130922-d9c3f400582b + github.com/matrix-org/gomatrixserverlib v0.0.0-20210817115641-f9416ac1a723 github.com/matrix-org/naffka v0.0.0-20210623111924-14ff508b58e0 github.com/matrix-org/pinecone v0.0.0-20210623102758-74f885644c1b github.com/matrix-org/util v0.0.0-20200807132607-55161520e1d4 diff --git a/go.sum b/go.sum index d848988da8..7f3be4c05e 100644 --- a/go.sum +++ b/go.sum @@ -994,8 +994,8 @@ github.com/matrix-org/go-sqlite3-js v0.0.0-20210709140738-b0d1ba599a6d/go.mod h1 github.com/matrix-org/gomatrix v0.0.0-20190528120928-7df988a63f26/go.mod h1:3fxX6gUjWyI/2Bt7J1OLhpCzOfO/bB3AiX0cJtEKud0= github.com/matrix-org/gomatrix v0.0.0-20210324163249-be2af5ef2e16 h1:ZtO5uywdd5dLDCud4r0r55eP4j9FuUNpl60Gmntcop4= github.com/matrix-org/gomatrix v0.0.0-20210324163249-be2af5ef2e16/go.mod h1:/gBX06Kw0exX1HrwmoBibFA98yBk/jxKpGVeyQbff+s= -github.com/matrix-org/gomatrixserverlib v0.0.0-20210809130922-d9c3f400582b h1:8St1B8QmlvMLsOmGqW3++0akUs0250IAi+AGcr5faxw= -github.com/matrix-org/gomatrixserverlib v0.0.0-20210809130922-d9c3f400582b/go.mod h1:JsAzE1Ll3+gDWS9JSUHPJiiyAksvOOnGWF2nXdg4ZzU= +github.com/matrix-org/gomatrixserverlib v0.0.0-20210817115641-f9416ac1a723 h1:b8cyR4aYv9Lmf1lKgASJ+PFSp/GBv8ZFgb/O42ZXLGA= +github.com/matrix-org/gomatrixserverlib v0.0.0-20210817115641-f9416ac1a723/go.mod h1:JsAzE1Ll3+gDWS9JSUHPJiiyAksvOOnGWF2nXdg4ZzU= github.com/matrix-org/naffka v0.0.0-20210623111924-14ff508b58e0 h1:HZCzy4oVzz55e+cOMiX/JtSF2UOY1evBl2raaE7ACcU= github.com/matrix-org/naffka v0.0.0-20210623111924-14ff508b58e0/go.mod h1:sjyPyRxKM5uw1nD2cJ6O2OxI6GOqyVBfNXqKjBZTBZE= github.com/matrix-org/pinecone v0.0.0-20210623102758-74f885644c1b h1:5X5vdWQ13xrNkJVqaJHPsrt7rKkMJH5iac0EtfOuxSg= diff --git a/keyserver/api/api.go b/keyserver/api/api.go index 490f0e41cc..40120236fb 100644 --- a/keyserver/api/api.go +++ b/keyserver/api/api.go @@ -20,6 +20,7 @@ import ( "strings" "time" + eduapi "github.com/matrix-org/dendrite/eduserver/api" "github.com/matrix-org/dendrite/keyserver/types" userapi "github.com/matrix-org/dendrite/userapi/api" "github.com/matrix-org/gomatrixserverlib" @@ -47,6 +48,7 @@ type KeyError struct { Err string `json:"error"` IsInvalidSignature bool `json:"is_invalid_signature,omitempty"` // M_INVALID_SIGNATURE IsMissingParam bool `json:"is_missing_param,omitempty"` // M_MISSING_PARAM + IsInvalidParam bool `json:"is_invalid_param,omitempty"` // M_INVALID_PARAM } func (k *KeyError) Error() string { @@ -62,8 +64,9 @@ const ( // DeviceMessage represents the message produced into Kafka by the key server. type DeviceMessage struct { - Type DeviceMessageType `json:"Type,omitempty"` - *DeviceKeys `json:"DeviceKeys,omitempty"` + Type DeviceMessageType `json:"Type,omitempty"` + *DeviceKeys `json:"DeviceKeys,omitempty"` + *eduapi.OutputCrossSigningKeyUpdate `json:"CrossSigningKeyUpdate,omitempty"` // A monotonically increasing number which represents device changes for this user. StreamID int } diff --git a/keyserver/consumers/cross_signing.go b/keyserver/consumers/cross_signing.go new file mode 100644 index 0000000000..f9973ec9fd --- /dev/null +++ b/keyserver/consumers/cross_signing.go @@ -0,0 +1,112 @@ +// Copyright 2021 The Matrix.org Foundation C.I.C. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package consumers + +import ( + "context" + "encoding/json" + + "github.com/matrix-org/dendrite/internal" + "github.com/matrix-org/dendrite/keyserver/api" + "github.com/matrix-org/dendrite/keyserver/storage" + "github.com/matrix-org/dendrite/setup/config" + "github.com/matrix-org/dendrite/setup/process" + "github.com/matrix-org/gomatrixserverlib" + "github.com/sirupsen/logrus" + + "github.com/Shopify/sarama" +) + +type OutputCrossSigningKeyUpdateConsumer struct { + eduServerConsumer *internal.ContinualConsumer + keyDB storage.Database + keyAPI api.KeyInternalAPI + serverName string +} + +func NewOutputCrossSigningKeyUpdateConsumer( + process *process.ProcessContext, + cfg *config.Dendrite, + kafkaConsumer sarama.Consumer, + keyDB storage.Database, + keyAPI api.KeyInternalAPI, +) *OutputCrossSigningKeyUpdateConsumer { + // The keyserver both produces and consumes on the TopicOutputKeyChangeEvent + // topic. We will only produce events where the UserID matches our server name, + // and we will only consume events where the UserID does NOT match our server + // name (because the update came from a remote server). + consumer := internal.ContinualConsumer{ + Process: process, + ComponentName: "keyserver/keyserver", + Topic: cfg.Global.Kafka.TopicFor(config.TopicOutputKeyChangeEvent), + Consumer: kafkaConsumer, + PartitionStore: keyDB, + } + s := &OutputCrossSigningKeyUpdateConsumer{ + eduServerConsumer: &consumer, + keyDB: keyDB, + keyAPI: keyAPI, + serverName: string(cfg.Global.ServerName), + } + consumer.ProcessMessage = s.onMessage + + return s +} + +func (s *OutputCrossSigningKeyUpdateConsumer) Start() error { + return s.eduServerConsumer.Start() +} + +// onMessage is called in response to a message received on the +// key change events topic from the key server. +func (t *OutputCrossSigningKeyUpdateConsumer) onMessage(msg *sarama.ConsumerMessage) error { + var m api.DeviceMessage + if err := json.Unmarshal(msg.Value, &m); err != nil { + logrus.WithError(err).Errorf("failed to read device message from key change topic") + return nil + } + switch m.Type { + case api.TypeCrossSigningUpdate: + return t.onCrossSigningMessage(m) + default: + return nil + } +} + +func (s *OutputCrossSigningKeyUpdateConsumer) onCrossSigningMessage(m api.DeviceMessage) error { + output := m.CrossSigningKeyUpdate + _, host, err := gomatrixserverlib.SplitID('@', output.UserID) + if err != nil { + logrus.WithError(err).Errorf("eduserver output log: user ID parse failure") + return nil + } + if host == gomatrixserverlib.ServerName(s.serverName) { + // Ignore any messages that contain information about our own users, as + // they already originated from this server. + return nil + } + uploadReq := &api.PerformUploadDeviceKeysRequest{ + UserID: output.UserID, + } + if output.MasterKey != nil { + uploadReq.MasterKey = *output.MasterKey + } + if output.SelfSigningKey != nil { + uploadReq.SelfSigningKey = *output.SelfSigningKey + } + uploadRes := &api.PerformUploadDeviceKeysResponse{} + s.keyAPI.PerformUploadDeviceKeys(context.TODO(), uploadReq, uploadRes) + return uploadRes.Error +} diff --git a/keyserver/consumers/eduserver.go b/keyserver/consumers/eduserver.go deleted file mode 100644 index d764950bcf..0000000000 --- a/keyserver/consumers/eduserver.go +++ /dev/null @@ -1,61 +0,0 @@ -package consumers - -import ( - "fmt" - - "github.com/matrix-org/dendrite/internal" - "github.com/matrix-org/dendrite/keyserver/api" - "github.com/matrix-org/dendrite/keyserver/storage" - "github.com/matrix-org/dendrite/setup/config" - "github.com/matrix-org/dendrite/setup/process" - - "github.com/Shopify/sarama" -) - -type OutputSigningKeyUpdateConsumer struct { - eduServerConsumer *internal.ContinualConsumer - keyDB storage.Database - keyAPI api.KeyInternalAPI - serverName string -} - -func NewOutputSigningKeyUpdateConsumer( - process *process.ProcessContext, - cfg *config.Dendrite, - kafkaConsumer sarama.Consumer, - keyDB storage.Database, - keyAPI api.KeyInternalAPI, -) *OutputSigningKeyUpdateConsumer { - consumer := internal.ContinualConsumer{ - Process: process, - ComponentName: "keyserver/eduserver", - Topic: cfg.Global.Kafka.TopicFor(config.TopicOutputSigningKeyUpdate), - Consumer: kafkaConsumer, - PartitionStore: keyDB, - } - s := &OutputSigningKeyUpdateConsumer{ - eduServerConsumer: &consumer, - keyDB: keyDB, - keyAPI: keyAPI, - serverName: string(cfg.Global.ServerName), - } - consumer.ProcessMessage = s.onMessage - - return s -} - -func (s *OutputSigningKeyUpdateConsumer) Start() error { - return s.eduServerConsumer.Start() -} - -func (s *OutputSigningKeyUpdateConsumer) onMessage(msg *sarama.ConsumerMessage) error { - /* - var output eduapi.OutputSigningKeyUpdate - if err := json.Unmarshal(msg.Value, &output); err != nil { - log.WithError(err).Errorf("eduserver output log: message parse failure") - return nil - } - return nil - */ - return fmt.Errorf("TODO") -} diff --git a/keyserver/internal/cross_signing.go b/keyserver/internal/cross_signing.go index 4009dd459a..1e1871b8ba 100644 --- a/keyserver/internal/cross_signing.go +++ b/keyserver/internal/cross_signing.go @@ -19,14 +19,15 @@ import ( "context" "crypto/ed25519" "database/sql" - "encoding/json" "fmt" "strings" + eduserverAPI "github.com/matrix-org/dendrite/eduserver/api" "github.com/matrix-org/dendrite/keyserver/api" "github.com/matrix-org/dendrite/keyserver/types" "github.com/matrix-org/gomatrixserverlib" "github.com/sirupsen/logrus" + "golang.org/x/crypto/curve25519" ) func sanityCheckKey(key gomatrixserverlib.CrossSigningKey, userID string, purpose gomatrixserverlib.CrossSigningKeyPurpose) error { @@ -45,6 +46,41 @@ func sanityCheckKey(key gomatrixserverlib.CrossSigningKey, userID string, purpos if tokens[1] != b64 { return fmt.Errorf("key ID isn't correct") } + switch tokens[0] { + case "ed25519": + if len(keyData) != ed25519.PublicKeySize { + return fmt.Errorf("ed25519 key is not the correct length") + } + case "curve25519": + if len(keyData) != curve25519.PointSize { + return fmt.Errorf("curve25519 key is not the correct length") + } + default: + // We can't enforce the key length to be correct for an + // algorithm that we don't recognise, so instead we'll + // just make sure that it isn't incredibly excessive. + if l := len(keyData); l > 4096 { + return fmt.Errorf("unknown key type is too long (%d bytes)", l) + } + } + } + + // Check to see if the signatures make sense + for _, forOriginUser := range key.Signatures { + for originKeyID, originSignature := range forOriginUser { + switch strings.SplitN(string(originKeyID), ":", 1)[0] { + case "ed25519": + if len(originSignature) != ed25519.SignatureSize { + return fmt.Errorf("ed25519 signature is not the correct length") + } + case "curve25519": + return fmt.Errorf("curve25519 signatures are impossible") + default: + if l := len(originSignature); l > 4096 { + return fmt.Errorf("unknown signature type is too long (%d bytes)", l) + } + } + } } // Does the key claim to be from the right user? @@ -69,132 +105,91 @@ func sanityCheckKey(key gomatrixserverlib.CrossSigningKey, userID string, purpos // nolint:gocyclo func (a *KeyInternalAPI) PerformUploadDeviceKeys(ctx context.Context, req *api.PerformUploadDeviceKeysRequest, res *api.PerformUploadDeviceKeysResponse) { - var masterKey gomatrixserverlib.Base64Bytes + // Find the keys to store. + byPurpose := map[gomatrixserverlib.CrossSigningKeyPurpose]gomatrixserverlib.CrossSigningKey{} + toStore := types.CrossSigningKeyMap{} hasMasterKey := false if len(req.MasterKey.Keys) > 0 { if err := sanityCheckKey(req.MasterKey, req.UserID, gomatrixserverlib.CrossSigningKeyPurposeMaster); err != nil { res.Error = &api.KeyError{ - Err: "Master key sanity check failed: " + err.Error(), + Err: "Master key sanity check failed: " + err.Error(), + IsInvalidParam: true, } return } - for _, keyData := range req.MasterKey.Keys { // iterates once, because sanityCheckKey requires one key - hasMasterKey = true - masterKey = keyData + + byPurpose[gomatrixserverlib.CrossSigningKeyPurposeMaster] = req.MasterKey + for _, key := range req.MasterKey.Keys { // iterates once, see sanityCheckKey + toStore[gomatrixserverlib.CrossSigningKeyPurposeMaster] = key } + hasMasterKey = true } if len(req.SelfSigningKey.Keys) > 0 { if err := sanityCheckKey(req.SelfSigningKey, req.UserID, gomatrixserverlib.CrossSigningKeyPurposeSelfSigning); err != nil { res.Error = &api.KeyError{ - Err: "Self-signing key sanity check failed: " + err.Error(), + Err: "Self-signing key sanity check failed: " + err.Error(), + IsInvalidParam: true, } return } - } - if len(req.UserSigningKey.Keys) > 0 { - if err := sanityCheckKey(req.UserSigningKey, req.UserID, gomatrixserverlib.CrossSigningKeyPurposeUserSigning); err != nil { - res.Error = &api.KeyError{ - Err: "User-signing key sanity check failed: " + err.Error(), - } - return + byPurpose[gomatrixserverlib.CrossSigningKeyPurposeSelfSigning] = req.SelfSigningKey + for _, key := range req.SelfSigningKey.Keys { // iterates once, see sanityCheckKey + toStore[gomatrixserverlib.CrossSigningKeyPurposeSelfSigning] = key } } - // If the user hasn't given a new master key, then let's go and get their - // existing keys from the database. - if !hasMasterKey { - existingKeys, err := a.DB.CrossSigningKeysDataForUser(ctx, req.UserID) - if err != nil { + if len(req.UserSigningKey.Keys) > 0 { + if err := sanityCheckKey(req.UserSigningKey, req.UserID, gomatrixserverlib.CrossSigningKeyPurposeUserSigning); err != nil { res.Error = &api.KeyError{ - Err: "Retrieving cross-signing keys from database failed: " + err.Error(), + Err: "User-signing key sanity check failed: " + err.Error(), + IsInvalidParam: true, } return } - masterKey, hasMasterKey = existingKeys[gomatrixserverlib.CrossSigningKeyPurposeMaster] - } - - // If we still don't have a master key at this point then there's nothing else - // we can do - we've checked both the request and the database. - if !hasMasterKey { - res.Error = &api.KeyError{ - Err: "No master key was found either in the database or in the request!", - IsMissingParam: true, + byPurpose[gomatrixserverlib.CrossSigningKeyPurposeUserSigning] = req.UserSigningKey + for _, key := range req.UserSigningKey.Keys { // iterates once, see sanityCheckKey + toStore[gomatrixserverlib.CrossSigningKeyPurposeUserSigning] = key } - return - } - - // The key ID is basically the key itself. - masterKeyID := gomatrixserverlib.KeyID(fmt.Sprintf("ed25519:%s", masterKey.Encode())) - - // Work out which things we need to verify the signatures for. - toVerify := make(map[gomatrixserverlib.CrossSigningKeyPurpose]gomatrixserverlib.CrossSigningKey, 3) - toStore := types.CrossSigningKeyMap{} - if len(req.MasterKey.Keys) > 0 { - toVerify[gomatrixserverlib.CrossSigningKeyPurposeMaster] = req.MasterKey - } - if len(req.SelfSigningKey.Keys) > 0 { - toVerify[gomatrixserverlib.CrossSigningKeyPurposeSelfSigning] = req.SelfSigningKey - } - if len(req.UserSigningKey.Keys) > 0 { - toVerify[gomatrixserverlib.CrossSigningKeyPurposeUserSigning] = req.UserSigningKey } - if len(toVerify) == 0 { + // If there's nothing to do then stop here. + if len(toStore) == 0 { res.Error = &api.KeyError{ - Err: "No supplied keys available for verification", + Err: "No keys were supplied in the request", IsMissingParam: true, } return } - for purpose, key := range toVerify { - // Collect together the key IDs we need to verify with. This will include - // all of the key IDs specified in the signatures. - keyJSON, err := json.Marshal(key) + // We can't have a self-signing or user-signing key without a master + // key, so make sure we have one of those. + if !hasMasterKey { + existingKeys, err := a.DB.CrossSigningKeysDataForUser(ctx, req.UserID) if err != nil { res.Error = &api.KeyError{ - Err: fmt.Sprintf("The JSON of the key section is invalid: %s", err.Error()), + Err: "Retrieving cross-signing keys from database failed: " + err.Error(), } return } - switch purpose { - case gomatrixserverlib.CrossSigningKeyPurposeMaster: - // The master key might have a signature attached to it from the - // previous key, or from a device key, but there's no real need - // to verify it. Clients will perform key checks when the master - // key changes. - - default: - // Sub-keys should be signed by the master key. - if err := gomatrixserverlib.VerifyJSON(req.UserID, masterKeyID, ed25519.PublicKey(masterKey), keyJSON); err != nil { - res.Error = &api.KeyError{ - Err: fmt.Sprintf("The %q sub-key failed master key signature verification: %s", purpose, err.Error()), - IsInvalidSignature: true, - } - return - } - } - - // If we've reached this point then all the signatures are valid so - // add the key to the list of keys to store. - for _, keyData := range key.Keys { // iterates once, see sanityCheckKey - toStore[purpose] = keyData - } + _, hasMasterKey = existingKeys[gomatrixserverlib.CrossSigningKeyPurposeMaster] } - if len(toStore) == 0 { + // If we still can't find a master key for the user then stop the upload. + // This satisfies the "Fails to upload self-signing key without master key" test. + if !hasMasterKey { res.Error = &api.KeyError{ - Err: "No supplied keys passed verification", + Err: "No master key was found", IsMissingParam: true, } return } + // Store the keys. if err := a.DB.StoreCrossSigningKeysForUser(ctx, req.UserID, toStore); err != nil { res.Error = &api.KeyError{ Err: fmt.Sprintf("a.DB.StoreCrossSigningKeysForUser: %s", err), @@ -203,7 +198,7 @@ func (a *KeyInternalAPI) PerformUploadDeviceKeys(ctx context.Context, req *api.P } // Now upload any signatures that were included with the keys. - for _, key := range toVerify { + for _, key := range byPurpose { var targetKeyID gomatrixserverlib.KeyID for targetKey := range key.Keys { // iterates once, see sanityCheckKey targetKeyID = targetKey @@ -222,6 +217,28 @@ func (a *KeyInternalAPI) PerformUploadDeviceKeys(ctx context.Context, req *api.P } } } + + // Finally, generate a notification that we updated the keys. + if _, host, err := gomatrixserverlib.SplitID('@', req.UserID); err == nil && host == a.ThisServer { + update := eduserverAPI.CrossSigningKeyUpdate{ + UserID: req.UserID, + } + if mk, ok := byPurpose[gomatrixserverlib.CrossSigningKeyPurposeMaster]; ok { + update.MasterKey = &mk + } + if ssk, ok := byPurpose[gomatrixserverlib.CrossSigningKeyPurposeSelfSigning]; ok { + update.SelfSigningKey = &ssk + } + if update.MasterKey == nil && update.SelfSigningKey == nil { + return + } + if err := a.Producer.ProduceSigningKeyUpdate(update); err != nil { + res.Error = &api.KeyError{ + Err: fmt.Sprintf("a.Producer.ProduceSigningKeyUpdate: %s", err), + } + return + } + } } func (a *KeyInternalAPI) PerformUploadDeviceSignatures(ctx context.Context, req *api.PerformUploadDeviceSignaturesRequest, res *api.PerformUploadDeviceSignaturesResponse) { @@ -277,7 +294,7 @@ func (a *KeyInternalAPI) PerformUploadDeviceSignatures(ctx context.Context, req } } - if err := a.processSelfSignatures(ctx, req.UserID, queryRes, selfSignatures); err != nil { + if err := a.processSelfSignatures(ctx, selfSignatures); err != nil { res.Error = &api.KeyError{ Err: fmt.Sprintf("a.processSelfSignatures: %s", err), } @@ -290,10 +307,25 @@ func (a *KeyInternalAPI) PerformUploadDeviceSignatures(ctx context.Context, req } return } + + // Finally, generate a notification that we updated the signatures. + for userID := range req.Signatures { + if _, host, err := gomatrixserverlib.SplitID('@', userID); err == nil && host == a.ThisServer { + update := eduserverAPI.CrossSigningKeyUpdate{ + UserID: userID, + } + if err := a.Producer.ProduceSigningKeyUpdate(update); err != nil { + res.Error = &api.KeyError{ + Err: fmt.Sprintf("a.Producer.ProduceSigningKeyUpdate: %s", err), + } + return + } + } + } } func (a *KeyInternalAPI) processSelfSignatures( - ctx context.Context, _ string, queryRes *api.QueryKeysResponse, + ctx context.Context, signatures map[string]map[gomatrixserverlib.KeyID]gomatrixserverlib.CrossSigningForKeyOrDevice, ) error { // Here we will process: @@ -304,37 +336,8 @@ func (a *KeyInternalAPI) processSelfSignatures( for targetKeyID, signature := range forTargetUserID { switch sig := signature.CrossSigningBody.(type) { case *gomatrixserverlib.CrossSigningKey: - // The user is signing their master key with one of their devices - // The QueryKeys response should contain the device key hopefully. - // First we need to marshal the blob back into JSON so we can verify - // it. - j, err := json.Marshal(sig) - if err != nil { - return fmt.Errorf("json.Marshal: %w", err) - } - for originUserID, forOriginUserID := range sig.Signatures { - originDeviceKeys, ok := queryRes.DeviceKeys[originUserID] - if !ok { - return fmt.Errorf("missing device keys for user %q", originUserID) - } - for originKeyID, originSig := range forOriginUserID { - var originKey gomatrixserverlib.DeviceKeys - if err := json.Unmarshal(originDeviceKeys[string(originKeyID)], &originKey); err != nil { - return fmt.Errorf("json.Unmarshal: %w", err) - } - - originSigningKey, ok := originKey.Keys[originKeyID] - if !ok { - return fmt.Errorf("missing origin signing key %q", originKeyID) - } - originSigningKeyPublic := ed25519.PublicKey(originSigningKey) - - if err := gomatrixserverlib.VerifyJSON(originUserID, originKeyID, originSigningKeyPublic, j); err != nil { - return fmt.Errorf("gomatrixserverlib.VerifyJSON: %w", err) - } - if err := a.DB.StoreCrossSigningSigsForTarget( ctx, originUserID, originKeyID, targetUserID, targetKeyID, originSig, ); err != nil { @@ -344,35 +347,8 @@ func (a *KeyInternalAPI) processSelfSignatures( } case *gomatrixserverlib.DeviceKeys: - // The user is signing one of their devices with their self-signing key - // The QueryKeys response should contain the master key hopefully. - // First we need to marshal the blob back into JSON so we can verify - // it. - j, err := json.Marshal(sig) - if err != nil { - return fmt.Errorf("json.Marshal: %w", err) - } - for originUserID, forOriginUserID := range sig.Signatures { for originKeyID, originSig := range forOriginUserID { - originSelfSigningKeys, ok := queryRes.SelfSigningKeys[originUserID] - if !ok { - return fmt.Errorf("missing self-signing key for user %q", originUserID) - } - - var originSelfSigningKeyID gomatrixserverlib.KeyID - var originSelfSigningKey gomatrixserverlib.Base64Bytes - for keyID, key := range originSelfSigningKeys.Keys { - originSelfSigningKeyID, originSelfSigningKey = keyID, key - break - } - - originSelfSigningKeyPublic := ed25519.PublicKey(originSelfSigningKey) - - if err := gomatrixserverlib.VerifyJSON(originUserID, originSelfSigningKeyID, originSelfSigningKeyPublic, j); err != nil { - return fmt.Errorf("gomatrixserverlib.VerifyJSON: %w", err) - } - if err := a.DB.StoreCrossSigningSigsForTarget( ctx, originUserID, originKeyID, targetUserID, targetKeyID, originSig, ); err != nil { diff --git a/keyserver/keyserver.go b/keyserver/keyserver.go index fcfe24de89..603067552b 100644 --- a/keyserver/keyserver.go +++ b/keyserver/keyserver.go @@ -65,7 +65,7 @@ func NewInternalAPI( } }() - keyconsumer := consumers.NewOutputSigningKeyUpdateConsumer( + keyconsumer := consumers.NewOutputCrossSigningKeyUpdateConsumer( base.ProcessContext, base.Cfg, consumer, db, ap, ) if err := keyconsumer.Start(); err != nil { diff --git a/keyserver/producers/keychange.go b/keyserver/producers/keychange.go index 0fe21d8b16..782675c2a0 100644 --- a/keyserver/producers/keychange.go +++ b/keyserver/producers/keychange.go @@ -19,6 +19,7 @@ import ( "encoding/json" "github.com/Shopify/sarama" + eduapi "github.com/matrix-org/dendrite/eduserver/api" "github.com/matrix-org/dendrite/keyserver/api" "github.com/matrix-org/dendrite/keyserver/storage" "github.com/sirupsen/logrus" @@ -73,3 +74,36 @@ func (p *KeyChange) ProduceKeyChanges(keys []api.DeviceMessage) error { } return nil } + +func (p *KeyChange) ProduceSigningKeyUpdate(key eduapi.CrossSigningKeyUpdate) error { + var m sarama.ProducerMessage + output := &api.DeviceMessage{ + Type: api.TypeCrossSigningUpdate, + OutputCrossSigningKeyUpdate: &eduapi.OutputCrossSigningKeyUpdate{ + CrossSigningKeyUpdate: key, + }, + } + + value, err := json.Marshal(output) + if err != nil { + return err + } + + m.Topic = string(p.Topic) + m.Key = sarama.StringEncoder(key.UserID) + m.Value = sarama.ByteEncoder(value) + + partition, offset, err := p.Producer.SendMessage(&m) + if err != nil { + return err + } + err = p.DB.StoreKeyChange(context.Background(), partition, offset, key.UserID) + if err != nil { + return err + } + + logrus.WithFields(logrus.Fields{ + "user_id": key.UserID, + }).Infof("Produced to cross-signing update topic '%s'", p.Topic) + return nil +} diff --git a/setup/config/config_kafka.go b/setup/config/config_kafka.go index 15b3ad7134..3619142871 100644 --- a/setup/config/config_kafka.go +++ b/setup/config/config_kafka.go @@ -10,7 +10,6 @@ const ( TopicOutputRoomEvent = "OutputRoomEvent" TopicOutputClientData = "OutputClientData" TopicOutputReceiptEvent = "OutputReceiptEvent" - TopicOutputSigningKeyUpdate = "OutputSigningKeyUpdate" ) type Kafka struct { diff --git a/syncapi/consumers/keychange.go b/syncapi/consumers/keychange.go index 0d2ecd449f..05fcf37d9d 100644 --- a/syncapi/consumers/keychange.go +++ b/syncapi/consumers/keychange.go @@ -29,6 +29,7 @@ import ( "github.com/matrix-org/dendrite/syncapi/storage" "github.com/matrix-org/dendrite/syncapi/types" "github.com/matrix-org/gomatrixserverlib" + "github.com/sirupsen/logrus" log "github.com/sirupsen/logrus" ) @@ -104,13 +105,50 @@ func (s *OutputKeyChangeEventConsumer) updateOffset(msg *sarama.ConsumerMessage) func (s *OutputKeyChangeEventConsumer) onMessage(msg *sarama.ConsumerMessage) error { defer s.updateOffset(msg) - var output api.DeviceMessage - if err := json.Unmarshal(msg.Value, &output); err != nil { - // If the message was invalid, log it and move on to the next message in the stream - log.WithError(err).Error("syncapi: failed to unmarshal key change event from key server") + var m api.DeviceMessage + if err := json.Unmarshal(msg.Value, &m); err != nil { + logrus.WithError(err).Errorf("failed to read device message from key change topic") + return nil + } + switch m.Type { + case api.TypeCrossSigningUpdate: + return s.onCrossSigningMessage(m, msg.Offset, msg.Partition) + case api.TypeDeviceKeyUpdate: + fallthrough + default: + return s.onDeviceKeyMessage(m, msg.Offset, msg.Partition) + } +} + +func (s *OutputKeyChangeEventConsumer) onDeviceKeyMessage(m api.DeviceMessage, offset int64, partition int32) error { + output := m.DeviceKeys + // work out who we need to notify about the new key + var queryRes roomserverAPI.QuerySharedUsersResponse + err := s.rsAPI.QuerySharedUsers(context.Background(), &roomserverAPI.QuerySharedUsersRequest{ + UserID: output.UserID, + }, &queryRes) + if err != nil { + log.WithError(err).Error("syncapi: failed to QuerySharedUsers for key change event from key server") sentry.CaptureException(err) return err } + // make sure we get our own key updates too! + queryRes.UserIDsToCount[output.UserID] = 1 + posUpdate := types.LogPosition{ + Offset: offset, + Partition: partition, + } + + s.stream.Advance(posUpdate) + for userID := range queryRes.UserIDsToCount { + s.notifier.OnNewKeyChange(types.StreamingToken{DeviceListPosition: posUpdate}, userID, output.UserID) + } + + return nil +} + +func (s *OutputKeyChangeEventConsumer) onCrossSigningMessage(m api.DeviceMessage, offset int64, partition int32) error { + output := m.CrossSigningKeyUpdate // work out who we need to notify about the new key var queryRes roomserverAPI.QuerySharedUsersResponse err := s.rsAPI.QuerySharedUsers(context.Background(), &roomserverAPI.QuerySharedUsersRequest{ @@ -124,8 +162,8 @@ func (s *OutputKeyChangeEventConsumer) onMessage(msg *sarama.ConsumerMessage) er // make sure we get our own key updates too! queryRes.UserIDsToCount[output.UserID] = 1 posUpdate := types.LogPosition{ - Offset: msg.Offset, - Partition: msg.Partition, + Offset: offset, + Partition: partition, } s.stream.Advance(posUpdate) diff --git a/sytest-whitelist b/sytest-whitelist index d2f2a1c7d2..9f3eb893ac 100644 --- a/sytest-whitelist +++ b/sytest-whitelist @@ -554,3 +554,5 @@ Can upload self-signing keys Fails to upload self-signing keys with no auth Fails to upload self-signing key without master key can fetch self-signing keys over federation +Changing master key notifies local users +Changing user-signing key notifies local users