Skip to content

Commit

Permalink
Merge pull request #940 from jzelinskie/delete-ns-list
Browse files Browse the repository at this point in the history
datastore: DeleteNamespace => DeleteNamespaces
  • Loading branch information
jzelinskie authored Oct 31, 2022
2 parents c56b0f9 + afc2b3d commit f529619
Show file tree
Hide file tree
Showing 15 changed files with 154 additions and 90 deletions.
31 changes: 19 additions & 12 deletions internal/datastore/crdb/readwrite.go
Original file line number Diff line number Diff line change
Expand Up @@ -250,18 +250,27 @@ func (rwt *crdbReadWriteTXN) WriteNamespaces(ctx context.Context, newConfigs ...
return nil
}

func (rwt *crdbReadWriteTXN) DeleteNamespace(ctx context.Context, nsName string) error {
_, timestamp, err := loadNamespace(ctx, rwt.tx, nsName)
if err != nil {
if errors.As(err, &datastore.ErrNamespaceNotFound{}) {
return err
func (rwt *crdbReadWriteTXN) DeleteNamespaces(ctx context.Context, nsNames ...string) error {
// For each namespace, check they exist and collect predicates for the
// "WHERE" clause to delete the namespaces and associated tuples.
nsClauses := make([]sq.Sqlizer, 0, len(nsNames))
tplClauses := make([]sq.Sqlizer, 0, len(nsNames))
for _, nsName := range nsNames {
_, timestamp, err := loadNamespace(ctx, rwt.tx, nsName)
if err != nil {
if errors.As(err, &datastore.ErrNamespaceNotFound{}) {
return err
}
return fmt.Errorf(errUnableToDeleteConfig, err)
}

for _, nsName := range nsNames {
nsClauses = append(nsClauses, sq.Eq{colNamespace: nsName, colTimestamp: timestamp})
tplClauses = append(tplClauses, sq.Eq{colNamespace: nsName})
}
return fmt.Errorf(errUnableToDeleteConfig, err)
}

delSQL, delArgs, err := queryDeleteNamespace.
Where(sq.Eq{colNamespace: nsName, colTimestamp: timestamp}).
ToSql()
delSQL, delArgs, err := queryDeleteNamespace.Where(sq.Or(nsClauses)).ToSql()
if err != nil {
return fmt.Errorf(errUnableToDeleteConfig, err)
}
Expand All @@ -271,9 +280,7 @@ func (rwt *crdbReadWriteTXN) DeleteNamespace(ctx context.Context, nsName string)
return fmt.Errorf(errUnableToDeleteConfig, err)
}

deleteTupleSQL, deleteTupleArgs, err := queryDeleteTuples.
Where(sq.Eq{colNamespace: nsName}).
ToSql()
deleteTupleSQL, deleteTupleArgs, err := queryDeleteTuples.Where(sq.Or(tplClauses)).ToSql()
if err != nil {
return fmt.Errorf(errUnableToDeleteConfig, err)
}
Expand Down
34 changes: 18 additions & 16 deletions internal/datastore/memdb/readwrite.go
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,7 @@ func (rwt *memdbReadWriteTx) WriteNamespaces(ctx context.Context, newConfigs ...
return nil
}

func (rwt *memdbReadWriteTx) DeleteNamespace(ctx context.Context, nsName string) error {
func (rwt *memdbReadWriteTx) DeleteNamespaces(ctx context.Context, nsNames ...string) error {
rwt.lockOrPanic()
defer rwt.Unlock()

Expand All @@ -173,24 +173,26 @@ func (rwt *memdbReadWriteTx) DeleteNamespace(ctx context.Context, nsName string)
return err
}

foundRaw, err := tx.First(tableNamespace, indexID, nsName)
if err != nil {
return err
}
for _, nsName := range nsNames {
foundRaw, err := tx.First(tableNamespace, indexID, nsName)
if err != nil {
return err
}

if foundRaw == nil {
return fmt.Errorf("unable to find namespace to delete")
}
if foundRaw == nil {
return fmt.Errorf("unable to find namespace to delete")
}

if err := tx.Delete(tableNamespace, foundRaw); err != nil {
return err
}
if err := tx.Delete(tableNamespace, foundRaw); err != nil {
return err
}

// Delete the relationships from the namespace
if err := rwt.deleteWithLock(tx, &v1.RelationshipFilter{
ResourceType: nsName,
}); err != nil {
return fmt.Errorf("unable to delete relationships from deleted namespace: %w", err)
// Delete the relationships from the namespace
if err := rwt.deleteWithLock(tx, &v1.RelationshipFilter{
ResourceType: nsName,
}); err != nil {
return fmt.Errorf("unable to delete relationships from deleted namespace: %w", err)
}
}

return nil
Expand Down
35 changes: 23 additions & 12 deletions internal/datastore/mysql/readwrite.go
Original file line number Diff line number Diff line change
Expand Up @@ -234,23 +234,34 @@ func (rwt *mysqlReadWriteTXN) WriteNamespaces(ctx context.Context, newNamespaces
return nil
}

func (rwt *mysqlReadWriteTXN) DeleteNamespace(ctx context.Context, nsName string) error {
func (rwt *mysqlReadWriteTXN) DeleteNamespaces(ctx context.Context, nsNames ...string) error {
// TODO (@vroldanbet) dupe from postgres datastore - need to refactor

baseQuery := rwt.ReadNamespaceQuery.Where(sq.Eq{colDeletedTxn: liveDeletedTxnID})
_, createdAt, err := loadNamespace(ctx, nsName, rwt.tx, baseQuery)
switch {
case errors.As(err, &datastore.ErrNamespaceNotFound{}):
return err
case err == nil:
break
default:
return fmt.Errorf(errUnableToDeleteConfig, err)
// For each namespace, check they exist and collect predicates for the
// "WHERE" clause to delete the namespaces and associated tuples.
nsClauses := make([]sq.Sqlizer, 0, len(nsNames))
tplClauses := make([]sq.Sqlizer, 0, len(nsNames))
for _, nsName := range nsNames {
// TODO(jzelinskie): check these in one query
baseQuery := rwt.ReadNamespaceQuery.Where(sq.Eq{colDeletedTxn: liveDeletedTxnID})
_, createdAt, err := loadNamespace(ctx, nsName, rwt.tx, baseQuery)
switch {
case errors.As(err, &datastore.ErrNamespaceNotFound{}):
// TODO(jzelinskie): return the name of the missing namespace
return err
case err == nil:
break
default:
return fmt.Errorf(errUnableToDeleteConfig, err)
}

nsClauses = append(nsClauses, sq.Eq{colNamespace: nsName, colCreatedTxn: createdAt})
tplClauses = append(tplClauses, sq.Eq{colNamespace: nsName})
}

delSQL, delArgs, err := rwt.DeleteNamespaceQuery.
Set(colDeletedTxn, rwt.newTxnID).
Where(sq.Eq{colNamespace: nsName, colCreatedTxn: createdAt}).
Where(sq.Or(nsClauses)).
ToSql()
if err != nil {
return fmt.Errorf(errUnableToDeleteConfig, err)
Expand All @@ -263,7 +274,7 @@ func (rwt *mysqlReadWriteTXN) DeleteNamespace(ctx context.Context, nsName string

deleteTupleSQL, deleteTupleArgs, err := rwt.DeleteNamespaceTuplesQuery.
Set(colDeletedTxn, rwt.newTxnID).
Where(sq.Eq{colNamespace: nsName}).
Where(sq.Or(tplClauses)).
ToSql()
if err != nil {
return fmt.Errorf(errUnableToDeleteConfig, err)
Expand Down
6 changes: 3 additions & 3 deletions internal/datastore/postgres/postgres_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -887,7 +887,7 @@ func XIDMigrationAssumptionsTest(t *testing.T, b testdatastore.RunningEngineForT
ResourceType: "one_namespace",
}))

require.NoError(rwt.DeleteNamespace(ctx, "one_namespace"))
require.NoError(rwt.DeleteNamespaces(ctx, "one_namespace"))

require.NoError(rwt.WriteNamespaces(ctx, namespace.Namespace(
"three_namespace", namespace.Relation("parent", nil, nil))))
Expand Down Expand Up @@ -955,7 +955,7 @@ func XIDMigrationAssumptionsTest(t *testing.T, b testdatastore.RunningEngineForT
ResourceType: "two_namespace",
}))

require.NoError(rwt.DeleteNamespace(ctx, "two_namespace"))
require.NoError(rwt.DeleteNamespaces(ctx, "two_namespace"))

require.NoError(rwt.WriteNamespaces(ctx, namespace.Namespace(
"four_namespace", namespace.Relation("parent", nil, nil))))
Expand Down Expand Up @@ -1022,7 +1022,7 @@ func XIDMigrationAssumptionsTest(t *testing.T, b testdatastore.RunningEngineForT
ResourceType: "three_namespace",
}))

require.NoError(rwt.DeleteNamespace(ctx, "three_namespace"))
require.NoError(rwt.DeleteNamespaces(ctx, "three_namespace"))

require.NoError(rwt.WriteNamespaces(ctx, namespace.Namespace(
"five_namespace", namespace.Relation("parent", nil, nil))))
Expand Down
47 changes: 29 additions & 18 deletions internal/datastore/postgres/readwrite.go
Original file line number Diff line number Diff line change
Expand Up @@ -292,7 +292,7 @@ func (rwt *pgReadWriteTXN) WriteNamespaces(ctx context.Context, newConfigs ...*c
return nil
}

func (rwt *pgReadWriteTXN) DeleteNamespace(ctx context.Context, nsName string) error {
func (rwt *pgReadWriteTXN) DeleteNamespaces(ctx context.Context, nsNames ...string) error {
filterer := func(original sq.SelectBuilder) sq.SelectBuilder {
return original.Where(sq.Eq{colDeletedXid: liveDeletedTxnID})
}
Expand All @@ -304,35 +304,46 @@ func (rwt *pgReadWriteTXN) DeleteNamespace(ctx context.Context, nsName string) e
}
}

_, createdAt, err := rwt.loadNamespace(ctx, nsName, rwt.tx, filterer)
switch {
case errors.As(err, &datastore.ErrNamespaceNotFound{}):
return err
case err == nil:
break
default:
return fmt.Errorf(errUnableToDeleteConfig, err)
nsClauses := make([]sq.Sqlizer, 0, len(nsNames))
tplClauses := make([]sq.Sqlizer, 0, len(nsNames))
migrationClauses := make([]sq.Sqlizer, 0, len(nsNames))
for _, nsName := range nsNames {
_, createdAt, err := rwt.loadNamespace(ctx, nsName, rwt.tx, filterer)
switch {
case errors.As(err, &datastore.ErrNamespaceNotFound{}):
return err
case err == nil:
break
default:
return fmt.Errorf(errUnableToDeleteConfig, err)
}

nsClauses = append(nsClauses, sq.Eq{colNamespace: nsName, colCreatedXid: createdAt.tx})
tplClauses = append(tplClauses, sq.Eq{colNamespace: nsName})

// TODO remove once the ID->XID migrations are all complete
switch rwt.migrationPhase {
case writeBothReadOld:
migrationClauses = append(migrationClauses, sq.Eq{colNamespace: nsName, colCreatedTxnDeprecated: createdAt.tx.Uint})
case writeBothReadNew:
migrationClauses = nsClauses
}
}

delSQL, delArgs, err := deleteNamespace.
Set(colDeletedXid, rwt.newXID).
Where(sq.Eq{colNamespace: nsName, colCreatedXid: createdAt.tx}).
Where(sq.Or(nsClauses)).
ToSql()
if err != nil {
return fmt.Errorf(errUnableToDeleteConfig, err)
}

// TODO remove once the ID->XID migrations are all complete
if rwt.migrationPhase == writeBothReadNew || rwt.migrationPhase == writeBothReadOld {
whereClause := sq.Eq{colNamespace: nsName, colCreatedXid: createdAt.tx}
if rwt.migrationPhase == writeBothReadOld {
whereClause = sq.Eq{colNamespace: nsName, colCreatedTxnDeprecated: createdAt.tx.Uint}
}

delSQL, delArgs, err = deleteNamespace.
Set(colDeletedTxnDeprecated, rwt.newXID.Uint).
Set(colDeletedXid, rwt.newXID).
Where(whereClause).
Where(sq.Or(migrationClauses)).
ToSql()
if err != nil {
return fmt.Errorf(errUnableToDeleteConfig, err)
Expand All @@ -346,7 +357,7 @@ func (rwt *pgReadWriteTXN) DeleteNamespace(ctx context.Context, nsName string) e

deleteTupleSQL, deleteTupleArgs, err := deleteNamespaceTuples.
Set(colDeletedXid, rwt.newXID).
Where(sq.Eq{colNamespace: nsName}).
Where(sq.Or(tplClauses)).
ToSql()
if err != nil {
return fmt.Errorf(errUnableToDeleteConfig, err)
Expand All @@ -357,7 +368,7 @@ func (rwt *pgReadWriteTXN) DeleteNamespace(ctx context.Context, nsName string) e
deleteTupleSQL, deleteTupleArgs, err = deleteNamespaceTuples.
Set(colDeletedTxnDeprecated, rwt.newXID.Uint).
Set(colDeletedXid, rwt.newXID).
Where(sq.Eq{colNamespace: nsName}).
Where(sq.Or(tplClauses)).
ToSql()
if err != nil {
return fmt.Errorf(errUnableToDeleteConfig, err)
Expand Down
6 changes: 3 additions & 3 deletions internal/datastore/proxy/observable.go
Original file line number Diff line number Diff line change
Expand Up @@ -251,14 +251,14 @@ func (rwt *observableRWT) WriteNamespaces(ctx context.Context, newConfigs ...*co
return rwt.delegate.WriteNamespaces(ctx, newConfigs...)
}

func (rwt *observableRWT) DeleteNamespace(ctx context.Context, nsName string) error {
func (rwt *observableRWT) DeleteNamespaces(ctx context.Context, nsNames ...string) error {
var span trace.Span
ctx, span = tracer.Start(ctx, "DeleteNamespace", trace.WithAttributes(
attribute.String("name", nsName),
attribute.StringSlice("names", nsNames),
))
defer span.End()

return rwt.delegate.DeleteNamespace(ctx, nsName)
return rwt.delegate.DeleteNamespaces(ctx, nsNames...)
}

func (rwt *observableRWT) DeleteRelationships(ctx context.Context, filter *v1.RelationshipFilter) error {
Expand Down
9 changes: 7 additions & 2 deletions internal/datastore/proxy/proxy_test/mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -240,8 +240,13 @@ func (dm *MockReadWriteTransaction) WriteNamespaces(ctx context.Context, newConf
return args.Error(0)
}

func (dm *MockReadWriteTransaction) DeleteNamespace(ctx context.Context, nsName string) error {
args := dm.Called(nsName)
func (dm *MockReadWriteTransaction) DeleteNamespaces(ctx context.Context, nsNames ...string) error {
xs := make([]any, 0, len(nsNames))
for _, nsName := range nsNames {
xs = append(xs, nsName)
}

args := dm.Called(xs...)
return args.Error(0)
}

Expand Down
2 changes: 1 addition & 1 deletion internal/datastore/proxy/readonly_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ func TestRWOperationErrors(t *testing.T) {
ctx := context.Background()

rev, err := ds.ReadWriteTx(ctx, func(rwt datastore.ReadWriteTransaction) error {
return rwt.DeleteNamespace(ctx, "fake")
return rwt.DeleteNamespaces(ctx, "fake")
})
require.ErrorAs(err, &datastore.ErrReadOnly{})
require.Equal(datastore.NoRevision, rev)
Expand Down
26 changes: 14 additions & 12 deletions internal/datastore/spanner/readwrite.go
Original file line number Diff line number Diff line change
Expand Up @@ -238,21 +238,23 @@ func (rwt spannerReadWriteTXN) WriteNamespaces(ctx context.Context, newConfigs .
return rwt.spannerRWT.BufferWrite(mutations)
}

func (rwt spannerReadWriteTXN) DeleteNamespace(ctx context.Context, nsName string) error {
if err := deleteWithFilter(ctx, rwt.spannerRWT, &v1.RelationshipFilter{
ResourceType: nsName,
}); err != nil {
return fmt.Errorf(errUnableToDeleteConfig, err)
}
func (rwt spannerReadWriteTXN) DeleteNamespaces(ctx context.Context, nsNames ...string) error {
for _, nsName := range nsNames {
if err := deleteWithFilter(ctx, rwt.spannerRWT, &v1.RelationshipFilter{
ResourceType: nsName,
}); err != nil {
return fmt.Errorf(errUnableToDeleteConfig, err)
}

err := rwt.spannerRWT.BufferWrite([]*spanner.Mutation{
spanner.Delete(tableNamespace, spanner.KeySetFromKeys(spanner.Key{nsName})),
})
if err != nil {
return fmt.Errorf(errUnableToDeleteConfig, err)
err := rwt.spannerRWT.BufferWrite([]*spanner.Mutation{
spanner.Delete(tableNamespace, spanner.KeySetFromKeys(spanner.Key{nsName})),
})
if err != nil {
return fmt.Errorf(errUnableToDeleteConfig, err)
}
}

return err
return nil
}

var _ datastore.ReadWriteTransaction = spannerReadWriteTXN{}
8 changes: 4 additions & 4 deletions internal/services/v1/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -214,10 +214,10 @@ func (ss *schemaServer) WriteSchema(ctx context.Context, in *v1.WriteSchemaReque
})

// Delete the removed namespaces.
if err := removedObjectDefNames.ForEach(func(value string) error {
return rwt.DeleteNamespace(ctx, value)
}); err != nil {
return err
if removedObjectDefNames.Len() > 0 {
if err := rwt.DeleteNamespaces(ctx, removedObjectDefNames.AsSlice()...); err != nil {
return err
}
}

// Delete the removed caveats.
Expand Down
2 changes: 1 addition & 1 deletion internal/services/v1alpha1/schema_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -263,7 +263,7 @@ func TestSchemaReadDeleteAndFailWrite(t *testing.T) {
// Issue a delete out of band for the namespace.
_, err = ds.ReadWriteTx(ctx, func(rwt datastore.ReadWriteTransaction) error {
for _, nsName := range requestedObjectDefNames {
derr := rwt.DeleteNamespace(ctx, nsName)
derr := rwt.DeleteNamespaces(ctx, nsName)
if derr != nil {
return derr
}
Expand Down
4 changes: 2 additions & 2 deletions internal/testfixtures/validating.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,8 +167,8 @@ func (vrwt validatingReadWriteTransaction) WriteNamespaces(ctx context.Context,
return vrwt.delegate.WriteNamespaces(ctx, newConfigs...)
}

func (vrwt validatingReadWriteTransaction) DeleteNamespace(ctx context.Context, nsName string) error {
return vrwt.delegate.DeleteNamespace(ctx, nsName)
func (vrwt validatingReadWriteTransaction) DeleteNamespaces(ctx context.Context, nsNames ...string) error {
return vrwt.delegate.DeleteNamespaces(ctx, nsNames...)
}

func (vrwt validatingReadWriteTransaction) WriteRelationships(ctx context.Context, mutations []*core.RelationTupleUpdate) error {
Expand Down
4 changes: 2 additions & 2 deletions pkg/datastore/datastore.go
Original file line number Diff line number Diff line change
Expand Up @@ -188,8 +188,8 @@ type ReadWriteTransaction interface {
// WriteNamespaces takes proto namespace definitions and persists them.
WriteNamespaces(ctx context.Context, newConfigs ...*core.NamespaceDefinition) error

// DeleteNamespace deletes a namespace and any associated tuples.
DeleteNamespace(ctx context.Context, nsName string) error
// DeleteNamespaces deletes namespaces including associated relationships.
DeleteNamespaces(ctx context.Context, nsNames ...string) error
}

// TxUserFunc is a type for the function that users supply when they invoke a read-write transaction.
Expand Down
Loading

0 comments on commit f529619

Please sign in to comment.