Merge pull request #4359 from hashicorp/b-tests
test fixes
dadgar authored Jun 1, 2018
2 parents 69a94ba + 0e85d20 commit 8c88146
Showing 11 changed files with 68 additions and 26 deletions.
1 change: 1 addition & 0 deletions client/driver/docker_linux_test.go
@@ -9,6 +9,7 @@ import (
"testing"
"time"

"github.com/hashicorp/nomad/client/testutil"
tu "github.com/hashicorp/nomad/testutil"
"github.com/stretchr/testify/require"
)
3 changes: 3 additions & 0 deletions client/driver/docker_test.go
@@ -763,6 +763,7 @@ func TestDockerDriver_StartNVersions(t *testing.T) {
task2, _, _ := dockerTask(t)
task2.Config["image"] = "busybox:musl"
task2.Config["load"] = "busybox_musl.tar"
task2.Config["args"] = []string{"-l", "-p", "0"}

task3, _, _ := dockerTask(t)
task3.Config["image"] = "busybox:glibc"
@@ -773,6 +774,7 @@ func TestDockerDriver_StartNVersions(t *testing.T) {
handles := make([]DriverHandle, len(taskList))

t.Logf("Starting %d tasks", len(taskList))
client := newTestDockerClient(t)

// Let's spin up a bunch of things
for idx, task := range taskList {
@@ -794,6 +796,7 @@ func TestDockerDriver_StartNVersions(t *testing.T) {
continue
}
handles[idx] = resp.Handle
waitForExist(t, client, resp.Handle.(*DockerHandle))
}

t.Log("All tasks are started. Terminating...")
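
The newTestDockerClient and waitForExist calls above make the test wait until the Docker daemon actually reports each started container before terminating anything, instead of racing the daemon. A rough sketch of what such a polling helper can look like, assuming the handle exposes its container ID and using Nomad's tu.WaitForResult together with go-dockerclient's InspectContainer (the name and signature here are illustrative, not necessarily the exact helper in this package):

import (
    "testing"

    docker "github.com/fsouza/go-dockerclient"
    tu "github.com/hashicorp/nomad/testutil"
)

// waitForContainer polls the Docker daemon until the container backing a
// started task is visible, failing the test if it never shows up.
func waitForContainer(t *testing.T, client *docker.Client, containerID string) {
    tu.WaitForResult(func() (bool, error) {
        c, err := client.InspectContainer(containerID)
        if err != nil {
            return false, err
        }
        return c != nil, nil
    }, func(err error) {
        t.Fatalf("container %q never became visible: %v", containerID, err)
    })
}
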
33 changes: 26 additions & 7 deletions client/driver/executor/executor.go
@@ -40,6 +40,11 @@ const (
// tree for finding out the pids that the executor and its child processes
// have forked
pidScanInterval = 5 * time.Second

// processOutputCloseTolerance is the length of time we will wait for the
// launched process to close its stdout/stderr before we force close it. If
// data is written after this tolerance, we will not capture it.
processOutputCloseTolerance = 2 * time.Second
)

var (
@@ -285,6 +290,11 @@ func (e *UniversalExecutor) LaunchCmd(command *ExecCommand) (*ProcessState, erro
if err := e.cmd.Start(); err != nil {
return nil, fmt.Errorf("failed to start command path=%q --- args=%q: %v", path, e.cmd.Args, err)
}

// Close the files. This is copied from the os/exec package.
e.lro.processOutWriter.Close()
e.lre.processOutWriter.Close()

go e.collectPids()
go e.wait()
ic := e.resConCtx.getIsolationConfig()
@@ -832,9 +842,10 @@ func (e *UniversalExecutor) collectLogs(we io.Writer, wo io.Writer) {
// log rotator data. The processOutWriter should be attached to the process and
// data will be copied from the reader to the rotator.
type logRotatorWrapper struct {
processOutWriter *os.File
processOutReader *os.File
rotatorWriter *logging.FileRotator
processOutWriter *os.File
processOutReader *os.File
rotatorWriter *logging.FileRotator
hasFinishedCopied chan struct{}
}

// NewLogRotatorWrapper takes a rotator and returns a wrapper that has the
@@ -846,9 +857,10 @@ func NewLogRotatorWrapper(rotator *logging.FileRotator) (*logRotatorWrapper, err
}

wrap := &logRotatorWrapper{
processOutWriter: w,
processOutReader: r,
rotatorWriter: rotator,
processOutWriter: w,
processOutReader: r,
rotatorWriter: rotator,
hasFinishedCopied: make(chan struct{}, 1),
}
wrap.start()
return wrap, nil
@@ -860,13 +872,20 @@ func (l *logRotatorWrapper) start() {
go func() {
io.Copy(l.rotatorWriter, l.processOutReader)
l.processOutReader.Close() // in case io.Copy stopped due to write error
close(l.hasFinishedCopied)
}()
return
}

// Close closes the rotator and the process writer to ensure that the Wait
// command exits.
func (l *logRotatorWrapper) Close() error {
// Wait up to the close tolerance before we force close
select {
case <-l.hasFinishedCopied:
case <-time.After(processOutputCloseTolerance):
}
err := l.processOutReader.Close()
l.rotatorWriter.Close()
return l.processOutWriter.Close()
return err
}
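
The executor change above follows the same pipe-ownership pattern os/exec uses: the parent hands the write end of a pipe to the child as stdout/stderr, closes its own copy once the child has started so the reader sees EOF when the child exits, and the copy goroutine signals completion so Close can wait up to processOutputCloseTolerance before force-closing the read end. A stripped-down sketch of that pattern, independent of Nomad's FileRotator (the names here are illustrative):

package main

import (
    "io"
    "os"
    "os/exec"
    "time"
)

// closeTolerance plays the role of processOutputCloseTolerance: how long
// Close waits for the copy goroutine to drain the pipe before forcing it shut.
const closeTolerance = 2 * time.Second

type outputPipe struct {
    w    *os.File      // handed to the child process as stdout/stderr
    r    *os.File      // read end, drained by the copy goroutine
    done chan struct{} // closed when io.Copy returns
}

func newOutputPipe(dst io.Writer) (*outputPipe, error) {
    r, w, err := os.Pipe()
    if err != nil {
        return nil, err
    }
    p := &outputPipe{w: w, r: r, done: make(chan struct{})}
    go func() {
        io.Copy(dst, r)
        r.Close() // in case io.Copy stopped due to a write error
        close(p.done)
    }()
    return p, nil
}

// Close waits up to closeTolerance for the copy to finish; anything the child
// writes after that point is not captured.
func (p *outputPipe) Close() error {
    select {
    case <-p.done:
    case <-time.After(closeTolerance):
    }
    return p.r.Close()
}

func main() {
    p, _ := newOutputPipe(os.Stdout)
    cmd := exec.Command("echo", "hello")
    cmd.Stdout = p.w
    if err := cmd.Start(); err != nil {
        panic(err)
    }
    // As in os/exec, close the parent's copy of the write end once the child
    // owns it, otherwise the reader never sees EOF.
    p.w.Close()
    cmd.Wait()
    p.Close()
}
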
7 changes: 5 additions & 2 deletions client/driver/executor/executor_linux_test.go
@@ -84,10 +84,13 @@ func TestExecutor_IsolationAndConstraints(t *testing.T) {
if ps.Pid == 0 {
t.Fatalf("expected process to start and have non zero pid")
}
_, err = executor.Wait()
state, err := executor.Wait()
if err != nil {
t.Fatalf("error in waiting for command: %v", err)
}
if state.ExitCode != 0 {
t.Errorf("exited with non-zero code: %v", state.ExitCode)
}

// Check if the resource constraints were applied
memLimits := filepath.Join(ps.IsolationConfig.CgroupPaths["memory"], "memory.limit_in_bytes")
@@ -135,7 +138,7 @@ ld.so.conf.d/`

act := strings.TrimSpace(string(output))
if act != expected {
t.Fatalf("Command output incorrectly: want %v; got %v", expected, act)
t.Errorf("Command output incorrectly: want %v; got %v", expected, act)
}
}

4 changes: 2 additions & 2 deletions command/agent/agent.go
@@ -144,8 +144,8 @@ func convertServerConfig(agentConfig *Config, logOutput io.Writer) (*nomad.Confi
if agentConfig.Server.RaftProtocol != 0 {
conf.RaftConfig.ProtocolVersion = raft.ProtocolVersion(agentConfig.Server.RaftProtocol)
}
if agentConfig.Server.NumSchedulers != 0 {
conf.NumSchedulers = agentConfig.Server.NumSchedulers
if agentConfig.Server.NumSchedulers != nil {
conf.NumSchedulers = *agentConfig.Server.NumSchedulers
}
if len(agentConfig.Server.EnabledSchedulers) != 0 {
// Convert to a set and require the core scheduler
6 changes: 5 additions & 1 deletion command/agent/alloc_endpoint_test.go
@@ -15,6 +15,7 @@ import (
"github.com/golang/snappy"
"github.com/hashicorp/nomad/acl"
"github.com/hashicorp/nomad/client/allocdir"
"github.com/hashicorp/nomad/helper"
"github.com/hashicorp/nomad/helper/uuid"
"github.com/hashicorp/nomad/nomad/mock"
"github.com/hashicorp/nomad/nomad/structs"
@@ -435,7 +436,10 @@ func TestHTTP_AllocSnapshot_WithMigrateToken(t *testing.T) {
// snapshotting a valid tar is not returned.
func TestHTTP_AllocSnapshot_Atomic(t *testing.T) {
t.Parallel()
httpTest(t, nil, func(s *TestAgent) {
httpTest(t, func(c *Config) {
// Disable the schedulers
c.Server.NumSchedulers = helper.IntToPtr(0)
}, func(s *TestAgent) {
// Create an alloc
state := s.server.State()
alloc := mock.Alloc()
6 changes: 3 additions & 3 deletions command/agent/config.go
@@ -268,7 +268,7 @@ type ServerConfig struct {
// NumSchedulers is the number of scheduler threads that are run.
// This can be as many as one per core, or zero to disable this server
// from doing any scheduling work.
NumSchedulers int `mapstructure:"num_schedulers"`
NumSchedulers *int `mapstructure:"num_schedulers"`

// EnabledSchedulers controls the set of sub-schedulers that are
// enabled for this server to handle. This will restrict the evaluations
@@ -1009,8 +1009,8 @@ func (a *ServerConfig) Merge(b *ServerConfig) *ServerConfig {
if b.RaftProtocol != 0 {
result.RaftProtocol = b.RaftProtocol
}
if b.NumSchedulers != 0 {
result.NumSchedulers = b.NumSchedulers
if b.NumSchedulers != nil {
result.NumSchedulers = helper.IntToPtr(*b.NumSchedulers)
}
if b.NodeGCThreshold != "" {
result.NodeGCThreshold = b.NodeGCThreshold
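
Switching NumSchedulers from int to *int is what the nil checks in agent.go and Merge above rely on: with a plain int there is no way to tell "not set" (keep the default) apart from an explicit 0 that disables scheduling, which is exactly what the alloc endpoint test now does via helper.IntToPtr(0). A minimal sketch of the nil-versus-zero pattern (helper.IntToPtr is Nomad's real helper; the config type below is illustrative):

// serverCfg stands in for ServerConfig: a nil NumSchedulers means "unset",
// while a pointer to 0 is an explicit request to run no schedulers.
type serverCfg struct {
    NumSchedulers *int
}

// merge prefers b's value when it is set, copying it so the result does not
// alias b's pointer (as Merge does via helper.IntToPtr).
func merge(a, b serverCfg) serverCfg {
    result := a
    if b.NumSchedulers != nil {
        v := *b.NumSchedulers
        result.NumSchedulers = &v
    }
    return result
}

// numSchedulers resolves the setting against a default.
func numSchedulers(c serverCfg, def int) int {
    if c.NumSchedulers == nil {
        return def // unset: keep the server default
    }
    return *c.NumSchedulers // explicit value, including 0
}
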
2 changes: 1 addition & 1 deletion command/agent/config_parse_test.go
@@ -88,7 +88,7 @@ func TestConfig_Parse(t *testing.T) {
DataDir: "/tmp/data",
ProtocolVersion: 3,
RaftProtocol: 3,
NumSchedulers: 2,
NumSchedulers: helper.IntToPtr(2),
EnabledSchedulers: []string{"test"},
NodeGCThreshold: "12h",
EvalGCThreshold: "12h",
4 changes: 2 additions & 2 deletions command/agent/config_test.go
@@ -105,7 +105,7 @@ func TestConfig_Merge(t *testing.T) {
DataDir: "/tmp/data1",
ProtocolVersion: 1,
RaftProtocol: 1,
NumSchedulers: 1,
NumSchedulers: helper.IntToPtr(1),
NodeGCThreshold: "1h",
HeartbeatGrace: 30 * time.Second,
MinHeartbeatTTL: 30 * time.Second,
@@ -255,7 +255,7 @@ func TestConfig_Merge(t *testing.T) {
DataDir: "/tmp/data2",
ProtocolVersion: 2,
RaftProtocol: 2,
NumSchedulers: 2,
NumSchedulers: helper.IntToPtr(2),
EnabledSchedulers: []string{structs.JobTypeBatch},
NodeGCThreshold: "12h",
HeartbeatGrace: 2 * time.Minute,
22 changes: 14 additions & 8 deletions command/node_drain_test.go
@@ -222,17 +222,23 @@ func TestNodeDrainCommand_Monitor(t *testing.T) {
out := outBuf.String()
t.Logf("Output:\n%s", out)

require.Contains(out, "marked all allocations for migration")
for _, a := range allocs {
if *a.Job.Type == "system" {
if strings.Contains(out, a.ID) {
t.Fatalf("output should not contain system alloc %q", a.ID)
// Unfortunately travis is too slow to reliably see the expected output. The
// monitor goroutines may start only after some or all the allocs have been
// migrated.
if !testutil.IsTravis() {
require.Contains(out, "marked all allocations for migration")
for _, a := range allocs {
if *a.Job.Type == "system" {
if strings.Contains(out, a.ID) {
t.Fatalf("output should not contain system alloc %q", a.ID)
}
continue
}
continue
require.Contains(out, fmt.Sprintf("Alloc %q marked for migration", a.ID))
require.Contains(out, fmt.Sprintf("Alloc %q draining", a.ID))
}
require.Contains(out, fmt.Sprintf("Alloc %q marked for migration", a.ID))
require.Contains(out, fmt.Sprintf("Alloc %q draining", a.ID))
}

expected := fmt.Sprintf("All allocations on node %q have stopped.\n", nodeID)
if !strings.HasSuffix(out, expected) {
t.Fatalf("expected output to end with:\n%s", expected)
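
testutil.IsTravis guards the strict output assertions so they still run locally and in faster CI environments, while Travis, where the monitor goroutines may only attach after the allocations have already migrated, skips them. A sketch of what such a guard typically amounts to, assuming it keys off the TRAVIS environment variable (the real helper may differ):

import "os"

// IsTravis reports whether the test run is on Travis CI, which exports
// TRAVIS=true for every build.
func IsTravis() bool {
    return os.Getenv("TRAVIS") == "true"
}
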
6 changes: 6 additions & 0 deletions nomad/fsm_test.go
@@ -2570,6 +2570,12 @@ func TestFSM_SnapshotRestore_Deployments(t *testing.T) {
state := fsm.State()
d1 := mock.Deployment()
d2 := mock.Deployment()

j := mock.Job()
d1.JobID = j.ID
d2.JobID = j.ID

state.UpsertJob(999, j)
state.UpsertDeployment(1000, d1)
state.UpsertDeployment(1001, d2)

