Skip to content

Commit

Permalink
Add queries and sprocs
Browse files Browse the repository at this point in the history
  • Loading branch information
richardhuaaa committed Aug 21, 2024
1 parent 93a8215 commit b342aa5
Show file tree
Hide file tree
Showing 9 changed files with 207 additions and 55 deletions.
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ dev/generate
Create a new migration by running:

```sh
dev/gen-migration
dev/gen-migration {migration-name}
```

Fill in the migrations in the generated files. If you are unfamiliar with migrations, you may follow [this guide](https://github.com/golang-migrate/migrate/blob/master/MIGRATIONS.md). The database is PostgreSQL and the driver is PGX.
Expand All @@ -106,5 +106,5 @@ Fill in the migrations in the generated files. If you are unfamiliar with migrat
We use [sqlc](https://docs.sqlc.dev/en/latest/index.html) to generate the code for our DB queries. Modify the `queries.sql` file, and then run:

```sh
sqlc generate
dev/generate
```
1 change: 1 addition & 0 deletions dev/generate
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ go generate ./...
rm -f pkg/mocks/*
./dev/abigen
mockery
sqlc generate

rm -rf pkg/proto/**/*.pb.go pkg/proto/**/*.pb.gw.go pkg/proto/**/*.swagger.json
if ! buf generate https://github.com/xmtp/proto.git#subdir=proto; then
Expand Down
44 changes: 35 additions & 9 deletions pkg/db/queries.sql
Original file line number Diff line number Diff line change
@@ -1,13 +1,39 @@
-- name: InsertStagedOriginatorEnvelope :one
INSERT INTO staged_originator_envelopes(payer_envelope)
VALUES (@payer_envelope)
RETURNING
*;

-- name: InsertNodeInfo :one
-- name: InsertNodeInfo :execrows
INSERT INTO node_info(node_id, public_key)
VALUES (@node_id, @public_key)
RETURNING *;
ON CONFLICT
DO NOTHING;

-- name: SelectNodeInfo :one
SELECT * FROM node_info WHERE singleton_id = 1;
SELECT
*
FROM
node_info
WHERE
singleton_id = 1;

-- name: InsertGatewayEnvelope :execrows
SELECT
insert_gateway_envelope(@originator_id, @sequence_id, @topic, @originator_envelope);

-- name: InsertStagedOriginatorEnvelope :one
SELECT
*
FROM
insert_staged_originator_envelope(@payer_envelope);

-- name: SelectStagedOriginatorEnvelopes :many
SELECT
*
FROM
staged_originator_envelopes
WHERE
id > @last_seen_id
ORDER BY
id ASC
LIMIT @num_rows;

-- name: DeleteStagedOriginatorEnvelope :execrows
DELETE FROM staged_originator_envelopes
WHERE id = @id;

3 changes: 2 additions & 1 deletion pkg/db/queries/models.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

107 changes: 96 additions & 11 deletions pkg/db/queries/queries.sql.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

43 changes: 39 additions & 4 deletions pkg/migrations/00001_init-schema.up.sql
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
CREATE TABLE node_info(
node_id INTEGER NOT NULL,
public_key BYTEA NOT NULL,

singleton_id SMALLINT PRIMARY KEY DEFAULT 1,
CONSTRAINT is_singleton CHECK (singleton_id = 1)
);
Expand All @@ -12,15 +11,35 @@ CREATE TABLE node_info(
CREATE TABLE gateway_envelopes(
-- used to construct gateway_sid
id BIGSERIAL PRIMARY KEY,
originator_sid BIGINT NOT NULL,
originator_id INT NOT NULL,
sequence_id BIGINT NOT NULL,
topic BYTEA NOT NULL,
originator_envelope BYTEA NOT NULL
);

-- Client queries
CREATE INDEX idx_gateway_envelopes_topic ON gateway_envelopes(topic);

-- Node queries
CREATE UNIQUE INDEX idx_gateway_envelopes_originator_sid ON gateway_envelopes(originator_sid);
CREATE UNIQUE INDEX idx_gateway_envelopes_originator_sid ON gateway_envelopes(originator_id, sequence_id);

CREATE FUNCTION insert_gateway_envelope(originator_id INT, sequence_id BIGINT, topic BYTEA, originator_envelope BYTEA)
RETURNS SETOF gateway_envelopes
AS $$
BEGIN
-- Ensures that the generated sequence ID matches the insertion order
-- Only released at the end of the enclosing transaction - beware if called within a long transaction
PERFORM
pg_advisory_xact_lock(hashtext('gateway_envelopes_sequence'));
RETURN QUERY INSERT INTO gateway_envelopes(originator_id, sequence_id, topic, originator_envelope)
VALUES(originator_id, sequence_id, topic, originator_envelope)
ON CONFLICT
DO NOTHING
RETURNING
*;
END;
$$
LANGUAGE plpgsql;

-- Process for originating envelopes:
-- 1. Perform any necessary validation
Expand All @@ -38,6 +57,22 @@ CREATE TABLE staged_originator_envelopes(
payer_envelope BYTEA NOT NULL
);

CREATE FUNCTION insert_staged_originator_envelope(payer_envelope BYTEA)
RETURNS SETOF staged_originator_envelopes
AS $$
BEGIN
PERFORM
pg_advisory_xact_lock(hashtext('staged_originator_envelopes_sequence'));
RETURN QUERY INSERT INTO staged_originator_envelopes(payer_envelope)
VALUES(payer_envelope)
ON CONFLICT
DO NOTHING
RETURNING
*;
END;
$$
LANGUAGE plpgsql;

-- A cached view for looking up the inbox_id that an address belongs to.
-- Relies on a total ordering of updates across all inbox_ids, from which this
-- view can be deterministically generated.
Expand All @@ -46,6 +81,6 @@ CREATE TABLE address_log(
inbox_id BYTEA NOT NULL,
association_sequence_id BIGINT,
revocation_sequence_id BIGINT,

PRIMARY KEY (address, inbox_id)
);

8 changes: 6 additions & 2 deletions pkg/registrant/registrant.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ func NewRegistrant(
}

func (r *Registrant) sid(localID int64) (uint64, error) {
if !utils.IsValidLocalID(localID) {
if !utils.IsValidSequenceID(localID) {
return 0, fmt.Errorf("Invalid local ID %d, likely due to ID exhaustion", localID)
}
return utils.SID(r.record.NodeID, localID), nil
Expand Down Expand Up @@ -121,14 +121,18 @@ func getRegistryRecord(
// - Running multiple nodes with different private keys against the same DB
// - Changing a server's configuration while pointing to data in an existing DB
func ensureDatabaseMatches(ctx context.Context, db *queries.Queries, record *registry.Node) error {
_, err := db.InsertNodeInfo(
numRows, err := db.InsertNodeInfo(
ctx,
queries.InsertNodeInfoParams{
NodeID: int32(record.NodeID),
PublicKey: crypto.FromECDSAPub(record.SigningKey),
},
)
if err != nil {
return fmt.Errorf("unable to insert node info into database: %v", err)
}

if numRows == 0 {
nodeInfo, err := db.SelectNodeInfo(ctx)
if err != nil {
return fmt.Errorf("unable to retrieve node info from database: %v", err)
Expand Down
8 changes: 4 additions & 4 deletions pkg/utils/sid.go
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
package utils

// SIDS are 64-bit numbers consisting of 16 bits for the node ID
// followed by 48 bits for the sequence ID (local ID). This file
// followed by 48 bits for the sequence ID. This file
// contains methods for reading and constructing sids.
//
// We also leverage type-checking throughout the repo to avoid confusion:
// - SIDs are uint64
// - node IDs are uint16
// - local IDs are int64
// - sequence IDs are int64

const (
// Number of bits used for node ID
Expand All @@ -20,15 +20,15 @@ const (
localIDMask uint64 = ^nodeIDMask
)

func IsValidLocalID(localID int64) bool {
func IsValidSequenceID(localID int64) bool {
return localID > 0 && localID>>localIDBits == 0
}

func NodeID(sid uint64) uint16 {
return uint16(sid >> localIDBits)
}

func LocalID(sid uint64) int64 {
func SequenceID(sid uint64) int64 {
return int64(sid & localIDMask)
}

Expand Down
Loading

0 comments on commit b342aa5

Please sign in to comment.