From 8ba809622668a1287e9f7152bfd34ad6be220304 Mon Sep 17 00:00:00 2001 From: tiancaiamao Date: Fri, 21 Dec 2018 23:39:16 +0800 Subject: [PATCH] *: run 'go mod vendor' (#8772) --- go.mod | 2 +- .../pingcap/tidb-tools/pkg/etcd/etcd.go | 2 +- .../tidb-tools/tidb-binlog/node/registry.go | 2 +- .../tidb-binlog/pump_client/client.go | 90 +++++++++++++++---- .../tidb-binlog/pump_client/pump.go | 20 ++++- .../tidb-binlog/pump_client/selector.go | 45 ++++++++++ vendor/modules.txt | 2 +- 7 files changed, 137 insertions(+), 26 deletions(-) diff --git a/go.mod b/go.mod index af48cf2f7f834..b073c8997d09a 100644 --- a/go.mod +++ b/go.mod @@ -18,7 +18,7 @@ require ( github.com/dgrijalva/jwt-go v3.2.0+incompatible // indirect github.com/dustin/go-humanize v0.0.0-20180421182945-02af3965c54e // indirect github.com/eknkc/amber v0.0.0-20171010120322-cdade1c07385 // indirect - github.com/etcd-io/gofail v0.0.0-20180808172546-51ce9a71510a + github.com/etcd-io/gofail v0.0.0-20180808172546-51ce9a71510a // indirect github.com/fsnotify/fsnotify v1.4.7 // indirect github.com/ghodss/yaml v1.0.0 // indirect github.com/go-sql-driver/mysql v0.0.0-20170715192408-3955978caca4 diff --git a/vendor/github.com/pingcap/tidb-tools/pkg/etcd/etcd.go b/vendor/github.com/pingcap/tidb-tools/pkg/etcd/etcd.go index 0beb844ac9bf4..beb866c2dde51 100644 --- a/vendor/github.com/pingcap/tidb-tools/pkg/etcd/etcd.go +++ b/vendor/github.com/pingcap/tidb-tools/pkg/etcd/etcd.go @@ -14,6 +14,7 @@ package etcd import ( + "context" "crypto/tls" "path" "strings" @@ -21,7 +22,6 @@ import ( "github.com/coreos/etcd/clientv3" "github.com/pingcap/errors" - "golang.org/x/net/context" ) // Node organizes the ectd query result as a Trie tree diff --git a/vendor/github.com/pingcap/tidb-tools/tidb-binlog/node/registry.go b/vendor/github.com/pingcap/tidb-tools/tidb-binlog/node/registry.go index 46e1e1697b1d2..9637e9c41e340 100644 --- a/vendor/github.com/pingcap/tidb-tools/tidb-binlog/node/registry.go +++ b/vendor/github.com/pingcap/tidb-tools/tidb-binlog/node/registry.go @@ -1,6 +1,7 @@ package node import ( + "context" "encoding/json" "path" "strings" @@ -10,7 +11,6 @@ import ( "github.com/pingcap/errors" "github.com/pingcap/tidb-tools/pkg/etcd" log "github.com/sirupsen/logrus" - "golang.org/x/net/context" ) // EtcdRegistry wraps the reactions with etcd diff --git a/vendor/github.com/pingcap/tidb-tools/tidb-binlog/pump_client/client.go b/vendor/github.com/pingcap/tidb-tools/tidb-binlog/pump_client/client.go index 48e4a537010ba..e8d092fec277b 100644 --- a/vendor/github.com/pingcap/tidb-tools/tidb-binlog/pump_client/client.go +++ b/vendor/github.com/pingcap/tidb-tools/tidb-binlog/pump_client/client.go @@ -14,6 +14,7 @@ package client import ( + "context" "crypto/tls" "encoding/json" "path" @@ -29,15 +30,17 @@ import ( "github.com/pingcap/tidb-tools/tidb-binlog/node" pb "github.com/pingcap/tipb/go-binlog" log "github.com/sirupsen/logrus" - "golang.org/x/net/context" ) const ( // DefaultEtcdTimeout is the default timeout config for etcd. DefaultEtcdTimeout = 5 * time.Second - // DefaultRetryTime is the default time of retry. - DefaultRetryTime = 20 + // DefaultAllRetryTime is the default retry time for all pumps, should greter than RetryTime. + DefaultAllRetryTime = 20 + + // RetryTime is the retry time for each pump. + RetryTime = 5 // DefaultBinlogWriteTimeout is the default max time binlog can use to write to pump. DefaultBinlogWriteTimeout = 15 * time.Second @@ -71,6 +74,15 @@ type PumpInfos struct { UnAvaliablePumps map[string]*PumpStatus } +// NewPumpInfos returns a PumpInfos. +func NewPumpInfos() *PumpInfos { + return &PumpInfos{ + Pumps: make(map[string]*PumpStatus), + AvaliablePumps: make(map[string]*PumpStatus), + UnAvaliablePumps: make(map[string]*PumpStatus), + } +} + // PumpsClient is the client of pumps. type PumpsClient struct { ctx context.Context @@ -99,14 +111,14 @@ type PumpsClient struct { // Security is the security config Security *tls.Config + + // binlog socket file path, for compatible with kafka version pump. + binlogSocket string } // NewPumpsClient returns a PumpsClient. +// TODO: get strategy from etcd, and can update strategy in real-time. Use Range as default now. func NewPumpsClient(etcdURLs string, timeout time.Duration, securityOpt pd.SecurityOption) (*PumpsClient, error) { - // TODO: get strategy from etcd, and can update strategy in real-time. now use Range as default. - strategy := Range - selector := NewSelector(strategy) - ectdEndpoints, err := utils.ParseHostPortAddr(etcdURLs) if err != nil { return nil, errors.Trace(err) @@ -130,21 +142,15 @@ func NewPumpsClient(etcdURLs string, timeout time.Duration, securityOpt pd.Secur return nil, errors.Trace(err) } - pumpInfos := &PumpInfos{ - Pumps: make(map[string]*PumpStatus), - AvaliablePumps: make(map[string]*PumpStatus), - UnAvaliablePumps: make(map[string]*PumpStatus), - } - ctx, cancel := context.WithCancel(context.Background()) newPumpsClient := &PumpsClient{ ctx: ctx, cancel: cancel, ClusterID: clusterID, EtcdRegistry: node.NewEtcdRegistry(cli, DefaultEtcdTimeout), - Pumps: pumpInfos, - Selector: selector, - RetryTime: DefaultRetryTime, + Pumps: NewPumpInfos(), + Selector: NewSelector(Range), + RetryTime: DefaultAllRetryTime, BinlogWriteTimeout: timeout, Security: security, } @@ -162,7 +168,55 @@ func NewPumpsClient(etcdURLs string, timeout time.Duration, securityOpt pd.Secur return newPumpsClient, nil } -// getPumpStatus retruns all the pumps status in the etcd. +// NewLocalPumpsClient returns a PumpsClient, this PumpsClient will write binlog by socket file. For compatible with kafka version pump. +func NewLocalPumpsClient(etcdURLs, binlogSocket string, timeout time.Duration, securityOpt pd.SecurityOption) (*PumpsClient, error) { + ectdEndpoints, err := utils.ParseHostPortAddr(etcdURLs) + if err != nil { + return nil, errors.Trace(err) + } + + // get clusterid + pdCli, err := pd.NewClient(ectdEndpoints, securityOpt) + if err != nil { + return nil, errors.Trace(err) + } + clusterID := pdCli.GetClusterID(context.Background()) + pdCli.Close() + + security, err := utils.ToTLSConfig(securityOpt.CAPath, securityOpt.CertPath, securityOpt.KeyPath) + if err != nil { + return nil, errors.Trace(err) + } + + ctx, cancel := context.WithCancel(context.Background()) + newPumpsClient := &PumpsClient{ + ctx: ctx, + cancel: cancel, + ClusterID: clusterID, + Pumps: NewPumpInfos(), + Selector: NewSelector(LocalUnix), + RetryTime: DefaultAllRetryTime, + BinlogWriteTimeout: timeout, + Security: security, + binlogSocket: binlogSocket, + } + newPumpsClient.getLocalPumpStatus(ctx) + + return newPumpsClient, nil +} + +// getLocalPumpStatus gets the local pump. For compatible with kafka version tidb-binlog. +func (c *PumpsClient) getLocalPumpStatus(pctx context.Context) { + nodeStatus := &node.Status{ + NodeID: localPump, + Addr: c.binlogSocket, + IsAlive: true, + State: node.Online, + } + c.addPump(NewPumpStatus(nodeStatus, c.Security), true) +} + +// getPumpStatus gets all the pumps status in the etcd. func (c *PumpsClient) getPumpStatus(pctx context.Context) error { nodesStatus, err := c.EtcdRegistry.Nodes(pctx, node.NodePrefix[node.PumpNode]) if err != nil { @@ -221,7 +275,7 @@ func (c *PumpsClient) WriteBinlog(binlog *pb.Binlog) error { } // every pump can retry 5 times, if retry 5 times and still failed, set this pump unavaliable, and choose a new pump. - if (retryTime+1)%5 == 0 { + if (retryTime+1)%RetryTime == 0 { c.setPumpAvaliable(pump, false) pump = c.Selector.Next(binlog, retryTime/5+1) Logger.Debugf("[pumps client] avaliable pumps: %v, write binlog choose pump %v", c.Pumps.AvaliablePumps, pump) diff --git a/vendor/github.com/pingcap/tidb-tools/tidb-binlog/pump_client/pump.go b/vendor/github.com/pingcap/tidb-tools/tidb-binlog/pump_client/pump.go index 370ddba25ee17..bd45b9c662171 100644 --- a/vendor/github.com/pingcap/tidb-tools/tidb-binlog/pump_client/pump.go +++ b/vendor/github.com/pingcap/tidb-tools/tidb-binlog/pump_client/pump.go @@ -14,6 +14,7 @@ package client import ( + "context" "crypto/tls" "net" "time" @@ -21,11 +22,15 @@ import ( "github.com/pingcap/errors" "github.com/pingcap/tidb-tools/tidb-binlog/node" pb "github.com/pingcap/tipb/go-binlog" - "golang.org/x/net/context" "google.golang.org/grpc" "google.golang.org/grpc/credentials" ) +var ( + // localPump is used to write local pump through unix socket connection. + localPump = "localPump" +) + // PumpStatus saves pump's status. type PumpStatus struct { /* @@ -78,9 +83,16 @@ func (p *PumpStatus) createGrpcClient(security *tls.Config) error { p.grpcConn.Close() } - dialerOpt := grpc.WithDialer(func(addr string, timeout time.Duration) (net.Conn, error) { - return net.DialTimeout("tcp", addr, timeout) - }) + var dialerOpt grpc.DialOption + if p.NodeID == localPump { + dialerOpt = grpc.WithDialer(func(addr string, timeout time.Duration) (net.Conn, error) { + return net.DialTimeout("unix", addr, timeout) + }) + } else { + dialerOpt = grpc.WithDialer(func(addr string, timeout time.Duration) (net.Conn, error) { + return net.DialTimeout("tcp", addr, timeout) + }) + } Logger.Debugf("[pumps client] create gcpc client at %s", p.Addr) var clientConn *grpc.ClientConn var err error diff --git a/vendor/github.com/pingcap/tidb-tools/tidb-binlog/pump_client/selector.go b/vendor/github.com/pingcap/tidb-tools/tidb-binlog/pump_client/selector.go index 343c250e1cc67..5a7f7257c3cf4 100644 --- a/vendor/github.com/pingcap/tidb-tools/tidb-binlog/pump_client/selector.go +++ b/vendor/github.com/pingcap/tidb-tools/tidb-binlog/pump_client/selector.go @@ -30,6 +30,9 @@ const ( // Score means choose pump by it's score. Score = "score" + + // LocalUnix means will only use the local pump by unix socket. + LocalUnix = "local unix" ) // PumpSelector selects pump for sending binlog. @@ -224,6 +227,46 @@ func (r *RangeSelector) Next(binlog *pb.Binlog, retryTime int) *PumpStatus { return nextPump } +// LocalUnixSelector will always select the local pump, used for compatible with kafka version tidb-binlog. +type LocalUnixSelector struct { + sync.RWMutex + + // the pump to be selected. + Pump *PumpStatus +} + +// NewLocalUnixSelector returns a new LocalUnixSelector. +func NewLocalUnixSelector() PumpSelector { + return &LocalUnixSelector{} +} + +// SetPumps implement PumpSelector.SetPumps. +func (u *LocalUnixSelector) SetPumps(pumps []*PumpStatus) { + u.Lock() + if len(pumps) == 0 { + u.Pump = nil + } else { + u.Pump = pumps[0] + } + u.Unlock() +} + +// Select implement PumpSelector.Select. +func (u *LocalUnixSelector) Select(binlog *pb.Binlog) *PumpStatus { + u.RLock() + defer u.RUnlock() + + return u.Pump +} + +// Next implement PumpSelector.Next. Only for Prewrite binlog. +func (u *LocalUnixSelector) Next(binlog *pb.Binlog, retryTime int) *PumpStatus { + u.RLock() + defer u.RUnlock() + + return u.Pump +} + // ScoreSelector select a pump by pump's score. type ScoreSelector struct{} @@ -259,6 +302,8 @@ func NewSelector(algorithm string) PumpSelector { selector = NewHashSelector() case Score: selector = NewScoreSelector() + case LocalUnix: + selector = NewLocalUnixSelector() default: Logger.Warnf("unknow algorithm %s, use range as default", algorithm) selector = NewRangeSelector() diff --git a/vendor/modules.txt b/vendor/modules.txt index 97dd12acede42..ce6219a1961a8 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -100,7 +100,7 @@ github.com/pingcap/parser/charset github.com/pingcap/parser/format # github.com/pingcap/pd v2.1.0-rc.4+incompatible github.com/pingcap/pd/client -# github.com/pingcap/tidb-tools v0.0.0-20181112132202-4860a0d5de03 +# github.com/pingcap/tidb-tools v2.1.1-0.20181218072513-b2235d442b06+incompatible github.com/pingcap/tidb-tools/tidb-binlog/pump_client github.com/pingcap/tidb-tools/tidb-binlog/node github.com/pingcap/tidb-tools/pkg/etcd