Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Bucket s3 client #2

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 15 additions & 7 deletions cmd/sample-cosi-driver/cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,8 @@ var (

objectStoreAccessKey = ""
objectStoreSecretKey = ""
objectStoreEndpoint = ""
s3Endpoint = ""
objectscaleGateway = ""
)

var cmd = &cobra.Command{
Expand Down Expand Up @@ -68,11 +69,17 @@ func init() {
driverAddress,
"path to unix domain socket where driver should listen")

stringFlag(&objectStoreEndpoint,
stringFlag(&s3Endpoint,
"object-store-endpoint",
"m",
objectStoreEndpoint,
"endpoint where object store is listening")
"e",
s3Endpoint,
"ObjectStore S3 endpoint URL")

stringFlag(&objectscaleGateway,
"object-scale-gateway",
"g",
objectscaleGateway,
"ObjectScale gateway URL")

stringFlag(&objectStoreAccessKey,
"object-store-access-key",
Expand All @@ -96,12 +103,13 @@ func init() {

func run(ctx context.Context, args []string) error {
fmt.Println("Starting COSI driver " + provisionerName)
fmt.Println("S3 endpoint: " + objectStoreEndpoint)
fmt.Println("S3 endpoint: " + s3Endpoint)

identityServer, bucketProvisioner, err := pkg.NewDriver(
ctx,
provisionerName,
objectStoreEndpoint,
s3Endpoint,
objectscaleGateway,
objectStoreAccessKey,
objectStoreSecretKey,
)
Expand Down
24 changes: 18 additions & 6 deletions pkg/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,21 +15,33 @@ package pkg

import (
"context"
"errors"
"fmt"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/credentials"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/s3"
"net/url"
"sigs.k8s.io/cosi-driver-sample/pkg/objectscale"
)

func NewDriver(
ctx context.Context,
provisioner, objectStoreEndpoint, objectStoreAccessKey, objectStoreSecretKey string,
provisioner, s3Endpoint, objectscaleGateway, objectStoreAccessKey, objectStoreSecretKey string,
) (*IdentityServer, *ProvisionerServer, error) {
objectscaleGatewayUrl, e := url.Parse(objectscaleGateway)
if e != nil {
return nil, nil, errors.New("Failed to parse Objectscale gateway url: " + e.Error())
}

s3EndpointUrl, e := url.Parse(s3Endpoint)
if e != nil {
return nil, nil, errors.New("Failed to parse S3 endpoint url: " + e.Error())
}

obClient := objectscale.NewObjectScaleClient(
objectscale.ServiceEndpoint{Host: "", Port: 32585},
objectscale.ServiceEndpoint{Host: "", Port: 31651},
objectscaleGatewayUrl, //objectscale.ServiceEndpoint{Host: "", Port: 32585},
s3EndpointUrl, //objectscale.ServiceEndpoint{Host: "", Port: 31651},
objectStoreAccessKey,
objectStoreSecretKey,
)
Expand All @@ -39,7 +51,7 @@ func NewDriver(

fmt.Printf("Connecting to Object store...\n")
sess, err := session.NewSession(&aws.Config{
Endpoint: &objectStoreEndpoint,
Endpoint: &s3Endpoint,
S3ForcePathStyle: aws.Bool(true),
Credentials: creds,
Region: &region,
Expand All @@ -54,15 +66,15 @@ func NewDriver(
if err != nil {
fmt.Println(err.Error())
} else {
fmt.Printf("Successfully connected to Object store %s\n", objectStoreEndpoint)
fmt.Printf("Successfully connected to Object store %s\n", s3Endpoint)
}
}

return &IdentityServer{
provisioner: provisioner,
}, &ProvisionerServer{
provisioner: provisioner,
endpoint: objectStoreEndpoint,
endpoint: s3Endpoint,
accessKeyId: objectStoreAccessKey,
secretKeyId: objectStoreSecretKey,
objectScaleClient: obClient,
Expand Down
36 changes: 20 additions & 16 deletions pkg/objectscale/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,9 @@ package objectscale

import (
"crypto/tls"
"fmt"
"net/http"
"net/url"
cosi "sigs.k8s.io/container-object-storage-interface-spec"

"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/awserr"
Expand All @@ -21,14 +22,9 @@ type objectScaleService struct {
client *ObjectScaleClient
}

type ServiceEndpoint struct {
Host string
Port int
}

type ObjectScaleClient struct {
objectScaleGtwEndpoint ServiceEndpoint
s3Endpoint ServiceEndpoint
objectScaleGtwEndpoint *url.URL
s3Endpoint *url.URL

credentials *credentials.Credentials
sess *session.Session
Expand All @@ -40,9 +36,9 @@ type ObjectScaleClient struct {
S3 *S3Service
}

func NewObjectScaleClient(objectScaleGtwEndpoint ServiceEndpoint, s3Endpoint ServiceEndpoint, accessKeyId, secretKey string) *ObjectScaleClient {
func NewObjectScaleClient(objectScaleGateway, s3Endpoint *url.URL, accessKeyId, secretKey string) *ObjectScaleClient {
client := &ObjectScaleClient{}
client.objectScaleGtwEndpoint = objectScaleGtwEndpoint
client.objectScaleGtwEndpoint = objectScaleGateway
client.s3Endpoint = s3Endpoint
client.credentials = credentials.NewStaticCredentials(accessKeyId, secretKey, "")

Expand All @@ -57,22 +53,22 @@ func NewObjectScaleClient(objectScaleGtwEndpoint ServiceEndpoint, s3Endpoint Ser
Region: aws.String(DefaultRegion),
S3ForcePathStyle: aws.Bool(true),
}))
client.iam = client.newIamClient(objectScaleGtwEndpoint, client.credentials)
client.s3 = client.newS3Client(s3Endpoint, client.credentials)
client.iam = client.newIamClient(objectScaleGateway)
client.s3 = client.newS3Client(s3Endpoint)

client.service.client = client
client.Iam = (*IamService)(&client.service)
client.S3 = (*S3Service)(&client.service)
return client
}

func (client *ObjectScaleClient) newIamClient(endpoint ServiceEndpoint, credentials *credentials.Credentials) *iam.IAM {
cfg := aws.NewConfig().WithEndpoint(fmt.Sprintf("https://%s:%d/iam", endpoint.Host, endpoint.Port)).WithLogLevel(aws.LogDebugWithHTTPBody)
func (client *ObjectScaleClient) newIamClient(endpoint *url.URL) *iam.IAM {
cfg := aws.NewConfig().WithEndpoint(endpoint.String()).WithLogLevel(aws.LogDebugWithHTTPBody)
return iam.New(client.sess, cfg)
}

func (client *ObjectScaleClient) newS3Client(endpoint ServiceEndpoint, credentials *credentials.Credentials) *s3.S3 {
cfg := aws.NewConfig().WithEndpoint(fmt.Sprintf("https://%s:%d", endpoint.Host, endpoint.Port)).WithLogLevel(aws.LogDebugWithHTTPBody)
func (client *ObjectScaleClient) newS3Client(endpoint *url.URL) *s3.S3 {
cfg := aws.NewConfig().WithEndpoint(endpoint.String()).WithLogLevel(aws.LogDebugWithHTTPBody)
return s3.New(client.sess, cfg)
}

Expand All @@ -82,3 +78,11 @@ func HandleError(err error) error {
}
return err
}

func GetS3Region(protocol *cosi.Protocol) *string {
if protocol != nil && protocol.GetS3() != nil && protocol.GetS3().GetRegion() != "" {
return aws.String(protocol.GetS3().GetRegion())
} else {
return aws.String(DefaultRegion)
}
}
50 changes: 47 additions & 3 deletions pkg/objectscale/s3Service.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,55 @@
package objectscale

import "github.com/aws/aws-sdk-go/service/s3"
import (
"fmt"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/s3"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)

type S3Service objectScaleService

func (t *S3Service) CreateBucket(name string) error {
return nil
func (t *S3Service) CreateBucket(bucketName string) (*s3.CreateBucketOutput, error) {
s3Config := &aws.Config{
Credentials: t.client.credentials,
Endpoint: aws.String(t.client.s3Endpoint.String()),
Region: GetS3Region(nil),
DisableSSL: aws.Bool(true),
S3ForcePathStyle: aws.Bool(true),
}

s3Client := s3.New(session.New(s3Config))
out, err := s3Client.CreateBucket(
&s3.CreateBucketInput{
Bucket: aws.String(bucketName), // Required
})

return out, err
}

func (t *S3Service) DeleteBucket(bucketName string) (*s3.DeleteBucketOutput, error) {
s3Config := &aws.Config{
Credentials: t.client.credentials,
Endpoint: aws.String(t.client.s3Endpoint.String()),
Region: GetS3Region(nil),
DisableSSL: aws.Bool(true),
S3ForcePathStyle: aws.Bool(true),
}

s3Client := s3.New(session.New(s3Config))

out, err := s3Client.DeleteBucket(
&s3.DeleteBucketInput{
Bucket: aws.String(bucketName), // Required
})
if err != nil {
fmt.Println(err.Error())
return nil, status.Error(codes.Internal, "ProvisionerDeleteBucket: operation failed")
}

return out, nil
}

func (t *S3Service) ListBuckets() ([]*s3.Bucket, error) {
Expand Down
44 changes: 3 additions & 41 deletions pkg/provisioner.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,6 @@ import (
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
cosi "sigs.k8s.io/container-object-storage-interface-spec"

"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/credentials"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/s3"
)

type ProvisionerServer struct {
Expand All @@ -49,22 +44,10 @@ func (s *ProvisionerServer) ProvisionerCreateBucket(
) (*cosi.ProvisionerCreateBucketResponse, error) {
fmt.Println("Creating bucket " + req.GetName())

s3Config := &aws.Config{
Credentials: credentials.NewStaticCredentials(s.accessKeyId, s.secretKeyId, ""),
Endpoint: aws.String(s.endpoint),
Region: getS3Region(req.GetProtocol()),
DisableSSL: aws.Bool(true),
S3ForcePathStyle: aws.Bool(true),
}

s3Client := s3.New(session.New(s3Config))
out, err := s3Client.CreateBucket(
&s3.CreateBucketInput{
Bucket: aws.String(req.GetName()), // Required
})
out, err := s.objectScaleClient.S3.CreateBucket(req.GetName())
if err != nil {
fmt.Println(err.Error())
return nil, status.Error(codes.Internal, "ProvisionerCreateBucket: operation failed")
return nil, status.Error(codes.Internal, "ProvisionerCreateBucket: operation failed: "+err.Error())
}

fmt.Println("Created bucket " + req.GetName() + " : " + out.GoString())
Expand All @@ -78,20 +61,7 @@ func (s *ProvisionerServer) ProvisionerDeleteBucket(
) (*cosi.ProvisionerDeleteBucketResponse, error) {
fmt.Println("Deleting bucket id " + req.GetBucketId())

s3Config := &aws.Config{
Credentials: credentials.NewStaticCredentials(s.accessKeyId, s.secretKeyId, ""),
Endpoint: aws.String(s.endpoint),
Region: getS3Region(nil), // ahaha, no protocol in delete request!
DisableSSL: aws.Bool(true),
S3ForcePathStyle: aws.Bool(true),
}

s3Client := s3.New(session.New(s3Config))

out, err := s3Client.DeleteBucket(
&s3.DeleteBucketInput{
Bucket: aws.String(req.GetBucketId()), // Required
})
out, err := s.objectScaleClient.S3.DeleteBucket(req.GetBucketId())
if err != nil {
fmt.Println(err.Error())
return nil, status.Error(codes.Internal, "ProvisionerDeleteBucket: operation failed")
Expand Down Expand Up @@ -124,11 +94,3 @@ func (s *ProvisionerServer) ProvisionerRevokeBucketAccess(ctx context.Context,

return nil, status.Error(codes.Unimplemented, "ProvisionerCreateBucket: not implemented")
}

func getS3Region(protocol *cosi.Protocol) *string {
if protocol != nil && protocol.GetS3() != nil && protocol.GetS3().GetRegion() != "" {
return aws.String(protocol.GetS3().GetRegion())
} else {
return aws.String(objectscale.DefaultRegion)
}
}
3 changes: 2 additions & 1 deletion resources/secret.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,8 @@ metadata:
app.kubernetes.io/name: cosi-driver-sample
type: Opaque
stringData:
OBJECT_STORE_ENDPOINT: "http://10.243.23.16:30813"
OBJECT_SCALE_GATEWAY: "http://10.243.23.16:32585"
OBJECT_STORE_ENDPOINT: "http://10.243.23.16:31651"
OBJECT_STORE_ACCESS_KEY: "OKIA53EA314F369AAAFA"
OBJECT_STORE_SECRET_KEY: "1fixtMYMBr+9RpfOMn2ePpPZT30RqwHkTia1ypVM"
V: "3"