Skip to content

Commit

Permalink
Merge pull request #954 from funmusicplace/various-fixes
Browse files Browse the repository at this point in the history
feat: queue and job for moving files from minio to backblaze
  • Loading branch information
simonv3 authored Jan 2, 2025
2 parents 8f084f6 + f901c5e commit 2a67d20
Show file tree
Hide file tree
Showing 5 changed files with 123 additions and 1 deletion.
1 change: 1 addition & 0 deletions client/src/components/Admin/CallServerTasks.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ export const Admin: React.FC = () => {
<label>{t("whatTaskCall")}</label>
<SelectEl {...methods.register("jobName")}>
<option value="cleanUpFiles">cleanUpFiles</option>
<option value="moveBucketToBackblaze">moveBucketToBackblaze</option>
<option value="initiateUserNotifications">
initiateUserNotifications
</option>
Expand Down
2 changes: 1 addition & 1 deletion src/jobs/optimize-image.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ const { defaultOptions, config: sharpConfig } = tempSharpConfig;

const { SIGHTENGINE_USER, SIGHTENGINE_SECRET } = process.env;

const sleep = (ms: number) =>
export const sleep = (ms: number) =>
new Promise((resolve) => {
return setTimeout(resolve, ms);
});
Expand Down
25 changes: 25 additions & 0 deletions src/jobs/queue-worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,10 @@ import sendMail from "./send-mail";
import "../queues/send-mail-queue";

import { REDIS_CONFIG } from "../config/redis";
import {
moveFilesToBackblazeJob,
moveFilesToBackBlazeQueue,
} from "../queues/moving-files-to-backblaze";

export const logger = winston.createLogger({
level: "info",
Expand Down Expand Up @@ -49,6 +53,7 @@ yargs // eslint-disable-line
generateAlbumQueueWorker();
sendMailQueue();
cleanUpFilesQueue();
moveFilesToBackBlazeWorker();
})
.help().argv;

Expand Down Expand Up @@ -145,6 +150,26 @@ export async function cleanUpFilesQueue() {
cleanUpOldFilesJob,
workerOptions
);
logger.info("Move Files From Minio to BackBlaze worker started");

worker.on("completed", (job: any) => {
logger.info("completed:clean-up-old-files");
});

worker.on("failed", (job: any, err: any) => {
logger.error("failed:clean-up-old-files", err);
});

worker.on("error", (err: any) => {
logger.error("error:clean-up-old-files", err);
});
}
export async function moveFilesToBackBlazeWorker() {
const worker = new Worker(
"move-file-to-backblaze",
moveFilesToBackblazeJob,
workerOptions
);
logger.info("Clean Up Files worker started");

worker.on("completed", (job: any) => {
Expand Down
87 changes: 87 additions & 0 deletions src/queues/moving-files-to-backblaze.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
import { Job, Queue, QueueEvents } from "bullmq";
import { REDIS_CONFIG } from "../config/redis";
import { promises as fsPromises } from "fs";
import logger from "../logger";
import {
getBufferFromMinio,
getObjectList,
minioClient,
removeObjectsFromBucket,
trackGroupFormatBucket,
uploadWrapper,
} from "../utils/minio";
import { sleep } from "../jobs/optimize-image";

const queueOptions = {
prefix: "mirlo",
connection: REDIS_CONFIG,
};

export const moveFilesToBackBlazeQueue = new Queue(
"move-file-to-backblaze",
queueOptions
);

export const moveFilesToBackBlazeQueueEvents = new QueueEvents(
"move-file-to-backblaze",
queueOptions
);

moveFilesToBackBlazeQueueEvents.on(
"completed",
async (result: { jobId: string; returnvalue?: any }) => {
logger.info(
`Job with id ${JSON.stringify(
result.jobId
)} has been completed, ${JSON.stringify(result.returnvalue)}`
);
}
);

moveFilesToBackBlazeQueueEvents.on(
"stalled",
async (result: { jobId: string }) => {
logger.info(`jobId ${result.jobId} stalled: that's a bummer`);
}
);

moveFilesToBackBlazeQueueEvents.on("error", async (error) => {
logger.error(`jobId ${JSON.stringify(error)} had an error`);
});

export const startMovingFiles = async (bucketName: string) => {
const files = await getObjectList(bucketName, "");
files.map(async (file, i) => {
try {
console.log("adding to queue", file.name);
moveFilesToBackBlazeQueue.add(
"move-file-to-backblaze",
{
bucketName,
fileName: file.name,
},
{
delay: i * 1500,
}
);
} catch (e) {
console.error(e);
logger.error("Error adding to queue");
}
});
};

export const moveFilesToBackblazeJob = async (job: Job) => {
const bucketName = job.data.bucketName;
const fileName = job.data.fileName;

logger.info(`moving file: bucket: ${bucketName}, ${fileName}`);
try {
const stream = await minioClient.getObject(bucketName, fileName);
await uploadWrapper(bucketName, fileName, stream);
} catch (e) {
console.error(e);
logger.error("Error moving file");
}
logger.info(`done transfering ${bucketName}/${fileName}`);
};
9 changes: 9 additions & 0 deletions src/routers/v1/admin/tasks.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import { NextFunction, Request, Response } from "express";
import { userAuthenticated, userHasPermission } from "../../../auth/passport";
import cleanUpFiles from "../../../jobs/tasks/clean-up-files";
import initiateUserNotifcations from "../../../jobs/tasks/initiate-user-notifications";
import { startMovingFiles } from "../../../queues/moving-files-to-backblaze";

export default function () {
const operations = {
Expand All @@ -17,6 +18,14 @@ export default function () {
await cleanUpFiles(jobParam);
result[jobName] = "Success";
}
if (
jobName === "moveBucketToBackblaze" &&
typeof jobParam === "string" &&
["artist-avatars"].includes(jobParam)
) {
await startMovingFiles(jobParam);
result[jobName] = "Success";
}
if (jobName === "initiateUserNotifications") {
await initiateUserNotifcations();
result[jobName] = "Success";
Expand Down

0 comments on commit 2a67d20

Please sign in to comment.