-
Notifications
You must be signed in to change notification settings - Fork 0
/
linker.go
277 lines (245 loc) · 8.14 KB
/
linker.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
package dndm
import (
"context"
"log/slog"
"reflect"
"sync"
"github.com/itohio/dndm/errors"
)
// IntentWrapperFunc is a type of function intended to wrap or modify an IntentInternal object.
// It accepts an IntentInternal as input and returns a possibly modified IntentInternal and an error.
// The primary use case for this function is to provide a mechanism to alter or augment the behavior
// of an Intent object at runtime, such as adding logging, validation, or other cross-cutting concerns.
//
// Parameters:
//
// intent - The IntentInternal to wrap or modify.
//
// Returns:
//
// IntentInternal - The wrapped or modified IntentInternal.
// error - An error if something goes wrong during the wrapping/modification process.
type IntentWrapperFunc func(IntentInternal) (IntentInternal, error)
// InterestWrapperFunc is a type of function designed to wrap or modify an InterestInternal object.
// Similar to IntentWrapperFunc, it takes an InterestInternal as input and returns a potentially
// modified InterestInternal and an error. This function type facilitates dynamic alterations to
// the behavior of Interest objects, enabling enhancements such as security checks, data enrichment,
// or custom event handling to be injected transparently.
//
// Parameters:
//
// interest - The InterestInternal to wrap or modify.
//
// Returns:
//
// InterestInternal - The wrapped or modified InterestInternal.
// error - An error if there is a failure in the wrapping/modification process.
type InterestWrapperFunc func(InterestInternal) (InterestInternal, error)
// Link represents a dynamic connection between an Intent and an Interest.
// It manages the lifecycle and interactions between linked entities, ensuring
// that actions on one entity are reflected on the other. For example, closing
// an Intent should also close the linked Interest.
type Linker struct {
Base
log *slog.Logger
size int
mu sync.Mutex
intents map[string]IntentInternal
interests map[string]InterestInternal
onAddIntent func(intent Intent) error
onAddInterest func(interest Interest) error
beforeLink func(Intent, Interest) error
links map[string]*Link
}
// NewLinker creates a new Linker with provided context, logger, size, and callback functions.
// It initializes the Linker with empty maps for intents and interests and sets up a beforeLink function
// if not provided.
func NewLinker(ctx context.Context, log *slog.Logger, size int, addIntent func(intent Intent) error, addInterest func(interest Interest) error, beforeLink func(Intent, Interest) error) *Linker {
if beforeLink == nil {
beforeLink = func(i1 Intent, i2 Interest) error {
return nil
}
}
return &Linker{
Base: NewBaseWithCtx(ctx),
log: log,
size: size,
intents: make(map[string]IntentInternal),
interests: make(map[string]InterestInternal),
links: make(map[string]*Link),
onAddIntent: addIntent,
onAddInterest: addInterest,
beforeLink: beforeLink,
}
}
// Close shuts down the Linker and cleans up all resources associated with it.
// It iterates through all intents and interests, closes them, and finally clears the collections.
func (t *Linker) Close() error {
errarr := make([]error, 0, len(t.links))
t.AddOnClose(func() {
for _, i := range t.intents {
err := i.Close()
if err != nil {
errarr = append(errarr, err)
}
}
for _, i := range t.interests {
err := i.Close()
if err != nil {
errarr = append(errarr, err)
}
}
t.intents = nil
t.interests = nil
})
t.Base.Close()
return errors.Join(errarr...)
}
// Intent returns an intent identified by a route if found.
func (t *Linker) Intent(route Route) (Intent, bool) {
t.mu.Lock()
intent, ok := t.intents[route.ID()]
t.mu.Unlock()
return intent, ok
}
// AddIntent registers a new intent by its route. If a matching intent is found, it attempts to link it
// with a corresponding interest if available.
func (t *Linker) AddIntent(route Route) (Intent, error) {
t.mu.Lock()
defer t.mu.Unlock()
return t.addIntentLocked(route, func(ii IntentInternal) (IntentInternal, error) { return ii, nil })
}
// AddIntentWithWrapper acts like AddIntent but allows the intent to be modified or wrapped by a provided function
// before being added to the Linker.
func (t *Linker) AddIntentWithWrapper(route Route, wrapper IntentWrapperFunc) (Intent, error) {
t.mu.Lock()
defer t.mu.Unlock()
return t.addIntentLocked(route, wrapper)
}
func (t *Linker) addIntentLocked(route Route, wrapper IntentWrapperFunc) (Intent, error) {
id := route.ID()
intent, ok := t.intents[id]
if ok {
return intent, nil
}
intent = NewIntent(t.ctx, route, t.size)
intent.OnClose(func() {
t.RemoveIntent(route)
})
intent, err := wrapper(intent)
if err != nil {
return nil, err
}
if err := t.onAddIntent(intent); err != nil {
return nil, err
}
t.intents[id] = intent
if interest, ok := t.interests[id]; ok {
t.link(route, intent, interest)
}
return intent, nil
}
// RemoveIntent removes an intent by its route and cleans up any associated links.
func (t *Linker) RemoveIntent(route Route) error {
t.mu.Lock()
t.unlink(route)
delete(t.intents, route.ID())
t.mu.Unlock()
return nil
}
// Interest retrieves an interest by its route if it exists within the Linker.
func (t *Linker) Interest(route Route) (Interest, bool) {
t.mu.Lock()
interest, ok := t.interests[route.ID()]
t.mu.Unlock()
return interest, ok
}
// AddInterest registers a new interest by its route. If a matching interest is found, it attempts to link it
// with a corresponding intent if available.
func (t *Linker) AddInterest(route Route) (Interest, error) {
t.mu.Lock()
defer t.mu.Unlock()
return t.addInterestLocked(route, func(ii InterestInternal) (InterestInternal, error) { return ii, nil })
}
// AddInterestWithWrapper acts like AddInterest but allows the interest to be modified or wrapped by a provided function
// before being added to the Linker.
func (t *Linker) AddInterestWithWrapper(route Route, wrapper InterestWrapperFunc) (Interest, error) {
t.mu.Lock()
defer t.mu.Unlock()
return t.addInterestLocked(route, wrapper)
}
func (t *Linker) addInterestLocked(route Route, wrapper InterestWrapperFunc) (Interest, error) {
id := route.ID()
interest, ok := t.interests[id]
if ok {
if link, ok := t.links[id]; ok {
link.Notify()
}
return interest, nil
}
interest = NewInterest(t.Ctx(), route, t.size)
interest.OnClose(func() {
t.RemoveInterest(route)
})
interest, err := wrapper(interest)
if err != nil {
return nil, err
}
if err := t.onAddInterest(interest); err != nil {
return nil, err
}
t.interests[id] = interest
if intent, ok := t.intents[id]; ok {
t.link(route, intent, interest)
}
return interest, nil
}
// RemoveInterest removes an interest by its route and cleans up any associated links.
func (t *Linker) RemoveInterest(route Route) error {
t.mu.Lock()
_, ok := t.interests[route.ID()]
var link *Link
if ok {
link = t.unlink(route)
delete(t.interests, route.ID())
}
t.mu.Unlock()
if link != nil {
link.Close()
}
return nil
}
// link establishes a link between an intent and an interest if they share the same route and no existing link is found.
// It logs the linking process and handles errors in linking, including invalid routes.
func (t *Linker) link(route Route, intent IntentInternal, interest InterestInternal) error {
if !route.Equal(intent.Route()) || !route.Equal(interest.Route()) {
return errors.ErrInvalidRoute
}
if _, ok := t.links[route.ID()]; ok {
return nil
}
err := t.beforeLink(intent, interest)
if err != nil {
return err
}
link := NewLink(t.ctx, intent, interest)
link.AddOnClose(func() {
t.mu.Lock()
_ = t.unlink(route)
t.mu.Unlock()
})
t.links[route.ID()] = link
link.Link()
t.log.Info("linked", "route", intent.Route(), "intent", reflect.TypeOf(intent), "interest", reflect.TypeOf(interest), "C", interest.C())
return nil
}
// unlink removes a link associated with a given route, logs the unlinking process, and returns the removed link.
func (t *Linker) unlink(route Route) *Link {
link, ok := t.links[route.ID()]
if !ok {
return nil
}
delete(t.links, route.ID())
t.log.Info("unlinked", "route", route)
return link
}