-
Notifications
You must be signed in to change notification settings - Fork 0
/
pgx_collect.go
529 lines (439 loc) · 17 KB
/
pgx_collect.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
package pgx_collect
import (
"fmt"
"reflect"
"github.com/jackc/pgx/v5"
. "github.com/zolstein/pgx-collect/internal"
)
// Scanner provides an interface for scanning a row into a receiver value,
// while doing type-checking and initialization only once per query.
//
// The collecting functions in this file call Initialize once, before reading
// any row, and then call ScanRowInto once for each row they consume.
type Scanner[T any] interface {
// Initialize sets up the Scanner and validates it against the rows.
// Initialize must be called once before ScanRowInto.
// The collecting functions stop and return the error if it is non-nil.
Initialize(rows pgx.Rows) error
// ScanRowInto scans the row into the receiver.
// The collecting functions stop and return the error if it is non-nil.
ScanRowInto(receiver *T, rows pgx.Rows) error
}
// RowSpec defines a specification for scanning rows into a given type.
//
// Note on the weird type definitions:
// RowSpec returns a struct containing a private function pointer because:
//  1. We want to be able to manage the lifecycle of the returned value inside the
//     collection functions. (E.g. we may decide to pool scanners for reuse.)
//     In order to do this safely, we need to ensure the Scanner returned by
//     the inner function isn't referenced outside of the collecting function.
//     This requires that we have a function returning a scanner.
//  2. Returning a struct allows us to extend this value in the future if necessary.
//     By comparison, returning a function would not, and would require a (technically)
//     breaking change if the type needed to change in the future.
//  3. Returning a non-exported type lets us hide as many details as possible from
//     the public API and restrict the only valid usage to passing the spec
//     function itself to a collecting function of this package, e.g.:
//     CollectRows(rows, RowTo[Type])
//  4. RowSpec is itself a function to provide a place to put the generic type
//     parameter. rowSpecRes cannot be a constant, since then there would be no
//     place to put the type parameter. Since rowSpecRes cannot be constructed in
//     client code (by design) it can't be applied when creating a struct value.
type RowSpec[T any] func() rowSpecRes[T]
// rowSpecRes is the unexported value returned by a RowSpec. It carries the
// factory function that produces the Scanner[T] used to scan rows.
type rowSpecRes[T any] struct {
// fn builds the Scanner; the collecting functions invoke it as into().fn().
fn func() Scanner[T]
}
// Scanner evaluates the spec and returns the Scanner produced by its factory
// function. The result can be passed to the *Using collecting functions.
func (rs RowSpec[T]) Scanner() Scanner[T] {
	res := rs()
	return res.fn()
}
// AppendRows reads every row in rows, scans each one as described by into,
// and appends the scanned values to slice. It returns the extended slice, or
// a nil slice and the error if scanning fails.
func AppendRows[T any, S ~[]T](slice S, rows pgx.Rows, into RowSpec[T]) (S, error) {
	scanner := into().fn()
	return AppendRowsUsing(slice, rows, scanner)
}
// AppendRowsUsing iterates through rows, scanning each row with the scanner,
// and appending the results into a slice of T.
//
// rows is always closed before AppendRowsUsing returns. On success the
// extended slice is returned. On any error (from scanner.Initialize,
// scanner.ScanRowInto or rows.Err) a nil slice and the error are returned,
// and every element this call wrote into the backing array of the input
// slice is reset to the zero value of T.
func AppendRowsUsing[T any, S ~[]T](
	slice S,
	rows pgx.Rows,
	scanner Scanner[T],
) (s S, err error) {
	defer rows.Close()

	if err := scanner.Initialize(rows); err != nil {
		return nil, err
	}

	startingLen := len(slice)
	// input is a view over the caller's whole backing array, including the
	// spare capacity that append may fill before it has to reallocate.
	input := slice[:cap(slice)]

	defer func() {
		// Named return values guarantee this err is the err that's actually returned.
		if err == nil {
			return
		}
		// An error occurred. Values scanned so far may be visible through the
		// caller's slice (in its spare capacity), which could cause problems,
		// especially if T contains pointers which are kept alive.
		//
		// If append never reallocated, the written region is
		// input[startingLen:len(slice)]. If it did reallocate, the input
		// array was filled to capacity first, so the written region is
		// input[startingLen:]. Clamping to len(input) covers both cases.
		end := len(slice)
		if end > len(input) {
			end = len(input)
		}
		var zero T
		for i := startingLen; i < end; i++ {
			input[i] = zero
		}
	}()

	for rows.Next() {
		// Grow the slice first so the row is scanned directly into its final
		// location instead of being scanned into a temporary and then copied.
		i := len(slice)
		var zero T
		slice = append(slice, zero)
		if err := scanner.ScanRowInto(&slice[i], rows); err != nil {
			return nil, err
		}
	}

	if err := rows.Err(); err != nil {
		return nil, err
	}

	return slice, nil
}
// CollectRows reads every row in rows, scans each one as described by into,
// and returns the scanned values as a new slice of T.
func CollectRows[T any](rows pgx.Rows, into RowSpec[T]) ([]T, error) {
	scanner := into().fn()
	return CollectRowsUsing(rows, scanner)
}
// CollectRowsUsing reads every row in rows, scans each one with scanner, and
// returns the scanned values as a new slice of T. When rows yields no rows
// and no error occurs, the result is an empty, non-nil slice.
func CollectRowsUsing[T any](rows pgx.Rows, scanner Scanner[T]) ([]T, error) {
	// Start from an empty but non-nil slice so a successful call never
	// returns nil.
	empty := make([]T, 0)
	return AppendRowsUsing(empty, rows, scanner)
}
// CollectOneRow scans the first row in rows as described by into and returns
// the result. If rows contains no rows, the returned error satisfies
// errors.Is(err, pgx.ErrNoRows).
// CollectOneRow is to CollectRows as QueryRow is to Query.
func CollectOneRow[T any](rows pgx.Rows, into RowSpec[T]) (T, error) {
	scanner := into().fn()
	return CollectOneRowUsing(rows, scanner)
}
// CollectOneRowUsing scans the first row in rows with scanner and returns the
// result. If rows contains no rows, the returned error satisfies
// errors.Is(err, pgx.ErrNoRows). Any rows after the first are ignored.
// CollectOneRowUsing is to CollectRowsUsing as QueryRow is to Query.
//
// rows is always closed before the function returns. On any error the zero
// value of T is returned alongside it.
func CollectOneRowUsing[T any](rows pgx.Rows, scanner Scanner[T]) (T, error) {
	defer rows.Close()

	var zero T

	if err := scanner.Initialize(rows); err != nil {
		return zero, err
	}

	if !rows.Next() {
		// Distinguish "the query failed" from "the query returned nothing".
		if err := rows.Err(); err != nil {
			return zero, err
		}
		return zero, pgx.ErrNoRows
	}

	var value T
	if err := scanner.ScanRowInto(&value, rows); err != nil {
		return zero, err
	}

	// Close explicitly before checking Err so that an error surfaced while
	// closing is reported instead of being lost in the deferred Close.
	rows.Close()
	if err := rows.Err(); err != nil {
		return zero, err
	}

	return value, nil
}
// CollectExactlyOneRow scans the first row in rows as described by into and
// returns the result.
//   - If no rows are found, the returned error satisfies errors.Is(err, pgx.ErrNoRows).
//   - If more than one row is found, the returned error satisfies
//     errors.Is(err, pgx.ErrTooManyRows).
func CollectExactlyOneRow[T any](rows pgx.Rows, into RowSpec[T]) (T, error) {
	scanner := into().fn()
	return CollectExactlyOneRowUsing(rows, scanner)
}
// CollectExactlyOneRowUsing scans the first row in rows with scanner and
// returns the result.
//   - If no rows are found, the returned error satisfies errors.Is(err, pgx.ErrNoRows).
//   - If more than one row is found, the returned error satisfies
//     errors.Is(err, pgx.ErrTooManyRows).
//
// rows is always closed before the function returns. On any error the zero
// value of T is returned alongside it.
func CollectExactlyOneRowUsing[T any](rows pgx.Rows, scanner Scanner[T]) (T, error) {
	defer rows.Close()

	var zero T

	if err := scanner.Initialize(rows); err != nil {
		return zero, err
	}

	if !rows.Next() {
		// Distinguish "the query failed" from "the query returned nothing".
		if err := rows.Err(); err != nil {
			return zero, err
		}
		return zero, pgx.ErrNoRows
	}

	var value T
	if err := scanner.ScanRowInto(&value, rows); err != nil {
		return zero, err
	}

	// A second row violates the "exactly one" contract; discard the value.
	if rows.Next() {
		return zero, pgx.ErrTooManyRows
	}

	// Close explicitly before checking Err so that an error surfaced while
	// closing is reported instead of being lost in the deferred Close.
	rows.Close()
	if err := rows.Err(); err != nil {
		return zero, err
	}

	return value, nil
}
// simpleScanner is a Scanner that scans a row into a single value of type T
// by handing the receiver pointer to rows.Scan.
type simpleScanner[T any] struct {
// scanTargets is the one-element argument slice passed to rows.Scan.
// It is allocated on the first ScanRowInto call and reused afterwards.
scanTargets []any
}
// Compile-time check that *simpleScanner implements Scanner.
var _ Scanner[struct{}] = (*simpleScanner[struct{}])(nil)
// newSimpleScanner creates a Scanner that scans a row into a T.
func newSimpleScanner[T any]() Scanner[T] {
	return new(simpleScanner[T])
}
// newAddrOfSimpleScanner creates a Scanner that scans a row into a *T by
// wrapping a simple scanner for T in an address scanner.
func newAddrOfSimpleScanner[T any]() Scanner[*T] {
	inner := newSimpleScanner[T]()
	return newAddrScanner[T](inner)
}
// RowTo scans a row into a T.
// Pass the function itself (e.g. RowTo[int]) as the RowSpec argument of a
// collecting function.
func RowTo[T any]() rowSpecRes[T] {
	var spec rowSpecRes[T]
	spec.fn = newSimpleScanner[T]
	return spec
}
// RowToAddrOf scans a row into a *T.
// Pass the function itself (e.g. RowToAddrOf[int]) as the RowSpec argument of
// a collecting function.
func RowToAddrOf[T any]() rowSpecRes[*T] {
	var spec rowSpecRes[*T]
	spec.fn = newAddrOfSimpleScanner[T]
	return spec
}
// Initialize is a no-op: a simple scanner needs no per-query setup.
// It always returns nil.
func (*simpleScanner[T]) Initialize(_ pgx.Rows) error {
	return nil
}
// ScanRowInto scans the current row into receiver by passing it as the only
// destination to rows.Scan, and returns the error from rows.Scan.
func (rs *simpleScanner[T]) ScanRowInto(receiver *T, rows pgx.Rows) error {
	if rs.scanTargets != nil {
		// Reuse the argument slice allocated on the first call.
		rs.scanTargets[0] = receiver
	} else {
		rs.scanTargets = []any{receiver}
	}
	return rows.Scan(rs.scanTargets...)
}
// positionalStructScanner is a Scanner that scans a row into a struct T,
// matching row columns to struct fields by position. Its ScanRowInto method
// comes from the embedded structScanner; only Initialize is specific to it.
type positionalStructScanner[T any] struct {
structScanner[T]
}
// Compile-time check that *positionalStructScanner implements Scanner.
var _ Scanner[struct{}] = (*positionalStructScanner[struct{}])(nil)
// newPositionalStructScanner creates a Scanner that scans a T from a row.
// T must be a struct. T must have the same number of public fields as row has fields.
// The row and T fields will be matched by position.
// If the "db" struct tag is "-" then the field will be ignored.
func newPositionalStructScanner[T any]() Scanner[T] {
	return new(positionalStructScanner[T])
}
// newAddrOfPositionalStructScanner creates a Scanner that scans a *T from a row.
// T must be a struct. T must have the same number of public fields as row has fields.
// The row and T fields will be matched by position.
// If the "db" struct tag is "-" then the field will be ignored.
func newAddrOfPositionalStructScanner[T any]() Scanner[*T] {
	inner := newPositionalStructScanner[T]()
	return newAddrScanner[T](inner)
}
// RowToStructByPos scans a row into a T.
// T must be a struct. T must have the same number of public fields as row has fields.
// The row and T fields will be matched by position.
// If the "db" struct tag is "-" then the field will be ignored.
func RowToStructByPos[T any]() rowSpecRes[T] {
	var spec rowSpecRes[T]
	spec.fn = newPositionalStructScanner[T]
	return spec
}
// RowToAddrOfStructByPos scans a row into a *T.
// T must be a struct. T must have the same number of public fields as row has fields.
// The row and T fields will be matched by position.
// If the "db" struct tag is "-" then the field will be ignored.
func RowToAddrOfStructByPos[T any]() rowSpecRes[*T] {
	var spec rowSpecRes[*T]
	spec.fn = newAddrOfPositionalStructScanner[T]
	return spec
}
// Initialize checks that T is a struct and computes, via
// GetStructRowFieldsByPos, the struct fields that the columns described by
// rows.FieldDescriptions() are scanned into.
//
// It returns an error if T is not a struct, or the error reported by
// GetStructRowFieldsByPos.
func (rs *positionalStructScanner[T]) Initialize(rows pgx.Rows) error {
	typ := typeFor[T]()
	if typ.Kind() != reflect.Struct {
		// Name is empty for unnamed types such as *Foo, []Foo or map[K]V;
		// fall back to String so the message still identifies the type.
		name := typ.Name()
		if name == "" {
			name = typ.String()
		}
		return fmt.Errorf("generic type '%s' is not a struct", name)
	}

	var err error
	rs.scanFields, err = GetStructRowFieldsByPos(typ, rows.FieldDescriptions())
	return err
}
// namedStructScanner holds the logic shared by the strict and lax by-name
// struct scanners. It does not implement Scanner itself: it only provides
// the unexported initialize method, and ScanRowInto via the embedded
// structScanner.
type namedStructScanner[T any] struct {
structScanner[T]
}
// strictNamedStructScanner is the by-name struct scanner whose Initialize
// calls initialize with lax set to false.
type strictNamedStructScanner[T any] struct {
namedStructScanner[T]
}
// laxNamedStructScanner is the by-name struct scanner whose Initialize
// calls initialize with lax set to true.
type laxNamedStructScanner[T any] struct {
namedStructScanner[T]
}
// Compile-time checks that both variants implement Scanner.
var (
_ Scanner[struct{}] = (*strictNamedStructScanner[struct{}])(nil)
_ Scanner[struct{}] = (*laxNamedStructScanner[struct{}])(nil)
)
// newNamedStructScanner creates a Scanner that scans a row into a T.
// T must be a struct. T must have the same number of named public fields as row has fields.
// The row and T fields will be matched by name. The match is case-insensitive.
// The database column name can be overridden with a "db" struct tag.
// If the "db" struct tag is "-" then the field will be ignored.
func newNamedStructScanner[T any]() Scanner[T] {
	return new(strictNamedStructScanner[T])
}
// newLaxNamedStructScanner creates a Scanner that scans a row into a T.
// T must be a struct. T must have greater than or equal number of named public fields as row has fields.
// The row and T fields will be matched by name. The match is case-insensitive.
// The database column name can be overridden with a "db" struct tag.
// If the "db" struct tag is "-" then the field will be ignored.
func newLaxNamedStructScanner[T any]() Scanner[T] {
	return new(laxNamedStructScanner[T])
}
// newAddrOfNamedStructScanner creates a Scanner that scans a row into a *T.
// T must be a struct. T must have the same number of named public fields as row has fields.
// The row and T fields will be matched by name. The match is case-insensitive.
// The database column name can be overridden with a "db" struct tag.
// If the "db" struct tag is "-" then the field will be ignored.
func newAddrOfNamedStructScanner[T any]() Scanner[*T] {
	inner := newNamedStructScanner[T]()
	return newAddrScanner[T](inner)
}
// newAddrOfLaxNamedStructScanner creates a Scanner that scans a row into a *T.
// T must be a struct. T must have greater than or equal number of named public fields as row has fields.
// The row and T fields will be matched by name. The match is case-insensitive.
// The database column name can be overridden with a "db" struct tag.
// If the "db" struct tag is "-" then the field will be ignored.
func newAddrOfLaxNamedStructScanner[T any]() Scanner[*T] {
	inner := newLaxNamedStructScanner[T]()
	return newAddrScanner[T](inner)
}
// RowToStructByName scans a row into a T.
// T must be a struct. T must have the same number of named public fields as row has fields.
// The row and T fields will be matched by name. The match is case-insensitive.
// The database column name can be overridden with a "db" struct tag.
// If the "db" struct tag is "-" then the field will be ignored.
func RowToStructByName[T any]() rowSpecRes[T] {
	var spec rowSpecRes[T]
	spec.fn = newNamedStructScanner[T]
	return spec
}
// RowToAddrOfStructByName scans a row into a *T.
// T must be a struct. T must have the same number of named public fields as row has fields.
// The row and T fields will be matched by name. The match is case-insensitive.
// The database column name can be overridden with a "db" struct tag.
// If the "db" struct tag is "-" then the field will be ignored.
func RowToAddrOfStructByName[T any]() rowSpecRes[*T] {
	var spec rowSpecRes[*T]
	spec.fn = newAddrOfNamedStructScanner[T]
	return spec
}
// RowToStructByNameLax scans a row into a T.
// T must be a struct. T must have greater than or equal number of named public fields as row has fields.
// The row and T fields will be matched by name. The match is case-insensitive.
// The database column name can be overridden with a "db" struct tag.
// If the "db" struct tag is "-" then the field will be ignored.
func RowToStructByNameLax[T any]() rowSpecRes[T] {
	var spec rowSpecRes[T]
	spec.fn = newLaxNamedStructScanner[T]
	return spec
}
// RowToAddrOfStructByNameLax scans a row into a *T.
// T must be a struct. T must have greater than or equal number of named public fields as row has fields.
// The row and T fields will be matched by name. The match is case-insensitive.
// The database column name can be overridden with a "db" struct tag.
// If the "db" struct tag is "-" then the field will be ignored.
func RowToAddrOfStructByNameLax[T any]() rowSpecRes[*T] {
	var spec rowSpecRes[*T]
	spec.fn = newAddrOfLaxNamedStructScanner[T]
	return spec
}
// Initialize prepares the scanner in strict mode: a missing field reported
// by the shared initialize logic is returned as an error.
func (rs *strictNamedStructScanner[T]) Initialize(rows pgx.Rows) error {
	const lax = false
	return rs.initialize(rows, lax)
}
// Initialize prepares the scanner in lax mode: a missing field reported by
// the shared initialize logic is tolerated.
func (rs *laxNamedStructScanner[T]) Initialize(rows pgx.Rows) error {
	const lax = true
	return rs.initialize(rows, lax)
}
// initialize checks that T is a struct and computes, via
// GetStructRowFieldsByName, the struct fields that the columns described by
// rows.FieldDescriptions() are scanned into.
//
// When lax is false and GetStructRowFieldsByName reports a missing field,
// a "cannot find field" error naming it is returned; that check takes
// precedence over any error GetStructRowFieldsByName returned. When lax is
// true the missing field is ignored. Otherwise the error from
// GetStructRowFieldsByName (possibly nil) is returned.
func (rs *namedStructScanner[T]) initialize(rows pgx.Rows, lax bool) error {
	typ := typeFor[T]()
	if typ.Kind() != reflect.Struct {
		// Name is empty for unnamed types such as *Foo, []Foo or map[K]V;
		// fall back to String so the message still identifies the type.
		name := typ.Name()
		if name == "" {
			name = typ.String()
		}
		return fmt.Errorf("generic type '%s' is not a struct", name)
	}

	var (
		missingField string
		err          error
	)
	rs.scanFields, missingField, err = GetStructRowFieldsByName(typ, rows.FieldDescriptions())
	if !lax && missingField != "" {
		return fmt.Errorf("cannot find field %s in returned row", missingField)
	}
	return err
}
// typeFor returns the reflect.Type that represents the type argument T.
// It also works for interface types, because it goes through a *T rather
// than through a value of type T.
func typeFor[T any]() reflect.Type {
	// Same technique as reflect.TypeFor.
	// TODO: Use reflect.TypeFor once support for Go versions < 1.22 is dropped.
	var ptr *T
	return reflect.TypeOf(ptr).Elem()
}
// structScanner encapsulates the logic to scan a row into fields of a struct.
// It is embedded by the positional and by-name struct scanners, whose
// Initialize methods fill in scanFields.
type structScanner[T any] struct {
// scanFields describes the struct fields that receive the row's columns.
scanFields StructRowFields
// scanTargets is the argument slice passed to rows.Scan. It is allocated
// on the first ScanRowInto call and reused for later rows.
scanTargets []any
}
// ScanRowInto scans the current row into the struct that receiver points to.
// It has scanFields fill the reusable scan-target slice for receiver, then
// passes those targets to rows.Scan and returns its error.
func (rs *structScanner[T]) ScanRowInto(receiver *T, rows pgx.Rows) error {
	targets := rs.scanTargets
	if targets == nil {
		// First row: allocate the target slice once and keep it for reuse.
		targets = make([]any, rs.scanFields.NumFields())
		rs.scanTargets = targets
	}
	rs.scanFields.Populate(ReceiverFromPointer(receiver), targets)
	return rows.Scan(targets...)
}
// addrScanner wraps a Scanner[T] into a Scanner[*T].
type addrScanner[T any] struct {
// wrapped is the underlying scanner that fills the newly allocated T.
wrapped Scanner[T]
}
// newAddrScanner creates a Scanner for *T that delegates to wrapped, a
// Scanner for T.
func newAddrScanner[T any](wrapped Scanner[T]) Scanner[*T] {
	s := new(addrScanner[T])
	s.wrapped = wrapped
	return s
}

// Compile-time check that *addrScanner implements Scanner.
var _ Scanner[*struct{}] = (*addrScanner[struct{}])(nil)
// Initialize delegates to the wrapped scanner and returns its error.
func (rs *addrScanner[T]) Initialize(rows pgx.Rows) error {
	inner := rs.wrapped
	return inner.Initialize(rows)
}
// ScanRowInto allocates a new T, stores its address in *receiver, and has
// the wrapped scanner scan the current row into it. The pointer is stored
// even if the wrapped scanner returns an error.
func (rs *addrScanner[T]) ScanRowInto(receiver **T, rows pgx.Rows) error {
	target := new(T)
	*receiver = target
	return rs.wrapped.ScanRowInto(target, rows)
}
// mapScanner is a stateless Scanner that scans a row into a map[string]any
// keyed by column name.
type mapScanner struct{}
// Compile-time check that *mapScanner implements Scanner.
var _ Scanner[map[string]any] = (*mapScanner)(nil)
// newMapScanner creates a Scanner that scans a row into a map.
func newMapScanner() Scanner[map[string]any] {
	return new(mapScanner)
}
// RowToMap scans a row into a map[string]any keyed by column name.
func RowToMap() rowSpecRes[map[string]any] {
	var spec rowSpecRes[map[string]any]
	spec.fn = newMapScanner
	return spec
}
// Initialize is a no-op: the map scanner needs no per-query setup.
// It always returns nil.
func (*mapScanner) Initialize(_ pgx.Rows) error {
	return nil
}
// ScanRowInto decodes the current row with rows.Values and stores a newly
// allocated map in *receiver, keyed by the Name of each field description
// and holding the corresponding value.
//
// If rows.Values fails, its error is returned and *receiver is left
// untouched.
func (*mapScanner) ScanRowInto(receiver *map[string]any, rows pgx.Rows) error {
	values, err := rows.Values()
	if err != nil {
		return err
	}

	// Fetch the field descriptions once instead of once per column: they do
	// not depend on the loop index.
	fields := rows.FieldDescriptions()

	row := make(map[string]any, len(values))
	for i, v := range values {
		row[fields[i].Name] = v
	}
	*receiver = row
	return nil
}
// adapterScanner lets a pgx.RowToFunc[T] be used as a Scanner[T]. It is the
// function itself under a new type, so it carries no additional state.
type adapterScanner[T any] pgx.RowToFunc[T]
// Compile-time check that adapterScanner implements Scanner.
var _ Scanner[struct{}] = (adapterScanner[struct{}])(nil)
// Adapt adapts a RowToFunc (the input to pgx.CollectRows, etc.) into a RowSpec.
//
// This simplifies migration from pgx.CollectRows to pgx_collect.CollectRows by
// allowing code-bases with custom RowToFunc implementations to replace
//
//	pgx.CollectRows(rows, myRowToFunc)
//
// with:
//
//	pgxc.CollectRows(rows, pgxc.Adapt(myRowToFunc))
//
// This is only recommended for custom implementations of RowToFunc during
// a migration process. Implementations of RowToFunc in pgx have native
// pgx_collect implementations that are more concise and efficient. Custom
// implementations can likely be refactored to work with RowToCustom which
// should be somewhat more efficient for queries over multiple rows.
func Adapt[T any](rowTo pgx.RowToFunc[T]) RowSpec[T] {
	// The scanner is rowTo itself, converted to the adapter type.
	newScanner := func() Scanner[T] {
		return adapterScanner[T](rowTo)
	}
	return func() rowSpecRes[T] {
		return rowSpecRes[T]{fn: newScanner}
	}
}
// Initialize is a no-op: an adapted RowToFunc needs no per-query setup.
// It always returns nil.
func (adapterScanner[T]) Initialize(_ pgx.Rows) error {
	return nil
}
// ScanRowInto calls the adapted RowToFunc with rows, stores the value it
// returns in *receiver (even when the function also returns an error), and
// returns the function's error.
func (rs adapterScanner[T]) ScanRowInto(receiver *T, rows pgx.Rows) error {
	value, err := rs(rows)
	*receiver = value
	return err
}