diff --git a/README.md b/README.md index bd4a7e0..81ac247 100644 --- a/README.md +++ b/README.md @@ -29,7 +29,8 @@ A source is getting associated with a consumer group ID the first time the `Read | name | description | required | default value | |----------------------|--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|----------|---------------------------| | `servers` | Servers is a list of Kafka bootstrap servers, which will be used to discover all the servers in a cluster. | true | | -| `topic` | Topic is the Kafka topic from which records will be read. | true | | +| `topics` | Topics is a comma separated list of Kafka topics from which records will be read, ex: "topic1,topic2". | true | | +| ~~`topic`~~ | Topic is the Kafka topic to read from. **Deprecated: use `topics` instead.** | false | | | `clientID` | A Kafka client ID. | false | `conduit-connector-kafka` | | `readFromBeginning` | Determines from whence the consumer group should begin consuming when it finds a partition without a committed offset. If this option is set to true it will start with the first message in that partition. | false | `false` | | `groupID` | Defines the consumer group ID. 
| false | | diff --git a/acceptance_test.go b/acceptance_test.go index b7a6989..0155031 100644 --- a/acceptance_test.go +++ b/acceptance_test.go @@ -26,10 +26,13 @@ import ( ) func TestAcceptance(t *testing.T) { - cfg := map[string]string{ + srcCfg := map[string]string{ "servers": "localhost:9092", // source params "readFromBeginning": "true", + } + destCfg := map[string]string{ + "servers": "localhost:9092", // destination params "batchBytes": "1000012", "acks": "all", @@ -40,12 +43,14 @@ func TestAcceptance(t *testing.T) { ConfigurableAcceptanceTestDriver: sdk.ConfigurableAcceptanceTestDriver{ Config: sdk.ConfigurableAcceptanceTestDriverConfig{ Connector: Connector, - SourceConfig: cfg, - DestinationConfig: cfg, + SourceConfig: srcCfg, + DestinationConfig: destCfg, BeforeTest: func(t *testing.T) { lastSlash := strings.LastIndex(t.Name(), "/") - cfg["topic"] = t.Name()[lastSlash+1:] + uuid.NewString() + randomName := t.Name()[lastSlash+1:] + uuid.NewString() + srcCfg["topics"] = randomName + destCfg["topic"] = randomName }, Skip: []string{ @@ -69,7 +74,7 @@ type AcceptanceTestDriver struct { // group which results in slow reads. This speeds up the destination tests. 
func (d AcceptanceTestDriver) ReadFromDestination(t *testing.T, records []sdk.Record) []sdk.Record { cfg := test.ParseConfigMap[source.Config](t, d.SourceConfig(t)) - kgoRecs := test.Consume(t, cfg.Servers, cfg.Topic, len(records)) + kgoRecs := test.Consume(t, cfg.Servers, cfg.Topics[0], len(records)) recs := make([]sdk.Record, len(kgoRecs)) for i, rec := range kgoRecs { diff --git a/destination_integration_test.go b/destination_integration_test.go index 238ec8f..8175726 100644 --- a/destination_integration_test.go +++ b/destination_integration_test.go @@ -18,6 +18,7 @@ import ( "context" "testing" + "github.com/conduitio/conduit-connector-kafka/destination" "github.com/conduitio/conduit-connector-kafka/source" "github.com/conduitio/conduit-connector-kafka/test" "github.com/matryer/is" @@ -25,9 +26,9 @@ import ( func TestDestination_Integration_WriteExistingTopic(t *testing.T) { cfgMap := test.DestinationConfigMap(t) - cfg := test.ParseConfigMap[source.Config](t, cfgMap) + cfg := test.ParseConfigMap[destination.Config](t, cfgMap) - test.CreateTopic(t, cfg.Servers, cfg.Topic) + test.CreateTopics(t, cfg.Servers, []string{cfg.Topic}) testDestinationIntegrationWrite(t, cfgMap) } @@ -58,8 +59,12 @@ func testDestinationIntegrationWrite(t *testing.T, cfg map[string]string) { is.NoErr(err) is.Equal(count, len(wantRecords)) + // source config needs "topics" param + cfg["topics"] = cfg["topic"] + cfg["topic"] = "" + srcCfg := test.ParseConfigMap[source.Config](t, cfg) - gotRecords := test.Consume(t, srcCfg.Servers, srcCfg.Topic, len(wantRecords)) + gotRecords := test.Consume(t, srcCfg.Servers, srcCfg.Topics[0], len(wantRecords)) is.Equal(len(wantRecords), len(gotRecords)) for i, got := range gotRecords { is.Equal(got.Value, wantRecords[i].Bytes()) diff --git a/source.go b/source.go index f3ef561..a3928b1 100644 --- a/source.go +++ b/source.go @@ -42,14 +42,14 @@ func (s *Source) Parameters() map[string]sdk.Parameter { return 
source.Config{}.Parameters() } -func (s *Source) Configure(_ context.Context, cfg map[string]string) error { +func (s *Source) Configure(ctx context.Context, cfg map[string]string) error { var config source.Config err := sdk.Util.ParseConfig(cfg, &config) if err != nil { return err } - err = config.Validate() + err = config.Validate(ctx) if err != nil { return err } diff --git a/source/config.go b/source/config.go index 0eca699..2973e18 100644 --- a/source/config.go +++ b/source/config.go @@ -17,14 +17,21 @@ package source import ( + "context" + "errors" + "fmt" + "github.com/conduitio/conduit-connector-kafka/common" + sdk "github.com/conduitio/conduit-connector-sdk" ) type Config struct { common.Config - // Topic is the Kafka topic. - Topic string `json:"topic" validate:"required"` + // Topics is a comma separated list of Kafka topics to read from. + Topics []string `json:"topics"` + // Topic is the Kafka topic to read from. Deprecated: use the topics parameter instead. + Topic string `json:"topic"` // ReadFromBeginning determines from whence the consumer group should begin // consuming when it finds a partition without a committed offset. If this // options is set to true it will start with the first message in that @@ -35,6 +42,24 @@ type Config struct { } // Validate executes manual validations beyond what is defined in struct tags. -func (c Config) Validate() error { - return c.Config.Validate() +func (c *Config) Validate(ctx context.Context) error { + var multierr []error + err := c.Config.Validate() + if err != nil { + multierr = append(multierr, err) + } + // validate and set the topics.
+ if len(c.Topic) == 0 && len(c.Topics) == 0 { + multierr = append(multierr, fmt.Errorf("required parameter missing: %q", "topics")) + } + if len(c.Topic) > 0 && len(c.Topics) > 0 { + multierr = append(multierr, fmt.Errorf(`can't provide both "topic" and "topics" parameters, "topic" is deprecated and will be removed, use the "topics" parameter instead`)) + } + if len(c.Topic) > 0 && len(c.Topics) == 0 { + sdk.Logger(ctx).Warn().Msg(`"topic" parameter is deprecated and will be removed, please use "topics" instead.`) + // add the topic value to the topics slice. + c.Topics = make([]string, 1) + c.Topics[0] = c.Topic + } + return errors.Join(multierr...) } diff --git a/source/config_test.go b/source/config_test.go new file mode 100644 index 0000000..45b8635 --- /dev/null +++ b/source/config_test.go @@ -0,0 +1,76 @@ +// Copyright © 2024 Meroxa, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package source + +import ( + "context" + "strings" + "testing" + + "github.com/matryer/is" +) + +func TestConfig_ValidateTopics(t *testing.T) { + // Note that we are testing custom validations. Required fields and simple + // validations are already executed by the SDK via parameter specifications.
+ testCases := []struct { + name string + cfg Config + wantErr string + }{{ + name: `one of "topic" and "topics" should be provided.`, + cfg: Config{ + Topics: []string{}, + Topic: "", + }, + wantErr: `required parameter missing: "topics"`, + }, { + name: "invalid, only provide one.", + cfg: Config{ + Topics: []string{"topic2"}, + Topic: "topic1", + }, + wantErr: `can't provide both "topic" and "topics" parameters, "topic" is deprecated and will be removed, use the "topics" parameter instead`, + }, { + name: "valid with warning, will be deprecated soon", + cfg: Config{ + Topics: []string{}, + Topic: "topic1", + }, + wantErr: "", + }, { + name: "valid", + cfg: Config{ + Topics: []string{"topic1"}, + }, + wantErr: "", + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + is := is.New(t) + err := tc.cfg.Validate(context.Background()) + t.Logf("Validate() error: %v", err) + if tc.wantErr != "" { + is.True(err != nil) + is.True(strings.Contains(err.Error(), tc.wantErr)) + } else { + is.NoErr(err) + is.Equal(tc.cfg.Topics, []string{"topic1"}) + } + }) + } +} diff --git a/source/franz.go b/source/franz.go index 7e68869..8d2736c 100644 --- a/source/franz.go +++ b/source/franz.go @@ -37,7 +37,7 @@ func NewFranzConsumer(ctx context.Context, cfg Config) (*FranzConsumer, error) { opts := cfg.FranzClientOpts(sdk.Logger(ctx)) opts = append(opts, []kgo.Opt{ kgo.ConsumerGroup(cfg.GroupID), - kgo.ConsumeTopics(cfg.Topic), + kgo.ConsumeTopics(cfg.Topics...), }...)
if !cfg.ReadFromBeginning { diff --git a/source/franz_integration_test.go b/source/franz_integration_test.go index 6589980..e055331 100644 --- a/source/franz_integration_test.go +++ b/source/franz_integration_test.go @@ -28,12 +28,12 @@ func TestFranzConsumer_Consume_FromBeginning(t *testing.T) { is := is.New(t) ctx := context.Background() - cfg := test.ParseConfigMap[Config](t, test.SourceConfigMap(t)) + cfg := test.ParseConfigMap[Config](t, test.SourceConfigMap(t, false)) cfg.ReadFromBeginning = true records := test.GenerateFranzRecords(1, 6) - test.CreateTopic(t, cfg.Servers, cfg.Topic) - test.Produce(t, cfg.Servers, cfg.Topic, records) + test.CreateTopics(t, cfg.Servers, cfg.Topics) + test.Produce(t, cfg.Servers, cfg.Topics[0], records) c, err := NewFranzConsumer(ctx, cfg) is.NoErr(err) @@ -56,12 +56,12 @@ func TestFranzConsumer_Consume_LastOffset(t *testing.T) { is := is.New(t) ctx := context.Background() - cfg := test.ParseConfigMap[Config](t, test.SourceConfigMap(t)) + cfg := test.ParseConfigMap[Config](t, test.SourceConfigMap(t, false)) cfg.ReadFromBeginning = false records := test.GenerateFranzRecords(1, 6) - test.CreateTopic(t, cfg.Servers, cfg.Topic) - test.Produce(t, cfg.Servers, cfg.Topic, records) + test.CreateTopics(t, cfg.Servers, cfg.Topics) + test.Produce(t, cfg.Servers, cfg.Topics[0], records) c, err := NewFranzConsumer(ctx, cfg) is.NoErr(err) @@ -77,7 +77,7 @@ func TestFranzConsumer_Consume_LastOffset(t *testing.T) { is.Equal(got, nil) records = test.GenerateFranzRecords(7, 9) - test.Produce(t, cfg.Servers, cfg.Topic, records) + test.Produce(t, cfg.Servers, cfg.Topics[0], records) for i := 0; i < len(records); i++ { ctx, cancel := context.WithTimeout(ctx, time.Second) @@ -87,3 +87,40 @@ func TestFranzConsumer_Consume_LastOffset(t *testing.T) { is.Equal(got.Key, records[i].Key) } } + +func TestFranzConsumer_Consume_MultipleTopics(t *testing.T) { + t.Parallel() + is := is.New(t) + ctx := context.Background() + + cfg := 
test.ParseConfigMap[Config](t, test.SourceConfigMap(t, true)) + cfg.ReadFromBeginning = true + + records := test.GenerateFranzRecords(1, 6) + test.CreateTopics(t, cfg.Servers, cfg.Topics) + test.Produce(t, cfg.Servers, cfg.Topics[0], records[0:3]) + test.Produce(t, cfg.Servers, cfg.Topics[1], records[3:]) + + c, err := NewFranzConsumer(ctx, cfg) + is.NoErr(err) + defer func() { + err := c.Close(ctx) + is.NoErr(err) + }() + + topic1 := 0 + topic2 := 0 + for i := 0; i < len(records); i++ { + ctx, cancel := context.WithTimeout(ctx, time.Second) + defer cancel() + got, err := c.Consume(ctx) + is.NoErr(err) + if got.Topic == cfg.Topics[0] { + topic1++ + } else if got.Topic == cfg.Topics[1] { + topic2++ + } + } + is.Equal(topic1, 3) + is.Equal(topic2, 3) +} diff --git a/source/franz_test.go b/source/franz_test.go index 4a79896..20af311 100644 --- a/source/franz_test.go +++ b/source/franz_test.go @@ -50,14 +50,14 @@ func TestFranzConsumer_Opts(t *testing.T) { CACert: caCert, }, }, - Topic: "test-topic", + Topics: []string{"test-topic"}, GroupID: "test-group-id", } c, err := NewFranzConsumer(context.Background(), cfg) is.NoErr(err) - is.Equal(c.client.OptValue(kgo.ConsumeTopics), map[string]*regexp.Regexp{cfg.Topic: nil}) + is.Equal(c.client.OptValue(kgo.ConsumeTopics), map[string]*regexp.Regexp{cfg.Topics[0]: nil}) is.Equal(c.client.OptValue(kgo.ConsumerGroup), cfg.GroupID) is.Equal(c.client.OptValue(kgo.ClientID), cfg.ClientID) diff --git a/source/paramgen.go b/source/paramgen.go index 8be2133..b9398f9 100644 --- a/source/paramgen.go +++ b/source/paramgen.go @@ -87,11 +87,15 @@ func (Config) Parameters() map[string]sdk.Parameter { }, "topic": { Default: "", - Description: "topic is the Kafka topic.", + Description: "topic is the Kafka topic to read from. Deprecated: use the topics parameter instead.", Type: sdk.ParameterTypeString, - Validations: []sdk.Validation{ - sdk.ValidationRequired{}, - }, + Validations: []sdk.Validation{}, + }, + "topics": { + Default: "", + Description: "topics
is a comma separated list of Kafka topics to read from.", + Type: sdk.ParameterTypeString, + Validations: []sdk.Validation{}, }, } } diff --git a/source_integration_test.go b/source_integration_test.go index 7b6007f..fb5bd86 100644 --- a/source_integration_test.go +++ b/source_integration_test.go @@ -28,33 +28,33 @@ import ( func TestSource_Integration_RestartFull(t *testing.T) { t.Parallel() - cfgMap := test.SourceConfigMap(t) + cfgMap := test.SourceConfigMap(t, true) cfg := test.ParseConfigMap[source.Config](t, cfgMap) recs1 := test.GenerateFranzRecords(1, 3) - test.Produce(t, cfg.Servers, cfg.Topic, recs1) + test.Produce(t, cfg.Servers, cfg.Topics[0], recs1) lastPosition := testSourceIntegrationRead(t, cfgMap, nil, recs1, false) // produce more records and restart source from last position recs2 := test.GenerateFranzRecords(4, 6) - test.Produce(t, cfg.Servers, cfg.Topic, recs2) + test.Produce(t, cfg.Servers, cfg.Topics[1], recs2) testSourceIntegrationRead(t, cfgMap, lastPosition, recs2, false) } func TestSource_Integration_RestartPartial(t *testing.T) { t.Parallel() - cfgMap := test.SourceConfigMap(t) + cfgMap := test.SourceConfigMap(t, true) cfg := test.ParseConfigMap[source.Config](t, cfgMap) recs1 := test.GenerateFranzRecords(1, 3) - test.Produce(t, cfg.Servers, cfg.Topic, recs1) + test.Produce(t, cfg.Servers, cfg.Topics[0], recs1) lastPosition := testSourceIntegrationRead(t, cfgMap, nil, recs1, true) // only first record was acked, produce more records and expect to resume // from last acked record recs2 := test.GenerateFranzRecords(4, 6) - test.Produce(t, cfg.Servers, cfg.Topic, recs2) + test.Produce(t, cfg.Servers, cfg.Topics[0], recs2) var wantRecs []*kgo.Record wantRecs = append(wantRecs, recs1[1:]...) 
@@ -91,6 +91,9 @@ func testSourceIntegrationRead( rec, err := underTest.Read(ctx) is.NoErr(err) is.Equal(wantRecord.Key, rec.Key.Bytes()) + collection, err := rec.Metadata.GetCollection() + is.NoErr(err) + is.Equal(wantRecord.Topic, collection) positions = append(positions, rec.Position) } diff --git a/source_test.go b/source_test.go index 18574cb..27a5795 100644 --- a/source_test.go +++ b/source_test.go @@ -39,7 +39,7 @@ func TestSource_Teardown_Success(t *testing.T) { Close(context.Background()). Return(nil) - cfg := test.ParseConfigMap[source.Config](t, test.SourceConfigMap(t)) + cfg := test.ParseConfigMap[source.Config](t, test.SourceConfigMap(t, true)) underTest := Source{consumer: consumerMock, config: cfg} is.NoErr(underTest.Teardown(context.Background())) @@ -86,7 +86,7 @@ func TestSource_Read(t *testing.T) { Consume(gomock.Any()). Return((*source.Record)(rec), nil) - cfg := test.ParseConfigMap[source.Config](t, test.SourceConfigMap(t)) + cfg := test.ParseConfigMap[source.Config](t, test.SourceConfigMap(t, false)) underTest := Source{consumer: consumerMock, config: cfg} got, err := underTest.Read(context.Background()) is.NoErr(err) diff --git a/test/util.go b/test/util.go index b8f7199..7d18d4e 100644 --- a/test/util.go +++ b/test/util.go @@ -56,24 +56,33 @@ type T interface { Cleanup(func()) } -func ConfigMap(t T) map[string]string { +func getRandomTopicName(t T) string { lastSlash := strings.LastIndex(t.Name(), "/") - topic := t.Name()[lastSlash+1:] + uuid.NewString() - t.Logf("using topic: %v", topic) + return t.Name()[lastSlash+1:] + uuid.NewString() +} + +func ConfigMap() map[string]string { return map[string]string{ "servers": "localhost:9092", - "topic": topic, } } -func SourceConfigMap(t T) map[string]string { - m := ConfigMap(t) +func SourceConfigMap(t T, multipleTopics bool) map[string]string { + m := ConfigMap() m["readFromBeginning"] = "true" + m["topics"] = getRandomTopicName(t) + if multipleTopics { + m["topics"] = m["topics"] + "," + 
getRandomTopicName(t) + } + t.Logf("using topics: %v", m["topics"]) return m } func DestinationConfigMap(t T) map[string]string { - m := ConfigMap(t) + m := ConfigMap() + m["topic"] = getRandomTopicName(t) + t.Logf("using topic: %v", m["topic"]) + m["batchBytes"] = "1000012" m["acks"] = "all" m["compression"] = "snappy" @@ -116,7 +125,7 @@ func Consume(t T, servers []string, topic string, limit int) []*kgo.Record { } func Produce(t T, servers []string, topic string, records []*kgo.Record, timeoutOpt ...time.Duration) { - CreateTopic(t, servers, topic) + CreateTopics(t, servers, []string{topic}) timeout := timeout // copy default timeout if len(timeoutOpt) > 0 { @@ -173,7 +182,7 @@ func GenerateSDKRecords(from, to int, topicOpt ...string) []sdk.Record { return sdkRecs } -func CreateTopic(t T, servers []string, topic string) { +func CreateTopics(t T, servers []string, topics []string) { ctx, cancel := context.WithTimeout(context.Background(), timeout) defer cancel() is := is.New(t) @@ -188,25 +197,27 @@ func CreateTopic(t T, servers []string, topic string) { t.Cleanup(cl.Close) adminCl := kadm.NewClient(cl) - resp, err := adminCl.CreateTopic( - ctx, 1, 1, nil, topic) + resp, err := adminCl.CreateTopics( + ctx, 1, 1, nil, topics...) var kafkaErr *kerr.Error - if errors.As(err, &kafkaErr) && kafkaErr.Code == kerr.TopicAlreadyExists.Code { + if errors.As(resp.Error(), &kafkaErr) && kafkaErr.Code == kerr.TopicAlreadyExists.Code { // ignore topic if it already exists cl.Close() return } is.NoErr(err) - is.NoErr(resp.Err) + is.NoErr(resp.Error()) // we created the topic, so we should clean up after the test t.Cleanup(func() { ctx, cancel := context.WithTimeout(context.Background(), timeout) defer cancel() - resp, err := adminCl.DeleteTopics(ctx, topic) + responses, err := adminCl.DeleteTopics(ctx, topics...) 
is.NoErr(err) - is.Equal(resp[topic].ErrMessage, "") - is.NoErr(resp[topic].Err) + for _, resp := range responses { + is.Equal(resp.ErrMessage, "") + is.NoErr(resp.Err) + } }) }