Skip to content

Commit

Permalink
fix(log): respect log config (#280)
Browse files Browse the repository at this point in the history
* fix(log): respect log config

Parse log config

fix(git): ensure os envs are present

feat: set the default time format to dateTime

fix(log): change update mirror log message into debug

fix(config): rename log config struct

fix(config): always return cfg

* perf: update mirrors in a workpool (#285)

* perf: update mirrors in a workqueue

Implement a simple chunked workqueue to queue updating mirrors. We use
the number of cpus to calculate the number of workers to distribute the
work to.

* fix: use automaxprocs

Signed-off-by: Carlos Alexandro Becker <[email protected]>

* fix: set maxprocs in main

* feat(wp): use a workpool impl

Use semaphores to implement a workpool of n workers
and use that to run the mirroring job

---------

Signed-off-by: Carlos Alexandro Becker <[email protected]>
Co-authored-by: Carlos Alexandro Becker <[email protected]>

---------

Signed-off-by: Carlos Alexandro Becker <[email protected]>
Co-authored-by: Carlos Alexandro Becker <[email protected]>
  • Loading branch information
aymanbagabas and caarlos0 authored May 11, 2023
1 parent 8d13e32 commit b07de74
Show file tree
Hide file tree
Showing 10 changed files with 183 additions and 40 deletions.
8 changes: 8 additions & 0 deletions cmd/soft/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"github.com/charmbracelet/log"
. "github.com/charmbracelet/soft-serve/internal/log"
"github.com/spf13/cobra"
"go.uber.org/automaxprocs/maxprocs"
)

var (
Expand Down Expand Up @@ -52,6 +53,13 @@ func init() {

func main() {
logger := NewDefaultLogger()

// Set the max number of processes to the number of CPUs
// This is useful when running soft serve in a container
if _, err := maxprocs.Set(maxprocs.Logger(logger.Debugf)); err != nil {
logger.Warn("couldn't set automaxprocs", "error", err)
}

ctx := log.WithContext(context.Background(), logger)
if err := rootCmd.ExecuteContext(ctx); err != nil {
os.Exit(1)
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ require (
github.com/prometheus/client_golang v1.15.1
github.com/robfig/cron/v3 v3.0.1
github.com/spf13/cobra v1.7.0
go.uber.org/automaxprocs v1.5.2
goji.io v2.0.2+incompatible
golang.org/x/crypto v0.9.0
golang.org/x/sync v0.2.0
Expand Down
3 changes: 3 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,7 @@ github.com/pjbgf/sha1cd v0.3.0/go.mod h1:nZ1rrWOcGJ5uZgEEVL1VUM9iRQiZvWdbZjkKyFz
github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/prashantv/gostub v1.1.0 h1:BTyx3RfQjRHnUWaGF9oQos79AlQ5k8WNktv7VGvVH4g=
github.com/prometheus/client_golang v1.15.1 h1:8tXpTmJbyH5lydzFPoxSIJ0J46jdh3tylbvM1xCv0LI=
github.com/prometheus/client_golang v1.15.1/go.mod h1:e9yaBhRPU2pPNsZwE+JdQl0KEt1N9XgF6zxWmaC0xOk=
github.com/prometheus/client_model v0.3.0 h1:UBgGFHqYdG/TPFD1B1ogZywDqEkwp3fBMvqdiQ7Xew4=
Expand Down Expand Up @@ -207,6 +208,8 @@ github.com/yuin/goldmark v1.5.2 h1:ALmeCk/px5FSm1MAcFBAsVKZjDuMVj8Tm7FFIlMJnqU=
github.com/yuin/goldmark v1.5.2/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5taEt/CY=
github.com/yuin/goldmark-emoji v1.0.1 h1:ctuWEyzGBwiucEqxzwe0SOYDXPAucOrE9NQC18Wa1os=
github.com/yuin/goldmark-emoji v1.0.1/go.mod h1:2w1E6FEWLcDQkoTE+7HU6QF1F6SLlNGjRIBbIZQFqkQ=
go.uber.org/automaxprocs v1.5.2 h1:2LxUOGiR3O6tw8ui5sZa2LAaHnsviZdVOUZw4fvbnME=
go.uber.org/automaxprocs v1.5.2/go.mod h1:eRbA25aqJrxAbsLO0xy5jVwPt7FQnRgjW+efnwa1WM0=
goji.io v2.0.2+incompatible h1:uIssv/elbKRLznFUy3Xj4+2Mz/qKhek/9aZQDUMae7c=
goji.io v2.0.2+incompatible/go.mod h1:sbqFwrtqZACxLBTQcdgVjFh54yGVCvwq8+w49MVMMIk=
golang.org/x/arch v0.1.0/go.mod h1:5om86z9Hs0C8fWVUuoMHwpExlXzs5Tkyp9hOrfG7pp8=
Expand Down
18 changes: 14 additions & 4 deletions internal/log/log.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,17 +2,29 @@ package log

import (
"os"
"path/filepath"
"strconv"
"strings"
"time"

"github.com/charmbracelet/log"
"github.com/charmbracelet/soft-serve/server/config"
)

var contextKey = &struct{ string }{"logger"}

// NewDefaultLogger returns a new logger with default settings.
func NewDefaultLogger() *log.Logger {
dp := os.Getenv("SOFT_SERVE_DATA_PATH")
if dp == "" {
dp = "data"
}

cfg, err := config.ParseConfig(filepath.Join(dp, "config.yaml"))
if err != nil {
log.Errorf("failed to parse config: %v", err)
}

logger := log.NewWithOptions(os.Stderr, log.Options{
ReportTimestamp: true,
TimeFormat: time.DateOnly,
Expand All @@ -22,11 +34,9 @@ func NewDefaultLogger() *log.Logger {
logger.SetLevel(log.DebugLevel)
}

if tsfmt := os.Getenv("SOFT_SERVE_LOG_TIME_FORMAT"); tsfmt != "" {
logger.SetTimeFormat(tsfmt)
}
logger.SetTimeFormat(cfg.Log.TimeFormat)

switch strings.ToLower(os.Getenv("SOFT_SERVE_LOG_FORMAT")) {
switch strings.ToLower(cfg.Log.Format) {
case "json":
logger.SetFormatter(log.JSONFormatter)
case "logfmt":
Expand Down
98 changes: 98 additions & 0 deletions internal/sync/workqueue.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
package sync

import (
"context"
"sync"

"golang.org/x/sync/semaphore"
)

// WorkPool is a pool of work to be done.
type WorkPool struct {
workers int
work map[string]func()
mu sync.RWMutex
sem *semaphore.Weighted
ctx context.Context
logger func(string, ...interface{})
}

// WorkPoolOption is a function that configures a WorkPool.
type WorkPoolOption func(*WorkPool)

// WithWorkPoolLogger sets the logger to use.
func WithWorkPoolLogger(logger func(string, ...interface{})) WorkPoolOption {
return func(wq *WorkPool) {
wq.logger = logger
}
}

// NewWorkPool creates a new work pool. The workers argument specifies the
// number of concurrent workers to run the work.
// The queue will chunk the work into batches of workers size.
func NewWorkPool(ctx context.Context, workers int, opts ...WorkPoolOption) *WorkPool {
wq := &WorkPool{
workers: workers,
work: make(map[string]func()),
ctx: ctx,
}

for _, opt := range opts {
opt(wq)
}

if wq.workers <= 0 {
wq.workers = 1
}

wq.sem = semaphore.NewWeighted(int64(wq.workers))

return wq
}

// Run starts the workers and waits for them to finish.
func (wq *WorkPool) Run() {
for id, fn := range wq.work {
if err := wq.sem.Acquire(wq.ctx, 1); err != nil {
wq.logf("workpool: %v", err)
return
}

go func(id string, fn func()) {
defer wq.sem.Release(1)
fn()
wq.mu.Lock()
delete(wq.work, id)
wq.mu.Unlock()
}(id, fn)
}

if err := wq.sem.Acquire(wq.ctx, int64(wq.workers)); err != nil {
wq.logf("workpool: %v", err)
}
}

// Add adds a new job to the pool.
// If the job already exists, it is a no-op.
func (wq *WorkPool) Add(id string, fn func()) {
wq.mu.Lock()
defer wq.mu.Unlock()
if _, ok := wq.work[id]; ok {
return
}
wq.work[id] = fn
}

// Status checks if a job is in the queue.
func (wq *WorkPool) Status(id string) bool {
wq.mu.RLock()
defer wq.mu.RUnlock()
_, ok := wq.work[id]
return ok
}

func (wq *WorkPool) logf(format string, args ...interface{}) {
if wq.logger != nil {
wq.logger(format, args...)
}
}
34 changes: 21 additions & 13 deletions server/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,17 @@ type StatsConfig struct {
ListenAddr string `env:"LISTEN_ADDR" yaml:"listen_addr"`
}

// LogConfig is the logger configuration.
type LogConfig struct {
// Format is the format of the logs.
// Valid values are "json", "logfmt", and "text".
Format string `env:"FORMAT" yaml:"format"`

// Time format for the log `ts` field.
// Format must be described in Golang's time format.
TimeFormat string `env:"TIME_FORMAT" yaml:"time_format"`
}

// Config is the configuration for Soft Serve.
type Config struct {
// Name is the name of the server.
Expand All @@ -90,13 +101,8 @@ type Config struct {
// Stats is the configuration for the stats server.
Stats StatsConfig `envPrefix:"STATS_" yaml:"stats"`

// LogFormat is the format of the logs.
// Valid values are "json", "logfmt", and "text".
LogFormat string `env:"LOG_FORMAT" yaml:"log_format"`

// Time format for the log `ts` field.
// Format must be described in Golang's time format.
LogTimeFormat string `env:"LOG_TIME_FORMAT" yaml:"log_time_format"`
// Log is the logger configuration.
Log LogConfig `envPrefix:"LOG_" yaml:"log"`

// InitialAdminKeys is a list of public keys that will be added to the list of admins.
InitialAdminKeys []string `env:"INITIAL_ADMIN_KEYS" envSeparator:"\n" yaml:"initial_admin_keys"`
Expand All @@ -111,10 +117,8 @@ type Config struct {
func parseConfig(path string) (*Config, error) {
dataPath := filepath.Dir(path)
cfg := &Config{
Name: "Soft Serve",
LogFormat: "text",
LogTimeFormat: time.DateOnly,
DataPath: dataPath,
Name: "Soft Serve",
DataPath: dataPath,
SSH: SSHConfig{
ListenAddr: ":23231",
PublicURL: "ssh://localhost:23231",
Expand All @@ -136,6 +140,10 @@ func parseConfig(path string) (*Config, error) {
Stats: StatsConfig{
ListenAddr: "localhost:23233",
},
Log: LogConfig{
Format: "text",
TimeFormat: time.DateTime,
},
}

f, err := os.Open(path)
Expand Down Expand Up @@ -182,11 +190,11 @@ func parseConfig(path string) (*Config, error) {
func ParseConfig(path string) (*Config, error) {
cfg, err := parseConfig(path)
if err != nil {
return nil, err
return cfg, err
}

if err := cfg.validate(); err != nil {
return nil, err
return cfg, err
}

return cfg, nil
Expand Down
10 changes: 7 additions & 3 deletions server/config/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,13 @@ var configFileTmpl = template.Must(template.New("config").Parse(`# Soft Serve Se
# This is the name that will be displayed in the UI.
name: "{{ .Name }}"
# Log format to use. Valid values are "json", "logfmt", and "text".
log_format: "{{ .LogFormat }}"
log_time_format: "{{ .LogTimeFormat }}"
# Logging configuration.
log:
# Log format to use. Valid values are "json", "logfmt", and "text".
format: "{{ .Log.Format }}"
# Time format for the log "timestamp" field.
# Should be described in Golang's time format.
time_format: "{{ .Log.TimeFormat }}"
# The SSH server configuration.
ssh:
Expand Down
6 changes: 3 additions & 3 deletions server/git/git.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,12 +83,12 @@ func RunGit(ctx context.Context, in io.Reader, out io.Writer, er io.Writer, dir
logger := log.FromContext(ctx).WithPrefix("rungit")
c := exec.CommandContext(ctx, "git", args...)
c.Dir = dir
c.Env = append(c.Env, envs...)
c.Env = append(os.Environ(), envs...)
c.Env = append(c.Env, "PATH="+os.Getenv("PATH"))
c.Env = append(c.Env, "SOFT_SERVE_DEBUG="+os.Getenv("SOFT_SERVE_DEBUG"))
if cfg != nil {
c.Env = append(c.Env, "SOFT_SERVE_LOG_FORMAT="+cfg.LogFormat)
c.Env = append(c.Env, "SOFT_SERVE_LOG_TIME_FORMAT="+cfg.LogTimeFormat)
c.Env = append(c.Env, "SOFT_SERVE_LOG_FORMAT="+cfg.Log.Format)
c.Env = append(c.Env, "SOFT_SERVE_LOG_TIME_FORMAT="+cfg.Log.TimeFormat)
}

stdin, err := c.StdinPipe()
Expand Down
43 changes: 27 additions & 16 deletions server/jobs.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,15 @@ package server
import (
"fmt"
"path/filepath"
"runtime"

"github.com/charmbracelet/soft-serve/git"
"github.com/charmbracelet/soft-serve/internal/sync"
)

var (
jobSpecs = map[string]string{
"mirror": "@every 10m",
}
)
var jobSpecs = map[string]string{
"mirror": "@every 10m",
}

// mirrorJob runs the (pull) mirror job task.
func (s *Server) mirrorJob() func() {
Expand All @@ -25,26 +25,37 @@ func (s *Server) mirrorJob() func() {
return
}

// Divide the work up among the number of CPUs.
wq := sync.NewWorkPool(s.ctx, runtime.GOMAXPROCS(0),
sync.WithWorkPoolLogger(logger.Errorf),
)

logger.Debug("updating mirror repos")
for _, repo := range repos {
if repo.IsMirror() {
logger.Info("updating mirror", "repo", repo.Name())
r, err := repo.Open()
if err != nil {
logger.Error("error opening repository", "repo", repo.Name(), "err", err)
continue
}

cmd := git.NewCommand("remote", "update", "--prune")
cmd.AddEnvs(
fmt.Sprintf(`GIT_SSH_COMMAND=ssh -o UserKnownHostsFile="%s" -o StrictHostKeyChecking=no -i "%s"`,
filepath.Join(cfg.DataPath, "ssh", "known_hosts"),
cfg.SSH.ClientKeyPath,
),
)
if _, err := cmd.RunInDir(r.Path); err != nil {
logger.Error("error running git remote update", "repo", repo.Name(), "err", err)
}
name := repo.Name()
wq.Add(name, func() {
cmd := git.NewCommand("remote", "update", "--prune")
cmd.AddEnvs(
fmt.Sprintf(`GIT_SSH_COMMAND=ssh -o UserKnownHostsFile="%s" -o StrictHostKeyChecking=no -i "%s"`,
filepath.Join(cfg.DataPath, "ssh", "known_hosts"),
cfg.SSH.ClientKeyPath,
),
)
if _, err := cmd.RunInDir(r.Path); err != nil {
logger.Error("error running git remote update", "repo", name, "err", err)
}

})
}
}

wq.Run()
}
}
2 changes: 1 addition & 1 deletion server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ func NewServer(ctx context.Context) (*Server, error) {
}

// Add cron jobs.
srv.Cron.AddFunc(jobSpecs["mirror"], srv.mirrorJob())
_, _ = srv.Cron.AddFunc(jobSpecs["mirror"], srv.mirrorJob())

srv.SSHServer, err = sshsrv.NewSSHServer(ctx)
if err != nil {
Expand Down

0 comments on commit b07de74

Please sign in to comment.