Skip to content

Commit

Permalink
pkp#8734 Added bulk mail sending job class and optimized bulk mail se…
Browse files Browse the repository at this point in the history
…nding
  • Loading branch information
touhidurabir committed Mar 27, 2023
1 parent be10907 commit b955992
Show file tree
Hide file tree
Showing 4 changed files with 143 additions and 52 deletions.
74 changes: 23 additions & 51 deletions api/v1/_email/PKPEmailHandler.php
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,11 @@
namespace PKP\API\v1\_email;

use APP\facades\Repo;
use Illuminate\Queue\WorkerOptions;
use Illuminate\Support\Facades\DB;
use Illuminate\Support\Facades\Mail;
use Illuminate\Support\Facades\Queue;
use PKP\core\APIResponse;
use PKP\core\PKPContainer;
use PKP\handler\APIHandler;
use PKP\mail\Mailable;
use PKP\jobs\bulk\BulkEmailSender;
use PKP\security\authorization\PolicySet;
use PKP\security\authorization\RoleBasedHandlerOperationPolicy;
use PKP\security\authorization\UserRolesRequiredPolicy;
Expand All @@ -33,9 +30,6 @@

class PKPEmailHandler extends APIHandler
{
/** @var int Number of emails to send in each job */
public const EMAILS_PER_JOB = 100;

/**
* Constructor
*/
Expand Down Expand Up @@ -146,42 +140,33 @@ public function create(ServerRequestInterface $slimRequest, APIResponse $respons

$userIds = Repo::user()->getCollector()
->filterByContextIds([$contextId])
->filterByUserGroupIds(['userGroupIds'])
->filterByUserGroupIds($params['userGroupIds'])
->getIds()
->toArray();

$subject = $params['subject'];
$body = $params['body'];
$fromEmail = $context->getData('contactEmail');
$fromName = $context->getData('contactName');
$queueId = 'email_' . uniqid();

if (!empty($params['copy'])) {
$currentUserId = $this->getRequest()->getUser()->getId();
if (!in_array($currentUserId, $userIds)) {
$userIds[] = $currentUserId;
}
}

$batches = array_chunk($userIds, self::EMAILS_PER_JOB);
$queueId = 'email_' . uniqid();
$batches = array_chunk($userIds, BulkEmailSender::EMAILS_PER_JOB);

foreach ($batches as $userIds) {
Queue::push(function () use ($userIds, $contextId, $subject, $body, $fromEmail, $fromName) {
$users = Repo::user()->getCollector()
->filterByContextIds([$contextId])
->filterByUserIds($userIds)
->getMany();

foreach ($users as $user) {
$mailable = new Mailable();
$mailable
->from($fromEmail, $fromName)
->to($user->getEmail(), $user->getFullName())
->subject($subject)
->body($body);

Mail::send($mailable);
}
}, [], $queueId);
Queue::push(
new BulkEmailSender(
$userIds,
$contextId,
$params['subject'],
$params['body'],
$context->getData('contactEmail'),
$context->getData('contactName')
),
[],
$queueId
);
}

return $response->withJson([
Expand All @@ -199,38 +184,25 @@ public function create(ServerRequestInterface $slimRequest, APIResponse $respons
*/
public function process(ServerRequestInterface $slimRequest, APIResponse $response, array $args)
{
$countRunning = DB::table('jobs')
->where('queue', $args['queueId'])
->whereNotNull('reserved_at')
->count();
$countPending = $this->countPending($args['queueId']);
$countRunning = Repo::job()->getRunningJobCount($args['queueId']);
$countPending = Repo::job()->getPendingJobCount($args['queueId']);
$jobQueue = app('pkpJobQueue'); /** @var \PKP\core\PKPQueueProvider $jobQueue */

// Don't run another job if one is already running.
// This should ensure jobs are run one after the other and
// prevent long-running jobs from running simultaneously
// and piling onto the server like a DDOS attack.
if (!$countRunning && $countPending) {
$laravelContainer = PKPContainer::getInstance();
$options = new WorkerOptions();
$laravelContainer['queue.worker']->runNextJob('database', $args['queueId'], $options);

$jobQueue->forQueue($args['queueId'])->runJobInQueue();

// Update count of pending jobs
$countPending = $this->countPending($args['queueId']);
$countPending = Repo::job()->getPendingJobCount($args['queueId']);
}

return $response->withJson([
'pendingJobs' => $countPending,
], 200);
}

/**
* Return a count of the pending jobs in a given queue
*
*/
protected function countPending(string $queueId): int
{
return DB::table('jobs')
->where('queue', $queueId)
->count();
}
}
2 changes: 1 addition & 1 deletion classes/core/PKPQueueProvider.php
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ public function runJobInQueue(): void

$laravelContainer['queue.worker']->runNextJob(
'database',
$job->queue,
$job->queue ?? Config::getVar('queues', 'default_queue', 'queue'),
$this->getWorkerOptions()
);
}
Expand Down
23 changes: 23 additions & 0 deletions classes/job/repositories/Job.php
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,29 @@ public function total(): int
->count();
}

public function getRunningJobCount(string $queue = null): int
{
$query = $this->model;

if ($queue) {
$query = $query->onQueue($queue);
}

return $query->whereNotNull('reserved_at')->count();
}

public function getPendingJobCount(string $queue = null): int
{
if (!$queue) {
return $this->total();
}

return $this->model
->onQueue($queue)
->nonReserved()
->count();
}

public function showJobs(): LengthAwarePaginator
{
$currentPage = LengthAwarePaginator::resolveCurrentPage();
Expand Down
96 changes: 96 additions & 0 deletions jobs/bulk/BulkEmailSender.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
<?php

/**
* @file jobs/bulk/BulkEmailSender.php
*
* Copyright (c) 2014-2023 Simon Fraser University
* Copyright (c) 2000-2023 John Willinsky
* Distributed under the GNU GPL v3. For full terms see the file docs/COPYING.
*
* @class BulkEmailSender
* @ingroup jobs
*
* @brief Job to send bulk emails
*/

namespace PKP\jobs\bulk;

use APP\facades\Repo;
use Illuminate\Support\Facades\Mail;
use PKP\mail\Mailable;
use PKP\jobs\BaseJob;

class BulkEmailSender extends BaseJob
{
/**
* Number of emails to send in each job
*
* @var int
*/
public const EMAILS_PER_JOB = 100;

/**
* The user ids to send email
*/
protected array $userIds;

/**
* The associated context id
*/
protected int $contextId;

/**
* Mail subject
*/
protected string $subject;

/**
* Mail body
*/
protected string $body;

/**
* From email to send mail
*/
protected object|array|string $fromEmail;

/**
* From name to send mail
*/
protected mixed $fromName;

/**
* Create a new job instance.
*/
public function __construct(array $userIds, int $contextId, string $subject, string $body, object|array|string $fromEmail, mixed $fromName)
{
parent::__construct();

$this->userIds = $userIds;
$this->contextId = $contextId;
$this->subject = $subject;
$this->body = $body;
$this->fromEmail = $fromEmail;
$this->fromName = $fromName;
}

public function handle()
{
$users = Repo::user()
->getCollector()
->filterByContextIds([$this->contextId])
->filterByUserIds($this->userIds)
->getMany();

foreach ($users as $user) {
$mailable = new Mailable();
$mailable
->from($this->fromEmail, $this->fromName)
->to($user->getEmail(), $user->getFullName())
->subject($this->subject)
->body($this->body);

Mail::send($mailable);
}
}
}

0 comments on commit b955992

Please sign in to comment.