Skip to content

Commit

Permalink
adds caveat support to postgres
Browse files Browse the repository at this point in the history
- refactors memdb caveat test and turns it into
  a datastore test. Datastores not implementing
  the interface are skipped
- adds migration to add caveat table for PSQL
- makes psql TX implements CaveatStorer, following
  same strategy for MVCC
- adds context to ReadCaveatByName to follow same
  signature as ReadNamespace
- adjusts memdb to avoid upserts on duplicate caveats,
  and align with PSQL
- implement missing DeleteCaveat test, and implement
  that logic for memdb and postgres
  • Loading branch information
vroldanbet committed Oct 14, 2022
1 parent aa3712f commit cb7b956
Show file tree
Hide file tree
Showing 21 changed files with 432 additions and 65 deletions.
3 changes: 3 additions & 0 deletions internal/datastore/common/sql.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@ import (
)

var (
// CaveatNameKey is a tracing attribute representing a caveat name
CaveatNameKey = attribute.Key("authzed.com/spicedb/sql/caveatName")

// ObjNamespaceNameKey is a tracing attribute representing the resource
// object type.
ObjNamespaceNameKey = attribute.Key("authzed.com/spicedb/sql/objNamespaceName")
Expand Down
4 changes: 2 additions & 2 deletions internal/datastore/crdb/crdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,7 @@ func (cds *crdbDatastore) SnapshotReader(rev datastore.Revision) datastore.Reade
}

querySplitter := common.TupleQuerySplitter{
Executor: pgxcommon.NewPGXExecutor(createTxFunc),
Executor: pgxcommon.NewPGXExecutor(createTxFunc, false),
UsersetBatchSize: cds.usersetBatchSize,
}

Expand All @@ -234,7 +234,7 @@ func (cds *crdbDatastore) ReadWriteTx(
}

querySplitter := common.TupleQuerySplitter{
Executor: pgxcommon.NewPGXExecutor(longLivedTx),
Executor: pgxcommon.NewPGXExecutor(longLivedTx, false),
UsersetBatchSize: cds.usersetBatchSize,
}

Expand Down
19 changes: 17 additions & 2 deletions internal/datastore/memdb/caveat.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
package memdb

import (
"context"
"fmt"

"github.com/authzed/spicedb/internal/util"
"github.com/authzed/spicedb/pkg/datastore"
core "github.com/authzed/spicedb/pkg/proto/core/v1"

Expand All @@ -23,7 +25,7 @@ func (c *caveat) Unwrap() *core.Caveat {
}
}

func (r *memdbReader) ReadCaveatByName(name string) (*core.Caveat, error) {
func (r *memdbReader) ReadCaveatByName(_ context.Context, name string) (*core.Caveat, error) {
if !r.enableCaveats {
return nil, fmt.Errorf("caveats are not enabled")
}
Expand Down Expand Up @@ -68,7 +70,11 @@ func (rwt *memdbReadWriteTx) WriteCaveats(caveats []*core.Caveat) error {
}

func (rwt *memdbReadWriteTx) writeCaveat(tx *memdb.Txn, caveats []*core.Caveat) error {
caveatNames := util.NewSet[string]()
for _, coreCaveat := range caveats {
if !caveatNames.Add(coreCaveat.Name) {
return fmt.Errorf("duplicate caveat %s", coreCaveat.Name)
}
c := caveat{
name: coreCaveat.Name,
expression: coreCaveat.Expression,
Expand All @@ -87,5 +93,14 @@ func (rwt *memdbReadWriteTx) DeleteCaveats(caveats []*core.Caveat) error {
if err != nil {
return err
}
return tx.Delete(tableCaveats, caveats)
for _, coreCaveat := range caveats {
c := caveat{
name: coreCaveat.Name,
expression: coreCaveat.Expression,
}
if err := tx.Delete(tableCaveats, c); err != nil {
return err
}
}
return nil
}
142 changes: 142 additions & 0 deletions internal/datastore/postgres/caveat.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,142 @@
package postgres

import (
"context"
"errors"
"fmt"

"github.com/authzed/spicedb/internal/datastore/common"
"github.com/authzed/spicedb/pkg/datastore"
core "github.com/authzed/spicedb/pkg/proto/core/v1"

sq "github.com/Masterminds/squirrel"
"github.com/jackc/pgx/v4"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
)

var (
writeCaveat = psql.Insert(tableCaveat).Columns(colCaveatName, colCaveatExpression)
writeCaveatDeprecated = psql.Insert(tableCaveat).Columns(colCaveatName, colCaveatExpression, colCreatedTxnDeprecated)
readCaveat = psql.Select(colCaveatExpression).From(tableCaveat)
deleteCaveat = psql.Update(tableCaveat).Where(sq.Eq{colDeletedXid: liveDeletedTxnID})
deleteCaveatDeprecated = psql.Update(tableCaveat).Where(sq.Eq{colDeletedTxnDeprecated: liveDeletedTxnID})
)

func (r *pgReader) ReadCaveatByName(ctx context.Context, name string) (*core.Caveat, error) {
ctx, span := tracer.Start(ctx, "ReadCaveatByName", trace.WithAttributes(attribute.String("name", name)))
defer span.End()

filteredReadCaveat := r.filterer(readCaveat)
sql, args, err := filteredReadCaveat.Where(sq.Eq{colCaveatName: name}).ToSql()
if err != nil {
return nil, err
}

tx, txCleanup, err := r.txSource(ctx)
if err != nil {
return nil, fmt.Errorf("unable to read caveat: %w", err)
}
defer txCleanup(ctx)

var expr []byte
err = tx.QueryRow(ctx, sql, args...).Scan(&expr)
if err != nil {
if errors.Is(err, pgx.ErrNoRows) {
return nil, datastore.NewCaveatNameNotFoundErr(name)
}
return nil, err
}
return &core.Caveat{
Name: name,
Expression: expr,
}, nil
}

func (rwt *pgReadWriteTXN) WriteCaveats(caveats []*core.Caveat) error {
ctx, span := tracer.Start(datastore.SeparateContextWithTracing(rwt.ctx), "WriteCaveats")
defer span.End()

deletedCaveatClause := sq.Or{}
write := writeCaveat
// TODO remove once the ID->XID migrations are all complete
if rwt.migrationPhase == writeBothReadNew || rwt.migrationPhase == writeBothReadOld {
write = writeCaveatDeprecated
}
writtenCaveatNames := make([]string, 0, len(caveats))
for _, caveat := range caveats {
deletedCaveatClause = append(deletedCaveatClause, sq.Eq{colCaveatName: caveat.Name})
valuesToWrite := []any{caveat.Name, caveat.Expression}
// TODO remove once the ID->XID migrations are all complete
if rwt.migrationPhase == writeBothReadNew || rwt.migrationPhase == writeBothReadOld {
valuesToWrite = append(valuesToWrite, rwt.newXID.Uint)
}
write = write.Values(valuesToWrite...)
writtenCaveatNames = append(writtenCaveatNames, caveat.Name)
}
span.SetAttributes(common.CaveatNameKey.StringSlice(writtenCaveatNames))

// mark current caveats as deleted
err := rwt.deleteCaveatsWithClause(ctx, deletedCaveatClause)
if err != nil {
return err
}

// store the new caveat revision
sql, args, err := write.ToSql()
if err != nil {
return fmt.Errorf("unable to write new caveat revision: %w", err)
}
if _, err := rwt.tx.Exec(ctx, sql, args...); err != nil {
return fmt.Errorf("unable to write new caveat revision: %w", err)
}
return nil
}

func (rwt *pgReadWriteTXN) DeleteCaveats(caveats []*core.Caveat) error {
ctx, span := tracer.Start(datastore.SeparateContextWithTracing(rwt.ctx), "DeleteCaveats")
defer span.End()

deletedCaveatClause := sq.Or{}
deletedCaveatNames := make([]string, 0, len(caveats))
for _, caveat := range caveats {
deletedCaveatClause = append(deletedCaveatClause, sq.Eq{colCaveatName: caveat.Name})
deletedCaveatNames = append(deletedCaveatNames, caveat.Name)
}
span.SetAttributes(common.CaveatNameKey.StringSlice(deletedCaveatNames))

// mark current caveats as deleted
return rwt.deleteCaveatsWithClause(ctx, deletedCaveatClause)
}

func (rwt *pgReadWriteTXN) deleteCaveatsWithClause(ctx context.Context, deleteClauses sq.Or) error {
sql, args, err := deleteCaveat.
Set(colDeletedXid, rwt.newXID).
Where(sq.And{sq.Eq{colDeletedXid: liveDeletedTxnID}, deleteClauses}).
ToSql()
if err != nil {
return fmt.Errorf("unable to mark previous caveat revisions as deleted: %w", err)
}

// TODO remove once the ID->XID migrations are all complete
if rwt.migrationPhase == writeBothReadNew || rwt.migrationPhase == writeBothReadOld {
baseQuery := deleteCaveat
if rwt.migrationPhase == writeBothReadOld {
baseQuery = deleteCaveatDeprecated
}

sql, args, err = baseQuery.
Where(deleteClauses).
Set(colDeletedTxnDeprecated, rwt.newXID.Uint).
Set(colDeletedXid, rwt.newXID).
ToSql()
if err != nil {
return fmt.Errorf("unable to mark previous caveat revisions as deleted: %w", err)
}
}

if _, err := rwt.tx.Exec(ctx, sql, args...); err != nil {
return fmt.Errorf("unable to mark previous caveat revisions as deleted: %w", err)
}
return nil
}
57 changes: 43 additions & 14 deletions internal/datastore/postgres/common/pgx.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,17 @@ package common

import (
"context"
"database/sql"
"fmt"

"github.com/authzed/spicedb/internal/logging"

"github.com/jackc/pgx/v4"
"github.com/jackc/pgx/v4/log/zerologadapter"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
"google.golang.org/protobuf/types/known/structpb"

"github.com/authzed/spicedb/internal/datastore/common"
"github.com/authzed/spicedb/internal/logging"
"github.com/authzed/spicedb/pkg/datastore"
corev1 "github.com/authzed/spicedb/pkg/proto/core/v1"
)
Expand All @@ -21,7 +22,7 @@ const (
)

// NewPGXExecutor creates an executor that uses the pgx library to make the specified queries.
func NewPGXExecutor(txSource TxFactory) common.ExecuteQueryFunc {
func NewPGXExecutor(txSource TxFactory, caveatEnabled bool) common.ExecuteQueryFunc {
return func(ctx context.Context, sql string, args []any) ([]*corev1.RelationTuple, error) {
ctx = datastore.SeparateContextWithTracing(ctx)

Expand All @@ -32,14 +33,14 @@ func NewPGXExecutor(txSource TxFactory) common.ExecuteQueryFunc {
return nil, fmt.Errorf(errUnableToQueryTuples, err)
}
defer txCleanup(ctx)
return queryTuples(ctx, sql, args, span, tx)
return queryTuples(ctx, sql, args, span, tx, caveatEnabled)
}
}

// queryTuples queries tuples for the given query and transaction.
func queryTuples(ctx context.Context, sql string, args []any, span trace.Span, tx pgx.Tx) ([]*corev1.RelationTuple, error) {
func queryTuples(ctx context.Context, sqlStatement string, args []any, span trace.Span, tx pgx.Tx, caveatEnabled bool) ([]*corev1.RelationTuple, error) {
span.AddEvent("DB transaction established")
rows, err := tx.Query(ctx, sql, args...)
rows, err := tx.Query(ctx, sqlStatement, args...)
if err != nil {
return nil, fmt.Errorf(errUnableToQueryTuples, err)
}
Expand All @@ -53,18 +54,46 @@ func queryTuples(ctx context.Context, sql string, args []any, span trace.Span, t
ResourceAndRelation: &corev1.ObjectAndRelation{},
Subject: &corev1.ObjectAndRelation{},
}
err := rows.Scan(
&nextTuple.ResourceAndRelation.Namespace,
&nextTuple.ResourceAndRelation.ObjectId,
&nextTuple.ResourceAndRelation.Relation,
&nextTuple.Subject.Namespace,
&nextTuple.Subject.ObjectId,
&nextTuple.Subject.Relation,
)
var caveatName sql.NullString
var caveatCtx map[string]any
var err error
// TODO(vroldanbet) deprecate bool flag once CRDB also supports caveat
if caveatEnabled {
err = rows.Scan(
&nextTuple.ResourceAndRelation.Namespace,
&nextTuple.ResourceAndRelation.ObjectId,
&nextTuple.ResourceAndRelation.Relation,
&nextTuple.Subject.Namespace,
&nextTuple.Subject.ObjectId,
&nextTuple.Subject.Relation,
&caveatName,
&caveatCtx,
)
} else {
err = rows.Scan(
&nextTuple.ResourceAndRelation.Namespace,
&nextTuple.ResourceAndRelation.ObjectId,
&nextTuple.ResourceAndRelation.Relation,
&nextTuple.Subject.Namespace,
&nextTuple.Subject.ObjectId,
&nextTuple.Subject.Relation,
)
}
if err != nil {
return nil, fmt.Errorf(errUnableToQueryTuples, err)
}

if caveatEnabled && caveatName.String != "" {
caveatStruct, err := structpb.NewStruct(caveatCtx)
if err != nil {
return nil, fmt.Errorf("unable to fetch caveat context: %w", err)
}
nextTuple.Caveat = &corev1.ContextualizedCaveat{
CaveatName: caveatName.String,
Context: caveatStruct,
}
}

tuples = append(tuples, nextTuple)
}
if err := rows.Err(); err != nil {
Expand Down
36 changes: 36 additions & 0 deletions internal/datastore/postgres/migrations/zz_migration.0009_caveat.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
package migrations

import (
"context"

"github.com/jackc/pgx/v4"
)

var caveatStatements = []string{
`CREATE TABLE caveat (
name VARCHAR NOT NULL,
expression BYTEA NOT NULL,
created_transaction BIGINT NOT NULL,
deleted_transaction BIGINT NOT NULL DEFAULT '9223372036854775807',
CONSTRAINT pk_caveat_v1 PRIMARY KEY (name, deleted_transaction),
CONSTRAINT uq_caveat_v1 UNIQUE (name, created_transaction, deleted_transaction));`,
`ALTER TABLE relation_tuple
ADD COLUMN caveat_name VARCHAR,
ADD COLUMN caveat_context JSONB;`,
}

func init() {
if err := DatabaseMigrations.Register("add-caveats", "add-ns-config-id",
noNonatomicMigration,
func(ctx context.Context, tx pgx.Tx) error {
for _, stmt := range caveatStatements {
if _, err := tx.Exec(ctx, stmt); err != nil {
return err
}
}

return nil
}); err != nil {
panic("failed to register migration: " + err.Error())
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,11 @@ const (
ADD COLUMN created_xid xid8,
ADD COLUMN deleted_xid xid8 NOT NULL DEFAULT ('9223372036854775807');`

addCaveatXIDColumns = `
ALTER TABLE caveat
ADD COLUMN created_xid xid8,
ADD COLUMN deleted_xid xid8 NOT NULL DEFAULT ('9223372036854775807');`

addTransactionDefault = `
ALTER TABLE relation_tuple_transaction
ALTER COLUMN snapshot SET DEFAULT (pg_current_snapshot());`
Expand All @@ -34,19 +39,25 @@ const (
addNamepsaceDefault = `
ALTER TABLE namespace_config
ALTER COLUMN created_xid SET DEFAULT (pg_current_xact_id());`

addCaveatDefault = `
ALTER TABLE caveat
ALTER COLUMN created_xid SET DEFAULT (pg_current_xact_id());`
)

func init() {
if err := DatabaseMigrations.Register("add-xid-columns", "add-ns-config-id",
if err := DatabaseMigrations.Register("add-xid-columns", "add-caveats",
noNonatomicMigration,
func(ctx context.Context, tx pgx.Tx) error {
for _, stmt := range []string{
addTransactionXIDColumns,
addTupleXIDColumns,
addNamespaceXIDColumns,
addCaveatXIDColumns,
addTransactionDefault,
addRelationTupleDefault,
addNamepsaceDefault,
addCaveatDefault,
} {
if _, err := tx.Exec(ctx, stmt); err != nil {
return err
Expand Down
Loading

0 comments on commit cb7b956

Please sign in to comment.