diff --git a/cluster.go b/cluster.go
index 11e2b41c..1e71ae2b 100644
--- a/cluster.go
+++ b/cluster.go
@@ -525,9 +525,8 @@ func (c *clusterClient) toReplica(cmd Completed) bool {
 	return false
 }
 
-func (c *clusterClient) _pickMulti(multi []Completed) (retries *connretry) {
+func (c *clusterClient) _pickMulti(multi []Completed) (retries *connretry, init bool) {
 	last := cmds.InitSlot
-	init := false
 
 	for _, cmd := range multi {
 		if cmd.Slot() == cmds.InitSlot {
@@ -550,7 +549,7 @@
 				cc = c.pslots[cmd.Slot()]
 			}
 			if cc == nil {
-				return nil
+				return nil, false
 			}
 			count.m[cc]++
 		}
@@ -569,13 +568,13 @@
 				cc = c.pslots[cmd.Slot()]
 			}
 			if cc == nil { // check cc == nil again in case of non-deterministic SendToReplicas.
-				return nil
+				return nil, false
 			}
 			re := retries.m[cc]
 			re.commands = append(re.commands, cmd)
 			re.cIndexes = append(re.cIndexes, i)
 		}
-		return retries
+		return retries, init
 	}
 
 	inits := 0
@@ -589,25 +588,28 @@
 		} else if init && last != cmd.Slot() {
 			panic(panicMixCxSlot)
 		}
-		p := c.pslots[cmd.Slot()]
-		if p == nil {
-			return nil
+		cc := c.pslots[cmd.Slot()]
+		if cc == nil {
+			return nil, false
 		}
-		count.m[p]++
+		count.m[cc]++
 	}
 
 	if last == cmds.InitSlot { // if all commands have no slots, such as INFO, we pick a non-nil slot.
-		for i, p := range c.pslots {
-			if p != nil {
+		for i, cc := range c.pslots {
+			if cc != nil {
 				last = uint16(i)
-				count.m[p] = inits
+				count.m[cc] = inits
 				break
 			}
 		}
 		if last == cmds.InitSlot {
-			return nil
+			return nil, false
 		}
+	} else if init {
+		cc := c.pslots[last]
+		count.m[cc] += inits
 	}
 
 	retries = connretryp.Get(len(count.m), len(count.m))
@@ -627,25 +629,34 @@
 		re.commands = append(re.commands, cmd)
 		re.cIndexes = append(re.cIndexes, i)
 	}
-	return retries
+	return retries, init
 }
 
-func (c *clusterClient) pickMulti(ctx context.Context, multi []Completed) (*connretry, error) {
-	conns := c._pickMulti(multi)
+func (c *clusterClient) pickMulti(ctx context.Context, multi []Completed) (*connretry, bool, error) {
+	conns, hasInit := c._pickMulti(multi)
 	if conns == nil {
 		if err := c.refresh(ctx); err != nil {
-			return nil, err
+			return nil, false, err
 		}
-		if conns = c._pickMulti(multi); conns == nil {
-			return nil, ErrNoSlot
+		if conns, hasInit = c._pickMulti(multi); conns == nil {
+			return nil, false, ErrNoSlot
 		}
 	}
-	return conns, nil
+	return conns, hasInit, nil
+}
+
+func isMulti(cmd Completed) bool {
+	return len(cmd.Commands()) == 1 && cmd.Commands()[0] == "MULTI"
+}
+func isExec(cmd Completed) bool {
+	return len(cmd.Commands()) == 1 && cmd.Commands()[0] == "EXEC"
 }
 
 func (c *clusterClient) doresultfn(
-	ctx context.Context, results *redisresults, retries *connretry, mu *sync.Mutex, cc conn, cIndexes []int, commands []Completed, resps []RedisResult, attempts int,
+	ctx context.Context, results *redisresults, retries *connretry, mu *sync.Mutex, cc conn, cIndexes []int, commands []Completed, resps []RedisResult, attempts int, single bool,
 ) (clean bool) {
+	mi := -1
+	ei := -1
 	clean = true
 	for i, resp := range resps {
 		clean = clean && resp.NonRedisError() == nil
@@ -664,6 +675,37 @@
 			} else {
 				nc = c.redirectOrNew(addr, cc, cm.Slot(), mode)
 			}
+			if single && ei < i { // find out if there is a transaction block or not.
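+				// Scan backwards from the redirected command for the opening MULTI
+				// and forwards for the closing EXEC; if both markers are found, the
+				// whole transaction block is requeued on the new connection below.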
+				for mi = i; mi >= 0 && !isMulti(commands[mi]); mi-- {
+				}
+				for ei = i; ei < len(commands) && !isExec(commands[ei]); ei++ {
+				}
+				if mi >= 0 && ei < len(commands) { // a transaction is found.
+					mu.Lock()
+					retries.Redirects++
+					nr := retries.m[nc]
+					if nr == nil {
+						nr = retryp.Get(0, len(commands))
+						retries.m[nc] = nr
+					}
+					for i := mi; i <= ei; i++ {
+						ii := cIndexes[i]
+						cm := commands[i]
+						if mode == RedirectAsk {
+							nr.aIndexes = append(nr.aIndexes, ii)
+							nr.cAskings = append(nr.cAskings, cm)
+						} else {
+							nr.cIndexes = append(nr.cIndexes, ii)
+							nr.commands = append(nr.commands, cm)
+						}
+					}
+					mu.Unlock()
+					continue // the transaction has been added to the retries, go to the next cmd.
+				}
+			}
+			if i <= ei && ei < len(commands) {
+				continue // the current cmd is in the processed transaction and has been added to the retries.
+			}
 			mu.Lock()
 			if mode != RedirectRetry {
 				retries.Redirects++
@@ -690,17 +732,17 @@
 }
 
 func (c *clusterClient) doretry(
-	ctx context.Context, cc conn, results *redisresults, retries *connretry, re *retry, mu *sync.Mutex, wg *sync.WaitGroup, attempts int,
+	ctx context.Context, cc conn, results *redisresults, retries *connretry, re *retry, mu *sync.Mutex, wg *sync.WaitGroup, attempts int, hasInit bool,
 ) {
 	clean := true
 	if len(re.commands) != 0 {
 		resps := cc.DoMulti(ctx, re.commands...)
-		clean = c.doresultfn(ctx, results, retries, mu, cc, re.cIndexes, re.commands, resps.s, attempts)
+		clean = c.doresultfn(ctx, results, retries, mu, cc, re.cIndexes, re.commands, resps.s, attempts, hasInit)
 		resultsp.Put(resps)
 	}
 	if len(re.cAskings) != 0 {
 		resps := askingMulti(cc, ctx, re.cAskings)
-		clean = c.doresultfn(ctx, results, retries, mu, cc, re.aIndexes, re.cAskings, resps.s, attempts) && clean
+		clean = c.doresultfn(ctx, results, retries, mu, cc, re.aIndexes, re.cAskings, resps.s, attempts, hasInit) && clean
 		resultsp.Put(resps)
 	}
 	if clean {
@@ -714,7 +756,7 @@ func (c *clusterClient) DoMulti(ctx context.Context, multi ...Completed) []Redis
 		return nil
 	}
 
-	retries, err := c.pickMulti(ctx, multi)
+	retries, hasInit, err := c.pickMulti(ctx, multi)
 	if err != nil {
 		return fillErrs(len(multi), err)
 	}
@@ -742,10 +784,10 @@ retry:
 	}
 	for cc, re := range retries.m {
 		delete(retries.m, cc)
-		go c.doretry(ctx, cc, results, retries, re, &mu, &wg, attempts)
+		go c.doretry(ctx, cc, results, retries, re, &mu, &wg, attempts, hasInit)
 	}
 	mu.Unlock()
-	c.doretry(ctx, cc1, results, retries, re1, &mu, &wg, attempts)
+	c.doretry(ctx, cc1, results, retries, re1, &mu, &wg, attempts, hasInit)
 	wg.Wait()
 
 	if len(retries.m) != 0 {
@@ -753,7 +795,6 @@
 		retries.Redirects = 0
 		goto retry
 	}
-
 	if retries.RetryDelay >= 0 {
 		c.retryHandler.WaitForRetry(ctx, retries.RetryDelay)
 		attempts++
@@ -946,7 +987,6 @@ func (c *clusterClient) resultcachefn(
 			if !c.retry {
 				continue
 			}
-
 			retryDelay = c.retryHandler.RetryDelay(attempts, Completed(cm.Cmd), resp.Error())
 		} else {
 			nc = c.redirectOrNew(addr, cc, cm.Cmd.Slot(), mode)
@@ -1040,7 +1080,6 @@
 		retries.Redirects = 0
 		goto retry
 	}
-
 	if retries.RetryDelay >= 0 {
 		c.retryHandler.WaitForRetry(ctx, retries.RetryDelay)
 		attempts++
diff --git a/syncp.go b/syncp.go
index 77b99c1f..ff340bc7 100644
--- a/syncp.go
+++ b/syncp.go
@@ -221,8 +221,8 @@ func (r *conncount) ResetLen(n int) {
 type connretry struct {
 	m          map[conn]*retry
 	n          int
+	RetryDelay time.Duration // NOTE: This is not thread-safe.
 	Redirects  uint32        // NOTE: This is not thread-safe.
-	RetryDelay time.Duration // NOTE: It is not thread-safe.
 }
 
 func (r *connretry) Capacity() int {
@@ -238,8 +238,8 @@ func (r *connretry) ResetLen(n int) {
 type connretrycache struct {
 	m          map[conn]*retrycache
 	n          int
+	RetryDelay time.Duration // NOTE: This is not thread-safe.
 	Redirects  uint32        // NOTE: This is not thread-safe.
-	RetryDelay time.Duration // NOTE: It is not thread-safe.
 }
 
 func (r *connretrycache) Capacity() int {
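Reviewer note on the doresultfn change above: when a redirect hits a command inside a MULTI/EXEC block, replaying only that one command would split the transaction, so the patch scans outwards from the redirected command and requeues the whole block on the new connection. Below is a minimal standalone sketch of that scan, using plain []string slices in place of rueidis Completed commands; transactionBounds is a hypothetical helper that mirrors the mi/ei loops in the diff, and everything else is illustrative.

```go
package main

import "fmt"

// These mirror the isMulti/isExec helpers added in the diff,
// simplified to operate on []string instead of Completed.
func isMulti(cmd []string) bool { return len(cmd) == 1 && cmd[0] == "MULTI" }
func isExec(cmd []string) bool  { return len(cmd) == 1 && cmd[0] == "EXEC" }

// transactionBounds scans backwards from index i for the opening MULTI and
// forwards for the closing EXEC, reporting whether a complete block encloses i.
func transactionBounds(commands [][]string, i int) (mi, ei int, ok bool) {
	for mi = i; mi >= 0 && !isMulti(commands[mi]); mi-- {
	}
	for ei = i; ei < len(commands) && !isExec(commands[ei]); ei++ {
	}
	return mi, ei, mi >= 0 && ei < len(commands)
}

func main() {
	commands := [][]string{
		{"GET", "k1"},
		{"MULTI"},
		{"SET", "k2", "v2"},
		{"INCR", "k3"},
		{"EXEC"},
	}
	// Suppose the SET at index 2 received a MOVED redirect: the whole block
	// commands[1..4] must be replayed on the new node, not just the SET.
	if mi, ei, ok := transactionBounds(commands, 2); ok {
		fmt.Printf("retry commands[%d..%d] together\n", mi, ei) // retry commands[1..4] together
	}
}
```

This also illustrates why doresultfn keeps ei across loop iterations: once a block has been requeued, the `i <= ei` check skips the remaining commands of that same transaction instead of queuing them twice.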