Skip to content

Commit

Permalink
revdial/v2: add new simpler, non-multiplexing revdial implementation
Browse files Browse the repository at this point in the history
The old revdial has a simple multiplexing protocol that was like
HTTP/2 but without flow control, etc. But it was too simple (no flow
control) and too complex. Instead, just use one TCP connection per
reverse dialed connection. For now, the NAT'ed machine needs to go
re-connect for each incoming connection, but in practice that's just
once.

The old implementation is retained for now until all the buildlets are
updated.

Updates golang/go#31639

Change-Id: Id94c98d2949e695b677531b1221a827573543085
Reviewed-on: https://go-review.googlesource.com/c/build/+/174082
Reviewed-by: Dmitri Shuralyov <[email protected]>
  • Loading branch information
bradfitz committed Apr 29, 2019
1 parent 3ae2235 commit 4f0f4bb
Show file tree
Hide file tree
Showing 7 changed files with 552 additions and 103 deletions.
41 changes: 25 additions & 16 deletions buildlet/buildletclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ package buildlet // import "golang.org/x/build/buildlet"

import (
"bufio"
"context"
"encoding/json"
"errors"
"fmt"
Expand All @@ -22,7 +23,6 @@ import (
"sync"
"time"

"context"
"golang.org/x/oauth2"
)

Expand Down Expand Up @@ -117,8 +117,14 @@ func (c *Client) SetHTTPClient(httpClient *http.Client) {
}

// SetDialer sets the function that creates a new connection to the buildlet.
// By default, net.Dial is used.
func (c *Client) SetDialer(dialer func() (net.Conn, error)) {
// By default, net.Dialer.DialContext is used.
//
// TODO(bradfitz): this is only used for ssh connections to buildlets,
// which previously required the client to do its own net.Dial +
// upgrade request. But now that the net/http client supports
// read/write bodies for protocol upgrades, we could change how ssh
// works and delete this.
func (c *Client) SetDialer(dialer func(context.Context) (net.Conn, error)) {
c.dialer = dialer
}

Expand All @@ -138,11 +144,11 @@ type Client struct {
ipPort string // required, unless remoteBuildlet+baseURL is set
tls KeyPair
httpClient *http.Client
dialer func() (net.Conn, error) // nil means to use net.Dial
baseURL string // optional baseURL (used by remote buildlets)
authUser string // defaults to "gomote", if password is non-empty
password string // basic auth password or empty for none
remoteBuildlet string // non-empty if for remote buildlets
dialer func(context.Context) (net.Conn, error) // nil means to use net.Dialer.DialContext
baseURL string // optional baseURL (used by remote buildlets)
authUser string // defaults to "gomote", if password is non-empty
password string // basic auth password or empty for none
remoteBuildlet string // non-empty if for remote buildlets

closeFuncs []func() // optional extra code to run on close
releaseMode bool
Expand Down Expand Up @@ -752,29 +758,30 @@ func (c *Client) ListDir(dir string, opts ListDirOpts, fn func(DirEntry)) error
return sc.Err()
}

func (c *Client) getDialer() func() (net.Conn, error) {
func (c *Client) getDialer() func(context.Context) (net.Conn, error) {
if c.dialer != nil {
return c.dialer
}
return c.dialWithNetDial
}

func (c *Client) dialWithNetDial() (net.Conn, error) {
// TODO: contexts? the tedious part will be adding it to
// revdial.Dial. For now just do a 5 second timeout. Probably
// fine. This is currently only used for ssh connections.
d := net.Dialer{Timeout: 5 * time.Second}
return d.Dial("tcp", c.ipPort)
func (c *Client) dialWithNetDial(ctx context.Context) (net.Conn, error) {
var d net.Dialer
return d.DialContext(ctx, "tcp", c.ipPort)
}

// ConnectSSH opens an SSH connection to the buildlet for the given username.
// The authorizedPubKey must be a line from an ~/.ssh/authorized_keys file
// and correspond to the private key to be used to communicate over the net.Conn.
func (c *Client) ConnectSSH(user, authorizedPubKey string) (net.Conn, error) {
conn, err := c.getDialer()()
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
conn, err := c.getDialer()(ctx)
if err != nil {
return nil, fmt.Errorf("error dialing HTTP connection before SSH upgrade: %v", err)
}
deadline, _ := ctx.Deadline()
conn.SetDeadline(deadline)
req, err := http.NewRequest("POST", "/connect-ssh", nil)
if err != nil {
conn.Close()
Expand All @@ -794,8 +801,10 @@ func (c *Client) ConnectSSH(user, authorizedPubKey string) (net.Conn, error) {
}
if res.StatusCode != http.StatusSwitchingProtocols {
slurp, _ := ioutil.ReadAll(res.Body)
conn.Close()
return nil, fmt.Errorf("unexpected /connect-ssh response: %v, %s", res.Status, slurp)
}
conn.SetDeadline(time.Time{})
return conn, nil
}

Expand Down
23 changes: 23 additions & 0 deletions cmd/buildlet/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ FORCE:
compile:
GOOS=aix GOARCH=ppc64 go install golang.org/x/build/cmd/buildlet
GOOS=darwin GOARCH=amd64 go install golang.org/x/build/cmd/buildlet
GOOS=dragonfly GOARCH=amd64 go install golang.org/x/build/cmd/buildlet
GOOS=freebsd GOARCH=arm go install golang.org/x/build/cmd/buildlet
GOOS=freebsd GOARCH=amd64 go install golang.org/x/build/cmd/buildlet
GOOS=linux GOARCH=amd64 go install golang.org/x/build/cmd/buildlet
GOOS=linux GOARCH=arm go install golang.org/x/build/cmd/buildlet
Expand All @@ -23,6 +25,7 @@ compile:
GOOS=netbsd GOARCH=386 go install golang.org/x/build/cmd/buildlet
GOOS=netbsd GOARCH=amd64 go install golang.org/x/build/cmd/buildlet
GOOS=netbsd GOARCH=arm go install golang.org/x/build/cmd/buildlet
GOOS=openbsd GOARCH=arm go install golang.org/x/build/cmd/buildlet
GOOS=openbsd GOARCH=386 go install golang.org/x/build/cmd/buildlet
GOOS=openbsd GOARCH=amd64 go install golang.org/x/build/cmd/buildlet
GOOS=plan9 GOARCH=386 go install golang.org/x/build/cmd/buildlet
Expand All @@ -42,6 +45,14 @@ buildlet.darwin-amd64: FORCE
go install golang.org/x/build/cmd/upload
upload --verbose --osarch=$@ --file=go:golang.org/x/build/cmd/buildlet --public --cacheable=false go-builder-data/$@

buildlet.dragonfly-amd64: FORCE
go install golang.org/x/build/cmd/upload
upload --verbose --osarch=$@ --file=go:golang.org/x/build/cmd/buildlet --public --cacheable=false go-builder-data/$@

buildlet.freebsd-arm: FORCE
go install golang.org/x/build/cmd/upload
upload --verbose --osarch=$@ --file=go:golang.org/x/build/cmd/buildlet --public --cacheable=false go-builder-data/$@

buildlet.freebsd-amd64: FORCE
go install golang.org/x/build/cmd/upload
upload --verbose --osarch=$@ --file=go:golang.org/x/build/cmd/buildlet --public --cacheable=false go-builder-data/$@
Expand All @@ -66,6 +77,10 @@ buildlet.netbsd-386: FORCE
go install golang.org/x/build/cmd/upload
upload --verbose --osarch=$@ --file=go:golang.org/x/build/cmd/buildlet --public --cacheable=false go-builder-data/$@

buildlet.openbsd-arm: FORCE
go install golang.org/x/build/cmd/upload
upload --verbose --osarch=$@ --file=go:golang.org/x/build/cmd/buildlet --public --cacheable=false go-builder-data/$@

buildlet.openbsd-amd64: FORCE
go install golang.org/x/build/cmd/upload
upload --verbose --osarch=$@ --file=go:golang.org/x/build/cmd/buildlet --public --cacheable=false go-builder-data/$@
Expand All @@ -78,6 +93,14 @@ buildlet.plan9-386: FORCE
go install golang.org/x/build/cmd/upload
upload --verbose --osarch=$@ --file=go:golang.org/x/build/cmd/buildlet --public --cacheable=false go-builder-data/$@

buildlet.plan9-arm: FORCE
go install golang.org/x/build/cmd/upload
upload --verbose --osarch=$@ --file=go:golang.org/x/build/cmd/buildlet --public --cacheable=false go-builder-data/$@

buildlet.plan9-amd64: FORCE
go install golang.org/x/build/cmd/upload
upload --verbose --osarch=$@ --file=go:golang.org/x/build/cmd/buildlet --public --cacheable=false go-builder-data/$@

buildlet.windows-arm: FORCE buildlet_windows.go
go install golang.org/x/build/cmd/upload
upload --verbose --osarch=$@ --file=go:golang.org/x/build/cmd/buildlet --public --cacheable=false go-builder-data/$@
Expand Down
3 changes: 2 additions & 1 deletion cmd/buildlet/buildlet.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,8 @@ var (
// 18: set TMPDIR and GOCACHE
// 21: GO_BUILDER_SET_GOPROXY=coordinator support
// 22: TrimSpace the reverse buildlet's gobuildkey contents
const buildletVersion = 22
// 23: revdial v2
const buildletVersion = 23

func defaultListenAddr() string {
if runtime.GOOS == "darwin" {
Expand Down
98 changes: 36 additions & 62 deletions cmd/buildlet/reverse.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,10 @@ package main

import (
"bufio"
"context"
"crypto/hmac"
"crypto/md5"
"crypto/tls"
"crypto/x509"
"errors"
"fmt"
"io"
"io/ioutil"
Expand All @@ -26,8 +25,7 @@ import (
"strings"
"time"

"golang.org/x/build"
"golang.org/x/build/revdial"
"golang.org/x/build/revdial/v2"
)

// mode is either a BuildConfig or HostConfig name (map key in x/build/dashboard/builders.go)
Expand Down Expand Up @@ -127,56 +125,41 @@ func dialCoordinator() error {
goproxyHandler = newProxyToCoordinatorHandler(*reverseType, key)
}

caCert := build.ProdCoordinatorCA
addr := *coordinator
if addr == "farmer.golang.org" {
addr = "farmer.golang.org:443"
}
if devMode {
caCert = build.DevCoordinatorCA
}

var caPool *x509.CertPool
if runtime.GOOS == "windows" {
// No SystemCertPool on Windows. But we don't run
// Windows in reverse mode anyway. So just don't set
// caPool, which will cause tls.Config to use the
// system verification.
} else {
var err error
caPool, err = x509.SystemCertPool()
dial := func(ctx context.Context) (net.Conn, error) {
log.Printf("Dialing coordinator %s ...", addr)
t0 := time.Now()
tcpConn, err := dialCoordinatorTCP(ctx, addr)
if err != nil {
return fmt.Errorf("SystemCertPool: %v", err)
log.Printf("buildlet: reverse dial coordinator (%q) error after %v: %v", addr, time.Since(t0).Round(time.Second/100), err)
return nil, err
}
log.Printf("Dialed coordinator %s.", addr)
serverName := strings.TrimSuffix(addr, ":443")
log.Printf("Doing TLS handshake with coordinator (verifying hostname %q)...", serverName)
tcpConn.SetDeadline(time.Now().Add(30 * time.Second))
config := &tls.Config{
ServerName: serverName,
InsecureSkipVerify: devMode,
}
// Temporarily accept our own CA. This predates LetsEncrypt.
// Our old self-signed cert expires April 4th, 2017.
// We can remove this after golang.org/issue/16442 is fixed.
if !caPool.AppendCertsFromPEM([]byte(caCert)) {
return errors.New("failed to append coordinator CA certificate")
conn := tls.Client(tcpConn, config)
if err := conn.Handshake(); err != nil {
return nil, fmt.Errorf("failed to handshake with coordinator: %v", err)
}
tcpConn.SetDeadline(time.Time{})
return conn, nil
}

log.Printf("Dialing coordinator %s ...", addr)
tcpConn, err := dialCoordinatorTCP(addr)
conn, err := dial(context.Background())
if err != nil {
return err
}

serverName := strings.TrimSuffix(addr, ":443")
log.Printf("Doing TLS handshake with coordinator (verifying hostname %q)...", serverName)
tcpConn.SetDeadline(time.Now().Add(30 * time.Second))
config := &tls.Config{
ServerName: serverName,
RootCAs: caPool,
InsecureSkipVerify: devMode,
}
conn := tls.Client(tcpConn, config)
if err := conn.Handshake(); err != nil {
return fmt.Errorf("failed to handshake with coordinator: %v", err)
}
tcpConn.SetDeadline(time.Time{})

bufr := bufio.NewReader(conn)
bufw := bufio.NewWriter(conn)

log.Printf("Registering reverse mode with coordinator...")
req, err := http.NewRequest("GET", "/reverse", nil)
Expand All @@ -192,9 +175,13 @@ func dialCoordinator() error {
req.Header["X-Go-Builder-Key"] = keys
req.Header.Set("X-Go-Builder-Hostname", *hostname)
req.Header.Set("X-Go-Builder-Version", strconv.Itoa(buildletVersion))
if err := req.Write(conn); err != nil {
req.Header.Set("X-Revdial-Version", "2")
if err := req.Write(bufw); err != nil {
return fmt.Errorf("coordinator /reverse request failed: %v", err)
}
if err := bufw.Flush(); err != nil {
return fmt.Errorf("coordinator /reverse request flush failed: %v", err)
}
resp, err := http.ReadResponse(bufr, req)
if err != nil {
return fmt.Errorf("coordinator /reverse response failed: %v", err)
Expand All @@ -206,12 +193,9 @@ func dialCoordinator() error {

log.Printf("Connected to coordinator; reverse dialing active")
srv := &http.Server{}
ln := revdial.NewListener(bufio.NewReadWriter(
bufio.NewReader(conn),
bufio.NewWriter(deadlinePerWriteConn{conn, 60 * time.Second}),
))
ln := revdial.NewListener(conn, dial)
err = srv.Serve(ln)
if ln.Closed() {
if ln.Closed() { // TODO: this actually wants to know whether an error-free Close was called
return nil
}
return fmt.Errorf("http.Serve on reverse connection complete: %v", err)
Expand All @@ -224,8 +208,8 @@ var coordDialer = &net.Dialer{

// dialCoordinatorTCP returns a TCP connection to the coordinator, making
// a CONNECT request to a proxy as a fallback.
func dialCoordinatorTCP(addr string) (net.Conn, error) {
tcpConn, err := coordDialer.Dial("tcp", addr)
func dialCoordinatorTCP(ctx context.Context, addr string) (net.Conn, error) {
tcpConn, err := coordDialer.DialContext(ctx, "tcp", addr)
if err != nil {
// If we had problems connecting to the TCP addr
// directly, perhaps there's a proxy in the way. See
Expand All @@ -234,20 +218,21 @@ func dialCoordinatorTCP(addr string) (net.Conn, error) {
req, _ := http.NewRequest("GET", "https://"+addr, nil)
proxyURL, _ := http.ProxyFromEnvironment(req)
if proxyURL != nil {
return dialCoordinatorViaCONNECT(addr, proxyURL)
return dialCoordinatorViaCONNECT(ctx, addr, proxyURL)
}
return nil, err
}
return tcpConn, nil
}

func dialCoordinatorViaCONNECT(addr string, proxy *url.URL) (net.Conn, error) {
func dialCoordinatorViaCONNECT(ctx context.Context, addr string, proxy *url.URL) (net.Conn, error) {
proxyAddr := proxy.Host
if proxy.Port() == "" {
proxyAddr = net.JoinHostPort(proxyAddr, "80")
}
log.Printf("dialing proxy %q ...", proxyAddr)
c, err := net.Dial("tcp", proxyAddr)
var d net.Dialer
c, err := d.DialContext(ctx, "tcp", proxyAddr)
if err != nil {
return nil, fmt.Errorf("dialing proxy %q failed: %v", proxyAddr, err)
}
Expand All @@ -273,17 +258,6 @@ func dialCoordinatorViaCONNECT(addr string, proxy *url.URL) (net.Conn, error) {
return c, nil
}

type deadlinePerWriteConn struct {
net.Conn
writeTimeout time.Duration
}

func (c deadlinePerWriteConn) Write(p []byte) (n int, err error) {
c.Conn.SetWriteDeadline(time.Now().Add(c.writeTimeout))
defer c.Conn.SetWriteDeadline(time.Time{})
return c.Conn.Write(p)
}

func devBuilderKey(builder string) string {
h := hmac.New(md5.New, []byte("gophers rule"))
io.WriteString(h, builder)
Expand Down
2 changes: 2 additions & 0 deletions cmd/coordinator/coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ import (
"golang.org/x/build/internal/sourcecache"
"golang.org/x/build/livelog"
"golang.org/x/build/maintner/maintnerd/apipb"
revdialv2 "golang.org/x/build/revdial/v2"
"golang.org/x/build/types"
"golang.org/x/crypto/acme/autocert"
perfstorage "golang.org/x/perf/storage"
Expand Down Expand Up @@ -303,6 +304,7 @@ func main() {
http.HandleFunc("/builders", handleBuilders)
http.HandleFunc("/temporarylogs", handleLogs)
http.HandleFunc("/reverse", handleReverse)
http.Handle("/revdial", revdialv2.ConnHandler())
http.HandleFunc("/style.css", handleStyleCSS)
http.HandleFunc("/try", serveTryStatus(false))
http.HandleFunc("/try.json", serveTryStatus(true))
Expand Down
Loading

0 comments on commit 4f0f4bb

Please sign in to comment.