Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: host access port instability #2867

Merged
merged 3 commits into from
Nov 6, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 18 additions & 1 deletion docker.go
Original file line number Diff line number Diff line change
Expand Up @@ -1185,6 +1185,18 @@ func (p *DockerProvider) CreateContainer(ctx context.Context, req ContainerReque
return nil, fmt.Errorf("expose host ports: %w", err)
}

defer func() {
if err != nil && con == nil {
// Container setup failed so ensure we clean up the sshd container too.
ctr := &DockerContainer{
provider: p,
logger: p.Logger,
lifecycleHooks: []ContainerLifecycleHooks{sshdForwardPortsHook},
}
err = errors.Join(ctr.terminatingHook(ctx))
}
}()

defaultHooks = append(defaultHooks, sshdForwardPortsHook)
}

Expand Down Expand Up @@ -1623,17 +1635,22 @@ func (p *DockerProvider) ensureDefaultNetwork(ctx context.Context) (string, erro
return "", fmt.Errorf("network list: %w", err)
}

// Prefer the default bridge network if it exists.
// This makes the results stable as network list order is not guaranteed.
for _, net := range networkResources {
switch net.Name {
case p.defaultBridgeNetworkName:
p.defaultNetwork = p.defaultBridgeNetworkName
return p.defaultNetwork, nil
case ReaperDefault:
p.defaultNetwork = ReaperDefault
return p.defaultNetwork, nil
}
}

if p.defaultNetwork != "" {
stevenh marked this conversation as resolved.
Show resolved Hide resolved
return p.defaultNetwork, nil
}

// Create a bridge network for the container communications.
_, err = p.client.NetworkCreate(ctx, ReaperDefault, network.CreateOptions{
Driver: Bridge,
Expand Down
46 changes: 43 additions & 3 deletions exec/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@ package exec

import (
"bytes"
"fmt"
"io"
"sync"

"github.com/docker/docker/api/types/container"
"github.com/docker/docker/pkg/stdcopy"
Expand Down Expand Up @@ -60,6 +62,43 @@ func WithEnv(env []string) ProcessOption {
})
}

// safeBuffer is a goroutine safe buffer.
type safeBuffer struct {
mtx sync.Mutex
buf bytes.Buffer
err error
}

// Error sets an error for the next read.
func (sb *safeBuffer) Error(err error) {
sb.mtx.Lock()
defer sb.mtx.Unlock()

sb.err = err
}

// Write writes p to the buffer.
// It is safe for concurrent use by multiple goroutines.
func (sb *safeBuffer) Write(p []byte) (n int, err error) {
sb.mtx.Lock()
defer sb.mtx.Unlock()

return sb.buf.Write(p)
}

// Read reads up to len(p) bytes into p from the buffer.
// It is safe for concurrent use by multiple goroutines.
func (sb *safeBuffer) Read(p []byte) (n int, err error) {
sb.mtx.Lock()
defer sb.mtx.Unlock()

if sb.err != nil {
return 0, sb.err
}

return sb.buf.Read(p)
}

// Multiplexed returns a [ProcessOption] that configures the command execution
// to combine stdout and stderr into a single stream without Docker's multiplexing headers.
func Multiplexed() ProcessOption {
Expand All @@ -73,13 +112,14 @@ func Multiplexed() ProcessOption {

done := make(chan struct{})

var outBuff bytes.Buffer
var errBuff bytes.Buffer
var outBuff safeBuffer
var errBuff safeBuffer
go func() {
defer close(done)
if _, err := stdcopy.StdCopy(&outBuff, &errBuff, opts.Reader); err != nil {
outBuff.Error(fmt.Errorf("copying output: %w", err))
return
}
close(done)
}()

<-done
Expand Down
30 changes: 22 additions & 8 deletions port_forwarding.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,10 +99,25 @@ func exposeHostPorts(ctx context.Context, req *ContainerRequest, ports ...int) (
return sshdConnectHook, fmt.Errorf("new sshd container: %w", err)
}

// IP in the first network of the container
sshdIP, err := sshdContainer.ContainerIP(context.Background())
// IP in the first network of the container.
inspect, err := sshdContainer.Inspect(ctx)
if err != nil {
return sshdConnectHook, fmt.Errorf("get sshd container IP: %w", err)
return sshdConnectHook, fmt.Errorf("inspect sshd container: %w", err)
}

sshdIP := inspect.NetworkSettings.IPAddress
stevenh marked this conversation as resolved.
Show resolved Hide resolved
if sshdIP == "" {
single := len(inspect.NetworkSettings.Networks) == 1
for name, network := range inspect.NetworkSettings.Networks {
if name == sshdFirstNetwork || single {
sshdIP = network.IPAddress
break
}
}
}

if sshdIP == "" {
return sshdConnectHook, errors.New("sshd container IP not found")
}

if req.HostConfigModifier == nil {
Expand Down Expand Up @@ -166,11 +181,10 @@ func exposeHostPorts(ctx context.Context, req *ContainerRequest, ports ...int) (
func newSshdContainer(ctx context.Context, opts ...ContainerCustomizer) (*sshdContainer, error) {
req := GenericContainerRequest{
ContainerRequest: ContainerRequest{
Image: sshdImage,
HostAccessPorts: []int{}, // empty list because it does not need any port
ExposedPorts: []string{sshPort},
Env: map[string]string{"PASSWORD": sshPassword},
WaitingFor: wait.ForListeningPort(sshPort),
Image: sshdImage,
stevenh marked this conversation as resolved.
Show resolved Hide resolved
ExposedPorts: []string{sshPort},
Env: map[string]string{"PASSWORD": sshPassword},
WaitingFor: wait.ForListeningPort(sshPort),
},
Started: true,
}
Expand Down
164 changes: 79 additions & 85 deletions port_forwarding_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,101 +22,95 @@ const (
)

func TestExposeHostPorts(t *testing.T) {
tests := []struct {
name string
numberOfPorts int
hasNetwork bool
hasHostAccess bool
}{
{
name: "single port",
numberOfPorts: 1,
hasHostAccess: true,
},
{
name: "single port using a network",
numberOfPorts: 1,
hasNetwork: true,
hasHostAccess: true,
},
{
name: "multiple ports",
numberOfPorts: 3,
hasHostAccess: true,
},
{
name: "single port with cancellation",
numberOfPorts: 1,
hasHostAccess: false,
t.Run("single-port", func(t *testing.T) {
testExposeHostPorts(t, 1, false, true)
})

t.Run("single-port-using-network", func(t *testing.T) {
testExposeHostPorts(t, 1, true, true)
})

t.Run("single-port-cancellation", func(t *testing.T) {
testExposeHostPorts(t, 1, false, false)
stevenh marked this conversation as resolved.
Show resolved Hide resolved
})

t.Run("multi-port", func(t *testing.T) {
testExposeHostPorts(t, 3, false, true)
})

t.Run("multi-port-using-network", func(t *testing.T) {
testExposeHostPorts(t, 3, false, true)
})
}

func testExposeHostPorts(t *testing.T, numberOfPorts int, hasNetwork, hasHostAccess bool) {
t.Helper()

freePorts := make([]int, numberOfPorts)
for i := range freePorts {
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
fmt.Fprint(w, expectedResponse)
}))
freePorts[i] = server.Listener.Addr().(*net.TCPAddr).Port
t.Cleanup(func() {
server.Close()
})
}

req := testcontainers.GenericContainerRequest{
// hostAccessPorts {
ContainerRequest: testcontainers.ContainerRequest{
Image: "alpine:3.17",
HostAccessPorts: freePorts,
Cmd: []string{"top"},
},
// }
Started: true,
}

for _, tc := range tests {
t.Run(tc.name, func(tt *testing.T) {
freePorts := make([]int, tc.numberOfPorts)
for i := range freePorts {
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
fmt.Fprint(w, expectedResponse)
}))
freePorts[i] = server.Listener.Addr().(*net.TCPAddr).Port
tt.Cleanup(func() {
server.Close()
})
}

req := testcontainers.GenericContainerRequest{
// hostAccessPorts {
ContainerRequest: testcontainers.ContainerRequest{
Image: "alpine:3.17",
HostAccessPorts: freePorts,
Cmd: []string{"top"},
},
// }
Started: true,
}

var nw *testcontainers.DockerNetwork
if tc.hasNetwork {
var err error
nw, err = network.New(context.Background())
require.NoError(tt, err)
testcontainers.CleanupNetwork(t, nw)

req.Networks = []string{nw.Name}
req.NetworkAliases = map[string][]string{nw.Name: {"myalpine"}}
}

ctx := context.Background()
if !tc.hasHostAccess {
var cancel context.CancelFunc
ctx, cancel = context.WithTimeout(ctx, 10*time.Second)
defer cancel()
}

c, err := testcontainers.GenericContainer(ctx, req)
testcontainers.CleanupContainer(t, c)
require.NoError(tt, err)

if tc.hasHostAccess {
// create a container that has host access, which will
// automatically forward the port to the container
assertContainerHasHostAccess(tt, c, freePorts...)
} else {
// force cancellation because of timeout
time.Sleep(11 * time.Second)

assertContainerHasNoHostAccess(tt, c, freePorts...)
}
})
var nw *testcontainers.DockerNetwork
if hasNetwork {
var err error
nw, err = network.New(context.Background())
require.NoError(t, err)
testcontainers.CleanupNetwork(t, nw)

req.Networks = []string{nw.Name}
req.NetworkAliases = map[string][]string{nw.Name: {"myalpine"}}
}

ctx := context.Background()
if !hasHostAccess {
var cancel context.CancelFunc
ctx, cancel = context.WithTimeout(ctx, 10*time.Second)
defer cancel()
}

c, err := testcontainers.GenericContainer(ctx, req)
testcontainers.CleanupContainer(t, c)
require.NoError(t, err)

if hasHostAccess {
// create a container that has host access, which will
// automatically forward the port to the container
assertContainerHasHostAccess(t, c, freePorts...)
} else {
// force cancellation because of timeout
time.Sleep(11 * time.Second)

assertContainerHasNoHostAccess(t, c, freePorts...)
}
}

func httpRequest(t *testing.T, c testcontainers.Container, port int) (int, string) {
t.Helper()
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()

// wgetHostInternal {
code, reader, err := c.Exec(
context.Background(),
[]string{"wget", "-q", "-O", "-", fmt.Sprintf("http://%s:%d", testcontainers.HostInternal, port)},
ctx,
[]string{"wget", "-q", "-O", "-", "-T", "2", fmt.Sprintf("http://%s:%d", testcontainers.HostInternal, port)},
mdelapenya marked this conversation as resolved.
Show resolved Hide resolved
tcexec.Multiplexed(),
)
// }
Expand Down