Skip to content

Commit

Permalink
agent: switch to go.etc.io/bbolt for state store
Browse files Browse the repository at this point in the history
This PR modifies the server and client agents to use `go.etc.io/bbolt` as the
implementation for their state stores.
  • Loading branch information
shoenig committed Feb 23, 2022
1 parent 16efcf4 commit b2fe196
Show file tree
Hide file tree
Showing 8 changed files with 57 additions and 58 deletions.
5 changes: 3 additions & 2 deletions .changelog/12107.txt
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ deps: Update hashicorp/raft-boltdb to v2.2.0
```

```release-note:improvement
core: Switch from boltdb/bolt to go.etcd.io/bbolt
agent: Switch from boltdb/bolt to go.etcd.io/bbolt
```

```release-note:improvement
Expand All @@ -15,5 +15,6 @@ metrics: Emit metrics regarding raft boltdb operations
```

```release-note:breaking-change
core: The server raft implementation will automatically migrate its underlying raft.db database on startup. Downgrading to a previous version of the server after upgrading it to Nomad 1.3 is not supported.
agent: The state database on both clients and servers will automatically migrate its underlying database on startup. Downgrading to a previous version of an agent after upgrading it to Nomad 1.3 is not supported.
```

7 changes: 3 additions & 4 deletions client/state/state_database.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,14 @@ import (
"path/filepath"
"time"

"github.com/boltdb/bolt"

hclog "github.com/hashicorp/go-hclog"
trstate "github.com/hashicorp/nomad/client/allocrunner/taskrunner/state"
dmstate "github.com/hashicorp/nomad/client/devicemanager/state"
"github.com/hashicorp/nomad/client/dynamicplugins"
driverstate "github.com/hashicorp/nomad/client/pluginmanager/drivermanager/state"
"github.com/hashicorp/nomad/helper/boltdd"
"github.com/hashicorp/nomad/nomad/structs"
"go.etcd.io/bbolt"
)

/*
Expand Down Expand Up @@ -139,11 +138,11 @@ func NewBoltStateDB(logger hclog.Logger, stateDir string) (StateDB, error) {
firstRun := fi == nil

// Timeout to force failure when accessing a data dir that is already in use
timeout := &bolt.Options{Timeout: 5 * time.Second}
timeout := &bbolt.Options{Timeout: 5 * time.Second}

// Create or open the boltdb state database
db, err := boltdd.Open(fn, 0600, timeout)
if err == bolt.ErrTimeout {
if err == bbolt.ErrTimeout {
return nil, fmt.Errorf("timed out while opening database, is another Nomad process accessing data_dir %s?", stateDir)
} else if err != nil {
return nil, fmt.Errorf("failed to create state database: %v", err)
Expand Down
17 changes: 8 additions & 9 deletions client/state/upgrade.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,20 +6,19 @@ import (
"fmt"
"os"

"github.com/boltdb/bolt"
hclog "github.com/hashicorp/go-hclog"
"github.com/hashicorp/go-msgpack/codec"
"github.com/hashicorp/nomad/client/dynamicplugins"
"github.com/hashicorp/nomad/helper/boltdd"
"github.com/hashicorp/nomad/nomad/structs"
)

// NeedsUpgrade returns true if the BoltDB needs upgrading or false if it is
// already up to date.
func NeedsUpgrade(bdb *bolt.DB) (upgradeTo09, upgradeTo13 bool, err error) {
func NeedsUpgrade(bdb *bbolt.DB) (upgradeTo09, upgradeTo13 bool, err error) {
upgradeTo09 = true
upgradeTo13 = true
err = bdb.View(func(tx *bolt.Tx) error {
err = bdb.View(func(tx *bbolt.Tx) error {
b := tx.Bucket(metaBucketName)
if b == nil {
// No meta bucket; upgrade
Expand Down Expand Up @@ -53,7 +52,7 @@ func NeedsUpgrade(bdb *bolt.DB) (upgradeTo09, upgradeTo13 bool, err error) {

// addMeta adds version metadata to BoltDB to mark it as upgraded and
// should be run at the end of the upgrade transaction.
func addMeta(tx *bolt.Tx) error {
func addMeta(tx *bbolt.Tx) error {
// Create the meta bucket if it doesn't exist
bkt, err := tx.CreateBucketIfNotExists(metaBucketName)
if err != nil {
Expand All @@ -64,13 +63,13 @@ func addMeta(tx *bolt.Tx) error {

// backupDB backs up the existing state database prior to upgrade overwriting
// previous backups.
func backupDB(bdb *bolt.DB, dst string) error {
func backupDB(bdb *bbolt.DB, dst string) error {
fd, err := os.Create(dst)
if err != nil {
return err
}

return bdb.View(func(tx *bolt.Tx) error {
return bdb.View(func(tx *bbolt.Tx) error {
if _, err := tx.WriteTo(fd); err != nil {
fd.Close()
return err
Expand Down Expand Up @@ -145,7 +144,7 @@ func UpgradeAllocs(logger hclog.Logger, tx *boltdd.Tx) error {
}

// upgradeAllocBucket upgrades an alloc bucket.
func upgradeAllocBucket(logger hclog.Logger, tx *boltdd.Tx, bkt *bolt.Bucket, allocID string) error {
func upgradeAllocBucket(logger hclog.Logger, tx *boltdd.Tx, bkt *bbolt.Bucket, allocID string) error {
allocFound := false
taskBuckets := [][]byte{}
cur := bkt.Cursor()
Expand Down Expand Up @@ -253,7 +252,7 @@ func upgradeAllocBucket(logger hclog.Logger, tx *boltdd.Tx, bkt *bolt.Bucket, al

// upgradeTaskBucket iterates over keys in a task bucket, deleting invalid keys
// and returning the 0.8 version of the state.
func upgradeTaskBucket(logger hclog.Logger, bkt *bolt.Bucket) (*taskRunnerState08, error) {
func upgradeTaskBucket(logger hclog.Logger, bkt *bbolt.Bucket) (*taskRunnerState08, error) {
simpleFound := false
var trState taskRunnerState08

Expand Down
14 changes: 7 additions & 7 deletions client/state/upgrade_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,18 +7,18 @@ import (
"path/filepath"
"testing"

"github.com/boltdb/bolt"
"github.com/hashicorp/nomad/helper/boltdd"
"github.com/hashicorp/nomad/helper/testlog"
"github.com/hashicorp/nomad/helper/uuid"
"github.com/stretchr/testify/require"
"go.etcd.io/bbolt"
)

func setupBoltDB(t *testing.T) (*bolt.DB, func()) {
func setupBoltDB(t *testing.T) (*bbolt.DB, func()) {
dir, err := ioutil.TempDir("", "nomadtest")
require.NoError(t, err)

db, err := bolt.Open(filepath.Join(dir, "state.db"), 0666, nil)
db, err := bbolt.Open(filepath.Join(dir, "state.db"), 0666, nil)
if err != nil {
os.RemoveAll(dir)
require.NoError(t, err)
Expand Down Expand Up @@ -54,7 +54,7 @@ func TestUpgrade_NeedsUpgrade_Old(t *testing.T) {

// Create the allocations bucket which exists in both the old and 0.9
// schemas
require.NoError(t, db.Update(func(tx *bolt.Tx) error {
require.NoError(t, db.Update(func(tx *bbolt.Tx) error {
_, err := tx.CreateBucket(allocationsBucketName)
return err
}))
Expand Down Expand Up @@ -91,7 +91,7 @@ func TestUpgrade_NeedsUpgrade_Error(t *testing.T) {
db, cleanup := setupBoltDB(t)
defer cleanup()

require.NoError(t, db.Update(func(tx *bolt.Tx) error {
require.NoError(t, db.Update(func(tx *bbolt.Tx) error {
bkt, err := tx.CreateBucketIfNotExists(metaBucketName)
require.NoError(t, err)

Expand Down Expand Up @@ -160,7 +160,7 @@ func TestUpgrade_upgradeTaskBucket_InvalidEntries(t *testing.T) {
taskName := []byte("fake-task")

// Insert unexpected bucket, unexpected key, and missing simple-all
require.NoError(t, db.Update(func(tx *bolt.Tx) error {
require.NoError(t, db.Update(func(tx *bbolt.Tx) error {
bkt, err := tx.CreateBucket(taskName)
if err != nil {
return err
Expand All @@ -174,7 +174,7 @@ func TestUpgrade_upgradeTaskBucket_InvalidEntries(t *testing.T) {
return bkt.Put([]byte("unexepectedKey"), []byte{'x'})
}))

require.NoError(t, db.Update(func(tx *bolt.Tx) error {
require.NoError(t, db.Update(func(tx *bbolt.Tx) error {
bkt := tx.Bucket(taskName)

// upgradeTaskBucket should fail
Expand Down
58 changes: 29 additions & 29 deletions helper/boltdd/boltdd.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
// BOLTdd contains a wrapper around BoltDB to deduplicate writes and encode
// values using mgspack. (dd stands for DeDuplicate)
// Package boltdd contains a wrapper around BBoltDB to deduplicate writes and encode
// values using mgspack. (dd stands for de-duplicate)
package boltdd

import (
Expand All @@ -8,9 +8,9 @@ import (
"os"
"sync"

"github.com/boltdb/bolt"
"github.com/hashicorp/go-msgpack/codec"
"github.com/hashicorp/nomad/nomad/structs"
"go.etcd.io/bbolt"
"golang.org/x/crypto/blake2b"
)

Expand All @@ -37,35 +37,35 @@ func IsErrNotFound(e error) bool {
return ok
}

// DB wraps an underlying bolt.DB to create write deduplicating buckets and
// DB wraps an underlying bolt.DB to create write de-duplicating buckets and
// msgpack encoded values.
type DB struct {
rootBuckets map[string]*bucketMeta
rootBucketsLock sync.Mutex

bdb *bolt.DB
boltDB *bbolt.DB
}

// Open a bolt.DB and wrap it in a write-deduplicating msgpack-encoding
// Open a bolt.DB and wrap it in a write-de-duplicating msgpack-encoding
// implementation.
func Open(path string, mode os.FileMode, options *bolt.Options) (*DB, error) {
bdb, err := bolt.Open(path, mode, options)
func Open(path string, mode os.FileMode, options *bbolt.Options) (*DB, error) {
bdb, err := bbolt.Open(path, mode, options)
if err != nil {
return nil, err
}

return New(bdb), nil
}

// New deduplicating wrapper for the given boltdb.
func New(bdb *bolt.DB) *DB {
// New de-duplicating wrapper for the given bboltdb.
func New(bdb *bbolt.DB) *DB {
return &DB{
rootBuckets: make(map[string]*bucketMeta),
bdb: bdb,
boltDB: bdb,
}
}

func (db *DB) bucket(btx *bolt.Tx, name []byte) *Bucket {
func (db *DB) bucket(btx *bbolt.Tx, name []byte) *Bucket {
bb := btx.Bucket(name)
if bb == nil {
return nil
Expand All @@ -87,7 +87,7 @@ func (db *DB) bucket(btx *bolt.Tx, name []byte) *Bucket {
return newBucket(b, bb)
}

func (db *DB) createBucket(btx *bolt.Tx, name []byte) (*Bucket, error) {
func (db *DB) createBucket(btx *bbolt.Tx, name []byte) (*Bucket, error) {
bb, err := btx.CreateBucket(name)
if err != nil {
return nil, err
Expand All @@ -99,7 +99,7 @@ func (db *DB) createBucket(btx *bolt.Tx, name []byte) (*Bucket, error) {
// While creating a bucket on a closed db would error, we must recheck
// after acquiring the lock to avoid races.
if db.isClosed() {
return nil, bolt.ErrDatabaseNotOpen
return nil, bbolt.ErrDatabaseNotOpen
}

// Always create a new Bucket since CreateBucket above fails if the
Expand All @@ -110,7 +110,7 @@ func (db *DB) createBucket(btx *bolt.Tx, name []byte) (*Bucket, error) {
return newBucket(b, bb), nil
}

func (db *DB) createBucketIfNotExists(btx *bolt.Tx, name []byte) (*Bucket, error) {
func (db *DB) createBucketIfNotExists(btx *bbolt.Tx, name []byte) (*Bucket, error) {
bb, err := btx.CreateBucketIfNotExists(name)
if err != nil {
return nil, err
Expand All @@ -122,7 +122,7 @@ func (db *DB) createBucketIfNotExists(btx *bolt.Tx, name []byte) (*Bucket, error
// While creating a bucket on a closed db would error, we must recheck
// after acquiring the lock to avoid races.
if db.isClosed() {
return nil, bolt.ErrDatabaseNotOpen
return nil, bbolt.ErrDatabaseNotOpen
}

b, ok := db.rootBuckets[string(name)]
Expand All @@ -135,21 +135,21 @@ func (db *DB) createBucketIfNotExists(btx *bolt.Tx, name []byte) (*Bucket, error
}

func (db *DB) Update(fn func(*Tx) error) error {
return db.bdb.Update(func(btx *bolt.Tx) error {
return db.boltDB.Update(func(btx *bbolt.Tx) error {
tx := newTx(db, btx)
return fn(tx)
})
}

func (db *DB) Batch(fn func(*Tx) error) error {
return db.bdb.Batch(func(btx *bolt.Tx) error {
return db.boltDB.Batch(func(btx *bbolt.Tx) error {
tx := newTx(db, btx)
return fn(tx)
})
}

func (db *DB) View(fn func(*Tx) error) error {
return db.bdb.View(func(btx *bolt.Tx) error {
return db.boltDB.View(func(btx *bbolt.Tx) error {
tx := newTx(db, btx)
return fn(tx)
})
Expand All @@ -167,20 +167,20 @@ func (db *DB) Close() error {
db.rootBucketsLock.Lock()
db.rootBuckets = nil
db.rootBucketsLock.Unlock()
return db.bdb.Close()
return db.boltDB.Close()
}

// BoltDB returns the underlying bolt.DB.
func (db *DB) BoltDB() *bolt.DB {
return db.bdb
func (db *DB) BoltDB() *bbolt.DB {
return db.boltDB
}

type Tx struct {
db *DB
btx *bolt.Tx
btx *bbolt.Tx
}

func newTx(db *DB, btx *bolt.Tx) *Tx {
func newTx(db *DB, btx *bbolt.Tx) *Tx {
return &Tx{
db: db,
btx: btx,
Expand Down Expand Up @@ -208,7 +208,7 @@ func (tx *Tx) Writable() bool {
}

// BoltTx returns the underlying bolt.Tx.
func (tx *Tx) BoltTx() *bolt.Tx {
func (tx *Tx) BoltTx() *bbolt.Tx {
return tx.btx
}

Expand Down Expand Up @@ -290,12 +290,12 @@ func (bm *bucketMeta) getOrCreateBucket(name []byte) *bucketMeta {

type Bucket struct {
bm *bucketMeta
boltBucket *bolt.Bucket
boltBucket *bbolt.Bucket
}

// newBucket creates a new view into a bucket backed by a boltdb
// transaction.
func newBucket(b *bucketMeta, bb *bolt.Bucket) *Bucket {
func newBucket(b *bucketMeta, bb *bbolt.Bucket) *Bucket {
return &Bucket{
bm: b,
boltBucket: bb,
Expand Down Expand Up @@ -408,7 +408,7 @@ func (b *Bucket) CreateBucketIfNotExists(name []byte) (*Bucket, error) {
func (b *Bucket) DeleteBucket(name []byte) error {
// Delete the bucket from the underlying boltdb
err := b.boltBucket.DeleteBucket(name)
if err == bolt.ErrBucketNotFound {
if err == bbolt.ErrBucketNotFound {
err = nil
}

Expand All @@ -419,6 +419,6 @@ func (b *Bucket) DeleteBucket(name []byte) error {

// BoltBucket returns the internal bolt.Bucket for this Bucket. Only valid
// for the duration of the current transaction.
func (b *Bucket) BoltBucket() *bolt.Bucket {
func (b *Bucket) BoltBucket() *bbolt.Bucket {
return b.boltBucket
}
6 changes: 3 additions & 3 deletions helper/boltdd/boltdd_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,11 @@ import (
"path/filepath"
"testing"

"github.com/boltdb/bolt"
"github.com/hashicorp/go-msgpack/codec"
"github.com/hashicorp/nomad/nomad/mock"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/stretchr/testify/require"
"go.etcd.io/bbolt"
)

type testingT interface {
Expand Down Expand Up @@ -66,12 +66,12 @@ func TestDB_Close(t *testing.T) {
require.Equal(t, db.Update(func(tx *Tx) error {
_, err := tx.CreateBucketIfNotExists([]byte("foo"))
return err
}), bolt.ErrDatabaseNotOpen)
}), bbolt.ErrDatabaseNotOpen)

require.Equal(t, db.Update(func(tx *Tx) error {
_, err := tx.CreateBucket([]byte("foo"))
return err
}), bolt.ErrDatabaseNotOpen)
}), bbolt.ErrDatabaseNotOpen)
}

func TestBucket_Create(t *testing.T) {
Expand Down
Loading

0 comments on commit b2fe196

Please sign in to comment.