-
Notifications
You must be signed in to change notification settings - Fork 10
/
Copy pathindex.js
120 lines (96 loc) · 3.22 KB
/
index.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
/* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at http://mozilla.org/MPL/2.0/. */
const Kinesis = require('aws-sdk/clients/kinesis');
const debug = require('debug')('engine:kinesis');
const A = require('async');
const _ = require('lodash');
/**
 * Engine constructor: remembers the collaborators handed in by the caller.
 *
 * @param {Object} script - Test script; other methods read `script.config`.
 * @param {Object} ee - Event emitter supplied by the caller.
 * @param {Object} helpers - Helper collection (e.g. `createThink`,
 *   `createLoopWithCount`) used when building steps.
 * @returns {KinesisEngine} The instance itself.
 */
function KinesisEngine (script, ee, helpers) {
  // Stored in the same order as the parameters: script, ee, helpers.
  Object.assign(this, { script, ee, helpers });
  return this;
}
/**
 * Build a runnable scenario from a scenario specification.
 *
 * Each entry of `scenarioSpec.flow` is converted into a task by `step()`,
 * then the task list is passed to `compile()` together with the flow.
 *
 * @param {Object} scenarioSpec - Scenario definition; must carry a `flow` array.
 * @param {Object} ee - Event emitter forwarded to `step()` and `compile()`.
 * @returns {Function} Whatever `compile()` returns for the task list.
 */
KinesisEngine.prototype.createScenario = function createScenario (scenarioSpec, ee) {
  const { flow } = scenarioSpec;
  const toTask = (requestSpec) => this.step(requestSpec, ee);

  return this.compile(flow.map(toTask), flow, ee);
};
/**
 * Translate one flow entry into a task of the form `(context, callback)`.
 *
 * Keys are checked in this order: `loop`, `log`, `think`, `function`,
 * `putRecord`. An entry with none of them becomes a pass-through task.
 *
 * @param {Object} rs - A single step specification from the scenario flow.
 * @param {Object} ee - Event emitter; the putRecord task emits 'request',
 *   then either 'response' or 'error' on it.
 * @returns {Function} The task, or whatever the loop/think helper returns.
 */
KinesisEngine.prototype.step = function step (rs, ee) {
  const self = this;

  if (rs.loop) {
    const steps = rs.loop.map(loopStep => this.step(loopStep, ee));
    // A missing (or zero) count is handed to the helper as -1.
    return this.helpers.createLoopWithCount(rs.count || -1, steps, {});
  }

  if (rs.log) {
    // Log steps are accepted but do nothing except yield to the next tick.
    return function log (context, callback) {
      return process.nextTick(function () { callback(null, context); });
    };
  }

  if (rs.think) {
    // The engine stores only `script`, so think defaults are read from
    // `script.config`; there is no `config` property on the instance.
    return this.helpers.createThink(rs, _.get(self.script, 'config.defaults.think', {}));
  }

  if (rs.function) {
    return function (context, callback) {
      // Tolerate scripts that declare no processor at all.
      const processor = _.get(self.script, 'config.processor') || {};
      const func = processor[rs.function];
      if (!func) {
        // Unknown function names are skipped rather than treated as errors.
        return process.nextTick(function () { callback(null, context); });
      }

      return func(context, ee, function (err) {
        // Surface a failure reported by the processor function instead of
        // silently continuing with the next step.
        return callback(err || null, context);
      });
    };
  }

  if (rs.putRecord) {
    return function putRecord (context, callback) {
      // Objects are sent as JSON text; everything else is stringified.
      const data = typeof rs.putRecord.data === 'object'
        ? JSON.stringify(rs.putRecord.data)
        : String(rs.putRecord.data);

      const params = {
        Data: data,
        PartitionKey: rs.putRecord.partitionKey,
        // The step may name its own stream; otherwise use the script target.
        StreamName: rs.putRecord.streamName || self.script.config.target,
        ExplicitHashKey: rs.putRecord.explicitHashKey,
        SequenceNumberForOrdering: rs.putRecord.sequenceNumberForOrdering
      };

      ee.emit('request');
      // `context.kinesis` is expected to have been attached before this
      // task runs (compile() does so in its init step).
      context.kinesis.putRecord(params, function (err, result) {
        if (err) {
          debug(err);
          ee.emit('error', err);
          return callback(err, context);
        }

        ee.emit('response', 0, 0, context._uid); // FIXME
        debug(result);
        return callback(null, context);
      });
    };
  }

  // Unrecognised step: pass the context through untouched.
  return function (context, callback) {
    return callback(null, context);
  };
};
/**
 * Wrap a list of step tasks into a single scenario function.
 *
 * The returned function prepends an init task that creates a Kinesis client
 * on the context (from a copy of `script.config.kinesis`, falling back to
 * region us-east-1) and emits 'started', then runs all tasks as a waterfall.
 *
 * @param {Function[]} tasks - Step tasks of the form `(context, callback)`.
 * @param {Array} scenarioSpec - Accepted for the caller's benefit; not read here.
 * @param {Object} ee - Event emitter that receives 'started'.
 * @returns {Function} `scenario(initialContext, callback)`.
 */
KinesisEngine.prototype.compile = function compile (tasks, scenarioSpec, ee) {
  // Arrow functions below keep `this` bound to the engine, so the script
  // config is still looked up each time a scenario actually runs.
  const attachClient = (context) => {
    const clientOptions = Object.assign({}, this.script.config.kinesis);

    if (!clientOptions.region) {
      console.log(`WARNING: no AWS region provided. Defaulting to us-east-1`); // TODO: a 'warning' event
      clientOptions.region = 'us-east-1';
    }

    context.kinesis = new Kinesis(clientOptions);
  };

  return function scenario (initialContext, callback) {
    const bootstrap = (next) => {
      attachClient(initialContext);
      ee.emit('started');
      return next(null, initialContext);
    };

    const finish = (err, context) => {
      if (err) {
        debug(err);
      }

      return callback(err, context);
    };

    A.waterfall([bootstrap].concat(tasks), finish);
  };
};
// The engine constructor is this module's only export.
module.exports = KinesisEngine;