Skip to content

Commit

Permalink
fix: auto-cork, sse not working
Browse files Browse the repository at this point in the history
Signed-off-by: Matt Krick <[email protected]>
  • Loading branch information
mattkrick committed Dec 1, 2023
1 parent b9bcd69 commit d486bc3
Show file tree
Hide file tree
Showing 9 changed files with 57 additions and 219 deletions.
12 changes: 5 additions & 7 deletions packages/server/graphql/httpGraphQLHandler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -56,13 +56,11 @@ const httpGraphQLBodyHandler = async (
}
}
const response = await handleGraphQLTrebuchetRequest(body, connectionContext)
res.cork(() => {
if (response) {
res.writeHeader('content-type', 'application/json').end(JSON.stringify(response))
} else {
res.writeStatus('200').end()
}
})
if (response) {
res.writeHeader('content-type', 'application/json').end(JSON.stringify(response))
} else {
res.writeStatus('200').end()
}
}

const contentTypeBodyParserMap = {
Expand Down
4 changes: 1 addition & 3 deletions packages/server/graphql/intranetGraphQLHandler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,7 @@ const intranetHttpGraphQLHandler = uWSAsyncHandler(async (res: HttpResponse, req
isPrivate,
isAdHoc: true
})
res.cork(() => {
res.writeHeader('content-type', 'application/json').end(JSON.stringify(result))
})
res.writeHeader('content-type', 'application/json').end(JSON.stringify(result))
} catch (e) {
res.writeStatus('502').end()
}
Expand Down
14 changes: 5 additions & 9 deletions packages/server/jiraImagesHandler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,7 @@ const servePlaceholderImage = async (res: HttpResponse) => {
path.join(__dirname, jiraPlaceholder.slice(__webpack_public_path__.length))
)
}
res.cork(() => {
res.writeStatus('200').writeHeader('Content-Type', 'image/png').end(jiraPlaceholderBuffer)
})
res.writeStatus('200').writeHeader('Content-Type', 'image/png').end(jiraPlaceholderBuffer)
}

const jiraImagesHandler = uWSAsyncHandler(async (res: HttpResponse, req: HttpRequest) => {
Expand All @@ -53,12 +51,10 @@ const jiraImagesHandler = uWSAsyncHandler(async (res: HttpResponse, req: HttpReq
return
}

res.cork(() => {
res
.writeStatus('200')
.writeHeader('Content-Type', cachedImage.contentType)
.end(cachedImage.imageBuffer)
})
res
.writeStatus('200')
.writeHeader('Content-Type', cachedImage.contentType)
.end(cachedImage.imageBuffer)
})

export default jiraImagesHandler
4 changes: 3 additions & 1 deletion packages/server/pipeStreamOverResponse.ts
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,9 @@ const pipeStreamOverResponse = (

.on('error', () => {
if (!res.aborted) {
res.writeStatus('500').end()
res.cork(() => {
res.writeStatus('500').end()
})
}
readStream.destroy()
})
Expand Down
42 changes: 28 additions & 14 deletions packages/server/safetyPatchRes.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
import {HttpResponse, RecognizedString} from 'uWebSockets.js'

type Header = [key: RecognizedString, value: RecognizedString]

const safetyPatchRes = (res: HttpResponse) => {
if (res._end) {
throw new Error('already patched')
Expand All @@ -17,14 +19,28 @@ const safetyPatchRes = (res: HttpResponse) => {
return res
}

// Cache writes until `.end()` gets called. Then flush
res.status = ''
res.headers = [] as Header[]

const flush = <T>(thunk: () => T) => {
return res._cork(() => {
if (res.status) res._writeStatus(res.status)
res.headers.forEach((header: Header) => {
res._writeHeader(...header)
})
return thunk()
})
}

res._end = res.end
res.end = (body?: RecognizedString) => {
if (res.done) {
console.warn(`uWS: Called end after done`)
}
if (res.done || res.aborted) return res
res.done = true
return res._end(body)
return flush(() => res._end(body))
}

res._close = res.close
Expand All @@ -38,12 +54,8 @@ const safetyPatchRes = (res: HttpResponse) => {
}

res._cork = res.cork
res.cork = (cb: () => void) => {
if (res.done) {
console.warn(`uWS: Called cork after done`)
}
if (res.done || res.aborted) return res
return res._cork(cb)
res.cork = () => {
throw new Error('safetyPatchRes applies the cork for you, do not call directly')
}

res._tryEnd = res.tryEnd
Expand All @@ -52,7 +64,7 @@ const safetyPatchRes = (res: HttpResponse) => {
console.warn(`uWS: Called tryEnd after done`)
}
if (res.done || res.aborted) return [true, true]
return res._tryEnd(fullBodyOrChunk, totalSize)
return flush(() => res._tryEnd(fullBodyOrChunk, totalSize))
}

res._write = res.write
Expand All @@ -69,17 +81,17 @@ const safetyPatchRes = (res: HttpResponse) => {
if (res.done) {
console.warn(`uWS: Called writeHeader after done`)
}
if (res.done || res.aborted) return res
return res._writeHeader(key, value)
res.headers.push([key, value])
return res
}

res._writeStatus = res.writeStatus
res.writeStatus = (status: RecognizedString) => {
if (res.done) {
console.error(`uWS: Called writeStatus after done ${status}`)
}
if (res.done || res.aborted) return res
return res._writeStatus(status)
res.status = status
return res
}

res._upgrade = res.upgrade
Expand All @@ -88,13 +100,15 @@ const safetyPatchRes = (res: HttpResponse) => {
console.error(`uWS: Called upgrade after done`)
}
if (res.done || res.aborted) return
return res._upgrade(...args)
return res._cork(() => {
res._upgrade(...args)
})
}

res._getRemoteAddressAsText = res.getRemoteAddressAsText
res.getRemoteAddressAsText = () => {
if (res.done) {
console.error(`uWS: Called upgrade after done`)
console.error(`uWS: Called getRemoteAddressAsText after done`)
}
if (res.done || res.aborted) return Buffer.from('')
return res._getRemoteAddressAsText()
Expand Down
7 changes: 2 additions & 5 deletions packages/server/socketHandlers/handleUpgrade.ts
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
import {WebSocketBehavior} from 'uWebSockets.js'
import {TrebuchetCloseReason} from '../../client/types/constEnums'
import safetyPatchRes from '../safetyPatchRes'
import {isAuthenticated} from '../utils/authorization'
import checkBlacklistJWT from '../utils/checkBlacklistJWT'
import getQueryToken from '../utils/getQueryToken'
import sendToSentry from '../utils/sendToSentry'
import uwsGetIP from '../utils/uwsGetIP'

const handleUpgrade: WebSocketBehavior<void>['upgrade'] = async (res, req, context) => {
safetyPatchRes(res)
const protocol = req.getHeader('sec-websocket-protocol')
if (protocol !== 'trebuchet-ws') {
sendToSentry(new Error(`WebSocket error: invalid protocol: ${protocol}`))
Expand All @@ -19,22 +21,17 @@ const handleUpgrade: WebSocketBehavior<void>['upgrade'] = async (res, req, conte
res.writeStatus('401').end()
return
}
res.onAborted(() => {
res.aborted = true
})

const key = req.getHeader('sec-websocket-key')
const extensions = req.getHeader('sec-websocket-extensions')
const ip = uwsGetIP(res, req)
const {sub: userId, iat} = authToken
// ALL async calls must come after the message listener, or we'll skip out on messages (e.g. resub after server restart)
const isBlacklistedJWT = await checkBlacklistJWT(userId, iat)
if (res.aborted) return
if (isBlacklistedJWT) {
res.writeStatus('401').end(TrebuchetCloseReason.EXPIRED_SESSION)
return
}

res.upgrade({ip, authToken}, key, protocol, extensions, context)
}

Expand Down
163 changes: 0 additions & 163 deletions packages/server/sse/whm.js

This file was deleted.

16 changes: 7 additions & 9 deletions packages/server/utils/SAMLHandler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -47,15 +47,13 @@ const SAMLHandler = uWSAsyncHandler(async (res: HttpResponse, req: HttpRequest)
redirectOnError(res, message)
return
}
res.cork(() => {
res
.writeStatus('302')
.writeHeader(
'location',
`/saml-redirect?userId=${userId}&token=${authToken}&isNewUser=${isNewUser}&isPatient0=${user.isPatient0}`
)
.end()
})
res
.writeStatus('302')
.writeHeader(
'location',
`/saml-redirect?userId=${userId}&token=${authToken}&isNewUser=${isNewUser}&isPatient0=${user.isPatient0}`
)
.end()
})

export default SAMLHandler
Loading

0 comments on commit d486bc3

Please sign in to comment.