Skip to content

Commit

Permalink
Merge pull request #4861 from hashicorp/b-batch-deregister-transaction
Browse files Browse the repository at this point in the history
Run job deregistering in a single transaction
  • Loading branch information
notnoop authored Nov 13, 2018
2 parents 66fc70f + 9405473 commit 91ea405
Show file tree
Hide file tree
Showing 2 changed files with 93 additions and 17 deletions.
48 changes: 34 additions & 14 deletions nomad/fsm.go
Original file line number Diff line number Diff line change
Expand Up @@ -503,12 +503,14 @@ func (n *nomadFSM) applyDeregisterJob(buf []byte, index uint64) interface{} {
panic(fmt.Errorf("failed to decode request: %v", err))
}

if err := n.handleJobDeregister(index, req.JobID, req.Namespace, req.Purge); err != nil {
n.logger.Error("deregistering job failed", "error", err)
return err
}
return n.state.WithWriteTransaction(func(tx state.Txn) error {
if err := n.handleJobDeregister(index, req.JobID, req.Namespace, req.Purge, tx); err != nil {
n.logger.Error("deregistering job failed", "error", err)
return err
}

return nil
return nil
})
}

func (n *nomadFSM) applyBatchDeregisterJob(buf []byte, index uint64) interface{} {
Expand All @@ -518,38 +520,56 @@ func (n *nomadFSM) applyBatchDeregisterJob(buf []byte, index uint64) interface{}
panic(fmt.Errorf("failed to decode request: %v", err))
}

for jobNS, options := range req.Jobs {
if err := n.handleJobDeregister(index, jobNS.ID, jobNS.Namespace, options.Purge); err != nil {
n.logger.Error("deregistering job failed", "job", jobNS, "error", err)
// Perform all store updates atomically to ensure a consistent views for store readers.
// A partial update may increment the snapshot index, allowing eval brokers to process
// evals for jobs whose deregistering didn't get committed yet.
err := n.state.WithWriteTransaction(func(tx state.Txn) error {
for jobNS, options := range req.Jobs {
if err := n.handleJobDeregister(index, jobNS.ID, jobNS.Namespace, options.Purge, tx); err != nil {
n.logger.Error("deregistering job failed", "job", jobNS, "error", err)
return err
}
}

if err := n.state.UpsertEvalsTxn(index, req.Evals, tx); err != nil {
n.logger.Error("UpsertEvals failed", "error", err)
return err
}

return nil
})

if err != nil {
return err
}

return n.upsertEvals(index, req.Evals)
// perform the side effects outside the transactions
n.handleUpsertedEvals(req.Evals)
return nil
}

// handleJobDeregister is used to deregister a job.
func (n *nomadFSM) handleJobDeregister(index uint64, jobID, namespace string, purge bool) error {
func (n *nomadFSM) handleJobDeregister(index uint64, jobID, namespace string, purge bool, tx state.Txn) error {
// If it is periodic remove it from the dispatcher
if err := n.periodicDispatcher.Remove(namespace, jobID); err != nil {
n.logger.Error("periodicDispatcher.Remove failed", "error", err)
return err
}

if purge {
if err := n.state.DeleteJob(index, namespace, jobID); err != nil {
if err := n.state.DeleteJobTxn(index, namespace, jobID, tx); err != nil {
n.logger.Error("DeleteJob failed", "error", err)
return err
}

// We always delete from the periodic launch table because it is possible that
// the job was updated to be non-periodic, thus checking if it is periodic
// doesn't ensure we clean it up properly.
n.state.DeletePeriodicLaunch(index, namespace, jobID)
n.state.DeletePeriodicLaunchTxn(index, namespace, jobID, tx)
} else {
// Get the current job and mark it as stopped and re-insert it.
ws := memdb.NewWatchSet()
current, err := n.state.JobByID(ws, namespace, jobID)
current, err := n.state.JobByIDTxn(ws, namespace, jobID, tx)
if err != nil {
n.logger.Error("JobByID lookup failed", "error", err)
return err
Expand All @@ -562,7 +582,7 @@ func (n *nomadFSM) handleJobDeregister(index uint64, jobID, namespace string, pu
stopped := current.Copy()
stopped.Stop = true

if err := n.state.UpsertJob(index, stopped); err != nil {
if err := n.state.UpsertJobTxn(index, stopped, tx); err != nil {
n.logger.Error("UpsertJob failed", "error", err)
return err
}
Expand Down
62 changes: 59 additions & 3 deletions nomad/state/state_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,10 @@ import (
"github.com/hashicorp/nomad/nomad/structs"
)

// Txn is a transaction against a state store.
// This can be a read or write transaction.
type Txn = *memdb.Txn

const (
// NodeRegisterEventReregistered is the message used when the node becomes
// reregistered.
Expand Down Expand Up @@ -922,6 +926,12 @@ func (s *StateStore) UpsertJob(index uint64, job *structs.Job) error {
return nil
}

// UpsertJobTxn is used to register a job or update a job definition, like UpsertJob,
// but in a transcation. Useful for when making multiple modifications atomically
func (s *StateStore) UpsertJobTxn(index uint64, job *structs.Job, txn Txn) error {
return s.upsertJobImpl(index, job, false, txn)
}

// upsertJobImpl is the implementation for registering a job or updating a job definition
func (s *StateStore) upsertJobImpl(index uint64, job *structs.Job, keepVersion bool, txn *memdb.Txn) error {
// COMPAT 0.7: Upgrade old objects that do not have namespaces
Expand Down Expand Up @@ -1005,6 +1015,16 @@ func (s *StateStore) DeleteJob(index uint64, namespace, jobID string) error {
txn := s.db.Txn(true)
defer txn.Abort()

err := s.DeleteJobTxn(index, namespace, jobID, txn)
if err == nil {
txn.Commit()
}
return err
}

// DeleteJobTxn is used to deregister a job, like DeleteJob,
// but in a transcation. Useful for when making multiple modifications atomically
func (s *StateStore) DeleteJobTxn(index uint64, namespace, jobID string, txn Txn) error {
// COMPAT 0.7: Upgrade old objects that do not have namespaces
if namespace == "" {
namespace = structs.DefaultNamespace
Expand Down Expand Up @@ -1091,7 +1111,6 @@ func (s *StateStore) DeleteJob(index uint64, namespace, jobID string) error {
return fmt.Errorf("index update failed: %v", err)
}

txn.Commit()
return nil
}

Expand Down Expand Up @@ -1189,7 +1208,12 @@ func (s *StateStore) upsertJobVersion(index uint64, job *structs.Job, txn *memdb
// version.
func (s *StateStore) JobByID(ws memdb.WatchSet, namespace, id string) (*structs.Job, error) {
txn := s.db.Txn(false)
return s.JobByIDTxn(ws, namespace, id, txn)
}

// JobByIDTxn is used to lookup a job by its ID, like JobByID. JobByID returns the job version
// accessable through in the transaction
func (s *StateStore) JobByIDTxn(ws memdb.WatchSet, namespace, id string, txn Txn) (*structs.Job, error) {
// COMPAT 0.7: Upgrade old objects that do not have namespaces
if namespace == "" {
namespace = structs.DefaultNamespace
Expand Down Expand Up @@ -1510,6 +1534,16 @@ func (s *StateStore) DeletePeriodicLaunch(index uint64, namespace, jobID string)
txn := s.db.Txn(true)
defer txn.Abort()

err := s.DeletePeriodicLaunchTxn(index, namespace, jobID, txn)
if err == nil {
txn.Commit()
}
return err
}

// DeletePeriodicLaunchTxn is used to delete the periodic launch, like DeletePeriodicLaunch
// but in a transcation. Useful for when making multiple modifications atomically
func (s *StateStore) DeletePeriodicLaunchTxn(index uint64, namespace, jobID string, txn Txn) error {
// COMPAT 0.7: Upgrade old objects that do not have namespaces
if namespace == "" {
namespace = structs.DefaultNamespace
Expand All @@ -1532,7 +1566,6 @@ func (s *StateStore) DeletePeriodicLaunch(index uint64, namespace, jobID string)
return fmt.Errorf("index update failed: %v", err)
}

txn.Commit()
return nil
}

Expand Down Expand Up @@ -1579,6 +1612,16 @@ func (s *StateStore) UpsertEvals(index uint64, evals []*structs.Evaluation) erro
txn := s.db.Txn(true)
defer txn.Abort()

err := s.UpsertEvalsTxn(index, evals, txn)
if err == nil {
txn.Commit()
}
return err
}

// UpsertEvals is used to upsert a set of evaluations, like UpsertEvals
// but in a transcation. Useful for when making multiple modifications atomically
func (s *StateStore) UpsertEvalsTxn(index uint64, evals []*structs.Evaluation, txn Txn) error {
// Do a nested upsert
jobs := make(map[structs.NamespacedID]string, len(evals))
for _, eval := range evals {
Expand All @@ -1598,7 +1641,6 @@ func (s *StateStore) UpsertEvals(index uint64, evals []*structs.Evaluation) erro
return fmt.Errorf("setting job status failed: %v", err)
}

txn.Commit()
return nil
}

Expand Down Expand Up @@ -3901,6 +3943,20 @@ func (s *StateStore) SchedulerSetConfig(idx uint64, config *structs.SchedulerCon
return nil
}

// WithWriteTransaction executes the passed function within a write transaction,
// and returns its result. If the invocation returns no error, the transaction
// is committed; otherwise, it's aborted.
func (s *StateStore) WithWriteTransaction(fn func(Txn) error) error {
tx := s.db.Txn(true)
defer tx.Abort()

err := fn(tx)
if err == nil {
tx.Commit()
}
return err
}

// SchedulerCASConfig is used to update the scheduler configuration with a
// given Raft index. If the CAS index specified is not equal to the last observed index
// for the config, then the call is a noop.
Expand Down

0 comments on commit 91ea405

Please sign in to comment.