diff --git a/examples/impls/functional/nesting-2.yml b/examples/impls/functional/nesting-2.yml index 1f670a552..f20c9ff4c 100644 --- a/examples/impls/functional/nesting-2.yml +++ b/examples/impls/functional/nesting-2.yml @@ -30,17 +30,33 @@ tree: duration: 10 cpu-util: 50 energy-network: 1 + - timestamp: 2023-07-06T00:10 + duration: 10 + cpu-util: 20 + energy-network: 2 + - timestamp: 2023-07-06T00:20 + duration: 10 + cpu-util: 30 + energy-network: 3 child:0-2: children: child-0-2-1: pipeline: - sci-e config: null - inputs: + inputs: - timestamp: 2023-07-06T00:00 duration: 10 - cpu-util: 50 + cpu-util: 10 + energy-network: 1 + - timestamp: 2023-07-06T00:10 + duration: 10 + cpu-util: 10 energy-network: 1 + - timestamp: 2023-07-06T00:20 + duration: 10 + cpu-util: 40 + energy-network: 5 child-0-2-2: pipeline: - sci-e @@ -48,5 +64,13 @@ tree: inputs: - timestamp: 2023-07-06T00:00 duration: 10 - cpu-util: 50 - energy-network: 1 + cpu-util: 90 + energy-network: 5 + - timestamp: 2023-07-06T00:10 + duration: 10 + cpu-util: 100 + energy-network: 10 + - timestamp: 2023-07-06T00:20 + duration: 10 + cpu-util: 80 + energy-network: 9 diff --git a/src/__tests__/unit/util/horizontal-aggregator.test.ts b/src/__tests__/unit/util/horizontal-aggregator.test.ts deleted file mode 100644 index 4e4f606ae..000000000 --- a/src/__tests__/unit/util/horizontal-aggregator.test.ts +++ /dev/null @@ -1,91 +0,0 @@ -import {aggregate} from '../../../util/horizontal-aggregator'; - -import {STRINGS, PARAMETERS} from '../../../config'; - -import {ERRORS} from '../../../util/errors'; - -const {INVALID_AGGREGATION_METHOD, METRIC_MISSING} = STRINGS; - -const {InvalidAggregationParams} = ERRORS; - -describe('util/horizontal-aggregator: ', () => { - describe('aggregate(): ', () => { - it('throws error if aggregation method is none.', () => { - const inputs = [{}]; - const metrics = ['total-resources']; - - const expectedMessage = INVALID_AGGREGATION_METHOD('none'); - - expect.assertions(1); - - try { - aggregate(inputs, metrics, PARAMETERS); - } catch (error) { - expect(error).toEqual(new InvalidAggregationParams(expectedMessage)); - } - }); - - it('throws error if metric is not found while aggregation.', () => { - const inputs = [ - { - 'ram-util': 10, - }, - ]; - const metrics = ['cpu-util']; - - const expectedMessage = METRIC_MISSING(metrics[0], 0); - - expect.assertions(1); - - try { - aggregate(inputs, metrics, PARAMETERS); - } catch (error) { - expect(error).toEqual(new InvalidAggregationParams(expectedMessage)); - } - }); - - it('should successfully calculate avg.', () => { - const inputs = [ - { - 'cpu-util': 10, - }, - { - 'cpu-util': 20, - }, - ]; - const metrics = ['cpu-util']; - - const expectedKey = `aggregated-${Object.keys(inputs[0])[0]}`; - const expectedValue = (inputs[0]['cpu-util'] + inputs[1]['cpu-util']) / 2; - const expectedResult = { - [`${expectedKey}`]: expectedValue, - }; - - const aggregatedResult = aggregate(inputs, metrics, PARAMETERS); - - expect(aggregatedResult).toEqual(expectedResult); - }); - - it('should successfully calculate sum.', () => { - const inputs = [ - { - 'disk-io': 10, - }, - { - 'disk-io': 20, - }, - ]; - const metrics = ['disk-io']; - - const expectedKey = `aggregated-${Object.keys(inputs[0])[0]}`; - const expectedValue = inputs[0]['disk-io'] + inputs[1]['disk-io']; - const expectedResult = { - [`${expectedKey}`]: expectedValue, - }; - - const aggregatedResult = aggregate(inputs, metrics, PARAMETERS); - - expect(aggregatedResult).toEqual(expectedResult); - }); - }); -}); diff --git a/src/__tests__/unit/util/param-selectors.test.ts b/src/__tests__/unit/util/param-selectors.test.ts index 70f00cfc2..e55a76fd4 100644 --- a/src/__tests__/unit/util/param-selectors.test.ts +++ b/src/__tests__/unit/util/param-selectors.test.ts @@ -1,5 +1,4 @@ import {getAggregationMethod} from '../../../util/param-selectors'; -import {PARAMETERS} from '../../../config'; describe('util/param-selector: ', () => { describe('getAggregationMethod(): ', () => { @@ -7,16 +6,14 @@ describe('util/param-selector: ', () => { const nonExistantMetric = 'mock'; const expectedResult = 'sum'; - expect(getAggregationMethod(nonExistantMetric, PARAMETERS)).toBe( - expectedResult - ); + expect(getAggregationMethod(nonExistantMetric)).toBe(expectedResult); }); it('returns aggregation method for `cpu-util`.', () => { const metric = 'cpu-util'; const expectedResult = 'avg'; - expect(getAggregationMethod(metric, PARAMETERS)).toBe(expectedResult); + expect(getAggregationMethod(metric)).toBe(expectedResult); }); }); }); diff --git a/src/lib/aggregate.ts b/src/lib/aggregate.ts index 135e87924..119df40de 100644 --- a/src/lib/aggregate.ts +++ b/src/lib/aggregate.ts @@ -1,26 +1,117 @@ -import {AggregatorOperations, aggregator} from '../util/aggregation-storage'; +import {ERRORS} from '../util/errors'; +import {getAggregationMethod} from '../util/param-selectors'; +import {STRINGS} from '../config'; + +import {AggregationResult} from '../types/aggregation'; +import {PluginParams} from '../types/interface'; import {AggregationParams} from '../types/manifest'; +const {InvalidAggregationParams} = ERRORS; +const {INVALID_AGGREGATION_METHOD, METRIC_MISSING} = STRINGS; + +/** + * Validates metrics array before applying aggregator. + * If aggregation method is `none`, then throws error. + */ +const checkIfMetricsAreValid = (metrics: string[]) => { + metrics.forEach(metric => { + const method = getAggregationMethod(metric); + + if (method === 'none') { + throw new InvalidAggregationParams(INVALID_AGGREGATION_METHOD(method)); + } + }); +}; + +/** + * Aggregates child node level metrics. Validates if metric aggregation type is `none`, then rejects with error. + * Otherwise iterates over inputs by aggregating per given `metrics`. + */ +const aggregateInputsIntoOne = (inputs: PluginParams[], metrics: string[]) => { + checkIfMetricsAreValid(metrics); + + return inputs.reduce((acc, input, index) => { + for (const metric of metrics) { + if (!(metric in input)) { + throw new InvalidAggregationParams(METRIC_MISSING(metric, index)); + } + + acc[metric] = acc[metric] ?? 0; + acc[metric] += parseFloat(input[metric]); + + /** Checks for the last iteration. */ + if (index === inputs.length - 1) { + if (getAggregationMethod(metric) === 'avg') { + acc[metric] /= inputs.length; + } + } + } + + return acc; + }, {} as AggregationResult); +}; + +/** + * Gets `i`th element from all children outputs and collects them in single array. + */ +const getIthElementsFromChildren = (children: any, i: number) => { + const values = Object.values(children); + + return values.map((value: any) => { + const output = value.outputs; + + return output[i]; + }); +}; + /** - * + * 1. Gets the i'th element from each childrens outputs (treating children as rows and we are after a column of data). + * 2. Now we just aggregate over the `ithSliceOfOutputs` the same as we did for the normal outputs. */ -const verticallyAggregate = ( - node: any, - storage: AggregatorOperations, - parentNode = node -) => { +const temporalAggregation = (node: any, metrics: string[]) => { + const outputs: PluginParams[] = []; + const values: any = Object.values(node.children); + + for (let i = 0; i < values[0].outputs.length; i++) { + const ithSliceOfOutputs = getIthElementsFromChildren(node.children, i); + outputs.push(aggregateInputsIntoOne(ithSliceOfOutputs, metrics)); + } + + return outputs; +}; + +/** + * Navigates the tree depth first, bottom up, + * left to right aggregating the component nodes and then the grouping nodes will be aggregated + * only when all their child nodes have been aggregated. + * 1. Aggregates all the children. + * 2. At this point you can be positive all your children have been aggregated and so you can now work on aggregating yourself. + * 3. It's component node, аggregates just the outputs of THIS component node (horizontal/component aggregation). + * 4. Else it's grouping node, first does temporal aggregation. This assumes everything is on the same time-grid. + * The outputs of the grouping node are the aggregated time bucketed outputs of it's children. + * 5. Now a grouping node has it's own outputs, it can horizotnally aggregate them. + */ +const aggregateNode = (node: any, aggregationParams: AggregationParams) => { + const {metrics, type} = aggregationParams; + if (node.children) { for (const child in node.children) { - verticallyAggregate(node.children[child], storage, node); + aggregateNode(node.children[child], aggregationParams); } } - if (node.inputs) { - storage.set(node.outputs); + if (!node.children) { + if (type === 'horizontal' || type === 'both') { + node.aggregated = aggregateInputsIntoOne(node.outputs, metrics); + } + } else { + if (type === 'vertical' || type === 'both') { + const outputs = temporalAggregation(node, metrics); + node.outputs = outputs; + node.aggregated = aggregateInputsIntoOne(outputs, metrics); + } } - - parentNode.aggregated = storage.get(); }; /** @@ -32,21 +123,8 @@ export const aggregate = (tree: any, aggregationParams?: AggregationParams) => { return tree; } - const {metrics, type} = aggregationParams; - const copyOfTree = structuredClone(tree); - const storage = aggregator(metrics); - - if (type === 'vertical') { - verticallyAggregate(copyOfTree, storage); - - return copyOfTree; - } - - if (type === 'horizontal') { - // horizontallyAggregte(); - // return copyOfTree - } + aggregateNode(copyOfTree, aggregationParams); return copyOfTree; }; diff --git a/src/models/time-sync.ts b/src/models/time-sync.ts index 9b2842b0a..86e788207 100644 --- a/src/models/time-sync.ts +++ b/src/models/time-sync.ts @@ -1,8 +1,11 @@ +import {isDate} from 'node:util/types'; import {DateTime, DateTimeMaybeValid, Interval} from 'luxon'; -import {STRINGS, PARAMETERS} from '../config'; + import {ERRORS} from '../util/errors'; import {getAggregationMethod} from '../util/param-selectors'; -import {isDate} from 'node:util/types'; + +import {STRINGS} from '../config'; + import {PluginParams} from '../types/interface'; import { PaddingReceipt, @@ -178,7 +181,7 @@ export const TimeSync = ( const inputKeys = Object.keys(input); return inputKeys.reduce((acc, key) => { - const method = getAggregationMethod(key, PARAMETERS); + const method = getAggregationMethod(key); if (key === 'timestamp') { const perSecond = normalizeTimePerSecond(input.timestamp, i); @@ -232,7 +235,7 @@ export const TimeSync = ( return acc; } - const method = getAggregationMethod(metric, PARAMETERS); + const method = getAggregationMethod(metric); if (method === 'avg' || method === 'sum') { acc[metric] = 0; @@ -290,7 +293,7 @@ export const TimeSync = ( const metrics = Object.keys(input); metrics.forEach(metric => { - const method = getAggregationMethod(metric, PARAMETERS); + const method = getAggregationMethod(metric); acc[metric] = acc[metric] ?? 0; if (metric === 'timestamp') { diff --git a/src/util/aggregation-storage.ts b/src/util/aggregation-storage.ts deleted file mode 100644 index c52c97c29..000000000 --- a/src/util/aggregation-storage.ts +++ /dev/null @@ -1,51 +0,0 @@ -import {PluginParams} from '../types/interface'; - -export type AggregatorOperations = { - set: (current: PluginParams[]) => PluginParams[]; - get: () => PluginParams[]; - clear: () => PluginParams[]; -}; - -export const aggregator = (metrics: string[]): AggregatorOperations => { - let storage: PluginParams[] = []; - - const set = (current: PluginParams[]) => { - if (storage.length === 0) { - storage = current; - - return storage; - } - - storage = current.reduce((acc: PluginParams[], item, index) => { - const keys = Object.keys(item); - const member: PluginParams = {}; - - keys.forEach(key => { - if (metrics.includes(key)) { - member[key] = member[key] ?? 0; - member[key] = item[key] + storage[index][key]; - } - }); - - acc.push(member); - - return acc; - }, []); - - return storage; - }; - - const get = () => storage; - - const clear = () => { - storage = []; - - return storage; - }; - - return { - set, - get, - clear, - }; -}; diff --git a/src/util/horizontal-aggregator.ts b/src/util/horizontal-aggregator.ts deleted file mode 100644 index 5b16e90eb..000000000 --- a/src/util/horizontal-aggregator.ts +++ /dev/null @@ -1,58 +0,0 @@ -import {ERRORS} from './errors'; -import {getAggregationMethod} from './param-selectors'; - -import {STRINGS} from '../config'; - -import {PluginParams} from '../types/interface'; -import {Parameters} from '../types/parameters'; -import {AggregationResult} from '../types/aggregation'; - -const {InvalidAggregationParams} = ERRORS; -const {INVALID_AGGREGATION_METHOD, METRIC_MISSING} = STRINGS; - -/** - * Validates metrics array before applying aggregator. - * If aggregation method is `none`, then throws error. - */ -const checkIfMetricsAreValid = (metrics: string[], parameters: Parameters) => { - metrics.forEach(metric => { - const method = getAggregationMethod(metric, parameters); - - if (method === 'none') { - throw new InvalidAggregationParams(INVALID_AGGREGATION_METHOD(method)); - } - }); -}; - -/** - * Aggregates child node level metrics. Validates if metric aggregation type is `none`, then rejects with error. - * Otherwise iterates over inputs by aggregating per given `metrics`. - */ -export const aggregate = ( - inputs: PluginParams[], - metrics: string[], - parameters: Parameters -) => { - checkIfMetricsAreValid(metrics, parameters); - - return inputs.reduce((acc, input, index) => { - for (const metric of metrics) { - if (!(metric in input)) { - throw new InvalidAggregationParams(METRIC_MISSING(metric, index)); - } - - const accessKey = `aggregated-${metric}`; - acc[accessKey] = acc[accessKey] ?? 0; - acc[accessKey] += parseFloat(input[metric]); - - /** Checks for the last iteration. */ - if (index === inputs.length - 1) { - if (getAggregationMethod(metric, parameters) === 'avg') { - acc[accessKey] /= inputs.length; - } - } - } - - return acc; - }, {} as AggregationResult); -}; diff --git a/src/util/param-selectors.ts b/src/util/param-selectors.ts index d6c521bdf..454bffa7c 100644 --- a/src/util/param-selectors.ts +++ b/src/util/param-selectors.ts @@ -1,14 +1,11 @@ -import {Parameters} from '../types/parameters'; +import {PARAMETERS} from '../config'; /** * Returns aggregation method for given `unitName`. If doesn't exist then returns value `sum`. */ -export const getAggregationMethod = ( - unitName: string, - parameters: Parameters -) => { - if (`${unitName}` in parameters) { - return parameters[unitName].aggregation; +export const getAggregationMethod = (unitName: string) => { + if (`${unitName}` in PARAMETERS) { + return PARAMETERS[unitName].aggregation; } return 'sum';