Skip to content

Commit

Permalink
Introduce DropPrefix API into Dgraph (#3060)
Browse files Browse the repository at this point in the history
* Introduce Badger's new DropPrefix API into Dgraph
* Simplify DeleteAll logic. DeleteAll logic now just calls DropAll. The logic in draft.go takes care of readding the reserved predicates.
  • Loading branch information
martinmr authored Feb 27, 2019
1 parent adfc7c5 commit ddb0d76
Show file tree
Hide file tree
Showing 40 changed files with 985 additions and 553 deletions.
2 changes: 1 addition & 1 deletion conn/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ func NewNode(rc *pb.RaftContext, store *raftwal.DiskStorage) *Node {
peers: make(map[uint64]string),
requestCh: make(chan linReadReq),
}
n.Applied.Init()
n.Applied.Init(nil)
// This should match up to the Applied index set above.
n.Applied.SetDoneUntil(n.Cfg.Applied)
glog.Infof("Setting raft.Config to: %+v\n", n.Cfg)
Expand Down
2 changes: 1 addition & 1 deletion dgraph/cmd/alpha/admin.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ func ipInIPWhitelistRanges(ipString string) bool {
return false
}

for _, ipRange := range worker.Config.WhiteListedIPRanges {
for _, ipRange := range x.WorkerConfig.WhiteListedIPRanges {
if bytes.Compare(ip, ipRange.Lower) >= 0 && bytes.Compare(ip, ipRange.Upper) <= 0 {
return true
}
Expand Down
18 changes: 9 additions & 9 deletions dgraph/cmd/alpha/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -182,12 +182,12 @@ func setupCustomTokenizers() {
// and returns a slice of []IPRange.
//
// e.g. "144.142.126.222:144.142.126.244,144.142.126.254,192.168.0.0/16,host.docker.internal"
func getIPsFromString(str string) ([]worker.IPRange, error) {
func getIPsFromString(str string) ([]x.IPRange, error) {
if str == "" {
return []worker.IPRange{}, nil
return []x.IPRange{}, nil
}

var ipRanges []worker.IPRange
var ipRanges []x.IPRange
rangeStrings := strings.Split(str, ",")

for _, s := range rangeStrings {
Expand All @@ -201,15 +201,15 @@ func getIPsFromString(str string) ([]worker.IPRange, error) {
// or IPv6 address like fd03:b188:0f3c:9ec4::babe:face
ipAddr := net.ParseIP(s)
if ipAddr != nil {
ipRanges = append(ipRanges, worker.IPRange{Lower: ipAddr, Upper: ipAddr})
ipRanges = append(ipRanges, x.IPRange{Lower: ipAddr, Upper: ipAddr})
} else {
ipAddrs, err := net.LookupIP(s)
if err != nil {
return nil, fmt.Errorf("invalid IP address or hostname: %s", s)
}

for _, addr := range ipAddrs {
ipRanges = append(ipRanges, worker.IPRange{Lower: addr, Upper: addr})
ipRanges = append(ipRanges, x.IPRange{Lower: addr, Upper: addr})
}
}
} else {
Expand All @@ -226,7 +226,7 @@ func getIPsFromString(str string) ([]worker.IPRange, error) {
rangeHi[addrLen-i] |= ^network.Mask[maskLen-i]
}

ipRanges = append(ipRanges, worker.IPRange{Lower: rangeLo, Upper: rangeHi})
ipRanges = append(ipRanges, x.IPRange{Lower: rangeLo, Upper: rangeHi})
}
case len(tuple) == 2:
// string is range like a.b.c.d:w.x.y.z
Expand All @@ -239,7 +239,7 @@ func getIPsFromString(str string) ([]worker.IPRange, error) {
} else if bytes.Compare(rangeLo, rangeHi) > 0 {
return nil, fmt.Errorf("inverted IP address range: %s", s)
} else {
ipRanges = append(ipRanges, worker.IPRange{Lower: rangeLo, Upper: rangeHi})
ipRanges = append(ipRanges, x.IPRange{Lower: rangeLo, Upper: rangeHi})
}
default:
return nil, fmt.Errorf("invalid IP address range: %s", s)
Expand Down Expand Up @@ -482,7 +482,7 @@ func run() {

ips, err := getIPsFromString(Alpha.Conf.GetString("whitelist"))
x.Check(err)
worker.Config = worker.Options{
x.WorkerConfig = x.WorkerOptions{
ExportPath: Alpha.Conf.GetString("export"),
NumPendingProposals: Alpha.Conf.GetInt("pending_proposals"),
Tracing: Alpha.Conf.GetFloat64("trace"),
Expand Down Expand Up @@ -519,7 +519,7 @@ func run() {
}
}
otrace.ApplyConfig(otrace.Config{
DefaultSampler: otrace.ProbabilitySampler(worker.Config.Tracing)})
DefaultSampler: otrace.ProbabilitySampler(x.WorkerConfig.Tracing)})

// Posting will initialize index which requires schema. Hence, initialize
// schema before calling posting.Init().
Expand Down
4 changes: 2 additions & 2 deletions dgraph/cmd/alpha/run_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ import (
"github.com/dgraph-io/dgraph/protos/pb"
"github.com/dgraph-io/dgraph/query"
"github.com/dgraph-io/dgraph/schema"
"github.com/dgraph-io/dgraph/worker"
"github.com/dgraph-io/dgraph/x"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

Expand Down Expand Up @@ -1674,7 +1674,7 @@ func TestTypeMutationAndQuery(t *testing.T) {
}

func TestIPStringParsing(t *testing.T) {
var addrRange []worker.IPRange
var addrRange []x.IPRange
var err error

addrRange, err = getIPsFromString("144.142.126.222:144.142.126.244")
Expand Down
2 changes: 1 addition & 1 deletion dgraph/cmd/zero/oracle.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ func (o *Oracle) Init() {
o.keyCommit = make(map[string]uint64)
o.subscribers = make(map[int]chan *pb.OracleDelta)
o.updates = make(chan *pb.OracleDelta, 100000) // Keeping 1 second worth of updates.
o.doneUntil.Init()
o.doneUntil.Init(nil)
go o.sendDeltasToSubscribers()
}

Expand Down
11 changes: 7 additions & 4 deletions dgraph/cmd/zero/zero.go
Original file line number Diff line number Diff line change
Expand Up @@ -553,10 +553,13 @@ func (s *Server) ShouldServe(
var proposal pb.ZeroProposal
// Multiple Groups might be assigned to same tablet, so during proposal we will check again.
tablet.Force = false
if x.IsAclPredicate(tablet.Predicate) {
// force all the acl predicates to be allocated to group 1
// this is to make it eaiser to stream ACL updates to all alpha servers
// since they only need to open one pipeline to receive updates for all ACL predicates
if x.IsReservedPredicate(tablet.Predicate) {
// Force all the reserved predicates to be allocated to group 1.
// This is to make it eaiser to stream ACL updates to all alpha servers
// since they only need to open one pipeline to receive updates for all
// ACL predicates.
// This will also make it easier to restore the reserved predicates after
// a DropAll operation.
tablet.GroupId = 1
}
proposal.Tablet = tablet
Expand Down
7 changes: 3 additions & 4 deletions edgraph/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ import (
"time"

"github.com/dgraph-io/dgraph/posting"
"github.com/dgraph-io/dgraph/worker"
"github.com/dgraph-io/dgraph/x"
)

Expand Down Expand Up @@ -89,9 +88,9 @@ func setConfVar(conf Options) {
x.Conf.Set("allotted_memory", newFloat(conf.AllottedMemory))

// Set some vars from worker.Config.
x.Conf.Set("tracing", newFloat(worker.Config.Tracing))
x.Conf.Set("num_pending_proposals", newInt(worker.Config.NumPendingProposals))
x.Conf.Set("expand_edge", newIntFromBool(worker.Config.ExpandEdge))
x.Conf.Set("tracing", newFloat(x.WorkerConfig.Tracing))
x.Conf.Set("num_pending_proposals", newInt(x.WorkerConfig.NumPendingProposals))
x.Conf.Set("expand_edge", newIntFromBool(x.WorkerConfig.ExpandEdge))
}

func SetConfiguration(newConfig Options) {
Expand Down
65 changes: 7 additions & 58 deletions posting/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -445,20 +445,6 @@ func deleteEntries(prefix []byte, remove func(key []byte) bool) error {
})
}

func deleteAllEntries(prefix []byte) error {
return deleteEntries(prefix, func(key []byte) bool {
return true
})
}

// deleteAllTokens deletes the index for the given attribute. All tokenizers are
// used by this function.
func deleteAllTokens(attr string) error {
pk := x.ParsedKey{Attr: attr}
prefix := pk.IndexPrefix()
return deleteAllEntries(prefix)
}

// deleteTokensFor deletes the index for the given attribute and token.
func deleteTokensFor(attr, tokenizerName string) error {
pk := x.ParsedKey{Attr: attr}
Expand All @@ -469,22 +455,21 @@ func deleteTokensFor(attr, tokenizerName string) error {
}
prefix = append(prefix, tokenizer.Identifier())

return deleteAllEntries(prefix)
return pstore.DropPrefix(prefix)
}

func deleteReverseEdges(attr string) error {
pk := x.ParsedKey{Attr: attr}
prefix := pk.ReversePrefix()
return deleteAllEntries(prefix)
return pstore.DropPrefix(prefix)
}

func deleteCountIndex(attr string) error {
pk := x.ParsedKey{Attr: attr}
if err := deleteAllEntries(pk.CountPrefix(false)); err != nil {
if err := pstore.DropPrefix(pk.CountPrefix(false)); err != nil {
return err
}

return deleteAllEntries(pk.CountPrefix(true))
return pstore.DropPrefix(pk.CountPrefix(true))
}

// Index rebuilding logic here.
Expand Down Expand Up @@ -983,52 +968,16 @@ func rebuildListType(ctx context.Context, rb *IndexRebuild) error {

// DeleteAll deletes all entries in the posting list.
func DeleteAll() error {
return deleteEntries(nil, func(key []byte) bool {
pk := x.Parse(key)
if pk == nil {
return true
} else if pk.IsSchema() {
// Don't delete schema for _predicate_
return !x.IsReservedPredicate(pk.Attr)
}
return true
})
return pstore.DropAll()
}

// DeletePredicate deletes all entries and indices for a given predicate.
func DeletePredicate(ctx context.Context, attr string) error {
glog.Infof("Dropping predicate: [%s]", attr)
pk := x.ParsedKey{
Attr: attr,
}
prefix := pk.DataPrefix()
// Delete all data postings for the given predicate.
err := deleteEntries(prefix, func(key []byte) bool {
return true
})
if err != nil {
prefix := x.PredicatePrefix(attr)
if err := pstore.DropPrefix(prefix); err != nil {
return err
}

// TODO - We will still have the predicate present in <uid, _predicate_> posting lists.
indexed := schema.State().IsIndexed(attr)
reversed := schema.State().IsReversed(attr)
if indexed {
if err := deleteAllTokens(attr); err != nil {
return err
}
} else if reversed {
if err := deleteReverseEdges(attr); err != nil {
return err
}
}

hasCountIndex := schema.State().HasCount(attr)
if hasCountIndex {
if err := deleteCountIndex(attr); err != nil {
return err
}
}

return schema.State().Delete(attr)
}
9 changes: 9 additions & 0 deletions query/common_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,15 @@ func setSchema(schema string) {
}
}

func dropPredicate(pred string) {
err := client.Alter(context.Background(), &api.Operation{
DropAttr: pred,
})
if err != nil {
panic(fmt.Sprintf("Could not drop predicate. Got error %v", err.Error()))
}
}

func processQuery(t *testing.T, ctx context.Context, query string) (string, error) {
txn := client.NewTxn()
defer txn.Discard(ctx)
Expand Down
4 changes: 2 additions & 2 deletions query/mutation.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,15 +34,15 @@ import (
)

func ApplyMutations(ctx context.Context, m *pb.Mutations) (*api.TxnContext, error) {
if worker.Config.ExpandEdge {
if x.WorkerConfig.ExpandEdge {
edges, err := expandEdges(ctx, m)
if err != nil {
return nil, x.Wrapf(err, "While adding pb.edges")
}
m.Edges = edges
} else {
for _, mu := range m.Edges {
if mu.Attr == x.Star && !worker.Config.ExpandEdge {
if mu.Attr == x.Star && !x.WorkerConfig.ExpandEdge {
return nil, x.Errorf("Expand edge (--expand_edge) is set to false." +
" Cannot perform S * * deletion.")
}
Expand Down
4 changes: 2 additions & 2 deletions query/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -1852,7 +1852,7 @@ func expandSubgraph(ctx context.Context, sg *SubGraph) ([]*SubGraph, error) {
for i := 0; i < len(sg.Children); i++ {
child := sg.Children[i]

if !worker.Config.ExpandEdge && child.Attr == "_predicate_" {
if !x.WorkerConfig.ExpandEdge && child.Attr == "_predicate_" {
return out,
x.Errorf("Cannot ask for _predicate_ when ExpandEdge(--expand_edge) is false.")
}
Expand All @@ -1862,7 +1862,7 @@ func expandSubgraph(ctx context.Context, sg *SubGraph) ([]*SubGraph, error) {
continue
}

if !worker.Config.ExpandEdge {
if !x.WorkerConfig.ExpandEdge {
return out,
x.Errorf("Cannot run expand() query when ExpandEdge(--expand_edge) is false.")
}
Expand Down
Loading

0 comments on commit ddb0d76

Please sign in to comment.