-
Notifications
You must be signed in to change notification settings - Fork 10
/
zigg.go
90 lines (75 loc) · 1.81 KB
/
zigg.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
package ziggurat
import (
"context"
"errors"
"github.com/gojekfarm/ziggurat/v2/logger"
"sync"
"time"
)
// ErrCleanShutdown is the error Run returns when every consumer has returned
// without an error and the shutdown timeout did not elapse.
var ErrCleanShutdown = errors.New("clean shutdown of streams")
// Ziggurat serves as a container for message consumers to run in.
// It can be used without initialization; Run fills in defaults for the
// Logger and ShutdownTimeout fields when they are left unset:
//
//	var z ziggurat.Ziggurat
//	err := z.Run(ctx, handler, consumers...)
type Ziggurat struct {
// NOTE(review): handler is not assigned by Run or mustInit in this file;
// confirm whether it is still used elsewhere in the package.
handler Handler
// Logger receives the orchestration message Run emits on shutdown timeout.
// When nil, Run replaces it with logger.NOOP.
Logger StructuredLogger
// ShutdownTimeout is how long Run keeps waiting for consumers to return
// after ctx is done. When zero, Run sets it to 6000 milliseconds.
ShutdownTimeout time.Duration
// ErrorHandler, when non-nil, is called by Run with each non-nil error
// returned by a consumer.
ErrorHandler func(err error)
}
// Run starts every consumer in its own goroutine, passing it ctx and handler,
// and blocks until all consumers have returned or the shutdown deadline
// expires.
//
// Each non-nil error returned by a consumer is passed to z.ErrorHandler (when
// set) as soon as it is received, and collected for the final result.
//
// Once ctx is done, the consumers get z.ShutdownTimeout to return. The result
// is:
//   - an error with the message "shutdown timeout" if that deadline passes
//     while consumers are still running,
//   - the errors.Join of all consumer errors if any consumer failed,
//   - ErrCleanShutdown otherwise.
//
// Run panics (via mustInit) when no consumers are given or handler is nil.
func (z *Ziggurat) Run(ctx context.Context, handler Handler, consumers ...MessageConsumer) error {
	z.mustInit(consumers, handler)

	// Buffered to one slot per consumer so a consumer can always hand over its
	// error without blocking, even when Run has already returned because of a
	// shutdown timeout and nobody is receiving any more.
	errChan := make(chan error, len(consumers))

	var wg sync.WaitGroup
	wg.Add(len(consumers))
	for _, c := range consumers {
		go func(c MessageConsumer) {
			defer wg.Done()
			if err := c.Consume(ctx, handler); err != nil {
				errChan <- err
			}
		}(c)
	}

	// This goroutine is the only closer of errChan and it closes strictly
	// after every sender has finished, so neither a double close nor a send
	// on a closed channel is possible.
	go func() {
		wg.Wait()
		close(errChan)
	}()

	var (
		allErrs []error
		ctxDone = ctx.Done()
		timeout <-chan time.Time // nil (never ready) until ctx is done
	)
	for {
		select {
		case consErr, ok := <-errChan:
			if !ok {
				// All consumers have returned and every error was drained.
				if len(allErrs) > 0 {
					return errors.Join(allErrs...)
				}
				return ErrCleanShutdown
			}
			if z.ErrorHandler != nil {
				z.ErrorHandler(consErr)
			}
			allErrs = append(allErrs, consErr)
		case <-ctxDone:
			// Disable this case so the shutdown timer is armed exactly once;
			// the deferred Stop therefore also runs at most once.
			ctxDone = nil
			timer := time.NewTimer(z.ShutdownTimeout)
			defer timer.Stop()
			timeout = timer.C
		case <-timeout:
			z.Logger.Info("ziggurat consumer orchestration wait timeout")
			return errors.New("shutdown timeout")
		}
	}
}
// mustInit prepares z for Run: it installs the no-op logger when Logger is
// nil and a six second ShutdownTimeout when none is configured, then validates
// the arguments. It panics when consumers is empty or handler is nil.
func (z *Ziggurat) mustInit(consumers []MessageConsumer, handler Handler) {
	// Defaults are applied before validation, matching the order callers may
	// observe if they recover from the panic.
	if z.Logger == nil {
		z.Logger = logger.NOOP
	}
	if z.ShutdownTimeout == 0 {
		z.ShutdownTimeout = 6 * time.Second
	}

	switch {
	case len(consumers) == 0:
		panic("error: at least one ziggurat.MessageConsumer implementation should be provided")
	case handler == nil:
		panic("error: handler cannot be nil")
	}
}