Skip to content
This repository has been archived by the owner on Aug 7, 2023. It is now read-only.

Commit

Permalink
Fetch upstream (#1)
Browse files Browse the repository at this point in the history
* Fix deprecated binaries goreleaser field

* Handle group tombstones

* Update Go dependencies

* Update Alpine image to 3.15

* Prefer http.NoBody to nil body

* Update changelog for v1.4.0

Co-authored-by: Vlad Gorodetsky <[email protected]>
Co-authored-by: Ricky Taylor <[email protected]>
  • Loading branch information
3 people authored Jan 11, 2022
1 parent 3e02ff9 commit 9095bed
Show file tree
Hide file tree
Showing 18 changed files with 203 additions and 108 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ jobs:
${{ runner.os }}-go-
- name: Install dependencies
run: curl -sSfL https://raw.githubusercontent.com/golangci/golangci-lint/master/install.sh | sh -s -- -b $(go env GOPATH)/bin v1.42.1
run: curl -sSfL https://raw.githubusercontent.com/golangci/golangci-lint/master/install.sh | sh -s -- -b $(go env GOPATH)/bin v1.43.0

- name: Run test suite
run: make test
Expand Down
2 changes: 0 additions & 2 deletions .goreleaser.yml
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,6 @@ dockers:
- goos: linux
goarch: amd64
goarm: ''
binaries:
- burrow
dockerfile: Dockerfile.gorelease
image_templates:
- 'docker.pkg.github.com/linkedin/burrow/burrow:latest'
Expand Down
7 changes: 7 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,10 @@
## 1.4.0

- #734 - @bai - Prefer http.NoBody to nil body
- #733 - @bai - Update Alpine image to 3.15
- #732 - @bai - Update Go dependencies + address recent Go CVEs (1.17.6)
- #727 - @ricky26 - Handle consumer group tombstones

## 1.3.7

- #723 - @bai - Update Go to 1.17.2 and bump golangci-lint
Expand Down
4 changes: 2 additions & 2 deletions Dockerfile
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
# stage 1: builder
FROM golang:1.17.2-alpine as builder
FROM golang:1.17.6-alpine as builder

ENV BURROW_SRC /usr/src/Burrow/

Expand All @@ -10,7 +10,7 @@ WORKDIR $BURROW_SRC
RUN go mod tidy && go build -o /tmp/burrow .

# stage 2: runner
FROM alpine:3.14
FROM alpine:3.15

LABEL maintainer="LinkedIn Burrow https://github.com/linkedin/Burrow"

Expand Down
2 changes: 1 addition & 1 deletion Dockerfile.gorelease
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
FROM alpine:3.14
FROM alpine:3.15
LABEL maintainer="LinkedIn Burrow https://github.com/linkedin/Burrow"

WORKDIR /app
Expand Down
24 changes: 18 additions & 6 deletions core/internal/consumer/kafka_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -369,12 +369,6 @@ func (module *KafkaClient) processConsumerOffsetsMessage(msg *sarama.ConsumerMes
zap.Int64("offset_offset", msg.Offset),
)

if len(msg.Value) == 0 {
// Tombstone message - we don't handle them for now
logger.Debug("dropped tombstone")
return
}

var keyver int16
keyBuffer := bytes.NewBuffer(msg.Key)
err := binary.Read(keyBuffer, binary.BigEndian, &keyver)
Expand Down Expand Up @@ -452,6 +446,12 @@ func (module *KafkaClient) decodeKeyAndOffset(offsetOrder int64, keyBuffer *byte
return
}

if len(value) == 0 {
// Tombstone message - we don't handle them for now
logger.Debug("dropped tombstone")
return
}

var valueVersion int16
valueBuffer := bytes.NewBuffer(value)
err := binary.Read(valueBuffer, binary.BigEndian, &valueVersion)
Expand Down Expand Up @@ -513,6 +513,18 @@ func (module *KafkaClient) decodeGroupMetadata(keyBuffer *bytes.Buffer, value []
return
}

if len(value) == 0 {
// Tombstone message - group deleted
logger.Debug("removing consumer group due to tombstone")
deleteMessage := &protocol.StorageRequest{
RequestType: protocol.StorageSetDeleteGroup,
Cluster: module.cluster,
Group: group,
}
helpers.TimeoutSendStorageRequest(module.App.StorageChannel, deleteMessage, 1)
return
}

var valueVersion int16
valueBuffer := bytes.NewBuffer(value)
err = binary.Read(valueBuffer, binary.BigEndian, &valueVersion)
Expand Down
4 changes: 2 additions & 2 deletions core/internal/helpers/sarama.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ package helpers
import (
"crypto/tls"
"crypto/x509"
"io/ioutil"
"os"
"time"

"go.uber.org/zap"
Expand Down Expand Up @@ -92,7 +92,7 @@ func GetSaramaConfigFromClientProfile(profileName string) *sarama.Config {
if caFile == "" {
saramaConfig.Net.TLS.Config = &tls.Config{}
} else {
caCert, err := ioutil.ReadFile(caFile)
caCert, err := os.ReadFile(caFile)
if err != nil {
panic("cannot read TLS CA file: " + err.Error())
}
Expand Down
4 changes: 2 additions & 2 deletions core/internal/helpers/zookeeper.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,8 @@ package helpers
import (
"crypto/tls"
"crypto/x509"
"io/ioutil"
"net"
"os"
"time"

"github.com/pkg/errors"
Expand Down Expand Up @@ -75,7 +75,7 @@ func ZookeeperConnectTLS(servers []string, sessionTimeout time.Duration, logger
// newTLSDialer creates a dialer with TLS configured. It will install caFile as root CA and if both certFile and keyFile are
// set, it will add those as a certificate.
func newTLSDialer(addr, caFile, certFile, keyFile string) (zk.Dialer, error) {
caCert, err := ioutil.ReadFile(caFile)
caCert, err := os.ReadFile(caFile)
if err != nil {
return nil, errors.New("could not read caFile: " + err.Error())
}
Expand Down
28 changes: 14 additions & 14 deletions core/internal/httpserver/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ func TestHttpServer_configMain(t *testing.T) {
setupConfiguration()

// Set up a request
req, err := http.NewRequest("GET", "/v3/config", nil)
req, err := http.NewRequest("GET", "/v3/config", http.NoBody)
assert.NoError(t, err, "Expected request setup to return no error")

// Call the handler via httprouter
Expand All @@ -60,7 +60,7 @@ func TestHttpServer_configStorageList(t *testing.T) {
setupConfiguration()

// Set up a request
req, err := http.NewRequest("GET", "/v3/config/storage", nil)
req, err := http.NewRequest("GET", "/v3/config/storage", http.NoBody)
assert.NoError(t, err, "Expected request setup to return no error")

// Call the handler via httprouter
Expand All @@ -84,7 +84,7 @@ func TestHttpServer_configConsumerList(t *testing.T) {
setupConfiguration()

// Set up a request
req, err := http.NewRequest("GET", "/v3/config/consumer", nil)
req, err := http.NewRequest("GET", "/v3/config/consumer", http.NoBody)
assert.NoError(t, err, "Expected request setup to return no error")

// Call the handler via httprouter
Expand All @@ -108,7 +108,7 @@ func TestHttpServer_configClusterList(t *testing.T) {
setupConfiguration()

// Set up a request
req, err := http.NewRequest("GET", "/v3/config/cluster", nil)
req, err := http.NewRequest("GET", "/v3/config/cluster", http.NoBody)
assert.NoError(t, err, "Expected request setup to return no error")

// Call the handler via httprouter
Expand All @@ -132,7 +132,7 @@ func TestHttpServer_configEvaluatorList(t *testing.T) {
setupConfiguration()

// Set up a request
req, err := http.NewRequest("GET", "/v3/config/evaluator", nil)
req, err := http.NewRequest("GET", "/v3/config/evaluator", http.NoBody)
assert.NoError(t, err, "Expected request setup to return no error")

// Call the handler via httprouter
Expand All @@ -156,7 +156,7 @@ func TestHttpServer_configNotifierList(t *testing.T) {
setupConfiguration()

// Set up a request
req, err := http.NewRequest("GET", "/v3/config/notifier", nil)
req, err := http.NewRequest("GET", "/v3/config/notifier", http.NoBody)
assert.NoError(t, err, "Expected request setup to return no error")

// Call the handler via httprouter
Expand All @@ -180,7 +180,7 @@ func TestHttpServer_configStorageDetail(t *testing.T) {
setupConfiguration()

// Set up a request
req, err := http.NewRequest("GET", "/v3/config/storage/teststorage", nil)
req, err := http.NewRequest("GET", "/v3/config/storage/teststorage", http.NoBody)
assert.NoError(t, err, "Expected request setup to return no error")

// Call the handler via httprouter
Expand All @@ -206,7 +206,7 @@ func TestHttpServer_configStorageDetail(t *testing.T) {
assert.Equalf(t, "inmemory", resp.Module.ClassName, "Expected ClassName to be immemory, not %v", resp.Module.ClassName)

// Call again for a 404
req, err = http.NewRequest("GET", "/v3/config/storage/nomodule", nil)
req, err = http.NewRequest("GET", "/v3/config/storage/nomodule", http.NoBody)
assert.NoError(t, err, "Expected request setup to return no error")
rr = httptest.NewRecorder()
coordinator.router.ServeHTTP(rr, req)
Expand All @@ -218,7 +218,7 @@ func TestHttpServer_configConsumerDetail(t *testing.T) {
setupConfiguration()

// Set up a request
req, err := http.NewRequest("GET", "/v3/config/consumer/testconsumer", nil)
req, err := http.NewRequest("GET", "/v3/config/consumer/testconsumer", http.NoBody)
assert.NoError(t, err, "Expected request setup to return no error")

// Call the handler via httprouter
Expand All @@ -244,7 +244,7 @@ func TestHttpServer_configConsumerDetail(t *testing.T) {
assert.Equalf(t, "kafka_zk", resp.Module.ClassName, "Expected ClassName to be kafka_zk, not %v", resp.Module.ClassName)

// Call again for a 404
req, err = http.NewRequest("GET", "/v3/config/consumer/nomodule", nil)
req, err = http.NewRequest("GET", "/v3/config/consumer/nomodule", http.NoBody)
assert.NoError(t, err, "Expected request setup to return no error")
rr = httptest.NewRecorder()
coordinator.router.ServeHTTP(rr, req)
Expand All @@ -256,7 +256,7 @@ func TestHttpServer_configEvaluatorDetail(t *testing.T) {
setupConfiguration()

// Set up a request
req, err := http.NewRequest("GET", "/v3/config/evaluator/testevaluator", nil)
req, err := http.NewRequest("GET", "/v3/config/evaluator/testevaluator", http.NoBody)
assert.NoError(t, err, "Expected request setup to return no error")

// Call the handler via httprouter
Expand All @@ -282,7 +282,7 @@ func TestHttpServer_configEvaluatorDetail(t *testing.T) {
assert.Equalf(t, "caching", resp.Module.ClassName, "Expected ClassName to be caching, not %v", resp.Module.ClassName)

// Call again for a 404
req, err = http.NewRequest("GET", "/v3/config/evaluator/nomodule", nil)
req, err = http.NewRequest("GET", "/v3/config/evaluator/nomodule", http.NoBody)
assert.NoError(t, err, "Expected request setup to return no error")
rr = httptest.NewRecorder()
coordinator.router.ServeHTTP(rr, req)
Expand All @@ -294,7 +294,7 @@ func TestHttpServer_configNotifierDetail(t *testing.T) {
setupConfiguration()

// Set up a request
req, err := http.NewRequest("GET", "/v3/config/notifier/testnotifier", nil)
req, err := http.NewRequest("GET", "/v3/config/notifier/testnotifier", http.NoBody)
assert.NoError(t, err, "Expected request setup to return no error")

// Call the handler via httprouter
Expand All @@ -320,7 +320,7 @@ func TestHttpServer_configNotifierDetail(t *testing.T) {
assert.Equalf(t, "null", resp.Module.ClassName, "Expected ClassName to be null, not %v", resp.Module.ClassName)

// Call again for a 404
req, err = http.NewRequest("GET", "/v3/config/notifier/nomodule", nil)
req, err = http.NewRequest("GET", "/v3/config/notifier/nomodule", http.NoBody)
assert.NoError(t, err, "Expected request setup to return no error")
rr = httptest.NewRecorder()
coordinator.router.ServeHTTP(rr, req)
Expand Down
3 changes: 1 addition & 2 deletions core/internal/httpserver/coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ import (
"crypto/x509"
"encoding/json"
"errors"
"io/ioutil"
"net"
"net/http"
"os"
Expand Down Expand Up @@ -99,7 +98,7 @@ func (hc *Coordinator) Configure() {
server.TLSConfig = &tls.Config{}

if caFile != "" {
caCert, err := ioutil.ReadFile(caFile)
caCert, err := os.ReadFile(caFile)
if err != nil {
panic("cannot read TLS CA file: " + err.Error())
}
Expand Down
8 changes: 4 additions & 4 deletions core/internal/httpserver/coordinator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ func TestHttpServer_handleAdmin(t *testing.T) {
coordinator := fixtureConfiguredCoordinator()

// Set up a request
req, err := http.NewRequest("GET", "/burrow/admin", nil)
req, err := http.NewRequest("GET", "/burrow/admin", http.NoBody)
assert.NoError(t, err, "Expected request setup to return no error")

// Call the handler via httprouter
Expand All @@ -63,7 +63,7 @@ func TestHttpServer_handleReady(t *testing.T) {
coordinator := fixtureConfiguredCoordinator()

// Set up a request
req, err := http.NewRequest("GET", "/burrow/admin/ready", nil)
req, err := http.NewRequest("GET", "/burrow/admin/ready", http.NoBody)
assert.NoError(t, err, "Expected request setup to return no error")

// Call the handler via httprouter, the app is not ready so we expect "STARTING" and HTTP 503
Expand Down Expand Up @@ -92,7 +92,7 @@ func TestHttpServer_getClusterList(t *testing.T) {
}()

// Set up a request
req, err := http.NewRequest("GET", "/v3/admin/loglevel", nil)
req, err := http.NewRequest("GET", "/v3/admin/loglevel", http.NoBody)
assert.NoError(t, err, "Expected request setup to return no error")

// Call the handler via httprouter
Expand Down Expand Up @@ -139,7 +139,7 @@ func TestHttpServer_DefaultHandler(t *testing.T) {
coordinator := fixtureConfiguredCoordinator()

// Set up a request
req, err := http.NewRequest("GET", "/v3/no/such/uri", nil)
req, err := http.NewRequest("GET", "/v3/no/such/uri", http.NoBody)
assert.NoError(t, err, "Expected request setup to return no error")

// Call the handler via httprouter
Expand Down
Loading

0 comments on commit 9095bed

Please sign in to comment.