-
-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathsse.js
135 lines (114 loc) · 4 KB
/
sse.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
// ============================================================================|
/*
* Project : HEARIT.IO
*
* Developing an innovative connected/smart home intelligent
* management system for the needs of blind or visually impaired
* persons.
*
* Purpose:
*
* An example of a GET route handler for a Server-Sent Events (SSE)
* implementation (used in https://app.hearit.io/sse).
*
* Author: Emil Usunov, hearit.io
*
* License: MIT
*
*/
// ============================================================================|
'use strict'
const {
messageSourceId,
pingIntervalMs
} = require('./constants.js')
// ----------------------------------------------------------------------------|
// An SSE (Server-Sent Events) route handler
// ----------------------------------------------------------------------------|
module.exports = async function (request, reply) {
const fastify = this
try {
// Get a last event id form the headers or form a query
let lastEventId = request.headers['last-event-id']
if (typeof lastEventId === typeof undefined) {
lastEventId = request.query.id
}
// Subscribe for a tunnel
const tunnel = await fastify.channels.use(messageSourceId)
await fastify.channels.subscribe(tunnel)
// Unsubscribe on a connection close.
request.raw.addListener('close', async (event) => {
await fastify.channels.unsubscribe(tunnel)
})
// Start SSE using an asynchronious iterator function.
reply.sse(consume(fastify.channels, tunnel, pingIntervalMs, lastEventId))
return reply
} catch (error) {
console.error ('SSE handler : ', error)
}
}
// ----------------------------------------------------------------------------|
// A channel consumer implementation which supports the following functionality:
//
// 1. Starts consuming from a given point in the past using 'lastEventId'.
//
// 2. Produces a 'gap' named SSE event message in case of missing events
// (there was a channel size overflow).
//
// 3. Produces a 'ping' named SSE event message on each blocking timeout.
// ----------------------------------------------------------------------------|
/**
 * Consumes a channel tunnel and yields SSE event messages.
 *
 * Behaviour:
 *   - When `lastEventId` is given, consuming starts from the id directly
 *     preceding it, so the message carrying `lastEventId` can be seen again
 *     (presumably the channel delivers ids after the start id - confirm
 *     against the channel implementation).
 *   - Messages older than `lastEventId` are skipped, and so is the message
 *     with exactly that id.
 *   - If the first message at or after `lastEventId` has a different id, a
 *     'gap' event is yielded before that message.
 *   - A message whose `data` is null is turned into a 'ping' event.
 *
 * @param {object} channel - Object exposing an async iterable `consume`.
 * @param {*} tunnel - Tunnel handle passed through to `channel.consume`.
 * @param {number} blockTimeOutMs - Blocking timeout passed to the channel.
 * @param {string} [lastEventId] - Last received id, `<timestamp>-<sequence>`.
 * @yields {object} Channel messages, or `{ id, event, data }` ping/gap events.
 * @throws Rethrows any error raised while consuming, after logging it.
 */
async function * consume (channel, tunnel, blockTimeOutMs, lastEventId) {
  try {
    const isResuming = lastEventId !== undefined
    let checkForGap = true
    let startConsumeFromId = lastEventId
    let lastTimeStampMs
    let lastCounter

    // Build a previous id. A format of a lastEventId is <timestamp>-<sequence>
    if (isResuming) {
      const [timeStampPart, counterPart] = lastEventId.split('-')
      lastTimeStampMs = Number.parseInt(timeStampPart, 10)
      lastCounter = Number.parseInt(counterPart, 10)
      if (lastCounter > 0) {
        startConsumeFromId = `${lastTimeStampMs}-${lastCounter - 1}`
      } else {
        startConsumeFromId = `${lastTimeStampMs - 1}-0`
      }
    }

    // Consume from a channel.
    // NOTE(review): each yielded batch is assumed to be an array of messages.
    const batches = channel.consume(tunnel, undefined, undefined,
      blockTimeOutMs, startConsumeFromId, true)
    for await (const messages of batches) {
      for (const message of messages) {
        const id = message.id || ''

        // Produce a 'ping' message
        if (message.data === null) {
          yield ({ id: id, event: 'ping', data: 'ping' })
          checkForGap = false
          continue
        }

        // Check for a gap in a message id
        if (checkForGap && isResuming) {
          const [timeStampPart, counterPart] = message.id.split('-')
          const timeStampMs = Number.parseInt(timeStampPart, 10)
          const counter = Number.parseInt(counterPart, 10)

          // Skip all older messages. The sequence counter restarts for every
          // timestamp, so it is only comparable when the timestamps are equal.
          const isOlder = timeStampMs < lastTimeStampMs ||
            (timeStampMs === lastTimeStampMs && counter < lastCounter)
          if (isOlder) {
            continue
          }
          checkForGap = false

          if (message.id === lastEventId) {
            // Skip the message with the last event id.
            continue
          }
          // Produce a 'gap' message
          yield ({ id: message.id, event: 'gap', data: 'gap' })
        }

        yield (message)
        checkForGap = false
      }
    }
  } catch (error) {
    console.error('consume : ', error)
    throw error
  }
}