From b955992f658b43f819943d016e5ee669dc550464 Mon Sep 17 00:00:00 2001 From: Touhidur Rahman Date: Mon, 20 Mar 2023 19:23:00 +0600 Subject: [PATCH 1/4] pkp/pkp-lib#8734 Added bulk mail sending job class and optimized bulk mail sending --- api/v1/_email/PKPEmailHandler.php | 74 ++++++++---------------- classes/core/PKPQueueProvider.php | 2 +- classes/job/repositories/Job.php | 23 ++++++++ jobs/bulk/BulkEmailSender.php | 96 +++++++++++++++++++++++++++++++ 4 files changed, 143 insertions(+), 52 deletions(-) create mode 100644 jobs/bulk/BulkEmailSender.php diff --git a/api/v1/_email/PKPEmailHandler.php b/api/v1/_email/PKPEmailHandler.php index 4df01b82e69..00e89086469 100644 --- a/api/v1/_email/PKPEmailHandler.php +++ b/api/v1/_email/PKPEmailHandler.php @@ -17,14 +17,11 @@ namespace PKP\API\v1\_email; use APP\facades\Repo; -use Illuminate\Queue\WorkerOptions; use Illuminate\Support\Facades\DB; -use Illuminate\Support\Facades\Mail; use Illuminate\Support\Facades\Queue; use PKP\core\APIResponse; -use PKP\core\PKPContainer; use PKP\handler\APIHandler; -use PKP\mail\Mailable; +use PKP\jobs\bulk\BulkEmailSender; use PKP\security\authorization\PolicySet; use PKP\security\authorization\RoleBasedHandlerOperationPolicy; use PKP\security\authorization\UserRolesRequiredPolicy; @@ -33,9 +30,6 @@ class PKPEmailHandler extends APIHandler { - /** @var int Number of emails to send in each job */ - public const EMAILS_PER_JOB = 100; - /** * Constructor */ @@ -146,16 +140,10 @@ public function create(ServerRequestInterface $slimRequest, APIResponse $respons $userIds = Repo::user()->getCollector() ->filterByContextIds([$contextId]) - ->filterByUserGroupIds(['userGroupIds']) + ->filterByUserGroupIds($params['userGroupIds']) ->getIds() ->toArray(); - $subject = $params['subject']; - $body = $params['body']; - $fromEmail = $context->getData('contactEmail'); - $fromName = $context->getData('contactName'); - $queueId = 'email_' . uniqid(); - if (!empty($params['copy'])) { $currentUserId = $this->getRequest()->getUser()->getId(); if (!in_array($currentUserId, $userIds)) { @@ -163,25 +151,22 @@ public function create(ServerRequestInterface $slimRequest, APIResponse $respons } } - $batches = array_chunk($userIds, self::EMAILS_PER_JOB); + $queueId = 'email_' . uniqid(); + $batches = array_chunk($userIds, BulkEmailSender::EMAILS_PER_JOB); + foreach ($batches as $userIds) { - Queue::push(function () use ($userIds, $contextId, $subject, $body, $fromEmail, $fromName) { - $users = Repo::user()->getCollector() - ->filterByContextIds([$contextId]) - ->filterByUserIds($userIds) - ->getMany(); - - foreach ($users as $user) { - $mailable = new Mailable(); - $mailable - ->from($fromEmail, $fromName) - ->to($user->getEmail(), $user->getFullName()) - ->subject($subject) - ->body($body); - - Mail::send($mailable); - } - }, [], $queueId); + Queue::push( + new BulkEmailSender( + $userIds, + $contextId, + $params['subject'], + $params['body'], + $context->getData('contactEmail'), + $context->getData('contactName') + ), + [], + $queueId + ); } return $response->withJson([ @@ -199,23 +184,20 @@ public function create(ServerRequestInterface $slimRequest, APIResponse $respons */ public function process(ServerRequestInterface $slimRequest, APIResponse $response, array $args) { - $countRunning = DB::table('jobs') - ->where('queue', $args['queueId']) - ->whereNotNull('reserved_at') - ->count(); - $countPending = $this->countPending($args['queueId']); + $countRunning = Repo::job()->getRunningJobCount($args['queueId']); + $countPending = Repo::job()->getPendingJobCount($args['queueId']); + $jobQueue = app('pkpJobQueue'); /** @var \PKP\core\PKPQueueProvider $jobQueue */ // Don't run another job if one is already running. // This should ensure jobs are run one after the other and // prevent long-running jobs from running simultaneously // and piling onto the server like a DDOS attack. if (!$countRunning && $countPending) { - $laravelContainer = PKPContainer::getInstance(); - $options = new WorkerOptions(); - $laravelContainer['queue.worker']->runNextJob('database', $args['queueId'], $options); + + $jobQueue->forQueue($args['queueId'])->runJobInQueue(); // Update count of pending jobs - $countPending = $this->countPending($args['queueId']); + $countPending = Repo::job()->getPendingJobCount($args['queueId']); } return $response->withJson([ @@ -223,14 +205,4 @@ public function process(ServerRequestInterface $slimRequest, APIResponse $respon ], 200); } - /** - * Return a count of the pending jobs in a given queue - * - */ - protected function countPending(string $queueId): int - { - return DB::table('jobs') - ->where('queue', $queueId) - ->count(); - } } diff --git a/classes/core/PKPQueueProvider.php b/classes/core/PKPQueueProvider.php index 7eb821001a5..aab85f5cb9b 100644 --- a/classes/core/PKPQueueProvider.php +++ b/classes/core/PKPQueueProvider.php @@ -101,7 +101,7 @@ public function runJobInQueue(): void $laravelContainer['queue.worker']->runNextJob( 'database', - $job->queue, + $job->queue ?? Config::getVar('queues', 'default_queue', 'queue'), $this->getWorkerOptions() ); } diff --git a/classes/job/repositories/Job.php b/classes/job/repositories/Job.php index e43c2d228e5..94ab7a058e8 100644 --- a/classes/job/repositories/Job.php +++ b/classes/job/repositories/Job.php @@ -39,6 +39,29 @@ public function total(): int ->count(); } + public function getRunningJobCount(string $queue = null): int + { + $query = $this->model; + + if ($queue) { + $query = $query->onQueue($queue); + } + + return $query->whereNotNull('reserved_at')->count(); + } + + public function getPendingJobCount(string $queue = null): int + { + if (!$queue) { + return $this->total(); + } + + return $this->model + ->onQueue($queue) + ->nonReserved() + ->count(); + } + public function showJobs(): LengthAwarePaginator { $currentPage = LengthAwarePaginator::resolveCurrentPage(); diff --git a/jobs/bulk/BulkEmailSender.php b/jobs/bulk/BulkEmailSender.php new file mode 100644 index 00000000000..9a4ce7e6515 --- /dev/null +++ b/jobs/bulk/BulkEmailSender.php @@ -0,0 +1,96 @@ +userIds = $userIds; + $this->contextId = $contextId; + $this->subject = $subject; + $this->body = $body; + $this->fromEmail = $fromEmail; + $this->fromName = $fromName; + } + + public function handle() + { + $users = Repo::user() + ->getCollector() + ->filterByContextIds([$this->contextId]) + ->filterByUserIds($this->userIds) + ->getMany(); + + foreach ($users as $user) { + $mailable = new Mailable(); + $mailable + ->from($this->fromEmail, $this->fromName) + ->to($user->getEmail(), $user->getFullName()) + ->subject($this->subject) + ->body($this->body); + + Mail::send($mailable); + } + } +} From 59f33cb8d90af7f3daa4672ad8642c28175a6876 Mon Sep 17 00:00:00 2001 From: Touhidur Rahman Date: Wed, 22 Mar 2023 13:00:44 +0600 Subject: [PATCH 2/4] pkp/pkp-lib#8734 Updated bulk emailing to use batch queuing process --- api/v1/_email/PKPEmailHandler.php | 69 ++++++-------------------- classes/job/repositories/Job.php | 23 --------- jobs/bulk/BulkEmailSender.php | 10 ++-- locale/en/manager.po | 8 +-- pages/management/ManagementHandler.php | 2 - templates/management/access.tpl | 11 ++-- 6 files changed, 24 insertions(+), 99 deletions(-) diff --git a/api/v1/_email/PKPEmailHandler.php b/api/v1/_email/PKPEmailHandler.php index 00e89086469..6516e932ae4 100644 --- a/api/v1/_email/PKPEmailHandler.php +++ b/api/v1/_email/PKPEmailHandler.php @@ -10,18 +10,19 @@ * @class PKPEmailHandler * @ingroup api_v1_announcement * - * @brief Handle API requests for announcement operations. + * @brief Handle API request to send bulk email * */ namespace PKP\API\v1\_email; use APP\facades\Repo; -use Illuminate\Support\Facades\DB; +use Illuminate\Support\Facades\Bus; use Illuminate\Support\Facades\Queue; use PKP\core\APIResponse; use PKP\handler\APIHandler; use PKP\jobs\bulk\BulkEmailSender; +use PKP\mail\Mailer; use PKP\security\authorization\PolicySet; use PKP\security\authorization\RoleBasedHandlerOperationPolicy; use PKP\security\authorization\UserRolesRequiredPolicy; @@ -36,6 +37,7 @@ class PKPEmailHandler extends APIHandler public function __construct() { $this->_handlerPath = '_email'; + $this->_endpoints = [ 'POST' => [ [ @@ -44,14 +46,8 @@ public function __construct() 'roles' => [Role::ROLE_ID_SITE_ADMIN, Role::ROLE_ID_MANAGER], ], ], - 'PUT' => [ - [ - 'pattern' => $this->getEndpointPattern() . '/{queueId}', - 'handler' => [$this, 'process'], - 'roles' => [Role::ROLE_ID_SITE_ADMIN, Role::ROLE_ID_MANAGER], - ], - ], ]; + parent::__construct(); } @@ -151,57 +147,24 @@ public function create(ServerRequestInterface $slimRequest, APIResponse $respons } } - $queueId = 'email_' . uniqid(); - $batches = array_chunk($userIds, BulkEmailSender::EMAILS_PER_JOB); + $batches = array_chunk($userIds, Mailer::BULK_EMAIL_SIZE_LIMIT); + $jobs = []; foreach ($batches as $userIds) { - Queue::push( - new BulkEmailSender( - $userIds, - $contextId, - $params['subject'], - $params['body'], - $context->getData('contactEmail'), - $context->getData('contactName') - ), - [], - $queueId + $jobs[] = new BulkEmailSender( + $userIds, + $contextId, + $params['subject'], + $params['body'], + $context->getData('contactEmail'), + $context->getData('contactName') ); } - return $response->withJson([ - 'queueId' => $queueId, - 'totalJobs' => count($batches), - ], 200); - } - - /** - * Process a jobs queue for sending a bulk email - * - * @param array $args arguments - * - * @return APIResponse - */ - public function process(ServerRequestInterface $slimRequest, APIResponse $response, array $args) - { - $countRunning = Repo::job()->getRunningJobCount($args['queueId']); - $countPending = Repo::job()->getPendingJobCount($args['queueId']); - $jobQueue = app('pkpJobQueue'); /** @var \PKP\core\PKPQueueProvider $jobQueue */ - - // Don't run another job if one is already running. - // This should ensure jobs are run one after the other and - // prevent long-running jobs from running simultaneously - // and piling onto the server like a DDOS attack. - if (!$countRunning && $countPending) { - - $jobQueue->forQueue($args['queueId'])->runJobInQueue(); - - // Update count of pending jobs - $countPending = Repo::job()->getPendingJobCount($args['queueId']); - } + Bus::batch($jobs)->dispatch(); return $response->withJson([ - 'pendingJobs' => $countPending, + 'totalBulkJobs' => count($batches), ], 200); } diff --git a/classes/job/repositories/Job.php b/classes/job/repositories/Job.php index 94ab7a058e8..e43c2d228e5 100644 --- a/classes/job/repositories/Job.php +++ b/classes/job/repositories/Job.php @@ -39,29 +39,6 @@ public function total(): int ->count(); } - public function getRunningJobCount(string $queue = null): int - { - $query = $this->model; - - if ($queue) { - $query = $query->onQueue($queue); - } - - return $query->whereNotNull('reserved_at')->count(); - } - - public function getPendingJobCount(string $queue = null): int - { - if (!$queue) { - return $this->total(); - } - - return $this->model - ->onQueue($queue) - ->nonReserved() - ->count(); - } - public function showJobs(): LengthAwarePaginator { $currentPage = LengthAwarePaginator::resolveCurrentPage(); diff --git a/jobs/bulk/BulkEmailSender.php b/jobs/bulk/BulkEmailSender.php index 9a4ce7e6515..89e6b665628 100644 --- a/jobs/bulk/BulkEmailSender.php +++ b/jobs/bulk/BulkEmailSender.php @@ -16,19 +16,15 @@ namespace PKP\jobs\bulk; use APP\facades\Repo; +use Illuminate\Bus\Batchable; use Illuminate\Support\Facades\Mail; use PKP\mail\Mailable; use PKP\jobs\BaseJob; class BulkEmailSender extends BaseJob { - /** - * Number of emails to send in each job - * - * @var int - */ - public const EMAILS_PER_JOB = 100; - + use Batchable; + /** * The user ids to send email */ diff --git a/locale/en/manager.po b/locale/en/manager.po index 2c6032d680a..61d8cec8702 100644 --- a/locale/en/manager.po +++ b/locale/en/manager.po @@ -1091,12 +1091,8 @@ msgstr "" "You are about to send an email to {$total} users. Are you sure you want to " "send this email?" -msgid "manager.setup.notifyUsers.sending" -msgstr "" -"Sending the email. Please do not browse away until sending is complete." - -msgid "manager.setup.notifyUsers.sent" -msgstr "Email was successfully sent to all recipients." +msgid "manager.setup.notifyUsers.queued" +msgstr "Emails are successfully queued to be sent at the earliest convenience." msgid "manager.setup.notifyUsers.sendAnother" msgstr "Send another email" diff --git a/pages/management/ManagementHandler.php b/pages/management/ManagementHandler.php index 083a5d1018a..499db38674e 100644 --- a/pages/management/ManagementHandler.php +++ b/pages/management/ManagementHandler.php @@ -463,7 +463,6 @@ public function access($args, $request) $apiUrl = $dispatcher->url($request, PKPApplication::ROUTE_API, $context->getPath(), 'contexts/' . $context->getId()); $notifyUrl = $dispatcher->url($request, PKPApplication::ROUTE_API, $context->getPath(), '_email'); - $progressUrl = $dispatcher->url($request, PKPApplication::ROUTE_API, $context->getPath(), '_email/{queueId}'); $userAccessForm = new \APP\components\forms\context\UserAccessForm($apiUrl, $context); $isBulkEmailsEnabled = in_array($context->getId(), (array) $request->getSite()->getData('enableBulkEmails')); @@ -484,7 +483,6 @@ public function access($args, $request) FORM_USER_ACCESS => $userAccessForm->getConfig(), PKPNotifyUsersForm::FORM_NOTIFY_USERS => $notifyUsersForm ? $notifyUsersForm->getConfig() : null, ], - 'progressUrl' => $progressUrl, ]); $templateMgr->display('management/access.tpl'); diff --git a/templates/management/access.tpl b/templates/management/access.tpl index 8596b9fd162..7e3f86a0fa9 100644 --- a/templates/management/access.tpl +++ b/templates/management/access.tpl @@ -25,19 +25,14 @@ {if $enableBulkEmails} -
-

- - {translate key="manager.setup.notifyUsers.sending"} -

-

+

+

- {translate key="manager.setup.notifyUsers.sent"} + {translate key="manager.setup.notifyUsers.queued"}

-
Date: Wed, 22 Mar 2023 13:01:59 +0600 Subject: [PATCH 3/4] pkp/pkp-lib#8734 Override database queue connector to handle default config queue for batch process --- classes/core/PKPQueueDatabaseConnector.php | 42 ++++++++++++++++++++++ classes/core/PKPQueueProvider.php | 19 ++++++++-- 2 files changed, 58 insertions(+), 3 deletions(-) create mode 100644 classes/core/PKPQueueDatabaseConnector.php diff --git a/classes/core/PKPQueueDatabaseConnector.php b/classes/core/PKPQueueDatabaseConnector.php new file mode 100644 index 00000000000..bb98c0222b6 --- /dev/null +++ b/classes/core/PKPQueueDatabaseConnector.php @@ -0,0 +1,42 @@ +connections->connection($config['connection'] ?? null), + $config['table'], + Config::getVar('queues', 'default_queue', 'queue'), + $config['retry_after'] ?? 60, + $config['after_commit'] ?? null + ); + } +} \ No newline at end of file diff --git a/classes/core/PKPQueueProvider.php b/classes/core/PKPQueueProvider.php index aab85f5cb9b..07beab36899 100644 --- a/classes/core/PKPQueueProvider.php +++ b/classes/core/PKPQueueProvider.php @@ -5,8 +5,8 @@ /** * @file classes/core/PKPQueueProvider.php * - * Copyright (c) 2014-2021 Simon Fraser University - * Copyright (c) 2000-2021 John Willinsky + * Copyright (c) 2014-2023 Simon Fraser University + * Copyright (c) 2000-2023 John Willinsky * Distributed under the GNU GPL v3. For full terms see the file docs/COPYING. * * @class PKPQueueProvider @@ -19,6 +19,7 @@ use APP\core\Application; use PKP\core\PKPContainer; +use PKP\core\PKPQueueDatabaseConnector; use Illuminate\Contracts\Debug\ExceptionHandler; use Illuminate\Database\Eloquent\Builder; use Illuminate\Queue\Events\JobFailed; @@ -27,7 +28,6 @@ use Illuminate\Queue\WorkerOptions; use Illuminate\Support\Facades\Facade; use Illuminate\Support\Facades\Queue; -use Illuminate\Support\Facades\Schema; use PKP\config\Config; use PKP\job\models\Job as PKPJobModel; use PKP\queue\JobRunner; @@ -160,6 +160,19 @@ public function register() $this->registerDatabaseConnector(app()->get(\Illuminate\Queue\QueueManager::class)); } + /** + * Register the database queue connector. + * + * @param \Illuminate\Queue\QueueManager $manager + * @return void + */ + protected function registerDatabaseConnector($manager) + { + $manager->addConnector('database', function () { + return new PKPQueueDatabaseConnector($this->app['db']); + }); + } + /** * Register the queue worker. * From e7e71a11f6493bd6a84a79cb5ab9bde2b50e2263 Mon Sep 17 00:00:00 2001 From: Touhidur Rahman Date: Mon, 27 Mar 2023 11:51:07 +0600 Subject: [PATCH 4/4] pkp/pkp-lib#8734 Removed PKPQueueProvider::register method as deemed unnecessary --- classes/core/PKPQueueProvider.php | 11 ----------- 1 file changed, 11 deletions(-) diff --git a/classes/core/PKPQueueProvider.php b/classes/core/PKPQueueProvider.php index 07beab36899..0b0e7408917 100644 --- a/classes/core/PKPQueueProvider.php +++ b/classes/core/PKPQueueProvider.php @@ -149,17 +149,6 @@ public function boot() }); } - /** - * Register the service provider. - * - */ - public function register() - { - parent::register(); - - $this->registerDatabaseConnector(app()->get(\Illuminate\Queue\QueueManager::class)); - } - /** * Register the database queue connector. *