diff --git a/Dockerfile b/Dockerfile index 860a21a..cb8c945 100644 --- a/Dockerfile +++ b/Dockerfile @@ -1,6 +1,6 @@ -FROM golang:1.17-stretch AS builder +FROM golang:1.19-bullseye AS builder -WORKDIR /go/src/promtun +WORKDIR /go/src/coroot-connect COPY go.mod . COPY go.sum . @@ -13,5 +13,5 @@ RUN CGO_ENABLED=0 go install -mod=readonly -ldflags "-X main.version=$VERSION" . FROM scratch COPY --from=builder /etc/ssl/certs/ca-certificates.crt /etc/ssl/certs/ -COPY --from=builder /go/bin/promtun /promtun -CMD ["/promtun"] +COPY --from=builder /go/bin/coroot-connect /coroot-connect +CMD ["/coroot-connect"] diff --git a/README.md b/README.md index 206c39f..a071e14 100644 --- a/README.md +++ b/README.md @@ -1,42 +1,13 @@ -# Promtum +# Coroot-connect -Promtun is a tiny tool to establish a tunnel between a Prometheus server and Coroot cloud. - -![](./schema.svg) +Coroot-connect is a tool that establishes secure tunnels between the Coroot cloud and customers' clusters. ## Run ### Docker -```bash -docker run --detach --name coroot-promtun \ - -e PROMETHEUS_ADDRESS= \ - -e PROJECT_TOKEN= \ - ghcr.io/coroot/promtun -``` +TDB ### Kubernetes -```yaml -apiVersion: apps/v1 -kind: Deployment -metadata: - name: promtun - namespace: coroot -spec: - selector: - matchLabels: {app: promtun} - replicas: 1 - template: - metadata: - labels: {app: promtun} - spec: - containers: - - name: promtun - image: ghcr.io/coroot/promtun - env: - - name: PROMETHEUS_ADDRESS - value: - - name: PROJECT_TOKEN - value: -``` +TBD diff --git a/promtun.go b/connect.go similarity index 57% rename from promtun.go rename to connect.go index 26a13c7..847c687 100644 --- a/promtun.go +++ b/connect.go @@ -26,24 +26,24 @@ var ( backoffFactor = 2. backoffMin = 5 * time.Second backoffMax = time.Minute - streamTimeout = time.Minute + streamTimeout = 5 * time.Minute ) type Tunnel struct { - promAddr string - address string - projectToken string - serverName string - cancelFn context.CancelFunc - gwConn net.Conn + address string + serverName string + token string + config []byte + cancelFn context.CancelFunc + gwConn net.Conn } -func NewTunnel(address, serverName, promAddr, projectToken string) *Tunnel { +func NewTunnel(address, serverName string, token string, config []byte) *Tunnel { t := &Tunnel{ - address: address, - promAddr: promAddr, - projectToken: projectToken, - serverName: serverName, + address: address, + serverName: serverName, + token: token, + config: config, } var ctx context.Context ctx, t.cancelFn = context.WithCancel(context.Background()) @@ -59,7 +59,7 @@ func (t *Tunnel) keepConnected(ctx context.Context) { case <-ctx.Done(): return default: - t.gwConn, err = connect(t.address, t.serverName, t.projectToken) + t.gwConn, err = connect(t.address, t.serverName, t.token, t.config) if err != nil { d := b.Duration() klog.Errorf("%s, reconnecting to %s in %.0fs", err, t.address, d.Seconds()) @@ -67,7 +67,7 @@ func (t *Tunnel) keepConnected(ctx context.Context) { continue } b.Reset() - proxy(ctx, t.gwConn, t.promAddr) + proxy(ctx, t.gwConn) _ = t.gwConn.Close() } } @@ -81,30 +81,40 @@ func (t *Tunnel) Close() { } func main() { - klog.Infof("version: %s", version) resolverUrl := os.Getenv("RESOLVER_URL") if resolverUrl == "" { resolverUrl = "https://gw.coroot.com/promtun/resolve" } - promAddr := mustEnv("PROMETHEUS_ADDRESS") - projectToken := mustEnv("PROJECT_TOKEN") + token := mustEnv("PROJECT_TOKEN") + if len(token) != 36 { + klog.Exitln("invalid project token") + } + configPath := mustEnv("CONFIG_PATH") - u, err := url.Parse(resolverUrl) + data, err := os.ReadFile(configPath) if err != nil { - klog.Exitf("invalid RESOLVER_URL %s: %s", resolverUrl, err) + klog.Exitln("failed to read config:", err) } - serverName := u.Hostname() + config := []byte(os.ExpandEnv(string(data))) - if err := pingProm(promAddr); err != nil { - klog.Exitf("failed to ping prometheus: %s", err) + klog.Infof("version: %s", version) + + loop(token, resolverUrl, config) +} + +func loop(token, resolverUrl string, config []byte) { + u, err := url.Parse(resolverUrl) + if err != nil { + klog.Exitf("invalid resolver URL %s: %s", resolverUrl, err) } + tlsServerName := u.Hostname() tunnels := map[string]*Tunnel{} b := backoff.Backoff{Factor: backoffFactor, Min: backoffMin, Max: backoffMax} for { klog.Infof("updating gateways endpoints from %s", resolverUrl) - endpoints, err := getEndpoints(resolverUrl, projectToken) + endpoints, err := getEndpoints(resolverUrl, token) if err != nil { d := b.Duration() klog.Errorf("failed to get gateway endpoints: %s, retry in %.0fs", err, d.Seconds()) @@ -117,13 +127,13 @@ func main() { for _, e := range endpoints { fresh[e] = true if _, ok := tunnels[e]; !ok { - klog.Infof("starting tunnel to %s", e) - tunnels[e] = NewTunnel(e, serverName, promAddr, projectToken) + klog.Infof("starting a tunnel to %s", e) + tunnels[e] = NewTunnel(e, tlsServerName, token, config) } } for e, t := range tunnels { if !fresh[e] { - klog.Infof("closing tunnel to %s", e) + klog.Infof("closing tunnel with %s", e) t.Close() delete(tunnels, e) } @@ -132,9 +142,9 @@ func main() { } } -func getEndpoints(resolverUrl, projectToken string) ([]string, error) { +func getEndpoints(resolverUrl, token string) ([]string, error) { req, _ := http.NewRequest("GET", resolverUrl, nil) - req.Header.Set("X-Token", projectToken) + req.Header.Set("X-Token", token) resp, err := http.DefaultClient.Do(req) if err != nil { return nil, err @@ -150,29 +160,44 @@ func getEndpoints(resolverUrl, projectToken string) ([]string, error) { return strings.Split(strings.TrimSpace(string(payload)), ";"), nil } -func connect(gwAddr, serverName, projectToken string) (net.Conn, error) { +type Header struct { + Token [36]byte + Version [16]byte + ConfigSize uint32 +} + +func connect(gwAddr, serverName, token string, config []byte) (net.Conn, error) { + h := Header{} + copy(h.Token[:], token) + copy(h.Version[:], version) + h.ConfigSize = uint32(len(config)) + klog.Infof("connecting to %s (%s)", gwAddr, serverName) deadline := time.Now().Add(timeout) dialer := &net.Dialer{Deadline: deadline} - tlsCfg := &tls.Config{InsecureSkipVerify: tlsSkipVerify, ServerName: serverName} + tlsCfg := &tls.Config{ServerName: serverName, InsecureSkipVerify: tlsSkipVerify} gwConn, err := tls.DialWithDialer(dialer, "tcp", gwAddr, tlsCfg) if err != nil { - return nil, fmt.Errorf("failed to establish connection to %s: %s", gwAddr, err) + return nil, fmt.Errorf("failed to establish a connection to %s: %s", gwAddr, err) } klog.Infof("connected to gateway %s", gwAddr) _ = gwConn.SetDeadline(deadline) - if _, err := gwConn.Write([]byte(projectToken)); err != nil { + if err = binary.Write(gwConn, binary.LittleEndian, h); err != nil { + _ = gwConn.Close() + return nil, fmt.Errorf("failed to send config to %s: %s", gwAddr, err) + } + if _, err = gwConn.Write(config); err != nil { _ = gwConn.Close() - return nil, fmt.Errorf("failed to send project token to %s: %s", gwAddr, err) + return nil, fmt.Errorf("failed to send config to %s: %s", gwAddr, err) } var resp uint16 if err := binary.Read(gwConn, binary.LittleEndian, &resp); err != nil { _ = gwConn.Close() - return nil, fmt.Errorf("failed to read gateway response from %s: %s", gwAddr, err) + return nil, fmt.Errorf("failed to read the response from %s: %s", gwAddr, err) } _ = gwConn.SetDeadline(time.Time{}) - klog.Infof("got from gateway %s: %d", gwAddr, resp) + klog.Infof(`got "%d" from the gateway %s`, resp, gwAddr) if resp != 200 { _ = gwConn.Close() @@ -182,7 +207,7 @@ func connect(gwAddr, serverName, projectToken string) (net.Conn, error) { return gwConn, nil } -func proxy(ctx context.Context, gwConn net.Conn, promAddr string) { +func proxy(ctx context.Context, gwConn net.Conn) { cfg := yamux.DefaultConfig() cfg.KeepAliveInterval = time.Second cfg.LogOutput = ioutil.Discard @@ -199,30 +224,41 @@ func proxy(ctx context.Context, gwConn net.Conn, promAddr string) { default: gwStream, err := session.Accept() if err != nil { - klog.Errorf("failed to accept stream: %s", err) + klog.Errorf("failed to accept a stream: %s", err) return } go func(c net.Conn) { defer c.Close() deadline := time.Now().Add(streamTimeout) if err := c.SetDeadline(deadline); err != nil { - klog.Errorf("failed to set deadline to stream: %s", err) + klog.Errorf("failed to set a deadline for the stream: %s", err) return } - promConn, err := net.DialTimeout("tcp", promAddr, timeout) + var dstLen uint16 + if err := binary.Read(c, binary.LittleEndian, &dstLen); err != nil { + klog.Errorf("failed to read the destination size: %s", err) + return + } + dest := make([]byte, int(dstLen)) + if _, err := io.ReadFull(c, dest); err != nil { + klog.Errorf("failed to read the destination address: %s", err) + return + } + destAddress := string(dest) + destConn, err := net.DialTimeout("tcp", destAddress, timeout) if err != nil { - klog.Errorf("failed to establish prometheus connection: %s", err) + klog.Errorf("failed to establish a connection to %s: %s", destAddress, err) return } - defer promConn.Close() - if err = promConn.SetDeadline(deadline); err != nil { - klog.Errorf("failed to set deadline to prometheus connection: %s", err) + defer destConn.Close() + if err = destConn.SetDeadline(deadline); err != nil { + klog.Errorf("failed to set a deadline for the dest connection: %s", err) return } go func() { - io.Copy(c, promConn) + io.Copy(c, destConn) }() - io.Copy(promConn, c) + io.Copy(destConn, c) }(gwStream) } } @@ -235,12 +271,3 @@ func mustEnv(key string) string { } return value } - -func pingProm(addr string) error { - c, err := net.DialTimeout("tcp", addr, timeout) - if err != nil { - return err - } - _ = c.Close() - return nil -} diff --git a/promtun_test.go b/connect_test.go similarity index 59% rename from promtun_test.go rename to connect_test.go index ad24a1d..67941ec 100644 --- a/promtun_test.go +++ b/connect_test.go @@ -1,6 +1,7 @@ package main import ( + "bytes" "context" "crypto/tls" "encoding/binary" @@ -17,60 +18,56 @@ import ( "time" ) -var projectToken = "b8ea8af6-ffee-44b3-aa9a-1fc02233cfb7" - func init() { timeout = time.Second tlsSkipVerify = true + version = "1.2.3" } func TestHandshakeTimeout(t *testing.T) { + token := "b8ea8af6-ffee-44b3-aa9a-1fc02233cfb7" addr, stop := gateway(t, func(listener net.Listener) { // doesn't accept connections }) defer stop() - _, err := connect(addr, "", projectToken) + var err error + _, err = connect(addr, "", token, []byte("config_data")) require.Error(t, err) - assert.Contains(t, err.Error(), "context deadline exceeded") + assert.Contains(t, err.Error(), "deadline exceeded") } func TestHandshakeError(t *testing.T) { + token := "b8ea8af6-ffee-44b3-aa9a-1fc02233cfb7" addr, stop := gateway(t, func(listener net.Listener) { conn, err := listener.Accept() require.NoError(t, err) - readToken(t, conn) + readHeaderAndConfig(t, conn, token, []byte("config_data")) writeStatus(t, conn, 500) }) defer stop() - _, err := connect(addr, "", projectToken) + _, err := connect(addr, "", token, []byte("config_data")) require.Error(t, err) assert.Contains(t, err.Error(), "failed to authenticate project") } func TestProxy(t *testing.T) { - endpointCh := make(chan string) + sessionChan := make(chan *yamux.Session) + token := "b8ea8af6-ffee-44b3-aa9a-1fc02233cfb7" + version = "1.2.3" + addr, stop := gateway(t, func(listener net.Listener) { conn, err := listener.Accept() require.NoError(t, err) - readToken(t, conn) + readHeaderAndConfig(t, conn, token, []byte("config_data")) writeStatus(t, conn, 200) - endpoint, err := net.Listen("tcp", "127.0.0.1:0") - require.NoError(t, err) - endpointCh <- endpoint.Addr().String() - cfg := yamux.DefaultConfig() cfg.KeepAliveInterval = time.Second + session, err := yamux.Client(conn, cfg) require.NoError(t, err) - clientConn, err := endpoint.Accept() - require.NoError(t, err) - tunConn, err := session.Open() - require.NoError(t, err) - go func() { - io.Copy(clientConn, tunConn) - }() - io.Copy(tunConn, clientConn) + + sessionChan <- session }) defer stop() @@ -79,25 +76,78 @@ func TestProxy(t *testing.T) { })) defer prometheus.Close() - gwConn, err := connect(addr, "", projectToken) + pyroscope := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + fmt.Fprint(w, "Pyroscope is Healthy.") + })) + defer pyroscope.Close() + + gwConn, err := connect(addr, "", token, []byte("config_data")) go func() { - proxy(context.Background(), gwConn, prometheus.Listener.Addr().String()) + proxy(context.Background(), gwConn) }() - endpointAddr := <-endpointCh - res, err := http.Get("http://" + endpointAddr + "/-/healthy") + session := <-sessionChan + + transport := &http.Transport{ + DialContext: func(ctx context.Context, network, addr string) (net.Conn, error) { + stream, err := session.Open() + if err != nil { + return nil, err + } + if err := binary.Write(stream, binary.LittleEndian, uint16(len(prometheus.Listener.Addr().String()))); err != nil { + return nil, err + } + if _, err = stream.Write([]byte(prometheus.Listener.Addr().String())); err != nil { + return nil, err + } + return stream, nil + }, + } + client := &http.Client{Transport: transport} + + res, err := client.Get("http://any/-/healthy") require.NoError(t, err) data, err := ioutil.ReadAll(res.Body) require.NoError(t, err) res.Body.Close() assert.Equal(t, "Prometheus is Healthy.", string(data)) + + transport = &http.Transport{ + DialContext: func(ctx context.Context, network, addr string) (net.Conn, error) { + stream, err := session.Open() + if err != nil { + return nil, err + } + if err := binary.Write(stream, binary.LittleEndian, uint16(len(pyroscope.Listener.Addr().String()))); err != nil { + return nil, err + } + if _, err = stream.Write([]byte(pyroscope.Listener.Addr().String())); err != nil { + return nil, err + } + return stream, nil + }, + } + client = &http.Client{Transport: transport} + + res, err = client.Get("http://any/-/healthy") + require.NoError(t, err) + data, err = ioutil.ReadAll(res.Body) + require.NoError(t, err) + res.Body.Close() + assert.Equal(t, "Pyroscope is Healthy.", string(data)) + } -func readToken(t *testing.T, conn net.Conn) { - buf := make([]byte, 36) - _, err := conn.Read(buf) +func readHeaderAndConfig(t *testing.T, conn net.Conn, token string, config []byte) { + h := Header{} + require.NoError(t, binary.Read(conn, binary.LittleEndian, &h)) + require.Equal(t, token, string(h.Token[:])) + require.Equal(t, version, string(bytes.Trim(h.Version[:], "\x00"))) + + buf := make([]byte, int(h.ConfigSize)) + _, err := io.ReadFull(conn, buf) require.NoError(t, err) - require.Equal(t, projectToken, string(buf)) + require.Equal(t, config, buf) } func writeStatus(t *testing.T, conn net.Conn, status uint16) { diff --git a/go.mod b/go.mod index be46e7e..0214b89 100644 --- a/go.mod +++ b/go.mod @@ -1,6 +1,6 @@ -module github.com/coroot/promtun +module github.com/coroot/coroot-connect -go 1.15 +go 1.19 require ( github.com/hashicorp/yamux v0.0.0-20200609203250-aecfd211c9ce @@ -8,3 +8,9 @@ require ( github.com/stretchr/testify v1.7.0 k8s.io/klog v1.0.0 ) + +require ( + github.com/davecgh/go-spew v1.1.0 // indirect + github.com/pmezard/go-difflib v1.0.0 // indirect + gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c // indirect +) diff --git a/schema.svg b/schema.svg deleted file mode 100644 index 1e1c2ce..0000000 --- a/schema.svg +++ /dev/null @@ -1,15 +0,0 @@ - - - - - - - - - - - - - - -