Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adding syncmaster&worker reconciliation support. #145

Merged
merged 16 commits into from
Jun 5, 2018
Merged
Show file tree
Hide file tree
Changes from 13 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 0 additions & 11 deletions docs/Manual/Deployment/Kubernetes/DeploymentResource.md
Original file line number Diff line number Diff line change
Expand Up @@ -205,17 +205,6 @@ replication in the cluster. When enabled, the cluster will contain
a number of `syncmaster` & `syncworker` servers.
The default value is `false`.

### `spec.sync.image: string`

This setting specifies the docker image to use for all ArangoSync servers.
When not specified, the `spec.image` value is used.

### `spec.sync.imagePullPolicy: string`

This setting specifies the pull policy for the docker image to use for all ArangoSync servers.
For possible values, see `spec.imagePullPolicy`.
When not specified, the `spec.imagePullPolicy` value is used.

### `spec.sync.externalAccess.type: string`

This setting specifies the type of `Service` that will be created to provide
Expand Down
2 changes: 1 addition & 1 deletion pkg/apis/deployment/v1alpha/deployment_spec.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ func (s *DeploymentSpec) SetDefaults(deploymentName string) {
s.RocksDB.SetDefaults()
s.Authentication.SetDefaults(deploymentName + "-jwt")
s.TLS.SetDefaults(deploymentName + "-ca")
s.Sync.SetDefaults(s.GetImage(), s.GetImagePullPolicy(), deploymentName+"-sync-jwt", deploymentName+"-sync-client-auth-ca", deploymentName+"-sync-ca")
s.Sync.SetDefaults(deploymentName+"-sync-jwt", deploymentName+"-sync-client-auth-ca", deploymentName+"-sync-ca", deploymentName+"-sync-mt")
s.Single.SetDefaults(ServerGroupSingle, s.GetMode().HasSingleServers(), s.GetMode())
s.Agents.SetDefaults(ServerGroupAgents, s.GetMode().HasAgents(), s.GetMode())
s.DBServers.SetDefaults(ServerGroupDBServers, s.GetMode().HasDBServers(), s.GetMode())
Expand Down
8 changes: 6 additions & 2 deletions pkg/apis/deployment/v1alpha/monitoring_spec.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,12 @@ func (s MonitoringSpec) Validate() error {
}

// SetDefaults fills in missing defaults
func (s *MonitoringSpec) SetDefaults() {
// Nothing needed
func (s *MonitoringSpec) SetDefaults(defaultTokenSecretName string) {
if s.GetTokenSecretName() == "" {
// Note that we don't check for nil here, since even a specified, but empty
// string should result in the default value.
s.TokenSecretName = util.NewString(defaultTokenSecretName)
}
}

// SetDefaultsFrom fills unspecified fields with a value from given source spec.
Expand Down
2 changes: 2 additions & 0 deletions pkg/apis/deployment/v1alpha/server_group_spec.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,8 @@ func (s *ServerGroupSpec) SetDefaults(group ServerGroup, used bool, mode Deploym
default:
s.Count = util.NewInt(3)
}
} else if s.GetCount() > 0 && !used {
s.Count = util.NewInt(0)
}
if _, found := s.Resources.Requests[v1.ResourceStorage]; !found {
switch group {
Expand Down
34 changes: 3 additions & 31 deletions pkg/apis/deployment/v1alpha/sync_spec.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,16 +24,13 @@ package v1alpha

import (
"github.com/pkg/errors"
"k8s.io/api/core/v1"

"github.com/arangodb/kube-arangodb/pkg/util"
)

// SyncSpec holds dc2dc replication specific configuration settings
type SyncSpec struct {
Enabled *bool `json:"enabled,omitempty"`
Image *string `json:"image,omitempty"`
ImagePullPolicy *v1.PullPolicy `json:"imagePullPolicy,omitempty"`
Enabled *bool `json:"enabled,omitempty"`

ExternalAccess SyncExternalAccessSpec `json:"externalAccess"`
Authentication SyncAuthenticationSpec `json:"auth"`
Expand All @@ -46,24 +43,11 @@ func (s SyncSpec) IsEnabled() bool {
return util.BoolOrDefault(s.Enabled)
}

// GetImage returns the value of image.
func (s SyncSpec) GetImage() string {
return util.StringOrDefault(s.Image)
}

// GetImagePullPolicy returns the value of imagePullPolicy.
func (s SyncSpec) GetImagePullPolicy() v1.PullPolicy {
return util.PullPolicyOrDefault(s.ImagePullPolicy)
}

// Validate the given spec
func (s SyncSpec) Validate(mode DeploymentMode) error {
if s.IsEnabled() && !mode.SupportsSync() {
return maskAny(errors.Wrapf(ValidationError, "Cannot enable sync with mode: '%s'", mode))
}
if s.GetImage() == "" {
return maskAny(errors.Wrapf(ValidationError, "image must be set"))
}
if s.IsEnabled() {
if err := s.ExternalAccess.Validate(); err != nil {
return maskAny(err)
Expand All @@ -82,30 +66,18 @@ func (s SyncSpec) Validate(mode DeploymentMode) error {
}

// SetDefaults fills in missing defaults
func (s *SyncSpec) SetDefaults(defaultImage string, defaulPullPolicy v1.PullPolicy, defaultJWTSecretName, defaultClientAuthCASecretName, defaultTLSCASecretName string) {
if s.GetImage() == "" {
s.Image = util.NewString(defaultImage)
}
if s.GetImagePullPolicy() == "" {
s.ImagePullPolicy = util.NewPullPolicy(defaulPullPolicy)
}
func (s *SyncSpec) SetDefaults(defaultJWTSecretName, defaultClientAuthCASecretName, defaultTLSCASecretName, defaultMonitoringSecretName string) {
s.ExternalAccess.SetDefaults()
s.Authentication.SetDefaults(defaultJWTSecretName, defaultClientAuthCASecretName)
s.TLS.SetDefaults(defaultTLSCASecretName)
s.Monitoring.SetDefaults()
s.Monitoring.SetDefaults(defaultMonitoringSecretName)
}

// SetDefaultsFrom fills unspecified fields with a value from given source spec.
func (s *SyncSpec) SetDefaultsFrom(source SyncSpec) {
if s.Enabled == nil {
s.Enabled = util.NewBoolOrNil(source.Enabled)
}
if s.Image == nil {
s.Image = util.NewStringOrNil(source.Image)
}
if s.ImagePullPolicy == nil {
s.ImagePullPolicy = util.NewPullPolicyOrNil(source.ImagePullPolicy)
}
s.ExternalAccess.SetDefaultsFrom(source.ExternalAccess)
s.Authentication.SetDefaultsFrom(source.Authentication)
s.TLS.SetDefaultsFrom(source.TLS)
Expand Down
34 changes: 7 additions & 27 deletions pkg/apis/deployment/v1alpha/sync_spec_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,39 +27,31 @@ import (

"github.com/arangodb/kube-arangodb/pkg/util"
"github.com/stretchr/testify/assert"
"k8s.io/api/core/v1"
)

func TestSyncSpecValidate(t *testing.T) {
// Valid
auth := SyncAuthenticationSpec{JWTSecretName: util.NewString("foo"), ClientCASecretName: util.NewString("foo-client")}
tls := TLSSpec{CASecretName: util.NewString("None")}
assert.Nil(t, SyncSpec{Image: util.NewString("foo"), Authentication: auth}.Validate(DeploymentModeSingle))
assert.Nil(t, SyncSpec{Image: util.NewString("foo"), Authentication: auth}.Validate(DeploymentModeActiveFailover))
assert.Nil(t, SyncSpec{Image: util.NewString("foo"), Authentication: auth}.Validate(DeploymentModeCluster))
assert.Nil(t, SyncSpec{Image: util.NewString("foo"), Authentication: auth, TLS: tls, Enabled: util.NewBool(true)}.Validate(DeploymentModeCluster))
assert.Nil(t, SyncSpec{Authentication: auth}.Validate(DeploymentModeSingle))
assert.Nil(t, SyncSpec{Authentication: auth}.Validate(DeploymentModeActiveFailover))
assert.Nil(t, SyncSpec{Authentication: auth}.Validate(DeploymentModeCluster))
assert.Nil(t, SyncSpec{Authentication: auth, TLS: tls, Enabled: util.NewBool(true)}.Validate(DeploymentModeCluster))

// Not valid
assert.Error(t, SyncSpec{Image: util.NewString(""), Authentication: auth}.Validate(DeploymentModeSingle))
assert.Error(t, SyncSpec{Image: util.NewString(""), Authentication: auth}.Validate(DeploymentModeActiveFailover))
assert.Error(t, SyncSpec{Image: util.NewString(""), Authentication: auth}.Validate(DeploymentModeCluster))
assert.Error(t, SyncSpec{Image: util.NewString("foo"), Authentication: auth, TLS: tls, Enabled: util.NewBool(true)}.Validate(DeploymentModeSingle))
assert.Error(t, SyncSpec{Image: util.NewString("foo"), Authentication: auth, TLS: tls, Enabled: util.NewBool(true)}.Validate(DeploymentModeActiveFailover))
assert.Error(t, SyncSpec{Authentication: auth, TLS: tls, Enabled: util.NewBool(true)}.Validate(DeploymentModeSingle))
assert.Error(t, SyncSpec{Authentication: auth, TLS: tls, Enabled: util.NewBool(true)}.Validate(DeploymentModeActiveFailover))
}

func TestSyncSpecSetDefaults(t *testing.T) {
def := func(spec SyncSpec) SyncSpec {
spec.SetDefaults("test-image", v1.PullAlways, "test-jwt", "test-client-auth-ca", "test-tls-ca")
spec.SetDefaults("test-jwt", "test-client-auth-ca", "test-tls-ca")
return spec
}

assert.False(t, def(SyncSpec{}).IsEnabled())
assert.False(t, def(SyncSpec{Enabled: util.NewBool(false)}).IsEnabled())
assert.True(t, def(SyncSpec{Enabled: util.NewBool(true)}).IsEnabled())
assert.Equal(t, "test-image", def(SyncSpec{}).GetImage())
assert.Equal(t, "foo", def(SyncSpec{Image: util.NewString("foo")}).GetImage())
assert.Equal(t, v1.PullAlways, def(SyncSpec{}).GetImagePullPolicy())
assert.Equal(t, v1.PullNever, def(SyncSpec{ImagePullPolicy: util.NewPullPolicy(v1.PullNever)}).GetImagePullPolicy())
assert.Equal(t, "test-jwt", def(SyncSpec{}).Authentication.GetJWTSecretName())
assert.Equal(t, "foo", def(SyncSpec{Authentication: SyncAuthenticationSpec{JWTSecretName: util.NewString("foo")}}).Authentication.GetJWTSecretName())
}

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

no test for new parameter to monitoring.SetDefaults("string")

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good! Added some tests

Expand All @@ -84,18 +76,6 @@ func TestSyncSpecResetImmutableFields(t *testing.T) {
SyncSpec{Enabled: util.NewBool(false)},
nil,
},
{
SyncSpec{Image: util.NewString("foo")},
SyncSpec{Image: util.NewString("foo2")},
SyncSpec{Image: util.NewString("foo2")},
nil,
},
{
SyncSpec{ImagePullPolicy: util.NewPullPolicy(v1.PullAlways)},
SyncSpec{ImagePullPolicy: util.NewPullPolicy(v1.PullNever)},
SyncSpec{ImagePullPolicy: util.NewPullPolicy(v1.PullNever)},
nil,
},
{
SyncSpec{Authentication: SyncAuthenticationSpec{JWTSecretName: util.NewString("None"), ClientCASecretName: util.NewString("some")}},
SyncSpec{Authentication: SyncAuthenticationSpec{JWTSecretName: util.NewString("None"), ClientCASecretName: util.NewString("some")}},
Expand Down
18 changes: 0 additions & 18 deletions pkg/apis/deployment/v1alpha/zz_generated.deepcopy.go
Original file line number Diff line number Diff line change
Expand Up @@ -678,24 +678,6 @@ func (in *SyncSpec) DeepCopyInto(out *SyncSpec) {
**out = **in
}
}
if in.Image != nil {
in, out := &in.Image, &out.Image
if *in == nil {
*out = nil
} else {
*out = new(string)
**out = **in
}
}
if in.ImagePullPolicy != nil {
in, out := &in.ImagePullPolicy, &out.ImagePullPolicy
if *in == nil {
*out = nil
} else {
*out = new(core_v1.PullPolicy)
**out = **in
}
}
in.ExternalAccess.DeepCopyInto(&out.ExternalAccess)
in.Authentication.DeepCopyInto(&out.Authentication)
in.TLS.DeepCopyInto(&out.TLS)
Expand Down
34 changes: 34 additions & 0 deletions pkg/deployment/context_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ package deployment
import (
"context"

"github.com/arangodb/arangosync/client"
"github.com/arangodb/arangosync/tasks"
driver "github.com/arangodb/go-driver"
"k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand Down Expand Up @@ -119,6 +121,38 @@ func (d *Deployment) GetAgencyClients(ctx context.Context, predicate func(id str
return result, nil
}

// GetSyncServerClient returns a cached client for a specific arangosync server.
func (d *Deployment) GetSyncServerClient(ctx context.Context, group api.ServerGroup, id string) (client.API, error) {
// Fetch monitoring token
log := d.deps.Log
kubecli := d.deps.KubeCli
ns := d.apiObject.GetNamespace()
secretName := d.apiObject.Spec.Sync.Monitoring.GetTokenSecretName()
monitoringToken, err := k8sutil.GetTokenSecret(kubecli.CoreV1(), secretName, ns)
if err != nil {
log.Debug().Err(err).Str("secret-name", secretName).Msg("Failed to get sync monitoring secret")
return nil, maskAny(err)
}

// Fetch server DNS name
dnsName := k8sutil.CreatePodDNSName(d.apiObject, group.AsRole(), id)

// Build client
source := client.Endpoint{dnsName}
tlsAuth := tasks.TLSAuthentication{
TLSClientAuthentication: tasks.TLSClientAuthentication{
ClientToken: monitoringToken,
},
}
auth := client.NewAuthentication(tlsAuth, "")
insecureSkipVerify := true
c, err := d.syncClientCache.GetClient(d.deps.Log, source, auth, insecureSkipVerify)
if err != nil {
return nil, maskAny(err)
}
return c, nil
}

// CreateMember adds a new member to the given group.
// If ID is non-empty, it will be used, otherwise a new ID is created.
func (d *Deployment) CreateMember(group api.ServerGroup, id string) error {
Expand Down
3 changes: 3 additions & 0 deletions pkg/deployment/deployment.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"sync/atomic"
"time"

"github.com/arangodb/arangosync/client"
"github.com/rs/zerolog"
"github.com/rs/zerolog/log"
"k8s.io/api/core/v1"
Expand Down Expand Up @@ -101,6 +102,7 @@ type Deployment struct {
resilience *resilience.Resilience
resources *resources.Resources
chaosMonkey *chaos.Monkey
syncClientCache client.ClientCache
}

// New creates a new Deployment from the given API object.
Expand Down Expand Up @@ -280,6 +282,7 @@ func (d *Deployment) handleArangoDeploymentUpdatedEvent() error {
}
newAPIObject := current.DeepCopy()
newAPIObject.Spec.SetDefaultsFrom(specBefore)
newAPIObject.Spec.SetDefaults(d.apiObject.GetName())
newAPIObject.Status = d.status
resetFields := specBefore.ResetImmutableFields(&newAPIObject.Spec)
if len(resetFields) > 0 {
Expand Down
12 changes: 12 additions & 0 deletions pkg/deployment/reconcile/action_context.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"context"
"fmt"

"github.com/arangodb/arangosync/client"
driver "github.com/arangodb/go-driver"
"github.com/rs/zerolog"
"github.com/rs/zerolog/log"
Expand All @@ -45,6 +46,8 @@ type ActionContext interface {
GetServerClient(ctx context.Context, group api.ServerGroup, id string) (driver.Client, error)
// GetAgencyClients returns a client connection for every agency member.
GetAgencyClients(ctx context.Context) ([]driver.Connection, error)
// GetSyncServerClient returns a cached client for a specific arangosync server.
GetSyncServerClient(ctx context.Context, group api.ServerGroup, id string) (client.API, error)
// GetMemberStatusByID returns the current member status
// for the member with given id.
// Returns member status, true when found, or false
Expand Down Expand Up @@ -115,6 +118,15 @@ func (ac *actionContext) GetAgencyClients(ctx context.Context) ([]driver.Connect
return c, nil
}

// GetSyncServerClient returns a cached client for a specific arangosync server.
func (ac *actionContext) GetSyncServerClient(ctx context.Context, group api.ServerGroup, id string) (client.API, error) {
c, err := ac.context.GetSyncServerClient(ctx, group, id)
if err != nil {
return nil, maskAny(err)
}
return c, nil
}

// GetMemberStatusByID returns the current member status
// for the member with given id.
// Returns member status, true when found, or false
Expand Down
11 changes: 10 additions & 1 deletion pkg/deployment/reconcile/action_wait_for_member_up.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,15 @@ func (a *actionWaitForMemberUp) checkProgressCluster(ctx context.Context) (bool,
// checkProgressArangoSync checks the progress of the action in the case
// of a sync master / worker.
func (a *actionWaitForMemberUp) checkProgressArangoSync(ctx context.Context) (bool, error) {
// TODO
log := a.log
c, err := a.actionCtx.GetSyncServerClient(ctx, a.action.Group, a.action.ID)
if err != nil {
log.Debug().Err(err).Msg("Failed to create arangosync client")
return false, maskAny(err)
}
if err := c.Health(ctx); err != nil {
log.Debug().Err(err).Msg("Health not ok yet")
return false, maskAny(err)
}
return true, nil
}
3 changes: 3 additions & 0 deletions pkg/deployment/reconcile/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ package reconcile
import (
"context"

"github.com/arangodb/arangosync/client"
driver "github.com/arangodb/go-driver"
"k8s.io/api/core/v1"

Expand All @@ -51,6 +52,8 @@ type Context interface {
// GetAgencyClients returns a client connection for every agency member.
// If the given predicate is not nil, only agents are included where the given predicate returns true.
GetAgencyClients(ctx context.Context, predicate func(id string) bool) ([]driver.Connection, error)
// GetSyncServerClient returns a cached client for a specific arangosync server.
GetSyncServerClient(ctx context.Context, group api.ServerGroup, id string) (client.API, error)
// CreateMember adds a new member to the given group.
// If ID is non-empty, it will be used, otherwise a new ID is created.
CreateMember(group api.ServerGroup, id string) error
Expand Down
5 changes: 4 additions & 1 deletion pkg/deployment/reconcile/plan_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,9 +128,12 @@ func createPlan(log zerolog.Logger, apiObject metav1.Object,
// Only scale singles
plan = append(plan, createScalePlan(log, status.Members.Single, api.ServerGroupSingle, spec.Single.GetCount())...)
case api.DeploymentModeCluster:
// Scale dbservers, coordinators, syncmasters & syncworkers
// Scale dbservers, coordinators
plan = append(plan, createScalePlan(log, status.Members.DBServers, api.ServerGroupDBServers, spec.DBServers.GetCount())...)
plan = append(plan, createScalePlan(log, status.Members.Coordinators, api.ServerGroupCoordinators, spec.Coordinators.GetCount())...)
}
if spec.GetMode().SupportsSync() {
// Scale syncmasters & syncworkers
plan = append(plan, createScalePlan(log, status.Members.SyncMasters, api.ServerGroupSyncMasters, spec.SyncMasters.GetCount())...)
plan = append(plan, createScalePlan(log, status.Members.SyncWorkers, api.ServerGroupSyncWorkers, spec.SyncWorkers.GetCount())...)
}
Expand Down
Loading