diff --git a/modules/elasticsearch/elasticsearch.go b/modules/elasticsearch/elasticsearch.go index 5fab503b36..e878c3e6ef 100644 --- a/modules/elasticsearch/elasticsearch.go +++ b/modules/elasticsearch/elasticsearch.go @@ -2,6 +2,8 @@ package elasticsearch import ( "context" + "crypto/tls" + "crypto/x509" "fmt" "io" "os" @@ -15,6 +17,7 @@ const ( defaultTCPPort = "9300" defaultPassword = "changeme" defaultUsername = "elastic" + defaultCaCertPath = "/usr/share/elasticsearch/config/certs/http_ca.crt" minimalImageVersion = "7.9.2" ) @@ -32,7 +35,7 @@ type ElasticsearchContainer struct { } // Deprecated: use Run instead -// RunContainer creates an instance of the Couchbase container type +// RunContainer creates an instance of the Elasticsearch container type func RunContainer(ctx context.Context, opts ...testcontainers.ContainerCustomizer) (*ElasticsearchContainer, error) { return Run(ctx, "docker.elastic.co/elasticsearch/elasticsearch:7.9.2", opts...) } @@ -50,54 +53,41 @@ func Run(ctx context.Context, img string, opts ...testcontainers.ContainerCustom defaultHTTPPort + "/tcp", defaultTCPPort + "/tcp", }, - // regex that - // matches 8.3 JSON logging with started message and some follow up content within the message field - // matches 8.0 JSON logging with no whitespace between message field and content - // matches 7.x JSON logging with whitespace between message field and content - // matches 6.x text logging with node name in brackets and just a 'started' message till the end of the line - WaitingFor: wait.ForLog(`.*("message":\s?"started(\s|")?.*|]\sstarted\n)`).AsRegexp(), - LifecycleHooks: []testcontainers.ContainerLifecycleHooks{ - { - // the container needs a post create hook to set the default JVM options in a file - PostCreates: []testcontainers.ContainerHook{}, - PostReadies: []testcontainers.ContainerHook{}, - }, - }, }, Started: true, } // Gather all config options (defaults and then apply provided options) - settings := defaultOptions() + options := defaultOptions() for _, opt := range opts { if apply, ok := opt.(Option); ok { - apply(settings) + apply(options) } if err := opt.Customize(&req); err != nil { return nil, err } } - // Transfer the certificate settings to the container request - err := configureCertificate(settings, &req) - if err != nil { - return nil, err - } - // Transfer the password settings to the container request - err = configurePassword(settings, &req) - if err != nil { + if err := configurePassword(options, &req); err != nil { return nil, err } if isAtLeastVersion(req.Image, 7) { - req.LifecycleHooks[0].PostCreates = append(req.LifecycleHooks[0].PostCreates, configureJvmOpts) + req.LifecycleHooks = append(req.LifecycleHooks, + testcontainers.ContainerLifecycleHooks{ + PostCreates: []testcontainers.ContainerHook{configureJvmOpts}, + }, + ) } + // Set the default waiting strategy if not already set. + setWaitFor(options, &req.ContainerRequest) + container, err := testcontainers.GenericContainer(ctx, req) var esContainer *ElasticsearchContainer if container != nil { - esContainer = &ElasticsearchContainer{Container: container, Settings: *settings} + esContainer = &ElasticsearchContainer{Container: container, Settings: *options} } if err != nil { return esContainer, fmt.Errorf("generic container: %w", err) @@ -110,6 +100,61 @@ func Run(ctx context.Context, img string, opts ...testcontainers.ContainerCustom return esContainer, nil } +// certWriter is a helper that writes the details of a CA cert to options. +type certWriter struct { + options *Options + certPool *x509.CertPool +} + +// Read reads the CA cert from the reader and appends it to the options. +func (w *certWriter) Read(r io.Reader) error { + buf, err := io.ReadAll(r) + if err != nil { + return fmt.Errorf("read CA cert: %w", err) + } + + w.options.CACert = buf + w.certPool.AppendCertsFromPEM(w.options.CACert) + + return nil +} + +// setWaitFor sets the req.WaitingFor strategy based on settings. +func setWaitFor(options *Options, req *testcontainers.ContainerRequest) { + var strategies []wait.Strategy + if req.WaitingFor != nil { + // Custom waiting strategy, ensure we honour it. + strategies = append(strategies, req.WaitingFor) + } + + waitHTTP := wait.ForHTTP("/").WithPort(defaultHTTPPort) + if sslRequired(req) { + waitHTTP = waitHTTP.WithTLS(true).WithAllowInsecure(true) + cw := &certWriter{ + options: options, + certPool: x509.NewCertPool(), + } + + waitHTTP = waitHTTP. + WithTLS(true, &tls.Config{RootCAs: cw.certPool}) + + strategies = append(strategies, wait.ForFile(defaultCaCertPath).WithMatcher(cw.Read)) + } + + if options.Password != "" || options.Username != "" { + waitHTTP = waitHTTP.WithBasicAuth(options.Username, options.Password) + } + + strategies = append(strategies, waitHTTP) + + if len(strategies) > 1 { + req.WaitingFor = wait.ForAll(strategies...) + return + } + + req.WaitingFor = strategies[0] +} + // configureAddress sets the address of the Elasticsearch container. // If the certificate is set, it will use https as protocol, otherwise http. func (c *ElasticsearchContainer) configureAddress(ctx context.Context) error { @@ -133,50 +178,28 @@ func (c *ElasticsearchContainer) configureAddress(ctx context.Context) error { return nil } -// configureCertificate transfers the certificate settings to the container request. -// For that, it defines a post start hook that copies the certificate from the container to the host. -// The certificate is only available since version 8, and will be located in a well-known location. -func configureCertificate(settings *Options, req *testcontainers.GenericContainerRequest) error { - if isAtLeastVersion(req.Image, 8) { - // These configuration keys explicitly disable CA generation. - // If any are set we skip the file retrieval. - configKeys := []string{ - "xpack.security.enabled", - "xpack.security.http.ssl.enabled", - "xpack.security.transport.ssl.enabled", - } - for _, configKey := range configKeys { - if value, ok := req.Env[configKey]; ok { - if value == "false" { - return nil - } +// sslRequired returns true if the SSL is required, otherwise false. +func sslRequired(req *testcontainers.ContainerRequest) bool { + if !isAtLeastVersion(req.Image, 8) { + return false + } + + // These configuration keys explicitly disable CA generation. + // If any are set we skip the file retrieval. + configKeys := []string{ + "xpack.security.enabled", + "xpack.security.http.ssl.enabled", + "xpack.security.transport.ssl.enabled", + } + for _, configKey := range configKeys { + if value, ok := req.Env[configKey]; ok { + if value == "false" { + return false } } - - // The container needs a post ready hook to copy the certificate from the container to the host. - // This certificate is only available since version 8 - req.LifecycleHooks[0].PostReadies = append(req.LifecycleHooks[0].PostReadies, - func(ctx context.Context, container testcontainers.Container) error { - const defaultCaCertPath = "/usr/share/elasticsearch/config/certs/http_ca.crt" - - readCloser, err := container.CopyFileFromContainer(ctx, defaultCaCertPath) - if err != nil { - return err - } - - // receive the bytes from the default location - certBytes, err := io.ReadAll(readCloser) - if err != nil { - return err - } - - settings.CACert = certBytes - - return nil - }) } - return nil + return true } // configurePassword transfers the password settings to the container request. diff --git a/modules/elasticsearch/options.go b/modules/elasticsearch/options.go index ed801c3b09..ba4dca75c3 100644 --- a/modules/elasticsearch/options.go +++ b/modules/elasticsearch/options.go @@ -16,7 +16,6 @@ type Options struct { func defaultOptions() *Options { return &Options{ - CACert: nil, Username: defaultUsername, } } diff --git a/modules/redpanda/redpanda.go b/modules/redpanda/redpanda.go index 686d310d9f..c0936637d3 100644 --- a/modules/redpanda/redpanda.go +++ b/modules/redpanda/redpanda.go @@ -62,7 +62,7 @@ func RunContainer(ctx context.Context, opts ...testcontainers.ContainerCustomize func Run(ctx context.Context, img string, opts ...testcontainers.ContainerCustomizer) (*Container, error) { tmpDir, err := os.MkdirTemp("", "redpanda") if err != nil { - return nil, fmt.Errorf("failed to create directory: %w", err) + return nil, fmt.Errorf("create temporary directory: %w", err) } defer os.RemoveAll(tmpDir) @@ -121,13 +121,13 @@ func Run(ctx context.Context, img string, opts ...testcontainers.ContainerCustom // the Kafka API. entrypointPath := filepath.Join(tmpDir, entrypointFile) if err := os.WriteFile(entrypointPath, entrypoint, 0o700); err != nil { - return nil, fmt.Errorf("failed to create entrypoint file: %w", err) + return nil, fmt.Errorf("write entrypoint file: %w", err) } // 4. Register extra kafka listeners if provided, network aliases will be // set if err := registerListeners(settings, req); err != nil { - return nil, fmt.Errorf("failed to register listeners: %w", err) + return nil, fmt.Errorf("register listeners: %w", err) } // Bootstrap config file contains cluster configurations which will only be considered @@ -135,10 +135,10 @@ func Run(ctx context.Context, img string, opts ...testcontainers.ContainerCustom bootstrapConfigPath := filepath.Join(tmpDir, bootstrapConfigFile) bootstrapConfig, err := renderBootstrapConfig(settings) if err != nil { - return nil, fmt.Errorf("failed to create bootstrap config file: %w", err) + return nil, err } if err := os.WriteFile(bootstrapConfigPath, bootstrapConfig, 0o600); err != nil { - return nil, fmt.Errorf("failed to create bootstrap config file: %w", err) + return nil, fmt.Errorf("write bootstrap config: %w", err) } req.Files = append(req.Files, @@ -158,11 +158,11 @@ func Run(ctx context.Context, img string, opts ...testcontainers.ContainerCustom if settings.EnableTLS { certPath := filepath.Join(tmpDir, certFile) if err := os.WriteFile(certPath, settings.cert, 0o600); err != nil { - return nil, fmt.Errorf("failed to create certificate file: %w", err) + return nil, fmt.Errorf("write certificate file: %w", err) } keyPath := filepath.Join(tmpDir, keyFile) if err := os.WriteFile(keyPath, settings.key, 0o600); err != nil { - return nil, fmt.Errorf("failed to create key file: %w", err) + return nil, fmt.Errorf("write key file: %w", err) } req.Files = append(req.Files, @@ -192,34 +192,54 @@ func Run(ctx context.Context, img string, opts ...testcontainers.ContainerCustom // the Redpanda config with the advertised Kafka address. hostIP, err := ctr.Host(ctx) if err != nil { - return c, fmt.Errorf("failed to get container host: %w", err) + return c, fmt.Errorf("host: %w", err) } kafkaPort, err := ctr.MappedPort(ctx, nat.Port(defaultKafkaAPIPort)) if err != nil { - return c, fmt.Errorf("failed to get mapped Kafka port: %w", err) + return c, fmt.Errorf("mapped kafka port: %w", err) } // 7. Render redpanda.yaml config and mount it. nodeConfig, err := renderNodeConfig(settings, hostIP, kafkaPort.Int()) if err != nil { - return c, fmt.Errorf("failed to render node config: %w", err) + return c, err } - err = ctr.CopyToContainer(ctx, nodeConfig, filepath.Join(redpandaDir, "redpanda.yaml"), 600) + err = ctr.CopyToContainer(ctx, nodeConfig, filepath.Join(redpandaDir, "redpanda.yaml"), 0o600) if err != nil { - return c, fmt.Errorf("failed to copy redpanda.yaml into container: %w", err) + return c, fmt.Errorf("copy to container: %w", err) } // 8. Wait until Redpanda is ready to serve requests. + waitHTTP := wait.ForHTTP(defaultAdminAPIPort). + WithStatusCodeMatcher(func(status int) bool { + // Redpanda's admin API returns 404 for requests to "/". + return status == http.StatusNotFound + }) + + var tlsConfig *tls.Config + if settings.EnableTLS { + cert, err := tls.X509KeyPair(settings.cert, settings.key) + if err != nil { + return c, fmt.Errorf("create admin cert: %w", err) + } + caCertPool := x509.NewCertPool() + caCertPool.AppendCertsFromPEM(settings.cert) + tlsConfig = &tls.Config{ + Certificates: []tls.Certificate{cert}, + RootCAs: caCertPool, + } + waitHTTP = waitHTTP.WithTLS(true, tlsConfig) + } err = wait.ForAll( wait.ForListeningPort(defaultKafkaAPIPort), - wait.ForListeningPort(defaultAdminAPIPort), + waitHTTP, wait.ForListeningPort(defaultSchemaRegistryPort), wait.ForLog("Successfully started Redpanda!"), ).WaitUntilReady(ctx, ctr) if err != nil { - return c, fmt.Errorf("failed to wait for Redpanda readiness: %w", err) + return c, fmt.Errorf("wait for readiness: %w", err) } c.urlScheme = "http" @@ -231,34 +251,25 @@ func Run(ctx context.Context, img string, opts ...testcontainers.ContainerCustom if len(settings.ServiceAccounts) > 0 { adminAPIPort, err := ctr.MappedPort(ctx, nat.Port(defaultAdminAPIPort)) if err != nil { - return c, fmt.Errorf("failed to get mapped Admin API port: %w", err) + return c, fmt.Errorf("mapped admin port: %w", err) } adminAPIUrl := fmt.Sprintf("%s://%v:%d", c.urlScheme, hostIP, adminAPIPort.Int()) adminCl := NewAdminAPIClient(adminAPIUrl) if settings.EnableTLS { - cert, err := tls.X509KeyPair(settings.cert, settings.key) - if err != nil { - return c, fmt.Errorf("failed to create admin client with cert: %w", err) - } - caCertPool := x509.NewCertPool() - caCertPool.AppendCertsFromPEM(settings.cert) adminCl = adminCl.WithHTTPClient(&http.Client{ Timeout: 5 * time.Second, Transport: &http.Transport{ ForceAttemptHTTP2: true, TLSHandshakeTimeout: 10 * time.Second, - TLSClientConfig: &tls.Config{ - Certificates: []tls.Certificate{cert}, - RootCAs: caCertPool, - }, + TLSClientConfig: tlsConfig, }, }) } for username, password := range settings.ServiceAccounts { if err := adminCl.CreateUser(ctx, username, password); err != nil { - return c, fmt.Errorf("failed to create service account with username %q: %w", username, err) + return c, fmt.Errorf("create user %q: %w", username, err) } } } @@ -299,12 +310,12 @@ func renderBootstrapConfig(settings options) ([]byte, error) { tpl, err := template.New("bootstrap.yaml").Parse(bootstrapConfigTpl) if err != nil { - return nil, fmt.Errorf("failed to parse redpanda config file template: %w", err) + return nil, fmt.Errorf("parse bootstrap template: %w", err) } var bootstrapConfig bytes.Buffer if err := tpl.Execute(&bootstrapConfig, bootstrapTplParams); err != nil { - return nil, fmt.Errorf("failed to render redpanda bootstrap config template: %w", err) + return nil, fmt.Errorf("render bootstrap template: %w", err) } return bootstrapConfig.Bytes(), nil @@ -353,12 +364,12 @@ func renderNodeConfig(settings options, hostIP string, advertisedKafkaPort int) ncTpl, err := template.New("redpanda.yaml").Parse(nodeConfigTpl) if err != nil { - return nil, fmt.Errorf("failed to parse redpanda config file template: %w", err) + return nil, fmt.Errorf("parse node config template: %w", err) } var redpandaYaml bytes.Buffer if err := ncTpl.Execute(&redpandaYaml, tplParams); err != nil { - return nil, fmt.Errorf("failed to render redpanda node config template: %w", err) + return nil, fmt.Errorf("render node config template: %w", err) } return redpandaYaml.Bytes(), nil