Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add cache in getGoogleDriveObject, improved fixParents #10588

Merged
merged 9 commits into from
Feb 6, 2025
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
136 changes: 129 additions & 7 deletions connectors/src/connectors/google_drive/lib.ts
Original file line number Diff line number Diff line change
@@ -1,23 +1,36 @@
import type { ContentNodesViewType, ModelId } from "@dust-tt/types";
import {
cacheWithRedis,
concurrentExecutor,
getGoogleIdsFromSheetContentNodeInternalId,
getGoogleSheetTableId,
isGoogleSheetContentNodeInternalId,
MIME_TYPES,
removeNulls,
} from "@dust-tt/types";
import type { Logger } from "pino";
import type { InferAttributes, WhereOptions } from "sequelize";

import { isGoogleDriveSpreadSheetFile } from "@connectors/connectors/google_drive/temporal/mime_types";
import { getSourceUrlForGoogleDriveFiles } from "@connectors/connectors/google_drive";
import { getGoogleDriveObject } from "@connectors/connectors/google_drive/lib/google_drive_api";
import { getFileParentsMemoized } from "@connectors/connectors/google_drive/lib/hierarchy";
import {
isGoogleDriveFolder,
isGoogleDriveSpreadSheetFile,
} from "@connectors/connectors/google_drive/temporal/mime_types";
import { deleteSpreadsheet } from "@connectors/connectors/google_drive/temporal/spreadsheets";
import {
getAuthObject,
getDriveFileId,
getInternalId,
} from "@connectors/connectors/google_drive/temporal/utils";
import { dataSourceConfigFromConnector } from "@connectors/lib/api/data_source_config";
import {
deleteDataSourceDocument,
deleteDataSourceFolder,
updateDataSourceDocumentParents,
updateDataSourceTableParents,
upsertDataSourceFolder,
} from "@connectors/lib/data_sources";
import {
GoogleDriveFiles,
Expand Down Expand Up @@ -144,12 +157,21 @@ export async function internalDeleteFile(
});
}

export async function fixParents(
connector: ConnectorResource,
files: GoogleDriveFiles[],
logger: Logger,
execute: boolean = true
) {
export async function fixParents({
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is this function supposed to be removed after?
I assume yes; let's add a TODO(nodes-core) clean up after project

(if not, we need to explain in detail what this does in a comment and why we had to resort to such extremities)

Copy link
Contributor

@philipperolet philipperolet Feb 6, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

or maybe after we just remove the call in activities/garbage collector (but in that case would still need to explain what we do & why in a com)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would keep it, at least for cli. For GC if everything is ok we can at some point remove it too

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I switched it to dry run when running from the garbage collect, this will just log if we have inconsistent parent ids (not checking from google to avoid load)
Still the cli may be necessary until we're sure everything is fixed in the code

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if we keep it then we need a good comment here because the function doesn't speak for itself

connector,
files,
startSyncTs,
checkFromGoogle = false,
execute = false,
logger,
}: {
connector: ConnectorResource;
files: GoogleDriveFiles[];
startSyncTs: number;
checkFromGoogle?: boolean;
execute?: boolean;
logger: Logger;
}) {
const parentFiles = await GoogleDriveFiles.findAll({
where: {
connectorId: connector.id,
Expand All @@ -163,8 +185,108 @@ export async function fixParents(
},
});

const authCredentials = await getAuthObject(connector.connectionId);

const googleFiles = checkFromGoogle
? removeNulls(
await concurrentExecutor(
files,
async (file) =>
getGoogleDriveObject(authCredentials, file.driveFileId, {
connectorId: connector.id,
ts: startSyncTs,
}),
{
concurrency: 10,
}
)
)
: [];

for (const file of files) {
if (file.parentId) {
if (checkFromGoogle) {
const googleFile = googleFiles.find((f) => f.id === file.driveFileId);
if (!googleFile) {
logger.info(
{ fileId: file.driveFileId },
"File does not exist in Google Drive, skipping"
);
continue;
} else {
const parents = await getFileParentsMemoized(
connector.id,
authCredentials,
googleFile,
startSyncTs
);
const localParents = await getLocalParents(
connector.id,
file.dustFileId,
`${startSyncTs}`
);

if (
JSON.stringify(parents.map((p) => getInternalId(p))) !==
JSON.stringify(localParents)
) {
logger.info(
{ dustFileId: file.dustFileId, localParents, parents },
"Parents not consistent with gdrive, updating"
);
const driveParentId = parents[1];
const dustParentId = driveParentId
? getInternalId(driveParentId)
: null;
if (execute) {
await file.update({
parentId: driveParentId,
});
const dataSourceConfig = dataSourceConfigFromConnector(connector);
const dustParentIds = parents.map((p) => getInternalId(p));
if (
isGoogleDriveFolder(googleFile) ||
isGoogleDriveSpreadSheetFile(googleFile)
) {
await upsertDataSourceFolder({
dataSourceConfig,
folderId: file.dustFileId,
parents: dustParentIds,
parentId: dustParentId,
title: file.name ?? "",
mimeType: MIME_TYPES.GOOGLE_DRIVE.FOLDER,
sourceUrl: getSourceUrlForGoogleDriveFiles(file),
});
const sheets = await GoogleDriveSheet.findAll({
where: {
driveFileId: file.driveFileId,
connectorId: connector.id,
},
});
for (const sheet of sheets) {
const tableId = getGoogleSheetTableId(
sheet.driveFileId,
sheet.driveSheetId
);
await updateDataSourceTableParents({
dataSourceConfig,
tableId,
parents: [tableId, ...dustParentIds],
parentId: file.dustFileId,
});
}
} else {
await updateDataSourceDocumentParents({
dataSourceConfig,
documentId: file.dustFileId,
parents: dustParentIds,
parentId: dustParentId,
});
}
}
}
}
}
const parentFile = parentFiles.find(
(f) => f.driveFileId === file.parentId
);
Expand Down
26 changes: 19 additions & 7 deletions connectors/src/connectors/google_drive/lib/cli.ts
Original file line number Diff line number Diff line change
Expand Up @@ -111,11 +111,12 @@ export const google_drive = async ({
throw new Error("Missing --fileId argument");
}
const fileId = args.fileId;

const now = Date.now();
const authCredentials = await getAuthObject(connector.connectionId);
const driveObject = await getGoogleDriveObject(
authCredentials,
getDriveFileId(fileId)
getDriveFileId(fileId),
{ connectorId: connector.id, ts: now }
);
if (!driveObject) {
throw new Error("Can't find google drive object");
Expand All @@ -124,7 +125,7 @@ export const google_drive = async ({
connector.id,
authCredentials,
driveObject,
Date.now()
now
);
return { status: 200, content: parents, type: typeof parents };
}
Expand All @@ -136,6 +137,7 @@ export const google_drive = async ({
const limit = 1000;
let fromId = 0;
let files: GoogleDriveFiles[] = [];
const now = Date.now();
do {
files = await GoogleDriveFiles.findAll({
where: {
Expand All @@ -145,11 +147,21 @@ export const google_drive = async ({
order: [["id", "ASC"]],
limit,
});
const connecrtorResource = new ConnectorResource(
ConnectorModel,
connector

const connectorResource = await ConnectorResource.fetchById(
connector.id
);
await fixParents(connecrtorResource, files, topLogger, execute);
if (!connectorResource) {
throw new Error("Connector not found");
}
await fixParents({
connector: connectorResource,
files,
checkFromGoogle: true,
execute,
startSyncTs: now,
logger: topLogger,
});
fromId = files[limit - 1]?.id ?? 0;
} while (fromId > 0);
return { success: true };
Expand Down
30 changes: 29 additions & 1 deletion connectors/src/connectors/google_drive/lib/google_drive_api.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import { cacheWithRedis } from "@dust-tt/types";
import type { OAuth2Client } from "googleapis-common";
import type { GaxiosError } from "googleapis-common";

Expand All @@ -9,7 +10,12 @@ import { ExternalOAuthTokenError } from "@connectors/lib/error";
import type { GoogleDriveObjectType } from "@connectors/types/google_drive";
import { FILE_ATTRIBUTES_TO_FETCH } from "@connectors/types/google_drive";

export async function getGoogleDriveObject(
interface CacheKey {
connectorId: number;
ts: string | number;
}

async function _getGoogleDriveObject(
authCredentials: OAuth2Client,
driveObjectId: string
): Promise<GoogleDriveObjectType | null> {
Expand Down Expand Up @@ -39,3 +45,25 @@ export async function getGoogleDriveObject(
throw e;
}
}

const cachedGetGoogleDriveObject = cacheWithRedis<
GoogleDriveObjectType | null,
[OAuth2Client, string, CacheKey]
>(
_getGoogleDriveObject,
(_, driveObjectId, { connectorId, ts }) => {
return `${connectorId}:${driveObjectId}:${ts}`;
},
60 * 10 * 1000
);

export async function getGoogleDriveObject(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: here what I had suggested was to move to an object argument, so we see in the calls that we are providing cache keys

authCredentials: OAuth2Client,
driveObjectId: string,
cacheKey?: CacheKey
): Promise<GoogleDriveObjectType | null> {
if (cacheKey) {
return cachedGetGoogleDriveObject(authCredentials, driveObjectId, cacheKey);
}
return _getGoogleDriveObject(authCredentials, driveObjectId);
}
4 changes: 2 additions & 2 deletions connectors/src/connectors/google_drive/lib/hierarchy.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ async function getFileParents(
connectorId: ModelId,
authCredentials: OAuth2Client,
driveFile: GoogleDriveObjectType,
/* eslint-disable @typescript-eslint/no-unused-vars */
startSyncTs: number
): Promise<string[]> {
const logger = mainLogger.child({
Expand All @@ -33,7 +32,8 @@ async function getFileParents(
while (currentObject.parent) {
const parent = await getGoogleDriveObject(
authCredentials,
currentObject.parent
currentObject.parent,
{ connectorId, ts: startSyncTs }
);
if (!parent) {
// If we got a 404 error we stop the iteration as the parent disappeared.
Expand Down
23 changes: 19 additions & 4 deletions connectors/src/connectors/google_drive/temporal/activities.ts
Original file line number Diff line number Diff line change
Expand Up @@ -198,7 +198,8 @@ export async function syncFiles(
const authCredentials = await getAuthObject(connector.connectionId);
const driveFolder = await getGoogleDriveObject(
authCredentials,
driveFolderId
driveFolderId,
{ connectorId, ts: startSyncTs }
);
if (!driveFolder) {
// We got a 404 on this folder, we skip it.
Expand Down Expand Up @@ -677,6 +678,8 @@ export async function garbageCollector(

localLogger.info("Google Drive: Starting garbage collector");

const ts = lastSeenTs || Date.now();

const files = await GoogleDriveFiles.findAll({
where: {
connectorId: connectorId,
Expand All @@ -692,7 +695,8 @@ export async function garbageCollector(
return queue.add(async () => {
const driveFile = await getGoogleDriveObject(
authCredentials,
file.driveFileId
file.driveFileId,
{ connectorId, ts }
);
if (!driveFile) {
// Could not find the file on Gdrive, deleting our local reference to it.
Expand All @@ -718,7 +722,15 @@ export async function garbageCollector(
})
);

await fixParents(connector, files, localLogger);
// TODO(nodes-core): Run fixParents in dry run mode to check parentIds validity
await fixParents({
connector,
files,
checkFromGoogle: false,
execute: false,
startSyncTs: ts,
logger: localLogger,
});

return files.length;
}
Expand Down Expand Up @@ -815,7 +827,10 @@ export async function markFolderAsVisited(
throw new Error(`Connector ${connectorId} not found`);
}
const authCredentials = await getAuthObject(connector.connectionId);
const file = await getGoogleDriveObject(authCredentials, driveFileId);
const file = await getGoogleDriveObject(authCredentials, driveFileId, {
connectorId,
ts: startSyncTs,
});

if (!file) {
logger.info(
Expand Down