From 3c1618c069194beb6c570ec85955e0af696f0dda Mon Sep 17 00:00:00 2001 From: Bret Comnes Date: Tue, 12 Nov 2024 09:47:58 -0800 Subject: [PATCH] WIP WIP WIP # Conflicts: # packages/worker/package.json # packages/worker/workers/bookmark-initializer/index.js Deps WIP Refactor PgClient types WIP --- .github/workflows/deploy.yml | 2 +- eslint.config.js | 1 + package.json | 5 +- .../archives/archive-query-create.js | 0 .../archives/resolve-archive-queue.js | 42 +++ packages/resources/bookmarks/normalize-url.js | 10 + .../bookmarks/resolve-bookmark-queue.js | 54 ++++ .../resources/bullmq/default-job-options.js | 14 + .../episodes/episode-query-create.js | 25 +- .../episodes/resolve-episode-queue.js | 43 ++++ packages/resources/episodes/resolve-type.js | 14 +- .../feeds}/default-feed-query.js | 6 +- packages/resources/package.json | 3 +- packages/resources/tags/put-tags-query.js | 6 +- packages/resources/tsconfig.json | 4 +- packages/resources/types/pg-client.d.ts | 4 + packages/resources/urls/ssrf-check.js | 38 +++ .../web/migrations/019.do.normalized-urls.sql | 4 + .../migrations/019.undo.normalized-urls.sql | 2 + packages/web/package.json | 5 +- packages/web/plugins/bullmq-decorators.d.ts | 16 +- packages/web/plugins/bullmq.js | 57 +++-- packages/web/plugins/flags/index.js | 4 +- packages/web/plugins/pg.js | 5 +- .../routes/api/archives/archive-query-get.js | 4 +- .../routes/api/bookmarks/_id/put-bookmark.js | 5 +- .../api/bookmarks/get-bookmarks-query.js | 4 +- .../web/routes/api/bookmarks/normalizeURL.js | 13 - .../web/routes/api/bookmarks/put-bookmarks.js | 98 ++----- .../web/routes/api/feeds/_feed/autohooks.js | 11 +- .../web/routes/api/feeds/_feed/delete-feed.js | 11 +- .../feeds/_feed/episode/_episode/routes.js | 9 + .../feeds/_feed/episode/placeholder/routes.js | 11 +- .../user/email/resend-account-confirmation.js | 4 +- .../user/email/resend-pending-confirmation.js | 5 +- .../email/verify-email-confirm-handler.js | 6 +- .../user/email/verify-email-update-handler.js | 6 +- packages/web/routes/api/user/user-query.js | 4 +- packages/web/tsconfig.json | 4 +- packages/worker/package.json | 6 +- .../worker/plugins/bullmq-decorators.d.ts | 19 +- packages/worker/plugins/bullmq.js | 63 ++++- packages/worker/tsconfig.json | 4 +- .../extract-archive.js | 0 .../fetch-html.js | 13 +- .../get-site-metadata.js | 0 packages/worker/workers/archives/index.js | 64 +++++ .../resolve-archive.js | 0 .../resolve-bookmark.js | 0 packages/worker/workers/bookmarks/index.js | 239 ++++++++++++++++++ .../worker/workers/bookmarks/is-yt-url.js | 19 ++ packages/worker/workers/bookmarks/ua.js | 27 ++ .../workers/document-processor/index.js | 98 ------- .../worker/workers/episode-worker/index.js | 133 ---------- .../workers/episodes/finaize-episode.js | 86 +++++++ .../workers/episodes/handle-upcoming.js | 30 +++ packages/worker/workers/episodes/index.js | 90 +++++++ .../index.js => subscription-service.ts | 0 58 files changed, 1017 insertions(+), 433 deletions(-) rename packages/{web/routes/api => resources}/archives/archive-query-create.js (100%) create mode 100644 packages/resources/archives/resolve-archive-queue.js create mode 100644 packages/resources/bookmarks/normalize-url.js create mode 100644 packages/resources/bookmarks/resolve-bookmark-queue.js create mode 100644 packages/resources/bullmq/default-job-options.js rename packages/{web/routes/api => resources}/episodes/episode-query-create.js (74%) create mode 100644 packages/resources/episodes/resolve-episode-queue.js rename packages/{web/routes/api/feeds/default-feed => resources/feeds}/default-feed-query.js (88%) create mode 100644 packages/resources/types/pg-client.d.ts create mode 100644 packages/resources/urls/ssrf-check.js create mode 100644 packages/web/migrations/019.do.normalized-urls.sql create mode 100644 packages/web/migrations/019.undo.normalized-urls.sql delete mode 100644 packages/web/routes/api/bookmarks/normalizeURL.js rename packages/worker/workers/{document-processor => archives}/extract-archive.js (100%) rename packages/worker/workers/{document-processor => archives}/fetch-html.js (86%) rename packages/worker/workers/{document-processor => archives}/get-site-metadata.js (100%) create mode 100644 packages/worker/workers/archives/index.js rename packages/worker/workers/{document-processor => archives}/resolve-archive.js (100%) rename packages/worker/workers/{document-processor => archives}/resolve-bookmark.js (100%) create mode 100644 packages/worker/workers/bookmarks/index.js create mode 100644 packages/worker/workers/bookmarks/is-yt-url.js create mode 100644 packages/worker/workers/bookmarks/ua.js delete mode 100644 packages/worker/workers/document-processor/index.js delete mode 100644 packages/worker/workers/episode-worker/index.js create mode 100644 packages/worker/workers/episodes/finaize-episode.js create mode 100644 packages/worker/workers/episodes/handle-upcoming.js create mode 100644 packages/worker/workers/episodes/index.js rename packages/worker/workers/bookmark-initializer/index.js => subscription-service.ts (100%) diff --git a/.github/workflows/deploy.yml b/.github/workflows/deploy.yml index 008c6052..15892213 100644 --- a/.github/workflows/deploy.yml +++ b/.github/workflows/deploy.yml @@ -13,7 +13,7 @@ on: default: true env: - node_version: 22 + node_version: lts/* redis-version: 7 FORCE_COLOR: 1 NPM_CONFIG_COLOR: always diff --git a/eslint.config.js b/eslint.config.js index 4bc80c7a..f8f49453 100644 --- a/eslint.config.js +++ b/eslint.config.js @@ -3,5 +3,6 @@ import neostandard, { resolveIgnoresFromGitignore } from 'neostandard' // Used for editors and canary testing export default neostandard({ + ts: true, ignores: resolveIgnoresFromGitignore(), }) diff --git a/package.json b/package.json index 4a36c618..3846c2bd 100644 --- a/package.json +++ b/package.json @@ -16,9 +16,10 @@ "custompatch": "^1.0.28", "dependency-cruiser": "^16.3.2", "gh-release": "^7.0.0", + "knip": "^5.37.1", "neostandard": "^0.12.0", "npm-run-all2": "^7.0.1", - "typescript": "~5.6.2" + "typescript": "~5.7.2" }, "funding": { "type": "individual", @@ -33,7 +34,7 @@ "url": "https://github.com/hifiwi-fi/breadcrum.net.git" }, "scripts": { - "clean": "rm -rf node_modules && rm -rf ./packages/*/node_modules && rm package-lock.json", + "clean": "rm -rf node_modules && rm -rf ./packages/*/node_modules && rm -f package-lock.json", "prepublishOnly": "git push --follow-tags && gh-release -y", "postinstall": "custompatch", "release": "git push --follow-tags && gh-release -y", diff --git a/packages/web/routes/api/archives/archive-query-create.js b/packages/resources/archives/archive-query-create.js similarity index 100% rename from packages/web/routes/api/archives/archive-query-create.js rename to packages/resources/archives/archive-query-create.js diff --git a/packages/resources/archives/resolve-archive-queue.js b/packages/resources/archives/resolve-archive-queue.js new file mode 100644 index 00000000..b9f9a48f --- /dev/null +++ b/packages/resources/archives/resolve-archive-queue.js @@ -0,0 +1,42 @@ +/** + * @import { Queue, Worker, Processor } from 'bullmq' + */ + +/** + * Data relating to resolve document jobs + * + * @typedef {{ + * url: string + * userId: string + * archive: boolean + * archiveId: string + * archiveURL: string + * }} ResolveArchiveData + */ + +export const resolveArchiveQName = 'resolveArchive' +export const resolveArchiveJobName = 'resolve-archive' + +/** + * @typedef {Queue< + * ResolveArchiveData, + * null, + * typeof resolveArchiveJobName + * >} ResolveArchiveQ + */ + +/** + * @typedef {Worker< + * ResolveArchiveData, + * null, + * typeof resolveArchiveJobName + * >} ResolveArchiveW + */ + +/** + * @typedef {Processor< + * ResolveArchiveData, + * null, + * typeof resolveArchiveJobName + * >} ResolveArchiveP + */ diff --git a/packages/resources/bookmarks/normalize-url.js b/packages/resources/bookmarks/normalize-url.js new file mode 100644 index 00000000..7ce7867f --- /dev/null +++ b/packages/resources/bookmarks/normalize-url.js @@ -0,0 +1,10 @@ +/** + * Normalizes a URL object by modifying its host property if necessary. + * + * @param {URL} url - The URL string to be normalized. + * @returns {Promise} An object containing the normalized URL string. + */ +export async function normalizeURL (url) { + if (url.host === 'm.youtube.com') url.host = 'www.youtube.com' + return url +} diff --git a/packages/resources/bookmarks/resolve-bookmark-queue.js b/packages/resources/bookmarks/resolve-bookmark-queue.js new file mode 100644 index 00000000..fe77dccc --- /dev/null +++ b/packages/resources/bookmarks/resolve-bookmark-queue.js @@ -0,0 +1,54 @@ +/** + * @import { Queue, Worker, Processor } from 'bullmq' + */ + +/** + * User-provided metadata for a bookmark. + * + * @typedef {{ + * title: string + * tags: string[] + * summary: string + * }} UserProvidedMeta + */ + +/** + * Data required to initialize a bookmark. + * + * @typedef {{ + * userId: string + * bookmarkId: string + * url: string + * resolveBookmark: boolean + * resolveArchive: boolean + * resolveEpisode: boolean + * userProvidedMeta: UserProvidedMeta + * }} ResolveBookmarkData + */ + +export const resolveBookmarkQName = 'resolveBookmark' +export const resolveBookmarkJobName = 'resolve-bookmark' + +/** + * @typedef {Queue< + * ResolveBookmarkData, + * null, + * typeof resolveBookmarkJobName + * >} ResolveBookmarkQ + */ + +/** + * @typedef {Worker< + * ResolveBookmarkData, + * null, + * typeof resolveBookmarkJobName + * >} ResolveBookmarkW + */ + +/** + * @typedef {Processor< + * ResolveBookmarkData, + * null, + * typeof resolveBookmarkJobName + * >} ResolveBookmarkP + */ diff --git a/packages/resources/bullmq/default-job-options.js b/packages/resources/bullmq/default-job-options.js new file mode 100644 index 00000000..acbcb2ef --- /dev/null +++ b/packages/resources/bullmq/default-job-options.js @@ -0,0 +1,14 @@ +/** + * @import { JobsOptions } from 'bullmq' + */ + +/** @type {JobsOptions} */ +export const defaultJobOptions = { + removeOnComplete: { + age: 3600, // keep up to 1 hour + count: 1000, // keep up to 1000 jobs + }, + removeOnFail: { + age: 24 * 3600, // keep up to 24 hours + } +} diff --git a/packages/web/routes/api/episodes/episode-query-create.js b/packages/resources/episodes/episode-query-create.js similarity index 74% rename from packages/web/routes/api/episodes/episode-query-create.js rename to packages/resources/episodes/episode-query-create.js index 68a6ee5e..68822ccd 100644 --- a/packages/web/routes/api/episodes/episode-query-create.js +++ b/packages/resources/episodes/episode-query-create.js @@ -1,10 +1,19 @@ /** - * @import { FastifyInstance } from 'fastify' - * @import { PoolClient } from 'pg' + * @import { PgClient } from '../types/pg-client.js' */ import SQL from '@nearform/sql' -import { getOrCreateDefaultFeed } from '../feeds/default-feed/default-feed-query.js' +import { getOrCreateDefaultFeed } from '../feeds/default-feed-query.js' + +/** + * @typedef {{ + * id: string, + * type: string, + * medium: string, + * podcast_feed_id: string, + * url: string + * }} CreatedEpisode + */ /** * Creates an episode entry in the database. @@ -13,19 +22,13 @@ import { getOrCreateDefaultFeed } from '../feeds/default-feed/default-feed-query * into the `episodes` table with the provided details and returns the newly created episode object. * * @param {object} params - The parameters for creating an episode. - * @param {PoolClient | FastifyInstance['pg']} params.client - The database client for executing queries, an instance of a pg connection from `fastify.pg` or `node-pg`. + * @param {PgClient} params.client - The database client for executing queries, an instance of a pg connection from `fastify.pg` or `node-pg`. * @param {string} params.userId - The ID of the user who owns the episode. * @param {string} params.bookmarkId - The ID of the bookmark associated with the episode. * @param {string} params.type - The type of the episode. * @param {string} params.medium - The medium of the episode (e.g., audio, video). * @param {string} params.url - The URL of the episode. - * @returns {Promise<{ - * id: string, - * type: string, - * medium: string, - * podcast_feed_id: string, - * url: string - * }>} A promise that resolves to the newly created episode object, including its ID, type, medium, podcast feed ID, and URL. + * @returns {Promise} A promise that resolves to the newly created episode object, including its ID, type, medium, podcast feed ID, and URL. */ export async function createEpisode ({ client, diff --git a/packages/resources/episodes/resolve-episode-queue.js b/packages/resources/episodes/resolve-episode-queue.js new file mode 100644 index 00000000..addca3da --- /dev/null +++ b/packages/resources/episodes/resolve-episode-queue.js @@ -0,0 +1,43 @@ +/** + * @import { Queue, Worker, Processor } from 'bullmq' + * @import { MediumTypes } from './yt-dlp-api-client.js' + */ + +/** + * Data required to resolve an episode. + * + * @typedef {{ + * userId: string + * bookmarkTitle: string + * episodeId: string + * url: string + * medium: MediumTypes + * }} ResolveEpisodeData + */ + +export const resolveEpisodeQName = 'resolveEpisode' +export const resolveEpisodeJobName = 'resolve-episode' + +/** + * @typedef {Queue< + * ResolveEpisodeData, + * null, + * typeof resolveEpisodeJobName + * >} ResolveEpisodeQ + */ + +/** + * @typedef {Worker< + * ResolveEpisodeData, + * null, + * typeof resolveEpisodeJobName + * >} ResolveEpisodeW + */ + +/** + * @typedef {Processor< + * ResolveEpisodeData, + * null, + * typeof resolveEpisodeJobName + * >} ResolveEpisodeP + */ diff --git a/packages/resources/episodes/resolve-type.js b/packages/resources/episodes/resolve-type.js index 77d7e9a7..2b45b754 100644 --- a/packages/resources/episodes/resolve-type.js +++ b/packages/resources/episodes/resolve-type.js @@ -1,15 +1,15 @@ /** - * @param {object} metadata - * @param {string} [metadata.ext] - * @param {string} [metadata._type] + * @param {object} media + * @param {string} [media.ext] + * @param {string} [media._type] * @return {'audio' | 'video' | string | null | undefined } */ -export function resolveType (metadata) { +export function resolveType (media) { return ( - ['mp3', 'm4a'].includes(metadata.ext ?? '') + ['mp3', 'm4a'].includes(media.ext ?? '') ? 'audio' - : ['mp4', 'mov', 'm3u8'].includes(metadata.ext ?? '') + : ['mp4', 'mov', 'm3u8'].includes(media.ext ?? '') ? 'video' - : metadata._type + : media._type ) } diff --git a/packages/web/routes/api/feeds/default-feed/default-feed-query.js b/packages/resources/feeds/default-feed-query.js similarity index 88% rename from packages/web/routes/api/feeds/default-feed/default-feed-query.js rename to packages/resources/feeds/default-feed-query.js index 26886141..46718a2a 100644 --- a/packages/web/routes/api/feeds/default-feed/default-feed-query.js +++ b/packages/resources/feeds/default-feed-query.js @@ -1,7 +1,7 @@ /* eslint-disable camelcase */ + /** - * @import { FastifyInstance } from 'fastify' - * @import { PoolClient } from 'pg' + * @import { PgClient } from '@breadcrum/resources/types/pg-client.js' */ import SQL from '@nearform/sql' @@ -17,7 +17,7 @@ import SQL from '@nearform/sql' * within a database transaction to ensure data consistency. * * @param {Object} params - The function parameters. - * @param {PoolClient | FastifyInstance['pg']} params.client - The database client for executing queries, an instance of a pg connection from `fastify.pg` or `node-pg`. + * @param {PgClient} params.client - The database client for executing queries, an instance of a pg connection from `fastify.pg` or `node-pg`. * @param {string} params.userId - The ID of the user for whom to retrieve or create the default podcast feed. * @returns {Promise} The ID of the default podcast feed for the given user. * @throws {Error} Throws an error if the database transaction fails. diff --git a/packages/resources/package.json b/packages/resources/package.json index 7a5bf47c..e386e63c 100644 --- a/packages/resources/package.json +++ b/packages/resources/package.json @@ -11,6 +11,7 @@ }, "dependencies": { "@nearform/sql": "^1.10.5", + "bullmq": "^5.1.12", "undici": "^7.0.0" }, "devDependencies": { @@ -19,7 +20,7 @@ "npm-run-all2": "^7.0.1", "neostandard": "^0.12.0", "tap": "^21.0.0", - "typescript": "~5.6.2" + "typescript": "~5.7.2" }, "tap": { "serial": [], diff --git a/packages/resources/tags/put-tags-query.js b/packages/resources/tags/put-tags-query.js index d5eed498..5fe7f2b2 100644 --- a/packages/resources/tags/put-tags-query.js +++ b/packages/resources/tags/put-tags-query.js @@ -2,8 +2,7 @@ import SQL from '@nearform/sql' /** * @import { FastifyInstance } from 'fastify' - * @import { PoolClient } from 'pg' - * @import { PostgresDb } from '@fastify/postgres' + * @import { PgClient } from '@breadcrum/resources/types/pg-client.js' */ /** @@ -13,7 +12,7 @@ import SQL from '@nearform/sql' * @exports * @param {Object} params - Parameters to shape the query. * @param {FastifyInstance} params.fastify - Fastify instance, used for logging and other utilities. - * @param {PoolClient | PostgresDb?} [params.pg] - PostgreSQL connection or transaction client for executing the query. + * @param {PgClient?} [params.pg] - PostgreSQL connection or transaction client for executing the query. * @param {string} params.userId - UserID of the owner * @param {string} params.bookmarkId - The Bookmark ID to add tags to * @param {Array} params.tags- List of tags to associate with the bookmark. @@ -25,7 +24,6 @@ export async function putTagsQuery ({ bookmarkId, tags, }) { - // @ts-ignore pg = pg ?? fastify.pg if (!pg) throw new Error('A postgres client is required') diff --git a/packages/resources/tsconfig.json b/packages/resources/tsconfig.json index ab2f5504..0f6e00ad 100644 --- a/packages/resources/tsconfig.json +++ b/packages/resources/tsconfig.json @@ -2,9 +2,7 @@ "extends": "@voxpelli/tsconfig/node20.json", "compilerOptions": { "noEmit": true, - "skipLibCheck": true, - "composite": true, - "incremental": true, + "skipLibCheck": false }, "include": [ "**/*" diff --git a/packages/resources/types/pg-client.d.ts b/packages/resources/types/pg-client.d.ts new file mode 100644 index 00000000..191384d3 --- /dev/null +++ b/packages/resources/types/pg-client.d.ts @@ -0,0 +1,4 @@ +import { PoolClient } from 'pg' +import { PostgresDb } from '@fastify/postgres' + +export type PgClient = PoolClient | PostgresDb diff --git a/packages/resources/urls/ssrf-check.js b/packages/resources/urls/ssrf-check.js new file mode 100644 index 00000000..bff28624 --- /dev/null +++ b/packages/resources/urls/ssrf-check.js @@ -0,0 +1,38 @@ +/** + * @param {Url} url A URL object to check + * @return {[type]} [description] + */ +export async function ssrfCheck (url) { + try { + // Validate protocol + if (url.protocol !== 'https:' && url.protocol !== 'http:') { + return { + ssrf: false + } + } + + // Block cloud metadata endpoints + const blockedHostnames = ['169.254.169.254', 'metadata.google.internal'] + if (blockedHostnames.includes(url.hostname)) { + return false // Block specific metadata endpoints + } + + // Resolve hostname to IP addresses + const addresses = await dns.lookup(url.hostname, { all: true }) + + // Validate each resolved IP + for (const { address } of addresses) { + if (isBlockedIP(address)) { + return false // Blocked IP detected + } + } + + // Additional DNS rebinding check + await dnsRebindingCheck(url.hostname) + + return true // Passed all checks, URL is safe + } catch (err) { + console.error('Error validating URL:', err.message) + return false // Fail closed on error + } +} diff --git a/packages/web/migrations/019.do.normalized-urls.sql b/packages/web/migrations/019.do.normalized-urls.sql new file mode 100644 index 00000000..25a92474 --- /dev/null +++ b/packages/web/migrations/019.do.normalized-urls.sql @@ -0,0 +1,4 @@ +alter table bookmarks + add column original_url text; + +comment on column bookmarks.original_url is 'The original, unnormalized URL of the bookmark.'; diff --git a/packages/web/migrations/019.undo.normalized-urls.sql b/packages/web/migrations/019.undo.normalized-urls.sql new file mode 100644 index 00000000..7f402d63 --- /dev/null +++ b/packages/web/migrations/019.undo.normalized-urls.sql @@ -0,0 +1,2 @@ +alter table bookmarks + drop column if exists original_url; diff --git a/packages/web/package.json b/packages/web/package.json index a3534b7b..a5328f30 100644 --- a/packages/web/package.json +++ b/packages/web/package.json @@ -48,8 +48,7 @@ "pg": "^8.6.0", "postgrator-cli": "^9.0.0", "resolve-email": "3.0.10", - "undici": "^7.0.0", - "webassert": "^3.0.2" + "undici": "^7.0.0" }, "optionalDependencies": { "sodium-native": "^4.0.0" @@ -78,7 +77,7 @@ "tap": "^21.0.0", "top-bun": "^10.0.0", "type-fest": "^4.26.1", - "typescript": "~5.6.2", + "typescript": "~5.7.2", "uland-isomorphic": "^2.0.0", "xmlbuilder": "^15.1.1" }, diff --git a/packages/web/plugins/bullmq-decorators.d.ts b/packages/web/plugins/bullmq-decorators.d.ts index 7ee20fdf..ace4d321 100644 --- a/packages/web/plugins/bullmq-decorators.d.ts +++ b/packages/web/plugins/bullmq-decorators.d.ts @@ -1,11 +1,21 @@ -import type { FastifyRequest } from 'fastify'; +import type { FastifyRequest } from 'fastify' import type { Queue } from 'bullmq' +import type { + ResolveEpisodeQ, +} from '@breadcrum/resources/episodes/resolve-episode-queue.js' +import type { + ResolveArchiveQ, +} from '@breadcrum/resources/archives/resolve-archive-queue.js' +import type { + ResolveBookmarkQ, +} from '@breadcrum/resources/bookmarks/resolve-bookmark-queue.js' declare module 'fastify' { interface FastifyInstance { queues: { - resolveEpisodeQ: Queue - resolveDocumentQ: Queue + resolveEpisodeQ: ResolveEpisodeQ + resolveArchiveQ: ResolveArchiveQ + resolveBookmarkQ: ResolveBookmarkQ } } } diff --git a/packages/web/plugins/bullmq.js b/packages/web/plugins/bullmq.js index e29450d6..61049961 100644 --- a/packages/web/plugins/bullmq.js +++ b/packages/web/plugins/bullmq.js @@ -1,35 +1,53 @@ +/** + * @import { ResolveEpisodeQ } from '@breadcrum/resources/episodes/resolve-episode-queue.js' + * @import { ResolveArchiveQ } from '@breadcrum/resources/archives/resolve-archive-queue.js' + * @import { ResolveBookmarkQ } from '@breadcrum/resources/bookmarks/resolve-bookmark-queue.js' + */ import fp from 'fastify-plugin' import { Queue } from 'bullmq' +import { resolveEpisodeQName } from '@breadcrum/resources/episodes/resolve-episode-queue.js' +import { resolveArchiveQName } from '@breadcrum/resources/archives/resolve-archive-queue.js' +import { resolveBookmarkQName } from '@breadcrum/resources/bookmarks/resolve-bookmark-queue.js' +import { defaultJobOptions } from '@breadcrum/resources/bullmq/default-job-options.js' + /** * This plugins adds bullMQ queues */ export default fp(async function (fastify, _) { - const defaultJobOptions = { - removeOnComplete: { - age: 3600, // keep up to 1 hour - count: 1000, // keep up to 1000 jobs - }, - removeOnFail: { - age: 24 * 3600, // keep up to 24 hours - }, - } - const queueRedis = fastify.redis['bullmq'] if (queueRedis !== undefined) { - const resolveEpisodeQ = new Queue('resolveEpisode', { - connection: queueRedis, - defaultJobOptions, - }) + /** @type {ResolveEpisodeQ} */ + const resolveEpisodeQ = new Queue( + resolveEpisodeQName, + { + connection: queueRedis, + defaultJobOptions, + } + ) + + /** @type {ResolveArchiveQ} */ + const resolveArchiveQ = new Queue( + resolveArchiveQName, + { + connection: queueRedis, + defaultJobOptions, + } + ) - const resolveDocumentQ = new Queue('resolveDocument', { - connection: queueRedis, - defaultJobOptions, - }) + /** @type {ResolveBookmarkQ} */ + const resolveBookmarkQ = new Queue( + resolveBookmarkQName, + { + connection: queueRedis, + defaultJobOptions, + } + ) const queues = { resolveEpisodeQ, - resolveDocumentQ, + resolveArchiveQ, + resolveBookmarkQ } fastify.decorate('queues', queues) @@ -39,4 +57,5 @@ export default fp(async function (fastify, _) { }, { dependencies: ['env', 'redis'], + name: 'bullmq', }) diff --git a/packages/web/plugins/flags/index.js b/packages/web/plugins/flags/index.js index 89d844d0..c3eab3f9 100644 --- a/packages/web/plugins/flags/index.js +++ b/packages/web/plugins/flags/index.js @@ -4,7 +4,7 @@ import { defaultFrontendFlags } from './frontend-flags.js' import { defaultBackendFlags } from './backend-flags.js' /** - * @import { FastifyInstance } from 'fastify' + * @import { PgClient } from '@breadcrum/resources/types/pg-client.js' */ /** @@ -16,7 +16,7 @@ export default fp(async function (fastify, _) { * Retrieves the feature flags. * * @param {Object} options - The options for retrieving flags. - * @param {FastifyInstance['pg']} [options.pgClient] - The PostgreSQL client instance. + * @param {PgClient} [options.pgClient] - The PostgreSQL client instance. * @param {boolean} [options.frontend=true] - Whether to retrieve frontend flags. * @param {boolean} [options.backend=true] - Whether to retrieve backend flags. * @returns {Promise} The retrieved flag set. diff --git a/packages/web/plugins/pg.js b/packages/web/plugins/pg.js index a522384c..6f75cb41 100644 --- a/packages/web/plugins/pg.js +++ b/packages/web/plugins/pg.js @@ -1,12 +1,11 @@ import fp from 'fastify-plugin' /** - * @import { FastifyInstance } from 'fastify' - * @import { PoolClient } from 'pg' + * @import { PgClient } from '@breadcrum/resources/types/pg-client.js' */ /** - * @typedef {PoolClient | FastifyInstance['pg']} PgClientAlias + * @typedef {PgClient} PgClientAlias */ /** diff --git a/packages/web/routes/api/archives/archive-query-get.js b/packages/web/routes/api/archives/archive-query-get.js index adb66fd2..67277c50 100644 --- a/packages/web/routes/api/archives/archive-query-get.js +++ b/packages/web/routes/api/archives/archive-query-get.js @@ -1,7 +1,7 @@ /** * @import { FastifyInstance } from 'fastify' - * @import { PoolClient } from 'pg' * @import { TypeArchiveRead } from './schemas/schema-archive-read.js' + * @import { PgClient } from '@breadcrum/resources/types/pg-client.js' */ import SQL from '@nearform/sql' @@ -9,7 +9,7 @@ import SQL from '@nearform/sql' /** * @typedef {ArchiveQueryParams & { * fastify: FastifyInstance, - * pg?: PoolClient | FastifyInstance['pg'] + * pg?: PgClient * }} GetArchivesParams */ diff --git a/packages/web/routes/api/bookmarks/_id/put-bookmark.js b/packages/web/routes/api/bookmarks/_id/put-bookmark.js index e0f2f05d..bda4b190 100644 --- a/packages/web/routes/api/bookmarks/_id/put-bookmark.js +++ b/packages/web/routes/api/bookmarks/_id/put-bookmark.js @@ -1,6 +1,7 @@ import SQL from '@nearform/sql' -import { createEpisode } from '../../episodes/episode-query-create.js' -import { createArchive } from '../../archives/archive-query-create.js' +import { createEpisode } from '@breadcrum/resources/episodes/episode-query-create.js' + +import { createArchive } from '@breadcrum/resources/archives/archive-query-create.js' import { getBookmark } from '../get-bookmarks-query.js' /** diff --git a/packages/web/routes/api/bookmarks/get-bookmarks-query.js b/packages/web/routes/api/bookmarks/get-bookmarks-query.js index 921123c7..a7f0b167 100644 --- a/packages/web/routes/api/bookmarks/get-bookmarks-query.js +++ b/packages/web/routes/api/bookmarks/get-bookmarks-query.js @@ -1,7 +1,7 @@ /** * @import { FastifyInstance } from 'fastify' - * @import { PoolClient } from 'pg' * @import { TypeBookmarkRead } from './schemas/schema-bookmark-read.js' + * @import { PgClient } from '@breadcrum/resources/types/pg-client.js' */ import SQL from '@nearform/sql' @@ -9,7 +9,7 @@ import SQL from '@nearform/sql' /** * @typedef {GetBookmarksQueryParams & { * fastify: FastifyInstance, - * pg?: PoolClient | FastifyInstance['pg'] + * pg?: PgClient * }} GetBookmarksParams */ diff --git a/packages/web/routes/api/bookmarks/normalizeURL.js b/packages/web/routes/api/bookmarks/normalizeURL.js deleted file mode 100644 index b381aa51..00000000 --- a/packages/web/routes/api/bookmarks/normalizeURL.js +++ /dev/null @@ -1,13 +0,0 @@ -// TODO: make this normalization way more robust and use it everywhere. -/** - * Normalizes a URL object by modifying its host property if necessary. - * - * @param {URL} urlObj - The URL object to be normalized. - * @returns {Promise<{normalizedURL: string}>} An object containing the normalized URL string. - */ -export async function normalizeURL (urlObj) { - if (urlObj.host === 'm.youtube.com') urlObj.host = 'www.youtube.com' - return { - normalizedURL: urlObj.toString(), - } -} diff --git a/packages/web/routes/api/bookmarks/put-bookmarks.js b/packages/web/routes/api/bookmarks/put-bookmarks.js index d61aa405..fb9d168a 100644 --- a/packages/web/routes/api/bookmarks/put-bookmarks.js +++ b/packages/web/routes/api/bookmarks/put-bookmarks.js @@ -5,11 +5,9 @@ * @import { SchemaBookmarkRead } from './schemas/schema-bookmark-read.js' */ import { oneLineTrim } from 'common-tags' -import { createEpisode } from '../episodes/episode-query-create.js' -import { createArchive } from '../archives/archive-query-create.js' import { getBookmark } from './get-bookmarks-query.js' -import { normalizeURL } from './normalizeURL.js' import { createBookmark } from './put-bookmark-query.js' +import { normalizeURL } from '@breadcrum/resources/bookmarks/normalize-url.js' /** * @type {FastifyPluginAsyncJsonSchemaToTs<{ @@ -95,8 +93,8 @@ export async function putBookmarks (fastify, _opts) { }, async function createBookmarkHandler (request, reply) { return fastify.pg.transact(async client => { - console.log('RUNNNING') const userId = request.user.id + const { note, toread, @@ -105,8 +103,9 @@ export async function putBookmarks (fastify, _opts) { archive_urls = [], summary, } = request.body - let { url, title } = request.body - const urlObj = new URL(url) + + const submittedUrl = new URL(request.body.url) + const submittedTitle = request.body.title const { update, @@ -116,27 +115,24 @@ export async function putBookmarks (fastify, _opts) { normalize, } = request.query - if (normalize) { // This will be the one possibly slow step - const { normalizedURL } = await normalizeURL(urlObj) - url = normalizedURL - } + // This will be the one possibly slow step + // This needs to happen on create for de-dupe behavior + const workingUrl = normalize ? await normalizeURL(submittedUrl) : submittedUrl const maybeResult = await getBookmark({ fastify, pg: client, ownerId: userId, - url, + url: workingUrl.toString(), sensitive: true, perPage: 1, }) if (maybeResult) { if (update) { - console.log({ update }) reply.redirect(`/api/bookmarks/${maybeResult.id}`, 308) return } else { - console.log({ update }) reply.status(200) return { status: 'nochange', @@ -147,12 +143,13 @@ export async function putBookmarks (fastify, _opts) { } // Title will fallback to just being the URL on create - title = title ?? url + // TODO: handle this more explicitely + const title = submittedTitle ?? workingUrl.toString() const bookmark = await createBookmark({ fastify, pg: client, - url, + url: workingUrl.toString(), title, note, toread, @@ -164,69 +161,24 @@ export async function putBookmarks (fastify, _opts) { tags }) - let episodeId, episodeMedium, episodeURL - if (episode) { - // TODO: ensure handling of createEpisode url is correct - const episodeEntity = await createEpisode({ - client, - userId, - bookmarkId: bookmark.id, - type: request?.body?.createEpisode?.type ?? 'redirect', - medium: request?.body?.createEpisode?.medium ?? 'video', - url: request?.body?.createEpisode?.url ?? url, - }) - episodeId = episodeEntity.id - episodeMedium = episodeEntity.medium - episodeURL = episodeEntity.url - - await fastify.queues.resolveEpisodeQ.add( - 'resolve-episode', - { - userId, - bookmarkTitle: bookmark.title, - episodeId, - url: episodeURL, - medium: episodeMedium, - } - ) - } - - let archiveId, archiveURL - if (archive) { - // TODO: ensure handling of createArchive url is correct - const archiveEntity = await createArchive({ - client, - userId, - bookmarkId: bookmark.id, - bookmarkTitle: title ?? null, - url: request?.body?.createArchive?.url ?? url, - extractionMethod: 'server', - }) - - archiveId = archiveEntity.id - archiveURL = archiveEntity.url - } - - // Commit bookmark, tags, archive and episode in their incomplete state + // Commit bookmark create for background job lookup await client.query('commit') - fastify.prom.episodeCounter.inc() - fastify.prom.archiveCounter.inc() fastify.prom.bookmarkCreatedCounter.inc() - if (archive || meta) { - await fastify.queues.resolveDocumentQ.add( - 'resolve-document', + if (meta || episode || archive) { + await fastify.queues.initializeBookmarkQ.add( + 'initialize-bookmark', { - url, userId, - resolveMeta: meta, - archive, - title, - tags, - summary, bookmarkId: bookmark.id, - archiveId, - archiveURL, + resolveBookmark: meta, + resolveEpisode: episode, + resolveArchive: archive, + userProvidedMeta: { + title, + tags, + summary, + } } ) } @@ -235,7 +187,7 @@ export async function putBookmarks (fastify, _opts) { const createdBookmark = await getBookmark({ fastify, ownerId: userId, - url, + bookmarkId: bookmark.id, sensitive: true, perPage: 1, }) diff --git a/packages/web/routes/api/feeds/_feed/autohooks.js b/packages/web/routes/api/feeds/_feed/autohooks.js index ec3fb0f5..3e1fc339 100644 --- a/packages/web/routes/api/feeds/_feed/autohooks.js +++ b/packages/web/routes/api/feeds/_feed/autohooks.js @@ -1,6 +1,15 @@ import SQL from '@nearform/sql' -export default async function (fastify, opts) { +/** + * @import { FastifyPluginAsyncJsonSchemaToTs } from '@bret/type-provider-json-schema-to-ts' + */ + +/** + * + * @type {FastifyPluginAsyncJsonSchemaToTs} + * @returns {Promise} + */ +export default async function (fastify, _opts) { // Add basic auth for feed and feed episode routes fastify.register(import('@fastify/basic-auth'), { validate, diff --git a/packages/web/routes/api/feeds/_feed/delete-feed.js b/packages/web/routes/api/feeds/_feed/delete-feed.js index 3a95adfb..814a8b85 100644 --- a/packages/web/routes/api/feeds/_feed/delete-feed.js +++ b/packages/web/routes/api/feeds/_feed/delete-feed.js @@ -1,6 +1,15 @@ import SQL from '@nearform/sql' -export async function deleteFeed (fastify, opts) { +/** + * @import { FastifyPluginAsyncJsonSchemaToTs } from '@bret/type-provider-json-schema-to-ts' + */ + +/** + * + * @type {FastifyPluginAsyncJsonSchemaToTs} + * @returns {Promise} + */ +export async function deleteFeed (fastify, _opts) { const podcastFeedDeleteCounter = new fastify.metrics.client.Counter({ name: 'breadcrum_podcast_feed_delete_total', help: 'The number of times podcast feeds are deleted', diff --git a/packages/web/routes/api/feeds/_feed/episode/_episode/routes.js b/packages/web/routes/api/feeds/_feed/episode/_episode/routes.js index 8037a5bf..ccafeb04 100644 --- a/packages/web/routes/api/feeds/_feed/episode/_episode/routes.js +++ b/packages/web/routes/api/feeds/_feed/episode/_episode/routes.js @@ -1,5 +1,14 @@ import SQL from '@nearform/sql' +/** + * @import { FastifyPluginAsyncJsonSchemaToTs } from '@bret/type-provider-json-schema-to-ts' + */ + +/** + * + * @type {FastifyPluginAsyncJsonSchemaToTs} + * @returns {Promise} + */ export default async function podcastFeedsRoutes (fastify, opts) { fastify.get( '/', diff --git a/packages/web/routes/api/feeds/_feed/episode/placeholder/routes.js b/packages/web/routes/api/feeds/_feed/episode/placeholder/routes.js index e01c516e..5d662875 100644 --- a/packages/web/routes/api/feeds/_feed/episode/placeholder/routes.js +++ b/packages/web/routes/api/feeds/_feed/episode/placeholder/routes.js @@ -1,4 +1,13 @@ -export default async function podcastFeedsRoutes (fastify, opts) { +/** + * @import { FastifyPluginAsyncJsonSchemaToTs } from '@bret/type-provider-json-schema-to-ts' + */ + +/** + * + * @type {FastifyPluginAsyncJsonSchemaToTs} + * @returns {Promise} + */ +export default async function podcastFeedsRoutes (fastify, _opts) { fastify.get( '/', { diff --git a/packages/web/routes/api/user/email/resend-account-confirmation.js b/packages/web/routes/api/user/email/resend-account-confirmation.js index f8f1a928..cdae699c 100644 --- a/packages/web/routes/api/user/email/resend-account-confirmation.js +++ b/packages/web/routes/api/user/email/resend-account-confirmation.js @@ -1,6 +1,6 @@ /** * @import { FastifyInstance, FastifyReply } from 'fastify' - * @import { PoolClient } from 'pg' + * @import { PgClient } from '@breadcrum/resources/types/pg-client.js' */ import SQL from '@nearform/sql' import { EMAIL_CONFIRM_TOKEN, EMAIL_CONFIRM_TOKEN_EXP } from './email-confirm-tokens.js' @@ -8,7 +8,7 @@ import { EMAIL_CONFIRM_TOKEN, EMAIL_CONFIRM_TOKEN_EXP } from './email-confirm-to /** * @param {object} params * @param {string} params.userId - * @param {PoolClient | FastifyInstance['pg']} params.client + * @param {PgClient} params.client * @param {FastifyReply} params.reply [description] * @param {FastifyInstance} params.fastify */ diff --git a/packages/web/routes/api/user/email/resend-pending-confirmation.js b/packages/web/routes/api/user/email/resend-pending-confirmation.js index 64f97228..43633d4f 100644 --- a/packages/web/routes/api/user/email/resend-pending-confirmation.js +++ b/packages/web/routes/api/user/email/resend-pending-confirmation.js @@ -1,7 +1,8 @@ /** * @import { FastifyInstance, FastifyReply } from 'fastify' - * @import { PoolClient } from 'pg' + * @import { PgClient } from '@breadcrum/resources/types/pg-client.js' */ + import SQL from '@nearform/sql' import { EMAIL_CONFIRM_TOKEN, EMAIL_CONFIRM_TOKEN_EXP } from './email-confirm-tokens.js' import { verifyEmailSubject, verifyEmailUpdateBody } from './post-email.js' @@ -9,7 +10,7 @@ import { verifyEmailSubject, verifyEmailUpdateBody } from './post-email.js' /** * @param {object} params * @param {string} params.userId - * @param {PoolClient | FastifyInstance['pg']} params.client + * @param {PgClient} params.client * @param {FastifyReply} params.reply [description] * @param {FastifyInstance} params.fastify */ diff --git a/packages/web/routes/api/user/email/verify-email-confirm-handler.js b/packages/web/routes/api/user/email/verify-email-confirm-handler.js index fdd11eb3..47efd53f 100644 --- a/packages/web/routes/api/user/email/verify-email-confirm-handler.js +++ b/packages/web/routes/api/user/email/verify-email-confirm-handler.js @@ -1,13 +1,13 @@ /** - * @import { FastifyInstance, FastifyReply } from 'fastify' - * @import { PoolClient } from 'pg' + * @import { FastifyReply } from 'fastify' + * @import { PgClient } from '@breadcrum/resources/types/pg-client.js' */ import SQL from '@nearform/sql' /** * @param {object} params * @param {string} params.userId - * @param {PoolClient | FastifyInstance['pg']} params.client + * @param {PgClient} params.client * @param {FastifyReply} params.reply [description] * @param {string} params.token * @param {Date} params.now diff --git a/packages/web/routes/api/user/email/verify-email-update-handler.js b/packages/web/routes/api/user/email/verify-email-update-handler.js index 7b98cede..793a803d 100644 --- a/packages/web/routes/api/user/email/verify-email-update-handler.js +++ b/packages/web/routes/api/user/email/verify-email-update-handler.js @@ -1,13 +1,13 @@ /** - * @import { FastifyInstance, FastifyReply } from 'fastify' - * @import { PoolClient } from 'pg' + * @import { FastifyReply } from 'fastify' + * @import { PgClient } from '@breadcrum/resources/types/pg-client.js' */ import SQL from '@nearform/sql' /** * @param {object} params * @param {string} params.userId - * @param {PoolClient | FastifyInstance['pg']} params.client + * @param {PgClient} params.client * @param {FastifyReply} params.reply [description] * @param {string} params.token * @param {Date} params.now diff --git a/packages/web/routes/api/user/user-query.js b/packages/web/routes/api/user/user-query.js index 9480de49..83afe238 100644 --- a/packages/web/routes/api/user/user-query.js +++ b/packages/web/routes/api/user/user-query.js @@ -1,7 +1,7 @@ /** * @import { FastifyInstance } from 'fastify' - * @import { PoolClient } from 'pg' * @import { TypeUserRead } from './schemas/schema-user-read.js' + * @import { PgClient } from '@breadcrum/resources/types/pg-client.js' */ import SQL from '@nearform/sql' @@ -10,7 +10,7 @@ import SQL from '@nearform/sql' * getUser returns the user object for a given userId * @typedef {GetUserQueryParams & { * fastify: FastifyInstance, - * pg?: PoolClient | FastifyInstance['pg'] + * pg?: PgClient * }} GetUserParams */ diff --git a/packages/web/tsconfig.json b/packages/web/tsconfig.json index 4ea5abbd..e6f96907 100644 --- a/packages/web/tsconfig.json +++ b/packages/web/tsconfig.json @@ -2,9 +2,7 @@ "extends": "@voxpelli/tsconfig/node20.json", "compilerOptions": { "noEmit": true, - "skipLibCheck": true, - "composite": true, - "incremental": true, + "skipLibCheck": false }, "include": [ "**/*" diff --git a/packages/worker/package.json b/packages/worker/package.json index 32597b1d..7015fbf3 100644 --- a/packages/worker/package.json +++ b/packages/worker/package.json @@ -22,6 +22,7 @@ "print-plugins": "fastify print-plugins app.js" }, "dependencies": { + "@bret/is-youtube-url": "^1.0.3", "@breadcrum/extract-meta": "^2.0.0", "@breadcrum/resources": "file:../resources", "@fastify/autoload": "^6.0.1", @@ -44,7 +45,7 @@ "hyperid": "^3.1.1", "jsdom": "^25.0.1", "undici": "^7.0.0", - "webassert": "^3.0.2" + "user-agents": "^1.1.355" }, "devDependencies": { "@bret/type-provider-json-schema-to-ts": "^5.0.1", @@ -53,11 +54,12 @@ "@types/gunzip-maybe": "^1.4.2", "@types/jsdom": "^21.1.7", "@types/node": "^22.0.0", + "@types/user-agents": "^1.0.4", "@voxpelli/tsconfig": "^15.0.0", "neostandard": "^0.12.0", "npm-run-all2": "^7.0.1", "tap": "^21.0.0", - "typescript": "~5.6.2" + "typescript": "~5.7.2" }, "tap": { "serial": [ diff --git a/packages/worker/plugins/bullmq-decorators.d.ts b/packages/worker/plugins/bullmq-decorators.d.ts index 050e35b6..593d4842 100644 --- a/packages/worker/plugins/bullmq-decorators.d.ts +++ b/packages/worker/plugins/bullmq-decorators.d.ts @@ -1,11 +1,26 @@ import type { FastifyRequest } from 'fastify'; import type { Worker } from 'bullmq' +import type { + ResolveBookmarkW, +} from '@breadcrum/resources/bookmarks/resolve-bookmark-queue.js' +import type { + ResolveEpisodeQ, + ResolveEpisodeW, +} from '@breadcrum/resources/episodes/resolve-episode-queue.js' +import type { + ResolveArchiveW, +} from '@breadcrum/resources/archives/resolve-archive-queue.js' + declare module 'fastify' { interface FastifyInstance { + queues: { + resolveEpisodeQ: ResolveEpisodeQ + } workers: { - createEpisodeWorker: Worker - resolveDocumentWorker: Worker + resolveEpisodeW: ResolveEpisodeW + resolveArchiveW: ResolveArchiveW + resolveBookmarkW: ResolveBookmarkW } } } diff --git a/packages/worker/plugins/bullmq.js b/packages/worker/plugins/bullmq.js index 43802ef0..c6f2a14a 100644 --- a/packages/worker/plugins/bullmq.js +++ b/packages/worker/plugins/bullmq.js @@ -1,8 +1,19 @@ +/** + * @import { ResolveEpisodeW } from '@breadcrum/resources/episodes/resolve-episode-queue.js' + * @import { ResolveArchiveW } from '@breadcrum/resources/archives/resolve-archive-queue.js' + * @import { ResolveBookmarkW } from '@breadcrum/resources/bookmarks/resolve-bookmark-queue.js' + */ import fp from 'fastify-plugin' -import { Worker } from 'bullmq' +import { Worker, Queue } from 'bullmq' + +import { resolveEpisodeQName } from '@breadcrum/resources/episodes/resolve-episode-queue.js' +import { resolveArchiveQName } from '@breadcrum/resources/archives/resolve-archive-queue.js' +import { resolveBookmarkQName } from '@breadcrum/resources/bookmarks/resolve-bookmark-queue.js' +import { defaultJobOptions } from '@breadcrum/resources/bullmq/default-job-options.js' -import { makeEpisodeWorker } from '../workers/episode-worker/index.js' -import { makeDocumentWorker } from '../workers/document-processor/index.js' +import { makeEpisodeP } from '../workers/episodes/index.js' +import { makeArchiveP } from '../workers/archives/index.js' +import { makeBookmarkP } from '../workers/bookmarks/index.js' /** * This plugins adds bullMQ queues @@ -12,27 +23,51 @@ export default fp(async function (fastify, _opts) { if (!redis) throw new Error('Missing a redis connection object') - const defautOpts = { + const resolveEpisodeQ = new Queue( + resolveEpisodeQName, + { + connection: redis, + defaultJobOptions, + } + ) + + const queues = { + resolveEpisodeQ, + } + + fastify.decorate('queues', queues) + + const defaultWorkerOpts = { connection: redis, autorun: false, } // Running both workers in a single process - const createEpisodeWorker = new Worker( - 'resolveEpisode', - makeEpisodeWorker({ fastify }), - defautOpts + /** @type {ResolveEpisodeW} */ + const resolveEpisodeW = new Worker( + resolveEpisodeQName, + makeEpisodeP({ fastify }), + defaultWorkerOpts + ) + + /** @type {ResolveArchiveW} */ + const resolveArchiveW = new Worker( + resolveArchiveQName, + makeArchiveP({ fastify }), + defaultWorkerOpts ) - const resolveDocumentWorker = new Worker( - 'resolveDocument', - makeDocumentWorker({ fastify }), - defautOpts + /** @type {ResolveBookmarkW} */ + const resolveBookmarkW = new Worker( + resolveBookmarkQName, + makeBookmarkP({ fastify }), + defaultWorkerOpts ) const workers = { - createEpisodeWorker, - resolveDocumentWorker, + resolveEpisodeW, + resolveArchiveW, + resolveBookmarkW } fastify.decorate('workers', workers) diff --git a/packages/worker/tsconfig.json b/packages/worker/tsconfig.json index ab2f5504..0f6e00ad 100644 --- a/packages/worker/tsconfig.json +++ b/packages/worker/tsconfig.json @@ -2,9 +2,7 @@ "extends": "@voxpelli/tsconfig/node20.json", "compilerOptions": { "noEmit": true, - "skipLibCheck": true, - "composite": true, - "incremental": true, + "skipLibCheck": false }, "include": [ "**/*" diff --git a/packages/worker/workers/document-processor/extract-archive.js b/packages/worker/workers/archives/extract-archive.js similarity index 100% rename from packages/worker/workers/document-processor/extract-archive.js rename to packages/worker/workers/archives/extract-archive.js diff --git a/packages/worker/workers/document-processor/fetch-html.js b/packages/worker/workers/archives/fetch-html.js similarity index 86% rename from packages/worker/workers/document-processor/fetch-html.js rename to packages/worker/workers/archives/fetch-html.js index a9ae0e47..37d15419 100644 --- a/packages/worker/workers/document-processor/fetch-html.js +++ b/packages/worker/workers/archives/fetch-html.js @@ -1,7 +1,3 @@ -/** - * @import { FastifyInstance } from 'fastify' - */ - import { request as undiciRequest } from 'undici' import { pipeline } from 'node:stream/promises' import gunzip from 'gunzip-maybe' @@ -20,12 +16,12 @@ const uaHacks = { } /** + * Fetch HTML documents from an HTML. Supports max 3 redirects and gzip. Probably needs more work. * @param {object} params - * @param {string} params.url - * @param {FastifyInstance} params.fastify + * @param {URL} params.url */ -export async function fetchHTML ({ url, fastify }) { - const requestURL = new URL(url) +export async function fetchHTML ({ url }) { + const requestURL = url const ua = uaHacks[requestURL.hostname] ?? 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/93.0.4577.82 Safari/537.36' const response = await undiciRequest(requestURL, { @@ -60,6 +56,5 @@ export async function fetchHTML ({ url, fastify }) { html = await response.body.text() } - fastify.log.debug({ url, html }) return html } diff --git a/packages/worker/workers/document-processor/get-site-metadata.js b/packages/worker/workers/archives/get-site-metadata.js similarity index 100% rename from packages/worker/workers/document-processor/get-site-metadata.js rename to packages/worker/workers/archives/get-site-metadata.js diff --git a/packages/worker/workers/archives/index.js b/packages/worker/workers/archives/index.js new file mode 100644 index 00000000..397532a8 --- /dev/null +++ b/packages/worker/workers/archives/index.js @@ -0,0 +1,64 @@ +/** + * @import { FastifyInstance } from 'fastify' + * @import { ResolveArchiveP } from '@breadcrum/resources/archives/resolve-archive-queue.js' + */ + +import SQL from '@nearform/sql' +import { JSDOM } from 'jsdom' +import { fetchHTML } from './fetch-html.js' +import { resolveArchive } from './resolve-archive.js' + +// The ArchiveP Worker Processor attempts to extract archive content on an un-ready +// Archive row, and set it to ready when completed. + +/** + * @param {object} params + * @param { FastifyInstance } params.fastify + * @return {ResolveArchiveP} + */export function makeArchiveP ({ fastify }) { + /** @type { ResolveArchiveP } */ + async function archiveP (job) { + const { + url, + userId, + archiveId, + archiveURL, + } = job.data + const log = fastify.log.child({ jobId: job.id }) + const pg = fastify.pg + + try { + const html = await fetchHTML({ url: new URL(url) }) + const initialDocument = (new JSDOM(html, { url })).window.document + + log.info('resolving archive') + + const results = await resolveArchive({ + fastify, + log, + userId, + archiveId, + url: archiveURL, + initialDocument, + }) + + log.info({ results }, 'document processed') + } catch (err) { + const handledError = err instanceof Error ? err : new Error('Unknown error', { cause: err }) + + log.error({ error: handledError, archiveId }, 'Error resolving Archive') + + if (archiveId && userId) { + const errorQuery = SQL` + update archives + set error = ${handledError.stack}, done = true + where id = ${archiveId} + and owner_id =${userId};` + await pg.query(errorQuery) + } + } + return null + } + + return archiveP +} diff --git a/packages/worker/workers/document-processor/resolve-archive.js b/packages/worker/workers/archives/resolve-archive.js similarity index 100% rename from packages/worker/workers/document-processor/resolve-archive.js rename to packages/worker/workers/archives/resolve-archive.js diff --git a/packages/worker/workers/document-processor/resolve-bookmark.js b/packages/worker/workers/archives/resolve-bookmark.js similarity index 100% rename from packages/worker/workers/document-processor/resolve-bookmark.js rename to packages/worker/workers/archives/resolve-bookmark.js diff --git a/packages/worker/workers/bookmarks/index.js b/packages/worker/workers/bookmarks/index.js new file mode 100644 index 00000000..add6969d --- /dev/null +++ b/packages/worker/workers/bookmarks/index.js @@ -0,0 +1,239 @@ +/** + * @import { FastifyInstance } from 'fastify' + * @import { ResolveBookmarkP } from '@breadcrum/resources/bookmarks/resolve-bookmark-queue.js' + * @import { YTDLPMetadata } from '@breadcrum/resources/episodes/yt-dlp-api-client.js' + */ + +import SQL from '@nearform/sql' +import { JSDOM } from 'jsdom' +import { putTagsQuery } from '@breadcrum/resources/tags/put-tags-query.js' +import { createEpisode } from '@breadcrum/resources/episodes/episode-query-create.js' +import { getYTDLPMetadata } from '@breadcrum/resources/episodes/yt-dlp-api-client.js' +import { resolveType } from '@breadcrum/resources/episodes/resolve-type.js' + +import { fetchHTML } from '../archives/fetch-html.js' +import { getSiteMetadata } from '../archives/get-site-metadata.js' +import { extractArchive } from '../archives/extract-archive.js' +import { upcomingCheck } from '../episodes/handle-upcoming.js' +import { finalizeEpisode, finalizeEpisodeError } from '../episodes/finaize-episode.js' +// import { isYouTubeUrl } from '@bret/is-youtube-url' + +/** + * @param {object} params + * @param { FastifyInstance } params.fastify + * @return {ResolveBookmarkP} + */ +export function makeBookmarkP ({ fastify }) { + /** @type {ResolveBookmarkP} */ + async function bookmarkP (job) { + const { + userId, + bookmarkId, + url, + resolveBookmark, + resolveArchive, + resolveEpisode, + userProvidedMeta, + } = job.data + + const log = fastify.log.child({ jobId: job.id }) + const pg = fastify.pg + + /** workingUrl is the URL to fetch subsequent data with. It is probably normalized but might not be. */ + const workingUrl = new URL(url) + + // We perform the expensive network calls, according to input options + // and use the results to derive associated assets. + + /** @type { YTDLPMetadata | undefined } */ + let media + if (resolveEpisode) { + try { + media = await getYTDLPMetadata({ + url, + medium: 'video', + ytDLPEndpoint: fastify.config.YT_DLP_API_URL, + attempt: job.attemptsMade, + cache: fastify.ytdlpCache, + }) + } catch (err) { + log.warn({ + error: err, + bookmarkId, + resolveBookmark, + resolveArchive, + resolveEpisode, + userProvidedMeta + }, 'getYTDLPMetadata threw during bookmark resolve') + } + } + + /** @type { Document | undefined } */ + let document + if (resolveBookmark || resolveArchive) { + try { + const html = await fetchHTML({ url: workingUrl }) + document = (new JSDOM(html, { url })).window.document + } catch (err) { + log.warn({ + error: err, + bookmarkId, + resolveBookmark, + resolveArchive, + resolveEpisode, + userProvidedMeta + }, 'Resolving html document failed during bookmark resolve') + } + } + + if (resolveBookmark) { + + } + + if (resolveArchive) { + + } + + + const pageMetadata = await getSiteMetadata({ document }) + const article = await extractArchive({ document }) + + const bookmarkTitle = userProvidedMeta.title + + // log.debug({ url, html }, 'Fetched HTML') + + if (resolveEpisode) { + try { + media = await getYTDLPMetadata({ + url, + medium: 'video', + ytDLPEndpoint: fastify.config.YT_DLP_API_URL, + attempt: job.attemptsMade, + cache: fastify.ytdlpCache, + }) + const mediaUrlFound = media?.url + const upcomingData = upcomingCheck({ media }) + + if (upcomingData.isUpcoming || mediaUrlFound) { + const episodeEntity = await createEpisode({ + client: pg, + userId, + bookmarkId, + type: 'redirect', + medium: 'video', + url, + }) + + if (upcomingData.isUpcoming) { + const scheduledEpisodeJob = await fastify.queues.resolveEpisodeQ.add( + 'resolve-episode', + { + userId, + bookmarkTitle, + episodeId: episodeEntity.id, + url, + medium: 'video' + }, + { + delay: upcomingData.jobDelayMs + } + ) + + const releaseTimestampDate = new Date(upcomingData.releaseTimestampMs) + const jobDelayDate = new Date(upcomingData.jobDelayMs) + + log.info({ + episodeEntity, + upcomingData, + jobId: scheduledEpisodeJob.id, + releaseTimestamp: releaseTimestampDate.toLocaleString(), + jobDelay: jobDelayDate.toLocaleString(), + }, 'Upcoming episode for bookmark scheduled') + } else if (mediaUrlFound) { + await finalizeEpisode({ + pg, + media, + bookmarkTitle, + episodeId: episodeEntity.id, + userId, + url + }) + + log.info(`Episode ${episodeEntity.id} for ${url} is ready.`) + } else { + throw new Error('An episode was created without a scheduled time or URL') + } + } + } catch (err) { + const handledError = err instanceof Error ? err : new Error('Unknown episode create error', { cause: err }) + log.error({ + episodeEntity, + error: err + }, + 'Error creating episode on bookmark create') + + await finalizeEpisodeError({ + pg, + error: handledError, + episodeId: episodeEntity.id, + userId + }) + } + } + } + + if (resolveBookmark) { + const pageMetadata = await getSiteMetadata({ + document, + }) + + const bookmarkUpdates = [] + + bookmarkUpdates.push(SQL`done = true`) + + if (pageMetadata?.title && userProvidedMeta.title === url) { + bookmarkUpdates.push(SQL`title = ${pageMetadata.title}`) + } + + if (pageMetadata?.summary && !userProvidedMeta.summary) { + bookmarkUpdates.push(SQL`summary = ${pageMetadata?.summary}`) + } + + log.debug({ bookmarkUpdates }, 'Bookmark updates') + + if (bookmarkUpdates.length > 0) { + const bookmarkResolveQuery = SQL` + update bookmarks + set ${SQL.glue(bookmarkUpdates, ' , ')} + where id = ${bookmarkId} + and owner_id =${userId}; + ` + log.debug({ bookmarkResolveQuery }, 'Bookmark resolve query') + + const bookmarkResolveResult = await pg.query(bookmarkResolveQuery) + log.debug({ bookmarkResolveResult }, 'Bookmark resolved') + } + + if (pageMetadata?.tags?.length > 0 && !(userProvidedMeta?.tags?.length > 0)) { + await putTagsQuery({ + fastify, + pg, + userId, + bookmarkId, + tags: pageMetadata.tags, + }) + } + log.info(`Bookmark ${bookmarkId} for ${url} is ready.`) + } + + if (resolveArchive) { + const article = await extractArchive({ document }) + + const archiveData = [] + } + + return null + } + + return bookmarkP +} diff --git a/packages/worker/workers/bookmarks/is-yt-url.js b/packages/worker/workers/bookmarks/is-yt-url.js new file mode 100644 index 00000000..725028bb --- /dev/null +++ b/packages/worker/workers/bookmarks/is-yt-url.js @@ -0,0 +1,19 @@ +/** + * Checks if a given URL belongs to YouTube or Google Video domains. + * + * @param {URL} parsedUrl - A valid URL object (already validated by Ajv). + * @returns {boolean} - Returns true if the URL is for a YouTube or Google Video resource. + */ +export function isYouTubeUrl (parsedUrl) { + const validHosts = new Set([ + 'www.youtube.com', + 'youtube.com', + 'm.youtube.com', + 'youtu.be', + 'youtube-nocookie.com', + 'googlevideo.com' + ]) + + // Return true if the host matches any known YouTube or Google video domains + return validHosts.has(parsedUrl.host) +} diff --git a/packages/worker/workers/bookmarks/ua.js b/packages/worker/workers/bookmarks/ua.js new file mode 100644 index 00000000..58545f1a --- /dev/null +++ b/packages/worker/workers/bookmarks/ua.js @@ -0,0 +1,27 @@ +import UserAgent from 'user-agents' + +// const DEFAULT = 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/93.0.4577.82 Safari/537.36' +const GOOGLE_BOT_UA = 'Mozilla/5.0 AppleWebKit/537.36 (KHTML, like Gecko; compatible; Googlebot/2.1; +http://www.google.com/bot.html) Chrome/W.X.Y.Z Safari/537.36' + +const ua = new UserAgent() + +/** + * @type {{ + * [url: string]: string + * }} + */ +const uaHacks = { + 'twitter.com': GOOGLE_BOT_UA, + 'mobile.twitter.com': GOOGLE_BOT_UA, +} + +/** + * @param {URL} url + */ +export function getUA (url) { + return ( + uaHacks[url.hostname] ?? + // @ts-expect-error ua is not listed as callable for some reason + ua().toString() + ) +} diff --git a/packages/worker/workers/document-processor/index.js b/packages/worker/workers/document-processor/index.js deleted file mode 100644 index dc35ec1e..00000000 --- a/packages/worker/workers/document-processor/index.js +++ /dev/null @@ -1,98 +0,0 @@ -/** - * @import { FastifyInstance } from 'fastify' - * @import { Processor} from 'bullmq' - */ - -import SQL from '@nearform/sql' -import { JSDOM } from 'jsdom' -import { fetchHTML } from './fetch-html.js' -import { resolveBookmark } from './resolve-bookmark.js' -import { resolveArchive } from './resolve-archive.js' - -/** - * @param {object} params - * @param {FastifyInstance} params.fastify - */ -export function makeDocumentWorker ({ fastify }) { - /** @type {Processor< - * { - * url: string - * userId: string - * resolveMeta: boolean - * archive: boolean - * title: string - * tags: string[] - * summary: string - * bookmarkId: string - * archiveId: string - * archiveURL: string - * } - * >} */ - return async function documentWorker (job) { - const { - url, - userId, - resolveMeta, - archive, - title, - tags, - summary, - bookmarkId, - archiveId, - archiveURL, - } = job.data - const log = fastify.log.child({ jobId: job.id }) - const pg = fastify.pg - - try { - const html = await fetchHTML({ url, fastify }) - const initialDocument = (new JSDOM(html, { url })).window.document - - const work = [] - if (resolveMeta) { - log.info('resolving meta') - work.push( - resolveBookmark({ - fastify, - log, - userId, - bookmarkId, - url, - title, - tags, - summary, - initialDocument, - }) - ) - } - - if (archive) { - log.info('resolving archive') - work.push( - resolveArchive({ - fastify, - log, - userId, - archiveId, - url: archiveURL, - initialDocument, - })) - } - - const results = await Promise.allSettled(work) - log.info({ results }, 'document processed') - } catch (err) { - log.error(`Error resolving document: Bookmark ${bookmarkId} Archive ${archiveId}`) - log.error(err) - const handledError = err instanceof Error ? err : new Error('Unknown error', { cause: err }) - if (archiveId && userId) { - const errorQuery = SQL` - update archives - set error = ${handledError.stack}, done = true - where id = ${archiveId} - and owner_id =${userId};` - await pg.query(errorQuery) - } - } - } -} diff --git a/packages/worker/workers/episode-worker/index.js b/packages/worker/workers/episode-worker/index.js deleted file mode 100644 index c6d61a51..00000000 --- a/packages/worker/workers/episode-worker/index.js +++ /dev/null @@ -1,133 +0,0 @@ -/** - * @import { Processor} from 'bullmq' - * @import { FastifyInstance } from 'fastify' - * @import { MediumTypes } from '@breadcrum/resources/episodes/yt-dlp-api-client.js' - */ -import SQL from '@nearform/sql' -import { getYTDLPMetadata } from '@breadcrum/resources/episodes/yt-dlp-api-client.js' -import { resolveType } from '@breadcrum/resources/episodes/resolve-type.js' -import { DelayedError } from 'bullmq' - -/** - * @param {object} params - * @param { FastifyInstance } params.fastify - * @return {Processor} - */ -export function makeEpisodeWorker ({ fastify }) { - const logger = fastify.log - - /** @type {Processor< - * { - * userId: string - * bookmarkTitle: string - * episodeId: string - * url: string - * medium: MediumTypes - * } - * >} */ - async function episodeWorker (job, token) { - const log = logger.child({ - jobId: job.id, - }) - - const { - userId, - bookmarkTitle, - episodeId, - url, - medium, - } = job.data - - await fastify.pg.transact(async client => { - const pg = client - - try { - const metadata = await getYTDLPMetadata({ - url, - medium, - ytDLPEndpoint: fastify.config.YT_DLP_API_URL, - attempt: job.attemptsMade, - cache: fastify.ytdlpCache, - }) - - if (metadata.live_status === 'is_upcoming' && metadata.release_timestamp) { - const releaseTimestamp = metadata.release_timestamp * 1000 // Convert seconds to milliseconds - const threeMinutesInMilliseconds = 3 * 60 * 1000 // 3 minutes in milliseconds - - const delayedTimestamp = releaseTimestamp + threeMinutesInMilliseconds - - const releaseDate = new Date(releaseTimestamp) - const delayedDate = new Date(delayedTimestamp) - const isoTimestamp = delayedDate.toISOString() - - log.info(`Episode ${episodeId} for ${url} is scheduled at ${releaseDate.toLocaleString()} and will be processed at ${isoTimestamp}.`) - - job.moveToDelayed(delayedTimestamp, token) - - throw new DelayedError() - } - - if (!metadata?.url) { - throw new Error('No video URL was found in discovery step') - } - - const videoData = [] - - videoData.push(SQL`done = true`) - videoData.push(SQL`url = ${url}`) - if ('filesize_approx' in metadata) videoData.push(SQL`size_in_bytes = ${Math.round(metadata.filesize_approx)}`) - if ('duration' in metadata) videoData.push(SQL`duration_in_seconds = ${Math.round(metadata.duration)}`) - if ('channel' in metadata) videoData.push(SQL`author_name = ${metadata.channel}`) - if ('title' in metadata && 'ext' in metadata) { - const filename = `${metadata.title}.${metadata.ext}` - videoData.push(SQL`filename = ${filename}`) - } - - if ('title' in metadata && metadata.title !== bookmarkTitle) { - videoData.push(SQL`title = ${metadata.title.trim().substring(0, 255)}`) - } - if ('ext' in metadata) videoData.push(SQL`ext = ${metadata.ext}`) - if ('ext' in metadata) videoData.push(SQL`src_type = ${resolveType(metadata)}`) - if ('description' in metadata) { - videoData.push(SQL`text_content = ${metadata.description}`) - } - if ('uploader_url' in metadata || 'channel_url' in metadata) { - videoData.push(SQL`author_url = ${metadata.uploader_url || metadata.channel_url}`) - } - if ('thumbnail' in metadata) { - videoData.push(SQL`thumbnail = ${metadata.thumbnail}`) - } - - const query = SQL` - update episodes - set ${SQL.glue(videoData, ' , ')} - where id = ${episodeId} - and owner_id =${userId} - returning type, medium; - ` - - /* const epResults = */ await pg.query(query) - // const episode = epResults.rows.pop() - - log.info(`Episode ${episodeId} for ${url} is ready.`) - } catch (err) { - if (err instanceof DelayedError) { - // TODO make this not janky - throw err - } - /** @type {Error} */ - const handledError = err instanceof Error ? err : new Error('Unknown error', { cause: err }) - log.error(`Error extracting video for episode ${episodeId}`) - log.error(handledError) - const errorQuery = SQL` - update episodes - set error = ${handledError.stack}, done = true - where id = ${episodeId} - and owner_id =${userId};` - await pg.query(errorQuery) - } - }) - } - - return episodeWorker -} diff --git a/packages/worker/workers/episodes/finaize-episode.js b/packages/worker/workers/episodes/finaize-episode.js new file mode 100644 index 00000000..412d9972 --- /dev/null +++ b/packages/worker/workers/episodes/finaize-episode.js @@ -0,0 +1,86 @@ +/** + * @import { YTDLPMetadata } from '@breadcrum/resources/episodes/yt-dlp-api-client.js' + * @import { PgClient } from '@breadcrum/resources/types/pg-client.js' + */ + +import SQL from '@nearform/sql' +import { resolveType } from '@breadcrum/resources/episodes/resolve-type.js' + +/** + * @param {object} params + * @param {YTDLPMetadata} params.media + * @param {string} params.url + * @param {string} params.bookmarkTitle + * @param {string} params.episodeId + * @param {string} params.userId + * @param {PgClient} params.pg + */ +export async function finalizeEpisode ({ + pg, + media, + bookmarkTitle, + episodeId, + userId, + url +}) { + const videoData = [] + + videoData.push(SQL`done = true`) + videoData.push(SQL`url = ${url}`) + if ('filesize_approx' in media) videoData.push(SQL`size_in_bytes = ${Math.round(media.filesize_approx)}`) + if ('duration' in media) videoData.push(SQL`duration_in_seconds = ${Math.round(media.duration)}`) + if ('channel' in media) videoData.push(SQL`author_name = ${media.channel}`) + if ('title' in media && 'ext' in media) { + const filename = `${media.title}.${media.ext}` + videoData.push(SQL`filename = ${filename}`) + } + + if ('title' in media && media.title !== bookmarkTitle) { + videoData.push(SQL`title = ${media.title.trim().substring(0, 255)}`) + } + + if ('ext' in media) videoData.push(SQL`ext = ${media.ext}`) + if ('ext' in media) videoData.push(SQL`src_type = ${resolveType(media)}`) + if ('description' in media) { + videoData.push(SQL`text_content = ${media.description}`) + } + if ('uploader_url' in media || 'channel_url' in media) { + videoData.push(SQL`author_url = ${media.uploader_url || media.channel_url}`) + } + if ('thumbnail' in media) { + videoData.push(SQL`thumbnail = ${media.thumbnail}`) + } + + const query = SQL` + update episodes + set ${SQL.glue(videoData, ' , ')} + where id = ${episodeId} + and owner_id =${userId} + returning type, medium; + ` + + await pg.query(query) + // const episode = epResults.rows.pop() +} + +/** + * @param {object} params + * @param {Error} params.error + * @param {string} params.episodeId + * @param {string} params.userId + * @param {PgClient} params.pg + */ +export async function finalizeEpisodeError ({ + pg, + error, + episodeId, + userId +}) { + const errorQuery = SQL` + update episodes + set error = ${error.stack}, done = true + where id = ${episodeId} + and owner_id =${userId};` + + await pg.query(errorQuery) +} diff --git a/packages/worker/workers/episodes/handle-upcoming.js b/packages/worker/workers/episodes/handle-upcoming.js new file mode 100644 index 00000000..46dbf853 --- /dev/null +++ b/packages/worker/workers/episodes/handle-upcoming.js @@ -0,0 +1,30 @@ +/** + * @param {object} params + * @param {YTDLPMetadata} params.media + */ +export function upcomingCheck ({ + media +}) { + if (media.live_status === 'is_upcoming' && media.release_timestamp) { + const releaseTimestampMs = media.release_timestamp * 1000 // Convert seconds to milliseconds + const threeMinutesMs = 3 * 60 * 1000 // 3 minutes in milliseconds + + const jobDelayMs = releaseTimestampMs + threeMinutesMs + + return /** @type {const} */({ + isUpcoming: true, + jobDelayMs, + releaseTimestampMs + }) + } else { + return /** @type {const} */({ + isUpcoming: false, + jobDelayMs: null, + releaseTimestampMs: null + }) + } +} + +/** + * @import { YTDLPMetadata } from '@breadcrum/resources/episodes/yt-dlp-api-client.js' + */ diff --git a/packages/worker/workers/episodes/index.js b/packages/worker/workers/episodes/index.js new file mode 100644 index 00000000..9fe230b8 --- /dev/null +++ b/packages/worker/workers/episodes/index.js @@ -0,0 +1,90 @@ +/** + * @import { FastifyInstance } from 'fastify' + * @import { ResolveEpisodeP } from '@breadcrum/resources/episodes/resolve-episode-queue.js' + */ +import { getYTDLPMetadata } from '@breadcrum/resources/episodes/yt-dlp-api-client.js' +import { DelayedError } from 'bullmq' +import { finalizeEpisode, finalizeEpisodeError } from './finaize-episode.js' +import { upcomingCheck } from './handle-upcoming.js' + +/** + * @param {object} params + * @param { FastifyInstance } params.fastify + * @return {ResolveEpisodeP} + */ +export function makeEpisodeP ({ fastify }) { + const logger = fastify.log + + /** @type { ResolveEpisodeP } */ + async function episodeP (job, token) { + const log = logger.child({ + jobId: job.id, + }) + + const { + userId, + bookmarkTitle, + episodeId, + url, + medium, + } = job.data + + const pg = fastify.pg + + try { + const media = await getYTDLPMetadata({ + url, + medium, + ytDLPEndpoint: fastify.config.YT_DLP_API_URL, + attempt: job.attemptsMade, + cache: fastify.ytdlpCache, + }) + + const upcomingData = upcomingCheck({ media }) + + if (upcomingData.isUpcoming) { + const releaseTimestampDate = new Date(upcomingData.releaseTimestampMs) + const jobDelayDate = new Date(upcomingData.jobDelayMs) + + log.info(`Episode ${episodeId} for ${url} is scheduled at ${releaseTimestampDate.toLocaleString()} and will be processed at ${jobDelayDate.toISOString()}.`) + + job.moveToDelayed(upcomingData.jobDelayMs, token) + throw new DelayedError() + } + + if (!media?.url) { + throw new Error('No video URL was found in discovery step') + } + + await finalizeEpisode({ + pg, + media, + bookmarkTitle, + episodeId, + userId, + url + }) + + log.info(`Episode ${episodeId} for ${url} is ready.`) + } catch (err) { + if (err instanceof DelayedError) { + // TODO make this not janky + throw err + } + /** @type {Error} */ + const handledError = err instanceof Error ? err : new Error('Unknown error', { cause: err }) + log.error(`Error extracting video for episode ${episodeId}`) + log.error(handledError) + await finalizeEpisodeError({ + pg, + error: handledError, + episodeId, + userId + }) + } + + return null + } + + return episodeP +} diff --git a/packages/worker/workers/bookmark-initializer/index.js b/subscription-service.ts similarity index 100% rename from packages/worker/workers/bookmark-initializer/index.js rename to subscription-service.ts