Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Resolve code smell issues #2

Merged
merged 10 commits into from
Jun 7, 2023
62 changes: 39 additions & 23 deletions lib-messagebus/datacommunicator/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,37 +106,53 @@ func SetConfiguration(filePath string) error {
return fmt.Errorf("Configuration File - %v Read Error: %v", filePath, err)
}
if MQ.KafkaF != nil {
if len(MQ.KafkaF.KServersInfo) <= 0 {
return fmt.Errorf("no value found for KServersInfo in messagebus config file")
}
if MQ.KafkaF.KTimeout == 0 {
MQ.KafkaF.KTimeout = 10
}
if MQ.KafkaF.KAFKACertFile == "" {
return fmt.Errorf("no value found for KAFKACertFile in messagebus config file")
}
if MQ.KafkaF.KAFKAKeyFile == "" {
return fmt.Errorf("no value found for KAFKAKeyFile in messagebus config file")
}
if MQ.KafkaF.KAFKACAFile == "" {
return fmt.Errorf("no value found for KAFKACAFile in messagebus config file")
if err := checkKafkaFConfiguration(); err != nil {
return err
}
}
if MQ.RedisStreams != nil {
var err error
if MQ.RedisStreams.RedisInMemoryEncryptedPassword == "" {
return fmt.Errorf("error: no value configured for Redis In memory Encrypted Password")
}
if MQ.RedisStreams.RSAPrivateKey, err = ioutil.ReadFile(MQ.RedisStreams.RSAPrivateKeyPath); err != nil {
return fmt.Errorf("error: value check failed for RSAPrivateKeyPath:%s with %v", MQ.RedisStreams.RSAPrivateKeyPath, err)
}
if MQ.RedisStreams.RedisInMemoryPassword, err = decryptRSAOAEPEncryptedPasswords(MQ.RedisStreams.RedisInMemoryEncryptedPassword); err != nil {
return fmt.Errorf("error: while decrypting In Memory DB password: %v", err)
if err := checkRedisStreamsConfiguration(); err != nil {
return err
}
}
return nil
}

// checkKafkaFConfiguration checks the validity of KafkaPacket object fields
func checkKafkaFConfiguration() error {
if len(MQ.KafkaF.KServersInfo) <= 0 {
return fmt.Errorf("no value found for KServersInfo in messagebus config file")
}
if MQ.KafkaF.KTimeout == 0 {
MQ.KafkaF.KTimeout = 10
}
if MQ.KafkaF.KAFKACertFile == "" {
return fmt.Errorf("no value found for KAFKACertFile in messagebus config file")
}
if MQ.KafkaF.KAFKAKeyFile == "" {
return fmt.Errorf("no value found for KAFKAKeyFile in messagebus config file")
}
if MQ.KafkaF.KAFKACAFile == "" {
return fmt.Errorf("no value found for KAFKACAFile in messagebus config file")
}
return nil
}

// checkRedisStreamsConfiguration checks the validity of RedisStream object fields
func checkRedisStreamsConfiguration() error {
var err error
if MQ.RedisStreams.RedisInMemoryEncryptedPassword == "" {
return fmt.Errorf("error: no value configured for Redis In memory Encrypted Password")
}
if MQ.RedisStreams.RSAPrivateKey, err = ioutil.ReadFile(MQ.RedisStreams.RSAPrivateKeyPath); err != nil {
return fmt.Errorf("error: value check failed for RSAPrivateKeyPath:%s with %v", MQ.RedisStreams.RSAPrivateKeyPath, err)
}
if MQ.RedisStreams.RedisInMemoryPassword, err = decryptRSAOAEPEncryptedPasswords(MQ.RedisStreams.RedisInMemoryEncryptedPassword); err != nil {
return fmt.Errorf("error: while decrypting In Memory DB password: %v", err)
}
return nil
}

func decryptRSAOAEPEncryptedPasswords(encryptedPassword string) ([]byte, error) {
decoded, err := base64.StdEncoding.DecodeString(encryptedPassword)
if err != nil {
Expand Down
18 changes: 9 additions & 9 deletions lib-messagebus/datacommunicator/header_test.go
Original file line number Diff line number Diff line change
@@ -1,15 +1,15 @@
//(C) Copyright [2020] Hewlett Packard Enterprise Development LP
// (C) Copyright [2020] Hewlett Packard Enterprise Development LP
//
//Licensed under the Apache License, Version 2.0 (the "License"); you may
//not use this file except in compliance with the License. You may obtain
//a copy of the License at
// Licensed under the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License. You may obtain
// a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
// http://www.apache.org/licenses/LICENSE-2.0
//
//Unless required by applicable law or agreed to in writing, software
//distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
//WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
//License for the specific language governing permissions and limitations
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
// License for the specific language governing permissions and limitations
// under the License.
package datacommunicator

Expand Down
28 changes: 14 additions & 14 deletions lib-messagebus/datacommunicator/kafkacomm_test.go
Original file line number Diff line number Diff line change
@@ -1,15 +1,15 @@
//(C) Copyright [2020] Hewlett Packard Enterprise Development LP
// (C) Copyright [2020] Hewlett Packard Enterprise Development LP
//
//Licensed under the Apache License, Version 2.0 (the "License"); you may
//not use this file except in compliance with the License. You may obtain
//a copy of the License at
// Licensed under the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License. You may obtain
// a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
// http://www.apache.org/licenses/LICENSE-2.0
//
//Unless required by applicable law or agreed to in writing, software
//distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
//WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
//License for the specific language governing permissions and limitations
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
// License for the specific language governing permissions and limitations
// under the License.
package datacommunicator

Expand Down Expand Up @@ -69,7 +69,7 @@ func TestKafkaConnect(t *testing.T) {
}
}

func TestKafkaPacket_Distribute(t *testing.T) {
func TestKafkaPacketDistribute(t *testing.T) {
type fields struct {
Packet Packet
DialerConn *kafka.Dialer
Expand Down Expand Up @@ -97,7 +97,7 @@ func TestKafkaPacket_Distribute(t *testing.T) {
}
}

func TestKafkaPacket_Accept(t *testing.T) {
func TestKafkaPacketAccept(t *testing.T) {
type fields struct {
Packet Packet
DialerConn *kafka.Dialer
Expand Down Expand Up @@ -128,7 +128,7 @@ func TestKafkaPacket_Accept(t *testing.T) {
}
}

func TestKafkaPacket_Read(t *testing.T) {
func TestKafkaPacketRead(t *testing.T) {
type fields struct {
Packet Packet
DialerConn *kafka.Dialer
Expand Down Expand Up @@ -159,7 +159,7 @@ func TestKafkaPacket_Read(t *testing.T) {
}
}

func TestKafkaPacket_Get(t *testing.T) {
func TestKafkaPacketGet(t *testing.T) {
type fields struct {
Packet Packet
DialerConn *kafka.Dialer
Expand Down Expand Up @@ -191,7 +191,7 @@ func TestKafkaPacket_Get(t *testing.T) {
}
}

func TestKafkaPacket_Close(t *testing.T) {
func TestKafkaPacketClose(t *testing.T) {
type fields struct {
Packet Packet
DialerConn *kafka.Dialer
Expand Down
63 changes: 30 additions & 33 deletions lib-messagebus/datacommunicator/redisstreamcomm.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ var dbConn *redis.Client
const (
// DefaultTLSMinVersion is default minimum version for tls
DefaultTLSMinVersion = tls.VersionTLS12
// TimeoutErrMsg is the connection time out error message
TimeoutErrMsg string = " connection timed out"
)

// RedisStreamsPacket defines the RedisStreamsPacket Message Packet Object. Apart from Base Packet, it
Expand Down Expand Up @@ -110,7 +112,7 @@ func (rp *RedisStreamsPacket) Distribute(data interface{}) error {
}).Result()

if rerr != nil {
if strings.Contains(rerr.Error(), " connection timed out") {
if strings.Contains(rerr.Error(), TimeoutErrMsg) {
err := rp.getDBConnection()
if err != nil {
return err
Expand All @@ -134,20 +136,15 @@ func (rp *RedisStreamsPacket) Accept(fn MsgProcess) error {
var id = uuid.NewV4().String()
rerr := rp.client.XGroupCreateMkStream(context.Background(),
rp.pipe, EVENTREADERGROUPNAME, "$").Err()
if rerr != nil {
if strings.Contains(rerr.Error(), " connection timed out") {
err := rp.getDBConnection()
if err != nil {
return err
}
if rerr != nil && strings.Contains(rerr.Error(), TimeoutErrMsg) {
if err := rp.getDBConnection(); err != nil {
return err
}

}
// errChan to hold the errors faced in the below go-rotines
errChan := make(chan error)
go rp.checkUnacknowledgedEvents(fn, id, errChan)
err = <-errChan
if err != nil {
if err = <-errChan; err != nil {
return err
}

Expand All @@ -162,39 +159,42 @@ func (rp *RedisStreamsPacket) Accept(fn MsgProcess) error {
}).Result()
if err != nil {
errChan <- fmt.Errorf("unable to get data from the group %s", err.Error())
if strings.Contains(err.Error(), " connection timed out") {
if strings.Contains(err.Error(), TimeoutErrMsg) {
err := rp.getDBConnection()
if err != nil {
errChan <- err
return
}
}
} else {

if len(events) > 0 && len(events[0].Messages) > 0 {
messageID := events[0].Messages[0].ID
evtStr := events[0].Messages[0].Values["data"].(string)
var evt interface{}
err := Decode([]byte(evtStr), &evt)
if err != nil {
errChan <- err
return
}
fn(evt)
rp.client.XAck(context.Background(), rp.pipe, EVENTREADERGROUPNAME, messageID)
}
processEvent(rp, events, errChan, fn)
}
}
}()

// channel to handle the errors occured during go routines
err = <-errChan
if err != nil {
if err = <-errChan; err != nil {
return err
}
return nil
}

// processEvent processes the redis stream events and decode the data
func processEvent(rp *RedisStreamsPacket, events []redis.XStream, errChan chan<- error, fn MsgProcess) {
if len(events) > 0 && len(events[0].Messages) > 0 {
messageID := events[0].Messages[0].ID
evtStr := events[0].Messages[0].Values["data"].(string)
var evt interface{}
err := Decode([]byte(evtStr), &evt)
if err != nil {
errChan <- err
return
}
fn(evt)
rp.client.XAck(context.Background(), rp.pipe, EVENTREADERGROUPNAME, messageID)
}
}

// Read implmentation need to be added
func (rp *RedisStreamsPacket) Read(fn MsgProcess) error {
return nil
Expand Down Expand Up @@ -230,13 +230,10 @@ func (rp *RedisStreamsPacket) checkUnacknowledgedEvents(fn MsgProcess, id string
Count: 100,
Start: "0-0",
}).Result()
if err != nil {
if strings.Contains(err.Error(), " connection timed out") {
err = rp.getDBConnection()
if err != nil {
errChan <- err
return
}
if err != nil && strings.Contains(err.Error(), TimeoutErrMsg) {
if err = rp.getDBConnection(); err != nil {
errChan <- err
return
}
}
for _, event := range events {
Expand Down
2 changes: 1 addition & 1 deletion lib-persistence-manager/persistencemgr/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
//License for the specific language governing permissions and limitations
// under the License.

//Package persistencemgr provides an interfaces for database communication
// Package persistencemgr provides an interfaces for database communication
package persistencemgr

import (
Expand Down
Loading