Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: Reorder the closing of services, prevent sagas running multiple times and close backend server properly #2499

Merged
merged 13 commits into from
May 9, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
* Fix issues with recreating general channel when deleted while offline ([#2334](https://github.com/TryQuiet/quiet/issues/2334))
* Fix package.json license inconsistency
* Fixes issue with reconnecting to peers on resume on iOS ([#2424](https://github.com/TryQuiet/quiet/issues/2424))
* Reorder the closing of services, prevent sagas running multiple times and close backend server properly

[2.1.2]

Expand Down
11 changes: 11 additions & 0 deletions packages/backend/ipfs-pubsub-peer-monitor.patch
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
--- node_modules/ipfs-pubsub-peer-monitor/src/ipfs-pubsub-peer-monitor.js 2024-05-08 12:44:48
+++ node_modules/ipfs-pubsub-peer-monitor/src/ipfs-pubsub-peer-monitor.backup.js 2024-05-08 12:44:25
@@ -55,7 +55,7 @@
async _pollPeers () {
try {
const peers = await this._pubsub.peers(this._topic)
- IpfsPubsubPeerMonitor._emitJoinsAndLeaves(new Set(this._peers), new Set(peers), this)
+ IpfsPubsubPeerMonitor._emitJoinsAndLeaves(new Set(this._peers.map(p => p.toString())), new Set(peers.map(p => p.toString())), this)
this._peers = peers
} catch (err) {
clearInterval(this._interval)
2 changes: 1 addition & 1 deletion packages/backend/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
"build": "tsc -p tsconfig.build.json",
"webpack": "webpack --env mode=development && cp ./lib/bundle.cjs ../backend-bundle/bundle.cjs",
"webpack:prod": "webpack --env mode=production && cp ./lib/bundle.cjs ../backend-bundle/bundle.cjs",
"applyPatches": "patch -f -p0 < ./electron-fetch.patch || true && patch -f -p0 --forward --binary < ./parse-duration.patch || true && patch -f -p0 --forward --binary < ./parse-duration-esm.patch || true",
"applyPatches": "patch -f -p0 < ./electron-fetch.patch || true && patch -f -p0 --forward --binary < ./parse-duration.patch || true && patch -f -p0 --forward --binary < ./parse-duration-esm.patch || true && patch -f -p0 < ./ipfs-pubsub-peer-monitor.patch || true",
"prepare": "npm run applyPatches && npm run webpack",
"version": "git add -A src",
"lint:no-fix": "eslint --ext .jsx,.js,.ts,.tsx ./src/",
Expand Down
8 changes: 4 additions & 4 deletions packages/backend/src/backendManager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -109,12 +109,12 @@ export const runBackendMobile = async () => {
{ logger: ['warn', 'error', 'log', 'debug', 'verbose'] }
)

rn_bridge.channel.on('close', async () => {
rn_bridge.channel.on('close', () => {
const connectionsManager = app.get<ConnectionsManagerService>(ConnectionsManagerService)
await connectionsManager.pause()
connectionsManager.pause()
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No need for async here since nothing awaits the event handler

})

rn_bridge.channel.on('open', async (msg: OpenServices) => {
rn_bridge.channel.on('open', (msg: OpenServices) => {
const connectionsManager = app.get<ConnectionsManagerService>(ConnectionsManagerService)
const torControl = app.get<TorControl>(TorControl)
const proxyAgent = app.get<{ proxy: { port: string } }>(SOCKS_PROXY_AGENT)
Expand All @@ -123,7 +123,7 @@ export const runBackendMobile = async () => {
torControl.torControlParams.auth.value = msg.authCookie
proxyAgent.proxy.port = msg.httpTunnelPort

await connectionsManager.resume()
connectionsManager.resume()
})
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -224,46 +224,44 @@ export class ConnectionsManagerService extends EventEmitter implements OnModuleI
}

public async closeAllServices(options: { saveTor: boolean } = { saveTor: false }) {
this.logger('Closing services')

await this.closeSocket()
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Close the data server first


if (this.tor && !options.saveTor) {
this.logger('Killing tor')
await this.tor.kill()
} else if (options.saveTor) {
this.logger('Saving tor')
}
if (this.storageService) {
this.logger('Stopping orbitdb')
this.logger('Stopping OrbitDB')
await this.storageService?.stopOrbitDb()
}
if (this.serverIoProvider?.io) {
this.logger('Closing socket server')
this.serverIoProvider.io.close()
}
if (this.localDbService) {
this.logger('Closing local storage')
await this.localDbService.close()
}
if (this.libp2pService) {
this.logger('Stopping libp2p')
await this.libp2pService.close()
}
if (this.localDbService) {
this.logger('Closing local DB')
await this.localDbService.close()
}
}
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Re-organizing. Data server should get closed first and then OrbitDB, then LibP2P


public closeSocket() {
this.serverIoProvider.io.close()
public async closeSocket() {
await this.socketService.close()
}

public async pause() {
this.logger('Pausing!')
this.logger('Closing socket!')
this.closeSocket()
await this.closeSocket()
Copy link
Collaborator Author

@leblowl leblowl May 7, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I remove the log because there is a similar log in sockerService.close, but we can keep it too, just let me know.

this.logger('Pausing libp2pService!')
this.peerInfo = await this.libp2pService?.pause()
this.logger('Found the following peer info on pause: ', this.peerInfo)
}

public async resume() {
this.logger('Resuming!')
this.logger('Reopening socket!')
await this.openSocket()
Copy link
Collaborator Author

@leblowl leblowl May 7, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Again, I remove the log because there is a similar log in socketService.listen. We can keep both also, just let me know.

this.logger('Attempting to redial peers!')
if (this.peerInfo && (this.peerInfo?.connected.length !== 0 || this.peerInfo?.dialed.length !== 0)) {
Expand All @@ -289,21 +287,14 @@ export class ConnectionsManagerService extends EventEmitter implements OnModuleI
public async leaveCommunity(): Promise<boolean> {
this.logger('Running leaveCommunity')

this.logger('Resetting tor')
this.tor.resetHiddenServices()

this.logger('Closing the socket')
this.closeSocket()

this.logger('Purging local DB')
await this.localDbService.purge()

this.logger('Closing services')
await this.closeAllServices({ saveTor: true })

this.logger('Purging data')
await this.purgeData()

this.logger('Resetting Tor')
this.tor.resetHiddenServices()

this.logger('Resetting state')
await this.resetState()

Expand Down
2 changes: 0 additions & 2 deletions packages/backend/src/nest/libp2p/libp2p.module.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,8 @@
import { Module } from '@nestjs/common'
import { SocketModule } from '../socket/socket.module'
import { Libp2pService } from './libp2p.service'
import { ProcessInChunksService } from './process-in-chunks.service'

@Module({
imports: [SocketModule],
providers: [Libp2pService, ProcessInChunksService],
exports: [Libp2pService],
})
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ export class RegistrationService extends EventEmitter implements OnModuleInit {
// Get the next event.
const event = this.registrationEvents.shift()
if (event) {
this.logger('Processing registration event', event)
this.logger('Processing registration event')
// Event processing in progress
this.registrationEventInProgress = true

Expand Down
81 changes: 69 additions & 12 deletions packages/backend/src/nest/socket/socket.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,13 +27,15 @@ import { CONFIG_OPTIONS, SERVER_IO_PROVIDER } from '../const'
import { ConfigOptions, ServerIoProviderTypes } from '../types'
import { suspendableSocketEvents } from './suspendable.events'
import Logger from '../common/logger'
import type net from 'node:net'

@Injectable()
export class SocketService extends EventEmitter implements OnModuleInit {
private readonly logger = Logger(SocketService.name)

public resolveReadyness: (value: void | PromiseLike<void>) => void
public readyness: Promise<void>
private sockets: Set<net.Socket>

constructor(
@Inject(SERVER_IO_PROVIDER) public readonly serverIoProvider: ServerIoProviderTypes,
Expand All @@ -44,14 +46,15 @@ export class SocketService extends EventEmitter implements OnModuleInit {
this.readyness = new Promise<void>(resolve => {
this.resolveReadyness = resolve
})

this.sockets = new Set<net.Socket>()

this.attachListeners()
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[FYI] attachListeners() moved from onModuleInit() to the constructor. I'm not sure what side effects this would have. Might cause attachListeners to trigger before init?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yea it does trigger before init and I think that's fine. I think it makes sense to put everything necessary to setup an object in the constructor and only using onModuleInit when necessary and it doesn't seem like attachListeners needs to be in onModuleInit.

}

async onModuleInit() {
this.logger('init: Started')

this.attachListeners()
await this.init()

this.logger('init: Finished')
}

Expand All @@ -71,7 +74,9 @@ export class SocketService extends EventEmitter implements OnModuleInit {
this.logger('init: Frontend connected')
}

private readonly attachListeners = (): void => {
private readonly attachListeners = () => {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[nitpick] lost the return type

Copy link
Collaborator Author

@leblowl leblowl May 8, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think not specifying void is generally the standard TS convention

this.logger('Attaching listeners')

// Attach listeners here
this.serverIoProvider.io.on(SocketActionTypes.CONNECTION, socket => {
this.logger('Socket connection')
Expand Down Expand Up @@ -173,7 +178,7 @@ export class SocketService extends EventEmitter implements OnModuleInit {
}
)

socket.on(SocketActionTypes.LEAVE_COMMUNITY, async (callback: (closed: boolean) => void) => {
socket.on(SocketActionTypes.LEAVE_COMMUNITY, (callback: (closed: boolean) => void) => {
this.logger('Leaving community')
this.emit(SocketActionTypes.LEAVE_COMMUNITY, callback)
})
Expand All @@ -195,25 +200,77 @@ export class SocketService extends EventEmitter implements OnModuleInit {
this.emit(SocketActionTypes.LOAD_MIGRATION_DATA, data)
})
})

// Ensure the underlying connections get closed. See:
// https://github.com/socketio/socket.io/issues/1602
this.serverIoProvider.server.on('connection', conn => {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[bug] This seems like a confusion of the server and the socket io.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

And in my testing never seems to be called

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This gets called for me. There are two different sockets, the socket.io Socket (https://socket.io/docs/v4/server-api/#socket) and the Node net.Socket. I'm really just following the advice here: socketio/socket.io#1602 and it worked for me in cleaning up the hanging connections I was seeing, so I just went with that.

this.sockets.add(conn)
conn.on('close', () => {
this.sockets.delete(conn)
})
})
}

public listen = async (port = this.configOptions.socketIOPort): Promise<void> => {
return await new Promise(resolve => {
if (this.serverIoProvider.server.listening) resolve()
public getConnections = (): Promise<number> => {
return new Promise(resolve => {
this.serverIoProvider.server.getConnections((err, count) => {
if (err) throw new Error(err.message)
resolve(count)
})
})
}

// Ensure the underlying connections get closed. See:
// https://github.com/socketio/socket.io/issues/1602
//
// I also tried `this.serverIoProvider.io.disconnectSockets(true)`
// which didn't work for me, but we still call it.
public closeSockets = () => {
this.logger('Disconnecting sockets')
this.serverIoProvider.io.disconnectSockets(true)
this.sockets.forEach(s => s.destroy())
}

public listen = async (): Promise<void> => {
this.logger(`Opening data server on port ${this.configOptions.socketIOPort}`)

if (this.serverIoProvider.server.listening) {
this.logger('Failed to listen. Server already listening.')
return
}

const numConnections = await this.getConnections()

if (numConnections > 0) {
this.logger('Failed to listen. Connections still open:', numConnections)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[question] Wouldn't it be better to just clean up the existing connections and proceed with listening?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure. I thought about that, but there might be some trickiness with timing due to the listen and close being async. I'll have to look into it more, so just went with the simplest option for now.

return
}

return new Promise(resolve => {
this.serverIoProvider.server.listen(this.configOptions.socketIOPort, '127.0.0.1', () => {
this.logger(`Data server running on port ${this.configOptions.socketIOPort}`)
resolve()
})
})
}

public close = async (): Promise<void> => {
this.logger(`Closing data server on port ${this.configOptions.socketIOPort}`)
return await new Promise(resolve => {
this.serverIoProvider.server.close(err => {
public close = (): Promise<void> => {
return new Promise(resolve => {
this.logger(`Closing data server on port ${this.configOptions.socketIOPort}`)

if (!this.serverIoProvider.server.listening) {
this.logger('Data server is not running.')
resolve()
return
}

this.serverIoProvider.io.close(err => {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[question] Does this suffer from the same issue mentioned in the cleanup function stuck in attachListeners where you need to loop through each socket and disconnect? Also, does this remove all of the listeners that were attached in the attachListeners function? Could this cause a memory leak and duplication of actions if the SocketService is started back up after being close and the listeners are attached again?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yea, this doesn't appear to close all connections properly. I don't think we need to remove the listeners ever. So those just get added once in the constructor.

if (err) throw new Error(err.message)
this.logger('Data server closed')
resolve()
})

this.closeSockets()
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[question] Shouldn't we also detach listeners when closing sockets? I mentioned it before, but I'd rather see a dedicated closeSockets() and detachListeners() method.

Copy link
Collaborator Author

@leblowl leblowl May 8, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think we need to detach any listeners. They are just attached once and persist between opening and closing of the server.

})
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ export class CertificatesRequestsStore extends EventEmitter {
write: ['*'],
},
})
await this.store.load()

this.store.events.on('write', async (_address, entry) => {
this.logger('Added CSR to database')
Expand All @@ -40,8 +41,6 @@ export class CertificatesRequestsStore extends EventEmitter {
this.loadedCertificateRequests()
})

// @ts-ignore
await this.store.load({ fetchEntryTimeout: 15000 })
this.logger('Initialized')
}

Expand All @@ -52,9 +51,9 @@ export class CertificatesRequestsStore extends EventEmitter {
}

public async close() {
this.logger('Closing...')
this.logger('Closing certificate requests DB')
await this.store?.close()
this.logger('Closed')
this.logger('Closed certificate requests DB')
}

public getAddress() {
Expand Down Expand Up @@ -91,8 +90,6 @@ export class CertificatesRequestsStore extends EventEmitter {

public async getCsrs() {
const filteredCsrsMap: Map<string, string> = new Map()
// @ts-expect-error - OrbitDB's type declaration of `load` lacks 'options'
await this.store.load({ fetchEntryTimeout: 15000 })
const allEntries = this.store
.iterator({ limit: -1 })
.collect()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@ export class CertificatesStore extends EventEmitter {
write: ['*'],
},
})
await this.store.load()

this.store.events.on('ready', async () => {
this.logger('Loaded certificates to memory')
Expand All @@ -59,8 +58,7 @@ export class CertificatesStore extends EventEmitter {
await this.loadedCertificates()
})

// // @ts-expect-error - OrbitDB's type declaration of `load` lacks 'options'
// await this.store.load({ fetchEntryTimeout: 15000 })
await this.store.load()

this.logger('Initialized')
}
Expand All @@ -72,7 +70,9 @@ export class CertificatesStore extends EventEmitter {
}

public async close() {
this.logger('Closing certificates DB')
await this.store?.close()
this.logger('Closed certificates DB')
}

public getAddress() {
Expand Down Expand Up @@ -154,8 +154,6 @@ export class CertificatesStore extends EventEmitter {
return []
}

// @ts-expect-error - OrbitDB's type declaration of `load` lacks 'options'
await this.store.load({ fetchEntryTimeout: 15000 })
const allCertificates = this.store
.iterator({ limit: -1 })
.collect()
Expand Down
Loading
Loading