-
Notifications
You must be signed in to change notification settings - Fork 0
/
watcher.go
92 lines (85 loc) · 2.2 KB
/
watcher.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
package jq
import (
"context"
"errors"
"fmt"
"strconv"
"time"
"github.com/go-redis/redis/v8"
)
type Status struct {
IsRunning bool
Process int
Success int
Failed int
Dropped int
Total int
}
// Status of the queue
func (q *Queue) Status() (*Status, error) {
status := new(Status)
res, err := q.rdb.HGetAll(context.Background(), q.name+":count").Result()
if errors.Is(err, redis.Nil) {
return status, nil
} else if err != nil {
return nil, err
}
if res["process"] != "" {
status.Process, err = strconv.Atoi(res["process"])
if err != nil {
return nil, fmt.Errorf("invalid count %s", res["process"])
}
}
if res["success"] != "" {
status.Success, err = strconv.Atoi(res["success"])
if err != nil {
return nil, fmt.Errorf("invalid count %s", res["success"])
}
}
if res["failed"] != "" {
status.Failed, err = strconv.Atoi(res["failed"])
if err != nil {
return nil, fmt.Errorf("invalid count %s", res["failed"])
}
}
if res["dropped"] != "" {
status.Dropped, err = strconv.Atoi(res["dropped"])
if err != nil {
return nil, fmt.Errorf("invalid count %s", res["dropped"])
}
}
status.Total = status.Success + status.Dropped
status.IsRunning = true
return status, nil
}
// count process,success,failed,dropped jobs
// the value will be cleared when idle time reached opt.Idle
func (q *Queue) count(field string) {
ctx := context.Background()
pipe := q.rdb.TxPipeline()
pipe.HIncrBy(ctx, q.name+":count", field, 1)
pipe.Set(ctx, q.name+":active", time.Now(), 0)
_, err := pipe.Exec(ctx)
if err != nil {
q.log.Errorf("job queue %s count %s failed: %s", q.name, err)
}
}
// reset the counter
func (q *Queue) reset() {
err := q.rdb.Del(context.Background(), q.name+":count").Err()
if err != nil {
q.log.Errorf("queue %s reset counter failed:%s", q.name, err)
}
err = q.rdb.Del(context.Background(), q.name+":active").Err()
if err != nil {
q.log.Errorf("queue %s reset active failed:%s", q.name, err)
}
}
func (q *Queue) activeAt() time.Time {
var res = time.Now()
err := q.rdb.Get(context.Background(), q.name+":active").Scan(&res)
if err != nil && !errors.Is(err, redis.Nil) {
q.log.Errorf("queue %s get active time failed:%s", q.name, err)
}
return res
}