Skip to content

Commit

Permalink
Exit if alloc listener closes
Browse files Browse the repository at this point in the history
Add test for that case, add comments, remove debug logging
  • Loading branch information
schmichael committed Aug 11, 2017
1 parent 8253439 commit 4f7c5e6
Show file tree
Hide file tree
Showing 2 changed files with 99 additions and 16 deletions.
59 changes: 43 additions & 16 deletions client/alloc_watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,9 @@ type terminated interface {
Terminated() bool
}

// prevAllocWatcher allows AllocRunners to wait for a previous allocation to
// terminate and migrate its data whether or not the previous allocation is
// local or remote.
type prevAllocWatcher interface {
// Wait for previous alloc to terminate
Wait(context.Context) error
Expand All @@ -39,7 +42,7 @@ type prevAllocWatcher interface {
}

// newAllocWatcher creates a prevAllocWatcher appropriate for whether this
// allocs previous allocation was local or remote. If this alloc has no
// alloc's previous allocation was local or remote. If this alloc has no
// previous alloc then a noop implementation is returned.
func newAllocWatcher(alloc *structs.Allocation, prevAR *AllocRunner, rpc rpcer, config *config.Config, l *log.Logger) prevAllocWatcher {
if alloc.PreviousAllocation == "" {
Expand Down Expand Up @@ -78,15 +81,31 @@ func newAllocWatcher(alloc *structs.Allocation, prevAR *AllocRunner, rpc rpcer,
// localPrevAlloc is a prevAllocWatcher for previous allocations on the same
// node as an updated allocation.
type localPrevAlloc struct {
allocID string
// allocID is the ID of the alloc being blocked
allocID string

// prevAllocID is the ID of the alloc being replaced
prevAllocID string
tasks []*structs.Task

sticky bool
// tasks on the new alloc
tasks []*structs.Task

// sticky is true if data should be moved
sticky bool

// prevAllocDir is the alloc dir for the previous alloc
prevAllocDir *allocdir.AllocDir

// prevListener allows blocking for updates to the previous alloc
prevListener *cstructs.AllocListener
prevStatus terminated
prevWaitCh <-chan struct{}

// prevStatus allows checking if the previous alloc has already
// terminated (and therefore won't send updates to the listener)
prevStatus terminated

// prevWaitCh is closed when the previous alloc is GC'd which is a
// failsafe against blocking the new alloc forever
prevWaitCh <-chan struct{}

logger *log.Logger
}
Expand All @@ -103,18 +122,14 @@ func (p *localPrevAlloc) Wait(ctx context.Context) error {
// Block until previous alloc exits
p.logger.Printf("[DEBUG] client: alloc %q waiting for previous alloc %q to terminate", p.allocID, p.prevAllocID)
for {
p.logger.Printf("[DEBUG] client: alloc %q waiting for previous alloc %q to terminate - XXX LOOP", p.allocID, p.prevAllocID)
select {
case prevAlloc := <-p.prevListener.Ch:
p.logger.Printf("[DEBUG] client: alloc %q waiting for previous alloc %q to terminate - XXX UPDATE %v", p.allocID, p.prevAllocID, prevAlloc.Terminated())
if prevAlloc.Terminated() {
case prevAlloc, ok := <-p.prevListener.Ch:
if !ok || prevAlloc.Terminated() {
return nil
}
case <-p.prevWaitCh:
p.logger.Printf("[DEBUG] client: alloc %q waiting for previous alloc %q to terminate - XXX CLOSED", p.allocID, p.prevAllocID)
return nil
case <-ctx.Done():
p.logger.Printf("[DEBUG] client: alloc %q waiting for previous alloc %q to terminate - XXX DONE", p.allocID, p.prevAllocID)
return ctx.Err()
}
}
Expand All @@ -140,13 +155,24 @@ func (p *localPrevAlloc) Migrate(ctx context.Context, dest *allocdir.AllocDir) e
// remotePrevAlloc is a prevAllcWatcher for previous allocations on remote
// nodes as an updated allocation.
type remotePrevAlloc struct {
allocID string
// allocID is the ID of the alloc being blocked
allocID string

// prevAllocID is the ID of the alloc being replaced
prevAllocID string
tasks []*structs.Task

config *config.Config
// tasks on the new alloc
tasks []*structs.Task

// config for the Client to get AllocDir and Region
config *config.Config

// migrate is true if data should be moved between nodes
migrate bool
rpc rpcer

// rpc provides an RPC method for watching for updates to the previous
// alloc and determining what node it was on.
rpc rpcer

// nodeID is the node the previous alloc. Set by Wait() for use in
// Migrate() iff the previous alloc has not already been GC'd.
Expand All @@ -155,6 +181,7 @@ type remotePrevAlloc struct {
logger *log.Logger
}

// Wait until the remote previousl allocation has terminated.
func (p *remotePrevAlloc) Wait(ctx context.Context) error {
p.logger.Printf("[DEBUG] client: alloc %q waiting for remote previous alloc %q to terminate", p.allocID, p.prevAllocID)
req := structs.AllocSpecificRequest{
Expand Down
56 changes: 56 additions & 0 deletions client/alloc_watcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,66 @@ import (
"os"
"path/filepath"
"testing"
"time"

"github.com/hashicorp/nomad/client/config"
"github.com/hashicorp/nomad/nomad/mock"
)

// TestPrevAlloc_LocalPrevAlloc asserts that when a previous alloc runner is
// set a localPrevAlloc will block on it.
func TestPrevAlloc_LocalPrevAlloc(t *testing.T) {
_, prevAR := testAllocRunner(false)
prevAR.alloc.Job.TaskGroups[0].Tasks[0].Config["run_for"] = "10s"

newAlloc := mock.Alloc()
newAlloc.PreviousAllocation = prevAR.Alloc().ID
newAlloc.Job.TaskGroups[0].EphemeralDisk.Sticky = false
task := newAlloc.Job.TaskGroups[0].Tasks[0]
task.Driver = "mock_driver"
task.Config["run_for"] = "500ms"

waiter := newAllocWatcher(newAlloc, prevAR, nil, nil, testLogger())

// Wait in a goroutine with a context to make sure it exits at the right time
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
go func() {
defer cancel()
waiter.Wait(ctx)
}()

select {
case <-ctx.Done():
t.Fatalf("Wait exited too early")
case <-time.After(33 * time.Millisecond):
// Good! It's blocking
}

// Start the previous allocs to cause it to update but not terminate
go prevAR.Run()
defer prevAR.Destroy()

select {
case <-ctx.Done():
t.Fatalf("Wait exited too early")
case <-time.After(33 * time.Millisecond):
// Good! It's still blocking
}

// Stop the previous alloc
prevAR.Destroy()

select {
case <-ctx.Done():
// Good! We unblocked when the previous alloc stopped
case <-time.After(time.Second):
t.Fatalf("Wait exited too early")
}
}

// TestPrevAlloc_StreamAllocDir asserts that streaming a tar to an alloc dir
// works.
func TestPrevAlloc_StreamAllocDir(t *testing.T) {
t.Parallel()
dir, err := ioutil.TempDir("", "")
Expand Down

0 comments on commit 4f7c5e6

Please sign in to comment.