Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

tidb_client: improve behavior when no alive tidb instance #900

Merged
merged 6 commits into from
May 7, 2021
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 16 additions & 9 deletions pkg/tidb/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,10 +108,9 @@ func (c *Client) OpenSQLConn(user string, pass string) (*gorm.DB, error) {

addr := c.sqlAPIAddress
if addr == "" {
if overrideEndpoint != "" {
addr = overrideEndpoint
} else {
addr = fmt.Sprintf("127.0.0.1:%d", c.forwarder.sqlPort)
var err error
if addr, err = c.forwarder.resolveSqlAddr(overrideEndpoint); err != nil {
return nil, err
}
}

Expand All @@ -133,6 +132,9 @@ func (c *Client) OpenSQLConn(user string, pass string) (*gorm.DB, error) {
if strings.HasPrefix(addr, "0.0.0.0:") {
log.Warn("TiDB reported its address to be 0.0.0.0. Please specify `-advertise-address` command line parameter when running TiDB")
}
if c.forwarder.sqlProxy.noAliveRemote.Load() {
return nil, ErrNoAliveTiDB.NewWithNoMessage()
}
return nil, ErrTiDBConnFailed.Wrap(err, "failed to connect to TiDB")
} else if mysqlErr, ok := err.(*mysql.MySQLError); ok {
if mysqlErr.Number == mysqlerr.ER_ACCESS_DENIED_ERROR {
Expand All @@ -155,13 +157,18 @@ func (c *Client) SendGetRequest(path string) ([]byte, error) {

addr := c.statusAPIAddress
if addr == "" {
if overrideEndpoint != "" {
addr = overrideEndpoint
} else {
addr = fmt.Sprintf("127.0.0.1:%d", c.forwarder.statusPort)
var err error
if addr, err = c.forwarder.resolveStatusAddr(overrideEndpoint); err != nil {
return nil, err
}
}

uri := fmt.Sprintf("%s://%s%s", c.statusAPIHTTPScheme, addr, path)
return c.statusAPIHTTPClient.WithTimeout(c.statusAPITimeout).SendRequest(c.lifecycleCtx, uri, http.MethodGet, nil, ErrTiDBClientRequestFailed, "TiDB")
result, err := c.statusAPIHTTPClient.
WithTimeout(c.statusAPITimeout).
SendRequest(c.lifecycleCtx, uri, http.MethodGet, nil, ErrTiDBClientRequestFailed, "TiDB")
if err != nil && c.forwarder.statusProxy.noAliveRemote.Load() {
return nil, ErrNoAliveTiDB.NewWithNoMessage()
}
return result, err
}
33 changes: 26 additions & 7 deletions pkg/tidb/forwarder.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,10 @@ import (
"time"

"github.com/cenkalti/backoff/v4"
"github.com/joomcode/errorx"
"github.com/pingcap/log"
"github.com/pingcap/tidb-dashboard/pkg/utils/topology"
"go.etcd.io/etcd/clientv3"
"go.uber.org/fx"

"github.com/pingcap/tidb-dashboard/pkg/utils/topology"
)

var (
Expand Down Expand Up @@ -93,10 +92,8 @@ func (f *Forwarder) pollingForTiDB() {
return err
}, bo)
if err != nil {
if errorx.IsOfType(err, ErrNoAliveTiDB) {
unbyte marked this conversation as resolved.
Show resolved Hide resolved
f.sqlProxy.updateRemotes(nil)
f.statusProxy.updateRemotes(nil)
}
f.sqlProxy.updateRemotes(nil)
f.statusProxy.updateRemotes(nil)
} else {
statusEndpoints := make(map[string]struct{}, len(allTiDB))
tidbEndpoints := make(map[string]struct{}, len(allTiDB))
Expand All @@ -118,6 +115,28 @@ func (f *Forwarder) pollingForTiDB() {
}
}

func (f *Forwarder) resolveSqlAddr(override string) (string, error) {
if override != "" {
return override, nil
}
if f.sqlProxy.noAliveRemote.Load() {
log.Warn("Unable to resolve sql connection address since no alive TiDB instance")
return "", ErrNoAliveTiDB.NewWithNoMessage()
}
return fmt.Sprintf("127.0.0.1:%d", f.sqlPort), nil
unbyte marked this conversation as resolved.
Show resolved Hide resolved
}

func (f *Forwarder) resolveStatusAddr(override string) (string, error) {
if override != "" {
return override, nil
}
if f.statusProxy.noAliveRemote.Load() {
log.Warn("Unable to resolve status connection address since no alive TiDB instance")
return "", ErrNoAliveTiDB.NewWithNoMessage()
}
return fmt.Sprintf("127.0.0.1:%d", f.statusPort), nil
}

func newForwarder(lc fx.Lifecycle, etcdClient *clientv3.Client) *Forwarder {
f := &Forwarder{
config: &forwarderConfig{
Expand Down
48 changes: 28 additions & 20 deletions pkg/tidb/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,9 @@ type proxy struct {
checkInterval time.Duration
dialTimeout time.Duration

remotes sync.Map
current string
noAliveRemote *atomic.Bool
remotes sync.Map
current string
}

func newProxy(l net.Listener, endpoints map[string]string, checkInterval time.Duration, timeout time.Duration) *proxy {
Expand All @@ -57,6 +58,7 @@ func newProxy(l net.Listener, endpoints map[string]string, checkInterval time.Du
remotes: sync.Map{},
dialTimeout: timeout,
checkInterval: checkInterval,
noAliveRemote: atomic.NewBool(len(endpoints) == 0),
}
for key, e := range endpoints {
p.remotes.Store(key, &remote{addr: e, inactive: atomic.NewBool(true)})
Expand All @@ -74,6 +76,7 @@ func (p *proxy) updateRemotes(remotes map[string]struct{}) {
p.remotes.Delete(key)
return true
})
p.noAliveRemote.Store(true)
return
}
// update or create new remote
Expand All @@ -95,12 +98,32 @@ func (p *proxy) updateRemotes(remotes map[string]struct{}) {
}
return true
})
p.noAliveRemote.Store(false)
}

func (p *proxy) serve(in net.Conn) {
out := p.pickActiveConn()
if out == nil {
log.Warn("no alive remote, drop incoming conn")
in.Close()
return
}
// bidirectional copy
go func() {
//nolint
io.Copy(in, out)
in.Close()
out.Close()
}()
//nolint
io.Copy(out, in)
out.Close()
in.Close()
}

func (p *proxy) pickActiveConn() (out net.Conn) {
var (
err error
out net.Conn
picked *remote
)
for {
Expand All @@ -116,23 +139,8 @@ func (p *proxy) serve(in net.Conn) {
picked.becomeInactive()
log.Warn("remote become inactive", zap.String("remote", picked.addr))
}
if out == nil {
log.Warn("no alive remote, drop incoming conn")
// Do we need issue a error here?
in.Close()
return
}
// bidirectional copy
go func() {
//nolint
io.Copy(in, out)
in.Close()
out.Close()
}()
//nolint
io.Copy(out, in)
out.Close()
in.Close()
p.noAliveRemote.Store(out == nil)
return
}

// pick returns an active remote if there is any
Expand Down
2 changes: 1 addition & 1 deletion ui/lib/client/translations/en.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ error:
export_no_data: No statements can be exported
other: Other error
tidb:
no_alive_tidb: No live TiDB instance in the cluster
no_alive_tidb: No alive TiDB instance
pd_access_failed: Failed to access PD node
tidb_conn_failed: Failed to connect to TiDB
tidb_auth_failed: TiDB authentication failed
2 changes: 1 addition & 1 deletion ui/lib/client/translations/zh.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ error:
export_no_data: 没有可导出的语句
other: 其他错误
tidb:
no_alive_tidb: 集群未启动 TiDB 实例
no_alive_tidb: 没有正在运行的 TiDB 实例
pd_access_failed: 无法访问 PD 节点
tidb_conn_failed: 无法连接到 TiDB
tidb_auth_failed: TiDB 登录验证失败