Skip to content

Latest commit

 

History

History
112 lines (92 loc) · 2.65 KB

README.md

File metadata and controls

112 lines (92 loc) · 2.65 KB

rabbitmq

RabbitMQ as backend for Queue Package. See the Go RabbitMQ Client Library maintained by the RabbitMQ core team. It was originally developed by Sean Treadway.

Exchanges and Exchange Types

See the Exchanges and Exchange Types section of AMQP 0-9-1 Model Explained.

Direct Exchange

direct-exchange

See the consumer code:

package main

import (
  "context"
  "encoding/json"
  "flag"
  "fmt"
  "time"

  "github.com/appleboy/graceful"
  "github.com/golang-queue/queue"
  "github.com/golang-queue/queue/core"
  "github.com/golang-queue/rabbitmq"
)

type job struct {
  Message string
}

func (j *job) Bytes() []byte {
  b, err := json.Marshal(j)
  if err != nil {
    panic(err)
  }
  return b
}

var (
  uri          = flag.String("uri", "amqp://guest:guest@localhost:5672/", "AMQP URI")
  exchange     = flag.String("exchange", "test-exchange", "Durable, non-auto-deleted AMQP exchange name")
  exchangeType = flag.String("exchange-type", "direct", "Exchange type - direct|fanout|topic|x-custom")
  q            = flag.String("queue", "test-queue", "Ephemeral AMQP queue name")
  bindingKey   = flag.String("key", "test-key", "AMQP binding key")
)

func init() {
  flag.Parse()
}

func main() {
  taskN := 10000
  rets := make(chan string, taskN)

  m := graceful.NewManager()

  // define the worker
  w := rabbitmq.NewWorker(
    rabbitmq.WithAddr(*uri),
    rabbitmq.WithQueue(*q),
    rabbitmq.WithExchangeName(*exchange),
    rabbitmq.WithExchangeType(*exchangeType),
    rabbitmq.WithRoutingKey(*bindingKey),
    rabbitmq.WithRunFunc(func(ctx context.Context, m core.QueuedMessage) error {
      var v *job
      if err := json.Unmarshal(m.Bytes(), &v); err != nil {
        return err
      }
      rets <- v.Message
      time.Sleep(500 * time.Millisecond)
      return nil
    }),
  )
  // define the queue
  q := queue.NewPool(
    2,
    queue.WithWorker(w),
  )

  m.AddRunningJob(func(ctx context.Context) error {
    for {
      select {
      case <-ctx.Done():
        select {
        case m := <-rets:
          fmt.Println("message:", m)
        default:
        }
        return nil
      case m := <-rets:
        fmt.Println("message:", m)
        time.Sleep(50 * time.Millisecond)
      }
    }
  })

  m.AddShutdownJob(func() error {
    // shutdown the service and notify all the worker
    q.Release()
    return nil
  })

  <-m.Done()
}