Skip to content
This repository has been archived by the owner on Feb 28, 2022. It is now read-only.

Commit

Permalink
Added simple tap function #14
Browse files Browse the repository at this point in the history
  • Loading branch information
trieloff committed Oct 2, 2018
1 parent 0e3e51c commit 2b95d71
Show file tree
Hide file tree
Showing 2 changed files with 36 additions and 2 deletions.
30 changes: 28 additions & 2 deletions src/pipeline.js
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,16 @@ const nopLogger = {
* @return {Promise<Context>} Promise which resolves to a parameters to be added to the context.
*/

/**
* Tap function
*
* @typedef {function(context, _action, index)} tapFunction
* @callback tapFunction
* @param {Context} context Pipeline execution context that is passed along
* @param {Action} action Pipeline action define during construction
* @param {number} index index of the function invocation order
*/

/**
* Pipeline that allows to execute a list of functions in order. The pipeline consists of 3
* major function lists: `pre`, `once` and, `post`. the functions added to the `pre` list are
Expand All @@ -67,6 +77,8 @@ class Pipeline {
this._oncef = null;
// functions that are executed after
this._posts = [];
// functions that are executed before each step
this._taps = [];
}

/**
Expand All @@ -91,6 +103,18 @@ class Pipeline {
return this;
}

/**
* Adds a tap (observing) function to the pipeline. taps are executed for every
* single pipeline step and best used for validation, and logging. taps don't have
* any effect, i.e. the return value of a tap function is ignored.
* @param {pipelineFunction} f function to be executed in every step. Effects are ignored.
*/
tap(f) {
this._taps.push(f);
this._last = this._taps;
return this;
}

/**
* Adds a condition to the previously defined `pre` or `post` function. The previously defined
* function will only be executed if the predicate evaluates to something truthy or returns a
Expand Down Expand Up @@ -163,12 +187,14 @@ class Pipeline {
* @param {pipelineFunction} currFunction Function that is currently "reduced"
* @returns {Promise} Promise resolving to the new value of the accumulator
*/
const merge = (currContext, currFunction) => {
const merge = (currContext, currFunction, index) => {
// copy the pipeline payload into a new object to avoid modifications
const mergedargs = _.merge({}, currContext);

// log the function that is being called and the parameters of the function
this._action.logger.silly('processing ', { function: `${currFunction}`, params: mergedargs });
this._action.logger.silly('processing ', { function: `${currFunction}`, index, params: mergedargs });

this._taps.map(f => f(mergedargs, this._action, index));

return Promise.resolve(currFunction(mergedargs, this._action))
.then((value) => {
Expand Down
8 changes: 8 additions & 0 deletions test/testPipeline.js
Original file line number Diff line number Diff line change
Expand Up @@ -219,4 +219,12 @@ describe('Testing Pipeline', () => {
done();
});
});

it('Executes taps', (done) => {
new Pipeline({ logger })
.pre(() => ({ foo: 'bar' }))
.post(() => ({ bar: 'baz' }))
.tap((c, a, i) => { if (i === 1) { done(); } })
.run();
});
});

0 comments on commit 2b95d71

Please sign in to comment.