From f34af246d57c51b320f9c5331936399daa9ee364 Mon Sep 17 00:00:00 2001 From: Paras Malhotra Date: Mon, 14 Sep 2020 18:16:26 +0530 Subject: [PATCH 1/3] Add ability and command to clear queues --- .../Contracts/Queue/ClearableQueue.php | 14 ++++ .../Providers/ArtisanServiceProvider.php | 14 ++++ src/Illuminate/Queue/Console/ClearCommand.php | 73 +++++++++++++++++++ src/Illuminate/Queue/DatabaseQueue.php | 13 ++++ src/Illuminate/Queue/LuaScripts.php | 18 +++++ src/Illuminate/Queue/RedisQueue.php | 18 ++++- .../QueueDatabaseQueueIntegrationTest.php | 29 ++++++++ tests/Queue/RedisQueueIntegrationTest.php | 19 +++++ 8 files changed, 197 insertions(+), 1 deletion(-) create mode 100644 src/Illuminate/Contracts/Queue/ClearableQueue.php create mode 100644 src/Illuminate/Queue/Console/ClearCommand.php diff --git a/src/Illuminate/Contracts/Queue/ClearableQueue.php b/src/Illuminate/Contracts/Queue/ClearableQueue.php new file mode 100644 index 000000000000..2f5c1ddd464b --- /dev/null +++ b/src/Illuminate/Contracts/Queue/ClearableQueue.php @@ -0,0 +1,14 @@ + 'command.optimize', 'OptimizeClear' => 'command.optimize.clear', 'PackageDiscover' => 'command.package.discover', + 'QueueClear' => 'command.queue.clear', 'QueueFailed' => 'command.queue.failed', 'QueueFlush' => 'command.queue.flush', 'QueueForget' => 'command.queue.forget', @@ -712,6 +714,18 @@ protected function registerQueueWorkCommand() }); } + /** + * Register the command. + * + * @return void + */ + protected function registerQueueClearCommand() + { + $this->app->singleton('command.queue.clear', function () { + return new QueueClearCommand; + }); + } + /** * Register the command. * diff --git a/src/Illuminate/Queue/Console/ClearCommand.php b/src/Illuminate/Queue/Console/ClearCommand.php new file mode 100644 index 000000000000..c092f77137b8 --- /dev/null +++ b/src/Illuminate/Queue/Console/ClearCommand.php @@ -0,0 +1,73 @@ +confirmToProceed()) { + return 1; + } + + $connection = $this->argument('connection') + ?: $this->laravel['config']['queue.default']; + + // We need to get the right queue for the connection which is set in the queue + // configuration file for the application. We will pull it based on the set + // connection being run for the queue operation currently being executed. + $queueName = $this->getQueue($connection); + $queue = ($this->laravel['queue'])->connection($connection); + + if($queue instanceof ClearableQueue) { + $count = $queue->clear($queueName); + + $this->line('Cleared '.$count.' jobs from the '.$queueName.' queue '); + } else { + $this->line('Clearing queues is not supported on '.(new ReflectionClass($queue))->getShortName().' '); + } + + return 0; + } + + /** + * Get the queue name to clear. + * + * @param string $connection + * @return string + */ + protected function getQueue($connection) + { + return $this->option('queue') ?: $this->laravel['config']->get( + "queue.connections.{$connection}.queue", 'default' + ); + } +} diff --git a/src/Illuminate/Queue/DatabaseQueue.php b/src/Illuminate/Queue/DatabaseQueue.php index 5abb989f8551..f58bf375fabf 100644 --- a/src/Illuminate/Queue/DatabaseQueue.php +++ b/src/Illuminate/Queue/DatabaseQueue.php @@ -303,6 +303,19 @@ protected function markJobAsReserved($job) return $job; } + /** + * Clear all jobs from the queue. + * + * @param string $queue + * @return int + */ + public function clear($queue) + { + return $this->database->table($this->table) + ->where('queue', $this->getQueue($queue)) + ->delete(); + } + /** * Delete a reserved job from the queue. * diff --git a/src/Illuminate/Queue/LuaScripts.php b/src/Illuminate/Queue/LuaScripts.php index c031140cf732..447c759c4d58 100644 --- a/src/Illuminate/Queue/LuaScripts.php +++ b/src/Illuminate/Queue/LuaScripts.php @@ -20,6 +20,24 @@ public static function size() LUA; } + /** + * Get the Lua script for clearing the queue. + * + * KEYS[1] - The name of the primary queue + * KEYS[2] - The name of the "delayed" queue + * KEYS[3] - The name of the "reserved" queue + * + * @return string + */ + public static function clear() + { + return <<<'LUA' +local size = redis.call('llen', KEYS[1]) + redis.call('zcard', KEYS[2]) + redis.call('zcard', KEYS[3]) +redis.call('del', KEYS[1], KEYS[2], KEYS[3]) +return size +LUA; + } + /** * Get the Lua script for pushing jobs onto the queue. * diff --git a/src/Illuminate/Queue/RedisQueue.php b/src/Illuminate/Queue/RedisQueue.php index a72593f80000..b4c134fbd0c0 100644 --- a/src/Illuminate/Queue/RedisQueue.php +++ b/src/Illuminate/Queue/RedisQueue.php @@ -3,11 +3,12 @@ namespace Illuminate\Queue; use Illuminate\Contracts\Queue\Queue as QueueContract; +use Illuminate\Contracts\Queue\ClearableQueue; use Illuminate\Contracts\Redis\Factory as Redis; use Illuminate\Queue\Jobs\RedisJob; use Illuminate\Support\Str; -class RedisQueue extends Queue implements QueueContract +class RedisQueue extends Queue implements QueueContract, ClearableQueue { /** * The Redis factory implementation. @@ -256,6 +257,21 @@ protected function retrieveNextJob($queue, $block = true) return [$job, $reserved]; } + /** + * Clear all jobs from the queue. + * + * @param string $queue + * @return int + */ + public function clear($queue) + { + $queue = $this->getQueue($queue); + + return $this->getConnection()->eval( + LuaScripts::clear(), 3, $queue, $queue.':delayed', $queue.':reserved' + ); + } + /** * Delete a reserved job from the queue. * diff --git a/tests/Queue/QueueDatabaseQueueIntegrationTest.php b/tests/Queue/QueueDatabaseQueueIntegrationTest.php index 0dcef512bfb4..2b043fc92465 100644 --- a/tests/Queue/QueueDatabaseQueueIntegrationTest.php +++ b/tests/Queue/QueueDatabaseQueueIntegrationTest.php @@ -147,6 +147,35 @@ public function testPoppedJobsIncrementAttempts() $this->assertEquals(1, $popped_job->attempts(), 'The "attempts" attribute of the Job object was not updated by pop!'); } + /** + * Test that the queue can be cleared. + */ + public function testThatQueueCanBeCleared() + { + $this->connection() + ->table('jobs') + ->insert([[ + 'id' => 1, + 'queue' => $mock_queue_name = 'mock_queue_name', + 'payload' => 'mock_payload', + 'attempts' => 0, + 'reserved_at' => Carbon::now()->addDay()->getTimestamp(), + 'available_at' => Carbon::now()->subDay()->getTimestamp(), + 'created_at' => Carbon::now()->getTimestamp(), + ], [ + 'id' => 2, + 'queue' => $mock_queue_name, + 'payload' => 'mock_payload 2', + 'attempts' => 0, + 'reserved_at' => null, + 'available_at' => Carbon::now()->subSeconds(1)->getTimestamp(), + 'created_at' => Carbon::now()->getTimestamp(), + ]]); + + $this->assertEquals(2, $this->queue->clear($mock_queue_name)); + $this->assertEquals(0, $this->queue->size()); + } + /** * Test that jobs that are not reserved and have an available_at value in the future, are not popped. */ diff --git a/tests/Queue/RedisQueueIntegrationTest.php b/tests/Queue/RedisQueueIntegrationTest.php index 3843c2630e01..3381e03fee4d 100644 --- a/tests/Queue/RedisQueueIntegrationTest.php +++ b/tests/Queue/RedisQueueIntegrationTest.php @@ -414,6 +414,25 @@ public function testDelete($driver) $this->assertNull($this->queue->pop()); } + /** + * @dataProvider redisDriverProvider + * + * @param string $driver + */ + public function testClear($driver) + { + $this->setQueue($driver); + + $job1 = new RedisQueueIntegrationTestJob(30); + $job2 = new RedisQueueIntegrationTestJob(40); + + $this->queue->push($job1); + $this->queue->push($job2); + + $this->assertEquals(2, $this->queue->clear(null)); + $this->assertEquals(0, $this->queue->size()); + } + /** * @dataProvider redisDriverProvider * From 3eb6e1885148c03d1e60153df6aa6e4b7af01fea Mon Sep 17 00:00:00 2001 From: Paras Malhotra Date: Mon, 14 Sep 2020 18:26:20 +0530 Subject: [PATCH 2/3] support database queues --- src/Illuminate/Queue/DatabaseQueue.php | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/Illuminate/Queue/DatabaseQueue.php b/src/Illuminate/Queue/DatabaseQueue.php index f58bf375fabf..296c2357cb0d 100644 --- a/src/Illuminate/Queue/DatabaseQueue.php +++ b/src/Illuminate/Queue/DatabaseQueue.php @@ -3,13 +3,14 @@ namespace Illuminate\Queue; use Illuminate\Contracts\Queue\Queue as QueueContract; +use Illuminate\Contracts\Queue\ClearableQueue; use Illuminate\Database\Connection; use Illuminate\Queue\Jobs\DatabaseJob; use Illuminate\Queue\Jobs\DatabaseJobRecord; use Illuminate\Support\Carbon; use PDO; -class DatabaseQueue extends Queue implements QueueContract +class DatabaseQueue extends Queue implements QueueContract, ClearableQueue { /** * The database connection instance. From 452826b8f17ff07fcbba99523bb15b015b2d94f8 Mon Sep 17 00:00:00 2001 From: Paras Malhotra Date: Mon, 14 Sep 2020 18:53:07 +0530 Subject: [PATCH 3/3] fix style ci errors --- src/Illuminate/Foundation/Providers/ArtisanServiceProvider.php | 2 +- src/Illuminate/Queue/Console/ClearCommand.php | 2 +- src/Illuminate/Queue/DatabaseQueue.php | 2 +- src/Illuminate/Queue/RedisQueue.php | 2 +- 4 files changed, 4 insertions(+), 4 deletions(-) diff --git a/src/Illuminate/Foundation/Providers/ArtisanServiceProvider.php b/src/Illuminate/Foundation/Providers/ArtisanServiceProvider.php index a74ae08ed455..bc17f9c1bc79 100755 --- a/src/Illuminate/Foundation/Providers/ArtisanServiceProvider.php +++ b/src/Illuminate/Foundation/Providers/ArtisanServiceProvider.php @@ -57,6 +57,7 @@ use Illuminate\Foundation\Console\ViewClearCommand; use Illuminate\Notifications\Console\NotificationTableCommand; use Illuminate\Queue\Console\BatchesTableCommand; +use Illuminate\Queue\Console\ClearCommand as QueueClearCommand; use Illuminate\Queue\Console\FailedTableCommand; use Illuminate\Queue\Console\FlushFailedCommand as FlushFailedQueueCommand; use Illuminate\Queue\Console\ForgetFailedCommand as ForgetFailedQueueCommand; @@ -65,7 +66,6 @@ use Illuminate\Queue\Console\RestartCommand as QueueRestartCommand; use Illuminate\Queue\Console\RetryBatchCommand as QueueRetryBatchCommand; use Illuminate\Queue\Console\RetryCommand as QueueRetryCommand; -use Illuminate\Queue\Console\ClearCommand as QueueClearCommand; use Illuminate\Queue\Console\TableCommand; use Illuminate\Queue\Console\WorkCommand as QueueWorkCommand; use Illuminate\Routing\Console\ControllerMakeCommand; diff --git a/src/Illuminate/Queue/Console/ClearCommand.php b/src/Illuminate/Queue/Console/ClearCommand.php index c092f77137b8..a5ff18a48607 100644 --- a/src/Illuminate/Queue/Console/ClearCommand.php +++ b/src/Illuminate/Queue/Console/ClearCommand.php @@ -47,7 +47,7 @@ public function handle() $queueName = $this->getQueue($connection); $queue = ($this->laravel['queue'])->connection($connection); - if($queue instanceof ClearableQueue) { + if ($queue instanceof ClearableQueue) { $count = $queue->clear($queueName); $this->line('Cleared '.$count.' jobs from the '.$queueName.' queue '); diff --git a/src/Illuminate/Queue/DatabaseQueue.php b/src/Illuminate/Queue/DatabaseQueue.php index 296c2357cb0d..313c575774e4 100644 --- a/src/Illuminate/Queue/DatabaseQueue.php +++ b/src/Illuminate/Queue/DatabaseQueue.php @@ -2,8 +2,8 @@ namespace Illuminate\Queue; -use Illuminate\Contracts\Queue\Queue as QueueContract; use Illuminate\Contracts\Queue\ClearableQueue; +use Illuminate\Contracts\Queue\Queue as QueueContract; use Illuminate\Database\Connection; use Illuminate\Queue\Jobs\DatabaseJob; use Illuminate\Queue\Jobs\DatabaseJobRecord; diff --git a/src/Illuminate/Queue/RedisQueue.php b/src/Illuminate/Queue/RedisQueue.php index b4c134fbd0c0..593a5a3a9f42 100644 --- a/src/Illuminate/Queue/RedisQueue.php +++ b/src/Illuminate/Queue/RedisQueue.php @@ -2,8 +2,8 @@ namespace Illuminate\Queue; -use Illuminate\Contracts\Queue\Queue as QueueContract; use Illuminate\Contracts\Queue\ClearableQueue; +use Illuminate\Contracts\Queue\Queue as QueueContract; use Illuminate\Contracts\Redis\Factory as Redis; use Illuminate\Queue\Jobs\RedisJob; use Illuminate\Support\Str;