Skip to content

Commit

Permalink
k8s: add method to proxy connection to container's TCP port
Browse files Browse the repository at this point in the history
Extent the Client implementation introducing a new method which allows
to proxy a connection to a TCP port inside a container. This mimics the
behavior of the port-forward implementation, but it directly provides
access to the forwarded stream (through a ReadWriteCloser interface),
rather than exposing it through a local port.

Signed-off-by: Marco Iorio <[email protected]>
  • Loading branch information
giorio94 authored and michi-covalent committed May 15, 2024
1 parent 9f8496f commit ca3037f
Showing 1 changed file with 93 additions and 0 deletions.
93 changes: 93 additions & 0 deletions k8s/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,13 @@ import (
"bytes"
"context"
"encoding/json"
"errors"
"fmt"
"io"
"net"
"net/http"
"net/url"
"strconv"
"strings"
"time"

Expand All @@ -34,6 +37,7 @@ import (
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/httpstream"
"k8s.io/apimachinery/pkg/util/strategicpatch"
"k8s.io/cli-runtime/pkg/genericclioptions"
"k8s.io/cli-runtime/pkg/resource"
Expand All @@ -43,6 +47,7 @@ import (
_ "k8s.io/client-go/plugin/pkg/client/auth" // Register all auth providers (azure, gcp, oidc, openstack, ..).
"k8s.io/client-go/rest"
clientcmdapi "k8s.io/client-go/tools/clientcmd/api"
"k8s.io/client-go/transport/spdy"

"github.com/cilium/cilium-cli/defaults"
)
Expand Down Expand Up @@ -734,6 +739,94 @@ func (c *Client) ProxyGet(ctx context.Context, namespace, name, url string) (str
return string(rawbody), nil
}

func (c *Client) ProxyTCP(ctx context.Context, namespace, name string, port uint16, handler func(io.ReadWriteCloser) error) error {
request := c.Clientset.CoreV1().RESTClient().Post().
Resource(corev1.ResourcePods.String()).
Namespace(namespace).
Name(name).
SubResource("portforward")

transport, upgrader, err := spdy.RoundTripperFor(c.Config)
if err != nil {
return fmt.Errorf("creating round tripper: %w", err)
}

dialer := spdy.NewDialer(upgrader, &http.Client{Transport: transport}, http.MethodPost, request.URL())

const portForwardProtocolV1Name = "portforward.k8s.io"
conn, proto, err := dialer.Dial(portForwardProtocolV1Name)
if err != nil {
return fmt.Errorf("connecting: %w", err)
}

defer conn.Close()
if proto != portForwardProtocolV1Name {
return fmt.Errorf("unable to negotiate protocol: client supports %q, server returned %q", portForwardProtocolV1Name, proto)
}

go func() {
select {
case <-ctx.Done():
// Close aborts all remaining streams, and unblocks read operations.
conn.Close()
case <-conn.CloseChan():
}
}()

return stream(conn, port, handler)
}

// The following is an adapted version of part of the client-go's port-forward connection handling implementation:
// https://github.com/kubernetes/client-go/blob/4ebe42d8c9c18f464fcc7b4f15b3a632db4cbdb2/tools/portforward/portforward.go#L335-L416
func stream(conn httpstream.Connection, port uint16, handler func(io.ReadWriteCloser) error) error {
headers := http.Header{}
headers.Set(corev1.StreamType, corev1.StreamTypeError)
headers.Set(corev1.PortHeader, strconv.FormatUint(uint64(port), 10))

errorStream, err := conn.CreateStream(headers)
if err != nil {
return fmt.Errorf("creating error stream: %w", err)
}
// we're not writing to this stream
errorStream.Close()
defer conn.RemoveStreams(errorStream)

errorDone := make(chan error)
go func() {
defer close(errorDone)
message, err := io.ReadAll(errorStream)
switch {
case err != nil:
errorDone <- fmt.Errorf("reading from error stream: %w", err)
case len(message) > 0:
errorDone <- errors.New(string(message))
}
}()

headers.Set(corev1.StreamType, corev1.StreamTypeData)
dataStream, err := conn.CreateStream(headers)
if err != nil {
return fmt.Errorf("creating data stream: %w", err)
}
defer conn.RemoveStreams(dataStream)

dataDone := make(chan error)
go func() {
defer close(dataDone)
if err := handler(dataStream); err != nil {
dataDone <- err
}
}()

// Wait for both goroutines to terminate
err = <-dataDone
if err2 := <-errorDone; err2 != nil {
err = err2
}

return err
}

func (c *Client) ListUnstructured(ctx context.Context, gvr schema.GroupVersionResource, namespace *string, o metav1.ListOptions) (*unstructured.UnstructuredList, error) {
if namespace == nil {
return c.DynamicClientset.Resource(gvr).List(ctx, o)
Expand Down

0 comments on commit ca3037f

Please sign in to comment.