Skip to content

Commit

Permalink
fix(server): wrap read-modify-write apis with distributed lock (#5979)
Browse files Browse the repository at this point in the history
  • Loading branch information
darkskygit authored and Brooooooklyn committed Mar 15, 2024
1 parent dd2c6cf commit 530959b
Show file tree
Hide file tree
Showing 20 changed files with 413 additions and 80 deletions.
2 changes: 2 additions & 0 deletions packages/backend/server/src/app.module.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import { GqlModule } from './fundamentals/graphql';
import { HelpersModule } from './fundamentals/helpers';
import { MailModule } from './fundamentals/mailer';
import { MetricsModule } from './fundamentals/metrics';
import { MutexModule } from './fundamentals/mutex';
import { PrismaModule } from './fundamentals/prisma';
import { StorageProviderModule } from './fundamentals/storage';
import { RateLimiterModule } from './fundamentals/throttler';
Expand All @@ -39,6 +40,7 @@ export const FunctionalityModules = [
ScheduleModule.forRoot(),
EventModule,
CacheModule,
MutexModule,
PrismaModule,
MetricsModule,
RateLimiterModule,
Expand Down
5 changes: 2 additions & 3 deletions packages/backend/server/src/core/features/feature.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
import { PrismaClient } from '@prisma/client';

import { PrismaTransaction } from '../../fundamentals';
import { Feature, FeatureSchema, FeatureType } from './types';

class FeatureConfig {
Expand Down Expand Up @@ -67,7 +66,7 @@ export type FeatureConfigType<F extends FeatureType> = InstanceType<

const FeatureCache = new Map<number, FeatureConfigType<FeatureType>>();

export async function getFeature(prisma: PrismaClient, featureId: number) {
export async function getFeature(prisma: PrismaTransaction, featureId: number) {
const cachedQuota = FeatureCache.get(featureId);

if (cachedQuota) {
Expand Down
7 changes: 3 additions & 4 deletions packages/backend/server/src/core/quota/quota.ts
Original file line number Diff line number Diff line change
@@ -1,20 +1,19 @@
import { PrismaClient } from '@prisma/client';

import { PrismaTransaction } from '../../fundamentals';
import { formatDate, formatSize, Quota, QuotaSchema } from './types';

const QuotaCache = new Map<number, QuotaConfig>();

export class QuotaConfig {
readonly config: Quota;

static async get(prisma: PrismaClient, featureId: number) {
static async get(tx: PrismaTransaction, featureId: number) {
const cachedQuota = QuotaCache.get(featureId);

if (cachedQuota) {
return cachedQuota;
}

const quota = await prisma.features.findFirst({
const quota = await tx.features.findFirst({
where: {
id: featureId,
},
Expand Down
12 changes: 7 additions & 5 deletions packages/backend/server/src/core/quota/service.ts
Original file line number Diff line number Diff line change
@@ -1,13 +1,15 @@
import { Injectable } from '@nestjs/common';
import { PrismaClient } from '@prisma/client';

import { type EventPayload, OnEvent } from '../../fundamentals';
import {
type EventPayload,
OnEvent,
PrismaTransaction,
} from '../../fundamentals';
import { FeatureKind } from '../features';
import { QuotaConfig } from './quota';
import { QuotaType } from './types';

type Transaction = Parameters<Parameters<PrismaClient['$transaction']>[0]>[0];

@Injectable()
export class QuotaService {
constructor(private readonly prisma: PrismaClient) {}
Expand Down Expand Up @@ -140,8 +142,8 @@ export class QuotaService {
});
}

async hasQuota(userId: string, quota: QuotaType, transaction?: Transaction) {
const executor = transaction ?? this.prisma;
async hasQuota(userId: string, quota: QuotaType, tx?: PrismaTransaction) {
const executor = tx ?? this.prisma;

return executor.userFeatures
.count({
Expand Down
2 changes: 1 addition & 1 deletion packages/backend/server/src/core/user/service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ export class UserService {

return this.createUser({
email,
name: 'Unnamed',
name: email.split('@')[0],
...data,
});
}
Expand Down
138 changes: 77 additions & 61 deletions packages/backend/server/src/core/workspaces/resolvers/workspace.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,9 @@ import {
EventEmitter,
type FileUpload,
MailService,
MutexService,
Throttle,
TooManyRequestsException,
} from '../../../fundamentals';
import { CurrentUser, Public } from '../../auth';
import { QuotaManagementService, QuotaQueryType } from '../../quota';
Expand Down Expand Up @@ -58,7 +60,8 @@ export class WorkspaceResolver {
private readonly quota: QuotaManagementService,
private readonly users: UserService,
private readonly event: EventEmitter,
private readonly blobStorage: WorkspaceBlobStorage
private readonly blobStorage: WorkspaceBlobStorage,
private readonly mutex: MutexService
) {}

@ResolveField(() => Permission, {
Expand Down Expand Up @@ -336,74 +339,87 @@ export class WorkspaceResolver {
throw new ForbiddenException('Cannot change owner');
}

// member limit check
const [memberCount, quota] = await Promise.all([
this.prisma.workspaceUserPermission.count({
where: { workspaceId },
}),
this.quota.getWorkspaceUsage(workspaceId),
]);
if (memberCount >= quota.memberLimit) {
throw new PayloadTooLargeException('Workspace member limit reached.');
}
try {
// lock to prevent concurrent invite
const lockFlag = `invite:${workspaceId}`;
await using lock = await this.mutex.lock(lockFlag);
if (!lock) {
return new TooManyRequestsException('Server is busy');
}

let target = await this.users.findUserByEmail(email);
if (target) {
const originRecord = await this.prisma.workspaceUserPermission.findFirst({
where: {
workspaceId,
userId: target.id,
},
});
// only invite if the user is not already in the workspace
if (originRecord) return originRecord.id;
} else {
target = await this.users.createAnonymousUser(email, {
registered: false,
});
}
// member limit check
const [memberCount, quota] = await Promise.all([
this.prisma.workspaceUserPermission.count({
where: { workspaceId },
}),
this.quota.getWorkspaceUsage(workspaceId),
]);
if (memberCount >= quota.memberLimit) {
return new PayloadTooLargeException('Workspace member limit reached.');
}

const inviteId = await this.permissions.grant(
workspaceId,
target.id,
permission
);
if (sendInviteMail) {
const inviteInfo = await this.getInviteInfo(inviteId);

try {
await this.mailer.sendInviteEmail(email, inviteId, {
workspace: {
id: inviteInfo.workspace.id,
name: inviteInfo.workspace.name,
avatar: inviteInfo.workspace.avatar,
},
user: {
avatar: inviteInfo.user?.avatarUrl || '',
name: inviteInfo.user?.name || '',
},
let target = await this.users.findUserByEmail(email);
if (target) {
const originRecord =
await this.prisma.workspaceUserPermission.findFirst({
where: {
workspaceId,
userId: target.id,
},
});
// only invite if the user is not already in the workspace
if (originRecord) return originRecord.id;
} else {
target = await this.users.createAnonymousUser(email, {
registered: false,
});
} catch (e) {
const ret = await this.permissions.revokeWorkspace(
workspaceId,
target.id
);

if (!ret) {
this.logger.fatal(
`failed to send ${workspaceId} invite email to ${email} and failed to revoke permission: ${inviteId}, ${e}`
}

const inviteId = await this.permissions.grant(
workspaceId,
target.id,
permission
);
if (sendInviteMail) {
const inviteInfo = await this.getInviteInfo(inviteId);

try {
await this.mailer.sendInviteEmail(email, inviteId, {
workspace: {
id: inviteInfo.workspace.id,
name: inviteInfo.workspace.name,
avatar: inviteInfo.workspace.avatar,
},
user: {
avatar: inviteInfo.user?.avatarUrl || '',
name: inviteInfo.user?.name || '',
},
});
} catch (e) {
const ret = await this.permissions.revokeWorkspace(
workspaceId,
target.id
);
} else {
this.logger.warn(
`failed to send ${workspaceId} invite email to ${email}, but successfully revoked permission: ${e}`

if (!ret) {
this.logger.fatal(
`failed to send ${workspaceId} invite email to ${email} and failed to revoke permission: ${inviteId}, ${e}`
);
} else {
this.logger.warn(
`failed to send ${workspaceId} invite email to ${email}, but successfully revoked permission: ${e}`
);
}
return new InternalServerErrorException(
'Failed to send invite email. Please try again.'
);
}
return new InternalServerErrorException(
'Failed to send invite email. Please try again.'
);
}
return inviteId;
} catch (e) {
this.logger.error('failed to invite user', e);
return new TooManyRequestsException('Server is busy');
}
return inviteId;
}

@Throttle({
Expand Down
1 change: 1 addition & 0 deletions packages/backend/server/src/fundamentals/error/index.ts
Original file line number Diff line number Diff line change
@@ -1 +1,2 @@
export * from './payment-required';
export * from './too-many-requests';
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
import { HttpException, HttpStatus } from '@nestjs/common';

export class TooManyRequestsException extends HttpException {
constructor(desc?: string, code: string = 'Too Many Requests') {
super(
HttpException.createBody(
desc ?? code,
code,
HttpStatus.TOO_MANY_REQUESTS
),
HttpStatus.TOO_MANY_REQUESTS
);
}
}
14 changes: 13 additions & 1 deletion packages/backend/server/src/fundamentals/graphql/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,12 @@ import { GraphQLError } from 'graphql';
import { Config } from '../config';
import { GQLLoggerPlugin } from './logger-plugin';

export type GraphqlContext = {
req: Request;
res: Response;
isAdminQuery: boolean;
};

@Global()
@Module({
imports: [
Expand All @@ -30,7 +36,13 @@ import { GQLLoggerPlugin } from './logger-plugin';
: '../../../schema.gql'
),
sortSchema: true,
context: ({ req, res }: { req: Request; res: Response }) => ({
context: ({
req,
res,
}: {
req: Request;
res: Response;
}): GraphqlContext => ({
req,
res,
isAdminQuery: false,
Expand Down
9 changes: 9 additions & 0 deletions packages/backend/server/src/fundamentals/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,23 @@ export {
} from './config';
export * from './error';
export { EventEmitter, type EventPayload, OnEvent } from './event';
export type { GraphqlContext } from './graphql';
export { CryptoHelper, URLHelper } from './helpers';
export { MailService } from './mailer';
export { CallCounter, CallTimer, metrics } from './metrics';
export {
BucketService,
LockGuard,
MUTEX_RETRY,
MUTEX_WAIT,
MutexService,
} from './mutex';
export {
getOptionalModuleMetadata,
GlobalExceptionFilter,
OptionalModule,
} from './nestjs';
export type { PrismaTransaction } from './prisma';
export * from './storage';
export { type StorageProvider, StorageProviderFactory } from './storage';
export { AuthThrottlerGuard, CloudThrottlerGuard, Throttle } from './throttler';
Expand Down
15 changes: 15 additions & 0 deletions packages/backend/server/src/fundamentals/mutex/bucket.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
export class BucketService {
private readonly bucket = new Map<string, string>();

get(key: string) {
return this.bucket.get(key);
}

set(key: string, value: string) {
this.bucket.set(key, value);
}

delete(key: string) {
this.bucket.delete(key);
}
}
14 changes: 14 additions & 0 deletions packages/backend/server/src/fundamentals/mutex/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
import { Global, Module } from '@nestjs/common';

import { BucketService } from './bucket';
import { MutexService } from './mutex';

@Global()
@Module({
providers: [BucketService, MutexService],
exports: [BucketService, MutexService],
})
export class MutexModule {}

export { BucketService, MutexService };
export { LockGuard, MUTEX_RETRY, MUTEX_WAIT } from './mutex';
Loading

0 comments on commit 530959b

Please sign in to comment.