From 156da93ab43286db9683c3f86eaaef9493306ec5 Mon Sep 17 00:00:00 2001 From: Alexander Shtuchkin Date: Tue, 20 Oct 2020 17:15:37 -0300 Subject: [PATCH] Implement 'batch mode' for persisting allocations on the client. (#9093) Fixes #9047, see problem details there. As a solution, we use BoltDB's 'Batch' mode that combines multiple parallel writes into small number of transactions. See https://github.com/boltdb/bolt#batch-read-write-transactions for more information. --- client/allocrunner/alloc_runner.go | 12 +-- client/state/db_test.go | 117 ++++++++++++++++++++++++++++- client/state/errdb.go | 18 ++++- client/state/interface.go | 43 +++++++++-- client/state/memdb.go | 6 +- client/state/noopdb.go | 6 +- client/state/state_database.go | 25 ++++-- helper/boltdd/boltdd.go | 7 ++ nomad/mock/mock.go | 12 +++ 9 files changed, 220 insertions(+), 26 deletions(-) diff --git a/client/allocrunner/alloc_runner.go b/client/allocrunner/alloc_runner.go index dc827e335f2..651cc0d2ce7 100644 --- a/client/allocrunner/alloc_runner.go +++ b/client/allocrunner/alloc_runner.go @@ -888,7 +888,7 @@ func (ar *allocRunner) PersistState() error { defer ar.destroyedLock.Unlock() if ar.destroyed { - err := ar.stateDB.DeleteAllocationBucket(ar.id) + err := ar.stateDB.DeleteAllocationBucket(ar.id, cstate.WithBatchMode()) if err != nil { ar.logger.Warn("failed to delete allocation bucket", "error", err) } @@ -896,23 +896,25 @@ func (ar *allocRunner) PersistState() error { } // persist network status, wrapping in a func to release state lock as early as possible - if err := func() error { + err := func() error { ar.stateLock.Lock() defer ar.stateLock.Unlock() if ar.state.NetworkStatus != nil { - if err := ar.stateDB.PutNetworkStatus(ar.id, ar.state.NetworkStatus); err != nil { + err := ar.stateDB.PutNetworkStatus(ar.id, ar.state.NetworkStatus, cstate.WithBatchMode()) + if err != nil { return err } } return nil - }(); err != nil { + }() + if err != nil { return err } // TODO: consider persisting deployment state along with task status. // While we study why only the alloc is persisted, I opted to maintain current // behavior and not risk adding yet more IO calls unnecessarily. - return ar.stateDB.PutAllocation(ar.Alloc()) + return ar.stateDB.PutAllocation(ar.Alloc(), cstate.WithBatchMode()) } // Destroy the alloc runner by stopping it if it is still running and cleaning diff --git a/client/state/db_test.go b/client/state/db_test.go index bb63507a296..06b78cb8719 100644 --- a/client/state/db_test.go +++ b/client/state/db_test.go @@ -4,7 +4,9 @@ import ( "io/ioutil" "os" "reflect" + "sync" "testing" + "time" trstate "github.com/hashicorp/nomad/client/allocrunner/taskrunner/state" dmstate "github.com/hashicorp/nomad/client/devicemanager/state" @@ -57,7 +59,7 @@ func testDB(t *testing.T, f func(*testing.T, StateDB)) { } } -// TestStateDB asserts the behavior of GetAllAllocations, PutAllocation, and +// TestStateDB_Allocations asserts the behavior of GetAllAllocations, PutAllocation, and // DeleteAllocationBucket for all operational StateDB implementations. func TestStateDB_Allocations(t *testing.T) { t.Parallel() @@ -137,6 +139,119 @@ func TestStateDB_Allocations(t *testing.T) { }) } +// Integer division, rounded up. +func ceilDiv(a, b int) int { + return (a + b - 1) / b +} + +// TestStateDB_Batch asserts the behavior of PutAllocation, PutNetworkStatus and +// DeleteAllocationBucket in batch mode, for all operational StateDB implementations. +func TestStateDB_Batch(t *testing.T) { + t.Parallel() + + testDB(t, func(t *testing.T, db StateDB) { + require := require.New(t) + + // For BoltDB, get initial tx_id + var getTxID func() int + var prevTxID int + var batchDelay time.Duration + var batchSize int + if boltStateDB, ok := db.(*BoltStateDB); ok { + boltdb := boltStateDB.DB().BoltDB() + getTxID = func() int { + tx, err := boltdb.Begin(true) + require.NoError(err) + defer tx.Rollback() + return tx.ID() + } + prevTxID = getTxID() + batchDelay = boltdb.MaxBatchDelay + batchSize = boltdb.MaxBatchSize + } + + // Write 1000 allocations and network statuses in batch mode + startTime := time.Now() + const numAllocs = 1000 + var allocs []*structs.Allocation + for i := 0; i < numAllocs; i++ { + allocs = append(allocs, mock.Alloc()) + } + var wg sync.WaitGroup + for _, alloc := range allocs { + wg.Add(1) + go func(alloc *structs.Allocation) { + require.NoError(db.PutNetworkStatus(alloc.ID, mock.AllocNetworkStatus(), WithBatchMode())) + require.NoError(db.PutAllocation(alloc, WithBatchMode())) + wg.Done() + }(alloc) + } + wg.Wait() + + // Check BoltDB actually combined PutAllocation calls into much fewer transactions. + // The actual number of transactions depends on how fast the goroutines are spawned, + // with every batchDelay (10ms by default) period saved in a separate transaction, + // plus each transaction is limited to batchSize writes (1000 by default). + // See boltdb MaxBatchDelay and MaxBatchSize parameters for more details. + if getTxID != nil { + numTransactions := getTxID() - prevTxID + writeTime := time.Now().Sub(startTime) + expectedNumTransactions := ceilDiv(2 * numAllocs, batchSize) + ceilDiv(int(writeTime), int(batchDelay)) + require.LessOrEqual(numTransactions, expectedNumTransactions) + prevTxID = getTxID() + } + + // Retrieve allocs and make sure they are the same (order can differ) + readAllocs, errs, err := db.GetAllAllocations() + require.NoError(err) + require.NotNil(readAllocs) + require.Len(readAllocs, len(allocs)) + require.NotNil(errs) + require.Empty(errs) + + readAllocsById := make(map[string]*structs.Allocation) + for _, readAlloc := range readAllocs { + readAllocsById[readAlloc.ID] = readAlloc + } + for _, alloc := range allocs { + readAlloc, ok := readAllocsById[alloc.ID] + if !ok { + t.Fatalf("no alloc with ID=%q", alloc.ID) + } + if !reflect.DeepEqual(readAlloc, alloc) { + pretty.Ldiff(t, readAlloc, alloc) + t.Fatalf("alloc %q unequal", alloc.ID) + } + } + + // Delete all allocs in batch mode + startTime = time.Now() + for _, alloc := range allocs { + wg.Add(1) + go func(alloc *structs.Allocation) { + require.NoError(db.DeleteAllocationBucket(alloc.ID, WithBatchMode())) + wg.Done() + }(alloc) + } + wg.Wait() + + // Check BoltDB combined DeleteAllocationBucket calls into much fewer transactions. + if getTxID != nil { + numTransactions := getTxID() - prevTxID + writeTime := time.Now().Sub(startTime) + expectedNumTransactions := ceilDiv(numAllocs, batchSize) + ceilDiv(int(writeTime), int(batchDelay)) + require.LessOrEqual(numTransactions, expectedNumTransactions) + prevTxID = getTxID() + } + + // Check all allocs were deleted. + readAllocs, errs, err = db.GetAllAllocations() + require.NoError(err) + require.Empty(readAllocs) + require.Empty(errs) + }) +} + // TestStateDB_TaskState asserts the behavior of task state related StateDB // methods. func TestStateDB_TaskState(t *testing.T) { diff --git a/client/state/errdb.go b/client/state/errdb.go index 1e6270f994a..b6e75e6e265 100644 --- a/client/state/errdb.go +++ b/client/state/errdb.go @@ -5,6 +5,7 @@ import ( "github.com/hashicorp/nomad/client/allocrunner/taskrunner/state" dmstate "github.com/hashicorp/nomad/client/devicemanager/state" + "github.com/hashicorp/nomad/client/dynamicplugins" driverstate "github.com/hashicorp/nomad/client/pluginmanager/drivermanager/state" "github.com/hashicorp/nomad/nomad/structs" ) @@ -27,7 +28,7 @@ func (m *ErrDB) GetAllAllocations() ([]*structs.Allocation, map[string]error, er return m.Allocs, nil, nil } -func (m *ErrDB) PutAllocation(alloc *structs.Allocation) error { +func (m *ErrDB) PutAllocation(alloc *structs.Allocation, opts ...WriteOption) error { return fmt.Errorf("Error!") } @@ -43,7 +44,7 @@ func (m *ErrDB) GetNetworkStatus(allocID string) (*structs.AllocNetworkStatus, e return nil, fmt.Errorf("Error!") } -func (m *ErrDB) PutNetworkStatus(allocID string, ns *structs.AllocNetworkStatus) error { +func (m *ErrDB) PutNetworkStatus(allocID string, ns *structs.AllocNetworkStatus, opts ...WriteOption) error { return fmt.Errorf("Error!") } @@ -63,7 +64,7 @@ func (m *ErrDB) DeleteTaskBucket(allocID, taskName string) error { return fmt.Errorf("Error!") } -func (m *ErrDB) DeleteAllocationBucket(allocID string) error { +func (m *ErrDB) DeleteAllocationBucket(allocID string, opts ...WriteOption) error { return fmt.Errorf("Error!") } @@ -71,6 +72,14 @@ func (m *ErrDB) PutDevicePluginState(ps *dmstate.PluginState) error { return fmt.Errorf("Error!") } +func (m *ErrDB) GetDynamicPluginRegistryState() (*dynamicplugins.RegistryState, error) { + return nil, fmt.Errorf("Error!") +} + +func (m *ErrDB) PutDynamicPluginRegistryState(state *dynamicplugins.RegistryState) error { + return fmt.Errorf("Error!") +} + // GetDevicePluginState stores the device manager's plugin state or returns an // error. func (m *ErrDB) GetDevicePluginState() (*dmstate.PluginState, error) { @@ -88,3 +97,6 @@ func (m *ErrDB) PutDriverPluginState(ps *driverstate.PluginState) error { func (m *ErrDB) Close() error { return fmt.Errorf("Error!") } + +// Ensure *ErrDB implements StateDB +var _ StateDB = (*ErrDB)(nil) diff --git a/client/state/interface.go b/client/state/interface.go index b29ba5daff0..9bfb93cf128 100644 --- a/client/state/interface.go +++ b/client/state/interface.go @@ -24,9 +24,9 @@ type StateDB interface { // If a single error is returned then both allocations and the map will be nil. GetAllAllocations() ([]*structs.Allocation, map[string]error, error) - // PulAllocation stores an allocation or returns an error if it could + // PutAllocation stores an allocation or returns an error if it could // not be stored. - PutAllocation(*structs.Allocation) error + PutAllocation(*structs.Allocation, ...WriteOption) error // Get/Put DeploymentStatus get and put the allocation's deployment // status. It may be nil. @@ -36,7 +36,7 @@ type StateDB interface { // Get/Put NetworkStatus get and put the allocation's network // status. It may be nil. GetNetworkStatus(allocID string) (*structs.AllocNetworkStatus, error) - PutNetworkStatus(allocID string, ns *structs.AllocNetworkStatus) error + PutNetworkStatus(allocID string, ns *structs.AllocNetworkStatus, opts ...WriteOption) error // GetTaskRunnerState returns the LocalState and TaskState for a // TaskRunner. Either state may be nil if it is not found, but if an @@ -57,7 +57,7 @@ type StateDB interface { // DeleteAllocationBucket deletes an allocation's state bucket if it // exists. No error is returned if it does not exist. - DeleteAllocationBucket(allocID string) error + DeleteAllocationBucket(allocID string, opts ...WriteOption) error // GetDevicePluginState is used to retrieve the device manager's plugin // state. @@ -78,10 +78,43 @@ type StateDB interface { // GetDynamicPluginRegistryState is used to retrieve a dynamic plugin manager's state. GetDynamicPluginRegistryState() (*dynamicplugins.RegistryState, error) - // PutDynamicPluginRegistryState is used to store the dynamic plugin managers's state. + // PutDynamicPluginRegistryState is used to store the dynamic plugin manager's state. PutDynamicPluginRegistryState(state *dynamicplugins.RegistryState) error // Close the database. Unsafe for further use after calling regardless // of return value. Close() error } + +// WriteOptions adjusts the way the data is persisted by the StateDB above. Default is +// zero/false values for all fields. To provide different values, use With* functions +// below, like this: statedb.PutAllocation(alloc, WithBatchMode()) +type WriteOptions struct { + // In Batch mode, concurrent writes (Put* and Delete* operations above) are + // coalesced into a single transaction, increasing write performance. To benefit + // from this mode, writes must happen concurrently in goroutines, as every write + // request still waits for the shared transaction to commit before returning. + // See https://github.com/boltdb/bolt#batch-read-write-transactions for details. + // This mode is only supported for BoltDB state backend and is ignored in other backends. + BatchMode bool +} + +// WriteOption is a function that modifies WriteOptions struct above. +type WriteOption func(*WriteOptions) + +// mergeWriteOptions creates a final WriteOptions struct to be used by the write methods above +// from a list of WriteOption-s provided as variadic arguments. +func mergeWriteOptions(opts []WriteOption) WriteOptions { + writeOptions := WriteOptions{} // Default WriteOptions is zero value. + for _, opt := range opts { + opt(&writeOptions) + } + return writeOptions +} + +// Enable Batch mode for write requests (Put* and Delete* operations above). +func WithBatchMode() WriteOption { + return func(s *WriteOptions) { + s.BatchMode = true + } +} diff --git a/client/state/memdb.go b/client/state/memdb.go index b4799478017..1e281410979 100644 --- a/client/state/memdb.go +++ b/client/state/memdb.go @@ -73,7 +73,7 @@ func (m *MemDB) GetAllAllocations() ([]*structs.Allocation, map[string]error, er return allocs, map[string]error{}, nil } -func (m *MemDB) PutAllocation(alloc *structs.Allocation) error { +func (m *MemDB) PutAllocation(alloc *structs.Allocation, opts ...WriteOption) error { m.mu.Lock() defer m.mu.Unlock() m.allocs[alloc.ID] = alloc @@ -99,7 +99,7 @@ func (m *MemDB) GetNetworkStatus(allocID string) (*structs.AllocNetworkStatus, e return m.networkStatus[allocID], nil } -func (m *MemDB) PutNetworkStatus(allocID string, ns *structs.AllocNetworkStatus) error { +func (m *MemDB) PutNetworkStatus(allocID string, ns *structs.AllocNetworkStatus, opts ...WriteOption) error { m.mu.Lock() m.networkStatus[allocID] = ns defer m.mu.Unlock() @@ -175,7 +175,7 @@ func (m *MemDB) DeleteTaskBucket(allocID, taskName string) error { return nil } -func (m *MemDB) DeleteAllocationBucket(allocID string) error { +func (m *MemDB) DeleteAllocationBucket(allocID string, opts ...WriteOption) error { m.mu.Lock() defer m.mu.Unlock() diff --git a/client/state/noopdb.go b/client/state/noopdb.go index 2c35cc6693b..e199a7affab 100644 --- a/client/state/noopdb.go +++ b/client/state/noopdb.go @@ -23,7 +23,7 @@ func (n NoopDB) GetAllAllocations() ([]*structs.Allocation, map[string]error, er return nil, nil, nil } -func (n NoopDB) PutAllocation(*structs.Allocation) error { +func (n NoopDB) PutAllocation(alloc *structs.Allocation, opts ...WriteOption) error { return nil } @@ -39,7 +39,7 @@ func (n NoopDB) GetNetworkStatus(allocID string) (*structs.AllocNetworkStatus, e return nil, nil } -func (n NoopDB) PutNetworkStatus(allocID string, ds *structs.AllocNetworkStatus) error { +func (n NoopDB) PutNetworkStatus(allocID string, ds *structs.AllocNetworkStatus, opts ...WriteOption) error { return nil } @@ -59,7 +59,7 @@ func (n NoopDB) DeleteTaskBucket(allocID, taskName string) error { return nil } -func (n NoopDB) DeleteAllocationBucket(allocID string) error { +func (n NoopDB) DeleteAllocationBucket(allocID string, opts ...WriteOption) error { return nil } diff --git a/client/state/state_database.go b/client/state/state_database.go index 4c0ad4fe07f..25ab25917cc 100644 --- a/client/state/state_database.go +++ b/client/state/state_database.go @@ -234,8 +234,8 @@ func (s *BoltStateDB) getAllAllocations(tx *boltdd.Tx) ([]*structs.Allocation, m } // PutAllocation stores an allocation or returns an error. -func (s *BoltStateDB) PutAllocation(alloc *structs.Allocation) error { - return s.db.Update(func(tx *boltdd.Tx) error { +func (s *BoltStateDB) PutAllocation(alloc *structs.Allocation, opts ...WriteOption) error { + return s.updateWithOptions(opts, func(tx *boltdd.Tx) error { // Retrieve the root allocations bucket allocsBkt, err := tx.CreateBucketIfNotExists(allocationsBucketName) if err != nil { @@ -321,8 +321,8 @@ type networkStatusEntry struct { // PutDeploymentStatus stores an allocation's DeploymentStatus or returns an // error. -func (s *BoltStateDB) PutNetworkStatus(allocID string, ds *structs.AllocNetworkStatus) error { - return s.db.Update(func(tx *boltdd.Tx) error { +func (s *BoltStateDB) PutNetworkStatus(allocID string, ds *structs.AllocNetworkStatus, opts ...WriteOption) error { + return s.updateWithOptions(opts, func(tx *boltdd.Tx) error { return putNetworkStatusImpl(tx, allocID, ds) }) } @@ -493,8 +493,8 @@ func (s *BoltStateDB) DeleteTaskBucket(allocID, taskName string) error { } // DeleteAllocationBucket is used to delete an allocation bucket if it exists. -func (s *BoltStateDB) DeleteAllocationBucket(allocID string) error { - return s.db.Update(func(tx *boltdd.Tx) error { +func (s *BoltStateDB) DeleteAllocationBucket(allocID string, opts ...WriteOption) error { + return s.updateWithOptions(opts, func(tx *boltdd.Tx) error { // Retrieve the root allocations bucket allocations := tx.Bucket(allocationsBucketName) if allocations == nil { @@ -725,6 +725,19 @@ func (s *BoltStateDB) init() error { }) } +// updateWithOptions enables adjustments to db.Update operation, including Batch mode. +func (s *BoltStateDB) updateWithOptions(opts []WriteOption, updateFn func(tx *boltdd.Tx) error) error { + writeOpts := mergeWriteOptions(opts) + + if writeOpts.BatchMode { + // In Batch mode, BoltDB opportunistically combines multiple concurrent writes into one or + // several transactions. See boltdb.Batch() documentation for details. + return s.db.Batch(updateFn) + } else { + return s.db.Update(updateFn) + } +} + // Upgrade bolt state db from 0.8 schema to 0.9 schema. Noop if already using // 0.9 schema. Creates a backup before upgrading. func (s *BoltStateDB) Upgrade() error { diff --git a/helper/boltdd/boltdd.go b/helper/boltdd/boltdd.go index 2c221c10a8f..ea337047de7 100644 --- a/helper/boltdd/boltdd.go +++ b/helper/boltdd/boltdd.go @@ -141,6 +141,13 @@ func (db *DB) Update(fn func(*Tx) error) error { }) } +func (db *DB) Batch(fn func(*Tx) error) error { + return db.bdb.Batch(func(btx *bolt.Tx) error { + tx := newTx(db, btx) + return fn(tx) + }) +} + func (db *DB) View(fn func(*Tx) error) error { return db.bdb.View(func(btx *bolt.Tx) error { tx := newTx(db, btx) diff --git a/nomad/mock/mock.go b/nomad/mock/mock.go index 765ca0730fa..a2bda9e5725 100644 --- a/nomad/mock/mock.go +++ b/nomad/mock/mock.go @@ -1493,3 +1493,15 @@ func Events(index uint64) *structs.Events { }, } } + +func AllocNetworkStatus() *structs.AllocNetworkStatus { + return &structs.AllocNetworkStatus{ + InterfaceName: "eth0", + Address: "192.168.0.100", + DNS: &structs.DNSConfig{ + Servers: []string{"1.1.1.1"}, + Searches: []string{"localdomain"}, + Options: []string{"ndots:5"}, + }, + } +}