Skip to content
This repository has been archived by the owner on May 12, 2021. It is now read-only.

Commit

Permalink
grpc: Handle hotplugged devices by updating spec info
Browse files Browse the repository at this point in the history
When adding a host device to the container by using things like
"docker run --device /dev/sda1:/dev/blk1", the runtime will not
be able to anticipate the major/minor numbers for the device
being hotplugged into the VM. This means the agent has to update
the list of devices provided through the spec, by relying on the
list of Storages.

This commit allows the agent to analyze a list of mounts identified
as devices by using the field Driver of the Storage structure. This
way, it knows when it should try to find major/minor pair related
to the devices listed in the spec. Then, it updates this list in the
spec structure so that proper bind mounts between the VM and the
container are performed.

Fixes #132

Signed-off-by: Sebastien Boeuf <[email protected]>
  • Loading branch information
Sebastien Boeuf committed Feb 9, 2018
1 parent ca67bc9 commit 15e72eb
Show file tree
Hide file tree
Showing 4 changed files with 172 additions and 79 deletions.
133 changes: 133 additions & 0 deletions device.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,133 @@
//
// Copyright (c) 2018 Intel Corporation
//
// SPDX-License-Identifier: Apache-2.0
//

package main

import (
"fmt"
"os"
"path/filepath"
"strings"
"syscall"
"time"

"github.com/kata-containers/agent/pkg/uevent"
pb "github.com/kata-containers/agent/protocols/grpc"
"github.com/sirupsen/logrus"
)

type deviceDriversHandler func(device pb.Storage, spec *pb.Spec) error

var deviceDriversHandlerList = map[string]deviceDriversHandler{
"blk": blockDeviceHandler,
}

func blockDeviceHandler(device pb.Storage, spec *pb.Spec) error {
if spec.Linux == nil || len(spec.Linux.Devices) == 0 {
return fmt.Errorf("No devices found from the spec, cannot update")
}

// First need to make sure the expected device shows up properly,
// and then we need to retrieve its device info (such as major and
// minor numbers), useful to update the device provided
// through the OCI specification.
if err := waitForDevice(device.Source); err != nil {
return err
}

stat := syscall.Stat_t{}
if err := syscall.Stat(device.Source, &stat); err != nil {
return err
}

major := int64(stat.Rdev / 256)
minor := int64(stat.Rdev % 256)

agentLog.Infof("Device: %s, Major: %d, Minor: %d", device.Source, major, minor)

// Update the spec
updated := false
for idx, d := range spec.Linux.Devices {
if d.Path == device.MountPoint {
spec.Linux.Devices[idx].Major = major
spec.Linux.Devices[idx].Minor = minor
updated = true
break
}
}

if !updated {
return fmt.Errorf("Should have found a matching device in the spec")
}

return nil
}

func waitForDevice(devicePath string) error {
deviceName := strings.TrimPrefix(devicePath, devPrefix)

if _, err := os.Stat(devicePath); err == nil {
return nil
}

uEvHandler, err := uevent.NewHandler()
if err != nil {
return err
}
defer uEvHandler.Close()

fieldLogger := agentLog.WithField("device", deviceName)

// Check if the device already exists.
if _, err := os.Stat(devicePath); err == nil {
fieldLogger.Info("Device already hotplugged, quit listening")
return nil
}

fieldLogger.Info("Started listening for uevents for device hotplug")

// Channel to signal when desired uevent has been received.
done := make(chan bool)

go func() {
// This loop will be either ended if the hotplugged device is
// found by listening to the netlink socket, or it will end
// after the function returns and the uevent handler is closed.
for {
uEv, err := uEvHandler.Read()
if err != nil {
fieldLogger.Error(err)
continue
}

fieldLogger = fieldLogger.WithFields(logrus.Fields{
"uevent-action": uEv.Action,
"uevent-devpath": uEv.DevPath,
"uevent-subsystem": uEv.SubSystem,
"uevent-seqnum": uEv.SeqNum,
})

fieldLogger.Info("Got uevent")

if uEv.Action == "add" &&
filepath.Base(uEv.DevPath) == deviceName {
fieldLogger.Info("Hotplug event received")
break
}
}

close(done)
}()

select {
case <-done:
case <-time.After(time.Duration(timeoutHotplug) * time.Second):
return fmt.Errorf("Timeout reached after %ds waiting for device %s",
timeoutHotplug, deviceName)
}

return nil
}
33 changes: 33 additions & 0 deletions grpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -320,6 +320,35 @@ func (a *agentGRPC) updateContainerConfig(spec *specs.Spec, config *configs.Conf
return a.updateContainerConfigPrivileges(spec, config)
}

func updateGRPCSpecDevices(mounts []*pb.Storage, spec *pb.Spec) error {
if spec.Linux == nil || len(spec.Linux.Devices) == 0 {
// No reason to go further in this case since we won't be
// able to modify any device from the spec.
return nil
}

for _, mnt := range mounts {
if mnt == nil {
continue
}

if mnt.Driver == "" {
continue
}

devHandler, ok := deviceDriversHandlerList[mnt.Driver]
if !ok {
return fmt.Errorf("Unknown device driver %q", mnt.Driver)
}

if err := devHandler(*mnt, spec); err != nil {
return err
}
}

return nil
}

func (a *agentGRPC) CreateContainer(ctx context.Context, req *pb.CreateContainerRequest) (*gpb.Empty, error) {
if a.sandbox.running == false {
return emptyResp, fmt.Errorf("Sandbox not started, impossible to run a new container")
Expand All @@ -340,6 +369,10 @@ func (a *agentGRPC) CreateContainer(ctx context.Context, req *pb.CreateContainer
return emptyResp, err
}

if err := updateGRPCSpecDevices(req.Storages, req.OCI); err != nil {
return emptyResp, err
}

// Convert the spec to an actual OCI specification structure.
ociSpec, err := pb.GRPCtoOCI(req.OCI)
if err != nil {
Expand Down
83 changes: 5 additions & 78 deletions mount.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,8 @@ import (
"path/filepath"
"strings"
"syscall"
"time"

"github.com/kata-containers/agent/pkg/uevent"
pb "github.com/kata-containers/agent/protocols/grpc"
"github.com/sirupsen/logrus"
"golang.org/x/sys/unix"
)

Expand Down Expand Up @@ -126,72 +123,6 @@ func ensureDestinationExists(source, destination string, fsType string) error {
return nil
}

func waitForDevice(devicePath string) error {
deviceName := strings.TrimPrefix(devicePath, devPrefix)

if _, err := os.Stat(devicePath); err == nil {
return nil
}

uEvHandler, err := uevent.NewHandler()
if err != nil {
return err
}
defer uEvHandler.Close()

fieldLogger := agentLog.WithField("device", deviceName)

// Check if the device already exists.
if _, err := os.Stat(devicePath); err == nil {
fieldLogger.Info("Device already hotplugged, quit listening")
return nil
}

fieldLogger.Info("Started listening for uevents for device hotplug")

// Channel to signal when desired uevent has been received.
done := make(chan bool)

go func() {
// This loop will be either ended if the hotplugged device is
// found by listening to the netlink socket, or it will end
// after the function returns and the uevent handler is closed.
for {
uEv, err := uEvHandler.Read()
if err != nil {
fieldLogger.Error(err)
continue
}

fieldLogger = fieldLogger.WithFields(logrus.Fields{
"uevent-action": uEv.Action,
"uevent-devpath": uEv.DevPath,
"uevent-subsystem": uEv.SubSystem,
"uevent-seqnum": uEv.SeqNum,
})

fieldLogger.Info("Got uevent")

if uEv.Action == "add" &&
filepath.Base(uEv.DevPath) == deviceName {
fieldLogger.Info("Hotplug event received")
break
}
}

close(done)
}()

select {
case <-done:
case <-time.After(time.Duration(timeoutHotplug) * time.Second):
return fmt.Errorf("Timeout reached after %ds waiting for device %s",
timeoutHotplug, deviceName)
}

return nil
}

func parseMountFlagsAndOptions(optionList []string) (int, string, error) {
var (
flags int
Expand Down Expand Up @@ -219,15 +150,11 @@ func addMounts(mounts []*pb.Storage) ([]string, error) {
continue
}

// Consider all other fs types as being hotpluggable, meaning
// we should wait for them to show up before trying to mount
// them.
if mnt.Fstype != "" &&
mnt.Fstype != "bind" &&
mnt.Fstype != type9pFs {
if err := waitForDevice(mnt.Source); err != nil {
return nil, err
}
// The field Driver is used to differentiate a device from
// a mount. Nothing needs to be done for a device in this
// function.
if mnt.Driver != "" {
continue
}

flags, options, err := parseMountFlagsAndOptions(mnt.Options)
Expand Down
2 changes: 1 addition & 1 deletion protocols/grpc/agent.proto
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,7 @@ message OnlineCPUMemRequest {
}

message Storage {
string driver = 1; // empty in most cases. it will support "drbd", "bcache" ...
string driver = 1; // Empty driver field means that we expect to do a mount. If driver field is not empty, we expect to do an operation related to a device. The type of device should be given through this field, triggering a specific function from the agent code.
string source = 2; // "/dev/sdb", "/dev/disk/by-scsi/xxxx", "none",...
string fstype = 3; // "xfs", "ext4" etc. for block dev, or "9p" for shared fs, or "tmpfs" for shared /dev/shm for all containers ...
repeated string options = 4; // options for the storage device, might required by some special devices.
Expand Down

0 comments on commit 15e72eb

Please sign in to comment.