Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Cleanup snapshot functionality. Fixes #94 #95

Merged
merged 1 commit into from
Jan 31, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
198 changes: 150 additions & 48 deletions boltz/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package boltz
import (
"bytes"
"context"
"fmt"
"github.com/google/uuid"
"github.com/michaelquigley/pfxlog"
"github.com/openziti/foundation/v2/concurrenz"
Expand All @@ -27,34 +28,54 @@ import (
"go.etcd.io/bbolt"
"io"
"os"
"path/filepath"
"strings"
"sync"
"time"
)

const (
Metadata = "meta"
SnapshotId = "snapshotId"
Metadata = "meta"
SnapshotId = "snapshotId"
ResetTimeline = "resetTimeline"
TimelineId = "timelineId"
)

type TimelineMode string

const (
TimelineModeDefault TimelineMode = "default"
TimelineModeInitIfEmpty TimelineMode = "initIfEmpty"
TimelineModeForceReset TimelineMode = "forceReset"
)

func (t TimelineMode) forceResetTimeline(timelineId *string) bool {
if t == TimelineModeForceReset {
return true
}
if t == TimelineModeInitIfEmpty && timelineId == nil {
return true
}
return false
}

type Db interface {
io.Closer
Update(ctx MutateContext, fn func(ctx MutateContext) error) error
Batch(ctx MutateContext, fn func(ctx MutateContext) error) error
View(fn func(tx *bbolt.Tx) error) error
RootBucket(tx *bbolt.Tx) (*bbolt.Bucket, error)

// Snapshot makes a copy of the bolt file
Snapshot(tx *bbolt.Tx) error
// GetDefaultSnapshotPath returns the default location for a snapshot created now
GetDefaultSnapshotPath() string

// SnapshotToMemory writes a snapshot of the database state to a memory buffer.
// The snapshot has a UUID generated and stored at rootBucket/snapshotId
// The snapshot id and snapshot are returned
SnapshotToMemory() (string, []byte, error)
// Snapshot makes a copy of the bolt file at the given location
Snapshot(path string) (string, string, error)

// SnapshotToWriter writes a snapshot of the database state to the given writer
// The snapshot has a UUID generated and stored at rootBucket/snapshotId
// The snapshot id and snapshot are returned
SnapshotToWriter(w io.Writer) (string, error)
// SnapshotInTx makes a copy of the bolt file at the given location, using an existing tx
SnapshotInTx(tx *bbolt.Tx, path string) (string, string, error)

StreamToWriter(w io.Writer) error

// GetSnapshotId returns the id of the last snapshot created/restored
GetSnapshotId() (*string, error)
Expand All @@ -73,6 +94,9 @@ type Db interface {
// AddTxCompleteListener adds a listener which is called all tx processing is complete, including
// post-commit hooks
AddTxCompleteListener(listener func(ctx MutateContext))

// GetTimelineId returns the timeline id
GetTimelineId(mode TimelineMode, idF func() (string, error)) (string, error)
}

type DbImpl struct {
Expand Down Expand Up @@ -198,38 +222,67 @@ func (self *DbImpl) RootBucket(tx *bbolt.Tx) (*bbolt.Bucket, error) {
return rootBucket, nil
}

func (self *DbImpl) Snapshot(tx *bbolt.Tx) error {
self.reloadLock.RLock()
defer self.reloadLock.RUnlock()

func (self *DbImpl) GetDefaultSnapshotPath() string {
path := self.db.Path()
path += "-" + time.Now().Format("20060102-150405")
return path
}

_, err := os.Stat(path)
func (self *DbImpl) Snapshot(path string) (string, string, error) {
var actualPath string
var snapshotId string
err := self.View(func(tx *bbolt.Tx) error {
var err error
actualPath, snapshotId, err = self.SnapshotInTx(tx, path)
return err
})
if err != nil {
if !os.IsNotExist(err) {
return err
}
} else {
pfxlog.Logger().Infof("bolt db backup already made: %v", path)
return nil
return "", "", err
}
return actualPath, snapshotId, nil
}

file, err := os.OpenFile(path, os.O_WRONLY|os.O_CREATE, 0600)
if err != nil {
return err
func (self *DbImpl) SnapshotInTx(tx *bbolt.Tx, path string) (string, string, error) {
self.reloadLock.RLock()
defer self.reloadLock.RUnlock()

now := time.Now()
dateStr := now.Format("20060102")
timeStr := now.Format("150405")
path = strings.ReplaceAll(path, "DATE", dateStr)
path = strings.ReplaceAll(path, "TIME", timeStr)
path = strings.ReplaceAll(path, "DB_DIR", filepath.Dir(self.db.Path()))
path = strings.ReplaceAll(path, "DB_FILE", filepath.Base(self.db.Path()))
plorenz marked this conversation as resolved.
Show resolved Hide resolved
path = strings.ReplaceAll(path, "__DATE__", dateStr)
path = strings.ReplaceAll(path, "__TIME__", timeStr)
path = strings.ReplaceAll(path, "__DB_DIR__", filepath.Dir(self.db.Path()))
path = strings.ReplaceAll(path, "__DB_FILE__", filepath.Base(self.db.Path()))

pfxlog.Logger().WithField("path", path).Info("snapshotting database to file")

if err := tx.CopyFile(path, 0600); err != nil {
return "", "", err
}
defer func() {
if err = file.Close(); err != nil {
pfxlog.Logger().Errorf("failed to close backup database file %v (%v)", path, err)
}
}()

_, err = tx.WriteTo(file)
snapshotId, err := self.MarkAsSnapshot(path)
if err != nil {
pfxlog.Logger().Infof("created bolt db backup: %v", path)
if rmErr := os.Remove(path); rmErr != nil {
pfxlog.Logger().WithError(rmErr).Error("failed to removed snapshot after failing to mark snapshot")
}
return "", "", fmt.Errorf("failed to update snapshot metadata: %w", err)
}
return err

return path, snapshotId, nil
}

func (self *DbImpl) StreamToWriter(w io.Writer) error {
self.reloadLock.RLock()
defer self.reloadLock.RUnlock()

return self.db.View(func(tx *bbolt.Tx) error {
_, err := tx.WriteTo(w)
return err
})
}

func (self *DbImpl) RestoreSnapshot(snapshot []byte) {
Expand Down Expand Up @@ -293,26 +346,31 @@ func (self *DbImpl) AddRestoreListener(f func()) {
self.restoreListeners.Append(f)
}

func (self *DbImpl) SnapshotToMemory() (string, []byte, error) {
buf := &bytes.Buffer{}
id, err := self.SnapshotToWriter(buf)
return id, buf.Bytes(), err
}
func (self *DbImpl) MarkAsSnapshot(path string) (string, error) {
db, err := Open(path, self.rootBucket)
if err != nil {
return "", err
}

defer func() {
_ = db.Close()
}()

func (self *DbImpl) SnapshotToWriter(w io.Writer) (string, error) {
snapshotId := uuid.NewString()
err := self.Update(nil, func(ctx MutateContext) error {
b := GetOrCreatePath(ctx.Tx(), Metadata)

err = db.Update(nil, func(ctx MutateContext) error {
tx := ctx.Tx()
b := GetOrCreatePath(tx, Metadata)
b.SetString(SnapshotId, snapshotId, nil)
if b.HasError() {
return b.GetError()
}
_, err := ctx.Tx().WriteTo(w)
return err
b.SetBool(ResetTimeline, true, nil)
return b.GetError()
})

if err != nil {
return "", err
return "", fmt.Errorf("error setting snapshot properties %w", err)
}

pfxlog.Logger().Infof("set snapshot id at %s to %s", path, snapshotId)
return snapshotId, nil
}

Expand All @@ -329,3 +387,47 @@ func (self *DbImpl) GetSnapshotId() (*string, error) {
}
return snapshotId, nil
}

func (self *DbImpl) GetTimelineId(mode TimelineMode, idF func() (string, error)) (string, error) {
timelineId := ""
err := self.Update(nil, func(ctx MutateContext) error {
b := GetOrCreatePath(ctx.Tx(), Metadata)
if b.HasError() {
return b.Err
}
resetRequired := b.GetBoolWithDefault(ResetTimeline, false)
idPointer := b.GetString(TimelineId)
pfxlog.Logger().Infof("checking timeline id. reset required? %v timelineId: %s", resetRequired, func() string {
if idPointer != nil {
return *idPointer
}
return "nil"
}())

if resetRequired || mode.forceResetTimeline(idPointer) {
id, err := idF()
if err != nil {
return err
}
timelineId = id
b.SetString(TimelineId, id, nil)
b.SetBool(ResetTimeline, false, nil)

oldTimelineId := ""
if idPointer != nil {
oldTimelineId = *idPointer
}
pfxlog.Logger().Infof("updated timeline id %s -> %s", oldTimelineId, timelineId)
return b.GetError()
}
if idPointer != nil {
timelineId = *idPointer
}
return nil
})
if err != nil {
return "", err
}
return timelineId, nil

}
2 changes: 1 addition & 1 deletion boltz/migration.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ func (m *migrationManager) Migrate(component string, targetVersion int, migrator
}

if versionP != nil && version != targetVersion {
if err := m.db.Snapshot(ctx.Tx()); err != nil {
if _, _, err = m.db.SnapshotInTx(ctx.Tx(), m.db.GetDefaultSnapshotPath()); err != nil {
return fmt.Errorf("failed to create bolt db snapshot: %w", err)
}
}
Expand Down
2 changes: 1 addition & 1 deletion version
Original file line number Diff line number Diff line change
@@ -1 +1 @@
0.3
0.4
Loading