Skip to content

Commit

Permalink
binlog: use pumps client to write binlog (#8098)
Browse files Browse the repository at this point in the history
  • Loading branch information
WangXiangUSTC authored and zimulala committed Oct 30, 2018
1 parent 26a3a3d commit 6999f64
Show file tree
Hide file tree
Showing 13 changed files with 108 additions and 136 deletions.
4 changes: 2 additions & 2 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,7 @@ type PreparedPlanCache struct {

// OpenTracing is the opentracing section of the config.
type OpenTracing struct {
Enable bool `toml:"enable" json:"enbale"`
Enable bool `toml:"enable" json:"enable"`
Sampler OpenTracingSampler `toml:"sampler" json:"sampler"`
Reporter OpenTracingReporter `toml:"reporter" json:"reporter"`
RPCMetrics bool `toml:"rpc-metrics" json:"rpc-metrics"`
Expand Down Expand Up @@ -241,7 +241,7 @@ type TiKVClient struct {

// Binlog is the config for binlog.
type Binlog struct {
BinlogSocket string `toml:"binlog-socket" json:"binlog-socket"`
Enable bool `toml:"enable" json:"enable"`
WriteTimeout string `toml:"write-timeout" json:"write-timeout"`
// If IgnoreError is true, when writting binlog meets error, TiDB would
// ignore the error.
Expand Down
5 changes: 2 additions & 3 deletions config/config.toml.example
Original file line number Diff line number Diff line change
Expand Up @@ -236,9 +236,8 @@ enabled = false
capacity = 10240000

[binlog]

# Socket file to write binlog.
binlog-socket = ""
# enable to write binlog.
enable = false

# WriteTimeout specifies how long it will wait for writing binlog to pump.
write-timeout = "15s"
Expand Down
4 changes: 2 additions & 2 deletions config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ func TestT(t *testing.T) {

func (s *testConfigSuite) TestConfig(c *C) {
conf := new(Config)
conf.Binlog.BinlogSocket = "/tmp/socket"
conf.Binlog.Enable = true
conf.Binlog.IgnoreError = true
conf.TiKVClient.CommitTimeout = "10s"

Expand All @@ -52,7 +52,7 @@ commit-timeout="41s"`)
c.Assert(conf.Load(configFile), IsNil)

// Test that the original value will not be clear by load the config file that does not contain the option.
c.Assert(conf.Binlog.BinlogSocket, Equals, "/tmp/socket")
c.Assert(conf.Binlog.Enable, Equals, true)

c.Assert(conf.TiKVClient.CommitTimeout, Equals, "41s")
c.Assert(f.Close(), IsNil)
Expand Down
11 changes: 6 additions & 5 deletions ddl/ddl.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (

"github.com/coreos/etcd/clientv3"
"github.com/ngaut/pools"
pumpcli "github.com/pingcap/tidb-tools/tidb-binlog/pump_client"
"github.com/pingcap/tidb/ast"
"github.com/pingcap/tidb/ddl/util"
"github.com/pingcap/tidb/infoschema"
Expand Down Expand Up @@ -225,7 +226,7 @@ type DDL interface {
// GetTableMaxRowID gets the max row ID of a normal table or a partition.
GetTableMaxRowID(startTS uint64, tbl table.PhysicalTable) (int64, bool, error)
// SetBinlogClient sets the binlog client for DDL worker. It's exported for testing.
SetBinlogClient(interface{})
SetBinlogClient(*pumpcli.PumpsClient)
}

// ddl is used to handle the statements that define the structure or schema of the database.
Expand All @@ -246,8 +247,8 @@ type ddlCtx struct {
schemaSyncer SchemaSyncer
ddlJobDoneCh chan struct{}
ddlEventCh chan<- *util.Event
lease time.Duration // lease is schema lease.
binlogCli interface{} // binlogCli is used for Binlog.
lease time.Duration // lease is schema lease.
binlogCli *pumpcli.PumpsClient // binlogCli is used for Binlog.

// hook may be modified.
mu struct {
Expand Down Expand Up @@ -327,7 +328,7 @@ func newDDL(ctx context.Context, etcdCli *clientv3.Client, store kv.Storage,
ddlJobDoneCh: make(chan struct{}, 1),
ownerManager: manager,
schemaSyncer: syncer,
binlogCli: binloginfo.GetPumpClient(),
binlogCli: binloginfo.GetPumpsClient(),
}
ddlCtx.mu.hook = hook
ddlCtx.mu.interceptor = &BaseInterceptor{}
Expand Down Expand Up @@ -537,7 +538,7 @@ func (d *ddl) callHookOnChanged(err error) error {
}

// SetBinlogClient implements DDL.SetBinlogClient interface.
func (d *ddl) SetBinlogClient(binlogCli interface{}) {
func (d *ddl) SetBinlogClient(binlogCli *pumpcli.PumpsClient) {
d.binlogCli = binlogCli
}

Expand Down
4 changes: 2 additions & 2 deletions session/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -292,7 +292,7 @@ func (s *session) doCommit(ctx context.Context) error {
Tp: binlog.BinlogType_Prewrite,
PrewriteValue: prewriteData,
},
Client: s.sessionVars.BinlogClient.(binlog.PumpClient),
Client: s.sessionVars.BinlogClient,
}
s.txn.SetOption(kv.BinlogInfo, info)
}
Expand Down Expand Up @@ -1187,7 +1187,7 @@ func createSession(store kv.Storage) (*session, error) {
domain.BindDomain(s, dom)
// session implements variable.GlobalVarAccessor. Bind it to ctx.
s.sessionVars.GlobalVarsAccessor = s
s.sessionVars.BinlogClient = binloginfo.GetPumpClient()
s.sessionVars.BinlogClient = binloginfo.GetPumpsClient()
s.txn.init()
return s, nil
}
Expand Down
3 changes: 2 additions & 1 deletion session/session_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
"github.com/pingcap/tidb/privilege/privileges"
"github.com/pingcap/tidb/session"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/sessionctx/binloginfo"
"github.com/pingcap/tidb/sessionctx/variable"
"github.com/pingcap/tidb/store/mockstore"
"github.com/pingcap/tidb/store/mockstore/mocktikv"
Expand Down Expand Up @@ -122,7 +123,7 @@ func (s *testSessionSuite) TestForCoverage(c *C) {
tk.MustExec("admin check table t")

// Cover dirty table operations in StateTxn.
tk.Se.GetSessionVars().BinlogClient = &mockBinlogPump{}
tk.Se.GetSessionVars().BinlogClient = binloginfo.MockPumpsClient(&mockBinlogPump{})
tk.MustExec("begin")
tk.MustExec("truncate table t")
tk.MustExec("insert t values ()")
Expand Down
29 changes: 0 additions & 29 deletions session/tidb.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,6 @@ import (
"github.com/pkg/errors"
log "github.com/sirupsen/logrus"
"golang.org/x/net/context"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
)

type domainMap struct {
Expand Down Expand Up @@ -266,33 +264,6 @@ func newStoreWithRetry(path string, maxRetries int) (kv.Storage, error) {
return s, errors.Trace(err)
}

// DialPumpClientWithRetry tries to dial to binlogSocket,
// if any error happens, it will try to re-dial,
// or return this error when timeout.
func DialPumpClientWithRetry(binlogSocket string, maxRetries int, dialerOpt grpc.DialOption) (*grpc.ClientConn, error) {
var clientCon *grpc.ClientConn
err := util.RunWithRetry(maxRetries, util.RetryInterval, func() (bool, error) {
log.Infof("setup binlog client")
var err error
tlsConfig, err := config.GetGlobalConfig().Security.ToTLSConfig()
if err != nil {
log.Infof("error happen when setting binlog client: %s", errors.ErrorStack(err))
}

if tlsConfig != nil {
clientCon, err = grpc.Dial(binlogSocket, grpc.WithTransportCredentials(credentials.NewTLS(tlsConfig)), dialerOpt)
} else {
clientCon, err = grpc.Dial(binlogSocket, grpc.WithInsecure(), dialerOpt)
}

if err != nil {
log.Infof("error happen when setting binlog client: %s", errors.ErrorStack(err))
}
return true, errors.Trace(err)
})
return clientCon, errors.Trace(err)
}

var queryStmtTable = []string{"explain", "select", "show", "execute", "describe", "desc", "admin"}

func trimSQL(sql string) string {
Expand Down
17 changes: 0 additions & 17 deletions session/tidb_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,13 +27,11 @@ import (
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/store/mockstore"
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util"
"github.com/pingcap/tidb/util/auth"
"github.com/pingcap/tidb/util/logutil"
"github.com/pingcap/tidb/util/testleak"
"github.com/pkg/errors"
"golang.org/x/net/context"
"google.golang.org/grpc"
)

func TestT(t *testing.T) {
Expand Down Expand Up @@ -125,21 +123,6 @@ func (s *testMainSuite) TestRetryOpenStore(c *C) {
c.Assert(uint64(elapse), GreaterEqual, uint64(3*time.Second))
}

func (s *testMainSuite) TestRetryDialPumpClient(c *C) {
retryDialPumpClientMustFail := func(binlogSocket string, clientCon *grpc.ClientConn, maxRetries int, dialerOpt grpc.DialOption) (err error) {
return util.RunWithRetry(maxRetries, 10, func() (bool, error) {
// Assume that it'll always return an error.
return true, errors.New("must fail")
})
}
begin := time.Now()
err := retryDialPumpClientMustFail("", nil, 3, nil)
c.Assert(err, NotNil)
c.Assert(err.Error(), Equals, "must fail")
elapse := time.Since(begin)
c.Assert(uint64(elapse), GreaterEqual, uint64(6*10*time.Millisecond))
}

func (s *testMainSuite) TestSysSessionPoolGoroutineLeak(c *C) {
store, dom := newStoreWithBootstrap(c, s.dbName+"goroutine_leak")
defer dom.Close()
Expand Down
Loading

0 comments on commit 6999f64

Please sign in to comment.