Skip to content

Commit

Permalink
Add cloudwatch and start sending cron times there
Browse files Browse the repository at this point in the history
  • Loading branch information
rowanseymour committed Dec 13, 2024
1 parent e7b04aa commit e61c18e
Show file tree
Hide file tree
Showing 7 changed files with 57 additions and 28 deletions.
11 changes: 9 additions & 2 deletions core/tasks/cron.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,8 @@ import (
"sync"
"time"

"github.com/nyaruka/gocommon/analytics"
"github.com/aws/aws-sdk-go-v2/aws"
"github.com/aws/aws-sdk-go-v2/service/cloudwatch/types"
"github.com/nyaruka/gocommon/jsonx"
"github.com/nyaruka/mailroom/runtime"
"github.com/nyaruka/mailroom/utils/crons"
Expand Down Expand Up @@ -66,7 +67,13 @@ func recordCronExecution(name string, r func(context.Context, *runtime.Runtime)

elapsed := time.Since(started)
elapsedSeconds := elapsed.Seconds()
analytics.Gauge("mr.cron_"+name, elapsedSeconds)

rt.CW.Queue(types.MetricDatum{
MetricName: aws.String("CronTime"),
Dimensions: []types.Dimension{{Name: aws.String("TaskName"), Value: aws.String(name)}},
Value: aws.Float64(elapsedSeconds),
Unit: types.StandardUnitSeconds,
})

rc := rt.RP.Get()
defer rc.Close()
Expand Down
15 changes: 8 additions & 7 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,8 @@ require (
github.com/Masterminds/semver v1.5.0
github.com/appleboy/go-fcm v1.2.1
github.com/aws/aws-sdk-go-v2 v1.32.6
github.com/aws/aws-sdk-go-v2/feature/dynamodb/attributevalue v1.15.20
github.com/aws/aws-sdk-go-v2/feature/dynamodb/attributevalue v1.15.21
github.com/aws/aws-sdk-go-v2/service/cloudwatch v1.43.3
github.com/aws/aws-sdk-go-v2/service/dynamodb v1.38.0
github.com/aws/aws-sdk-go-v2/service/s3 v1.71.0
github.com/buger/jsonparser v1.1.1
Expand All @@ -22,25 +23,25 @@ require (
github.com/jmoiron/sqlx v1.4.0
github.com/lib/pq v1.10.9
github.com/nyaruka/ezconf v0.3.0
github.com/nyaruka/gocommon v1.59.3
github.com/nyaruka/gocommon v1.60.3
github.com/nyaruka/goflow v0.225.8
github.com/nyaruka/null/v3 v3.0.0
github.com/nyaruka/redisx v0.8.1
github.com/nyaruka/rp-indexer/v9 v9.2.1
github.com/nyaruka/rp-indexer/v9 v9.3.0
github.com/patrickmn/go-cache v2.1.0+incompatible
github.com/prometheus/client_model v0.6.1
github.com/prometheus/common v0.61.0
github.com/samber/slog-multi v1.2.4
github.com/samber/slog-sentry v1.2.2
github.com/shopspring/decimal v1.4.0
github.com/stretchr/testify v1.10.0
google.golang.org/api v0.210.0
google.golang.org/api v0.211.0
)

require (
cel.dev/expr v0.19.1 // indirect
cloud.google.com/go v0.116.0 // indirect
cloud.google.com/go/auth v0.12.0 // indirect
cloud.google.com/go/auth v0.12.1 // indirect
cloud.google.com/go/auth/oauth2adapt v0.2.6 // indirect
cloud.google.com/go/compute/metadata v0.5.2 // indirect
cloud.google.com/go/firestore v1.17.0 // indirect
Expand All @@ -62,7 +63,7 @@ require (
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.25 // indirect
github.com/aws/aws-sdk-go-v2/internal/ini v1.8.1 // indirect
github.com/aws/aws-sdk-go-v2/internal/v4a v1.3.25 // indirect
github.com/aws/aws-sdk-go-v2/service/dynamodbstreams v1.24.8 // indirect
github.com/aws/aws-sdk-go-v2/service/dynamodbstreams v1.24.9 // indirect
github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.12.1 // indirect
github.com/aws/aws-sdk-go-v2/service/internal/checksum v1.4.6 // indirect
github.com/aws/aws-sdk-go-v2/service/internal/endpoint-discovery v1.10.6 // indirect
Expand Down Expand Up @@ -116,7 +117,7 @@ require (
go.opentelemetry.io/otel/sdk/metric v1.32.0 // indirect
go.opentelemetry.io/otel/trace v1.32.0 // indirect
golang.org/x/crypto v0.30.0 // indirect
golang.org/x/exp v0.0.0-20241204233417-43b7b7cde48d // indirect
golang.org/x/exp v0.0.0-20241210194714-1829a127f884 // indirect
golang.org/x/net v0.32.0 // indirect
golang.org/x/oauth2 v0.24.0 // indirect
golang.org/x/sync v0.10.0 // indirect
Expand Down
30 changes: 16 additions & 14 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@ cel.dev/expr v0.19.1/go.mod h1:MrpN08Q+lEBs+bGYdLxxHkZoUSsCp0nSKTs0nTymJgw=
cloud.google.com/go v0.26.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw=
cloud.google.com/go v0.116.0 h1:B3fRrSDkLRt5qSHWe40ERJvhvnQwdZiHu0bJOpldweE=
cloud.google.com/go v0.116.0/go.mod h1:cEPSRWPzZEswwdr9BxE6ChEn01dWlTaF05LiC2Xs70U=
cloud.google.com/go/auth v0.12.0 h1:ARAD8r0lkiHw2go7kEnmviF6TOYhzLM+yDGcDt9mP68=
cloud.google.com/go/auth v0.12.0/go.mod h1:xxA5AqpDrvS+Gkmo9RqrGGRh6WSNKKOXhY3zNOr38tI=
cloud.google.com/go/auth v0.12.1 h1:n2Bj25BUMM0nvE9D2XLTiImanwZhO3DkfWSYS/SAJP4=
cloud.google.com/go/auth v0.12.1/go.mod h1:BFMu+TNpF3DmvfBO9ClqTR/SiqVIm7LukKF9mbendF4=
cloud.google.com/go/auth/oauth2adapt v0.2.6 h1:V6a6XDu2lTwPZWOawrAa9HUK+DB2zfJyTuciBG5hFkU=
cloud.google.com/go/auth/oauth2adapt v0.2.6/go.mod h1:AlmsELtlEBnaNTL7jCj8VQFLy6mbZv0s4Q7NGBeQ5E8=
cloud.google.com/go/compute/metadata v0.5.2 h1:UxK4uu/Tn+I3p2dYWTfiX4wva7aYlKixAHn3fyqngqo=
Expand Down Expand Up @@ -54,8 +54,8 @@ github.com/aws/aws-sdk-go-v2/config v1.28.6 h1:D89IKtGrs/I3QXOLNTH93NJYtDhm8SYa9
github.com/aws/aws-sdk-go-v2/config v1.28.6/go.mod h1:GDzxJ5wyyFSCoLkS+UhGB0dArhb9mI+Co4dHtoTxbko=
github.com/aws/aws-sdk-go-v2/credentials v1.17.47 h1:48bA+3/fCdi2yAwVt+3COvmatZ6jUDNkDTIsqDiMUdw=
github.com/aws/aws-sdk-go-v2/credentials v1.17.47/go.mod h1:+KdckOejLW3Ks3b0E3b5rHsr2f9yuORBum0WPnE5o5w=
github.com/aws/aws-sdk-go-v2/feature/dynamodb/attributevalue v1.15.20 h1:bwHhhCScKRAYJtaWVT+jDpt74GybN2nxI6+InkRjqGM=
github.com/aws/aws-sdk-go-v2/feature/dynamodb/attributevalue v1.15.20/go.mod h1:/RfYH8CUMQuq/3CIEVGHLkqkA9KtbBF5omt2Ae8xc0s=
github.com/aws/aws-sdk-go-v2/feature/dynamodb/attributevalue v1.15.21 h1:FdDxp4HNtJWPBAOdkJ+84Dfx2TOA7Dq+cH72GDHhjnA=
github.com/aws/aws-sdk-go-v2/feature/dynamodb/attributevalue v1.15.21/go.mod h1:doHEXGiMWQBxcTJy3YN1Ao2HCgCuMWumuvTULGndCuQ=
github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.16.21 h1:AmoU1pziydclFT/xRV+xXE/Vb8fttJCLRPv8oAkprc0=
github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.16.21/go.mod h1:AjUdLYe4Tgs6kpH4Bv7uMZo7pottoyHMn4eTcIcneaY=
github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.25 h1:s/fF4+yDQDoElYhfIVvSNyeCydfbuTKzhxSXDXCPasU=
Expand All @@ -66,10 +66,12 @@ github.com/aws/aws-sdk-go-v2/internal/ini v1.8.1 h1:VaRN3TlFdd6KxX1x3ILT5ynH6HvK
github.com/aws/aws-sdk-go-v2/internal/ini v1.8.1/go.mod h1:FbtygfRFze9usAadmnGJNc8KsP346kEe+y2/oyhGAGc=
github.com/aws/aws-sdk-go-v2/internal/v4a v1.3.25 h1:r67ps7oHCYnflpgDy2LZU0MAQtQbYIOqNNnqGO6xQkE=
github.com/aws/aws-sdk-go-v2/internal/v4a v1.3.25/go.mod h1:GrGY+Q4fIokYLtjCVB/aFfCVL6hhGUFl8inD18fDalE=
github.com/aws/aws-sdk-go-v2/service/cloudwatch v1.43.3 h1:nQLG9irjDGUFXVPDHzjCGEEwh0hZ6BcxTvHOod1YsP4=
github.com/aws/aws-sdk-go-v2/service/cloudwatch v1.43.3/go.mod h1:URs8sqsyaxiAZkKP6tOEmhcs9j2ynFIomqOKY/CAHJc=
github.com/aws/aws-sdk-go-v2/service/dynamodb v1.38.0 h1:isKhHsjpQR3CypQJ4G1g8QWx7zNpiC/xKw1zjgJYVno=
github.com/aws/aws-sdk-go-v2/service/dynamodb v1.38.0/go.mod h1:xDvUyIkwBwNtVZJdHEwAuhFly3mezwdEWkbJ5oNYwIw=
github.com/aws/aws-sdk-go-v2/service/dynamodbstreams v1.24.8 h1:ntqHwZb+ZyVz0CFYUG0sQ02KMMJh+iXeV3bXoba+s4A=
github.com/aws/aws-sdk-go-v2/service/dynamodbstreams v1.24.8/go.mod h1:Hcjb2SiUo9v1GhpXjRNW7hAwfzAPfrsgnlKpP5UYEPY=
github.com/aws/aws-sdk-go-v2/service/dynamodbstreams v1.24.9 h1:yhB2XYpHeWeAv5u3w9PFiSVIariSyhK5jcyQUFJpnIQ=
github.com/aws/aws-sdk-go-v2/service/dynamodbstreams v1.24.9/go.mod h1:Hcjb2SiUo9v1GhpXjRNW7hAwfzAPfrsgnlKpP5UYEPY=
github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.12.1 h1:iXtILhvDxB6kPvEXgsDhGaZCSC6LQET5ZHSdJozeI0Y=
github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.12.1/go.mod h1:9nu0fVANtYiAePIBh2/pFUSwtJ402hLnp854CNoDOeE=
github.com/aws/aws-sdk-go-v2/service/internal/checksum v1.4.6 h1:HCpPsWqmYQieU7SS6E9HXfdAMSud0pteVXieJmcpIRI=
Expand Down Expand Up @@ -222,8 +224,8 @@ github.com/naoina/toml v0.1.1 h1:PT/lllxVVN0gzzSqSlHEmP8MJB4MY2U7STGxiouV4X8=
github.com/naoina/toml v0.1.1/go.mod h1:NBIhNtsFMo3G2szEBne+bO4gS192HuIYRqfvOWb4i1E=
github.com/nyaruka/ezconf v0.3.0 h1:kGvJqVN8AHowb4HdaHAviJ0Z3yI5Pyekp1WqibFEaGk=
github.com/nyaruka/ezconf v0.3.0/go.mod h1:89GUW6EPRNLIxT7lC4LWnjWTgZeQwRoX7lBmc8ralAU=
github.com/nyaruka/gocommon v1.59.3 h1:fdjs9Z7aH+zog7FXlEpvJ0GtI6XNdNdBtjFxw5kVB7s=
github.com/nyaruka/gocommon v1.59.3/go.mod h1:peOpluiVBMeQu81Ar+7EPQVT7vawN6ho9Kh1k/Gj8Vk=
github.com/nyaruka/gocommon v1.60.3 h1:fPQ9t6NX+mu7JQ7nXefgpBs8paqGvGXq3eA7VscsAVo=
github.com/nyaruka/gocommon v1.60.3/go.mod h1:kFJuOq8COneV7ssfK6xgCMJ8gP8fQifLQnNXBnE4YL0=
github.com/nyaruka/goflow v0.225.8 h1:rDc3P3KL8sNlXUFBi0UmBvMOd3eCUhUrkO3RbYYiW7o=
github.com/nyaruka/goflow v0.225.8/go.mod h1:spXtSWgS7dusHIfUFCvJGjSMc7d4FX9Abl6S7tg49ks=
github.com/nyaruka/librato v1.1.1 h1:0nTYtJLl3Sn7lX3CuHsLf+nXy1k/tGV0OjVxLy3Et4s=
Expand All @@ -236,8 +238,8 @@ github.com/nyaruka/phonenumbers v1.4.3 h1:tR71UJ+DZu7TSkxoG8JI8HzHJkPD/m4KNiUX34
github.com/nyaruka/phonenumbers v1.4.3/go.mod h1:gv+CtldaFz+G3vHHnasBSirAi3O2XLqZzVWz4V1pl2E=
github.com/nyaruka/redisx v0.8.1 h1:d9Hc8nfSKTSEU+bx+YrB13d6bzAgiiHygk4jg/Q4nb4=
github.com/nyaruka/redisx v0.8.1/go.mod h1:2TUmkDvprPInnmInR5AEbCm0zRRewkvSDVLsO+Do6iI=
github.com/nyaruka/rp-indexer/v9 v9.2.1 h1:gQa0QHiU+LjhmgpToHpoGRKRC8oI1EdW4dDaN9inhSk=
github.com/nyaruka/rp-indexer/v9 v9.2.1/go.mod h1:NzcuE4Zxrzde7gQinlWfwq2jeyEbamBj8hqVkm+eQLg=
github.com/nyaruka/rp-indexer/v9 v9.3.0 h1:8Thnt7k6/anEYcM3hIY+ObzF3JtO2/8EbDmb4JEAzsc=
github.com/nyaruka/rp-indexer/v9 v9.3.0/go.mod h1:YrHQx+ImBKRUQ4RWFJad1IlcMWlMyry/72SxAVCCgIU=
github.com/patrickmn/go-cache v2.1.0+incompatible h1:HRMgzkcYKYpi3C8ajMPV8OFXaaRUnok+kx1WdO15EQc=
github.com/patrickmn/go-cache v2.1.0+incompatible/go.mod h1:3Qf8kWWT7OJRJbdiICTKqZju1ZixQ/KpMGzzAfe6+WQ=
github.com/pingcap/errors v0.11.4 h1:lFuQV/oaUMGcD2tqt+01ROSmJs75VG1ToEOkZIZ4nE4=
Expand Down Expand Up @@ -301,8 +303,8 @@ golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5y
golang.org/x/crypto v0.30.0 h1:RwoQn3GkWiMkzlX562cLB7OxWvjH1L8xutO2WoJcRoY=
golang.org/x/crypto v0.30.0/go.mod h1:kDsLvtWBEx7MV9tJOj9bnXsPbxwJQ6csT/x4KIN4Ssk=
golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
golang.org/x/exp v0.0.0-20241204233417-43b7b7cde48d h1:0olWaB5pg3+oychR51GUVCEsGkeCU/2JxjBgIo4f3M0=
golang.org/x/exp v0.0.0-20241204233417-43b7b7cde48d/go.mod h1:qj5a5QZpwLU2NLQudwIN5koi3beDhSAlJwa67PuM98c=
golang.org/x/exp v0.0.0-20241210194714-1829a127f884 h1:Y/Mj/94zIQQGHVSv1tTtQBDaQaJe62U9bkDZKKyhPCU=
golang.org/x/exp v0.0.0-20241210194714-1829a127f884/go.mod h1:qj5a5QZpwLU2NLQudwIN5koi3beDhSAlJwa67PuM98c=
golang.org/x/lint v0.0.0-20181026193005-c67002cb31c3/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE=
golang.org/x/lint v0.0.0-20190227174305-5b3e6a55c961/go.mod h1:wehouNa3lNwaWXcvxsM5YxQ5yQlVC4a0KAMCusXpPoU=
golang.org/x/lint v0.0.0-20190313153728-d0100b6bd8b3/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc=
Expand Down Expand Up @@ -356,8 +358,8 @@ golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtn
golang.org/x/tools v0.1.12/go.mod h1:hNGJHUnrk76NpqgfD5Aqm5Crs+Hm0VOH/i9J2+nxYbc=
golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
google.golang.org/api v0.210.0 h1:HMNffZ57OoZCRYSbdWVRoqOa8V8NIHLL0CzdBPLztWk=
google.golang.org/api v0.210.0/go.mod h1:B9XDZGnx2NtyjzVkOVTGrFSAVZgPcbedzKg/gTLwqBs=
google.golang.org/api v0.211.0 h1:IUpLjq09jxBSV1lACO33CGY3jsRcbctfGzhj+ZSE/Bg=
google.golang.org/api v0.211.0/go.mod h1:XOloB4MXFH4UTlQSGuNUxw0UT74qdENK8d6JNsXKLi0=
google.golang.org/appengine v1.1.0/go.mod h1:EbEs0AVv82hx2wNQdGPgUI5lhzA/G0D9YwlJXL52JkM=
google.golang.org/appengine v1.4.0/go.mod h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7/EB5XEv4=
google.golang.org/appengine/v2 v2.0.6 h1:LvPZLGuchSBslPBp+LAhihBeGSiRh1myRoYK4NtuBIw=
Expand Down
13 changes: 13 additions & 0 deletions mailroom.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"github.com/elastic/go-elasticsearch/v8"
"github.com/jmoiron/sqlx"
"github.com/nyaruka/gocommon/analytics"
"github.com/nyaruka/gocommon/aws/cwatch"
"github.com/nyaruka/gocommon/aws/dynamo"
"github.com/nyaruka/gocommon/aws/s3x"
"github.com/nyaruka/mailroom/core/tasks"
Expand Down Expand Up @@ -139,6 +140,16 @@ func (mr *Mailroom) Start() error {

analytics.Start()

// configure and start cloudwatch
mr.rt.CW, err = cwatch.NewService(c.AWSAccessKeyID, c.AWSSecretAccessKey, c.AWSRegion, c.CloudwatchNamespace, c.DeploymentID)
if err != nil {
log.Error("cloudwatch not available", "error", err)
} else {
log.Info("cloudwatch ok")
}

mr.rt.CW.StartQueue(mr.wg, time.Second*3)

// init our foremen and start it
mr.handlerForeman.Start()
mr.batchForeman.Start()
Expand All @@ -164,7 +175,9 @@ func (mr *Mailroom) Stop() error {
mr.batchForeman.Stop()
mr.throttledForeman.Stop()

mr.rt.CW.StopQueue()
analytics.Stop()

close(mr.quit)
mr.cancel()

Expand Down
2 changes: 2 additions & 0 deletions runtime/runtime.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"github.com/elastic/go-elasticsearch/v8"
"github.com/gomodule/redigo/redis"
"github.com/jmoiron/sqlx"
"github.com/nyaruka/gocommon/aws/cwatch"
"github.com/nyaruka/gocommon/aws/dynamo"
"github.com/nyaruka/gocommon/aws/s3x"
)
Expand All @@ -21,6 +22,7 @@ type Runtime struct {
Dynamo *dynamo.Service
S3 *s3x.Service
ES *elasticsearch.TypedClient
CW *cwatch.Service
FCM FCMClient
Config *Config
}
Expand Down
9 changes: 8 additions & 1 deletion testsuite/testsuite.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,15 @@ import (
"github.com/elastic/go-elasticsearch/v8"
"github.com/gomodule/redigo/redis"
"github.com/jmoiron/sqlx"
"github.com/nyaruka/gocommon/aws/cwatch"
"github.com/nyaruka/gocommon/aws/dynamo"
"github.com/nyaruka/gocommon/aws/s3x"
"github.com/nyaruka/gocommon/jsonx"
"github.com/nyaruka/mailroom/core/models"
"github.com/nyaruka/mailroom/runtime"
"github.com/nyaruka/redisx/assertredis"
"github.com/nyaruka/rp-indexer/v9/indexers"
ixruntime "github.com/nyaruka/rp-indexer/v9/runtime"
)

var _db *sqlx.DB
Expand Down Expand Up @@ -72,6 +74,7 @@ func Reset(what ResetFlag) {
// Runtime returns the various runtime things a test might need
func Runtime() (context.Context, *runtime.Runtime) {
cfg := runtime.NewDefaultConfig()
cfg.DeploymentID = "test"
cfg.Port = 8091
cfg.ElasticContactsIndex = elasticContactsIndex
cfg.AWSAccessKeyID = "root"
Expand All @@ -89,6 +92,9 @@ func Runtime() (context.Context, *runtime.Runtime) {
s3svc, err := s3x.NewService(cfg.AWSAccessKeyID, cfg.AWSSecretAccessKey, cfg.AWSRegion, cfg.S3Endpoint, cfg.S3Minio)
noError(err)

cwSvc, err := cwatch.NewService(cfg.AWSAccessKeyID, cfg.AWSSecretAccessKey, cfg.AWSRegion, cfg.CloudwatchNamespace, cfg.DeploymentID)
noError(err)

dbx := getDB()
rt := &runtime.Runtime{
DB: dbx,
Expand All @@ -97,6 +103,7 @@ func Runtime() (context.Context, *runtime.Runtime) {
Dynamo: dyna,
S3: s3svc,
ES: getES(),
CW: cwSvc,
FCM: &MockFCMClient{ValidTokens: []string{"FCMID3", "FCMID4", "FCMID5"}},
Config: cfg,
}
Expand All @@ -112,7 +119,7 @@ func ReindexElastic(ctx context.Context) {
es := getES()

contactsIndexer := indexers.NewContactIndexer(elasticURL, elasticContactsIndex, 1, 1, 100)
contactsIndexer.Index(db.DB, false, false)
contactsIndexer.Index(&ixruntime.Runtime{DB: db.DB}, false, false)

_, err := es.Indices.Refresh().Index(elasticContactsIndex).Do(ctx)
noError(err)
Expand Down
5 changes: 1 addition & 4 deletions utils/crons/cron.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,10 +38,7 @@ func Start(rt *runtime.Runtime, wg *sync.WaitGroup, name string, allInstances bo
log := slog.With("cron", name)

go func() {
defer func() {
log.Info("cron exiting")
wg.Done()
}()
defer func() { wg.Done() }()

for {
select {
Expand Down

0 comments on commit e61c18e

Please sign in to comment.