Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(server): wrap read-modify-write apis with distributed lock #5979

Merged
merged 23 commits into from
Mar 15, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
4e638a3
feat: wrap read-modify-write apis with transaction
darkskygit Mar 1, 2024
9c7842a
fix: lint
darkskygit Mar 1, 2024
47e8d3b
fix: new user name test
darkskygit Mar 1, 2024
4de584c
feat: add invite member test
darkskygit Mar 1, 2024
974d3c1
feat: use cls transaction plugin
darkskygit Mar 1, 2024
216da39
feat: move direct transaction call to decorator
darkskygit Mar 1, 2024
6102177
fix: type import
darkskygit Mar 1, 2024
19e7c5b
fix: test case
darkskygit Mar 1, 2024
34b17e8
feat: distributed mutex module
darkskygit Mar 1, 2024
e87e652
fix: rebase error
darkskygit Mar 1, 2024
f3610b6
fix: typo
darkskygit Mar 4, 2024
6c497cc
feat: wrap lock in callback
darkskygit Mar 5, 2024
133b892
chore: remove useless unlock
darkskygit Mar 5, 2024
5aab2ce
feat: improve error handle for `lockWith`
darkskygit Mar 11, 2024
7151cd3
chore: use timers/promises
darkskygit Mar 11, 2024
ac3a161
chore: add comment for cls usage
darkskygit Mar 11, 2024
335d89e
feat: add `using` syntax support for mutex lock
darkskygit Mar 13, 2024
856462d
feat: use scoped request service instance replace async context
darkskygit Mar 13, 2024
51d53b5
Merge branch 'canary' into darksky/mutate-db-with-transaction
darkskygit Mar 13, 2024
63d0408
fix: unnamed user name
darkskygit Mar 13, 2024
40580a8
Merge branch 'canary' into darksky/mutate-db-with-transaction
darkskygit Mar 14, 2024
77f5517
fix: syntax error in testing
darkskygit Mar 14, 2024
01b39ba
Merge branch 'canary' into darksky/mutate-db-with-transaction
darkskygit Mar 14, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions packages/backend/server/src/app.module.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import { GqlModule } from './fundamentals/graphql';
import { HelpersModule } from './fundamentals/helpers';
import { MailModule } from './fundamentals/mailer';
import { MetricsModule } from './fundamentals/metrics';
import { MutexModule } from './fundamentals/mutex';
import { PrismaModule } from './fundamentals/prisma';
import { StorageProviderModule } from './fundamentals/storage';
import { RateLimiterModule } from './fundamentals/throttler';
Expand All @@ -39,6 +40,7 @@ export const FunctionalityModules = [
ScheduleModule.forRoot(),
EventModule,
CacheModule,
MutexModule,
PrismaModule,
MetricsModule,
RateLimiterModule,
Expand Down
5 changes: 2 additions & 3 deletions packages/backend/server/src/core/features/feature.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
import { PrismaClient } from '@prisma/client';

import { PrismaTransaction } from '../../fundamentals';
import { Feature, FeatureSchema, FeatureType } from './types';

class FeatureConfig {
Expand Down Expand Up @@ -67,7 +66,7 @@ export type FeatureConfigType<F extends FeatureType> = InstanceType<

const FeatureCache = new Map<number, FeatureConfigType<FeatureType>>();

export async function getFeature(prisma: PrismaClient, featureId: number) {
export async function getFeature(prisma: PrismaTransaction, featureId: number) {
const cachedQuota = FeatureCache.get(featureId);

if (cachedQuota) {
Expand Down
7 changes: 3 additions & 4 deletions packages/backend/server/src/core/quota/quota.ts
Original file line number Diff line number Diff line change
@@ -1,20 +1,19 @@
import { PrismaClient } from '@prisma/client';

import { PrismaTransaction } from '../../fundamentals';
import { formatDate, formatSize, Quota, QuotaSchema } from './types';

const QuotaCache = new Map<number, QuotaConfig>();

export class QuotaConfig {
readonly config: Quota;

static async get(prisma: PrismaClient, featureId: number) {
static async get(tx: PrismaTransaction, featureId: number) {
const cachedQuota = QuotaCache.get(featureId);

if (cachedQuota) {
return cachedQuota;
}

const quota = await prisma.features.findFirst({
const quota = await tx.features.findFirst({
where: {
id: featureId,
},
Expand Down
12 changes: 7 additions & 5 deletions packages/backend/server/src/core/quota/service.ts
Original file line number Diff line number Diff line change
@@ -1,13 +1,15 @@
import { Injectable } from '@nestjs/common';
import { PrismaClient } from '@prisma/client';

import { type EventPayload, OnEvent } from '../../fundamentals';
import {
type EventPayload,
OnEvent,
PrismaTransaction,
} from '../../fundamentals';
import { FeatureKind } from '../features';
import { QuotaConfig } from './quota';
import { QuotaType } from './types';

type Transaction = Parameters<Parameters<PrismaClient['$transaction']>[0]>[0];

@Injectable()
export class QuotaService {
constructor(private readonly prisma: PrismaClient) {}
Expand Down Expand Up @@ -140,8 +142,8 @@ export class QuotaService {
});
}

async hasQuota(userId: string, quota: QuotaType, transaction?: Transaction) {
const executor = transaction ?? this.prisma;
async hasQuota(userId: string, quota: QuotaType, tx?: PrismaTransaction) {
const executor = tx ?? this.prisma;

return executor.userFeatures
.count({
Expand Down
2 changes: 1 addition & 1 deletion packages/backend/server/src/core/user/service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ export class UserService {

return this.createUser({
email,
name: 'Unnamed',
name: email.split('@')[0],
...data,
});
}
Expand Down
138 changes: 77 additions & 61 deletions packages/backend/server/src/core/workspaces/resolvers/workspace.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,9 @@ import {
EventEmitter,
type FileUpload,
MailService,
MutexService,
Throttle,
TooManyRequestsException,
} from '../../../fundamentals';
import { CurrentUser, Public } from '../../auth';
import { QuotaManagementService, QuotaQueryType } from '../../quota';
Expand Down Expand Up @@ -58,7 +60,8 @@ export class WorkspaceResolver {
private readonly quota: QuotaManagementService,
private readonly users: UserService,
private readonly event: EventEmitter,
private readonly blobStorage: WorkspaceBlobStorage
private readonly blobStorage: WorkspaceBlobStorage,
private readonly mutex: MutexService
) {}

@ResolveField(() => Permission, {
Expand Down Expand Up @@ -336,74 +339,87 @@ export class WorkspaceResolver {
throw new ForbiddenException('Cannot change owner');
}

// member limit check
const [memberCount, quota] = await Promise.all([
this.prisma.workspaceUserPermission.count({
where: { workspaceId },
}),
this.quota.getWorkspaceUsage(workspaceId),
]);
if (memberCount >= quota.memberLimit) {
throw new PayloadTooLargeException('Workspace member limit reached.');
}
try {
// lock to prevent concurrent invite
const lockFlag = `invite:${workspaceId}`;
await using lock = await this.mutex.lock(lockFlag);
if (!lock) {
return new TooManyRequestsException('Server is busy');
}

let target = await this.users.findUserByEmail(email);
if (target) {
const originRecord = await this.prisma.workspaceUserPermission.findFirst({
where: {
workspaceId,
userId: target.id,
},
});
// only invite if the user is not already in the workspace
if (originRecord) return originRecord.id;
} else {
target = await this.users.createAnonymousUser(email, {
registered: false,
});
}
// member limit check
const [memberCount, quota] = await Promise.all([
this.prisma.workspaceUserPermission.count({
where: { workspaceId },
}),
this.quota.getWorkspaceUsage(workspaceId),
]);
if (memberCount >= quota.memberLimit) {
return new PayloadTooLargeException('Workspace member limit reached.');
}

const inviteId = await this.permissions.grant(
workspaceId,
target.id,
permission
);
if (sendInviteMail) {
const inviteInfo = await this.getInviteInfo(inviteId);

try {
await this.mailer.sendInviteEmail(email, inviteId, {
workspace: {
id: inviteInfo.workspace.id,
name: inviteInfo.workspace.name,
avatar: inviteInfo.workspace.avatar,
},
user: {
avatar: inviteInfo.user?.avatarUrl || '',
name: inviteInfo.user?.name || '',
},
let target = await this.users.findUserByEmail(email);
if (target) {
const originRecord =
await this.prisma.workspaceUserPermission.findFirst({
where: {
workspaceId,
userId: target.id,
},
});
// only invite if the user is not already in the workspace
if (originRecord) return originRecord.id;
} else {
target = await this.users.createAnonymousUser(email, {
registered: false,
});
} catch (e) {
const ret = await this.permissions.revokeWorkspace(
workspaceId,
target.id
);

if (!ret) {
this.logger.fatal(
`failed to send ${workspaceId} invite email to ${email} and failed to revoke permission: ${inviteId}, ${e}`
}

const inviteId = await this.permissions.grant(
workspaceId,
target.id,
permission
);
if (sendInviteMail) {
const inviteInfo = await this.getInviteInfo(inviteId);

try {
await this.mailer.sendInviteEmail(email, inviteId, {
workspace: {
id: inviteInfo.workspace.id,
name: inviteInfo.workspace.name,
avatar: inviteInfo.workspace.avatar,
},
user: {
avatar: inviteInfo.user?.avatarUrl || '',
name: inviteInfo.user?.name || '',
},
});
} catch (e) {
const ret = await this.permissions.revokeWorkspace(
workspaceId,
target.id
);
} else {
this.logger.warn(
`failed to send ${workspaceId} invite email to ${email}, but successfully revoked permission: ${e}`

if (!ret) {
this.logger.fatal(
`failed to send ${workspaceId} invite email to ${email} and failed to revoke permission: ${inviteId}, ${e}`
);
} else {
this.logger.warn(
`failed to send ${workspaceId} invite email to ${email}, but successfully revoked permission: ${e}`
);
}
return new InternalServerErrorException(
'Failed to send invite email. Please try again.'
);
}
return new InternalServerErrorException(
'Failed to send invite email. Please try again.'
);
}
return inviteId;
} catch (e) {
this.logger.error('failed to invite user', e);
return new TooManyRequestsException('Server is busy');
}
return inviteId;
}

@Throttle({
Expand Down
1 change: 1 addition & 0 deletions packages/backend/server/src/fundamentals/error/index.ts
Original file line number Diff line number Diff line change
@@ -1 +1,2 @@
export * from './payment-required';
export * from './too-many-requests';
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
import { HttpException, HttpStatus } from '@nestjs/common';

export class TooManyRequestsException extends HttpException {
constructor(desc?: string, code: string = 'Too Many Requests') {
super(
HttpException.createBody(
desc ?? code,
code,
HttpStatus.TOO_MANY_REQUESTS
),
HttpStatus.TOO_MANY_REQUESTS
);
}
}
14 changes: 13 additions & 1 deletion packages/backend/server/src/fundamentals/graphql/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,12 @@ import { GraphQLError } from 'graphql';
import { Config } from '../config';
import { GQLLoggerPlugin } from './logger-plugin';

export type GraphqlContext = {
req: Request;
res: Response;
isAdminQuery: boolean;
};

@Global()
@Module({
imports: [
Expand All @@ -30,7 +36,13 @@ import { GQLLoggerPlugin } from './logger-plugin';
: '../../../schema.gql'
),
sortSchema: true,
context: ({ req, res }: { req: Request; res: Response }) => ({
context: ({
req,
res,
}: {
req: Request;
res: Response;
}): GraphqlContext => ({
req,
res,
isAdminQuery: false,
Expand Down
9 changes: 9 additions & 0 deletions packages/backend/server/src/fundamentals/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,23 @@ export {
} from './config';
export * from './error';
export { EventEmitter, type EventPayload, OnEvent } from './event';
export type { GraphqlContext } from './graphql';
export { CryptoHelper, URLHelper } from './helpers';
export { MailService } from './mailer';
export { CallCounter, CallTimer, metrics } from './metrics';
export {
BucketService,
LockGuard,
MUTEX_RETRY,
MUTEX_WAIT,
MutexService,
} from './mutex';
export {
getOptionalModuleMetadata,
GlobalExceptionFilter,
OptionalModule,
} from './nestjs';
export type { PrismaTransaction } from './prisma';
export * from './storage';
export { type StorageProvider, StorageProviderFactory } from './storage';
export { AuthThrottlerGuard, CloudThrottlerGuard, Throttle } from './throttler';
Expand Down
15 changes: 15 additions & 0 deletions packages/backend/server/src/fundamentals/mutex/bucket.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
export class BucketService {
private readonly bucket = new Map<string, string>();

get(key: string) {
return this.bucket.get(key);
}

set(key: string, value: string) {
this.bucket.set(key, value);
}

delete(key: string) {
this.bucket.delete(key);
}
}
14 changes: 14 additions & 0 deletions packages/backend/server/src/fundamentals/mutex/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
import { Global, Module } from '@nestjs/common';

import { BucketService } from './bucket';
import { MutexService } from './mutex';

@Global()
@Module({
providers: [BucketService, MutexService],
exports: [BucketService, MutexService],
})
export class MutexModule {}

export { BucketService, MutexService };
export { LockGuard, MUTEX_RETRY, MUTEX_WAIT } from './mutex';
Loading
Loading