Skip to content

Commit

Permalink
refactor(server): job handlers (#2572)
Browse files Browse the repository at this point in the history
* refactor(server): job handlers

* chore: remove comment

* chore: add comments for
  • Loading branch information
jrasm91 authored May 26, 2023
1 parent d6756f3 commit 1c2d83e
Show file tree
Hide file tree
Showing 33 changed files with 781 additions and 1,056 deletions.
7 changes: 5 additions & 2 deletions server/apps/immich/src/api-v1/asset/asset.core.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { AuthUserDto, IJobRepository, JobName } from '@app/domain';
import { AssetEntity, UserEntity } from '@app/infra/entities';
import { AssetEntity, AssetType, UserEntity } from '@app/infra/entities';
import { IAssetRepository } from './asset-repository';
import { CreateAssetDto, UploadFile } from './dto/create-asset.dto';
import { parse } from 'node:path';
Expand Down Expand Up @@ -43,7 +43,10 @@ export class AssetCore {
sidecarPath: sidecarFile?.originalPath || null,
});

await this.jobRepository.queue({ name: JobName.ASSET_UPLOADED, data: { asset } });
await this.jobRepository.queue({ name: JobName.GENERATE_JPEG_THUMBNAIL, data: { id: asset.id } });
if (asset.type === AssetType.VIDEO) {
await this.jobRepository.queue({ name: JobName.VIDEO_CONVERSION, data: { id: asset.id } });
}

return asset;
}
Expand Down
5 changes: 3 additions & 2 deletions server/apps/immich/src/api-v1/asset/asset.service.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -328,8 +328,9 @@ describe('AssetService', () => {
});

expect(jobMock.queue.mock.calls).toEqual([
[{ name: JobName.ASSET_UPLOADED, data: { asset: assetEntityStub.livePhotoMotionAsset } }],
[{ name: JobName.ASSET_UPLOADED, data: { asset: assetEntityStub.livePhotoStillAsset } }],
[{ name: JobName.GENERATE_JPEG_THUMBNAIL, data: { id: assetEntityStub.livePhotoMotionAsset.id } }],
[{ name: JobName.VIDEO_CONVERSION, data: { id: assetEntityStub.livePhotoMotionAsset.id } }],
[{ name: JobName.GENERATE_JPEG_THUMBNAIL, data: { id: assetEntityStub.livePhotoStillAsset.id } }],
]);
});
});
Expand Down
29 changes: 19 additions & 10 deletions server/apps/microservices/src/processor.service.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
import {
AssetService,
FacialRecognitionService,
IDeleteFilesJob,
JobItem,
JobName,
JobService,
JOBS_TO_QUEUE,
MediaService,
MetadataService,
Expand All @@ -16,12 +18,12 @@ import {
UserService,
} from '@app/domain';
import { getQueueToken } from '@nestjs/bull';
import { Injectable } from '@nestjs/common';
import { Injectable, Logger } from '@nestjs/common';
import { ModuleRef } from '@nestjs/core';
import { Queue } from 'bull';
import { MetadataExtractionProcessor } from './processors/metadata-extraction.processor';

type JobHandler<T = any> = (data: T) => void | Promise<void>;
type JobHandler<T = any> = (data: T) => boolean | Promise<boolean>;

@Injectable()
export class ProcessorService {
Expand All @@ -30,8 +32,8 @@ export class ProcessorService {
// TODO refactor to domain
private metadataProcessor: MetadataExtractionProcessor,

private assetService: AssetService,
private facialRecognitionService: FacialRecognitionService,
private jobService: JobService,
private mediaService: MediaService,
private metadataService: MetadataService,
private personService: PersonService,
Expand All @@ -43,9 +45,10 @@ export class ProcessorService {
private userService: UserService,
) {}

private logger = new Logger(ProcessorService.name);

private handlers: Record<JobName, JobHandler> = {
[JobName.ASSET_UPLOADED]: (data) => this.assetService.handleAssetUpload(data),
[JobName.DELETE_FILES]: (data) => this.storageService.handleDeleteFiles(data),
[JobName.DELETE_FILES]: (data: IDeleteFilesJob) => this.storageService.handleDeleteFiles(data),
[JobName.USER_DELETE_CHECK]: () => this.userService.handleUserDeleteCheck(),
[JobName.USER_DELETION]: (data) => this.userService.handleUserDelete(data),
[JobName.QUEUE_OBJECT_TAGGING]: (data) => this.smartInfoService.handleQueueObjectTagging(data),
Expand All @@ -71,15 +74,14 @@ export class ProcessorService {
[JobName.QUEUE_VIDEO_CONVERSION]: (data) => this.mediaService.handleQueueVideoConversion(data),
[JobName.VIDEO_CONVERSION]: (data) => this.mediaService.handleVideoConversion(data),
[JobName.QUEUE_METADATA_EXTRACTION]: (data) => this.metadataProcessor.handleQueueMetadataExtraction(data),
[JobName.EXIF_EXTRACTION]: (data) => this.metadataProcessor.extractExifInfo(data),
[JobName.EXTRACT_VIDEO_METADATA]: (data) => this.metadataProcessor.extractVideoMetadata(data),
[JobName.METADATA_EXTRACTION]: (data) => this.metadataProcessor.handleMetadataExtraction(data),
[JobName.QUEUE_RECOGNIZE_FACES]: (data) => this.facialRecognitionService.handleQueueRecognizeFaces(data),
[JobName.RECOGNIZE_FACES]: (data) => this.facialRecognitionService.handleRecognizeFaces(data),
[JobName.GENERATE_FACE_THUMBNAIL]: (data) => this.facialRecognitionService.handleGenerateFaceThumbnail(data),
[JobName.PERSON_CLEANUP]: () => this.personService.handlePersonCleanup(),
[JobName.QUEUE_SIDECAR]: (data) => this.metadataService.handleQueueSidecar(data),
[JobName.SIDECAR_DISCOVERY]: (data) => this.metadataService.handleSidecarDiscovery(data),
[JobName.SIDECAR_SYNC]: (data) => this.metadataService.handleSidecarSync(data),
[JobName.SIDECAR_SYNC]: () => this.metadataService.handleSidecarSync(),
};

async init() {
Expand All @@ -98,7 +100,14 @@ export class ProcessorService {
await queue.isReady();

queue.process(jobName, concurrency, async (job): Promise<void> => {
await handler(job.data);
try {
const success = await handler(job.data);
if (success) {
await this.jobService.onDone({ name: jobName, data: job.data } as JobItem);
}
} catch (error: Error | any) {
this.logger.error(`Unable to run job handler: ${error}`, error?.stack, job.data);
}
});
}
}
Expand Down
Loading

0 comments on commit 1c2d83e

Please sign in to comment.