Skip to content

Commit

Permalink
Merge pull request #71 from arangodb/resources
Browse files Browse the repository at this point in the history
Moved low level resource (pod,pvc,secret,service) creation & inspection to resources sub-package.
  • Loading branch information
ewoutp authored Mar 26, 2018
2 parents 81809c0 + d76295a commit 2e7a95f
Show file tree
Hide file tree
Showing 23 changed files with 423 additions and 234 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -28,17 +28,35 @@ import (
driver "github.com/arangodb/go-driver"
"k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"

api "github.com/arangodb/kube-arangodb/pkg/apis/deployment/v1alpha"
"github.com/arangodb/kube-arangodb/pkg/deployment/resources"
"github.com/arangodb/kube-arangodb/pkg/util/arangod"
"github.com/arangodb/kube-arangodb/pkg/util/k8sutil"
)

// GetAPIObject returns the deployment as k8s object.
func (d *Deployment) GetAPIObject() metav1.Object {
func (d *Deployment) GetAPIObject() k8sutil.APIObject {
return d.apiObject
}

// GetServerGroupIterator returns the deployment as ServerGroupIterator.
func (d *Deployment) GetServerGroupIterator() resources.ServerGroupIterator {
return d.apiObject
}

// GetKubeCli returns the kubernetes client
func (d *Deployment) GetKubeCli() kubernetes.Interface {
return d.deps.KubeCli
}

// GetNamespace returns the kubernetes namespace that contains
// this deployment.
func (d *Deployment) GetNamespace() string {
return d.apiObject.GetNamespace()
}

// GetSpec returns the current specification
func (d *Deployment) GetSpec() api.DeploymentSpec {
return d.apiObject.Spec
Expand Down
21 changes: 12 additions & 9 deletions pkg/deployment/deployment.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ import (

api "github.com/arangodb/kube-arangodb/pkg/apis/deployment/v1alpha"
"github.com/arangodb/kube-arangodb/pkg/deployment/reconcile"
"github.com/arangodb/kube-arangodb/pkg/deployment/resources"
"github.com/arangodb/kube-arangodb/pkg/generated/clientset/versioned"
"github.com/arangodb/kube-arangodb/pkg/util/k8sutil"
"github.com/arangodb/kube-arangodb/pkg/util/retry"
Expand Down Expand Up @@ -94,6 +95,7 @@ type Deployment struct {
recentInspectionErrors int
clusterScalingIntegration *clusterScalingIntegration
reconciler *reconcile.Reconciler
resources *resources.Resources
}

// New creates a new Deployment from the given API object.
Expand All @@ -112,6 +114,7 @@ func New(config Config, deps Dependencies, apiObject *api.ArangoDeployment) (*De
clientCache: newClientCache(deps.KubeCli, apiObject),
}
d.reconciler = reconcile.NewReconciler(deps.Log, d)
d.resources = resources.NewResources(deps.Log, d)
if d.status.AcceptedSpec == nil {
// We've validated the spec, so let's use it from now.
d.status.AcceptedSpec = apiObject.Spec.DeepCopy()
Expand Down Expand Up @@ -167,13 +170,13 @@ func (d *Deployment) run() {

if d.status.State == api.DeploymentStateNone {
// Create secrets
if err := d.createSecrets(d.apiObject); err != nil {
if err := d.resources.EnsureSecrets(); err != nil {
d.failOnError(err, "Failed to create secrets")
return
}

// Create services
if err := d.createServices(d.apiObject); err != nil {
if err := d.resources.EnsureServices(); err != nil {
d.failOnError(err, "Failed to create services")
return
}
Expand All @@ -185,13 +188,13 @@ func (d *Deployment) run() {
}

// Create PVCs
if err := d.ensurePVCs(d.apiObject); err != nil {
if err := d.resources.EnsurePVCs(); err != nil {
d.failOnError(err, "Failed to create persistent volume claims")
return
}

// Create pods
if err := d.ensurePods(d.apiObject); err != nil {
if err := d.resources.EnsurePods(); err != nil {
d.failOnError(err, "Failed to create pods")
return
}
Expand Down Expand Up @@ -266,18 +269,18 @@ func (d *Deployment) handleArangoDeploymentUpdatedEvent(event *deploymentEvent)
log.Debug().Strs("fields", resetFields).Msg("Found modified immutable fields")
}
if err := newAPIObject.Spec.Validate(); err != nil {
d.createEvent(k8sutil.NewErrorEvent("Validation failed", err, d.apiObject))
d.CreateEvent(k8sutil.NewErrorEvent("Validation failed", err, d.apiObject))
// Try to reset object
if err := d.updateCRSpec(d.apiObject.Spec); err != nil {
log.Error().Err(err).Msg("Restore original spec failed")
d.createEvent(k8sutil.NewErrorEvent("Restore original failed", err, d.apiObject))
d.CreateEvent(k8sutil.NewErrorEvent("Restore original failed", err, d.apiObject))
}
return nil
}
if len(resetFields) > 0 {
for _, fieldName := range resetFields {
log.Debug().Str("field", fieldName).Msg("Reset immutable field")
d.createEvent(k8sutil.NewImmutableFieldEvent(fieldName, d.apiObject))
d.CreateEvent(k8sutil.NewImmutableFieldEvent(fieldName, d.apiObject))
}
}

Expand All @@ -302,9 +305,9 @@ func (d *Deployment) handleArangoDeploymentUpdatedEvent(event *deploymentEvent)
return nil
}

// createEvent creates a given event.
// CreateEvent creates a given event.
// On error, the error is logged.
func (d *Deployment) createEvent(evt *v1.Event) {
func (d *Deployment) CreateEvent(evt *v1.Event) {
_, err := d.eventsCli.Create(evt)
if err != nil {
d.deps.Log.Error().Err(err).Interface("event", *evt).Msg("Failed to record event")
Expand Down
22 changes: 13 additions & 9 deletions pkg/deployment/deployment_inspector.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,41 +46,45 @@ func (d *Deployment) inspectDeployment(lastInterval time.Duration) time.Duration
// Ensure we have image info
if retrySoon, err := d.ensureImages(d.apiObject); err != nil {
hasError = true
d.createEvent(k8sutil.NewErrorEvent("Image detection failed", err, d.apiObject))
d.CreateEvent(k8sutil.NewErrorEvent("Image detection failed", err, d.apiObject))
} else if retrySoon {
nextInterval = minInspectionInterval
}

// Inspection of generated resources needed
if err := d.inspectPods(); err != nil {
if err := d.resources.InspectPods(); err != nil {
hasError = true
d.createEvent(k8sutil.NewErrorEvent("Pod inspection failed", err, d.apiObject))
d.CreateEvent(k8sutil.NewErrorEvent("Pod inspection failed", err, d.apiObject))
}

// Create scale/update plan
if err := d.reconciler.CreatePlan(); err != nil {
hasError = true
d.createEvent(k8sutil.NewErrorEvent("Plan creation failed", err, d.apiObject))
d.CreateEvent(k8sutil.NewErrorEvent("Plan creation failed", err, d.apiObject))
}

// Execute current step of scale/update plan
retrySoon, err := d.reconciler.ExecutePlan(ctx)
if err != nil {
hasError = true
d.createEvent(k8sutil.NewErrorEvent("Plan execution failed", err, d.apiObject))
d.CreateEvent(k8sutil.NewErrorEvent("Plan execution failed", err, d.apiObject))
}
if retrySoon {
nextInterval = minInspectionInterval
}

// Ensure all resources are created
if err := d.ensurePVCs(d.apiObject); err != nil {
if err := d.resources.EnsureServices(); err != nil {
hasError = true
d.createEvent(k8sutil.NewErrorEvent("PVC creation failed", err, d.apiObject))
d.CreateEvent(k8sutil.NewErrorEvent("Service creation failed", err, d.apiObject))
}
if err := d.ensurePods(d.apiObject); err != nil {
if err := d.resources.EnsurePVCs(); err != nil {
hasError = true
d.createEvent(k8sutil.NewErrorEvent("Pod creation failed", err, d.apiObject))
d.CreateEvent(k8sutil.NewErrorEvent("PVC creation failed", err, d.apiObject))
}
if err := d.resources.EnsurePods(); err != nil {
hasError = true
d.CreateEvent(k8sutil.NewErrorEvent("Pod creation failed", err, d.apiObject))
}

// Update next interval (on errors)
Expand Down
10 changes: 1 addition & 9 deletions pkg/deployment/images.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ import (
"strings"

"github.com/rs/zerolog"
"k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"

Expand All @@ -40,7 +39,6 @@ import (

const (
dockerPullableImageIDPrefix = "docker-pullable://"
imageIDAndVersionRole = "id" // Role use by identification pods
)

type imagesBuilder struct {
Expand Down Expand Up @@ -99,7 +97,7 @@ func (ib *imagesBuilder) Run(ctx context.Context) (bool, error) {
// When no pod exists, it is created, otherwise the ID is fetched & version detected.
// Returns: retrySoon, error
func (ib *imagesBuilder) fetchArangoDBImageIDAndVersion(ctx context.Context, image string) (bool, error) {
role := imageIDAndVersionRole
role := k8sutil.ImageIDAndVersionRole
id := fmt.Sprintf("%0x", sha1.Sum([]byte(image)))[:6]
podName := k8sutil.CreatePodName(ib.APIObject.GetName(), role, id, "")
ns := ib.APIObject.GetNamespace()
Expand Down Expand Up @@ -175,9 +173,3 @@ func (ib *imagesBuilder) fetchArangoDBImageIDAndVersion(ctx context.Context, ima
// Come back soon to inspect the pod
return true, nil
}

// isArangoDBImageIDAndVersionPod returns true if the given pod is used for fetching image ID and ArangoDB version of an image
func isArangoDBImageIDAndVersionPod(p v1.Pod) bool {
role, found := p.GetLabels()[k8sutil.LabelKeyRole]
return found && role == imageIDAndVersionRole
}
6 changes: 3 additions & 3 deletions pkg/deployment/reconcile/action_context.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,8 +65,8 @@ type ActionContext interface {
DeletePvc(pvcName string) error
}

// NewActionContext creates a new ActionContext implementation.
func NewActionContext(log zerolog.Logger, context ReconcileContext) ActionContext {
// newActionContext creates a new ActionContext implementation.
func newActionContext(log zerolog.Logger, context Context) ActionContext {
return &actionContext{
log: log,
context: context,
Expand All @@ -76,7 +76,7 @@ func NewActionContext(log zerolog.Logger, context ReconcileContext) ActionContex
// actionContext implements ActionContext
type actionContext struct {
log zerolog.Logger
context ReconcileContext
context Context
}

// Gets the specified mode of deployment
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,16 +27,16 @@ import (

driver "github.com/arangodb/go-driver"
"k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

api "github.com/arangodb/kube-arangodb/pkg/apis/deployment/v1alpha"
"github.com/arangodb/kube-arangodb/pkg/util/arangod"
"github.com/arangodb/kube-arangodb/pkg/util/k8sutil"
)

// ReconcileContext provides methods to the reconcile package.
type ReconcileContext interface {
// Context provides methods to the reconcile package.
type Context interface {
// GetAPIObject returns the deployment as k8s object.
GetAPIObject() metav1.Object
GetAPIObject() k8sutil.APIObject
// GetSpec returns the current specification of the deployment
GetSpec() api.DeploymentSpec
// GetStatus returns the current status of the deployment
Expand Down
2 changes: 1 addition & 1 deletion pkg/deployment/reconcile/plan_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ func (d *Reconciler) ExecutePlan(ctx context.Context) (bool, error) {
// Returns true if the action is completely finished, false in case
// the start time needs to be recorded and a ready condition needs to be checked.
func (d *Reconciler) createAction(ctx context.Context, log zerolog.Logger, action api.Action) Action {
actionCtx := NewActionContext(log, d.context)
actionCtx := newActionContext(log, d.context)
switch action.Type {
case api.ActionTypeAddMember:
return NewAddMemberAction(log, action, actionCtx)
Expand Down
4 changes: 2 additions & 2 deletions pkg/deployment/reconcile/reconciler.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,11 @@ import "github.com/rs/zerolog"
// in line with its (changed) specification.
type Reconciler struct {
log zerolog.Logger
context ReconcileContext
context Context
}

// NewReconciler creates a new reconciler with given context.
func NewReconciler(log zerolog.Logger, context ReconcileContext) *Reconciler {
func NewReconciler(log zerolog.Logger, context Context) *Reconciler {
return &Reconciler{
log: log,
context: context,
Expand Down
65 changes: 65 additions & 0 deletions pkg/deployment/resources/context.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
//
// DISCLAIMER
//
// Copyright 2018 ArangoDB GmbH, Cologne, Germany
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// Copyright holder is ArangoDB GmbH, Cologne, Germany
//
// Author Ewout Prangsma
//

package resources

import (
api "github.com/arangodb/kube-arangodb/pkg/apis/deployment/v1alpha"
"github.com/arangodb/kube-arangodb/pkg/util/k8sutil"
"k8s.io/api/core/v1"
"k8s.io/client-go/kubernetes"
)

// ServerGroupIterator provides a helper to callback on every server
// group of the deployment.
type ServerGroupIterator interface {
// ForeachServerGroup calls the given callback for all server groups.
// If the callback returns an error, this error is returned and no other server
// groups are processed.
// Groups are processed in this order: agents, single, dbservers, coordinators, syncmasters, syncworkers
ForeachServerGroup(cb func(group api.ServerGroup, spec api.ServerGroupSpec, status *api.MemberStatusList) error, status *api.DeploymentStatus) error
}

// Context provides all functions needed by the Resources service
// to perform its service.
type Context interface {
// GetAPIObject returns the deployment as k8s object.
GetAPIObject() k8sutil.APIObject
// GetServerGroupIterator returns the deployment as ServerGroupIterator.
GetServerGroupIterator() ServerGroupIterator
// GetSpec returns the current specification of the deployment
GetSpec() api.DeploymentSpec
// GetStatus returns the current status of the deployment
GetStatus() api.DeploymentStatus
// UpdateStatus replaces the status of the deployment with the given status and
// updates the resources in k8s.
UpdateStatus(status api.DeploymentStatus, force ...bool) error
// GetKubeCli returns the kubernetes client
GetKubeCli() kubernetes.Interface
// GetNamespace returns the namespace that contains the deployment
GetNamespace() string
// createEvent creates a given event.
// On error, the error is logged.
CreateEvent(evt *v1.Event)
// GetOwnedPods returns a list of all pods owned by the deployment.
GetOwnedPods() ([]v1.Pod, error)
}
29 changes: 29 additions & 0 deletions pkg/deployment/resources/errors.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
//
// DISCLAIMER
//
// Copyright 2018 ArangoDB GmbH, Cologne, Germany
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// Copyright holder is ArangoDB GmbH, Cologne, Germany
//
// Author Ewout Prangsma
//

package resources

import "github.com/pkg/errors"

var (
maskAny = errors.WithStack
)
Loading

0 comments on commit 2e7a95f

Please sign in to comment.