Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ticdc: support graceful upgrade TiCDC pods #4647

Merged
merged 10 commits into from
Aug 3, 2022
16 changes: 16 additions & 0 deletions docs/api-references/docs.md
Original file line number Diff line number Diff line change
Expand Up @@ -14521,6 +14521,22 @@ string
Defaults to Kubernetes default storage class.</p>
</td>
</tr>
<tr>
<td>
<code>gracefulShutdownTimeout</code></br>
<em>
<a href="https://godoc.org/k8s.io/apimachinery/pkg/apis/meta/v1#Duration">
Kubernetes meta/v1.Duration
</a>
</em>
</td>
<td>
<em>(Optional)</em>
<p>GracefulShutdownTimeout is the timeout of gracefully shutdown a TiCDC pod.
Encoded in the format of Go Duration.
Defaults to 10m</p>
</td>
</tr>
</tbody>
</table>
<h3 id="ticdcstatus">TiCDCStatus</h3>
Expand Down
2 changes: 2 additions & 0 deletions manifests/crd.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -21443,6 +21443,8 @@ spec:
type: object
type: object
type: array
gracefulShutdownTimeout:
type: string
hostNetwork:
type: boolean
image:
Expand Down
2 changes: 2 additions & 0 deletions manifests/crd/v1/pingcap.com_tidbclusters.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -9134,6 +9134,8 @@ spec:
type: object
type: object
type: array
gracefulShutdownTimeout:
type: string
hostNetwork:
type: boolean
image:
Expand Down
2 changes: 2 additions & 0 deletions manifests/crd/v1beta1/pingcap.com_tidbclusters.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -9122,6 +9122,8 @@ spec:
type: object
type: object
type: array
gracefulShutdownTimeout:
type: string
hostNetwork:
type: boolean
image:
Expand Down
2 changes: 2 additions & 0 deletions manifests/crd_v1beta1.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -21429,6 +21429,8 @@ spec:
type: object
type: object
type: array
gracefulShutdownTimeout:
type: string
hostNetwork:
type: boolean
image:
Expand Down
8 changes: 7 additions & 1 deletion pkg/apis/pingcap/v1alpha1/openapi_generated.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

22 changes: 17 additions & 5 deletions pkg/apis/pingcap/v1alpha1/tidbcluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,9 @@ const (
defaultEnablePVReclaim = false
// defaultEvictLeaderTimeout is the timeout limit of evict leader
defaultEvictLeaderTimeout = 1500 * time.Minute
// defaultTiCDCGracefulShutdownTimeout is the timeout limit of graceful
// shutdown a TiCDC pod.
defaultTiCDCGracefulShutdownTimeout = 10 * time.Minute
)

var (
Expand Down Expand Up @@ -194,6 +197,14 @@ func (tc *TidbCluster) TiFlashVersion() string {
return "latest"
}

func (tc *TidbCluster) TiFlashContainerPrivilege() *bool {
if tc.Spec.TiFlash == nil || tc.Spec.TiFlash.Privileged == nil {
pri := false
return &pri
}
return tc.Spec.TiFlash.Privileged
}

// TiCDCImage return the image used by TiCDC.
//
// If TiCDC isn't specified, return empty string.
Expand All @@ -219,12 +230,13 @@ func (tc *TidbCluster) TiCDCImage() string {
return image
}

func (tc *TidbCluster) TiFlashContainerPrivilege() *bool {
if tc.Spec.TiFlash == nil || tc.Spec.TiFlash.Privileged == nil {
pri := false
return &pri
// TiCDCGracefulShutdownTimeout returns the timeout of gracefully shutdown
// a TiCDC pod.
func (tc *TidbCluster) TiCDCGracefulShutdownTimeout() time.Duration {
if tc.Spec.TiCDC != nil && tc.Spec.TiCDC.GracefulShutdownTimeout != nil {
return tc.Spec.TiCDC.GracefulShutdownTimeout.Duration
}
return tc.Spec.TiFlash.Privileged
return defaultTiCDCGracefulShutdownTimeout
}

// TiDBImage return the image used by TiDB.
Expand Down
14 changes: 14 additions & 0 deletions pkg/apis/pingcap/v1alpha1/tidbcluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ package v1alpha1

import (
"testing"
"time"

. "github.com/onsi/gomega"
apps "k8s.io/api/apps/v1"
Expand Down Expand Up @@ -672,6 +673,19 @@ func TestPDVersion(t *testing.T) {
}
}

func TestTiCDCGracefulShutdownTimeout(t *testing.T) {
g := NewGomegaWithT(t)

tc := newTidbCluster()
g.Expect(tc.TiCDCGracefulShutdownTimeout()).To(Equal(defaultTiCDCGracefulShutdownTimeout))

tc.Spec.TiCDC = &TiCDCSpec{GracefulShutdownTimeout: nil}
g.Expect(tc.TiCDCGracefulShutdownTimeout()).To(Equal(defaultTiCDCGracefulShutdownTimeout))

tc.Spec.TiCDC = &TiCDCSpec{GracefulShutdownTimeout: &metav1.Duration{Duration: time.Minute}}
g.Expect(tc.TiCDCGracefulShutdownTimeout()).To(Equal(time.Minute))
}

func TestComponentFunc(t *testing.T) {
t.Run("ComponentIsNormal", func(t *testing.T) {
g := NewGomegaWithT(t)
Expand Down
6 changes: 6 additions & 0 deletions pkg/apis/pingcap/v1alpha1/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -680,6 +680,12 @@ type TiCDCSpec struct {
// Defaults to Kubernetes default storage class.
// +optional
StorageClassName *string `json:"storageClassName,omitempty"`

// GracefulShutdownTimeout is the timeout of gracefully shutdown a TiCDC pod.
// Encoded in the format of Go Duration.
// Defaults to 10m
// +optional
GracefulShutdownTimeout *metav1.Duration `json:"gracefulShutdownTimeout,omitempty"`
}

// TiCDCConfig is the configuration of tidbcdc
Expand Down
5 changes: 5 additions & 0 deletions pkg/apis/pingcap/v1alpha1/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 1 addition & 4 deletions pkg/manager/member/ticdc_scaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,9 +162,6 @@ func gracefulShutdownTiCDC(
return nil
}

// TODO: support configurable graceful shutdown timeout.
var ticdcGracefulShutdownTimeout time.Duration = 10 * time.Second

func checkTiCDCGracefulShutdownTimeout(
tc *v1alpha1.TidbCluster,
podCtl controller.PodControlInterface,
Expand All @@ -186,7 +183,7 @@ func checkTiCDCGracefulShutdownTimeout(
return true, nil
}

gracefulShutdownTimeout := ticdcGracefulShutdownTimeout
gracefulShutdownTimeout := tc.TiCDCGracefulShutdownTimeout()
if time.Now().After(beginTime.Add(gracefulShutdownTimeout)) {
klog.Infof("ticdc.%s: graceful shutdown timeout (threshold: %v) for Pod %s in cluster %s/%s",
action, gracefulShutdownTimeout, podName, ns, tc.GetName())
Expand Down
1 change: 1 addition & 0 deletions pkg/manager/member/ticdc_scaler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -350,6 +350,7 @@ func TestTiCDCGracefulShutdown(t *testing.T) {

tc := newTidbClusterForPD()
tc.Spec.TiCDC = &v1alpha1.TiCDCSpec{}
ticdcGracefulShutdownTimeout := tc.TiCDCGracefulShutdownTimeout()
newPod := func() *corev1.Pod {
return &corev1.Pod{
TypeMeta: metav1.TypeMeta{Kind: "Pod", APIVersion: "v1"},
Expand Down
15 changes: 11 additions & 4 deletions pkg/manager/member/ticdc_upgrader.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,9 +87,9 @@ func (u *ticdcUpgrader) Upgrade(tc *v1alpha1.TidbCluster, oldSet *apps.StatefulS

mngerutils.SetUpgradePartition(newSet, *oldSet.Spec.UpdateStrategy.RollingUpdate.Partition)
podOrdinals := helper.GetPodOrdinals(*oldSet.Spec.Replicas, oldSet).List()
for _i := len(podOrdinals) - 1; _i >= 0; _i-- {
i := podOrdinals[_i]
podName := ticdcPodName(tcName, i)
for i := len(podOrdinals) - 1; i >= 0; i-- {
ordinal := podOrdinals[i]
podName := ticdcPodName(tcName, ordinal)
pod, err := u.deps.PodLister.Pods(ns).Get(podName)
if err != nil {
return fmt.Errorf("ticdcUpgrader.Upgrade: failed to get pod %s for cluster %s/%s, error: %s", podName, ns, tcName, err)
Expand All @@ -108,7 +108,14 @@ func (u *ticdcUpgrader) Upgrade(tc *v1alpha1.TidbCluster, oldSet *apps.StatefulS
}
continue
}
mngerutils.SetUpgradePartition(newSet, i)

err = gracefulShutdownTiCDC(tc, u.deps.CDCControl, u.deps.PodControl, pod, ordinal, "Upgrade")
if err != nil {
return err
KanShiori marked this conversation as resolved.
Show resolved Hide resolved
}
KanShiori marked this conversation as resolved.
Show resolved Hide resolved
klog.Infof("ticdcUpgrade.Upgrade: %s has graceful shutdown in cluster %s/%s", podName, tc.GetNamespace(), tc.GetName())

mngerutils.SetUpgradePartition(newSet, ordinal)
return nil
}

Expand Down
36 changes: 28 additions & 8 deletions pkg/manager/member/ticdc_upgrader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,14 +34,15 @@ func TestTiCDCUpgrader_Upgrade(t *testing.T) {
g := NewGomegaWithT(t)

type testcase struct {
name string
changeFn func(*v1alpha1.TidbCluster)
invalidPod bool
changePods func(pods []*corev1.Pod)
missPod bool
errorExpect bool
changeOldSet func(set *apps.StatefulSet)
expectFn func(g *GomegaWithT, tc *v1alpha1.TidbCluster, newSet *apps.StatefulSet)
name string
changeFn func(*v1alpha1.TidbCluster)
invalidPod bool
changePods func(pods []*corev1.Pod)
missPod bool
errorExpect bool
changeOldSet func(set *apps.StatefulSet)
changeUpgrader func(u *ticdcUpgrader)
expectFn func(g *GomegaWithT, tc *v1alpha1.TidbCluster, newSet *apps.StatefulSet)
}

testFn := func(test *testcase, t *testing.T) {
Expand All @@ -61,6 +62,9 @@ func TestTiCDCUpgrader_Upgrade(t *testing.T) {
if test.changePods != nil {
test.changePods(pods)
}
if test.changeUpgrader != nil {
test.changeUpgrader(upgrader.(*ticdcUpgrader))
}
for _, pod := range pods {
podInformer.Informer().GetIndexer().Add(pod)
}
Expand Down Expand Up @@ -89,6 +93,22 @@ func TestTiCDCUpgrader_Upgrade(t *testing.T) {
g.Expect(newSet.Spec.UpdateStrategy.RollingUpdate.Partition).To(Equal(pointer.Int32Ptr(0)))
},
},
{
name: "graceful upgrade retry",
errorExpect: true,
changeUpgrader: func(u *ticdcUpgrader) {
u.deps.CDCControl = &cdcCtlMock{
// resignOwner returns false to let graceful shutdown retry.
resignOwner: func(tc *v1alpha1.TidbCluster, ordinal int32) (ok bool, err error) {
return false, nil
},
}
},
expectFn: func(g *GomegaWithT, tc *v1alpha1.TidbCluster, newSet *apps.StatefulSet) {
g.Expect(tc.Status.TiCDC.Phase).To(Equal(v1alpha1.UpgradePhase))
g.Expect(newSet.Spec.UpdateStrategy.RollingUpdate.Partition).To(Equal(pointer.Int32Ptr(1)))
},
},
{
name: "normal with pod notReady",
changePods: func(pods []*corev1.Pod) {
Expand Down