diff --git a/examples/config/advanced/locker.yaml b/examples/config/advanced/locker.yaml new file mode 100644 index 00000000..bb57740b --- /dev/null +++ b/examples/config/advanced/locker.yaml @@ -0,0 +1,23 @@ +jobs: + - type: lock + args: + key: test + job: + type: loop + args: + interval: 1s + job: + type: log + args: + text: test + - type: lock + args: + key: test + job: + type: loop + args: + interval: 1s + job: + type: log + args: + text: test2 diff --git a/src/job/base.go b/src/job/base.go index 6439a150..7e75515d 100644 --- a/src/job/base.go +++ b/src/job/base.go @@ -129,6 +129,8 @@ func Get(t string) Job { return timeoutJob case "loop": return loopJob + case "lock": + return lockJob case "js": return jsJob case "encrypted": diff --git a/src/job/utils.go b/src/job/utils.go index c155a5c4..3e548314 100644 --- a/src/job/utils.go +++ b/src/job/utils.go @@ -38,6 +38,8 @@ import ( "github.com/Arriven/db1000n/src/utils/templates" ) +var locker utils.Locker + // "log" in config func logJob(ctx context.Context, args config.Args, globalConfig *GlobalConfig, a *metrics.Accumulator, logger *zap.Logger) ( data any, err error, //nolint:unparam // data is here to match Job @@ -190,6 +192,32 @@ func loopJob(ctx context.Context, args config.Args, globalConfig *GlobalConfig, return nil, nil } +func lockJob(ctx context.Context, args config.Args, globalConfig *GlobalConfig, a *metrics.Accumulator, logger *zap.Logger) (data any, err error) { + ctx, cancel := context.WithCancel(ctx) + defer cancel() + + var jobConfig struct { + BasicJobConfig + + Key string + Job config.Config + } + + if err := mapstructure.Decode(templates.ParseAndExecuteMapStruct(logger, args, ctx), &jobConfig); err != nil { + return nil, fmt.Errorf("error parsing job config: %w", err) + } + + unlock := locker.Lock(jobConfig.Key) + defer unlock() + + job := Get(jobConfig.Job.Type) + if job == nil { + return nil, fmt.Errorf("unknown job %q", jobConfig.Job.Type) + } + + return job(ctx, jobConfig.Job.Args, globalConfig, a, logger) +} + // "js" in config func jsJob(ctx context.Context, args config.Args, globalConfig *GlobalConfig, a *metrics.Accumulator, logger *zap.Logger) ( data any, err error, diff --git a/src/utils/locker.go b/src/utils/locker.go new file mode 100644 index 00000000..8bbb8fb8 --- /dev/null +++ b/src/utils/locker.go @@ -0,0 +1,15 @@ +package utils + +import "sync" + +type Locker struct { + mutexes sync.Map // Zero value is empty and ready for use +} + +func (m *Locker) Lock(key string) func() { + value, _ := m.mutexes.LoadOrStore(key, &sync.Mutex{}) + mtx := value.(*sync.Mutex) + mtx.Lock() + + return func() { mtx.Unlock() } +}