diff --git a/internal/datastore/common/sql.go b/internal/datastore/common/sql.go index 36be7b56de..11e74c4eef 100644 --- a/internal/datastore/common/sql.go +++ b/internal/datastore/common/sql.go @@ -18,6 +18,9 @@ import ( ) var ( + // CaveatNameKey is a tracing attribute representing a caveat name + CaveatNameKey = attribute.Key("authzed.com/spicedb/sql/caveatName") + // ObjNamespaceNameKey is a tracing attribute representing the resource // object type. ObjNamespaceNameKey = attribute.Key("authzed.com/spicedb/sql/objNamespaceName") diff --git a/internal/datastore/crdb/crdb.go b/internal/datastore/crdb/crdb.go index 6546ec0f8c..804ef67a0b 100644 --- a/internal/datastore/crdb/crdb.go +++ b/internal/datastore/crdb/crdb.go @@ -213,7 +213,7 @@ func (cds *crdbDatastore) SnapshotReader(rev datastore.Revision) datastore.Reade } querySplitter := common.TupleQuerySplitter{ - Executor: pgxcommon.NewPGXExecutor(createTxFunc), + Executor: pgxcommon.NewPGXExecutor(createTxFunc, false), UsersetBatchSize: cds.usersetBatchSize, } @@ -234,7 +234,7 @@ func (cds *crdbDatastore) ReadWriteTx( } querySplitter := common.TupleQuerySplitter{ - Executor: pgxcommon.NewPGXExecutor(longLivedTx), + Executor: pgxcommon.NewPGXExecutor(longLivedTx, false), UsersetBatchSize: cds.usersetBatchSize, } diff --git a/internal/datastore/memdb/caveat.go b/internal/datastore/memdb/caveat.go index 504792c462..dd0842222b 100644 --- a/internal/datastore/memdb/caveat.go +++ b/internal/datastore/memdb/caveat.go @@ -1,8 +1,10 @@ package memdb import ( + "context" "fmt" + "github.com/authzed/spicedb/internal/util" "github.com/authzed/spicedb/pkg/datastore" core "github.com/authzed/spicedb/pkg/proto/core/v1" @@ -23,7 +25,7 @@ func (c *caveat) Unwrap() *core.Caveat { } } -func (r *memdbReader) ReadCaveatByName(name string) (*core.Caveat, error) { +func (r *memdbReader) ReadCaveatByName(_ context.Context, name string) (*core.Caveat, error) { if !r.enableCaveats { return nil, fmt.Errorf("caveats are not enabled") } @@ -68,7 +70,11 @@ func (rwt *memdbReadWriteTx) WriteCaveats(caveats []*core.Caveat) error { } func (rwt *memdbReadWriteTx) writeCaveat(tx *memdb.Txn, caveats []*core.Caveat) error { + caveatNames := util.NewSet[string]() for _, coreCaveat := range caveats { + if !caveatNames.Add(coreCaveat.Name) { + return fmt.Errorf("duplicate caveat %s", coreCaveat.Name) + } c := caveat{ name: coreCaveat.Name, expression: coreCaveat.Expression, @@ -87,5 +93,14 @@ func (rwt *memdbReadWriteTx) DeleteCaveats(caveats []*core.Caveat) error { if err != nil { return err } - return tx.Delete(tableCaveats, caveats) + for _, coreCaveat := range caveats { + c := caveat{ + name: coreCaveat.Name, + expression: coreCaveat.Expression, + } + if err := tx.Delete(tableCaveats, c); err != nil { + return err + } + } + return nil } diff --git a/internal/datastore/postgres/caveat.go b/internal/datastore/postgres/caveat.go new file mode 100644 index 0000000000..c3281dadd9 --- /dev/null +++ b/internal/datastore/postgres/caveat.go @@ -0,0 +1,142 @@ +package postgres + +import ( + "context" + "errors" + "fmt" + + "github.com/authzed/spicedb/internal/datastore/common" + "github.com/authzed/spicedb/pkg/datastore" + core "github.com/authzed/spicedb/pkg/proto/core/v1" + + sq "github.com/Masterminds/squirrel" + "github.com/jackc/pgx/v4" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/trace" +) + +var ( + writeCaveat = psql.Insert(tableCaveat).Columns(colCaveatName, colCaveatExpression) + writeCaveatDeprecated = psql.Insert(tableCaveat).Columns(colCaveatName, colCaveatExpression, colCreatedTxnDeprecated) + readCaveat = psql.Select(colCaveatExpression).From(tableCaveat) + deleteCaveat = psql.Update(tableCaveat).Where(sq.Eq{colDeletedXid: liveDeletedTxnID}) + deleteCaveatDeprecated = psql.Update(tableCaveat).Where(sq.Eq{colDeletedTxnDeprecated: liveDeletedTxnID}) +) + +func (r *pgReader) ReadCaveatByName(ctx context.Context, name string) (*core.Caveat, error) { + ctx, span := tracer.Start(ctx, "ReadCaveatByName", trace.WithAttributes(attribute.String("name", name))) + defer span.End() + + filteredReadCaveat := r.filterer(readCaveat) + sql, args, err := filteredReadCaveat.Where(sq.Eq{colCaveatName: name}).ToSql() + if err != nil { + return nil, err + } + + tx, txCleanup, err := r.txSource(ctx) + if err != nil { + return nil, fmt.Errorf("unable to read caveat: %w", err) + } + defer txCleanup(ctx) + + var expr []byte + err = tx.QueryRow(ctx, sql, args...).Scan(&expr) + if err != nil { + if errors.Is(err, pgx.ErrNoRows) { + return nil, datastore.NewCaveatNameNotFoundErr(name) + } + return nil, err + } + return &core.Caveat{ + Name: name, + Expression: expr, + }, nil +} + +func (rwt *pgReadWriteTXN) WriteCaveats(caveats []*core.Caveat) error { + ctx, span := tracer.Start(datastore.SeparateContextWithTracing(rwt.ctx), "WriteCaveats") + defer span.End() + + deletedCaveatClause := sq.Or{} + write := writeCaveat + // TODO remove once the ID->XID migrations are all complete + if rwt.migrationPhase == writeBothReadNew || rwt.migrationPhase == writeBothReadOld { + write = writeCaveatDeprecated + } + writtenCaveatNames := make([]string, 0, len(caveats)) + for _, caveat := range caveats { + deletedCaveatClause = append(deletedCaveatClause, sq.Eq{colCaveatName: caveat.Name}) + valuesToWrite := []any{caveat.Name, caveat.Expression} + // TODO remove once the ID->XID migrations are all complete + if rwt.migrationPhase == writeBothReadNew || rwt.migrationPhase == writeBothReadOld { + valuesToWrite = append(valuesToWrite, rwt.newXID.Uint) + } + write = write.Values(valuesToWrite...) + writtenCaveatNames = append(writtenCaveatNames, caveat.Name) + } + span.SetAttributes(common.CaveatNameKey.StringSlice(writtenCaveatNames)) + + // mark current caveats as deleted + err := rwt.deleteCaveatsWithClause(ctx, deletedCaveatClause) + if err != nil { + return err + } + + // store the new caveat revision + sql, args, err := write.ToSql() + if err != nil { + return fmt.Errorf("unable to write new caveat revision: %w", err) + } + if _, err := rwt.tx.Exec(ctx, sql, args...); err != nil { + return fmt.Errorf("unable to write new caveat revision: %w", err) + } + return nil +} + +func (rwt *pgReadWriteTXN) DeleteCaveats(caveats []*core.Caveat) error { + ctx, span := tracer.Start(datastore.SeparateContextWithTracing(rwt.ctx), "DeleteCaveats") + defer span.End() + + deletedCaveatClause := sq.Or{} + deletedCaveatNames := make([]string, 0, len(caveats)) + for _, caveat := range caveats { + deletedCaveatClause = append(deletedCaveatClause, sq.Eq{colCaveatName: caveat.Name}) + deletedCaveatNames = append(deletedCaveatNames, caveat.Name) + } + span.SetAttributes(common.CaveatNameKey.StringSlice(deletedCaveatNames)) + + // mark current caveats as deleted + return rwt.deleteCaveatsWithClause(ctx, deletedCaveatClause) +} + +func (rwt *pgReadWriteTXN) deleteCaveatsWithClause(ctx context.Context, deleteClauses sq.Or) error { + sql, args, err := deleteCaveat. + Set(colDeletedXid, rwt.newXID). + Where(sq.And{sq.Eq{colDeletedXid: liveDeletedTxnID}, deleteClauses}). + ToSql() + if err != nil { + return fmt.Errorf("unable to mark previous caveat revisions as deleted: %w", err) + } + + // TODO remove once the ID->XID migrations are all complete + if rwt.migrationPhase == writeBothReadNew || rwt.migrationPhase == writeBothReadOld { + baseQuery := deleteCaveat + if rwt.migrationPhase == writeBothReadOld { + baseQuery = deleteCaveatDeprecated + } + + sql, args, err = baseQuery. + Where(deleteClauses). + Set(colDeletedTxnDeprecated, rwt.newXID.Uint). + Set(colDeletedXid, rwt.newXID). + ToSql() + if err != nil { + return fmt.Errorf("unable to mark previous caveat revisions as deleted: %w", err) + } + } + + if _, err := rwt.tx.Exec(ctx, sql, args...); err != nil { + return fmt.Errorf("unable to mark previous caveat revisions as deleted: %w", err) + } + return nil +} diff --git a/internal/datastore/postgres/common/pgx.go b/internal/datastore/postgres/common/pgx.go index f34c365e26..c558a32473 100644 --- a/internal/datastore/postgres/common/pgx.go +++ b/internal/datastore/postgres/common/pgx.go @@ -2,16 +2,17 @@ package common import ( "context" + "database/sql" "fmt" - "github.com/authzed/spicedb/internal/logging" - "github.com/jackc/pgx/v4" "github.com/jackc/pgx/v4/log/zerologadapter" "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/trace" + "google.golang.org/protobuf/types/known/structpb" "github.com/authzed/spicedb/internal/datastore/common" + "github.com/authzed/spicedb/internal/logging" "github.com/authzed/spicedb/pkg/datastore" corev1 "github.com/authzed/spicedb/pkg/proto/core/v1" ) @@ -21,7 +22,7 @@ const ( ) // NewPGXExecutor creates an executor that uses the pgx library to make the specified queries. -func NewPGXExecutor(txSource TxFactory) common.ExecuteQueryFunc { +func NewPGXExecutor(txSource TxFactory, caveatEnabled bool) common.ExecuteQueryFunc { return func(ctx context.Context, sql string, args []any) ([]*corev1.RelationTuple, error) { ctx = datastore.SeparateContextWithTracing(ctx) @@ -32,14 +33,14 @@ func NewPGXExecutor(txSource TxFactory) common.ExecuteQueryFunc { return nil, fmt.Errorf(errUnableToQueryTuples, err) } defer txCleanup(ctx) - return queryTuples(ctx, sql, args, span, tx) + return queryTuples(ctx, sql, args, span, tx, caveatEnabled) } } // queryTuples queries tuples for the given query and transaction. -func queryTuples(ctx context.Context, sql string, args []any, span trace.Span, tx pgx.Tx) ([]*corev1.RelationTuple, error) { +func queryTuples(ctx context.Context, sqlStatement string, args []any, span trace.Span, tx pgx.Tx, caveatEnabled bool) ([]*corev1.RelationTuple, error) { span.AddEvent("DB transaction established") - rows, err := tx.Query(ctx, sql, args...) + rows, err := tx.Query(ctx, sqlStatement, args...) if err != nil { return nil, fmt.Errorf(errUnableToQueryTuples, err) } @@ -53,18 +54,46 @@ func queryTuples(ctx context.Context, sql string, args []any, span trace.Span, t ResourceAndRelation: &corev1.ObjectAndRelation{}, Subject: &corev1.ObjectAndRelation{}, } - err := rows.Scan( - &nextTuple.ResourceAndRelation.Namespace, - &nextTuple.ResourceAndRelation.ObjectId, - &nextTuple.ResourceAndRelation.Relation, - &nextTuple.Subject.Namespace, - &nextTuple.Subject.ObjectId, - &nextTuple.Subject.Relation, - ) + var caveatName sql.NullString + var caveatCtx map[string]any + var err error + // TODO(vroldanbet) deprecate bool flag once CRDB also supports caveat + if caveatEnabled { + err = rows.Scan( + &nextTuple.ResourceAndRelation.Namespace, + &nextTuple.ResourceAndRelation.ObjectId, + &nextTuple.ResourceAndRelation.Relation, + &nextTuple.Subject.Namespace, + &nextTuple.Subject.ObjectId, + &nextTuple.Subject.Relation, + &caveatName, + &caveatCtx, + ) + } else { + err = rows.Scan( + &nextTuple.ResourceAndRelation.Namespace, + &nextTuple.ResourceAndRelation.ObjectId, + &nextTuple.ResourceAndRelation.Relation, + &nextTuple.Subject.Namespace, + &nextTuple.Subject.ObjectId, + &nextTuple.Subject.Relation, + ) + } if err != nil { return nil, fmt.Errorf(errUnableToQueryTuples, err) } + if caveatEnabled && caveatName.String != "" { + caveatStruct, err := structpb.NewStruct(caveatCtx) + if err != nil { + return nil, fmt.Errorf("unable to fetch caveat context: %w", err) + } + nextTuple.Caveat = &corev1.ContextualizedCaveat{ + CaveatName: caveatName.String, + Context: caveatStruct, + } + } + tuples = append(tuples, nextTuple) } if err := rows.Err(); err != nil { diff --git a/internal/datastore/postgres/migrations/driver.go b/internal/datastore/postgres/migrations/driver.go index a9203d10be..4e2dbb52e9 100644 --- a/internal/datastore/postgres/migrations/driver.go +++ b/internal/datastore/postgres/migrations/driver.go @@ -12,8 +12,6 @@ import ( "github.com/authzed/spicedb/pkg/migrate" ) -const errUnableToInstantiate = "unable to instantiate AlembicPostgresDriver: %w" - const postgresMissingTableErrorCode = "42P01" // AlembicPostgresDriver implements a schema migration facility for use in @@ -28,12 +26,12 @@ type AlembicPostgresDriver struct { func NewAlembicPostgresDriver(url string) (*AlembicPostgresDriver, error) { connectStr, err := pq.ParseURL(url) if err != nil { - return nil, fmt.Errorf(errUnableToInstantiate, err) + return nil, err } db, err := pgx.Connect(context.Background(), connectStr) if err != nil { - return nil, fmt.Errorf(errUnableToInstantiate, err) + return nil, err } return &AlembicPostgresDriver{db}, nil diff --git a/internal/datastore/postgres/migrations/zz_migration.0009_caveat.go b/internal/datastore/postgres/migrations/zz_migration.0009_caveat.go new file mode 100644 index 0000000000..1fafe4c760 --- /dev/null +++ b/internal/datastore/postgres/migrations/zz_migration.0009_caveat.go @@ -0,0 +1,36 @@ +package migrations + +import ( + "context" + + "github.com/jackc/pgx/v4" +) + +var caveatStatements = []string{ + `CREATE TABLE caveat ( + name VARCHAR NOT NULL, + expression BYTEA NOT NULL, + created_transaction BIGINT NOT NULL, + deleted_transaction BIGINT NOT NULL DEFAULT '9223372036854775807', + CONSTRAINT pk_caveat_v1 PRIMARY KEY (name, deleted_transaction), + CONSTRAINT uq_caveat_v1 UNIQUE (name, created_transaction, deleted_transaction));`, + `ALTER TABLE relation_tuple + ADD COLUMN caveat_name VARCHAR, + ADD COLUMN caveat_context JSONB;`, +} + +func init() { + if err := DatabaseMigrations.Register("add-caveats", "add-ns-config-id", + noNonatomicMigration, + func(ctx context.Context, tx pgx.Tx) error { + for _, stmt := range caveatStatements { + if _, err := tx.Exec(ctx, stmt); err != nil { + return err + } + } + + return nil + }); err != nil { + panic("failed to register migration: " + err.Error()) + } +} diff --git a/internal/datastore/postgres/migrations/zz_migration.0009_add_xid8_columns.go b/internal/datastore/postgres/migrations/zz_migration.0010_add_xid8_columns.go similarity index 79% rename from internal/datastore/postgres/migrations/zz_migration.0009_add_xid8_columns.go rename to internal/datastore/postgres/migrations/zz_migration.0010_add_xid8_columns.go index 2645fb29dd..e9ba01de73 100644 --- a/internal/datastore/postgres/migrations/zz_migration.0009_add_xid8_columns.go +++ b/internal/datastore/postgres/migrations/zz_migration.0010_add_xid8_columns.go @@ -23,6 +23,11 @@ const ( ADD COLUMN created_xid xid8, ADD COLUMN deleted_xid xid8 NOT NULL DEFAULT ('9223372036854775807');` + addCaveatXIDColumns = ` + ALTER TABLE caveat + ADD COLUMN created_xid xid8, + ADD COLUMN deleted_xid xid8 NOT NULL DEFAULT ('9223372036854775807');` + addTransactionDefault = ` ALTER TABLE relation_tuple_transaction ALTER COLUMN snapshot SET DEFAULT (pg_current_snapshot());` @@ -34,19 +39,25 @@ const ( addNamepsaceDefault = ` ALTER TABLE namespace_config ALTER COLUMN created_xid SET DEFAULT (pg_current_xact_id());` + + addCaveatDefault = ` + ALTER TABLE caveat + ALTER COLUMN created_xid SET DEFAULT (pg_current_xact_id());` ) func init() { - if err := DatabaseMigrations.Register("add-xid-columns", "add-ns-config-id", + if err := DatabaseMigrations.Register("add-xid-columns", "add-caveats", noNonatomicMigration, func(ctx context.Context, tx pgx.Tx) error { for _, stmt := range []string{ addTransactionXIDColumns, addTupleXIDColumns, addNamespaceXIDColumns, + addCaveatXIDColumns, addTransactionDefault, addRelationTupleDefault, addNamepsaceDefault, + addCaveatDefault, } { if _, err := tx.Exec(ctx, stmt); err != nil { return err diff --git a/internal/datastore/postgres/migrations/zz_migration.0010_backfill_xid_add_indices.go b/internal/datastore/postgres/migrations/zz_migration.0011_backfill_xid_add_indices.go similarity index 85% rename from internal/datastore/postgres/migrations/zz_migration.0010_backfill_xid_add_indices.go rename to internal/datastore/postgres/migrations/zz_migration.0011_backfill_xid_add_indices.go index 6c3a4e9841..1d52fcbe7e 100644 --- a/internal/datastore/postgres/migrations/zz_migration.0010_backfill_xid_add_indices.go +++ b/internal/datastore/postgres/migrations/zz_migration.0011_backfill_xid_add_indices.go @@ -18,6 +18,8 @@ var addBackfillIndices = []string{ ON namespace_config ( (created_xid IS NULL) )`, `CREATE INDEX CONCURRENTLY IF NOT EXISTS ix_backfill_tuple_temp ON relation_tuple ( (created_xid IS NULL) )`, + `CREATE INDEX CONCURRENTLY IF NOT EXISTS ix_backfill_caveat_temp + ON caveat ( (created_xid IS NULL) )`, } var backfills = []string{ @@ -52,6 +54,16 @@ var backfills = []string{ LIMIT %d FOR UPDATE );`, + `UPDATE caveat + SET deleted_xid = deleted_transaction::text::xid8, + created_xid = created_transaction::text::xid8 + WHERE (name, created_transaction, deleted_transaction) IN ( + SELECT name, created_transaction, deleted_transaction + FROM caveat + WHERE created_xid IS NULL + LIMIT %d + FOR UPDATE + );`, } var addXIDIndices = []string{ @@ -74,12 +86,17 @@ var addXIDIndices = []string{ `CREATE UNIQUE INDEX CONCURRENTLY IF NOT EXISTS ix_relation_tuple_living ON relation_tuple (namespace, object_id, relation, userset_namespace, userset_object_id, userset_relation, deleted_xid);`, + `CREATE UNIQUE INDEX CONCURRENTLY IF NOT EXISTS ix_caveat_unique + ON caveat (name, created_xid, deleted_xid);`, + `CREATE UNIQUE INDEX CONCURRENTLY IF NOT EXISTS ix_caveat_living + ON caveat (name, deleted_xid);`, } var dropBackfillIndices = []string{ "DROP INDEX ix_backfill_rtt_temp", "DROP INDEX ix_backfill_ns_temp", "DROP INDEX ix_backfill_tuple_temp", + "DROP INDEX ix_backfill_caveat_temp", } func init() { diff --git a/internal/datastore/postgres/migrations/zz_migration.0011_add_xid_constraints.go b/internal/datastore/postgres/migrations/zz_migration.0012_add_xid_constraints.go similarity index 90% rename from internal/datastore/postgres/migrations/zz_migration.0011_add_xid_constraints.go rename to internal/datastore/postgres/migrations/zz_migration.0012_add_xid_constraints.go index 30d89fadd0..473fa122c2 100644 --- a/internal/datastore/postgres/migrations/zz_migration.0011_add_xid_constraints.go +++ b/internal/datastore/postgres/migrations/zz_migration.0012_add_xid_constraints.go @@ -29,6 +29,7 @@ var addXIDConstraints = []string{ `ALTER TABLE relation_tuple_transaction ALTER COLUMN snapshot SET NOT NULL;`, `ALTER TABLE relation_tuple ALTER COLUMN created_xid SET NOT NULL;`, `ALTER TABLE namespace_config ALTER COLUMN created_xid SET NOT NULL;`, + `ALTER TABLE caveat ALTER COLUMN created_xid SET NOT NULL;`, `ALTER TABLE relation_tuple_transaction DROP CONSTRAINT pk_rttx, ADD CONSTRAINT pk_rttx PRIMARY KEY USING INDEX ix_rttx_pk;`, @@ -39,6 +40,10 @@ var addXIDConstraints = []string{ DROP CONSTRAINT pk_relation_tuple, ADD CONSTRAINT pk_relation_tuple PRIMARY KEY USING INDEX ix_relation_tuple_pk, ADD CONSTRAINT uq_relation_tuple_living_xid UNIQUE USING INDEX ix_relation_tuple_living;`, + `ALTER TABLE caveat + DROP CONSTRAINT pk_caveat_v1, + ADD CONSTRAINT pk_caveat_v2 PRIMARY KEY USING INDEX ix_caveat_living, + ADD CONSTRAINT uq_caveat_v2 UNIQUE USING INDEX ix_caveat_unique;`, } func init() { diff --git a/internal/datastore/postgres/migrations/zz_migration.0012_drop_id_constraints.go b/internal/datastore/postgres/migrations/zz_migration.0013_drop_id_constraints.go similarity index 85% rename from internal/datastore/postgres/migrations/zz_migration.0012_drop_id_constraints.go rename to internal/datastore/postgres/migrations/zz_migration.0013_drop_id_constraints.go index 0715a3baf0..0db713a526 100644 --- a/internal/datastore/postgres/migrations/zz_migration.0012_drop_id_constraints.go +++ b/internal/datastore/postgres/migrations/zz_migration.0013_drop_id_constraints.go @@ -16,6 +16,10 @@ var dropIDConstraints = []string{ DROP CONSTRAINT uq_namespace_living, ALTER COLUMN created_transaction DROP NOT NULL, ALTER COLUMN deleted_transaction DROP NOT NULL`, + `ALTER TABLE caveat + DROP CONSTRAINT uq_caveat_v1, + ALTER COLUMN created_transaction DROP NOT NULL, + ALTER COLUMN deleted_transaction DROP NOT NULL`, } func init() { diff --git a/internal/datastore/postgres/migrations/zz_migration.0013_drop_bigserial_ids.go b/internal/datastore/postgres/migrations/zz_migration.0014_drop_bigserial_ids.go similarity index 100% rename from internal/datastore/postgres/migrations/zz_migration.0013_drop_bigserial_ids.go rename to internal/datastore/postgres/migrations/zz_migration.0014_drop_bigserial_ids.go diff --git a/internal/datastore/postgres/postgres.go b/internal/datastore/postgres/postgres.go index 88d32799e3..9d9b19dc7d 100644 --- a/internal/datastore/postgres/postgres.go +++ b/internal/datastore/postgres/postgres.go @@ -35,22 +35,27 @@ const ( tableNamespace = "namespace_config" tableTransaction = "relation_tuple_transaction" tableTuple = "relation_tuple" + tableCaveat = "caveat" colCreatedTxnDeprecated = "created_transaction" colDeletedTxnDeprecated = "deleted_transaction" - colXID = "xid" - colTimestamp = "timestamp" - colNamespace = "namespace" - colConfig = "serialized_config" - colCreatedXid = "created_xid" - colDeletedXid = "deleted_xid" - colSnapshot = "snapshot" - colObjectID = "object_id" - colRelation = "relation" - colUsersetNamespace = "userset_namespace" - colUsersetObjectID = "userset_object_id" - colUsersetRelation = "userset_relation" + colXID = "xid" + colTimestamp = "timestamp" + colNamespace = "namespace" + colConfig = "serialized_config" + colCreatedXid = "created_xid" + colDeletedXid = "deleted_xid" + colSnapshot = "snapshot" + colObjectID = "object_id" + colRelation = "relation" + colUsersetNamespace = "userset_namespace" + colUsersetObjectID = "userset_object_id" + colUsersetRelation = "userset_relation" + colCaveatName = "name" + colCaveatExpression = "expression" + colCaveatContextName = "caveat_name" + colCaveatContext = "caveat_context" errUnableToInstantiate = "unable to instantiate datastore: %w" @@ -282,7 +287,7 @@ func (pgd *pgDatastore) SnapshotReader(rev datastore.Revision) datastore.Reader } querySplitter := common.TupleQuerySplitter{ - Executor: pgxcommon.NewPGXExecutor(createTxFunc), + Executor: pgxcommon.NewPGXExecutor(createTxFunc, true), UsersetBatchSize: pgd.usersetBatchSize, } @@ -327,7 +332,7 @@ func (pgd *pgDatastore) ReadWriteTx( } querySplitter := common.TupleQuerySplitter{ - Executor: pgxcommon.NewPGXExecutor(longLivedTx), + Executor: pgxcommon.NewPGXExecutor(longLivedTx, true), UsersetBatchSize: pgd.usersetBatchSize, } diff --git a/internal/datastore/postgres/reader.go b/internal/datastore/postgres/reader.go index d2e30a55e8..a91041ea59 100644 --- a/internal/datastore/postgres/reader.go +++ b/internal/datastore/postgres/reader.go @@ -35,6 +35,8 @@ var ( colUsersetNamespace, colUsersetObjectID, colUsersetRelation, + colCaveatContextName, + colCaveatContext, ).From(tableTuple) schema = common.SchemaInformation{ diff --git a/internal/datastore/postgres/readwrite.go b/internal/datastore/postgres/readwrite.go index 7bc05bd488..5cbf5e423c 100644 --- a/internal/datastore/postgres/readwrite.go +++ b/internal/datastore/postgres/readwrite.go @@ -48,6 +48,8 @@ var ( colUsersetNamespace, colUsersetObjectID, colUsersetRelation, + colCaveatContextName, + colCaveatContext, ) // TODO remove once the ID->XID migrations are all complete @@ -58,6 +60,8 @@ var ( colUsersetNamespace, colUsersetObjectID, colUsersetRelation, + colCaveatContextName, + colCaveatContext, colCreatedTxnDeprecated, ) @@ -94,15 +98,17 @@ func (rwt *pgReadWriteTXN) WriteRelationships(mutations []*core.RelationTupleUpd for _, mut := range mutations { tpl := mut.Tuple - if tpl.Caveat != nil { - panic("caveats not currently supported in Postgres datastore") - } - if mut.Operation == core.RelationTupleUpdate_TOUCH || mut.Operation == core.RelationTupleUpdate_DELETE { deleteClauses = append(deleteClauses, exactRelationshipClause(tpl)) } if mut.Operation == core.RelationTupleUpdate_TOUCH || mut.Operation == core.RelationTupleUpdate_CREATE { + var caveatName string + var caveatContext map[string]any + if tpl.Caveat != nil { + caveatName = tpl.Caveat.CaveatName + caveatContext = tpl.Caveat.Context.AsMap() + } valuesToWrite := []interface{}{ tpl.ResourceAndRelation.Namespace, tpl.ResourceAndRelation.ObjectId, @@ -110,6 +116,8 @@ func (rwt *pgReadWriteTXN) WriteRelationships(mutations []*core.RelationTupleUpd tpl.Subject.Namespace, tpl.Subject.ObjectId, tpl.Subject.Relation, + caveatName, + caveatContext, // PGX driver serializes map[string]any to JSONB type columns } // TODO remove once the ID->XID migrations are all complete diff --git a/internal/datastore/postgres/watch.go b/internal/datastore/postgres/watch.go index 02bc7c2f30..bc91eabdf7 100644 --- a/internal/datastore/postgres/watch.go +++ b/internal/datastore/postgres/watch.go @@ -7,6 +7,7 @@ import ( "time" sq "github.com/Masterminds/squirrel" + "google.golang.org/protobuf/types/known/structpb" "github.com/authzed/spicedb/internal/datastore/common" "github.com/authzed/spicedb/pkg/datastore" @@ -36,6 +37,8 @@ var ( colUsersetNamespace, colUsersetObjectID, colUsersetRelation, + colCaveatContextName, + colCaveatContext, colCreatedXid, colDeletedXid, ).From(tableTuple) @@ -151,6 +154,8 @@ func (pgd *pgDatastore) loadChanges(ctx context.Context, revision xid8) (*datast } var createdXID, deletedXID xid8 + var caveatName string + var caveatContext map[string]any if err := changes.Scan( &nextTuple.ResourceAndRelation.Namespace, &nextTuple.ResourceAndRelation.ObjectId, @@ -158,12 +163,25 @@ func (pgd *pgDatastore) loadChanges(ctx context.Context, revision xid8) (*datast &nextTuple.Subject.Namespace, &nextTuple.Subject.ObjectId, &nextTuple.Subject.Relation, + &caveatName, + &caveatContext, &createdXID, &deletedXID, ); err != nil { return nil, fmt.Errorf("unable to parse changed tuple: %w", err) } + if caveatName != "" { + contextStruct, err := structpb.NewStruct(caveatContext) + if err != nil { + return nil, fmt.Errorf("failed to read caveat context from update: %w", err) + } + nextTuple.Caveat = &core.ContextualizedCaveat{ + CaveatName: caveatName, + Context: contextStruct, + } + } + if createdXID.Uint == revision.Uint { tracked.AddChange(ctx, revisionFromTransaction(revision), nextTuple, core.RelationTupleUpdate_TOUCH) } else if deletedXID.Uint == revision.Uint { diff --git a/internal/dispatch/graph/computecheck.go b/internal/dispatch/graph/computecheck.go index d6e606db76..21e4ec4073 100644 --- a/internal/dispatch/graph/computecheck.go +++ b/internal/dispatch/graph/computecheck.go @@ -121,7 +121,7 @@ func runExpression( ds := datastoremw.MustFromContext(ctx) reader := ds.SnapshotReader(revision) cr := reader.(datastore.CaveatReader) - caveat, err := cr.ReadCaveatByName(expr.GetCaveat().CaveatName) + caveat, err := cr.ReadCaveatByName(ctx, expr.GetCaveat().CaveatName) if err != nil { return nil, err } diff --git a/internal/services/v1/relationships.go b/internal/services/v1/relationships.go index cc428cc188..f8bd197c42 100644 --- a/internal/services/v1/relationships.go +++ b/internal/services/v1/relationships.go @@ -183,7 +183,7 @@ func (ps *permissionServer) WriteRelationships(ctx context.Context, req *v1.Writ // eventually this type assertion should be removed if update.Relationship.OptionalCaveat != nil && update.Relationship.OptionalCaveat.CaveatName != "" { if caveatReader, ok := rwt.(datastore.CaveatReader); ok { - _, err := caveatReader.ReadCaveatByName(update.Relationship.OptionalCaveat.CaveatName) + _, err := caveatReader.ReadCaveatByName(ctx, update.Relationship.OptionalCaveat.CaveatName) if errors.As(err, &datastore.ErrCaveatNameNotFound{}) { return rewriteError(ctx, NewCaveatNotFoundError(update)) } diff --git a/internal/testfixtures/validating.go b/internal/testfixtures/validating.go index 63cca6e7ca..3de68cd8e0 100644 --- a/internal/testfixtures/validating.go +++ b/internal/testfixtures/validating.go @@ -173,9 +173,9 @@ func (vrwt validatingReadWriteTransaction) DeleteRelationships(filter *v1.Relati return vrwt.delegate.DeleteRelationships(filter) } -func (vrwt validatingReadWriteTransaction) ReadCaveatByName(name string) (*core.Caveat, error) { +func (vrwt validatingReadWriteTransaction) ReadCaveatByName(ctx context.Context, name string) (*core.Caveat, error) { if ds, ok := vrwt.delegate.(datastore.CaveatReader); ok { - return ds.ReadCaveatByName(name) + return ds.ReadCaveatByName(ctx, name) } return nil, fmt.Errorf("transaction delegate does not implement datastore.CaveatReader") } diff --git a/internal/testserver/datastore/crdb.go b/internal/testserver/datastore/crdb.go index a3e2ef9f02..45a072dbdf 100644 --- a/internal/testserver/datastore/crdb.go +++ b/internal/testserver/datastore/crdb.go @@ -62,12 +62,14 @@ func RunCRDBForTesting(t testing.TB, bridgeNetworkName string) RunningEngineForT uri := fmt.Sprintf("postgres://%s@localhost:%s/defaultdb?sslmode=disable", builder.creds, port) require.NoError(t, pool.Retry(func() error { var err error - builder.conn, err = pgx.Connect(context.Background(), uri) + ctx, cancelConnect := context.WithTimeout(context.Background(), dockerBootTimeout) + defer cancelConnect() + builder.conn, err = pgx.Connect(ctx, uri) if err != nil { return err } - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() + ctx, cancelRangeFeeds := context.WithTimeout(context.Background(), dockerBootTimeout) + defer cancelRangeFeeds() _, err = builder.conn.Exec(ctx, enableRangefeeds) return err })) diff --git a/internal/testserver/datastore/datastore.go b/internal/testserver/datastore/datastore.go index b2d69ccde8..5e43e74197 100644 --- a/internal/testserver/datastore/datastore.go +++ b/internal/testserver/datastore/datastore.go @@ -5,6 +5,7 @@ package datastore import ( "testing" + "time" "github.com/stretchr/testify/require" @@ -12,6 +13,8 @@ import ( "github.com/authzed/spicedb/pkg/migrate" ) +const dockerBootTimeout = 10 * time.Second + // InitFunc initializes a datastore instance from a uri that has been // generated from a TestDatastoreBuilder type InitFunc func(engine, uri string) datastore.Datastore diff --git a/internal/testserver/datastore/mysql.go b/internal/testserver/datastore/mysql.go index 4eef751d50..6e192b4b48 100644 --- a/internal/testserver/datastore/mysql.go +++ b/internal/testserver/datastore/mysql.go @@ -87,7 +87,9 @@ func RunMySQLForTestingWithOptions(t testing.TB, options MySQLTesterOptions, bri if err != nil { return err } - err = builder.db.Ping() + ctx, cancelPing := context.WithTimeout(context.Background(), dockerBootTimeout) + defer cancelPing() + err = builder.db.PingContext(ctx) if err != nil { return err } diff --git a/internal/testserver/datastore/postgres.go b/internal/testserver/datastore/postgres.go index 2969653374..944acfbd2b 100644 --- a/internal/testserver/datastore/postgres.go +++ b/internal/testserver/datastore/postgres.go @@ -64,7 +64,9 @@ func RunPostgresForTesting(t testing.TB, bridgeNetworkName string, targetMigrati uri := fmt.Sprintf("postgres://%s@localhost:%s/defaultdb?sslmode=disable", builder.creds, port) require.NoError(t, pool.Retry(func() error { var err error - builder.conn, err = pgx.Connect(context.Background(), uri) + ctx, cancelConnect := context.WithTimeout(context.Background(), dockerBootTimeout) + defer cancelConnect() + builder.conn, err = pgx.Connect(ctx, uri) if err != nil { return err } diff --git a/internal/testserver/datastore/spanner.go b/internal/testserver/datastore/spanner.go index 9f6a23dc7c..ff687cce52 100644 --- a/internal/testserver/datastore/spanner.go +++ b/internal/testserver/datastore/spanner.go @@ -52,7 +52,8 @@ func RunSpannerForTesting(t testing.TB, bridgeNetworkName string) RunningEngineF require.NoError(t, os.Setenv("SPANNER_EMULATOR_HOST", spannerEmulatorAddr)) require.NoError(t, pool.Retry(func() error { - ctx := context.Background() + ctx, cancel := context.WithTimeout(context.Background(), dockerBootTimeout) + defer cancel() instancesClient, err := instances.NewInstanceAdminClient(ctx) if err != nil { @@ -60,6 +61,8 @@ func RunSpannerForTesting(t testing.TB, bridgeNetworkName string) RunningEngineF } defer func() { require.NoError(t, instancesClient.Close()) }() + ctx, cancel = context.WithTimeout(context.Background(), dockerBootTimeout) + defer cancel() _, err = instancesClient.CreateInstance(ctx, &instance.CreateInstanceRequest{ Parent: "projects/fake-project-id", InstanceId: "init", diff --git a/pkg/cmd/migrate.go b/pkg/cmd/migrate.go index c9601fbd09..bde4db7c73 100644 --- a/pkg/cmd/migrate.go +++ b/pkg/cmd/migrate.go @@ -52,18 +52,18 @@ func migrateRun(cmd *cobra.Command, args []string) error { var err error migrationDriver, err := crdbmigrations.NewCRDBDriver(dbURL) if err != nil { - log.Fatal().Err(err).Msg("unable to create migration driver") + return fmt.Errorf("unable to create migration driver for %s: %w", datastoreEngine, err) } - runMigration(cmd.Context(), migrationDriver, crdbmigrations.CRDBMigrations, args[0], timeout, migrationBatachSize) + return runMigration(cmd.Context(), migrationDriver, crdbmigrations.CRDBMigrations, args[0], timeout, migrationBatachSize) } else if datastoreEngine == "postgres" { log.Info().Msg("migrating postgres datastore") var err error migrationDriver, err := migrations.NewAlembicPostgresDriver(dbURL) if err != nil { - log.Fatal().Err(err).Msg("unable to create migration driver") + return fmt.Errorf("unable to create migration driver for %s: %w", datastoreEngine, err) } - runMigration(cmd.Context(), migrationDriver, migrations.DatabaseMigrations, args[0], timeout, migrationBatachSize) + return runMigration(cmd.Context(), migrationDriver, migrations.DatabaseMigrations, args[0], timeout, migrationBatachSize) } else if datastoreEngine == "spanner" { log.Info().Msg("migrating spanner datastore") @@ -75,9 +75,9 @@ func migrateRun(cmd *cobra.Command, args []string) error { } migrationDriver, err := spannermigrations.NewSpannerDriver(dbURL, credFile, emulatorHost) if err != nil { - log.Fatal().Err(err).Msg("unable to create migration driver") + return fmt.Errorf("unable to create migration driver for %s: %w", datastoreEngine, err) } - runMigration(cmd.Context(), migrationDriver, spannermigrations.SpannerMigrations, args[0], timeout, migrationBatachSize) + return runMigration(cmd.Context(), migrationDriver, spannermigrations.SpannerMigrations, args[0], timeout, migrationBatachSize) } else if datastoreEngine == "mysql" { log.Info().Msg("migrating mysql datastore") @@ -89,14 +89,12 @@ func migrateRun(cmd *cobra.Command, args []string) error { migrationDriver, err := mysqlmigrations.NewMySQLDriverFromDSN(dbURL, tablePrefix) if err != nil { - log.Fatal().Err(err).Msg("unable to create migration driver") + return fmt.Errorf("unable to create migration driver for %s: %w", datastoreEngine, err) } - runMigration(cmd.Context(), migrationDriver, mysqlmigrations.Manager, args[0], timeout, migrationBatachSize) - } else { - return fmt.Errorf("cannot migrate datastore engine type: %s", datastoreEngine) + return runMigration(cmd.Context(), migrationDriver, mysqlmigrations.Manager, args[0], timeout, migrationBatachSize) } - return nil + return fmt.Errorf("cannot migrate datastore engine type: %s", datastoreEngine) } func runMigration[D migrate.Driver[C, T], C any, T any]( @@ -106,18 +104,19 @@ func runMigration[D migrate.Driver[C, T], C any, T any]( targetRevision string, timeout time.Duration, backfillBatchSize uint64, -) { +) error { log.Info().Str("targetRevision", targetRevision).Msg("running migrations") ctxWithBatch := context.WithValue(ctx, migrate.BackfillBatchSize, backfillBatchSize) ctx, cancel := context.WithTimeout(ctxWithBatch, timeout) defer cancel() if err := manager.Run(ctx, driver, targetRevision, migrate.LiveRun); err != nil { - log.Fatal().Err(err).Msg("unable to complete requested migrations") + return fmt.Errorf("unable to migrate to `%s` revision: %w", targetRevision, err) } if err := driver.Close(ctx); err != nil { - log.Fatal().Err(err).Msg("unable to close migration driver") + return fmt.Errorf("unable to close migration driver: %w", err) } + return nil } func RegisterHeadFlags(cmd *cobra.Command) { @@ -132,7 +131,7 @@ func NewHeadCommand(programName string) *cobra.Command { RunE: func(cmd *cobra.Command, args []string) error { headRevision, err := HeadRevision(cobrautil.MustGetStringExpanded(cmd, "datastore-engine")) if err != nil { - log.Fatal().Err(err).Msg("unable to compute head revision") + return fmt.Errorf("unable to compute head revision: %w", err) } fmt.Println(headRevision) return nil diff --git a/pkg/datastore/caveat.go b/pkg/datastore/caveat.go index 31351b1f6f..37c2b8282c 100644 --- a/pkg/datastore/caveat.go +++ b/pkg/datastore/caveat.go @@ -1,13 +1,15 @@ package datastore import ( + "context" + core "github.com/authzed/spicedb/pkg/proto/core/v1" ) // CaveatReader offers read operations for caveats type CaveatReader interface { // ReadCaveatByName returns a caveat with the provided name - ReadCaveatByName(name string) (*core.Caveat, error) + ReadCaveatByName(ctx context.Context, name string) (*core.Caveat, error) } // CaveatStorer offers both read and write operations for Caveats diff --git a/internal/datastore/memdb/caveat_test.go b/pkg/datastore/test/caveat.go similarity index 60% rename from internal/datastore/memdb/caveat_test.go rename to pkg/datastore/test/caveat.go index e1c5cdbeb9..bedf1422e1 100644 --- a/internal/datastore/memdb/caveat_test.go +++ b/pkg/datastore/test/caveat.go @@ -1,13 +1,11 @@ -package memdb +package test import ( "context" + "fmt" "testing" "time" - "github.com/google/go-cmp/cmp" - "google.golang.org/protobuf/testing/protocmp" - "github.com/authzed/spicedb/internal/datastore/common" "github.com/authzed/spicedb/internal/testfixtures" "github.com/authzed/spicedb/pkg/caveats" @@ -15,24 +13,27 @@ import ( core "github.com/authzed/spicedb/pkg/proto/core/v1" "github.com/authzed/spicedb/pkg/tuple" + "github.com/google/go-cmp/cmp" "github.com/google/uuid" "github.com/stretchr/testify/require" + "google.golang.org/protobuf/testing/protocmp" "google.golang.org/protobuf/types/known/structpb" ) -func TestWriteReadCaveat(t *testing.T) { +func WriteReadDeleteCaveatTest(t *testing.T, tester DatastoreTester) { req := require.New(t) - - ds, err := NewMemdbDatastore(0, 1*time.Hour, 1*time.Hour) + ds, err := tester.New(0*time.Second, veryLargeGCWindow, 1) req.NoError(err) - // Dupes in same transaction are treated as upserts - coreCaveat := createCoreCaveat(t) + skipIfNotCaveatStorer(t, ds) + + // Dupes in same transaction are fail to be written ctx := context.Background() + coreCaveat := createCoreCaveat(t) _, err = writeCaveats(ctx, ds, coreCaveat, coreCaveat) - req.NoError(err) + req.Error(err) - // Succeeds upserting an existing caveat + // Succeeds writing a caveat rev, err := writeCaveat(ctx, ds, coreCaveat) req.NoError(err) @@ -40,26 +41,39 @@ func TestWriteReadCaveat(t *testing.T) { cr, ok := ds.SnapshotReader(rev).(datastore.CaveatReader) req.True(ok, "expected a CaveatStorer value") - cv, err := cr.ReadCaveatByName(coreCaveat.Name) + cv, err := cr.ReadCaveatByName(ctx, coreCaveat.Name) req.NoError(err) foundDiff := cmp.Diff(coreCaveat, cv, protocmp.Transform()) req.Empty(foundDiff) + // Delete Caveat + rev, err = ds.ReadWriteTx(ctx, func(ctx context.Context, tx datastore.ReadWriteTransaction) error { + cs := tx.(datastore.CaveatStorer) + return cs.DeleteCaveats([]*core.Caveat{coreCaveat}) + }) + req.NoError(err) + cr = ds.SnapshotReader(rev).(datastore.CaveatReader) + _, err = cr.ReadCaveatByName(ctx, coreCaveat.Name) + req.ErrorAs(err, &datastore.ErrCaveatNameNotFound{}) + // Returns an error if caveat name or ID does not exist - _, err = cr.ReadCaveatByName("doesnotexist") + _, err = cr.ReadCaveatByName(ctx, "doesnotexist") req.ErrorAs(err, &datastore.ErrCaveatNameNotFound{}) } -func TestWriteCaveatedTuple(t *testing.T) { +func WriteCaveatedRelationshipTest(t *testing.T, tester DatastoreTester) { req := require.New(t) - ctx := context.Background() + ds, err := tester.New(0*time.Second, veryLargeGCWindow, 1) + req.NoError(err) + + skipIfNotCaveatStorer(t, ds) - ds, err := NewMemdbDatastore(0, 1*time.Hour, 1*time.Hour) req.NoError(err) sds, _ := testfixtures.StandardDatastoreWithSchema(ds, req) // Store caveat, write caveated tuple and read back same value coreCaveat := createCoreCaveat(t) + ctx := context.Background() _, err = writeCaveat(ctx, ds, coreCaveat) req.NoError(err) @@ -82,12 +96,13 @@ func TestWriteCaveatedTuple(t *testing.T) { req.NoError(err) } -func TestCaveatSnapshotReads(t *testing.T) { +func CaveatSnapshotReadsTest(t *testing.T, tester DatastoreTester) { req := require.New(t) - - ds, err := NewMemdbDatastore(0, 1*time.Hour, 1*time.Hour) + ds, err := tester.New(0*time.Second, veryLargeGCWindow, 1) req.NoError(err) + skipIfNotCaveatStorer(t, ds) + // Write an initial caveat coreCaveat := createCoreCaveat(t) ctx := context.Background() @@ -103,19 +118,69 @@ func TestCaveatSnapshotReads(t *testing.T) { // check most recent revision cr, ok := ds.SnapshotReader(newRev).(datastore.CaveatReader) - req.True(ok, "expected a CaveatStorer value") - cv, err := cr.ReadCaveatByName(coreCaveat.Name) + req.True(ok, "expected a CaveatReader value") + cv, err := cr.ReadCaveatByName(ctx, coreCaveat.Name) req.NoError(err) req.Equal(newExpression, cv.Expression) // check previous revision cr, ok = ds.SnapshotReader(oldRev).(datastore.CaveatReader) - req.True(ok, "expected a CaveatStorer value") - cv, err = cr.ReadCaveatByName(coreCaveat.Name) + req.True(ok, "expected a CaveatReader value") + cv, err = cr.ReadCaveatByName(ctx, coreCaveat.Name) req.NoError(err) req.Equal(oldExpression, cv.Expression) } +func CaveatedRelationshipWatchTest(t *testing.T, tester DatastoreTester) { + req := require.New(t) + ds, err := tester.New(0*time.Second, veryLargeGCWindow, 16) + req.NoError(err) + + skipIfNotCaveatStorer(t, ds) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + // Write caveat and caveated relationship + coreCaveat := createCoreCaveat(t) + _, err = writeCaveat(ctx, ds, coreCaveat) + req.NoError(err) + + // TODO bug: Watch API won't send updates i revision used is the first revision + lowestRevision, err := ds.HeadRevision(ctx) + req.NoError(err) + chanRevisionChanges, chanErr := ds.Watch(ctx, lowestRevision) + req.Zero(len(chanErr)) + + tpl := createTestCaveatedTuple(t, "document:companyplan#parent@folder:company#...", coreCaveat.Name) + _, err = common.WriteTuples(ctx, ds, core.RelationTupleUpdate_CREATE, tpl) + req.NoError(err) + + // Caveated Relationship should come through Watch API + + changeWait := time.NewTimer(5 * time.Second) + select { + case change, ok := <-chanRevisionChanges: + req.True(ok) + req.Len(change.Changes, 1) + for _, update := range change.Changes { + foundDiff := cmp.Diff(tpl, update.Tuple, protocmp.Transform()) + req.Empty(foundDiff) + } + case <-changeWait.C: + req.Fail("timed out waiting for caveated relationship via Watch API") + } +} + +func skipIfNotCaveatStorer(t *testing.T, ds datastore.Datastore) { + ctx := context.Background() + ds.ReadWriteTx(ctx, func(ctx context.Context, transaction datastore.ReadWriteTransaction) error { //nolint: errcheck + if cs, ok := transaction.(datastore.CaveatStorer); !ok { + t.Skip("datastore does not implement CaveatStorer interface", cs) + } + return fmt.Errorf("force rollback of unnecesary tx") + }) +} + func createTestCaveatedTuple(t *testing.T, tplString string, caveatName string) *core.RelationTuple { tpl := tuple.MustParse(tplString) st, err := structpb.NewStruct(map[string]interface{}{"a": 1, "b": "test"}) diff --git a/pkg/datastore/test/datastore.go b/pkg/datastore/test/datastore.go index b86f0489bd..7ecec0cfbd 100644 --- a/pkg/datastore/test/datastore.go +++ b/pkg/datastore/test/datastore.go @@ -56,6 +56,11 @@ func All(t *testing.T, tester DatastoreTester) { t.Run("TestWatchCancel", func(t *testing.T) { WatchCancelTest(t, tester) }) t.Run("TestStats", func(t *testing.T) { StatsTest(t, tester) }) + + t.Run("TestWriteReadDeleteCaveat", func(t *testing.T) { WriteReadDeleteCaveatTest(t, tester) }) + t.Run("TestWriteCaveatedRelationship", func(t *testing.T) { WriteCaveatedRelationshipTest(t, tester) }) + t.Run("TestCaveatSnapshotReads", func(t *testing.T) { CaveatSnapshotReadsTest(t, tester) }) + t.Run("TestCaveatedRelationshipWatch", func(t *testing.T) { CaveatedRelationshipWatchTest(t, tester) }) } var testResourceNS = namespace.Namespace( diff --git a/pkg/migrate/migrate.go b/pkg/migrate/migrate.go index ed5b3ff0de..7d32c9cd80 100644 --- a/pkg/migrate/migrate.go +++ b/pkg/migrate/migrate.go @@ -99,6 +99,7 @@ func (m *Manager[D, C, T]) Register(version, replaces string, up MigrationFunc[C // Run will actually perform the necessary migrations to bring the backing datastore // from its current revision to the specified revision. func (m *Manager[D, C, T]) Run(ctx context.Context, driver D, throughRevision string, dryRun RunType) error { + requestedRevision := throughRevision starting, err := driver.Version(ctx) if err != nil { return fmt.Errorf("unable to compute target revision: %w", err) @@ -115,6 +116,9 @@ func (m *Manager[D, C, T]) Run(ctx context.Context, driver D, throughRevision st if err != nil { return fmt.Errorf("unable to compute migration list: %w", err) } + if len(toRun) == 0 { + log.Info().Str("targetRevision", requestedRevision).Msg("server already at requested revision") + } if !dryRun { for _, migrationToRun := range toRun { @@ -148,7 +152,7 @@ func (m *Manager[D, C, T]) Run(ctx context.Context, driver D, throughRevision st return nil }); err != nil { - return fmt.Errorf("error executing migration transaction function: %w", err) + return fmt.Errorf("error executing migration `%s`: %w", migrationToRun.version, err) } currentVersion, err = driver.Version(ctx)