Skip to content

Commit

Permalink
Added config-keeper service
Browse files Browse the repository at this point in the history
  • Loading branch information
absorbb committed Dec 3, 2024
1 parent f791dbe commit 941d61a
Show file tree
Hide file tree
Showing 9 changed files with 386 additions and 5 deletions.
12 changes: 8 additions & 4 deletions all.Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ RUN apt-get install gcc libc6-dev

WORKDIR /app

RUN mkdir jitsubase kafkabase eventslog bulkerlib bulkerapp ingest sync-sidecar sync-controller ingress-manager admin
RUN mkdir jitsubase kafkabase eventslog bulkerlib bulkerapp ingest sync-sidecar sync-controller ingress-manager config-keeper admin
RUN mkdir connectors connectors/airbytecdk connectors/firebase

COPY jitsubase/go.* ./jitsubase/
Expand All @@ -30,6 +30,7 @@ COPY ingest/go.* ./ingest/
COPY sync-sidecar/go.* ./sync-sidecar/
COPY sync-controller/go.* ./sync-controller/
COPY ingress-manager/go.* ./ingress-manager/
COPY config-keeper/go.* ./config-keeper/

COPY admin/go.* ./admin/
COPY connectors/airbytecdk/go.* ./connectors/airbytecdk/
Expand All @@ -44,14 +45,12 @@ WORKDIR /app

COPY . .

ENV GOEXPERIMENT loopvar

RUN --mount=type=cache,id=go_mod,mode=0755,target=/go/pkg/mod go build -ldflags="-X main.Commit=$VERSION -X main.Timestamp=$BUILD_TIMESTAMP" -o bulker ./bulkerapp
RUN --mount=type=cache,id=go_mod,mode=0755,target=/go/pkg/mod go build -ldflags="-X main.Commit=$VERSION -X main.Timestamp=$BUILD_TIMESTAMP" -o ingest ./ingest
RUN --mount=type=cache,id=go_mod,mode=0755,target=/go/pkg/mod go build -ldflags="-X main.Commit=$VERSION -X main.Timestamp=$BUILD_TIMESTAMP" -o sidecar ./sync-sidecar
RUN --mount=type=cache,id=go_mod,mode=0755,target=/go/pkg/mod go build -ldflags="-X main.Commit=$VERSION -X main.Timestamp=$BUILD_TIMESTAMP" -o syncctl ./sync-controller
RUN --mount=type=cache,id=go_mod,mode=0755,target=/go/pkg/mod go build -ldflags="-X main.Commit=$VERSION -X main.Timestamp=$BUILD_TIMESTAMP" -o ingmgr ./ingress-manager

RUN --mount=type=cache,id=go_mod,mode=0755,target=/go/pkg/mod go build -ldflags="-X main.Commit=$VERSION -X main.Timestamp=$BUILD_TIMESTAMP" -o cfgkpr ./config-keeper

FROM base as bulker

Expand All @@ -77,3 +76,8 @@ FROM base as ingmgr

COPY --from=builder /app/ingmgr ./
CMD ["/app/ingmgr"]

FROM base as cfgkpr

COPY --from=builder /app/cfgkpr ./
CMD ["/app/cfgkpr"]
94 changes: 94 additions & 0 deletions config-keeper/app.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
package main

import (
"context"
"fmt"
"github.com/jitsucom/bulker/jitsubase/appbase"
"io"
"net/http"
"strings"
"sync/atomic"
"time"
)

type Context struct {
config *Config
server *http.Server
pScript appbase.Repository[[]byte]
repositories map[string]appbase.Repository[[]byte]
}

type RawRepositoryData struct {
data atomic.Pointer[[]byte]
}

func (r *RawRepositoryData) Init(reader io.Reader, tag any) error {
data, err := io.ReadAll(reader)
if err != nil {
return err
}
r.data.Store(&data)
return nil
}

func (r *RawRepositoryData) GetData() *[]byte {
return r.data.Load()
}

func (r *RawRepositoryData) Store(writer io.Writer) error {
_, err := writer.Write(*r.data.Load())
return err
}

func (a *Context) InitContext(settings *appbase.AppSettings) error {
var err error
a.config = &Config{}
err = appbase.InitAppConfig(a.config, settings)
if err != nil {
return err
}

baseUrl := a.config.RepositoryBaseURL
token := a.config.RepositoryAuthToken
refreshPeriodSec := a.config.RepositoryRefreshPeriodSec
cacheDir := a.config.CacheDir

a.pScript = appbase.NewHTTPRepository[[]byte]("p.js", a.config.ScriptOrigin, "", appbase.HTTPTagETag, &RawRepositoryData{}, 5, 60, cacheDir)
reps := a.config.Repositories
a.repositories = map[string]appbase.Repository[[]byte]{
"p.js": a.pScript,
}
for _, rep := range strings.Split(reps, ",") {
a.repositories[rep] = appbase.NewHTTPRepository[[]byte](rep, baseUrl+"/"+rep, token, appbase.HTTPTagLastModified, &RawRepositoryData{}, 2, refreshPeriodSec, cacheDir)

}
router := NewRouter(a)
a.server = &http.Server{
Addr: fmt.Sprintf("0.0.0.0:%d", a.config.HTTPPort),
Handler: router.Engine(),
ReadTimeout: time.Second * 60,
ReadHeaderTimeout: time.Second * 60,
IdleTimeout: time.Second * 65,
}
return nil
}

func (a *Context) Cleanup() error {
for _, rep := range a.repositories {
_ = rep.Close()
}
return nil
}

func (a *Context) ShutdownSignal() error {
_ = a.server.Shutdown(context.Background())
return nil
}

func (a *Context) Server() *http.Server {
return a.server
}

func (a *Context) Config() *Config {
return a.config
}
30 changes: 30 additions & 0 deletions config-keeper/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
package main

import (
"github.com/jitsucom/bulker/jitsubase/appbase"
"github.com/jitsucom/bulker/jitsubase/utils"
"github.com/spf13/viper"
"os"
)

type Config struct {
appbase.Config `mapstructure:",squash"`

//Cache dir for repositories data
CacheDir string `mapstructure:"CACHE_DIR"`

ScriptOrigin string `mapstructure:"SCRIPT_ORIGIN" default:"https://cdn.jsdelivr.net/npm/@jitsu/js@latest/dist/web/p.js.txt"`

RepositoryBaseURL string `mapstructure:"REPOSITORY_BASE_URL"`
RepositoryAuthToken string `mapstructure:"REPOSITORY_AUTH_TOKEN"`
RepositoryRefreshPeriodSec int `mapstructure:"REPOSITORY_REFRESH_PERIOD_SEC" default:"5"`
Repositories string `mapstructure:"REPOSITORIES" default:"streams-with-destinations,workspaces-with-profiles,functions,rotor-connections,bulker-connections"`
}

func init() {
viper.SetDefault("HTTP_PORT", utils.NvlString(os.Getenv("PORT"), "3059"))
}

func (c *Config) PostInit(settings *appbase.AppSettings) error {
return c.Config.PostInit(settings)
}
56 changes: 56 additions & 0 deletions config-keeper/go.mod
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
module github.com/jitsucom/bulker/config-keeper

go 1.23

toolchain go1.23.3

require (
github.com/gin-gonic/gin v1.10.0
github.com/spf13/viper v1.19.0
)

require (
github.com/bytedance/sonic v1.11.6 // indirect
github.com/bytedance/sonic/loader v0.1.1 // indirect
github.com/cloudwego/base64x v0.1.4 // indirect
github.com/cloudwego/iasm v0.2.0 // indirect
github.com/fsnotify/fsnotify v1.7.0 // indirect
github.com/gabriel-vasile/mimetype v1.4.3 // indirect
github.com/gin-contrib/sse v0.1.0 // indirect
github.com/go-playground/locales v0.14.1 // indirect
github.com/go-playground/universal-translator v0.18.1 // indirect
github.com/go-playground/validator/v10 v10.20.0 // indirect
github.com/goccy/go-json v0.10.3 // indirect
github.com/google/go-cmp v0.6.0 // indirect
github.com/hashicorp/hcl v1.0.0 // indirect
github.com/json-iterator/go v1.1.12 // indirect
github.com/klauspost/cpuid/v2 v2.2.8 // indirect
github.com/leodido/go-urn v1.4.0 // indirect
github.com/magiconair/properties v1.8.7 // indirect
github.com/mattn/go-isatty v0.0.20 // indirect
github.com/mitchellh/mapstructure v1.5.0 // indirect
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
github.com/modern-go/reflect2 v1.0.2 // indirect
github.com/pelletier/go-toml/v2 v2.2.2 // indirect
github.com/rogpeppe/go-internal v1.12.0 // indirect
github.com/sagikazarmark/locafero v0.4.0 // indirect
github.com/sagikazarmark/slog-shim v0.1.0 // indirect
github.com/sourcegraph/conc v0.3.0 // indirect
github.com/spf13/afero v1.11.0 // indirect
github.com/spf13/cast v1.6.0 // indirect
github.com/spf13/pflag v1.0.5 // indirect
github.com/subosito/gotenv v1.6.0 // indirect
github.com/twitchyliquid64/golang-asm v0.15.1 // indirect
github.com/ugorji/go/codec v1.2.12 // indirect
go.uber.org/multierr v1.11.0 // indirect
golang.org/x/arch v0.8.0 // indirect
golang.org/x/crypto v0.28.0 // indirect
golang.org/x/exp v0.0.0-20240222234643-814bf88cf225 // indirect
golang.org/x/net v0.30.0 // indirect
golang.org/x/sys v0.26.0 // indirect
golang.org/x/text v0.19.0 // indirect
google.golang.org/protobuf v1.35.1 // indirect
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c // indirect
gopkg.in/ini.v1 v1.67.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)
18 changes: 18 additions & 0 deletions config-keeper/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package main

import (
"github.com/jitsucom/bulker/jitsubase/appbase"
"os"
)

func main() {
settings := &appbase.AppSettings{
ConfigPath: os.Getenv("CFGKPR_CONFIG_PATH"),
Name: "cfgkpr",
EnvPrefix: "CFGKPR",
ConfigName: "cfgkpr",
ConfigType: "env",
}
application := appbase.NewApp[Config](&Context{}, settings)
application.Run()
}
134 changes: 134 additions & 0 deletions config-keeper/router.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
package main

import (
"fmt"
"github.com/gin-gonic/gin"
"github.com/jitsucom/bulker/jitsubase/appbase"
"github.com/jitsucom/bulker/jitsubase/utils"
"net/http"
"strings"
"time"
)

type Router struct {
*appbase.Router
appContext *Context
}

func NewRouter(appContext *Context) *Router {
base := appbase.NewRouterBase(appContext.config.Config, []string{"/health", "/p.js"})

router := &Router{
Router: base,
appContext: appContext,
}
engine := router.Engine()
engine.GET("/p.js", router.ScriptHandler)
engine.GET("/:repository", router.RepositoryHandler)

engine.GET("/health", func(c *gin.Context) {
reps := utils.JoinNonEmptyStrings(",", appContext.config.Repositories, "p.js")
healthy := true
repStatuses := map[string]any{}
now := time.Now()
for _, rep := range strings.Split(reps, ",") {
repository, ok := appContext.repositories[rep]
if !ok {
healthy = false
repStatuses[rep] = map[string]any{"error": "not_found"}
continue
}
if !repository.Loaded() {
healthy = false
}
lastSuccess := repository.LastSuccess()
if lastSuccess.IsZero() || now.Sub(lastSuccess) > 10*time.Minute {
healthy = false
}
}
for name, repository := range appContext.repositories {
lastSuccess := repository.LastSuccess()
status := map[string]any{
"loaded": repository.Loaded(),
"last_success": repository.LastSuccess(),
}
if lastSuccess.IsZero() || now.Sub(lastSuccess) > 10*time.Minute {
status["error"] = fmt.Sprintf("no refreshes since: %s", lastSuccess)
}
repStatuses[name] = status
}
c.JSON(http.StatusOK, gin.H{
"status": utils.Ternary(healthy, "pass", "fail"),
"repositories": repStatuses,
})
})

return router
}
func (r *Router) RepositoryHandler(c *gin.Context) {
repName := c.Param("repository")
repository, ok := r.appContext.repositories[repName]
if !ok {
r.Infof("Repository %s not found, initializing", repName)
repository = appbase.NewHTTPRepository[[]byte](repName, r.appContext.config.RepositoryBaseURL+"/"+repName, r.appContext.config.RepositoryAuthToken, appbase.HTTPTagLastModified, &RawRepositoryData{}, 2, r.appContext.config.RepositoryRefreshPeriodSec, r.appContext.config.CacheDir)
initTimeout := time.After(time.Second * 60)
ticker := time.NewTicker(time.Second)
defer ticker.Stop()
select {
case <-ticker.C:
if repository.Loaded() {
r.Infof("Repository %s initialized", repName)
r.appContext.repositories[repName] = repository
break
}
case <-initTimeout:
if !repository.Loaded() {
_ = repository.Close()
r.Errorf("Repository %s initialization timeout", repName)
_ = c.AbortWithError(http.StatusInternalServerError, fmt.Errorf("Repository %s initialization timeout", repName))
return
} else {
r.Infof("Repository %s initialized", repName)
r.appContext.repositories[repName] = repository
break
}
}
}
var ifModifiedSince time.Time
var err error
ifModifiedSinceS := c.GetHeader("If-Modified-Since")
if ifModifiedSinceS != "" {
ifModifiedSince, err = time.Parse(http.TimeFormat, ifModifiedSinceS)
if err != nil {
fmt.Println("Error parsing If-Modified-Since header:", err)
}
}
lastModified := repository.GetLastModified()

if !ifModifiedSince.IsZero() && !lastModified.IsZero() && !lastModified.After(ifModifiedSince) {
c.Header("Last-Modified", lastModified.Format(http.TimeFormat))
c.Status(http.StatusNotModified)
return
}
if !lastModified.IsZero() {
c.Header("Last-Modified", lastModified.Format(http.TimeFormat))
}
c.Writer.Header().Set("Content-Type", "application/json")
_, _ = c.Writer.Write(*repository.GetData())
}

func (r *Router) ScriptHandler(c *gin.Context) {
ifNoneMatch := c.GetHeader("If-None-Match")
etag := r.appContext.pScript.GetEtag()

if ifNoneMatch != "" && ifNoneMatch == etag {
c.Header("ETag", etag)
c.Status(http.StatusNotModified)
return
}
if etag != "" {
c.Header("ETag", etag)
}
c.Writer.Header().Set("Content-Type", "application/javascript")
_, _ = c.Writer.Write(*r.appContext.pScript.GetData())
}
1 change: 1 addition & 0 deletions go.work
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ toolchain go1.23.3
use (
./bulkerapp
./bulkerlib
./config-keeper
./connectors/airbytecdk
./connectors/firebase
./eventslog
Expand Down
1 change: 0 additions & 1 deletion ingest.Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@ WORKDIR /app

COPY . .

ENV GOEXPERIMENT loopvar
# Build ingest
RUN go build -ldflags="-X main.Commit=$VERSION -X main.Timestamp=$BUILD_TIMESTAMP" -o ingest ./ingest

Expand Down
Loading

0 comments on commit 941d61a

Please sign in to comment.