forked from benbjohnson/litestream
-
Notifications
You must be signed in to change notification settings - Fork 0
/
replica_client.go
519 lines (448 loc) · 16.4 KB
/
replica_client.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
package litestream
import (
"context"
"fmt"
"io"
"log"
"os"
"time"
"github.com/benbjohnson/litestream/internal"
"github.com/pierrec/lz4/v4"
)
// DefaultRestoreParallelism is the default number of WAL files downloaded in
// parallel during a restore. Restore falls back to it when
// RestoreOptions.Parallelism is less than 1, and NewRestoreOptions uses it as
// the initial Parallelism value.
const DefaultRestoreParallelism = 8
// ReplicaClient represents a client used to connect to a Replica. It exposes
// generation, snapshot, and WAL segment operations; every operation except
// Type takes a context as its first argument.
type ReplicaClient interface {
// Type returns the type of client.
Type() string
// Generations returns a list of available generations.
Generations(ctx context.Context) ([]string, error)
// DeleteGeneration deletes all snapshots & WAL segments within a generation.
DeleteGeneration(ctx context.Context, generation string) error
// Snapshots returns an iterator of all snapshots within a generation on
// the replica.
Snapshots(ctx context.Context, generation string) (SnapshotIterator, error)
// WriteSnapshot writes LZ4 compressed snapshot data to the replica at a
// given index within a generation. Returns metadata for the snapshot.
WriteSnapshot(ctx context.Context, generation string, index int, r io.Reader) (SnapshotInfo, error)
// DeleteSnapshot deletes a snapshot with the given generation & index.
DeleteSnapshot(ctx context.Context, generation string, index int) error
// SnapshotReader returns a reader that contains LZ4 compressed snapshot
// data for a given index within a generation. Returns an os.ErrNotExist
// error if the snapshot does not exist.
// NOTE(review): this comment previously named os.ErrNotFound, which is not
// defined in package os; confirm implementations return os.ErrNotExist.
SnapshotReader(ctx context.Context, generation string, index int) (io.ReadCloser, error)
// WALSegments returns an iterator of all WAL segments within a generation
// on the replica.
WALSegments(ctx context.Context, generation string) (WALSegmentIterator, error)
// WriteWALSegment writes an LZ4 compressed WAL segment at a given position.
// Returns metadata for the written segment.
WriteWALSegment(ctx context.Context, pos Pos, r io.Reader) (WALSegmentInfo, error)
// DeleteWALSegments deletes one or more WAL segments at the given positions.
DeleteWALSegments(ctx context.Context, a []Pos) error
// WALSegmentReader returns a reader that contains an LZ4 compressed WAL
// segment at a given index/offset within a generation. Returns an
// os.ErrNotExist error if the WAL segment does not exist.
// NOTE(review): this comment previously named os.ErrNotFound, which is not
// defined in package os; confirm implementations return os.ErrNotExist.
WALSegmentReader(ctx context.Context, pos Pos) (io.ReadCloser, error)
}
// FindSnapshotForIndex returns the highest snapshot index within a generation
// that is at or before the given index.
//
// It returns ErrNoSnapshots when the generation has no snapshots at all, and
// a descriptive error when snapshots exist but every one of them lies after
// index. Errors from listing or closing the iterator are wrapped.
func FindSnapshotForIndex(ctx context.Context, client ReplicaClient, generation string, index int) (int, error) {
	itr, err := client.Snapshots(ctx, generation)
	if err != nil {
		return 0, fmt.Errorf("snapshots: %w", err)
	}
	defer itr.Close()

	// best stays at -1 until a snapshot at or before index is seen; seen
	// counts every snapshot, including those that are skipped.
	best, seen := -1, 0
	for itr.Next() {
		seen++
		if idx := itr.Snapshot().Index; idx <= index && idx >= best {
			best = idx
		}
	}

	// Closing explicitly surfaces any error encountered during iteration.
	if closeErr := itr.Close(); closeErr != nil {
		return 0, fmt.Errorf("snapshot iteration: %w", closeErr)
	}

	switch {
	case seen == 0:
		return 0, ErrNoSnapshots
	case best == -1:
		return 0, fmt.Errorf("no snapshots available at or before index %s", FormatIndex(index))
	}
	return best, nil
}
// GenerationTimeBounds returns the creation time & last updated time of a
// generation.
//
// The bounds start out as the snapshot time bounds; the upper bound is then
// extended if the newest WAL segment is more recent than the newest snapshot.
// Any error from SnapshotTimeBounds, including ErrNoSnapshots, is returned
// as-is. A generation without WAL segments is not an error.
func GenerationTimeBounds(ctx context.Context, client ReplicaClient, generation string) (createdAt, updatedAt time.Time, err error) {
	// Snapshots define the baseline range.
	createdAt, updatedAt, err = SnapshotTimeBounds(ctx, client, generation)
	if err != nil {
		return createdAt, updatedAt, err
	}

	// WAL segments can only push the upper bound forward.
	_, walMax, walErr := WALTimeBounds(ctx, client, generation)
	if walErr != nil && walErr != ErrNoWALSegments {
		return createdAt, updatedAt, walErr
	}
	if walMax.After(updatedAt) {
		updatedAt = walMax
	}
	return createdAt, updatedAt, nil
}
// SnapshotTimeBounds returns the earliest and latest snapshot creation times
// within a generation. Returns ErrNoSnapshots if the generation has no
// snapshots. Errors from listing or closing the iterator are wrapped.
func SnapshotTimeBounds(ctx context.Context, client ReplicaClient, generation string) (min, max time.Time, err error) {
	itr, err := client.Snapshots(ctx, generation)
	if err != nil {
		return min, max, fmt.Errorf("snapshots: %w", err)
	}
	defer itr.Close()

	// Track the oldest and newest creation time across every snapshot.
	count := 0
	for itr.Next() {
		count++
		ts := itr.Snapshot().CreatedAt
		if min.IsZero() || ts.Before(min) {
			min = ts
		}
		if max.IsZero() || ts.After(max) {
			max = ts
		}
	}

	// Closing explicitly surfaces any error encountered during iteration.
	if closeErr := itr.Close(); closeErr != nil {
		return min, max, fmt.Errorf("snapshot iteration: %w", closeErr)
	}

	if count == 0 {
		return min, max, ErrNoSnapshots
	}
	return min, max, nil
}
// WALTimeBounds returns the earliest and latest WAL segment creation times
// within a generation. Returns ErrNoWALSegments if the generation has no WAL
// segments. Errors from listing or closing the iterator are wrapped.
func WALTimeBounds(ctx context.Context, client ReplicaClient, generation string) (min, max time.Time, err error) {
	itr, err := client.WALSegments(ctx, generation)
	if err != nil {
		return min, max, fmt.Errorf("wal segments: %w", err)
	}
	defer itr.Close()

	// Track the oldest and newest creation time across every segment.
	count := 0
	for itr.Next() {
		count++
		ts := itr.WALSegment().CreatedAt
		if min.IsZero() || ts.Before(min) {
			min = ts
		}
		if max.IsZero() || ts.After(max) {
			max = ts
		}
	}

	// Closing explicitly surfaces any error encountered during iteration.
	if closeErr := itr.Close(); closeErr != nil {
		return min, max, fmt.Errorf("wal segment iteration: %w", closeErr)
	}

	if count == 0 {
		return min, max, ErrNoWALSegments
	}
	return min, max, nil
}
// FindLatestGeneration returns the generation with the most recent update
// time for a client.
//
// Returns ErrNoGeneration if no generation is selected, which includes the
// case where the client lists no generations. A failure to compute the time
// bounds of any single generation aborts the search with a wrapped error.
func FindLatestGeneration(ctx context.Context, client ReplicaClient) (generation string, err error) {
	generations, err := client.Generations(ctx)
	if err != nil {
		return "", fmt.Errorf("generations: %w", err)
	}

	// Keep whichever generation has the latest update time seen so far.
	var latest time.Time
	for _, g := range generations {
		_, updatedAt, boundsErr := GenerationTimeBounds(ctx, client, g)
		if boundsErr != nil {
			return "", fmt.Errorf("generation time bounds: %w", boundsErr)
		}
		if updatedAt.After(latest) {
			latest, generation = updatedAt, g
		}
	}

	if generation == "" {
		return "", ErrNoGeneration
	}
	return generation, nil
}
// ReplicaClientTimeBounds returns the time range covered by a replica client
// across all of its generations: the earliest creation time and the latest
// update time found in any generation.
//
// Returns ErrNoGeneration if the client lists no generations. A failure to
// compute the bounds of any single generation is returned as a wrapped error.
func ReplicaClientTimeBounds(ctx context.Context, client ReplicaClient) (min, max time.Time, err error) {
	generations, err := client.Generations(ctx)
	if err != nil {
		return min, max, fmt.Errorf("generations: %w", err)
	}
	if len(generations) == 0 {
		return min, max, ErrNoGeneration
	}

	// Widen the outer bounds with each generation's own range.
	for _, g := range generations {
		start, end, boundsErr := GenerationTimeBounds(ctx, client, g)
		if boundsErr != nil {
			return min, max, fmt.Errorf("generation time bounds: %w", boundsErr)
		}
		if min.IsZero() || start.Before(min) {
			min = start
		}
		if max.IsZero() || end.After(max) {
			max = end
		}
	}
	return min, max, nil
}
// FindIndexByTimestamp returns the highest index at or before a given
// point-in-time within a generation: the larger of the snapshot index and the
// WAL index found for that timestamp.
//
// Returns ErrNoSnapshots (unwrapped) if the generation has no snapshots. A
// generation without WAL segments is not an error; the snapshot index is used.
func FindIndexByTimestamp(ctx context.Context, client ReplicaClient, generation string, timestamp time.Time) (index int, err error) {
	snapIdx, err := FindSnapshotIndexByTimestamp(ctx, client, generation, timestamp)
	switch {
	case err == ErrNoSnapshots:
		return 0, err
	case err != nil:
		return 0, fmt.Errorf("max snapshot index: %w", err)
	}

	// Missing WAL segments are tolerated; walIdx is then whatever the lookup
	// returned alongside ErrNoWALSegments.
	walIdx, err := FindWALIndexByTimestamp(ctx, client, generation, timestamp)
	if err != nil && err != ErrNoWALSegments {
		return 0, fmt.Errorf("max wal index: %w", err)
	}

	// Pick whichever of the two indexes is further along.
	index = walIdx
	if snapIdx > walIdx {
		index = snapIdx
	}
	return index, nil
}
// FindSnapshotIndexByTimestamp returns the highest snapshot index among the
// snapshots created at or before timestamp.
// Returns ErrNoSnapshots if no snapshots exist for the generation on the replica.
//
// NOTE(review): when snapshots exist but all were created after timestamp,
// this returns index 0 with a nil error; confirm callers expect that.
func FindSnapshotIndexByTimestamp(ctx context.Context, client ReplicaClient, generation string, timestamp time.Time) (index int, err error) {
	itr, err := client.Snapshots(ctx, generation)
	if err != nil {
		return 0, fmt.Errorf("snapshots: %w", err)
	}
	defer func() { _ = itr.Close() }()

	// count tracks every snapshot seen, whether or not it qualifies.
	count := 0
	for itr.Next() {
		count++
		info := itr.Snapshot()
		if !info.CreatedAt.After(timestamp) && info.Index > index {
			index = info.Index
		}
	}

	// Closing explicitly surfaces any error encountered during iteration.
	if closeErr := itr.Close(); closeErr != nil {
		return 0, fmt.Errorf("snapshot iteration: %w", closeErr)
	}

	if count == 0 {
		return 0, ErrNoSnapshots
	}
	return index, nil
}
// FindWALIndexByTimestamp returns the highest WAL index among the segments
// created at or before timestamp.
// Returns ErrNoWALSegments if no segments exist for the generation on the replica.
//
// NOTE(review): when segments exist but all were created after timestamp,
// this returns index 0 with a nil error; confirm callers expect that.
func FindWALIndexByTimestamp(ctx context.Context, client ReplicaClient, generation string, timestamp time.Time) (index int, err error) {
	itr, err := client.WALSegments(ctx, generation)
	if err != nil {
		return 0, fmt.Errorf("wal segments: %w", err)
	}
	defer func() { _ = itr.Close() }()

	// count tracks every segment seen, whether or not it qualifies.
	count := 0
	for itr.Next() {
		count++
		info := itr.WALSegment()
		if !info.CreatedAt.After(timestamp) && info.Index > index {
			index = info.Index
		}
	}

	// Closing explicitly surfaces any error encountered during iteration.
	if closeErr := itr.Close(); closeErr != nil {
		return 0, fmt.Errorf("wal segment iteration: %w", closeErr)
	}

	if count == 0 {
		return 0, ErrNoWALSegments
	}
	return index, nil
}
// FindMaxIndexByGeneration returns the last index within a generation: the
// larger of the highest snapshot index and the highest WAL index.
//
// Returns ErrNoSnapshots (unwrapped) if the generation has no snapshots,
// since WAL files are not useful without a snapshot. A generation without WAL
// segments is not an error; the snapshot index is used.
func FindMaxIndexByGeneration(ctx context.Context, client ReplicaClient, generation string) (index int, err error) {
	snapIdx, err := FindMaxSnapshotIndexByGeneration(ctx, client, generation)
	switch {
	case err == ErrNoSnapshots:
		return 0, err
	case err != nil:
		return 0, fmt.Errorf("max snapshot index: %w", err)
	}

	// Missing WAL segments are tolerated; any other failure is fatal.
	walIdx, err := FindMaxWALIndexByGeneration(ctx, client, generation)
	if err != nil && err != ErrNoWALSegments {
		return 0, fmt.Errorf("max wal index: %w", err)
	}

	// Pick whichever of the two indexes is further along.
	index = walIdx
	if snapIdx > walIdx {
		index = snapIdx
	}
	return index, nil
}
// FindMaxSnapshotIndexByGeneration returns the highest snapshot index within
// a generation.
// Returns ErrNoSnapshots if no snapshots exist for the generation on the replica.
func FindMaxSnapshotIndexByGeneration(ctx context.Context, client ReplicaClient, generation string) (index int, err error) {
	itr, err := client.Snapshots(ctx, generation)
	if err != nil {
		return 0, fmt.Errorf("snapshots: %w", err)
	}
	defer func() { _ = itr.Close() }()

	// Scan every snapshot, remembering the largest index.
	count := 0
	for itr.Next() {
		count++
		if idx := itr.Snapshot().Index; idx > index {
			index = idx
		}
	}

	// Closing explicitly surfaces any error encountered during iteration.
	if closeErr := itr.Close(); closeErr != nil {
		return 0, fmt.Errorf("snapshot iteration: %w", closeErr)
	}

	if count == 0 {
		return 0, ErrNoSnapshots
	}
	return index, nil
}
// FindMaxWALIndexByGeneration returns the highest WAL index within a
// generation.
// Returns ErrNoWALSegments if no segments exist for the generation on the replica.
func FindMaxWALIndexByGeneration(ctx context.Context, client ReplicaClient, generation string) (index int, err error) {
	itr, err := client.WALSegments(ctx, generation)
	if err != nil {
		return 0, fmt.Errorf("wal segments: %w", err)
	}
	defer func() { _ = itr.Close() }()

	// Scan every segment, remembering the largest index.
	count := 0
	for itr.Next() {
		count++
		if idx := itr.WALSegment().Index; idx > index {
			index = idx
		}
	}

	// Closing explicitly surfaces any error encountered during iteration.
	if closeErr := itr.Close(); closeErr != nil {
		return 0, fmt.Errorf("wal segment iteration: %w", closeErr)
	}

	if count == 0 {
		return 0, ErrNoWALSegments
	}
	return index, nil
}
// Restore restores the database to the given index on a generation.
//
// The snapshot at snapshotIndex is first written to a temporary file
// ("<filename>.tmp"). WAL files between the snapshot and targetIndex are then
// downloaded and applied to it in order, and finally the temporary file is
// renamed to filename.
//
// filename and generation must be non-empty, and both indexes must be
// non-negative. filename must not already exist. If opt.Parallelism is less
// than 1, DefaultRestoreParallelism is used; if opt.Logger is nil, log output
// is discarded.
//
// If any step after the temporary path has been chosen fails, the temporary
// database is removed on a best-effort basis so a failed restore does not
// leave partial files behind.
func Restore(ctx context.Context, client ReplicaClient, filename, generation string, snapshotIndex, targetIndex int, opt RestoreOptions) (err error) {
	// Validate options.
	switch {
	case filename == "":
		return fmt.Errorf("restore path required")
	case generation == "":
		return fmt.Errorf("generation required")
	case snapshotIndex < 0:
		return fmt.Errorf("snapshot index required")
	case targetIndex < 0:
		return fmt.Errorf("target index required")
	}

	// Require a default level of parallelism.
	if opt.Parallelism < 1 {
		opt.Parallelism = DefaultRestoreParallelism
	}

	// Ensure logger exists.
	logger := opt.Logger
	if logger == nil {
		logger = log.New(io.Discard, "", 0)
	}

	// Ensure output path does not already exist.
	// If doesn't exist, also remove the journal, shm, & wal if left behind.
	if _, statErr := os.Stat(filename); statErr == nil {
		return fmt.Errorf("cannot restore, output path already exists: %s", filename)
	} else if !os.IsNotExist(statErr) {
		return statErr
	} else if rmErr := removeDBFiles(filename); rmErr != nil {
		return rmErr
	}

	// All work happens on a temporary file which is only renamed into place
	// once every WAL file has been applied.
	tmpPath := filename + ".tmp"
	defer func() {
		if err == nil {
			return
		}
		// Best-effort cleanup; the original failure is what gets reported, so
		// errors here are intentionally ignored. removeDBFiles is presumably
		// the helper that clears the journal/shm/wal companions (it is used
		// that way for filename above) and must tolerate missing files.
		_ = os.Remove(tmpPath)
		_ = removeDBFiles(tmpPath)
	}()

	// Copy snapshot to the temporary path.
	logger.Printf("%srestoring snapshot %s/%s to %s", opt.LogPrefix, generation, FormatIndex(snapshotIndex), tmpPath)
	if snapErr := RestoreSnapshot(ctx, client, tmpPath, generation, snapshotIndex, opt.Mode, opt.Uid, opt.Gid); snapErr != nil {
		return fmt.Errorf("cannot restore snapshot: %w", snapErr)
	}

	// Download & apply all WAL files between the snapshot & the target index.
	d := NewWALDownloader(client, tmpPath, generation, snapshotIndex, targetIndex)
	d.Parallelism = opt.Parallelism
	d.Mode = opt.Mode
	d.Uid, d.Gid = opt.Uid, opt.Gid

	for {
		// Read next WAL file from downloader; io.EOF marks the end.
		walIndex, walPath, nextErr := d.Next(ctx)
		if nextErr == io.EOF {
			break
		}

		// If we are only reading a single index, a WAL file may not be found.
		if _, ok := nextErr.(*WALNotFoundError); ok && snapshotIndex == targetIndex {
			logger.Printf("%sno wal files found, snapshot only", opt.LogPrefix)
			break
		} else if nextErr != nil {
			return fmt.Errorf("cannot download WAL: %w", nextErr)
		}

		// Apply WAL file.
		startTime := time.Now()
		if applyErr := ApplyWAL(ctx, tmpPath, walPath); applyErr != nil {
			return fmt.Errorf("cannot apply wal: %w", applyErr)
		}
		logger.Printf("%sapplied wal %s/%s elapsed=%s", opt.LogPrefix, generation, FormatIndex(walIndex), time.Since(startTime).String())
	}

	// Move the restored database to its final location.
	logger.Printf("%srenaming database from temporary location", opt.LogPrefix)
	return os.Rename(tmpPath, filename)
}
// RestoreOptions holds the settings accepted by Restore.
type RestoreOptions struct {
	// Mode is the file mode used for restored snapshot & WAL files.
	Mode os.FileMode

	// Uid is the owner user id used for restored snapshot & WAL files.
	Uid int

	// Gid is the owner group id used for restored snapshot & WAL files.
	Gid int

	// Parallelism specifies how many WAL files are downloaded in parallel
	// during restore. Restore substitutes DefaultRestoreParallelism for
	// values less than 1.
	Parallelism int

	// Logger receives progress messages. Restore discards output when nil.
	Logger *log.Logger

	// LogPrefix is prepended to every message written to Logger.
	LogPrefix string
}
// NewRestoreOptions returns a RestoreOptions populated with defaults: file
// mode 0600 and DefaultRestoreParallelism parallel downloads. All other
// fields are left at their zero values.
func NewRestoreOptions() RestoreOptions {
	var opt RestoreOptions
	opt.Mode = 0600
	opt.Parallelism = DefaultRestoreParallelism
	return opt
}
// RestoreSnapshot copies a snapshot from the replica client to a file.
//
// The snapshot for generation/index is read from client, LZ4-decompressed,
// and written to filename, which is created with the given mode, uid and gid.
// The file is synced before it is closed.
//
// The snapshot reader is opened before the output file is created, so a
// missing snapshot does not leave an empty file behind. If creating, copying,
// syncing or closing fails after the file exists, the partially written file
// is removed on a best-effort basis. The error returned is always the
// original failure.
func RestoreSnapshot(ctx context.Context, client ReplicaClient, filename, generation string, index int, mode os.FileMode, uid, gid int) (err error) {
	// Open the source first; nothing is touched on disk if this fails.
	rd, err := client.SnapshotReader(ctx, generation, index)
	if err != nil {
		return err
	}
	defer rd.Close()

	f, err := internal.CreateFile(filename, mode, uid, gid)
	if err != nil {
		return err
	}
	defer func() {
		if err == nil {
			return
		}
		// Drop the incomplete output. Close may already have been attempted,
		// so both errors are intentionally ignored here.
		_ = f.Close()
		_ = os.Remove(filename)
	}()

	// Stream the decompressed snapshot to disk and flush it.
	if _, copyErr := io.Copy(f, lz4.NewReader(rd)); copyErr != nil {
		return copyErr
	}
	if syncErr := f.Sync(); syncErr != nil {
		return syncErr
	}
	return f.Close()
}