-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathqserver.js
166 lines (158 loc) · 6.55 KB
/
qserver.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
/* qserver.jss - Richard Revis - http://theplanis.com
* This is a DEMONSTRATION queue server written for node.js
* It is recommended that you implement your own in something more suitable for a production environment
* Interesting parts of the file:
* this.workRequests - format of the work request packet
* this.requestHandler - format of the messsage as the client expects it
*/
function QManager() {
// Object keyed by UID
// Attribs: fn,data,{prelimResults},{preferences},[sent],lastSent,callback
this.workRequests = {
// Example work request that does no work and echos the final verified answer to the log
'1a':{'fn':'function(d){return d;}','data':'2','callback':function(d){sys.puts(d);}}
};
// Object keyed by IP
// Attribs: sent, returned, verified
this.clientScores = {};
// List of known-good clients (e.g. servers)
// this.trustedClients = [];
this.configuration = {
'queueemptyretry':20000, // ms
'blacklistafter':20, // failed attempts
'maxparallel':10, // concurrent processing
'clienttimeout':20000, // max time to wait before retrying with a new client (ms)
'minconfirmations':2 // min number of matching results required to accept
};
// Accepts a request from the client and returns [code,response]
this.requestHandler = function(clientIP) {
if (plen(this.workRequests) != 0) {
// returns [key,wr object]
var wr = this.getNextWorkRequest(clientIP);
// key is false if no work is available
if (wr[0]) {
var key = wr[0];
var wr = wr[1];
var response = '{"uid":"' + key + '",';
response += '"fnstr":"' + escape(wr.fn) + '",';
response += '"data":"' + escape(wr.data) + '"}';
return [200,response];
} else if (wr[1] == 'blacklisted') {
// Client blacklisted
return [404,'{"code":"abandon"}'];
}
}
// No work - try later
response = '{"code":"retry","period":"' + this.configuration.queueemptyretry + '"}';
return [200,response];
};
// Pulls a work request off the queue
this.getNextWorkRequest = function(clientIP) {
if (!this.clientScores[clientIP]) {
this.clientScores[clientIP] = {'sent':0,'returned':0};
}
var clientStats = this.clientScores[clientIP];
if (clientStats.sent < (clientStats.returned + this.configuration.blacklistafter)) {
// Pick work from the queue
for (key in this.workRequests) {
var wr = this.workRequests[key];
var currentTime = new Date();
if (!wr.lastSent ||
wr.lastSent.getTime()+this.configuration.clienttimeout < currentTime.getTime()) {
// Too much time has passed since a request was sent and work has not been
// completed so ignore the max parallel requirements for this work packet
var ignoreMaxParallel = true;
}
if (!wr.sent) {wr.sent = [];}
if (!wr.prelimResults) {wr.prelimResults = {};}
if (ignoreMaxParallel ||
(wr.sent.length < (plen(wr.prelimResults) + this.configuration.maxparallel))) {
// Haven't yet exceeded the number of clients we can send this packet to
// Check we have't sent it to this client
for (client in wr.sent) {
if (wr.sent[client] == clientIP) {
var sentBefore = true;
}
}
if (!sentBefore) {
clientStats.sent++;
wr.sent.push(clientIP);
wr.lastSent = currentTime;
// Return valid work request
return [key,wr];
}
}
}
} else {
// Client is blacklisted for too many retries
return [false,'blacklisted'];
}
// Couldn't send a work request for a non-blacklist reason
return [false,'retrylater'];
};
// Accepts an result upload
this.responseHandler = function(clientIP,responseData) {
// Parse results
var rjs = JSON.parse(responseData);
// ID client and result supplied, then validate
var client = this.clientScores[clientIP];
var wr = this.workRequests[rjs.pktid];
var ref;
client.returned++;
// Check and see if we have enough results to close this work request
if (!wr.prelimResults) { wr.prelimResults = {}; }
var q = wr.prelimResults[rjs.result] += 1;
// Catch first result condition
if (isNaN(q)) { wr.prelimResults[rjs.result] = 1; }
var r = this.checkResults(wr);
if (r) {
// Sufficient confirmation to close this request
wr.callback(r);
delete this.workRequests[rjs.pktid];
}
return [200,'{"code":"success"}'];
};
// Check if any of the supplied results have been sufficiently verified
this.checkResults = function(wr) {
for (r in wr.prelimResults) {
if (wr.prelimResults[r] >= this.configuration.minconfirmations) {
return r;
}
}
return false;
};
}
function plen(obj) {
var i = 0;
for (k in obj) {i++;}
return i;
}
function sendResponse(res,response) {
res.sendHeader(response[0],{'Content-Type':'application/xml'});
sys.puts(response.join(': '));
res.sendBody(response[1]);
res.finish();
}
var sys = require('sys'),
http = require('http');
var qm = new QManager();
http.createServer(function (req, res) {
var clientIP = req.headers['host'].split(':')[0];
if (req.method == 'POST') {
req.setBodyEncoding('utf-8');
var body = '';
req.addListener('body', function(chunk) {
body += chunk;
});
req.addListener('complete', function() {
var response = qm.responseHandler(clientIP,body);
sendResponse(res,response);
});
} else {
var response = qm.requestHandler(clientIP);
sendResponse(res,response);
}
}).listen(8011);
// In the tested configuration this was running on port 8011 behind a
// nginx proxy redirecting /queue/ to this node.js instance
sys.puts('Server running at http://127.0.0.1:8011/');