diff --git a/server/grpc/rpc.go b/server/grpc/rpc.go index c7ed7211f44..c4293c6abcd 100644 --- a/server/grpc/rpc.go +++ b/server/grpc/rpc.go @@ -71,15 +71,10 @@ func (s *RPC) Next(c context.Context, agentFilter rpc.Filter) (*rpc.Workflow, er for { // poll blocks until a task is available or the context is canceled / worker is kicked task, err := s.queue.Poll(c, agent.ID, filterFn) - if err != nil { + if err != nil || task == nil { return nil, err } - // poll returns nil task if context was canceled / worker was kicked - if task == nil { - return nil, nil - } - if task.ShouldRun() { workflow := new(rpc.Workflow) err = json.Unmarshal(task.Data, workflow) diff --git a/server/queue/fifo.go b/server/queue/fifo.go index 7c746e3dc3d..29e24d1035e 100644 --- a/server/queue/fifo.go +++ b/server/queue/fifo.go @@ -17,6 +17,7 @@ package queue import ( "container/list" "context" + "fmt" "sync" "time" @@ -36,7 +37,7 @@ type worker struct { agentID int64 filter FilterFn channel chan *model.Task - stop context.CancelFunc + stop context.CancelCauseFunc } type fifo struct { @@ -85,7 +86,7 @@ func (q *fifo) PushAtOnce(_ context.Context, tasks []*model.Task) error { // Poll retrieves and removes a task head of this queue. func (q *fifo) Poll(c context.Context, agentID int64, f FilterFn) (*model.Task, error) { q.Lock() - ctx, stop := context.WithCancel(c) + ctx, stop := context.WithCancelCause(c) w := &worker{ agentID: agentID, @@ -103,7 +104,7 @@ func (q *fifo) Poll(c context.Context, agentID int64, f FilterFn) (*model.Task, q.Lock() delete(q.workers, w) q.Unlock() - return nil, nil + return nil, ctx.Err() case t := <-w.channel: return t, nil } @@ -244,7 +245,7 @@ func (q *fifo) KickAgentWorkers(agentID int64) { for w := range q.workers { if w.agentID == agentID { - w.stop() + w.stop(fmt.Errorf("worker was kicked")) delete(q.workers, w) } }