-
Notifications
You must be signed in to change notification settings - Fork 0
/
client.go
119 lines (97 loc) · 2.7 KB
/
client.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
package main
import (
"context"
"net"
"regexp"
"github.com/redis/go-redis/v9"
)
// Client wraps a *redis.Client (embedded, so all of its methods are promoted)
// and records the network and address most recently passed to its dial hook.
type Client struct {
*redis.Client
// NOTE(review): nothing in this file assigns Cluster; confirm whether
// callers rely on it or whether it is vestigial.
Cluster bool
// Network is the network argument of the latest dial seen by DialHook.
Network string
// Addr is the addr argument of the latest dial seen by DialHook.
Addr string
}
// ClusterClient wraps a *redis.ClusterClient (embedded) so that the ForEach*
// iteration helpers can hand callbacks a *Client instead of a raw
// *redis.Client.
type ClusterClient struct {
*redis.ClusterClient
}
// UnifiedClient bundles a single-node client with an optional cluster client
// for the same address.
type UnifiedClient struct {
// Single is the single-node client; NewUnifiedClient always populates it.
Single *Client
// Cluster is populated by NewUnifiedClient only when IsCluster is true.
Cluster *ClusterClient
// IsCluster is the result of probing the server with Client.IsCluster.
IsCluster bool
}
// notClusterPattern matches the "cluster_enabled:0" line (with optional
// whitespace after the colon) in the text Client.IsCluster reads from
// INFO cluster. Compiled once at package scope.
var notClusterPattern = regexp.MustCompile(`cluster_enabled:\s*0`)
// NewClient builds a redis.Client from options and returns it wrapped in a
// Client via WrapClient.
func NewClient(options *redis.Options) *Client {
	raw := redis.NewClient(options)
	return WrapClient(raw)
}
// WrapClient wraps an existing redis.Client in a Client and registers the
// wrapper as a hook on that client, so the wrapper's DialHook, ProcessHook
// and ProcessPipelineHook take part in the client's hook chain.
//
// NOTE(review): every call adds one more hook to the underlying client, so
// wrapping the same *redis.Client repeatedly stacks hooks on it — confirm
// this is acceptable for callers that wrap per invocation.
func WrapClient(client *redis.Client) *Client {
	wrapped := &Client{Client: client}
	// AddHook is promoted from the embedded *redis.Client.
	wrapped.AddHook(wrapped)
	return wrapped
}
// IsCluster issues INFO cluster and reports whether the server appears to be
// running in cluster mode.
//
// The result is true unless the reply contains a "cluster_enabled:0" line;
// in particular, a reply without any cluster_enabled line is reported as
// cluster. If the INFO command fails, it returns false and that error.
func (c *Client) IsCluster(ctx context.Context) (bool, error) {
	section, err := c.Info(ctx, "cluster").Result()
	if err != nil {
		return false, err
	}
	standalone := notClusterPattern.MatchString(section)
	return !standalone, nil
}
// DialHook returns a dial hook that stores the network and address of each
// dial on the Client before delegating to next.
//
// NOTE(review): the writes to Network and Addr are not synchronized — confirm
// whether dials can happen concurrently for a single Client.
func (c *Client) DialHook(next redis.DialHook) redis.DialHook {
	return func(ctx context.Context, network, addr string) (net.Conn, error) {
		c.Network, c.Addr = network, addr
		return next(ctx, network, addr)
	}
}
// ProcessHook returns a pass-through command hook: it forwards every command
// to next unchanged and returns next's result.
func (c *Client) ProcessHook(next redis.ProcessHook) redis.ProcessHook {
	passthrough := func(ctx context.Context, cmd redis.Cmder) error {
		return next(ctx, cmd)
	}
	return passthrough
}
// ProcessPipelineHook returns a pass-through pipeline hook: it forwards the
// batch of commands to next unchanged and returns next's result.
//
// It uses a pointer receiver like the other Client methods, so the type has
// one consistent method set and the struct is not copied on each call.
func (c *Client) ProcessPipelineHook(next redis.ProcessPipelineHook) redis.ProcessPipelineHook {
	return func(ctx context.Context, cmds []redis.Cmder) error {
		return next(ctx, cmds)
	}
}
// NewClusterClient builds a redis.ClusterClient from options and returns it
// embedded in a ClusterClient.
func NewClusterClient(options *redis.ClusterOptions) *ClusterClient {
	inner := redis.NewClusterClient(options)
	return &ClusterClient{ClusterClient: inner}
}
// ForEachShard delegates to the embedded redis.ClusterClient.ForEachShard,
// passing fn each node client wrapped in a Client via WrapClient. It returns
// whatever the underlying ForEachShard returns.
func (c *ClusterClient) ForEachShard(ctx context.Context, fn func(ctx context.Context, client *Client) error) error {
	adapt := func(ctx context.Context, node *redis.Client) error {
		return fn(ctx, WrapClient(node))
	}
	return c.ClusterClient.ForEachShard(ctx, adapt)
}
// ForEachMaster delegates to the embedded redis.ClusterClient.ForEachMaster,
// passing fn each node client wrapped in a Client via WrapClient. It returns
// whatever the underlying ForEachMaster returns.
func (c *ClusterClient) ForEachMaster(ctx context.Context, fn func(ctx context.Context, client *Client) error) error {
	adapt := func(ctx context.Context, node *redis.Client) error {
		return fn(ctx, WrapClient(node))
	}
	return c.ClusterClient.ForEachMaster(ctx, adapt)
}
// ForEachSlave delegates to the embedded redis.ClusterClient.ForEachSlave,
// passing fn each node client wrapped in a Client via WrapClient. It returns
// whatever the underlying ForEachSlave returns.
func (c *ClusterClient) ForEachSlave(ctx context.Context, fn func(ctx context.Context, client *Client) error) error {
	adapt := func(ctx context.Context, node *redis.Client) error {
		return fn(ctx, WrapClient(node))
	}
	return c.ClusterClient.ForEachSlave(ctx, adapt)
}
// NewUnifiedClient connects to addr with a single-node client, probes the
// server with IsCluster, and returns a UnifiedClient describing the result.
//
// Single is always set on success. Cluster is additionally set, using addr as
// the only seed address, when the probe reports cluster mode.
//
// If the probe fails, the single-node client is closed and the zero
// UnifiedClient is returned together with the probe error.
func NewUnifiedClient(ctx context.Context, addr string) (UnifiedClient, error) {
	client := NewClient(&redis.Options{Addr: addr})

	isCluster, err := client.IsCluster(ctx)
	if err != nil {
		// The client is not handed to the caller on this path, so release
		// it here. The probe error is the one worth reporting; a failure
		// to close is deliberately ignored.
		_ = client.Close()
		return UnifiedClient{}, err
	}

	result := UnifiedClient{
		Single:    client,
		IsCluster: isCluster,
	}
	if isCluster {
		result.Cluster = NewClusterClient(&redis.ClusterOptions{
			Addrs: []string{addr},
		})
	}
	return result, nil
}