Skip to content

Commit

Permalink
Merge pull request #8827 from touhidurabir/i8734_main
Browse files Browse the repository at this point in the history
#8734 Added bulk mail sending job class and optimized bulk mail sending
  • Loading branch information
Vitaliy-1 authored Mar 27, 2023
2 parents be10907 + e7e71a1 commit a5c7e95
Show file tree
Hide file tree
Showing 7 changed files with 170 additions and 110 deletions.
105 changes: 20 additions & 85 deletions api/v1/_email/PKPEmailHandler.php
Original file line number Diff line number Diff line change
Expand Up @@ -10,21 +10,19 @@
* @class PKPEmailHandler
* @ingroup api_v1_announcement
*
* @brief Handle API requests for announcement operations.
* @brief Handle API request to send bulk email
*
*/

namespace PKP\API\v1\_email;

use APP\facades\Repo;
use Illuminate\Queue\WorkerOptions;
use Illuminate\Support\Facades\DB;
use Illuminate\Support\Facades\Mail;
use Illuminate\Support\Facades\Bus;
use Illuminate\Support\Facades\Queue;
use PKP\core\APIResponse;
use PKP\core\PKPContainer;
use PKP\handler\APIHandler;
use PKP\mail\Mailable;
use PKP\jobs\bulk\BulkEmailSender;
use PKP\mail\Mailer;
use PKP\security\authorization\PolicySet;
use PKP\security\authorization\RoleBasedHandlerOperationPolicy;
use PKP\security\authorization\UserRolesRequiredPolicy;
Expand All @@ -33,15 +31,13 @@

class PKPEmailHandler extends APIHandler
{
/** @var int Number of emails to send in each job */
public const EMAILS_PER_JOB = 100;

/**
* Constructor
*/
public function __construct()
{
$this->_handlerPath = '_email';

$this->_endpoints = [
'POST' => [
[
Expand All @@ -50,14 +46,8 @@ public function __construct()
'roles' => [Role::ROLE_ID_SITE_ADMIN, Role::ROLE_ID_MANAGER],
],
],
'PUT' => [
[
'pattern' => $this->getEndpointPattern() . '/{queueId}',
'handler' => [$this, 'process'],
'roles' => [Role::ROLE_ID_SITE_ADMIN, Role::ROLE_ID_MANAGER],
],
],
];

parent::__construct();
}

Expand Down Expand Up @@ -146,91 +136,36 @@ public function create(ServerRequestInterface $slimRequest, APIResponse $respons

$userIds = Repo::user()->getCollector()
->filterByContextIds([$contextId])
->filterByUserGroupIds(['userGroupIds'])
->filterByUserGroupIds($params['userGroupIds'])
->getIds()
->toArray();

$subject = $params['subject'];
$body = $params['body'];
$fromEmail = $context->getData('contactEmail');
$fromName = $context->getData('contactName');
$queueId = 'email_' . uniqid();

if (!empty($params['copy'])) {
$currentUserId = $this->getRequest()->getUser()->getId();
if (!in_array($currentUserId, $userIds)) {
$userIds[] = $currentUserId;
}
}

$batches = array_chunk($userIds, self::EMAILS_PER_JOB);
$batches = array_chunk($userIds, Mailer::BULK_EMAIL_SIZE_LIMIT);
$jobs = [];

foreach ($batches as $userIds) {
Queue::push(function () use ($userIds, $contextId, $subject, $body, $fromEmail, $fromName) {
$users = Repo::user()->getCollector()
->filterByContextIds([$contextId])
->filterByUserIds($userIds)
->getMany();

foreach ($users as $user) {
$mailable = new Mailable();
$mailable
->from($fromEmail, $fromName)
->to($user->getEmail(), $user->getFullName())
->subject($subject)
->body($body);

Mail::send($mailable);
}
}, [], $queueId);
$jobs[] = new BulkEmailSender(
$userIds,
$contextId,
$params['subject'],
$params['body'],
$context->getData('contactEmail'),
$context->getData('contactName')
);
}

return $response->withJson([
'queueId' => $queueId,
'totalJobs' => count($batches),
], 200);
}

/**
* Process a jobs queue for sending a bulk email
*
* @param array $args arguments
*
* @return APIResponse
*/
public function process(ServerRequestInterface $slimRequest, APIResponse $response, array $args)
{
$countRunning = DB::table('jobs')
->where('queue', $args['queueId'])
->whereNotNull('reserved_at')
->count();
$countPending = $this->countPending($args['queueId']);

// Don't run another job if one is already running.
// This should ensure jobs are run one after the other and
// prevent long-running jobs from running simultaneously
// and piling onto the server like a DDOS attack.
if (!$countRunning && $countPending) {
$laravelContainer = PKPContainer::getInstance();
$options = new WorkerOptions();
$laravelContainer['queue.worker']->runNextJob('database', $args['queueId'], $options);

// Update count of pending jobs
$countPending = $this->countPending($args['queueId']);
}
Bus::batch($jobs)->dispatch();

return $response->withJson([
'pendingJobs' => $countPending,
'totalBulkJobs' => count($batches),
], 200);
}

/**
* Return a count of the pending jobs in a given queue
*
*/
protected function countPending(string $queueId): int
{
return DB::table('jobs')
->where('queue', $queueId)
->count();
}
}
42 changes: 42 additions & 0 deletions classes/core/PKPQueueDatabaseConnector.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
<?php

declare(strict_types=1);

/**
* @file classes/core/PKPQueueDatabaseConnector.php
*
* Copyright (c) 2014-2023 Simon Fraser University
* Copyright (c) 2000-2023 John Willinsky
* Distributed under the GNU GPL v3. For full terms see the file docs/COPYING.
*
* @class PKPQueueDatabaseConnector
* @ingroup core
*
* @brief Registers the database queue connector
*/

namespace PKP\core;

use Illuminate\Queue\Connectors\DatabaseConnector as IlluminateQueueDatabaseConnector;
use Illuminate\Queue\DatabaseQueue;
use PKP\config\Config;

class PKPQueueDatabaseConnector extends IlluminateQueueDatabaseConnector
{
/**
* Establish a queue connection.
*
* @param array $config
* @return \Illuminate\Contracts\Queue\Queue
*/
public function connect(array $config)
{
return new DatabaseQueue(
$this->connections->connection($config['connection'] ?? null),
$config['table'],
Config::getVar('queues', 'default_queue', 'queue'),
$config['retry_after'] ?? 60,
$config['after_commit'] ?? null
);
}
}
20 changes: 11 additions & 9 deletions classes/core/PKPQueueProvider.php
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@
/**
* @file classes/core/PKPQueueProvider.php
*
* Copyright (c) 2014-2021 Simon Fraser University
* Copyright (c) 2000-2021 John Willinsky
* Copyright (c) 2014-2023 Simon Fraser University
* Copyright (c) 2000-2023 John Willinsky
* Distributed under the GNU GPL v3. For full terms see the file docs/COPYING.
*
* @class PKPQueueProvider
Expand All @@ -19,6 +19,7 @@

use APP\core\Application;
use PKP\core\PKPContainer;
use PKP\core\PKPQueueDatabaseConnector;
use Illuminate\Contracts\Debug\ExceptionHandler;
use Illuminate\Database\Eloquent\Builder;
use Illuminate\Queue\Events\JobFailed;
Expand All @@ -27,7 +28,6 @@
use Illuminate\Queue\WorkerOptions;
use Illuminate\Support\Facades\Facade;
use Illuminate\Support\Facades\Queue;
use Illuminate\Support\Facades\Schema;
use PKP\config\Config;
use PKP\job\models\Job as PKPJobModel;
use PKP\queue\JobRunner;
Expand Down Expand Up @@ -101,7 +101,7 @@ public function runJobInQueue(): void

$laravelContainer['queue.worker']->runNextJob(
'database',
$job->queue,
$job->queue ?? Config::getVar('queues', 'default_queue', 'queue'),
$this->getWorkerOptions()
);
}
Expand Down Expand Up @@ -150,14 +150,16 @@ public function boot()
}

/**
* Register the service provider.
* Register the database queue connector.
*
* @param \Illuminate\Queue\QueueManager $manager
* @return void
*/
public function register()
protected function registerDatabaseConnector($manager)
{
parent::register();

$this->registerDatabaseConnector(app()->get(\Illuminate\Queue\QueueManager::class));
$manager->addConnector('database', function () {
return new PKPQueueDatabaseConnector($this->app['db']);
});
}

/**
Expand Down
92 changes: 92 additions & 0 deletions jobs/bulk/BulkEmailSender.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
<?php

/**
* @file jobs/bulk/BulkEmailSender.php
*
* Copyright (c) 2014-2023 Simon Fraser University
* Copyright (c) 2000-2023 John Willinsky
* Distributed under the GNU GPL v3. For full terms see the file docs/COPYING.
*
* @class BulkEmailSender
* @ingroup jobs
*
* @brief Job to send bulk emails
*/

namespace PKP\jobs\bulk;

use APP\facades\Repo;
use Illuminate\Bus\Batchable;
use Illuminate\Support\Facades\Mail;
use PKP\mail\Mailable;
use PKP\jobs\BaseJob;

class BulkEmailSender extends BaseJob
{
use Batchable;

/**
* The user ids to send email
*/
protected array $userIds;

/**
* The associated context id
*/
protected int $contextId;

/**
* Mail subject
*/
protected string $subject;

/**
* Mail body
*/
protected string $body;

/**
* From email to send mail
*/
protected object|array|string $fromEmail;

/**
* From name to send mail
*/
protected mixed $fromName;

/**
* Create a new job instance.
*/
public function __construct(array $userIds, int $contextId, string $subject, string $body, object|array|string $fromEmail, mixed $fromName)
{
parent::__construct();

$this->userIds = $userIds;
$this->contextId = $contextId;
$this->subject = $subject;
$this->body = $body;
$this->fromEmail = $fromEmail;
$this->fromName = $fromName;
}

public function handle()
{
$users = Repo::user()
->getCollector()
->filterByContextIds([$this->contextId])
->filterByUserIds($this->userIds)
->getMany();

foreach ($users as $user) {
$mailable = new Mailable();
$mailable
->from($this->fromEmail, $this->fromName)
->to($user->getEmail(), $user->getFullName())
->subject($this->subject)
->body($this->body);

Mail::send($mailable);
}
}
}
8 changes: 2 additions & 6 deletions locale/en/manager.po
Original file line number Diff line number Diff line change
Expand Up @@ -1091,12 +1091,8 @@ msgstr ""
"You are about to send an email to {$total} users. Are you sure you want to "
"send this email?"

msgid "manager.setup.notifyUsers.sending"
msgstr ""
"Sending the email. Please do not browse away until sending is complete."

msgid "manager.setup.notifyUsers.sent"
msgstr "Email was successfully sent to all recipients."
msgid "manager.setup.notifyUsers.queued"
msgstr "Emails are successfully queued to be sent at the earliest convenience."

msgid "manager.setup.notifyUsers.sendAnother"
msgstr "Send another email"
Expand Down
2 changes: 0 additions & 2 deletions pages/management/ManagementHandler.php
Original file line number Diff line number Diff line change
Expand Up @@ -463,7 +463,6 @@ public function access($args, $request)

$apiUrl = $dispatcher->url($request, PKPApplication::ROUTE_API, $context->getPath(), 'contexts/' . $context->getId());
$notifyUrl = $dispatcher->url($request, PKPApplication::ROUTE_API, $context->getPath(), '_email');
$progressUrl = $dispatcher->url($request, PKPApplication::ROUTE_API, $context->getPath(), '_email/{queueId}');

$userAccessForm = new \APP\components\forms\context\UserAccessForm($apiUrl, $context);
$isBulkEmailsEnabled = in_array($context->getId(), (array) $request->getSite()->getData('enableBulkEmails'));
Expand All @@ -484,7 +483,6 @@ public function access($args, $request)
FORM_USER_ACCESS => $userAccessForm->getConfig(),
PKPNotifyUsersForm::FORM_NOTIFY_USERS => $notifyUsersForm ? $notifyUsersForm->getConfig() : null,
],
'progressUrl' => $progressUrl,
]);

$templateMgr->display('management/access.tpl');
Expand Down
Loading

0 comments on commit a5c7e95

Please sign in to comment.