From 2991625f01df6f25da148a65838bd48addb2fbae Mon Sep 17 00:00:00 2001 From: "dependabot[bot]" <49699333+dependabot[bot]@users.noreply.github.com> Date: Wed, 8 Jan 2025 06:35:01 +0000 Subject: [PATCH] Bump github.com/IBM/sarama from 1.44.0 to 1.45.0 Bumps [github.com/IBM/sarama](https://github.com/IBM/sarama) from 1.44.0 to 1.45.0. - [Release notes](https://github.com/IBM/sarama/releases) - [Changelog](https://github.com/IBM/sarama/blob/main/CHANGELOG.md) - [Commits](https://github.com/IBM/sarama/compare/v1.44.0...v1.45.0) --- updated-dependencies: - dependency-name: github.com/IBM/sarama dependency-type: direct:production update-type: version-update:semver-minor ... Signed-off-by: dependabot[bot] --- go.mod | 10 +- go.sum | 20 +- vendor/github.com/IBM/sarama/Dockerfile.kafka | 38 ++- vendor/github.com/IBM/sarama/Makefile | 2 +- vendor/github.com/IBM/sarama/admin.go | 217 ++++++++++++------ .../github.com/IBM/sarama/balance_strategy.go | 5 + .../IBM/sarama/create_topics_request.go | 15 ++ .../IBM/sarama/delete_topics_request.go | 15 ++ .../github.com/IBM/sarama/docker-compose.yml | 23 +- vendor/github.com/IBM/sarama/errors.go | 2 +- .../github.com/IBM/sarama/server.properties | 138 +++++++++++ vendor/github.com/IBM/sarama/utils.go | 4 +- .../klauspost/compress/.goreleaser.yml | 6 +- .../github.com/klauspost/compress/README.md | 29 ++- .../klauspost/compress/flate/deflate.go | 2 +- .../klauspost/compress/flate/inflate.go | 74 ++++-- .../klauspost/compress/fse/decompress.go | 2 +- .../klauspost/compress/huff0/decompress.go | 4 +- .../klauspost/compress/zstd/blockdec.go | 4 +- .../klauspost/compress/zstd/enc_better.go | 32 +-- .../klauspost/compress/zstd/enc_dfast.go | 16 +- .../klauspost/compress/zstd/encoder.go | 45 +++- .../klauspost/compress/zstd/framedec.go | 4 +- .../klauspost/compress/zstd/seqdec_amd64.go | 4 +- .../klauspost/compress/zstd/seqdec_amd64.s | 8 +- .../klauspost/compress/zstd/zstd.go | 4 + .../x/sys/unix/syscall_dragonfly.go | 12 + .../golang.org/x/sys/windows/dll_windows.go | 11 +- vendor/modules.txt | 14 +- 29 files changed, 568 insertions(+), 192 deletions(-) create mode 100644 vendor/github.com/IBM/sarama/server.properties diff --git a/go.mod b/go.mod index b4c9ea31..93e8e56e 100644 --- a/go.mod +++ b/go.mod @@ -3,7 +3,7 @@ module github.com/beatlabs/bake go 1.22 require ( - github.com/IBM/sarama v1.44.0 + github.com/IBM/sarama v1.45.0 github.com/cenkalti/backoff/v3 v3.2.2 github.com/hashicorp/consul/api v1.31.0 github.com/magefile/mage v1.15.0 @@ -50,7 +50,7 @@ require ( github.com/jcmturner/gofork v1.7.6 // indirect github.com/jcmturner/gokrb5/v8 v8.4.4 // indirect github.com/jcmturner/rpc/v2 v2.0.3 // indirect - github.com/klauspost/compress v1.17.9 // indirect + github.com/klauspost/compress v1.17.11 // indirect github.com/kr/text v0.2.0 // indirect github.com/mattn/go-colorable v0.1.13 // indirect github.com/mattn/go-isatty v0.0.20 // indirect @@ -75,10 +75,10 @@ require ( github.com/xeipuuv/gojsonreference v0.0.0-20180127040603-bd5ef7bd5415 // indirect github.com/xeipuuv/gojsonschema v1.2.0 // indirect github.com/youmark/pkcs8 v0.0.0-20240726163527-a2c0da244d78 // indirect - golang.org/x/crypto v0.31.0 // indirect + golang.org/x/crypto v0.32.0 // indirect golang.org/x/exp v0.0.0-20230817173708-d852ddb80c63 // indirect - golang.org/x/net v0.33.0 // indirect - golang.org/x/sys v0.28.0 // indirect + golang.org/x/net v0.34.0 // indirect + golang.org/x/sys v0.29.0 // indirect golang.org/x/text v0.21.0 // indirect gopkg.in/yaml.v2 v2.4.0 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect diff --git a/go.sum b/go.sum index 01207428..4fb7dfc3 100644 --- a/go.sum +++ b/go.sum @@ -5,8 +5,8 @@ filippo.io/edwards25519 v1.1.0/go.mod h1:BxyFTGdWcka3PhytdK4V28tE5sGfRvvvRV7EaN4 github.com/Azure/go-ansiterm v0.0.0-20230124172434-306776ec8161 h1:L/gRVlceqvL25UVaW/CKtUDjefjrs0SPonmDGUVOYP0= github.com/Azure/go-ansiterm v0.0.0-20230124172434-306776ec8161/go.mod h1:xomTg63KZ2rFqZQzSB4Vz2SUXa1BpHTVz9L5PTmPC4E= github.com/DataDog/datadog-go v3.2.0+incompatible/go.mod h1:LButxg5PwREeZtORoXG3tL4fMGNddJ+vMq1mwgfaqoQ= -github.com/IBM/sarama v1.44.0 h1:puNKqcScjSAgVLramjsuovZrS0nJZFVsrvuUymkWqhE= -github.com/IBM/sarama v1.44.0/go.mod h1:MxQ9SvGfvKIorbk077Ff6DUnBlGpidiQOtU2vuBaxVw= +github.com/IBM/sarama v1.45.0 h1:IzeBevTn809IJ/dhNKhP5mpxEXTmELuezO2tgHD9G5E= +github.com/IBM/sarama v1.45.0/go.mod h1:EEay63m8EZkeumco9TDXf2JT3uDnZsZqFgV46n4yZdY= github.com/Microsoft/go-winio v0.6.2 h1:F2VQgta7ecxGYO8k3ZZz3RS8fVIXVxONVUPlNERoyfY= github.com/Microsoft/go-winio v0.6.2/go.mod h1:yd8OoFMLzJbo9gZq8j5qaps8bJ9aShtEA8Ipt1oGCvU= github.com/Nvveen/Gotty v0.0.0-20120604004816-cd527374f1e5 h1:TngWCqHvy9oXAN6lEVMRuU21PR1EtLVZJmdB18Gu3Rw= @@ -159,8 +159,8 @@ github.com/json-iterator/go v1.1.9/go.mod h1:KdQUCv79m/52Kvf8AW2vK1V8akMuk1QjK/u github.com/julienschmidt/httprouter v1.2.0/go.mod h1:SYymIcj16QtmaHHD7aYtjjsJG7VTCxuUUipMqKk8s4w= github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8= github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= -github.com/klauspost/compress v1.17.9 h1:6KIumPrER1LHsvBVuDa0r5xaG0Es51mhhB9BQB2qeMA= -github.com/klauspost/compress v1.17.9/go.mod h1:Di0epgTjJY877eYKx5yC51cX2A2Vl2ibi7bDH9ttBbw= +github.com/klauspost/compress v1.17.11 h1:In6xLpyWOi1+C7tXUUWv2ot1QvBjxevKAaI6IXrJmUc= +github.com/klauspost/compress v1.17.11/go.mod h1:pMDklpSncoRMuLFrf1W9Ss9KT+0rH90U12bZKk7uwG0= github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ= github.com/kr/logfmt v0.0.0-20140226030751-b84e30acd515/go.mod h1:+0opPa2QZZtGFBFZlji/RkVcI2GknAs/DXo4wKdlNEc= github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo= @@ -300,8 +300,8 @@ golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8U golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc= golang.org/x/crypto v0.6.0/go.mod h1:OFC/31mSvZgRz0V1QTNCzfAI1aIRzbiufJtkMIlEp58= -golang.org/x/crypto v0.31.0 h1:ihbySMvVjLAeSH1IbfcRTkD/iNscyz8rGzjF/E5hV6U= -golang.org/x/crypto v0.31.0/go.mod h1:kDsLvtWBEx7MV9tJOj9bnXsPbxwJQ6csT/x4KIN4Ssk= +golang.org/x/crypto v0.32.0 h1:euUpcYgM8WcP71gNpTqQCn6rC2t6ULUPiOzfWaXVVfc= +golang.org/x/crypto v0.32.0/go.mod h1:ZnnJkOaASj8g0AjIduWNlq2NRxL0PlBrbKVyZ6V/Ugc= golang.org/x/exp v0.0.0-20230817173708-d852ddb80c63 h1:m64FZMko/V45gv0bNmrNYoDEq8U5YUhetc9cBWKS1TQ= golang.org/x/exp v0.0.0-20230817173708-d852ddb80c63/go.mod h1:0v4NqG35kSWCMzLaMeX+IQrlSnVE/bqGSyC2cz/9Le8= golang.org/x/mod v0.2.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= @@ -320,8 +320,8 @@ golang.org/x/net v0.0.0-20210410081132-afb366fc7cd1/go.mod h1:9tjilg8BloeKEkVJvy golang.org/x/net v0.0.0-20220722155237-a158d28d115b/go.mod h1:XRhObCWvk6IyKnWLug+ECip1KBveYUHfp+8e9klMJ9c= golang.org/x/net v0.6.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs= golang.org/x/net v0.7.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs= -golang.org/x/net v0.33.0 h1:74SYHlV8BIgHIFC/LrYkOGIwL19eTYXQ5wc6TBuO36I= -golang.org/x/net v0.33.0/go.mod h1:HXLR5J+9DxmrqMwG9qjGCxZ+zKXxBru04zlTvWlWuN4= +golang.org/x/net v0.34.0 h1:Mb7Mrk043xzHgnRM88suvJFwzVrRfHEHJEl5/71CKw0= +golang.org/x/net v0.34.0/go.mod h1:di0qlW3YNM5oh6GqDGQr92MyTozJPmybPK4Ev/Gm31k= golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= @@ -360,8 +360,8 @@ golang.org/x/sys v0.0.0-20220728004956-3c1f35247d10/go.mod h1:oPkhp1MJrh7nUepCBc golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.28.0 h1:Fksou7UEQUWlKvIdsqzJmUmCX3cZuD2+P3XyyzwMhlA= -golang.org/x/sys v0.28.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/sys v0.29.0 h1:TPYlXGxvx1MGTn2GiZDhnjPA9wZzZeGKHHmKhHYvgaU= +golang.org/x/sys v0.29.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= golang.org/x/term v0.5.0/go.mod h1:jMB1sMXY+tzblOD4FWmEbocvup2/aLOaQEp7JmGp78k= diff --git a/vendor/github.com/IBM/sarama/Dockerfile.kafka b/vendor/github.com/IBM/sarama/Dockerfile.kafka index b4d5c6ac..d2234e39 100644 --- a/vendor/github.com/IBM/sarama/Dockerfile.kafka +++ b/vendor/github.com/IBM/sarama/Dockerfile.kafka @@ -1,17 +1,17 @@ -FROM registry.access.redhat.com/ubi8/ubi-minimal:8.10@sha256:cf095e5668919ba1b4ace3888107684ad9d587b1830d3eb56973e6a54f456e67 +FROM registry.access.redhat.com/ubi9/ubi-minimal:9.5@sha256:daa61d6103e98bccf40d7a69a0d4f8786ec390e2204fd94f7cc49053e9949360 USER root RUN microdnf update -y \ - && microdnf install -y curl gzip java-11-openjdk-headless tar tzdata-java \ + && microdnf install -y git gzip java-17-openjdk-headless tar tzdata-java \ && microdnf reinstall -y tzdata \ && microdnf clean all -ENV JAVA_HOME=/usr/lib/jvm/jre-11 +ENV JAVA_HOME=/usr/lib/jvm/jre-17 # https://docs.oracle.com/javase/7/docs/technotes/guides/net/properties.html # Ensure Java doesn't cache any dns results -RUN cd /etc/java/java-11-openjdk/*/conf/security \ +RUN cd /etc/java/java-17-openjdk/*/conf/security \ && sed -e '/networkaddress.cache.ttl/d' -e '/networkaddress.cache.negative.ttl/d' -i java.security \ && echo 'networkaddress.cache.ttl=0' >> java.security \ && echo 'networkaddress.cache.negative.ttl=0' >> java.security @@ -19,24 +19,46 @@ RUN cd /etc/java/java-11-openjdk/*/conf/security \ ARG SCALA_VERSION="2.13" ARG KAFKA_VERSION="3.6.2" +WORKDIR /tmp + # https://github.com/apache/kafka/blob/2e2b0a58eda3e677763af974a44a6aaa3c280214/tests/docker/Dockerfile#L77-L105 ARG KAFKA_MIRROR="https://s3-us-west-2.amazonaws.com/kafka-packages" SHELL ["/bin/bash", "-o", "pipefail", "-c"] -RUN mkdir -p "/opt/kafka-${KAFKA_VERSION}" \ +RUN --mount=type=bind,target=.,rw=true \ + mkdir -p "/opt/kafka-${KAFKA_VERSION}" \ && chmod a+rw "/opt/kafka-${KAFKA_VERSION}" \ - && curl -s "$KAFKA_MIRROR/kafka_${SCALA_VERSION}-${KAFKA_VERSION}.tgz" | tar xz --strip-components=1 -C "/opt/kafka-${KAFKA_VERSION}" + && if [ "$KAFKA_VERSION" = "4.0.0" ]; then \ + microdnf install -y java-17-openjdk-devel \ + && git clone --depth=50 --single-branch -b 4.0 https://github.com/apache/kafka /usr/src/kafka \ + && cd /usr/src/kafka \ + && : PIN TO COMMIT BEFORE KAFKA-17616 ZOOKEEPER REMOVAL STARTED \ + && git reset --hard d1504649fb \ + && export JAVA_TOOL_OPTIONS=-XX:MaxRAMPercentage=80 \ + && sed -e '/version=/s/-SNAPSHOT//' -e '/org.gradle.jvmargs/d' -e '/org.gradle.parallel/s/true/false/' -i gradle.properties && ./gradlew -PmaxParallelForks=1 -PmaxScalacThreads=1 --no-daemon releaseTarGz -x siteDocsTar -x javadoc \ + && tar xzf core/build/distributions/kafka_${SCALA_VERSION}-${KAFKA_VERSION}.tgz --strip-components=1 -C "/opt/kafka-${KAFKA_VERSION}" \ + && cp /tmp/server.properties "/opt/kafka-${KAFKA_VERSION}/config/" \ + && microdnf remove -y java-17-openjdk-devel \ + && rm -rf /usr/src/kafka ; \ + else \ + curl -s "$KAFKA_MIRROR/kafka_${SCALA_VERSION}-${KAFKA_VERSION}.tgz" | tar xz --strip-components=1 -C "/opt/kafka-${KAFKA_VERSION}" ; \ + fi # older kafka versions depend upon jaxb-api being bundled with the JDK, but it # was removed from Java 11 so work around that by including it in the kafka # libs dir regardless -WORKDIR /tmp RUN curl -sLO "https://repo1.maven.org/maven2/javax/xml/bind/jaxb-api/2.3.0/jaxb-api-2.3.0.jar" \ && for DIR in /opt/kafka-*; do cp -v jaxb-api-2.3.0.jar $DIR/libs/ ; done \ && rm -f jaxb-api-2.3.0.jar +# older kafka versions with the zookeeper 3.4.13 client aren't compatible with Java 17 so quietly bump them to 3.5.9 +RUN [ -f "/opt/kafka-${KAFKA_VERSION}/libs/zookeeper-3.4.13.jar" ] || exit 0 ; \ + rm -f "/opt/kafka-${KAFKA_VERSION}/libs/zookeeper-3.4.13.jar" \ + && curl --fail -sSL -o "/opt/kafka-${KAFKA_VERSION}/libs/zookeeper-3.5.9.jar" "https://repo1.maven.org/maven2/org/apache/zookeeper/zookeeper/3.5.9/zookeeper-3.5.9.jar" \ + && curl --fail -sSL -o "/opt/kafka-${KAFKA_VERSION}/libs/zookeeper-jute-3.5.9.jar" "https://repo1.maven.org/maven2/org/apache/zookeeper/zookeeper-jute/3.5.9/zookeeper-jute-3.5.9.jar" + WORKDIR /opt/kafka-${KAFKA_VERSION} -ENV JAVA_MAJOR_VERSION=11 +ENV JAVA_MAJOR_VERSION=17 RUN sed -e "s/JAVA_MAJOR_VERSION=.*/JAVA_MAJOR_VERSION=${JAVA_MAJOR_VERSION}/" -i"" ./bin/kafka-run-class.sh diff --git a/vendor/github.com/IBM/sarama/Makefile b/vendor/github.com/IBM/sarama/Makefile index 7cefc2a2..ba6f46e4 100644 --- a/vendor/github.com/IBM/sarama/Makefile +++ b/vendor/github.com/IBM/sarama/Makefile @@ -9,7 +9,7 @@ FILES := $(shell find . -name '*.go' -type f -not -name '*.pb.go' -not -name TESTS := $(shell find . -name '*.go' -type f -not -name '*.pb.go' -not -name '*_generated.go' -name '*_test.go') $(GOBIN)/tparse: - GOBIN=$(GOBIN) go install github.com/mfridman/tparse@v0.11.1 + GOBIN=$(GOBIN) go install github.com/mfridman/tparse@v0.16.0 get: $(GO) get ./... $(GO) mod verify diff --git a/vendor/github.com/IBM/sarama/admin.go b/vendor/github.com/IBM/sarama/admin.go index 6549c7e6..8aa1f374 100644 --- a/vendor/github.com/IBM/sarama/admin.go +++ b/vendor/github.com/IBM/sarama/admin.go @@ -3,6 +3,7 @@ package sarama import ( "errors" "fmt" + "io" "math/rand" "strconv" "sync" @@ -144,6 +145,10 @@ type ClusterAdmin interface { // locally cached value if it's available. Controller() (*Broker, error) + // Coordinator returns the coordinating broker for a consumer group. It will + // return a locally cached value if it's available. + Coordinator(group string) (*Broker, error) + // Remove members from the consumer group by given member identities. // This operation is supported by brokers with version 2.3 or higher // This is for static membership feature. KIP-345 @@ -195,14 +200,25 @@ func (ca *clusterAdmin) Controller() (*Broker, error) { return ca.client.Controller() } +func (ca *clusterAdmin) Coordinator(group string) (*Broker, error) { + return ca.client.Coordinator(group) +} + func (ca *clusterAdmin) refreshController() (*Broker, error) { return ca.client.RefreshController() } -// isErrNotController returns `true` if the given error type unwraps to an -// `ErrNotController` response from Kafka -func isErrNotController(err error) bool { - return errors.Is(err, ErrNotController) +// isRetriableControllerError returns `true` if the given error type unwraps to +// an `ErrNotController` or `EOF` response from Kafka +func isRetriableControllerError(err error) bool { + return errors.Is(err, ErrNotController) || errors.Is(err, io.EOF) +} + +// isRetriableGroupCoordinatorError returns `true` if the given error type +// unwraps to an `ErrNotCoordinatorForConsumer`, +// `ErrConsumerCoordinatorNotAvailable` or `EOF` response from Kafka +func isRetriableGroupCoordinatorError(err error) bool { + return errors.Is(err, ErrNotCoordinatorForConsumer) || errors.Is(err, ErrConsumerCoordinatorNotAvailable) || errors.Is(err, io.EOF) } // retryOnError will repeatedly call the given (error-returning) func in the @@ -252,7 +268,7 @@ func (ca *clusterAdmin) CreateTopic(topic string, detail *TopicDetail, validateO request.Version = 1 } - return ca.retryOnError(isErrNotController, func() error { + return ca.retryOnError(isRetriableControllerError, func() error { b, err := ca.Controller() if err != nil { return err @@ -269,7 +285,7 @@ func (ca *clusterAdmin) CreateTopic(topic string, detail *TopicDetail, validateO } if !errors.Is(topicErr.Err, ErrNoError) { - if errors.Is(topicErr.Err, ErrNotController) { + if isRetriableControllerError(topicErr.Err) { _, _ = ca.refreshController() } return topicErr @@ -281,14 +297,14 @@ func (ca *clusterAdmin) CreateTopic(topic string, detail *TopicDetail, validateO func (ca *clusterAdmin) DescribeTopics(topics []string) (metadata []*TopicMetadata, err error) { var response *MetadataResponse - err = ca.retryOnError(isErrNotController, func() error { + err = ca.retryOnError(isRetriableControllerError, func() error { controller, err := ca.Controller() if err != nil { return err } request := NewMetadataRequest(ca.conf.Version, topics) response, err = controller.GetMetadata(request) - if isErrNotController(err) { + if isRetriableControllerError(err) { _, _ = ca.refreshController() } return err @@ -301,7 +317,7 @@ func (ca *clusterAdmin) DescribeTopics(topics []string) (metadata []*TopicMetada func (ca *clusterAdmin) DescribeCluster() (brokers []*Broker, controllerID int32, err error) { var response *MetadataResponse - err = ca.retryOnError(isErrNotController, func() error { + err = ca.retryOnError(isRetriableControllerError, func() error { controller, err := ca.Controller() if err != nil { return err @@ -309,7 +325,7 @@ func (ca *clusterAdmin) DescribeCluster() (brokers []*Broker, controllerID int32 request := NewMetadataRequest(ca.conf.Version, nil) response, err = controller.GetMetadata(request) - if isErrNotController(err) { + if isRetriableControllerError(err) { _, _ = ca.refreshController() } return err @@ -441,7 +457,7 @@ func (ca *clusterAdmin) DeleteTopic(topic string) error { request.Version = 1 } - return ca.retryOnError(isErrNotController, func() error { + return ca.retryOnError(isRetriableControllerError, func() error { b, err := ca.Controller() if err != nil { return err @@ -485,7 +501,7 @@ func (ca *clusterAdmin) CreatePartitions(topic string, count int32, assignment [ request.Version = 1 } - return ca.retryOnError(isErrNotController, func() error { + return ca.retryOnError(isRetriableControllerError, func() error { b, err := ca.Controller() if err != nil { return err @@ -526,7 +542,7 @@ func (ca *clusterAdmin) AlterPartitionReassignments(topic string, assignment [][ request.AddBlock(topic, int32(i), assignment[i]) } - return ca.retryOnError(isErrNotController, func() error { + return ca.retryOnError(isRetriableControllerError, func() error { b, err := ca.Controller() if err != nil { return err @@ -573,7 +589,7 @@ func (ca *clusterAdmin) ListPartitionReassignments(topic string, partitions []in request.AddBlock(topic, partitions) var rsp *ListPartitionReassignmentsResponse - err = ca.retryOnError(isErrNotController, func() error { + err = ca.retryOnError(isRetriableControllerError, func() error { b, err := ca.Controller() if err != nil { return err @@ -581,7 +597,7 @@ func (ca *clusterAdmin) ListPartitionReassignments(topic string, partitions []in _ = b.Open(ca.client.Config()) rsp, err = b.ListPartitionReassignments(request) - if isErrNotController(err) { + if isRetriableControllerError(err) { _, _ = ca.refreshController() } return err @@ -924,7 +940,7 @@ func (ca *clusterAdmin) ElectLeaders(electionType ElectionType, partitions map[s } var res *ElectLeadersResponse - err := ca.retryOnError(isErrNotController, func() error { + if err := ca.retryOnError(isRetriableControllerError, func() error { b, err := ca.Controller() if err != nil { return err @@ -932,12 +948,17 @@ func (ca *clusterAdmin) ElectLeaders(electionType ElectionType, partitions map[s _ = b.Open(ca.client.Config()) res, err = b.ElectLeaders(request) - if isErrNotController(err) { - _, _ = ca.refreshController() + if err != nil { + return err } - return err - }) - if err != nil { + if !errors.Is(res.ErrorCode, ErrNoError) { + if isRetriableControllerError(res.ErrorCode) { + _, _ = ca.refreshController() + } + return res.ErrorCode + } + return nil + }); err != nil { return nil, err } return res.ReplicaElectionResults, nil @@ -947,11 +968,11 @@ func (ca *clusterAdmin) DescribeConsumerGroups(groups []string) (result []*Group groupsPerBroker := make(map[*Broker][]string) for _, group := range groups { - controller, err := ca.client.Coordinator(group) + coordinator, err := ca.client.Coordinator(group) if err != nil { return nil, err } - groupsPerBroker[controller] = append(groupsPerBroker[controller], group) + groupsPerBroker[coordinator] = append(groupsPerBroker[coordinator], group) } for broker, brokerGroups := range groupsPerBroker { @@ -1043,22 +1064,36 @@ func (ca *clusterAdmin) ListConsumerGroups() (allGroups map[string]string, err e } func (ca *clusterAdmin) ListConsumerGroupOffsets(group string, topicPartitions map[string][]int32) (*OffsetFetchResponse, error) { - coordinator, err := ca.client.Coordinator(group) - if err != nil { - return nil, err - } - + var response *OffsetFetchResponse request := NewOffsetFetchRequest(ca.conf.Version, group, topicPartitions) + err := ca.retryOnError(isRetriableGroupCoordinatorError, func() (err error) { + defer func() { + if err != nil && isRetriableGroupCoordinatorError(err) { + _ = ca.client.RefreshCoordinator(group) + } + }() + + coordinator, err := ca.client.Coordinator(group) + if err != nil { + return err + } - return coordinator.FetchOffset(request) + response, err = coordinator.FetchOffset(request) + if err != nil { + return err + } + if !errors.Is(response.Err, ErrNoError) { + return response.Err + } + + return nil + }) + + return response, err } func (ca *clusterAdmin) DeleteConsumerGroupOffset(group string, topic string, partition int32) error { - coordinator, err := ca.client.Coordinator(group) - if err != nil { - return err - } - + var response *DeleteOffsetsResponse request := &DeleteOffsetsRequest{ Group: group, partitions: map[string][]int32{ @@ -1066,27 +1101,35 @@ func (ca *clusterAdmin) DeleteConsumerGroupOffset(group string, topic string, pa }, } - resp, err := coordinator.DeleteOffsets(request) - if err != nil { - return err - } + return ca.retryOnError(isRetriableGroupCoordinatorError, func() (err error) { + defer func() { + if err != nil && isRetriableGroupCoordinatorError(err) { + _ = ca.client.RefreshCoordinator(group) + } + }() - if !errors.Is(resp.ErrorCode, ErrNoError) { - return resp.ErrorCode - } + coordinator, err := ca.client.Coordinator(group) + if err != nil { + return err + } - if !errors.Is(resp.Errors[topic][partition], ErrNoError) { - return resp.Errors[topic][partition] - } - return nil + response, err = coordinator.DeleteOffsets(request) + if err != nil { + return err + } + if !errors.Is(response.ErrorCode, ErrNoError) { + return response.ErrorCode + } + if !errors.Is(response.Errors[topic][partition], ErrNoError) { + return response.Errors[topic][partition] + } + + return nil + }) } func (ca *clusterAdmin) DeleteConsumerGroup(group string) error { - coordinator, err := ca.client.Coordinator(group) - if err != nil { - return err - } - + var response *DeleteGroupsResponse request := &DeleteGroupsRequest{ Groups: []string{group}, } @@ -1094,21 +1137,34 @@ func (ca *clusterAdmin) DeleteConsumerGroup(group string) error { request.Version = 1 } - resp, err := coordinator.DeleteGroups(request) - if err != nil { - return err - } + return ca.retryOnError(isRetriableGroupCoordinatorError, func() (err error) { + defer func() { + if err != nil && isRetriableGroupCoordinatorError(err) { + _ = ca.client.RefreshCoordinator(group) + } + }() - groupErr, ok := resp.GroupErrorCodes[group] - if !ok { - return ErrIncompleteResponse - } + coordinator, err := ca.client.Coordinator(group) + if err != nil { + return err + } - if !errors.Is(groupErr, ErrNoError) { - return groupErr - } + response, err = coordinator.DeleteGroups(request) + if err != nil { + return err + } - return nil + groupErr, ok := response.GroupErrorCodes[group] + if !ok { + return ErrIncompleteResponse + } + + if !errors.Is(groupErr, ErrNoError) { + return groupErr + } + + return nil + }) } func (ca *clusterAdmin) DescribeLogDirs(brokerIds []int32) (allLogDirs map[int32][]DescribeLogDirsResponseDirMetadata, err error) { @@ -1206,7 +1262,7 @@ func (ca *clusterAdmin) AlterUserScramCredentials(u []AlterUserScramCredentialsU } var rsp *AlterUserScramCredentialsResponse - err := ca.retryOnError(isErrNotController, func() error { + err := ca.retryOnError(isRetriableControllerError, func() error { b, err := ca.Controller() if err != nil { return err @@ -1284,18 +1340,14 @@ func (ca *clusterAdmin) AlterClientQuotas(entity []QuotaEntityComponent, op Clie return nil } -func (ca *clusterAdmin) RemoveMemberFromConsumerGroup(groupId string, groupInstanceIds []string) (*LeaveGroupResponse, error) { +func (ca *clusterAdmin) RemoveMemberFromConsumerGroup(group string, groupInstanceIds []string) (*LeaveGroupResponse, error) { if !ca.conf.Version.IsAtLeast(V2_4_0_0) { return nil, ConfigurationError("Removing members from a consumer group headers requires Kafka version of at least v2.4.0") } - - controller, err := ca.client.Coordinator(groupId) - if err != nil { - return nil, err - } + var response *LeaveGroupResponse request := &LeaveGroupRequest{ Version: 3, - GroupId: groupId, + GroupId: group, } for _, instanceId := range groupInstanceIds { groupInstanceId := instanceId @@ -1303,5 +1355,28 @@ func (ca *clusterAdmin) RemoveMemberFromConsumerGroup(groupId string, groupInsta GroupInstanceId: &groupInstanceId, }) } - return controller.LeaveGroup(request) + err := ca.retryOnError(isRetriableGroupCoordinatorError, func() (err error) { + defer func() { + if err != nil && isRetriableGroupCoordinatorError(err) { + _ = ca.client.RefreshCoordinator(group) + } + }() + + coordinator, err := ca.client.Coordinator(group) + if err != nil { + return err + } + + response, err = coordinator.LeaveGroup(request) + if err != nil { + return err + } + if !errors.Is(response.Err, ErrNoError) { + return response.Err + } + + return nil + }) + + return response, err } diff --git a/vendor/github.com/IBM/sarama/balance_strategy.go b/vendor/github.com/IBM/sarama/balance_strategy.go index 30d41779..b5bc30a1 100644 --- a/vendor/github.com/IBM/sarama/balance_strategy.go +++ b/vendor/github.com/IBM/sarama/balance_strategy.go @@ -989,6 +989,7 @@ func (p *partitionMovements) getTheActualPartitionToBeMoved(partition topicParti return reversePairPartition } +//nolint:unused // this is used but only in unittests as a helper (which are excluded by the integration build tag) func (p *partitionMovements) isLinked(src, dst string, pairs []consumerPair, currentPath []string) ([]string, bool) { if src == dst { return currentPath, false @@ -1023,6 +1024,7 @@ func (p *partitionMovements) isLinked(src, dst string, pairs []consumerPair, cur return currentPath, false } +//nolint:unused // this is used but only in unittests as a helper (which are excluded by the integration build tag) func (p *partitionMovements) in(cycle []string, cycles [][]string) bool { superCycle := make([]string, len(cycle)-1) for i := 0; i < len(cycle)-1; i++ { @@ -1037,6 +1039,7 @@ func (p *partitionMovements) in(cycle []string, cycles [][]string) bool { return false } +//nolint:unused // this is used but only in unittests as a helper (which are excluded by the integration build tag) func (p *partitionMovements) hasCycles(pairs []consumerPair) bool { cycles := make([][]string, 0) for _, pair := range pairs { @@ -1068,6 +1071,7 @@ func (p *partitionMovements) hasCycles(pairs []consumerPair) bool { return false } +//nolint:unused // this is used but only in unittests as a helper (which are excluded by the integration build tag) func (p *partitionMovements) isSticky() bool { for topic, movements := range p.PartitionMovementsByTopic { movementPairs := make([]consumerPair, len(movements)) @@ -1085,6 +1089,7 @@ func (p *partitionMovements) isSticky() bool { return true } +//nolint:unused // this is used but only in unittests as a helper (which are excluded by the integration build tag) func indexOfSubList(source []string, target []string) int { targetSize := len(target) maxCandidate := len(source) - targetSize diff --git a/vendor/github.com/IBM/sarama/create_topics_request.go b/vendor/github.com/IBM/sarama/create_topics_request.go index 8382d17c..e8c0f014 100644 --- a/vendor/github.com/IBM/sarama/create_topics_request.go +++ b/vendor/github.com/IBM/sarama/create_topics_request.go @@ -16,6 +16,21 @@ type CreateTopicsRequest struct { ValidateOnly bool } +func NewCreateTopicsRequest(version KafkaVersion, topicDetails map[string]*TopicDetail, timeout time.Duration) *CreateTopicsRequest { + r := &CreateTopicsRequest{ + TopicDetails: topicDetails, + Timeout: timeout, + } + if version.IsAtLeast(V2_0_0_0) { + r.Version = 3 + } else if version.IsAtLeast(V0_11_0_0) { + r.Version = 2 + } else if version.IsAtLeast(V0_10_2_0) { + r.Version = 1 + } + return r +} + func (c *CreateTopicsRequest) encode(pe packetEncoder) error { if err := pe.putArrayLength(len(c.TopicDetails)); err != nil { return err diff --git a/vendor/github.com/IBM/sarama/delete_topics_request.go b/vendor/github.com/IBM/sarama/delete_topics_request.go index 252c0d02..f38f3277 100644 --- a/vendor/github.com/IBM/sarama/delete_topics_request.go +++ b/vendor/github.com/IBM/sarama/delete_topics_request.go @@ -8,6 +8,21 @@ type DeleteTopicsRequest struct { Timeout time.Duration } +func NewDeleteTopicsRequest(version KafkaVersion, topics []string, timeout time.Duration) *DeleteTopicsRequest { + d := &DeleteTopicsRequest{ + Topics: topics, + Timeout: timeout, + } + if version.IsAtLeast(V2_1_0_0) { + d.Version = 3 + } else if version.IsAtLeast(V2_0_0_0) { + d.Version = 2 + } else if version.IsAtLeast(V0_11_0_0) { + d.Version = 1 + } + return d +} + func (d *DeleteTopicsRequest) encode(pe packetEncoder) error { if err := pe.putStringArray(d.Topics); err != nil { return err diff --git a/vendor/github.com/IBM/sarama/docker-compose.yml b/vendor/github.com/IBM/sarama/docker-compose.yml index a0e3d2e2..1e66cca0 100644 --- a/vendor/github.com/IBM/sarama/docker-compose.yml +++ b/vendor/github.com/IBM/sarama/docker-compose.yml @@ -1,6 +1,6 @@ services: zookeeper-1: - hostname: 'zookeeper-1' + container_name: 'zookeeper-1' image: 'docker.io/library/zookeeper:3.7.2' init: true restart: always @@ -13,7 +13,7 @@ services: ZOO_MAX_CLIENT_CNXNS: '0' ZOO_4LW_COMMANDS_WHITELIST: 'mntr,conf,ruok' zookeeper-2: - hostname: 'zookeeper-2' + container_name: 'zookeeper-2' image: 'docker.io/library/zookeeper:3.7.2' init: true restart: always @@ -26,7 +26,7 @@ services: ZOO_MAX_CLIENT_CNXNS: '0' ZOO_4LW_COMMANDS_WHITELIST: 'mntr,conf,ruok' zookeeper-3: - hostname: 'zookeeper-3' + container_name: 'zookeeper-3' image: 'docker.io/library/zookeeper:3.7.2' init: true restart: always @@ -39,7 +39,7 @@ services: ZOO_MAX_CLIENT_CNXNS: '0' ZOO_4LW_COMMANDS_WHITELIST: 'mntr,conf,ruok' kafka-1: - hostname: 'kafka-1' + container_name: 'kafka-1' image: 'sarama/fv-kafka-${KAFKA_VERSION:-3.6.2}' init: true build: @@ -74,6 +74,7 @@ services: KAFKA_CFG_INTER_BROKER_LISTENER_NAME: 'LISTENER_INTERNAL' KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP: 'LISTENER_INTERNAL:PLAINTEXT,LISTENER_LOCAL:PLAINTEXT' KAFKA_CFG_DEFAULT_REPLICATION_FACTOR: '2' + KAFKA_CFG_OFFSETS_TOPIC_REPLICATION_FACTOR: '2' KAFKA_CFG_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: '2' KAFKA_CFG_BROKER_ID: '1' KAFKA_CFG_BROKER_RACK: '1' @@ -85,7 +86,7 @@ services: KAFKA_CFG_GROUP_INITIAL_REBALANCE_DELAY_MS: 0 KAFKA_JVM_PERFORMANCE_OPTS: "-XX:+IgnoreUnrecognizedVMOptions" kafka-2: - hostname: 'kafka-2' + container_name: 'kafka-2' image: 'sarama/fv-kafka-${KAFKA_VERSION:-3.6.2}' init: true build: @@ -120,6 +121,7 @@ services: KAFKA_CFG_INTER_BROKER_LISTENER_NAME: 'LISTENER_INTERNAL' KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP: 'LISTENER_INTERNAL:PLAINTEXT,LISTENER_LOCAL:PLAINTEXT' KAFKA_CFG_DEFAULT_REPLICATION_FACTOR: '2' + KAFKA_CFG_OFFSETS_TOPIC_REPLICATION_FACTOR: '2' KAFKA_CFG_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: '2' KAFKA_CFG_BROKER_ID: '2' KAFKA_CFG_BROKER_RACK: '2' @@ -131,7 +133,7 @@ services: KAFKA_CFG_GROUP_INITIAL_REBALANCE_DELAY_MS: 0 KAFKA_JVM_PERFORMANCE_OPTS: "-XX:+IgnoreUnrecognizedVMOptions" kafka-3: - hostname: 'kafka-3' + container_name: 'kafka-3' image: 'sarama/fv-kafka-${KAFKA_VERSION:-3.6.2}' init: true build: @@ -166,6 +168,7 @@ services: KAFKA_CFG_INTER_BROKER_LISTENER_NAME: 'LISTENER_INTERNAL' KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP: 'LISTENER_INTERNAL:PLAINTEXT,LISTENER_LOCAL:PLAINTEXT' KAFKA_CFG_DEFAULT_REPLICATION_FACTOR: '2' + KAFKA_CFG_OFFSETS_TOPIC_REPLICATION_FACTOR: '2' KAFKA_CFG_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: '2' KAFKA_CFG_BROKER_ID: '3' KAFKA_CFG_BROKER_RACK: '3' @@ -177,7 +180,7 @@ services: KAFKA_CFG_GROUP_INITIAL_REBALANCE_DELAY_MS: 0 KAFKA_JVM_PERFORMANCE_OPTS: "-XX:+IgnoreUnrecognizedVMOptions" kafka-4: - hostname: 'kafka-4' + container_name: 'kafka-4' image: 'sarama/fv-kafka-${KAFKA_VERSION:-3.6.2}' init: true build: @@ -212,6 +215,7 @@ services: KAFKA_CFG_INTER_BROKER_LISTENER_NAME: 'LISTENER_INTERNAL' KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP: 'LISTENER_INTERNAL:PLAINTEXT,LISTENER_LOCAL:PLAINTEXT' KAFKA_CFG_DEFAULT_REPLICATION_FACTOR: '2' + KAFKA_CFG_OFFSETS_TOPIC_REPLICATION_FACTOR: '2' KAFKA_CFG_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: '2' KAFKA_CFG_BROKER_ID: '4' KAFKA_CFG_BROKER_RACK: '4' @@ -223,7 +227,7 @@ services: KAFKA_CFG_GROUP_INITIAL_REBALANCE_DELAY_MS: 0 KAFKA_JVM_PERFORMANCE_OPTS: "-XX:+IgnoreUnrecognizedVMOptions" kafka-5: - hostname: 'kafka-5' + container_name: 'kafka-5' image: 'sarama/fv-kafka-${KAFKA_VERSION:-3.6.2}' init: true build: @@ -258,6 +262,7 @@ services: KAFKA_CFG_INTER_BROKER_LISTENER_NAME: 'LISTENER_INTERNAL' KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP: 'LISTENER_INTERNAL:PLAINTEXT,LISTENER_LOCAL:PLAINTEXT' KAFKA_CFG_DEFAULT_REPLICATION_FACTOR: '2' + KAFKA_CFG_OFFSETS_TOPIC_REPLICATION_FACTOR: '2' KAFKA_CFG_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: '2' KAFKA_CFG_BROKER_ID: '5' KAFKA_CFG_BROKER_RACK: '5' @@ -269,7 +274,7 @@ services: KAFKA_CFG_GROUP_INITIAL_REBALANCE_DELAY_MS: 0 KAFKA_JVM_PERFORMANCE_OPTS: "-XX:+IgnoreUnrecognizedVMOptions" toxiproxy: - hostname: 'toxiproxy' + container_name: 'toxiproxy' image: 'ghcr.io/shopify/toxiproxy:2.4.0' init: true healthcheck: diff --git a/vendor/github.com/IBM/sarama/errors.go b/vendor/github.com/IBM/sarama/errors.go index 2c431aec..842d3025 100644 --- a/vendor/github.com/IBM/sarama/errors.go +++ b/vendor/github.com/IBM/sarama/errors.go @@ -304,7 +304,7 @@ func (err KError) Error() string { case ErrOffsetsLoadInProgress: return "kafka server: The coordinator is still loading offsets and cannot currently process requests" case ErrConsumerCoordinatorNotAvailable: - return "kafka server: Offset's topic has not yet been created" + return "kafka server: The coordinator is not available" case ErrNotCoordinatorForConsumer: return "kafka server: Request was for a consumer group that is not coordinated by this broker" case ErrInvalidTopic: diff --git a/vendor/github.com/IBM/sarama/server.properties b/vendor/github.com/IBM/sarama/server.properties new file mode 100644 index 00000000..21ba1c7d --- /dev/null +++ b/vendor/github.com/IBM/sarama/server.properties @@ -0,0 +1,138 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# +# This configuration file is intended for use in ZK-based mode, where Apache ZooKeeper is required. +# See kafka.server.KafkaConfig for additional details and defaults +# + +############################# Server Basics ############################# + +# The id of the broker. This must be set to a unique integer for each broker. +broker.id=0 + +############################# Socket Server Settings ############################# + +# The address the socket server listens on. If not configured, the host name will be equal to the value of +# java.net.InetAddress.getCanonicalHostName(), with PLAINTEXT listener name, and port 9092. +# FORMAT: +# listeners = listener_name://host_name:port +# EXAMPLE: +# listeners = PLAINTEXT://your.host.name:9092 +#listeners=PLAINTEXT://:9092 + +# Listener name, hostname and port the broker will advertise to clients. +# If not set, it uses the value for "listeners". +#advertised.listeners=PLAINTEXT://your.host.name:9092 + +# Maps listener names to security protocols, the default is for them to be the same. See the config documentation for more details +#listener.security.protocol.map=PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL + +# The number of threads that the server uses for receiving requests from the network and sending responses to the network +num.network.threads=3 + +# The number of threads that the server uses for processing requests, which may include disk I/O +num.io.threads=8 + +# The send buffer (SO_SNDBUF) used by the socket server +socket.send.buffer.bytes=102400 + +# The receive buffer (SO_RCVBUF) used by the socket server +socket.receive.buffer.bytes=102400 + +# The maximum size of a request that the socket server will accept (protection against OOM) +socket.request.max.bytes=104857600 + + +############################# Log Basics ############################# + +# A comma separated list of directories under which to store log files +log.dirs=/tmp/kafka-logs + +# The default number of log partitions per topic. More partitions allow greater +# parallelism for consumption, but this will also result in more files across +# the brokers. +num.partitions=1 + +# The number of threads per data directory to be used for log recovery at startup and flushing at shutdown. +# This value is recommended to be increased for installations with data dirs located in RAID array. +num.recovery.threads.per.data.dir=1 + +############################# Internal Topic Settings ############################# +# The replication factor for the group metadata internal topics "__consumer_offsets" and "__transaction_state" +# For anything other than development testing, a value greater than 1 is recommended to ensure availability such as 3. +offsets.topic.replication.factor=1 +transaction.state.log.replication.factor=1 +transaction.state.log.min.isr=1 + +############################# Log Flush Policy ############################# + +# Messages are immediately written to the filesystem but by default we only fsync() to sync +# the OS cache lazily. The following configurations control the flush of data to disk. +# There are a few important trade-offs here: +# 1. Durability: Unflushed data may be lost if you are not using replication. +# 2. Latency: Very large flush intervals may lead to latency spikes when the flush does occur as there will be a lot of data to flush. +# 3. Throughput: The flush is generally the most expensive operation, and a small flush interval may lead to excessive seeks. +# The settings below allow one to configure the flush policy to flush data after a period of time or +# every N messages (or both). This can be done globally and overridden on a per-topic basis. + +# The number of messages to accept before forcing a flush of data to disk +#log.flush.interval.messages=10000 + +# The maximum amount of time a message can sit in a log before we force a flush +#log.flush.interval.ms=1000 + +############################# Log Retention Policy ############################# + +# The following configurations control the disposal of log segments. The policy can +# be set to delete segments after a period of time, or after a given size has accumulated. +# A segment will be deleted whenever *either* of these criteria are met. Deletion always happens +# from the end of the log. + +# The minimum age of a log file to be eligible for deletion due to age +log.retention.hours=168 + +# A size-based retention policy for logs. Segments are pruned from the log unless the remaining +# segments drop below log.retention.bytes. Functions independently of log.retention.hours. +#log.retention.bytes=1073741824 + +# The maximum size of a log segment file. When this size is reached a new log segment will be created. +#log.segment.bytes=1073741824 + +# The interval at which log segments are checked to see if they can be deleted according +# to the retention policies +log.retention.check.interval.ms=300000 + +############################# Zookeeper ############################# + +# Zookeeper connection string (see zookeeper docs for details). +# This is a comma separated host:port pairs, each corresponding to a zk +# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002". +# You can also append an optional chroot string to the urls to specify the +# root directory for all kafka znodes. +zookeeper.connect=localhost:2181 + +# Timeout in ms for connecting to zookeeper +zookeeper.connection.timeout.ms=18000 + + +############################# Group Coordinator Settings ############################# + +# The following configuration specifies the time, in milliseconds, that the GroupCoordinator will delay the initial consumer rebalance. +# The rebalance will be further delayed by the value of group.initial.rebalance.delay.ms as new members join the group, up to a maximum of max.poll.interval.ms. +# The default value for this is 3 seconds. +# We override this to 0 here as it makes for a better out-of-the-box experience for development and testing. +# However, in production environments the default value of 3 seconds is more suitable as this will help to avoid unnecessary, and potentially expensive, rebalances during application startup. +group.initial.rebalance.delay.ms=0 diff --git a/vendor/github.com/IBM/sarama/utils.go b/vendor/github.com/IBM/sarama/utils.go index d5b77e0d..b0e1acef 100644 --- a/vendor/github.com/IBM/sarama/utils.go +++ b/vendor/github.com/IBM/sarama/utils.go @@ -206,6 +206,7 @@ var ( V3_8_0_0 = newKafkaVersion(3, 8, 0, 0) V3_8_1_0 = newKafkaVersion(3, 8, 1, 0) V3_9_0_0 = newKafkaVersion(3, 9, 0, 0) + V4_0_0_0 = newKafkaVersion(4, 0, 0, 0) SupportedVersions = []KafkaVersion{ V0_8_2_0, @@ -277,9 +278,10 @@ var ( V3_8_0_0, V3_8_1_0, V3_9_0_0, + V4_0_0_0, } MinVersion = V0_8_2_0 - MaxVersion = V3_9_0_0 + MaxVersion = V4_0_0_0 DefaultVersion = V2_1_0_0 // reduced set of protocol versions to matrix test diff --git a/vendor/github.com/klauspost/compress/.goreleaser.yml b/vendor/github.com/klauspost/compress/.goreleaser.yml index a2295380..4528059c 100644 --- a/vendor/github.com/klauspost/compress/.goreleaser.yml +++ b/vendor/github.com/klauspost/compress/.goreleaser.yml @@ -1,5 +1,5 @@ -# This is an example goreleaser.yaml file with some sane defaults. -# Make sure to check the documentation at http://goreleaser.com +version: 2 + before: hooks: - ./gen.sh @@ -99,7 +99,7 @@ archives: checksum: name_template: 'checksums.txt' snapshot: - name_template: "{{ .Tag }}-next" + version_template: "{{ .Tag }}-next" changelog: sort: asc filters: diff --git a/vendor/github.com/klauspost/compress/README.md b/vendor/github.com/klauspost/compress/README.md index 05c7359e..de264c85 100644 --- a/vendor/github.com/klauspost/compress/README.md +++ b/vendor/github.com/klauspost/compress/README.md @@ -16,6 +16,27 @@ This package provides various compression algorithms. # changelog +* Sep 23rd, 2024 - [1.17.10](https://github.com/klauspost/compress/releases/tag/v1.17.10) + * gzhttp: Add TransportAlwaysDecompress option. https://github.com/klauspost/compress/pull/978 + * gzhttp: Add supported decompress request body by @mirecl in https://github.com/klauspost/compress/pull/1002 + * s2: Add EncodeBuffer buffer recycling callback https://github.com/klauspost/compress/pull/982 + * zstd: Improve memory usage on small streaming encodes https://github.com/klauspost/compress/pull/1007 + * flate: read data written with partial flush by @vajexal in https://github.com/klauspost/compress/pull/996 + +* Jun 12th, 2024 - [1.17.9](https://github.com/klauspost/compress/releases/tag/v1.17.9) + * s2: Reduce ReadFrom temporary allocations https://github.com/klauspost/compress/pull/949 + * flate, zstd: Shave some bytes off amd64 matchLen by @greatroar in https://github.com/klauspost/compress/pull/963 + * Upgrade zip/zlib to 1.22.4 upstream https://github.com/klauspost/compress/pull/970 https://github.com/klauspost/compress/pull/971 + * zstd: BuildDict fails with RLE table https://github.com/klauspost/compress/pull/951 + +* Apr 9th, 2024 - [1.17.8](https://github.com/klauspost/compress/releases/tag/v1.17.8) + * zstd: Reject blocks where reserved values are not 0 https://github.com/klauspost/compress/pull/885 + * zstd: Add RLE detection+encoding https://github.com/klauspost/compress/pull/938 + +* Feb 21st, 2024 - [1.17.7](https://github.com/klauspost/compress/releases/tag/v1.17.7) + * s2: Add AsyncFlush method: Complete the block without flushing by @Jille in https://github.com/klauspost/compress/pull/927 + * s2: Fix literal+repeat exceeds dst crash https://github.com/klauspost/compress/pull/930 + * Feb 5th, 2024 - [1.17.6](https://github.com/klauspost/compress/releases/tag/v1.17.6) * zstd: Fix incorrect repeat coding in best mode https://github.com/klauspost/compress/pull/923 * s2: Fix DecodeConcurrent deadlock on errors https://github.com/klauspost/compress/pull/925 @@ -81,7 +102,7 @@ https://github.com/klauspost/compress/pull/919 https://github.com/klauspost/comp * zstd: Various minor improvements by @greatroar in https://github.com/klauspost/compress/pull/788 https://github.com/klauspost/compress/pull/794 https://github.com/klauspost/compress/pull/795 * s2: Fix huge block overflow https://github.com/klauspost/compress/pull/779 * s2: Allow CustomEncoder fallback https://github.com/klauspost/compress/pull/780 - * gzhttp: Suppport ResponseWriter Unwrap() in gzhttp handler by @jgimenez in https://github.com/klauspost/compress/pull/799 + * gzhttp: Support ResponseWriter Unwrap() in gzhttp handler by @jgimenez in https://github.com/klauspost/compress/pull/799 * Mar 13, 2023 - [v1.16.1](https://github.com/klauspost/compress/releases/tag/v1.16.1) * zstd: Speed up + improve best encoder by @greatroar in https://github.com/klauspost/compress/pull/776 @@ -136,7 +157,7 @@ https://github.com/klauspost/compress/pull/919 https://github.com/klauspost/comp * zstd: Add [WithDecodeAllCapLimit](https://pkg.go.dev/github.com/klauspost/compress@v1.15.10/zstd#WithDecodeAllCapLimit) https://github.com/klauspost/compress/pull/649 * Add Go 1.19 - deprecate Go 1.16 https://github.com/klauspost/compress/pull/651 * flate: Improve level 5+6 compression https://github.com/klauspost/compress/pull/656 - * zstd: Improve "better" compresssion https://github.com/klauspost/compress/pull/657 + * zstd: Improve "better" compression https://github.com/klauspost/compress/pull/657 * s2: Improve "best" compression https://github.com/klauspost/compress/pull/658 * s2: Improve "better" compression. https://github.com/klauspost/compress/pull/635 * s2: Slightly faster non-assembly decompression https://github.com/klauspost/compress/pull/646 @@ -339,7 +360,7 @@ While the release has been extensively tested, it is recommended to testing when * s2: Fix binaries. * Feb 25, 2021 (v1.11.8) - * s2: Fixed occational out-of-bounds write on amd64. Upgrade recommended. + * s2: Fixed occasional out-of-bounds write on amd64. Upgrade recommended. * s2: Add AMD64 assembly for better mode. 25-50% faster. [#315](https://github.com/klauspost/compress/pull/315) * s2: Less upfront decoder allocation. [#322](https://github.com/klauspost/compress/pull/322) * zstd: Faster "compression" of incompressible data. [#314](https://github.com/klauspost/compress/pull/314) @@ -518,7 +539,7 @@ While the release has been extensively tested, it is recommended to testing when * Feb 19, 2016: Faster bit writer, level -2 is 15% faster, level 1 is 4% faster. * Feb 19, 2016: Handle small payloads faster in level 1-3. * Feb 19, 2016: Added faster level 2 + 3 compression modes. -* Feb 19, 2016: [Rebalanced compression levels](https://blog.klauspost.com/rebalancing-deflate-compression-levels/), so there is a more even progresssion in terms of compression. New default level is 5. +* Feb 19, 2016: [Rebalanced compression levels](https://blog.klauspost.com/rebalancing-deflate-compression-levels/), so there is a more even progression in terms of compression. New default level is 5. * Feb 14, 2016: Snappy: Merge upstream changes. * Feb 14, 2016: Snappy: Fix aggressive skipping. * Feb 14, 2016: Snappy: Update benchmark. diff --git a/vendor/github.com/klauspost/compress/flate/deflate.go b/vendor/github.com/klauspost/compress/flate/deflate.go index 66d1657d..af53fb86 100644 --- a/vendor/github.com/klauspost/compress/flate/deflate.go +++ b/vendor/github.com/klauspost/compress/flate/deflate.go @@ -861,7 +861,7 @@ func (d *compressor) reset(w io.Writer) { } switch d.compressionLevel.chain { case 0: - // level was NoCompression or ConstantCompresssion. + // level was NoCompression or ConstantCompression. d.windowEnd = 0 default: s := d.state diff --git a/vendor/github.com/klauspost/compress/flate/inflate.go b/vendor/github.com/klauspost/compress/flate/inflate.go index 2f410d64..0d7b437f 100644 --- a/vendor/github.com/klauspost/compress/flate/inflate.go +++ b/vendor/github.com/klauspost/compress/flate/inflate.go @@ -298,6 +298,14 @@ const ( huffmanGenericReader ) +// flushMode tells decompressor when to return data +type flushMode uint8 + +const ( + syncFlush flushMode = iota // return data after sync flush block + partialFlush // return data after each block +) + // Decompress state. type decompressor struct { // Input source. @@ -332,6 +340,8 @@ type decompressor struct { nb uint final bool + + flushMode flushMode } func (f *decompressor) nextBlock() { @@ -618,7 +628,10 @@ func (f *decompressor) dataBlock() { } if n == 0 { - f.toRead = f.dict.readFlush() + if f.flushMode == syncFlush { + f.toRead = f.dict.readFlush() + } + f.finishBlock() return } @@ -657,8 +670,12 @@ func (f *decompressor) finishBlock() { if f.dict.availRead() > 0 { f.toRead = f.dict.readFlush() } + f.err = io.EOF + } else if f.flushMode == partialFlush && f.dict.availRead() > 0 { + f.toRead = f.dict.readFlush() } + f.step = nextBlock } @@ -789,15 +806,25 @@ func (f *decompressor) Reset(r io.Reader, dict []byte) error { return nil } -// NewReader returns a new ReadCloser that can be used -// to read the uncompressed version of r. -// If r does not also implement io.ByteReader, -// the decompressor may read more data than necessary from r. -// It is the caller's responsibility to call Close on the ReadCloser -// when finished reading. -// -// The ReadCloser returned by NewReader also implements Resetter. -func NewReader(r io.Reader) io.ReadCloser { +type ReaderOpt func(*decompressor) + +// WithPartialBlock tells decompressor to return after each block, +// so it can read data written with partial flush +func WithPartialBlock() ReaderOpt { + return func(f *decompressor) { + f.flushMode = partialFlush + } +} + +// WithDict initializes the reader with a preset dictionary +func WithDict(dict []byte) ReaderOpt { + return func(f *decompressor) { + f.dict.init(maxMatchOffset, dict) + } +} + +// NewReaderOpts returns new reader with provided options +func NewReaderOpts(r io.Reader, opts ...ReaderOpt) io.ReadCloser { fixedHuffmanDecoderInit() var f decompressor @@ -806,9 +833,26 @@ func NewReader(r io.Reader) io.ReadCloser { f.codebits = new([numCodes]int) f.step = nextBlock f.dict.init(maxMatchOffset, nil) + + for _, opt := range opts { + opt(&f) + } + return &f } +// NewReader returns a new ReadCloser that can be used +// to read the uncompressed version of r. +// If r does not also implement io.ByteReader, +// the decompressor may read more data than necessary from r. +// It is the caller's responsibility to call Close on the ReadCloser +// when finished reading. +// +// The ReadCloser returned by NewReader also implements Resetter. +func NewReader(r io.Reader) io.ReadCloser { + return NewReaderOpts(r) +} + // NewReaderDict is like NewReader but initializes the reader // with a preset dictionary. The returned Reader behaves as if // the uncompressed data stream started with the given dictionary, @@ -817,13 +861,5 @@ func NewReader(r io.Reader) io.ReadCloser { // // The ReadCloser returned by NewReader also implements Resetter. func NewReaderDict(r io.Reader, dict []byte) io.ReadCloser { - fixedHuffmanDecoderInit() - - var f decompressor - f.r = makeReader(r) - f.bits = new([maxNumLit + maxNumDist]int) - f.codebits = new([numCodes]int) - f.step = nextBlock - f.dict.init(maxMatchOffset, dict) - return &f + return NewReaderOpts(r, WithDict(dict)) } diff --git a/vendor/github.com/klauspost/compress/fse/decompress.go b/vendor/github.com/klauspost/compress/fse/decompress.go index cc05d0f7..0c7dd4ff 100644 --- a/vendor/github.com/klauspost/compress/fse/decompress.go +++ b/vendor/github.com/klauspost/compress/fse/decompress.go @@ -15,7 +15,7 @@ const ( // It is possible, but by no way guaranteed that corrupt data will // return an error. // It is up to the caller to verify integrity of the returned data. -// Use a predefined Scrach to set maximum acceptable output size. +// Use a predefined Scratch to set maximum acceptable output size. func Decompress(b []byte, s *Scratch) ([]byte, error) { s, err := s.prepare(b) if err != nil { diff --git a/vendor/github.com/klauspost/compress/huff0/decompress.go b/vendor/github.com/klauspost/compress/huff0/decompress.go index 54bd08b2..0f56b02d 100644 --- a/vendor/github.com/klauspost/compress/huff0/decompress.go +++ b/vendor/github.com/klauspost/compress/huff0/decompress.go @@ -1136,7 +1136,7 @@ func (s *Scratch) matches(ct cTable, w io.Writer) { errs++ } if errs > 0 { - fmt.Fprintf(w, "%d errros in base, stopping\n", errs) + fmt.Fprintf(w, "%d errors in base, stopping\n", errs) continue } // Ensure that all combinations are covered. @@ -1152,7 +1152,7 @@ func (s *Scratch) matches(ct cTable, w io.Writer) { errs++ } if errs > 20 { - fmt.Fprintf(w, "%d errros, stopping\n", errs) + fmt.Fprintf(w, "%d errors, stopping\n", errs) break } } diff --git a/vendor/github.com/klauspost/compress/zstd/blockdec.go b/vendor/github.com/klauspost/compress/zstd/blockdec.go index 03744fbc..9c28840c 100644 --- a/vendor/github.com/klauspost/compress/zstd/blockdec.go +++ b/vendor/github.com/klauspost/compress/zstd/blockdec.go @@ -598,7 +598,9 @@ func (b *blockDec) prepareSequences(in []byte, hist *history) (err error) { printf("RLE set to 0x%x, code: %v", symb, v) } case compModeFSE: - println("Reading table for", tableIndex(i)) + if debugDecoder { + println("Reading table for", tableIndex(i)) + } if seq.fse == nil || seq.fse.preDefined { seq.fse = fseDecoderPool.Get().(*fseDecoder) } diff --git a/vendor/github.com/klauspost/compress/zstd/enc_better.go b/vendor/github.com/klauspost/compress/zstd/enc_better.go index a4f5bf91..84a79fde 100644 --- a/vendor/github.com/klauspost/compress/zstd/enc_better.go +++ b/vendor/github.com/klauspost/compress/zstd/enc_better.go @@ -179,9 +179,9 @@ encodeLoop: if repIndex >= 0 && load3232(src, repIndex) == uint32(cv>>(repOff*8)) { // Consider history as well. var seq seq - lenght := 4 + e.matchlen(s+4+repOff, repIndex+4, src) + length := 4 + e.matchlen(s+4+repOff, repIndex+4, src) - seq.matchLen = uint32(lenght - zstdMinMatch) + seq.matchLen = uint32(length - zstdMinMatch) // We might be able to match backwards. // Extend as long as we can. @@ -210,12 +210,12 @@ encodeLoop: // Index match start+1 (long) -> s - 1 index0 := s + repOff - s += lenght + repOff + s += length + repOff nextEmit = s if s >= sLimit { if debugEncoder { - println("repeat ended", s, lenght) + println("repeat ended", s, length) } break encodeLoop @@ -241,9 +241,9 @@ encodeLoop: if false && repIndex >= 0 && load6432(src, repIndex) == load6432(src, s+repOff) { // Consider history as well. var seq seq - lenght := 8 + e.matchlen(s+8+repOff2, repIndex+8, src) + length := 8 + e.matchlen(s+8+repOff2, repIndex+8, src) - seq.matchLen = uint32(lenght - zstdMinMatch) + seq.matchLen = uint32(length - zstdMinMatch) // We might be able to match backwards. // Extend as long as we can. @@ -270,11 +270,11 @@ encodeLoop: } blk.sequences = append(blk.sequences, seq) - s += lenght + repOff2 + s += length + repOff2 nextEmit = s if s >= sLimit { if debugEncoder { - println("repeat ended", s, lenght) + println("repeat ended", s, length) } break encodeLoop @@ -708,9 +708,9 @@ encodeLoop: if repIndex >= 0 && load3232(src, repIndex) == uint32(cv>>(repOff*8)) { // Consider history as well. var seq seq - lenght := 4 + e.matchlen(s+4+repOff, repIndex+4, src) + length := 4 + e.matchlen(s+4+repOff, repIndex+4, src) - seq.matchLen = uint32(lenght - zstdMinMatch) + seq.matchLen = uint32(length - zstdMinMatch) // We might be able to match backwards. // Extend as long as we can. @@ -738,12 +738,12 @@ encodeLoop: blk.sequences = append(blk.sequences, seq) // Index match start+1 (long) -> s - 1 - s += lenght + repOff + s += length + repOff nextEmit = s if s >= sLimit { if debugEncoder { - println("repeat ended", s, lenght) + println("repeat ended", s, length) } break encodeLoop @@ -772,9 +772,9 @@ encodeLoop: if false && repIndex >= 0 && load6432(src, repIndex) == load6432(src, s+repOff) { // Consider history as well. var seq seq - lenght := 8 + e.matchlen(s+8+repOff2, repIndex+8, src) + length := 8 + e.matchlen(s+8+repOff2, repIndex+8, src) - seq.matchLen = uint32(lenght - zstdMinMatch) + seq.matchLen = uint32(length - zstdMinMatch) // We might be able to match backwards. // Extend as long as we can. @@ -801,11 +801,11 @@ encodeLoop: } blk.sequences = append(blk.sequences, seq) - s += lenght + repOff2 + s += length + repOff2 nextEmit = s if s >= sLimit { if debugEncoder { - println("repeat ended", s, lenght) + println("repeat ended", s, length) } break encodeLoop diff --git a/vendor/github.com/klauspost/compress/zstd/enc_dfast.go b/vendor/github.com/klauspost/compress/zstd/enc_dfast.go index a154c18f..d36be7bd 100644 --- a/vendor/github.com/klauspost/compress/zstd/enc_dfast.go +++ b/vendor/github.com/klauspost/compress/zstd/enc_dfast.go @@ -138,9 +138,9 @@ encodeLoop: if repIndex >= 0 && load3232(src, repIndex) == uint32(cv>>(repOff*8)) { // Consider history as well. var seq seq - lenght := 4 + e.matchlen(s+4+repOff, repIndex+4, src) + length := 4 + e.matchlen(s+4+repOff, repIndex+4, src) - seq.matchLen = uint32(lenght - zstdMinMatch) + seq.matchLen = uint32(length - zstdMinMatch) // We might be able to match backwards. // Extend as long as we can. @@ -166,11 +166,11 @@ encodeLoop: println("repeat sequence", seq, "next s:", s) } blk.sequences = append(blk.sequences, seq) - s += lenght + repOff + s += length + repOff nextEmit = s if s >= sLimit { if debugEncoder { - println("repeat ended", s, lenght) + println("repeat ended", s, length) } break encodeLoop @@ -798,9 +798,9 @@ encodeLoop: if repIndex >= 0 && load3232(src, repIndex) == uint32(cv>>(repOff*8)) { // Consider history as well. var seq seq - lenght := 4 + e.matchlen(s+4+repOff, repIndex+4, src) + length := 4 + e.matchlen(s+4+repOff, repIndex+4, src) - seq.matchLen = uint32(lenght - zstdMinMatch) + seq.matchLen = uint32(length - zstdMinMatch) // We might be able to match backwards. // Extend as long as we can. @@ -826,11 +826,11 @@ encodeLoop: println("repeat sequence", seq, "next s:", s) } blk.sequences = append(blk.sequences, seq) - s += lenght + repOff + s += length + repOff nextEmit = s if s >= sLimit { if debugEncoder { - println("repeat ended", s, lenght) + println("repeat ended", s, length) } break encodeLoop diff --git a/vendor/github.com/klauspost/compress/zstd/encoder.go b/vendor/github.com/klauspost/compress/zstd/encoder.go index 72af7ef0..8f8223cd 100644 --- a/vendor/github.com/klauspost/compress/zstd/encoder.go +++ b/vendor/github.com/klauspost/compress/zstd/encoder.go @@ -6,6 +6,7 @@ package zstd import ( "crypto/rand" + "errors" "fmt" "io" "math" @@ -149,6 +150,9 @@ func (e *Encoder) ResetContentSize(w io.Writer, size int64) { // and write CRC if requested. func (e *Encoder) Write(p []byte) (n int, err error) { s := &e.state + if s.eofWritten { + return 0, ErrEncoderClosed + } for len(p) > 0 { if len(p)+len(s.filling) < e.o.blockSize { if e.o.crc { @@ -202,7 +206,7 @@ func (e *Encoder) nextBlock(final bool) error { return nil } if final && len(s.filling) > 0 { - s.current = e.EncodeAll(s.filling, s.current[:0]) + s.current = e.encodeAll(s.encoder, s.filling, s.current[:0]) var n2 int n2, s.err = s.w.Write(s.current) if s.err != nil { @@ -288,6 +292,9 @@ func (e *Encoder) nextBlock(final bool) error { s.filling, s.current, s.previous = s.previous[:0], s.filling, s.current s.nInput += int64(len(s.current)) s.wg.Add(1) + if final { + s.eofWritten = true + } go func(src []byte) { if debugEncoder { println("Adding block,", len(src), "bytes, final:", final) @@ -303,9 +310,6 @@ func (e *Encoder) nextBlock(final bool) error { blk := enc.Block() enc.Encode(blk, src) blk.last = final - if final { - s.eofWritten = true - } // Wait for pending writes. s.wWg.Wait() if s.writeErr != nil { @@ -401,12 +405,20 @@ func (e *Encoder) Flush() error { if len(s.filling) > 0 { err := e.nextBlock(false) if err != nil { + // Ignore Flush after Close. + if errors.Is(s.err, ErrEncoderClosed) { + return nil + } return err } } s.wg.Wait() s.wWg.Wait() if s.err != nil { + // Ignore Flush after Close. + if errors.Is(s.err, ErrEncoderClosed) { + return nil + } return s.err } return s.writeErr @@ -422,6 +434,9 @@ func (e *Encoder) Close() error { } err := e.nextBlock(true) if err != nil { + if errors.Is(s.err, ErrEncoderClosed) { + return nil + } return err } if s.frameContentSize > 0 { @@ -459,6 +474,11 @@ func (e *Encoder) Close() error { } _, s.err = s.w.Write(frame) } + if s.err == nil { + s.err = ErrEncoderClosed + return nil + } + return s.err } @@ -469,6 +489,15 @@ func (e *Encoder) Close() error { // Data compressed with EncodeAll can be decoded with the Decoder, // using either a stream or DecodeAll. func (e *Encoder) EncodeAll(src, dst []byte) []byte { + e.init.Do(e.initialize) + enc := <-e.encoders + defer func() { + e.encoders <- enc + }() + return e.encodeAll(enc, src, dst) +} + +func (e *Encoder) encodeAll(enc encoder, src, dst []byte) []byte { if len(src) == 0 { if e.o.fullZero { // Add frame header. @@ -491,13 +520,7 @@ func (e *Encoder) EncodeAll(src, dst []byte) []byte { } return dst } - e.init.Do(e.initialize) - enc := <-e.encoders - defer func() { - // Release encoder reference to last block. - // If a non-single block is needed the encoder will reset again. - e.encoders <- enc - }() + // Use single segments when above minimum window and below window size. single := len(src) <= e.o.windowSize && len(src) > MinWindowSize if e.o.single != nil { diff --git a/vendor/github.com/klauspost/compress/zstd/framedec.go b/vendor/github.com/klauspost/compress/zstd/framedec.go index 53e160f7..e47af66e 100644 --- a/vendor/github.com/klauspost/compress/zstd/framedec.go +++ b/vendor/github.com/klauspost/compress/zstd/framedec.go @@ -146,7 +146,9 @@ func (d *frameDec) reset(br byteBuffer) error { } return err } - printf("raw: %x, mantissa: %d, exponent: %d\n", wd, wd&7, wd>>3) + if debugDecoder { + printf("raw: %x, mantissa: %d, exponent: %d\n", wd, wd&7, wd>>3) + } windowLog := 10 + (wd >> 3) windowBase := uint64(1) << windowLog windowAdd := (windowBase / 8) * uint64(wd&0x7) diff --git a/vendor/github.com/klauspost/compress/zstd/seqdec_amd64.go b/vendor/github.com/klauspost/compress/zstd/seqdec_amd64.go index 8adabd82..c59f17e0 100644 --- a/vendor/github.com/klauspost/compress/zstd/seqdec_amd64.go +++ b/vendor/github.com/klauspost/compress/zstd/seqdec_amd64.go @@ -146,7 +146,7 @@ func (s *sequenceDecs) decodeSyncSimple(hist []byte) (bool, error) { return true, fmt.Errorf("output bigger than max block size (%d)", maxBlockSize) default: - return true, fmt.Errorf("sequenceDecs_decode returned erronous code %d", errCode) + return true, fmt.Errorf("sequenceDecs_decode returned erroneous code %d", errCode) } s.seqSize += ctx.litRemain @@ -292,7 +292,7 @@ func (s *sequenceDecs) decode(seqs []seqVals) error { return io.ErrUnexpectedEOF } - return fmt.Errorf("sequenceDecs_decode_amd64 returned erronous code %d", errCode) + return fmt.Errorf("sequenceDecs_decode_amd64 returned erroneous code %d", errCode) } if ctx.litRemain < 0 { diff --git a/vendor/github.com/klauspost/compress/zstd/seqdec_amd64.s b/vendor/github.com/klauspost/compress/zstd/seqdec_amd64.s index 5b06174b..f5591fa1 100644 --- a/vendor/github.com/klauspost/compress/zstd/seqdec_amd64.s +++ b/vendor/github.com/klauspost/compress/zstd/seqdec_amd64.s @@ -1814,7 +1814,7 @@ TEXT ·sequenceDecs_decodeSync_amd64(SB), $64-32 MOVQ 40(SP), AX ADDQ AX, 48(SP) - // Calculate poiter to s.out[cap(s.out)] (a past-end pointer) + // Calculate pointer to s.out[cap(s.out)] (a past-end pointer) ADDQ R10, 32(SP) // outBase += outPosition @@ -2376,7 +2376,7 @@ TEXT ·sequenceDecs_decodeSync_bmi2(SB), $64-32 MOVQ 40(SP), CX ADDQ CX, 48(SP) - // Calculate poiter to s.out[cap(s.out)] (a past-end pointer) + // Calculate pointer to s.out[cap(s.out)] (a past-end pointer) ADDQ R9, 32(SP) // outBase += outPosition @@ -2896,7 +2896,7 @@ TEXT ·sequenceDecs_decodeSync_safe_amd64(SB), $64-32 MOVQ 40(SP), AX ADDQ AX, 48(SP) - // Calculate poiter to s.out[cap(s.out)] (a past-end pointer) + // Calculate pointer to s.out[cap(s.out)] (a past-end pointer) ADDQ R10, 32(SP) // outBase += outPosition @@ -3560,7 +3560,7 @@ TEXT ·sequenceDecs_decodeSync_safe_bmi2(SB), $64-32 MOVQ 40(SP), CX ADDQ CX, 48(SP) - // Calculate poiter to s.out[cap(s.out)] (a past-end pointer) + // Calculate pointer to s.out[cap(s.out)] (a past-end pointer) ADDQ R9, 32(SP) // outBase += outPosition diff --git a/vendor/github.com/klauspost/compress/zstd/zstd.go b/vendor/github.com/klauspost/compress/zstd/zstd.go index 4be7cc73..066bef2a 100644 --- a/vendor/github.com/klauspost/compress/zstd/zstd.go +++ b/vendor/github.com/klauspost/compress/zstd/zstd.go @@ -88,6 +88,10 @@ var ( // Close has been called. ErrDecoderClosed = errors.New("decoder used after Close") + // ErrEncoderClosed will be returned if the Encoder was used after + // Close has been called. + ErrEncoderClosed = errors.New("encoder used after Close") + // ErrDecoderNilInput is returned when a nil Reader was provided // and an operation other than Reset/DecodeAll/Close was attempted. ErrDecoderNilInput = errors.New("nil input provided as reader") diff --git a/vendor/golang.org/x/sys/unix/syscall_dragonfly.go b/vendor/golang.org/x/sys/unix/syscall_dragonfly.go index 97cb916f..be8c0020 100644 --- a/vendor/golang.org/x/sys/unix/syscall_dragonfly.go +++ b/vendor/golang.org/x/sys/unix/syscall_dragonfly.go @@ -246,6 +246,18 @@ func Sendfile(outfd int, infd int, offset *int64, count int) (written int, err e return sendfile(outfd, infd, offset, count) } +func Dup3(oldfd, newfd, flags int) error { + if oldfd == newfd || flags&^O_CLOEXEC != 0 { + return EINVAL + } + how := F_DUP2FD + if flags&O_CLOEXEC != 0 { + how = F_DUP2FD_CLOEXEC + } + _, err := fcntl(oldfd, how, newfd) + return err +} + /* * Exposed directly */ diff --git a/vendor/golang.org/x/sys/windows/dll_windows.go b/vendor/golang.org/x/sys/windows/dll_windows.go index 4e613cf6..3ca814f5 100644 --- a/vendor/golang.org/x/sys/windows/dll_windows.go +++ b/vendor/golang.org/x/sys/windows/dll_windows.go @@ -43,8 +43,8 @@ type DLL struct { // LoadDLL loads DLL file into memory. // // Warning: using LoadDLL without an absolute path name is subject to -// DLL preloading attacks. To safely load a system DLL, use LazyDLL -// with System set to true, or use LoadLibraryEx directly. +// DLL preloading attacks. To safely load a system DLL, use [NewLazySystemDLL], +// or use [LoadLibraryEx] directly. func LoadDLL(name string) (dll *DLL, err error) { namep, err := UTF16PtrFromString(name) if err != nil { @@ -271,6 +271,9 @@ func (d *LazyDLL) NewProc(name string) *LazyProc { } // NewLazyDLL creates new LazyDLL associated with DLL file. +// +// Warning: using NewLazyDLL without an absolute path name is subject to +// DLL preloading attacks. To safely load a system DLL, use [NewLazySystemDLL]. func NewLazyDLL(name string) *LazyDLL { return &LazyDLL{Name: name} } @@ -410,7 +413,3 @@ func loadLibraryEx(name string, system bool) (*DLL, error) { } return &DLL{Name: name, Handle: h}, nil } - -type errString string - -func (s errString) Error() string { return string(s) } diff --git a/vendor/modules.txt b/vendor/modules.txt index 1dc3c8f9..0e0f27f4 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -5,8 +5,8 @@ dario.cat/mergo ## explicit; go 1.16 github.com/Azure/go-ansiterm github.com/Azure/go-ansiterm/winterm -# github.com/IBM/sarama v1.44.0 -## explicit; go 1.20 +# github.com/IBM/sarama v1.45.0 +## explicit; go 1.21 github.com/IBM/sarama # github.com/Microsoft/go-winio v0.6.2 ## explicit; go 1.21 @@ -162,8 +162,8 @@ github.com/jcmturner/gokrb5/v8/types ## explicit; go 1.13 github.com/jcmturner/rpc/v2/mstypes github.com/jcmturner/rpc/v2/ndr -# github.com/klauspost/compress v1.17.9 -## explicit; go 1.20 +# github.com/klauspost/compress v1.17.11 +## explicit; go 1.21 github.com/klauspost/compress github.com/klauspost/compress/flate github.com/klauspost/compress/fse @@ -342,7 +342,7 @@ go.mongodb.org/mongo-driver/x/mongo/driver/operation go.mongodb.org/mongo-driver/x/mongo/driver/session go.mongodb.org/mongo-driver/x/mongo/driver/topology go.mongodb.org/mongo-driver/x/mongo/driver/wiremessage -# golang.org/x/crypto v0.31.0 +# golang.org/x/crypto v0.32.0 ## explicit; go 1.20 golang.org/x/crypto/md4 golang.org/x/crypto/ocsp @@ -352,7 +352,7 @@ golang.org/x/crypto/scrypt ## explicit; go 1.20 golang.org/x/exp/constraints golang.org/x/exp/slices -# golang.org/x/net v0.33.0 +# golang.org/x/net v0.34.0 ## explicit; go 1.18 golang.org/x/net/http2/hpack golang.org/x/net/internal/socks @@ -361,7 +361,7 @@ golang.org/x/net/proxy ## explicit; go 1.18 golang.org/x/sync/errgroup golang.org/x/sync/singleflight -# golang.org/x/sys v0.28.0 +# golang.org/x/sys v0.29.0 ## explicit; go 1.18 golang.org/x/sys/unix golang.org/x/sys/windows