From 9f97a8f70e8a728c5e856147bc519b9ab83e6417 Mon Sep 17 00:00:00 2001 From: Sebastien Boeuf Date: Fri, 2 Feb 2018 15:21:00 -0800 Subject: [PATCH] grpc: Handle hotplugged devices by updating spec info When adding a host device to the container by using things like "docker run --device /dev/sda1:/dev/blk1", the runtime will not be able to anticipate the major/minor numbers for the device being hotplugged into the VM. This means the agent has to update the list of devices provided through the spec, by relying on the list of Storages. This commit allows the agent to analyze a list of mounts identified as devices by using the field Driver of the Storage structure. This way, it knows when it should try to find major/minor pair related to the devices listed in the spec. Then, it updates this list in the spec structure so that proper bind mounts between the VM and the container are performed. Fixes #132 Signed-off-by: Sebastien Boeuf --- device.go | 144 +++++++++++++++++++++++++++++++++++++ device_test.go | 33 +++++++++ grpc.go | 33 +++++++++ grpc_test.go | 134 ++++++++++++++++++++++++++++++++++ mount.go | 83 ++------------------- protocols/grpc/agent.proto | 2 +- 6 files changed, 350 insertions(+), 79 deletions(-) create mode 100644 device.go create mode 100644 device_test.go diff --git a/device.go b/device.go new file mode 100644 index 0000000000..1b2aac0b49 --- /dev/null +++ b/device.go @@ -0,0 +1,144 @@ +// +// Copyright (c) 2018 Intel Corporation +// +// SPDX-License-Identifier: Apache-2.0 +// + +package main + +import ( + "fmt" + "os" + "path/filepath" + "strings" + "syscall" + "time" + + "github.com/kata-containers/agent/pkg/uevent" + pb "github.com/kata-containers/agent/protocols/grpc" + "github.com/sirupsen/logrus" +) + +type deviceDriversHandler func(device pb.Storage, spec *pb.Spec) error + +var deviceDriversHandlerList = map[string]deviceDriversHandler{ + "blk": blockDeviceHandler, +} + +func blockDeviceHandler(device pb.Storage, spec *pb.Spec) error { + if spec.Linux == nil || len(spec.Linux.Devices) == 0 { + return fmt.Errorf("No devices found from the spec, cannot update") + } + + // First need to make sure the expected device shows up properly, + // and then we need to retrieve its device info (such as major and + // minor numbers), useful to update the device provided + // through the OCI specification. + if err := waitForDevice(device.Source); err != nil { + return err + } + + stat := syscall.Stat_t{} + if err := syscall.Stat(device.Source, &stat); err != nil { + return err + } + + major := int64(stat.Rdev / 256) + minor := int64(stat.Rdev % 256) + + agentLog.WithFields(logrus.Fields{ + "device-path": device.Source, + "device-major": major, + "device-minor": minor, + }).Info("handling block device") + + // Update the spec + updated := false + for idx, d := range spec.Linux.Devices { + if d.Path == device.MountPoint { + agentLog.WithFields(logrus.Fields{ + "device-path": device.Source, + "host-device-major": spec.Linux.Devices[idx].Major, + "host-device-minor": spec.Linux.Devices[idx].Minor, + "guest-device-major": major, + "guest-device-minor": minor, + }).Info("updating block device major/minor into the spec") + spec.Linux.Devices[idx].Major = major + spec.Linux.Devices[idx].Minor = minor + updated = true + break + } + } + + if !updated { + return fmt.Errorf("Should have found a matching device %s in the spec", device.Source) + } + + return nil +} + +func waitForDevice(devicePath string) error { + deviceName := strings.TrimPrefix(devicePath, devPrefix) + + if _, err := os.Stat(devicePath); err == nil { + return nil + } + + uEvHandler, err := uevent.NewHandler() + if err != nil { + return err + } + defer uEvHandler.Close() + + fieldLogger := agentLog.WithField("device", deviceName) + + // Check if the device already exists. + if _, err := os.Stat(devicePath); err == nil { + fieldLogger.Info("Device already hotplugged, quit listening") + return nil + } + + fieldLogger.Info("Started listening for uevents for device hotplug") + + // Channel to signal when desired uevent has been received. + done := make(chan bool) + + go func() { + // This loop will be either ended if the hotplugged device is + // found by listening to the netlink socket, or it will end + // after the function returns and the uevent handler is closed. + for { + uEv, err := uEvHandler.Read() + if err != nil { + fieldLogger.Error(err) + continue + } + + fieldLogger = fieldLogger.WithFields(logrus.Fields{ + "uevent-action": uEv.Action, + "uevent-devpath": uEv.DevPath, + "uevent-subsystem": uEv.SubSystem, + "uevent-seqnum": uEv.SeqNum, + }) + + fieldLogger.Info("Got uevent") + + if uEv.Action == "add" && + filepath.Base(uEv.DevPath) == deviceName { + fieldLogger.Info("Hotplug event received") + break + } + } + + close(done) + }() + + select { + case <-done: + case <-time.After(time.Duration(timeoutHotplug) * time.Second): + return fmt.Errorf("Timeout reached after %ds waiting for device %s", + timeoutHotplug, deviceName) + } + + return nil +} diff --git a/device_test.go b/device_test.go new file mode 100644 index 0000000000..51808ebbea --- /dev/null +++ b/device_test.go @@ -0,0 +1,33 @@ +// +// Copyright (c) 2017 Intel Corporation +// +// SPDX-License-Identifier: Apache-2.0 +// + +package main + +import ( + "testing" + + pb "github.com/kata-containers/agent/protocols/grpc" + "github.com/stretchr/testify/assert" +) + +func testblockDeviceHandlerFailure(t *testing.T, device pb.Storage, spec *pb.Spec) { + err := blockDeviceHandler(device, spec) + assert.NotNil(t, err, "blockDeviceHandler() should have failed") +} + +func TestBlockDeviceHandlerNilLinuxSpecSuccessful(t *testing.T) { + spec := &pb.Spec{} + + testblockDeviceHandlerFailure(t, pb.Storage{}, spec) +} + +func TestBlockDeviceHandlerEmptyLinuxDevicesSpecSuccessful(t *testing.T) { + spec := &pb.Spec{ + Linux: &pb.Linux{}, + } + + testblockDeviceHandlerFailure(t, pb.Storage{}, spec) +} diff --git a/grpc.go b/grpc.go index 3d192b8f80..97513ef8b5 100644 --- a/grpc.go +++ b/grpc.go @@ -320,6 +320,35 @@ func (a *agentGRPC) updateContainerConfig(spec *specs.Spec, config *configs.Conf return a.updateContainerConfigPrivileges(spec, config) } +func updateGRPCSpecDevices(mounts []*pb.Storage, spec *pb.Spec) error { + if spec.Linux == nil || len(spec.Linux.Devices) == 0 { + // No reason to go further in this case since we won't be + // able to modify any device from the spec. + return nil + } + + for _, mnt := range mounts { + if mnt == nil { + continue + } + + if mnt.Driver == "" { + continue + } + + devHandler, ok := deviceDriversHandlerList[mnt.Driver] + if !ok { + return fmt.Errorf("Unknown device driver %q", mnt.Driver) + } + + if err := devHandler(*mnt, spec); err != nil { + return err + } + } + + return nil +} + func (a *agentGRPC) CreateContainer(ctx context.Context, req *pb.CreateContainerRequest) (*gpb.Empty, error) { if a.sandbox.running == false { return emptyResp, fmt.Errorf("Sandbox not started, impossible to run a new container") @@ -340,6 +369,10 @@ func (a *agentGRPC) CreateContainer(ctx context.Context, req *pb.CreateContainer return emptyResp, err } + if err := updateGRPCSpecDevices(req.Storages, req.OCI); err != nil { + return emptyResp, err + } + // Convert the spec to an actual OCI specification structure. ociSpec, err := pb.GRPCtoOCI(req.OCI) if err != nil { diff --git a/grpc_test.go b/grpc_test.go index d63170ba17..d2c23b4b63 100644 --- a/grpc_test.go +++ b/grpc_test.go @@ -7,9 +7,11 @@ package main import ( + "fmt" "reflect" "testing" + pb "github.com/kata-containers/agent/protocols/grpc" "github.com/opencontainers/runc/libcontainer/configs" "github.com/opencontainers/runtime-spec/specs-go" "github.com/stretchr/testify/assert" @@ -102,3 +104,135 @@ func TestUpdateContainerConfigPrivilegesNoNewPrivileges(t *testing.T) { testUpdateContainerConfigPrivileges(t, spec, config, expectedConfig) } } + +func testUpdateGRPCSpecDevicesSuccessful(t *testing.T, mounts []*pb.Storage, spec *pb.Spec) { + err := updateGRPCSpecDevices(mounts, spec) + assert.Nil(t, err, "updateGRPCSpecDevices() failed: %v", err) +} + +func TestUpdateGRPCSpecDevicesNilLinuxSpecSuccessful(t *testing.T) { + var mounts []*pb.Storage + spec := &pb.Spec{} + + testUpdateGRPCSpecDevicesSuccessful(t, mounts, spec) +} + +func TestUpdateGRPCSpecDevicesEmptyLinuxDevicesSpecSuccessful(t *testing.T) { + var mounts []*pb.Storage + spec := &pb.Spec{ + Linux: &pb.Linux{}, + } + + testUpdateGRPCSpecDevicesSuccessful(t, mounts, spec) +} + +func TestUpdateGRPCSpecDevicesEmptyMountsSuccessful(t *testing.T) { + var mounts []*pb.Storage + spec := &pb.Spec{ + Linux: &pb.Linux{ + Devices: []pb.LinuxDevice{ + {}, + }, + }, + } + + testUpdateGRPCSpecDevicesSuccessful(t, mounts, spec) +} + +func TestUpdateGRPCSpecDevicesNilMountsSuccessful(t *testing.T) { + mounts := []*pb.Storage{ + nil, + } + + spec := &pb.Spec{ + Linux: &pb.Linux{ + Devices: []pb.LinuxDevice{ + {}, + }, + }, + } + + testUpdateGRPCSpecDevicesSuccessful(t, mounts, spec) +} + +func noopDeviceHandlerReturnNil(device pb.Storage, spec *pb.Spec) error { + return nil +} + +func noopDeviceHandlerReturnError(device pb.Storage, spec *pb.Spec) error { + return fmt.Errorf("Noop handler failure") +} + +func TestUpdateGRPCSpecDevicesNoopHandlerSuccessful(t *testing.T) { + noopHandlerTag := "noop" + deviceDriversHandlerList = map[string]deviceDriversHandler{ + noopHandlerTag: noopDeviceHandlerReturnNil, + } + + mounts := []*pb.Storage{ + { + Driver: noopHandlerTag, + }, + { + Driver: "", + }, + } + + spec := &pb.Spec{ + Linux: &pb.Linux{ + Devices: []pb.LinuxDevice{ + {}, + }, + }, + } + + testUpdateGRPCSpecDevicesSuccessful(t, mounts, spec) +} + +func testUpdateGRPCSpecDevicesFailure(t *testing.T, mounts []*pb.Storage, spec *pb.Spec) { + err := updateGRPCSpecDevices(mounts, spec) + assert.NotNil(t, err, "updateGRPCSpecDevices() should have failed") +} + +func TestUpdateGRPCSpecDevicesUnknownHandlerFailure(t *testing.T) { + deviceDriversHandlerList = map[string]deviceDriversHandler{} + + mounts := []*pb.Storage{ + { + Driver: "unknown", + }, + } + + spec := &pb.Spec{ + Linux: &pb.Linux{ + Devices: []pb.LinuxDevice{ + {}, + }, + }, + } + + testUpdateGRPCSpecDevicesFailure(t, mounts, spec) +} + +func TestUpdateGRPCSpecDevicesNoopHandlerFailure(t *testing.T) { + noopHandlerTag := "noop" + deviceDriversHandlerList = map[string]deviceDriversHandler{ + noopHandlerTag: noopDeviceHandlerReturnError, + } + + mounts := []*pb.Storage{ + { + Driver: noopHandlerTag, + }, + } + + spec := &pb.Spec{ + Linux: &pb.Linux{ + Devices: []pb.LinuxDevice{ + {}, + }, + }, + } + + testUpdateGRPCSpecDevicesFailure(t, mounts, spec) +} diff --git a/mount.go b/mount.go index 06a59eaaa1..696120908e 100644 --- a/mount.go +++ b/mount.go @@ -12,11 +12,8 @@ import ( "path/filepath" "strings" "syscall" - "time" - "github.com/kata-containers/agent/pkg/uevent" pb "github.com/kata-containers/agent/protocols/grpc" - "github.com/sirupsen/logrus" "golang.org/x/sys/unix" ) @@ -126,72 +123,6 @@ func ensureDestinationExists(source, destination string, fsType string) error { return nil } -func waitForDevice(devicePath string) error { - deviceName := strings.TrimPrefix(devicePath, devPrefix) - - if _, err := os.Stat(devicePath); err == nil { - return nil - } - - uEvHandler, err := uevent.NewHandler() - if err != nil { - return err - } - defer uEvHandler.Close() - - fieldLogger := agentLog.WithField("device", deviceName) - - // Check if the device already exists. - if _, err := os.Stat(devicePath); err == nil { - fieldLogger.Info("Device already hotplugged, quit listening") - return nil - } - - fieldLogger.Info("Started listening for uevents for device hotplug") - - // Channel to signal when desired uevent has been received. - done := make(chan bool) - - go func() { - // This loop will be either ended if the hotplugged device is - // found by listening to the netlink socket, or it will end - // after the function returns and the uevent handler is closed. - for { - uEv, err := uEvHandler.Read() - if err != nil { - fieldLogger.Error(err) - continue - } - - fieldLogger = fieldLogger.WithFields(logrus.Fields{ - "uevent-action": uEv.Action, - "uevent-devpath": uEv.DevPath, - "uevent-subsystem": uEv.SubSystem, - "uevent-seqnum": uEv.SeqNum, - }) - - fieldLogger.Info("Got uevent") - - if uEv.Action == "add" && - filepath.Base(uEv.DevPath) == deviceName { - fieldLogger.Info("Hotplug event received") - break - } - } - - close(done) - }() - - select { - case <-done: - case <-time.After(time.Duration(timeoutHotplug) * time.Second): - return fmt.Errorf("Timeout reached after %ds waiting for device %s", - timeoutHotplug, deviceName) - } - - return nil -} - func parseMountFlagsAndOptions(optionList []string) (int, string, error) { var ( flags int @@ -219,15 +150,11 @@ func addMounts(mounts []*pb.Storage) ([]string, error) { continue } - // Consider all other fs types as being hotpluggable, meaning - // we should wait for them to show up before trying to mount - // them. - if mnt.Fstype != "" && - mnt.Fstype != "bind" && - mnt.Fstype != type9pFs { - if err := waitForDevice(mnt.Source); err != nil { - return nil, err - } + // The field Driver is used to differentiate a device from + // a mount. Nothing needs to be done for a device in this + // function. + if mnt.Driver != "" { + continue } flags, options, err := parseMountFlagsAndOptions(mnt.Options) diff --git a/protocols/grpc/agent.proto b/protocols/grpc/agent.proto index 2b85ba7398..9aa2eb68bf 100644 --- a/protocols/grpc/agent.proto +++ b/protocols/grpc/agent.proto @@ -193,7 +193,7 @@ message OnlineCPUMemRequest { } message Storage { - string driver = 1; // empty in most cases. it will support "drbd", "bcache" ... + string driver = 1; // If driver field is not empty, we expect to do an operation related to a device. The type of device should be given through this field, triggering a specific function from the agent code. string source = 2; // "/dev/sdb", "/dev/disk/by-scsi/xxxx", "none",... string fstype = 3; // "xfs", "ext4" etc. for block dev, or "9p" for shared fs, or "tmpfs" for shared /dev/shm for all containers ... repeated string options = 4; // options for the storage device, might required by some special devices.