Skip to content

Commit

Permalink
fix lint
Browse files Browse the repository at this point in the history
  • Loading branch information
anbraten committed Feb 12, 2024
1 parent 2afb499 commit a48ded3
Show file tree
Hide file tree
Showing 2 changed files with 6 additions and 10 deletions.
7 changes: 1 addition & 6 deletions server/grpc/rpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,15 +71,10 @@ func (s *RPC) Next(c context.Context, agentFilter rpc.Filter) (*rpc.Workflow, er
for {
// poll blocks until a task is available or the context is canceled / worker is kicked
task, err := s.queue.Poll(c, agent.ID, filterFn)
if err != nil {
if err != nil || task == nil {

Check warning on line 74 in server/grpc/rpc.go

View check run for this annotation

Codecov / codecov/patch

server/grpc/rpc.go#L71-L74

Added lines #L71 - L74 were not covered by tests
return nil, err
}

// poll returns nil task if context was canceled / worker was kicked
if task == nil {
return nil, nil
}

if task.ShouldRun() {
workflow := new(rpc.Workflow)
err = json.Unmarshal(task.Data, workflow)
Expand Down
9 changes: 5 additions & 4 deletions server/queue/fifo.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package queue
import (
"container/list"
"context"
"fmt"
"sync"
"time"

Expand All @@ -36,7 +37,7 @@ type worker struct {
agentID int64
filter FilterFn
channel chan *model.Task
stop context.CancelFunc
stop context.CancelCauseFunc
}

type fifo struct {
Expand Down Expand Up @@ -85,7 +86,7 @@ func (q *fifo) PushAtOnce(_ context.Context, tasks []*model.Task) error {
// Poll retrieves and removes a task head of this queue.
func (q *fifo) Poll(c context.Context, agentID int64, f FilterFn) (*model.Task, error) {
q.Lock()
ctx, stop := context.WithCancel(c)
ctx, stop := context.WithCancelCause(c)

w := &worker{
agentID: agentID,
Expand All @@ -103,7 +104,7 @@ func (q *fifo) Poll(c context.Context, agentID int64, f FilterFn) (*model.Task,
q.Lock()
delete(q.workers, w)
q.Unlock()
return nil, nil
return nil, ctx.Err()

Check warning on line 107 in server/queue/fifo.go

View check run for this annotation

Codecov / codecov/patch

server/queue/fifo.go#L107

Added line #L107 was not covered by tests
case t := <-w.channel:
return t, nil
}
Expand Down Expand Up @@ -244,7 +245,7 @@ func (q *fifo) KickAgentWorkers(agentID int64) {

for w := range q.workers {
if w.agentID == agentID {
w.stop()
w.stop(fmt.Errorf("worker was kicked"))
delete(q.workers, w)
}

Check warning on line 250 in server/queue/fifo.go

View check run for this annotation

Codecov / codecov/patch

server/queue/fifo.go#L242-L250

Added lines #L242 - L250 were not covered by tests
}
Expand Down

0 comments on commit a48ded3

Please sign in to comment.