diff --git a/modules/kafka/go.mod b/modules/kafka/go.mod index 646f47edd2..effe129a00 100644 --- a/modules/kafka/go.mod +++ b/modules/kafka/go.mod @@ -3,7 +3,7 @@ module github.com/testcontainers/testcontainers-go/modules/kafka go 1.22 require ( - github.com/IBM/sarama v1.43.2 + github.com/IBM/sarama v1.43.3 github.com/docker/go-connections v0.5.0 github.com/stretchr/testify v1.9.0 github.com/testcontainers/testcontainers-go v0.33.0 @@ -23,7 +23,7 @@ require ( github.com/distribution/reference v0.6.0 // indirect github.com/docker/docker v27.1.1+incompatible // indirect github.com/docker/go-units v0.5.0 // indirect - github.com/eapache/go-resiliency v1.6.0 // indirect + github.com/eapache/go-resiliency v1.7.0 // indirect github.com/eapache/go-xerial-snappy v0.0.0-20230731223053-c322873962e3 // indirect github.com/eapache/queue v1.1.0 // indirect github.com/felixge/httpsnoop v1.0.4 // indirect @@ -41,7 +41,7 @@ require ( github.com/jcmturner/gofork v1.7.6 // indirect github.com/jcmturner/gokrb5/v8 v8.4.4 // indirect github.com/jcmturner/rpc/v2 v2.0.3 // indirect - github.com/klauspost/compress v1.17.8 // indirect + github.com/klauspost/compress v1.17.9 // indirect github.com/kr/text v0.2.0 // indirect github.com/lufia/plan9stats v0.0.0-20211012122336-39d0f177ccd0 // indirect github.com/magiconair/properties v1.8.7 // indirect @@ -68,9 +68,9 @@ require ( go.opentelemetry.io/otel v1.24.0 // indirect go.opentelemetry.io/otel/metric v1.24.0 // indirect go.opentelemetry.io/otel/trace v1.24.0 // indirect - golang.org/x/crypto v0.24.0 // indirect - golang.org/x/net v0.26.0 // indirect - golang.org/x/sys v0.21.0 // indirect + golang.org/x/crypto v0.27.0 // indirect + golang.org/x/net v0.29.0 // indirect + golang.org/x/sys v0.25.0 // indirect google.golang.org/genproto/googleapis/api v0.0.0-20240318140521-94a12d6c2237 // indirect google.golang.org/genproto/googleapis/rpc v0.0.0-20240318140521-94a12d6c2237 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect diff --git a/modules/kafka/go.sum b/modules/kafka/go.sum index d4c3419647..9dbc9510c7 100644 --- a/modules/kafka/go.sum +++ b/modules/kafka/go.sum @@ -4,8 +4,8 @@ github.com/AdaLogics/go-fuzz-headers v0.0.0-20230811130428-ced1acdcaa24 h1:bvDV9 github.com/AdaLogics/go-fuzz-headers v0.0.0-20230811130428-ced1acdcaa24/go.mod h1:8o94RPi1/7XTJvwPpRSzSUedZrtlirdB3r9Z20bi2f8= github.com/Azure/go-ansiterm v0.0.0-20210617225240-d185dfc1b5a1 h1:UQHMgLO+TxOElx5B5HZ4hJQsoJ/PvUvKRhJHDQXO8P8= github.com/Azure/go-ansiterm v0.0.0-20210617225240-d185dfc1b5a1/go.mod h1:xomTg63KZ2rFqZQzSB4Vz2SUXa1BpHTVz9L5PTmPC4E= -github.com/IBM/sarama v1.43.2 h1:HABeEqRUh32z8yzY2hGB/j8mHSzC/HA9zlEjqFNCzSw= -github.com/IBM/sarama v1.43.2/go.mod h1:Kyo4WkF24Z+1nz7xeVUFWIuKVV8RS3wM8mkvPKMdXFQ= +github.com/IBM/sarama v1.43.3 h1:Yj6L2IaNvb2mRBop39N7mmJAHBVY3dTPncr3qGVkxPA= +github.com/IBM/sarama v1.43.3/go.mod h1:FVIRaLrhK3Cla/9FfRF5X9Zua2KpS3SYIXxhac1H+FQ= github.com/Microsoft/go-winio v0.6.2 h1:F2VQgta7ecxGYO8k3ZZz3RS8fVIXVxONVUPlNERoyfY= github.com/Microsoft/go-winio v0.6.2/go.mod h1:yd8OoFMLzJbo9gZq8j5qaps8bJ9aShtEA8Ipt1oGCvU= github.com/cenkalti/backoff/v4 v4.2.1 h1:y4OZtCnogmCPw98Zjyt5a6+QwPLGkiQsYW5oUqylYbM= @@ -32,8 +32,8 @@ github.com/docker/go-connections v0.5.0 h1:USnMq7hx7gwdVZq1L49hLXaFtUdTADjXGp+uj github.com/docker/go-connections v0.5.0/go.mod h1:ov60Kzw0kKElRwhNs9UlUHAE/F9Fe6GLaXnqyDdmEXc= github.com/docker/go-units v0.5.0 h1:69rxXcBk27SvSaaxTtLh/8llcHD8vYHT7WSdRZ/jvr4= github.com/docker/go-units v0.5.0/go.mod h1:fgPhTUdO+D/Jk86RDLlptpiXQzgHJF7gydDDbaIK4Dk= -github.com/eapache/go-resiliency v1.6.0 h1:CqGDTLtpwuWKn6Nj3uNUdflaq+/kIPsg0gfNzHton30= -github.com/eapache/go-resiliency v1.6.0/go.mod h1:5yPzW0MIvSe0JDsv0v+DvcjEv2FyD6iZYSs1ZI+iQho= +github.com/eapache/go-resiliency v1.7.0 h1:n3NRTnBn5N0Cbi/IeOHuQn9s2UwVUH7Ga0ZWcP+9JTA= +github.com/eapache/go-resiliency v1.7.0/go.mod h1:5yPzW0MIvSe0JDsv0v+DvcjEv2FyD6iZYSs1ZI+iQho= github.com/eapache/go-xerial-snappy v0.0.0-20230731223053-c322873962e3 h1:Oy0F4ALJ04o5Qqpdz8XLIpNA3WM/iSIXqxtqo7UGVws= github.com/eapache/go-xerial-snappy v0.0.0-20230731223053-c322873962e3/go.mod h1:YvSRo5mw33fLEx1+DlK6L2VV43tJt5Eyel9n9XBcR+0= github.com/eapache/queue v1.1.0 h1:YOEu7KNc61ntiQlcEeUIoDTJ2o8mQznoNvUhiigpIqc= @@ -85,8 +85,8 @@ github.com/jcmturner/rpc/v2 v2.0.3 h1:7FXXj8Ti1IaVFpSAziCZWNzbNuZmnvw/i6CqLNdWfZ github.com/jcmturner/rpc/v2 v2.0.3/go.mod h1:VUJYCIDm3PVOEHw8sgt091/20OJjskO/YJki3ELg/Hc= github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8= github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= -github.com/klauspost/compress v1.17.8 h1:YcnTYrq7MikUT7k0Yb5eceMmALQPYBW/Xltxn0NAMnU= -github.com/klauspost/compress v1.17.8/go.mod h1:Di0epgTjJY877eYKx5yC51cX2A2Vl2ibi7bDH9ttBbw= +github.com/klauspost/compress v1.17.9 h1:6KIumPrER1LHsvBVuDa0r5xaG0Es51mhhB9BQB2qeMA= +github.com/klauspost/compress v1.17.9/go.mod h1:Di0epgTjJY877eYKx5yC51cX2A2Vl2ibi7bDH9ttBbw= github.com/kr/pretty v0.3.0 h1:WgNl7dwNpEZ6jJ9k1snq4pZsg7DOEN8hP9Xw0Tsjwk0= github.com/kr/pretty v0.3.0/go.mod h1:640gp4NfQd8pI5XOwp5fnNeVWj67G7CFk/SaSQn7NBk= github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= @@ -134,6 +134,8 @@ github.com/sirupsen/logrus v1.9.3/go.mod h1:naHLuLoDiP4jHNo9R0sCBMtWGeIprob74mVs github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw= github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo= +github.com/stretchr/objx v0.5.2 h1:xuMeJ0Sdp5ZMRXx/aWO6RZxdr3beISkG5/G/aIRr3pY= +github.com/stretchr/objx v0.5.2/go.mod h1:FRsXN1f5AsAjCGJKqEizvkpNtU+EGNCLh3NxZ/8L+MA= github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4= github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= @@ -172,8 +174,10 @@ golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8U golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc= golang.org/x/crypto v0.6.0/go.mod h1:OFC/31mSvZgRz0V1QTNCzfAI1aIRzbiufJtkMIlEp58= -golang.org/x/crypto v0.24.0 h1:mnl8DM0o513X8fdIkmyFE/5hTYxbwYOjDS/+rK6qpRI= -golang.org/x/crypto v0.24.0/go.mod h1:Z1PMYSOR5nyMcyAVAIQSKCDwalqy85Aqn1x3Ws4L5DM= +golang.org/x/crypto v0.26.0 h1:RrRspgV4mU+YwB4FYnuBoKsUapNIL5cohGAmSH3azsw= +golang.org/x/crypto v0.26.0/go.mod h1:GY7jblb9wI+FOo5y8/S2oY4zWP07AkOJ4+jxCqdqn54= +golang.org/x/crypto v0.27.0 h1:GXm2NjJrPaiv/h1tb2UH8QfgC/hOf/+z0p6PT8o1w7A= +golang.org/x/crypto v0.27.0/go.mod h1:1Xngt8kV6Dvbssa53Ziq6Eqn0HqbZi5Z6R0ZpwQzt70= golang.org/x/mod v0.2.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4/go.mod h1:jJ57K6gSWd91VN4djpZkiMVwK6gcyfeH4XE8wZrZaV4= @@ -188,14 +192,16 @@ golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v golang.org/x/net v0.0.0-20220722155237-a158d28d115b/go.mod h1:XRhObCWvk6IyKnWLug+ECip1KBveYUHfp+8e9klMJ9c= golang.org/x/net v0.6.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs= golang.org/x/net v0.7.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs= -golang.org/x/net v0.26.0 h1:soB7SVo0PWrY4vPW/+ay0jKDNScG2X9wFeYlXIvJsOQ= -golang.org/x/net v0.26.0/go.mod h1:5YKkiSynbBIh3p6iOc/vibscux0x38BZDkn8sCUPxHE= +golang.org/x/net v0.28.0 h1:a9JDOJc5GMUJ0+UDqmLT86WiEy7iWyIhz8gz8E4e5hE= +golang.org/x/net v0.28.0/go.mod h1:yqtgsTWOOnlGLG9GFRrK3++bGOUEkNBoHZc8MEDWPNg= +golang.org/x/net v0.29.0 h1:5ORfpBpCs4HzDYoodCDBbwHzdR5UrLBZ3sOnUJmFoHo= +golang.org/x/net v0.29.0/go.mod h1:gLkgy8jTGERgjzMic6DS9+SP0ajcu6Xu3Orq/SpETg0= golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= -golang.org/x/sync v0.7.0 h1:YsImfSBoP9QPYL0xyKJPq0gcaJdG3rInoqxTWbfQu9M= -golang.org/x/sync v0.7.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= +golang.org/x/sync v0.8.0 h1:3NFvSEYkUoMifnESzZl15y791HH1qU2xm6eCJU5ZPXQ= +golang.org/x/sync v0.8.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20190916202348-b4ddaad3f8a3/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= @@ -211,19 +217,23 @@ golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.8.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.11.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.15.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= -golang.org/x/sys v0.21.0 h1:rF+pYz3DAGSQAxAu1CbC7catZg4ebC4UIeIhKxBZvws= -golang.org/x/sys v0.21.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/sys v0.23.0 h1:YfKFowiIMvtgl1UERQoTPPToxltDeZfbj4H7dVUCwmM= +golang.org/x/sys v0.23.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/sys v0.25.0 h1:r+8e+loiHxRqhXVl6ML1nO3l1+oFoWbnlu2Ehimmi34= +golang.org/x/sys v0.25.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= golang.org/x/term v0.5.0/go.mod h1:jMB1sMXY+tzblOD4FWmEbocvup2/aLOaQEp7JmGp78k= -golang.org/x/term v0.21.0 h1:WVXCp+/EBEHOj53Rvu+7KiT/iElMrO8ACK16SMZ3jaA= -golang.org/x/term v0.21.0/go.mod h1:ooXLefLobQVslOqselCNF4SxFAaoS6KujMbsGzSDmX0= +golang.org/x/term v0.23.0 h1:F6D4vR+EHoL9/sWAWgAR1H2DcHr4PareCbAaCo1RpuU= +golang.org/x/term v0.23.0/go.mod h1:DgV24QBUrK6jhZXl+20l6UWznPlwAHm1Q1mGHtydmSk= +golang.org/x/term v0.24.0 h1:Mh5cbb+Zk2hqqXNO7S1iTjEphVL+jb8ZWaqh/g+JWkM= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ= golang.org/x/text v0.7.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8= -golang.org/x/text v0.16.0 h1:a94ExnEXNtEwYLGJSIUxnWoxoRz/ZcCsV63ROupILh4= -golang.org/x/text v0.16.0/go.mod h1:GhwF1Be+LQoKShO3cGOHzqOgRrGaYc9AvblQOmPVHnI= +golang.org/x/text v0.17.0 h1:XtiM5bkSOt+ewxlOE/aE/AKEHibwj/6gvWMl9Rsh0Qc= +golang.org/x/text v0.17.0/go.mod h1:BuEKDfySbSR4drPmRPG/7iBdf8hvFMuRexcpahXilzY= +golang.org/x/text v0.18.0 h1:XvMDiNzPAl0jr17s6W9lcaIhGUfUORdGCNsuLmPG224= golang.org/x/time v0.0.0-20220210224613-90d013bbcef8 h1:vVKdlvoWBphwdxWKrFZEuM0kGgGLxUOYcY4U/2Vjg44= golang.org/x/time v0.0.0-20220210224613-90d013bbcef8/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= diff --git a/modules/kafka/kafka_test.go b/modules/kafka/kafka_test.go index 3f633b17d0..b3a3af60f0 100644 --- a/modules/kafka/kafka_test.go +++ b/modules/kafka/kafka_test.go @@ -13,10 +13,9 @@ import ( "github.com/testcontainers/testcontainers-go" "github.com/testcontainers/testcontainers-go/modules/kafka" "github.com/testcontainers/testcontainers-go/network" - "github.com/testcontainers/testcontainers-go/wait" ) -func TestKafka(t *testing.T) { +func TestKafka_Basic(t *testing.T) { topic := "some-topic" ctx := context.Background() @@ -106,8 +105,15 @@ func TestKafka_networkConnectivity(t *testing.T) { var err error const ( + // config topic_in = "topic_in" topic_out = "topic_out" + + address = "kafka:9092" + + // test data + key = "wow" + text_msg = "test-input-external" ) Network, err := network.New(ctx) @@ -129,21 +135,38 @@ func TestKafka_networkConnectivity(t *testing.T) { }), ) // } - if err != nil { - t.Fatalf("failed to start container: %s", err) - } + require.NoError(t, err, "failed to start kafka container") - brokers, err := KafkaContainer.Brokers(context.TODO()) - if err != nil { - t.Fatal("failed to get brokers", err) - } + kcat, err := testcontainers.GenericContainer(ctx, testcontainers.GenericContainerRequest{ + ContainerRequest: testcontainers.ContainerRequest{ + Image: "confluentinc/cp-kcat:7.4.1", + Networks: []string{ + Network.Name, + }, + Entrypoint: []string{ + "sh", + }, + Cmd: []string{ + "-c", + "tail -f /dev/null", + }, + }, + Started: true, + }) + // } - err = createTopics(brokers, []string{topic_in, topic_out}) - require.NoError(t, err) + require.NoError(t, err, "failed to start kcat") - _, err = initKafkaTest(ctx, Network.Name, "kafka:9092", topic_in, topic_out) + // 4. Copy message to kcat + err = kcat.CopyToContainer(ctx, []byte("Message produced by kcat"), "/tmp/msgs.txt", 700) require.NoError(t, err) + brokers, err := KafkaContainer.Brokers(context.TODO()) + require.NoError(t, err, "failed to get brokers") + + err = createTopics(brokers, []string{topic_in, topic_out}) + require.NoError(t, err, "create topics") + // perform assertions // set config to true because successfully delivered messages will be returned on the Successes channel @@ -151,26 +174,37 @@ func TestKafka_networkConnectivity(t *testing.T) { config.Producer.Return.Successes = true producer, err := sarama.NewSyncProducer(brokers, config) - if err != nil { - t.Fatal(err) - } + require.NoError(t, err, "create kafka producer") // Act - key := "wow" - text_msg := "test-input-external" - if _, _, err := producer.SendMessage(&sarama.ProducerMessage{ + // External write + _, _, err = producer.SendMessage(&sarama.ProducerMessage{ Topic: topic_in, Key: sarama.StringEncoder(key), Value: sarama.StringEncoder(text_msg), - }); err != nil { - t.Fatal(err) - } + }) + require.NoError(t, err, "send message") + + // Internal read + _, stdout, err := kcat.Exec(ctx, []string{"kcat", "-b", address, "-C", "-t", topic_in, "-c", "1"}) + require.NoError(t, err) + out, err := io.ReadAll(stdout) + require.NoError(t, err, "read message in kcat") + + // Internal write + tempfile := "/tmp/msgs.txt" + + err = kcat.CopyToContainer(ctx, []byte(out), tempfile, 700) + require.NoError(t, err) + + _, _, err = kcat.Exec(ctx, []string{"kcat", "-b", address, "-t", topic_out, "-P", "-l", tempfile}) + require.NoError(t, err, "send message with kcat") + + // External read client, err := sarama.NewConsumerGroup(brokers, "groupName", config) - if err != nil { - t.Fatal(err) - } + require.NoError(t, err, "create consumer group") consumer, _, done, cancel := NewTestKafkaConsumer(t) go func() { @@ -368,29 +402,6 @@ func TestKafka_listenersValidation(t *testing.T) { } } -func initKafkaTest(ctx context.Context, network string, brokers string, input string, output string) (testcontainers.Container, error) { - req := testcontainers.ContainerRequest{ - FromDockerfile: testcontainers.FromDockerfile{ - Context: "./testdata", - Dockerfile: "Dockerfile", - PrintBuildLog: true, - KeepImage: true, - }, - WaitingFor: wait.ForLog("start consuming events"), - Env: map[string]string{ - "KAFKA_BROKERS": brokers, - "KAFKA_TOPIC_IN": input, - "KAFKA_TOPIC_OUT": output, - }, - Networks: []string{network}, - } - - return testcontainers.GenericContainer(ctx, testcontainers.GenericContainerRequest{ - ContainerRequest: req, - Started: true, - }) -} - func createTopics(brokers []string, topics []string) error { t := &sarama.CreateTopicsRequest{} t.TopicDetails = make(map[string]*sarama.TopicDetail, len(topics)) @@ -406,7 +417,9 @@ func createTopics(brokers []string, topics []string) error { } defer c.Close() - _, err = c.Brokers()[0].CreateTopics(t) + bs := c.Brokers() + + _, err = bs[0].CreateTopics(t) if err != nil { return fmt.Errorf("failed to create topics: %w", err) } @@ -419,10 +432,12 @@ func createTopics(brokers []string, topics []string) error { // assertAdvertisedListeners checks that the advertised listeners are set correctly: // - The BROKER:// protocol is using the hostname of the Kafka container func assertAdvertisedListeners(t *testing.T, container testcontainers.Container) { - hostname, err := container.Host(context.Background()) + inspect, err := container.Inspect(context.Background()) if err != nil { t.Fatal(err) } + hostname := inspect.Config.Hostname + code, r, err := container.Exec(context.Background(), []string{"cat", "/usr/sbin/testcontainers_start.sh"}) if err != nil { t.Fatal(err) diff --git a/modules/kafka/kcat_test.go b/modules/kafka/kcat_test.go new file mode 100644 index 0000000000..5c1d56e83e --- /dev/null +++ b/modules/kafka/kcat_test.go @@ -0,0 +1,63 @@ +package kafka_test + +import ( + "context" + "fmt" + "io" + + "github.com/testcontainers/testcontainers-go" +) + +type KcatContainer struct { + Container testcontainers.Container + FilePath string +} + +func createKCat(ctx context.Context, network, filepath string) (KcatContainer, error) { + kcat, err := testcontainers.GenericContainer(ctx, testcontainers.GenericContainerRequest{ + ContainerRequest: testcontainers.ContainerRequest{ + Image: "confluentinc/cp-kcat:7.4.1", + Networks: []string{ + network, + }, + Entrypoint: []string{ + "sh", + }, + Cmd: []string{ + "-c", + "tail -f /dev/null", + }, + }, + Started: true, + }) + + if err != nil { + return KcatContainer{}, fmt.Errorf("create generic container: %w", err) + } + + return KcatContainer{Container: kcat, FilePath: filepath}, nil +} + +func (kcat *KcatContainer) SaveFile(ctx context.Context, data string) error { + return kcat.Container.CopyToContainer(ctx, []byte(data), kcat.FilePath, 700) +} + +func (kcat *KcatContainer) ProduceMessageFromFile(ctx context.Context, broker, topic string) error { + cmd := []string{"kcat", "-b", broker, "-t", topic, "-P", "-l", kcat.FilePath} + _, _, err := kcat.Container.Exec(ctx, cmd) + + return err +} + +func (kcat *KcatContainer) ConsumeMessage(ctx context.Context, broker, topic string) (string, error) { + cmd := []string{"kcat", "-b", broker, "-C", "-t", topic, "-c1"} + _, stdout, err := kcat.Container.Exec(ctx, cmd) + + if err != nil { + return "", err + } + + out, err := io.ReadAll(stdout) + + return string(out), err +} diff --git a/modules/kafka/testdata/Dockerfile b/modules/kafka/testdata/Dockerfile deleted file mode 100644 index a5afc156d7..0000000000 --- a/modules/kafka/testdata/Dockerfile +++ /dev/null @@ -1,14 +0,0 @@ -FROM golang:bullseye - -WORKDIR /app - -COPY go.mod . -COPY go.sum . - -RUN go mod tidy - -COPY . . - -RUN go build -o ./pg_test - -CMD /app/pg_test \ No newline at end of file diff --git a/modules/kafka/testdata/go.mod b/modules/kafka/testdata/go.mod deleted file mode 100644 index 15256e1e22..0000000000 --- a/modules/kafka/testdata/go.mod +++ /dev/null @@ -1,14 +0,0 @@ -module amogus - -go 1.21.6 - -require ( - github.com/confluentinc/confluent-kafka-go/v2 v2.3.0 - github.com/google/uuid v1.6.0 - github.com/pkg/errors v0.9.1 -) - -require ( - golang.org/x/net v0.10.0 // indirect - golang.org/x/sys v0.15.0 // indirect -) diff --git a/modules/kafka/testdata/go.sum b/modules/kafka/testdata/go.sum deleted file mode 100644 index 38e7e8f3f8..0000000000 --- a/modules/kafka/testdata/go.sum +++ /dev/null @@ -1,72 +0,0 @@ -github.com/Azure/go-ansiterm v0.0.0-20210617225240-d185dfc1b5a1 h1:UQHMgLO+TxOElx5B5HZ4hJQsoJ/PvUvKRhJHDQXO8P8= -github.com/Azure/go-ansiterm v0.0.0-20210617225240-d185dfc1b5a1/go.mod h1:xomTg63KZ2rFqZQzSB4Vz2SUXa1BpHTVz9L5PTmPC4E= -github.com/Microsoft/go-winio v0.5.2 h1:a9IhgEQBCUEk6QCdml9CiJGhAws+YwffDHEMp1VMrpA= -github.com/Microsoft/go-winio v0.5.2/go.mod h1:WpS1mjBmmwHBEWmogvA2mj8546UReBk4v8QkMxJ6pZY= -github.com/Microsoft/hcsshim v0.9.4 h1:mnUj0ivWy6UzbB1uLFqKR6F+ZyiDc7j4iGgHTpO+5+I= -github.com/Microsoft/hcsshim v0.9.4/go.mod h1:7pLA8lDk46WKDWlVsENo92gC0XFa8rbKfyFRBqxEbCc= -github.com/cenkalti/backoff/v4 v4.1.3 h1:cFAlzYUlVYDysBEH2T5hyJZMh3+5+WCBvSnK6Q8UtC4= -github.com/cenkalti/backoff/v4 v4.1.3/go.mod h1:scbssz8iZGpm3xbr14ovlUdkxfGXNInqkPWOWmG2CLw= -github.com/confluentinc/confluent-kafka-go/v2 v2.3.0 h1:icCHutJouWlQREayFwCc7lxDAhws08td+W3/gdqgZts= -github.com/confluentinc/confluent-kafka-go/v2 v2.3.0/go.mod h1:/VTy8iEpe6mD9pkCH5BhijlUl8ulUXymKv1Qig5Rgb8= -github.com/containerd/cgroups v1.0.4 h1:jN/mbWBEaz+T1pi5OFtnkQ+8qnmEbAr1Oo1FRm5B0dA= -github.com/containerd/cgroups v1.0.4/go.mod h1:nLNQtsF7Sl2HxNebu77i1R0oDlhiTG+kO4JTrUzo6IA= -github.com/containerd/containerd v1.6.8 h1:h4dOFDwzHmqFEP754PgfgTeVXFnLiRc6kiqC7tplDJs= -github.com/containerd/containerd v1.6.8/go.mod h1:By6p5KqPK0/7/CgO/A6t/Gz+CUYUu2zf1hUaaymVXB0= -github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= -github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= -github.com/docker/distribution v2.8.1+incompatible h1:Q50tZOPR6T/hjNsyc9g8/syEs6bk8XXApsHjKukMl68= -github.com/docker/distribution v2.8.1+incompatible/go.mod h1:J2gT2udsDAN96Uj4KfcMRqY0/ypR+oyYUYmja8H+y+w= -github.com/docker/docker v20.10.17+incompatible h1:JYCuMrWaVNophQTOrMMoSwudOVEfcegoZZrleKc1xwE= -github.com/docker/docker v20.10.17+incompatible/go.mod h1:eEKB0N0r5NX/I1kEveEz05bcu8tLC/8azJZsviup8Sk= -github.com/docker/go-connections v0.4.0 h1:El9xVISelRB7BuFusrZozjnkIM5YnzCViNKohAFqRJQ= -github.com/docker/go-connections v0.4.0/go.mod h1:Gbd7IOopHjR8Iph03tsViu4nIes5XhDvyHbTtUxmeec= -github.com/docker/go-units v0.5.0 h1:69rxXcBk27SvSaaxTtLh/8llcHD8vYHT7WSdRZ/jvr4= -github.com/docker/go-units v0.5.0/go.mod h1:fgPhTUdO+D/Jk86RDLlptpiXQzgHJF7gydDDbaIK4Dk= -github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q= -github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q= -github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da h1:oI5xCqsCo564l8iNU+DwB5epxmsaqB+rhGL0m5jtYqE= -github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc= -github.com/golang/protobuf v1.5.3 h1:KhyjKVUg7Usr/dYsdSqoFveMYd5ko72D+zANwlG1mmg= -github.com/golang/protobuf v1.5.3/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY= -github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= -github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= -github.com/magiconair/properties v1.8.6 h1:5ibWZ6iY0NctNGWo87LalDlEZ6R41TqbbDamhfG/Qzo= -github.com/magiconair/properties v1.8.6/go.mod h1:y3VJvCyxH9uVvJTWEGAELF3aiYNyPKd5NZ3oSwXrF60= -github.com/moby/sys/mount v0.3.3 h1:fX1SVkXFJ47XWDoeFW4Sq7PdQJnV2QIDZAqjNqgEjUs= -github.com/moby/sys/mount v0.3.3/go.mod h1:PBaEorSNTLG5t/+4EgukEQVlAvVEc6ZjTySwKdqp5K0= -github.com/moby/sys/mountinfo v0.6.2 h1:BzJjoreD5BMFNmD9Rus6gdd1pLuecOFPt8wC+Vygl78= -github.com/moby/sys/mountinfo v0.6.2/go.mod h1:IJb6JQeOklcdMU9F5xQ8ZALD+CUr5VlGpwtX+VE0rpI= -github.com/moby/term v0.0.0-20210619224110-3f7ff695adc6 h1:dcztxKSvZ4Id8iPpHERQBbIJfabdt4wUm5qy3wOL2Zc= -github.com/moby/term v0.0.0-20210619224110-3f7ff695adc6/go.mod h1:E2VnQOmVuvZB6UYnnDB0qG5Nq/1tD9acaOpo6xmt0Kw= -github.com/morikuni/aec v1.0.0 h1:nP9CBfwrvYnBRgY6qfDQkygYDmYwOilePFkwzv4dU8A= -github.com/morikuni/aec v1.0.0/go.mod h1:BbKIizmSmc5MMPqRYbxO4ZU0S0+P200+tUnFx7PXmsc= -github.com/opencontainers/go-digest v1.0.0 h1:apOUWs51W5PlhuyGyz9FCeeBIOUDA/6nW8Oi/yOhh5U= -github.com/opencontainers/go-digest v1.0.0/go.mod h1:0JzlMkj0TRzQZfJkVvzbP0HBR3IKzErnv2BNG4W4MAM= -github.com/opencontainers/image-spec v1.0.3-0.20211202183452-c5a74bcca799 h1:rc3tiVYb5z54aKaDfakKn0dDjIyPpTtszkjuMzyt7ec= -github.com/opencontainers/image-spec v1.0.3-0.20211202183452-c5a74bcca799/go.mod h1:BtxoFyWECRxE4U/7sNtV5W15zMzWCbyJoFRP3s7yZA0= -github.com/opencontainers/runc v1.1.3 h1:vIXrkId+0/J2Ymu2m7VjGvbSlAId9XNRPhn2p4b+d8w= -github.com/opencontainers/runc v1.1.3/go.mod h1:1J5XiS+vdZ3wCyZybsuxXZWGrgSr8fFJHLXuG2PsnNg= -github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= -github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= -github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= -github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= -github.com/sirupsen/logrus v1.8.1 h1:dJKuHgqk1NNQlqoA6BTlM1Wf9DOH3NBjQyu0h9+AZZE= -github.com/sirupsen/logrus v1.8.1/go.mod h1:yWOB1SBYBC5VeMP7gHvWumXLIWorT60ONWic61uBYv0= -github.com/stretchr/testify v1.8.2 h1:+h33VjcLVPDHtOdpUCuF+7gSuG3yGIftsP1YvFihtJ8= -github.com/stretchr/testify v1.8.2/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4= -github.com/testcontainers/testcontainers-go v0.14.0 h1:h0D5GaYG9mhOWr2qHdEKDXpkce/VlvaYOCzTRi6UBi8= -github.com/testcontainers/testcontainers-go v0.14.0/go.mod h1:hSRGJ1G8Q5Bw2gXgPulJOLlEBaYJHeBSOkQM5JLG+JQ= -go.opencensus.io v0.23.0 h1:gqCw0LfLxScz8irSi8exQc7fyQ0fKQU/qnC/X8+V/1M= -go.opencensus.io v0.23.0/go.mod h1:XItmlyltB5F7CS4xOC1DcqMoFqwtC6OG2xF7mCv7P7E= -golang.org/x/net v0.10.0 h1:X2//UzNDwYmtCLn7To6G58Wr6f5ahEAQgKNzv9Y951M= -golang.org/x/net v0.10.0/go.mod h1:0qNGK6F8kojg2nk9dLZ2mShWaEBan6FAoqfSigmmuDg= -golang.org/x/sys v0.15.0 h1:h48lPFYpsTvQJZF4EKyI4aLHaev3CxivZmv7yZig9pc= -golang.org/x/sys v0.15.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= -google.golang.org/genproto v0.0.0-20230331144136-dcfb400f0633 h1:0BOZf6qNozI3pkN3fJLwNubheHJYHhMh91GRFOWWK08= -google.golang.org/genproto v0.0.0-20230331144136-dcfb400f0633/go.mod h1:UUQDJDOlWu4KYeJZffbWgBkS1YFobzKbLVfK69pe0Ak= -google.golang.org/grpc v1.54.0 h1:EhTqbhiYeixwWQtAEZAxmV9MGqcjEU2mFx52xCzNyag= -google.golang.org/grpc v1.54.0/go.mod h1:PUSEXI6iWghWaB6lXM4knEgpJNu2qUcKfDtNci3EC2g= -google.golang.org/protobuf v1.30.0 h1:kPPoIgf3TsEvrm0PFe15JQ+570QVxYzEvvHqChK+cng= -google.golang.org/protobuf v1.30.0/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I= -gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= -gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/modules/kafka/testdata/main.go b/modules/kafka/testdata/main.go deleted file mode 100644 index 96ea131822..0000000000 --- a/modules/kafka/testdata/main.go +++ /dev/null @@ -1,165 +0,0 @@ -package main - -import ( - "context" - "encoding/json" - "fmt" - "log" - "log/slog" - "os" - "time" - - "github.com/confluentinc/confluent-kafka-go/v2/kafka" - "github.com/google/uuid" - "github.com/pkg/errors" -) - -const ( - client = "kafka_rw_test" - group = "test_group" -) - -func main() { - brokers, _ := os.LookupEnv("KAFKA_BROKERS") - input_topic, _ := os.LookupEnv("KAFKA_TOPIC_IN") - output_topic, _ := os.LookupEnv("KAFKA_TOPIC_OUT") - - log.Printf("Got brokers: %v\n", brokers) - log.Printf("Got input topic: %v\n", input_topic) - log.Printf("Got output topic: %v\n", output_topic) - - consumer, err := InitNativeKafkaConsumer(client, brokers, group) - if err != nil { - log.Fatal(fmt.Errorf("failed to start kafka consumer: %w", err)) - } - - defer consumer.Close() - - meta, err := consumer.GetMetadata(nil, true, 1000) - log.Printf("Metadata: %#+v, %v", meta, err) - - producer, err := InitNativeKafkaProducer(client, brokers, "1", 30000) - if err != nil { - log.Fatal(fmt.Errorf("failed to start kafka producer: %w", err)) - } - - defer producer.Close() - - err = consumer.SubscribeTopics([]string{input_topic}, nil) - if err != nil { - log.Fatal(fmt.Errorf("failed to subscribe to kafka topic: %w", err)) - } - - StartConsuming(context.TODO(), consumer, producer, output_topic) - - fmt.Print("Finished\n") -} - -func StartConsuming(ctx context.Context, consumer *kafka.Consumer, producer *kafka.Producer, outTopic string) { - log.Println("start consuming events") - - run := true - - for run { - msg, err := consumer.ReadMessage(time.Second * 1) - if err != nil { - kErr, ok := err.(kafka.Error) - if ok && kErr.IsTimeout() { - log.Println(fmt.Errorf("read timeout: %w", kErr)) - continue - } - - log.Println(fmt.Errorf("failed to read message: %w", err)) - continue - } - - log.Printf("got message: %s\n", string(msg.Value)) - - outputText := string(msg.Value) + "-from-internal" - output := MakeMsg(outTopic, string(msg.Key), outputText) - - err = producer.Produce(&output, nil) - - if err != nil { - log.Println(fmt.Errorf("failed to write message: %w", err)) - continue - } - - log.Printf("written: %s\n", outputText) - } -} - -func MakeMsg(topic, key string, message interface{}) kafka.Message { - headers := []kafka.Header{ - { - Key: "DateAdd", Value: []byte(time.Now().Format(time.RFC3339Nano)), - }, - { - Key: "MessageId", Value: []byte(uuid.NewString()), - }, - } - - messageJson, _ := json.Marshal(message) - - keyJson, _ := json.Marshal(key) - - msg := kafka.Message{ - TopicPartition: kafka.TopicPartition{ - Topic: &topic, - Partition: kafka.PartitionAny, - }, - Value: messageJson, - Key: keyJson, - Headers: headers, - } - - return msg -} - -func InitNativeKafkaProducer( - clientID string, - brokers string, - acks string, - bufMaxMsg int, -) (*kafka.Producer, error) { - cfg := kafka.ConfigMap{ - "bootstrap.servers": brokers, - "client.id": clientID, - "acks": acks, - "queue.buffering.max.messages": bufMaxMsg, - "go.delivery.reports": false, - } - - p, err := kafka.NewProducer(&cfg) - if err != nil { - slog.Error("new producer", err) - return nil, err - } - - slog.Info(fmt.Sprintf("kafka producer %s created", clientID)) - - return p, nil -} - -func InitNativeKafkaConsumer( - clientID string, - brokers string, - group string, -) (*kafka.Consumer, error) { - config := kafka.ConfigMap{ - "bootstrap.servers": brokers, - "group.id": group, - "client.id": clientID, - "auto.offset.reset": "earliest", - "auto.commit.interval.ms": 3000, - } - - c, err := kafka.NewConsumer(&config) - if err != nil { - return nil, errors.Wrapf(err, "create kafka consumer") - } - - slog.Info(fmt.Sprintf("kafka consumer %s created", clientID)) - - return c, nil -}