Skip to content

Commit

Permalink
ingest: load config from db only if updatedAt changed
Browse files Browse the repository at this point in the history
  • Loading branch information
absorbb committed Jan 2, 2024
1 parent 1dfa68c commit 5d82948
Show file tree
Hide file tree
Showing 2 changed files with 26 additions and 19 deletions.
2 changes: 1 addition & 1 deletion ingest/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ type Config struct {
RedisTLSCA string `mapstructure:"REDIS_TLS_CA"`
EventsLogMaxSize int `mapstructure:"EVENTS_LOG_MAX_SIZE" default:"1000"`

RepositoryRefreshPeriodSec int `mapstructure:"REPOSITORY_REFRESH_PERIOD_SEC" default:"5"`
RepositoryRefreshPeriodSec int `mapstructure:"REPOSITORY_REFRESH_PERIOD_SEC" default:"2"`

RotorURL string `mapstructure:"ROTOR_URL"`
DeviceFunctionsTimeoutMs int `mapstructure:"DEVICE_FUNCTIONS_TIMEOUT_MS" default:"200"`
Expand Down
43 changes: 25 additions & 18 deletions ingest/repository.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ package main
import (
"context"
"encoding/json"
"errors"
"github.com/jackc/pgx/v5"
"github.com/jackc/pgx/v5/pgxpool"
"github.com/jitsucom/bulker/jitsubase/appbase"
"github.com/jitsucom/bulker/jitsubase/safego"
Expand All @@ -14,24 +16,8 @@ import (
"time"
)

const SQLQuery = `select b."streamId", json_build_object('stream', b."srcConfig", 'deleted', b."deleted", 'backupEnabled', true, 'destinations', b."connectionData", 'updatedAt', to_char (b."updatedAt", 'YYYY-MM-DD"T"HH24:MI:SS"Z"') ) from
(select
src.id as "streamId",
ws.id,
src.deleted or ws.deleted as "deleted",
(src."config" || jsonb_build_object('id', src.id, 'workspaceId', ws.id)) as "srcConfig",
json_agg( case when dst.id is not null and dst.deleted = false and link.id is not null and link.deleted = false then json_build_object('id', dst.id,
'credentials', dst."config",
'destinationType', dst."config"->>'destinationType',
'connectionId', link."id",
'options', link."data") end ) as "connectionData",
max(greatest(link."updatedAt", src."updatedAt", dst."updatedAt", ws."updatedAt")) as "updatedAt"
from newjitsu."ConfigurationObject" src
join newjitsu."Workspace" ws on src."workspaceId" = ws.id
left join newjitsu."ConfigurationObjectLink" link on src.id = link."fromId" and link."workspaceId" = src."workspaceId"
left join newjitsu."ConfigurationObject" dst on dst.id = link."toId" and dst.type='destination' and dst."workspaceId" = link."workspaceId"
where src."type" ='stream'
group by 1,2) b`
const SQLLastUpdatedQuery = `select * from last_updated`
const SQLQuery = `select * from streams_with_destinations`

type Repository struct {
appbase.Service
Expand All @@ -42,6 +28,7 @@ type Repository struct {
apiKeyBindings atomic.Pointer[map[string]*ApiKeyBinding]
streamsByIds atomic.Pointer[map[string]*StreamWithDestinations]
streamsByDomains atomic.Pointer[map[string][]*StreamWithDestinations]
lastModified atomic.Pointer[time.Time]
closed chan struct{}
}

Expand Down Expand Up @@ -144,6 +131,25 @@ func (r *Repository) refresh() {
r.Debugf("Refreshed in %v", time.Now().Sub(start))
}
}()
ifModifiedSince := r.lastModified.Load()
var lastModified time.Time
err = r.dbpool.QueryRow(context.Background(), SQLLastUpdatedQuery).Scan(&lastModified)
if err != nil {
err = r.NewError("Error querying last updated: %v", err)
return
} else if errors.Is(err, pgx.ErrNoRows) || lastModified.IsZero() {
//Failed to load repository last updated date. Probably database has no records yet.
r.apiKeyBindings.Store(&apiKeyBindings)
r.streamsByIds.Store(&streamsByIds)
r.streamsByDomains.Store(&streamsByDomains)
r.inited.Store(true)
return
}
if ifModifiedSince != nil && lastModified.Compare(*ifModifiedSince) <= 0 {
return
}
r.Infof("Config updated: %s previous update date: %s`", lastModified, ifModifiedSince)

rows, err := r.dbpool.Query(context.Background(), SQLQuery)
if err != nil {
err = r.NewError("Error querying streams: %v", err)
Expand Down Expand Up @@ -193,6 +199,7 @@ func (r *Repository) refresh() {
r.streamsByIds.Store(&streamsByIds)
r.streamsByDomains.Store(&streamsByDomains)
r.inited.Store(true)
r.lastModified.Store(&lastModified)
if r.cacheDir != "" {
r.storeCached(RepositoryCache{ApiKeyBindings: apiKeyBindings, StreamsByIds: streamsByIds, StreamsByDomains: streamsByDomains})
}
Expand Down

0 comments on commit 5d82948

Please sign in to comment.