forked from matomo-org/matomo
-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
refs matomo-org#4903 started to work on the possibility to run multip…
…le archivers in parallel for faster archiving. There were multiple issues, for instance there were arrays of siteIds read and written in Options but options do cache all values in a class property so an update of an option does not get updated on another running archiver. Also all sites were reprocessed because of the time_before_today_archive_considered_outdated setting if the last archiving by another archivier was 10 seconds or longer ago. To prevent this only maintaining a list of to be processed siteids in db / filesystem helps so far
- Loading branch information
Showing
6 changed files
with
272 additions
and
24 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,48 @@ | ||
<?php | ||
/** | ||
* Piwik - Open source web analytics | ||
* | ||
* @link http://piwik.org | ||
* @license http://www.gnu.org/licenses/gpl-3.0.html GPL v3 or later | ||
* | ||
*/ | ||
namespace Piwik\ArchiveProcessor; | ||
|
||
use Piwik\CronArchive; | ||
use Exception; | ||
use Piwik\Option; | ||
use Piwik\CliMulti\Process; | ||
|
||
class FixedSiteIds | ||
{ | ||
private $siteIds = array(); | ||
private $index = -1; | ||
|
||
public function __construct($websiteIds) | ||
{ | ||
$this->siteIds = $websiteIds; | ||
} | ||
|
||
public function getNumSites() | ||
{ | ||
return count($this->siteIds); | ||
} | ||
|
||
public function getNumProcessedWebsites() | ||
{ | ||
return $this->index + 1; | ||
} | ||
|
||
public function getNextSiteId() | ||
{ | ||
$this->index++; | ||
|
||
if (!empty($this->siteIds[$this->index])) { | ||
return $this->siteIds[$this->index]; | ||
} | ||
|
||
return null; | ||
} | ||
|
||
} | ||
|
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,136 @@ | ||
<?php | ||
/** | ||
* Piwik - Open source web analytics | ||
* | ||
* @link http://piwik.org | ||
* @license http://www.gnu.org/licenses/gpl-3.0.html GPL v3 or later | ||
* | ||
*/ | ||
namespace Piwik\ArchiveProcessor; | ||
|
||
use Exception; | ||
use Piwik\Option; | ||
use Piwik\CliMulti\Process; | ||
|
||
class SharedSiteIds | ||
{ | ||
private $siteIds = array(); | ||
private $currentSiteId; | ||
|
||
public function __construct($websiteIds) | ||
{ | ||
$self = $this; | ||
$this->siteIds = $this->runExclusive(function () use ($self, $websiteIds) { | ||
$existingWebsiteIds = $self->getAllSiteIdsToArchive(); | ||
|
||
if (!empty($existingWebsiteIds)) { | ||
return $existingWebsiteIds; | ||
} | ||
|
||
$self->setSiteIdsToArchive($websiteIds); | ||
|
||
return $websiteIds; | ||
}); | ||
} | ||
|
||
public function getNumSites() | ||
{ | ||
return count($this->siteIds); | ||
} | ||
|
||
public function getNumProcessedWebsites() | ||
{ | ||
if (empty($this->currentSiteId)) { | ||
return 0; | ||
} | ||
|
||
$index = array_search($this->currentSiteId, $this->siteIds); | ||
|
||
if (false === $index) { | ||
return 0; | ||
} | ||
|
||
return $index + 1; | ||
} | ||
|
||
public function setSiteIdsToArchive($siteIds) | ||
{ | ||
if (!empty($siteIds)) { | ||
Option::set('SiteIdsToArchive', implode(',', $siteIds)); | ||
} else { | ||
Option::delete('SiteIdsToArchive'); | ||
} | ||
} | ||
|
||
public function getAllSiteIdsToArchive() | ||
{ | ||
Option::clearCachedOption('SiteIdsToArchive'); | ||
$siteIdsToArchive = Option::get('SiteIdsToArchive'); | ||
|
||
if (empty($siteIdsToArchive)) { | ||
return array(); | ||
} | ||
|
||
return explode(',', trim($siteIdsToArchive)); | ||
} | ||
|
||
/** | ||
* If there are multiple archiver running on the same node it makes sure only one of them performs an action and it | ||
* will wait until another one has finished. Any closure you pass here should be very fast as other processes wait | ||
* for this closure to finish otherwise. Currently only used for making multiple archivers at the same time work. | ||
* If a closure takes more than 5 seconds we assume it is dead and simply continue. | ||
* | ||
* @param \Closure $closure | ||
* @return mixed | ||
* @throws \Exception | ||
*/ | ||
private function runExclusive($closure) | ||
{ | ||
$process = new Process('archive.lock'); | ||
while ($process->isRunning() && $process->getSecondsSinceCreation() < 5) { | ||
// wait max 5 seconds, such an operation should not take longer | ||
usleep(25); | ||
} | ||
|
||
$process->startProcess(); | ||
|
||
try { | ||
$result = $closure(); | ||
} catch (Exception $e) { | ||
$process->finishProcess(); | ||
throw $e; | ||
} | ||
|
||
$process->finishProcess(); | ||
|
||
return $result; | ||
} | ||
|
||
public function getNextSiteId() | ||
{ | ||
$self = $this; | ||
|
||
$this->currentSiteId = $this->runExclusive(function () use ($self) { | ||
|
||
$siteIds = $self->getAllSiteIdsToArchive(); | ||
|
||
if (empty($siteIds)) { | ||
return null; | ||
} | ||
|
||
$nextSiteId = array_shift($siteIds); | ||
$self->setSiteIdsToArchive($siteIds); | ||
|
||
return $nextSiteId; | ||
}); | ||
|
||
return $this->currentSiteId; | ||
} | ||
|
||
public static function isSupported() | ||
{ | ||
return Process::isSupported(); | ||
} | ||
|
||
} | ||
|
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.