Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Give SQL transactions a debug name #2233

Merged
merged 2 commits into from
Aug 25, 2015
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion client/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -485,7 +485,7 @@ func concurrentIncrements(db *client.DB, t *testing.T) {
wgStart.Wait()

if err := db.Txn(func(txn *client.Txn) error {
txn.SetDebugName(fmt.Sprintf("test-%d", i))
txn.SetDebugName(fmt.Sprintf("test-%d", i), 0)

// Retrieve the other key.
gr, err := txn.Get(readKey)
Expand Down
4 changes: 3 additions & 1 deletion client/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -416,7 +416,9 @@ func (db *DB) Run(b *Batch) error {
//
// TODO(pmattis): Allow transaction options to be specified.
func (db *DB) Txn(retryable func(txn *Txn) error) error {
return newTxn(*db).exec(retryable)
txn := NewTxn(*db)
txn.SetDebugName("", 1)
return txn.exec(retryable)
}

// send runs the specified calls synchronously in a single batch and
Expand Down
1 change: 0 additions & 1 deletion client/db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -349,7 +349,6 @@ func TestCommonMethods(t *testing.T) {
key{dbType, "Run"}: {},
key{dbType, "Txn"}: {},
key{txnType, "CommitInBatch"}: {},
key{txnType, "ToProto"}: {},
key{txnType, "Commit"}: {},
key{txnType, "Rollback"}: {},
key{txnType, "DebugName"}: {},
Expand Down
58 changes: 20 additions & 38 deletions client/txn.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,15 +48,15 @@ type txnSender Txn

func (ts *txnSender) Send(ctx context.Context, call proto.Call) {
// Send call through wrapped sender.
call.Args.Header().Txn = &ts.txn
call.Args.Header().Txn = &ts.Proto
ts.wrapped.Send(ctx, call)
ts.txn.Update(call.Reply.Header().Txn)
ts.Proto.Update(call.Reply.Header().Txn)

if err, ok := call.Reply.Header().GoError().(*proto.TransactionAbortedError); ok {
// On Abort, reset the transaction so we start anew on restart.
ts.txn = proto.Transaction{
Name: ts.txn.Name,
Isolation: ts.txn.Isolation,
ts.Proto = proto.Transaction{
Name: ts.Proto.Name,
Isolation: ts.Proto.Isolation,
Priority: err.Txn.Priority, // acts as a minimum priority on restart
}
}
Expand All @@ -67,7 +67,7 @@ func (ts *txnSender) Send(ctx context.Context, call proto.Call) {
type Txn struct {
db DB
wrapped Sender
txn proto.Transaction
Proto proto.Transaction
// haveTxnWrite is true as soon as the current attempt contains a write
// (prior to sending). This is in contrast to txn.Writing, which is set
// by the coordinator when the first intent has been created, and which
Expand All @@ -78,49 +78,31 @@ type Txn struct {
haveEndTxn bool // True if there was an explicit EndTransaction
}

func newTxn(db DB) *Txn {
// NewTxn returns a new txn.
func NewTxn(db DB) *Txn {
txn := &Txn{
db: db,
wrapped: db.Sender,
}
txn.db.Sender = (*txnSender)(txn)

// Caller's caller.
file, line, fun := caller.Lookup(2)
txn.txn.Name = fmt.Sprintf("%s:%d %s", file, line, fun)
return txn
}

// NewTxn returns a new txn.
func NewTxn(db DB) *Txn {
return newTxn(db)
}

// NewTxnFromProto returns a transaction created from the preserved
// state.
func NewTxnFromProto(db DB, state proto.Transaction) *Txn {
txn := newTxn(db)
txn.txn = state
return txn
}

// ToProto returns the transaction state in a protobuf.
func (txn *Txn) ToProto() proto.Transaction {
return txn.txn
}

// SetDebugName sets the debug name associated with the transaction which will
// appear in log files and the web UI. Each transaction starts out with an
// automatically assigned debug name composed of the file and line number where
// the transaction was created.
func (txn *Txn) SetDebugName(name string) {
file, line, _ := caller.Lookup(1)
txn.txn.Name = fmt.Sprintf("%s:%d %s", file, line, name)
func (txn *Txn) SetDebugName(name string, depth int) {
file, line, fun := caller.Lookup(depth + 1)
if name == "" {
name = fun
}
txn.Proto.Name = fmt.Sprintf("%s:%d %s", file, line, name)
}

// DebugName returns the debug name associated with the transaction.
func (txn *Txn) DebugName() string {
return txn.txn.Name
return txn.Proto.Name
}

// SetSnapshotIsolation sets the transaction's isolation type to
Expand All @@ -134,7 +116,7 @@ func (txn *Txn) SetSnapshotIsolation() {
// TODO(pmattis): Panic if the transaction has already had
// operations run on it. Needs to tie into the Txn reset in case of
// retries.
txn.txn.Isolation = proto.SNAPSHOT
txn.Proto.Isolation = proto.SNAPSHOT
}

// InternalSetPriority sets the transaction priority. It is intended for
Expand Down Expand Up @@ -307,7 +289,7 @@ func (txn *Txn) CommitInBatch(b *Batch) error {

// Commit sends an EndTransactionRequest with Commit=true.
func (txn *Txn) Commit() error {
if txn.txn.Writing {
if txn.Proto.Writing {
return txn.sendEndTxnCall(true /* commit */)
}
return nil
Expand All @@ -316,12 +298,12 @@ func (txn *Txn) Commit() error {
// Rollback sends an EndTransactionRequest with Commit=false.
func (txn *Txn) Rollback() error {
var err error
if txn.txn.Writing {
if txn.Proto.Writing {
err = txn.sendEndTxnCall(false /* commit */)
}
// Explicitly set the status as ABORTED so that higher layers
// know that this transaction has ended.
txn.txn.Status = proto.ABORTED
txn.Proto.Status = proto.ABORTED
return err
}

Expand Down Expand Up @@ -391,7 +373,7 @@ func (txn *Txn) send(calls ...proto.Call) error {
// If the transaction record indicates that the coordinator never wrote
// an intent (and the client doesn't have one lined up), then there's no
// need to send EndTransaction. If there is one anyways, cut it off.
if txn.haveEndTxn && !(txn.txn.Writing || txn.haveTxnWrite) {
if txn.haveEndTxn && !(txn.Proto.Writing || txn.haveTxnWrite) {
// There's always a call if we get here.
lastIndex := len(calls) - 1
if calls[lastIndex].Method() != proto.EndTransaction {
Expand Down
2 changes: 1 addition & 1 deletion client/txn_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ func TestTxnResetTxnOnAbort(t *testing.T) {
txn.db.Sender.Send(context.Background(),
proto.Call{Args: testPutReq, Reply: &proto.PutResponse{}})

if len(txn.txn.ID) != 0 {
if len(txn.Proto.ID) != 0 {
t.Errorf("expected txn to be cleared")
}
}
Expand Down
2 changes: 1 addition & 1 deletion kv/txn_coord_sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -513,7 +513,7 @@ func (tc *TxnCoordSender) sendOne(ctx context.Context, call proto.Call) {
}
call.Reply.Reset()
if err := tmpDB.Txn(func(txn *client.Txn) error {
txn.SetDebugName("auto-wrap")
txn.SetDebugName("auto-wrap", 0)
b := &client.Batch{}
b.InternalAddCall(call)
return txn.CommitInBatch(b)
Expand Down
2 changes: 1 addition & 1 deletion kv/txn_correctness_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -563,7 +563,7 @@ func (hv *historyVerifier) runTxn(txnIdx int, priority int32,
var retry int
txnName := fmt.Sprintf("txn%d", txnIdx)
err := db.Txn(func(txn *client.Txn) error {
txn.SetDebugName(txnName)
txn.SetDebugName(txnName, 0)
if isolation == proto.SNAPSHOT {
txn.SetSnapshotIsolation()
}
Expand Down
10 changes: 6 additions & 4 deletions sql/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,9 @@ func (s *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) {
}
// Open a pending transaction if needed.
if planMaker.session.Txn != nil {
planMaker.txn = client.NewTxnFromProto(*s.db, *planMaker.session.Txn)
txn := client.NewTxn(*s.db)
txn.Proto = *planMaker.session.Txn
planMaker.txn = txn
}

// Send the Request for SQL execution and set the application-level error
Expand All @@ -125,8 +127,7 @@ func (s *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) {
// Send back the session state even if there were application-level errors.
// Add transaction to session state.
if planMaker.txn != nil {
t := planMaker.txn.ToProto()
planMaker.session.Txn = &t
planMaker.session.Txn = &planMaker.txn.Proto
} else {
planMaker.session.Txn = nil
}
Expand Down Expand Up @@ -218,8 +219,9 @@ func (s *Server) execStmt(stmt parser.Statement, req driver.Request, planMaker *
// Start a transaction here and not in planMaker to prevent begin
// transaction from being called within an auto-transaction below.
planMaker.txn = client.NewTxn(*s.db)
planMaker.txn.SetDebugName("sql", 0)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could we set this to the explained plan instead? That'd be a lot more useful.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What plan? This is getting set at the moment of BEGIN coming in, which doesn't really do any work.

}
} else if planMaker.txn.ToProto().Status == proto.ABORTED {
} else if planMaker.txn.Proto.Status == proto.ABORTED {
switch stmt := stmt.(type) {
case *parser.CommitTransaction, *parser.RollbackTransaction:
// Reset to allow starting a new transaction.
Expand Down