Omission of Delayed creation for RepeatJob #2744

Closed
Feelw00 opened this issue Aug 29, 2024 · 10 comments · Fixed by #2778

Feelw00 commented Aug 29, 2024

Hi,

After prioritizing the execution of a RepeatJob through Promote, the creation of Delayed is omitted when the server restarts.

There is a RepeatJob scheduled to run every day at 09:00. If I promote this job through the Bull Dashboard so that it runs early and then restart the server, the next job is not created in Delayed.

This issue arises because, due to the nature of RepeatJob, a Completed Job with the same ID already exists.

When the getNextMillis function in the src/classes/repeat.ts file calculates the next execution time, it generates an ID based on the current time without checking for existing Completed Jobs, which leads to this issue.

To resolve this issue, I modified the process to check for Completed Jobs using the getCompleted method of the QueueGetters class before executing the add method of the Queue class.

This allows us to retrieve the prevMillis value of the most recent job and include it in the opts object when creating a new job with the add method.

When using RepeatJob, this issue may cause regularly scheduled jobs to be missed. I would appreciate it if you could consider adding a fix for this to the core.

Here is example code that resolves the issue.

  // src/classes/queue.ts
  async add(
    name: NameType,
    data: DataType,
    opts?: JobsOptions,
  ): Promise<Job<DataType, ResultType, NameType>> {
    if (opts && opts.repeat) {
      if (opts.repeat.endDate) {
        if (+new Date(opts.repeat.endDate) < Date.now()) {
          throw new Error('End date must be greater than current timestamp');
        }
      }
      
      // Retrieve the prevMillis value of the most recent Completed Job using the getCompleted method.
      opts.prevMillis = opts.prevMillis || (await this.getCompleted())[0]?.opts.prevMillis || 0;

      return (await this.repeat).addNextRepeatableJob<
        DataType,
        ResultType,
        NameType
      >(name, data, { ...this.jobsOpts, ...opts }, true);
    } else {
      const jobId = opts?.jobId;

      if (jobId == '0' || jobId?.startsWith('0:')) {
        throw new Error("JobId cannot be '0' or start with 0:");
      }

      const job = await this.Job.create<DataType, ResultType, NameType>(
        this as MinimalQueue,
        name,
        data,
        {
          ...this.jobsOpts,
          ...opts,
          jobId,
        },
      );
      this.emit('waiting', job);
      return job;
    }
  }
@hauserkristof

Second this. We faced the same issue!

@Feelw00
Author

Feelw00 commented Sep 10, 2024

To address this issue, when adding to the Queue, you can explicitly include prevMillis by using getCompleted to retrieve the previous completion date.

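import { Queue } from 'bullmq';

class RepeatJobService {
  constructor(private readonly queue: Queue) {}

  async startJob(jobName: string, jobData: unknown) {
    // Reuse prevMillis from the most recent completed job, if there is one.
    const [lastCompleted] = await this.queue.getCompleted();
    const options = {
      // The cron expression goes in the `pattern` field of the repeat options.
      repeat: { pattern: '0 * * * *' },
      // Additional Options
      prevMillis: lastCompleted?.opts.prevMillis || 0,
    };

    return await this.queue.add(jobName, jobData, options);
  }
}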

I believe that when using RepeatJob, the next job should be created based on the most recently completed job by default.
This can be achieved without major modifications to the core logic, simply by supporting the prevMillis parameter.

However, if modifying the core logic is difficult due to potential side effects, a viable alternative would be to include this information in the official documentation and comments.

@roggervalf
Collaborator

Hi guys, I tried to replicate your case but couldn't. Could you please take a look at the test case I added and let me know how you are reproducing it?

@Feelw00
Author

Feelw00 commented Sep 12, 2024

@roggervalf

Hi,

The issue occurs within the queue.add() method.

Specifically, it happens in the updateRepeatableJob() method inside the queue.add() method, where the problem arises in the getNextMillis() function while calculating the nextMillis variable.

Below is a portion of the updateRepeatableJob() method.

// Check if we reached the end date of the repeatable job
let now = Date.now();
const { endDate } = repeatOpts;
if (!(typeof endDate === undefined) && now > new Date(endDate!).getTime()) {
  return;
}

const prevMillis = opts.prevMillis || 0;
now = prevMillis < now ? now : prevMillis;

const nextMillis = await this.repeatStrategy(now, repeatOpts, name);
const { every, pattern } = repeatOpts;

If the opts object does not have a prevMillis field, prevMillis falls back to 0 and now stays at the current time.
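
As a rough illustration of that clamp, the sketch below isolates the single line `now = prevMillis < now ? now : prevMillis;` from the excerpt above. The helper name `resolveBaseTime` and the sample timestamps are made up for this example and are not part of BullMQ.

```typescript
// Mirrors the quoted line: now = prevMillis < now ? now : prevMillis;
// `resolveBaseTime` is a hypothetical helper name, not a BullMQ function.
function resolveBaseTime(now: number, prevMillis?: number): number {
  const prev = prevMillis || 0;
  return prev < now ? now : prev;
}

const HOUR = 60 * 60 * 1000;
const currentTime = 100 * HOUR;             // arbitrary "current time" for illustration
const scheduledAt = currentTime + 3 * HOUR; // a timestamp that is still in the future

// Without prevMillis the base is simply the current time.
console.log(resolveBaseTime(currentTime) === currentTime); // true

// With a later prevMillis the base moves forward to that value.
console.log(resolveBaseTime(currentTime, scheduledAt) === scheduledAt); // true
```

So the base time passed to the repeat strategy only moves past the current time when a later prevMillis is supplied.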

Below is the getNextMillis() function.

export const getNextMillis = (
  millis: number,
  opts: RepeatOptions,
): number | undefined => {
  const pattern = opts.pattern;
  if (pattern && opts.every) {
    throw new Error(
      'Both .pattern and .every options are defined for this repeatable job',
    );
  }

  if (opts.every) {
    return (
      Math.floor(millis / opts.every) * opts.every +
      (opts.immediately ? 0 : opts.every)
    );
  }

  const currentDate =
    opts.startDate && new Date(opts.startDate) > new Date(millis)
      ? new Date(opts.startDate)
      : new Date(millis);
  const interval = parseExpression(pattern, {
    ...opts,
    currentDate,
  });

  try {
    if(opts.immediately){
      return new Date().getTime();
    } else {
      return interval.next().getTime();
    }
  } catch (e) {
    // Ignore error
  }
};

Here, it takes millis as a parameter to calculate currentDate, which is then used to determine the next job's time.
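
To make the effect concrete, here is a minimal sketch that uses only the arithmetic of the `every` branch shown above (the cron `pattern` branch depends on an external parser, so it is left out). The name `nextSlot` and the timestamps are hypothetical, and the sketch assumes that the completed job's prevMillis equals the slot it was originally scheduled for.

```typescript
// Same arithmetic as the `every` branch of the quoted getNextMillis(),
// ignoring the `immediately` option. `nextSlot` is a hypothetical name.
function nextSlot(baseMillis: number, every: number): number {
  return Math.floor(baseMillis / every) * every + every;
}

const DAY_MS = 24 * 60 * 60 * 1000;
const HOUR_MS = 60 * 60 * 1000;

// Slot of the job that was promoted and therefore completed early.
const promotedSlot = 2 * DAY_MS;
// queue.add() is called again (for example at startup) before that slot arrives.
const addCalledAt = promotedSlot - 2 * HOUR_MS;

// No prevMillis: the base is the current time, so the already-completed slot comes back.
const slotWithoutPrev = nextSlot(addCalledAt, DAY_MS);

// prevMillis carried over (assumption: it equals the completed job's slot): the base moves forward.
const slotWithPrev = nextSlot(Math.max(addCalledAt, promotedSlot), DAY_MS);

console.log(slotWithoutPrev === promotedSlot);        // true
console.log(slotWithPrev === promotedSlot + DAY_MS);  // true
```

Under these assumptions, the call without prevMillis lands on the slot that already has a completed job, while the call with prevMillis lands on the following slot.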

Based on the test you shared, it seems the issue can be reproduced by modifying it to add a repeat job with the same job name after restarting the worker following moveToCompleted.

I am away from my computer at the moment, so if possible, I will write a PR that includes the test code later.

@manast
Contributor

manast commented Sep 12, 2024

@Feelw00 the job that was created in a given iteration includes prevMillis in its options. Therefore when you promote a given delayed job, and this job is processed, it will add the next iteration based on the job's prevMillis. So this method should work. Now I am confused why a "server restart" is required in order to reproduce the issue, maybe you can give more context.

@Feelw00
Author

Feelw00 commented Sep 13, 2024

@manast

Hi,

As you commented, during the promote operation, delayed jobs are successfully registered based on prevMillis.

However, when the worker is restarted in this state, it seems that due to the issue, already completed delayed jobs are being generated and overwritten as completed jobs.

In my operating environment, server restarts mostly occur due to deployments.

We are currently using a container environment with K8s, and deployments happen several times a day.

Besides deployments, there may also be situations where we need to restart the worker due to server load or unexpected errors.
It is quite cumbersome to remember and manually recreate all the promoted jobs every time this happens.

Would this explanation provide sufficient justification for fixing the issue?

@manast
Contributor

manast commented Sep 13, 2024

However, when the worker is restarted in this state, it seems that due to the issue, already completed delayed jobs are being generated and overwritten as completed jobs.

This sounds really strange to me. Could you provide a test that demonstrates this issue so that we can look into it?

@Feelw00
Author

Feelw00 commented Sep 19, 2024

@manast @roggervalf

Hi,

As I mentioned in the PR,
it seems that the server restart is unrelated to the missing delayedJob.

In my service, the missing delayedJob after the server restart appears to have occurred because the queue.add() method was executed for all jobs during server startup.

However, I still believe this issue needs to be resolved.

What are your thoughts?

@manast
Contributor

manast commented Sep 19, 2024

@Feelw00 as soon as we have a reproducible test case we can look into it.

@roggervalf
Collaborator

Thank you @Feelw00, your test case was useful. I created a PR to address it.
