Skip to content

Commit

Permalink
Merge pull request #23 from adhocore/4-mutex
Browse files Browse the repository at this point in the history
  • Loading branch information
adhocore authored Apr 23, 2023
2 parents 5fc5949 + 77cc26c commit fe221c0
Show file tree
Hide file tree
Showing 4 changed files with 103 additions and 6 deletions.
20 changes: 19 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -146,8 +146,12 @@ func main() {
return 0, nil
})

// run task without overlap, set concurrent flag to false:
concurrent := false
taskr.Task("* * * * * *", , tasker.Taskify("sleep 2", tasker.Option{}), concurrent)

// every 10 minute with arbitrary command
taskr.Task("@10minutes", taskr.Taskify("command --option val -- args"))
taskr.Task("@10minutes", taskr.Taskify("command --option val -- args", tasker.Option{Shell: "/bin/sh -c"}))

// ... add more tasks

Expand All @@ -160,6 +164,20 @@ func main() {
}
```

#### Concurrency

By default the tasks can run concurrently i.e if previous run is still not finished
but it is now due again, it will run again.
If you want to run only one instance of a task at a time, set concurrent flag to false:

```go
taskr := tasker.New(tasker.Option{})

concurrent := false
expr, task := "* * * * * *", tasker.Taskify("php -r 'sleep(2);'")
taskr.Task(expr, task, false)
```

### Task Daemon

It can also be used as standalone task daemon instead of programmatic usage for Golang application.
Expand Down
24 changes: 22 additions & 2 deletions pkg/tasker/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,9 @@ go get -u github.com/adhocore/gronx/cmd/tasker
## Usage
### Go Tasker

Tasker is a task manager that can be programatically used in Golang applications. It runs as a daemon and and invokes tasks scheduled with cron expression:
Tasker is a task manager that can be programatically used in Golang applications.
It runs as a daemon and and invokes tasks scheduled with cron expression:

```go
package main

Expand Down Expand Up @@ -52,8 +54,12 @@ func main() {
return 0, nil
})

// run task without overlap, set concurrent flag to false:
concurrent := false
taskr.Task("* * * * * *", , tasker.Taskify("sleep 2", tasker.Option{}), concurrent)

// every 10 minute with arbitrary command
taskr.Task("@10minutes", taskr.Taskify("command --option val -- args"))
taskr.Task("@10minutes", taskr.Taskify("command --option val -- args", tasker.Option{Shell: "/bin/sh -c"}))

// ... add more tasks

Expand All @@ -66,6 +72,20 @@ func main() {
}
```

#### Concurrency

By default the tasks can run concurrently i.e if previous run is still not finished
but it is now due again, it will run again.
If you want to run only one instance of a task at a time, set concurrent flag to false:

```go
taskr := tasker.New(tasker.Option{})

concurrent := false
expr, task := "* * * * * *", tasker.Taskify("php -r 'sleep(2);'")
taskr.Task(expr, task, false)
```

### Task Daemon
It can also be used as standalone task daemon instead of programmatic usage for Golang application.

Expand Down
36 changes: 33 additions & 3 deletions pkg/tasker/tasker.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"reflect"
"strings"
"sync"
"sync/atomic"
"syscall"
"time"

Expand Down Expand Up @@ -45,6 +46,7 @@ type Tasker struct {
until time.Time
exprs map[string][]string
tasks map[string]TaskFunc
mutex map[string]uint32
abort bool
timeout bool
verbose bool
Expand Down Expand Up @@ -169,16 +171,16 @@ const taskIDFormat = "[%s][#%d]"

// Task appends new task handler for given cron expr.
// It returns Tasker (itself) for fluency and bails if expr is invalid.
func (t *Tasker) Task(expr string, task TaskFunc) *Tasker {
func (t *Tasker) Task(expr string, task TaskFunc, concurrent ...bool) *Tasker {
segs, err := gronx.Segments(expr)
if err != nil {
log.Fatalf("invalid cron expr: %+v", err)
}

concurrent = append(concurrent, true)
old, expr := gronx.SpaceRe.ReplaceAllString(expr, " "), strings.Join(segs, " ")
if _, ok := t.exprs[expr]; !ok {
// Validate expr.
if _, err := t.gron.SegmentsDue(segs); err != nil {
if !t.gron.IsValid(expr) {
log.Fatalf("invalid cron expr: %+v", err)
}

Expand All @@ -190,6 +192,13 @@ func (t *Tasker) Task(expr string, task TaskFunc) *Tasker {
t.exprs[expr] = append(t.exprs[expr], ref)
t.tasks[ref] = task

if !concurrent[0] {
if len(t.mutex) == 0 {
t.mutex = map[string]uint32{}
}
t.mutex[ref] = 0
}

return t
}

Expand Down Expand Up @@ -330,6 +339,10 @@ func (t *Tasker) runTasks(tasks map[string]TaskFunc) {
}

for ref, task := range tasks {
if !t.canRun(ref) {
continue
}

t.wg.Add(1)
rc := make(chan result)

Expand All @@ -338,6 +351,18 @@ func (t *Tasker) runTasks(tasks map[string]TaskFunc) {
}
}

func (t *Tasker) canRun(ref string) bool {
lock, ok := t.mutex[ref]
if !ok {
return true
}
if atomic.CompareAndSwapUint32(&lock, 0, 1) {
t.mutex[ref] = 1
return true
}
return false
}

func (t *Tasker) doRun(ctx context.Context, ref string, task TaskFunc, rc chan result) {
defer t.wg.Done()
if t.abort || t.timeout {
Expand All @@ -347,7 +372,12 @@ func (t *Tasker) doRun(ctx context.Context, ref string, task TaskFunc, rc chan r
if t.verbose {
t.Log.Printf("[tasker] task %s running\n", ref)
}

code, err := task(ctx)
if lock, ok := t.mutex[ref]; ok {
atomic.StoreUint32(&lock, 0)
t.mutex[ref] = 0
}

rc <- result{ref, code, err}
}
Expand Down
29 changes: 29 additions & 0 deletions pkg/tasker/tasker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,3 +171,32 @@ func TestWithContext(t *testing.T) {
fmt.Println(string(buf))
})
}

func TestConcurrency(t *testing.T) {
t.Run("Run", func(t *testing.T) {
taskr := New(Option{Verbose: true, Out: "../../test/tasker.out"})

single := 0
taskr.Task("* * * * * *", func(ctx context.Context) (int, error) {
time.Sleep(2500 * time.Millisecond)
single++
return 0, nil
}, false)

concurrent := 0
taskr.Task("* * * * * *", func(ctx context.Context) (int, error) {
time.Sleep(1 * time.Second)
concurrent++
return 0, nil
}, true)

taskr.Until(3 * time.Second).Run()

if single != 1 {
t.Errorf("single task should run 1x, not %dx", single)
}
if concurrent != 2 {
t.Errorf("concurrent task should run 2x, not %dx", concurrent)
}
})
}

0 comments on commit fe221c0

Please sign in to comment.