Skip to content

Commit

Permalink
Merge pull request #818 from LaurenceLiZhixin/feature/cli-support
Browse files Browse the repository at this point in the history
Ftr: Add dubbo-go-cli telnet tool support
  • Loading branch information
cityiron authored Feb 3, 2021
2 parents 52fc46b + 18ab1c4 commit 3ae865a
Show file tree
Hide file tree
Showing 29 changed files with 2,344 additions and 0 deletions.
5 changes: 5 additions & 0 deletions tools/cli/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
.idea/

# Binary
example/dubbo-go-cli

14 changes: 14 additions & 0 deletions tools/cli/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
# dubbo-go-cli

### 1. Problem we solved.

For the running dubbo-go server, we need a telnet-cli tool to test if the server works healthily.
The tool should support dubbo protocol, making it easy for you to define your own request pkg, get rsp struct of your server, and total costing time.


### 2. How to get cli-tool
run in dubbo-go/tools/cli \
`$ sh build.sh`\
and you can get dubbo-go-cli

### 3. Quick start:[example](example/README.md)
11 changes: 11 additions & 0 deletions tools/cli/README_CN.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
# dubbo-go-cli

### 1. 解决问题

针对正在运行的dubbo服务,需要拥有一个命令行工具来针对服务进行测试。
该服务需要支持dubbo协议,方便用户进行自定义传输包体。

### 2. cli工具获取方法
`sh build.sh`

### 3. 使用方法:见[example](example/README_CN.md)
2 changes: 2 additions & 0 deletions tools/cli/build.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
export GOPROXY="http://goproxy.io"
go build -o dubbo-go-cli
197 changes: 197 additions & 0 deletions tools/cli/client/client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,197 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package client

import (
"log"
"net"
"strconv"
"sync"
"time"
)

import (
"go.uber.org/atomic"
)

import (
"github.com/apache/dubbo-go/tools/cli/common"
"github.com/apache/dubbo-go/tools/cli/protocol"
_ "github.com/apache/dubbo-go/tools/cli/protocol/dubbo"
)

// defaultBufferSize is the tcp read default buffer size
const defaultBufferSize = 1024 * 1024 * 4

// TelnetClient maintain a connection to target
type TelnetClient struct {
tcpAddr string
responseTimeout time.Duration
protocolName string
requestList []*protocol.Request
conn *net.TCPConn
proto protocol.Protocol

sequence atomic.Uint64
pendingResponses *sync.Map
waitNum atomic.Uint64
}

// NewTelnetClient create a new tcp connection, and create default request
func NewTelnetClient(host string, port int, protocolName, interfaceID, version, group, method string, reqPkg interface{}, timeout int) (*TelnetClient, error) {
tcpAddr := net.JoinHostPort(host, strconv.Itoa(port))
resolved := resolveTCPAddr(tcpAddr)
conn, err := net.DialTCP("tcp", nil, resolved)
if err != nil {
return nil, err
}
log.Printf("connected to %s:%d\n", host, port)
log.Printf("try calling interface:%s.%s\n", interfaceID, method)
log.Printf("with protocol:%s\n\n", protocolName)
proto := common.GetProtocol(protocolName)

return &TelnetClient{
tcpAddr: tcpAddr,
conn: conn,
responseTimeout: time.Duration(timeout) * time.Millisecond, //default timeout
protocolName: protocolName,
pendingResponses: &sync.Map{},
proto: proto,
requestList: []*protocol.Request{
{
InterfaceID: interfaceID,
Version: version,
Method: method,
Group: group,
Params: []interface{}{reqPkg},
},
},
}, nil
}

func resolveTCPAddr(addr string) *net.TCPAddr {
resolved, error := net.ResolveTCPAddr("tcp", addr)
if nil != error {
log.Fatalf("Error occured while resolving TCP address \"%v\": %v\n", addr, error)
}

return resolved
}

// ProcessRequests send all requests
func (t *TelnetClient) ProcessRequests(userPkg interface{}) {
for i, _ := range t.requestList {
t.processSingleRequest(t.requestList[i], userPkg)
}
}

// addPendingResponse add a response @model to pending queue
// once the rsp got, the model will be used.
func (t *TelnetClient) addPendingResponse(model interface{}) uint64 {
seqId := t.sequence.Load()
t.pendingResponses.Store(seqId, model)
t.waitNum.Inc()
t.sequence.Inc()
return seqId
}

// removePendingResponse delete item from pending queue by @seq
func (t *TelnetClient) removePendingResponse(seq uint64) {
if t.pendingResponses == nil {
return
}
if _, ok := t.pendingResponses.Load(seq); ok {
t.pendingResponses.Delete(seq)
t.waitNum.Dec()
}
return
}

// processSingleRequest call one req.
func (t *TelnetClient) processSingleRequest(req *protocol.Request, userPkg interface{}) {
// proto create package procedure
req.ID = t.sequence.Load()
inputData, err := t.proto.Write(req)
if err != nil {
log.Fatalln("error: handler.Writer err = ", err)
}
startTime := time.Now()

// init rsp Package and add to pending queue
seqId := t.addPendingResponse(userPkg)
defer t.removePendingResponse(seqId)

requestDataChannel := make(chan []byte, 0)
responseDataChannel := make(chan []byte, 0)

// start data transfer procedure
go t.readInputData(string(inputData), requestDataChannel)
go t.readServerData(t.conn, responseDataChannel)

timeAfter := time.After(t.responseTimeout)

for {
select {
case <-timeAfter:
log.Println("request timeout to:", t.tcpAddr)
return
case request := <-requestDataChannel:
if _, err := t.conn.Write(request); nil != err {
log.Fatalf("Error occured while writing to TCP socket: %v\n", err)
}
case response := <-responseDataChannel:
rspPkg, _, err := t.proto.Read(response, t.pendingResponses)
if err != nil {
log.Fatalln("Error with protocol Read(): ", err)
}
t.removePendingResponse(seqId)
log.Printf("After %dms , Got Rsp:", time.Now().Sub(startTime).Milliseconds())
common.PrintInterface(rspPkg)
if t.waitNum.Sub(0) == 0 {
return
}
}
}
}

func (t *TelnetClient) readInputData(inputData string, toSent chan<- []byte) {
toSent <- []byte(inputData)
}

func (t *TelnetClient) readServerData(connection *net.TCPConn, received chan<- []byte) {
buffer := make([]byte, defaultBufferSize)
var err error
var n int
for nil == err {
n, err = connection.Read(buffer)
received <- buffer[:n]
}

t.assertEOF(err)
}

func (t *TelnetClient) assertEOF(error error) {
if "EOF" != error.Error() {
log.Fatalf("Error occured while operating on TCP socket: %v\n", error)
}
}

// Destroy close the tcp conn
func (t *TelnetClient) Destroy() {
t.conn.Close()
}
39 changes: 39 additions & 0 deletions tools/cli/common/protocol.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package common

import (
"github.com/apache/dubbo-go/tools/cli/protocol"
)

var (
protocols = make(map[string]func() protocol.Protocol, 8)
)

// SetProtocol sets the protocol extension with @name
func SetProtocol(name string, v func() protocol.Protocol) {
protocols[name] = v
}

// GetProtocol finds the protocol extension with @name
func GetProtocol(name string) protocol.Protocol {
if protocols[name] == nil {
panic("protocol for " + name + " is not existing, make sure you have import the package.")
}
return protocols[name]()
}
39 changes: 39 additions & 0 deletions tools/cli/common/tool.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package common

import (
"fmt"
"log"
"reflect"
)

// PrintInterface print the interface by level
func PrintInterface(v interface{}) {
val := reflect.ValueOf(v).Elem()
typ := reflect.TypeOf(v)
log.Printf("%+v\n", v)
nums := val.NumField()
for i := 0; i < nums; i++ {
if typ.Elem().Field(i).Type.Kind() == reflect.Ptr {
log.Printf("%s: ", typ.Elem().Field(i).Name)
PrintInterface(val.Field(i).Interface())
}
}
fmt.Println("")
}
Loading

0 comments on commit 3ae865a

Please sign in to comment.