diff --git a/go.mod b/go.mod index 2aa6aaa83b8..bdfce20bb37 100644 --- a/go.mod +++ b/go.mod @@ -21,7 +21,7 @@ require ( github.com/google/gopacket v1.1.19 github.com/gorilla/mux v1.8.1 github.com/grafana/dskit v0.0.0-20240117144322-b9a439dedeb8 - github.com/grafana/e2e v0.1.1 + github.com/grafana/e2e v0.1.2-0.20240118170847-db90b84177fc github.com/hashicorp/golang-lru v1.0.2 // indirect github.com/json-iterator/go v1.1.12 github.com/minio/minio-go/v7 v7.0.66 diff --git a/go.sum b/go.sum index 76ea7230e85..c6453572a6d 100644 --- a/go.sum +++ b/go.sum @@ -545,8 +545,8 @@ github.com/grafana-tools/sdk v0.0.0-20220919052116-6562121319fc h1:PXZQA2WCxe85T github.com/grafana-tools/sdk v0.0.0-20220919052116-6562121319fc/go.mod h1:AHHlOEv1+GGQ3ktHMlhuTUwo3zljV3QJbC0+8o2kn+4= github.com/grafana/dskit v0.0.0-20240117144322-b9a439dedeb8 h1:6sqXQFpxe/VO2VsBwhdbGd6TGSULFf9Mpzij7zXQ66o= github.com/grafana/dskit v0.0.0-20240117144322-b9a439dedeb8/go.mod h1:x5DMwyr1kyirtHOxoFSZ7RnyOgHdGh03ZruupdPetQM= -github.com/grafana/e2e v0.1.1 h1:/b6xcv5BtoBnx8cZnCiey9DbjEc8z7gXHO5edoeRYxc= -github.com/grafana/e2e v0.1.1/go.mod h1:RpNLgae5VT+BUHvPE+/zSypmOXKwEu4t+tnEMS1ATaE= +github.com/grafana/e2e v0.1.2-0.20240118170847-db90b84177fc h1:BW+LjKJDz0So5LI8UZfW5neWeKpSkWqhmGjQFzcFfLM= +github.com/grafana/e2e v0.1.2-0.20240118170847-db90b84177fc/go.mod h1:JVmqPBe8A/pZWwRoJW5ZjyALeY5OXMzPl7LrVXOdZAI= github.com/grafana/goautoneg v0.0.0-20231010094147-47ce5e72a9ae h1:Yxbw9jKGJVC6qAK5Ubzzb/qZwM6rRMMqaDc/d4Vp3pM= github.com/grafana/goautoneg v0.0.0-20231010094147-47ce5e72a9ae/go.mod h1:GFAN9Jn9t1cX7sNfc6ZoFyc4f7i8jtm3SajrWdZM2EE= github.com/grafana/gomemcache v0.0.0-20231023152154-6947259a0586 h1:/of8Z8taCPftShATouOrBVy6GaTTjgQd/VfNiZp/VXQ= diff --git a/integration/configs.go b/integration/configs.go index 32bab1eae42..b83c0840f9d 100644 --- a/integration/configs.go +++ b/integration/configs.go @@ -16,6 +16,8 @@ import ( e2e "github.com/grafana/e2e" e2edb "github.com/grafana/e2e/db" + + "github.com/grafana/mimir/pkg/querier/api" ) const ( @@ -208,6 +210,23 @@ blocks_storage: MinioSecretKey: e2edb.MinioSecretKey, MinioEndpoint: fmt.Sprintf("%s-minio-9000:9000", networkName), }) + + IngestStorageFlags = func() map[string]string { + return map[string]string{ + "-ingest-storage.enabled": "true", + "-ingest-storage.kafka.topic": "ingest", + "-ingest-storage.kafka.address": fmt.Sprintf("%s-kafka:9092", networkName), + + // To simplify integration tests, we use strong read consistency by default. + // Integration tests that want to test the eventual consistency can override it. + "-ingest-storage.read-consistency": api.ReadConsistencyStrong, + + // Frequently poll last produced offset in order to have a low end-to-end latency + // and faster integration tests. + "-ingest-storage.kafka.last-produced-offset-poll-interval": "50ms", + "-ingest-storage.kafka.last-produced-offset-retry-timeout": "1s", + } + } ) func buildConfigFromTemplate(tmpl string, data interface{}) string { diff --git a/integration/query_frontend_test.go b/integration/query_frontend_test.go index 719ee25fb09..1964737ffb2 100644 --- a/integration/query_frontend_test.go +++ b/integration/query_frontend_test.go @@ -229,6 +229,28 @@ func TestQueryFrontendWithQueryResultPayloadFormats(t *testing.T) { } } +func TestQueryFrontendWithIngestStorageViaFlagsAndWithQuerySchedulerAndQueryStatsEnabled(t *testing.T) { + runQueryFrontendTest(t, queryFrontendTestConfig{ + querySchedulerEnabled: true, + querySchedulerDiscoveryMode: "dns", + queryStatsEnabled: true, + setup: func(t *testing.T, s *e2e.Scenario) (configFile string, flags map[string]string) { + flags = mergeFlags( + BlocksStorageFlags(), + BlocksStorageS3Flags(), + IngestStorageFlags(), + ) + + kafka := e2edb.NewKafka() + minio := e2edb.NewMinio(9000, flags["-blocks-storage.s3.bucket-name"]) + require.NoError(t, s.StartAndWaitReady(minio, kafka)) + + return "", flags + }, + withHistograms: true, + }) +} + func runQueryFrontendTest(t *testing.T, cfg queryFrontendTestConfig) { const numUsers = 10 const numQueriesPerUser = 10 @@ -275,7 +297,7 @@ func runQueryFrontendTest(t *testing.T, cfg queryFrontendTestConfig) { } // Start all other services. - ingester := e2emimir.NewIngester("ingester", consul.NetworkHTTPEndpoint(), flags, e2emimir.WithConfigFile(configFile)) + ingester := e2emimir.NewIngester("ingester-0", consul.NetworkHTTPEndpoint(), flags, e2emimir.WithConfigFile(configFile)) distributor := e2emimir.NewDistributor("distributor", consul.NetworkHTTPEndpoint(), flags, e2emimir.WithConfigFile(configFile)) querier := e2emimir.NewQuerier("querier", consul.NetworkHTTPEndpoint(), flags, e2emimir.WithConfigFile(configFile)) diff --git a/tools/pre-pull-images/main.go b/tools/pre-pull-images/main.go index 3d662c3cd2d..ed5d4a73771 100644 --- a/tools/pre-pull-images/main.go +++ b/tools/pre-pull-images/main.go @@ -18,6 +18,7 @@ func main() { fmt.Println(images.Consul) fmt.Println(images.ETCD) fmt.Println(images.Memcached) + fmt.Println(images.Kafka) // images from previous releases for image := range integration.DefaultPreviousVersionImages { diff --git a/vendor/github.com/grafana/e2e/db/kafka.go b/vendor/github.com/grafana/e2e/db/kafka.go new file mode 100644 index 00000000000..fe91aedc717 --- /dev/null +++ b/vendor/github.com/grafana/e2e/db/kafka.go @@ -0,0 +1,103 @@ +package e2edb + +import ( + "context" + "errors" + "fmt" + "time" + + "github.com/twmb/franz-go/pkg/kadm" + "github.com/twmb/franz-go/pkg/kgo" + + "github.com/grafana/e2e" + "github.com/grafana/e2e/images" +) + +type KafkaService struct { + *e2e.HTTPService +} + +func NewKafka() *KafkaService { + return &KafkaService{ + HTTPService: e2e.NewHTTPService( + "kafka", + images.Kafka, + nil, // No custom command. + NewKafkaReadinessProbe(9092), + 9092, + ), + } +} + +func (s *KafkaService) Start(networkName, sharedDir string) (err error) { + // Configures Kafka right before starting it so that we have the networkName to correctly compute + // the advertised host. + s.HTTPService.SetEnvVars(map[string]string{ + // Configure Kafka to run in KRaft mode (without Zookeeper). + "CLUSTER_ID": "NqnEdODVKkiLTfJvqd1uqQ==", // A random ID (16 bytes of a base64-encoded UUID). + "KAFKA_BROKER_ID": "1", + "KAFKA_NODE_ID": "1", + "KAFKA_LISTENERS": "PLAINTEXT://0.0.0.0:9092,CONTROLLER://0.0.0.0:29093,PLAINTEXT_HOST://localhost:29092", // Host and port to which Kafka binds to for listening. + "KAFKA_PROCESS_ROLES": "broker,controller", + "KAFKA_CONTROLLER_QUORUM_VOTERS": "1@kafka:29093", + "KAFKA_CONTROLLER_LISTENER_NAMES": "CONTROLLER", + + // Configure the advertised host and post. + "KAFKA_ADVERTISED_LISTENERS": fmt.Sprintf("PLAINTEXT://%s-kafka:9092,PLAINTEXT_HOST://localhost:29092", networkName), + + // RF=1. + "KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR": "1", + "KAFKA_TRANSACTION_STATE_LOG_MIN_ISR": "1", + "KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR": "1", + "KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS": "0", + + // No TLS. + "KAFKA_LISTENER_SECURITY_PROTOCOL_MAP": "CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT", + "KAFKA_INTER_BROKER_LISTENER_NAME": "PLAINTEXT", + + // Enough partitions for integration tests. + "KAFKA_NUM_PARTITIONS": "3", + + "LOG4J_ROOT_LOGLEVEL": "WARN", + }) + + return s.HTTPService.Start(networkName, sharedDir) +} + +// KafkaReadinessProbe checks readiness by ensure a Kafka broker is up and running. +type KafkaReadinessProbe struct { + port int +} + +func NewKafkaReadinessProbe(port int) *KafkaReadinessProbe { + return &KafkaReadinessProbe{ + port: port, + } +} + +func (p *KafkaReadinessProbe) Ready(service *e2e.ConcreteService) (err error) { + const timeout = time.Second + + endpoint := service.Endpoint(p.port) + if endpoint == "" { + return fmt.Errorf("cannot get service endpoint for port %d", p.port) + } else if endpoint == "stopped" { + return errors.New("service has stopped") + } + + client, err := kgo.NewClient(kgo.SeedBrokers(endpoint), kgo.DialTimeout(timeout)) + if err != nil { + return err + } + + // Ensure we close the client once done. + defer client.Close() + + admin := kadm.NewClient(client) + + ctxWithTimeout, cancel := context.WithTimeout(context.Background(), timeout) + defer cancel() + + _, err = admin.ApiVersions(ctxWithTimeout) + return err +} diff --git a/vendor/github.com/grafana/e2e/images/images.go b/vendor/github.com/grafana/e2e/images/images.go index c6b47ceac79..5fd263986f0 100644 --- a/vendor/github.com/grafana/e2e/images/images.go +++ b/vendor/github.com/grafana/e2e/images/images.go @@ -12,4 +12,5 @@ var ( BigtableEmulator = "shopify/bigtable-emulator:0.1.0" Cassandra = "rinscy/cassandra:3.11.0" SwiftEmulator = "bouncestorage/swift-aio:55ba4331" + Kafka = "confluentinc/cp-kafka:7.5.3" ) diff --git a/vendor/modules.txt b/vendor/modules.txt index 85aecc328d5..6a52604b6f0 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -587,8 +587,8 @@ github.com/grafana/dskit/tenant github.com/grafana/dskit/test github.com/grafana/dskit/tracing github.com/grafana/dskit/user -# github.com/grafana/e2e v0.1.1 -## explicit; go 1.18 +# github.com/grafana/e2e v0.1.2-0.20240118170847-db90b84177fc +## explicit; go 1.19 github.com/grafana/e2e github.com/grafana/e2e/cache github.com/grafana/e2e/db