Skip to content

Commit

Permalink
fix(transports): bad interface implementation (#19)
Browse files Browse the repository at this point in the history
  • Loading branch information
darkweak authored Nov 23, 2023
1 parent db093e5 commit 3a27cda
Show file tree
Hide file tree
Showing 5 changed files with 10 additions and 12 deletions.
3 changes: 2 additions & 1 deletion amqp_transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ type RabbitMQ struct {

var connection *amqp.Connection
var channel *amqp.Channel
var _ Transport = (*RabbitMQ)(nil)

func (rabbitmq RabbitMQ) connect() error {
var err error
Expand All @@ -38,7 +39,7 @@ func (rabbitmq RabbitMQ) connect() error {
return nil
}

func (rabbitmq RabbitMQ) listen(fn process, message any) error {
func (rabbitmq RabbitMQ) listen(fn process, message any, _ int) error {
err := rabbitmq.connect()
if err != nil {
return err
Expand Down
2 changes: 1 addition & 1 deletion amqp_transport_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ func TestAmqpListen(t *testing.T) {
}

go func() {
err := transport.listen(processMessage, Message{})
err := transport.listen(processMessage, Message{}, 0)
if err != nil {
t.Errorf("Expected no error, got %v", err)
}
Expand Down
12 changes: 4 additions & 8 deletions pgsql_transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ type PgDatabase struct {
}

var pool *pgxpool.Pool
var _ Transport = (*PgDatabase)(nil)

func (database PgDatabase) connect() error {
var err error
Expand Down Expand Up @@ -72,17 +73,12 @@ func (database PgDatabase) listen(fn process, message any, sec int) error {
}

func (database PgDatabase) listenEvery(seconds int, fn process, message any) {
ticker := time.NewTicker(time.Duration(seconds) * time.Second)
delay := time.Duration(seconds) * time.Second

go func() error {
for {
select {
case <-ticker.C:
err := database.processMessage(fn, message)
if err != nil {
continue
}
}
<- time.After(delay)
_ = database.processMessage(fn, message)
}
}()
}
Expand Down
3 changes: 2 additions & 1 deletion redis_transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ type Redis struct {
}

var rdb *redis.Client
var _ Transport = (*Redis)(nil)

func (red Redis) connect() error {
var err error
Expand All @@ -36,7 +37,7 @@ func (red Redis) connect() error {
return nil
}

func (red Redis) listen(fn process, message any) error {
func (red Redis) listen(fn process, message any, _ int) error {
err := red.connect()
if err != nil {
return err
Expand Down
2 changes: 1 addition & 1 deletion redis_transport_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ func TestRedisListen(t *testing.T) {
}

go func() {
err := transport.listen(processMessage, Message{})
err := transport.listen(processMessage, Message{}, 0)
if err != nil {
t.Errorf("Expected no error, got %v", err)
}
Expand Down

0 comments on commit 3a27cda

Please sign in to comment.