diff --git a/acp/identity/identity.go b/acp/identity/identity.go index 4dee93deba..845a65577b 100644 --- a/acp/identity/identity.go +++ b/acp/identity/identity.go @@ -18,6 +18,7 @@ import ( "github.com/cyware/ssi-sdk/did/key" "github.com/decred/dcrd/dcrec/secp256k1/v4" "github.com/lestrrat-go/jwx/v2/jwa" + "github.com/lestrrat-go/jwx/v2/jws" "github.com/lestrrat-go/jwx/v2/jwt" "github.com/sourcenetwork/immutable" acptypes "github.com/sourcenetwork/sourcehub/x/acp/bearer_token" @@ -127,6 +128,28 @@ func (identity *Identity) UpdateToken( audience immutable.Option[string], authorizedAccount immutable.Option[string], ) error { + signedToken, err := identity.NewToken(duration, audience, authorizedAccount) + if err != nil { + return err + } + + identity.BearerToken = string(signedToken) + return nil +} + +// NewToken creates and returns a new `BearerToken`. +// +// - duration: The [time.Duration] that this identity is valid for. +// - audience: The audience that this identity is valid for. This is required +// by the Defra http client. For example `github.com/sourcenetwork/defradb` +// - authorizedAccount: An account that this identity is authorizing to make +// SourceHub calls on behalf of this actor. This is currently required when +// using SourceHub ACP. +func (identity Identity) NewToken( + duration time.Duration, + audience immutable.Option[string], + authorizedAccount immutable.Option[string], +) ([]byte, error) { var signedToken []byte subject := hex.EncodeToString(identity.PublicKey.SerializeCompressed()) now := time.Now() @@ -144,21 +167,39 @@ func (identity *Identity) UpdateToken( token, err := jwtBuilder.Build() if err != nil { - return err + return nil, err } if authorizedAccount.HasValue() { err = token.Set(acptypes.AuthorizedAccountClaim, authorizedAccount.Value()) if err != nil { - return err + return nil, err } } signedToken, err = jwt.Sign(token, jwt.WithKey(BearerTokenSignatureScheme, identity.PrivateKey.ToECDSA())) + if err != nil { + return nil, err + } + + return signedToken, nil +} + +// VerifyAuthToken verifies that the jwt auth token is valid and that the signature +// matches the identity of the subject. +func VerifyAuthToken(ident Identity, audience string) error { + _, err := jwt.Parse([]byte(ident.BearerToken), jwt.WithVerify(false), jwt.WithAudience(audience)) + if err != nil { + return err + } + + _, err = jws.Verify( + []byte(ident.BearerToken), + jws.WithKey(BearerTokenSignatureScheme, ident.PublicKey.ToECDSA()), + ) if err != nil { return err } - identity.BearerToken = string(signedToken) return nil } diff --git a/http/auth.go b/http/auth.go index 79f4262252..544b2fbe96 100644 --- a/http/auth.go +++ b/http/auth.go @@ -14,8 +14,6 @@ import ( "net/http" "strings" - "github.com/lestrrat-go/jwx/v2/jws" - "github.com/lestrrat-go/jwx/v2/jwt" "github.com/sourcenetwork/immutable" acpIdentity "github.com/sourcenetwork/defradb/acp/identity" @@ -30,24 +28,6 @@ const ( authSchemaPrefix = "Bearer " ) -// verifyAuthToken verifies that the jwt auth token is valid and that the signature -// matches the identity of the subject. 
-func verifyAuthToken(identity acpIdentity.Identity, audience string) error { - _, err := jwt.Parse([]byte(identity.BearerToken), jwt.WithVerify(false), jwt.WithAudience(audience)) - if err != nil { - return err - } - - _, err = jws.Verify( - []byte(identity.BearerToken), - jws.WithKey(acpIdentity.BearerTokenSignatureScheme, identity.PublicKey.ToECDSA()), - ) - if err != nil { - return err - } - return nil -} - // AuthMiddleware authenticates an actor and sets their identity for all subsequent actions. func AuthMiddleware(next http.Handler) http.Handler { return http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) { @@ -63,7 +43,7 @@ func AuthMiddleware(next http.Handler) http.Handler { return } - err = verifyAuthToken(ident, strings.ToLower(req.Host)) + err = acpIdentity.VerifyAuthToken(ident, strings.ToLower(req.Host)) if err != nil { http.Error(rw, "forbidden", http.StatusForbidden) return diff --git a/http/auth_test.go b/http/auth_test.go index 365ed98c89..0fa46e5c81 100644 --- a/http/auth_test.go +++ b/http/auth_test.go @@ -34,7 +34,7 @@ func TestVerifyAuthToken(t *testing.T) { err = identity.UpdateToken(time.Hour, immutable.Some(audience), immutable.None[string]()) require.NoError(t, err) - err = verifyAuthToken(identity, audience) + err = acpIdentity.VerifyAuthToken(identity, audience) require.NoError(t, err) } @@ -48,7 +48,7 @@ func TestVerifyAuthTokenErrorsWithNonMatchingAudience(t *testing.T) { err = identity.UpdateToken(time.Hour, immutable.Some("valid"), immutable.None[string]()) require.NoError(t, err) - err = verifyAuthToken(identity, "invalid") + err = acpIdentity.VerifyAuthToken(identity, "invalid") assert.Error(t, err) } @@ -65,6 +65,6 @@ func TestVerifyAuthTokenErrorsWithExpired(t *testing.T) { err = identity.UpdateToken(-time.Hour, immutable.Some(audience), immutable.None[string]()) require.NoError(t, err) - err = verifyAuthToken(identity, "123abc") + err = acpIdentity.VerifyAuthToken(identity, "123abc") assert.Error(t, err) } diff --git a/internal/db/backup.go b/internal/db/backup.go index e41a29178d..89ca19fd0a 100644 --- a/internal/db/backup.go +++ b/internal/db/backup.go @@ -21,7 +21,7 @@ import ( "github.com/sourcenetwork/defradb/client/request" ) -func (db *db) basicImport(ctx context.Context, filepath string) (err error) { +func (db *DB) basicImport(ctx context.Context, filepath string) (err error) { f, err := os.Open(filepath) if err != nil { return NewErrOpenFile(err, filepath) @@ -115,7 +115,7 @@ func (db *db) basicImport(ctx context.Context, filepath string) (err error) { return nil } -func (db *db) basicExport(ctx context.Context, config *client.BackupConfig) (err error) { +func (db *DB) basicExport(ctx context.Context, config *client.BackupConfig) (err error) { // old key -> new Key keyChangeCache := map[string]string{} diff --git a/internal/db/collection.go b/internal/db/collection.go index 70b6b536b8..88b455668f 100644 --- a/internal/db/collection.go +++ b/internal/db/collection.go @@ -42,7 +42,7 @@ var _ client.Collection = (*collection)(nil) // collection stores data records at Documents, which are gathered // together under a collection name. This is analogous to SQL Tables. type collection struct { - db *db + db *DB def client.CollectionDefinition indexes []CollectionIndex fetcherFactory func() fetcher.Fetcher @@ -55,7 +55,7 @@ type collection struct { // CollectionOptions object. 
// newCollection returns a pointer to a newly instantiated DB Collection -func (db *db) newCollection(desc client.CollectionDescription, schema client.SchemaDescription) *collection { +func (db *DB) newCollection(desc client.CollectionDescription, schema client.SchemaDescription) *collection { return &collection{ db: db, def: client.CollectionDefinition{Description: desc, Schema: schema}, @@ -77,7 +77,7 @@ func (c *collection) newFetcher() fetcher.Fetcher { return lens.NewFetcher(innerFetcher, c.db.LensRegistry()) } -func (db *db) getCollectionByID(ctx context.Context, id uint32) (client.Collection, error) { +func (db *DB) getCollectionByID(ctx context.Context, id uint32) (client.Collection, error) { txn := mustGetContextTxn(ctx) col, err := description.GetCollectionByID(ctx, txn, id) @@ -101,7 +101,7 @@ func (db *db) getCollectionByID(ctx context.Context, id uint32) (client.Collecti } // getCollectionByName returns an existing collection within the database. -func (db *db) getCollectionByName(ctx context.Context, name string) (client.Collection, error) { +func (db *DB) getCollectionByName(ctx context.Context, name string) (client.Collection, error) { if name == "" { return nil, ErrCollectionNameEmpty } @@ -120,7 +120,7 @@ func (db *db) getCollectionByName(ctx context.Context, name string) (client.Coll // // Inactive collections are not returned by default unless a specific schema version ID // is provided. -func (db *db) getCollections( +func (db *DB) getCollections( ctx context.Context, options client.CollectionFetchOptions, ) ([]client.Collection, error) { @@ -219,7 +219,7 @@ func (db *db) getCollections( } // getAllActiveDefinitions returns all queryable collection/views and any embedded schema used by them. -func (db *db) getAllActiveDefinitions(ctx context.Context) ([]client.CollectionDefinition, error) { +func (db *DB) getAllActiveDefinitions(ctx context.Context) ([]client.CollectionDefinition, error) { txn := mustGetContextTxn(ctx) cols, err := description.GetActiveCollections(ctx, txn) diff --git a/internal/db/collection_define.go b/internal/db/collection_define.go index 8f49b9a970..066b89741b 100644 --- a/internal/db/collection_define.go +++ b/internal/db/collection_define.go @@ -23,7 +23,7 @@ import ( "github.com/sourcenetwork/defradb/internal/db/description" ) -func (db *db) createCollections( +func (db *DB) createCollections( ctx context.Context, newDefinitions []client.CollectionDefinition, ) ([]client.CollectionDefinition, error) { @@ -112,7 +112,7 @@ func (db *db) createCollections( return returnDescriptions, nil } -func (db *db) patchCollection( +func (db *DB) patchCollection( ctx context.Context, patchString string, ) error { @@ -224,7 +224,7 @@ func (db *db) patchCollection( // provided. This includes GQL queries and Collection operations. // // It will return an error if the provided schema version ID does not exist. 
-func (db *db) setActiveSchemaVersion( +func (db *DB) setActiveSchemaVersion( ctx context.Context, schemaVersionID string, ) error { @@ -311,7 +311,7 @@ func (db *db) setActiveSchemaVersion( return db.loadSchema(ctx) } -func (db *db) getActiveCollectionDown( +func (db *DB) getActiveCollectionDown( ctx context.Context, colsByID map[uint32]client.CollectionDescription, id uint32, @@ -338,7 +338,7 @@ func (db *db) getActiveCollectionDown( return db.getActiveCollectionDown(ctx, colsByID, sources[0].SourceCollectionID) } -func (db *db) getActiveCollectionUp( +func (db *DB) getActiveCollectionUp( ctx context.Context, colsBySourceID map[uint32][]client.CollectionDescription, id uint32, diff --git a/internal/db/collection_id.go b/internal/db/collection_id.go index 84edcbb1c4..148bf40bd9 100644 --- a/internal/db/collection_id.go +++ b/internal/db/collection_id.go @@ -21,7 +21,7 @@ import ( ) // setCollectionIDs sets the IDs on a collection description, including field IDs, mutating the input set. -func (db *db) setCollectionIDs(ctx context.Context, newCollections []client.CollectionDefinition) error { +func (db *DB) setCollectionIDs(ctx context.Context, newCollections []client.CollectionDefinition) error { err := db.setCollectionID(ctx, newCollections) if err != nil { return err @@ -32,7 +32,7 @@ func (db *db) setCollectionIDs(ctx context.Context, newCollections []client.Coll // setCollectionID sets the IDs directly on a collection description, excluding stuff like field IDs, // mutating the input set. -func (db *db) setCollectionID(ctx context.Context, newCollections []client.CollectionDefinition) error { +func (db *DB) setCollectionID(ctx context.Context, newCollections []client.CollectionDefinition) error { colSeq, err := db.getSequence(ctx, keys.CollectionIDSequenceKey{}) if err != nil { return err @@ -64,7 +64,7 @@ func (db *db) setCollectionID(ctx context.Context, newCollections []client.Colle } // setFieldIDs sets the field IDs hosted on the given collections, mutating the input set. -func (db *db) setFieldIDs(ctx context.Context, definitions []client.CollectionDefinition) error { +func (db *DB) setFieldIDs(ctx context.Context, definitions []client.CollectionDefinition) error { collectionsByName := map[string]client.CollectionDescription{} schemasByName := map[string]client.SchemaDescription{} for _, def := range definitions { diff --git a/internal/db/collection_index.go b/internal/db/collection_index.go index 49e796f824..7347ff3348 100644 --- a/internal/db/collection_index.go +++ b/internal/db/collection_index.go @@ -32,7 +32,7 @@ import ( ) // createCollectionIndex creates a new collection index and saves it to the database in its system store. -func (db *db) createCollectionIndex( +func (db *DB) createCollectionIndex( ctx context.Context, collectionName string, desc client.IndexDescriptionCreateRequest, @@ -44,7 +44,7 @@ func (db *db) createCollectionIndex( return col.CreateIndex(ctx, desc) } -func (db *db) dropCollectionIndex( +func (db *DB) dropCollectionIndex( ctx context.Context, collectionName, indexName string, ) error { @@ -56,7 +56,7 @@ func (db *db) dropCollectionIndex( } // getAllIndexDescriptions returns all the index descriptions in the database. 
-func (db *db) getAllIndexDescriptions( +func (db *DB) getAllIndexDescriptions( ctx context.Context, ) (map[client.CollectionName][]client.IndexDescription, error) { // callers of this function must set a context transaction @@ -92,7 +92,7 @@ func (db *db) getAllIndexDescriptions( return indexes, nil } -func (db *db) fetchCollectionIndexDescriptions( +func (db *DB) fetchCollectionIndexDescriptions( ctx context.Context, colID uint32, ) ([]client.IndexDescription, error) { diff --git a/internal/db/db.go b/internal/db/db.go index 630bd0ae43..c3bb574da9 100644 --- a/internal/db/db.go +++ b/internal/db/db.go @@ -54,9 +54,8 @@ const ( eventBufferSize = 100 ) -// DB is the main interface for interacting with the -// DefraDB storage system. -type db struct { +// DB is the main struct for DefraDB's storage layer. +type DB struct { glock sync.RWMutex rootstore datastore.Rootstore @@ -97,6 +96,8 @@ type db struct { retryIntervals []time.Duration } +var _ client.DB = (*DB)(nil) + // NewDB creates a new instance of the DB using the given options. func NewDB( ctx context.Context, @@ -104,7 +105,7 @@ func NewDB( acp immutable.Option[acp.ACP], lens client.LensRegistry, options ...Option, -) (client.DB, error) { +) (*DB, error) { return newDB(ctx, rootstore, acp, lens, options...) } @@ -114,7 +115,7 @@ func newDB( acp immutable.Option[acp.ACP], lens client.LensRegistry, options ...Option, -) (*db, error) { +) (*DB, error) { multistore := datastore.MultiStoreFrom(rootstore) parser, err := graphql.NewParser() @@ -129,7 +130,7 @@ func newDB( ctx, cancel := context.WithCancel(ctx) - db := &db{ + db := &DB{ rootstore: rootstore, multistore: multistore, acp: acp, @@ -167,47 +168,47 @@ func newDB( } // NewTxn creates a new transaction. -func (db *db) NewTxn(ctx context.Context, readonly bool) (datastore.Txn, error) { +func (db *DB) NewTxn(ctx context.Context, readonly bool) (datastore.Txn, error) { txnId := db.previousTxnID.Add(1) return datastore.NewTxnFrom(ctx, db.rootstore, txnId, readonly) } // NewConcurrentTxn creates a new transaction that supports concurrent API calls. -func (db *db) NewConcurrentTxn(ctx context.Context, readonly bool) (datastore.Txn, error) { +func (db *DB) NewConcurrentTxn(ctx context.Context, readonly bool) (datastore.Txn, error) { txnId := db.previousTxnID.Add(1) return datastore.NewConcurrentTxnFrom(ctx, db.rootstore, txnId, readonly) } // Rootstore returns the root datastore. -func (db *db) Rootstore() datastore.Rootstore { +func (db *DB) Rootstore() datastore.Rootstore { return db.rootstore } // Blockstore returns the internal DAG store which contains IPLD blocks. -func (db *db) Blockstore() datastore.Blockstore { +func (db *DB) Blockstore() datastore.Blockstore { return db.multistore.Blockstore() } // Encstore returns the internal enc store which contains encryption key for documents and their fields. -func (db *db) Encstore() datastore.Blockstore { +func (db *DB) Encstore() datastore.Blockstore { return db.multistore.Encstore() } // Peerstore returns the internal DAG store which contains IPLD blocks. -func (db *db) Peerstore() datastore.DSReaderWriter { +func (db *DB) Peerstore() datastore.DSReaderWriter { return db.multistore.Peerstore() } // Headstore returns the internal DAG store which contains IPLD blocks. 
-func (db *db) Headstore() ds.Read { +func (db *DB) Headstore() ds.Read { return db.multistore.Headstore() } -func (db *db) LensRegistry() client.LensRegistry { +func (db *DB) LensRegistry() client.LensRegistry { return db.lensRegistry } -func (db *db) AddPolicy( +func (db *DB) AddPolicy( ctx context.Context, policy string, ) (client.AddPolicyResult, error) { @@ -230,7 +231,7 @@ func (db *db) AddPolicy( // publishDocUpdateEvent publishes an update event for a document. // It uses heads iterator to read the document's head blocks directly from the storage, i.e. without // using a transaction. -func (db *db) publishDocUpdateEvent(ctx context.Context, docID string, collection client.Collection) error { +func (db *DB) publishDocUpdateEvent(ctx context.Context, docID string, collection client.Collection) error { headsIterator, err := NewHeadBlocksIterator(ctx, db.multistore.Headstore(), db.Blockstore(), docID) if err != nil { return err @@ -256,7 +257,7 @@ func (db *db) publishDocUpdateEvent(ctx context.Context, docID string, collectio return nil } -func (db *db) AddDocActorRelationship( +func (db *DB) AddDocActorRelationship( ctx context.Context, collectionName string, docID string, @@ -301,7 +302,7 @@ func (db *db) AddDocActorRelationship( return client.AddDocActorRelationshipResult{ExistedAlready: exists}, nil } -func (db *db) DeleteDocActorRelationship( +func (db *DB) DeleteDocActorRelationship( ctx context.Context, collectionName string, docID string, @@ -339,16 +340,23 @@ func (db *db) DeleteDocActorRelationship( return client.DeleteDocActorRelationshipResult{RecordFound: recordFound}, nil } -func (db *db) GetNodeIdentity(context.Context) (immutable.Option[identity.PublicRawIdentity], error) { +func (db *DB) GetNodeIdentity(_ context.Context) (immutable.Option[identity.PublicRawIdentity], error) { if db.nodeIdentity.HasValue() { return immutable.Some(db.nodeIdentity.Value().IntoRawIdentity().Public()), nil } return immutable.None[identity.PublicRawIdentity](), nil } +func (db *DB) GetNodeIdentityToken(_ context.Context, audience immutable.Option[string]) ([]byte, error) { + if db.nodeIdentity.HasValue() { + return db.nodeIdentity.Value().NewToken(time.Hour*24, audience, immutable.None[string]()) + } + return nil, nil +} + // Initialize is called when a database is first run and creates all the db global meta data // like Collection ID counters. -func (db *db) initialize(ctx context.Context) error { +func (db *DB) initialize(ctx context.Context) error { db.glock.Lock() defer db.glock.Unlock() @@ -405,13 +413,13 @@ func (db *db) initialize(ctx context.Context) error { } // Events returns the events Channel. -func (db *db) Events() *event.Bus { +func (db *DB) Events() *event.Bus { return db.events } // MaxRetries returns the maximum number of retries per transaction. // Defaults to `defaultMaxTxnRetries` if not explicitely set -func (db *db) MaxTxnRetries() int { +func (db *DB) MaxTxnRetries() int { if db.maxTxnRetries.HasValue() { return db.maxTxnRetries.Value() } @@ -419,13 +427,13 @@ func (db *db) MaxTxnRetries() int { } // PrintDump prints the entire database to console. -func (db *db) PrintDump(ctx context.Context) error { +func (db *DB) PrintDump(ctx context.Context) error { return printStore(ctx, db.multistore.Rootstore()) } // Close is called when we are shutting down the database. // This is the place for any last minute cleanup or releasing of resources (i.e.: Badger instance). 
-func (db *db) Close() { +func (db *DB) Close() { log.Info("Closing DefraDB process...") db.ctxCancel() diff --git a/internal/db/db_test.go b/internal/db/db_test.go index 387be0154d..4c39a0521d 100644 --- a/internal/db/db_test.go +++ b/internal/db/db_test.go @@ -21,7 +21,7 @@ import ( "github.com/sourcenetwork/defradb/datastore/memory" ) -func newMemoryDB(ctx context.Context) (*db, error) { +func newMemoryDB(ctx context.Context) (*DB, error) { opts := badgerds.Options{Options: badger.DefaultOptions("").WithInMemory(true)} rootstore, err := badgerds.NewDatastore("", &opts) if err != nil { @@ -30,7 +30,7 @@ func newMemoryDB(ctx context.Context) (*db, error) { return newDB(ctx, rootstore, acp.NoACP, nil) } -func newDefraMemoryDB(ctx context.Context) (*db, error) { +func newDefraMemoryDB(ctx context.Context) (*DB, error) { rootstore := memory.NewDatastore(ctx) return newDB(ctx, rootstore, acp.NoACP, nil) } diff --git a/internal/db/definition_validation.go b/internal/db/definition_validation.go index 1613340e00..565aae64f9 100644 --- a/internal/db/definition_validation.go +++ b/internal/db/definition_validation.go @@ -103,7 +103,7 @@ func newDefinitionState( // validation functions should follow. type definitionValidator = func( ctx context.Context, - db *db, + db *DB, newState *definitionState, oldState *definitionState, ) error @@ -175,7 +175,7 @@ var createValidators = append( globalValidators..., ) -func (db *db) validateSchemaUpdate( +func (db *DB) validateSchemaUpdate( ctx context.Context, oldDefinitions []client.CollectionDefinition, newDefinitions []client.CollectionDefinition, @@ -193,7 +193,7 @@ func (db *db) validateSchemaUpdate( return nil } -func (db *db) validateCollectionChanges( +func (db *DB) validateCollectionChanges( ctx context.Context, oldCols []client.CollectionDescription, newColsByID map[uint32]client.CollectionDescription, @@ -216,7 +216,7 @@ func (db *db) validateCollectionChanges( return nil } -func (db *db) validateNewCollection( +func (db *DB) validateNewCollection( ctx context.Context, newDefinitions []client.CollectionDefinition, oldDefinitions []client.CollectionDefinition, @@ -236,7 +236,7 @@ func (db *db) validateNewCollection( func validateRelationPointsToValidKind( ctx context.Context, - db *db, + db *DB, newState *definitionState, oldState *definitionState, ) error { @@ -276,7 +276,7 @@ func validateRelationPointsToValidKind( func validateSecondaryFieldsPairUp( ctx context.Context, - db *db, + db *DB, newState *definitionState, oldState *definitionState, ) error { @@ -341,7 +341,7 @@ func validateSecondaryFieldsPairUp( func validateSingleSidePrimary( ctx context.Context, - db *db, + db *DB, newState *definitionState, oldState *definitionState, ) error { @@ -398,7 +398,7 @@ func validateSingleSidePrimary( func validateCollectionNameUnique( ctx context.Context, - db *db, + db *DB, newState *definitionState, oldState *definitionState, ) error { @@ -419,7 +419,7 @@ func validateCollectionNameUnique( func validateSingleVersionActive( ctx context.Context, - db *db, + db *DB, newState *definitionState, oldState *definitionState, ) error { @@ -445,7 +445,7 @@ func validateSingleVersionActive( // cannot be redirected to other collections. 
func validateSourcesNotRedefined( ctx context.Context, - db *db, + db *DB, newState *definitionState, oldState *definitionState, ) error { @@ -485,7 +485,7 @@ func validateSourcesNotRedefined( func validateIndexesNotModified( ctx context.Context, - db *db, + db *DB, newState *definitionState, oldState *definitionState, ) error { @@ -506,7 +506,7 @@ func validateIndexesNotModified( func validateFieldsNotModified( ctx context.Context, - db *db, + db *DB, newState *definitionState, oldState *definitionState, ) error { @@ -527,7 +527,7 @@ func validateFieldsNotModified( func validatePolicyNotModified( ctx context.Context, - db *db, + db *DB, newState *definitionState, oldState *definitionState, ) error { @@ -548,7 +548,7 @@ func validatePolicyNotModified( func validateIDNotZero( ctx context.Context, - db *db, + db *DB, newState *definitionState, oldState *definitionState, ) error { @@ -563,7 +563,7 @@ func validateIDNotZero( func validateIDUnique( ctx context.Context, - db *db, + db *DB, newState *definitionState, oldState *definitionState, ) error { @@ -580,7 +580,7 @@ func validateIDUnique( func validateIDExists( ctx context.Context, - db *db, + db *DB, newState *definitionState, oldState *definitionState, ) error { @@ -595,7 +595,7 @@ func validateIDExists( func validateRootIDNotMutated( ctx context.Context, - db *db, + db *DB, newState *definitionState, oldState *definitionState, ) error { @@ -630,7 +630,7 @@ func validateRootIDNotMutated( func validateSchemaVersionIDNotMutated( ctx context.Context, - db *db, + db *DB, newState *definitionState, oldState *definitionState, ) error { @@ -658,7 +658,7 @@ func validateSchemaVersionIDNotMutated( func validateCollectionNotRemoved( ctx context.Context, - db *db, + db *DB, newState *definitionState, oldState *definitionState, ) error { @@ -684,7 +684,7 @@ oldLoop: // this function might also make relevant remote calls using the acp system. 
func validateCollectionDefinitionPolicyDesc( ctx context.Context, - db *db, + db *DB, newState *definitionState, oldState *definitionState, ) error { @@ -721,7 +721,7 @@ func validateCollectionDefinitionPolicyDesc( func validateSchemaFieldNotDeleted( ctx context.Context, - db *db, + db *DB, newState *definitionState, oldState *definitionState, ) error { @@ -748,7 +748,7 @@ func validateSchemaFieldNotDeleted( func validateTypeAndKindCompatible( ctx context.Context, - db *db, + db *DB, newState *definitionState, oldState *definitionState, ) error { @@ -765,7 +765,7 @@ func validateTypeAndKindCompatible( func validateTypeSupported( ctx context.Context, - db *db, + db *DB, newState *definitionState, oldState *definitionState, ) error { @@ -782,7 +782,7 @@ func validateTypeSupported( func validateFieldNotMoved( ctx context.Context, - db *db, + db *DB, newState *definitionState, oldState *definitionState, ) error { @@ -806,7 +806,7 @@ func validateFieldNotMoved( func validateFieldNotMutated( ctx context.Context, - db *db, + db *DB, newState *definitionState, oldState *definitionState, ) error { @@ -833,7 +833,7 @@ func validateFieldNotMutated( func validateFieldNotDuplicated( ctx context.Context, - db *db, + db *DB, newState *definitionState, oldState *definitionState, ) error { @@ -853,7 +853,7 @@ func validateFieldNotDuplicated( func validateSelfReferences( ctx context.Context, - db *db, + db *DB, newState *definitionState, oldState *definitionState, ) error { @@ -905,7 +905,7 @@ func validateSelfReferences( func validateSecondaryNotOnSchema( ctx context.Context, - db *db, + db *DB, newState *definitionState, oldState *definitionState, ) error { @@ -922,7 +922,7 @@ func validateSecondaryNotOnSchema( func validateRelationalFieldIDType( ctx context.Context, - db *db, + db *DB, newState *definitionState, oldState *definitionState, ) error { @@ -951,7 +951,7 @@ func validateRelationalFieldIDType( func validateSchemaNotAdded( ctx context.Context, - db *db, + db *DB, newState *definitionState, oldState *definitionState, ) error { @@ -972,7 +972,7 @@ func validateSchemaNotAdded( func validateSchemaNameNotEmpty( ctx context.Context, - db *db, + db *DB, newState *definitionState, oldState *definitionState, ) error { @@ -990,7 +990,7 @@ func validateSchemaNameNotEmpty( // Long term we wish to support this, however for now we block it off. func validateCollectionMaterialized( ctx context.Context, - db *db, + db *DB, newState *definitionState, oldState *definitionState, ) error { @@ -1008,7 +1008,7 @@ func validateCollectionMaterialized( // Long term we wish to support this, however for now we block it off. 
func validateMaterializedHasNoPolicy( ctx context.Context, - db *db, + db *DB, newState *definitionState, oldState *definitionState, ) error { @@ -1023,7 +1023,7 @@ func validateMaterializedHasNoPolicy( func validateCollectionFieldDefaultValue( ctx context.Context, - db *db, + db *DB, newState *definitionState, oldState *definitionState, ) error { @@ -1043,7 +1043,7 @@ func validateCollectionFieldDefaultValue( // https://github.com/sourcenetwork/defradb/issues/3219 func validateCollectionIsBranchableNotMutated( ctx context.Context, - db *db, + db *DB, newState *definitionState, oldState *definitionState, ) error { diff --git a/internal/db/index_test.go b/internal/db/index_test.go index 705dba38ac..226911e711 100644 --- a/internal/db/index_test.go +++ b/internal/db/index_test.go @@ -58,7 +58,7 @@ const ( type indexTestFixture struct { ctx context.Context - db *db + db *DB txn datastore.Txn users client.Collection t *testing.T diff --git a/internal/db/lens.go b/internal/db/lens.go index 0ad3d55994..b68bce5f34 100644 --- a/internal/db/lens.go +++ b/internal/db/lens.go @@ -22,7 +22,7 @@ import ( "github.com/sourcenetwork/defradb/internal/keys" ) -func (db *db) setMigration(ctx context.Context, cfg client.LensConfig) error { +func (db *DB) setMigration(ctx context.Context, cfg client.LensConfig) error { txn := mustGetContextTxn(ctx) dstCols, err := description.GetCollectionsBySchemaVersionID(ctx, txn, cfg.DestinationSchemaVersionID) diff --git a/internal/db/merge.go b/internal/db/merge.go index c361aa8a2b..96d8ab6c18 100644 --- a/internal/db/merge.go +++ b/internal/db/merge.go @@ -36,7 +36,7 @@ import ( merklecrdt "github.com/sourcenetwork/defradb/internal/merkle/crdt" ) -func (db *db) executeMerge(ctx context.Context, col *collection, dagMerge event.Merge) error { +func (db *DB) executeMerge(ctx context.Context, col *collection, dagMerge event.Merge) error { ctx, txn, err := ensureContextTxn(ctx, db, false) if err != nil { return err @@ -154,7 +154,7 @@ type mergeProcessor struct { availableEncryptionBlocks map[cidlink.Link]*coreblock.Encryption } -func (db *db) newMergeProcessor( +func (db *DB) newMergeProcessor( txn datastore.Txn, col *collection, ) (*mergeProcessor, error) { @@ -487,7 +487,7 @@ func (mp *mergeProcessor) initCRDTForType(crdt crdt.CRDT) (merklecrdt.MerkleCRDT } } -func getCollectionFromRootSchema(ctx context.Context, db *db, rootSchema string) (*collection, error) { +func getCollectionFromRootSchema(ctx context.Context, db *DB, rootSchema string) (*collection, error) { ctx, txn, err := ensureContextTxn(ctx, db, false) if err != nil { return nil, err diff --git a/internal/db/messages.go b/internal/db/messages.go index e980eb7d84..08975ead66 100644 --- a/internal/db/messages.go +++ b/internal/db/messages.go @@ -21,7 +21,7 @@ import ( "github.com/sourcenetwork/defradb/event" ) -func (db *db) handleMessages(ctx context.Context, sub *event.Subscription) { +func (db *DB) handleMessages(ctx context.Context, sub *event.Subscription) { docIDQueue := newMergeQueue() schemaRootQueue := newMergeQueue() diff --git a/internal/db/p2p_replicator.go b/internal/db/p2p_replicator.go index 7dcdf36c0f..feebdd8a40 100644 --- a/internal/db/p2p_replicator.go +++ b/internal/db/p2p_replicator.go @@ -39,7 +39,7 @@ const ( retryTimeout = 10 * time.Second ) -func (db *db) SetReplicator(ctx context.Context, rep client.ReplicatorParams) error { +func (db *DB) SetReplicator(ctx context.Context, rep client.ReplicatorParams) error { txn, err := db.NewTxn(ctx, false) if err != 
nil { return err @@ -145,7 +145,7 @@ func (db *db) SetReplicator(ctx context.Context, rep client.ReplicatorParams) er return txn.Commit(ctx) } -func (db *db) getDocsHeads( +func (db *DB) getDocsHeads( ctx context.Context, cols []client.Collection, ) <-chan event.Update { @@ -213,7 +213,7 @@ func (db *db) getDocsHeads( return updateChan } -func (db *db) DeleteReplicator(ctx context.Context, rep client.ReplicatorParams) error { +func (db *DB) DeleteReplicator(ctx context.Context, rep client.ReplicatorParams) error { txn, err := db.NewTxn(ctx, false) if err != nil { return err @@ -306,7 +306,7 @@ func (db *db) DeleteReplicator(ctx context.Context, rep client.ReplicatorParams) return txn.Commit(ctx) } -func (db *db) GetAllReplicators(ctx context.Context) ([]client.Replicator, error) { +func (db *DB) GetAllReplicators(ctx context.Context) ([]client.Replicator, error) { txn, err := db.NewTxn(ctx, true) if err != nil { return nil, err @@ -333,7 +333,7 @@ func (db *db) GetAllReplicators(ctx context.Context) ([]client.Replicator, error return reps, nil } -func (db *db) loadAndPublishReplicators(ctx context.Context) error { +func (db *DB) loadAndPublishReplicators(ctx context.Context) error { replicators, err := db.GetAllReplicators(ctx) if err != nil { return err @@ -353,7 +353,7 @@ func (db *db) loadAndPublishReplicators(ctx context.Context) error { } // handleReplicatorRetries manages retries for failed replication attempts. -func (db *db) handleReplicatorRetries(ctx context.Context) { +func (db *DB) handleReplicatorRetries(ctx context.Context) { for { select { case <-ctx.Done(): @@ -365,7 +365,7 @@ func (db *db) handleReplicatorRetries(ctx context.Context) { } } -func (db *db) handleReplicatorFailure(ctx context.Context, peerID, docID string) error { +func (db *DB) handleReplicatorFailure(ctx context.Context, peerID, docID string) error { ctx, txn, err := ensureContextTxn(ctx, db, false) if err != nil { return err @@ -387,7 +387,7 @@ func (db *db) handleReplicatorFailure(ctx context.Context, peerID, docID string) return txn.Commit(ctx) } -func (db *db) handleCompletedReplicatorRetry(ctx context.Context, peerID string, success bool) error { +func (db *DB) handleCompletedReplicatorRetry(ctx context.Context, peerID string, success bool) error { ctx, txn, err := ensureContextTxn(ctx, db, false) if err != nil { return err @@ -491,7 +491,7 @@ func createIfNotExistsReplicatorRetry( return nil } -func (db *db) retryReplicators(ctx context.Context) { +func (db *DB) retryReplicators(ctx context.Context) { q := query.Query{ Prefix: keys.REPLICATOR_RETRY_ID, } @@ -546,7 +546,7 @@ func (db *db) retryReplicators(ctx context.Context) { } } -func (db *db) setReplicatorAsRetrying(ctx context.Context, key keys.ReplicatorRetryIDKey, rInfo retryInfo) error { +func (db *DB) setReplicatorAsRetrying(ctx context.Context, key keys.ReplicatorRetryIDKey, rInfo retryInfo) error { rInfo.Retrying = true rInfo.NumRetries++ b, err := cbor.Marshal(rInfo) @@ -598,7 +598,7 @@ func setReplicatorNextRetry( // All action within this function are done outside a transaction to always get the most recent data // and post updates as soon as possible. Because of the asyncronous nature of the retryDoc step, there // would be a high chance of unnecessary transaction conflicts. 
-func (db *db) retryReplicator(ctx context.Context, peerID string) { +func (db *DB) retryReplicator(ctx context.Context, peerID string) { log.InfoContext(ctx, "Retrying replicator", corelog.String("PeerID", peerID)) key := keys.NewReplicatorRetryDocIDKey(peerID, "") q := query.Query{ @@ -642,7 +642,7 @@ func (db *db) retryReplicator(ctx context.Context, peerID string) { } } -func (db *db) retryDoc(ctx context.Context, docID string) error { +func (db *DB) retryDoc(ctx context.Context, docID string) error { ctx, txn, err := ensureContextTxn(ctx, db, false) if err != nil { return err @@ -728,7 +728,7 @@ func deleteReplicatorRetryIfNoMoreDocs( } // deleteReplicatorRetryAndDocs deletes the replicator retry and all retry docs. -func (db *db) deleteReplicatorRetryAndDocs(ctx context.Context, peerID string) error { +func (db *DB) deleteReplicatorRetryAndDocs(ctx context.Context, peerID string) error { key := keys.NewReplicatorRetryIDKey(peerID) err := db.Peerstore().Delete(ctx, key.ToDS()) if err != nil { diff --git a/internal/db/p2p_replicator_test.go b/internal/db/p2p_replicator_test.go index bad3d46271..db2226e31e 100644 --- a/internal/db/p2p_replicator_test.go +++ b/internal/db/p2p_replicator_test.go @@ -23,7 +23,7 @@ import ( "github.com/sourcenetwork/defradb/event" ) -func waitForPeerInfo(db *db, sub *event.Subscription) { +func waitForPeerInfo(db *DB, sub *event.Subscription) { for msg := range sub.Message() { if msg.Name == event.PeerInfoName { hasPeerInfo := false diff --git a/internal/db/p2p_schema_root.go b/internal/db/p2p_schema_root.go index d42461ba1b..b1df8ae421 100644 --- a/internal/db/p2p_schema_root.go +++ b/internal/db/p2p_schema_root.go @@ -25,7 +25,7 @@ import ( const marker = byte(0xff) -func (db *db) AddP2PCollections(ctx context.Context, collectionIDs []string) error { +func (db *DB) AddP2PCollections(ctx context.Context, collectionIDs []string) error { txn, err := db.NewTxn(ctx, false) if err != nil { return err @@ -92,7 +92,7 @@ func (db *db) AddP2PCollections(ctx context.Context, collectionIDs []string) err return txn.Commit(ctx) } -func (db *db) RemoveP2PCollections(ctx context.Context, collectionIDs []string) error { +func (db *DB) RemoveP2PCollections(ctx context.Context, collectionIDs []string) error { txn, err := db.NewTxn(ctx, false) if err != nil { return err @@ -151,7 +151,7 @@ func (db *db) RemoveP2PCollections(ctx context.Context, collectionIDs []string) return txn.Commit(ctx) } -func (db *db) GetAllP2PCollections(ctx context.Context) ([]string, error) { +func (db *DB) GetAllP2PCollections(ctx context.Context) ([]string, error) { txn, err := db.NewTxn(ctx, true) if err != nil { return nil, err @@ -178,7 +178,7 @@ func (db *db) GetAllP2PCollections(ctx context.Context) ([]string, error) { return collectionIDs, nil } -func (db *db) PeerInfo() peer.AddrInfo { +func (db *DB) PeerInfo() peer.AddrInfo { peerInfo := db.peerInfo.Load() if peerInfo != nil { return peerInfo.(peer.AddrInfo) @@ -186,7 +186,7 @@ func (db *db) PeerInfo() peer.AddrInfo { return peer.AddrInfo{} } -func (db *db) loadAndPublishP2PCollections(ctx context.Context) error { +func (db *DB) loadAndPublishP2PCollections(ctx context.Context) error { schemaRoots, err := db.GetAllP2PCollections(ctx) if err != nil { return err diff --git a/internal/db/permission/check.go b/internal/db/permission/check.go index 599329855b..33705420a9 100644 --- a/internal/db/permission/check.go +++ b/internal/db/permission/check.go @@ -40,6 +40,42 @@ func CheckAccessOfDocOnCollectionWithACP( collection 
client.Collection, permission acp.DPIPermission, docID string, +) (bool, error) { + identityFunc := func() immutable.Option[acpIdentity.Identity] { + return identity + } + return CheckDocAccessWithIdentityFunc( + ctx, + identityFunc, + acpSystem, + collection, + permission, + docID, + ) +} + +// CheckDocAccessWithIdentityFunc handles the check, which tells us if access to the target +// document is valid, with respect to the permission type, and the specified collection. +// +// The identity is determined by an identity function. +// +// This function should only be called if acp is available. As we have unrestricted +// access when acp is not available (acp turned off). +// +// Since we know acp is enabled we have these components to check in this function: +// (1) the request is permissioned (has an identity), +// (2) the collection is permissioned (has a policy), +// +// Unrestricted Access to document if: +// - (2) is false. +// - Document is public (unregistered), whether signatured request or not doesn't matter. +func CheckDocAccessWithIdentityFunc( + ctx context.Context, + identityFunc func() immutable.Option[acpIdentity.Identity], + acpSystem acp.ACP, + collection client.Collection, + permission acp.DPIPermission, + docID string, ) (bool, error) { // Even if acp exists, but there is no policy on the collection (unpermissioned collection) // then we still have unrestricted access. @@ -67,6 +103,7 @@ func CheckAccessOfDocOnCollectionWithACP( return true, nil } + identity := identityFunc() var identityValue string if !identity.HasValue() { // We can't assume that there is no-access just because there is no identity even if the document diff --git a/internal/db/request.go b/internal/db/request.go index 611382d6c2..e476ab50eb 100644 --- a/internal/db/request.go +++ b/internal/db/request.go @@ -19,7 +19,7 @@ import ( ) // execRequest executes a request against the database. -func (db *db) execRequest(ctx context.Context, request string, options *client.GQLOptions) *client.RequestResult { +func (db *DB) execRequest(ctx context.Context, request string, options *client.GQLOptions) *client.RequestResult { res := &client.RequestResult{} ast, err := db.parser.BuildRequestAST(request) if err != nil { @@ -59,6 +59,6 @@ func (db *db) execRequest(ctx context.Context, request string, options *client.G } // ExecIntrospection executes an introspection request against the database. -func (db *db) ExecIntrospection(request string) *client.RequestResult { +func (db *DB) ExecIntrospection(request string) *client.RequestResult { return db.parser.ExecuteIntrospection(request) } diff --git a/internal/db/schema.go b/internal/db/schema.go index b5e2ff4e61..e465f98a2e 100644 --- a/internal/db/schema.go +++ b/internal/db/schema.go @@ -34,7 +34,7 @@ const ( // addSchema takes the provided schema in SDL format, and applies it to the database, // and creates the necessary collections, request types, etc. 
-func (db *db) addSchema( +func (db *DB) addSchema( ctx context.Context, schemaString string, ) ([]client.CollectionDescription, error) { @@ -61,7 +61,7 @@ func (db *db) addSchema( return returnDescriptions, nil } -func (db *db) loadSchema(ctx context.Context) error { +func (db *DB) loadSchema(ctx context.Context) error { txn := mustGetContextTxn(ctx) definitions, err := db.getAllActiveDefinitions(ctx) @@ -83,7 +83,7 @@ func (db *db) loadSchema(ctx context.Context) error { // The collections (including the schema version ID) will only be updated if any changes have actually // been made, if the net result of the patch matches the current persisted description then no changes // will be applied. -func (db *db) patchSchema( +func (db *DB) patchSchema( ctx context.Context, patchString string, migration immutable.Option[model.Lens], @@ -236,7 +236,7 @@ func substituteSchemaPatch( return patch, nil } -func (db *db) getSchemaByVersionID( +func (db *DB) getSchemaByVersionID( ctx context.Context, versionID string, ) (client.SchemaDescription, error) { @@ -249,7 +249,7 @@ func (db *db) getSchemaByVersionID( return schemas[0], nil } -func (db *db) getSchemas( +func (db *DB) getSchemas( ctx context.Context, options client.SchemaFetchOptions, ) ([]client.SchemaDescription, error) { @@ -324,7 +324,7 @@ func containsLetter(s string) bool { // The schema (including the schema version ID) will only be updated if any changes have actually // been made, if the given description matches the current persisted description then no changes will be // applied. -func (db *db) updateSchema( +func (db *DB) updateSchema( ctx context.Context, existingSchemaByName map[string]client.SchemaDescription, proposedDescriptionsByName map[string]client.SchemaDescription, diff --git a/internal/db/sequence.go b/internal/db/sequence.go index 8a9facaa63..c41fecb7ac 100644 --- a/internal/db/sequence.go +++ b/internal/db/sequence.go @@ -25,7 +25,7 @@ type sequence struct { val uint64 } -func (db *db) getSequence(ctx context.Context, key keys.Key) (*sequence, error) { +func (db *DB) getSequence(ctx context.Context, key keys.Key) (*sequence, error) { seq := &sequence{ key: key, val: uint64(0), diff --git a/internal/db/store.go b/internal/db/store.go index bd23e69d28..b36cd15c35 100644 --- a/internal/db/store.go +++ b/internal/db/store.go @@ -21,7 +21,7 @@ import ( ) // ExecRequest executes a request against the database. -func (db *db) ExecRequest(ctx context.Context, request string, opts ...client.RequestOption) *client.RequestResult { +func (db *DB) ExecRequest(ctx context.Context, request string, opts ...client.RequestOption) *client.RequestResult { ctx, txn, err := ensureContextTxn(ctx, db, false) if err != nil { res := &client.RequestResult{} @@ -49,7 +49,7 @@ func (db *db) ExecRequest(ctx context.Context, request string, opts ...client.Re } // GetCollectionByName returns an existing collection within the database. -func (db *db) GetCollectionByName(ctx context.Context, name string) (client.Collection, error) { +func (db *DB) GetCollectionByName(ctx context.Context, name string) (client.Collection, error) { ctx, txn, err := ensureContextTxn(ctx, db, true) if err != nil { return nil, err @@ -60,7 +60,7 @@ func (db *db) GetCollectionByName(ctx context.Context, name string) (client.Coll } // GetCollections gets all the currently defined collections. 
-func (db *db) GetCollections( +func (db *DB) GetCollections( ctx context.Context, options client.CollectionFetchOptions, ) ([]client.Collection, error) { @@ -77,7 +77,7 @@ func (db *db) GetCollections( // ID provided. // // Will return an error if it is not found. -func (db *db) GetSchemaByVersionID(ctx context.Context, versionID string) (client.SchemaDescription, error) { +func (db *DB) GetSchemaByVersionID(ctx context.Context, versionID string) (client.SchemaDescription, error) { ctx, txn, err := ensureContextTxn(ctx, db, true) if err != nil { return client.SchemaDescription{}, err @@ -89,7 +89,7 @@ func (db *db) GetSchemaByVersionID(ctx context.Context, versionID string) (clien // GetSchemas returns all schema versions that currently exist within // this [Store]. -func (db *db) GetSchemas( +func (db *DB) GetSchemas( ctx context.Context, options client.SchemaFetchOptions, ) ([]client.SchemaDescription, error) { @@ -103,7 +103,7 @@ func (db *db) GetSchemas( } // GetAllIndexes gets all the indexes in the database. -func (db *db) GetAllIndexes( +func (db *DB) GetAllIndexes( ctx context.Context, ) (map[client.CollectionName][]client.IndexDescription, error) { ctx, txn, err := ensureContextTxn(ctx, db, true) @@ -120,7 +120,7 @@ func (db *db) GetAllIndexes( // // All schema types provided must not exist prior to calling this, and they may not reference existing // types previously defined. -func (db *db) AddSchema(ctx context.Context, schemaString string) ([]client.CollectionDescription, error) { +func (db *DB) AddSchema(ctx context.Context, schemaString string) ([]client.CollectionDescription, error) { ctx, txn, err := ensureContextTxn(ctx, db, false) if err != nil { return nil, err @@ -149,7 +149,7 @@ func (db *db) AddSchema(ctx context.Context, schemaString string) ([]client.Coll // The collections (including the schema version ID) will only be updated if any changes have actually // been made, if the net result of the patch matches the current persisted description then no changes // will be applied. 
-func (db *db) PatchSchema( +func (db *DB) PatchSchema( ctx context.Context, patchString string, migration immutable.Option[model.Lens], @@ -169,7 +169,7 @@ func (db *db) PatchSchema( return txn.Commit(ctx) } -func (db *db) PatchCollection( +func (db *DB) PatchCollection( ctx context.Context, patchString string, ) error { @@ -187,7 +187,7 @@ func (db *db) PatchCollection( return txn.Commit(ctx) } -func (db *db) SetActiveSchemaVersion(ctx context.Context, schemaVersionID string) error { +func (db *DB) SetActiveSchemaVersion(ctx context.Context, schemaVersionID string) error { ctx, txn, err := ensureContextTxn(ctx, db, false) if err != nil { return err @@ -202,7 +202,7 @@ func (db *db) SetActiveSchemaVersion(ctx context.Context, schemaVersionID string return txn.Commit(ctx) } -func (db *db) SetMigration(ctx context.Context, cfg client.LensConfig) error { +func (db *DB) SetMigration(ctx context.Context, cfg client.LensConfig) error { ctx, txn, err := ensureContextTxn(ctx, db, false) if err != nil { return err @@ -217,7 +217,7 @@ func (db *db) SetMigration(ctx context.Context, cfg client.LensConfig) error { return txn.Commit(ctx) } -func (db *db) AddView( +func (db *DB) AddView( ctx context.Context, query string, sdl string, @@ -242,7 +242,7 @@ func (db *db) AddView( return defs, nil } -func (db *db) RefreshViews(ctx context.Context, opts client.CollectionFetchOptions) error { +func (db *DB) RefreshViews(ctx context.Context, opts client.CollectionFetchOptions) error { ctx, txn, err := ensureContextTxn(ctx, db, false) if err != nil { return err @@ -264,7 +264,7 @@ func (db *db) RefreshViews(ctx context.Context, opts client.CollectionFetchOptio // BasicImport imports a json dataset. // filepath must be accessible to the node. -func (db *db) BasicImport(ctx context.Context, filepath string) error { +func (db *DB) BasicImport(ctx context.Context, filepath string) error { ctx, txn, err := ensureContextTxn(ctx, db, false) if err != nil { return err @@ -280,7 +280,7 @@ func (db *db) BasicImport(ctx context.Context, filepath string) error { } // BasicExport exports the current data or subset of data to file in json format. -func (db *db) BasicExport(ctx context.Context, config *client.BackupConfig) error { +func (db *DB) BasicExport(ctx context.Context, config *client.BackupConfig) error { ctx, txn, err := ensureContextTxn(ctx, db, true) if err != nil { return err diff --git a/internal/db/subscriptions.go b/internal/db/subscriptions.go index 4b92b127fc..fa45f20487 100644 --- a/internal/db/subscriptions.go +++ b/internal/db/subscriptions.go @@ -23,7 +23,7 @@ import ( // handleSubscription checks for a subscription within the given request and // starts a new go routine that will return all subscription results on the returned // channel. If a subscription does not exist on the given request nil will be returned. 
-func (db *db) handleSubscription(ctx context.Context, r *request.Request) (<-chan client.GQLResult, error) { +func (db *DB) handleSubscription(ctx context.Context, r *request.Request) (<-chan client.GQLResult, error) { if len(r.Subscription) == 0 || len(r.Subscription[0].Selections) == 0 { return nil, nil // This is not a subscription request and we have nothing to do here } diff --git a/internal/db/view.go b/internal/db/view.go index 29550c8f24..55828021aa 100644 --- a/internal/db/view.go +++ b/internal/db/view.go @@ -29,7 +29,7 @@ import ( "github.com/sourcenetwork/defradb/internal/planner" ) -func (db *db) addView( +func (db *DB) addView( ctx context.Context, inputQuery string, sdl string, @@ -96,7 +96,7 @@ func (db *db) addView( return returnDescriptions, nil } -func (db *db) refreshViews(ctx context.Context, opts client.CollectionFetchOptions) error { +func (db *DB) refreshViews(ctx context.Context, opts client.CollectionFetchOptions) error { // For now, we only support user-cache management of views, not all collections cols, err := db.getViews(ctx, opts) if err != nil { @@ -126,7 +126,7 @@ func (db *db) refreshViews(ctx context.Context, opts client.CollectionFetchOptio return nil } -func (db *db) getViews(ctx context.Context, opts client.CollectionFetchOptions) ([]client.CollectionDefinition, error) { +func (db *DB) getViews(ctx context.Context, opts client.CollectionFetchOptions) ([]client.CollectionDefinition, error) { cols, err := db.getCollections(ctx, opts) if err != nil { return nil, err @@ -144,7 +144,7 @@ func (db *db) getViews(ctx context.Context, opts client.CollectionFetchOptions) return views, nil } -func (db *db) buildViewCache(ctx context.Context, col client.CollectionDefinition) (err error) { +func (db *DB) buildViewCache(ctx context.Context, col client.CollectionDefinition) (err error) { txn := mustGetContextTxn(ctx) p := planner.New(ctx, identity.FromContext(ctx), db.acp, db, txn) @@ -226,7 +226,7 @@ func (db *db) buildViewCache(ctx context.Context, col client.CollectionDefinitio return nil } -func (db *db) clearViewCache(ctx context.Context, col client.CollectionDefinition) error { +func (db *DB) clearViewCache(ctx context.Context, col client.CollectionDefinition) error { txn := mustGetContextTxn(ctx) prefix := keys.NewViewCacheColPrefix(col.Description.RootID) @@ -252,7 +252,7 @@ func (db *db) clearViewCache(ctx context.Context, col client.CollectionDefinitio return q.Close() } -func (db *db) generateMaximalSelectFromCollection( +func (db *DB) generateMaximalSelectFromCollection( ctx context.Context, col client.CollectionDefinition, fieldName immutable.Option[string], diff --git a/net/client.go b/net/client.go index d5276f292b..40bdc5b1c1 100644 --- a/net/client.go +++ b/net/client.go @@ -71,3 +71,26 @@ func (s *server) pushLog(evt event.Update, pid peer.ID) (err error) { } return nil } + +// getIdentity creates a getIdentity request and sends it to another node +func (s *server) getIdentity(ctx context.Context, pid peer.ID) (getIdentityReply, error) { + client, err := s.dial(pid) // grpc dial over P2P stream + if err != nil { + return getIdentityReply{}, NewErrPushLog(err) + } + + ctx, cancel := context.WithTimeout(ctx, PushTimeout) + defer cancel() + + req := getIdentityRequest{ + PeerID: s.peer.host.ID().String(), + } + resp := getIdentityReply{} + if err := client.Invoke(ctx, serviceGetIdentityName, req, &resp); err != nil { + return getIdentityReply{}, NewErrFailedToGetIdentity( + err, + errors.NewKV("PeerID", pid), + ) + } + return resp, nil +} diff 
--git a/net/dialer_test.go b/net/dialer_test.go index 4ed8bcf68b..e866321a58 100644 --- a/net/dialer_test.go +++ b/net/dialer_test.go @@ -14,8 +14,11 @@ import ( "context" "testing" + "github.com/sourcenetwork/immutable" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + + "github.com/sourcenetwork/defradb/acp" ) func TestDial_WithConnectedPeer_NoError(t *testing.T) { @@ -26,18 +29,18 @@ func TestDial_WithConnectedPeer_NoError(t *testing.T) { ctx := context.Background() n1, err := NewPeer( ctx, - db1.Blockstore(), - db1.Encstore(), db1.Events(), + immutable.None[acp.ACP](), + db1, WithListenAddresses("/ip4/127.0.0.1/tcp/0"), ) assert.NoError(t, err) defer n1.Close() n2, err := NewPeer( ctx, - db2.Blockstore(), - db1.Encstore(), db2.Events(), + immutable.None[acp.ACP](), + db2, WithListenAddresses("/ip4/127.0.0.1/tcp/0"), ) assert.NoError(t, err) @@ -58,18 +61,18 @@ func TestDial_WithConnectedPeerAndSecondConnection_NoError(t *testing.T) { ctx := context.Background() n1, err := NewPeer( ctx, - db1.Blockstore(), - db1.Encstore(), db1.Events(), + immutable.None[acp.ACP](), + db1, WithListenAddresses("/ip4/127.0.0.1/tcp/0"), ) assert.NoError(t, err) defer n1.Close() n2, err := NewPeer( ctx, - db2.Blockstore(), - db1.Encstore(), db2.Events(), + immutable.None[acp.ACP](), + db2, WithListenAddresses("/ip4/127.0.0.1/tcp/0"), ) assert.NoError(t, err) @@ -93,18 +96,18 @@ func TestDial_WithConnectedPeerAndSecondConnectionWithConnectionShutdown_Closing ctx := context.Background() n1, err := NewPeer( ctx, - db1.Blockstore(), - db1.Encstore(), db1.Events(), + immutable.None[acp.ACP](), + db1, WithListenAddresses("/ip4/127.0.0.1/tcp/0"), ) assert.NoError(t, err) defer n1.Close() n2, err := NewPeer( ctx, - db2.Blockstore(), - db1.Encstore(), db2.Events(), + immutable.None[acp.ACP](), + db2, WithListenAddresses("/ip4/127.0.0.1/tcp/0"), ) assert.NoError(t, err) diff --git a/net/errors.go b/net/errors.go index 3a21c8e5c1..bfe61d81fa 100644 --- a/net/errors.go +++ b/net/errors.go @@ -25,6 +25,7 @@ const ( errRequestingEncryptionKeys = "failed to request encryption keys with %v" errTopicAlreadyExist = "topic with name \"%s\" already exists" errTopicDoesNotExist = "topic with name \"%s\" does not exists" + errFailedToGetIdentity = "failed to get identity" ) var ( @@ -59,3 +60,7 @@ func NewErrTopicAlreadyExist(topic string) error { func NewErrTopicDoesNotExist(topic string) error { return errors.New(fmt.Sprintf(errTopicDoesNotExist, topic)) } + +func NewErrFailedToGetIdentity(inner error, kv ...errors.KV) error { + return errors.Wrap(errFailedToGetIdentity, inner, kv...) 
+} diff --git a/net/grpc.go b/net/grpc.go index 8e526de102..48f6c160f4 100644 --- a/net/grpc.go +++ b/net/grpc.go @@ -19,29 +19,10 @@ import ( const ( grpcServiceName = "defradb.net.Service" - serviceGetDocGraphName = "/" + grpcServiceName + "/GetDocGraph" - servicePushDocGraphName = "/" + grpcServiceName + "/PushDocGraph" - serviceGetLogName = "/" + grpcServiceName + "/GetLog" - servicePushLogName = "/" + grpcServiceName + "/PushLog" - serviceGetHeadLogName = "/" + grpcServiceName + "/GetHeadLog" + servicePushLogName = "/" + grpcServiceName + "/PushLog" + serviceGetIdentityName = "/" + grpcServiceName + "/GetIdentity" ) -type getDocGraphRequest struct{} - -type getDocGraphReply struct{} - -type getHeadLogRequest struct{} - -type getHeadLogReply struct{} - -type getLogRequest struct{} - -type getLogReply struct{} - -type pushDocGraphRequest struct{} - -type pushDocGraphReply struct{} - type pushLogRequest struct { DocID string CID []byte @@ -52,17 +33,45 @@ type pushLogRequest struct { type pushLogReply struct{} +type getIdentityRequest struct { + // PeerID is the ID of the requesting peer. + // It will be used as the audience for the identity token. + PeerID string +} + +type getIdentityReply struct { + // IdentityToken is the token that can be used to authenticate the peer. + IdentityToken []byte +} + type serviceServer interface { - // GetDocGraph from this peer. - GetDocGraph(context.Context, *getDocGraphRequest) (*getDocGraphReply, error) - // PushDocGraph to this peer. - PushDocGraph(context.Context, *pushDocGraphRequest) (*pushDocGraphReply, error) - // GetLog from this peer. - GetLog(context.Context, *getLogRequest) (*getLogReply, error) - // PushLog to this peer. - PushLog(context.Context, *pushLogRequest) (*pushLogReply, error) - // GetHeadLog from this peer - GetHeadLog(context.Context, *getHeadLogRequest) (*getHeadLogReply, error) + // pushLogHandler handles a push log request to sync blocks. + pushLogHandler(context.Context, *pushLogRequest) (*pushLogReply, error) + // getIdentityHandler handles an identity request and returns the local node's identity. 
+	getIdentityHandler(context.Context, *getIdentityRequest) (*getIdentityReply, error)
+}
+
+func getIdentityHandler(
+	srv any,
+	ctx context.Context,
+	dec func(any) error,
+	interceptor grpc.UnaryServerInterceptor,
+) (any, error) {
+	in := new(getIdentityRequest)
+	if err := dec(in); err != nil {
+		return nil, err
+	}
+	if interceptor == nil {
+		return srv.(serviceServer).getIdentityHandler(ctx, in)
+	}
+	info := &grpc.UnaryServerInfo{
+		Server:     srv,
+		FullMethod: serviceGetIdentityName,
+	}
+	handler := func(ctx context.Context, req any) (any, error) {
+		return srv.(serviceServer).getIdentityHandler(ctx, req.(*getIdentityRequest))
+	}
+	return interceptor(ctx, in, info, handler)
 }
 
 func pushLogHandler(
@@ -76,14 +85,14 @@ func pushLogHandler(
 		return nil, err
 	}
 	if interceptor == nil {
-		return srv.(serviceServer).PushLog(ctx, in)
+		return srv.(serviceServer).pushLogHandler(ctx, in)
 	}
 	info := &grpc.UnaryServerInfo{
 		Server:     srv,
 		FullMethod: servicePushLogName,
 	}
 	handler := func(ctx context.Context, req any) (any, error) {
-		return srv.(serviceServer).PushLog(ctx, req.(*pushLogRequest))
+		return srv.(serviceServer).pushLogHandler(ctx, req.(*pushLogRequest))
 	}
 	return interceptor(ctx, in, info, handler)
 }
@@ -97,6 +106,10 @@ func registerServiceServer(s grpc.ServiceRegistrar, srv serviceServer) {
 			MethodName: "PushLog",
 			Handler:    pushLogHandler,
 		},
+		{
+			MethodName: "GetIdentity",
+			Handler:    getIdentityHandler,
+		},
 	},
 	Streams:  []grpc.StreamDesc{},
 	Metadata: "defradb.cbor",
diff --git a/net/peer.go b/net/peer.go
index a12f31a8b5..5f4dca1de4 100644
--- a/net/peer.go
+++ b/net/peer.go
@@ -30,8 +30,11 @@ import (
 	"github.com/multiformats/go-multiaddr"
 	"github.com/sourcenetwork/corelog"
+	"github.com/sourcenetwork/immutable"
 	"google.golang.org/grpc"
 
+	"github.com/sourcenetwork/defradb/acp"
+	"github.com/sourcenetwork/defradb/acp/identity"
 	"github.com/sourcenetwork/defradb/client"
 	"github.com/sourcenetwork/defradb/datastore"
 	"github.com/sourcenetwork/defradb/errors"
@@ -39,12 +42,23 @@ import (
 	corenet "github.com/sourcenetwork/defradb/internal/core/net"
 )
 
+// DB holds the database-related methods that are required by Peer.
+type DB interface {
+	// Blockstore returns the blockstore, within which all blocks (commits) managed by DefraDB are held.
+	Blockstore() datastore.Blockstore
+	// Encstore returns the store that contains all known encryption keys for documents and their fields.
+	Encstore() datastore.Blockstore
+	// GetCollections returns the list of collections according to the given options.
+	GetCollections(ctx context.Context, opts client.CollectionFetchOptions) ([]client.Collection, error)
+	// GetNodeIdentityToken returns an identity token for the given audience.
+	GetNodeIdentityToken(ctx context.Context, audience immutable.Option[string]) ([]byte, error)
+	// GetNodeIdentity returns the node's public raw identity.
+	GetNodeIdentity(ctx context.Context) (immutable.Option[identity.PublicRawIdentity], error)
+}
+
 // Peer is a DefraDB Peer node which exposes all the LibP2P host/peer functionality
 // to the underlying DefraDB instance.
 type Peer struct {
-	blockstore datastore.Blockstore
-	encstore   datastore.Blockstore
-
 	bus       *event.Bus
 	updateSub *event.Subscription
@@ -61,15 +75,18 @@ type Peer struct {
 	// peer DAG service
 	bserv blockservice.BlockService
 
+	acp immutable.Option[acp.ACP]
+	db  DB
+
 	bootCloser io.Closer
 }
 
 // NewPeer creates a new instance of the DefraDB server as a peer-to-peer node.
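The DB interface introduced above is the full surface Peer now requires from the database layer, which also makes it straightforward to fake when exercising Peer in isolation. Below is a minimal sketch of such a test double; the stubDB name and the external test package are assumptions for illustration, not part of this change. The NewPeer constructor that consumes the interface continues right after.

package net_test // hypothetical location for this sketch

import (
	"context"

	"github.com/sourcenetwork/immutable"

	"github.com/sourcenetwork/defradb/acp/identity"
	"github.com/sourcenetwork/defradb/client"
	"github.com/sourcenetwork/defradb/datastore"
	"github.com/sourcenetwork/defradb/net"
)

// stubDB satisfies net.DB with empty-but-valid values, enough to wire a Peer
// without a full database behind it.
type stubDB struct {
	blocks datastore.Blockstore
	enc    datastore.Blockstore
}

var _ net.DB = (*stubDB)(nil)

func (s *stubDB) Blockstore() datastore.Blockstore { return s.blocks }

func (s *stubDB) Encstore() datastore.Blockstore { return s.enc }

func (s *stubDB) GetCollections(
	ctx context.Context,
	opts client.CollectionFetchOptions,
) ([]client.Collection, error) {
	return nil, nil // no collections in this sketch
}

func (s *stubDB) GetNodeIdentityToken(
	ctx context.Context,
	audience immutable.Option[string],
) ([]byte, error) {
	return nil, nil // a real implementation signs a bearer token scoped to the audience
}

func (s *stubDB) GetNodeIdentity(
	ctx context.Context,
) (immutable.Option[identity.PublicRawIdentity], error) {
	return immutable.None[identity.PublicRawIdentity](), nil
}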
func NewPeer( ctx context.Context, - blockstore datastore.Blockstore, - encstore datastore.Blockstore, bus *event.Bus, + acp immutable.Option[acp.ACP], + db DB, opts ...NodeOpt, ) (p *Peer, err error) { ctx, cancel := context.WithCancel(ctx) @@ -81,7 +98,7 @@ func NewPeer( } }() - if blockstore == nil || encstore == nil { + if db == nil { return nil, ErrNilDB } @@ -111,19 +128,15 @@ func NewPeer( corelog.Any("Address", options.ListenAddresses), ) - bswapnet := network.NewFromIpfsHost(h) - bswap := bitswap.New(ctx, bswapnet, ddht, blockstore) - p = &Peer{ - host: h, - dht: ddht, - blockstore: blockstore, - encstore: encstore, - ctx: ctx, - cancel: cancel, - bus: bus, - p2pRPC: grpc.NewServer(options.GRPCServerOptions...), - bserv: blockservice.New(blockstore, bswap), + host: h, + dht: ddht, + ctx: ctx, + cancel: cancel, + bus: bus, + acp: acp, + db: db, + p2pRPC: grpc.NewServer(options.GRPCServerOptions...), } if options.EnablePubSub { @@ -149,6 +162,10 @@ func NewPeer( return nil, err } + bswapnet := network.NewFromIpfsHost(h) + bswap := bitswap.New(ctx, bswapnet, ddht, db.Blockstore(), bitswap.WithPeerBlockRequestFilter(p.server.hasAccess)) + p.bserv = blockservice.New(db.Blockstore(), bswap) + p2pListener, err := gostream.Listen(h, corenet.Protocol) if err != nil { return nil, err diff --git a/net/peer_test.go b/net/peer_test.go index 40249192ea..01b4f93af0 100644 --- a/net/peer_test.go +++ b/net/peer_test.go @@ -79,9 +79,9 @@ func newTestPeer(ctx context.Context, t *testing.T) (client.DB, *Peer) { n, err := NewPeer( ctx, - db.Blockstore(), - db.Encstore(), db.Events(), + immutable.None[acp.ACP](), + db, WithListenAddresses(randomMultiaddr), ) require.NoError(t, err) @@ -95,14 +95,14 @@ func TestNewPeer_NoError(t *testing.T) { db, err := db.NewDB(ctx, store, acp.NoACP, nil) require.NoError(t, err) defer db.Close() - p, err := NewPeer(ctx, db.Blockstore(), db.Encstore(), db.Events()) + p, err := NewPeer(ctx, db.Events(), immutable.None[acp.ACP](), db) require.NoError(t, err) p.Close() } func TestNewPeer_NoDB_NilDBError(t *testing.T) { ctx := context.Background() - _, err := NewPeer(ctx, nil, nil, nil, nil) + _, err := NewPeer(ctx, nil, immutable.None[acp.ACP](), nil) require.ErrorIs(t, err, ErrNilDB) } @@ -120,18 +120,18 @@ func TestStart_WithKnownPeer_NoError(t *testing.T) { n1, err := NewPeer( ctx, - db1.Blockstore(), - db1.Encstore(), db1.Events(), + immutable.None[acp.ACP](), + db1, WithListenAddresses("/ip4/127.0.0.1/tcp/0"), ) require.NoError(t, err) defer n1.Close() n2, err := NewPeer( ctx, - db2.Blockstore(), - db1.Encstore(), db2.Events(), + immutable.None[acp.ACP](), + db2, WithListenAddresses("/ip4/127.0.0.1/tcp/0"), ) require.NoError(t, err) @@ -249,13 +249,12 @@ func TestHandleLog_WithExistingSchemaTopic_TopicExistsError(t *testing.T) { require.ErrorContains(t, err, "topic already exists") } -func FixtureNewMemoryDBWithBroadcaster(t *testing.T) client.DB { - var database client.DB +func FixtureNewMemoryDBWithBroadcaster(t *testing.T) *db.DB { ctx := context.Background() opts := badgerds.Options{Options: badger.DefaultOptions("").WithInMemory(true)} rootstore, err := badgerds.NewDatastore("", &opts) require.NoError(t, err) - database, err = db.NewDB(ctx, rootstore, acp.NoACP, nil) + database, err := db.NewDB(ctx, rootstore, acp.NoACP, nil) require.NoError(t, err) return database } @@ -268,9 +267,9 @@ func TestNewPeer_WithEnableRelay_NoError(t *testing.T) { defer db.Close() n, err := NewPeer( context.Background(), - db.Blockstore(), - db.Encstore(), db.Events(), + 
immutable.None[acp.ACP](), + db, WithEnableRelay(true), ) require.NoError(t, err) @@ -286,9 +285,9 @@ func TestNewPeer_NoPubSub_NoError(t *testing.T) { n, err := NewPeer( context.Background(), - db.Blockstore(), - db.Encstore(), db.Events(), + immutable.None[acp.ACP](), + db, WithEnablePubSub(false), ) require.NoError(t, err) @@ -305,9 +304,9 @@ func TestNewPeer_WithEnablePubSub_NoError(t *testing.T) { n, err := NewPeer( ctx, - db.Blockstore(), - db.Encstore(), db.Events(), + immutable.None[acp.ACP](), + db, WithEnablePubSub(true), ) @@ -325,9 +324,9 @@ func TestNodeClose_NoError(t *testing.T) { defer db.Close() n, err := NewPeer( context.Background(), - db.Blockstore(), - db.Encstore(), db.Events(), + immutable.None[acp.ACP](), + db, ) require.NoError(t, err) n.Close() @@ -342,9 +341,9 @@ func TestListenAddrs_WithListenAddresses_NoError(t *testing.T) { n, err := NewPeer( context.Background(), - db.Blockstore(), - db.Encstore(), db.Events(), + immutable.None[acp.ACP](), + db, WithListenAddresses("/ip4/127.0.0.1/tcp/0"), ) require.NoError(t, err) @@ -361,9 +360,9 @@ func TestPeer_WithBootstrapPeers_NoError(t *testing.T) { n, err := NewPeer( context.Background(), - db.Blockstore(), - db.Encstore(), db.Events(), + immutable.None[acp.ACP](), + db, WithBootstrapPeers("/ip4/127.0.0.1/tcp/6666/p2p/QmaCpDMGvV2BGHeYERUEnRQAwe3N8SzbUtfsmvsqQLuvuJ"), ) require.NoError(t, err) diff --git a/net/server.go b/net/server.go index 0be9def0ce..ceeea53ce7 100644 --- a/net/server.go +++ b/net/server.go @@ -23,14 +23,18 @@ import ( "github.com/libp2p/go-libp2p/core/peerstore" "github.com/sourcenetwork/corelog" rpc "github.com/sourcenetwork/go-libp2p-pubsub-rpc" + "github.com/sourcenetwork/immutable" "google.golang.org/grpc" "google.golang.org/grpc/credentials/insecure" grpcpeer "google.golang.org/grpc/peer" + "github.com/sourcenetwork/defradb/acp" + "github.com/sourcenetwork/defradb/acp/identity" "github.com/sourcenetwork/defradb/client" "github.com/sourcenetwork/defradb/errors" "github.com/sourcenetwork/defradb/event" coreblock "github.com/sourcenetwork/defradb/internal/core/block" + "github.com/sourcenetwork/defradb/internal/db/permission" ) // server is the request/response instance for all P2P RPC communication. @@ -48,6 +52,9 @@ type server struct { mu sync.Mutex conns map[libpeer.ID]*grpc.ClientConn + + peerIdentities map[libpeer.ID]identity.Identity + piMux sync.RWMutex } // pubsubTopic is a wrapper of rpc.Topic to be able to track if the topic has @@ -61,10 +68,11 @@ type pubsubTopic struct { // underlying DB instance. 
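The peerIdentities map and its piMux guard added to server above form a small per-peer cache of verified identities, so the bitswap access filter does not repeat the identity exchange on every block request. In this change the locking is inlined in hasAccess (shown further below); factored out, with hypothetical helper names, the same read-through pattern would look roughly like this:

// getCachedIdentity and setCachedIdentity are hypothetical helpers sketching
// the locking discipline around s.peerIdentities; the actual change keeps
// this logic inline in hasAccess.
func (s *server) getCachedIdentity(pid libpeer.ID) (identity.Identity, bool) {
	s.piMux.RLock()
	defer s.piMux.RUnlock()
	ident, ok := s.peerIdentities[pid]
	return ident, ok
}

func (s *server) setCachedIdentity(pid libpeer.ID, ident identity.Identity) {
	s.piMux.Lock()
	defer s.piMux.Unlock()
	s.peerIdentities[pid] = ident
}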
func newServer(p *Peer, opts ...grpc.DialOption) (*server, error) { s := &server{ - peer: p, - conns: make(map[libpeer.ID]*grpc.ClientConn), - topics: make(map[string]pubsubTopic), - replicators: make(map[string]map[libpeer.ID]struct{}), + peer: p, + conns: make(map[libpeer.ID]*grpc.ClientConn), + topics: make(map[string]pubsubTopic), + replicators: make(map[string]map[libpeer.ID]struct{}), + peerIdentities: make(map[libpeer.ID]identity.Identity), } cred := insecure.NewCredentials() @@ -79,29 +87,17 @@ func newServer(p *Peer, opts ...grpc.DialOption) (*server, error) { return s, nil } -// GetDocGraph receives a get graph request -func (s *server) GetDocGraph( - ctx context.Context, - req *getDocGraphRequest, -) (*getDocGraphReply, error) { - return nil, nil +// pushLogHandler receives a push log request from the grpc server (replicator) +func (s *server) pushLogHandler(ctx context.Context, req *pushLogRequest) (*pushLogReply, error) { + return s.processPushlog(ctx, req, true) } -// PushDocGraph receives a push graph request -func (s *server) PushDocGraph( +// processPushlog processes a push log request +func (s *server) processPushlog( ctx context.Context, - req *pushDocGraphRequest, -) (*pushDocGraphReply, error) { - return nil, nil -} - -// GetLog receives a get log request -func (s *server) GetLog(ctx context.Context, req *getLogRequest) (*getLogReply, error) { - return nil, nil -} - -// PushLog receives a push log request -func (s *server) PushLog(ctx context.Context, req *pushLogRequest) (*pushLogReply, error) { + req *pushLogRequest, + isReplicator bool, +) (*pushLogReply, error) { pid, err := peerIDFromContext(ctx) if err != nil { return nil, err @@ -126,15 +122,24 @@ func (s *server) PushLog(ctx context.Context, req *pushLogRequest) (*pushLogRepl return nil, err } + // No need to check access if the message is for replication as the node sending + // will have done so deliberately. + if !isReplicator { + mightHaveAccess, err := s.trySelfHasAccess(block, req.SchemaRoot) + if err != nil { + return nil, err + } + if !mightHaveAccess { + // If we know we don't have access, we can skip the rest of the processing. + return &pushLogReply{}, nil + } + } + log.InfoContext(ctx, "Received pushlog", corelog.Any("PeerID", pid.String()), corelog.Any("Creator", byPeer.String()), corelog.Any("DocID", req.DocID)) - log.InfoContext(ctx, "Starting DAG sync", - corelog.Any("PeerID", pid.String()), - corelog.Any("DocID", req.DocID)) - err = syncDAG(ctx, s.peer.bserv, block) if err != nil { return nil, err @@ -164,12 +169,20 @@ func (s *server) PushLog(ctx context.Context, req *pushLogRequest) (*pushLogRepl return &pushLogReply{}, nil } -// GetHeadLog receives a get head log request -func (s *server) GetHeadLog( +// getIdentityHandler receives a get identity request and returns the identity token +// with the requesting peer as the audience. 
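The GetIdentity RPC has two halves. The server half follows below: it mints a token with the requesting peer as the audience. The client half (referred to as s.getIdentity in hasAccess further down) is not part of this diff; a hedged sketch of what it might look like, assuming a dialPeer helper that returns the cached *grpc.ClientConn for a peer:

// getIdentitySketch is a hypothetical client-side counterpart to the server
// handler below; the real helper is not shown in this diff.
func (s *server) getIdentitySketch(ctx context.Context, pid libpeer.ID) (*getIdentityReply, error) {
	// dialPeer is assumed: it returns (and caches in s.conns) a grpc client
	// connection tunnelled over a libp2p stream to pid.
	conn, err := s.dialPeer(ctx, pid)
	if err != nil {
		return nil, err
	}
	// Send our own peer ID so the remote node can use it as the token audience.
	req := &getIdentityRequest{PeerID: s.peer.PeerID().String()}
	resp := &getIdentityReply{}
	if err := conn.Invoke(ctx, serviceGetIdentityName, req, resp); err != nil {
		return nil, NewErrFailedToGetIdentity(err)
	}
	return resp, nil
}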
+func (s *server) getIdentityHandler( ctx context.Context, - req *getHeadLogRequest, -) (*getHeadLogReply, error) { - return nil, nil + req *getIdentityRequest, +) (*getIdentityReply, error) { + if !s.peer.acp.HasValue() { + return &getIdentityReply{}, nil + } + token, err := s.peer.db.GetNodeIdentityToken(ctx, immutable.Some(req.PeerID)) + if err != nil { + return nil, err + } + return &getIdentityReply{IdentityToken: token}, nil } // addPubSubTopic subscribes to a topic on the pubsub network @@ -327,7 +340,7 @@ func (s *server) pubSubMessageHandler(from libpeer.ID, topic string, msg []byte) ctx := grpcpeer.NewContext(s.peer.ctx, &grpcpeer.Peer{ Addr: addr{from}, }) - if _, err := s.PushLog(ctx, req); err != nil { + if _, err := s.processPushlog(ctx, req, false); err != nil { return nil, errors.Wrap(fmt.Sprintf("Failed pushing log for doc %s", topic), err) } return nil, nil @@ -447,3 +460,138 @@ func (s *server) SendPubSubMessage( } return t.Publish(ctx, data) } + +// hasAccess checks if the requesting peer has access to the given cid. +// +// This is used as a filter in bitswap to determine if we should send the block to the requesting peer. +func (s *server) hasAccess(p libpeer.ID, c cid.Cid) bool { + if !s.peer.acp.HasValue() { + return true + } + + rawblock, err := s.peer.db.Blockstore().Get(s.peer.ctx, c) + if err != nil { + log.ErrorE("Failed to get block", err) + return false + } + block, err := coreblock.GetFromBytes(rawblock.RawData()) + if err != nil { + log.ErrorE("Failed to get doc from block", err) + return false + } + + cols, err := s.peer.db.GetCollections( + s.peer.ctx, + client.CollectionFetchOptions{ + SchemaVersionID: immutable.Some(block.Delta.GetSchemaVersionID()), + }, + ) + if err != nil { + log.ErrorE("Failed to get collections", err) + return false + } + if len(cols) == 0 { + log.Info("No collections found", corelog.Any("Schema Version ID", block.Delta.GetSchemaVersionID())) + return false + } + + // If the requesting peer is in the replicators list for that collection, then they have access. + s.mu.Lock() + if peerList, ok := s.replicators[cols[0].SchemaRoot()]; ok { + _, exists := peerList[p] + if exists { + s.mu.Unlock() + return true + } + } + s.mu.Unlock() + + identFunc := func() immutable.Option[identity.Identity] { + s.piMux.RLock() + ident, ok := s.peerIdentities[p] + s.piMux.RUnlock() + if !ok { + resp, err := s.getIdentity(s.peer.ctx, p) + if err != nil { + log.ErrorE("Failed to get identity", err) + return immutable.None[identity.Identity]() + } + ident, err = identity.FromToken(resp.IdentityToken) + if err != nil { + log.ErrorE("Failed to parse identity token", err) + return immutable.None[identity.Identity]() + } + err = identity.VerifyAuthToken(ident, s.peer.PeerID().String()) + if err != nil { + log.ErrorE("Failed to verify auth token", err) + return immutable.None[identity.Identity]() + } + s.piMux.Lock() + s.peerIdentities[p] = ident + s.piMux.Unlock() + } + return immutable.Some(ident) + } + + peerHasAccess, err := permission.CheckDocAccessWithIdentityFunc( + s.peer.ctx, + identFunc, + s.peer.acp.Value(), + cols[0], // For now we assume there is only one collection. + acp.ReadPermission, + string(block.Delta.GetDocID()), + ) + if err != nil { + log.ErrorE("Failed to check access", err) + return false + } + + return peerHasAccess +} + +// trySelfHasAccess checks if the local node has access to the given block. 
+// +// This is a best-effort check and returns true unless we explicitly find that the local node +// doesn't have access or if we get an error. The node sending is ultimately responsible for +// ensuring that the recipient has access. +func (s *server) trySelfHasAccess(block *coreblock.Block, schemaRoot string) (bool, error) { + if !s.peer.acp.HasValue() { + return true, nil + } + + cols, err := s.peer.db.GetCollections( + s.peer.ctx, + client.CollectionFetchOptions{ + SchemaRoot: immutable.Some(schemaRoot), + }, + ) + if err != nil { + return false, err + } + if len(cols) == 0 { + return false, client.ErrCollectionNotFound + } + ident, err := s.peer.db.GetNodeIdentity(s.peer.ctx) + if err != nil { + return false, err + } + if !ident.HasValue() { + return true, nil + } + + peerHasAccess, err := permission.CheckDocAccessWithIdentityFunc( + s.peer.ctx, + func() immutable.Option[identity.Identity] { + return immutable.Some(identity.Identity{DID: ident.Value().DID}) + }, + s.peer.acp.Value(), + cols[0], // For now we assume there is only one collection. + acp.ReadPermission, + string(block.Delta.GetDocID()), + ) + if err != nil { + return false, err + } + + return peerHasAccess, nil +} diff --git a/net/server_test.go b/net/server_test.go index a29952d2b8..49b8ae1e91 100644 --- a/net/server_test.go +++ b/net/server_test.go @@ -34,46 +34,6 @@ func TestNewServerSimple(t *testing.T) { require.NoError(t, err) } -func TestGetDocGraph(t *testing.T) { - ctx := context.Background() - db, p := newTestPeer(ctx, t) - defer db.Close() - defer p.Close() - r, err := p.server.GetDocGraph(ctx, &getDocGraphRequest{}) - require.Nil(t, r) - require.Nil(t, err) -} - -func TestPushDocGraph(t *testing.T) { - ctx := context.Background() - db, p := newTestPeer(ctx, t) - defer db.Close() - defer p.Close() - r, err := p.server.PushDocGraph(ctx, &pushDocGraphRequest{}) - require.Nil(t, r) - require.Nil(t, err) -} - -func TestGetLog(t *testing.T) { - ctx := context.Background() - db, p := newTestPeer(ctx, t) - defer db.Close() - defer p.Close() - r, err := p.server.GetLog(ctx, &getLogRequest{}) - require.Nil(t, r) - require.Nil(t, err) -} - -func TestGetHeadLog(t *testing.T) { - ctx := context.Background() - db, p := newTestPeer(ctx, t) - defer db.Close() - defer p.Close() - r, err := p.server.GetHeadLog(ctx, &getHeadLogRequest{}) - require.Nil(t, r) - require.Nil(t, err) -} - func getHead(ctx context.Context, db client.DB, docID client.DocID) (cid.Cid, error) { prefix := keys.DataStoreKeyFromDocID(docID).ToHeadStoreKey().WithFieldID(core.COMPOSITE_NAMESPACE).ToString() results, err := db.Headstore().Query(ctx, query.Query{Prefix: prefix}) @@ -126,7 +86,7 @@ func TestPushLog(t *testing.T) { b, err := db.Blockstore().AsIPLDStorage().Get(ctx, headCID.KeyString()) require.NoError(t, err) - _, err = p.server.PushLog(ctx, &pushLogRequest{ + _, err = p.server.pushLogHandler(ctx, &pushLogRequest{ DocID: doc.ID().String(), CID: headCID.Bytes(), SchemaRoot: col.SchemaRoot(), diff --git a/net/sync_dag.go b/net/sync_dag.go index 11e021f239..76a834cbcb 100644 --- a/net/sync_dag.go +++ b/net/sync_dag.go @@ -23,9 +23,9 @@ import ( coreblock "github.com/sourcenetwork/defradb/internal/core/block" ) -// syncDAGTimeout is the maximum amount of time -// to wait for a dag to be fetched. -var syncDAGTimeout = 60 * time.Second +// syncBlockLinkTimeout is the maximum amount of time +// to wait for a block link to be fetched. 
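One consequence of the timeout change declared just below: syncDAG no longer has a 60-second ceiling for the whole DAG, only each individual link load is bounded. A caller that still wants an overall limit can impose its own deadline; a sketch (the wrapper name and the two-minute figure are illustrative only):

// syncDAGWithDeadline is a hypothetical wrapper showing how a caller can keep
// an overall bound now that loadBlockLinks only applies syncBlockLinkTimeout
// to each individual link fetch.
func (s *server) syncDAGWithDeadline(ctx context.Context, block *coreblock.Block) error {
	ctx, cancel := context.WithTimeout(ctx, 2*time.Minute) // illustrative value
	defer cancel()
	return syncDAG(ctx, s.peer.bserv, block)
}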
+var syncBlockLinkTimeout = 5 * time.Second // syncDAG synchronizes the DAG starting with the given block // using the blockservice to fetch remote blocks. @@ -60,9 +60,8 @@ func syncDAG(ctx context.Context, bserv blockservice.BlockService, block *corebl // If it encounters errors in the concurrent loading of links, it will return // the first error it encountered. func loadBlockLinks(ctx context.Context, lsys linking.LinkSystem, block *coreblock.Block) error { - ctx, cancel := context.WithTimeout(ctx, syncDAGTimeout) + ctxWithCancel, cancel := context.WithCancel(ctx) defer cancel() - var wg sync.WaitGroup var asyncErr error var asyncErrOnce sync.Once @@ -76,10 +75,12 @@ func loadBlockLinks(ctx context.Context, lsys linking.LinkSystem, block *coreblo wg.Add(1) go func(lnk cidlink.Link) { defer wg.Done() - if ctx.Err() != nil { + if ctxWithCancel.Err() != nil { return } - nd, err := lsys.Load(linking.LinkContext{Ctx: ctx}, lnk, coreblock.SchemaPrototype) + ctxWithTimeout, cancel := context.WithTimeout(ctx, syncBlockLinkTimeout) + defer cancel() + nd, err := lsys.Load(linking.LinkContext{Ctx: ctxWithTimeout}, lnk, coreblock.SchemaPrototype) if err != nil { asyncErrOnce.Do(func() { setAsyncErr(err) }) return diff --git a/node/node.go b/node/node.go index 660f6d4174..16c5fc3658 100644 --- a/node/node.go +++ b/node/node.go @@ -149,19 +149,26 @@ func (n *Node) Start(ctx context.Context) error { return err } - n.DB, err = db.NewDB(ctx, rootstore, n.acp, lens, n.dbOpts...) + coreDB, err := db.NewDB(ctx, rootstore, n.acp, lens, n.dbOpts...) if err != nil { return err } + n.DB = coreDB if !n.options.disableP2P { // setup net node - n.Peer, err = net.NewPeer(ctx, n.DB.Blockstore(), n.DB.Encstore(), n.DB.Events(), n.netOpts...) + n.Peer, err = net.NewPeer( + ctx, + coreDB.Events(), + n.acp, + coreDB, + n.netOpts..., + ) if err != nil { return err } - ident, err := n.DB.GetNodeIdentity(ctx) + ident, err := coreDB.GetNodeIdentity(ctx) if err != nil { return err } @@ -173,10 +180,10 @@ func (n *Node) Start(ctx context.Context) error { ctx, n.Peer.PeerID(), n.Peer.Server(), - n.DB.Events(), - n.DB.Encstore(), + coreDB.Events(), + coreDB.Encstore(), n.acp, - db.NewCollectionRetriever(n.DB), + db.NewCollectionRetriever(coreDB), ident.Value().DID, ) } @@ -188,7 +195,7 @@ func (n *Node) Start(ctx context.Context) error { if !n.options.disableAPI { // setup http server - handler, err := http.NewHandler(n.DB) + handler, err := http.NewHandler(coreDB) if err != nil { return err } diff --git a/tests/integration/acp.go b/tests/integration/acp.go index 5983ad228a..438cd07f23 100644 --- a/tests/integration/acp.go +++ b/tests/integration/acp.go @@ -219,7 +219,7 @@ func addDocActorRelationshipACP( docID: {}, } - waitForUpdateEvents(s, actionNodeID, action.CollectionID, expect) + waitForUpdateEvents(s, actionNodeID, action.CollectionID, expect, action.TargetIdentity) } } @@ -471,7 +471,7 @@ func setupSourceHub(s *state) ([]node.ACPOpt, error) { // // We need to lock before getting the ports, otherwise they may try and use the port we use for locking. // We can only unlock after the source hub node has started and begun listening on the assigned ports. 
- unlock, err := crossLock(55555) + unlock, err := crossLock(44444) if err != nil { return nil, err } diff --git a/tests/integration/acp/p2p/create_test.go b/tests/integration/acp/p2p/create_test.go index db3d5a4508..b4561962c3 100644 --- a/tests/integration/acp/p2p/create_test.go +++ b/tests/integration/acp/p2p/create_test.go @@ -130,3 +130,147 @@ func TestACP_P2PCreatePrivateDocumentsOnDifferentNodes_SourceHubACP(t *testing.T testUtils.ExecuteTestCase(t, test) } + +func TestACP_P2PCreatePrivateDocumentAndSyncAfterAddingRelationship_SourceHubACP(t *testing.T) { + expectedPolicyID := "fc56b7509c20ac8ce682b3b9b4fdaad868a9c70dda6ec16720298be64f16e9a4" + + test := testUtils.TestCase{ + Description: "Test acp, p2p create a private documents and sync after adding actor relationship, with source-hub", + SupportedACPTypes: immutable.Some( + []testUtils.ACPType{ + testUtils.SourceHubACPType, + }, + ), + Actions: []any{ + testUtils.RandomNetworkingConfig(), + testUtils.RandomNetworkingConfig(), + + testUtils.ConnectPeers{ + SourceNodeID: 1, + TargetNodeID: 0, + }, + + testUtils.AddPolicy{ + + Identity: testUtils.ClientIdentity(1), + + Policy: ` + name: Test Policy + + description: A Policy + + actor: + name: actor + + resources: + users: + permissions: + read: + expr: owner + reader + writer + + write: + expr: owner + writer + + nothing: + expr: dummy + + relations: + owner: + types: + - actor + + reader: + types: + - actor + + writer: + types: + - actor + + admin: + manages: + - reader + types: + - actor + + dummy: + types: + - actor + `, + + ExpectedPolicyID: expectedPolicyID, + }, + + testUtils.SchemaUpdate{ + Schema: fmt.Sprintf(` + type Users @policy( + id: "%s", + resource: "users" + ) { + name: String + age: Int + } + `, + expectedPolicyID, + ), + }, + + testUtils.SubscribeToCollection{ + NodeID: 1, + CollectionIDs: []int{0}, + }, + + testUtils.CreateDoc{ + Identity: testUtils.ClientIdentity(1), + NodeID: immutable.Some(0), + CollectionID: 0, + DocMap: map[string]any{ + "name": "Shahzad", + }, + }, + + // At this point the document is only accessible to the owner so node 1 + // should not have been able to sync the document. + testUtils.Request{ + Identity: testUtils.ClientIdentity(1), + NodeID: immutable.Some(1), + Request: `query { + Users{ + name + } + }`, + Results: map[string]any{ + "Users": []map[string]any{}, + }, + }, + + testUtils.AddDocActorRelationship{ + RequestorIdentity: testUtils.ClientIdentity(1), + TargetIdentity: testUtils.NodeIdentity(1), + CollectionID: 0, + DocID: 0, + Relation: "reader", + ExpectedExistence: false, + }, + + testUtils.WaitForSync{}, + + testUtils.Request{ + Identity: testUtils.ClientIdentity(1), + NodeID: immutable.Some(1), + Request: `query { + Users { + name + } + }`, + Results: map[string]any{ + "Users": []map[string]any{ + {"name": "Shahzad"}, + }, + }, + }, + }, + } + + testUtils.ExecuteTestCase(t, test) +} diff --git a/tests/integration/acp/p2p/subscribe_test.go b/tests/integration/acp/p2p/subscribe_test.go index e776ae4fb2..378f9a1887 100644 --- a/tests/integration/acp/p2p/subscribe_test.go +++ b/tests/integration/acp/p2p/subscribe_test.go @@ -168,9 +168,10 @@ func TestACP_P2PSubscribeAddGetSingleWithPermissionedCollection_SourceHubACP(t * "name": "John", }, }, - testUtils.WaitForSync{}, testUtils.Request{ - // Ensure that the document is accessible on all nodes to authorized actors + // The document will only be accessible on node 0 since node 1 is not authorized to + // access the document. 
+ NodeID: immutable.Some(0), Identity: testUtils.ClientIdentity(1), Request: ` query { @@ -187,6 +188,22 @@ func TestACP_P2PSubscribeAddGetSingleWithPermissionedCollection_SourceHubACP(t * }, }, }, + testUtils.Request{ + // Since node 1 is not authorized to access the document, it won't have to document + // so even if requesting with an authorized identity, the document won't be returned. + NodeID: immutable.Some(1), + Identity: testUtils.ClientIdentity(1), + Request: ` + query { + Users { + name + } + } + `, + Results: map[string]any{ + "Users": []map[string]any{}, + }, + }, testUtils.Request{ // Ensure that the document is hidden on all nodes to unidentified actors Request: ` diff --git a/tests/integration/acp/p2p/subscribe_with_doc_actor_relationship_test.go b/tests/integration/acp/p2p/subscribe_with_doc_actor_relationship_test.go index 52038b8d5b..3f2ea6b901 100644 --- a/tests/integration/acp/p2p/subscribe_with_doc_actor_relationship_test.go +++ b/tests/integration/acp/p2p/subscribe_with_doc_actor_relationship_test.go @@ -104,34 +104,38 @@ func TestACP_P2PSubscribeAddGetSingleWithPermissionedCollectionCreateDocActorRel testUtils.ConnectPeers{ SourceNodeID: 1, - TargetNodeID: 0, }, testUtils.SubscribeToCollection{ - NodeID: 1, - + NodeID: 1, CollectionIDs: []int{0}, }, testUtils.CreateDoc{ - Identity: testUtils.ClientIdentity(1), - - NodeID: immutable.Some(0), - + Identity: testUtils.ClientIdentity(1), + NodeID: immutable.Some(0), CollectionID: 0, - DocMap: map[string]any{ "name": "Shahzad", }, }, + testUtils.AddDocActorRelationship{ + NodeID: immutable.Some(0), + RequestorIdentity: testUtils.ClientIdentity(1), + TargetIdentity: testUtils.NodeIdentity(1), + CollectionID: 0, + DocID: 0, + Relation: "reader", + ExpectedExistence: false, + }, + testUtils.WaitForSync{}, testUtils.Request{ // Ensure that the document is hidden on all nodes to an unauthorized actor Identity: testUtils.ClientIdentity(2), - Request: ` query { Users { @@ -139,48 +143,34 @@ func TestACP_P2PSubscribeAddGetSingleWithPermissionedCollectionCreateDocActorRel } } `, - Results: map[string]any{ "Users": []map[string]any{}, }, }, testUtils.AddDocActorRelationship{ - NodeID: immutable.Some(0), - + NodeID: immutable.Some(0), RequestorIdentity: testUtils.ClientIdentity(1), - - TargetIdentity: testUtils.ClientIdentity(2), - - CollectionID: 0, - - DocID: 0, - - Relation: "reader", - + TargetIdentity: testUtils.ClientIdentity(2), + CollectionID: 0, + DocID: 0, + Relation: "reader", ExpectedExistence: false, }, testUtils.AddDocActorRelationship{ - NodeID: immutable.Some(1), // Note: Different node than the previous - + NodeID: immutable.Some(1), // Note: Different node than the previous RequestorIdentity: testUtils.ClientIdentity(1), - - TargetIdentity: testUtils.ClientIdentity(2), - - CollectionID: 0, - - DocID: 0, - - Relation: "reader", - + TargetIdentity: testUtils.ClientIdentity(2), + CollectionID: 0, + DocID: 0, + Relation: "reader", ExpectedExistence: true, // Making the same relation through any node should be a no-op }, testUtils.Request{ // Ensure that the document is now accessible on all nodes to the newly authorized actor. 
Identity: testUtils.ClientIdentity(2), - Request: ` query { Users { @@ -188,7 +178,6 @@ func TestACP_P2PSubscribeAddGetSingleWithPermissionedCollectionCreateDocActorRel } } `, - Results: map[string]any{ "Users": []map[string]any{ { @@ -201,7 +190,6 @@ func TestACP_P2PSubscribeAddGetSingleWithPermissionedCollectionCreateDocActorRel testUtils.Request{ // Ensure that the document is still accessible on all nodes to the owner. Identity: testUtils.ClientIdentity(1), - Request: ` query { Users { @@ -209,7 +197,6 @@ func TestACP_P2PSubscribeAddGetSingleWithPermissionedCollectionCreateDocActorRel } } `, - Results: map[string]any{ "Users": []map[string]any{ { @@ -220,41 +207,28 @@ func TestACP_P2PSubscribeAddGetSingleWithPermissionedCollectionCreateDocActorRel }, testUtils.DeleteDocActorRelationship{ - NodeID: immutable.Some(1), - - RequestorIdentity: testUtils.ClientIdentity(1), - - TargetIdentity: testUtils.ClientIdentity(2), - - CollectionID: 0, - - DocID: 0, - - Relation: "reader", - + NodeID: immutable.Some(1), + RequestorIdentity: testUtils.ClientIdentity(1), + TargetIdentity: testUtils.ClientIdentity(2), + CollectionID: 0, + DocID: 0, + Relation: "reader", ExpectedRecordFound: true, }, testUtils.DeleteDocActorRelationship{ - NodeID: immutable.Some(0), // Note: Different node than the previous - - RequestorIdentity: testUtils.ClientIdentity(1), - - TargetIdentity: testUtils.ClientIdentity(2), - - CollectionID: 0, - - DocID: 0, - - Relation: "reader", - + NodeID: immutable.Some(0), // Note: Different node than the previous + RequestorIdentity: testUtils.ClientIdentity(1), + TargetIdentity: testUtils.ClientIdentity(2), + CollectionID: 0, + DocID: 0, + Relation: "reader", ExpectedRecordFound: false, // Making the same relation through any node should be a no-op }, testUtils.Request{ // Ensure that the document is now inaccessible on all nodes to the actor we revoked access from. Identity: testUtils.ClientIdentity(2), - Request: ` query { Users { @@ -271,7 +245,6 @@ func TestACP_P2PSubscribeAddGetSingleWithPermissionedCollectionCreateDocActorRel testUtils.Request{ // Ensure that the document is still accessible on all nodes to the owner. 
Identity: testUtils.ClientIdentity(1), - Request: ` query { Users { @@ -279,7 +252,6 @@ func TestACP_P2PSubscribeAddGetSingleWithPermissionedCollectionCreateDocActorRel } } `, - Results: map[string]any{ "Users": []map[string]any{ { diff --git a/tests/integration/encryption/peer_acp_test.go b/tests/integration/encryption/peer_acp_test.go index bb6705c626..9b0fd6f132 100644 --- a/tests/integration/encryption/peer_acp_test.go +++ b/tests/integration/encryption/peer_acp_test.go @@ -433,7 +433,6 @@ func TestDocEncryptionACP_IfClientNodeHasDocPermissionButServerNodeIsNotAvailabl `, IsDocEncrypted: true, }, - testUtils.WaitForSync{}, testUtils.Close{ NodeID: immutable.Some(0), }, diff --git a/tests/integration/events.go b/tests/integration/events.go index 12fc58f8b7..658945c757 100644 --- a/tests/integration/events.go +++ b/tests/integration/events.go @@ -12,6 +12,7 @@ package tests import ( "encoding/json" + "strconv" "time" "github.com/sourcenetwork/immutable" @@ -155,6 +156,7 @@ func waitForUpdateEvents( nodeID immutable.Option[int], collectionIndex int, docIDs map[string]struct{}, + ident immutable.Option[identity], ) { for i := 0; i < len(s.nodes); i++ { if nodeID.HasValue() && nodeID.Value() != i { @@ -197,7 +199,7 @@ func waitForUpdateEvents( // we only need to update the network state if the nodes // are configured for networking if s.isNetworkEnabled { - updateNetworkState(s, i, evt) + updateNetworkState(s, i, evt, ident) } } } @@ -271,7 +273,7 @@ func waitForMergeEvents(s *state, action WaitForSync) { // updateNetworkState updates the network state by checking which // nodes should receive the updated document in the given update event. -func updateNetworkState(s *state, nodeID int, evt event.Update) { +func updateNetworkState(s *state, nodeID int, evt event.Update, ident immutable.Option[identity]) { // find the correct collection index for this update collectionID := -1 for i, c := range s.nodes[nodeID].collections { @@ -298,6 +300,13 @@ func updateNetworkState(s *state, nodeID int, evt event.Update) { if _, ok := s.nodes[id].p2p.actualDAGHeads[getUpdateEventKey(evt)]; ok { s.nodes[id].p2p.expectedDAGHeads[getUpdateEventKey(evt)] = evt.Cid } + if ident.HasValue() && ident.Value().selector != strconv.Itoa(id) { + // If the document is created by a specific identity, only the node with the + // same index as the identity can initially access it. + // If this network state update comes from the adding of an actor relationship, + // then the identity reflects that of the target node. + continue + } // peer collection subscribers receive updates from any other subscriber node if _, ok := s.nodes[id].p2p.peerCollections[collectionID]; ok { s.nodes[id].p2p.expectedDAGHeads[getUpdateEventKey(evt)] = evt.Cid diff --git a/tests/integration/utils.go b/tests/integration/utils.go index 6ab621728e..2f6a47dd6c 100644 --- a/tests/integration/utils.go +++ b/tests/integration/utils.go @@ -1225,7 +1225,13 @@ func createDoc( s.docIDs[action.CollectionID] = append(s.docIDs[action.CollectionID], docIDs...) 
if action.ExpectedError == "" { - waitForUpdateEvents(s, action.NodeID, action.CollectionID, getEventsForCreateDoc(s, action)) + waitForUpdateEvents( + s, + action.NodeID, + action.CollectionID, + getEventsForCreateDoc(s, action), + action.Identity, + ) } } @@ -1407,7 +1413,7 @@ func deleteDoc( docID.String(): {}, } - waitForUpdateEvents(s, action.NodeID, action.CollectionID, expect) + waitForUpdateEvents(s, action.NodeID, action.CollectionID, expect, immutable.None[identity]()) } } @@ -1452,7 +1458,13 @@ func updateDoc( assertExpectedErrorRaised(s.t, s.testCase.Description, action.ExpectedError, expectedErrorRaised) if action.ExpectedError == "" && !action.SkipLocalUpdateEvent { - waitForUpdateEvents(s, action.NodeID, action.CollectionID, getEventsForUpdateDoc(s, action)) + waitForUpdateEvents( + s, + action.NodeID, + action.CollectionID, + getEventsForUpdateDoc(s, action), + immutable.None[identity](), + ) } } @@ -1552,7 +1564,13 @@ func updateWithFilter(s *state, action UpdateWithFilter) { assertExpectedErrorRaised(s.t, s.testCase.Description, action.ExpectedError, expectedErrorRaised) if action.ExpectedError == "" && !action.SkipLocalUpdateEvent { - waitForUpdateEvents(s, action.NodeID, action.CollectionID, getEventsForUpdateWithFilter(s, action, res)) + waitForUpdateEvents( + s, + action.NodeID, + action.CollectionID, + getEventsForUpdateWithFilter(s, action, res), + immutable.None[identity](), + ) } }
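To make the new identity argument to waitForUpdateEvents concrete: the gate added in updateNetworkState compares the identity's selector against the subscriber node's index as a string, so only that node initially expects the update. A compressed sketch of that decision (the helper name is hypothetical; identity is the harness type used above):

// nodeShouldExpectUpdate sketches the check updateNetworkState now applies
// before adding an expected DAG head for a subscriber node.
func nodeShouldExpectUpdate(id int, ident immutable.Option[identity]) bool {
	if !ident.HasValue() {
		// No identity restriction: any subscribed node may sync the update.
		return true
	}
	// Before an actor relationship is added, only the node whose index matches
	// the identity's selector can access the document.
	return ident.Value().selector == strconv.Itoa(id)
}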