Skip to content

Commit

Permalink
[connectors] Add cache in getGoogleDriveObject, improved fixParents (#…
Browse files Browse the repository at this point in the history
…10588)

* Add cache in getGoogleDriveObject, improved fixParents

* use dedicated key type

* Add comment, use dry run

* Update spreadsheets

* use object for params:

* clean up and docment

* fix parent update

* handle out-of-sync files
  • Loading branch information
tdraier authored Feb 6, 2025
1 parent d4e1112 commit f55079e
Show file tree
Hide file tree
Showing 9 changed files with 310 additions and 50 deletions.
6 changes: 3 additions & 3 deletions connectors/migrations/20231214_find_non_shared_drives.ts
Original file line number Diff line number Diff line change
Expand Up @@ -50,10 +50,10 @@ async function main() {
});

for (const f of selectedFolders) {
const gDriveObject = await getGoogleDriveObject(
const gDriveObject = await getGoogleDriveObject({
authCredentials,
f.folderId
);
driveObjectId: f.folderId,
});
if (!gDriveObject) {
console.log(`Folder not found: folderId=${f.folderId}`);
continue;
Expand Down
5 changes: 4 additions & 1 deletion connectors/migrations/20240422_fix_gdrive_errorType.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,10 @@ async function main() {
for (const connector of connectors) {
try {
const auth = await getAuthObject(connector.connectionId);
const gDriveObject = await getGoogleDriveObject(auth, "root");
const gDriveObject = await getGoogleDriveObject({
authCredentials: auth,
driveObjectId: "root",
});
logger.info(
{
connectorId: connector.id,
Expand Down
5 changes: 4 additions & 1 deletion connectors/migrations/20240529_clean_gdrive_folders.ts
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,10 @@ async function main() {
continue;
}

const file = await getGoogleDriveObject(authCredentials, folderId);
const file = await getGoogleDriveObject({
authCredentials,
driveObjectId: folderId,
});

if (!file) {
logger.info(
Expand Down
11 changes: 7 additions & 4 deletions connectors/src/connectors/google_drive/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -378,10 +378,10 @@ export class GoogleDriveConnectorManager extends BaseConnectorManager<null> {

const nodes: ContentNode[] = await Promise.all(
drives.map(async (d): Promise<ContentNode> => {
const driveObject = await getGoogleDriveObject(
const driveObject = await getGoogleDriveObject({
authCredentials,
d.id
);
driveObjectId: d.id,
});
if (!driveObject) {
throw new Error(
`Drive ${d.id} unexpectedly not found (got 404).`
Expand Down Expand Up @@ -957,7 +957,10 @@ async function getFoldersAsContentNodes({
return concurrentExecutor(
folders,
async (f): Promise<ContentNode | null> => {
const fd = await getGoogleDriveObject(authCredentials, f.folderId);
const fd = await getGoogleDriveObject({
authCredentials,
driveObjectId: f.folderId,
});
if (!fd) {
return null;
}
Expand Down
192 changes: 185 additions & 7 deletions connectors/src/connectors/google_drive/lib.ts
Original file line number Diff line number Diff line change
@@ -1,23 +1,36 @@
import type { ContentNodesViewType, ModelId } from "@dust-tt/types";
import {
cacheWithRedis,
concurrentExecutor,
getGoogleIdsFromSheetContentNodeInternalId,
getGoogleSheetTableId,
isGoogleSheetContentNodeInternalId,
MIME_TYPES,
removeNulls,
} from "@dust-tt/types";
import type { Logger } from "pino";
import type { InferAttributes, WhereOptions } from "sequelize";

import { isGoogleDriveSpreadSheetFile } from "@connectors/connectors/google_drive/temporal/mime_types";
import { getSourceUrlForGoogleDriveFiles } from "@connectors/connectors/google_drive";
import { getGoogleDriveObject } from "@connectors/connectors/google_drive/lib/google_drive_api";
import { getFileParentsMemoized } from "@connectors/connectors/google_drive/lib/hierarchy";
import {
isGoogleDriveFolder,
isGoogleDriveSpreadSheetFile,
} from "@connectors/connectors/google_drive/temporal/mime_types";
import { deleteSpreadsheet } from "@connectors/connectors/google_drive/temporal/spreadsheets";
import {
getAuthObject,
getDriveFileId,
getInternalId,
} from "@connectors/connectors/google_drive/temporal/utils";
import { dataSourceConfigFromConnector } from "@connectors/lib/api/data_source_config";
import {
deleteDataSourceDocument,
deleteDataSourceFolder,
updateDataSourceDocumentParents,
updateDataSourceTableParents,
upsertDataSourceFolder,
} from "@connectors/lib/data_sources";
import {
GoogleDriveFiles,
Expand Down Expand Up @@ -144,12 +157,168 @@ export async function internalDeleteFile(
});
}

export async function fixParents(
connector: ConnectorResource,
files: GoogleDriveFiles[],
logger: Logger,
execute: boolean = true
) {
/**
* Fixes parent-child relationship consistency for Google Drive files.
*
* This function performs two main checks:
* 1. If checkFromGoogle=true, verifies that local parent relationships match Google Drive
* 2. Validates that all parent IDs reference valid files or root folders
*
* For any inconsistencies found:
* - If execute=false, only logs the issues
* - If execute=true, fixes the inconsistencies by:
* - Deleting files that don't exist in Google Drive
* - Updating parent relationships to match Google Drive
* - Deleting files with invalid parents
* - Resetting parent to null for root folders
*
* @param connector - The Google Drive connector resource
* @param files - List of GoogleDriveFiles to check
* @param startSyncTs - Timestamp for caching/memoization
* @param checkFromGoogle - Whether to verify against Google Drive API
* @param execute - Whether to fix inconsistencies or just log them
* @param logger - Logger instance for recording issues
*/
export async function fixParentsConsistency({
connector,
files,
startSyncTs,
checkFromGoogle = false,
execute = false,
logger,
}: {
connector: ConnectorResource;
files: GoogleDriveFiles[];
startSyncTs: number;
checkFromGoogle?: boolean;
execute?: boolean;
logger: Logger;
}) {
// First check consistency with Google Drive
if (checkFromGoogle) {
logger.info("Checking consistency with Google Drive");
const authCredentials = await getAuthObject(connector.connectionId);

const googleFiles = removeNulls(
await concurrentExecutor(
files,
async (file) =>
getGoogleDriveObject({
authCredentials,
driveObjectId: file.driveFileId,
cacheKey: {
connectorId: connector.id,
ts: startSyncTs,
},
}),
{
concurrency: 10,
}
)
);

for (const file of files) {
const googleFile = googleFiles.find((f) => f.id === file.driveFileId);
if (!googleFile) {
logger.info(
{ fileId: file.driveFileId },
"File does not exist in Google Drive, deleting"
);
if (execute) {
await internalDeleteFile(connector, file);
}
} else {
const parents = await getFileParentsMemoized(
connector.id,
authCredentials,
googleFile,
startSyncTs
);
const localParents = await getLocalParents(
connector.id,
file.dustFileId,
`${startSyncTs}`
);
if (parents[parents.length - 1] === "gdrive_outside_sync") {
logger.info(
{ dustFileId: file.dustFileId },
"File is outside of sync, deleting"
);
if (execute) {
await internalDeleteFile(connector, file);
}
} else if (
JSON.stringify(parents.map((p) => getInternalId(p))) !==
JSON.stringify(localParents)
) {
logger.info(
{ dustFileId: file.dustFileId, localParents, parents },
"Parents not consistent with gdrive, updating"
);
const driveParentId = parents[1];
const dustParentId = driveParentId
? getInternalId(driveParentId)
: null;
if (execute) {
await file.update({
parentId: driveParentId,
});
const dataSourceConfig = dataSourceConfigFromConnector(connector);
const dustParentIds = parents.map((p) => getInternalId(p));
if (
isGoogleDriveFolder(googleFile) ||
isGoogleDriveSpreadSheetFile(googleFile)
) {
await upsertDataSourceFolder({
dataSourceConfig,
folderId: file.dustFileId,
parents: dustParentIds,
parentId: dustParentId,
title: file.name ?? "",
mimeType: MIME_TYPES.GOOGLE_DRIVE.FOLDER,
sourceUrl: getSourceUrlForGoogleDriveFiles(file),
});
const sheets = await GoogleDriveSheet.findAll({
where: {
driveFileId: file.driveFileId,
connectorId: connector.id,
},
});
for (const sheet of sheets) {
const tableId = getGoogleSheetTableId(
sheet.driveFileId,
sheet.driveSheetId
);
await updateDataSourceTableParents({
dataSourceConfig,
tableId,
parents: [tableId, ...dustParentIds],
parentId: file.dustFileId,
});
}
} else {
await updateDataSourceDocumentParents({
dataSourceConfig,
documentId: file.dustFileId,
parents: dustParentIds,
parentId: dustParentId,
});
}
}
}
}
}

// Re-fetch files to ensure we only process non-deleted ones
files = await GoogleDriveFiles.findAll({
where: {
id: files.map((f) => f.id),
},
});
}

logger.info("Checking parentIds validity");

const parentFiles = await GoogleDriveFiles.findAll({
where: {
connectorId: connector.id,
Expand All @@ -164,6 +333,15 @@ export async function fixParents(
});

for (const file of files) {
if (!file.parentId && !roots.find((r) => r.folderId === file.driveFileId)) {
logger.info(
{ fileId: file.driveFileId },
"Deleting file with no parent and not a root folder"
);
if (execute) {
await internalDeleteFile(connector, file);
}
}
if (file.parentId) {
const parentFile = parentFiles.find(
(f) => f.driveFileId === file.parentId
Expand Down
32 changes: 22 additions & 10 deletions connectors/src/connectors/google_drive/lib/cli.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import { googleDriveIncrementalSyncWorkflowId } from "@dust-tt/types";
import { Op } from "sequelize";

import { getConnectorManager } from "@connectors/connectors";
import { fixParents } from "@connectors/connectors/google_drive/lib";
import { fixParentsConsistency } from "@connectors/connectors/google_drive/lib";
import { getGoogleDriveObject } from "@connectors/connectors/google_drive/lib/google_drive_api";
import { getFileParentsMemoized } from "@connectors/connectors/google_drive/lib/hierarchy";
import { launchGoogleDriveIncrementalSyncWorkflow } from "@connectors/connectors/google_drive/temporal/client";
Expand Down Expand Up @@ -111,20 +111,21 @@ export const google_drive = async ({
throw new Error("Missing --fileId argument");
}
const fileId = args.fileId;

const now = Date.now();
const authCredentials = await getAuthObject(connector.connectionId);
const driveObject = await getGoogleDriveObject(
const driveObject = await getGoogleDriveObject({
authCredentials,
getDriveFileId(fileId)
);
driveObjectId: getDriveFileId(fileId),
cacheKey: { connectorId: connector.id, ts: now },
});
if (!driveObject) {
throw new Error("Can't find google drive object");
}
const parents = await getFileParentsMemoized(
connector.id,
authCredentials,
driveObject,
Date.now()
now
);
return { status: 200, content: parents, type: typeof parents };
}
Expand All @@ -136,6 +137,7 @@ export const google_drive = async ({
const limit = 1000;
let fromId = 0;
let files: GoogleDriveFiles[] = [];
const now = Date.now();
do {
files = await GoogleDriveFiles.findAll({
where: {
Expand All @@ -145,11 +147,21 @@ export const google_drive = async ({
order: [["id", "ASC"]],
limit,
});
const connecrtorResource = new ConnectorResource(
ConnectorModel,
connector

const connectorResource = await ConnectorResource.fetchById(
connector.id
);
await fixParents(connecrtorResource, files, topLogger, execute);
if (!connectorResource) {
throw new Error("Connector not found");
}
await fixParentsConsistency({
connector: connectorResource,
files,
checkFromGoogle: true,
execute,
startSyncTs: now,
logger: topLogger,
});
fromId = files[limit - 1]?.id ?? 0;
} while (fromId > 0);
return { success: true };
Expand Down
Loading

0 comments on commit f55079e

Please sign in to comment.