-
Notifications
You must be signed in to change notification settings - Fork 2.4k
/
tx.go
151 lines (134 loc) · 3.96 KB
/
tx.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
package redis
import (
"context"
"github.com/redis/go-redis/v9/internal/pool"
"github.com/redis/go-redis/v9/internal/proto"
)
// TxFailedErr is the error that signals a failed Redis transaction (see the
// TxPipelined documentation). It is a proto.RedisError constant whose message
// is "redis: transaction failed".
const TxFailedErr = proto.RedisError("redis: transaction failed")
// Tx implements Redis transactions as described in
// http://redis.io/topics/transactions. It is NOT safe for concurrent use
// by multiple goroutines, because Exec resets the list of watched keys.
//
// If you don't need WATCH, use Pipeline instead.
//
// A Tx is assembled by Client.newTx and wired up by Tx.init; it is built
// entirely from embedded types.
type Tx struct {
// baseClient carries the options and the connection pool; newTx fills in
// its opt and connPool fields.
baseClient
// cmdable and statefulCmdable are both set to Tx.Process by init.
cmdable
statefulCmdable
// hooksMixin is populated in newTx with a clone of the parent client's hooks.
hooksMixin
}
// newTx builds a Tx derived from the client. The Tx reuses the client's
// options, wraps the client's connection pool with pool.NewStickyConnPool,
// and receives a clone of the client's hooks. The Tx is initialised with
// init before it is returned.
func (c *Client) newTx() *Tx {
	stickyPool := pool.NewStickyConnPool(c.connPool)

	tx := &Tx{
		baseClient: baseClient{opt: c.opt, connPool: stickyPool},
		hooksMixin: c.hooksMixin.clone(),
	}
	tx.init()
	return tx
}
// init wires up a freshly built Tx: both command interfaces (cmdable and
// statefulCmdable) are routed through Process, and the baseClient's dial,
// process, pipeline and transactional-pipeline implementations are
// registered as the hooks via initHooks.
func (c *Tx) init() {
	c.cmdable = c.Process
	c.statefulCmdable = c.Process

	base := hooks{
		dial:       c.baseClient.dial,
		process:    c.baseClient.process,
		pipeline:   c.baseClient.processPipeline,
		txPipeline: c.baseClient.processTxPipeline,
	}
	c.initHooks(base)
}
// Process sends cmd through the hook chain (processHook), stores the
// resulting error on cmd with SetErr (nil on success), and returns that
// same error.
func (c *Tx) Process(ctx context.Context, cmd Cmder) error {
	hookErr := c.processHook(ctx, cmd)
	// Record the outcome on the command itself so callers that only hold
	// the command can still inspect it.
	cmd.SetErr(hookErr)
	return hookErr
}
// Watch creates a transaction, issues WATCH for keys when at least one key
// is given, and then invokes fn with the transaction.
//
// It returns the WATCH error if watching fails (fn is not called in that
// case); otherwise it returns whatever fn returns. The transaction is always
// closed when Watch returns, and the error from closing it is discarded.
func (c *Client) Watch(ctx context.Context, fn func(*Tx) error, keys ...string) error {
	tx := c.newTx()
	// Closing is best effort: its error is intentionally ignored so that the
	// result of WATCH / fn is what the caller sees.
	defer func() { _ = tx.Close(ctx) }()

	if len(keys) == 0 {
		return fn(tx)
	}
	if watchErr := tx.Watch(ctx, keys...).Err(); watchErr != nil {
		return watchErr
	}
	return fn(tx)
}
// Close ends the transaction: it first issues UNWATCH and then closes the
// underlying baseClient, returning the error from that close.
func (c *Tx) Close(ctx context.Context) error {
	// The UNWATCH outcome is deliberately dropped; only the result of
	// closing the base client is reported to the caller.
	unwatch := c.Unwatch(ctx)
	_ = unwatch.Err()

	return c.baseClient.Close()
}
// Watch issues a WATCH command for the given keys so that a later
// transaction executes conditionally on them.
//
// The returned StatusCmd carries the result; any processing error is
// available through it (Process stores the error on the command).
func (c *Tx) Watch(ctx context.Context, keys ...string) *StatusCmd {
	// Command name first, followed by every key in the order given.
	args := make([]interface{}, 0, len(keys)+1)
	args = append(args, "watch")
	for _, k := range keys {
		args = append(args, k)
	}

	cmd := NewStatusCmd(ctx, args...)
	// The error is already recorded on cmd, so the return value is unused.
	_ = c.Process(ctx, cmd)
	return cmd
}
// Unwatch issues an UNWATCH command, flushing the previously watched keys
// for the transaction. Any keys passed in are appended to the command as
// arguments.
//
// NOTE(review): the Redis UNWATCH command is documented as taking no
// arguments, so a call with keys is presumably rejected by the server —
// confirm before relying on the keys parameter.
//
// The returned StatusCmd carries the result; any processing error is
// available through it (Process stores the error on the command).
func (c *Tx) Unwatch(ctx context.Context, keys ...string) *StatusCmd {
	// Command name first, followed by every key in the order given.
	args := make([]interface{}, 0, len(keys)+1)
	args = append(args, "unwatch")
	for _, k := range keys {
		args = append(args, k)
	}

	cmd := NewStatusCmd(ctx, args...)
	// The error is already recorded on cmd, so the return value is unused.
	_ = c.Process(ctx, cmd)
	return cmd
}
// Pipeline returns a new Pipeliner whose queued commands are executed
// through this Tx's pipeline hook (processPipelineHook). Usually it is more
// convenient to use Pipelined.
func (c *Tx) Pipeline() Pipeliner {
	p := &Pipeline{
		exec: func(ctx context.Context, queued []Cmder) error {
			return c.processPipelineHook(ctx, queued)
		},
	}
	p.init()
	return p
}
// Pipelined runs fn against a fresh pipeline obtained from Pipeline and
// executes the commands fn queued, outside of the transaction.
// Use TxPipelined if you need transactional behavior.
func (c *Tx) Pipelined(ctx context.Context, fn func(Pipeliner) error) ([]Cmder, error) {
	pipe := c.Pipeline()
	return pipe.Pipelined(ctx, fn)
}
// TxPipelined runs fn against a fresh transactional pipeline obtained from
// TxPipeline and executes the commands fn queued inside the transaction.
//
// When using WATCH, EXEC will execute commands only if the watched keys
// were not modified, allowing for a check-and-set mechanism.
//
// Exec always returns the list of commands. If the transaction fails,
// TxFailedErr is returned. Otherwise Exec returns the error of the first
// failed command, or nil.
func (c *Tx) TxPipelined(ctx context.Context, fn func(Pipeliner) error) ([]Cmder, error) {
	pipe := c.TxPipeline()
	return pipe.Pipelined(ctx, fn)
}
// TxPipeline returns a new Pipeliner whose queued commands are wrapped in
// MULTI/EXEC (see wrapMultiExec) and then executed through this Tx's
// transactional pipeline hook (processTxPipelineHook). Usually it is more
// convenient to use TxPipelined.
func (c *Tx) TxPipeline() Pipeliner {
	p := &Pipeline{
		exec: func(ctx context.Context, queued []Cmder) error {
			return c.processTxPipelineHook(ctx, wrapMultiExec(ctx, queued))
		},
	}
	p.init()
	return p
}
// wrapMultiExec returns a new slice containing a "multi" status command,
// then every command from cmds in order, then an "exec" slice command.
// The input slice itself is left untouched.
//
// It panics with "not reached" when cmds is empty; callers are expected to
// never pass an empty batch.
func wrapMultiExec(ctx context.Context, cmds []Cmder) []Cmder {
	if len(cmds) == 0 {
		panic("not reached")
	}

	// Exactly two extra slots: one for MULTI in front, one for EXEC at the end.
	wrapped := make([]Cmder, 0, len(cmds)+2)
	wrapped = append(wrapped, NewStatusCmd(ctx, "multi"))
	wrapped = append(wrapped, cmds...)
	wrapped = append(wrapped, NewSliceCmd(ctx, "exec"))
	return wrapped
}