This repository has been archived by the owner on Dec 21, 2023. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 12
/
Copy pathsubscriptionsource.go
147 lines (129 loc) · 4.71 KB
/
subscriptionsource.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
package subscriptionsource
import (
"context"
"github.com/keptn/go-utils/pkg/sdk/connector/types"
"sync"
"time"
"github.com/benbjohnson/clock"
"github.com/keptn/go-utils/pkg/api/models"
api "github.com/keptn/go-utils/pkg/api/utils"
"github.com/keptn/go-utils/pkg/sdk/connector/logger"
)
type SubscriptionSource interface {
Start(context.Context, types.RegistrationData, chan []models.EventSubscription, chan error, *sync.WaitGroup) error
Register(integration models.Integration) (string, error)
Stop() error
}
var _ SubscriptionSource = FixedSubscriptionSource{}
var _ SubscriptionSource = (*UniformSubscriptionSource)(nil)
// UniformSubscriptionSource represents a source for uniform subscriptions
type UniformSubscriptionSource struct {
uniformAPI api.UniformV1Interface
clock clock.Clock
fetchInterval time.Duration
logger logger.Logger
quitC chan struct{}
}
func (s *UniformSubscriptionSource) Register(integration models.Integration) (string, error) {
integrationID, err := s.uniformAPI.RegisterIntegration(integration)
if err != nil {
return "", err
}
return integrationID, nil
}
// WithFetchInterval specifies the interval the subscription source should
// use when polling for new subscriptions
func WithFetchInterval(interval time.Duration) func(s *UniformSubscriptionSource) {
return func(s *UniformSubscriptionSource) {
s.fetchInterval = interval
}
}
// WithLogger sets the logger to use
func WithLogger(logger logger.Logger) func(s *UniformSubscriptionSource) {
return func(s *UniformSubscriptionSource) {
s.logger = logger
}
}
// New creates a new UniformSubscriptionSource
func New(uniformAPI api.UniformV1Interface, options ...func(source *UniformSubscriptionSource)) *UniformSubscriptionSource {
s := &UniformSubscriptionSource{
uniformAPI: uniformAPI,
clock: clock.New(),
fetchInterval: time.Second * 5,
quitC: make(chan struct{}, 1),
logger: logger.NewDefaultLogger()}
for _, o := range options {
o(s)
}
return s
}
// Start triggers the execution of the UniformSubscriptionSource
func (s *UniformSubscriptionSource) Start(ctx context.Context, registrationData types.RegistrationData, subscriptionChannel chan []models.EventSubscription, errC chan error, wg *sync.WaitGroup) error {
s.logger.Debugf("UniformSubscriptionSource: Starting to fetch subscriptions for Integration ID %s", registrationData.ID)
ticker := s.clock.Ticker(s.fetchInterval)
go func() {
s.ping(registrationData.ID, subscriptionChannel)
for {
select {
case <-ctx.Done():
wg.Done()
return
case <-ticker.C:
s.ping(registrationData.ID, subscriptionChannel)
case <-s.quitC:
wg.Done()
return
}
}
}()
return nil
}
func (s *UniformSubscriptionSource) Stop() error {
s.quitC <- struct{}{}
return nil
}
func (s *UniformSubscriptionSource) ping(registrationId string, subscriptionChannel chan []models.EventSubscription) {
s.logger.Debugf("UniformSubscriptionSource: Renewing Integration ID %s", registrationId)
updatedIntegrationData, err := s.uniformAPI.Ping(registrationId)
if err != nil {
s.logger.Errorf("Unable to ping control plane: %v", err)
return
}
s.logger.Debugf("UniformSubscriptionSource: Ping successful, got %d subscriptions for %s", len(updatedIntegrationData.Subscriptions), registrationId)
subscriptionChannel <- updatedIntegrationData.Subscriptions
}
// FixedSubscriptionSource can be used to use a fixed list of subscriptions rather than
// consulting the Keptn API for subscriptions.
// This is useful when you want to consume events from an event source, but NOT register
// as an Keptn integration to the control plane
type FixedSubscriptionSource struct {
fixedSubscriptions []models.EventSubscription
}
// WithFixedSubscriptions adds a fixed list of subscriptions to the FixedSubscriptionSource
func WithFixedSubscriptions(subscriptions ...models.EventSubscription) func(s *FixedSubscriptionSource) {
return func(s *FixedSubscriptionSource) {
s.fixedSubscriptions = subscriptions
}
}
// NewFixedSubscriptionSource creates a new instance of FixedSubscriptionSource
func NewFixedSubscriptionSource(options ...func(source *FixedSubscriptionSource)) *FixedSubscriptionSource {
fss := &FixedSubscriptionSource{fixedSubscriptions: []models.EventSubscription{}}
for _, o := range options {
o(fss)
}
return fss
}
func (s FixedSubscriptionSource) Start(ctx context.Context, data types.RegistrationData, c chan []models.EventSubscription, errC chan error, wg *sync.WaitGroup) error {
go func() {
c <- s.fixedSubscriptions
<-ctx.Done()
wg.Done()
}()
return nil
}
func (s FixedSubscriptionSource) Register(integration models.Integration) (string, error) {
return "", nil
}
func (s FixedSubscriptionSource) Stop() error {
return nil
}