Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Preserve queue job items and cleanup after some time #472

Merged
merged 18 commits into from
Nov 29, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions config/queue.php
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,16 @@
*/
'max_concurrent_batches' => environment( 'QUEUE_MAX_CONCURRENT_BATCHES', 1 ),

/*
|--------------------------------------------------------------------------
| Delete failed or processed queue items after a set time
|--------------------------------------------------------------------------
|
| Delete failed or processed queue items after a set time in seconds.
|
*/
'delete_after' => environment( 'QUEUE_DELETE_AFTER', 60 * 60 * 24 * 7 ),

/*
|--------------------------------------------------------------------------
| Enable the Queue Admin Interface
Expand Down
2 changes: 1 addition & 1 deletion src/mantle/application/class-app-service-provider.php
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ public function __construct( Application $app ) {
*/
protected function boot_scheduler() {
$this->app->singleton(
Schedule::class,
'scheduler',
fn ( $app ) => tap(
new Schedule( $app ),
fn ( Schedule $schedule ) => $this->schedule( $schedule ),
Expand Down
1 change: 1 addition & 0 deletions src/mantle/application/class-application.php
Original file line number Diff line number Diff line change
Expand Up @@ -414,6 +414,7 @@ protected function register_core_aliases() {
'request' => [ \Mantle\Http\Request::class, \Symfony\Component\HttpFoundation\Request::class ],
'router' => [ \Mantle\Http\Routing\Router::class, \Mantle\Contracts\Http\Routing\Router::class ],
'router.entity' => [ \Mantle\Http\Routing\Entity_Router::class, \Mantle\Contracts\Http\Routing\Entity_Router::class ],
'scheduler' => [ \Mantle\Scheduling\Schedule::class ],
'url' => [ \Mantle\Http\Routing\Url_Generator::class, \Mantle\Contracts\Http\Routing\Url_Generator::class ],
'view.loader' => [ \Mantle\Http\View\View_Finder::class, \Mantle\Contracts\Http\View\View_Finder::class ],
'view' => [ \Mantle\Http\View\Factory::class, \Mantle\Contracts\Http\View\Factory::class ],
Expand Down
2 changes: 2 additions & 0 deletions src/mantle/database/model/class-model.php
Original file line number Diff line number Diff line change
Expand Up @@ -175,12 +175,14 @@ public function refresh() {
}

$instance = static::find( $this->get( 'id' ) );

if ( ! $instance ) {
return null;
}

$this->exists = true;
$this->set_raw_attributes( $instance->get_raw_attributes() );

return $this;
}

Expand Down
10 changes: 5 additions & 5 deletions src/mantle/database/query/class-builder.php
Original file line number Diff line number Diff line change
Expand Up @@ -847,8 +847,8 @@ public function chunk_by_id( int $count, callable $callback, string $attribute =
/**
* Execute a callback over each item while chunking.
*
* @param callable(\Mantle\Support\Collection<int, TModel>): mixed $callback Callback to run on each chunk.
* @param int $count Number of items to chunk by.
* @param callable(TModel): mixed $callback Callback to run on each chunk.
* @param int $count Number of items to chunk by.
* @return boolean
*/
public function each( callable $callback, int $count = 100 ) {
Expand All @@ -866,9 +866,9 @@ public function each( callable $callback, int $count = 100 ) {
/**
* Execute a callback over each item while chunking by ID.
*
* @param callable(\Mantle\Support\Collection<int, TModel>): mixed $callback Callback to run on each chunk.
* @param int $count Number of items to chunk by.
* @param string $attribute Attribute to chunk by.
* @param callable(TModel): mixed $callback Callback to run on each chunk.
* @param int $count Number of items to chunk by.
* @param string $attribute Attribute to chunk by.
* @return boolean
*/
public function each_by_id( callable $callback, int $count = 100, string $attribute = 'id' ) {
Expand Down
2 changes: 1 addition & 1 deletion src/mantle/database/query/class-post-query-builder.php
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
* @method \Mantle\Database\Query\Post_Query_Builder<TModel> whereId( int $id )
* @method \Mantle\Database\Query\Post_Query_Builder<TModel> whereName( string $name )
* @method \Mantle\Database\Query\Post_Query_Builder<TModel> whereSlug( string $slug )
* @method \Mantle\Database\Query\Post_Query_Builder<TModel> whereStatus( string $status )
* @method \Mantle\Database\Query\Post_Query_Builder<TModel> whereStatus( string[]|string $status )
* @method \Mantle\Database\Query\Post_Query_Builder<TModel> whereTitle( string $title )
* @method \Mantle\Database\Query\Post_Query_Builder<TModel> whereType( string $type )
*/
Expand Down
4 changes: 3 additions & 1 deletion src/mantle/queue/class-queue-service-provider.php
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
namespace Mantle\Queue;

use Mantle\Contracts\Queue\Queue_Manager as Queue_Manager_Contract;
use Mantle\Queue\Console\Cleanup_Jobs_Command;
use Mantle\Queue\Console\Run_Command;
use Mantle\Queue\Dispatcher;
use Mantle\Queue\Queue_Manager;
Expand All @@ -32,7 +33,7 @@ public function register() {
$this->app->singleton_if(
'queue',
fn ( $app ) => tap(
// Register the Queue Manager with the supported providers when invoked.
// Register the Queue Manager with the supported providers when resolved.
new Queue_Manager( $app ),
fn ( Queue_Manager $manager ) => $this->register_providers( $manager ),
),
Expand All @@ -49,6 +50,7 @@ public function register() {
);

// Register queue console commands.
$this->add_command( Cleanup_Jobs_Command::class );
$this->add_command( Run_Command::class );

// Register the queue service providers.
Expand Down
7 changes: 7 additions & 0 deletions src/mantle/queue/class-queue-worker-job.php
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,13 @@ abstract public function get_job(): mixed;
*/
abstract public function failed( Throwable $e ): void;

/**
* Handle a completed job.
*
* @return void
*/
abstract public function completed(): void;

/**
* Retry a job with a specified delay.
*
Expand Down
3 changes: 1 addition & 2 deletions src/mantle/queue/class-worker.php
Original file line number Diff line number Diff line change
Expand Up @@ -58,9 +58,8 @@ function( Queue_Worker_Job $job ) use ( $provider ) {

$this->events->dispatch( new Job_Failed( $provider, $job, $e ) );
} finally {
// TODO: Don't delete after completion.
if ( ! $job->has_failed() ) {
$job->delete();
$job->completed();
} elseif ( $job->can_retry() ) {
$job->retry( $job->get_retry_backoff() );
}
Expand Down
49 changes: 49 additions & 0 deletions src/mantle/queue/console/class-cleanup-jobs-command.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
<?php
/**
* Cleanup_Jobs_Commands class file.
*
* @package Mantle
*/

namespace Mantle\Queue\Console;

use Mantle\Console\Command;
use Mantle\Contracts\Container;
use Mantle\Queue\Events\Job_Processed;
use Mantle\Queue\Events\Job_Processing;
use Mantle\Queue\Events\Run_Complete;
use Mantle\Queue\Events\Run_Start;
use Mantle\Queue\Providers\WordPress\Post_Status;
use Mantle\Queue\Providers\WordPress\Queue_Record;

/**
* Queue Cleanup Command
*/
class Cleanup_Jobs_Command extends Command {
/**
* The console command name.
*
* @var string
*/
protected $name = 'queue:cleanup';
/**
* Command Description.
*
* @var string
*/
protected $description = 'Cleanup old queue jobs.';

/**
* Command action.
*/
public function handle() {
Queue_Record::query()
->whereStatus( [ Post_Status::FAILED->value, Post_Status::COMPLETED->value ] )
->olderThan( now()->subSeconds( (int) $this->container['config']->get( 'queue.delete_after', 60 ) ) )
->take( -1 )
->each_by_id(
fn ( Queue_Record $record ) => $record->delete( true ),
100,
);
}
Comment on lines +1 to +48
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The class Cleanup_Jobs_Command is well-structured and follows good object-oriented design principles. It extends the base Command class and defines properties and a method that are clear in their purpose. The handle method uses a fluent interface to build a query, which is a common and readable way to interact with models in modern PHP frameworks.

However, there are a few points to consider:

  • The use of PHP 8's enums (Post_Status::FAILED->value, Post_Status::COMPLETED->value) is a good practice for code clarity and maintainability. Ensure that the rest of the codebase is compatible with PHP 8 features if they are being used here.
  • The olderThan method seems to be a custom addition to the query builder. Ensure that this method is properly defined and tested in the Queue_Record model or query builder class.
  • The take(-1) method call is unusual. Typically, take is used to limit the number of records returned. If -1 is intended to indicate "all records," this should be clearly documented, as it's not a standard convention. If it's a mistake, it should be corrected.
  • The lambda function fn (Queue_Record $record) => $record->delete(true) within each_by_id is concise and takes advantage of PHP 8's arrow functions. Ensure that the delete method on Queue_Record accepts a boolean parameter and that its behavior is well-documented, especially what passing true signifies.
  • There is no error handling in the handle method. Consider what should happen if the deletion fails for one or more Queue_Record instances. Should the command fail entirely, log an error, or continue with the next record? This depends on the desired behavior and robustness of the command.
  • The command retrieves the delete_after configuration value. Ensure that this configuration option is documented and validated elsewhere in the codebase.

Here's a potential update to the take method if -1 is not intended:

- ->take(-1)
+ ->take($this->container['config']->get('queue.cleanup_batch_size', 100))

This change assumes that there's a configuration setting named queue.cleanup_batch_size that controls how many records should be processed at a time.


Commitable suggestion

[!IMPORTANT]
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation.

Suggested change
<?php
/**
* Cleanup_Jobs_Commands class file.
*
* @package Mantle
*/
namespace Mantle\Queue\Console;
use Mantle\Console\Command;
use Mantle\Contracts\Container;
use Mantle\Queue\Events\Job_Processed;
use Mantle\Queue\Events\Job_Processing;
use Mantle\Queue\Events\Run_Complete;
use Mantle\Queue\Events\Run_Start;
use Mantle\Queue\Providers\WordPress\Post_Status;
use Mantle\Queue\Providers\WordPress\Queue_Record;
/**
* Queue Cleanup Command
*/
class Cleanup_Jobs_Command extends Command {
/**
* The console command name.
*
* @var string
*/
protected $name = 'queue:cleanup';
/**
* Command Description.
*
* @var string
*/
protected $description = 'Cleanup old queue jobs.';
/**
* Command action.
*/
public function handle() {
Queue_Record::query()
->whereStatus( [ Post_Status::FAILED->value, Post_Status::COMPLETED->value ] )
->olderThan( now()->subSeconds( (int) $this->container['config']->get( 'queue.delete_after', 60 ) ) )
->take( -1 )
->each_by_id(
fn ( Queue_Record $record ) => $record->delete( true ),
100,
);
}
/**
* Command action.
*/
public function handle() {
Queue_Record::query()
->whereStatus( [ Post_Status::FAILED->value, Post_Status::COMPLETED->value ] )
->olderThan( now()->subSeconds( (int) $this->container['config']->get( 'queue.delete_after', 60 ) ) )
->take($this->container['config']->get('queue.cleanup_batch_size', 100))
->each_by_id(
fn ( Queue_Record $record ) => $record->delete( true ),
100,
);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
namespace Mantle\Queue\Providers\WordPress\Admin;

use Mantle\Queue\Providers\WordPress\Post_Status;
use Mantle\Queue\Providers\WordPress\Queue_Job;
use Mantle\Queue\Providers\WordPress\Queue_Record;
use Mantle\Queue\Providers\WordPress\Queue_Worker_Job;

/**
Expand Down Expand Up @@ -37,7 +37,7 @@ public function render(): void {
* @param int $job_id The job ID.
*/
protected function render_single_job( int $job_id ): void {
$job = Queue_Job::find( $job_id );
$job = Queue_Record::find( $job_id );

if ( empty( $job ) ) {
wp_die( esc_html__( 'Invalid job ID.', 'mantle' ) );
Expand All @@ -60,7 +60,7 @@ protected function render_action( int $job_id ): void {
}

$action = sanitize_text_field( wp_unslash( $_GET['action'] ?? '' ) ); // phpcs:ignore WordPress.Security.NonceVerification.Recommended
$job = Queue_Job::find( $job_id );
$job = Queue_Record::find( $job_id );
$message = '';

if ( empty( $job ) ) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
use Mantle\Database\Query\Post_Query_Builder;
use Mantle\Queue\Providers\WordPress\Post_Status;
use Mantle\Queue\Providers\WordPress\Provider;
use Mantle\Queue\Providers\WordPress\Queue_Job;
use Mantle\Queue\Providers\WordPress\Queue_Record;
use Mantle\Queue\Providers\WordPress\Queue_Worker_Job;
use WP_List_Table;

Expand Down Expand Up @@ -126,7 +126,7 @@ public function prepare_items() {
$active_queue_filter = sanitize_text_field( wp_unslash( $_GET['queue'] ?? '' ) ); // phpcs:ignore WordPress.Security.NonceVerification.Recommended
$page = (int) ( $_GET['paged'] ?? 1 ); // phpcs:ignore WordPress.Security.NonceVerification.Recommended, WordPress.Security.ValidatedSanitizedInput.InputNotSanitized

$query = Queue_Job::query()
$query = Queue_Record::query()
->orderBy( 'date', 'asc' )
// Allow the query to be filtered by status.
->when(
Expand All @@ -146,7 +146,7 @@ public function prepare_items() {

// TODO: Refactor with found_posts later.
$this->items = $query->get()->map(
function ( Queue_Job $model ) {
function ( Queue_Record $model ) {
$worker = new Queue_Worker_Job( $model );
$job = $worker->get_job();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
* phpcs:disable WordPress.NamingConventions.PrefixAllGlobals.NonPrefixedVariableFound
*
* @package Mantle
* @var \Mantle\Queue\Providers\WordPress\Queue_Job $job The queue job.
* @var \Mantle\Queue\Providers\WordPress\Queue_Record $job The queue job.
*/

use Carbon\Carbon;
Expand Down
20 changes: 9 additions & 11 deletions src/mantle/queue/providers/wordpress/class-provider.php
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,6 @@

/**
* WordPress Cron Queue Provider
*
* @todo Add support for one off cron events that should have their own cron scheduled.
*/
class Provider implements Provider_Contract {
/**
Expand Down Expand Up @@ -123,7 +121,7 @@ public function push( mixed $job ): bool {
->lower()
->slug();

$object = new Queue_Job(
$object = new Queue_Record(
[
'post_name' => "mantle_queue_{$job_name}_" . time(),
'post_status' => Post_Status::PENDING->value,
Expand Down Expand Up @@ -181,13 +179,13 @@ public function pop( string $queue = null, int $count = 1 ): Collection {
->take( $count * $max_concurrent_batches )
->get()
// Filter out any jobs that are locked.
->filter( fn ( Queue_Job $job ) => ! $job->is_locked() )
->filter( fn ( Queue_Record $record ) => ! $record->is_locked() )
->map(
fn ( Queue_Job $model ) => tap(
new Queue_Worker_Job( $model ),
fn ( Queue_Record $record ) => tap(
new Queue_Worker_Job( $record ),
// Lock the job until the configured timeout or 10 minutes.
fn ( Queue_Worker_Job $queue_job ) => $model->set_lock_until(
$queue_job->get_job()->timeout ?? 600
fn ( Queue_Worker_Job $job ) => $record->set_lock_until(
$job->get_job()->timeout ?? 600
),
),
)
Expand All @@ -209,10 +207,10 @@ public function pending_count( string $queue = null ): int {
* Construct the query builder for the queue.
*
* @param string|null $queue Queue name, optional.
* @return Post_Query_Builder<Queue_Job>
* @return Post_Query_Builder<Queue_Record>
*/
protected function query( string $queue = null ): Post_Query_Builder {
return Queue_Job::where( 'post_status', Post_Status::PENDING->value )
return Queue_Record::where( 'post_status', Post_Status::PENDING->value )
->whereTerm( static::get_queue_term_id( $queue ), static::OBJECT_NAME )
->orderBy( 'post_date', 'asc' );
}
Expand All @@ -225,7 +223,7 @@ protected function query( string $queue = null ): Post_Query_Builder {
* @return bool
*/
public function in_queue( mixed $job, string $queue = null ): bool {
return Queue_Job::where( 'post_status', Post_Status::PENDING->value )
return Queue_Record::where( 'post_status', Post_Status::PENDING->value )
->whereDate( now()->toDateTimeString(), '>=' )
->whereTerm( static::get_queue_term_id( $queue ), static::OBJECT_NAME )
->whereMeta( Meta_Key::JOB->value, maybe_serialize( $job ) )
Expand Down
Original file line number Diff line number Diff line change
@@ -1,23 +1,20 @@
<?php
/**
* Queue_Job class file
* Queue_ class file
*
* @package Mantle
*/

namespace Mantle\Queue\Providers\WordPress;

use Mantle\Database\Model\Post;
use WordPressCS\WordPress\Sniffs\CodeAnalysis\EmptyStatementSniff;

use function Mantle\Support\Helpers\collect;

/**
* Queue Job Data Model (for internal use only).
*
* @access private
* Queue Job Record
*/
class Queue_Job extends Post {
class Queue_Record extends Post {
/**
* Post type for the model.
*
Expand Down
25 changes: 23 additions & 2 deletions src/mantle/queue/providers/wordpress/class-queue-worker-job.php
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

/**
* WordPress Cron Queue Job
*
* Class to perform the actual queue job.
*/
class Queue_Worker_Job extends \Mantle\Queue\Queue_Worker_Job {

Expand All @@ -30,9 +32,9 @@ class Queue_Worker_Job extends \Mantle\Queue\Queue_Worker_Job {
/**
* Constructor.
*
* @param Queue_Job $model The job model used for storage.
* @param Queue_Record $model The queue record.
*/
public function __construct( protected Queue_Job $model ) {}
public function __construct( protected Queue_Record $model ) {}

/**
* Fire the job.
Expand Down Expand Up @@ -115,6 +117,25 @@ public function failed( Throwable $e ): void {
}
}

/**
* Handle a completed job.
*
* @return void
*/
public function completed(): void {
$this->model->save(
[
'post_status' => Post_Status::COMPLETED->value,
]
);

$job = $this->get_job();

if ( method_exists( $job, 'completed' ) ) {
$job->completed();
}
}

/**
* Delete the job from the queue.
*/
Expand Down
Loading