Skip to content
This repository has been archived by the owner on May 27, 2024. It is now read-only.

Commit

Permalink
use heartbeat instead of websocket discover, including whatsinput
Browse files Browse the repository at this point in the history
  • Loading branch information
codeskyblue committed Apr 13, 2018
1 parent 57b16a8 commit 96a2ae7
Show file tree
Hide file tree
Showing 6 changed files with 351 additions and 97 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -251,7 +251,7 @@ Websocket连接 `$DEVICE_URL/whatsinput`, 接收JSON格式
- 编辑框内容输入

```json
{"type": "InputChange", "text": "some text"}
{"type": "InputEdit", "text": "some text"}
```

# TODO
Expand Down
4 changes: 3 additions & 1 deletion install.bat
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ REM adb install app-debug-androidTest.apk
adb push atx-agent /data/local/tmp
adb shell chmod 777 /data/local/tmp/atx-agent
adb shell /data/local/tmp/atx-agent -stop
adb shell /data/local/tmp/atx-agent -d -t 10.240.171.86:8000 -nouia

REM adb shell /data/local/tmp/atx-agent -d -t 10.240.171.86:8000 -nouia
adb shell /data/local/tmp/atx-agent -d -t 10.240.187.224:8000 -nouia
REM adb shell /data/local/tmp/atx-agent -d -t 10.246.46.160:8200 -nouia
REM pause
96 changes: 78 additions & 18 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -230,7 +230,7 @@ func installAPKForce(path string, packageName string) error {
return installAPK(path)
}

func Screenshot(filename string) (err error) {
func Screenshot(filename string, thumbnailSize string) (err error) {
output, err := runShellOutput("LD_LIBRARY_PATH=/data/local/tmp", "/data/local/tmp/minicap", "-i")
if err != nil {
return
Expand All @@ -240,10 +240,13 @@ func Screenshot(filename string) (err error) {
err = fmt.Errorf("minicap not supported: %v", er)
return
}
if thumbnailSize == "" {
thumbnailSize = fmt.Sprintf("%dx%d", f.Width, f.Height)
}
if _, err = runShell(
"LD_LIBRARY_PATH=/data/local/tmp",
"/data/local/tmp/minicap",
"-P", fmt.Sprintf("%dx%d@%dx%d/%d", f.Width, f.Height, f.Width, f.Height, f.Rotation),
"-P", fmt.Sprintf("%dx%d@%s/%d", f.Width, f.Height, thumbnailSize, f.Rotation),
"-s", ">"+filename); err != nil {
return
}
Expand Down Expand Up @@ -454,12 +457,50 @@ func ServeHTTP(lis net.Listener, tunnel *TunnelProxy) error {

/* WhatsInput */
var whatsinput = struct {
C chan string
Recent string
}{make(chan string, 0), ""}
ChangeC chan string
EditC chan string
KeyCodeC chan int
Recent string
}{make(chan string, 0), make(chan string, 0), make(chan int, 0), ""}

const whatsInputFinishedMagic = "__inputFinished__"

// simple pubsub system
m.HandleFunc("/whatsinput", func(w http.ResponseWriter, r *http.Request) {
host := r.Header.Get("Host")
log.Println("CONNECT", host)
conn, err := hijackHTTPRequest(w)
if err != nil {
log.Println("Hijack failed:", err)
return
}
quit := make(chan bool, 1)
go func() {
for {
select {
case text := <-whatsinput.EditC:
base64Str := base64.StdEncoding.EncodeToString([]byte(text)) + "\n"
conn.Write([]byte("I" + base64Str))
case keyCode := <-whatsinput.KeyCodeC:
conn.Write([]byte("K" + strconv.Itoa(keyCode)))
case <-time.After(10 * time.Second):
conn.Write([]byte("P")) // ping message
case <-quit:
return
}
}
}()

buf := make([]byte, 4096)
for {
_, err := conn.Read(buf)
if err != nil {
quit <- true
break
}
}
}).Methods("CONNECT")

// Send input to device
// Highly affected by project https://github.com/willerce/WhatsInput
// Also thanks to https://github.com/senzhk/ADBKeyBoard
Expand All @@ -473,7 +514,7 @@ func ServeHTTP(lis net.Listener, tunnel *TunnelProxy) error {
go func() {
for {
select {
case msg := <-whatsinput.C:
case msg := <-whatsinput.ChangeC:
log.Println("Receive msg", msg)
if msg == whatsInputFinishedMagic {
log.Println("FinishedInput")
Expand All @@ -498,9 +539,13 @@ func ServeHTTP(lis net.Listener, tunnel *TunnelProxy) error {
break
}
switch v.Type {
case "InputChange":
base64Str := base64.StdEncoding.EncodeToString([]byte(v.Text))
runShell("am", "broadcast", "-a", "ADB_SET_TEXT", "--es", "text", strconv.Quote(base64Str))
case "InputEdit":
select {
case whatsinput.EditC <- v.Text:
log.Println("Message sended", v.Text)
case <-time.After(100 * time.Millisecond):
}
// runShell("am", "broadcast", "-a", "ADB_SET_TEXT", "--es", "text", strconv.Quote(base64Str))
case "InputKey":
runShell("input", "keyevent", "KEYCODE_ENTER") // HOTFIX(ssx): need fix later
// runShell("am", "broadcast", "-a", "ADB_INPUT_KEYCODE", "--ei", "code", strconv.Itoa(v.Code))
Expand All @@ -523,7 +568,7 @@ func ServeHTTP(lis net.Listener, tunnel *TunnelProxy) error {
whatsinput.Recent = ""
}
select {
case whatsinput.C <- input:
case whatsinput.ChangeC <- input:
io.WriteString(w, "Success")
case <-time.After(100 * time.Millisecond):
io.WriteString(w, "No WebSocket client connected")
Expand Down Expand Up @@ -1127,13 +1172,22 @@ func ServeHTTP(lis net.Listener, tunnel *TunnelProxy) error {
json.NewEncoder(w).Encode(info)
})

screenshotFilename := "/data/local/tmp/minicap-screenshot.jpg"
if username := currentUserName(); username != "" {
screenshotFilename = "/data/local/tmp/minicap-screenshot-" + username + ".jpg"
screenshotIndex := -1
nextScreenshotFilename := func() string {
targetFolder := "/data/local/tmp/minicap-images"
if _, err := os.Stat(targetFolder); err != nil {
os.MkdirAll(targetFolder, 0755)
}
screenshotIndex = (screenshotIndex + 1) % 5
return filepath.Join(targetFolder, fmt.Sprintf("%d.jpg", screenshotIndex))
}

m.HandleFunc("/screenshot", func(w http.ResponseWriter, r *http.Request) {
http.Redirect(w, r, "/screenshot/0", 302)
targetURL := "/screenshot/0"
if r.URL.RawQuery != "" {
targetURL += "?" + r.URL.RawQuery
}
http.Redirect(w, r, targetURL, 302)
}).Methods("GET")

m.Handle("/jsonrpc/0", uiautomatorProxy)
Expand All @@ -1143,12 +1197,14 @@ func ServeHTTP(lis net.Listener, tunnel *TunnelProxy) error {
uiautomatorProxy.ServeHTTP(w, r)
return
}
if err := Screenshot(screenshotFilename); err != nil {
thumbnailSize := r.FormValue("thumbnail")
filename := nextScreenshotFilename()
if err := Screenshot(filename, thumbnailSize); err != nil {
log.Printf("screenshot[minicap] error: %v", err)
uiautomatorProxy.ServeHTTP(w, r)
} else {
w.Header().Set("X-Screenshot-Method", "minicap")
http.ServeFile(w, r, screenshotFilename)
http.ServeFile(w, r, filename)
}
})

Expand Down Expand Up @@ -1294,9 +1350,13 @@ func main() {
}
}

tunnel := &TunnelProxy{ServerAddr: *fTunnelServer}
tunnel := &TunnelProxy{
ServerAddr: *fTunnelServer,
Secret: "hello kitty",
}
if *fTunnelServer != "" {
go tunnel.RunForever()
// go tunnel.RunForever()
go tunnel.Heratbeat()
}

sigc := make(chan os.Signal, 1)
Expand Down
197 changes: 197 additions & 0 deletions pubsub/pubsub.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,197 @@
package pubsub

import (
"bufio"
"encoding/json"
"errors"
"io"
"log"
"net"
"net/http"
"sync"
"time"

"github.com/gorilla/mux"
"github.com/gorilla/websocket"
)

type PubSub struct {
messageC chan Message
subs map[chan interface{}]Message
mu sync.Mutex
}

type Message struct {
Topic string
Receiver string
Data interface{}
}

func New() *PubSub {
return &PubSub{
messageC: make(chan Message, 10),
}
}

func (ps *PubSub) drain() {
for message := range ps.messageC {
ps.mu.Lock()
for ch, m := range ps.subs {
if m.Topic == message.Topic && m.Receiver == message.Receiver {
select {
case ch <- message.Data:
case <-time.After(1 * time.Second):
log.Println("Sub-chan receive timeout 1s, deleted")
delete(ps.subs, ch)
}
}
}
ps.mu.Unlock()
}
}

func (ps *PubSub) Publish(data interface{}, topic string, receiver string) {
ps.messageC <- Message{
Topic: topic,
Receiver: receiver,
Data: data,
}
}

func (ps *PubSub) Subscribe(topic string, receiver string) chan interface{} {
ps.mu.Lock()
defer ps.mu.Unlock()
C := make(chan interface{})
ps.subs[C] = Message{
Topic: topic,
Receiver: receiver,
}
return C
}

func (ps *PubSub) Unsubscribe(ch chan interface{}) {
ps.mu.Lock()
defer ps.mu.Unlock()
delete(ps.subs, ch)
}

type HTTPPubSub struct {
ps *PubSub
r *mux.Router
}

func NewHTTPPubSub(ps *PubSub) *HTTPPubSub {
r := mux.NewRouter()
upgrader := websocket.Upgrader{
ReadBufferSize: 1024,
WriteBufferSize: 1024,
CheckOrigin: func(r *http.Request) bool {
return true
},
}

// publish
r.HandleFunc("/{topic}/{receiver}", func(w http.ResponseWriter, r *http.Request) {
topic := mux.Vars(r)["topic"]
receiver := mux.Vars(r)["receiver"]
var data interface{}
json.NewDecoder(r.Body).Decode(&data)
ps.Publish(data, topic, receiver)
}).Methods("POST")

// subscribe WebSocket
r.HandleFunc("/{topic}/{receiver}", func(w http.ResponseWriter, r *http.Request) {
topic := mux.Vars(r)["topic"]
receiver := mux.Vars(r)["receiver"]
ws, err := upgrader.Upgrade(w, r, nil)
if err != nil {
log.Println(err)
return
}
dataC := ps.Subscribe(topic, receiver)
defer ps.Unsubscribe(dataC)
quitC := make(chan bool, 1)
go func() {
for {
select {
case <-quitC:
return
case data := <-dataC:
ws.WriteJSON(data)
}
}
}()
for {
_, _, err := ws.ReadMessage()
if err != nil {
quitC <- true
break
}
}
}).Methods("GET")

// subscribe hijack
r.HandleFunc("/{topic}/{receiver}", func(w http.ResponseWriter, r *http.Request) {
topic := mux.Vars(r)["topic"]
receiver := mux.Vars(r)["receiver"]
conn, err := hijackHTTPRequest(w)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
dataC := ps.Subscribe(topic, receiver)
defer ps.Unsubscribe(dataC)
for data := range dataC {
jsdata, _ := json.Marshal(data)
if _, err := io.WriteString(conn, string(jsdata)+"\n"); err != nil {
break
}
}
}).Methods("CONNECT")

return &HTTPPubSub{
ps: ps,
r: r,
}
}

func (h *HTTPPubSub) ServeHTTP(w http.ResponseWriter, r *http.Request) {
h.r.ServeHTTP(w, r)
}

func hijackHTTPRequest(w http.ResponseWriter) (conn net.Conn, err error) {
hj, ok := w.(http.Hijacker)
if !ok {
err = errors.New("webserver don't support hijacking")
return
}

hjconn, bufrw, err := hj.Hijack()
if err != nil {
return nil, err
}
conn = newHijackReadWriteCloser(hjconn.(*net.TCPConn), bufrw)
return
}

type hijackRW struct {
*net.TCPConn
bufrw *bufio.ReadWriter
}

func (this *hijackRW) Write(data []byte) (int, error) {
nn, err := this.bufrw.Write(data)
this.bufrw.Flush()
return nn, err
}

func (this *hijackRW) Read(p []byte) (int, error) {
return this.bufrw.Read(p)
}

func newHijackReadWriteCloser(conn *net.TCPConn, bufrw *bufio.ReadWriter) net.Conn {
return &hijackRW{
bufrw: bufrw,
TCPConn: conn,
}
}
Loading

0 comments on commit 96a2ae7

Please sign in to comment.