
[BUG] Canceled task goes into retry state #532

Open
patriceshot opened this issue Sep 1, 2022 · 14 comments
Labels: bug (Something isn't working), investigate


patriceshot commented Sep 1, 2022

Describe the bug
Canceled task goes into retry state, and is consumed again.
If I return a SkipRetry error, it is ignored.

To Reproduce
Steps to reproduce the behavior (Code snippets if applicable):

  1. Start a worker
  2. Enqueue a task which starts a long running task
  3. Cancel the task (either with inspector or asynqmon)
  4. Task goes into retry state

Expected behavior
A canceled task should be archived or completed, we do not want it to be consumed again.

Code sample
Here's the code of my handler (the rest of the setup is standard):

func doWork(ctx context.Context, t *asynq.Task) error {
    for {
        select {
        case <-ctx.Done():
            log.Println("canceling inside DoWork")
            return nil
        default:
            time.Sleep(2 * time.Second)
            log.Printf("running... %v", time.Now())
        }
    }
}

func HandleStartPullTask(ctx context.Context, t *asynq.Task) error {
    c := make(chan error, 1)
    go func() {
        c <- doWork(ctx, t)
    }()
    select {
    case <-ctx.Done():
        // cancelation signal received, abandon this work.
        log.Println("canceling outside DoWork")
        return asynq.SkipRetry
    case res := <-c:
        return res
    }
}

Environment (please complete the following information):

  • OS: Linux
  • Version of asynq package: v0.23.0

Additional context
When I watch the error in processor.go's handleFailedMessage, the error is "context canceled". Even if I return a SkipRetry error from my task, it is ignored. Naively, I would add "context canceled" to the condition that skips retry, but perhaps I'm missing something?

@patriceshot patriceshot added the bug Something isn't working label Sep 1, 2022

dxl3811051 commented Sep 14, 2022

I ran into this problem too; I thought I was the only one. I wanted to terminate the task and had set the retry count to 0, but when I cancel the task it starts again, and the original task doesn't stop either, so in the end two tasks are running...

linhbkhn95 (Contributor) commented Sep 21, 2022

From my reading of the asynq code: when a message is assigned to a worker, a cancel() function is also created and registered for it.

(diagram image omitted)

In code:

ctx, cancel := asynqcontext.New(p.baseCtxFn(), msg, deadline)
p.cancelations.Add(msg.ID, cancel)
defer func() {
	cancel()
	p.cancelations.Delete(msg.ID)
}()
....
// New returns a context and cancel function for a given task message.
func New(base context.Context, msg *base.TaskMessage, deadline time.Time) (context.Context, context.CancelFunc) {
	metadata := taskMetadata{
		id:         msg.ID,
		maxRetry:   msg.Retry,
		retryCount: msg.Retried,
		qname:      msg.Queue,
	}
	ctx := context.WithValue(base, metadataCtxKey, metadata)
	return context.WithDeadline(ctx, deadline)
}

When you cancel the task (either with the inspector or asynqmon), asynq publishes the task ID to the asynq:cancel channel:

// PublishCancelation publish cancelation message to all subscribers.
// The message is the ID for the task to be canceled.
func (r *RDB) PublishCancelation(id string) error {
	var op errors.Op = "rdb.PublishCancelation"
	ctx := context.Background()
	if err := r.client.Publish(ctx, base.CancelChannel, id).Err(); err != nil {
		return errors.E(op, errors.Unknown, fmt.Sprintf("redis pubsub publish error: %v", err))
	}
	return nil
}

The subscriber then receives the message and invokes the corresponding cancel() func:

for {
	pubsub, err = s.broker.CancelationPubSub()
	if err != nil {
		s.logger.Errorf("cannot subscribe to cancelation channel: %v", err)
		select {
		case <-time.After(s.retryTimeout):
			continue
		case <-s.done:
			s.logger.Debug("Subscriber done")
			return
		}
	}
	break
}
cancelCh := pubsub.Channel()
for {
	select {
	case <-s.done:
		pubsub.Close()
		s.logger.Debug("Subscriber done")
		return
	case msg := <-cancelCh:
		cancel, ok := s.cancelations.Get(msg.Payload)
		if ok {
			cancel()
		}
	}
}

Therefore, asynq invokes the cancel func and the processor immediately records context.Canceled instead of SkipRetry. In other words, your handler's SkipRetry return value is never read, so your message does not move to the archived status.

func (p *processor) handleFailedMessage(ctx context.Context, l *base.Lease, msg *base.TaskMessage, err error) {
	if p.errHandler != nil {
		p.errHandler.HandleError(ctx, NewTask(msg.Type, msg.Payload), err)
	}
	if !p.isFailureFunc(err) {
		// retry the task without marking it as failed
		p.retry(l, msg, err, false /*isFailure*/)
		return
	}
	if msg.Retried >= msg.Retry || errors.Is(err, SkipRetry) {
		p.logger.Warnf("Retry exhausted for task id=%s", msg.ID)
		p.archive(l, msg, err)
	} else {
		p.retry(l, msg, err, true /*isFailure*/)
	}
}

I think this may be the author's intention.
Please confirm whether I'm misunderstanding, @hibiken.

dxl3811051 commented:

Why? I canceled the task, but the task ran again.

linhbkhn95 (Contributor) commented Sep 21, 2022

I think task canceling here means stopping the work for now and acting on it again at another time. That is, the status goes Running -> Stopped, not Running -> Archived.

linhbkhn95 (Contributor) commented:

If @dxl3811051 wants the behavior you expect, I can implement it in my fork; then you can use that for now. We need the author's confirmation to merge it into this repo.

dxl3811051 commented:

> If @dxl3811051 wants the behavior you expect, I can implement it in my fork; then you can use that for now. We need the author's confirmation to merge it into this repo.

thanks

OAyomide commented:

@linhbkhn95 how do you move it to archived then? I tried canceling the task but it doesn't work. It says I should use CancelTask, and I cannot see a reference to this anywhere.

linhbkhn95 (Contributor) commented Dec 3, 2022

We cannot move a task directly to the archived state; a task is only archived when its retry count reaches the limit. If you need this, I can improve it in my fork instead of this repo. @OAyomide


hungtcs commented Aug 9, 2023

+1, same issue. Are there any existing solutions?


agorman commented Feb 29, 2024

I'm running into this issue as well. It would be trivial to write middleware that checks whether the context has been canceled and returns asynq.SkipRetry. However, the context may have been canceled for other reasons, for example a timeout or a deadline.

I propose that instead of using a context.CancelFunc to cancel the task internally, you use a context.CancelCauseFunc. That way we could use context.Cause(ctx) to see why it was canceled and return asynq.SkipRetry accordingly.

Thoughts?


agorman commented Mar 1, 2024

After more investigation, my solution above will not work. Asynq returns the context.Canceled error right away and does not wait for the task to finish or read its returned error. This was also explained above by @linhbkhn95, but I missed it.

I think it would be nice if we could have a mechanism to cancel and skip retry via the inspector.

kamikazechaser (Collaborator) commented:

I don't think we can have a case where any context.Canceled error pushes a task into the archived state here. The potential side effects are just too many.

Ideally we want something in handleFailedMessage that either reads the task message metadata (set immediately before signaling context cancellation) or a distinct error type.

handsomefox commented:

As a temporary solution until the library itself solves this issue, I've ended up using this code to cancel tasks:

func (t *Tasker) CancelTask(queue, id string) error {
	// Signal cancelation to the worker currently processing the task.
	if err := t.inspector.CancelProcessing(id); err != nil {
		return err
	}

	// After cancelation the task transiently sits in the retry set, so
	// poll ArchiveTask until it succeeds or the task disappears.
	for range 100 {
		err := t.inspector.ArchiveTask(queue, id)
		if err == nil {
			break
		}
		if errors.Is(err, asynq.ErrTaskNotFound) || errors.Is(err, asynq.ErrQueueNotFound) {
			break
		}
		time.Sleep(time.Millisecond * 100)
	}

	return nil
}

It might not work if you have no retry delay set, so use at your own risk.

alexshopee commented:

Suppose there is a long-running task that can no longer succeed; then the best practice is to just cancel it, without any retries, to save worker resources.
