Skip to content

Commit

Permalink
Merge pull request #4 from thesilentg/FixConcurrency
Browse files Browse the repository at this point in the history
First pass at concurrency changes
  • Loading branch information
gaffo authored Jul 23, 2024
2 parents 1e4f04a + 80e2f51 commit 8e5b993
Show file tree
Hide file tree
Showing 7 changed files with 615 additions and 567 deletions.
86 changes: 86 additions & 0 deletions internal/examples/simplejob/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
package main

import (
"bytes"
"context"
"encoding/json"
"log"
"log/slog"
"math/rand"
"os"
"time"

"github.com/gaffo/jorb"
)

type oc struct{}
type ac struct{}
type jc struct{}

func main() {
o := oc{}
a := ac{}
r := jorb.NewRun[oc, jc]("example", o)

slog.SetLogLoggerLevel(slog.LevelWarn)
for i := 0; i < 100; i++ {
r.AddJobWithState(jc{}, "A")
}

states := []jorb.State[ac, oc, jc]{
{
TriggerState: "A",
Exec: func(ctx context.Context, ac ac, oc oc, jc jc) (jc, string, []jorb.KickRequest[jc], error) {
time.Sleep(time.Duration(rand.Intn(1000)) * time.Millisecond)
return jc, "B", nil, nil
},
Concurrency: 5,
},
{
TriggerState: "B",
Exec: func(ctx context.Context, ac ac, oc oc, jc jc) (jc, string, []jorb.KickRequest[jc], error) {
time.Sleep(time.Duration(rand.Intn(1000)) * time.Millisecond)
return jc, "C", nil, nil
},
Concurrency: 4,
},
{
TriggerState: "C",
Exec: func(ctx context.Context, ac ac, oc oc, jc jc) (jc, string, []jorb.KickRequest[jc], error) {
time.Sleep(time.Duration(rand.Intn(1000)) * time.Millisecond)
return jc, "D", nil, nil
},
Concurrency: 3,
},
{
TriggerState: "D",
Terminal: true,
},
}

serial := jorb.NewJsonSerializer[oc, jc]("example.state")
listener := &fileListener{fileName: "example.status"}
p, err := jorb.NewProcessor[ac, oc, jc](a, states, serial, listener)
if err != nil {
log.Fatal(err)
}

if err := p.Exec(context.Background(), r); err != nil {
log.Fatal(err)
}
}

// Serializes the status updates to a file
type fileListener struct {
fileName string
}

func (f *fileListener) StatusUpdate(status []jorb.StatusCount) {
buf := &bytes.Buffer{}

encoder := json.NewEncoder(buf)
encoder.SetIndent("", " ")
_ = encoder.Encode(status)

_ = os.WriteFile(f.fileName, buf.Bytes(), 0644)
}
6 changes: 5 additions & 1 deletion job.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,11 @@ type Job[JC any] struct {

// UpdateLastEvent updates the LastUpdate field of the Job struct to the current time.
func (j Job[JC]) UpdateLastEvent() Job[JC] {
t := time.Now()
// Removes the monotonic clock portion of the timestamp which is only useful for measuring time
// https://pkg.go.dev/time#hdr-Monotonic_Clocks
// The monotonic clock information will not be marshalled, and thus cause tests which Marshal / Unmarshal job state
// and expect the results to be the same to fail.
t := time.Now().Truncate(time.Millisecond)
// Set the LastUpdate field to the current time
j.LastUpdate = &t
return j
Expand Down
Loading

0 comments on commit 8e5b993

Please sign in to comment.