Skip to content

Commit

Permalink
Merge pull request #1672 from influxdb/data-node-recovery
Browse files Browse the repository at this point in the history
Add index tracking to metastore.
  • Loading branch information
benbjohnson committed Feb 21, 2015
2 parents 0be9a1f + 4dbd154 commit 4e5b6e9
Show file tree
Hide file tree
Showing 3 changed files with 153 additions and 114 deletions.
28 changes: 27 additions & 1 deletion metastore.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package influxdb

import (
"encoding/binary"
"fmt"
"time"
"unsafe"

Expand Down Expand Up @@ -72,10 +73,22 @@ func (m *metastore) mustView(fn func(*metatx) error) (err error) {
}

// mustUpdate executes a function in the context of a read-write transaction.
// This function requires an index so that it can track the last commit from the broker.
// Panics if a disk or system error occurs. Return error from the fn for validation errors.
func (m *metastore) mustUpdate(fn func(*metatx) error) (err error) {
func (m *metastore) mustUpdate(index uint64, fn func(*metatx) error) (err error) {
if e := m.update(func(tx *metatx) error {
curr := tx.index()
assert(index == 0 || index > curr, "metastore index replay: meta=%d, index=%d", curr, index)

// Execute function passed in.
err = fn(tx)

// Update index, if set.
if index > 0 {
if e := tx.setIndex(index); e != nil {
return fmt.Errorf("set index: %s", e)
}
}
return nil
}); e != nil {
panic("metastore update: " + e.Error())
Expand All @@ -101,6 +114,19 @@ func (tx *metatx) setID(v uint64) error {
return tx.Bucket([]byte("Meta")).Put([]byte("id"), u64tob(v))
}

// index returns the index stored in the meta bucket.
func (tx *metatx) index() (index uint64) {
if v := tx.Bucket([]byte("Meta")).Get([]byte("index")); v != nil {
index = btou64(v)
}
return
}

// setIndex sets the index stored in the meta bucket.
func (tx *metatx) setIndex(v uint64) error {
return tx.Bucket([]byte("Meta")).Put([]byte("index"), u64tob(v))
}

// mustNextSequence generates a new sequence for a key in the meta bucket.
func (tx *metatx) mustNextSequence(key []byte) (id uint64) {
// Retrieve the previous value, if it exists.
Expand Down
Loading

0 comments on commit 4e5b6e9

Please sign in to comment.