Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: Reorder the closing of services, prevent sagas running multiple times and close backend server properly #2499

Merged
merged 13 commits into from
May 9, 2024
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions packages/backend/ipfs-pubsub-peer-monitor.patch
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
--- node_modules/ipfs-pubsub-peer-monitor/src/ipfs-pubsub-peer-monitor.js 2024-05-08 12:44:48
+++ node_modules/ipfs-pubsub-peer-monitor/src/ipfs-pubsub-peer-monitor.backup.js 2024-05-08 12:44:25
@@ -55,7 +55,7 @@
async _pollPeers () {
try {
const peers = await this._pubsub.peers(this._topic)
- IpfsPubsubPeerMonitor._emitJoinsAndLeaves(new Set(this._peers), new Set(peers), this)
+ IpfsPubsubPeerMonitor._emitJoinsAndLeaves(new Set(this._peers.map(p => p.toString())), new Set(peers.map(p => p.toString())), this)
this._peers = peers
} catch (err) {
clearInterval(this._interval)
2 changes: 1 addition & 1 deletion packages/backend/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
"build": "tsc -p tsconfig.build.json",
"webpack": "webpack --env mode=development && cp ./lib/bundle.cjs ../backend-bundle/bundle.cjs",
"webpack:prod": "webpack --env mode=production && cp ./lib/bundle.cjs ../backend-bundle/bundle.cjs",
"applyPatches": "patch -f -p0 < ./electron-fetch.patch || true && patch -f -p0 --forward --binary < ./parse-duration.patch || true && patch -f -p0 --forward --binary < ./parse-duration-esm.patch || true",
"applyPatches": "patch -f -p0 < ./electron-fetch.patch || true && patch -f -p0 --forward --binary < ./parse-duration.patch || true && patch -f -p0 --forward --binary < ./parse-duration-esm.patch || true && patch -f -p0 < ./ipfs-pubsub-peer-monitor.patch",
"prepare": "npm run applyPatches && npm run webpack",
"version": "git add -A src",
"lint:no-fix": "eslint --ext .jsx,.js,.ts,.tsx ./src/",
Expand Down
8 changes: 4 additions & 4 deletions packages/backend/src/backendManager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -109,12 +109,12 @@ export const runBackendMobile = async () => {
{ logger: ['warn', 'error', 'log', 'debug', 'verbose'] }
)

rn_bridge.channel.on('close', async () => {
rn_bridge.channel.on('close', () => {
const connectionsManager = app.get<ConnectionsManagerService>(ConnectionsManagerService)
await connectionsManager.pause()
connectionsManager.pause()
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No need for async here since nothing awaits the event handler

})

rn_bridge.channel.on('open', async (msg: OpenServices) => {
rn_bridge.channel.on('open', (msg: OpenServices) => {
const connectionsManager = app.get<ConnectionsManagerService>(ConnectionsManagerService)
const torControl = app.get<TorControl>(TorControl)
const proxyAgent = app.get<{ proxy: { port: string } }>(SOCKS_PROXY_AGENT)
Expand All @@ -123,7 +123,7 @@ export const runBackendMobile = async () => {
torControl.torControlParams.auth.value = msg.authCookie
proxyAgent.proxy.port = msg.httpTunnelPort

await connectionsManager.resume()
connectionsManager.resume()
})
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -224,46 +224,44 @@ export class ConnectionsManagerService extends EventEmitter implements OnModuleI
}

public async closeAllServices(options: { saveTor: boolean } = { saveTor: false }) {
this.logger('Closing services')

await this.closeSocket()
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Close the data server first


if (this.tor && !options.saveTor) {
this.logger('Killing tor')
await this.tor.kill()
} else if (options.saveTor) {
this.logger('Saving tor')
}
if (this.storageService) {
this.logger('Stopping orbitdb')
this.logger('Stopping OrbitDB')
await this.storageService?.stopOrbitDb()
}
if (this.serverIoProvider?.io) {
this.logger('Closing socket server')
this.serverIoProvider.io.close()
}
if (this.localDbService) {
this.logger('Closing local storage')
await this.localDbService.close()
}
if (this.libp2pService) {
this.logger('Stopping libp2p')
await this.libp2pService.close()
}
if (this.localDbService) {
this.logger('Closing local DB')
await this.localDbService.close()
}
}
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Re-organizing. Data server should get closed first and then OrbitDB, then LibP2P


public closeSocket() {
this.serverIoProvider.io.close()
public async closeSocket() {
await this.socketService.close()
}

public async pause() {
this.logger('Pausing!')
this.logger('Closing socket!')
this.closeSocket()
await this.closeSocket()
Copy link
Collaborator Author

@leblowl leblowl May 7, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I remove the log because there is a similar log in sockerService.close, but we can keep it too, just let me know.

this.logger('Pausing libp2pService!')
this.peerInfo = await this.libp2pService?.pause()
this.logger('Found the following peer info on pause: ', this.peerInfo)
}

public async resume() {
this.logger('Resuming!')
this.logger('Reopening socket!')
await this.openSocket()
Copy link
Collaborator Author

@leblowl leblowl May 7, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Again, I remove the log because there is a similar log in socketService.listen. We can keep both also, just let me know.

this.logger('Attempting to redial peers!')
if (this.peerInfo && (this.peerInfo?.connected.length !== 0 || this.peerInfo?.dialed.length !== 0)) {
Expand All @@ -289,21 +287,14 @@ export class ConnectionsManagerService extends EventEmitter implements OnModuleI
public async leaveCommunity(): Promise<boolean> {
this.logger('Running leaveCommunity')

this.logger('Resetting tor')
this.tor.resetHiddenServices()

this.logger('Closing the socket')
this.closeSocket()

this.logger('Purging local DB')
await this.localDbService.purge()

this.logger('Closing services')
await this.closeAllServices({ saveTor: true })

this.logger('Purging data')
await this.purgeData()

this.logger('Resetting Tor')
this.tor.resetHiddenServices()

this.logger('Resetting state')
await this.resetState()

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ export class RegistrationService extends EventEmitter implements OnModuleInit {
// Get the next event.
const event = this.registrationEvents.shift()
if (event) {
this.logger('Processing registration event', event)
this.logger('Processing registration event')
// Event processing in progress
this.registrationEventInProgress = true

Expand Down
77 changes: 67 additions & 10 deletions packages/backend/src/nest/socket/socket.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,13 +27,15 @@ import { CONFIG_OPTIONS, SERVER_IO_PROVIDER } from '../const'
import { ConfigOptions, ServerIoProviderTypes } from '../types'
import { suspendableSocketEvents } from './suspendable.events'
import Logger from '../common/logger'
import type net from 'node:net'

@Injectable()
export class SocketService extends EventEmitter implements OnModuleInit {
private readonly logger = Logger(SocketService.name)

public resolveReadyness: (value: void | PromiseLike<void>) => void
public readyness: Promise<void>
private sockets: Set<net.Socket>

constructor(
@Inject(SERVER_IO_PROVIDER) public readonly serverIoProvider: ServerIoProviderTypes,
Expand All @@ -44,12 +46,15 @@ export class SocketService extends EventEmitter implements OnModuleInit {
this.readyness = new Promise<void>(resolve => {
this.resolveReadyness = resolve
})

this.sockets = new Set<net.Socket>()

this.attachListeners()
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[FYI] attachListeners() moved from onModuleInit() to the constructor. I'm not sure what side effects this would have. Might cause attachListeners to trigger before init?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yea it does trigger before init and I think that's fine. I think it makes sense to put everything necessary to setup an object in the constructor and only using onModuleInit when necessary and it doesn't seem like attachListeners needs to be in onModuleInit.

}

async onModuleInit() {
this.logger('init: Started')

this.attachListeners()
await this.init()

this.logger('init: Finished')
Expand All @@ -71,7 +76,7 @@ export class SocketService extends EventEmitter implements OnModuleInit {
this.logger('init: Frontend connected')
}

private readonly attachListeners = (): void => {
private readonly attachListeners = () => {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[nitpick] lost the return type

Copy link
Collaborator Author

@leblowl leblowl May 8, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think not specifying void is generally the standard TS convention

// Attach listeners here
this.serverIoProvider.io.on(SocketActionTypes.CONNECTION, socket => {
this.logger('Socket connection')
Expand Down Expand Up @@ -173,7 +178,7 @@ export class SocketService extends EventEmitter implements OnModuleInit {
}
)

socket.on(SocketActionTypes.LEAVE_COMMUNITY, async (callback: (closed: boolean) => void) => {
socket.on(SocketActionTypes.LEAVE_COMMUNITY, (callback: (closed: boolean) => void) => {
this.logger('Leaving community')
this.emit(SocketActionTypes.LEAVE_COMMUNITY, callback)
})
Expand All @@ -195,25 +200,77 @@ export class SocketService extends EventEmitter implements OnModuleInit {
this.emit(SocketActionTypes.LOAD_MIGRATION_DATA, data)
})
})

// Ensure the underlying connections get closed. See:
// https://github.com/socketio/socket.io/issues/1602
//
// I also tried `this.serverIoProvider.io.disconnectSockets(true)`
// which didn't work for me.
this.serverIoProvider.server.on('connection', conn => {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[bug] This seems like a confusion of the server and the socket io.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

And in my testing never seems to be called

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This gets called for me. There are two different sockets, the socket.io Socket (https://socket.io/docs/v4/server-api/#socket) and the Node net.Socket. I'm really just following the advice here: socketio/socket.io#1602 and it worked for me in cleaning up the hanging connections I was seeing, so I just went with that.

this.sockets.add(conn)
conn.on('close', () => {
this.sockets.delete(conn)
})
})
}

public getConnections = (): Promise<number> => {
return new Promise(resolve => {
this.serverIoProvider.server.getConnections((err, count) => {
if (err) throw new Error(err.message)
resolve(count)
})
})
}

// Ensure the underlying connections get closed. See:
// https://github.com/socketio/socket.io/issues/1602
public closeSockets = () => {
this.logger('Disconnecting sockets')
this.sockets.forEach(s => s.destroy())
}

public listen = async (port = this.configOptions.socketIOPort): Promise<void> => {
return await new Promise(resolve => {
if (this.serverIoProvider.server.listening) resolve()
public listen = async (): Promise<void> => {
this.logger(`Opening data server on port ${this.configOptions.socketIOPort}`)

if (this.serverIoProvider.server.listening) {
this.logger('Failed to listen. Server already listening.')
return
}

const numConnections = await this.getConnections()

if (numConnections > 0) {
this.logger('Failed to listen. Connections still open:', numConnections)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[question] Wouldn't it be better to just clean up the existing connections and proceed with listening?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure. I thought about that, but there might be some trickiness with timing due to the listen and close being async. I'll have to look into it more, so just went with the simplest option for now.

return
}

return new Promise(resolve => {
this.serverIoProvider.server.listen(this.configOptions.socketIOPort, '127.0.0.1', () => {
this.logger(`Data server running on port ${this.configOptions.socketIOPort}`)
resolve()
})
})
}

public close = async (): Promise<void> => {
this.logger(`Closing data server on port ${this.configOptions.socketIOPort}`)
return await new Promise(resolve => {
this.serverIoProvider.server.close(err => {
public close = (): Promise<void> => {
return new Promise(resolve => {
this.logger(`Closing data server on port ${this.configOptions.socketIOPort}`)

if (!this.serverIoProvider.server.listening) {
this.logger('Data server is not running.')
resolve()
return
}

this.serverIoProvider.io.close(err => {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[question] Does this suffer from the same issue mentioned in the cleanup function stuck in attachListeners where you need to loop through each socket and disconnect? Also, does this remove all of the listeners that were attached in the attachListeners function? Could this cause a memory leak and duplication of actions if the SocketService is started back up after being close and the listeners are attached again?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yea, this doesn't appear to close all connections properly. I don't think we need to remove the listeners ever. So those just get added once in the constructor.

if (err) throw new Error(err.message)
this.logger('Data server closed')
resolve()
})

this.serverIoProvider.io.disconnectSockets(true)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[nitpick] I would put this in closeSockets, but mostly inconsequential.

this.closeSockets()
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[question] Shouldn't we also detach listeners when closing sockets? I mentioned it before, but I'd rather see a dedicated closeSockets() and detachListeners() method.

Copy link
Collaborator Author

@leblowl leblowl May 8, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think we need to detach any listeners. They are just attached once and persist between opening and closing of the server.

})
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ export class CertificatesRequestsStore extends EventEmitter {
write: ['*'],
},
})
await this.store.load()

this.store.events.on('write', async (_address, entry) => {
this.logger('Added CSR to database')
Expand All @@ -40,8 +41,9 @@ export class CertificatesRequestsStore extends EventEmitter {
this.loadedCertificateRequests()
})

// @ts-ignore
await this.store.load({ fetchEntryTimeout: 15000 })
// // @ts-ignore
// await this.store.load({ fetchEntryTimeout: 15000 })

this.logger('Initialized')
}

Expand All @@ -52,9 +54,9 @@ export class CertificatesRequestsStore extends EventEmitter {
}

public async close() {
this.logger('Closing...')
this.logger('Closing certificate requests DB')
await this.store?.close()
this.logger('Closed')
this.logger('Closed certificate requests DB')
}

public getAddress() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,9 @@ export class CertificatesStore extends EventEmitter {
}

public async close() {
this.logger('Closing certificates DB')
await this.store?.close()
this.logger('Closed certificates DB')
}

public getAddress() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,9 @@ export class CommunityMetadataStore extends EventEmitter {
}

public async close() {
logger('Closing community metadata DB')
await this.store?.close()
logger('Closed community metadata DB')
}

public async updateCommunityMetadata(newMeta: CommunityMetadata): Promise<CommunityMetadata | undefined> {
Expand Down
2 changes: 2 additions & 0 deletions packages/backend/src/nest/storage/storage.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -251,7 +251,9 @@ export class StorageService extends EventEmitter {

public async stopOrbitDb() {
try {
this.logger('Closing channels DB')
await this.channels?.close()
this.logger('Closed channels DB')
} catch (e) {
this.logger.error('Error closing channels db', e)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,9 @@ export class UserProfileStore extends EventEmitter {
}

public async close() {
logger('Closing user profile DB')
await this.store?.close()
logger('Closed user profile DB')
}

public async addUserProfile(userProfile: UserProfile) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,18 @@
import { io } from 'socket.io-client'
import { select, put, call, cancel, fork, takeEvery, FixedTask, delay, apply, putResolve } from 'typed-redux-saga'
import {
select,
put,
putResolve,
call,
cancel,
fork,
take,
takeLeading,
takeEvery,
FixedTask,
delay,
apply,
} from 'typed-redux-saga'
import { PayloadAction } from '@reduxjs/toolkit'
import { socket as stateManager, Socket } from '@quiet/state-manager'
import { encodeSecret } from '@quiet/common'
Expand Down Expand Up @@ -49,17 +62,20 @@ export function* startConnectionSaga(
})
yield* fork(handleSocketLifecycleActions, socket, action.payload)
// Handle opening/restoring connection
yield* takeEvery(initActions.setWebsocketConnected, setConnectedSaga, socket)
yield* takeLeading(initActions.setWebsocketConnected, setConnectedSaga, socket)
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This only needs to happen in sequence

}

function* setConnectedSaga(socket: Socket): Generator {
console.log('Frontend is ready. Forking state-manager sagas and starting backend...')

const task = yield* fork(stateManager.useIO, socket)
console.log('WEBSOCKET', 'Forking state-manager sagas', task)
// Handle suspending current connection
yield* takeEvery(initActions.suspendWebsocketConnection, cancelRootTaskSaga, task)
console.log('Frontend is ready. Starting backend...')

// @ts-ignore - Why is this broken?
yield* apply(socket, socket.emit, [SocketActionTypes.START])

// Handle suspending current connection
const suspendAction = yield* take(initActions.suspendWebsocketConnection)
yield* call(cancelRootTaskSaga, task, suspendAction)
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This only needs to happen once (takeEvery is not necessary and causes issues)

}

function* handleSocketLifecycleActions(socket: Socket, socketIOData: WebsocketConnectionPayload): Generator {
Expand Down
Loading
Loading