diff --git a/CRM/Queue/Queue/Sql.php b/CRM/Queue/Queue/Sql.php index 3f6c474a9468..2ace66766635 100644 --- a/CRM/Queue/Queue/Sql.php +++ b/CRM/Queue/Queue/Sql.php @@ -14,6 +14,8 @@ */ class CRM_Queue_Queue_Sql extends CRM_Queue_Queue { + use CRM_Queue_Queue_SqlTrait; + /** * Create a reference to queue. After constructing the queue, one should * usually call createQueue (if it's a new queue) or loadQueue (if it's @@ -32,41 +34,6 @@ public function __construct($queueSpec) { parent::__construct($queueSpec); } - /** - * Perform any registation or resource-allocation for a new queue - */ - public function createQueue() { - // nothing to do -- just start CRUDing items in the appropriate table - } - - /** - * Perform any loading or pre-fetch for an existing queue. - */ - public function loadQueue() { - // nothing to do -- just start CRUDing items in the appropriate table - } - - /** - * Release any resources claimed by the queue (memory, DB rows, etc) - */ - public function deleteQueue() { - return CRM_Core_DAO::singleValueQuery(" - DELETE FROM civicrm_queue_item - WHERE queue_name = %1 - ", [ - 1 => [$this->getName(), 'String'], - ]); - } - - /** - * Check if the queue exists. - * - * @return bool - */ - public function existsQueue() { - return ($this->numberOfItems() > 0); - } - /** * Add a new item to the queue. * @@ -85,21 +52,6 @@ public function createItem($data, $options = []) { $dao->save(); } - /** - * Determine number of items remaining in the queue. - * - * @return int - */ - public function numberOfItems() { - return CRM_Core_DAO::singleValueQuery(" - SELECT count(*) - FROM civicrm_queue_item - WHERE queue_name = %1 - ", [ - 1 => [$this->getName(), 'String'], - ]); - } - /** * Get the next item. * @@ -185,16 +137,6 @@ public function stealItem($lease_time = 3600) { } } - /** - * Remove an item from the queue. - * - * @param CRM_Core_DAO $dao - * The item returned by claimItem. - */ - public function deleteItem($dao) { - $dao->delete(); - } - /** * Return an item that could not be processed. * diff --git a/CRM/Queue/Queue/SqlParallel.php b/CRM/Queue/Queue/SqlParallel.php index 369e7d2cef63..f7a4b64971e9 100644 --- a/CRM/Queue/Queue/SqlParallel.php +++ b/CRM/Queue/Queue/SqlParallel.php @@ -14,6 +14,8 @@ */ class CRM_Queue_Queue_SqlParallel extends CRM_Queue_Queue { + use CRM_Queue_Queue_SqlTrait; + /** * Create a reference to queue. After constructing the queue, one should * usually call createQueue (if it's a new queue) or loadQueue (if it's @@ -32,32 +34,6 @@ public function __construct($queueSpec) { parent::__construct($queueSpec); } - /** - * Perform any registation or resource-allocation for a new queue - */ - public function createQueue() { - // nothing to do -- just start CRUDing items in the appropriate table - } - - /** - * Perform any loading or pre-fetch for an existing queue. - */ - public function loadQueue() { - // nothing to do -- just start CRUDing items in the appropriate table - } - - /** - * Release any resources claimed by the queue (memory, DB rows, etc) - */ - public function deleteQueue() { - return CRM_Core_DAO::singleValueQuery(" - DELETE FROM civicrm_queue_item - WHERE queue_name = %1 - ", [ - 1 => [$this->getName(), 'String'], - ]); - } - /** * Check if the queue exists. * @@ -85,21 +61,6 @@ public function createItem($data, $options = []) { $dao->save(); } - /** - * Determine number of items remaining in the queue. - * - * @return int - */ - public function numberOfItems() { - return CRM_Core_DAO::singleValueQuery(" - SELECT count(*) - FROM civicrm_queue_item - WHERE queue_name = %1 - ", [ - 1 => [$this->getName(), 'String'], - ]); - } - /** * Get the next item. * @@ -182,30 +143,4 @@ public function stealItem($lease_time = 3600) { } } - /** - * Remove an item from the queue. - * - * @param CRM_Core_DAO $dao - * The item returned by claimItem. - */ - public function deleteItem($dao) { - $dao->delete(); - $dao->free(); - } - - /** - * Return an item that could not be processed. - * - * @param CRM_Core_DAO $dao - * The item returned by claimItem. - */ - public function releaseItem($dao) { - $sql = "UPDATE civicrm_queue_item SET release_time = NULL WHERE id = %1"; - $params = [ - 1 => [$dao->id, 'Integer'], - ]; - CRM_Core_DAO::executeQuery($sql, $params); - $dao->free(); - } - } diff --git a/CRM/Queue/Queue/SqlTrait.php b/CRM/Queue/Queue/SqlTrait.php new file mode 100644 index 000000000000..00f61e6a0254 --- /dev/null +++ b/CRM/Queue/Queue/SqlTrait.php @@ -0,0 +1,114 @@ + [$this->getName(), 'String'], + ]); + } + + /** + * Check if the queue exists. + * + * @return bool + */ + public function existsQueue() { + return ($this->numberOfItems() > 0); + } + + /** + * Determine number of items remaining in the queue. + * + * @return int + */ + public function numberOfItems() { + return CRM_Core_DAO::singleValueQuery(" + SELECT count(*) + FROM civicrm_queue_item + WHERE queue_name = %1 + ", [ + 1 => [$this->getName(), 'String'], + ]); + } + + /** + * Remove an item from the queue. + * + * @param CRM_Core_DAO|stdClass $dao + * The item returned by claimItem. + */ + public function deleteItem($dao) { + $dao->delete(); + $dao->free(); + } + + /** + * Get the full data for an item. + * + * This is a passive peek - it does not claim/steal/release anything. + * + * @param int|string $id + * The unique ID of the task within the queue. + * @return CRM_Queue_DAO_QueueItem|object|null $dao + */ + public function fetchItem($id) { + $dao = new CRM_Queue_DAO_QueueItem(); + $dao->id = $id; + $dao->queue_name = $this->getName(); + if (!$dao->find(TRUE)) { + return NULL; + } + $dao->data = unserialize($dao->data); + return $dao; + } + + /** + * Return an item that could not be processed. + * + * @param CRM_Core_DAO $dao + * The item returned by claimItem. + */ + public function releaseItem($dao) { + $sql = "UPDATE civicrm_queue_item SET release_time = NULL WHERE id = %1"; + $params = [ + 1 => [$dao->id, 'Integer'], + ]; + CRM_Core_DAO::executeQuery($sql, $params); + $dao->free(); + } + +}