Skip to content

Commit

Permalink
snapshot: Ensure a snapshot tx
Browse files Browse the repository at this point in the history
Previously, a snapshot could contain txs that were >= the snapshot tx.
This worked fine because, during replay, the index drops any tx that is >=
a tx already in the index.

However, it would be better for the future if snapshots actually contained
only txs up to the snapshot tx.
  • Loading branch information
thorfour committed Feb 12, 2024
1 parent 3c7e8dc commit 8d7d330
Show file tree
Hide file tree
Showing 4 changed files with 50 additions and 13 deletions.
2 changes: 1 addition & 1 deletion db.go
Original file line number Diff line number Diff line change
Expand Up @@ -749,7 +749,7 @@ func (db *DB) recover(ctx context.Context, wal WAL) error {
filepath.Join(table.db.indexDir(), table.name, table.ActiveBlock().ulid.String()), // Any index files are found at <db.indexDir>/<table.name>/<block.id>
table.schema,
table.IndexConfig(),
db.Wait,
db.HighWatermark,
index.LSMWithMetrics(table.metrics.indexMetrics),
index.LSMWithLogger(table.logger),
)
Expand Down
34 changes: 24 additions & 10 deletions index/lsm.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,9 +54,9 @@ type LSM struct {
sizes []atomic.Int64

// Options
logger log.Logger
metrics *LSMMetrics
wait func(uint64)
logger log.Logger
metrics *LSMMetrics
watermark func() uint64
}

// LSMMetrics are the metrics for an LSM index.
Expand Down Expand Up @@ -119,7 +119,7 @@ func NewLSMMetrics(reg prometheus.Registerer) *LSMMetrics {
// NewLSM returns an LSM-like index of len(levels) levels.
// watermark is a function that returns the current high watermark transaction; this is used only during compaction to ensure
// that only parts whose tx is <= the watermark are compacted, which keeps the compacted list of transactions contiguous and sorted.
func NewLSM(dir string, schema *dynparquet.Schema, levels []*LevelConfig, wait func(uint64), options ...LSMOption) (*LSM, error) {
func NewLSM(dir string, schema *dynparquet.Schema, levels []*LevelConfig, watermark func() uint64, options ...LSMOption) (*LSM, error) {
if err := validateLevels(levels); err != nil {
return nil, err
}
Expand All @@ -132,7 +132,7 @@ func NewLSM(dir string, schema *dynparquet.Schema, levels []*LevelConfig, wait f
sizes: make([]atomic.Int64, len(levels)),
compacting: sync.Mutex{},
logger: log.NewNopLogger(),
wait: wait,
watermark: watermark,
}

for _, opt := range options {
Expand Down Expand Up @@ -252,7 +252,7 @@ func (l *LSM) LevelSize(t SentinelType) int64 {
}

// Snapshot creates a snapshot of the index at the given transaction. It will call the writer function with the parts in the index that are in-memory.
func (l *LSM) Snapshot(writer func(parts.Part) error, dir string) error {
func (l *LSM) Snapshot(tx uint64, writer func(parts.Part) error, dir string) error {
l.compacting.Lock()
defer l.compacting.Unlock()

Expand Down Expand Up @@ -290,7 +290,9 @@ func (l *LSM) Snapshot(writer func(parts.Part) error, dir string) error {
return true
}

snapshotList = append(snapshotList, node.part)
if node.part.TX() <= tx {
snapshotList = append(snapshotList, node.part)
}
return true
})
if iterError != nil {
Expand Down Expand Up @@ -538,14 +540,26 @@ func (l *LSM) merge(level SentinelType) error {

compact := l.findLevel(level)

// Wait for the watermark to reach the most recent transaction.
// This ensures a sorted list of transactions.
// Find a transaction that is <= the current watermark.
// This ensures a contiguous sorted list of transactions.
if level == L0 {
compact = compact.next.Load()
if compact == nil || compact.part == nil {
return nil // nothing to compact
}
l.wait(compact.part.TX())

// Find the first part that is <= the watermark and reset the compact list to that part.
wm := l.watermark()
compact.Iterate(func(node *Node) bool {
if node.part != nil && node.sentinel != L0 {
return false
}
if node.part.TX() <= wm {
compact = node
return false
}
return true
})
}

nodeList := []*Node{}
Expand Down
25 changes: 24 additions & 1 deletion snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"github.com/polarsignals/frostdb/dynparquet"
snapshotpb "github.com/polarsignals/frostdb/gen/proto/go/frostdb/snapshot/v1alpha1"
tablepb "github.com/polarsignals/frostdb/gen/proto/go/frostdb/table/v1alpha1"
walpb "github.com/polarsignals/frostdb/gen/proto/go/frostdb/wal/v1alpha1"
"github.com/polarsignals/frostdb/index"
"github.com/polarsignals/frostdb/parts"
)
Expand Down Expand Up @@ -128,14 +129,36 @@ func (db *DB) snapshot(ctx context.Context, async bool, onSuccess func()) {
return
}

tx := db.beginRead()
tx, _, commit := db.begin()
level.Debug(db.logger).Log(
"msg", "starting a new snapshot",
"tx", tx,
)
doSnapshot := func(writeSnapshot func(context.Context, io.Writer) error) {
db.Wait(tx - 1) // Wait for all transactions to complete before taking a snapshot.
start := time.Now()
defer db.snapshotInProgress.Store(false)
defer commit()
if db.columnStore.enableWAL {
// Appending a snapshot record to the WAL is necessary,
// since the WAL expects a 1:1 relationship between txn ids
// and record indexes. This is done before the actual snapshot so
// that a failure to snapshot still appends a record to the WAL,
// avoiding a WAL deadlock.
if err := db.wal.Log(
tx,
&walpb.Record{
Entry: &walpb.Entry{
EntryType: &walpb.Entry_Snapshot_{Snapshot: &walpb.Entry_Snapshot{Tx: tx}},
},
},
); err != nil {
level.Error(db.logger).Log(
"msg", "failed to append snapshot record to WAL", "err", err,
)
return
}
}
if err := db.snapshotAtTX(ctx, tx, writeSnapshot); err != nil {
level.Error(db.logger).Log(
"msg", "failed to snapshot database", "err", err,
Expand Down
2 changes: 1 addition & 1 deletion table.go
Original file line number Diff line number Diff line change
Expand Up @@ -1035,7 +1035,7 @@ func newTableBlock(table *Table, prevTx, tx uint64, id ulid.ULID) (*TableBlock,
filepath.Join(table.db.indexDir(), table.name, id.String()), // Any index files are found at <db.indexDir>/<table.name>/<block.id>
table.schema,
table.IndexConfig(),
table.db.Wait,
table.db.HighWatermark,
index.LSMWithMetrics(table.metrics.indexMetrics),
index.LSMWithLogger(table.logger),
)
Expand Down

0 comments on commit 8d7d330

Please sign in to comment.