Skip to content

Commit

Permalink
add custom consumer name
Browse files Browse the repository at this point in the history
add consumer custom name option
commit_hash:0a98b0eacf582366c69a8719c638db00a2c82ee2
  • Loading branch information
KosovGrigorii committed Dec 3, 2024
1 parent 67879cc commit 6d8075a
Show file tree
Hide file tree
Showing 3 changed files with 13 additions and 7 deletions.
9 changes: 5 additions & 4 deletions pkg/providers/ydb/model_source.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,10 +45,11 @@ type YdbSource struct {
RootCAFiles []string

// replication stuff:
ChangeFeedMode ChangeFeedModeType
ChangeFeedCustomName string // user can specify pre-created feed's name, otherwise it will created with name == transferID
BufferSize model.BytesSize // it's not some real buffer size - see comments to waitLimits() method in kafka-source
VerboseSDKLogs bool
ChangeFeedMode ChangeFeedModeType
ChangeFeedCustomName string // user can specify pre-created feed's name, otherwise it will created with name == transferID
ChangeFeedCustomConsumerName string
BufferSize model.BytesSize // it's not some real buffer size - see comments to waitLimits() method in kafka-source
VerboseSDKLogs bool

// auth stuff:
Token model.SecretString
Expand Down
4 changes: 2 additions & 2 deletions pkg/providers/ydb/reader_threadsafe.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ func (r *readerThreadSafe) Close(ctx context.Context) error {
return r.readerImpl.Close(ctx)
}

func newReader(feedName string, dbname string, tables []string, ydbClient *ydb.Driver, logger log.Logger) (*readerThreadSafe, error) {
func newReader(feedName, consumerName, dbname string, tables []string, ydbClient *ydb.Driver, logger log.Logger) (*readerThreadSafe, error) {
dbname = strings.TrimLeft(dbname, "/")
selectors := make([]topicoptions.ReadSelector, len(tables))
for i, table := range tables {
Expand All @@ -49,7 +49,7 @@ func newReader(feedName string, dbname string, tables []string, ydbClient *ydb.D
}

readerImpl, err := ydbClient.Topic().StartReader(
dataTransferConsumerName,
consumerName,
selectors,
topicoptions.WithReaderCommitTimeLagTrigger(0),
topicoptions.WithReaderCommitMode(topicoptions.CommitModeSync),
Expand Down
7 changes: 6 additions & 1 deletion pkg/providers/ydb/source.go
Original file line number Diff line number Diff line change
Expand Up @@ -266,7 +266,12 @@ func NewSource(transferID string, cfg *YdbSource, logger log.Logger, _ metrics.R
if cfg.ChangeFeedCustomName != "" {
feedName = cfg.ChangeFeedCustomName
}
reader, err := newReader(feedName, cfg.Database, cfg.Tables, ydbClient, logger)
consumerName := dataTransferConsumerName
if cfg.ChangeFeedCustomConsumerName != "" {
consumerName = cfg.ChangeFeedCustomConsumerName
}

reader, err := newReader(feedName, consumerName, cfg.Database, cfg.Tables, ydbClient, logger)
if err != nil {
return nil, xerrors.Errorf("failed to create stream reader: %w", err)
}
Expand Down

0 comments on commit 6d8075a

Please sign in to comment.