Skip to content

Commit

Permalink
Merge pull request #5634 from hashicorp/f-nomad-exec-parts-03-executors
Browse files Browse the repository at this point in the history
nomad exec part 3: executor based drivers
  • Loading branch information
Mahmood Ali authored May 11, 2019
2 parents 152d642 + 7f76aed commit c25b247
Show file tree
Hide file tree
Showing 49 changed files with 1,710 additions and 78 deletions.
19 changes: 19 additions & 0 deletions drivers/exec/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -530,3 +530,22 @@ func (d *Driver) ExecTask(taskID string, cmd []string, timeout time.Duration) (*
},
}, nil
}

var _ drivers.ExecTaskStreamingRawDriver = (*Driver)(nil)

func (d *Driver) ExecTaskStreamingRaw(ctx context.Context,
taskID string,
command []string,
tty bool,
stream drivers.ExecTaskStream) error {

if len(command) == 0 {
return fmt.Errorf("error cmd must have atleast one value")
}
handle, ok := d.tasks.Get(taskID)
if !ok {
return drivers.ErrTaskNotFound
}

return handle.exec.ExecStreaming(ctx, command, tty, stream)
}
30 changes: 30 additions & 0 deletions drivers/exec/driver_unix_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,3 +77,33 @@ func TestExecDriver_StartWaitStop(t *testing.T) {

require.NoError(harness.DestroyTask(task.ID, true))
}

func TestExec_ExecTaskStreaming(t *testing.T) {
t.Parallel()
require := require.New(t)

d := NewExecDriver(testlog.HCLogger(t))
harness := dtestutil.NewDriverHarness(t, d)
defer harness.Kill()

task := &drivers.TaskConfig{
ID: uuid.Generate(),
Name: "sleep",
}

cleanup := harness.MkAllocDir(task, false)
defer cleanup()

tc := &TaskConfig{
Command: "/bin/sleep",
Args: []string{"9000"},
}
require.NoError(task.EncodeConcreteDriverConfig(&tc))

_, _, err := harness.StartTask(task)
require.NoError(err)
defer d.DestroyTask(task.ID, true)

dtestutil.ExecTaskStreamingConformanceTests(t, harness, task.ID)

}
19 changes: 19 additions & 0 deletions drivers/java/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -554,6 +554,25 @@ func (d *Driver) ExecTask(taskID string, cmd []string, timeout time.Duration) (*
}, nil
}

var _ drivers.ExecTaskStreamingRawDriver = (*Driver)(nil)

func (d *Driver) ExecTaskStreamingRaw(ctx context.Context,
taskID string,
command []string,
tty bool,
stream drivers.ExecTaskStream) error {

if len(command) == 0 {
return fmt.Errorf("error cmd must have atleast one value")
}
handle, ok := d.tasks.Get(taskID)
if !ok {
return drivers.ErrTaskNotFound
}

return handle.exec.ExecStreaming(ctx, command, tty, stream)
}

// GetAbsolutePath returns the absolute path of the passed binary by resolving
// it in the path and following symlinks.
func GetAbsolutePath(bin string) (string, error) {
Expand Down
29 changes: 29 additions & 0 deletions drivers/java/driver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -243,6 +243,35 @@ func TestJavaCmdArgs(t *testing.T) {
}
}

func TestJavaDriver_ExecTaskStreaming(t *testing.T) {
javaCompatible(t)
if !testutil.IsCI() {
t.Parallel()
}

require := require.New(t)
d := NewDriver(testlog.HCLogger(t))
harness := dtestutil.NewDriverHarness(t, d)
defer harness.Kill()

tc := &TaskConfig{
Class: "Hello",
Args: []string{"900"},
}
task := basicTask(t, "demo-app", tc)

cleanup := harness.MkAllocDir(task, true)
defer cleanup()

copyFile("./test-resources/Hello.class", filepath.Join(task.TaskDir().Dir, "Hello.class"), t)

_, _, err := harness.StartTask(task)
require.NoError(err)
defer d.DestroyTask(task.ID, true)

dtestutil.ExecTaskStreamingConformanceTests(t, harness, task.ID)

}
func basicTask(t *testing.T, name string, taskConfig *TaskConfig) *drivers.TaskConfig {
t.Helper()

Expand Down
19 changes: 19 additions & 0 deletions drivers/rawexec/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -521,3 +521,22 @@ func (d *Driver) ExecTask(taskID string, cmd []string, timeout time.Duration) (*
},
}, nil
}

var _ drivers.ExecTaskStreamingRawDriver = (*Driver)(nil)

func (d *Driver) ExecTaskStreamingRaw(ctx context.Context,
taskID string,
command []string,
tty bool,
stream drivers.ExecTaskStream) error {

if len(command) == 0 {
return fmt.Errorf("error cmd must have at least one value")
}
handle, ok := d.tasks.Get(taskID)
if !ok {
return drivers.ErrTaskNotFound
}

return handle.exec.ExecStreaming(ctx, command, tty, stream)
}
34 changes: 34 additions & 0 deletions drivers/rawexec/driver_unix_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -196,3 +196,37 @@ func TestRawExecDriver_StartWaitStop(t *testing.T) {

require.NoError(harness.DestroyTask(task.ID, true))
}

func TestRawExec_ExecTaskStreaming(t *testing.T) {
t.Parallel()
if runtime.GOOS == "darwin" {
t.Skip("skip running exec tasks on darwin as darwin has restrictions on starting tty shells")
}
require := require.New(t)

d := NewRawExecDriver(testlog.HCLogger(t))
harness := dtestutil.NewDriverHarness(t, d)
defer harness.Kill()

task := &drivers.TaskConfig{
ID: uuid.Generate(),
Name: "sleep",
}

cleanup := harness.MkAllocDir(task, false)
defer cleanup()

tc := &TaskConfig{
Command: testtask.Path(),
Args: []string{"sleep", "9000s"},
}
require.NoError(task.EncodeConcreteDriverConfig(&tc))
testtask.SetTaskConfigEnv(task)

_, _, err := harness.StartTask(task)
require.NoError(err)
defer d.DestroyTask(task.ID, true)

dtestutil.ExecTaskStreamingConformanceTests(t, harness, task.ID)

}
73 changes: 73 additions & 0 deletions drivers/shared/executor/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,9 @@ import (
hclog "github.com/hashicorp/go-hclog"
cstructs "github.com/hashicorp/nomad/client/structs"
"github.com/hashicorp/nomad/drivers/shared/executor/proto"
"github.com/hashicorp/nomad/helper/pluginutils/grpcutils"
"github.com/hashicorp/nomad/plugins/drivers"
dproto "github.com/hashicorp/nomad/plugins/drivers/proto"
)

var _ Executor = (*grpcExecutorClient)(nil)
Expand Down Expand Up @@ -181,3 +183,74 @@ func (c *grpcExecutorClient) Exec(deadline time.Time, cmd string, args []string)

return resp.Output, int(resp.ExitCode), nil
}

func (d *grpcExecutorClient) ExecStreaming(ctx context.Context,
command []string,
tty bool,
execStream drivers.ExecTaskStream) error {

err := d.execStreaming(ctx, command, tty, execStream)
if err != nil {
return grpcutils.HandleGrpcErr(err, d.doneCtx)
}
return nil
}

func (d *grpcExecutorClient) execStreaming(ctx context.Context,
command []string,
tty bool,
execStream drivers.ExecTaskStream) error {

stream, err := d.client.ExecStreaming(ctx)
if err != nil {
return err
}

err = stream.Send(&dproto.ExecTaskStreamingRequest{
Setup: &dproto.ExecTaskStreamingRequest_Setup{
Command: command,
Tty: tty,
},
})
if err != nil {
return err
}

errCh := make(chan error, 1)
go func() {
for {
m, err := execStream.Recv()
if err == io.EOF {
return
} else if err != nil {
errCh <- err
return
}

if err := stream.Send(m); err != nil {
errCh <- err
return
}

}
}()

for {
select {
case err := <-errCh:
return err
default:
}

m, err := stream.Recv()
if err == io.EOF {
return nil
} else if err != nil {
return err
}

if err := execStream.Send(m); err != nil {
return err
}
}
}
Loading

0 comments on commit c25b247

Please sign in to comment.