From f137af06ee12e6164406f8f59045342fa389ccf5 Mon Sep 17 00:00:00 2001 From: Md Soharab Ansari Date: Tue, 24 Sep 2024 20:06:51 +0530 Subject: [PATCH 1/6] Redis-backed queues are no longer listened to once a list is empty. * LPop does not wait for new messages. * Use BLPop which is blocking version for LPop and waits for new messages. Signed-off-by: Md Soharab Ansari --- redis-http-connector/main.go | 38 ++++++++++++++++++------------------ 1 file changed, 19 insertions(+), 19 deletions(-) diff --git a/redis-http-connector/main.go b/redis-http-connector/main.go index e587393..13a13db 100644 --- a/redis-http-connector/main.go +++ b/redis-http-connector/main.go @@ -29,34 +29,34 @@ func (conn redisConnector) consumeMessage(ctx context.Context) { "KEDA-Source-Name": {conn.connectordata.SourceName}, } - var messages []string - var listItr int64 forever := make(chan bool) go func() { - listLength, err := conn.rdbConnection.LLen(ctx, conn.connectordata.Topic).Result() - if err != nil { - conn.logger.Fatal("Error in consuming queue: ", zap.Error(err)) - } - for listItr = 0; listItr < listLength; listItr++ { - msg, err := conn.rdbConnection.LPop(ctx, conn.connectordata.Topic).Result() + for { + // BLPop will block and wait for a new message if the list is empty + msg, err := conn.rdbConnection.BLPop(ctx, 0, conn.connectordata.Topic).Result() if err != nil { conn.logger.Fatal("Error in consuming queue: ", zap.Error((err))) } - messages = append(messages, msg) - } - for _, message := range messages { - response, err := common.HandleHTTPRequest(message, headers, conn.connectordata, conn.logger) - if err != nil { - conn.errorHandler(ctx, err) - } else { - defer response.Body.Close() - body, err := io.ReadAll(response.Body) + + if len(msg) > 1 { + // BLPop returns a slice with topic and message, we need the second item + message := msg[1] + response, err := common.HandleHTTPRequest(message, headers, conn.connectordata, conn.logger) if err != nil { conn.errorHandler(ctx, err) } else { - if success := conn.responseHandler(ctx, string(body)); success { - conn.logger.Info("Message sending to response successful") + body, err := io.ReadAll(response.Body) + if err != nil { + conn.errorHandler(ctx, err) + } else { + if success := conn.responseHandler(ctx, string(body)); success { + conn.logger.Info("Message sending to response successful") + } + } + err = response.Body.Close() + if err != nil { + conn.logger.Error(err.Error()) } } } From 1735cb6b27f8afe349bf8f013c730f42991868d6 Mon Sep 17 00:00:00 2001 From: Md Soharab Ansari Date: Tue, 24 Sep 2024 20:09:49 +0530 Subject: [PATCH 2/6] Bump minor version for redis-http-connector Signed-off-by: Md Soharab Ansari --- redis-http-connector/version | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/redis-http-connector/version b/redis-http-connector/version index 03776fb..86de203 100644 --- a/redis-http-connector/version +++ b/redis-http-connector/version @@ -1 +1 @@ -v0.7 +v0.8 From badedbf741ce2fb47c40913bc8dcc8c7864ef553 Mon Sep 17 00:00:00 2001 From: Md Soharab Ansari Date: Wed, 25 Sep 2024 20:06:58 +0530 Subject: [PATCH 3/6] Use exit signal handler to terminate the process cleanly Signed-off-by: Md Soharab Ansari --- redis-http-connector/main.go | 18 ++++++++++++++---- 1 file changed, 14 insertions(+), 4 deletions(-) diff --git a/redis-http-connector/main.go b/redis-http-connector/main.go index 13a13db..450b0d5 100644 --- a/redis-http-connector/main.go +++ b/redis-http-connector/main.go @@ -6,6 +6,8 @@ import ( "log" "net/http" "os" + "os/signal" + "syscall" "github.com/go-redis/redis/v8" "go.uber.org/zap" @@ -19,7 +21,7 @@ type redisConnector struct { logger *zap.Logger } -func (conn redisConnector) consumeMessage(ctx context.Context) { +func (conn redisConnector) consumeMessage(sigterm chan os.Signal) { headers := http.Header{ "KEDA-Topic": {conn.connectordata.Topic}, @@ -29,6 +31,7 @@ func (conn redisConnector) consumeMessage(ctx context.Context) { "KEDA-Source-Name": {conn.connectordata.SourceName}, } + var ctx = context.Background() forever := make(chan bool) go func() { @@ -36,7 +39,8 @@ func (conn redisConnector) consumeMessage(ctx context.Context) { // BLPop will block and wait for a new message if the list is empty msg, err := conn.rdbConnection.BLPop(ctx, 0, conn.connectordata.Topic).Result() if err != nil { - conn.logger.Fatal("Error in consuming queue: ", zap.Error((err))) + conn.logger.Error("Error in consuming queue: ", zap.Error((err))) + forever <- false } if len(msg) > 1 { @@ -57,6 +61,7 @@ func (conn redisConnector) consumeMessage(ctx context.Context) { err = response.Body.Close() if err != nil { conn.logger.Error(err.Error()) + forever <- false } } } @@ -64,6 +69,7 @@ func (conn redisConnector) consumeMessage(ctx context.Context) { }() conn.logger.Info("Redis consumer up and running!") <-forever + sigterm <- syscall.SIGTERM } func (conn redisConnector) errorHandler(ctx context.Context, err error) { @@ -118,16 +124,20 @@ func main() { } password := os.Getenv("PASSWORD_FROM_ENV") - var ctx = context.Background() rdb := redis.NewClient(&redis.Options{ Addr: address, Password: password, }) + sigterm := make(chan os.Signal, 1) conn := redisConnector{ rdbConnection: rdb, connectordata: connectordata, logger: logger, } - conn.consumeMessage(ctx) + conn.consumeMessage(sigterm) + + signal.Notify(sigterm, syscall.SIGINT, syscall.SIGTERM) + <-sigterm + logger.Info("Terminating: Redis consumer") } From 66edffb9334aae63bdbef8a602bb4927a1830ec5 Mon Sep 17 00:00:00 2001 From: Md Soharab Ansari Date: Thu, 26 Sep 2024 10:55:06 +0530 Subject: [PATCH 4/6] Upgrade go version to 1.23.1. Fix redis connector issue. Signed-off-by: Md Soharab Ansari --- common/util.go | 3 +- go.mod | 23 ++++---- go.sum | 67 +++++++++++++--------- redis-http-connector/main.go | 106 +++++++++++++++++++++-------------- 4 files changed, 117 insertions(+), 82 deletions(-) diff --git a/common/util.go b/common/util.go index 937732f..cc4fc0e 100644 --- a/common/util.go +++ b/common/util.go @@ -9,9 +9,8 @@ import ( "strconv" "strings" - "github.com/aws/aws-sdk-go/aws/credentials" - "github.com/aws/aws-sdk-go/aws" + "github.com/aws/aws-sdk-go/aws/credentials" "github.com/pkg/errors" "go.uber.org/zap" ) diff --git a/go.mod b/go.mod index db1a9f9..b368514 100644 --- a/go.mod +++ b/go.mod @@ -1,6 +1,6 @@ module github.com/fission/keda-connectors -go 1.22 +go 1.23.1 require ( cloud.google.com/go/pubsub v1.39.0 @@ -15,6 +15,7 @@ require ( github.com/xdg/scram v1.0.5 go.uber.org/zap v1.27.0 google.golang.org/api v0.185.0 + sigs.k8s.io/controller-runtime v0.19.0 ) require ( @@ -23,14 +24,14 @@ require ( cloud.google.com/go/auth/oauth2adapt v0.2.2 // indirect cloud.google.com/go/compute/metadata v0.3.0 // indirect cloud.google.com/go/iam v1.1.8 // indirect - github.com/cespare/xxhash/v2 v2.2.0 // indirect - github.com/davecgh/go-spew v1.1.1 // indirect + github.com/cespare/xxhash/v2 v2.3.0 // indirect + github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect github.com/eapache/go-resiliency v1.6.0 // indirect github.com/eapache/go-xerial-snappy v0.0.0-20230731223053-c322873962e3 // indirect github.com/eapache/queue v1.1.0 // indirect github.com/felixge/httpsnoop v1.0.4 // indirect - github.com/go-logr/logr v1.4.1 // indirect + github.com/go-logr/logr v1.4.2 // indirect github.com/go-logr/stdr v1.2.2 // indirect github.com/gogo/protobuf v1.3.2 // indirect github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect @@ -58,11 +59,11 @@ require ( github.com/xdg/stringprep v1.0.3 // indirect go.opencensus.io v0.24.0 // indirect go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.49.0 // indirect - go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.49.0 // indirect - go.opentelemetry.io/otel v1.24.0 // indirect - go.opentelemetry.io/otel/metric v1.24.0 // indirect - go.opentelemetry.io/otel/trace v1.24.0 // indirect - go.uber.org/multierr v1.10.0 // indirect + go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.53.0 // indirect + go.opentelemetry.io/otel v1.28.0 // indirect + go.opentelemetry.io/otel/metric v1.28.0 // indirect + go.opentelemetry.io/otel/trace v1.28.0 // indirect + go.uber.org/multierr v1.11.0 // indirect golang.org/x/crypto v0.24.0 // indirect golang.org/x/net v0.26.0 // indirect golang.org/x/oauth2 v0.21.0 // indirect @@ -72,7 +73,7 @@ require ( golang.org/x/time v0.5.0 // indirect google.golang.org/genproto v0.0.0-20240617180043-68d350f18fd4 // indirect google.golang.org/genproto/googleapis/api v0.0.0-20240610135401-a8a62080eff3 // indirect - google.golang.org/genproto/googleapis/rpc v0.0.0-20240617180043-68d350f18fd4 // indirect - google.golang.org/grpc v1.64.1 // indirect + google.golang.org/genproto/googleapis/rpc v0.0.0-20240701130421-f6361c86f094 // indirect + google.golang.org/grpc v1.65.0 // indirect google.golang.org/protobuf v1.34.2 // indirect ) diff --git a/go.sum b/go.sum index bee8dbf..8cc2064 100644 --- a/go.sum +++ b/go.sum @@ -23,13 +23,14 @@ github.com/armon/go-metrics v0.4.1/go.mod h1:E6amYzXo6aW1tqzoZGT755KkbgrJsSdpwZ+ github.com/aws/aws-sdk-go v1.54.4 h1:xZga3fPu7uxVgh83DIaQlb7r0cixFx1xKiiROTWAhpU= github.com/aws/aws-sdk-go v1.54.4/go.mod h1:eRwEWoyTWFMVYVQzKMNHWP5/RV4xIUGMQfXQHfHkpNU= github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU= -github.com/cespare/xxhash/v2 v2.2.0 h1:DC2CZ1Ep5Y4k3ZQ899DldepgrayRUGE6BBZ/cd9Cj44= -github.com/cespare/xxhash/v2 v2.2.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= +github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs= +github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw= github.com/cncf/udpa/go v0.0.0-20191209042840-269d4d468f6f/go.mod h1:M8M6+tZqaGXZJjfX53e64911xZQV5JYwmTeXPW+k8Sc= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= -github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc h1:U9qPSI2PIWSS1VwoXQT9A3Wy9MM3WgvqSxFWenqJduM= +github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f h1:lO4WD4F/rVNCu3HqELle0jiPLLBs70cWOduZpkS1E78= github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f/go.mod h1:cuUVRXasLTGF7a8hSLbxyZXjz+1KgoB3wDUb6vlszIc= github.com/eapache/go-resiliency v1.6.0 h1:CqGDTLtpwuWKn6Nj3uNUdflaq+/kIPsg0gfNzHton30= @@ -48,15 +49,17 @@ github.com/felixge/httpsnoop v1.0.4 h1:NFTV2Zj1bL4mc9sqWACXbQFVBBg2W3GPvqp8/ESS2 github.com/felixge/httpsnoop v1.0.4/go.mod h1:m8KPJKqk1gH5J9DgRY2ASl2lWCfGKXixSwevea8zH2U= github.com/fortytw2/leaktest v1.3.0 h1:u8491cBMTQ8ft8aeV+adlcytMZylmA5nnwwkRZjI8vw= github.com/fortytw2/leaktest v1.3.0/go.mod h1:jDsjWgpAGjm2CA7WthBh/CdZYEPF31XHquHwclZch5g= -github.com/fsnotify/fsnotify v1.4.9 h1:hsms1Qyu0jgnwNXIxa+/V/PDsU6CfLf6CNO8H7IWoS4= -github.com/fsnotify/fsnotify v1.4.9/go.mod h1:znqG4EE+3YCdAaPaxE2ZRY/06pZUdp0tY4IgpuI1SZQ= +github.com/fsnotify/fsnotify v1.7.0 h1:8JEhPFa5W2WU7YfeZzPNqzMP6Lwt7L2715Ggo0nosvA= +github.com/fsnotify/fsnotify v1.7.0/go.mod h1:40Bi/Hjc2AVfZrqy+aj+yEI+/bRxZnMJyTJwOpGvigM= github.com/go-logr/logr v1.2.2/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A= -github.com/go-logr/logr v1.4.1 h1:pKouT5E8xu9zeFC39JXRDukb6JFQPXM5p5I91188VAQ= -github.com/go-logr/logr v1.4.1/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY= +github.com/go-logr/logr v1.4.2 h1:6pFjapn8bFcIbiKo3XT4j/BhANplGihG6tvd+8rYgrY= +github.com/go-logr/logr v1.4.2/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY= github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag= github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE= github.com/go-redis/redis/v8 v8.11.5 h1:AcZZR7igkdvfVmQTPnu9WE37LRrO/YrBH5zWyjDC0oI= github.com/go-redis/redis/v8 v8.11.5/go.mod h1:gREzHqY1hg6oD9ngVRbLStwAWKhA0FEgq8Jd4h5lpwo= +github.com/go-task/slim-sprig/v3 v3.0.0 h1:sUs3vkvUymDpBKi3qH1YSqBQk9+9D/8M2mN1vB6EwHI= +github.com/go-task/slim-sprig/v3 v3.0.0/go.mod h1:W848ghGpv3Qj3dhTPRyJypKRiqCdHZiAzKg9hl15HA8= github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q= github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q= github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q= @@ -85,9 +88,13 @@ github.com/google/go-cmp v0.5.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/ github.com/google/go-cmp v0.5.3/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI= github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= +github.com/google/pprof v0.0.0-20240525223248-4bfdf5a9a2af h1:kmjWCqn2qkEml422C2Rrd27c3VGxi6a/6HNq8QmHRKM= +github.com/google/pprof v0.0.0-20240525223248-4bfdf5a9a2af/go.mod h1:K1liHPHnj73Fdn/EKuT8nrFqBihUSKXoLYU0BuatOYo= github.com/google/s2a-go v0.1.7 h1:60BLSyTrOV4/haCDW4zb1guZItoSq8foHCXrAnjBo/o= github.com/google/s2a-go v0.1.7/go.mod h1:50CgR4k1jNlWBu4UfS4AcfhVe1r6pdZPygJ3R8F0Qdw= github.com/google/uuid v1.1.2/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= +github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/googleapis/enterprise-certificate-proxy v0.3.2 h1:Vie5ybvEvT75RniqhfFxPRy3Bf7vr3h0cechB90XaQs= github.com/googleapis/enterprise-certificate-proxy v0.3.2/go.mod h1:VLSiSSBs/ksPL8kq3OBOQ6WRI2QnaFynd1DCjZ62+V0= github.com/googleapis/gax-go/v2 v2.12.4 h1:9gWcmF85Wvq4ryPFvGFaOgPIs1AQX0d0bcbGw4Z96qg= @@ -157,8 +164,10 @@ github.com/nxadm/tail v1.4.8 h1:nPr65rt6Y5JFSKQO7qToXr7pePgD6Gwiw05lkbyAQTE= github.com/nxadm/tail v1.4.8/go.mod h1:+ncqLTQzXmGhMZNUePPaPqPvBxHAIsmXswZKocGu+AU= github.com/onsi/ginkgo v1.16.5 h1:8xi0RTUf59SOSfEtZMvwTvXYMzG4gV23XVHOZiXNtnE= github.com/onsi/ginkgo v1.16.5/go.mod h1:+E8gABHa3K6zRBolWtd+ROzc/U5bkGt0FwiG042wbpU= -github.com/onsi/gomega v1.18.1 h1:M1GfJqGRrBrrGGsbxzV5dqM2U2ApXefZCQpkukxYRLE= -github.com/onsi/gomega v1.18.1/go.mod h1:0q+aL8jAiMXy9hbwj2mr5GziHiwhAIQpFmmtT5hitRs= +github.com/onsi/ginkgo/v2 v2.19.0 h1:9Cnnf7UHo57Hy3k6/m5k3dRfGTMXGvxhHFvkDTCTpvA= +github.com/onsi/ginkgo/v2 v2.19.0/go.mod h1:rlwLi9PilAFJ8jCg9UE1QP6VBpd6/xj3SRC0d6TU0To= +github.com/onsi/gomega v1.33.1 h1:dsYjIxxSR755MDmKVsaFQTE22ChNBcuuTWgkUDSubOk= +github.com/onsi/gomega v1.33.1/go.mod h1:U4R44UsT+9eLIaYRB2a5qajjtQYn0hauxvRm16AVYg0= github.com/pierrec/lz4/v4 v4.1.21 h1:yOVMLb6qSIDP67pl/5F7RepeKYu/VmTyEXvuMI5d9mQ= github.com/pierrec/lz4/v4 v4.1.21/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4= github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= @@ -166,8 +175,8 @@ github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINE github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA= -github.com/prometheus/procfs v0.12.0 h1:jluTpSng7V9hY0O2R9DzzJHYb2xULk9VTR1V1R/k6Bo= -github.com/prometheus/procfs v0.12.0/go.mod h1:pcuDEFsWDnvcgNzo4EEweacyhjeA9Zk3cnaOZAZEfOo= +github.com/prometheus/procfs v0.15.1 h1:YagwOFzUgYfKKHX6Dr+sHT7km/hxC76UB0learggepc= +github.com/prometheus/procfs v0.15.1/go.mod h1:fB45yRUv8NstnjriLhBQLuOUt+WW4BsoGhij/e3PBqk= github.com/rabbitmq/amqp091-go v1.10.0 h1:STpn5XsHlHGcecLmMFCtg7mqq0RnD+zFr4uzukfVhBw= github.com/rabbitmq/amqp091-go v1.10.0/go.mod h1:Hy4jKW5kQART1u+JkDTF9YYOQUHXqMuhrgxOEeS7G4o= github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475 h1:N/ElC8H3+5XpJzTSTfLsJV/mx9Q9g7kxmchpfZyxgzM= @@ -198,20 +207,20 @@ go.opencensus.io v0.24.0 h1:y73uSU6J157QMP2kn2r30vwW1A2W2WFwSCGnAVxeaD0= go.opencensus.io v0.24.0/go.mod h1:vNK8G9p7aAivkbmorf4v+7Hgx+Zs0yY+0fOtgBfjQKo= go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.49.0 h1:4Pp6oUg3+e/6M4C0A/3kJ2VYa++dsWVTtGgLVj5xtHg= go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.49.0/go.mod h1:Mjt1i1INqiaoZOMGR1RIUJN+i3ChKoFRqzrRQhlkbs0= -go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.49.0 h1:jq9TW8u3so/bN+JPT166wjOI6/vQPF6Xe7nMNIltagk= -go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.49.0/go.mod h1:p8pYQP+m5XfbZm9fxtSKAbM6oIllS7s2AfxrChvc7iw= -go.opentelemetry.io/otel v1.24.0 h1:0LAOdjNmQeSTzGBzduGe/rU4tZhMwL5rWgtp9Ku5Jfo= -go.opentelemetry.io/otel v1.24.0/go.mod h1:W7b9Ozg4nkF5tWI5zsXkaKKDjdVjpD4oAt9Qi/MArHo= -go.opentelemetry.io/otel/metric v1.24.0 h1:6EhoGWWK28x1fbpA4tYTOWBkPefTDQnb8WSGXlc88kI= -go.opentelemetry.io/otel/metric v1.24.0/go.mod h1:VYhLe1rFfxuTXLgj4CBiyz+9WYBA8pNGJgDcSFRKBco= -go.opentelemetry.io/otel/sdk v1.24.0 h1:YMPPDNymmQN3ZgczicBY3B6sf9n62Dlj9pWD3ucgoDw= -go.opentelemetry.io/otel/sdk v1.24.0/go.mod h1:KVrIYw6tEubO9E96HQpcmpTKDVn9gdv35HoYiQWGDFg= -go.opentelemetry.io/otel/trace v1.24.0 h1:CsKnnL4dUAr/0llH9FKuc698G04IrpWV0MQA/Y1YELI= -go.opentelemetry.io/otel/trace v1.24.0/go.mod h1:HPc3Xr/cOApsBI154IU0OI0HJexz+aw5uPdbs3UCjNU= +go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.53.0 h1:4K4tsIXefpVJtvA/8srF4V4y0akAoPHkIslgAkjixJA= +go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.53.0/go.mod h1:jjdQuTGVsXV4vSs+CJ2qYDeDPf9yIJV23qlIzBm73Vg= +go.opentelemetry.io/otel v1.28.0 h1:/SqNcYk+idO0CxKEUOtKQClMK/MimZihKYMruSMViUo= +go.opentelemetry.io/otel v1.28.0/go.mod h1:q68ijF8Fc8CnMHKyzqL6akLO46ePnjkgfIMIjUIX9z4= +go.opentelemetry.io/otel/metric v1.28.0 h1:f0HGvSl1KRAU1DLgLGFjrwVyismPlnuU6JD6bOeuA5Q= +go.opentelemetry.io/otel/metric v1.28.0/go.mod h1:Fb1eVBFZmLVTMb6PPohq3TO9IIhUisDsbJoL/+uQW4s= +go.opentelemetry.io/otel/sdk v1.28.0 h1:b9d7hIry8yZsgtbmM0DKyPWMMUMlK9NEKuIG4aBqWyE= +go.opentelemetry.io/otel/sdk v1.28.0/go.mod h1:oYj7ClPUA7Iw3m+r7GeEjz0qckQRJK2B8zjcZEfu7Pg= +go.opentelemetry.io/otel/trace v1.28.0 h1:GhQ9cUuQGmNDd5BTCP2dAvv75RdMxEfTmYejp+lkx9g= +go.opentelemetry.io/otel/trace v1.28.0/go.mod h1:jPyXzNPg6da9+38HEwElrQiHlVMTnVfM3/yv2OlIHaI= go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto= go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE= -go.uber.org/multierr v1.10.0 h1:S0h4aNzvfcFsC3dRF1jLoaov7oRaKqRGC/pUEJ2yvPQ= -go.uber.org/multierr v1.10.0/go.mod h1:20+QtiLqy0Nd6FdQB9TLXag12DsQkrbs3htMFfDN80Y= +go.uber.org/multierr v1.11.0 h1:blXXJkSxSSfBVBlC76pxqeO+LN3aDfLQo+309xJstO0= +go.uber.org/multierr v1.11.0/go.mod h1:20+QtiLqy0Nd6FdQB9TLXag12DsQkrbs3htMFfDN80Y= go.uber.org/zap v1.27.0 h1:aJMhYGrd5QSmlpLMr2MftRKl7t8J8PTZPA732ud/XR8= go.uber.org/zap v1.27.0/go.mod h1:GB2qFLM7cTU87MWRP2mPIjqfIDnGu+VIO4V/SdhGo2E= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= @@ -292,6 +301,8 @@ golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtn golang.org/x/tools v0.0.0-20200619180055-7c47624df98f/go.mod h1:EkVYQZoAsY45+roYkvgYkIh4xh/qjgUK9TdY2XT94GE= golang.org/x/tools v0.0.0-20210106214847-113979e3529a/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA= golang.org/x/tools v0.1.12/go.mod h1:hNGJHUnrk76NpqgfD5Aqm5Crs+Hm0VOH/i9J2+nxYbc= +golang.org/x/tools v0.21.1-0.20240508182429-e35e4ccd0d2d h1:vU5i/LfpvrRCpgM/VPfJLg5KjxD3E+hfT1SH+d9zLwg= +golang.org/x/tools v0.21.1-0.20240508182429-e35e4ccd0d2d/go.mod h1:aiJjzUbINMkxbQROHiO6hDPo2LHcIPhhQsa9DLh0yGk= golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= @@ -307,15 +318,15 @@ google.golang.org/genproto v0.0.0-20240617180043-68d350f18fd4 h1:CUiCqkPw1nNrNQz google.golang.org/genproto v0.0.0-20240617180043-68d350f18fd4/go.mod h1:EvuUDCulqGgV80RvP1BHuom+smhX4qtlhnNatHuroGQ= google.golang.org/genproto/googleapis/api v0.0.0-20240610135401-a8a62080eff3 h1:QW9+G6Fir4VcRXVH8x3LilNAb6cxBGLa6+GM4hRwexE= google.golang.org/genproto/googleapis/api v0.0.0-20240610135401-a8a62080eff3/go.mod h1:kdrSS/OiLkPrNUpzD4aHgCq2rVuC/YRxok32HXZ4vRE= -google.golang.org/genproto/googleapis/rpc v0.0.0-20240617180043-68d350f18fd4 h1:Di6ANFilr+S60a4S61ZM00vLdw0IrQOSMS2/6mrnOU0= -google.golang.org/genproto/googleapis/rpc v0.0.0-20240617180043-68d350f18fd4/go.mod h1:Ue6ibwXGpU+dqIcODieyLOcgj7z8+IcskoNIgZxtrFY= +google.golang.org/genproto/googleapis/rpc v0.0.0-20240701130421-f6361c86f094 h1:BwIjyKYGsK9dMCBOorzRri8MQwmi7mT9rGHsCEinZkA= +google.golang.org/genproto/googleapis/rpc v0.0.0-20240701130421-f6361c86f094/go.mod h1:Ue6ibwXGpU+dqIcODieyLOcgj7z8+IcskoNIgZxtrFY= google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c= google.golang.org/grpc v1.23.0/go.mod h1:Y5yQAOtifL1yxbo5wqy6BxZv8vAUGQwXBOALyacEbxg= google.golang.org/grpc v1.25.1/go.mod h1:c3i+UQWmh7LiEpx4sFZnkU36qjEYZ0imhYfXVyQciAY= google.golang.org/grpc v1.27.0/go.mod h1:qbnxyOmOxrQa7FizSgH+ReBfzJrCY1pSN7KXBS8abTk= google.golang.org/grpc v1.33.2/go.mod h1:JMHMWHQWaTccqQQlmk3MJZS+GWXOdAesneDmEnv2fbc= -google.golang.org/grpc v1.64.1 h1:LKtvyfbX3UGVPFcGqJ9ItpVWW6oN/2XqTxfAnwRRXiA= -google.golang.org/grpc v1.64.1/go.mod h1:hiQF4LFZelK2WKaP6W0L92zGHtiQdZxk8CrSdvyjeP0= +google.golang.org/grpc v1.65.0 h1:bs/cUb4lp1G5iImFFd3u5ixQzweKizoZJAwBNLR42lc= +google.golang.org/grpc v1.65.0/go.mod h1:WgYC2ypjlB0EiQi6wdKixMqukr6lBc0Vo+oOgjrM5ZQ= google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8= google.golang.org/protobuf v0.0.0-20200221191635-4d8936d0db64/go.mod h1:kwYJMbMJ01Woi6D6+Kah6886xMZcty6N08ah7+eCXa0= google.golang.org/protobuf v0.0.0-20200228230310-ab0ca4ff8a60/go.mod h1:cfTl7dwQJ+fmap5saPgwCLgHXTUD7jkjRqWcaiX5VyM= @@ -339,3 +350,5 @@ gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= honnef.co/go/tools v0.0.0-20190523083050-ea95bdfd59fc/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= +sigs.k8s.io/controller-runtime v0.19.0 h1:nWVM7aq+Il2ABxwiCizrVDSlmDcshi9llbaFbC0ji/Q= +sigs.k8s.io/controller-runtime v0.19.0/go.mod h1:iRmWllt8IlaLjvTTDLhRBXIEtkCK6hwVBJJsYS9Ajf4= diff --git a/redis-http-connector/main.go b/redis-http-connector/main.go index 450b0d5..60a777f 100644 --- a/redis-http-connector/main.go +++ b/redis-http-connector/main.go @@ -2,15 +2,16 @@ package main import ( "context" + "fmt" "io" "log" "net/http" "os" - "os/signal" - "syscall" + "sync" "github.com/go-redis/redis/v8" "go.uber.org/zap" + "sigs.k8s.io/controller-runtime/pkg/manager/signals" "github.com/fission/keda-connectors/common" ) @@ -21,8 +22,7 @@ type redisConnector struct { logger *zap.Logger } -func (conn redisConnector) consumeMessage(sigterm chan os.Signal) { - +func (conn redisConnector) consumeMessage(ctx context.Context) error { headers := http.Header{ "KEDA-Topic": {conn.connectordata.Topic}, "KEDA-Response-Topic": {conn.connectordata.ResponseTopic}, @@ -31,45 +31,42 @@ func (conn redisConnector) consumeMessage(sigterm chan os.Signal) { "KEDA-Source-Name": {conn.connectordata.SourceName}, } - var ctx = context.Background() - forever := make(chan bool) + for { + // Check if the context is done + if ctx.Err() != nil { + return ctx.Err() + } + // BLPop will block and wait for a new message if the list is empty + msg, err := conn.rdbConnection.BLPop(ctx, 0, conn.connectordata.Topic).Result() + if err != nil { + return fmt.Errorf("error in consuming queue: %w", err) + } - go func() { - for { - // BLPop will block and wait for a new message if the list is empty - msg, err := conn.rdbConnection.BLPop(ctx, 0, conn.connectordata.Topic).Result() + if len(msg) > 1 { + // BLPop returns a slice with topic and message, we need the second item + message := msg[1] + response, err := common.HandleHTTPRequest(message, headers, conn.connectordata, conn.logger) if err != nil { - conn.logger.Error("Error in consuming queue: ", zap.Error((err))) - forever <- false + conn.errorHandler(ctx, err) + continue // Skip to the next iteration } - if len(msg) > 1 { - // BLPop returns a slice with topic and message, we need the second item - message := msg[1] - response, err := common.HandleHTTPRequest(message, headers, conn.connectordata, conn.logger) - if err != nil { - conn.errorHandler(ctx, err) - } else { - body, err := io.ReadAll(response.Body) - if err != nil { - conn.errorHandler(ctx, err) - } else { - if success := conn.responseHandler(ctx, string(body)); success { - conn.logger.Info("Message sending to response successful") - } - } - err = response.Body.Close() - if err != nil { - conn.logger.Error(err.Error()) - forever <- false - } - } + body, err := io.ReadAll(response.Body) + if err != nil { + conn.errorHandler(ctx, err) + response.Body.Close() // Close the body even if ReadAll fails + continue // Skip to the next iteration + } + + if success := conn.responseHandler(ctx, string(body)); success { + conn.logger.Info("Message sending to response successful") + } + + if err := response.Body.Close(); err != nil { + conn.logger.Warn("Error closing response body", zap.Error(err)) } } - }() - conn.logger.Info("Redis consumer up and running!") - <-forever - sigterm <- syscall.SIGTERM + } } func (conn redisConnector) errorHandler(ctx context.Context, err error) { @@ -105,6 +102,10 @@ func (conn redisConnector) responseHandler(ctx context.Context, response string) return true } +func (conn redisConnector) close() error { + return conn.rdbConnection.Close() +} + func main() { logger, err := zap.NewProduction() @@ -115,7 +116,7 @@ func main() { connectordata, err := common.ParseConnectorMetadata() if err != nil { - logger.Error("Error while parsing connector metadata", zap.Error(err)) + logger.Fatal("Error while parsing connector metadata", zap.Error(err)) } address := os.Getenv("ADDRESS") @@ -129,15 +130,36 @@ func main() { Password: password, }) - sigterm := make(chan os.Signal, 1) + ctx, cancel := context.WithCancel(signals.SetupSignalHandler()) + defer cancel() + + wg := sync.WaitGroup{} conn := redisConnector{ rdbConnection: rdb, connectordata: connectordata, logger: logger, } - conn.consumeMessage(sigterm) + defer func() { + if err := conn.close(); err != nil { + logger.Error("Error closing Redis connection", zap.Error(err)) + } + }() - signal.Notify(sigterm, syscall.SIGINT, syscall.SIGTERM) - <-sigterm + wg.Add(1) + go func() { + defer wg.Done() + for { + if err := conn.consumeMessage(ctx); err != nil { + if err == context.Canceled || err == context.DeadlineExceeded { + conn.logger.Info("Context cancelled, stopping consumer") + return + } + logger.Error("Error in consuming message", zap.Error(err)) + } + // Remove the context check here, as it's now handled in consumeMessage + conn.logger.Info("Restarting consumer") + } + }() + wg.Wait() logger.Info("Terminating: Redis consumer") } From 001c7cfb82a612d0c0cc86629f37c34ed0e05a3e Mon Sep 17 00:00:00 2001 From: Md Soharab Ansari Date: Thu, 26 Sep 2024 11:38:09 +0530 Subject: [PATCH 5/6] Update golangci-lint version to v1.61.0 to support Go v1.23.1 Signed-off-by: Md Soharab Ansari --- .github/workflows/lint.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/lint.yml b/.github/workflows/lint.yml index 732377b..0d155f2 100644 --- a/.github/workflows/lint.yml +++ b/.github/workflows/lint.yml @@ -10,7 +10,7 @@ on: workflow_dispatch: env: - GOLANGCI_LINT_VERSION: v1.59.1 + GOLANGCI_LINT_VERSION: v1.61.0 GOLANGCI_LINT_TIMEOUT: 5m jobs: From f8bda6e9175d6fa4e4de6938821adcf46a2376ed Mon Sep 17 00:00:00 2001 From: Md Soharab Ansari Date: Thu, 26 Sep 2024 12:24:29 +0530 Subject: [PATCH 6/6] Bump minor version for all the connectors Signed-off-by: Md Soharab Ansari --- aws-kinesis-http-connector/version | 2 +- aws-sqs-http-connector/version | 2 +- gcp-pubsub-http-connector/version | 2 +- kafka-http-connector/version | 2 +- nats-jetstream-http-connector/version | 2 +- nats-streaming-http-connector/version | 2 +- rabbitmq-http-connector/version | 2 +- 7 files changed, 7 insertions(+), 7 deletions(-) diff --git a/aws-kinesis-http-connector/version b/aws-kinesis-http-connector/version index 1ec829a..2048935 100644 --- a/aws-kinesis-http-connector/version +++ b/aws-kinesis-http-connector/version @@ -1 +1 @@ -v0.14 +v0.15 diff --git a/aws-sqs-http-connector/version b/aws-sqs-http-connector/version index 2048935..39fa176 100644 --- a/aws-sqs-http-connector/version +++ b/aws-sqs-http-connector/version @@ -1 +1 @@ -v0.15 +v0.16 diff --git a/gcp-pubsub-http-connector/version b/gcp-pubsub-http-connector/version index 9097bf9..5416288 100644 --- a/gcp-pubsub-http-connector/version +++ b/gcp-pubsub-http-connector/version @@ -1 +1 @@ -v0.10 +v0.11 diff --git a/kafka-http-connector/version b/kafka-http-connector/version index 39fa176..2b8bac3 100644 --- a/kafka-http-connector/version +++ b/kafka-http-connector/version @@ -1 +1 @@ -v0.16 +v0.17 diff --git a/nats-jetstream-http-connector/version b/nats-jetstream-http-connector/version index 86de203..490a0cd 100644 --- a/nats-jetstream-http-connector/version +++ b/nats-jetstream-http-connector/version @@ -1 +1 @@ -v0.8 +v0.9 diff --git a/nats-streaming-http-connector/version b/nats-streaming-http-connector/version index 2b8bac3..d314dc4 100644 --- a/nats-streaming-http-connector/version +++ b/nats-streaming-http-connector/version @@ -1 +1 @@ -v0.17 +v0.18 diff --git a/rabbitmq-http-connector/version b/rabbitmq-http-connector/version index 1ec829a..2048935 100644 --- a/rabbitmq-http-connector/version +++ b/rabbitmq-http-connector/version @@ -1 +1 @@ -v0.14 +v0.15