Skip to content

Commit

Permalink
add locker job
Browse files Browse the repository at this point in the history
  • Loading branch information
arriven committed Jul 31, 2022
1 parent 761574c commit 4e5dce8
Show file tree
Hide file tree
Showing 4 changed files with 68 additions and 0 deletions.
23 changes: 23 additions & 0 deletions examples/config/advanced/locker.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
jobs:
- type: lock
args:
key: test
job:
type: loop
args:
interval: 1s
job:
type: log
args:
text: test
- type: lock
args:
key: test
job:
type: loop
args:
interval: 1s
job:
type: log
args:
text: test2
2 changes: 2 additions & 0 deletions src/job/base.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,8 @@ func Get(t string) Job {
return timeoutJob
case "loop":
return loopJob
case "lock":
return lockJob
case "js":
return jsJob
case "encrypted":
Expand Down
28 changes: 28 additions & 0 deletions src/job/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@ import (
"github.com/Arriven/db1000n/src/utils/templates"
)

var locker utils.Locker

// "log" in config
func logJob(ctx context.Context, args config.Args, globalConfig *GlobalConfig, a *metrics.Accumulator, logger *zap.Logger) (
data any, err error, //nolint:unparam // data is here to match Job
Expand Down Expand Up @@ -190,6 +192,32 @@ func loopJob(ctx context.Context, args config.Args, globalConfig *GlobalConfig,
return nil, nil
}

func lockJob(ctx context.Context, args config.Args, globalConfig *GlobalConfig, a *metrics.Accumulator, logger *zap.Logger) (data any, err error) {
ctx, cancel := context.WithCancel(ctx)
defer cancel()

var jobConfig struct {
BasicJobConfig

Key string
Job config.Config
}

if err := mapstructure.Decode(templates.ParseAndExecuteMapStruct(logger, args, ctx), &jobConfig); err != nil {
return nil, fmt.Errorf("error parsing job config: %w", err)
}

unlock := locker.Lock(jobConfig.Key)
defer unlock()

job := Get(jobConfig.Job.Type)
if job == nil {
return nil, fmt.Errorf("unknown job %q", jobConfig.Job.Type)
}

return job(ctx, jobConfig.Job.Args, globalConfig, a, logger)
}

// "js" in config
func jsJob(ctx context.Context, args config.Args, globalConfig *GlobalConfig, a *metrics.Accumulator, logger *zap.Logger) (
data any, err error,
Expand Down
15 changes: 15 additions & 0 deletions src/utils/locker.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package utils

import "sync"

type Locker struct {
mutexes sync.Map // Zero value is empty and ready for use
}

func (m *Locker) Lock(key string) func() {
value, _ := m.mutexes.LoadOrStore(key, &sync.Mutex{})
mtx := value.(*sync.Mutex)
mtx.Lock()

return func() { mtx.Unlock() }
}

0 comments on commit 4e5dce8

Please sign in to comment.