adds caveat support to postgres
- refactors the memdb caveat test and turns it into
  a datastore test; datastores not implementing
  the interface are skipped
- adds a migration to add a caveat table for PSQL
- makes the psql TX implement CaveatStorer (sketched
  below), following the same MVCC strategy
- adds context to ReadCaveatByName to follow the same
  signature as ReadNamespace
- adjusts memdb to avoid upserts on duplicate caveats
  and to align with PSQL
- implements the missing DeleteCaveat test, and implements
  that logic for memdb and postgres
vroldanbet committed Oct 10, 2022
1 parent eee236e commit c21dfad
Showing 16 changed files with 343 additions and 64 deletions.
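
The memdb and Postgres changes below implement the same optional caveat-storage surface. The following is a minimal sketch of the shape implied by the diffs; the interface names and their composition are assumptions for illustration, and the authoritative definition lives in pkg/datastore:

package datastore

import (
	"context"

	core "github.com/authzed/spicedb/pkg/proto/core/v1"
)

// CaveatReader is the read side: resolve a caveat definition by name at the
// reader's revision. (Interface name assumed here for illustration.)
type CaveatReader interface {
	ReadCaveatByName(ctx context.Context, name string) (*core.Caveat, error)
}

// CaveatStorer is the read/write side implemented by read-write transactions;
// datastores that do not implement it are skipped by the shared datastore test.
// (Composition assumed here for illustration.)
type CaveatStorer interface {
	CaveatReader
	WriteCaveats([]*core.Caveat) error
	DeleteCaveats([]*core.Caveat) error
}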
3 changes: 3 additions & 0 deletions internal/datastore/common/sql.go
@@ -18,6 +18,9 @@ import (
)

var (
// ObjCaveatNameKey is a tracing attribute representing a caveat name
ObjCaveatNameKey = attribute.Key("authzed.com/spicedb/sql/objCaveatName")

// ObjNamespaceNameKey is a tracing attribute representing the resource
// object type.
ObjNamespaceNameKey = attribute.Key("authzed.com/spicedb/sql/objNamespaceName")
4 changes: 2 additions & 2 deletions internal/datastore/crdb/crdb.go
@@ -213,7 +213,7 @@ func (cds *crdbDatastore) SnapshotReader(rev datastore.Revision) datastore.Reader
}

querySplitter := common.TupleQuerySplitter{
Executor: pgxcommon.NewPGXExecutor(createTxFunc),
Executor: pgxcommon.NewPGXExecutor(createTxFunc, false),
UsersetBatchSize: cds.usersetBatchSize,
}

@@ -234,7 +234,7 @@ func (cds *crdbDatastore) ReadWriteTx(
}

querySplitter := common.TupleQuerySplitter{
Executor: pgxcommon.NewPGXExecutor(longLivedTx),
Executor: pgxcommon.NewPGXExecutor(longLivedTx, false),
UsersetBatchSize: cds.usersetBatchSize,
}

19 changes: 17 additions & 2 deletions internal/datastore/memdb/caveat.go
@@ -1,6 +1,7 @@
package memdb

import (
"context"
"fmt"

"github.com/authzed/spicedb/pkg/datastore"
@@ -23,7 +24,7 @@ func (c *caveat) Unwrap() *core.Caveat {
}
}

func (r *memdbReader) ReadCaveatByName(name string) (*core.Caveat, error) {
func (r *memdbReader) ReadCaveatByName(_ context.Context, name string) (*core.Caveat, error) {
if !r.enableCaveats {
return nil, fmt.Errorf("caveats are not enabled")
}
@@ -68,14 +69,19 @@ func (rwt *memdbReadWriteTx) WriteCaveats(caveats []*core.Caveat) error {
}

func (rwt *memdbReadWriteTx) writeCaveat(tx *memdb.Txn, caveats []*core.Caveat) error {
caveatNames := make(map[string]struct{}, len(caveats))
for _, coreCaveat := range caveats {
if _, ok := caveatNames[coreCaveat.Name]; ok {
return fmt.Errorf("duplicate caveat %s", coreCaveat.Name)
}
c := caveat{
name: coreCaveat.Name,
expression: coreCaveat.Expression,
}
if err := tx.Insert(tableCaveats, &c); err != nil {
return err
}
caveatNames[coreCaveat.Name] = struct{}{}
}
return nil
}
@@ -87,5 +93,14 @@ func (rwt *memdbReadWriteTx) DeleteCaveats(caveats []*core.Caveat) error {
if err != nil {
return err
}
return tx.Delete(tableCaveats, caveats)
for _, coreCaveat := range caveats {
c := caveat{
name: coreCaveat.Name,
expression: coreCaveat.Expression,
}
if err := tx.Delete(tableCaveats, c); err != nil {
return err
}
}
return nil
}
94 changes: 94 additions & 0 deletions internal/datastore/postgres/caveat.go
@@ -0,0 +1,94 @@
package postgres

import (
"context"
"errors"
"fmt"

"github.com/authzed/spicedb/internal/datastore/common"
"github.com/authzed/spicedb/pkg/datastore"
core "github.com/authzed/spicedb/pkg/proto/core/v1"

sq "github.com/Masterminds/squirrel"
"github.com/jackc/pgx/v4"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
)

var (
writeCaveat = psql.Insert(tableCaveat).Columns(colCaveatName, colCaveatExpression)
readCaveat = psql.Select(colCaveatExpression).From(tableCaveat)
deleteCaveat = psql.Update(tableCaveat).Where(sq.Eq{colDeletedXid: liveDeletedTxnID})
)

func (r *pgReader) ReadCaveatByName(ctx context.Context, name string) (*core.Caveat, error) {
ctx, span := tracer.Start(ctx, "ReadCaveatByName", trace.WithAttributes(attribute.String("name", name)))
defer span.End()

filteredReadCaveat := r.filterer(readCaveat)
sql, args, err := filteredReadCaveat.Where(sq.Eq{colCaveatName: name}).ToSql()
if err != nil {
return nil, err
}

tx, txCleanup, err := r.txSource(ctx)
if err != nil {
return nil, fmt.Errorf("unable to read caveat: %w", err)
}
defer txCleanup(ctx)

var expr []byte
err = tx.QueryRow(ctx, sql, args...).Scan(&expr)
if err != nil {
if errors.Is(err, pgx.ErrNoRows) {
return nil, datastore.NewCaveatNameNotFoundErr(name)
}
return nil, err
}
return &core.Caveat{
Name: name,
Expression: expr,
}, nil
}

func (rwt *pgReadWriteTXN) WriteCaveats(caveats []*core.Caveat) error {
ctx, span := tracer.Start(datastore.SeparateContextWithTracing(rwt.ctx), "WriteCaveats")
defer span.End()

deletedCaveatClause := sq.Or{}
write := writeCaveat
writtenCaveatNames := make([]string, 0, len(caveats))
for _, caveat := range caveats {
deletedCaveatClause = append(deletedCaveatClause, sq.Eq{colCaveatName: caveat.Name})
write = write.Values(caveat.Name, caveat.Expression)
writtenCaveatNames = append(writtenCaveatNames, caveat.Name)
}
span.SetAttributes(common.ObjCaveatNameKey.StringSlice(writtenCaveatNames))

// mark current caveats as deleted
delSQL, delArgs, err := deleteCaveat.
Set(colDeletedXid, rwt.newXID).
Where(sq.And{sq.Eq{colDeletedXid: liveDeletedTxnID}, deletedCaveatClause}).
ToSql()
if err != nil {
return fmt.Errorf("unable to mark previous caveat revisions as deleted: %w", err)
}
if _, err := rwt.tx.Exec(ctx, delSQL, delArgs...); err != nil {
return fmt.Errorf("unable to mark previous caveat revisions as deleted: %w", err)
}

// store the new caveat revision
sql, args, err := write.ToSql()
if err != nil {
return fmt.Errorf("unable to write new caveat revision: %w", err)
}
if _, err := rwt.tx.Exec(ctx, sql, args...); err != nil {
return fmt.Errorf("unable to write new caveat revision: %w", err)
}
return nil
}

func (rwt *pgReadWriteTXN) DeleteCaveats(caveats []*core.Caveat) error {
// TODO implement me
return errors.New("implement me")
}
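
WriteCaveats above follows the same MVCC pattern used for relationships and namespaces: it first stamps the currently-live rows of the written caveat names with the transaction's xid, then inserts fresh rows. Below is a standalone sketch of the soft-delete clause it assembles; the table and column names mirror the migration, while the caveat names and the xid value are made up for illustration:

package main

import (
	"fmt"

	sq "github.com/Masterminds/squirrel"
)

func main() {
	// OR together the written caveat names, then AND with the "still live"
	// check so only the current revision of each caveat is marked deleted.
	names := []string{"caveat_a", "caveat_b"}
	byName := sq.Or{}
	for _, n := range names {
		byName = append(byName, sq.Eq{"name": n})
	}

	psql := sq.StatementBuilder.PlaceholderFormat(sq.Dollar)
	delSQL, args, err := psql.Update("caveat").
		Set("deleted_xid", "12345"). // the writing transaction's xid (made up here)
		Where(sq.And{sq.Eq{"deleted_xid": "9223372036854775807"}, byName}).
		ToSql()
	if err != nil {
		panic(err)
	}
	fmt.Println(delSQL)
	fmt.Println(args)
}

The sentinel 9223372036854775807 is the maximum xid8 value; it matches liveDeletedTxnID and the deleted_xid DEFAULT in the migration further down, meaning "not yet deleted".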
54 changes: 41 additions & 13 deletions internal/datastore/postgres/common/pgx.go
@@ -4,14 +4,14 @@ import (
"context"
"fmt"

"github.com/authzed/spicedb/internal/logging"

"github.com/jackc/pgx/v4"
"github.com/jackc/pgx/v4/log/zerologadapter"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
"google.golang.org/protobuf/types/known/structpb"

"github.com/authzed/spicedb/internal/datastore/common"
"github.com/authzed/spicedb/internal/logging"
"github.com/authzed/spicedb/pkg/datastore"
corev1 "github.com/authzed/spicedb/pkg/proto/core/v1"
)
@@ -21,7 +21,7 @@ const (
)

// NewPGXExecutor creates an executor that uses the pgx library to make the specified queries.
func NewPGXExecutor(txSource TxFactory) common.ExecuteQueryFunc {
func NewPGXExecutor(txSource TxFactory, caveatEnabled bool) common.ExecuteQueryFunc {
return func(ctx context.Context, sql string, args []any) ([]*corev1.RelationTuple, error) {
ctx = datastore.SeparateContextWithTracing(ctx)

@@ -32,12 +32,12 @@ func NewPGXExecutor(txSource TxFactory) common.ExecuteQueryFunc {
return nil, fmt.Errorf(errUnableToQueryTuples, err)
}
defer txCleanup(ctx)
return queryTuples(ctx, sql, args, span, tx)
return queryTuples(ctx, sql, args, span, tx, caveatEnabled)
}
}

// queryTuples queries tuples for the given query and transaction.
func queryTuples(ctx context.Context, sql string, args []any, span trace.Span, tx pgx.Tx) ([]*corev1.RelationTuple, error) {
func queryTuples(ctx context.Context, sql string, args []any, span trace.Span, tx pgx.Tx, caveatEnabled bool) ([]*corev1.RelationTuple, error) {
span.AddEvent("DB transaction established")
rows, err := tx.Query(ctx, sql, args...)
if err != nil {
@@ -53,18 +53,46 @@ func queryTuples(ctx context.Context, sql string, args []any, span trace.Span, tx pgx.Tx) ([]*corev1.RelationTuple, error) {
ResourceAndRelation: &corev1.ObjectAndRelation{},
Subject: &corev1.ObjectAndRelation{},
}
err := rows.Scan(
&nextTuple.ResourceAndRelation.Namespace,
&nextTuple.ResourceAndRelation.ObjectId,
&nextTuple.ResourceAndRelation.Relation,
&nextTuple.Subject.Namespace,
&nextTuple.Subject.ObjectId,
&nextTuple.Subject.Relation,
)
var caveatName string
var caveatCtx map[string]any
var err error
// TODO(vroldanbet) deprecate bool flag once CRDB also supports caveat
if caveatEnabled {
err = rows.Scan(
&nextTuple.ResourceAndRelation.Namespace,
&nextTuple.ResourceAndRelation.ObjectId,
&nextTuple.ResourceAndRelation.Relation,
&nextTuple.Subject.Namespace,
&nextTuple.Subject.ObjectId,
&nextTuple.Subject.Relation,
&caveatName,
&caveatCtx,
)
} else {
err = rows.Scan(
&nextTuple.ResourceAndRelation.Namespace,
&nextTuple.ResourceAndRelation.ObjectId,
&nextTuple.ResourceAndRelation.Relation,
&nextTuple.Subject.Namespace,
&nextTuple.Subject.ObjectId,
&nextTuple.Subject.Relation,
)
}
if err != nil {
return nil, fmt.Errorf(errUnableToQueryTuples, err)
}

if caveatEnabled && caveatName != "" {
caveatStruct, err := structpb.NewStruct(caveatCtx)
if err != nil {
return nil, fmt.Errorf("unable to fetch caveat context: %w", err)
}
nextTuple.Caveat = &corev1.ContextualizedCaveat{
CaveatName: caveatName,
Context: caveatStruct,
}
}

tuples = append(tuples, nextTuple)
}
if err := rows.Err(); err != nil {
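
When the executor is built with the caveat flag enabled (as the Postgres datastore does further down), the two extra columns are scanned and, for caveated rows, folded into a ContextualizedCaveat on the tuple. A small standalone sketch of that conversion follows; the caveat name and context values are made up for illustration:

package main

import (
	"fmt"

	"google.golang.org/protobuf/types/known/structpb"

	corev1 "github.com/authzed/spicedb/pkg/proto/core/v1"
)

func main() {
	// The JSONB caveat_context column arrives from pgx as a map[string]any;
	// structpb.NewStruct turns it into the protobuf Struct carried on the tuple.
	caveatName := "only_on_tuesday"
	caveatCtx := map[string]any{"day": "tuesday"}

	caveatStruct, err := structpb.NewStruct(caveatCtx)
	if err != nil {
		panic(err)
	}
	caveat := &corev1.ContextualizedCaveat{
		CaveatName: caveatName,
		Context:    caveatStruct,
	}
	fmt.Println(caveat.CaveatName, caveat.Context.AsMap())
}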
36 changes: 36 additions & 0 deletions internal/datastore/postgres/migrations/zz_migration.0013_caveat.go
@@ -0,0 +1,36 @@
package migrations

import (
"context"

"github.com/jackc/pgx/v4"
)

var caveatStatements = []string{
`CREATE TABLE caveat (
name VARCHAR NOT NULL,
expression BYTEA NOT NULL,
created_xid xid8 NOT NULL DEFAULT (pg_current_xact_id()),
deleted_xid xid8 NOT NULL DEFAULT ('9223372036854775807'),
CONSTRAINT uq_caveat_living PRIMARY KEY (name, deleted_xid),
CONSTRAINT uq_caveat UNIQUE (name, created_xid, deleted_xid));`,
`ALTER TABLE relation_tuple
ADD COLUMN caveat_name VARCHAR,
ADD COLUMN caveat_context JSONB;`,
}

func init() {
if err := DatabaseMigrations.Register("add-caveats", "drop-bigserial-ids",
noNonatomicMigration,
func(ctx context.Context, tx pgx.Tx) error {
for _, stmt := range caveatStatements {
if _, err := tx.Exec(ctx, stmt); err != nil {
return err
}
}

return nil
}); err != nil {
panic("failed to register migration: " + err.Error())
}
}
35 changes: 20 additions & 15 deletions internal/datastore/postgres/postgres.go
@@ -35,19 +35,24 @@ const (
tableNamespace = "namespace_config"
tableTransaction = "relation_tuple_transaction"
tableTuple = "relation_tuple"

colXID = "xid"
colTimestamp = "timestamp"
colNamespace = "namespace"
colConfig = "serialized_config"
colCreatedXid = "created_xid"
colDeletedXid = "deleted_xid"
colSnapshot = "snapshot"
colObjectID = "object_id"
colRelation = "relation"
colUsersetNamespace = "userset_namespace"
colUsersetObjectID = "userset_object_id"
colUsersetRelation = "userset_relation"
tableCaveat = "caveat"

colXID = "xid"
colTimestamp = "timestamp"
colNamespace = "namespace"
colConfig = "serialized_config"
colCreatedXid = "created_xid"
colDeletedXid = "deleted_xid"
colSnapshot = "snapshot"
colObjectID = "object_id"
colRelation = "relation"
colUsersetNamespace = "userset_namespace"
colUsersetObjectID = "userset_object_id"
colUsersetRelation = "userset_relation"
colCaveatName = "name"
colCaveatExpression = "expression"
colCaveatContextName = "caveat_name"
colCaveatContext = "caveat_context"

errUnableToInstantiate = "unable to instantiate datastore: %w"

@@ -270,7 +275,7 @@ func (pgd *pgDatastore) SnapshotReader(rev datastore.Revision) datastore.Reader
}

querySplitter := common.TupleQuerySplitter{
Executor: pgxcommon.NewPGXExecutor(createTxFunc),
Executor: pgxcommon.NewPGXExecutor(createTxFunc, true),
UsersetBatchSize: pgd.usersetBatchSize,
}

@@ -304,7 +309,7 @@ func (pgd *pgDatastore) ReadWriteTx(
}

querySplitter := common.TupleQuerySplitter{
Executor: pgxcommon.NewPGXExecutor(longLivedTx),
Executor: pgxcommon.NewPGXExecutor(longLivedTx, true),
UsersetBatchSize: pgd.usersetBatchSize,
}

2 changes: 2 additions & 0 deletions internal/datastore/postgres/reader.go
@@ -34,6 +34,8 @@ var (
colUsersetNamespace,
colUsersetObjectID,
colUsersetRelation,
colCaveatContextName,
colCaveatContext,
).From(tableTuple)

schema = common.SchemaInformation{