Skip to content

Commit

Permalink
refactor(ndt7): use netxlite rather than netx (#768)
Browse files Browse the repository at this point in the history
This diff required us to move some code around, but no major
change actually happened, except better tests.

While there, I also slightly refactored ndt7's implementation and
removed the ProxyURL setting, which was actually unused.

See ooni/probe#2121
  • Loading branch information
bassosimone authored May 30, 2022
1 parent 314c3c9 commit 3265bc6
Show file tree
Hide file tree
Showing 12 changed files with 189 additions and 159 deletions.
52 changes: 52 additions & 0 deletions internal/bytecounter/dialer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
package bytecounter

//
// model.Dialer wrappers
//

import (
"context"
"net"

"github.com/ooni/probe-cli/v3/internal/model"
)

// ContextAwareDialer is a model.Dialer that attempts to count bytes using
// the MaybeWrapWithContextByteCounters function.
//
// Bug
//
// This implementation cannot properly account for the bytes that are sent by
// persistent connections, because they stick to the counters set when the
// connection was established. This typically means we miss the bytes sent and
// received when submitting a measurement. Such bytes are specifically not
// seen by the experiment specific byte counter.
//
// For this reason, this implementation may be heavily changed/removed
// in the future (<- this message is now ~two years old, though).
type ContextAwareDialer struct {
Dialer model.Dialer
}

// NewContextAwareDialer creates a new ContextAwareDialer.
func NewContextAwareDialer(dialer model.Dialer) *ContextAwareDialer {
return &ContextAwareDialer{Dialer: dialer}
}

var _ model.Dialer = &ContextAwareDialer{}

// DialContext implements Dialer.DialContext
func (d *ContextAwareDialer) DialContext(
ctx context.Context, network, address string) (net.Conn, error) {
conn, err := d.Dialer.DialContext(ctx, network, address)
if err != nil {
return nil, err
}
conn = MaybeWrapWithContextByteCounters(ctx, conn)
return conn, nil
}

// CloseIdleConnections implements Dialer.CloseIdleConnections.
func (d *ContextAwareDialer) CloseIdleConnections() {
d.Dialer.CloseIdleConnections()
}
101 changes: 101 additions & 0 deletions internal/bytecounter/dialer_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
package bytecounter

import (
"context"
"errors"
"io"
"net"
"testing"

"github.com/ooni/probe-cli/v3/internal/model/mocks"
)

func TestContextAwareDialer(t *testing.T) {
t.Run("DialContext", func(t *testing.T) {
dialAndUseConn := func(ctx context.Context, bufsiz int) error {
childConn := &mocks.Conn{
MockRead: func(b []byte) (int, error) {
return len(b), nil
},
MockWrite: func(b []byte) (int, error) {
return len(b), nil
},
}
child := &mocks.Dialer{
MockDialContext: func(ctx context.Context, network, address string) (net.Conn, error) {
return childConn, nil
},
}
dialer := NewContextAwareDialer(child)
conn, err := dialer.DialContext(ctx, "tcp", "10.0.0.1:443")
if err != nil {
return err
}
buffer := make([]byte, bufsiz)
conn.Read(buffer)
conn.Write(buffer)
return nil
}

t.Run("normal usage", func(t *testing.T) {
if testing.Short() {
t.Skip("skip test in short mode")
}
sess := New()
ctx := context.Background()
ctx = WithSessionByteCounter(ctx, sess)
const count = 128
if err := dialAndUseConn(ctx, count); err != nil {
t.Fatal(err)
}
exp := New()
ctx = WithExperimentByteCounter(ctx, exp)
if err := dialAndUseConn(ctx, count); err != nil {
t.Fatal(err)
}
if exp.Received.Load() != count {
t.Fatal("experiment should have received 128 bytes")
}
if sess.Received.Load() != 2*count {
t.Fatal("session should have received 256 bytes")
}
if exp.Sent.Load() != count {
t.Fatal("experiment should have sent 128 bytes")
}
if sess.Sent.Load() != 256 {
t.Fatal("session should have sent 256 bytes")
}
})

t.Run("failure", func(t *testing.T) {
dialer := &ContextAwareDialer{
Dialer: &mocks.Dialer{
MockDialContext: func(ctx context.Context, network string, address string) (net.Conn, error) {
return nil, io.EOF
},
},
}
conn, err := dialer.DialContext(context.Background(), "tcp", "www.google.com:80")
if !errors.Is(err, io.EOF) {
t.Fatal("not the error we expected")
}
if conn != nil {
t.Fatal("expected nil conn here")
}
})
})

t.Run("CloseIdleConnections", func(t *testing.T) {
var called bool
child := &mocks.Dialer{
MockCloseIdleConnections: func() {
called = true
},
}
dialer := NewContextAwareDialer(child)
dialer.CloseIdleConnections()
if !called {
t.Fatal("not called")
}
})
}
10 changes: 0 additions & 10 deletions internal/engine/experiment/ndt7/callback_test.go

This file was deleted.

17 changes: 4 additions & 13 deletions internal/engine/experiment/ndt7/dial.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,18 +4,16 @@ import (
"context"
"crypto/tls"
"net/http"
"net/url"

"github.com/gorilla/websocket"
"github.com/ooni/probe-cli/v3/internal/engine/netx/dialer"
"github.com/ooni/probe-cli/v3/internal/bytecounter"
"github.com/ooni/probe-cli/v3/internal/model"
"github.com/ooni/probe-cli/v3/internal/netxlite"
)

type dialManager struct {
ndt7URL string
logger model.Logger
proxyURL *url.URL
readBufferSize int
userAgent string
writeBufferSize int
Expand All @@ -32,16 +30,9 @@ func newDialManager(ndt7URL string, logger model.Logger, userAgent string) dialM
}

func (mgr dialManager) dialWithTestName(ctx context.Context, testName string) (*websocket.Conn, error) {
var reso model.Resolver = &netxlite.ResolverSystem{}
reso = &netxlite.ResolverLogger{
Resolver: reso,
Logger: mgr.logger,
}
dlr := dialer.New(&dialer.Config{
ContextByteCounting: true,
Logger: mgr.logger,
ProxyURL: mgr.proxyURL,
}, reso)
reso := netxlite.NewResolverStdlib(mgr.logger)
dlr := netxlite.NewDialerWithResolver(mgr.logger, reso)
dlr = bytecounter.NewContextAwareDialer(dlr)
// Implements shaping if the user builds using `-tags shaping`
// See https://github.com/ooni/probe/issues/2112
dlr = netxlite.NewMaybeShapingDialer(dlr)
Expand Down
4 changes: 2 additions & 2 deletions internal/engine/experiment/ndt7/download.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import (
)

type downloadManager struct {
conn mockableConn
conn wsConn
maxMessageSize int64
maxRuntime time.Duration
measureInterval time.Duration
Expand All @@ -20,7 +20,7 @@ type downloadManager struct {
}

func newDownloadManager(
conn mockableConn, onPerformance callbackPerformance,
conn wsConn, onPerformance callbackPerformance,
onJSON callbackJSON,
) downloadManager {
return downloadManager{
Expand Down
19 changes: 13 additions & 6 deletions internal/engine/experiment/ndt7/download_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,17 @@ import (
"github.com/gorilla/websocket"
)

func defaultCallbackJSON(data []byte) error {
return nil
}

func defaultCallbackPerformance(elapsed time.Duration, count int64) {
}

func TestDownloadSetReadDeadlineFailure(t *testing.T) {
expected := errors.New("mocked error")
mgr := newDownloadManager(
&mockableConnMock{
&mockableWSConn{
ReadDeadlineErr: expected,
},
defaultCallbackPerformance,
Expand All @@ -30,7 +37,7 @@ func TestDownloadSetReadDeadlineFailure(t *testing.T) {
func TestDownloadNextReaderFailure(t *testing.T) {
expected := errors.New("mocked error")
mgr := newDownloadManager(
&mockableConnMock{
&mockableWSConn{
NextReaderErr: expected,
},
defaultCallbackPerformance,
Expand All @@ -45,7 +52,7 @@ func TestDownloadNextReaderFailure(t *testing.T) {
func TestDownloadTextMessageReadAllFailure(t *testing.T) {
expected := errors.New("mocked error")
mgr := newDownloadManager(
&mockableConnMock{
&mockableWSConn{
NextReaderMsgType: websocket.TextMessage,
NextReaderReader: func() io.Reader {
return &alwaysFailingReader{
Expand Down Expand Up @@ -73,7 +80,7 @@ func (r *alwaysFailingReader) Read(p []byte) (int, error) {
func TestDownloadBinaryMessageReadAllFailure(t *testing.T) {
expected := errors.New("mocked error")
mgr := newDownloadManager(
&mockableConnMock{
&mockableWSConn{
NextReaderMsgType: websocket.BinaryMessage,
NextReaderReader: func() io.Reader {
return &alwaysFailingReader{
Expand All @@ -92,7 +99,7 @@ func TestDownloadBinaryMessageReadAllFailure(t *testing.T) {

func TestDownloadOnJSONCallbackError(t *testing.T) {
mgr := newDownloadManager(
&mockableConnMock{
&mockableWSConn{
NextReaderMsgType: websocket.TextMessage,
NextReaderReader: func() io.Reader {
return &invalidJSONReader{}
Expand Down Expand Up @@ -121,7 +128,7 @@ func TestDownloadOnJSONLoop(t *testing.T) {
t.Skip("skip test in short mode")
}
mgr := newDownloadManager(
&mockableConnMock{
&mockableWSConn{
NextReaderMsgType: websocket.TextMessage,
NextReaderReader: func() io.Reader {
return &goodJSONReader{}
Expand Down
4 changes: 2 additions & 2 deletions internal/engine/experiment/ndt7/upload.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ func newMessage(n int) (*websocket.PreparedMessage, error) {
}

type uploadManager struct {
conn mockableConn
conn wsConn
fractionForScaling int64
maxRuntime time.Duration
maxMessageSize int
Expand All @@ -24,7 +24,7 @@ type uploadManager struct {
}

func newUploadManager(
conn mockableConn, onPerformance callbackPerformance,
conn wsConn, onPerformance callbackPerformance,
) uploadManager {
return uploadManager{
conn: conn,
Expand Down
10 changes: 5 additions & 5 deletions internal/engine/experiment/ndt7/upload_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import (
func TestUploadSetWriteDeadlineFailure(t *testing.T) {
expected := errors.New("mocked error")
mgr := newUploadManager(
&mockableConnMock{
&mockableWSConn{
WriteDeadlineErr: expected,
},
defaultCallbackPerformance,
Expand All @@ -26,7 +26,7 @@ func TestUploadSetWriteDeadlineFailure(t *testing.T) {
func TestUploadNewMessageFailure(t *testing.T) {
expected := errors.New("mocked error")
mgr := newUploadManager(
&mockableConnMock{},
&mockableWSConn{},
defaultCallbackPerformance,
)
mgr.newMessage = func(int) (*websocket.PreparedMessage, error) {
Expand All @@ -41,7 +41,7 @@ func TestUploadNewMessageFailure(t *testing.T) {
func TestUploadWritePreparedMessageFailure(t *testing.T) {
expected := errors.New("mocked error")
mgr := newUploadManager(
&mockableConnMock{
&mockableWSConn{
WritePreparedMessageErr: expected,
},
defaultCallbackPerformance,
Expand All @@ -55,7 +55,7 @@ func TestUploadWritePreparedMessageFailure(t *testing.T) {
func TestUploadWritePreparedMessageSubsequentFailure(t *testing.T) {
expected := errors.New("mocked error")
mgr := newUploadManager(
&mockableConnMock{},
&mockableWSConn{},
defaultCallbackPerformance,
)
var already bool
Expand All @@ -77,7 +77,7 @@ func TestUploadLoop(t *testing.T) {
t.Skip("skip test in short mode")
}
mgr := newUploadManager(
&mockableConnMock{},
&mockableWSConn{},
defaultCallbackPerformance,
)
mgr.newMessage = func(int) (*websocket.PreparedMessage, error) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,8 @@ import (
"github.com/gorilla/websocket"
)

type mockableConn interface {
// weConn is the interface of gorilla/websocket.Conn
type wsConn interface {
NextReader() (int, io.Reader, error)
SetReadDeadline(time.Time) error
SetReadLimit(int64)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import (
"github.com/gorilla/websocket"
)

type mockableConnMock struct {
type mockableWSConn struct {
NextReaderMsgType int
NextReaderErr error
NextReaderReader func() io.Reader
Expand All @@ -16,24 +16,24 @@ type mockableConnMock struct {
WritePreparedMessageErr error
}

func (c *mockableConnMock) NextReader() (int, io.Reader, error) {
func (c *mockableWSConn) NextReader() (int, io.Reader, error) {
var reader io.Reader
if c.NextReaderReader != nil {
reader = c.NextReaderReader()
}
return c.NextReaderMsgType, reader, c.NextReaderErr
}

func (c *mockableConnMock) SetReadDeadline(time.Time) error {
func (c *mockableWSConn) SetReadDeadline(time.Time) error {
return c.ReadDeadlineErr
}

func (c *mockableConnMock) SetReadLimit(int64) {}
func (c *mockableWSConn) SetReadLimit(int64) {}

func (c *mockableConnMock) SetWriteDeadline(time.Time) error {
func (c *mockableWSConn) SetWriteDeadline(time.Time) error {
return c.WriteDeadlineErr
}

func (c *mockableConnMock) WritePreparedMessage(*websocket.PreparedMessage) error {
func (c *mockableWSConn) WritePreparedMessage(*websocket.PreparedMessage) error {
return c.WritePreparedMessageErr
}
Loading

0 comments on commit 3265bc6

Please sign in to comment.