diff --git a/Dockerfile b/Dockerfile index eaa5d09f2..f2802dd90 100644 --- a/Dockerfile +++ b/Dockerfile @@ -13,6 +13,7 @@ RUN go mod download COPY main.go main.go COPY api/ api/ COPY controllers/ controllers/ +COPY internal/ internal/ # build RUN CGO_ENABLED=0 GOOS=linux GOARCH=amd64 GO111MODULE=on go build -a -o source-controller main.go diff --git a/api/v1alpha1/condition_types.go b/api/v1alpha1/condition_types.go index ad4fad6d8..7c5136dc7 100644 --- a/api/v1alpha1/condition_types.go +++ b/api/v1alpha1/condition_types.go @@ -5,22 +5,24 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" ) -// RepositoryCondition contains condition information for a repository -type RepositoryCondition struct { +// SourceCondition contains condition information for a source +type SourceCondition struct { // Type of the condition, currently ('Ready'). - Type RepositoryConditionType `json:"type"` + // +required + Type string `json:"type"` // Status of the condition, one of ('True', 'False', 'Unknown'). + // +required Status corev1.ConditionStatus `json:"status"` // LastTransitionTime is the timestamp corresponding to the last status // change of this condition. - // +optional - LastTransitionTime *metav1.Time `json:"lastTransitionTime,omitempty"` + // +required + LastTransitionTime metav1.Time `json:"lastTransitionTime,omitempty"` // Reason is a brief machine readable explanation for the condition's last // transition. - // +optional + // +required Reason string `json:"reason,omitempty"` // Message is a human readable description of the details of the last @@ -29,11 +31,16 @@ type RepositoryCondition struct { Message string `json:"message,omitempty"` } -// RepositoryConditionType represents an repository condition value -type RepositoryConditionType string - const ( - // RepositoryConditionReady represents the fact that a given repository condition - // is in ready state. - RepositoryConditionReady RepositoryConditionType = "Ready" + // ReadyCondition represents the fact that a given source is in ready state. + ReadyCondition string = "Ready" + + // InitializingReason represents the fact that a given source is being initialize. + InitializingReason string = "Initializing" + + // StorageOperationFailedReason signals a failure caused by a storage operation. + StorageOperationFailedReason string = "StorageOperationFailed" + + // URLInvalidReason represents the fact that a given source has an invalid URL. + URLInvalidReason string = "URLInvalid" ) diff --git a/api/v1alpha1/gitrepository_types.go b/api/v1alpha1/gitrepository_types.go index 714020841..8d19af637 100644 --- a/api/v1alpha1/gitrepository_types.go +++ b/api/v1alpha1/gitrepository_types.go @@ -22,12 +22,13 @@ import ( // GitRepositorySpec defines the desired state of GitRepository type GitRepositorySpec struct { - // +kubebuilder:validation:Pattern="^(http|https|ssh)://" - // The repository URL, can be a HTTP or SSH address. - Url string `json:"url"` + // +kubebuilder:validation:Pattern="^(http|https|ssh)://" + // +required + URL string `json:"url"` // The interval at which to check for repository updates. + // +required Interval metav1.Duration `json:"interval"` // The git branch to checkout, defaults to ('master'). @@ -46,16 +47,16 @@ type GitRepositorySpec struct { // GitRepositoryStatus defines the observed state of GitRepository type GitRepositoryStatus struct { // +optional - Conditions []RepositoryCondition `json:"conditions,omitempty"` + Conditions []SourceCondition `json:"conditions,omitempty"` // LastUpdateTime is the timestamp corresponding to the last status // change of this repository. // +optional LastUpdateTime *metav1.Time `json:"lastUpdateTime,omitempty"` - // Path to the artifacts of the last repository sync. + // Path to the artifact output of the last repository sync. // +optional - Artifacts string `json:"artifacts,omitempty"` + Artifact string `json:"artifacts,omitempty"` } // +kubebuilder:object:root=true @@ -74,9 +75,8 @@ type GitRepository struct { Status GitRepositoryStatus `json:"status,omitempty"` } -// +kubebuilder:object:root=true - // GitRepositoryList contains a list of GitRepository +// +kubebuilder:object:root=true type GitRepositoryList struct { metav1.TypeMeta `json:",inline"` metav1.ListMeta `json:"metadata,omitempty"` @@ -86,3 +86,8 @@ type GitRepositoryList struct { func init() { SchemeBuilder.Register(&GitRepository{}, &GitRepositoryList{}) } + +const ( + GitOperationSucceedReason string = "GitOperationSucceed" + GitOperationFailedReason string = "GitOperationFailed" +) diff --git a/api/v1alpha1/helmrepository_types.go b/api/v1alpha1/helmrepository_types.go index 40dd809b9..7b85081a6 100644 --- a/api/v1alpha1/helmrepository_types.go +++ b/api/v1alpha1/helmrepository_types.go @@ -24,16 +24,18 @@ import ( type HelmRepositorySpec struct { // The repository address // +kubebuilder:validation:MinLength=4 + // +required URL string `json:"url"` // The interval at which to check for repository updates + // +required Interval metav1.Duration `json:"interval"` } // HelmRepositoryStatus defines the observed state of HelmRepository type HelmRepositoryStatus struct { // +optional - Conditions []RepositoryCondition `json:"conditions,omitempty"` + Conditions []SourceCondition `json:"conditions,omitempty"` // LastUpdateTime is the timestamp corresponding to the last status // change of this repository. @@ -61,9 +63,8 @@ type HelmRepository struct { Status HelmRepositoryStatus `json:"status,omitempty"` } -// +kubebuilder:object:root=true - // HelmRepositoryList contains a list of HelmRepository +// +kubebuilder:object:root=true type HelmRepositoryList struct { metav1.TypeMeta `json:",inline"` metav1.ListMeta `json:"metadata,omitempty"` @@ -75,7 +76,11 @@ func init() { } const ( - InvalidHelmRepositoryURLReason string = "InvalidHelmRepositoryURL" - IndexFetchFailedReason string = "IndexFetchFailedReason" - IndexFetchSucceededReason string = "IndexFetchSucceed" + // IndexationFailedReason represents the fact that the indexation + // of the given Helm repository failed. + IndexationFailedReason string = "IndexationFailed" + + // IndexationSucceededReason represents the fact that the indexation + // of the given Helm repository succeeded. + IndexationSucceededReason string = "IndexationSucceed" ) diff --git a/api/v1alpha1/zz_generated.deepcopy.go b/api/v1alpha1/zz_generated.deepcopy.go index 2c73a750c..900570a46 100644 --- a/api/v1alpha1/zz_generated.deepcopy.go +++ b/api/v1alpha1/zz_generated.deepcopy.go @@ -104,7 +104,7 @@ func (in *GitRepositoryStatus) DeepCopyInto(out *GitRepositoryStatus) { *out = *in if in.Conditions != nil { in, out := &in.Conditions, &out.Conditions - *out = make([]RepositoryCondition, len(*in)) + *out = make([]SourceCondition, len(*in)) for i := range *in { (*in)[i].DeepCopyInto(&(*out)[i]) } @@ -205,7 +205,7 @@ func (in *HelmRepositoryStatus) DeepCopyInto(out *HelmRepositoryStatus) { *out = *in if in.Conditions != nil { in, out := &in.Conditions, &out.Conditions - *out = make([]RepositoryCondition, len(*in)) + *out = make([]SourceCondition, len(*in)) for i := range *in { (*in)[i].DeepCopyInto(&(*out)[i]) } @@ -227,20 +227,17 @@ func (in *HelmRepositoryStatus) DeepCopy() *HelmRepositoryStatus { } // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. -func (in *RepositoryCondition) DeepCopyInto(out *RepositoryCondition) { +func (in *SourceCondition) DeepCopyInto(out *SourceCondition) { *out = *in - if in.LastTransitionTime != nil { - in, out := &in.LastTransitionTime, &out.LastTransitionTime - *out = (*in).DeepCopy() - } + in.LastTransitionTime.DeepCopyInto(&out.LastTransitionTime) } -// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new RepositoryCondition. -func (in *RepositoryCondition) DeepCopy() *RepositoryCondition { +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new SourceCondition. +func (in *SourceCondition) DeepCopy() *SourceCondition { if in == nil { return nil } - out := new(RepositoryCondition) + out := new(SourceCondition) in.DeepCopyInto(out) return out } diff --git a/config/crd/bases/source.fluxcd.io_gitrepositories.yaml b/config/crd/bases/source.fluxcd.io_gitrepositories.yaml index bef14846b..357d70ab0 100644 --- a/config/crd/bases/source.fluxcd.io_gitrepositories.yaml +++ b/config/crd/bases/source.fluxcd.io_gitrepositories.yaml @@ -73,12 +73,12 @@ spec: description: GitRepositoryStatus defines the observed state of GitRepository properties: artifacts: - description: Path to the artifacts of the last repository sync. + description: Path to the artifact output of the last repository sync. type: string conditions: items: - description: RepositoryCondition contains condition information for - a repository + description: SourceCondition contains condition information for a + source properties: lastTransitionTime: description: LastTransitionTime is the timestamp corresponding diff --git a/config/crd/bases/source.fluxcd.io_helmrepositories.yaml b/config/crd/bases/source.fluxcd.io_helmrepositories.yaml index 1eadc9a45..17854da90 100644 --- a/config/crd/bases/source.fluxcd.io_helmrepositories.yaml +++ b/config/crd/bases/source.fluxcd.io_helmrepositories.yaml @@ -64,13 +64,12 @@ spec: description: HelmRepositoryStatus defines the observed state of HelmRepository properties: artifact: - description: Path to the artifact (index file) of the last repository - sync. + description: Path to the artifact of the last repository index. type: string conditions: items: - description: RepositoryCondition contains condition information for - a repository + description: SourceCondition contains condition information for a + source properties: lastTransitionTime: description: LastTransitionTime is the timestamp corresponding diff --git a/controllers/conditions.go b/controllers/conditions.go new file mode 100644 index 000000000..f151ae67c --- /dev/null +++ b/controllers/conditions.go @@ -0,0 +1,44 @@ +/* +Copyright 2020 The Flux CD contributors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package controllers + +import ( + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + + sourcev1 "github.com/fluxcd/source-controller/api/v1alpha1" +) + +func ReadyCondition(reason, message string) sourcev1.SourceCondition { + return sourcev1.SourceCondition{ + Type: sourcev1.ReadyCondition, + Status: corev1.ConditionTrue, + LastTransitionTime: metav1.Now(), + Reason: reason, + Message: message, + } +} + +func NotReadyCondition(reason, message string) sourcev1.SourceCondition { + return sourcev1.SourceCondition{ + Type: sourcev1.ReadyCondition, + Status: corev1.ConditionFalse, + LastTransitionTime: metav1.Now(), + Reason: reason, + Message: message, + } +} diff --git a/controllers/gitrepository_controller.go b/controllers/gitrepository_controller.go index 92f254852..2d82c1a1f 100644 --- a/controllers/gitrepository_controller.go +++ b/controllers/gitrepository_controller.go @@ -21,8 +21,6 @@ import ( "fmt" "io/ioutil" "os" - "os/exec" - "path/filepath" "strings" "time" @@ -44,9 +42,10 @@ import ( // GitRepositoryReconciler reconciles a GitRepository object type GitRepositoryReconciler struct { client.Client - Log logr.Logger - Scheme *runtime.Scheme - StoragePath string + Log logr.Logger + Scheme *runtime.Scheme + Storage *Storage + Kind string } // +kubebuilder:rbac:groups=source.fluxcd.io,resources=gitrepositories,verbs=get;list;watch;create;update;patch;delete @@ -56,7 +55,7 @@ func (r *GitRepositoryReconciler) Reconcile(req ctrl.Request) (ctrl.Result, erro ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second) defer cancel() - log := r.Log.WithValues("gitrepository", req.NamespacedName) + log := r.Log.WithValues(r.Kind, req.NamespacedName) var repo sourcev1.GitRepository if err := r.Get(ctx, req.NamespacedName, &repo); err != nil { @@ -66,40 +65,35 @@ func (r *GitRepositoryReconciler) Reconcile(req ctrl.Request) (ctrl.Result, erro result := ctrl.Result{RequeueAfter: repo.Spec.Interval.Duration} // set initial status - if r.shouldResetStatus(repo) { - log.Info("Initialising repository") - repo.Status.Artifacts = "" - repo.Status.LastUpdateTime = nil - repo.Status.Conditions = []sourcev1.RepositoryCondition{ - { - Type: sourcev1.RepositoryConditionReady, - Status: corev1.ConditionUnknown, - }, - } + if reset, status := r.shouldResetStatus(repo); reset { + log.Info("Initializing repository") + repo.Status = status if err := r.Status().Update(ctx, &repo); err != nil { log.Error(err, "unable to update GitRepository status") return result, err } } + // try to remove old artifacts + r.gc(repo) + // try git clone readyCondition, artifacts, err := r.sync(repo) if err != nil { log.Info("Repository sync failed", "error", err.Error()) } else { // update artifacts if commit hash changed - if repo.Status.Artifacts != artifacts { + if repo.Status.Artifact != artifacts { timeNew := metav1.Now() repo.Status.LastUpdateTime = &timeNew - repo.Status.Artifacts = artifacts + repo.Status.Artifact = artifacts } log.Info("Repository sync succeeded", "msg", readyCondition.Message) } // update status - timeNew := metav1.Now() - readyCondition.LastTransitionTime = &timeNew - repo.Status.Conditions = []sourcev1.RepositoryCondition{readyCondition} + readyCondition.LastTransitionTime = metav1.Now() + repo.Status.Conditions = []sourcev1.SourceCondition{readyCondition} if err := r.Status().Update(ctx, &repo); err != nil { log.Error(err, "unable to update GitRepository status") @@ -117,14 +111,13 @@ func (r *GitRepositoryReconciler) SetupWithManager(mgr ctrl.Manager) error { WithEventFilter(predicate.Funcs{ DeleteFunc: func(e event.DeleteEvent) bool { // delete artifacts - repoDir := filepath.Join(r.StoragePath, - fmt.Sprintf("repositories/%s-%s", e.Meta.GetName(), e.Meta.GetNamespace())) - if err := os.RemoveAll(repoDir); err != nil { + artifact := r.Storage.ArtifactFor(r.Kind, e.Meta, "dummy") + if err := r.Storage.RemoveAll(artifact); err != nil { r.Log.Error(err, "unable to delete artifacts", - "gitrepository", fmt.Sprintf("%s/%s", e.Meta.GetNamespace(), e.Meta.GetName())) + r.Kind, fmt.Sprintf("%s/%s", e.Meta.GetNamespace(), e.Meta.GetName())) } else { r.Log.Info("Repository artifacts deleted", - "gitrepository", fmt.Sprintf("%s/%s", e.Meta.GetNamespace(), e.Meta.GetName())) + r.Kind, fmt.Sprintf("%s/%s", e.Meta.GetNamespace(), e.Meta.GetName())) } return false }, @@ -132,69 +125,49 @@ func (r *GitRepositoryReconciler) SetupWithManager(mgr ctrl.Manager) error { Complete(r) } -func (r *GitRepositoryReconciler) sync(gr sourcev1.GitRepository) (sourcev1.RepositoryCondition, string, error) { +func (r *GitRepositoryReconciler) sync(repository sourcev1.GitRepository) (sourcev1.SourceCondition, string, error) { // determine ref refName := plumbing.NewBranchReferenceName("master") - if gr.Spec.Branch != "" { - refName = plumbing.NewBranchReferenceName(gr.Spec.Branch) + if repository.Spec.Branch != "" { + refName = plumbing.NewBranchReferenceName(repository.Spec.Branch) } - if gr.Spec.Tag != "" { - refName = plumbing.NewTagReferenceName(gr.Spec.Tag) + if repository.Spec.Tag != "" { + refName = plumbing.NewTagReferenceName(repository.Spec.Tag) } // create tmp dir - dir, err := ioutil.TempDir("", gr.Name) + dir, err := ioutil.TempDir("", repository.Name) if err != nil { - ex := fmt.Errorf("tmp dir error %w", err) - return sourcev1.RepositoryCondition{ - Type: sourcev1.RepositoryConditionReady, - Status: corev1.ConditionFalse, - Reason: "ExecFailed", - Message: ex.Error(), - }, "", ex + err = fmt.Errorf("tmp dir error %w", err) + return NotReadyCondition(sourcev1.StorageOperationFailedReason, err.Error()), "", err } defer os.RemoveAll(dir) // clone to tmp repo, err := git.PlainClone(dir, false, &git.CloneOptions{ - URL: gr.Spec.Url, + URL: repository.Spec.URL, Depth: 2, ReferenceName: refName, SingleBranch: true, Tags: git.AllTags, }) if err != nil { - ex := fmt.Errorf("git clone error %w", err) - return sourcev1.RepositoryCondition{ - Type: sourcev1.RepositoryConditionReady, - Status: corev1.ConditionFalse, - Reason: "GitCloneFailed", - Message: ex.Error(), - }, "", ex + err = fmt.Errorf("git clone error %w", err) + return NotReadyCondition(sourcev1.GitOperationFailedReason, err.Error()), "", err } // checkout tag based on semver expression - if gr.Spec.SemVer != "" { - rng, err := semver.ParseRange(gr.Spec.SemVer) + if repository.Spec.SemVer != "" { + rng, err := semver.ParseRange(repository.Spec.SemVer) if err != nil { - ex := fmt.Errorf("semver parse range error %w", err) - return sourcev1.RepositoryCondition{ - Type: sourcev1.RepositoryConditionReady, - Status: corev1.ConditionFalse, - Reason: "GitCloneFailed", - Message: ex.Error(), - }, "", ex + err = fmt.Errorf("semver parse range error %w", err) + return NotReadyCondition(sourcev1.GitOperationFailedReason, err.Error()), "", err } repoTags, err := repo.Tags() if err != nil { - ex := fmt.Errorf("git list tags error %w", err) - return sourcev1.RepositoryCondition{ - Type: sourcev1.RepositoryConditionReady, - Status: corev1.ConditionFalse, - Reason: "GitCloneFailed", - Message: ex.Error(), - }, "", ex + err = fmt.Errorf("git list tags error %w", err) + return NotReadyCondition(sourcev1.GitOperationFailedReason, err.Error()), "", err } tags := make(map[string]string) @@ -221,112 +194,92 @@ func (r *GitRepositoryReconciler) sync(gr sourcev1.GitRepository) (sourcev1.Repo w, err := repo.Worktree() if err != nil { - ex := fmt.Errorf("git worktree error %w", err) - return sourcev1.RepositoryCondition{ - Type: sourcev1.RepositoryConditionReady, - Status: corev1.ConditionFalse, - Reason: "GitCheckoutFailed", - Message: ex.Error(), - }, "", ex + err = fmt.Errorf("git worktree error %w", err) + return NotReadyCondition(sourcev1.GitOperationFailedReason, err.Error()), "", err } err = w.Checkout(&git.CheckoutOptions{ Hash: plumbing.NewHash(commit), }) if err != nil { - ex := fmt.Errorf("git checkout error %w", err) - return sourcev1.RepositoryCondition{ - Type: sourcev1.RepositoryConditionReady, - Status: corev1.ConditionFalse, - Reason: "GitCheckoutFailed", - Message: ex.Error(), - }, "", ex + err = fmt.Errorf("git checkout error %w", err) + return NotReadyCondition(sourcev1.GitOperationFailedReason, err.Error()), "", err } } else { - ex := fmt.Errorf("no match found for semver %s", gr.Spec.SemVer) - return sourcev1.RepositoryCondition{ - Type: sourcev1.RepositoryConditionReady, - Status: corev1.ConditionFalse, - Reason: "GitCheckoutFailed", - Message: ex.Error(), - }, "", ex + err = fmt.Errorf("no match found for semver %s", repository.Spec.SemVer) + return NotReadyCondition(sourcev1.GitOperationFailedReason, err.Error()), "", err } } // read commit hash ref, err := repo.Head() if err != nil { - ex := fmt.Errorf("git resolve HEAD error %w", err) - return sourcev1.RepositoryCondition{ - Type: sourcev1.RepositoryConditionReady, - Status: corev1.ConditionFalse, - Reason: "GitHeadFailed", - Message: ex.Error(), - }, "", ex + err = fmt.Errorf("git resolve HEAD error %w", err) + return NotReadyCondition(sourcev1.GitOperationFailedReason, err.Error()), "", err } - // create artifacts dir - repoDir := fmt.Sprintf("repositories/%s-%s", gr.Name, gr.Namespace) - storage := filepath.Join(r.StoragePath, repoDir) - err = os.MkdirAll(storage, 0777) + artifact := r.Storage.ArtifactFor(r.Kind, repository.ObjectMeta.GetObjectMeta(), + fmt.Sprintf("%s.tar.gz", ref.Hash().String())) + + // create artifact dir + err = r.Storage.MkdirAll(artifact) if err != nil { - ex := fmt.Errorf("mkdir dir error %w", err) - return sourcev1.RepositoryCondition{ - Type: sourcev1.RepositoryConditionReady, - Status: corev1.ConditionFalse, - Reason: "ExecFailed", - Message: ex.Error(), - }, "", ex + err = fmt.Errorf("mkdir dir error %w", err) + return NotReadyCondition(sourcev1.StorageOperationFailedReason, err.Error()), "", err } - // store artifacts - artifacts := filepath.Join(storage, fmt.Sprintf("%s.tar.gz", ref.Hash().String())) - excludes := "--exclude=\\*.{jpg,jpeg,gif,png,wmv,flv,tar.gz,zip} --exclude .git" - command := exec.Command("/bin/sh", "-c", - fmt.Sprintf("cd %s && tar -c %s -f - . | gzip > %s", dir, excludes, artifacts)) - err = command.Run() + // acquire lock + unlock, err := r.Storage.Lock(artifact) if err != nil { - ex := fmt.Errorf("tar %s error %w", artifacts, err) - return sourcev1.RepositoryCondition{ - Type: sourcev1.RepositoryConditionReady, - Status: corev1.ConditionFalse, - Reason: "ExecFailed", - Message: ex.Error(), - }, "", ex + err = fmt.Errorf("unable to acquire lock: %w", err) + return NotReadyCondition(sourcev1.StorageOperationFailedReason, err.Error()), "", err } + defer unlock() - // compose artifacts URL - hostname := "localhost" - if os.Getenv("RUNTIME_NAMESPACE") != "" { - svcParts := strings.Split(os.Getenv("HOSTNAME"), "-") - hostname = fmt.Sprintf("%s.%s", - strings.Join(svcParts[:len(svcParts)-2], "-"), os.Getenv("RUNTIME_NAMESPACE")) + // archive artifact + err = r.Storage.Archive(artifact, dir, "") + if err != nil { + err = fmt.Errorf("storage error %w", err) + return NotReadyCondition(sourcev1.StorageOperationFailedReason, err.Error()), "", err } - artifactsURL := fmt.Sprintf("http://%s/repositories/%s-%s/%s.tar.gz", - hostname, gr.Name, gr.Namespace, ref.Hash().String()) - - return sourcev1.RepositoryCondition{ - Type: sourcev1.RepositoryConditionReady, - Status: corev1.ConditionTrue, - Reason: "GitCloneSucceed", - Message: fmt.Sprintf("Fetched artifacts are available at %s", artifacts), - }, artifactsURL, nil + + message := fmt.Sprintf("Artifact is available at %s", artifact.Path) + return ReadyCondition(sourcev1.GitOperationSucceedReason, message), artifact.URL, nil } -func (r *GitRepositoryReconciler) shouldResetStatus(gr sourcev1.GitRepository) bool { +func (r *GitRepositoryReconciler) shouldResetStatus(repository sourcev1.GitRepository) (bool, sourcev1.GitRepositoryStatus) { resetStatus := false - if gr.Status.Artifacts != "" { - pathParts := strings.Split(gr.Status.Artifacts, "/") - path := fmt.Sprintf("repositories/%s-%s/%s", gr.Name, gr.Namespace, pathParts[len(pathParts)-1]) - if _, err := os.Stat(filepath.Join(r.StoragePath, path)); err != nil { + if repository.Status.Artifact != "" { + parts := strings.Split(repository.Status.Artifact, "/") + artifact := r.Storage.ArtifactFor(r.Kind, repository.ObjectMeta.GetObjectMeta(), parts[len(parts)-1]) + if !r.Storage.ArtifactExist(artifact) { resetStatus = true } } // set initial status - if len(gr.Status.Conditions) == 0 || resetStatus { + if len(repository.Status.Conditions) == 0 || resetStatus { resetStatus = true } - return resetStatus + return resetStatus, sourcev1.GitRepositoryStatus{ + Conditions: []sourcev1.SourceCondition{ + { + Type: sourcev1.ReadyCondition, + Status: corev1.ConditionUnknown, + Reason: sourcev1.InitializingReason, + LastTransitionTime: metav1.Now(), + }, + }, + } +} + +func (r *GitRepositoryReconciler) gc(repository sourcev1.GitRepository) { + if repository.Status.Artifact != "" { + parts := strings.Split(repository.Status.Artifact, "/") + artifact := r.Storage.ArtifactFor(r.Kind, repository.ObjectMeta.GetObjectMeta(), parts[len(parts)-1]) + if err := r.Storage.RemoveAllButCurrent(artifact); err != nil { + r.Log.Info("Artifacts GC failed", "error", err) + } + } } diff --git a/controllers/helmrepository_controller.go b/controllers/helmrepository_controller.go index 5dc6db816..57965f76c 100644 --- a/controllers/helmrepository_controller.go +++ b/controllers/helmrepository_controller.go @@ -18,13 +18,10 @@ package controllers import ( "context" - "crypto/sha1" "fmt" "io/ioutil" "net/url" - "os" "path" - "path/filepath" "strings" "time" @@ -46,10 +43,11 @@ import ( // HelmRepositoryReconciler reconciles a HelmRepository object type HelmRepositoryReconciler struct { client.Client - Log logr.Logger - Scheme *runtime.Scheme - StoragePath string - Getters getter.Providers + Log logr.Logger + Scheme *runtime.Scheme + Storage *Storage + Kind string + Getters getter.Providers } // +kubebuilder:rbac:groups=source.fluxcd.io,resources=helmrepositories,verbs=get;list;watch;create;update;patch;delete @@ -69,21 +67,18 @@ func (r *HelmRepositoryReconciler) Reconcile(req ctrl.Request) (ctrl.Result, err result := ctrl.Result{RequeueAfter: repository.Spec.Interval.Duration} // set initial status - if r.shouldResetStatus(repository) { - log.Info("Initialising repository") - repository.Status = sourcev1.HelmRepositoryStatus{} - repository.Status.Conditions = []sourcev1.RepositoryCondition{ - { - Type: sourcev1.RepositoryConditionReady, - Status: corev1.ConditionUnknown, - }, - } + if reset, status := r.shouldResetStatus(repository); reset { + log.Info("Initializing repository") + repository.Status = status if err := r.Status().Update(ctx, &repository); err != nil { log.Error(err, "unable to update HelmRepository status") return result, err } } + // try to remove old artifacts + r.gc(repository) + // try to download index readyCondition, artifact, err := r.index(repository) if err != nil { @@ -99,9 +94,8 @@ func (r *HelmRepositoryReconciler) Reconcile(req ctrl.Request) (ctrl.Result, err } // update status - timeNew := metav1.Now() - readyCondition.LastTransitionTime = &timeNew - repository.Status.Conditions = []sourcev1.RepositoryCondition{readyCondition} + readyCondition.LastTransitionTime = metav1.Now() + repository.Status.Conditions = []sourcev1.SourceCondition{readyCondition} if err := r.Status().Update(ctx, &repository); err != nil { log.Error(err, "unable to update HelmRepository status") @@ -119,14 +113,13 @@ func (r *HelmRepositoryReconciler) SetupWithManager(mgr ctrl.Manager) error { WithEventFilter(predicate.Funcs{ DeleteFunc: func(e event.DeleteEvent) bool { // delete artifacts - repoDir := filepath.Join(r.StoragePath, - fmt.Sprintf("helmrepositories/%s-%s", e.Meta.GetName(), e.Meta.GetNamespace())) - if err := os.RemoveAll(repoDir); err != nil { + artifact := r.Storage.ArtifactFor(r.Kind, e.Meta, "dummy") + if err := r.Storage.RemoveAll(artifact); err != nil { r.Log.Error(err, "unable to delete artifacts", - "helmrepository", fmt.Sprintf("%s/%s", e.Meta.GetNamespace(), e.Meta.GetName())) + r.Kind, fmt.Sprintf("%s/%s", e.Meta.GetNamespace(), e.Meta.GetName())) } else { - r.Log.Info("Helm repository artifacts deleted", - "helmrepository", fmt.Sprintf("%s/%s", e.Meta.GetNamespace(), e.Meta.GetName())) + r.Log.Info("Repository artifacts deleted", + r.Kind, fmt.Sprintf("%s/%s", e.Meta.GetNamespace(), e.Meta.GetName())) } return false }, @@ -134,25 +127,15 @@ func (r *HelmRepositoryReconciler) SetupWithManager(mgr ctrl.Manager) error { Complete(r) } -func (r *HelmRepositoryReconciler) index(repository sourcev1.HelmRepository) (sourcev1.RepositoryCondition, string, error) { +func (r *HelmRepositoryReconciler) index(repository sourcev1.HelmRepository) (sourcev1.SourceCondition, string, error) { u, err := url.Parse(repository.Spec.URL) if err != nil { - return sourcev1.RepositoryCondition{ - Type: sourcev1.RepositoryConditionReady, - Status: corev1.ConditionFalse, - Reason: sourcev1.InvalidHelmRepositoryURLReason, - Message: err.Error(), - }, "", err + return NotReadyCondition(sourcev1.URLInvalidReason, err.Error()), "", err } c, err := r.Getters.ByScheme(u.Scheme) if err != nil { - return sourcev1.RepositoryCondition{ - Type: sourcev1.RepositoryConditionReady, - Status: corev1.ConditionFalse, - Reason: sourcev1.InvalidHelmRepositoryURLReason, - Message: err.Error(), - }, "", err + return NotReadyCondition(sourcev1.URLInvalidReason, err.Error()), "", err } u.RawPath = path.Join(u.RawPath, "index.yaml") @@ -162,96 +145,60 @@ func (r *HelmRepositoryReconciler) index(repository sourcev1.HelmRepository) (so // TODO(hidde): add authentication config res, err := c.Get(indexURL, getter.WithURL(repository.Spec.URL)) if err != nil { - return sourcev1.RepositoryCondition{ - Type: sourcev1.RepositoryConditionReady, - Status: corev1.ConditionFalse, - Reason: sourcev1.IndexFetchFailedReason, - Message: err.Error(), - }, "", err + return NotReadyCondition(sourcev1.IndexationFailedReason, err.Error()), "", err } - index, err := ioutil.ReadAll(res) + data, err := ioutil.ReadAll(res) if err != nil { - return sourcev1.RepositoryCondition{ - Type: sourcev1.RepositoryConditionReady, - Status: corev1.ConditionFalse, - Reason: sourcev1.IndexFetchFailedReason, - Message: err.Error(), - }, "", err + return NotReadyCondition(sourcev1.IndexationFailedReason, err.Error()), "", err } i := &repo.IndexFile{} - if err := yaml.Unmarshal(index, i); err != nil { - return sourcev1.RepositoryCondition{ - Type: sourcev1.RepositoryConditionReady, - Status: corev1.ConditionFalse, - Reason: sourcev1.IndexFetchFailedReason, - Message: err.Error(), - }, "", err + if err := yaml.Unmarshal(data, i); err != nil { + return NotReadyCondition(sourcev1.IndexationFailedReason, err.Error()), "", err } - b, err := yaml.Marshal(i) + index, err := yaml.Marshal(i) if err != nil { - return sourcev1.RepositoryCondition{ - Type: sourcev1.RepositoryConditionReady, - Status: corev1.ConditionFalse, - Reason: sourcev1.IndexFetchFailedReason, - Message: err.Error(), - }, "", err + return NotReadyCondition(sourcev1.IndexationFailedReason, err.Error()), "", err } - repoPath := fmt.Sprintf("helmrepositories/%s-%s", repository.Name, repository.Namespace) - storage := filepath.Join(r.StoragePath, repoPath) - sum := checksum(b) - indexFileName := fmt.Sprintf("index-%s.yaml", sum) - indexFilePath := filepath.Join(storage, indexFileName) - artifactsURL := fmt.Sprintf("http://%s/helmrepositories/%s/%s", host(), repoPath, indexFileName) - - if file, err := os.Stat(indexFilePath); !os.IsNotExist(err) && !file.IsDir() { - if fb, err := ioutil.ReadFile(indexFilePath); err == nil && sum == checksum(fb) { - return sourcev1.RepositoryCondition{ - Type: sourcev1.RepositoryConditionReady, - Status: corev1.ConditionTrue, - Reason: "GitCloneSucceed", - Message: fmt.Sprintf("Fetched artifact is available at %s", indexFilePath), - }, artifactsURL, nil - } - } + sum := r.Storage.Checksum(index) + artifact := r.Storage.ArtifactFor(r.Kind, repository.ObjectMeta.GetObjectMeta(), + fmt.Sprintf("index-%s.yaml", sum)) - err = os.MkdirAll(storage, 0755) + // create artifact dir + err = r.Storage.MkdirAll(artifact) if err != nil { err = fmt.Errorf("unable to create repository index directory: %w", err) - return sourcev1.RepositoryCondition{ - Type: sourcev1.RepositoryConditionReady, - Status: corev1.ConditionFalse, - Reason: sourcev1.IndexFetchFailedReason, - Message: err.Error(), - }, "", err + return NotReadyCondition(sourcev1.StorageOperationFailedReason, err.Error()), "", err + } + + // acquire lock + unlock, err := r.Storage.Lock(artifact) + if err != nil { + err = fmt.Errorf("unable to acquire lock: %w", err) + return NotReadyCondition(sourcev1.StorageOperationFailedReason, err.Error()), "", err } - err = ioutil.WriteFile(indexFilePath, index, 0644) + defer unlock() + + // save artifact to storage + err = r.Storage.WriteFile(artifact, index) if err != nil { err = fmt.Errorf("unable to write repository index file: %w", err) - return sourcev1.RepositoryCondition{ - Type: sourcev1.RepositoryConditionReady, - Status: corev1.ConditionFalse, - Reason: sourcev1.IndexFetchFailedReason, - Message: err.Error(), - }, "", err + return NotReadyCondition(sourcev1.StorageOperationFailedReason, err.Error()), "", err } - return sourcev1.RepositoryCondition{ - Type: sourcev1.RepositoryConditionReady, - Status: corev1.ConditionTrue, - Reason: sourcev1.IndexFetchSucceededReason, - Message: fmt.Sprintf("Fetched artifact is available at %s", indexFilePath), - }, artifactsURL, nil + + message := fmt.Sprintf("Artifact is available at %s", artifact.Path) + return ReadyCondition(sourcev1.IndexationSucceededReason, message), artifact.URL, nil } -func (r *HelmRepositoryReconciler) shouldResetStatus(repository sourcev1.HelmRepository) bool { +func (r *HelmRepositoryReconciler) shouldResetStatus(repository sourcev1.HelmRepository) (bool, sourcev1.HelmRepositoryStatus) { resetStatus := false if repository.Status.Artifact != "" { - pathParts := strings.Split(repository.Status.Artifact, "/") - path := fmt.Sprintf("helmrepositories/%s-%s/%s", repository.Name, repository.Namespace, pathParts[len(pathParts)-1]) - if _, err := os.Stat(filepath.Join(r.StoragePath, path)); err != nil { + parts := strings.Split(repository.Status.Artifact, "/") + artifact := r.Storage.ArtifactFor(r.Kind, repository.ObjectMeta.GetObjectMeta(), parts[len(parts)-1]) + if !r.Storage.ArtifactExist(artifact) { resetStatus = true } } @@ -261,20 +208,24 @@ func (r *HelmRepositoryReconciler) shouldResetStatus(repository sourcev1.HelmRep resetStatus = true } - return resetStatus -} - -// Checksum returns the SHA1 checksum for the given bytes as a string. -func checksum(b []byte) string { - return fmt.Sprintf("%x", sha1.Sum(b)) + return resetStatus, sourcev1.HelmRepositoryStatus{ + Conditions: []sourcev1.SourceCondition{ + { + Type: sourcev1.ReadyCondition, + Status: corev1.ConditionUnknown, + Reason: sourcev1.InitializingReason, + LastTransitionTime: metav1.Now(), + }, + }, + } } -func host() string { - hostname := "localhost" - if os.Getenv("RUNTIME_NAMESPACE") != "" { - svcParts := strings.Split(os.Getenv("HOSTNAME"), "-") - hostname = fmt.Sprintf("%s.%s", - strings.Join(svcParts[:len(svcParts)-2], "-"), os.Getenv("RUNTIME_NAMESPACE")) +func (r *HelmRepositoryReconciler) gc(repository sourcev1.HelmRepository) { + if repository.Status.Artifact != "" { + parts := strings.Split(repository.Status.Artifact, "/") + artifact := r.Storage.ArtifactFor(r.Kind, repository.ObjectMeta.GetObjectMeta(), parts[len(parts)-1]) + if err := r.Storage.RemoveAllButCurrent(artifact); err != nil { + r.Log.Info("Artifacts GC failed", "error", err) + } } - return hostname } diff --git a/controllers/predicate.go b/controllers/predicate.go index 8ede12a9a..aee7e2afe 100644 --- a/controllers/predicate.go +++ b/controllers/predicate.go @@ -1,3 +1,19 @@ +/* +Copyright 2020 The Flux CD contributors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + package controllers import ( diff --git a/controllers/storage.go b/controllers/storage.go new file mode 100644 index 000000000..f52ea2496 --- /dev/null +++ b/controllers/storage.go @@ -0,0 +1,161 @@ +/* +Copyright 2020 The Flux CD contributors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package controllers + +import ( + "context" + "crypto/sha1" + "fmt" + "io/ioutil" + "os" + "os/exec" + "path/filepath" + "strings" + "time" + + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + + "github.com/fluxcd/source-controller/internal/lockedfile" +) + +// Storage manages artifacts +type Storage struct { + // BasePath is the local directory path where the source artifacts are stored. + BasePath string `json:"basePath"` + + // Hostname is the file server host name used to compose the artifacts URIs. + Hostname string `json:"hostname"` + + // Timeout for artifacts operations + Timeout time.Duration `json:"timeout"` +} + +// Artifact represents the output of a source synchronisation +type Artifact struct { + // Path is the local file path of this artifact + Path string `json:"path"` + + // URL is the HTTP address of this artifact + URL string `json:"url"` +} + +// NewStorage creates the storage helper for a given path and hostname +func NewStorage(basePath string, hostname string, timeout time.Duration) (*Storage, error) { + if f, err := os.Stat(basePath); os.IsNotExist(err) || !f.IsDir() { + return nil, fmt.Errorf("invalid dir path %s", basePath) + } + + return &Storage{ + BasePath: basePath, + Hostname: hostname, + Timeout: timeout, + }, nil +} + +// ArtifactFor returns an artifact for the given Kubernetes object +func (s *Storage) ArtifactFor(kind string, metadata metav1.Object, fileName string) Artifact { + path := fmt.Sprintf("%s/%s-%s/%s", kind, metadata.GetName(), metadata.GetNamespace(), fileName) + localPath := filepath.Join(s.BasePath, path) + url := fmt.Sprintf("http://%s/%s", s.Hostname, path) + + return Artifact{ + Path: localPath, + URL: url, + } +} + +// MkdirAll calls os.MkdirAll for the given artifact base dir +func (s *Storage) MkdirAll(artifact Artifact) error { + dir := filepath.Dir(artifact.Path) + return os.MkdirAll(dir, 0777) +} + +// RemoveAll calls os.RemoveAll for the given artifact base dir +func (s *Storage) RemoveAll(artifact Artifact) error { + dir := filepath.Dir(artifact.Path) + return os.RemoveAll(dir) +} + +// RemoveAllButCurrent removes all files for the given artifact base dir excluding the current one +func (s *Storage) RemoveAllButCurrent(artifact Artifact) error { + dir := filepath.Dir(artifact.Path) + errors := []string{} + _ = filepath.Walk(dir, func(path string, info os.FileInfo, err error) error { + if path != artifact.Path && !info.IsDir() { + if err := os.Remove(path); err != nil { + errors = append(errors, info.Name()) + } + } + return nil + }) + + if len(errors) > 0 { + return fmt.Errorf("faild to remove files: %s", strings.Join(errors, " ")) + } + return nil +} + +// ArtifactExist returns a boolean indicating whether the artifact file exists in storage +func (s *Storage) ArtifactExist(artifact Artifact) bool { + if _, err := os.Stat(artifact.Path); os.IsNotExist(err) { + return false + } + return true +} + +// Archive creates a tar.gz to the artifact path from the given dir excluding the provided file extensions +func (s *Storage) Archive(artifact Artifact, dir string, excludes string) error { + if excludes == "" { + excludes = "jpg,jpeg,gif,png,wmv,flv,tar.gz,zip" + } + + ctx, cancel := context.WithTimeout(context.Background(), s.Timeout) + defer cancel() + + tarExcludes := fmt.Sprintf("--exclude=\\*.{%s} --exclude .git", excludes) + cmd := fmt.Sprintf("cd %s && tar -c %s -f - . | gzip > %s", dir, tarExcludes, artifact.Path) + command := exec.CommandContext(ctx, "/bin/sh", "-c", cmd) + + err := command.Run() + if err != nil { + return fmt.Errorf("command '%s' failed: %w", cmd, err) + } + return nil +} + +// WriteFile writes the given bytes to the artifact path if the checksum differs +func (s *Storage) WriteFile(artifact Artifact, data []byte) error { + sum := s.Checksum(data) + if file, err := os.Stat(artifact.Path); !os.IsNotExist(err) && !file.IsDir() { + if fb, err := ioutil.ReadFile(artifact.Path); err == nil && sum == s.Checksum(fb) { + return nil + } + } + + return ioutil.WriteFile(artifact.Path, data, 0644) +} + +// Checksum returns the SHA1 checksum for the given bytes as a string +func (s *Storage) Checksum(b []byte) string { + return fmt.Sprintf("%x", sha1.Sum(b)) +} + +func (s *Storage) Lock(artifact Artifact) (unlock func(), err error) { + lockFile := artifact.Path + ".lock" + mutex := lockedfile.MutexAt(lockFile) + return mutex.Lock() +} diff --git a/internal/lockedfile/internal/filelock/filelock.go b/internal/lockedfile/internal/filelock/filelock.go new file mode 100755 index 000000000..aba3eed77 --- /dev/null +++ b/internal/lockedfile/internal/filelock/filelock.go @@ -0,0 +1,98 @@ +// Copyright 2018 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +// Package filelock provides a platform-independent API for advisory file +// locking. Calls to functions in this package on platforms that do not support +// advisory locks will return errors for which IsNotSupported returns true. +package filelock + +import ( + "errors" + "os" +) + +// A File provides the minimal set of methods required to lock an open file. +// File implementations must be usable as map keys. +// The usual implementation is *os.File. +type File interface { + // Name returns the name of the file. + Name() string + + // Fd returns a valid file descriptor. + // (If the File is an *os.File, it must not be closed.) + Fd() uintptr + + // Stat returns the FileInfo structure describing file. + Stat() (os.FileInfo, error) +} + +// Lock places an advisory write lock on the file, blocking until it can be +// locked. +// +// If Lock returns nil, no other process will be able to place a read or write +// lock on the file until this process exits, closes f, or calls Unlock on it. +// +// If f's descriptor is already read- or write-locked, the behavior of Lock is +// unspecified. +// +// Closing the file may or may not release the lock promptly. Callers should +// ensure that Unlock is always called when Lock succeeds. +func Lock(f File) error { + return lock(f, writeLock) +} + +// RLock places an advisory read lock on the file, blocking until it can be locked. +// +// If RLock returns nil, no other process will be able to place a write lock on +// the file until this process exits, closes f, or calls Unlock on it. +// +// If f is already read- or write-locked, the behavior of RLock is unspecified. +// +// Closing the file may or may not release the lock promptly. Callers should +// ensure that Unlock is always called if RLock succeeds. +func RLock(f File) error { + return lock(f, readLock) +} + +// Unlock removes an advisory lock placed on f by this process. +// +// The caller must not attempt to unlock a file that is not locked. +func Unlock(f File) error { + return unlock(f) +} + +// String returns the name of the function corresponding to lt +// (Lock, RLock, or Unlock). +func (lt lockType) String() string { + switch lt { + case readLock: + return "RLock" + case writeLock: + return "Lock" + default: + return "Unlock" + } +} + +// IsNotSupported returns a boolean indicating whether the error is known to +// report that a function is not supported (possibly for a specific input). +// It is satisfied by ErrNotSupported as well as some syscall errors. +func IsNotSupported(err error) bool { + return isNotSupported(underlyingError(err)) +} + +var ErrNotSupported = errors.New("operation not supported") + +// underlyingError returns the underlying error for known os error types. +func underlyingError(err error) error { + switch err := err.(type) { + case *os.PathError: + return err.Err + case *os.LinkError: + return err.Err + case *os.SyscallError: + return err.Err + } + return err +} diff --git a/internal/lockedfile/internal/filelock/filelock_unix.go b/internal/lockedfile/internal/filelock/filelock_unix.go new file mode 100755 index 000000000..00c426283 --- /dev/null +++ b/internal/lockedfile/internal/filelock/filelock_unix.go @@ -0,0 +1,44 @@ +// Copyright 2018 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +// +build darwin dragonfly freebsd linux netbsd openbsd + +package filelock + +import ( + "os" + "syscall" +) + +type lockType int16 + +const ( + readLock lockType = syscall.LOCK_SH + writeLock lockType = syscall.LOCK_EX +) + +func lock(f File, lt lockType) (err error) { + for { + err = syscall.Flock(int(f.Fd()), int(lt)) + if err != syscall.EINTR { + break + } + } + if err != nil { + return &os.PathError{ + Op: lt.String(), + Path: f.Name(), + Err: err, + } + } + return nil +} + +func unlock(f File) error { + return lock(f, syscall.LOCK_UN) +} + +func isNotSupported(err error) bool { + return err == syscall.ENOSYS || err == syscall.ENOTSUP || err == syscall.EOPNOTSUPP || err == ErrNotSupported +} diff --git a/internal/lockedfile/lockedfile.go b/internal/lockedfile/lockedfile.go new file mode 100755 index 000000000..59b2dba44 --- /dev/null +++ b/internal/lockedfile/lockedfile.go @@ -0,0 +1,187 @@ +// Copyright 2018 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +// Package lockedfile creates and manipulates files whose contents should only +// change atomically. +package lockedfile + +import ( + "fmt" + "io" + "io/ioutil" + "os" + "runtime" +) + +// A File is a locked *os.File. +// +// Closing the file releases the lock. +// +// If the program exits while a file is locked, the operating system releases +// the lock but may not do so promptly: callers must ensure that all locked +// files are closed before exiting. +type File struct { + osFile + closed bool +} + +// osFile embeds a *os.File while keeping the pointer itself unexported. +// (When we close a File, it must be the same file descriptor that we opened!) +type osFile struct { + *os.File +} + +// OpenFile is like os.OpenFile, but returns a locked file. +// If flag includes os.O_WRONLY or os.O_RDWR, the file is write-locked; +// otherwise, it is read-locked. +func OpenFile(name string, flag int, perm os.FileMode) (*File, error) { + var ( + f = new(File) + err error + ) + f.osFile.File, err = openFile(name, flag, perm) + if err != nil { + return nil, err + } + + // Although the operating system will drop locks for open files when the go + // command exits, we want to hold locks for as little time as possible, and we + // especially don't want to leave a file locked after we're done with it. Our + // Close method is what releases the locks, so use a finalizer to report + // missing Close calls on a best-effort basis. + runtime.SetFinalizer(f, func(f *File) { + panic(fmt.Sprintf("lockedfile.File %s became unreachable without a call to Close", f.Name())) + }) + + return f, nil +} + +// Open is like os.Open, but returns a read-locked file. +func Open(name string) (*File, error) { + return OpenFile(name, os.O_RDONLY, 0) +} + +// Create is like os.Create, but returns a write-locked file. +func Create(name string) (*File, error) { + return OpenFile(name, os.O_RDWR|os.O_CREATE|os.O_TRUNC, 0666) +} + +// Edit creates the named file with mode 0666 (before umask), +// but does not truncate existing contents. +// +// If Edit succeeds, methods on the returned File can be used for I/O. +// The associated file descriptor has mode O_RDWR and the file is write-locked. +func Edit(name string) (*File, error) { + return OpenFile(name, os.O_RDWR|os.O_CREATE, 0666) +} + +// Close unlocks and closes the underlying file. +// +// Close may be called multiple times; all calls after the first will return a +// non-nil error. +func (f *File) Close() error { + if f.closed { + return &os.PathError{ + Op: "close", + Path: f.Name(), + Err: os.ErrClosed, + } + } + f.closed = true + + err := closeFile(f.osFile.File) + runtime.SetFinalizer(f, nil) + return err +} + +// Read opens the named file with a read-lock and returns its contents. +func Read(name string) ([]byte, error) { + f, err := Open(name) + if err != nil { + return nil, err + } + defer f.Close() + + return ioutil.ReadAll(f) +} + +// Write opens the named file (creating it with the given permissions if needed), +// then write-locks it and overwrites it with the given content. +func Write(name string, content io.Reader, perm os.FileMode) (err error) { + f, err := OpenFile(name, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, perm) + if err != nil { + return err + } + + _, err = io.Copy(f, content) + if closeErr := f.Close(); err == nil { + err = closeErr + } + return err +} + +// Transform invokes t with the result of reading the named file, with its lock +// still held. +// +// If t returns a nil error, Transform then writes the returned contents back to +// the file, making a best effort to preserve existing contents on error. +// +// t must not modify the slice passed to it. +func Transform(name string, t func([]byte) ([]byte, error)) (err error) { + f, err := Edit(name) + if err != nil { + return err + } + defer f.Close() + + old, err := ioutil.ReadAll(f) + if err != nil { + return err + } + + new, err := t(old) + if err != nil { + return err + } + + if len(new) > len(old) { + // The overall file size is increasing, so write the tail first: if we're + // about to run out of space on the disk, we would rather detect that + // failure before we have overwritten the original contents. + if _, err := f.WriteAt(new[len(old):], int64(len(old))); err != nil { + // Make a best effort to remove the incomplete tail. + f.Truncate(int64(len(old))) + return err + } + } + + // We're about to overwrite the old contents. In case of failure, make a best + // effort to roll back before we close the file. + defer func() { + if err != nil { + if _, err := f.WriteAt(old, 0); err == nil { + f.Truncate(int64(len(old))) + } + } + }() + + if len(new) >= len(old) { + if _, err := f.WriteAt(new[:len(old)], 0); err != nil { + return err + } + } else { + if _, err := f.WriteAt(new, 0); err != nil { + return err + } + // The overall file size is decreasing, so shrink the file to its final size + // after writing. We do this after writing (instead of before) so that if + // the write fails, enough filesystem space will likely still be reserved + // to contain the previous contents. + if err := f.Truncate(int64(len(new))); err != nil { + return err + } + } + + return nil +} diff --git a/internal/lockedfile/lockedfile_filelock.go b/internal/lockedfile/lockedfile_filelock.go new file mode 100755 index 000000000..ed565e02a --- /dev/null +++ b/internal/lockedfile/lockedfile_filelock.go @@ -0,0 +1,64 @@ +// Copyright 2018 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +// +build !plan9 + +package lockedfile + +import ( + "os" + + "github.com/fluxcd/source-controller/internal/lockedfile/internal/filelock" +) + +func openFile(name string, flag int, perm os.FileMode) (*os.File, error) { + // On BSD systems, we could add the O_SHLOCK or O_EXLOCK flag to the OpenFile + // call instead of locking separately, but we have to support separate locking + // calls for Linux and Windows anyway, so it's simpler to use that approach + // consistently. + + f, err := os.OpenFile(name, flag&^os.O_TRUNC, perm) + if err != nil { + return nil, err + } + + switch flag & (os.O_RDONLY | os.O_WRONLY | os.O_RDWR) { + case os.O_WRONLY, os.O_RDWR: + err = filelock.Lock(f) + default: + err = filelock.RLock(f) + } + if err != nil { + f.Close() + return nil, err + } + + if flag&os.O_TRUNC == os.O_TRUNC { + if err := f.Truncate(0); err != nil { + // The documentation for os.O_TRUNC says “if possible, truncate file when + // opened”, but doesn't define “possible” (golang.org/issue/28699). + // We'll treat regular files (and symlinks to regular files) as “possible” + // and ignore errors for the rest. + if fi, statErr := f.Stat(); statErr != nil || fi.Mode().IsRegular() { + filelock.Unlock(f) + f.Close() + return nil, err + } + } + } + + return f, nil +} + +func closeFile(f *os.File) error { + // Since locking syscalls operate on file descriptors, we must unlock the file + // while the descriptor is still valid — that is, before the file is closed — + // and avoid unlocking files that are already closed. + err := filelock.Unlock(f) + + if closeErr := f.Close(); err == nil { + err = closeErr + } + return err +} diff --git a/internal/lockedfile/mutex.go b/internal/lockedfile/mutex.go new file mode 100755 index 000000000..180a36c62 --- /dev/null +++ b/internal/lockedfile/mutex.go @@ -0,0 +1,67 @@ +// Copyright 2018 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +package lockedfile + +import ( + "fmt" + "os" + "sync" +) + +// A Mutex provides mutual exclusion within and across processes by locking a +// well-known file. Such a file generally guards some other part of the +// filesystem: for example, a Mutex file in a directory might guard access to +// the entire tree rooted in that directory. +// +// Mutex does not implement sync.Locker: unlike a sync.Mutex, a lockedfile.Mutex +// can fail to lock (e.g. if there is a permission error in the filesystem). +// +// Like a sync.Mutex, a Mutex may be included as a field of a larger struct but +// must not be copied after first use. The Path field must be set before first +// use and must not be change thereafter. +type Mutex struct { + Path string // The path to the well-known lock file. Must be non-empty. + mu sync.Mutex // A redundant mutex. The race detector doesn't know about file locking, so in tests we may need to lock something that it understands. +} + +// MutexAt returns a new Mutex with Path set to the given non-empty path. +func MutexAt(path string) *Mutex { + if path == "" { + panic("lockedfile.MutexAt: path must be non-empty") + } + return &Mutex{Path: path} +} + +func (mu *Mutex) String() string { + return fmt.Sprintf("lockedfile.Mutex(%s)", mu.Path) +} + +// Lock attempts to lock the Mutex. +// +// If successful, Lock returns a non-nil unlock function: it is provided as a +// return-value instead of a separate method to remind the caller to check the +// accompanying error. (See https://golang.org/issue/20803.) +func (mu *Mutex) Lock() (unlock func(), err error) { + if mu.Path == "" { + panic("lockedfile.Mutex: missing Path during Lock") + } + + // We could use either O_RDWR or O_WRONLY here. If we choose O_RDWR and the + // file at mu.Path is write-only, the call to OpenFile will fail with a + // permission error. That's actually what we want: if we add an RLock method + // in the future, it should call OpenFile with O_RDONLY and will require the + // files must be readable, so we should not let the caller make any + // assumptions about Mutex working with write-only files. + f, err := OpenFile(mu.Path, os.O_RDWR|os.O_CREATE, 0666) + if err != nil { + return nil, err + } + mu.mu.Lock() + + return func() { + mu.mu.Unlock() + f.Close() + }, nil +} diff --git a/main.go b/main.go index 23439206d..44de5f365 100644 --- a/main.go +++ b/main.go @@ -18,20 +18,22 @@ package main import ( "flag" - "github.com/go-logr/logr" - "helm.sh/helm/v3/pkg/getter" - + "fmt" "net/http" "os" "path/filepath" + "strings" + "time" + "github.com/go-logr/logr" + "helm.sh/helm/v3/pkg/getter" "k8s.io/apimachinery/pkg/runtime" clientgoscheme "k8s.io/client-go/kubernetes/scheme" _ "k8s.io/client-go/plugin/pkg/client/auth/gcp" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/log/zap" - sourcev1alpha1 "github.com/fluxcd/source-controller/api/v1alpha1" + sourcev1 "github.com/fluxcd/source-controller/api/v1alpha1" "github.com/fluxcd/source-controller/controllers" // +kubebuilder:scaffold:imports ) @@ -44,7 +46,7 @@ var ( func init() { _ = clientgoscheme.AddToScheme(scheme) - _ = sourcev1alpha1.AddToScheme(scheme) + _ = sourcev1.AddToScheme(scheme) // +kubebuilder:scaffold:scheme } @@ -76,26 +78,26 @@ func main() { os.Exit(1) } - if storagePath == "" { - p, _ := os.Getwd() - storagePath = filepath.Join(p, "bin") - } - go startFileServer(storagePath, storageAddr, setupLog) + storage := mustInitStorage(storagePath, setupLog) + + go startFileServer(storage.BasePath, storageAddr, setupLog) if err = (&controllers.GitRepositoryReconciler{ - Client: mgr.GetClient(), - Log: ctrl.Log.WithName("controllers").WithName("GitRepository"), - Scheme: mgr.GetScheme(), - StoragePath: storagePath, + Client: mgr.GetClient(), + Log: ctrl.Log.WithName("controllers").WithName("GitRepository"), + Scheme: mgr.GetScheme(), + Kind: "gitrepository", + Storage: storage, }).SetupWithManager(mgr); err != nil { setupLog.Error(err, "unable to create controller", "controller", "GitRepository") os.Exit(1) } if err = (&controllers.HelmRepositoryReconciler{ - Client: mgr.GetClient(), - Log: ctrl.Log.WithName("controllers").WithName("HelmRepository"), - Scheme: mgr.GetScheme(), - StoragePath: storagePath, + Client: mgr.GetClient(), + Log: ctrl.Log.WithName("controllers").WithName("HelmRepository"), + Scheme: mgr.GetScheme(), + Kind: "helmrepository", + Storage: storage, Getters: getter.Providers{ getter.Provider{ Schemes: []string{"http", "https"}, @@ -123,3 +125,25 @@ func startFileServer(path string, address string, l logr.Logger) { l.Error(err, "file server error") } } + +func mustInitStorage(path string, l logr.Logger) *controllers.Storage { + if path == "" { + p, _ := os.Getwd() + path = filepath.Join(p, "bin") + } + + hostname := "localhost" + if os.Getenv("RUNTIME_NAMESPACE") != "" { + svcParts := strings.Split(os.Getenv("HOSTNAME"), "-") + hostname = fmt.Sprintf("%s.%s", + strings.Join(svcParts[:len(svcParts)-2], "-"), os.Getenv("RUNTIME_NAMESPACE")) + } + + storage, err := controllers.NewStorage(path, hostname, 5*time.Minute) + if err != nil { + l.Error(err, "unable to initialise storage") + os.Exit(1) + } + + return storage +}