From ddb0d76fe46755595ef0a197e0d2a07ee7ea08f3 Mon Sep 17 00:00:00 2001 From: Martin Martinez Rivera Date: Tue, 26 Feb 2019 16:12:08 -0800 Subject: [PATCH] Introduce DropPrefix API into Dgraph (#3060) * Introduce Badger's new DropPrefix API into Dgraph * Simplify DeleteAll logic. DeleteAll logic now just calls DropAll. The logic in draft.go takes care of readding the reserved predicates. --- conn/node.go | 2 +- dgraph/cmd/alpha/admin.go | 2 +- dgraph/cmd/alpha/run.go | 18 +- dgraph/cmd/alpha/run_test.go | 4 +- dgraph/cmd/zero/oracle.go | 2 +- dgraph/cmd/zero/zero.go | 11 +- edgraph/config.go | 7 +- posting/index.go | 65 +----- query/common_test.go | 9 + query/mutation.go | 4 +- query/query.go | 4 +- query/query4_test.go | 201 ++++++++++++++++++ schema/schema.go | 54 ++++- systest/mutations_test.go | 15 ++ vendor/github.com/dgraph-io/badger/README.md | 72 +++++-- .../github.com/dgraph-io/badger/compaction.go | 4 +- vendor/github.com/dgraph-io/badger/db.go | 165 +++++++++----- vendor/github.com/dgraph-io/badger/errors.go | 2 +- .../dgraph-io/badger/level_handler.go | 59 +++-- vendor/github.com/dgraph-io/badger/levels.go | 169 +++++++++++---- vendor/github.com/dgraph-io/badger/logger.go | 63 ++---- .../github.com/dgraph-io/badger/manifest.go | 29 +-- vendor/github.com/dgraph-io/badger/options.go | 7 +- .../github.com/dgraph-io/badger/pb/pb.pb.go | 150 ++++++++----- .../github.com/dgraph-io/badger/pb/pb.proto | 5 +- .../dgraph-io/badger/table/table.go | 96 +++++---- vendor/github.com/dgraph-io/badger/test.sh | 9 +- vendor/github.com/dgraph-io/badger/txn.go | 16 +- .../dgraph-io/badger/y/watermark.go | 54 +++-- vendor/github.com/dgraph-io/badger/y/y.go | 9 + vendor/vendor.json | 40 ++-- worker/config.go | 40 ---- worker/draft.go | 23 +- worker/export.go | 2 +- worker/export_test.go | 2 +- worker/groups.go | 84 ++------ worker/mutation.go | 4 +- worker/worker.go | 2 +- x/config.go | 23 ++ x/keys.go | 11 - 40 files changed, 985 insertions(+), 553 deletions(-) create mode 100644 query/query4_test.go delete mode 100644 worker/config.go diff --git a/conn/node.go b/conn/node.go index 44f923247a9..c7ee041ccb8 100644 --- a/conn/node.go +++ b/conn/node.go @@ -145,7 +145,7 @@ func NewNode(rc *pb.RaftContext, store *raftwal.DiskStorage) *Node { peers: make(map[uint64]string), requestCh: make(chan linReadReq), } - n.Applied.Init() + n.Applied.Init(nil) // This should match up to the Applied index set above. n.Applied.SetDoneUntil(n.Cfg.Applied) glog.Infof("Setting raft.Config to: %+v\n", n.Cfg) diff --git a/dgraph/cmd/alpha/admin.go b/dgraph/cmd/alpha/admin.go index 549061fae73..9e4a528054a 100644 --- a/dgraph/cmd/alpha/admin.go +++ b/dgraph/cmd/alpha/admin.go @@ -120,7 +120,7 @@ func ipInIPWhitelistRanges(ipString string) bool { return false } - for _, ipRange := range worker.Config.WhiteListedIPRanges { + for _, ipRange := range x.WorkerConfig.WhiteListedIPRanges { if bytes.Compare(ip, ipRange.Lower) >= 0 && bytes.Compare(ip, ipRange.Upper) <= 0 { return true } diff --git a/dgraph/cmd/alpha/run.go b/dgraph/cmd/alpha/run.go index 749f782131f..f27aa20826b 100644 --- a/dgraph/cmd/alpha/run.go +++ b/dgraph/cmd/alpha/run.go @@ -182,12 +182,12 @@ func setupCustomTokenizers() { // and returns a slice of []IPRange. // // e.g. 
"144.142.126.222:144.142.126.244,144.142.126.254,192.168.0.0/16,host.docker.internal" -func getIPsFromString(str string) ([]worker.IPRange, error) { +func getIPsFromString(str string) ([]x.IPRange, error) { if str == "" { - return []worker.IPRange{}, nil + return []x.IPRange{}, nil } - var ipRanges []worker.IPRange + var ipRanges []x.IPRange rangeStrings := strings.Split(str, ",") for _, s := range rangeStrings { @@ -201,7 +201,7 @@ func getIPsFromString(str string) ([]worker.IPRange, error) { // or IPv6 address like fd03:b188:0f3c:9ec4::babe:face ipAddr := net.ParseIP(s) if ipAddr != nil { - ipRanges = append(ipRanges, worker.IPRange{Lower: ipAddr, Upper: ipAddr}) + ipRanges = append(ipRanges, x.IPRange{Lower: ipAddr, Upper: ipAddr}) } else { ipAddrs, err := net.LookupIP(s) if err != nil { @@ -209,7 +209,7 @@ func getIPsFromString(str string) ([]worker.IPRange, error) { } for _, addr := range ipAddrs { - ipRanges = append(ipRanges, worker.IPRange{Lower: addr, Upper: addr}) + ipRanges = append(ipRanges, x.IPRange{Lower: addr, Upper: addr}) } } } else { @@ -226,7 +226,7 @@ func getIPsFromString(str string) ([]worker.IPRange, error) { rangeHi[addrLen-i] |= ^network.Mask[maskLen-i] } - ipRanges = append(ipRanges, worker.IPRange{Lower: rangeLo, Upper: rangeHi}) + ipRanges = append(ipRanges, x.IPRange{Lower: rangeLo, Upper: rangeHi}) } case len(tuple) == 2: // string is range like a.b.c.d:w.x.y.z @@ -239,7 +239,7 @@ func getIPsFromString(str string) ([]worker.IPRange, error) { } else if bytes.Compare(rangeLo, rangeHi) > 0 { return nil, fmt.Errorf("inverted IP address range: %s", s) } else { - ipRanges = append(ipRanges, worker.IPRange{Lower: rangeLo, Upper: rangeHi}) + ipRanges = append(ipRanges, x.IPRange{Lower: rangeLo, Upper: rangeHi}) } default: return nil, fmt.Errorf("invalid IP address range: %s", s) @@ -482,7 +482,7 @@ func run() { ips, err := getIPsFromString(Alpha.Conf.GetString("whitelist")) x.Check(err) - worker.Config = worker.Options{ + x.WorkerConfig = x.WorkerOptions{ ExportPath: Alpha.Conf.GetString("export"), NumPendingProposals: Alpha.Conf.GetInt("pending_proposals"), Tracing: Alpha.Conf.GetFloat64("trace"), @@ -519,7 +519,7 @@ func run() { } } otrace.ApplyConfig(otrace.Config{ - DefaultSampler: otrace.ProbabilitySampler(worker.Config.Tracing)}) + DefaultSampler: otrace.ProbabilitySampler(x.WorkerConfig.Tracing)}) // Posting will initialize index which requires schema. Hence, initialize // schema before calling posting.Init(). 
diff --git a/dgraph/cmd/alpha/run_test.go b/dgraph/cmd/alpha/run_test.go index 3e599616ddb..ebaf1094755 100644 --- a/dgraph/cmd/alpha/run_test.go +++ b/dgraph/cmd/alpha/run_test.go @@ -36,7 +36,7 @@ import ( "github.com/dgraph-io/dgraph/protos/pb" "github.com/dgraph-io/dgraph/query" "github.com/dgraph-io/dgraph/schema" - "github.com/dgraph-io/dgraph/worker" + "github.com/dgraph-io/dgraph/x" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -1674,7 +1674,7 @@ func TestTypeMutationAndQuery(t *testing.T) { } func TestIPStringParsing(t *testing.T) { - var addrRange []worker.IPRange + var addrRange []x.IPRange var err error addrRange, err = getIPsFromString("144.142.126.222:144.142.126.244") diff --git a/dgraph/cmd/zero/oracle.go b/dgraph/cmd/zero/oracle.go index 4267c3a453a..401bd09dc2b 100644 --- a/dgraph/cmd/zero/oracle.go +++ b/dgraph/cmd/zero/oracle.go @@ -60,7 +60,7 @@ func (o *Oracle) Init() { o.keyCommit = make(map[string]uint64) o.subscribers = make(map[int]chan *pb.OracleDelta) o.updates = make(chan *pb.OracleDelta, 100000) // Keeping 1 second worth of updates. - o.doneUntil.Init() + o.doneUntil.Init(nil) go o.sendDeltasToSubscribers() } diff --git a/dgraph/cmd/zero/zero.go b/dgraph/cmd/zero/zero.go index fd472243ec1..28aa625cdc6 100644 --- a/dgraph/cmd/zero/zero.go +++ b/dgraph/cmd/zero/zero.go @@ -553,10 +553,13 @@ func (s *Server) ShouldServe( var proposal pb.ZeroProposal // Multiple Groups might be assigned to same tablet, so during proposal we will check again. tablet.Force = false - if x.IsAclPredicate(tablet.Predicate) { - // force all the acl predicates to be allocated to group 1 - // this is to make it eaiser to stream ACL updates to all alpha servers - // since they only need to open one pipeline to receive updates for all ACL predicates + if x.IsReservedPredicate(tablet.Predicate) { + // Force all the reserved predicates to be allocated to group 1. + // This is to make it eaiser to stream ACL updates to all alpha servers + // since they only need to open one pipeline to receive updates for all + // ACL predicates. + // This will also make it easier to restore the reserved predicates after + // a DropAll operation. tablet.GroupId = 1 } proposal.Tablet = tablet diff --git a/edgraph/config.go b/edgraph/config.go index 9144d53c394..2c4c9a3f8f5 100644 --- a/edgraph/config.go +++ b/edgraph/config.go @@ -22,7 +22,6 @@ import ( "time" "github.com/dgraph-io/dgraph/posting" - "github.com/dgraph-io/dgraph/worker" "github.com/dgraph-io/dgraph/x" ) @@ -89,9 +88,9 @@ func setConfVar(conf Options) { x.Conf.Set("allotted_memory", newFloat(conf.AllottedMemory)) // Set some vars from worker.Config. 
- x.Conf.Set("tracing", newFloat(worker.Config.Tracing)) - x.Conf.Set("num_pending_proposals", newInt(worker.Config.NumPendingProposals)) - x.Conf.Set("expand_edge", newIntFromBool(worker.Config.ExpandEdge)) + x.Conf.Set("tracing", newFloat(x.WorkerConfig.Tracing)) + x.Conf.Set("num_pending_proposals", newInt(x.WorkerConfig.NumPendingProposals)) + x.Conf.Set("expand_edge", newIntFromBool(x.WorkerConfig.ExpandEdge)) } func SetConfiguration(newConfig Options) { diff --git a/posting/index.go b/posting/index.go index df284c72a83..ee0df6d98ec 100644 --- a/posting/index.go +++ b/posting/index.go @@ -445,20 +445,6 @@ func deleteEntries(prefix []byte, remove func(key []byte) bool) error { }) } -func deleteAllEntries(prefix []byte) error { - return deleteEntries(prefix, func(key []byte) bool { - return true - }) -} - -// deleteAllTokens deletes the index for the given attribute. All tokenizers are -// used by this function. -func deleteAllTokens(attr string) error { - pk := x.ParsedKey{Attr: attr} - prefix := pk.IndexPrefix() - return deleteAllEntries(prefix) -} - // deleteTokensFor deletes the index for the given attribute and token. func deleteTokensFor(attr, tokenizerName string) error { pk := x.ParsedKey{Attr: attr} @@ -469,22 +455,21 @@ func deleteTokensFor(attr, tokenizerName string) error { } prefix = append(prefix, tokenizer.Identifier()) - return deleteAllEntries(prefix) + return pstore.DropPrefix(prefix) } func deleteReverseEdges(attr string) error { pk := x.ParsedKey{Attr: attr} prefix := pk.ReversePrefix() - return deleteAllEntries(prefix) + return pstore.DropPrefix(prefix) } func deleteCountIndex(attr string) error { pk := x.ParsedKey{Attr: attr} - if err := deleteAllEntries(pk.CountPrefix(false)); err != nil { + if err := pstore.DropPrefix(pk.CountPrefix(false)); err != nil { return err } - - return deleteAllEntries(pk.CountPrefix(true)) + return pstore.DropPrefix(pk.CountPrefix(true)) } // Index rebuilding logic here. @@ -983,52 +968,16 @@ func rebuildListType(ctx context.Context, rb *IndexRebuild) error { // DeleteAll deletes all entries in the posting list. func DeleteAll() error { - return deleteEntries(nil, func(key []byte) bool { - pk := x.Parse(key) - if pk == nil { - return true - } else if pk.IsSchema() { - // Don't delete schema for _predicate_ - return !x.IsReservedPredicate(pk.Attr) - } - return true - }) + return pstore.DropAll() } // DeletePredicate deletes all entries and indices for a given predicate. func DeletePredicate(ctx context.Context, attr string) error { glog.Infof("Dropping predicate: [%s]", attr) - pk := x.ParsedKey{ - Attr: attr, - } - prefix := pk.DataPrefix() - // Delete all data postings for the given predicate. - err := deleteEntries(prefix, func(key []byte) bool { - return true - }) - if err != nil { + prefix := x.PredicatePrefix(attr) + if err := pstore.DropPrefix(prefix); err != nil { return err } - // TODO - We will still have the predicate present in posting lists. 
- indexed := schema.State().IsIndexed(attr) - reversed := schema.State().IsReversed(attr) - if indexed { - if err := deleteAllTokens(attr); err != nil { - return err - } - } else if reversed { - if err := deleteReverseEdges(attr); err != nil { - return err - } - } - - hasCountIndex := schema.State().HasCount(attr) - if hasCountIndex { - if err := deleteCountIndex(attr); err != nil { - return err - } - } - return schema.State().Delete(attr) } diff --git a/query/common_test.go b/query/common_test.go index 855853607df..f2f7ed01fbb 100644 --- a/query/common_test.go +++ b/query/common_test.go @@ -57,6 +57,15 @@ func setSchema(schema string) { } } +func dropPredicate(pred string) { + err := client.Alter(context.Background(), &api.Operation{ + DropAttr: pred, + }) + if err != nil { + panic(fmt.Sprintf("Could not drop predicate. Got error %v", err.Error())) + } +} + func processQuery(t *testing.T, ctx context.Context, query string) (string, error) { txn := client.NewTxn() defer txn.Discard(ctx) diff --git a/query/mutation.go b/query/mutation.go index be2e2d13962..55022e85722 100644 --- a/query/mutation.go +++ b/query/mutation.go @@ -34,7 +34,7 @@ import ( ) func ApplyMutations(ctx context.Context, m *pb.Mutations) (*api.TxnContext, error) { - if worker.Config.ExpandEdge { + if x.WorkerConfig.ExpandEdge { edges, err := expandEdges(ctx, m) if err != nil { return nil, x.Wrapf(err, "While adding pb.edges") @@ -42,7 +42,7 @@ func ApplyMutations(ctx context.Context, m *pb.Mutations) (*api.TxnContext, erro m.Edges = edges } else { for _, mu := range m.Edges { - if mu.Attr == x.Star && !worker.Config.ExpandEdge { + if mu.Attr == x.Star && !x.WorkerConfig.ExpandEdge { return nil, x.Errorf("Expand edge (--expand_edge) is set to false." + " Cannot perform S * * deletion.") } diff --git a/query/query.go b/query/query.go index 753e3c1687e..0f7361b6582 100644 --- a/query/query.go +++ b/query/query.go @@ -1852,7 +1852,7 @@ func expandSubgraph(ctx context.Context, sg *SubGraph) ([]*SubGraph, error) { for i := 0; i < len(sg.Children); i++ { child := sg.Children[i] - if !worker.Config.ExpandEdge && child.Attr == "_predicate_" { + if !x.WorkerConfig.ExpandEdge && child.Attr == "_predicate_" { return out, x.Errorf("Cannot ask for _predicate_ when ExpandEdge(--expand_edge) is false.") } @@ -1862,7 +1862,7 @@ func expandSubgraph(ctx context.Context, sg *SubGraph) ([]*SubGraph, error) { continue } - if !worker.Config.ExpandEdge { + if !x.WorkerConfig.ExpandEdge { return out, x.Errorf("Cannot run expand() query when ExpandEdge(--expand_edge) is false.") } diff --git a/query/query4_test.go b/query/query4_test.go new file mode 100644 index 00000000000..6a1fcb1b994 --- /dev/null +++ b/query/query4_test.go @@ -0,0 +1,201 @@ +/* + * Copyright 2019 Dgraph Labs, Inc. and Contributors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package query + +import ( + "context" + // "encoding/json" + // "fmt" + // "strings" + "testing" + + "github.com/stretchr/testify/require" + // "google.golang.org/grpc/metadata" +) + +func TestDeleteAndReaddIndex(t *testing.T) { + // Add new predicate with several indices. + s1 := testSchema + "\n numerology: string @index(exact, term, fulltext) .\n" + setSchema(s1) + triples := ` + <0x666> "This number is evil" . + <0x777> "This number is good" . + ` + addTriplesToCluster(triples) + + // Verify fulltext index works as expected. + q1 := ` + { + me(func: anyoftext(numerology, "numbers")) { + uid + numerology + } + }` + js := processQueryNoErr(t, q1) + require.JSONEq(t, `{"data": {"me": [ + {"uid": "0x666", "numerology": "This number is evil"}, + {"uid": "0x777", "numerology": "This number is good"} + ]}}`, js) + + // Remove the fulltext index and verify the previous query is no longer supported. + s2 := testSchema + "\n numerology: string @index(exact, term) .\n" + setSchema(s2) + _, err := processQuery(t, context.Background(), q1) + require.Error(t, err) + require.Contains(t, err.Error(), "Attribute numerology is not indexed with type fulltext") + + // Verify term index still works as expected. + q2 := ` + { + me(func: anyofterms(numerology, "number")) { + uid + numerology + } + }` + js = processQueryNoErr(t, q2) + require.JSONEq(t, `{"data": {"me": [ + {"uid": "0x666", "numerology": "This number is evil"}, + {"uid": "0x777", "numerology": "This number is good"} + ]}}`, js) + + // Re-add index and verify that the original query works again. + setSchema(s1) + js = processQueryNoErr(t, q1) + require.JSONEq(t, `{"data": {"me": [ + {"uid": "0x666", "numerology": "This number is evil"}, + {"uid": "0x777", "numerology": "This number is good"} + ]}}`, js) + + // Finally, drop the predicate and restore schema. + dropPredicate("numerology") + setSchema(testSchema) +} + +func TestDeleteAndReaddCount(t *testing.T) { + // Add new predicate with count index. + s1 := testSchema + "\n numerology: string @count .\n" + setSchema(s1) + triples := ` + <0x666> "This number is evil" . + <0x777> "This number is good" . + ` + addTriplesToCluster(triples) + + // Verify count index works as expected. + q1 := ` + { + me(func: gt(count(numerology), 0)) { + uid + numerology + } + }` + js := processQueryNoErr(t, q1) + require.JSONEq(t, `{"data": {"me": [ + {"uid": "0x666", "numerology": "This number is evil"}, + {"uid": "0x777", "numerology": "This number is good"} + ]}}`, js) + + // Remove the count index and verify the previous query is no longer supported. + s2 := testSchema + "\n numerology: string .\n" + setSchema(s2) + _, err := processQuery(t, context.Background(), q1) + require.Error(t, err) + require.Contains(t, err.Error(), "Need @count directive in schema for attr: numerology") + + // Re-add count index and verify that the original query works again. + setSchema(s1) + js = processQueryNoErr(t, q1) + require.JSONEq(t, `{"data": {"me": [ + {"uid": "0x666", "numerology": "This number is evil"}, + {"uid": "0x777", "numerology": "This number is good"} + ]}}`, js) + + // Finally, drop the predicate and restore schema. + dropPredicate("numerology") + setSchema(testSchema) +} + +func TestDeleteAndReaddReverse(t *testing.T) { + // Add new predicate with a reverse edge. + s1 := testSchema + "\n child_pred: uid @reverse .\n" + setSchema(s1) + triples := `<0x666> <0x777> .` + addTriplesToCluster(triples) + + // Verify reverse edges works as expected. 
+ q1 := ` + { + me(func: uid(0x777)) { + ~child_pred { + uid + } + } + }` + js := processQueryNoErr(t, q1) + require.JSONEq(t, `{"data": {"me": [{"~child_pred": [{"uid": "0x666"}]}]}}`, js) + + // Remove the reverse edges and verify the previous query is no longer supported. + s2 := testSchema + "\n child_pred: uid .\n" + setSchema(s2) + _, err := processQuery(t, context.Background(), q1) + require.Error(t, err) + require.Contains(t, err.Error(), "Predicate child_pred doesn't have reverse edge") + + // Re-add reverse edges and verify that the original query works again. + setSchema(s1) + js = processQueryNoErr(t, q1) + require.JSONEq(t, `{"data": {"me": [{"~child_pred": [{"uid": "0x666"}]}]}}`, js) + + // Finally, drop the predicate and restore schema. + dropPredicate("child_pred") + setSchema(testSchema) +} + +func TestDropPredicate(t *testing.T) { + // Add new predicate with several indices. + s1 := testSchema + "\n numerology: string @index(term) .\n" + setSchema(s1) + triples := ` + <0x666> "This number is evil" . + <0x777> "This number is good" . + ` + addTriplesToCluster(triples) + + // Verify queries work as expected. + q1 := ` + { + me(func: anyofterms(numerology, "number")) { + uid + numerology + } + }` + js := processQueryNoErr(t, q1) + require.JSONEq(t, `{"data": {"me": [ + {"uid": "0x666", "numerology": "This number is evil"}, + {"uid": "0x777", "numerology": "This number is good"} + ]}}`, js) + + // Finally, drop the predicate and verify the query no longer works because + // the index was dropped when all the data for that predicate was deleted. + dropPredicate("numerology") + _, err := processQuery(t, context.Background(), q1) + require.Error(t, err) + require.Contains(t, err.Error(), "Attribute numerology is not indexed with type term") + + // Finally, restore the schema. + setSchema(testSchema) +} diff --git a/schema/schema.go b/schema/schema.go index 5875f171009..0ed6f8e0e91 100644 --- a/schema/schema.go +++ b/schema/schema.go @@ -58,11 +58,7 @@ func (s *state) DeleteAll() { defer s.Unlock() for pred := range s.predicate { - // Predicates in x.InitialPreds represent internal predicates which - // shouldn't be dropped. - if !x.IsReservedPredicate(pred) { - delete(s.predicate, pred) - } + delete(s.predicate, pred) } } @@ -307,6 +303,54 @@ func LoadFromDb() error { return nil } +func InitialSchema() []*pb.SchemaUpdate { + var initialSchema []*pb.SchemaUpdate + + // propose the schema for _predicate_ + if x.WorkerConfig.ExpandEdge { + initialSchema = append(initialSchema, &pb.SchemaUpdate{ + Predicate: x.PredicateListAttr, + ValueType: pb.Posting_STRING, + List: true, + }) + } + + initialSchema = append(initialSchema, &pb.SchemaUpdate{ + Predicate: "type", + ValueType: pb.Posting_STRING, + Directive: pb.SchemaUpdate_INDEX, + Tokenizer: []string{"exact"}, + }) + + if x.WorkerConfig.AclEnabled { + // propose the schema update for acl predicates + initialSchema = append(initialSchema, []*pb.SchemaUpdate{ + &pb.SchemaUpdate{ + Predicate: "dgraph.xid", + ValueType: pb.Posting_STRING, + Directive: pb.SchemaUpdate_INDEX, + Upsert: true, + Tokenizer: []string{"exact"}, + }, + &pb.SchemaUpdate{ + Predicate: "dgraph.password", + ValueType: pb.Posting_PASSWORD, + }, + &pb.SchemaUpdate{ + Predicate: "dgraph.user.group", + Directive: pb.SchemaUpdate_REVERSE, + ValueType: pb.Posting_UID, + List: true, + }, + &pb.SchemaUpdate{ + Predicate: "dgraph.group.acl", + ValueType: pb.Posting_STRING, + }}...) 
+ } + + return initialSchema +} + func reset() { pstate = new(state) pstate.init() diff --git a/systest/mutations_test.go b/systest/mutations_test.go index 044a72a00b8..aa2f5ce7855 100644 --- a/systest/mutations_test.go +++ b/systest/mutations_test.go @@ -80,6 +80,7 @@ func TestSystem(t *testing.T) { t.Run("has should have reverse edges", wrap(HasReverseEdge)) t.Run("facet json input supports anyofterms query", wrap(FacetJsonInputSupportsAnyOfTerms)) t.Run("max predicate size", wrap(MaxPredicateSize)) + t.Run("restore reserved preds", wrap(RestoreReservedPreds)) } func FacetJsonInputSupportsAnyOfTerms(t *testing.T, c *dgo.Dgraph) { @@ -1716,3 +1717,17 @@ func MaxPredicateSize(t *testing.T, c *dgo.Dgraph) { require.Error(t, err) require.Contains(t, err.Error(), "Predicate name length cannot be bigger than 2^16") } + +func RestoreReservedPreds(t *testing.T, c *dgo.Dgraph) { + ctx := context.Background() + err := c.Alter(ctx, &api.Operation{ + DropAll: true, + }) + require.NoError(t, err) + + // Verify that the reserved predicates were restored to the schema. + query := `schema(preds: type) {predicate}` + resp, err := c.NewReadOnlyTxn().Query(ctx, query) + require.NoError(t, err) + CompareJSON(t, `{"schema": [{"predicate":"type"}]}`, string(resp.Json)) +} diff --git a/vendor/github.com/dgraph-io/badger/README.md b/vendor/github.com/dgraph-io/badger/README.md index da57f89f5e1..6f483a10079 100644 --- a/vendor/github.com/dgraph-io/badger/README.md +++ b/vendor/github.com/dgraph-io/badger/README.md @@ -182,7 +182,7 @@ if err != nil { } // Commit the transaction and check for error. -if err := txn.Commit(nil); err != nil { +if err := txn.Commit(); err != nil { return err } ``` @@ -487,30 +487,29 @@ are stored with different versions. These could accumulate, and take up unneeded time these older versions are needed. Badger relies on the client to perform garbage collection at a time of their choosing. It provides -the following methods, which can be invoked at an appropriate time: +the following method, which can be invoked at an appropriate time: -* `DB.PurgeOlderVersions()`: Is no longer needed since v1.5.0. Badger's LSM tree automatically discards older/invalid versions of keys. * `DB.RunValueLogGC()`: This method is designed to do garbage collection while Badger is online. Along with randomly picking a file, it uses statistics generated by the LSM-tree compactions to pick files that are likely to lead to maximum space - reclamation. - -It is recommended that this method be called during periods of low activity in -your system, or periodically. One call would only result in removal of at max -one log file. As an optimization, you could also immediately re-run it whenever -it returns nil error (indicating a successful value log GC). - -```go -ticker := time.NewTicker(5 * time.Minute) -defer ticker.Stop() -for range ticker.C { -again: - err := db.RunValueLogGC(0.7) - if err == nil { - goto again - } -} -``` + reclamation. It is recommended to be called during periods of low activity in + your system, or periodically. One call would only result in removal of at max + one log file. As an optimization, you could also immediately re-run it whenever + it returns nil error (indicating a successful value log GC), as shown below. 
+ + ```go + ticker := time.NewTicker(5 * time.Minute) + defer ticker.Stop() + for range ticker.C { + again: + err := db.RunValueLogGC(0.7) + if err == nil { + goto again + } + } + ``` + +* `DB.PurgeOlderVersions()`: This method is **DEPRECATED** since v1.5.0. Now, Badger's LSM tree automatically discards older/invalid versions of keys. **Note: The RunValueLogGC method would not garbage collect the latest value log.** @@ -543,6 +542,22 @@ command above to upgrade your database to work with the latest version. badger_backup --dir --backup-file badger.bak ``` +We recommend all users to use the `Backup` and `Restore` APIs and tools. However, +Badger is also rsync-friendly because all files are immutable, barring the +latest value log which is append-only. So, rsync can be used as rudimentary way +to perform a backup. In the following script, we repeat rsync to ensure that the +LSM tree remains consistent with the MANIFEST file while doing a full backup. + +``` +#!/bin/bash +set -o history +set -o histexpand +# Makes a complete copy of a Badger database directory. +# Repeat rsync if the MANIFEST and SSTables are updated. +rsync -avz --delete db/ dst +while !! | grep -q "(MANIFEST\|\.sst)$"; do :; done +``` + ### Memory usage Badger's memory usage can be managed by tweaking several options available in the `Options` struct that is passed in when opening the database using @@ -621,6 +636,7 @@ amplification compared to a typical LSM tree. As such RocksDB's design isn't aimed at SSDs. 3 SSI: Serializable Snapshot Isolation. For more details, see the blog post [Concurrent ACID Transactions in Badger](https://blog.dgraph.io/post/badger-txn/) + 4 Badger provides direct access to value versions via its Iterator API. Users can also specify how many versions to keep per key via Options. @@ -645,6 +661,20 @@ Below is a list of known projects that use Badger: * [emitter](https://github.com/emitter-io/emitter) - Scalable, low latency, distributed pub/sub broker with message storage, uses MQTT, gossip and badger. * [GarageMQ](https://github.com/valinurovam/garagemq) - AMQP server written in Go. * [RedixDB](https://alash3al.github.io/redix/) - A real-time persistent key-value store with the same redis protocol. +* [BBVA](https://github.com/BBVA/raft-badger) - Raft backend implementation using BadgerDB for Hashicorp raft. +* [Riot](https://github.com/go-ego/riot) - An open-source, distributed search engine. +* [Fantom](https://github.com/Fantom-foundation/go-lachesis) - aBFT Consensus platform for distributed applications. +* [decred](https://github.com/decred/dcrdata) - An open, progressive, and self-funding cryptocurrency with a system of community-based governance integrated into its blockchain. +* [OpenNetSys](https://github.com/opennetsys/c3-go) - Create useful dApps in any software language. +* [HoneyTrap](https://github.com/honeytrap/honeytrap) - An extensible and opensource system for running, monitoring and managing honeypots. +* [Insolar](https://github.com/insolar/insolar) - Enterprise-ready blockchain platform. +* [IoTeX](https://github.com/iotexproject/iotex-core) - The next generation of the decentralized network for IoT powered by scalability- and privacy-centric blockchains. +* [go-sessions](https://github.com/kataras/go-sessions) - The sessions manager for Go net/http and fasthttp. +* [Babble](https://github.com/mosaicnetworks/babble) - BFT Consensus platform for distributed applications. 
+* [Tormenta](https://github.com/jpincas/tormenta) - Embedded object-persistence layer / simple JSON database for Go projects. +* [BadgerHold](https://github.com/timshannon/badgerhold) - An embeddable NoSQL store for querying Go types built on Badger +* [Goblero](https://github.com/didil/goblero) - Pure Go embedded persistent job queue backed by BadgerDB +* [Surfline](https://www.surfline.com) - Serving global wave and weather forecast data with Badger. If you are using Badger in a project please send a pull request to add it to the list. diff --git a/vendor/github.com/dgraph-io/badger/compaction.go b/vendor/github.com/dgraph-io/badger/compaction.go index 3e6bb5f4dcd..c568767cb3b 100644 --- a/vendor/github.com/dgraph-io/badger/compaction.go +++ b/vendor/github.com/dgraph-io/badger/compaction.go @@ -65,7 +65,9 @@ func (r keyRange) overlapsWith(dst keyRange) bool { } func getKeyRange(tables []*table.Table) keyRange { - y.AssertTrue(len(tables) > 0) + if len(tables) == 0 { + return keyRange{} + } smallest := tables[0].Smallest() biggest := tables[0].Biggest() for i := 1; i < len(tables); i++ { diff --git a/vendor/github.com/dgraph-io/badger/db.go b/vendor/github.com/dgraph-io/badger/db.go index 8a69c20a1b1..c1248412394 100644 --- a/vendor/github.com/dgraph-io/badger/db.go +++ b/vendor/github.com/dgraph-io/badger/db.go @@ -19,6 +19,7 @@ package badger import ( "bytes" "encoding/binary" + "encoding/hex" "expvar" "math" "os" @@ -223,12 +224,12 @@ func Open(opt Options) (db *DB, err error) { if err != nil { return nil, err } + defer func() { + if valueDirLockGuard != nil { + _ = valueDirLockGuard.release() + } + }() } - defer func() { - if valueDirLockGuard != nil { - _ = valueDirLockGuard.release() - } - }() if !(opt.ValueLogFileSize <= 2<<30 && opt.ValueLogFileSize >= 1<<20) { return nil, ErrValueLogSize } @@ -300,6 +301,9 @@ func Open(opt Options) (db *DB, err error) { // Let's advance nextTxnTs to one more than whatever we observed via // replaying the logs. db.orc.txnMark.Done(db.orc.nextTxnTs) + // In normal mode, we must update readMark so older versions of keys can be removed during + // compaction when run in offline mode via the flatten tool. + db.orc.readMark.Done(db.orc.nextTxnTs) db.orc.nextTxnTs++ db.writeCh = make(chan *request, kvWriteChCapacity) @@ -346,7 +350,7 @@ func (db *DB) Close() (err error) { defer db.Unlock() y.AssertTrue(db.mt != nil) select { - case db.flushChan <- flushTask{db.mt, db.vhead}: + case db.flushChan <- flushTask{mt: db.mt, vptr: db.vhead}: db.imm = append(db.imm, db.mt) // Flusher will attempt to remove this from s.imm. db.mt = nil // Will segfault if we try writing! db.elog.Printf("pushed to flush chan\n") @@ -369,10 +373,15 @@ func (db *DB) Close() (err error) { // Force Compact L0 // We don't need to care about cstatus since no parallel compaction is running. if db.opt.CompactL0OnClose { - if err := db.lc.doCompact(compactionPriority{level: 0, score: 1.73}); err != nil { - db.opt.Warningf("While forcing compaction on level 0: %v", err) - } else { + err := db.lc.doCompact(compactionPriority{level: 0, score: 1.73}) + switch err { + case errFillTables: + // This error only means that there might be enough tables to do a compaction. So, we + // should not report it to the end user to avoid confusing them. 
+ case nil: db.opt.Infof("Force compaction on level 0 done") + default: + db.opt.Warningf("While forcing compaction on level 0: %v", err) } } @@ -381,6 +390,7 @@ func (db *DB) Close() (err error) { } db.elog.Printf("Waiting for closer") db.closers.updateSize.SignalAndWait() + db.orc.Stop() db.elog.Finish() @@ -746,7 +756,7 @@ func (db *DB) ensureRoomForWrite() error { y.AssertTrue(db.mt != nil) // A nil mt indicates that DB is being closed. select { - case db.flushChan <- flushTask{db.mt, db.vhead}: + case db.flushChan <- flushTask{mt: db.mt, vptr: db.vhead}: db.elog.Printf("Flushing value log to disk if async mode.") // Ensure value log is synced to disk so this memtable's contents wouldn't be lost. err = db.vlog.sync() @@ -772,12 +782,15 @@ func arenaSize(opt Options) int64 { } // WriteLevel0Table flushes memtable. -func writeLevel0Table(s *skl.Skiplist, f *os.File) error { - iter := s.NewIterator() +func writeLevel0Table(ft flushTask, f *os.File) error { + iter := ft.mt.NewIterator() defer iter.Close() b := table.NewTableBuilder() defer b.Close() for iter.SeekToFirst(); iter.Valid(); iter.Next() { + if len(ft.dropPrefix) > 0 && bytes.HasPrefix(iter.Key(), ft.dropPrefix) { + continue + } if err := b.Add(iter.Key(), iter.Value()); err != nil { return err } @@ -787,15 +800,16 @@ func writeLevel0Table(s *skl.Skiplist, f *os.File) error { } type flushTask struct { - mt *skl.Skiplist - vptr valuePointer + mt *skl.Skiplist + vptr valuePointer + dropPrefix []byte } // handleFlushTask must be run serially. func (db *DB) handleFlushTask(ft flushTask) error { if !ft.mt.Empty() { // Store badger head even if vptr is zero, need it for readTs - db.opt.Infof("Storing value log head: %+v\n", ft.vptr) + db.opt.Debugf("Storing value log head: %+v\n", ft.vptr) db.elog.Printf("Storing offset: %+v\n", ft.vptr) offset := make([]byte, vptrSize) ft.vptr.Encode(offset) @@ -816,7 +830,7 @@ func (db *DB) handleFlushTask(ft flushTask) error { dirSyncCh := make(chan error) go func() { dirSyncCh <- syncDir(db.opt.Dir) }() - err = writeLevel0Table(ft.mt, fd) + err = writeLevel0Table(ft, fd) dirSyncErr := <-dirSyncCh if err != nil { @@ -828,7 +842,7 @@ func (db *DB) handleFlushTask(ft flushTask) error { db.elog.Errorf("ERROR while syncing level directory: %v", dirSyncErr) } - tbl, err := table.OpenTable(fd, db.opt.TableLoadingMode) + tbl, err := table.OpenTable(fd, db.opt.TableLoadingMode, nil) if err != nil { db.elog.Printf("ERROR while opening table: %v", err) return err @@ -836,22 +850,7 @@ func (db *DB) handleFlushTask(ft flushTask) error { // We own a ref on tbl. err = db.lc.addLevel0Table(tbl) // This will incrRef (if we don't error, sure) tbl.DecrRef() // Releases our ref. - if err != nil { - return err - } - - // Update s.imm. Need a lock. - db.Lock() - defer db.Unlock() - // This is a single-threaded operation. ft.mt corresponds to the head of - // db.imm list. Once we flush it, we advance db.imm. The next ft.mt - // which would arrive here would match db.imm[0], because we acquire a - // lock over DB when pushing to flushChan. - // TODO: This logic is dirty AF. Any change and this could easily break. - y.AssertTrue(ft.mt == db.imm[0]) - db.imm = db.imm[1:] - ft.mt.DecrRef() // Return memory. - return nil + return err } // flushMemtable must keep running until we send it an empty flushTask. If there @@ -867,6 +866,18 @@ func (db *DB) flushMemtable(lc *y.Closer) error { for { err := db.handleFlushTask(ft) if err == nil { + // Update s.imm. Need a lock. + db.Lock() + // This is a single-threaded operation. 
ft.mt corresponds to the head of + // db.imm list. Once we flush it, we advance db.imm. The next ft.mt + // which would arrive here would match db.imm[0], because we acquire a + // lock over DB when pushing to flushChan. + // TODO: This logic is dirty AF. Any change and this could easily break. + y.AssertTrue(ft.mt == db.imm[0]) + db.imm = db.imm[1:] + ft.mt.DecrRef() // Return memory. + db.Unlock() + break } // Encountered error. Retry indefinitely. @@ -1229,22 +1240,10 @@ func (db *DB) Flatten(workers int) error { } } -// DropAll would drop all the data stored in Badger. It does this in the following way. -// - Stop accepting new writes. -// - Pause memtable flushes and compactions. -// - Pick all tables from all levels, create a changeset to delete all these -// tables and apply it to manifest. -// - Pick all log files from value log, and delete all of them. Restart value log files from zero. -// - Resume memtable flushes and compactions. -// -// NOTE: DropAll is resilient to concurrent writes, but not to reads. It is up to the user to not do -// any reads while DropAll is going on, otherwise they may result in panics. Ideally, both reads and -// writes are paused before running DropAll, and resumed after it is finished. -func (db *DB) DropAll() error { +func (db *DB) prepareToDrop() func() { if db.opt.ReadOnly { panic("Attempting to drop data in read-only mode.") } - db.opt.Infof("DropAll called. Blocking writes...") // Stop accepting new writes. atomic.StoreInt32(&db.blockWrites, 1) @@ -1254,7 +1253,7 @@ func (db *DB) DropAll() error { // Stop all compactions. db.stopCompactions() - defer func() { + return func() { db.opt.Infof("Resuming writes") db.startCompactions() @@ -1264,8 +1263,24 @@ func (db *DB) DropAll() error { // Resume writes. atomic.StoreInt32(&db.blockWrites, 0) - }() - db.opt.Infof("Compactions stopped. Dropping all SSTables...") + } +} + +// DropAll would drop all the data stored in Badger. It does this in the following way. +// - Stop accepting new writes. +// - Pause memtable flushes and compactions. +// - Pick all tables from all levels, create a changeset to delete all these +// tables and apply it to manifest. +// - Pick all log files from value log, and delete all of them. Restart value log files from zero. +// - Resume memtable flushes and compactions. +// +// NOTE: DropAll is resilient to concurrent writes, but not to reads. It is up to the user to not do +// any reads while DropAll is going on, otherwise they may result in panics. Ideally, both reads and +// writes are paused before running DropAll, and resumed after it is finished. +func (db *DB) DropAll() error { + db.opt.Infof("DropAll called. Blocking writes...") + f := db.prepareToDrop() + defer f() // Block all foreign interactions with memory tables. db.Lock() @@ -1273,13 +1288,13 @@ func (db *DB) DropAll() error { // Remove inmemory tables. Calling DecrRef for safety. Not sure if they're absolutely needed. db.mt.DecrRef() - db.mt = skl.NewSkiplist(arenaSize(db.opt)) // Set it up for future writes. for _, mt := range db.imm { mt.DecrRef() } db.imm = db.imm[:0] + db.mt = skl.NewSkiplist(arenaSize(db.opt)) // Set it up for future writes. - num, err := db.lc.deleteLSMTree() + num, err := db.lc.dropTree() if err != nil { return err } @@ -1293,3 +1308,51 @@ func (db *DB) DropAll() error { db.opt.Infof("Deleted %d value log files. DropAll done.\n", num) return nil } + +// DropPrefix would drop all the keys with the provided prefix. It does this in the following way: +// - Stop accepting new writes. 
+// - Stop memtable flushes and compactions. +// - Flush out all memtables, skipping over keys with the given prefix, Kp. +// - Write out the value log header to memtables when flushing, so we don't accidentally bring Kp +// back after a restart. +// - Compact L0->L1, skipping over Kp. +// - Compact rest of the levels, Li->Li, picking tables which have Kp. +// - Resume memtable flushes, compactions and writes. +func (db *DB) DropPrefix(prefix []byte) error { + db.opt.Infof("DropPrefix called on %s. Blocking writes...", hex.Dump(prefix)) + f := db.prepareToDrop() + defer f() + + // Block all foreign interactions with memory tables. + db.Lock() + defer db.Unlock() + + db.imm = append(db.imm, db.mt) + for _, memtable := range db.imm { + if memtable.Empty() { + memtable.DecrRef() + continue + } + task := flushTask{ + mt: memtable, + // Ensure that the head of value log gets persisted to disk. + vptr: db.vhead, + dropPrefix: prefix, + } + db.opt.Debugf("Flushing memtable") + if err := db.handleFlushTask(task); err != nil { + db.opt.Errorf("While trying to flush memtable: %v", err) + return err + } + memtable.DecrRef() + } + db.imm = db.imm[:0] + db.mt = skl.NewSkiplist(arenaSize(db.opt)) + + // Drop prefixes from the levels. + if err := db.lc.dropPrefix(prefix); err != nil { + return err + } + db.opt.Infof("DropPrefix done") + return nil +} diff --git a/vendor/github.com/dgraph-io/badger/errors.go b/vendor/github.com/dgraph-io/badger/errors.go index 130635752a4..a0c1806870c 100644 --- a/vendor/github.com/dgraph-io/badger/errors.go +++ b/vendor/github.com/dgraph-io/badger/errors.go @@ -27,7 +27,7 @@ var ( // ErrValueThreshold is returned when ValueThreshold is set to a value close to or greater than // uint16. - ErrValueThreshold = errors.New("Invalid ValueThreshold, must be lower than uint16.") + ErrValueThreshold = errors.New("Invalid ValueThreshold, must be lower than uint16") // ErrKeyNotFound is returned when key isn't found on a txn.Get. ErrKeyNotFound = errors.New("Key not found") diff --git a/vendor/github.com/dgraph-io/badger/level_handler.go b/vendor/github.com/dgraph-io/badger/level_handler.go index 42d5b16d371..147967fb8c0 100644 --- a/vendor/github.com/dgraph-io/badger/level_handler.go +++ b/vendor/github.com/dgraph-io/badger/level_handler.go @@ -102,48 +102,40 @@ func (s *levelHandler) deleteTables(toDel []*table.Table) error { // replaceTables will replace tables[left:right] with newTables. Note this EXCLUDES tables[right]. // You must call decr() to delete the old tables _after_ writing the update to the manifest. -func (s *levelHandler) replaceTables(newTables []*table.Table) error { +func (s *levelHandler) replaceTables(toDel, toAdd []*table.Table) error { // Need to re-search the range of tables in this level to be replaced as other goroutines might // be changing it as well. (They can't touch our tables, but if they add/remove other tables, // the indices get shifted around.) - if len(newTables) == 0 { - return nil - } - s.Lock() // We s.Unlock() below. - // Increase totalSize first. 
- for _, tbl := range newTables { - s.totalSize += tbl.Size() - tbl.IncrRef() + toDelMap := make(map[uint64]struct{}) + for _, t := range toDel { + toDelMap[t.ID()] = struct{}{} } - - kr := keyRange{ - left: newTables[0].Smallest(), - right: newTables[len(newTables)-1].Biggest(), + var newTables []*table.Table + for _, t := range s.tables { + _, found := toDelMap[t.ID()] + if !found { + newTables = append(newTables, t) + continue + } + s.totalSize -= t.Size() } - left, right := s.overlappingTables(levelHandlerRLocked{}, kr) - - toDecr := make([]*table.Table, right-left) - // Update totalSize and reference counts. - for i := left; i < right; i++ { - tbl := s.tables[i] - s.totalSize -= tbl.Size() - toDecr[i-left] = tbl + + // Increase totalSize first. + for _, t := range toAdd { + s.totalSize += t.Size() + t.IncrRef() + newTables = append(newTables, t) } - // To be safe, just make a copy. TODO: Be more careful and avoid copying. - numDeleted := right - left - numAdded := len(newTables) - tables := make([]*table.Table, len(s.tables)-numDeleted+numAdded) - y.AssertTrue(left == copy(tables, s.tables[:left])) - t := tables[left:] - y.AssertTrue(numAdded == copy(t, newTables)) - t = t[numAdded:] - y.AssertTrue(len(s.tables[right:]) == copy(t, s.tables[right:])) - s.tables = tables + // Assign tables. + s.tables = newTables + sort.Slice(s.tables, func(i, j int) bool { + return y.CompareKeys(s.tables[i].Smallest(), s.tables[j].Smallest()) < 0 + }) s.Unlock() // s.Unlock before we DecrRef tables -- that can be slow. - return decrRefs(toDecr) + return decrRefs(toDel) } func decrRefs(tables []*table.Table) error { @@ -294,6 +286,9 @@ type levelHandlerRLocked struct{} // This function should already have acquired a read lock, and this is so important the caller must // pass an empty parameter declaring such. func (s *levelHandler) overlappingTables(_ levelHandlerRLocked, kr keyRange) (int, int) { + if len(kr.left) == 0 || len(kr.right) == 0 { + return 0, 0 + } left := sort.Search(len(s.tables), func(i int) bool { return y.CompareKeys(kr.left, s.tables[i].Biggest()) <= 0 }) diff --git a/vendor/github.com/dgraph-io/badger/levels.go b/vendor/github.com/dgraph-io/badger/levels.go index e1d94c7d4dc..9c8a4908a62 100644 --- a/vendor/github.com/dgraph-io/badger/levels.go +++ b/vendor/github.com/dgraph-io/badger/levels.go @@ -17,11 +17,13 @@ package badger import ( + "bytes" "fmt" "math" "math/rand" "os" "sort" + "strings" "sync" "sync/atomic" "time" @@ -122,7 +124,7 @@ func newLevelsController(db *DB, mf *Manifest) (*levelsController, error) { tick := time.NewTicker(3 * time.Second) defer tick.Stop() - for fileID, tableManifest := range mf.Tables { + for fileID, tf := range mf.Tables { fname := table.NewFilename(fileID, db.opt.Dir) select { case <-tick.C: @@ -137,7 +139,7 @@ func newLevelsController(db *DB, mf *Manifest) (*levelsController, error) { if fileID > maxFileID { maxFileID = fileID } - go func(fname string, level int) { + go func(fname string, tf TableManifest) { var rerr error defer func() { throttle.Done(rerr) @@ -149,16 +151,22 @@ func newLevelsController(db *DB, mf *Manifest) (*levelsController, error) { return } - t, err := table.OpenTable(fd, db.opt.TableLoadingMode) + t, err := table.OpenTable(fd, db.opt.TableLoadingMode, tf.Checksum) if err != nil { - rerr = errors.Wrapf(err, "Opening table: %q", fname) + if strings.HasPrefix(err.Error(), "CHECKSUM_MISMATCH:") { + db.opt.Errorf(err.Error()) + db.opt.Errorf("Ignoring table %s", fd.Name()) + // Do not set rerr. 
We will continue without this table. + } else { + rerr = errors.Wrapf(err, "Opening table: %q", fname) + } return } mu.Lock() - tables[level] = append(tables[level], t) + tables[tf.Level] = append(tables[tf.Level], t) mu.Unlock() - }(fname, int(tableManifest.Level)) + }(fname, tf) } if err := throttle.Finish(); err != nil { closeAllTables(tables) @@ -208,10 +216,10 @@ func (s *levelsController) cleanupLevels() error { return firstErr } -// This function picks all tables from all levels, creates a manifest changeset, +// dropTree picks all tables from all levels, creates a manifest changeset, // applies it, and then decrements the refs of these tables, which would result // in their deletion. -func (s *levelsController) deleteLSMTree() (int, error) { +func (s *levelsController) dropTree() (int, error) { // First pick all tables, so we can create a manifest changelog. var all []*table.Table for _, l := range s.levels { @@ -226,7 +234,7 @@ func (s *levelsController) deleteLSMTree() (int, error) { // Generate the manifest changes. changes := []*pb.ManifestChange{} for _, table := range all { - changes = append(changes, makeTableDeleteChange(table.ID())) + changes = append(changes, newDeleteChange(table.ID())) } changeSet := pb.ManifestChangeSet{Changes: changes} if err := s.kv.manifest.addChanges(changeSet.Changes); err != nil { @@ -248,6 +256,72 @@ func (s *levelsController) deleteLSMTree() (int, error) { return len(all), nil } +// dropPrefix runs a L0->L1 compaction, and then runs same level compaction on the rest of the +// levels. For L0->L1 compaction, it runs compactions normally, but skips over all the keys with the +// provided prefix. For Li->Li compactions, it picks up the tables which would have the prefix. The +// tables who only have keys with this prefix are quickly dropped. The ones which have other keys +// are run through MergeIterator and compacted to create new tables. All the mechanisms of +// compactions apply, i.e. level sizes and MANIFEST are updated as in the normal flow. +func (s *levelsController) dropPrefix(prefix []byte) error { + opt := s.kv.opt + for _, l := range s.levels { + l.RLock() + if l.level == 0 { + size := len(l.tables) + l.RUnlock() + + if size > 0 { + cp := compactionPriority{ + level: 0, + score: 1.74, + // A unique number greater than 1.0 does two things. Helps identify this + // function in logs, and forces a compaction. + dropPrefix: prefix, + } + if err := s.doCompact(cp); err != nil { + opt.Warningf("While compacting level 0: %v", err) + return nil + } + } + continue + } + + var tables []*table.Table + for _, table := range l.tables { + var absent bool + switch { + case bytes.HasPrefix(table.Smallest(), prefix): + case bytes.HasPrefix(table.Biggest(), prefix): + case bytes.Compare(prefix, table.Smallest()) > 0 && + bytes.Compare(prefix, table.Biggest()) < 0: + default: + absent = true + } + if !absent { + tables = append(tables, table) + } + } + l.RUnlock() + if len(tables) == 0 { + continue + } + + cd := compactDef{ + elog: trace.New(fmt.Sprintf("Badger.L%d", l.level), "Compact"), + thisLevel: l, + nextLevel: l, + top: []*table.Table{}, + bot: tables, + dropPrefix: prefix, + } + if err := s.runCompactDef(l.level, cd); err != nil { + opt.Warningf("While running compact def: %+v. 
Error: %v", cd, err) + return err + } + } + return nil +} + func (s *levelsController) startCompact(lc *y.Closer) { n := s.kv.opt.NumCompactors lc.AddRunning(n - 1) @@ -304,8 +378,9 @@ func (l *levelHandler) isCompactable(delSize int64) bool { } type compactionPriority struct { - level int - score float64 + level int + score float64 + dropPrefix []byte } // pickCompactLevel determines which level to compact. @@ -343,7 +418,7 @@ func (s *levelsController) pickCompactLevels() (prios []compactionPriority) { // compactBuildTables merge topTables and botTables to form a list of new tables. func (s *levelsController) compactBuildTables( - l int, cd compactDef) ([]*table.Table, func() error, error) { + lev int, cd compactDef) ([]*table.Table, func() error, error) { topTables := cd.top botTables := cd.bot @@ -351,7 +426,7 @@ func (s *levelsController) compactBuildTables( { kr := getKeyRange(cd.top) for i, lh := range s.levels { - if i <= l { // Skip upper levels. + if i <= lev { // Skip upper levels. continue } lh.RLock() @@ -362,7 +437,6 @@ func (s *levelsController) compactBuildTables( break } } - cd.elog.LazyPrintf("Key range overlaps with lower levels: %v", hasOverlap) } // Try to collect stats so that we can inform value log about GC. That would help us find which @@ -378,15 +452,26 @@ func (s *levelsController) compactBuildTables( // Create iterators across all the tables involved first. var iters []y.Iterator - if l == 0 { + if lev == 0 { iters = appendIteratorsReversed(iters, topTables, false) - } else { + } else if len(topTables) > 0 { y.AssertTrue(len(topTables) == 1) iters = []y.Iterator{topTables[0].NewIterator(false)} } // Next level has level>=1 and we can use ConcatIterator as key ranges do not overlap. - iters = append(iters, table.NewConcatIterator(botTables, false)) + var valid []*table.Table + for _, table := range botTables { + if len(cd.dropPrefix) > 0 && + bytes.HasPrefix(table.Smallest(), cd.dropPrefix) && + bytes.HasPrefix(table.Biggest(), cd.dropPrefix) { + // All the keys in this table have the dropPrefix. So, this table does not need to be + // in the iterator and can be dropped immediately. + continue + } + valid = append(valid, table) + } + iters = append(iters, table.NewConcatIterator(valid, false)) it := y.NewMergeIterator(iters, false) defer it.Close() // Important to close the iterator to do ref counting. @@ -410,6 +495,13 @@ func (s *levelsController) compactBuildTables( builder := table.NewTableBuilder() var numKeys, numSkips uint64 for ; it.Valid(); it.Next() { + // See if we need to skip the prefix. + if len(cd.dropPrefix) > 0 && bytes.HasPrefix(it.Key(), cd.dropPrefix) { + numSkips++ + updateStats(it.Value()) + continue + } + // See if we need to skip this key. if len(skipKey) > 0 { if y.SameKey(it.Key(), skipKey) { @@ -467,8 +559,8 @@ func (s *levelsController) compactBuildTables( } // It was true that it.Valid() at least once in the loop above, which means we // called Add() at least once, and builder is not Empty(). - cd.elog.LazyPrintf("Added %d keys. Skipped %d keys.", numKeys, numSkips) - cd.elog.LazyPrintf("LOG Compact. Iteration took: %v\n", time.Since(timeStart)) + s.kv.opt.Debugf("LOG Compact. Added %d keys. Skipped %d keys. 
Iteration took: %v", + numKeys, numSkips, time.Since(timeStart)) if !builder.Empty() { numBuilds++ fileID := s.reserveFileID() @@ -486,7 +578,7 @@ func (s *levelsController) compactBuildTables( return } - tbl, err := table.OpenTable(fd, s.kv.opt.TableLoadingMode) + tbl, err := table.OpenTable(fd, s.kv.opt.TableLoadingMode, nil) // decrRef is added below. resultCh <- newTableResult{tbl, errors.Wrapf(err, "Unable to open table: %q", fd.Name())} }(builder) @@ -527,20 +619,21 @@ func (s *levelsController) compactBuildTables( return y.CompareKeys(newTables[i].Biggest(), newTables[j].Biggest()) < 0 }) s.kv.vlog.updateGCStats(discardStats) - cd.elog.LazyPrintf("Discard stats: %v", discardStats) + s.kv.opt.Debugf("Discard stats: %v", discardStats) return newTables, func() error { return decrRefs(newTables) }, nil } func buildChangeSet(cd *compactDef, newTables []*table.Table) pb.ManifestChangeSet { changes := []*pb.ManifestChange{} for _, table := range newTables { - changes = append(changes, makeTableCreateChange(table.ID(), cd.nextLevel.level)) + changes = append(changes, + newCreateChange(table.ID(), cd.nextLevel.level, table.Checksum)) } for _, table := range cd.top { - changes = append(changes, makeTableDeleteChange(table.ID())) + changes = append(changes, newDeleteChange(table.ID())) } for _, table := range cd.bot { - changes = append(changes, makeTableDeleteChange(table.ID())) + changes = append(changes, newDeleteChange(table.ID())) } return pb.ManifestChangeSet{Changes: changes} } @@ -558,6 +651,8 @@ type compactDef struct { nextRange keyRange thisSize int64 + + dropPrefix []byte } func (cd *compactDef) lockLevels() { @@ -681,7 +776,7 @@ func (s *levelsController) runCompactDef(l int, cd compactDef) (err error) { // See comment earlier in this function about the ordering of these ops, and the order in which // we access levels when reading. - if err := nextLevel.replaceTables(newTables); err != nil { + if err := nextLevel.replaceTables(cd.bot, newTables); err != nil { return err } if err := thisLevel.deleteTables(cd.top); err != nil { @@ -691,8 +786,9 @@ func (s *levelsController) runCompactDef(l int, cd compactDef) (err error) { // Note: For level 0, while doCompact is running, it is possible that new tables are added. // However, the tables are added only to the end, so it is ok to just delete the first table. - cd.elog.LazyPrintf("LOG Compact %d->%d, del %d tables, add %d tables, took %v\n", - l, l+1, len(cd.top)+len(cd.bot), len(newTables), time.Since(timeStart)) + s.kv.opt.Infof("LOG Compact %d->%d, del %d tables, add %d tables, took %v\n", + thisLevel.level, nextLevel.level, len(cd.top)+len(cd.bot), + len(newTables), time.Since(timeStart)) return nil } @@ -704,41 +800,40 @@ func (s *levelsController) doCompact(p compactionPriority) error { y.AssertTrue(l+1 < s.kv.opt.MaxLevels) // Sanity check. cd := compactDef{ - elog: trace.New(fmt.Sprintf("Badger.L%d", l), "Compact"), - thisLevel: s.levels[l], - nextLevel: s.levels[l+1], + elog: trace.New(fmt.Sprintf("Badger.L%d", l), "Compact"), + thisLevel: s.levels[l], + nextLevel: s.levels[l+1], + dropPrefix: p.dropPrefix, } cd.elog.SetMaxEvents(100) defer cd.elog.Finish() - cd.elog.LazyPrintf("Got compaction priority: %+v", p) + s.kv.opt.Infof("Got compaction priority: %+v", p) // While picking tables to be compacted, both levels' tables are expected to // remain unchanged. 
if l == 0 { if !s.fillTablesL0(&cd) { - cd.elog.LazyPrintf("fillTables failed for level: %d\n", l) return errFillTables } } else { if !s.fillTables(&cd) { - cd.elog.LazyPrintf("fillTables failed for level: %d\n", l) return errFillTables } } defer s.cstatus.delete(cd) // Remove the ranges from compaction status. - cd.elog.LazyPrintf("Running for level: %d\n", cd.thisLevel.level) + s.kv.opt.Infof("Running for level: %d\n", cd.thisLevel.level) s.cstatus.toLog(cd.elog) if err := s.runCompactDef(l, cd); err != nil { // This compaction couldn't be done successfully. - cd.elog.LazyPrintf("\tLOG Compact FAILED with error: %+v: %+v", err, cd) + s.kv.opt.Warningf("LOG Compact FAILED with error: %+v: %+v", err, cd) return err } s.cstatus.toLog(cd.elog) - cd.elog.LazyPrintf("Compaction for level: %d DONE", cd.thisLevel.level) + s.kv.opt.Infof("Compaction for level: %d DONE", cd.thisLevel.level) return nil } @@ -748,7 +843,7 @@ func (s *levelsController) addLevel0Table(t *table.Table) error { // the proper order. (That means this update happens before that of some compaction which // deletes the table.) err := s.kv.manifest.addChanges([]*pb.ManifestChange{ - makeTableCreateChange(t.ID(), 0), + newCreateChange(t.ID(), 0, t.Checksum), }) if err != nil { return err diff --git a/vendor/github.com/dgraph-io/badger/logger.go b/vendor/github.com/dgraph-io/badger/logger.go index 0d58191da46..2c2795f52ee 100644 --- a/vendor/github.com/dgraph-io/badger/logger.go +++ b/vendor/github.com/dgraph-io/badger/logger.go @@ -24,56 +24,42 @@ import ( // Logger is implemented by any logging system that is used for standard logs. type Logger interface { Errorf(string, ...interface{}) - Infof(string, ...interface{}) Warningf(string, ...interface{}) -} - -var badgerLogger Logger - -// SetLogger sets the global logger. -func SetLogger(l Logger) { badgerLogger = l } - -// Errorf logs an ERROR message to the global logger. -func Errorf(format string, v ...interface{}) { - badgerLogger.Errorf(format, v...) + Infof(string, ...interface{}) + Debugf(string, ...interface{}) } // Errorf logs an ERROR log message to the logger specified in opts or to the // global logger if no logger is specified in opts. func (opt *Options) Errorf(format string, v ...interface{}) { - if opt.Logger != nil { - opt.Logger.Errorf(format, v...) + if opt.Logger == nil { return } - Errorf(format, v...) + opt.Logger.Errorf(format, v...) } -// Infof logs an INFO message to the global logger. -func Infof(format string, v ...interface{}) { - badgerLogger.Infof(format, v...) -} - -// Infof is like Errorf but for INFO messages. +// Infof logs an INFO message to the logger specified in opts. func (opt *Options) Infof(format string, v ...interface{}) { - if opt.Logger != nil { - opt.Logger.Infof(format, v...) + if opt.Logger == nil { return } - Infof(format, v...) + opt.Logger.Infof(format, v...) } -// Warningf logs a WARNING message to the global logger. -func Warningf(format string, v ...interface{}) { - badgerLogger.Warningf(format, v...) +// Warningf logs a WARNING message to the logger specified in opts. +func (opt *Options) Warningf(format string, v ...interface{}) { + if opt.Logger == nil { + return + } + opt.Logger.Warningf(format, v...) } -// Warningf is like Errorf but for WARNING messages. -func (opt *Options) Warningf(format string, v ...interface{}) { - if opt.Logger != nil { - opt.Logger.Warningf(format, v...) +// Warningf logs a WARNING message to the logger specified in opts. 
+func (opt *Options) Debugf(format string, v ...interface{}) { + if opt.Logger == nil { return } - Warningf(format, v...) + opt.Logger.Debugf(format, v...) } type defaultLog struct { @@ -82,21 +68,18 @@ type defaultLog struct { var defaultLogger = &defaultLog{Logger: log.New(os.Stderr, "badger ", log.LstdFlags)} -// UseDefaultLogger sets the global logger to the default logger. -func UseDefaultLogger() { SetLogger(defaultLogger) } - func (l *defaultLog) Errorf(f string, v ...interface{}) { l.Printf("ERROR: "+f, v...) } -func (l *defaultLog) Infof(f string, v ...interface{}) { - l.Printf("INFO: "+f, v...) -} - func (l *defaultLog) Warningf(f string, v ...interface{}) { l.Printf("WARNING: "+f, v...) } -func init() { - UseDefaultLogger() +func (l *defaultLog) Infof(f string, v ...interface{}) { + l.Printf("INFO: "+f, v...) +} + +func (l *defaultLog) Debugf(f string, v ...interface{}) { + l.Printf("DEBUG: "+f, v...) } diff --git a/vendor/github.com/dgraph-io/badger/manifest.go b/vendor/github.com/dgraph-io/badger/manifest.go index f8f3f89771d..34ce1217243 100644 --- a/vendor/github.com/dgraph-io/badger/manifest.go +++ b/vendor/github.com/dgraph-io/badger/manifest.go @@ -42,7 +42,7 @@ import ( // reconstruct the manifest at startup. type Manifest struct { Levels []levelManifest - Tables map[uint64]tableManifest + Tables map[uint64]TableManifest // Contains total number of creation and deletion changes in the manifest -- used to compute // whether it'd be useful to rewrite the manifest. @@ -54,7 +54,7 @@ func createManifest() Manifest { levels := make([]levelManifest, 0) return Manifest{ Levels: levels, - Tables: make(map[uint64]tableManifest), + Tables: make(map[uint64]TableManifest), } } @@ -64,10 +64,11 @@ type levelManifest struct { Tables map[uint64]struct{} // Set of table id's } -// tableManifest contains information about a specific level +// TableManifest contains information about a specific level // in the LSM tree. 
-type tableManifest struct { - Level uint8 +type TableManifest struct { + Level uint8 + Checksum []byte } // manifestFile holds the file pointer (and other info) about the manifest file, which is a log @@ -98,7 +99,7 @@ const ( func (m *Manifest) asChanges() []*pb.ManifestChange { changes := make([]*pb.ManifestChange, 0, len(m.Tables)) for id, tm := range m.Tables { - changes = append(changes, makeTableCreateChange(id, int(tm.Level))) + changes = append(changes, newCreateChange(id, int(tm.Level), tm.Checksum)) } return changes } @@ -384,8 +385,9 @@ func applyManifestChange(build *Manifest, tc *pb.ManifestChange) error { if _, ok := build.Tables[tc.Id]; ok { return fmt.Errorf("MANIFEST invalid, table %d exists", tc.Id) } - build.Tables[tc.Id] = tableManifest{ - Level: uint8(tc.Level), + build.Tables[tc.Id] = TableManifest{ + Level: uint8(tc.Level), + Checksum: append([]byte{}, tc.Checksum...), } for len(build.Levels) <= int(tc.Level) { build.Levels = append(build.Levels, levelManifest{make(map[uint64]struct{})}) @@ -417,15 +419,16 @@ func applyChangeSet(build *Manifest, changeSet *pb.ManifestChangeSet) error { return nil } -func makeTableCreateChange(id uint64, level int) *pb.ManifestChange { +func newCreateChange(id uint64, level int, checksum []byte) *pb.ManifestChange { return &pb.ManifestChange{ - Id: id, - Op: pb.ManifestChange_CREATE, - Level: uint32(level), + Id: id, + Op: pb.ManifestChange_CREATE, + Level: uint32(level), + Checksum: checksum, } } -func makeTableDeleteChange(id uint64) *pb.ManifestChange { +func newDeleteChange(id uint64) *pb.ManifestChange { return &pb.ManifestChange{ Id: id, Op: pb.ManifestChange_DELETE, diff --git a/vendor/github.com/dgraph-io/badger/options.go b/vendor/github.com/dgraph-io/badger/options.go index f9b195fa5c7..de2a32aa9c2 100644 --- a/vendor/github.com/dgraph-io/badger/options.go +++ b/vendor/github.com/dgraph-io/badger/options.go @@ -31,10 +31,11 @@ import ( type Options struct { // 1. Mandatory flags // ------------------- - // Directory to store the data in. Should exist and be writable. + // Directory to store the data in. If it doesn't exist, Badger will + // try to create it for you. Dir string - // Directory to store the value log in. Can be the same as Dir. Should - // exist and be writable. + // Directory to store the value log in. Can be the same as Dir. If it + // doesn't exist, Badger will try to create it for you. ValueDir string // 2. Frequently modified flags diff --git a/vendor/github.com/dgraph-io/badger/pb/pb.pb.go b/vendor/github.com/dgraph-io/badger/pb/pb.pb.go index 8ab8d8a6321..6fdb919879e 100644 --- a/vendor/github.com/dgraph-io/badger/pb/pb.pb.go +++ b/vendor/github.com/dgraph-io/badger/pb/pb.pb.go @@ -3,11 +3,12 @@ package pb -import proto "github.com/golang/protobuf/proto" -import fmt "fmt" -import math "math" - -import io "io" +import ( + fmt "fmt" + proto "github.com/golang/protobuf/proto" + io "io" + math "math" +) // Reference imports to suppress errors if they are not otherwise used. 
var _ = proto.Marshal @@ -31,6 +32,7 @@ var ManifestChange_Operation_name = map[int32]string{ 0: "CREATE", 1: "DELETE", } + var ManifestChange_Operation_value = map[string]int32{ "CREATE": 0, "DELETE": 1, @@ -39,8 +41,9 @@ var ManifestChange_Operation_value = map[string]int32{ func (x ManifestChange_Operation) String() string { return proto.EnumName(ManifestChange_Operation_name, int32(x)) } + func (ManifestChange_Operation) EnumDescriptor() ([]byte, []int) { - return fileDescriptor_pb_7d6edc481ce0a5cd, []int{3, 0} + return fileDescriptor_f80abaa17e25ccc8, []int{3, 0} } type KV struct { @@ -59,7 +62,7 @@ func (m *KV) Reset() { *m = KV{} } func (m *KV) String() string { return proto.CompactTextString(m) } func (*KV) ProtoMessage() {} func (*KV) Descriptor() ([]byte, []int) { - return fileDescriptor_pb_7d6edc481ce0a5cd, []int{0} + return fileDescriptor_f80abaa17e25ccc8, []int{0} } func (m *KV) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -76,8 +79,8 @@ func (m *KV) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { return b[:n], nil } } -func (dst *KV) XXX_Merge(src proto.Message) { - xxx_messageInfo_KV.Merge(dst, src) +func (m *KV) XXX_Merge(src proto.Message) { + xxx_messageInfo_KV.Merge(m, src) } func (m *KV) XXX_Size() int { return m.Size() @@ -131,7 +134,7 @@ func (m *KV) GetMeta() []byte { } type KVList struct { - Kv []*KV `protobuf:"bytes,1,rep,name=kv" json:"kv,omitempty"` + Kv []*KV `protobuf:"bytes,1,rep,name=kv,proto3" json:"kv,omitempty"` XXX_NoUnkeyedLiteral struct{} `json:"-"` XXX_unrecognized []byte `json:"-"` XXX_sizecache int32 `json:"-"` @@ -141,7 +144,7 @@ func (m *KVList) Reset() { *m = KVList{} } func (m *KVList) String() string { return proto.CompactTextString(m) } func (*KVList) ProtoMessage() {} func (*KVList) Descriptor() ([]byte, []int) { - return fileDescriptor_pb_7d6edc481ce0a5cd, []int{1} + return fileDescriptor_f80abaa17e25ccc8, []int{1} } func (m *KVList) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -158,8 +161,8 @@ func (m *KVList) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { return b[:n], nil } } -func (dst *KVList) XXX_Merge(src proto.Message) { - xxx_messageInfo_KVList.Merge(dst, src) +func (m *KVList) XXX_Merge(src proto.Message) { + xxx_messageInfo_KVList.Merge(m, src) } func (m *KVList) XXX_Size() int { return m.Size() @@ -179,7 +182,7 @@ func (m *KVList) GetKv() []*KV { type ManifestChangeSet struct { // A set of changes that are applied atomically. 
- Changes []*ManifestChange `protobuf:"bytes,1,rep,name=changes" json:"changes,omitempty"` + Changes []*ManifestChange `protobuf:"bytes,1,rep,name=changes,proto3" json:"changes,omitempty"` XXX_NoUnkeyedLiteral struct{} `json:"-"` XXX_unrecognized []byte `json:"-"` XXX_sizecache int32 `json:"-"` @@ -189,7 +192,7 @@ func (m *ManifestChangeSet) Reset() { *m = ManifestChangeSet{} } func (m *ManifestChangeSet) String() string { return proto.CompactTextString(m) } func (*ManifestChangeSet) ProtoMessage() {} func (*ManifestChangeSet) Descriptor() ([]byte, []int) { - return fileDescriptor_pb_7d6edc481ce0a5cd, []int{2} + return fileDescriptor_f80abaa17e25ccc8, []int{2} } func (m *ManifestChangeSet) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -206,8 +209,8 @@ func (m *ManifestChangeSet) XXX_Marshal(b []byte, deterministic bool) ([]byte, e return b[:n], nil } } -func (dst *ManifestChangeSet) XXX_Merge(src proto.Message) { - xxx_messageInfo_ManifestChangeSet.Merge(dst, src) +func (m *ManifestChangeSet) XXX_Merge(src proto.Message) { + xxx_messageInfo_ManifestChangeSet.Merge(m, src) } func (m *ManifestChangeSet) XXX_Size() int { return m.Size() @@ -229,6 +232,7 @@ type ManifestChange struct { Id uint64 `protobuf:"varint,1,opt,name=Id,proto3" json:"Id,omitempty"` Op ManifestChange_Operation `protobuf:"varint,2,opt,name=Op,proto3,enum=pb.ManifestChange_Operation" json:"Op,omitempty"` Level uint32 `protobuf:"varint,3,opt,name=Level,proto3" json:"Level,omitempty"` + Checksum []byte `protobuf:"bytes,4,opt,name=Checksum,proto3" json:"Checksum,omitempty"` XXX_NoUnkeyedLiteral struct{} `json:"-"` XXX_unrecognized []byte `json:"-"` XXX_sizecache int32 `json:"-"` @@ -238,7 +242,7 @@ func (m *ManifestChange) Reset() { *m = ManifestChange{} } func (m *ManifestChange) String() string { return proto.CompactTextString(m) } func (*ManifestChange) ProtoMessage() {} func (*ManifestChange) Descriptor() ([]byte, []int) { - return fileDescriptor_pb_7d6edc481ce0a5cd, []int{3} + return fileDescriptor_f80abaa17e25ccc8, []int{3} } func (m *ManifestChange) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -255,8 +259,8 @@ func (m *ManifestChange) XXX_Marshal(b []byte, deterministic bool) ([]byte, erro return b[:n], nil } } -func (dst *ManifestChange) XXX_Merge(src proto.Message) { - xxx_messageInfo_ManifestChange.Merge(dst, src) +func (m *ManifestChange) XXX_Merge(src proto.Message) { + xxx_messageInfo_ManifestChange.Merge(m, src) } func (m *ManifestChange) XXX_Size() int { return m.Size() @@ -288,13 +292,49 @@ func (m *ManifestChange) GetLevel() uint32 { return 0 } +func (m *ManifestChange) GetChecksum() []byte { + if m != nil { + return m.Checksum + } + return nil +} + func init() { + proto.RegisterEnum("pb.ManifestChange_Operation", ManifestChange_Operation_name, ManifestChange_Operation_value) proto.RegisterType((*KV)(nil), "pb.KV") proto.RegisterType((*KVList)(nil), "pb.KVList") proto.RegisterType((*ManifestChangeSet)(nil), "pb.ManifestChangeSet") proto.RegisterType((*ManifestChange)(nil), "pb.ManifestChange") - proto.RegisterEnum("pb.ManifestChange_Operation", ManifestChange_Operation_name, ManifestChange_Operation_value) } + +func init() { proto.RegisterFile("pb.proto", fileDescriptor_f80abaa17e25ccc8) } + +var fileDescriptor_f80abaa17e25ccc8 = []byte{ + // 342 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x64, 0x91, 0x4d, 0x6a, 0xf2, 0x40, + 0x18, 0xc7, 0x9d, 0x31, 0x46, 0x7d, 0x5e, 0x5f, 0x49, 0x87, 0x52, 0x42, 0x3f, 0x42, 0x48, 0x37, + 0x2e, 
0x24, 0x0b, 0x7b, 0x02, 0x6b, 0xb3, 0x10, 0x15, 0x61, 0x2a, 0x6e, 0x25, 0xd1, 0xa7, 0x35, + 0x44, 0x93, 0x21, 0x19, 0x43, 0x7b, 0x91, 0xd2, 0x0b, 0xf4, 0x2e, 0x5d, 0xf6, 0x08, 0xc5, 0x5e, + 0xa4, 0x64, 0xfc, 0x00, 0xe9, 0xee, 0xff, 0x31, 0xcf, 0x7f, 0xf1, 0x1b, 0xa8, 0x89, 0xc0, 0x15, + 0x69, 0x22, 0x13, 0x46, 0x45, 0xe0, 0xbc, 0x11, 0xa0, 0x83, 0x29, 0x33, 0xa0, 0x1c, 0xe1, 0xab, + 0x49, 0x6c, 0xd2, 0x6a, 0xf0, 0x42, 0xb2, 0x73, 0xa8, 0xe4, 0xfe, 0x6a, 0x83, 0x26, 0x55, 0xd9, + 0xce, 0xb0, 0x2b, 0xa8, 0x6f, 0x32, 0x4c, 0x67, 0x6b, 0x94, 0xbe, 0x59, 0x56, 0x4d, 0xad, 0x08, + 0x46, 0x28, 0x7d, 0x66, 0x42, 0x35, 0xc7, 0x34, 0x0b, 0x93, 0xd8, 0xd4, 0x6c, 0xd2, 0xd2, 0xf8, + 0xc1, 0xb2, 0x1b, 0x00, 0x7c, 0x11, 0x61, 0x8a, 0xd9, 0xcc, 0x97, 0x66, 0x45, 0x95, 0xf5, 0x7d, + 0xd2, 0x95, 0x8c, 0x81, 0xa6, 0x06, 0x75, 0x35, 0xa8, 0xb4, 0x63, 0x83, 0x3e, 0x98, 0x0e, 0xc3, + 0x4c, 0xb2, 0x0b, 0xa0, 0x51, 0x6e, 0x12, 0xbb, 0xdc, 0xfa, 0xd7, 0xd1, 0x5d, 0x11, 0xb8, 0x83, + 0x29, 0xa7, 0x51, 0xee, 0x74, 0xe1, 0x6c, 0xe4, 0xc7, 0xe1, 0x13, 0x66, 0xb2, 0xb7, 0xf4, 0xe3, + 0x67, 0x7c, 0x44, 0xc9, 0xda, 0x50, 0x9d, 0x2b, 0x93, 0xed, 0x2f, 0x58, 0x71, 0x71, 0xfa, 0x8e, + 0x1f, 0x9e, 0x38, 0x1f, 0x04, 0x9a, 0xa7, 0x1d, 0x6b, 0x02, 0xed, 0x2f, 0x14, 0x08, 0x8d, 0xd3, + 0xfe, 0x82, 0xb5, 0x81, 0x8e, 0x85, 0x82, 0xd0, 0xec, 0x5c, 0xff, 0xdd, 0x72, 0xc7, 0x02, 0x53, + 0x5f, 0x86, 0x49, 0xcc, 0xe9, 0x58, 0x14, 0xd4, 0x86, 0x98, 0xe3, 0x4a, 0xb1, 0xf9, 0xcf, 0x77, + 0x86, 0x5d, 0x42, 0xad, 0xb7, 0xc4, 0x79, 0x94, 0x6d, 0xd6, 0x8a, 0x4c, 0x83, 0x1f, 0xbd, 0x73, + 0x0b, 0xf5, 0xe3, 0x04, 0x03, 0xd0, 0x7b, 0xdc, 0xeb, 0x4e, 0x3c, 0xa3, 0x54, 0xe8, 0x07, 0x6f, + 0xe8, 0x4d, 0x3c, 0x83, 0xdc, 0x1b, 0x9f, 0x5b, 0x8b, 0x7c, 0x6d, 0x2d, 0xf2, 0xbd, 0xb5, 0xc8, + 0xfb, 0x8f, 0x55, 0x0a, 0x74, 0xf5, 0x85, 0x77, 0xbf, 0x01, 0x00, 0x00, 0xff, 0xff, 0x50, 0xdf, + 0x4a, 0x84, 0xce, 0x01, 0x00, 0x00, +} + func (m *KV) Marshal() (dAtA []byte, err error) { size := m.Size() dAtA = make([]byte, size) @@ -446,6 +486,12 @@ func (m *ManifestChange) MarshalTo(dAtA []byte) (int, error) { i++ i = encodeVarintPb(dAtA, i, uint64(m.Level)) } + if len(m.Checksum) > 0 { + dAtA[i] = 0x22 + i++ + i = encodeVarintPb(dAtA, i, uint64(len(m.Checksum))) + i += copy(dAtA[i:], m.Checksum) + } if m.XXX_unrecognized != nil { i += copy(dAtA[i:], m.XXX_unrecognized) } @@ -546,6 +592,10 @@ func (m *ManifestChange) Size() (n int) { if m.Level != 0 { n += 1 + sovPb(uint64(m.Level)) } + l = len(m.Checksum) + if l > 0 { + n += 1 + l + sovPb(uint64(l)) + } if m.XXX_unrecognized != nil { n += len(m.XXX_unrecognized) } @@ -1028,6 +1078,37 @@ func (m *ManifestChange) Unmarshal(dAtA []byte) error { break } } + case 4: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Checksum", wireType) + } + var byteLen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowPb + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + byteLen |= (int(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + if byteLen < 0 { + return ErrInvalidLengthPb + } + postIndex := iNdEx + byteLen + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.Checksum = append(m.Checksum[:0], dAtA[iNdEx:postIndex]...) 
+ if m.Checksum == nil { + m.Checksum = []byte{} + } + iNdEx = postIndex default: iNdEx = preIndex skippy, err := skipPb(dAtA[iNdEx:]) @@ -1154,30 +1235,3 @@ var ( ErrInvalidLengthPb = fmt.Errorf("proto: negative length found during unmarshaling") ErrIntOverflowPb = fmt.Errorf("proto: integer overflow") ) - -func init() { proto.RegisterFile("pb.proto", fileDescriptor_pb_7d6edc481ce0a5cd) } - -var fileDescriptor_pb_7d6edc481ce0a5cd = []byte{ - // 325 bytes of a gzipped FileDescriptorProto - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x64, 0x51, 0x4f, 0x4e, 0xfa, 0x40, - 0x14, 0x66, 0x86, 0x52, 0xe0, 0xfd, 0x7e, 0x92, 0xfa, 0x62, 0x4c, 0x13, 0xb5, 0x69, 0xea, 0x86, - 0x05, 0xe9, 0x02, 0x4f, 0x80, 0xd8, 0x05, 0x01, 0x42, 0x32, 0x12, 0xb6, 0xa4, 0x95, 0xa7, 0x36, - 0x60, 0x3b, 0x69, 0x87, 0x46, 0x8f, 0xe0, 0x05, 0x8c, 0x47, 0x72, 0xe9, 0x11, 0x0c, 0x5e, 0xc4, - 0x74, 0x00, 0x13, 0xe2, 0xee, 0xfb, 0xf7, 0xbe, 0xc5, 0xf7, 0xa0, 0x21, 0x23, 0x5f, 0x66, 0xa9, - 0x4a, 0x91, 0xcb, 0xc8, 0x7b, 0x63, 0xc0, 0x87, 0x33, 0xb4, 0xa0, 0xba, 0xa4, 0x17, 0x9b, 0xb9, - 0xac, 0xfd, 0x5f, 0x94, 0x10, 0x4f, 0xa0, 0x56, 0x84, 0xab, 0x35, 0xd9, 0x5c, 0x6b, 0x5b, 0x82, - 0x67, 0xd0, 0x5c, 0xe7, 0x94, 0xcd, 0x9f, 0x48, 0x85, 0x76, 0x55, 0x3b, 0x8d, 0x52, 0x18, 0x93, - 0x0a, 0xd1, 0x86, 0x7a, 0x41, 0x59, 0x1e, 0xa7, 0x89, 0x6d, 0xb8, 0xac, 0x6d, 0x88, 0x3d, 0xc5, - 0x0b, 0x00, 0x7a, 0x96, 0x71, 0x46, 0xf9, 0x3c, 0x54, 0x76, 0x4d, 0x9b, 0xcd, 0x9d, 0xd2, 0x53, - 0x88, 0x60, 0xe8, 0x42, 0x53, 0x17, 0x6a, 0xec, 0xb9, 0x60, 0x0e, 0x67, 0xa3, 0x38, 0x57, 0x78, - 0x0a, 0x7c, 0x59, 0xd8, 0xcc, 0xad, 0xb6, 0xff, 0x75, 0x4d, 0x5f, 0x46, 0xfe, 0x70, 0x26, 0xf8, - 0xb2, 0xf0, 0x7a, 0x70, 0x3c, 0x0e, 0x93, 0xf8, 0x9e, 0x72, 0xd5, 0x7f, 0x0c, 0x93, 0x07, 0xba, - 0x25, 0x85, 0x1d, 0xa8, 0xdf, 0x69, 0x92, 0xef, 0x2e, 0xb0, 0xbc, 0x38, 0xcc, 0x89, 0x7d, 0xc4, - 0x7b, 0x65, 0xd0, 0x3a, 0xf4, 0xb0, 0x05, 0x7c, 0xb0, 0xd0, 0x43, 0x18, 0x82, 0x0f, 0x16, 0xd8, - 0x01, 0x3e, 0x91, 0x7a, 0x84, 0x56, 0xf7, 0xfc, 0x6f, 0x97, 0x3f, 0x91, 0x94, 0x85, 0x2a, 0x4e, - 0x13, 0xc1, 0x27, 0xb2, 0x5c, 0x6d, 0x44, 0x05, 0xad, 0xf4, 0x36, 0x47, 0x62, 0x4b, 0xbc, 0x4b, - 0x68, 0xfe, 0xc6, 0x10, 0xc0, 0xec, 0x8b, 0xa0, 0x37, 0x0d, 0xac, 0x4a, 0x89, 0x6f, 0x82, 0x51, - 0x30, 0x0d, 0x2c, 0x76, 0x6d, 0x7d, 0x6c, 0x1c, 0xf6, 0xb9, 0x71, 0xd8, 0xd7, 0xc6, 0x61, 0xef, - 0xdf, 0x4e, 0x25, 0x32, 0xf5, 0x9b, 0xae, 0x7e, 0x02, 0x00, 0x00, 0xff, 0xff, 0x8c, 0x0e, 0xe6, - 0x7c, 0xb2, 0x01, 0x00, 0x00, -} diff --git a/vendor/github.com/dgraph-io/badger/pb/pb.proto b/vendor/github.com/dgraph-io/badger/pb/pb.proto index aa8419c7a21..b790cf69bdd 100644 --- a/vendor/github.com/dgraph-io/badger/pb/pb.proto +++ b/vendor/github.com/dgraph-io/badger/pb/pb.proto @@ -43,6 +43,7 @@ message ManifestChange { CREATE = 0; DELETE = 1; } - Operation Op = 2; - uint32 Level = 3; // Only used for CREATE + Operation Op = 2; + uint32 Level = 3; // Only used for CREATE + bytes Checksum = 4; // Only used for CREATE } diff --git a/vendor/github.com/dgraph-io/badger/table/table.go b/vendor/github.com/dgraph-io/badger/table/table.go index e5a9c698748..4e57b91f43b 100644 --- a/vendor/github.com/dgraph-io/badger/table/table.go +++ b/vendor/github.com/dgraph-io/badger/table/table.go @@ -17,8 +17,8 @@ package table import ( - "bufio" "bytes" + "crypto/sha256" "encoding/binary" "fmt" "io" @@ -69,6 +69,8 @@ type Table struct { id uint64 // file id, part of filename bf bbloom.Bloom + + Checksum []byte } // IncrRef 
increments the refcount (having to do with whether the file should be deleted) @@ -115,7 +117,7 @@ func (b block) NewIterator() *blockIterator { // entry. Returns a table with one reference count on it (decrementing which may delete the file! // -- consider t.Close() instead). The fd has to writeable because we call Truncate on it before // deleting. -func OpenTable(fd *os.File, loadingMode options.FileLoadingMode) (*Table, error) { +func OpenTable(fd *os.File, mode options.FileLoadingMode, cksum []byte) (*Table, error) { fileInfo, err := fd.Stat() if err != nil { // It's OK to ignore fd.Close() errs in this function because we have only read @@ -134,26 +136,24 @@ func OpenTable(fd *os.File, loadingMode options.FileLoadingMode) (*Table, error) fd: fd, ref: 1, // Caller is given one reference. id: id, - loadingMode: loadingMode, + loadingMode: mode, } t.tableSize = int(fileInfo.Size()) - if loadingMode == options.MemoryMap { - t.mmap, err = y.Mmap(fd, false, fileInfo.Size()) - if err != nil { - _ = fd.Close() - return nil, y.Wrapf(err, "Unable to map file") - } - } else if loadingMode == options.LoadToRAM { - err = t.loadToRAM() - if err != nil { - _ = fd.Close() - return nil, y.Wrap(err) - } + // We first load to RAM, so we can read the index and do checksum. + if err := t.loadToRAM(); err != nil { + return nil, err } - - if err := t.readIndex(loadingMode); err != nil { + // Enforce checksum before we read index. Otherwise, if the file was + // truncated, we'd end up with panics in readIndex. + if len(cksum) > 0 && !bytes.Equal(t.Checksum, cksum) { + return nil, fmt.Errorf( + "CHECKSUM_MISMATCH: Table checksum does not match checksum in MANIFEST."+ + " NOT including table %s. This would lead to missing data."+ + "\n sha256 %x Expected\n sha256 %x Found\n", filename, cksum, t.Checksum) + } + if err := t.readIndex(); err != nil { return nil, y.Wrap(err) } @@ -170,6 +170,21 @@ func OpenTable(fd *os.File, loadingMode options.FileLoadingMode) (*Table, error) if it2.Valid() { t.biggest = it2.Key() } + + switch mode { + case options.LoadToRAM: + // No need to do anything. t.mmap is already filled. + case options.MemoryMap: + t.mmap, err = y.Mmap(fd, false, fileInfo.Size()) + if err != nil { + _ = fd.Close() + return nil, y.Wrapf(err, "Unable to map file") + } + case options.FileIO: + t.mmap = nil + default: + panic(fmt.Sprintf("Invalid loading mode: %v", mode)) + } return t, nil } @@ -203,7 +218,10 @@ func (t *Table) readNoFail(off int, sz int) []byte { return res } -func (t *Table) readIndex(loadingMode options.FileLoadingMode) error { +func (t *Table) readIndex() error { + if len(t.mmap) != t.tableSize { + panic("Table size does not match the read bytes") + } readPos := t.tableSize // Read bloom filter. @@ -243,39 +261,17 @@ func (t *Table) readIndex(loadingMode options.FileLoadingMode) error { t.blockIndex = append(t.blockIndex, ko) } - // Execute this index read serially, because all disks are orders of magnitude faster when read - // serially compared to executing random reads. + // Execute this index read serially, because we already have table data in memory. var h header - var offset int - var r *bufio.Reader - if loadingMode == options.LoadToRAM { - // We already read the table to put it into t.mmap. So, no point reading it again from disk. - // Instead use the read buffer. 
- r = bufio.NewReader(bytes.NewReader(t.mmap)) - } else { - if _, err := t.fd.Seek(0, io.SeekStart); err != nil { - return err - } - r = bufio.NewReader(t.fd) - } - hbuf := make([]byte, h.Size()) for idx := range t.blockIndex { ko := &t.blockIndex[idx] - if _, err := r.Discard(ko.offset - offset); err != nil { - return err - } - offset = ko.offset - if _, err := io.ReadFull(r, hbuf); err != nil { - return err - } - offset += len(hbuf) + + hbuf := t.readNoFail(ko.offset, h.Size()) h.Decode(hbuf) y.AssertTrue(h.plen == 0) - ko.key = make([]byte, h.klen) - if _, err := io.ReadFull(r, ko.key); err != nil { - return err - } - offset += len(ko.key) + + key := t.readNoFail(ko.offset+len(hbuf), int(h.klen)) + ko.key = append([]byte{}, key...) } return nil @@ -343,11 +339,17 @@ func NewFilename(id uint64, dir string) string { } func (t *Table) loadToRAM() error { + if _, err := t.fd.Seek(0, io.SeekStart); err != nil { + return err + } t.mmap = make([]byte, t.tableSize) - read, err := t.fd.ReadAt(t.mmap, 0) + sum := sha256.New() + tee := io.TeeReader(t.fd, sum) + read, err := tee.Read(t.mmap) if err != nil || read != t.tableSize { return y.Wrapf(err, "Unable to load file in memory. Table file: %s", t.Filename()) } + t.Checksum = sum.Sum(nil) y.NumReads.Add(1) y.NumBytesRead.Add(int64(read)) return nil diff --git a/vendor/github.com/dgraph-io/badger/test.sh b/vendor/github.com/dgraph-io/badger/test.sh index 2216ecbdf4a..e2df230eed5 100755 --- a/vendor/github.com/dgraph-io/badger/test.sh +++ b/vendor/github.com/dgraph-io/badger/test.sh @@ -17,8 +17,15 @@ go test -v --manual=true -run='TestTruncateVlogNoClose$' . truncate --size=4096 p/000000.vlog go test -v --manual=true -run='TestTruncateVlogNoClose2$' . go test -v --manual=true -run='TestTruncateVlogNoClose3$' . -rm -R p +rm -R p || true # Then the normal tests. +echo +echo "==> Starting tests with value log mmapped..." +sleep 5 go test -v --vlog_mmap=true -race ./... + +echo +echo "==> Starting tests with value log not mmapped..." +sleep 5 go test -v --vlog_mmap=false -race ./... diff --git a/vendor/github.com/dgraph-io/badger/txn.go b/vendor/github.com/dgraph-io/badger/txn.go index a284cae72d8..3fff2df7442 100644 --- a/vendor/github.com/dgraph-io/badger/txn.go +++ b/vendor/github.com/dgraph-io/badger/txn.go @@ -54,6 +54,9 @@ type oracle struct { // commits stores a key fingerprint and latest commit counter for it. // refCount is used to clear out commits map to avoid a memory blowup. commits map[uint64]uint64 + + // closer is used to stop watermarks. + closer *y.Closer } func newOracle(opt Options) *oracle { @@ -66,12 +69,17 @@ func newOracle(opt Options) *oracle { // See https://golang.org/pkg/sync/atomic/#pkg-note-BUG. 
readMark: &y.WaterMark{Name: "badger.PendingReads"}, txnMark: &y.WaterMark{Name: "badger.TxnTimestamp"}, + closer: y.NewCloser(2), } - orc.readMark.Init() - orc.txnMark.Init() + orc.readMark.Init(orc.closer) + orc.txnMark.Init(orc.closer) return orc } +func (o *oracle) Stop() { + o.closer.SignalAndWait() +} + func (o *oracle) addRef() { atomic.AddInt64(&o.refCount, 1) } @@ -608,13 +616,13 @@ func runTxnCallback(cb *txnCb) { switch { case cb == nil: panic("txn callback is nil") + case cb.user == nil: + panic("Must have caught a nil callback for txn.CommitWith") case cb.err != nil: cb.user(cb.err) case cb.commit != nil: err := cb.commit() cb.user(err) - case cb.user == nil: - panic("Must have caught a nil callback for txn.CommitWith") default: cb.user(nil) } diff --git a/vendor/github.com/dgraph-io/badger/y/watermark.go b/vendor/github.com/dgraph-io/badger/y/watermark.go index 55ffd265025..53fec89c76e 100644 --- a/vendor/github.com/dgraph-io/badger/y/watermark.go +++ b/vendor/github.com/dgraph-io/badger/y/watermark.go @@ -38,14 +38,15 @@ func (u *uint64Heap) Pop() interface{} { return x } -// mark contains raft proposal id and a done boolean. It is used to -// update the WaterMark struct about the status of a proposal. +// mark contains one of more indices, along with a done boolean to indicate the +// status of the index: begin or done. It also contains waiters, who could be +// waiting for the watermark to reach >= a certain index. type mark struct { // Either this is an (index, waiter) pair or (index, done) or (indices, done). index uint64 waiter chan struct{} indices []uint64 - done bool // Set to true if the pending mutation is done. + done bool // Set to true if the index is done. } // WaterMark is used to keep track of the minimum un-finished index. Typically, an index k becomes @@ -67,10 +68,10 @@ type WaterMark struct { } // Init initializes a WaterMark struct. MUST be called before using it. -func (w *WaterMark) Init() { +func (w *WaterMark) Init(closer *Closer) { w.markCh = make(chan mark, 100) w.elog = trace.NewEventLog("Watermark", w.Name) - go w.process() + go w.process(closer) } // Begin sets the last index to the given value. @@ -136,7 +137,9 @@ func (w *WaterMark) WaitForMark(ctx context.Context, index uint64) error { // if no watermark is emitted at index 101 then waiter would get stuck indefinitely as it // can't decide whether the task at 101 has decided not to emit watermark or it didn't get // scheduled yet. -func (w *WaterMark) process() { +func (w *WaterMark) process(closer *Closer) { + defer closer.Done() + var indices uint64Heap // pending maps raft proposal index to the number of pending mutations for this proposal. 
pending := make(map[uint64]int) @@ -200,25 +203,30 @@ func (w *WaterMark) process() { } } - for mark := range w.markCh { - if mark.waiter != nil { - doneUntil := atomic.LoadUint64(&w.doneUntil) - if doneUntil >= mark.index { - close(mark.waiter) - } else { - ws, ok := waiters[mark.index] - if !ok { - waiters[mark.index] = []chan struct{}{mark.waiter} + for { + select { + case <-closer.HasBeenClosed(): + return + case mark := <-w.markCh: + if mark.waiter != nil { + doneUntil := atomic.LoadUint64(&w.doneUntil) + if doneUntil >= mark.index { + close(mark.waiter) } else { - waiters[mark.index] = append(ws, mark.waiter) + ws, ok := waiters[mark.index] + if !ok { + waiters[mark.index] = []chan struct{}{mark.waiter} + } else { + waiters[mark.index] = append(ws, mark.waiter) + } + } + } else { + if mark.index > 0 { + processOne(mark.index, mark.done) + } + for _, index := range mark.indices { + processOne(index, mark.done) } - } - } else { - if mark.index > 0 { - processOne(mark.index, mark.done) - } - for _, index := range mark.indices { - processOne(index, mark.done) } } } diff --git a/vendor/github.com/dgraph-io/badger/y/y.go b/vendor/github.com/dgraph-io/badger/y/y.go index 958fcbe0233..607883f9e14 100644 --- a/vendor/github.com/dgraph-io/badger/y/y.go +++ b/vendor/github.com/dgraph-io/badger/y/y.go @@ -48,6 +48,9 @@ var ( // CastagnoliCrcTable is a CRC32 polynomial table CastagnoliCrcTable = crc32.MakeTable(crc32.Castagnoli) + + // Dummy channel for nil closers. + dummyCloserChan = make(chan struct{}) ) // OpenExistingFile opens an existing file, errors if it doesn't exist. @@ -203,11 +206,17 @@ func (lc *Closer) Signal() { // HasBeenClosed gets signaled when Signal() is called. func (lc *Closer) HasBeenClosed() <-chan struct{} { + if lc == nil { + return dummyCloserChan + } return lc.closed } // Done calls Done() on the WaitGroup. 
func (lc *Closer) Done() { + if lc == nil { + return + } lc.waiting.Done() } diff --git a/vendor/vendor.json b/vendor/vendor.json index 6cd3ffce0d0..27f5a159fe5 100644 --- a/vendor/vendor.json +++ b/vendor/vendor.json @@ -421,26 +421,26 @@ "revisionTime": "2016-09-07T16:21:46Z" }, { - "checksumSHA1": "z3e8O0yprygNDdmRVXdZrpy16gc=", + "checksumSHA1": "hQQMtRFYIExunOZvyHS6901UMOc=", "path": "github.com/dgraph-io/badger", - "revision": "b9e379e59eac9b828431a6c8f7a553ae38ab88eb", - "revisionTime": "2019-01-17T00:16:07Z", + "revision": "8115aed38f8f8cd248d832f51c27700efc25b201", + "revisionTime": "2019-02-26T22:53:17Z", "version": "HEAD", "versionExact": "HEAD" }, { "checksumSHA1": "oOuT7ebEiZ1ViHLKdFxKFOvobAQ=", "path": "github.com/dgraph-io/badger/options", - "revision": "e1a4906e6aedf90948306a09c6d342f0032f9e67", - "revisionTime": "2019-01-16T00:23:15Z", + "revision": "8115aed38f8f8cd248d832f51c27700efc25b201", + "revisionTime": "2019-02-26T22:53:17Z", "version": "HEAD", "versionExact": "HEAD" }, { - "checksumSHA1": "+ntGoPo1TKIFyACmsyNfKzj96FQ=", + "checksumSHA1": "yu19DxVlBi/XuPl0SRV46bkBNUI=", "path": "github.com/dgraph-io/badger/pb", - "revision": "e1a4906e6aedf90948306a09c6d342f0032f9e67", - "revisionTime": "2019-01-16T00:23:15Z", + "revision": "8115aed38f8f8cd248d832f51c27700efc25b201", + "revisionTime": "2019-02-26T22:53:17Z", "version": "HEAD", "versionExact": "HEAD" }, @@ -455,24 +455,24 @@ { "checksumSHA1": "00T6XbLV4d95J7hm6kTXDReaQHM=", "path": "github.com/dgraph-io/badger/skl", - "revision": "e1a4906e6aedf90948306a09c6d342f0032f9e67", - "revisionTime": "2019-01-16T00:23:15Z", + "revision": "8115aed38f8f8cd248d832f51c27700efc25b201", + "revisionTime": "2019-02-26T22:53:17Z", "version": "HEAD", "versionExact": "HEAD" }, { - "checksumSHA1": "bDpfa+qiA0BBX+XUg1FQxHfSu7c=", + "checksumSHA1": "KQTB2ZpV6e7tMDz4Azl/EpmHWI8=", "path": "github.com/dgraph-io/badger/table", - "revision": "e1a4906e6aedf90948306a09c6d342f0032f9e67", - "revisionTime": "2019-01-16T00:23:15Z", + "revision": "8115aed38f8f8cd248d832f51c27700efc25b201", + "revisionTime": "2019-02-26T22:53:17Z", "version": "HEAD", "versionExact": "HEAD" }, { - "checksumSHA1": "gwZdaHFoXukrRe4v9p3PruPPyOs=", + "checksumSHA1": "te3wCrE1wZpTLwCNW2ZBaNpHb7s=", "path": "github.com/dgraph-io/badger/y", - "revision": "e1a4906e6aedf90948306a09c6d342f0032f9e67", - "revisionTime": "2019-01-16T00:23:15Z", + "revision": "8115aed38f8f8cd248d832f51c27700efc25b201", + "revisionTime": "2019-02-26T22:53:17Z", "version": "HEAD", "versionExact": "HEAD" }, @@ -983,14 +983,14 @@ { "checksumSHA1": "UxahDzW2v4mf/+aFxruuupaoIwo=", "path": "golang.org/x/net/internal/timeseries", - "revision": "915654e7eabcea33ae277abbecf52f0d8b7a9fdc", - "revisionTime": "2019-01-10T19:11:31Z" + "revision": "afe646ca25a4a688bba916cc14caef4b53c0b856", + "revisionTime": "2019-02-18T11:34:22Z" }, { "checksumSHA1": "HvmG9LfStMLF+hIC7xR4SxegMis=", "path": "golang.org/x/net/trace", - "revision": "915654e7eabcea33ae277abbecf52f0d8b7a9fdc", - "revisionTime": "2019-01-10T19:11:31Z" + "revision": "afe646ca25a4a688bba916cc14caef4b53c0b856", + "revisionTime": "2019-02-18T11:34:22Z" }, { "checksumSHA1": "REkmyB368pIiip76LiqMLspgCRk=", diff --git a/worker/config.go b/worker/config.go deleted file mode 100644 index e95877282e7..00000000000 --- a/worker/config.go +++ /dev/null @@ -1,40 +0,0 @@ -/* - * Copyright 2017-2018 Dgraph Labs, Inc. 
and Contributors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package worker - -import "net" - -type IPRange struct { - Lower, Upper net.IP -} - -type Options struct { - ExportPath string - NumPendingProposals int - // TODO: Get rid of this here. - Tracing float64 - MyAddr string - ZeroAddr string - RaftId uint64 - ExpandEdge bool - WhiteListedIPRanges []IPRange - MaxRetries int - StrictMutations bool - AclEnabled bool -} - -var Config Options diff --git a/worker/draft.go b/worker/draft.go index 426a8ece087..972979cdf11 100644 --- a/worker/draft.go +++ b/worker/draft.go @@ -170,7 +170,24 @@ func (n *node) applyMutations(ctx context.Context, proposal *pb.Proposal) (rerr // Ensures nothing get written to disk due to commit proposals. posting.Oracle().ResetTxns() schema.State().DeleteAll() - return posting.DeleteAll() + + if err := posting.DeleteAll(); err != nil { + return err + } + + if groups().groupId() == 1 { + initialSchema := schema.InitialSchema() + for _, s := range initialSchema { + if err := updateSchema(s.Predicate, *s); err != nil { + return err + } + + if !groups().ServesTablet(s.Predicate) { + return fmt.Errorf("Group 1 should always serve reserved predicate %s", + s.Predicate) + } + } + } } if proposal.Mutations.StartTs == 0 { @@ -484,7 +501,7 @@ func (n *node) commitOrAbort(pkey string, delta *pb.OracleDelta) error { if txn == nil { return } - err := x.RetryUntilSuccess(Config.MaxRetries, 10*time.Millisecond, func() error { + err := x.RetryUntilSuccess(x.WorkerConfig.MaxRetries, 10*time.Millisecond, func() error { return txn.CommitToDisk(writer, commit) }) @@ -628,7 +645,7 @@ func (n *node) Run() { <-n.closer.HasBeenClosed() glog.Infof("Stopping node.Run") if peerId, has := groups().MyPeer(); has && n.AmLeader() { - n.Raft().TransferLeadership(n.ctx, Config.RaftId, peerId) + n.Raft().TransferLeadership(n.ctx, x.WorkerConfig.RaftId, peerId) time.Sleep(time.Second) // Let transfer happen. } n.Raft().Stop() diff --git a/worker/export.go b/worker/export.go index 5468b7b095e..357cce05d0c 100644 --- a/worker/export.go +++ b/worker/export.go @@ -251,7 +251,7 @@ func export(ctx context.Context, in *pb.ExportRequest) error { glog.Infof("Running export for group %d at timestamp %d.", in.GroupId, in.ReadTs) uts := time.Unix(in.UnixTs, 0) - bdir := path.Join(Config.ExportPath, fmt.Sprintf( + bdir := path.Join(x.WorkerConfig.ExportPath, fmt.Sprintf( "dgraph.r%d.u%s", in.ReadTs, uts.UTC().Format("0102.1504"))) if err := os.MkdirAll(bdir, 0700); err != nil { diff --git a/worker/export_test.go b/worker/export_test.go index 60fbaf74fdd..599148f2ecb 100644 --- a/worker/export_test.go +++ b/worker/export_test.go @@ -111,7 +111,7 @@ func TestExport(t *testing.T) { time.Sleep(1 * time.Second) // We have 4 friend type edges. FP("friends")%10 = 2. - Config.ExportPath = bdir + x.WorkerConfig.ExportPath = bdir readTs := timestamp() // Do the following so export won't block forever for readTs. 
posting.Oracle().ProcessDelta(&pb.OracleDelta{MaxAssigned: readTs}) diff --git a/worker/groups.go b/worker/groups.go index 59f6e9e7f25..982ea44af42 100644 --- a/worker/groups.go +++ b/worker/groups.go @@ -72,32 +72,32 @@ func StartRaftNodes(walStore *badger.DB, bindall bool) { } gr.ctx, gr.cancel = context.WithCancel(context.Background()) - if len(Config.MyAddr) == 0 { - Config.MyAddr = fmt.Sprintf("localhost:%d", workerPort()) + if len(x.WorkerConfig.MyAddr) == 0 { + x.WorkerConfig.MyAddr = fmt.Sprintf("localhost:%d", workerPort()) } else { // check if address is valid or not - ok := x.ValidateAddress(Config.MyAddr) - x.AssertTruef(ok, "%s is not valid address", Config.MyAddr) + ok := x.ValidateAddress(x.WorkerConfig.MyAddr) + x.AssertTruef(ok, "%s is not valid address", x.WorkerConfig.MyAddr) if !bindall { glog.Errorln("--my flag is provided without bindall, Did you forget to specify bindall?") } } - x.AssertTruef(len(Config.ZeroAddr) > 0, "Providing dgraphzero address is mandatory.") - x.AssertTruef(Config.ZeroAddr != Config.MyAddr, + x.AssertTruef(len(x.WorkerConfig.ZeroAddr) > 0, "Providing dgraphzero address is mandatory.") + x.AssertTruef(x.WorkerConfig.ZeroAddr != x.WorkerConfig.MyAddr, "Dgraph Zero address and Dgraph address (IP:Port) can't be the same.") - if Config.RaftId == 0 { + if x.WorkerConfig.RaftId == 0 { id, err := raftwal.RaftId(walStore) x.Check(err) - Config.RaftId = id + x.WorkerConfig.RaftId = id } - glog.Infof("Current Raft Id: %#x\n", Config.RaftId) + glog.Infof("Current Raft Id: %#x\n", x.WorkerConfig.RaftId) // Successfully connect with dgraphzero, before doing anything else. // Connect with Zero leader and figure out what group we should belong to. - m := &pb.Member{Id: Config.RaftId, Addr: Config.MyAddr} + m := &pb.Member{Id: x.WorkerConfig.RaftId, Addr: x.WorkerConfig.MyAddr} var connState *pb.ConnectionState var err error for { // Keep on retrying. See: https://github.com/dgraph-io/dgraph/issues/2289 @@ -116,8 +116,8 @@ func StartRaftNodes(walStore *badger.DB, bindall bool) { x.Fatalf("Unable to join cluster via dgraphzero") } glog.Infof("Connected to group zero. Assigned group: %+v\n", connState.GetMember().GetGroupId()) - Config.RaftId = connState.GetMember().GetId() - glog.Infof("Raft Id after connection to Zero: %#x\n", Config.RaftId) + x.WorkerConfig.RaftId = connState.GetMember().GetId() + glog.Infof("Raft Id after connection to Zero: %#x\n", x.WorkerConfig.RaftId) // This timestamp would be used for reading during snapshot after bulk load. // The stream is async, we need this information before we start or else replica might @@ -128,8 +128,8 @@ func StartRaftNodes(walStore *badger.DB, bindall bool) { gr.triggerCh = make(chan struct{}, 1) // Initialize DiskStorage and pass it along. 
- store := raftwal.Init(walStore, Config.RaftId, gid) - gr.Node = newNode(store, gid, Config.RaftId, Config.MyAddr) + store := raftwal.Init(walStore, x.WorkerConfig.RaftId, gid) + gr.Node = newNode(store, gid, x.WorkerConfig.RaftId, x.WorkerConfig.MyAddr) x.Checkf(schema.LoadFromDb(), "Error while initializing schema") raftServer.Node = gr.Node.Node @@ -170,47 +170,9 @@ func (g *groupi) informZeroAboutTablets() { } func (g *groupi) proposeInitialSchema() { - // propose the schema for _predicate_ - if Config.ExpandEdge { - g.upsertSchema(&pb.SchemaUpdate{ - Predicate: x.PredicateListAttr, - ValueType: pb.Posting_STRING, - List: true, - }) - } - - g.upsertSchema(&pb.SchemaUpdate{ - Predicate: "type", - ValueType: pb.Posting_STRING, - Directive: pb.SchemaUpdate_INDEX, - Tokenizer: []string{"exact"}, - }) - - if Config.AclEnabled { - // propose the schema update for acl predicates - g.upsertSchema(&pb.SchemaUpdate{ - Predicate: "dgraph.xid", - ValueType: pb.Posting_STRING, - Directive: pb.SchemaUpdate_INDEX, - Upsert: true, - Tokenizer: []string{"exact"}, - }) - - g.upsertSchema(&pb.SchemaUpdate{ - Predicate: "dgraph.password", - ValueType: pb.Posting_PASSWORD, - }) - - g.upsertSchema(&pb.SchemaUpdate{ - Predicate: "dgraph.user.group", - Directive: pb.SchemaUpdate_REVERSE, - ValueType: pb.Posting_UID, - List: true, - }) - g.upsertSchema(&pb.SchemaUpdate{ - Predicate: "dgraph.group.acl", - ValueType: pb.Posting_STRING, - }) + initialSchema := schema.InitialSchema() + for _, s := range initialSchema { + g.upsertSchema(s) } } @@ -289,11 +251,11 @@ func (g *groupi) applyState(state *pb.MembershipState) { g.tablets = make(map[string]*pb.Tablet) for gid, group := range g.state.Groups { for _, member := range group.Members { - if Config.RaftId == member.Id { + if x.WorkerConfig.RaftId == member.Id { foundSelf = true atomic.StoreUint32(&g.gid, gid) } - if Config.MyAddr != member.Addr { + if x.WorkerConfig.MyAddr != member.Addr { conn.Get().Connect(member.Addr) } } @@ -305,7 +267,7 @@ func (g *groupi) applyState(state *pb.MembershipState) { } } for _, member := range g.state.Zeros { - if Config.MyAddr != member.Addr { + if x.WorkerConfig.MyAddr != member.Addr { conn.Get().Connect(member.Addr) } } @@ -554,7 +516,7 @@ func (g *groupi) connToZeroLeader() *conn.Pool { } pl := g.AnyServer(0) if pl == nil { - pl = conn.Get().Connect(Config.ZeroAddr) + pl = conn.Get().Connect(x.WorkerConfig.ZeroAddr) } if pl == nil { glog.V(1).Infof("No healthy Zero server found. Retrying...") @@ -572,9 +534,9 @@ func (g *groupi) connToZeroLeader() *conn.Pool { func (g *groupi) doSendMembership(tablets map[string]*pb.Tablet) error { leader := g.Node.AmLeader() member := &pb.Member{ - Id: Config.RaftId, + Id: x.WorkerConfig.RaftId, GroupId: g.groupId(), - Addr: Config.MyAddr, + Addr: x.WorkerConfig.MyAddr, Leader: leader, LastUpdate: uint64(time.Now().Unix()), } diff --git a/worker/mutation.go b/worker/mutation.go index 2900fa33fb4..fc55cb75262 100644 --- a/worker/mutation.go +++ b/worker/mutation.go @@ -146,7 +146,7 @@ func runSchemaMutationHelper(ctx context.Context, update *pb.SchemaUpdate, start return rebuild.Run(ctx) } -// We commit schema to disk in blocking way, should be ok because this happens +// updateSchema commits the schema to disk in blocking way, should be ok because this happens // only during schema mutations or we see a new predicate. 
func updateSchema(attr string, s pb.SchemaUpdate) error { schema.State().Set(attr, s) @@ -507,7 +507,7 @@ func CommitOverNetwork(ctx context.Context, tc *api.TxnContext) (uint64, error) func (w *grpcWorker) proposeAndWait(ctx context.Context, txnCtx *api.TxnContext, m *pb.Mutations) error { - if Config.StrictMutations { + if x.WorkerConfig.StrictMutations { for _, edge := range m.Edges { if _, err := schema.State().TypeOf(edge.Attr); err != nil { return err diff --git a/worker/worker.go b/worker/worker.go index 55b2d525f5e..eb34533d83f 100644 --- a/worker/worker.go +++ b/worker/worker.go @@ -53,7 +53,7 @@ func workerPort() int { func Init(ps *badger.DB) { pstore = ps // needs to be initialized after group config - pendingProposals = make(chan struct{}, Config.NumPendingProposals) + pendingProposals = make(chan struct{}, x.WorkerConfig.NumPendingProposals) workerServer = grpc.NewServer( grpc.MaxRecvMsgSize(x.GrpcMaxSize), grpc.MaxSendMsgSize(x.GrpcMaxSize), diff --git a/x/config.go b/x/config.go index b00d964207d..80ad3e48260 100644 --- a/x/config.go +++ b/x/config.go @@ -16,6 +16,8 @@ package x +import "net" + type Options struct { DebugMode bool PortOffset int @@ -23,3 +25,24 @@ type Options struct { } var Config Options + +type IPRange struct { + Lower, Upper net.IP +} + +type WorkerOptions struct { + ExportPath string + NumPendingProposals int + // TODO: Get rid of this here. + Tracing float64 + MyAddr string + ZeroAddr string + RaftId uint64 + ExpandEdge bool + WhiteListedIPRanges []IPRange + MaxRetries int + StrictMutations bool + AclEnabled bool +} + +var WorkerConfig WorkerOptions diff --git a/x/keys.go b/x/keys.go index 7599bd72649..c6018ae3d36 100644 --- a/x/keys.go +++ b/x/keys.go @@ -311,14 +311,3 @@ func IsReservedPredicate(pred string) bool { _, ok := m[strings.ToLower(pred)] return ok } - -func IsAclPredicate(pred string) bool { - var m = map[string]struct{}{ - "dgraph.xid": {}, - "dgraph.password": {}, - "dgraph.user.group": {}, - "dgraph.group.acl": {}, - } - _, ok := m[pred] - return ok -}
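
Note (illustrative only, not part of the patch): the logger.go changes above remove Badger's global SetLogger/UseDefaultLogger helpers, so logging now flows through the Logger value carried on Options, and the Logger interface additionally requires a Debugf method. A minimal caller-side sketch under those assumptions follows; the stderrLogger type and the /tmp paths are invented for illustration, and badger.DefaultOptions / badger.Open are assumed to keep their existing v1 signatures (only the Logger field and the four-method interface are taken directly from the diff).

    package main

    import (
        "log"
        "os"

        "github.com/dgraph-io/badger"
    )

    // stderrLogger satisfies the four-method badger.Logger interface,
    // including the Debugf method introduced in this patch.
    type stderrLogger struct{ *log.Logger }

    func (l *stderrLogger) Errorf(f string, v ...interface{})   { l.Printf("ERROR: "+f, v...) }
    func (l *stderrLogger) Warningf(f string, v ...interface{}) { l.Printf("WARNING: "+f, v...) }
    func (l *stderrLogger) Infof(f string, v ...interface{})    { l.Printf("INFO: "+f, v...) }
    func (l *stderrLogger) Debugf(f string, v ...interface{})   { l.Printf("DEBUG: "+f, v...) }

    func main() {
        // Per-instance logger instead of the removed global one.
        opts := badger.DefaultOptions
        opts.Dir = "/tmp/badger"
        opts.ValueDir = "/tmp/badger"
        opts.Logger = &stderrLogger{log.New(os.Stderr, "badger ", log.LstdFlags)}

        db, err := badger.Open(opts)
        if err != nil {
            log.Fatal(err)
        }
        defer db.Close()
    }

Dropping the global logger is also why the compaction and watermark code in this patch now logs via s.kv.opt.Infof / Debugf / Warningf: each DB instance routes its own messages through whatever Logger its Options carry.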