Skip to content

Commit

Permalink
fix: redisPubSub ping
Browse files Browse the repository at this point in the history
Signed-off-by: XinTong Zhou <[email protected]>
  • Loading branch information
retoool committed Mar 26, 2024
1 parent 2e50db1 commit 9bd571a
Show file tree
Hide file tree
Showing 4 changed files with 33 additions and 21 deletions.
24 changes: 18 additions & 6 deletions internal/io/redis/pubsub/redisPub.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2023-2023 [email protected]
// Copyright 2023-2024 [email protected]
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
Expand All @@ -15,6 +15,7 @@
package pubsub

import (
"context"
"fmt"

"github.com/redis/go-redis/v9"
Expand Down Expand Up @@ -67,6 +68,22 @@ func (r *redisPub) Validate(props map[string]interface{}) error {
return nil
}

func (r *redisPub) Ping(_ string, props map[string]interface{}) error {
if err := r.Configure(props); err != nil {
return err
}
r.conn = redis.NewClient(&redis.Options{
Addr: r.conf.Address,
Username: r.conf.Username,
Password: r.conf.Password,
DB: r.conf.Db,
})
if err := r.conn.Ping(context.Background()).Err(); err != nil {
return fmt.Errorf("Ping Redis failed with error: %v", err)
}
return nil
}

func (r *redisPub) Configure(props map[string]interface{}) error {
return r.Validate(props)
}
Expand All @@ -79,11 +96,6 @@ func (r *redisPub) Open(ctx api.StreamContext) error {
Password: r.conf.Password,
DB: r.conf.Db,
})
// Ping Redis to check if the connection is alive
err := r.conn.Ping(ctx).Err()
if err != nil {
return fmt.Errorf("Ping Redis failed with error: %v", err)
}
return nil
}

Expand Down
10 changes: 3 additions & 7 deletions internal/io/redis/pubsub/redisPub_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"github.com/alicebob/miniredis/v2"
"github.com/stretchr/testify/require"

"github.com/lf-edge/ekuiper/internal/pkg/util"
"github.com/lf-edge/ekuiper/pkg/errorx"
"github.com/lf-edge/ekuiper/pkg/mock"
mockContext "github.com/lf-edge/ekuiper/pkg/mock/context"
Expand Down Expand Up @@ -170,19 +171,14 @@ func TestSinkDecompressorError(t *testing.T) {
}

func TestSinkPingRedisError(t *testing.T) {
s := RedisPub()
s := RedisPub().(util.PingableConn)
prop := map[string]interface{}{
"address": "127.0.0.1:6379",
"db": 0,
"channel": DefaultChannel,
}
expErrStr := fmt.Sprintf("Ping Redis failed with error")
err := s.Configure(prop)
if err != nil {
t.Error(err)
}
ctx := mockContext.NewMockContext("ruleSink", "op1")
err = s.Open(ctx)
err := s.Ping("", prop)
if err == nil {
t.Errorf("should have error")
return
Expand Down
15 changes: 9 additions & 6 deletions internal/io/redis/pubsub/redisSub.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2023-2023 [email protected]
// Copyright 2023-2024 [email protected]
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -72,18 +72,21 @@ func (r *redisSub) Validate(props map[string]interface{}) error {
return nil
}

func (r *redisSub) Configure(_ string, props map[string]interface{}) error {
if err := r.Validate(props); err != nil {
func (r *redisSub) Ping(dataSource string, props map[string]interface{}) error {
if err := r.Configure(dataSource, props); err != nil {
return err

}
// Ping Redis to check if the connection is alive
err := r.conn.Ping(context.Background()).Err()
if err != nil {
if err := r.conn.Ping(context.Background()).Err(); err != nil {
return fmt.Errorf("Ping Redis failed with error: %v", err)
}
return nil
}

func (r *redisSub) Configure(_ string, props map[string]interface{}) error {
return r.Validate(props)
}

func (r *redisSub) Open(ctx api.StreamContext, consumer chan<- api.SourceTuple, errCh chan<- error) {
logger := ctx.GetLogger()
logger.Infof("redisSub sink Opening")
Expand Down
5 changes: 3 additions & 2 deletions internal/io/redis/pubsub/redisSub_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"github.com/stretchr/testify/require"
_ "go.nanomsg.org/mangos/v3/transport/ipc"

"github.com/lf-edge/ekuiper/internal/pkg/util"
"github.com/lf-edge/ekuiper/pkg/api"
mockContext "github.com/lf-edge/ekuiper/pkg/mock/context"
)
Expand Down Expand Up @@ -104,14 +105,14 @@ func TestSourceDecompressorError(t *testing.T) {
}

func TestSourcePingRedisError(t *testing.T) {
s := RedisSub()
s := RedisSub().(util.PingableConn)
prop := map[string]interface{}{
"address": "",
"db": 0,
"channels": []string{DefaultChannel},
}
expErrStr := fmt.Sprintf("Ping Redis failed with error")
err := s.Configure("new", prop)
err := s.Ping("new", prop)
if err == nil {
t.Errorf("should have error")
return
Expand Down

0 comments on commit 9bd571a

Please sign in to comment.