diff --git a/config/queue.php b/config/queue.php index 76b5da633..43c3aa254 100644 --- a/config/queue.php +++ b/config/queue.php @@ -15,7 +15,7 @@ | Define the queue provider used in the application. | */ - 'default' => environment( 'QUEUE_CONNECTION', 'wordpress' ), + 'default' => environment( 'QUEUE_CONNECTION', 'wordpress' ), /* |-------------------------------------------------------------------------- @@ -25,7 +25,40 @@ | The amount of items handled in one run of the queue. | */ - 'batch_size' => environment( 'QUEUE_BATCH_SIZE', 100 ), + 'batch_size' => environment( 'QUEUE_BATCH_SIZE', 5 ), + + /* + |-------------------------------------------------------------------------- + | Maximum number of concurrent batches + |-------------------------------------------------------------------------- + | + | The maximum number of batches that can be run concurrently. For example, + | if 1000 queue jobs are dispatched and this is set to 5 with a batch size + | of 100, then 5 batches of 100 will be run concurrently and take two runs + | of the queue to complete. + | + */ + 'max_concurrent_batches' => environment( 'QUEUE_MAX_CONCURRENT_BATCHES', 1 ), + + /* + |-------------------------------------------------------------------------- + | Delete failed or processed queue items after a set time + |-------------------------------------------------------------------------- + | + | Delete failed or processed queue items after a set time in seconds. + | + */ + 'delete_after' => environment( 'QUEUE_DELETE_AFTER', 60 * 60 * 24 * 7 ), + + /* + |-------------------------------------------------------------------------- + | Enable the Queue Admin Interface + |-------------------------------------------------------------------------- + | + | Enable the queue admin interface to display queue jobs. + | + */ + 'enable_admin' => environment( 'QUEUE_ENABLE_ADMIN', true ), /* |-------------------------------------------------------------------------- @@ -35,7 +68,7 @@ | Control the configuration for the queue providers. | */ - 'wordpress' => [ + 'wordpress' => [ // Delay between queue runs in seconds. 'delay' => environment( 'QUEUE_DELAY', 0 ), ], diff --git a/src/mantle/application/class-app-service-provider.php b/src/mantle/application/class-app-service-provider.php index 2d409fd68..0831c1821 100644 --- a/src/mantle/application/class-app-service-provider.php +++ b/src/mantle/application/class-app-service-provider.php @@ -42,7 +42,7 @@ public function __construct( Application $app ) { */ protected function boot_scheduler() { $this->app->singleton( - Schedule::class, + 'scheduler', fn ( $app ) => tap( new Schedule( $app ), fn ( Schedule $schedule ) => $this->schedule( $schedule ), diff --git a/src/mantle/application/class-application.php b/src/mantle/application/class-application.php index a7ee051da..abba8cb3a 100644 --- a/src/mantle/application/class-application.php +++ b/src/mantle/application/class-application.php @@ -401,20 +401,23 @@ protected function register_base_bindings() { */ protected function register_core_aliases() { $core_aliases = [ - 'app' => [ static::class, \Mantle\Contracts\Application::class ], - 'config' => [ \Mantle\Config\Repository::class, \Mantle\Contracts\Config\Repository::class ], - 'events' => [ \Mantle\Events\Dispatcher::class, \Mantle\Contracts\Events\Dispatcher::class ], - 'files' => [ \Mantle\Filesystem\Filesystem::class ], - 'filesystem' => [ \Mantle\Filesystem\Filesystem_Manager::class, \Mantle\Contracts\Filesystem\Filesystem_Manager::class ], - 'log' => [ \Mantle\Log\Log_Manager::class, \Psr\Log\LoggerInterface::class ], - 'queue' => [ \Mantle\Queue\Queue_Manager::class, \Mantle\Contracts\Queue\Queue_Manager::class ], - 'redirect' => [ \Mantle\Http\Routing\Redirector::class ], - 'request' => [ \Mantle\Http\Request::class, \Symfony\Component\HttpFoundation\Request::class ], - 'router' => [ \Mantle\Http\Routing\Router::class, \Mantle\Contracts\Http\Routing\Router::class ], - 'router.entity' => [ \Mantle\Http\Routing\Entity_Router::class, \Mantle\Contracts\Http\Routing\Entity_Router::class ], - 'url' => [ \Mantle\Http\Routing\Url_Generator::class, \Mantle\Contracts\Http\Routing\Url_Generator::class ], - 'view.loader' => [ \Mantle\Http\View\View_Finder::class, \Mantle\Contracts\Http\View\View_Finder::class ], - 'view' => [ \Mantle\Http\View\Factory::class, \Mantle\Contracts\Http\View\Factory::class ], + 'app' => [ static::class, \Mantle\Contracts\Application::class ], + 'config' => [ \Mantle\Config\Repository::class, \Mantle\Contracts\Config\Repository::class ], + 'events' => [ \Mantle\Events\Dispatcher::class, \Mantle\Contracts\Events\Dispatcher::class ], + 'files' => [ \Mantle\Filesystem\Filesystem::class ], + 'filesystem' => [ \Mantle\Filesystem\Filesystem_Manager::class, \Mantle\Contracts\Filesystem\Filesystem_Manager::class ], + 'log' => [ \Mantle\Log\Log_Manager::class, \Psr\Log\LoggerInterface::class ], + 'queue' => [ \Mantle\Queue\Queue_Manager::class, \Mantle\Contracts\Queue\Queue_Manager::class ], + 'queue.worker' => [ \Mantle\Queue\Worker::class ], + 'queue.dispatcher' => [ \Mantle\Queue\Dispatcher::class, \Mantle\Contracts\Queue\Dispatcher::class ], + 'redirect' => [ \Mantle\Http\Routing\Redirector::class ], + 'request' => [ \Mantle\Http\Request::class, \Symfony\Component\HttpFoundation\Request::class ], + 'router' => [ \Mantle\Http\Routing\Router::class, \Mantle\Contracts\Http\Routing\Router::class ], + 'router.entity' => [ \Mantle\Http\Routing\Entity_Router::class, \Mantle\Contracts\Http\Routing\Entity_Router::class ], + 'scheduler' => [ \Mantle\Scheduling\Schedule::class ], + 'url' => [ \Mantle\Http\Routing\Url_Generator::class, \Mantle\Contracts\Http\Routing\Url_Generator::class ], + 'view.loader' => [ \Mantle\Http\View\View_Finder::class, \Mantle\Contracts\Http\View\View_Finder::class ], + 'view' => [ \Mantle\Http\View\Factory::class, \Mantle\Contracts\Http\View\Factory::class ], ]; foreach ( $core_aliases as $key => $aliases ) { diff --git a/src/mantle/contracts/queue/interface-provider.php b/src/mantle/contracts/queue/interface-provider.php index f59766e26..ea0f81f5b 100644 --- a/src/mantle/contracts/queue/interface-provider.php +++ b/src/mantle/contracts/queue/interface-provider.php @@ -19,7 +19,7 @@ interface Provider { * @param mixed $job Job instance. * @return bool */ - public function push( $job ); + public function push( $job ): bool; /** * Get the next set of jobs in the queue. @@ -37,5 +37,13 @@ public function pop( string $queue = null, int $count = 1 ): Collection; * @param string $queue Queue to compare against. * @return bool */ - public function in_queue( $job, string $queue = null ): bool; + public function in_queue( mixed $job, string $queue = null ): bool; + + /** + * Retrieve the number of pending jobs in the queue. + * + * @param string $queue Queue name, optional. + * @return int + */ + public function pending_count( string $queue = null ): int; } diff --git a/src/mantle/database/model/class-model.php b/src/mantle/database/model/class-model.php index 470b714c6..8c4126808 100644 --- a/src/mantle/database/model/class-model.php +++ b/src/mantle/database/model/class-model.php @@ -175,12 +175,14 @@ public function refresh() { } $instance = static::find( $this->get( 'id' ) ); + if ( ! $instance ) { return null; } $this->exists = true; $this->set_raw_attributes( $instance->get_raw_attributes() ); + return $this; } diff --git a/src/mantle/database/query/class-builder.php b/src/mantle/database/query/class-builder.php index 5775ba38e..14b088be7 100644 --- a/src/mantle/database/query/class-builder.php +++ b/src/mantle/database/query/class-builder.php @@ -859,8 +859,8 @@ public function chunk_by_id( int $count, callable $callback, string $attribute = /** * Execute a callback over each item while chunking. * - * @param callable(\Mantle\Support\Collection): mixed $callback Callback to run on each chunk. - * @param int $count Number of items to chunk by. + * @param callable(TModel): mixed $callback Callback to run on each chunk. + * @param int $count Number of items to chunk by. * @return boolean */ public function each( callable $callback, int $count = 100 ) { @@ -878,9 +878,9 @@ public function each( callable $callback, int $count = 100 ) { /** * Execute a callback over each item while chunking by ID. * - * @param callable(\Mantle\Support\Collection): mixed $callback Callback to run on each chunk. - * @param int $count Number of items to chunk by. - * @param string $attribute Attribute to chunk by. + * @param callable(TModel): mixed $callback Callback to run on each chunk. + * @param int $count Number of items to chunk by. + * @param string $attribute Attribute to chunk by. * @return boolean */ public function each_by_id( callable $callback, int $count = 100, string $attribute = 'id' ) { diff --git a/src/mantle/database/query/class-post-query-builder.php b/src/mantle/database/query/class-post-query-builder.php index 995f4dbdf..7f683084b 100644 --- a/src/mantle/database/query/class-post-query-builder.php +++ b/src/mantle/database/query/class-post-query-builder.php @@ -26,7 +26,7 @@ * @method \Mantle\Database\Query\Post_Query_Builder whereId( int $id ) * @method \Mantle\Database\Query\Post_Query_Builder whereName( string $name ) * @method \Mantle\Database\Query\Post_Query_Builder whereSlug( string $slug ) - * @method \Mantle\Database\Query\Post_Query_Builder whereStatus( string $status ) + * @method \Mantle\Database\Query\Post_Query_Builder whereStatus( string[]|string $status ) * @method \Mantle\Database\Query\Post_Query_Builder whereTitle( string $title ) * @method \Mantle\Database\Query\Post_Query_Builder whereType( string $type ) */ @@ -49,6 +49,7 @@ class Post_Query_Builder extends Builder { 'post_author' => 'author', 'post_name' => 'name', 'slug' => 'name', + 'status' => 'post_status', ]; /** diff --git a/src/mantle/queue/autoload.php b/src/mantle/queue/autoload.php index e20ddd659..a62293e9f 100644 --- a/src/mantle/queue/autoload.php +++ b/src/mantle/queue/autoload.php @@ -11,10 +11,15 @@ /** * Dispatch a job to the queue. * + * @template TJob of \Mantle\Contracts\Queue\Job|\Closure + * * @param \Mantle\Contracts\Queue\Job|\Closure $job Job instance. - * @return Pending_Dispatch|Pending_Closure_Dispatch + * @return Pending_Closure_Dispatch|Pending_Dispatch + * + * @phpstan-param TJob|\Mantle\Contracts\Queue\Job|\Closure|\Closure $job Job instance. + * @phpstan-return (TJob is \Closure ? Pending_Closure_Dispatch : Pending_Dispatch) */ - function dispatch( $job ): Pending_Dispatch { + function dispatch( $job ): Pending_Dispatch|Pending_Closure_Dispatch { return $job instanceof \Closure ? new Pending_Closure_Dispatch( Closure_Job::create( $job ) ) : new Pending_Dispatch( $job ); diff --git a/src/mantle/queue/class-closure-job.php b/src/mantle/queue/class-closure-job.php index cb2c661dd..417bb7db0 100644 --- a/src/mantle/queue/class-closure-job.php +++ b/src/mantle/queue/class-closure-job.php @@ -2,36 +2,39 @@ /** * Closure_Job class file * + * phpcs:disable Squiz.Commenting.VariableComment.Missing + * * @package Mantle */ namespace Mantle\Queue; use Closure; +use DateTimeInterface; use Laravel\SerializableClosure\SerializableClosure; use Mantle\Contracts\Queue\Can_Queue; use ReflectionFunction; use Throwable; /** - * Abstract Queue Job + * Closure Job * - * To be extended by provider-specific queue job classes. + * Storage of the closure-based queue job. */ class Closure_Job implements Can_Queue { /** - * Serializable closure instance. + * The delay before the job will be run. * - * @var SerializableClosure + * @var int|DateTimeInterface */ - public SerializableClosure $closure; + public int|DateTimeInterface $delay; /** * The callbacks that should be run on failure. * * @var array */ - public $failure_callbacks = []; + public array $failure_callbacks = []; /** * Create a new job instance. @@ -48,8 +51,7 @@ public static function create( Closure $closure ): Closure_Job { * * @param SerializableClosure $closure Serialized closure to wrap. */ - public function __construct( SerializableClosure $closure ) { - $this->closure = $closure; + public function __construct( public SerializableClosure $closure ) { } /** @@ -61,6 +63,18 @@ public function handle() { $callback(); } + /** + * Set the delay before the job will be run. + * + * @param DateTimeInterface|int $delay Delay in seconds or DateTime instance. + * @return static + */ + public function delay( DateTimeInterface|int $delay ) { + $this->delay = $delay; + + return $this; + } + /** * Add a callback to be executed if the job fails. * diff --git a/src/mantle/queue/class-dispatcher.php b/src/mantle/queue/class-dispatcher.php index 4c9fd425d..e0a4e1452 100644 --- a/src/mantle/queue/class-dispatcher.php +++ b/src/mantle/queue/class-dispatcher.php @@ -11,6 +11,7 @@ use Mantle\Contracts\Container; use Mantle\Contracts\Queue\Can_Queue; use Mantle\Contracts\Queue\Queue_Manager; +use Mantle\Queue\Events\Job_Queued; /** * Queue Dispatcher @@ -18,47 +19,50 @@ * Executes jobs from the queue. */ class Dispatcher { - /** - * Container instance. - * - * @var Container - */ - protected $container; - /** * Constructor. * * @param Container $container Container instance. */ - public function __construct( Container $container ) { - $this->container = $container; - } + public function __construct( protected Container $container ) {} /** * Dispatch the job to the queue. * * @param mixed $job Job instance. - * @return mixed + * @return void */ - public function dispatch( $job ) { + public function dispatch( mixed $job ): void { if ( ! $this->should_command_be_queued( $job ) ) { - return $this->dispatch_now( $job ); + $this->dispatch_now( $job ); + + return; } - $manager = $this->container->make( Queue_Manager::class ); + /** + * Provider instance. + * + * @var \Mantle\Contracts\Queue\Provider + */ + $provider = $this->container->make( Queue_Manager::class )->get_provider(); // Send the job to the queue. - $manager->get_provider()->push( $job ); + $provider->push( $job ); + + // Dispatch the job queued event. + $this->container['events']->dispatch( + new Job_Queued( $provider, $job ), + ); } /** * Dispatch a job in the current process. * * @param mixed $job Job instance. - * @return mixed + * @return void */ - public function dispatch_now( $job ) { - return $this->container->call( [ $job, 'handle' ] ); + public function dispatch_now( mixed $job ): void { + $this->container->call( [ $job, 'handle' ] ); } /** diff --git a/src/mantle/queue/class-pending-dispatch.php b/src/mantle/queue/class-pending-dispatch.php index ea015541a..7479d45d5 100644 --- a/src/mantle/queue/class-pending-dispatch.php +++ b/src/mantle/queue/class-pending-dispatch.php @@ -7,6 +7,7 @@ namespace Mantle\Queue; +use DateTimeInterface; use Mantle\Container\Container; use Mantle\Contracts\Queue\Dispatcher; use Mantle\Contracts\Queue\Job; @@ -16,21 +17,12 @@ * Allow jobs to be added to the queue with ease. */ class Pending_Dispatch { - /** - * Job instance. - * - * @var Closure_Job|Job - */ - protected Closure_Job|Job $job; // phpcs:ignore Squiz.Commenting.VariableComment.Missing - /** * Constructor. * * @param Job|Closure_Job $job Job instance. */ - public function __construct( $job ) { - $this->job = $job; - } + public function __construct( protected Job|Closure_Job $job ) {} /** * Add a dispatch to a specific queue. @@ -55,12 +47,12 @@ public function on_queue( string $queue ): Pending_Dispatch { * * @throws RuntimeException If the job does not support queueing. * - * @param int $delay Delay in seconds. + * @param DateTimeInterface|int $delay Delay in seconds or DateTime instance. * @return static */ - public function delay( int $delay ): Pending_Dispatch { + public function delay( DateTimeInterface|int $delay ): Pending_Dispatch { if ( ! method_exists( $this->job, 'delay' ) ) { - throw new RuntimeException( 'Job does not support queueing.' ); + throw new RuntimeException( $this->job::class . ' does not support delayed queueing.' ); } $this->job->delay( $delay ); diff --git a/src/mantle/queue/class-queue-manager.php b/src/mantle/queue/class-queue-manager.php index d974f323e..ce7b647de 100644 --- a/src/mantle/queue/class-queue-manager.php +++ b/src/mantle/queue/class-queue-manager.php @@ -16,13 +16,6 @@ * Queue Manager */ class Queue_Manager implements Queue_Manager_Contract { - /** - * Container instance. - * - * @var Container - */ - protected Container $container; - /** * Provider class map. * @@ -42,9 +35,7 @@ class Queue_Manager implements Queue_Manager_Contract { * * @param Container $container Container instance. */ - public function __construct( Container $container ) { - $this->container = $container; - } + public function __construct( protected Container $container ) {} /** * Get a queue provider instance. @@ -71,11 +62,9 @@ public function get_provider( string $name = null ): Provider { * * @throws InvalidArgumentException Thrown invalid provider. */ - public function add_provider( string $name, $provider ) { + public function add_provider( string $name, string|Provider $provider ) { if ( is_string( $provider ) && ( ! class_exists( $provider ) || ! in_array( Provider::class, class_implements( $provider ), true ) ) ) { throw new InvalidArgumentException( "Provider does not implement Provider contract: [$provider]" ); - } elseif ( is_object( $provider ) && ! ( $provider instanceof Provider ) ) { // @phpstan-ignore-line is always false - throw new InvalidArgumentException( "Provider does not implement Provider contract: [$provider::class]" ); } $this->providers[ $name ] = $provider; @@ -86,11 +75,11 @@ public function add_provider( string $name, $provider ) { /** * Get the default queue driver in queue. * - * @return string|null + * @return string */ - protected function get_default_driver(): ?string { + protected function get_default_driver(): string { if ( ! isset( $this->container['config'] ) ) { - return null; + return 'wordpress'; } return $this->container['config']['queue.default'] ?? 'wordpress'; @@ -117,7 +106,7 @@ protected function resolve( string $provider ): Provider { } if ( ! ( $this->connections[ $provider ] instanceof Provider ) ) { - throw new InvalidArgumentException( "Unknown provider instance resolved for [$provider]: " . get_class( $this->connections[ $provider ] ) ); + throw new InvalidArgumentException( "Unknown provider instance resolved for [$provider]: " . $this->connections[ $provider ]::class ); } return $this->connections[ $provider ]; diff --git a/src/mantle/queue/class-queue-service-provider.php b/src/mantle/queue/class-queue-service-provider.php index b4818b6a9..10aace87b 100644 --- a/src/mantle/queue/class-queue-service-provider.php +++ b/src/mantle/queue/class-queue-service-provider.php @@ -7,11 +7,10 @@ namespace Mantle\Queue; -use Mantle\Contracts\Queue\Dispatcher as Dispatcher_Contract; use Mantle\Contracts\Queue\Queue_Manager as Queue_Manager_Contract; +use Mantle\Queue\Console\Cleanup_Jobs_Command; use Mantle\Queue\Console\Run_Command; use Mantle\Queue\Dispatcher; -use Mantle\Queue\Events\Run_Complete; use Mantle\Queue\Queue_Manager; use Mantle\Queue\Worker; use Mantle\Support\Service_Provider; @@ -22,37 +21,35 @@ * Queue Service Provider */ class Queue_Service_Provider extends Service_Provider { - /** * Register the service provider. */ public function register() { - $this->app->singleton( + $this->app->singleton_if( 'queue', - function ( $app ) { - // Register the Queue Manager with the supported providers when invoked. - return tap( - new Queue_Manager( $app ), - fn ( $manager ) => $this->register_providers( $manager ), - ); - } + fn ( $app ) => tap( + // Register the Queue Manager with the supported providers when resolved. + new Queue_Manager( $app ), + fn ( Queue_Manager $manager ) => $this->register_providers( $manager ), + ), ); - $this->app->singleton( + $this->app->singleton_if( 'queue.worker', - function ( $app ) { - return new Worker( $app['queue'], $app['events'] ); - } + fn ( $app ) => new Worker( $app['queue'], $app['events'] ), ); $this->app->singleton_if( - Dispatcher_Contract::class, - function( $app ) { - return new Dispatcher( $app ); - } + 'queue.dispatcher', + fn ( $app ) => new Dispatcher( $app ), ); + // Register queue console commands. + $this->add_command( Cleanup_Jobs_Command::class ); $this->add_command( Run_Command::class ); + + // Register the queue service providers. + $this->app->register( Providers\WordPress\Service_Provider::class ); } /** @@ -65,32 +62,13 @@ public function boot() { /** * Register Queue Providers * - * @param Queue_Manager_Contract $manager Queue Manager. - */ - protected function register_providers( Queue_Manager_Contract $manager ) { - $this->register_wp_cron_provider( $manager ); - } - - /** - * Register the WordPress Cron Queue Provider + * Fire an event to allow other plugins to register queue providers. * * @param Queue_Manager_Contract $manager Queue Manager. */ - protected function register_wp_cron_provider( Queue_Manager_Contract $manager ) { - $manager->add_provider( 'wordpress', Providers\WordPress\Provider::class ); - - // Setup the WordPress cron scheduler. - \add_action( 'init', [ Providers\WordPress\Provider::class, 'on_init' ] ); - \add_action( Providers\WordPress\Scheduler::EVENT, [ Providers\WordPress\Scheduler::class, 'on_queue_run' ] ); - - // Add the event listener to schedule the next cron run. - $this->app['events']->listen( - Run_Complete::class, - function( Events\Run_Complete $event ) { - if ( $event->provider instanceof Providers\WordPress\Provider ) { - Providers\WordPress\Scheduler::schedule_next_run( $event->queue ); - } - } + protected function register_providers( Queue_Manager_Contract $manager ): void { + $this->app['events']->dispatch( + new Events\Providers_Registered( $manager ), ); } } diff --git a/src/mantle/queue/class-queue-worker-job.php b/src/mantle/queue/class-queue-worker-job.php index d9df08f2f..e3dbf9b77 100644 --- a/src/mantle/queue/class-queue-worker-job.php +++ b/src/mantle/queue/class-queue-worker-job.php @@ -25,14 +25,21 @@ abstract class Queue_Worker_Job { /** * Fire the queue job. */ - abstract public function fire(); + abstract public function fire(): void; /** * Get the queue job ID. * * @return mixed */ - abstract public function get_id(); + abstract public function get_id(): mixed; + + /** + * Retrieve the stored job. + * + * @return mixed + */ + abstract public function get_job(): mixed; /** * Handle a failed job. @@ -40,14 +47,28 @@ abstract public function get_id(); * @param Throwable $e * @return void */ - abstract public function failed( Throwable $e ); + abstract public function failed( Throwable $e ): void; + + /** + * Handle a completed job. + * + * @return void + */ + abstract public function completed(): void; + + /** + * Retry a job with a specified delay. + * + * @param int $delay Delay in seconds. + */ + abstract public function retry( int $delay = 0 ): void; /** * Delete a job from the queue. * * @return void */ - abstract public function delete(); + abstract public function delete(): void; /** * Check if the job has failed. @@ -57,4 +78,22 @@ abstract public function delete(); public function has_failed(): bool { return $this->failed; } + + /** + * Check if the job can be retried. + * + * @return bool + */ + public function can_retry(): bool { + return $this->has_failed() && ( $this->get_job()->retry ?? false ); + } + + /** + * Retrieve the retry backoff. + * + * @return int The retry backoff in seconds. + */ + public function get_retry_backoff(): int { + return $this->get_job()->retry_backoff ?? 0; + } } diff --git a/src/mantle/queue/class-worker.php b/src/mantle/queue/class-worker.php index 29a327ab6..3dad476f7 100644 --- a/src/mantle/queue/class-worker.php +++ b/src/mantle/queue/class-worker.php @@ -7,100 +7,74 @@ namespace Mantle\Queue; -use Closure; use Mantle\Contracts\Events\Dispatcher; -use Mantle\Contracts\Queue\Provider; use Mantle\Contracts\Queue\Queue_Manager; use Mantle\Queue\Events\Job_Failed; use Mantle\Queue\Events\Job_Processed; use Mantle\Queue\Events\Job_Processing; use Mantle\Queue\Events\Run_Complete; use Mantle\Queue\Events\Run_Start; -use RuntimeException; use Throwable; /** * Queue Worker */ class Worker { - /** - * Queue Manager - * - * @var Queue_Manager - */ - protected $manager; - - /** - * Events Dispatcher. - * - * @var Dispatcher - */ - protected $events; - /** * Constructor. * * @param Queue_Manager $manager Manager instance. * @param Dispatcher $events Events dispatcher. */ - public function __construct( Queue_Manager $manager, Dispatcher $events ) { - $this->manager = $manager; - $this->events = $events; + public function __construct( + protected Queue_Manager $manager, + protected Dispatcher $events, + ) { } /** * Run a batch of queue items. * - * @todo Add failed job re-running and retrying. - * * @param int $size Size of the batch to run. * @param string $queue Queue name. */ - public function run( int $size, string $queue = null ) { + public function run( int $size, string $queue = null ): void { $queue ??= 'default'; $provider = $this->manager->get_provider(); $jobs = $provider->pop( $queue, $size ); $this->events->dispatch( new Run_Start( $provider, $queue, $jobs ) ); - $jobs->each( - function( $job ) use ( $provider ) { - $this->events->dispatch( new Job_Processing( $provider, $job ) ); - - try { - if ( $job instanceof Closure || is_callable( $job ) ) { - $job(); - } else { - $job->fire(); - } - - $this->events->dispatch( new Job_Processed( $provider, $job ) ); - } catch ( Throwable $e ) { - $this->handle_job_exception( $provider, $job, $e ); - - $this->events->dispatch( new Job_Failed( $provider, $job, $e ) ); - } finally { - if ( ! $job instanceof Closure && ! $job->has_failed() ) { - $job->delete(); - } - } - } - ); + $jobs->each( [ $this, 'run_single' ] ); $this->events->dispatch( new Run_Complete( $provider, $queue, $jobs ) ); } /** - * Handle job exceptions. + * Run a single queue job. * - * @todo Add add job retrying. - * - * @param Provider $provider Queue provider. - * @param mixed $job Queue job. - * @param Throwable $e Exception thrown. + * @param Queue_Worker_Job $job Job to run. * @return void */ - protected function handle_job_exception( Provider $provider, $job, Throwable $e ) { - $job->failed( $e ); + public function run_single( Queue_Worker_Job $job ): void { + $provider = $this->manager->get_provider(); + + $this->events->dispatch( new Job_Processing( $provider, $job ) ); + + try { + $job->fire(); + + $this->events->dispatch( new Job_Processed( $provider, $job ) ); + } catch ( Throwable $e ) { + $job->failed( $e ); + + $this->events->dispatch( new Job_Failed( $provider, $job, $e ) ); + } finally { + if ( ! $job->has_failed() ) { + $job->completed(); + } elseif ( $job->can_retry() ) { + $job->retry( $job->get_retry_backoff() ); + } + } } } diff --git a/src/mantle/queue/console/class-cleanup-jobs-command.php b/src/mantle/queue/console/class-cleanup-jobs-command.php new file mode 100644 index 000000000..f7d4cb2c4 --- /dev/null +++ b/src/mantle/queue/console/class-cleanup-jobs-command.php @@ -0,0 +1,53 @@ +whereStatus( [ Post_Status::RUNNING->value, Post_Status::FAILED->value, Post_Status::COMPLETED->value ] ) + ->olderThan( now()->subSeconds( (int) $this->container['config']->get( 'queue.delete_after', 60 ) ) ) + ->each_by_id( + function ( Queue_Record $record ) use ( &$count ) { + if ( ! $record->is_locked() ) { + $record->delete( true ); + + $count++; + } + }, + 100, + ); + + $this->info( sprintf( 'Deleted %d jobs.', $count ) ); + } +} diff --git a/src/mantle/queue/events/class-job-queued.php b/src/mantle/queue/events/class-job-queued.php new file mode 100644 index 000000000..4db710f5c --- /dev/null +++ b/src/mantle/queue/events/class-job-queued.php @@ -0,0 +1,23 @@ + $this->render_action( $job_id ), // phpcs:ignore WordPress.Security.NonceVerification.Recommended + ! empty( $job_id ) => $this->render_single_job( $job_id ), // phpcs:ignore WordPress.Security.NonceVerification.Recommended + default => $this->render_table(), + }; + } + + /** + * Render a single job view. + * + * @param int $job_id The job ID. + */ + protected function render_single_job( int $job_id ): void { + $job = Queue_Record::find( $job_id ); + + if ( empty( $job ) ) { + wp_die( esc_html__( 'Invalid job ID.', 'mantle' ) ); + } + + include __DIR__ . '/template/single.php'; + } + + /** + * Handle an action (retry/delete). + * + * @param int $job_id The job ID. + */ + protected function render_action( int $job_id ): void { + if ( + empty( $_GET['_wpnonce'] ) + || ! wp_verify_nonce( sanitize_text_field( wp_unslash( $_GET['_wpnonce'] ) ), 'queue-job-action-' . $job_id ) + ) { + wp_die( 'Invalid nonce.' ); + } + + $action = sanitize_text_field( wp_unslash( $_GET['action'] ?? '' ) ); // phpcs:ignore WordPress.Security.NonceVerification.Recommended + $record = Queue_Record::find( $job_id ); + + $message = ''; + $message_link = ''; + + if ( empty( $record ) ) { + wp_die( esc_html__( 'Invalid job ID.', 'mantle' ) ); + } + + $return_link = sprintf( + '%s', + esc_url( + add_query_arg( + [ + 'action' => null, + 'job' => null, + ], + ), + ), + esc_html__( 'Return to queue jobs.', 'mantle' ), + ); + + if ( 'run' === $action ) { + if ( Post_Status::PENDING->value !== $record->status ) { + wp_die( esc_html__( 'Job is not in a pending state.', 'mantle' ) . ' ' . $return_link ); // phpcs:ignore WordPress.Security.EscapeOutput.OutputNotEscaped + } + + // Check if the job is locked. + if ( $record->is_locked() ) { + wp_die( esc_html__( 'Job is currently locked.', 'mantle' ) . ' ' . $return_link ); // phpcs:ignore WordPress.Security.EscapeOutput.OutputNotEscaped + } + + $job = new Queue_Worker_Job( $record ); + + // Lock the job before it is run. + $record->set_lock_until( $job->get_job()->timeout ?? 600 ); + + // Run the queue job through the queue worker and refresh the record. + app( Worker::class )->run_single( $job ); + + $message = match ( $record->refresh()?->status ) { + Post_Status::FAILED->value => esc_html__( 'Job has failed.', 'mantle' ), + Post_Status::COMPLETED->value => esc_html__( 'Job has completed successfully.', 'mantle' ), + default => esc_html__( 'Job has been run but the status is unknown.', 'mantle' ), + }; + + $message_status = Post_Status::FAILED->value === $record->status ? 'error' : 'success'; + + $message_link = sprintf( + '%s', + esc_url( + add_query_arg( + [ + 'action' => null, + 'filter' => null, + 'job' => $record->id(), + '_wpnonce' => null, + ], + ), + ), + esc_html__( 'View Details', 'mantle' ), + ); + } elseif ( 'retry' === $action ) { + if ( Post_Status::FAILED->value !== $record->status ) { + wp_die( esc_html__( 'Job is not in a failed state and cannot be retried.', 'mantle' ) ); + } + + ( new Queue_Worker_Job( $record ) )->retry(); + + $message = esc_html__( 'Job has been scheduled to be retried.', 'mantle' ); + } elseif ( 'delete' === $action ) { + $record->delete( true ); + + $message = esc_html__( 'Job has been deleted.', 'mantle' ); + } + + if ( ! empty( $message ) ) { + printf( + '

%s

', + esc_attr( $message_status ?? 'success' ), + esc_html( $message ) . " {$message_link}", // phpcs:ignore WordPress.Security.EscapeOutput.OutputNotEscaped + ); + } + + $this->render_table(); + } + + /** + * Render the queue table. + */ + protected function render_table(): void { + $table = new Queue_Jobs_Table(); + + $table->prepare_items(); + + include __DIR__ . '/template/table.php'; + } +} diff --git a/src/mantle/queue/providers/wordpress/admin/class-queue-jobs-table.php b/src/mantle/queue/providers/wordpress/admin/class-queue-jobs-table.php new file mode 100644 index 000000000..bd93cf068 --- /dev/null +++ b/src/mantle/queue/providers/wordpress/admin/class-queue-jobs-table.php @@ -0,0 +1,339 @@ + __( 'Jobs', 'mantle' ), + 'singular' => __( 'Job', 'mantle' ), + ] + ); + } + + /** + * Gets the list of columns. + * + * @return string[] Array of column titles keyed by their column name. + */ + public function get_columns() { + return [ + 'job' => __( 'Job', 'mantle' ), + 'arguments' => __( 'Arguments', 'mantle' ), + 'queue' => __( 'Queue', 'mantle' ), + 'date' => __( 'Scheduled', 'mantle' ), + 'status' => __( 'Status', 'mantle' ), + ]; + } + + /** + * Collect the views for the table. + * + * @return array + */ + protected function get_views() { + $current = sanitize_text_field( wp_unslash( $_GET['filter'] ?? '' ) ); // phpcs:ignore WordPress.Security.NonceVerification.Recommended + + $links = [ + [ + 'current' => empty( $current ), + 'label' => __( 'All', 'mantle' ), + 'url' => add_query_arg( 'filter', '' ), + ], + ]; + + foreach ( Post_Status::cases() as $status ) { + $count = $this->get_status_count( $status ); + + $links[] = [ + 'current' => $status->value === $current, + 'label' => str( $status->name ) + ->title() + ->when( + $count > 0, + fn ( $str ) => $str->append( + sprintf( + ' (%d)', + esc_html( number_format_i18n( $count ) ), + ), + ), + ) + ->toString(), + 'url' => add_query_arg( + [ + 'action' => null, + 'job' => null, + 'filter' => $status->value, + ] + ), + ]; + } + + return $this->get_views_links( $links ); + } + + /** + * Retrieve the count of items on a specific status. + * + * @param Post_Status $status The status to retrieve the count for. + */ + protected function get_status_count( Post_Status $status ): int { + $count = wp_count_posts( Provider::OBJECT_NAME ); + + return $count->{$status->value} ?? 0; + } + + /** + * Prepares the list of items for displaying. + */ + public function prepare_items() { + $this->_column_headers = [ $this->get_columns(), [], [] ]; + + $statuses = array_column( Post_Status::cases(), 'value' ); + + $active_status_filter = sanitize_text_field( wp_unslash( $_GET['filter'] ?? '' ) ); // phpcs:ignore WordPress.Security.NonceVerification.Recommended + + // Validate that the status filter is valid. + if ( ! empty( $active_status_filter ) && ! in_array( $active_status_filter, $statuses, true ) ) { + $active_status_filter = ''; + } + + $active_queue_filter = sanitize_text_field( wp_unslash( $_GET['queue'] ?? '' ) ); // phpcs:ignore WordPress.Security.NonceVerification.Recommended + $page = (int) ( $_GET['paged'] ?? 1 ); // phpcs:ignore WordPress.Security.NonceVerification.Recommended, WordPress.Security.ValidatedSanitizedInput.InputNotSanitized + + $query = Queue_Record::query() + ->orderBy( 'date', 'asc' ) + // Allow the query to be filtered by status. + ->when( + ! empty( $active_status_filter ), + fn ( $query ) => $query->where( 'post_status', $active_status_filter ), + fn ( $query ) => $query->where( 'post_status', $statuses ), + ) + // Allow the query to be filtered by queue. + ->when( + ! empty( $active_queue_filter ), + fn ( Post_Query_Builder $query ) => $query->whereTerm( + Provider::get_queue_term_id( $active_queue_filter, false ), + Provider::OBJECT_NAME, + ), + ) + ->for_page( $page, $this->per_page ); + + // TODO: Refactor with found_posts later. + $this->items = $query->get()->map( + function ( Queue_Record $model ) { + $worker = new Queue_Worker_Job( $model ); + $job = $worker->get_job(); + + return [ + 'id' => $model->ID, + 'job' => $worker->get_id(), + 'arguments' => is_object( $job ) ? get_object_vars( $job ) : '', + 'queue' => $model->get_queue(), + 'date' => $model->date, + 'status' => $model->status, + ]; + } + )->all(); + + $this->set_pagination_args( + [ + 'total_items' => $query->get_found_rows(), + 'per_page' => $this->per_page, + ] + ); + } + + /** + * Display the job column. + * + * @param array $item The current item. + */ + public function column_job( $item ): void { + $actions = [ + sprintf( + '%s', + esc_url( + add_query_arg( + [ + '_wpnonce' => false, + 'action' => false, + 'filter' => false, + 'job' => (int) $item['id'], + ] + ) + ), + esc_attr__( 'View details about this job', 'mantle' ), + esc_html__( 'View', 'mantle' ), + ), + Post_Status::PENDING->value === $item['status'] + ? sprintf( + '%s', + esc_url( + add_query_arg( + [ + '_wpnonce' => wp_create_nonce( 'queue-job-action-' . $item['id'] ), + 'job' => (int) $item['id'], + 'action' => 'run', + ] + ) + ), + esc_attr__( 'Run this job', 'mantle' ), + esc_html__( 'Run', 'mantle' ), + ) + : null, + Post_Status::FAILED->value === $item['status'] + ? sprintf( + '%s', + esc_url( + add_query_arg( + [ + '_wpnonce' => wp_create_nonce( 'queue-job-action-' . $item['id'] ), + 'job' => (int) $item['id'], + 'action' => 'retry', + ] + ) + ), + esc_attr__( 'Retry this job', 'mantle' ), + esc_html__( 'Retry', 'mantle' ), + ) + : null, + Post_Status::RUNNING->value !== $item['status'] + ? sprintf( + '%s', + esc_url( + add_query_arg( + [ + '_wpnonce' => wp_create_nonce( 'queue-job-action-' . $item['id'] ), + 'job' => (int) $item['id'], + 'action' => 'delete', + ] + ) + ), + esc_attr__( 'Delete this job', 'mantle' ), + "return confirm('" . esc_attr__( 'Are you sure you want to retry this job?', 'mantle' ) . "');", + esc_html__( 'Delete', 'mantle' ), + ) + : null, + ]; + + printf( + '
%s
%s
', + esc_html( $item['job'] ), + implode( ' | ', array_filter( $actions ) ), // phpcs:ignore WordPress.Security.EscapeOutput.OutputNotEscaped + ); + } + + /** + * Display the arguments column. + * + * @param array $item The current item. + */ + public function column_arguments( $item ): void { + echo '' . wp_json_encode( $item['arguments'] ) . ''; + } + + /** + * Display the queue column. + * + * @param array $item The current item. + */ + public function column_queue( $item ): void { + printf( + '%s', + esc_url( add_query_arg( 'queue', $item['queue'] ) ), + esc_html( $item['queue'] ), + ); + } + + /** + * Display the date column. + * + * @param array $item The current item. + */ + public function column_date( $item ): void { + $time = Carbon::parse( $item['date'], wp_timezone() ); + + printf( + '', + esc_attr( $time->format( get_option( 'date_format' ) . ' ' . get_option( 'time_format' ) ) ), + esc_attr( $time->format( 'c' ) ), + esc_html( $time->diffForHumans() ), + ); + } + + /** + * Display the status column. + * + * @param array $item The current item. + */ + public function column_status( $item ): void { + switch ( $item['status'] ) { + case Post_Status::PENDING->value: + echo '' . esc_html__( 'Pending', 'mantle' ); + break; + + case Post_Status::RUNNING->value: + echo '' . esc_html__( 'Running', 'mantle' ); + break; + + case Post_Status::FAILED->value: + echo '' . esc_html__( 'Failed', 'mantle' ); + break; + + case Post_Status::COMPLETED->value: + echo '' . esc_html__( 'Completed', 'mantle' ); + break; + } + } + + /** + * Gets the name of the default primary column. + */ + protected function get_default_primary_column_name() { + return 'job'; + } + + /** + * Generates content for a single row of the table. + * + * @param array $item The current item. + */ + public function single_row( $item ) { + printf( '', esc_attr( 'queue-item queue-item__' . $item['status'] ) ); + $this->single_row_columns( $item ); + echo ''; + } +} diff --git a/src/mantle/queue/providers/wordpress/admin/class-service-provider.php b/src/mantle/queue/providers/wordpress/admin/class-service-provider.php new file mode 100644 index 000000000..07ac8ddcf --- /dev/null +++ b/src/mantle/queue/providers/wordpress/admin/class-service-provider.php @@ -0,0 +1,52 @@ +render(); + } +} diff --git a/src/mantle/queue/providers/wordpress/admin/template/single.php b/src/mantle/queue/providers/wordpress/admin/template/single.php new file mode 100644 index 000000000..b74cd0dd3 --- /dev/null +++ b/src/mantle/queue/providers/wordpress/admin/template/single.php @@ -0,0 +1,183 @@ +get_meta( Meta_Key::LOG->value, true ); +$log = is_array( $log ) ? $log : []; + +?> +
+

+ id, + /* phpcs:disable WordPress.Security.EscapeOutput.OutputNotEscaped */ + match ( $job->status ) { + Post_Status::PENDING->value => esc_html__( 'Pending', 'mantle' ), + Post_Status::FAILED->value => esc_html__( 'Failed', 'mantle' ), + Post_Status::RUNNING->value => esc_html__( 'Running', 'mantle' ), + default => esc_html__( 'Unknown', 'mantle' ), + }, + /* phpcs:enable WordPress.Security.EscapeOutput.OutputNotEscaped */ + ); + ?> +

+

+ + + +

+ +
+ +
+ + +
+

+
    + +
  1. +

    + + + + — + + format( get_option( 'date_format' ) . ' ' . get_option( 'time_format' ) ) ); ?> + + — + diffForHumans() ); ?> + +

    + + +
    + +
  2. + +
+
+
+
+ + diff --git a/src/mantle/queue/providers/wordpress/admin/template/table.php b/src/mantle/queue/providers/wordpress/admin/template/table.php new file mode 100644 index 000000000..fc56cb7ba --- /dev/null +++ b/src/mantle/queue/providers/wordpress/admin/template/table.php @@ -0,0 +1,31 @@ + +
+

+ +
+ + views(); ?> + display(); ?> +
+ + diff --git a/src/mantle/queue/providers/wordpress/class-meta-key.php b/src/mantle/queue/providers/wordpress/class-meta-key.php new file mode 100644 index 000000000..a99067034 --- /dev/null +++ b/src/mantle/queue/providers/wordpress/class-meta-key.php @@ -0,0 +1,38 @@ +> */ protected static $pending_queue = []; /** - * 'init' callback. + * Register the data types on 'init'. */ - public static function on_init() { + public static function register_data_types(): void { \register_post_type( static::OBJECT_NAME, [ @@ -66,13 +56,15 @@ public static function on_init() { ] ); - \register_post_status( - static::POST_STATUS_FAILED, - [ - 'internal' => true, - 'public' => false, - ] - ); + foreach ( Post_Status::cases() as $status ) { + \register_post_status( + $status->value, + [ + 'internal' => true, + 'public' => false, + ] + ); + } static::process_pending_queue(); } @@ -80,7 +72,7 @@ public static function on_init() { /** * Process the pending queue items that were added before `init`. */ - protected static function process_pending_queue() { + protected static function process_pending_queue(): void { if ( ! empty( static::$pending_queue ) ) { $manager = app( Queue_Manager::class ); @@ -94,6 +86,14 @@ protected static function process_pending_queue() { } } + /** + * Constructor + * + * @param Application $app Application instance. + */ + public function __construct( protected Application $app ) { + } + /** * Push a job to the queue. * @@ -102,10 +102,11 @@ protected static function process_pending_queue() { * @param mixed $job Job instance. * @return bool */ - public function push( $job ) { + public function push( mixed $job ): bool { // Account for adding to the queue before 'init'. if ( ! \did_action( 'init' ) ) { static::$pending_queue[] = func_get_args(); + return true; } @@ -113,66 +114,105 @@ public function push( $job ) { $job = serialize( $job ); // phpcs:ignore WordPress.PHP.DiscouragedPHPFunctions.serialize_serialize } - $queue = $job->queue ?? 'default'; - $insert = \wp_insert_post( + $queue = $job->queue ?? 'default'; + $job_name = Str::of( $job::class ) + ->replace( '\\', '_' ) + ->replace( '_', '-' ) + ->lower() + ->slug(); + + $object = new Queue_Record( [ - 'post_type' => static::OBJECT_NAME, - 'post_name' => 'mantle_queue_' . time(), - 'post_status' => 'publish', - 'meta_input' => [ - '_mantle_queue' => $job, + 'post_name' => "mantle_queue_{$job_name}_" . time(), + 'post_status' => Post_Status::PENDING->value, + 'meta' => [ + Meta_Key::JOB->value => $job, + Meta_Key::START_TIME->value => now()->getTimestamp(), ], - ], - true + ] ); - if ( is_wp_error( $insert ) ) { - throw new RuntimeException( 'Error adding job to queue: ' . $insert->get_error_message() ); + // Handle the job being delayed. + if ( isset( $job->delay ) ) { + // Translate the delay into a timestamp. + $delay = $job->delay instanceof DateTimeInterface + ? $job->delay->getTimestamp() + : now()->addSeconds( $job->delay )->getTimestamp(); + + // Set the post date to the timestamp of the delay to prevent it from + // being started before the delay. + $object->post_date = Carbon::createFromTimestamp( $delay, wp_timezone() )->toDateTimeString(); } - wp_set_object_terms( $insert, static::get_queue_term_id( $queue ), static::OBJECT_NAME, false ); + $object->save(); - // Ensure that the next cron event is scheduled for this queue. - Scheduler::schedule( $queue ); + // TODO: Convert this to a queued term setter like we do with meta. + $object->set_terms( + [ + static::OBJECT_NAME => static::get_queue_term_id( $queue ), + ] + ); return true; } /** - * Get the next set of jobs in the queue. + * Get the next set of job(s) in the queue. + * + * @todo Lock the jobs after popping them off the queue. * * @param string $queue Queue name. * @param int $count Number of items to fetch. - * @return Collection + * @return Collection */ public function pop( string $queue = null, int $count = 1 ): Collection { - $post_ids = \get_posts( - [ - 'fields' => 'ids', - 'ignore_sticky_posts' => true, - 'order' => 'ASC', - 'orderby' => 'date', - 'post_status' => 'publish', - 'post_type' => static::OBJECT_NAME, - 'posts_per_page' => $count, - 'suppress_filters' => false, - 'tax_query' => [ // phpcs:ignore WordPress.DB.SlowDBQuery.slow_db_query_tax_query - [ - 'taxonomy' => static::OBJECT_NAME, - 'terms' => static::get_queue_term_id( $queue ), - ], - ], - ] - ); + $max_concurrent_batches = max( 1, $this->app['config']->get( 'queue.max_concurrent_batches', 1 ) ); - if ( empty( $post_ids ) ) { - return collect(); - } - - return collect( $post_ids ) + return $this->query( $queue ) + // Ensure the we're only retrieving jobs that are not scheduled for the + // future ordered by the oldest first. + ->olderThanOrEqualTo( now() ) + ->orderBy( 'post_date', 'asc' ) + // Multiply the count times the number of concurrent batches to get the + // number of jobs to fetch. This accounts for job locks without needing a + // meta query. + ->take( $count * $max_concurrent_batches ) + ->get() + // Filter out any jobs that are locked. + ->filter( fn ( Queue_Record $record ) => ! $record->is_locked() ) ->map( - fn( int $post_id ) => new Queue_Worker_Job( \get_post_meta( $post_id, '_mantle_queue', true ), $post_id ), - ); + fn ( Queue_Record $record ) => tap( + new Queue_Worker_Job( $record ), + // Lock the job until the configured timeout or 10 minutes. + fn ( Queue_Worker_Job $job ) => $record->set_lock_until( + $job->get_job()->timeout ?? 600 + ), + ), + ) + ->take( $count ) + ->values(); + } + + /** + * Retrieve the number of pending jobs in the queue. + * + * @param string $queue Queue name, optional. + * @return int + */ + public function pending_count( string $queue = null ): int { + return $this->query( $queue )->count(); + } + + /** + * Construct the query builder for the queue. + * + * @param string|null $queue Queue name, optional. + * @return Post_Query_Builder + */ + protected function query( string $queue = null ): Post_Query_Builder { + return Queue_Record::where( 'post_status', Post_Status::PENDING->value ) + ->whereTerm( static::get_queue_term_id( $queue ), static::OBJECT_NAME ) + ->orderBy( 'post_date', 'asc' ); } /** @@ -182,46 +222,34 @@ public function pop( string $queue = null, int $count = 1 ): Collection { * @param string $queue Queue to compare against. * @return bool */ - public function in_queue( $job, string $queue = null ): bool { - $queued_objects = \get_posts( - [ - 'fields' => 'ids', - 'post_status' => 'publish', - 'post_type' => static::OBJECT_NAME, - 'posts_per_page' => 100, - 'meta_query' => [ // phpcs:ignore WordPress.DB.SlowDBQuery.slow_db_query_meta_query - [ - 'key' => '_mantle_queue', - 'value' => maybe_serialize( $job ), - ], - ], - 'tax_query' => [ // phpcs:ignore WordPress.DB.SlowDBQuery.slow_db_query_tax_query - [ - 'taxonomy' => static::OBJECT_NAME, - 'terms' => static::get_queue_term_id( $queue ), - ], - ], - ] - ); - - return ! empty( $queued_objects ); + public function in_queue( mixed $job, string $queue = null ): bool { + return Queue_Record::where( 'post_status', Post_Status::PENDING->value ) + ->whereDate( now()->toDateTimeString(), '>=' ) + ->whereTerm( static::get_queue_term_id( $queue ), static::OBJECT_NAME ) + ->whereMeta( Meta_Key::JOB->value, maybe_serialize( $job ) ) + ->exists(); } /** * Get the taxonomy term for a queue. * - * @param string $name Queue name, optional. + * @param string|null $name Queue name, optional. + * @param bool $create Whether to create the term if it doesn't exist. * @return int * * @throws InvalidArgumentException Thrown on invalid queue term. */ - public static function get_queue_term_id( string $name = null ): int { + public static function get_queue_term_id( ?string $name = null, bool $create = true ): int { if ( ! $name ) { $name = 'default'; } $term = \get_term_by( 'slug', $name, static::OBJECT_NAME ); + if ( empty( $term ) && ! $create ) { + return 0; + } + if ( empty( $term ) ) { $insert = \wp_insert_term( $name, static::OBJECT_NAME, [ 'slug' => $name ] ); diff --git a/src/mantle/queue/providers/wordpress/class-queue-record.php b/src/mantle/queue/providers/wordpress/class-queue-record.php new file mode 100644 index 000000000..31d3775d5 --- /dev/null +++ b/src/mantle/queue/providers/wordpress/class-queue-record.php @@ -0,0 +1,99 @@ +get_lock_until() > \time(); + } + + /** + * Get the lock end time. + * + * @return int The lock end time. + */ + public function get_lock_until(): int { + return (int) ( $this->get_meta( Meta_Key::LOCK_UNTIL->value, true, ) ?? 0 ); + } + + /** + * Set the lock end time. + * + * @param int $lock_until The lock end time. + * @return void + */ + public function set_lock_until( int $lock_until ): void { + $this->set_meta( Meta_Key::LOCK_UNTIL->value, $lock_until ); + } + + /** + * Clear the lock end time. + * + * @return void + */ + public function clear_lock(): void { + $this->delete_meta( Meta_Key::LOCK_UNTIL->value ); + } + + /** + * Log an event for the job. + * + * @param Event $event The event to log. + * @param array $payload The event payload. + * @return void + */ + public function log( Event $event, array $payload = [] ): void { + $meta = $this->get_meta( Meta_Key::LOG->value ); + + if ( ! is_array( $meta ) ) { + $meta = []; + } + + $meta[] = [ + 'event' => $event->value, + 'payload' => $payload, + 'time' => \time(), + ]; + + $this->set_meta( Meta_Key::LOG->value, $meta ); + } + + /** + * Retrieve the queue name. + * + * @return string + */ + public function get_queue(): string { + return collect( $this->get_terms( Provider::OBJECT_NAME ) ) + ->pluck( 'name' ) + ->first( null, '' ); + } +} diff --git a/src/mantle/queue/providers/wordpress/class-queue-worker-job.php b/src/mantle/queue/providers/wordpress/class-queue-worker-job.php index bb41daca9..98d4eb3b9 100644 --- a/src/mantle/queue/providers/wordpress/class-queue-worker-job.php +++ b/src/mantle/queue/providers/wordpress/class-queue-worker-job.php @@ -1,32 +1,27 @@ job = $job; - $this->queue_post_id = $queue_post_id; - } + public function __construct( protected Queue_Record $model ) {} /** * Fire the job. */ - public function fire() { - // Check if the job has a method called 'handle'. - if ( $this->job instanceof JobContract || method_exists( $this->job, 'handle' ) ) { - $this->job->handle(); - } elseif ( is_callable( $this->job ) ) { - $callback = $this->job; + public function fire(): void { + // Refresh the model once more to ensure we have the latest data. + $this->model->refresh(); + + $this->model->log( Event::STARTING ); + + // Mark the job as "running". + $this->model->save( + [ + 'post_status' => Post_Status::RUNNING->value, + ] + ); - $callback(); + $job = $this->get_job(); + + // Set the lock end time. + $this->model->set_lock_until( time() + ( $job->timeout ?? 600 ) ); + + // Check if the job has a method called 'handle'. + if ( $job instanceof JobContract || method_exists( $job, 'handle' ) ) { + $job->handle(); + } elseif ( is_callable( $job ) ) { + $job(); } - } - /** - * Get the queue post ID. - * - * @return int|null - */ - public function get_post_id(): ?int { - return $this->queue_post_id; + $this->model->log( Event::FINISHED ); } /** @@ -74,40 +73,109 @@ public function get_post_id(): ?int { * * @return mixed */ - public function get_id() { - return $this->get_post_id(); + public function get_id(): mixed { + $job = $this->get_job(); + + return match ( true ) { + $job instanceof Closure_Job => $job->get_id(), + is_object( $job ) => $job::class, + default => $this->model->id(), + }; } /** * Handle a failed queue job. * - * @todo Add retrying for queued jobs. - * * @param Throwable $e Exception thrown. * @return void */ - public function failed( Throwable $e ) { + public function failed( Throwable $e ): void { $this->failed = true; - if ( $this->queue_post_id ) { - update_post_meta( $this->queue_post_id, '_mantle_queue_error', $e->getMessage() ); - wp_update_post( - [ - 'ID' => $this->queue_post_id, - 'post_status' => 'failed', - ] - ); + $this->model->log( + Event::FAILED, + [ + 'exception' => $e::class, + 'message' => $e->getMessage(), + 'trace' => explode( "\n", $e->getTraceAsString() ), + ], + ); + + $this->model->save( + [ + 'meta' => [ + Meta_Key::FAILURE->value => $e->getMessage(), + Meta_Key::LOCK_UNTIL->value => '', + ], + 'post_status' => Post_Status::FAILED->value, + ] + ); + + $job = $this->get_job(); + + if ( method_exists( $job, 'failed' ) ) { + $job->failed( $e ); } } /** - * Delete the job from the queue. + * Handle a completed job. + * + * @return void */ - public function delete() { - $post_id = $this->get_post_id(); + public function completed(): void { + $this->model->save( + [ + 'post_status' => Post_Status::COMPLETED->value, + ] + ); - if ( $post_id && wp_delete_post( $post_id, true ) ) { - $this->queue_post_id = null; + $job = $this->get_job(); + + if ( method_exists( $job, 'completed' ) ) { + $job->completed(); } } + + /** + * Delete the job from the queue. + */ + public function delete(): void { + $this->model->delete( true ); + } + + /** + * Retry a job with a specified delay. + * + * @param int $delay Delay in seconds. + */ + public function retry( int $delay = 0 ): void { + $this->model->log( Event::RETRYING, [ 'delay' => $delay ] ); + + $this->model->save( + [ + 'post_date' => now()->addSeconds( $delay )->toDateTimeString(), + 'post_status' => Post_Status::PENDING->value, + ] + ); + + $app = Application::get_instance(); + + // Dispatch the job queued event. + $app['events']->dispatch( + new Job_Queued( + $app->make( Queue_Manager::class )->get_provider(), + $this->get_job(), + ), + ); + } + + /** + * Retrieve the stored job. + * + * @return mixed + */ + public function get_job(): mixed { + return $this->model->get_meta( Meta_Key::JOB->value, true ); + } } diff --git a/src/mantle/queue/providers/wordpress/class-scheduler.php b/src/mantle/queue/providers/wordpress/class-scheduler.php index 30ac7b8f0..b8a42e3f8 100644 --- a/src/mantle/queue/providers/wordpress/class-scheduler.php +++ b/src/mantle/queue/providers/wordpress/class-scheduler.php @@ -7,6 +7,10 @@ namespace Mantle\Queue\Providers\WordPress; +use Mantle\Support\Collection; + +use function Mantle\Support\Helpers\collect; + /** * WordPress Cron Scheduler */ @@ -19,19 +23,55 @@ class Scheduler { public const EVENT = 'mantle_queue'; /** - * Callback for the cron event. + * Pending queues that need scheduling. * - * @todo Abstract this a bit, allow configuration to control some of this. + * @var string[] + */ + public static array $pending_queues = []; + + /** + * Handle a job being queued and ensure the cron is scheduled for the queue. * * @param string $queue Queue name. */ - public static function on_queue_run( $queue ) { + public static function on_job_queued( string $queue ): void { + if ( ! in_array( $queue, static::$pending_queues, true ) ) { + static::$pending_queues[] = $queue; + } + + if ( ! has_action( 'shutdown', [ __CLASS__, 'schedule_on_shutdown' ] ) ) { + add_action( 'shutdown', [ __CLASS__, 'schedule_on_shutdown' ] ); + } + } + + /** + * Schedule the next run of the queue on shutdown for all pending queues. + * + * Once a queue job is dispatched, the queue will be scheduled to run on + * shutdown to deduplicate any scheduling calls. + */ + public static function schedule_on_shutdown(): void { + foreach ( static::$pending_queues as $queue ) { + static::schedule_next_run( $queue ); + } + + static::$pending_queues = []; + } + + /** + * Callback for the cron event. + * + * @param string $queue Queue name, optional. + */ + public static function run( ?string $queue = null ) { if ( ! $queue ) { $queue = 'default'; } + wp_raise_memory_limit( 'cron' ); + app( 'queue.worker' )->run( - (int) config( 'queue.batch_size', 1 ), + static::get_configuration_value( 'batch_size', $queue, 100 ), $queue ); } @@ -39,6 +79,8 @@ public static function on_queue_run( $queue ) { /** * Schedule the next run of the cron for a queue. * + * Note: this does not check if the cron event is already scheduled. + * * @param string $queue Queue name. * @param int $delay Delay in seconds, defaults to none. * @return bool @@ -48,8 +90,12 @@ public static function schedule( string $queue = null, int $delay = 0 ): bool { $queue = 'default'; } - if ( ! \wp_next_scheduled( static::EVENT, [ $queue ] ) ) { - \wp_schedule_single_event( time() + $delay, static::EVENT, [ $queue ] ); + $schedule = \wp_schedule_single_event( time() + $delay, static::EVENT, [ $queue, time() + $delay ], true ); + + if ( is_wp_error( $schedule ) ) { + dump( $schedule ); + + return false; } return true; @@ -61,11 +107,10 @@ public static function schedule( string $queue = null, int $delay = 0 ): bool { * @param string $queue Queue name. */ public static function unschedule( string $queue = null ): void { - if ( ! $queue ) { - $queue = 'default'; - } - - \wp_clear_scheduled_hook( static::EVENT, [ $queue ] ); + static::get_scheduled_cron_jobs( $queue ) + ->each( + fn ( array $job ) => wp_unschedule_event( $job['timestamp'], static::EVENT, $job['args'] ?? [] ), + ); } /** @@ -77,42 +122,124 @@ public static function unschedule( string $queue = null ): void { * @param string $queue Queue name. * @return bool Flag if the next run was scheduled. */ - public static function schedule_next_run( string $queue = null ): bool { - $has_remaining = \get_posts( - [ - 'fields' => 'ids', - 'ignore_sticky_posts' => true, - 'post_type' => Provider::OBJECT_NAME, - 'posts_per_page' => 1, - 'suppress_filters' => false, - 'post_status' => 'publish', - 'tax_query' => [ // phpcs:ignore WordPress.DB.SlowDBQuery.slow_db_query_tax_query - [ - 'taxonomy' => Provider::OBJECT_NAME, - 'terms' => Provider::get_queue_term_id( $queue ), - ], - ], - ] - ); + public static function schedule_next_run( ?string $queue = null ): bool { + if ( ! $queue ) { + $queue = 'default'; + } + + /** + * Provider instance. + * + * @var \Mantle\Contracts\Queue\Provider + */ + $provider = app( 'queue' )->get_provider( 'wordpress' ); + + $pending_count = $provider->pending_count( $queue ); // Ensure the queue job isn't scheduled if there are no items in the queue. - if ( empty( $has_remaining ) ) { + if ( ! $pending_count ) { static::unschedule( $queue ); return false; } + $max_concurrent_batches = max( 1, static::get_configuration_value( 'max_concurrent_batches', $queue, 1 ) ); + $batch_size = static::get_configuration_value( 'batch_size', $queue, 100 ); + $already_scheduled_count = static::get_scheduled_count( $queue ); + + // If there are already enough batches scheduled, don't schedule another. + if ( $already_scheduled_count >= $max_concurrent_batches ) { + return false; + } + + $to_schedule = max( $max_concurrent_batches, ceil( $pending_count / $batch_size ) ) - $already_scheduled_count; + + if ( $to_schedule > 0 ) { + $delay = static::get_configuration_value( 'delay', $queue, 0 ); + + for ( $i = 0; $i < $to_schedule; $i++ ) { + static::schedule( $queue, $delay ); + + // Add a delay to the next scheduled queue job to stagger them and allow + // for multiple of the same "job" to be scheduled. WordPress cron does + // not allow for duplicate jobs to be scheduled by default. + $delay += 5; + } + } + + return true; + } + + /** + * Retrieve all the scheduled cron jobs for a queue from the cron API. + * + * @param string $queue Queue name. + * @return Collection + */ + protected static function get_scheduled_cron_jobs( string $queue = null ): Collection { if ( ! $queue ) { $queue = 'default'; } - $delay = config( 'queue.wordpress.delay', [] ); + return collect( _get_cron_array() ) + ->reduce( + function ( Collection $carry, array $items, $timestamp ) use ( $queue ) { + if ( empty( $items[ static::EVENT ] ) ) { + return $carry; + } + + foreach ( $items[ static::EVENT ] as $job ) { + if ( empty( $job['args'][0] ) || $queue !== $job['args'][0] ) { + continue; + } + + $job['timestamp'] = $timestamp; + + $carry[] = $job; + } + + return $carry; + }, + collect(), + ); + } + + /** + * Retrieve the number of already-scheduled queue jobs for a queue. + * + * @param string $queue Queue name. + * @return int + */ + public static function get_scheduled_count( string $queue = null ): int { + return static::get_scheduled_cron_jobs( $queue )->count(); + } + + /** + * Retrieve a configuration value for a queue. + * + * @param string $key Configuration key. + * @param string $queue Queue name. + * @param mixed $default Default value. + * @return mixed + */ + protected static function get_configuration_value( string $key, string $queue = null, mixed $default = null ): mixed { + $config = config(); + + // Check for a queue-specific configuration value. + if ( $queue && $config->has( "queue.wordpress.queues.{$queue}.{$key}" ) ) { + return $config->get( "queue.wordpress.queues.{$queue}.{$key}" ); + } + + // Check for a default configuration for the queue provider. + if ( $config->has( "queue.wordpress.{$key}" ) ) { + return $config->get( "queue.wordpress.{$key}" ); + } - // Support queue-specific delay. - if ( is_array( $delay ) ) { - $delay = $delay[ $queue ] ?? 0; + // Check for a default configuration for the queue configuration. + if ( $config->has( "queue.{$key}" ) ) { + return $config->get( "queue.{$key}" ); } - return static::schedule( $queue, $delay ); + return $default; } } diff --git a/src/mantle/queue/providers/wordpress/class-service-provider.php b/src/mantle/queue/providers/wordpress/class-service-provider.php new file mode 100644 index 000000000..4fb9d07ed --- /dev/null +++ b/src/mantle/queue/providers/wordpress/class-service-provider.php @@ -0,0 +1,134 @@ +app['config']->get( 'queue.enable_admin', true ) ) { + $this->app->register( Admin\Service_Provider::class ); + } + } + + /** + * Register the WordPress queue provider's post type and taxonomies. + * + * Registers the cleanup command with the application task scheduler to run + * daily (by default) to remove old queue jobs from the database. + */ + public function boot() { + if ( did_action( 'init' ) ) { + $this->register_data_types(); + } + + $this->app->resolving( + 'scheduler', + fn ( Schedule $scheduler ) => $scheduler->command( Cleanup_Jobs_Command::class )->cron( + /** + * Filter the schedule for the queue cleanup job. + * + * @param string $schedule Schedule cron expression. Defaults to daily at midnight. + */ + (string) apply_filters( 'mantle_queue_cleanup_schedule', '0 0 * * *' ), + ) + ); + } + + /** + * Register the WordPress queue provider's post type and taxonomies. + */ + #[Action( 'init' )] + public function register_data_types(): void { + Provider::register_data_types(); + } + + /** + * Listen for the providers registered event to register the WordPress queue + * provider. + * + * @param Events\Providers_Registered $event Event instance. + */ + #[Action( Events\Providers_Registered::class )] + public function register_queue_provider( Events\Providers_Registered $event ): Events\Providers_Registered { + $event->manager->add_provider( 'wordpress', Provider::class ); + + return $event; + } + + /** + * Handle the schedule event to run the queue via WordPress cron. + * + * This is the listener for the cron event that will start the process of + * firing off queue jobs. + * + * @param string|null $queue Queue name. + */ + #[Action( Scheduler::EVENT )] + public function handle_scheduled_run( $queue = null ): void { + Scheduler::run( $queue ?? 'default' ); + } + + /** + * Handle the Job Queued event to schedule the next cron run. + * + * @param Events\Job_Queued $event Job Queued event. + * @return Events\Job_Queued + */ + #[Action( Events\Job_Queued::class ) ] + public function handle_job_queued_event( Events\Job_Queued $event ): Events\Job_Queued { + if ( $event->provider instanceof Provider ) { + Scheduler::on_job_queued( $event->queue ?? 'default' ); + } + + return $event; + } + + /** + * Handle the Run Complete event to schedule the next cron run. + * + * @param Events\Run_Complete $event Run complete event. + * @return Events\Run_Complete + */ + #[Action( Events\Run_Complete::class ) ] + public function handle_run_complete( Events\Run_Complete $event ): Events\Run_Complete { + if ( $event->provider instanceof Provider ) { + Scheduler::schedule_next_run( $event->queue ?? 'default' ); + } + + return $event; + } + + /** + * Increase the concurrency of the cron job with WordPress VIP's cron. + * + * @link https://docs.wpvip.com/technical-references/cron-control/#h-increasing-cron-event-concurrency + * + * @param array $list List of events and their concurrency. + * @return array List of events and their concurrency. + */ + #[Action( 'a8c_cron_control_concurrent_event_whitelist' )] + public function increase_vip_concurrency( array $list ): array { + $list[ Scheduler::EVENT ] = 100; + + return $list; + } +} diff --git a/src/mantle/queue/providers/wordpress/enum-event.php b/src/mantle/queue/providers/wordpress/enum-event.php new file mode 100644 index 000000000..cf136e348 --- /dev/null +++ b/src/mantle/queue/providers/wordpress/enum-event.php @@ -0,0 +1,18 @@ +dispatch_now( + public static function dispatch_now( ...$args ): void { + app( Dispatcher::class )->dispatch_now( new static( ...$args ) ); } diff --git a/src/mantle/queue/trait-queueable.php b/src/mantle/queue/trait-queueable.php index e75c441d3..5ee4f046b 100644 --- a/src/mantle/queue/trait-queueable.php +++ b/src/mantle/queue/trait-queueable.php @@ -2,11 +2,15 @@ /** * Queueable trait file. * + * phpcs:disable Squiz.Commenting.VariableComment.Missing + * * @package Mantle */ namespace Mantle\Queue; +use DateTimeInterface; + /** * Queueable trait for queue jobs. * @@ -16,16 +20,16 @@ trait Queueable { /** * The delay before the job will be run. * - * @var int + * @var int|DateTimeInterface */ - public $delay; + public int|DateTimeInterface $delay; /** * The name of the queue for the job. * * @var string */ - public $queue; + public string $queue; /** * Add a dispatch to a specific queue. @@ -35,17 +39,19 @@ trait Queueable { */ public function on_queue( string $queue ) { $this->queue = $queue; + return $this; } /** * Set the delay before the job will be run. * - * @param int $delay Delay in seconds. + * @param DateTimeInterface|int $delay Delay in seconds or DateTime instance. * @return static */ - public function delay( int $delay ) { + public function delay( DateTimeInterface|int $delay ) { $this->delay = $delay; + return $this; } } diff --git a/src/mantle/scheduling/class-event.php b/src/mantle/scheduling/class-event.php index 8bbfffaa9..3091e226d 100644 --- a/src/mantle/scheduling/class-event.php +++ b/src/mantle/scheduling/class-event.php @@ -21,7 +21,7 @@ use Throwable; /** - * Schedulable Event + * Schedule-able Event */ class Event { use Macroable, Manages_Frequencies; @@ -31,56 +31,49 @@ class Event { * * @var string */ - public $expression = '* * * * *'; + public string $expression = '* * * * *'; /** * The list of environments the command should run under. * - * @var array + * @var string[] */ - public $environments = []; - - /** - * Indicates if the command should not overlap itself. - * - * @var bool - */ - public $without_overlapping = false; + public array $environments = []; /** * The array of filter callbacks. * - * @var array + * @var callable[] */ - protected $filters = []; + protected array $filters = []; /** * The array of reject callbacks. * - * @var array + * @var callable[] */ - protected $rejects = []; + protected array $rejects = []; /** * The array of callbacks to be run before the event is started. * - * @var array + * @var callable[] */ - protected $before_callbacks = []; + protected array $before_callbacks = []; /** * The array of callbacks to be run after the event is finished. * - * @var array + * @var callable[] */ - protected $after_callbacks = []; + protected array $after_callbacks = []; /** * The human readable description of the event. * * @var string */ - public $description; + public string $description; /** * The exit status code of the command. @@ -88,14 +81,14 @@ class Event { * * @var int|null */ - public $exit_code; + public ?int $exit_code; /** * Exception thrown for the command. * * @var \Throwable */ - public $exception; + public \Throwable $exception; /** * Create a new event instance. diff --git a/src/mantle/scheduling/class-schedule.php b/src/mantle/scheduling/class-schedule.php index 88897fb2b..e7eca0772 100644 --- a/src/mantle/scheduling/class-schedule.php +++ b/src/mantle/scheduling/class-schedule.php @@ -94,7 +94,7 @@ public static function schedule_cron_event() { /** * Run the scheduled events that are due to run. */ - public function run_due_events() { + public function run_due_events(): void { $this ->due_events( $this->container ) ->each( fn ( Event $event ) => $event->run( $this->container ) ); @@ -103,9 +103,9 @@ public function run_due_events() { /** * Add a new command event. * - * @param string $command Command class to run. - * @param array $arguments Arguments for the command. - * @param array $assoc_args Assoc. arguments for the command. + * @param class-string<\Mantle\Console\Command> $command Command class to run. + * @param array $arguments Arguments for the command. + * @param array $assoc_args Assoc. arguments for the command. * @return Event * * @throws RuntimeException Thrown on missing command. @@ -130,8 +130,8 @@ public function command( string $command, array $arguments = [], array $assoc_ar /** * Add a new job event. * - * @param string $job Job class to run. - * @param array $arguments Arguments for the command. + * @param class-string $job Job class to run. + * @param array $arguments Arguments for the command. * @return Event * * @throws RuntimeException Thrown on missing command. @@ -156,11 +156,11 @@ public function job( string $job, array $arguments = [] ): Event { /** * Add a callback event. * - * @param string $callback Callback to run. - * @param array $arguments Arguments for the callback. + * @param callable $callback Callback to run. + * @param array $arguments Arguments for the callback. * @return Event */ - public function call( $callback, array $arguments = [] ): Event { + public function call( callable $callback, array $arguments = [] ): Event { $event = new Event( $callback, $arguments, $this->get_timezone() ); $this->events[] = $event; @@ -185,7 +185,7 @@ public function due_events( Application $app ): Collection { * * @return Event[] */ - public function events() { + public function events(): array { return $this->events; } } diff --git a/src/mantle/testing/concerns/trait-assertions.php b/src/mantle/testing/concerns/trait-assertions.php index 3c4bec91a..2d9a8f8c5 100644 --- a/src/mantle/testing/concerns/trait-assertions.php +++ b/src/mantle/testing/concerns/trait-assertions.php @@ -9,7 +9,9 @@ namespace Mantle\Testing\Concerns; +use BackedEnum; use Mantle\Contracts\Database\Core_Object; +use Mantle\Contracts\Support\Arrayable; use Mantle\Database\Model\Post; use Mantle\Database\Model\Term; use Mantle\Database\Model\User; @@ -17,6 +19,7 @@ use WP_Post; use WP_Term; +use function Mantle\Support\Helpers\collect; use function Mantle\Support\Helpers\get_term_object; /** @@ -270,12 +273,12 @@ public static function assertQueriedObjectNull(): void { * @param array $arguments Arguments to query against. */ public function assertPostExists( array $arguments ): void { - $arguments = array_merge( + $arguments = $this->serialize_arguments( + $arguments, [ 'fields' => 'ids', 'posts_per_page' => 1, ], - $arguments ); PHPUnit::assertNotEmpty( @@ -290,12 +293,12 @@ public function assertPostExists( array $arguments ): void { * @param array $arguments Arguments to query against. */ public function assertPostDoesNotExists( array $arguments ): void { - $arguments = array_merge( + $arguments = $this->serialize_arguments( + $arguments, [ 'fields' => 'ids', 'posts_per_page' => 1, ], - $arguments ); PHPUnit::assertEmpty( @@ -310,13 +313,13 @@ public function assertPostDoesNotExists( array $arguments ): void { * @param array $arguments Arguments to query against. */ public function assertTermExists( array $arguments ): void { - $arguments = array_merge( + $arguments = $this->serialize_arguments( + $arguments, [ 'fields' => 'ids', 'count' => 1, 'hide_empty' => false, ], - $arguments ); PHPUnit::assertNotEmpty( @@ -331,13 +334,13 @@ public function assertTermExists( array $arguments ): void { * @param array $arguments Arguments to query against. */ public function assertTermDoesNotExists( array $arguments ): void { - $arguments = array_merge( + $arguments = $this->serialize_arguments( + $arguments, [ 'fields' => 'ids', 'count' => 1, 'hide_empty' => false, ], - $arguments ); PHPUnit::assertEmpty( @@ -352,12 +355,12 @@ public function assertTermDoesNotExists( array $arguments ): void { * @param array $arguments Arguments to query against. */ public function assertUserExists( array $arguments ) { - $arguments = array_merge( + $arguments = $this->serialize_arguments( + $arguments, [ 'fields' => 'ids', 'count' => 1, ], - $arguments ); PHPUnit::assertNotEmpty( @@ -372,12 +375,12 @@ public function assertUserExists( array $arguments ) { * @param array $arguments Arguments to query against. */ public function assertUserDoesNotExists( array $arguments ): void { - $arguments = array_merge( + $arguments = $this->serialize_arguments( + $arguments, [ 'fields' => 'ids', 'count' => 1, ], - $arguments ); PHPUnit::assertEmpty( @@ -458,4 +461,46 @@ public function assertPostNotHasTerm( Post|WP_Post|int $post, Term|WP_Term|int $ public function assertPostsDoesNotHaveTerm( Post|WP_Post|int $post, Term|WP_Term|int $term ): void { $this->assertPostNotHasTerm( $post, $term ); } + + /** + * Serialize arguments for use in assertions. + * + * Convert string-backed enums to an array of all possible values from an enumeration. + * + * @param array $arguments Arguments to serialize. + * @param array $defaults Default values. + * @return array + */ + protected function serialize_arguments( array $arguments, array $defaults = [] ): array { + $arguments = array_merge( $defaults, $arguments ); + + foreach ( $arguments as $key => $value ) { + if ( $value instanceof Arrayable ) { + $arguments[ $key ] = $value->to_array(); + } + + // Check for PHP 8.1+ support. + if ( interface_exists( BackedEnum::class ) ) { + // Convert an enum to an array of all possible values. + if ( is_string( $value ) && enum_exists( $value ) && is_subclass_of( $value, BackedEnum::class ) ) { + $arguments[ $key ] = collect( $value::cases() )->pluck( 'value' )->all(); + } + + // Convert an enum object to its value. + if ( is_object( $value ) && $value instanceof BackedEnum ) { + $arguments[ $key ] = $value->value; + } + + // Convert an array of enum objects to an array of their values. + if ( is_array( $value ) ) { + $arguments[ $key ] = array_map( + fn ( $item ) => is_object( $item ) && $item instanceof BackedEnum ? $item->value : $item, + $value, + ); + } + } + } + + return $arguments; + } } diff --git a/src/mantle/testing/concerns/trait-create-application.php b/src/mantle/testing/concerns/trait-create-application.php index 567856373..67efca478 100644 --- a/src/mantle/testing/concerns/trait-create-application.php +++ b/src/mantle/testing/concerns/trait-create-application.php @@ -73,6 +73,7 @@ protected function get_application_config(): array { \Mantle\Filesystem\Filesystem_Service_Provider::class, \Mantle\Database\Pagination\Paginator_Service_Provider::class, \Mantle\Cache\Cache_Service_Provider::class, + \Mantle\Application\App_Service_Provider::class, ], ], 'queue' => [ diff --git a/src/mantle/testing/concerns/trait-interacts-with-cron.php b/src/mantle/testing/concerns/trait-interacts-with-cron.php index 314a4ddcd..8ce2b39cc 100644 --- a/src/mantle/testing/concerns/trait-interacts-with-cron.php +++ b/src/mantle/testing/concerns/trait-interacts-with-cron.php @@ -11,29 +11,42 @@ use InvalidArgumentException; use Mantle\Contracts\Queue\Job; use Mantle\Contracts\Queue\Queue_Manager; +use Mantle\Queue\Worker; use PHPUnit\Framework\Assert as PHPUnit; use stdClass; +use function Mantle\Support\Helpers\collect; + /** * Concern for interacting with the WordPress cron and making assertions against * it. Also supports queued and scheduled jobs. + * + * @mixin \Mantle\Testing\Test_Case */ trait Interacts_With_Cron { /** * Assert that an action is in the cron queue. * * @param string $action Action hook of the event. - * @param array $args Arguments for the cron queue event. + * @param array $args Arguments for the cron queue event or null to not check + * arguments (cron only). */ - public function assertInCronQueue( string $action, array $args = [] ): void { + public function assertInCronQueue( string $action, array|null $args = [] ): void { if ( $this->is_job_action( $action ) ) { - $this->assertJobQueued( $action, $args ); + $this->assertJobQueued( $action, (array) $args ); return; } - PHPUnit::assertNotFalse( - \wp_next_scheduled( $action, $args ), - "Cron action is not in cron queue: [$action]" + if ( ! is_null( $args ) ) { + PHPUnit::assertNotFalse( + \wp_next_scheduled( $action, $args ), + "Cron action is not in cron queue: [$action]" + ); + } + + PHPUnit::assertNotEmpty( + collect( static::get_cron_events() )->where( 'hook', $action )->all(), + "Cron action is not in cron queue: [$action] (no arguments checked)", ); } @@ -41,24 +54,32 @@ public function assertInCronQueue( string $action, array $args = [] ): void { * Assert that an action is not in a cron queue. * * @param string $action Action hook of the event. - * @param array $args Arguments for the cron queue event. + * @param array $args Arguments for the cron queue event or null to not check + * arguments (cron only). */ - public function assertNotInCronQueue( string $action, array $args = [] ): void { + public function assertNotInCronQueue( string $action, array|null $args = [] ): void { if ( $this->is_job_action( $action ) ) { - $this->assertJobNotQueued( $action, $args ); + $this->assertJobNotQueued( $action, (array) $args ); return; } - PHPUnit::assertFalse( - \wp_next_scheduled( $action, $args ), - "Cron action is in cron queue: [$action]" + if ( ! is_null( $args ) ) { + PHPUnit::assertFalse( + \wp_next_scheduled( $action, $args ), + "Cron action is in cron queue: [$action]" + ); + } + + PHPUnit::assertEmpty( + collect( static::get_cron_events() )->where( 'hook', $action )->all(), + "Cron action is in cron queue: [$action] (no arguments checked)", ); } /** * Determine if a cron 'action' is actually a queued job. * - * @param string $action Action name. + * @param class-string|class-string<\Mantle\Contracts\Queue\Job> $action Action name. * @return bool */ protected function is_job_action( string $action ): bool { @@ -78,6 +99,11 @@ protected function is_job_action( string $action ): bool { * @throws InvalidArgumentException Thrown for missing job class. */ public function assertJobQueued( $job, array $args = [], string $queue = null ): void { + /** + * Provider instance. + * + * @var \Mantle\Contracts\Queue\Provider + */ $provider = app( Queue_Manager::class )->get_provider(); if ( is_string( $job ) ) { @@ -88,9 +114,11 @@ public function assertJobQueued( $job, array $args = [], string $queue = null ): $job = new $job( ...$args ); } + $job_name = is_object( $job ) ? $job::class : $job; + PHPUnit::assertTrue( $provider->in_queue( $job, $queue ), - 'Job is not in the queue.' + "Job [{$job_name}] is not in the queue.", ); } @@ -107,6 +135,11 @@ public function assertJobQueued( $job, array $args = [], string $queue = null ): * @throws InvalidArgumentException Thrown for missing job class. */ public function assertJobNotQueued( $job, array $args = [], string $queue = null ): void { + /** + * Provider instance. + * + * @var \Mantle\Contracts\Queue\Provider + */ $provider = app( Queue_Manager::class )->get_provider(); if ( is_string( $job ) ) { @@ -117,9 +150,11 @@ public function assertJobNotQueued( $job, array $args = [], string $queue = null $job = new $job( ...$args ); } + $job_name = is_object( $job ) ? $job::class : $job; + PHPUnit::assertFalse( $provider->in_queue( $job, $queue ), - 'Job is in the queue.' + "Job [{$job_name}] is in the queue.", ); } @@ -170,7 +205,7 @@ public function dispatch_cron( string $action = null ) { /** * Fetches an array of scheduled cron events. * - * @return array + * @return array */ protected static function get_cron_events(): array { $crons = _get_cron_array(); @@ -227,9 +262,10 @@ protected static function run_cron_event( \stdClass $event ) { /** * Dispatch the WordPress cron queue. * + * @param int $size Size of the queue to run. * @param string $queue Queue to run. */ - public static function dispatch_queue( string $queue = null ) { - app( \Mantle\Queue\Providers\WordPress\Scheduler::class )->on_queue_run( $queue ); + public function dispatch_queue( int $size = 100, string $queue = null ): void { + $this->app->make( Worker::class )->run( $size, $queue ); } } diff --git a/tests/Database/Query/PostQueryBuilderTest.php b/tests/Database/Query/PostQueryBuilderTest.php index 759c87114..0e7f56d44 100644 --- a/tests/Database/Query/PostQueryBuilderTest.php +++ b/tests/Database/Query/PostQueryBuilderTest.php @@ -675,11 +675,6 @@ protected static function get_random_post_id( $args = [] ): int { } public function test_query_by_enum() { - // Check if enum is supported. - if ( PHP_VERSION_ID < 80100 ) { - $this->markTestSkipped( 'PHP 8.1+ is required for this test.' ); - } - $post = static::factory()->post ->with_meta( [ 'example-meta' => Testable_Meta_Values::Meta_Value_A, diff --git a/tests/Queue/WorkerTest.php b/tests/Queue/WorkerTest.php index 1d5d426ff..7bdb5aef6 100644 --- a/tests/Queue/WorkerTest.php +++ b/tests/Queue/WorkerTest.php @@ -6,9 +6,8 @@ use Mantle\Contracts\Queue\Provider; use Mantle\Queue\Queue_Service_Provider; use Mantle\Queue\Events; -use Mantle\Queue\Events\Run_Complete; -use Mantle\Queue\Events\Run_Start; use Mantle\Queue\Job; +use Mantle\Queue\Queue_Worker_Job; use Mantle\Support\Collection; use Mockery as m; use Mockery\Adapter\Phpunit\MockeryTestCase; @@ -93,24 +92,12 @@ public function test_event_listener() { ); } - public function test_closure_job() { - $job = fn () => $_SERVER['__closure_run'] = true; - - $this->app['queue']->get_provider( 'test' )->push( $job ); - - $this->app['queue.worker']->run( 1 ); - - $this->assertTrue( $_SERVER['__closure_run'] ); - - $this->app['queue.worker']->run( 1 ); - } - protected function get_mock_job( $id, $should_run = true ) { - $mock_job = m::mock( Job::class ); + $mock_job = m::mock( Queue_Worker_Job::class ); if ( $should_run ) { $mock_job->shouldReceive( 'fire' )->once()->andReturn( true ); - $mock_job->shouldReceive( 'delete' )->once(); + $mock_job->shouldReceive( 'completed' )->once(); $mock_job->shouldReceive( 'has_failed' )->once()->andReturn( false ); $mock_job->shouldNotReceive( 'failed' ); } @@ -130,7 +117,7 @@ class Testable_Provider implements Provider { * @param mixed $job Job instance. * @return bool */ - public function push( $job ) { + public function push( mixed $job ): bool { $this->jobs[] = $job; return true; } @@ -148,6 +135,16 @@ public function pop( string $queue = null, int $count = 1 ): Collection { ); } + /** + * Retrieve the number of pending jobs in the queue. + * + * @param string $queue Queue name, optional. + * @return int + */ + public function pending_count( string $queue = null ): int { + return count( $this->jobs ); + } + /** * Check if a job is in the queue. * @@ -155,7 +152,7 @@ public function pop( string $queue = null, int $count = 1 ): Collection { * @param string $queue Queue to compare against. * @return bool */ - public function in_queue( $job, string $queue = null ): bool { + public function in_queue( mixed $job, string $queue = null ): bool { return false; } } diff --git a/tests/Queue/providers/WordPressCronQueueTest.php b/tests/Queue/providers/WordPressCronQueueTest.php index 9676ccf85..f8a134a95 100644 --- a/tests/Queue/providers/WordPressCronQueueTest.php +++ b/tests/Queue/providers/WordPressCronQueueTest.php @@ -1,16 +1,20 @@ assertTrue( has_action( Scheduler::EVENT ) ); } - public function test_wordpress_queue_job_from_class_async() { + public function test_job_dispatch() { $_SERVER['__example_job'] = false; Example_Job::dispatch(); @@ -40,53 +46,74 @@ public function test_wordpress_queue_job_from_class_async() { $this->assertInCronQueue( Example_Job::class ); $this->assertFalse( $_SERVER['__example_job'] ); - // Force the cron to be dispatched which will execute - // the queued job. + // Assert that the underlying queue post exists. + $this->assertPostExists( [ + 'post_type' => Provider::OBJECT_NAME, + 'post_status' => Post_Status::PENDING, + ] ); + + // Force the cron to be dispatched which will execute the queued job. $this->dispatch_queue(); $this->assertTrue( $_SERVER['__example_job'] ); + + // Ensure that the queued job post was deleted. + $this->assertPostExists( [ + 'post_type' => Provider::OBJECT_NAME, + 'post_status' => Post_Status::COMPLETED, + ] ); } - public function test_wordpress_queue_job_from_class_sync() { + public function test_job_dispatch_now() { $this->assertNotInCronQueue( Example_Job::class ); $_SERVER['__example_job'] = false; - Example_Job::dispatch_now(); + Example_Job::dispatch_now( false ); $this->assertTrue( $_SERVER['__example_job'] ); $this->assertNotInCronQueue( Example_Job::class ); + + $this->assertPostDoesNotExists( [ + 'post_type' => Provider::OBJECT_NAME, + 'post_status' => Post_Status::class, + ] ); } - public function test_wordpress_queue_job_async_failure() { + public function test_job_failure() { $_SERVER['__failed_run'] = false; $this->app['events']->listen( Job_Failed::class, fn () => $_SERVER['__failed_run'] = true ); - $this->assertNotInCronQueue( Failable_Job::class ); + $this->assertNotInCronQueue( Job_To_Fail::class ); - Failable_Job::dispatch(); + Job_To_Fail::dispatch(); - $this->assertInCronQueue( Failable_Job::class ); + $this->assertInCronQueue( Job_To_Fail::class ); $this->dispatch_queue(); - $this->assertNotInCronQueue( Failable_Job::class ); + $this->assertNotInCronQueue( Job_To_Fail::class ); $this->assertTrue( $_SERVER['__failed_run'] ); + + // Ensure that the post does not exist. + $this->assertPostExists( [ + 'post_type' => Provider::OBJECT_NAME, + 'post_status' => Post_Status::FAILED, + ] ); } - public function test_wordpress_queue_job_from_closure_async() { + public function test_job_dispatch_anonymous() { $_SERVER['__closure_job'] = false; dispatch( function() { $_SERVER['__closure_job'] = true; } ); - $this->assertInCronQueue( Scheduler::EVENT, [ 'default' ] ); - // Assert the serialize queue post exists. $this->assertPostExists( [ 'post_type' => Provider::OBJECT_NAME, + 'post_status' => Post_Status::PENDING->value, 'meta_key' => '_mantle_queue', 'meta_value' => 'SerializableClosure', 'meta_compare' => 'LIKE', @@ -101,24 +128,29 @@ public function test_wordpress_queue_job_from_closure_async() { 'meta_key' => '_mantle_queue', 'meta_value' => 'SerializableClosure', 'meta_compare' => 'LIKE', + 'post_status' => [ + Post_Status::PENDING, + Post_Status::FAILED, + ], ] ); } - public function test_wordpress_queue_job_from_closure_async_failure() { + public function test_job_dispatch_anonymous_failure() { $_SERVER['__closure_job'] = false; $_SERVER['__failed_run'] = false; - $this->app['events']->listen( Job_Failed::class, fn () => $_SERVER['__failed_run'] = true ); - - dispatch( function() { - throw new RuntimeException( 'Something went wrong' ); - } )->catch( fn () => $_SERVER['__failed_run'] = true ); + $this->app['events']->listen( + Job_Failed::class, + fn () => $_SERVER['__failed_run'] = true, + ); - $this->assertInCronQueue( Scheduler::EVENT, [ 'default' ] ); + dispatch( + fn () => throw new RuntimeException( 'Something went wrong' ), + )->catch( fn () => $_SERVER['__failed_run'] = true ); // Assert the serialize queue post exists. $this->assertPostExists( [ - 'post_status' => 'publish', + 'post_status' => Post_Status::PENDING, 'post_type' => Provider::OBJECT_NAME, 'meta_key' => '_mantle_queue', 'meta_value' => 'SerializableClosure', @@ -131,7 +163,7 @@ public function test_wordpress_queue_job_from_closure_async_failure() { $this->assertTrue( $_SERVER['__failed_run'] ); $this->assertPostExists( [ - 'post_status' => 'failed', + 'post_status' => Post_Status::FAILED, 'post_type' => Provider::OBJECT_NAME, 'meta_key' => '_mantle_queue', 'meta_value' => 'SerializableClosure', @@ -139,6 +171,95 @@ public function test_wordpress_queue_job_from_closure_async_failure() { ] ); } + public function test_dispatch_job_delay() { + $_SERVER['__example_job'] = false; + + $start = now()->addMonth(); + + Example_Job::dispatch()->delay( $start ); + + $this->assertPostExists( [ + 'post_date' => $start->toDateTimeString(), + 'post_status' => Post_Status::PENDING, + 'post_type' => Provider::OBJECT_NAME, + 'meta_key' => '_mantle_queue', + ] ); + + $this->dispatch_queue(); + + $this->assertFalse( $_SERVER['__example_job'] ); + + $this->assertPostExists( [ + 'post_date' => $start->toDateTimeString(), + 'post_status' => Post_Status::PENDING, + 'post_type' => Provider::OBJECT_NAME, + 'meta_key' => '_mantle_queue', + ] ); + } + + public function test_schedule_multiple_queue_workers() { + $this->app['config']->set( 'queue.max_concurrent_batches', 10 ); + $this->app['config']->set( 'queue.batch_size', 10 ); + + for ( $i = 0; $i < 100; $i++ ) { + Example_Job::dispatch(); + } + + // Fire the "shutdown" event to schedule the queue jobs. + Scheduler::schedule_on_shutdown(); + + // With max_concurrent_batches set to 10 and 100 jobs dispatched, we should + // have 10 queue jobs scheduled to run. + $this->assertEquals( 10, Scheduler::get_scheduled_count() ); + + $this->dispatch_queue( 100 ); + + Scheduler::schedule_on_shutdown(); + + // Ensure the scheduled jobs are cleaned up. + $this->assertEquals( 0, Scheduler::get_scheduled_count() ); + } + + public function test_failed_job() { + $_SERVER['__failed_run'] = 0; + + Job_To_Fail::dispatch(); + + $this->assertInCronQueue( Job_To_Fail::class ); + + $this->dispatch_queue(); + + $this->assertNotInCronQueue( Job_To_Fail::class ); + $this->assertEquals( 1, $_SERVER['__failed_run'] ); + + $this->assertPostExists( [ + 'post_type' => Provider::OBJECT_NAME, + 'post_status' => Post_Status::FAILED, + ] ); + + $this->dispatch_queue(); + } + + public function test_retry_failed_job() { + $_SERVER['__failed_run'] = 0; + + Job_To_Fail_Retry::dispatch(); + + $this->assertInCronQueue( Job_To_Fail_Retry::class ); + + $this->dispatch_queue(); + + // First failure. + $this->assertEquals( 1, $_SERVER['__failed_run'] ); + $this->assertInCronQueue( Job_To_Fail_Retry::class ); + + $this->dispatch_queue(); + + // Ensure it didn't run (it should be delayed 30 seconds). + $this->assertEquals( 1, $_SERVER['__failed_run'] ); + $this->assertInCronQueue( Job_To_Fail_Retry::class ); + } + public function test_schedule_next_run_after_complete() { // Limit the queue batch size. $this->app['config']->set( 'queue.batch_size', 5 ); @@ -147,30 +268,98 @@ public function test_schedule_next_run_after_complete() { Example_Job::dispatch(); } - $this->assertInCronQueue( Scheduler::EVENT, [ 'default' ] ); + $this->assertJobQueued( Example_Job::class, [], 'default' ); - $this->dispatch_queue(); + // Ensure the next job is scheduled. + Scheduler::schedule_on_shutdown(); + $this->assertInCronQueue( Scheduler::EVENT, null ); - $this->assertInCronQueue( Scheduler::EVENT, [ 'default' ] ); + $this->dispatch_queue( 2 ); - $this->dispatch_queue(); + $this->assertJobQueued( Example_Job::class, [], 'default' ); + + // Ensure the next job is scheduled. + Scheduler::schedule_on_shutdown(); + $this->assertInCronQueue( Scheduler::EVENT, null ); - $this->assertNotInCronQueue( Scheduler::EVENT, [ 'default' ] ); + $this->dispatch_queue( 6 ); + + $this->assertJobNotQueued( Example_Job::class, [], 'default' ); + + // Ensure the next job is not scheduled. + Scheduler::schedule_on_shutdown(); + $this->assertNotInCronQueue( Scheduler::EVENT, null ); + } + + public function test_cleanup_completed_jobs() { + $this->app['config']->set( 'queue.delete_after', 60 * 60 * 24 ); + + $record = Queue_Record::create( [ + 'post_status' => Post_Status::COMPLETED->value, + 'post_date' => now()->subMonth()->format( 'Y-m-d H:i:s' ), + ] ); + + // Create a valid queue job that shouldn't be deleted. + Example_Job::dispatch(); + + + // Perform the scheduled cleanup manually. + $this->command( 'mantle queue:cleanup' ); + + // Ensure that the expired queue job was deleted. + $this->assertEmpty( get_post( $record->ID ) ); + + // Assert that the queue post still exists. + $this->assertPostExists( [ + 'post_type' => Provider::OBJECT_NAME, + 'post_status' => Post_Status::PENDING, + ] ); + + $this->assertInCronQueue( Example_Job::class ); } } class Example_Job implements Job, Can_Queue { use Queueable, Dispatchable; + public function __construct( public bool $assert = true ) {} + public function handle() { $_SERVER['__example_job'] = true; + + if ( ! $this->assert ) { + return; + } + + // Fetch the job post. + $jobs = get_posts( + [ + 'fields' => 'ids', + 'post_status' => Post_Status::RUNNING->value, + 'post_type' => Provider::OBJECT_NAME, + 'posts_per_page' => 1, + ] + ); + + Assert::assertCount( 1, $jobs ); + Assert::assertGreaterThan( \time(), get_post_meta( $jobs[0], Meta_Key::LOCK_UNTIL->value, true ) ); } } -class Failable_Job implements Job, Can_Queue { +class Job_To_Fail implements Job, Can_Queue { use Queueable, Dispatchable; public function handle() { throw new RuntimeException( 'Something went wrong' ); } + + public function failed(): void { + $_SERVER['__failed_run']++; + } +} + +class Job_To_Fail_Retry extends Job_To_Fail { + public bool $retry = true; + + public int $retry_backoff = 30; }