Skip to content

Commit

Permalink
Use initial schema during bulk load. (dgraph-io#3333)
Browse files Browse the repository at this point in the history
Use the initial schema during bulk load; otherwise, data for the
predicates in the initial schema might not be loaded correctly.

In particular, I observed that when loading the 21million dataset with
dgraph.type triples added to it, queries would not respond and would
eventually time out. I figured this was because the index for that
predicate was not built during the bulk load. I reloaded the dataset with
my changes included, and queries now respond immediately.
  • Loading branch information
martinmr authored and dna2github committed Jul 19, 2019
1 parent d6ebfc8 commit ec40170
Show file tree
Hide file tree
Showing 2 changed files with 27 additions and 1 deletion.
10 changes: 10 additions & 0 deletions dgraph/cmd/bulk/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"github.com/dgraph-io/badger"
"github.com/dgraph-io/dgraph/posting"
"github.com/dgraph-io/dgraph/protos/pb"
"github.com/dgraph-io/dgraph/schema"
wk "github.com/dgraph-io/dgraph/worker"
"github.com/dgraph-io/dgraph/x"
)
Expand All @@ -45,6 +46,15 @@ func newSchemaStore(initial []*pb.SchemaUpdate, opt options, state *state) *sche
},
state: state,
}

// Load all initial predicates. Some predicates that might not be used when
// the alpha is started (e.g., ACL predicates) might be included, but it's
// better to include them in case the input data contains triples with these
// predicates.
for _, update := range schema.CompleteInitialSchema() {
s.m[update.Predicate] = update
}

if opt.StoreXids {
s.m["xid"] = &pb.SchemaUpdate{
ValueType: pb.Posting_STRING,
Expand Down
18 changes: 17 additions & 1 deletion schema/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -408,7 +408,23 @@ func LoadTypesFromDb() error {
return nil
}

// InitialSchema builds the list of schema updates that Dgraph inserts when it
// begins executing. The worker options are consulted (for example, whether ACL
// is enabled) to decide which attributes end up in the list.
func InitialSchema() []*pb.SchemaUpdate {
	// Do not force every predicate in; let the worker options decide.
	const includeAll = false
	return initialSchemaInternal(includeAll)
}

// CompleteInitialSchema builds the full list of initial schema updates without
// consulting the worker options. It is meant for situations where those
// options are not known in advance, such as bulk loading: creating all the
// reserved predicates and removing the unneeded ones later is preferable to
// missing some of them.
func CompleteInitialSchema() []*pb.SchemaUpdate {
	// Force every predicate in, regardless of the worker options.
	const includeAll = true
	return initialSchemaInternal(includeAll)
}

func initialSchemaInternal(all bool) []*pb.SchemaUpdate {
var initialSchema []*pb.SchemaUpdate

// propose the schema for _predicate_
Expand All @@ -428,7 +444,7 @@ func InitialSchema() []*pb.SchemaUpdate {
List: true,
})

if x.WorkerConfig.AclEnabled {
if all || x.WorkerConfig.AclEnabled {
// propose the schema update for acl predicates
initialSchema = append(initialSchema, []*pb.SchemaUpdate{
{
Expand Down

0 comments on commit ec40170

Please sign in to comment.