Skip to content

Commit

Permalink
Preserve queue job items and cleanup after some time (#472)
Browse files Browse the repository at this point in the history
* Continued work on queue UI=

* Wrapping up the core of the queue UI

* Working table for queue UI

* Adding counts to the queue

* Switch to use wp_count_posts for performance

* Style the row table

* Adding single view for queue job

* Adding queue UI with fixes for retrying

* Serialize enums when asserting against them

* Keep queue job around after it is complete for logging

* Fixing tests and adjusting comment

* Fixing linting

* Stubbing the cleanup command

* Cleanup the processed/failed jobs after some time

* Fixing phpcs issue
  • Loading branch information
srtfisher authored Nov 29, 2023
1 parent c4b945b commit 480ece0
Show file tree
Hide file tree
Showing 24 changed files with 267 additions and 96 deletions.
10 changes: 10 additions & 0 deletions config/queue.php
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,16 @@
*/
'max_concurrent_batches' => environment( 'QUEUE_MAX_CONCURRENT_BATCHES', 1 ),

/*
|--------------------------------------------------------------------------
| Delete failed or processed queue items after a set time
|--------------------------------------------------------------------------
|
| Delete failed or processed queue items after a set time in seconds.
|
*/
'delete_after' => environment( 'QUEUE_DELETE_AFTER', 60 * 60 * 24 * 7 ),

/*
|--------------------------------------------------------------------------
| Enable the Queue Admin Interface
Expand Down
2 changes: 1 addition & 1 deletion src/mantle/application/class-app-service-provider.php
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ public function __construct( Application $app ) {
*/
protected function boot_scheduler() {
$this->app->singleton(
Schedule::class,
'scheduler',
fn ( $app ) => tap(
new Schedule( $app ),
fn ( Schedule $schedule ) => $this->schedule( $schedule ),
Expand Down
1 change: 1 addition & 0 deletions src/mantle/application/class-application.php
Original file line number Diff line number Diff line change
Expand Up @@ -414,6 +414,7 @@ protected function register_core_aliases() {
'request' => [ \Mantle\Http\Request::class, \Symfony\Component\HttpFoundation\Request::class ],
'router' => [ \Mantle\Http\Routing\Router::class, \Mantle\Contracts\Http\Routing\Router::class ],
'router.entity' => [ \Mantle\Http\Routing\Entity_Router::class, \Mantle\Contracts\Http\Routing\Entity_Router::class ],
'scheduler' => [ \Mantle\Scheduling\Schedule::class ],
'url' => [ \Mantle\Http\Routing\Url_Generator::class, \Mantle\Contracts\Http\Routing\Url_Generator::class ],
'view.loader' => [ \Mantle\Http\View\View_Finder::class, \Mantle\Contracts\Http\View\View_Finder::class ],
'view' => [ \Mantle\Http\View\Factory::class, \Mantle\Contracts\Http\View\Factory::class ],
Expand Down
2 changes: 2 additions & 0 deletions src/mantle/database/model/class-model.php
Original file line number Diff line number Diff line change
Expand Up @@ -175,12 +175,14 @@ public function refresh() {
}

$instance = static::find( $this->get( 'id' ) );

if ( ! $instance ) {
return null;
}

$this->exists = true;
$this->set_raw_attributes( $instance->get_raw_attributes() );

return $this;
}

Expand Down
10 changes: 5 additions & 5 deletions src/mantle/database/query/class-builder.php
Original file line number Diff line number Diff line change
Expand Up @@ -847,8 +847,8 @@ public function chunk_by_id( int $count, callable $callback, string $attribute =
/**
* Execute a callback over each item while chunking.
*
* @param callable(\Mantle\Support\Collection<int, TModel>): mixed $callback Callback to run on each chunk.
* @param int $count Number of items to chunk by.
* @param callable(TModel): mixed $callback Callback to run on each chunk.
* @param int $count Number of items to chunk by.
* @return boolean
*/
public function each( callable $callback, int $count = 100 ) {
Expand All @@ -866,9 +866,9 @@ public function each( callable $callback, int $count = 100 ) {
/**
* Execute a callback over each item while chunking by ID.
*
* @param callable(\Mantle\Support\Collection<int, TModel>): mixed $callback Callback to run on each chunk.
* @param int $count Number of items to chunk by.
* @param string $attribute Attribute to chunk by.
* @param callable(TModel): mixed $callback Callback to run on each chunk.
* @param int $count Number of items to chunk by.
* @param string $attribute Attribute to chunk by.
* @return boolean
*/
public function each_by_id( callable $callback, int $count = 100, string $attribute = 'id' ) {
Expand Down
2 changes: 1 addition & 1 deletion src/mantle/database/query/class-post-query-builder.php
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
* @method \Mantle\Database\Query\Post_Query_Builder<TModel> whereId( int $id )
* @method \Mantle\Database\Query\Post_Query_Builder<TModel> whereName( string $name )
* @method \Mantle\Database\Query\Post_Query_Builder<TModel> whereSlug( string $slug )
* @method \Mantle\Database\Query\Post_Query_Builder<TModel> whereStatus( string $status )
* @method \Mantle\Database\Query\Post_Query_Builder<TModel> whereStatus( string[]|string $status )
* @method \Mantle\Database\Query\Post_Query_Builder<TModel> whereTitle( string $title )
* @method \Mantle\Database\Query\Post_Query_Builder<TModel> whereType( string $type )
*/
Expand Down
4 changes: 3 additions & 1 deletion src/mantle/queue/class-queue-service-provider.php
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
namespace Mantle\Queue;

use Mantle\Contracts\Queue\Queue_Manager as Queue_Manager_Contract;
use Mantle\Queue\Console\Cleanup_Jobs_Command;
use Mantle\Queue\Console\Run_Command;
use Mantle\Queue\Dispatcher;
use Mantle\Queue\Queue_Manager;
Expand All @@ -32,7 +33,7 @@ public function register() {
$this->app->singleton_if(
'queue',
fn ( $app ) => tap(
// Register the Queue Manager with the supported providers when invoked.
// Register the Queue Manager with the supported providers when resolved.
new Queue_Manager( $app ),
fn ( Queue_Manager $manager ) => $this->register_providers( $manager ),
),
Expand All @@ -49,6 +50,7 @@ public function register() {
);

// Register queue console commands.
$this->add_command( Cleanup_Jobs_Command::class );
$this->add_command( Run_Command::class );

// Register the queue service providers.
Expand Down
7 changes: 7 additions & 0 deletions src/mantle/queue/class-queue-worker-job.php
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,13 @@ abstract public function get_job(): mixed;
*/
abstract public function failed( Throwable $e ): void;

/**
* Handle a completed job.
*
* @return void
*/
abstract public function completed(): void;

/**
* Retry a job with a specified delay.
*
Expand Down
3 changes: 1 addition & 2 deletions src/mantle/queue/class-worker.php
Original file line number Diff line number Diff line change
Expand Up @@ -58,9 +58,8 @@ function( Queue_Worker_Job $job ) use ( $provider ) {

$this->events->dispatch( new Job_Failed( $provider, $job, $e ) );
} finally {
// TODO: Don't delete after completion.
if ( ! $job->has_failed() ) {
$job->delete();
$job->completed();
} elseif ( $job->can_retry() ) {
$job->retry( $job->get_retry_backoff() );
}
Expand Down
49 changes: 49 additions & 0 deletions src/mantle/queue/console/class-cleanup-jobs-command.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
<?php
/**
* Cleanup_Jobs_Commands class file.
*
* @package Mantle
*/

namespace Mantle\Queue\Console;

use Mantle\Console\Command;
use Mantle\Contracts\Container;
use Mantle\Queue\Events\Job_Processed;
use Mantle\Queue\Events\Job_Processing;
use Mantle\Queue\Events\Run_Complete;
use Mantle\Queue\Events\Run_Start;
use Mantle\Queue\Providers\WordPress\Post_Status;
use Mantle\Queue\Providers\WordPress\Queue_Record;

/**
* Queue Cleanup Command
*/
class Cleanup_Jobs_Command extends Command {
/**
* The console command name.
*
* @var string
*/
protected $name = 'queue:cleanup';
/**
* Command Description.
*
* @var string
*/
protected $description = 'Cleanup old queue jobs.';

/**
* Command action.
*/
public function handle() {
Queue_Record::query()
->whereStatus( [ Post_Status::FAILED->value, Post_Status::COMPLETED->value ] )
->olderThan( now()->subSeconds( (int) $this->container['config']->get( 'queue.delete_after', 60 ) ) )
->take( -1 )
->each_by_id(
fn ( Queue_Record $record ) => $record->delete( true ),
100,
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
namespace Mantle\Queue\Providers\WordPress\Admin;

use Mantle\Queue\Providers\WordPress\Post_Status;
use Mantle\Queue\Providers\WordPress\Queue_Job;
use Mantle\Queue\Providers\WordPress\Queue_Record;
use Mantle\Queue\Providers\WordPress\Queue_Worker_Job;

/**
Expand Down Expand Up @@ -37,7 +37,7 @@ public function render(): void {
* @param int $job_id The job ID.
*/
protected function render_single_job( int $job_id ): void {
$job = Queue_Job::find( $job_id );
$job = Queue_Record::find( $job_id );

if ( empty( $job ) ) {
wp_die( esc_html__( 'Invalid job ID.', 'mantle' ) );
Expand All @@ -60,7 +60,7 @@ protected function render_action( int $job_id ): void {
}

$action = sanitize_text_field( wp_unslash( $_GET['action'] ?? '' ) ); // phpcs:ignore WordPress.Security.NonceVerification.Recommended
$job = Queue_Job::find( $job_id );
$job = Queue_Record::find( $job_id );
$message = '';

if ( empty( $job ) ) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
use Mantle\Database\Query\Post_Query_Builder;
use Mantle\Queue\Providers\WordPress\Post_Status;
use Mantle\Queue\Providers\WordPress\Provider;
use Mantle\Queue\Providers\WordPress\Queue_Job;
use Mantle\Queue\Providers\WordPress\Queue_Record;
use Mantle\Queue\Providers\WordPress\Queue_Worker_Job;
use WP_List_Table;

Expand Down Expand Up @@ -126,7 +126,7 @@ public function prepare_items() {
$active_queue_filter = sanitize_text_field( wp_unslash( $_GET['queue'] ?? '' ) ); // phpcs:ignore WordPress.Security.NonceVerification.Recommended
$page = (int) ( $_GET['paged'] ?? 1 ); // phpcs:ignore WordPress.Security.NonceVerification.Recommended, WordPress.Security.ValidatedSanitizedInput.InputNotSanitized

$query = Queue_Job::query()
$query = Queue_Record::query()
->orderBy( 'date', 'asc' )
// Allow the query to be filtered by status.
->when(
Expand All @@ -146,7 +146,7 @@ public function prepare_items() {

// TODO: Refactor with found_posts later.
$this->items = $query->get()->map(
function ( Queue_Job $model ) {
function ( Queue_Record $model ) {
$worker = new Queue_Worker_Job( $model );
$job = $worker->get_job();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
* phpcs:disable WordPress.NamingConventions.PrefixAllGlobals.NonPrefixedVariableFound
*
* @package Mantle
* @var \Mantle\Queue\Providers\WordPress\Queue_Job $job The queue job.
* @var \Mantle\Queue\Providers\WordPress\Queue_Record $job The queue job.
*/

use Carbon\Carbon;
Expand Down
20 changes: 9 additions & 11 deletions src/mantle/queue/providers/wordpress/class-provider.php
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,6 @@

/**
* WordPress Cron Queue Provider
*
* @todo Add support for one off cron events that should have their own cron scheduled.
*/
class Provider implements Provider_Contract {
/**
Expand Down Expand Up @@ -123,7 +121,7 @@ public function push( mixed $job ): bool {
->lower()
->slug();

$object = new Queue_Job(
$object = new Queue_Record(
[
'post_name' => "mantle_queue_{$job_name}_" . time(),
'post_status' => Post_Status::PENDING->value,
Expand Down Expand Up @@ -181,13 +179,13 @@ public function pop( string $queue = null, int $count = 1 ): Collection {
->take( $count * $max_concurrent_batches )
->get()
// Filter out any jobs that are locked.
->filter( fn ( Queue_Job $job ) => ! $job->is_locked() )
->filter( fn ( Queue_Record $record ) => ! $record->is_locked() )
->map(
fn ( Queue_Job $model ) => tap(
new Queue_Worker_Job( $model ),
fn ( Queue_Record $record ) => tap(
new Queue_Worker_Job( $record ),
// Lock the job until the configured timeout or 10 minutes.
fn ( Queue_Worker_Job $queue_job ) => $model->set_lock_until(
$queue_job->get_job()->timeout ?? 600
fn ( Queue_Worker_Job $job ) => $record->set_lock_until(
$job->get_job()->timeout ?? 600
),
),
)
Expand All @@ -209,10 +207,10 @@ public function pending_count( string $queue = null ): int {
* Construct the query builder for the queue.
*
* @param string|null $queue Queue name, optional.
* @return Post_Query_Builder<Queue_Job>
* @return Post_Query_Builder<Queue_Record>
*/
protected function query( string $queue = null ): Post_Query_Builder {
return Queue_Job::where( 'post_status', Post_Status::PENDING->value )
return Queue_Record::where( 'post_status', Post_Status::PENDING->value )
->whereTerm( static::get_queue_term_id( $queue ), static::OBJECT_NAME )
->orderBy( 'post_date', 'asc' );
}
Expand All @@ -225,7 +223,7 @@ protected function query( string $queue = null ): Post_Query_Builder {
* @return bool
*/
public function in_queue( mixed $job, string $queue = null ): bool {
return Queue_Job::where( 'post_status', Post_Status::PENDING->value )
return Queue_Record::where( 'post_status', Post_Status::PENDING->value )
->whereDate( now()->toDateTimeString(), '>=' )
->whereTerm( static::get_queue_term_id( $queue ), static::OBJECT_NAME )
->whereMeta( Meta_Key::JOB->value, maybe_serialize( $job ) )
Expand Down
Original file line number Diff line number Diff line change
@@ -1,23 +1,20 @@
<?php
/**
* Queue_Job class file
* Queue_ class file
*
* @package Mantle
*/

namespace Mantle\Queue\Providers\WordPress;

use Mantle\Database\Model\Post;
use WordPressCS\WordPress\Sniffs\CodeAnalysis\EmptyStatementSniff;

use function Mantle\Support\Helpers\collect;

/**
* Queue Job Data Model (for internal use only).
*
* @access private
* Queue Job Record
*/
class Queue_Job extends Post {
class Queue_Record extends Post {
/**
* Post type for the model.
*
Expand Down
25 changes: 23 additions & 2 deletions src/mantle/queue/providers/wordpress/class-queue-worker-job.php
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

/**
* WordPress Cron Queue Job
*
* Class to perform the actual queue job.
*/
class Queue_Worker_Job extends \Mantle\Queue\Queue_Worker_Job {

Expand All @@ -30,9 +32,9 @@ class Queue_Worker_Job extends \Mantle\Queue\Queue_Worker_Job {
/**
* Constructor.
*
* @param Queue_Job $model The job model used for storage.
* @param Queue_Record $model The queue record.
*/
public function __construct( protected Queue_Job $model ) {}
public function __construct( protected Queue_Record $model ) {}

/**
* Fire the job.
Expand Down Expand Up @@ -115,6 +117,25 @@ public function failed( Throwable $e ): void {
}
}

/**
* Handle a completed job.
*
* @return void
*/
public function completed(): void {
$this->model->save(
[
'post_status' => Post_Status::COMPLETED->value,
]
);

$job = $this->get_job();

if ( method_exists( $job, 'completed' ) ) {
$job->completed();
}
}

/**
* Delete the job from the queue.
*/
Expand Down
Loading

0 comments on commit 480ece0

Please sign in to comment.