diff --git a/Makefile b/Makefile index b609324..fdaa3e4 100644 --- a/Makefile +++ b/Makefile @@ -27,6 +27,11 @@ test: test-kafka test-redpanda generate: go generate ./... +.PHONY: fmt +fmt: ## Format Go files using gofumpt and gci. + gofumpt -l -w . + gci write --skip-generated . + .PHONY: lint lint: golangci-lint run diff --git a/README.md b/README.md index 4364bf1..e7ea06e 100644 --- a/README.md +++ b/README.md @@ -43,6 +43,7 @@ A source is getting associated with a consumer group ID the first time the `Read | `saslMechanism` | SASL mechanism to be used. Possible values: PLAIN, SCRAM-SHA-256, SCRAM-SHA-512. If empty, authentication won't be performed. | false | | | `saslUsername` | SASL username. If provided, a password needs to be provided too. | false | | | `saslPassword` | SASL password. If provided, a username needs to be provided too. | false | | +| `retryGroupJoinErrors` | determines whether the connector will continually retry on group join errors | false | `true` | ## Destination diff --git a/common/config.go b/common/config.go index e3348fa..424ae6b 100644 --- a/common/config.go +++ b/common/config.go @@ -23,10 +23,8 @@ import ( "github.com/twmb/franz-go/pkg/kgo" ) -var ( - // TODO make the timeout configurable - connectionTimeout = time.Second * 10 -) +// TODO make the timeout configurable +var connectionTimeout = time.Second * 10 // Config contains common configuration parameters. type Config struct { @@ -78,6 +76,7 @@ func (c Config) TryDial(ctx context.Context) error { case <-ctx.Done(): return err case <-time.After(time.Second): + sdk.Logger(ctx).Warn().Msg("failed to dial broker, trying again...") // ping again } } diff --git a/source/config.go b/source/config.go index 2973e18..1e47fa2 100644 --- a/source/config.go +++ b/source/config.go @@ -39,6 +39,8 @@ type Config struct { ReadFromBeginning bool `json:"readFromBeginning"` // GroupID defines the consumer group id. GroupID string `json:"groupID"` + // RetryGroupJoinErrors determines whether the connector will continually retry on group join errors. + RetryGroupJoinErrors bool `json:"retryGroupJoinErrors" default:"true"` } // Validate executes manual validations beyond what is defined in struct tags. diff --git a/source/config_test.go b/source/config_test.go index 45b8635..983c578 100644 --- a/source/config_test.go +++ b/source/config_test.go @@ -30,34 +30,35 @@ func TestConfig_ValidateTopics(t *testing.T) { name string cfg Config wantErr string - }{{ - name: `one of "topic" and "topics" should be provided.`, - cfg: Config{ - Topics: []string{}, - Topic: "", + }{ + { + name: `one of "topic" and "topics" should be provided.`, + cfg: Config{ + Topics: []string{}, + Topic: "", + }, + wantErr: `required parameter missing: "topics"`, + }, { + name: "invalid, only provide one.", + cfg: Config{ + Topics: []string{"topic2"}, + Topic: "topic1", + }, + wantErr: `can't provide both "topic" and "topics" parameters, "topic" is deprecated and will be removed, use the "topics" parameter instead`, + }, { + name: "valid with warning, will be deprecated soon", + cfg: Config{ + Topics: []string{}, + Topic: "topic1", + }, + wantErr: "", + }, { + name: "valid", + cfg: Config{ + Topics: []string{"topic1"}, + }, + wantErr: "", }, - wantErr: `required parameter missing: "topics"`, - }, { - name: "invalid, only provide one.", - cfg: Config{ - Topics: []string{"topic2"}, - Topic: "topic1", - }, - wantErr: `can't provide both "topic" and "topics" parameters, "topic" is deprecated and will be removed, use the "topics" parameter instead`, - }, { - name: "valid with warning, will be deprecated soon", - cfg: Config{ - Topics: []string{}, - Topic: "topic1", - }, - wantErr: "", - }, { - name: "valid", - cfg: Config{ - Topics: []string{"topic1"}, - }, - wantErr: "", - }, } for _, tc := range testCases { diff --git a/source/franz.go b/source/franz.go index 8d2736c..3c07dc0 100644 --- a/source/franz.go +++ b/source/franz.go @@ -12,12 +12,15 @@ // See the License for the specific language governing permissions and // limitations under the License. +//go:generate mockgen -typed -destination franz_mock.go -package source -mock_names=Client=MockClient . Client + package source import ( "context" "errors" "fmt" + "strings" "sync" sdk "github.com/conduitio/conduit-connector-sdk" @@ -25,10 +28,20 @@ import ( ) type FranzConsumer struct { - client *kgo.Client + client Client acker *batchAcker iter *kgo.FetchesRecordIter + + retryGroupJoinErrors bool +} + +// Client is a franz-go kafka client. +type Client interface { + Close() + CommitRecords(ctx context.Context, rs ...*kgo.Record) error + OptValue(opt any) any + PollFetches(ctx context.Context) kgo.Fetches } var _ Consumer = (*FranzConsumer)(nil) @@ -53,9 +66,10 @@ func NewFranzConsumer(ctx context.Context, cfg Config) (*FranzConsumer, error) { } return &FranzConsumer{ - client: cl, - acker: newBatchAcker(cl, 1000), - iter: &kgo.FetchesRecordIter{}, // empty iterator is done + client: cl, + acker: newBatchAcker(cl, 1000), + iter: &kgo.FetchesRecordIter{}, // empty iterator is done + retryGroupJoinErrors: cfg.RetryGroupJoinErrors, }, nil } @@ -63,6 +77,13 @@ func (c *FranzConsumer) Consume(ctx context.Context) (*Record, error) { for c.iter.Done() { fetches := c.client.PollFetches(ctx) if err := fetches.Err(); err != nil { + var errGroupSession *kgo.ErrGroupSession + if c.retryGroupJoinErrors && + (errors.As(err, &errGroupSession) || strings.Contains(err.Error(), "unable to join group session")) { + sdk.Logger(ctx).Warn().Err(err).Msgf("group session error, retrying") + return nil, sdk.ErrBackoffRetry + } + return nil, err } if fetches.Empty() { @@ -92,7 +113,7 @@ func (c *FranzConsumer) Close(ctx context.Context) error { // batchAcker commits acks in batches. type batchAcker struct { - client *kgo.Client + client Client batchSize int curBatchIndex int @@ -101,7 +122,7 @@ type batchAcker struct { m sync.Mutex } -func newBatchAcker(client *kgo.Client, batchSize int) *batchAcker { +func newBatchAcker(client Client, batchSize int) *batchAcker { return &batchAcker{ client: client, batchSize: batchSize, diff --git a/source/franz_integration_test.go b/source/franz_integration_test.go index e055331..8f8e2a4 100644 --- a/source/franz_integration_test.go +++ b/source/franz_integration_test.go @@ -28,7 +28,7 @@ func TestFranzConsumer_Consume_FromBeginning(t *testing.T) { is := is.New(t) ctx := context.Background() - cfg := test.ParseConfigMap[Config](t, test.SourceConfigMap(t, false)) + cfg := test.ParseConfigMap[Config](t, test.SourceConfigMap(t, false, false)) cfg.ReadFromBeginning = true records := test.GenerateFranzRecords(1, 6) @@ -56,7 +56,7 @@ func TestFranzConsumer_Consume_LastOffset(t *testing.T) { is := is.New(t) ctx := context.Background() - cfg := test.ParseConfigMap[Config](t, test.SourceConfigMap(t, false)) + cfg := test.ParseConfigMap[Config](t, test.SourceConfigMap(t, false, false)) cfg.ReadFromBeginning = false records := test.GenerateFranzRecords(1, 6) @@ -93,7 +93,7 @@ func TestFranzConsumer_Consume_MultipleTopics(t *testing.T) { is := is.New(t) ctx := context.Background() - cfg := test.ParseConfigMap[Config](t, test.SourceConfigMap(t, true)) + cfg := test.ParseConfigMap[Config](t, test.SourceConfigMap(t, true, false)) cfg.ReadFromBeginning = true records := test.GenerateFranzRecords(1, 6) diff --git a/source/franz_mock.go b/source/franz_mock.go new file mode 100644 index 0000000..b756d7c --- /dev/null +++ b/source/franz_mock.go @@ -0,0 +1,196 @@ +// Code generated by MockGen. DO NOT EDIT. +// Source: github.com/conduitio/conduit-connector-kafka/source (interfaces: Client) +// +// Generated by this command: +// +// mockgen -typed -destination franz_mock.go -package source -mock_names=Client=MockClient . Client +// + +// Package source is a generated GoMock package. +package source + +import ( + context "context" + reflect "reflect" + + kgo "github.com/twmb/franz-go/pkg/kgo" + gomock "go.uber.org/mock/gomock" +) + +// MockClient is a mock of Client interface. +type MockClient struct { + ctrl *gomock.Controller + recorder *MockClientMockRecorder +} + +// MockClientMockRecorder is the mock recorder for MockClient. +type MockClientMockRecorder struct { + mock *MockClient +} + +// NewMockClient creates a new mock instance. +func NewMockClient(ctrl *gomock.Controller) *MockClient { + mock := &MockClient{ctrl: ctrl} + mock.recorder = &MockClientMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use. +func (m *MockClient) EXPECT() *MockClientMockRecorder { + return m.recorder +} + +// Close mocks base method. +func (m *MockClient) Close() { + m.ctrl.T.Helper() + m.ctrl.Call(m, "Close") +} + +// Close indicates an expected call of Close. +func (mr *MockClientMockRecorder) Close() *MockClientCloseCall { + mr.mock.ctrl.T.Helper() + call := mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Close", reflect.TypeOf((*MockClient)(nil).Close)) + return &MockClientCloseCall{Call: call} +} + +// MockClientCloseCall wrap *gomock.Call +type MockClientCloseCall struct { + *gomock.Call +} + +// Return rewrite *gomock.Call.Return +func (c *MockClientCloseCall) Return() *MockClientCloseCall { + c.Call = c.Call.Return() + return c +} + +// Do rewrite *gomock.Call.Do +func (c *MockClientCloseCall) Do(f func()) *MockClientCloseCall { + c.Call = c.Call.Do(f) + return c +} + +// DoAndReturn rewrite *gomock.Call.DoAndReturn +func (c *MockClientCloseCall) DoAndReturn(f func()) *MockClientCloseCall { + c.Call = c.Call.DoAndReturn(f) + return c +} + +// CommitRecords mocks base method. +func (m *MockClient) CommitRecords(arg0 context.Context, arg1 ...*kgo.Record) error { + m.ctrl.T.Helper() + varargs := []any{arg0} + for _, a := range arg1 { + varargs = append(varargs, a) + } + ret := m.ctrl.Call(m, "CommitRecords", varargs...) + ret0, _ := ret[0].(error) + return ret0 +} + +// CommitRecords indicates an expected call of CommitRecords. +func (mr *MockClientMockRecorder) CommitRecords(arg0 any, arg1 ...any) *MockClientCommitRecordsCall { + mr.mock.ctrl.T.Helper() + varargs := append([]any{arg0}, arg1...) + call := mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "CommitRecords", reflect.TypeOf((*MockClient)(nil).CommitRecords), varargs...) + return &MockClientCommitRecordsCall{Call: call} +} + +// MockClientCommitRecordsCall wrap *gomock.Call +type MockClientCommitRecordsCall struct { + *gomock.Call +} + +// Return rewrite *gomock.Call.Return +func (c *MockClientCommitRecordsCall) Return(arg0 error) *MockClientCommitRecordsCall { + c.Call = c.Call.Return(arg0) + return c +} + +// Do rewrite *gomock.Call.Do +func (c *MockClientCommitRecordsCall) Do(f func(context.Context, ...*kgo.Record) error) *MockClientCommitRecordsCall { + c.Call = c.Call.Do(f) + return c +} + +// DoAndReturn rewrite *gomock.Call.DoAndReturn +func (c *MockClientCommitRecordsCall) DoAndReturn(f func(context.Context, ...*kgo.Record) error) *MockClientCommitRecordsCall { + c.Call = c.Call.DoAndReturn(f) + return c +} + +// OptValue mocks base method. +func (m *MockClient) OptValue(arg0 any) any { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "OptValue", arg0) + ret0, _ := ret[0].(any) + return ret0 +} + +// OptValue indicates an expected call of OptValue. +func (mr *MockClientMockRecorder) OptValue(arg0 any) *MockClientOptValueCall { + mr.mock.ctrl.T.Helper() + call := mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "OptValue", reflect.TypeOf((*MockClient)(nil).OptValue), arg0) + return &MockClientOptValueCall{Call: call} +} + +// MockClientOptValueCall wrap *gomock.Call +type MockClientOptValueCall struct { + *gomock.Call +} + +// Return rewrite *gomock.Call.Return +func (c *MockClientOptValueCall) Return(arg0 any) *MockClientOptValueCall { + c.Call = c.Call.Return(arg0) + return c +} + +// Do rewrite *gomock.Call.Do +func (c *MockClientOptValueCall) Do(f func(any) any) *MockClientOptValueCall { + c.Call = c.Call.Do(f) + return c +} + +// DoAndReturn rewrite *gomock.Call.DoAndReturn +func (c *MockClientOptValueCall) DoAndReturn(f func(any) any) *MockClientOptValueCall { + c.Call = c.Call.DoAndReturn(f) + return c +} + +// PollFetches mocks base method. +func (m *MockClient) PollFetches(arg0 context.Context) kgo.Fetches { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "PollFetches", arg0) + ret0, _ := ret[0].(kgo.Fetches) + return ret0 +} + +// PollFetches indicates an expected call of PollFetches. +func (mr *MockClientMockRecorder) PollFetches(arg0 any) *MockClientPollFetchesCall { + mr.mock.ctrl.T.Helper() + call := mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "PollFetches", reflect.TypeOf((*MockClient)(nil).PollFetches), arg0) + return &MockClientPollFetchesCall{Call: call} +} + +// MockClientPollFetchesCall wrap *gomock.Call +type MockClientPollFetchesCall struct { + *gomock.Call +} + +// Return rewrite *gomock.Call.Return +func (c *MockClientPollFetchesCall) Return(arg0 kgo.Fetches) *MockClientPollFetchesCall { + c.Call = c.Call.Return(arg0) + return c +} + +// Do rewrite *gomock.Call.Do +func (c *MockClientPollFetchesCall) Do(f func(context.Context) kgo.Fetches) *MockClientPollFetchesCall { + c.Call = c.Call.Do(f) + return c +} + +// DoAndReturn rewrite *gomock.Call.DoAndReturn +func (c *MockClientPollFetchesCall) DoAndReturn(f func(context.Context) kgo.Fetches) *MockClientPollFetchesCall { + c.Call = c.Call.DoAndReturn(f) + return c +} diff --git a/source/franz_test.go b/source/franz_test.go index 20af311..59796e5 100644 --- a/source/franz_test.go +++ b/source/franz_test.go @@ -17,16 +17,20 @@ package source import ( "context" "crypto/tls" + "errors" "regexp" "testing" + "time" "github.com/conduitio/conduit-connector-kafka/common" "github.com/conduitio/conduit-connector-kafka/test" + sdk "github.com/conduitio/conduit-connector-sdk" "github.com/google/go-cmp/cmp" "github.com/google/go-cmp/cmp/cmpopts" "github.com/matryer/is" "github.com/twmb/franz-go/pkg/kgo" "github.com/twmb/franz-go/pkg/sasl" + "go.uber.org/mock/gomock" ) func TestFranzConsumer_Opts(t *testing.T) { @@ -64,3 +68,73 @@ func TestFranzConsumer_Opts(t *testing.T) { is.Equal(cmp.Diff(c.client.OptValue(kgo.DialTLSConfig), cfg.TLS(), cmpopts.IgnoreUnexported(tls.Config{})), "") is.Equal(c.client.OptValue(kgo.SASL).([]sasl.Mechanism)[0].Name(), cfg.SASL().Name()) } + +func Test_FranzConsumer_Consume_Success(t *testing.T) { + is := is.New(t) + ctx := context.Background() + + cl := NewMockClient(gomock.NewController(t)) + cl.EXPECT(). + PollFetches(gomock.Any()). + Return([]kgo.Fetch{{ + Topics: []kgo.FetchTopic{{ + Topic: "test", + Partitions: []kgo.FetchPartition{{ + Partition: 0, + Err: nil, + Records: []*kgo.Record{ + { + Key: []byte("hi"), + Value: []byte("hello"), + Partition: 0, + Offset: 1, + Timestamp: time.Now(), + Topic: "test", + }, + }, + }}, + }}, + }}). + Times(1) + + c := &FranzConsumer{ + client: cl, + acker: newBatchAcker(cl, 1000), + iter: &kgo.FetchesRecordIter{}, + retryGroupJoinErrors: true, + } + + r, err := c.Consume(ctx) + is.NoErr(err) + is.Equal(r.Key, []byte("hi")) + is.Equal(r.Value, []byte("hello")) +} + +func Test_FranzConsumer_Consume_RetryGroupJoinError(t *testing.T) { + is := is.New(t) + ctx := context.Background() + + cl := NewMockClient(gomock.NewController(t)) + cl.EXPECT(). + PollFetches(gomock.Any()). + Return([]kgo.Fetch{{ + Topics: []kgo.FetchTopic{{ + Topic: "", + Partitions: []kgo.FetchPartition{{ + Partition: 0, + Err: errors.New("unable to join group session: unable to dial: dial tcp 127.0.0.1:9092: connect: connection refused"), + }}, + }}, + }}). + Times(1) + + c := &FranzConsumer{ + client: cl, + acker: newBatchAcker(cl, 1000), + iter: &kgo.FetchesRecordIter{}, + retryGroupJoinErrors: true, + } + + _, err := c.Consume(ctx) + is.True(errors.Is(err, sdk.ErrBackoffRetry)) +} diff --git a/source/paramgen.go b/source/paramgen.go index b9398f9..f3eb834 100644 --- a/source/paramgen.go +++ b/source/paramgen.go @@ -51,6 +51,12 @@ func (Config) Parameters() map[string]sdk.Parameter { Type: sdk.ParameterTypeBool, Validations: []sdk.Validation{}, }, + "retryGroupJoinErrors": { + Default: "true", + Description: "RetryGroupJoinErrors determines whether the connector will continually retry on group join errors.", + Type: config.ParameterTypeBool, + Validations: []config.Validation{}, + }, "saslMechanism": { Default: "", Description: "saslMechanism configures the connector to use SASL authentication. If empty, no authentication will be performed.", diff --git a/source_integration_test.go b/source_integration_test.go index fb5bd86..0582b2a 100644 --- a/source_integration_test.go +++ b/source_integration_test.go @@ -28,7 +28,7 @@ import ( func TestSource_Integration_RestartFull(t *testing.T) { t.Parallel() - cfgMap := test.SourceConfigMap(t, true) + cfgMap := test.SourceConfigMap(t, true, false) cfg := test.ParseConfigMap[source.Config](t, cfgMap) recs1 := test.GenerateFranzRecords(1, 3) @@ -44,7 +44,7 @@ func TestSource_Integration_RestartFull(t *testing.T) { func TestSource_Integration_RestartPartial(t *testing.T) { t.Parallel() - cfgMap := test.SourceConfigMap(t, true) + cfgMap := test.SourceConfigMap(t, true, false) cfg := test.ParseConfigMap[source.Config](t, cfgMap) recs1 := test.GenerateFranzRecords(1, 3) diff --git a/source_test.go b/source_test.go index 27a5795..782b1d6 100644 --- a/source_test.go +++ b/source_test.go @@ -39,7 +39,7 @@ func TestSource_Teardown_Success(t *testing.T) { Close(context.Background()). Return(nil) - cfg := test.ParseConfigMap[source.Config](t, test.SourceConfigMap(t, true)) + cfg := test.ParseConfigMap[source.Config](t, test.SourceConfigMap(t, true, false)) underTest := Source{consumer: consumerMock, config: cfg} is.NoErr(underTest.Teardown(context.Background())) @@ -86,7 +86,7 @@ func TestSource_Read(t *testing.T) { Consume(gomock.Any()). Return((*source.Record)(rec), nil) - cfg := test.ParseConfigMap[source.Config](t, test.SourceConfigMap(t, false)) + cfg := test.ParseConfigMap[source.Config](t, test.SourceConfigMap(t, false, false)) underTest := Source{consumer: consumerMock, config: cfg} got, err := underTest.Read(context.Background()) is.NoErr(err) diff --git a/test/util.go b/test/util.go index 7d18d4e..7ba7c9f 100644 --- a/test/util.go +++ b/test/util.go @@ -67,9 +67,10 @@ func ConfigMap() map[string]string { } } -func SourceConfigMap(t T, multipleTopics bool) map[string]string { +func SourceConfigMap(t T, multipleTopics, retryGroupJoinErrors bool) map[string]string { m := ConfigMap() m["readFromBeginning"] = "true" + m["retryGroupJoinErrors"] = fmt.Sprint(retryGroupJoinErrors) m["topics"] = getRandomTopicName(t) if multipleTopics { m["topics"] = m["topics"] + "," + getRandomTopicName(t)