Skip to content

Commit

Permalink
feat: Add support for branchable collections (#3216)
Browse files Browse the repository at this point in the history
## Relevant issue(s)

Resolves #3038

## Description

Adds support for branchable collections.

Does not add support for syncing the collection level commits via the
P2P system (broken out, lots of people working in the space, significant
changes required, I'm not so familiar with that part of the code so will
take longer). This does mean that the somewhat surprising (to me at
least) implementation of `Collection.Merge` is currently untested.

Commits are queriable via the `commits` GQL queries.

Time travel queries do not work due to an existing bug:
#3214 - once that bug is
fixed I expect there to be some more work (definitely testing) in order
to get it to work with branchable collections.

This is a breaking change due to the moving/namespacing of the existing
document headstore keys.
  • Loading branch information
AndrewSisley authored Nov 8, 2024
1 parent 2f53878 commit 85bbdc0
Show file tree
Hide file tree
Showing 49 changed files with 1,718 additions and 92 deletions.
17 changes: 17 additions & 0 deletions client/collection_description.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,21 @@ type CollectionDescription struct {
// At the moment this can only be set to `false` if this collection sources its data from
// another collection/query (is a View).
IsMaterialized bool

// IsBranchable defines whether the history of this collection is tracked as a single,
// verifiable entity.
//
// If set to `true` any change to the contents of this set will be linked to a collection
// level commit via the document(s) composite commit.
//
// This enables multiple nodes to verify that they have the same state/history.
//
// The history may be queried like a document history can be queried, for example via 'commits'
// GQL queries.
//
// Currently this property is immutable and can only be set on collection creation, however
// that will change in the future.
IsBranchable bool
}

// QuerySource represents a collection data source from a query.
Expand Down Expand Up @@ -189,6 +204,7 @@ type collectionDescription struct {
RootID uint32
SchemaVersionID string
IsMaterialized bool
IsBranchable bool
Policy immutable.Option[PolicyDescription]
Indexes []IndexDescription
Fields []CollectionFieldDescription
Expand All @@ -209,6 +225,7 @@ func (c *CollectionDescription) UnmarshalJSON(bytes []byte) error {
c.RootID = descMap.RootID
c.SchemaVersionID = descMap.SchemaVersionID
c.IsMaterialized = descMap.IsMaterialized
c.IsBranchable = descMap.IsBranchable
c.Indexes = descMap.Indexes
c.Fields = descMap.Fields
c.Sources = make([]any, len(descMap.Sources))
Expand Down
3 changes: 3 additions & 0 deletions docs/data_format_changes/i3038-branchable-collections.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
# Add support for branchable collections

The existing keys in the headstore gained a '/d' prefix in order to accommodate new types of keys within the headstore.
6 changes: 6 additions & 0 deletions docs/website/references/http/openapi.json
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,9 @@
},
"type": "array"
},
"IsBranchable": {
"type": "boolean"
},
"IsMaterialized": {
"type": "boolean"
},
Expand Down Expand Up @@ -276,6 +279,9 @@
},
"type": "array"
},
"IsBranchable": {
"type": "boolean"
},
"IsMaterialized": {
"type": "boolean"
},
Expand Down
1 change: 1 addition & 0 deletions internal/core/block/block.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ func init() {
&crdt.LWWRegDelta{},
&crdt.CompositeDAGDelta{},
&crdt.CounterDelta{},
&crdt.CollectionDelta{},
)

EncryptionSchema, EncryptionSchemaPrototype = mustSetSchema(
Expand Down
75 changes: 75 additions & 0 deletions internal/core/crdt/collection.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
// Copyright 2024 Democratized Data Foundation
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

package crdt

import (
"context"

"github.com/sourcenetwork/defradb/internal/core"
"github.com/sourcenetwork/defradb/internal/keys"
)

// Collection is a simple CRDT type that tracks changes to the contents of a
// collection in a similar way to a document composite commit, only simpler,
// without the need to track status and a simpler [Merge] function.
type Collection struct {
// schemaVersionKey is the schema version datastore key at the time of commit.
//
// It can be used to identify the collection datastructure state at the time of commit.
schemaVersionKey keys.CollectionSchemaVersionKey
}

var _ core.ReplicatedData = (*Collection)(nil)

func NewCollection(schemaVersionKey keys.CollectionSchemaVersionKey) *Collection {
return &Collection{
schemaVersionKey: schemaVersionKey,
}
}

func (c *Collection) Merge(ctx context.Context, other core.Delta) error {
// Collection merges don't actually need to do anything, as the delta is empty,
// and doc-level merges are handled by the document commits.
return nil
}

func (c *Collection) NewDelta() *CollectionDelta {
return &CollectionDelta{
SchemaVersionID: c.schemaVersionKey.SchemaVersionID,
}
}

type CollectionDelta struct {
Priority uint64

// As we do not yet have a global collection id we temporarily rely on the schema
// version id for tracking which collection this belongs to. See:
// https://github.com/sourcenetwork/defradb/issues/3215
SchemaVersionID string
}

var _ core.Delta = (*CollectionDelta)(nil)

func (delta *CollectionDelta) IPLDSchemaBytes() []byte {
return []byte(`
type CollectionDelta struct {
priority Int
schemaVersionID String
}`)
}

func (d *CollectionDelta) GetPriority() uint64 {
return d.Priority
}

func (d *CollectionDelta) SetPriority(priority uint64) {
d.Priority = priority
}
2 changes: 1 addition & 1 deletion internal/core/crdt/composite.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ func NewCompositeDAG(
}

// Set returns a new composite DAG delta CRDT with the given status.
func (c CompositeDAG) Set(status client.DocumentStatus) *CompositeDAGDelta {
func (c CompositeDAG) NewDelta(status client.DocumentStatus) *CompositeDAGDelta {
return &CompositeDAGDelta{
DocID: []byte(c.key.DocID),
SchemaVersionID: c.schemaVersionKey.SchemaVersionID,
Expand Down
22 changes: 22 additions & 0 deletions internal/core/crdt/ipld_union.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ type CRDT struct {
LWWRegDelta *LWWRegDelta
CompositeDAGDelta *CompositeDAGDelta
CounterDelta *CounterDelta
CollectionDelta *CollectionDelta
}

// NewCRDT returns a new CRDT.
Expand All @@ -28,6 +29,8 @@ func NewCRDT(delta core.Delta) CRDT {
return CRDT{CompositeDAGDelta: d}
case *CounterDelta:
return CRDT{CounterDelta: d}
case *CollectionDelta:
return CRDT{CollectionDelta: d}
}
return CRDT{}
}
Expand All @@ -41,6 +44,7 @@ func (c CRDT) IPLDSchemaBytes() []byte {
| LWWRegDelta "lww"
| CompositeDAGDelta "composite"
| CounterDelta "counter"
| CollectionDelta "collection"
} representation keyed`)
}

Expand All @@ -53,6 +57,8 @@ func (c CRDT) GetDelta() core.Delta {
return c.CompositeDAGDelta
case c.CounterDelta != nil:
return c.CounterDelta
case c.CollectionDelta != nil:
return c.CollectionDelta
}
return nil
}
Expand All @@ -66,6 +72,8 @@ func (c CRDT) GetPriority() uint64 {
return c.CompositeDAGDelta.GetPriority()
case c.CounterDelta != nil:
return c.CounterDelta.GetPriority()
case c.CollectionDelta != nil:
return c.CollectionDelta.GetPriority()
}
return 0
}
Expand All @@ -90,6 +98,8 @@ func (c CRDT) GetDocID() []byte {
return c.CompositeDAGDelta.DocID
case c.CounterDelta != nil:
return c.CounterDelta.DocID
case c.CollectionDelta != nil:
return nil
}
return nil
}
Expand All @@ -103,6 +113,8 @@ func (c CRDT) GetSchemaVersionID() string {
return c.CompositeDAGDelta.SchemaVersionID
case c.CounterDelta != nil:
return c.CounterDelta.SchemaVersionID
case c.CollectionDelta != nil:
return c.CollectionDelta.SchemaVersionID
}
return ""
}
Expand Down Expand Up @@ -135,6 +147,11 @@ func (c CRDT) Clone() CRDT {
Nonce: c.CounterDelta.Nonce,
Data: c.CounterDelta.Data,
}
case c.CollectionDelta != nil:
cloned.CollectionDelta = &CollectionDelta{
Priority: c.CollectionDelta.Priority,
SchemaVersionID: c.CollectionDelta.SchemaVersionID,
}
}
return cloned
}
Expand Down Expand Up @@ -172,3 +189,8 @@ func (c CRDT) SetData(data []byte) {
func (c CRDT) IsComposite() bool {
return c.CompositeDAGDelta != nil
}

// IsCollection returns true if the CRDT is a collection CRDT.
func (c CRDT) IsCollection() bool {
return c.CollectionDelta != nil
}
23 changes: 23 additions & 0 deletions internal/db/collection.go
Original file line number Diff line number Diff line change
Expand Up @@ -711,6 +711,29 @@ func (c *collection) save(
doc.SetHead(link.Cid)
})

if c.def.Description.IsBranchable {
collectionCRDT := merklecrdt.NewMerkleCollection(
txn,
keys.NewCollectionSchemaVersionKey(c.Schema().VersionID, c.ID()),
keys.NewHeadstoreColKey(c.def.Description.RootID),
)

link, headNode, err := collectionCRDT.Save(ctx, []coreblock.DAGLink{{Link: link}})
if err != nil {
return err
}

updateEvent := event.Update{
Cid: link.Cid,
SchemaRoot: c.Schema().Root,
Block: headNode,
}

txn.OnSuccess(func() {
c.db.events.Publish(event.NewMessage(event.UpdateName, updateEvent))
})
}

return nil
}

Expand Down
24 changes: 24 additions & 0 deletions internal/db/collection_delete.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"github.com/sourcenetwork/defradb/client"
"github.com/sourcenetwork/defradb/event"
"github.com/sourcenetwork/defradb/internal/core"
coreblock "github.com/sourcenetwork/defradb/internal/core/block"
"github.com/sourcenetwork/defradb/internal/keys"
merklecrdt "github.com/sourcenetwork/defradb/internal/merkle/crdt"
)
Expand Down Expand Up @@ -162,5 +163,28 @@ func (c *collection) applyDelete(
c.db.events.Publish(event.NewMessage(event.UpdateName, updateEvent))
})

if c.def.Description.IsBranchable {
collectionCRDT := merklecrdt.NewMerkleCollection(
txn,
keys.NewCollectionSchemaVersionKey(c.Schema().VersionID, c.ID()),
keys.NewHeadstoreColKey(c.def.Description.RootID),
)

link, headNode, err := collectionCRDT.Save(ctx, []coreblock.DAGLink{{Link: link}})
if err != nil {
return err
}

updateEvent := event.Update{
Cid: link.Cid,
SchemaRoot: c.Schema().Root,
Block: headNode,
}

txn.OnSuccess(func() {
c.db.events.Publish(event.NewMessage(event.UpdateName, updateEvent))
})
}

return nil
}
21 changes: 21 additions & 0 deletions internal/db/definition_validation.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,7 @@ var collectionUpdateValidators = append(
validateIDExists,
validateSchemaVersionIDNotMutated,
validateCollectionNotRemoved,
validateCollectionIsBranchableNotMutated,
),
globalValidators...,
)
Expand Down Expand Up @@ -1036,3 +1037,23 @@ func validateCollectionFieldDefaultValue(

return nil
}

// validateCollectionIsBranchableNotMutated is a temporary restriction that prevents users from toggling
// whether or not a collection is branchable.
// https://github.com/sourcenetwork/defradb/issues/3219
func validateCollectionIsBranchableNotMutated(
ctx context.Context,
db *db,
newState *definitionState,
oldState *definitionState,
) error {
for _, newCol := range newState.collections {
oldCol := oldState.collectionsByID[newCol.ID]

if newCol.IsBranchable != oldCol.IsBranchable {
return NewErrColMutatingIsBranchable(newCol.Name.Value())
}
}

return nil
}
9 changes: 9 additions & 0 deletions internal/db/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,7 @@ const (
errFailedToHandleEncKeysReceivedEvent string = "failed to handle encryption-keys-received event"
errSelfReferenceWithoutSelf string = "must specify 'Self' kind for self referencing relations"
errColNotMaterialized string = "non-materialized collections are not supported"
errColMutatingIsBranchable string = "mutating IsBranchable is not supported"
errMaterializedViewAndACPNotSupported string = "materialized views do not support ACP"
errInvalidDefaultFieldValue string = "default field value is invalid"
errDocIDNotFound string = "docID not found"
Expand Down Expand Up @@ -156,6 +157,7 @@ var (
ErrTimeoutDocRetry = errors.New("timeout while retrying doc")
ErrDocIDNotFound = errors.New(errDocIDNotFound)
ErrorCollectionWithSchemaRootNotFound = errors.New(errCollectionWithSchemaRootNotFound)
ErrColMutatingIsBranchable = errors.New(errColMutatingIsBranchable)
)

// NewErrFailedToGetHeads returns a new error indicating that the heads of a document
Expand Down Expand Up @@ -680,6 +682,13 @@ func NewErrColNotMaterialized(collection string) error {
)
}

func NewErrColMutatingIsBranchable(collection string) error {
return errors.New(
errColMutatingIsBranchable,
errors.NewKV("Collection", collection),
)
}

func NewErrMaterializedViewAndACPNotSupported(collection string) error {
return errors.New(
errMaterializedViewAndACPNotSupported,
Expand Down
Loading

0 comments on commit 85bbdc0

Please sign in to comment.