fix: Only update bookmark tagging/crawling status when worker is out of retries
MohamedBassem committed Nov 9, 2024
1 parent 10070c1 commit f8bed57
Showing 5 changed files with 26 additions and 19 deletions.
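
The change shared by all five workers is that the failure-status side effect in onError now runs only when the job has exhausted its retries, so a transient failure that will be retried no longer marks the bookmark as "failure" prematurely. A minimal sketch of the pattern in TypeScript (DequeuedJobLike is a simplified stand-in for the real DequeuedJob type, and changeBookmarkStatus mirrors the helper used in crawlerWorker.ts):

// Sketch only: simplified shape of a dequeued job as used by the workers below.
interface DequeuedJobLike<T> {
  id: string;
  data: T;
  error?: Error;
  numRetriesLeft: number;
}

// Mirrors the onError logic this commit introduces in crawlerWorker.ts:
// the failure status is persisted only on the final attempt, when no retries remain.
async function onCrawlerError(
  job: DequeuedJobLike<{ bookmarkId?: string }>,
  changeBookmarkStatus: (id: string, status: "success" | "failure") => Promise<void>,
) {
  const bookmarkId = job.data.bookmarkId;
  if (bookmarkId && job.numRetriesLeft === 0) {
    await changeBookmarkStatus(bookmarkId, "failure");
  }
}

The success path is unchanged: onComplete still records success on the first successful attempt; only the failure path waits for retries to run out.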
8 changes: 4 additions & 4 deletions apps/workers/crawlerWorker.ts
@@ -166,20 +166,20 @@ export class CrawlerWorker {
           /* timeoutSec */ serverConfig.crawler.jobTimeoutSec,
         ),
         onComplete: async (job) => {
-          const jobId = job?.id ?? "unknown";
+          const jobId = job.id;
           logger.info(`[Crawler][${jobId}] Completed successfully`);
-          const bookmarkId = job?.data.bookmarkId;
+          const bookmarkId = job.data.bookmarkId;
           if (bookmarkId) {
             await changeBookmarkStatus(bookmarkId, "success");
           }
         },
         onError: async (job) => {
-          const jobId = job?.id ?? "unknown";
+          const jobId = job.id;
           logger.error(
             `[Crawler][${jobId}] Crawling job failed: ${job.error}\n${job.error.stack}`,
           );
           const bookmarkId = job.data?.bookmarkId;
-          if (bookmarkId) {
+          if (bookmarkId && job.numRetriesLeft == 0) {
             await changeBookmarkStatus(bookmarkId, "failure");
           }
         },
12 changes: 7 additions & 5 deletions apps/workers/openaiWorker.ts
@@ -68,16 +68,18 @@ export class OpenAiWorker {
       {
         run: runOpenAI,
         onComplete: async (job) => {
-          const jobId = job?.id ?? "unknown";
+          const jobId = job.id;
           logger.info(`[inference][${jobId}] Completed successfully`);
-          await attemptMarkTaggingStatus(job?.data, "success");
+          await attemptMarkTaggingStatus(job.data, "success");
         },
         onError: async (job) => {
-          const jobId = job?.id ?? "unknown";
+          const jobId = job.id;
           logger.error(
             `[inference][${jobId}] inference job failed: ${job.error}\n${job.error.stack}`,
           );
-          await attemptMarkTaggingStatus(job?.data, "failure");
+          if (job.numRetriesLeft == 0) {
+            await attemptMarkTaggingStatus(job?.data, "failure");
+          }
         },
       },
       {
@@ -387,7 +389,7 @@ async function connectTags(
 }
 
 async function runOpenAI(job: DequeuedJob<ZOpenAIRequest>) {
-  const jobId = job.id ?? "unknown";
+  const jobId = job.id;
 
   const inferenceClient = InferenceClientFactory.build();
   if (!inferenceClient) {
6 changes: 3 additions & 3 deletions apps/workers/searchWorker.ts
@@ -19,12 +19,12 @@ export class SearchIndexingWorker {
       {
         run: runSearchIndexing,
         onComplete: (job) => {
-          const jobId = job?.id ?? "unknown";
+          const jobId = job.id;
           logger.info(`[search][${jobId}] Completed successfully`);
           return Promise.resolve();
         },
         onError: (job) => {
-          const jobId = job?.id ?? "unknown";
+          const jobId = job.id;
           logger.error(
             `[search][${jobId}] search job failed: ${job.error}\n${job.error.stack}`,
           );
@@ -117,7 +117,7 @@ async function runDelete(
 }
 
 async function runSearchIndexing(job: DequeuedJob<ZSearchIndexingRequest>) {
-  const jobId = job.id ?? "unknown";
+  const jobId = job.id;
 
   const request = zSearchIndexingRequestSchema.safeParse(job.data);
   if (!request.success) {
6 changes: 3 additions & 3 deletions apps/workers/tidyAssetsWorker.ts
@@ -19,12 +19,12 @@ export class TidyAssetsWorker {
       {
         run: runTidyAssets,
         onComplete: (job) => {
-          const jobId = job?.id ?? "unknown";
+          const jobId = job.id;
           logger.info(`[tidyAssets][${jobId}] Completed successfully`);
           return Promise.resolve();
         },
         onError: (job) => {
-          const jobId = job?.id ?? "unknown";
+          const jobId = job.id;
           logger.error(
             `[tidyAssets][${jobId}] tidy assets job failed: ${job.error}\n${job.error.stack}`,
           );
@@ -86,7 +86,7 @@ async function handleAsset(
 }
 
 async function runTidyAssets(job: DequeuedJob<ZTidyAssetsRequest>) {
-  const jobId = job.id ?? "unknown";
+  const jobId = job.id;
 
   const request = zTidyAssetsRequestSchema.safeParse(job.data);
   if (!request.success) {
13 changes: 9 additions & 4 deletions apps/workers/videoWorker.ts
@@ -14,7 +14,11 @@ import {
 } from "@hoarder/shared/assetdb";
 import serverConfig from "@hoarder/shared/config";
 import logger from "@hoarder/shared/logger";
-import { VideoWorkerQueue, ZVideoRequest } from "@hoarder/shared/queues";
+import {
+  VideoWorkerQueue,
+  ZVideoRequest,
+  zvideoRequestSchema,
+} from "@hoarder/shared/queues";
 
 import { withTimeout } from "./utils";
 import { getBookmarkDetails, updateAsset } from "./workerUtils";
@@ -33,14 +37,14 @@ export class VideoWorker {
           /* timeoutSec */ serverConfig.crawler.downloadVideoTimeout,
         ),
         onComplete: async (job) => {
-          const jobId = job?.id ?? "unknown";
+          const jobId = job.id;
           logger.info(
             `[VideoCrawler][${jobId}] Video Download Completed successfully`,
           );
           return Promise.resolve();
         },
         onError: async (job) => {
-          const jobId = job?.id ?? "unknown";
+          const jobId = job.id;
           logger.error(
             `[VideoCrawler][${jobId}] Video Download job failed: ${job.error}`,
           );
@@ -51,6 +55,7 @@
         pollIntervalMs: 1000,
         timeoutSecs: serverConfig.crawler.downloadVideoTimeout,
         concurrency: 1,
+        validator: zvideoRequestSchema,
       },
     );
   }
@@ -71,7 +76,7 @@ function prepareYtDlpArguments(url: string, assetPath: string) {
 }
 
 async function runWorker(job: DequeuedJob<ZVideoRequest>) {
-  const jobId = job.id ?? "unknown";
+  const jobId = job.id;
   const { bookmarkId } = job.data;
 
   const {
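
Beyond the retry-aware error handling, videoWorker.ts now passes its zod request schema to the queue runner as a validator. This commit does not show how the runner consumes it; the sketch below only illustrates the kind of check such an option would typically perform on job.data before run is invoked (the schema shape is an assumption based on the bookmarkId destructured in runWorker):

import { z } from "zod";

// Sketch only: the real zvideoRequestSchema lives in @hoarder/shared/queues and is
// assumed here to at least require a bookmarkId.
const zvideoRequestSchemaSketch = z.object({ bookmarkId: z.string() });

// A validator of this kind lets the runner reject a malformed payload before the
// handler runs, so run and onError can trust the shape of job.data.
function validateVideoJobData(raw: unknown): { bookmarkId: string } {
  const parsed = zvideoRequestSchemaSketch.safeParse(raw);
  if (!parsed.success) {
    throw new Error(`Malformed video job payload: ${parsed.error.message}`);
  }
  return parsed.data;
}

Validating at dequeue time also fits the broader cleanup in this commit of treating job.id and job.data as always present in the handlers.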
