diff --git a/core/key.go b/core/key.go index c7c050897c..3d8873e2e7 100644 --- a/core/key.go +++ b/core/key.go @@ -11,6 +11,7 @@ package core import ( + "fmt" "strconv" "strings" @@ -41,18 +42,18 @@ const ( ) const ( - COLLECTION = "/collection/names" - COLLECTION_SCHEMA = "/collection/schema" - COLLECTION_SCHEMA_VERSION = "/collection/version/v" - COLLECTION_SCHEMA_VERSION_HISTORY = "/collection/version/h" - COLLECTION_INDEX = "/collection/index" - SCHEMA_MIGRATION = "/schema/migration" - SCHEMA_VERSION = "/schema/version" - SEQ = "/seq" - PRIMARY_KEY = "/pk" - DATASTORE_DOC_VERSION_FIELD_ID = "v" - REPLICATOR = "/replicator/id" - P2P_COLLECTION = "/p2p/collection" + COLLECTION = "/collection/id" + COLLECTION_NAME = "/collection/name" + COLLECTION_SCHEMA_VERSION = "/collection/version" + COLLECTION_INDEX = "/collection/index" + SCHEMA_MIGRATION = "/schema/migration" + SCHEMA_VERSION = "/schema/version/v" + SCHEMA_VERSION_HISTORY = "/schema/version/h" + SEQ = "/seq" + PRIMARY_KEY = "/pk" + DATASTORE_DOC_VERSION_FIELD_ID = "v" + REPLICATOR = "/replicator/id" + P2P_COLLECTION = "/p2p/collection" ) // Key is an interface that represents a key in the database. @@ -99,26 +100,32 @@ type HeadStoreKey struct { var _ Key = (*HeadStoreKey)(nil) -// CollectionKey points to the current/'head' SchemaVersionId for -// the collection of the given name. +// CollectionKey points to the json serialized description of the +// the collection of the given ID. type CollectionKey struct { - CollectionName string + CollectionID uint32 } var _ Key = (*CollectionKey)(nil) -// CollectionSchemaKey points to the current/'head' SchemaVersionId for -// the collection of the given schema id. -type CollectionSchemaKey struct { - SchemaId string +// CollectionNameKey points to the ID of the collection of the given +// name. +type CollectionNameKey struct { + Name string } -var _ Key = (*CollectionSchemaKey)(nil) +var _ Key = (*CollectionNameKey)(nil) -// CollectionSchemaVersionKey points to schema of a collection at a given -// version. +// CollectionSchemaVersionKey points to nil, but the keys/prefix can be used +// to get collections that are using, or have used a given schema version. +// +// If a collection is updated to a different schema version, the old entry(s) +// of this key will be preserved. +// +// This key should be removed in https://github.com/sourcenetwork/defradb/issues/1085 type CollectionSchemaVersionKey struct { SchemaVersionId string + CollectionID uint32 } var _ Key = (*CollectionSchemaVersionKey)(nil) @@ -255,21 +262,32 @@ func NewHeadStoreKey(key string) (HeadStoreKey, error) { // Returns a formatted collection key for the system data store. // It assumes the name of the collection is non-empty. -func NewCollectionKey(name string) CollectionKey { - return CollectionKey{CollectionName: name} +func NewCollectionKey(id uint32) CollectionKey { + return CollectionKey{CollectionID: id} } -func NewCollectionSchemaKey(schemaId string) CollectionSchemaKey { - return CollectionSchemaKey{SchemaId: schemaId} +func NewCollectionNameKey(name string) CollectionNameKey { + return CollectionNameKey{Name: name} } -func NewCollectionSchemaVersionKey(schemaVersionId string) CollectionSchemaVersionKey { - return CollectionSchemaVersionKey{SchemaVersionId: schemaVersionId} +func NewCollectionSchemaVersionKey(schemaVersionId string, collectionID uint32) CollectionSchemaVersionKey { + return CollectionSchemaVersionKey{ + SchemaVersionId: schemaVersionId, + CollectionID: collectionID, + } } -func NewCollectionSchemaVersionKeyFromString(key string) CollectionSchemaVersionKey { +func NewCollectionSchemaVersionKeyFromString(key string) (CollectionSchemaVersionKey, error) { elements := strings.Split(key, "/") - return CollectionSchemaVersionKey{SchemaVersionId: elements[len(elements)-1]} + colID, err := strconv.Atoi(elements[len(elements)-1]) + if err != nil { + return CollectionSchemaVersionKey{}, err + } + + return CollectionSchemaVersionKey{ + SchemaVersionId: elements[len(elements)-2], + CollectionID: uint32(colID), + }, nil } // NewCollectionIndexKey creates a new CollectionIndexKey from a collection name and index name. @@ -338,7 +356,7 @@ func NewSchemaVersionMigrationKey(schemaVersionID string) SchemaVersionMigration } func NewSchemaHistoryKeyFromString(keyString string) (SchemaHistoryKey, error) { - keyString = strings.TrimPrefix(keyString, COLLECTION_SCHEMA_VERSION_HISTORY+"/") + keyString = strings.TrimPrefix(keyString, SCHEMA_VERSION_HISTORY+"/") elements := strings.Split(keyString, "/") if len(elements) != 2 { return SchemaHistoryKey{}, ErrInvalidKey @@ -591,13 +609,7 @@ func (k PrimaryDataStoreKey) ToString() string { } func (k CollectionKey) ToString() string { - result := COLLECTION - - if k.CollectionName != "" { - result = result + "/" + k.CollectionName - } - - return result + return fmt.Sprintf("%s/%s", COLLECTION, strconv.Itoa(int(k.CollectionID))) } func (k CollectionKey) Bytes() []byte { @@ -608,21 +620,15 @@ func (k CollectionKey) ToDS() ds.Key { return ds.NewKey(k.ToString()) } -func (k CollectionSchemaKey) ToString() string { - result := COLLECTION_SCHEMA - - if k.SchemaId != "" { - result = result + "/" + k.SchemaId - } - - return result +func (k CollectionNameKey) ToString() string { + return fmt.Sprintf("%s/%s", COLLECTION_NAME, k.Name) } -func (k CollectionSchemaKey) Bytes() []byte { +func (k CollectionNameKey) Bytes() []byte { return []byte(k.ToString()) } -func (k CollectionSchemaKey) ToDS() ds.Key { +func (k CollectionNameKey) ToDS() ds.Key { return ds.NewKey(k.ToString()) } @@ -633,6 +639,10 @@ func (k CollectionSchemaVersionKey) ToString() string { result = result + "/" + k.SchemaVersionId } + if k.CollectionID != 0 { + result = fmt.Sprintf("%s/%s", result, strconv.Itoa(int(k.CollectionID))) + } + return result } @@ -663,7 +673,7 @@ func (k SchemaVersionKey) ToDS() ds.Key { } func (k SchemaHistoryKey) ToString() string { - result := COLLECTION_SCHEMA_VERSION_HISTORY + result := SCHEMA_VERSION_HISTORY if k.SchemaID != "" { result = result + "/" + k.SchemaID diff --git a/core/parser.go b/core/parser.go index 300f4411a4..61b19def1c 100644 --- a/core/parser.go +++ b/core/parser.go @@ -54,5 +54,7 @@ type Parser interface { ParseSDL(ctx context.Context, schemaString string) ([]client.CollectionDefinition, error) // Adds the given schema to this parser's model. + // + // All collections should be provided, not just new/updated ones. SetSchema(ctx context.Context, txn datastore.Txn, collections []client.CollectionDefinition) error } diff --git a/db/collection.go b/db/collection.go index feeff441c5..79c13b9905 100644 --- a/db/collection.go +++ b/db/collection.go @@ -13,7 +13,6 @@ package db import ( "bytes" "context" - "encoding/json" "fmt" "strconv" "strings" @@ -98,9 +97,7 @@ func (db *db) createCollection( schema := def.Schema desc := def.Description - // check if collection by this name exists - collectionKey := core.NewCollectionKey(desc.Name) - exists, err := txn.Systemstore().Has(ctx, collectionKey.ToDS()) + exists, err := description.HasCollectionByName(ctx, txn, desc.Name) if err != nil { return nil, err } @@ -124,27 +121,7 @@ func (db *db) createCollection( } desc.SchemaVersionID = schema.VersionID - // buffer must include all the ids, as it is saved and loaded from the store later. - buf, err := json.Marshal(desc) - if err != nil { - return nil, err - } - - collectionSchemaVersionKey := core.NewCollectionSchemaVersionKey(schema.VersionID) - // Whilst the schemaVersionKey is global, the data persisted at the key's location - // is local to the node (the global only elements are not useful beyond key generation). - err = txn.Systemstore().Put(ctx, collectionSchemaVersionKey.ToDS(), buf) - if err != nil { - return nil, err - } - - collectionSchemaKey := core.NewCollectionSchemaKey(schema.SchemaID) - err = txn.Systemstore().Put(ctx, collectionSchemaKey.ToDS(), []byte(schema.VersionID)) - if err != nil { - return nil, err - } - - err = txn.Systemstore().Put(ctx, collectionKey.ToDS(), []byte(schema.VersionID)) + desc, err = description.SaveCollection(ctx, txn, desc) if err != nil { return nil, err } @@ -174,7 +151,7 @@ func (db *db) updateSchema( proposedDescriptionsByName map[string]client.SchemaDescription, schema client.SchemaDescription, setAsDefaultVersion bool, -) (client.Collection, error) { +) error { hasChanged, err := db.validateUpdateSchema( ctx, txn, @@ -183,11 +160,11 @@ func (db *db) updateSchema( schema, ) if err != nil { - return nil, err + return err } if !hasChanged { - return db.getCollectionByName(ctx, txn, schema.Name) + return nil } for _, field := range schema.Fields { @@ -212,39 +189,34 @@ func (db *db) updateSchema( } } + previousVersionID := schema.VersionID schema, err = description.CreateSchemaVersion(ctx, txn, schema) if err != nil { - return nil, err + return err } - col, err := db.getCollectionByName(ctx, txn, schema.Name) - if err != nil { - return nil, err - } - desc := col.Description() - desc.SchemaVersionID = schema.VersionID + if setAsDefaultVersion { + cols, err := description.GetCollectionsBySchemaVersionID(ctx, txn, previousVersionID) + if err != nil { + return err + } - buf, err := json.Marshal(desc) - if err != nil { - return nil, err - } + for _, col := range cols { + col.SchemaVersionID = schema.VersionID - collectionSchemaVersionKey := core.NewCollectionSchemaVersionKey(schema.VersionID) - // Whilst the schemaVersionKey is global, the data persisted at the key's location - // is local to the node (the global only elements are not useful beyond key generation). - err = txn.Systemstore().Put(ctx, collectionSchemaVersionKey.ToDS(), buf) - if err != nil { - return nil, err - } + col, err = description.SaveCollection(ctx, txn, col) + if err != nil { + return err + } - if setAsDefaultVersion { - err = db.setDefaultSchemaVersionExplicit(ctx, txn, desc.Name, schema.SchemaID, schema.VersionID) - if err != nil { - return nil, err + err = db.setDefaultSchemaVersionExplicit(ctx, txn, col.Name, schema.VersionID) + if err != nil { + return err + } } } - return db.getCollectionByName(ctx, txn, desc.Name) + return nil } // validateUpdateSchema validates that the given schema description is a valid update. @@ -486,21 +458,28 @@ func (db *db) setDefaultSchemaVersion( txn datastore.Txn, schemaVersionID string, ) error { - // This call makes no sense at the moment, but needs to be done due to the bad way we currently store - // collections. - // https://github.com/sourcenetwork/defradb/issues/1964 - collections, err := db.getCollectionsByVersionID(ctx, txn, schemaVersionID) + if schemaVersionID == "" { + return ErrSchemaVersionIDEmpty + } + + schema, err := description.GetSchemaVersion(ctx, txn, schemaVersionID) if err != nil { return err } - col := collections[0] - desc := col.Description() - err = db.setDefaultSchemaVersionExplicit(ctx, txn, desc.Name, col.Schema().SchemaID, schemaVersionID) + colDescs, err := description.GetCollectionsBySchemaID(ctx, txn, schema.SchemaID) if err != nil { return err } + for _, col := range colDescs { + col.SchemaVersionID = schemaVersionID + col, err = description.SaveCollection(ctx, txn, col) + if err != nil { + return err + } + } + cols, err := db.getAllCollections(ctx, txn) if err != nil { return err @@ -518,17 +497,21 @@ func (db *db) setDefaultSchemaVersionExplicit( ctx context.Context, txn datastore.Txn, collectionName string, - schemaID string, schemaVersionID string, ) error { - collectionSchemaKey := core.NewCollectionSchemaKey(schemaID) - err := txn.Systemstore().Put(ctx, collectionSchemaKey.ToDS(), []byte(schemaVersionID)) + if schemaVersionID == "" { + return ErrSchemaVersionIDEmpty + } + + col, err := description.GetCollectionByName(ctx, txn, collectionName) if err != nil { return err } - collectionKey := core.NewCollectionKey(collectionName) - return txn.Systemstore().Put(ctx, collectionKey.ToDS(), []byte(schemaVersionID)) + col.SchemaVersionID = schemaVersionID + + _, err = description.SaveCollection(ctx, txn, col) + return err } // getCollectionsByVersionId returns the [*collection]s at the given [schemaVersionId] version. @@ -539,35 +522,27 @@ func (db *db) getCollectionsByVersionID( txn datastore.Txn, schemaVersionId string, ) ([]*collection, error) { - if schemaVersionId == "" { - return nil, ErrSchemaVersionIDEmpty - } - - key := core.NewCollectionSchemaVersionKey(schemaVersionId) - buf, err := txn.Systemstore().Get(ctx, key.ToDS()) + cols, err := description.GetCollectionsBySchemaVersionID(ctx, txn, schemaVersionId) if err != nil { return nil, err } - var desc client.CollectionDescription - err = json.Unmarshal(buf, &desc) - if err != nil { - return nil, err - } - - schema, err := description.GetSchemaVersion(ctx, txn, desc.SchemaVersionID) - if err != nil { - return nil, err - } + collections := make([]*collection, len(cols)) + for i, col := range cols { + schema, err := description.GetSchemaVersion(ctx, txn, col.SchemaVersionID) + if err != nil { + return nil, err + } - col := db.newCollection(desc, schema) + collections[i] = db.newCollection(col, schema) - err = col.loadIndexes(ctx, txn) - if err != nil { - return nil, err + err = collections[i].loadIndexes(ctx, txn) + if err != nil { + return nil, err + } } - return []*collection{col}, nil + return collections, nil } // getCollectionByName returns an existing collection within the database. @@ -576,25 +551,23 @@ func (db *db) getCollectionByName(ctx context.Context, txn datastore.Txn, name s return nil, ErrCollectionNameEmpty } - key := core.NewCollectionKey(name) - buf, err := txn.Systemstore().Get(ctx, key.ToDS()) + col, err := description.GetCollectionByName(ctx, txn, name) if err != nil { return nil, err } - schemaVersionId := string(buf) - // This call makes no sense at the moment, but needs to be done due to the bad way we currently store - // collections. - // https://github.com/sourcenetwork/defradb/issues/1964 - cols, err := db.getCollectionsByVersionID(ctx, txn, schemaVersionId) + schema, err := description.GetSchemaVersion(ctx, txn, col.SchemaVersionID) if err != nil { return nil, err } - if len(cols) == 0 { - return nil, NewErrFailedToGetCollection(schemaVersionId, err) + + collection := db.newCollection(col, schema) + err = collection.loadIndexes(ctx, txn) + if err != nil { + return nil, err } - return cols[0], nil + return collection, nil } // getCollectionsBySchemaID returns all existing collections using the schema hash ID. @@ -607,21 +580,25 @@ func (db *db) getCollectionsBySchemaID( return nil, ErrSchemaIDEmpty } - key := core.NewCollectionSchemaKey(schemaID) - buf, err := txn.Systemstore().Get(ctx, key.ToDS()) - if err != nil { - return nil, err - } - - schemaVersionId := string(buf) - cols, err := db.getCollectionsByVersionID(ctx, txn, schemaVersionId) + cols, err := description.GetCollectionsBySchemaID(ctx, txn, schemaID) if err != nil { return nil, err } collections := make([]client.Collection, len(cols)) for i, col := range cols { - collections[i] = col + schema, err := description.GetSchemaVersion(ctx, txn, col.SchemaVersionID) + if err != nil { + return nil, err + } + + collection := db.newCollection(col, schema) + collections[i] = collection + + err = collection.loadIndexes(ctx, txn) + if err != nil { + return nil, err + } } return collections, nil @@ -629,42 +606,28 @@ func (db *db) getCollectionsBySchemaID( // getAllCollections gets all the currently defined collections. func (db *db) getAllCollections(ctx context.Context, txn datastore.Txn) ([]client.Collection, error) { - // create collection system prefix query - prefix := core.NewCollectionKey("") - q, err := txn.Systemstore().Query(ctx, query.Query{ - Prefix: prefix.ToString(), - }) + cols, err := description.GetCollections(ctx, txn) if err != nil { - return nil, NewErrFailedToCreateCollectionQuery(err) + return nil, err } - defer func() { - if err := q.Close(); err != nil { - log.ErrorE(ctx, "Failed to close collection query", err) - } - }() - cols := make([]client.Collection, 0) - for res := range q.Next() { - if res.Error != nil { + collections := make([]client.Collection, len(cols)) + for i, col := range cols { + schema, err := description.GetSchemaVersion(ctx, txn, col.SchemaVersionID) + if err != nil { return nil, err } - schemaVersionId := string(res.Value) - // This call makes no sense at the moment, but needs to be done due to the bad way we currently store - // collections. - // https://github.com/sourcenetwork/defradb/issues/1964 - collections, err := db.getCollectionsByVersionID(ctx, txn, schemaVersionId) + collection := db.newCollection(col, schema) + collections[i] = collection + + err = collection.loadIndexes(ctx, txn) if err != nil { - return nil, NewErrFailedToGetCollection(schemaVersionId, err) - } - if len(collections) == 0 { - return nil, NewErrFailedToGetCollection(schemaVersionId, err) + return nil, err } - - cols = append(cols, collections[0]) } - return cols, nil + return collections, nil } // GetAllDocKeys returns all the document keys that exist in the collection. @@ -1266,7 +1229,7 @@ func (c *collection) saveValueToMerkleCRDT( merkleCRDT, err := c.db.crdtFactory.InstanceWithStores( txn, - core.NewCollectionSchemaVersionKey(schema.VersionID), + core.NewCollectionSchemaVersionKey(schema.VersionID, c.ID()), c.db.events.Updates, ctype, key, @@ -1291,7 +1254,7 @@ func (c *collection) saveValueToMerkleCRDT( key = key.WithFieldId(core.COMPOSITE_NAMESPACE) merkleCRDT, err := c.db.crdtFactory.InstanceWithStores( txn, - core.NewCollectionSchemaVersionKey(c.Schema().VersionID), + core.NewCollectionSchemaVersionKey(c.Schema().VersionID, c.ID()), c.db.events.Updates, ctype, key, diff --git a/db/description/collection.go b/db/description/collection.go new file mode 100644 index 0000000000..e4b78a8bd9 --- /dev/null +++ b/db/description/collection.go @@ -0,0 +1,230 @@ +// Copyright 2023 Democratized Data Foundation +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package description + +import ( + "context" + "encoding/json" + + "github.com/ipfs/go-datastore/query" + + "github.com/sourcenetwork/defradb/client" + "github.com/sourcenetwork/defradb/core" + "github.com/sourcenetwork/defradb/datastore" +) + +// SaveCollection saves the given collection to the system store overwriting any +// pre-existing values. +func SaveCollection( + ctx context.Context, + txn datastore.Txn, + desc client.CollectionDescription, +) (client.CollectionDescription, error) { + buf, err := json.Marshal(desc) + if err != nil { + return client.CollectionDescription{}, err + } + + key := core.NewCollectionKey(desc.ID) + err = txn.Systemstore().Put(ctx, key.ToDS(), buf) + if err != nil { + return client.CollectionDescription{}, err + } + + idBuf, err := json.Marshal(desc.ID) + if err != nil { + return client.CollectionDescription{}, err + } + + nameKey := core.NewCollectionNameKey(desc.Name) + err = txn.Systemstore().Put(ctx, nameKey.ToDS(), idBuf) + if err != nil { + return client.CollectionDescription{}, err + } + + // The need for this key is temporary, we should replace it with the global collection ID + // https://github.com/sourcenetwork/defradb/issues/1085 + schemaVersionKey := core.NewCollectionSchemaVersionKey(desc.SchemaVersionID, desc.ID) + err = txn.Systemstore().Put(ctx, schemaVersionKey.ToDS(), []byte{}) + if err != nil { + return client.CollectionDescription{}, err + } + + return desc, nil +} + +// GetCollectionByName returns the collection with the given name. +// +// If no collection of that name is found, it will return an error. +func GetCollectionByName( + ctx context.Context, + txn datastore.Txn, + name string, +) (client.CollectionDescription, error) { + nameKey := core.NewCollectionNameKey(name) + idBuf, err := txn.Systemstore().Get(ctx, nameKey.ToDS()) + if err != nil { + return client.CollectionDescription{}, err + } + + var id uint32 + err = json.Unmarshal(idBuf, &id) + if err != nil { + return client.CollectionDescription{}, err + } + + key := core.NewCollectionKey(id) + buf, err := txn.Systemstore().Get(ctx, key.ToDS()) + if err != nil { + return client.CollectionDescription{}, err + } + + var col client.CollectionDescription + err = json.Unmarshal(buf, &col) + if err != nil { + return client.CollectionDescription{}, err + } + + return col, nil +} + +// GetCollectionsBySchemaVersionID returns all collections that use the given +// schemaVersionID. +// +// If no collections are found an empty set will be returned. +func GetCollectionsBySchemaVersionID( + ctx context.Context, + txn datastore.Txn, + schemaVersionID string, +) ([]client.CollectionDescription, error) { + schemaVersionKey := core.NewCollectionSchemaVersionKey(schemaVersionID, 0) + + schemaVersionQuery, err := txn.Systemstore().Query(ctx, query.Query{ + Prefix: schemaVersionKey.ToString(), + KeysOnly: true, + }) + if err != nil { + return nil, NewErrFailedToCreateCollectionQuery(err) + } + + colIDs := make([]uint32, 0) + for res := range schemaVersionQuery.Next() { + if res.Error != nil { + if err := schemaVersionQuery.Close(); err != nil { + return nil, NewErrFailedToCloseSchemaQuery(err) + } + return nil, err + } + + colSchemaVersionKey, err := core.NewCollectionSchemaVersionKeyFromString(string(res.Key)) + if err != nil { + if err := schemaVersionQuery.Close(); err != nil { + return nil, NewErrFailedToCloseSchemaQuery(err) + } + return nil, err + } + + colIDs = append(colIDs, colSchemaVersionKey.CollectionID) + } + + cols := make([]client.CollectionDescription, len(colIDs)) + for i, colID := range colIDs { + key := core.NewCollectionKey(colID) + buf, err := txn.Systemstore().Get(ctx, key.ToDS()) + if err != nil { + return nil, err + } + + var col client.CollectionDescription + err = json.Unmarshal(buf, &col) + if err != nil { + return nil, err + } + + cols[i] = col + } + + return cols, nil +} + +// GetCollectionsBySchemaID returns all collections that use the given +// schemaID. +// +// If no collections are found an empty set will be returned. +func GetCollectionsBySchemaID( + ctx context.Context, + txn datastore.Txn, + schemaID string, +) ([]client.CollectionDescription, error) { + schemaVersionIDs, err := GetSchemaVersionIDs(ctx, txn, schemaID) + if err != nil { + return nil, err + } + + cols := []client.CollectionDescription{} + for _, schemaVersionID := range schemaVersionIDs { + versionCols, err := GetCollectionsBySchemaVersionID(ctx, txn, schemaVersionID) + if err != nil { + return nil, err + } + + cols = append(cols, versionCols...) + } + + return cols, nil +} + +// GetCollections returns all collections in the system. +func GetCollections( + ctx context.Context, + txn datastore.Txn, +) ([]client.CollectionDescription, error) { + q, err := txn.Systemstore().Query(ctx, query.Query{ + Prefix: core.COLLECTION, + }) + if err != nil { + return nil, NewErrFailedToCreateCollectionQuery(err) + } + + cols := make([]client.CollectionDescription, 0) + for res := range q.Next() { + if res.Error != nil { + if err := q.Close(); err != nil { + return nil, NewErrFailedToCloseCollectionQuery(err) + } + return nil, err + } + + var col client.CollectionDescription + err = json.Unmarshal(res.Value, &col) + if err != nil { + if err := q.Close(); err != nil { + return nil, NewErrFailedToCloseCollectionQuery(err) + } + return nil, err + } + + cols = append(cols, col) + } + + return cols, nil +} + +// HasCollectionByName returns true if there is a collection of the given name, +// else returns false. +func HasCollectionByName( + ctx context.Context, + txn datastore.Txn, + name string, +) (bool, error) { + nameKey := core.NewCollectionNameKey(name) + return txn.Systemstore().Has(ctx, nameKey.ToDS()) +} diff --git a/db/description/errors.go b/db/description/errors.go index 7ca524e81d..72bd63908a 100644 --- a/db/description/errors.go +++ b/db/description/errors.go @@ -13,8 +13,10 @@ package description import "github.com/sourcenetwork/defradb/errors" const ( - errFailedToCreateSchemaQuery string = "failed to create schema prefix query" - errFailedToCloseSchemaQuery string = "failed to close schema prefix query" + errFailedToCreateSchemaQuery string = "failed to create schema prefix query" + errFailedToCloseSchemaQuery string = "failed to close schema prefix query" + errFailedToCreateCollectionQuery string = "failed to create collection prefix query" + errFailedToCloseCollectionQuery string = "failed to close collection prefix query" ) // NewErrFailedToCreateSchemaQuery returns a new error indicating that the query @@ -28,3 +30,15 @@ func NewErrFailedToCreateSchemaQuery(inner error) error { func NewErrFailedToCloseSchemaQuery(inner error) error { return errors.Wrap(errFailedToCloseSchemaQuery, inner) } + +// NewErrFailedToCreateCollectionQuery returns a new error indicating that the query +// to create a collection failed. +func NewErrFailedToCreateCollectionQuery(inner error) error { + return errors.Wrap(errFailedToCreateCollectionQuery, inner) +} + +// NewErrFailedToCreateCollectionQuery returns a new error indicating that the query +// to create a collection failed to close. +func NewErrFailedToCloseCollectionQuery(inner error) error { + return errors.Wrap(errFailedToCloseCollectionQuery, inner) +} diff --git a/db/description/schema.go b/db/description/schema.go index 5504c11ccf..08e030cb65 100644 --- a/db/description/schema.go +++ b/db/description/schema.go @@ -110,29 +110,14 @@ func GetSchemas( ctx context.Context, txn datastore.Txn, ) ([]client.SchemaDescription, error) { - collectionSchemaVersionPrefix := core.NewCollectionSchemaVersionKey("") - collectionSchemaVersionQuery, err := txn.Systemstore().Query(ctx, query.Query{ - Prefix: collectionSchemaVersionPrefix.ToString(), - KeysOnly: true, - }) + cols, err := GetCollections(ctx, txn) if err != nil { - return nil, NewErrFailedToCreateSchemaQuery(err) + return nil, err } versionIDs := make([]string, 0) - for res := range collectionSchemaVersionQuery.Next() { - if res.Error != nil { - if err := collectionSchemaVersionQuery.Close(); err != nil { - return nil, NewErrFailedToCloseSchemaQuery(err) - } - return nil, err - } - - versionIDs = append(versionIDs, core.NewCollectionSchemaVersionKeyFromString(string(res.Key)).SchemaVersionId) - } - - if err := collectionSchemaVersionQuery.Close(); err != nil { - return nil, NewErrFailedToCloseSchemaQuery(err) + for _, col := range cols { + versionIDs = append(versionIDs, col.SchemaVersionID) } schemaVersionPrefix := core.NewSchemaVersionKey("") @@ -175,3 +160,43 @@ func GetSchemas( return descriptions, nil } + +func GetSchemaVersionIDs( + ctx context.Context, + txn datastore.Txn, + schemaID string, +) ([]string, error) { + // Add the schemaID as the first version here. + // It is not present in the history prefix. + schemaVersions := []string{schemaID} + + prefix := core.NewSchemaHistoryKey(schemaID, "") + q, err := txn.Systemstore().Query(ctx, query.Query{ + Prefix: prefix.ToString(), + KeysOnly: true, + }) + if err != nil { + return nil, NewErrFailedToCreateSchemaQuery(err) + } + + for res := range q.Next() { + if res.Error != nil { + if err := q.Close(); err != nil { + return nil, NewErrFailedToCloseSchemaQuery(err) + } + return nil, err + } + + key, err := core.NewSchemaHistoryKeyFromString(res.Key) + if err != nil { + if err := q.Close(); err != nil { + return nil, NewErrFailedToCloseSchemaQuery(err) + } + return nil, err + } + + schemaVersions = append(schemaVersions, key.PreviousSchemaVersionID) + } + + return schemaVersions, nil +} diff --git a/db/index_test.go b/db/index_test.go index 70619d70f6..e5682b551c 100644 --- a/db/index_test.go +++ b/db/index_test.go @@ -48,8 +48,6 @@ const ( testUsersColIndexName = "user_name" testUsersColIndexAge = "user_age" testUsersColIndexWeight = "user_weight" - - userColVersionID = "bafkreiefzlx2xsfaxixs24hcqwwqpa3nuqbutkapasymk3d5v4fxa4rlhy" ) type indexTestFixture struct { @@ -470,27 +468,6 @@ func TestCreateIndex_ShouldUpdateCollectionsDescription(t *testing.T) { f.users.Description().Indexes) } -func TestCreateIndex_NewCollectionDescription_ShouldIncludeIndexDescription(t *testing.T) { - f := newIndexTestFixture(t) - - _, err := f.createCollectionIndex(getUsersIndexDescOnName()) - require.NoError(t, err) - - desc := getUsersIndexDescOnAge() - desc.Name = "" - _, err = f.createCollectionIndex(desc) - require.NoError(t, err) - - cols, err := f.db.getAllCollections(f.ctx, f.txn) - require.NoError(t, err) - - require.Equal(t, 1, len(cols)) - col := cols[0] - require.Equal(t, 2, len(col.Description().Indexes)) - require.NotEmpty(t, col.Description().Indexes[0].Name) - require.NotEmpty(t, col.Description().Indexes[1].Name) -} - func TestCreateIndex_IfAttemptToIndexOnUnsupportedType_ReturnError(t *testing.T) { f := newIndexTestFixtureBare(t) @@ -521,36 +498,6 @@ func TestCreateIndex_IfAttemptToIndexOnUnsupportedType_ReturnError(t *testing.T) f.commitTxn() } -func TestCreateIndex_IfFailedToReadIndexUponRetrievingCollectionDesc_ReturnError(t *testing.T) { - f := newIndexTestFixture(t) - - testErr := errors.New("test error") - - mockedTxn := f.mockTxn().ClearSystemStore() - onSystemStore := mockedTxn.MockSystemstore.EXPECT() - - colIndexKey := core.NewCollectionIndexKey(f.users.Description().Name, "") - matchPrefixFunc := func(q query.Query) bool { - res := q.Prefix == colIndexKey.ToDS().String() - return res - } - - onSystemStore.Query(mock.Anything, mock.MatchedBy(matchPrefixFunc)).Return(nil, testErr) - - descData, err := json.Marshal(f.users.Description()) - require.NoError(t, err) - - onSystemStore.Query(mock.Anything, mock.Anything). - Return(mocks.NewQueryResultsWithValues(t, []byte("schemaID")), nil) - onSystemStore.Get(mock.Anything, mock.Anything).Unset() - onSystemStore.Get(mock.Anything, mock.Anything).Return(descData, nil) - - f.stubSystemStore(onSystemStore) - - _, err = f.db.getAllCollections(f.ctx, f.txn) - require.ErrorIs(t, err, testErr) -} - func TestGetIndexes_ShouldReturnListOfAllExistingIndexes(t *testing.T) { f := newIndexTestFixture(t) diff --git a/db/indexed_docs_test.go b/db/indexed_docs_test.go index f4d621282d..6503429c96 100644 --- a/db/indexed_docs_test.go +++ b/db/indexed_docs_test.go @@ -230,17 +230,6 @@ func (f *indexTestFixture) stubSystemStore(systemStoreOn *mocks.DSReaderWriter_E systemStoreOn.Query(mock.Anything, mock.Anything).Maybe(). Return(mocks.NewQueryResultsWithValues(f.t), nil) - colKey := core.NewCollectionKey(usersColName) - systemStoreOn.Get(mock.Anything, colKey.ToDS()).Maybe().Return([]byte(userColVersionID), nil) - - colVersionIDKey := core.NewCollectionSchemaVersionKey(userColVersionID) - usersCol, err := f.db.GetCollectionByName(f.ctx, usersColName) - require.NoError(f.t, err) - colDesc := usersCol.Description() - colDescBytes, err := json.Marshal(colDesc) - require.NoError(f.t, err) - systemStoreOn.Get(mock.Anything, colVersionIDKey.ToDS()).Maybe().Return(colDescBytes, nil) - colIndexOnNameKey := core.NewCollectionIndexKey(usersColName, testUsersColIndexName) systemStoreOn.Get(mock.Anything, colIndexOnNameKey.ToDS()).Maybe().Return(indexOnNameDescData, nil) diff --git a/db/schema.go b/db/schema.go index f5051f1a00..627602119b 100644 --- a/db/schema.go +++ b/db/schema.go @@ -136,9 +136,8 @@ func (db *db) patchSchema(ctx context.Context, txn datastore.Txn, patchString st return err } - newCollections := []client.CollectionDefinition{} for _, schema := range newSchemaByName { - col, err := db.updateSchema( + err := db.updateSchema( ctx, txn, existingSchemaByName, @@ -149,11 +148,19 @@ func (db *db) patchSchema(ctx context.Context, txn datastore.Txn, patchString st if err != nil { return err } + } + + newCollections, err := db.getAllCollections(ctx, txn) + if err != nil { + return err + } - newCollections = append(newCollections, col.Definition()) + definitions := make([]client.CollectionDefinition, len(newCollections)) + for i, col := range newCollections { + definitions[i] = col.Definition() } - return db.parser.SetSchema(ctx, txn, newCollections) + return db.parser.SetSchema(ctx, txn, definitions) } // substituteSchemaPatch handles any substitution of values that may be required before diff --git a/docs/data_format_changes/i1964-reorg-col-desc-storage.md b/docs/data_format_changes/i1964-reorg-col-desc-storage.md new file mode 100644 index 0000000000..6e88006230 --- /dev/null +++ b/docs/data_format_changes/i1964-reorg-col-desc-storage.md @@ -0,0 +1,3 @@ +# Reorganise collection description storage + +The way collection descriptions are stored and index in Defra has changed, please refer to https://github.com/sourcenetwork/defradb/pull/1988 and https://github.com/sourcenetwork/defradb/issues/1964 for more info. diff --git a/net/peer_collection.go b/net/peer_collection.go index 86b5d9b483..8b01bb4235 100644 --- a/net/peer_collection.go +++ b/net/peer_collection.go @@ -35,6 +35,9 @@ func (p *Peer) AddP2PCollections(ctx context.Context, collectionIDs []string) er if err != nil { return err } + if len(storeCol) == 0 { + return client.NewErrCollectionNotFoundForSchema(col) + } storeCollections = append(storeCollections, storeCol...) } @@ -97,6 +100,9 @@ func (p *Peer) RemoveP2PCollections(ctx context.Context, collectionIDs []string) if err != nil { return err } + if len(storeCol) == 0 { + return client.NewErrCollectionNotFoundForSchema(col) + } storeCollections = append(storeCollections, storeCol...) } diff --git a/net/process.go b/net/process.go index c07800b51f..e3d958c466 100644 --- a/net/process.go +++ b/net/process.go @@ -113,7 +113,7 @@ func initCRDTForType( log.Debug(ctx, "Got CRDT Type", logging.NewKV("CType", ctype), logging.NewKV("Field", field)) return crdt.DefaultFactory.InstanceWithStores( txn, - core.NewCollectionSchemaVersionKey(col.Schema().VersionID), + core.NewCollectionSchemaVersionKey(col.Schema().VersionID, col.ID()), events.EmptyUpdateChannel, ctype, key, diff --git a/tests/integration/backup/one_to_many/export_test.go b/tests/integration/backup/one_to_many/export_test.go index cbba06162b..328d48bd6d 100644 --- a/tests/integration/backup/one_to_many/export_test.go +++ b/tests/integration/backup/one_to_many/export_test.go @@ -57,7 +57,7 @@ func TestBackupExport_AllCollectionsMultipleDocsAndDocUpdate_NoError(t *testing. Doc: `{"age": 31}`, }, testUtils.BackupExport{ - ExpectedContent: `{"Book":[{"_key":"bae-5cf2fec3-d8ed-50d5-8286-39109853d2da","_newKey":"bae-edeade01-2d21-5d6d-aadf-efc5a5279de5","author_id":"bae-807ea028-6c13-5f86-a72b-46e8b715a162","name":"John and the sourcerers' stone"}],"User":[{"_key":"bae-0648f44e-74e8-593b-a662-3310ec278927","_newKey":"bae-0648f44e-74e8-593b-a662-3310ec278927","age":31,"name":"Bob"},{"_key":"bae-e933420a-988a-56f8-8952-6c245aebd519","_newKey":"bae-807ea028-6c13-5f86-a72b-46e8b715a162","age":31,"name":"John"}]}`, + ExpectedContent: `{"User":[{"_key":"bae-0648f44e-74e8-593b-a662-3310ec278927","_newKey":"bae-0648f44e-74e8-593b-a662-3310ec278927","age":31,"name":"Bob"},{"_key":"bae-e933420a-988a-56f8-8952-6c245aebd519","_newKey":"bae-807ea028-6c13-5f86-a72b-46e8b715a162","age":31,"name":"John"}],"Book":[{"_key":"bae-5cf2fec3-d8ed-50d5-8286-39109853d2da","_newKey":"bae-edeade01-2d21-5d6d-aadf-efc5a5279de5","author_id":"bae-807ea028-6c13-5f86-a72b-46e8b715a162","name":"John and the sourcerers' stone"}]}`, }, }, } @@ -90,7 +90,7 @@ func TestBackupExport_AllCollectionsMultipleDocsAndMultipleDocUpdate_NoError(t * Doc: `{"age": 31}`, }, testUtils.BackupExport{ - ExpectedContent: `{"Book":[{"_key":"bae-4399f189-138d-5d49-9e25-82e78463677b","_newKey":"bae-78a40f28-a4b8-5dca-be44-392b0f96d0ff","author_id":"bae-807ea028-6c13-5f86-a72b-46e8b715a162","name":"Game of chains"},{"_key":"bae-5cf2fec3-d8ed-50d5-8286-39109853d2da","_newKey":"bae-edeade01-2d21-5d6d-aadf-efc5a5279de5","author_id":"bae-807ea028-6c13-5f86-a72b-46e8b715a162","name":"John and the sourcerers' stone"}],"User":[{"_key":"bae-0648f44e-74e8-593b-a662-3310ec278927","_newKey":"bae-0648f44e-74e8-593b-a662-3310ec278927","age":31,"name":"Bob"},{"_key":"bae-e933420a-988a-56f8-8952-6c245aebd519","_newKey":"bae-807ea028-6c13-5f86-a72b-46e8b715a162","age":31,"name":"John"}]}`, + ExpectedContent: `{"User":[{"_key":"bae-0648f44e-74e8-593b-a662-3310ec278927","_newKey":"bae-0648f44e-74e8-593b-a662-3310ec278927","age":31,"name":"Bob"},{"_key":"bae-e933420a-988a-56f8-8952-6c245aebd519","_newKey":"bae-807ea028-6c13-5f86-a72b-46e8b715a162","age":31,"name":"John"}],"Book":[{"_key":"bae-4399f189-138d-5d49-9e25-82e78463677b","_newKey":"bae-78a40f28-a4b8-5dca-be44-392b0f96d0ff","author_id":"bae-807ea028-6c13-5f86-a72b-46e8b715a162","name":"Game of chains"},{"_key":"bae-5cf2fec3-d8ed-50d5-8286-39109853d2da","_newKey":"bae-edeade01-2d21-5d6d-aadf-efc5a5279de5","author_id":"bae-807ea028-6c13-5f86-a72b-46e8b715a162","name":"John and the sourcerers' stone"}]}`, }, }, } diff --git a/tests/integration/backup/one_to_one/export_test.go b/tests/integration/backup/one_to_one/export_test.go index 48700907a5..c5bb798643 100644 --- a/tests/integration/backup/one_to_one/export_test.go +++ b/tests/integration/backup/one_to_one/export_test.go @@ -57,7 +57,7 @@ func TestBackupExport_AllCollectionsMultipleDocsAndDocUpdate_NoError(t *testing. Doc: `{"age": 31}`, }, testUtils.BackupExport{ - ExpectedContent: `{"Book":[{"_key":"bae-5cf2fec3-d8ed-50d5-8286-39109853d2da","_newKey":"bae-edeade01-2d21-5d6d-aadf-efc5a5279de5","author_id":"bae-807ea028-6c13-5f86-a72b-46e8b715a162","name":"John and the sourcerers' stone"}],"User":[{"_key":"bae-0648f44e-74e8-593b-a662-3310ec278927","_newKey":"bae-0648f44e-74e8-593b-a662-3310ec278927","age":31,"name":"Bob"},{"_key":"bae-e933420a-988a-56f8-8952-6c245aebd519","_newKey":"bae-807ea028-6c13-5f86-a72b-46e8b715a162","age":31,"name":"John"}]}`, + ExpectedContent: `{"User":[{"_key":"bae-0648f44e-74e8-593b-a662-3310ec278927","_newKey":"bae-0648f44e-74e8-593b-a662-3310ec278927","age":31,"name":"Bob"},{"_key":"bae-e933420a-988a-56f8-8952-6c245aebd519","_newKey":"bae-807ea028-6c13-5f86-a72b-46e8b715a162","age":31,"name":"John"}],"Book":[{"_key":"bae-5cf2fec3-d8ed-50d5-8286-39109853d2da","_newKey":"bae-edeade01-2d21-5d6d-aadf-efc5a5279de5","author_id":"bae-807ea028-6c13-5f86-a72b-46e8b715a162","name":"John and the sourcerers' stone"}]}`, }, }, } @@ -101,7 +101,7 @@ func TestBackupExport_DoubleReletionship_NoError(t *testing.T) { Doc: `{"age": 31}`, }, testUtils.BackupExport{ - ExpectedContent: `{"Book":[{"_key":"bae-45b1def4-4e63-5a93-a1b8-f7b08e682164","_newKey":"bae-add2ccfe-84a1-519c-ab7d-c54b43909532","author_id":"bae-807ea028-6c13-5f86-a72b-46e8b715a162","favourite_id":"bae-0648f44e-74e8-593b-a662-3310ec278927","name":"John and the sourcerers' stone"}],"User":[{"_key":"bae-0648f44e-74e8-593b-a662-3310ec278927","_newKey":"bae-0648f44e-74e8-593b-a662-3310ec278927","age":31,"name":"Bob"},{"_key":"bae-e933420a-988a-56f8-8952-6c245aebd519","_newKey":"bae-807ea028-6c13-5f86-a72b-46e8b715a162","age":31,"name":"John"}]}`, + ExpectedContent: `{"User":[{"_key":"bae-0648f44e-74e8-593b-a662-3310ec278927","_newKey":"bae-0648f44e-74e8-593b-a662-3310ec278927","age":31,"name":"Bob"},{"_key":"bae-e933420a-988a-56f8-8952-6c245aebd519","_newKey":"bae-807ea028-6c13-5f86-a72b-46e8b715a162","age":31,"name":"John"}],"Book":[{"_key":"bae-45b1def4-4e63-5a93-a1b8-f7b08e682164","_newKey":"bae-add2ccfe-84a1-519c-ab7d-c54b43909532","author_id":"bae-807ea028-6c13-5f86-a72b-46e8b715a162","favourite_id":"bae-0648f44e-74e8-593b-a662-3310ec278927","name":"John and the sourcerers' stone"}]}`, }, }, } @@ -149,7 +149,7 @@ func TestBackupExport_DoubleReletionshipWithUpdate_NoError(t *testing.T) { Doc: `{"age": 31}`, }, testUtils.BackupExport{ - ExpectedContent: `{"Book":[{"_key":"bae-45b1def4-4e63-5a93-a1b8-f7b08e682164","_newKey":"bae-add2ccfe-84a1-519c-ab7d-c54b43909532","author_id":"bae-807ea028-6c13-5f86-a72b-46e8b715a162","favourite_id":"bae-0648f44e-74e8-593b-a662-3310ec278927","name":"John and the sourcerers' stone"},{"_key":"bae-da7f2d88-05c4-528a-846a-0d18ab26603b","_newKey":"bae-da7f2d88-05c4-528a-846a-0d18ab26603b","name":"Game of chains"}],"User":[{"_key":"bae-0648f44e-74e8-593b-a662-3310ec278927","_newKey":"bae-0648f44e-74e8-593b-a662-3310ec278927","age":31,"name":"Bob"},{"_key":"bae-e933420a-988a-56f8-8952-6c245aebd519","_newKey":"bae-807ea028-6c13-5f86-a72b-46e8b715a162","age":31,"name":"John"}]}`, + ExpectedContent: `{"User":[{"_key":"bae-0648f44e-74e8-593b-a662-3310ec278927","_newKey":"bae-0648f44e-74e8-593b-a662-3310ec278927","age":31,"name":"Bob"},{"_key":"bae-e933420a-988a-56f8-8952-6c245aebd519","_newKey":"bae-807ea028-6c13-5f86-a72b-46e8b715a162","age":31,"name":"John"}],"Book":[{"_key":"bae-45b1def4-4e63-5a93-a1b8-f7b08e682164","_newKey":"bae-add2ccfe-84a1-519c-ab7d-c54b43909532","author_id":"bae-807ea028-6c13-5f86-a72b-46e8b715a162","favourite_id":"bae-0648f44e-74e8-593b-a662-3310ec278927","name":"John and the sourcerers' stone"},{"_key":"bae-da7f2d88-05c4-528a-846a-0d18ab26603b","_newKey":"bae-da7f2d88-05c4-528a-846a-0d18ab26603b","name":"Game of chains"}]}`, }, }, } diff --git a/tests/integration/net/state/simple/peer/subscribe/with_add_remove_test.go b/tests/integration/net/state/simple/peer/subscribe/with_add_remove_test.go index d3795ee0e1..26dbfdf151 100644 --- a/tests/integration/net/state/simple/peer/subscribe/with_add_remove_test.go +++ b/tests/integration/net/state/simple/peer/subscribe/with_add_remove_test.go @@ -170,7 +170,7 @@ func TestP2PSubscribeAddSingleAndRemoveErroneous(t *testing.T) { testUtils.UnsubscribeToCollection{ NodeID: 1, CollectionIDs: []int{0, testUtils.NonExistentCollectionID}, - ExpectedError: "datastore: key not found", + ExpectedError: "collection not found", }, testUtils.CreateDoc{ NodeID: immutable.Some(0), diff --git a/tests/integration/net/state/simple/peer/subscribe/with_add_test.go b/tests/integration/net/state/simple/peer/subscribe/with_add_test.go index 8cd294e98a..04a1a3d57c 100644 --- a/tests/integration/net/state/simple/peer/subscribe/with_add_test.go +++ b/tests/integration/net/state/simple/peer/subscribe/with_add_test.go @@ -199,7 +199,7 @@ func TestP2PSubscribeAddSingleErroneousCollectionID(t *testing.T) { testUtils.SubscribeToCollection{ NodeID: 1, CollectionIDs: []int{testUtils.NonExistentCollectionID}, - ExpectedError: "datastore: key not found", + ExpectedError: "collection not found", }, testUtils.CreateDoc{ NodeID: immutable.Some(0), @@ -243,7 +243,7 @@ func TestP2PSubscribeAddValidAndErroneousCollectionID(t *testing.T) { testUtils.SubscribeToCollection{ NodeID: 1, CollectionIDs: []int{0, testUtils.NonExistentCollectionID}, - ExpectedError: "datastore: key not found", + ExpectedError: "collection not found", }, testUtils.CreateDoc{ NodeID: immutable.Some(0), @@ -292,7 +292,7 @@ func TestP2PSubscribeAddValidThenErroneousCollectionID(t *testing.T) { testUtils.SubscribeToCollection{ NodeID: 1, CollectionIDs: []int{testUtils.NonExistentCollectionID}, - ExpectedError: "datastore: key not found", + ExpectedError: "collection not found", }, testUtils.CreateDoc{ NodeID: immutable.Some(0),