Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add caveat support to postgres datastore #890

Merged
merged 3 commits into from
Oct 14, 2022
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
adds caveat support to postgres
- refactors memdb caveat test and turns it into
  a datastore test. Datastores not implementing
  the interface are skipped
- adds migration to add caveat table for PSQL
- makes psql TX implements CaveatStorer, following
  same strategy for MVCC
- adds context to ReadCaveatByName to follow same
  signature as ReadNamespace
- adjusts memdb to avoid upserts on duplicate caveats,
  and align with PSQL
- implement missing DeleteCaveat test, and implement
  that logic for memdb and postgres
  • Loading branch information
vroldanbet committed Oct 14, 2022

Unverified

This user has not yet uploaded their public signing key.
commit cb7b9569ce4b10caa1320dcb9ef16ce1aae7659b
3 changes: 3 additions & 0 deletions internal/datastore/common/sql.go
Original file line number Diff line number Diff line change
@@ -18,6 +18,9 @@ import (
)

var (
// CaveatNameKey is a tracing attribute representing a caveat name
CaveatNameKey = attribute.Key("authzed.com/spicedb/sql/caveatName")

// ObjNamespaceNameKey is a tracing attribute representing the resource
// object type.
ObjNamespaceNameKey = attribute.Key("authzed.com/spicedb/sql/objNamespaceName")
4 changes: 2 additions & 2 deletions internal/datastore/crdb/crdb.go
Original file line number Diff line number Diff line change
@@ -213,7 +213,7 @@ func (cds *crdbDatastore) SnapshotReader(rev datastore.Revision) datastore.Reade
}

querySplitter := common.TupleQuerySplitter{
Executor: pgxcommon.NewPGXExecutor(createTxFunc),
Executor: pgxcommon.NewPGXExecutor(createTxFunc, false),
UsersetBatchSize: cds.usersetBatchSize,
}

@@ -234,7 +234,7 @@ func (cds *crdbDatastore) ReadWriteTx(
}

querySplitter := common.TupleQuerySplitter{
Executor: pgxcommon.NewPGXExecutor(longLivedTx),
Executor: pgxcommon.NewPGXExecutor(longLivedTx, false),
UsersetBatchSize: cds.usersetBatchSize,
}

19 changes: 17 additions & 2 deletions internal/datastore/memdb/caveat.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
package memdb

import (
"context"
"fmt"

"github.com/authzed/spicedb/internal/util"
"github.com/authzed/spicedb/pkg/datastore"
core "github.com/authzed/spicedb/pkg/proto/core/v1"

@@ -23,7 +25,7 @@ func (c *caveat) Unwrap() *core.Caveat {
}
}

func (r *memdbReader) ReadCaveatByName(name string) (*core.Caveat, error) {
func (r *memdbReader) ReadCaveatByName(_ context.Context, name string) (*core.Caveat, error) {
if !r.enableCaveats {
return nil, fmt.Errorf("caveats are not enabled")
}
@@ -68,7 +70,11 @@ func (rwt *memdbReadWriteTx) WriteCaveats(caveats []*core.Caveat) error {
}

func (rwt *memdbReadWriteTx) writeCaveat(tx *memdb.Txn, caveats []*core.Caveat) error {
caveatNames := util.NewSet[string]()
for _, coreCaveat := range caveats {
if !caveatNames.Add(coreCaveat.Name) {
return fmt.Errorf("duplicate caveat %s", coreCaveat.Name)
}
c := caveat{
name: coreCaveat.Name,
expression: coreCaveat.Expression,
@@ -87,5 +93,14 @@ func (rwt *memdbReadWriteTx) DeleteCaveats(caveats []*core.Caveat) error {
if err != nil {
return err
}
return tx.Delete(tableCaveats, caveats)
for _, coreCaveat := range caveats {
c := caveat{
name: coreCaveat.Name,
expression: coreCaveat.Expression,
}
if err := tx.Delete(tableCaveats, c); err != nil {
return err
}
}
return nil
}
142 changes: 142 additions & 0 deletions internal/datastore/postgres/caveat.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,142 @@
package postgres

import (
"context"
"errors"
"fmt"

"github.com/authzed/spicedb/internal/datastore/common"
"github.com/authzed/spicedb/pkg/datastore"
core "github.com/authzed/spicedb/pkg/proto/core/v1"

sq "github.com/Masterminds/squirrel"
"github.com/jackc/pgx/v4"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
)

var (
writeCaveat = psql.Insert(tableCaveat).Columns(colCaveatName, colCaveatExpression)
writeCaveatDeprecated = psql.Insert(tableCaveat).Columns(colCaveatName, colCaveatExpression, colCreatedTxnDeprecated)
readCaveat = psql.Select(colCaveatExpression).From(tableCaveat)
deleteCaveat = psql.Update(tableCaveat).Where(sq.Eq{colDeletedXid: liveDeletedTxnID})
deleteCaveatDeprecated = psql.Update(tableCaveat).Where(sq.Eq{colDeletedTxnDeprecated: liveDeletedTxnID})
)

func (r *pgReader) ReadCaveatByName(ctx context.Context, name string) (*core.Caveat, error) {
ctx, span := tracer.Start(ctx, "ReadCaveatByName", trace.WithAttributes(attribute.String("name", name)))
defer span.End()

filteredReadCaveat := r.filterer(readCaveat)
sql, args, err := filteredReadCaveat.Where(sq.Eq{colCaveatName: name}).ToSql()
if err != nil {
return nil, err
}

tx, txCleanup, err := r.txSource(ctx)
if err != nil {
return nil, fmt.Errorf("unable to read caveat: %w", err)
}
defer txCleanup(ctx)

var expr []byte
err = tx.QueryRow(ctx, sql, args...).Scan(&expr)
if err != nil {
if errors.Is(err, pgx.ErrNoRows) {
return nil, datastore.NewCaveatNameNotFoundErr(name)
}
return nil, err
}
return &core.Caveat{
Name: name,
Expression: expr,
}, nil
}

func (rwt *pgReadWriteTXN) WriteCaveats(caveats []*core.Caveat) error {
ctx, span := tracer.Start(datastore.SeparateContextWithTracing(rwt.ctx), "WriteCaveats")
defer span.End()

deletedCaveatClause := sq.Or{}
write := writeCaveat
// TODO remove once the ID->XID migrations are all complete
if rwt.migrationPhase == writeBothReadNew || rwt.migrationPhase == writeBothReadOld {
write = writeCaveatDeprecated
}
writtenCaveatNames := make([]string, 0, len(caveats))
for _, caveat := range caveats {
deletedCaveatClause = append(deletedCaveatClause, sq.Eq{colCaveatName: caveat.Name})
valuesToWrite := []any{caveat.Name, caveat.Expression}
// TODO remove once the ID->XID migrations are all complete
if rwt.migrationPhase == writeBothReadNew || rwt.migrationPhase == writeBothReadOld {
valuesToWrite = append(valuesToWrite, rwt.newXID.Uint)
}
write = write.Values(valuesToWrite...)
writtenCaveatNames = append(writtenCaveatNames, caveat.Name)
}
span.SetAttributes(common.CaveatNameKey.StringSlice(writtenCaveatNames))

// mark current caveats as deleted
err := rwt.deleteCaveatsWithClause(ctx, deletedCaveatClause)
if err != nil {
return err
}

// store the new caveat revision
sql, args, err := write.ToSql()
if err != nil {
return fmt.Errorf("unable to write new caveat revision: %w", err)
}
if _, err := rwt.tx.Exec(ctx, sql, args...); err != nil {
return fmt.Errorf("unable to write new caveat revision: %w", err)
}
return nil
}

func (rwt *pgReadWriteTXN) DeleteCaveats(caveats []*core.Caveat) error {
ctx, span := tracer.Start(datastore.SeparateContextWithTracing(rwt.ctx), "DeleteCaveats")
defer span.End()

deletedCaveatClause := sq.Or{}
deletedCaveatNames := make([]string, 0, len(caveats))
for _, caveat := range caveats {
deletedCaveatClause = append(deletedCaveatClause, sq.Eq{colCaveatName: caveat.Name})
deletedCaveatNames = append(deletedCaveatNames, caveat.Name)
}
span.SetAttributes(common.CaveatNameKey.StringSlice(deletedCaveatNames))

// mark current caveats as deleted
return rwt.deleteCaveatsWithClause(ctx, deletedCaveatClause)
}

func (rwt *pgReadWriteTXN) deleteCaveatsWithClause(ctx context.Context, deleteClauses sq.Or) error {
sql, args, err := deleteCaveat.
Set(colDeletedXid, rwt.newXID).
Where(sq.And{sq.Eq{colDeletedXid: liveDeletedTxnID}, deleteClauses}).
ToSql()
if err != nil {
return fmt.Errorf("unable to mark previous caveat revisions as deleted: %w", err)
}

// TODO remove once the ID->XID migrations are all complete
if rwt.migrationPhase == writeBothReadNew || rwt.migrationPhase == writeBothReadOld {
baseQuery := deleteCaveat
if rwt.migrationPhase == writeBothReadOld {
baseQuery = deleteCaveatDeprecated
}

sql, args, err = baseQuery.
Where(deleteClauses).
Set(colDeletedTxnDeprecated, rwt.newXID.Uint).
Set(colDeletedXid, rwt.newXID).
ToSql()
if err != nil {
return fmt.Errorf("unable to mark previous caveat revisions as deleted: %w", err)
}
}

if _, err := rwt.tx.Exec(ctx, sql, args...); err != nil {
return fmt.Errorf("unable to mark previous caveat revisions as deleted: %w", err)
}
return nil
}
57 changes: 43 additions & 14 deletions internal/datastore/postgres/common/pgx.go
Original file line number Diff line number Diff line change
@@ -2,16 +2,17 @@ package common

import (
"context"
"database/sql"
"fmt"

"github.com/authzed/spicedb/internal/logging"

"github.com/jackc/pgx/v4"
"github.com/jackc/pgx/v4/log/zerologadapter"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
"google.golang.org/protobuf/types/known/structpb"

"github.com/authzed/spicedb/internal/datastore/common"
"github.com/authzed/spicedb/internal/logging"
"github.com/authzed/spicedb/pkg/datastore"
corev1 "github.com/authzed/spicedb/pkg/proto/core/v1"
)
@@ -21,7 +22,7 @@ const (
)

// NewPGXExecutor creates an executor that uses the pgx library to make the specified queries.
func NewPGXExecutor(txSource TxFactory) common.ExecuteQueryFunc {
func NewPGXExecutor(txSource TxFactory, caveatEnabled bool) common.ExecuteQueryFunc {
return func(ctx context.Context, sql string, args []any) ([]*corev1.RelationTuple, error) {
ctx = datastore.SeparateContextWithTracing(ctx)

@@ -32,14 +33,14 @@ func NewPGXExecutor(txSource TxFactory) common.ExecuteQueryFunc {
return nil, fmt.Errorf(errUnableToQueryTuples, err)
}
defer txCleanup(ctx)
return queryTuples(ctx, sql, args, span, tx)
return queryTuples(ctx, sql, args, span, tx, caveatEnabled)
}
}

// queryTuples queries tuples for the given query and transaction.
func queryTuples(ctx context.Context, sql string, args []any, span trace.Span, tx pgx.Tx) ([]*corev1.RelationTuple, error) {
func queryTuples(ctx context.Context, sqlStatement string, args []any, span trace.Span, tx pgx.Tx, caveatEnabled bool) ([]*corev1.RelationTuple, error) {
span.AddEvent("DB transaction established")
rows, err := tx.Query(ctx, sql, args...)
rows, err := tx.Query(ctx, sqlStatement, args...)
if err != nil {
return nil, fmt.Errorf(errUnableToQueryTuples, err)
}
@@ -53,18 +54,46 @@ func queryTuples(ctx context.Context, sql string, args []any, span trace.Span, t
ResourceAndRelation: &corev1.ObjectAndRelation{},
Subject: &corev1.ObjectAndRelation{},
}
err := rows.Scan(
&nextTuple.ResourceAndRelation.Namespace,
&nextTuple.ResourceAndRelation.ObjectId,
&nextTuple.ResourceAndRelation.Relation,
&nextTuple.Subject.Namespace,
&nextTuple.Subject.ObjectId,
&nextTuple.Subject.Relation,
)
var caveatName sql.NullString
var caveatCtx map[string]any
var err error
// TODO(vroldanbet) deprecate bool flag once CRDB also supports caveat
if caveatEnabled {
err = rows.Scan(
&nextTuple.ResourceAndRelation.Namespace,
&nextTuple.ResourceAndRelation.ObjectId,
&nextTuple.ResourceAndRelation.Relation,
&nextTuple.Subject.Namespace,
&nextTuple.Subject.ObjectId,
&nextTuple.Subject.Relation,
&caveatName,
&caveatCtx,
)
} else {
err = rows.Scan(
&nextTuple.ResourceAndRelation.Namespace,
&nextTuple.ResourceAndRelation.ObjectId,
&nextTuple.ResourceAndRelation.Relation,
&nextTuple.Subject.Namespace,
&nextTuple.Subject.ObjectId,
&nextTuple.Subject.Relation,
)
}
if err != nil {
return nil, fmt.Errorf(errUnableToQueryTuples, err)
}

if caveatEnabled && caveatName.String != "" {
caveatStruct, err := structpb.NewStruct(caveatCtx)
if err != nil {
return nil, fmt.Errorf("unable to fetch caveat context: %w", err)
}
nextTuple.Caveat = &corev1.ContextualizedCaveat{
CaveatName: caveatName.String,
Context: caveatStruct,
}
}

tuples = append(tuples, nextTuple)
}
if err := rows.Err(); err != nil {
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
package migrations

import (
"context"

"github.com/jackc/pgx/v4"
)

var caveatStatements = []string{
`CREATE TABLE caveat (
name VARCHAR NOT NULL,
expression BYTEA NOT NULL,
created_transaction BIGINT NOT NULL,
deleted_transaction BIGINT NOT NULL DEFAULT '9223372036854775807',
CONSTRAINT pk_caveat_v1 PRIMARY KEY (name, deleted_transaction),
CONSTRAINT uq_caveat_v1 UNIQUE (name, created_transaction, deleted_transaction));`,
`ALTER TABLE relation_tuple
ADD COLUMN caveat_name VARCHAR,
ADD COLUMN caveat_context JSONB;`,
}

func init() {
if err := DatabaseMigrations.Register("add-caveats", "add-ns-config-id",
noNonatomicMigration,
func(ctx context.Context, tx pgx.Tx) error {
for _, stmt := range caveatStatements {
if _, err := tx.Exec(ctx, stmt); err != nil {
return err
}
}

return nil
}); err != nil {
panic("failed to register migration: " + err.Error())
}
}
Original file line number Diff line number Diff line change
@@ -23,6 +23,11 @@ const (
ADD COLUMN created_xid xid8,
ADD COLUMN deleted_xid xid8 NOT NULL DEFAULT ('9223372036854775807');`

addCaveatXIDColumns = `
ALTER TABLE caveat
ADD COLUMN created_xid xid8,
ADD COLUMN deleted_xid xid8 NOT NULL DEFAULT ('9223372036854775807');`

addTransactionDefault = `
ALTER TABLE relation_tuple_transaction
ALTER COLUMN snapshot SET DEFAULT (pg_current_snapshot());`
@@ -34,19 +39,25 @@ const (
addNamepsaceDefault = `
ALTER TABLE namespace_config
ALTER COLUMN created_xid SET DEFAULT (pg_current_xact_id());`

addCaveatDefault = `
ALTER TABLE caveat
ALTER COLUMN created_xid SET DEFAULT (pg_current_xact_id());`
)

func init() {
if err := DatabaseMigrations.Register("add-xid-columns", "add-ns-config-id",
if err := DatabaseMigrations.Register("add-xid-columns", "add-caveats",
noNonatomicMigration,
func(ctx context.Context, tx pgx.Tx) error {
for _, stmt := range []string{
addTransactionXIDColumns,
addTupleXIDColumns,
addNamespaceXIDColumns,
addCaveatXIDColumns,
addTransactionDefault,
addRelationTupleDefault,
addNamepsaceDefault,
addCaveatDefault,
} {
if _, err := tx.Exec(ctx, stmt); err != nil {
return err
Loading