Skip to content

Commit

Permalink
Merge pull request #449 from Green-Software-Foundation/functional-agg…
Browse files Browse the repository at this point in the history
…regation

Functional aggregation
  • Loading branch information
narekhovhannisyan authored Feb 22, 2024
2 parents 62fbf33 + 79a12f6 commit 3f5d4f8
Show file tree
Hide file tree
Showing 8 changed files with 146 additions and 247 deletions.
32 changes: 28 additions & 4 deletions examples/impls/functional/nesting-2.yml
Original file line number Diff line number Diff line change
Expand Up @@ -30,23 +30,47 @@ tree:
duration: 10
cpu-util: 50
energy-network: 1
- timestamp: 2023-07-06T00:10
duration: 10
cpu-util: 20
energy-network: 2
- timestamp: 2023-07-06T00:20
duration: 10
cpu-util: 30
energy-network: 3
child:0-2:
children:
child-0-2-1:
pipeline:
- sci-e
config: null
inputs:
inputs:
- timestamp: 2023-07-06T00:00
duration: 10
cpu-util: 50
cpu-util: 10
energy-network: 1
- timestamp: 2023-07-06T00:10
duration: 10
cpu-util: 10
energy-network: 1
- timestamp: 2023-07-06T00:20
duration: 10
cpu-util: 40
energy-network: 5
child-0-2-2:
pipeline:
- sci-e
config: null
inputs:
- timestamp: 2023-07-06T00:00
duration: 10
cpu-util: 50
energy-network: 1
cpu-util: 90
energy-network: 5
- timestamp: 2023-07-06T00:10
duration: 10
cpu-util: 100
energy-network: 10
- timestamp: 2023-07-06T00:20
duration: 10
cpu-util: 80
energy-network: 9
91 changes: 0 additions & 91 deletions src/__tests__/unit/util/horizontal-aggregator.test.ts

This file was deleted.

7 changes: 2 additions & 5 deletions src/__tests__/unit/util/param-selectors.test.ts
Original file line number Diff line number Diff line change
@@ -1,22 +1,19 @@
import {getAggregationMethod} from '../../../util/param-selectors';
import {PARAMETERS} from '../../../config';

describe('util/param-selector: ', () => {
describe('getAggregationMethod(): ', () => {
it('check if `sum` is returned for non existant unit.', () => {
const nonExistantMetric = 'mock';
const expectedResult = 'sum';

expect(getAggregationMethod(nonExistantMetric, PARAMETERS)).toBe(
expectedResult
);
expect(getAggregationMethod(nonExistantMetric)).toBe(expectedResult);
});

it('returns aggregation method for `cpu-util`.', () => {
const metric = 'cpu-util';
const expectedResult = 'avg';

expect(getAggregationMethod(metric, PARAMETERS)).toBe(expectedResult);
expect(getAggregationMethod(metric)).toBe(expectedResult);
});
});
});
130 changes: 104 additions & 26 deletions src/lib/aggregate.ts
Original file line number Diff line number Diff line change
@@ -1,26 +1,117 @@
import {AggregatorOperations, aggregator} from '../util/aggregation-storage';
import {ERRORS} from '../util/errors';
import {getAggregationMethod} from '../util/param-selectors';

import {STRINGS} from '../config';

import {AggregationResult} from '../types/aggregation';
import {PluginParams} from '../types/interface';
import {AggregationParams} from '../types/manifest';

const {InvalidAggregationParams} = ERRORS;
const {INVALID_AGGREGATION_METHOD, METRIC_MISSING} = STRINGS;

/**
* Validates metrics array before applying aggregator.
* If aggregation method is `none`, then throws error.
*/
const checkIfMetricsAreValid = (metrics: string[]) => {
metrics.forEach(metric => {
const method = getAggregationMethod(metric);

if (method === 'none') {
throw new InvalidAggregationParams(INVALID_AGGREGATION_METHOD(method));
}
});
};

/**
* Aggregates child node level metrics. Validates if metric aggregation type is `none`, then rejects with error.
* Otherwise iterates over inputs by aggregating per given `metrics`.
*/
const aggregateInputsIntoOne = (inputs: PluginParams[], metrics: string[]) => {
checkIfMetricsAreValid(metrics);

return inputs.reduce((acc, input, index) => {
for (const metric of metrics) {
if (!(metric in input)) {
throw new InvalidAggregationParams(METRIC_MISSING(metric, index));
}

acc[metric] = acc[metric] ?? 0;
acc[metric] += parseFloat(input[metric]);

/** Checks for the last iteration. */
if (index === inputs.length - 1) {
if (getAggregationMethod(metric) === 'avg') {
acc[metric] /= inputs.length;
}
}
}

return acc;
}, {} as AggregationResult);
};

/**
* Gets `i`th element from all children outputs and collects them in single array.
*/
const getIthElementsFromChildren = (children: any, i: number) => {
const values = Object.values(children);

return values.map((value: any) => {
const output = value.outputs;

return output[i];
});
};

/**
*
* 1. Gets the i'th element from each childrens outputs (treating children as rows and we are after a column of data).
* 2. Now we just aggregate over the `ithSliceOfOutputs` the same as we did for the normal outputs.
*/
const verticallyAggregate = (
node: any,
storage: AggregatorOperations,
parentNode = node
) => {
const temporalAggregation = (node: any, metrics: string[]) => {
const outputs: PluginParams[] = [];
const values: any = Object.values(node.children);

for (let i = 0; i < values[0].outputs.length; i++) {
const ithSliceOfOutputs = getIthElementsFromChildren(node.children, i);
outputs.push(aggregateInputsIntoOne(ithSliceOfOutputs, metrics));
}

return outputs;
};

/**
* Navigates the tree depth first, bottom up,
* left to right aggregating the component nodes and then the grouping nodes will be aggregated
* only when all their child nodes have been aggregated.
* 1. Aggregates all the children.
* 2. At this point you can be positive all your children have been aggregated and so you can now work on aggregating yourself.
* 3. It's component node, аggregates just the outputs of THIS component node (horizontal/component aggregation).
* 4. Else it's grouping node, first does temporal aggregation. This assumes everything is on the same time-grid.
* The outputs of the grouping node are the aggregated time bucketed outputs of it's children.
* 5. Now a grouping node has it's own outputs, it can horizotnally aggregate them.
*/
const aggregateNode = (node: any, aggregationParams: AggregationParams) => {
const {metrics, type} = aggregationParams;

if (node.children) {
for (const child in node.children) {
verticallyAggregate(node.children[child], storage, node);
aggregateNode(node.children[child], aggregationParams);
}
}

if (node.inputs) {
storage.set(node.outputs);
if (!node.children) {
if (type === 'horizontal' || type === 'both') {
node.aggregated = aggregateInputsIntoOne(node.outputs, metrics);
}
} else {
if (type === 'vertical' || type === 'both') {
const outputs = temporalAggregation(node, metrics);
node.outputs = outputs;
node.aggregated = aggregateInputsIntoOne(outputs, metrics);
}
}

parentNode.aggregated = storage.get();
};

/**
Expand All @@ -32,21 +123,8 @@ export const aggregate = (tree: any, aggregationParams?: AggregationParams) => {
return tree;
}

const {metrics, type} = aggregationParams;

const copyOfTree = structuredClone(tree);
const storage = aggregator(metrics);

if (type === 'vertical') {
verticallyAggregate(copyOfTree, storage);

return copyOfTree;
}

if (type === 'horizontal') {
// horizontallyAggregte();
// return copyOfTree
}
aggregateNode(copyOfTree, aggregationParams);

return copyOfTree;
};
13 changes: 8 additions & 5 deletions src/models/time-sync.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
import {isDate} from 'node:util/types';
import {DateTime, DateTimeMaybeValid, Interval} from 'luxon';
import {STRINGS, PARAMETERS} from '../config';

import {ERRORS} from '../util/errors';
import {getAggregationMethod} from '../util/param-selectors';
import {isDate} from 'node:util/types';

import {STRINGS} from '../config';

import {PluginParams} from '../types/interface';
import {
PaddingReceipt,
Expand Down Expand Up @@ -178,7 +181,7 @@ export const TimeSync = (
const inputKeys = Object.keys(input);

return inputKeys.reduce((acc, key) => {
const method = getAggregationMethod(key, PARAMETERS);
const method = getAggregationMethod(key);

if (key === 'timestamp') {
const perSecond = normalizeTimePerSecond(input.timestamp, i);
Expand Down Expand Up @@ -232,7 +235,7 @@ export const TimeSync = (
return acc;
}

const method = getAggregationMethod(metric, PARAMETERS);
const method = getAggregationMethod(metric);

if (method === 'avg' || method === 'sum') {
acc[metric] = 0;
Expand Down Expand Up @@ -290,7 +293,7 @@ export const TimeSync = (
const metrics = Object.keys(input);

metrics.forEach(metric => {
const method = getAggregationMethod(metric, PARAMETERS);
const method = getAggregationMethod(metric);
acc[metric] = acc[metric] ?? 0;

if (metric === 'timestamp') {
Expand Down
Loading

0 comments on commit 3f5d4f8

Please sign in to comment.