Skip to content
This repository has been archived by the owner on Nov 5, 2023. It is now read-only.

Commit

Permalink
Properly handle ws error
Browse files Browse the repository at this point in the history
  • Loading branch information
curtgrimes committed Jun 13, 2021
1 parent 8065fc6 commit 6c6b43a
Showing 1 changed file with 63 additions and 46 deletions.
109 changes: 63 additions & 46 deletions app/socket.io/server.js
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,12 @@ module.exports = {
server,
maxPayload: 1024 * 100, // bytes
});

wss.on('connection', (socket) => {
socket.on('error', (error) => {
console.error('ws error:', error);
});

let redisStandaloneClient;
socket._wc = {};

Expand All @@ -31,30 +36,37 @@ module.exports = {

if (json.action === 'authenticateRoomOwner') {
const roomKey = 'rooms:' + json.roomId;
const ownerKeyForRoom = await redisSharedClient.hgetAsync(roomKey, 'ownerKey');
const ownerKeyForRoom = await redisSharedClient.hgetAsync(
roomKey,
'ownerKey'
);

if (!ownerKeyForRoom) {
// That room ID doesn't exist (or for some reason it doesn't have an owner key)
} else if (ownerKeyForRoom === json.ownerKey) {
// Successfully authenticated
socket._wc.room = {
ownerRoomKey: roomKey,
}
};
redisStandaloneClient = redis.getNewClient();
redisStandaloneClient.subscribe(roomKey + ':owner');

try {
socket.send(JSON.stringify({
mutation: 'SET_SHARE_SUBSCRIBER_COUNT',
subscriberCount: await getSubscriberCount(roomKey),
}));
}
catch (e) {
socket.send(
JSON.stringify({
mutation: 'SET_SHARE_SUBSCRIBER_COUNT',
subscriberCount: await getSubscriberCount(roomKey),
})
);
} catch (e) {
console.error('Socket send error', e);
}

redisStandaloneClient.on("message", async (channel, message) => {
if (message === 'updateSubscribers' && socket.readyState === socket.OPEN) {
redisStandaloneClient.on('message', async (channel, message) => {
if (
message === 'updateSubscribers' &&
socket.readyState === socket.OPEN
) {
try {
socket.send(
JSON.stringify({
Expand All @@ -73,15 +85,17 @@ module.exports = {
} else if (json.action === 'mutation') {
// If a roomKey is set, they've authenticated successfully before
if (socket._wc.room && socket._wc.room.ownerRoomKey) {
redisSharedClient.publish(socket._wc.room.ownerRoomKey, JSON.stringify({
mutation: json.mutation,
payload: json.payload,
}));
redisSharedClient.publish(
socket._wc.room.ownerRoomKey,
JSON.stringify({
mutation: json.mutation,
payload: json.payload,
})
);
} else {
// Haven't authenticated successfully
}
} else if (json.action == 'updateAppearance') {

// The broadcaster has an updated appearance object to save
// for this room.
try {
Expand All @@ -104,7 +118,7 @@ module.exports = {
const {
roomId,
s: stealth,
broadcast: wantsAppearanceUpdates
broadcast: wantsAppearanceUpdates,
} = json;

const roomKey = 'rooms:' + roomId;
Expand All @@ -115,36 +129,36 @@ module.exports = {
subscriberRoomKey: roomKey,
stealth: Boolean(stealth),
wantsAppearanceUpdates: Boolean(wantsAppearanceUpdates),
}
};

if (stealth) {
redisSharedClient.hincrby(socket._wc.room.subscriberRoomKey, 'stealthSubscribers', 1);
redisSharedClient.hincrby(
socket._wc.room.subscriberRoomKey,
'stealthSubscribers',
1
);
}

redisStandaloneClient.subscribe(roomKey);
redisSharedClient.publish(roomKey + ':owner', 'updateSubscribers');
redisStandaloneClient.on("message", (channel, message) => {
redisStandaloneClient.on('message', (channel, message) => {
if (message !== 'updateSubscribers') {
try {
let {
mutation,
payload
} = JSON.parse(message);
let { mutation, payload } = JSON.parse(message);
if (
socket.readyState === socket.OPEN &&
(
// we want to pass on all captioner events
mutation.startsWith('captioner/')

// we want to pass on all captioner events
(mutation.startsWith('captioner/') ||
// pass on appearance events only if this client wants them
||
!mutation.startsWith('captioner/') && socket._wc.room.wantsAppearanceUpdates
)
(!mutation.startsWith('captioner/') &&
socket._wc.room.wantsAppearanceUpdates))
) {
socket.send(JSON.stringify({
mutation,
...payload,
}));
socket.send(
JSON.stringify({
mutation,
...payload,
})
);
} else {
// TODO unsubscribe
}
Expand All @@ -156,27 +170,24 @@ module.exports = {
});
}
} else if (json.action === 'callWebhook') {
let {
method,
url,
transcript
} = json;
let { method, url, transcript } = json;
method = method === 'PUT' ? 'PUT' : 'POST';

if (!socket._wc.webhookTranscriptQueue) {
socket._wc.webhookTranscriptQueue = '';
}

socket._wc.webhookTranscriptQueue += (socket._wc.webhookTranscriptQueue ? ' ' : '') + transcript;
socket._wc.webhookTranscriptQueue +=
(socket._wc.webhookTranscriptQueue ? ' ' : '') + transcript;

if (!socket._wc.webhookThrottleInterval) {
socket._wc.webhookThrottleInterval = setInterval(function () {
socket._wc.webhookThrottleInterval = setInterval(function() {
axios({
method,
url,
data: {
transcript: socket._wc.webhookTranscriptQueue,
}
},
});
socket._wc.webhookTranscriptQueue = '';

Expand All @@ -187,13 +198,19 @@ module.exports = {
}
});

socket.on('close', function () {
socket.on('close', function() {
if (socket._wc.room && socket._wc.room.subscriberRoomKey) {
redisSharedClient.publish(socket._wc.room.subscriberRoomKey + ':owner', 'updateSubscribers');

redisSharedClient.publish(
socket._wc.room.subscriberRoomKey + ':owner',
'updateSubscribers'
);

if (socket._wc.room.stealth) {
redisSharedClient.hincrby(socket._wc.room.subscriberRoomKey, 'stealthSubscribers', -1);
redisSharedClient.hincrby(
socket._wc.room.subscriberRoomKey,
'stealthSubscribers',
-1
);
}
}

Expand Down

0 comments on commit 6c6b43a

Please sign in to comment.