From a40d6ac1a7833c4978a520245bef1fe22fbb9bc2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Christian=20L=C3=BCck?= Date: Wed, 26 Jul 2023 20:47:01 +0200 Subject: [PATCH] Support PHP 8.2, refactor queuing logic --- .github/workflows/ci.yml | 1 + src/Queue.php | 58 +++++++++++++++++++++++----------------- 2 files changed, 34 insertions(+), 25 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index d278abc..a7f29b6 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -11,6 +11,7 @@ jobs: strategy: matrix: php: + - 8.2 - 8.1 - 8.0 - 7.4 diff --git a/src/Queue.php b/src/Queue.php index 60020a5..2287514 100644 --- a/src/Queue.php +++ b/src/Queue.php @@ -27,7 +27,10 @@ class Queue implements \Countable private $limit; private $handler; + /** @var int<0,max> */ private $pending = 0; + + /** @var array */ private $queue = array(); /** @@ -373,24 +376,42 @@ public function __invoke() $id = key($queue); assert(is_int($id)); - $deferred = new Deferred(function ($_, $reject) use (&$queue, $id, &$deferred) { + /** @var ?PromiseInterface $pending */ + $pending = null; + + $deferred = new Deferred(function ($_, $reject) use (&$queue, $id, &$pending) { // forward cancellation to pending operation if it is currently executing - if (isset($deferred->pending) && $deferred->pending instanceof PromiseInterface && \method_exists($deferred->pending, 'cancel')) { - $deferred->pending->cancel(); + if ($pending instanceof PromiseInterface && \method_exists($pending, 'cancel')) { + $pending->cancel(); } - unset($deferred->pending); + $pending = null; - if (isset($deferred->args)) { + if (isset($queue[$id])) { // queued promise cancelled before its handler is invoked // remove from queue and reject explicitly - unset($queue[$id], $deferred->args); + unset($queue[$id]); $reject(new \RuntimeException('Cancelled queued job before processing started')); } }); // queue job to process if number of pending jobs is below concurrency limit again - $deferred->args = func_get_args(); - $queue[$id] = $deferred; + $handler = $this->handler; // PHP 5.4+ + $args = func_get_args(); + $that = $this; // PHP 5.4+ + $queue[$id] = function () use ($handler, $args, $deferred, &$pending, $that) { + $pending = \call_user_func_array($handler, $args); + + $that->await($pending)->then( + function ($result) use ($deferred, &$pending) { + $pending = null; + $deferred->resolve($result); + }, + function ($e) use ($deferred, &$pending) { + $pending = null; + $deferred->reject($e); + } + ); + }; return $deferred->promise(); } @@ -407,7 +428,7 @@ public function count() */ public function await(PromiseInterface $promise) { - $that = $this; + $that = $this; // PHP 5.4+ return $promise->then(function ($result) use ($that) { $that->processQueue(); @@ -430,28 +451,15 @@ public function processQueue() return; } - $deferred = reset($this->queue); - assert($deferred instanceof Deferred); + $next = reset($this->queue); + assert($next instanceof \Closure); unset($this->queue[key($this->queue)]); // once number of pending jobs is below concurrency limit again: // await this situation, invoke handler and await its resolution before invoking next queued job ++$this->pending; - $promise = call_user_func_array($this->handler, $deferred->args); - $deferred->pending = $promise; - unset($deferred->args); - // invoke handler and await its resolution before invoking next queued job - $this->await($promise)->then( - function ($result) use ($deferred) { - unset($deferred->pending); - $deferred->resolve($result); - }, - function ($e) use ($deferred) { - unset($deferred->pending); - $deferred->reject($e); - } - ); + $next(); } }