diff --git a/Makefile b/Makefile index 169be53aeb..4403bdc378 100644 --- a/Makefile +++ b/Makefile @@ -73,6 +73,7 @@ GOARCH ?= amd64 LDFLAGS := $(shell cat hack/make/ldflags.txt) LDFLAGS_CSI := $(LDFLAGS) -X "$(MOD_NAME)/pkg/csi/service.version=$(VERSION)" +LDFLAGS_SYNCER := $(LDFLAGS) # The CSI binary. CSI_BIN_NAME := vsphere-csi @@ -87,8 +88,21 @@ $(CSI_BIN): $(CSI_BIN_SRCS) CGO_ENABLED=0 GOOS=$(GOOS) GOARCH=$(GOARCH) go build -ldflags '$(LDFLAGS_CSI)' -o $(abspath $@) $< @touch $@ +# The Syncer binary. +SYNCER_BIN_NAME := syncer +SYNCER_BIN := $(BIN_OUT)/$(SYNCER_BIN_NAME).$(GOOS)_$(GOARCH) +build-syncer: $(SYNCER_BIN) +ifndef SYNCER_BIN_SRCS +SYNCER_BIN_SRCS := cmd/$(SYNCER_BIN_NAME)/main.go go.mod go.sum +SYNCER_BIN_SRCS += $(addsuffix /*.go,$(shell go list -f '{{ join .Deps "\n" }}' ./cmd/$(SYNCER_BIN_NAME) | grep $(MOD_NAME) | sed 's~$(MOD_NAME)~.~')) +export SYNCER_BIN_SRCS +endif +$(SYNCER_BIN): $(SYNCER_BIN_SRCS) + CGO_ENABLED=0 GOOS=$(GOOS) GOARCH=$(GOARCH) go build -ldflags '$(LDFLAGS_SYNCER)' -o $(abspath $@) $< + @touch $@ + # The default build target. 
-build build-bins: $(CSI_BIN) +build build-bins: $(CSI_BIN) $(SYNCER_BIN) build-with-docker: hack/make.sh @@ -110,7 +124,24 @@ $(DIST_CSI_ZIP): $(CSI_BIN) zip -j $(abspath $@) README.md LICENSE "$${_temp_dir}/$(CSI_BIN_NAME)" && \ rm -fr "$${_temp_dir}" -dist: dist-csi-tgz dist-csi-zip +dist-csi: dist-csi-tgz dist-csi-zip + +DIST_SYNCER_NAME := vsphere-syncer-$(VERSION) +DIST_SYNCER_TGZ := $(BUILD_OUT)/dist/$(DIST_SYNCER_NAME)-$(GOOS)_$(GOARCH).tar.gz +dist-syncer-tgz: $(DIST_SYNCER_TGZ) +$(DIST_SYNCER_TGZ): $(SYNCER_BIN) + _temp_dir=$$(mktemp -d) && cp $< "$${_temp_dir}/$(SYNCER_BIN_NAME)" && \ + tar czf $(abspath $@) README.md LICENSE -C "$${_temp_dir}" "$(SYNCER_BIN_NAME)" && \ + rm -fr "$${_temp_dir}" +DIST_SYNCER_ZIP := $(BUILD_OUT)/dist/$(DIST_SYNCER_NAME)-$(GOOS)_$(GOARCH).zip +dist-syncer-zip: $(DIST_SYNCER_ZIP) +$(DIST_SYNCER_ZIP): $(SYNCER_BIN) + _temp_dir=$$(mktemp -d) && cp $< "$${_temp_dir}/$(SYNCER_BIN_NAME)" && \ + zip -j $(abspath $@) README.md LICENSE "$${_temp_dir}/$(SYNCER_BIN_NAME)" && \ + rm -fr "$${_temp_dir}" +dist-syncer: dist-syncer-tgz dist-syncer-zip + +dist: dist-csi dist-syncer ################################################################################ ## DEPLOY ## @@ -129,8 +160,9 @@ deploy: | $(DOCKER_SOCK) clean: @rm -f Dockerfile* rm -f $(CSI_BIN) vsphere-csi-*.tar.gz vsphere-csi-*.zip \ + $(SYNCER_BIN) vsphere-syncer-*.tar.gz vsphere-syncer-*.zip \ image-*.tar image-*.d $(DIST_OUT)/* $(BIN_OUT)/* - GO111MODULE=off go clean -i -x . ./cmd/$(CSI_BIN_NAME) + GO111MODULE=off go clean -i -x . ./cmd/$(CSI_BIN_NAME) ./cmd/$(SYNCER_BIN_NAME) .PHONY: clean-d clean-d: diff --git a/cmd/syncer/main.go b/cmd/syncer/main.go new file mode 100644 index 0000000000..d3dc078a8b --- /dev/null +++ b/cmd/syncer/main.go @@ -0,0 +1,37 @@ +/* +Copyright 2019 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. 
+You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package main + +import ( + "flag" + "os" + + "k8s.io/klog" + + metadatasyncer "sigs.k8s.io/vsphere-csi-driver/pkg/syncer" +) + +// main is ignored when this package is built as a go plug-in. +func main() { + klog.InitFlags(nil) + flag.Parse() + metadataSyncer := metadatasyncer.NewInformer() + if err := metadataSyncer.Init(); err != nil { + klog.Errorf("Error initializing Metadata Syncer") + os.Exit(1) + } +} diff --git a/go.mod b/go.mod index 95ee2c1a38..b086213ca6 100644 --- a/go.mod +++ b/go.mod @@ -13,13 +13,13 @@ require ( github.com/coreos/go-systemd v0.0.0-20190719114852-fd7a80b32e1f // indirect github.com/coreos/pkg v0.0.0-20180928190104-399ea9e2e55f // indirect github.com/davecgh/go-spew v1.1.1 - github.com/evanphx/json-patch v4.5.0+incompatible // indirect github.com/gogo/protobuf v1.3.0 // indirect github.com/golang/groupcache v0.0.0-20190702054246-869f871628b6 // indirect github.com/google/btree v1.0.0 // indirect github.com/google/go-cmp v0.3.1 // indirect github.com/googleapis/gnostic v0.3.1 // indirect github.com/gorilla/websocket v1.4.1 // indirect + github.com/gregjones/httpcache v0.0.0-20180305231024-9cad4c3443a7 // indirect github.com/grpc-ecosystem/go-grpc-middleware v1.0.0 // indirect github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0 // indirect github.com/grpc-ecosystem/grpc-gateway v1.11.1 // indirect @@ -47,23 +47,19 @@ require ( go.uber.org/multierr v1.1.0 // indirect go.uber.org/zap v1.10.0 // indirect golang.org/x/crypto 
v0.0.0-20190829043050-9756ffdc2472 // indirect - golang.org/x/lint v0.0.0-20190409202823-959b441ac422 // indirect golang.org/x/net v0.0.0-20190827160401-ba9fcec4b297 // indirect golang.org/x/sys v0.0.0-20190904005037-43c01164e931 // indirect golang.org/x/time v0.0.0-20190308202827-9d24e82272b4 // indirect - golang.org/x/tools v0.0.0-20190903163617-be0da057c5e3 // indirect - google.golang.org/appengine v1.6.2 // indirect google.golang.org/genproto v0.0.0-20190819201941-24fa4b261c55 // indirect google.golang.org/grpc v1.23.0 gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15 // indirect gopkg.in/gcfg.v1 v1.2.3 gopkg.in/inf.v0 v0.9.1 // indirect gopkg.in/warnings.v0 v0.1.2 // indirect - honnef.co/go/tools v0.0.1-2019.2.2 // indirect k8s.io/api v0.0.0-20190831074750-7364b6bdad65 k8s.io/apimachinery v0.0.0-20190831074630-461753078381 k8s.io/client-go v0.0.0-20190831074946-3fe2abece89e k8s.io/klog v0.4.0 - k8s.io/sample-controller v0.0.0-20190831080103-b17b22266fdc + k8s.io/sample-controller v0.0.0-20180822125000-be98dc6210ab k8s.io/utils v0.0.0-20190829053155-3a4a5477acf8 // indirect ) diff --git a/go.sum b/go.sum index 0208b5b9c8..0bd9cff041 100644 --- a/go.sum +++ b/go.sum @@ -119,6 +119,7 @@ github.com/gophercloud/gophercloud v0.1.0/go.mod h1:vxM41WHh5uqHVBMZHzuwNOHh8XEo github.com/gorilla/websocket v1.4.1 h1:q7AeDBpnBk8AogcD4DSag/Ukw/KV+YhzLj2bP5HvKCM= github.com/gorilla/websocket v1.4.1/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= github.com/gregjones/httpcache v0.0.0-20170728041850-787624de3eb7/go.mod h1:FecbI9+v66THATjSRHfNgh1IVFe/9kFxbXtjV0ctIMA= +github.com/gregjones/httpcache v0.0.0-20180305231024-9cad4c3443a7/go.mod h1:FecbI9+v66THATjSRHfNgh1IVFe/9kFxbXtjV0ctIMA= github.com/grpc-ecosystem/go-grpc-middleware v1.0.0 h1:Iju5GlWwrvL6UBg4zJJt3btmonfrMlCDdsejg4CZE7c= github.com/grpc-ecosystem/go-grpc-middleware v1.0.0/go.mod h1:FiyG127CGDf3tlThmgyCl78X/SZQqEOJBCDaAfeWzPs= 
github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0 h1:Ovs26xHkKqVztRpIrF/92BcuyuQ/YW4NSIpoGtfXNho= @@ -399,6 +400,8 @@ k8s.io/klog v0.4.0 h1:lCJCxf/LIowc2IGS9TPjWDyXY4nOmdGdfcwwDQCOURQ= k8s.io/klog v0.4.0/go.mod h1:4Bi6QPql/J/LkTDqv7R/cd3hPo4k2DG6Ptcz060Ez5I= k8s.io/kube-openapi v0.0.0-20190816220812-743ec37842bf h1:EYm5AW/UUDbnmnI+gK0TJDVK9qPLhM+sRHYanNKw0EQ= k8s.io/kube-openapi v0.0.0-20190816220812-743ec37842bf/go.mod h1:1TqjTSzOxsLGIKfj0lK8EeCP7K1iUG65v09OM0/WG5E= +k8s.io/sample-controller v0.0.0-20180822125000-be98dc6210ab h1:nCbBVL6ta3pT1FixeONrGFmoR01Y0z/APJMFtS9iHh4= +k8s.io/sample-controller v0.0.0-20180822125000-be98dc6210ab/go.mod h1:ulrg2qtVB4979TuA1BhaD/oGPZxO5NRWwHbk2FgaC3g= k8s.io/sample-controller v0.0.0-20190831080103-b17b22266fdc h1:JQqSOppU1DHtVO9jtFIRq1odTK3hDEmi+A7xWCjEcU4= k8s.io/sample-controller v0.0.0-20190831080103-b17b22266fdc/go.mod h1:GyUkrAM6QUT61osvjhs7agFIoS1oI9h+/q6xAi0wDcA= k8s.io/utils v0.0.0-20190801114015-581e00157fb1/go.mod h1:sZAwmy6armz5eXlNoLmJcl4F1QuKu7sr+mFQ0byX7Ew= diff --git a/hack/release.sh b/hack/release.sh index c949d1731f..593e0b68fb 100755 --- a/hack/release.sh +++ b/hack/release.sh @@ -27,16 +27,20 @@ readonly BASE_IMAGE_REPO=gcr.io/cloud-provider-vsphere # Release images readonly CSI_IMAGE_RELEASE=${BASE_IMAGE_REPO}/csi/release/driver +readonly SYNCER_IMAGE_RELEASE=${BASE_IMAGE_REPO}/csi/release/syncer # PR images readonly CSI_IMAGE_PR=${BASE_IMAGE_REPO}/csi/pr/driver +readonly SYNCER_IMAGE_PR=${BASE_IMAGE_REPO}/csi/pr/syncer # CI images readonly CSI_IMAGE_CI=${BASE_IMAGE_REPO}/csi/ci/driver +readonly SYNCER_IMAGE_CI=${BASE_IMAGE_REPO}/csi/ci/syncer PUSH= LATEST= CSI_IMAGE_NAME= +SYNCER_IMAGE_NAME= VERSION=$(git describe --dirty --always 2>/dev/null) GCR_KEY_FILE="${GCR_KEY_FILE:-}" GOPROXY="${GOPROXY:-}" @@ -91,14 +95,17 @@ function build_images() { ci) # A non-PR, non-release build. 
This is usually a build off of master CSI_IMAGE_NAME=${CSI_IMAGE_CI} + SYNCER_IMAGE_NAME=${SYNCER_IMAGE_CI} ;; pr) # A PR build CSI_IMAGE_NAME=${CSI_IMAGE_PR} + SYNCER_IMAGE_NAME=${SYNCER_IMAGE_PR} ;; release) # On an annotated tag CSI_IMAGE_NAME=${CSI_IMAGE_RELEASE} + SYNCER_IMAGE_NAME=${SYNCER_IMAGE_RELEASE} ;; esac @@ -110,9 +117,20 @@ function build_images() { --build-arg "VERSION=${VERSION}" \ --build-arg "GOPROXY=${GOPROXY}" \ . + + echo "building ${SYNCER_IMAGE_NAME}:${VERSION}" + docker build \ + -f images/syncer/Dockerfile \ + -t "${SYNCER_IMAGE_NAME}":"${VERSION}" \ + --build-arg "VERSION=${VERSION}" \ + --build-arg "GOPROXY=${GOPROXY}" \ + . if [ "${LATEST}" ]; then echo "tagging image ${CSI_IMAGE_NAME}:${VERSION} as latest" docker tag "${CSI_IMAGE_NAME}":"${VERSION}" "${CSI_IMAGE_NAME}":latest + echo "tagging image ${SYNCER_IMAGE_NAME}:${VERSION} as latest" + docker tag "${SYNCER_IMAGE_NAME}":"${VERSION}" "${SYNCER_IMAGE_NAME}":latest + fi } @@ -125,14 +143,21 @@ function login() { function push_images() { [ "${CSI_IMAGE_NAME}" ] || fatal "CSI_IMAGE_NAME not set" + [ "${SYNCER_IMAGE_NAME}" ] || fatal "SYNCER_IMAGE_NAME not set" login echo "pushing ${CSI_IMAGE_NAME}:${VERSION}" docker push "${CSI_IMAGE_NAME}":"${VERSION}" + echo "pushing ${SYNCER_IMAGE_NAME}:${VERSION}" + docker push "${SYNCER_IMAGE_NAME}":"${VERSION}" + if [ "${LATEST}" ]; then echo "also pushing ${CSI_IMAGE_NAME}:${VERSION} as latest" docker push "${CSI_IMAGE_NAME}":latest + echo "also pushing ${SYNCER_IMAGE_NAME}:${VERSION} as latest" + docker push "${SYNCER_IMAGE_NAME}":latest + fi } diff --git a/images/syncer/Dockerfile b/images/syncer/Dockerfile new file mode 100644 index 0000000000..211934507f --- /dev/null +++ b/images/syncer/Dockerfile @@ -0,0 +1,50 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
+# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +################################################################################ +## BUILD ARGS ## +################################################################################ +# This build arg allows the specification of a custom Golang image. +ARG GOLANG_IMAGE=golang:1.12.6 + +# This build arg allows the specification of a custom base image. +ARG BASE_IMAGE=photon:2.0 + +################################################################################ +## BUILD STAGE ## +################################################################################ +FROM ${GOLANG_IMAGE} as builder + +ARG GOPROXY + +WORKDIR /build + +COPY go.mod go.sum ./ + +COPY pkg/ pkg/ + +COPY cmd/ cmd/ + +ENV CGO_ENABLED=0 + +ENV GOPROXY ${GOPROXY:-} + +RUN go build -o vsphere-syncer ./cmd/syncer + +################################################################################ +## MAIN STAGE ## +################################################################################ +FROM ${BASE_IMAGE} + +COPY --from=builder /build/vsphere-syncer /bin/vsphere-syncer + +ENTRYPOINT ["/bin/vsphere-syncer"] diff --git a/pkg/syncer/metadatasyncer.go b/pkg/syncer/metadatasyncer.go new file mode 100644 index 0000000000..de0ce57489 --- /dev/null +++ b/pkg/syncer/metadatasyncer.go @@ -0,0 +1,453 @@ +/* +Copyright 2019 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. 
+You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package syncer + +import ( + "context" + "errors" + "fmt" + "reflect" + + "github.com/davecgh/go-spew/spew" + csictx "github.com/rexray/gocsi/context" + cnstypes "github.com/vmware/govmomi/cns/types" + v1 "k8s.io/api/core/v1" + "k8s.io/klog" + + volumes "sigs.k8s.io/vsphere-csi-driver/pkg/common/cns-lib/volume" + cnsvsphere "sigs.k8s.io/vsphere-csi-driver/pkg/common/cns-lib/vsphere" + cnsconfig "sigs.k8s.io/vsphere-csi-driver/pkg/common/config" + "sigs.k8s.io/vsphere-csi-driver/pkg/csi/service" + "sigs.k8s.io/vsphere-csi-driver/pkg/csi/service/common" + k8s "sigs.k8s.io/vsphere-csi-driver/pkg/kubernetes" +) + +// NewInformer returns uninitialized metadataSyncInformer +func NewInformer() *MetadataSyncInformer { + return &MetadataSyncInformer{} +} + +// Init initializes the Metadata Sync Informer +func (metadataSyncer *MetadataSyncInformer) Init() error { + var err error + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + cfgPath := csictx.Getenv(ctx, cnsconfig.EnvCloudConfig) + if cfgPath == "" { + cfgPath = cnsconfig.DefaultCloudConfigPath + } + metadataSyncer.cfg, err = cnsconfig.GetCnsconfig(cfgPath) + if err != nil { + klog.Errorf("Failed to parse config. Err: %v", err) + return err + } + + metadataSyncer.vcconfig, err = cnsvsphere.GetVirtualCenterConfig(metadataSyncer.cfg) + if err != nil { + klog.Errorf("Failed to get VirtualCenterConfig. 
err=%v", err) + return err + } + + // Initialize the virtual center manager + metadataSyncer.virtualcentermanager = cnsvsphere.GetVirtualCenterManager() + + // Register virtual center manager + metadataSyncer.vcenter, err = metadataSyncer.virtualcentermanager.RegisterVirtualCenter(metadataSyncer.vcconfig) + if err != nil { + klog.Errorf("Failed to register VirtualCenter . err=%v", err) + return err + } + + // Connect to VC + err = metadataSyncer.vcenter.Connect(ctx) + if err != nil { + klog.Errorf("Failed to connect to VirtualCenter host: %q. err=%v", metadataSyncer.vcconfig.Host, err) + return err + } + // Create the kubernetes client from config + k8sclient, err := k8s.NewClient() + if err != nil { + klog.Errorf("Creating Kubernetes client failed. Err: %v", err) + return err + } + + // Set up kubernetes resource listeners for metadata syncer + metadataSyncer.k8sInformerManager = k8s.NewInformer(k8sclient) + metadataSyncer.k8sInformerManager.AddPVCListener( + nil, // Add + func(oldObj interface{}, newObj interface{}) { // Update + pvcUpdated(oldObj, newObj, metadataSyncer) + }, + func(obj interface{}) { // Delete + pvcDeleted(obj, metadataSyncer) + }) + metadataSyncer.k8sInformerManager.AddPVListener( + nil, // Add + func(oldObj interface{}, newObj interface{}) { // Update + pvUpdated(oldObj, newObj, metadataSyncer) + }, + func(obj interface{}) { // Delete + pvDeleted(obj, metadataSyncer) + }) + metadataSyncer.k8sInformerManager.AddPodListener( + nil, // Add + func(oldObj interface{}, newObj interface{}) { // Update + podUpdated(oldObj, newObj, metadataSyncer) + }, + func(obj interface{}) { // Delete + podDeleted(obj, metadataSyncer) + }) + metadataSyncer.pvLister = metadataSyncer.k8sInformerManager.GetPVLister() + metadataSyncer.pvcLister = metadataSyncer.k8sInformerManager.GetPVCLister() + klog.V(2).Infof("Initialized metadata syncer") + stopCh := metadataSyncer.k8sInformerManager.Listen() + <-(stopCh) + return nil +} + +// pvcUpdated updates persistent volume 
claim metadata on VC when pvc labels on K8S cluster have been updated +func pvcUpdated(oldObj, newObj interface{}, metadataSyncer *MetadataSyncInformer) { + // Get old and new pvc objects + oldPvc, ok := oldObj.(*v1.PersistentVolumeClaim) + if oldPvc == nil || !ok { + return + } + newPvc, ok := newObj.(*v1.PersistentVolumeClaim) + if newPvc == nil || !ok { + return + } + + if newPvc.Status.Phase != v1.ClaimBound { + klog.V(3).Infof("PVCUpdated: New PVC not in Bound phase") + return + } + + // Get pv object attached to pvc + pv, err := metadataSyncer.pvLister.Get(newPvc.Spec.VolumeName) + if pv == nil || err != nil { + klog.Errorf("PVCUpdated: Error getting Persistent Volume for pvc %s in namespace %s with err: %v", newPvc.Name, newPvc.Namespace, err) + return + } + + // Verify if pv is vsphere csi volume + if pv.Spec.CSI == nil || pv.Spec.CSI.Driver != service.Name { + klog.V(3).Infof("PVCUpdated: Not a Vsphere CSI Volume") + return + } + + // Verify if old and new labels are not equal + if oldPvc.Status.Phase == v1.ClaimBound && reflect.DeepEqual(newPvc.Labels, oldPvc.Labels) { + klog.V(3).Infof("PVCUpdated: Old PVC and New PVC labels equal") + return + } + + // Create updateSpec + var metadataList []cnstypes.BaseCnsEntityMetadata + pvcMetadata := cnsvsphere.GetCnsKubernetesEntityMetaData(newPvc.Name, newPvc.Labels, false, string(cnstypes.CnsKubernetesEntityTypePVC), newPvc.Namespace) + metadataList = append(metadataList, cnstypes.BaseCnsEntityMetadata(pvcMetadata)) + + updateSpec := &cnstypes.CnsVolumeMetadataUpdateSpec{ + VolumeId: cnstypes.CnsVolumeId{ + Id: pv.Spec.CSI.VolumeHandle, + }, + Metadata: cnstypes.CnsVolumeMetadata{ + ContainerCluster: cnsvsphere.GetContainerCluster(metadataSyncer.cfg.Global.ClusterID, metadataSyncer.cfg.VirtualCenter[metadataSyncer.vcenter.Config.Host].User), + EntityMetadata: metadataList, + }, + } + + klog.V(4).Infof("PVCUpdated: Calling UpdateVolumeMetadata with updateSpec: %+v", spew.Sdump(updateSpec)) + if err :=
volumes.GetManager(metadataSyncer.vcenter).UpdateVolumeMetadata(updateSpec); err != nil { + klog.Errorf("PVCUpdated: UpdateVolumeMetadata failed with err %v", err) + } +} + +// pvcDeleted deletes pvc metadata on VC when pvc has been deleted on K8s cluster +func pvcDeleted(obj interface{}, metadataSyncer *MetadataSyncInformer) { + pvc, ok := obj.(*v1.PersistentVolumeClaim) + if pvc == nil || !ok { + klog.Warningf("PVCDeleted: unrecognized object %+v", obj) + return + } + klog.V(4).Infof("PVCDeleted: %+v", pvc) + if pvc.Status.Phase != v1.ClaimBound { + return + } + // Get pv object attached to pvc + pv, err := metadataSyncer.pvLister.Get(pvc.Spec.VolumeName) + if pv == nil || err != nil { + klog.Errorf("PVCDeleted: Error getting Persistent Volume for pvc %s in namespace %s with err: %v", pvc.Name, pvc.Namespace, err) + return + } + + // Verify if pv is a vsphere csi volume + if pv.Spec.CSI == nil || pv.Spec.CSI.Driver != service.Name { + klog.V(3).Infof("PVCDeleted: Not a Vsphere CSI Volume") + return + } + + // Volume will be deleted by controller when reclaim policy is delete + if pv.Spec.PersistentVolumeReclaimPolicy == v1.PersistentVolumeReclaimDelete { + klog.V(3).Infof("PVCDeleted: Reclaim policy is delete") + return + } + + // If the PV reclaim policy is retain we need to delete PVC labels + var metadataList []cnstypes.BaseCnsEntityMetadata + pvcMetadata := cnsvsphere.GetCnsKubernetesEntityMetaData(pvc.Name, nil, true, string(cnstypes.CnsKubernetesEntityTypePVC), pvc.Namespace) + metadataList = append(metadataList, cnstypes.BaseCnsEntityMetadata(pvcMetadata)) + + updateSpec := &cnstypes.CnsVolumeMetadataUpdateSpec{ + VolumeId: cnstypes.CnsVolumeId{ + Id: pv.Spec.CSI.VolumeHandle, + }, + Metadata: cnstypes.CnsVolumeMetadata{ + ContainerCluster: cnsvsphere.GetContainerCluster(metadataSyncer.cfg.Global.ClusterID, metadataSyncer.cfg.VirtualCenter[metadataSyncer.vcenter.Config.Host].User), + EntityMetadata: metadataList, + }, + } + + klog.V(4).Infof("PVCDeleted:
Calling UpdateVolumeMetadata for volume %s with updateSpec: %+v", updateSpec.VolumeId.Id, spew.Sdump(updateSpec)) + if err := volumes.GetManager(metadataSyncer.vcenter).UpdateVolumeMetadata(updateSpec); err != nil { + klog.Errorf("PVCDeleted: UpdateVolumeMetadata failed with err %v", err) + } +} + +// pvUpdated updates volume metadata on VC when volume labels on K8S cluster have been updated +func pvUpdated(oldObj, newObj interface{}, metadataSyncer *MetadataSyncInformer) { + // Get old and new PV objects + oldPv, ok := oldObj.(*v1.PersistentVolume) + if oldPv == nil || !ok { + klog.Warningf("PVUpdated: unrecognized old object %+v", oldObj) + return + } + + newPv, ok := newObj.(*v1.PersistentVolume) + if newPv == nil || !ok { + klog.Warningf("PVUpdated: unrecognized new object %+v", newObj) + return + } + klog.V(4).Infof("PVUpdated: PV Updated from %+v to %+v", oldPv, newPv) + + // Verify if pv is a vsphere csi volume + if oldPv.Spec.CSI == nil || newPv.Spec.CSI == nil || newPv.Spec.CSI.Driver != service.Name { + klog.V(3).Infof("PVUpdated: PV is not a Vsphere CSI Volume: %+v", newPv) + return + } + // Return if new PV status is Pending or Failed + if newPv.Status.Phase == v1.VolumePending || newPv.Status.Phase == v1.VolumeFailed { + klog.V(3).Infof("PVUpdated: PV %s metadata is not updated since updated PV is in phase %s", newPv.Name, newPv.Status.Phase) + return + } + // Return if labels are unchanged + if oldPv.Status.Phase == v1.VolumeAvailable && reflect.DeepEqual(newPv.GetLabels(), oldPv.GetLabels()) { + klog.V(3).Infof("PVUpdated: PV labels have not changed") + return + } + if oldPv.Status.Phase == v1.VolumeBound && newPv.Status.Phase == v1.VolumeReleased && oldPv.Spec.PersistentVolumeReclaimPolicy == v1.PersistentVolumeReclaimDelete { + klog.V(3).Infof("PVUpdated: Volume will be deleted by controller") + return + } + if newPv.DeletionTimestamp != nil { + klog.V(3).Infof("PVUpdated: PV already deleted") + return + } + + var metadataList 
[]cnstypes.BaseCnsEntityMetadata + pvMetadata := cnsvsphere.GetCnsKubernetesEntityMetaData(newPv.Name, newPv.GetLabels(), false, string(cnstypes.CnsKubernetesEntityTypePV), newPv.Namespace) + metadataList = append(metadataList, cnstypes.BaseCnsEntityMetadata(pvMetadata)) + + if oldPv.Status.Phase == v1.VolumeAvailable || newPv.Spec.StorageClassName != "" { + updateSpec := &cnstypes.CnsVolumeMetadataUpdateSpec{ + VolumeId: cnstypes.CnsVolumeId{ + Id: newPv.Spec.CSI.VolumeHandle, + }, + Metadata: cnstypes.CnsVolumeMetadata{ + ContainerCluster: cnsvsphere.GetContainerCluster(metadataSyncer.cfg.Global.ClusterID, metadataSyncer.cfg.VirtualCenter[metadataSyncer.vcenter.Config.Host].User), + EntityMetadata: metadataList, + }, + } + + klog.V(4).Infof("PVUpdated: Calling UpdateVolumeMetadata for volume %s with updateSpec: %+v", updateSpec.VolumeId.Id, spew.Sdump(updateSpec)) + if err := volumes.GetManager(metadataSyncer.vcenter).UpdateVolumeMetadata(updateSpec); err != nil { + klog.Errorf("PVUpdated: UpdateVolumeMetadata failed with err %v", err) + } + } else { + createSpec := &cnstypes.CnsVolumeCreateSpec{ + Name: oldPv.Name, + VolumeType: common.BlockVolumeType, + Metadata: cnstypes.CnsVolumeMetadata{ + ContainerCluster: cnsvsphere.GetContainerCluster(metadataSyncer.cfg.Global.ClusterID, metadataSyncer.cfg.VirtualCenter[metadataSyncer.vcenter.Config.Host].User), + EntityMetadata: metadataList, + }, + BackingObjectDetails: &cnstypes.CnsBlockBackingDetails{ + CnsBackingObjectDetails: cnstypes.CnsBackingObjectDetails{}, + BackingDiskId: oldPv.Spec.CSI.VolumeHandle, + }, + } + volumeOperationsLock.Lock() + defer volumeOperationsLock.Unlock() + klog.V(4).Infof("PVUpdated: vSphere provisioner creating volume %s with create spec %+v", oldPv.Name, spew.Sdump(createSpec)) + _, err := volumes.GetManager(metadataSyncer.vcenter).CreateVolume(createSpec) + + if err != nil { + klog.Errorf("PVUpdated: Failed to create disk %s with error %+v", oldPv.Name, err) + } + } +} + +// pvDeleted 
deletes volume metadata on VC when volume has been deleted on K8s cluster +func pvDeleted(obj interface{}, metadataSyncer *MetadataSyncInformer) { + pv, ok := obj.(*v1.PersistentVolume) + if pv == nil || !ok { + klog.Warningf("PVDeleted: unrecognized object %+v", obj) + return + } + klog.V(4).Infof("PVDeleted: Deleting PV: %+v", pv) + + // Verify if pv is a vsphere csi volume + if pv.Spec.CSI == nil || pv.Spec.CSI.Driver != service.Name { + klog.V(3).Infof("PVDeleted: Not a Vsphere CSI Volume: %+v", pv) + return + } + var deleteDisk bool + if pv.Spec.ClaimRef != nil && (pv.Status.Phase == v1.VolumeAvailable || pv.Status.Phase == v1.VolumeReleased) && pv.Spec.PersistentVolumeReclaimPolicy == v1.PersistentVolumeReclaimDelete { + klog.V(3).Infof("PVDeleted: Volume deletion will be handled by Controller") + return + } + + if pv.Spec.ClaimRef == nil || (pv.Spec.PersistentVolumeReclaimPolicy != v1.PersistentVolumeReclaimDelete) { + klog.V(4).Infof("PVDeleted: Setting DeleteDisk to false") + deleteDisk = false + } else { + // We set delete disk=true for the case where PV status is failed after deletion of pvc + // In this case, metadatasyncer will remove the volume + klog.V(4).Infof("PVDeleted: Setting DeleteDisk to true") + deleteDisk = true + } + volumeOperationsLock.Lock() + defer volumeOperationsLock.Unlock() + klog.V(4).Infof("PVDeleted: vSphere provisioner deleting volume %v with delete disk %v", pv, deleteDisk) + if err := volumes.GetManager(metadataSyncer.vcenter).DeleteVolume(pv.Spec.CSI.VolumeHandle, deleteDisk); err != nil { + klog.Errorf("PVDeleted: Failed to delete disk %s with error %+v", pv.Spec.CSI.VolumeHandle, err) + return + } +} + +// podUpdated updates pod metadata on VC when pod labels have been updated on K8s cluster +func podUpdated(oldObj, newObj interface{}, metadataSyncer *MetadataSyncInformer) { + // Get old and new pod objects + oldPod, ok := oldObj.(*v1.Pod) + if oldPod == nil || !ok { + klog.Warningf("PodUpdated: unrecognized old object 
%+v", oldObj) + return + } + newPod, ok := newObj.(*v1.Pod) + if newPod == nil || !ok { + klog.Warningf("PodUpdated: unrecognized new object %+v", newObj) + return + } + + // If old pod is in pending state and new pod is running, update metadata + if oldPod.Status.Phase == v1.PodPending && newPod.Status.Phase == v1.PodRunning { + + klog.V(3).Infof("PodUpdated: Pod %s calling updatePodMetadata", newPod.Name) + // Update pod metadata + if errorList := updatePodMetadata(newPod, metadataSyncer, false); len(errorList) > 0 { + klog.Errorf("PodUpdated: updatePodMetadata failed for pod %s with errors: ", newPod.Name) + for _, err := range errorList { + klog.Errorf("PodUpdated: %v", err) + } + } + } +} + +// podDeleted deletes pod metadata on VC when pod has been deleted on K8s cluster +func podDeleted(obj interface{}, metadataSyncer *MetadataSyncInformer) { + // Get pod object + pod, ok := obj.(*v1.Pod) + if pod == nil || !ok { + klog.Warningf("PodDeleted: unrecognized new object %+v", obj) + return + } + + if pod.Status.Phase == v1.PodPending { + return + } + + klog.V(3).Infof("PodDeleted: Pod %s calling updatePodMetadata", pod.Name) + // Update pod metadata + if errorList := updatePodMetadata(pod, metadataSyncer, true); len(errorList) > 0 { + klog.Errorf("PodDeleted: updatePodMetadata failed for pod %s with errors: ", pod.Name) + for _, err := range errorList { + klog.Errorf("PodDeleted: %v", err) + } + + } +} + +// updatePodMetadata updates metadata for volumes attached to the pod +func updatePodMetadata(pod *v1.Pod, metadataSyncer *MetadataSyncInformer, deleteFlag bool) []error { + var errorList []error + // Iterate through volumes attached to pod + for _, volume := range pod.Spec.Volumes { + if volume.PersistentVolumeClaim != nil { + pvcName := volume.PersistentVolumeClaim.ClaimName + // Get pvc attached to pod + pvc, err := metadataSyncer.pvcLister.PersistentVolumeClaims(pod.Namespace).Get(pvcName) + if err != nil { + msg := fmt.Sprintf("Error getting Persistent
Volume Claim for volume %s with err: %v", volume.Name, err) + errorList = append(errorList, errors.New(msg)) + continue + } + + // Get pv object attached to pvc + pv, err := metadataSyncer.pvLister.Get(pvc.Spec.VolumeName) + if err != nil { + msg := fmt.Sprintf("Error getting Persistent Volume for PVC %s in volume %s with err: %v", pvc.Name, volume.Name, err) + errorList = append(errorList, errors.New(msg)) + continue + } + + // Verify if pv is vsphere csi volume + if pv.Spec.CSI == nil || pv.Spec.CSI.Driver != service.Name { + klog.V(3).Infof("Not a Vsphere CSI Volume") + continue + } + var metadataList []cnstypes.BaseCnsEntityMetadata + podMetadata := cnsvsphere.GetCnsKubernetesEntityMetaData(pod.Name, nil, deleteFlag, string(cnstypes.CnsKubernetesEntityTypePOD), pod.Namespace) + metadataList = append(metadataList, cnstypes.BaseCnsEntityMetadata(podMetadata)) + updateSpec := &cnstypes.CnsVolumeMetadataUpdateSpec{ + VolumeId: cnstypes.CnsVolumeId{ + Id: pv.Spec.CSI.VolumeHandle, + }, + Metadata: cnstypes.CnsVolumeMetadata{ + ContainerCluster: cnsvsphere.GetContainerCluster(metadataSyncer.cfg.Global.ClusterID, metadataSyncer.cfg.VirtualCenter[metadataSyncer.vcenter.Config.Host].User), + EntityMetadata: metadataList, + }, + } + + klog.V(4).Infof("Calling UpdateVolumeMetadata for volume %s with updateSpec: %+v", updateSpec.VolumeId.Id, spew.Sdump(updateSpec)) + if err := volumes.GetManager(metadataSyncer.vcenter).UpdateVolumeMetadata(updateSpec); err != nil { + msg := fmt.Sprintf("UpdateVolumeMetadata failed for volume %s with err: %v", volume.Name, err) + errorList = append(errorList, errors.New(msg)) + } + } + } + return errorList +} diff --git a/pkg/syncer/types.go b/pkg/syncer/types.go new file mode 100644 index 0000000000..bd42721828 --- /dev/null +++ b/pkg/syncer/types.go @@ -0,0 +1,45 @@ +/* +Copyright 2019 The Kubernetes Authors. 
+ +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package syncer + +import ( + "sync" + + corelisters "k8s.io/client-go/listers/core/v1" + + cnsvsphere "sigs.k8s.io/vsphere-csi-driver/pkg/common/cns-lib/vsphere" + cnsconfig "sigs.k8s.io/vsphere-csi-driver/pkg/common/config" + k8s "sigs.k8s.io/vsphere-csi-driver/pkg/kubernetes" +) + +var ( + // Metadata syncer and full sync share a global lock + // to mitigate race conditions related to + // static provisioning of volumes + volumeOperationsLock sync.Mutex +) + +// MetadataSyncInformer is the struct for metadata sync informer +type MetadataSyncInformer struct { + cfg *cnsconfig.Config + vcconfig *cnsvsphere.VirtualCenterConfig + k8sInformerManager *k8s.InformerManager + virtualcentermanager cnsvsphere.VirtualCenterManager + vcenter *cnsvsphere.VirtualCenter + pvLister corelisters.PersistentVolumeLister + pvcLister corelisters.PersistentVolumeClaimLister +}