forked from launchdarkly/node-server-sdk
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathstreaming.js
101 lines (86 loc) · 2.72 KB
/
streaming.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
var EventSource = require('./eventsource');
// Do-nothing function; StreamProcessor's start() falls back to it when the
// caller does not supply a callback.
function noop() {}
/**
 * Creates a processor that opens an EventSource connection to the streaming
 * endpoint and applies the events it receives to the configured feature store.
 *
 * Handled event types:
 *   - "put":            payload is JSON; the parsed value replaces the store contents.
 *   - "patch":          payload is JSON with a `data` object; upserted under `data.key`.
 *   - "delete":         payload is JSON with `path` and `version`; the flag is deleted.
 *   - "indirect/put":   all flags are re-fetched through the requestor.
 *   - "indirect/patch": payload is a flag path; that flag is re-fetched through the requestor.
 *
 * @param {string} sdk_key - Value sent as the `Authorization` header of the stream request.
 * @param {Object} config - Reads `feature_store`, `stream_uri`, `proxy_agent` and `logger`.
 * @param {Object} requestor - Must provide `request_all_flags(cb)` and `request_flag(key, cb)`.
 * @returns {Object} A processor exposing `start(fn)`, `stop()` and `close()`.
 */
function StreamProcessor(sdk_key, config, requestor) {
  var processor = {},
      store = config.feature_store,
      es;

  // Removes a single leading '/' so that a path such as "/my-flag" becomes the key "my-flag".
  function trimLeadingSlash(path) {
    return path.charAt(0) === '/' ? path.substring(1) : path;
  }

  // Parses the JSON payload of an event. Returns undefined when the event has
  // no data or the data is not valid JSON (JSON.parse itself never yields
  // undefined, so it is a safe sentinel). Callers report the failure, which
  // keeps a malformed message from throwing out of the event listener.
  function parseEventData(e) {
    if (!e || !e.data) {
      return undefined;
    }
    try {
      return JSON.parse(e.data);
    } catch (err) {
      return undefined;
    }
  }

  /**
   * Opens the stream and registers the event handlers.
   *
   * @param {Function} [fn] - Called with no arguments each time the store has
   *   been (re)initialized, or with an error when the stream reports one, a
   *   "put" payload is unusable, or re-fetching all flags fails.
   */
  processor.start = function(fn) {
    var cb = fn || noop;

    es = new EventSource(config.stream_uri + "/flags", {
      agent: config.proxy_agent,
      headers: {'Authorization': sdk_key}
    });

    es.onerror = function(err) {
      cb(err);
    };

    es.addEventListener('put', function(e) {
      config.logger.debug("[LaunchDarkly] Received put event");
      var flags = parseEventData(e);
      if (flags === undefined) {
        cb(new Error("[LaunchDarkly] Unexpected payload from event stream"));
        return;
      }
      store.init(flags, function() {
        cb();
      });
    });

    es.addEventListener('patch', function(e) {
      config.logger.debug("[LaunchDarkly] Received patch event");
      var patch = parseEventData(e);
      if (patch && patch.data) {
        store.upsert(patch.data.key, patch.data);
      } else {
        config.logger.error("[LaunchDarkly] Unexpected payload from event stream");
      }
    });

    es.addEventListener('delete', function(e) {
      config.logger.debug("[LaunchDarkly] Received delete event");
      var data = parseEventData(e);
      if (data && typeof data.path === 'string') {
        store.delete(trimLeadingSlash(data.path), data.version);
      } else {
        config.logger.error("[LaunchDarkly] Unexpected payload from event stream");
      }
    });

    es.addEventListener('indirect/put', function(e) {
      config.logger.debug("[LaunchDarkly] Received indirect put event");
      requestor.request_all_flags(function(err, flags) {
        if (err) {
          cb(err);
        } else {
          store.init(flags, function() {
            cb();
          });
        }
      });
    });

    es.addEventListener('indirect/patch', function(e) {
      config.logger.debug("[LaunchDarkly] Received indirect patch event");
      if (e && e.data) {
        // The payload is the flag path itself (not JSON); it must be read from
        // the event. Referring to a bare `data` here raised a ReferenceError.
        var key = trimLeadingSlash(e.data);
        requestor.request_flag(key, function(err, flag) {
          if (err) {
            config.logger.error("[LaunchDarkly] Unexpected error requesting feature flag");
          } else {
            store.upsert(key, flag);
          }
        });
      } else {
        config.logger.error("[LaunchDarkly] Unexpected payload from event stream");
      }
    });
  };

  /** Closes the stream connection if one has been opened. */
  processor.stop = function() {
    if (es) {
      es.close();
    }
  };

  /**
   * Alias for stop(). Calls through the processor object rather than `this`
   * so it keeps working when passed around as a detached callback.
   */
  processor.close = function() {
    processor.stop();
  };

  return processor;
}
// Expose the StreamProcessor factory as this module's sole export.
module.exports = StreamProcessor;