Skip to content

Commit

Permalink
upgrade: refactor to a semi state-machine
Browse files Browse the repository at this point in the history
  • Loading branch information
rkojedzinszky committed Mar 16, 2024
1 parent 9b434dd commit 6b6b0aa
Show file tree
Hide file tree
Showing 10 changed files with 272 additions and 110 deletions.
6 changes: 3 additions & 3 deletions .github/workflows/container-image.yml
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,9 @@ jobs:
GOARCH=arm go build -ldflags -s -o patroni-postgres-operator.arm .
GOARCH=arm64 go build -ldflags -s -o patroni-postgres-operator.arm64 .
GOARCH=amd64 go build -ldflags -s -o helper.amd64 ./cmd/helper/
GOARCH=arm go build -ldflags -s -o helper.arm ./cmd/helper/
GOARCH=arm64 go build -ldflags -s -o helper.arm64 ./cmd/helper/
GOARCH=amd64 go build -ldflags -s -o upgrade.amd64 ./cmd/upgrade/
GOARCH=arm go build -ldflags -s -o upgrade.arm ./cmd/upgrade/
GOARCH=arm64 go build -ldflags -s -o upgrade.arm64 ./cmd/upgrade/
- name: Setup qemu
uses: docker/setup-qemu-action@v2
Expand Down
2 changes: 1 addition & 1 deletion controllers/patronipostgres_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ func (r *PatroniPostgresReconciler) Reconcile(ctx context.Context, req ctrl.Requ

// handle upgrade
if instance.Status.UpgradeVersion != 0 {
return upgrade.Do(wctx, instance)
return upgrade.Handle(wctx, instance)
}

if instance.Status.State == v1alpha1.PatroniPostgresStateReady {
Expand Down
16 changes: 12 additions & 4 deletions private/controllers/statefulset/statefulset.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,8 +77,7 @@ var (

// +kubebuilder:rbac:groups="apps",resources=statefulsets,verbs=get;list;watch;create;update;delete

func Reconcile(ctx context.Context, p *v1alpha1.PatroniPostgres) (err error) {
var sts *appsv1.StatefulSet
func ReconcileSts(ctx context.Context, p *v1alpha1.PatroniPostgres) (sts *appsv1.StatefulSet, err error) {
var create bool

sts, err = GetK8SStatefulSet(ctx, p)
Expand All @@ -105,7 +104,7 @@ func Reconcile(ctx context.Context, p *v1alpha1.PatroniPostgres) (err error) {
podLabels := ctx.CommonLabels()
labelsBytes, err := json.Marshal(podLabels)
if err != nil {
return err
return
}
labelsString := string(labelsBytes)

Expand Down Expand Up @@ -309,13 +308,22 @@ func Reconcile(ctx context.Context, p *v1alpha1.PatroniPostgres) (err error) {

p.Status.Ready = sts.Status.ReadyReplicas

return
}

func Reconcile(ctx context.Context, p *v1alpha1.PatroniPostgres) (err error) {
sts, err := ReconcileSts(ctx, p)
if err != nil {
return
}

if int(sts.Status.ReadyReplicas) == len(p.Spec.Nodes) {
p.Status.State = v1alpha1.PatroniPostgresStateReady
} else {
p.Status.State = v1alpha1.PatroniPostgresStateScaling
}

return err
return
}

func GetK8SStatefulSet(ctx context.Context, p *v1alpha1.PatroniPostgres) (sts *appsv1.StatefulSet, err error) {
Expand Down
9 changes: 6 additions & 3 deletions private/upgrade/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,17 +32,20 @@ import (
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
ctrl "sigs.k8s.io/controller-runtime"

"github.com/k-web-s/patroni-postgres-operator/api/v1alpha1"
pcontext "github.com/k-web-s/patroni-postgres-operator/private/context"
"github.com/k-web-s/patroni-postgres-operator/private/controllers/secret"
"github.com/k-web-s/patroni-postgres-operator/private/controllers/statefulset"
)

const (
upgradeModeEnvVar = "MODE"
)

// +kubebuilder:rbac:groups=batch,resources=jobs,verbs=get;list;watch;create;delete

func createUpgradeJob(ctx pcontext.Context, p *v1alpha1.PatroniPostgres, mode string) (ret ctrl.Result, err error) {
func createUpgradeJob(ctx pcontext.Context, p *v1alpha1.PatroniPostgres, mode string) (err error) {
var activeDeadlineSeconds int64 = 60
var completions int32 = 1

Expand Down Expand Up @@ -85,7 +88,7 @@ func createUpgradeJob(ctx pcontext.Context, p *v1alpha1.PatroniPostgres, mode st
Value: "postgres",
},
{
Name: helperModeEnvVar,
Name: upgradeModeEnvVar,
Value: mode,
},
},
Expand Down
42 changes: 34 additions & 8 deletions private/upgrade/postupgrade.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,17 +26,37 @@ OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
package upgrade

import (
"fmt"

"github.com/k-web-s/patroni-postgres-operator/api/v1alpha1"
batchv1 "k8s.io/api/batch/v1"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"

pcontext "github.com/k-web-s/patroni-postgres-operator/private/context"
"github.com/k-web-s/patroni-postgres-operator/private/controllers/statefulset"
"github.com/k-web-s/patroni-postgres-operator/private/upgrade/postupgrade"
)

func handlePostupgrade(ctx pcontext.Context, p *v1alpha1.PatroniPostgres) (ret ctrl.Result, err error) {
var (
errPostupgradeJobFailed = fmt.Errorf("preupgrade job failed")
)

type postupgradeHandler struct {
}

func (postupgradeHandler) name() v1alpha1.PatroniPostgresState {
return v1alpha1.PatroniPostgresStateUpgradePostupgrade
}

func (postupgradeHandler) handle(ctx pcontext.Context, p *v1alpha1.PatroniPostgres) (done bool, err error) {
// Ensure cluster is up & running
if _, err = statefulset.ReconcileSts(ctx, p); err != nil {
return
}

job := &batchv1.Job{}
jobname := postupgradeJobname(p)

Expand All @@ -46,19 +66,25 @@ func handlePostupgrade(ctx pcontext.Context, p *v1alpha1.PatroniPostgres) (ret c
return
}

ret, err = createUpgradeJob(ctx, p, postupgrade.ModeString)
err = createUpgradeJob(ctx, p, postupgrade.ModeString)

return
}

if job.Status.Succeeded > 0 {
done = true
}

if job.Status.Succeeded+job.Status.Failed > 0 {
if job.Status.Succeeded > 0 {
p.Status.State = v1alpha1.PatroniPostgresStateReady
p.Status.UpgradeVersion = 0
deletePropagationPolicy := metav1.DeletePropagationForeground

if err = ctx.Delete(ctx, job, &client.DeleteOptions{PropagationPolicy: &deletePropagationPolicy}); err != nil {
return
}
}

err = ctx.Delete(ctx, job)
ret.Requeue = true
if job.Status.Failed > 0 {
err = errPostupgradeJobFailed
}

return
Expand Down
40 changes: 30 additions & 10 deletions private/upgrade/preupgrade.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,17 +26,36 @@ OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
package upgrade

import (
"github.com/k-web-s/patroni-postgres-operator/api/v1alpha1"
"fmt"

batchv1 "k8s.io/api/batch/v1"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/types"
ctrl "sigs.k8s.io/controller-runtime"

"github.com/k-web-s/patroni-postgres-operator/api/v1alpha1"
pcontext "github.com/k-web-s/patroni-postgres-operator/private/context"
"github.com/k-web-s/patroni-postgres-operator/private/controllers/statefulset"
"github.com/k-web-s/patroni-postgres-operator/private/upgrade/preupgrade"
)

func checkPreupgradeJob(ctx pcontext.Context, p *v1alpha1.PatroniPostgres) (ready bool, ret ctrl.Result, err error) {
var (
errPreupgradeJobFailed = fmt.Errorf("preupgrade job failed")
)

type preupgradeHandler struct {
}

func (preupgradeHandler) name() v1alpha1.PatroniPostgresState {
return v1alpha1.PatroniPostgresStateUpgradePreupgrade
}

func (preupgradeHandler) handle(ctx pcontext.Context, p *v1alpha1.PatroniPostgres) (done bool, err error) {
// Ensure cluster is up & running
if _, err = statefulset.ReconcileSts(ctx, p); err != nil {
return
}

// Create/handle preupgrade job
job := &batchv1.Job{}
jobname := preupgradeJobname(p)

Expand All @@ -46,18 +65,19 @@ func checkPreupgradeJob(ctx pcontext.Context, p *v1alpha1.PatroniPostgres) (read
return
}

ret, err = createUpgradeJob(ctx, p, preupgrade.ModeString)
err = createUpgradeJob(ctx, p, preupgrade.ModeString)

return
}

if job.Status.Succeeded+job.Status.Failed > 0 {
if job.Status.Succeeded > 0 {
ready = true
} else {
err = ctx.Delete(ctx, job)
ret.Requeue = true
if job.Status.Succeeded > 0 {
done = true
} else if job.Status.Failed > 0 {
if err = ctx.Delete(ctx, job); err != nil {
return
}

err = errPreupgradeJobFailed
}

return
Expand Down
70 changes: 46 additions & 24 deletions private/upgrade/primary.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/types"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"

"github.com/k-web-s/patroni-postgres-operator/api/v1alpha1"
Expand All @@ -52,17 +51,20 @@ import (
var (
//go:embed upgrade-scripts/primary-upgrade
primaryUpgrade string

errPrimaryUpgradeJobFailed = fmt.Errorf("primary upgrade job failed")
)

// +kubebuilder:rbac:groups=batch,resources=jobs,verbs=get;list;watch;create;delete
type primaryUpgradeHandler struct {
}

func upgradePrimary(ctx pcontext.Context, p *v1alpha1.PatroniPostgres) (ret ctrl.Result, err error) {
// preprocessJob must exist at this point
preprocessJob := &batchv1.Job{}
if err = ctx.Get(ctx, types.NamespacedName{Namespace: p.Namespace, Name: preupgradeJobname(p)}, preprocessJob); err != nil {
return
}
func (primaryUpgradeHandler) name() v1alpha1.PatroniPostgresState {
return v1alpha1.PatroniPostgresStateUpgradePrimary
}

// +kubebuilder:rbac:groups=batch,resources=jobs,verbs=get;list;watch;create;delete

func (primaryUpgradeHandler) handle(ctx pcontext.Context, p *v1alpha1.PatroniPostgres) (done bool, err error) {
job := &batchv1.Job{}
jobname := fmt.Sprintf("%s-up", p.Name)

Expand All @@ -72,6 +74,12 @@ func upgradePrimary(ctx pcontext.Context, p *v1alpha1.PatroniPostgres) (ret ctrl
return
}

// preprocessJob must exist at this point
preprocessJob := &batchv1.Job{}
if err = ctx.Get(ctx, types.NamespacedName{Namespace: p.Namespace, Name: preupgradeJobname(p)}, preprocessJob); err != nil {
return
}

var initdbArgs string
if initdbArgs, err = getInitdbArgsFromJob(ctx, preprocessJob); err != nil {
return
Expand Down Expand Up @@ -162,31 +170,45 @@ func upgradePrimary(ctx pcontext.Context, p *v1alpha1.PatroniPostgres) (ret ctrl
return
}

if job.Status.Succeeded+job.Status.Failed > 0 {
if job.Status.Succeeded > 0 {
var dbid string
if dbid, err = getDBIDFromJob(ctx, job); err != nil {
return
}

if err = configmap.SetDBId(ctx, p, dbid); err != nil {
return
}
// handle success
if job.Status.Succeeded > 0 {
var dbid string
if dbid, err = getDBIDFromJob(ctx, job); err != nil {
return
}

p.Status.Version = p.Status.UpgradeVersion
p.Status.State = v1alpha1.PatroniPostgresStateUpgradeSecondaries
if err = configmap.SetDBId(ctx, p, dbid); err != nil {
return
}

propagationPolicy := metav1.DeletePropagationBackground
if err = ctx.Delete(ctx, preprocessJob, &client.DeleteOptions{PropagationPolicy: &propagationPolicy}); err != nil {
// delete preprocessJob if still exists
preprocessJob := &batchv1.Job{}
if err = ctx.Get(ctx, types.NamespacedName{Namespace: p.Namespace, Name: preupgradeJobname(p)}, preprocessJob); err == nil {
propagationPolicy := metav1.DeletePropagationBackground
if err = ctx.Delete(ctx, preprocessJob, &client.DeleteOptions{PropagationPolicy: &propagationPolicy}); err != nil {
return
}
} else if !errors.IsNotFound(err) {
return
}

if err = ctx.Delete(ctx, job, &client.DeleteOptions{PropagationPolicy: &propagationPolicy}); err != nil {
p.Status.Version = p.Status.UpgradeVersion

done = true
}

// cleanup job in case of success/fail
if job.Status.Succeeded+job.Status.Failed > 0 {
deletePropagationPolicy := metav1.DeletePropagationBackground

if err = ctx.Delete(ctx, job, &client.DeleteOptions{PropagationPolicy: &deletePropagationPolicy}); err != nil {
return
}
}

ret.Requeue = true
// handle failed case
if job.Status.Failed > 0 {
err = errPrimaryUpgradeJobFailed
}

return
Expand Down
Loading

0 comments on commit 6b6b0aa

Please sign in to comment.