Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support graceful shutdown TiCDC node #4624

Merged
merged 14 commits into from
Jul 25, 2022
Merged
2 changes: 2 additions & 0 deletions pkg/apis/label/label.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,8 @@ const (
AnnSysctlInit = "tidb.pingcap.com/sysctl-init"
// AnnEvictLeaderBeginTime is pod annotation key to indicate the begin time for evicting region leader
AnnEvictLeaderBeginTime = "tidb.pingcap.com/evictLeaderBeginTime"
// AnnTiCDCGracefulShutdownBeginTime is pod annotation key to indicate the begin time for graceful shutdown TiCDC
AnnTiCDCGracefulShutdownBeginTime = "tidb.pingcap.com/ticdc-graceful-shutdown-begin-time"
// AnnStsLastSyncTimestamp is sts annotation key to indicate the last timestamp the operator sync the sts
AnnStsLastSyncTimestamp = "tidb.pingcap.com/sync-timestamp"

Expand Down
37 changes: 29 additions & 8 deletions pkg/apis/pingcap/v1alpha1/tidbcluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -569,14 +569,6 @@ func (tc *TidbCluster) TiFlashStsDesiredReplicas() int32 {
return tc.Spec.TiFlash.Replicas + int32(len(tc.Status.TiFlash.FailureStores))
}

func (tc *TidbCluster) TiCDCDeployDesiredReplicas() int32 {
if tc.Spec.TiCDC == nil {
return 0
}

return tc.Spec.TiCDC.Replicas
}

func (tc *TidbCluster) TiFlashStsActualReplicas() int32 {
stsStatus := tc.Status.TiFlash.StatefulSet
if stsStatus == nil {
Expand All @@ -596,6 +588,35 @@ func (tc *TidbCluster) TiFlashStsDesiredOrdinals(excludeFailover bool) sets.Int3
return GetPodOrdinalsFromReplicasAndDeleteSlots(replicas, tc.getDeleteSlots(label.TiFlashLabelVal))
}

// TiCDCAllCapturesReady return whether all captures of TiCDC are ready.
//
// If TiCDC isn't specified, return false.
func (tc *TidbCluster) TiCDCAllCapturesReady() bool {
if tc.Spec.TiCDC == nil {
return false
}

if int(tc.TiCDCDeployDesiredReplicas()) != len(tc.Status.TiCDC.Captures) {
return false
}

for _, c := range tc.Status.TiCDC.Captures {
if !c.Ready {
return false
}
}

return true
}

func (tc *TidbCluster) TiCDCDeployDesiredReplicas() int32 {
if tc.Spec.TiCDC == nil {
return 0
}

return tc.Spec.TiCDC.Replicas
}

// TiDBAllPodsStarted return whether all pods of TiDB are started.
//
// If TiDB isn't specified, return false.
Expand Down
219 changes: 217 additions & 2 deletions pkg/controller/ticdc_control.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,17 @@
package controller

import (
"bytes"
"encoding/json"
"fmt"
"io/ioutil"
overvenus marked this conversation as resolved.
Show resolved Hide resolved
"net/http"
"strings"

"github.com/pingcap/tidb-operator/pkg/apis/pingcap/v1alpha1"
httputil "github.com/pingcap/tidb-operator/pkg/util/http"
corelisterv1 "k8s.io/client-go/listers/core/v1"
"k8s.io/klog/v2"
)

type CaptureStatus struct {
Expand All @@ -27,10 +33,35 @@ type CaptureStatus struct {
IsOwner bool `json:"is_owner"`
}

type captureInfo struct {
ID string `json:"id"`
IsOwner bool `json:"is_owner"`
AdvertiseAddr string `json:"address"`
}

// drainCaptureRequest is request for manual `DrainCapture`
type drainCaptureRequest struct {
CaptureID string `json:"capture_id"`
}

// drainCaptureResp is response for manual `DrainCapture`
type drainCaptureResp struct {
CurrentTableCount int `json:"current_table_count"`
}

// TiCDCControlInterface is the interface that knows how to manage ticdc captures
type TiCDCControlInterface interface {
// GetStatus returns ticdc's status
GetStatus(tc *v1alpha1.TidbCluster, ordinal int32) (*CaptureStatus, error)
// DrainCapture remove capture ownership and moves its tables to other captures.
// Returns the number of tables in the capture.
// If there is only one capture, it always return 0.
DrainCapture(tc *v1alpha1.TidbCluster, ordinal int32) (tableCount int, retry bool, err error)
// ResignOwner tries to resign ownership from the current capture.
// Returns true if the capture has already resigned ownership,
// otherwise caller should retry resign owner.
// If there is only one capture, it always return true.
ResignOwner(tc *v1alpha1.TidbCluster, ordinal int32) (ok bool, err error)
}

// defaultTiCDCControl is default implementation of TiCDCControlInterface.
Expand Down Expand Up @@ -63,17 +94,193 @@ func (c *defaultTiCDCControl) GetStatus(tc *v1alpha1.TidbCluster, ordinal int32)
return &status, err
}

func (c *defaultTiCDCControl) DrainCapture(tc *v1alpha1.TidbCluster, ordinal int32) (int, bool, error) {
httpClient, err := c.getHTTPClient(tc)
if err != nil {
klog.Warningf("ticdc control: drain capture is failed, error: %v", err)
return 0, false, err
}

baseURL := c.getBaseURL(tc, ordinal)

captures, retry, err := getCaptures(httpClient, baseURL)
if err != nil {
klog.Warningf("ticdc control: drain capture is failed, error: %v", err)
return 0, false, err
}
if retry {
// Let caller retry drain capture.
return 0, true, nil
}
if len(captures) == 1 {
// No way to drain a single node TiCDC cluster, ignore.
return 0, false, nil
}

this, owner := getOrdinalAndOwnerCaptureInfo(tc, ordinal, captures)
if this == nil {
addr := getCaptureAdvertiseAddressPrefix(tc, ordinal)
return 0, false, fmt.Errorf("capture not found, address: %s, captures: %+v", addr, captures)
}
if owner == nil {
return 0, false, fmt.Errorf("owner not found, captures: %+v", captures)
}

payload := drainCaptureRequest{
CaptureID: this.ID,
}
payloadBody, err := json.Marshal(payload)
if err != nil {
return 0, false, fmt.Errorf("ticdc drain capture failed, marshal request error: %v", err)
}
req, err := http.NewRequest("PUT", baseURL+"/api/v1/captures/drain", bytes.NewReader(payloadBody))
if err != nil {
return 0, false, fmt.Errorf("ticdc drain capture failed, new request error: %v", err)
}
res, err := httpClient.Do(req)
if err != nil {
return 0, false, fmt.Errorf("ticdc drain capture failed, request error: %v", err)
}
defer httputil.DeferClose(res.Body)
if res.StatusCode == http.StatusNotFound {
overvenus marked this conversation as resolved.
Show resolved Hide resolved
// It is likely the TiCDC does not support the API, ignore.
klog.Infof("ticdc control: %s does not support drain capture, skip", this.AdvertiseAddr)
return 0, false, nil
}
if res.StatusCode == http.StatusServiceUnavailable {
// TiCDC is not ready, retry.
klog.Infof("ticdc control: %s service unavailable drain capture, retry", this.AdvertiseAddr)
return 0, true, nil
}
body, err := ioutil.ReadAll(res.Body)
if err != nil {
return 0, false, fmt.Errorf("ticdc drain capture failed, read response error: %v", err)
}

var resp drainCaptureResp
err = json.Unmarshal(body, &resp)
if err != nil {
// It is likely the TiCDC does not support the API, ignore.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this correct for the unmarshal error?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, we ignore the error because old ticdc does not support this API.

return 0, false, nil
}
return resp.CurrentTableCount, false, nil
}

func (c *defaultTiCDCControl) ResignOwner(tc *v1alpha1.TidbCluster, ordinal int32) (bool, error) {
httpClient, err := c.getHTTPClient(tc)
if err != nil {
klog.Warningf("ticdc control: resign owner failed, error: %v", err)
return false, err
}

baseURL := c.getBaseURL(tc, ordinal)
captures, retry, err := getCaptures(httpClient, baseURL)
if err != nil {
klog.Warningf("ticdc control: resign owner failed, error: %v", err)
return false, err
}
if retry {
// Let caller retry resign owner.
return false, nil
}
if len(captures) == 1 {
// No way to resign owner in a single node TiCDC cluster, ignore.
return true, nil
}

this, owner := getOrdinalAndOwnerCaptureInfo(tc, ordinal, captures)
if owner != nil && this != nil {
if owner.ID != this.ID {
// Ownership has been transferred another capture.
return true, nil
}
} else {
// Owner or this capture not found, resign ownership from the capture is
// meaning less, ignore.
return true, nil
}

res, err := httpClient.Post(baseURL+"/api/v1/owner/resign", "", nil)
if err != nil {
return false, fmt.Errorf("ticdc resign owner failed, request error: %v", err)
}
httputil.DeferClose(res.Body)
if res.StatusCode == http.StatusNotFound {
// It is likely the TiCDC does not support the API, ignore.
klog.Infof("ticdc control: %s does not support resign owner, skip", this.AdvertiseAddr)
return true, nil
}
if res.StatusCode == http.StatusServiceUnavailable {
// Let caller retry resign owner.
klog.Infof("ticdc control: %s service unavailable resign owner, retry", this.AdvertiseAddr)
return false, nil
}
return false, nil
}

func (c *defaultTiCDCControl) getBaseURL(tc *v1alpha1.TidbCluster, ordinal int32) string {
if c.testURL != "" {
return c.testURL
}

scheme := tc.Scheme()
addr := getCaptureAdvertiseAddressPrefix(tc, ordinal)
return fmt.Sprintf("%s://%s:8301", scheme, addr)
}

// getCaptureAdvertiseAddressPrefix is the prefix of TiCDC advertiseAddress
// which is composed by ${POD_NAME}.${HEADLESS_SERVICE_NAME}.${NAMESPACE}.svc.${ClusterDomain}:8301
// this function return a string "${POD_NAME}.${HEADLESS_SERVICE_NAME}.${NAMESPACE}"
func getCaptureAdvertiseAddressPrefix(tc *v1alpha1.TidbCluster, ordinal int32) string {
tcName := tc.GetName()
ns := tc.GetNamespace()
scheme := tc.Scheme()
hostName := fmt.Sprintf("%s-%d", TiCDCMemberName(tcName), ordinal)

return fmt.Sprintf("%s://%s.%s.%s:8301", scheme, hostName, TiCDCPeerMemberName(tcName), ns)
return fmt.Sprintf("%s.%s.%s", hostName, TiCDCPeerMemberName(tcName), ns)
}

func getCaptures(httpClient *http.Client, baseURL string) ([]captureInfo, bool, error) {
res, err := httpClient.Get(baseURL + "/api/v1/captures")
if err != nil {
return nil, false, fmt.Errorf("ticdc get captures failed, request error: %v", err)
}
defer httputil.DeferClose(res.Body)
if res.StatusCode == http.StatusNotFound {
overvenus marked this conversation as resolved.
Show resolved Hide resolved
// It is likely the TiCDC does not support the API, ignore.
return nil, false, nil
}
if res.StatusCode == http.StatusServiceUnavailable {
// TiCDC is not ready, retry.
return nil, true, nil
}

body, err := ioutil.ReadAll(res.Body)
if err != nil {
return nil, false, fmt.Errorf("ticdc get captures failed, read response error: %v", err)
}
var resp []captureInfo
err = json.Unmarshal(body, &resp)
if err != nil {
// It is likely the TiCDC does not support the API, ignore.
return nil, false, nil
}
return resp, false, nil
}

func getOrdinalAndOwnerCaptureInfo(
tc *v1alpha1.TidbCluster, ordinal int32, captures []captureInfo,
) (this, owner *captureInfo) {
addrPrefix := getCaptureAdvertiseAddressPrefix(tc, ordinal)
for i := range captures {
cp := &captures[i]
if strings.Contains(cp.AdvertiseAddr, addrPrefix) {
this = cp
}
if cp.IsOwner {
owner = cp
}
}
return
}

// FakeTiCDCControl is a fake implementation of TiCDCControlInterface.
Expand All @@ -97,3 +304,11 @@ func (c *FakeTiCDCControl) GetStatus(tc *v1alpha1.TidbCluster, ordinal int32) (*
}
return c.getStatus(tc, ordinal)
}

func (c *FakeTiCDCControl) DrainCapture(tc *v1alpha1.TidbCluster, ordinal int32) (tableCount int, retry bool, err error) {
return 0, false, nil
}

func (c *FakeTiCDCControl) ResignOwner(tc *v1alpha1.TidbCluster, ordinal int32) (ok bool, err error) {
return true, nil
}
Loading