Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add caveat support to postgres datastore #890

Merged
merged 3 commits into from
Oct 14, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions internal/datastore/common/sql.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@ import (
)

var (
// CaveatNameKey is a tracing attribute representing a caveat name
CaveatNameKey = attribute.Key("authzed.com/spicedb/sql/caveatName")

// ObjNamespaceNameKey is a tracing attribute representing the resource
// object type.
ObjNamespaceNameKey = attribute.Key("authzed.com/spicedb/sql/objNamespaceName")
Expand Down
4 changes: 2 additions & 2 deletions internal/datastore/crdb/crdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,7 @@ func (cds *crdbDatastore) SnapshotReader(rev datastore.Revision) datastore.Reade
}

querySplitter := common.TupleQuerySplitter{
Executor: pgxcommon.NewPGXExecutor(createTxFunc),
Executor: pgxcommon.NewPGXExecutor(createTxFunc, false),
UsersetBatchSize: cds.usersetBatchSize,
}

Expand All @@ -234,7 +234,7 @@ func (cds *crdbDatastore) ReadWriteTx(
}

querySplitter := common.TupleQuerySplitter{
Executor: pgxcommon.NewPGXExecutor(longLivedTx),
Executor: pgxcommon.NewPGXExecutor(longLivedTx, false),
UsersetBatchSize: cds.usersetBatchSize,
}

Expand Down
19 changes: 17 additions & 2 deletions internal/datastore/memdb/caveat.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
package memdb

import (
"context"
"fmt"

"github.com/authzed/spicedb/internal/util"
"github.com/authzed/spicedb/pkg/datastore"
core "github.com/authzed/spicedb/pkg/proto/core/v1"

Expand All @@ -23,7 +25,7 @@ func (c *caveat) Unwrap() *core.Caveat {
}
}

func (r *memdbReader) ReadCaveatByName(name string) (*core.Caveat, error) {
func (r *memdbReader) ReadCaveatByName(_ context.Context, name string) (*core.Caveat, error) {
if !r.enableCaveats {
return nil, fmt.Errorf("caveats are not enabled")
}
Expand Down Expand Up @@ -68,7 +70,11 @@ func (rwt *memdbReadWriteTx) WriteCaveats(caveats []*core.Caveat) error {
}

func (rwt *memdbReadWriteTx) writeCaveat(tx *memdb.Txn, caveats []*core.Caveat) error {
caveatNames := util.NewSet[string]()
for _, coreCaveat := range caveats {
if !caveatNames.Add(coreCaveat.Name) {
return fmt.Errorf("duplicate caveat %s", coreCaveat.Name)
}
c := caveat{
name: coreCaveat.Name,
expression: coreCaveat.Expression,
Expand All @@ -87,5 +93,14 @@ func (rwt *memdbReadWriteTx) DeleteCaveats(caveats []*core.Caveat) error {
if err != nil {
return err
}
return tx.Delete(tableCaveats, caveats)
for _, coreCaveat := range caveats {
c := caveat{
name: coreCaveat.Name,
expression: coreCaveat.Expression,
}
if err := tx.Delete(tableCaveats, c); err != nil {
return err
}
}
return nil
}
142 changes: 142 additions & 0 deletions internal/datastore/postgres/caveat.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,142 @@
package postgres

import (
"context"
"errors"
"fmt"

"github.com/authzed/spicedb/internal/datastore/common"
"github.com/authzed/spicedb/pkg/datastore"
core "github.com/authzed/spicedb/pkg/proto/core/v1"

sq "github.com/Masterminds/squirrel"
"github.com/jackc/pgx/v4"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
)

var (
writeCaveat = psql.Insert(tableCaveat).Columns(colCaveatName, colCaveatExpression)
writeCaveatDeprecated = psql.Insert(tableCaveat).Columns(colCaveatName, colCaveatExpression, colCreatedTxnDeprecated)
readCaveat = psql.Select(colCaveatExpression).From(tableCaveat)
deleteCaveat = psql.Update(tableCaveat).Where(sq.Eq{colDeletedXid: liveDeletedTxnID})
deleteCaveatDeprecated = psql.Update(tableCaveat).Where(sq.Eq{colDeletedTxnDeprecated: liveDeletedTxnID})
)

func (r *pgReader) ReadCaveatByName(ctx context.Context, name string) (*core.Caveat, error) {
ctx, span := tracer.Start(ctx, "ReadCaveatByName", trace.WithAttributes(attribute.String("name", name)))
defer span.End()

filteredReadCaveat := r.filterer(readCaveat)
sql, args, err := filteredReadCaveat.Where(sq.Eq{colCaveatName: name}).ToSql()
if err != nil {
return nil, err
}

tx, txCleanup, err := r.txSource(ctx)
if err != nil {
return nil, fmt.Errorf("unable to read caveat: %w", err)
}
defer txCleanup(ctx)

var expr []byte
err = tx.QueryRow(ctx, sql, args...).Scan(&expr)
if err != nil {
if errors.Is(err, pgx.ErrNoRows) {
return nil, datastore.NewCaveatNameNotFoundErr(name)
}
return nil, err
}
return &core.Caveat{
Name: name,
Expression: expr,
}, nil
}

func (rwt *pgReadWriteTXN) WriteCaveats(caveats []*core.Caveat) error {
ctx, span := tracer.Start(datastore.SeparateContextWithTracing(rwt.ctx), "WriteCaveats")
defer span.End()

deletedCaveatClause := sq.Or{}
write := writeCaveat
// TODO remove once the ID->XID migrations are all complete
if rwt.migrationPhase == writeBothReadNew || rwt.migrationPhase == writeBothReadOld {
write = writeCaveatDeprecated
}
writtenCaveatNames := make([]string, 0, len(caveats))
for _, caveat := range caveats {
deletedCaveatClause = append(deletedCaveatClause, sq.Eq{colCaveatName: caveat.Name})
valuesToWrite := []any{caveat.Name, caveat.Expression}
// TODO remove once the ID->XID migrations are all complete
if rwt.migrationPhase == writeBothReadNew || rwt.migrationPhase == writeBothReadOld {
valuesToWrite = append(valuesToWrite, rwt.newXID.Uint)
}
write = write.Values(valuesToWrite...)
writtenCaveatNames = append(writtenCaveatNames, caveat.Name)
}
span.SetAttributes(common.CaveatNameKey.StringSlice(writtenCaveatNames))

// mark current caveats as deleted
err := rwt.deleteCaveatsWithClause(ctx, deletedCaveatClause)
if err != nil {
return err
}

// store the new caveat revision
sql, args, err := write.ToSql()
if err != nil {
return fmt.Errorf("unable to write new caveat revision: %w", err)
}
if _, err := rwt.tx.Exec(ctx, sql, args...); err != nil {
return fmt.Errorf("unable to write new caveat revision: %w", err)
}
return nil
}

func (rwt *pgReadWriteTXN) DeleteCaveats(caveats []*core.Caveat) error {
ctx, span := tracer.Start(datastore.SeparateContextWithTracing(rwt.ctx), "DeleteCaveats")
defer span.End()

deletedCaveatClause := sq.Or{}
deletedCaveatNames := make([]string, 0, len(caveats))
for _, caveat := range caveats {
deletedCaveatClause = append(deletedCaveatClause, sq.Eq{colCaveatName: caveat.Name})
deletedCaveatNames = append(deletedCaveatNames, caveat.Name)
}
span.SetAttributes(common.CaveatNameKey.StringSlice(deletedCaveatNames))

// mark current caveats as deleted
return rwt.deleteCaveatsWithClause(ctx, deletedCaveatClause)
}

func (rwt *pgReadWriteTXN) deleteCaveatsWithClause(ctx context.Context, deleteClauses sq.Or) error {
sql, args, err := deleteCaveat.
Set(colDeletedXid, rwt.newXID).
Where(sq.And{sq.Eq{colDeletedXid: liveDeletedTxnID}, deleteClauses}).
ToSql()
if err != nil {
return fmt.Errorf("unable to mark previous caveat revisions as deleted: %w", err)
}

// TODO remove once the ID->XID migrations are all complete
if rwt.migrationPhase == writeBothReadNew || rwt.migrationPhase == writeBothReadOld {
baseQuery := deleteCaveat
if rwt.migrationPhase == writeBothReadOld {
baseQuery = deleteCaveatDeprecated
}

sql, args, err = baseQuery.
Where(deleteClauses).
Set(colDeletedTxnDeprecated, rwt.newXID.Uint).
Set(colDeletedXid, rwt.newXID).
ToSql()
if err != nil {
return fmt.Errorf("unable to mark previous caveat revisions as deleted: %w", err)
}
}

if _, err := rwt.tx.Exec(ctx, sql, args...); err != nil {
return fmt.Errorf("unable to mark previous caveat revisions as deleted: %w", err)
}
return nil
}
57 changes: 43 additions & 14 deletions internal/datastore/postgres/common/pgx.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,17 @@ package common

import (
"context"
"database/sql"
"fmt"

"github.com/authzed/spicedb/internal/logging"

"github.com/jackc/pgx/v4"
"github.com/jackc/pgx/v4/log/zerologadapter"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
"google.golang.org/protobuf/types/known/structpb"

"github.com/authzed/spicedb/internal/datastore/common"
"github.com/authzed/spicedb/internal/logging"
"github.com/authzed/spicedb/pkg/datastore"
corev1 "github.com/authzed/spicedb/pkg/proto/core/v1"
)
Expand All @@ -21,7 +22,7 @@ const (
)

// NewPGXExecutor creates an executor that uses the pgx library to make the specified queries.
func NewPGXExecutor(txSource TxFactory) common.ExecuteQueryFunc {
func NewPGXExecutor(txSource TxFactory, caveatEnabled bool) common.ExecuteQueryFunc {
vroldanbet marked this conversation as resolved.
Show resolved Hide resolved
return func(ctx context.Context, sql string, args []any) ([]*corev1.RelationTuple, error) {
ctx = datastore.SeparateContextWithTracing(ctx)

Expand All @@ -32,14 +33,14 @@ func NewPGXExecutor(txSource TxFactory) common.ExecuteQueryFunc {
return nil, fmt.Errorf(errUnableToQueryTuples, err)
}
defer txCleanup(ctx)
return queryTuples(ctx, sql, args, span, tx)
return queryTuples(ctx, sql, args, span, tx, caveatEnabled)
}
}

// queryTuples queries tuples for the given query and transaction.
func queryTuples(ctx context.Context, sql string, args []any, span trace.Span, tx pgx.Tx) ([]*corev1.RelationTuple, error) {
func queryTuples(ctx context.Context, sqlStatement string, args []any, span trace.Span, tx pgx.Tx, caveatEnabled bool) ([]*corev1.RelationTuple, error) {
span.AddEvent("DB transaction established")
rows, err := tx.Query(ctx, sql, args...)
rows, err := tx.Query(ctx, sqlStatement, args...)
if err != nil {
return nil, fmt.Errorf(errUnableToQueryTuples, err)
}
Expand All @@ -53,18 +54,46 @@ func queryTuples(ctx context.Context, sql string, args []any, span trace.Span, t
ResourceAndRelation: &corev1.ObjectAndRelation{},
Subject: &corev1.ObjectAndRelation{},
}
err := rows.Scan(
&nextTuple.ResourceAndRelation.Namespace,
&nextTuple.ResourceAndRelation.ObjectId,
&nextTuple.ResourceAndRelation.Relation,
&nextTuple.Subject.Namespace,
&nextTuple.Subject.ObjectId,
&nextTuple.Subject.Relation,
)
var caveatName sql.NullString
var caveatCtx map[string]any
var err error
// TODO(vroldanbet) deprecate bool flag once CRDB also supports caveat
if caveatEnabled {
vroldanbet marked this conversation as resolved.
Show resolved Hide resolved
err = rows.Scan(
&nextTuple.ResourceAndRelation.Namespace,
&nextTuple.ResourceAndRelation.ObjectId,
&nextTuple.ResourceAndRelation.Relation,
&nextTuple.Subject.Namespace,
&nextTuple.Subject.ObjectId,
&nextTuple.Subject.Relation,
&caveatName,
&caveatCtx,
)
} else {
err = rows.Scan(
&nextTuple.ResourceAndRelation.Namespace,
&nextTuple.ResourceAndRelation.ObjectId,
&nextTuple.ResourceAndRelation.Relation,
&nextTuple.Subject.Namespace,
&nextTuple.Subject.ObjectId,
&nextTuple.Subject.Relation,
)
}
if err != nil {
return nil, fmt.Errorf(errUnableToQueryTuples, err)
}

if caveatEnabled && caveatName.String != "" {
caveatStruct, err := structpb.NewStruct(caveatCtx)
if err != nil {
return nil, fmt.Errorf("unable to fetch caveat context: %w", err)
}
nextTuple.Caveat = &corev1.ContextualizedCaveat{
CaveatName: caveatName.String,
Context: caveatStruct,
}
}

tuples = append(tuples, nextTuple)
}
if err := rows.Err(); err != nil {
Expand Down
6 changes: 2 additions & 4 deletions internal/datastore/postgres/migrations/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,6 @@ import (
"github.com/authzed/spicedb/pkg/migrate"
)

const errUnableToInstantiate = "unable to instantiate AlembicPostgresDriver: %w"

const postgresMissingTableErrorCode = "42P01"

// AlembicPostgresDriver implements a schema migration facility for use in
Expand All @@ -28,12 +26,12 @@ type AlembicPostgresDriver struct {
func NewAlembicPostgresDriver(url string) (*AlembicPostgresDriver, error) {
connectStr, err := pq.ParseURL(url)
if err != nil {
return nil, fmt.Errorf(errUnableToInstantiate, err)
vroldanbet marked this conversation as resolved.
Show resolved Hide resolved
return nil, err
}

db, err := pgx.Connect(context.Background(), connectStr)
if err != nil {
return nil, fmt.Errorf(errUnableToInstantiate, err)
return nil, err
}

return &AlembicPostgresDriver{db}, nil
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
package migrations

import (
"context"

"github.com/jackc/pgx/v4"
)

var caveatStatements = []string{
`CREATE TABLE caveat (
name VARCHAR NOT NULL,
expression BYTEA NOT NULL,
created_transaction BIGINT NOT NULL,
deleted_transaction BIGINT NOT NULL DEFAULT '9223372036854775807',
CONSTRAINT pk_caveat_v1 PRIMARY KEY (name, deleted_transaction),
CONSTRAINT uq_caveat_v1 UNIQUE (name, created_transaction, deleted_transaction));`,
`ALTER TABLE relation_tuple
ADD COLUMN caveat_name VARCHAR,
ADD COLUMN caveat_context JSONB;`,
}

func init() {
if err := DatabaseMigrations.Register("add-caveats", "add-ns-config-id",
noNonatomicMigration,
func(ctx context.Context, tx pgx.Tx) error {
for _, stmt := range caveatStatements {
if _, err := tx.Exec(ctx, stmt); err != nil {
return err
}
}

return nil
}); err != nil {
panic("failed to register migration: " + err.Error())
}
}
Loading