Skip to content

Commit

Permalink
sync: support for cancelling of running tasks
Browse files Browse the repository at this point in the history
  • Loading branch information
absorbb committed Jul 18, 2024
1 parent 3746c69 commit dbb285a
Show file tree
Hide file tree
Showing 8 changed files with 129 additions and 36 deletions.
18 changes: 15 additions & 3 deletions sync-controller/job_runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -260,17 +260,22 @@ func (j *JobRunner) CreateCronJob(taskDescriptor TaskDescriptor, configuration *
taskStatus.Description = err.Error()
} else {
taskStatus.Status = StatusCreated
taskStatus.Description = "Starting sync job..."
taskStatus.PodName = cronJob.Name
}
j.sendStatus(&taskStatus)
return taskStatus
}

func PodName(syncId, taskId, pkg string) string {
taskId = utils.NvlString(taskId, uuid.NewLettersNumbers())
podId := utils.JoinNonEmptyStrings(".", syncId, taskId)
return strings.ToLower(nonAlphaNum.ReplaceAllLiteralString(pkg, "-") + "-" + podId)
}

func (j *JobRunner) CreatePod(taskDescriptor TaskDescriptor, configuration *TaskConfiguration) TaskStatus {
taskStatus := TaskStatus{TaskDescriptor: taskDescriptor}
taskId := utils.NvlString(taskDescriptor.TaskID, uuid.NewLettersNumbers())
podId := utils.JoinNonEmptyStrings(".", taskStatus.SyncID, taskId)
podName := strings.ToLower(nonAlphaNum.ReplaceAllLiteralString(taskDescriptor.Package, "-") + "-" + podId)
podName := taskDescriptor.PodName()
if !configuration.IsEmpty() {
secret := j.createSecret(podName, taskDescriptor, configuration)
_, err := j.clientset.CoreV1().Secrets(j.namespace).Create(context.Background(), secret, metav1.CreateOptions{})
Expand All @@ -288,6 +293,7 @@ func (j *JobRunner) CreatePod(taskDescriptor TaskDescriptor, configuration *Task
taskStatus.Description = err.Error()
} else {
taskStatus.Status = StatusCreated
taskStatus.Description = "Starting sync job..."
taskStatus.PodName = pod.Name
}
j.sendStatus(&taskStatus)
Expand Down Expand Up @@ -496,6 +502,12 @@ func (j *JobRunner) createCronJob(jobId string, task TaskDescriptor, configurati
return cronJob
}

func (j *JobRunner) TerminatePod(podName string) {
_ = j.clientset.CoreV1().Pods(j.namespace).Delete(context.Background(), podName, metav1.DeleteOptions{})
_ = j.clientset.CoreV1().Secrets(j.namespace).Delete(context.Background(), podName+"-config", metav1.DeleteOptions{})
_ = j.clientset.CoreV1().ConfigMaps(j.namespace).Delete(context.Background(), podName+"-config", metav1.DeleteOptions{})
}

func (j *JobRunner) createPod(podName string, task TaskDescriptor, configuration *TaskConfiguration) *v1.Pod {
var command string
switch task.TaskType {
Expand Down
1 change: 1 addition & 0 deletions sync-controller/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ func NewRouter(appContext *Context) *Router {
engine.POST("/check", appContext.taskManager.CheckHandler)
engine.POST("/discover", appContext.taskManager.DiscoverHandler)
engine.POST("/read", appContext.taskManager.ReadHandler)
engine.GET("/cancel", appContext.taskManager.CancelHandler)

engine.GET("/health", func(c *gin.Context) {
c.JSON(http.StatusOK, gin.H{"status": "pass"})
Expand Down
3 changes: 3 additions & 0 deletions sync-controller/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,9 @@ type TaskDescriptor struct {
StartedAt string `json:"startedAt"`
}

func (t *TaskDescriptor) PodName() string {
return PodName(t.SyncID, t.TaskID, t.Package)
}
func (t *TaskDescriptor) StartedAtTime() time.Time {
tm, err := time.Parse(time.RFC3339, t.StartedAt)
if err != nil {
Expand Down
13 changes: 11 additions & 2 deletions sync-controller/task_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,15 @@ func (t *TaskManager) DiscoverHandler(c *gin.Context) {
c.JSON(http.StatusOK, gin.H{"ok": true})
}

func (t *TaskManager) CancelHandler(c *gin.Context) {
pkg := c.Query("package")
syncId := c.Query("syncId")
taskId := c.Query("taskId")
_ = db.UpdateRunningTaskStatus(t.dbpool, taskId, "CANCELLED")
t.jobRunner.TerminatePod(PodName(syncId, taskId, pkg))
c.JSON(http.StatusOK, gin.H{"ok": true})
}

func (t *TaskManager) ReadHandler(c *gin.Context) {
taskConfig := TaskConfiguration{}
err := jsonorder.NewDecoder(c.Request.Body).Decode(&taskConfig)
Expand Down Expand Up @@ -188,9 +197,9 @@ func (t *TaskManager) listenTaskStatus() {
case "read":
switch st.Status {
case StatusCreateFailed, StatusFailed, StatusInitTimeout:
err = db.UpsertRunningTask(t.dbpool, st.SyncID, st.TaskID, st.Package, st.PackageVersion, st.StartedAtTime(), "FAILED", strings.Join([]string{string(st.Status), st.Description}, ": "))
err = db.UpsertRunningTask(t.dbpool, st.SyncID, st.TaskID, st.Package, st.PackageVersion, st.StartedAtTime(), "FAILED", strings.Join([]string{string(st.Status), st.Description}, ": "), st.StartedBy)
case StatusCreated:
err = db.UpsertRunningTask(t.dbpool, st.SyncID, st.TaskID, st.Package, st.PackageVersion, st.StartedAtTime(), "RUNNING", strings.Join([]string{string(st.Status), st.Description}, ": "))
err = db.UpsertRunningTask(t.dbpool, st.SyncID, st.TaskID, st.Package, st.PackageVersion, st.StartedAtTime(), "RUNNING", strings.Join([]string{string(st.Status), st.Description}, ": "), st.StartedBy)
case StatusRunning:
err = db.UpdateRunningTaskDate(t.dbpool, st.TaskID)
default:
Expand Down
15 changes: 11 additions & 4 deletions sync-sidecar/db/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,13 @@ ON CONFLICT ON CONSTRAINT source_state_pkey DO UPDATE SET state=$3, timestamp =
upsertTaskSQL = `INSERT INTO source_task (sync_id, task_id, package, version, started_at, updated_at, status, description) VALUES ($1, $2, $3, $4, $5, $6, $7, $8 )
ON CONFLICT ON CONSTRAINT source_task_pkey DO UPDATE SET updated_at=$6, status = $7, description=$8`

upsertRunningTaskSQL = `INSERT INTO source_task as st (sync_id, task_id, package, version, started_at, updated_at, status, description) VALUES ($1, $2, $3, $4, $5, $6, $7, $8 )
ON CONFLICT ON CONSTRAINT source_task_pkey DO UPDATE SET updated_at=$6, status = $7, description=$8 where st.status = 'RUNNING'`
upsertRunningTaskSQL = `INSERT INTO source_task as st (sync_id, task_id, package, version, started_at, updated_at, status, description, started_by) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)
ON CONFLICT ON CONSTRAINT source_task_pkey DO UPDATE SET updated_at=$6, status = $7, description=$8, started_by=$9 where st.status = 'RUNNING'`

updateRunningTaskDateSQL = `UPDATE source_task SET updated_at=$2 where task_id=$1 and status = 'RUNNING'`

updateRunningTaskStatusSQL = `UPDATE source_task SET status=$2 where task_id=$1 and status = 'RUNNING'`

upsertCheckSQL = `INSERT INTO source_check (package, version, key, status, description, timestamp) VALUES ($1, $2, $3, $4, $5, $6)
ON CONFLICT ON CONSTRAINT source_check_pkey DO UPDATE SET status = $4, description=$5, timestamp=$6`

Expand Down Expand Up @@ -79,8 +81,8 @@ func UpsertTask(dbpool *pgxpool.Pool, syncId, taskId, packageName, packageVersio
return err
}

func UpsertRunningTask(dbpool *pgxpool.Pool, syncId, taskId, packageName, packageVersion string, startedAt time.Time, status, description string) error {
_, err := dbpool.Exec(context.Background(), upsertRunningTaskSQL, syncId, taskId, packageName, packageVersion, startedAt, time.Now(), status, description)
func UpsertRunningTask(dbpool *pgxpool.Pool, syncId, taskId, packageName, packageVersion string, startedAt time.Time, status, description, startedBy string) error {
_, err := dbpool.Exec(context.Background(), upsertRunningTaskSQL, syncId, taskId, packageName, packageVersion, startedAt, time.Now(), status, description, startedBy)
return err
}

Expand All @@ -89,6 +91,11 @@ func UpdateRunningTaskDate(dbpool *pgxpool.Pool, taskId string) error {
return err
}

func UpdateRunningTaskStatus(dbpool *pgxpool.Pool, taskId, status string) error {
_, err := dbpool.Exec(context.Background(), updateRunningTaskStatusSQL, taskId, status)
return err
}

func UpsertCheck(dbpool *pgxpool.Pool, packageName, packageVersion, storageKey, status, description string, timestamp time.Time) error {
_, err := dbpool.Exec(context.Background(), upsertCheckSQL, packageName, packageVersion, storageKey, status, description, timestamp)
return err
Expand Down
42 changes: 38 additions & 4 deletions sync-sidecar/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,21 @@ import (
_ "github.com/jitsucom/bulker/bulkerlib/implementations/api_based"
_ "github.com/jitsucom/bulker/bulkerlib/implementations/file_storage"
_ "github.com/jitsucom/bulker/bulkerlib/implementations/sql"
"github.com/jitsucom/bulker/jitsubase/logging"
"github.com/jitsucom/bulker/sync-sidecar/db"
"os"
"os/signal"
"strings"
"sync/atomic"
"syscall"
"time"
)

type SideCar interface {
Run()
Close()
}

type AbstractSideCar struct {
syncId string
taskId string
Expand All @@ -34,6 +43,21 @@ type AbstractSideCar struct {

//first error occurred during command
firstErr error

errPipe *os.File
outPipe *os.File
cancelled atomic.Bool
}

func (s *AbstractSideCar) Close() {
s._log("jitsu", "WARN", "Cancelling...")
s.cancelled.Store(true)
if s.outPipe != nil {
_ = s.outPipe.Close()
}
if s.errPipe != nil {
_ = s.errPipe.Close()
}
}

func main() {
Expand All @@ -43,6 +67,7 @@ func main() {
}

command := os.Getenv("COMMAND")
var sidecar SideCar
abstract := &AbstractSideCar{
syncId: os.Getenv("SYNC_ID"),
taskId: os.Getenv("TASK_ID"),
Expand All @@ -56,13 +81,22 @@ func main() {
startedAt: startedAt,
}
if command == "read" {
sidecar := &ReadSideCar{AbstractSideCar: abstract, tableNamePrefix: os.Getenv("TABLE_NAME_PREFIX")}
sidecar.Run()
sidecar = &ReadSideCar{AbstractSideCar: abstract, tableNamePrefix: os.Getenv("TABLE_NAME_PREFIX")}
} else {
sidecar := SpecCatalogSideCar{AbstractSideCar: abstract}
sidecar.Run()
sidecar = &SpecCatalogSideCar{AbstractSideCar: abstract}
}

sigs := make(chan os.Signal, 1)
signal.Notify(sigs, syscall.SIGTERM)

go func() {
sig := <-sigs
logging.Infof("Received signal: %s. Shutting down...", sig)
sidecar.Close()
}()

sidecar.Run()

}

func (s *AbstractSideCar) log(message string, args ...any) {
Expand Down
52 changes: 37 additions & 15 deletions sync-sidecar/read.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ type ActiveStream struct {
}

const interruptError = "Stream was interrupted. Check logs for errors."
const cancelledError = "Sync job was cancelled"

func NewActiveStream(name, mode string) *ActiveStream {
return &ActiveStream{name: name, mode: mode, StreamStat: &StreamStat{Status: "RUNNING"}}
Expand Down Expand Up @@ -95,12 +96,12 @@ func (s *ActiveStream) Commit(strict bool) (state bulker.State) {
return
}

func (s *ActiveStream) Close(complete, strict bool) (state bulker.State) {
func (s *ActiveStream) Close(complete, cancelled, strict bool) (state bulker.State) {
if complete {
state = s.Commit(strict)
} else {
state = s.Abort()
if s.Error == "" {
if s.Error == "" && !cancelled {
s.Error = utils.NvlString(s.errorFromLogs, interruptError)
s.noTrustworthyError = true
}
Expand All @@ -111,6 +112,13 @@ func (s *ActiveStream) Close(complete, strict bool) (state bulker.State) {
} else {
s.Status = "FAILED"
}
} else if cancelled {
if s.EventsCount > 0 {
s.Status = "PARTIAL"
s.Error = cancelledError
} else {
s.Status = "CANCELLED"
}
} else if s.Status == "RUNNING" {
s.Status = "SUCCESS"
}
Expand Down Expand Up @@ -178,17 +186,24 @@ func (s *ReadSideCar) Run() {
defer s.dbpool.Close()

defer func() {
cancelled := s.cancelled.Load()
if r := recover(); r != nil {
s.registerErr(fmt.Errorf("%v", r))
s.closeActiveStreams(false)
} else {
s.closeActiveStreams(!cancelled)
}
s.closeActiveStreams()
if len(s.processedStreams) > 0 {
statusMap := types2.NewOrderedMap[string, any]()
s.catalog.ForEach(func(streamName string, _ *Stream) {
if stream, ok := s.processedStreams[streamName]; ok {
statusMap.Set(streamName, stream.StreamStat)
} else {
statusMap.Set(streamName, &StreamStat{Status: "FAILED", Error: "Stream was not processed. Check logs for errors."})
if cancelled {
statusMap.Set(streamName, &StreamStat{Status: "CANCELLED"})
} else {
statusMap.Set(streamName, &StreamStat{Status: "FAILED", Error: "Stream was not processed. Check logs for errors."})
}
}
})
allSuccess := true
Expand All @@ -207,13 +222,17 @@ func (s *ReadSideCar) Run() {
status = "SUCCESS"
} else if allFailed {
status = "FAILED"
} else if cancelled {
status = "CANCELLED"
}

processedStreamsJson, _ := jsonorder.Marshal(statusMap)
s.sendFinalStatus(status, string(processedStreamsJson))
} else if s.isErr() {
s.sendFinalStatus("FAILED", "ERROR: "+s.firstErr.Error())
os.Exit(1)
} else if cancelled {
s.sendFinalStatus("CANCELLED", "")
} else {
s.sendFinalStatus("SUCCESS", "")
}
Expand Down Expand Up @@ -247,32 +266,35 @@ func (s *ReadSideCar) Run() {
}
var stdOutErrWaitGroup sync.WaitGroup

errPipe, _ := os.Open(s.stdErrPipeFile)
defer errPipe.Close()
s.errPipe, _ = os.Open(s.stdErrPipeFile)
defer s.errPipe.Close()
stdOutErrWaitGroup.Add(1)
// read from stderr
go func() {
defer stdOutErrWaitGroup.Done()
scanner := bufio.NewScanner(errPipe)
scanner := bufio.NewScanner(s.errPipe)
scanner.Buffer(make([]byte, 1024*10), 1024*1024*10)
for scanner.Scan() {
line := scanner.Text()
s.sourceLog("ERRSTD", line)
}
if err := scanner.Err(); err != nil {
if err := scanner.Err(); err != nil && !s.cancelled.Load() {
s.panic("error reading from err pipe: %v", err)
}
}()

s.processedStreams = map[string]*ActiveStream{}
outPipe, _ := os.Open(s.stdOutPipeFile)
defer outPipe.Close()
s.outPipe, _ = os.Open(s.stdOutPipeFile)
defer s.outPipe.Close()
if s.cancelled.Load() {
return
}
stdOutErrWaitGroup.Add(1)
// read from stdout
go func() {
defer stdOutErrWaitGroup.Done()

scanner := bufio.NewScanner(outPipe)
scanner := bufio.NewScanner(s.outPipe)
scanner.Buffer(make([]byte, 1024*10), 1024*1024*10)
for scanner.Scan() {
s.lastMessageTime.Store(time.Now().Unix())
Expand Down Expand Up @@ -311,7 +333,7 @@ func (s *ReadSideCar) Run() {
s.panic("not supported Airbyte message type: %s", row.Type)
}
}
if err := scanner.Err(); err != nil {
if err := scanner.Err(); err != nil && !s.cancelled.Load() {
s.panic("error reading from pipe: %v", err)
}
}()
Expand Down Expand Up @@ -420,7 +442,7 @@ func (s *ReadSideCar) checkpointIfNecessary(stream *ActiveStream) {

func (s *ReadSideCar) _closeStream(stream *ActiveStream, complete bool, strict bool) {
wasActive := stream.IsActive()
state := stream.Close(complete, strict)
state := stream.Close(complete, s.cancelled.Load(), strict)
if stream.Error != "" {
s.errprint("Stream '%s' bulker commit failed: %v", stream.name, stream.Error)
} else if wasActive {
Expand All @@ -433,10 +455,10 @@ func (s *ReadSideCar) _closeStream(stream *ActiveStream, complete bool, strict b
s.log("Stream '%s' closed: status: %s rows: %d", stream.name, stream.Status, stream.EventsCount)
}

func (s *ReadSideCar) closeActiveStreams() {
func (s *ReadSideCar) closeActiveStreams(complete bool) {
for _, stream := range s.processedStreams {
if stream.Status == "RUNNING" {
s._closeStream(stream, true, true)
s._closeStream(stream, complete, true)
}
}
}
Expand Down
Loading

0 comments on commit dbb285a

Please sign in to comment.