From 2d6ed3652c1184245be1e172b8d921a824abbdb2 Mon Sep 17 00:00:00 2001 From: Neil Shen Date: Mon, 11 Jul 2022 17:52:30 +0800 Subject: [PATCH 1/7] Support graceful scale in TiCDC cluster Signed-off-by: Neil Shen --- pkg/apis/pingcap/v1alpha1/tidbcluster.go | 37 ++- pkg/controller/ticdc_control.go | 195 ++++++++++- pkg/controller/ticdc_control_test.go | 311 ++++++++++++++++++ .../tidb_cluster_condition_updater.go | 3 + .../tidb_cluster_condition_updater_test.go | 68 ++++ .../tidbcluster/tidb_cluster_control.go | 2 + pkg/manager/member/ticdc_member_manager.go | 20 +- pkg/manager/member/ticdc_scaler.go | 37 ++- pkg/util/tidbcluster/tidbcluster.go | 2 + 9 files changed, 658 insertions(+), 17 deletions(-) create mode 100644 pkg/controller/ticdc_control_test.go diff --git a/pkg/apis/pingcap/v1alpha1/tidbcluster.go b/pkg/apis/pingcap/v1alpha1/tidbcluster.go index 3891c051e90..200e4ddb44c 100644 --- a/pkg/apis/pingcap/v1alpha1/tidbcluster.go +++ b/pkg/apis/pingcap/v1alpha1/tidbcluster.go @@ -552,14 +552,6 @@ func (tc *TidbCluster) TiFlashStsDesiredReplicas() int32 { return tc.Spec.TiFlash.Replicas + int32(len(tc.Status.TiFlash.FailureStores)) } -func (tc *TidbCluster) TiCDCDeployDesiredReplicas() int32 { - if tc.Spec.TiCDC == nil { - return 0 - } - - return tc.Spec.TiCDC.Replicas -} - func (tc *TidbCluster) TiFlashStsActualReplicas() int32 { stsStatus := tc.Status.TiFlash.StatefulSet if stsStatus == nil { @@ -579,6 +571,35 @@ func (tc *TidbCluster) TiFlashStsDesiredOrdinals(excludeFailover bool) sets.Int3 return GetPodOrdinalsFromReplicasAndDeleteSlots(replicas, tc.getDeleteSlots(label.TiFlashLabelVal)) } +// TiCDCAllCapturesReady return whether all captures of TiCDC are ready. +// +// If TiCDC isn't specified, return false. +func (tc *TidbCluster) TiCDCAllCapturesReady() bool { + if tc.Spec.TiCDC == nil { + return false + } + + if int(tc.TiCDCDeployDesiredReplicas()) != len(tc.Status.TiCDC.Captures) { + return false + } + + for _, c := range tc.Status.TiCDC.Captures { + if !c.Ready { + return false + } + } + + return true +} + +func (tc *TidbCluster) TiCDCDeployDesiredReplicas() int32 { + if tc.Spec.TiCDC == nil { + return 0 + } + + return tc.Spec.TiCDC.Replicas +} + // TiDBAllPodsStarted return whether all pods of TiDB are started. // // If TiDB isn't specified, return false. diff --git a/pkg/controller/ticdc_control.go b/pkg/controller/ticdc_control.go index 7c05f5d4c88..6ec6a8d0f88 100644 --- a/pkg/controller/ticdc_control.go +++ b/pkg/controller/ticdc_control.go @@ -14,11 +14,17 @@ package controller import ( + "bytes" "encoding/json" "fmt" + "io/ioutil" + "net/http" + "strings" "github.com/pingcap/tidb-operator/pkg/apis/pingcap/v1alpha1" + httputil "github.com/pingcap/tidb-operator/pkg/util/http" corelisterv1 "k8s.io/client-go/listers/core/v1" + "k8s.io/klog/v2" ) type CaptureStatus struct { @@ -27,10 +33,35 @@ type CaptureStatus struct { IsOwner bool `json:"is_owner"` } +type captureInfo struct { + ID string `json:"id"` + IsOwner bool `json:"is_owner"` + AdvertiseAddr string `json:"address"` +} + +// drainCaptureRequest is request for manual `DrainCapture` +type drainCaptureRequest struct { + CaptureID string `json:"capture_id"` +} + +// drainCaptureResp is response for manual `DrainCapture` +type drainCaptureResp struct { + CurrentTableCount int `json:"current_table_count"` +} + // TiCDCControlInterface is the interface that knows how to manage ticdc captures type TiCDCControlInterface interface { // GetStatus returns ticdc's status GetStatus(tc *v1alpha1.TidbCluster, ordinal int32) (*CaptureStatus, error) + // DrainCapture remove capture ownership and moves its tables to other captures. + // Returns the number of tables in the capture. + // If there is only one capture, it always return 0. + DrainCapture(tc *v1alpha1.TidbCluster, ordinal int32) (tableCount int, err error) + // ResignOwner tries to resign ownership from the current capture. + // Returns true if the capture has already resigned ownership, + // otherwise caller should retry resign owner. + // If there is only one capture, it always return true. + ResignOwner(tc *v1alpha1.TidbCluster, ordinal int32) (ok bool, err error) } // defaultTiCDCControl is default implementation of TiCDCControlInterface. @@ -63,17 +94,169 @@ func (c *defaultTiCDCControl) GetStatus(tc *v1alpha1.TidbCluster, ordinal int32) return &status, err } +func (c *defaultTiCDCControl) DrainCapture(tc *v1alpha1.TidbCluster, ordinal int32) (int, error) { + httpClient, err := c.getHTTPClient(tc) + if err != nil { + klog.Warningf("drain capture is failed, error: %s", err) + return 0, err + } + + baseURL := c.getBaseURL(tc, ordinal) + + captures, err := getCaptures(httpClient, baseURL) + if err != nil { + klog.Warningf("drain capture is failed, error: %s", err) + return 0, err + } + if len(captures) == 1 { + // No way to drain a single node TiCDC cluster, ignore. + return 0, nil + } + + this, owner := getOrdinalAndOwnerCaptureInfo(tc, ordinal, captures) + if this == nil { + addr := getCaptureAdvertiseAddressPrefix(tc, ordinal) + return 0, fmt.Errorf("capture not found, address: %s %+v", addr, captures) + } + if owner == nil { + return 0, fmt.Errorf("owner not found") + } + + payload := drainCaptureRequest{ + CaptureID: this.ID, + } + payloadBody, err := json.Marshal(payload) + if err != nil { + return 0, err + } + req, err := http.NewRequest("PUT", baseURL+"/api/v1/captures/drain", bytes.NewReader(payloadBody)) + if err != nil { + return 0, err + } + res, err := httpClient.Do(req) + if err != nil { + return 0, err + } + defer httputil.DeferClose(res.Body) + if res.StatusCode >= 400 { + // It is likely the TiCDC does not support the API, ignore. + return 0, nil + } + body, err := ioutil.ReadAll(res.Body) + if err != nil { + return 0, err + } + + var resp drainCaptureResp + err = json.Unmarshal(body, &resp) + if err != nil { + // It is likely the TiCDC does not support the API, ignore. + return 0, nil + } + return resp.CurrentTableCount, nil +} + +func (c *defaultTiCDCControl) ResignOwner(tc *v1alpha1.TidbCluster, ordinal int32) (bool, error) { + httpClient, err := c.getHTTPClient(tc) + if err != nil { + klog.Warningf("resign owner failed, error: %s", err) + return false, err + } + + baseURL := c.getBaseURL(tc, ordinal) + captures, err := getCaptures(httpClient, baseURL) + if err != nil { + klog.Warningf("resign owner failed, error: %s", err) + return false, err + } + if len(captures) == 1 { + // No way to resign owner in a single node TiCDC cluster, ignore. + return true, nil + } + + this, owner := getOrdinalAndOwnerCaptureInfo(tc, ordinal, captures) + if owner != nil && this != nil { + if owner.ID != this.ID { + // Ownership has been transferred another capture. + return true, nil + } + } else { + // Owner or this capture not found, resign ownership from the capture is + // meaning less, ignore. + return true, nil + } + + res, err := httpClient.Post(baseURL+"/api/v1/owner/resign", "", nil) + if err != nil { + return false, err + } + httputil.DeferClose(res.Body) + if res.StatusCode >= 400 { + // It is likely the TiCDC does not support the API, ignore. + return true, nil + } + return false, nil +} + func (c *defaultTiCDCControl) getBaseURL(tc *v1alpha1.TidbCluster, ordinal int32) string { if c.testURL != "" { return c.testURL } + scheme := tc.Scheme() + addr := getCaptureAdvertiseAddressPrefix(tc, ordinal) + return fmt.Sprintf("%s://%s:8301", scheme, addr) +} + +// getCaptureAdvertiseAddressPrefix is the prefix of TiCDC advertiseAddress +// which is composed by ${POD_NAME}.${HEADLESS_SERVICE_NAME}.${NAMESPACE}.svc.${ClusterDomain}:8301 +// this function return a string "${POD_NAME}.${HEADLESS_SERVICE_NAME}.${NAMESPACE}" +func getCaptureAdvertiseAddressPrefix(tc *v1alpha1.TidbCluster, ordinal int32) string { tcName := tc.GetName() ns := tc.GetNamespace() - scheme := tc.Scheme() hostName := fmt.Sprintf("%s-%d", TiCDCMemberName(tcName), ordinal) - return fmt.Sprintf("%s://%s.%s.%s:8301", scheme, hostName, TiCDCPeerMemberName(tcName), ns) + return fmt.Sprintf("%s.%s.%s", hostName, TiCDCPeerMemberName(tcName), ns) +} + +func getCaptures(httpClient *http.Client, baseURL string) ([]captureInfo, error) { + res, err := httpClient.Get(baseURL + "/api/v1/captures") + if err != nil { + return nil, err + } + defer httputil.DeferClose(res.Body) + body, err := ioutil.ReadAll(res.Body) + if err != nil { + return nil, err + } + if res.StatusCode >= 400 { + // It is likely the TiCDC does not support the API, ignore. + return nil, nil + } + + var resp []captureInfo + err = json.Unmarshal(body, &resp) + if err != nil { + // It is likely the TiCDC does not support the API, ignore. + return nil, nil + } + return resp, nil +} + +func getOrdinalAndOwnerCaptureInfo( + tc *v1alpha1.TidbCluster, ordinal int32, captures []captureInfo, +) (this, owner *captureInfo) { + addrPrefix := getCaptureAdvertiseAddressPrefix(tc, ordinal) + for i := range captures { + cp := &captures[i] + if strings.Contains(cp.AdvertiseAddr, addrPrefix) { + this = cp + } + if cp.IsOwner { + owner = cp + } + } + return } // FakeTiCDCControl is a fake implementation of TiCDCControlInterface. @@ -97,3 +280,11 @@ func (c *FakeTiCDCControl) GetStatus(tc *v1alpha1.TidbCluster, ordinal int32) (* } return c.getStatus(tc, ordinal) } + +func (c *FakeTiCDCControl) DrainCapture(tc *v1alpha1.TidbCluster, ordinal int32) (tableCount int, err error) { + return 0, nil +} + +func (c *FakeTiCDCControl) ResignOwner(tc *v1alpha1.TidbCluster, ordinal int32) (ok bool, err error) { + return true, nil +} diff --git a/pkg/controller/ticdc_control_test.go b/pkg/controller/ticdc_control_test.go new file mode 100644 index 00000000000..df244df05c6 --- /dev/null +++ b/pkg/controller/ticdc_control_test.go @@ -0,0 +1,311 @@ +// Copyright 2022 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package controller + +import ( + "encoding/json" + "fmt" + "io" + "net/http" + "net/http/httptest" + "testing" + + . "github.com/onsi/gomega" + "github.com/onsi/gomega/types" +) + +func TestTiCDCControllerResignOwner(t *testing.T) { + g := NewGomegaWithT(t) + + cdc := defaultTiCDCControl{} + tc := getTidbCluster() + + cases := []struct { + caseName string + handlers map[string]func(http.ResponseWriter, *http.Request) + ordinal int32 + expectedOk types.GomegaMatcher + expectedErr types.GomegaMatcher + }{ + { + caseName: "1 captures", + handlers: map[string]func(http.ResponseWriter, *http.Request){ + "/api/v1/captures": func(w http.ResponseWriter, req *http.Request) { + cp := []captureInfo{{ + ID: "1", + AdvertiseAddr: getCaptureAdvertiseAddressPrefix(tc, 1), + IsOwner: true, + }} + payload, err := json.Marshal(cp) + g.Expect(err).Should(BeNil()) + fmt.Fprintf(w, string(payload)) + }, + }, + ordinal: 1, + expectedOk: BeTrue(), + expectedErr: BeNil(), + }, + { + caseName: "2 captures, no owner", + handlers: map[string]func(http.ResponseWriter, *http.Request){ + "/api/v1/captures": func(w http.ResponseWriter, req *http.Request) { + cp := []captureInfo{{ + ID: "1", + AdvertiseAddr: getCaptureAdvertiseAddressPrefix(tc, 1), + }, { + ID: "2", + AdvertiseAddr: getCaptureAdvertiseAddressPrefix(tc, 2), + }} + payload, err := json.Marshal(cp) + g.Expect(err).Should(BeNil()) + fmt.Fprintf(w, string(payload)) + }, + }, + ordinal: 1, + expectedOk: BeTrue(), + expectedErr: BeNil(), + }, + { + caseName: "2 captures, resign owner ok", + handlers: map[string]func(http.ResponseWriter, *http.Request){ + "/api/v1/captures": func(w http.ResponseWriter, req *http.Request) { + cp := []captureInfo{{ + ID: "1", + IsOwner: true, + AdvertiseAddr: getCaptureAdvertiseAddressPrefix(tc, 1), + }, { + ID: "2", + AdvertiseAddr: getCaptureAdvertiseAddressPrefix(tc, 2), + }} + payload, err := json.Marshal(cp) + g.Expect(err).Should(BeNil()) + fmt.Fprintf(w, string(payload)) + }, + "/api/v1/owner/resign": func(w http.ResponseWriter, req *http.Request) { + w.WriteHeader(http.StatusAccepted) + }, + }, + ordinal: 1, + expectedOk: BeFalse(), + expectedErr: BeNil(), + }, + { + caseName: "2 captures, resign owner 404", + handlers: map[string]func(http.ResponseWriter, *http.Request){ + "/api/v1/captures": func(w http.ResponseWriter, req *http.Request) { + cp := []captureInfo{{ + ID: "1", + IsOwner: true, + AdvertiseAddr: getCaptureAdvertiseAddressPrefix(tc, 1), + }, { + ID: "2", + AdvertiseAddr: getCaptureAdvertiseAddressPrefix(tc, 2), + }} + payload, err := json.Marshal(cp) + g.Expect(err).Should(BeNil()) + fmt.Fprintf(w, string(payload)) + }, + "/api/v1/owner/resign": func(w http.ResponseWriter, req *http.Request) { + w.WriteHeader(http.StatusNotFound) + }, + }, + ordinal: 1, + expectedOk: BeTrue(), + expectedErr: BeNil(), + }, + } + + for _, c := range cases { + mux := http.NewServeMux() + svr := httptest.NewServer(mux) + for p, h := range c.handlers { + mux.HandleFunc(p, h) + } + cdc.testURL = svr.URL + ok, err := cdc.ResignOwner(tc, c.ordinal) + g.Expect(ok).Should(c.expectedOk, c.caseName) + g.Expect(err).Should(c.expectedErr, c.caseName) + svr.Close() + } +} + +func TestTiCDCControllerDrainCapture(t *testing.T) { + g := NewGomegaWithT(t) + + cdc := defaultTiCDCControl{} + tc := getTidbCluster() + + cases := []struct { + caseName string + handlers map[string]func(http.ResponseWriter, *http.Request) + ordinal int32 + expectedCount types.GomegaMatcher + expectedErr types.GomegaMatcher + }{ + { + caseName: "1 captures", + handlers: map[string]func(http.ResponseWriter, *http.Request){ + "/api/v1/captures": func(w http.ResponseWriter, req *http.Request) { + cp := []captureInfo{{ + ID: "1", + AdvertiseAddr: getCaptureAdvertiseAddressPrefix(tc, 1), + IsOwner: true, + }} + payload, err := json.Marshal(cp) + g.Expect(err).Should(BeNil()) + fmt.Fprintf(w, string(payload)) + }, + }, + ordinal: 1, + expectedCount: BeZero(), + expectedErr: BeNil(), + }, + { + caseName: "2 captures, no self", + handlers: map[string]func(http.ResponseWriter, *http.Request){ + "/api/v1/captures": func(w http.ResponseWriter, req *http.Request) { + cp := []captureInfo{{ + ID: "1", + AdvertiseAddr: getCaptureAdvertiseAddressPrefix(tc, 1), + }, { + ID: "2", + AdvertiseAddr: getCaptureAdvertiseAddressPrefix(tc, 2), + }} + payload, err := json.Marshal(cp) + g.Expect(err).Should(BeNil()) + fmt.Fprintf(w, string(payload)) + }, + }, + ordinal: 3, + expectedCount: BeZero(), + expectedErr: Not(BeNil()), + }, + { + caseName: "2 captures, no owner", + handlers: map[string]func(http.ResponseWriter, *http.Request){ + "/api/v1/captures": func(w http.ResponseWriter, req *http.Request) { + cp := []captureInfo{{ + ID: "1", + AdvertiseAddr: getCaptureAdvertiseAddressPrefix(tc, 1), + }, { + ID: "2", + AdvertiseAddr: getCaptureAdvertiseAddressPrefix(tc, 2), + }} + payload, err := json.Marshal(cp) + g.Expect(err).Should(BeNil()) + fmt.Fprintf(w, string(payload)) + }, + }, + ordinal: 1, + expectedCount: BeZero(), + expectedErr: Not(BeNil()), + }, + { + caseName: "2 captures, drain capture ok 0", + handlers: map[string]func(http.ResponseWriter, *http.Request){ + "/api/v1/captures": func(w http.ResponseWriter, req *http.Request) { + cp := []captureInfo{{ + ID: "1", + IsOwner: true, + AdvertiseAddr: getCaptureAdvertiseAddressPrefix(tc, 1), + }, { + ID: "2", + AdvertiseAddr: getCaptureAdvertiseAddressPrefix(tc, 2), + }} + payload, err := json.Marshal(cp) + g.Expect(err).Should(BeNil()) + fmt.Fprintf(w, string(payload)) + }, + "/api/v1/captures/drain": func(w http.ResponseWriter, req *http.Request) { + payload, err := json.Marshal(drainCaptureResp{CurrentTableCount: 0}) + g.Expect(err).Should(BeNil()) + fmt.Fprintf(w, string(payload)) + }, + }, + ordinal: 1, + expectedCount: BeZero(), + expectedErr: BeNil(), + }, + { + caseName: "2 captures, drain capture ok 1", + handlers: map[string]func(http.ResponseWriter, *http.Request){ + "/api/v1/captures": func(w http.ResponseWriter, req *http.Request) { + cp := []captureInfo{{ + ID: "1", + IsOwner: true, + AdvertiseAddr: getCaptureAdvertiseAddressPrefix(tc, 1), + }, { + ID: "2", + AdvertiseAddr: getCaptureAdvertiseAddressPrefix(tc, 2), + }} + payload, err := json.Marshal(cp) + g.Expect(err).Should(BeNil()) + fmt.Fprintf(w, string(payload)) + }, + "/api/v1/captures/drain": func(w http.ResponseWriter, req *http.Request) { + body, err := io.ReadAll(req.Body) + g.Expect(err).Should(BeNil()) + var reqPayload drainCaptureRequest + err = json.Unmarshal(body, &reqPayload) + g.Expect(err).Should(BeNil()) + g.Expect(reqPayload.CaptureID).Should(Equal("1")) + + payload, err := json.Marshal(drainCaptureResp{CurrentTableCount: 1}) + g.Expect(err).Should(BeNil()) + fmt.Fprintf(w, string(payload)) + }, + }, + ordinal: 1, + expectedCount: Equal(1), + expectedErr: BeNil(), + }, + { + caseName: "2 captures, drain capture 404", + handlers: map[string]func(http.ResponseWriter, *http.Request){ + "/api/v1/captures": func(w http.ResponseWriter, req *http.Request) { + cp := []captureInfo{{ + ID: "1", + IsOwner: true, + AdvertiseAddr: getCaptureAdvertiseAddressPrefix(tc, 1), + }, { + ID: "2", + AdvertiseAddr: getCaptureAdvertiseAddressPrefix(tc, 2), + }} + payload, err := json.Marshal(cp) + g.Expect(err).Should(BeNil()) + fmt.Fprintf(w, string(payload)) + }, + "/api/v1/captures/drain": func(w http.ResponseWriter, req *http.Request) { + w.WriteHeader(http.StatusNotFound) + }, + }, + ordinal: 1, + expectedCount: BeZero(), + expectedErr: BeNil(), + }, + } + + for _, c := range cases { + mux := http.NewServeMux() + svr := httptest.NewServer(mux) + for p, h := range c.handlers { + mux.HandleFunc(p, h) + } + cdc.testURL = svr.URL + count, err := cdc.DrainCapture(tc, c.ordinal) + g.Expect(count).Should(c.expectedCount, c.caseName) + g.Expect(err).Should(c.expectedErr, c.caseName) + svr.Close() + } +} diff --git a/pkg/controller/tidbcluster/tidb_cluster_condition_updater.go b/pkg/controller/tidbcluster/tidb_cluster_condition_updater.go index 960bf596ce0..b2acce843a1 100644 --- a/pkg/controller/tidbcluster/tidb_cluster_condition_updater.go +++ b/pkg/controller/tidbcluster/tidb_cluster_condition_updater.go @@ -71,6 +71,9 @@ func (u *tidbClusterConditionUpdater) updateReadyCondition(tc *v1alpha1.TidbClus case tc.Spec.TiFlash != nil && !tc.TiFlashAllStoresReady(): reason = utiltidbcluster.TiFlashStoreNotUp message = "TiFlash store(s) are not up" + case tc.Spec.TiCDC != nil && !tc.TiCDCAllCapturesReady(): + reason = utiltidbcluster.TiCDCCaptureNotReady + message = "TiCDC capture(s) are not up" default: status = v1.ConditionTrue reason = utiltidbcluster.Ready diff --git a/pkg/controller/tidbcluster/tidb_cluster_condition_updater_test.go b/pkg/controller/tidbcluster/tidb_cluster_condition_updater_test.go index 36c6408b0bb..fc809c5c59d 100644 --- a/pkg/controller/tidbcluster/tidb_cluster_condition_updater_test.go +++ b/pkg/controller/tidbcluster/tidb_cluster_condition_updater_test.go @@ -273,6 +273,74 @@ func TestTidbClusterConditionUpdater_Ready(t *testing.T) { wantReason: utiltidbcluster.TiFlashStoreNotUp, wantMessage: "TiFlash store(s) are not up", }, + { + name: "ticdc(s) not ready", + tc: &v1alpha1.TidbCluster{ + Spec: v1alpha1.TidbClusterSpec{ + PD: &v1alpha1.PDSpec{ + Replicas: 1, + }, + TiKV: &v1alpha1.TiKVSpec{ + Replicas: 1, + }, + TiDB: &v1alpha1.TiDBSpec{ + Replicas: 1, + }, + TiCDC: &v1alpha1.TiCDCSpec{ + Replicas: 1, + }, + }, + Status: v1alpha1.TidbClusterStatus{ + PD: v1alpha1.PDStatus{ + Members: map[string]v1alpha1.PDMember{ + "pd-0": { + Health: true, + }, + }, + StatefulSet: &appsv1.StatefulSetStatus{ + CurrentRevision: "2", + UpdateRevision: "2", + }, + }, + TiDB: v1alpha1.TiDBStatus{ + Members: map[string]v1alpha1.TiDBMember{ + "tidb-0": { + Health: true, + }, + }, + StatefulSet: &appsv1.StatefulSetStatus{ + CurrentRevision: "2", + UpdateRevision: "2", + }, + }, + TiKV: v1alpha1.TiKVStatus{ + Stores: map[string]v1alpha1.TiKVStore{ + "tikv-0": { + State: "Up", + }, + }, + StatefulSet: &appsv1.StatefulSetStatus{ + CurrentRevision: "2", + UpdateRevision: "2", + }, + }, + TiCDC: v1alpha1.TiCDCStatus{ + Captures: map[string]v1alpha1.TiCDCCapture{ + "cdc-0": { + Ready: false, + }, + }, + StatefulSet: &appsv1.StatefulSetStatus{ + CurrentRevision: "2", + UpdateRevision: "2", + }, + }, + }, + }, + wantStatus: v1.ConditionFalse, + wantReason: utiltidbcluster.TiCDCCaptureNotReady, + wantMessage: "TiCDC capture(s) are not up", + }, { name: "all ready", tc: &v1alpha1.TidbCluster{ diff --git a/pkg/controller/tidbcluster/tidb_cluster_control.go b/pkg/controller/tidbcluster/tidb_cluster_control.go index 3ef6c4e5f83..394427487f0 100644 --- a/pkg/controller/tidbcluster/tidb_cluster_control.go +++ b/pkg/controller/tidbcluster/tidb_cluster_control.go @@ -219,7 +219,9 @@ func (c *defaultTidbClusterControl) updateTidbCluster(tc *v1alpha1.TidbCluster) return err } + // works that should be done to make the ticdc cluster current state match the desired state: // - waiting for the pd cluster available(pd cluster is in quorum) + // - waiting for the tikv cluster available(at least one peer works) // - create or update ticdc deployment // - sync ticdc cluster status from pd to TidbCluster object if err := c.ticdcMemberManager.Sync(tc); err != nil { diff --git a/pkg/manager/member/ticdc_member_manager.go b/pkg/manager/member/ticdc_member_manager.go index 4c327fa97ee..765c7a2bb0f 100644 --- a/pkg/manager/member/ticdc_member_manager.go +++ b/pkg/manager/member/ticdc_member_manager.go @@ -127,6 +127,16 @@ func (m *ticdcMemberManager) Sync(tc *v1alpha1.TidbCluster) error { return nil } + ns := tc.GetNamespace() + tcName := tc.GetName() + + if tc.Spec.PD != nil && !tc.PDIsAvailable() { + return controller.RequeueErrorf("TidbCluster: [%s/%s], TiCDC is waiting for PD cluster running", ns, tcName) + } + if tc.Spec.TiKV != nil && !tc.TiKVIsAvailable() { + return controller.RequeueErrorf("TidbCluster: [%s/%s], TiCDC is waiting for TiKV cluster running", ns, tcName) + } + // Sync CDC Headless Service if err := m.syncCDCHeadlessService(tc); err != nil { return err @@ -169,10 +179,6 @@ func (m *ticdcMemberManager) syncStatefulSet(tc *v1alpha1.TidbCluster) error { } if stsNotExist { - if !tc.PDIsAvailable() { - klog.Infof("TidbCluster: %s/%s, waiting for PD cluster running", ns, tcName) - return nil - } err = mngerutils.SetStatefulSetLastAppliedConfigAnnotation(newSts) if err != nil { return err @@ -344,7 +350,11 @@ func getNewTiCDCStatefulSet(tc *v1alpha1.TidbCluster, cm *corev1.ConfigMap) (*ap stsAnnotations := getStsAnnotations(tc.Annotations, label.TiCDCLabelVal) headlessSvcName := controller.TiCDCPeerMemberName(tcName) - cmdArgs := []string{"/cdc server", "--addr=0.0.0.0:8301", fmt.Sprintf("--advertise-addr=${POD_NAME}.${HEADLESS_SERVICE_NAME}.${NAMESPACE}.svc%s:8301", controller.FormatClusterDomain(tc.Spec.ClusterDomain))} + // TODO move advertise addr format to package controller. + // TiCDC control relays the format. + advertiseAddr := fmt.Sprintf("${POD_NAME}.${HEADLESS_SERVICE_NAME}.${NAMESPACE}.svc%s:8301", + controller.FormatClusterDomain(tc.Spec.ClusterDomain)) + cmdArgs := []string{"/cdc server", "--addr=0.0.0.0:8301", fmt.Sprintf("--advertise-addr=%s", advertiseAddr)} cmdArgs = append(cmdArgs, fmt.Sprintf("--gc-ttl=%d", tc.TiCDCGCTTL())) cmdArgs = append(cmdArgs, fmt.Sprintf("--log-file=%s", tc.TiCDCLogFile())) cmdArgs = append(cmdArgs, fmt.Sprintf("--log-level=%s", tc.TiCDCLogLevel())) diff --git a/pkg/manager/member/ticdc_scaler.go b/pkg/manager/member/ticdc_scaler.go index dbdde7c1cbd..87bf70a91f6 100644 --- a/pkg/manager/member/ticdc_scaler.go +++ b/pkg/manager/member/ticdc_scaler.go @@ -90,14 +90,18 @@ func (s *ticdcScaler) ScaleIn(meta metav1.Object, oldSet *apps.StatefulSet, newS if err != nil { return fmt.Errorf("ticdcScaler.ScaleIn: failed to get pods %s for cluster %s/%s, error: %s", podName, ns, tcName, err) } + tc, _ := meta.(*v1alpha1.TidbCluster) - // when scaling in TiCDC pods, we let the "capture info" in PD's etcd to be deleted automatically when shutting down the TiCDC process or after TTL expired. + err = gracefulShutdownTiCDC(tc, s.deps.CDCControl, ordinal, podName, "ScaleIn") + if err != nil { + return err + } + klog.Infof("ticdc has graceful shutdown, %s in cluster %s/%s", podName, meta.GetNamespace(), meta.GetName()) pvcs, err := util.ResolvePVCFromPod(pod, s.deps.PVCLister) if err != nil && !errors.IsNotFound(err) { return fmt.Errorf("ticdcScaler.ScaleIn: failed to get pvcs for pod %s/%s in tc %s/%s, error: %s", ns, pod.Name, ns, tcName, err) } - tc, _ := meta.(*v1alpha1.TidbCluster) for _, pvc := range pvcs { if err := addDeferDeletingAnnoToPVC(tc, pvc, s.deps.PVCControl); err != nil { return err @@ -107,3 +111,32 @@ func (s *ticdcScaler) ScaleIn(meta metav1.Object, oldSet *apps.StatefulSet, newS setReplicasAndDeleteSlots(newSet, replicas, deleteSlots) return nil } + +func gracefulShutdownTiCDC( + tc *v1alpha1.TidbCluster, cdcCtl controller.TiCDCControlInterface, + ordinal int32, podName, action string, +) error { + // To graceful shutdown a TiCDC pod, we need to + // + // 1. Remove ownership from the capture. + resigned, err := cdcCtl.ResignOwner(tc, ordinal) + if err != nil { + return err + } + if !resigned { + return controller.RequeueErrorf( + "ticdc.%s, cluster %s/%s %s is still the owner, try resign owner again", + action, tc.GetNamespace(), tc.GetName(), podName) + } + // 2. Drain the capture, move out all its tables. + tableCount, err := cdcCtl.DrainCapture(tc, ordinal) + if err != nil { + return err + } + if tableCount != 0 { + return controller.RequeueErrorf( + "ticdc.%s, cluster %s/%s %s still has %d tables, wait draining", + action, tc.GetNamespace(), tc.GetName(), podName, tableCount) + } + return nil +} diff --git a/pkg/util/tidbcluster/tidbcluster.go b/pkg/util/tidbcluster/tidbcluster.go index 0f52138fd47..d08f32c64d5 100644 --- a/pkg/util/tidbcluster/tidbcluster.go +++ b/pkg/util/tidbcluster/tidbcluster.go @@ -34,6 +34,8 @@ const ( TiDBUnhealthy = "TiDBUnhealthy" // TiFlashStoreNotUp is added when one of tiflash stores is not up. TiFlashStoreNotUp = "TiFlashStoreNotUp" + // TiCDCCaptureNotReady is added when one of ticdc capture is not ready. + TiCDCCaptureNotReady = "TiCDCCaptureNotReady" ) // NewTidbClusterCondition creates a new tidbcluster condition. From bd3769593bd8d97bbeed39782db400d36ccfc394 Mon Sep 17 00:00:00 2001 From: Neil Shen Date: Tue, 12 Jul 2022 17:17:55 +0800 Subject: [PATCH 2/7] Retry TiCDC API when it returns 503 Signed-off-by: Neil Shen --- pkg/controller/ticdc_control.go | 74 ++++++++++++++++--------- pkg/controller/ticdc_control_test.go | 82 +++++++++++++++++++++++++++- pkg/manager/member/ticdc_scaler.go | 7 ++- 3 files changed, 134 insertions(+), 29 deletions(-) diff --git a/pkg/controller/ticdc_control.go b/pkg/controller/ticdc_control.go index 6ec6a8d0f88..937615ad8ff 100644 --- a/pkg/controller/ticdc_control.go +++ b/pkg/controller/ticdc_control.go @@ -56,7 +56,7 @@ type TiCDCControlInterface interface { // DrainCapture remove capture ownership and moves its tables to other captures. // Returns the number of tables in the capture. // If there is only one capture, it always return 0. - DrainCapture(tc *v1alpha1.TidbCluster, ordinal int32) (tableCount int, err error) + DrainCapture(tc *v1alpha1.TidbCluster, ordinal int32) (tableCount int, retry bool, err error) // ResignOwner tries to resign ownership from the current capture. // Returns true if the capture has already resigned ownership, // otherwise caller should retry resign owner. @@ -94,32 +94,36 @@ func (c *defaultTiCDCControl) GetStatus(tc *v1alpha1.TidbCluster, ordinal int32) return &status, err } -func (c *defaultTiCDCControl) DrainCapture(tc *v1alpha1.TidbCluster, ordinal int32) (int, error) { +func (c *defaultTiCDCControl) DrainCapture(tc *v1alpha1.TidbCluster, ordinal int32) (int, bool, error) { httpClient, err := c.getHTTPClient(tc) if err != nil { klog.Warningf("drain capture is failed, error: %s", err) - return 0, err + return 0, false, err } baseURL := c.getBaseURL(tc, ordinal) - captures, err := getCaptures(httpClient, baseURL) + captures, retry, err := getCaptures(httpClient, baseURL) if err != nil { klog.Warningf("drain capture is failed, error: %s", err) - return 0, err + return 0, false, err + } + if retry { + // Let caller retry drain capture. + return 0, true, nil } if len(captures) == 1 { // No way to drain a single node TiCDC cluster, ignore. - return 0, nil + return 0, false, nil } this, owner := getOrdinalAndOwnerCaptureInfo(tc, ordinal, captures) if this == nil { addr := getCaptureAdvertiseAddressPrefix(tc, ordinal) - return 0, fmt.Errorf("capture not found, address: %s %+v", addr, captures) + return 0, false, fmt.Errorf("capture not found, address: %s %+v", addr, captures) } if owner == nil { - return 0, fmt.Errorf("owner not found") + return 0, false, fmt.Errorf("owner not found") } payload := drainCaptureRequest{ @@ -127,33 +131,37 @@ func (c *defaultTiCDCControl) DrainCapture(tc *v1alpha1.TidbCluster, ordinal int } payloadBody, err := json.Marshal(payload) if err != nil { - return 0, err + return 0, false, err } req, err := http.NewRequest("PUT", baseURL+"/api/v1/captures/drain", bytes.NewReader(payloadBody)) if err != nil { - return 0, err + return 0, false, err } res, err := httpClient.Do(req) if err != nil { - return 0, err + return 0, false, err } defer httputil.DeferClose(res.Body) - if res.StatusCode >= 400 { + if res.StatusCode == http.StatusNotFound { // It is likely the TiCDC does not support the API, ignore. - return 0, nil + return 0, false, nil + } + if res.StatusCode == http.StatusServiceUnavailable { + // TiCDC is not ready, retry. + return 0, true, nil } body, err := ioutil.ReadAll(res.Body) if err != nil { - return 0, err + return 0, false, err } var resp drainCaptureResp err = json.Unmarshal(body, &resp) if err != nil { // It is likely the TiCDC does not support the API, ignore. - return 0, nil + return 0, false, nil } - return resp.CurrentTableCount, nil + return resp.CurrentTableCount, false, nil } func (c *defaultTiCDCControl) ResignOwner(tc *v1alpha1.TidbCluster, ordinal int32) (bool, error) { @@ -164,11 +172,15 @@ func (c *defaultTiCDCControl) ResignOwner(tc *v1alpha1.TidbCluster, ordinal int3 } baseURL := c.getBaseURL(tc, ordinal) - captures, err := getCaptures(httpClient, baseURL) + captures, retry, err := getCaptures(httpClient, baseURL) if err != nil { klog.Warningf("resign owner failed, error: %s", err) return false, err } + if retry { + // Let caller retry resign owner. + return false, nil + } if len(captures) == 1 { // No way to resign owner in a single node TiCDC cluster, ignore. return true, nil @@ -191,10 +203,14 @@ func (c *defaultTiCDCControl) ResignOwner(tc *v1alpha1.TidbCluster, ordinal int3 return false, err } httputil.DeferClose(res.Body) - if res.StatusCode >= 400 { + if res.StatusCode == http.StatusNotFound { // It is likely the TiCDC does not support the API, ignore. return true, nil } + if res.StatusCode == http.StatusServiceUnavailable { + // Let caller retry resign owner. + return false, nil + } return false, nil } @@ -219,28 +235,32 @@ func getCaptureAdvertiseAddressPrefix(tc *v1alpha1.TidbCluster, ordinal int32) s return fmt.Sprintf("%s.%s.%s", hostName, TiCDCPeerMemberName(tcName), ns) } -func getCaptures(httpClient *http.Client, baseURL string) ([]captureInfo, error) { +func getCaptures(httpClient *http.Client, baseURL string) ([]captureInfo, bool, error) { res, err := httpClient.Get(baseURL + "/api/v1/captures") if err != nil { - return nil, err + return nil, false, err } defer httputil.DeferClose(res.Body) body, err := ioutil.ReadAll(res.Body) if err != nil { - return nil, err + return nil, false, err } - if res.StatusCode >= 400 { + if res.StatusCode == http.StatusNotFound { // It is likely the TiCDC does not support the API, ignore. - return nil, nil + return nil, false, nil + } + if res.StatusCode == http.StatusServiceUnavailable { + // TiCDC is not ready, retry. + return nil, true, nil } var resp []captureInfo err = json.Unmarshal(body, &resp) if err != nil { // It is likely the TiCDC does not support the API, ignore. - return nil, nil + return nil, false, nil } - return resp, nil + return resp, false, nil } func getOrdinalAndOwnerCaptureInfo( @@ -281,8 +301,8 @@ func (c *FakeTiCDCControl) GetStatus(tc *v1alpha1.TidbCluster, ordinal int32) (* return c.getStatus(tc, ordinal) } -func (c *FakeTiCDCControl) DrainCapture(tc *v1alpha1.TidbCluster, ordinal int32) (tableCount int, err error) { - return 0, nil +func (c *FakeTiCDCControl) DrainCapture(tc *v1alpha1.TidbCluster, ordinal int32) (tableCount int, retry bool, err error) { + return 0, false, nil } func (c *FakeTiCDCControl) ResignOwner(tc *v1alpha1.TidbCluster, ordinal int32) (ok bool, err error) { diff --git a/pkg/controller/ticdc_control_test.go b/pkg/controller/ticdc_control_test.go index df244df05c6..0e65358bebc 100644 --- a/pkg/controller/ticdc_control_test.go +++ b/pkg/controller/ticdc_control_test.go @@ -124,6 +124,41 @@ func TestTiCDCControllerResignOwner(t *testing.T) { expectedOk: BeTrue(), expectedErr: BeNil(), }, + { + caseName: "2 captures, resign owner 503", + handlers: map[string]func(http.ResponseWriter, *http.Request){ + "/api/v1/captures": func(w http.ResponseWriter, req *http.Request) { + cp := []captureInfo{{ + ID: "1", + IsOwner: true, + AdvertiseAddr: getCaptureAdvertiseAddressPrefix(tc, 1), + }, { + ID: "2", + AdvertiseAddr: getCaptureAdvertiseAddressPrefix(tc, 2), + }} + payload, err := json.Marshal(cp) + g.Expect(err).Should(BeNil()) + fmt.Fprintf(w, string(payload)) + }, + "/api/v1/owner/resign": func(w http.ResponseWriter, req *http.Request) { + w.WriteHeader(http.StatusServiceUnavailable) + }, + }, + ordinal: 1, + expectedOk: BeFalse(), + expectedErr: BeNil(), + }, + { + caseName: "2 captures, get captures 503", + handlers: map[string]func(http.ResponseWriter, *http.Request){ + "/api/v1/captures": func(w http.ResponseWriter, req *http.Request) { + w.WriteHeader(http.StatusServiceUnavailable) + }, + }, + ordinal: 1, + expectedOk: BeFalse(), + expectedErr: BeNil(), + }, } for _, c := range cases { @@ -152,6 +187,7 @@ func TestTiCDCControllerDrainCapture(t *testing.T) { ordinal int32 expectedCount types.GomegaMatcher expectedErr types.GomegaMatcher + expectedRetry types.GomegaMatcher }{ { caseName: "1 captures", @@ -170,6 +206,7 @@ func TestTiCDCControllerDrainCapture(t *testing.T) { ordinal: 1, expectedCount: BeZero(), expectedErr: BeNil(), + expectedRetry: BeFalse(), }, { caseName: "2 captures, no self", @@ -190,6 +227,7 @@ func TestTiCDCControllerDrainCapture(t *testing.T) { ordinal: 3, expectedCount: BeZero(), expectedErr: Not(BeNil()), + expectedRetry: BeFalse(), }, { caseName: "2 captures, no owner", @@ -210,6 +248,7 @@ func TestTiCDCControllerDrainCapture(t *testing.T) { ordinal: 1, expectedCount: BeZero(), expectedErr: Not(BeNil()), + expectedRetry: BeFalse(), }, { caseName: "2 captures, drain capture ok 0", @@ -236,6 +275,7 @@ func TestTiCDCControllerDrainCapture(t *testing.T) { ordinal: 1, expectedCount: BeZero(), expectedErr: BeNil(), + expectedRetry: BeFalse(), }, { caseName: "2 captures, drain capture ok 1", @@ -269,6 +309,7 @@ func TestTiCDCControllerDrainCapture(t *testing.T) { ordinal: 1, expectedCount: Equal(1), expectedErr: BeNil(), + expectedRetry: BeFalse(), }, { caseName: "2 captures, drain capture 404", @@ -293,6 +334,44 @@ func TestTiCDCControllerDrainCapture(t *testing.T) { ordinal: 1, expectedCount: BeZero(), expectedErr: BeNil(), + expectedRetry: BeFalse(), + }, + { + caseName: "2 captures, drain capture 503", + handlers: map[string]func(http.ResponseWriter, *http.Request){ + "/api/v1/captures": func(w http.ResponseWriter, req *http.Request) { + cp := []captureInfo{{ + ID: "1", + IsOwner: true, + AdvertiseAddr: getCaptureAdvertiseAddressPrefix(tc, 1), + }, { + ID: "2", + AdvertiseAddr: getCaptureAdvertiseAddressPrefix(tc, 2), + }} + payload, err := json.Marshal(cp) + g.Expect(err).Should(BeNil()) + fmt.Fprintf(w, string(payload)) + }, + "/api/v1/captures/drain": func(w http.ResponseWriter, req *http.Request) { + w.WriteHeader(http.StatusServiceUnavailable) + }, + }, + ordinal: 1, + expectedCount: BeZero(), + expectedErr: BeNil(), + expectedRetry: BeTrue(), + }, + { + caseName: "2 captures, get captures 503", + handlers: map[string]func(http.ResponseWriter, *http.Request){ + "/api/v1/captures": func(w http.ResponseWriter, req *http.Request) { + w.WriteHeader(http.StatusServiceUnavailable) + }, + }, + ordinal: 1, + expectedCount: BeZero(), + expectedErr: BeNil(), + expectedRetry: BeTrue(), }, } @@ -303,9 +382,10 @@ func TestTiCDCControllerDrainCapture(t *testing.T) { mux.HandleFunc(p, h) } cdc.testURL = svr.URL - count, err := cdc.DrainCapture(tc, c.ordinal) + count, retry, err := cdc.DrainCapture(tc, c.ordinal) g.Expect(count).Should(c.expectedCount, c.caseName) g.Expect(err).Should(c.expectedErr, c.caseName) + g.Expect(retry).Should(c.expectedRetry, c.caseName) svr.Close() } } diff --git a/pkg/manager/member/ticdc_scaler.go b/pkg/manager/member/ticdc_scaler.go index 87bf70a91f6..f1f756c5c9a 100644 --- a/pkg/manager/member/ticdc_scaler.go +++ b/pkg/manager/member/ticdc_scaler.go @@ -129,10 +129,15 @@ func gracefulShutdownTiCDC( action, tc.GetNamespace(), tc.GetName(), podName) } // 2. Drain the capture, move out all its tables. - tableCount, err := cdcCtl.DrainCapture(tc, ordinal) + tableCount, retry, err := cdcCtl.DrainCapture(tc, ordinal) if err != nil { return err } + if retry { + return controller.RequeueErrorf( + "ticdc.%s, cluster %s/%s %s needs to retry drain capture", + action, tc.GetNamespace(), tc.GetName()) + } if tableCount != 0 { return controller.RequeueErrorf( "ticdc.%s, cluster %s/%s %s still has %d tables, wait draining", From 89c495471f1313a9ed6887465faa998763ed670d Mon Sep 17 00:00:00 2001 From: Neil Shen Date: Tue, 12 Jul 2022 20:42:13 +0800 Subject: [PATCH 3/7] Support TiCDC graceful shutdown timeout Signed-off-by: Neil Shen --- pkg/apis/label/label.go | 2 + pkg/controller/ticdc_control.go | 12 +- pkg/manager/member/ticdc_scaler.go | 86 ++++++++++- pkg/manager/member/ticdc_scaler_test.go | 192 ++++++++++++++++++++++++ 4 files changed, 280 insertions(+), 12 deletions(-) diff --git a/pkg/apis/label/label.go b/pkg/apis/label/label.go index abc84fda9d0..db513f15a9c 100644 --- a/pkg/apis/label/label.go +++ b/pkg/apis/label/label.go @@ -95,6 +95,8 @@ const ( AnnSysctlInit = "tidb.pingcap.com/sysctl-init" // AnnEvictLeaderBeginTime is pod annotation key to indicate the begin time for evicting region leader AnnEvictLeaderBeginTime = "tidb.pingcap.com/evictLeaderBeginTime" + // AnnGracefulShutdownTiCDCBeginTime is pod annotation key to indicate the begin time for graceful shutdown TiCDC + AnnGracefulShutdownTiCDCBeginTime = "tidb.pingcap.com/gracefulShutdownTiCDCBeginTime" // AnnStsLastSyncTimestamp is sts annotation key to indicate the last timestamp the operator sync the sts AnnStsLastSyncTimestamp = "tidb.pingcap.com/sync-timestamp" diff --git a/pkg/controller/ticdc_control.go b/pkg/controller/ticdc_control.go index 937615ad8ff..70a799a7ea2 100644 --- a/pkg/controller/ticdc_control.go +++ b/pkg/controller/ticdc_control.go @@ -97,7 +97,7 @@ func (c *defaultTiCDCControl) GetStatus(tc *v1alpha1.TidbCluster, ordinal int32) func (c *defaultTiCDCControl) DrainCapture(tc *v1alpha1.TidbCluster, ordinal int32) (int, bool, error) { httpClient, err := c.getHTTPClient(tc) if err != nil { - klog.Warningf("drain capture is failed, error: %s", err) + klog.Warningf("ticdc control: drain capture is failed, error: %s", err) return 0, false, err } @@ -105,7 +105,7 @@ func (c *defaultTiCDCControl) DrainCapture(tc *v1alpha1.TidbCluster, ordinal int captures, retry, err := getCaptures(httpClient, baseURL) if err != nil { - klog.Warningf("drain capture is failed, error: %s", err) + klog.Warningf("ticdc control: drain capture is failed, error: %s", err) return 0, false, err } if retry { @@ -144,10 +144,12 @@ func (c *defaultTiCDCControl) DrainCapture(tc *v1alpha1.TidbCluster, ordinal int defer httputil.DeferClose(res.Body) if res.StatusCode == http.StatusNotFound { // It is likely the TiCDC does not support the API, ignore. + klog.Infof("ticdc control: %s does not support drain capture, skip", this.AdvertiseAddr) return 0, false, nil } if res.StatusCode == http.StatusServiceUnavailable { // TiCDC is not ready, retry. + klog.Infof("ticdc control: %s service unavailable drain capture, retry", this.AdvertiseAddr) return 0, true, nil } body, err := ioutil.ReadAll(res.Body) @@ -167,14 +169,14 @@ func (c *defaultTiCDCControl) DrainCapture(tc *v1alpha1.TidbCluster, ordinal int func (c *defaultTiCDCControl) ResignOwner(tc *v1alpha1.TidbCluster, ordinal int32) (bool, error) { httpClient, err := c.getHTTPClient(tc) if err != nil { - klog.Warningf("resign owner failed, error: %s", err) + klog.Warningf("ticdc control: resign owner failed, error: %s", err) return false, err } baseURL := c.getBaseURL(tc, ordinal) captures, retry, err := getCaptures(httpClient, baseURL) if err != nil { - klog.Warningf("resign owner failed, error: %s", err) + klog.Warningf("ticdc control: resign owner failed, error: %s", err) return false, err } if retry { @@ -205,10 +207,12 @@ func (c *defaultTiCDCControl) ResignOwner(tc *v1alpha1.TidbCluster, ordinal int3 httputil.DeferClose(res.Body) if res.StatusCode == http.StatusNotFound { // It is likely the TiCDC does not support the API, ignore. + klog.Infof("ticdc control: %s does not support resign owner, skip", this.AdvertiseAddr) return true, nil } if res.StatusCode == http.StatusServiceUnavailable { // Let caller retry resign owner. + klog.Infof("ticdc control: %s service unavailable resign owner, retry", this.AdvertiseAddr) return false, nil } return false, nil diff --git a/pkg/manager/member/ticdc_scaler.go b/pkg/manager/member/ticdc_scaler.go index f1f756c5c9a..9af34215556 100644 --- a/pkg/manager/member/ticdc_scaler.go +++ b/pkg/manager/member/ticdc_scaler.go @@ -15,8 +15,10 @@ package member import ( "fmt" + "time" apps "k8s.io/api/apps/v1" + corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" @@ -27,6 +29,11 @@ import ( "github.com/pingcap/tidb-operator/pkg/util" ) +const ( + // GracefulShutdownTiCDCBeginTime is the key of evict Leader begin time + GracefulShutdownTiCDCBeginTime = "gracefulShutdownTiCDCBeginTime" +) + type ticdcScaler struct { generalScaler } @@ -92,11 +99,11 @@ func (s *ticdcScaler) ScaleIn(meta metav1.Object, oldSet *apps.StatefulSet, newS } tc, _ := meta.(*v1alpha1.TidbCluster) - err = gracefulShutdownTiCDC(tc, s.deps.CDCControl, ordinal, podName, "ScaleIn") + err = gracefulShutdownTiCDC(tc, s.deps.CDCControl, s.deps.PodControl, pod, ordinal, "ScaleIn") if err != nil { return err } - klog.Infof("ticdc has graceful shutdown, %s in cluster %s/%s", podName, meta.GetNamespace(), meta.GetName()) + klog.Infof("ticdcScaler.ScaleIn: %s has graceful shutdown in cluster %s/%s", podName, meta.GetNamespace(), meta.GetName()) pvcs, err := util.ResolvePVCFromPod(pod, s.deps.PVCLister) if err != nil && !errors.IsNotFound(err) { @@ -113,9 +120,22 @@ func (s *ticdcScaler) ScaleIn(meta metav1.Object, oldSet *apps.StatefulSet, newS } func gracefulShutdownTiCDC( - tc *v1alpha1.TidbCluster, cdcCtl controller.TiCDCControlInterface, - ordinal int32, podName, action string, + tc *v1alpha1.TidbCluster, + cdcCtl controller.TiCDCControlInterface, + podCtl controller.PodControlInterface, + pod *corev1.Pod, + ordinal int32, + action string, ) error { + isTimeout, err := checkTiCDCGracefulShutdownTimeout(tc, podCtl, pod, action) + if err != nil { + return err + } + if isTimeout { + return nil + } + podName := pod.GetName() + // To graceful shutdown a TiCDC pod, we need to // // 1. Remove ownership from the capture. @@ -125,7 +145,7 @@ func gracefulShutdownTiCDC( } if !resigned { return controller.RequeueErrorf( - "ticdc.%s, cluster %s/%s %s is still the owner, try resign owner again", + "ticdc.%s: cluster %s/%s %s is still the owner, try resign owner again", action, tc.GetNamespace(), tc.GetName(), podName) } // 2. Drain the capture, move out all its tables. @@ -135,13 +155,63 @@ func gracefulShutdownTiCDC( } if retry { return controller.RequeueErrorf( - "ticdc.%s, cluster %s/%s %s needs to retry drain capture", - action, tc.GetNamespace(), tc.GetName()) + "ticdc.%s: cluster %s/%s %s needs to retry drain capture", + action, tc.GetNamespace(), tc.GetName(), podName) } if tableCount != 0 { return controller.RequeueErrorf( - "ticdc.%s, cluster %s/%s %s still has %d tables, wait draining", + "ticdc.%s: cluster %s/%s %s still has %d tables, wait draining", action, tc.GetNamespace(), tc.GetName(), podName, tableCount) } return nil } + +// TODO: support configurable graceful shutdown timeout. +var ticdcGracefulShutdownTimeout time.Duration = 10 * time.Second + +func checkTiCDCGracefulShutdownTimeout( + tc *v1alpha1.TidbCluster, + podCtl controller.PodControlInterface, + pod *corev1.Pod, + action string, +) (bool, error) { + ns := tc.GetNamespace() + podName := pod.GetName() + if pod.Annotations == nil { + pod.Annotations = map[string]string{} + } + begin, ok := pod.Annotations[GracefulShutdownTiCDCBeginTime] + if ok { + // Check graceful shutdown timeout. + beginTime, err := time.Parse(time.RFC3339, begin) + if err != nil { + klog.Errorf("ticdc.%s: parse annotation:[%s] \"%s\" to time failed, skip graceful shutdown", + action, GracefulShutdownTiCDCBeginTime, begin) + return true, nil + } + + gracefulShutdownTimeout := ticdcGracefulShutdownTimeout + if time.Now().After(beginTime.Add(gracefulShutdownTimeout)) { + klog.Infof("ticdc.%s: graceful shutdown timeout (threshold: %v) for Pod %s in cluster %s/%s", + action, gracefulShutdownTimeout, podName, ns, tc.GetName()) + return true, nil + } + return false, nil + } + + klog.Infof("ticdc.%s: begin graceful shutdown %s in cluster %s/%s", + action, podName, ns, tc.GetName()) + + // Set graceful shutdown begin time. + now := time.Now().Format(time.RFC3339) + pod.Annotations[GracefulShutdownTiCDCBeginTime] = now + _, err := podCtl.UpdatePod(tc, pod) + if err != nil { + klog.Errorf("ticdc.%s: failed to set pod %s in cluster %s/%s annotation %s to %s, %v", + action, podName, ns, tc.GetName(), GracefulShutdownTiCDCBeginTime, now, err) + return false, err + } + klog.Infof("ticdc.%s: set pod %s in cluster %s/%s annotation %s to %s successfully", + action, podName, ns, tc.GetName(), GracefulShutdownTiCDCBeginTime, now) + return false, nil +} diff --git a/pkg/manager/member/ticdc_scaler_test.go b/pkg/manager/member/ticdc_scaler_test.go index c20df5a7dab..93fc2304de5 100644 --- a/pkg/manager/member/ticdc_scaler_test.go +++ b/pkg/manager/member/ticdc_scaler_test.go @@ -22,6 +22,7 @@ import ( corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" "k8s.io/client-go/tools/cache" "k8s.io/utils/pointer" @@ -321,3 +322,194 @@ func newFakeTiCDCScaler(resyncDuration ...time.Duration) (*ticdcScaler, cache.In pvcControl := fakeDeps.PVCControl.(*controller.FakePVCControl) return &ticdcScaler{generalScaler{deps: fakeDeps}}, pvcIndexer, podIndexer, pvcControl } + +type cdcCtlMock struct { + controller.TiCDCControlInterface + drainCapture func(tc *v1alpha1.TidbCluster, ordinal int32) (tableCount int, retry bool, err error) + resignOwner func(tc *v1alpha1.TidbCluster, ordinal int32) (ok bool, err error) +} + +func (c *cdcCtlMock) DrainCapture(tc *v1alpha1.TidbCluster, ordinal int32) (int, bool, error) { + return c.drainCapture(tc, ordinal) +} +func (c *cdcCtlMock) ResignOwner(tc *v1alpha1.TidbCluster, ordinal int32) (bool, error) { + return c.resignOwner(tc, ordinal) +} + +type podCtlMock struct { + controller.PodControlInterface + updatePod func(runtime.Object, *corev1.Pod) (*corev1.Pod, error) +} + +func (p *podCtlMock) UpdatePod(o runtime.Object, pod *corev1.Pod) (*corev1.Pod, error) { + return p.updatePod(o, pod) +} + +func TestTiCDCGracefulShutdown(t *testing.T) { + g := NewGomegaWithT(t) + + tc := newTidbClusterForPD() + tc.Spec.TiCDC = &v1alpha1.TiCDCSpec{} + newPod := func() *corev1.Pod { + return &corev1.Pod{ + TypeMeta: metav1.TypeMeta{Kind: "Pod", APIVersion: "v1"}, + ObjectMeta: metav1.ObjectMeta{ + Name: ticdcPodName(tc.GetName(), 1), + Namespace: corev1.NamespaceDefault, + CreationTimestamp: metav1.Time{Time: time.Now().Add(-1 * time.Hour)}, + }, + } + } + + cases := []struct { + caseName string + cdcCtl controller.TiCDCControlInterface + podCtl controller.PodControlInterface + pod func() *corev1.Pod + expectedErr func(error, string) + }{ + { + caseName: "shutdown ok", + cdcCtl: &cdcCtlMock{ + drainCapture: func(tc *v1alpha1.TidbCluster, ordinal int32) (tableCount int, retry bool, err error) { + return 0, false, nil + }, + resignOwner: func(tc *v1alpha1.TidbCluster, ordinal int32) (ok bool, err error) { + return true, nil + }, + }, + podCtl: &podCtlMock{ + updatePod: func(_ runtime.Object, p *corev1.Pod) (*corev1.Pod, error) { + return p, nil + }, + }, + pod: newPod, + expectedErr: func(err error, name string) { + g.Expect(err).Should(BeNil(), name) + }, + }, + { + caseName: "shutdown timeout", + cdcCtl: &cdcCtlMock{}, + podCtl: &podCtlMock{}, + pod: func() *corev1.Pod { + pod := newPod() + if pod.Annotations == nil { + pod.Annotations = map[string]string{} + } + now := time.Now().Add(-2 * ticdcGracefulShutdownTimeout).Format(time.RFC3339) + pod.Annotations[GracefulShutdownTiCDCBeginTime] = now + return pod + }, + expectedErr: func(err error, name string) { + g.Expect(err).Should(BeNil(), name) + }, + }, + { + caseName: "shutdown malformed label value", + cdcCtl: &cdcCtlMock{}, + podCtl: &podCtlMock{}, + pod: func() *corev1.Pod { + pod := newPod() + if pod.Annotations == nil { + pod.Annotations = map[string]string{} + } + pod.Annotations[GracefulShutdownTiCDCBeginTime] = "malformed" + return pod + }, + expectedErr: func(err error, name string) { + g.Expect(err).Should(BeNil(), name) + }, + }, + { + caseName: "shutdown with label set", + cdcCtl: &cdcCtlMock{ + drainCapture: func(tc *v1alpha1.TidbCluster, ordinal int32) (tableCount int, retry bool, err error) { + return 0, false, nil + }, + resignOwner: func(tc *v1alpha1.TidbCluster, ordinal int32) (ok bool, err error) { + return true, nil + }, + }, + podCtl: &podCtlMock{}, + pod: func() *corev1.Pod { + pod := newPod() + if pod.Annotations == nil { + pod.Annotations = map[string]string{} + } + now := time.Now().Format(time.RFC3339) + pod.Annotations[GracefulShutdownTiCDCBeginTime] = now + return pod + }, + expectedErr: func(err error, name string) { + g.Expect(err).Should(BeNil(), name) + }, + }, + { + caseName: "shutdown retry resign owner", + cdcCtl: &cdcCtlMock{ + resignOwner: func(tc *v1alpha1.TidbCluster, ordinal int32) (ok bool, err error) { + return false, nil + }, + }, + podCtl: &podCtlMock{ + updatePod: func(_ runtime.Object, p *corev1.Pod) (*corev1.Pod, error) { + return p, nil + }, + }, + pod: newPod, + expectedErr: func(err error, name string) { + g.Expect(err).Should(Not(BeNil()), name) + g.Expect(controller.IsRequeueError(err)).Should(BeTrue(), name) + }, + }, + { + caseName: "shutdown retry drain capture #1", + cdcCtl: &cdcCtlMock{ + drainCapture: func(tc *v1alpha1.TidbCluster, ordinal int32) (tableCount int, retry bool, err error) { + return 1, false, nil + }, + resignOwner: func(tc *v1alpha1.TidbCluster, ordinal int32) (ok bool, err error) { + return true, nil + }, + }, + podCtl: &podCtlMock{ + updatePod: func(_ runtime.Object, p *corev1.Pod) (*corev1.Pod, error) { + return p, nil + }, + }, + pod: newPod, + expectedErr: func(err error, name string) { + g.Expect(err).Should(Not(BeNil()), name) + g.Expect(controller.IsRequeueError(err)).Should(BeTrue(), name) + }, + }, + { + caseName: "shutdown retry drain capture #2", + cdcCtl: &cdcCtlMock{ + drainCapture: func(tc *v1alpha1.TidbCluster, ordinal int32) (tableCount int, retry bool, err error) { + return 0, true, nil + }, + resignOwner: func(tc *v1alpha1.TidbCluster, ordinal int32) (ok bool, err error) { + return true, nil + }, + }, + podCtl: &podCtlMock{ + updatePod: func(_ runtime.Object, p *corev1.Pod) (*corev1.Pod, error) { + return p, nil + }, + }, + pod: newPod, + expectedErr: func(err error, name string) { + g.Expect(err).Should(Not(BeNil()), name) + g.Expect(controller.IsRequeueError(err)).Should(BeTrue(), name) + }, + }, + } + + for _, c := range cases { + pod := c.pod() + err := gracefulShutdownTiCDC(tc, c.cdcCtl, c.podCtl, pod, 1, "test") + c.expectedErr(err, c.caseName) + } +} From 24c51f4893af31a4e8196027d430a0082c9d07f2 Mon Sep 17 00:00:00 2001 From: Neil Shen Date: Wed, 13 Jul 2022 17:24:21 +0800 Subject: [PATCH 4/7] address comments and lints Signed-off-by: Neil Shen --- pkg/controller/ticdc_control.go | 18 +++++++++--------- pkg/controller/ticdc_control_test.go | 28 ++++++++++++++-------------- pkg/manager/member/ticdc_scaler.go | 6 +++--- 3 files changed, 26 insertions(+), 26 deletions(-) diff --git a/pkg/controller/ticdc_control.go b/pkg/controller/ticdc_control.go index 70a799a7ea2..e2082b4f13b 100644 --- a/pkg/controller/ticdc_control.go +++ b/pkg/controller/ticdc_control.go @@ -97,7 +97,7 @@ func (c *defaultTiCDCControl) GetStatus(tc *v1alpha1.TidbCluster, ordinal int32) func (c *defaultTiCDCControl) DrainCapture(tc *v1alpha1.TidbCluster, ordinal int32) (int, bool, error) { httpClient, err := c.getHTTPClient(tc) if err != nil { - klog.Warningf("ticdc control: drain capture is failed, error: %s", err) + klog.Warningf("ticdc control: drain capture is failed, error: %v", err) return 0, false, err } @@ -105,7 +105,7 @@ func (c *defaultTiCDCControl) DrainCapture(tc *v1alpha1.TidbCluster, ordinal int captures, retry, err := getCaptures(httpClient, baseURL) if err != nil { - klog.Warningf("ticdc control: drain capture is failed, error: %s", err) + klog.Warningf("ticdc control: drain capture is failed, error: %v", err) return 0, false, err } if retry { @@ -120,7 +120,7 @@ func (c *defaultTiCDCControl) DrainCapture(tc *v1alpha1.TidbCluster, ordinal int this, owner := getOrdinalAndOwnerCaptureInfo(tc, ordinal, captures) if this == nil { addr := getCaptureAdvertiseAddressPrefix(tc, ordinal) - return 0, false, fmt.Errorf("capture not found, address: %s %+v", addr, captures) + return 0, false, fmt.Errorf("capture not found, address: %s, captures: %+v", addr, captures) } if owner == nil { return 0, false, fmt.Errorf("owner not found") @@ -169,14 +169,14 @@ func (c *defaultTiCDCControl) DrainCapture(tc *v1alpha1.TidbCluster, ordinal int func (c *defaultTiCDCControl) ResignOwner(tc *v1alpha1.TidbCluster, ordinal int32) (bool, error) { httpClient, err := c.getHTTPClient(tc) if err != nil { - klog.Warningf("ticdc control: resign owner failed, error: %s", err) + klog.Warningf("ticdc control: resign owner failed, error: %v", err) return false, err } baseURL := c.getBaseURL(tc, ordinal) captures, retry, err := getCaptures(httpClient, baseURL) if err != nil { - klog.Warningf("ticdc control: resign owner failed, error: %s", err) + klog.Warningf("ticdc control: resign owner failed, error: %v", err) return false, err } if retry { @@ -245,10 +245,6 @@ func getCaptures(httpClient *http.Client, baseURL string) ([]captureInfo, bool, return nil, false, err } defer httputil.DeferClose(res.Body) - body, err := ioutil.ReadAll(res.Body) - if err != nil { - return nil, false, err - } if res.StatusCode == http.StatusNotFound { // It is likely the TiCDC does not support the API, ignore. return nil, false, nil @@ -258,6 +254,10 @@ func getCaptures(httpClient *http.Client, baseURL string) ([]captureInfo, bool, return nil, true, nil } + body, err := ioutil.ReadAll(res.Body) + if err != nil { + return nil, false, err + } var resp []captureInfo err = json.Unmarshal(body, &resp) if err != nil { diff --git a/pkg/controller/ticdc_control_test.go b/pkg/controller/ticdc_control_test.go index 0e65358bebc..35116717874 100644 --- a/pkg/controller/ticdc_control_test.go +++ b/pkg/controller/ticdc_control_test.go @@ -49,7 +49,7 @@ func TestTiCDCControllerResignOwner(t *testing.T) { }} payload, err := json.Marshal(cp) g.Expect(err).Should(BeNil()) - fmt.Fprintf(w, string(payload)) + fmt.Fprint(w, string(payload)) }, }, ordinal: 1, @@ -69,7 +69,7 @@ func TestTiCDCControllerResignOwner(t *testing.T) { }} payload, err := json.Marshal(cp) g.Expect(err).Should(BeNil()) - fmt.Fprintf(w, string(payload)) + fmt.Fprint(w, string(payload)) }, }, ordinal: 1, @@ -90,7 +90,7 @@ func TestTiCDCControllerResignOwner(t *testing.T) { }} payload, err := json.Marshal(cp) g.Expect(err).Should(BeNil()) - fmt.Fprintf(w, string(payload)) + fmt.Fprint(w, string(payload)) }, "/api/v1/owner/resign": func(w http.ResponseWriter, req *http.Request) { w.WriteHeader(http.StatusAccepted) @@ -114,7 +114,7 @@ func TestTiCDCControllerResignOwner(t *testing.T) { }} payload, err := json.Marshal(cp) g.Expect(err).Should(BeNil()) - fmt.Fprintf(w, string(payload)) + fmt.Fprint(w, string(payload)) }, "/api/v1/owner/resign": func(w http.ResponseWriter, req *http.Request) { w.WriteHeader(http.StatusNotFound) @@ -138,7 +138,7 @@ func TestTiCDCControllerResignOwner(t *testing.T) { }} payload, err := json.Marshal(cp) g.Expect(err).Should(BeNil()) - fmt.Fprintf(w, string(payload)) + fmt.Fprint(w, string(payload)) }, "/api/v1/owner/resign": func(w http.ResponseWriter, req *http.Request) { w.WriteHeader(http.StatusServiceUnavailable) @@ -200,7 +200,7 @@ func TestTiCDCControllerDrainCapture(t *testing.T) { }} payload, err := json.Marshal(cp) g.Expect(err).Should(BeNil()) - fmt.Fprintf(w, string(payload)) + fmt.Fprint(w, string(payload)) }, }, ordinal: 1, @@ -221,7 +221,7 @@ func TestTiCDCControllerDrainCapture(t *testing.T) { }} payload, err := json.Marshal(cp) g.Expect(err).Should(BeNil()) - fmt.Fprintf(w, string(payload)) + fmt.Fprint(w, string(payload)) }, }, ordinal: 3, @@ -242,7 +242,7 @@ func TestTiCDCControllerDrainCapture(t *testing.T) { }} payload, err := json.Marshal(cp) g.Expect(err).Should(BeNil()) - fmt.Fprintf(w, string(payload)) + fmt.Fprint(w, string(payload)) }, }, ordinal: 1, @@ -264,12 +264,12 @@ func TestTiCDCControllerDrainCapture(t *testing.T) { }} payload, err := json.Marshal(cp) g.Expect(err).Should(BeNil()) - fmt.Fprintf(w, string(payload)) + fmt.Fprint(w, string(payload)) }, "/api/v1/captures/drain": func(w http.ResponseWriter, req *http.Request) { payload, err := json.Marshal(drainCaptureResp{CurrentTableCount: 0}) g.Expect(err).Should(BeNil()) - fmt.Fprintf(w, string(payload)) + fmt.Fprint(w, string(payload)) }, }, ordinal: 1, @@ -291,7 +291,7 @@ func TestTiCDCControllerDrainCapture(t *testing.T) { }} payload, err := json.Marshal(cp) g.Expect(err).Should(BeNil()) - fmt.Fprintf(w, string(payload)) + fmt.Fprint(w, string(payload)) }, "/api/v1/captures/drain": func(w http.ResponseWriter, req *http.Request) { body, err := io.ReadAll(req.Body) @@ -303,7 +303,7 @@ func TestTiCDCControllerDrainCapture(t *testing.T) { payload, err := json.Marshal(drainCaptureResp{CurrentTableCount: 1}) g.Expect(err).Should(BeNil()) - fmt.Fprintf(w, string(payload)) + fmt.Fprint(w, string(payload)) }, }, ordinal: 1, @@ -325,7 +325,7 @@ func TestTiCDCControllerDrainCapture(t *testing.T) { }} payload, err := json.Marshal(cp) g.Expect(err).Should(BeNil()) - fmt.Fprintf(w, string(payload)) + fmt.Fprint(w, string(payload)) }, "/api/v1/captures/drain": func(w http.ResponseWriter, req *http.Request) { w.WriteHeader(http.StatusNotFound) @@ -350,7 +350,7 @@ func TestTiCDCControllerDrainCapture(t *testing.T) { }} payload, err := json.Marshal(cp) g.Expect(err).Should(BeNil()) - fmt.Fprintf(w, string(payload)) + fmt.Fprint(w, string(payload)) }, "/api/v1/captures/drain": func(w http.ResponseWriter, req *http.Request) { w.WriteHeader(http.StatusServiceUnavailable) diff --git a/pkg/manager/member/ticdc_scaler.go b/pkg/manager/member/ticdc_scaler.go index 9af34215556..b70615e0e76 100644 --- a/pkg/manager/member/ticdc_scaler.go +++ b/pkg/manager/member/ticdc_scaler.go @@ -95,7 +95,7 @@ func (s *ticdcScaler) ScaleIn(meta metav1.Object, oldSet *apps.StatefulSet, newS } pod, err := s.deps.PodLister.Pods(ns).Get(podName) if err != nil { - return fmt.Errorf("ticdcScaler.ScaleIn: failed to get pods %s for cluster %s/%s, error: %s", podName, ns, tcName, err) + return fmt.Errorf("ticdcScaler.ScaleIn: failed to get pods %s for cluster %s/%s, error: %v", podName, ns, tcName, err) } tc, _ := meta.(*v1alpha1.TidbCluster) @@ -107,7 +107,7 @@ func (s *ticdcScaler) ScaleIn(meta metav1.Object, oldSet *apps.StatefulSet, newS pvcs, err := util.ResolvePVCFromPod(pod, s.deps.PVCLister) if err != nil && !errors.IsNotFound(err) { - return fmt.Errorf("ticdcScaler.ScaleIn: failed to get pvcs for pod %s/%s in tc %s/%s, error: %s", ns, pod.Name, ns, tcName, err) + return fmt.Errorf("ticdcScaler.ScaleIn: failed to get pvcs for pod %s/%s in tc %s/%s, error: %v", ns, pod.Name, ns, tcName, err) } for _, pvc := range pvcs { if err := addDeferDeletingAnnoToPVC(tc, pvc, s.deps.PVCControl); err != nil { @@ -207,7 +207,7 @@ func checkTiCDCGracefulShutdownTimeout( pod.Annotations[GracefulShutdownTiCDCBeginTime] = now _, err := podCtl.UpdatePod(tc, pod) if err != nil { - klog.Errorf("ticdc.%s: failed to set pod %s in cluster %s/%s annotation %s to %s, %v", + klog.Errorf("ticdc.%s: failed to set pod %s in cluster %s/%s annotation %s to %s, error: %v", action, podName, ns, tc.GetName(), GracefulShutdownTiCDCBeginTime, now, err) return false, err } From 4d49fc903849843b865215c1d2133112822e2b06 Mon Sep 17 00:00:00 2001 From: Neil Shen Date: Fri, 15 Jul 2022 19:25:33 +0800 Subject: [PATCH 5/7] address comments Signed-off-by: Neil Shen --- pkg/apis/label/label.go | 4 ++-- pkg/controller/ticdc_control.go | 16 ++++++++-------- pkg/manager/member/ticdc_scaler.go | 16 ++++++---------- pkg/manager/member/ticdc_scaler_test.go | 6 +++--- 4 files changed, 19 insertions(+), 23 deletions(-) diff --git a/pkg/apis/label/label.go b/pkg/apis/label/label.go index db513f15a9c..f5818596bf2 100644 --- a/pkg/apis/label/label.go +++ b/pkg/apis/label/label.go @@ -95,8 +95,8 @@ const ( AnnSysctlInit = "tidb.pingcap.com/sysctl-init" // AnnEvictLeaderBeginTime is pod annotation key to indicate the begin time for evicting region leader AnnEvictLeaderBeginTime = "tidb.pingcap.com/evictLeaderBeginTime" - // AnnGracefulShutdownTiCDCBeginTime is pod annotation key to indicate the begin time for graceful shutdown TiCDC - AnnGracefulShutdownTiCDCBeginTime = "tidb.pingcap.com/gracefulShutdownTiCDCBeginTime" + // AnnTiCDCGracefulShutdownBeginTime is pod annotation key to indicate the begin time for graceful shutdown TiCDC + AnnTiCDCGracefulShutdownBeginTime = "tidb.pingcap.com/gracefulShutdownTiCDCBeginTime" // AnnStsLastSyncTimestamp is sts annotation key to indicate the last timestamp the operator sync the sts AnnStsLastSyncTimestamp = "tidb.pingcap.com/sync-timestamp" diff --git a/pkg/controller/ticdc_control.go b/pkg/controller/ticdc_control.go index e2082b4f13b..0062bcd4866 100644 --- a/pkg/controller/ticdc_control.go +++ b/pkg/controller/ticdc_control.go @@ -123,7 +123,7 @@ func (c *defaultTiCDCControl) DrainCapture(tc *v1alpha1.TidbCluster, ordinal int return 0, false, fmt.Errorf("capture not found, address: %s, captures: %+v", addr, captures) } if owner == nil { - return 0, false, fmt.Errorf("owner not found") + return 0, false, fmt.Errorf("owner not found, captures: %+v", captures) } payload := drainCaptureRequest{ @@ -131,15 +131,15 @@ func (c *defaultTiCDCControl) DrainCapture(tc *v1alpha1.TidbCluster, ordinal int } payloadBody, err := json.Marshal(payload) if err != nil { - return 0, false, err + return 0, false, fmt.Errorf("ticdc drain capture failed, marshal request error: %v", err) } req, err := http.NewRequest("PUT", baseURL+"/api/v1/captures/drain", bytes.NewReader(payloadBody)) if err != nil { - return 0, false, err + return 0, false, fmt.Errorf("ticdc drain capture failed, new request error: %v", err) } res, err := httpClient.Do(req) if err != nil { - return 0, false, err + return 0, false, fmt.Errorf("ticdc drain capture failed, request error: %v", err) } defer httputil.DeferClose(res.Body) if res.StatusCode == http.StatusNotFound { @@ -154,7 +154,7 @@ func (c *defaultTiCDCControl) DrainCapture(tc *v1alpha1.TidbCluster, ordinal int } body, err := ioutil.ReadAll(res.Body) if err != nil { - return 0, false, err + return 0, false, fmt.Errorf("ticdc drain capture failed, read response error: %v", err) } var resp drainCaptureResp @@ -202,7 +202,7 @@ func (c *defaultTiCDCControl) ResignOwner(tc *v1alpha1.TidbCluster, ordinal int3 res, err := httpClient.Post(baseURL+"/api/v1/owner/resign", "", nil) if err != nil { - return false, err + return false, fmt.Errorf("ticdc resign owner failed, request error: %v", err) } httputil.DeferClose(res.Body) if res.StatusCode == http.StatusNotFound { @@ -242,7 +242,7 @@ func getCaptureAdvertiseAddressPrefix(tc *v1alpha1.TidbCluster, ordinal int32) s func getCaptures(httpClient *http.Client, baseURL string) ([]captureInfo, bool, error) { res, err := httpClient.Get(baseURL + "/api/v1/captures") if err != nil { - return nil, false, err + return nil, false, fmt.Errorf("ticdc get captures failed, request error: %v", err) } defer httputil.DeferClose(res.Body) if res.StatusCode == http.StatusNotFound { @@ -256,7 +256,7 @@ func getCaptures(httpClient *http.Client, baseURL string) ([]captureInfo, bool, body, err := ioutil.ReadAll(res.Body) if err != nil { - return nil, false, err + return nil, false, fmt.Errorf("ticdc get captures failed, read response error: %v", err) } var resp []captureInfo err = json.Unmarshal(body, &resp) diff --git a/pkg/manager/member/ticdc_scaler.go b/pkg/manager/member/ticdc_scaler.go index b70615e0e76..ce654757f81 100644 --- a/pkg/manager/member/ticdc_scaler.go +++ b/pkg/manager/member/ticdc_scaler.go @@ -24,16 +24,12 @@ import ( "k8s.io/apimachinery/pkg/runtime" "k8s.io/klog/v2" + "github.com/pingcap/tidb-operator/pkg/apis/label" "github.com/pingcap/tidb-operator/pkg/apis/pingcap/v1alpha1" "github.com/pingcap/tidb-operator/pkg/controller" "github.com/pingcap/tidb-operator/pkg/util" ) -const ( - // GracefulShutdownTiCDCBeginTime is the key of evict Leader begin time - GracefulShutdownTiCDCBeginTime = "gracefulShutdownTiCDCBeginTime" -) - type ticdcScaler struct { generalScaler } @@ -180,13 +176,13 @@ func checkTiCDCGracefulShutdownTimeout( if pod.Annotations == nil { pod.Annotations = map[string]string{} } - begin, ok := pod.Annotations[GracefulShutdownTiCDCBeginTime] + begin, ok := pod.Annotations[label.AnnTiCDCGracefulShutdownBeginTime] if ok { // Check graceful shutdown timeout. beginTime, err := time.Parse(time.RFC3339, begin) if err != nil { klog.Errorf("ticdc.%s: parse annotation:[%s] \"%s\" to time failed, skip graceful shutdown", - action, GracefulShutdownTiCDCBeginTime, begin) + action, label.AnnTiCDCGracefulShutdownBeginTime, begin) return true, nil } @@ -204,14 +200,14 @@ func checkTiCDCGracefulShutdownTimeout( // Set graceful shutdown begin time. now := time.Now().Format(time.RFC3339) - pod.Annotations[GracefulShutdownTiCDCBeginTime] = now + pod.Annotations[label.AnnTiCDCGracefulShutdownBeginTime] = now _, err := podCtl.UpdatePod(tc, pod) if err != nil { klog.Errorf("ticdc.%s: failed to set pod %s in cluster %s/%s annotation %s to %s, error: %v", - action, podName, ns, tc.GetName(), GracefulShutdownTiCDCBeginTime, now, err) + action, podName, ns, tc.GetName(), label.AnnTiCDCGracefulShutdownBeginTime, now, err) return false, err } klog.Infof("ticdc.%s: set pod %s in cluster %s/%s annotation %s to %s successfully", - action, podName, ns, tc.GetName(), GracefulShutdownTiCDCBeginTime, now) + action, podName, ns, tc.GetName(), label.AnnTiCDCGracefulShutdownBeginTime, now) return false, nil } diff --git a/pkg/manager/member/ticdc_scaler_test.go b/pkg/manager/member/ticdc_scaler_test.go index 93fc2304de5..7841a5c9b35 100644 --- a/pkg/manager/member/ticdc_scaler_test.go +++ b/pkg/manager/member/ticdc_scaler_test.go @@ -398,7 +398,7 @@ func TestTiCDCGracefulShutdown(t *testing.T) { pod.Annotations = map[string]string{} } now := time.Now().Add(-2 * ticdcGracefulShutdownTimeout).Format(time.RFC3339) - pod.Annotations[GracefulShutdownTiCDCBeginTime] = now + pod.Annotations[label.AnnTiCDCGracefulShutdownBeginTime] = now return pod }, expectedErr: func(err error, name string) { @@ -414,7 +414,7 @@ func TestTiCDCGracefulShutdown(t *testing.T) { if pod.Annotations == nil { pod.Annotations = map[string]string{} } - pod.Annotations[GracefulShutdownTiCDCBeginTime] = "malformed" + pod.Annotations[label.AnnTiCDCGracefulShutdownBeginTime] = "malformed" return pod }, expectedErr: func(err error, name string) { @@ -438,7 +438,7 @@ func TestTiCDCGracefulShutdown(t *testing.T) { pod.Annotations = map[string]string{} } now := time.Now().Format(time.RFC3339) - pod.Annotations[GracefulShutdownTiCDCBeginTime] = now + pod.Annotations[label.AnnTiCDCGracefulShutdownBeginTime] = now return pod }, expectedErr: func(err error, name string) { From b22728c14e3d5db54a3c6836262dc0547871b96f Mon Sep 17 00:00:00 2001 From: Neil Shen Date: Fri, 15 Jul 2022 19:41:09 +0800 Subject: [PATCH 6/7] correct label style Signed-off-by: Neil Shen --- pkg/apis/label/label.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/apis/label/label.go b/pkg/apis/label/label.go index f5818596bf2..c2c187a6d19 100644 --- a/pkg/apis/label/label.go +++ b/pkg/apis/label/label.go @@ -96,7 +96,7 @@ const ( // AnnEvictLeaderBeginTime is pod annotation key to indicate the begin time for evicting region leader AnnEvictLeaderBeginTime = "tidb.pingcap.com/evictLeaderBeginTime" // AnnTiCDCGracefulShutdownBeginTime is pod annotation key to indicate the begin time for graceful shutdown TiCDC - AnnTiCDCGracefulShutdownBeginTime = "tidb.pingcap.com/gracefulShutdownTiCDCBeginTime" + AnnTiCDCGracefulShutdownBeginTime = "tidb.pingcap.com/ticdc-graceful-shutdown-begin-time" // AnnStsLastSyncTimestamp is sts annotation key to indicate the last timestamp the operator sync the sts AnnStsLastSyncTimestamp = "tidb.pingcap.com/sync-timestamp" From 8360cb0bd01908109cbfc32c0fa53e5faec295f2 Mon Sep 17 00:00:00 2001 From: Neil Shen Date: Wed, 20 Jul 2022 15:36:04 +0800 Subject: [PATCH 7/7] address comments Signed-off-by: Neil Shen --- pkg/manager/member/ticdc_member_manager.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/pkg/manager/member/ticdc_member_manager.go b/pkg/manager/member/ticdc_member_manager.go index 765c7a2bb0f..fb6fb9562d8 100644 --- a/pkg/manager/member/ticdc_member_manager.go +++ b/pkg/manager/member/ticdc_member_manager.go @@ -130,6 +130,8 @@ func (m *ticdcMemberManager) Sync(tc *v1alpha1.TidbCluster) error { ns := tc.GetNamespace() tcName := tc.GetName() + // NB: All TiCDC operations, e.g. creation, scale, upgrade will be blocked. + // if PD or TiKV is not available. if tc.Spec.PD != nil && !tc.PDIsAvailable() { return controller.RequeueErrorf("TidbCluster: [%s/%s], TiCDC is waiting for PD cluster running", ns, tcName) } @@ -350,8 +352,8 @@ func getNewTiCDCStatefulSet(tc *v1alpha1.TidbCluster, cm *corev1.ConfigMap) (*ap stsAnnotations := getStsAnnotations(tc.Annotations, label.TiCDCLabelVal) headlessSvcName := controller.TiCDCPeerMemberName(tcName) + // NB: TiCDC control relies the format. // TODO move advertise addr format to package controller. - // TiCDC control relays the format. advertiseAddr := fmt.Sprintf("${POD_NAME}.${HEADLESS_SERVICE_NAME}.${NAMESPACE}.svc%s:8301", controller.FormatClusterDomain(tc.Spec.ClusterDomain)) cmdArgs := []string{"/cdc server", "--addr=0.0.0.0:8301", fmt.Sprintf("--advertise-addr=%s", advertiseAddr)}