Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Run job deregistering in a single transaction #4861

Merged
merged 2 commits into from
Nov 13, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 34 additions & 14 deletions nomad/fsm.go
Original file line number Diff line number Diff line change
Expand Up @@ -503,12 +503,14 @@ func (n *nomadFSM) applyDeregisterJob(buf []byte, index uint64) interface{} {
panic(fmt.Errorf("failed to decode request: %v", err))
}

if err := n.handleJobDeregister(index, req.JobID, req.Namespace, req.Purge); err != nil {
n.logger.Error("deregistering job failed", "error", err)
return err
}
return n.state.WithWriteTransaction(func(tx state.Txn) error {
if err := n.handleJobDeregister(index, req.JobID, req.Namespace, req.Purge, tx); err != nil {
n.logger.Error("deregistering job failed", "error", err)
return err
}

return nil
return nil
})
}

func (n *nomadFSM) applyBatchDeregisterJob(buf []byte, index uint64) interface{} {
Expand All @@ -518,38 +520,56 @@ func (n *nomadFSM) applyBatchDeregisterJob(buf []byte, index uint64) interface{}
panic(fmt.Errorf("failed to decode request: %v", err))
}

for jobNS, options := range req.Jobs {
if err := n.handleJobDeregister(index, jobNS.ID, jobNS.Namespace, options.Purge); err != nil {
n.logger.Error("deregistering job failed", "job", jobNS, "error", err)
// Perform all store updates atomically to ensure a consistent views for store readers.
notnoop marked this conversation as resolved.
Show resolved Hide resolved
// A partial update may increment the snapshot index, allowing eval brokers to process
// evals for jobs whose deregistering didn't get committed yet.
err := n.state.WithWriteTransaction(func(tx state.Txn) error {
notnoop marked this conversation as resolved.
Show resolved Hide resolved
for jobNS, options := range req.Jobs {
if err := n.handleJobDeregister(index, jobNS.ID, jobNS.Namespace, options.Purge, tx); err != nil {
n.logger.Error("deregistering job failed", "job", jobNS, "error", err)
return err
}
}

if err := n.state.UpsertEvalsTxn(index, req.Evals, tx); err != nil {
n.logger.Error("UpsertEvals failed", "error", err)
return err
}

return nil
})

if err != nil {
return err
}

return n.upsertEvals(index, req.Evals)
// perform the side effects outside the transactions
n.handleUpsertedEvals(req.Evals)
return nil
}

// handleJobDeregister is used to deregister a job.
func (n *nomadFSM) handleJobDeregister(index uint64, jobID, namespace string, purge bool) error {
func (n *nomadFSM) handleJobDeregister(index uint64, jobID, namespace string, purge bool, tx state.Txn) error {
// If it is periodic remove it from the dispatcher
if err := n.periodicDispatcher.Remove(namespace, jobID); err != nil {
n.logger.Error("periodicDispatcher.Remove failed", "error", err)
return err
}

if purge {
if err := n.state.DeleteJob(index, namespace, jobID); err != nil {
if err := n.state.DeleteJobTxn(index, namespace, jobID, tx); err != nil {
n.logger.Error("DeleteJob failed", "error", err)
return err
}

// We always delete from the periodic launch table because it is possible that
// the job was updated to be non-periodic, thus checking if it is periodic
// doesn't ensure we clean it up properly.
n.state.DeletePeriodicLaunch(index, namespace, jobID)
n.state.DeletePeriodicLaunchTxn(index, namespace, jobID, tx)
} else {
// Get the current job and mark it as stopped and re-insert it.
ws := memdb.NewWatchSet()
current, err := n.state.JobByID(ws, namespace, jobID)
current, err := n.state.JobByIDTxn(ws, namespace, jobID, tx)
if err != nil {
n.logger.Error("JobByID lookup failed", "error", err)
return err
Expand All @@ -562,7 +582,7 @@ func (n *nomadFSM) handleJobDeregister(index uint64, jobID, namespace string, pu
stopped := current.Copy()
stopped.Stop = true

if err := n.state.UpsertJob(index, stopped); err != nil {
if err := n.state.UpsertJobTxn(index, stopped, tx); err != nil {
n.logger.Error("UpsertJob failed", "error", err)
return err
}
Expand Down
62 changes: 59 additions & 3 deletions nomad/state/state_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,10 @@ import (
"github.com/hashicorp/nomad/nomad/structs"
)

// Txn is a transaction against a state store.
// This can be a read or write transaction.
type Txn = *memdb.Txn
notnoop marked this conversation as resolved.
Show resolved Hide resolved

const (
// NodeRegisterEventReregistered is the message used when the node becomes
// reregistered.
Expand Down Expand Up @@ -923,6 +927,12 @@ func (s *StateStore) UpsertJob(index uint64, job *structs.Job) error {
return nil
}

// UpsertJobTxn is used to register a job or update a job definition, like UpsertJob,
// but in a transcation. Useful for when making multiple modifications atomically
func (s *StateStore) UpsertJobTxn(index uint64, job *structs.Job, txn Txn) error {
return s.upsertJobImpl(index, job, false, txn)
}

// upsertJobImpl is the implementation for registering a job or updating a job definition
func (s *StateStore) upsertJobImpl(index uint64, job *structs.Job, keepVersion bool, txn *memdb.Txn) error {
// COMPAT 0.7: Upgrade old objects that do not have namespaces
Expand Down Expand Up @@ -1006,6 +1016,16 @@ func (s *StateStore) DeleteJob(index uint64, namespace, jobID string) error {
txn := s.db.Txn(true)
defer txn.Abort()

err := s.DeleteJobTxn(index, namespace, jobID, txn)
if err == nil {
txn.Commit()
}
return err
}

// DeleteJobTxn is used to deregister a job, like DeleteJob,
// but in a transcation. Useful for when making multiple modifications atomically
notnoop marked this conversation as resolved.
Show resolved Hide resolved
func (s *StateStore) DeleteJobTxn(index uint64, namespace, jobID string, txn Txn) error {
// COMPAT 0.7: Upgrade old objects that do not have namespaces
if namespace == "" {
namespace = structs.DefaultNamespace
Expand Down Expand Up @@ -1092,7 +1112,6 @@ func (s *StateStore) DeleteJob(index uint64, namespace, jobID string) error {
return fmt.Errorf("index update failed: %v", err)
}

txn.Commit()
return nil
}

Expand Down Expand Up @@ -1190,7 +1209,12 @@ func (s *StateStore) upsertJobVersion(index uint64, job *structs.Job, txn *memdb
// version.
func (s *StateStore) JobByID(ws memdb.WatchSet, namespace, id string) (*structs.Job, error) {
txn := s.db.Txn(false)
return s.JobByIDTxn(ws, namespace, id, txn)
}

// JobByIDTxn is used to lookup a job by its ID, like JobByID. JobByID returns the job version
// accessable through in the transaction
func (s *StateStore) JobByIDTxn(ws memdb.WatchSet, namespace, id string, txn Txn) (*structs.Job, error) {
// COMPAT 0.7: Upgrade old objects that do not have namespaces
if namespace == "" {
namespace = structs.DefaultNamespace
Expand Down Expand Up @@ -1511,6 +1535,16 @@ func (s *StateStore) DeletePeriodicLaunch(index uint64, namespace, jobID string)
txn := s.db.Txn(true)
defer txn.Abort()

err := s.DeletePeriodicLaunchTxn(index, namespace, jobID, txn)
if err == nil {
txn.Commit()
}
return err
}

// DeletePeriodicLaunchTxn is used to delete the periodic launch, like DeletePeriodicLaunch
// but in a transcation. Useful for when making multiple modifications atomically
func (s *StateStore) DeletePeriodicLaunchTxn(index uint64, namespace, jobID string, txn Txn) error {
// COMPAT 0.7: Upgrade old objects that do not have namespaces
if namespace == "" {
namespace = structs.DefaultNamespace
Expand All @@ -1533,7 +1567,6 @@ func (s *StateStore) DeletePeriodicLaunch(index uint64, namespace, jobID string)
return fmt.Errorf("index update failed: %v", err)
}

txn.Commit()
return nil
}

Expand Down Expand Up @@ -1580,6 +1613,16 @@ func (s *StateStore) UpsertEvals(index uint64, evals []*structs.Evaluation) erro
txn := s.db.Txn(true)
defer txn.Abort()

err := s.UpsertEvalsTxn(index, evals, txn)
if err == nil {
txn.Commit()
}
return err
}

// UpsertEvals is used to upsert a set of evaluations, like UpsertEvals
// but in a transcation. Useful for when making multiple modifications atomically
func (s *StateStore) UpsertEvalsTxn(index uint64, evals []*structs.Evaluation, txn Txn) error {
// Do a nested upsert
jobs := make(map[structs.NamespacedID]string, len(evals))
for _, eval := range evals {
Expand All @@ -1599,7 +1642,6 @@ func (s *StateStore) UpsertEvals(index uint64, evals []*structs.Evaluation) erro
return fmt.Errorf("setting job status failed: %v", err)
}

txn.Commit()
return nil
}

Expand Down Expand Up @@ -3889,6 +3931,20 @@ func (s *StateStore) SchedulerSetConfig(idx uint64, config *structs.SchedulerCon
return nil
}

// WithWriteTransaction executes the passed function within a write transaction,
// and returns its result. If the invocation returns no error, the transaction
// is committed; otherwise, it's aborted.
func (s *StateStore) WithWriteTransaction(fn func(Txn) error) error {
notnoop marked this conversation as resolved.
Show resolved Hide resolved
tx := s.db.Txn(true)
defer tx.Abort()

err := fn(tx)
if err == nil {
tx.Commit()
}
return err
}

// SchedulerCASConfig is used to update the scheduler configuration with a
// given Raft index. If the CAS index specified is not equal to the last observed index
// for the config, then the call is a noop.
Expand Down