Skip to content

Commit

Permalink
[feature] Add socket.io module
Browse files Browse the repository at this point in the history
  • Loading branch information
konovalovsergey committed Oct 27, 2022
1 parent 002613c commit 09fe41a
Show file tree
Hide file tree
Showing 6 changed files with 196 additions and 22 deletions.
9 changes: 9 additions & 0 deletions Common/config/default.json
Original file line number Diff line number Diff line change
Expand Up @@ -268,6 +268,15 @@
"disable_cors": true,
"websocket": true
},
"socketio": {
"connection": {
"path": "/doc/socket.io/",
"serveClient": false,
"pingTimeout": 20000,
"pingInterval": 25000,
"maxHttpBufferSize": 1e8
}
},
"callbackBackoffOptions": {
"retries": 0,
"timeout":{
Expand Down
2 changes: 1 addition & 1 deletion Common/sources/tenantManager.js
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ function getTenant(ctx, domain) {
return tenant;
}
function getTenantByConnection(ctx, conn) {
return isMultitenantMode() ? getTenant(ctx, utils.getDomainByConnection(ctx, conn)) : getDefautTenant();
return isMultitenantMode() ? getTenant(ctx, utils.getDomainByConnection(ctx, conn.request)) : getDefautTenant();
}
function getTenantByRequest(ctx, req) {
return isMultitenantMode() ? getTenant(ctx, utils.getDomainByRequest(ctx, req)) : getDefautTenant();
Expand Down
1 change: 1 addition & 0 deletions Common/sources/utils.js
Original file line number Diff line number Diff line change
Expand Up @@ -684,6 +684,7 @@ function getBaseUrl(protocol, hostHeader, forwardedProtoHeader, forwardedHostHea
return url;
}
function getBaseUrlByConnection(conn) {
conn = conn.request;
return getBaseUrl('', conn.headers['host'], conn.headers['x-forwarded-proto'], conn.headers['x-forwarded-host'], conn.headers['x-forwarded-prefix']);
}
function getBaseUrlByRequest(req) {
Expand Down
138 changes: 138 additions & 0 deletions DocService/npm-shrinkwrap.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions DocService/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
"pg": "^8.5.1",
"redis": "^2.8.0",
"retry": "^0.12.0",
"socket.io": "^4.5.2",
"sockjs": "^0.3.21",
"underscore": "^1.13.1",
"utf7": "^1.0.2",
Expand Down
67 changes: 46 additions & 21 deletions DocService/sources/DocsCoServer.js
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@
'use strict';

const sockjs = require('sockjs');
const { Server } = require("socket.io");
const _ = require('underscore');
const url = require('url');
const os = require('os');
Expand Down Expand Up @@ -150,6 +151,7 @@ const cfgErrorFiles = configCommon.get('FileConverter.converter.errorfiles');
const cfgOpenProtectedFile = config.get('server.openProtectedFile');
const cfgRefreshLockInterval = ms(configCommon.get('wopi.refreshLockInterval'));
const cfgTokenRequiredParams = config.get('server.tokenRequiredParams');
const cfgSocketIoConnection = configCommon.get('services.CoAuthoring.socketio.connection');

const EditorTypes = {
document : 0,
Expand Down Expand Up @@ -1308,9 +1310,31 @@ function encryptPasswordParams(ctx, data) {
}
exports.encryptPasswordParams = encryptPasswordParams;
exports.install = function(server, callbackFunction) {
var sockjs_echo = sockjs.createServer(cfgSockjs);
const io = new Server(server, cfgSocketIoConnection);

sockjs_echo.on('connection', function(conn) {
io.use((socket, next) => {
co(function*(){
let ctx = new operationContext.Context();
let checkJwtRes;
try {
ctx.initFromConnection(socket);
ctx.logger.info('io.use start');
let handshake = socket.handshake;
if (cfgTokenEnableBrowser) {
checkJwtRes = yield checkJwt(ctx, handshake?.auth?.token, commonDefines.c_oAscSecretType.Browser);
}
} catch (err) {
ctx.logger.info('io.use error: %s', err.stack);
} finally {
ctx.logger.info('io.use end');
next(checkJwtRes.decoded ? undefined : new Error("not authorized"));
}


});
});

io.on('connection', function(conn) {
if (!conn) {
operationContext.global.logger.error("null == conn");
return;
Expand All @@ -1325,7 +1349,7 @@ exports.install = function(server, callbackFunction) {
conn.sessionIsSendWarning = false;
conn.sessionTimeConnect = conn.sessionTimeLastAction = new Date().getTime();

conn.on('data', function(message) {
conn.on('message', function(data) {
return co(function* () {
var docId = 'null';
let ctx = new operationContext.Context();
Expand All @@ -1336,7 +1360,6 @@ exports.install = function(server, callbackFunction) {
startDate = new Date();
}

var data = JSON.parse(message);
docId = conn.docId;
ctx.logger.info('data.type = %s', data.type);
if(getIsShutdown())
Expand Down Expand Up @@ -1392,7 +1415,7 @@ exports.install = function(server, callbackFunction) {
yield* checkEndAuthLock(ctx, data.unlock, data.isSave, docId, conn.user.id, data.releaseLocks, data.deleteIndex, conn);
break;
case 'close':
yield* closeDocument(ctx, conn, false);
yield* closeDocument(ctx, conn);
break;
case 'versionHistory' : {
let cmd = new commonDefines.InputCommand(data.cmd);
Expand Down Expand Up @@ -1433,7 +1456,7 @@ exports.install = function(server, callbackFunction) {
delete conn.authChangesAck;
break;
default:
ctx.logger.debug("unknown command %s", message);
ctx.logger.debug("unknown command %d", data);
break;
}
if(clientStatsD) {
Expand All @@ -1446,17 +1469,12 @@ exports.install = function(server, callbackFunction) {
}
});
});
conn.on('error', function() {
let ctx = new operationContext.Context();
ctx.initFromConnection(conn);
ctx.logger.error("On error");
});
conn.on('close', function() {
conn.on("disconnect", function(reason) {
return co(function* () {
let ctx = new operationContext.Context();
try {
ctx.initFromConnection(conn);
yield* closeDocument(ctx, conn, true);
yield* closeDocument(ctx, conn, reason);
} catch (err) {
ctx.logger.error('Error conn close: %s', err.stack);
}
Expand All @@ -1465,12 +1483,19 @@ exports.install = function(server, callbackFunction) {

_checkLicense(ctx, conn);
});
io.engine.on("connection_error", (err) => {
console.log(err.req); // the request object
console.log(err.code); // the error code, for example 1
console.log(err.message); // the error message, for example "Session ID unknown"
console.log(err.context); // some additional error context
});
/**
*
* @param ctx
* @param conn
* @param isCloseConnection - закрываем ли мы окончательно соединение
* @param reason - the reason of the disconnection (either client or server-side)
*/
function* closeDocument(ctx, conn, isCloseConnection) {
function* closeDocument(ctx, conn, reason) {
var userLocks, reconnected = false, bHasEditors, bHasChanges;
var docId = conn.docId;
if (null == docId) {
Expand All @@ -1480,9 +1505,9 @@ exports.install = function(server, callbackFunction) {
let participantsTimestamp;
var tmpUser = conn.user;
var isView = tmpUser.view;
ctx.logger.info("Connection closed or timed out: isCloseConnection = %s", isCloseConnection);
ctx.logger.info("Connection closed or timed out: reason = %s", reason);
var isCloseCoAuthoringTmp = conn.isCloseCoAuthoring;
if (isCloseConnection) {
if (reason) {
//Notify that participant has gone
connections = _.reject(connections, function(el) {
return el.id === conn.id;//Delete this connection
Expand Down Expand Up @@ -3145,10 +3170,10 @@ exports.install = function(server, callbackFunction) {
return licenseType;
}

sockjs_echo.installHandlers(server, {prefix: '/doc/['+constants.DOC_ID_PATTERN+']*/c', log: function(severity, message) {
//TODO: handle severity
operationContext.global.logger.info(message);
}});
// sockjs_echo.installHandlers(server, {prefix: '/doc/['+constants.DOC_ID_PATTERN+']*/c', log: function(severity, message) {
// //TODO: handle severity
// operationContext.global.logger.info(message);
// }});

//publish subscribe message brocker
function pubsubOnMessage(msg) {
Expand Down

0 comments on commit 09fe41a

Please sign in to comment.