Skip to content

Commit

Permalink
feat: add protocol option (#2598)
Browse files Browse the repository at this point in the history
  • Loading branch information
ljun20160606 authored May 16, 2023
1 parent 31ba855 commit 3917988
Show file tree
Hide file tree
Showing 11 changed files with 197 additions and 2 deletions.
2 changes: 2 additions & 0 deletions cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ type ClusterOptions struct {

OnConnect func(ctx context.Context, cn *Conn) error

Protocol int
Username string
Password string

Expand Down Expand Up @@ -263,6 +264,7 @@ func (opt *ClusterOptions) clientOptions() *Options {
Dialer: opt.Dialer,
OnConnect: opt.OnConnect,

Protocol: opt.Protocol,
Username: opt.Username,
Password: opt.Password,

Expand Down
38 changes: 38 additions & 0 deletions cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -583,6 +583,35 @@ var _ = Describe("ClusterClient", func() {
})
}

Describe("ClusterClient PROTO 2", func() {
BeforeEach(func() {
opt = redisClusterOptions()
opt.Protocol = 2
client = cluster.newClusterClient(ctx, opt)

err := client.ForEachMaster(ctx, func(ctx context.Context, master *redis.Client) error {
return master.FlushDB(ctx).Err()
})
Expect(err).NotTo(HaveOccurred())
})

AfterEach(func() {
_ = client.ForEachMaster(ctx, func(ctx context.Context, master *redis.Client) error {
return master.FlushDB(ctx).Err()
})
Expect(client.Close()).NotTo(HaveOccurred())
})

It("should CLUSTER PROTO 2", func() {
_ = client.ForEachShard(ctx, func(ctx context.Context, c *redis.Client) error {
val, err := c.Do(ctx, "HELLO").Result()
Expect(err).NotTo(HaveOccurred())
Expect(val).Should(ContainElements("proto", int64(2)))
return nil
})
})
})

Describe("ClusterClient", func() {
BeforeEach(func() {
opt = redisClusterOptions()
Expand Down Expand Up @@ -746,6 +775,15 @@ var _ = Describe("ClusterClient", func() {
})
})

It("should CLUSTER PROTO 3", func() {
_ = client.ForEachShard(ctx, func(ctx context.Context, c *redis.Client) error {
val, err := c.Do(ctx, "HELLO").Result()
Expect(err).NotTo(HaveOccurred())
Expect(val).Should(HaveKeyWithValue("proto", int64(3)))
return nil
})
})

It("should CLUSTER MYSHARDID", func() {
shardID, err := client.ClusterMyShardID(ctx).Result()
Expect(err).NotTo(HaveOccurred())
Expand Down
4 changes: 4 additions & 0 deletions options.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,9 @@ type Options struct {
// Hook that is called when new connection is established.
OnConnect func(ctx context.Context, cn *Conn) error

// Protocol 2 or 3. Use the version to negotiate RESP version with redis-server.
// Default is 3.
Protocol int
// Use the specified Username to authenticate the current connection
// with one of the connections defined in the ACL list when connecting
// to a Redis 6.0 instance, or greater, that is using the Redis ACL system.
Expand Down Expand Up @@ -437,6 +440,7 @@ func setupConnParams(u *url.URL, o *Options) (*Options, error) {
o.DB = db
}

o.Protocol = q.int("protocol")
o.ClientName = q.string("client_name")
o.MaxRetries = q.int("max_retries")
o.MinRetryBackoff = q.duration("min_retry_backoff")
Expand Down
3 changes: 3 additions & 0 deletions options_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,9 @@ func TestParseURL(t *testing.T) {
}, {
url: "redis://localhost:123/?db=2&client_name=hi", // client name
o: &Options{Addr: "localhost:123", DB: 2, ClientName: "hi"},
}, {
url: "redis://localhost:123/?db=2&protocol=2", // RESP Protocol
o: &Options{Addr: "localhost:123", DB: 2, Protocol: 2},
}, {
url: "unix:///tmp/redis.sock",
o: &Options{Addr: "/tmp/redis.sock"},
Expand Down
7 changes: 6 additions & 1 deletion redis.go
Original file line number Diff line number Diff line change
Expand Up @@ -279,10 +279,15 @@ func (c *baseClient) initConn(ctx context.Context, cn *pool.Conn) error {
conn := newConn(c.opt, connPool)

var auth bool
protocol := c.opt.Protocol
// By default, use RESP3 in current version.
if protocol < 2 {
protocol = 3
}

// for redis-server versions that do not support the HELLO command,
// RESP2 will continue to be used.
if err := conn.Hello(ctx, 3, username, password, "").Err(); err == nil {
if err := conn.Hello(ctx, protocol, username, password, "").Err(); err == nil {
auth = true
} else if !isRedisError(err) {
// When the server responds with the RESP protocol and the result is not a normal
Expand Down
27 changes: 27 additions & 0 deletions redis_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,33 @@ var _ = Describe("Client", func() {
Expect(val).Should(ContainSubstring("name=hi"))
})

It("should client PROTO 2", func() {
opt := redisOptions()
opt.Protocol = 2
db := redis.NewClient(opt)

defer func() {
Expect(db.Close()).NotTo(HaveOccurred())
}()

val, err := db.Do(ctx, "HELLO").Result()
Expect(err).NotTo(HaveOccurred())
Expect(val).Should(ContainElements("proto", int64(2)))
})

It("should client PROTO 3", func() {
opt := redisOptions()
db := redis.NewClient(opt)

defer func() {
Expect(db.Close()).NotTo(HaveOccurred())
}()

val, err := db.Do(ctx, "HELLO").Result()
Expect(err).NotTo(HaveOccurred())
Expect(val).Should(HaveKeyWithValue("proto", int64(3)))
})

It("processes custom commands", func() {
cmd := redis.NewCmd(ctx, "PING")
_ = client.Process(ctx, cmd)
Expand Down
4 changes: 3 additions & 1 deletion ring.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import (
"time"

"github.com/cespare/xxhash/v2"
rendezvous "github.com/dgryski/go-rendezvous" //nolint
"github.com/dgryski/go-rendezvous" //nolint

"github.com/redis/go-redis/v9/internal"
"github.com/redis/go-redis/v9/internal/hashtag"
Expand Down Expand Up @@ -70,6 +70,7 @@ type RingOptions struct {
Dialer func(ctx context.Context, network, addr string) (net.Conn, error)
OnConnect func(ctx context.Context, cn *Conn) error

Protocol int
Username string
Password string
DB int
Expand Down Expand Up @@ -136,6 +137,7 @@ func (opt *RingOptions) clientOptions() *Options {
Dialer: opt.Dialer,
OnConnect: opt.OnConnect,

Protocol: opt.Protocol,
Username: opt.Username,
Password: opt.Password,
DB: opt.DB,
Expand Down
40 changes: 40 additions & 0 deletions ring_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,37 @@ import (
"github.com/redis/go-redis/v9"
)

var _ = Describe("Redis Ring PROTO 2", func() {
const heartbeat = 100 * time.Millisecond

var ring *redis.Ring

BeforeEach(func() {
opt := redisRingOptions()
opt.Protocol = 2
opt.HeartbeatFrequency = heartbeat
ring = redis.NewRing(opt)

err := ring.ForEachShard(ctx, func(ctx context.Context, cl *redis.Client) error {
return cl.FlushDB(ctx).Err()
})
Expect(err).NotTo(HaveOccurred())
})

AfterEach(func() {
Expect(ring.Close()).NotTo(HaveOccurred())
})

It("should ring PROTO 2", func() {
_ = ring.ForEachShard(ctx, func(ctx context.Context, c *redis.Client) error {
val, err := c.Do(ctx, "HELLO").Result()
Expect(err).NotTo(HaveOccurred())
Expect(val).Should(ContainElements("proto", int64(2)))
return nil
})
})
})

var _ = Describe("Redis Ring", func() {
const heartbeat = 100 * time.Millisecond

Expand Down Expand Up @@ -65,6 +96,15 @@ var _ = Describe("Redis Ring", func() {
})
})

It("should ring PROTO 3", func() {
_ = ring.ForEachShard(ctx, func(ctx context.Context, c *redis.Client) error {
val, err := c.Do(ctx, "HELLO").Result()
Expect(err).NotTo(HaveOccurred())
Expect(val).Should(HaveKeyWithValue("proto", int64(3)))
return nil
})
})

It("distributes keys", func() {
setRingKeys()

Expand Down
3 changes: 3 additions & 0 deletions sentinel.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ type FailoverOptions struct {
Dialer func(ctx context.Context, network, addr string) (net.Conn, error)
OnConnect func(ctx context.Context, cn *Conn) error

Protocol int
Username string
Password string
DB int
Expand Down Expand Up @@ -88,6 +89,7 @@ func (opt *FailoverOptions) clientOptions() *Options {
OnConnect: opt.OnConnect,

DB: opt.DB,
Protocol: opt.Protocol,
Username: opt.Username,
Password: opt.Password,

Expand Down Expand Up @@ -151,6 +153,7 @@ func (opt *FailoverOptions) clusterOptions() *ClusterOptions {
Dialer: opt.Dialer,
OnConnect: opt.OnConnect,

Protocol: opt.Protocol,
Username: opt.Username,
Password: opt.Password,

Expand Down
67 changes: 67 additions & 0 deletions sentinel_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,30 @@ import (
"github.com/redis/go-redis/v9"
)

var _ = Describe("Sentinel PROTO 2", func() {
var client *redis.Client

BeforeEach(func() {
client = redis.NewFailoverClient(&redis.FailoverOptions{
MasterName: sentinelName,
SentinelAddrs: sentinelAddrs,
MaxRetries: -1,
Protocol: 2,
})
Expect(client.FlushDB(ctx).Err()).NotTo(HaveOccurred())
})

AfterEach(func() {
_ = client.Close()
})

It("should sentinel client PROTO 2", func() {
val, err := client.Do(ctx, "HELLO").Result()
Expect(err).NotTo(HaveOccurred())
Expect(val).Should(ContainElements("proto", int64(2)))
})
})

var _ = Describe("Sentinel", func() {
var client *redis.Client
var master *redis.Client
Expand Down Expand Up @@ -134,6 +158,40 @@ var _ = Describe("Sentinel", func() {
Expect(err).NotTo(HaveOccurred())
Expect(val).Should(ContainSubstring("name=sentinel_hi"))
})

It("should sentinel client PROTO 3", func() {
val, err := client.Do(ctx, "HELLO").Result()
Expect(err).NotTo(HaveOccurred())
Expect(val).Should(HaveKeyWithValue("proto", int64(3)))
})
})

var _ = Describe("NewFailoverClusterClient PROTO 2", func() {
var client *redis.ClusterClient

BeforeEach(func() {
client = redis.NewFailoverClusterClient(&redis.FailoverOptions{
MasterName: sentinelName,
SentinelAddrs: sentinelAddrs,
Protocol: 2,

RouteRandomly: true,
})
Expect(client.FlushDB(ctx).Err()).NotTo(HaveOccurred())
})

AfterEach(func() {
_ = client.Close()
})

It("should sentinel cluster PROTO 2", func() {
_ = client.ForEachShard(ctx, func(ctx context.Context, c *redis.Client) error {
val, err := client.Do(ctx, "HELLO").Result()
Expect(err).NotTo(HaveOccurred())
Expect(val).Should(ContainElements("proto", int64(2)))
return nil
})
})
})

var _ = Describe("NewFailoverClusterClient", func() {
Expand Down Expand Up @@ -237,6 +295,15 @@ var _ = Describe("NewFailoverClusterClient", func() {
return nil
})
})

It("should sentinel cluster PROTO 3", func() {
_ = client.ForEachShard(ctx, func(ctx context.Context, c *redis.Client) error {
val, err := client.Do(ctx, "HELLO").Result()
Expect(err).NotTo(HaveOccurred())
Expect(val).Should(HaveKeyWithValue("proto", int64(3)))
return nil
})
})
})

var _ = Describe("SentinelAclAuth", func() {
Expand Down
4 changes: 4 additions & 0 deletions universal.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ type UniversalOptions struct {
Dialer func(ctx context.Context, network, addr string) (net.Conn, error)
OnConnect func(ctx context.Context, cn *Conn) error

Protocol int
Username string
Password string
SentinelUsername string
Expand Down Expand Up @@ -77,6 +78,7 @@ func (o *UniversalOptions) Cluster() *ClusterOptions {
Dialer: o.Dialer,
OnConnect: o.OnConnect,

Protocol: o.Protocol,
Username: o.Username,
Password: o.Password,

Expand Down Expand Up @@ -122,6 +124,7 @@ func (o *UniversalOptions) Failover() *FailoverOptions {
OnConnect: o.OnConnect,

DB: o.DB,
Protocol: o.Protocol,
Username: o.Username,
Password: o.Password,
SentinelUsername: o.SentinelUsername,
Expand Down Expand Up @@ -162,6 +165,7 @@ func (o *UniversalOptions) Simple() *Options {
OnConnect: o.OnConnect,

DB: o.DB,
Protocol: o.Protocol,
Username: o.Username,
Password: o.Password,

Expand Down

0 comments on commit 3917988

Please sign in to comment.