Skip to content

Commit

Permalink
Merge pull request #15187 from andreimatei/distsql-retryable-err
Browse files Browse the repository at this point in the history
sql, roachpb: correctly marshall retryable errors through DistSQL
  • Loading branch information
andreimatei authored Apr 28, 2017
2 parents dc3a2cb + 659d611 commit 1ae28f6
Show file tree
Hide file tree
Showing 38 changed files with 1,873 additions and 724 deletions.
2 changes: 1 addition & 1 deletion pkg/internal/client/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -425,7 +425,7 @@ func TestClientRunConcurrentTransaction(t *testing.T) {
for _, err := range concErrs {
if err != nil {
anyError = err
if _, ok := err.(*roachpb.RetryableTxnError); ok {
if _, ok := err.(*roachpb.HandledRetryableTxnError); ok {
return err
}
}
Expand Down
12 changes: 6 additions & 6 deletions pkg/internal/client/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,10 @@ package client

import (
"bytes"
"errors"
"fmt"

"github.com/gogo/protobuf/proto"
"github.com/pkg/errors"
"golang.org/x/net/context"

"github.com/cockroachdb/cockroach/pkg/roachpb"
Expand Down Expand Up @@ -492,11 +492,11 @@ func (db *DB) Txn(ctx context.Context, retryable func(context.Context, *Txn) err
if err != nil {
txn.CleanupOnError(ctx, err)
}
// Terminate RetryableTxnError here, so it doesn't cause a higher-level txn to
// be retried. We don't do this in any of the other functions in DB; I guess
// we should.
if _, ok := err.(*roachpb.RetryableTxnError); ok {
return errors.New(err.Error())
// Terminate HandledRetryableTxnError here, so it doesn't cause a higher-level
// txn to be retried. We don't do this in any of the other functions in DB; I
// guess we should.
if _, ok := err.(*roachpb.HandledRetryableTxnError); ok {
return errors.Wrapf(err, "terminated retryable error")
}
return err
}
Expand Down
90 changes: 46 additions & 44 deletions pkg/internal/client/db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -359,50 +359,52 @@ func TestCommonMethods(t *testing.T) {
// request with the information that this particular Get must be
// unmarshaled, which didn't seem worth doing as we're not using
// Batch.GetProto at the moment.
{dbType, "GetProto"}: {},
{txnType, "GetProto"}: {},
{batchType, "CheckConsistency"}: {},
{batchType, "AddRawRequest"}: {},
{batchType, "PutInline"}: {},
{batchType, "RawResponse"}: {},
{batchType, "MustPErr"}: {},
{dbType, "AdminMerge"}: {},
{dbType, "AdminSplit"}: {},
{dbType, "AdminTransferLease"}: {},
{dbType, "AdminChangeReplicas"}: {},
{dbType, "CheckConsistency"}: {},
{dbType, "Run"}: {},
{dbType, "Txn"}: {},
{dbType, "GetSender"}: {},
{dbType, "PutInline"}: {},
{dbType, "WriteBatch"}: {},
{txnType, "Commit"}: {},
{txnType, "CommitInBatch"}: {},
{txnType, "CommitOrCleanup"}: {},
{txnType, "Rollback"}: {},
{txnType, "CleanupOnError"}: {},
{txnType, "DebugName"}: {},
{txnType, "InternalSetPriority"}: {},
{txnType, "IsFinalized"}: {},
{txnType, "NewBatch"}: {},
{txnType, "Exec"}: {},
{txnType, "GetDeadline"}: {},
{txnType, "ResetDeadline"}: {},
{txnType, "Run"}: {},
{txnType, "SetDebugName"}: {},
{txnType, "SetIsolation"}: {},
{txnType, "SetUserPriority"}: {},
{txnType, "SetSystemConfigTrigger"}: {},
{txnType, "SetTxnAnchorKey"}: {},
{txnType, "UpdateDeadlineMaybe"}: {},
{txnType, "AddCommitTrigger"}: {},
{txnType, "CommandCount"}: {},
{txnType, "IsRetryableErrMeantForTxn"}: {},
{txnType, "Isolation"}: {},
{txnType, "OrigTimestamp"}: {},
{txnType, "Proto"}: {},
{txnType, "UserPriority"}: {},
{txnType, "AnchorKey"}: {},
{dbType, "GetProto"}: {},
{txnType, "GetProto"}: {},
{batchType, "CheckConsistency"}: {},
{batchType, "AddRawRequest"}: {},
{batchType, "PutInline"}: {},
{batchType, "RawResponse"}: {},
{batchType, "MustPErr"}: {},
{dbType, "AdminMerge"}: {},
{dbType, "AdminSplit"}: {},
{dbType, "AdminTransferLease"}: {},
{dbType, "AdminChangeReplicas"}: {},
{dbType, "CheckConsistency"}: {},
{dbType, "Run"}: {},
{dbType, "Txn"}: {},
{dbType, "GetSender"}: {},
{dbType, "PutInline"}: {},
{dbType, "WriteBatch"}: {},
{txnType, "AcceptUnhandledRetryableErrors"}: {},
{txnType, "Commit"}: {},
{txnType, "CommitInBatch"}: {},
{txnType, "CommitOrCleanup"}: {},
{txnType, "Rollback"}: {},
{txnType, "CleanupOnError"}: {},
{txnType, "DebugName"}: {},
{txnType, "InternalSetPriority"}: {},
{txnType, "IsFinalized"}: {},
{txnType, "NewBatch"}: {},
{txnType, "Exec"}: {},
{txnType, "GetDeadline"}: {},
{txnType, "ResetDeadline"}: {},
{txnType, "Run"}: {},
{txnType, "SetDebugName"}: {},
{txnType, "SetIsolation"}: {},
{txnType, "SetUserPriority"}: {},
{txnType, "SetSystemConfigTrigger"}: {},
{txnType, "SetTxnAnchorKey"}: {},
{txnType, "UpdateDeadlineMaybe"}: {},
{txnType, "UpdateStateOnRemoteRetryableErr"}: {},
{txnType, "AddCommitTrigger"}: {},
{txnType, "CommandCount"}: {},
{txnType, "IsRetryableErrMeantForTxn"}: {},
{txnType, "Isolation"}: {},
{txnType, "OrigTimestamp"}: {},
{txnType, "Proto"}: {},
{txnType, "UserPriority"}: {},
{txnType, "AnchorKey"}: {},
}

for b := range omittedChecks {
Expand Down
11 changes: 11 additions & 0 deletions pkg/internal/client/sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"golang.org/x/net/context"

"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/util/uuid"
)

// Sender is the interface used to call into a Cockroach instance.
Expand All @@ -28,6 +29,16 @@ type Sender interface {
Send(context.Context, roachpb.BatchRequest) (*roachpb.BatchResponse, *roachpb.Error)
}

// SenderWithDistSQLBackdoor is implemented by the TxnCoordSender to give
// DistSQL some hacky powers when handling errors that happened on remote nodes.
type SenderWithDistSQLBackdoor interface {
Sender

// GetTxnState returns the state that the TxnCoordSender has for a
// transaction. The bool is false is no state is found.
GetTxnState(txnID uuid.UUID) (roachpb.Transaction, bool)
}

// SenderFunc is an adapter to allow the use of ordinary functions
// as Senders.
type SenderFunc func(context.Context, roachpb.BatchRequest) (*roachpb.BatchResponse, *roachpb.Error)
Expand Down
Loading

0 comments on commit 1ae28f6

Please sign in to comment.