Skip to content
This repository has been archived by the owner on May 26, 2022. It is now read-only.

refactor: suppress some golint errors #66

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -2,3 +2,5 @@ bin
pegasus-tools-*
coverage.txt
.idea
.*-history
*.log
11 changes: 10 additions & 1 deletion .golangci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,16 @@ linters:
- errcheck
- ineffassign
- gosimple
- govet
enable:
- govet
- gofmt
- goimports
- bodyclose
- exhaustive
- exportloopref
- goimports

run:
skip-files:
- idl/base/error_code.go
- idl/base/rpc_address.go
4 changes: 4 additions & 0 deletions codecov.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
ignore:
- "session/admin_rpc_types.go"
- "session/radmin_rpc_types.go"
- "idl/**"
4 changes: 4 additions & 0 deletions pegasus/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,3 +8,7 @@ package pegasus
type Config struct {
MetaServers []string `json:"meta_servers"`
}

var testingCfg = Config{
MetaServers: []string{"0.0.0.0:34601", "0.0.0.0:34602", "0.0.0.0:34603"},
}
4 changes: 0 additions & 4 deletions pegasus/table_connector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,10 +70,6 @@ func testSingleKeyOperations(t *testing.T, tb TableConnector, hashKey []byte, so
assert.Nil(t, tb.Del(context.Background(), hashKey, sortKey))
}

var testingCfg = Config{
MetaServers: []string{"0.0.0.0:34601", "0.0.0.0:34602", "0.0.0.0:34603"},
}

func TestPegasusTableConnector_SingleKeyOperations(t *testing.T) {
defer leaktest.Check(t)()

Expand Down
44 changes: 20 additions & 24 deletions rpc/rpc_conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,11 @@ import (
"github.com/XiaoMi/pegasus-go-client/pegalog"
)

// TODO(wutao1): make these parameters configurable
const (
RpcConnKeepAliveInterval = time.Second * 30
RpcConnDialTimeout = time.Second * 3
RpcConnReadTimeout = time.Second
RpcConnWriteTimeout = time.Second
var (
rpcConnKeepAliveInterval = time.Second * 30
rpcConnDialTimeout = time.Second * 3
rpcConnReadTimeout = 1 * time.Second
rpcConnWriteTimeout = 1 * time.Second
)

type ConnState int
Expand Down Expand Up @@ -66,9 +65,6 @@ type RpcConn struct {
rstream *ReadStream
conn net.Conn

writeTimeout time.Duration
readTimeout time.Duration

cstate ConnState
mu sync.RWMutex

Expand Down Expand Up @@ -102,8 +98,8 @@ func (rc *RpcConn) TryConnect() (err error) {

// unlock for blocking call
d := &net.Dialer{
KeepAlive: RpcConnKeepAliveInterval,
Timeout: RpcConnDialTimeout,
KeepAlive: rpcConnKeepAliveInterval,
Timeout: rpcConnDialTimeout,
}
conn, err := d.Dial("tcp", rc.Endpoint)

Expand Down Expand Up @@ -146,7 +142,7 @@ func (rc *RpcConn) Write(msgBytes []byte) (err error) {

tcpConn, ok := rc.conn.(*net.TCPConn)
if ok {
tcpConn.SetWriteDeadline(time.Now().Add(rc.writeTimeout))
tcpConn.SetWriteDeadline(time.Now().Add(rpcConnWriteTimeout))
}

return rc.wstream.Write(msgBytes)
Expand All @@ -172,7 +168,7 @@ func (rc *RpcConn) Read(size int) (bytes []byte, err error) {

tcpConn, ok := rc.conn.(*net.TCPConn)
if ok {
tcpConn.SetReadDeadline(time.Now().Add(rc.readTimeout))
tcpConn.SetReadDeadline(time.Now().Add(rpcConnReadTimeout))
}

bytes, err = rc.rstream.Next(size)
Expand All @@ -188,22 +184,22 @@ func (rc *RpcConn) Read(size int) (bytes []byte, err error) {
// Returns an idle connection.
func NewRpcConn(addr string) *RpcConn {
return &RpcConn{
Endpoint: addr,
logger: pegalog.GetLogger(),
cstate: ConnStateInit,
readTimeout: RpcConnReadTimeout,
writeTimeout: RpcConnWriteTimeout,
Endpoint: addr,
logger: pegalog.GetLogger(),
cstate: ConnStateInit,
}
}

// Not thread-safe
func (rc *RpcConn) SetWriteTimeout(timeout time.Duration) {
rc.writeTimeout = timeout
// SetWriteTimeout sets the TCP deadline for waiting write to complete.
// If timeout exceeds, the TCP write returns error.
func SetWriteTimeout(timeout time.Duration) {
rpcConnWriteTimeout = timeout
}

// Not thread-safe
func (rc *RpcConn) SetReadTimeout(timeout time.Duration) {
rc.readTimeout = timeout
// SetReadTimeout sets the TCP deadline for waiting read to complete.
// If timeout exceeds, the TCP read returns error.
func SetReadTimeout(timeout time.Duration) {
rpcConnReadTimeout = timeout
}

func (rc *RpcConn) setReady(reader io.Reader, writer io.Writer) {
Expand Down
10 changes: 5 additions & 5 deletions session/codec.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ func (p *PegasusCodec) Marshal(v interface{}) ([]byte, error) {

header := &thriftHeader{
headerLength: uint32(thriftHeaderBytesLen),
appId: r.Gpid.Appid,
appID: r.Gpid.Appid,
partitionIndex: r.Gpid.PartitionIndex,
threadHash: gpidToThreadHash(r.Gpid),
partitionHash: 0,
Expand Down Expand Up @@ -75,13 +75,13 @@ func (p *PegasusCodec) Unmarshal(data []byte, v interface{}) error {
return err
}

name, _, seqId, err := iprot.ReadMessageBegin()
name, _, seqID, err := iprot.ReadMessageBegin()
if err != nil {
return err
}

r.Name = name
r.SeqId = seqId
r.SeqId = seqID

if ec.Errno != base.ERR_OK.String() {
// convert string to base.DsnErrCode
Expand Down Expand Up @@ -320,13 +320,13 @@ func (p *MockCodec) MockUnMarshal(unmarshal UnmarshalFunc) {
p.unmars = unmarshal
}

// a trait of the thrift-generated argument type (MetaQueryCfgArgs, RrdbPutArgs e.g.)
// RpcRequestArgs is a trait of the thrift-generated argument type (MetaQueryCfgArgs, RrdbPutArgs e.g.)
type RpcRequestArgs interface {
String() string
Write(oprot thrift.TProtocol) error
}

// a trait of the thrift-generated result type (MetaQueryCfgResult e.g.)
// RpcResponseResult is a trait of the thrift-generated result type (MetaQueryCfgResult e.g.)
type RpcResponseResult interface {
String() string
Read(iprot thrift.TProtocol) error
Expand Down
2 changes: 1 addition & 1 deletion session/codec_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ func TestCodec_Marshal(t *testing.T) {
r := &PegasusRpcCall{
Args: arg,
Name: "RPC_CM_QUERY_PARTITION_CONFIG_BY_INDEX",
Gpid: &base.Gpid{0, 0},
Gpid: &base.Gpid{Appid: 0, PartitionIndex: 0},
SeqId: 1,
}

Expand Down
4 changes: 2 additions & 2 deletions session/header.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ type thriftHeader struct {
headerCrc32 uint32
bodyLength uint32
bodyCrc32 uint32
appId int32
appID int32
partitionIndex int32
clientTimeout uint32
threadHash int32
Expand All @@ -41,7 +41,7 @@ func (t *thriftHeader) marshall(buf []byte) {
binary.BigEndian.PutUint32(buf[12:16], t.headerCrc32)
binary.BigEndian.PutUint32(buf[16:20], t.bodyLength)
binary.BigEndian.PutUint32(buf[20:24], t.bodyCrc32)
binary.BigEndian.PutUint32(buf[24:28], uint32(t.appId))
binary.BigEndian.PutUint32(buf[24:28], uint32(t.appID))
binary.BigEndian.PutUint32(buf[28:32], uint32(t.partitionIndex))
binary.BigEndian.PutUint32(buf[32:36], t.clientTimeout)
binary.BigEndian.PutUint32(buf[36:40], uint32(t.threadHash))
Expand Down
4 changes: 2 additions & 2 deletions session/replica_session.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,8 +119,8 @@ func (rs *ReplicaSession) Scan(ctx context.Context, gpid *base.Gpid, request *rr
return ret.GetSuccess(), nil
}

func (rs *ReplicaSession) ClearScanner(ctx context.Context, gpid *base.Gpid, contextId int64) error {
args := &rrdb.RrdbClearScannerArgs{ContextID: contextId}
func (rs *ReplicaSession) ClearScanner(ctx context.Context, gpid *base.Gpid, contextID int64) error {
args := &rrdb.RrdbClearScannerArgs{ContextID: contextID}
_, err := rs.CallWithGpid(ctx, gpid, args, "RPC_RRDB_RRDB_CLEAR_SCANNER")
if err != nil {
return err
Expand Down
14 changes: 7 additions & 7 deletions session/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ const (
NodeTypeMeta NodeType = "meta"
NodeTypeReplica NodeType = "replica"

kDialInterval = time.Second * 60
dialInterval = time.Second * 60

// LatencyTracingThreshold means RPC's latency higher than the threshold (1000ms) will be traced
LatencyTracingThreshold = time.Millisecond * 1000
Expand Down Expand Up @@ -57,7 +57,7 @@ type nodeSession struct {

// atomic incremented counter that ensures each rpc
// has a unique sequence id
seqId int32
seqID int32

addr string
ntype NodeType
Expand All @@ -84,7 +84,7 @@ func newNodeSessionAddr(addr string, ntype NodeType) *nodeSession {
return &nodeSession{
logger: pegalog.GetLogger(),
ntype: ntype,
seqId: 0,
seqID: 0,
codec: NewPegasusCodec(),
pendingResp: make(map[int32]*requestListener),
reqc: make(chan *requestListener),
Expand Down Expand Up @@ -153,9 +153,9 @@ func (n *nodeSession) tryDial() {
// loopForResponse which handle the data communications.
// If the last attempt failed, it will retry again.
func (n *nodeSession) dial() {
if time.Now().Sub(n.lastDialTime) < kDialInterval {
if time.Now().Sub(n.lastDialTime) < dialInterval {
select {
case <-time.After(kDialInterval):
case <-time.After(dialInterval):
case <-n.tom.Dying():
return
}
Expand Down Expand Up @@ -302,8 +302,8 @@ func (n *nodeSession) CallWithGpid(ctx context.Context, gpid *base.Gpid, args Rp
return nil, err
}

seqId := atomic.AddInt32(&n.seqId, 1) // increment sequence id
rcall, err := MarshallPegasusRpc(n.codec, seqId, gpid, args, name)
seqID := atomic.AddInt32(&n.seqID, 1) // increment sequence id
rcall, err := MarshallPegasusRpc(n.codec, seqID, gpid, args, name)
if err != nil {
return nil, err
}
Expand Down
20 changes: 12 additions & 8 deletions session/session_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,12 +108,14 @@ func TestNodeSession_LoopForDialingCancelled(t *testing.T) {
n.tryDial()

time.Sleep(time.Second)
// time.Second < rpc.RpcConnDialTimeout, it must still be connecting.
// time.Second < rpc.rpcConnDialTimeout, it must still be connecting.
assert.Equal(t, rpc.ConnStateConnecting, n.conn.GetState())

time.Sleep(rpc.RpcConnDialTimeout) // dial failed.
time.Sleep(3 * time.Second /*rpc.rpcConnDialTimeout*/) // dial failed.
assert.Equal(t, rpc.ConnStateTransientFailure, n.conn.GetState())
n.Close()
// After session closed, the dialing goroutine correspondingly closes,
// which is ensured by leaktest.
}

type IOErrWriter struct {
Expand Down Expand Up @@ -145,7 +147,7 @@ func TestNodeSession_WriteFailed(t *testing.T) {
})
n.codec = mockCodec

_, err := n.CallWithGpid(context.Background(), &base.Gpid{0, 0}, arg, "RPC_NAME")
_, err := n.CallWithGpid(context.Background(), &base.Gpid{}, arg, "RPC_NAME")
assert.NotNil(t, err)
assert.Equal(t, n.conn.GetState(), rpc.ConnStateTransientFailure)
}
Expand All @@ -157,7 +159,8 @@ func TestNodeSession_WaitUntilSessionReady(t *testing.T) {
n := newNodeSession("www.baidu.com:12321", "meta")
defer n.Close()

ctx, _ := context.WithTimeout(context.Background(), time.Millisecond*50)
ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond*50)
defer cancel()
err := n.waitUntilSessionReady(ctx)

// timeout waiting for dialing
Expand Down Expand Up @@ -241,11 +244,11 @@ func TestNodeSession_ConcurrentCallToEcho(t *testing.T) {
assert.True(t, len(data) > thriftHeaderBytesLen)
data = data[thriftHeaderBytesLen:]
iprot := thrift.NewTBinaryProtocolTransport(thrift.NewStreamTransportR(bytes.NewBuffer(data)))
_, _, seqId, err := iprot.ReadMessageBegin()
_, _, seqID, err := iprot.ReadMessageBegin()
if err != nil {
return err
}
r.SeqId = seqId
r.SeqId = seqID
r.Result = rrdb.NewMetaQueryCfgResult()

return nil
Expand Down Expand Up @@ -327,7 +330,7 @@ func TestNodeSession_ReceiveErrorCode(t *testing.T) {
return nil
})

result, err := n.CallWithGpid(context.Background(), &base.Gpid{0, 0}, arg, "RPC_NAME")
result, err := n.CallWithGpid(context.Background(), &base.Gpid{}, arg, "RPC_NAME")
assert.Equal(t, result, nil)
assert.Equal(t, err, base.ERR_INVALID_STATE)
}
Expand Down Expand Up @@ -367,12 +370,13 @@ func TestNodeSession_Redial(t *testing.T) {

arg := rrdb.NewMetaQueryCfgArgs()
arg.Query = replication.NewQueryCfgRequest()
_, err := n.CallWithGpid(context.Background(), &base.Gpid{0, 0}, arg, "RPC_NAME")
_, err := n.CallWithGpid(context.Background(), &base.Gpid{}, arg, "RPC_NAME")

assert.Equal(t, n.ConnState(), rpc.ConnStateReady)
assert.Equal(t, err, base.ERR_INVALID_STATE)
}

// Ensure the session automatically fails when it reads EOF (which means tcp shutdown)
func TestNodeSession_ReadEOF(t *testing.T) {
defer leaktest.Check(t)()

Expand Down