Skip to content

Commit

Permalink
bulker: don't stack up batch cron tasks when previous batch takes too…
Browse files Browse the repository at this point in the history
… long

syncctl: memory leak fix
  • Loading branch information
absorbb committed Oct 30, 2023
1 parent d9d4a4d commit 808afa0
Show file tree
Hide file tree
Showing 6 changed files with 43 additions and 3 deletions.
11 changes: 10 additions & 1 deletion bulkerapp/app/abstract_batch_consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,8 @@ type AbstractBatchConsumer struct {

closed chan struct{}

running atomic.Bool

//AbstractBatchConsumer marked as no longer needed. We cannot close it immediately because it can be in the middle of processing batch
retired atomic.Bool
//idle AbstractBatchConsumer that is not running any batch jobs. retired idle consumer automatically closes itself
Expand Down Expand Up @@ -187,7 +189,14 @@ func (bc *AbstractBatchConsumer) TopicId() string {
}

func (bc *AbstractBatchConsumer) RunJob() {
_, _ = bc.ConsumeAll()
if bc.running.CompareAndSwap(false, true) {
defer func() {
bc.running.Store(false)
}()
_, _ = bc.ConsumeAll()
} else {
bc.Warnf("Previous job is still running")
}
}

func (bc *AbstractBatchConsumer) ConsumeAll() (counters BatchCounters, err error) {
Expand Down
1 change: 0 additions & 1 deletion bulkerapp/app/cron.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ func NewCron(config *Config) *Cron {
s := gocron.NewScheduler(time.UTC)
s.TagsUnique()
s.StartAsync()
s.SingletonModeAll()
return &Cron{Service: base, scheduler: s, config: config}
}

Expand Down
2 changes: 2 additions & 0 deletions bulkerapp/app/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,7 @@ func (r *Router) EventsHandler(c *gin.Context) {
}

func (r *Router) BulkHandler(c *gin.Context) {
start := time.Now()
destinationId := c.Param("destinationId")
tableName := c.Query("tableName")
taskId := c.DefaultQuery("taskId", uuid.New())
Expand Down Expand Up @@ -226,6 +227,7 @@ func (r *Router) BulkHandler(c *gin.Context) {
rError = r.ResponseError(c, http.StatusBadRequest, "stream complete error", false, err, "")
return
}
r.Infof("Bulk stream for %s mode: %s Completed. Processed: %d in %dms.", jobId, mode, state.SuccessfulRows, time.Since(start).Milliseconds())
c.JSON(http.StatusOK, gin.H{"message": "ok", "state": state})
} else {
_, _ = bulkerStream.Abort(c)
Expand Down
17 changes: 17 additions & 0 deletions bulkerlib/bulker.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ import (
"fmt"
"github.com/jitsucom/bulker/bulkerlib/types"
"io"
"reflect"
"strings"
)

type InitFunction func(Config) (Bulker, error)
Expand Down Expand Up @@ -146,6 +148,21 @@ func (s *State) SetError(err error) {
s.LastErrorText = err.Error()
}

// to string
func (s *State) String() string {
// print non-zero values
var sb strings.Builder
countersValue := reflect.ValueOf(*s)
countersType := countersValue.Type()
for i := 0; i < countersValue.NumField(); i++ {
value := countersValue.Field(i).String()
if value != "" && value != "0" {
sb.WriteString(fmt.Sprintf("%s: %s ", countersType.Field(i).Name, value))
}
}
return sb.String()
}

type LogLevel int

const (
Expand Down
2 changes: 1 addition & 1 deletion sync-controller/job_runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ func NewJobRunner(appContext *Context) (*JobRunner, error) {
}

func (j *JobRunner) watchPodStatuses() {
ticker := utils.NewTicker(time.Second*time.Duration(j.config.ContainerStatusCheckSeconds), time.Second*time.Duration(j.config.ContainerStatusCheckSeconds))
for {
//recover from panic
defer func() {
Expand All @@ -64,7 +65,6 @@ func (j *JobRunner) watchPodStatuses() {
j.Errorf("watchPodStatuses Recovered from panic: %+v", r)
}
}()
ticker := utils.NewTicker(time.Second*time.Duration(j.config.ContainerStatusCheckSeconds), time.Second*time.Duration(j.config.ContainerStatusCheckSeconds))
select {
case <-j.closeCh:
return
Expand Down
13 changes: 13 additions & 0 deletions sync-controller/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"github.com/gin-gonic/gin"
"github.com/jitsucom/bulker/jitsubase/appbase"
"net/http"
"net/http/pprof"
"strings"
)

Expand All @@ -28,5 +29,17 @@ func NewRouter(appContext *Context) *Router {
engine.GET("/health", func(c *gin.Context) {
c.Status(http.StatusOK)
})

engine.GET("/debug/pprof/profile", gin.WrapF(pprof.Profile))
engine.GET("/debug/pprof/heap", gin.WrapF(pprof.Handler("heap").ServeHTTP))
engine.GET("/debug/pprof/goroutine", gin.WrapF(pprof.Handler("goroutine").ServeHTTP))
engine.GET("/debug/pprof/block", gin.WrapF(pprof.Handler("block").ServeHTTP))
engine.GET("/debug/pprof/threadcreate", gin.WrapF(pprof.Handler("threadcreate").ServeHTTP))
engine.GET("/debug/pprof/cmdline", gin.WrapF(pprof.Handler("cmdline").ServeHTTP))
engine.GET("/debug/pprof/symbol", gin.WrapF(pprof.Handler("symbol").ServeHTTP))
engine.GET("/debug/pprof/trace", gin.WrapF(pprof.Handler("trace").ServeHTTP))
engine.GET("/debug/pprof/mutex", gin.WrapF(pprof.Handler("mutex").ServeHTTP))
engine.GET("/debug/pprof", gin.WrapF(pprof.Index))

return router
}

0 comments on commit 808afa0

Please sign in to comment.