Skip to content

Commit

Permalink
added support for proxying to multiple destinations within a single t…
Browse files Browse the repository at this point in the history
…unnel: Prometheus, ClickHouse, and Pyroscope
  • Loading branch information
def committed May 24, 2023
1 parent b0a1539 commit 677ba26
Show file tree
Hide file tree
Showing 6 changed files with 175 additions and 136 deletions.
8 changes: 4 additions & 4 deletions Dockerfile
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
FROM golang:1.17-stretch AS builder
FROM golang:1.19-bullseye AS builder

WORKDIR /go/src/promtun
WORKDIR /go/src/coroot-connect

COPY go.mod .
COPY go.sum .
Expand All @@ -13,5 +13,5 @@ RUN CGO_ENABLED=0 go install -mod=readonly -ldflags "-X main.version=$VERSION" .

FROM scratch
COPY --from=builder /etc/ssl/certs/ca-certificates.crt /etc/ssl/certs/
COPY --from=builder /go/bin/promtun /promtun
CMD ["/promtun"]
COPY --from=builder /go/bin/coroot-connect /coroot-connect
CMD ["/coroot-connect"]
37 changes: 4 additions & 33 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,42 +1,13 @@
# Promtum
# Coroot-connect

Promtun is a tiny tool to establish a tunnel between a Prometheus server and Coroot cloud.

![](./schema.svg)
Coroot-connect is a tool that establishes secure tunnels between the Coroot cloud and customers' clusters.

## Run

### Docker

```bash
docker run --detach --name coroot-promtun \
-e PROMETHEUS_ADDRESS=<INTERNAL_PROMETHEUS_HOST_AND_PORT> \
-e PROJECT_TOKEN=<COROOT_PROJECT_TOKEN> \
ghcr.io/coroot/promtun
```
TDB

### Kubernetes

```yaml
apiVersion: apps/v1
kind: Deployment
metadata:
name: promtun
namespace: coroot
spec:
selector:
matchLabels: {app: promtun}
replicas: 1
template:
metadata:
labels: {app: promtun}
spec:
containers:
- name: promtun
image: ghcr.io/coroot/promtun
env:
- name: PROMETHEUS_ADDRESS
value: <INTERNAL_PROMETHEUS_HOST_AND_PORT>
- name: PROJECT_TOKEN
value: <COROOT_PROJECT_TOKEN>
```
TBD
135 changes: 81 additions & 54 deletions promtun.go → connect.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,24 +26,24 @@ var (
backoffFactor = 2.
backoffMin = 5 * time.Second
backoffMax = time.Minute
streamTimeout = time.Minute
streamTimeout = 5 * time.Minute
)

type Tunnel struct {
promAddr string
address string
projectToken string
serverName string
cancelFn context.CancelFunc
gwConn net.Conn
address string
serverName string
token string
config []byte
cancelFn context.CancelFunc
gwConn net.Conn
}

func NewTunnel(address, serverName, promAddr, projectToken string) *Tunnel {
func NewTunnel(address, serverName string, token string, config []byte) *Tunnel {
t := &Tunnel{
address: address,
promAddr: promAddr,
projectToken: projectToken,
serverName: serverName,
address: address,
serverName: serverName,
token: token,
config: config,
}
var ctx context.Context
ctx, t.cancelFn = context.WithCancel(context.Background())
Expand All @@ -59,15 +59,15 @@ func (t *Tunnel) keepConnected(ctx context.Context) {
case <-ctx.Done():
return
default:
t.gwConn, err = connect(t.address, t.serverName, t.projectToken)
t.gwConn, err = connect(t.address, t.serverName, t.token, t.config)
if err != nil {
d := b.Duration()
klog.Errorf("%s, reconnecting to %s in %.0fs", err, t.address, d.Seconds())
time.Sleep(d)
continue
}
b.Reset()
proxy(ctx, t.gwConn, t.promAddr)
proxy(ctx, t.gwConn)
_ = t.gwConn.Close()
}
}
Expand All @@ -81,30 +81,40 @@ func (t *Tunnel) Close() {
}

func main() {
klog.Infof("version: %s", version)
resolverUrl := os.Getenv("RESOLVER_URL")
if resolverUrl == "" {
resolverUrl = "https://gw.coroot.com/promtun/resolve"
}
promAddr := mustEnv("PROMETHEUS_ADDRESS")
projectToken := mustEnv("PROJECT_TOKEN")
token := mustEnv("PROJECT_TOKEN")
if len(token) != 36 {
klog.Exitln("invalid project token")
}
configPath := mustEnv("CONFIG_PATH")

u, err := url.Parse(resolverUrl)
data, err := os.ReadFile(configPath)
if err != nil {
klog.Exitf("invalid RESOLVER_URL %s: %s", resolverUrl, err)
klog.Exitln("failed to read config:", err)
}
serverName := u.Hostname()
config := []byte(os.ExpandEnv(string(data)))

if err := pingProm(promAddr); err != nil {
klog.Exitf("failed to ping prometheus: %s", err)
klog.Infof("version: %s", version)

loop(token, resolverUrl, config)
}

func loop(token, resolverUrl string, config []byte) {
u, err := url.Parse(resolverUrl)
if err != nil {
klog.Exitf("invalid resolver URL %s: %s", resolverUrl, err)
}
tlsServerName := u.Hostname()

tunnels := map[string]*Tunnel{}

b := backoff.Backoff{Factor: backoffFactor, Min: backoffMin, Max: backoffMax}
for {
klog.Infof("updating gateways endpoints from %s", resolverUrl)
endpoints, err := getEndpoints(resolverUrl, projectToken)
endpoints, err := getEndpoints(resolverUrl, token)
if err != nil {
d := b.Duration()
klog.Errorf("failed to get gateway endpoints: %s, retry in %.0fs", err, d.Seconds())
Expand All @@ -117,13 +127,13 @@ func main() {
for _, e := range endpoints {
fresh[e] = true
if _, ok := tunnels[e]; !ok {
klog.Infof("starting tunnel to %s", e)
tunnels[e] = NewTunnel(e, serverName, promAddr, projectToken)
klog.Infof("starting a tunnel to %s", e)
tunnels[e] = NewTunnel(e, tlsServerName, token, config)
}
}
for e, t := range tunnels {
if !fresh[e] {
klog.Infof("closing tunnel to %s", e)
klog.Infof("closing tunnel with %s", e)
t.Close()
delete(tunnels, e)
}
Expand All @@ -132,9 +142,9 @@ func main() {
}
}

func getEndpoints(resolverUrl, projectToken string) ([]string, error) {
func getEndpoints(resolverUrl, token string) ([]string, error) {
req, _ := http.NewRequest("GET", resolverUrl, nil)
req.Header.Set("X-Token", projectToken)
req.Header.Set("X-Token", token)
resp, err := http.DefaultClient.Do(req)
if err != nil {
return nil, err
Expand All @@ -150,29 +160,44 @@ func getEndpoints(resolverUrl, projectToken string) ([]string, error) {
return strings.Split(strings.TrimSpace(string(payload)), ";"), nil
}

func connect(gwAddr, serverName, projectToken string) (net.Conn, error) {
type Header struct {
Token [36]byte
Version [16]byte
ConfigSize uint32
}

func connect(gwAddr, serverName, token string, config []byte) (net.Conn, error) {
h := Header{}
copy(h.Token[:], token)
copy(h.Version[:], version)
h.ConfigSize = uint32(len(config))

klog.Infof("connecting to %s (%s)", gwAddr, serverName)
deadline := time.Now().Add(timeout)
dialer := &net.Dialer{Deadline: deadline}
tlsCfg := &tls.Config{InsecureSkipVerify: tlsSkipVerify, ServerName: serverName}
tlsCfg := &tls.Config{ServerName: serverName, InsecureSkipVerify: tlsSkipVerify}
gwConn, err := tls.DialWithDialer(dialer, "tcp", gwAddr, tlsCfg)
if err != nil {
return nil, fmt.Errorf("failed to establish connection to %s: %s", gwAddr, err)
return nil, fmt.Errorf("failed to establish a connection to %s: %s", gwAddr, err)
}
klog.Infof("connected to gateway %s", gwAddr)

_ = gwConn.SetDeadline(deadline)
if _, err := gwConn.Write([]byte(projectToken)); err != nil {
if err = binary.Write(gwConn, binary.LittleEndian, h); err != nil {
_ = gwConn.Close()
return nil, fmt.Errorf("failed to send config to %s: %s", gwAddr, err)
}
if _, err = gwConn.Write(config); err != nil {
_ = gwConn.Close()
return nil, fmt.Errorf("failed to send project token to %s: %s", gwAddr, err)
return nil, fmt.Errorf("failed to send config to %s: %s", gwAddr, err)
}
var resp uint16
if err := binary.Read(gwConn, binary.LittleEndian, &resp); err != nil {
_ = gwConn.Close()
return nil, fmt.Errorf("failed to read gateway response from %s: %s", gwAddr, err)
return nil, fmt.Errorf("failed to read the response from %s: %s", gwAddr, err)
}
_ = gwConn.SetDeadline(time.Time{})
klog.Infof("got from gateway %s: %d", gwAddr, resp)
klog.Infof(`got "%d" from the gateway %s`, resp, gwAddr)

if resp != 200 {
_ = gwConn.Close()
Expand All @@ -182,7 +207,7 @@ func connect(gwAddr, serverName, projectToken string) (net.Conn, error) {
return gwConn, nil
}

func proxy(ctx context.Context, gwConn net.Conn, promAddr string) {
func proxy(ctx context.Context, gwConn net.Conn) {
cfg := yamux.DefaultConfig()
cfg.KeepAliveInterval = time.Second
cfg.LogOutput = ioutil.Discard
Expand All @@ -199,30 +224,41 @@ func proxy(ctx context.Context, gwConn net.Conn, promAddr string) {
default:
gwStream, err := session.Accept()
if err != nil {
klog.Errorf("failed to accept stream: %s", err)
klog.Errorf("failed to accept a stream: %s", err)
return
}
go func(c net.Conn) {
defer c.Close()
deadline := time.Now().Add(streamTimeout)
if err := c.SetDeadline(deadline); err != nil {
klog.Errorf("failed to set deadline to stream: %s", err)
klog.Errorf("failed to set a deadline for the stream: %s", err)
return
}
promConn, err := net.DialTimeout("tcp", promAddr, timeout)
var dstLen uint16
if err := binary.Read(c, binary.LittleEndian, &dstLen); err != nil {
klog.Errorf("failed to read the destination size: %s", err)
return
}
dest := make([]byte, int(dstLen))
if _, err := io.ReadFull(c, dest); err != nil {
klog.Errorf("failed to read the destination address: %s", err)
return
}
destAddress := string(dest)
destConn, err := net.DialTimeout("tcp", destAddress, timeout)
if err != nil {
klog.Errorf("failed to establish prometheus connection: %s", err)
klog.Errorf("failed to establish a connection to %s: %s", destAddress, err)
return
}
defer promConn.Close()
if err = promConn.SetDeadline(deadline); err != nil {
klog.Errorf("failed to set deadline to prometheus connection: %s", err)
defer destConn.Close()
if err = destConn.SetDeadline(deadline); err != nil {
klog.Errorf("failed to set a deadline for the dest connection: %s", err)
return
}
go func() {
io.Copy(c, promConn)
io.Copy(c, destConn)
}()
io.Copy(promConn, c)
io.Copy(destConn, c)
}(gwStream)
}
}
Expand All @@ -235,12 +271,3 @@ func mustEnv(key string) string {
}
return value
}

func pingProm(addr string) error {
c, err := net.DialTimeout("tcp", addr, timeout)
if err != nil {
return err
}
_ = c.Close()
return nil
}
Loading

0 comments on commit 677ba26

Please sign in to comment.