From 85a81f47de0dbac56715348a4d57e889da34c6e7 Mon Sep 17 00:00:00 2001 From: Alex Dadgar Date: Mon, 1 May 2017 16:18:03 -0700 Subject: [PATCH] Revert "metrics" This reverts commit 4d6a012c6fb6f1fba6c62985d091b1a20c3198e7. --- client/alloc_runner.go | 4 -- client/client.go | 2 - client/state_database.go | 108 +++++++++++++++++++++++++++++++++++++++ client/task_runner.go | 5 -- 4 files changed, 108 insertions(+), 11 deletions(-) create mode 100644 client/state_database.go diff --git a/client/alloc_runner.go b/client/alloc_runner.go index 25c21d3e99d..21c3ce2ed82 100644 --- a/client/alloc_runner.go +++ b/client/alloc_runner.go @@ -8,7 +8,6 @@ import ( "sync" "time" - metrics "github.com/armon/go-metrics" "github.com/boltdb/bolt" "github.com/hashicorp/go-multierror" "github.com/hashicorp/nomad/client/allocdir" @@ -540,7 +539,6 @@ func (r *AllocRunner) appendTaskEvent(state *structs.TaskState, event *structs.T // Run is a long running goroutine used to manage an allocation func (r *AllocRunner) Run() { - start := time.Now() defer close(r.waitCh) go r.dirtySyncState() @@ -606,8 +604,6 @@ func (r *AllocRunner) Run() { } r.taskLock.Unlock() - metrics.MeasureSince([]string{"client", "alloc_runner", "run_delay"}, start) - // taskDestroyEvent contains an event that caused the destroyment of a task // in the allocation. var taskDestroyEvent *structs.TaskEvent diff --git a/client/client.go b/client/client.go index 84bf27afe74..faf0a34bba3 100644 --- a/client/client.go +++ b/client/client.go @@ -1490,8 +1490,6 @@ func (c *Client) watchNodeUpdates() { // runAllocs is invoked when we get an updated set of allocations func (c *Client) runAllocs(update *allocUpdates) { - defer metrics.MeasureSince([]string{"client", "client", "runAllocs"}, time.Now()) - // Get the existing allocs c.allocLock.RLock() exist := make([]*structs.Allocation, 0, len(c.allocs)) diff --git a/client/state_database.go b/client/state_database.go new file mode 100644 index 00000000000..ba6f71d71f7 --- /dev/null +++ b/client/state_database.go @@ -0,0 +1,108 @@ +package client + +import ( + "bytes" + "fmt" + + "github.com/boltdb/bolt" + "github.com/hashicorp/nomad/nomad/structs" + "github.com/ugorji/go/codec" +) + +/* +The client has a boltDB backed state store. The schema as of 0.6 looks as follows: + +allocations/ (bucket) +|--> / (bucket) + |--> alloc_runner persisted objects (k/v) + |--> / (bucket) + |--> task_runner persisted objects (k/v) +*/ + +var ( + // allocationsBucket is the bucket name containing all allocation related + // data + allocationsBucket = []byte("allocations") +) + +func putObject(bkt *bolt.Bucket, key []byte, obj interface{}) error { + if !bkt.Writable() { + return fmt.Errorf("bucket must be writable") + } + + // Serialize the object + var buf bytes.Buffer + if err := codec.NewEncoder(&buf, structs.MsgpackHandle).Encode(obj); err != nil { + return fmt.Errorf("failed to encode passed object: %v", err) + } + + if err := bkt.Put(key, buf.Bytes()); err != nil { + return fmt.Errorf("failed to write data at key %v: %v", string(key), err) + } + + return nil +} + +func putData(bkt *bolt.Bucket, key, value []byte) error { + if !bkt.Writable() { + return fmt.Errorf("bucket must be writable") + } + + if err := bkt.Put(key, value); err != nil { + return fmt.Errorf("failed to write data at key %v: %v", string(key), err) + } + + return nil +} + +// getAllocationBucket returns the bucket used to persist state about a +// particular allocation. If the root allocation bucket or the specific +// allocation bucket doesn't exist, it will be created. +func getAllocationBucket(tx *bolt.Tx, allocID string) (*bolt.Bucket, error) { + if !tx.Writable() { + return nil, fmt.Errorf("transaction must be writable") + } + + // Retrieve the root allocations bucket + allocations, err := tx.CreateBucketIfNotExists(allocationsBucket) + if err != nil { + return nil, err + } + + // Retrieve the specific allocations bucket + alloc, err := allocations.CreateBucketIfNotExists([]byte(allocID)) + if err != nil { + return nil, err + } + + return alloc, nil +} + +// getTaskBucket returns the bucket used to persist state about a +// particular task. If the root allocation bucket, the specific +// allocation or task bucket doesn't exist, they will be created. +func getTaskBucket(tx *bolt.Tx, allocID, taskName string) (*bolt.Bucket, error) { + if !tx.Writable() { + return nil, fmt.Errorf("transaction must be writable") + } + + // Retrieve the root allocations bucket + allocations, err := tx.CreateBucketIfNotExists(allocationsBucket) + if err != nil { + return nil, err + } + + // Retrieve the specific allocations bucket + alloc, err := allocations.CreateBucketIfNotExists([]byte(allocID)) + if err != nil { + return nil, err + } + + // Retrieve the specific task bucket + task, err := alloc.CreateBucketIfNotExists([]byte(taskName)) + if err != nil { + return nil, err + } + + return task, nil +} diff --git a/client/task_runner.go b/client/task_runner.go index 00c760a86c2..ca74ae50ac6 100644 --- a/client/task_runner.go +++ b/client/task_runner.go @@ -800,7 +800,6 @@ func (r *TaskRunner) updatedTokenHandler() { // prestart handles life-cycle tasks that occur before the task has started. func (r *TaskRunner) prestart(resultCh chan bool) { - defer metrics.MeasureSince([]string{"client", "task_runner", "prestart"}, time.Now()) if r.task.Vault != nil { // Wait for the token r.logger.Printf("[DEBUG] client: waiting for Vault token for task %v in alloc %q", r.task.Name, r.alloc.ID) @@ -944,7 +943,6 @@ func (r *TaskRunner) postrun() { // run is the main run loop that handles starting the application, destroying // it, restarts and signals. func (r *TaskRunner) run() { - start := time.Now() // Predeclare things so we can jump to the RESTART var stopCollection chan struct{} var handleWaitCh chan *dstructs.WaitResult @@ -983,7 +981,6 @@ func (r *TaskRunner) run() { handleEmpty := r.handle == nil r.handleLock.Unlock() if handleEmpty { - metrics.MeasureSince([]string{"client", "task_runner", "run_delay"}, start) startErr := r.startTask() r.restartTracker.SetStartError(startErr) if startErr != nil { @@ -1261,7 +1258,6 @@ func (r *TaskRunner) killTask(killingEvent *structs.TaskEvent) { // startTask creates the driver, task dir, and starts the task. func (r *TaskRunner) startTask() error { - defer metrics.MeasureSince([]string{"client", "task_runner", "startTask"}, time.Now()) // Create a driver drv, err := r.createDriver() if err != nil { @@ -1349,7 +1345,6 @@ func interpolateServices(taskEnv *env.TaskEnvironment, task *structs.Task) { // buildTaskDir creates the task directory before driver.Prestart. It is safe // to call multiple times as its state is persisted. func (r *TaskRunner) buildTaskDir(fsi cstructs.FSIsolation) error { - defer metrics.MeasureSince([]string{"client", "task_runner", "buildTaskDir"}, time.Now()) r.persistLock.Lock() built := r.taskDirBuilt r.persistLock.Unlock()