-
Notifications
You must be signed in to change notification settings - Fork 2
/
facilitators.go
292 lines (243 loc) · 11.3 KB
/
facilitators.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
package restate
import (
"time"
"github.com/restatedev/sdk-go/internal/futures"
"github.com/restatedev/sdk-go/internal/options"
"github.com/restatedev/sdk-go/internal/rand"
"github.com/restatedev/sdk-go/internal/state"
)
// Rand returns a random source which will give deterministic results for a given invocation
// The source wraps the stdlib rand.Rand but with some extra helper methods
// This source is not safe for use inside .Run()
func Rand(ctx Context) *rand.Rand {
return ctx.inner().Rand()
}
// Sleep for the duration d. Can return a terminal error in the case where the invocation was cancelled mid-sleep.
func Sleep(ctx Context, d time.Duration) error {
return ctx.inner().Sleep(d)
}
// After is an alternative to [Sleep] which allows you to complete other tasks concurrently
// with the sleep. This is particularly useful when combined with [Select] to race between
// the sleep and other Selectable operations.
func After(ctx Context, d time.Duration) AfterFuture {
return ctx.inner().After(d)
}
// After is a handle on a Sleep operation which allows you to do other work concurrently
// with the sleep.
type AfterFuture interface {
// Done blocks waiting on the remaining duration of the sleep.
// It is *not* safe to call this in a goroutine - use Context.Select if you want to wait on multiple
// results at once. Can return a terminal error in the case where the invocation was cancelled mid-sleep,
// hence Done() should always be called, even after using Context.Select.
Done() error
futures.Selectable
}
// Service gets a Service request client by service and method name
func Service[O any](ctx Context, service string, method string, options ...options.ClientOption) Client[any, O] {
return outputClient[O]{ctx.inner().Service(service, method, options...)}
}
// Service gets a Service send client by service and method name
func ServiceSend(ctx Context, service string, method string, options ...options.ClientOption) SendClient[any] {
return ctx.inner().Service(service, method, options...)
}
// Object gets an Object request client by service name, key and method name
func Object[O any](ctx Context, service string, key string, method string, options ...options.ClientOption) Client[any, O] {
return outputClient[O]{ctx.inner().Object(service, key, method, options...)}
}
// ObjectSend gets an Object send client by service name, key and method name
func ObjectSend(ctx Context, service string, key string, method string, options ...options.ClientOption) SendClient[any] {
return ctx.inner().Object(service, key, method, options...)
}
// Workflow gets a Workflow request client by service name, workflow ID and method name
func Workflow[O any](ctx Context, service string, workflowID string, method string, options ...options.ClientOption) Client[any, O] {
return outputClient[O]{ctx.inner().Workflow(service, workflowID, method, options...)}
}
// WorkflowSend gets a Workflow send client by service name, workflow ID and method name
func WorkflowSend[O any](ctx Context, service string, workflowID string, method string, options ...options.ClientOption) SendClient[any] {
return ctx.inner().Workflow(service, workflowID, method, options...)
}
// Client represents all the different ways you can invoke a particular service-method.
type Client[I any, O any] interface {
// RequestFuture makes a call and returns a handle on a future response
RequestFuture(input I, options ...options.RequestOption) ResponseFuture[O]
// Request makes a call and blocks on getting the response
Request(input I, options ...options.RequestOption) (O, error)
SendClient[I]
}
// SendClient allows making one-way invocations
type SendClient[I any] interface {
// Send makes a one-way call which is executed in the background
Send(input I, options ...options.SendOption)
}
type outputClient[O any] struct {
inner *state.Client
}
func (t outputClient[O]) Request(input any, options ...options.RequestOption) (output O, err error) {
err = t.inner.RequestFuture(input, options...).Response(&output)
return
}
func (t outputClient[O]) RequestFuture(input any, options ...options.RequestOption) ResponseFuture[O] {
return responseFuture[O]{t.inner.RequestFuture(input, options...)}
}
func (t outputClient[O]) Send(input any, options ...options.SendOption) {
t.inner.Send(input, options...)
}
type client[I any, O any] struct {
inner Client[any, O]
}
// WithRequestType is primarily intended to be called from generated code, to provide
// type safety of input types. In other contexts it's generally less cumbersome to use [Object] and [Service],
// as the output type can be inferred.
func WithRequestType[I any, O any](inner Client[any, O]) Client[I, O] {
return client[I, O]{inner}
}
func (t client[I, O]) Request(input I, options ...options.RequestOption) (output O, err error) {
output, err = t.inner.RequestFuture(input, options...).Response()
return
}
func (t client[I, O]) RequestFuture(input I, options ...options.RequestOption) ResponseFuture[O] {
return t.inner.RequestFuture(input, options...)
}
func (t client[I, O]) Send(input I, options ...options.SendOption) {
t.inner.Send(input, options...)
}
// ResponseFuture is a handle on a potentially not-yet completed outbound call.
type ResponseFuture[O any] interface {
// Response blocks on the response to the call and returns it or the associated error
// It is *not* safe to call this in a goroutine - use Context.Select if you
// want to wait on multiple results at once.
Response() (O, error)
futures.Selectable
}
type responseFuture[O any] struct {
state.DecodingResponseFuture
}
func (t responseFuture[O]) Response() (output O, err error) {
err = t.DecodingResponseFuture.Response(&output)
return
}
// Awakeable returns a Restate awakeable; a 'promise' to a future
// value or error, that can be resolved or rejected by other services.
func Awakeable[T any](ctx Context, options ...options.AwakeableOption) AwakeableFuture[T] {
return awakeable[T]{ctx.inner().Awakeable(options...)}
}
// AwakeableFuture is a 'promise' to a future value or error, that can be resolved or rejected by other services.
type AwakeableFuture[T any] interface {
// Id returns the awakeable ID, which can be stored or sent to a another service
Id() string
// Result blocks on receiving the result of the awakeable, returning the value it was
// resolved or otherwise returning the error it was rejected with.
// It is *not* safe to call this in a goroutine - use Context.Select if you
// want to wait on multiple results at once.
Result() (T, error)
futures.Selectable
}
type awakeable[T any] struct {
state.DecodingAwakeable
}
func (t awakeable[T]) Result() (output T, err error) {
err = t.DecodingAwakeable.Result(&output)
return
}
// ResolveAwakeable allows an awakeable (not necessarily from this service) to be
// resolved with a particular value.
func ResolveAwakeable[T any](ctx Context, id string, value T, options ...options.ResolveAwakeableOption) {
ctx.inner().ResolveAwakeable(id, value, options...)
}
// ResolveAwakeable allows an awakeable (not necessarily from this service) to be
// rejected with a particular error.
func RejectAwakeable(ctx Context, id string, reason error) {
ctx.inner().RejectAwakeable(id, reason)
}
func Select(ctx Context, futs ...futures.Selectable) Selector {
return ctx.inner().Select(futs...)
}
type Selectable = futures.Selectable
// Selector is an iterator over a list of blocking Restate operations that are running
// in the background.
type Selector interface {
// Remaining returns whether there are still operations that haven't been returned by Select().
// There will always be exactly the same number of results as there were operations
// given to Context.Select
Remaining() bool
// Select blocks on the next completed operation or returns nil if there are none left
Select() futures.Selectable
}
// Run runs the function (fn), storing final results (including terminal errors)
// durably in the journal, or otherwise for transient errors stopping execution
// so Restate can retry the invocation. Replays will produce the same value, so
// all non-deterministic operations (eg, generating a unique ID) *must* happen
// inside Run blocks.
func Run[T any](ctx Context, fn func(ctx RunContext) (T, error), options ...options.RunOption) (output T, err error) {
err = ctx.inner().Run(func(ctx state.RunContext) (any, error) {
return fn(ctx)
}, &output, options...)
return
}
// Get gets the value for a key. If there is no associated value with key, the zero value is returned.
// To check explicitly for this case pass a pointer eg *string as T.
// If the invocation was cancelled while obtaining the state (only possible if eager state is disabled),
// a cancellation error is returned.
func Get[T any](ctx ObjectSharedContext, key string, options ...options.GetOption) (output T, err error) {
_, err = ctx.inner().Get(key, &output, options...)
return output, err
}
// If the invocation was cancelled while obtaining the state (only possible if eager state is disabled),
// a cancellation error is returned.
func Keys(ctx ObjectSharedContext) ([]string, error) {
return ctx.inner().Keys()
}
// Key retrieves the key for this virtual object invocation. This is a no-op and is
// always safe to call.
func Key(ctx ObjectSharedContext) string {
return ctx.inner().Key()
}
// Set sets a value against a key, using the provided codec (defaults to JSON)
func Set[T any](ctx ObjectContext, key string, value T, options ...options.SetOption) {
ctx.inner().Set(key, value, options...)
}
// Clear deletes a key
func Clear(ctx ObjectContext, key string) {
ctx.inner().Clear(key)
}
// ClearAll drops all stored state associated with this Object key
func ClearAll(ctx ObjectContext) {
ctx.inner().ClearAll()
}
// Promise returns a named Restate durable Promise that can be resolved or rejected during the workflow execution.
// The promise is bound to the workflow and will be persisted across suspensions and retries.
func Promise[T any](ctx WorkflowSharedContext, name string, options ...options.PromiseOption) DurablePromise[T] {
return durablePromise[T]{ctx.inner().Promise(name, options...)}
}
type DurablePromise[T any] interface {
// Result blocks on receiving the result of the Promise, returning the value it was
// resolved or otherwise returning the error it was rejected with or a cancellation error.
// It is *not* safe to call this in a goroutine - use Context.Select if you
// want to wait on multiple results at once.
Result() (T, error)
// Peek returns the value of the promise if it has been resolved. If it has not been resolved,
// the zero value of T is returned. To check explicitly for this case pass a pointer eg *string as T.
// If the promise was rejected or the invocation was cancelled, an error is returned.
Peek() (T, error)
// Resolve resolves the promise with a value, returning an error if it was already completed
// or if the invocation was cancelled.
Resolve(value T) error
// Reject rejects the promise with an error, returning an error if it was already completed
// or if the invocation was cancelled.
Reject(reason error) error
futures.Selectable
}
type durablePromise[T any] struct {
state.DecodingPromise
}
func (t durablePromise[T]) Result() (output T, err error) {
err = t.DecodingPromise.Result(&output)
return
}
func (t durablePromise[T]) Peek() (output T, err error) {
_, err = t.DecodingPromise.Peek(&output)
return
}
func (t durablePromise[T]) Resolve(value T) (err error) {
return t.DecodingPromise.Resolve(value)
}