Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

【请教】bull bee-queue队列中process如何在多个work上执行一次 #3969

Closed
zhang988925 opened this issue Oct 9, 2019 · 7 comments
Assignees
Labels

Comments

@zhang988925
Copy link

zhang988925 commented Oct 9, 2019

app.js文件代码

class AppBootHook {
  constructor (app) {
    this.app = app
  }

  /**
   * 应用已经启动完毕
   * @return {Promise<void>}
   */
  async didReady () {
    const ctx = await this.app.createAnonymousContext()
    const queue = this.app.bull.get('q1')
    ctx.getLogger('build').info('队列监听' + JSON.stringify(this.app))
    queue.process(async job => {
      return await ctx.service.buildIos.runJob(job)
    })

    // 监听任务
    queue.on('completed', (job, result) => {
      ctx.getLogger('build').info('任务已完成,任务ID: %s', job.id)
    })
    queue.on('failed', (job, err) => {
      ctx.getLogger('build').error('任务已失败,任务ID: %s', job.id)
    })
  }
}

module.exports = AppBootHook

问题:
默认queue.process是同时执行一个任务,npm run dev下正常,但是npm run start后会在多个work里面执行,导致队列process同时执行多个。
看了issue中又2个朋友有类型情况,请问如何在一个work上执行或者同时只有一个proccess有效?
(备注:agent通知这个不行,因为无法获取到队列进度,还是会导致并行)

@zhang988925
Copy link
Author

#2496

@atian25
Copy link
Member

atian25 commented Oct 9, 2019

封装为一个 egg-schedule 的策略,触发的时候丢给 worker 执行,执行完后通知回来,再调度下一个。

@popomore
Copy link
Member

要看下这个库是不是分布式的

@zhang988925
Copy link
Author

多进程研发模式增强
https://eggjs.org/zh-cn/advanced/cluster-client.html
这个可以解决我的问题吗?

@zhang988925
Copy link
Author

zhang988925 commented Oct 12, 2019

难道不能吧任务放在单个work上执行,并返回结果?
OptimalBits/bull#488
agent和app传递消息可以实现单个work上执行,代码贴在下面,但是我担心如果work意外退出了不传递结果消息到agent,那队列就暂停了

@atian25 你好,你的思路我想了一下,但是有个问题,定时最小间隔是1分钟,会出现队列执行延迟,有更好的方法吗?

@zhang988925
Copy link
Author

zhang988925 commented Oct 12, 2019

// agent.js
class AgentBootHook {
  constructor (agent) {
    this.agent = agent
  }

  async didReady () {
    const ctx = await this.agent.createAnonymousContext()

    const queue = this.agent.bull.get('q1')
    queue.process((job) => {
      // 监听app通知处理结果,收到消息后返回成功
      this.agent.getLogger('build').info(`Job process with id ${job.id} and data ${JSON.stringify(job.data)}`)

      // 通知一个app起来干活
      this.agent.messenger.sendRandom('build-ios', {job})

      // 监听任务处理结果
      return new Promise((resolve, reject) => {
        this.agent.messenger.on('build-ios',  (data) => {
          resolve(data)
        })
      })
    })

    queue.on('completed', (job, result) => {
      this.agent.getLogger('build').info(`Job completed with id ${job.id} result ${JSON.stringify(result)}`)
    })
  }
}

module.exports = AgentBootHook
// app.js
class AppBootHook {
  constructor (app) {
    this.app = app
  }

  async didReady () {
    const ctx = await this.app.createAnonymousContext()

    this.app.messenger.on('build', async (data) => {
      const { job } = data
      const result = await ctx.service.buil.runJob(job)

      this.app.messenger.sendToAgent('build', result)
    })
  }
}

module.exports = AppBootHook

@atian25
Copy link
Member

atian25 commented Oct 12, 2019

我那个指的是自定义一个 ScheduleStrategy

https://github.com/eggjs/egg-schedule#schedule-type

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Projects
None yet
Development

No branches or pull requests

6 participants