-
Notifications
You must be signed in to change notification settings - Fork 89
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Fixing user registration concurrency issues #2157
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -318,11 +318,25 @@ export class ConnectionsManagerService extends EventEmitter implements OnModuleI | |
await this.launchCommunity(payload) | ||
this.logger(`Created and launched community ${payload.id}`) | ||
this.serverIoProvider.io.emit(SocketActionTypes.NEW_COMMUNITY, { id: payload.id }) | ||
|
||
if (!payload.ownerCsr || !payload.permsData) { | ||
throw new Error("Owner CSR and PermsData required to create community!") | ||
} | ||
// In launchCommunity, we initialize the StorageService and also | ||
// the RegistrationService for the first time. And then now, once | ||
// those are ready, we register the owner's certificate. | ||
const cert = await this.registrationService.registerOwnerCertificate({ communityId: payload.id, userCsr: payload.ownerCsr, permsData: payload.permsData }) | ||
if (payload.certs) { | ||
// Hacking, perhaps make certs.certificate optional | ||
payload.certs.certificate = cert || '' | ||
} | ||
await this.localDbService.put(LocalDBKeys.COMMUNITY, payload) | ||
} | ||
|
||
public async launchCommunity(payload: InitCommunityPayload) { | ||
this.logger('Launching community: peers:', payload.peers) | ||
this.communityState = ServiceState.LAUNCHING | ||
|
||
// Perhaps we should call this data something else, since we already have a Community type. | ||
// It seems like InitCommunityPayload is a mix of various connection metadata. | ||
const communityData: InitCommunityPayload = await this.localDbService.get(LocalDBKeys.COMMUNITY) | ||
|
@@ -424,6 +438,7 @@ export class ConnectionsManagerService extends EventEmitter implements OnModuleI | |
this.serverIoProvider.io.emit(SocketActionTypes.PEER_DISCONNECTED, payload) | ||
}) | ||
await this.storageService.init(_peerId) | ||
await this.registrationService.init(this.storageService) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is not a recommended way to inject dependencies. Please add StorageModule in Registration Module: |
||
this.logger('storage initialized') | ||
} | ||
|
||
|
@@ -444,10 +459,6 @@ export class ConnectionsManagerService extends EventEmitter implements OnModuleI | |
this.registrationService.on(RegistrationEvents.ERROR, payload => { | ||
emitError(this.serverIoProvider.io, payload) | ||
}) | ||
this.registrationService.on(RegistrationEvents.NEW_USER, async payload => { | ||
await this.storageService?.saveCertificate(payload) | ||
}) | ||
|
||
this.registrationService.on(RegistrationEvents.FINISHED_ISSUING_CERTIFICATES_FOR_ID, payload => { | ||
if (payload.id) { | ||
this.storageService.resolveCsrReplicatedPromise(payload.id) | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -10,6 +10,11 @@ import { issueCertificate, extractPendingCsrs } from './registration.functions' | |
import { jest } from '@jest/globals' | ||
import { createTmpDir } from '../common/utils' | ||
import { RegistrationEvents } from './registration.types' | ||
import { EventEmitter } from 'events' | ||
import { CertificatesStore } from '../storage/certificates/certificates.store' | ||
import { create, IPFS } from 'ipfs-core' | ||
import OrbitDB from 'orbit-db' | ||
import { TestConfig } from '../const' | ||
|
||
describe('RegistrationService', () => { | ||
let module: TestingModule | ||
|
@@ -180,4 +185,71 @@ describe('RegistrationService', () => { | |
|
||
expect(eventSpy).toHaveBeenCalledTimes(3) | ||
}) | ||
|
||
const createOrbitDbInstance = async () => { | ||
const ipfs: IPFS = await create() | ||
// @ts-ignore | ||
const orbitdb = await OrbitDB.createInstance(ipfs, { | ||
directory: TestConfig.ORBIT_DB_DIR, | ||
}) | ||
|
||
return { orbitdb, ipfs } | ||
} | ||
|
||
it('race', async () => { | ||
let { orbitdb, ipfs } = await createOrbitDbInstance() | ||
let store = new CertificatesStore(orbitdb) | ||
let emitter = new EventEmitter() | ||
await store.init(emitter) | ||
|
||
registrationService.permsData = permsData | ||
// @ts-ignore | ||
registrationService.storageService = { certificatesStore: store } | ||
|
||
registrationService.onModuleInit() | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. missing await |
||
|
||
const userCsr = await createUserCsr({ | ||
nickname: 'alice', | ||
commonName: 'nqnw4kc4c77fb47lk52m5l57h4tcxceo7ymxekfn7yh5m66t4jv2olad.onion', | ||
peerId: 'Qmf3ySkYqLET9xtAtDzvAr5Pp3egK1H3C5iJAZm1SpLEp6', | ||
dmPublicKey: 'testdmPublicKey', | ||
signAlg: configCrypto.signAlg, | ||
hashAlg: configCrypto.hashAlg, | ||
}) | ||
|
||
const userCsr2 = await createUserCsr({ | ||
nickname: 'alice', | ||
commonName: 'nnnnnnc4c77fb47lk52m5l57h4tcxceo7ymxekfn7yh5m66t4jv2olad.onion', | ||
peerId: 'QmffffffqLET9xtAtDzvAr5Pp3egK1H3C5iJAZm1SpLEp6', | ||
dmPublicKey: 'testdmPublicKey', | ||
signAlg: configCrypto.signAlg, | ||
hashAlg: configCrypto.hashAlg, | ||
}) | ||
|
||
const certificates: string[] = [] | ||
let calls = 0 | ||
|
||
registrationService.on('new', (p: any) => { | ||
console.log(p) | ||
calls++ | ||
}) | ||
|
||
registrationService.emit( | ||
RegistrationEvents.REGISTER_USER_CERTIFICATE, | ||
{ csrs: [userCsr.userCsr], certificates: [], id: '1' } | ||
) | ||
|
||
registrationService.emit( | ||
RegistrationEvents.REGISTER_USER_CERTIFICATE, | ||
{ csrs: [userCsr2.userCsr], certificates: [], id: '2' } | ||
) | ||
|
||
await new Promise(r => setTimeout(r, 10000)) | ||
|
||
await store.close() | ||
await orbitdb.stop() | ||
await ipfs.stop() | ||
|
||
expect(calls).toEqual(1) | ||
}) | ||
}) |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -3,13 +3,17 @@ import { EventEmitter } from 'events' | |
import { extractPendingCsrs, issueCertificate } from './registration.functions' | ||
import { ErrorCodes, ErrorMessages, PermsData, RegisterOwnerCertificatePayload, SocketActionTypes } from '@quiet/types' | ||
import { RegistrationEvents } from './registration.types' | ||
import { StorageService } from '../storage/storage.service' | ||
import Logger from '../common/logger' | ||
|
||
@Injectable() | ||
export class RegistrationService extends EventEmitter implements OnModuleInit { | ||
private readonly logger = Logger(RegistrationService.name) | ||
public certificates: string[] = [] | ||
private _permsData: PermsData | ||
private storageService: StorageService | ||
private registrationEvents: { csrs: string[], id?: string }[] = [] | ||
private registrationEventInProgress: boolean | ||
|
||
constructor() { | ||
super() | ||
|
@@ -18,32 +22,15 @@ export class RegistrationService extends EventEmitter implements OnModuleInit { | |
onModuleInit() { | ||
this.on( | ||
RegistrationEvents.REGISTER_USER_CERTIFICATE, | ||
async (payload: { csrs: string[]; certificates: string[]; id: string }) => { | ||
await this.issueCertificates(payload) | ||
(payload: { csrs: string[]; certificates: string[]; id: string }) => { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. this should still be async function |
||
this.registrationEvents.push({ csrs: payload.csrs, id: payload.id }) | ||
this.tryRegisterNextUserCertificates() | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. tryRegisterNextUserCertificates() is async function so it needs to be I think something may be broken again with our eslint. Do you not get an error on this line? It should complain about missing await. If you did want to actually fire-and-forget you should write it as
but this is asually a very bad idea and rarely the behaviour you want There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think
is equivalent to
and where the Promise body, I was confused myself and so I started diving into it a bit more. I don't have the whole picture and am likely still confused about many things, but this is my latest understanding A situation like this I think is a little bit different:
Where the solo There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
It's actually the opposite. If you don't This second point is the main reason for always awaiting stuff. Without it, error handling is a nightmare. |
||
} | ||
) | ||
} | ||
|
||
private async issueCertificates(payload: { csrs: string[]; certificates: string[]; id?: string }) { | ||
// Lack of permsData means that we are not the owner of the | ||
// community in the official model of the app, however anyone can | ||
// modify the source code, put malicious permsData here, issue | ||
// false certificates and try to trick other users. To prevent | ||
// that, peers verify that anything that is written to the | ||
// certificate store is signed by the owner. | ||
if (!this._permsData) { | ||
if (payload.id) this.emit(RegistrationEvents.FINISHED_ISSUING_CERTIFICATES_FOR_ID, { id: payload.id }) | ||
return | ||
} | ||
const pendingCsrs = await extractPendingCsrs(payload) | ||
|
||
await Promise.all( | ||
pendingCsrs.map(async csr => { | ||
await this.registerUserCertificate(csr) | ||
}) | ||
) | ||
|
||
if (payload.id) this.emit(RegistrationEvents.FINISHED_ISSUING_CERTIFICATES_FOR_ID, { id: payload.id }) | ||
public init(storageService: StorageService) { | ||
this.storageService = storageService | ||
} | ||
|
||
public set permsData(perms: PermsData) { | ||
|
@@ -53,15 +40,26 @@ export class RegistrationService extends EventEmitter implements OnModuleInit { | |
} | ||
} | ||
|
||
public async registerOwnerCertificate(payload: RegisterOwnerCertificatePayload): Promise<void> { | ||
public async registerOwnerCertificate(payload: RegisterOwnerCertificatePayload): Promise<string | undefined> { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
For functions that return stuff conditionally it's generally recommended to go with |
||
if (!this.storageService) { | ||
throw new Error("Storage Service must be initialized before the Registration Service") | ||
} | ||
|
||
// FIXME: We should resolve problems with events order and we should set permsData only on LAUNCH_REGISTRART socket event in connectionsManager. | ||
this._permsData = payload.permsData | ||
const result = await issueCertificate(payload.userCsr.userCsr, this._permsData) | ||
if (result?.cert) { | ||
await this.storageService.certificatesStore.addCertificate(result.cert) | ||
// Not sure if this is necessary | ||
const certs = await this.storageService.certificatesStore.loadAllCertificates() | ||
if (!certs.includes(result.cert)) { | ||
throw new Error("Cert wasn't added to CertificateStore correctly") | ||
} | ||
this.emit(SocketActionTypes.SAVED_OWNER_CERTIFICATE, { | ||
communityId: payload.communityId, | ||
network: { certificate: result.cert }, | ||
}) | ||
return result?.cert | ||
} else { | ||
this.emit(SocketActionTypes.ERROR, { | ||
type: SocketActionTypes.REGISTRAR, | ||
|
@@ -72,10 +70,76 @@ export class RegistrationService extends EventEmitter implements OnModuleInit { | |
} | ||
} | ||
|
||
public async registerUserCertificate(csr: string): Promise<void> { | ||
const result = await issueCertificate(csr, this._permsData) | ||
if (result?.cert) { | ||
this.emit(RegistrationEvents.NEW_USER, { certificate: result.cert }) | ||
public async tryRegisterNextUserCertificates() { | ||
if (!this.registrationEventInProgress) { | ||
console.log("Registering next certificate") | ||
this.registrationEventInProgress = true | ||
const next = this.registrationEvents.shift() | ||
if (next) { | ||
await this.registerUserCertificates(next) | ||
} | ||
this.registrationEventInProgress = false | ||
|
||
if (this.registrationEvents.length !== 0) { | ||
setTimeout(this.tryRegisterNextUserCertificates.bind(this), 0) | ||
} | ||
} | ||
} | ||
|
||
// Apparently, JS will run each function to completion. So assuming | ||
// we do not have multiple threads, this function should run to | ||
// completion before it is called again. Because we take the CSRs | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. this is inaccurate JS will run the function synchronously until the first There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yea, this comment is outdated |
||
// and then get the latest view of certificates, filter CSRs and | ||
// then update certificates in a single function, we should never | ||
// issue a CSR that is contained in the certificates list, at least | ||
// in this function... as long as there is no other code that | ||
// updates the certificates list. Something else to consider is that | ||
// because this function is called asynchronously, there may be | ||
// several invocations with different CSRs and they may run in an | ||
// unexpected order. We could address that if it's an issue, but I | ||
// think that might only affect the order of CSR registration. | ||
public async registerUserCertificates(payload: { csrs: string[]; id?: string }) { | ||
if (!this.storageService) { | ||
throw new Error("Storage Service must be initialized before the Registration Service") | ||
} | ||
console.log("Registering user certificates") | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. didn't we refactor everthing to use nest logger for logging? If so, please fix this |
||
|
||
// Lack of permsData means that we are not the owner of the | ||
// community in the official model of the app, however anyone can | ||
// modify the source code, put malicious permsData here, issue | ||
// false certificates and try to trick other users. To prevent | ||
// that, peers verify that anything that is written to the | ||
// certificate store is signed by the owner. | ||
if (!this._permsData) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Why is this here? Why would we trigger a "finished issuing certificates" event if we are not the owner? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Not sure, I think I'm mostly reorganizing existing code here. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The name is misleading. AFAIK it's only used for resolving promise in csrReplicatedPromiseMap in storage. |
||
if (payload.id) this.emit(RegistrationEvents.FINISHED_ISSUING_CERTIFICATES_FOR_ID, { id: payload.id }) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'm starting to think eslint is completely off in this project. Is it not requesting that you wrap this in brackets? |
||
return | ||
} | ||
|
||
console.log("Loading all certificates") | ||
const certificates = await this.storageService.certificatesStore.loadAllCertificates() | ||
console.log("Certificates loaded") | ||
|
||
console.log("Extracting pending CSRs") | ||
const pendingCsrs = await extractPendingCsrs({ csrs: payload.csrs, certificates: certificates as string[] }) | ||
|
||
for (const csr of pendingCsrs) { | ||
console.log("Issuing certificate") | ||
const result = await issueCertificate(csr, this._permsData) | ||
if (result?.cert) { | ||
console.log("Adding certificate") | ||
await this.storageService.certificatesStore.addCertificate(result.cert) | ||
// Not sure if this is necessary | ||
const certs = await this.storageService.certificatesStore.loadAllCertificates() | ||
if (!certs.includes(result.cert)) { | ||
throw new Error("Cert wasn't added to CertificateStore correctly") | ||
} | ||
console.log("Certificate added") | ||
this.emit('new', result?.cert) | ||
} else { | ||
console.log("Not adding certificate") | ||
} | ||
} | ||
|
||
if (payload.id) this.emit(RegistrationEvents.FINISHED_ISSUING_CERTIFICATES_FOR_ID, { id: payload.id }) | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -2,11 +2,13 @@ import { applyEmitParams, type Socket } from '../../../types' | |
import { type PayloadAction } from '@reduxjs/toolkit' | ||
import { apply, select, put } from 'typed-redux-saga' | ||
import { communitiesSelectors } from '../../communities/communities.selectors' | ||
import { identitySelectors } from '../../identity/identity.selectors' | ||
import { identityActions } from '../identity.slice' | ||
import { | ||
type RegisterOwnerCertificatePayload, | ||
type RegisterUserCertificatePayload, | ||
SocketActionTypes, | ||
type InitCommunityPayload, | ||
} from '@quiet/types' | ||
import { communitiesActions } from '../../communities/communities.slice' | ||
|
||
|
@@ -22,16 +24,30 @@ export function* registerCertificateSaga( | |
} | ||
|
||
if (currentCommunity.CA?.rootCertString) { | ||
const payload: RegisterOwnerCertificatePayload = { | ||
communityId: action.payload.communityId, | ||
userCsr: action.payload.userCsr, | ||
const identity = yield* select(identitySelectors.selectById(currentCommunity.id)) | ||
if (!identity?.userCsr || !currentCommunity?.rootCa) { | ||
console.error("User CSR or root cert missing", identity?.userCsr, currentCommunity?.rootCa) | ||
return | ||
} | ||
|
||
const payload: InitCommunityPayload = { | ||
id: currentCommunity.id, | ||
peerId: identity.peerId, | ||
hiddenService: identity.hiddenService, | ||
certs: { | ||
// Hacking, perhaps make certs.certificate optional | ||
certificate: identity.userCertificate || '', | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. call it bikeshedding on my part but |
||
key: identity.userCsr.userKey, | ||
CA: [currentCommunity.rootCa], | ||
}, | ||
ownerCsr: identity.userCsr, | ||
permsData: { | ||
certificate: currentCommunity.CA.rootCertString, | ||
privKey: currentCommunity.CA.rootKeyString, | ||
}, | ||
} | ||
|
||
yield* apply(socket, socket.emit, applyEmitParams(SocketActionTypes.REGISTER_OWNER_CERTIFICATE, payload)) | ||
yield* apply(socket, socket.emit, applyEmitParams(SocketActionTypes.CREATE_COMMUNITY, payload)) | ||
} else { | ||
if (!isUsernameTaken) { | ||
yield* put(communitiesActions.launchCommunity(action.payload.communityId)) | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This would just show "abnormal backend termination" js error in application. Maybe it should be a
return
instead and emitError?