Skip to content

Commit

Permalink
feat: add redis transport (#14)
Browse files Browse the repository at this point in the history
* add redis transporter

* add redis to action test
  • Loading branch information
vessaldaneshvar authored Oct 10, 2023
1 parent 4ef1e23 commit 97f0d46
Show file tree
Hide file tree
Showing 5 changed files with 131 additions and 0 deletions.
5 changes: 5 additions & 0 deletions .github/workflows/go.yml
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,11 @@ jobs:
ports:
- 5672:5672

redis:
image: redis:6.2
ports:
- 6379:6379

steps:
- name: Checkout
uses: actions/checkout@v3
Expand Down
3 changes: 3 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,9 @@ require (
)

require (
github.com/cespare/xxhash/v2 v2.2.0 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect
github.com/go-pg/zerochecker v0.2.0 // indirect
github.com/jackc/pgpassfile v1.0.0 // indirect
github.com/jackc/pgservicefile v0.0.0-20221227161230-091c0ba34f0a // indirect
Expand All @@ -17,6 +19,7 @@ require (
github.com/jinzhu/inflection v1.0.0 // indirect
github.com/pkg/errors v0.9.1 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/redis/go-redis/v9 v9.2.1 // indirect
github.com/stretchr/objx v0.5.0 // indirect
github.com/stretchr/testify v1.8.4 // indirect
github.com/tmthrgd/go-hex v0.0.0-20190904060850-447a3041c3bc // indirect
Expand Down
6 changes: 6 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
@@ -1,10 +1,14 @@
cloud.google.com/go v0.26.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw=
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU=
github.com/cespare/xxhash/v2 v2.2.0 h1:DC2CZ1Ep5Y4k3ZQ899DldepgrayRUGE6BBZ/cd9Cj44=
github.com/cespare/xxhash/v2 v2.2.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f h1:lO4WD4F/rVNCu3HqELle0jiPLLBs70cWOduZpkS1E78=
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f/go.mod h1:cuUVRXasLTGF7a8hSLbxyZXjz+1KgoB3wDUb6vlszIc=
github.com/elliotchance/phpserialize v1.3.3 h1:hV4QVmGdCiYgoBbw+ADt6fNgyZ2mYX0OgpnON1adTCM=
github.com/elliotchance/phpserialize v1.3.3/go.mod h1:gt7XX9+ETUcLXbtTKEuyrqW3lcLUAeS/AnGZ2e49TZs=
github.com/envoyproxy/go-control-plane v0.9.1-0.20191026205805-5f8ba28d4473/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4=
Expand Down Expand Up @@ -71,6 +75,8 @@ github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZN
github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA=
github.com/rabbitmq/amqp091-go v1.8.1 h1:RejT1SBUim5doqcL6s7iN6SBmsQqyTgXb1xMlH0h1hA=
github.com/rabbitmq/amqp091-go v1.8.1/go.mod h1:+jPrT9iY2eLjRaMSRHUhc3z14E/l85kv/f+6luSD3pc=
github.com/redis/go-redis/v9 v9.2.1 h1:WlYJg71ODF0dVspZZCpYmoF1+U1Jjk9Rwd7pq6QmlCg=
github.com/redis/go-redis/v9 v9.2.1/go.mod h1:hdY0cQFCN4fnSYT6TkisLufl/4W5UIXyv0b/CLO2V2M=
github.com/serenize/snaker v0.0.0-20201027110005-a7ad2135616e h1:zWKUYT07mGmVBH+9UgnHXd/ekCK99C8EbDSAt5qsjXE=
github.com/serenize/snaker v0.0.0-20201027110005-a7ad2135616e/go.mod h1:Yow6lPLSAXx2ifx470yD/nUe22Dv5vBvxK/UK9UUTVs=
github.com/stretchr/objx v0.1.0 h1:4G4v2dO3VZwixGIRoQ5Lfboy6nUhCyYzaqnIAPPhYs4=
Expand Down
59 changes: 59 additions & 0 deletions redis_transport.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
package gosumer

import (
"context"
"fmt"

"github.com/redis/go-redis/v9"
)

type Redis struct {
Host string
Port int
User string
Password string
DB uint8
Channel string
}

var rdb *redis.Client

func (red Redis) connect() error {
var err error
rdb = redis.NewClient(&redis.Options{
Addr: fmt.Sprintf("%s:%d", red.Host, red.Port),
Username: red.User,
Password: red.Password,
DB: int(red.DB),
})
ctx := context.Background()
err = rdb.Ping(ctx).Err()

if err != nil {
return err
}

return nil
}

func (red Redis) listen(fn process, message any) error {
err := red.connect()
if err != nil {
return err
}

if err != nil {
return err
}
ctx := context.Background()
sub := rdb.Subscribe(ctx, red.Channel)
defer sub.Close()
for {
msg, err := sub.ReceiveMessage(ctx)
if err != nil {
panic(err)
}
var e chan error
go fn(msg, e)
}
}
58 changes: 58 additions & 0 deletions redis_transport_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
package gosumer

import (
"context"
"testing"
"time"

"github.com/redis/go-redis/v9"
"github.com/stretchr/testify/assert"
)

func TestRedisConnect(t *testing.T) {
transport := Redis{
Host: "localhost",
Port: 6379,
Channel: "channel_name",
}

err := transport.connect()
if err != nil {
t.Errorf("Expected no error, got %v", err)
}
}

func TestRedisListen(t *testing.T) {
transport := Redis{
Host: "localhost",
Port: 6379,
User: "",
Password: "",
Channel: "channel_1",
DB: 0,
}

go func() {
err := transport.listen(processMessage, Message{})
if err != nil {
t.Errorf("Expected no error, got %v", err)
}
}()

rdb := redis.NewClient(&redis.Options{
Addr: "localhost:6379",
})
time.Sleep(1 * time.Second)

ctx := context.Background()
body := `{"id": 1, "name": "John Doe"}`
res := rdb.Publish(ctx, "channel_1", body)

if res.Err() != nil {
t.Errorf("Expected no error, got %v", res.Err())
}
time.Sleep(1 * time.Second)

assert.True(t, processMessageCalled, "Expected processMessage to be called")
processMessageCalled = false
}

0 comments on commit 97f0d46

Please sign in to comment.