Skip to content

Commit

Permalink
Merge branch 'release-1.3' into release-1.3-graceful-cdc
Browse files Browse the repository at this point in the history
  • Loading branch information
csuzhangxc authored Sep 9, 2022
2 parents 7c13a69 + da0f4ae commit 5d9fa00
Show file tree
Hide file tree
Showing 6 changed files with 171 additions and 6 deletions.
29 changes: 27 additions & 2 deletions pkg/manager/member/tidb_upgrader.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,17 +15,26 @@ package member

import (
"fmt"
"strconv"

"github.com/pingcap/tidb-operator/pkg/apis/pingcap/v1alpha1"
"github.com/pingcap/tidb-operator/pkg/controller"
mngerutils "github.com/pingcap/tidb-operator/pkg/manager/utils"

"github.com/pingcap/advanced-statefulset/client/apis/apps/v1/helper"
apps "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/klog/v2"
podutil "k8s.io/kubernetes/pkg/api/v1/pod"
)

const (
// TODO: change to use minReadySeconds in sts spec
// See https://kubernetes.io/blog/2021/08/27/minreadyseconds-statefulsets/
annoKeyTiDBMinReadySeconds = "tidb.pingcap.com/tidb-min-ready-seconds"
)

type tidbUpgrader struct {
deps *controller.Dependencies
}
Expand Down Expand Up @@ -85,6 +94,17 @@ func (u *tidbUpgrader) Upgrade(tc *v1alpha1.TidbCluster, oldSet *apps.StatefulSe
return nil
}

minReadySeconds := 0
s, ok := tc.Annotations[annoKeyTiDBMinReadySeconds]
if ok {
i, err := strconv.Atoi(s)
if err != nil {
klog.Warningf("tidbcluster: [%s/%s] annotation %s should be an integer: %v", ns, tcName, annoKeyTiDBMinReadySeconds, err)
} else {
minReadySeconds = i
}
}

mngerutils.SetUpgradePartition(newSet, *oldSet.Spec.UpdateStrategy.RollingUpdate.Partition)
podOrdinals := helper.GetPodOrdinals(*oldSet.Spec.Replicas, oldSet).List()
for _i := len(podOrdinals) - 1; _i >= 0; _i-- {
Expand All @@ -100,8 +120,13 @@ func (u *tidbUpgrader) Upgrade(tc *v1alpha1.TidbCluster, oldSet *apps.StatefulSe
}

if revision == tc.Status.TiDB.StatefulSet.UpdateRevision {
if !podutil.IsPodReady(pod) {
return controller.RequeueErrorf("tidbcluster: [%s/%s]'s upgraded tidb pod: [%s] is not ready", ns, tcName, podName)
if !podutil.IsPodAvailable(pod, int32(minReadySeconds), metav1.Now()) {
readyCond := podutil.GetPodReadyCondition(pod.Status)
if readyCond == nil || readyCond.Status != corev1.ConditionTrue {
return controller.RequeueErrorf("tidbcluster: [%s/%s]'s upgraded tidb pod: [%s] is not ready", ns, tcName, podName)

}
return controller.RequeueErrorf("tidbcluster: [%s/%s]'s upgraded tidb pod: [%s] is not available, last transition time is %v", ns, tcName, podName, readyCond.LastTransitionTime)
}
if member, exist := tc.Status.TiDB.Members[podName]; !exist || !member.Health {
return controller.RequeueErrorf("tidbcluster: [%s/%s]'s tidb upgraded pod: [%s] is not ready", ns, tcName, podName)
Expand Down
21 changes: 21 additions & 0 deletions pkg/manager/member/tidb_upgrader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -227,6 +227,27 @@ func TestTiDBUpgrader_Upgrade(t *testing.T) {
g.Expect(newSet.Spec.UpdateStrategy.RollingUpdate.Partition).To(Equal(pointer.Int32Ptr(1)))
},
},
{
name: "upgraded pod is ready but not available",
changePods: func(pods []*corev1.Pod) {
pods[1].Status.Conditions[0].LastTransitionTime = metav1.Now()
},
changeFn: func(tc *v1alpha1.TidbCluster) {
if tc.Annotations == nil {
tc.Annotations = map[string]string{}
}
// 5min is enough for unit test
tc.Annotations[annoKeyTiDBMinReadySeconds] = "300"
tc.Status.PD.Phase = v1alpha1.NormalPhase
tc.Status.TiKV.Phase = v1alpha1.NormalPhase
},
getLastAppliedConfigErr: false,
errorExpect: true,
expectFn: func(g *GomegaWithT, tc *v1alpha1.TidbCluster, newSet *apps.StatefulSet) {
g.Expect(tc.Status.TiDB.Phase).To(Equal(v1alpha1.UpgradePhase))
g.Expect(newSet.Spec.UpdateStrategy.RollingUpdate.Partition).To(Equal(pointer.Int32Ptr(1)))
},
},
}

for _, test := range tests {
Expand Down
29 changes: 27 additions & 2 deletions pkg/manager/member/tiflash_upgrader.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ package member

import (
"fmt"
"strconv"

"github.com/pingcap/tidb-operator/pkg/apis/pingcap/v1alpha1"
"github.com/pingcap/tidb-operator/pkg/controller"
Expand All @@ -24,10 +25,18 @@ import (

"github.com/pingcap/advanced-statefulset/client/apis/apps/v1/helper"
apps "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/klog/v2"
podutil "k8s.io/kubernetes/pkg/api/v1/pod"
)

const (
// TODO: change to use minReadySeconds in sts spec
// See https://kubernetes.io/blog/2021/08/27/minreadyseconds-statefulsets/
annoKeyTiFlashMinReadySeconds = "tidb.pingcap.com/tiflash-min-ready-seconds"
)

var (
// the first version that tiflash support `tiflash/store-status` api.
// https://github.com/pingcap/tidb-operator/issues/4159
Expand Down Expand Up @@ -85,6 +94,17 @@ func (u *tiflashUpgrader) Upgrade(tc *v1alpha1.TidbCluster, oldSet *apps.Statefu
return nil
}

minReadySeconds := 0
s, ok := tc.Annotations[annoKeyTiFlashMinReadySeconds]
if ok {
i, err := strconv.Atoi(s)
if err != nil {
klog.Warningf("tidbcluster: [%s/%s] annotation %s should be an integer: %v", ns, tcName, annoKeyTiFlashMinReadySeconds, err)
} else {
minReadySeconds = i
}
}

mngerutils.SetUpgradePartition(newSet, *oldSet.Spec.UpdateStrategy.RollingUpdate.Partition)
podOrdinals := helper.GetPodOrdinals(*oldSet.Spec.Replicas, oldSet).List()
for _i := len(podOrdinals) - 1; _i >= 0; _i-- {
Expand All @@ -105,8 +125,13 @@ func (u *tiflashUpgrader) Upgrade(tc *v1alpha1.TidbCluster, oldSet *apps.Statefu
}

if revision == tc.Status.TiFlash.StatefulSet.UpdateRevision {
if !podutil.IsPodReady(pod) {
return controller.RequeueErrorf("tidbcluster: [%s/%s]'s upgraded TiFlash pod: [%s] is not ready", ns, tcName, podName)
if !podutil.IsPodAvailable(pod, int32(minReadySeconds), metav1.Now()) {
readyCond := podutil.GetPodReadyCondition(pod.Status)
if readyCond == nil || readyCond.Status != corev1.ConditionTrue {
return controller.RequeueErrorf("tidbcluster: [%s/%s]'s upgraded tiflash pod: [%s] is not ready", ns, tcName, podName)

}
return controller.RequeueErrorf("tidbcluster: [%s/%s]'s upgraded TiFlash pod: [%s] is not available, last transition time is %v", ns, tcName, podName, readyCond.LastTransitionTime)
}
if store.State != v1alpha1.TiKVStateUp {
return controller.RequeueErrorf("tidbcluster: [%s/%s]'s upgraded TiFlash pod: [%s], store state is not UP", ns, tcName, podName)
Expand Down
36 changes: 36 additions & 0 deletions pkg/manager/member/tiflash_upgrader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -520,6 +520,42 @@ func TestTiFlashUpgraderUpgrade(t *testing.T) {
g.Expect(*newSet.Spec.UpdateStrategy.RollingUpdate.Partition).To(Equal(int32(3)))
},
},

{
name: "tiflash version latest and tiflash is running",
changeFn: func(tc *v1alpha1.TidbCluster, tiflashControl *tiflashapi.FakeTiFlashControl) {
tc.Status.PD.Phase = v1alpha1.NormalPhase
tc.Status.TiFlash.Phase = v1alpha1.NormalPhase
tc.Status.TiFlash.Synced = true
version := "latest"
tc.Spec.TiFlash.BaseImage = "base-image"
tc.Spec.TiFlash.Version = &version

if tc.Annotations == nil {
tc.Annotations = map[string]string{}
}
tc.Annotations[annoKeyTiFlashMinReadySeconds] = "300"

fakeClient := NewFakeTiKVClient(tiflashControl, tc, "upgrader-tiflash-2")
fakeClient.AddReaction(tiflashapi.GetStoreStatusActionType, func(action *tiflashapi.Action) (interface{}, error) {
return tiflashapi.Running, nil
})
},
changeOldSet: func(oldSet *apps.StatefulSet) { // tigger upgrade
mngerutils.SetStatefulSetLastAppliedConfigAnnotation(oldSet)
oldSet.Spec.Template.Spec.Containers[0].Image = "old-image"
},
changePods: func(pods []*corev1.Pod, tc *v1alpha1.TidbCluster, old, new *apps.StatefulSet) {
pods[2].Status.Conditions[0].LastTransitionTime = metav1.Now()
},
updatePodErr: false,
errExpectFn: func(g *GomegaWithT, err error) {
g.Expect(err).NotTo(HaveOccurred())
},
expectFn: func(g *GomegaWithT, tc *v1alpha1.TidbCluster, newSet *apps.StatefulSet, pods map[string]*corev1.Pod) {
g.Expect(*newSet.Spec.UpdateStrategy.RollingUpdate.Partition).To(Equal(int32(2)))
},
},
}

for _, test := range tests {
Expand Down
25 changes: 23 additions & 2 deletions pkg/manager/member/tikv_upgrader.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"time"

"github.com/pingcap/advanced-statefulset/client/apis/apps/v1/helper"

"github.com/pingcap/tidb-operator/pkg/apis/pingcap/v1alpha1"
"github.com/pingcap/tidb-operator/pkg/controller"
mngerutils "github.com/pingcap/tidb-operator/pkg/manager/utils"
Expand All @@ -34,6 +35,10 @@ import (
const (
// EvictLeaderBeginTime is the key of evict Leader begin time
EvictLeaderBeginTime = "evictLeaderBeginTime"

// TODO: change to use minReadySeconds in sts spec
// See https://kubernetes.io/blog/2021/08/27/minreadyseconds-statefulsets/
annoKeyTiKVMinReadySeconds = "tidb.pingcap.com/tikv-min-ready-seconds"
)

type TiKVUpgrader interface {
Expand Down Expand Up @@ -106,6 +111,17 @@ func (u *tikvUpgrader) Upgrade(meta metav1.Object, oldSet *apps.StatefulSet, new
return nil
}

minReadySeconds := 0
s, ok := tc.Annotations[annoKeyTiKVMinReadySeconds]
if ok {
i, err := strconv.Atoi(s)
if err != nil {
klog.Warningf("tidbcluster: [%s/%s] annotation %s should be an integer: %v", ns, tcName, annoKeyTiKVMinReadySeconds, err)
} else {
minReadySeconds = i
}
}

mngerutils.SetUpgradePartition(newSet, *oldSet.Spec.UpdateStrategy.RollingUpdate.Partition)
podOrdinals := helper.GetPodOrdinals(*oldSet.Spec.Replicas, oldSet).List()
for _i := len(podOrdinals) - 1; _i >= 0; _i-- {
Expand All @@ -127,8 +143,13 @@ func (u *tikvUpgrader) Upgrade(meta metav1.Object, oldSet *apps.StatefulSet, new

if revision == status.StatefulSet.UpdateRevision {

if !podutil.IsPodReady(pod) {
return controller.RequeueErrorf("tidbcluster: [%s/%s]'s upgraded tikv pod: [%s] is not ready", ns, tcName, podName)
if !podutil.IsPodAvailable(pod, int32(minReadySeconds), metav1.Now()) {
readyCond := podutil.GetPodReadyCondition(pod.Status)
if readyCond == nil || readyCond.Status != corev1.ConditionTrue {
return controller.RequeueErrorf("tidbcluster: [%s/%s]'s upgraded tikv pod: [%s] is not ready", ns, tcName, podName)

}
return controller.RequeueErrorf("tidbcluster: [%s/%s]'s upgraded tikv pod: [%s] is not available, last transition time is %v", ns, tcName, podName, readyCond.LastTransitionTime)
}
if store.State != v1alpha1.TiKVStateUp {
return controller.RequeueErrorf("tidbcluster: [%s/%s]'s upgraded tikv pod: [%s] is not all ready", ns, tcName, podName)
Expand Down
37 changes: 37 additions & 0 deletions pkg/manager/member/tikv_upgrader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -700,6 +700,43 @@ func TestTiKVUpgraderUpgrade(t *testing.T) {
g.Expect(*newSet.Spec.UpdateStrategy.RollingUpdate.Partition).To(Equal(int32(2)))
},
},
{
name: "pod is ready but not available",
changeFn: func(tc *v1alpha1.TidbCluster) {
tc.Status.PD.Phase = v1alpha1.NormalPhase
tc.Status.TiKV.Phase = v1alpha1.UpgradePhase
tc.Status.TiKV.Synced = true
tc.Status.TiKV.StatefulSet.CurrentReplicas = 2
tc.Status.TiKV.StatefulSet.UpdatedReplicas = 1
if tc.Annotations == nil {
tc.Annotations = map[string]string{}
}
tc.Annotations[annoKeyTiKVMinReadySeconds] = "300"
},
changeOldSet: func(oldSet *apps.StatefulSet) {
mngerutils.SetStatefulSetLastAppliedConfigAnnotation(oldSet)
oldSet.Status.CurrentReplicas = 2
oldSet.Status.UpdatedReplicas = 1
oldSet.Spec.UpdateStrategy.RollingUpdate.Partition = pointer.Int32Ptr(2)
},
changePods: func(pods []*corev1.Pod) {
for _, pod := range pods {
if pod.GetName() == TikvPodName(upgradeTcName, 1) {
pod.Annotations = map[string]string{EvictLeaderBeginTime: time.Now().Format(time.RFC3339)}
pod.Status.Conditions[0].LastTransitionTime = metav1.Now()
}
}
},
beginEvictLeaderErr: false,
endEvictLeaderErr: false,
updatePodErr: false,
errExpectFn: func(g *GomegaWithT, err error) {
g.Expect(err).To(HaveOccurred())
},
expectFn: func(g *GomegaWithT, tc *v1alpha1.TidbCluster, newSet *apps.StatefulSet, pods map[string]*corev1.Pod) {
g.Expect(*newSet.Spec.UpdateStrategy.RollingUpdate.Partition).To(Equal(int32(2)))
},
},
}

for _, test := range tests {
Expand Down

0 comments on commit 5d9fa00

Please sign in to comment.