Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(influxdb): Respect custom waitStrategy #2845

Merged
merged 11 commits into from
Nov 20, 2024
54 changes: 27 additions & 27 deletions modules/influxdb/influxdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package influxdb
import (
"context"
"fmt"
"io"
"path"
"strings"

Expand Down Expand Up @@ -34,7 +35,6 @@ func Run(ctx context.Context, img string, opts ...testcontainers.ContainerCustom
"INFLUXDB_HTTP_HTTPS_ENABLED": "false",
"INFLUXDB_HTTP_AUTH_ENABLED": "false",
},
WaitingFor: wait.ForListeningPort("8086/tcp"),
}
genericContainerReq := testcontainers.GenericContainerRequest{
ContainerRequest: req,
Expand All @@ -47,8 +47,24 @@ func Run(ctx context.Context, img string, opts ...testcontainers.ContainerCustom
}
}

hasInitDb := false
if genericContainerReq.WaitingFor == nil {
genericContainerReq.WaitingFor = defaultWaitStrategy(genericContainerReq)
}
mdelapenya marked this conversation as resolved.
Show resolved Hide resolved

container, err := testcontainers.GenericContainer(ctx, genericContainerReq)
var c *InfluxDbContainer
if container != nil {
c = &InfluxDbContainer{Container: container}
}

if err != nil {
return c, fmt.Errorf("generic container: %w", err)
}

return c, nil
}

func defaultWaitStrategy(genericContainerReq testcontainers.GenericContainerRequest) wait.Strategy {
for _, f := range genericContainerReq.Files {
if f.ContainerFilePath == "/" && strings.HasSuffix(f.HostFilePath, "docker-entrypoint-initdb.d") {
// Init service in container will start influxdb, run scripts in docker-entrypoint-initdb.d and then
Expand All @@ -57,39 +73,23 @@ func Run(ctx context.Context, img string, opts ...testcontainers.ContainerCustom
// "Open shard" which is the last thing that happens before the server is ready to accept connections.
// This is probably different for InfluxDB 2.x, but that is left as an exercise for the reader.
strategies := []wait.Strategy{
genericContainerReq.WaitingFor,
wait.ForListeningPort("8086/tcp"),
wait.ForLog("influxdb init process in progress..."),
wait.ForLog("Server shutdown completed"),
wait.ForLog("Opened shard"),
}
genericContainerReq.WaitingFor = wait.ForAll(strategies...)
hasInitDb = true
break
return wait.ForAll(strategies...)
marcinmilewski93 marked this conversation as resolved.
Show resolved Hide resolved
mdelapenya marked this conversation as resolved.
Show resolved Hide resolved
}
}

if !hasInitDb {
if lastIndex := strings.LastIndex(genericContainerReq.Image, ":"); lastIndex != -1 {
tag := genericContainerReq.Image[lastIndex+1:]
if tag == "latest" || tag[0] == '2' {
genericContainerReq.WaitingFor = wait.ForLog(`Listening log_id=[0-9a-zA-Z_~]+ service=tcp-listener transport=http`).AsRegexp()
return wait.ForHTTP("/health").
mdelapenya marked this conversation as resolved.
Show resolved Hide resolved
WithResponseMatcher(func(body io.Reader) bool {
marcinmilewski93 marked this conversation as resolved.
Show resolved Hide resolved
bs, err := io.ReadAll(body)
if err != nil {
return false
}
} else {
genericContainerReq.WaitingFor = wait.ForLog("Listening for signals")
}
}

container, err := testcontainers.GenericContainer(ctx, genericContainerReq)
var c *InfluxDbContainer
if container != nil {
c = &InfluxDbContainer{Container: container}
}

if err != nil {
return c, fmt.Errorf("generic container: %w", err)
}

return c, nil
return strings.Contains(string(bs), "ready for queries and writes")
mdelapenya marked this conversation as resolved.
Show resolved Hide resolved
})
}

func (c *InfluxDbContainer) MustConnectionUrl(ctx context.Context) string {
Expand Down