
How should we implement WebSocket connection? #229

Closed
alexjg opened this issue Apr 19, 2020 · 12 comments
Labels
api (Api abstraction related), client-gold (gold client requirements)

Comments

@alexjg

alexjg commented Apr 19, 2020

There are a bunch of API calls which require interactive streams to the API server. To my knowledge these are:

  • exec
  • port-forward
  • proxy
  • attach

There's not a lot of documentation on exactly how these are implemented. I've been able to find this document which - under the header "Specialized Requests" - describes a websocket based streaming protocol. Apparently the api server prefixes every frame in the websocket connection with a single byte (0-255) which labels the stream that frame is intended for, e.g. 0 is meant for stdin, 1 for stdout, and 2 for stderr.

With respect to port-forwarding, each frame is additionally labelled with a two-byte prefix (after the stream prefix) indicating which port the frame is intended for.
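To make the framing concrete, here is a minimal sketch (purely illustrative, not from any existing client) of routing a received exec/attach frame by that single-byte prefix:

// Illustrative only: route a received exec/attach frame by its one-byte
// stream prefix (0 = stdin, 1 = stdout, 2 = stderr).
fn route_frame(frame: &[u8]) {
    if let Some((stream_id, payload)) = frame.split_first() {
        match *stream_id {
            1 => { /* copy payload to local stdout */ }
            2 => { /* copy payload to local stderr */ }
            _ => { /* 0 (stdin) and any other channels: ignored in this sketch */ }
        }
        let _ = payload;
    }
}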

This indicates that the approach we might take would be to make a POST request to the port forwarding endpoint (documented here) with a websocket upgrade header. Something like this:

POST /api/v1/namespaces/{namespace}/pods/{name}/portforward?ports=9090
Connection: Upgrade
Upgrade: websocket

This all seemed straightforward enough so I started thinking about the API we could use, and I followed along from the logging implementation to produce something like this in subresource.rs:

pub trait PortForwardObject {}

impl<K> Api<K>
where
    K: Clone + DeserializeOwned + PortForwardObject,
{
    pub async fn port_forward(&self, pod_name: &str, port_bindings: Vec<(u16, u16)>) -> Result<()> {
        ...
    }
}

Where the returned future would be intended to run forever to drive the port forward loop. This is probably not a good API and needs work, but more importantly I haven't found a way to implement it. That's what I want to figure out here.

WebSockets, Reqwest, and SSL. A tale of woe.

If we're going to write some code which examines each frame of the websocket stream and then dispatches that frame to the relevant port on localhost, then we need an asynchronous stream of websocket messages.

My first thought was that maybe reqwest supports this use case; unfortunately it does not, though there is an issue.

There are a few websockets libraries out there. The two that people seem to use are tungstenite and its associated tokio bindings, or websocket-rs.

Of these two, Tungstenite seems more actively maintained (indeed, the first line of the websocket repo suggests that Tungstenite might be a better option as it's more actively maintained). Unfortunately neither library gives us an API which we can easily use.

The problem is that we have some custom SSL parameters to set up. Specifically we need to set the root certificate and client certificate. This causes us two problems:

  1. We don't actually have that config anywhere by the time we want to create a websocket stream. We give those parameters to Api::client and then forget about them.
  2. Tungstenite doesn't give us any way to pass that config on to its various functions for opening a connection, and websocket-rs only allows us to do that for native_tls.

The first problem is not so hard to surmount; we can just add some simple structs for tracking this information. The second problem is harder. In order to overcome it we would need to use methods - exposed in both libraries - which run websockets over an existing stream. E.g., in Tungstenite:

pub async fn client_async_tls<R, S>(
    request: R,
    stream: S,
) -> Result<(WebSocketStream<S>, Response), Error>
where
    R: IntoClientRequest + Unpin,
    S: 'static + AsyncRead + AsyncWrite + Send + Unpin,

In order to do that we need to do DNS resolution, then open a TLS stream to the resolved host. And we need to do this twice, once for native-tls and once for rustls. At this point it feels like we're implementing work that perhaps should be in another library.
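For what it's worth, a rough sketch of doing exactly that - DNS + TCP + TLS ourselves (certificate loading elided, crate choices assumed), then only the WebSocket handshake over the existing stream via tokio-tungstenite's client_async (async-tungstenite has an equivalent):

use tokio::net::TcpStream;
use tokio_native_tls::TlsConnector;
use tokio_tungstenite::client_async;

async fn connect_ws(host: &str, port: u16, path: &str) -> Result<(), Box<dyn std::error::Error>> {
    // DNS resolution + TCP connect.
    let tcp = TcpStream::connect((host, port)).await?;

    // TLS handshake with our own certificates. A real version would add the
    // cluster CA and client identity here, e.g. builder.add_root_certificate(...)
    // and builder.identity(...), loaded from the kubeconfig.
    let builder = native_tls::TlsConnector::builder();
    let connector = TlsConnector::from(builder.build()?);
    let tls = connector.connect(host, tcp).await?;

    // WebSocket handshake over the already-encrypted stream.
    let url = format!("wss://{}:{}{}", host, port, path);
    let (_ws_stream, _response) = client_async(url, tls).await?;
    Ok(())
}

The rustls variant would look the same apart from the connector setup, which is exactly the duplication mentioned above.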

What then?

This is mostly just a braindump of things I learned whilst trying to implement this today. I'm leaving it here in case anyone has any idea how to move it forward and as notes to myself when I come back to it later.

Maybe someone has some experience or ideas here that I haven't thought of?

@nightkr
Member

nightkr commented Apr 19, 2020

IMO we should just provide an AsyncRead + AsyncWrite pair for the connection; binding that to a local port isn't really specific to K8s.

@alexjg
Author

alexjg commented Apr 19, 2020

Yeah that makes sense. Although I guess you would actually want to provide something like HashMap<u16, S> where S: AsyncRead + AsyncWrite, right? Because you might bind multiple ports at once.

@nightkr
Member

nightkr commented Apr 19, 2020

That makes sense. It might be slightly more efficient to provide our own struct that builds the impl AsyncRead + AsyncWrite on demand and tie the lifetimes of the streams to that, but it would also complicate the API for the sake of skipping an Arc.

@nightkr
Member

nightkr commented Apr 19, 2020

Actually.. Each port in the original request represents a single connection, right? If so, we should probably ensure that we can have more than one open connection to a single port. Maybe return a Vec<S> instead, with the same order as the port list.

@alexjg
Author

alexjg commented Apr 19, 2020

Yeah building our own stream impl makes sense.

Not sure I fully understand what you mean by returning a Vec<S>. Do you mean if the user was trying to do something akin to kubectl port-forward 1234:8080 1235:8080 then there would be two local ports bound to the single remote 8080 port and the returned HashMap is keyed on the remote port?

@nightkr
Member

nightkr commented Apr 19, 2020

Not sure I fully understand what you mean by returning a Vec<S>. Do you mean if the user was trying to do something akin to kubectl port-forward 1234:8080 1235:8080 then there would be two local ports bound to the single remote 8080 port and the returned HashMap is keyed on the remote port?

It doesn't map directly to kubectl port-forward. When kubectl forwards a port, there can be multiple connections to the local port, which each require a separate stream (and TCP connection to the actual pod from the apiserver's end).

That said, from what I can tell, kubectl can't actually do this kind of batching, because the API requires you to declare all the desired streams up front when making the initial request.

@alexjg
Author

alexjg commented Apr 20, 2020

kubectl might be doing something different because it's actually implemented over SPDY streams. The websocket implementation is - as far as I can tell - something which was added to support the python client. It may be more limited than the SPDY version.

clux added the api (Api abstraction related) label Apr 27, 2020
@koiuo
Contributor

koiuo commented Jun 7, 2020

It may be more limited than the SPDY version.

It is claimed in kubernetes/enhancements#384 that websockets have full feature parity with SPDY. SPDY is also officially deprecated, although judging from that kubernetes issue it may stick around in Kubernetes for a while.

A brief description of the exec communication protocol over websockets can be found here:
https://www.openshift.com/blog/executing-commands-in-pods-using-k8s-api

@alexjg
Author

alexjg commented Jun 8, 2020

There's also an ongoing issue about switching kubectl to the websockets API here: kubernetes/kubernetes#89163

clux added the client-gold (gold client requirements) label Aug 14, 2020
@kazk
Member

kazk commented Dec 25, 2020

I'm still very new to kube, so I might be missing something, but I think the following is worth a try.

  • Use async-tungstenite with the tokio-runtime feature to connect
  • If the Kubernetes protocol is message oriented, then we can split the stream into read and write binary message streams. If it's continuous, we'll need to use ws_stream_tungstenite to get a stream of bytes (AsyncRead/AsyncWrite/AsyncBufRead) from WebSocketStream and frame it based on the specification.
  • To handle channels, implement a demultiplexer to read (split into separate channels using the message prefix) and a multiplexer to write (prefix the channel id to the message); see the sketch below. I had to do something like this for Docker before with just stdin/stdout/stderr (messages had an 8-byte prefix: [chan, 0, 0, 0, 4 bytes size]), but I didn't need to handle stdin.

If I'm understanding correctly, kubernetes-client/python-base's stream/ws_client.py keeps a map of channel to its buffer and provides a way to read from them. It also provides a way to write to a channel by sending a message prefixed with an integer. We should be able to do something similar with the above crates and some code.

For port forwarding, I think it pipes to the local ports based on the prefix, but I only skimmed.
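Here's a rough sketch of that demultiplexer idea (the type and names are made up for illustration): keep one buffer per channel, append each incoming binary message to the buffer named by its first byte, and prefix the channel id when writing - roughly what ws_client.py does.

use std::collections::HashMap;

// Accumulates incoming payloads per channel, keyed by the one-byte prefix.
#[derive(Default)]
struct Demux {
    channels: HashMap<u8, Vec<u8>>,
}

impl Demux {
    // Reading: strip the channel prefix and append the payload to that channel's buffer.
    fn on_message(&mut self, msg: &[u8]) {
        if let Some((channel, payload)) = msg.split_first() {
            self.channels.entry(*channel).or_default().extend_from_slice(payload);
        }
    }

    // Writing: multiplex the other way by prefixing the channel id to the payload.
    fn frame_for(channel: u8, payload: &[u8]) -> Vec<u8> {
        let mut frame = Vec::with_capacity(payload.len() + 1);
        frame.push(channel);
        frame.extend_from_slice(payload);
        frame
    }
}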

@kazk
Member

kazk commented Dec 27, 2020

@alexjg @edio I opened a PR (#360) based on the above and implemented the output half of Pod /attach (see the example). I'd appreciate any feedback :)

It's currently awkward to use:

// Need to specify all the fields :(
let ap = AttachParams {
    container: None,
    stdout: Some(tokio::io::stdout()),
    stdin: Some(tokio::io::empty()),
    stderr: None,
    tty: false,
};
// Pod's stdout is piped to stdout
pods.attach("example", ap).await?;

So I'm considering changing the params to booleans and returning a struct that you can read/write, as @teozkr suggested:

Just spitballing a bit, but maybe it'd be worth considering turning this on its head and returning an object that impls AsyncRead + AsyncWrite instead? That way it would work as a pretty decent inverse of tokio::io's stdio family, and be less likely to require the application writer to implement AsyncRead / AsyncWrite on their own.

I haven't decided how to implement it yet.

Protocol

I haven't found any official documentation on the protocol, so I'm using the official Python client implementation as a reference. As far as I can tell, it's simple and message oriented, so we don't need to write a codec. Also, a single multiplexed connection is used to send and receive messages. The message is sent in a binary frame and its first byte is used to specify its channel (destination/purpose).

For /attach and /exec, 5 channels are defined: 0 = stdin, 1 = stdout, 2 = stderr, 3 = error, 4 = resize. The purposes of the error and resize channels are unclear. The error channel seems to receive a YAML message containing error details (if any) before the connection terminates. resize is related to the tty flag.

For /portforward, channels are associated with the specified ports, using 2 channels per port: a duplex data channel (2*i) and a read-only error channel (2*i + 1) for the i-th port.

The first message for a channel is 3 bytes and it describes this association:

  1. channel (u8)
  2. port number (u16 LE)

Subsequent messages have just the channel prefix, like the attach/exec ones.
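A small illustrative parse of that framing (not actual kube-rs code):

// The first message on each port-forward channel is 3 bytes:
// the channel id followed by the port number as u16 little-endian.
fn parse_initial(msg: &[u8]) -> Option<(u8, u16)> {
    match msg {
        [channel, lo, hi] => Some((*channel, u16::from_le_bytes([*lo, *hi]))),
        _ => None,
    }
}

// For the i-th requested port: channel 2*i carries data, channel 2*i + 1 carries errors.
fn channels_for_port(i: u8) -> (u8, u8) {
    (2 * i, 2 * i + 1)
}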

kazk changed the title from "How should we implement port-forwarding?" to "How should we implement WebSocket connection?" Feb 8, 2021
@kazk
Member

kazk commented Feb 8, 2021

Closing because the discussion was about how to implement WebSocket connection in general and not about portforward.

WebSocket connection was first implemented (#360) using async-tungstenite with a separate TLS connector. We've now replaced reqwest with hyper (#394) and use hyper to upgrade to a WebSocket connection instead. We no longer have a separate client, and the WebSocketStream is created from the upgraded connection.
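For reference, the general shape of that - send the upgrade request with hyper, then wrap the upgraded connection as a WebSocket - looks roughly like the following sketch (using tokio-tungstenite and hyper 0.14-style APIs for illustration; the actual code in the PRs may differ):

use hyper::{Body, Request};
use tokio_tungstenite::{tungstenite::protocol::Role, WebSocketStream};

// Assumes `req` already carries the Connection/Upgrade/Sec-WebSocket-Key headers
// and that `client` is configured with the cluster's TLS settings.
async fn upgrade_to_ws<C>(
    client: &hyper::Client<C, Body>,
    req: Request<Body>,
) -> Result<WebSocketStream<hyper::upgrade::Upgraded>, Box<dyn std::error::Error>>
where
    C: hyper::client::connect::Connect + Clone + Send + Sync + 'static,
{
    let res = client.request(req).await?;
    let upgraded = hyper::upgrade::on(res).await?;
    Ok(WebSocketStream::from_raw_socket(upgraded, Role::Client, None).await)
}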

kazk closed this as completed Feb 8, 2021