Skip to content

Commit

Permalink
Merge pull request #75 from anatawa12/pick-redis-push-tl
Browse files Browse the repository at this point in the history
  • Loading branch information
anatawa12 authored Oct 13, 2023
2 parents 1a4200c + dc024b4 commit 93d80c8
Show file tree
Hide file tree
Showing 7 changed files with 232 additions and 4 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@
- Fix: Renoteの省略表示でセンシティブチャンネルの自動CWが聞かない問題

### Server
- 2023.10.x向けのTLを内部的に構築するようになりました
- これによりこの2023.10.x以降に更新したあとも2023.9.3-kinel.4更新後のnoteが見れるようになります
- 本来の2023.10.xでは更新以前のnoteがTLで見えないという仕様がありました。

## 2023.9.3-kinel.3 (unreleased)

Expand Down
8 changes: 7 additions & 1 deletion packages/backend/src/config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,8 @@ type Source = {
abuseDiscordHook?: string;
disableAbuseRepository?: boolean;
maxWebImageSize?: number;
withRepliesInHomeTL?: boolean;
withRepliesInUserList?: boolean;
}
};

Expand Down Expand Up @@ -176,6 +178,8 @@ export type Config = {
abuseDiscordHook?: string;
disableAbuseRepository?: boolean;
maxWebImageSize?: number;
withRepliesInHomeTL?: boolean,
withRepliesInUserList: boolean,
}
};

Expand Down Expand Up @@ -219,7 +223,9 @@ export function loadConfig(): Config {

return {
// to avoid merge conflict in the future, this is at top
nirila: config.nirila ?? {},
nirila: Object.assign({
withRepliesInUserList: true,
}, config.nirila ?? {}),
version,
url: url.origin,
port: config.port ?? parseInt(process.env.PORT ?? '', 10),
Expand Down
3 changes: 3 additions & 0 deletions packages/backend/src/core/AntennaService.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import type { AntennasRepository, UserListJoiningsRepository } from '@/models/_.
import { UtilityService } from '@/core/UtilityService.js';
import { bindThis } from '@/decorators.js';
import type { GlobalEvents } from '@/core/GlobalEventService.js';
import { RedisTimelineService } from '@/core/RedisTimelineService.js';
import type { OnApplicationShutdown } from '@nestjs/common';

@Injectable()
Expand All @@ -38,6 +39,7 @@ export class AntennaService implements OnApplicationShutdown {

private utilityService: UtilityService,
private globalEventService: GlobalEventService,
private redisTimelineService: RedisTimelineService,
) {
this.antennasFetched = false;
this.antennas = [];
Expand Down Expand Up @@ -90,6 +92,7 @@ export class AntennaService implements OnApplicationShutdown {
'*',
'note', note.id);

this.redisTimelineService.push(`antennaTimeline:${antenna.id}`, note.id, 200, redisPipeline);
this.globalEventService.publishAntennaStream(antenna.id, 'note', note);
}

Expand Down
6 changes: 6 additions & 0 deletions packages/backend/src/core/CoreModule.ts
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ import { UtilityService } from './UtilityService.js';
import { FileInfoService } from './FileInfoService.js';
import { SearchService } from './SearchService.js';
import { ClipService } from './ClipService.js';
import { RedisTimelineService } from './RedisTimelineService.js';
import { ChartLoggerService } from './chart/ChartLoggerService.js';
import FederationChart from './chart/charts/federation.js';
import NotesChart from './chart/charts/notes.js';
Expand Down Expand Up @@ -186,6 +187,7 @@ const $UtilityService: Provider = { provide: 'UtilityService', useExisting: Util
const $FileInfoService: Provider = { provide: 'FileInfoService', useExisting: FileInfoService };
const $SearchService: Provider = { provide: 'SearchService', useExisting: SearchService };
const $ClipService: Provider = { provide: 'ClipService', useExisting: ClipService };
const $RedisTimelineService: Provider = { provide: 'RedisTimelineService', useExisting: RedisTimelineService };

const $ChartLoggerService: Provider = { provide: 'ChartLoggerService', useExisting: ChartLoggerService };
const $FederationChart: Provider = { provide: 'FederationChart', useExisting: FederationChart };
Expand Down Expand Up @@ -316,6 +318,7 @@ const $ApQuestionService: Provider = { provide: 'ApQuestionService', useExisting
FileInfoService,
SearchService,
ClipService,
RedisTimelineService,
ChartLoggerService,
FederationChart,
NotesChart,
Expand Down Expand Up @@ -440,6 +443,7 @@ const $ApQuestionService: Provider = { provide: 'ApQuestionService', useExisting
$FileInfoService,
$SearchService,
$ClipService,
$RedisTimelineService,
$ChartLoggerService,
$FederationChart,
$NotesChart,
Expand Down Expand Up @@ -564,6 +568,7 @@ const $ApQuestionService: Provider = { provide: 'ApQuestionService', useExisting
FileInfoService,
SearchService,
ClipService,
RedisTimelineService,
FederationChart,
NotesChart,
UsersChart,
Expand Down Expand Up @@ -687,6 +692,7 @@ const $ApQuestionService: Provider = { provide: 'ApQuestionService', useExisting
$FileInfoService,
$SearchService,
$ClipService,
$RedisTimelineService,
$FederationChart,
$NotesChart,
$UsersChart,
Expand Down
132 changes: 130 additions & 2 deletions packages/backend/src/core/NoteCreateService.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

import { setImmediate } from 'node:timers/promises';
import * as mfm from 'mfm-js';
import { In, DataSource } from 'typeorm';
import { In, DataSource, IsNull, LessThan } from 'typeorm';
import * as Redis from 'ioredis';
import { Inject, Injectable, OnApplicationShutdown } from '@nestjs/common';
import RE2 from 're2';
Expand All @@ -14,7 +14,7 @@ import { extractCustomEmojisFromMfm } from '@/misc/extract-custom-emojis-from-mf
import { extractHashtags } from '@/misc/extract-hashtags.js';
import type { IMentionedRemoteUsers } from '@/models/Note.js';
import { MiNote } from '@/models/Note.js';
import type { ChannelsRepository, FollowingsRepository, InstancesRepository, MutedNotesRepository, MutingsRepository, NotesRepository, NoteThreadMutingsRepository, UserProfilesRepository, UsersRepository } from '@/models/_.js';
import type { ChannelFollowingsRepository, ChannelsRepository, FollowingsRepository, InstancesRepository, MutedNotesRepository, MiFollowing, MutingsRepository, NotesRepository, NoteThreadMutingsRepository, UserListJoiningsRepository, UserProfilesRepository, UsersRepository } from '@/models/_.js';
import type { MiDriveFile } from '@/models/DriveFile.js';
import type { MiApp } from '@/models/App.js';
import { concat } from '@/misc/prelude/array.js';
Expand Down Expand Up @@ -53,6 +53,7 @@ import { DB_MAX_NOTE_TEXT_LENGTH } from '@/const.js';
import { RoleService } from '@/core/RoleService.js';
import { MetaService } from '@/core/MetaService.js';
import { SearchService } from '@/core/SearchService.js';
import { RedisTimelineService } from '@/core/RedisTimelineService.js';

const mutedWordsCache = new MemorySingleCache<{ userId: MiUserProfile['userId']; mutedWords: MiUserProfile['mutedWords']; }[]>(1000 * 60 * 5);

Expand Down Expand Up @@ -175,6 +176,9 @@ export class NoteCreateService implements OnApplicationShutdown {
@Inject(DI.userProfilesRepository)
private userProfilesRepository: UserProfilesRepository,

@Inject(DI.userListJoiningsRepository)
private userListJoiningsRepository: UserListJoiningsRepository,

@Inject(DI.mutedNotesRepository)
private mutedNotesRepository: MutedNotesRepository,

Expand All @@ -187,11 +191,15 @@ export class NoteCreateService implements OnApplicationShutdown {
@Inject(DI.followingsRepository)
private followingsRepository: FollowingsRepository,

@Inject(DI.channelFollowingsRepository)
private channelFollowingsRepository: ChannelFollowingsRepository,

private userEntityService: UserEntityService,
private noteEntityService: NoteEntityService,
private idService: IdService,
private globalEventService: GlobalEventService,
private queueService: QueueService,
private redisTimelineService: RedisTimelineService,
private noteReadService: NoteReadService,
private notificationService: NotificationService,
private relayService: RelayService,
Expand Down Expand Up @@ -480,6 +488,8 @@ export class NoteCreateService implements OnApplicationShutdown {
// Increment notes count (user)
this.incNotesCountOfUser(user);

this.pushToTl(note, user);

// Word mute
mutedWordsCache.fetch(() => this.userProfilesRepository.find({
where: {
Expand Down Expand Up @@ -812,6 +822,124 @@ export class NoteCreateService implements OnApplicationShutdown {
return mentionedUsers;
}

@bindThis
private async pushToTl(note: MiNote, user: { id: MiUser['id']; host: MiUser['host']; }) {
const meta = await this.metaService.fetch();

const r = this.redisClient.pipeline();

if (note.channelId) {
this.redisTimelineService.push(`channelTimeline:${note.channelId}`, note.id, this.config.perChannelMaxNoteCacheCount, r);

this.redisTimelineService.push(`userTimelineWithChannel:${user.id}`, note.id, 300, r);

const channelFollowings = await this.channelFollowingsRepository.find({
where: {
followeeId: note.channelId,
},
select: ['followerId'],
});

for (const channelFollowing of channelFollowings) {
this.redisTimelineService.push(`homeTimeline:${channelFollowing.followerId}`, note.id, 300, r);
if (note.fileIds.length > 0) {
this.redisTimelineService.push(`homeTimelineWithFiles:${channelFollowing.followerId}`, note.id, 300 / 2, r);
}
}
} else {
// TODO: キャッシュ?
// eslint-disable-next-line prefer-const
let [followings, userListMemberships] = await Promise.all([
this.followingsRepository.find({
where: {
followeeId: user.id,
followerHost: IsNull(),
},
select: ['followerId'],
}),
this.userListJoiningsRepository.find({
where: {
userId: user.id,
},
select: ['userList', 'userListId'],
relations: ['userList'],
}),
]);

if (note.visibility === 'followers') {
// TODO: 重そうだから何とかしたい Set 使う?
userListMemberships = userListMemberships.filter(x => followings.some(f => f.followerId === x.userList!.userId));
}

// TODO: あまりにも数が多いと redisPipeline.exec に失敗する(理由は不明)ため、3万件程度を目安に分割して実行するようにする
for (const following of followings) {
// 基本的にvisibleUserIdsには自身のidが含まれている前提であること
if (note.visibility === 'specified' && !note.visibleUserIds.some(v => v === following.followerId)) continue;

// 「自分自身への返信 or そのフォロワーへの返信」のどちらでもない場合
if (note.replyId && !(note.replyUserId === note.userId || note.replyUserId === following.followerId)) {
if (!this.config.nirila.withRepliesInHomeTL) continue;
}

this.redisTimelineService.push(`homeTimeline:${following.followerId}`, note.id, 300, r);
if (note.fileIds.length > 0) {
this.redisTimelineService.push(`homeTimelineWithFiles:${following.followerId}`, note.id, 300 / 2, r);
}
}

for (const userListMembership of userListMemberships) {
// ダイレクトのとき、そのリストが対象外のユーザーの場合
if (
note.visibility === 'specified' &&
!note.visibleUserIds.some(v => v === userListMembership.userList!.userId)
) continue;

// 「自分自身への返信 or そのリストの作成者への返信」のどちらでもない場合
if (note.replyId && !(note.replyUserId === note.userId || note.replyUserId === userListMembership.userList!.userId)) {
if (!this.config.nirila.withRepliesInHomeTL) continue;
}

this.redisTimelineService.push(`userListTimeline:${userListMembership.userListId}`, note.id, 300, r);
if (note.fileIds.length > 0) {
this.redisTimelineService.push(`userListTimelineWithFiles:${userListMembership.userListId}`, note.id, 300 / 2, r);
}
}

if (note.visibility !== 'specified' || !note.visibleUserIds.some(v => v === user.id)) { // 自分自身のHTL
this.redisTimelineService.push(`homeTimeline:${user.id}`, note.id, 300, r);
if (note.fileIds.length > 0) {
this.redisTimelineService.push(`homeTimelineWithFiles:${user.id}`, note.id, 300 / 2, r);
}
}

// 自分自身以外への返信
if (note.replyId && note.replyUserId !== note.userId) {
this.redisTimelineService.push(`userTimelineWithReplies:${user.id}`, note.id, 300, r);

if (note.visibility === 'public' && note.userHost == null) {
this.redisTimelineService.push('localTimelineWithReplies', note.id, 300, r);
}
} else {
this.redisTimelineService.push(`userTimeline:${user.id}`, note.id, 300, r);
if (note.fileIds.length > 0) {
this.redisTimelineService.push(`userTimelineWithFiles:${user.id}`, note.id, 300 / 2, r);
}

if (note.visibility === 'public' && note.userHost == null) {
this.redisTimelineService.push('localTimeline', note.id, 1000, r);
if (note.fileIds.length > 0) {
this.redisTimelineService.push('localTimelineWithFiles', note.id, 500, r);
}
}
}

if (Math.random() < 0.1) {
}
}

r.exec();
}

@bindThis
public dispose(): void {
this.#shutdownController.abort();
Expand Down
80 changes: 80 additions & 0 deletions packages/backend/src/core/RedisTimelineService.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
/*
* SPDX-FileCopyrightText: syuilo and other misskey contributors
* SPDX-License-Identifier: AGPL-3.0-only
*/

import { Inject, Injectable } from '@nestjs/common';
import * as Redis from 'ioredis';
import { DI } from '@/di-symbols.js';
import { bindThis } from '@/decorators.js';
import { IdService } from '@/core/IdService.js';

@Injectable()
export class RedisTimelineService {
constructor(
@Inject(DI.redis)
private redisClient: Redis.Redis,

private idService: IdService,
) {
}

@bindThis
public push(tl: string, id: string, maxlen: number, pipeline: Redis.ChainableCommander) {
// リモートから遅れて届いた(もしくは後から追加された)投稿日時が古い投稿が追加されるとページネーション時に問題を引き起こすため、
// 3分以内に投稿されたものでない場合、Redisにある最古のIDより新しい場合のみ追加する
if (this.idService.parse(id).date.getTime() > Date.now() - 1000 * 60 * 3) {
pipeline.lpush('list:' + tl, id);
if (Math.random() < 0.1) { // 10%の確率でトリム
pipeline.ltrim('list:' + tl, 0, maxlen - 1);
}
} else {
// 末尾のIDを取得
this.redisClient.lindex('list:' + tl, -1).then(lastId => {
if (lastId == null || (this.idService.parse(id).date.getTime() > this.idService.parse(lastId).date.getTime())) {
this.redisClient.lpush('list:' + tl, id);
} else {
Promise.resolve();
}
});
}
}

@bindThis
public get(name: string, untilId?: string | null, sinceId?: string | null) {
if (untilId && sinceId) {
return this.redisClient.lrange('list:' + name, 0, -1)
.then(ids => ids.filter(id => id < untilId && id > sinceId).sort((a, b) => a > b ? -1 : 1));
} else if (untilId) {
return this.redisClient.lrange('list:' + name, 0, -1)
.then(ids => ids.filter(id => id < untilId).sort((a, b) => a > b ? -1 : 1));
} else if (sinceId) {
return this.redisClient.lrange('list:' + name, 0, -1)
.then(ids => ids.filter(id => id > sinceId).sort((a, b) => a < b ? -1 : 1));
} else {
return this.redisClient.lrange('list:' + name, 0, -1)
.then(ids => ids.sort((a, b) => a > b ? -1 : 1));
}
}

@bindThis
public getMulti(name: string[], untilId?: string | null, sinceId?: string | null): Promise<string[][]> {
const pipeline = this.redisClient.pipeline();
for (const n of name) {
pipeline.lrange('list:' + n, 0, -1);
}
return pipeline.exec().then(res => {
if (res == null) return [];
const tls = res.map(r => r[1] as string[]);
return tls.map(ids =>
(untilId && sinceId)
? ids.filter(id => id < untilId && id > sinceId).sort((a, b) => a > b ? -1 : 1)
: untilId
? ids.filter(id => id < untilId).sort((a, b) => a > b ? -1 : 1)
: sinceId
? ids.filter(id => id > sinceId).sort((a, b) => a < b ? -1 : 1)
: ids.sort((a, b) => a > b ? -1 : 1),
);
});
}
}
Loading

0 comments on commit 93d80c8

Please sign in to comment.