Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

*(both): update dep of etcd, go-mysql and others #4755

Merged
merged 16 commits into from
Mar 7, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 4 additions & 3 deletions cdc/api/owner.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,14 @@ import (
"github.com/gin-gonic/gin"
"github.com/pingcap/errors"
"github.com/pingcap/log"
"github.com/tikv/client-go/v2/oracle"
"go.etcd.io/etcd/client/v3/concurrency"
"go.uber.org/zap"

"github.com/pingcap/tiflow/cdc/capture"
"github.com/pingcap/tiflow/cdc/model"
cerror "github.com/pingcap/tiflow/pkg/errors"
"github.com/pingcap/tiflow/pkg/logutil"
"github.com/tikv/client-go/v2/oracle"
"go.etcd.io/etcd/clientv3/concurrency"
"go.uber.org/zap"
)

const (
Expand Down
2 changes: 1 addition & 1 deletion cdc/api/owner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import (

"github.com/gin-gonic/gin"
"github.com/stretchr/testify/require"
"go.etcd.io/etcd/clientv3/concurrency"
"go.etcd.io/etcd/client/v3/concurrency"
)

func TestHTTPStatus(t *testing.T) {
Expand Down
2 changes: 1 addition & 1 deletion cdc/api/status.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import (
"github.com/pingcap/tiflow/cdc/capture"
"github.com/pingcap/tiflow/pkg/etcd"
"github.com/pingcap/tiflow/pkg/version"
"go.etcd.io/etcd/clientv3"
clientv3 "go.etcd.io/etcd/client/v3"
)

// status of cdc server
Expand Down
13 changes: 7 additions & 6 deletions cdc/capture/capture.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,13 @@ import (
"github.com/pingcap/failpoint"
"github.com/pingcap/log"
tidbkv "github.com/pingcap/tidb/kv"
"github.com/tikv/client-go/v2/tikv"
pd "github.com/tikv/pd/client"
"go.etcd.io/etcd/client/v3/concurrency"
"go.etcd.io/etcd/server/v3/mvcc"
"go.uber.org/zap"
"golang.org/x/time/rate"

"github.com/pingcap/tiflow/cdc/kv"
"github.com/pingcap/tiflow/cdc/model"
"github.com/pingcap/tiflow/cdc/owner"
Expand All @@ -39,12 +46,6 @@ import (
"github.com/pingcap/tiflow/pkg/p2p"
"github.com/pingcap/tiflow/pkg/pdtime"
"github.com/pingcap/tiflow/pkg/version"
"github.com/tikv/client-go/v2/tikv"
pd "github.com/tikv/pd/client"
"go.etcd.io/etcd/clientv3/concurrency"
"go.etcd.io/etcd/mvcc"
"go.uber.org/zap"
"golang.org/x/time/rate"
)

// Capture represents a Capture server, it monitors the changefeed information in etcd and schedules Task on it.
Expand Down
7 changes: 4 additions & 3 deletions cdc/processor/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,10 @@ import (
"github.com/benbjohnson/clock"
"github.com/pingcap/errors"
"github.com/pingcap/log"
"go.etcd.io/etcd/client/v3/concurrency"
"go.uber.org/zap"
"golang.org/x/time/rate"

"github.com/pingcap/tiflow/cdc/model"
"github.com/pingcap/tiflow/cdc/scheduler"
"github.com/pingcap/tiflow/pkg/config"
Expand All @@ -30,9 +34,6 @@ import (
"github.com/pingcap/tiflow/pkg/etcd"
"github.com/pingcap/tiflow/pkg/p2p"
"github.com/pingcap/tiflow/pkg/version"
"go.etcd.io/etcd/clientv3/concurrency"
"go.uber.org/zap"
"golang.org/x/time/rate"
)

const (
Expand Down
9 changes: 5 additions & 4 deletions cdc/processor/agent_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,16 +19,17 @@ import (
"time"

"github.com/pingcap/failpoint"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
"go.etcd.io/etcd/api/v3/mvccpb"
clientv3 "go.etcd.io/etcd/client/v3"

"github.com/pingcap/tiflow/cdc/model"
pscheduler "github.com/pingcap/tiflow/cdc/scheduler"
cdcContext "github.com/pingcap/tiflow/pkg/context"
"github.com/pingcap/tiflow/pkg/etcd"
"github.com/pingcap/tiflow/pkg/p2p"
"github.com/pingcap/tiflow/pkg/version"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
"go.etcd.io/etcd/clientv3"
"go.etcd.io/etcd/mvcc/mvccpb"
)

const (
Expand Down
7 changes: 4 additions & 3 deletions cdc/redo/writer/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,13 +29,14 @@ import (
"github.com/pingcap/errors"
"github.com/pingcap/log"
"github.com/pingcap/tidb/br/pkg/storage"
"github.com/pingcap/tiflow/cdc/redo/common"
cerror "github.com/pingcap/tiflow/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/uber-go/atomic"
pioutil "go.etcd.io/etcd/pkg/ioutil"
pioutil "go.etcd.io/etcd/pkg/v3/ioutil"
"go.uber.org/multierr"
"go.uber.org/zap"

"github.com/pingcap/tiflow/cdc/redo/common"
cerror "github.com/pingcap/tiflow/pkg/errors"
)

const (
Expand Down
19 changes: 10 additions & 9 deletions cdc/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,16 @@ import (
"github.com/pingcap/errors"
"github.com/pingcap/log"
tidbkv "github.com/pingcap/tidb/kv"
"github.com/prometheus/client_golang/prometheus"
pd "github.com/tikv/pd/client"
"go.etcd.io/etcd/client/pkg/v3/logutil"
clientv3 "go.etcd.io/etcd/client/v3"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
"golang.org/x/sync/errgroup"
"google.golang.org/grpc"
"google.golang.org/grpc/backoff"

"github.com/pingcap/tiflow/cdc/capture"
"github.com/pingcap/tiflow/cdc/kv"
"github.com/pingcap/tiflow/cdc/sorter/unified"
Expand All @@ -41,15 +51,6 @@ import (
"github.com/pingcap/tiflow/pkg/util"
"github.com/pingcap/tiflow/pkg/version"
p2pProto "github.com/pingcap/tiflow/proto/p2p"
"github.com/prometheus/client_golang/prometheus"
pd "github.com/tikv/pd/client"
"go.etcd.io/etcd/clientv3"
"go.etcd.io/etcd/pkg/logutil"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
"golang.org/x/sync/errgroup"
"google.golang.org/grpc"
"google.golang.org/grpc/backoff"
)

const (
Expand Down
11 changes: 6 additions & 5 deletions cdc/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,12 @@ import (
"time"

"github.com/pingcap/tidb/br/pkg/httputil"
"github.com/stretchr/testify/require"
"github.com/tikv/pd/pkg/tempurl"
clientv3 "go.etcd.io/etcd/client/v3"
"go.etcd.io/etcd/server/v3/embed"
"golang.org/x/sync/errgroup"

"github.com/pingcap/tiflow/cdc/capture"
"github.com/pingcap/tiflow/cdc/model"
"github.com/pingcap/tiflow/pkg/config"
Expand All @@ -37,11 +43,6 @@ import (
"github.com/pingcap/tiflow/pkg/retry"
security2 "github.com/pingcap/tiflow/pkg/security"
"github.com/pingcap/tiflow/pkg/util"
"github.com/stretchr/testify/require"
"github.com/tikv/pd/pkg/tempurl"
"go.etcd.io/etcd/clientv3"
"go.etcd.io/etcd/embed"
"golang.org/x/sync/errgroup"
)

type testServer struct {
Expand Down
26 changes: 15 additions & 11 deletions dm/dm/config/source_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,11 +47,11 @@ const (

var getAllServerIDFunc = utils.GetAllServerID

// SampleConfigFile is sample config file of source.
// SampleSourceConfig is sample config file of source.
// The embed source.yaml is a copy of dm/master/source.yaml, because embed
// can only match regular files in the current directory and subdirectories.
//go:embed source.yaml
var SampleConfigFile string
var SampleSourceConfig string

// PurgeConfig is the configuration for Purger.
type PurgeConfig struct {
Expand Down Expand Up @@ -170,6 +170,18 @@ func ParseYaml(content string) (*SourceConfig, error) {
return c, nil
}

// ParseYamlAndVerify does ParseYaml and Verify.
func ParseYamlAndVerify(content string) (*SourceConfig, error) {
c, err := ParseYaml(content)
if err != nil {
return nil, err
}
if err = c.Verify(); err != nil {
return nil, err
}
return c, nil
}

// EncodeToml encodes config.
func (c *SourceConfig) EncodeToml() (string, error) {
buf := new(bytes.Buffer)
Expand Down Expand Up @@ -348,19 +360,11 @@ func (c *SourceConfig) AdjustServerID(ctx context.Context, db *sql.DB) error {

// LoadFromFile loads config from file.
func LoadFromFile(path string) (*SourceConfig, error) {
c := newSourceConfig()
content, err := os.ReadFile(path)
if err != nil {
return nil, terror.ErrConfigReadCfgFromFile.Delegate(err, path)
}
if err = yaml.UnmarshalStrict(content, c); err != nil {
return nil, terror.ErrConfigYamlTransform.Delegate(err, "decode source config")
}
c.adjust()
if err = c.Verify(); err != nil {
return nil, err
}
return c, nil
return ParseYaml(string(content))
}

func (c *SourceConfig) check(metaData *toml.MetaData, err error) error {
Expand Down
17 changes: 7 additions & 10 deletions dm/dm/config/source_config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,11 +31,8 @@ import (
"github.com/pingcap/tiflow/dm/pkg/utils"
)

// do not forget to update this path if the file removed/renamed.
const sourceSampleFile = "../worker/source.yaml"

func (t *testConfig) TestConfig(c *C) {
cfg, err := LoadFromFile(sourceSampleFile)
cfg, err := ParseYaml(SampleSourceConfig)
c.Assert(err, IsNil)
cfg.RelayDir = "./xx"
c.Assert(cfg.RelayDir, Equals, "./xx")
Expand Down Expand Up @@ -134,7 +131,7 @@ aaa: xxx

func (t *testConfig) TestConfigVerify(c *C) {
newConfig := func() *SourceConfig {
cfg, err := LoadFromFile(sourceSampleFile)
cfg, err := ParseYaml(SampleSourceConfig)
c.Assert(err, IsNil)
cfg.RelayDir = "./xx"
return cfg
Expand Down Expand Up @@ -240,7 +237,7 @@ func (t *testConfig) TestConfigVerify(c *C) {
}

func (t *testConfig) TestSourceConfigForDowngrade(c *C) {
cfg, err := LoadFromFile(sourceSampleFile)
cfg, err := ParseYaml(SampleSourceConfig)
c.Assert(err, IsNil)

// make sure all new field were added
Expand Down Expand Up @@ -274,7 +271,7 @@ func subtestFlavor(c *C, cfg *SourceConfig, sqlInfo, expectedFlavor, expectedErr
}

func (t *testConfig) TestAdjustFlavor(c *C) {
cfg, err := LoadFromFile(sourceSampleFile)
cfg, err := ParseYaml(SampleSourceConfig)
c.Assert(err, IsNil)
cfg.RelayDir = "./xx"

Expand All @@ -297,7 +294,7 @@ func (t *testConfig) TestAdjustServerID(c *C) {
}()
getAllServerIDFunc = getMockServerIDs

cfg, err := LoadFromFile(sourceSampleFile)
cfg, err := ParseYaml(SampleSourceConfig)
c.Assert(err, IsNil)
cfg.RelayDir = "./xx"

Expand All @@ -317,7 +314,7 @@ func getMockServerIDs(ctx context.Context, db *sql.DB) (map[uint32]struct{}, err
}

func (t *testConfig) TestAdjustCaseSensitive(c *C) {
cfg, err := LoadFromFile(sourceSampleFile)
cfg, err := ParseYaml(SampleSourceConfig)
c.Assert(err, IsNil)

db, mock, err := sqlmock.New()
Expand All @@ -339,5 +336,5 @@ func (t *testConfig) TestAdjustCaseSensitive(c *C) {
func (t *testConfig) TestEmbedSampleFile(c *C) {
data, err := os.ReadFile("./source.yaml")
c.Assert(err, IsNil)
c.Assert(SampleConfigFile, Equals, string(data))
c.Assert(SampleSourceConfig, Equals, string(data))
}
5 changes: 4 additions & 1 deletion dm/dm/config/source_converter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,12 @@ package config

import (
"github.com/pingcap/check"

"github.com/pingcap/tiflow/dm/openapi/fixtures"
)

func (t *testConfig) TestConverterWithSourceAndOpenAPISource(c *check.C) {
sourceCfg1, err := LoadFromFile(sourceSampleFile)
sourceCfg1, err := ParseYaml(SampleSourceConfig)
c.Assert(err, check.IsNil)

// 1. test user create source from dmctl, after convert to openapi.Source then convert back to source config
Expand All @@ -28,6 +29,8 @@ func (t *testConfig) TestConverterWithSourceAndOpenAPISource(c *check.C) {
// we need set ServerID and MaxAllowedPacket manually, because user don't need to config those field in openapi
sourceCfg2.ServerID = sourceCfg1.ServerID
sourceCfg2.From.MaxAllowedPacket = sourceCfg1.From.MaxAllowedPacket
// TODO: OpenAPI source config will add flavor
sourceCfg1.Flavor = ""

// we only need to make sure the source config that user can see is the same as the source config that user create
c.Assert(sourceCfg1.String(), check.Equals, sourceCfg2.String())
Expand Down
5 changes: 5 additions & 0 deletions dm/dm/config/subtask.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ package config

import (
"bytes"
_ "embed"
"encoding/json"
"flag"
"fmt"
Expand Down Expand Up @@ -270,6 +271,10 @@ type SubTaskConfig struct {
} `yaml:"experimental" toml:"experimental" json:"experimental"`
}

// SampleSubtaskConfig is the content of subtask.toml in current folder.
//go:embed subtask.toml
var SampleSubtaskConfig string

// NewSubTaskConfig creates a new SubTaskConfig.
func NewSubTaskConfig() *SubTaskConfig {
cfg := &SubTaskConfig{}
Expand Down
Loading