Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement 'batch mode' for persisting allocations on the client. #9093

Merged
merged 2 commits into from
Oct 20, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 7 additions & 5 deletions client/allocrunner/alloc_runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -888,31 +888,33 @@ func (ar *allocRunner) PersistState() error {
defer ar.destroyedLock.Unlock()

if ar.destroyed {
err := ar.stateDB.DeleteAllocationBucket(ar.id)
err := ar.stateDB.DeleteAllocationBucket(ar.id, cstate.WithBatchMode())
if err != nil {
ar.logger.Warn("failed to delete allocation bucket", "error", err)
}
return nil
}

// persist network status, wrapping in a func to release state lock as early as possible
if err := func() error {
err := func() error {
ar.stateLock.Lock()
defer ar.stateLock.Unlock()
if ar.state.NetworkStatus != nil {
if err := ar.stateDB.PutNetworkStatus(ar.id, ar.state.NetworkStatus); err != nil {
err := ar.stateDB.PutNetworkStatus(ar.id, ar.state.NetworkStatus, cstate.WithBatchMode())
if err != nil {
return err
}
}
return nil
}(); err != nil {
}()
if err != nil {
return err
}

// TODO: consider persisting deployment state along with task status.
// While we study why only the alloc is persisted, I opted to maintain current
// behavior and not risk adding yet more IO calls unnecessarily.
return ar.stateDB.PutAllocation(ar.Alloc())
return ar.stateDB.PutAllocation(ar.Alloc(), cstate.WithBatchMode())
}

// Destroy the alloc runner by stopping it if it is still running and cleaning
Expand Down
117 changes: 116 additions & 1 deletion client/state/db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,9 @@ import (
"io/ioutil"
"os"
"reflect"
"sync"
"testing"
"time"

trstate "github.com/hashicorp/nomad/client/allocrunner/taskrunner/state"
dmstate "github.com/hashicorp/nomad/client/devicemanager/state"
Expand Down Expand Up @@ -57,7 +59,7 @@ func testDB(t *testing.T, f func(*testing.T, StateDB)) {
}
}

// TestStateDB asserts the behavior of GetAllAllocations, PutAllocation, and
// TestStateDB_Allocations asserts the behavior of GetAllAllocations, PutAllocation, and
// DeleteAllocationBucket for all operational StateDB implementations.
func TestStateDB_Allocations(t *testing.T) {
t.Parallel()
Expand Down Expand Up @@ -137,6 +139,119 @@ func TestStateDB_Allocations(t *testing.T) {
})
}

// ceilDiv returns a divided by b, rounded up (toward positive infinity).
//
// The result is derived from a/b and a%b instead of (a+b-1)/b, so the
// intermediate sum cannot overflow and negative operands are rounded
// correctly. As with any integer division, it panics when b is zero.
func ceilDiv(a, b int) int {
	q := a / b
	// Go truncates toward zero, which already rounds negative quotients up.
	// Only an inexact quotient of same-signed operands needs to be bumped.
	if a%b != 0 && (a < 0) == (b < 0) {
		q++
	}
	return q
}

// TestStateDB_Batch asserts the behavior of PutAllocation, PutNetworkStatus and
// DeleteAllocationBucket in batch mode, for all operational StateDB implementations.
//
// Writes are issued from many goroutines at once. Errors from those goroutines
// are sent back over a channel and asserted on the test goroutine, because
// require's FailNow must not be called from any goroutine other than the one
// running the test, and doing so before wg.Done would leave wg.Wait blocked.
func TestStateDB_Batch(t *testing.T) {
	t.Parallel()

	testDB(t, func(t *testing.T, db StateDB) {
		require := require.New(t)

		// For BoltDB, record the initial transaction ID and the batching limits
		// so the number of transactions used by the batched writes can be bounded.
		var getTxID func() int
		var prevTxID int
		var batchDelay time.Duration
		var batchSize int
		if boltStateDB, ok := db.(*BoltStateDB); ok {
			boltdb := boltStateDB.DB().BoltDB()
			getTxID = func() int {
				tx, err := boltdb.Begin(true)
				require.NoError(err)
				defer tx.Rollback()
				return tx.ID()
			}
			prevTxID = getTxID()
			batchDelay = boltdb.MaxBatchDelay
			batchSize = boltdb.MaxBatchSize
		}

		// Write 1000 allocations and network statuses in batch mode
		startTime := time.Now()
		const numAllocs = 1000
		allocs := make([]*structs.Allocation, 0, numAllocs)
		for i := 0; i < numAllocs; i++ {
			allocs = append(allocs, mock.Alloc())
		}

		var wg sync.WaitGroup
		// Each goroutine reports at most one error, so the buffer never blocks.
		writeErrs := make(chan error, numAllocs)
		for _, alloc := range allocs {
			wg.Add(1)
			go func(alloc *structs.Allocation) {
				defer wg.Done()
				if err := db.PutNetworkStatus(alloc.ID, mock.AllocNetworkStatus(), WithBatchMode()); err != nil {
					writeErrs <- err
					return
				}
				if err := db.PutAllocation(alloc, WithBatchMode()); err != nil {
					writeErrs <- err
				}
			}(alloc)
		}
		wg.Wait()
		close(writeErrs)
		for err := range writeErrs {
			require.NoError(err)
		}

		// Check BoltDB actually combined PutAllocation calls into far fewer transactions.
		// The actual number of transactions depends on how fast the goroutines are spawned,
		// with every batchDelay (10ms by default) period saved in a separate transaction,
		// plus each transaction is limited to batchSize writes (1000 by default).
		// See boltdb MaxBatchDelay and MaxBatchSize parameters for more details.
		if getTxID != nil {
			numTransactions := getTxID() - prevTxID
			writeTime := time.Since(startTime)
			expectedNumTransactions := ceilDiv(2*numAllocs, batchSize) + ceilDiv(int(writeTime), int(batchDelay))
			require.LessOrEqual(numTransactions, expectedNumTransactions)
			prevTxID = getTxID()
		}

		// Retrieve allocs and make sure they are the same (order can differ)
		readAllocs, errs, err := db.GetAllAllocations()
		require.NoError(err)
		require.NotNil(readAllocs)
		require.Len(readAllocs, len(allocs))
		require.NotNil(errs)
		require.Empty(errs)

		readAllocsByID := make(map[string]*structs.Allocation, len(readAllocs))
		for _, readAlloc := range readAllocs {
			readAllocsByID[readAlloc.ID] = readAlloc
		}
		for _, alloc := range allocs {
			readAlloc, ok := readAllocsByID[alloc.ID]
			if !ok {
				t.Fatalf("no alloc with ID=%q", alloc.ID)
			}
			if !reflect.DeepEqual(readAlloc, alloc) {
				pretty.Ldiff(t, readAlloc, alloc)
				t.Fatalf("alloc %q unequal", alloc.ID)
			}
		}

		// Delete all allocs in batch mode
		startTime = time.Now()
		deleteErrs := make(chan error, numAllocs)
		for _, alloc := range allocs {
			wg.Add(1)
			go func(alloc *structs.Allocation) {
				defer wg.Done()
				if err := db.DeleteAllocationBucket(alloc.ID, WithBatchMode()); err != nil {
					deleteErrs <- err
				}
			}(alloc)
		}
		wg.Wait()
		close(deleteErrs)
		for err := range deleteErrs {
			require.NoError(err)
		}

		// Check BoltDB combined DeleteAllocationBucket calls into far fewer transactions.
		if getTxID != nil {
			numTransactions := getTxID() - prevTxID
			writeTime := time.Since(startTime)
			expectedNumTransactions := ceilDiv(numAllocs, batchSize) + ceilDiv(int(writeTime), int(batchDelay))
			require.LessOrEqual(numTransactions, expectedNumTransactions)
		}

		// Check all allocs were deleted.
		readAllocs, errs, err = db.GetAllAllocations()
		require.NoError(err)
		require.Empty(readAllocs)
		require.Empty(errs)
	})
}

// TestStateDB_TaskState asserts the behavior of task state related StateDB
// methods.
func TestStateDB_TaskState(t *testing.T) {
Expand Down
18 changes: 15 additions & 3 deletions client/state/errdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (

"github.com/hashicorp/nomad/client/allocrunner/taskrunner/state"
dmstate "github.com/hashicorp/nomad/client/devicemanager/state"
"github.com/hashicorp/nomad/client/dynamicplugins"
driverstate "github.com/hashicorp/nomad/client/pluginmanager/drivermanager/state"
"github.com/hashicorp/nomad/nomad/structs"
)
Expand All @@ -27,7 +28,7 @@ func (m *ErrDB) GetAllAllocations() ([]*structs.Allocation, map[string]error, er
return m.Allocs, nil, nil
}

func (m *ErrDB) PutAllocation(alloc *structs.Allocation) error {
func (m *ErrDB) PutAllocation(alloc *structs.Allocation, opts ...WriteOption) error {
return fmt.Errorf("Error!")
}

Expand All @@ -43,7 +44,7 @@ func (m *ErrDB) GetNetworkStatus(allocID string) (*structs.AllocNetworkStatus, e
return nil, fmt.Errorf("Error!")
}

func (m *ErrDB) PutNetworkStatus(allocID string, ns *structs.AllocNetworkStatus) error {
func (m *ErrDB) PutNetworkStatus(allocID string, ns *structs.AllocNetworkStatus, opts ...WriteOption) error {
return fmt.Errorf("Error!")
}

Expand All @@ -63,14 +64,22 @@ func (m *ErrDB) DeleteTaskBucket(allocID, taskName string) error {
return fmt.Errorf("Error!")
}

func (m *ErrDB) DeleteAllocationBucket(allocID string) error {
func (m *ErrDB) DeleteAllocationBucket(allocID string, opts ...WriteOption) error {
return fmt.Errorf("Error!")
}

// PutDevicePluginState ignores ps and always returns an error.
func (m *ErrDB) PutDevicePluginState(ps *dmstate.PluginState) error {
	err := fmt.Errorf("Error!")
	return err
}

// GetDynamicPluginRegistryState always returns a nil registry state together
// with an error.
func (m *ErrDB) GetDynamicPluginRegistryState() (*dynamicplugins.RegistryState, error) {
	var noState *dynamicplugins.RegistryState
	return noState, fmt.Errorf("Error!")
}

// PutDynamicPluginRegistryState ignores state and always returns an error.
func (m *ErrDB) PutDynamicPluginRegistryState(state *dynamicplugins.RegistryState) error {
	err := fmt.Errorf("Error!")
	return err
}

// GetDevicePluginState retrieves the device manager's plugin state or returns an
// error.
func (m *ErrDB) GetDevicePluginState() (*dmstate.PluginState, error) {
Expand All @@ -88,3 +97,6 @@ func (m *ErrDB) PutDriverPluginState(ps *driverstate.PluginState) error {
// Close always returns an error.
func (m *ErrDB) Close() error {
	err := fmt.Errorf("Error!")
	return err
}

// Ensure *ErrDB implements StateDB
var _ StateDB = (*ErrDB)(nil)
43 changes: 38 additions & 5 deletions client/state/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,9 @@ type StateDB interface {
// If a single error is returned then both allocations and the map will be nil.
GetAllAllocations() ([]*structs.Allocation, map[string]error, error)

// PulAllocation stores an allocation or returns an error if it could
// PutAllocation stores an allocation or returns an error if it could
// not be stored.
PutAllocation(*structs.Allocation) error
PutAllocation(*structs.Allocation, ...WriteOption) error

// Get/Put DeploymentStatus get and put the allocation's deployment
// status. It may be nil.
Expand All @@ -36,7 +36,7 @@ type StateDB interface {
// Get/Put NetworkStatus get and put the allocation's network
// status. It may be nil.
GetNetworkStatus(allocID string) (*structs.AllocNetworkStatus, error)
PutNetworkStatus(allocID string, ns *structs.AllocNetworkStatus) error
PutNetworkStatus(allocID string, ns *structs.AllocNetworkStatus, opts ...WriteOption) error

// GetTaskRunnerState returns the LocalState and TaskState for a
// TaskRunner. Either state may be nil if it is not found, but if an
Expand All @@ -57,7 +57,7 @@ type StateDB interface {

// DeleteAllocationBucket deletes an allocation's state bucket if it
// exists. No error is returned if it does not exist.
DeleteAllocationBucket(allocID string) error
DeleteAllocationBucket(allocID string, opts ...WriteOption) error

// GetDevicePluginState is used to retrieve the device manager's plugin
// state.
Expand All @@ -78,10 +78,43 @@ type StateDB interface {
// GetDynamicPluginRegistryState is used to retrieve a dynamic plugin manager's state.
GetDynamicPluginRegistryState() (*dynamicplugins.RegistryState, error)

// PutDynamicPluginRegistryState is used to store the dynamic plugin managers's state.
// PutDynamicPluginRegistryState is used to store the dynamic plugin manager's state.
PutDynamicPluginRegistryState(state *dynamicplugins.RegistryState) error

// Close the database. Unsafe for further use after calling regardless
// of return value.
Close() error
}

// WriteOptions adjusts the way the data is persisted by the StateDB above. Default is
// zero/false values for all fields. To provide different values, use With* functions
// below, like this: statedb.PutAllocation(alloc, WithBatchMode())
type WriteOptions struct {
	// BatchMode, when true, coalesces concurrent writes (Put* and Delete*
	// operations above) into a single transaction, increasing write performance.
	// To benefit from this mode, writes must happen concurrently in goroutines, as
	// every write request still waits for the shared transaction to commit before
	// returning.
	// See https://github.com/boltdb/bolt#batch-read-write-transactions for details.
	// This mode is only supported for BoltDB state backend and is ignored in other backends.
	BatchMode bool
}

// WriteOption is a function that modifies a WriteOptions struct.
type WriteOption func(*WriteOptions)

// mergeWriteOptions builds the final WriteOptions used by the write methods
// above by applying each WriteOption in opts, in order, to a zero-valued
// WriteOptions. A nil or empty opts yields the defaults. Nil entries in opts
// are skipped so that a caller passing a nil option does not cause a panic.
func mergeWriteOptions(opts []WriteOption) WriteOptions {
	var writeOptions WriteOptions // Default WriteOptions is the zero value.
	for _, opt := range opts {
		if opt == nil {
			continue
		}
		opt(&writeOptions)
	}
	return writeOptions
}

// WithBatchMode returns a WriteOption that enables Batch mode for write
// requests (Put* and Delete* operations above).
func WithBatchMode() WriteOption {
	return func(s *WriteOptions) {
		s.BatchMode = true
	}
}
6 changes: 3 additions & 3 deletions client/state/memdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ func (m *MemDB) GetAllAllocations() ([]*structs.Allocation, map[string]error, er
return allocs, map[string]error{}, nil
}

func (m *MemDB) PutAllocation(alloc *structs.Allocation) error {
func (m *MemDB) PutAllocation(alloc *structs.Allocation, opts ...WriteOption) error {
m.mu.Lock()
defer m.mu.Unlock()
m.allocs[alloc.ID] = alloc
Expand All @@ -99,7 +99,7 @@ func (m *MemDB) GetNetworkStatus(allocID string) (*structs.AllocNetworkStatus, e
return m.networkStatus[allocID], nil
}

func (m *MemDB) PutNetworkStatus(allocID string, ns *structs.AllocNetworkStatus) error {
func (m *MemDB) PutNetworkStatus(allocID string, ns *structs.AllocNetworkStatus, opts ...WriteOption) error {
m.mu.Lock()
m.networkStatus[allocID] = ns
defer m.mu.Unlock()
Expand Down Expand Up @@ -175,7 +175,7 @@ func (m *MemDB) DeleteTaskBucket(allocID, taskName string) error {
return nil
}

func (m *MemDB) DeleteAllocationBucket(allocID string) error {
func (m *MemDB) DeleteAllocationBucket(allocID string, opts ...WriteOption) error {
m.mu.Lock()
defer m.mu.Unlock()

Expand Down
6 changes: 3 additions & 3 deletions client/state/noopdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ func (n NoopDB) GetAllAllocations() ([]*structs.Allocation, map[string]error, er
return nil, nil, nil
}

func (n NoopDB) PutAllocation(*structs.Allocation) error {
func (n NoopDB) PutAllocation(alloc *structs.Allocation, opts ...WriteOption) error {
return nil
}

Expand All @@ -39,7 +39,7 @@ func (n NoopDB) GetNetworkStatus(allocID string) (*structs.AllocNetworkStatus, e
return nil, nil
}

func (n NoopDB) PutNetworkStatus(allocID string, ds *structs.AllocNetworkStatus) error {
func (n NoopDB) PutNetworkStatus(allocID string, ds *structs.AllocNetworkStatus, opts ...WriteOption) error {
return nil
}

Expand All @@ -59,7 +59,7 @@ func (n NoopDB) DeleteTaskBucket(allocID, taskName string) error {
return nil
}

func (n NoopDB) DeleteAllocationBucket(allocID string) error {
func (n NoopDB) DeleteAllocationBucket(allocID string, opts ...WriteOption) error {
return nil
}

Expand Down
Loading