Skip to content

Commit

Permalink
batch save log for mongo
Browse files Browse the repository at this point in the history
  • Loading branch information
kongliang1 committed Aug 23, 2024
1 parent 5fe7f8f commit 2a96e33
Show file tree
Hide file tree
Showing 9 changed files with 156 additions and 213 deletions.
7 changes: 2 additions & 5 deletions async_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,14 +30,11 @@ func newAsyncPool(poolSize int) *asyncPool {

func (a *asyncPool) Execute(ctx context.Context, fn func(c context.Context) error, durations ...time.Duration) {
var (
c context.Context
cancel context.CancelFunc
)
if len(durations) > 0 {
c, cancel = context.WithTimeout(context.TODO(), durations[0])
ctx, cancel = context.WithTimeout(ctx, durations[0])
defer cancel()
} else {
c = context.TODO()
}

err := a.pool.Submit(func() {
Expand All @@ -47,7 +44,7 @@ func (a *asyncPool) Execute(ctx context.Context, fn func(c context.Context) erro
}
}()

e := fn(c)
e := fn(ctx)
if e != nil {
a.captureException(ctx, e)
}
Expand Down
1 change: 0 additions & 1 deletion beanq.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,6 @@ func (t *BeanqConfig) init() {
type IHandle interface {
Channel() string
Topic() string
Check(ctx context.Context) error
Process(ctx context.Context)
Schedule(ctx context.Context) error
DeadLetter(ctx context.Context) error
Expand Down
15 changes: 14 additions & 1 deletion examples/sequential/consumer/env.json
Original file line number Diff line number Diff line change
Expand Up @@ -30,5 +30,18 @@
"keepSuccessJobsInHistory": "3600s",
"minConsumers": 10,
"publishTimeOut":"10s",
"consumeTimeOut": "10s"
"consumeTimeOut": "10s",
"history": {
"on": true,
"mongo": {
"database": "lollipop_logs",
"username": "lollipop_logs",
"password": "secret",
"host": "127.0.0.1",
"port": ":27017",
"connectTimeout": "10s",
"maxConnectionPoolSize": 200,
"maxConnectionLifeTime": "600s"
}
}
}
2 changes: 1 addition & 1 deletion examples/sequential/consumer/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ func main() {
},
DoCancel: func(ctx context.Context, message *beanq.Message) error {
log.Println("default cancel ", message.Id)
return nil
return beanq.NilCancel
},
DoError: func(ctx context.Context, err error) {
log.Println("default error ", err)
Expand Down
6 changes: 3 additions & 3 deletions log.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ type ILog interface {
// Archive log
Archive(ctx context.Context, result *Message) error
// Obsolete ,if log has expired ,then delete it
Obsolete(ctx context.Context)
Obsolete(ctx context.Context, data []map[string]any) error
}

type Log struct {
Expand All @@ -101,12 +101,12 @@ func (t *Log) Archives(ctx context.Context, result Message) error {
return nil
}

func (t *Log) Obsoletes(ctx context.Context) error {
func (t *Log) Obsoletes(ctx context.Context, datas []map[string]any) error {

for _, log := range t.logs {
nlog := log
go func() {
nlog.Obsolete(ctx)
nlog.Obsolete(ctx, datas)
}()
}
return nil
Expand Down
24 changes: 10 additions & 14 deletions mongo_log.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@ package beanq

import (
"context"
"fmt"
"strings"
"time"

"github.com/retail-ai-inc/beanq/helper/logger"
"go.mongodb.org/mongo-driver/bson"
Expand Down Expand Up @@ -57,23 +57,19 @@ func NewMongoLog(ctx context.Context, config *BeanqConfig) *MongoLog {

// Archive save log
func (t *MongoLog) Archive(ctx context.Context, result *Message) error {
data := bson.M{
"sid": result.Id,
"status": result.Status,
"level": result.Level,
"type": result.Info,
"data": result,
"createdAt": time.Now(),
"updatedAt": time.Now(),
}
if _, err := t.database.Collection(MongoCollection).InsertOne(ctx, data); err != nil {
return err
}
return nil
}

// Obsolete log
// If you don't want to implement an elimination strategy, you can skip implementing the method
func (t *MongoLog) Obsolete(ctx context.Context) {
func (t *MongoLog) Obsolete(ctx context.Context, data []map[string]any) error {

datas := make(bson.A, 0, len(data))
for _, v := range data {
datas = append(datas, bson.M(v))
}
if _, err := t.database.Collection(MongoCollection).InsertMany(ctx, datas); err != nil {
return fmt.Errorf("Mongo Error:%w \n", err)
}
return nil
}
154 changes: 102 additions & 52 deletions redis_broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ import (
"github.com/go-redis/redis/v8"
"github.com/retail-ai-inc/beanq/helper/logger"
"github.com/retail-ai-inc/beanq/helper/redisx"
"github.com/retail-ai-inc/beanq/helper/timex"
"github.com/rs/xid"
"github.com/spf13/cast"
"golang.org/x/sync/errgroup"
Expand Down Expand Up @@ -108,7 +109,6 @@ func newRedisBroker(config *BeanqConfig) IBroker {
successKey: MakeLogKey(config.Redis.Prefix, "success"),
}
var logs []ILog
logs = append(logs, broker)
if config.History.On {
mongoLog := NewMongoLog(ctx, config)
logs = append(logs, mongoLog)
Expand Down Expand Up @@ -159,66 +159,120 @@ func (t *RedisBroker) monitorStream(ctx context.Context, channel, id string) (*M
}

// Archive log

func (t *RedisBroker) Archive(ctx context.Context, result *Message) error {
// update status
// keep 6 hours for cache
key := MakeStatusKey(t.prefix, result.Channel, result.Id)

val := map[string]any{
"id": result.Id,
"status": result.Status,
"level": result.Level,
"info": result.Info,
"payload": result.Payload,
"pendingRetry": result.PendingRetry,
"retry": result.Retry,
"priority": result.Priority,
"addTime": result.AddTime,
"runTime": result.RunTime,
"beginTime": result.BeginTime,
"endTime": result.EndTime,
"executeTime": result.ExecuteTime,
"topic": result.Topic,
"channel": result.Channel,
"consumer": result.Consumer,
"moodType": result.MoodType,
"response": result.Response,
}

if err := t.client.Watch(ctx, func(tx *redis.Tx) error {
_, err := t.client.TxPipelined(ctx, func(pipeliner redis.Pipeliner) error {
pipeliner.HMSet(ctx, key, map[string]any{
"id": result.Id,
"status": result.Status,
"level": result.Level,
"info": result.Info,
"payload": result.Payload,
"pendingRetry": result.PendingRetry,
"retry": result.Retry,
"priority": result.Priority,
"addTime": result.AddTime,
"runTime": result.RunTime,
"beginTime": result.BeginTime,
"endTime": result.EndTime,
"executeTime": result.ExecuteTime,
"topic": result.Topic,
"channel": result.Channel,
"consumer": result.Consumer,
"moodType": result.MoodType,
"response": result.Response,
})
pipeliner.HMSet(ctx, key, val)
pipeliner.Expire(ctx, key, 6*time.Hour)
return nil
})
return err
}, key); err != nil {
return err
}
// log for mongo to batch saving
logStream := strings.Join([]string{t.prefix, "logs"}, ":")
if err := t.client.XAdd(ctx, &redis.XAddArgs{
Stream: logStream,
NoMkStream: false,
MaxLen: 20000,
Approx: false,
ID: "*",
Values: val,
}).Err(); err != nil {
return err
}

return nil
}

// Obsolete log
func (t *RedisBroker) Obsolete(ctx context.Context) {
func (t *RedisBroker) Obsolete(ctx context.Context, data []map[string]any) error {

ticker := time.NewTicker(30 * time.Second)
defer ticker.Stop()
timer := timex.TimerPool.Get(5 * time.Second)
defer timex.TimerPool.Put(timer)

key := strings.Join([]string{t.prefix, "logs"}, ":")
logGroup := "log-group"
for {
// check state
select {
case <-ctx.Done():
logger.New().Info("Redis Obsolete Stop")
return
case <-ticker.C:
return nil
case <-timer.C:
}
timer.Reset(5 * time.Second)
result, err := t.client.XReadGroup(ctx, redisx.NewReadGroupArgs(logGroup, key, []string{key, ">"}, 200, 0)).Result()
if err != nil {
if strings.Contains(err.Error(), "NOGROUP No such") {
if err := t.client.XGroupCreateMkStream(ctx, key, logGroup, "0").Err(); err != nil {
t.captureException(ctx, err)
return nil
}
continue
}
if errors.Is(err, context.Canceled) {
logger.New().Info("Redis Obsolete Stop")
return nil
}
if !errors.Is(err, redis.Nil) && !errors.Is(err, redis.ErrClosed) {
t.captureException(ctx, err)
}
continue
}
if len(result) <= 0 {
continue
}
messages := result[0].Messages
datas := make([]map[string]any, 0, len(messages))
ids := make([]string, 0, len(messages))

for _, v := range messages {
if v.ID != "" {
ids = append(ids, v.ID)
datas = append(datas, v.Values)
}
}

// delete fail logs
t.asyncPool.Execute(ctx, func(ctx context.Context) error {
return t.client.ZRemRangeByScore(ctx, t.failKey, "0", cast.ToString(time.Now().UnixMilli())).Err()
})

// delete success logs
t.asyncPool.Execute(ctx, func(ctx context.Context) error {
return t.client.ZRemRangeByScore(ctx, t.successKey, "0", cast.ToString(time.Now().UnixMilli())).Err()
})
if err := t.logJob.Obsoletes(ctx, datas); err != nil {
t.captureException(ctx, err)
continue
}
if _, err := t.client.TxPipelined(ctx, func(pipeliner redis.Pipeliner) error {
pipeliner.XAck(ctx, key, logGroup, ids...)
pipeliner.XDel(ctx, key, ids...)
return nil
}); err != nil {
t.captureException(ctx, err)
}

}
}

Expand Down Expand Up @@ -286,7 +340,7 @@ func (t *RedisBroker) Delete(ctx context.Context, key string) error {

func (t *RedisBroker) checkStatus(ctx context.Context, channel, id string) (*Message, error) {

key := strings.Join([]string{t.prefix, "status", id}, ":")
key := MakeStatusKey(t.prefix, channel, id)
for {
select {
case <-ctx.Done():
Expand Down Expand Up @@ -318,9 +372,12 @@ func (t *RedisBroker) enqueue(ctx context.Context, msg *Message, dynamicOn bool)

// record status, after idempotency check, before publish
msg.Status = StatusPrepare
if err := t.logJob.Archives(ctx, *msg); err != nil {
return err
if err := t.Archive(ctx, msg); err != nil {

}
// if err := t.logJob.Archives(ctx, *msg); err != nil {
// return err
// }

_, err := t.client.TxPipelined(ctx, func(pipeliner redis.Pipeliner) error {
message := msg.ToMap()
Expand Down Expand Up @@ -348,8 +405,8 @@ func (t *RedisBroker) enqueue(ctx context.Context, msg *Message, dynamicOn bool)

// publish success
msg.Status = StatusPublished
if err := t.logJob.Archives(ctx, *msg); err != nil {
return err
if err := t.Archive(ctx, msg); err != nil {

}

return nil
Expand Down Expand Up @@ -454,10 +511,10 @@ func (t *RedisBroker) dynamicConsuming(subType subscribeType, channel string, su
// monitor signal
t.waitSignal(cancel)
t.once.Do(func() {
err := t.logJob.Obsoletes(ctx)
if err != nil {
t.captureException(ctx, err)
}
// err := t.logJob.Obsoletes(ctx)
// if err != nil {
// t.captureException(ctx, err)
// }

t.asyncPool.Execute(ctx, func(ctx context.Context) error {
return t.filter.Delete(ctx, MakeFilter(t.prefix))
Expand Down Expand Up @@ -559,11 +616,7 @@ func (t *RedisBroker) startConsuming(ctx context.Context) {
}

t.asyncPool.Execute(ctx, func(ctx context.Context) error {
return t.logJob.Obsoletes(ctx)
})

t.asyncPool.Execute(ctx, func(ctx context.Context) error {
return t.filter.Delete(ctx, MakeFilter(t.prefix))
return t.Obsolete(ctx, nil)
})

logger.New().Info("Beanq Start")
Expand All @@ -572,9 +625,6 @@ func (t *RedisBroker) startConsuming(ctx context.Context) {
}

func (t *RedisBroker) worker(ctx context.Context, handle IHandle) error {
if err := handle.Check(ctx); err != nil {
return err
}
t.asyncPool.Execute(ctx, func(ctx context.Context) error {
handle.Process(ctx)
return nil
Expand Down
Loading

0 comments on commit 2a96e33

Please sign in to comment.