Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Push Txns & Resolve Intents on Conflicts #72

Merged
merged 3 commits into from
Sep 21, 2014
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 9 additions & 2 deletions proto/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,8 +55,7 @@ func (rh *ResponseHeader) Verify(req Request) error {
return nil
}

// GoError converts the Error field of the response header to a
// GenericError.
// GoError returns the non-nil error from the proto.Error union.
func (rh *ResponseHeader) GoError() error {
if rh.Error == nil {
return nil
Expand All @@ -74,6 +73,10 @@ func (rh *ResponseHeader) GoError() error {
return rh.Error.TransactionStatus
case rh.Error.TransactionRetry != nil:
return rh.Error.TransactionRetry
case rh.Error.WriteIntent != nil:
return rh.Error.WriteIntent
case rh.Error.WriteTooOld != nil:
return rh.Error.WriteTooOld
default:
return nil
}
Expand All @@ -97,6 +100,10 @@ func (rh *ResponseHeader) SetGoError(err error) {
rh.Error = &Error{TransactionStatus: t}
case *TransactionRetryError:
rh.Error = &Error{TransactionRetry: t}
case *WriteIntentError:
rh.Error = &Error{WriteIntent: t}
case *WriteTooOldError:
rh.Error = &Error{WriteTooOld: t}
default:
var canRetry bool
if r, ok := err.(util.Retryable); ok {
Expand Down
18 changes: 15 additions & 3 deletions proto/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,9 +63,8 @@ func NewRangeKeyMismatchError(start, end []byte, meta *RangeMetadata) *RangeKeyM
// Error formats error.
func (e *RangeKeyMismatchError) Error() string {
if e.Range != nil {
return fmt.Sprintf("key range %q-%q outside of bounds of range %q: %q-%q",
string(e.RequestStartKey), string(e.RequestEndKey),
string(e.Range.RangeID),
return fmt.Sprintf("key range %q-%q outside of bounds of range %d: %q-%q",
string(e.RequestStartKey), string(e.RequestEndKey), e.Range.RangeID,
string(e.Range.StartKey), string(e.Range.EndKey))
}
return fmt.Sprintf("key range %q-%q could not be located within a range on store",
Expand Down Expand Up @@ -104,3 +103,16 @@ func (e *TransactionRetryError) Error() string {
func (e *TransactionRetryError) CanRetry() bool {
return true
}

// Error formats error.
func (e *WriteIntentError) Error() string {
return fmt.Sprintf("conflicting write intent at key %q from transaction %+v", e.Key, e.Txn)
}

// Error formats error.
func (e *WriteTooOldError) Error() string {
if e.Txn != nil {
return fmt.Sprintf("cannot write with a timestamp older than %+v, or older txn epoch: %+v", e.Timestamp, e.Txn)
}
return fmt.Sprintf("cannot write with a timestamp older than %+v", e.Timestamp)
}
34 changes: 29 additions & 5 deletions proto/errors.proto
Original file line number Diff line number Diff line change
Expand Up @@ -34,21 +34,21 @@ message NotLeaderError {
optional Replica leader = 1 [(gogoproto.nullable) = false];
}

// RangeNotFoundError indicates that a command was sent to a range
// A RangeNotFoundError indicates that a command was sent to a range
// which is not hosted on this store.
message RangeNotFoundError {
optional int64 range_id = 1 [(gogoproto.nullable) = false, (gogoproto.customname) = "RangeID"];
}

// RangeKeyMismatchError indicates that a command was sent to a range which did
// not contain the key(s) specified by the command.
// A RangeKeyMismatchError indicates that a command was sent to a
// range which did not contain the key(s) specified by the command.
message RangeKeyMismatchError {
optional bytes request_start_key = 1 [(gogoproto.nullable) = false];
optional bytes request_end_key = 2 [(gogoproto.nullable) = false];
optional RangeMetadata range = 3;
}

// TransactionStatusError indicates that the transaction status is
// A TransactionStatusError indicates that the transaction status is
// incompatible with the requested operation. This means the
// transaction has already either been committed or aborted. It might
// also be the case that the request to modify the transaction failed
Expand All @@ -59,13 +59,34 @@ message TransactionStatusError {
optional string msg = 2 [(gogoproto.nullable) = false];
}

// TransactionRetryError indicates that the transaction must be
// A TransactionRetryError indicates that the transaction must be
// retried, usually with an increased transaction timestamp. The
// transaction struct to use is returned with the error.
message TransactionRetryError {
optional Transaction txn = 1 [(gogoproto.nullable) = false];
}

// A WriteIntentError indicates that a write intent belonging to
// another transaction was encountered leading to a read/write or
// write/write conflict. The Key at which the intent was encountered
// is set, as is the Txn record for the intent's transaction.
// Resolved is set if the intent was successfully resolved, meaning
// the client may retry the operation immediately. If Resolved is
// false, the client should back off and retry.
message WriteIntentError {
optional bytes key = 1 [(gogoproto.nullable) = false];
optional Transaction txn = 2 [(gogoproto.nullable) = false];
optional bool resolved = 3 [(gogoproto.nullable) = false];
}

// A WriteTooOldError indicates that a write encountered a versioned
// value newer than its timestamp, making it impossible to rewrite
// history. The write should be retried at the Timestamp + 1.
message WriteTooOldError {
optional Timestamp timestamp = 1 [(gogoproto.nullable) = false];
optional Transaction txn = 2;
}

// Error is a union type containing all available errors.
// NOTE: new error types must be added here.
message Error {
Expand All @@ -75,4 +96,7 @@ message Error {
optional RangeKeyMismatchError range_key_mismatch = 4;
optional TransactionStatusError transaction_status = 5;
optional TransactionRetryError transaction_retry = 6;
optional WriteIntentError write_intent = 7;
optional WriteTooOldError write_too_old = 8;
}

44 changes: 11 additions & 33 deletions storage/engine/mvcc.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,27 +41,6 @@ type MVCC struct {
engine Engine // The underlying key-value store
}

// writeIntentError is a trivial implementation of error.
type writeIntentError struct {
Txn *proto.Transaction
}

type writeTooOldError struct {
Timestamp proto.Timestamp
Txn *proto.Transaction
}

func (e *writeIntentError) Error() string {
return fmt.Sprintf("there exists a write intent from transaction %+v", e.Txn)
}

func (e *writeTooOldError) Error() string {
if e.Txn != nil {
return fmt.Sprintf("cannot write with a timestamp older than %+v, or older txn epoch: %+v", e.Timestamp, e.Txn)
}
return fmt.Sprintf("cannot write with a timestamp older than %+v", e.Timestamp)
}

// NewMVCC returns a new instance of MVCC.
func NewMVCC(engine Engine) *MVCC {
return &MVCC{
Expand Down Expand Up @@ -129,7 +108,7 @@ func (mvcc *MVCC) Get(key Key, timestamp proto.Timestamp, txn *proto.Transaction
var valBytes []byte
if !timestamp.Less(meta.Timestamp) {
if meta.Txn != nil && (txn == nil || !bytes.Equal(meta.Txn.ID, txn.ID)) {
return nil, &writeIntentError{Txn: meta.Txn}
return nil, &proto.WriteIntentError{Key: key, Txn: *meta.Txn}
}

latestKey := mvccEncodeKey(binKey, meta.Timestamp)
Expand Down Expand Up @@ -169,30 +148,29 @@ func (mvcc *MVCC) Get(key Key, timestamp proto.Timestamp, txn *proto.Transaction
// We assume the range will check for an existing write intent before
// executing any Put action at the MVCC level.
func (mvcc *MVCC) Put(key Key, timestamp proto.Timestamp, value proto.Value, txn *proto.Transaction) error {
binKey := encoding.EncodeBinary(nil, key)
if value.Timestamp != nil && !value.Timestamp.Equal(timestamp) {
return util.Errorf(
"the timestamp %+v provided in value does not match the timestamp %+v in request",
value.Timestamp, timestamp)
}
return mvcc.putInternal(binKey, timestamp, proto.MVCCValue{Value: &value}, txn)
return mvcc.putInternal(key, timestamp, proto.MVCCValue{Value: &value}, txn)
}

// Delete marks the key deleted and will not return in the next get response.
func (mvcc *MVCC) Delete(key Key, timestamp proto.Timestamp, txn *proto.Transaction) error {
binKey := encoding.EncodeBinary(nil, key)
return mvcc.putInternal(binKey, timestamp, proto.MVCCValue{Deleted: true}, txn)
return mvcc.putInternal(key, timestamp, proto.MVCCValue{Deleted: true}, txn)
}

// putInternal adds a new timestamped value to the specified key.
// If value is nil, creates a deletion tombstone value.
func (mvcc *MVCC) putInternal(key Key, timestamp proto.Timestamp, value proto.MVCCValue, txn *proto.Transaction) error {
binKey := encoding.EncodeBinary(nil, key)
if value.Value != nil && value.Value.Bytes != nil && value.Value.Integer != nil {
return util.Errorf("key %q value contains both a byte slice and an integer value: %+v", key, value)
}

meta := &proto.MVCCMetadata{}
ok, err := GetProto(mvcc.engine, key, meta)
ok, err := GetProto(mvcc.engine, binKey, meta)
if err != nil {
return err
}
Expand All @@ -207,7 +185,7 @@ func (mvcc *MVCC) putInternal(key Key, timestamp proto.Timestamp, value proto.MV
// This should not happen since range should check the existing
// write intent before executing any Put action at MVCC level.
if meta.Txn != nil && (txn == nil || !bytes.Equal(meta.Txn.ID, txn.ID)) {
return &writeIntentError{Txn: meta.Txn}
return &proto.WriteIntentError{Key: key, Txn: *meta.Txn}
}

// We can update the current metadata only if both the timestamp
Expand All @@ -217,10 +195,10 @@ func (mvcc *MVCC) putInternal(key Key, timestamp proto.Timestamp, value proto.MV
if !timestamp.Less(meta.Timestamp) && (meta.Txn == nil || txn.Epoch >= meta.Txn.Epoch) {
// If this is an intent and timestamps have changed, need to remove old version.
if meta.Txn != nil && !timestamp.Equal(meta.Timestamp) {
batch = append(batch, BatchDelete(mvccEncodeKey(key, meta.Timestamp)))
batch = append(batch, BatchDelete(mvccEncodeKey(binKey, meta.Timestamp)))
}
meta = &proto.MVCCMetadata{Txn: txn, Timestamp: timestamp}
batchPut, err := MakeBatchPutProto(key, meta)
batchPut, err := MakeBatchPutProto(binKey, meta)
if err != nil {
return err
}
Expand All @@ -229,12 +207,12 @@ func (mvcc *MVCC) putInternal(key Key, timestamp proto.Timestamp, value proto.MV
// In case we receive a Put request to update an old version,
// it must be an error since raft should handle any client
// retry from timeout.
return &writeTooOldError{Timestamp: meta.Timestamp, Txn: meta.Txn}
return &proto.WriteTooOldError{Timestamp: meta.Timestamp, Txn: meta.Txn}
}
} else { // In case the key metadata does not exist yet.
// Create key metadata.
meta = &proto.MVCCMetadata{Txn: txn, Timestamp: timestamp}
batchPut, err := MakeBatchPutProto(key, meta)
batchPut, err := MakeBatchPutProto(binKey, meta)
if err != nil {
return err
}
Expand All @@ -246,7 +224,7 @@ func (mvcc *MVCC) putInternal(key Key, timestamp proto.Timestamp, value proto.MV
if value.Value != nil {
value.Value.Timestamp = nil
}
batchPut, err := MakeBatchPutProto(mvccEncodeKey(key, timestamp), &value)
batchPut, err := MakeBatchPutProto(mvccEncodeKey(binKey, timestamp), &value)
if err != nil {
return err
}
Expand Down
75 changes: 59 additions & 16 deletions storage/range.go
Original file line number Diff line number Diff line change
Expand Up @@ -257,16 +257,40 @@ func (r *Range) ContainsKeyRange(start, end engine.Key) bool {
return r.Meta.ContainsKeyRange(start, end)
}

// EnqueueCmd enqueues a command to Raft.
func (r *Range) EnqueueCmd(cmd *Cmd) error {
r.raft <- cmd
return <-cmd.done
// AddCmd adds a command for execution on this range. The command's
// affected keys are verified to be contained within the range and the
// range's leadership is confirmed. The command is then dispatched
// either along the read-only execution path or the read-write Raft
// command queue. If wait is false, read-write commands are added to
// Raft without waiting for their completion.
func (r *Range) AddCmd(method string, args proto.Request, reply proto.Response, wait bool) error {
header := args.Header()
var err error
if !r.ContainsKeyRange(header.Key, header.EndKey) {
err = proto.NewRangeKeyMismatchError(header.Key, header.EndKey, r.Meta)
} else if !r.IsLeader() {
// TODO(spencer): when we happen to know the leader, fill it in here via replica.
err = &proto.NotLeaderError{}
}
if err != nil {
reply.Header().SetGoError(err)
return err
}

// Differentiate between read-only and read-write.
if IsReadOnly(method) {
if !wait {
return util.Errorf("cannot specify !wait for read-only requests")
}
return r.addReadOnlyCmd(method, args, reply)
}
return r.addReadWriteCmd(method, args, reply, wait)
}

// ReadOnlyCmd updates the read timestamp cache and waits for any
// addReadOnlyCmd updates the read timestamp cache and waits for any
// overlapping writes currently processing through Raft ahead of us to
// clear via the read queue.
func (r *Range) ReadOnlyCmd(method string, args proto.Request, reply proto.Response) error {
func (r *Range) addReadOnlyCmd(method string, args proto.Request, reply proto.Response) error {
header := args.Header()
r.Lock()
r.tsCache.Add(header.Key, header.EndKey, header.Timestamp)
Expand Down Expand Up @@ -297,7 +321,7 @@ func (r *Range) ReadOnlyCmd(method string, args proto.Request, reply proto.Respo
return r.executeCmd(method, args, reply)
}

// ReadWriteCmd first consults the response cache to determine whether
// addReadWriteCmd first consults the response cache to determine whether
// this command has already been sent to the range. If a response is
// found, it's returned immediately and not submitted to raft. Next,
// the timestamp cache is checked to determine if any newer accesses to
Expand All @@ -306,7 +330,8 @@ func (r *Range) ReadOnlyCmd(method string, args proto.Request, reply proto.Respo
// command are added as pending writes to the read queue and the
// command is submitted to Raft. Upon completion, the write is removed
// from the read queue and the reply is added to the repsonse cache.
func (r *Range) ReadWriteCmd(method string, args proto.Request, reply proto.Response) error {
// If wait is true, will block until the command is complete.
func (r *Range) addReadWriteCmd(method string, args proto.Request, reply proto.Response, wait bool) error {
// Check the response cache in case this is a replay. This call
// may block if the same command is already underway.
header := args.Header()
Expand Down Expand Up @@ -356,15 +381,29 @@ func (r *Range) ReadWriteCmd(method string, args proto.Request, reply proto.Resp
Reply: reply,
done: make(chan error, 1),
}
// This waits for the command to complete.
err := r.EnqueueCmd(cmd)
r.raft <- cmd

// Now that the command has completed, remove the pending write.
r.Lock()
r.readQ.RemoveWrite(wKey)
r.Unlock()
// Create a completion func for mandatory cleanups which we either
// run synchronously if we're waiting or in a goroutine otherwise.
completionFunc := func() error {
err := <-cmd.done
// Now that the command has completed, remove the pending write.
r.Lock()
r.readQ.RemoveWrite(wKey)
r.Unlock()
// If the original client didn't wait (e.g. resolve write intent),
// log execution errors so they're surfaced somewhere.
if !wait && err != nil {
log.Warningf("non-synchronous execution of %s with %+v failed: %v", cmd.Method, cmd.Args, err)
}
return err
}

return err
if wait {
return completionFunc()
}
go completionFunc()
return nil
}

// processRaft processes read/write commands, sending them to the Raft
Expand Down Expand Up @@ -530,6 +569,8 @@ func (r *Range) executeCmd(method string, args proto.Request, reply proto.Respon
// Propagate the request timestamp (which may have changed).
reply.Header().Timestamp = args.Header().Timestamp

log.V(1).Infof("executed %s command %+v: %+v", method, args, reply)

// Add this command's result to the response cache if this is a
// read/write method. This must be done as part of the execution of
// raft commands so that every replica maintains the same responses
Expand Down Expand Up @@ -857,7 +898,9 @@ func (r *Range) InternalHeartbeatTxn(args *proto.InternalHeartbeatTxnRequest, re
// adjust pushee's persisted txn depending on value of args.Abort. If
// args.Abort is true, set txn.Status to ABORTED, and priority to one
// less than the pusher's priority and return success. If args.Abort
// is false, set txn.Timestamp to pusher's txn.Timestamp + 1.
// is false, set txn.Timestamp to pusher's Timestamp + 1 (note that
// we use the pusher's Args.Timestamp, not Txn.Timestamp because the
// args timestamp can advance during the txn).
//
// Higher Txn Priority: If pushee txn has a higher priority than
// pusher, return TransactionRetryError. Transaction will be retried
Expand Down
Loading