Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Enable rkt driver to use address_mode = 'driver' #3256

Merged
merged 13 commits into from
Sep 26, 2017
Merged
236 changes: 172 additions & 64 deletions client/driver/rkt.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (
"context"
"encoding/json"
"fmt"
"io/ioutil"
"log"
"net"
"os"
Expand All @@ -18,6 +17,9 @@ import (
"syscall"
"time"

appcschema "github.com/appc/spec/schema"
rktv1 "github.com/rkt/rkt/api/v1"

"github.com/hashicorp/go-plugin"
"github.com/hashicorp/go-version"
"github.com/hashicorp/nomad/client/allocdir"
Expand Down Expand Up @@ -55,8 +57,8 @@ const (
// rktCmd is the command rkt is installed as.
rktCmd = "rkt"

// rktUuidDeadline is how long to wait for the uuid file to be written
rktUuidDeadline = 5 * time.Second
// rktNetworkDeadline is how long to wait for container network to start
rktNetworkDeadline = 5 * time.Second
)

// RktDriver is a driver for running images via Rkt
Expand Down Expand Up @@ -112,6 +114,72 @@ type rktPID struct {
MaxKillTimeout time.Duration
}

// Retrieve pod status for the pod with the given UUID.
func rktGetStatus(uuid string) (*rktv1.Pod, error) {
statusArgs := []string{
"status",
"--format=json",
uuid,
}
var outBuf, errBuf bytes.Buffer
cmd := exec.Command(rktCmd, statusArgs...)
cmd.Stdout = &outBuf
cmd.Stderr = &errBuf
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Use ioutil.Discard for stderr or include a fixed size snippet in Run's error case.

if err := cmd.Run(); err != nil {
return nil, err
}
var status rktv1.Pod
if err := json.Unmarshal(outBuf.Bytes(), &status); err != nil {
return nil, err
}
return &status, nil
}

// Retrieves a pod manifest
func rktGetManifest(uuid string) (*appcschema.PodManifest, error) {
statusArgs := []string{
"cat-manifest",
uuid,
}
var outBuf, errBuf bytes.Buffer
cmd := exec.Command(rktCmd, statusArgs...)
cmd.Stdout = &outBuf
cmd.Stderr = &errBuf
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Use ioutil.Discard for stderr or include a fixed size snippet in Run's error case.

if err := cmd.Run(); err != nil {
return nil, err
}
var manifest appcschema.PodManifest
if err := json.Unmarshal(outBuf.Bytes(), &manifest); err != nil {
return nil, err
}
return &manifest, nil
}

// Given a rkt/appc pod manifest and driver portmap configuration, create
// a driver portmap.
func rktManifestMakePortMap(manifest *appcschema.PodManifest, configPortMap map[string]string) (map[string]int, error) {
if len(manifest.Apps) == 0 {
return nil, fmt.Errorf("manifest has no apps")
}
if len(manifest.Apps) != 1 {
return nil, fmt.Errorf("manifest has multiple apps!")
}
app := manifest.Apps[0]
if app.App == nil {
return nil, fmt.Errorf("specified app has no App object")
}

portMap := make(map[string]int)
for svc, name := range configPortMap {
for _, port := range app.App.Ports {
if port.Name.String() == name {
portMap[svc] = int(port.Port)
}
}
}
return portMap, nil
}

// NewRktDriver is used to create a new exec driver
func NewRktDriver(ctx *DriverContext) Driver {
return &RktDriver{DriverContext: *ctx}
Expand Down Expand Up @@ -247,8 +315,8 @@ func (d *RktDriver) Start(ctx *ExecContext, task *structs.Task) (*StartResponse,
// ACI image
img := driverConfig.ImageName

// Build the command.
cmdArgs := make([]string, 0, 50)
// Global arguments given to both prepare and run-prepared
globalArgs := make([]string, 0, 50)

// Add debug option to rkt command.
debug := driverConfig.Debug
Expand All @@ -274,43 +342,44 @@ func (d *RktDriver) Start(ctx *ExecContext, task *structs.Task) (*StartResponse,
// if we have a selective insecure_options, prefer them
// insecure options are rkt's global argument, so we do this before the actual "run"
if len(driverConfig.InsecureOptions) > 0 {
cmdArgs = append(cmdArgs, fmt.Sprintf("--insecure-options=%s", strings.Join(driverConfig.InsecureOptions, ",")))
globalArgs = append(globalArgs, fmt.Sprintf("--insecure-options=%s", strings.Join(driverConfig.InsecureOptions, ",")))
} else if insecure {
cmdArgs = append(cmdArgs, "--insecure-options=all")
globalArgs = append(globalArgs, "--insecure-options=all")
}

// debug is rkt's global argument, so add it before the actual "run"
cmdArgs = append(cmdArgs, fmt.Sprintf("--debug=%t", debug))
globalArgs = append(globalArgs, fmt.Sprintf("--debug=%t", debug))

prepareArgs := make([]string, 0, 50)
runArgs := make([]string, 0, 50)

cmdArgs = append(cmdArgs, "run")
prepareArgs = append(prepareArgs, globalArgs...)
prepareArgs = append(prepareArgs, "prepare")
runArgs = append(runArgs, globalArgs...)
runArgs = append(runArgs, "run-prepared")

// disable overlayfs
if driverConfig.NoOverlay {
cmdArgs = append(cmdArgs, "--no-overlay=true")
prepareArgs = append(prepareArgs, "--no-overlay=true")
}

// Write the UUID out to a file in the state dir so we can read it back
// in and access the pod by UUID from other commands
uuidPath := filepath.Join(ctx.TaskDir.Dir, "rkt.uuid")
cmdArgs = append(cmdArgs, fmt.Sprintf("--uuid-file-save=%s", uuidPath))

// Convert underscores to dashes in task names for use in volume names #2358
sanitizedName := strings.Replace(task.Name, "_", "-", -1)

// Mount /alloc
allocVolName := fmt.Sprintf("%s-%s-alloc", d.DriverContext.allocID, sanitizedName)
cmdArgs = append(cmdArgs, fmt.Sprintf("--volume=%s,kind=host,source=%s", allocVolName, ctx.TaskDir.SharedAllocDir))
cmdArgs = append(cmdArgs, fmt.Sprintf("--mount=volume=%s,target=%s", allocVolName, allocdir.SharedAllocContainerPath))
prepareArgs = append(prepareArgs, fmt.Sprintf("--volume=%s,kind=host,source=%s", allocVolName, ctx.TaskDir.SharedAllocDir))
prepareArgs = append(prepareArgs, fmt.Sprintf("--mount=volume=%s,target=%s", allocVolName, allocdir.SharedAllocContainerPath))

// Mount /local
localVolName := fmt.Sprintf("%s-%s-local", d.DriverContext.allocID, sanitizedName)
cmdArgs = append(cmdArgs, fmt.Sprintf("--volume=%s,kind=host,source=%s", localVolName, ctx.TaskDir.LocalDir))
cmdArgs = append(cmdArgs, fmt.Sprintf("--mount=volume=%s,target=%s", localVolName, allocdir.TaskLocalContainerPath))
prepareArgs = append(prepareArgs, fmt.Sprintf("--volume=%s,kind=host,source=%s", localVolName, ctx.TaskDir.LocalDir))
prepareArgs = append(prepareArgs, fmt.Sprintf("--mount=volume=%s,target=%s", localVolName, allocdir.TaskLocalContainerPath))

// Mount /secrets
secretsVolName := fmt.Sprintf("%s-%s-secrets", d.DriverContext.allocID, sanitizedName)
cmdArgs = append(cmdArgs, fmt.Sprintf("--volume=%s,kind=host,source=%s", secretsVolName, ctx.TaskDir.SecretsDir))
cmdArgs = append(cmdArgs, fmt.Sprintf("--mount=volume=%s,target=%s", secretsVolName, allocdir.TaskSecretsContainerPath))
prepareArgs = append(prepareArgs, fmt.Sprintf("--volume=%s,kind=host,source=%s", secretsVolName, ctx.TaskDir.SecretsDir))
prepareArgs = append(prepareArgs, fmt.Sprintf("--mount=volume=%s,target=%s", secretsVolName, allocdir.TaskSecretsContainerPath))

// Mount arbitrary volumes if enabled
if len(driverConfig.Volumes) > 0 {
Expand All @@ -334,54 +403,55 @@ func (d *RktDriver) Start(ctx *ExecContext, task *structs.Task) (*StartResponse,
return nil, fmt.Errorf("invalid rkt volume: %q", rawvol)
}
volName := fmt.Sprintf("%s-%s-%d", d.DriverContext.allocID, sanitizedName, i)
cmdArgs = append(cmdArgs, fmt.Sprintf("--volume=%s,kind=host,source=%s,readOnly=%s", volName, parts[0], readOnly))
cmdArgs = append(cmdArgs, fmt.Sprintf("--mount=volume=%s,target=%s", volName, parts[1]))
prepareArgs = append(prepareArgs, fmt.Sprintf("--volume=%s,kind=host,source=%s,readOnly=%s", volName, parts[0], readOnly))
prepareArgs = append(prepareArgs, fmt.Sprintf("--mount=volume=%s,target=%s", volName, parts[1]))
}
}

cmdArgs = append(cmdArgs, img)

// Inject environment variables
for k, v := range ctx.TaskEnv.Map() {
cmdArgs = append(cmdArgs, fmt.Sprintf("--set-env=%s=%s", k, v))
prepareArgs = append(prepareArgs, fmt.Sprintf("--set-env=%s=%s", k, v))
}

// Image is set here, because the commands that follow apply to it
prepareArgs = append(prepareArgs, img)

// Check if the user has overridden the exec command.
if driverConfig.Command != "" {
cmdArgs = append(cmdArgs, fmt.Sprintf("--exec=%v", driverConfig.Command))
prepareArgs = append(prepareArgs, fmt.Sprintf("--exec=%v", driverConfig.Command))
}

// Add memory isolator
cmdArgs = append(cmdArgs, fmt.Sprintf("--memory=%vM", int64(task.Resources.MemoryMB)))
prepareArgs = append(prepareArgs, fmt.Sprintf("--memory=%vM", int64(task.Resources.MemoryMB)))

// Add CPU isolator
cmdArgs = append(cmdArgs, fmt.Sprintf("--cpu=%vm", int64(task.Resources.CPU)))
prepareArgs = append(prepareArgs, fmt.Sprintf("--cpu=%vm", int64(task.Resources.CPU)))

// Add DNS servers
if len(driverConfig.DNSServers) == 1 && (driverConfig.DNSServers[0] == "host" || driverConfig.DNSServers[0] == "none") {
// Special case single item lists with the special values "host" or "none"
cmdArgs = append(cmdArgs, fmt.Sprintf("--dns=%s", driverConfig.DNSServers[0]))
runArgs = append(runArgs, fmt.Sprintf("--dns=%s", driverConfig.DNSServers[0]))
} else {
for _, ip := range driverConfig.DNSServers {
if err := net.ParseIP(ip); err == nil {
msg := fmt.Errorf("invalid ip address for container dns server %q", ip)
d.logger.Printf("[DEBUG] driver.rkt: %v", msg)
return nil, msg
} else {
cmdArgs = append(cmdArgs, fmt.Sprintf("--dns=%s", ip))
runArgs = append(runArgs, fmt.Sprintf("--dns=%s", ip))
}
}
}

// set DNS search domains
for _, domain := range driverConfig.DNSSearchDomains {
cmdArgs = append(cmdArgs, fmt.Sprintf("--dns-search=%s", domain))
runArgs = append(runArgs, fmt.Sprintf("--dns-search=%s", domain))
}

// set network
network := strings.Join(driverConfig.Net, ",")
if network != "" {
cmdArgs = append(cmdArgs, fmt.Sprintf("--net=%s", network))
runArgs = append(runArgs, fmt.Sprintf("--net=%s", network))
}

// Setup port mapping and exposed ports
Expand All @@ -407,7 +477,7 @@ func (d *RktDriver) Start(ctx *ExecContext, task *structs.Task) (*StartResponse,

d.logger.Printf("[DEBUG] driver.rkt: exposed port %s", containerPort)
// Add port option to rkt run arguments. rkt allows multiple port args
cmdArgs = append(cmdArgs, fmt.Sprintf("--port=%s:%s", containerPort, hostPortStr))
prepareArgs = append(prepareArgs, fmt.Sprintf("--port=%s:%s", containerPort, hostPortStr))
}

for _, port := range network.DynamicPorts {
Expand All @@ -425,7 +495,7 @@ func (d *RktDriver) Start(ctx *ExecContext, task *structs.Task) (*StartResponse,

d.logger.Printf("[DEBUG] driver.rkt: exposed port %s", containerPort)
// Add port option to rkt run arguments. rkt allows multiple port args
cmdArgs = append(cmdArgs, fmt.Sprintf("--port=%s:%s", containerPort, hostPortStr))
prepareArgs = append(prepareArgs, fmt.Sprintf("--port=%s:%s", containerPort, hostPortStr))
}

}
Expand All @@ -436,11 +506,11 @@ func (d *RktDriver) Start(ctx *ExecContext, task *structs.Task) (*StartResponse,

// Need to start arguments with "--"
if len(parsed) > 0 {
cmdArgs = append(cmdArgs, "--")
prepareArgs = append(prepareArgs, "--")
}

for _, arg := range parsed {
cmdArgs = append(cmdArgs, fmt.Sprintf("%v", arg))
prepareArgs = append(prepareArgs, fmt.Sprintf("%v", arg))
}
}

Expand All @@ -455,6 +525,24 @@ func (d *RktDriver) Start(ctx *ExecContext, task *structs.Task) (*StartResponse,
return nil, err
}

absPath, err := GetAbsolutePath(rktCmd)
if err != nil {
return nil, err
}

var outBuf, errBuf bytes.Buffer
cmd := exec.Command(rktCmd, prepareArgs...)
cmd.Stdout = &outBuf
cmd.Stderr = &errBuf
d.logger.Printf("[DEBUG] driver.rkt: preparing pod %q for task %q with: %v", img, d.taskName, prepareArgs)
if err := cmd.Run(); err != nil {
return nil, fmt.Errorf("Error preparing rkt pod: %s\n\nOutput: %s\n\nError: %s",
err, outBuf.String(), errBuf.String())
}
uuid := strings.TrimSpace(outBuf.String())
d.logger.Printf("[DEBUG] driver.rkt: pod %q for task %q prepared, UUID is: %s", img, d.taskName, uuid)
runArgs = append(runArgs, uuid)

// The task's environment is set via --set-env flags above, but the rkt
// command itself needs an evironment with PATH set to find iptables.
eb := env.NewEmptyBuilder()
Expand All @@ -473,14 +561,9 @@ func (d *RktDriver) Start(ctx *ExecContext, task *structs.Task) (*StartResponse,
return nil, fmt.Errorf("failed to set executor context: %v", err)
}

absPath, err := GetAbsolutePath(rktCmd)
if err != nil {
return nil, err
}

execCmd := &executor.ExecCommand{
Cmd: absPath,
Args: cmdArgs,
Args: runArgs,
User: task.User,
}
ps, err := execIntf.LaunchCmd(execCmd)
Expand All @@ -489,25 +572,7 @@ func (d *RktDriver) Start(ctx *ExecContext, task *structs.Task) (*StartResponse,
return nil, err
}

// Wait for UUID file to get written
uuid := ""
deadline := time.Now().Add(rktUuidDeadline)
var lastErr error
for time.Now().Before(deadline) {
if uuidBytes, err := ioutil.ReadFile(uuidPath); err != nil {
lastErr = err
} else {
uuid = string(uuidBytes)
break
}
time.Sleep(400 * time.Millisecond)
}
if uuid == "" {
d.logger.Printf("[WARN] driver.rkt: reading uuid from %q failed; unable to run script checks for task %q. Last error: %v",
uuidPath, d.taskName, lastErr)
}

d.logger.Printf("[DEBUG] driver.rkt: started ACI %q (UUID: %s) for task %q with: %v", img, uuid, d.taskName, cmdArgs)
d.logger.Printf("[DEBUG] driver.rkt: started ACI %q (UUID: %s) for task %q with: %v", img, uuid, d.taskName, runArgs)
maxKill := d.DriverContext.config.MaxKillTimeout
h := &rktHandle{
uuid: uuid,
Expand All @@ -523,8 +588,51 @@ func (d *RktDriver) Start(ctx *ExecContext, task *structs.Task) (*StartResponse,
waitCh: make(chan *dstructs.WaitResult, 1),
}
go h.run()
//TODO Set Network
return &StartResponse{Handle: h}, nil

d.logger.Printf("[DEBUG] driver.rkt: retrieving status for pod %q (UUID %s) for task %q with", img, uuid, d.taskName)
deadline := time.Now().Add(rktNetworkDeadline)
var driverNetwork *cstructs.DriverNetwork
var notFirst bool
for driverNetwork == nil && time.Now().Before(deadline) {
if notFirst {
time.Sleep(400 * time.Millisecond)
}
notFirst = true

status, err := rktGetStatus(uuid)
if err != nil {
continue
}
for _, net := range status.Networks {
if !net.IP.IsGlobalUnicast() {
continue
}

// Get the pod manifest so we can figure out which ports are exposed
var portmap map[string]int
d.logger.Printf("[DEBUG] driver.rkt: grabbing manifest for pod %q (UUID %s) for task %q", img, uuid, d.taskName)
manifest, err := rktGetManifest(uuid)
if err == nil {
portmap, err = rktManifestMakePortMap(manifest, driverConfig.PortMap)
if err != nil {
d.logger.Printf("[DEBUG] driver.rkt: could not create manifest-based portmap for %q (UUID %s) for task %q: %v", img, uuid, d.taskName, err)
}
} else {
d.logger.Printf("[WARN] driver.rkt: could get not pod manifest for %q (UUID %s) for task %q: %v", img, uuid, d.taskName, err)
}

driverNetwork = &cstructs.DriverNetwork{
PortMap: portmap,
IP: status.Networks[0].IP.String(),
}
break
}
}
if driverNetwork == nil {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In the Docker driver we return an error if driverConfig.PortMap is set but no networks could be found. I think the same behavior would be appropriate here.

https://github.com/hashicorp/nomad/blob/master/client/driver/docker.go#L1065-L1070

d.logger.Printf("[WARN] driver.rkt: network status retrieval for pod %q (UUID %s) for task %q failed", img, uuid, d.taskName)
}

return &StartResponse{Handle: h, Network: driverNetwork}, nil
}

func (d *RktDriver) Cleanup(*ExecContext, *CreatedResources) error { return nil }
Expand Down
Loading