From 1ce94e34b8127d3168c0be63f86f5f52d0d4ac4b Mon Sep 17 00:00:00 2001 From: Narek Hovhannisyan Date: Tue, 20 Feb 2024 20:02:09 +0400 Subject: [PATCH 1/9] test(util): remove horizontal aggregator --- .../unit/util/horizontal-aggregator.test.ts | 91 ------------------- 1 file changed, 91 deletions(-) delete mode 100644 src/__tests__/unit/util/horizontal-aggregator.test.ts diff --git a/src/__tests__/unit/util/horizontal-aggregator.test.ts b/src/__tests__/unit/util/horizontal-aggregator.test.ts deleted file mode 100644 index 4e4f606ae..000000000 --- a/src/__tests__/unit/util/horizontal-aggregator.test.ts +++ /dev/null @@ -1,91 +0,0 @@ -import {aggregate} from '../../../util/horizontal-aggregator'; - -import {STRINGS, PARAMETERS} from '../../../config'; - -import {ERRORS} from '../../../util/errors'; - -const {INVALID_AGGREGATION_METHOD, METRIC_MISSING} = STRINGS; - -const {InvalidAggregationParams} = ERRORS; - -describe('util/horizontal-aggregator: ', () => { - describe('aggregate(): ', () => { - it('throws error if aggregation method is none.', () => { - const inputs = [{}]; - const metrics = ['total-resources']; - - const expectedMessage = INVALID_AGGREGATION_METHOD('none'); - - expect.assertions(1); - - try { - aggregate(inputs, metrics, PARAMETERS); - } catch (error) { - expect(error).toEqual(new InvalidAggregationParams(expectedMessage)); - } - }); - - it('throws error if metric is not found while aggregation.', () => { - const inputs = [ - { - 'ram-util': 10, - }, - ]; - const metrics = ['cpu-util']; - - const expectedMessage = METRIC_MISSING(metrics[0], 0); - - expect.assertions(1); - - try { - aggregate(inputs, metrics, PARAMETERS); - } catch (error) { - expect(error).toEqual(new InvalidAggregationParams(expectedMessage)); - } - }); - - it('should successfully calculate avg.', () => { - const inputs = [ - { - 'cpu-util': 10, - }, - { - 'cpu-util': 20, - }, - ]; - const metrics = ['cpu-util']; - - const expectedKey = `aggregated-${Object.keys(inputs[0])[0]}`; - const expectedValue = (inputs[0]['cpu-util'] + inputs[1]['cpu-util']) / 2; - const expectedResult = { - [`${expectedKey}`]: expectedValue, - }; - - const aggregatedResult = aggregate(inputs, metrics, PARAMETERS); - - expect(aggregatedResult).toEqual(expectedResult); - }); - - it('should successfully calculate sum.', () => { - const inputs = [ - { - 'disk-io': 10, - }, - { - 'disk-io': 20, - }, - ]; - const metrics = ['disk-io']; - - const expectedKey = `aggregated-${Object.keys(inputs[0])[0]}`; - const expectedValue = inputs[0]['disk-io'] + inputs[1]['disk-io']; - const expectedResult = { - [`${expectedKey}`]: expectedValue, - }; - - const aggregatedResult = aggregate(inputs, metrics, PARAMETERS); - - expect(aggregatedResult).toEqual(expectedResult); - }); - }); -}); From 732ecad2bd01d6d6150600f029cc25dd2d34046f Mon Sep 17 00:00:00 2001 From: Narek Hovhannisyan Date: Tue, 20 Feb 2024 20:02:31 +0400 Subject: [PATCH 2/9] revert(util): remove horizontal aggregator --- src/util/horizontal-aggregator.ts | 58 ------------------------------- 1 file changed, 58 deletions(-) delete mode 100644 src/util/horizontal-aggregator.ts diff --git a/src/util/horizontal-aggregator.ts b/src/util/horizontal-aggregator.ts deleted file mode 100644 index 5b16e90eb..000000000 --- a/src/util/horizontal-aggregator.ts +++ /dev/null @@ -1,58 +0,0 @@ -import {ERRORS} from './errors'; -import {getAggregationMethod} from './param-selectors'; - -import {STRINGS} from '../config'; - -import {PluginParams} from '../types/interface'; -import {Parameters} from '../types/parameters'; -import {AggregationResult} from '../types/aggregation'; - -const {InvalidAggregationParams} = ERRORS; -const {INVALID_AGGREGATION_METHOD, METRIC_MISSING} = STRINGS; - -/** - * Validates metrics array before applying aggregator. - * If aggregation method is `none`, then throws error. - */ -const checkIfMetricsAreValid = (metrics: string[], parameters: Parameters) => { - metrics.forEach(metric => { - const method = getAggregationMethod(metric, parameters); - - if (method === 'none') { - throw new InvalidAggregationParams(INVALID_AGGREGATION_METHOD(method)); - } - }); -}; - -/** - * Aggregates child node level metrics. Validates if metric aggregation type is `none`, then rejects with error. - * Otherwise iterates over inputs by aggregating per given `metrics`. - */ -export const aggregate = ( - inputs: PluginParams[], - metrics: string[], - parameters: Parameters -) => { - checkIfMetricsAreValid(metrics, parameters); - - return inputs.reduce((acc, input, index) => { - for (const metric of metrics) { - if (!(metric in input)) { - throw new InvalidAggregationParams(METRIC_MISSING(metric, index)); - } - - const accessKey = `aggregated-${metric}`; - acc[accessKey] = acc[accessKey] ?? 0; - acc[accessKey] += parseFloat(input[metric]); - - /** Checks for the last iteration. */ - if (index === inputs.length - 1) { - if (getAggregationMethod(metric, parameters) === 'avg') { - acc[accessKey] /= inputs.length; - } - } - } - - return acc; - }, {} as AggregationResult); -}; From fd28891fb1da4915089ff8d140fa78298683426a Mon Sep 17 00:00:00 2001 From: Narek Hovhannisyan Date: Tue, 20 Feb 2024 20:03:17 +0400 Subject: [PATCH 3/9] test(util): fix param selectors --- src/__tests__/unit/util/param-selectors.test.ts | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) diff --git a/src/__tests__/unit/util/param-selectors.test.ts b/src/__tests__/unit/util/param-selectors.test.ts index 70f00cfc2..e55a76fd4 100644 --- a/src/__tests__/unit/util/param-selectors.test.ts +++ b/src/__tests__/unit/util/param-selectors.test.ts @@ -1,5 +1,4 @@ import {getAggregationMethod} from '../../../util/param-selectors'; -import {PARAMETERS} from '../../../config'; describe('util/param-selector: ', () => { describe('getAggregationMethod(): ', () => { @@ -7,16 +6,14 @@ describe('util/param-selector: ', () => { const nonExistantMetric = 'mock'; const expectedResult = 'sum'; - expect(getAggregationMethod(nonExistantMetric, PARAMETERS)).toBe( - expectedResult - ); + expect(getAggregationMethod(nonExistantMetric)).toBe(expectedResult); }); it('returns aggregation method for `cpu-util`.', () => { const metric = 'cpu-util'; const expectedResult = 'avg'; - expect(getAggregationMethod(metric, PARAMETERS)).toBe(expectedResult); + expect(getAggregationMethod(metric)).toBe(expectedResult); }); }); }); From cab34a5b430d4329612fc8f24e00c81870080f1c Mon Sep 17 00:00:00 2001 From: Narek Hovhannisyan Date: Tue, 20 Feb 2024 20:03:51 +0400 Subject: [PATCH 4/9] fix(util): use parameters from config --- src/util/param-selectors.ts | 11 ++++------- 1 file changed, 4 insertions(+), 7 deletions(-) diff --git a/src/util/param-selectors.ts b/src/util/param-selectors.ts index d6c521bdf..454bffa7c 100644 --- a/src/util/param-selectors.ts +++ b/src/util/param-selectors.ts @@ -1,14 +1,11 @@ -import {Parameters} from '../types/parameters'; +import {PARAMETERS} from '../config'; /** * Returns aggregation method for given `unitName`. If doesn't exist then returns value `sum`. */ -export const getAggregationMethod = ( - unitName: string, - parameters: Parameters -) => { - if (`${unitName}` in parameters) { - return parameters[unitName].aggregation; +export const getAggregationMethod = (unitName: string) => { + if (`${unitName}` in PARAMETERS) { + return PARAMETERS[unitName].aggregation; } return 'sum'; From 7108c7c6dc16fe57cb245d2f8274e6a9279d715f Mon Sep 17 00:00:00 2001 From: Narek Hovhannisyan Date: Tue, 20 Feb 2024 20:05:46 +0400 Subject: [PATCH 5/9] fix(models): remove parameters from time sync --- src/models/time-sync.ts | 13 ++++++++----- 1 file changed, 8 insertions(+), 5 deletions(-) diff --git a/src/models/time-sync.ts b/src/models/time-sync.ts index 9b2842b0a..86e788207 100644 --- a/src/models/time-sync.ts +++ b/src/models/time-sync.ts @@ -1,8 +1,11 @@ +import {isDate} from 'node:util/types'; import {DateTime, DateTimeMaybeValid, Interval} from 'luxon'; -import {STRINGS, PARAMETERS} from '../config'; + import {ERRORS} from '../util/errors'; import {getAggregationMethod} from '../util/param-selectors'; -import {isDate} from 'node:util/types'; + +import {STRINGS} from '../config'; + import {PluginParams} from '../types/interface'; import { PaddingReceipt, @@ -178,7 +181,7 @@ export const TimeSync = ( const inputKeys = Object.keys(input); return inputKeys.reduce((acc, key) => { - const method = getAggregationMethod(key, PARAMETERS); + const method = getAggregationMethod(key); if (key === 'timestamp') { const perSecond = normalizeTimePerSecond(input.timestamp, i); @@ -232,7 +235,7 @@ export const TimeSync = ( return acc; } - const method = getAggregationMethod(metric, PARAMETERS); + const method = getAggregationMethod(metric); if (method === 'avg' || method === 'sum') { acc[metric] = 0; @@ -290,7 +293,7 @@ export const TimeSync = ( const metrics = Object.keys(input); metrics.forEach(metric => { - const method = getAggregationMethod(metric, PARAMETERS); + const method = getAggregationMethod(metric); acc[metric] = acc[metric] ?? 0; if (metric === 'timestamp') { From 2f54f29b539f277d2b982da9fa933d5fde732e16 Mon Sep 17 00:00:00 2001 From: Narek Hovhannisyan Date: Tue, 20 Feb 2024 20:06:54 +0400 Subject: [PATCH 6/9] feat(aggregate): implement strategy --- src/lib/aggregate.ts | 131 ++++++++++++++++++++++++++++++++++--------- 1 file changed, 106 insertions(+), 25 deletions(-) diff --git a/src/lib/aggregate.ts b/src/lib/aggregate.ts index 135e87924..161795ee9 100644 --- a/src/lib/aggregate.ts +++ b/src/lib/aggregate.ts @@ -1,26 +1,120 @@ -import {AggregatorOperations, aggregator} from '../util/aggregation-storage'; +import {ERRORS} from '../util/errors'; +import {getAggregationMethod} from '../util/param-selectors'; +import {STRINGS} from '../config'; + +import {AggregationResult} from '../types/aggregation'; +import {PluginParams} from '../types/interface'; import {AggregationParams} from '../types/manifest'; +const {InvalidAggregationParams} = ERRORS; +const {INVALID_AGGREGATION_METHOD, METRIC_MISSING} = STRINGS; + /** - * + * Validates metrics array before applying aggregator. + * If aggregation method is `none`, then throws error. */ -const verticallyAggregate = ( - node: any, - storage: AggregatorOperations, - parentNode = node +const checkIfMetricsAreValid = (metrics: string[]) => { + metrics.forEach(metric => { + const method = getAggregationMethod(metric); + + if (method === 'none') { + throw new InvalidAggregationParams(INVALID_AGGREGATION_METHOD(method)); + } + }); +}; + +/** + * Aggregates child node level metrics. Validates if metric aggregation type is `none`, then rejects with error. + * Otherwise iterates over inputs by aggregating per given `metrics`. + */ +export const aggregateInputsIntoOne = ( + inputs: PluginParams[], + metrics: string[] ) => { + checkIfMetricsAreValid(metrics); + + return inputs.reduce((acc, input, index) => { + for (const metric of metrics) { + if (!(metric in input)) { + throw new InvalidAggregationParams(METRIC_MISSING(metric, index)); + } + + acc[metric] = acc[metric] ?? 0; + acc[metric] += parseFloat(input[metric]); + + /** Checks for the last iteration. */ + if (index === inputs.length - 1) { + if (getAggregationMethod(metric) === 'avg') { + acc[metric] /= inputs.length; + } + } + } + + return acc; + }, {} as AggregationResult); +}; + +/** + * Gets `i`th element from all children outputs and collects them in single array. + */ +const getIthElementsFromChildren = (children: any, i: number) => { + const values = Object.values(children); + + return values.map((value: any) => { + const output = value.outputs; + + return output[i]; + }); +}; + +/** + * 1. Gets the i'th element from each childrens outputs (treating children as rows and we are after a column of data). + * 2. Now we just aggregate over the `ithSliceOfOutputs` the same as we did for the normal outputs. + */ +const temporalAggregation = (node: any, metrics: string[]) => { + const outputs: PluginParams[] = []; + const values: any = Object.values(node.children); + + for (let i = 0; i < values[0].outputs.length; i++) { + const ithSliceOfOutputs = getIthElementsFromChildren(node.children, i); + outputs.push(aggregateInputsIntoOne(ithSliceOfOutputs, metrics)); + } + + return outputs; +}; + +/** + * Navigates the tree depth first, bottom up, + * left to right aggregating the component nodes and then the grouping nodes will be aggregated + * only when all their child nodes have been aggregated. + * 1. Aggregates all the children. + * 2. At this point you can be positive all your children have been aggregated and so you can now work on aggregating yourself. + * 3. It's component node, аggregates just the outputs of THIS component node (horizontal/component aggregation). + * 4. Else it's grouping node, first does temporal aggregation. This assumes everything is on the same time-grid. + * The outputs of the grouping node are the aggregated time bucketed outputs of it's children. + * 5. Now a grouping node has it's own outputs, it can horizotnally aggregate them. + */ +const aggregateNode = (node: any, aggregationParams: AggregationParams) => { + const {metrics, type} = aggregationParams; + if (node.children) { for (const child in node.children) { - verticallyAggregate(node.children[child], storage, node); + aggregateNode(node.children[child], aggregationParams); } } - if (node.inputs) { - storage.set(node.outputs); + if (!node.children) { + if (type === 'horizontal' || type === 'both') { + node.aggregated = aggregateInputsIntoOne(node.outputs, metrics); + } + } else { + if (type === 'vertical' || type === 'both') { + const outputs = temporalAggregation(node, metrics); + node.outputs = outputs; + node.aggregated = aggregateInputsIntoOne(outputs, metrics); + } } - - parentNode.aggregated = storage.get(); }; /** @@ -32,21 +126,8 @@ export const aggregate = (tree: any, aggregationParams?: AggregationParams) => { return tree; } - const {metrics, type} = aggregationParams; - const copyOfTree = structuredClone(tree); - const storage = aggregator(metrics); - - if (type === 'vertical') { - verticallyAggregate(copyOfTree, storage); - - return copyOfTree; - } - - if (type === 'horizontal') { - // horizontallyAggregte(); - // return copyOfTree - } + aggregateNode(copyOfTree, aggregationParams); return copyOfTree; }; From d24957d8b363ba7c543b142d55dc4237169f23b2 Mon Sep 17 00:00:00 2001 From: Narek Hovhannisyan Date: Tue, 20 Feb 2024 20:07:20 +0400 Subject: [PATCH 7/9] chore(examples): tune nesting-2 sample from functional --- examples/impls/functional/nesting-2.yml | 32 +++++++++++++++++++++---- 1 file changed, 28 insertions(+), 4 deletions(-) diff --git a/examples/impls/functional/nesting-2.yml b/examples/impls/functional/nesting-2.yml index 1f670a552..f20c9ff4c 100644 --- a/examples/impls/functional/nesting-2.yml +++ b/examples/impls/functional/nesting-2.yml @@ -30,17 +30,33 @@ tree: duration: 10 cpu-util: 50 energy-network: 1 + - timestamp: 2023-07-06T00:10 + duration: 10 + cpu-util: 20 + energy-network: 2 + - timestamp: 2023-07-06T00:20 + duration: 10 + cpu-util: 30 + energy-network: 3 child:0-2: children: child-0-2-1: pipeline: - sci-e config: null - inputs: + inputs: - timestamp: 2023-07-06T00:00 duration: 10 - cpu-util: 50 + cpu-util: 10 + energy-network: 1 + - timestamp: 2023-07-06T00:10 + duration: 10 + cpu-util: 10 energy-network: 1 + - timestamp: 2023-07-06T00:20 + duration: 10 + cpu-util: 40 + energy-network: 5 child-0-2-2: pipeline: - sci-e @@ -48,5 +64,13 @@ tree: inputs: - timestamp: 2023-07-06T00:00 duration: 10 - cpu-util: 50 - energy-network: 1 + cpu-util: 90 + energy-network: 5 + - timestamp: 2023-07-06T00:10 + duration: 10 + cpu-util: 100 + energy-network: 10 + - timestamp: 2023-07-06T00:20 + duration: 10 + cpu-util: 80 + energy-network: 9 From 4d0023a669c4b6ec6bffb0da4e78690ca43d8f7d Mon Sep 17 00:00:00 2001 From: Narek Hovhannisyan Date: Tue, 20 Feb 2024 20:12:44 +0400 Subject: [PATCH 8/9] revert(util): remove aggregation storage --- src/util/aggregation-storage.ts | 51 --------------------------------- 1 file changed, 51 deletions(-) delete mode 100644 src/util/aggregation-storage.ts diff --git a/src/util/aggregation-storage.ts b/src/util/aggregation-storage.ts deleted file mode 100644 index c52c97c29..000000000 --- a/src/util/aggregation-storage.ts +++ /dev/null @@ -1,51 +0,0 @@ -import {PluginParams} from '../types/interface'; - -export type AggregatorOperations = { - set: (current: PluginParams[]) => PluginParams[]; - get: () => PluginParams[]; - clear: () => PluginParams[]; -}; - -export const aggregator = (metrics: string[]): AggregatorOperations => { - let storage: PluginParams[] = []; - - const set = (current: PluginParams[]) => { - if (storage.length === 0) { - storage = current; - - return storage; - } - - storage = current.reduce((acc: PluginParams[], item, index) => { - const keys = Object.keys(item); - const member: PluginParams = {}; - - keys.forEach(key => { - if (metrics.includes(key)) { - member[key] = member[key] ?? 0; - member[key] = item[key] + storage[index][key]; - } - }); - - acc.push(member); - - return acc; - }, []); - - return storage; - }; - - const get = () => storage; - - const clear = () => { - storage = []; - - return storage; - }; - - return { - set, - get, - clear, - }; -}; From 79a12f6af17dc2ff5c34bb4a55ff8370f5ec209f Mon Sep 17 00:00:00 2001 From: Narek Hovhannisyan Date: Tue, 20 Feb 2024 23:44:54 +0400 Subject: [PATCH 9/9] chore(lib): make aggregate inputs into one private --- src/lib/aggregate.ts | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/src/lib/aggregate.ts b/src/lib/aggregate.ts index 161795ee9..119df40de 100644 --- a/src/lib/aggregate.ts +++ b/src/lib/aggregate.ts @@ -28,10 +28,7 @@ const checkIfMetricsAreValid = (metrics: string[]) => { * Aggregates child node level metrics. Validates if metric aggregation type is `none`, then rejects with error. * Otherwise iterates over inputs by aggregating per given `metrics`. */ -export const aggregateInputsIntoOne = ( - inputs: PluginParams[], - metrics: string[] -) => { +const aggregateInputsIntoOne = (inputs: PluginParams[], metrics: string[]) => { checkIfMetricsAreValid(metrics); return inputs.reduce((acc, input, index) => {