Skip to content

Commit

Permalink
feature: Introduce a mechanism to cleanup dangling assets
Browse files Browse the repository at this point in the history
  • Loading branch information
MohamedBassem committed Oct 12, 2024
1 parent 9f87207 commit c16173e
Show file tree
Hide file tree
Showing 10 changed files with 351 additions and 8 deletions.
22 changes: 22 additions & 0 deletions apps/web/components/dashboard/admin/AdminActions.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,21 @@ export default function AdminActions() {
},
});

const { mutateAsync: tidyAssets, isPending: isTidyAssetsPending } =
api.admin.tidyAssets.useMutation({
onSuccess: () => {
toast({
description: "Tidy assets request has been enqueued!",
});
},
onError: (e) => {
toast({
variant: "destructive",
description: e.message,
});
},
});

return (
<div>
<div className="mb-2 mt-8 text-xl font-medium">Actions</div>
Expand Down Expand Up @@ -97,6 +112,13 @@ export default function AdminActions() {
>
Reindex All Bookmarks
</ActionButton>
<ActionButton
variant="destructive"
loading={isTidyAssetsPending}
onClick={() => tidyAssets()}
>
Compact Assets
</ActionButton>
</div>
</div>
);
Expand Down
6 changes: 6 additions & 0 deletions apps/web/components/dashboard/admin/ServerStats.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,12 @@ export default function ServerStats() {
<TableCell>{serverStats.inferenceStats.pending}</TableCell>
<TableCell>{serverStats.inferenceStats.failed}</TableCell>
</TableRow>
<TableRow>
<TableCell>Tidy Assets Jobs</TableCell>
<TableCell>{serverStats.tidyAssetsStats.queued}</TableCell>
<TableCell>-</TableCell>
<TableCell>-</TableCell>
</TableRow>
</TableBody>
</Table>
</div>
Expand Down
18 changes: 17 additions & 1 deletion apps/web/components/dashboard/admin/UserList.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,18 @@ import { api } from "@/lib/trpc";
import { Trash } from "lucide-react";
import { useSession } from "next-auth/react";

function toHumanReadableSize(size: number) {
const sizes = ["Bytes", "KB", "MB", "GB", "TB"];
if (size === 0) return "0 Bytes";
const i = Math.floor(Math.log(size) / Math.log(1024));
return (size / Math.pow(1024, i)).toFixed(2) + " " + sizes[i];
}

export default function UsersSection() {
const { data: session } = useSession();
const invalidateUserList = api.useUtils().users.list.invalidate;
const { data: users } = api.users.list.useQuery();
const { data: userStats } = api.admin.userStats.useQuery();
const { mutate: deleteUser, isPending: isDeletionPending } =
api.users.delete.useMutation({
onSuccess: () => {
Expand All @@ -35,7 +43,7 @@ export default function UsersSection() {
},
});

if (!users) {
if (!users || !userStats) {
return <LoadingSpinner />;
}

Expand All @@ -47,6 +55,8 @@ export default function UsersSection() {
<TableHeader className="bg-gray-200">
<TableHead>Name</TableHead>
<TableHead>Email</TableHead>
<TableHead>Num Bookmarks</TableHead>
<TableHead>Asset Sizes</TableHead>
<TableHead>Role</TableHead>
<TableHead>Action</TableHead>
</TableHeader>
Expand All @@ -55,6 +65,12 @@ export default function UsersSection() {
<TableRow key={u.id}>
<TableCell className="py-1">{u.name}</TableCell>
<TableCell className="py-1">{u.email}</TableCell>
<TableCell className="py-1">
{userStats[u.id].numBookmarks}
</TableCell>
<TableCell className="py-1">
{toHumanReadableSize(userStats[u.id].assetSizes)}
</TableCell>
<TableCell className="py-1 capitalize">{u.role}</TableCell>
<TableCell className="py-1">
<ActionButton
Expand Down
12 changes: 9 additions & 3 deletions apps/workers/index.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
import "dotenv/config";

import { TidyAssetsWorker } from "tidyAssetsWorker";

import serverConfig from "@hoarder/shared/config";
import logger from "@hoarder/shared/logger";
import { runQueueDBMigrations } from "@hoarder/shared/queues";
Expand All @@ -13,21 +15,25 @@ async function main() {
logger.info(`Workers version: ${serverConfig.serverVersion ?? "not set"}`);
runQueueDBMigrations();

const [crawler, openai, search] = [
const [crawler, openai, search, tidyAssets] = [
await CrawlerWorker.build(),
OpenAiWorker.build(),
SearchIndexingWorker.build(),
TidyAssetsWorker.build(),
];

await Promise.any([
Promise.all([crawler.run(), openai.run(), search.run()]),
Promise.all([crawler.run(), openai.run(), search.run(), tidyAssets.run()]),
shutdownPromise,
]);
logger.info("Shutting down crawler, openai and search workers ...");
logger.info(
"Shutting down crawler, openai, tidyAssets and search workers ...",
);

crawler.stop();
openai.stop();
search.stop();
tidyAssets.stop();
}

main();
107 changes: 107 additions & 0 deletions apps/workers/tidyAssetsWorker.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
import { eq } from "drizzle-orm";

import { db } from "@hoarder/db";
import { assets } from "@hoarder/db/schema";
import { DequeuedJob, Runner } from "@hoarder/queue";
import { deleteAsset, getAllAssets } from "@hoarder/shared/assetdb";
import logger from "@hoarder/shared/logger";
import {
TidyAssetsQueue,
ZTidyAssetsRequest,
zTidyAssetsRequestSchema,
} from "@hoarder/shared/queues";

export class TidyAssetsWorker {
static build() {
logger.info("Starting tidy assets worker ...");
const worker = new Runner<ZTidyAssetsRequest>(
TidyAssetsQueue,
{
run: runTidyAssets,
onComplete: (job) => {
const jobId = job?.id ?? "unknown";
logger.info(`[tidyAssets][${jobId}] Completed successfully`);
return Promise.resolve();
},
onError: (job) => {
const jobId = job?.id ?? "unknown";
logger.error(
`[tidyAssets][${jobId}] tidy assets job failed: ${job.error}\n${job.error.stack}`,
);
return Promise.resolve();
},
},
{
concurrency: 1,
pollIntervalMs: 1000,
timeoutSecs: 30,
},
);

return worker;
}
}

async function handleAsset(
asset: {
assetId: string;
userId: string;
size: number;
contentType: string;
fileName?: string | null;
},
request: ZTidyAssetsRequest,
jobId: string,
) {
const dbRow = await db.query.assets.findFirst({
where: eq(assets.id, asset.assetId),
});
if (!dbRow) {
if (request.cleanDanglingAssets) {
await deleteAsset({ userId: asset.userId, assetId: asset.assetId });
logger.info(
`[tidyAssets][${jobId}] Asset ${asset.assetId} not found in the database. Deleting it.`,
);
} else {
logger.warn(
`[tidyAssets][${jobId}] Asset ${asset.assetId} not found in the database. Not deleting it because cleanDanglingAssets is false.`,
);
}
return;
}

if (request.syncAssetMetadata) {
await db
.update(assets)
.set({
contentType: asset.contentType,
fileName: asset.fileName,
size: asset.size,
})
.where(eq(assets.id, asset.assetId));
logger.info(
`[tidyAssets][${jobId}] Updated metadata for asset ${asset.assetId}`,
);
}
}

async function runTidyAssets(job: DequeuedJob<ZTidyAssetsRequest>) {
const jobId = job.id ?? "unknown";

const request = zTidyAssetsRequestSchema.safeParse(job.data);
if (!request.success) {
throw new Error(
`[tidyAssets][${jobId}] Got malformed job request: ${request.error.toString()}`,
);
}

for await (const asset of getAllAssets()) {
try {
handleAsset(asset, request.data, jobId);
} catch (e) {
logger.error(
`[tidyAssets][${jobId}] Failed to tidy asset ${asset.assetId}: ${e}`,
);
}
}
}
42 changes: 42 additions & 0 deletions packages/shared/assetdb.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import * as fs from "fs";
import * as path from "path";
import { Glob } from "glob";
import { z } from "zod";

import serverConfig from "./config";
Expand Down Expand Up @@ -120,6 +121,25 @@ export async function readAsset({
return { asset, metadata };
}

export async function readAssetMetadata({
userId,
assetId,
}: {
userId: string;
assetId: string;
}) {
const assetDir = getAssetDir(userId, assetId);

const metadataStr = await fs.promises.readFile(
path.join(assetDir, "metadata.json"),
{
encoding: "utf8",
},
);

return zAssetMetadataSchema.parse(JSON.parse(metadataStr));
}

export async function getAssetSize({
userId,
assetId,
Expand Down Expand Up @@ -154,3 +174,25 @@ export async function deleteUserAssets({ userId }: { userId: string }) {
}
await fs.promises.rm(userDir, { recursive: true });
}

export async function* getAllAssets() {
const g = new Glob(`/**/**/asset.bin`, {
maxDepth: 3,
root: ROOT_PATH,
cwd: ROOT_PATH,
absolute: false,
});
for await (const file of g) {
const [userId, assetId] = file.split("/").slice(0, 2);
const [size, metadata] = await Promise.all([
getAssetSize({ userId, assetId }),
readAssetMetadata({ userId, assetId }),
]);
yield {
userId,
assetId,
...metadata,
size,
};
}
}
1 change: 1 addition & 0 deletions packages/shared/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
"type": "module",
"dependencies": {
"@hoarder/queue": "workspace:^0.1.0",
"glob": "^11.0.0",
"meilisearch": "^0.37.0",
"winston": "^3.11.0",
"zod": "^3.22.4"
Expand Down
16 changes: 16 additions & 0 deletions packages/shared/queues.ts
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,22 @@ export const SearchIndexingQueue = new SqliteQueue<ZSearchIndexingRequest>(
},
);

// Tidy Assets Worker
export const zTidyAssetsRequestSchema = z.object({
cleanDanglingAssets: z.boolean().optional().default(false),
syncAssetMetadata: z.boolean().optional().default(false),
});
export type ZTidyAssetsRequest = z.infer<typeof zTidyAssetsRequestSchema>;
export const TidyAssetsQueue = new SqliteQueue<ZTidyAssetsRequest>(
"tidy_assets_queue",
queueDB,
{
defaultJobArgs: {
numRetries: 1,
},
},
);

export async function triggerSearchReindex(bookmarkId: string) {
await SearchIndexingQueue.enqueue({
bookmarkId,
Expand Down
Loading

0 comments on commit c16173e

Please sign in to comment.