Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Force move a predicate aborting pending transactions. #2215

Closed
wants to merge 4 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion worker/mutation.go
Original file line number Diff line number Diff line change
Expand Up @@ -614,7 +614,11 @@ func tryAbortTransactions(startTimestamps []uint64) {
for i, startTs := range startTimestamps {
tctx := &api.TxnContext{StartTs: startTs, CommitTs: commitTimestamps[i]}
_, err := commitOrAbort(context.Background(), tctx)
for err != nil {
// Transaction could already have been aborted in which case it would be deleted from the
// transactions map and we should just continue.
// TODO - Make sure all other errors are transient, we don't want to be stuck in an infinite
// loop.
for err != nil && err != posting.ErrInvalidTxn {
// This will fail only due to badger error.
_, err = commitOrAbort(context.Background(), tctx)
}
Expand Down
4 changes: 1 addition & 3 deletions worker/predicate_move.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@ import (
"github.com/dgraph-io/dgraph/protos/intern"
"github.com/dgraph-io/dgraph/schema"
"github.com/dgraph-io/dgraph/x"
"github.com/dgraph-io/dgraph/y"
)

var (
Expand Down Expand Up @@ -288,8 +287,7 @@ func (w *grpcWorker) MovePredicate(ctx context.Context,
return pk.Attr == in.Predicate
})
if len(tctxs) > 0 {
go tryAbortTransactions(tctxs)
return &emptyPayload, y.ErrConflict
tryAbortTransactions(tctxs)
}
// We iterate over badger, so need to flush and wait for sync watermark to catch up.
n.applyAllMarks(ctx)
Expand Down
18 changes: 11 additions & 7 deletions worker/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,14 @@ func (s *scheduler) waitForConflictResolution(attr string) {
tryAbortTransactions(tctxs)
}

func updateTxnMarks(raftIndex uint64, startTs uint64) *posting.Txn {
txn := &posting.Txn{
StartTs: startTs,
Indices: []uint64{raftIndex},
}
return posting.Txns().PutOrMergeIndex(txn)
}

// We don't support schema mutations across nodes in a transaction.
// Wait for all transactions to either abort or complete and all write transactions
// involving the predicate are aborted until schema mutations are done.
Expand Down Expand Up @@ -174,8 +182,8 @@ func (s *scheduler) schedule(proposal *intern.Proposal, index uint64) (err error
schemaMap := make(map[string]types.TypeID)
for _, edge := range proposal.Mutations.Edges {
if tablet := groups().Tablet(edge.Attr); tablet != nil && tablet.ReadOnly {
err = errPredicateMoving
return
updateTxnMarks(index, proposal.Mutations.StartTs)
return errPredicateMoving
}
if edge.Entity == 0 && bytes.Equal(edge.Value, []byte(x.Star)) {
// We should only have one edge drop in one mutation call.
Expand Down Expand Up @@ -216,11 +224,7 @@ func (s *scheduler) schedule(proposal *intern.Proposal, index uint64) (err error

m := proposal.Mutations
pctx := s.n.props.pctx(proposal.Id)
txn := &posting.Txn{
StartTs: m.StartTs,
Indices: []uint64{index},
}
pctx.txn = posting.Txns().PutOrMergeIndex(txn)
pctx.txn = updateTxnMarks(index, m.StartTs)
for _, edge := range m.Edges {
t := &task{
rid: index,
Expand Down