introduce RecordBuilder concept to split up Archiver code and use in Goals (#20394)

* introduce RecordBuilder concept and re-organize Goals archiving code via RecordBuilders

* fix loop iteration bug

* split ecommerce records recordbuilder into 3 separate records

* make sure Goals::getRecordMetadata() behaves like old archiver code

* make sure recordbuilder archive processor is restored after being used since archiving is a recursive process

* just make ArchiveProcessor a parameter

* check for plugin before calling buildMultiplePeriod()

* do not invoke record builders if archiver has no plugin (happens during tests)

* insert empty DataTables (as this appears to be the existing behavior before this change)

* add RecordBuilder class name to aggregation query hint

* clear up in-source todo

* attempt only archiving requested report if range archive and the record needed is created by a RecordBuilder

* refactor ArchiveSelector::getArchiveIds() to provide result with string keys

* when all found archives are partial archives, check that requested data is present within them. if some are not present, only archive those in a new partial archive.

* return correct value in Model::getRecordsContainedInArchives()

* fix if formatting

* existingArchives can be falsy

* existing archives can be null if the check is not relevant to the current archive request

* do not archive dependent segments if only processing the specific requested report

* fix more tests

* fix LoaderTest

* make sure if archiving specific reports for a single plugin that archiver class instances will not be created

* add filterRecordBuilders event

* if it looks like the requested records are numeric, prioritize the numeric archive table, otherwise blob archive table

* fix copy-paste error

* add dummy test for numeric values

* add test for partial archiving of numeric records for ranges and fix typo causing this to fail

* lessen code redundancy in Archive.php, use Piwik\Request and do not yet mark RecordBuilder as api

* fix type hint

* fix php-cs errors

* fix failing tests

* fix failing tests (really)

* fix isEnabled calls

* only add idarchive to Archive.php idarchive cache if it is not already there (makes debugging a little less confusing)

* remove unneeded TODO

* when forcing new archive because timestamp is too old, do not report any existing archives

* report no existing archives if done flag is different + add tests

* remove unneeded unset

* fix phpcs

* remove unneeded newline

* use siteAware cache for RecordBuilder array

* better typehints in RecordBuilder

* ignore any records that are not declared in the record metadata (which can happen, for instance, when a goal has been deleted but is still referred to in log data)

* apply review feedback

* remove stray debugging change

* Update variable name for consistency

* Remove unnecessary array_filter since a valid class name never has an empty segment

* Add TODOs

* add comment on why we look for data within partial archives prior to reporting whether archives were found or not

* typehint fixes + make insertBlobRecord (formerly insertRecord) protected for use in RecordBuilders that need to manually insert data

* more typehints

* in aggregateNumericMetrics() allow operationsToApply to be array mapping column name to op

* optimization: when getting recordbuilders, only post Archiver.addRecordBuilders event for requested plugin since it is expected for those event handlers to perform queries

* default to null if default column aggregation operation is not specified

* add check for invalid record name to Record

* allow dashes in record name since entity IDs can be used in them

---------

Co-authored-by: Stefan Giehl <[email protected]>
Co-authored-by: Michal Kleiner <[email protected]>
3 people authored Jun 5, 2023
1 parent a727364 commit adcae6d
Showing 26 changed files with 1,986 additions and 591 deletions.
65 changes: 44 additions & 21 deletions core/Archive.php
@@ -436,20 +436,21 @@ public function getDataTableExpanded($name, $idSubtable = null, $depth = null, $
}

/**
* Returns the list of plugins that archive the given reports.
* Returns the given reports grouped by the plugin name that archives them.
*
* @param array $archiveNames
* @return array
* @return array `['MyPlugin' => ['MyPlugin_metric1', 'MyPlugin_report1'], ...]`
*/
private function getRequestedPlugins($archiveNames)
{
$result = [];

foreach ($archiveNames as $name) {
$result[] = self::getPluginForReport($name);
$plugin = self::getPluginForReport($name);
$result[$plugin][] = $name;
}

return array_unique($result);
return array_map('array_unique', $result);
}

/**
@@ -601,7 +602,8 @@ protected function get($archiveNames, $archiveDataType, $idSubtable = null)
*/
private function getArchiveIds($archiveNames)
{
$plugins = $this->getRequestedPlugins($archiveNames);
$archiveNamesByPlugin = $this->getRequestedPlugins($archiveNames);
$plugins = array_keys($archiveNamesByPlugin);

// figure out which archives haven't been processed (if an archive has been processed,
// then we have the archive IDs in $this->idarchives)
@@ -627,15 +629,13 @@ private function getArchiveIds($archiveNames)
$globalDoneFlag = Rules::getDoneFlagArchiveContainsAllPlugins($this->params->getSegment());
$doneFlags[$globalDoneFlag] = true;

$archiveGroups = array_unique($archiveGroups);

// cache id archives for plugins we haven't processed yet
if (!empty($archiveGroups)) {
if (
Rules::isArchivingEnabledFor($this->params->getIdSites(), $this->params->getSegment(), $this->getPeriodLabel())
&& !$this->forceFetchingWithoutLaunchingArchiving
) {
$this->cacheArchiveIdsAfterLaunching($archiveGroups, $plugins);
$this->cacheArchiveIdsAfterLaunching($archiveNamesByPlugin);
} else {
$this->cacheArchiveIdsWithoutLaunching($plugins);
}
@@ -651,10 +651,9 @@ private function getArchiveIds($archiveNames)
* This function will launch the archiving process for each period/site/plugin if
* metrics/reports have not been calculated/archived already.
*
* @param array $archiveGroups @see getArchiveGroupOfReport
* @param array $plugins List of plugin names to archive.
* @param array $archiveNamesByPlugin @see getRequestedPlugins
*/
private function cacheArchiveIdsAfterLaunching($archiveGroups, $plugins)
private function cacheArchiveIdsAfterLaunching($archiveNamesByPlugin)
{
foreach ($this->params->getPeriods() as $period) {
$twoDaysAfterPeriod = $period->getDateEnd()->addDay(2);
@@ -697,7 +696,7 @@ private function cacheArchiveIdsAfterLaunching($archiveGroups, $plugins)
continue;
}

$this->prepareArchive($archiveGroups, $site, $period);
$this->prepareArchive($archiveNamesByPlugin, $site, $period);
}
}
}
@@ -744,6 +743,15 @@ private function cacheArchiveIdsWithoutLaunching($plugins)
*/
private function getDoneStringForPlugin($plugin, $idSites)
{
$requestedReport = $this->getRequestedReport();

$shouldOnlyProcessRequestedArchives = empty($requestedReport)
&& Rules::shouldProcessOnlyReportsRequestedInArchiveQuery($this->getPeriodLabel());

if ($shouldOnlyProcessRequestedArchives) {
return Rules::getDoneFlagArchiveContainsOnePlugin($this->params->getSegment(), $plugin);
}

return Rules::getDoneStringFlagFor(
$idSites,
$this->params->getSegment(),
@@ -881,44 +889,50 @@ public static function getPluginForReport($report)
}

/**
* @param $archiveGroups
* @param $archiveNamesByPlugin
* @param $site
* @param $period
*/
private function prepareArchive(array $archiveGroups, Site $site, Period $period)
private function prepareArchive(array $archiveNamesByPlugin, Site $site, Period $period)
{
$coreAdminHomeApi = API::getInstance();

$requestedReport = null;
if (SettingsServer::isArchivePhpTriggered()) {
$requestedReport = Common::getRequestVar('requestedReport', '', 'string');
}
$requestedReport = $this->getRequestedReport();

$shouldOnlyProcessRequestedArchives = empty($requestedReport)
&& Rules::shouldProcessOnlyReportsRequestedInArchiveQuery($period->getLabel());

$periodString = $period->getRangeString();
$periodDateStr = $period->getLabel() == 'range' ? $periodString : $period->getDateStart()->toString();

$idSites = [$site->getId()];

// process for each plugin as well
foreach ($archiveGroups as $plugin) {
foreach ($archiveNamesByPlugin as $plugin => $archiveNames) {
$doneFlag = $this->getDoneStringForPlugin($plugin, $idSites);
$this->initializeArchiveIdCache($doneFlag);

$reportsToArchiveForThisPlugin = (empty($requestedReport) && $shouldOnlyProcessRequestedArchives) ? $archiveNames : $requestedReport;

$prepareResult = $coreAdminHomeApi->archiveReports(
$site->getId(),
$period->getLabel(),
$periodDateStr,
$this->params->getSegment()->getOriginalString(),
$plugin,
$requestedReport
$reportsToArchiveForThisPlugin
);

if (
!empty($prepareResult)
&& !empty($prepareResult['idarchives'])
) {
foreach ($prepareResult['idarchives'] as $idArchive) {
$this->idarchives[$doneFlag][$periodString][] = $idArchive;
if (empty($this->idarchives[$doneFlag][$periodString])
|| !in_array($idArchive, $this->idarchives[$doneFlag][$periodString])
) {
$this->idarchives[$doneFlag][$periodString][] = $idArchive;
}
}
}
}
@@ -956,4 +970,13 @@ public function forceFetchingWithoutLaunchingArchiving()
{
$this->forceFetchingWithoutLaunchingArchiving = true;
}

private function getRequestedReport(): ?string
{
$requestedReport = null;
if (SettingsServer::isArchivePhpTriggered()) {
$requestedReport = Request::fromRequest()->getStringParameter('requestedReport', '');
}
return $requestedReport;
}
}
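The reworked `getRequestedPlugins()` and the `in_array()` guard in `prepareArchive()` can be tried out in isolation. The following is a minimal sketch, not the Matomo implementation: `pluginForReport()` is a hypothetical stand-in for `Archive::getPluginForReport()` that simply assumes the plugin name is everything before the first underscore, and `rememberArchiveId()` is a made-up helper name for the cache guard.

```php
<?php
// Hypothetical stand-in for Archive::getPluginForReport(). Assumption: the
// plugin name is the part of the record name before the first underscore.
function pluginForReport(string $report): string
{
    return explode('_', $report, 2)[0];
}

// Groups record names by plugin, as the new getRequestedPlugins() does.
function groupReportsByPlugin(array $archiveNames): array
{
    $result = [];
    foreach ($archiveNames as $name) {
        $result[pluginForReport($name)][] = $name;
    }
    // array_unique() keeps the first occurrence of each name and preserves keys.
    return array_map('array_unique', $result);
}

// Mirrors the guard added in prepareArchive(): only append an idarchive to the
// cache if it is not already listed for this done flag and period.
function rememberArchiveId(array &$cache, string $doneFlag, string $period, int $idArchive): void
{
    if (
        empty($cache[$doneFlag][$period])
        || !in_array($idArchive, $cache[$doneFlag][$period])
    ) {
        $cache[$doneFlag][$period][] = $idArchive;
    }
}

$grouped = groupReportsByPlugin(['Goals_nb_conversions', 'Actions_actions', 'Goals_nb_conversions']);
// $grouped is ['Goals' => ['Goals_nb_conversions'], 'Actions' => ['Actions_actions']]
```

Because the result is keyed by plugin name, callers such as `getArchiveIds()` can recover the plain plugin list with `array_keys()`, which is what the diff above does.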
19 changes: 13 additions & 6 deletions core/ArchiveProcessor.php
@@ -262,7 +262,8 @@ public function aggregateDataTableRecords($recordNames,
* as metrics for the current period.
*
* @param array|string $columns Array of metric names to aggregate.
* @param bool|string $operationToApply The operation to apply to the metric. Either `'sum'`, `'max'` or `'min'`.
* @param bool|string|string[] $operationToApply The operation to apply to the metric. Either `'sum'`, `'max'` or `'min'`.
* Can also be an array mapping record names to operations.
* @return array|int Returns the array of aggregate values. If only one metric was aggregated,
* the aggregate value will be returned as is, not in an array.
* For example, if `array('nb_visits', 'nb_hits')` is supplied for `$columns`,
@@ -276,9 +277,9 @@ public function aggregateDataTableRecords($recordNames,
* then `3040` would be returned.
* @api
*/
public function aggregateNumericMetrics($columns, $operationToApply = false)
public function aggregateNumericMetrics($columns, $operationsToApply = false)
{
$metrics = $this->getAggregatedNumericMetrics($columns, $operationToApply);
$metrics = $this->getAggregatedNumericMetrics($columns, $operationsToApply);

foreach ($metrics as $column => $value) {
$this->insertNumericRecord($column, $value);
@@ -489,7 +490,7 @@ protected function getOperationForColumns($columns, $defaultOperation)
{
$operationForColumn = array();
foreach ($columns as $name) {
$operation = $defaultOperation;
$operation = is_array($defaultOperation) ? ($defaultOperation[$name] ?? null) : $defaultOperation;
if (empty($operation)) {
$operation = $this->guessOperationForColumn($name);
}
@@ -688,13 +689,13 @@ public function renameColumnsAfterAggregation(DataTable $table, $columnsToRename
}
}

protected function getAggregatedNumericMetrics($columns, $operationToApply)
protected function getAggregatedNumericMetrics($columns, $operationsToApply)
{
if (!is_array($columns)) {
$columns = array($columns);
}

$operationForColumn = $this->getOperationForColumns($columns, $operationToApply);
$operationForColumn = $this->getOperationForColumns($columns, $operationsToApply);

$dataTable = $this->getArchive()->getDataTableFromNumeric($columns);

@@ -740,6 +741,12 @@ public function processDependentArchive($plugin, $segment)
}

$params = $this->getParams();
// range archives are always processed on demand, so pre-processing dependent archives is not required
// here
if (Rules::shouldProcessOnlyReportsRequestedInArchiveQuery($params->getPeriod()->getLabel())) {
return;
}

$idSites = [$params->getSite()->getId()];

// important to use the original segment string when combining. As the API itself would combine the original string.
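The change to `getOperationForColumns()` lets `aggregateNumericMetrics()` accept either a single operation or an array mapping column names to operations. A rough standalone sketch of that resolution logic follows. The function names are illustrative, and `guessOperation()` is an assumed fallback (prefix-based `max_`/`min_`, otherwise `sum`), since `guessOperationForColumn()` is not shown in this diff.

```php
<?php
// Assumed fallback; the real guessOperationForColumn() is not part of this diff.
function guessOperation(string $column): string
{
    if (strpos($column, 'max_') === 0) {
        return 'max';
    }
    if (strpos($column, 'min_') === 0) {
        return 'min';
    }
    return 'sum';
}

// Resolves one operation per column. $defaultOperation may be false, a single
// operation name, or an array mapping column name => operation name.
function operationsForColumns(array $columns, $defaultOperation): array
{
    $operationForColumn = [];
    foreach ($columns as $name) {
        $operation = is_array($defaultOperation)
            ? ($defaultOperation[$name] ?? null)
            : $defaultOperation;
        if (empty($operation)) {
            // nothing specified for this column, fall back to guessing
            $operation = guessOperation($name);
        }
        $operationForColumn[$name] = $operation;
    }
    return $operationForColumn;
}

$ops = operationsForColumns(['nb_visits', 'max_actions'], ['nb_visits' => 'max']);
// $ops is ['nb_visits' => 'max', 'max_actions' => 'max']
```

Columns missing from the mapping resolve to `null` first and then go through the same guessing path as before, so existing callers that pass `false` or a single string behave as they did.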
50 changes: 39 additions & 11 deletions core/ArchiveProcessor/Loader.php
@@ -134,7 +134,7 @@ private function prepareArchiveImpl($pluginName)
if (sizeof($data) == 2) {
return $data;
}
list($idArchives, $visits, $visitsConverted) = $data;
list($idArchives, $visits, $visitsConverted, $foundRecords) = $data;

// only lock meet those conditions
if (ArchiveProcessor::$isRootArchivingRequest && !SettingsServer::isArchivePhpTriggered()) {
@@ -153,15 +153,15 @@ private function prepareArchiveImpl($pluginName)
return $data;
}

list($idArchives, $visits, $visitsConverted) = $data;
list($idArchives, $visits, $visitsConverted, $foundRecords) = $data;

return $this->insertArchiveData($visits, $visitsConverted);
return $this->insertArchiveData($visits, $visitsConverted, $idArchives, $foundRecords);
} finally {
$lock->unlock();
}
} else {

return $this->insertArchiveData($visits, $visitsConverted);
return $this->insertArchiveData($visits, $visitsConverted, $idArchives, $foundRecords);
}
}

@@ -171,17 +171,27 @@ private function prepareArchiveImpl($pluginName)
* @param $visitsConverted
* @return array|false[]
*/
protected function insertArchiveData($visits, $visitsConverted)
protected function insertArchiveData($visits, $visitsConverted, $existingArchives, $foundRecords)
{
if (SettingsServer::isArchivePhpTriggered()) {
$this->logger->info("initiating archiving via core:archive for " . $this->params);
}

if (!empty($foundRecords)) {
$this->params->setFoundRequestedReports($foundRecords);
}

list($visits, $visitsConverted) = $this->prepareCoreMetricsArchive($visits, $visitsConverted);
list($idArchive, $visits) = $this->prepareAllPluginsArchive($visits, $visitsConverted);

if ($this->isThereSomeVisits($visits) || PluginsArchiver::doesAnyPluginArchiveWithoutVisits()) {
return [[$idArchive], $visits];
if ($this->isThereSomeVisits($visits)
|| PluginsArchiver::doesAnyPluginArchiveWithoutVisits()
) {
$idArchivesToQuery = [$idArchive];
if (!empty($foundRecords)) {
$idArchivesToQuery = array_merge($idArchivesToQuery, $existingArchives ?: []);
}
return [$idArchivesToQuery, $visits];
}

return [false, false];
@@ -208,11 +218,22 @@ protected function loadArchiveData()
// this hack was used to check the main function goes to return or continue
// NOTE: $idArchives will contain the latest DONE_OK/DONE_INVALIDATED archive as well as any partial archives
// with a ts_archived >= the DONE_OK/DONE_INVALIDATED date.
list($idArchives, $visits, $visitsConverted, $isAnyArchiveExists, $tsArchived, $value) = $this->loadExistingArchiveIdFromDb();
$archiveInfo = $this->loadExistingArchiveIdFromDb();
$idArchives = $archiveInfo['idArchives'];
$visits = $archiveInfo['visits'];
$visitsConverted = $archiveInfo['visitsConverted'];
$tsArchived = $archiveInfo['tsArchived'];
$doneFlagValue = $archiveInfo['doneFlagValue'];
$existingArchives = $archiveInfo['existingRecords'];

$requestedRecords = $this->params->getArchiveOnlyReportAsArray();
$isMissingRequestedRecords = !empty($requestedRecords) && is_array($existingArchives) && count($requestedRecords) != count($existingArchives);

if (!empty($idArchives)
&& !Rules::isActuallyForceArchivingSinglePlugin()
&& !$this->shouldForceInvalidatedArchive($value, $tsArchived)) {
&& !$this->shouldForceInvalidatedArchive($doneFlagValue, $tsArchived)
&& !$isMissingRequestedRecords
) {
// we have a usable idarchive (it's not invalidated and it's new enough), and we are not archiving
// a single report
return [$idArchives, $visits];
@@ -232,7 +253,7 @@ protected function loadArchiveData()
}
}

return [$idArchives, $visits, $visitsConverted];
return [$idArchives, $visits, $visitsConverted, $existingArchives];
}

/**
@@ -331,7 +352,14 @@ public function loadExistingArchiveIdFromDb()

// return no usable archive found, and no existing archive. this will skip invalidation, which should
// be fine since we just force archiving.
return [false, false, false, false, false, false];
return [
'idArchives' => false,
'visits' => false,
'visitsConverted' => false,
'archiveExists' => false,
'tsArchived' => false,
'doneFlagValue' => false,
];
}

$minDatetimeArchiveProcessedUTC = $this->getMinTimeArchiveProcessed();
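Two decisions in the Loader changes are easy to check on their own: whether an existing set of partial archives is missing some requested records, and which archive IDs get queried after a new partial archive is written. The sketch below restates both as free functions with illustrative names. It assumes, as the diff suggests, that the existing-records value is `null` (or otherwise not an array) when the check does not apply to the current request.

```php
<?php
// True only when specific records were requested, the existing-records check is
// relevant (an array was returned), and the counts differ.
function isMissingRequestedRecords(array $requestedRecords, $existingRecords): bool
{
    return !empty($requestedRecords)
        && is_array($existingRecords)
        && count($requestedRecords) != count($existingRecords);
}

// The new archive is always queried. Existing archives are queried as well
// only if some requested records were already found in them.
function archivesToQuery(int $newIdArchive, $existingArchives, array $foundRecords): array
{
    $idArchivesToQuery = [$newIdArchive];
    if (!empty($foundRecords)) {
        // $existingArchives can be falsy, so default to an empty array
        $idArchivesToQuery = array_merge($idArchivesToQuery, $existingArchives ?: []);
    }
    return $idArchivesToQuery;
}

$missing = isMissingRequestedRecords(['Goals_a', 'Goals_b'], ['Goals_a']); // true
$ids = archivesToQuery(7, [3, 4], ['Goals_a']);                            // [7, 3, 4]
```

The count comparison is a cheap proxy for "every requested record is present"; it relies on the existing-records list only ever containing names that were actually requested.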
32 changes: 30 additions & 2 deletions core/ArchiveProcessor/Parameters.php
@@ -55,6 +55,11 @@ class Parameters
*/
private $isArchiveOnlyReportHandled;

/**
* @var string[]|null
*/
private $foundRequestedReports;

/**
* Constructor.
*
@@ -71,7 +76,7 @@ public function __construct(Site $site, Period $period, Segment $segment)
* If we want to archive only a single report, we can request that via this method.
* It is up to each plugin's archiver to respect the setting.
*
* @param string $archiveOnlyReport
* @param string|string[] $archiveOnlyReport
* @api
*/
public function setArchiveOnlyReport($archiveOnlyReport)
@@ -267,7 +272,11 @@ public function logStatusDebug()

public function __toString()
{
return "[idSite = {$this->getSite()->getId()}, period = {$this->getPeriod()->getLabel()} {$this->getPeriod()->getRangeString()}, segment = {$this->getSegment()->getString()}, plugin = {$this->getRequestedPlugin()}, report = {$this->getArchiveOnlyReport()}]";
$requestedReports = $this->getArchiveOnlyReport();
if (is_array($requestedReports)) {
$requestedReports = implode(', ', $requestedReports);
}
return "[idSite = {$this->getSite()->getId()}, period = {$this->getPeriod()->getLabel()} {$this->getPeriod()->getRangeString()}, segment = {$this->getSegment()->getString()}, plugin = {$this->getRequestedPlugin()}, report = {$requestedReports}]";
}

/**
@@ -295,4 +304,23 @@ public function setIsPartialArchive($isArchiveOnlyReportHandled)
{
$this->isArchiveOnlyReportHandled = $isArchiveOnlyReportHandled;
}

public function getArchiveOnlyReportAsArray()
{
$requestedReport = $this->getArchiveOnlyReport();
if (empty($requestedReport)) {
return [];
}
return is_array($requestedReport) ? $requestedReport : [$requestedReport];
}

public function setFoundRequestedReports(array $foundRecords)
{
$this->foundRequestedReports = $foundRecords;
}

public function getFoundRequestedReports()
{
return $this->foundRequestedReports ?: [];
}
}
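Since `setArchiveOnlyReport()` now accepts `string|string[]`, the class normalizes the value in two places: `getArchiveOnlyReportAsArray()` and `__toString()`. A small sketch of both normalizations as free functions (the names `reportsAsArray()` and `describeReports()` are illustrative only):

```php
<?php
// Normalizes a requested-report value (null, '', string or string[]) to an array.
function reportsAsArray($requestedReport): array
{
    if (empty($requestedReport)) {
        return [];
    }
    return is_array($requestedReport) ? $requestedReport : [$requestedReport];
}

// Renders the value for log output, joining multiple reports with ', '.
function describeReports($requestedReport): string
{
    return implode(', ', reportsAsArray($requestedReport));
}

$asArray = reportsAsArray('Goals_nb_conversions');   // ['Goals_nb_conversions']
$label = describeReports(['Goals_a', 'Goals_b']);    // 'Goals_a, Goals_b'
```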