Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

cdc: support graceful shutdown on receiving signal #6132

Merged
merged 6 commits into from
Jun 30, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -267,6 +267,8 @@ swagger-spec: tools/bin/swag

generate_mock: tools/bin/mockgen
tools/bin/mockgen -source cdc/owner/owner.go -destination cdc/owner/mock/owner_mock.go
tools/bin/mockgen -source cdc/processor/manager.go -destination cdc/processor/mock/manager_mock.go
tools/bin/mockgen -source cdc/capture/capture.go -destination cdc/capture/mock/capture_mock.go

clean:
go clean -i ./...
Expand Down
8 changes: 4 additions & 4 deletions cdc/api/owner/owner.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,11 +78,11 @@ func (c ChangefeedResp) MarshalJSON() ([]byte, error) {

// ownerAPI provides owner APIs.
type ownerAPI struct {
capture *capture.Capture
capture capture.Capture
}

// RegisterOwnerAPIRoutes registers routes for owner APIs.
func RegisterOwnerAPIRoutes(router *gin.Engine, capture *capture.Capture) {
func RegisterOwnerAPIRoutes(router *gin.Engine, capture capture.Capture) {
ownerAPI := ownerAPI{capture: capture}
owner := router.Group("/capture/owner")

Expand Down Expand Up @@ -241,13 +241,13 @@ func (h *ownerAPI) handleChangefeedQuery(w http.ResponseWriter, req *http.Reques
}
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
cfInfo, err := h.capture.EtcdClient.GetChangeFeedInfo(ctx, changefeedID)
cfInfo, err := h.capture.GetEtcdClient().GetChangeFeedInfo(ctx, changefeedID)
if err != nil && cerror.ErrChangeFeedNotExists.NotEqual(err) {
api.WriteError(w, http.StatusBadRequest,
cerror.ErrAPIInvalidParam.GenWithStack("invalid changefeed id: %s", changefeedID))
return
}
cfStatus, _, err := h.capture.EtcdClient.GetChangeFeedStatus(ctx, changefeedID)
cfStatus, _, err := h.capture.GetEtcdClient().GetChangeFeedStatus(ctx, changefeedID)
if err != nil && cerror.ErrChangeFeedNotExists.NotEqual(err) {
api.WriteError(w, http.StatusBadRequest, err)
return
Expand Down
6 changes: 3 additions & 3 deletions cdc/api/status/status.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,11 +38,11 @@ type status struct {
}

type statusAPI struct {
capture *capture.Capture
capture capture.Capture
}

// RegisterStatusAPIRoutes registers routes for status.
func RegisterStatusAPIRoutes(router *gin.Engine, capture *capture.Capture) {
func RegisterStatusAPIRoutes(router *gin.Engine, capture capture.Capture) {
statusAPI := statusAPI{capture: capture}
router.GET("/status", gin.WrapF(statusAPI.handleStatus))
router.GET("/debug/info", gin.WrapF(statusAPI.handleDebugInfo))
Expand All @@ -64,7 +64,7 @@ func (h *statusAPI) handleDebugInfo(w http.ResponseWriter, req *http.Request) {
ctx := req.Context()
h.capture.WriteDebugInfo(ctx, w)
fmt.Fprintf(w, "\n\n*** etcd info ***:\n\n")
h.writeEtcdInfo(ctx, h.capture.EtcdClient, w)
h.writeEtcdInfo(ctx, h.capture.GetEtcdClient(), w)
}

func (h *statusAPI) handleStatus(w http.ResponseWriter, req *http.Request) {
Expand Down
8 changes: 4 additions & 4 deletions cdc/api/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ func WriteData(w http.ResponseWriter, data interface{}) {

// HandleOwnerJob enqueue the admin job
func HandleOwnerJob(
ctx context.Context, capture *capture.Capture, job model.AdminJob,
ctx context.Context, capture capture.Capture, job model.AdminJob,
) error {
// Use buffered channel to prevernt blocking owner.
done := make(chan error, 1)
Expand All @@ -113,7 +113,7 @@ func HandleOwnerJob(

// HandleOwnerBalance balance the changefeed tables
func HandleOwnerBalance(
ctx context.Context, capture *capture.Capture, changefeedID model.ChangeFeedID,
ctx context.Context, capture capture.Capture, changefeedID model.ChangeFeedID,
) error {
// Use buffered channel to prevernt blocking owner.
done := make(chan error, 1)
Expand All @@ -132,7 +132,7 @@ func HandleOwnerBalance(

// HandleOwnerScheduleTable schedule tables
func HandleOwnerScheduleTable(
ctx context.Context, capture *capture.Capture,
ctx context.Context, capture capture.Capture,
changefeedID model.ChangeFeedID, captureID string, tableID int64,
) error {
// Use buffered channel to prevent blocking owner.
Expand Down Expand Up @@ -233,7 +233,7 @@ func ForwardToOwner(c *gin.Context, p CaptureInfoProvider) {

// HandleOwnerDrainCapture schedule drain the target capture
func HandleOwnerDrainCapture(
ctx context.Context, capture *capture.Capture, captureID string,
ctx context.Context, capture capture.Capture, captureID string,
) (*model.DrainCaptureResp, error) {
// Use buffered channel to prevent blocking owner.
done := make(chan error, 1)
Expand Down
29 changes: 15 additions & 14 deletions cdc/api/v1/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,18 +44,18 @@ const (

// OpenAPI provides capture APIs.
type OpenAPI struct {
capture *capture.Capture
capture capture.Capture
// use for unit test only
testStatusProvider owner.StatusProvider
}

// NewOpenAPI creates a new OpenAPI.
func NewOpenAPI(c *capture.Capture) OpenAPI {
func NewOpenAPI(c capture.Capture) OpenAPI {
return OpenAPI{capture: c}
}

// NewOpenAPI4Test return a OpenAPI for test
func NewOpenAPI4Test(c *capture.Capture, p owner.StatusProvider) OpenAPI {
func NewOpenAPI4Test(c capture.Capture, p owner.StatusProvider) OpenAPI {
return OpenAPI{capture: c, testStatusProvider: p}
}

Expand Down Expand Up @@ -268,8 +268,8 @@ func (h *OpenAPI) CreateChangefeed(c *gin.Context) {
return
}

err = h.capture.EtcdClient.CreateChangefeedInfo(ctx, info,
model.DefaultChangeFeedID(changefeedConfig.ID))
err = h.capture.GetEtcdClient().CreateChangefeedInfo(
ctx, info, model.DefaultChangeFeedID(changefeedConfig.ID))
if err != nil {
_ = c.Error(err)
return
Expand Down Expand Up @@ -403,7 +403,7 @@ func (h *OpenAPI) UpdateChangefeed(c *gin.Context) {
return
}

err = h.capture.EtcdClient.SaveChangeFeedInfo(ctx, newInfo, changefeedID)
err = h.capture.GetEtcdClient().SaveChangeFeedInfo(ctx, newInfo, changefeedID)
if err != nil {
_ = c.Error(err)
return
Expand Down Expand Up @@ -704,7 +704,7 @@ func (h *OpenAPI) ListCapture(c *gin.Context) {
// @Produce json
// @Success 200,202
// @Failure 500,400 {object} model.HTTPError
// @Router /api/v1/captures/drain [post]
// @Router /api/v1/captures/drain [put]
func (h *OpenAPI) DrainCapture(c *gin.Context) {
var req model.DrainCaptureRequest
if err := c.ShouldBindJSON(&req); err != nil {
Expand Down Expand Up @@ -775,18 +775,19 @@ func (h *OpenAPI) DrainCapture(c *gin.Context) {
// @Failure 500,400 {object} model.HTTPError
// @Router /api/v1/status [get]
func (h *OpenAPI) ServerStatus(c *gin.Context) {
status := model.ServerStatus{
Version: version.ReleaseVersion,
GitHash: version.GitHash,
Pid: os.Getpid(),
}
info, err := h.capture.Info()
if err != nil {
_ = c.Error(err)
return
}
status.ID = info.ID
status.IsOwner = h.capture.IsOwner()
status := model.ServerStatus{
Version: version.ReleaseVersion,
GitHash: version.GitHash,
Pid: os.Getpid(),
ID: info.ID,
IsOwner: h.capture.IsOwner(),
Liveness: h.capture.Liveness(),
}
c.IndentedJSON(http.StatusOK, status)
}

Expand Down
44 changes: 43 additions & 1 deletion cdc/api/v1/api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"github.com/golang/mock/gomock"
"github.com/pingcap/log"
"github.com/pingcap/tiflow/cdc/capture"
mock_capture "github.com/pingcap/tiflow/cdc/capture/mock"
"github.com/pingcap/tiflow/cdc/model"
mock_owner "github.com/pingcap/tiflow/cdc/owner/mock"
"github.com/pingcap/tiflow/cdc/scheduler"
Expand Down Expand Up @@ -91,7 +92,7 @@ func (p *mockStatusProvider) GetCaptures(ctx context.Context) ([]*model.CaptureI
return args.Get(0).([]*model.CaptureInfo), args.Error(1)
}

func newRouter(c *capture.Capture, p *mockStatusProvider) *gin.Engine {
func newRouter(c capture.Capture, p *mockStatusProvider) *gin.Engine {
router := gin.New()
RegisterOpenAPIRoutes(router, NewOpenAPI4Test(c, p))
return router
Expand Down Expand Up @@ -745,6 +746,47 @@ func TestServerStatus(t *testing.T) {
require.False(t, resp.IsOwner)
}

func TestServerStatusLiveness(t *testing.T) {
t.Parallel()
// capture is owner
ctrl := gomock.NewController(t)
cp := mock_capture.NewMockCapture(ctrl)
ownerRouter := newRouter(cp, newStatusProvider())
api := testCase{url: "/api/v1/status", method: "GET"}

cp.EXPECT().Info().DoAndReturn(func() (model.CaptureInfo, error) {
return model.CaptureInfo{}, nil
}).AnyTimes()
cp.EXPECT().IsOwner().DoAndReturn(func() bool {
return true
}).AnyTimes()

// Alive.
alive := cp.EXPECT().Liveness().DoAndReturn(func() model.Liveness {
return model.LivenessCaptureAlive
})
w := httptest.NewRecorder()
req, _ := http.NewRequestWithContext(context.Background(), api.method, api.url, nil)
ownerRouter.ServeHTTP(w, req)
require.Equal(t, 200, w.Code)
var resp model.ServerStatus
err := json.NewDecoder(w.Body).Decode(&resp)
require.Nil(t, err)
require.EqualValues(t, model.LivenessCaptureAlive, resp.Liveness)

// Draining the capture.
cp.EXPECT().Liveness().DoAndReturn(func() model.Liveness {
return model.LivenessCaptureStopping
}).After(alive)
w = httptest.NewRecorder()
req, _ = http.NewRequestWithContext(context.Background(), api.method, api.url, nil)
ownerRouter.ServeHTTP(w, req)
require.Equal(t, 200, w.Code)
err = json.NewDecoder(w.Body).Decode(&resp)
require.Nil(t, err)
require.EqualValues(t, model.LivenessCaptureStopping, resp.Liveness)
}

func TestSetLogLevel(t *testing.T) {
t.Parallel()

Expand Down
4 changes: 2 additions & 2 deletions cdc/api/v2/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,11 @@ import (

// OpenAPIV2 provides CDC v2 APIs
type OpenAPIV2 struct {
capture *capture.Capture
capture capture.Capture
}

// NewOpenAPIV2 creates a new OpenAPIV2.
func NewOpenAPIV2(c *capture.Capture) OpenAPIV2 {
func NewOpenAPIV2(c capture.Capture) OpenAPIV2 {
return OpenAPIV2{capture: c}
}

Expand Down
2 changes: 1 addition & 1 deletion cdc/api/v2/api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ type testCase struct {
method string
}

func newRouter(c *capture.Capture) *gin.Engine {
func newRouter(c capture.Capture) *gin.Engine {
router := gin.New()
RegisterOpenAPIV2Routes(router, NewOpenAPIV2(c))
return router
Expand Down
4 changes: 2 additions & 2 deletions cdc/api/v2/tso.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,11 @@ import (
// GetTso request and returns a TSO from PD
func (h *OpenAPIV2) GetTso(c *gin.Context) {
ctx := c.Request.Context()
if h.capture.UpstreamManager == nil {
if h.capture.GetUpstreamManager() == nil {
c.Status(http.StatusServiceUnavailable)
return
}
pdClient := h.capture.UpstreamManager.Get(upstream.DefaultUpstreamID).PDClient
pdClient := h.capture.GetUpstreamManager().Get(upstream.DefaultUpstreamID).PDClient
if pdClient == nil {
c.Status(http.StatusServiceUnavailable)
return
Expand Down
4 changes: 2 additions & 2 deletions cdc/api/validator/validator.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,10 +41,10 @@ import (
func VerifyCreateChangefeedConfig(
ctx context.Context,
changefeedConfig model.ChangefeedConfig,
capture *capture.Capture,
capture capture.Capture,
) (*model.ChangeFeedInfo, error) {
// TODO(dongmen): we should pass ClusterID in ChangefeedConfig in the upcoming future
up := capture.UpstreamManager.Get(upstream.DefaultUpstreamID)
up := capture.GetUpstreamManager().Get(upstream.DefaultUpstreamID)
defer up.Release()

// verify sinkURI
Expand Down
Loading