diff --git a/acp/identity/identity.go b/acp/identity/identity.go index 4dee93deba..1297efb7d3 100644 --- a/acp/identity/identity.go +++ b/acp/identity/identity.go @@ -65,6 +65,19 @@ func FromPrivateKey(privateKey *secp256k1.PrivateKey) (Identity, error) { }, nil } +// FromPublicRawIdentity returns a new raw identity using the given public raw identity. +func FromPublicRawIdentity(rawIdentity PublicRawIdentity) (Identity, error) { + publicKey, err := secp256k1.ParsePubKey([]byte(rawIdentity.PublicKey)) + if err != nil { + return Identity{}, err + } + + return Identity{ + DID: rawIdentity.DID, + PublicKey: publicKey, + }, nil +} + // FromToken constructs a new `Identity` from a bearer token. func FromToken(data []byte) (Identity, error) { token, err := jwt.Parse(data, jwt.WithVerify(false)) @@ -127,6 +140,28 @@ func (identity *Identity) UpdateToken( audience immutable.Option[string], authorizedAccount immutable.Option[string], ) error { + signedToken, err := identity.NewToken(duration, audience, authorizedAccount) + if err != nil { + return err + } + + identity.BearerToken = string(signedToken) + return nil +} + +// NewToken creates and returns a new `BearerToken`. +// +// - duration: The [time.Duration] that this identity is valid for. +// - audience: The audience that this identity is valid for. This is required +// by the Defra http client. For example `github.com/sourcenetwork/defradb` +// - authorizedAccount: An account that this identity is authorizing to make +// SourceHub calls on behalf of this actor. This is currently required when +// using SourceHub ACP. +func (identity Identity) NewToken( + duration time.Duration, + audience immutable.Option[string], + authorizedAccount immutable.Option[string], +) ([]byte, error) { var signedToken []byte subject := hex.EncodeToString(identity.PublicKey.SerializeCompressed()) now := time.Now() @@ -144,21 +179,20 @@ func (identity *Identity) UpdateToken( token, err := jwtBuilder.Build() if err != nil { - return err + return nil, err } if authorizedAccount.HasValue() { err = token.Set(acptypes.AuthorizedAccountClaim, authorizedAccount.Value()) if err != nil { - return err + return nil, err } } signedToken, err = jwt.Sign(token, jwt.WithKey(BearerTokenSignatureScheme, identity.PrivateKey.ToECDSA())) if err != nil { - return err + return nil, err } - identity.BearerToken = string(signedToken) - return nil + return signedToken, nil } diff --git a/internal/db/db.go b/internal/db/db.go index 630bd0ae43..5ad246092c 100644 --- a/internal/db/db.go +++ b/internal/db/db.go @@ -104,7 +104,7 @@ func NewDB( acp immutable.Option[acp.ACP], lens client.LensRegistry, options ...Option, -) (client.DB, error) { +) (*db, error) { return newDB(ctx, rootstore, acp, lens, options...) } @@ -339,13 +339,20 @@ func (db *db) DeleteDocActorRelationship( return client.DeleteDocActorRelationshipResult{RecordFound: recordFound}, nil } -func (db *db) GetNodeIdentity(context.Context) (immutable.Option[identity.PublicRawIdentity], error) { +func (db *db) GetNodeIdentity(_ context.Context) (immutable.Option[identity.PublicRawIdentity], error) { if db.nodeIdentity.HasValue() { return immutable.Some(db.nodeIdentity.Value().IntoRawIdentity().Public()), nil } return immutable.None[identity.PublicRawIdentity](), nil } +func (db *db) GetIdentityToken(_ context.Context, audience immutable.Option[string]) ([]byte, error) { + if db.nodeIdentity.HasValue() { + return db.nodeIdentity.Value().NewToken(time.Hour*24, audience, immutable.None[string]()) + } + return nil, nil +} + // Initialize is called when a database is first run and creates all the db global meta data // like Collection ID counters. func (db *db) initialize(ctx context.Context) error { diff --git a/internal/db/permission/check.go b/internal/db/permission/check.go index 599329855b..550f9b1ef0 100644 --- a/internal/db/permission/check.go +++ b/internal/db/permission/check.go @@ -40,6 +40,81 @@ func CheckAccessOfDocOnCollectionWithACP( collection client.Collection, permission acp.DPIPermission, docID string, +) (bool, error) { + identityFunc := func() immutable.Option[acpIdentity.Identity] { + return identity + } + return CheckDocAccessWithIdentityFunc( + ctx, + identityFunc, + acpSystem, + collection, + permission, + docID, + ) +} + +// CheckAccessDocAccessWithDID handles the check, which tells us if access to the target +// document is valid, with respect to the permission type, and the specified collection. +// +// The identity is determined by a DID. +// +// This function should only be called if acp is available. As we have unrestricted +// access when acp is not available (acp turned off). +// +// Since we know acp is enabled we have these components to check in this function: +// (1) the request is permissioned (has an identity), +// (2) the collection is permissioned (has a policy), +// +// Unrestricted Access to document if: +// - (2) is false. +// - Document is public (unregistered), whether signatured request or not doesn't matter. +func CheckAccessDocAccessWithDID( + ctx context.Context, + did string, + acpSystem acp.ACP, + collection client.Collection, + permission acp.DPIPermission, + docID string, +) (bool, error) { + identityFunc := func() immutable.Option[acpIdentity.Identity] { + if did == "" { + return immutable.None[acpIdentity.Identity]() + } + return immutable.Some[acpIdentity.Identity](acpIdentity.Identity{DID: did}) + } + return CheckDocAccessWithIdentityFunc( + ctx, + identityFunc, + acpSystem, + collection, + permission, + docID, + ) +} + +// CheckDocAccessWithIdentityFunc handles the check, which tells us if access to the target +// document is valid, with respect to the permission type, and the specified collection. +// +// The identity is determined by an identity function. +// +// This function should only be called if acp is available. As we have unrestricted +// access when acp is not available (acp turned off). +// +// Since we know acp is enabled we have these components to check in this function: +// (1) the request is permissioned (has an identity), +// (2) the collection is permissioned (has a policy), +// +// Unrestricted Access to document if: +// - (2) is false. +// - Document is public (unregistered), whether signatured request or not doesn't matter. +func CheckDocAccessWithIdentityFunc( + ctx context.Context, + identityFunc func() immutable.Option[acpIdentity.Identity], + acpSystem acp.ACP, + collection client.Collection, + permission acp.DPIPermission, + docID string, ) (bool, error) { // Even if acp exists, but there is no policy on the collection (unpermissioned collection) // then we still have unrestricted access. @@ -67,6 +142,7 @@ func CheckAccessOfDocOnCollectionWithACP( return true, nil } + identity := identityFunc() var identityValue string if !identity.HasValue() { // We can't assume that there is no-access just because there is no identity even if the document diff --git a/net/acp.go b/net/acp.go new file mode 100644 index 0000000000..d7155b7b11 --- /dev/null +++ b/net/acp.go @@ -0,0 +1,31 @@ +// Copyright 2024 Democratized Data Foundation +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package net + +import ( + "context" + + "github.com/sourcenetwork/immutable" + + "github.com/sourcenetwork/defradb/acp" + "github.com/sourcenetwork/defradb/acp/identity" + "github.com/sourcenetwork/defradb/client" +) + +type ACP interface { + acp.ACP + // GetCollections returns the list of collections according to the given options. + GetCollections(ctx context.Context, opts client.CollectionFetchOptions) ([]client.Collection, error) + // GetIndentityToken returns an identity token for the given audience. + GetIdentityToken(ctx context.Context, audience immutable.Option[string]) ([]byte, error) + // GetNodeIdentity returns the node's public raw identity. + GetNodeIdentity(ctx context.Context) (immutable.Option[identity.PublicRawIdentity], error) +} diff --git a/net/client.go b/net/client.go index d5276f292b..40bdc5b1c1 100644 --- a/net/client.go +++ b/net/client.go @@ -71,3 +71,26 @@ func (s *server) pushLog(evt event.Update, pid peer.ID) (err error) { } return nil } + +// getIdentity creates a getIdentity request and sends it to another node +func (s *server) getIdentity(ctx context.Context, pid peer.ID) (getIdentityReply, error) { + client, err := s.dial(pid) // grpc dial over P2P stream + if err != nil { + return getIdentityReply{}, NewErrPushLog(err) + } + + ctx, cancel := context.WithTimeout(ctx, PushTimeout) + defer cancel() + + req := getIdentityRequest{ + PeerID: s.peer.host.ID().String(), + } + resp := getIdentityReply{} + if err := client.Invoke(ctx, serviceGetIdentityName, req, &resp); err != nil { + return getIdentityReply{}, NewErrFailedToGetIdentity( + err, + errors.NewKV("PeerID", pid), + ) + } + return resp, nil +} diff --git a/net/dialer_test.go b/net/dialer_test.go index 4ed8bcf68b..0346ac330b 100644 --- a/net/dialer_test.go +++ b/net/dialer_test.go @@ -14,6 +14,7 @@ import ( "context" "testing" + "github.com/sourcenetwork/immutable" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) @@ -29,6 +30,7 @@ func TestDial_WithConnectedPeer_NoError(t *testing.T) { db1.Blockstore(), db1.Encstore(), db1.Events(), + immutable.None[ACP](), WithListenAddresses("/ip4/127.0.0.1/tcp/0"), ) assert.NoError(t, err) @@ -38,6 +40,7 @@ func TestDial_WithConnectedPeer_NoError(t *testing.T) { db2.Blockstore(), db1.Encstore(), db2.Events(), + immutable.None[ACP](), WithListenAddresses("/ip4/127.0.0.1/tcp/0"), ) assert.NoError(t, err) @@ -61,6 +64,7 @@ func TestDial_WithConnectedPeerAndSecondConnection_NoError(t *testing.T) { db1.Blockstore(), db1.Encstore(), db1.Events(), + immutable.None[ACP](), WithListenAddresses("/ip4/127.0.0.1/tcp/0"), ) assert.NoError(t, err) @@ -70,6 +74,7 @@ func TestDial_WithConnectedPeerAndSecondConnection_NoError(t *testing.T) { db2.Blockstore(), db1.Encstore(), db2.Events(), + immutable.None[ACP](), WithListenAddresses("/ip4/127.0.0.1/tcp/0"), ) assert.NoError(t, err) @@ -96,6 +101,7 @@ func TestDial_WithConnectedPeerAndSecondConnectionWithConnectionShutdown_Closing db1.Blockstore(), db1.Encstore(), db1.Events(), + immutable.None[ACP](), WithListenAddresses("/ip4/127.0.0.1/tcp/0"), ) assert.NoError(t, err) @@ -105,6 +111,7 @@ func TestDial_WithConnectedPeerAndSecondConnectionWithConnectionShutdown_Closing db2.Blockstore(), db1.Encstore(), db2.Events(), + immutable.None[ACP](), WithListenAddresses("/ip4/127.0.0.1/tcp/0"), ) assert.NoError(t, err) diff --git a/net/errors.go b/net/errors.go index 3a21c8e5c1..bfe61d81fa 100644 --- a/net/errors.go +++ b/net/errors.go @@ -25,6 +25,7 @@ const ( errRequestingEncryptionKeys = "failed to request encryption keys with %v" errTopicAlreadyExist = "topic with name \"%s\" already exists" errTopicDoesNotExist = "topic with name \"%s\" does not exists" + errFailedToGetIdentity = "failed to get identity" ) var ( @@ -59,3 +60,7 @@ func NewErrTopicAlreadyExist(topic string) error { func NewErrTopicDoesNotExist(topic string) error { return errors.New(fmt.Sprintf(errTopicDoesNotExist, topic)) } + +func NewErrFailedToGetIdentity(inner error, kv ...errors.KV) error { + return errors.Wrap(errFailedToGetIdentity, inner, kv...) +} diff --git a/net/grpc.go b/net/grpc.go index 8e526de102..f11faa9966 100644 --- a/net/grpc.go +++ b/net/grpc.go @@ -24,6 +24,7 @@ const ( serviceGetLogName = "/" + grpcServiceName + "/GetLog" servicePushLogName = "/" + grpcServiceName + "/PushLog" serviceGetHeadLogName = "/" + grpcServiceName + "/GetHeadLog" + serviceGetIdentityName = "/" + grpcServiceName + "/GetIdentity" ) type getDocGraphRequest struct{} @@ -52,6 +53,17 @@ type pushLogRequest struct { type pushLogReply struct{} +type getIdentityRequest struct { + // PeerID is the ID of the requesting peer. + // It will be used as the audience for the identity token. + PeerID string +} + +type getIdentityReply struct { + // IdentityToken is the token that can be used to authenticate the peer. + IdentityToken []byte +} + type serviceServer interface { // GetDocGraph from this peer. GetDocGraph(context.Context, *getDocGraphRequest) (*getDocGraphReply, error) @@ -63,6 +75,30 @@ type serviceServer interface { PushLog(context.Context, *pushLogRequest) (*pushLogReply, error) // GetHeadLog from this peer GetHeadLog(context.Context, *getHeadLogRequest) (*getHeadLogReply, error) + GetIdentity(context.Context, *getIdentityRequest) (*getIdentityReply, error) +} + +func getIdentityHandler( + srv any, + ctx context.Context, + dec func(any) error, + interceptor grpc.UnaryServerInterceptor, +) (any, error) { + in := new(getIdentityRequest) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(serviceServer).GetIdentity(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: serviceGetIdentityName, + } + handler := func(ctx context.Context, req any) (any, error) { + return srv.(serviceServer).GetIdentity(ctx, req.(*getIdentityRequest)) + } + return interceptor(ctx, in, info, handler) } func pushLogHandler( @@ -97,6 +133,10 @@ func registerServiceServer(s grpc.ServiceRegistrar, srv serviceServer) { MethodName: "PushLog", Handler: pushLogHandler, }, + { + MethodName: "GetIdentity", + Handler: getIdentityHandler, + }, }, Streams: []grpc.StreamDesc{}, Metadata: "defradb.cbor", diff --git a/net/peer.go b/net/peer.go index d59d6fe150..d9186a10ed 100644 --- a/net/peer.go +++ b/net/peer.go @@ -30,6 +30,7 @@ import ( "github.com/multiformats/go-multiaddr" "github.com/sourcenetwork/corelog" + "github.com/sourcenetwork/immutable" "google.golang.org/grpc" "github.com/sourcenetwork/defradb/client" @@ -61,6 +62,8 @@ type Peer struct { // peer DAG service bserv blockservice.BlockService + acp immutable.Option[ACP] + bootCloser io.Closer } @@ -70,6 +73,7 @@ func NewPeer( blockstore datastore.Blockstore, encstore datastore.Blockstore, bus *event.Bus, + acp immutable.Option[ACP], opts ...NodeOpt, ) (p *Peer, err error) { ctx, cancel := context.WithCancel(ctx) @@ -111,9 +115,6 @@ func NewPeer( corelog.Any("Address", options.ListenAddresses), ) - bswapnet := network.NewFromIpfsHost(h, ddht) - bswap := bitswap.New(ctx, bswapnet, blockstore) - p = &Peer{ host: h, dht: ddht, @@ -122,8 +123,8 @@ func NewPeer( ctx: ctx, cancel: cancel, bus: bus, + acp: acp, p2pRPC: grpc.NewServer(options.GRPCServerOptions...), - bserv: blockservice.New(blockstore, bswap), } if options.EnablePubSub { @@ -149,6 +150,10 @@ func NewPeer( return nil, err } + bswapnet := network.NewFromIpfsHost(h, ddht) + bswap := bitswap.New(ctx, bswapnet, blockstore, bitswap.WithPeerBlockRequestFilter(p.server.hasAccess)) + p.bserv = blockservice.New(blockstore, bswap) + p2pListener, err := gostream.Listen(h, corenet.Protocol) if err != nil { return nil, err diff --git a/net/peer_test.go b/net/peer_test.go index 40249192ea..e2150a6c52 100644 --- a/net/peer_test.go +++ b/net/peer_test.go @@ -82,6 +82,7 @@ func newTestPeer(ctx context.Context, t *testing.T) (client.DB, *Peer) { db.Blockstore(), db.Encstore(), db.Events(), + immutable.None[ACP](), WithListenAddresses(randomMultiaddr), ) require.NoError(t, err) @@ -95,14 +96,14 @@ func TestNewPeer_NoError(t *testing.T) { db, err := db.NewDB(ctx, store, acp.NoACP, nil) require.NoError(t, err) defer db.Close() - p, err := NewPeer(ctx, db.Blockstore(), db.Encstore(), db.Events()) + p, err := NewPeer(ctx, db.Blockstore(), db.Encstore(), db.Events(), immutable.None[ACP]()) require.NoError(t, err) p.Close() } func TestNewPeer_NoDB_NilDBError(t *testing.T) { ctx := context.Background() - _, err := NewPeer(ctx, nil, nil, nil, nil) + _, err := NewPeer(ctx, nil, nil, nil, immutable.None[ACP]()) require.ErrorIs(t, err, ErrNilDB) } @@ -123,6 +124,7 @@ func TestStart_WithKnownPeer_NoError(t *testing.T) { db1.Blockstore(), db1.Encstore(), db1.Events(), + immutable.None[ACP](), WithListenAddresses("/ip4/127.0.0.1/tcp/0"), ) require.NoError(t, err) @@ -132,6 +134,7 @@ func TestStart_WithKnownPeer_NoError(t *testing.T) { db2.Blockstore(), db1.Encstore(), db2.Events(), + immutable.None[ACP](), WithListenAddresses("/ip4/127.0.0.1/tcp/0"), ) require.NoError(t, err) @@ -271,6 +274,7 @@ func TestNewPeer_WithEnableRelay_NoError(t *testing.T) { db.Blockstore(), db.Encstore(), db.Events(), + immutable.None[ACP](), WithEnableRelay(true), ) require.NoError(t, err) @@ -289,6 +293,7 @@ func TestNewPeer_NoPubSub_NoError(t *testing.T) { db.Blockstore(), db.Encstore(), db.Events(), + immutable.None[ACP](), WithEnablePubSub(false), ) require.NoError(t, err) @@ -308,6 +313,7 @@ func TestNewPeer_WithEnablePubSub_NoError(t *testing.T) { db.Blockstore(), db.Encstore(), db.Events(), + immutable.None[ACP](), WithEnablePubSub(true), ) @@ -328,6 +334,7 @@ func TestNodeClose_NoError(t *testing.T) { db.Blockstore(), db.Encstore(), db.Events(), + immutable.None[ACP](), ) require.NoError(t, err) n.Close() @@ -345,6 +352,7 @@ func TestListenAddrs_WithListenAddresses_NoError(t *testing.T) { db.Blockstore(), db.Encstore(), db.Events(), + immutable.None[ACP](), WithListenAddresses("/ip4/127.0.0.1/tcp/0"), ) require.NoError(t, err) @@ -364,6 +372,7 @@ func TestPeer_WithBootstrapPeers_NoError(t *testing.T) { db.Blockstore(), db.Encstore(), db.Events(), + immutable.None[ACP](), WithBootstrapPeers("/ip4/127.0.0.1/tcp/6666/p2p/QmaCpDMGvV2BGHeYERUEnRQAwe3N8SzbUtfsmvsqQLuvuJ"), ) require.NoError(t, err) diff --git a/net/server.go b/net/server.go index 0be9def0ce..9377b09227 100644 --- a/net/server.go +++ b/net/server.go @@ -19,18 +19,25 @@ import ( "github.com/fxamacker/cbor/v2" cid "github.com/ipfs/go-cid" + "github.com/lestrrat-go/jwx/v2/jws" + "github.com/lestrrat-go/jwx/v2/jwt" + "github.com/libp2p/go-libp2p/core/peer" libpeer "github.com/libp2p/go-libp2p/core/peer" "github.com/libp2p/go-libp2p/core/peerstore" "github.com/sourcenetwork/corelog" rpc "github.com/sourcenetwork/go-libp2p-pubsub-rpc" + "github.com/sourcenetwork/immutable" "google.golang.org/grpc" "google.golang.org/grpc/credentials/insecure" grpcpeer "google.golang.org/grpc/peer" + "github.com/sourcenetwork/defradb/acp" + "github.com/sourcenetwork/defradb/acp/identity" "github.com/sourcenetwork/defradb/client" "github.com/sourcenetwork/defradb/errors" "github.com/sourcenetwork/defradb/event" coreblock "github.com/sourcenetwork/defradb/internal/core/block" + "github.com/sourcenetwork/defradb/internal/db/permission" ) // server is the request/response instance for all P2P RPC communication. @@ -48,6 +55,9 @@ type server struct { mu sync.Mutex conns map[libpeer.ID]*grpc.ClientConn + + peerIdentities map[libpeer.ID]identity.Identity + piMux sync.RWMutex } // pubsubTopic is a wrapper of rpc.Topic to be able to track if the topic has @@ -61,10 +71,11 @@ type pubsubTopic struct { // underlying DB instance. func newServer(p *Peer, opts ...grpc.DialOption) (*server, error) { s := &server{ - peer: p, - conns: make(map[libpeer.ID]*grpc.ClientConn), - topics: make(map[string]pubsubTopic), - replicators: make(map[string]map[libpeer.ID]struct{}), + peer: p, + conns: make(map[libpeer.ID]*grpc.ClientConn), + topics: make(map[string]pubsubTopic), + replicators: make(map[string]map[libpeer.ID]struct{}), + peerIdentities: make(map[libpeer.ID]identity.Identity), } cred := insecure.NewCredentials() @@ -100,8 +111,17 @@ func (s *server) GetLog(ctx context.Context, req *getLogRequest) (*getLogReply, return nil, nil } -// PushLog receives a push log request +// PushLog receives a push log request from the grpc server (replicator) func (s *server) PushLog(ctx context.Context, req *pushLogRequest) (*pushLogReply, error) { + return s.handlePushLog(ctx, req, true) +} + +// handlePushLog handles a push log request +func (s *server) handlePushLog( + ctx context.Context, + req *pushLogRequest, + isReplicator bool, +) (*pushLogReply, error) { pid, err := peerIDFromContext(ctx) if err != nil { return nil, err @@ -126,6 +146,16 @@ func (s *server) PushLog(ctx context.Context, req *pushLogRequest) (*pushLogRepl return nil, err } + if !isReplicator && !s.trySelfHasAccess(block) { + // If we know we don't have access, we can skip the rest of the processing. + // No need to check access if the message is for replication as the node sending + // will have done so deliberately. + log.InfoContext(ctx, "Skipping pushlog due to known lack of access", + corelog.Any("PeerID", pid.String()), + corelog.Any("DocID", req.DocID)) + return &pushLogReply{}, nil + } + log.InfoContext(ctx, "Received pushlog", corelog.Any("PeerID", pid.String()), corelog.Any("Creator", byPeer.String()), @@ -172,6 +202,22 @@ func (s *server) GetHeadLog( return nil, nil } +// GetIdentity receives a get identity request and returns the identity token +// with the requesting peer as the audience. +func (s *server) GetIdentity( + ctx context.Context, + req *getIdentityRequest, +) (*getIdentityReply, error) { + if !s.peer.acp.HasValue() { + return &getIdentityReply{}, nil + } + token, err := s.peer.acp.Value().GetIdentityToken(ctx, immutable.Some(req.PeerID)) + if err != nil { + return nil, err + } + return &getIdentityReply{IdentityToken: token}, nil +} + // addPubSubTopic subscribes to a topic on the pubsub network // A custom message handler can be provided to handle incoming messages. If not provided, // the default message handler will be used. @@ -327,7 +373,7 @@ func (s *server) pubSubMessageHandler(from libpeer.ID, topic string, msg []byte) ctx := grpcpeer.NewContext(s.peer.ctx, &grpcpeer.Peer{ Addr: addr{from}, }) - if _, err := s.PushLog(ctx, req); err != nil { + if _, err := s.handlePushLog(ctx, req, false); err != nil { return nil, errors.Wrap(fmt.Sprintf("Failed pushing log for doc %s", topic), err) } return nil, nil @@ -447,3 +493,153 @@ func (s *server) SendPubSubMessage( } return t.Publish(ctx, data) } + +// hasAccess checks if the requesting peer has access to the given cid. +func (s *server) hasAccess(p peer.ID, c cid.Cid) bool { + if !s.peer.acp.HasValue() { + return true + } + + rawblock, err := s.peer.blockstore.Get(s.peer.ctx, c) + if err != nil { + log.ErrorE("Failed to get block", err) + return false + } + block, err := coreblock.GetFromBytes(rawblock.RawData()) + if err != nil { + log.ErrorE("Failed to get doc from block", err) + return false + } + + cols, err := s.peer.acp.Value().GetCollections( + s.peer.ctx, + client.CollectionFetchOptions{ + SchemaVersionID: immutable.Some(block.Delta.GetSchemaVersionID()), + }, + ) + if err != nil { + log.ErrorE("Failed to get collections", err) + return false + } + if len(cols) == 0 { + log.Info("No collections found", corelog.Any("Schema Version ID", block.Delta.GetSchemaVersionID())) + return false + } + + // If the requesting peer is in the replicators list for that collection, then they have access. + if peerList, ok := s.replicators[cols[0].SchemaRoot()]; ok { + _, exists := peerList[p] + if exists { + return true + } + } + + identFunc := func() immutable.Option[identity.Identity] { + s.piMux.RLock() + ident, ok := s.peerIdentities[p] + s.piMux.RUnlock() + if !ok { + resp, err := s.getIdentity(s.peer.ctx, p) + if err != nil { + log.ErrorE("Failed to get identity", err) + return immutable.None[identity.Identity]() + } + ident, err = identity.FromToken(resp.IdentityToken) + if err != nil { + log.ErrorE("Failed to parse identity token", err) + return immutable.None[identity.Identity]() + } + err = verifyAuthToken(ident, s.peer.PeerID().String()) + if err != nil { + log.ErrorE("Failed to verify auth token", err) + return immutable.None[identity.Identity]() + } + s.piMux.Lock() + s.peerIdentities[p] = ident + s.piMux.Unlock() + } + return immutable.Some(ident) + } + + peerHasAccess, err := permission.CheckDocAccessWithIdentityFunc( + s.peer.ctx, + identFunc, + s.peer.acp.Value(), + cols[0], // For now we assume there is only one collection. + acp.ReadPermission, + string(block.Delta.GetDocID()), + ) + if err != nil { + log.ErrorE("Failed to check access", err) + return false + } + + return peerHasAccess +} + +// trySelfHasAccess checks if the local node has access to the given block. +// +// This is a best-effort check and returns true unless we explicitly find that we don't have access. +func (s *server) trySelfHasAccess(block *coreblock.Block) bool { + if !s.peer.acp.HasValue() { + return true + } + + cols, err := s.peer.acp.Value().GetCollections( + s.peer.ctx, + client.CollectionFetchOptions{ + SchemaVersionID: immutable.Some(block.Delta.GetSchemaVersionID()), + }, + ) + if err != nil { + log.ErrorE("Failed to get collections", err) + return true + } + if len(cols) == 0 { + log.Info("No collections found", corelog.Any("Schema Version ID", block.Delta.GetSchemaVersionID())) + return true + } + ident, err := s.peer.acp.Value().GetNodeIdentity(s.peer.ctx) + if err != nil { + log.ErrorE("Failed to get node identity", err) + return true + } + if !ident.HasValue() { + log.Info("No node identity found") + return true + } + + peerHasAccess, err := permission.CheckAccessDocAccessWithDID( + s.peer.ctx, + ident.Value().DID, + s.peer.acp.Value(), + cols[0], // For now we assume there is only one collection. + acp.ReadPermission, + string(block.Delta.GetDocID()), + ) + if err != nil { + log.ErrorE("Failed to check access", err) + return true + } + + return peerHasAccess +} + +// verifyAuthToken verifies that the jwt auth token is valid and that the signature +// matches the identity of the subject. +func verifyAuthToken(ident identity.Identity, audience string) error { + _, err := jwt.Parse([]byte(ident.BearerToken), jwt.WithVerify(false), jwt.WithAudience(audience)) + if err != nil { + return err + } + + _, err = jws.Verify( + []byte(ident.BearerToken), + jws.WithKey(identity.BearerTokenSignatureScheme, ident.PublicKey.ToECDSA()), + ) + if err != nil { + return err + } + + return nil +} diff --git a/net/sync_dag.go b/net/sync_dag.go index 11e021f239..76a834cbcb 100644 --- a/net/sync_dag.go +++ b/net/sync_dag.go @@ -23,9 +23,9 @@ import ( coreblock "github.com/sourcenetwork/defradb/internal/core/block" ) -// syncDAGTimeout is the maximum amount of time -// to wait for a dag to be fetched. -var syncDAGTimeout = 60 * time.Second +// syncBlockLinkTimeout is the maximum amount of time +// to wait for a block link to be fetched. +var syncBlockLinkTimeout = 5 * time.Second // syncDAG synchronizes the DAG starting with the given block // using the blockservice to fetch remote blocks. @@ -60,9 +60,8 @@ func syncDAG(ctx context.Context, bserv blockservice.BlockService, block *corebl // If it encounters errors in the concurrent loading of links, it will return // the first error it encountered. func loadBlockLinks(ctx context.Context, lsys linking.LinkSystem, block *coreblock.Block) error { - ctx, cancel := context.WithTimeout(ctx, syncDAGTimeout) + ctxWithCancel, cancel := context.WithCancel(ctx) defer cancel() - var wg sync.WaitGroup var asyncErr error var asyncErrOnce sync.Once @@ -76,10 +75,12 @@ func loadBlockLinks(ctx context.Context, lsys linking.LinkSystem, block *coreblo wg.Add(1) go func(lnk cidlink.Link) { defer wg.Done() - if ctx.Err() != nil { + if ctxWithCancel.Err() != nil { return } - nd, err := lsys.Load(linking.LinkContext{Ctx: ctx}, lnk, coreblock.SchemaPrototype) + ctxWithTimeout, cancel := context.WithTimeout(ctx, syncBlockLinkTimeout) + defer cancel() + nd, err := lsys.Load(linking.LinkContext{Ctx: ctxWithTimeout}, lnk, coreblock.SchemaPrototype) if err != nil { asyncErrOnce.Do(func() { setAsyncErr(err) }) return diff --git a/node/acp.go b/node/acp.go index 4092aa3532..f419536f58 100644 --- a/node/acp.go +++ b/node/acp.go @@ -17,6 +17,9 @@ import ( "github.com/sourcenetwork/immutable" "github.com/sourcenetwork/defradb/acp" + "github.com/sourcenetwork/defradb/acp/identity" + "github.com/sourcenetwork/defradb/client" + "github.com/sourcenetwork/defradb/net" ) type ACPType string @@ -149,3 +152,37 @@ func NewACP(ctx context.Context, opts ...ACPOpt) (immutable.Option[acp.ACP], err return immutable.Some[acp.ACP](acpLocal), nil } } + +// acpDB is an interface for ACP related DB operations. +type acpDB interface { + GetCollections(ctx context.Context, options client.CollectionFetchOptions) ([]client.Collection, error) + GetIdentityToken(ctx context.Context, audience immutable.Option[string]) ([]byte, error) + GetNodeIdentity(ctx context.Context) (immutable.Option[identity.PublicRawIdentity], error) +} + +type netACP struct { + acp.ACP + db acpDB +} + +var _ net.ACP = (*netACP)(nil) + +// NewNetACP returns a new net.ACP instance. +func NewNetACP(a immutable.Option[acp.ACP], db acpDB) immutable.Option[net.ACP] { + if !a.HasValue() { + return immutable.None[net.ACP]() + } + return immutable.Some[net.ACP](&netACP{a.Value(), db}) +} + +func (n *netACP) GetCollections(ctx context.Context, options client.CollectionFetchOptions) ([]client.Collection, error) { + return n.db.GetCollections(ctx, options) +} + +func (n *netACP) GetIdentityToken(ctx context.Context, audience immutable.Option[string]) ([]byte, error) { + return n.db.GetIdentityToken(ctx, audience) +} + +func (n *netACP) GetNodeIdentity(ctx context.Context) (immutable.Option[identity.PublicRawIdentity], error) { + return n.db.GetNodeIdentity(ctx) +} diff --git a/node/node.go b/node/node.go index aa47bfbc5c..15be7cf8fd 100644 --- a/node/node.go +++ b/node/node.go @@ -147,19 +147,27 @@ func (n *Node) Start(ctx context.Context) error { return err } - n.DB, err = db.NewDB(ctx, rootstore, acp, lens, n.dbOpts...) + coreDB, err := db.NewDB(ctx, rootstore, acp, lens, n.dbOpts...) if err != nil { return err } + n.DB = coreDB if !n.options.disableP2P { // setup net node - n.Peer, err = net.NewPeer(ctx, n.DB.Blockstore(), n.DB.Encstore(), n.DB.Events(), n.netOpts...) + n.Peer, err = net.NewPeer( + ctx, + coreDB.Blockstore(), + coreDB.Encstore(), + coreDB.Events(), + NewNetACP(acp, coreDB), + n.netOpts..., + ) if err != nil { return err } - ident, err := n.DB.GetNodeIdentity(ctx) + ident, err := coreDB.GetNodeIdentity(ctx) if err != nil { return err } @@ -171,10 +179,10 @@ func (n *Node) Start(ctx context.Context) error { ctx, n.Peer.PeerID(), n.Peer.Server(), - n.DB.Events(), - n.DB.Encstore(), + coreDB.Events(), + coreDB.Encstore(), acp, - db.NewCollectionRetriever(n.DB), + db.NewCollectionRetriever(coreDB), ident.Value().DID, ) } @@ -186,7 +194,7 @@ func (n *Node) Start(ctx context.Context) error { if !n.options.disableAPI { // setup http server - handler, err := http.NewHandler(n.DB) + handler, err := http.NewHandler(coreDB) if err != nil { return err } diff --git a/tests/integration/acp.go b/tests/integration/acp.go index 5983ad228a..93b23f6326 100644 --- a/tests/integration/acp.go +++ b/tests/integration/acp.go @@ -219,7 +219,7 @@ func addDocActorRelationshipACP( docID: {}, } - waitForUpdateEvents(s, actionNodeID, action.CollectionID, expect) + waitForUpdateEvents(s, actionNodeID, action.CollectionID, expect, action.TargetIdentity) } } @@ -471,7 +471,7 @@ func setupSourceHub(s *state) ([]node.ACPOpt, error) { // // We need to lock before getting the ports, otherwise they may try and use the port we use for locking. // We can only unlock after the source hub node has started and begun listening on the assigned ports. - unlock, err := crossLock(55555) + unlock, err := crossLock(44444) if err != nil { return nil, err } @@ -602,6 +602,7 @@ func crossLock(port uint16) (func(), error) { l, err := net.Listen("tcp", fmt.Sprintf("127.0.0.1:%v", port)) if err != nil { if strings.Contains(err.Error(), "address already in use") { + fmt.Println(err) time.Sleep(5 * time.Millisecond) continue } diff --git a/tests/integration/acp/p2p/create_test.go b/tests/integration/acp/p2p/create_test.go index db3d5a4508..ab248fff9b 100644 --- a/tests/integration/acp/p2p/create_test.go +++ b/tests/integration/acp/p2p/create_test.go @@ -130,3 +130,147 @@ func TestACP_P2PCreatePrivateDocumentsOnDifferentNodes_SourceHubACP(t *testing.T testUtils.ExecuteTestCase(t, test) } + +func TestACP_P2PCreatePrivateDocumentsOnDifferentNodes_SourceHubACP1(t *testing.T) { + expectedPolicyID := "fc56b7509c20ac8ce682b3b9b4fdaad868a9c70dda6ec16720298be64f16e9a4" + + test := testUtils.TestCase{ + Description: "Test acp, p2p create private documents on different nodes, with source-hub", + SupportedACPTypes: immutable.Some( + []testUtils.ACPType{ + testUtils.SourceHubACPType, + }, + ), + Actions: []any{ + testUtils.RandomNetworkingConfig(), + testUtils.RandomNetworkingConfig(), + + testUtils.ConnectPeers{ + SourceNodeID: 1, + TargetNodeID: 0, + }, + + testUtils.AddPolicy{ + + Identity: testUtils.ClientIdentity(1), + + Policy: ` + name: Test Policy + + description: A Policy + + actor: + name: actor + + resources: + users: + permissions: + read: + expr: owner + reader + writer + + write: + expr: owner + writer + + nothing: + expr: dummy + + relations: + owner: + types: + - actor + + reader: + types: + - actor + + writer: + types: + - actor + + admin: + manages: + - reader + types: + - actor + + dummy: + types: + - actor + `, + + ExpectedPolicyID: expectedPolicyID, + }, + + testUtils.SchemaUpdate{ + Schema: fmt.Sprintf(` + type Users @policy( + id: "%s", + resource: "users" + ) { + name: String + age: Int + } + `, + expectedPolicyID, + ), + }, + + testUtils.SubscribeToCollection{ + NodeID: 1, + CollectionIDs: []int{0}, + }, + + testUtils.CreateDoc{ + Identity: testUtils.ClientIdentity(1), + NodeID: immutable.Some(0), + CollectionID: 0, + DocMap: map[string]any{ + "name": "Shahzad", + }, + }, + + // At this point the document is only accessible to the owner so node 1 + // should not have been able to sync the document. + testUtils.Request{ + Identity: testUtils.ClientIdentity(1), + NodeID: immutable.Some(1), + Request: `query { + Users{ + name + } + }`, + Results: map[string]any{ + "Users": []map[string]any{}, + }, + }, + + testUtils.AddDocActorRelationship{ + RequestorIdentity: testUtils.ClientIdentity(1), + TargetIdentity: testUtils.NodeIdentity(1), + CollectionID: 0, + DocID: 0, + Relation: "reader", + ExpectedExistence: false, + }, + + testUtils.WaitForSync{}, + + testUtils.Request{ + Identity: testUtils.ClientIdentity(1), + NodeID: immutable.Some(1), + Request: `query { + Users { + name + } + }`, + Results: map[string]any{ + "Users": []map[string]any{ + {"name": "Shahzad"}, + }, + }, + }, + }, + } + + testUtils.ExecuteTestCase(t, test) +} diff --git a/tests/integration/acp/p2p/subscribe_test.go b/tests/integration/acp/p2p/subscribe_test.go index e776ae4fb2..378f9a1887 100644 --- a/tests/integration/acp/p2p/subscribe_test.go +++ b/tests/integration/acp/p2p/subscribe_test.go @@ -168,9 +168,10 @@ func TestACP_P2PSubscribeAddGetSingleWithPermissionedCollection_SourceHubACP(t * "name": "John", }, }, - testUtils.WaitForSync{}, testUtils.Request{ - // Ensure that the document is accessible on all nodes to authorized actors + // The document will only be accessible on node 0 since node 1 is not authorized to + // access the document. + NodeID: immutable.Some(0), Identity: testUtils.ClientIdentity(1), Request: ` query { @@ -187,6 +188,22 @@ func TestACP_P2PSubscribeAddGetSingleWithPermissionedCollection_SourceHubACP(t * }, }, }, + testUtils.Request{ + // Since node 1 is not authorized to access the document, it won't have to document + // so even if requesting with an authorized identity, the document won't be returned. + NodeID: immutable.Some(1), + Identity: testUtils.ClientIdentity(1), + Request: ` + query { + Users { + name + } + } + `, + Results: map[string]any{ + "Users": []map[string]any{}, + }, + }, testUtils.Request{ // Ensure that the document is hidden on all nodes to unidentified actors Request: ` diff --git a/tests/integration/acp/p2p/subscribe_with_doc_actor_relationship_test.go b/tests/integration/acp/p2p/subscribe_with_doc_actor_relationship_test.go index 52038b8d5b..3f2ea6b901 100644 --- a/tests/integration/acp/p2p/subscribe_with_doc_actor_relationship_test.go +++ b/tests/integration/acp/p2p/subscribe_with_doc_actor_relationship_test.go @@ -104,34 +104,38 @@ func TestACP_P2PSubscribeAddGetSingleWithPermissionedCollectionCreateDocActorRel testUtils.ConnectPeers{ SourceNodeID: 1, - TargetNodeID: 0, }, testUtils.SubscribeToCollection{ - NodeID: 1, - + NodeID: 1, CollectionIDs: []int{0}, }, testUtils.CreateDoc{ - Identity: testUtils.ClientIdentity(1), - - NodeID: immutable.Some(0), - + Identity: testUtils.ClientIdentity(1), + NodeID: immutable.Some(0), CollectionID: 0, - DocMap: map[string]any{ "name": "Shahzad", }, }, + testUtils.AddDocActorRelationship{ + NodeID: immutable.Some(0), + RequestorIdentity: testUtils.ClientIdentity(1), + TargetIdentity: testUtils.NodeIdentity(1), + CollectionID: 0, + DocID: 0, + Relation: "reader", + ExpectedExistence: false, + }, + testUtils.WaitForSync{}, testUtils.Request{ // Ensure that the document is hidden on all nodes to an unauthorized actor Identity: testUtils.ClientIdentity(2), - Request: ` query { Users { @@ -139,48 +143,34 @@ func TestACP_P2PSubscribeAddGetSingleWithPermissionedCollectionCreateDocActorRel } } `, - Results: map[string]any{ "Users": []map[string]any{}, }, }, testUtils.AddDocActorRelationship{ - NodeID: immutable.Some(0), - + NodeID: immutable.Some(0), RequestorIdentity: testUtils.ClientIdentity(1), - - TargetIdentity: testUtils.ClientIdentity(2), - - CollectionID: 0, - - DocID: 0, - - Relation: "reader", - + TargetIdentity: testUtils.ClientIdentity(2), + CollectionID: 0, + DocID: 0, + Relation: "reader", ExpectedExistence: false, }, testUtils.AddDocActorRelationship{ - NodeID: immutable.Some(1), // Note: Different node than the previous - + NodeID: immutable.Some(1), // Note: Different node than the previous RequestorIdentity: testUtils.ClientIdentity(1), - - TargetIdentity: testUtils.ClientIdentity(2), - - CollectionID: 0, - - DocID: 0, - - Relation: "reader", - + TargetIdentity: testUtils.ClientIdentity(2), + CollectionID: 0, + DocID: 0, + Relation: "reader", ExpectedExistence: true, // Making the same relation through any node should be a no-op }, testUtils.Request{ // Ensure that the document is now accessible on all nodes to the newly authorized actor. Identity: testUtils.ClientIdentity(2), - Request: ` query { Users { @@ -188,7 +178,6 @@ func TestACP_P2PSubscribeAddGetSingleWithPermissionedCollectionCreateDocActorRel } } `, - Results: map[string]any{ "Users": []map[string]any{ { @@ -201,7 +190,6 @@ func TestACP_P2PSubscribeAddGetSingleWithPermissionedCollectionCreateDocActorRel testUtils.Request{ // Ensure that the document is still accessible on all nodes to the owner. Identity: testUtils.ClientIdentity(1), - Request: ` query { Users { @@ -209,7 +197,6 @@ func TestACP_P2PSubscribeAddGetSingleWithPermissionedCollectionCreateDocActorRel } } `, - Results: map[string]any{ "Users": []map[string]any{ { @@ -220,41 +207,28 @@ func TestACP_P2PSubscribeAddGetSingleWithPermissionedCollectionCreateDocActorRel }, testUtils.DeleteDocActorRelationship{ - NodeID: immutable.Some(1), - - RequestorIdentity: testUtils.ClientIdentity(1), - - TargetIdentity: testUtils.ClientIdentity(2), - - CollectionID: 0, - - DocID: 0, - - Relation: "reader", - + NodeID: immutable.Some(1), + RequestorIdentity: testUtils.ClientIdentity(1), + TargetIdentity: testUtils.ClientIdentity(2), + CollectionID: 0, + DocID: 0, + Relation: "reader", ExpectedRecordFound: true, }, testUtils.DeleteDocActorRelationship{ - NodeID: immutable.Some(0), // Note: Different node than the previous - - RequestorIdentity: testUtils.ClientIdentity(1), - - TargetIdentity: testUtils.ClientIdentity(2), - - CollectionID: 0, - - DocID: 0, - - Relation: "reader", - + NodeID: immutable.Some(0), // Note: Different node than the previous + RequestorIdentity: testUtils.ClientIdentity(1), + TargetIdentity: testUtils.ClientIdentity(2), + CollectionID: 0, + DocID: 0, + Relation: "reader", ExpectedRecordFound: false, // Making the same relation through any node should be a no-op }, testUtils.Request{ // Ensure that the document is now inaccessible on all nodes to the actor we revoked access from. Identity: testUtils.ClientIdentity(2), - Request: ` query { Users { @@ -271,7 +245,6 @@ func TestACP_P2PSubscribeAddGetSingleWithPermissionedCollectionCreateDocActorRel testUtils.Request{ // Ensure that the document is still accessible on all nodes to the owner. Identity: testUtils.ClientIdentity(1), - Request: ` query { Users { @@ -279,7 +252,6 @@ func TestACP_P2PSubscribeAddGetSingleWithPermissionedCollectionCreateDocActorRel } } `, - Results: map[string]any{ "Users": []map[string]any{ { diff --git a/tests/integration/encryption/peer_acp_test.go b/tests/integration/encryption/peer_acp_test.go index bb6705c626..9b0fd6f132 100644 --- a/tests/integration/encryption/peer_acp_test.go +++ b/tests/integration/encryption/peer_acp_test.go @@ -433,7 +433,6 @@ func TestDocEncryptionACP_IfClientNodeHasDocPermissionButServerNodeIsNotAvailabl `, IsDocEncrypted: true, }, - testUtils.WaitForSync{}, testUtils.Close{ NodeID: immutable.Some(0), }, diff --git a/tests/integration/events.go b/tests/integration/events.go index 12fc58f8b7..658945c757 100644 --- a/tests/integration/events.go +++ b/tests/integration/events.go @@ -12,6 +12,7 @@ package tests import ( "encoding/json" + "strconv" "time" "github.com/sourcenetwork/immutable" @@ -155,6 +156,7 @@ func waitForUpdateEvents( nodeID immutable.Option[int], collectionIndex int, docIDs map[string]struct{}, + ident immutable.Option[identity], ) { for i := 0; i < len(s.nodes); i++ { if nodeID.HasValue() && nodeID.Value() != i { @@ -197,7 +199,7 @@ func waitForUpdateEvents( // we only need to update the network state if the nodes // are configured for networking if s.isNetworkEnabled { - updateNetworkState(s, i, evt) + updateNetworkState(s, i, evt, ident) } } } @@ -271,7 +273,7 @@ func waitForMergeEvents(s *state, action WaitForSync) { // updateNetworkState updates the network state by checking which // nodes should receive the updated document in the given update event. -func updateNetworkState(s *state, nodeID int, evt event.Update) { +func updateNetworkState(s *state, nodeID int, evt event.Update, ident immutable.Option[identity]) { // find the correct collection index for this update collectionID := -1 for i, c := range s.nodes[nodeID].collections { @@ -298,6 +300,13 @@ func updateNetworkState(s *state, nodeID int, evt event.Update) { if _, ok := s.nodes[id].p2p.actualDAGHeads[getUpdateEventKey(evt)]; ok { s.nodes[id].p2p.expectedDAGHeads[getUpdateEventKey(evt)] = evt.Cid } + if ident.HasValue() && ident.Value().selector != strconv.Itoa(id) { + // If the document is created by a specific identity, only the node with the + // same index as the identity can initially access it. + // If this network state update comes from the adding of an actor relationship, + // then the identity reflects that of the target node. + continue + } // peer collection subscribers receive updates from any other subscriber node if _, ok := s.nodes[id].p2p.peerCollections[collectionID]; ok { s.nodes[id].p2p.expectedDAGHeads[getUpdateEventKey(evt)] = evt.Cid diff --git a/tests/integration/utils.go b/tests/integration/utils.go index 6ab621728e..2f6a47dd6c 100644 --- a/tests/integration/utils.go +++ b/tests/integration/utils.go @@ -1225,7 +1225,13 @@ func createDoc( s.docIDs[action.CollectionID] = append(s.docIDs[action.CollectionID], docIDs...) if action.ExpectedError == "" { - waitForUpdateEvents(s, action.NodeID, action.CollectionID, getEventsForCreateDoc(s, action)) + waitForUpdateEvents( + s, + action.NodeID, + action.CollectionID, + getEventsForCreateDoc(s, action), + action.Identity, + ) } } @@ -1407,7 +1413,7 @@ func deleteDoc( docID.String(): {}, } - waitForUpdateEvents(s, action.NodeID, action.CollectionID, expect) + waitForUpdateEvents(s, action.NodeID, action.CollectionID, expect, immutable.None[identity]()) } } @@ -1452,7 +1458,13 @@ func updateDoc( assertExpectedErrorRaised(s.t, s.testCase.Description, action.ExpectedError, expectedErrorRaised) if action.ExpectedError == "" && !action.SkipLocalUpdateEvent { - waitForUpdateEvents(s, action.NodeID, action.CollectionID, getEventsForUpdateDoc(s, action)) + waitForUpdateEvents( + s, + action.NodeID, + action.CollectionID, + getEventsForUpdateDoc(s, action), + immutable.None[identity](), + ) } } @@ -1552,7 +1564,13 @@ func updateWithFilter(s *state, action UpdateWithFilter) { assertExpectedErrorRaised(s.t, s.testCase.Description, action.ExpectedError, expectedErrorRaised) if action.ExpectedError == "" && !action.SkipLocalUpdateEvent { - waitForUpdateEvents(s, action.NodeID, action.CollectionID, getEventsForUpdateWithFilter(s, action, res)) + waitForUpdateEvents( + s, + action.NodeID, + action.CollectionID, + getEventsForUpdateWithFilter(s, action, res), + immutable.None[identity](), + ) } }