diff --git a/examples/common/csi-wekafs-api-secret.yaml b/examples/common/csi-wekafs-api-secret.yaml index 8dbad7c8f..46583e7e8 100644 --- a/examples/common/csi-wekafs-api-secret.yaml +++ b/examples/common/csi-wekafs-api-secret.yaml @@ -17,3 +17,5 @@ data: endpoints: MTcyLjMxLjQxLjU0OjE0MDAwLDE3Mi4zMS40Ny4xNTI6MTQwMDAsMTcyLjMxLjM4LjI1MDoxNDAwMCwxNzIuMzEuNDcuMTU1OjE0MDAwLDE3Mi4zMS4zMy45MToxNDAwMCwxNzIuMzEuMzguMTU1OjE0MDAwCg== # protocol to use for API connection (may be either http or https, base64-encoded) scheme: aHR0cA== + # for multiple clusters setup, set specific container name rather than attempt to idenfity it automatically + localContainerName: "" diff --git a/go.mod b/go.mod index 4d0b8fc4a..1e6c8841d 100644 --- a/go.mod +++ b/go.mod @@ -39,10 +39,10 @@ require ( github.com/prometheus/common v0.44.0 // indirect github.com/prometheus/procfs v0.11.0 // indirect go.opentelemetry.io/otel/metric v1.16.0 // indirect - golang.org/x/net v0.11.0 // indirect - golang.org/x/sys v0.9.0 // indirect - golang.org/x/text v0.10.0 // indirect - google.golang.org/genproto/googleapis/rpc v0.0.0-20230530153820-e85fd2cbaebc // indirect + golang.org/x/net v0.12.0 // indirect + golang.org/x/sys v0.10.0 // indirect + golang.org/x/text v0.11.0 // indirect + google.golang.org/genproto/googleapis/rpc v0.0.0-20230711160842-782d3b101e98 // indirect k8s.io/klog/v2 v2.100.1 // indirect - k8s.io/utils v0.0.0-20230505201702-9f6742963106 // indirect + k8s.io/utils v0.0.0-20230711102312-30195339c3c7 // indirect ) diff --git a/go.sum b/go.sum index fa26bac00..fb4206197 100644 --- a/go.sum +++ b/go.sum @@ -6,8 +6,6 @@ github.com/container-storage-interface/spec v1.8.0 h1:D0vhF3PLIZwlwZEf2eNbpujGCN github.com/container-storage-interface/spec v1.8.0/go.mod h1:ROLik+GhPslwwWRNFF1KasPzroNARibH2rfz1rkg4H0= github.com/coreos/go-systemd/v22 v22.5.0/go.mod h1:Y58oyj3AT4RCenI/lSvhwexgC+NSVTIJ3seZv2GcEnc= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= -github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= -github.com/go-logfmt/logfmt v0.5.1/go.mod h1:WYhtIu8zTZfxdn5+rREduYbwxfcBr/Vr6KEVveWlfTs= github.com/go-logr/logr v1.2.0/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A= github.com/go-logr/logr v1.2.2/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A= github.com/go-logr/logr v1.2.4 h1:g01GSCwiDw2xSZfjJ2/T9M+S6pFdcNtFYsp+Y43HYDQ= @@ -28,9 +26,6 @@ github.com/google/uuid v1.3.0 h1:t6JiXgmwXMjEs8VusXIJk2BXHsn+wx8BZdTaoZ5fu7I= github.com/google/uuid v1.3.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/hashicorp/go-version v1.6.0 h1:feTTfFNnjP967rlCxM/I9g701jU+RN74YKx2mOkIeek= github.com/hashicorp/go-version v1.6.0/go.mod h1:fltr4n8CU8Ke44wwGCBoEymUuxUHl09ZGVZPK5anwXA= -github.com/jpillora/backoff v1.0.0/go.mod h1:J/6gKK9jxlEcS3zixgDgUAsiuZ7yrSoa/FX5e0EB2j4= -github.com/json-iterator/go v1.1.12/go.mod h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHmT4TnhNGBo= -github.com/julienschmidt/httprouter v1.3.0/go.mod h1:JR6WtHb+2LUe8TCKY3cZOxFyyO8IZAc4RVcycCCAKdM= github.com/kubernetes-csi/csi-lib-utils v0.14.0 h1:pusB32LkSd7GhuT8Z6cyRFqByujc28ygWV97ndaT19s= github.com/kubernetes-csi/csi-lib-utils v0.14.0/go.mod h1:uX8xidqxGJOLXtsfCCVsxWtZl/9NiLyd2DD3Nb+KoP4= github.com/mattn/go-colorable v0.1.12/go.mod h1:u5H1YNBxpqRaxsYJYSkiCWKzEfiAb1Gb520KVy5xxl4= @@ -44,14 +39,10 @@ github.com/matttproud/golang_protobuf_extensions v1.0.4 h1:mmDVorXM7PCGKw94cs5zk github.com/matttproud/golang_protobuf_extensions v1.0.4/go.mod h1:BSXmuO+STAnVfrANrmjBb36TMTDstsz7MSK+HVaYKv4= github.com/moby/sys/mountinfo v0.6.2 h1:BzJjoreD5BMFNmD9Rus6gdd1pLuecOFPt8wC+Vygl78= github.com/moby/sys/mountinfo v0.6.2/go.mod h1:IJb6JQeOklcdMU9F5xQ8ZALD+CUr5VlGpwtX+VE0rpI= -github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= -github.com/modern-go/reflect2 v1.0.2/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjYzDa0/r8luk= -github.com/mwitkow/go-conntrack v0.0.0-20190716064945-2f068394615f/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U= github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pkg/xattr v0.4.9 h1:5883YPCtkSd8LFbs13nXplj9g9tlrwoJRjgpgMu1/fE= github.com/pkg/xattr v0.4.9/go.mod h1:di8WF84zAKk8jzR1UBTEWh9AUlIZZ7M/JNt8e9B6ktU= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= -github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/prometheus/client_golang v1.16.0 h1:yk/hx9hDbrGHovbci4BY+pRMfSuuat626eFsHb7tmT8= github.com/prometheus/client_golang v1.16.0/go.mod h1:Zsulrv/L9oM40tJ7T815tM89lFEugiJ9HzIqaAx4LKc= github.com/prometheus/client_model v0.4.0 h1:5lQXD3cAg1OXBf4Wq03gTrXHeaV0TQvGfUooCfx1yqY= @@ -77,12 +68,10 @@ go.opentelemetry.io/otel/sdk v1.16.0 h1:Z1Ok1YsijYL0CSJpHt4cS3wDDh7p572grzNrBMiM go.opentelemetry.io/otel/sdk v1.16.0/go.mod h1:tMsIuKXuuIWPBAOrH+eHtvhTL+SntFtXF9QD68aP6p4= go.opentelemetry.io/otel/trace v1.16.0 h1:8JRpaObFoW0pxuVPapkgH8UhHQj+bJW8jJsCZEu5MQs= go.opentelemetry.io/otel/trace v1.16.0/go.mod h1:Yt9vYq1SdNz3xdjZZK7wcXv1qv2pwLkqr2QVwea0ef0= -golang.org/x/exp v0.0.0-20230626212559-97b1e661b5df h1:UA2aFVmmsIlefxMk29Dp2juaUSth8Pyn3Tq5Y5mJGME= -golang.org/x/exp v0.0.0-20230626212559-97b1e661b5df/go.mod h1:FXUEEKJgO7OQYeo8N01OfiKP8RXMtf6e8aTskBGqWdc= golang.org/x/exp v0.0.0-20230713183714-613f0c0eb8a1 h1:MGwJjxBy0HJshjDNfLsYO8xppfqWlA5ZT9OhtUUhTNw= golang.org/x/exp v0.0.0-20230713183714-613f0c0eb8a1/go.mod h1:FXUEEKJgO7OQYeo8N01OfiKP8RXMtf6e8aTskBGqWdc= -golang.org/x/net v0.11.0 h1:Gi2tvZIJyBtO9SDr1q9h5hEQCp/4L2RQ+ar0qjx2oNU= -golang.org/x/net v0.11.0/go.mod h1:2L/ixqYpgIVXmeoSA/4Lu7BzTG4KIyPIryS4IsOd1oQ= +golang.org/x/net v0.12.0 h1:cfawfvKITfUsFCeJIHJrbSxpeu/E81khclypR0GVT50= +golang.org/x/net v0.12.0/go.mod h1:zEVYFnQC7m/vmpQFELhcD1EWkZlX69l4oqgmer6hfKA= golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.3.0 h1:ftCYgMx6zT/asHUrPw8BLLscYtGznsLAnjq5RH9P66E= golang.org/x/sync v0.3.0/go.mod h1:FU7BRWz2tNW+3quACPkgCx/L+uEAv1htQ0V83Z9Rj+Y= @@ -92,22 +81,19 @@ golang.org/x/sys v0.0.0-20220408201424-a24fb2fb8a0f/go.mod h1:oPkhp1MJrh7nUepCBc golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.9.0 h1:KS/R3tvhPqvJvwcKfnBHJwwthS11LRhmM5D59eEXa0s= -golang.org/x/sys v0.9.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/text v0.10.0 h1:UpjohKhiEgNc0CSauXmwYftY1+LlaC75SJwh0SgCX58= -golang.org/x/text v0.10.0/go.mod h1:TvPlkZtksWOMsz7fbANvkp4WM8x/WCo/om8BMLbz+aE= +golang.org/x/sys v0.10.0 h1:SqMFp9UcQJZa+pmYuAKjd9xq1f0j5rLcDIk0mj4qAsA= +golang.org/x/sys v0.10.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/text v0.11.0 h1:LAntKIrcmeSKERyiOh0XMV39LXS8IE9UL2yP7+f5ij4= +golang.org/x/text v0.11.0/go.mod h1:TvPlkZtksWOMsz7fbANvkp4WM8x/WCo/om8BMLbz+aE= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= -google.golang.org/genproto/googleapis/rpc v0.0.0-20230530153820-e85fd2cbaebc h1:XSJ8Vk1SWuNr8S18z1NZSziL0CPIXLCCMDOEFtHBOFc= -google.golang.org/genproto/googleapis/rpc v0.0.0-20230530153820-e85fd2cbaebc/go.mod h1:66JfowdXAEgad5O9NnYcsNPLCPZJD++2L9X0PCMODrA= -google.golang.org/grpc v1.56.1 h1:z0dNfjIl0VpaZ9iSVjA6daGatAYwPGstTjt5vkRMFkQ= -google.golang.org/grpc v1.56.1/go.mod h1:I9bI3vqKfayGqPUAwGdOSu7kt6oIJLixfffKrpXqQ9s= +google.golang.org/genproto/googleapis/rpc v0.0.0-20230711160842-782d3b101e98 h1:bVf09lpb+OJbByTj913DRJioFFAjf/ZGxEz7MajTp2U= +google.golang.org/genproto/googleapis/rpc v0.0.0-20230711160842-782d3b101e98/go.mod h1:TUfxEVdsvPg18p6AslUXFoLdpED4oBnGwyqk3dV1XzM= google.golang.org/grpc v1.56.2 h1:fVRFRnXvU+x6C4IlHZewvJOVHoOv1TUuQyoRsYnB4bI= google.golang.org/grpc v1.56.2/go.mod h1:I9bI3vqKfayGqPUAwGdOSu7kt6oIJLixfffKrpXqQ9s= google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw= google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc= google.golang.org/protobuf v1.31.0 h1:g0LDEJHgrBl9N9r17Ru3sqWhkIx2NB67okBHPwC7hs8= google.golang.org/protobuf v1.31.0/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I= -gopkg.in/yaml.v2 v2.4.0/go.mod h1:RDklbk79AGWmwhnvt/jBztapEOGDOx6ZbXqjP6csGnQ= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= k8s.io/apimachinery v0.27.3 h1:Ubye8oBufD04l9QnNtW05idcOe9Z3GQN8+7PqmuVcUM= k8s.io/apimachinery v0.27.3/go.mod h1:XNfZ6xklnMCOGGFNqXG7bUrQCoR04dh/E7FprV6pb+E= @@ -117,5 +103,5 @@ k8s.io/klog/v2 v2.100.1 h1:7WCHKK6K8fNhTqfBhISHQ97KrnJNFZMcQvKp7gP/tmg= k8s.io/klog/v2 v2.100.1/go.mod h1:y1WjHnz7Dj687irZUWR/WLkLc5N1YHtjLdmgWjndZn0= k8s.io/mount-utils v0.27.3 h1:oubkDKLTZUneW27wgyOmp8a1AAZj04vGmtq+YW8wdvY= k8s.io/mount-utils v0.27.3/go.mod h1:vmcjYdi2Vg1VTWY7KkhvwJVY6WDHxb/QQhiQKkR8iNs= -k8s.io/utils v0.0.0-20230505201702-9f6742963106 h1:EObNQ3TW2D+WptiYXlApGNLVy0zm/JIBVY9i+M4wpAU= -k8s.io/utils v0.0.0-20230505201702-9f6742963106/go.mod h1:OLgZIPagt7ERELqWJFomSt595RzquPNLL48iOWgYOg0= +k8s.io/utils v0.0.0-20230711102312-30195339c3c7 h1:ZgnF1KZsYxWIifwSNZFZgNtWE89WI5yiP5WwlfDoIyc= +k8s.io/utils v0.0.0-20230711102312-30195339c3c7/go.mod h1:OLgZIPagt7ERELqWJFomSt595RzquPNLL48iOWgYOg0= diff --git a/pkg/wekafs/apiclient/apiclient.go b/pkg/wekafs/apiclient/apiclient.go index 9bf393190..d1df9f436 100644 --- a/pkg/wekafs/apiclient/apiclient.go +++ b/pkg/wekafs/apiclient/apiclient.go @@ -58,9 +58,10 @@ type ApiClient struct { refreshTokenExpiryDate time.Time CompatibilityMap *WekaCompatibilityMap clientHash uint32 + hostname string } -func NewApiClient(ctx context.Context, credentials Credentials, allowInsecureHttps bool) (*ApiClient, error) { +func NewApiClient(ctx context.Context, credentials Credentials, allowInsecureHttps bool, hostname string) (*ApiClient, error) { tr := &http.Transport{ TLSClientConfig: &tls.Config{InsecureSkipVerify: allowInsecureHttps}, } @@ -76,6 +77,7 @@ func NewApiClient(ctx context.Context, credentials Credentials, allowInsecureHtt Credentials: credentials, CompatibilityMap: &WekaCompatibilityMap{}, currentEndpointId: -1, + hostname: hostname, } log.Ctx(ctx).Trace().Bool("insecure_skip_verify", allowInsecureHttps).Msg("Creating new API client") a.clientHash = a.generateHash() @@ -572,11 +574,12 @@ type ApiObjectRequest interface { } type Credentials struct { - Username string - Password string - Organization string - HttpScheme string - Endpoints []string + Username string + Password string + Organization string + HttpScheme string + Endpoints []string + LocalContainerName string } func (c *Credentials) String() string { diff --git a/pkg/wekafs/apiclient/cluster.go b/pkg/wekafs/apiclient/cluster.go index 062ff006a..84fb93bc7 100644 --- a/pkg/wekafs/apiclient/cluster.go +++ b/pkg/wekafs/apiclient/cluster.go @@ -2,9 +2,11 @@ package apiclient import ( "context" + "errors" "fmt" "github.com/google/uuid" "github.com/rs/zerolog/log" + "time" ) const ApiPathLogin = "login" @@ -15,7 +17,9 @@ const ApiPathRefresh = "login/refresh" const ApiPathClusterInfo = "cluster" -//updateTokensExpiryInterval fetches the refresh token expiry from API +const ApiContainersInfo = "containers" + +// updateTokensExpiryInterval fetches the refresh token expiry from API func (a *ApiClient) updateTokensExpiryInterval(ctx context.Context) error { responseData := &TokenExpiryResponse{} if err := a.Get(ctx, ApiPathTokenExpiry, nil, responseData); err != nil { @@ -47,6 +51,7 @@ func (a *ApiClient) fetchClusterInfo(ctx context.Context) error { logger.Info().Msg(fmt.Sprintf("Cluster compatibility for authenticated filesystem mounts: %t", a.SupportsAuthenticatedMounts())) logger.Info().Msg(fmt.Sprintf("Cluster compatibility for new filesystem from snapshot: %t", a.SupportsNewFileSystemFromSnapshot())) logger.Info().Msg(fmt.Sprintf("Cluster compatibility for cloning filesystems: %t", a.SupportsFilesystemCloning())) + logger.Info().Msg(fmt.Sprintf("Cluster compatibility for supporting multiple connections: %t", a.SupportsMultipleClusters())) return nil } @@ -102,3 +107,109 @@ type ClusterInfoResponse struct { Guid uuid.UUID `json:"guid"` Capacity Capacity `json:"capacity,omitempty"` } + +type Container struct { + Id string `json:"id,omitempty"` + SwReleaseString string `json:"sw_release_string,omitempty"` + Mode string `json:"mode,omitempty"` + ContainerName string `json:"container_name,omitempty"` + FailureDomain string `json:"failure_domain,omitempty"` + AddedTime time.Time `json:"added_time,omitempty"` + Uid string `json:"uid,omitempty"` + DrivesDedicatedCores int `json:"drives_dedicated_cores,omitempty"` + Hostname string `json:"hostname,omitempty"` + Ips []string `json:"ips,omitempty"` + MemberOfLeadership bool `json:"member_of_leadership,omitempty"` + Cloud struct { + InstanceType string `json:"instance_type,omitempty"` + Provider string `json:"provider,omitempty"` + AvailabilityZone string `json:"availability_zone,omitempty"` + InstanceId string `json:"instance_id,omitempty"` + } `json:"cloud,omitempty"` + LastFailureTime interface{} `json:"last_failure_time,omitempty"` + State string `json:"state,omitempty"` + StartTime time.Time `json:"start_time,omitempty"` + Aws struct { + InstanceType string `json:"instance_type,omitempty"` + Provider string `json:"provider,omitempty"` + AvailabilityZone string `json:"availability_zone,omitempty"` + InstanceId string `json:"instance_id,omitempty"` + } `json:"aws,omitempty"` + SwVersion string `json:"sw_version,omitempty"` + OsInfo struct { + KernelName string `json:"kernel_name,omitempty"` + Platform string `json:"platform,omitempty"` + KernelVersion string `json:"kernel_version,omitempty"` + OsName string `json:"os_name,omitempty"` + KernelRelease string `json:"kernel_release,omitempty"` + Drivers struct { + Ixgbe string `json:"ixgbe,omitempty"` + Ixgbevf string `json:"ixgbevf,omitempty"` + Mlx5Core string `json:"mlx5_core,omitempty"` + IbUverbs string `json:"ib_uverbs,omitempty"` + UioPciGeneric string `json:"uio_pci_generic,omitempty"` + } `json:"drivers,omitempty"` + } `json:"os_info,omitempty"` + LastFailureCode interface{} `json:"last_failure_code,omitempty"` + CoresIds []int `json:"cores_ids,omitempty"` + Memory int `json:"memory,omitempty"` + FrontendDedicatedCores int `json:"frontend_dedicated_cores,omitempty"` + FailureDomainType string `json:"failure_domain_type,omitempty"` + LeadershipRole interface{} `json:"leadership_role,omitempty"` + StateChangedTime time.Time `json:"state_changed_time,omitempty"` + Status string `json:"status,omitempty"` + Cores int `json:"cores,omitempty"` + HwMachineIdentifier string `json:"hw_machine_identifier,omitempty"` + IsDedicated bool `json:"is_dedicated,omitempty"` + LastFailure interface{} `json:"last_failure,omitempty"` + MgmtPort int `json:"mgmt_port,omitempty"` + AutoRemoveTimeout interface{} `json:"auto_remove_timeout,omitempty"` + TotalScrubberLimit int `json:"total_scrubber_limit,omitempty"` + ServerIdentifier string `json:"server_identifier,omitempty"` + IoProcesses int `json:"io_processes,omitempty"` + ContainerIp string `json:"container_ip,omitempty"` +} + +type ContainersResponse []Container + +func (a *ApiClient) getContainers(ctx context.Context) (*ContainersResponse, error) { + responseData := &ContainersResponse{} + err := a.Get(ctx, ApiContainersInfo, nil, responseData) + return responseData, err +} + +func (a *ApiClient) GetLocalContainer(ctx context.Context) (*Container, error) { + logger := log.Ctx(ctx) + logger.Info().Str("hostname", a.hostname).Msg("Fetching client containers on host") + allContainers, err := a.getContainers(ctx) + if err != nil { + return nil, err + } + + var ret []Container + for _, container := range *allContainers { + if container.Hostname == a.hostname { + if container.Mode == "backend" { + logger.Trace().Str("container_hostname", container.Hostname).Msg("Skipping a backend container") + continue + } + if container.State != "ACTIVE" { + logger.Trace().Str("container_hostname", container.Hostname).Msg("Skipping an INACTIVE container") + continue + } + logger.Debug().Str("container_hostname", container.Hostname).Msg("Found a valid container") + ret = append(ret, container) + } + } + if len(ret) == 1 { + return &ret[0], nil + } else if len(ret) > 1 { + err := errors.New("could not determine local client containers, ambiguous hostname") + logger.Error().Err(err).Msg("Cannot fetch local container") + return nil, err + } else { + err := errors.New("could not find any local client container") + logger.Error().Err(err).Msg("Cannot fetch local container") + return nil, err + } +} diff --git a/pkg/wekafs/apiclient/compatibility.go b/pkg/wekafs/apiclient/compatibility.go index 21db233a8..8adc731d6 100644 --- a/pkg/wekafs/apiclient/compatibility.go +++ b/pkg/wekafs/apiclient/compatibility.go @@ -16,6 +16,7 @@ type WekaCompatibilityRequiredVersions struct { CloneFilesystem string UrlQueryParams string SyncOnCloseMountOption string + SingleClientMultipleClusters string } var MinimumSupportedWekaVersions = &WekaCompatibilityRequiredVersions{ @@ -29,6 +30,7 @@ var MinimumSupportedWekaVersions = &WekaCompatibilityRequiredVersions{ CloneFilesystem: "v9.99", // can clone a volume directly on storage side UrlQueryParams: "v4.0", // can perform URL query by fields SyncOnCloseMountOption: "v4.2", // can perform sync_on_close mount option + SingleClientMultipleClusters: "v4.2", // single client can have multiple Weka cluster connections } type WekaCompatibilityMap struct { @@ -42,6 +44,7 @@ type WekaCompatibilityMap struct { CloneFilesystem bool UrlQueryParams bool SyncOnCloseMountOption bool + SingleClientMultipleClusters bool } func (cm *WekaCompatibilityMap) fillIn(versionStr string) { @@ -58,6 +61,7 @@ func (cm *WekaCompatibilityMap) fillIn(versionStr string) { cm.QuotaOnSnapshot = false cm.UrlQueryParams = false cm.SyncOnCloseMountOption = false + cm.SingleClientMultipleClusters = false return } d, _ := version.NewVersion(MinimumSupportedWekaVersions.DirectoryAsCSIVolume) @@ -70,6 +74,7 @@ func (cm *WekaCompatibilityMap) fillIn(versionStr string) { qs, _ := version.NewVersion(MinimumSupportedWekaVersions.QuotaOnSnapshot) u, _ := version.NewVersion(MinimumSupportedWekaVersions.UrlQueryParams) sc, _ := version.NewVersion(MinimumSupportedWekaVersions.SyncOnCloseMountOption) + mc, _ := version.NewVersion(MinimumSupportedWekaVersions.SingleClientMultipleClusters) cm.DirectoryAsCSIVolume = v.GreaterThanOrEqual(d) cm.FilesystemAsCSIVolume = v.GreaterThanOrEqual(f) @@ -81,6 +86,7 @@ func (cm *WekaCompatibilityMap) fillIn(versionStr string) { cm.QuotaOnSnapshot = v.GreaterThanOrEqual(qs) cm.UrlQueryParams = v.GreaterThanOrEqual(u) cm.SyncOnCloseMountOption = v.GreaterThanOrEqual(sc) + cm.SingleClientMultipleClusters = v.GreaterThanOrEqual(mc) } func (a *ApiClient) SupportsQuotaDirectoryAsVolume() bool { @@ -122,3 +128,7 @@ func (a *ApiClient) SupportsUrlQueryParams() bool { func (a *ApiClient) SupportsSyncOnCloseMountOption() bool { return a.CompatibilityMap.SyncOnCloseMountOption } + +func (a *ApiClient) SupportsMultipleClusters() bool { + return a.CompatibilityMap.SingleClientMultipleClusters +} diff --git a/pkg/wekafs/controllerserver.go b/pkg/wekafs/controllerserver.go index 5a32d66db..a89c9fd79 100644 --- a/pkg/wekafs/controllerserver.go +++ b/pkg/wekafs/controllerserver.go @@ -54,6 +54,10 @@ func (cs *ControllerServer) getDefaultMountOptions() MountOptions { return getDefaultMountOptions().MergedWith(NewMountOptionsFromString(ControlServerAdditionalMountOptions), cs.getConfig().mutuallyExclusiveOptions) } +func (cs *ControllerServer) getNodeId() string { + return cs.nodeID +} + func (cs *ControllerServer) isInDevMode() bool { return cs.getConfig().isInDevMode() } diff --git a/pkg/wekafs/interfaces.go b/pkg/wekafs/interfaces.go index 37c2f9d7a..75af3f50f 100644 --- a/pkg/wekafs/interfaces.go +++ b/pkg/wekafs/interfaces.go @@ -6,4 +6,5 @@ type AnyServer interface { getConfig() *DriverConfig isInDevMode() bool // TODO: Rename to isInDevMode getDefaultMountOptions() MountOptions + getNodeId() string } diff --git a/pkg/wekafs/mount.go b/pkg/wekafs/mount.go index e16ad6ffc..cef44aafb 100644 --- a/pkg/wekafs/mount.go +++ b/pkg/wekafs/mount.go @@ -2,6 +2,7 @@ package wekafs import ( "context" + "errors" "fmt" "github.com/rs/zerolog/log" "github.com/wekafs/csi-wekafs/pkg/wekafs/apiclient" @@ -90,19 +91,63 @@ func (m *wekaMount) doMount(ctx context.Context, apiClient *apiclient.ApiClient, logger := log.Ctx(ctx).With().Str("mount_point", m.mountPoint).Str("filesystem", m.fsName).Logger() mountToken := "" var mountOptionsSensitive []string + var localContainerName string if err := os.MkdirAll(m.mountPoint, DefaultVolumePermissions); err != nil { return err } if !m.isInDevMode() { + pattern := "/proc/wekafs/*/queue" + containerPaths, err := filepath.Glob(pattern) + if err != nil || len(containerPaths) == 0 { + logger.Error().Err(err).Msg("Failed to fetch WekaFS containers on host, cannot mount filesystem without Weka container") + return err + } else if len(containerPaths) == 0 { + logger.Error().Err(err).Msg("Failed to find active Weka container, cannot mount filesystem") + return err + } + if apiClient == nil { logger.Trace().Msg("No API client for mount, not requesting mount token") } else { - var err error - if mountToken, err = apiClient.GetMountTokenForFilesystemName(ctx, m.fsName); err != nil { - return err + if len(containerPaths) > 1 { + localContainerName = apiClient.Credentials.LocalContainerName + var err error + if mountToken, err = apiClient.GetMountTokenForFilesystemName(ctx, m.fsName); err != nil { + return err + } + mountOptionsSensitive = append(mountOptionsSensitive, fmt.Sprintf("token=%s", mountToken)) + if apiClient.SupportsMultipleClusters() { + if localContainerName != "" { + logger.Info().Str("local_container_name", localContainerName).Msg("Local container name set by secrets") + } else { + container, err := apiClient.GetLocalContainer(ctx) + if err != nil || container == nil { + logger.Warn().Err(err).Msg("Failed to determine local container, assuming default") + } else { + localContainerName = container.ContainerName + } + + } + if localContainerName != "" { + for _, p := range containerPaths { + containerName := filepath.Base(filepath.Dir(p)) + if localContainerName == containerName { + mountOptions.customOptions["container_name"] = mountOption{ + option: "container_name", + value: localContainerName, + } + + break + } + } + + } else { + logger.Error().Err(errors.New("Could not determine container name, refer to documentation on handling multiple clusters clients with Kubernetes")).Msg("Failed to mount") + } + } } - mountOptionsSensitive = append(mountOptionsSensitive, fmt.Sprintf("token=%s", mountToken)) } + logger.Trace().Strs("mount_options", m.mountOptions.Strings()). Fields(mountOptions).Msg("Performing mount") return m.kMounter.MountSensitive(m.fsName, m.mountPoint, "wekafs", mountOptions.Strings(), mountOptionsSensitive) diff --git a/pkg/wekafs/nodeserver.go b/pkg/wekafs/nodeserver.go index bbe9bd71c..1aba31a15 100644 --- a/pkg/wekafs/nodeserver.go +++ b/pkg/wekafs/nodeserver.go @@ -56,8 +56,12 @@ type NodeServer struct { sync.Mutex } +func (ns *NodeServer) getNodeId() string { + return ns.nodeID +} + func (ns *NodeServer) getDefaultMountOptions() MountOptions { - return getDefaultMountOptions().MergedWith(NewMountOptionsFromString(NodeServerAdditionalMountOptions), ns.getConfig().mutuallyExclusiveOptions) + return getDefaultMountOptions().RemoveOption("acl").MergedWith(NewMountOptionsFromString(NodeServerAdditionalMountOptions), ns.getConfig().mutuallyExclusiveOptions) } func (ns *NodeServer) isInDevMode() bool { @@ -243,7 +247,7 @@ func (ns *NodeServer) NodePublishVolume(ctx context.Context, req *csi.NodePublis attrib := req.GetVolumeContext() mountFlags := req.GetVolumeCapability().GetMount().GetMountFlags() - volume.mountOptions.Merge(NewMountOptionsFromString(strings.Join(mountFlags, ",")), ns.getConfig().mutuallyExclusiveOptions) + volume.mountOptions.RemoveOption("acl").Merge(NewMountOptionsFromString(strings.Join(mountFlags, ",")), ns.getConfig().mutuallyExclusiveOptions) logger.Debug().Str("target_path", targetPath). Str("fs_type", fsType). diff --git a/pkg/wekafs/wekafs.go b/pkg/wekafs/wekafs.go index fde8488df..5563055f4 100644 --- a/pkg/wekafs/wekafs.go +++ b/pkg/wekafs/wekafs.go @@ -63,6 +63,7 @@ type ApiStore struct { apis map[uint32]*apiclient.ApiClient legacySecrets *map[string]string allowInsecureHttps bool + Hostname string } // Die used to intentionally panic and exit, while updating termination log @@ -90,7 +91,7 @@ func (api *ApiStore) getByClusterGuid(guid uuid.UUID) (*apiclient.ApiClient, err } // fromSecrets returns a pointer to API by secret contents -func (api *ApiStore) fromSecrets(ctx context.Context, secrets map[string]string) (*apiclient.ApiClient, error) { +func (api *ApiStore) fromSecrets(ctx context.Context, secrets map[string]string, hostname string) (*apiclient.ApiClient, error) { endpointsRaw := strings.TrimSpace(strings.ReplaceAll(strings.TrimSuffix(secrets["endpoints"], "\n"), "\n", ",")) endpoints := func() []string { var ret []string @@ -99,23 +100,30 @@ func (api *ApiStore) fromSecrets(ctx context.Context, secrets map[string]string) } return ret }() + + localContainerName, ok := secrets["localContainerName"] + if !ok { + localContainerName = "" + } + credentials := apiclient.Credentials{ - Username: strings.TrimSpace(strings.TrimSuffix(secrets["username"], "\n")), - Password: strings.TrimSuffix(secrets["password"], "\n"), - Organization: strings.TrimSpace(strings.TrimSuffix(secrets["organization"], "\n")), - Endpoints: endpoints, - HttpScheme: strings.TrimSpace(strings.TrimSuffix(secrets["scheme"], "\n")), + Username: strings.TrimSpace(strings.TrimSuffix(secrets["username"], "\n")), + Password: strings.TrimSuffix(secrets["password"], "\n"), + Organization: strings.TrimSpace(strings.TrimSuffix(secrets["organization"], "\n")), + Endpoints: endpoints, + HttpScheme: strings.TrimSpace(strings.TrimSuffix(secrets["scheme"], "\n")), + LocalContainerName: localContainerName, } - return api.fromCredentials(ctx, credentials) + return api.fromCredentials(ctx, credentials, hostname) } // fromCredentials returns a pointer to API by credentials and endpoints // If this is a new API, it will be created and put in hashmap -func (api *ApiStore) fromCredentials(ctx context.Context, credentials apiclient.Credentials) (*apiclient.ApiClient, error) { +func (api *ApiStore) fromCredentials(ctx context.Context, credentials apiclient.Credentials, hostname string) (*apiclient.ApiClient, error) { logger := log.Ctx(ctx) logger.Trace().Str("api_client", credentials.String()).Msg("Creating new Weka API client") // doing this to fetch a client hash - newClient, err := apiclient.NewApiClient(ctx, credentials, api.allowInsecureHttps) + newClient, err := apiclient.NewApiClient(ctx, credentials, api.allowInsecureHttps, hostname) if err != nil { return nil, errors.New("could not create API client object from supplied params") } @@ -176,7 +184,7 @@ func (api *ApiStore) GetClientFromSecrets(ctx context.Context, secrets map[strin return nil, nil } } - client, err := api.fromSecrets(ctx, secrets) + client, err := api.fromSecrets(ctx, secrets, api.Hostname) if err != nil { logger.Error().Err(err).Msg("Failed to initialize API client from secret, cannot proceed") return nil, err @@ -189,11 +197,12 @@ func (api *ApiStore) GetClientFromSecrets(ctx context.Context, secrets map[strin return client, nil } -func NewApiStore(allowInsecureHttps bool) *ApiStore { +func NewApiStore(allowInsecureHttps bool, hostname string) *ApiStore { s := &ApiStore{ Mutex: sync.Mutex{}, apis: make(map[uint32]*apiclient.ApiClient), allowInsecureHttps: allowInsecureHttps, + Hostname: hostname, } secrets, err := s.GetDefaultSecrets() if err != nil { @@ -235,7 +244,7 @@ func NewWekaFsDriver( version: vendorVersion, endpoint: endpoint, maxVolumesPerNode: maxVolumesPerNode, - api: NewApiStore(config.allowInsecureHttps), + api: NewApiStore(config.allowInsecureHttps, nodeID), debugPath: debugPath, csiMode: csiMode, // either "controller", "node", "all" selinuxSupport: selinuxSupport,