diff --git a/cmd/soft/root.go b/cmd/soft/root.go index 1dc8e1883..273c9d6a3 100644 --- a/cmd/soft/root.go +++ b/cmd/soft/root.go @@ -8,6 +8,7 @@ import ( "github.com/charmbracelet/log" . "github.com/charmbracelet/soft-serve/internal/log" "github.com/spf13/cobra" + "go.uber.org/automaxprocs/maxprocs" ) var ( @@ -52,6 +53,13 @@ func init() { func main() { logger := NewDefaultLogger() + + // Set the max number of processes to the number of CPUs + // This is useful when running soft serve in a container + if _, err := maxprocs.Set(maxprocs.Logger(logger.Debugf)); err != nil { + logger.Warn("couldn't set automaxprocs", "error", err) + } + ctx := log.WithContext(context.Background(), logger) if err := rootCmd.ExecuteContext(ctx); err != nil { os.Exit(1) diff --git a/go.mod b/go.mod index e7ca5b691..bdd5d6109 100644 --- a/go.mod +++ b/go.mod @@ -32,6 +32,7 @@ require ( github.com/prometheus/client_golang v1.15.1 github.com/robfig/cron/v3 v3.0.1 github.com/spf13/cobra v1.7.0 + go.uber.org/automaxprocs v1.5.2 goji.io v2.0.2+incompatible golang.org/x/crypto v0.9.0 golang.org/x/sync v0.2.0 diff --git a/go.sum b/go.sum index 3cbdc98fb..29e1ad215 100644 --- a/go.sum +++ b/go.sum @@ -161,6 +161,7 @@ github.com/pjbgf/sha1cd v0.3.0/go.mod h1:nZ1rrWOcGJ5uZgEEVL1VUM9iRQiZvWdbZjkKyFz github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/prashantv/gostub v1.1.0 h1:BTyx3RfQjRHnUWaGF9oQos79AlQ5k8WNktv7VGvVH4g= github.com/prometheus/client_golang v1.15.1 h1:8tXpTmJbyH5lydzFPoxSIJ0J46jdh3tylbvM1xCv0LI= github.com/prometheus/client_golang v1.15.1/go.mod h1:e9yaBhRPU2pPNsZwE+JdQl0KEt1N9XgF6zxWmaC0xOk= github.com/prometheus/client_model v0.3.0 h1:UBgGFHqYdG/TPFD1B1ogZywDqEkwp3fBMvqdiQ7Xew4= @@ -207,6 +208,8 @@ github.com/yuin/goldmark v1.5.2 h1:ALmeCk/px5FSm1MAcFBAsVKZjDuMVj8Tm7FFIlMJnqU= github.com/yuin/goldmark v1.5.2/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5taEt/CY= github.com/yuin/goldmark-emoji v1.0.1 h1:ctuWEyzGBwiucEqxzwe0SOYDXPAucOrE9NQC18Wa1os= github.com/yuin/goldmark-emoji v1.0.1/go.mod h1:2w1E6FEWLcDQkoTE+7HU6QF1F6SLlNGjRIBbIZQFqkQ= +go.uber.org/automaxprocs v1.5.2 h1:2LxUOGiR3O6tw8ui5sZa2LAaHnsviZdVOUZw4fvbnME= +go.uber.org/automaxprocs v1.5.2/go.mod h1:eRbA25aqJrxAbsLO0xy5jVwPt7FQnRgjW+efnwa1WM0= goji.io v2.0.2+incompatible h1:uIssv/elbKRLznFUy3Xj4+2Mz/qKhek/9aZQDUMae7c= goji.io v2.0.2+incompatible/go.mod h1:sbqFwrtqZACxLBTQcdgVjFh54yGVCvwq8+w49MVMMIk= golang.org/x/arch v0.1.0/go.mod h1:5om86z9Hs0C8fWVUuoMHwpExlXzs5Tkyp9hOrfG7pp8= diff --git a/internal/log/log.go b/internal/log/log.go index a1184153b..7389b80fb 100644 --- a/internal/log/log.go +++ b/internal/log/log.go @@ -2,17 +2,29 @@ package log import ( "os" + "path/filepath" "strconv" "strings" "time" "github.com/charmbracelet/log" + "github.com/charmbracelet/soft-serve/server/config" ) var contextKey = &struct{ string }{"logger"} // NewDefaultLogger returns a new logger with default settings. func NewDefaultLogger() *log.Logger { + dp := os.Getenv("SOFT_SERVE_DATA_PATH") + if dp == "" { + dp = "data" + } + + cfg, err := config.ParseConfig(filepath.Join(dp, "config.yaml")) + if err != nil { + log.Errorf("failed to parse config: %v", err) + } + logger := log.NewWithOptions(os.Stderr, log.Options{ ReportTimestamp: true, TimeFormat: time.DateOnly, @@ -22,11 +34,9 @@ func NewDefaultLogger() *log.Logger { logger.SetLevel(log.DebugLevel) } - if tsfmt := os.Getenv("SOFT_SERVE_LOG_TIME_FORMAT"); tsfmt != "" { - logger.SetTimeFormat(tsfmt) - } + logger.SetTimeFormat(cfg.Log.TimeFormat) - switch strings.ToLower(os.Getenv("SOFT_SERVE_LOG_FORMAT")) { + switch strings.ToLower(cfg.Log.Format) { case "json": logger.SetFormatter(log.JSONFormatter) case "logfmt": diff --git a/internal/sync/workqueue.go b/internal/sync/workqueue.go new file mode 100644 index 000000000..16eb442a3 --- /dev/null +++ b/internal/sync/workqueue.go @@ -0,0 +1,98 @@ +package sync + +import ( + "context" + "sync" + + "golang.org/x/sync/semaphore" +) + +// WorkPool is a pool of work to be done. +type WorkPool struct { + workers int + work map[string]func() + mu sync.RWMutex + sem *semaphore.Weighted + ctx context.Context + logger func(string, ...interface{}) +} + +// WorkPoolOption is a function that configures a WorkPool. +type WorkPoolOption func(*WorkPool) + +// WithWorkPoolLogger sets the logger to use. +func WithWorkPoolLogger(logger func(string, ...interface{})) WorkPoolOption { + return func(wq *WorkPool) { + wq.logger = logger + } +} + +// NewWorkPool creates a new work pool. The workers argument specifies the +// number of concurrent workers to run the work. +// The queue will chunk the work into batches of workers size. +func NewWorkPool(ctx context.Context, workers int, opts ...WorkPoolOption) *WorkPool { + wq := &WorkPool{ + workers: workers, + work: make(map[string]func()), + ctx: ctx, + } + + for _, opt := range opts { + opt(wq) + } + + if wq.workers <= 0 { + wq.workers = 1 + } + + wq.sem = semaphore.NewWeighted(int64(wq.workers)) + + return wq +} + +// Run starts the workers and waits for them to finish. +func (wq *WorkPool) Run() { + for id, fn := range wq.work { + if err := wq.sem.Acquire(wq.ctx, 1); err != nil { + wq.logf("workpool: %v", err) + return + } + + go func(id string, fn func()) { + defer wq.sem.Release(1) + fn() + wq.mu.Lock() + delete(wq.work, id) + wq.mu.Unlock() + }(id, fn) + } + + if err := wq.sem.Acquire(wq.ctx, int64(wq.workers)); err != nil { + wq.logf("workpool: %v", err) + } +} + +// Add adds a new job to the pool. +// If the job already exists, it is a no-op. +func (wq *WorkPool) Add(id string, fn func()) { + wq.mu.Lock() + defer wq.mu.Unlock() + if _, ok := wq.work[id]; ok { + return + } + wq.work[id] = fn +} + +// Status checks if a job is in the queue. +func (wq *WorkPool) Status(id string) bool { + wq.mu.RLock() + defer wq.mu.RUnlock() + _, ok := wq.work[id] + return ok +} + +func (wq *WorkPool) logf(format string, args ...interface{}) { + if wq.logger != nil { + wq.logger(format, args...) + } +} diff --git a/server/config/config.go b/server/config/config.go index f75a7a6d2..a96e69c75 100644 --- a/server/config/config.go +++ b/server/config/config.go @@ -73,6 +73,17 @@ type StatsConfig struct { ListenAddr string `env:"LISTEN_ADDR" yaml:"listen_addr"` } +// LogConfig is the logger configuration. +type LogConfig struct { + // Format is the format of the logs. + // Valid values are "json", "logfmt", and "text". + Format string `env:"FORMAT" yaml:"format"` + + // Time format for the log `ts` field. + // Format must be described in Golang's time format. + TimeFormat string `env:"TIME_FORMAT" yaml:"time_format"` +} + // Config is the configuration for Soft Serve. type Config struct { // Name is the name of the server. @@ -90,13 +101,8 @@ type Config struct { // Stats is the configuration for the stats server. Stats StatsConfig `envPrefix:"STATS_" yaml:"stats"` - // LogFormat is the format of the logs. - // Valid values are "json", "logfmt", and "text". - LogFormat string `env:"LOG_FORMAT" yaml:"log_format"` - - // Time format for the log `ts` field. - // Format must be described in Golang's time format. - LogTimeFormat string `env:"LOG_TIME_FORMAT" yaml:"log_time_format"` + // Log is the logger configuration. + Log LogConfig `envPrefix:"LOG_" yaml:"log"` // InitialAdminKeys is a list of public keys that will be added to the list of admins. InitialAdminKeys []string `env:"INITIAL_ADMIN_KEYS" envSeparator:"\n" yaml:"initial_admin_keys"` @@ -111,10 +117,8 @@ type Config struct { func parseConfig(path string) (*Config, error) { dataPath := filepath.Dir(path) cfg := &Config{ - Name: "Soft Serve", - LogFormat: "text", - LogTimeFormat: time.DateOnly, - DataPath: dataPath, + Name: "Soft Serve", + DataPath: dataPath, SSH: SSHConfig{ ListenAddr: ":23231", PublicURL: "ssh://localhost:23231", @@ -136,6 +140,10 @@ func parseConfig(path string) (*Config, error) { Stats: StatsConfig{ ListenAddr: "localhost:23233", }, + Log: LogConfig{ + Format: "text", + TimeFormat: time.DateTime, + }, } f, err := os.Open(path) @@ -182,11 +190,11 @@ func parseConfig(path string) (*Config, error) { func ParseConfig(path string) (*Config, error) { cfg, err := parseConfig(path) if err != nil { - return nil, err + return cfg, err } if err := cfg.validate(); err != nil { - return nil, err + return cfg, err } return cfg, nil diff --git a/server/config/file.go b/server/config/file.go index 36c0a7a31..09e5ce2e0 100644 --- a/server/config/file.go +++ b/server/config/file.go @@ -11,9 +11,13 @@ var configFileTmpl = template.Must(template.New("config").Parse(`# Soft Serve Se # This is the name that will be displayed in the UI. name: "{{ .Name }}" -# Log format to use. Valid values are "json", "logfmt", and "text". -log_format: "{{ .LogFormat }}" -log_time_format: "{{ .LogTimeFormat }}" +# Logging configuration. +log: + # Log format to use. Valid values are "json", "logfmt", and "text". + format: "{{ .Log.Format }}" + # Time format for the log "timestamp" field. + # Should be described in Golang's time format. + time_format: "{{ .Log.TimeFormat }}" # The SSH server configuration. ssh: diff --git a/server/git/git.go b/server/git/git.go index 85e2cf068..ef8affe20 100644 --- a/server/git/git.go +++ b/server/git/git.go @@ -83,12 +83,12 @@ func RunGit(ctx context.Context, in io.Reader, out io.Writer, er io.Writer, dir logger := log.FromContext(ctx).WithPrefix("rungit") c := exec.CommandContext(ctx, "git", args...) c.Dir = dir - c.Env = append(c.Env, envs...) + c.Env = append(os.Environ(), envs...) c.Env = append(c.Env, "PATH="+os.Getenv("PATH")) c.Env = append(c.Env, "SOFT_SERVE_DEBUG="+os.Getenv("SOFT_SERVE_DEBUG")) if cfg != nil { - c.Env = append(c.Env, "SOFT_SERVE_LOG_FORMAT="+cfg.LogFormat) - c.Env = append(c.Env, "SOFT_SERVE_LOG_TIME_FORMAT="+cfg.LogTimeFormat) + c.Env = append(c.Env, "SOFT_SERVE_LOG_FORMAT="+cfg.Log.Format) + c.Env = append(c.Env, "SOFT_SERVE_LOG_TIME_FORMAT="+cfg.Log.TimeFormat) } stdin, err := c.StdinPipe() diff --git a/server/jobs.go b/server/jobs.go index 06656d1cc..9cd23a23f 100644 --- a/server/jobs.go +++ b/server/jobs.go @@ -3,15 +3,15 @@ package server import ( "fmt" "path/filepath" + "runtime" "github.com/charmbracelet/soft-serve/git" + "github.com/charmbracelet/soft-serve/internal/sync" ) -var ( - jobSpecs = map[string]string{ - "mirror": "@every 10m", - } -) +var jobSpecs = map[string]string{ + "mirror": "@every 10m", +} // mirrorJob runs the (pull) mirror job task. func (s *Server) mirrorJob() func() { @@ -25,26 +25,37 @@ func (s *Server) mirrorJob() func() { return } + // Divide the work up among the number of CPUs. + wq := sync.NewWorkPool(s.ctx, runtime.GOMAXPROCS(0), + sync.WithWorkPoolLogger(logger.Errorf), + ) + + logger.Debug("updating mirror repos") for _, repo := range repos { if repo.IsMirror() { - logger.Info("updating mirror", "repo", repo.Name()) r, err := repo.Open() if err != nil { logger.Error("error opening repository", "repo", repo.Name(), "err", err) continue } - cmd := git.NewCommand("remote", "update", "--prune") - cmd.AddEnvs( - fmt.Sprintf(`GIT_SSH_COMMAND=ssh -o UserKnownHostsFile="%s" -o StrictHostKeyChecking=no -i "%s"`, - filepath.Join(cfg.DataPath, "ssh", "known_hosts"), - cfg.SSH.ClientKeyPath, - ), - ) - if _, err := cmd.RunInDir(r.Path); err != nil { - logger.Error("error running git remote update", "repo", repo.Name(), "err", err) - } + name := repo.Name() + wq.Add(name, func() { + cmd := git.NewCommand("remote", "update", "--prune") + cmd.AddEnvs( + fmt.Sprintf(`GIT_SSH_COMMAND=ssh -o UserKnownHostsFile="%s" -o StrictHostKeyChecking=no -i "%s"`, + filepath.Join(cfg.DataPath, "ssh", "known_hosts"), + cfg.SSH.ClientKeyPath, + ), + ) + if _, err := cmd.RunInDir(r.Path); err != nil { + logger.Error("error running git remote update", "repo", name, "err", err) + } + + }) } } + + wq.Run() } } diff --git a/server/server.go b/server/server.go index 3b9554b2b..690c9650b 100644 --- a/server/server.go +++ b/server/server.go @@ -62,7 +62,7 @@ func NewServer(ctx context.Context) (*Server, error) { } // Add cron jobs. - srv.Cron.AddFunc(jobSpecs["mirror"], srv.mirrorJob()) + _, _ = srv.Cron.AddFunc(jobSpecs["mirror"], srv.mirrorJob()) srv.SSHServer, err = sshsrv.NewSSHServer(ctx) if err != nil {