diff --git a/connectors/migrations/20231214_find_non_shared_drives.ts b/connectors/migrations/20231214_find_non_shared_drives.ts index c9481581079d..9f862c6302a5 100644 --- a/connectors/migrations/20231214_find_non_shared_drives.ts +++ b/connectors/migrations/20231214_find_non_shared_drives.ts @@ -50,10 +50,10 @@ async function main() { }); for (const f of selectedFolders) { - const gDriveObject = await getGoogleDriveObject( + const gDriveObject = await getGoogleDriveObject({ authCredentials, - f.folderId - ); + driveObjectId: f.folderId, + }); if (!gDriveObject) { console.log(`Folder not found: folderId=${f.folderId}`); continue; diff --git a/connectors/migrations/20240422_fix_gdrive_errorType.ts b/connectors/migrations/20240422_fix_gdrive_errorType.ts index a3f9357e2846..66617cdfbbcb 100644 --- a/connectors/migrations/20240422_fix_gdrive_errorType.ts +++ b/connectors/migrations/20240422_fix_gdrive_errorType.ts @@ -23,7 +23,10 @@ async function main() { for (const connector of connectors) { try { const auth = await getAuthObject(connector.connectionId); - const gDriveObject = await getGoogleDriveObject(auth, "root"); + const gDriveObject = await getGoogleDriveObject({ + authCredentials: auth, + driveObjectId: "root", + }); logger.info( { connectorId: connector.id, diff --git a/connectors/migrations/20240529_clean_gdrive_folders.ts b/connectors/migrations/20240529_clean_gdrive_folders.ts index d17e77abf761..a54a0c1b4a7e 100644 --- a/connectors/migrations/20240529_clean_gdrive_folders.ts +++ b/connectors/migrations/20240529_clean_gdrive_folders.ts @@ -81,7 +81,10 @@ async function main() { continue; } - const file = await getGoogleDriveObject(authCredentials, folderId); + const file = await getGoogleDriveObject({ + authCredentials, + driveObjectId: folderId, + }); if (!file) { logger.info( diff --git a/connectors/src/connectors/google_drive/index.ts b/connectors/src/connectors/google_drive/index.ts index d91378038465..8d2210d7d9d0 100644 --- a/connectors/src/connectors/google_drive/index.ts +++ b/connectors/src/connectors/google_drive/index.ts @@ -378,10 +378,10 @@ export class GoogleDriveConnectorManager extends BaseConnectorManager { const nodes: ContentNode[] = await Promise.all( drives.map(async (d): Promise => { - const driveObject = await getGoogleDriveObject( + const driveObject = await getGoogleDriveObject({ authCredentials, - d.id - ); + driveObjectId: d.id, + }); if (!driveObject) { throw new Error( `Drive ${d.id} unexpectedly not found (got 404).` @@ -957,7 +957,10 @@ async function getFoldersAsContentNodes({ return concurrentExecutor( folders, async (f): Promise => { - const fd = await getGoogleDriveObject(authCredentials, f.folderId); + const fd = await getGoogleDriveObject({ + authCredentials, + driveObjectId: f.folderId, + }); if (!fd) { return null; } diff --git a/connectors/src/connectors/google_drive/lib.ts b/connectors/src/connectors/google_drive/lib.ts index e90af6636ac4..db5f35d60c6b 100644 --- a/connectors/src/connectors/google_drive/lib.ts +++ b/connectors/src/connectors/google_drive/lib.ts @@ -1,16 +1,26 @@ import type { ContentNodesViewType, ModelId } from "@dust-tt/types"; import { cacheWithRedis, + concurrentExecutor, getGoogleIdsFromSheetContentNodeInternalId, + getGoogleSheetTableId, isGoogleSheetContentNodeInternalId, + MIME_TYPES, removeNulls, } from "@dust-tt/types"; import type { Logger } from "pino"; import type { InferAttributes, WhereOptions } from "sequelize"; -import { isGoogleDriveSpreadSheetFile } from "@connectors/connectors/google_drive/temporal/mime_types"; +import { getSourceUrlForGoogleDriveFiles } from "@connectors/connectors/google_drive"; +import { getGoogleDriveObject } from "@connectors/connectors/google_drive/lib/google_drive_api"; +import { getFileParentsMemoized } from "@connectors/connectors/google_drive/lib/hierarchy"; +import { + isGoogleDriveFolder, + isGoogleDriveSpreadSheetFile, +} from "@connectors/connectors/google_drive/temporal/mime_types"; import { deleteSpreadsheet } from "@connectors/connectors/google_drive/temporal/spreadsheets"; import { + getAuthObject, getDriveFileId, getInternalId, } from "@connectors/connectors/google_drive/temporal/utils"; @@ -18,6 +28,9 @@ import { dataSourceConfigFromConnector } from "@connectors/lib/api/data_source_c import { deleteDataSourceDocument, deleteDataSourceFolder, + updateDataSourceDocumentParents, + updateDataSourceTableParents, + upsertDataSourceFolder, } from "@connectors/lib/data_sources"; import { GoogleDriveFiles, @@ -144,12 +157,168 @@ export async function internalDeleteFile( }); } -export async function fixParents( - connector: ConnectorResource, - files: GoogleDriveFiles[], - logger: Logger, - execute: boolean = true -) { +/** + * Fixes parent-child relationship consistency for Google Drive files. + * + * This function performs two main checks: + * 1. If checkFromGoogle=true, verifies that local parent relationships match Google Drive + * 2. Validates that all parent IDs reference valid files or root folders + * + * For any inconsistencies found: + * - If execute=false, only logs the issues + * - If execute=true, fixes the inconsistencies by: + * - Deleting files that don't exist in Google Drive + * - Updating parent relationships to match Google Drive + * - Deleting files with invalid parents + * - Resetting parent to null for root folders + * + * @param connector - The Google Drive connector resource + * @param files - List of GoogleDriveFiles to check + * @param startSyncTs - Timestamp for caching/memoization + * @param checkFromGoogle - Whether to verify against Google Drive API + * @param execute - Whether to fix inconsistencies or just log them + * @param logger - Logger instance for recording issues + */ +export async function fixParentsConsistency({ + connector, + files, + startSyncTs, + checkFromGoogle = false, + execute = false, + logger, +}: { + connector: ConnectorResource; + files: GoogleDriveFiles[]; + startSyncTs: number; + checkFromGoogle?: boolean; + execute?: boolean; + logger: Logger; +}) { + // First check consistency with Google Drive + if (checkFromGoogle) { + logger.info("Checking consistency with Google Drive"); + const authCredentials = await getAuthObject(connector.connectionId); + + const googleFiles = removeNulls( + await concurrentExecutor( + files, + async (file) => + getGoogleDriveObject({ + authCredentials, + driveObjectId: file.driveFileId, + cacheKey: { + connectorId: connector.id, + ts: startSyncTs, + }, + }), + { + concurrency: 10, + } + ) + ); + + for (const file of files) { + const googleFile = googleFiles.find((f) => f.id === file.driveFileId); + if (!googleFile) { + logger.info( + { fileId: file.driveFileId }, + "File does not exist in Google Drive, deleting" + ); + if (execute) { + await internalDeleteFile(connector, file); + } + } else { + const parents = await getFileParentsMemoized( + connector.id, + authCredentials, + googleFile, + startSyncTs + ); + const localParents = await getLocalParents( + connector.id, + file.dustFileId, + `${startSyncTs}` + ); + if (parents[parents.length - 1] === "gdrive_outside_sync") { + logger.info( + { dustFileId: file.dustFileId }, + "File is outside of sync, deleting" + ); + if (execute) { + await internalDeleteFile(connector, file); + } + } else if ( + JSON.stringify(parents.map((p) => getInternalId(p))) !== + JSON.stringify(localParents) + ) { + logger.info( + { dustFileId: file.dustFileId, localParents, parents }, + "Parents not consistent with gdrive, updating" + ); + const driveParentId = parents[1]; + const dustParentId = driveParentId + ? getInternalId(driveParentId) + : null; + if (execute) { + await file.update({ + parentId: driveParentId, + }); + const dataSourceConfig = dataSourceConfigFromConnector(connector); + const dustParentIds = parents.map((p) => getInternalId(p)); + if ( + isGoogleDriveFolder(googleFile) || + isGoogleDriveSpreadSheetFile(googleFile) + ) { + await upsertDataSourceFolder({ + dataSourceConfig, + folderId: file.dustFileId, + parents: dustParentIds, + parentId: dustParentId, + title: file.name ?? "", + mimeType: MIME_TYPES.GOOGLE_DRIVE.FOLDER, + sourceUrl: getSourceUrlForGoogleDriveFiles(file), + }); + const sheets = await GoogleDriveSheet.findAll({ + where: { + driveFileId: file.driveFileId, + connectorId: connector.id, + }, + }); + for (const sheet of sheets) { + const tableId = getGoogleSheetTableId( + sheet.driveFileId, + sheet.driveSheetId + ); + await updateDataSourceTableParents({ + dataSourceConfig, + tableId, + parents: [tableId, ...dustParentIds], + parentId: file.dustFileId, + }); + } + } else { + await updateDataSourceDocumentParents({ + dataSourceConfig, + documentId: file.dustFileId, + parents: dustParentIds, + parentId: dustParentId, + }); + } + } + } + } + } + + // Re-fetch files to ensure we only process non-deleted ones + files = await GoogleDriveFiles.findAll({ + where: { + id: files.map((f) => f.id), + }, + }); + } + + logger.info("Checking parentIds validity"); + const parentFiles = await GoogleDriveFiles.findAll({ where: { connectorId: connector.id, @@ -164,6 +333,15 @@ export async function fixParents( }); for (const file of files) { + if (!file.parentId && !roots.find((r) => r.folderId === file.driveFileId)) { + logger.info( + { fileId: file.driveFileId }, + "Deleting file with no parent and not a root folder" + ); + if (execute) { + await internalDeleteFile(connector, file); + } + } if (file.parentId) { const parentFile = parentFiles.find( (f) => f.driveFileId === file.parentId diff --git a/connectors/src/connectors/google_drive/lib/cli.ts b/connectors/src/connectors/google_drive/lib/cli.ts index ebd6c40114fa..2bc690ff7ab1 100644 --- a/connectors/src/connectors/google_drive/lib/cli.ts +++ b/connectors/src/connectors/google_drive/lib/cli.ts @@ -7,7 +7,7 @@ import { googleDriveIncrementalSyncWorkflowId } from "@dust-tt/types"; import { Op } from "sequelize"; import { getConnectorManager } from "@connectors/connectors"; -import { fixParents } from "@connectors/connectors/google_drive/lib"; +import { fixParentsConsistency } from "@connectors/connectors/google_drive/lib"; import { getGoogleDriveObject } from "@connectors/connectors/google_drive/lib/google_drive_api"; import { getFileParentsMemoized } from "@connectors/connectors/google_drive/lib/hierarchy"; import { launchGoogleDriveIncrementalSyncWorkflow } from "@connectors/connectors/google_drive/temporal/client"; @@ -111,12 +111,13 @@ export const google_drive = async ({ throw new Error("Missing --fileId argument"); } const fileId = args.fileId; - + const now = Date.now(); const authCredentials = await getAuthObject(connector.connectionId); - const driveObject = await getGoogleDriveObject( + const driveObject = await getGoogleDriveObject({ authCredentials, - getDriveFileId(fileId) - ); + driveObjectId: getDriveFileId(fileId), + cacheKey: { connectorId: connector.id, ts: now }, + }); if (!driveObject) { throw new Error("Can't find google drive object"); } @@ -124,7 +125,7 @@ export const google_drive = async ({ connector.id, authCredentials, driveObject, - Date.now() + now ); return { status: 200, content: parents, type: typeof parents }; } @@ -136,6 +137,7 @@ export const google_drive = async ({ const limit = 1000; let fromId = 0; let files: GoogleDriveFiles[] = []; + const now = Date.now(); do { files = await GoogleDriveFiles.findAll({ where: { @@ -145,11 +147,21 @@ export const google_drive = async ({ order: [["id", "ASC"]], limit, }); - const connecrtorResource = new ConnectorResource( - ConnectorModel, - connector + + const connectorResource = await ConnectorResource.fetchById( + connector.id ); - await fixParents(connecrtorResource, files, topLogger, execute); + if (!connectorResource) { + throw new Error("Connector not found"); + } + await fixParentsConsistency({ + connector: connectorResource, + files, + checkFromGoogle: true, + execute, + startSyncTs: now, + logger: topLogger, + }); fromId = files[limit - 1]?.id ?? 0; } while (fromId > 0); return { success: true }; diff --git a/connectors/src/connectors/google_drive/lib/google_drive_api.ts b/connectors/src/connectors/google_drive/lib/google_drive_api.ts index d4e7bfabd2c6..06afa513dc70 100644 --- a/connectors/src/connectors/google_drive/lib/google_drive_api.ts +++ b/connectors/src/connectors/google_drive/lib/google_drive_api.ts @@ -1,3 +1,4 @@ +import { cacheWithRedis } from "@dust-tt/types"; import type { OAuth2Client } from "googleapis-common"; import type { GaxiosError } from "googleapis-common"; @@ -9,10 +10,18 @@ import { ExternalOAuthTokenError } from "@connectors/lib/error"; import type { GoogleDriveObjectType } from "@connectors/types/google_drive"; import { FILE_ATTRIBUTES_TO_FETCH } from "@connectors/types/google_drive"; -export async function getGoogleDriveObject( - authCredentials: OAuth2Client, - driveObjectId: string -): Promise { +interface CacheKey { + connectorId: number; + ts: string | number; +} + +async function _getGoogleDriveObject({ + authCredentials, + driveObjectId, +}: { + authCredentials: OAuth2Client; + driveObjectId: string; +}): Promise { const drive = await getDriveClient(authCredentials); try { @@ -39,3 +48,39 @@ export async function getGoogleDriveObject( throw e; } } + +const cachedGetGoogleDriveObject = cacheWithRedis< + GoogleDriveObjectType | null, + [ + { + authCredentials: OAuth2Client; + driveObjectId: string; + cacheKey: CacheKey; + }, + ] +>( + _getGoogleDriveObject, + ({ driveObjectId, cacheKey }) => { + return `${cacheKey.connectorId}:${driveObjectId}:${cacheKey.ts}`; + }, + 60 * 10 * 1000 +); + +export async function getGoogleDriveObject({ + authCredentials, + driveObjectId, + cacheKey, +}: { + authCredentials: OAuth2Client; + driveObjectId: string; + cacheKey?: CacheKey; +}): Promise { + if (cacheKey) { + return cachedGetGoogleDriveObject({ + authCredentials, + driveObjectId, + cacheKey, + }); + } + return _getGoogleDriveObject({ authCredentials, driveObjectId }); +} diff --git a/connectors/src/connectors/google_drive/lib/hierarchy.ts b/connectors/src/connectors/google_drive/lib/hierarchy.ts index 7b6d41d64899..db8d04d92b4f 100644 --- a/connectors/src/connectors/google_drive/lib/hierarchy.ts +++ b/connectors/src/connectors/google_drive/lib/hierarchy.ts @@ -20,7 +20,6 @@ async function getFileParents( connectorId: ModelId, authCredentials: OAuth2Client, driveFile: GoogleDriveObjectType, - /* eslint-disable @typescript-eslint/no-unused-vars */ startSyncTs: number ): Promise { const logger = mainLogger.child({ @@ -31,10 +30,11 @@ async function getFileParents( const parents: string[] = [driveFile.id]; let currentObject = driveFile; while (currentObject.parent) { - const parent = await getGoogleDriveObject( + const parent = await getGoogleDriveObject({ authCredentials, - currentObject.parent - ); + driveObjectId: currentObject.parent, + cacheKey: { connectorId, ts: startSyncTs }, + }); if (!parent) { // If we got a 404 error we stop the iteration as the parent disappeared. logger.info("Parent not found in `getFileParents`", { diff --git a/connectors/src/connectors/google_drive/temporal/activities.ts b/connectors/src/connectors/google_drive/temporal/activities.ts index 88f8f02d8fc3..490561139883 100644 --- a/connectors/src/connectors/google_drive/temporal/activities.ts +++ b/connectors/src/connectors/google_drive/temporal/activities.ts @@ -10,7 +10,7 @@ import { Op } from "sequelize"; import { getSourceUrlForGoogleDriveFiles } from "@connectors/connectors/google_drive"; import { - fixParents, + fixParentsConsistency, internalDeleteFile, } from "@connectors/connectors/google_drive/lib"; import { @@ -135,10 +135,10 @@ export async function getDrivesToSync( const drives: Record = {}; for (const folder of selectedFolders) { - const remoteFolder = await getGoogleDriveObject( + const remoteFolder = await getGoogleDriveObject({ authCredentials, - folder.folderId - ); + driveObjectId: folder.folderId, + }); if (remoteFolder) { if (!remoteFolder.driveId) { throw new Error(`Folder ${folder.folderId} does not have a driveId.`); @@ -196,10 +196,11 @@ export async function syncFiles( csvEnabled: config?.csvEnabled || false, }); const authCredentials = await getAuthObject(connector.connectionId); - const driveFolder = await getGoogleDriveObject( + const driveFolder = await getGoogleDriveObject({ authCredentials, - driveFolderId - ); + driveObjectId: driveFolderId, + cacheKey: { connectorId, ts: startSyncTs }, + }); if (!driveFolder) { // We got a 404 on this folder, we skip it. logger.info( @@ -524,7 +525,7 @@ export async function incrementalSync( driveFileId: file.id, name: file.name, mimeType: file.mimeType, - parentId: file.parent, + parentId: parents[1] ? getDriveFileId(parents[1]) : null, lastSeenTs: new Date(), }); localLogger.info({ fileId: change.file.id }, "done syncing file"); @@ -643,10 +644,10 @@ export async function shouldGarbageCollect(connectorId: ModelId) { const authCredentials = await getAuthObject(connector.connectionId); for (const folder of selectedFolder) { - const remoteFolder = await getGoogleDriveObject( + const remoteFolder = await getGoogleDriveObject({ authCredentials, - folder.folderId - ); + driveObjectId: folder.folderId, + }); if (!remoteFolder) { return true; } @@ -677,6 +678,8 @@ export async function garbageCollector( localLogger.info("Google Drive: Starting garbage collector"); + const ts = lastSeenTs || Date.now(); + const files = await GoogleDriveFiles.findAll({ where: { connectorId: connectorId, @@ -690,10 +693,11 @@ export async function garbageCollector( await Promise.all( files.map(async (file) => { return queue.add(async () => { - const driveFile = await getGoogleDriveObject( + const driveFile = await getGoogleDriveObject({ authCredentials, - file.driveFileId - ); + driveObjectId: file.driveFileId, + cacheKey: { connectorId, ts }, + }); if (!driveFile) { // Could not find the file on Gdrive, deleting our local reference to it. await deleteFile(file); @@ -718,7 +722,15 @@ export async function garbageCollector( }) ); - await fixParents(connector, files, localLogger); + // TODO(nodes-core): Run fixParents in dry run mode to check parentIds validity + await fixParentsConsistency({ + connector, + files, + checkFromGoogle: false, + execute: false, + startSyncTs: ts, + logger: localLogger, + }); return files.length; } @@ -815,7 +827,11 @@ export async function markFolderAsVisited( throw new Error(`Connector ${connectorId} not found`); } const authCredentials = await getAuthObject(connector.connectionId); - const file = await getGoogleDriveObject(authCredentials, driveFileId); + const file = await getGoogleDriveObject({ + authCredentials, + driveObjectId: driveFileId, + cacheKey: { connectorId, ts: startSyncTs }, + }); if (!file) { logger.info(