Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use WebTransport when it's available #160

Draft
wants to merge 23 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 16 additions & 6 deletions clientcore/broflake.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ func (b *BroflakeEngine) debug() {

func NewBroflake(bfOpt *BroflakeOptions, rtcOpt *WebRTCOptions, egOpt *EgressOptions) (bfconn *BroflakeConn, ui *UIImpl, err error) {
if bfOpt.ClientType != "desktop" && bfOpt.ClientType != "widget" {
err = fmt.Errorf("Invalid clientType '%v\n'", bfOpt.ClientType)
err = fmt.Errorf("invalid clientType '%v\n'", bfOpt.ClientType)
common.Debugf(err.Error())
return bfconn, ui, err
}
Expand All @@ -64,7 +64,11 @@ func NewBroflake(bfOpt *BroflakeOptions, rtcOpt *WebRTCOptions, egOpt *EgressOpt
}

if egOpt == nil {
egOpt = NewDefaultEgressOptions()
if bfOpt.WebTransport {
egOpt = NewDefaultWebTransportEgressOptions()
} else {
egOpt = NewDefaultWebSocketEgressOptions()
}
}

// The boot DAG:
Expand Down Expand Up @@ -95,11 +99,17 @@ func NewBroflake(bfOpt *BroflakeOptions, rtcOpt *WebRTCOptions, egOpt *EgressOpt
cfsms = append(cfsms, *NewProducerWebRTC(rtcOpt, &wgReady))
}
cTable = NewWorkerTable(cfsms)

// Widget peers consume connectivity from an egress server over WebSocket
var pfsms []WorkerFSM
for i := 0; i < bfOpt.PTableSize; i++ {
pfsms = append(pfsms, *NewEgressConsumerWebSocket(egOpt, &wgReady))
if bfOpt.WebTransport {
// Chrome widget peers consume connectivity from an egress server over WebTransport
for i := 0; i < bfOpt.PTableSize; i++ {
pfsms = append(pfsms, *NewEgressConsumerWebTransport(egOpt, &wgReady))
}
} else {
// Widget peers consume connectivity from an egress server over WebSocket
for i := 0; i < bfOpt.PTableSize; i++ {
pfsms = append(pfsms, *NewEgressConsumerWebSocket(egOpt, &wgReady))
}
}
pTable = NewWorkerTable(pfsms)
}
Expand Down
54 changes: 54 additions & 0 deletions clientcore/connection_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
package clientcore

import (
"context"
"crypto/tls"
"crypto/x509"
"testing"
"time"

"github.com/getlantern/broflake/common"
"github.com/quic-go/quic-go/http3"
"github.com/quic-go/webtransport-go"
)

func newDefaultWebTransportEgressOptions() *EgressOptions {
return &EgressOptions{
Addr: "https://localhost:8000",
Endpoint: "/wt/",
ConnectTimeout: 5 * time.Second,
ErrorBackoff: 5 * time.Second,
CACert: []byte(caCert),
}
}

func TestConnection(t *testing.T) {
options := newDefaultWebTransportEgressOptions()
rootCAs, _ := x509.SystemCertPool()
if rootCAs == nil {
rootCAs = x509.NewCertPool()
}
// rootC
if ok := rootCAs.AppendCertsFromPEM(options.CACert); !ok {
common.Debugf("Couldn't add root certificate: %v", options.CACert)
}

var d webtransport.Dialer = webtransport.Dialer{}
d.RoundTripper = &http3.RoundTripper{
TLSClientConfig: &tls.Config{
//InsecureSkipVerify: true,
RootCAs: rootCAs,
},
}

// TODO: We ideally should create a single session and reuse it for all streams.
_, _, err := d.Dial(context.Background(), options.Addr+options.Endpoint, nil)
if err != nil {
common.Debugf("Couldn't connect to egress server at %v: %v", options.Addr+options.Endpoint, err)
//<-time.After(options.ErrorBackoff)
//return 0, []interface{}{}
} else {
common.Debugf("Connected to egress server at %v", options.Addr+options.Endpoint)
}
time.Sleep(5 * time.Second)
}
153 changes: 148 additions & 5 deletions clientcore/egress_consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,18 @@ package clientcore

import (
"context"
"crypto/tls"
"crypto/x509"
"net"
"sync"
"time"

"nhooyr.io/websocket"

"github.com/getlantern/broflake/common"
"github.com/getlantern/quicwrapper/webt"
"github.com/quic-go/quic-go/http3"
"github.com/quic-go/webtransport-go"
)

func NewEgressConsumerWebSocket(options *EgressOptions, wg *sync.WaitGroup) *WorkerFSM {
Expand All @@ -36,7 +42,7 @@ func NewEgressConsumerWebSocket(options *EgressOptions, wg *sync.WaitGroup) *Wor
// like ($, 1)... OR just disallow just-in-time strategies, and make egress consumers
// pre-establish N websocket connections

ctx, cancel := context.WithTimeout(context.Background(), options.ConnectTimeout)
ctx, cancel := context.WithTimeout(ctx, options.ConnectTimeout)
defer cancel()

// TODO: WSS
Expand All @@ -52,7 +58,6 @@ func NewEgressConsumerWebSocket(options *EgressOptions, wg *sync.WaitGroup) *Wor
}),
FSMstate(func(ctx context.Context, com *ipcChan, input []interface{}) (int, []interface{}) {
// State 1
// input[0]: *websocket.Conn
c := input[0].(*websocket.Conn)
common.Debugf("Egress consumer state 1, WebSocket connection established!")

Expand Down Expand Up @@ -93,7 +98,7 @@ func NewEgressConsumerWebSocket(options *EgressOptions, wg *sync.WaitGroup) *Wor
case msg := <-com.rx:
// Write the chunk to the websocket, detect and handle error
// TODO: is it safe to assume the message is a chunk type? Do we trust the router?
err := c.Write(context.Background(), websocket.MessageBinary, msg.Data.([]byte))
err := c.Write(ctx, websocket.MessageBinary, msg.Data.([]byte))
if err != nil {
c.Close(websocket.StatusNormalClosure, err.Error())
common.Debugf("Egress consumer WebSocket write error: %v", err)
Expand All @@ -112,9 +117,147 @@ func NewEgressConsumerWebSocket(options *EgressOptions, wg *sync.WaitGroup) *Wor
// stop logic in protocol.go takes over and kills this goroutine.
}
}
}),
})
}

func NewEgressConsumerWebTransport(options *EgressOptions, wg *sync.WaitGroup) *WorkerFSM {
return NewWorkerFSM(wg, []FSMstate{
FSMstate(func(ctx context.Context, com *ipcChan, input []interface{}) (int, []interface{}) {
// State 0
// (no input data)
common.Debugf("Egress consumer state 0, opening WebTransport connection...")

// We're resetting this slot, so send a nil path assertion IPC message
com.tx <- IPCMsg{IpcType: PathAssertionIPC, Data: common.PathAssertion{}}

// TODO: interesting quirk here: if the table router which manages this WorkerFSM implements
// non-multiplexed just-in-time strategy wherein it creates a new WebTransport connection for
// each new censored peer, we've got a chicken and egg deadlock: the consumer table won't
// start advertising connectivity until it detects a non-nil path assertion, and we won't
// have a non-nil path assertion until a censored peer connects to us. 3 poss solutions: make
// this egress consumer WorkerFSM always emit a (*, 1) path assertion, even when it doesn't
// have upstream connectivity... OR invent another special case for the host field which
// indicates "on request", as an escape hatch which indicates to a consumer table that it
// can use that slot to dial a lantern-controlled exit node, so we'd be emitting something
// like ($, 1)... OR just disallow just-in-time strategies, and make egress consumers
// pre-establish N WebTransport connections

ctx, cancel := context.WithTimeout(ctx, options.ConnectTimeout)
defer cancel()

rootCAs, _ := x509.SystemCertPool()
if rootCAs == nil {
rootCAs = x509.NewCertPool()
}
if ok := rootCAs.AppendCertsFromPEM(options.CACert); !ok {
common.Debugf("Couldn't add root certificate: %v", options.CACert)
}

var d webtransport.Dialer = webtransport.Dialer{}
d.RoundTripper = &http3.RoundTripper{
TLSClientConfig: &tls.Config{
RootCAs: rootCAs,
},
}

url := options.Addr + options.Endpoint

// TODO: We ideally should create a single session and reuse it for all streams.
httpResponse, session, err := d.Dial(ctx, url, nil)
if err != nil {
common.Debugf("Couldn't connect to egress server at %v: %v", url, err)
<-time.After(options.ErrorBackoff)
return 0, []interface{}{}
}
stream, err := session.OpenStream()
if err != nil {
common.Debugf("Couldn't open stream to egress server at %v: %v", url, err)
<-time.After(options.ErrorBackoff)
return 0, []interface{}{}
}

// We convert this to a net.Conn here because it's well understood interface but also
// allows us to encapsulate the relevant methods of both the session and the stream.
c := webt.NewConn(stream, session, httpResponse, func() {
common.Debugf("Egress consumer WebTransport connection closed")
})

return 1, []interface{}{&c}
}),
FSMstate(func(ctx context.Context, com *ipcChan, input []interface{}) (int, []interface{}) {
// State 1
c := *input[0].(*net.Conn)
common.Debugf("Egress consumer state 1, WebTransport connection established!")

// Send a path assertion IPC message representing the connectivity now provided by this slot
// TODO: post-MVP we shouldn't be hardcoding (*, 1) here...
allowAll := []common.Endpoint{{Host: "*", Distance: 1}}
com.tx <- IPCMsg{IpcType: PathAssertionIPC, Data: common.PathAssertion{Allow: allowAll}}

// WebTransport read loop:
readStatus := make(chan error)
go func(ctx context.Context) {
for {
buf := make([]byte, 1024)
bytesRead, err := c.Read(buf)
if err != nil {
readStatus <- err
return
}

// Wrap the chunk and send it on to the router
select {
case com.tx <- IPCMsg{IpcType: ChunkIPC, Data: buf[:bytesRead]}:
// Do nothing, msg sent
default:
// Drop the chunk if we can't keep up with the data rate
}
}
}(ctx)

// Main loop:
// 1. handle chunks from the bus, write them to the WebTransport, detect and handle write errors
// 2. listen for errors from the read goroutine and handle them

// TODO: We shouldn't reach this code path, right?
return 0, []interface{}{}
// On read or write error, we close the WebTransport to ensure that the egress server detects
// closed connections.
for {
select {
case msg := <-com.rx:
// Write the chunk to the WebTransport, detect and handle error
// TODO: what if the bytes written is less than the chunk size? Do we need to loop?
if data, ok := msg.Data.([]byte); !ok {
common.Debugf("Egress consumer WebTransport received non-byte chunk: %v", msg.Data)
c.Close()
return 0, []interface{}{}
} else if bytesWritten, err := c.Write(data); err != nil {
common.Debugf("Egress consumer WebTransport write error: %v", err)
c.Close()
return 0, []interface{}{}
} else if bytesWritten != len(data) {
// See https://pkg.go.dev/io#Writer for the contract of io.Writer.
// Theoretically we should never hit this code because any writer should return an error if
// it doesn't write the full chunk, but we check anyway just to be safe.
common.Debugf("Egress consumer WebTransport write error: wrote %v bytes, expected %v",
bytesWritten, len(data))
c.Close()
return 0, []interface{}{}
}
// At this point the chunk is written, so loop around and wait for the next chunk
case err := <-readStatus:
common.Debugf("Egress consumer WebTransport read error: %v", err)
c.Close()
return 0, []interface{}{}

// Ordinarily it would be incorrect to put a worker into an infinite loop without including
// a case to listen for context cancellation, but here we handle context cancellation in a
// non-explicit way. Since the worker context bounds the call to net.Conn.Read, worker
// context cancellation results in a Read error, which we trap to stop the child read
// goroutine, close the connection, and return from this state, at which point the worker
// stop logic in protocol.go takes over and kills this goroutine.
}
}
}),
})
}
2 changes: 1 addition & 1 deletion clientcore/quic.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ func (c *QUICLayer) DialAndMaintainQUICConnection() {
}

var err error
conn, err = quic.Dial(c.bfconn, common.DebugAddr("NELSON WUZ HERE"), "DEBUG", c.tlsConfig, &common.QUICCfg)
conn, err = quic.Dial(c.ctx, c.bfconn, common.DebugAddr("NELSON WUZ HERE"), c.tlsConfig, &common.QUICCfg)
if err != nil {
common.Debugf("QUIC dial failed (%v), retrying...", err)
continue
Expand Down
48 changes: 42 additions & 6 deletions clientcore/settings.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,9 +43,34 @@ type EgressOptions struct {
Endpoint string
ConnectTimeout time.Duration
ErrorBackoff time.Duration
CACert []byte
}

func NewDefaultEgressOptions() *EgressOptions {
const caCert = `-----BEGIN CERTIFICATE-----
MIIDoDCCAogCCQCIDNrnudYmTzANBgkqhkiG9w0BAQsFADCBkTELMAkGA1UEBhMC
VVMxEzARBgNVBAgMCkNhbGlmb3JuaWExFDASBgNVBAcMC0xvcyBBbmdlbGVzMQww
CgYDVQQKDANCTlMxEjAQBgNVBAsMCU1hcmtldGluZzEXMBUGA1UEAwwObG9jYWxo
b3N0OjgwMDAxHDAaBgkqhkiG9w0BCQEWDXRlc3RAdGVzdC5jb20wHhcNMjMwNzEw
MTgwODI5WhcNMjQwNzA5MTgwODI5WjCBkTELMAkGA1UEBhMCVVMxEzARBgNVBAgM
CkNhbGlmb3JuaWExFDASBgNVBAcMC0xvcyBBbmdlbGVzMQwwCgYDVQQKDANCTlMx
EjAQBgNVBAsMCU1hcmtldGluZzEXMBUGA1UEAwwObG9jYWxob3N0OjgwMDAxHDAa
BgkqhkiG9w0BCQEWDXRlc3RAdGVzdC5jb20wggEiMA0GCSqGSIb3DQEBAQUAA4IB
DwAwggEKAoIBAQCx73COifnB44oIGU5OO0Wy0tt8vzzd/bZgIhYqOw0+PFqGWi7a
9UnHpbhYx9NIoNo89CZO6TifsKFT2vuq/cphuhby1h+h+k1QT9jPVE+0vT5EFpe2
6l0eU9joI4g+lCI2HAZS6JEeYOky+yvPsro7K7L22/XoFCm24ZU1KSqsBCSoN4dk
qzcmaV2i5EqvUrg+SFOzesUdB2cj1cnydakPEVJPHgpNtlK1NQdhLiixHmjXuF9P
DvVKPuhSyDMJbfMvTjVveGDP4TrGLAoxKMZvUSGSL3hJ5IulwKXH2YUqU5UQyEOI
LETRNrf3fUR7zzueyRp9Qj+pnENziMeHwzIvAgMBAAEwDQYJKoZIhvcNAQELBQAD
ggEBACbkHE1wPiKvHQZIIBxETvVwU9iAosGFcHOGJgMt7CgatdYEdVUjtBN0sf+y
DfL2PNnaY/gEGohORpgcGWJ1s1zAo4dtGGfnK9IVu07bFqTnABS6aaYj4obl7wJt
gRswuB4QTwDrKVoFVNfhVqXRU5rGxqu1S40axK+ZhkHNH44JP2M1dpAxSFkSZ++S
MW5z67ODDCxOZGYp9f5ulOLSzEZjQ0ux3gndKEQt1SVqx/2ca6xcYyC//ga95Yv+
+Tm8REHMUg5er9deB/99j5AbWj5pYZYmqKnXDAm/2oFaVUqVMvtHfb2zEpF+BPbr
67tOV4gpT2Q2Z6dVlnnjtuaIx9U=
-----END CERTIFICATE-----
`

func NewDefaultWebSocketEgressOptions() *EgressOptions {
return &EgressOptions{
Addr: "ws://localhost:8000",
Endpoint: "/ws",
Expand All @@ -54,12 +79,23 @@ func NewDefaultEgressOptions() *EgressOptions {
}
}

func NewDefaultWebTransportEgressOptions() *EgressOptions {
return &EgressOptions{
Addr: "https://localhost:8000",
Endpoint: "/wt/",
ConnectTimeout: 5 * time.Second,
ErrorBackoff: 5 * time.Second,
CACert: []byte(caCert),
}
}

type BroflakeOptions struct {
ClientType string
CTableSize int
PTableSize int
BusBufferSz int
Netstated string
ClientType string
CTableSize int
PTableSize int
BusBufferSz int
Netstated string
WebTransport bool
}

func NewDefaultBroflakeOptions() *BroflakeOptions {
Expand Down
2 changes: 1 addition & 1 deletion cmd/build.sh
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
#!/usr/bin/env bash
set -xe
go build -race -o ./dist/bin/$1 --ldflags="-X 'main.clientType=$1'"
go build -race -o ./dist/bin/"$1" --ldflags="-X 'main.clientType=$1'"
2 changes: 1 addition & 1 deletion cmd/client_default_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ func main() {
rtcOpt.DiscoverySrv = freddie
}

egOpt := clientcore.NewDefaultEgressOptions()
egOpt := clientcore.NewDefaultWebSocketEgressOptions()

if egress != "" {
egOpt.Addr = egress
Expand Down
Loading