Skip to content

Commit

Permalink
jitsubase: abstract repository and http repository implementation
Browse files Browse the repository at this point in the history
bulker: http configuration source support
ingest: p.js and streams repository switched to http_repository

ingest: repository
  • Loading branch information
absorbb committed Jan 19, 2024
1 parent 4fe82f4 commit 2fb2a0d
Show file tree
Hide file tree
Showing 12 changed files with 621 additions and 481 deletions.
3 changes: 3 additions & 0 deletions bulkerapp/app/app_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,15 @@ type Config struct {

// ConfigSource source of destinations configs. Can be:
// - `file://...` for destinations config in yaml format
// - `http://...` for destinations config in json array format
// - `redis` or `redis://redis_url` to load configs from redis `enrichedConnections` key
// - postgresql://postgres_url to load configs from postgresql
// - `env://PREFIX` to load each destination environment variables with like `PREFIX_ID` where ID is destination id
//
// Default: `env://BULKER_DESTINATION`
ConfigSource string `mapstructure:"CONFIG_SOURCE"`
// ConfigSourceHTTPAuthToken auth token for http:// config source
ConfigSourceHTTPAuthToken string `mapstructure:"CONFIG_SOURCE_HTTP_AUTH_TOKEN"`
// ConfigSourceSQLQuery for `postgresql` config source, SQL query to load connections
ConfigSourceSQLQuery string `mapstructure:"CONFIG_SOURCE_SQL_QUERY" default:"select * from enriched_connections"`
// CacheDir dir for config source data
Expand Down
6 changes: 6 additions & 0 deletions bulkerapp/app/configuration_source.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,12 @@ func InitConfigurationSource(config *Config) (ConfigurationSource, error) {
if err != nil {
return nil, fmt.Errorf("❗error creating yaml configuration source from config file: %s: %v", filePath, err)
}
} else if strings.HasPrefix(cfgSource, "http://") || strings.HasPrefix(cfgSource, "https://") {
var err error
configurationSource = NewHTTPConfigurationSource(config)
if err != nil {
return nil, fmt.Errorf("❗️error while init postgres configuration source: %s: %v", cfgSource, err)
}
} else if strings.HasPrefix(cfgSource, "postgres") {
var err error
configurationSource, err = NewPostgresConfigurationSource(config)
Expand Down
96 changes: 96 additions & 0 deletions bulkerapp/app/http_configuration_source.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
package app

import (
"encoding/json"
"fmt"
"github.com/jitsucom/bulker/jitsubase/appbase"
"io"
"sync/atomic"
"time"
)

type Destinations struct {
DestinationsList []*DestinationConfig
Destinations map[string]*DestinationConfig
LastModified time.Time
}

func (d *Destinations) GetDestinationConfigs() []*DestinationConfig {
return d.DestinationsList
}

func (d *Destinations) GetDestinationConfig(id string) *DestinationConfig {
return d.Destinations[id]
}

type DestinationsRepositoryData struct {
data atomic.Pointer[Destinations]
}

func (drd *DestinationsRepositoryData) Init(reader io.Reader, tag any) error {
dec := json.NewDecoder(reader)
// read open bracket
_, err := dec.Token()
if err != nil {
return fmt.Errorf("error reading open bracket: %v", err)
}
destinations := make([]*DestinationConfig, 0)
destinationsMap := map[string]*DestinationConfig{}
// while the array contains values
for dec.More() {
dc := DestinationConfig{}
err = dec.Decode(&dc)
if err != nil {
return fmt.Errorf("Error unmarshalling destination config: %v", err)
}
destinations = append(destinations, &dc)
destinationsMap[dc.Id()] = &dc
}

// read closing bracket
_, err = dec.Token()
if err != nil {
return fmt.Errorf("error reading closing bracket: %v", err)
}

data := Destinations{
Destinations: destinationsMap,
DestinationsList: destinations,
}
if tag != nil {
data.LastModified = tag.(time.Time)
}
drd.data.Store(&data)
return nil
}

func (drd *DestinationsRepositoryData) GetData() *Destinations {
return drd.data.Load()
}

func (drd *DestinationsRepositoryData) Store(writer io.Writer) error {
d := drd.data.Load()
if d != nil {
encoder := json.NewEncoder(writer)
err := encoder.Encode(d.DestinationsList)
return err
}
return nil
}

type HTTPConfigurationSource struct {
appbase.Repository[Destinations]
}

func NewHTTPConfigurationSource(appconfig *Config) *HTTPConfigurationSource {
rep := appbase.NewHTTPRepository[Destinations]("bulker-connections", appconfig.ConfigSource, appconfig.ConfigSourceHTTPAuthToken, appbase.HTTPTagLastModified, &DestinationsRepositoryData{}, 1, appconfig.ConfigRefreshPeriodSec, appconfig.CacheDir)
return &HTTPConfigurationSource{rep}
}

func (h *HTTPConfigurationSource) GetDestinationConfigs() []*DestinationConfig {
return h.GetData().GetDestinationConfigs()
}

func (h *HTTPConfigurationSource) GetDestinationConfig(id string) *DestinationConfig {
return h.GetData().GetDestinationConfig(id)
}
10 changes: 5 additions & 5 deletions ingest/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@ type Context struct {
config *Config
kafkaConfig *kafka.ConfigMap
dbpool *pgxpool.Pool
repository *Repository
script *Script
repository appbase.Repository[Streams]
scriptRepository appbase.Repository[Script]
producer *kafkabase.Producer
eventsLogService eventslog.EventsLogService
server *http.Server
Expand All @@ -39,8 +39,8 @@ func (a *Context) InitContext(settings *appbase.AppSettings) error {
if err != nil {
return fmt.Errorf("Unable to create postgres connection pool: %v\n", err)
}
a.repository = NewRepository(a.dbpool, a.config.RepositoryRefreshPeriodSec, a.config.CacheDir)
a.script = NewScript(a.config.ScriptOrigin, a.config.CacheDir)
a.repository = NewStreamsRepository(a.config.RepositoryURL, a.config.RepositoryAuthToken, a.config.RepositoryRefreshPeriodSec, a.config.CacheDir)
a.scriptRepository = NewScriptRepository(a.config.ScriptOrigin, a.config.CacheDir)
a.eventsLogService = &eventslog.DummyEventsLogService{}
eventsLogRedisUrl := a.config.RedisURL
if eventsLogRedisUrl != "" {
Expand Down Expand Up @@ -84,7 +84,7 @@ func (a *Context) Cleanup() error {
}
_ = a.metricsServer.Stop()
_ = a.eventsLogService.Close()
a.script.Close()
_ = a.scriptRepository.Close()
a.repository.Close()
a.dbpool.Close()
return nil
Expand Down
8 changes: 5 additions & 3 deletions ingest/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,12 @@ import (
)

type Config struct {
appbase.Config `mapstructure:",squash"`
// # BASE CONFIG - base setting for jitsu apps
appbase.Config `mapstructure:",squash"`
// # KAFKA CONFIG - base kafka setting
kafkabase.KafkaConfig `mapstructure:",squash"`
// # REPOSITORY CONFIG - settings for loading streams from repository
RepositoryConfig `mapstructure:",squash"`

DatabaseURL string `mapstructure:"DATABASE_URL"`

Expand Down Expand Up @@ -39,8 +43,6 @@ type Config struct {
RedisTLSCA string `mapstructure:"REDIS_TLS_CA"`
EventsLogMaxSize int `mapstructure:"EVENTS_LOG_MAX_SIZE" default:"1000"`

RepositoryRefreshPeriodSec int `mapstructure:"REPOSITORY_REFRESH_PERIOD_SEC" default:"2"`

RotorURL string `mapstructure:"ROTOR_URL"`
DeviceFunctionsTimeoutMs int `mapstructure:"DEVICE_FUNCTIONS_TIMEOUT_MS" default:"200"`

Expand Down
Loading

0 comments on commit 2fb2a0d

Please sign in to comment.