Skip to content

Commit

Permalink
Fixups
Browse files Browse the repository at this point in the history
  • Loading branch information
Lucas Leblow committed May 8, 2024
1 parent cec2c0d commit de2c55a
Show file tree
Hide file tree
Showing 5 changed files with 34 additions and 35 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -223,9 +223,7 @@ export class ConnectionsManagerService extends EventEmitter implements OnModuleI
}
}

public async closeAllServices(
options: { saveTor: boolean; purgeLocalDb: boolean } = { saveTor: false, purgeLocalDb: false }
) {
public async closeAllServices(options: { saveTor: boolean } = { saveTor: false }) {
this.logger('Closing services')

await this.closeSocket()
Expand All @@ -245,10 +243,6 @@ export class ConnectionsManagerService extends EventEmitter implements OnModuleI
await this.libp2pService.close()
}
if (this.localDbService) {
if (options.purgeLocalDb) {
this.logger('Purging local DB')
await this.localDbService.purge()
}
this.logger('Closing local DB')
await this.localDbService.close()
}
Expand Down Expand Up @@ -293,7 +287,7 @@ export class ConnectionsManagerService extends EventEmitter implements OnModuleI
public async leaveCommunity(): Promise<boolean> {
this.logger('Running leaveCommunity')

await this.closeAllServices({ saveTor: true, purgeLocalDb: true })
await this.closeAllServices({ saveTor: true })

this.logger('Purging data')
await this.purgeData()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ export class RegistrationService extends EventEmitter implements OnModuleInit {
// Get the next event.
const event = this.registrationEvents.shift()
if (event) {
this.logger('Processing registration event', event)
this.logger('Processing registration event')
// Event processing in progress
this.registrationEventInProgress = true

Expand Down
50 changes: 27 additions & 23 deletions packages/backend/src/nest/socket/socket.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ import { CONFIG_OPTIONS, SERVER_IO_PROVIDER } from '../const'
import { ConfigOptions, ServerIoProviderTypes } from '../types'
import { suspendableSocketEvents } from './suspendable.events'
import Logger from '../common/logger'
import { sleep } from '../common/sleep'
import type net from 'node:net'

@Injectable()
Expand All @@ -36,8 +35,7 @@ export class SocketService extends EventEmitter implements OnModuleInit {

public resolveReadyness: (value: void | PromiseLike<void>) => void
public readyness: Promise<void>
private listening: boolean
private closeSockets: () => void
private sockets: Set<net.Socket>

constructor(
@Inject(SERVER_IO_PROVIDER) public readonly serverIoProvider: ServerIoProviderTypes,
Expand All @@ -49,8 +47,9 @@ export class SocketService extends EventEmitter implements OnModuleInit {
this.resolveReadyness = resolve
})

this.listening = false
this.closeSockets = this.attachListeners()
this.sockets = new Set<net.Socket>()

this.attachListeners()
}

async onModuleInit() {
Expand All @@ -77,7 +76,7 @@ export class SocketService extends EventEmitter implements OnModuleInit {
this.logger('init: Frontend connected')
}

private readonly attachListeners = (): (() => void) => {
private readonly attachListeners = () => {
// Attach listeners here
this.serverIoProvider.io.on(SocketActionTypes.CONNECTION, socket => {
this.logger('Socket connection')
Expand Down Expand Up @@ -179,7 +178,7 @@ export class SocketService extends EventEmitter implements OnModuleInit {
}
)

socket.on(SocketActionTypes.LEAVE_COMMUNITY, async (callback: (closed: boolean) => void) => {
socket.on(SocketActionTypes.LEAVE_COMMUNITY, (callback: (closed: boolean) => void) => {
this.logger('Leaving community')
this.emit(SocketActionTypes.LEAVE_COMMUNITY, callback)
})
Expand Down Expand Up @@ -207,16 +206,12 @@ export class SocketService extends EventEmitter implements OnModuleInit {
//
// I also tried `this.serverIoProvider.io.disconnectSockets(true)`
// which didn't work for me.
const sockets = new Set<net.Socket>()

this.serverIoProvider.server.on('connection', conn => {
sockets.add(conn)
this.sockets.add(conn)
conn.on('close', () => {
sockets.delete(conn)
this.sockets.delete(conn)
})
})

return () => sockets.forEach(s => s.destroy())
}

public getConnections = (): Promise<number> => {
Expand All @@ -228,22 +223,31 @@ export class SocketService extends EventEmitter implements OnModuleInit {
})
}

public listen = async (port = this.configOptions.socketIOPort): Promise<void> => {
// Ensure the underlying connections get closed. See:
// https://github.com/socketio/socket.io/issues/1602
public closeSockets = () => {
this.logger('Disconnecting sockets')
this.sockets.forEach(s => s.destroy())
}

public listen = async (): Promise<void> => {
this.logger(`Opening data server on port ${this.configOptions.socketIOPort}`)

// Sometimes socket.io closes the HTTP server but doesn't close
// all underlying connections. So it doesn't appear that
// `this.serverIoProvider.server.listening` is sufficient.
if (this.listening) {
const numConnections = await this.getConnections()
if (this.serverIoProvider.server.listening) {
this.logger('Failed to listen. Server already listening.')
return
}

const numConnections = await this.getConnections()

if (numConnections > 0) {
this.logger('Failed to listen. Connections still open:', numConnections)
return
}

return new Promise(resolve => {
this.serverIoProvider.server.listen(this.configOptions.socketIOPort, '127.0.0.1', () => {
this.logger(`Data server running on port ${this.configOptions.socketIOPort}`)
this.listening = true
resolve()
})
})
Expand All @@ -253,7 +257,7 @@ export class SocketService extends EventEmitter implements OnModuleInit {
return new Promise(resolve => {
this.logger(`Closing data server on port ${this.configOptions.socketIOPort}`)

if (!this.listening) {
if (!this.serverIoProvider.server.listening) {
this.logger('Data server is not running.')
resolve()
return
Expand All @@ -262,10 +266,10 @@ export class SocketService extends EventEmitter implements OnModuleInit {
this.serverIoProvider.io.close(err => {
if (err) throw new Error(err.message)
this.logger('Data server closed')
this.listening = false
resolve()
})
this.logger('Disconnecting sockets')

this.serverIoProvider.io.disconnectSockets(true)
this.closeSockets()
})
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ export class CertificatesRequestsStore extends EventEmitter {
write: ['*'],
},
})
await this.store.load()

this.store.events.on('write', async (_address, entry) => {
this.logger('Added CSR to database')
Expand All @@ -40,8 +41,9 @@ export class CertificatesRequestsStore extends EventEmitter {
this.loadedCertificateRequests()
})

// @ts-ignore
await this.store.load({ fetchEntryTimeout: 15000 })
// // @ts-ignore
// await this.store.load({ fetchEntryTimeout: 15000 })

this.logger('Initialized')
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,6 @@ export function* handleActions(socket: Socket): Generator {
try {
const socketChannel = yield* call(subscribe, socket)
yield takeEvery(socketChannel, function* (action) {
console.log('handleActions PUT', action)
yield put(action)
})
} finally {
Expand Down

0 comments on commit de2c55a

Please sign in to comment.