Skip to content

Commit

Permalink
refactor: Refactor registration service concurrency
Browse files Browse the repository at this point in the history
Refactor the registration service so that we can more easily verify that it
does not register duplicate certs. Related to #2155
  • Loading branch information
Lucas Leblow committed Jan 31, 2024
1 parent 5f3ada8 commit 11ae203
Show file tree
Hide file tree
Showing 13 changed files with 231 additions and 271 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

# Refactorings:

* Refactor registration service, replace promise waiting mechanism around certificate requests and help prevent duplicate username registration
* Removed SAVE_OWNER_CERTIFICATE event.
* Removed registrar reminders and rename LAUNCH_REGISTRAR.
* Removed unused SEND_USER_CERTIFICATE event.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ import {
PeerId as PeerIdType,
SaveCSRPayload,
CommunityMetadata,
PermsData,
type PermsData,
type UserProfile,
type UserProfilesLoadedEvent,
} from '@quiet/types'
Expand Down Expand Up @@ -177,8 +177,6 @@ export class ConnectionsManagerService extends EventEmitter implements OnModuleI
if (this.storageService) {
this.logger('Stopping orbitdb')
await this.storageService?.stopOrbitDb()
this.logger('reset CsrReplicated map and id and certificate store values')
this.storageService.resetCsrAndCertsValues()
}
if (this.serverIoProvider?.io) {
this.logger('Closing socket server')
Expand Down Expand Up @@ -423,6 +421,12 @@ export class ConnectionsManagerService extends EventEmitter implements OnModuleI
this.serverIoProvider.io.emit(SocketActionTypes.PEER_DISCONNECTED, payload)
})
await this.storageService.init(_peerId)
// We can use Nest for dependency injection, but I think since the
// registration service depends on the storage service being
// initialized, this is helpful to manually inject the storage
// service for now. Both object construction and object
// initialization need to happen in order based on dependencies.
await this.registrationService.init(this.storageService)
this.logger('storage initialized')

this.serverIoProvider.io.emit(
Expand All @@ -449,15 +453,6 @@ export class ConnectionsManagerService extends EventEmitter implements OnModuleI
this.registrationService.on(RegistrationEvents.ERROR, payload => {
emitError(this.serverIoProvider.io, payload)
})
this.registrationService.on(RegistrationEvents.NEW_USER, async payload => {
await this.storageService?.saveCertificate(payload)
})

this.registrationService.on(RegistrationEvents.FINISHED_ISSUING_CERTIFICATES_FOR_ID, payload => {
if (payload.id) {
this.storageService.resolveCsrReplicatedPromise(payload.id)
}
})
}

private attachSocketServiceListeners() {
Expand All @@ -473,7 +468,9 @@ export class ConnectionsManagerService extends EventEmitter implements OnModuleI
SocketActionTypes.CONNECTED_PEERS,
Array.from(this.libp2pService.connectedPeers.keys())
)
await this.storageService?.loadAllCertificates()
this.serverIoProvider.io.emit(SocketActionTypes.RESPONSE_GET_CERTIFICATES, {
certificates: await this.storageService?.loadAllCertificates(),
})
await this.storageService?.loadAllChannels()
}
})
Expand Down Expand Up @@ -511,7 +508,7 @@ export class ConnectionsManagerService extends EventEmitter implements OnModuleI
// when creating the community.
this.socketService.on(SocketActionTypes.SEND_COMMUNITY_CA_DATA, async (payload: PermsData) => {
this.logger(`socketService - ${SocketActionTypes.SEND_COMMUNITY_CA_DATA}`)
this.registrationService.permsData = payload
this.registrationService.setPermsData(payload)
})

// Public Channels
Expand Down Expand Up @@ -626,15 +623,12 @@ export class ConnectionsManagerService extends EventEmitter implements OnModuleI
this.logger(`Storage - ${StorageEvents.CHANNEL_DELETION_RESPONSE}`)
this.serverIoProvider.io.emit(SocketActionTypes.CHANNEL_DELETION_RESPONSE, payload)
})
this.storageService.on(
StorageEvents.REPLICATED_CSR,
async (payload: { csrs: string[]; certificates: string[]; id: string }) => {
this.logger(`Storage - ${StorageEvents.REPLICATED_CSR}`)
this.libp2pService.emit(Libp2pEvents.DIAL_PEERS, await getLibp2pAddressesFromCsrs(payload.csrs))
this.serverIoProvider.io.emit(SocketActionTypes.RESPONSE_GET_CSRS, { csrs: payload.csrs })
this.registrationService.emit(RegistrationEvents.REGISTER_USER_CERTIFICATE, payload)
}
)
this.storageService.on(StorageEvents.REPLICATED_CSR, async (payload: { csrs: string[] }) => {
this.logger(`Storage - ${StorageEvents.REPLICATED_CSR}`)
this.libp2pService.emit(Libp2pEvents.DIAL_PEERS, await getLibp2pAddressesFromCsrs(payload.csrs))
this.serverIoProvider.io.emit(SocketActionTypes.RESPONSE_GET_CSRS, payload)
this.registrationService.emit(RegistrationEvents.REGISTER_USER_CERTIFICATE, payload)
})

this.socketService.on(SocketActionTypes.SEND_COMMUNITY_METADATA, async (payload: CommunityMetadata) => {
await this.storageService?.updateCommunityMetadata(payload)
Expand Down
60 changes: 48 additions & 12 deletions packages/backend/src/nest/registration/registration.service.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,19 @@ import { RegistrationModule } from './registration.module'
import { RegistrationService } from './registration.service'
import { configCrypto, createRootCA, createUserCsr, type RootCA, verifyUserCert, type UserCsr } from '@quiet/identity'
import { type DirResult } from 'tmp'
import { type PermsData } from '@quiet/types'
import { type PermsData, type SaveCertificatePayload } from '@quiet/types'
import { Time } from 'pkijs'
import { issueCertificate, extractPendingCsrs } from './registration.functions'
import { jest } from '@jest/globals'
import { createTmpDir } from '../common/utils'
import { RegistrationEvents } from './registration.types'
import { EventEmitter } from 'events'
import { CertificatesStore } from '../storage/certificates/certificates.store'
import { StorageService } from '../storage/storage.service'
import { StorageModule } from '../storage/storage.module'
import { OrbitDb } from '../storage/orbitDb/orbitDb.service'
import { create } from 'ipfs-core'
import PeerId from 'peer-id'

describe('RegistrationService', () => {
let module: TestingModule
Expand Down Expand Up @@ -148,10 +155,34 @@ describe('RegistrationService', () => {
expect(pendingCsrs[0]).toBe(userCsr.userCsr)
})

it('wait for all NEW_USER events until emitting FINISHED_ISSUING_CERTIFICATES_FOR_ID', async () => {
registrationService.permsData = permsData
it('only issues one group of certs at a time', async () => {
const storageModule = await Test.createTestingModule({
imports: [TestModule, StorageModule],
}).compile()
const certificatesStore = await storageModule.resolve(CertificatesStore)
const orbitDb = await storageModule.resolve(OrbitDb)
const peerId = await PeerId.create()
const ipfs = await create()
const loadAllCertificates = async () => {
return await certificatesStore.loadAllCertificates()
}
const saveCertificate = async (payload: SaveCertificatePayload) => {
await certificatesStore.addCertificate(payload.certificate)
}

const eventSpy = jest.spyOn(registrationService, 'emit')
await orbitDb.create(peerId, ipfs)
await certificatesStore.init(new EventEmitter())
certificatesStore.updateMetadata({
id: '39F7485441861F4A2A1A512188F1E0AA',
rootCa:
'MIIBUDCB+KADAgECAgEBMAoGCCqGSM49BAMCMBIxEDAOBgNVBAMTB3JvY2tldHMwHhcNMTAxMjI4MTAxMDEwWhcNMzAxMjI4MTAxMDEwWjASMRAwDgYDVQQDEwdyb2NrZXRzMFkwEwYHKoZIzj0CAQYIKoZIzj0DAQcDQgAE/ESHf6rXksyiuxSKpQgtiSAhVWNtx4vbFgW6knWfH7MR4dPyxiCNgSeCzRfreuhqVpVtv3U49tcwsqDGkoWHsKM/MD0wDwYDVR0TBAgwBgEB/wIBAzALBgNVHQ8EBAMCAIYwHQYDVR0lBBYwFAYIKwYBBQUHAwIGCCsGAQUFBwMBMAoGCCqGSM49BAMCA0cAMEQCIHrYMhgU/RluSsWoO205EjCQ8pE5MeBZ4Cp8PTgNkOW7AiA690+KIgobiObH6/1JDuS82R0NPO84Ttc8PY886AoKbA==',
ownerCertificate:
'MIIDeTCCAx6gAwIBAgIGAYwVp42mMAoGCCqGSM49BAMCMBIxEDAOBgNVBAMTB3JvY2tldHMwHhcNMjMxMTI4MTExOTExWhcNMzAwMTMxMjMwMDAwWjBJMUcwRQYDVQQDEz5jYXJhaTJ0d2phem50aW56bndtcnlqdzNlNzVmdXF0Z2xrd2hsemo2d3RlcWx4ano2NnRsZnhpZC5vbmlvbjBZMBMGByqGSM49AgEGCCqGSM49AwEHA0IABNMUauWsTJiuDGt4zoj4lKGgHMkTH96M11fCxMwIInhan0RUB5sv+PtGKbfEfawGjhSQiUaTLdwUGjyIdMs3OMWjggInMIICIzAJBgNVHRMEAjAAMAsGA1UdDwQEAwIAgDAdBgNVHSUEFjAUBggrBgEFBQcDAgYIKwYBBQUHAwEwggFHBgkqhkiG9w0BCQwEggE4BIIBNBETZ2k8vszIRvkuOUk/cNtOb8JcGmw5yVhs45/+e7To4t51nwcdAODj5juVi6+SpLCcHCHhE+g7KswEkC1ScFrW6CRinSgrNBOAUIjOtvWZ/GvK6lI4WTMf7xAaRaJSCF6H0m4cFoUY3JpklJleHhzj0re+NmFZEJ/hNRKochGFy4Xq9Z7StvPpGBlfxhmR7X2t/+HtZaAAbLRLLgbHtCQ7fecg0Qb9Ej58uc+T4Gd2+8ptWvebtOQVU70VAL7uT6aLkFXaDibgSt3kDNvGrwn3AxWlESgROTh5+OWWbfYIbFxjf0PkPDdUSAIOKS9qbYZ+bSYfVq+/0JFyZAa0zhPtgW8wjj0gDCLVm5joyW5Hz2eZ36W7u3cxFME2qmT9G2Dh6NGLn7G19ulVzoTkVmP5/tGPMBUGCisGAQQBg4wbAgEEBxMFZGF2aWQwPQYJKwYBAgEPAwEBBDATLlFtZE5GVjc3dXZOcTJBaWlqUEY0dzY2OU1ucWdiYVdMR1VhZlh0WTdlZjNRRFMwSQYDVR0RBEIwQII+Y2FyYWkydHdqYXpudGluem53bXJ5anczZTc1ZnVxdGdsa3dobHpqNnd0ZXFseGp6NjZ0bGZ4aWQub25pb24wCgYIKoZIzj0EAwIDSQAwRgIhAOafgBe5T0EFjyy0tCRrTHJ1+5ri0W6kAUfc6eRKHIZAAiEA7rFEfPDU+D8MiOF+w0QOdp46dqaWsHFjrDHYPSYGxQA=',
})

registrationService.setPermsData(permsData)
registrationService.onModuleInit()
registrationService.init({ certificatesStore, saveCertificate, loadAllCertificates } as unknown as StorageService)

const userCsr = await createUserCsr({
nickname: 'alice',
Expand All @@ -161,23 +192,28 @@ describe('RegistrationService', () => {
signAlg: configCrypto.signAlg,
hashAlg: configCrypto.hashAlg,
})

const userCsr2 = await createUserCsr({
nickname: 'karol',
nickname: 'alice',
commonName: 'nnnnnnc4c77fb47lk52m5l57h4tcxceo7ymxekfn7yh5m66t4jv2olad.onion',
peerId: 'QmffffffqLET9xtAtDzvAr5Pp3egK1H3C5iJAZm1SpLEp6',
dmPublicKey: 'testdmPublicKey',
signAlg: configCrypto.signAlg,
hashAlg: configCrypto.hashAlg,
})

const csrs: string[] = [userCsr.userCsr, userCsr2.userCsr]
// @ts-ignore - fn 'issueCertificates' is private
await registrationService.issueCertificates({ certificates: [], csrs, id: 1 })
const certificates: string[] = []

expect(eventSpy).toHaveBeenLastCalledWith(RegistrationEvents.FINISHED_ISSUING_CERTIFICATES_FOR_ID, {
id: 1,
})
registrationService.emit(RegistrationEvents.REGISTER_USER_CERTIFICATE, { csrs: [userCsr.userCsr] })

registrationService.emit(RegistrationEvents.REGISTER_USER_CERTIFICATE, { csrs: [userCsr2.userCsr] })

await new Promise(r => setTimeout(r, 2000))

expect((await certificatesStore.loadAllCertificates()).length).toEqual(1)

expect(eventSpy).toHaveBeenCalledTimes(3)
await orbitDb.stop()
await ipfs.stop()
await certificatesStore.close()
})
})
89 changes: 66 additions & 23 deletions packages/backend/src/nest/registration/registration.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,35 +4,82 @@ import { extractPendingCsrs, issueCertificate } from './registration.functions'
import { ErrorCodes, ErrorMessages, PermsData, RegisterOwnerCertificatePayload, SocketActionTypes } from '@quiet/types'
import { RegistrationEvents } from './registration.types'
import Logger from '../common/logger'
import { StorageService } from '../storage/storage.service'

@Injectable()
export class RegistrationService extends EventEmitter implements OnModuleInit {
private readonly logger = Logger(RegistrationService.name)
public certificates: string[] = []
private _permsData: PermsData
private permsData: PermsData
private storageService: StorageService
private registrationEvents: { csrs: string[] }[] = []
private registrationEventInProgress = false

constructor() {
super()
}

onModuleInit() {
this.on(
RegistrationEvents.REGISTER_USER_CERTIFICATE,
async (payload: { csrs: string[]; certificates: string[]; id: string }) => {
await this.issueCertificates(payload)
this.on(RegistrationEvents.REGISTER_USER_CERTIFICATE, async (payload: { csrs: string[] }) => {
// Save the registration event and then try to process it, but
// since we only process a single event at a time, it might not
// get processed until other events have been processed.
this.registrationEvents.push(payload)
await this.tryIssueCertificates()
})
}

public init(storageService: StorageService) {
this.storageService = storageService
}

public setPermsData(permsData: PermsData) {
this.permsData = permsData
}

public async tryIssueCertificates() {
this.logger('Trying to issue certificates', this.registrationEventInProgress, this.registrationEvents)
// Process only a single registration event at a time so that we
// do not register two certificates with the same name.
if (!this.registrationEventInProgress) {
// Get the next event.
const event = this.registrationEvents.shift()
if (event) {
this.logger('Issuing certificates', event)
// Event processing in progress
this.registrationEventInProgress = true

// Await the processing function and make sure everything that
// needs to be done in order is awaited inside this function.
await this.issueCertificates({
...event,
certificates: (await this.storageService?.loadAllCertificates()) as string[],
})

this.logger('Finished issuing certificates')
// Event processing finished
this.registrationEventInProgress = false

// Re-run this function if there are more events to process
if (this.registrationEvents.length > 0) {
setTimeout(this.tryIssueCertificates.bind(this), 0)
}
}
)
}
}

private async issueCertificates(payload: { csrs: string[]; certificates: string[]; id?: string }) {
private async issueCertificates(payload: { csrs: string[]; certificates: string[] }) {
// Lack of permsData means that we are not the owner of the
// community in the official model of the app, however anyone can
// modify the source code, put malicious permsData here, issue
// false certificates and try to trick other users. To prevent
// that, peers verify that anything that is written to the
// certificate store is signed by the owner.
if (!this._permsData) {
if (payload.id) this.emit(RegistrationEvents.FINISHED_ISSUING_CERTIFICATES_FOR_ID, { id: payload.id })
//
// NOTE: There may be a race condition here if we try to issue
// certs before permsData is set. We may want to refactor this or
// add the ability to retry.
if (!this.permsData) {
this.logger('Not issuing certificates due to missing perms data')
return
}

Expand All @@ -44,20 +91,11 @@ export class RegistrationService extends EventEmitter implements OnModuleInit {
await this.registerUserCertificate(csr)
})
)

if (payload.id) this.emit(RegistrationEvents.FINISHED_ISSUING_CERTIFICATES_FOR_ID, { id: payload.id })
}

public set permsData(perms: PermsData) {
this._permsData = {
certificate: perms.certificate,
privKey: perms.privKey,
}
}

public async registerOwnerCertificate(payload: RegisterOwnerCertificatePayload): Promise<void> {
this._permsData = payload.permsData
const result = await issueCertificate(payload.userCsr.userCsr, this._permsData)
this.permsData = payload.permsData
const result = await issueCertificate(payload.userCsr.userCsr, this.permsData)
if (result?.cert) {
this.emit(SocketActionTypes.SAVED_OWNER_CERTIFICATE, {
communityId: payload.communityId,
Expand All @@ -74,10 +112,15 @@ export class RegistrationService extends EventEmitter implements OnModuleInit {
}

public async registerUserCertificate(csr: string): Promise<void> {
const result = await issueCertificate(csr, this._permsData)
const result = await issueCertificate(csr, this.permsData)
this.logger('DuplicatedCertBug', { result })
if (result?.cert) {
this.emit(RegistrationEvents.NEW_USER, { certificate: result.cert })
// Save certificate (awaited) so that we are sure that the certs
// are saved before processing the next round of CSRs.
// Otherwise, we could issue a duplicate certificate.

// @ts-ignore
await this.storageService?.saveCertificate({ certificate: result.cert })
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,5 +2,4 @@ export enum RegistrationEvents {
ERROR = 'error',
NEW_USER = 'newUser',
REGISTER_USER_CERTIFICATE = 'registerUserCertificate',
FINISHED_ISSUING_CERTIFICATES_FOR_ID = 'FINISHED_ISSUING_CERTIFICATES_FOR_ID',
}
14 changes: 10 additions & 4 deletions packages/backend/src/nest/socket/socket.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import {
DeleteFilesFromChannelSocketPayload,
SaveCSRPayload,
CommunityMetadata,
type PermsData,
type UserProfile,
} from '@quiet/types'
import EventEmitter from 'events'
Expand Down Expand Up @@ -153,10 +154,6 @@ export class SocketService extends EventEmitter implements OnModuleInit {
})

// ====== Community ======
socket.on(SocketActionTypes.SEND_COMMUNITY_METADATA, (payload: CommunityMetadata) => {
this.emit(SocketActionTypes.SEND_COMMUNITY_METADATA, payload)
})

socket.on(SocketActionTypes.CREATE_COMMUNITY, async (payload: InitCommunityPayload) => {
this.logger(`Creating community ${payload.id}`)
this.emit(SocketActionTypes.CREATE_COMMUNITY, payload)
Expand All @@ -177,11 +174,20 @@ export class SocketService extends EventEmitter implements OnModuleInit {
this.logger('Leaving community')
this.emit(SocketActionTypes.LEAVE_COMMUNITY)
})

socket.on(SocketActionTypes.LIBP2P_PSK_SAVED, payload => {
this.logger('Saving PSK', payload)
this.emit(SocketActionTypes.LIBP2P_PSK_SAVED, payload)
})

socket.on(SocketActionTypes.SEND_COMMUNITY_METADATA, (payload: CommunityMetadata) => {
this.emit(SocketActionTypes.SEND_COMMUNITY_METADATA, payload)
})

socket.on(SocketActionTypes.SEND_COMMUNITY_CA_DATA, (payload: PermsData) => {
this.emit(SocketActionTypes.SEND_COMMUNITY_CA_DATA, payload)
})

// ====== Users ======

socket.on(SocketActionTypes.SAVE_USER_PROFILE, (profile: UserProfile) => {
Expand Down
Loading

0 comments on commit 11ae203

Please sign in to comment.