Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

adding basic gocron-gorm-lock functionality. #1

Merged
merged 8 commits into from
Nov 14, 2023
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
56 changes: 56 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,2 +1,58 @@
# gocron-gorm-lock
A gocron locker implementation using gorm

## install

```
go get github.com/go-co-op/gocron-gorm-lock
```

## usage

Here is an example usage that would be deployed in multiple instances

```go
package main

import (
"fmt"

"github.com/go-co-op/gocron"
gormlock "github.com/go-co-op/gocron-gorm-lock"
"gorm.io/gorm"
"time"
)

func main() {
var db * gorm.DB // gorm db connection
var worker string // name of this instance to be used to know which instance run the job
db.AutoMigrate(&CronJobLock{}) // We need the table to store the job execution
locker, err := gormlock.NewGormLocker(db, worker)
if err != nil {
// handle the error
}

s := gocron.NewScheduler(time.UTC)
s.WithDistributedLocker(locker)

_, err = s.Every("1s").Name("unique_name").Do(func() {
// task to do
fmt.Println("call 1s")
})
if err != nil {
// handle the error
}

s.StartBlocking()
}
```

## Prerequisites

- The table cron_job_locks needs to exists in the database. This can be achieved, as an example, using gorm automigrate functionality `db.Automigrate(&CronJobLock{})`
- In order to uniquely identify the job, the locker uses the unique combination of the job name + timestamp (by default with precision to miliseconds).

## FAQ

- The locker uses the unique combination of the job name + timestamp with miliseconds precision, how can I change that:
- It's possible to set how to create the job identifier, here is an example to set an hour precision: `locker, err := gormlock.NewGormLocker(db, "local", WithJobIdentifier(func(ctx context.Context, key string) string { return time.Now().Truncate(60 * time.Minute).Format("2006-01-02 15:04:05.000")}))`
manuelarte marked this conversation as resolved.
Show resolved Hide resolved
56 changes: 56 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
@@ -1,3 +1,59 @@
module github.com/go-co-op/gocron-gorm-lock

go 1.20

require (
github.com/go-co-op/gocron v1.31.0
github.com/stretchr/testify v1.8.4
github.com/testcontainers/testcontainers-go v0.22.0
github.com/testcontainers/testcontainers-go/modules/postgres v0.22.0
gorm.io/driver/postgres v1.5.2
gorm.io/gorm v1.25.2
)

require (
dario.cat/mergo v1.0.0 // indirect
github.com/Azure/go-ansiterm v0.0.0-20210617225240-d185dfc1b5a1 // indirect
github.com/Microsoft/go-winio v0.6.1 // indirect
github.com/cenkalti/backoff/v4 v4.2.0 // indirect
github.com/containerd/containerd v1.7.3 // indirect
github.com/cpuguy83/dockercfg v0.3.1 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/docker/distribution v2.8.2+incompatible // indirect
github.com/docker/docker v24.0.5+incompatible // indirect
github.com/docker/go-connections v0.4.0 // indirect
github.com/docker/go-units v0.5.0 // indirect
github.com/gogo/protobuf v1.3.2 // indirect
github.com/golang/protobuf v1.5.3 // indirect
github.com/google/uuid v1.3.0 // indirect
github.com/jackc/pgpassfile v1.0.0 // indirect
github.com/jackc/pgservicefile v0.0.0-20221227161230-091c0ba34f0a // indirect
github.com/jackc/pgx/v5 v5.3.1 // indirect
github.com/jinzhu/inflection v1.0.0 // indirect
github.com/jinzhu/now v1.1.5 // indirect
github.com/klauspost/compress v1.16.0 // indirect
github.com/magiconair/properties v1.8.7 // indirect
github.com/moby/patternmatcher v0.5.0 // indirect
github.com/moby/sys/sequential v0.5.0 // indirect
github.com/moby/term v0.5.0 // indirect
github.com/morikuni/aec v1.0.0 // indirect
github.com/opencontainers/go-digest v1.0.0 // indirect
github.com/opencontainers/image-spec v1.1.0-rc4 // indirect
github.com/opencontainers/runc v1.1.5 // indirect
github.com/pkg/errors v0.9.1 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/robfig/cron/v3 v3.0.1 // indirect
github.com/sirupsen/logrus v1.9.0 // indirect
go.uber.org/atomic v1.9.0 // indirect
golang.org/x/crypto v0.8.0 // indirect
golang.org/x/exp v0.0.0-20230510235704-dd950f8aeaea // indirect
golang.org/x/mod v0.9.0 // indirect
golang.org/x/net v0.9.0 // indirect
golang.org/x/sys v0.8.0 // indirect
golang.org/x/text v0.9.0 // indirect
golang.org/x/tools v0.7.0 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20230525234030-28d5490b6b19 // indirect
google.golang.org/grpc v1.57.0 // indirect
google.golang.org/protobuf v1.30.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)
62 changes: 62 additions & 0 deletions gormlock.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
package gormlock

import (
"context"
"time"

"github.com/go-co-op/gocron"
"gorm.io/gorm"
)

var defaultPrecision = time.Millisecond

func NewGormLocker(db *gorm.DB, worker string, options ...LockOption) (gocron.Locker, error) {
gl := &gormLocker{db: db, worker: worker}
for _, option := range options {
option(gl)
}
return gl, nil
}

var _ gocron.Locker = (*gormLocker)(nil)

type gormLocker struct {
db *gorm.DB
worker string
jobIdentifier func(ctx context.Context, key string) string
}

func (g *gormLocker) Lock(ctx context.Context, key string) (gocron.Lock, error) {
ji := g.getJobIdentifier(ctx, key)

// I would like that people can "pass" their own implementation,
cjb := &CronJobLock{
JobName: key,
JobIdentifier: ji,
Worker: g.worker,
Status: "RUNNING",
}
tx := g.db.Create(cjb)
if tx.Error != nil {
JohnRoesler marked this conversation as resolved.
Show resolved Hide resolved
return nil, tx.Error
}
return &gormLock{db: g.db, id: cjb.GetID()}, nil
}

func (g *gormLocker) getJobIdentifier(ctx context.Context, key string) string {
if g.jobIdentifier == nil {
return time.Now().Truncate(defaultPrecision).Format("2006-01-02 15:04:05.000")
}
return g.jobIdentifier(ctx, key)
}

var _ gocron.Lock = (*gormLock)(nil)

type gormLock struct {
db *gorm.DB
id int
}

func (g *gormLock) Unlock(_ context.Context) error {
return g.db.Model(&CronJobLock{ID: g.id}).Updates(&CronJobLock{Status: "FINISHED"}).Error
}
208 changes: 208 additions & 0 deletions gormlock_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,208 @@
package gormlock

import (
"context"
"testing"
"time"

"github.com/go-co-op/gocron"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/testcontainers/testcontainers-go"
testcontainerspostgres "github.com/testcontainers/testcontainers-go/modules/postgres"
"github.com/testcontainers/testcontainers-go/wait"
"gorm.io/driver/postgres"
"gorm.io/gorm"
)

func TestEnableDistributedLocking(t *testing.T) {
ctx := context.Background()
postgresContainer, err := testcontainerspostgres.RunContainer(ctx,
testcontainers.WithWaitStrategy(wait.ForLog("database system is ready to accept connections").

Check failure on line 21 in gormlock_test.go

View workflow job for this annotation

GitHub Actions / lint and test (1.20)

undefined: testcontainers (typecheck)
WithOccurrence(2).WithStartupTimeout(5*time.Second)))
require.NoError(t, err)
t.Cleanup(func() {
if err := postgresContainer.Terminate(ctx); err != nil {
t.Fatalf("failed to terminate container: %s", err)
}
})

connStr, err := postgresContainer.ConnectionString(ctx, "sslmode=disable", "application_name=test")
assert.NoError(t, err)

db, err := gorm.Open(postgres.Open(connStr), &gorm.Config{})
require.NoError(t, err)

err = db.AutoMigrate(&CronJobLock{})
require.NoError(t, err)

resultChan := make(chan int, 10)
f := func(schedulerInstance int) {
resultChan <- schedulerInstance
}

s1 := gocron.NewScheduler(time.UTC)
l1, err := NewGormLocker(db, "s1")
require.NoError(t, err)
s1.WithDistributedLocker(l1)
_, err = s1.Every("500ms").Do(f, 1)
require.NoError(t, err)

s2 := gocron.NewScheduler(time.UTC)
l2, err := NewGormLocker(db, "s2")
require.NoError(t, err)
s2.WithDistributedLocker(l2)
_, err = s2.Every("500ms").Do(f, 2)
require.NoError(t, err)

s3 := gocron.NewScheduler(time.UTC)
l3, err := NewGormLocker(db, "s3")
require.NoError(t, err)
s3.WithDistributedLocker(l3)
_, err = s3.Every("500ms").Do(f, 3)
require.NoError(t, err)

s1.StartAsync()
s2.StartAsync()
s3.StartAsync()

time.Sleep(1700 * time.Millisecond)

s1.Stop()
s2.Stop()
s3.Stop()
close(resultChan)

var results []int
for r := range resultChan {
results = append(results, r)
}
assert.Len(t, results, 4)
var allCronJobs []*CronJobLock
db.Find(&allCronJobs)
assert.Equal(t, len(results), len(allCronJobs))
}

func TestEnableDistributedLocking_DifferentJob(t *testing.T) {
ctx := context.Background()
postgresContainer, err := testcontainerspostgres.RunContainer(ctx,
testcontainers.WithWaitStrategy(wait.ForLog("database system is ready to accept connections").

Check failure on line 89 in gormlock_test.go

View workflow job for this annotation

GitHub Actions / lint and test (1.20)

undefined: testcontainers (typecheck)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

an alias issue here

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I can't reproduce this alias issue, @JohnRoesler is it possible that you tell me how you get it? do you run a command? or you get it in your IDE?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The golangci lint step reports the error https://github.com/go-co-op/gocron-gorm-lock/actions/runs/6825095998/job/18568477606

Let me see if I can see what the issue is locally

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, it's because you dont' have a go.sum file. Run go mod tidy and it should solve the issue

Copy link
Collaborator Author

@manuelarte manuelarte Nov 10, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I ran go mod tidy, and I also ran the command that is failing in the pipeline (golangci-lint run --out-format=github-actions), and I still don't get the error. I am sure I am missing something...

Copy link
Collaborator Author

@manuelarte manuelarte Nov 11, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shall I commit the go.sum file that I get after running go mod tidy?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes. Go expects a mod and sum file to be present

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

go.sum pushed

WithOccurrence(2).WithStartupTimeout(5*time.Second)))
require.NoError(t, err)
t.Cleanup(func() {
if err := postgresContainer.Terminate(ctx); err != nil {
t.Fatalf("failed to terminate container: %s", err)
}
})

connStr, err := postgresContainer.ConnectionString(ctx, "sslmode=disable", "application_name=test")
assert.NoError(t, err)

db, err := gorm.Open(postgres.Open(connStr), &gorm.Config{})
require.NoError(t, err)

err = db.AutoMigrate(&CronJobLock{})
require.NoError(t, err)

resultChan := make(chan int, 10)
f := func(schedulerInstance int) {
resultChan <- schedulerInstance
}

result2Chan := make(chan int, 10)
f2 := func(schedulerInstance int) {
result2Chan <- schedulerInstance
}

s1 := gocron.NewScheduler(time.UTC)
l1, err := NewGormLocker(db, "s1")
require.NoError(t, err)
s1.WithDistributedLocker(l1)
_, err = s1.Every("500ms").Name("f").Do(f, 1)
require.NoError(t, err)
_, err = s1.Every("500ms").Name("f2").Do(f2, 1)
require.NoError(t, err)

s2 := gocron.NewScheduler(time.UTC)
l2, err := NewGormLocker(db, "s2")
require.NoError(t, err)
s2.WithDistributedLocker(l2)
_, err = s2.Every("500ms").Name("f").Do(f, 2)
require.NoError(t, err)
_, err = s2.Every("500ms").Name("f2").Do(f2, 2)
require.NoError(t, err)

s3 := gocron.NewScheduler(time.UTC)
l3, err := NewGormLocker(db, "s3")
require.NoError(t, err)
s3.WithDistributedLocker(l3)
_, err = s3.Every("500ms").Name("f").Do(f, 3)
require.NoError(t, err)
_, err = s3.Every("500ms").Name("f2").Do(f2, 3)
require.NoError(t, err)

s1.StartAsync()
s2.StartAsync()
s3.StartAsync()

time.Sleep(1700 * time.Millisecond)

s1.Stop()
s2.Stop()
s3.Stop()
close(resultChan)
close(result2Chan)

var results []int
for r := range resultChan {
results = append(results, r)
}
assert.Len(t, results, 4, "f is expected 4 times")
var results2 []int
for r := range result2Chan {
results2 = append(results2, r)
}
assert.Len(t, results2, 4, "f2 is expected 4 times")
var allCronJobs []*CronJobLock
db.Find(&allCronJobs)
assert.Equal(t, len(results)+len(results2), len(allCronJobs))
}

func TestJobReturningExceptionWhenUnique(t *testing.T) {
ctx := context.Background()
postgresContainer, err := testcontainerspostgres.RunContainer(ctx,
testcontainers.WithWaitStrategy(wait.ForLog("database system is ready to accept connections").

Check failure on line 174 in gormlock_test.go

View workflow job for this annotation

GitHub Actions / lint and test (1.20)

undefined: testcontainers (typecheck)
WithOccurrence(2).WithStartupTimeout(5*time.Second)))
require.NoError(t, err)
t.Cleanup(func() {
if err := postgresContainer.Terminate(ctx); err != nil {
t.Fatalf("failed to terminate container: %s", err)
}
})

connStr, err := postgresContainer.ConnectionString(ctx, "sslmode=disable", "application_name=test")
assert.NoError(t, err)

db, err := gorm.Open(postgres.Open(connStr), &gorm.Config{})
require.NoError(t, err)

err = db.AutoMigrate(&CronJobLock{})
require.NoError(t, err)

// creating a entry to force the unique identifier error
cjb := &CronJobLock{
JobName: "job",
JobIdentifier: time.Now().Truncate(60 * time.Minute).Format("2006-01-02 15:04:05.000"),
Worker: "local",
Status: "RUNNING",
}
require.NoError(t, db.Create(cjb).Error)

l, _ := NewGormLocker(db, "local", WithJobIdentifier(func(ctx context.Context, key string) string {
return time.Now().Truncate(60 * time.Minute).Format("2006-01-02 15:04:05.000")
}))
_, lerr := l.Lock(ctx, "job")
if assert.Error(t, lerr) {
assert.ErrorContains(t, lerr, "violates unique constraint")
}
}
Loading
Loading