forked from igroff/forkulator
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathserver.coffee
226 lines (201 loc) · 9.4 KB
/
server.coffee
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
express = require 'express'
connect = require 'connect'
log = require 'simplog'
path = require 'path'
fs = require 'fs'
Promise = require 'bluebird'
through = require 'through'
_ = require 'lodash'
child_process = require 'child_process'
body_parser = require 'body-parser'
util = require 'util'
stream = require 'stream'
Promise.config({cancellation: true})
dieInAFire = (message, errorCode=1) ->
log.error message
process.exit errorCode
config=
outputDirectory: process.env.FORKULATOR_TEMP ||
process.env.TEMP ||
process.env.TMPDIR || dieInAFire 'I could not find a place to write my output'
maxConcurrentRequests: process.env.MAX_CONCURRENCY || 5
commandPath: process.env.COMMAND_PATH || path.join __dirname, "commands"
debug: process.env.DEBUG || "false"
# used to uniquely identify requests throughout the lifetime of forkulator
requestCounter = 0
# used to count active requests so throttling, if desired, can be done
countOfCurrentlyExecutingRequests = 0
createTempFileName = (suffix) ->
process.pid + requestCounter + "-" + suffix
createTempFilePath = (prefix) ->
path.join config.outputDirectory, createTempFileName(prefix)
executeThrottled = (req, res) ->
requestCounter++
# if we've not disabled throttling ( set to -1 ) then we see that we're running no
# more than our maximum allowed concurrent requests
if config.maxConcurrentRequests is -1 || (countOfCurrentlyExecutingRequests < config.maxConcurrentRequests)
log.debug 'executing request'
countOfCurrentlyExecutingRequests++
handleRequest(req,res).then(() -> countOfCurrentlyExecutingRequests--)
else
# deny execution of request, tell caller to try again
log.warn "too busy to handle request"
res.status(503).send(message: "too busy, try again later").end()
waitForEvent = (resolveEvent, emitter, rejectEvent='error') ->
new Promise (resolve, reject) ->
emitter.on resolveEvent, () -> resolve(emitter)
emitter.on(rejectEvent, reject) if rejectEvent
promiseToEnd = (stream) ->
waitForEvent 'end', stream
openForWrite = (path) ->
waitForEvent 'open', fs.createWriteStream(path)
openForRead = (path) ->
waitForEvent 'open', fs.createReadStream(path)
writeAndClose = (data, stream) ->
stream.end data
waitForEvent 'finish', stream
logAnyError = (message) ->
logCallback = (e) ->
if e
log.error("#{message} :\n", e)
returnWhen = (object, theseComplete) ->
Promise.props(theseComplete).then (completed) -> _.extend(object, completed)
handleRequest = (req, res) ->
# we're gonna do our best to return json in all cases
res.type('application/json')
createStreamTransform = () ->
through (data) ->
this.emit 'data', data.toString().replace(/\n/g, "\\n"),
null,
autoDestroy: false
createDisposableContext = () ->
promiseForContext = new Promise (resolve) ->
context =
commandFilePath: path.join(config.commandPath, req.path)
commandPath: req.path
outfilePath: createTempFilePath('stdout')
errfilePath: createTempFilePath('stderr')
stdinfilePath: createTempFilePath('stdin')
requestData:
url: req.url
query: if _.isEmpty(req.query) then null else req.query
body: if _.isEmpty(req.body) then null else req.body
headers: req.headers
path: req.path
resolve context
# during disposal we'll go ahead and close any streams we have lying
# around.
promiseForContext.disposer (context) ->
log.debug "disposing of context"
context.outfileStream.end() if context.outfileStream?.fd
context.errfileStream.end() if context.errfileStream?.fd
fs.close(context.stdinfileStream.fd) if context.stdinfileStream?.fd
if config.debug == "false"
if context.outfileStream
fs.unlink(context.outfileStream.path, logAnyError("Error removing stdout file"))
if context.errfileStream
fs.unlink(context.errfileStream.path, logAnyError("Error removing stderr file"))
if context.stdinfileStream
fs.unlink(context.stdinfileStream.path, logAnyError("Error removing stdin file"))
requestPipeline = (context) ->
# we start by 'passing in' our context to the promise chain
promiseToHandleRequest = Promise.resolve(context)
.then (context) ->
# special case for someone trying to hit /, for which there can never
# be a valid command. We're just gonna throw something that looks like the
# same error that would be raised in the case of someone calling a more 'valid'
# but still no existent path
if context.commandPath is "/"
err = new Error "Invalid command path: #{context.commandPath}"
err.code = "ENOENT"
err.path = context.commandFilePath
throw err
context
# then we're going to open the file that will contain the information
# we'll be passing to the command via stdin
.then (context) ->
returnWhen(context, stdinfileStream: openForWrite(context.stdinfilePath))
# and now we write our data to the stdin file
.then (context) ->
returnWhen(context, stdinWriteStream: writeAndClose(JSON.stringify(context.requestData), context.stdinfileStream))
# We'll be opening all the files that will comprise the stdio data for use by the
# command on execution. Child_process requires that any stream objects it uses
# already have an FD available when spawn is called so we must wait for those
# to emit the 'open' event before we can spawn our command process
.then (context) ->
whenTheseAreDone =
stdinfileStream: openForRead(context.stdinWriteStream.path)
outfileStream: openForWrite(context.outfilePath)
errfileStream: openForWrite(context.errfilePath)
returnWhen(context, whenTheseAreDone)
# now we fire up the command process as requested, piping in the
# request data we have via stdin
.then (context) ->
log.debug 'starting process: %s', context.commandFilePath
commandProcess = child_process.spawn(context.commandFilePath, [], stdio: ['pipe', context.outfileStream, context.errfileStream])
context.stdinfileStream.pipe commandProcess.stdin
new Promise (resolve, reject) ->
# When the process completes and closes all the stdio stream
# associated with it, we'll get a close, except the stdio streams don't get
# closed so we do tht all in our context disposer
commandProcess.on 'close', (exitCode, signal) ->
context.exitCode = exitCode
context.signal = signal
resolve(context)
commandProcess.on 'error', (e) -> log.error "error from commandProcess: #{util.inspect e}"; reject(e)
# if the process failes to start, in certain cases, we can get an error
# writing to stdin
commandProcess.stdin.on 'error', (e) -> log.error "error from commandProcess.stdin: #{e}"; reject(e)
.then (context) ->
# the command execution is complete when we get the 'close' event from commandProcess
# in the case of error we'll be passing back all the information we have from the close event
# along with the contents of stderr and stdout generated during command execution
log.debug "command (#{context.commandFilePath}) completed exit code #{context.exitCode}"
if context.exitCode != 0 || context.signal
res.status 500
res.write "{\"exitCode\":#{context.exitCode},\"signal\":\"#{context.signal}\",\"output\":\""
errStream = fs.createReadStream context.errfileStream.path
outStream = fs.createReadStream context.outfileStream.path
errStream.pipe(createStreamTransform()).pipe(res, end: false)
outStream.pipe(createStreamTransform()).pipe(res, end: false)
Promise.join(promiseToEnd(errStream), promiseToEnd(outStream))
.then(() -> res.write('"}'))
else
# no error, so we're just gonna stream our output generated by
# the command back on the response
commandOutputStream = fs.createReadStream(context.outfileStream.path)
commandOutputStream.pipe(res)
promiseToEnd(commandOutputStream)
.catch (e) ->
# first we check to see if we really just have a request for a non
# existent command, in which case we'll return a 404
if e.code is 'ENOENT' and e.path is context.commandFilePath
log.warn "No command found for #{context.commandPath}"
res.status(404)
else
log.error "something awful happened while running #{context.commandPath}\n#{e}\n#{e.stack}"
errorObject=
message: "error"
error: e.message
if req.query["debug"]
errorObject.stack = e.stack
res.status(500).send(errorObject)
.finally () -> res.end()
res.on "close", () ->
log.warn("response closed by client for command #{context.commandPath}")
promiseToHandleRequest.cancel()
promiseToHandleRequest
Promise.using(createDisposableContext(), requestPipeline)
app = express()
app.use connect()
# simply parse all bodies as string so we can pass whatever it
# is to the command, we treat the in and out of the command as
# opaque simply feeding in what we get
app.use body_parser.text(type: () -> true)
app.use((req, res, next) -> executeThrottled(req, res))
listenPort = process.env.PORT || 3000
log.debug "starting app " + process.env.APP_NAME
log.debug "listening on " + listenPort
log.debug "debug logging enabled"
log.debug config
app.listen listenPort