From 89dd3d307521b501e476c9d0944749de50f26682 Mon Sep 17 00:00:00 2001 From: Tim Allclair Date: Tue, 24 Oct 2023 11:42:12 -0700 Subject: [PATCH] Run integration tests against full agent & proxy-server apps --- .gitignore | 2 + cmd/agent/app/server.go | 30 ++- cmd/server/app/options/options.go | 2 +- cmd/server/app/server.go | 47 +++-- cmd/server/main.go | 1 - pkg/server/server.go | 9 +- pkg/server/tunnel.go | 2 +- tests/agent_disconnect_test.go | 8 +- tests/benchmarks_test.go | 7 +- tests/concurrent_client_request_test.go | 4 +- tests/concurrent_test.go | 4 +- tests/framework/agent.go | 108 ++++++++--- tests/framework/certs.go | 195 ++++++++++++++++++++ tests/framework/proxy_server.go | 236 +++++++++--------------- tests/framework/util.go | 63 +++++++ tests/ha_proxy_server_test.go | 9 +- tests/main_test.go | 10 + tests/proxy_test.go | 113 +++++++----- tests/reconnect_test.go | 82 +++----- tests/tcp_server_test.go | 4 +- 20 files changed, 605 insertions(+), 331 deletions(-) create mode 100644 tests/framework/certs.go create mode 100644 tests/framework/util.go diff --git a/.gitignore b/.gitignore index b0ecc46b8..056f85aed 100644 --- a/.gitignore +++ b/.gitignore @@ -18,3 +18,5 @@ konnectivity.out konnectivity.html konnectivity-client/client.out konnectivity-client/client.html + +tests.test diff --git a/cmd/agent/app/server.go b/cmd/agent/app/server.go index 63ec5d8b1..319157a79 100644 --- a/cmd/agent/app/server.go +++ b/cmd/agent/app/server.go @@ -49,7 +49,8 @@ func NewAgentCommand(a *Agent, o *options.GrpcProxyAgentOptions) *cobra.Command Use: "agent", Long: `A gRPC agent, Connects to the proxy and then allows traffic to be forwarded to it.`, RunE: func(cmd *cobra.Command, args []string) error { - return a.run(o) + stopCh := make(chan struct{}) + return a.Run(o, stopCh) }, } @@ -57,35 +58,41 @@ func NewAgentCommand(a *Agent, o *options.GrpcProxyAgentOptions) *cobra.Command } type Agent struct { + adminServer *http.Server + healthServer *http.Server + + cs *agent.ClientSet } -func (a *Agent) run(o *options.GrpcProxyAgentOptions) error { +func (a *Agent) Run(o *options.GrpcProxyAgentOptions, stopCh <-chan struct{}) error { o.Print() if err := o.Validate(); err != nil { return fmt.Errorf("failed to validate agent options with %v", err) } - stopCh := make(chan struct{}) - cs, err := a.runProxyConnection(o, stopCh) if err != nil { return fmt.Errorf("failed to run proxy connection with %v", err) } + a.cs = cs if err := a.runHealthServer(o, cs); err != nil { return fmt.Errorf("failed to run health server with %v", err) } + defer a.healthServer.Close() if err := a.runAdminServer(o); err != nil { return fmt.Errorf("failed to run admin server with %v", err) } + defer a.adminServer.Close() <-stopCh + klog.V(1).Infoln("Shutting down agent.") return nil } -func (a *Agent) runProxyConnection(o *options.GrpcProxyAgentOptions, stopCh <-chan struct{}) (agent.ReadinessManager, error) { +func (a *Agent) runProxyConnection(o *options.GrpcProxyAgentOptions, stopCh <-chan struct{}) (*agent.ClientSet, error) { var tlsConfig *tls.Config var err error if tlsConfig, err = util.GetClientTLSConfig(o.CaCert, o.AgentCert, o.AgentKey, o.ProxyServerHost, o.AlpnProtos); err != nil { @@ -149,7 +156,7 @@ func (a *Agent) runHealthServer(o *options.GrpcProxyAgentOptions, cs agent.Readi // "/ready" is deprecated but being maintained for backward compatibility muxHandler.HandleFunc("/ready", readinessHandler) muxHandler.HandleFunc("/readyz", readinessHandler) - healthServer := &http.Server{ + a.healthServer = &http.Server{ Addr: net.JoinHostPort(o.HealthServerHost, strconv.Itoa(o.HealthServerPort)), Handler: muxHandler, MaxHeaderBytes: 1 << 20, @@ -160,7 +167,7 @@ func (a *Agent) runHealthServer(o *options.GrpcProxyAgentOptions, cs agent.Readi "core", "healthListener", "port", strconv.Itoa(o.HealthServerPort), ) - go runpprof.Do(context.Background(), labels, func(context.Context) { a.serveHealth(healthServer) }) + go runpprof.Do(context.Background(), labels, func(context.Context) { a.serveHealth(a.healthServer) }) return nil } @@ -197,7 +204,7 @@ func (a *Agent) runAdminServer(o *options.GrpcProxyAgentOptions) error { } } - adminServer := &http.Server{ + a.adminServer = &http.Server{ Addr: net.JoinHostPort(o.AdminBindAddress, strconv.Itoa(o.AdminServerPort)), Handler: muxHandler, MaxHeaderBytes: 1 << 20, @@ -208,7 +215,7 @@ func (a *Agent) runAdminServer(o *options.GrpcProxyAgentOptions) error { "core", "adminListener", "port", strconv.Itoa(o.AdminServerPort), ) - go runpprof.Do(context.Background(), labels, func(context.Context) { a.serveAdmin(adminServer) }) + go runpprof.Do(context.Background(), labels, func(context.Context) { a.serveAdmin(a.adminServer) }) return nil } @@ -220,3 +227,8 @@ func (a *Agent) serveAdmin(adminServer *http.Server) { } klog.V(0).Infoln("Admin server stopped listening") } + +// ClientSet exposes internal state for testing. +func (a *Agent) ClientSet() *agent.ClientSet { + return a.cs +} diff --git a/cmd/server/app/options/options.go b/cmd/server/app/options/options.go index 9249d8fc2..f887653c0 100644 --- a/cmd/server/app/options/options.go +++ b/cmd/server/app/options/options.go @@ -221,7 +221,7 @@ func (o *ProxyRunOptions) Validate() error { return fmt.Errorf("error checking cluster CA cert %s, got %v", o.ClusterCaCert, err) } } - if o.Mode != "grpc" && o.Mode != "http-connect" { + if o.Mode != server.ModeGRPC && o.Mode != server.ModeHTTPConnect { return fmt.Errorf("mode must be set to either 'grpc' or 'http-connect' not %q", o.Mode) } if o.UdsName != "" { diff --git a/cmd/server/app/server.go b/cmd/server/app/server.go index 090786e7a..6cc570569 100644 --- a/cmd/server/app/server.go +++ b/cmd/server/app/server.go @@ -59,7 +59,8 @@ func NewProxyCommand(p *Proxy, o *options.ProxyRunOptions) *cobra.Command { Use: "proxy", Long: `A gRPC proxy server, receives requests from the API server and forwards to the agent.`, RunE: func(cmd *cobra.Command, args []string) error { - return p.run(o) + stopCh := SetupSignalHandler() + return p.Run(o, stopCh) }, } @@ -81,11 +82,16 @@ func tlsCipherSuites(cipherNames []string) []uint16 { } type Proxy struct { + agentServer *grpc.Server + adminServer *http.Server + healthServer *http.Server + + server *server.ProxyServer } type StopFunc func() -func (p *Proxy) run(o *options.ProxyRunOptions) error { +func (p *Proxy) Run(o *options.ProxyRunOptions, stopCh <-chan struct{}) error { o.Print() if err := o.Validate(); err != nil { return fmt.Errorf("failed to validate server options with %v", err) @@ -126,37 +132,40 @@ func (p *Proxy) run(o *options.ProxyRunOptions) error { if err != nil { return err } - server := server.NewProxyServer(o.ServerID, ps, int(o.ServerCount), authOpt) + p.server = server.NewProxyServer(o.ServerID, ps, int(o.ServerCount), authOpt) - frontendStop, err := p.runFrontendServer(ctx, o, server) + frontendStop, err := p.runFrontendServer(ctx, o, p.server) if err != nil { return fmt.Errorf("failed to run the frontend server: %v", err) } + if frontendStop != nil { + defer frontendStop() + } klog.V(1).Infoln("Starting agent server for tunnel connections.") - err = p.runAgentServer(o, server) + err = p.runAgentServer(o, p.server) if err != nil { return fmt.Errorf("failed to run the agent server: %v", err) } + defer p.agentServer.Stop() + klog.V(1).Infoln("Starting admin server for debug connections.") - err = p.runAdminServer(o, server) + err = p.runAdminServer(o, p.server) if err != nil { return fmt.Errorf("failed to run the admin server: %v", err) } + defer p.adminServer.Close() + klog.V(1).Infoln("Starting health server for healthchecks.") - err = p.runHealthServer(o, server) + err = p.runHealthServer(o, p.server) if err != nil { return fmt.Errorf("failed to run the health server: %v", err) } + defer p.healthServer.Close() - stopCh := SetupSignalHandler() <-stopCh klog.V(1).Infoln("Shutting down server.") - if frontendStop != nil { - frontendStop() - } - return nil } @@ -379,6 +388,7 @@ func (p *Proxy) runAgentServer(o *options.ProxyRunOptions, server *server.ProxyS "port", strconv.FormatUint(uint64(o.AgentPort), 10), ) go runpprof.Do(context.Background(), labels, func(context.Context) { grpcServer.Serve(lis) }) + p.agentServer = grpcServer return nil } @@ -396,7 +406,7 @@ func (p *Proxy) runAdminServer(o *options.ProxyRunOptions, server *server.ProxyS runtime.SetBlockProfileRate(1) } } - adminServer := &http.Server{ + p.adminServer = &http.Server{ Addr: net.JoinHostPort(o.AdminBindAddress, strconv.Itoa(o.AdminPort)), Handler: muxHandler, MaxHeaderBytes: 1 << 20, @@ -408,7 +418,7 @@ func (p *Proxy) runAdminServer(o *options.ProxyRunOptions, server *server.ProxyS "port", strconv.FormatUint(uint64(o.AdminPort), 10), ) go runpprof.Do(context.Background(), labels, func(context.Context) { - err := adminServer.ListenAndServe() + err := p.adminServer.ListenAndServe() if err != nil { klog.ErrorS(err, "admin server could not listen") } @@ -438,7 +448,7 @@ func (p *Proxy) runHealthServer(o *options.ProxyRunOptions, server *server.Proxy // "/ready" is deprecated but being maintained for backward compatibility muxHandler.HandleFunc("/ready", readinessHandler) muxHandler.HandleFunc("/readyz", readinessHandler) - healthServer := &http.Server{ + p.healthServer = &http.Server{ Addr: net.JoinHostPort(o.HealthBindAddress, strconv.Itoa(o.HealthPort)), Handler: muxHandler, MaxHeaderBytes: 1 << 20, @@ -450,7 +460,7 @@ func (p *Proxy) runHealthServer(o *options.ProxyRunOptions, server *server.Proxy "port", strconv.FormatUint(uint64(o.HealthPort), 10), ) go runpprof.Do(context.Background(), labels, func(context.Context) { - err := healthServer.ListenAndServe() + err := p.healthServer.ListenAndServe() if err != nil { klog.ErrorS(err, "health server could not listen") } @@ -459,3 +469,8 @@ func (p *Proxy) runHealthServer(o *options.ProxyRunOptions, server *server.Proxy return nil } + +// ProxyServer exposes internal state for testing. +func (p *Proxy) ProxyServer() *server.ProxyServer { + return p.server +} diff --git a/cmd/server/main.go b/cmd/server/main.go index f020bbe8d..fdb1a736b 100644 --- a/cmd/server/main.go +++ b/cmd/server/main.go @@ -29,7 +29,6 @@ import ( ) func main() { - // flag.CommandLine.Parse(os.Args[1:]) proxy := &app.Proxy{} o := options.NewProxyRunOptions() command := app.NewProxyCommand(proxy, o) diff --git a/pkg/server/server.go b/pkg/server/server.go index 6681580cf..d65d5c4be 100644 --- a/pkg/server/server.go +++ b/pkg/server/server.go @@ -87,6 +87,11 @@ func (g *GrpcFrontend) Recv() (*client.Packet, error) { return pkt, nil } +const ( + ModeGRPC = "grpc" + ModeHTTPConnect = "http-connect" +) + type ProxyClientConnection struct { Mode string HTTP io.ReadWriter @@ -107,10 +112,10 @@ const ( func (c *ProxyClientConnection) send(pkt *client.Packet) error { defer func(start time.Time) { metrics.Metrics.ObserveFrontendWriteLatency(time.Since(start)) }(time.Now()) - if c.Mode == "grpc" { + if c.Mode == ModeGRPC { return c.frontend.Send(pkt) } - if c.Mode == "http-connect" { + if c.Mode == ModeHTTPConnect { if pkt.Type == client.PacketType_CLOSE_RSP { return c.CloseHTTP() } else if pkt.Type == client.PacketType_DIAL_CLS { diff --git a/pkg/server/tunnel.go b/pkg/server/tunnel.go index 0c2b8e743..5fb59a667 100644 --- a/pkg/server/tunnel.go +++ b/pkg/server/tunnel.go @@ -84,7 +84,7 @@ func (t *Tunnel) ServeHTTP(w http.ResponseWriter, r *http.Request) { closed := make(chan struct{}) connected := make(chan struct{}) connection := &ProxyClientConnection{ - Mode: "http-connect", + Mode: ModeHTTPConnect, HTTP: io.ReadWriter(conn), // pass as ReadWriter so the caller must close with CloseHTTP CloseHTTP: func() error { closeOnce.Do(func() { conn.Close() }) diff --git a/tests/agent_disconnect_test.go b/tests/agent_disconnect_test.go index e08d8ba8b..07c4ee1cc 100644 --- a/tests/agent_disconnect_test.go +++ b/tests/agent_disconnect_test.go @@ -29,8 +29,6 @@ import ( "testing" "time" - "google.golang.org/grpc" - "sigs.k8s.io/apiserver-network-proxy/konnectivity-client/pkg/client" "sigs.k8s.io/apiserver-network-proxy/tests/framework" ) @@ -92,7 +90,7 @@ func TestProxy_Agent_Disconnect_Persistent_Connection(t *testing.T) { } } -func TestProxy_Agent_Reconnect(t *testing.T) { +func TestAgentRestartReconnect(t *testing.T) { testcases := []struct { name string proxyServerFunction func(testing.TB) framework.ProxyServer @@ -176,7 +174,7 @@ func clientRequest(c *http.Client, addr string) ([]byte, error) { } func createGrpcTunnelClient(ctx context.Context, proxyAddr, addr string) (*http.Client, error) { - tunnel, err := client.CreateSingleUseGrpcTunnel(ctx, proxyAddr, grpc.WithInsecure()) + tunnel, err := createSingleUseGrpcTunnel(ctx, proxyAddr) if err != nil { return nil, err } @@ -192,7 +190,7 @@ func createGrpcTunnelClient(ctx context.Context, proxyAddr, addr string) (*http. } func createHTTPConnectClient(ctx context.Context, proxyAddr, addr string) (*http.Client, error) { - conn, err := net.Dial("tcp", proxyAddr) + conn, err := net.Dial("unix", proxyAddr) if err != nil { return nil, err } diff --git a/tests/benchmarks_test.go b/tests/benchmarks_test.go index 109c1bbd4..51a096ea9 100644 --- a/tests/benchmarks_test.go +++ b/tests/benchmarks_test.go @@ -23,9 +23,6 @@ import ( "net/http" "net/http/httptest" "testing" - - "google.golang.org/grpc" - "sigs.k8s.io/apiserver-network-proxy/konnectivity-client/pkg/client" ) func BenchmarkLargeResponse_GRPC(b *testing.B) { @@ -53,7 +50,7 @@ func BenchmarkLargeResponse_GRPC(b *testing.B) { for n := 0; n < b.N; n++ { // run test client - tunnel, err := client.CreateSingleUseGrpcTunnel(ctx, ps.FrontAddr(), grpc.WithInsecure()) + tunnel, err := createSingleUseGrpcTunnel(ctx, ps.FrontAddr()) if err != nil { b.Fatal(err) } @@ -122,7 +119,7 @@ func BenchmarkLargeRequest_GRPC(b *testing.B) { req.Close = true for n := 0; n < b.N; n++ { // run test client - tunnel, err := client.CreateSingleUseGrpcTunnel(ctx, ps.FrontAddr(), grpc.WithInsecure()) + tunnel, err := createSingleUseGrpcTunnel(ctx, ps.FrontAddr()) if err != nil { b.Fatal(err) } diff --git a/tests/concurrent_client_request_test.go b/tests/concurrent_client_request_test.go index 16502780a..97a646d01 100644 --- a/tests/concurrent_client_request_test.go +++ b/tests/concurrent_client_request_test.go @@ -27,9 +27,7 @@ import ( "testing" "time" - "google.golang.org/grpc" "k8s.io/apimachinery/pkg/util/wait" - "sigs.k8s.io/apiserver-network-proxy/konnectivity-client/pkg/client" ) type simpleServer struct { @@ -52,7 +50,7 @@ func (s *simpleServer) ServeHTTP(w http.ResponseWriter, req *http.Request) { // TODO: test http-connect as well. func getTestClient(front string, t *testing.T) *http.Client { ctx := context.Background() - tunnel, err := client.CreateSingleUseGrpcTunnel(ctx, front, grpc.WithInsecure()) + tunnel, err := createSingleUseGrpcTunnel(ctx, front) if err != nil { t.Fatal(err) } diff --git a/tests/concurrent_test.go b/tests/concurrent_test.go index 99fe2f711..3cd0d54b3 100644 --- a/tests/concurrent_test.go +++ b/tests/concurrent_test.go @@ -24,8 +24,6 @@ import ( "sync" "testing" - "google.golang.org/grpc" - "sigs.k8s.io/apiserver-network-proxy/konnectivity-client/pkg/client" "sigs.k8s.io/apiserver-network-proxy/tests/framework" ) @@ -48,7 +46,7 @@ func TestProxy_ConcurrencyGRPC(t *testing.T) { defer wg.Done() // run test client - tunnel, err := client.CreateSingleUseGrpcTunnel(ctx, ps.FrontAddr(), grpc.WithInsecure()) + tunnel, err := createSingleUseGrpcTunnel(ctx, ps.FrontAddr()) if err != nil { t.Error(err) return diff --git a/tests/framework/agent.go b/tests/framework/agent.go index adf849369..d7722a6fc 100644 --- a/tests/framework/agent.go +++ b/tests/framework/agent.go @@ -17,9 +17,20 @@ limitations under the License. package framework import ( + "context" + "fmt" + "log" + "net" + "path/filepath" + "strconv" + "sync" + "testing" "time" - "google.golang.org/grpc" + "k8s.io/apimachinery/pkg/util/wait" + + agentapp "sigs.k8s.io/apiserver-network-proxy/cmd/agent/app" + agentopts "sigs.k8s.io/apiserver-network-proxy/cmd/agent/app/options" "sigs.k8s.io/apiserver-network-proxy/pkg/agent" ) @@ -29,44 +40,62 @@ type AgentOpts struct { } type AgentRunner interface { - Start(AgentOpts) (Agent, error) + Start(testing.TB, AgentOpts) (Agent, error) } type Agent interface { GetConnectedServerCount() (int, error) Ready() bool - Stop() error + Stop() } - type InProcessAgentRunner struct{} -func (*InProcessAgentRunner) Start(opts AgentOpts) (Agent, error) { - cc := agent.ClientSetConfig{ - Address: opts.ServerAddr, - AgentID: opts.AgentID, - SyncInterval: 100 * time.Millisecond, - ProbeInterval: 100 * time.Millisecond, - DialOptions: []grpc.DialOption{grpc.WithInsecure()}, +func (*InProcessAgentRunner) Start(t testing.TB, opts AgentOpts) (Agent, error) { + a := &agentapp.Agent{} + o, err := agentOptions(t, opts) + if err != nil { + return nil, err } + ctx, cancel := context.WithCancel(context.Background()) stopCh := make(chan struct{}) - client := cc.NewAgentClientSet(stopCh) - client.Serve() + go func() { + if err := a.Run(o, stopCh); err != nil { + log.Printf("ERROR running agent: %v", err) + cancel() + } + }() + + healthAddr := net.JoinHostPort(o.HealthServerHost, strconv.Itoa(o.HealthServerPort)) + if err := wait.PollImmediateWithContext(ctx, 100*time.Millisecond, wait.ForeverTestTimeout, func(context.Context) (bool, error) { + return checkLiveness(healthAddr), nil + }); err != nil { + close(stopCh) + return nil, fmt.Errorf("agent never came up: %v", err) + } - return &inProcessAgent{ - client: client, - stopCh: stopCh, - }, nil + pa := &inProcessAgent{ + client: a.ClientSet(), + stopCh: stopCh, + healthAddr: healthAddr, + } + t.Cleanup(pa.Stop) + return pa, nil } type inProcessAgent struct { client *agent.ClientSet - stopCh chan struct{} + + stopOnce sync.Once + stopCh chan struct{} + + healthAddr string } -func (a *inProcessAgent) Stop() error { - close(a.stopCh) - return nil +func (a *inProcessAgent) Stop() { + a.stopOnce.Do(func() { + close(a.stopCh) + }) } func (a *inProcessAgent) GetConnectedServerCount() (int, error) { @@ -74,5 +103,40 @@ func (a *inProcessAgent) GetConnectedServerCount() (int, error) { } func (a *inProcessAgent) Ready() bool { - return a.client.Ready() + return checkReadiness(a.healthAddr) +} + +func agentOptions(t testing.TB, opts AgentOpts) (*agentopts.GrpcProxyAgentOptions, error) { + o := agentopts.NewGrpcProxyAgentOptions() + + host, port, err := net.SplitHostPort(opts.ServerAddr) + if err != nil { + return nil, fmt.Errorf("failed to parse ServerAddr: %w", err) + } + o.ProxyServerHost = host + if o.ProxyServerPort, err = strconv.Atoi(port); err != nil { + return nil, fmt.Errorf("invalid server port: %w", err) + } + + o.AgentID = opts.AgentID + o.SyncInterval = 100 * time.Millisecond + o.SyncIntervalCap = 1 * time.Second + o.ProbeInterval = 100 * time.Millisecond + + o.AgentCert = filepath.Join(CertsDir, TestAgentCertFile) + o.AgentKey = filepath.Join(CertsDir, TestAgentKeyFile) + o.CaCert = filepath.Join(CertsDir, TestCAFile) + + const localhost = "127.0.0.1" + o.HealthServerHost = localhost + o.AdminBindAddress = localhost + + ports, err := FreePorts(2) + if err != nil { + return nil, err + } + o.HealthServerPort = ports[0] + o.AdminServerPort = ports[1] + + return o, nil } diff --git a/tests/framework/certs.go b/tests/framework/certs.go new file mode 100644 index 000000000..7456d719a --- /dev/null +++ b/tests/framework/certs.go @@ -0,0 +1,195 @@ +/* +Copyright 2023 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package framework + +import ( + "fmt" + "os" + "path/filepath" +) + +var CertsDir string // Initialized with InitCertsDir + +// The certificates & keys in this file are insecure and for testing use only. +// Use `make certs` to generate certificates & keys for other uses. + +const testCA = `-----BEGIN CERTIFICATE----- +MIIDSDCCAjCgAwIBAgIUWvgHsWuJgLnna8Yl8I+rBui0XGIwDQYJKoZIhvcNAQEL +BQAwFTETMBEGA1UEAwwKMTI3LjAuMC4xQDAeFw0yMzEwMDYyMDAxMzdaFw0zMzEw +MDMyMDAxMzdaMBUxEzARBgNVBAMMCjEyNy4wLjAuMUAwggEiMA0GCSqGSIb3DQEB +AQUAA4IBDwAwggEKAoIBAQCCJ3zvMr+obQNH6W268sOWO089pl98cQ3r5CxkJrNk +I59dOAtaEnCYSo+WNUZI+7hsEUqHb/bnJQu4ydTA+Y5LEwwtW3kcDterVakPd7oG +i6XqxoQat+5wrXXhASmZnZv6ilgqOyDCJhJsT4QljdG4bHLJFfW8wWdrHZpII87a +OP3yvUPX0grQkBNKiwvxvmTpa4R2EysaEiBSemGF0kIF5FI7lfkjGXjQBNeFW74s +KW+aToflU2Tus5U9CXpOzjwYJZVKvAEV4FtjCjCIwIPc2VLx3nv89o7ui4ax1y3I +UrtlaPe7XWfWEUo9hMgo33+1SrMwWcJMRiM3KRtqD7XPAgMBAAGjgY8wgYwwHQYD +VR0OBBYEFNXp9CUZJqP3z9oVX92PGy0prsriMFAGA1UdIwRJMEeAFNXp9CUZJqP3 +z9oVX92PGy0prsrioRmkFzAVMRMwEQYDVQQDDAoxMjcuMC4wLjFAghRa+Aexa4mA +uedrxiXwj6sG6LRcYjAMBgNVHRMEBTADAQH/MAsGA1UdDwQEAwIBBjANBgkqhkiG +9w0BAQsFAAOCAQEAH2xugrzi0T7CK17QEdsA5iING2Fvjf7Oe7L4c/knvlkaSeJZ +kEYJEGbzD8mMJPPz7Q8/zJMlFxbKOVrNxipUaFb2VJDcZg4fKbRwkUjIB+nvCrfy +fLei2XRgjW5fG94G0Z+E0hD6KQxY6wEAfyZnpsBJdETo4bYAkd5h4h6dwgFx66NF +ERjoxlSBVsnqksGID1/Q3pNdRP1si9OyLTs+WFVCbGW2O4+czY6EeGw9BEMtS35M +ZFow5H5OAshlvxbUftmK0TEnFhs509sKjgJVZMpSIr7ImqLPmRytJJ8zCoNQevuc +zVji1R3oGpF5D2FBzbtMuT3ydGX9DPxgt1s8Kw== +-----END CERTIFICATE----- +` + +const testServerCert = `-----BEGIN CERTIFICATE----- +MIIDczCCAlugAwIBAgIBATANBgkqhkiG9w0BAQsFADAVMRMwEQYDVQQDDAoxMjcu +MC4wLjFAMB4XDTIzMTAwNjIwMDEzN1oXDTMzMTAwMzIwMDEzN1owGTEXMBUGA1UE +AwwOcHJveHktZnJvbnRlbmQwggEiMA0GCSqGSIb3DQEBAQUAA4IBDwAwggEKAoIB +AQC5MCsFMaac22FuamGZcUhd2WoMTasFt//o3ondhRKhabFgUIoiQqVGUz/Pe/vI +Et5ixDU3wntGRBcceBMkOO9kWdFUQ1Rzxk1amg926ROA0LND4w2tVCsFHbCfny7A +PmnV40m3ayzIknGvnGNl52RP2vRTr1zocMkd3brRBESeAt2C/Fr8MjcsGLBIjAab +mp1P6bozOgjEZOA4gMCVVIl5VvZ2O1xgoVU7Q5RQBu+xydBmMcdKMjQh4IkxEuSH +HEJ9YVVZEmi11yqv+t6cTf2rnj32iW5oLJRbY+mzRPnfpTDsAM52gUXVJSGujohf +VTIZ9OrW3/8YXx2O7CBJFlWXAgMBAAGjgckwgcYwCQYDVR0TBAIwADAdBgNVHQ4E +FgQUK+SBt6V3wln+gF7YNxLeaIydk7kwUAYDVR0jBEkwR4AU1en0JRkmo/fP2hVf +3Y8bLSmuyuKhGaQXMBUxEzARBgNVBAMMCjEyNy4wLjAuMUCCFFr4B7FriYC552vG +JfCPqwbotFxiMBMGA1UdJQQMMAoGCCsGAQUFBwMBMAsGA1UdDwQEAwIFoDAmBgNV +HREEHzAdggprdWJlcm5ldGVzgglsb2NhbGhvc3SHBH8AAAEwDQYJKoZIhvcNAQEL +BQADggEBAETRBhlXT42bBm77k+C+lqc6EI+swinal1PmeLiOjm1o/66l4wF/XD3V +z167HsQlJ4cP6wMDOHhP7VLVxywhbwO43mXt0Q3SZ2vyJBjosmOC+8g1XLBL5MQT +NJjBjFf2mObWB6DM5XCfRLbMKA+odqoWl4sRvhqg8LuxhScb4Ul/IyT4HpULBhNT +oSjj8gGCbUT3qsERlopx6KUC1doHw3b/faKMfY0zSPfn5HIPWBdhE1FumXGuJ/oi +bauj4tNadWNkXrUQ01/aRnOb0zTCCbmvCbfWsjkdw6Oku9yfEvgndClY56O5lXOK +QvPH9fTYK2ruBXC9zikrAXTs6QpBAgY= +-----END CERTIFICATE----- +` + +const testServerKey = `-----BEGIN PRIVATE KEY----- +MIIEvgIBADANBgkqhkiG9w0BAQEFAASCBKgwggSkAgEAAoIBAQC5MCsFMaac22Fu +amGZcUhd2WoMTasFt//o3ondhRKhabFgUIoiQqVGUz/Pe/vIEt5ixDU3wntGRBcc +eBMkOO9kWdFUQ1Rzxk1amg926ROA0LND4w2tVCsFHbCfny7APmnV40m3ayzIknGv +nGNl52RP2vRTr1zocMkd3brRBESeAt2C/Fr8MjcsGLBIjAabmp1P6bozOgjEZOA4 +gMCVVIl5VvZ2O1xgoVU7Q5RQBu+xydBmMcdKMjQh4IkxEuSHHEJ9YVVZEmi11yqv ++t6cTf2rnj32iW5oLJRbY+mzRPnfpTDsAM52gUXVJSGujohfVTIZ9OrW3/8YXx2O +7CBJFlWXAgMBAAECggEAMv80QaRoJPb28ECkaux6yLloDkZPK+59LyQlZBbSyBeC +jKrxNzkSKXkgb+NNNU4Y5qrwms/YQcPbd3ALmWSCbCid0C4QcidwQtx9GLpbsBQI +4c+DgzFT/X8tFe/woGkvnQKP2M5PUVaerwUKjFP52FHMCcWXeL0ibTKT0R5zRO2+ +Bfo2BmQlzskooq2qhGgzLZ6mXtnDQVTIeRdoB4kgtqZzzPzWO6S8oeSh4vH7gmoO +NzF9mUlYvbvYuQ2tZp+mLqb5qub5kLCIa1pGESnHWy58hV4X1uRSwGvH4WyR3A93 +4OpD/Jv1KISgXFJW0QTBH4Ll2cvcqq1XmTNvWhV0wQKBgQDj3WoDyPkJiW+KfOIz +yQ+tPS+F8dyEe/TQltRHUUbJ69VpPhb5hAUjeOzX5FDjJl4P/4v1K5SCpNelFGeP +eXvAGX4kGh0zCX7SrEdQfsxR2oRWzew9zOJ4wdrz3H3cCBl5P2WZvaj/kaUoHmOW +HE/YLM/NWhNwgFdnBCJhJgBdVwKBgQDQDciyW3EeyOy3zxsizWXNAPo/o3yYFzdK +CghuleP/trXk6uk9VD69tiZ7hGMunjGMrgBbr9VjtkpXHdkX0C9TFjPFiRQVVF6g +SZ9g368uyulozXfRLbsfguN2SyS3zUV2CSj0uNh58kCiZ8uF9oOkv1IarRhh4DHz +klOqIt5hwQKBgQDKEsX8e1LW8Um4j81uTUUYxeUKLRX5a5ANF2VDpcFYOkt03Ho1 +Zq3D6m5nevN8rb7HA0IT90TpotQWcoTwiLSFBFaIH5x7cVVF8UABE6GQiW/JJy71 +E2hX3NqWXphC8+/bRayNbdOcaYYEkQaRzaPFOuBB5TrODxLzqYfvjWrPWwKBgQCP +gtKLZNP0njfa2jsnmHK+JAx6VTUeW/VBVwZV8YKh4tA5JWjZawEUL08AKGOZxnj7 +RxLsK6+P5jAFQ4t6B5p9P3VarqFxzQ6wldggJGtcZY73QbOCUH8gz1JDSLX9KtTd +BJiBpfd8toOrAtm6gD5yJ55k1D1bViBemPKpCwBGgQKBgCN+2x414z/Ok4Bc79+M +nnL5xgORgGCcqWhfl9nEfcG+NqVu2WnyRoGmk5OPRavfkTnmqqnZhs508+WVKL6O +gBiLDxqRqqdCGSDEN06iUHseTiFmT7nSlOKVhOdUTVoxEXJYe7qD3Rj0U+QC1CNb +ZSwx3ZMKdQL8ikQf7cEZuV9O +-----END PRIVATE KEY----- +` + +const testAgentCert = `-----BEGIN CERTIFICATE----- +MIIDSDCCAjCgAwIBAgIBAjANBgkqhkiG9w0BAQsFADAVMRMwEQYDVQQDDAoxMjcu +MC4wLjFAMB4XDTIzMTAwNjIwMDEzOFoXDTMzMTAwMzIwMDEzOFowFjEUMBIGA1UE +AwwLcHJveHktYWdlbnQwggEiMA0GCSqGSIb3DQEBAQUAA4IBDwAwggEKAoIBAQDB +5emcdqRr2VJ23plXWRhDbfDe/lyqZ44lkfrjGhh6fIRIWfjnCYhhPSLiWiG0LIf1 +RGSyLJ5jQcs8pwAMwE3KUc0tA4/whF088QXExOtnSlIvZG2pTXmeuMXxidlcv3jE +mI2Y0gcvvFhfkRuxckJSuvfaOjVbL1dBJe0W9m3oVqZLLYuZ8KGMvJBGWipjGlaE +EX3oP8S2ef7JwVMJ4vJRC+yGOIGoEDkUGFxiZLGxnq8qGCYEU8upOCCJt7LCvwQh +SB3/PQreq1q8qcc/sKsTF8QV09k+VbQ77n8HFGttDnbHYRPX6C9fNyU2bqwJOvd0 +giUKAwyL7D/MnF6K9Z11AgMBAAGjgaEwgZ4wCQYDVR0TBAIwADAdBgNVHQ4EFgQU +9++eLNBpGZ6qurgprQQTaduj1PEwUAYDVR0jBEkwR4AU1en0JRkmo/fP2hVf3Y8b +LSmuyuKhGaQXMBUxEzARBgNVBAMMCjEyNy4wLjAuMUCCFFr4B7FriYC552vGJfCP +qwbotFxiMBMGA1UdJQQMMAoGCCsGAQUFBwMCMAsGA1UdDwQEAwIHgDANBgkqhkiG +9w0BAQsFAAOCAQEAPra+ZeyI5X86PZuHOSq/s8xWMEAo34B//N7ipv4yyYUtlcOl +WNBtRWi9gtnQz1NAZplsjxMDSKCTuZScNtUUMJLpoTPzfE2UdvLN4eZ2hJGKZLXD +qlljvKGTcFyzcxSXcO3lqWJP6jhnb5JIgiK3qqW/UXTY8DEN1h9P+v9lcP7oOjTP +smXGG+fREUlt0dyTkJWcP4m/84XmhRCbktQ7nYnk4f3Yq0eq8bkZ+BCAoMePrYf9 +nTXUWUjxbwRWvbtd8bKm2BkWLeVyNYxxghUZg0wycIV556lkNARO+mN6Q3hDGx7n +zAAE7B/05pCD7R7zER4/I0S/rgZbyNFpq/N6tA== +-----END CERTIFICATE----- +` + +const testAgentKey = `-----BEGIN PRIVATE KEY----- +MIIEvQIBADANBgkqhkiG9w0BAQEFAASCBKcwggSjAgEAAoIBAQDB5emcdqRr2VJ2 +3plXWRhDbfDe/lyqZ44lkfrjGhh6fIRIWfjnCYhhPSLiWiG0LIf1RGSyLJ5jQcs8 +pwAMwE3KUc0tA4/whF088QXExOtnSlIvZG2pTXmeuMXxidlcv3jEmI2Y0gcvvFhf +kRuxckJSuvfaOjVbL1dBJe0W9m3oVqZLLYuZ8KGMvJBGWipjGlaEEX3oP8S2ef7J +wVMJ4vJRC+yGOIGoEDkUGFxiZLGxnq8qGCYEU8upOCCJt7LCvwQhSB3/PQreq1q8 +qcc/sKsTF8QV09k+VbQ77n8HFGttDnbHYRPX6C9fNyU2bqwJOvd0giUKAwyL7D/M +nF6K9Z11AgMBAAECggEAManDxjGVN5J4Tr4BJKBLWKoGMfeQoIzZmcHkMtryPh06 +fJWe7P5CEjXog3V2gIGPaUDVUdWf0+h8N9LGbn2q7xE4rjjlW0Nr5joNsjKF4PTm +TAE7HUwcxIyrFoyqQdlBA4nXarcQ5Ccns4KlRzPuzOXaqeiS1gIwJR2jtmf0CrgE +ykFyLGhfSvc/fiCr7EDxr/eOANx7mf6ONsPHOIEA/mvs/V48OndsZdgryrVvsvXJ +/Mn2iw/S6CbmQtTjW+2zbknh3wbRiVC3ZToCtg2xIYmh04TjW+eb3Ly84BAflMbB ++nTuT8cl7qe407RaAX5YOxLyog86yPKUDXqRl1CZmQKBgQDfWMJ4uIopTNBpjgIK +yEq4HxYLLYcHanjjAhG15gwATmqr6h4q7uotR1Lstz2n//xrysDFTc90wB3eiciI +xcdmXrx+oDjVoiq1/lui2lqFgQkOPvkwaCa5u82qjrTW567jB7HrwhJ4KDf4Z3Z2 +vjgzHGDldrzZKKujm9Rbylnt3wKBgQDePvlEBVKl84H32UDCKgiJHvZLOTRTLK1B +pJQpoEhjdFs6y3QFB3toK4AkYI6tFb94IaMnalF+aRccHu/VB1olbmbla2O6+92h +0OuXqRMOP6Fm+Z11u/+VI2y5T8KvcLAmoEDxSyjtsZaIK2bnJnbr2kry0WCpytCn +ag3IOLh3KwKBgQC1OzXadYwOxTjcXhIEI9CVpQvjGBdYiin7soMigcA9Q2RFiZzf +I6y7/wMn9+y89Pgjk4tmzqPHTdku6cjiSvJpe/giG+riV0unD/XVqK8JY9IwUCMu +B2VdEypo+pF9TNRZfrX94yXPgHsiQvoaknHR73Yk3HuTDvBvuxPPQ9xDKwKBgE7P +dgUw/gXrPANv/w7baPt3B0/VkUCNb0L/4aqBNCpQcKmAzDucU561DlPYCcBHHgaz +pu+rPArfqVpHfjTEzqrHY6WnV05PUmC3fVPimOdMmSezDKtbZ16zmTJ9nkQoac7I +tT7bsD/Z4c+X1H3Tngg0+K7yoJyVVziG2yxNMNzRAoGAFpFz1nci58g7UobjvM9q +TZoVJGWe0YlufRJ7TiWnHGqptgHr+Eot4a8PfKLikhDMXQUSf5VjqK23IrX+pW2R +DHNQLqjuhU8espwRnA1qzWd3Ss1LccVe/v+2EQcVio95mK4iEGlNXT1bkMcDGhFz +UMKSzlENDkz1zDPyaQmd9wI= +-----END PRIVATE KEY----- +` + +const ( + TestCAFile = "anp-test-ca.crt" + TestServerCertFile = "anp-test-server.crt" + TestServerKeyFile = "anp-test-server.key" + TestAgentCertFile = "anp-test-agent.crt" + TestAgentKeyFile = "anp-test-agent.key" +) + +// InitCertsDir writes the above certificates & keys to a temporary directory. +func InitCertsDir() error { + dir, err := os.MkdirTemp("", "anp-testing") + if err != nil { + return fmt.Errorf("failed to create temp directory: %w", err) + } + CertsDir = dir + defer func() { + if r := recover(); r != nil && err == nil { + err = fmt.Errorf("panic: %v", r) + } else if err == nil { + return + } + // Something failed; clean up the temp directory. + os.RemoveAll(dir) + }() + + for filename, contents := range map[string]string{ + TestCAFile: testCA, + TestServerCertFile: testServerCert, + TestServerKeyFile: testServerKey, + TestAgentCertFile: testAgentCert, + TestAgentKeyFile: testAgentKey, + } { + if err := os.WriteFile(filepath.Join(dir, filename), []byte(contents), 0600); err != nil { + return fmt.Errorf("failed to write test %s: %w", filename, err) + } + } + + return nil +} diff --git a/tests/framework/proxy_server.go b/tests/framework/proxy_server.go index 386ad258b..ad9c6c64e 100644 --- a/tests/framework/proxy_server.go +++ b/tests/framework/proxy_server.go @@ -19,210 +19,146 @@ package framework import ( "context" "fmt" + "log" "net" - "net/http" - "sync/atomic" + "path/filepath" + "strconv" + "sync" + "testing" "time" "github.com/google/uuid" - "google.golang.org/grpc" - clientproto "sigs.k8s.io/apiserver-network-proxy/konnectivity-client/proto/client" + "k8s.io/apimachinery/pkg/util/wait" + serverapp "sigs.k8s.io/apiserver-network-proxy/cmd/server/app" + serveropts "sigs.k8s.io/apiserver-network-proxy/cmd/server/app/options" "sigs.k8s.io/apiserver-network-proxy/pkg/server" - agentproto "sigs.k8s.io/apiserver-network-proxy/proto/agent" ) type ProxyServerOpts struct { ServerCount int Mode string + AgentPort int // Defaults to random port. } type ProxyServerRunner interface { - Start(ProxyServerOpts) (ProxyServer, error) + Start(testing.TB, ProxyServerOpts) (ProxyServer, error) } type ProxyServer interface { - ConnectedBackends() int - FrontendConnectionCount() int + ConnectedBackends() (int, error) AgentAddr() string FrontAddr() string Ready() bool - Stop() error + Stop() } type InProcessProxyServerRunner struct{} -func (*InProcessProxyServerRunner) Start(opts ProxyServerOpts) (ProxyServer, error) { - switch opts.Mode { - case "http": - return startHTTP(opts) - case "grpc": - return startGRPC(opts) - default: - panic("must specify proxy server mode") - } -} - -func startGRPC(opts ProxyServerOpts) (ProxyServer, error) { - var err error - - commonServer, err := startInProcessCommonProxyServer(opts) +func (*InProcessProxyServerRunner) Start(t testing.TB, opts ProxyServerOpts) (ProxyServer, error) { + s := &serverapp.Proxy{} + o, err := serverOptions(t, opts) if err != nil { - return nil, err + return nil, fmt.Errorf("error building server options: %w", err) } - ps := &inProcessGRPCProxyServer{ - inProcessCommonProxyServer: commonServer, - grpcServer: grpc.NewServer(), - } + ctx, cancel := context.WithCancel(context.Background()) + stopCh := make(chan struct{}) + go func() { + if err := s.Run(o, stopCh); err != nil { + log.Printf("ERROR running proxy server: %v", err) + cancel() + } + }() - clientproto.RegisterProxyServiceServer(ps.grpcServer, ps.proxyServer) - ps.grpcListener, err = net.Listen("tcp", "") - if err != nil { - return ps, err + healthAddr := net.JoinHostPort(o.HealthBindAddress, strconv.Itoa(o.HealthPort)) + if err := wait.PollImmediateWithContext(ctx, 100*time.Millisecond, wait.ForeverTestTimeout, func(context.Context) (bool, error) { + return checkLiveness(healthAddr), nil + }); err != nil { + close(stopCh) + return nil, fmt.Errorf("server never came up: %v", err) } - go ps.grpcServer.Serve(ps.grpcListener) + ps := &inProcessProxyServer{ + proxyServer: s.ProxyServer(), + stopCh: stopCh, + mode: opts.Mode, + healthAddr: healthAddr, + agentAddr: net.JoinHostPort(o.AgentBindAddress, strconv.Itoa(o.AgentPort)), + frontAddr: o.UdsName, + } + t.Cleanup(ps.Stop) return ps, nil } -type inProcessGRPCProxyServer struct { - *inProcessCommonProxyServer +type inProcessProxyServer struct { + proxyServer *server.ProxyServer - grpcServer *grpc.Server - grpcListener net.Listener -} + stopOnce sync.Once + stopCh chan struct{} -func (ps *inProcessGRPCProxyServer) Stop() error { - ps.stopCommonProxyServer() + mode string - if ps.grpcListener != nil { - ps.grpcListener.Close() - } - ps.grpcServer.Stop() - return nil + healthAddr string + agentAddr string + frontAddr string } -func (ps *inProcessGRPCProxyServer) FrontAddr() string { - return ps.grpcListener.Addr().String() +func (ps *inProcessProxyServer) Stop() { + ps.stopOnce.Do(func() { + close(ps.stopCh) + }) } -func (ps *inProcessGRPCProxyServer) FrontendConnectionCount() int { - panic("unimplemented: inProcessGRPCProxyServer.FrontendConnectionCount") // FIXME: consider reading from metrics -} - -func startHTTP(opts ProxyServerOpts) (ProxyServer, error) { - var err error - - commonServer, err := startInProcessCommonProxyServer(opts) - if err != nil { - return nil, err - } - - ps := &inProcessHTTPProxyServer{ - inProcessCommonProxyServer: commonServer, - } - - // http-connect - handler := &server.Tunnel{ - Server: ps.proxyServer, - } - ps.httpServer = &http.Server{ - ReadHeaderTimeout: 60 * time.Second, - Handler: http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - atomic.AddInt32(&ps.httpConnectionCount, 1) - defer atomic.AddInt32(&ps.httpConnectionCount, -1) - handler.ServeHTTP(w, r) - }), - } - ps.httpListener, err = net.Listen("tcp", "") - if err != nil { - return ps, err - } - - go func() { - err := ps.httpServer.Serve(ps.httpListener) - if err != nil { - fmt.Println("http connect server error: ", err) - } - }() - - return ps, nil +func (ps *inProcessProxyServer) AgentAddr() string { + return ps.agentAddr } -type inProcessHTTPProxyServer struct { - *inProcessCommonProxyServer - - httpServer *http.Server - httpListener net.Listener - - httpConnectionCount int32 // atomic +func (ps *inProcessProxyServer) FrontAddr() string { + return ps.frontAddr } -func (ps *inProcessHTTPProxyServer) Stop() error { - ps.stopCommonProxyServer() - - if ps.httpListener != nil { - ps.httpListener.Close() +func (ps *inProcessProxyServer) ConnectedBackends() (int, error) { + numBackends := 0 + for _, bm := range ps.proxyServer.BackendManagers { + numBackends += bm.NumBackends() } - ps.httpServer.Shutdown(context.Background()) - - return nil + return numBackends, nil } -func (ps *inProcessHTTPProxyServer) FrontAddr() string { - return ps.httpListener.Addr().String() +func (ps *inProcessProxyServer) Ready() bool { + return checkReadiness(ps.healthAddr) } -func (ps *inProcessHTTPProxyServer) FrontendConnectionCount() int { - return int(atomic.LoadInt32(&ps.httpConnectionCount)) -} +func serverOptions(t testing.TB, opts ProxyServerOpts) (*serveropts.ProxyRunOptions, error) { + o := serveropts.NewProxyRunOptions() -// inProcessCommonProxyServer handles the common agent (backend) serving shared between the -// inProccessGRPCProxyServer and inProcessHTTPProxyServer -type inProcessCommonProxyServer struct { - proxyServer *server.ProxyServer + o.ServerCount = uint(opts.ServerCount) + o.Mode = opts.Mode - agentServer *grpc.Server - agentListener net.Listener -} + uid := uuid.New().String() + o.UdsName = filepath.Join(CertsDir, fmt.Sprintf("server-%s.sock", uid)) + o.ServerPort = 0 // Required for UDS -func startInProcessCommonProxyServer(opts ProxyServerOpts) (*inProcessCommonProxyServer, error) { - s := server.NewProxyServer(uuid.New().String(), []server.ProxyStrategy{server.ProxyStrategyDefault}, opts.ServerCount, &server.AgentTokenAuthenticationOptions{}) - ps := &inProcessCommonProxyServer{ - proxyServer: s, - agentServer: grpc.NewServer(), - } - agentproto.RegisterAgentServiceServer(ps.agentServer, s) - var err error - ps.agentListener, err = net.Listen("tcp", "") - if err != nil { - return ps, err - } - go ps.agentServer.Serve(ps.agentListener) + o.ClusterCert = filepath.Join(CertsDir, TestServerCertFile) + o.ClusterKey = filepath.Join(CertsDir, TestServerKeyFile) + o.ClusterCaCert = filepath.Join(CertsDir, TestCAFile) - return ps, nil -} + const localhost = "127.0.0.1" + o.AgentBindAddress = localhost + o.HealthBindAddress = localhost + o.AdminBindAddress = localhost -func (ps *inProcessCommonProxyServer) stopCommonProxyServer() { - if ps.agentListener != nil { - ps.agentListener.Close() + ports, err := FreePorts(3) + if err != nil { + return nil, err } - ps.agentServer.Stop() -} - -func (ps *inProcessCommonProxyServer) AgentAddr() string { - return ps.agentListener.Addr().String() -} - -func (ps *inProcessCommonProxyServer) ConnectedBackends() int { - numBackends := 0 - for _, bm := range ps.proxyServer.BackendManagers { - numBackends += bm.NumBackends() + if opts.AgentPort != 0 { + o.AgentPort = opts.AgentPort + } else { + o.AgentPort = ports[0] } - return numBackends -} + o.HealthPort = ports[1] + o.AdminPort = ports[2] -func (ps *inProcessCommonProxyServer) Ready() bool { - ready, _ := ps.proxyServer.Readiness.Ready() - return ready + return o, nil } diff --git a/tests/framework/util.go b/tests/framework/util.go new file mode 100644 index 000000000..61c4ba79f --- /dev/null +++ b/tests/framework/util.go @@ -0,0 +1,63 @@ +/* +Copyright 2023 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package framework + +import ( + "fmt" + "net" + "net/http" + "strconv" +) + +func checkReadiness(addr string) bool { + resp, err := http.Get(fmt.Sprintf("http://%s/readyz", addr)) + if err != nil { + return false + } + resp.Body.Close() + return resp.StatusCode == http.StatusOK +} + +func checkLiveness(addr string) bool { + resp, err := http.Get(fmt.Sprintf("http://%s/healthz", addr)) + if err != nil { + return false + } + resp.Body.Close() + return resp.StatusCode == http.StatusOK +} + +// FreePorts finds [count] available ports. +func FreePorts(count int) ([]int, error) { + ports := make([]int, count) + for i := 0; i < count; i++ { + l, err := net.Listen("tcp", "127.0.0.1:0") + if err != nil { + return nil, fmt.Errorf("failed to reserve ports: %w", err) + } + defer l.Close() + _, p, err := net.SplitHostPort(l.Addr().String()) + if err != nil { + return nil, fmt.Errorf("failed to reserve ports: %w", err) + } + ports[i], err = strconv.Atoi(p) + if err != nil { + return nil, fmt.Errorf("failed to reserve ports: %w", err) + } + } + return ports, nil +} diff --git a/tests/ha_proxy_server_test.go b/tests/ha_proxy_server_test.go index 77e59bc42..ea0ae9265 100644 --- a/tests/ha_proxy_server_test.go +++ b/tests/ha_proxy_server_test.go @@ -28,8 +28,6 @@ import ( "testing" "time" - "google.golang.org/grpc" - "sigs.k8s.io/apiserver-network-proxy/konnectivity-client/pkg/client" "sigs.k8s.io/apiserver-network-proxy/tests/framework" ) @@ -90,6 +88,7 @@ func (lb *tcpLB) addBackend(backend string) { lb.mu.Lock() defer lb.mu.Unlock() lb.backends = append(lb.backends, backend) + } func (lb *tcpLB) removeBackend(backend string) { @@ -175,8 +174,9 @@ func TestBasicHAProxyServer_GRPC(t *testing.T) { } func testProxyServer(t *testing.T, front string, target string) { - ctx := context.Background() - tunnel, err := client.CreateSingleUseGrpcTunnel(ctx, front, grpc.WithInsecure()) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() // Close the tunnel + tunnel, err := createSingleUseGrpcTunnel(ctx, front) if err != nil { t.Fatal(err) } @@ -194,6 +194,7 @@ func testProxyServer(t *testing.T, front string, target string) { } data, err := io.ReadAll(r.Body) + r.Body.Close() if err != nil { t.Error(err) } diff --git a/tests/main_test.go b/tests/main_test.go index 04db0dd37..e89a146c1 100644 --- a/tests/main_test.go +++ b/tests/main_test.go @@ -18,6 +18,8 @@ package tests import ( "flag" + "log" + "os" "testing" "github.com/prometheus/client_golang/prometheus" @@ -38,5 +40,13 @@ func TestMain(m *testing.M) { fs.Set("v", "1") // Set klog verbosity. metricsclient.Metrics.RegisterMetrics(prometheus.DefaultRegisterer) + err := framework.InitCertsDir() + if err != nil { + log.Fatalf("Failed to write test certs: %v", err) + } + defer func() { + os.RemoveAll(framework.CertsDir) + }() + m.Run() } diff --git a/tests/proxy_test.go b/tests/proxy_test.go index 60348ad7a..f76194c57 100644 --- a/tests/proxy_test.go +++ b/tests/proxy_test.go @@ -26,6 +26,7 @@ import ( "net/http" "net/http/httptest" "net/url" + "path/filepath" "strings" "sync" "testing" @@ -34,14 +35,18 @@ import ( "github.com/google/uuid" "go.uber.org/goleak" "google.golang.org/grpc" + "google.golang.org/grpc/credentials" + "google.golang.org/grpc/credentials/insecure" "google.golang.org/grpc/metadata" "k8s.io/apimachinery/pkg/util/wait" "sigs.k8s.io/apiserver-network-proxy/konnectivity-client/pkg/client" metricsclient "sigs.k8s.io/apiserver-network-proxy/konnectivity-client/pkg/client/metrics" clientmetricstest "sigs.k8s.io/apiserver-network-proxy/konnectivity-client/pkg/common/metrics/testing" metricsagent "sigs.k8s.io/apiserver-network-proxy/pkg/agent/metrics" + "sigs.k8s.io/apiserver-network-proxy/pkg/server" metricsserver "sigs.k8s.io/apiserver-network-proxy/pkg/server/metrics" metricstest "sigs.k8s.io/apiserver-network-proxy/pkg/testing/metrics" + "sigs.k8s.io/apiserver-network-proxy/pkg/util" agentproto "sigs.k8s.io/apiserver-network-proxy/proto/agent" "sigs.k8s.io/apiserver-network-proxy/proto/header" "sigs.k8s.io/apiserver-network-proxy/tests/framework" @@ -118,7 +123,6 @@ func (s *delayedServer) ServeHTTP(w http.ResponseWriter, r *http.Request) { func TestBasicProxy_GRPC(t *testing.T) { expectCleanShutdown(t) - ctx := context.Background() server := httptest.NewServer(newEchoServer("hello")) defer server.Close() @@ -130,7 +134,9 @@ func TestBasicProxy_GRPC(t *testing.T) { waitForConnectedServerCount(t, 1, a) // run test client - tunnel, err := client.CreateSingleUseGrpcTunnel(ctx, ps.FrontAddr(), grpc.WithInsecure()) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + tunnel, err := createSingleUseGrpcTunnel(ctx, ps.FrontAddr()) if err != nil { t.Fatal(err) } @@ -143,18 +149,18 @@ func TestBasicProxy_GRPC(t *testing.T) { req, err := http.NewRequest("GET", server.URL, nil) if err != nil { - t.Error(err) + t.Fatal(err) } r, err := c.Do(req) if err != nil { - t.Error(err) + t.Fatal(err) } defer r.Body.Close() data, err := io.ReadAll(r.Body) if err != nil { - t.Error(err) + t.Fatal(err) } if string(data) != "hello" { t.Errorf("expect %v; got %v", "hello", string(data)) @@ -175,7 +181,7 @@ func TestProxyHandleDialError_GRPC(t *testing.T) { waitForConnectedServerCount(t, 1, a) // run test client - tunnel, err := client.CreateSingleUseGrpcTunnel(ctx, ps.FrontAddr(), grpc.WithInsecure()) + tunnel, err := createSingleUseGrpcTunnel(ctx, ps.FrontAddr()) if err != nil { t.Fatal(err) } @@ -221,7 +227,7 @@ func TestProxyHandle_DoneContext_GRPC(t *testing.T) { // run test client ctx, cancel := context.WithTimeout(context.Background(), -time.Second) defer cancel() - _, err := client.CreateSingleUseGrpcTunnel(ctx, ps.FrontAddr(), grpc.WithInsecure()) + _, err := createSingleUseGrpcTunnel(ctx, ps.FrontAddr()) if err == nil { t.Error("Expected error when context is cancelled, did not receive error") } else if !strings.Contains(err.Error(), "context deadline exceeded") { @@ -248,7 +254,7 @@ func TestProxyHandle_RequestDeadlineExceeded_GRPC(t *testing.T) { defer goleak.VerifyNone(t, goleak.IgnoreCurrent(), goleak.IgnoreTopFunction("google.golang.org/grpc.(*addrConn).resetTransport")) // run test client - tunnel, err := client.CreateSingleUseGrpcTunnel(context.Background(), ps.FrontAddr(), grpc.WithInsecure()) + tunnel, err := createSingleUseGrpcTunnel(context.Background(), ps.FrontAddr()) if err != nil { t.Fatal(err) } @@ -306,7 +312,7 @@ func TestProxyDial_RequestCancelled_GRPC(t *testing.T) { defer goleak.VerifyNone(t, goleak.IgnoreCurrent(), goleak.IgnoreTopFunction("google.golang.org/grpc.(*addrConn).resetTransport")) // run test client - tunnel, err := client.CreateSingleUseGrpcTunnel(context.Background(), ps.FrontAddr(), grpc.WithInsecure()) + tunnel, err := createSingleUseGrpcTunnel(context.Background(), ps.FrontAddr()) if err != nil { t.Fatal(err) } @@ -360,9 +366,10 @@ func TestProxyDial_RequestCancelled_Concurrent_GRPC(t *testing.T) { defer wg.Done() // run test client - tunnel, err := client.CreateSingleUseGrpcTunnel(context.Background(), ps.FrontAddr(), grpc.WithInsecure()) + tunnel, err := createSingleUseGrpcTunnel(context.Background(), ps.FrontAddr()) if err != nil { t.Error(err) + return } ctx, cancel := context.WithCancel(context.Background()) @@ -381,6 +388,7 @@ func TestProxyDial_RequestCancelled_Concurrent_GRPC(t *testing.T) { req, err := http.NewRequestWithContext(ctx, "GET", server.URL, nil) if err != nil { t.Error(err) + return } c.Do(req) // Errors are expected. @@ -439,7 +447,7 @@ func TestProxyDial_AgentTimeout_GRPC(t *testing.T) { defer goleak.VerifyNone(t, goleak.IgnoreCurrent(), goleak.IgnoreTopFunction("google.golang.org/grpc.(*addrConn).resetTransport")) // run test client - tunnel, err := client.CreateSingleUseGrpcTunnel(context.Background(), ps.FrontAddr(), grpc.WithInsecure()) + tunnel, err := createSingleUseGrpcTunnel(context.Background(), ps.FrontAddr()) if err != nil { t.Fatal(err) } @@ -487,7 +495,7 @@ func TestProxyHandle_TunnelContextCancelled_GRPC(t *testing.T) { // run test client ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) - tunnel, err := client.CreateSingleUseGrpcTunnel(ctx, ps.FrontAddr(), grpc.WithInsecure()) + tunnel, err := createSingleUseGrpcTunnel(ctx, ps.FrontAddr()) if err != nil { t.Fatal(err) } @@ -535,7 +543,7 @@ func TestProxy_LargeResponse(t *testing.T) { waitForConnectedServerCount(t, 1, a) // run test client - tunnel, err := client.CreateSingleUseGrpcTunnel(ctx, ps.FrontAddr(), grpc.WithInsecure()) + tunnel, err := createSingleUseGrpcTunnel(ctx, ps.FrontAddr()) if err != nil { t.Fatal(err) } @@ -558,6 +566,7 @@ func TestProxy_LargeResponse(t *testing.T) { } data, err := io.ReadAll(r.Body) + r.Body.Close() if err != nil { t.Error(err) } @@ -580,7 +589,7 @@ func TestBasicProxy_HTTPCONN(t *testing.T) { defer a.Stop() waitForConnectedServerCount(t, 1, a) - conn, err := net.Dial("tcp", ps.FrontAddr()) + conn, err := net.Dial("unix", ps.FrontAddr()) if err != nil { t.Error(err) } @@ -622,6 +631,7 @@ func TestBasicProxy_HTTPCONN(t *testing.T) { } data, err := io.ReadAll(r.Body) + r.Body.Close() if err != nil { t.Error(err) } @@ -642,7 +652,7 @@ func TestFailedDNSLookupProxy_HTTPCONN(t *testing.T) { defer a.Stop() waitForConnectedServerCount(t, 1, a) - conn, err := net.Dial("tcp", ps.FrontAddr()) + conn, err := net.Dial("unix", ps.FrontAddr()) if err != nil { t.Error(err) } @@ -689,6 +699,7 @@ func TestFailedDNSLookupProxy_HTTPCONN(t *testing.T) { } body, err := io.ReadAll(resp.Body) + resp.Body.Close() if !strings.Contains(err.Error(), "connection reset by peer") { t.Error(err) } @@ -697,14 +708,6 @@ func TestFailedDNSLookupProxy_HTTPCONN(t *testing.T) { t.Errorf("Unexpected error: %v", err) } - err = wait.PollImmediate(100*time.Millisecond, wait.ForeverTestTimeout, func() (bool, error) { - return ps.FrontendConnectionCount() == 0, nil - }) - - if err != nil { - t.Errorf("while waiting for connection to be closed: %v", err) - } - if err := metricstest.ExpectServerDialFailure(metricsserver.DialFailureErrorResponse, 1); err != nil { t.Error(err) } @@ -727,7 +730,7 @@ func TestFailedDial_HTTPCONN(t *testing.T) { defer a.Stop() waitForConnectedServerCount(t, 1, a) - conn, err := net.Dial("tcp", ps.FrontAddr()) + conn, err := net.Dial("unix", ps.FrontAddr()) if err != nil { t.Error(err) } @@ -737,17 +740,17 @@ func TestFailedDial_HTTPCONN(t *testing.T) { // Send HTTP-Connect request _, err = fmt.Fprintf(conn, "CONNECT %s HTTP/1.1\r\nHost: %s\r\n\r\n", serverURL.Host, "127.0.0.1") if err != nil { - t.Error(err) + t.Fatal(err) } // Parse the HTTP response for Connect br := bufio.NewReader(conn) res, err := http.ReadResponse(br, nil) if err != nil { - t.Errorf("reading HTTP response from CONNECT: %v", err) + t.Fatalf("reading HTTP response from CONNECT: %v", err) } if res.StatusCode != 200 { - t.Errorf("expect 200; got %d", res.StatusCode) + t.Fatalf("expect 200; got %d", res.StatusCode) } dialer := func(network, addr string) (net.Conn, error) { @@ -762,11 +765,14 @@ func TestFailedDial_HTTPCONN(t *testing.T) { resp, err := c.Get(server.URL) if err != nil { - t.Error(err) + t.Fatal(err) } body, err := io.ReadAll(resp.Body) - if !strings.Contains(err.Error(), "connection reset by peer") { + resp.Body.Close() + if err == nil { + t.Fatalf("Expected error reading response body; response=%q", body) + } else if !strings.Contains(err.Error(), "connection reset by peer") { t.Error(err) } @@ -774,13 +780,6 @@ func TestFailedDial_HTTPCONN(t *testing.T) { t.Errorf("Unexpected error: %v", err) } - err = wait.PollImmediate(100*time.Millisecond, wait.ForeverTestTimeout, func() (bool, error) { - return ps.FrontendConnectionCount() == 0, nil - }) - if err != nil { - t.Errorf("while waiting for connection to be closed: %v", err) - } - if err := metricstest.ExpectServerDialFailure(metricsserver.DialFailureErrorResponse, 1); err != nil { t.Error(err) } @@ -796,10 +795,10 @@ func runGRPCProxyServer(t testing.TB) framework.ProxyServer { func runGRPCProxyServerWithServerCount(t testing.TB, serverCount int) framework.ProxyServer { opts := framework.ProxyServerOpts{ - Mode: "grpc", + Mode: server.ModeGRPC, ServerCount: serverCount, } - ps, err := Framework.ProxyServerRunner.Start(opts) + ps, err := Framework.ProxyServerRunner.Start(t, opts) if err != nil { t.Fatalf("Failed to start gRPC proxy server: %v", err) } @@ -808,10 +807,10 @@ func runGRPCProxyServerWithServerCount(t testing.TB, serverCount int) framework. func runHTTPConnProxyServer(t testing.TB) framework.ProxyServer { opts := framework.ProxyServerOpts{ - Mode: "http", + Mode: server.ModeHTTPConnect, ServerCount: 1, } - ps, err := Framework.ProxyServerRunner.Start(opts) + ps, err := Framework.ProxyServerRunner.Start(t, opts) if err != nil { t.Fatalf("Failed to start HTTP proxy server: %v", err) } @@ -827,7 +826,7 @@ func runAgentWithID(t testing.TB, agentID, addr string) framework.Agent { AgentID: agentID, ServerAddr: addr, } - a, err := Framework.AgentRunner.Start(opts) + a, err := Framework.AgentRunner.Start(t, opts) if err != nil { t.Fatalf("Failed to start agent: %v", err) } @@ -841,7 +840,15 @@ type unresponsiveAgent struct { // Connect registers the unresponsive agent with the proxy server. func (a *unresponsiveAgent) Connect(address string) error { agentID := uuid.New().String() - conn, err := grpc.Dial(address, grpc.WithInsecure()) + agentCert := filepath.Join(framework.CertsDir, framework.TestAgentCertFile) + agentKey := filepath.Join(framework.CertsDir, framework.TestAgentKeyFile) + caCert := filepath.Join(framework.CertsDir, framework.TestCAFile) + host, _, _ := net.SplitHostPort(address) + tlsConfig, err := util.GetClientTLSConfig(caCert, agentCert, agentKey, host, nil) + if err != nil { + return err + } + conn, err := grpc.Dial(address, grpc.WithTransportCredentials(credentials.NewTLS(tlsConfig))) if err != nil { return err } @@ -888,15 +895,19 @@ func waitForConnectedServerCount(t testing.TB, expectedServerCount int, a framew func waitForConnectedAgentCount(t testing.TB, expectedAgentCount int, ps framework.ProxyServer) { t.Helper() err := wait.PollImmediate(100*time.Millisecond, wait.ForeverTestTimeout, func() (bool, error) { - count := ps.ConnectedBackends() + count, err := ps.ConnectedBackends() + if err != nil { + return false, err + } if count == expectedAgentCount { return true, nil } return false, nil }) if err != nil { - count := ps.ConnectedBackends() - t.Logf("got %d backends; expected %d", count, expectedAgentCount) + if count, err := ps.ConnectedBackends(); err == nil { + t.Logf("got %d backends; expected %d", count, expectedAgentCount) + } t.Fatalf("Error waiting for backend count: %v", err) } } @@ -938,3 +949,15 @@ func expectCleanShutdown(t testing.TB) { assertNoAgentDialFailures(t) }) } + +func createSingleUseGrpcTunnel(ctx context.Context, addr string) (client.Tunnel, error) { + return client.CreateSingleUseGrpcTunnel(ctx, addr, + grpc.WithContextDialer( + func(ctx context.Context, addr string) (net.Conn, error) { + return (&net.Dialer{}).DialContext(ctx, "unix", addr) + }), + grpc.WithBlock(), + grpc.WithReturnConnectionError(), + grpc.WithTimeout(30*time.Second), // matches http.DefaultTransport dial timeout + grpc.WithTransportCredentials(insecure.NewCredentials())) +} diff --git a/tests/reconnect_test.go b/tests/reconnect_test.go index f59a18ee0..b04490edd 100644 --- a/tests/reconnect_test.go +++ b/tests/reconnect_test.go @@ -17,79 +17,39 @@ limitations under the License. package tests import ( - "net" "testing" - "time" - "github.com/google/uuid" - "google.golang.org/grpc" - "google.golang.org/grpc/metadata" - "k8s.io/apimachinery/pkg/util/wait" - agentproto "sigs.k8s.io/apiserver-network-proxy/proto/agent" - "sigs.k8s.io/apiserver-network-proxy/proto/header" + "sigs.k8s.io/apiserver-network-proxy/pkg/server" + "sigs.k8s.io/apiserver-network-proxy/tests/framework" ) -func TestClientReconnects(t *testing.T) { - connections := make(chan struct{}) - s := &testAgentServerImpl{ - onConnect: func(stream agentproto.AgentService_ConnectServer) error { - stream.SetHeader(metadata.New(map[string]string{ - header.ServerID: uuid.Must(uuid.NewRandom()).String(), - header.ServerCount: "1", - })) - connections <- struct{}{} - return nil - }, - } - - svr := grpc.NewServer() - agentproto.RegisterAgentServiceServer(svr, s) - lis, err := net.Listen("tcp", "127.0.0.1:0") +func TestServerRestartAgentReconnect(t *testing.T) { + agentPort, err := framework.FreePorts(1) if err != nil { t.Fatal(err) } - go func() { - if err := svr.Serve(lis); err != nil { - panic(err) - } - }() + opts := framework.ProxyServerOpts{ + Mode: server.ModeGRPC, + ServerCount: 1, + AgentPort: agentPort[0], + } + ps, err := Framework.ProxyServerRunner.Start(t, opts) + if err != nil { + t.Fatalf("Failed to start gRPC proxy server: %v", err) + } - a := runAgentWithID(t, "test-id", lis.Addr().String()) + a := runAgentWithID(t, "test-id", ps.AgentAddr()) defer a.Stop() - select { - case <-connections: - // Expected - case <-time.After(wait.ForeverTestTimeout): - t.Fatal("Timed out waiting for agent to connect") - } - svr.Stop() + waitForConnectedServerCount(t, 1, a) + ps.Stop() + waitForConnectedServerCount(t, 0, a) - lis2, err := net.Listen("tcp", lis.Addr().String()) + ps2, err := Framework.ProxyServerRunner.Start(t, opts) if err != nil { - t.Fatal(err) - } - svr2 := grpc.NewServer() - agentproto.RegisterAgentServiceServer(svr2, s) - go func() { - if err := svr2.Serve(lis2); err != nil { - panic(err) - } - }() - defer svr2.Stop() - - select { - case <-connections: - // Expected - case <-time.After(wait.ForeverTestTimeout): - t.Fatal("Timed out waiting for agent to reconnect") + t.Fatalf("Failed to start gRPC proxy server: %v", err) } -} - -type testAgentServerImpl struct { - onConnect func(agentproto.AgentService_ConnectServer) error -} + defer ps2.Stop() -func (t *testAgentServerImpl) Connect(svr agentproto.AgentService_ConnectServer) error { - return t.onConnect(svr) + waitForConnectedServerCount(t, 1, a) } diff --git a/tests/tcp_server_test.go b/tests/tcp_server_test.go index 14983da3c..b459f09d5 100644 --- a/tests/tcp_server_test.go +++ b/tests/tcp_server_test.go @@ -21,9 +21,7 @@ import ( "net" "testing" - "google.golang.org/grpc" "k8s.io/klog/v2" - "sigs.k8s.io/apiserver-network-proxy/konnectivity-client/pkg/client" ) func echo(conn net.Conn) { @@ -70,7 +68,7 @@ func TestEchoServer(t *testing.T) { waitForConnectedServerCount(t, 1, a) // run test client - tunnel, err := client.CreateSingleUseGrpcTunnel(ctx, ps.FrontAddr(), grpc.WithInsecure()) + tunnel, err := createSingleUseGrpcTunnel(ctx, ps.FrontAddr()) if err != nil { t.Fatal(err) }