diff --git a/cdc/kv/client.go b/cdc/kv/client.go
index 813a67c26cc..289f8aad04a 100644
--- a/cdc/kv/client.go
+++ b/cdc/kv/client.go
@@ -33,7 +33,7 @@ import (
 	"github.com/pingcap/tiflow/cdc/model"
 	"github.com/pingcap/tiflow/pkg/config"
 	cerror "github.com/pingcap/tiflow/pkg/errors"
-	"github.com/pingcap/tiflow/pkg/pdtime"
+	"github.com/pingcap/tiflow/pkg/pdutil"
 	"github.com/pingcap/tiflow/pkg/regionspan"
 	"github.com/pingcap/tiflow/pkg/retry"
 	"github.com/pingcap/tiflow/pkg/txnutil"
@@ -314,7 +314,7 @@ type CDCClient struct {
 	regionCache *tikv.RegionCache
 	kvStorage   tikv.Storage
-	pdClock     pdtime.Clock
+	pdClock     pdutil.Clock
 	changefeed model.ChangeFeedID
 	regionLimiters *regionEventFeedLimiters
@@ -327,7 +327,7 @@ func NewCDCClient(
 	kvStorage tikv.Storage,
 	grpcPool GrpcPool,
 	regionCache *tikv.RegionCache,
-	pdClock pdtime.Clock,
+	pdClock pdutil.Clock,
 	changefeed model.ChangeFeedID,
 	cfg *config.KVClientConfig,
 ) (c CDCKVClient) {
diff --git a/cdc/kv/client_bench_test.go b/cdc/kv/client_bench_test.go
index 0741c8e5dbe..02440a8abfb 100644
--- a/cdc/kv/client_bench_test.go
+++ b/cdc/kv/client_bench_test.go
@@ -27,7 +27,7 @@ import (
 	"github.com/pingcap/tidb/store/mockstore/mockcopr"
 	"github.com/pingcap/tiflow/cdc/model"
 	"github.com/pingcap/tiflow/pkg/config"
-	"github.com/pingcap/tiflow/pkg/pdtime"
+	"github.com/pingcap/tiflow/pkg/pdutil"
 	"github.com/pingcap/tiflow/pkg/regionspan"
 	"github.com/pingcap/tiflow/pkg/retry"
 	"github.com/pingcap/tiflow/pkg/security"
@@ -203,7 +203,7 @@ func prepareBenchMultiStore(b *testing.B, storeNum, regionNum int) (
 	regionCache := tikv.NewRegionCache(pdClient)
 	defer regionCache.Close()
 	cdcClient := NewCDCClient(
-		ctx, pdClient, kvStorage, grpcPool, regionCache, pdtime.NewClock4Test(),
+		ctx, pdClient, kvStorage, grpcPool, regionCache, pdutil.NewClock4Test(),
 		model.DefaultChangeFeedID(""), config.GetDefaultServerConfig().KVClient)
 	eventCh := make(chan model.RegionFeedEvent, 1000000)
@@ -300,7 +300,7 @@ func prepareBench(b *testing.B, regionNum int) (
 	regionCache := tikv.NewRegionCache(pdClient)
 	defer regionCache.Close()
 	cdcClient := NewCDCClient(
-		ctx, pdClient, kvStorage, grpcPool, regionCache, pdtime.NewClock4Test(),
+		ctx, pdClient, kvStorage, grpcPool, regionCache, pdutil.NewClock4Test(),
 		model.DefaultChangeFeedID(""), config.GetDefaultServerConfig().KVClient)
 	eventCh := make(chan model.RegionFeedEvent, 1000000)
diff --git a/cdc/kv/client_test.go b/cdc/kv/client_test.go
index dbb59933137..d664569c9e2 100644
--- a/cdc/kv/client_test.go
+++ b/cdc/kv/client_test.go
@@ -36,7 +36,7 @@ import (
 	"github.com/pingcap/tiflow/cdc/model"
 	"github.com/pingcap/tiflow/pkg/config"
 	cerror "github.com/pingcap/tiflow/pkg/errors"
-	"github.com/pingcap/tiflow/pkg/pdtime"
+	"github.com/pingcap/tiflow/pkg/pdutil"
 	"github.com/pingcap/tiflow/pkg/regionspan"
 	"github.com/pingcap/tiflow/pkg/retry"
 	"github.com/pingcap/tiflow/pkg/security"
@@ -71,7 +71,7 @@ func TestNewClient(t *testing.T) {
 	regionCache := tikv.NewRegionCache(pdClient)
 	defer regionCache.Close()
 	cli := NewCDCClient(
-		context.Background(), pdClient, nil, grpcPool, regionCache, pdtime.NewClock4Test(),
+		context.Background(), pdClient, nil, grpcPool, regionCache, pdutil.NewClock4Test(),
 		model.DefaultChangeFeedID(""), config.GetDefaultServerConfig().KVClient)
 	require.NotNil(t, cli)
@@ -322,7 +322,7 @@ func TestConnectOfflineTiKV(t *testing.T) {
 	regionCache := tikv.NewRegionCache(pdClient)
 	defer regionCache.Close()
 	cdcClient := NewCDCClient(
-		context.Background(), pdClient, kvStorage, grpcPool, regionCache, pdtime.NewClock4Test(),
+		context.Background(), pdClient, kvStorage, grpcPool, regionCache, pdutil.NewClock4Test(),
 		model.DefaultChangeFeedID(""), config.GetDefaultServerConfig().KVClient)
 	// Take care of the eventCh, it's used to output resolvedTs event or kv event
@@ -427,7 +427,7 @@ func TestRecvLargeMessageSize(t *testing.T) {
 	regionCache := tikv.NewRegionCache(pdClient)
 	defer regionCache.Close()
 	cdcClient := NewCDCClient(
-		ctx, pdClient, kvStorage, grpcPool, regionCache, pdtime.NewClock4Test(),
+		ctx, pdClient, kvStorage, grpcPool, regionCache, pdutil.NewClock4Test(),
 		model.DefaultChangeFeedID(""), config.GetDefaultServerConfig().KVClient)
 	eventCh := make(chan model.RegionFeedEvent, 50)
@@ -530,7 +530,7 @@ func TestHandleError(t *testing.T) {
 	regionCache := tikv.NewRegionCache(pdClient)
 	defer regionCache.Close()
 	cdcClient := NewCDCClient(
-		ctx, pdClient, kvStorage, grpcPool, regionCache, pdtime.NewClock4Test(),
+		ctx, pdClient, kvStorage, grpcPool, regionCache, pdutil.NewClock4Test(),
 		model.DefaultChangeFeedID(""), config.GetDefaultServerConfig().KVClient)
 	eventCh := make(chan model.RegionFeedEvent, 50)
@@ -692,7 +692,7 @@ func TestCompatibilityWithSameConn(t *testing.T) {
 	regionCache := tikv.NewRegionCache(pdClient)
 	defer regionCache.Close()
 	cdcClient := NewCDCClient(
-		ctx, pdClient, kvStorage, grpcPool, regionCache, pdtime.NewClock4Test(),
+		ctx, pdClient, kvStorage, grpcPool, regionCache, pdutil.NewClock4Test(),
 		model.DefaultChangeFeedID(""), config.GetDefaultServerConfig().KVClient)
 	eventCh := make(chan model.RegionFeedEvent, 50)
@@ -763,7 +763,7 @@ func TestClusterIDMismatch(t *testing.T) {
 	regionCache := tikv.NewRegionCache(pdClient)
 	defer regionCache.Close()
 	cdcClient := NewCDCClient(
-		ctx, pdClient, kvStorage, grpcPool, regionCache, pdtime.NewClock4Test(),
+		ctx, pdClient, kvStorage, grpcPool, regionCache, pdutil.NewClock4Test(),
 		model.DefaultChangeFeedID(""), config.GetDefaultServerConfig().KVClient)
 	eventCh := make(chan model.RegionFeedEvent, 50)
@@ -835,7 +835,7 @@ func testHandleFeedEvent(t *testing.T) {
 	regionCache := tikv.NewRegionCache(pdClient)
 	defer regionCache.Close()
 	cdcClient := NewCDCClient(
-		ctx, pdClient, kvStorage, grpcPool, regionCache, pdtime.NewClock4Test(),
+		ctx, pdClient, kvStorage, grpcPool, regionCache, pdutil.NewClock4Test(),
 		model.DefaultChangeFeedID(""), config.GetDefaultServerConfig().KVClient)
 	eventCh := make(chan model.RegionFeedEvent, 50)
@@ -1293,7 +1293,7 @@ func TestStreamSendWithError(t *testing.T) {
 	regionCache := tikv.NewRegionCache(pdClient)
 	defer regionCache.Close()
 	cdcClient := NewCDCClient(
-		ctx, pdClient, kvStorage, grpcPool, regionCache, pdtime.NewClock4Test(),
+		ctx, pdClient, kvStorage, grpcPool, regionCache, pdutil.NewClock4Test(),
 		model.DefaultChangeFeedID(""), config.GetDefaultServerConfig().KVClient)
 	eventCh := make(chan model.RegionFeedEvent, 50)
@@ -1409,7 +1409,7 @@ func testStreamRecvWithError(t *testing.T, failpointStr string) {
 	regionCache := tikv.NewRegionCache(pdClient)
 	defer regionCache.Close()
 	cdcClient := NewCDCClient(
-		ctx, pdClient, kvStorage, grpcPool, regionCache, pdtime.NewClock4Test(),
+		ctx, pdClient, kvStorage, grpcPool, regionCache, pdutil.NewClock4Test(),
 		model.DefaultChangeFeedID(""), config.GetDefaultServerConfig().KVClient)
 	eventCh := make(chan model.RegionFeedEvent, 50)
@@ -1543,7 +1543,7 @@ func TestStreamRecvWithErrorAndResolvedGoBack(t *testing.T) {
 	regionCache := tikv.NewRegionCache(pdClient)
 	defer regionCache.Close()
 	cdcClient := NewCDCClient(
-		ctx, pdClient, kvStorage, grpcPool, regionCache, pdtime.NewClock4Test(),
+		ctx, pdClient, kvStorage, grpcPool, regionCache, pdutil.NewClock4Test(),
 		model.DefaultChangeFeedID(""), config.GetDefaultServerConfig().KVClient)
 	eventCh := make(chan model.RegionFeedEvent, 50)
@@ -1754,7 +1754,7 @@ func TestIncompatibleTiKV(t *testing.T) {
 	regionCache := tikv.NewRegionCache(pdClient)
 	defer regionCache.Close()
 	cdcClient := NewCDCClient(
-		ctx, pdClient, kvStorage, grpcPool, regionCache, pdtime.NewClock4Test(),
+		ctx, pdClient, kvStorage, grpcPool, regionCache, pdutil.NewClock4Test(),
 		model.DefaultChangeFeedID(""), config.GetDefaultServerConfig().KVClient)
 	// NOTICE: eventCh may block the main logic of EventFeed
@@ -1835,7 +1835,7 @@ func TestNoPendingRegionError(t *testing.T) {
 	regionCache := tikv.NewRegionCache(pdClient)
 	defer regionCache.Close()
 	cdcClient := NewCDCClient(
-		ctx, pdClient, kvStorage, grpcPool, regionCache, pdtime.NewClock4Test(),
+		ctx, pdClient, kvStorage, grpcPool, regionCache, pdutil.NewClock4Test(),
 		model.DefaultChangeFeedID(""), config.GetDefaultServerConfig().KVClient)
 	eventCh := make(chan model.RegionFeedEvent, 50)
@@ -1917,7 +1917,7 @@ func TestDropStaleRequest(t *testing.T) {
 	regionCache := tikv.NewRegionCache(pdClient)
 	defer regionCache.Close()
 	cdcClient := NewCDCClient(
-		ctx, pdClient, kvStorage, grpcPool, regionCache, pdtime.NewClock4Test(),
+		ctx, pdClient, kvStorage, grpcPool, regionCache, pdutil.NewClock4Test(),
 		model.DefaultChangeFeedID(""), config.GetDefaultServerConfig().KVClient)
 	eventCh := make(chan model.RegionFeedEvent, 50)
@@ -2031,7 +2031,7 @@ func TestResolveLock(t *testing.T) {
 	regionCache := tikv.NewRegionCache(pdClient)
 	defer regionCache.Close()
 	cdcClient := NewCDCClient(
-		ctx, pdClient, kvStorage, grpcPool, regionCache, pdtime.NewClock4Test(),
+		ctx, pdClient, kvStorage, grpcPool, regionCache, pdutil.NewClock4Test(),
 		model.DefaultChangeFeedID(""), config.GetDefaultServerConfig().KVClient)
 	eventCh := make(chan model.RegionFeedEvent, 50)
@@ -2137,7 +2137,7 @@ func testEventCommitTsFallback(t *testing.T, events []*cdcpb.ChangeDataEvent) {
 	regionCache := tikv.NewRegionCache(pdClient)
 	defer regionCache.Close()
 	cdcClient := NewCDCClient(
-		ctx, pdClient, kvStorage, grpcPool, regionCache, pdtime.NewClock4Test(),
+		ctx, pdClient, kvStorage, grpcPool, regionCache, pdutil.NewClock4Test(),
 		model.DefaultChangeFeedID(""), config.GetDefaultServerConfig().KVClient)
 	eventCh := make(chan model.RegionFeedEvent, 50)
@@ -2293,7 +2293,7 @@ func testEventAfterFeedStop(t *testing.T) {
 	regionCache := tikv.NewRegionCache(pdClient)
 	defer regionCache.Close()
 	cdcClient := NewCDCClient(
-		ctx, pdClient, kvStorage, grpcPool, regionCache, pdtime.NewClock4Test(),
+		ctx, pdClient, kvStorage, grpcPool, regionCache, pdutil.NewClock4Test(),
 		model.DefaultChangeFeedID(""), config.GetDefaultServerConfig().KVClient)
 	eventCh := make(chan model.RegionFeedEvent, 50)
@@ -2477,7 +2477,7 @@ func TestOutOfRegionRangeEvent(t *testing.T) {
 	regionCache := tikv.NewRegionCache(pdClient)
 	defer regionCache.Close()
 	cdcClient := NewCDCClient(
-		ctx, pdClient, kvStorage, grpcPool, regionCache, pdtime.NewClock4Test(),
+		ctx, pdClient, kvStorage, grpcPool, regionCache, pdutil.NewClock4Test(),
 		model.DefaultChangeFeedID(""), config.GetDefaultServerConfig().KVClient)
 	eventCh := make(chan model.RegionFeedEvent, 50)
@@ -2696,7 +2696,7 @@ func TestResolveLockNoCandidate(t *testing.T) {
 	regionCache := tikv.NewRegionCache(pdClient)
 	defer regionCache.Close()
 	cdcClient := NewCDCClient(
-		ctx, pdClient, kvStorage, grpcPool, regionCache, pdtime.NewClock4Test(),
+		ctx, pdClient, kvStorage, grpcPool, regionCache, pdutil.NewClock4Test(),
 		model.DefaultChangeFeedID(""), config.GetDefaultServerConfig().KVClient)
 	eventCh := make(chan model.RegionFeedEvent, 50)
@@ -2795,7 +2795,7 @@ func TestFailRegionReentrant(t *testing.T) {
 	regionCache := tikv.NewRegionCache(pdClient)
 	defer regionCache.Close()
 	cdcClient := NewCDCClient(
-		ctx, pdClient, kvStorage, grpcPool, regionCache, pdtime.NewClock4Test(),
+		ctx, pdClient, kvStorage, grpcPool, regionCache, pdutil.NewClock4Test(),
 		model.DefaultChangeFeedID(""), config.GetDefaultServerConfig().KVClient)
 	eventCh := make(chan model.RegionFeedEvent, 50)
@@ -2880,7 +2880,7 @@ func TestClientV1UnlockRangeReentrant(t *testing.T) {
 	regionCache := tikv.NewRegionCache(pdClient)
 	defer regionCache.Close()
 	cdcClient := NewCDCClient(
-		ctx, pdClient, kvStorage, grpcPool, regionCache, pdtime.NewClock4Test(),
+		ctx, pdClient, kvStorage, grpcPool, regionCache, pdutil.NewClock4Test(),
 		model.DefaultChangeFeedID(""), config.GetDefaultServerConfig().KVClient)
 	eventCh := make(chan model.RegionFeedEvent, 50)
@@ -2950,7 +2950,7 @@ func testClientErrNoPendingRegion(t *testing.T) {
 	regionCache := tikv.NewRegionCache(pdClient)
 	defer regionCache.Close()
 	cdcClient := NewCDCClient(
-		ctx, pdClient, kvStorage, grpcPool, regionCache, pdtime.NewClock4Test(),
+		ctx, pdClient, kvStorage, grpcPool, regionCache, pdutil.NewClock4Test(),
 		model.DefaultChangeFeedID(""), config.GetDefaultServerConfig().KVClient)
 	eventCh := make(chan model.RegionFeedEvent, 50)
@@ -3031,7 +3031,7 @@ func testKVClientForceReconnect(t *testing.T) {
 	regionCache := tikv.NewRegionCache(pdClient)
 	defer regionCache.Close()
 	cdcClient := NewCDCClient(
-		ctx, pdClient, kvStorage, grpcPool, regionCache, pdtime.NewClock4Test(),
+		ctx, pdClient, kvStorage, grpcPool, regionCache, pdutil.NewClock4Test(),
 		model.DefaultChangeFeedID(""), config.GetDefaultServerConfig().KVClient)
 	eventCh := make(chan model.RegionFeedEvent, 50)
@@ -3183,7 +3183,7 @@ func TestConcurrentProcessRangeRequest(t *testing.T) {
 	regionCache := tikv.NewRegionCache(pdClient)
 	defer regionCache.Close()
 	cdcClient := NewCDCClient(
-		ctx, pdClient, kvStorage, grpcPool, regionCache, pdtime.NewClock4Test(),
+		ctx, pdClient, kvStorage, grpcPool, regionCache, pdutil.NewClock4Test(),
 		model.DefaultChangeFeedID(""), config.GetDefaultServerConfig().KVClient)
 	eventCh := make(chan model.RegionFeedEvent, 100)
@@ -3303,7 +3303,7 @@ func TestEvTimeUpdate(t *testing.T) {
 	regionCache := tikv.NewRegionCache(pdClient)
 	defer regionCache.Close()
 	cdcClient := NewCDCClient(
-		ctx, pdClient, kvStorage, grpcPool, regionCache, pdtime.NewClock4Test(),
+		ctx, pdClient, kvStorage, grpcPool, regionCache, pdutil.NewClock4Test(),
 		model.DefaultChangeFeedID(""), config.GetDefaultServerConfig().KVClient)
 	eventCh := make(chan model.RegionFeedEvent, 50)
@@ -3428,7 +3428,7 @@ func TestRegionWorkerExitWhenIsIdle(t *testing.T) {
 	regionCache := tikv.NewRegionCache(pdClient)
 	defer regionCache.Close()
 	cdcClient := NewCDCClient(
-		ctx, pdClient, kvStorage, grpcPool, regionCache, pdtime.NewClock4Test(),
+		ctx, pdClient, kvStorage, grpcPool, regionCache, pdutil.NewClock4Test(),
 		model.DefaultChangeFeedID(""), config.GetDefaultServerConfig().KVClient)
 	eventCh := make(chan model.RegionFeedEvent, 50)
@@ -3523,7 +3523,7 @@ func TestPrewriteNotMatchError(t *testing.T) {
 	regionCache := tikv.NewRegionCache(pdClient)
 	defer regionCache.Close()
 	cdcClient := NewCDCClient(
-		ctx, pdClient, kvStorage, grpcPool, regionCache, pdtime.NewClock4Test(),
+		ctx, pdClient, kvStorage, grpcPool, regionCache, pdutil.NewClock4Test(),
 		model.DefaultChangeFeedID(""), config.GetDefaultServerConfig().KVClient)
 	eventCh := make(chan model.RegionFeedEvent, 50)
diff --git a/cdc/puller/puller.go b/cdc/puller/puller.go
index a8413da98ad..300d843713b 100644
--- a/cdc/puller/puller.go
+++ b/cdc/puller/puller.go
@@ -26,7 +26,7 @@ import (
 	"github.com/pingcap/tiflow/cdc/model"
 	"github.com/pingcap/tiflow/cdc/puller/frontier"
 	"github.com/pingcap/tiflow/pkg/config"
-	"github.com/pingcap/tiflow/pkg/pdtime"
+	"github.com/pingcap/tiflow/pkg/pdutil"
 	"github.com/pingcap/tiflow/pkg/regionspan"
 	"github.com/pingcap/tiflow/pkg/txnutil"
 	"github.com/tikv/client-go/v2/oracle"
@@ -72,7 +72,7 @@ func NewPuller(
 	grpcPool kv.GrpcPool,
 	regionCache *tikv.RegionCache,
 	kvStorage tidbkv.Storage,
-	pdClock pdtime.Clock,
+	pdClock pdutil.Clock,
 	changefeed model.ChangeFeedID,
 	checkpointTs uint64,
 	spans []regionspan.Span,
diff --git a/cdc/puller/puller_test.go b/cdc/puller/puller_test.go
index 227f1a1a95c..39bfb22c8d3 100644
--- a/cdc/puller/puller_test.go
+++ b/cdc/puller/puller_test.go
@@ -27,7 +27,7 @@ import (
 	"github.com/pingcap/tiflow/cdc/model"
 	"github.com/pingcap/tiflow/pkg/config"
 	cerrors "github.com/pingcap/tiflow/pkg/errors"
-	"github.com/pingcap/tiflow/pkg/pdtime"
+	"github.com/pingcap/tiflow/pkg/pdutil"
 	"github.com/pingcap/tiflow/pkg/regionspan"
 	"github.com/pingcap/tiflow/pkg/retry"
 	"github.com/pingcap/tiflow/pkg/security"
@@ -61,7 +61,7 @@ func newMockCDCKVClient(
 	kvStorage tikv.Storage,
 	grpcPool kv.GrpcPool,
 	regionCache *tikv.RegionCache,
-	pdClock pdtime.Clock,
+	pdClock pdutil.Clock,
 	changefeed model.ChangeFeedID,
 	cfg *config.KVClientConfig,
 ) kv.CDCKVClient {
@@ -127,7 +127,7 @@ func newPullerForTest(
 	regionCache := tikv.NewRegionCache(pdCli)
 	defer regionCache.Close()
 	plr := NewPuller(
-		ctx, pdCli, grpcPool, regionCache, store, pdtime.NewClock4Test(),
+		ctx, pdCli, grpcPool, regionCache, store, pdutil.NewClock4Test(),
 		model.DefaultChangeFeedID("changefeed-id-test"), checkpointTs, spans,
 		config.GetDefaultServerConfig().KVClient)
 	wg.Add(1)
diff --git a/pkg/httputil/httputil.go b/pkg/httputil/httputil.go
index 27fa7ab3cef..64fa20b1f78 100644
--- a/pkg/httputil/httputil.go
+++ b/pkg/httputil/httputil.go
@@ -15,6 +15,7 @@ package httputil
 import (
 	"context"
+	"io"
 	"net/http"
 	"net/url"
 	"strings"
@@ -86,6 +87,37 @@ func (c *Client) Do(req *http.Request) (*http.Response, error) {
 	return c.client.Do(req)
 }
+// DoRequest sends a request and returns the HTTP response content.
+func (c *Client) DoRequest(
+	ctx context.Context, url, method string, headers http.Header, body io.Reader,
+) ([]byte, error) {
+	req, err := http.NewRequestWithContext(ctx, method, url, body)
+	if err != nil {
+		return nil, errors.Trace(err)
+	}
+
+	for key, values := range headers {
+		for _, v := range values {
+			req.Header.Add(key, v)
+		}
+	}
+
+	resp, err := c.Do(req)
+	if err != nil {
+		return nil, errors.Trace(err)
+	}
+	defer resp.Body.Close()
+
+	content, err := io.ReadAll(resp.Body)
+	if err != nil {
+		return nil, errors.Trace(err)
+	}
+	if resp.StatusCode != http.StatusOK {
+		return nil, errors.Errorf("[%d] %s", resp.StatusCode, content)
+	}
+	return content, nil
+}
+
 // CloseIdleConnections closes any connections are now sitting idle.
 // See http.Client.CloseIdleConnections.
 func (c *Client) CloseIdleConnections() {
diff --git a/pkg/pdtime/clock.go b/pkg/pdutil/clock.go
similarity index 99%
rename from pkg/pdtime/clock.go
rename to pkg/pdutil/clock.go
index 2c0b4659682..5efaabe838d 100644
--- a/pkg/pdtime/clock.go
+++ b/pkg/pdutil/clock.go
@@ -11,7 +11,7 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
-package pdtime
+package pdutil
 import (
 	"context"
diff --git a/pkg/pdtime/clock_test.go b/pkg/pdutil/clock_test.go
similarity index 98%
rename from pkg/pdtime/clock_test.go
rename to pkg/pdutil/clock_test.go
index 2a0440c41af..01be7eb2fb6 100644
--- a/pkg/pdtime/clock_test.go
+++ b/pkg/pdutil/clock_test.go
@@ -11,7 +11,7 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
-package pdtime
+package pdutil
 import (
 	"context"
diff --git a/pkg/pdtime/main_test.go b/pkg/pdutil/main_test.go
similarity index 92%
rename from pkg/pdtime/main_test.go
rename to pkg/pdutil/main_test.go
index 24c9b4e26ad..141c4e4c542 100644
--- a/pkg/pdtime/main_test.go
+++ b/pkg/pdutil/main_test.go
@@ -1,4 +1,4 @@
-// Copyright 2021 PingCAP, Inc.
+// Copyright 2022 PingCAP, Inc.
 //
 // Licensed under the Apache License, Version 2.0 (the "License");
 // you may not use this file except in compliance with the License.
@@ -11,7 +11,7 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
-package pdtime
+package pdutil
 import (
 	"testing"
diff --git a/pkg/pdutil/region_label.go b/pkg/pdutil/region_label.go
new file mode 100644
index 00000000000..1f1c4640e18
--- /dev/null
+++ b/pkg/pdutil/region_label.go
@@ -0,0 +1,116 @@
+// Copyright 2022 PingCAP, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package pdutil
+
+import (
+	"bytes"
+	"context"
+	"net/http"
+	"time"
+
+	"github.com/pingcap/errors"
+	"github.com/pingcap/log"
+	"github.com/pingcap/tiflow/pkg/config"
+	"github.com/pingcap/tiflow/pkg/httputil"
+	"github.com/pingcap/tiflow/pkg/retry"
+	pd "github.com/tikv/pd/client"
+	"go.uber.org/zap"
+)
+
+const (
+	regionLabelPrefix = "/pd/api/v1/config/region-label/rules"
+
+	// Split the default rule by `6e000000000000000000f8` to keep metadata region
+	// isolated from the normal data area.
+	addMetaJSON = `{
+		"sets": [
+			{
+				"id": "ticdc/meta",
+				"labels": [
+					{
+						"key": "data-type",
+						"value": "meta"
+					}
+				],
+				"rule_type": "key-range",
+				"data": [
+					{
+						"start_key": "6d00000000000000f8",
+						"end_key": "6e00000000000000f8"
+					}
+				]
+			}
+		]
+	}`
+)
+
+var defaultMaxRetry uint64 = 3
+
+// pdAPIClient is the API client of Placement Driver.
+type pdAPIClient struct {
+	pdClient   pd.Client
+	dialClient *httputil.Client
+}
+
+// newPDApiClient creates a new pdAPIClient.
+func newPDApiClient(ctx context.Context, pdClient pd.Client) (*pdAPIClient, error) {
+	conf := config.GetGlobalServerConfig()
+	dialClient, err := httputil.NewClient(conf.Security)
+	if err != nil {
+		return nil, errors.Trace(err)
+	}
+	return &pdAPIClient{
+		pdClient:   pdClient,
+		dialClient: dialClient,
+	}, nil
+}
+
+// UpdateMetaLabel is a reentrant function that updates the meta-region label of the upstream cluster.
+func UpdateMetaLabel(ctx context.Context, pdClient pd.Client) error {
+	pc, err := newPDApiClient(ctx, pdClient)
+	if err != nil {
+		return err
+	}
+	defer pc.dialClient.CloseIdleConnections()
+
+	err = retry.Do(ctx, func() error {
+		err = pc.patchMetaLabel(ctx)
+		if err != nil {
+			log.Error("Failed to add meta region label to PD", zap.Error(err))
+			return err
+		}
+
+		log.Info("Succeeded in adding meta region label to PD")
+		return nil
+	}, retry.WithMaxTries(defaultMaxRetry), retry.WithIsRetryableErr(func(err error) bool {
+		switch errors.Cause(err) {
+		case context.Canceled:
+			return false
+		}
+		return true
+	}))
+	return err
+}
+
+func (pc *pdAPIClient) patchMetaLabel(ctx context.Context) error {
+	url := pc.pdClient.GetLeaderAddr() + regionLabelPrefix
+	header := http.Header{"Content-Type": {"application/json"}}
+	content := []byte(addMetaJSON)
+
+	ctx, cancel := context.WithTimeout(ctx, 5*time.Second)
+	defer cancel()
+	_, err := pc.dialClient.DoRequest(ctx, url, http.MethodPatch,
+		header, bytes.NewReader(content))
+	return errors.Trace(err)
+}
diff --git a/pkg/pdutil/region_label_test.go b/pkg/pdutil/region_label_test.go
new file mode 100644
index 00000000000..44b107083e3
--- /dev/null
+++ b/pkg/pdutil/region_label_test.go
@@ -0,0 +1,85 @@
+// Copyright 2022 PingCAP, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package pdutil
+
+import (
+	"context"
+	"net/http"
+	"net/http/httptest"
+	"testing"
+
+	cerror "github.com/pingcap/tiflow/pkg/errors"
+	"github.com/stretchr/testify/require"
+	pd "github.com/tikv/pd/client"
+)
+
+type mockPDClient struct {
+	pd.Client
+	testServer *httptest.Server
+	url        string
+}
+
+func (m *mockPDClient) GetLeaderAddr() string {
+	return m.url
+}
+
+func newMockPDClient(ctx context.Context, normal bool) *mockPDClient {
+	mock := &mockPDClient{}
+	status := http.StatusOK
+	if !normal {
+		status = http.StatusNotFound
+	}
+	mock.testServer = httptest.NewServer(http.HandlerFunc(
+		func(w http.ResponseWriter, r *http.Request) {
+			w.WriteHeader(status)
+		},
+	))
+	mock.url = mock.testServer.URL
+
+	return mock
+}
+
+func TestMetaLabelNormal(t *testing.T) {
+	t.Parallel()
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+	mockClient := newMockPDClient(ctx, true)
+
+	err := UpdateMetaLabel(ctx, mockClient)
+	require.Nil(t, err)
+	mockClient.testServer.Close()
+}
+
+func TestMetaLabelFail(t *testing.T) {
+	t.Parallel()
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+	mockClient := newMockPDClient(ctx, false)
+	pc, err := newPDApiClient(ctx, mockClient)
+	require.Nil(t, err)
+	mockClient.url = "http://127.0.1.1:2345"
+
+	// test url error
+	err = pc.patchMetaLabel(ctx)
+	require.NotNil(t, err)
+
+	// test 404
+	mockClient.url = mockClient.testServer.URL
+	err = pc.patchMetaLabel(ctx)
+	require.Regexp(t, ".*404.*", err)
+
+	err = UpdateMetaLabel(ctx, mockClient)
+	require.ErrorIs(t, err, cerror.ErrReachMaxTry)
+	mockClient.testServer.Close()
+}
diff --git a/pkg/txnutil/gc/gc_manager.go b/pkg/txnutil/gc/gc_manager.go
index 50c4683bcd7..037d63ba371 100644
--- a/pkg/txnutil/gc/gc_manager.go
+++ b/pkg/txnutil/gc/gc_manager.go
@@ -23,7 +23,7 @@ import (
 	"github.com/pingcap/tiflow/cdc/model"
 	"github.com/pingcap/tiflow/pkg/config"
 	cerror "github.com/pingcap/tiflow/pkg/errors"
-	"github.com/pingcap/tiflow/pkg/pdtime"
+	"github.com/pingcap/tiflow/pkg/pdutil"
 	"github.com/tikv/client-go/v2/oracle"
 	pd "github.com/tikv/pd/client"
 	"go.uber.org/zap"
@@ -48,7 +48,7 @@ type Manager interface {
 type gcManager struct {
 	pdClient pd.Client
-	pdClock  pdtime.Clock
+	pdClock  pdutil.Clock
 	gcTTL int64
 	lastUpdatedTime time.Time
@@ -58,7 +58,7 @@ type gcManager struct {
 }
 // NewManager creates a new Manager.
-func NewManager(pdClient pd.Client, pdClock pdtime.Clock) Manager {
+func NewManager(pdClient pd.Client, pdClock pdutil.Clock) Manager {
 	serverConfig := config.GetGlobalServerConfig()
 	failpoint.Inject("InjectGcSafepointUpdateInterval", func(val failpoint.Value) {
 		gcSafepointUpdateInterval = time.Duration(val.(int) * int(time.Millisecond))
diff --git a/pkg/txnutil/gc/gc_manager_test.go b/pkg/txnutil/gc/gc_manager_test.go
index 0e5bc1ee951..2102115ac2a 100644
--- a/pkg/txnutil/gc/gc_manager_test.go
+++ b/pkg/txnutil/gc/gc_manager_test.go
@@ -18,13 +18,12 @@ import (
 	"testing"
 	"time"
-	"github.com/pingcap/tiflow/cdc/model"
-	"github.com/pingcap/tiflow/pkg/pdtime"
-
 	"github.com/pingcap/check"
 	"github.com/pingcap/errors"
+	"github.com/pingcap/tiflow/cdc/model"
 	cdcContext "github.com/pingcap/tiflow/pkg/context"
 	cerror "github.com/pingcap/tiflow/pkg/errors"
+	"github.com/pingcap/tiflow/pkg/pdutil"
 	"github.com/pingcap/tiflow/pkg/util/testleak"
 	"github.com/tikv/client-go/v2/oracle"
 )
@@ -40,7 +39,7 @@ type gcManagerSuite struct{}
 func (s *gcManagerSuite) TestUpdateGCSafePoint(c *check.C) {
 	defer testleak.AfterTest(c)()
 	mockPDClient := &MockPDClient{}
-	pdClock := pdtime.NewClock4Test()
+	pdClock := pdutil.NewClock4Test()
 	gcManager := NewManager(mockPDClient, pdClock).(*gcManager)
 	ctx := cdcContext.NewBackendContext4Test(true)
@@ -94,7 +93,7 @@ func (s *gcManagerSuite) TestUpdateGCSafePoint(c *check.C) {
 func (s *gcManagerSuite) TestCheckStaleCheckpointTs(c *check.C) {
 	defer testleak.AfterTest(c)()
 	mockPDClient := &MockPDClient{}
-	pdClock := pdtime.NewClock4Test()
+	pdClock := pdutil.NewClock4Test()
 	gcManager := NewManager(mockPDClient, pdClock).(*gcManager)
 	gcManager.isTiCDCBlockGC = true
 	ctx := context.Background()
diff --git a/pkg/upstream/upstream.go b/pkg/upstream/upstream.go
index 520b50295b6..5a0723a7b65 100644
--- a/pkg/upstream/upstream.go
+++ b/pkg/upstream/upstream.go
@@ -25,7 +25,7 @@ import (
 	tidbkv "github.com/pingcap/tidb/kv"
 	"github.com/pingcap/tiflow/cdc/kv"
 	"github.com/pingcap/tiflow/pkg/config"
-	"github.com/pingcap/tiflow/pkg/pdtime"
+	"github.com/pingcap/tiflow/pkg/pdutil"
 	"github.com/pingcap/tiflow/pkg/txnutil/gc"
 	"github.com/pingcap/tiflow/pkg/version"
 	"github.com/tikv/client-go/v2/tikv"
@@ -55,7 +55,7 @@ type Upstream struct {
 	KVStorage   tidbkv.Storage
 	GrpcPool    kv.GrpcPool
 	RegionCache *tikv.RegionCache
-	PDClock     pdtime.Clock
+	PDClock     pdutil.Clock
 	GCManager gc.Manager
 	wg *sync.WaitGroup
@@ -71,7 +71,7 @@ func newUpstream(clusterID uint64, pdEndpoints []string, securityConfig *config.
 // NewUpstream4Test new a upstream for unit test.
 func NewUpstream4Test(pdClient pd.Client) *Upstream {
-	pdClock := pdtime.NewClock4Test()
+	pdClock := pdutil.NewClock4Test()
 	gcManager := gc.NewManager(pdClient, pdClock)
 	res := &Upstream{PDClient: pdClient, PDClock: pdClock, GCManager: gcManager, status: uninit}
 	res.status = normal
@@ -127,7 +127,7 @@ func (up *Upstream) init(ctx context.Context) error {
 	up.RegionCache = tikv.NewRegionCache(up.PDClient)
 	log.Info("upstream's RegionCache created", zap.Uint64("clusterID", up.clusterID))
-	up.PDClock, err = pdtime.NewClock(ctx, up.PDClient)
+	up.PDClock, err = pdutil.NewClock(ctx, up.PDClient)
 	if err != nil {
 		return errors.Trace(err)
 	}
@@ -136,6 +136,15 @@ func (up *Upstream) init(ctx context.Context) error {
 	up.GCManager = gc.NewManager(up.PDClient, up.PDClock)
 	log.Info("upstream's GCManager created", zap.Uint64("clusterID", up.clusterID))
+	// Update the meta-region label to ensure that the meta region is isolated from data regions.
+	err = pdutil.UpdateMetaLabel(ctx, up.PDClient)
+	if err != nil {
+		log.Warn("Failed to verify region label rule",
+			zap.Error(err),
+			zap.Uint64("upstreamClusterID", up.clusterID),
+			zap.Strings("upstreamEndpoints", up.pdEndpoints))
+	}
+
 	up.wg.Add(1)
 	go func() {
 		defer up.wg.Done()
diff --git a/tests/integration_tests/simple/run.sh b/tests/integration_tests/simple/run.sh
index bfd2dc8dcd5..7a50b2e6037 100644
--- a/tests/integration_tests/simple/run.sh
+++ b/tests/integration_tests/simple/run.sh
@@ -97,8 +97,22 @@ function sql_test() {
 	cleanup_process $CDC_BINARY
 }
+function region_label_test() {
+	i=0
+	while [ -z "$(curl -X GET http://127.0.0.1:2379/pd/api/v1/config/region-label/rules 2>/dev/null | grep 'meta')" ]; do
+		i=$((i + 1))
+		if [ "$i" -gt 5 ]; then
+			echo 'Failed to verify meta region labels'
+			exit 1
+		fi
+		sleep 1
+	done
+	echo 'Succeeded in verifying meta region labels'
+}
+
 trap stop_tidb_cluster EXIT
 prepare $*
+region_label_test $*
 sql_test $*
 check_logs $WORK_DIR
 echo "[$(date)] <<<<<< run test case $TEST_NAME success! >>>>>>"