Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improved readiness probe, database services only use ready pods #176

Merged
merged 4 commits into from
Jun 14, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 18 additions & 12 deletions pkg/apis/deployment/v1alpha/deployment_status_members.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,6 @@
package v1alpha

import (
"fmt"

"github.com/pkg/errors"
)

Expand Down Expand Up @@ -213,17 +211,25 @@ func (ds *DeploymentStatusMembers) RemoveByID(id string, group ServerGroup) erro
return nil
}

// AllMembersReady returns true when all members are in the Ready state.
func (ds DeploymentStatusMembers) AllMembersReady() bool {
if err := ds.ForeachServerGroup(func(group ServerGroup, list MemberStatusList) error {
for _, x := range list {
if !x.Conditions.IsTrue(ConditionTypeReady) {
return fmt.Errorf("not ready")
}
// AllMembersReady returns true when all members that must be ready for the given mode are in the Ready state.
func (ds DeploymentStatusMembers) AllMembersReady(mode DeploymentMode, syncEnabled bool) bool {
syncReady := func() bool {
if syncEnabled {
return ds.SyncMasters.AllMembersReady() && ds.SyncWorkers.AllMembersReady()
}
return nil
}); err != nil {
return true
}
switch mode {
case DeploymentModeSingle:
return ds.Single.MembersReady() > 0
case DeploymentModeActiveFailover:
return ds.Agents.AllMembersReady() && ds.Single.MembersReady() > 0
case DeploymentModeCluster:
return ds.Agents.AllMembersReady() &&
ds.DBServers.AllMembersReady() &&
ds.Coordinators.AllMembersReady() &&
syncReady()
default:
return false
}
return true
}
16 changes: 16 additions & 0 deletions pkg/apis/deployment/v1alpha/member_status_list.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,3 +134,19 @@ func (l MemberStatusList) SelectMemberToRemove() (MemberStatus, error) {
}
return MemberStatus{}, maskAny(errors.Wrap(NotFoundError, "No member available for removal"))
}

// MembersReady counts the members of the list whose Ready condition is true
// and returns that count.
func (l MemberStatusList) MembersReady() int {
	var ready int
	for _, member := range l {
		// Skip anything that is not (yet) Ready; only Ready members are tallied.
		if !member.Conditions.IsTrue(ConditionTypeReady) {
			continue
		}
		ready++
	}
	return ready
}

// AllMembersReady reports whether every member of the list is in the Ready
// state. A list without members has no unready member, so it yields true.
func (l MemberStatusList) AllMembersReady() bool {
	for _, member := range l {
		// A single member that is not Ready is enough to answer false.
		if !member.Conditions.IsTrue(ConditionTypeReady) {
			return false
		}
	}
	return true
}
19 changes: 13 additions & 6 deletions pkg/deployment/resources/pod_creator.go
Original file line number Diff line number Diff line change
Expand Up @@ -359,7 +359,7 @@ func (r *Resources) createLivenessProbe(spec api.DeploymentSpec, group api.Serve

// createReadinessProbe creates configuration for a readiness probe of a server in the given group.
func (r *Resources) createReadinessProbe(spec api.DeploymentSpec, group api.ServerGroup) (*k8sutil.HTTPProbeConfig, error) {
if group != api.ServerGroupCoordinators {
if group != api.ServerGroupSingle && group != api.ServerGroupCoordinators {
return nil, nil
}
authorization := ""
Expand All @@ -373,11 +373,18 @@ func (r *Resources) createReadinessProbe(spec api.DeploymentSpec, group api.Serv
return nil, maskAny(err)
}
}
return &k8sutil.HTTPProbeConfig{
LocalPath: "/_api/version",
Secure: spec.IsSecure(),
Authorization: authorization,
}, nil
probeCfg := &k8sutil.HTTPProbeConfig{
LocalPath: "/_api/version",
Secure: spec.IsSecure(),
Authorization: authorization,
InitialDelaySeconds: 2,
PeriodSeconds: 2,
}
switch spec.GetMode() {
case api.DeploymentModeActiveFailover:
probeCfg.LocalPath = "/_admin/echo"
}
return probeCfg, nil
}

// createPodFinalizers creates a list of finalizers for a pod created for the given group.
Expand Down
3 changes: 2 additions & 1 deletion pkg/deployment/resources/pod_inspector.go
Original file line number Diff line number Diff line change
Expand Up @@ -217,7 +217,8 @@ func (r *Resources) InspectPods(ctx context.Context) error {
})

// Update overall conditions
allMembersReady := status.Members.AllMembersReady()
spec := r.context.GetSpec()
allMembersReady := status.Members.AllMembersReady(spec.GetMode(), spec.Sync.IsEnabled())
status.Conditions.Update(api.ConditionTypeReady, allMembersReady, "", "")

// Update conditions
Expand Down
30 changes: 21 additions & 9 deletions pkg/util/k8sutil/probes.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,16 @@ type HTTPProbeConfig struct {
Authorization string
// Port to inspect (defaults to ArangoPort)
Port int
// Number of seconds after the container has started before probes are initiated (defaults to 30).
InitialDelaySeconds int32
// Number of seconds after which the probe times out (defaults to 2).
TimeoutSeconds int32
// How often (in seconds) to perform the probe (defaults to 10).
PeriodSeconds int32
// Minimum consecutive successes for the probe to be considered successful after having failed (defaults to 1).
SuccessThreshold int32
// Minimum consecutive failures for the probe to be considered failed after having succeeded (defaults to 3).
FailureThreshold int32
}

// Create creates a probe from given config
Expand All @@ -52,23 +62,25 @@ func (config HTTPProbeConfig) Create() *v1.Probe {
Value: config.Authorization,
})
}
port := config.Port
if port == 0 {
port = ArangoPort
def := func(value, defaultValue int32) int32 {
if value != 0 {
return value
}
return defaultValue
}
return &v1.Probe{
Handler: v1.Handler{
HTTPGet: &v1.HTTPGetAction{
Path: config.LocalPath,
Port: intstr.FromInt(port),
Port: intstr.FromInt(int(def(int32(config.Port), ArangoPort))),
Scheme: scheme,
HTTPHeaders: headers,
},
},
InitialDelaySeconds: 30, // Wait 30s before first probe
TimeoutSeconds: 2, // Timeout of each probe is 2s
PeriodSeconds: 10, // Interval between probes is 10s
SuccessThreshold: 1, // Single probe is enough to indicate success
FailureThreshold: 3, // Need 3 failed probes to consider a failed state
InitialDelaySeconds: def(config.InitialDelaySeconds, 30), // Wait 30s before first probe
TimeoutSeconds: def(config.TimeoutSeconds, 2), // Timeout of each probe is 2s
PeriodSeconds: def(config.PeriodSeconds, 10), // Interval between probes is 10s
SuccessThreshold: def(config.SuccessThreshold, 1), // Single probe is enough to indicate success
FailureThreshold: def(config.FailureThreshold, 3), // Need 3 failed probes to consider a failed state
}
}
14 changes: 12 additions & 2 deletions pkg/util/k8sutil/probes_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ func TestCreate(t *testing.T) {
secret := "the secret"

// http
config := HTTPProbeConfig{path, false, secret, 0}
config := HTTPProbeConfig{path, false, secret, 0, 0, 0, 0, 0, 0}
probe := config.Create()

assert.Equal(t, probe.InitialDelaySeconds, int32(30))
Expand All @@ -50,8 +50,18 @@ func TestCreate(t *testing.T) {
assert.Equal(t, probe.Handler.HTTPGet.Scheme, v1.URISchemeHTTP)

// https
config = HTTPProbeConfig{path, true, secret, 0}
config = HTTPProbeConfig{path, true, secret, 0, 0, 0, 0, 0, 0}
probe = config.Create()

assert.Equal(t, probe.Handler.HTTPGet.Scheme, v1.URISchemeHTTPS)

// http, custom timing
config = HTTPProbeConfig{path, false, secret, 0, 1, 2, 3, 4, 5}
probe = config.Create()

assert.Equal(t, probe.InitialDelaySeconds, int32(1))
assert.Equal(t, probe.TimeoutSeconds, int32(2))
assert.Equal(t, probe.PeriodSeconds, int32(3))
assert.Equal(t, probe.SuccessThreshold, int32(4))
assert.Equal(t, probe.FailureThreshold, int32(5))
}
10 changes: 6 additions & 4 deletions pkg/util/k8sutil/services.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@
package k8sutil

import (
"strconv"

"k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
Expand Down Expand Up @@ -67,7 +69,7 @@ func CreateHeadlessService(kubecli kubernetes.Interface, deployment metav1.Objec
Port: ArangoPort,
},
}
publishNotReadyAddresses := false
publishNotReadyAddresses := true
serviceType := v1.ServiceTypeClusterIP
newlyCreated, err := createService(kubecli, svcName, deploymentName, deployment.GetNamespace(), ClusterIPNone, "", serviceType, ports, "", publishNotReadyAddresses, owner)
if err != nil {
Expand Down Expand Up @@ -96,8 +98,8 @@ func CreateDatabaseClientService(kubecli kubernetes.Interface, deployment metav1
} else {
role = "coordinator"
}
publishNotReadyAddresses := true
serviceType := v1.ServiceTypeClusterIP
publishNotReadyAddresses := false
newlyCreated, err := createService(kubecli, svcName, deploymentName, deployment.GetNamespace(), "", role, serviceType, ports, "", publishNotReadyAddresses, owner)
if err != nil {
return "", false, maskAny(err)
Expand All @@ -119,7 +121,7 @@ func CreateExternalAccessService(kubecli kubernetes.Interface, svcName, role str
NodePort: int32(nodePort),
},
}
publishNotReadyAddresses := true
publishNotReadyAddresses := false
newlyCreated, err := createService(kubecli, svcName, deploymentName, deployment.GetNamespace(), "", role, serviceType, ports, loadBalancerIP, publishNotReadyAddresses, owner)
if err != nil {
return "", false, maskAny(err)
Expand All @@ -142,7 +144,7 @@ func createService(kubecli kubernetes.Interface, svcName, deploymentName, ns, cl
// This annotation is deprecated, PublishNotReadyAddresses is
// used instead. We leave the annotation in for a while.
// See https://github.com/kubernetes/kubernetes/pull/49061
TolerateUnreadyEndpointsAnnotation: "true",
TolerateUnreadyEndpointsAnnotation: strconv.FormatBool(publishNotReadyAddresses),
},
},
Spec: v1.ServiceSpec{
Expand Down