Skip to content
This repository has been archived by the owner on Nov 25, 2024. It is now read-only.

Change how servers are selected for missing auth/prev events #1892

Merged
merged 3 commits into from
Jun 30, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion cmd/dendrite-polylith-multi/personalities/federationapi.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ func FederationAPI(base *setup.BaseDendrite, cfg *config.Dendrite) {
base.PublicFederationAPIMux, base.PublicKeyAPIMux,
&base.Cfg.FederationAPI, userAPI, federation, keyRing,
rsAPI, fsAPI, base.EDUServerClient(), keyAPI,
&base.Cfg.MSCs,
&base.Cfg.MSCs, nil,
)

base.SetupAndServeHTTP(
Expand Down
11 changes: 11 additions & 0 deletions federationapi/api/servers.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
package api

import (
"context"

"github.com/matrix-org/gomatrixserverlib"
)

type ServersInRoomProvider interface {
GetServersForRoom(ctx context.Context, roomID string, event *gomatrixserverlib.Event) []gomatrixserverlib.ServerName
}
3 changes: 3 additions & 0 deletions federationapi/federationapi.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package federationapi
import (
"github.com/gorilla/mux"
eduserverAPI "github.com/matrix-org/dendrite/eduserver/api"
federationAPI "github.com/matrix-org/dendrite/federationapi/api"
federationSenderAPI "github.com/matrix-org/dendrite/federationsender/api"
keyserverAPI "github.com/matrix-org/dendrite/keyserver/api"
roomserverAPI "github.com/matrix-org/dendrite/roomserver/api"
Expand All @@ -39,10 +40,12 @@ func AddPublicRoutes(
eduAPI eduserverAPI.EDUServerInputAPI,
keyAPI keyserverAPI.KeyInternalAPI,
mscCfg *config.MSCs,
servers federationAPI.ServersInRoomProvider,
) {
routing.Setup(
fedRouter, keyRouter, cfg, rsAPI,
eduAPI, federationSenderAPI, keyRing,
federation, userAPI, keyAPI, mscCfg,
servers,
)
}
2 changes: 1 addition & 1 deletion federationapi/federationapi_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ func TestRoomsV3URLEscapeDoNot404(t *testing.T) {
fsAPI := base.FederationSenderHTTPClient()
// TODO: This is pretty fragile, as if anything calls anything on these nils this test will break.
// Unfortunately, it makes little sense to instantiate these dependencies when we just want to test routing.
federationapi.AddPublicRoutes(base.PublicFederationAPIMux, base.PublicKeyAPIMux, &cfg.FederationAPI, nil, nil, keyRing, nil, fsAPI, nil, nil, &cfg.MSCs)
federationapi.AddPublicRoutes(base.PublicFederationAPIMux, base.PublicKeyAPIMux, &cfg.FederationAPI, nil, nil, keyRing, nil, fsAPI, nil, nil, &cfg.MSCs, nil)
baseURL, cancel := test.ListenAndServe(t, base.PublicFederationAPIMux, true)
defer cancel()
serverName := gomatrixserverlib.ServerName(strings.TrimPrefix(baseURL, "https://"))
Expand Down
4 changes: 3 additions & 1 deletion federationapi/routing/routing.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"github.com/gorilla/mux"
"github.com/matrix-org/dendrite/clientapi/jsonerror"
eduserverAPI "github.com/matrix-org/dendrite/eduserver/api"
federationAPI "github.com/matrix-org/dendrite/federationapi/api"
federationSenderAPI "github.com/matrix-org/dendrite/federationsender/api"
"github.com/matrix-org/dendrite/internal"
"github.com/matrix-org/dendrite/internal/httputil"
Expand Down Expand Up @@ -50,6 +51,7 @@ func Setup(
userAPI userapi.UserInternalAPI,
keyAPI keyserverAPI.KeyInternalAPI,
mscCfg *config.MSCs,
servers federationAPI.ServersInRoomProvider,
) {
v2keysmux := keyMux.PathPrefix("/v2").Subrouter()
v1fedmux := fedMux.PathPrefix("/v1").Subrouter()
Expand Down Expand Up @@ -99,7 +101,7 @@ func Setup(
func(httpReq *http.Request, request *gomatrixserverlib.FederationRequest, vars map[string]string) util.JSONResponse {
return Send(
httpReq, request, gomatrixserverlib.TransactionID(vars["txnID"]),
cfg, rsAPI, eduAPI, keyAPI, keys, federation, mu,
cfg, rsAPI, eduAPI, keyAPI, keys, federation, mu, servers,
)
},
)).Methods(http.MethodPut, http.MethodOptions)
Expand Down
55 changes: 30 additions & 25 deletions federationapi/routing/send.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"github.com/getsentry/sentry-go"
"github.com/matrix-org/dendrite/clientapi/jsonerror"
eduserverAPI "github.com/matrix-org/dendrite/eduserver/api"
federationAPI "github.com/matrix-org/dendrite/federationapi/api"
"github.com/matrix-org/dendrite/internal"
keyapi "github.com/matrix-org/dendrite/keyserver/api"
"github.com/matrix-org/dendrite/roomserver/api"
Expand Down Expand Up @@ -101,6 +102,7 @@ func Send(
keys gomatrixserverlib.JSONVerifier,
federation *gomatrixserverlib.FederationClient,
mu *internal.MutexByRoom,
servers federationAPI.ServersInRoomProvider,
) util.JSONResponse {
t := txnReq{
rsAPI: rsAPI,
Expand All @@ -109,6 +111,7 @@ func Send(
federation: federation,
hadEvents: make(map[string]bool),
haveEvents: make(map[string]*gomatrixserverlib.HeaderedEvent),
servers: servers,
keyAPI: keyAPI,
roomsMu: mu,
}
Expand Down Expand Up @@ -160,14 +163,14 @@ func Send(

type txnReq struct {
gomatrixserverlib.Transaction
rsAPI api.RoomserverInternalAPI
eduAPI eduserverAPI.EDUServerInputAPI
keyAPI keyapi.KeyInternalAPI
keys gomatrixserverlib.JSONVerifier
federation txnFederationClient
servers []gomatrixserverlib.ServerName
serversMutex sync.RWMutex
roomsMu *internal.MutexByRoom
rsAPI api.RoomserverInternalAPI
eduAPI eduserverAPI.EDUServerInputAPI
keyAPI keyapi.KeyInternalAPI
keys gomatrixserverlib.JSONVerifier
federation txnFederationClient
roomsMu *internal.MutexByRoom
// something that can tell us about which servers are in a room right now
servers federationAPI.ServersInRoomProvider
// a list of events from the auth and prev events which we already had
hadEvents map[string]bool
// local cache of events for auth checks, etc - this may include events
Expand Down Expand Up @@ -466,22 +469,24 @@ func (t *txnReq) processDeviceListUpdate(ctx context.Context, e gomatrixserverli
}
}

func (t *txnReq) getServers(ctx context.Context, roomID string) []gomatrixserverlib.ServerName {
t.serversMutex.Lock()
defer t.serversMutex.Unlock()
if t.servers != nil {
return t.servers
}
t.servers = []gomatrixserverlib.ServerName{t.Origin}
serverReq := &api.QueryServerJoinedToRoomRequest{
RoomID: roomID,
func (t *txnReq) getServers(ctx context.Context, roomID string, event *gomatrixserverlib.Event) []gomatrixserverlib.ServerName {
// The server that sent us the event should be sufficient to tell us about missing
// prev and auth events.
servers := []gomatrixserverlib.ServerName{t.Origin}
// If the event origin is different to the transaction origin then we can use
// this as a last resort. The origin server that created the event would have
// had to know the auth and prev events.
if event != nil {
if origin := event.Origin(); origin != t.Origin {
servers = append(servers, origin)
}
}
serverRes := &api.QueryServerJoinedToRoomResponse{}
if err := t.rsAPI.QueryServerJoinedToRoom(ctx, serverReq, serverRes); err == nil {
t.servers = append(t.servers, serverRes.ServerNames...)
util.GetLogger(ctx).Infof("Found %d server(s) to query for missing events in %q", len(t.servers), roomID)
// If a specific room-to-server provider exists then use that. This will primarily
// be used for the P2P demos.
if t.servers != nil {
servers = append(servers, t.servers.GetServersForRoom(ctx, roomID, event)...)
}
return t.servers
return servers
}

func (t *txnReq) processEvent(ctx context.Context, e *gomatrixserverlib.Event) error {
Expand Down Expand Up @@ -566,7 +571,7 @@ func (t *txnReq) retrieveMissingAuthEvents(
withNextEvent:
for missingAuthEventID := range missingAuthEvents {
withNextServer:
for _, server := range t.getServers(ctx, e.RoomID()) {
for _, server := range t.getServers(ctx, e.RoomID(), e) {
logger.Infof("Retrieving missing auth event %q from %q", missingAuthEventID, server)
tx, err := t.federation.GetEvent(ctx, server, missingAuthEventID)
if err != nil {
Expand Down Expand Up @@ -948,7 +953,7 @@ func (t *txnReq) getMissingEvents(ctx context.Context, e *gomatrixserverlib.Even
}

var missingResp *gomatrixserverlib.RespMissingEvents
servers := t.getServers(ctx, e.RoomID())
servers := t.getServers(ctx, e.RoomID(), e)
for _, server := range servers {
var m gomatrixserverlib.RespMissingEvents
if m, err = t.federation.LookupMissingEvents(ctx, server, e.RoomID(), gomatrixserverlib.MissingEvents{
Expand Down Expand Up @@ -1220,7 +1225,7 @@ func (t *txnReq) lookupEvent(ctx context.Context, roomVersion gomatrixserverlib.
}
var event *gomatrixserverlib.Event
found := false
servers := t.getServers(ctx, roomID)
servers := t.getServers(ctx, roomID, nil)
for _, serverName := range servers {
txn, err := t.federation.GetEvent(ctx, serverName, missingEventID)
if err != nil || len(txn.PDUs) == 0 {
Expand Down
2 changes: 1 addition & 1 deletion setup/monolith.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ func (m *Monolith) AddAllPublicRoutes(process *process.ProcessContext, csMux, ss
federationapi.AddPublicRoutes(
ssMux, keyMux, &m.Config.FederationAPI, m.UserAPI, m.FedClient,
m.KeyRing, m.RoomserverAPI, m.FederationSenderAPI,
m.EDUInternalAPI, m.KeyAPI, &m.Config.MSCs,
m.EDUInternalAPI, m.KeyAPI, &m.Config.MSCs, nil,
)
mediaapi.AddPublicRoutes(mediaMux, &m.Config.MediaAPI, m.UserAPI, m.Client)
syncapi.AddPublicRoutes(
Expand Down