Skip to content

Commit

Permalink
Merge pull request #196 from Sanketika-Obsrv/connector-registry-stream
Browse files Browse the repository at this point in the history
Connector registry stream upload
  • Loading branch information
ravismula authored Jul 23, 2024
2 parents 334f317 + bd485f9 commit 7e1f7ed
Show file tree
Hide file tree
Showing 8 changed files with 207 additions and 58 deletions.
2 changes: 2 additions & 0 deletions api-service/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
"aws-sdk": "^2.1348.0",
"axios": "^1.6.0",
"body-parser": "^1.20.2",
"busboy": "^1.6.0",
"compression": "^1.7.4",
"dateformat": "2.0.0",
"express": "^4.18.2",
Expand Down Expand Up @@ -59,6 +60,7 @@
"@babel/traverse": "7.23.2"
},
"devDependencies": {
"@types/busboy": "^1.5.4",
"@types/chai": "^4.3.3",
"@types/chai-as-promised": "^7.1.5",
"@types/chai-spies": "^1.0.3",
Expand Down
6 changes: 5 additions & 1 deletion api-service/src/v2/connections/commandServiceConnection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,4 +18,8 @@ export const executeCommand = async (id: string, command: string) => {
}
}
return commandHttpService.post(commandPath, payload)
}
}

export const registerConnector = async (requestBody: any) => {
return commandHttpService.post("/connector/v1/register", requestBody)
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
import { Request, Response } from "express";
import { ResponseHandler } from "../../helpers/ResponseHandler";
import _ from "lodash";
import logger from "../../logger";
import axios from "axios";
import httpStatus from "http-status";
import busboy from "busboy";
import { PassThrough } from "stream";
import { generatePreSignedUrl } from "../GenerateSignedURL/helper";
import { registerConnector } from "../../connections/commandServiceConnection";

export const apiId = "api.connector.register";
export const code = "FAILED_TO_REGISTER_CONNECTOR";

let resmsgid: string | any;

const connectorRegisterController = async (req: Request, res: Response) => {
resmsgid = _.get(res, "resmsgid");
try {
const uploadStreamResponse: any = await uploadStream(req);
const payload = {
relative_path: uploadStreamResponse[0]
}
logger.info({ apiId, resmsgid, message: `File uploaded to cloud provider successfully` })
const registryResponse = await registerConnector(payload);
logger.info({ apiId, resmsgid, message: `Connector registered successfully` })
ResponseHandler.successResponse(req, res, { status: httpStatus.OK, data: { message: registryResponse?.data?.message } })
} catch (error: any) {
const errMessage = _.get(error, "response.data.error.message")
logger.error(error, apiId, resmsgid, code);
let errorMessage = error;
const statusCode = _.get(error, "statusCode")
if (!statusCode || statusCode == 500) {
errorMessage = { code, message: errMessage || "Failed to register connector" }
}
ResponseHandler.errorResponse(errorMessage, req, res);
}
};

const uploadStream = async (req: Request) => {
return new Promise((resolve, reject) => {
const filePromises: Promise<void>[] = [];
const busboyClient = busboy({ headers: req.headers });
const relative_path: any[] = [];
let fileCount = 0;

busboyClient.on("file", async (name: any, file: any, info: any) => {
if (fileCount > 0) {
// If more than one file is detected, reject the request
busboyClient.emit("error", reject({
code: "FAILED_TO_UPLOAD",
message: "Uploading multiple files are not allowed",
statusCode: 400,
errCode: "BAD_REQUEST"
}));
return
}
fileCount++;
const processFile = async () => {
const fileName = info?.filename;
try {
const preSignedUrl: any = await generatePreSignedUrl("write", [fileName], "connector")
const filePath = preSignedUrl[0]?.filePath
const fileNameExtracted = extractFileNameFromPath(filePath);
relative_path.push(...fileNameExtracted);
const pass = new PassThrough();
file.pipe(pass);
const fileBuffer = await streamToBuffer(pass);
await axios.put(preSignedUrl[0]?.preSignedUrl, fileBuffer, {
headers: {
"Content-Type": info.mimeType,
"Content-Length": fileBuffer.length,
}
});
}
catch (err) {
logger.error({ apiId, err, resmsgid, message: "Failed to generate sample urls", code: "FILES_GENERATE_URL_FAILURE" })
reject({
code: "FILES_GENERATE_URL_FAILURE",
message: "Failed to generate sample urls",
statusCode: 500,
errCode: "INTERNAL_SERVER_ERROR"
})
}
};
filePromises.push(processFile());
});
busboyClient.on("close", async () => {
try {
await Promise.all(filePromises);
resolve(relative_path);
} catch (error) {
logger.error({ apiId, error, resmsgid, message: "Fail to upload a file", code: "FAILED_TO_UPLOAD" })
reject({
code: "FAILED_TO_UPLOAD",
message: "Fail to upload a file",
statusCode: 400,
errCode: "BAD_REQUEST"
});
}
});
busboyClient.on("error", reject);
req.pipe(busboyClient);
})
}

const streamToBuffer = (stream: PassThrough): Promise<Buffer> => {
return new Promise<Buffer>((resolve, reject) => {
const chunks: Buffer[] = [];
stream.on("data", (chunk) => chunks.push(chunk));
stream.on("end", () => resolve(Buffer.concat(chunks)));
stream.on("error", reject);
});
};

const extractFileNameFromPath = (filePath: string): string[] => {
const regex = /(?<=\/)[^/]+\.[^/]+(?=\/|$)/g;
return filePath.match(regex) || [];
};

export default connectorRegisterController;
Original file line number Diff line number Diff line change
Expand Up @@ -6,20 +6,20 @@ import logger from "../../logger";
import { ErrorObject } from "../../types/ResponseModel";
import { schemaValidation } from "../../services/ValidationService";
import GenerateURL from "./GenerateSignedURLValidationSchema.json"
import { cloudProvider } from "../../services/CloudServices";
import { config } from "../../configs/Config";
import { URLAccess } from "../../types/SampleURLModel";
import { v4 as uuidv4 } from "uuid";
import path from "path";
import { generatePreSignedUrl } from "./helper";

export const apiId = "api.files.generate-url"
export const code = "FILES_GENERATE_URL_FAILURE"
const maxFiles = config.presigned_url_configs.maxFiles
let containerType: string;

const generateSignedURL = async (req: Request, res: Response) => {
const requestBody = req.body
const msgid = _.get(req, ["body", "params", "msgid"]);
const resmsgid = _.get(res, "resmsgid");
containerType = _.get(req, ["body", "request", "type"]);
try {
const isRequestValid: Record<string, any> = schemaValidation(req.body, GenerateURL)
if (!isRequestValid.isValid) {
Expand All @@ -46,21 +46,7 @@ const generateSignedURL = async (req: Request, res: Response) => {
errCode: "BAD_REQUEST"
} as ErrorObject, req, res);
}

const { filesList, updatedFileNames } = transformFileNames(files, access)
logger.info(`Updated file names with path:${updatedFileNames}`)

const urlExpiry: number = getURLExpiry(access)
const preSignedUrls = await Promise.all(cloudProvider.generateSignedURLs(config.cloud_config.container, updatedFileNames, access, urlExpiry))
const signedUrlList = _.map(preSignedUrls, list => {
const fileNameWithUid = _.keys(list)[0]
return {
filePath: getFilePath(fileNameWithUid),
fileName: filesList.get(fileNameWithUid),
preSignedUrl: _.values(list)[0]
}
})

const signedUrlList = await generatePreSignedUrl(access, files, containerType)
logger.info({ apiId, requestBody, msgid, resmsgid, response: signedUrlList, message: `Sample urls generated successfully for files:${files}` })
ResponseHandler.successResponse(req, res, { status: httpStatus.OK, data: signedUrlList })
} catch (error: any) {
Expand All @@ -74,44 +60,6 @@ const generateSignedURL = async (req: Request, res: Response) => {
}
}

const getFilePath = (file: string) => {
return `${config.cloud_config.container}/${config.presigned_url_configs.service}/user_uploads/${file}`
}

const transformFileNames = (fileList: Array<string | any>, access: string): Record<string, any> => {
if (access === URLAccess.Read) {
return transformReadFiles(fileList)
}
return transformWriteFiles(fileList)
}

const transformReadFiles = (fileNames: Array<string | any>) => {
const fileMap = new Map();
const updatedFileNames = _.map(fileNames, file => {
fileMap.set(file, file)
return getFilePath(file)
})
return { filesList: fileMap, updatedFileNames }
}

const transformWriteFiles = (fileNames: Array<string | any>) => {
const fileMap = new Map();
const updatedFileNames = _.map(fileNames, file => {
const uuid = uuidv4().replace(/-/g, "").slice(0, 6);
const ext = path.extname(file)
const baseName = path.basename(file, ext)
const updatedFileName = `${baseName}_${uuid}${ext}`
fileMap.set(updatedFileName, file)
return getFilePath(updatedFileName)
})
return { filesList: fileMap, updatedFileNames }

}

const getURLExpiry = (access: string) => {
return access === URLAccess.Read ? config.presigned_url_configs.read_storage_url_expiry : config.presigned_url_configs.write_storage_url_expiry
}

const checkLimitExceed = (files: Array<string>): boolean => {
return _.size(files) > maxFiles
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,13 @@
"read",
"write"
]
},
"type": {
"type": "string",
"enum": [
"dataset",
"connector"
]
}
},
"additionalProperties": false,
Expand Down
64 changes: 64 additions & 0 deletions api-service/src/v2/controllers/GenerateSignedURL/helper.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
import { config } from "../../configs/Config";
import * as _ from "lodash";
import { cloudProvider } from "../../services/CloudServices";
import { URLAccess } from "../../types/SampleURLModel";
import path from "path";
import { v4 as uuidv4 } from "uuid";

export const generatePreSignedUrl = async (access: string, files: any, containerType: string) => {
const { filesList, updatedFileNames } = transformFileNames(files, access, containerType);
const urlExpiry: number = getURLExpiry(access);
const preSignedUrls = await Promise.all(cloudProvider.generateSignedURLs(config.cloud_config.container, updatedFileNames, access, urlExpiry));
const signedUrlList = _.map(preSignedUrls, list => {
const fileNameWithUid = _.keys(list)[0];
return {
filePath: getFilePath(fileNameWithUid, containerType),
fileName: filesList.get(fileNameWithUid),
preSignedUrl: _.values(list)[0]
};
});
return signedUrlList;
}

const getFilePath = (file: string, containerType: string) => {
const datasetUploadPath = `${config.presigned_url_configs.service}/user_uploads/${file}`;
const connectorUploadPath = `${config.cloud_config.container_prefix}/${file}`;
const paths: Record<string, string> = {
"dataset": datasetUploadPath,
"connector": connectorUploadPath
};
return paths[containerType] || datasetUploadPath;
}

const transformFileNames = (fileList: Array<string | any>, access: string, containerType: string): Record<string, any> => {
if (access === URLAccess.Read) {
return transformReadFiles(fileList, containerType);
}
return transformWriteFiles(fileList, containerType);
};

const transformReadFiles = (fileNames: Array<string | any>, containerType: string) => {
const fileMap = new Map();
const updatedFileNames = fileNames.map(file => {
fileMap.set(file, file);
return getFilePath(file, containerType);
});
return { filesList: fileMap, updatedFileNames };
};

const transformWriteFiles = (fileNames: Array<string | any>, containerType: string) => {
const fileMap = new Map();
const updatedFileNames = fileNames.map(file => {
const uuid = uuidv4().replace(/-/g, "").slice(0, 6);
const ext = path.extname(file);
const baseName = path.basename(file, ext);
const updatedFileName = `${baseName}_${uuid}${ext}`;
fileMap.set(updatedFileName, file);
return getFilePath(updatedFileName, containerType);
});
return { filesList: fileMap, updatedFileNames };
};

const getURLExpiry = (access: string) => {
return access === URLAccess.Read ? config.presigned_url_configs.read_storage_url_expiry : config.presigned_url_configs.write_storage_url_expiry;
}
2 changes: 2 additions & 0 deletions api-service/src/v2/routes/Router.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import { eventValidation } from "../controllers/EventValidation/EventValidation"
import GenerateSignedURL from "../controllers/GenerateSignedURL/GenerateSignedURL";
import { sqlQuery } from "../controllers/QueryWrapper/SqlQueryWrapper";
import DatasetStatusTansition from "../controllers/DatasetStatusTransition/DatasetStatusTransition";
import connectorRegisterController from "../controllers/ConnectorRegister/ConnectorRegisterController";
import datasetHealth from "../controllers/DatasetHealth/DatasetHealth";

export const router = express.Router();
Expand All @@ -40,6 +41,7 @@ router.post("/schema/validate", setDataToRequestObject("api.schema.validator"),
router.post("/template/query/:templateId", setDataToRequestObject("api.query.template.query"), queryTemplate);
router.post("/files/generate-url", setDataToRequestObject("api.files.generate-url"), onRequest({ entity: Entity.Management }), GenerateSignedURL);
router.post("/datasets/status-transition", setDataToRequestObject("api.datasets.status-transition"), onRequest({ entity: Entity.Management }), DatasetStatusTansition);
router.post("/connector/register", setDataToRequestObject("api.connector.register"), onRequest({ entity: Entity.Management }), connectorRegisterController);
router.post("/dataset/health", setDataToRequestObject("api.dataset.health"), onRequest({ entity: Entity.Management }), datasetHealth);

//Wrapper Service
Expand Down
3 changes: 2 additions & 1 deletion api-service/src/v2/services/CloudServices/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,8 @@ import { GCPStorageService } from "./GCPStorageService";
import * as _ from "lodash";

const cloudProviderName = _.get(config, "cloud_config.cloud_storage_provider");
const cloudProviderConfig = _.get(config, "cloud_config.cloud_storage_config");
const cloudConfig = _.get(config, "cloud_config.cloud_storage_config");
const cloudProviderConfig = _.isString(cloudConfig) ? JSON.parse(cloudConfig) : cloudConfig;

const initialiseServiceProvider = (provider: any, config: any): AzureStorageService | AWSStorageService | GCPStorageService => {
switch (provider) {
Expand Down

0 comments on commit 7e1f7ed

Please sign in to comment.