Connector registry stream upload #196

Merged · 31 commits · Jul 23, 2024
Changes from 25 commits
Commits
7004c21
Sanketika-obsrv/issue-tracker: #OBS-150: added connector-registry rou…
yashashkumar Jun 25, 2024
ab2a37a
Sanketika-obsrv/issue-tracker: #OBS-150: added packages
yashashkumar Jun 25, 2024
6edf484
Sanketika-obsrv/issue-tracker: #OBS-150: added download link as response
yashashkumar Jun 25, 2024
23bd5b7
Sanketika-obsrv/issue-tracker: #OBS-150: deciding path based on type
yashashkumar Jun 25, 2024
f324134
Sanketika-obsrv/issue-tracker: #OBS-150: removed container config
yashashkumar Jun 25, 2024
6f743d3
Sanketika-obsrv/issue-tracker: #OBS-150: refactored container saving …
yashashkumar Jun 25, 2024
90a5f88
Sanketika-obsrv/issue-tracker: #OBS-150: removed bodyparser
yashashkumar Jun 25, 2024
795397f
Sanketika-obsrv/issue-tracker: #OBS-150: implementation of calling co…
yashashkumar Jun 25, 2024
62261dd
Sanketika-obsrv/issue-tracker: #OBS-150: updated error codes and adde…
yashashkumar Jun 25, 2024
38e2f1e
Sanketika-obsrv/issue-tracker: #OBS-150: added config route for comma…
yashashkumar Jun 25, 2024
acfd5de
Sanketika-obsrv/issue-tracker: #OBS-150: added promise reject on error
yashashkumar Jun 25, 2024
4808e68
Sanketika-obsrv/issue-tracker: #OBS-150: added regex to find filename…
yashashkumar Jun 25, 2024
0aaec48
Sanketika-obsrv/issue-tracker: #OBS-150: changed default port
yashashkumar Jun 25, 2024
72171d3
Sanketika-obsrv/issue-tracker: #OBS-150: checking type and setting value
yashashkumar Jun 26, 2024
3923f2f
Merge remote-tracking branch 'origin/develop' into connector-registry…
yashashkumar Jun 26, 2024
6d9f8e8
Sanketika-obsrv/issue-tracker: #OBS-150: changed config to defaults
yashashkumar Jun 26, 2024
2dbed92
Sanketika-obsrv/issue-tracker: #OBS-150: changed config to defaults
yashashkumar Jun 26, 2024
4c911b8
Sanketika-obsrv/issue-tracker: #OBS-150: removed path dependency
yashashkumar Jun 26, 2024
f3b105e
Sanketika-obsrv/issue-tracker: #OBS-150: changes in config file
yashashkumar Jun 26, 2024
f95167d
Sanketika-obsrv/issue-tracker: #OBS-150: changes in config file
yashashkumar Jun 26, 2024
685e072
Sanketika-obsrv/issue-tracker: #OBS-150: updated api id
yashashkumar Jun 26, 2024
2c216fe
Sanketika-obsrv/issue-tracker: #OBS-150: added default ts in request …
yashashkumar Jun 26, 2024
d7d94ca
Sanketika-obsrv/issue-tracker: #OBS-150: added default ts in request …
yashashkumar Jun 26, 2024
422a80c
Sanketika-obsrv/issue-tracker: #OBS-150: removed config
yashashkumar Jul 1, 2024
7091aec
Sanketika-obsrv/issue-tracker: #OBS-150: generalized generate signed …
yashashkumar Jul 1, 2024
490f357
Sanketika-obsrv/issue-tracker: #OBS-150: removed config
yashashkumar Jul 1, 2024
2ca6e08
Sanketika-obsrv/issue-tracker: #OBS-150: added connector registry cal…
yashashkumar Jul 1, 2024
e5e52ae
Sanketika-obsrv/issue-tracker: #OBS-150: removed lodash map
yashashkumar Jul 1, 2024
dd4355a
Sanketika-obsrv/issue-tracker: #OBS-150: changed route name and added…
yashashkumar Jul 1, 2024
a9d0be3
Sanketika-obsrv/issue-tracker: #OBS-150: changed route name and added…
yashashkumar Jul 1, 2024
bd485f9
Sanketika-obsrv/issue-tracker: #OBS-150: changed route name and added…
yashashkumar Jul 1, 2024
2 changes: 2 additions & 0 deletions api-service/package.json
@@ -28,6 +28,7 @@
"aws-sdk": "^2.1348.0",
"axios": "^1.6.0",
"body-parser": "^1.20.2",
"busboy": "^1.6.0",
"compression": "^1.7.4",
"dateformat": "2.0.0",
"express": "^4.18.2",
@@ -59,6 +60,7 @@
"@babel/traverse": "7.23.2"
},
"devDependencies": {
"@types/busboy": "^1.5.4",
"@types/chai": "^4.3.3",
"@types/chai-as-promised": "^7.1.5",
"@types/chai-spies": "^1.0.3",
3 changes: 2 additions & 1 deletion api-service/src/v2/configs/Config.ts
@@ -96,7 +96,8 @@ export const config = {
"command_service_config": {
"host": process.env.command_service_host || "http://localhost",
"port": parseInt(process.env.command_service_port || "8000"),
"path": process.env.command_service_path || "/system/v1/dataset/command"
"path": process.env.command_service_path || "/system/v1/dataset/command",
"connector_registry_path": process.env.connector_registry_path || "/connector/v1/register",
},
"flink_job_configs": {
"pipeline_merged_job_manager_url": process.env.pipeline_merged_job_manager_url || "http://localhost:8081",
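
For reference, a minimal sketch (not part of the diff) of how the new connector_registry_path default is combined with the existing host and port to build the registry endpoint, mirroring what the stream-upload controller below does; the relative import path is an assumption.

    import _ from "lodash";
    import { config } from "./Config"; // assumed relative import from the configs directory

    // Resolve the command-service connector registry endpoint from config.
    const host = _.get(config, ["command_service_config", "host"]);                          // "http://localhost" by default
    const port = _.get(config, ["command_service_config", "port"]);                          // 8000 by default
    const registryPath = _.get(config, ["command_service_config", "connector_registry_path"]); // "/connector/v1/register" by default

    // With the defaults above this resolves to "http://localhost:8000/connector/v1/register"
    const registryUrl = `${host}:${port}${registryPath}`;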
124 changes: 124 additions & 0 deletions api-service/src/v2/controllers/ConnectorRegistryStream/ConnectorRegistryStreamController.ts
@@ -0,0 +1,124 @@
import { Request, Response } from "express";
import { ResponseHandler } from "../../helpers/ResponseHandler";
import _ from "lodash";
import logger from "../../logger";
import { config } from "../../configs/Config";
import axios from "axios";
import httpStatus from "http-status";
import busboy from "busboy";
import { PassThrough } from "stream";
import { generatePreSignedUrl } from "../GenerateSignedURL/helper";

export const apiId = "api.connector.stream.upload";
export const code = "FAILED_TO_REGISTER_CONNECTOR";

const commandServiceHost = _.get(config, ["command_service_config", "host"]);
const commandServicePort = _.get(config, ["command_service_config", "port"]);
const registryUrl = _.get(config, ["command_service_config", "connector_registry_path"])
let resmsgid: string | any;

const connectorRegistryStream = async (req: Request, res: Response) => {
resmsgid = _.get(res, "resmsgid");
try {
const uploadStreamResponse: any = await uploadStream(req);
const registryRequestBody = {
relative_path: uploadStreamResponse[0]
}
logger.info({ apiId, resmsgid, message: `File uploaded to cloud provider successfully` })
const registryResponse = await axios.post(`${commandServiceHost}:${commandServicePort}${registryUrl}`, registryRequestBody);
logger.info({ apiId, resmsgid, message: `Connector registered successfully` })
ResponseHandler.successResponse(req, res, { status: httpStatus.OK, data: { message: registryResponse?.data?.message } })
} catch (error: any) {
const errMessage = _.get(error, "response.data.error.message")
logger.error(error, apiId, resmsgid, code);
let errorMessage = error;
const statusCode = _.get(error, "statusCode")
if (!statusCode || statusCode == 500) {
errorMessage = { code, message: errMessage || "Failed to register connector" }
}
ResponseHandler.errorResponse(errorMessage, req, res);
}
};

const uploadStream = async (req: Request) => {
return new Promise((resolve, reject) => {
const filePromises: Promise<void>[] = [];
const bb = busboy({ headers: req.headers });
const relative_path: any[] = [];
let fileCount = 0;

bb.on("file", async (name: any, file: any, info: any) => {
if (fileCount > 0) {
// If more than one file is detected, reject the request
bb.emit("error", reject({
code: "FAILED_TO_UPLOAD",
message: "Uploading multiple files are not allowed",
statusCode: 400,
errCode: "BAD_REQUEST"
}));
return
}
fileCount++;
const processFile = async () => {
const fileName = info?.filename;
try {
const preSignedUrl: any = await generatePreSignedUrl("write", [fileName], "connector")
const filePath = preSignedUrl[0]?.filePath
const fileNameExtracted = extractFileNameFromPath(filePath);
relative_path.push(...fileNameExtracted);
const pass = new PassThrough();
file.pipe(pass);
const fileBuffer = await streamToBuffer(pass);
await axios.put(preSignedUrl[0]?.preSignedUrl, fileBuffer, {
headers: {
"Content-Type": info.mimeType,
"Content-Length": fileBuffer.length,
}
});
}
catch (err) {
logger.error({ apiId, err, resmsgid, message: "Failed to generate sample urls", code: "FILES_GENERATE_URL_FAILURE" })
reject({
code: "FILES_GENERATE_URL_FAILURE",
message: "Failed to generate sample urls",
statusCode: 500,
errCode: "INTERNAL_SERVER_ERROR"
})
}
};
filePromises.push(processFile());
});
bb.on("close", async () => {
try {
await Promise.all(filePromises);
resolve(relative_path);
} catch (error) {
logger.error({ apiId, error, resmsgid, message: "Fail to upload a file", code: "FAILED_TO_UPLOAD" })
reject({
code: "FAILED_TO_UPLOAD",
message: "Fail to upload a file",
statusCode: 400,
errCode: "BAD_REQUEST"
});
}
});
bb.on("error", reject);
req.pipe(bb);
})
}

const streamToBuffer = (stream: PassThrough): Promise<Buffer> => {
return new Promise<Buffer>((resolve, reject) => {
const chunks: Buffer[] = [];
stream.on("data", (chunk) => chunks.push(chunk));
stream.on("end", () => resolve(Buffer.concat(chunks)));
stream.on("error", reject);
});
};

const extractFileNameFromPath = (filePath: string): string[] => {
const regex = /(?<=\/)[^/]+\.[^/]+(?=\/|$)/g;
return filePath.match(regex) || [];
};

export default connectorRegistryStream;
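
A hypothetical client-side call against the new endpoint, assuming a Node client using axios and form-data; only the route path (/connector/stream/upload, registered in Router.ts below) comes from this PR — the base URL, port, and file name are placeholders.

    import axios from "axios";
    import FormData from "form-data";
    import fs from "fs";

    const uploadConnector = async () => {
        const form = new FormData();
        // The controller accepts exactly one file and rejects multi-file uploads.
        form.append("file", fs.createReadStream("./sample-connector-0.1.0.tar.gz")); // placeholder file name
        const response = await axios.post(
            "http://localhost:3000/connector/stream/upload", // base URL and port are assumptions
            form,
            { headers: form.getHeaders() }
        );
        console.log(response.data); // success payload carries the registry response message
    };

    uploadConnector().catch((err) => console.error(err.response?.data ?? err));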
api-service/src/v2/controllers/GenerateSignedURL/GenerateSignedURL.ts
@@ -6,20 +6,20 @@ import logger from "../../logger";
import { ErrorObject } from "../../types/ResponseModel";
import { schemaValidation } from "../../services/ValidationService";
import GenerateURL from "./GenerateSignedURLValidationSchema.json"
import { cloudProvider } from "../../services/CloudServices";
import { config } from "../../configs/Config";
import { URLAccess } from "../../types/SampleURLModel";
import { v4 as uuidv4 } from "uuid";
import path from "path";
import { generatePreSignedUrl } from "./helper";

export const apiId = "api.files.generate-url"
export const code = "FILES_GENERATE_URL_FAILURE"
const maxFiles = config.presigned_url_configs.maxFiles
let containerType: string;

const generateSignedURL = async (req: Request, res: Response) => {
const requestBody = req.body
const msgid = _.get(req, ["body", "params", "msgid"]);
const resmsgid = _.get(res, "resmsgid");
containerType = _.get(req, ["body", "request", "type"]);
try {
const isRequestValid: Record<string, any> = schemaValidation(req.body, GenerateURL)
if (!isRequestValid.isValid) {
@@ -46,21 +46,7 @@ const generateSignedURL = async (req: Request, res: Response) => {
errCode: "BAD_REQUEST"
} as ErrorObject, req, res);
}

const { filesList, updatedFileNames } = transformFileNames(files, access)
logger.info(`Updated file names with path:${updatedFileNames}`)

const urlExpiry: number = getURLExpiry(access)
const preSignedUrls = await Promise.all(cloudProvider.generateSignedURLs(config.cloud_config.container, updatedFileNames, access, urlExpiry))
const signedUrlList = _.map(preSignedUrls, list => {
const fileNameWithUid = _.keys(list)[0]
return {
filePath: getFilePath(fileNameWithUid),
fileName: filesList.get(fileNameWithUid),
preSignedUrl: _.values(list)[0]
}
})

const signedUrlList = await generatePreSignedUrl(access, files, containerType)
logger.info({ apiId, requestBody, msgid, resmsgid, response: signedUrlList, message: `Sample urls generated successfully for files:${files}` })
ResponseHandler.successResponse(req, res, { status: httpStatus.OK, data: signedUrlList })
} catch (error: any) {
@@ -74,44 +60,6 @@ const generateSignedURL = async (req: Request, res: Response) => {
}
}

const getFilePath = (file: string) => {
return `${config.cloud_config.container}/${config.presigned_url_configs.service}/user_uploads/${file}`
}

const transformFileNames = (fileList: Array<string | any>, access: string): Record<string, any> => {
if (access === URLAccess.Read) {
return transformReadFiles(fileList)
}
return transformWriteFiles(fileList)
}

const transformReadFiles = (fileNames: Array<string | any>) => {
const fileMap = new Map();
const updatedFileNames = _.map(fileNames, file => {
fileMap.set(file, file)
return getFilePath(file)
})
return { filesList: fileMap, updatedFileNames }
}

const transformWriteFiles = (fileNames: Array<string | any>) => {
const fileMap = new Map();
const updatedFileNames = _.map(fileNames, file => {
const uuid = uuidv4().replace(/-/g, "").slice(0, 6);
const ext = path.extname(file)
const baseName = path.basename(file, ext)
const updatedFileName = `${baseName}_${uuid}${ext}`
fileMap.set(updatedFileName, file)
return getFilePath(updatedFileName)
})
return { filesList: fileMap, updatedFileNames }

}

const getURLExpiry = (access: string) => {
return access === URLAccess.Read ? config.presigned_url_configs.read_storage_url_expiry : config.presigned_url_configs.write_storage_url_expiry
}

const checkLimitExceed = (files: Array<string>): boolean => {
return _.size(files) > maxFiles
}
api-service/src/v2/controllers/GenerateSignedURL/GenerateSignedURLValidationSchema.json
@@ -42,6 +42,13 @@
"read",
"write"
]
},
"type": {
"type": "string",
"enum": [
"dataset",
"connector"
]
}
},
"additionalProperties": false,
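
An illustrative payload for /files/generate-url exercising the new type field; files and access follow the existing schema enums, the msgid is a placeholder, and any envelope fields beyond params/request are omitted.

    // Placeholder payload; the controller reads params.msgid and request.{files, access, type}.
    const generateUrlRequest = {
        params: { msgid: "4a7f14c3-d61e-4d4f-be78-181834eeff6d" }, // placeholder msgid
        request: {
            files: ["telemetry.json"],
            access: "write",    // "read" | "write"
            type: "connector"   // new field: "dataset" | "connector"; unmatched values fall back to the dataset path in getFilePath
        }
    };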
64 changes: 64 additions & 0 deletions api-service/src/v2/controllers/GenerateSignedURL/helper.ts
@@ -0,0 +1,64 @@
import { config } from "../../configs/Config";
import * as _ from "lodash";
import { cloudProvider } from "../../services/CloudServices";
import { URLAccess } from "../../types/SampleURLModel";
import path from "path";
import { v4 as uuidv4 } from "uuid";

export const generatePreSignedUrl = async (access: string, files: any, containerType: string) => {
const { filesList, updatedFileNames } = transformFileNames(files, access, containerType);
const urlExpiry: number = getURLExpiry(access);
const preSignedUrls = await Promise.all(cloudProvider.generateSignedURLs(config.cloud_config.container, updatedFileNames, access, urlExpiry));
const signedUrlList = _.map(preSignedUrls, list => {
const fileNameWithUid = _.keys(list)[0];
return {
filePath: getFilePath(fileNameWithUid, containerType),
fileName: filesList.get(fileNameWithUid),
preSignedUrl: _.values(list)[0]
};
});
return signedUrlList;
}

const getFilePath = (file: string, containerType: string) => {
const datasetUploadPath = `${config.presigned_url_configs.service}/user_uploads/${file}`;
const connectorUploadPath = `${config.cloud_config.container_prefix}/${file}`;
const paths: Record<string, string> = {
"dataset": datasetUploadPath,
"connector": connectorUploadPath
};
return paths[containerType] || datasetUploadPath;
}

const transformFileNames = (fileList: Array<string | any>, access: string, containerType: string): Record<string, any> => {
if (access === URLAccess.Read) {
return transformReadFiles(fileList, containerType);
}
return transformWriteFiles(fileList, containerType);
}

const transformReadFiles = (fileNames: Array<string | any>, containerType: string) => {
const fileMap = new Map();
const updatedFileNames = _.map(fileNames, file => {
fileMap.set(file, file);
return getFilePath(file, containerType);
});
return { filesList: fileMap, updatedFileNames };
}

const transformWriteFiles = (fileNames: Array<string | any>, containerType: string) => {
const fileMap = new Map();
const updatedFileNames = _.map(fileNames, file => {
const uuid = uuidv4().replace(/-/g, "").slice(0, 6);
const ext = path.extname(file);
const baseName = path.basename(file, ext);
const updatedFileName = `${baseName}_${uuid}${ext}`;
fileMap.set(updatedFileName, file);
return getFilePath(updatedFileName, containerType);
});
return { filesList: fileMap, updatedFileNames };
}

const getURLExpiry = (access: string) => {
return access === URLAccess.Read ? config.presigned_url_configs.read_storage_url_expiry : config.presigned_url_configs.write_storage_url_expiry;
}
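
A sketch of how the generalized helper is consumed, mirroring the call in the stream-upload controller above; the import path and file name are assumptions.

    import { generatePreSignedUrl } from "./helper"; // assumed relative import

    const demo = async () => {
        // "write" access, a single file, and the "connector" container type, as in the stream controller.
        const urls = await generatePreSignedUrl("write", ["my-connector.tar.gz"], "connector");
        // Each entry has { filePath, fileName, preSignedUrl }; write access appends a short uuid to the stored name.
        console.log(urls[0]?.filePath, urls[0]?.preSignedUrl);
    };

    demo().catch(console.error);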
2 changes: 2 additions & 0 deletions api-service/src/v2/routes/Router.ts
@@ -20,6 +20,7 @@ import { eventValidation } from "../controllers/EventValidation/EventValidation"
import GenerateSignedURL from "../controllers/GenerateSignedURL/GenerateSignedURL";
import { sqlQuery } from "../controllers/QueryWrapper/SqlQueryWrapper";
import DatasetStatusTansition from "../controllers/DatasetStatusTransition/DatasetStatusTransition";
import connectorRegistryStream from "../controllers/ConnectorRegistryStream/ConnectorRegistryStreamController";
import datasetHealth from "../controllers/DatasetHealth/DatasetHealth";

export const router = express.Router();
@@ -40,6 +41,7 @@ router.post("/schema/validate", setDataToRequestObject("api.schema.validator"),
router.post("/template/query/:templateId", setDataToRequestObject("api.query.template.query"), queryTemplate);
router.post("/files/generate-url", setDataToRequestObject("api.files.generate-url"), onRequest({ entity: Entity.Management }), GenerateSignedURL);
router.post("/datasets/status-transition", setDataToRequestObject("api.datasets.status-transition"), onRequest({ entity: Entity.Management }), DatasetStatusTansition);
router.post("/connector/stream/upload", setDataToRequestObject("api.connector.stream.upload"), onRequest({ entity: Entity.Management }), connectorRegistryStream);
router.post("/dataset/health", setDataToRequestObject("api.dataset.health"), onRequest({ entity: Entity.Management }), datasetHealth);

//Wrapper Service
3 changes: 2 additions & 1 deletion api-service/src/v2/services/CloudServices/index.ts
@@ -5,7 +5,8 @@ import { GCPStorageService } from "./GCPStorageService";
import * as _ from "lodash";

const cloudProviderName = _.get(config, "cloud_config.cloud_storage_provider");
const cloudProviderConfig = _.get(config, "cloud_config.cloud_storage_config");
const cloudConfig = _.get(config, "cloud_config.cloud_storage_config");
const cloudProviderConfig = _.isString(cloudConfig) ? JSON.parse(cloudConfig) : cloudConfig;

const initialiseServiceProvider = (provider: any, config: any): AzureStorageService | AWSStorageService | GCPStorageService => {
switch (provider) {
Expand Down