Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(server): bull jobs #2569

Merged
merged 5 commits into from
May 26, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 5 additions & 3 deletions server/apps/microservices/src/main.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
import { getLogLevels, SERVER_VERSION } from '@app/domain';
import { RedisIoAdapter } from '@app/infra';
import { Logger } from '@nestjs/common';
import { NestFactory } from '@nestjs/core';
import { SERVER_VERSION } from '@app/domain';
import { getLogLevels } from '@app/domain';
import { RedisIoAdapter } from '@app/infra';
import { MicroservicesModule } from './microservices.module';
import { ProcessorService } from './processor.service';
import { MetadataExtractionProcessor } from './processors/metadata-extraction.processor';

const logger = new Logger('ImmichMicroservice');
Expand All @@ -15,6 +15,8 @@ async function bootstrap() {

const listeningPort = Number(process.env.MICROSERVICES_PORT) || 3002;

await app.get(ProcessorService).init();

app.useWebSocketAdapter(new RedisIoAdapter(app));

const metadataService = app.get(MetadataExtractionProcessor);
Expand Down
26 changes: 3 additions & 23 deletions server/apps/microservices/src/microservices.module.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,35 +3,15 @@ import { InfraModule } from '@app/infra';
import { ExifEntity } from '@app/infra/entities';
import { Module } from '@nestjs/common';
import { TypeOrmModule } from '@nestjs/typeorm';
import {
BackgroundTaskProcessor,
ClipEncodingProcessor,
FacialRecognitionProcessor,
ObjectTaggingProcessor,
SearchIndexProcessor,
StorageTemplateMigrationProcessor,
ThumbnailGeneratorProcessor,
VideoTranscodeProcessor,
} from './processors';
import { MetadataExtractionProcessor, SidecarProcessor } from './processors/metadata-extraction.processor';
import { ProcessorService } from './processor.service';
import { MetadataExtractionProcessor } from './processors/metadata-extraction.processor';

@Module({
imports: [
//
DomainModule.register({ imports: [InfraModule] }),
TypeOrmModule.forFeature([ExifEntity]),
],
providers: [
ThumbnailGeneratorProcessor,
MetadataExtractionProcessor,
VideoTranscodeProcessor,
ObjectTaggingProcessor,
ClipEncodingProcessor,
StorageTemplateMigrationProcessor,
BackgroundTaskProcessor,
SearchIndexProcessor,
FacialRecognitionProcessor,
SidecarProcessor,
],
providers: [MetadataExtractionProcessor, ProcessorService],
})
export class MicroservicesModule {}
105 changes: 105 additions & 0 deletions server/apps/microservices/src/processor.service.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
import {
AssetService,
FacialRecognitionService,
JobName,
JOBS_TO_QUEUE,
MediaService,
MetadataService,
PersonService,
QueueName,
QUEUE_TO_CONCURRENCY,
SearchService,
SmartInfoService,
StorageService,
StorageTemplateService,
SystemConfigService,
UserService,
} from '@app/domain';
import { getQueueToken } from '@nestjs/bull';
import { Injectable } from '@nestjs/common';
import { ModuleRef } from '@nestjs/core';
import { Queue } from 'bull';
import { MetadataExtractionProcessor } from './processors/metadata-extraction.processor';

type JobHandler<T = any> = (data: T) => void | Promise<void>;

@Injectable()
export class ProcessorService {
constructor(
private moduleRef: ModuleRef,
// TODO refactor to domain
private metadataProcessor: MetadataExtractionProcessor,

private assetService: AssetService,
private facialRecognitionService: FacialRecognitionService,
private mediaService: MediaService,
private metadataService: MetadataService,
private personService: PersonService,
private searchService: SearchService,
private smartInfoService: SmartInfoService,
private storageTemplateService: StorageTemplateService,
private storageService: StorageService,
private systemConfigService: SystemConfigService,
private userService: UserService,
) {}

private handlers: Record<JobName, JobHandler> = {
[JobName.ASSET_UPLOADED]: (data) => this.assetService.handleAssetUpload(data),
[JobName.DELETE_FILES]: (data) => this.storageService.handleDeleteFiles(data),
[JobName.USER_DELETE_CHECK]: () => this.userService.handleUserDeleteCheck(),
[JobName.USER_DELETION]: (data) => this.userService.handleUserDelete(data),
[JobName.QUEUE_OBJECT_TAGGING]: (data) => this.smartInfoService.handleQueueObjectTagging(data),
[JobName.DETECT_OBJECTS]: (data) => this.smartInfoService.handleDetectObjects(data),
[JobName.CLASSIFY_IMAGE]: (data) => this.smartInfoService.handleClassifyImage(data),
[JobName.QUEUE_ENCODE_CLIP]: (data) => this.smartInfoService.handleQueueEncodeClip(data),
[JobName.ENCODE_CLIP]: (data) => this.smartInfoService.handleEncodeClip(data),
[JobName.SEARCH_INDEX_ALBUMS]: () => this.searchService.handleIndexAlbums(),
[JobName.SEARCH_INDEX_ASSETS]: () => this.searchService.handleIndexAssets(),
[JobName.SEARCH_INDEX_FACES]: () => this.searchService.handleIndexFaces(),
[JobName.SEARCH_INDEX_ALBUM]: (data) => this.searchService.handleIndexAlbum(data),
[JobName.SEARCH_INDEX_ASSET]: (data) => this.searchService.handleIndexAsset(data),
[JobName.SEARCH_INDEX_FACE]: (data) => this.searchService.handleIndexFace(data),
[JobName.SEARCH_REMOVE_ALBUM]: (data) => this.searchService.handleRemoveAlbum(data),
[JobName.SEARCH_REMOVE_ASSET]: (data) => this.searchService.handleRemoveAsset(data),
[JobName.SEARCH_REMOVE_FACE]: (data) => this.searchService.handleRemoveFace(data),
[JobName.STORAGE_TEMPLATE_MIGRATION]: () => this.storageTemplateService.handleMigration(),
[JobName.STORAGE_TEMPLATE_MIGRATION_SINGLE]: (data) => this.storageTemplateService.handleMigrationSingle(data),
[JobName.SYSTEM_CONFIG_CHANGE]: () => this.systemConfigService.refreshConfig(),
[JobName.QUEUE_GENERATE_THUMBNAILS]: (data) => this.mediaService.handleQueueGenerateThumbnails(data),
[JobName.GENERATE_JPEG_THUMBNAIL]: (data) => this.mediaService.handleGenerateJpegThumbnail(data),
[JobName.GENERATE_WEBP_THUMBNAIL]: (data) => this.mediaService.handleGenerateWepbThumbnail(data),
[JobName.QUEUE_VIDEO_CONVERSION]: (data) => this.mediaService.handleQueueVideoConversion(data),
[JobName.VIDEO_CONVERSION]: (data) => this.mediaService.handleVideoConversion(data),
[JobName.QUEUE_METADATA_EXTRACTION]: (data) => this.metadataProcessor.handleQueueMetadataExtraction(data),
[JobName.EXIF_EXTRACTION]: (data) => this.metadataProcessor.extractExifInfo(data),
[JobName.EXTRACT_VIDEO_METADATA]: (data) => this.metadataProcessor.extractVideoMetadata(data),
[JobName.QUEUE_RECOGNIZE_FACES]: (data) => this.facialRecognitionService.handleQueueRecognizeFaces(data),
[JobName.RECOGNIZE_FACES]: (data) => this.facialRecognitionService.handleRecognizeFaces(data),
[JobName.GENERATE_FACE_THUMBNAIL]: (data) => this.facialRecognitionService.handleGenerateFaceThumbnail(data),
[JobName.PERSON_CLEANUP]: () => this.personService.handlePersonCleanup(),
[JobName.QUEUE_SIDECAR]: (data) => this.metadataService.handleQueueSidecar(data),
[JobName.SIDECAR_DISCOVERY]: (data) => this.metadataService.handleSidecarDiscovery(data),
[JobName.SIDECAR_SYNC]: (data) => this.metadataService.handleSidecarSync(data),
};

async init() {
const queueSeen: Partial<Record<QueueName, boolean>> = {};

for (const jobName of Object.values(JobName)) {
const handler = this.handlers[jobName];
const queueName = JOBS_TO_QUEUE[jobName];
const queue = this.moduleRef.get<Queue>(getQueueToken(queueName), { strict: false });

// only set concurrency on the first job for a queue, since concurrency stacks
const seen = queueSeen[queueName];
const concurrency = seen ? 0 : QUEUE_TO_CONCURRENCY[queueName];
queueSeen[queueName] = true;

await queue.isReady();

queue.process(jobName, concurrency, async (job): Promise<void> => {
await handler(job.data);
});
}
}
}
219 changes: 0 additions & 219 deletions server/apps/microservices/src/processors.ts

This file was deleted.

Loading