Skip to content

Commit

Permalink
events log: log to CH only when CH is configured
Browse files Browse the repository at this point in the history
  • Loading branch information
absorbb committed Mar 5, 2024
1 parent 4288efb commit e3f9eb9
Show file tree
Hide file tree
Showing 2 changed files with 7 additions and 22 deletions.
15 changes: 4 additions & 11 deletions bulkerapp/app/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,24 +59,17 @@ func (a *Context) InitContext(settings *appbase.AppSettings) error {
a.cron = NewCron(a.config)

a.eventsLogService = &eventslog.DummyEventsLogService{}
elServices := []eventslog.EventsLogService{}

if a.config.ClickhouseHost != "" {
chEventsLogService, err := eventslog.NewClickhouseEventsLog(a.config.EventsLogConfig)
a.eventsLogService, err = eventslog.NewClickhouseEventsLog(a.config.EventsLogConfig)
if err != nil {
return err
}
elServices = append(elServices, chEventsLogService)
}
eventsLogRedisUrl := utils.NvlString(a.config.EventsLogRedisURL, a.config.RedisURL)
if eventsLogRedisUrl != "" {
redisEventsLogService, err := eventslog.NewRedisEventsLog(eventsLogRedisUrl, a.config.RedisTLSCA, a.config.EventsLogMaxSize)
} else if eventsLogRedisUrl := utils.NvlString(a.config.EventsLogRedisURL, a.config.RedisURL); eventsLogRedisUrl != "" {
a.eventsLogService, err = eventslog.NewRedisEventsLog(eventsLogRedisUrl, a.config.RedisTLSCA, a.config.EventsLogMaxSize)
if err != nil {
return err
}
elServices = append(elServices, redisEventsLogService)
}
if len(elServices) > 0 {
a.eventsLogService = &eventslog.MultiEventsLogService{Services: elServices}
}

a.fastStore, err = NewFastStore(a.config)
Expand Down
14 changes: 3 additions & 11 deletions ingest/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,24 +35,16 @@ func (a *Context) InitContext(settings *appbase.AppSettings) error {
a.repository = NewStreamsRepository(a.config.RepositoryURL, a.config.RepositoryAuthToken, a.config.RepositoryRefreshPeriodSec, a.config.CacheDir)
a.scriptRepository = NewScriptRepository(a.config.ScriptOrigin, a.config.CacheDir)
a.eventsLogService = &eventslog.DummyEventsLogService{}
elServices := []eventslog.EventsLogService{}
if a.config.ClickhouseHost != "" {
chEventsLogService, err := eventslog.NewClickhouseEventsLog(a.config.EventsLogConfig)
a.eventsLogService, err = eventslog.NewClickhouseEventsLog(a.config.EventsLogConfig)
if err != nil {
return err
}
elServices = append(elServices, chEventsLogService)
}
eventsLogRedisUrl := a.config.RedisURL
if eventsLogRedisUrl != "" {
redisEventsLogService, err := eventslog.NewRedisEventsLog(eventsLogRedisUrl, a.config.RedisTLSCA, a.config.EventsLogMaxSize)
} else if a.config.RedisURL != "" {
a.eventsLogService, err = eventslog.NewRedisEventsLog(a.config.RedisURL, a.config.RedisTLSCA, a.config.EventsLogMaxSize)
if err != nil {
return err
}
elServices = append(elServices, redisEventsLogService)
}
if len(elServices) > 0 {
a.eventsLogService = &eventslog.MultiEventsLogService{Services: elServices}
}
a.kafkaConfig = a.config.GetKafkaConfig()
//batch producer uses higher linger.ms and doesn't suit for sync delivery used by stream consumer when retrying messages
Expand Down

0 comments on commit e3f9eb9

Please sign in to comment.