From dbb285a74268b4b42df21dcce3159b62ae123f74 Mon Sep 17 00:00:00 2001 From: Ildar Nurislamov Date: Thu, 18 Jul 2024 19:56:47 +0400 Subject: [PATCH] sync: support for cancelling of running tasks --- sync-controller/job_runner.go | 18 ++++++++++-- sync-controller/router.go | 1 + sync-controller/task.go | 3 ++ sync-controller/task_manager.go | 13 +++++++-- sync-sidecar/db/db.go | 15 +++++++--- sync-sidecar/main.go | 42 +++++++++++++++++++++++--- sync-sidecar/read.go | 52 +++++++++++++++++++++++---------- sync-sidecar/spec_catalog.go | 21 ++++++++----- 8 files changed, 129 insertions(+), 36 deletions(-) diff --git a/sync-controller/job_runner.go b/sync-controller/job_runner.go index 34ce1744..7e74f683 100644 --- a/sync-controller/job_runner.go +++ b/sync-controller/job_runner.go @@ -260,17 +260,22 @@ func (j *JobRunner) CreateCronJob(taskDescriptor TaskDescriptor, configuration * taskStatus.Description = err.Error() } else { taskStatus.Status = StatusCreated + taskStatus.Description = "Starting sync job..." taskStatus.PodName = cronJob.Name } j.sendStatus(&taskStatus) return taskStatus } +func PodName(syncId, taskId, pkg string) string { + taskId = utils.NvlString(taskId, uuid.NewLettersNumbers()) + podId := utils.JoinNonEmptyStrings(".", syncId, taskId) + return strings.ToLower(nonAlphaNum.ReplaceAllLiteralString(pkg, "-") + "-" + podId) +} + func (j *JobRunner) CreatePod(taskDescriptor TaskDescriptor, configuration *TaskConfiguration) TaskStatus { taskStatus := TaskStatus{TaskDescriptor: taskDescriptor} - taskId := utils.NvlString(taskDescriptor.TaskID, uuid.NewLettersNumbers()) - podId := utils.JoinNonEmptyStrings(".", taskStatus.SyncID, taskId) - podName := strings.ToLower(nonAlphaNum.ReplaceAllLiteralString(taskDescriptor.Package, "-") + "-" + podId) + podName := taskDescriptor.PodName() if !configuration.IsEmpty() { secret := j.createSecret(podName, taskDescriptor, configuration) _, err := j.clientset.CoreV1().Secrets(j.namespace).Create(context.Background(), secret, metav1.CreateOptions{}) @@ -288,6 +293,7 @@ func (j *JobRunner) CreatePod(taskDescriptor TaskDescriptor, configuration *Task taskStatus.Description = err.Error() } else { taskStatus.Status = StatusCreated + taskStatus.Description = "Starting sync job..." taskStatus.PodName = pod.Name } j.sendStatus(&taskStatus) @@ -496,6 +502,12 @@ func (j *JobRunner) createCronJob(jobId string, task TaskDescriptor, configurati return cronJob } +func (j *JobRunner) TerminatePod(podName string) { + _ = j.clientset.CoreV1().Pods(j.namespace).Delete(context.Background(), podName, metav1.DeleteOptions{}) + _ = j.clientset.CoreV1().Secrets(j.namespace).Delete(context.Background(), podName+"-config", metav1.DeleteOptions{}) + _ = j.clientset.CoreV1().ConfigMaps(j.namespace).Delete(context.Background(), podName+"-config", metav1.DeleteOptions{}) +} + func (j *JobRunner) createPod(podName string, task TaskDescriptor, configuration *TaskConfiguration) *v1.Pod { var command string switch task.TaskType { diff --git a/sync-controller/router.go b/sync-controller/router.go index f764b203..77c2e26f 100644 --- a/sync-controller/router.go +++ b/sync-controller/router.go @@ -22,6 +22,7 @@ func NewRouter(appContext *Context) *Router { engine.POST("/check", appContext.taskManager.CheckHandler) engine.POST("/discover", appContext.taskManager.DiscoverHandler) engine.POST("/read", appContext.taskManager.ReadHandler) + engine.GET("/cancel", appContext.taskManager.CancelHandler) engine.GET("/health", func(c *gin.Context) { c.JSON(http.StatusOK, gin.H{"status": "pass"}) diff --git a/sync-controller/task.go b/sync-controller/task.go index 35fa2818..fd436299 100644 --- a/sync-controller/task.go +++ b/sync-controller/task.go @@ -22,6 +22,9 @@ type TaskDescriptor struct { StartedAt string `json:"startedAt"` } +func (t *TaskDescriptor) PodName() string { + return PodName(t.SyncID, t.TaskID, t.Package) +} func (t *TaskDescriptor) StartedAtTime() time.Time { tm, err := time.Parse(time.RFC3339, t.StartedAt) if err != nil { diff --git a/sync-controller/task_manager.go b/sync-controller/task_manager.go index 6c3dcc4d..03cc016b 100644 --- a/sync-controller/task_manager.go +++ b/sync-controller/task_manager.go @@ -95,6 +95,15 @@ func (t *TaskManager) DiscoverHandler(c *gin.Context) { c.JSON(http.StatusOK, gin.H{"ok": true}) } +func (t *TaskManager) CancelHandler(c *gin.Context) { + pkg := c.Query("package") + syncId := c.Query("syncId") + taskId := c.Query("taskId") + _ = db.UpdateRunningTaskStatus(t.dbpool, taskId, "CANCELLED") + t.jobRunner.TerminatePod(PodName(syncId, taskId, pkg)) + c.JSON(http.StatusOK, gin.H{"ok": true}) +} + func (t *TaskManager) ReadHandler(c *gin.Context) { taskConfig := TaskConfiguration{} err := jsonorder.NewDecoder(c.Request.Body).Decode(&taskConfig) @@ -188,9 +197,9 @@ func (t *TaskManager) listenTaskStatus() { case "read": switch st.Status { case StatusCreateFailed, StatusFailed, StatusInitTimeout: - err = db.UpsertRunningTask(t.dbpool, st.SyncID, st.TaskID, st.Package, st.PackageVersion, st.StartedAtTime(), "FAILED", strings.Join([]string{string(st.Status), st.Description}, ": ")) + err = db.UpsertRunningTask(t.dbpool, st.SyncID, st.TaskID, st.Package, st.PackageVersion, st.StartedAtTime(), "FAILED", strings.Join([]string{string(st.Status), st.Description}, ": "), st.StartedBy) case StatusCreated: - err = db.UpsertRunningTask(t.dbpool, st.SyncID, st.TaskID, st.Package, st.PackageVersion, st.StartedAtTime(), "RUNNING", strings.Join([]string{string(st.Status), st.Description}, ": ")) + err = db.UpsertRunningTask(t.dbpool, st.SyncID, st.TaskID, st.Package, st.PackageVersion, st.StartedAtTime(), "RUNNING", strings.Join([]string{string(st.Status), st.Description}, ": "), st.StartedBy) case StatusRunning: err = db.UpdateRunningTaskDate(t.dbpool, st.TaskID) default: diff --git a/sync-sidecar/db/db.go b/sync-sidecar/db/db.go index d4c4e57a..85d07db0 100644 --- a/sync-sidecar/db/db.go +++ b/sync-sidecar/db/db.go @@ -28,11 +28,13 @@ ON CONFLICT ON CONSTRAINT source_state_pkey DO UPDATE SET state=$3, timestamp = upsertTaskSQL = `INSERT INTO source_task (sync_id, task_id, package, version, started_at, updated_at, status, description) VALUES ($1, $2, $3, $4, $5, $6, $7, $8 ) ON CONFLICT ON CONSTRAINT source_task_pkey DO UPDATE SET updated_at=$6, status = $7, description=$8` - upsertRunningTaskSQL = `INSERT INTO source_task as st (sync_id, task_id, package, version, started_at, updated_at, status, description) VALUES ($1, $2, $3, $4, $5, $6, $7, $8 ) -ON CONFLICT ON CONSTRAINT source_task_pkey DO UPDATE SET updated_at=$6, status = $7, description=$8 where st.status = 'RUNNING'` + upsertRunningTaskSQL = `INSERT INTO source_task as st (sync_id, task_id, package, version, started_at, updated_at, status, description, started_by) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9) +ON CONFLICT ON CONSTRAINT source_task_pkey DO UPDATE SET updated_at=$6, status = $7, description=$8, started_by=$9 where st.status = 'RUNNING'` updateRunningTaskDateSQL = `UPDATE source_task SET updated_at=$2 where task_id=$1 and status = 'RUNNING'` + updateRunningTaskStatusSQL = `UPDATE source_task SET status=$2 where task_id=$1 and status = 'RUNNING'` + upsertCheckSQL = `INSERT INTO source_check (package, version, key, status, description, timestamp) VALUES ($1, $2, $3, $4, $5, $6) ON CONFLICT ON CONSTRAINT source_check_pkey DO UPDATE SET status = $4, description=$5, timestamp=$6` @@ -79,8 +81,8 @@ func UpsertTask(dbpool *pgxpool.Pool, syncId, taskId, packageName, packageVersio return err } -func UpsertRunningTask(dbpool *pgxpool.Pool, syncId, taskId, packageName, packageVersion string, startedAt time.Time, status, description string) error { - _, err := dbpool.Exec(context.Background(), upsertRunningTaskSQL, syncId, taskId, packageName, packageVersion, startedAt, time.Now(), status, description) +func UpsertRunningTask(dbpool *pgxpool.Pool, syncId, taskId, packageName, packageVersion string, startedAt time.Time, status, description, startedBy string) error { + _, err := dbpool.Exec(context.Background(), upsertRunningTaskSQL, syncId, taskId, packageName, packageVersion, startedAt, time.Now(), status, description, startedBy) return err } @@ -89,6 +91,11 @@ func UpdateRunningTaskDate(dbpool *pgxpool.Pool, taskId string) error { return err } +func UpdateRunningTaskStatus(dbpool *pgxpool.Pool, taskId, status string) error { + _, err := dbpool.Exec(context.Background(), updateRunningTaskStatusSQL, taskId, status) + return err +} + func UpsertCheck(dbpool *pgxpool.Pool, packageName, packageVersion, storageKey, status, description string, timestamp time.Time) error { _, err := dbpool.Exec(context.Background(), upsertCheckSQL, packageName, packageVersion, storageKey, status, description, timestamp) return err diff --git a/sync-sidecar/main.go b/sync-sidecar/main.go index bf260825..11957709 100644 --- a/sync-sidecar/main.go +++ b/sync-sidecar/main.go @@ -8,12 +8,21 @@ import ( _ "github.com/jitsucom/bulker/bulkerlib/implementations/api_based" _ "github.com/jitsucom/bulker/bulkerlib/implementations/file_storage" _ "github.com/jitsucom/bulker/bulkerlib/implementations/sql" + "github.com/jitsucom/bulker/jitsubase/logging" "github.com/jitsucom/bulker/sync-sidecar/db" "os" + "os/signal" "strings" + "sync/atomic" + "syscall" "time" ) +type SideCar interface { + Run() + Close() +} + type AbstractSideCar struct { syncId string taskId string @@ -34,6 +43,21 @@ type AbstractSideCar struct { //first error occurred during command firstErr error + + errPipe *os.File + outPipe *os.File + cancelled atomic.Bool +} + +func (s *AbstractSideCar) Close() { + s._log("jitsu", "WARN", "Cancelling...") + s.cancelled.Store(true) + if s.outPipe != nil { + _ = s.outPipe.Close() + } + if s.errPipe != nil { + _ = s.errPipe.Close() + } } func main() { @@ -43,6 +67,7 @@ func main() { } command := os.Getenv("COMMAND") + var sidecar SideCar abstract := &AbstractSideCar{ syncId: os.Getenv("SYNC_ID"), taskId: os.Getenv("TASK_ID"), @@ -56,13 +81,22 @@ func main() { startedAt: startedAt, } if command == "read" { - sidecar := &ReadSideCar{AbstractSideCar: abstract, tableNamePrefix: os.Getenv("TABLE_NAME_PREFIX")} - sidecar.Run() + sidecar = &ReadSideCar{AbstractSideCar: abstract, tableNamePrefix: os.Getenv("TABLE_NAME_PREFIX")} } else { - sidecar := SpecCatalogSideCar{AbstractSideCar: abstract} - sidecar.Run() + sidecar = &SpecCatalogSideCar{AbstractSideCar: abstract} } + sigs := make(chan os.Signal, 1) + signal.Notify(sigs, syscall.SIGTERM) + + go func() { + sig := <-sigs + logging.Infof("Received signal: %s. Shutting down...", sig) + sidecar.Close() + }() + + sidecar.Run() + } func (s *AbstractSideCar) log(message string, args ...any) { diff --git a/sync-sidecar/read.go b/sync-sidecar/read.go index 2c68a176..bd989773 100644 --- a/sync-sidecar/read.go +++ b/sync-sidecar/read.go @@ -38,6 +38,7 @@ type ActiveStream struct { } const interruptError = "Stream was interrupted. Check logs for errors." +const cancelledError = "Sync job was cancelled" func NewActiveStream(name, mode string) *ActiveStream { return &ActiveStream{name: name, mode: mode, StreamStat: &StreamStat{Status: "RUNNING"}} @@ -95,12 +96,12 @@ func (s *ActiveStream) Commit(strict bool) (state bulker.State) { return } -func (s *ActiveStream) Close(complete, strict bool) (state bulker.State) { +func (s *ActiveStream) Close(complete, cancelled, strict bool) (state bulker.State) { if complete { state = s.Commit(strict) } else { state = s.Abort() - if s.Error == "" { + if s.Error == "" && !cancelled { s.Error = utils.NvlString(s.errorFromLogs, interruptError) s.noTrustworthyError = true } @@ -111,6 +112,13 @@ func (s *ActiveStream) Close(complete, strict bool) (state bulker.State) { } else { s.Status = "FAILED" } + } else if cancelled { + if s.EventsCount > 0 { + s.Status = "PARTIAL" + s.Error = cancelledError + } else { + s.Status = "CANCELLED" + } } else if s.Status == "RUNNING" { s.Status = "SUCCESS" } @@ -178,17 +186,24 @@ func (s *ReadSideCar) Run() { defer s.dbpool.Close() defer func() { + cancelled := s.cancelled.Load() if r := recover(); r != nil { s.registerErr(fmt.Errorf("%v", r)) + s.closeActiveStreams(false) + } else { + s.closeActiveStreams(!cancelled) } - s.closeActiveStreams() if len(s.processedStreams) > 0 { statusMap := types2.NewOrderedMap[string, any]() s.catalog.ForEach(func(streamName string, _ *Stream) { if stream, ok := s.processedStreams[streamName]; ok { statusMap.Set(streamName, stream.StreamStat) } else { - statusMap.Set(streamName, &StreamStat{Status: "FAILED", Error: "Stream was not processed. Check logs for errors."}) + if cancelled { + statusMap.Set(streamName, &StreamStat{Status: "CANCELLED"}) + } else { + statusMap.Set(streamName, &StreamStat{Status: "FAILED", Error: "Stream was not processed. Check logs for errors."}) + } } }) allSuccess := true @@ -207,6 +222,8 @@ func (s *ReadSideCar) Run() { status = "SUCCESS" } else if allFailed { status = "FAILED" + } else if cancelled { + status = "CANCELLED" } processedStreamsJson, _ := jsonorder.Marshal(statusMap) @@ -214,6 +231,8 @@ func (s *ReadSideCar) Run() { } else if s.isErr() { s.sendFinalStatus("FAILED", "ERROR: "+s.firstErr.Error()) os.Exit(1) + } else if cancelled { + s.sendFinalStatus("CANCELLED", "") } else { s.sendFinalStatus("SUCCESS", "") } @@ -247,32 +266,35 @@ func (s *ReadSideCar) Run() { } var stdOutErrWaitGroup sync.WaitGroup - errPipe, _ := os.Open(s.stdErrPipeFile) - defer errPipe.Close() + s.errPipe, _ = os.Open(s.stdErrPipeFile) + defer s.errPipe.Close() stdOutErrWaitGroup.Add(1) // read from stderr go func() { defer stdOutErrWaitGroup.Done() - scanner := bufio.NewScanner(errPipe) + scanner := bufio.NewScanner(s.errPipe) scanner.Buffer(make([]byte, 1024*10), 1024*1024*10) for scanner.Scan() { line := scanner.Text() s.sourceLog("ERRSTD", line) } - if err := scanner.Err(); err != nil { + if err := scanner.Err(); err != nil && !s.cancelled.Load() { s.panic("error reading from err pipe: %v", err) } }() s.processedStreams = map[string]*ActiveStream{} - outPipe, _ := os.Open(s.stdOutPipeFile) - defer outPipe.Close() + s.outPipe, _ = os.Open(s.stdOutPipeFile) + defer s.outPipe.Close() + if s.cancelled.Load() { + return + } stdOutErrWaitGroup.Add(1) // read from stdout go func() { defer stdOutErrWaitGroup.Done() - scanner := bufio.NewScanner(outPipe) + scanner := bufio.NewScanner(s.outPipe) scanner.Buffer(make([]byte, 1024*10), 1024*1024*10) for scanner.Scan() { s.lastMessageTime.Store(time.Now().Unix()) @@ -311,7 +333,7 @@ func (s *ReadSideCar) Run() { s.panic("not supported Airbyte message type: %s", row.Type) } } - if err := scanner.Err(); err != nil { + if err := scanner.Err(); err != nil && !s.cancelled.Load() { s.panic("error reading from pipe: %v", err) } }() @@ -420,7 +442,7 @@ func (s *ReadSideCar) checkpointIfNecessary(stream *ActiveStream) { func (s *ReadSideCar) _closeStream(stream *ActiveStream, complete bool, strict bool) { wasActive := stream.IsActive() - state := stream.Close(complete, strict) + state := stream.Close(complete, s.cancelled.Load(), strict) if stream.Error != "" { s.errprint("Stream '%s' bulker commit failed: %v", stream.name, stream.Error) } else if wasActive { @@ -433,10 +455,10 @@ func (s *ReadSideCar) _closeStream(stream *ActiveStream, complete bool, strict b s.log("Stream '%s' closed: status: %s rows: %d", stream.name, stream.Status, stream.EventsCount) } -func (s *ReadSideCar) closeActiveStreams() { +func (s *ReadSideCar) closeActiveStreams(complete bool) { for _, stream := range s.processedStreams { if stream.Status == "RUNNING" { - s._closeStream(stream, true, true) + s._closeStream(stream, complete, true) } } } diff --git a/sync-sidecar/spec_catalog.go b/sync-sidecar/spec_catalog.go index 422b3037..4bdfacb5 100644 --- a/sync-sidecar/spec_catalog.go +++ b/sync-sidecar/spec_catalog.go @@ -32,6 +32,8 @@ func (s *SpecCatalogSideCar) Run() { } else if s.isErr() { s.sendStatus(s.command, "FAILED", s.firstErr.Error()) os.Exit(1) + } else if s.cancelled.Load() { + s.sendStatus(s.command, "CANCELLED", "") } else { s.sendStatus(s.command, "SUCCESS", "") } @@ -39,31 +41,34 @@ func (s *SpecCatalogSideCar) Run() { s.log("Sidecar. command: %s, taskId: %s, package: %s:%s startedAt: %s", s.command, s.taskId, s.packageName, s.packageVersion, s.startedAt.Format(time.RFC3339)) var stdOutErrWaitGroup sync.WaitGroup - errPipe, _ := os.Open(s.stdErrPipeFile) - defer errPipe.Close() + s.errPipe, _ = os.Open(s.stdErrPipeFile) + defer s.errPipe.Close() stdOutErrWaitGroup.Add(1) // read from stderr go func() { defer stdOutErrWaitGroup.Done() - scanner := bufio.NewScanner(errPipe) + scanner := bufio.NewScanner(s.errPipe) scanner.Buffer(make([]byte, 1024*10), 1024*1024*10) for scanner.Scan() { line := scanner.Text() s.sourceLog("ERRSTD", line) } - if err := scanner.Err(); err != nil { + if err := scanner.Err(); err != nil && !s.cancelled.Load() { s.panic("error reading from err pipe: %v", err) } }() - outPipe, _ := os.Open(s.stdOutPipeFile) - defer outPipe.Close() + s.outPipe, _ = os.Open(s.stdOutPipeFile) + defer s.outPipe.Close() + if s.cancelled.Load() { + return + } stdOutErrWaitGroup.Add(1) // read from stdout go func() { defer stdOutErrWaitGroup.Done() - scanner := bufio.NewScanner(outPipe) + scanner := bufio.NewScanner(s.outPipe) scanner.Buffer(make([]byte, 1024*10), 1024*1024*10) for scanner.Scan() { line := scanner.Bytes() @@ -93,7 +98,7 @@ func (s *SpecCatalogSideCar) Run() { s.panic("not supported type: %s", row.Type) } } - if err := scanner.Err(); err != nil { + if err := scanner.Err(); err != nil && !s.cancelled.Load() { s.panic("error reading from pipe: %v", err) } }()