-
Notifications
You must be signed in to change notification settings - Fork 94
/
Copy pathriverinternaltest.go
435 lines (366 loc) · 12.8 KB
/
riverinternaltest.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
// Package riverinternaltest contains shared testing utilities for tests
// throughout the rest of the project.
package riverinternaltest
import (
"context"
"errors"
"fmt"
"log"
"log/slog"
"net/url"
"os"
"runtime"
"sync"
"testing"
"time"
"github.com/jackc/pgx/v5"
"github.com/jackc/pgx/v5/pgxpool"
"github.com/stretchr/testify/require"
"go.uber.org/goleak"
"github.com/riverqueue/river/internal/baseservice"
"github.com/riverqueue/river/internal/rivercommon"
"github.com/riverqueue/river/internal/riverinternaltest/slogtest" //nolint:depguard
"github.com/riverqueue/river/internal/testdb"
"github.com/riverqueue/river/internal/util/randutil"
"github.com/riverqueue/river/internal/util/valutil"
)
// SchedulerShortInterval is an artificially short interval for the scheduler
// that's used in the tests of various components to make sure that errored jobs
// always end up in a `retryable` state rather than `available`. Normally, the
// job executor sets `available` if the retry delay is smaller than the
// scheduler's interval. To simplify things so errors are always `retryable`,
// this time is picked to be smaller than the any retry delay that the default
// retry policy will ever produce. It's shared so we can document/explain it all
// in one place.
const SchedulerShortInterval = 500 * time.Millisecond
var (
dbManager *testdb.Manager //nolint:gochecknoglobals
// Maximum number of connections for the connection pool. This is the same
// default that pgxpool uses (the larger of 4 or number of CPUs), but made a
// variable here so that we can reference it from the test suite and not
// rely on implicit knowledge of pgxpool implementation details that could
// change in the future. If changing this value, also change the number of
// databases to create in `testdbman`.
dbPoolMaxConns = int32(max(4, runtime.NumCPU())) //nolint:gochecknoglobals
// Shared rand instance for archetypes. Random number generation is rare
// enough that it's not likely to produce much contention.
rand = randutil.NewCryptoSeededConcurrentSafeRand() //nolint:gochecknoglobals
)
// BaseServiceArchetype returns a new base service suitable for use in tests.
// Returns a new instance so that it's not possible to accidentally taint a
// shared object.
func BaseServiceArchetype(tb testing.TB) *baseservice.Archetype {
tb.Helper()
return &baseservice.Archetype{
Logger: Logger(tb),
Rand: rand,
TimeNowUTC: func() time.Time { return time.Now().UTC() },
}
}
func DatabaseConfig(databaseName string) *pgxpool.Config {
config, err := pgxpool.ParseConfig(DatabaseURL(databaseName))
if err != nil {
panic(fmt.Sprintf("error parsing database URL: %v", err))
}
config.MaxConns = dbPoolMaxConns
// Use a short conn timeout here to attempt to quickly cancel attempts that
// are unlikely to succeed even with more time:
config.ConnConfig.ConnectTimeout = 2 * time.Second
config.ConnConfig.RuntimeParams["timezone"] = "UTC"
return config
}
// DatabaseURL gets a test database URL from TEST_DATABASE_URL or falls back on
// a default pointing to `river_testdb`. If databaseName is set, it replaces the
// database in the URL, although the host and other parameters are preserved.
//
// Most of the time DatabaseConfig should be used instead of this function, but
// it may be useful in non-pgx situations like for examples showing the use of
// `database/sql`.
func DatabaseURL(databaseName string) string {
parsedURL, err := url.Parse(valutil.ValOrDefault(
os.Getenv("TEST_DATABASE_URL"),
"postgres://localhost/river_testdb?sslmode=disable"),
)
if err != nil {
panic(err)
}
if databaseName != "" {
parsedURL.Path = databaseName
}
return parsedURL.String()
}
// DiscardContinuously drains continuously out of the given channel and discards
// anything that comes out of it. Returns a stop function that should be invoked
// to stop draining. Stop must be invoked before tests finish to stop an
// internal goroutine.
func DiscardContinuously[T any](drainChan <-chan T) func() {
var (
stop = make(chan struct{})
stopped = make(chan struct{})
stopOnce sync.Once
)
go func() {
defer close(stopped)
for {
select {
case <-drainChan:
case <-stop:
return
}
}
}()
return func() {
stopOnce.Do(func() {
close(stop)
<-stopped
})
}
}
// DrainContinuously drains continuously out of the given channel and
// accumulates items that are received from it. Returns a get function that can
// be called to retrieve the current set of received items, and which will also
// cause the function to shut down and stop draining. This function must be
// invoked before tests finish to stop an internal goroutine. It's safe to call
// it multiple times.
func DrainContinuously[T any](drainChan <-chan T) func() []T {
var (
items []T
stop = make(chan struct{})
stopped = make(chan struct{})
stopOnce sync.Once
)
go func() {
defer close(stopped)
for {
select {
case item := <-drainChan:
items = append(items, item)
case <-stop:
// Drain until empty
for {
select {
case item := <-drainChan:
items = append(items, item)
default:
return
}
}
}
}
}()
return func() []T {
stopOnce.Do(func() {
close(stop)
<-stopped
})
return items
}
}
// Logger returns a logger suitable for use in tests.
//
// Defaults to informational verbosity. If env is set with `RIVER_DEBUG=true`,
// debug level verbosity is activated.
func Logger(tb testing.TB) *slog.Logger {
tb.Helper()
if os.Getenv("RIVER_DEBUG") == "1" || os.Getenv("RIVER_DEBUG") == "true" {
return slogtest.NewLogger(tb, &slog.HandlerOptions{Level: slog.LevelDebug})
}
return slogtest.NewLogger(tb, nil)
}
// Logger returns a logger suitable for use in tests which outputs only at warn
// or above. Useful in tests where particularly noisy log output is expected.
func LoggerWarn(tb testing.TB) *slog.Logger {
tb.Helper()
return slogtest.NewLogger(tb, &slog.HandlerOptions{Level: slog.LevelWarn})
}
// TestDB acquires a dedicated test database for the duration of the test. If an
// error occurs, the test fails. The test database will be automatically
// returned to the pool at the end of the test and the pgxpool will be closed.
func TestDB(ctx context.Context, tb testing.TB) *pgxpool.Pool {
tb.Helper()
ctx, cancel := context.WithTimeout(ctx, 5*time.Second)
defer cancel()
testPool, err := dbManager.Acquire(ctx)
if err != nil {
tb.Fatalf("Failed to acquire pool for test DB: %v", err)
}
tb.Cleanup(testPool.Release)
// Also close the pool just to ensure nothing is still active on it:
tb.Cleanup(testPool.Pool().Close)
return testPool.Pool()
}
// StubTime returns a pair of function for (getTime, setTime), the former of
// which is compatible with `TimeNowUTC` found in the service archetype.
// It's concurrent safe is so that a started service can access its stub
// time while the test case is setting it, and without the race detector
// triggering.
func StubTime(initialTime time.Time) (func() time.Time, func(t time.Time)) {
var (
mu sync.RWMutex
stubbedTime = &initialTime
)
getTime := func() time.Time {
mu.RLock()
defer mu.RUnlock()
return *stubbedTime
}
setTime := func(newTime time.Time) {
mu.Lock()
defer mu.Unlock()
stubbedTime = &newTime
}
return getTime, setTime
}
// A pool and mutex to protect it, lazily initialized by TestTx. Once open, this
// pool is never explicitly closed, instead closing implicitly as the package
// tests finish.
var (
dbPool *pgxpool.Pool //nolint:gochecknoglobals
dbPoolMu sync.RWMutex //nolint:gochecknoglobals
)
// TestTx starts a test transaction that's rolled back automatically as the test
// case is cleaning itself up. This can be used as a lighter weight alternative
// to `testdb.Manager` in components where it's not necessary to have many
// connections open simultaneously.
func TestTx(ctx context.Context, tb testing.TB) pgx.Tx {
tb.Helper()
tryPool := func() *pgxpool.Pool {
dbPoolMu.RLock()
defer dbPoolMu.RUnlock()
return dbPool
}
getPool := func() *pgxpool.Pool {
if dbPool := tryPool(); dbPool != nil {
return dbPool
}
dbPoolMu.Lock()
defer dbPoolMu.Unlock()
// Multiple goroutines may have passed the initial `nil` check on start
// up, so check once more to make sure pool hasn't been set yet.
if dbPool != nil {
return dbPool
}
var err error
dbPool, err = pgxpool.NewWithConfig(ctx, DatabaseConfig("river_testdb"))
require.NoError(tb, err)
return dbPool
}
tx, err := getPool().Begin(ctx)
require.NoError(tb, err)
tb.Cleanup(func() {
err := tx.Rollback(ctx)
if err == nil {
return
}
// Try to look for an error on rollback because it does occasionally
// reveal a real problem in the way a test is written. However, allow
// tests to roll back their transaction early if they like, so ignore
// `ErrTxClosed`.
if errors.Is(err, pgx.ErrTxClosed) {
return
}
// In case of a cancelled context during a database operation, which
// happens in many tests, pgx seems to not only roll back the
// transaction, but closes the connection, and returns this error on
// rollback. Allow this error since it's hard to prevent it in our flows
// that use contexts heavily.
if err.Error() == "conn closed" {
return
}
// Similar to the above, but a newly appeared error that wraps the
// above. As far as I can tell, no error variables are available to use
// with `errors.Is`.
if err.Error() == "failed to deallocate cached statement(s): conn closed" {
return
}
require.NoError(tb, err)
})
return tx
}
// TruncateRiverTables truncates River tables in the target database. This is
// for test cleanup and should obviously only be used in tests.
func TruncateRiverTables(ctx context.Context, pool *pgxpool.Pool) error {
ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
defer cancel()
tables := []string{"river_job", "river_leader", "river_queue"}
for _, table := range tables {
if _, err := pool.Exec(ctx, fmt.Sprintf("TRUNCATE TABLE %s;", table)); err != nil {
return fmt.Errorf("error truncating %q: %w", table, err)
}
}
return nil
}
// WaitOrTimeout tries to wait on the given channel for a value to come through,
// and returns it if one does, but times out after a reasonable amount of time.
// Useful to guarantee that test cases don't hang forever, even in the event of
// something wrong.
func WaitOrTimeout[T any](tb testing.TB, waitChan <-chan T) T {
tb.Helper()
timeout := rivercommon.WaitTimeout()
select {
case value := <-waitChan:
return value
case <-time.After(timeout):
require.FailNowf(tb, "WaitOrTimeout timed out",
"WaitOrTimeout timed out after waiting %s", timeout)
}
return *new(T) // unreachable
}
// WaitOrTimeoutN tries to wait on the given channel for N values to come
// through, and returns it if they do, but times out after a reasonable amount
// of time. Useful to guarantee that test cases don't hang forever, even in the
// event of something wrong.
func WaitOrTimeoutN[T any](tb testing.TB, waitChan <-chan T, numValues int) []T {
tb.Helper()
var (
timeout = rivercommon.WaitTimeout()
deadline = time.Now().Add(timeout)
values = make([]T, 0, numValues)
)
for {
select {
case value := <-waitChan:
values = append(values, value)
if len(values) >= numValues {
return values
}
case <-time.After(time.Until(deadline)):
require.FailNowf(tb, "WaitOrTimeout timed out",
"WaitOrTimeout timed out after waiting %s (received %d value(s), wanted %d)", timeout, len(values), numValues)
return nil
}
}
}
var ignoredKnownGoroutineLeaks = []goleak.Option{ //nolint:gochecknoglobals
// This goroutine contains a 500 ms uninterruptable sleep that may still be
// running by the time the test suite finishes and cause a failure. This
// might be something that should be fixed in pgx, but ignore it for the
// time being lest we have intermittent tests.
//
// We opened an issue on pgx, but it may or may not be one that gets fixed:
//
// https://github.com/jackc/pgx/issues/1641
goleak.IgnoreTopFunction("github.com/jackc/pgx/v5/pgxpool.(*Pool).backgroundHealthCheck"),
// Similar to the above, may be sitting in a sleep when the program finishes
// and there's not much we can do about it.
goleak.IgnoreAnyFunction("github.com/jackc/pgx/v5/pgxpool.(*Pool).triggerHealthCheck.func1"),
}
// WrapTestMain performs some common setup and teardown that should be shared
// amongst all packages. e.g. Configures a manager for test databases on setup,
// and checks for no goroutine leaks on teardown.
func WrapTestMain(m *testing.M) {
var err error
dbManager, err = testdb.NewManager(DatabaseConfig("river_testdb"), dbPoolMaxConns, nil, TruncateRiverTables)
if err != nil {
log.Fatal(err)
}
status := m.Run()
dbManager.Close()
if status == 0 {
if err := goleak.Find(ignoredKnownGoroutineLeaks...); err != nil {
fmt.Fprintf(os.Stderr, "goleak: Errors on successful test run: %v\n", err)
status = 1
}
}
os.Exit(status)
}