Skip to content
This repository has been archived by the owner on Nov 25, 2024. It is now read-only.

State storage refactor #1839

Merged
merged 18 commits into from
Apr 26, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ require (
github.com/matrix-org/gomatrixserverlib v0.0.0-20210302161955-6142fe3f8c2c
github.com/matrix-org/naffka v0.0.0-20201009174903-d26a3b9cb161
github.com/matrix-org/util v0.0.0-20200807132607-55161520e1d4
github.com/mattn/go-sqlite3 v1.14.6
github.com/mattn/go-sqlite3 v1.14.7-0.20210414154423-1157a4212dcb
github.com/nfnt/resize v0.0.0-20180221191011-83c6a9932646
github.com/ngrok/sqlmw v0.0.0-20200129213757-d5c93a81bec6
github.com/opentracing/opentracing-go v1.2.0
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -684,8 +684,8 @@ github.com/mattn/go-isatty v0.0.10/go.mod h1:qgIWMr58cqv1PHHyhnkY9lrL7etaEgOFcME
github.com/mattn/go-runewidth v0.0.2/go.mod h1:LwmH8dsx7+W8Uxz3IHJYH5QSwggIsqBzpuz5H//U1FU=
github.com/mattn/go-runewidth v0.0.7/go.mod h1:H031xJmbD/WCDINGzjvQ9THkh0rPKHF+m2gUSrubnMI=
github.com/mattn/go-sqlite3 v1.14.2/go.mod h1:JIl7NbARA7phWnGvh0LKTyg7S9BA+6gx71ShQilpsus=
github.com/mattn/go-sqlite3 v1.14.6 h1:dNPt6NO46WmLVt2DLNpwczCmdV5boIZ6g/tlDrlRUbg=
github.com/mattn/go-sqlite3 v1.14.6/go.mod h1:NyWgC/yNuGj7Q9rpYnZvas74GogHl5/Z4A/KQRfk6bU=
github.com/mattn/go-sqlite3 v1.14.7-0.20210414154423-1157a4212dcb h1:ax2vG2unlxsjwS7PMRo4FECIfAdQLowd6ejWYwPQhBo=
github.com/mattn/go-sqlite3 v1.14.7-0.20210414154423-1157a4212dcb/go.mod h1:NyWgC/yNuGj7Q9rpYnZvas74GogHl5/Z4A/KQRfk6bU=
github.com/mattn/goveralls v0.0.2/go.mod h1:8d1ZMHsd7fW6IRPKQh46F2WRpyib5/X4FOpevwGNQEw=
github.com/matttproud/golang_protobuf_extensions v1.0.1 h1:4hp9jkHxhMHkqkrB3Ix0jegS5sx/RkqARlsWZ6pIwiU=
github.com/matttproud/golang_protobuf_extensions v1.0.1/go.mod h1:D8He9yQNgCq6Z5Ld7szi9bcBfOoFv/3dc6xSMkL2PC0=
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (

func LoadFromGoose() {
goose.AddMigration(UpAddForgottenColumn, DownAddForgottenColumn)
goose.AddMigration(UpStateBlocksRefactor, DownStateBlocksRefactor)
}

func LoadAddForgottenColumn(m *sqlutil.Migrations) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,223 @@
// Copyright 2020 The Matrix.org Foundation C.I.C.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package deltas

import (
"database/sql"
"fmt"

"github.com/lib/pq"
"github.com/matrix-org/dendrite/internal/sqlutil"
"github.com/matrix-org/dendrite/roomserver/types"
"github.com/matrix-org/util"
"github.com/sirupsen/logrus"
)

type stateSnapshotData struct {
StateSnapshotNID types.StateSnapshotNID
RoomNID types.RoomNID
}

type stateBlockData struct {
stateSnapshotData
StateBlockNID types.StateBlockNID
EventNIDs types.EventNIDs
}

func LoadStateBlocksRefactor(m *sqlutil.Migrations) {
m.AddMigration(UpStateBlocksRefactor, DownStateBlocksRefactor)
}

// nolint:gocyclo
func UpStateBlocksRefactor(tx *sql.Tx) error {
logrus.Warn("Performing state storage upgrade. Please wait, this may take some time!")
defer logrus.Warn("State storage upgrade complete")

var snapshotcount int
var maxsnapshotid int
var maxblockid int
if err := tx.QueryRow(`SELECT COUNT(DISTINCT state_snapshot_nid) FROM roomserver_state_snapshots;`).Scan(&snapshotcount); err != nil {
return fmt.Errorf("tx.QueryRow.Scan (count snapshots): %w", err)
}
if err := tx.QueryRow(`SELECT COALESCE(MAX(state_snapshot_nid),0) FROM roomserver_state_snapshots;`).Scan(&maxsnapshotid); err != nil {
return fmt.Errorf("tx.QueryRow.Scan (count snapshots): %w", err)
}
if err := tx.QueryRow(`SELECT COALESCE(MAX(state_block_nid),0) FROM roomserver_state_block;`).Scan(&maxblockid); err != nil {
return fmt.Errorf("tx.QueryRow.Scan (count snapshots): %w", err)
}
maxsnapshotid++
maxblockid++

if _, err := tx.Exec(`ALTER TABLE roomserver_state_block RENAME TO _roomserver_state_block;`); err != nil {
return fmt.Errorf("tx.Exec: %w", err)
}
if _, err := tx.Exec(`ALTER TABLE roomserver_state_snapshots RENAME TO _roomserver_state_snapshots;`); err != nil {
return fmt.Errorf("tx.Exec: %w", err)
}
// We create new sequences starting with the maximum state snapshot and block NIDs.
// This means that all newly created snapshots and blocks by the migration will have
// NIDs higher than these values, so that when we come to update the references to
// these NIDs using UPDATE statements, we can guarantee we are only ever updating old
// values and not accidentally overwriting new ones.
if _, err := tx.Exec(fmt.Sprintf(`CREATE SEQUENCE roomserver_state_block_nid_sequence START WITH %d;`, maxblockid)); err != nil {
return fmt.Errorf("tx.Exec: %w", err)
}
if _, err := tx.Exec(fmt.Sprintf(`CREATE SEQUENCE roomserver_state_snapshot_nid_sequence START WITH %d;`, maxsnapshotid)); err != nil {
neilalexander marked this conversation as resolved.
Show resolved Hide resolved
return fmt.Errorf("tx.Exec: %w", err)
}
_, err := tx.Exec(`
CREATE TABLE IF NOT EXISTS roomserver_state_block (
state_block_nid bigint PRIMARY KEY DEFAULT nextval('roomserver_state_block_nid_sequence'),
state_block_hash BYTEA UNIQUE,
event_nids bigint[] NOT NULL
);
`)
if err != nil {
return fmt.Errorf("tx.Exec (create blocks table): %w", err)
}
_, err = tx.Exec(`
CREATE TABLE IF NOT EXISTS roomserver_state_snapshots (
state_snapshot_nid bigint PRIMARY KEY DEFAULT nextval('roomserver_state_snapshot_nid_sequence'),
state_snapshot_hash BYTEA UNIQUE,
room_nid bigint NOT NULL,
state_block_nids bigint[] NOT NULL
);
`)
if err != nil {
return fmt.Errorf("tx.Exec (create snapshots table): %w", err)
}
logrus.Warn("New tables created...")

batchsize := 100
for batchoffset := 0; batchoffset < snapshotcount; batchoffset += batchsize {
var snapshotrows *sql.Rows
snapshotrows, err = tx.Query(`
SELECT
state_snapshot_nid,
room_nid,
state_block_nid,
ARRAY_AGG(event_nid) AS event_nids
FROM (
SELECT
_roomserver_state_snapshots.state_snapshot_nid,
_roomserver_state_snapshots.room_nid,
_roomserver_state_block.state_block_nid,
_roomserver_state_block.event_nid
FROM
_roomserver_state_snapshots
JOIN _roomserver_state_block ON _roomserver_state_block.state_block_nid = ANY (_roomserver_state_snapshots.state_block_nids)
WHERE
_roomserver_state_snapshots.state_snapshot_nid = ANY ( SELECT DISTINCT
_roomserver_state_snapshots.state_snapshot_nid
FROM
_roomserver_state_snapshots
LIMIT $1 OFFSET $2)) AS _roomserver_state_block
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shouldn't we use an ORDER BY here to ensure we are getting the same ordering each time we do this?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think it really matters. We end up sorting the NIDs in Go anyway, and since the migration transaction only gets committed if the entire thing succeeds, we don't really need to care about the ordering that the migration runs in.

GROUP BY
state_snapshot_nid,
room_nid,
state_block_nid;
`, batchsize, batchoffset)
if err != nil {
return fmt.Errorf("tx.Query: %w", err)
}

logrus.Warnf("Rewriting snapshots %d-%d of %d...", batchoffset, batchoffset+batchsize, snapshotcount)
var snapshots []stateBlockData

for snapshotrows.Next() {
var snapshot stateBlockData
var eventsarray pq.Int64Array
if err = snapshotrows.Scan(&snapshot.StateSnapshotNID, &snapshot.RoomNID, &snapshot.StateBlockNID, &eventsarray); err != nil {
return fmt.Errorf("rows.Scan: %w", err)
}
for _, e := range eventsarray {
snapshot.EventNIDs = append(snapshot.EventNIDs, types.EventNID(e))
}
snapshot.EventNIDs = snapshot.EventNIDs[:util.SortAndUnique(snapshot.EventNIDs)]
snapshots = append(snapshots, snapshot)
}

if err = snapshotrows.Close(); err != nil {
return fmt.Errorf("snapshots.Close: %w", err)
}

newsnapshots := map[stateSnapshotData]types.StateBlockNIDs{}

for _, snapshot := range snapshots {
var eventsarray pq.Int64Array
for _, e := range snapshot.EventNIDs {
eventsarray = append(eventsarray, int64(e))
}

var blocknid types.StateBlockNID
err = tx.QueryRow(`
INSERT INTO roomserver_state_block (state_block_hash, event_nids)
VALUES ($1, $2)
ON CONFLICT (state_block_hash) DO UPDATE SET event_nids=$2
RETURNING state_block_nid
`, snapshot.EventNIDs.Hash(), eventsarray).Scan(&blocknid)
if err != nil {
return fmt.Errorf("tx.QueryRow.Scan (insert new block with %d events): %w", len(eventsarray), err)
}
index := stateSnapshotData{snapshot.StateSnapshotNID, snapshot.RoomNID}
newsnapshots[index] = append(newsnapshots[index], blocknid)
}

for snapshotdata, newblocks := range newsnapshots {
var newblocksarray pq.Int64Array
for _, b := range newblocks {
newblocksarray = append(newblocksarray, int64(b))
}

var newNID types.StateSnapshotNID
err = tx.QueryRow(`
INSERT INTO roomserver_state_snapshots (state_snapshot_hash, room_nid, state_block_nids)
VALUES ($1, $2, $3)
ON CONFLICT (state_snapshot_hash) DO UPDATE SET room_nid=$2
RETURNING state_snapshot_nid
`, newblocks.Hash(), snapshotdata.RoomNID, newblocksarray).Scan(&newNID)
if err != nil {
return fmt.Errorf("tx.QueryRow.Scan (insert new snapshot): %w", err)
}

if _, err = tx.Exec(`UPDATE roomserver_events SET state_snapshot_nid=$1 WHERE state_snapshot_nid=$2 AND state_snapshot_nid<$3`, newNID, snapshotdata.StateSnapshotNID, maxsnapshotid); err != nil {
return fmt.Errorf("tx.Exec (update events): %w", err)
}

if _, err = tx.Exec(`UPDATE roomserver_rooms SET state_snapshot_nid=$1 WHERE state_snapshot_nid=$2 AND state_snapshot_nid<$3`, newNID, snapshotdata.StateSnapshotNID, maxsnapshotid); err != nil {
return fmt.Errorf("tx.Exec (update rooms): %w", err)
}
}
}

if _, err = tx.Exec(`
DROP TABLE _roomserver_state_snapshots;
DROP SEQUENCE roomserver_state_snapshot_nid_seq;
`); err != nil {
return fmt.Errorf("tx.Exec (delete old snapshot table): %w", err)
}
if _, err = tx.Exec(`
DROP TABLE _roomserver_state_block;
DROP SEQUENCE roomserver_state_block_nid_seq;
`); err != nil {
return fmt.Errorf("tx.Exec (delete old block table): %w", err)
}

return nil
}

func DownStateBlocksRefactor(tx *sql.Tx) error {
panic("Downgrading state storage is not supported")
}
12 changes: 7 additions & 5 deletions roomserver/storage/postgres/event_json_table.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,12 +59,14 @@ type eventJSONStatements struct {
bulkSelectEventJSONStmt *sql.Stmt
}

func NewPostgresEventJSONTable(db *sql.DB) (tables.EventJSON, error) {
s := &eventJSONStatements{}
func createEventJSONTable(db *sql.DB) error {
_, err := db.Exec(eventJSONSchema)
if err != nil {
return nil, err
}
return err
}

func prepareEventJSONTable(db *sql.DB) (tables.EventJSON, error) {
s := &eventJSONStatements{}

return s, shared.StatementList{
{&s.insertEventJSONStmt, insertEventJSONSQL},
{&s.bulkSelectEventJSONStmt, bulkSelectEventJSONSQL},
Expand Down
12 changes: 7 additions & 5 deletions roomserver/storage/postgres/event_state_keys_table.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,12 +77,14 @@ type eventStateKeyStatements struct {
bulkSelectEventStateKeyStmt *sql.Stmt
}

func NewPostgresEventStateKeysTable(db *sql.DB) (tables.EventStateKeys, error) {
s := &eventStateKeyStatements{}
func createEventStateKeysTable(db *sql.DB) error {
_, err := db.Exec(eventStateKeysSchema)
if err != nil {
return nil, err
}
return err
}

func prepareEventStateKeysTable(db *sql.DB) (tables.EventStateKeys, error) {
s := &eventStateKeyStatements{}

return s, shared.StatementList{
{&s.insertEventStateKeyNIDStmt, insertEventStateKeyNIDSQL},
{&s.selectEventStateKeyNIDStmt, selectEventStateKeyNIDSQL},
Expand Down
11 changes: 6 additions & 5 deletions roomserver/storage/postgres/event_types_table.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,12 +100,13 @@ type eventTypeStatements struct {
bulkSelectEventTypeNIDStmt *sql.Stmt
}

func NewPostgresEventTypesTable(db *sql.DB) (tables.EventTypes, error) {
s := &eventTypeStatements{}
func createEventTypesTable(db *sql.DB) error {
_, err := db.Exec(eventTypesSchema)
if err != nil {
return nil, err
}
return err
}

func prepareEventTypesTable(db *sql.DB) (tables.EventTypes, error) {
s := &eventTypeStatements{}

return s, shared.StatementList{
{&s.insertEventTypeNIDStmt, insertEventTypeNIDSQL},
Expand Down
Loading