Skip to content

Commit

Permalink
Merge pull request ODIM-Project#1227 from AngithCJ/odim-7383
Browse files Browse the repository at this point in the history
avoided creating new DB connection every time for Redis stream event messagebus
  • Loading branch information
amar-shalgar authored May 22, 2023
2 parents fb76db1 + 1075db5 commit f05027c
Show file tree
Hide file tree
Showing 2 changed files with 58 additions and 36 deletions.
4 changes: 4 additions & 0 deletions lib-messagebus/datacommunicator/header.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,10 @@ func Communicator(bt string, messageQueueConfigPath, pipe string) (MQBus, error)
rp = new(RedisStreamsPacket)
rp.BrokerType = bt
rp.pipe = pipe
err := rp.getDBConnection()
if err != nil {
return nil, err
}
return rp, nil
default:
return nil, fmt.Errorf("Broker: \"Broker Type\" is not supported - %s", bt)
Expand Down
90 changes: 54 additions & 36 deletions lib-messagebus/datacommunicator/redisstreamcomm.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ import (
uuid "github.com/satori/go.uuid"
)

var dbConn *redis.Client

const (
// DefaultTLSMinVersion is default minimum version for tls
DefaultTLSMinVersion = tls.VersionTLS12
Expand All @@ -33,16 +35,28 @@ const (
// RedisStreamsPacket defines the RedisStreamsPacket Message Packet Object. Apart from Base Packet, it
// will contain Redis Connection Object
type RedisStreamsPacket struct {
client *redis.Client
Packet
pipe string
}

func getDBConnection() (*redis.Client, error) {
var dbConn *redis.Client
func (rp *RedisStreamsPacket) getDBConnection() error {

// Assigning the existing the db connection to RedisStreamsPacket.
// This connection will be validated. If it is a bad connection,
// a new connection will be created and assigned to RedisStreamsPacket
rp.client = dbConn
if rp.Ping() {
return nil
}

// closing the existing connection as it is corrupted or nil.
// A new connection will be created and assigned to RedisStreamsPacket
rp.Close()

tlsConfig, e := TLS(MQ.RedisStreams.RedisCertFile, MQ.RedisStreams.RedisKeyFile, MQ.RedisStreams.RedisCAFile)
if e != nil {
return nil, fmt.Errorf("error while trying to get DB connection: %s", e.Error())
return fmt.Errorf("error while trying to get DB connection: %s", e.Error())
}

tlsConfig.MinVersion = DefaultTLSMinVersion
Expand All @@ -56,15 +70,26 @@ func getDBConnection() (*redis.Client, error) {
SentinelPassword: string(MQ.RedisStreams.RedisInMemoryPassword),
Password: string(MQ.RedisStreams.RedisInMemoryPassword),
})
} else {
dbConn = redis.NewClient(&redis.Options{
Addr: fmt.Sprintf("%s:%s", MQ.RedisStreams.RedisServerAddress, MQ.RedisStreams.RedisServerPort),
TLSConfig: tlsConfig,
Password: string(MQ.RedisStreams.RedisInMemoryPassword),
DB: 0, // use default DB
})
}
return dbConn, nil

dbConn = redis.NewClient(&redis.Options{
Addr: fmt.Sprintf("%s:%s", MQ.RedisStreams.RedisServerAddress, MQ.RedisStreams.RedisServerPort),
TLSConfig: tlsConfig,
Password: string(MQ.RedisStreams.RedisInMemoryPassword),
DB: 0, // use default DB
})

rp.client = dbConn
return nil
}

// Ping function is used to test the db connection with ping command
func (rp *RedisStreamsPacket) Ping() bool {
if rp.client != nil {
_, err := rp.client.Ping(rp.client.Context()).Result()
return err == nil
}
return false
}

// Distribute defines the Producer / Publisher role and functionality. Writer
Expand All @@ -78,23 +103,19 @@ func (rp *RedisStreamsPacket) Distribute(data interface{}) error {
if e != nil {
return fmt.Errorf("while trying to encode message: %s", e.Error())
}
redisClient, err := getDBConnection()
if err != nil {
return err
}
defer redisClient.Close()
_, rerr := redisClient.XAdd(ctx, &redis.XAddArgs{

_, rerr := rp.client.XAdd(ctx, &redis.XAddArgs{
Stream: rp.pipe,
Values: map[string]interface{}{"data": b},
}).Result()

if rerr != nil {
if strings.Contains(rerr.Error(), " connection timed out") {
redisClient, err = getDBConnection()
err := rp.getDBConnection()
if err != nil {
return err
}
redisClient.XAdd(ctx, &redis.XAddArgs{
rp.client.XAdd(ctx, &redis.XAddArgs{
Stream: rp.pipe,
Values: map[string]interface{}{"data": b},
}).Result()
Expand All @@ -107,17 +128,15 @@ func (rp *RedisStreamsPacket) Distribute(data interface{}) error {

// Accept implmentation need to be added
func (rp *RedisStreamsPacket) Accept(fn MsgProcess) error {
redisClient, err := getDBConnection()
if err != nil {
return err
}

// create a unique consumer id for the instance
var err error
var id = uuid.NewV4().String()
rerr := redisClient.XGroupCreateMkStream(context.Background(),
rerr := rp.client.XGroupCreateMkStream(context.Background(),
rp.pipe, EVENTREADERGROUPNAME, "$").Err()
if rerr != nil {
if strings.Contains(rerr.Error(), " connection timed out") {
redisClient, err = getDBConnection()
err := rp.getDBConnection()
if err != nil {
return err
}
Expand All @@ -134,7 +153,7 @@ func (rp *RedisStreamsPacket) Accept(fn MsgProcess) error {

go func() {
for {
events, err := redisClient.XReadGroup(context.Background(),
events, err := rp.client.XReadGroup(context.Background(),
&redis.XReadGroupArgs{
Group: EVENTREADERGROUPNAME,
Consumer: id,
Expand All @@ -144,7 +163,7 @@ func (rp *RedisStreamsPacket) Accept(fn MsgProcess) error {
if err != nil {
errChan <- fmt.Errorf("unable to get data from the group %s", err.Error())
if strings.Contains(err.Error(), " connection timed out") {
redisClient, err = getDBConnection()
err := rp.getDBConnection()
if err != nil {
errChan <- err
return
Expand All @@ -162,7 +181,7 @@ func (rp *RedisStreamsPacket) Accept(fn MsgProcess) error {
return
}
fn(evt)
redisClient.XAck(context.Background(), rp.pipe, EVENTREADERGROUPNAME, messageID)
rp.client.XAck(context.Background(), rp.pipe, EVENTREADERGROUPNAME, messageID)
}
}
}
Expand Down Expand Up @@ -194,17 +213,16 @@ func (rp *RedisStreamsPacket) Remove() error {

// Close implmentation need to be added
func (rp *RedisStreamsPacket) Close() error {
if rp.client != nil {
return rp.client.Close()
}
return nil
}

func (rp *RedisStreamsPacket) checkUnacknowledgedEvents(fn MsgProcess, id string, errChan chan<- error) {
redisClient, err := getDBConnection()
if err != nil {
errChan <- err
return
}

for {
events, _, err := redisClient.XAutoClaim(context.Background(), &redis.XAutoClaimArgs{
events, _, err := rp.client.XAutoClaim(context.Background(), &redis.XAutoClaimArgs{
Stream: rp.pipe,
Group: EVENTREADERGROUPNAME,
Consumer: id,
Expand All @@ -214,7 +232,7 @@ func (rp *RedisStreamsPacket) checkUnacknowledgedEvents(fn MsgProcess, id string
}).Result()
if err != nil {
if strings.Contains(err.Error(), " connection timed out") {
redisClient, err = getDBConnection()
err = rp.getDBConnection()
if err != nil {
errChan <- err
return
Expand All @@ -231,7 +249,7 @@ func (rp *RedisStreamsPacket) checkUnacknowledgedEvents(fn MsgProcess, id string
return
}
fn(evt)
redisClient.XAck(context.Background(), rp.pipe, EVENTREADERGROUPNAME, messageID)
rp.client.XAck(context.Background(), rp.pipe, EVENTREADERGROUPNAME, messageID)
}
// Pass the nil to errChan when no error encountered
errChan <- nil
Expand Down

0 comments on commit f05027c

Please sign in to comment.