diff --git a/docs/api-references/docs.md b/docs/api-references/docs.md
index d876f72ea62..3d876b85708 100644
--- a/docs/api-references/docs.md
+++ b/docs/api-references/docs.md
@@ -14521,6 +14521,22 @@ string
Defaults to Kubernetes default storage class.
+
+
+gracefulShutdownTimeout
+
+
+Kubernetes meta/v1.Duration
+
+
+ |
+
+(Optional)
+ GracefulShutdownTimeout is the timeout of gracefully shutdown a TiCDC pod.
+Encoded in the format of Go Duration.
+Defaults to 10m
+ |
+
TiCDCStatus
diff --git a/manifests/crd.yaml b/manifests/crd.yaml
index 5182d1d3fb2..5ed02618968 100644
--- a/manifests/crd.yaml
+++ b/manifests/crd.yaml
@@ -21443,6 +21443,8 @@ spec:
type: object
type: object
type: array
+ gracefulShutdownTimeout:
+ type: string
hostNetwork:
type: boolean
image:
diff --git a/manifests/crd/v1/pingcap.com_tidbclusters.yaml b/manifests/crd/v1/pingcap.com_tidbclusters.yaml
index 1c76f0daee3..eb842958286 100644
--- a/manifests/crd/v1/pingcap.com_tidbclusters.yaml
+++ b/manifests/crd/v1/pingcap.com_tidbclusters.yaml
@@ -9134,6 +9134,8 @@ spec:
type: object
type: object
type: array
+ gracefulShutdownTimeout:
+ type: string
hostNetwork:
type: boolean
image:
diff --git a/manifests/crd/v1beta1/pingcap.com_tidbclusters.yaml b/manifests/crd/v1beta1/pingcap.com_tidbclusters.yaml
index da00f2d11f2..1c95047e6fa 100644
--- a/manifests/crd/v1beta1/pingcap.com_tidbclusters.yaml
+++ b/manifests/crd/v1beta1/pingcap.com_tidbclusters.yaml
@@ -9122,6 +9122,8 @@ spec:
type: object
type: object
type: array
+ gracefulShutdownTimeout:
+ type: string
hostNetwork:
type: boolean
image:
diff --git a/manifests/crd_v1beta1.yaml b/manifests/crd_v1beta1.yaml
index 4cc98d753bb..e4dee0205b6 100644
--- a/manifests/crd_v1beta1.yaml
+++ b/manifests/crd_v1beta1.yaml
@@ -21429,6 +21429,8 @@ spec:
type: object
type: object
type: array
+ gracefulShutdownTimeout:
+ type: string
hostNetwork:
type: boolean
image:
diff --git a/pkg/apis/pingcap/v1alpha1/openapi_generated.go b/pkg/apis/pingcap/v1alpha1/openapi_generated.go
index ede8ed7d186..ccb78ed6d1a 100644
--- a/pkg/apis/pingcap/v1alpha1/openapi_generated.go
+++ b/pkg/apis/pingcap/v1alpha1/openapi_generated.go
@@ -7640,12 +7640,18 @@ func schema_pkg_apis_pingcap_v1alpha1_TiCDCSpec(ref common.ReferenceCallback) co
Format: "",
},
},
+ "gracefulShutdownTimeout": {
+ SchemaProps: spec.SchemaProps{
+ Description: "GracefulShutdownTimeout is the timeout of gracefully shutdown a TiCDC pod. Encoded in the format of Go Duration. Defaults to 10m",
+ Ref: ref("k8s.io/apimachinery/pkg/apis/meta/v1.Duration"),
+ },
+ },
},
Required: []string{"replicas"},
},
},
Dependencies: []string{
- "github.com/pingcap/tidb-operator/pkg/apis/pingcap/v1alpha1.CDCConfigWraper", "github.com/pingcap/tidb-operator/pkg/apis/pingcap/v1alpha1.StorageVolume", "github.com/pingcap/tidb-operator/pkg/apis/pingcap/v1alpha1.SuspendAction", "github.com/pingcap/tidb-operator/pkg/apis/pingcap/v1alpha1.TopologySpreadConstraint", "k8s.io/api/core/v1.Affinity", "k8s.io/api/core/v1.Container", "k8s.io/api/core/v1.EnvFromSource", "k8s.io/api/core/v1.EnvVar", "k8s.io/api/core/v1.LocalObjectReference", "k8s.io/api/core/v1.PodDNSConfig", "k8s.io/api/core/v1.PodSecurityContext", "k8s.io/api/core/v1.Toleration", "k8s.io/api/core/v1.Volume", "k8s.io/api/core/v1.VolumeMount", "k8s.io/apimachinery/pkg/api/resource.Quantity"},
+ "github.com/pingcap/tidb-operator/pkg/apis/pingcap/v1alpha1.CDCConfigWraper", "github.com/pingcap/tidb-operator/pkg/apis/pingcap/v1alpha1.StorageVolume", "github.com/pingcap/tidb-operator/pkg/apis/pingcap/v1alpha1.SuspendAction", "github.com/pingcap/tidb-operator/pkg/apis/pingcap/v1alpha1.TopologySpreadConstraint", "k8s.io/api/core/v1.Affinity", "k8s.io/api/core/v1.Container", "k8s.io/api/core/v1.EnvFromSource", "k8s.io/api/core/v1.EnvVar", "k8s.io/api/core/v1.LocalObjectReference", "k8s.io/api/core/v1.PodDNSConfig", "k8s.io/api/core/v1.PodSecurityContext", "k8s.io/api/core/v1.Toleration", "k8s.io/api/core/v1.Volume", "k8s.io/api/core/v1.VolumeMount", "k8s.io/apimachinery/pkg/api/resource.Quantity", "k8s.io/apimachinery/pkg/apis/meta/v1.Duration"},
}
}
diff --git a/pkg/apis/pingcap/v1alpha1/tidbcluster.go b/pkg/apis/pingcap/v1alpha1/tidbcluster.go
index 1f819963971..66e1871a134 100644
--- a/pkg/apis/pingcap/v1alpha1/tidbcluster.go
+++ b/pkg/apis/pingcap/v1alpha1/tidbcluster.go
@@ -38,6 +38,9 @@ const (
defaultEnablePVReclaim = false
// defaultEvictLeaderTimeout is the timeout limit of evict leader
defaultEvictLeaderTimeout = 1500 * time.Minute
+ // defaultTiCDCGracefulShutdownTimeout is the timeout limit of graceful
+ // shutdown a TiCDC pod.
+ defaultTiCDCGracefulShutdownTimeout = 10 * time.Minute
)
var (
@@ -194,6 +197,14 @@ func (tc *TidbCluster) TiFlashVersion() string {
return "latest"
}
+func (tc *TidbCluster) TiFlashContainerPrivilege() *bool {
+ if tc.Spec.TiFlash == nil || tc.Spec.TiFlash.Privileged == nil {
+ pri := false
+ return &pri
+ }
+ return tc.Spec.TiFlash.Privileged
+}
+
// TiCDCImage return the image used by TiCDC.
//
// If TiCDC isn't specified, return empty string.
@@ -219,12 +230,13 @@ func (tc *TidbCluster) TiCDCImage() string {
return image
}
-func (tc *TidbCluster) TiFlashContainerPrivilege() *bool {
- if tc.Spec.TiFlash == nil || tc.Spec.TiFlash.Privileged == nil {
- pri := false
- return &pri
+// TiCDCGracefulShutdownTimeout returns the timeout of gracefully shutdown
+// a TiCDC pod.
+func (tc *TidbCluster) TiCDCGracefulShutdownTimeout() time.Duration {
+ if tc.Spec.TiCDC != nil && tc.Spec.TiCDC.GracefulShutdownTimeout != nil {
+ return tc.Spec.TiCDC.GracefulShutdownTimeout.Duration
}
- return tc.Spec.TiFlash.Privileged
+ return defaultTiCDCGracefulShutdownTimeout
}
// TiDBImage return the image used by TiDB.
diff --git a/pkg/apis/pingcap/v1alpha1/tidbcluster_test.go b/pkg/apis/pingcap/v1alpha1/tidbcluster_test.go
index ccd8cb79eb6..2aceecb9715 100644
--- a/pkg/apis/pingcap/v1alpha1/tidbcluster_test.go
+++ b/pkg/apis/pingcap/v1alpha1/tidbcluster_test.go
@@ -15,6 +15,7 @@ package v1alpha1
import (
"testing"
+ "time"
. "github.com/onsi/gomega"
apps "k8s.io/api/apps/v1"
@@ -672,6 +673,19 @@ func TestPDVersion(t *testing.T) {
}
}
+func TestTiCDCGracefulShutdownTimeout(t *testing.T) {
+ g := NewGomegaWithT(t)
+
+ tc := newTidbCluster()
+ g.Expect(tc.TiCDCGracefulShutdownTimeout()).To(Equal(defaultTiCDCGracefulShutdownTimeout))
+
+ tc.Spec.TiCDC = &TiCDCSpec{GracefulShutdownTimeout: nil}
+ g.Expect(tc.TiCDCGracefulShutdownTimeout()).To(Equal(defaultTiCDCGracefulShutdownTimeout))
+
+ tc.Spec.TiCDC = &TiCDCSpec{GracefulShutdownTimeout: &metav1.Duration{Duration: time.Minute}}
+ g.Expect(tc.TiCDCGracefulShutdownTimeout()).To(Equal(time.Minute))
+}
+
func TestComponentFunc(t *testing.T) {
t.Run("ComponentIsNormal", func(t *testing.T) {
g := NewGomegaWithT(t)
diff --git a/pkg/apis/pingcap/v1alpha1/types.go b/pkg/apis/pingcap/v1alpha1/types.go
index 5c0371efc8c..e7362282400 100644
--- a/pkg/apis/pingcap/v1alpha1/types.go
+++ b/pkg/apis/pingcap/v1alpha1/types.go
@@ -680,6 +680,12 @@ type TiCDCSpec struct {
// Defaults to Kubernetes default storage class.
// +optional
StorageClassName *string `json:"storageClassName,omitempty"`
+
+ // GracefulShutdownTimeout is the timeout of gracefully shutdown a TiCDC pod.
+ // Encoded in the format of Go Duration.
+ // Defaults to 10m
+ // +optional
+ GracefulShutdownTimeout *metav1.Duration `json:"gracefulShutdownTimeout,omitempty"`
}
// TiCDCConfig is the configuration of tidbcdc
diff --git a/pkg/apis/pingcap/v1alpha1/zz_generated.deepcopy.go b/pkg/apis/pingcap/v1alpha1/zz_generated.deepcopy.go
index 0173d38008f..48b79a94e57 100644
--- a/pkg/apis/pingcap/v1alpha1/zz_generated.deepcopy.go
+++ b/pkg/apis/pingcap/v1alpha1/zz_generated.deepcopy.go
@@ -5281,6 +5281,11 @@ func (in *TiCDCSpec) DeepCopyInto(out *TiCDCSpec) {
*out = new(string)
**out = **in
}
+ if in.GracefulShutdownTimeout != nil {
+ in, out := &in.GracefulShutdownTimeout, &out.GracefulShutdownTimeout
+ *out = new(metav1.Duration)
+ **out = **in
+ }
return
}
diff --git a/pkg/manager/member/ticdc_scaler.go b/pkg/manager/member/ticdc_scaler.go
index ce654757f81..9450ff8a6a8 100644
--- a/pkg/manager/member/ticdc_scaler.go
+++ b/pkg/manager/member/ticdc_scaler.go
@@ -162,9 +162,6 @@ func gracefulShutdownTiCDC(
return nil
}
-// TODO: support configurable graceful shutdown timeout.
-var ticdcGracefulShutdownTimeout time.Duration = 10 * time.Second
-
func checkTiCDCGracefulShutdownTimeout(
tc *v1alpha1.TidbCluster,
podCtl controller.PodControlInterface,
@@ -186,7 +183,7 @@ func checkTiCDCGracefulShutdownTimeout(
return true, nil
}
- gracefulShutdownTimeout := ticdcGracefulShutdownTimeout
+ gracefulShutdownTimeout := tc.TiCDCGracefulShutdownTimeout()
if time.Now().After(beginTime.Add(gracefulShutdownTimeout)) {
klog.Infof("ticdc.%s: graceful shutdown timeout (threshold: %v) for Pod %s in cluster %s/%s",
action, gracefulShutdownTimeout, podName, ns, tc.GetName())
diff --git a/pkg/manager/member/ticdc_scaler_test.go b/pkg/manager/member/ticdc_scaler_test.go
index 7841a5c9b35..3cc32ac9f95 100644
--- a/pkg/manager/member/ticdc_scaler_test.go
+++ b/pkg/manager/member/ticdc_scaler_test.go
@@ -350,6 +350,7 @@ func TestTiCDCGracefulShutdown(t *testing.T) {
tc := newTidbClusterForPD()
tc.Spec.TiCDC = &v1alpha1.TiCDCSpec{}
+ ticdcGracefulShutdownTimeout := tc.TiCDCGracefulShutdownTimeout()
newPod := func() *corev1.Pod {
return &corev1.Pod{
TypeMeta: metav1.TypeMeta{Kind: "Pod", APIVersion: "v1"},
diff --git a/pkg/manager/member/ticdc_upgrader.go b/pkg/manager/member/ticdc_upgrader.go
index 8b6a53e802c..646b8a25561 100644
--- a/pkg/manager/member/ticdc_upgrader.go
+++ b/pkg/manager/member/ticdc_upgrader.go
@@ -87,9 +87,9 @@ func (u *ticdcUpgrader) Upgrade(tc *v1alpha1.TidbCluster, oldSet *apps.StatefulS
mngerutils.SetUpgradePartition(newSet, *oldSet.Spec.UpdateStrategy.RollingUpdate.Partition)
podOrdinals := helper.GetPodOrdinals(*oldSet.Spec.Replicas, oldSet).List()
- for _i := len(podOrdinals) - 1; _i >= 0; _i-- {
- i := podOrdinals[_i]
- podName := ticdcPodName(tcName, i)
+ for i := len(podOrdinals) - 1; i >= 0; i-- {
+ ordinal := podOrdinals[i]
+ podName := ticdcPodName(tcName, ordinal)
pod, err := u.deps.PodLister.Pods(ns).Get(podName)
if err != nil {
return fmt.Errorf("ticdcUpgrader.Upgrade: failed to get pod %s for cluster %s/%s, error: %s", podName, ns, tcName, err)
@@ -108,7 +108,14 @@ func (u *ticdcUpgrader) Upgrade(tc *v1alpha1.TidbCluster, oldSet *apps.StatefulS
}
continue
}
- mngerutils.SetUpgradePartition(newSet, i)
+
+ err = gracefulShutdownTiCDC(tc, u.deps.CDCControl, u.deps.PodControl, pod, ordinal, "Upgrade")
+ if err != nil {
+ return err
+ }
+ klog.Infof("ticdcUpgrade.Upgrade: %s has graceful shutdown in cluster %s/%s", podName, tc.GetNamespace(), tc.GetName())
+
+ mngerutils.SetUpgradePartition(newSet, ordinal)
return nil
}
diff --git a/pkg/manager/member/ticdc_upgrader_test.go b/pkg/manager/member/ticdc_upgrader_test.go
index e449b590424..2227adfa630 100644
--- a/pkg/manager/member/ticdc_upgrader_test.go
+++ b/pkg/manager/member/ticdc_upgrader_test.go
@@ -34,14 +34,15 @@ func TestTiCDCUpgrader_Upgrade(t *testing.T) {
g := NewGomegaWithT(t)
type testcase struct {
- name string
- changeFn func(*v1alpha1.TidbCluster)
- invalidPod bool
- changePods func(pods []*corev1.Pod)
- missPod bool
- errorExpect bool
- changeOldSet func(set *apps.StatefulSet)
- expectFn func(g *GomegaWithT, tc *v1alpha1.TidbCluster, newSet *apps.StatefulSet)
+ name string
+ changeFn func(*v1alpha1.TidbCluster)
+ invalidPod bool
+ changePods func(pods []*corev1.Pod)
+ missPod bool
+ errorExpect bool
+ changeOldSet func(set *apps.StatefulSet)
+ changeUpgrader func(u *ticdcUpgrader)
+ expectFn func(g *GomegaWithT, tc *v1alpha1.TidbCluster, newSet *apps.StatefulSet)
}
testFn := func(test *testcase, t *testing.T) {
@@ -61,6 +62,9 @@ func TestTiCDCUpgrader_Upgrade(t *testing.T) {
if test.changePods != nil {
test.changePods(pods)
}
+ if test.changeUpgrader != nil {
+ test.changeUpgrader(upgrader.(*ticdcUpgrader))
+ }
for _, pod := range pods {
podInformer.Informer().GetIndexer().Add(pod)
}
@@ -89,6 +93,22 @@ func TestTiCDCUpgrader_Upgrade(t *testing.T) {
g.Expect(newSet.Spec.UpdateStrategy.RollingUpdate.Partition).To(Equal(pointer.Int32Ptr(0)))
},
},
+ {
+ name: "graceful upgrade retry",
+ errorExpect: true,
+ changeUpgrader: func(u *ticdcUpgrader) {
+ u.deps.CDCControl = &cdcCtlMock{
+ // resignOwner returns false to let graceful shutdown retry.
+ resignOwner: func(tc *v1alpha1.TidbCluster, ordinal int32) (ok bool, err error) {
+ return false, nil
+ },
+ }
+ },
+ expectFn: func(g *GomegaWithT, tc *v1alpha1.TidbCluster, newSet *apps.StatefulSet) {
+ g.Expect(tc.Status.TiCDC.Phase).To(Equal(v1alpha1.UpgradePhase))
+ g.Expect(newSet.Spec.UpdateStrategy.RollingUpdate.Partition).To(Equal(pointer.Int32Ptr(1)))
+ },
+ },
{
name: "normal with pod notReady",
changePods: func(pods []*corev1.Pod) {