diff --git a/drivers/storage/s3fs/executor/s3fs_executor.go b/drivers/storage/s3fs/executor/s3fs_executor.go new file mode 100644 index 00000000..49381086 --- /dev/null +++ b/drivers/storage/s3fs/executor/s3fs_executor.go @@ -0,0 +1,224 @@ +// +build !libstorage_storage_executor libstorage_storage_executor_s3fs + +package executor + +import ( + "bufio" + "fmt" + "io" + "os" + "strings" + + gofig "github.com/akutz/gofig/types" + "github.com/akutz/goof" + + "github.com/codedellemc/libstorage/api/registry" + "github.com/codedellemc/libstorage/api/types" + + "github.com/codedellemc/libstorage/drivers/storage/s3fs" + "github.com/codedellemc/libstorage/drivers/storage/s3fs/utils" +) + +const ( + // Template for parsing mount info file (/proc/self/mountinfo) + mountinfoFormat = "%d %d %d:%d %s %s %s %s" +) + +// driver is the storage executor for the s3fs storage driver. +type driver struct { + name string + config gofig.Config + credFile string +} + +func init() { + registry.RegisterStorageExecutor(s3fs.Name, newDriver) +} + +func newDriver() types.StorageExecutor { + return &driver{name: s3fs.Name} +} + +func (d *driver) Init(ctx types.Context, config gofig.Config) error { + ctx.Info("s3fs_executor: Init") + d.config = config + if d.credFile = d.getCredFilePath(); d.credFile == "" { + return goof.New(fmt.Sprintf( + "%s mount driver requires %s option", + d.name, s3fs.ConfigS3FSCredFilePathKey)) + } + return nil +} + +func (d *driver) Name() string { + return d.name +} + +// Supported returns a flag indicating whether or not the platform +// implementing the executor is valid for the host on which the executor +// resides. 
+func (d *driver) Supported( + ctx types.Context, + opts types.Store) (types.LSXSupportedOp, error) { + + supportedOp := types.LSXSOpNone + var supp bool + var err error + if supp, err = utils.Supported(ctx); err != nil { + return supportedOp, err + } + if supp { + supportedOp = types.LSXSOpInstanceID | + types.LSXSOpLocalDevices | + types.LSXSOpMount + } + return supportedOp, nil +} + +// InstanceID +func (d *driver) InstanceID( + ctx types.Context, + opts types.Store) (*types.InstanceID, error) { + return utils.InstanceID(ctx) +} + +// NextDevice returns the next available device. +func (d *driver) NextDevice( + ctx types.Context, + opts types.Store) (string, error) { + return "", types.ErrNotImplemented +} + +// Return list of local devices +func (d *driver) LocalDevices( + ctx types.Context, + opts *types.LocalDevicesOpts) (*types.LocalDevices, error) { + + mtt, err := parseMountTable(ctx) + if err != nil { + return nil, err + } + + idmnt := make(map[string]string) + for _, mt := range mtt { + idmnt[mt.Source] = mt.MountPoint + } + + return &types.LocalDevices{ + Driver: d.name, + DeviceMap: idmnt, + }, nil +} + +// Mount mounts a device to a specified path. 
+func (d *driver) Mount( + ctx types.Context, + deviceName, mountPoint string, + opts *types.DeviceMountOpts) error { + + if !utils.IsS3FSURI(deviceName) { + return goof.WithField( + "device name", deviceName, + "Unsupported device name format") + } + bucket := utils.BucketFromURI(deviceName) + if mp, ok := utils.FindMountPoint(ctx, bucket); ok { + ctx.Debugf("DBG: bucket '%s' is already mounted to '%s'", + bucket, mp) + if mp == mountPoint { + // bucket is mounted to the required target => ok + return nil + } + // bucket is mounted to another target => error + return goof.WithFields(goof.Fields{ + "bucket": bucket, + "mount point": mp, + }, "bucket is already mounted") + } + return utils.Mount(ctx, d.credFile, bucket, mountPoint, opts) +} + +// Unmount unmounts the underlying device from the specified path. +func (d *driver) Unmount( + ctx types.Context, + mountPoint string, + opts types.Store) error { + + return types.ErrNotImplemented +} + +func (d *driver) getCredFilePath() string { + return d.config.GetString(s3fs.ConfigS3FSCredFilePathKey) +} + +func parseMountTable(ctx types.Context) ([]*types.MountInfo, error) { + f, err := os.Open("/proc/self/mountinfo") + if err != nil { + return nil, err + } + defer f.Close() + + return parseInfoFile(ctx, f) +} + +func parseInfoFile( + ctx types.Context, + r io.Reader) ([]*types.MountInfo, error) { + + var ( + s = bufio.NewScanner(r) + out = []*types.MountInfo{} + ) + + for s.Scan() { + if err := s.Err(); err != nil { + return nil, err + } + + var ( + p = &types.MountInfo{} + text = s.Text() + optionalFields string + ) + + if _, err := fmt.Sscanf(text, mountinfoFormat, + &p.ID, &p.Parent, &p.Major, &p.Minor, + &p.Root, &p.MountPoint, &p.Opts, + &optionalFields); err != nil { + + return nil, fmt.Errorf("Scanning '%s' failed: %s", + text, err) + } + // Safe as mountinfo encodes mountpoints with spaces as \040. 
+ index := strings.Index(text, " - ") + postSeparatorFields := strings.Fields(text[index+3:]) + if len(postSeparatorFields) < 3 { + return nil, fmt.Errorf( + "Error found less than 3 fields post '-' in %q", + text) + } + + if optionalFields != "-" { + p.Optional = optionalFields + } + + p.FSType = postSeparatorFields[0] + p.Source = postSeparatorFields[1] + // s3fs doesnt provide mounted bucket, source is just 's3fs' + // it is workaround - find bucket by mount point + if strings.EqualFold(p.Source, s3fs.CmdName) { + patchMountInfo(ctx, p) + } + p.VFSOpts = strings.Join(postSeparatorFields[2:], " ") + out = append(out, p) + } + return out, nil +} + +func patchMountInfo(ctx types.Context, m *types.MountInfo) { + if m != nil && m.MountPoint != "" { + if bucket, ok := utils.FindBucket(ctx, m.MountPoint); ok { + m.Source = utils.BucketURI(bucket) + } + } +} diff --git a/drivers/storage/s3fs/s3.go b/drivers/storage/s3fs/s3.go new file mode 100644 index 00000000..6b33c837 --- /dev/null +++ b/drivers/storage/s3fs/s3.go @@ -0,0 +1,56 @@ +// +build !libstorage_storage_driver libstorage_storage_driver_s3fs + +package s3fs + +import ( + gofigCore "github.com/akutz/gofig" + gofig "github.com/akutz/gofig/types" +) + +const ( + // Name is the provider's name. + Name = "s3fs" + + // CmdName of the s3fs cmd utility + CmdName = "s3fs" + + // TagDelimiter separates tags from volume or snapshot names + TagDelimiter = "/" + + // BucketsKey is a name of config parameter with buckets list + BucketsKey = "buckets" + + // CredFilePathKey is a name of config parameter with cred file path + CredFilePathKey = "cred_file" + + // TagKey is a tag key + TagKey = "tag" +) + +const ( + // ConfigS3FS is a config key + ConfigS3FS = Name + + // ConfigS3FSBucketsKey is a key for available buckets list + ConfigS3FSBucketsKey = ConfigS3FS + "." + BucketsKey + + // ConfigS3FSCredFilePathKey is a key for cred file path + ConfigS3FSCredFilePathKey = ConfigS3FS + "." 
+ CredFilePathKey + + // ConfigS3FSTagKey is a config key + ConfigS3FSTagKey = ConfigS3FS + "." + TagKey +) + +func init() { + r := gofigCore.NewRegistration("S3FS") + r.Key(gofig.String, "", "", + "List of buckets available as file systems", + ConfigS3FSBucketsKey) + r.Key(gofig.String, "", "", + "File path with S3 credentials in format ID:KEY", + ConfigS3FSCredFilePathKey) + r.Key(gofig.String, "", "", + "Tag prefix for S3FS naming", + ConfigS3FSTagKey) + gofigCore.Register(r) +} diff --git a/drivers/storage/s3fs/storage/s3fs_storage.go b/drivers/storage/s3fs/storage/s3fs_storage.go new file mode 100644 index 00000000..8caf3cd2 --- /dev/null +++ b/drivers/storage/s3fs/storage/s3fs_storage.go @@ -0,0 +1,384 @@ +// +build !libstorage_storage_driver libstorage_storage_driver_s3fs + +package storage + +import ( + "fmt" + "strings" + + gofig "github.com/akutz/gofig/types" + "github.com/akutz/goof" + + "github.com/codedellemc/libstorage/api/context" + "github.com/codedellemc/libstorage/api/registry" + "github.com/codedellemc/libstorage/api/types" + + "github.com/codedellemc/libstorage/drivers/storage/s3fs" + "github.com/codedellemc/libstorage/drivers/storage/s3fs/utils" +) + +type driver struct { + name string + config gofig.Config + buckets []string +} + +func init() { + registry.RegisterStorageDriver(s3fs.Name, newDriver) +} + +func newDriver() types.StorageDriver { + return &driver{name: s3fs.Name} +} + +func (d *driver) Name() string { + return d.name +} + +// Init initializes the driver. 
+func (d *driver) Init(context types.Context, config gofig.Config) error { + d.config = config + + // TODO: add options + d.buckets = d.getBuckets() + + context.Info(fmt.Sprintf( + "s3fs storage driver initialized: %s", d.buckets)) + return nil +} + +// NextDeviceInfo returns the information about the driver's next available +func (d *driver) NextDeviceInfo( + ctx types.Context) (*types.NextDeviceInfo, error) { + return nil, nil +} + +// Type returns the type of storage the driver provides. +func (d *driver) Type(ctx types.Context) (types.StorageType, error) { + return types.Object, nil + // return types.NAS, nil +} + +// InstanceInspect returns an instance. +func (d *driver) InstanceInspect( + ctx types.Context, + opts types.Store) (*types.Instance, error) { + + iid := context.MustInstanceID(ctx) + return &types.Instance{ + Name: iid.ID, + // Region: iid.Fields[s3fs.InstanceIDFieldRegion], + InstanceID: iid, + ProviderName: iid.Driver, + }, nil +} + +// Volumes returns all volumes or a filtered list of volumes. +func (d *driver) Volumes( + ctx types.Context, + opts *types.VolumesOpts) ([]*types.Volume, error) { + + // Convert retrieved volumes to libStorage types.Volume + vols, convErr := d.toTypesVolume(ctx, &d.buckets, opts.Attachments) + if convErr != nil { + return nil, goof.WithError( + "error converting to types.Volume", convErr) + } + + ctx.Debugf("DBG: volumes: %s", vols) + + return vols, nil +} + +// VolumeInspect inspects a single volume. +func (d *driver) VolumeInspect( + ctx types.Context, + volumeID string, + opts *types.VolumeInspectOpts) (*types.Volume, error) { + + return d.getVolume(ctx, volumeID, opts.Attachments) +} + +// VolumeCreate creates a new volume. 
+func (d *driver) VolumeCreate(ctx types.Context, volumeName string, + opts *types.VolumeCreateOpts) (*types.Volume, error) { + + if opts.Encrypted != nil && *opts.Encrypted { + return nil, types.ErrNotImplemented + } + + // TODO: bucket creation is not supported by s3fs, + // possible options to implement if needed (probably both could be + // implemented and behaviour could be switched via config): + // - implement bucket creation via S3 REST API + // - use 'backet:/path' semantic, where bucket should exist + // and '/path' is a volume description object, in that case + // all volumes would be in one bucket. + // WARN: This options doesn't work in s3fs on my environemnt!!! + // For now it just returns volume object if bucket is in available + // list from config + volume, err := d.getVolume(ctx, volumeName, types.VolAttNone) + if err != nil { + return nil, goof.WithError( + "Volume is not in the list of allowed ones in config", + err) + } + return volume, nil +} + +// VolumeCreateFromSnapshot creates a new volume from an existing snapshot. +func (d *driver) VolumeCreateFromSnapshot( + ctx types.Context, + snapshotID, volumeName string, + opts *types.VolumeCreateOpts) (*types.Volume, error) { + // TODO Snapshots are not implemented yet + return nil, types.ErrNotImplemented +} + +// VolumeCopy copies an existing volume. +func (d *driver) VolumeCopy( + ctx types.Context, + volumeID, volumeName string, + opts types.Store) (*types.Volume, error) { + // TODO Snapshots are not implemented yet + return nil, types.ErrNotImplemented +} + +// VolumeSnapshot snapshots a volume. +func (d *driver) VolumeSnapshot( + ctx types.Context, + volumeID, snapshotName string, + opts types.Store) (*types.Snapshot, error) { + // TODO Snapshots are not implemented yet + return nil, types.ErrNotImplemented +} + +// VolumeRemove removes a volume. 
+func (d *driver) VolumeRemove( + ctx types.Context, + volumeID string, + opts types.Store) error { + + fields := map[string]interface{}{ + "provider": d.Name(), + "volumeID": volumeID, + } + _, err := d.getVolume(ctx, volumeID, types.VolAttNone) + if err != nil { + return goof.WithFields(fields, "volume does not exist") + } + + // TODO: see comment in VolumeCreate, + // For now there is nothing to do + return nil +} + +// VolumeAttach attaches a volume and provides a token clients can use +// to validate that device has appeared locally. +func (d *driver) VolumeAttach( + ctx types.Context, + volumeID string, + opts *types.VolumeAttachOpts) (*types.Volume, string, error) { + + fields := map[string]interface{}{ + "provider": d.Name(), + "volumeID": volumeID, + } + + volume, err := d.getVolume(ctx, volumeID, + types.VolumeAttachmentsRequested) + if err != nil { + return nil, "", goof.WithFieldsE(fields, + "failed to get volume for attach", err) + } + + // Nothing to do for attach + return volume, "", nil +} + +// VolumeDetach detaches a volume. +func (d *driver) VolumeDetach( + ctx types.Context, + volumeID string, + opts *types.VolumeDetachOpts) (*types.Volume, error) { + + fields := map[string]interface{}{ + "provider": d.Name(), + "volumeID": volumeID, + } + + volume, err := d.getVolume(ctx, volumeID, + types.VolumeAttachmentsRequested) + if err != nil { + return nil, goof.WithFieldsE( + fields, "failed to get volume", err) + } + + // Nothing to do for detach + return volume, nil +} + +// Snapshots returns all volumes or a filtered list of snapshots. +func (d *driver) Snapshots( + ctx types.Context, + opts types.Store) ([]*types.Snapshot, error) { + // TODO Snapshots are not implemented yet + return nil, types.ErrNotImplemented +} + +// SnapshotInspect inspects a single snapshot. 
+func (d *driver) SnapshotInspect( + ctx types.Context, + snapshotID string, + opts types.Store) (*types.Snapshot, error) { + // TODO Snapshots are not implemented yet + return nil, types.ErrNotImplemented +} + +// SnapshotCopy copies an existing snapshot. +func (d *driver) SnapshotCopy( + ctx types.Context, + snapshotID, snapshotName, destinationID string, + opts types.Store) (*types.Snapshot, error) { + // TODO Snapshots are not implemented yet + return nil, types.ErrNotImplemented +} + +// SnapshotRemove removes a snapshot. +func (d *driver) SnapshotRemove( + ctx types.Context, + snapshotID string, + opts types.Store) error { + // TODO Snapshots are not implemented yet + return types.ErrNotImplemented +} + +// Retrieve config arguments +func (d *driver) getCredFilePath() string { + return d.config.GetString(s3fs.ConfigS3FSCredFilePathKey) +} + +func (d *driver) getBuckets() []string { + result := d.config.GetString(s3fs.ConfigS3FSBucketsKey) + return strings.Split(result, ",") +} + +func (d *driver) getTag() string { + return d.config.GetString(s3fs.ConfigS3FSTagKey) +} + +var errGetLocDevs = goof.New("error getting local devices from context") + +func (d *driver) toTypesVolume( + ctx types.Context, + buckets *[]string, + attachments types.VolumeAttachmentsTypes) ([]*types.Volume, error) { + + var volumesSD []*types.Volume + for _, bucket := range *buckets { + volumeSD, err := d.toTypeVolume(ctx, bucket, attachments) + if err != nil { + return nil, goof.WithError( + "Failed to convert volume", err) + } else if volumeSD != nil { + volumesSD = append(volumesSD, volumeSD) + } + } + return volumesSD, nil +} + +const ( + psOutputFormat = "%s %s %s %s %s" +) + +// TODO: +// - it should be run on client side... +// so it is needed to do something with it.. 
+// - it is code duplication with os driver part +// probably it useless here nad could be just removed +func (d *driver) getMountedBuckets(ctx types.Context) ([]string, error) { + + var bucketsMap map[string]string + var err error + if bucketsMap, err = utils.MountedBuckets(ctx); err != nil { + return nil, err + } + var buckets []string + for b := range bucketsMap { + buckets = append(buckets, b) + } + ctx.Debug("DBG: mounted buckets: %s", buckets) + return buckets, nil +} + +func find(array []string, element string) bool { + for _, item := range array { + if item == element { + return true + } + } + return false +} + +func (d *driver) toTypeVolume( + ctx types.Context, + bucket string, + attachments types.VolumeAttachmentsTypes) (*types.Volume, error) { + + var buckets []string + var err error + if buckets, err = d.getMountedBuckets(ctx); err != nil { + return nil, goof.WithError(fmt.Sprintf( + "Failed to convert bucket '%s' to volume", bucket), + err) + } + + attachmentStatus := "Exported and Unmounted" + if find(buckets, bucket) { + attachmentStatus = "Exported and Mounted" + } + var attachmentsSD []*types.VolumeAttachment + if attachments.Requested() { + id, ok := context.InstanceID(ctx) + if !ok || id == nil { + return nil, goof.New("Can't get instance ID to filter volume") + } + attachmentsSD = append( + attachmentsSD, + &types.VolumeAttachment{ + InstanceID: id, + VolumeID: bucket, + DeviceName: utils.BucketURI(bucket), + Status: attachmentStatus, + }) + } + + volumeSD := &types.Volume{ + Name: bucket, + ID: bucket, + Attachments: attachmentsSD, + // TODO: + //AvailabilityZone: *volume.AvailabilityZone, + //Encrypted: *volume.Encrypted, + } + + // Some volume types have no IOPS, so we get nil in volume.Iops + //if volume.Iops != nil { + // volumeSD.IOPS = *volume.Iops + //} + + return volumeSD, nil +} + +func (d *driver) getVolume( + ctx types.Context, + volumeID string, + attachments types.VolumeAttachmentsTypes) (*types.Volume, error) { + + for _, bucket 
:= range d.buckets { + if bucket == volumeID { + return d.toTypeVolume(ctx, bucket, attachments) + } + } + return nil, fmt.Errorf("Error to get volume %s", volumeID) +} diff --git a/drivers/storage/s3fs/tests/coverage.mk b/drivers/storage/s3fs/tests/coverage.mk new file mode 100644 index 00000000..4899aa1f --- /dev/null +++ b/drivers/storage/s3fs/tests/coverage.mk @@ -0,0 +1,2 @@ +S3FS_COVERPKG := $(ROOT_IMPORT_PATH)/drivers/storage/s3fs +TEST_COVERPKG_./drivers/storage/s3fs/tests := $(S3FS_COVERPKG),$(S3FS_COVERPKG)/executor diff --git a/drivers/storage/s3fs/tests/s3fs_test.go b/drivers/storage/s3fs/tests/s3fs_test.go new file mode 100644 index 00000000..aca642e0 --- /dev/null +++ b/drivers/storage/s3fs/tests/s3fs_test.go @@ -0,0 +1,456 @@ +// +build !libstorage_storage_driver libstorage_storage_driver_s3fs + +package s3fs + +import ( + "os" + "strconv" + "testing" + + log "github.com/Sirupsen/logrus" + gofig "github.com/akutz/gofig/types" + + "github.com/stretchr/testify/assert" + + "github.com/codedellemc/libstorage/api/context" + "github.com/codedellemc/libstorage/api/registry" + "github.com/codedellemc/libstorage/api/server" + apitests "github.com/codedellemc/libstorage/api/tests" + "github.com/codedellemc/libstorage/api/types" + "github.com/codedellemc/libstorage/api/utils" + "github.com/codedellemc/libstorage/drivers/storage/s3fs" + s3fsUtil "github.com/codedellemc/libstorage/drivers/storage/s3fs/utils" +) + +const ( + driverName = s3fs.Name + testCredFile = "/home/test/cred_file" +) + +// Put contents of sample config.yml here +var ( + configYAML = []byte(` + s3fs: + buckets: "vol1,vol2" + cred_file: "/home/test/cred_file" +`) +) + +var volumeName string +var volumeName2 string +var volumeName3 string + +type CleanupIface interface { + cleanup(key string) +} + +type CleanupObjectContextT struct { + objects map[string]CleanupIface +} + +var cleanupObjectContext 
CleanupObjectContextT + +func (ctx *CleanupObjectContextT) remove(key string) { + delete(ctx.objects, key) +} + +func (ctx *CleanupObjectContextT) cleanup() { + for key, value := range ctx.objects { + value.cleanup(key) + delete(ctx.objects, key) + } +} + +type CleanupVolume struct { + vol *types.Volume + client types.Client +} + +func (ctx *CleanupObjectContextT) add( + key string, + vol *types.Volume, + client types.Client) { + + cobj := &CleanupVolume{vol: vol, client: client} + ctx.objects[key] = cobj +} + +func (cvol *CleanupVolume) cleanup(key string) { + cvol.client.API().VolumeDetach(nil, driverName, cvol.vol.Name, + &types.VolumeDetachRequest{Force: true}) + cvol.client.API().VolumeRemove(nil, driverName, cvol.vol.Name) +} + +// Check environment vars to see whether or not to run this test +func skipTests() bool { + travis, _ := strconv.ParseBool(os.Getenv("TRAVIS")) + noTestS3FS, _ := strconv.ParseBool(os.Getenv("TEST_SKIP_S3FS")) + return travis || noTestS3FS +} + +// Set volume names to first part of UUID before the - +func init() { + volumeName = "vol1" + volumeName2 = "vol2" + volumeName3 = "vole3" + cleanupObjectContext = CleanupObjectContextT{ + objects: make(map[string]CleanupIface), + } +} + +func TestMain(m *testing.M) { + server.CloseOnAbort() + ec := m.Run() + os.Exit(ec) +} + +/////////////////////////////////////////////////////////////////////// +///////// PUBLIC TESTS ///////// +/////////////////////////////////////////////////////////////////////// +func TestConfig(t *testing.T) { + if skipTests() { + t.SkipNow() + } + tf := func(config gofig.Config, client types.Client, t *testing.T) { + f := config.GetString("s3fs.cred_file") + assert.NotEqual(t, f, "") + assert.Equal(t, f, testCredFile) + } + apitests.Run(t, driverName, configYAML, tf) + cleanupObjectContext.cleanup() +} + +// Check if InstanceID is properly returned by executor +// and InstanceID.ID is filled out by InstanceInspect +func TestInstanceID(t *testing.T) { + if skipTests() 
{ + t.SkipNow() + } + + // create storage driver + sd, err := registry.NewStorageDriver(driverName) + if err != nil { + t.Fatal(err) + } + + // initialize storage driver + ctx := context.Background() + if err := sd.Init(ctx, registry.NewConfig()); err != nil { + t.Fatal(err) + } + // Get Instance ID from executor + iid, err := s3fsUtil.InstanceID(ctx) + assert.NoError(t, err) + if err != nil { + t.Fatal(err) + } + + // Fill in Instance ID's ID field with InstanceInspect + ctx = ctx.WithValue(context.InstanceIDKey, iid) + i, err := sd.InstanceInspect(ctx, utils.NewStore()) + if err != nil { + t.Fatal(err) + } + + iid = i.InstanceID + + // test resulting InstanceID + apitests.Run( + t, driverName, configYAML, + (&apitests.InstanceIDTest{ + Driver: driverName, + Expected: iid, + }).Test) + cleanupObjectContext.cleanup() +} + +// Test if Services are configured and returned properly from the client +func TestServices(t *testing.T) { + if skipTests() { + t.SkipNow() + } + + tf := func(config gofig.Config, client types.Client, t *testing.T) { + reply, err := client.API().Services(nil) + assert.NoError(t, err) + assert.Equal(t, len(reply), 1) + + _, ok := reply[driverName] + assert.True(t, ok) + } + apitests.Run(t, driverName, configYAML, tf) + cleanupObjectContext.cleanup() +} + +// Test volume functionality from storage driver +func TestVolumeCreateRemove(t *testing.T) { + if skipTests() { + t.SkipNow() + } + + tf := func(config gofig.Config, client types.Client, t *testing.T) { + vol := volumeCreate(t, client, volumeName) + volumeRemove(t, client, vol.ID) + } + apitests.Run(t, driverName, configYAML, tf) + cleanupObjectContext.cleanup() +} + +// Test volume functionality from storage driver +func TestVolumes(t *testing.T) { + if skipTests() { + t.SkipNow() + } + + tf := func(config gofig.Config, client types.Client, t *testing.T) { + _ = volumeCreate(t, client, volumeName) + _ = volumeCreate(t, client, volumeName2) + + vol1 := volumeByName(t, client, volumeName) + vol2 
:= volumeByName(t, client, volumeName2) + + volumeRemove(t, client, vol1.ID) + volumeRemove(t, client, vol2.ID) + } + apitests.Run(t, driverName, configYAML, tf) + cleanupObjectContext.cleanup() +} + +// Test volume functionality from storage driver +func TestVolumeAttachDetach(t *testing.T) { + if skipTests() { + t.SkipNow() + } + var vol *types.Volume + tf := func(config gofig.Config, client types.Client, t *testing.T) { + vol = volumeCreate(t, client, volumeName) + _ = volumeAttach(t, client, vol.ID) + _ = volumeInspectAttached(t, client, vol.ID) + _ = volumeInspectAttachedToMyInstance(t, client, vol.ID) + } + tf2 := func(config gofig.Config, client types.Client, t *testing.T) { + _ = volumeInspectAttachedToMyInstanceWithForeignInstance(t, + client, vol.ID) + } + tf3 := func(config gofig.Config, client types.Client, t *testing.T) { + _ = volumeDetach(t, client, vol.ID) + _ = volumeInspectDetached(t, client, vol.ID) + volumeRemove(t, client, vol.ID) + } + apitests.Run(t, driverName, configYAML, tf) + apitests.RunWithClientType(t, types.ControllerClient, driverName, + configYAML, tf2) + apitests.Run(t, driverName, configYAML, tf3) + cleanupObjectContext.cleanup() +} + +/////////////////////////////////////////////////////////////////////// +///////// PRIVATE TESTS FOR VOLUME FUNCTIONALITY ///////// +/////////////////////////////////////////////////////////////////////// +// Test volume creation specifying size and volume name +func volumeCreate( + t *testing.T, client types.Client, volumeName string) *types.Volume { + log.WithField("volumeName", volumeName).Info("creating volume") + // Prepare request for storage driver call to create volume + // s3fs doesnt provide size + // size := int64(1) + + opts := map[string]interface{}{ + "priority": 2, + "owner": "root@example.com", + } + volumeCreateRequest := &types.VolumeCreateRequest{ + Name: volumeName, + // Size: &size, + Opts: opts, + } + // Send request and retrieve created libStorage types.Volume + vol, err := 
client.API().VolumeCreate(nil, driverName,
+		volumeCreateRequest)
+	assert.NoError(t, err)
+	if err != nil {
+		// Report before aborting: t.FailNow stops the test
+		// immediately, so t.Error must come first.
+		t.Error("failed volumeCreate")
+		t.FailNow()
+	}
+	apitests.LogAsJSON(vol, t)
+	// Add obj to automated cleanup in case of errors
+	cleanupObjectContext.add(vol.ID, vol, client)
+	// Check volume options
+	assert.Equal(t, volumeName, vol.Name)
+	// assert.Equal(t, size, vol.Size)
+	return vol
+}
+
+// Test volume retrieval by volume name using Volumes, which retrieves all volumes
+// from the storage driver without filtering, and filters the volumes externally.
+func volumeByName(
+	t *testing.T, client types.Client, volumeName string) *types.Volume {
+	log.WithField("volumeName", volumeName).Info("get volume by s3fs.Name")
+	// Retrieve all volumes
+	vols, err := client.API().Volumes(nil, 0)
+	assert.NoError(t, err)
+	if err != nil {
+		t.FailNow()
+	}
+	// Filter volumes to those under the service,
+	// and find a volume matching inputted volume name
+	assert.Contains(t, vols, driverName)
+	for _, vol := range vols[driverName] {
+		if vol.Name == volumeName {
+			return vol
+		}
+	}
+	// No matching volumes found. Report before aborting: t.FailNow
+	// stops the test immediately, so t.Error must come first.
+	t.Error("failed volumeByName")
+	t.FailNow()
+	return nil
+}
+
+// Test volume removal by volume ID
+func volumeRemove(t *testing.T, client types.Client, volumeID string) {
+	log.WithField("volumeID", volumeID).Info("removing volume")
+	err := client.API().VolumeRemove(
+		nil, driverName, volumeID)
+	assert.NoError(t, err)
+	if err != nil {
+		t.Error("failed volumeRemove")
+		t.FailNow()
+	}
+	cleanupObjectContext.remove(volumeID)
+}
+
+// Test volume attachment by volume ID
+func volumeAttach(
+	t *testing.T, client types.Client, volumeID string) *types.Volume {
+	log.WithField("volumeID", volumeID).Info("attaching volume")
+	// Get next device name from executor
+	nextDevice, err := client.Executor().NextDevice(
+		context.Background().WithValue(context.ServiceKey, driverName),
+		utils.NewStore())
+	assert.NoError(t, err)
+	if err != nil {
+
t.Error("error getting next device name from executor") + t.FailNow() + } + reply, token, err := client.API().VolumeAttach( + nil, driverName, volumeID, &types.VolumeAttachRequest{ + NextDeviceName: &nextDevice, + }) + assert.NoError(t, err) + if err != nil { + t.Error("failed volumeAttach") + t.FailNow() + } + apitests.LogAsJSON(reply, t) + assert.Equal(t, token, "") + return reply +} + +// Test volume retrieval by volume ID using VolumeInspect, which directly +// retrieves matching volumes from the storage driver. Contrast with +// volumeByID, which uses Volumes to retrieve all volumes from the storage +// driver without filtering, and filters the volumes externally. +func volumeInspect( + t *testing.T, client types.Client, volumeID string) *types.Volume { + log.WithField("volumeID", volumeID).Info("inspecting volume") + reply, err := client.API().VolumeInspect(nil, driverName, volumeID, 0) + assert.NoError(t, err) + if err != nil { + t.Error("failed volumeInspect") + t.FailNow() + } + apitests.LogAsJSON(reply, t) + return reply +} + +// Test if volume is attached, its Attachments field should be populated +func volumeInspectAttached( + t *testing.T, client types.Client, volumeID string) *types.Volume { + log.WithField("volumeID", volumeID).Info("inspecting volume") + reply, err := client.API().VolumeInspect( + nil, driverName, volumeID, + types.VolAttReq) + assert.NoError(t, err) + if err != nil { + t.Error("failed volumeInspectAttached") + t.FailNow() + } + apitests.LogAsJSON(reply, t) + assert.Len(t, reply.Attachments, 1) + return reply +} + +// Test if volume is attached to specified instance +func volumeInspectAttachedToMyInstance( + t *testing.T, client types.Client, volumeID string) *types.Volume { + log.WithField("volumeID", volumeID).Info( + "inspecting volume attached to my instance") + reply, err := client.API().VolumeInspect(nil, driverName, volumeID, + types.VolAttReqForInstance) + assert.NoError(t, err) + if err != nil { + t.Error("failed 
volumeInspectAttached") + t.FailNow() + } + apitests.LogAsJSON(reply, t) + assert.Len(t, reply.Attachments, 1) + return reply +} + +// Test if volume is attached to my instance with foreign instance in filter +func volumeInspectAttachedToMyInstanceWithForeignInstance( + t *testing.T, client types.Client, volumeID string) *types.Volume { + ctx := context.Background() + iidm := types.InstanceIDMap{ + driverName: &types.InstanceID{ID: "none", Driver: driverName}} + ctx = ctx.WithValue(context.AllInstanceIDsKey, iidm) + log.WithField("volumeID", volumeID).Info( + "inspecting volume attached to my instance with foreign id") + reply, err := client.API().VolumeInspect( + ctx, driverName, volumeID, + types.VolAttReqForInstance) + assert.NoError(t, err) + if err != nil { + t.Error("failed volumeInspectAttached") + t.FailNow() + } + apitests.LogAsJSON(reply, t) + // s3fs doesn't filter by 'Mine' instanceID + assert.Len(t, reply.Attachments, 1) + return reply +} + +// Test if volume is detached, its Attachments field should not be populated +func volumeInspectDetached( + t *testing.T, client types.Client, volumeID string) *types.Volume { + log.WithField("volumeID", volumeID).Info("inspecting volume") + reply, err := client.API().VolumeInspect( + nil, driverName, volumeID, + types.VolAttNone) + assert.NoError(t, err) + + if err != nil { + t.Error("failed volumeInspectDetached") + t.FailNow() + } + apitests.LogAsJSON(reply, t) + assert.Len(t, reply.Attachments, 0) + apitests.LogAsJSON(reply, t) + return reply +} + +// Test detaching volume by volume ID +func volumeDetach( + t *testing.T, client types.Client, volumeID string) *types.Volume { + log.WithField("volumeID", volumeID).Info("detaching volume") + reply, err := client.API().VolumeDetach( + nil, driverName, volumeID, &types.VolumeDetachRequest{}) + assert.NoError(t, err) + if err != nil { + t.Error("failed volumeDetach") + t.FailNow() + } + apitests.LogAsJSON(reply, t) + assert.Len(t, reply.Attachments, 1) + return reply 
+} diff --git a/drivers/storage/s3fs/utils/utils.go b/drivers/storage/s3fs/utils/utils.go new file mode 100644 index 00000000..800a90da --- /dev/null +++ b/drivers/storage/s3fs/utils/utils.go @@ -0,0 +1,133 @@ +// +build !libstorage_storage_driver libstorage_storage_driver_s3fs + +package utils + +import ( + "fmt" + "os" + "os/exec" + "strings" + + "github.com/akutz/goof" + "github.com/akutz/gotil" + + "github.com/codedellemc/libstorage/api/types" + "github.com/codedellemc/libstorage/drivers/storage/s3fs" +) + +// Supported returns eiter current platform supports s3fs or not +func Supported(ctx types.Context) (bool, error) { + return gotil.FileExistsInPath(s3fs.CmdName), nil +} + +// InstanceID returns the instance ID for the local host. +func InstanceID(ctx types.Context) (*types.InstanceID, error) { + hostname := os.Getenv("S3FS_INSTANCE_ID") + if hostname == "" { + var err error + hostname, err = os.Hostname() + if err != nil { + return nil, err + } + } else { + ctx.Info("Use InstanceID from env " + hostname) + } + return &types.InstanceID{ + ID: hostname, + Driver: s3fs.Name, + }, nil +} + +// baseURI is base URI: s3fs:// +var baseURI = fmt.Sprintf("%s://", s3fs.Name) + +// IsS3FSURI checks if uri has requried prefix +func IsS3FSURI(uri string) bool { + return strings.HasPrefix(uri, baseURI) +} + +// BucketFromURI extracts bucket name from device URI +func BucketFromURI(uri string) string { + return strings.TrimPrefix(uri, baseURI) +} + +// BucketURI makes bucket URI in form s3fs://bucket +func BucketURI(bucket string) string { + return fmt.Sprintf("%s%s", baseURI, bucket) +} + +// FindBucket finds mounted bucket name by mount point +func FindBucket( + ctx types.Context, + mountPoint string) (string, bool) { + + if buckets, err := MountedBuckets(ctx); err == nil { + for b, mp := range buckets { + if mp == mountPoint { + return b, true + } + } + } + return "", false +} + +// FindMountPoint finds mount point by bucket name +func 
FindMountPoint(
+	ctx types.Context,
+	bucket string) (string, bool) {
+
+	if buckets, err := MountedBuckets(ctx); err == nil {
+		b, ok := buckets[bucket]
+		return b, ok
+	}
+	return "", false
+}
+
+// MountedBuckets enumerates mounted buckets
+// and returns a map of buckets to their mount points
+func MountedBuckets(
+	ctx types.Context) (map[string]string, error) {
+
+	buckets := map[string]string{}
+	command := exec.Command("bash", "-c",
+		fmt.Sprintf("ps ax | awk '{if($5==\"%s\"){print($6\",\"$7)}}'",
+			s3fs.CmdName))
+	output, err := command.CombinedOutput()
+	if err == nil {
+		for _, text := range strings.Split(string(output), "\n") {
+			// skip blank/malformed lines instead of panicking on pair[1]
+			if pair := strings.SplitN(text, ",", 2); len(pair) == 2 {
+				buckets[pair[0]] = pair[1]
+			}
+		}
+	} else {
+		// single Sprintf: the output must be a formatting argument, not
+		// re-parsed as a format string (it may contain '%' characters)
+		ctx.Warning(fmt.Sprintf(
+			"Cant read s3fs processes: %s",
+			string(output)))
+	}
+	ctx.Debugf("DBG: mounted buckets: %v", buckets)
+	return buckets, nil
+}
+
+// Mount performs mounting via s3fs fuse cmd
+func Mount(
+	ctx types.Context,
+	credFile, bucket, mountPoint string,
+	opts *types.DeviceMountOpts) error {
+
+	ctx.Debugf("DBG: s3fs mount bucket '%s' to mount point '%s'",
+		bucket, mountPoint)
+
+	// TODO: use opts
+	command := exec.Command(
+		s3fs.CmdName, bucket, mountPoint,
+		"-o", fmt.Sprintf("passwd_file=%s", credFile))
+	output, err := command.CombinedOutput()
+	if err != nil {
+		return goof.WithError(fmt.Sprintf(
+			"failed to mount bucket %s, output '%s'",
+			bucket, string(output)), err)
+	}
+
+	return nil
+}
diff --git a/drivers/storage/s3fs/utils/utils_test.go b/drivers/storage/s3fs/utils/utils_test.go
new file mode 100644
index 00000000..55839939
--- /dev/null
+++ b/drivers/storage/s3fs/utils/utils_test.go
@@ -0,0 +1,28 @@
+// +build !libstorage_storage_driver libstorage_storage_driver_s3fs
+
+package utils
+
+import (
+	"os"
+	"strconv"
+	"testing"
+
+	"github.com/stretchr/testify/assert"
+
+	"github.com/codedellemc/libstorage/api/context"
+)
+
+func skipTest(t *testing.T) {
+	if 
ok, _ := strconv.ParseBool(os.Getenv("S3FS_UTILS_TEST")); !ok {
+		t.Skip()
+	}
+}
+
+func TestInstanceID(t *testing.T) {
+	skipTest(t)
+	iid, err := InstanceID(context.Background())
+	if !assert.NoError(t, err) {
+		t.FailNow()
+	}
+	t.Logf("instanceID=%s", iid.String())
+}
diff --git a/drivers/storage/s3fs/utils/utils_unix.go b/drivers/storage/s3fs/utils/utils_unix.go
new file mode 100644
index 00000000..a26b0aef
--- /dev/null
+++ b/drivers/storage/s3fs/utils/utils_unix.go
@@ -0,0 +1,16 @@
+// +build !windows
+// +build !libstorage_storage_driver libstorage_storage_driver_s3fs
+
+package utils
+
+import (
+	"github.com/codedellemc/libstorage/api/types"
+)
+
+// NextDeviceInfo is the NextDeviceInfo object for S3FS.
+// s3fs exposes no device names: Prefix and Pattern are empty and Ignore is set.
+var NextDeviceInfo = &types.NextDeviceInfo{
+	Prefix:  "",
+	Pattern: "",
+	Ignore:  true,
+}
diff --git a/imports/executors/imports_executor.go b/imports/executors/imports_executor.go
index 96108941..9e477a26 100644
--- a/imports/executors/imports_executor.go
+++ b/imports/executors/imports_executor.go
@@ -8,6 +8,7 @@ import (
 	_ "github.com/codedellemc/libstorage/drivers/storage/efs/executor"
 	_ "github.com/codedellemc/libstorage/drivers/storage/isilon/executor"
 	_ "github.com/codedellemc/libstorage/drivers/storage/rbd/executor"
+	_ "github.com/codedellemc/libstorage/drivers/storage/s3fs/executor"
 	_ "github.com/codedellemc/libstorage/drivers/storage/scaleio/executor"
 	_ "github.com/codedellemc/libstorage/drivers/storage/vbox/executor"
 	_ "github.com/codedellemc/libstorage/drivers/storage/vfs/executor"
diff --git a/imports/executors/imports_executor_s3fs.go b/imports/executors/imports_executor_s3fs.go
new file mode 100644
index 00000000..9c063921
--- /dev/null
+++ b/imports/executors/imports_executor_s3fs.go
@@ -0,0 +1,8 @@
+// +build libstorage_storage_executor,libstorage_storage_executor_s3fs
+
+package executors
+
+import (
+	// load the packages
+	_ 
"github.com/codedellemc/libstorage/drivers/storage/s3fs/executor" +) diff --git a/imports/remote/imports_remote.go b/imports/remote/imports_remote.go index 20fd0457..152cfd30 100644 --- a/imports/remote/imports_remote.go +++ b/imports/remote/imports_remote.go @@ -8,6 +8,7 @@ import ( _ "github.com/codedellemc/libstorage/drivers/storage/efs/storage" _ "github.com/codedellemc/libstorage/drivers/storage/isilon/storage" _ "github.com/codedellemc/libstorage/drivers/storage/rbd/storage" + _ "github.com/codedellemc/libstorage/drivers/storage/s3fs/storage" _ "github.com/codedellemc/libstorage/drivers/storage/scaleio/storage" _ "github.com/codedellemc/libstorage/drivers/storage/vbox/storage" _ "github.com/codedellemc/libstorage/drivers/storage/vfs/storage" diff --git a/imports/remote/imports_remote_s3fs.go b/imports/remote/imports_remote_s3fs.go new file mode 100644 index 00000000..56fb2ec9 --- /dev/null +++ b/imports/remote/imports_remote_s3fs.go @@ -0,0 +1,8 @@ +// +build libstorage_storage_driver,libstorage_storage_driver_s3fs + +package remote + +import ( + // load the packages + _ "github.com/codedellemc/libstorage/drivers/storage/s3fs/storage" +)