From 21bb37c3ac9c7144e9c33be51c075a8a50b6d42e Mon Sep 17 00:00:00 2001 From: Thomas Draier Date: Thu, 6 Feb 2025 14:47:59 +0100 Subject: [PATCH 1/8] Add cache in getGoogleDriveObject, improved fixParents --- connectors/src/connectors/google_drive/lib.ts | 117 ++++++++++++++++-- .../src/connectors/google_drive/lib/cli.ts | 27 ++-- .../google_drive/lib/google_drive_api.ts | 31 ++++- .../connectors/google_drive/lib/hierarchy.ts | 5 +- .../google_drive/temporal/activities.ts | 26 +++- 5 files changed, 185 insertions(+), 21 deletions(-) diff --git a/connectors/src/connectors/google_drive/lib.ts b/connectors/src/connectors/google_drive/lib.ts index e90af6636ac4..3d714c5ae9f8 100644 --- a/connectors/src/connectors/google_drive/lib.ts +++ b/connectors/src/connectors/google_drive/lib.ts @@ -1,16 +1,25 @@ import type { ContentNodesViewType, ModelId } from "@dust-tt/types"; import { cacheWithRedis, + concurrentExecutor, getGoogleIdsFromSheetContentNodeInternalId, isGoogleSheetContentNodeInternalId, + MIME_TYPES, removeNulls, } from "@dust-tt/types"; import type { Logger } from "pino"; import type { InferAttributes, WhereOptions } from "sequelize"; -import { isGoogleDriveSpreadSheetFile } from "@connectors/connectors/google_drive/temporal/mime_types"; +import { getSourceUrlForGoogleDriveFiles } from "@connectors/connectors/google_drive"; +import { getGoogleDriveObject } from "@connectors/connectors/google_drive/lib/google_drive_api"; +import { getFileParentsMemoized } from "@connectors/connectors/google_drive/lib/hierarchy"; +import { + isGoogleDriveFolder, + isGoogleDriveSpreadSheetFile, +} from "@connectors/connectors/google_drive/temporal/mime_types"; import { deleteSpreadsheet } from "@connectors/connectors/google_drive/temporal/spreadsheets"; import { + getAuthObject, getDriveFileId, getInternalId, } from "@connectors/connectors/google_drive/temporal/utils"; @@ -18,6 +27,8 @@ import { dataSourceConfigFromConnector } from "@connectors/lib/api/data_source_c import { 
deleteDataSourceDocument, deleteDataSourceFolder, + updateDataSourceDocumentParents, + upsertDataSourceFolder, } from "@connectors/lib/data_sources"; import { GoogleDriveFiles, @@ -144,12 +155,21 @@ export async function internalDeleteFile( }); } -export async function fixParents( - connector: ConnectorResource, - files: GoogleDriveFiles[], - logger: Logger, - execute: boolean = true -) { +export async function fixParents({ + connector, + files, + startSyncTs, + checkFromGoogle = false, + execute = false, + logger, +}: { + connector: ConnectorResource; + files: GoogleDriveFiles[]; + startSyncTs: number; + checkFromGoogle?: boolean; + execute?: boolean; + logger: Logger; +}) { const parentFiles = await GoogleDriveFiles.findAll({ where: { connectorId: connector.id, @@ -163,8 +183,91 @@ export async function fixParents( }, }); + const authCredentials = await getAuthObject(connector.connectionId); + + const googleFiles = checkFromGoogle + ? removeNulls( + await concurrentExecutor( + files, + async (file) => + getGoogleDriveObject( + authCredentials, + file.driveFileId, + connector.id, + startSyncTs + ), + { + concurrency: 10, + } + ) + ) + : []; + for (const file of files) { if (file.parentId) { + if (checkFromGoogle) { + const googleFile = googleFiles.find((f) => f.id === file.driveFileId); + if (!googleFile) { + logger.info( + { fileId: file.driveFileId }, + "File does not exist in Google Drive, skipping" + ); + continue; + } else { + const parents = await getFileParentsMemoized( + connector.id, + authCredentials, + googleFile, + startSyncTs + ); + const localParents = await getLocalParents( + connector.id, + file.dustFileId, + `${startSyncTs}` + ); + + if ( + JSON.stringify(parents.map((p) => getInternalId(p))) !== + JSON.stringify(localParents) + ) { + logger.info( + { localParents, parents }, + "Parents not consistent with gdrive, updating" + ); + const driveParentId = parents[1]; + const dustParentId = driveParentId + ? 
getInternalId(driveParentId) + : null; + if (execute) { + await file.update({ + parentId: driveParentId, + }); + const dataSourceConfig = dataSourceConfigFromConnector(connector); + if ( + isGoogleDriveFolder(googleFile) || + isGoogleDriveSpreadSheetFile(googleFile) + ) { + await upsertDataSourceFolder({ + dataSourceConfig, + folderId: file.dustFileId, + parents: parents.map((p) => getInternalId(p)), + parentId: dustParentId, + title: file.name ?? "", + mimeType: MIME_TYPES.GOOGLE_DRIVE.FOLDER, + sourceUrl: getSourceUrlForGoogleDriveFiles(file), + }); + } else { + await updateDataSourceDocumentParents({ + dataSourceConfig, + documentId: file.dustFileId, + parents: parents.map((p) => getInternalId(p)), + parentId: dustParentId, + }); + } + } + } + } + } const parentFile = parentFiles.find( (f) => f.driveFileId === file.parentId ); diff --git a/connectors/src/connectors/google_drive/lib/cli.ts b/connectors/src/connectors/google_drive/lib/cli.ts index ebd6c40114fa..ee0d4b7c8644 100644 --- a/connectors/src/connectors/google_drive/lib/cli.ts +++ b/connectors/src/connectors/google_drive/lib/cli.ts @@ -111,11 +111,13 @@ export const google_drive = async ({ throw new Error("Missing --fileId argument"); } const fileId = args.fileId; - + const now = Date.now(); const authCredentials = await getAuthObject(connector.connectionId); const driveObject = await getGoogleDriveObject( authCredentials, - getDriveFileId(fileId) + getDriveFileId(fileId), + connector.id, + now ); if (!driveObject) { throw new Error("Can't find google drive object"); @@ -124,7 +126,7 @@ export const google_drive = async ({ connector.id, authCredentials, driveObject, - Date.now() + now ); return { status: 200, content: parents, type: typeof parents }; } @@ -136,6 +138,7 @@ export const google_drive = async ({ const limit = 1000; let fromId = 0; let files: GoogleDriveFiles[] = []; + const now = Date.now(); do { files = await GoogleDriveFiles.findAll({ where: { @@ -145,11 +148,21 @@ export const google_drive 
= async ({ order: [["id", "ASC"]], limit, }); - const connecrtorResource = new ConnectorResource( - ConnectorModel, - connector + + const connectorResource = await ConnectorResource.fetchById( + connector.id ); - await fixParents(connecrtorResource, files, topLogger, execute); + if (!connectorResource) { + throw new Error("Connector not found"); + } + await fixParents({ + connector: connectorResource, + files, + checkFromGoogle: true, + execute, + startSyncTs: now, + logger: topLogger, + }); fromId = files[limit - 1]?.id ?? 0; } while (fromId > 0); return { success: true }; diff --git a/connectors/src/connectors/google_drive/lib/google_drive_api.ts b/connectors/src/connectors/google_drive/lib/google_drive_api.ts index d4e7bfabd2c6..5861fc7889d8 100644 --- a/connectors/src/connectors/google_drive/lib/google_drive_api.ts +++ b/connectors/src/connectors/google_drive/lib/google_drive_api.ts @@ -1,3 +1,4 @@ +import { cacheWithRedis } from "@dust-tt/types"; import type { OAuth2Client } from "googleapis-common"; import type { GaxiosError } from "googleapis-common"; @@ -9,7 +10,7 @@ import { ExternalOAuthTokenError } from "@connectors/lib/error"; import type { GoogleDriveObjectType } from "@connectors/types/google_drive"; import { FILE_ATTRIBUTES_TO_FETCH } from "@connectors/types/google_drive"; -export async function getGoogleDriveObject( +async function _getGoogleDriveObject( authCredentials: OAuth2Client, driveObjectId: string ): Promise { @@ -39,3 +40,31 @@ export async function getGoogleDriveObject( throw e; } } + +const cachedGetGoogleDriveObject = cacheWithRedis< + GoogleDriveObjectType | null, + [OAuth2Client, string, number, string | number] +>( + _getGoogleDriveObject, + (_, driveObjectId, connectorId, memoizationKey) => { + return `${connectorId}:${driveObjectId}:${memoizationKey}`; + }, + 60 * 10 * 1000 +); + +export async function getGoogleDriveObject( + authCredentials: OAuth2Client, + driveObjectId: string, + connectorId?: number, + memoizationKey?: string | 
number +): Promise { + if (connectorId && memoizationKey) { + return cachedGetGoogleDriveObject( + authCredentials, + driveObjectId, + connectorId, + memoizationKey + ); + } + return _getGoogleDriveObject(authCredentials, driveObjectId); +} diff --git a/connectors/src/connectors/google_drive/lib/hierarchy.ts b/connectors/src/connectors/google_drive/lib/hierarchy.ts index 7b6d41d64899..f4960e6cc957 100644 --- a/connectors/src/connectors/google_drive/lib/hierarchy.ts +++ b/connectors/src/connectors/google_drive/lib/hierarchy.ts @@ -20,7 +20,6 @@ async function getFileParents( connectorId: ModelId, authCredentials: OAuth2Client, driveFile: GoogleDriveObjectType, - /* eslint-disable @typescript-eslint/no-unused-vars */ startSyncTs: number ): Promise { const logger = mainLogger.child({ @@ -33,7 +32,9 @@ async function getFileParents( while (currentObject.parent) { const parent = await getGoogleDriveObject( authCredentials, - currentObject.parent + currentObject.parent, + connectorId, + startSyncTs ); if (!parent) { // If we got a 404 error we stop the iteration as the parent disappeared. diff --git a/connectors/src/connectors/google_drive/temporal/activities.ts b/connectors/src/connectors/google_drive/temporal/activities.ts index 88f8f02d8fc3..29f389c562ab 100644 --- a/connectors/src/connectors/google_drive/temporal/activities.ts +++ b/connectors/src/connectors/google_drive/temporal/activities.ts @@ -198,7 +198,9 @@ export async function syncFiles( const authCredentials = await getAuthObject(connector.connectionId); const driveFolder = await getGoogleDriveObject( authCredentials, - driveFolderId + driveFolderId, + connectorId, + startSyncTs ); if (!driveFolder) { // We got a 404 on this folder, we skip it. 
@@ -677,6 +679,8 @@ export async function garbageCollector( localLogger.info("Google Drive: Starting garbage collector"); + const ts = lastSeenTs || Date.now(); + const files = await GoogleDriveFiles.findAll({ where: { connectorId: connectorId, @@ -692,7 +696,9 @@ export async function garbageCollector( return queue.add(async () => { const driveFile = await getGoogleDriveObject( authCredentials, - file.driveFileId + file.driveFileId, + connectorId, + ts ); if (!driveFile) { // Could not find the file on Gdrive, deleting our local reference to it. @@ -718,7 +724,14 @@ export async function garbageCollector( }) ); - await fixParents(connector, files, localLogger); + await fixParents({ + connector, + files, + checkFromGoogle: false, + execute: true, + startSyncTs: ts, + logger: localLogger, + }); return files.length; } @@ -815,7 +828,12 @@ export async function markFolderAsVisited( throw new Error(`Connector ${connectorId} not found`); } const authCredentials = await getAuthObject(connector.connectionId); - const file = await getGoogleDriveObject(authCredentials, driveFileId); + const file = await getGoogleDriveObject( + authCredentials, + driveFileId, + connectorId, + startSyncTs + ); if (!file) { logger.info( From 72c5a3c568d8f0432e2912a16ebe5d9f5d4e2a0f Mon Sep 17 00:00:00 2001 From: Thomas Draier Date: Thu, 6 Feb 2025 15:57:25 +0100 Subject: [PATCH 2/8] use dedicated key type --- connectors/src/connectors/google_drive/lib.ts | 10 ++++---- .../src/connectors/google_drive/lib/cli.ts | 3 +-- .../google_drive/lib/google_drive_api.ts | 23 +++++++++---------- .../connectors/google_drive/lib/hierarchy.ts | 3 +-- .../google_drive/temporal/activities.ts | 14 ++++------- 5 files changed, 22 insertions(+), 31 deletions(-) diff --git a/connectors/src/connectors/google_drive/lib.ts b/connectors/src/connectors/google_drive/lib.ts index 3d714c5ae9f8..58ea6ed6ccbf 100644 --- a/connectors/src/connectors/google_drive/lib.ts +++ b/connectors/src/connectors/google_drive/lib.ts @@ 
-190,12 +190,10 @@ export async function fixParents({ await concurrentExecutor( files, async (file) => - getGoogleDriveObject( - authCredentials, - file.driveFileId, - connector.id, - startSyncTs - ), + getGoogleDriveObject(authCredentials, file.driveFileId, { + connectorId: connector.id, + ts: startSyncTs, + }), { concurrency: 10, } diff --git a/connectors/src/connectors/google_drive/lib/cli.ts b/connectors/src/connectors/google_drive/lib/cli.ts index ee0d4b7c8644..ea381e6311c1 100644 --- a/connectors/src/connectors/google_drive/lib/cli.ts +++ b/connectors/src/connectors/google_drive/lib/cli.ts @@ -116,8 +116,7 @@ export const google_drive = async ({ const driveObject = await getGoogleDriveObject( authCredentials, getDriveFileId(fileId), - connector.id, - now + { connectorId: connector.id, ts: now } ); if (!driveObject) { throw new Error("Can't find google drive object"); diff --git a/connectors/src/connectors/google_drive/lib/google_drive_api.ts b/connectors/src/connectors/google_drive/lib/google_drive_api.ts index 5861fc7889d8..21c6aa03fa13 100644 --- a/connectors/src/connectors/google_drive/lib/google_drive_api.ts +++ b/connectors/src/connectors/google_drive/lib/google_drive_api.ts @@ -10,6 +10,11 @@ import { ExternalOAuthTokenError } from "@connectors/lib/error"; import type { GoogleDriveObjectType } from "@connectors/types/google_drive"; import { FILE_ATTRIBUTES_TO_FETCH } from "@connectors/types/google_drive"; +interface CacheKey { + connectorId: number; + ts: string | number; +} + async function _getGoogleDriveObject( authCredentials: OAuth2Client, driveObjectId: string @@ -43,11 +48,11 @@ async function _getGoogleDriveObject( const cachedGetGoogleDriveObject = cacheWithRedis< GoogleDriveObjectType | null, - [OAuth2Client, string, number, string | number] + [OAuth2Client, string, CacheKey] >( _getGoogleDriveObject, - (_, driveObjectId, connectorId, memoizationKey) => { - return `${connectorId}:${driveObjectId}:${memoizationKey}`; + (_, driveObjectId, { 
connectorId, ts }) => { + return `${connectorId}:${driveObjectId}:${ts}`; }, 60 * 10 * 1000 ); @@ -55,16 +60,10 @@ const cachedGetGoogleDriveObject = cacheWithRedis< export async function getGoogleDriveObject( authCredentials: OAuth2Client, driveObjectId: string, - connectorId?: number, - memoizationKey?: string | number + cacheKey?: CacheKey ): Promise { - if (connectorId && memoizationKey) { - return cachedGetGoogleDriveObject( - authCredentials, - driveObjectId, - connectorId, - memoizationKey - ); + if (cacheKey) { + return cachedGetGoogleDriveObject(authCredentials, driveObjectId, cacheKey); } return _getGoogleDriveObject(authCredentials, driveObjectId); } diff --git a/connectors/src/connectors/google_drive/lib/hierarchy.ts b/connectors/src/connectors/google_drive/lib/hierarchy.ts index f4960e6cc957..ed8fa93a0e65 100644 --- a/connectors/src/connectors/google_drive/lib/hierarchy.ts +++ b/connectors/src/connectors/google_drive/lib/hierarchy.ts @@ -33,8 +33,7 @@ async function getFileParents( const parent = await getGoogleDriveObject( authCredentials, currentObject.parent, - connectorId, - startSyncTs + { connectorId, ts: startSyncTs } ); if (!parent) { // If we got a 404 error we stop the iteration as the parent disappeared. diff --git a/connectors/src/connectors/google_drive/temporal/activities.ts b/connectors/src/connectors/google_drive/temporal/activities.ts index 29f389c562ab..296d20920aaa 100644 --- a/connectors/src/connectors/google_drive/temporal/activities.ts +++ b/connectors/src/connectors/google_drive/temporal/activities.ts @@ -199,8 +199,7 @@ export async function syncFiles( const driveFolder = await getGoogleDriveObject( authCredentials, driveFolderId, - connectorId, - startSyncTs + { connectorId, ts: startSyncTs } ); if (!driveFolder) { // We got a 404 on this folder, we skip it. 
@@ -697,8 +696,7 @@ export async function garbageCollector( const driveFile = await getGoogleDriveObject( authCredentials, file.driveFileId, - connectorId, - ts + { connectorId, ts } ); if (!driveFile) { // Could not find the file on Gdrive, deleting our local reference to it. @@ -828,12 +826,10 @@ export async function markFolderAsVisited( throw new Error(`Connector ${connectorId} not found`); } const authCredentials = await getAuthObject(connector.connectionId); - const file = await getGoogleDriveObject( - authCredentials, - driveFileId, + const file = await getGoogleDriveObject(authCredentials, driveFileId, { connectorId, - startSyncTs - ); + ts: startSyncTs, + }); if (!file) { logger.info( From b6210af72d57f344d2a01db30b00cd7d3cc7a976 Mon Sep 17 00:00:00 2001 From: Thomas Draier Date: Thu, 6 Feb 2025 16:08:43 +0100 Subject: [PATCH 3/8] Add comment, use dry run --- connectors/src/connectors/google_drive/temporal/activities.ts | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/connectors/src/connectors/google_drive/temporal/activities.ts b/connectors/src/connectors/google_drive/temporal/activities.ts index 296d20920aaa..c83b91971b6a 100644 --- a/connectors/src/connectors/google_drive/temporal/activities.ts +++ b/connectors/src/connectors/google_drive/temporal/activities.ts @@ -722,11 +722,12 @@ export async function garbageCollector( }) ); + // TODO(nodes-core): Run fixParents in dry run mode to check parentIds validity await fixParents({ connector, files, checkFromGoogle: false, - execute: true, + execute: false, startSyncTs: ts, logger: localLogger, }); From 7a3b679886040335c42639be099f4efd6d27b305 Mon Sep 17 00:00:00 2001 From: Thomas Draier Date: Thu, 6 Feb 2025 16:29:15 +0100 Subject: [PATCH 4/8] Update spreadsheets --- connectors/src/connectors/google_drive/lib.ts | 27 ++++++++++++++++--- 1 file changed, 24 insertions(+), 3 deletions(-) diff --git a/connectors/src/connectors/google_drive/lib.ts 
b/connectors/src/connectors/google_drive/lib.ts index 58ea6ed6ccbf..8720901c5f63 100644 --- a/connectors/src/connectors/google_drive/lib.ts +++ b/connectors/src/connectors/google_drive/lib.ts @@ -3,6 +3,7 @@ import { cacheWithRedis, concurrentExecutor, getGoogleIdsFromSheetContentNodeInternalId, + getGoogleSheetTableId, isGoogleSheetContentNodeInternalId, MIME_TYPES, removeNulls, @@ -28,6 +29,7 @@ import { deleteDataSourceDocument, deleteDataSourceFolder, updateDataSourceDocumentParents, + updateDataSourceTableParents, upsertDataSourceFolder, } from "@connectors/lib/data_sources"; import { @@ -229,7 +231,7 @@ export async function fixParents({ JSON.stringify(localParents) ) { logger.info( - { localParents, parents }, + { dustFileId: file.dustFileId, localParents, parents }, "Parents not consistent with gdrive, updating" ); const driveParentId = parents[1]; @@ -241,6 +243,7 @@ export async function fixParents({ parentId: driveParentId, }); const dataSourceConfig = dataSourceConfigFromConnector(connector); + const dustParentIds = parents.map((p) => getInternalId(p)); if ( isGoogleDriveFolder(googleFile) || isGoogleDriveSpreadSheetFile(googleFile) @@ -248,17 +251,35 @@ export async function fixParents({ await upsertDataSourceFolder({ dataSourceConfig, folderId: file.dustFileId, - parents: parents.map((p) => getInternalId(p)), + parents: dustParentIds, parentId: dustParentId, title: file.name ?? 
"", mimeType: MIME_TYPES.GOOGLE_DRIVE.FOLDER, sourceUrl: getSourceUrlForGoogleDriveFiles(file), }); + const sheets = await GoogleDriveSheet.findAll({ + where: { + driveFileId: file.driveFileId, + connectorId: connector.id, + }, + }); + for (const sheet of sheets) { + const tableId = getGoogleSheetTableId( + sheet.driveFileId, + sheet.driveSheetId + ); + await updateDataSourceTableParents({ + dataSourceConfig, + tableId, + parents: [tableId, ...dustParentIds], + parentId: file.dustFileId, + }); + } } else { await updateDataSourceDocumentParents({ dataSourceConfig, documentId: file.dustFileId, - parents: parents.map((p) => getInternalId(p)), + parents: dustParentIds, parentId: dustParentId, }); } From e9d869317cd4245c7cc91930f27b2f2317ae1210 Mon Sep 17 00:00:00 2001 From: Thomas Draier Date: Thu, 6 Feb 2025 17:31:22 +0100 Subject: [PATCH 5/8] use object for params: --- .../20231214_find_non_shared_drives.ts | 6 +-- .../20240422_fix_gdrive_errorType.ts | 5 ++- .../20240529_clean_gdrive_folders.ts | 5 ++- .../src/connectors/google_drive/index.ts | 11 +++-- connectors/src/connectors/google_drive/lib.ts | 10 +++-- .../src/connectors/google_drive/lib/cli.ts | 8 ++-- .../google_drive/lib/google_drive_api.ts | 45 +++++++++++++------ .../connectors/google_drive/lib/hierarchy.ts | 8 ++-- .../google_drive/temporal/activities.ts | 35 ++++++++------- 9 files changed, 82 insertions(+), 51 deletions(-) diff --git a/connectors/migrations/20231214_find_non_shared_drives.ts b/connectors/migrations/20231214_find_non_shared_drives.ts index c9481581079d..9f862c6302a5 100644 --- a/connectors/migrations/20231214_find_non_shared_drives.ts +++ b/connectors/migrations/20231214_find_non_shared_drives.ts @@ -50,10 +50,10 @@ async function main() { }); for (const f of selectedFolders) { - const gDriveObject = await getGoogleDriveObject( + const gDriveObject = await getGoogleDriveObject({ authCredentials, - f.folderId - ); + driveObjectId: f.folderId, + }); if (!gDriveObject) { 
console.log(`Folder not found: folderId=${f.folderId}`); continue; diff --git a/connectors/migrations/20240422_fix_gdrive_errorType.ts b/connectors/migrations/20240422_fix_gdrive_errorType.ts index a3f9357e2846..66617cdfbbcb 100644 --- a/connectors/migrations/20240422_fix_gdrive_errorType.ts +++ b/connectors/migrations/20240422_fix_gdrive_errorType.ts @@ -23,7 +23,10 @@ async function main() { for (const connector of connectors) { try { const auth = await getAuthObject(connector.connectionId); - const gDriveObject = await getGoogleDriveObject(auth, "root"); + const gDriveObject = await getGoogleDriveObject({ + authCredentials: auth, + driveObjectId: "root", + }); logger.info( { connectorId: connector.id, diff --git a/connectors/migrations/20240529_clean_gdrive_folders.ts b/connectors/migrations/20240529_clean_gdrive_folders.ts index d17e77abf761..a54a0c1b4a7e 100644 --- a/connectors/migrations/20240529_clean_gdrive_folders.ts +++ b/connectors/migrations/20240529_clean_gdrive_folders.ts @@ -81,7 +81,10 @@ async function main() { continue; } - const file = await getGoogleDriveObject(authCredentials, folderId); + const file = await getGoogleDriveObject({ + authCredentials, + driveObjectId: folderId, + }); if (!file) { logger.info( diff --git a/connectors/src/connectors/google_drive/index.ts b/connectors/src/connectors/google_drive/index.ts index d91378038465..8d2210d7d9d0 100644 --- a/connectors/src/connectors/google_drive/index.ts +++ b/connectors/src/connectors/google_drive/index.ts @@ -378,10 +378,10 @@ export class GoogleDriveConnectorManager extends BaseConnectorManager { const nodes: ContentNode[] = await Promise.all( drives.map(async (d): Promise => { - const driveObject = await getGoogleDriveObject( + const driveObject = await getGoogleDriveObject({ authCredentials, - d.id - ); + driveObjectId: d.id, + }); if (!driveObject) { throw new Error( `Drive ${d.id} unexpectedly not found (got 404).` @@ -957,7 +957,10 @@ async function getFoldersAsContentNodes({ return 
concurrentExecutor( folders, async (f): Promise => { - const fd = await getGoogleDriveObject(authCredentials, f.folderId); + const fd = await getGoogleDriveObject({ + authCredentials, + driveObjectId: f.folderId, + }); if (!fd) { return null; } diff --git a/connectors/src/connectors/google_drive/lib.ts b/connectors/src/connectors/google_drive/lib.ts index 8720901c5f63..b1aae3da149b 100644 --- a/connectors/src/connectors/google_drive/lib.ts +++ b/connectors/src/connectors/google_drive/lib.ts @@ -192,9 +192,13 @@ export async function fixParents({ await concurrentExecutor( files, async (file) => - getGoogleDriveObject(authCredentials, file.driveFileId, { - connectorId: connector.id, - ts: startSyncTs, + getGoogleDriveObject({ + authCredentials, + driveObjectId: file.driveFileId, + cacheKey: { + connectorId: connector.id, + ts: startSyncTs, + }, }), { concurrency: 10, diff --git a/connectors/src/connectors/google_drive/lib/cli.ts b/connectors/src/connectors/google_drive/lib/cli.ts index ea381e6311c1..96abeeaaf9a8 100644 --- a/connectors/src/connectors/google_drive/lib/cli.ts +++ b/connectors/src/connectors/google_drive/lib/cli.ts @@ -113,11 +113,11 @@ export const google_drive = async ({ const fileId = args.fileId; const now = Date.now(); const authCredentials = await getAuthObject(connector.connectionId); - const driveObject = await getGoogleDriveObject( + const driveObject = await getGoogleDriveObject({ authCredentials, - getDriveFileId(fileId), - { connectorId: connector.id, ts: now } - ); + driveObjectId: getDriveFileId(fileId), + cacheKey: { connectorId: connector.id, ts: now }, + }); if (!driveObject) { throw new Error("Can't find google drive object"); } diff --git a/connectors/src/connectors/google_drive/lib/google_drive_api.ts b/connectors/src/connectors/google_drive/lib/google_drive_api.ts index 21c6aa03fa13..06afa513dc70 100644 --- a/connectors/src/connectors/google_drive/lib/google_drive_api.ts +++ 
b/connectors/src/connectors/google_drive/lib/google_drive_api.ts @@ -15,10 +15,13 @@ interface CacheKey { ts: string | number; } -async function _getGoogleDriveObject( - authCredentials: OAuth2Client, - driveObjectId: string -): Promise { +async function _getGoogleDriveObject({ + authCredentials, + driveObjectId, +}: { + authCredentials: OAuth2Client; + driveObjectId: string; +}): Promise { const drive = await getDriveClient(authCredentials); try { @@ -48,22 +51,36 @@ async function _getGoogleDriveObject( const cachedGetGoogleDriveObject = cacheWithRedis< GoogleDriveObjectType | null, - [OAuth2Client, string, CacheKey] + [ + { + authCredentials: OAuth2Client; + driveObjectId: string; + cacheKey: CacheKey; + }, + ] >( _getGoogleDriveObject, - (_, driveObjectId, { connectorId, ts }) => { - return `${connectorId}:${driveObjectId}:${ts}`; + ({ driveObjectId, cacheKey }) => { + return `${cacheKey.connectorId}:${driveObjectId}:${cacheKey.ts}`; }, 60 * 10 * 1000 ); -export async function getGoogleDriveObject( - authCredentials: OAuth2Client, - driveObjectId: string, - cacheKey?: CacheKey -): Promise { +export async function getGoogleDriveObject({ + authCredentials, + driveObjectId, + cacheKey, +}: { + authCredentials: OAuth2Client; + driveObjectId: string; + cacheKey?: CacheKey; +}): Promise { if (cacheKey) { - return cachedGetGoogleDriveObject(authCredentials, driveObjectId, cacheKey); + return cachedGetGoogleDriveObject({ + authCredentials, + driveObjectId, + cacheKey, + }); } - return _getGoogleDriveObject(authCredentials, driveObjectId); + return _getGoogleDriveObject({ authCredentials, driveObjectId }); } diff --git a/connectors/src/connectors/google_drive/lib/hierarchy.ts b/connectors/src/connectors/google_drive/lib/hierarchy.ts index ed8fa93a0e65..db8d04d92b4f 100644 --- a/connectors/src/connectors/google_drive/lib/hierarchy.ts +++ b/connectors/src/connectors/google_drive/lib/hierarchy.ts @@ -30,11 +30,11 @@ async function getFileParents( const parents: string[] = 
[driveFile.id]; let currentObject = driveFile; while (currentObject.parent) { - const parent = await getGoogleDriveObject( + const parent = await getGoogleDriveObject({ authCredentials, - currentObject.parent, - { connectorId, ts: startSyncTs } - ); + driveObjectId: currentObject.parent, + cacheKey: { connectorId, ts: startSyncTs }, + }); if (!parent) { // If we got a 404 error we stop the iteration as the parent disappeared. logger.info("Parent not found in `getFileParents`", { diff --git a/connectors/src/connectors/google_drive/temporal/activities.ts b/connectors/src/connectors/google_drive/temporal/activities.ts index c83b91971b6a..dc027399be83 100644 --- a/connectors/src/connectors/google_drive/temporal/activities.ts +++ b/connectors/src/connectors/google_drive/temporal/activities.ts @@ -135,10 +135,10 @@ export async function getDrivesToSync( const drives: Record = {}; for (const folder of selectedFolders) { - const remoteFolder = await getGoogleDriveObject( + const remoteFolder = await getGoogleDriveObject({ authCredentials, - folder.folderId - ); + driveObjectId: folder.folderId, + }); if (remoteFolder) { if (!remoteFolder.driveId) { throw new Error(`Folder ${folder.folderId} does not have a driveId.`); @@ -196,11 +196,11 @@ export async function syncFiles( csvEnabled: config?.csvEnabled || false, }); const authCredentials = await getAuthObject(connector.connectionId); - const driveFolder = await getGoogleDriveObject( + const driveFolder = await getGoogleDriveObject({ authCredentials, - driveFolderId, - { connectorId, ts: startSyncTs } - ); + driveObjectId: driveFolderId, + cacheKey: { connectorId, ts: startSyncTs }, + }); if (!driveFolder) { // We got a 404 on this folder, we skip it. 
logger.info( @@ -644,10 +644,10 @@ export async function shouldGarbageCollect(connectorId: ModelId) { const authCredentials = await getAuthObject(connector.connectionId); for (const folder of selectedFolder) { - const remoteFolder = await getGoogleDriveObject( + const remoteFolder = await getGoogleDriveObject({ authCredentials, - folder.folderId - ); + driveObjectId: folder.folderId, + }); if (!remoteFolder) { return true; } @@ -693,11 +693,11 @@ export async function garbageCollector( await Promise.all( files.map(async (file) => { return queue.add(async () => { - const driveFile = await getGoogleDriveObject( + const driveFile = await getGoogleDriveObject({ authCredentials, - file.driveFileId, - { connectorId, ts } - ); + driveObjectId: file.driveFileId, + cacheKey: { connectorId, ts }, + }); if (!driveFile) { // Could not find the file on Gdrive, deleting our local reference to it. await deleteFile(file); @@ -827,9 +827,10 @@ export async function markFolderAsVisited( throw new Error(`Connector ${connectorId} not found`); } const authCredentials = await getAuthObject(connector.connectionId); - const file = await getGoogleDriveObject(authCredentials, driveFileId, { - connectorId, - ts: startSyncTs, + const file = await getGoogleDriveObject({ + authCredentials, + driveObjectId: driveFileId, + cacheKey: { connectorId, ts: startSyncTs }, }); if (!file) { From b16111747125b3ac7de2942f920c4a7af8e284b9 Mon Sep 17 00:00:00 2001 From: Thomas Draier Date: Thu, 6 Feb 2025 17:50:45 +0100 Subject: [PATCH 6/8] clean up and document --- connectors/src/connectors/google_drive/lib.ts | 248 ++++++++++-------- .../src/connectors/google_drive/lib/cli.ts | 4 +- .../google_drive/temporal/activities.ts | 4 +- 3 files changed, 147 insertions(+), 109 deletions(-) diff --git a/connectors/src/connectors/google_drive/lib.ts b/connectors/src/connectors/google_drive/lib.ts index b1aae3da149b..77d48b7ef49e 100644 --- a/connectors/src/connectors/google_drive/lib.ts +++
b/connectors/src/connectors/google_drive/lib.ts @@ -157,7 +157,29 @@ export async function internalDeleteFile( }); } -export async function fixParents({ +/** + * Fixes parent-child relationship consistency for Google Drive files. + * + * This function performs two main checks: + * 1. If checkFromGoogle=true, verifies that local parent relationships match Google Drive + * 2. Validates that all parent IDs reference valid files or root folders + * + * For any inconsistencies found: + * - If execute=false, only logs the issues + * - If execute=true, fixes the inconsistencies by: + * - Deleting files that don't exist in Google Drive + * - Updating parent relationships to match Google Drive + * - Deleting files with invalid parents + * - Resetting parent to null for root folders + * + * @param connector - The Google Drive connector resource + * @param files - List of GoogleDriveFiles to check + * @param startSyncTs - Timestamp for caching/memoization + * @param checkFromGoogle - Whether to verify against Google Drive API + * @param execute - Whether to fix inconsistencies or just log them + * @param logger - Logger instance for recording issues + */ +export async function fixParentsConsistency({ connector, files, startSyncTs, @@ -172,6 +194,117 @@ export async function fixParents({ execute?: boolean; logger: Logger; }) { + // First check consistency with Google Drive + if (checkFromGoogle) { + logger.info("Checking consistency with Google Drive"); + const authCredentials = await getAuthObject(connector.connectionId); + + const googleFiles = removeNulls( + await concurrentExecutor( + files, + async (file) => + getGoogleDriveObject({ + authCredentials, + driveObjectId: file.driveFileId, + cacheKey: { + connectorId: connector.id, + ts: startSyncTs, + }, + }), + { + concurrency: 10, + } + ) + ); + + for (const file of files) { + const googleFile = googleFiles.find((f) => f.id === file.driveFileId); + if (!googleFile) { + logger.info( + { fileId: file.driveFileId }, + "File 
does not exist in Google Drive, deleting" + ); + if (execute) { + await internalDeleteFile(connector, file); + } + } else { + const parents = await getFileParentsMemoized( + connector.id, + authCredentials, + googleFile, + startSyncTs + ); + const localParents = await getLocalParents( + connector.id, + file.dustFileId, + `${startSyncTs}` + ); + + if ( + JSON.stringify(parents.map((p) => getInternalId(p))) !== + JSON.stringify(localParents) + ) { + logger.info( + { dustFileId: file.dustFileId, localParents, parents }, + "Parents not consistent with gdrive, updating" + ); + const driveParentId = parents[1]; + const dustParentId = driveParentId + ? getInternalId(driveParentId) + : null; + if (execute) { + await file.update({ + parentId: driveParentId, + }); + const dataSourceConfig = dataSourceConfigFromConnector(connector); + const dustParentIds = parents.map((p) => getInternalId(p)); + if ( + isGoogleDriveFolder(googleFile) || + isGoogleDriveSpreadSheetFile(googleFile) + ) { + await upsertDataSourceFolder({ + dataSourceConfig, + folderId: file.dustFileId, + parents: dustParentIds, + parentId: dustParentId, + title: file.name ?? 
"", + mimeType: MIME_TYPES.GOOGLE_DRIVE.FOLDER, + sourceUrl: getSourceUrlForGoogleDriveFiles(file), + }); + const sheets = await GoogleDriveSheet.findAll({ + where: { + driveFileId: file.driveFileId, + connectorId: connector.id, + }, + }); + for (const sheet of sheets) { + const tableId = getGoogleSheetTableId( + sheet.driveFileId, + sheet.driveSheetId + ); + await updateDataSourceTableParents({ + dataSourceConfig, + tableId, + parents: [tableId, ...dustParentIds], + parentId: file.dustFileId, + }); + } + } else { + await updateDataSourceDocumentParents({ + dataSourceConfig, + documentId: file.dustFileId, + parents: dustParentIds, + parentId: dustParentId, + }); + } + } + } + } + } + } + + logger.info("Checking parentIds validity"); + const parentFiles = await GoogleDriveFiles.findAll({ where: { connectorId: connector.id, @@ -185,112 +318,17 @@ export async function fixParents({ }, }); - const authCredentials = await getAuthObject(connector.connectionId); - - const googleFiles = checkFromGoogle - ? 
removeNulls( - await concurrentExecutor( - files, - async (file) => - getGoogleDriveObject({ - authCredentials, - driveObjectId: file.driveFileId, - cacheKey: { - connectorId: connector.id, - ts: startSyncTs, - }, - }), - { - concurrency: 10, - } - ) - ) - : []; - for (const file of files) { - if (file.parentId) { - if (checkFromGoogle) { - const googleFile = googleFiles.find((f) => f.id === file.driveFileId); - if (!googleFile) { - logger.info( - { fileId: file.driveFileId }, - "File does not exist in Google Drive, skipping" - ); - continue; - } else { - const parents = await getFileParentsMemoized( - connector.id, - authCredentials, - googleFile, - startSyncTs - ); - const localParents = await getLocalParents( - connector.id, - file.dustFileId, - `${startSyncTs}` - ); - - if ( - JSON.stringify(parents.map((p) => getInternalId(p))) !== - JSON.stringify(localParents) - ) { - logger.info( - { dustFileId: file.dustFileId, localParents, parents }, - "Parents not consistent with gdrive, updating" - ); - const driveParentId = parents[1]; - const dustParentId = driveParentId - ? getInternalId(driveParentId) - : null; - if (execute) { - await file.update({ - parentId: driveParentId, - }); - const dataSourceConfig = dataSourceConfigFromConnector(connector); - const dustParentIds = parents.map((p) => getInternalId(p)); - if ( - isGoogleDriveFolder(googleFile) || - isGoogleDriveSpreadSheetFile(googleFile) - ) { - await upsertDataSourceFolder({ - dataSourceConfig, - folderId: file.dustFileId, - parents: dustParentIds, - parentId: dustParentId, - title: file.name ?? 
"", - mimeType: MIME_TYPES.GOOGLE_DRIVE.FOLDER, - sourceUrl: getSourceUrlForGoogleDriveFiles(file), - }); - const sheets = await GoogleDriveSheet.findAll({ - where: { - driveFileId: file.driveFileId, - connectorId: connector.id, - }, - }); - for (const sheet of sheets) { - const tableId = getGoogleSheetTableId( - sheet.driveFileId, - sheet.driveSheetId - ); - await updateDataSourceTableParents({ - dataSourceConfig, - tableId, - parents: [tableId, ...dustParentIds], - parentId: file.dustFileId, - }); - } - } else { - await updateDataSourceDocumentParents({ - dataSourceConfig, - documentId: file.dustFileId, - parents: dustParentIds, - parentId: dustParentId, - }); - } - } - } - } + if (!file.parentId && !roots.find((r) => r.folderId === file.driveFileId)) { + logger.info( + { fileId: file.driveFileId }, + "Deleting file with no parent and not a root folder" + ); + if (execute) { + await internalDeleteFile(connector, file); } + } + if (file.parentId) { const parentFile = parentFiles.find( (f) => f.driveFileId === file.parentId ); diff --git a/connectors/src/connectors/google_drive/lib/cli.ts b/connectors/src/connectors/google_drive/lib/cli.ts index 96abeeaaf9a8..2bc690ff7ab1 100644 --- a/connectors/src/connectors/google_drive/lib/cli.ts +++ b/connectors/src/connectors/google_drive/lib/cli.ts @@ -7,7 +7,7 @@ import { googleDriveIncrementalSyncWorkflowId } from "@dust-tt/types"; import { Op } from "sequelize"; import { getConnectorManager } from "@connectors/connectors"; -import { fixParents } from "@connectors/connectors/google_drive/lib"; +import { fixParentsConsistency } from "@connectors/connectors/google_drive/lib"; import { getGoogleDriveObject } from "@connectors/connectors/google_drive/lib/google_drive_api"; import { getFileParentsMemoized } from "@connectors/connectors/google_drive/lib/hierarchy"; import { launchGoogleDriveIncrementalSyncWorkflow } from "@connectors/connectors/google_drive/temporal/client"; @@ -154,7 +154,7 @@ export const google_drive = async 
({ if (!connectorResource) { throw new Error("Connector not found"); } - await fixParents({ + await fixParentsConsistency({ connector: connectorResource, files, checkFromGoogle: true, diff --git a/connectors/src/connectors/google_drive/temporal/activities.ts b/connectors/src/connectors/google_drive/temporal/activities.ts index dc027399be83..119103e7e517 100644 --- a/connectors/src/connectors/google_drive/temporal/activities.ts +++ b/connectors/src/connectors/google_drive/temporal/activities.ts @@ -10,7 +10,7 @@ import { Op } from "sequelize"; import { getSourceUrlForGoogleDriveFiles } from "@connectors/connectors/google_drive"; import { - fixParents, + fixParentsConsistency, internalDeleteFile, } from "@connectors/connectors/google_drive/lib"; import { @@ -723,7 +723,7 @@ export async function garbageCollector( ); // TODO(nodes-core): Run fixParents in dry run mode to check parentIds validity - await fixParents({ + await fixParentsConsistency({ connector, files, checkFromGoogle: false, From 2d898960f55c47e42b9192953dbcb4dc5197c98f Mon Sep 17 00:00:00 2001 From: Thomas Draier Date: Thu, 6 Feb 2025 18:32:37 +0100 Subject: [PATCH 7/8] fix parent update --- connectors/src/connectors/google_drive/temporal/activities.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/connectors/src/connectors/google_drive/temporal/activities.ts b/connectors/src/connectors/google_drive/temporal/activities.ts index 119103e7e517..490561139883 100644 --- a/connectors/src/connectors/google_drive/temporal/activities.ts +++ b/connectors/src/connectors/google_drive/temporal/activities.ts @@ -525,7 +525,7 @@ export async function incrementalSync( driveFileId: file.id, name: file.name, mimeType: file.mimeType, - parentId: file.parent, + parentId: parents[1] ? 
getDriveFileId(parents[1]) : null, lastSeenTs: new Date(), }); localLogger.info({ fileId: change.file.id }, "done syncing file"); From 668bf2e84db8c3fe8fbc29c5c4000a4e3e0d29dd Mon Sep 17 00:00:00 2001 From: Thomas Draier Date: Thu, 6 Feb 2025 18:33:14 +0100 Subject: [PATCH 8/8] handle out-of-sync files --- connectors/src/connectors/google_drive/lib.ts | 18 ++++++++++++++++-- 1 file changed, 16 insertions(+), 2 deletions(-) diff --git a/connectors/src/connectors/google_drive/lib.ts b/connectors/src/connectors/google_drive/lib.ts index 77d48b7ef49e..db5f35d60c6b 100644 --- a/connectors/src/connectors/google_drive/lib.ts +++ b/connectors/src/connectors/google_drive/lib.ts @@ -239,8 +239,15 @@ export async function fixParentsConsistency({ file.dustFileId, `${startSyncTs}` ); - - if ( + if (parents[parents.length - 1] === "gdrive_outside_sync") { + logger.info( + { dustFileId: file.dustFileId }, + "File is outside of sync, deleting" + ); + if (execute) { + await internalDeleteFile(connector, file); + } + } else if ( JSON.stringify(parents.map((p) => getInternalId(p))) !== JSON.stringify(localParents) ) { @@ -301,6 +308,13 @@ export async function fixParentsConsistency({ } } } + + // Re-fetch files to ensure we only process non-deleted ones + files = await GoogleDriveFiles.findAll({ + where: { + id: files.map((f) => f.id), + }, + }); } logger.info("Checking parentIds validity");