diff --git a/src/Illuminate/Contracts/Queue/ClearableQueue.php b/src/Illuminate/Contracts/Queue/ClearableQueue.php new file mode 100644 index 000000000000..2f5c1ddd464b --- /dev/null +++ b/src/Illuminate/Contracts/Queue/ClearableQueue.php @@ -0,0 +1,14 @@ + 'command.optimize', 'OptimizeClear' => 'command.optimize.clear', 'PackageDiscover' => 'command.package.discover', + 'QueueClear' => 'command.queue.clear', 'QueueFailed' => 'command.queue.failed', 'QueueFlush' => 'command.queue.flush', 'QueueForget' => 'command.queue.forget', @@ -712,6 +714,18 @@ protected function registerQueueWorkCommand() }); } + /** + * Register the command. + * + * @return void + */ + protected function registerQueueClearCommand() + { + $this->app->singleton('command.queue.clear', function () { + return new QueueClearCommand; + }); + } + /** * Register the command. * diff --git a/src/Illuminate/Queue/Console/ClearCommand.php b/src/Illuminate/Queue/Console/ClearCommand.php new file mode 100644 index 000000000000..a5ff18a48607 --- /dev/null +++ b/src/Illuminate/Queue/Console/ClearCommand.php @@ -0,0 +1,73 @@ +confirmToProceed()) { + return 1; + } + + $connection = $this->argument('connection') + ?: $this->laravel['config']['queue.default']; + + // We need to get the right queue for the connection which is set in the queue + // configuration file for the application. We will pull it based on the set + // connection being run for the queue operation currently being executed. + $queueName = $this->getQueue($connection); + $queue = ($this->laravel['queue'])->connection($connection); + + if ($queue instanceof ClearableQueue) { + $count = $queue->clear($queueName); + + $this->line('Cleared '.$count.' jobs from the '.$queueName.' queue '); + } else { + $this->line('Clearing queues is not supported on '.(new ReflectionClass($queue))->getShortName().' '); + } + + return 0; + } + + /** + * Get the queue name to clear. + * + * @param string $connection + * @return string + */ + protected function getQueue($connection) + { + return $this->option('queue') ?: $this->laravel['config']->get( + "queue.connections.{$connection}.queue", 'default' + ); + } +} diff --git a/src/Illuminate/Queue/DatabaseQueue.php b/src/Illuminate/Queue/DatabaseQueue.php index 5abb989f8551..313c575774e4 100644 --- a/src/Illuminate/Queue/DatabaseQueue.php +++ b/src/Illuminate/Queue/DatabaseQueue.php @@ -2,6 +2,7 @@ namespace Illuminate\Queue; +use Illuminate\Contracts\Queue\ClearableQueue; use Illuminate\Contracts\Queue\Queue as QueueContract; use Illuminate\Database\Connection; use Illuminate\Queue\Jobs\DatabaseJob; @@ -9,7 +10,7 @@ use Illuminate\Support\Carbon; use PDO; -class DatabaseQueue extends Queue implements QueueContract +class DatabaseQueue extends Queue implements QueueContract, ClearableQueue { /** * The database connection instance. @@ -303,6 +304,19 @@ protected function markJobAsReserved($job) return $job; } + /** + * Clear all jobs from the queue. + * + * @param string $queue + * @return int + */ + public function clear($queue) + { + return $this->database->table($this->table) + ->where('queue', $this->getQueue($queue)) + ->delete(); + } + /** * Delete a reserved job from the queue. * diff --git a/src/Illuminate/Queue/LuaScripts.php b/src/Illuminate/Queue/LuaScripts.php index c031140cf732..447c759c4d58 100644 --- a/src/Illuminate/Queue/LuaScripts.php +++ b/src/Illuminate/Queue/LuaScripts.php @@ -20,6 +20,24 @@ public static function size() LUA; } + /** + * Get the Lua script for clearing the queue. + * + * KEYS[1] - The name of the primary queue + * KEYS[2] - The name of the "delayed" queue + * KEYS[3] - The name of the "reserved" queue + * + * @return string + */ + public static function clear() + { + return <<<'LUA' +local size = redis.call('llen', KEYS[1]) + redis.call('zcard', KEYS[2]) + redis.call('zcard', KEYS[3]) +redis.call('del', KEYS[1], KEYS[2], KEYS[3]) +return size +LUA; + } + /** * Get the Lua script for pushing jobs onto the queue. * diff --git a/src/Illuminate/Queue/RedisQueue.php b/src/Illuminate/Queue/RedisQueue.php index a72593f80000..593a5a3a9f42 100644 --- a/src/Illuminate/Queue/RedisQueue.php +++ b/src/Illuminate/Queue/RedisQueue.php @@ -2,12 +2,13 @@ namespace Illuminate\Queue; +use Illuminate\Contracts\Queue\ClearableQueue; use Illuminate\Contracts\Queue\Queue as QueueContract; use Illuminate\Contracts\Redis\Factory as Redis; use Illuminate\Queue\Jobs\RedisJob; use Illuminate\Support\Str; -class RedisQueue extends Queue implements QueueContract +class RedisQueue extends Queue implements QueueContract, ClearableQueue { /** * The Redis factory implementation. @@ -256,6 +257,21 @@ protected function retrieveNextJob($queue, $block = true) return [$job, $reserved]; } + /** + * Clear all jobs from the queue. + * + * @param string $queue + * @return int + */ + public function clear($queue) + { + $queue = $this->getQueue($queue); + + return $this->getConnection()->eval( + LuaScripts::clear(), 3, $queue, $queue.':delayed', $queue.':reserved' + ); + } + /** * Delete a reserved job from the queue. * diff --git a/tests/Queue/QueueDatabaseQueueIntegrationTest.php b/tests/Queue/QueueDatabaseQueueIntegrationTest.php index 0dcef512bfb4..2b043fc92465 100644 --- a/tests/Queue/QueueDatabaseQueueIntegrationTest.php +++ b/tests/Queue/QueueDatabaseQueueIntegrationTest.php @@ -147,6 +147,35 @@ public function testPoppedJobsIncrementAttempts() $this->assertEquals(1, $popped_job->attempts(), 'The "attempts" attribute of the Job object was not updated by pop!'); } + /** + * Test that the queue can be cleared. + */ + public function testThatQueueCanBeCleared() + { + $this->connection() + ->table('jobs') + ->insert([[ + 'id' => 1, + 'queue' => $mock_queue_name = 'mock_queue_name', + 'payload' => 'mock_payload', + 'attempts' => 0, + 'reserved_at' => Carbon::now()->addDay()->getTimestamp(), + 'available_at' => Carbon::now()->subDay()->getTimestamp(), + 'created_at' => Carbon::now()->getTimestamp(), + ], [ + 'id' => 2, + 'queue' => $mock_queue_name, + 'payload' => 'mock_payload 2', + 'attempts' => 0, + 'reserved_at' => null, + 'available_at' => Carbon::now()->subSeconds(1)->getTimestamp(), + 'created_at' => Carbon::now()->getTimestamp(), + ]]); + + $this->assertEquals(2, $this->queue->clear($mock_queue_name)); + $this->assertEquals(0, $this->queue->size()); + } + /** * Test that jobs that are not reserved and have an available_at value in the future, are not popped. */ diff --git a/tests/Queue/RedisQueueIntegrationTest.php b/tests/Queue/RedisQueueIntegrationTest.php index 3843c2630e01..3381e03fee4d 100644 --- a/tests/Queue/RedisQueueIntegrationTest.php +++ b/tests/Queue/RedisQueueIntegrationTest.php @@ -414,6 +414,25 @@ public function testDelete($driver) $this->assertNull($this->queue->pop()); } + /** + * @dataProvider redisDriverProvider + * + * @param string $driver + */ + public function testClear($driver) + { + $this->setQueue($driver); + + $job1 = new RedisQueueIntegrationTestJob(30); + $job2 = new RedisQueueIntegrationTestJob(40); + + $this->queue->push($job1); + $this->queue->push($job2); + + $this->assertEquals(2, $this->queue->clear(null)); + $this->assertEquals(0, $this->queue->size()); + } + /** * @dataProvider redisDriverProvider *