Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

beanq DefaultHandle #74

Merged
merged 24 commits into from
Sep 13, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions .github/workflows/reviewdog.yml
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@ jobs:
- uses: actions/checkout@v4
- uses: actions/setup-go@v5
with:
go-version: stable
go-version: 1.22.6
- name: golangci-lint
uses: golangci/golangci-lint-action@v6
with:
version: v1.59
version: v1.60
7 changes: 2 additions & 5 deletions async_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,14 +30,11 @@ func newAsyncPool(poolSize int) *asyncPool {

func (a *asyncPool) Execute(ctx context.Context, fn func(c context.Context) error, durations ...time.Duration) {
var (
c context.Context
cancel context.CancelFunc
)
if len(durations) > 0 {
c, cancel = context.WithTimeout(context.TODO(), durations[0])
ctx, cancel = context.WithTimeout(ctx, durations[0])
defer cancel()
} else {
c = context.TODO()
}

err := a.pool.Submit(func() {
Expand All @@ -47,7 +44,7 @@ func (a *asyncPool) Execute(ctx context.Context, fn func(c context.Context) erro
}
}()

e := fn(c)
e := fn(ctx)
if e != nil {
a.captureException(ctx, e)
}
Expand Down
9 changes: 3 additions & 6 deletions base.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ package beanq

import (
"context"
"errors"
"fmt"
"hash/fnv"
"math"
Expand Down Expand Up @@ -82,7 +81,7 @@ func MakeStreamKey(subType subscribeType, prefix, channel, topic string) string

// MakeStatusKey create key for type string
func MakeStatusKey(prefix, channel, id string) string {
return makeKey(prefix, channel, "status", id)
return makeKey(prefix, channel, "=-status-=", id)
}

// MakeDynamicKey create key for dynamic
Expand Down Expand Up @@ -150,13 +149,11 @@ func doTimeout(ctx context.Context, f func() error) error {
// RetryInfo retry=0 means no retries, but it will be executed at least once.
func RetryInfo(ctx context.Context, f func() error, retry int) (i int, err error) {
for i = 0; i <= retry; i++ {
e := doTimeout(ctx, f)
if e == nil {
err = doTimeout(ctx, f)
if err == nil {
return
}

err = errors.Join(err, e)

waitTime := jitterBackoff(500*time.Millisecond, time.Second, i)
select {
case <-time.After(waitTime):
Expand Down
5 changes: 4 additions & 1 deletion beanq.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ type (
On bool
Mongo struct {
Database string
Collection string
UserName string
Password string
Host string
Expand Down Expand Up @@ -139,6 +140,9 @@ func (t *BeanqConfig) init() {
if t.TimeToRun == 0 {
t.TimeToRun = DefaultOptions.TimeToRun
}
if t.History.Mongo.Collection == "" {
t.History.Mongo.Collection = "event_logs"
}
if t.History.Mongo.ConnectTimeOut == 0 {
t.History.Mongo.ConnectTimeOut = 10 * time.Second
}
Expand All @@ -154,7 +158,6 @@ func (t *BeanqConfig) init() {
type IHandle interface {
Channel() string
Topic() string
Check(ctx context.Context) error
Process(ctx context.Context)
Schedule(ctx context.Context) error
DeadLetter(ctx context.Context) error
Expand Down
20 changes: 16 additions & 4 deletions broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,15 @@ package beanq

import (
"context"
"errors"
"sync"

"github.com/retail-ai-inc/beanq/helper/logger"
)

type (
IBroker interface {
driver() any
checkStatus(ctx context.Context, channel, id string) (*Message, error)
enqueue(ctx context.Context, msg *Message, dynamicOn bool) error
startConsuming(ctx context.Context)
Expand Down Expand Up @@ -43,7 +45,18 @@ func NewBroker(config *BeanqConfig) IBroker {
return broker
}

func GetBrokerDriver[T any]() T {
if broker == nil {
logger.New().Panic("the broker has not been initialized yet")
}
return broker.driver().(T)
}

// consumer...
var (
NilHandle = errors.New("beanq:handle is nil")
NilCancel = errors.New("beanq:cancel is nil")
)

type (
IConsumeHandle interface {
Expand All @@ -68,23 +81,22 @@ type (
)

func (c WorkflowHandler) Handle(ctx context.Context, message *Message) error {
workflow := NewWorkflow(message)

workflow := NewWorkflow(ctx, message)
return c(ctx, workflow)
}

func (c DefaultHandle) Handle(ctx context.Context, message *Message) error {
if c.DoHandle != nil {
return c.DoHandle(ctx, message)
}
return nil
return NilHandle
}

func (c DefaultHandle) Cancel(ctx context.Context, message *Message) error {
if c.DoCancel != nil {
return c.DoCancel(ctx, message)
}
return nil
return NilCancel
}

func (c DefaultHandle) Error(ctx context.Context, err error) {
Expand Down
1 change: 1 addition & 0 deletions env.sample.json
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
"on": true,
"mongo": {
"database": "beanq_logs",
"collection": "event_logs",
"username": "",
"password": "",
"host": "127.0.0.1",
Expand Down
18 changes: 16 additions & 2 deletions examples/sequential/consumer/env.json
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
{
"debugLog": {
"on": true,
"path": ""
"path": "./app.log"
},
"redis": {
"host": "localhost",
Expand Down Expand Up @@ -30,5 +30,19 @@
"keepSuccessJobsInHistory": "3600s",
"minConsumers": 10,
"publishTimeOut":"10s",
"consumeTimeOut": "10s"
"consumeTimeOut": "10s",
"history": {
"on": true,
"mongo": {
"database": "lollipop_logs",
"collection": "event_logs",
"username": "lollipop_logs",
"password": "secret",
"host": "127.0.0.1",
"port": ":27017",
"connectTimeout": "10s",
"maxConnectionPoolSize": 200,
"maxConnectionLifeTime": "600s"
}
}
}
24 changes: 12 additions & 12 deletions examples/sequential/consumer/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package main

import (
"context"
"fmt"
"log"
_ "net/http/pprof"
"path/filepath"
Expand Down Expand Up @@ -107,11 +106,12 @@ func main() {
// return nil
// })
//
// err := wf.WithRollbackResultHandler(func(taskID string, err error) {
// err := wf.WithRollbackResultHandler(func(taskID string, err error) error {
// if err == nil {
// return
// return nil
// }
// log.Printf("%s rollback error: %v\n", taskID, err)
// return nil
// }).Run()
// if err != nil {
// return err
Expand All @@ -124,13 +124,13 @@ func main() {

_, err := csm.BQ().WithContext(ctx).SubscribeSequential("delay-channel", "order-topic", beanq.DefaultHandle{
DoHandle: func(ctx context.Context, message *beanq.Message) error {
message.Response = fmt.Sprintf("test val,id=%+v", message.Id)
log.Println("default handler ", message.Id)
//message.Response = fmt.Sprintf("test val,id=%+v", message.Id)
//log.Println("default handler ", message.Id)
return nil
},
DoCancel: func(ctx context.Context, message *beanq.Message) error {
log.Println("default cancel ", message.Id)
return nil
return beanq.NilCancel
},
DoError: func(ctx context.Context, err error) {
log.Println("default error ", err)
Expand All @@ -139,12 +139,12 @@ func main() {
if err != nil {
logger.New().Error(err)
}
go func() {
for {
time.Sleep(3 * time.Second)
fmt.Println(runtime.NumGoroutine())
}
}()
//go func() {
// for {
// time.Sleep(3 * time.Second)
// fmt.Println(runtime.NumGoroutine())
// }
//}()
// _, err = csm.BQ().WithContext(ctx).SubscribeSequential("delay-channel", "order-topic", &seqCustomer{
// metadata: "I am a custom",
// })
Expand Down
63 changes: 32 additions & 31 deletions examples/sequential/publisher-dynamic/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"encoding/json"
"fmt"
"github.com/go-resty/resty/v2"
"log"
"path/filepath"
"runtime"
Expand Down Expand Up @@ -46,36 +47,36 @@ func initCnf() *beanq.BeanqConfig {
}
func main() {
//
// wait := sync.WaitGroup{}
// wait.Add(300)
// for i := 1; i <= 300; i++ {
// go func(ni int) {
// defer wait.Done()
//
// data := make(map[string]any)
// data["deviceUuid"] = "device_1"
// data["uuid"] = "test-" + cast.ToString(ni)
// data["amount"] = 700
// data["transactionType"] = 12
// data["cardId"] = "5732542140"
// data["retailerStoreId"] = 1
// data["retailerTerminalId"] = 111
// data["retailerCompanyId"] = 1
// now := time.Now()
// client := resty.New()
// resp, err := client.R().
// SetHeader("Content-Type", "application/json").
// SetBody(data).
// Post("http://127.0.0.1:8888/v1/prepaid/card/deposit")
// if err != nil {
// fmt.Printf("错误:%+v \n", err)
// return
// }
// fmt.Printf("返回值 :%+v,耗时:%+v \n", string(resp.Body()), time.Now().Sub(now))
// }(i)
// }
// wait.Wait()
// return
wait := sync.WaitGroup{}
wait.Add(300)
for i := 1; i <= 300; i++ {
go func(ni int) {
defer wait.Done()

data := make(map[string]any)
data["deviceUuid"] = "device_1"
data["uuid"] = "test-" + cast.ToString(ni)
data["amount"] = 700
data["transactionType"] = 12
data["cardId"] = "5732542140"
data["retailerStoreId"] = 1
data["retailerTerminalId"] = 111
data["retailerCompanyId"] = 1
now := time.Now()
client := resty.New()
resp, err := client.R().
SetHeader("Content-Type", "application/json").
SetBody(data).
Post("http://127.0.0.1:8888/v1/prepaid/card/deposit")
if err != nil {
fmt.Printf("错误:%+v \n", err)
return
}
fmt.Printf("返回值 :%+v,耗时:%+v \n", string(resp.Body()), time.Now().Sub(now))
}(i)
}
wait.Wait()
return

pub := beanq.New(initCnf())
for i := 0; i < 1000; i++ {
Expand All @@ -86,7 +87,7 @@ func main() {
ctx, cancel := context.WithTimeout(context.Background(), time.Second*30)
defer cancel()

r, err := pub.BQ().WithContext(ctx).SetId(cast.ToString(i)).PublishInSequential("default-delay-channel", "mynewstream", b).WaitingAck(ctx, cast.ToString(i))
r, err := pub.BQ().WithContext(ctx).SetId(cast.ToString(i)).PublishInSequential("default-delay-channel", "mynewstream", b).WaitingAck()
if err != nil {
logger.New().Error(err)
}
Expand Down
17 changes: 10 additions & 7 deletions examples/sequential/publisher-with-ack/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,22 +49,25 @@ func main() {
pub := beanq.New(config)
wait := sync.WaitGroup{}

for i := 0; i < 5; i++ {
for i := 0; i < 300; i++ {
wait.Add(1)
go func() {
go func(i1 int) {
defer wait.Done()
id := cast.ToString(i1)

m := make(map[string]any)
m["delayMsg"] = "new msg" + cast.ToString(i)
m["delayMsg"] = "new msg" + id

b, _ := json.Marshal(m)
ctx, cancel := context.WithTimeout(context.Background(), time.Second*30)
defer cancel()
result, err := pub.BQ().WithContext(ctx).SetId(cast.ToString(i)).PublishInSequential("delay-channel", "order-topic", b).WaitingAck()
result, err := pub.BQ().WithContext(ctx).SetId(id).PublishInSequential("delay-channel", "order-topic", b).WaitingAck()
if err != nil {
logger.New().Error(err)
logger.New().Error(err, m)
} else {
log.Printf("%+v \n", result)
log.Printf("ID:%+v \n", result.Id)
}
}()
}(i)

}
wait.Wait()
Expand Down
Loading