From 7004c218e78fa854341f84789d6489a761202068 Mon Sep 17 00:00:00 2001 From: yashashk Date: Tue, 25 Jun 2024 10:53:33 +0530 Subject: [PATCH 01/30] Sanketika-obsrv/issue-tracker: #OBS-150: added connector-registry route, controller,and required configs --- api-service/src/app.ts | 1 + api-service/src/v2/configs/Config.ts | 18 ++- .../ConnectorRegistryStreamController.ts | 113 ++++++++++++++++++ .../GenerateSignedURL/GenerateSignedURL.ts | 12 +- api-service/src/v2/routes/Router.ts | 2 + 5 files changed, 139 insertions(+), 7 deletions(-) create mode 100644 api-service/src/v2/controllers/ConnectorRegistryStream/ConnectorRegistryStreamController.ts diff --git a/api-service/src/app.ts b/api-service/src/app.ts index bc7aad96..177d106f 100644 --- a/api-service/src/app.ts +++ b/api-service/src/app.ts @@ -24,6 +24,7 @@ app.use(bodyParser.json({ limit: config.body_parser_limit})); app.use(express.text()); app.use(express.json()); app.set("queryServices", services); +app.use(bodyParser.raw({ type: "application/octet-stream", limit: "500mb" })) loadExtensions(app) .finally(() => { diff --git a/api-service/src/v2/configs/Config.ts b/api-service/src/v2/configs/Config.ts index 094b1924..c96665d0 100644 --- a/api-service/src/v2/configs/Config.ts +++ b/api-service/src/v2/configs/Config.ts @@ -69,11 +69,12 @@ export const config = { } }, "cloud_config": { - "cloud_storage_provider": process.env.cloud_storage_provider || "aws", // Supported providers - AWS, GCP, Azure - "cloud_storage_region": process.env.cloud_storage_region || "", // Region for the cloud provider storage - "cloud_storage_config": process.env.cloud_storage_config ? JSON.parse(process.env.cloud_storage_config) : {}, // Respective credentials object for cloud provider. Optional if service account provided - "container": process.env.container || "container", // Storage container/bucket name - "container_prefix": process.env.container_prefix || "", // Path to the folder inside container/bucket. Empty if data at root level + "cloud_storage_provider": process.env.cloud_storage_provider || "gcloud", // Supported providers - AWS, GCP, Azure + "cloud_storage_region": process.env.cloud_storage_region || "us-east-2", // Region for the cloud provider storage + "cloud_storage_config": process.env.cloud_storage_config || {}, // Respective credentials object for cloud provider. Optional if service account provided + "container": process.env.container || "exhaust-test-bucket", // Storage container/bucket name + "connector_container": process.env.container || "connector-registry", + "container_prefix": process.env.container_prefix || "connector-registry", // Path to the folder inside container/bucket. Empty if data at root level "storage_url_expiry": process.env.storage_url_expiry ? parseInt(process.env.storage_url_expiry) : 3600, // in seconds, Default 1hr of expiry for Signed URLs. "maxQueryDateRange": process.env.exhaust_query_range ? parseInt(process.env.exhaust_query_range) : 31, // in days. Defines the maximum no. of days the files can be fetched "exclude_exhaust_types": process.env.exclude_exhaust_types ? process.env.exclude_exhaust_types.split(",") : ["system-stats", "masterdata-system-stats", "system-events",] // list of folder type names to skip exhaust service @@ -90,7 +91,12 @@ export const config = { }, "command_service_config": { "host": process.env.command_service_host || "http://localhost", - "port": parseInt(process.env.command_service_port || "8000"), + "port": parseInt(process.env.command_service_port || "9999"), "path": process.env.command_service_path || "/system/v1/dataset/command" + }, + "obsrv_api_service_config": { + "host": process.env.obser_api_service_host || "http://localhost", + "port": parseInt(process.env.obser_api_service_port || "3007"), + "generate_url_path": process.env.generate_url_path || "/v2/files/generate-url" } } diff --git a/api-service/src/v2/controllers/ConnectorRegistryStream/ConnectorRegistryStreamController.ts b/api-service/src/v2/controllers/ConnectorRegistryStream/ConnectorRegistryStreamController.ts new file mode 100644 index 00000000..702d79a9 --- /dev/null +++ b/api-service/src/v2/controllers/ConnectorRegistryStream/ConnectorRegistryStreamController.ts @@ -0,0 +1,113 @@ +import { Request, Response } from "express"; +import { ResponseHandler } from "../../helpers/ResponseHandler"; +import _ from "lodash"; +import logger from "../../logger"; +import { config } from "../../configs/Config"; +import axios from "axios"; +import httpStatus from "http-status"; +import busboy from "busboy"; +import { v4 } from "uuid" +import { PassThrough } from "stream"; +import { URLAccess } from "../../types/SampleURLModel"; + +export const apiId = "api.files.generate-url"; +export const code = "FILES_GENERATE_URL_FAILURE"; + +const apiServiceHost = _.get(config, ["obsrv_api_service_config", "host"]); +const apiServicePort = _.get(config, ["obsrv_api_service_config", "port"]); +const generateSignedURLPath = _.get(config, ["obsrv_api_service_config", "generate_url_path"]); + +const getGenerateSignedURLRequestBody = (files: string[], access: string) => ({ + id: apiId, + ver: "v2", + ts: new Date().toISOString(), + params: { + msgid: v4() + }, + request: { + files, + access: access || URLAccess.Read, + type: "connector" + } +}); + +const connectorRegistryStream = async (req: Request, res: Response) => { + const resmsgid = _.get(res, "resmsgid"); + try { + const uploadStreamResponse: any = await uploadStream(req); + // console.log({ uploadStreamResponse }) + // const readPreSignedUrlsPromises = uploadStreamResponse.map(async (filePath: any) => { + // const readPreSignedUrl: any = await generatePresignedUrl(filePath, URLAccess.Read); + // return readPreSignedUrl[0]?.preSignedUrl; + // }); + // const preSignedReadUrls = await Promise.all(readPreSignedUrlsPromises); + logger.info({ apiId, resmsgid, message: `File uploaded to cloud provider successfully` }) + ResponseHandler.successResponse(req, res, { status: httpStatus.OK, data: { message: "Successfully uploaded", preSignedReadUrls: uploadStreamResponse } }) + } catch (error: any) { + logger.error(error, apiId, resmsgid, code); + const statusCode = _.get(error, "statusCode", 500); + const errorMessage = statusCode === 500 ? { code, message: "Failed to upload" } : error; + ResponseHandler.errorResponse(errorMessage, req, res); + } +}; + +export const generatePresignedUrl = async (fileName: string, access: string) => { + try { + const requestBody = getGenerateSignedURLRequestBody([fileName], access); + const response = await axios.post(`${apiServiceHost}:${apiServicePort}${generateSignedURLPath}`, requestBody); + return response?.data?.result; + } + catch (err) { + throw err + } +}; + +const uploadStream = async (req: Request) => { + return new Promise((resolve, reject) => { + const filePromises: Promise[] = []; + const bb = busboy({ headers: req.headers }); + console.log(bb, true); + const match: any[] = []; + + bb.on("file", async (name: any, file: any, info: any) => { + const processFile = async () => { + const fileName = info?.filename; + const preSignedUrl: any = await generatePresignedUrl(fileName, URLAccess.Write); + const filePath = preSignedUrl[0]?.filePath + const regex = /(?<=\/)[^/]+\.[^/]+(?=\/|$)/g; + match.push(...filePath.match(regex)); + const pass = new PassThrough(); + file.pipe(pass); + const fileBuffer = await streamToBuffer(pass); + await axios.put(preSignedUrl[0]?.preSignedUrl, fileBuffer, { + headers: { + "Content-Type": info.mimeType, + "Content-Length": fileBuffer.length, + } + }); + }; + filePromises.push(processFile()); + }); + bb.on("close", async () => { + try { + await Promise.all(filePromises); + resolve(match); + } catch (error) { + reject(error); + } + }); + bb.on("error", reject); + req.pipe(bb); + }) +} + +const streamToBuffer = (stream: PassThrough): Promise => { + return new Promise((resolve, reject) => { + const chunks: Buffer[] = []; + stream.on("data", (chunk) => chunks.push(chunk)); + stream.on("end", () => resolve(Buffer.concat(chunks))); + stream.on("error", reject); + }); +}; + +export default connectorRegistryStream; diff --git a/api-service/src/v2/controllers/GenerateSignedURL/GenerateSignedURL.ts b/api-service/src/v2/controllers/GenerateSignedURL/GenerateSignedURL.ts index 6182f9b3..7979cafa 100644 --- a/api-service/src/v2/controllers/GenerateSignedURL/GenerateSignedURL.ts +++ b/api-service/src/v2/controllers/GenerateSignedURL/GenerateSignedURL.ts @@ -15,11 +15,13 @@ import path from "path"; export const apiId = "api.files.generate-url" export const code = "FILES_GENERATE_URL_FAILURE" const maxFiles = config.presigned_url_configs.maxFiles +let containerType: string; const generateSignedURL = async (req: Request, res: Response) => { const requestBody = req.body const msgid = _.get(req, ["body", "params", "msgid"]); const resmsgid = _.get(res, "resmsgid"); + containerType = _.get(req, ["body", "request", "type"]); try { const isRequestValid: Record = schemaValidation(req.body, GenerateURL) if (!isRequestValid.isValid) { @@ -75,7 +77,15 @@ const generateSignedURL = async (req: Request, res: Response) => { } const getFilePath = (file: string) => { - return `${config.cloud_config.container}/${config.presigned_url_configs.service}/user_uploads/${file}` + const datasetUploadPath = `${config.cloud_config.container}/${config.presigned_url_configs.service}/user_uploads/${file}`; + const connectorUploadPath = `${config.cloud_config.connector_container}/${file}`; + + const paths: Record = { + "dataset": datasetUploadPath, + "connector": connectorUploadPath + }; + + return paths[containerType] || datasetUploadPath; } const transformFileNames = (fileList: Array, access: string): Record => { diff --git a/api-service/src/v2/routes/Router.ts b/api-service/src/v2/routes/Router.ts index ebdbe423..7ad01a2a 100644 --- a/api-service/src/v2/routes/Router.ts +++ b/api-service/src/v2/routes/Router.ts @@ -20,6 +20,7 @@ import { eventValidation } from "../controllers/EventValidation/EventValidation" import GenerateSignedURL from "../controllers/GenerateSignedURL/GenerateSignedURL"; import { sqlQuery } from "../controllers/QueryWrapper/SqlQueryWrapper"; import DatasetStatusTansition from "../controllers/DatasetStatusTransition/DatasetStatusTransition"; +import connectorRegistryStream from "../controllers/ConnectorRegistryStream/ConnectorRegistryStreamController"; export const router = express.Router(); @@ -39,6 +40,7 @@ router.post("/schema/validate", setDataToRequestObject("api.schema.validator"), router.post("/template/query/:templateId", setDataToRequestObject("api.query.template.query"), queryTemplate); router.post("/files/generate-url", setDataToRequestObject("api.files.generate-url"), onRequest({ entity: Entity.Management }), GenerateSignedURL); router.post("/datasets/status-transition", setDataToRequestObject("api.datasets.status-transition"), onRequest({ entity: Entity.Management }), DatasetStatusTansition); +router.post("/connector/stream/upload", setDataToRequestObject("api.connector.stream.upload"), onRequest({ entity: Entity.Management }), connectorRegistryStream); //Wrapper Service router.post("/obsrv/data/sql-query", setDataToRequestObject("api.obsrv.data.sql-query"), onRequest({ entity: Entity.Data_out }), sqlQuery); From ab2a37a492d076fb8b476e2a322bf1a9aefee19a Mon Sep 17 00:00:00 2001 From: yashashk Date: Tue, 25 Jun 2024 11:18:45 +0530 Subject: [PATCH 02/30] Sanketika-obsrv/issue-tracker: #OBS-150: added packages --- api-service/package.json | 3 +++ 1 file changed, 3 insertions(+) diff --git a/api-service/package.json b/api-service/package.json index e4754097..1df0f26e 100644 --- a/api-service/package.json +++ b/api-service/package.json @@ -28,6 +28,7 @@ "aws-sdk": "^2.1348.0", "axios": "^1.6.0", "body-parser": "^1.20.2", + "busboy": "^1.6.0", "compression": "^1.7.4", "dateformat": "2.0.0", "express": "^4.18.2", @@ -43,6 +44,7 @@ "moment": "^2.29.4", "multiparty": "4.2.1", "node-sql-parser": "^5.1.0", + "path": "^0.12.7", "pg": "^8.11.3", "pg-hstore": "^2.3.4", "prom-client": "^14.2.0", @@ -58,6 +60,7 @@ "@babel/traverse": "7.23.2" }, "devDependencies": { + "@types/busboy": "^1.5.4", "@types/chai": "^4.3.3", "@types/chai-as-promised": "^7.1.5", "@types/chai-spies": "^1.0.3", From 6edf484465f0623c1a7366c7df5d8f90f9281b01 Mon Sep 17 00:00:00 2001 From: yashashk Date: Tue, 25 Jun 2024 11:31:49 +0530 Subject: [PATCH 03/30] Sanketika-obsrv/issue-tracker: #OBS-150: added download link as response --- .../ConnectorRegistryStreamController.ts | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/api-service/src/v2/controllers/ConnectorRegistryStream/ConnectorRegistryStreamController.ts b/api-service/src/v2/controllers/ConnectorRegistryStream/ConnectorRegistryStreamController.ts index 702d79a9..3f081b3d 100644 --- a/api-service/src/v2/controllers/ConnectorRegistryStream/ConnectorRegistryStreamController.ts +++ b/api-service/src/v2/controllers/ConnectorRegistryStream/ConnectorRegistryStreamController.ts @@ -36,13 +36,13 @@ const connectorRegistryStream = async (req: Request, res: Response) => { try { const uploadStreamResponse: any = await uploadStream(req); // console.log({ uploadStreamResponse }) - // const readPreSignedUrlsPromises = uploadStreamResponse.map(async (filePath: any) => { - // const readPreSignedUrl: any = await generatePresignedUrl(filePath, URLAccess.Read); - // return readPreSignedUrl[0]?.preSignedUrl; - // }); - // const preSignedReadUrls = await Promise.all(readPreSignedUrlsPromises); + const readPreSignedUrlsPromises = uploadStreamResponse.map(async (filePath: any) => { + const readPreSignedUrl: any = await generatePresignedUrl(filePath, URLAccess.Read); + return readPreSignedUrl[0]?.preSignedUrl; + }); + const preSignedReadUrls = await Promise.all(readPreSignedUrlsPromises); logger.info({ apiId, resmsgid, message: `File uploaded to cloud provider successfully` }) - ResponseHandler.successResponse(req, res, { status: httpStatus.OK, data: { message: "Successfully uploaded", preSignedReadUrls: uploadStreamResponse } }) + ResponseHandler.successResponse(req, res, { status: httpStatus.OK, data: { message: "Successfully uploaded", preSignedReadUrls: preSignedReadUrls } }) } catch (error: any) { logger.error(error, apiId, resmsgid, code); const statusCode = _.get(error, "statusCode", 500); From 23bd5b78e742d09e0c13ab2f2eaf116df1a83c6f Mon Sep 17 00:00:00 2001 From: yashashk Date: Tue, 25 Jun 2024 11:32:26 +0530 Subject: [PATCH 04/30] Sanketika-obsrv/issue-tracker: #OBS-150: deciding path based on type --- .../v2/controllers/GenerateSignedURL/GenerateSignedURL.ts | 2 +- .../GenerateSignedURLValidationSchema.json | 7 +++++++ 2 files changed, 8 insertions(+), 1 deletion(-) diff --git a/api-service/src/v2/controllers/GenerateSignedURL/GenerateSignedURL.ts b/api-service/src/v2/controllers/GenerateSignedURL/GenerateSignedURL.ts index 7979cafa..04f60329 100644 --- a/api-service/src/v2/controllers/GenerateSignedURL/GenerateSignedURL.ts +++ b/api-service/src/v2/controllers/GenerateSignedURL/GenerateSignedURL.ts @@ -78,7 +78,7 @@ const generateSignedURL = async (req: Request, res: Response) => { const getFilePath = (file: string) => { const datasetUploadPath = `${config.cloud_config.container}/${config.presigned_url_configs.service}/user_uploads/${file}`; - const connectorUploadPath = `${config.cloud_config.connector_container}/${file}`; + const connectorUploadPath = `${config.cloud_config.container}/${config.cloud_config.container_prefix}/${file}`; const paths: Record = { "dataset": datasetUploadPath, diff --git a/api-service/src/v2/controllers/GenerateSignedURL/GenerateSignedURLValidationSchema.json b/api-service/src/v2/controllers/GenerateSignedURL/GenerateSignedURLValidationSchema.json index 553a784e..77db2e33 100644 --- a/api-service/src/v2/controllers/GenerateSignedURL/GenerateSignedURLValidationSchema.json +++ b/api-service/src/v2/controllers/GenerateSignedURL/GenerateSignedURLValidationSchema.json @@ -42,6 +42,13 @@ "read", "write" ] + }, + "type": { + "type": "string", + "enum": [ + "dataset", + "connector" + ] } }, "additionalProperties": false, From f324134c41e21e89fb59fe9d5db594ed67b1dde0 Mon Sep 17 00:00:00 2001 From: yashashk Date: Tue, 25 Jun 2024 11:33:13 +0530 Subject: [PATCH 05/30] Sanketika-obsrv/issue-tracker: #OBS-150: removed container config --- api-service/src/v2/configs/Config.ts | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/api-service/src/v2/configs/Config.ts b/api-service/src/v2/configs/Config.ts index c96665d0..2c0e7300 100644 --- a/api-service/src/v2/configs/Config.ts +++ b/api-service/src/v2/configs/Config.ts @@ -69,11 +69,10 @@ export const config = { } }, "cloud_config": { - "cloud_storage_provider": process.env.cloud_storage_provider || "gcloud", // Supported providers - AWS, GCP, Azure + "cloud_storage_provider": process.env.cloud_storage_provider || "aws", // Supported providers - AWS, GCP, Azure "cloud_storage_region": process.env.cloud_storage_region || "us-east-2", // Region for the cloud provider storage "cloud_storage_config": process.env.cloud_storage_config || {}, // Respective credentials object for cloud provider. Optional if service account provided "container": process.env.container || "exhaust-test-bucket", // Storage container/bucket name - "connector_container": process.env.container || "connector-registry", "container_prefix": process.env.container_prefix || "connector-registry", // Path to the folder inside container/bucket. Empty if data at root level "storage_url_expiry": process.env.storage_url_expiry ? parseInt(process.env.storage_url_expiry) : 3600, // in seconds, Default 1hr of expiry for Signed URLs. "maxQueryDateRange": process.env.exhaust_query_range ? parseInt(process.env.exhaust_query_range) : 31, // in days. Defines the maximum no. of days the files can be fetched From 6f743d30e778bf877d8bcbc74429be681c05ec7c Mon Sep 17 00:00:00 2001 From: yashashk Date: Tue, 25 Jun 2024 11:55:49 +0530 Subject: [PATCH 06/30] Sanketika-obsrv/issue-tracker: #OBS-150: refactored container saving path --- .../src/v2/controllers/GenerateSignedURL/GenerateSignedURL.ts | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/api-service/src/v2/controllers/GenerateSignedURL/GenerateSignedURL.ts b/api-service/src/v2/controllers/GenerateSignedURL/GenerateSignedURL.ts index 04f60329..e2bc2855 100644 --- a/api-service/src/v2/controllers/GenerateSignedURL/GenerateSignedURL.ts +++ b/api-service/src/v2/controllers/GenerateSignedURL/GenerateSignedURL.ts @@ -77,8 +77,8 @@ const generateSignedURL = async (req: Request, res: Response) => { } const getFilePath = (file: string) => { - const datasetUploadPath = `${config.cloud_config.container}/${config.presigned_url_configs.service}/user_uploads/${file}`; - const connectorUploadPath = `${config.cloud_config.container}/${config.cloud_config.container_prefix}/${file}`; + const datasetUploadPath = `${config.presigned_url_configs.service}/user_uploads/${file}`; + const connectorUploadPath = `${config.cloud_config.container_prefix}/${file}`; const paths: Record = { "dataset": datasetUploadPath, From 90a5f88a7b846ef2f673a719f71311a4225e2286 Mon Sep 17 00:00:00 2001 From: yashashk Date: Tue, 25 Jun 2024 16:47:04 +0530 Subject: [PATCH 07/30] Sanketika-obsrv/issue-tracker: #OBS-150: removed bodyparser --- api-service/src/app.ts | 1 - 1 file changed, 1 deletion(-) diff --git a/api-service/src/app.ts b/api-service/src/app.ts index 177d106f..bc7aad96 100644 --- a/api-service/src/app.ts +++ b/api-service/src/app.ts @@ -24,7 +24,6 @@ app.use(bodyParser.json({ limit: config.body_parser_limit})); app.use(express.text()); app.use(express.json()); app.set("queryServices", services); -app.use(bodyParser.raw({ type: "application/octet-stream", limit: "500mb" })) loadExtensions(app) .finally(() => { From 795397ffdde68fe25e34be1114c7c752ed385b25 Mon Sep 17 00:00:00 2001 From: yashashk Date: Tue, 25 Jun 2024 16:52:45 +0530 Subject: [PATCH 08/30] Sanketika-obsrv/issue-tracker: #OBS-150: implementation of calling command api with relative path as request body --- .../ConnectorRegistryStreamController.ts | 45 ++++++++++++++----- 1 file changed, 34 insertions(+), 11 deletions(-) diff --git a/api-service/src/v2/controllers/ConnectorRegistryStream/ConnectorRegistryStreamController.ts b/api-service/src/v2/controllers/ConnectorRegistryStream/ConnectorRegistryStreamController.ts index 3f081b3d..082d7a0e 100644 --- a/api-service/src/v2/controllers/ConnectorRegistryStream/ConnectorRegistryStreamController.ts +++ b/api-service/src/v2/controllers/ConnectorRegistryStream/ConnectorRegistryStreamController.ts @@ -9,6 +9,7 @@ import busboy from "busboy"; import { v4 } from "uuid" import { PassThrough } from "stream"; import { URLAccess } from "../../types/SampleURLModel"; +import { ErrorObject } from "../../types/ResponseModel"; export const apiId = "api.files.generate-url"; export const code = "FILES_GENERATE_URL_FAILURE"; @@ -17,6 +18,10 @@ const apiServiceHost = _.get(config, ["obsrv_api_service_config", "host"]); const apiServicePort = _.get(config, ["obsrv_api_service_config", "port"]); const generateSignedURLPath = _.get(config, ["obsrv_api_service_config", "generate_url_path"]); +const commandServiceHost = _.get(config, ["command_service_config", "host"]); +const commandServicePort = _.get(config, ["command_service_config", "port"]); +const registryUrl = _.get(config, ["command_service_config", "connector_registry_path"]) + const getGenerateSignedURLRequestBody = (files: string[], access: string) => ({ id: apiId, ver: "v2", @@ -35,18 +40,20 @@ const connectorRegistryStream = async (req: Request, res: Response) => { const resmsgid = _.get(res, "resmsgid"); try { const uploadStreamResponse: any = await uploadStream(req); - // console.log({ uploadStreamResponse }) - const readPreSignedUrlsPromises = uploadStreamResponse.map(async (filePath: any) => { - const readPreSignedUrl: any = await generatePresignedUrl(filePath, URLAccess.Read); - return readPreSignedUrl[0]?.preSignedUrl; - }); - const preSignedReadUrls = await Promise.all(readPreSignedUrlsPromises); + const registryRequestBody = { + relative_path: uploadStreamResponse[0] + } logger.info({ apiId, resmsgid, message: `File uploaded to cloud provider successfully` }) - ResponseHandler.successResponse(req, res, { status: httpStatus.OK, data: { message: "Successfully uploaded", preSignedReadUrls: preSignedReadUrls } }) + const registryResponse = await axios.post(`${commandServiceHost}:${commandServicePort}${registryUrl}`, registryRequestBody); + logger.info({ apiId, resmsgid, message: `Connector registered successfully` }) + ResponseHandler.successResponse(req, res, { status: httpStatus.OK, data: { message: registryResponse?.data?.message } }) } catch (error: any) { logger.error(error, apiId, resmsgid, code); - const statusCode = _.get(error, "statusCode", 500); - const errorMessage = statusCode === 500 ? { code, message: "Failed to upload" } : error; + let errorMessage = error; + const statusCode = _.get(error, "statusCode") + if (!statusCode || statusCode == 500) { + errorMessage = { code, message: "Failed to read dataset" } + } ResponseHandler.errorResponse(errorMessage, req, res); } }; @@ -58,7 +65,12 @@ export const generatePresignedUrl = async (fileName: string, access: string) => return response?.data?.result; } catch (err) { - throw err + throw { + code: "FILES_GENERATE_URL_FAILURE", + message: "Failed to generate sample urls", + statusCode: 400, + errCode: "BAD_REQUEST" + } as ErrorObject } }; @@ -66,10 +78,21 @@ const uploadStream = async (req: Request) => { return new Promise((resolve, reject) => { const filePromises: Promise[] = []; const bb = busboy({ headers: req.headers }); - console.log(bb, true); const match: any[] = []; + let fileCount = 0; bb.on("file", async (name: any, file: any, info: any) => { + if (fileCount > 0) { + // If more than one file is detected, reject the request + bb.emit("error", reject({ + code: "FAILED_TO_UPLOAD", + message: "Uploading multiple files are not allowed", + statusCode: 400, + errCode: "BAD_REQUEST" + })); + return + } + fileCount++; const processFile = async () => { const fileName = info?.filename; const preSignedUrl: any = await generatePresignedUrl(fileName, URLAccess.Write); From 62261dd2885d61e9cae97d1f3787ac890cc50eb5 Mon Sep 17 00:00:00 2001 From: yashashk Date: Tue, 25 Jun 2024 17:47:28 +0530 Subject: [PATCH 09/30] Sanketika-obsrv/issue-tracker: #OBS-150: updated error codes and added error handling --- .../ConnectorRegistryStreamController.ts | 26 +++++++++++++------ 1 file changed, 18 insertions(+), 8 deletions(-) diff --git a/api-service/src/v2/controllers/ConnectorRegistryStream/ConnectorRegistryStreamController.ts b/api-service/src/v2/controllers/ConnectorRegistryStream/ConnectorRegistryStreamController.ts index 082d7a0e..20c7d3c2 100644 --- a/api-service/src/v2/controllers/ConnectorRegistryStream/ConnectorRegistryStreamController.ts +++ b/api-service/src/v2/controllers/ConnectorRegistryStream/ConnectorRegistryStreamController.ts @@ -12,7 +12,7 @@ import { URLAccess } from "../../types/SampleURLModel"; import { ErrorObject } from "../../types/ResponseModel"; export const apiId = "api.files.generate-url"; -export const code = "FILES_GENERATE_URL_FAILURE"; +export const code = "FAILED_TO_REGISTER_CONNECTOR"; const apiServiceHost = _.get(config, ["obsrv_api_service_config", "host"]); const apiServicePort = _.get(config, ["obsrv_api_service_config", "port"]); @@ -48,11 +48,12 @@ const connectorRegistryStream = async (req: Request, res: Response) => { logger.info({ apiId, resmsgid, message: `Connector registered successfully` }) ResponseHandler.successResponse(req, res, { status: httpStatus.OK, data: { message: registryResponse?.data?.message } }) } catch (error: any) { + const errMessage = _.get(error, "response.data.error.message") logger.error(error, apiId, resmsgid, code); let errorMessage = error; const statusCode = _.get(error, "statusCode") if (!statusCode || statusCode == 500) { - errorMessage = { code, message: "Failed to read dataset" } + errorMessage = { code, message: errMessage || "Failed to register connector" } } ResponseHandler.errorResponse(errorMessage, req, res); } @@ -78,7 +79,7 @@ const uploadStream = async (req: Request) => { return new Promise((resolve, reject) => { const filePromises: Promise[] = []; const bb = busboy({ headers: req.headers }); - const match: any[] = []; + const relative_path: any[] = []; let fileCount = 0; bb.on("file", async (name: any, file: any, info: any) => { @@ -97,8 +98,8 @@ const uploadStream = async (req: Request) => { const fileName = info?.filename; const preSignedUrl: any = await generatePresignedUrl(fileName, URLAccess.Write); const filePath = preSignedUrl[0]?.filePath - const regex = /(?<=\/)[^/]+\.[^/]+(?=\/|$)/g; - match.push(...filePath.match(regex)); + const fileNameExtracted = extractFileNameFromPath(filePath); + relative_path.push(...fileNameExtracted); const pass = new PassThrough(); file.pipe(pass); const fileBuffer = await streamToBuffer(pass); @@ -114,12 +115,16 @@ const uploadStream = async (req: Request) => { bb.on("close", async () => { try { await Promise.all(filePromises); - resolve(match); + resolve(relative_path); } catch (error) { - reject(error); + reject({ + code: "FAILED_TO_UPLOAD", + message: "Fail to upload a file", + statusCode: 400, + errCode: "BAD_REQUEST" + }); } }); - bb.on("error", reject); req.pipe(bb); }) } @@ -133,4 +138,9 @@ const streamToBuffer = (stream: PassThrough): Promise => { }); }; +const extractFileNameFromPath = (filePath: string): string[] => { + const regex = /(?<=\/)[^/]+\.[^/]+(?=\/|$)/g; + return filePath.match(regex) || []; +}; + export default connectorRegistryStream; From 38e2f1ead1318ea450cb5e33ea397829335c16e4 Mon Sep 17 00:00:00 2001 From: yashashk Date: Tue, 25 Jun 2024 17:50:55 +0530 Subject: [PATCH 10/30] Sanketika-obsrv/issue-tracker: #OBS-150: added config route for command api --- api-service/src/v2/configs/Config.ts | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/api-service/src/v2/configs/Config.ts b/api-service/src/v2/configs/Config.ts index 2c0e7300..2d59c935 100644 --- a/api-service/src/v2/configs/Config.ts +++ b/api-service/src/v2/configs/Config.ts @@ -91,11 +91,12 @@ export const config = { "command_service_config": { "host": process.env.command_service_host || "http://localhost", "port": parseInt(process.env.command_service_port || "9999"), - "path": process.env.command_service_path || "/system/v1/dataset/command" + "path": process.env.command_service_path || "/system/v1/dataset/command", + "connector_registry_path": process.env.connector_registry_path || "/connector/v1/register", }, "obsrv_api_service_config": { "host": process.env.obser_api_service_host || "http://localhost", "port": parseInt(process.env.obser_api_service_port || "3007"), - "generate_url_path": process.env.generate_url_path || "/v2/files/generate-url" + "generate_url_path": process.env.generate_url_path || "/v2/files/generate-url", } } From acfd5de918340eb659014a3b9c81602f4bda88a6 Mon Sep 17 00:00:00 2001 From: yashashk Date: Tue, 25 Jun 2024 18:05:20 +0530 Subject: [PATCH 11/30] Sanketika-obsrv/issue-tracker: #OBS-150: added promise reject on error --- .../ConnectorRegistryStreamController.ts | 14 +++++--------- 1 file changed, 5 insertions(+), 9 deletions(-) diff --git a/api-service/src/v2/controllers/ConnectorRegistryStream/ConnectorRegistryStreamController.ts b/api-service/src/v2/controllers/ConnectorRegistryStream/ConnectorRegistryStreamController.ts index 20c7d3c2..fe2ee46b 100644 --- a/api-service/src/v2/controllers/ConnectorRegistryStream/ConnectorRegistryStreamController.ts +++ b/api-service/src/v2/controllers/ConnectorRegistryStream/ConnectorRegistryStreamController.ts @@ -79,7 +79,7 @@ const uploadStream = async (req: Request) => { return new Promise((resolve, reject) => { const filePromises: Promise[] = []; const bb = busboy({ headers: req.headers }); - const relative_path: any[] = []; + const match: any[] = []; let fileCount = 0; bb.on("file", async (name: any, file: any, info: any) => { @@ -98,8 +98,8 @@ const uploadStream = async (req: Request) => { const fileName = info?.filename; const preSignedUrl: any = await generatePresignedUrl(fileName, URLAccess.Write); const filePath = preSignedUrl[0]?.filePath - const fileNameExtracted = extractFileNameFromPath(filePath); - relative_path.push(...fileNameExtracted); + const regex = /(?<=\/)[^/]+\.[^/]+(?=\/|$)/g; + match.push(...filePath.match(regex)); const pass = new PassThrough(); file.pipe(pass); const fileBuffer = await streamToBuffer(pass); @@ -115,7 +115,7 @@ const uploadStream = async (req: Request) => { bb.on("close", async () => { try { await Promise.all(filePromises); - resolve(relative_path); + resolve(match); } catch (error) { reject({ code: "FAILED_TO_UPLOAD", @@ -125,6 +125,7 @@ const uploadStream = async (req: Request) => { }); } }); + bb.on("error", reject); req.pipe(bb); }) } @@ -138,9 +139,4 @@ const streamToBuffer = (stream: PassThrough): Promise => { }); }; -const extractFileNameFromPath = (filePath: string): string[] => { - const regex = /(?<=\/)[^/]+\.[^/]+(?=\/|$)/g; - return filePath.match(regex) || []; -}; - export default connectorRegistryStream; From 4808e68708a8cd4af74d546d1a1c697fc5db7d27 Mon Sep 17 00:00:00 2001 From: yashashk Date: Tue, 25 Jun 2024 18:10:47 +0530 Subject: [PATCH 12/30] Sanketika-obsrv/issue-tracker: #OBS-150: added regex to find filename in presigned link --- .../ConnectorRegistryStreamController.ts | 13 +++++++++---- 1 file changed, 9 insertions(+), 4 deletions(-) diff --git a/api-service/src/v2/controllers/ConnectorRegistryStream/ConnectorRegistryStreamController.ts b/api-service/src/v2/controllers/ConnectorRegistryStream/ConnectorRegistryStreamController.ts index fe2ee46b..941e9dfb 100644 --- a/api-service/src/v2/controllers/ConnectorRegistryStream/ConnectorRegistryStreamController.ts +++ b/api-service/src/v2/controllers/ConnectorRegistryStream/ConnectorRegistryStreamController.ts @@ -79,7 +79,7 @@ const uploadStream = async (req: Request) => { return new Promise((resolve, reject) => { const filePromises: Promise[] = []; const bb = busboy({ headers: req.headers }); - const match: any[] = []; + const relative_path: any[] = []; let fileCount = 0; bb.on("file", async (name: any, file: any, info: any) => { @@ -98,8 +98,8 @@ const uploadStream = async (req: Request) => { const fileName = info?.filename; const preSignedUrl: any = await generatePresignedUrl(fileName, URLAccess.Write); const filePath = preSignedUrl[0]?.filePath - const regex = /(?<=\/)[^/]+\.[^/]+(?=\/|$)/g; - match.push(...filePath.match(regex)); + const fileNameExtracted = extractFileNameFromPath(filePath); + relative_path.push(...fileNameExtracted); const pass = new PassThrough(); file.pipe(pass); const fileBuffer = await streamToBuffer(pass); @@ -115,7 +115,7 @@ const uploadStream = async (req: Request) => { bb.on("close", async () => { try { await Promise.all(filePromises); - resolve(match); + resolve(relative_path); } catch (error) { reject({ code: "FAILED_TO_UPLOAD", @@ -139,4 +139,9 @@ const streamToBuffer = (stream: PassThrough): Promise => { }); }; +const extractFileNameFromPath = (filePath: string): string[] => { + const regex = /(?<=\/)[^/]+\.[^/]+(?=\/|$)/g; + return filePath.match(regex) || []; +}; + export default connectorRegistryStream; From 0aaec483caf2b2540cce651c1cf46b700884d319 Mon Sep 17 00:00:00 2001 From: yashashk Date: Tue, 25 Jun 2024 18:21:25 +0530 Subject: [PATCH 13/30] Sanketika-obsrv/issue-tracker: #OBS-150: changed default port --- api-service/src/v2/configs/Config.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/api-service/src/v2/configs/Config.ts b/api-service/src/v2/configs/Config.ts index 2d59c935..ddec9c29 100644 --- a/api-service/src/v2/configs/Config.ts +++ b/api-service/src/v2/configs/Config.ts @@ -96,7 +96,7 @@ export const config = { }, "obsrv_api_service_config": { "host": process.env.obser_api_service_host || "http://localhost", - "port": parseInt(process.env.obser_api_service_port || "3007"), + "port": parseInt(process.env.obser_api_service_port || "3000"), "generate_url_path": process.env.generate_url_path || "/v2/files/generate-url", } } From 72171d365f9128484c4544d1051c37815741834e Mon Sep 17 00:00:00 2001 From: yashashk Date: Wed, 26 Jun 2024 12:56:40 +0530 Subject: [PATCH 14/30] Sanketika-obsrv/issue-tracker: #OBS-150: checking type and setting value --- api-service/src/v2/services/CloudServices/index.ts | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/api-service/src/v2/services/CloudServices/index.ts b/api-service/src/v2/services/CloudServices/index.ts index 6245ad19..c458016d 100644 --- a/api-service/src/v2/services/CloudServices/index.ts +++ b/api-service/src/v2/services/CloudServices/index.ts @@ -5,7 +5,8 @@ import { GCPStorageService } from "./GCPStorageService"; import * as _ from "lodash"; const cloudProviderName = _.get(config, "cloud_config.cloud_storage_provider"); -const cloudProviderConfig = _.get(config, "cloud_config.cloud_storage_config"); +const cloudConfig = _.get(config, "cloud_config.cloud_storage_config"); +const cloudProviderConfig = _.isString(cloudConfig) ? JSON.parse(cloudConfig) : cloudConfig; const initialiseServiceProvider = (provider: any, config: any): AzureStorageService | AWSStorageService | GCPStorageService => { switch (provider) { From 6d9f8e8224614b8b79273c1cc38774ead8b00cea Mon Sep 17 00:00:00 2001 From: yashashk Date: Wed, 26 Jun 2024 13:11:19 +0530 Subject: [PATCH 15/30] Sanketika-obsrv/issue-tracker: #OBS-150: chnaged config to defaults --- api-service/src/v2/configs/Config.ts | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/api-service/src/v2/configs/Config.ts b/api-service/src/v2/configs/Config.ts index afabc4ca..b30783cb 100644 --- a/api-service/src/v2/configs/Config.ts +++ b/api-service/src/v2/configs/Config.ts @@ -74,11 +74,11 @@ export const config = { } }, "cloud_config": { - "cloud_storage_provider": process.env.cloud_storage_provider || "aws", // Supported providers - AWS, GCP, Azure - "cloud_storage_region": process.env.cloud_storage_region || "us-east-2", // Region for the cloud provider storage - "cloud_storage_config": process.env.cloud_storage_config || {}, // Respective credentials object for cloud provider. Optional if service account provided - "container": process.env.container || "exhaust-test-bucket", // Storage container/bucket name - "container_prefix": process.env.container_prefix || "connector-registry", // Path to the folder inside container/bucket. Empty if data at root level + "cloud_storage_provider": process.env.cloud_storage_provider || "aws", // Supported providers - AWS, GCP, Azure + "cloud_storage_region": process.env.cloud_storage_region || "", // Region for the cloud provider storage + "cloud_storage_config": process.env.cloud_storage_config ? JSON.parse(process.env.cloud_storage_config) : {}, // Respective credentials object for cloud provider. Optional if service account provided + "container": process.env.container || "container", // Storage container/bucket name + "container_prefix": process.env.container_prefix || "", // Path to the folder inside container/bucket. Empty if data at root level "storage_url_expiry": process.env.storage_url_expiry ? parseInt(process.env.storage_url_expiry) : 3600, // in seconds, Default 1hr of expiry for Signed URLs. "maxQueryDateRange": process.env.exhaust_query_range ? parseInt(process.env.exhaust_query_range) : 31, // in days. Defines the maximum no. of days the files can be fetched "exclude_exhaust_types": process.env.exclude_exhaust_types ? process.env.exclude_exhaust_types.split(",") : ["system-stats", "masterdata-system-stats", "system-events",] // list of folder type names to skip exhaust service @@ -95,7 +95,7 @@ export const config = { }, "command_service_config": { "host": process.env.command_service_host || "http://localhost", - "port": parseInt(process.env.command_service_port || "9999"), + "port": parseInt(process.env.command_service_port || "8000"), "path": process.env.command_service_path || "/system/v1/dataset/command", "connector_registry_path": process.env.connector_registry_path || "/connector/v1/register", }, From 2dbed92802c918ef8eaf9877f7e01b690835fdca Mon Sep 17 00:00:00 2001 From: yashashk Date: Wed, 26 Jun 2024 13:13:08 +0530 Subject: [PATCH 16/30] Sanketika-obsrv/issue-tracker: #OBS-150: chnaged config to defaults --- api-service/src/v2/configs/Config.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/api-service/src/v2/configs/Config.ts b/api-service/src/v2/configs/Config.ts index b30783cb..3ec3d345 100644 --- a/api-service/src/v2/configs/Config.ts +++ b/api-service/src/v2/configs/Config.ts @@ -74,7 +74,7 @@ export const config = { } }, "cloud_config": { - "cloud_storage_provider": process.env.cloud_storage_provider || "aws", // Supported providers - AWS, GCP, Azure + "cloud_storage_provider": process.env.cloud_storage_provider || "aws", // Supported providers - AWS, GCP, Azure "cloud_storage_region": process.env.cloud_storage_region || "", // Region for the cloud provider storage "cloud_storage_config": process.env.cloud_storage_config ? JSON.parse(process.env.cloud_storage_config) : {}, // Respective credentials object for cloud provider. Optional if service account provided "container": process.env.container || "container", // Storage container/bucket name From 4c911b8bc76fc64a0f1c62d839c11949193b3e35 Mon Sep 17 00:00:00 2001 From: yashashk Date: Wed, 26 Jun 2024 15:17:30 +0530 Subject: [PATCH 17/30] Sanketika-obsrv/issue-tracker: #OBS-150: removed path dependency --- api-service/package.json | 1 - 1 file changed, 1 deletion(-) diff --git a/api-service/package.json b/api-service/package.json index d7b15957..c7d2370b 100644 --- a/api-service/package.json +++ b/api-service/package.json @@ -44,7 +44,6 @@ "moment": "^2.29.4", "multiparty": "4.2.1", "node-sql-parser": "^5.1.0", - "path": "^0.12.7", "pg": "^8.11.3", "pg-hstore": "^2.3.4", "prom-client": "^14.2.0", From f3b105ed2e39bf7ab4dbf9eff30203495fe25f10 Mon Sep 17 00:00:00 2001 From: yashashk Date: Wed, 26 Jun 2024 17:37:34 +0530 Subject: [PATCH 18/30] Sanketika-obsrv/issue-tracker: #OBS-150: changes in config file --- api-service/src/v2/configs/Config.ts | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/api-service/src/v2/configs/Config.ts b/api-service/src/v2/configs/Config.ts index 3ec3d345..81ef91e0 100644 --- a/api-service/src/v2/configs/Config.ts +++ b/api-service/src/v2/configs/Config.ts @@ -95,13 +95,13 @@ export const config = { }, "command_service_config": { "host": process.env.command_service_host || "http://localhost", - "port": parseInt(process.env.command_service_port || "8000"), + "port": parseInt(process.env.command_service_port || "9999"), "path": process.env.command_service_path || "/system/v1/dataset/command", "connector_registry_path": process.env.connector_registry_path || "/connector/v1/register", }, "obsrv_api_service_config": { - "host": process.env.obser_api_service_host || "http://localhost", - "port": parseInt(process.env.obser_api_service_port || "3000"), + "host": process.env.obsrv_api_service_host || "http://localhost", + "port": parseInt(process.env.obsrv_api_service_port || "3000"), "generate_url_path": process.env.generate_url_path || "/v2/files/generate-url", "path": process.env.command_service_path || "/system/v1/dataset/command" }, From f95167dd26796e589ed1f2f82f519fc8ae1e240e Mon Sep 17 00:00:00 2001 From: yashashk Date: Wed, 26 Jun 2024 17:38:37 +0530 Subject: [PATCH 19/30] Sanketika-obsrv/issue-tracker: #OBS-150: changes in config file --- api-service/src/v2/configs/Config.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/api-service/src/v2/configs/Config.ts b/api-service/src/v2/configs/Config.ts index 81ef91e0..867f26fb 100644 --- a/api-service/src/v2/configs/Config.ts +++ b/api-service/src/v2/configs/Config.ts @@ -95,7 +95,7 @@ export const config = { }, "command_service_config": { "host": process.env.command_service_host || "http://localhost", - "port": parseInt(process.env.command_service_port || "9999"), + "port": parseInt(process.env.command_service_port || "8000"), "path": process.env.command_service_path || "/system/v1/dataset/command", "connector_registry_path": process.env.connector_registry_path || "/connector/v1/register", }, From 685e0729a13e080014dffa0267335ac84af2165e Mon Sep 17 00:00:00 2001 From: yashashk Date: Wed, 26 Jun 2024 17:46:21 +0530 Subject: [PATCH 20/30] Sanketika-obsrv/issue-tracker: #OBS-150: updated api id --- .../ConnectorRegistryStreamController.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/api-service/src/v2/controllers/ConnectorRegistryStream/ConnectorRegistryStreamController.ts b/api-service/src/v2/controllers/ConnectorRegistryStream/ConnectorRegistryStreamController.ts index 941e9dfb..8bda5527 100644 --- a/api-service/src/v2/controllers/ConnectorRegistryStream/ConnectorRegistryStreamController.ts +++ b/api-service/src/v2/controllers/ConnectorRegistryStream/ConnectorRegistryStreamController.ts @@ -11,7 +11,7 @@ import { PassThrough } from "stream"; import { URLAccess } from "../../types/SampleURLModel"; import { ErrorObject } from "../../types/ResponseModel"; -export const apiId = "api.files.generate-url"; +export const apiId = "api.connector.stream.upload"; export const code = "FAILED_TO_REGISTER_CONNECTOR"; const apiServiceHost = _.get(config, ["obsrv_api_service_config", "host"]); From 2c216fea140fa5e0c798cf09b7fcb2261b616843 Mon Sep 17 00:00:00 2001 From: yashashk Date: Wed, 26 Jun 2024 17:48:07 +0530 Subject: [PATCH 21/30] Sanketika-obsrv/issue-tracker: #OBS-150: added default ts in request body --- .../ConnectorRegistryStreamController.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/api-service/src/v2/controllers/ConnectorRegistryStream/ConnectorRegistryStreamController.ts b/api-service/src/v2/controllers/ConnectorRegistryStream/ConnectorRegistryStreamController.ts index 8bda5527..1f5c8cfe 100644 --- a/api-service/src/v2/controllers/ConnectorRegistryStream/ConnectorRegistryStreamController.ts +++ b/api-service/src/v2/controllers/ConnectorRegistryStream/ConnectorRegistryStreamController.ts @@ -25,7 +25,7 @@ const registryUrl = _.get(config, ["command_service_config", "connector_registry const getGenerateSignedURLRequestBody = (files: string[], access: string) => ({ id: apiId, ver: "v2", - ts: new Date().toISOString(), + ts: Date.now(), params: { msgid: v4() }, From d7d94cadb7b39456d8e376a76b5a6fcd83edcf95 Mon Sep 17 00:00:00 2001 From: yashashk Date: Wed, 26 Jun 2024 18:09:16 +0530 Subject: [PATCH 22/30] Sanketika-obsrv/issue-tracker: #OBS-150: added default ts in request body --- .../ConnectorRegistryStreamController.ts | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/api-service/src/v2/controllers/ConnectorRegistryStream/ConnectorRegistryStreamController.ts b/api-service/src/v2/controllers/ConnectorRegistryStream/ConnectorRegistryStreamController.ts index 1f5c8cfe..1ee82751 100644 --- a/api-service/src/v2/controllers/ConnectorRegistryStream/ConnectorRegistryStreamController.ts +++ b/api-service/src/v2/controllers/ConnectorRegistryStream/ConnectorRegistryStreamController.ts @@ -23,9 +23,9 @@ const commandServicePort = _.get(config, ["command_service_config", "port"]); const registryUrl = _.get(config, ["command_service_config", "connector_registry_path"]) const getGenerateSignedURLRequestBody = (files: string[], access: string) => ({ - id: apiId, + id: "api.files.generate-url", ver: "v2", - ts: Date.now(), + ts: Date.now().toString(), params: { msgid: v4() }, From 422a80c3b4caf979e626e5bcfdc720897c9ee8b9 Mon Sep 17 00:00:00 2001 From: yashashk Date: Mon, 1 Jul 2024 15:18:42 +0530 Subject: [PATCH 23/30] Sanketika-obsrv/issue-tracker: #OBS-150: removed config --- api-service/src/v2/configs/Config.ts | 6 ------ 1 file changed, 6 deletions(-) diff --git a/api-service/src/v2/configs/Config.ts b/api-service/src/v2/configs/Config.ts index 867f26fb..60a048c1 100644 --- a/api-service/src/v2/configs/Config.ts +++ b/api-service/src/v2/configs/Config.ts @@ -99,12 +99,6 @@ export const config = { "path": process.env.command_service_path || "/system/v1/dataset/command", "connector_registry_path": process.env.connector_registry_path || "/connector/v1/register", }, - "obsrv_api_service_config": { - "host": process.env.obsrv_api_service_host || "http://localhost", - "port": parseInt(process.env.obsrv_api_service_port || "3000"), - "generate_url_path": process.env.generate_url_path || "/v2/files/generate-url", - "path": process.env.command_service_path || "/system/v1/dataset/command" - }, "flink_job_configs": { "pipeline_merged_job_manager_url": process.env.pipeline_merged_job_manager_url || "http://localhost:8081", "masterdata_processor_job_manager_url": process.env.masterdata_processor_job_manager_url || "http://localhost:8081" From 7091aec6b8576ffd269e2dc3f7661003b9cbe91f Mon Sep 17 00:00:00 2001 From: yashashk Date: Mon, 1 Jul 2024 15:20:02 +0530 Subject: [PATCH 24/30] Sanketika-obsrv/issue-tracker: #OBS-150: generalized generate signed url method --- .../ConnectorRegistryStreamController.ts | 79 +++++++------------ .../GenerateSignedURL/GenerateSignedURL.ts | 66 +--------------- .../controllers/GenerateSignedURL/helper.ts | 64 +++++++++++++++ 3 files changed, 94 insertions(+), 115 deletions(-) create mode 100644 api-service/src/v2/controllers/GenerateSignedURL/helper.ts diff --git a/api-service/src/v2/controllers/ConnectorRegistryStream/ConnectorRegistryStreamController.ts b/api-service/src/v2/controllers/ConnectorRegistryStream/ConnectorRegistryStreamController.ts index 1ee82751..dc66d595 100644 --- a/api-service/src/v2/controllers/ConnectorRegistryStream/ConnectorRegistryStreamController.ts +++ b/api-service/src/v2/controllers/ConnectorRegistryStream/ConnectorRegistryStreamController.ts @@ -6,38 +6,19 @@ import { config } from "../../configs/Config"; import axios from "axios"; import httpStatus from "http-status"; import busboy from "busboy"; -import { v4 } from "uuid" import { PassThrough } from "stream"; -import { URLAccess } from "../../types/SampleURLModel"; -import { ErrorObject } from "../../types/ResponseModel"; +import { generatePreSignedUrl } from "../GenerateSignedURL/helper"; export const apiId = "api.connector.stream.upload"; export const code = "FAILED_TO_REGISTER_CONNECTOR"; -const apiServiceHost = _.get(config, ["obsrv_api_service_config", "host"]); -const apiServicePort = _.get(config, ["obsrv_api_service_config", "port"]); -const generateSignedURLPath = _.get(config, ["obsrv_api_service_config", "generate_url_path"]); - const commandServiceHost = _.get(config, ["command_service_config", "host"]); const commandServicePort = _.get(config, ["command_service_config", "port"]); const registryUrl = _.get(config, ["command_service_config", "connector_registry_path"]) - -const getGenerateSignedURLRequestBody = (files: string[], access: string) => ({ - id: "api.files.generate-url", - ver: "v2", - ts: Date.now().toString(), - params: { - msgid: v4() - }, - request: { - files, - access: access || URLAccess.Read, - type: "connector" - } -}); +let resmsgid: string | any; const connectorRegistryStream = async (req: Request, res: Response) => { - const resmsgid = _.get(res, "resmsgid"); + resmsgid = _.get(res, "resmsgid"); try { const uploadStreamResponse: any = await uploadStream(req); const registryRequestBody = { @@ -59,22 +40,6 @@ const connectorRegistryStream = async (req: Request, res: Response) => { } }; -export const generatePresignedUrl = async (fileName: string, access: string) => { - try { - const requestBody = getGenerateSignedURLRequestBody([fileName], access); - const response = await axios.post(`${apiServiceHost}:${apiServicePort}${generateSignedURLPath}`, requestBody); - return response?.data?.result; - } - catch (err) { - throw { - code: "FILES_GENERATE_URL_FAILURE", - message: "Failed to generate sample urls", - statusCode: 400, - errCode: "BAD_REQUEST" - } as ErrorObject - } -}; - const uploadStream = async (req: Request) => { return new Promise((resolve, reject) => { const filePromises: Promise[] = []; @@ -96,19 +61,30 @@ const uploadStream = async (req: Request) => { fileCount++; const processFile = async () => { const fileName = info?.filename; - const preSignedUrl: any = await generatePresignedUrl(fileName, URLAccess.Write); - const filePath = preSignedUrl[0]?.filePath - const fileNameExtracted = extractFileNameFromPath(filePath); - relative_path.push(...fileNameExtracted); - const pass = new PassThrough(); - file.pipe(pass); - const fileBuffer = await streamToBuffer(pass); - await axios.put(preSignedUrl[0]?.preSignedUrl, fileBuffer, { - headers: { - "Content-Type": info.mimeType, - "Content-Length": fileBuffer.length, - } - }); + try { + const preSignedUrl: any = await generatePreSignedUrl("write", [fileName], "connector") + const filePath = preSignedUrl[0]?.filePath + const fileNameExtracted = extractFileNameFromPath(filePath); + relative_path.push(...fileNameExtracted); + const pass = new PassThrough(); + file.pipe(pass); + const fileBuffer = await streamToBuffer(pass); + await axios.put(preSignedUrl[0]?.preSignedUrl, fileBuffer, { + headers: { + "Content-Type": info.mimeType, + "Content-Length": fileBuffer.length, + } + }); + } + catch (err) { + logger.error({ apiId, err, resmsgid, message: "Failed to generate sample urls", code: "FILES_GENERATE_URL_FAILURE" }) + reject({ + code: "FILES_GENERATE_URL_FAILURE", + message: "Failed to generate sample urls", + statusCode: 500, + errCode: "INTERNAL_SERVER_ERROR" + }) + } }; filePromises.push(processFile()); }); @@ -117,6 +93,7 @@ const uploadStream = async (req: Request) => { await Promise.all(filePromises); resolve(relative_path); } catch (error) { + logger.error({ apiId, error, resmsgid, message: "Fail to upload a file", code: "FAILED_TO_UPLOAD" }) reject({ code: "FAILED_TO_UPLOAD", message: "Fail to upload a file", diff --git a/api-service/src/v2/controllers/GenerateSignedURL/GenerateSignedURL.ts b/api-service/src/v2/controllers/GenerateSignedURL/GenerateSignedURL.ts index e2bc2855..6454b1fa 100644 --- a/api-service/src/v2/controllers/GenerateSignedURL/GenerateSignedURL.ts +++ b/api-service/src/v2/controllers/GenerateSignedURL/GenerateSignedURL.ts @@ -6,11 +6,9 @@ import logger from "../../logger"; import { ErrorObject } from "../../types/ResponseModel"; import { schemaValidation } from "../../services/ValidationService"; import GenerateURL from "./GenerateSignedURLValidationSchema.json" -import { cloudProvider } from "../../services/CloudServices"; import { config } from "../../configs/Config"; import { URLAccess } from "../../types/SampleURLModel"; -import { v4 as uuidv4 } from "uuid"; -import path from "path"; +import { generatePreSignedUrl } from "./helper"; export const apiId = "api.files.generate-url" export const code = "FILES_GENERATE_URL_FAILURE" @@ -48,21 +46,7 @@ const generateSignedURL = async (req: Request, res: Response) => { errCode: "BAD_REQUEST" } as ErrorObject, req, res); } - - const { filesList, updatedFileNames } = transformFileNames(files, access) - logger.info(`Updated file names with path:${updatedFileNames}`) - - const urlExpiry: number = getURLExpiry(access) - const preSignedUrls = await Promise.all(cloudProvider.generateSignedURLs(config.cloud_config.container, updatedFileNames, access, urlExpiry)) - const signedUrlList = _.map(preSignedUrls, list => { - const fileNameWithUid = _.keys(list)[0] - return { - filePath: getFilePath(fileNameWithUid), - fileName: filesList.get(fileNameWithUid), - preSignedUrl: _.values(list)[0] - } - }) - + const signedUrlList = await generatePreSignedUrl(access, files, containerType) logger.info({ apiId, requestBody, msgid, resmsgid, response: signedUrlList, message: `Sample urls generated successfully for files:${files}` }) ResponseHandler.successResponse(req, res, { status: httpStatus.OK, data: signedUrlList }) } catch (error: any) { @@ -76,52 +60,6 @@ const generateSignedURL = async (req: Request, res: Response) => { } } -const getFilePath = (file: string) => { - const datasetUploadPath = `${config.presigned_url_configs.service}/user_uploads/${file}`; - const connectorUploadPath = `${config.cloud_config.container_prefix}/${file}`; - - const paths: Record = { - "dataset": datasetUploadPath, - "connector": connectorUploadPath - }; - - return paths[containerType] || datasetUploadPath; -} - -const transformFileNames = (fileList: Array, access: string): Record => { - if (access === URLAccess.Read) { - return transformReadFiles(fileList) - } - return transformWriteFiles(fileList) -} - -const transformReadFiles = (fileNames: Array) => { - const fileMap = new Map(); - const updatedFileNames = _.map(fileNames, file => { - fileMap.set(file, file) - return getFilePath(file) - }) - return { filesList: fileMap, updatedFileNames } -} - -const transformWriteFiles = (fileNames: Array) => { - const fileMap = new Map(); - const updatedFileNames = _.map(fileNames, file => { - const uuid = uuidv4().replace(/-/g, "").slice(0, 6); - const ext = path.extname(file) - const baseName = path.basename(file, ext) - const updatedFileName = `${baseName}_${uuid}${ext}` - fileMap.set(updatedFileName, file) - return getFilePath(updatedFileName) - }) - return { filesList: fileMap, updatedFileNames } - -} - -const getURLExpiry = (access: string) => { - return access === URLAccess.Read ? config.presigned_url_configs.read_storage_url_expiry : config.presigned_url_configs.write_storage_url_expiry -} - const checkLimitExceed = (files: Array): boolean => { return _.size(files) > maxFiles } diff --git a/api-service/src/v2/controllers/GenerateSignedURL/helper.ts b/api-service/src/v2/controllers/GenerateSignedURL/helper.ts new file mode 100644 index 00000000..c812c56a --- /dev/null +++ b/api-service/src/v2/controllers/GenerateSignedURL/helper.ts @@ -0,0 +1,64 @@ +import { config } from "../../configs/Config"; +import * as _ from "lodash"; +import { cloudProvider } from "../../services/CloudServices"; +import { URLAccess } from "../../types/SampleURLModel"; +import path from "path"; +import { v4 as uuidv4 } from "uuid"; + +export const generatePreSignedUrl = async (access: string, files: any, containerType: string) => { + const { filesList, updatedFileNames } = transformFileNames(files, access, containerType); + const urlExpiry: number = getURLExpiry(access); + const preSignedUrls = await Promise.all(cloudProvider.generateSignedURLs(config.cloud_config.container, updatedFileNames, access, urlExpiry)); + const signedUrlList = _.map(preSignedUrls, list => { + const fileNameWithUid = _.keys(list)[0]; + return { + filePath: getFilePath(fileNameWithUid, containerType), + fileName: filesList.get(fileNameWithUid), + preSignedUrl: _.values(list)[0] + }; + }); + return signedUrlList; +} + +const getFilePath = (file: string, containerType: string) => { + const datasetUploadPath = `${config.presigned_url_configs.service}/user_uploads/${file}`; + const connectorUploadPath = `${config.cloud_config.container_prefix}/${file}`; + const paths: Record = { + "dataset": datasetUploadPath, + "connector": connectorUploadPath + }; + return paths[containerType] || datasetUploadPath; +} + +const transformFileNames = (fileList: Array, access: string, containerType: string): Record => { + if (access === URLAccess.Read) { + return transformReadFiles(fileList, containerType); + } + return transformWriteFiles(fileList, containerType); +} + +const transformReadFiles = (fileNames: Array, containerType: string) => { + const fileMap = new Map(); + const updatedFileNames = _.map(fileNames, file => { + fileMap.set(file, file); + return getFilePath(file, containerType); + }); + return { filesList: fileMap, updatedFileNames }; +} + +const transformWriteFiles = (fileNames: Array, containerType: string) => { + const fileMap = new Map(); + const updatedFileNames = _.map(fileNames, file => { + const uuid = uuidv4().replace(/-/g, "").slice(0, 6); + const ext = path.extname(file); + const baseName = path.basename(file, ext); + const updatedFileName = `${baseName}_${uuid}${ext}`; + fileMap.set(updatedFileName, file); + return getFilePath(updatedFileName, containerType); + }); + return { filesList: fileMap, updatedFileNames }; +} + +const getURLExpiry = (access: string) => { + return access === URLAccess.Read ? config.presigned_url_configs.read_storage_url_expiry : config.presigned_url_configs.write_storage_url_expiry; +} From 490f3575e61c2af9e56b17cd5a1638b5fd1f02d6 Mon Sep 17 00:00:00 2001 From: yashashk Date: Mon, 1 Jul 2024 16:37:53 +0530 Subject: [PATCH 25/30] Sanketika-obsrv/issue-tracker: #OBS-150: removed config --- api-service/src/v2/configs/Config.ts | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/api-service/src/v2/configs/Config.ts b/api-service/src/v2/configs/Config.ts index 60a048c1..c1958580 100644 --- a/api-service/src/v2/configs/Config.ts +++ b/api-service/src/v2/configs/Config.ts @@ -96,8 +96,7 @@ export const config = { "command_service_config": { "host": process.env.command_service_host || "http://localhost", "port": parseInt(process.env.command_service_port || "8000"), - "path": process.env.command_service_path || "/system/v1/dataset/command", - "connector_registry_path": process.env.connector_registry_path || "/connector/v1/register", + "path": process.env.command_service_path || "/system/v1/dataset/command" }, "flink_job_configs": { "pipeline_merged_job_manager_url": process.env.pipeline_merged_job_manager_url || "http://localhost:8081", From 2ca6e08e4bb1eb047a98dfdc90bdecfea5438ac2 Mon Sep 17 00:00:00 2001 From: yashashk Date: Mon, 1 Jul 2024 16:38:55 +0530 Subject: [PATCH 26/30] Sanketika-obsrv/issue-tracker: #OBS-150: added connector registry call in connector register connection --- api-service/src/v2/connections/commandServiceConnection.ts | 6 +++++- .../ConnectorRegistryStreamController.ts | 7 ++----- 2 files changed, 7 insertions(+), 6 deletions(-) diff --git a/api-service/src/v2/connections/commandServiceConnection.ts b/api-service/src/v2/connections/commandServiceConnection.ts index 94597474..63912747 100644 --- a/api-service/src/v2/connections/commandServiceConnection.ts +++ b/api-service/src/v2/connections/commandServiceConnection.ts @@ -18,4 +18,8 @@ export const executeCommand = async (id: string, command: string) => { } } return commandHttpService.post(commandPath, payload) -} \ No newline at end of file +} + +export const connectorRegistry = async (registryRequestBody: any) => { + return commandHttpService.post("/connector/v1/register", registryRequestBody) +} diff --git a/api-service/src/v2/controllers/ConnectorRegistryStream/ConnectorRegistryStreamController.ts b/api-service/src/v2/controllers/ConnectorRegistryStream/ConnectorRegistryStreamController.ts index dc66d595..b4415727 100644 --- a/api-service/src/v2/controllers/ConnectorRegistryStream/ConnectorRegistryStreamController.ts +++ b/api-service/src/v2/controllers/ConnectorRegistryStream/ConnectorRegistryStreamController.ts @@ -2,19 +2,16 @@ import { Request, Response } from "express"; import { ResponseHandler } from "../../helpers/ResponseHandler"; import _ from "lodash"; import logger from "../../logger"; -import { config } from "../../configs/Config"; import axios from "axios"; import httpStatus from "http-status"; import busboy from "busboy"; import { PassThrough } from "stream"; import { generatePreSignedUrl } from "../GenerateSignedURL/helper"; +import { connectorRegistry } from "../../connections/commandServiceConnection"; export const apiId = "api.connector.stream.upload"; export const code = "FAILED_TO_REGISTER_CONNECTOR"; -const commandServiceHost = _.get(config, ["command_service_config", "host"]); -const commandServicePort = _.get(config, ["command_service_config", "port"]); -const registryUrl = _.get(config, ["command_service_config", "connector_registry_path"]) let resmsgid: string | any; const connectorRegistryStream = async (req: Request, res: Response) => { @@ -25,7 +22,7 @@ const connectorRegistryStream = async (req: Request, res: Response) => { relative_path: uploadStreamResponse[0] } logger.info({ apiId, resmsgid, message: `File uploaded to cloud provider successfully` }) - const registryResponse = await axios.post(`${commandServiceHost}:${commandServicePort}${registryUrl}`, registryRequestBody); + const registryResponse = await connectorRegistry(registryRequestBody); logger.info({ apiId, resmsgid, message: `Connector registered successfully` }) ResponseHandler.successResponse(req, res, { status: httpStatus.OK, data: { message: registryResponse?.data?.message } }) } catch (error: any) { From e5e52ae76684ec910fb946ab402ecd2a7e5ddbb7 Mon Sep 17 00:00:00 2001 From: yashashk Date: Mon, 1 Jul 2024 16:42:22 +0530 Subject: [PATCH 27/30] Sanketika-obsrv/issue-tracker: #OBS-150: removed lodash map --- .../src/v2/controllers/GenerateSignedURL/helper.ts | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/api-service/src/v2/controllers/GenerateSignedURL/helper.ts b/api-service/src/v2/controllers/GenerateSignedURL/helper.ts index c812c56a..fc42e2e1 100644 --- a/api-service/src/v2/controllers/GenerateSignedURL/helper.ts +++ b/api-service/src/v2/controllers/GenerateSignedURL/helper.ts @@ -35,20 +35,20 @@ const transformFileNames = (fileList: Array, access: string, conta return transformReadFiles(fileList, containerType); } return transformWriteFiles(fileList, containerType); -} +}; const transformReadFiles = (fileNames: Array, containerType: string) => { const fileMap = new Map(); - const updatedFileNames = _.map(fileNames, file => { + const updatedFileNames = fileNames.map(file => { fileMap.set(file, file); return getFilePath(file, containerType); }); return { filesList: fileMap, updatedFileNames }; -} +}; const transformWriteFiles = (fileNames: Array, containerType: string) => { const fileMap = new Map(); - const updatedFileNames = _.map(fileNames, file => { + const updatedFileNames = fileNames.map(file => { const uuid = uuidv4().replace(/-/g, "").slice(0, 6); const ext = path.extname(file); const baseName = path.basename(file, ext); @@ -57,7 +57,7 @@ const transformWriteFiles = (fileNames: Array, containerType: stri return getFilePath(updatedFileName, containerType); }); return { filesList: fileMap, updatedFileNames }; -} +}; const getURLExpiry = (access: string) => { return access === URLAccess.Read ? config.presigned_url_configs.read_storage_url_expiry : config.presigned_url_configs.write_storage_url_expiry; From dd4355a303aecb81df1bcef4a2a274532d0e9501 Mon Sep 17 00:00:00 2001 From: yashashk Date: Mon, 1 Jul 2024 18:37:05 +0530 Subject: [PATCH 28/30] Sanketika-obsrv/issue-tracker: #OBS-150: changed route name and added connector register service in command api connection --- .../connections/commandServiceConnection.ts | 2 +- .../ConnectorRegisterController.ts} | 22 +++++++++---------- api-service/src/v2/routes/Router.ts | 4 ++-- 3 files changed, 14 insertions(+), 14 deletions(-) rename api-service/src/v2/controllers/{ConnectorRegistryStream/ConnectorRegistryStreamController.ts => ConnectorRegister/ConnectorRegisterController.ts} (88%) diff --git a/api-service/src/v2/connections/commandServiceConnection.ts b/api-service/src/v2/connections/commandServiceConnection.ts index 63912747..3f1582fa 100644 --- a/api-service/src/v2/connections/commandServiceConnection.ts +++ b/api-service/src/v2/connections/commandServiceConnection.ts @@ -20,6 +20,6 @@ export const executeCommand = async (id: string, command: string) => { return commandHttpService.post(commandPath, payload) } -export const connectorRegistry = async (registryRequestBody: any) => { +export const registerConnector = async (registryRequestBody: any) => { return commandHttpService.post("/connector/v1/register", registryRequestBody) } diff --git a/api-service/src/v2/controllers/ConnectorRegistryStream/ConnectorRegistryStreamController.ts b/api-service/src/v2/controllers/ConnectorRegister/ConnectorRegisterController.ts similarity index 88% rename from api-service/src/v2/controllers/ConnectorRegistryStream/ConnectorRegistryStreamController.ts rename to api-service/src/v2/controllers/ConnectorRegister/ConnectorRegisterController.ts index b4415727..ca21f9d7 100644 --- a/api-service/src/v2/controllers/ConnectorRegistryStream/ConnectorRegistryStreamController.ts +++ b/api-service/src/v2/controllers/ConnectorRegister/ConnectorRegisterController.ts @@ -7,14 +7,14 @@ import httpStatus from "http-status"; import busboy from "busboy"; import { PassThrough } from "stream"; import { generatePreSignedUrl } from "../GenerateSignedURL/helper"; -import { connectorRegistry } from "../../connections/commandServiceConnection"; +import { registerConnector } from "../../connections/commandServiceConnection"; -export const apiId = "api.connector.stream.upload"; +export const apiId = "api.connector.register"; export const code = "FAILED_TO_REGISTER_CONNECTOR"; let resmsgid: string | any; -const connectorRegistryStream = async (req: Request, res: Response) => { +const connectorRegisterController = async (req: Request, res: Response) => { resmsgid = _.get(res, "resmsgid"); try { const uploadStreamResponse: any = await uploadStream(req); @@ -22,7 +22,7 @@ const connectorRegistryStream = async (req: Request, res: Response) => { relative_path: uploadStreamResponse[0] } logger.info({ apiId, resmsgid, message: `File uploaded to cloud provider successfully` }) - const registryResponse = await connectorRegistry(registryRequestBody); + const registryResponse = await registerConnector(registryRequestBody); logger.info({ apiId, resmsgid, message: `Connector registered successfully` }) ResponseHandler.successResponse(req, res, { status: httpStatus.OK, data: { message: registryResponse?.data?.message } }) } catch (error: any) { @@ -40,14 +40,14 @@ const connectorRegistryStream = async (req: Request, res: Response) => { const uploadStream = async (req: Request) => { return new Promise((resolve, reject) => { const filePromises: Promise[] = []; - const bb = busboy({ headers: req.headers }); + const busboyClient = busboy({ headers: req.headers }); const relative_path: any[] = []; let fileCount = 0; - bb.on("file", async (name: any, file: any, info: any) => { + busboyClient.on("file", async (name: any, file: any, info: any) => { if (fileCount > 0) { // If more than one file is detected, reject the request - bb.emit("error", reject({ + busboyClient.emit("error", reject({ code: "FAILED_TO_UPLOAD", message: "Uploading multiple files are not allowed", statusCode: 400, @@ -85,7 +85,7 @@ const uploadStream = async (req: Request) => { }; filePromises.push(processFile()); }); - bb.on("close", async () => { + busboyClient.on("close", async () => { try { await Promise.all(filePromises); resolve(relative_path); @@ -99,8 +99,8 @@ const uploadStream = async (req: Request) => { }); } }); - bb.on("error", reject); - req.pipe(bb); + busboyClient.on("error", reject); + req.pipe(busboyClient); }) } @@ -118,4 +118,4 @@ const extractFileNameFromPath = (filePath: string): string[] => { return filePath.match(regex) || []; }; -export default connectorRegistryStream; +export default connectorRegisterController; diff --git a/api-service/src/v2/routes/Router.ts b/api-service/src/v2/routes/Router.ts index e5e47105..de72d93c 100644 --- a/api-service/src/v2/routes/Router.ts +++ b/api-service/src/v2/routes/Router.ts @@ -20,7 +20,7 @@ import { eventValidation } from "../controllers/EventValidation/EventValidation" import GenerateSignedURL from "../controllers/GenerateSignedURL/GenerateSignedURL"; import { sqlQuery } from "../controllers/QueryWrapper/SqlQueryWrapper"; import DatasetStatusTansition from "../controllers/DatasetStatusTransition/DatasetStatusTransition"; -import connectorRegistryStream from "../controllers/ConnectorRegistryStream/ConnectorRegistryStreamController"; +import connectorRegisterController from "../controllers/ConnectorRegister/ConnectorRegisterController"; import datasetHealth from "../controllers/DatasetHealth/DatasetHealth"; export const router = express.Router(); @@ -41,7 +41,7 @@ router.post("/schema/validate", setDataToRequestObject("api.schema.validator"), router.post("/template/query/:templateId", setDataToRequestObject("api.query.template.query"), queryTemplate); router.post("/files/generate-url", setDataToRequestObject("api.files.generate-url"), onRequest({ entity: Entity.Management }), GenerateSignedURL); router.post("/datasets/status-transition", setDataToRequestObject("api.datasets.status-transition"), onRequest({ entity: Entity.Management }), DatasetStatusTansition); -router.post("/connector/stream/upload", setDataToRequestObject("api.connector.stream.upload"), onRequest({ entity: Entity.Management }), connectorRegistryStream); +router.post("/connector/register", setDataToRequestObject("api.connector.register"), onRequest({ entity: Entity.Management }), connectorRegisterController); router.post("/dataset/health", setDataToRequestObject("api.dataset.health"), onRequest({ entity: Entity.Management }), datasetHealth); //Wrapper Service From a9d0be318a18898fac729c5ee4812382c9520f0b Mon Sep 17 00:00:00 2001 From: yashashk Date: Mon, 1 Jul 2024 18:53:17 +0530 Subject: [PATCH 29/30] Sanketika-obsrv/issue-tracker: #OBS-150: changed route name and added connector register service in command api connection --- .../ConnectorRegister/ConnectorRegisterController.ts | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/api-service/src/v2/controllers/ConnectorRegister/ConnectorRegisterController.ts b/api-service/src/v2/controllers/ConnectorRegister/ConnectorRegisterController.ts index ca21f9d7..3a81f771 100644 --- a/api-service/src/v2/controllers/ConnectorRegister/ConnectorRegisterController.ts +++ b/api-service/src/v2/controllers/ConnectorRegister/ConnectorRegisterController.ts @@ -18,11 +18,11 @@ const connectorRegisterController = async (req: Request, res: Response) => { resmsgid = _.get(res, "resmsgid"); try { const uploadStreamResponse: any = await uploadStream(req); - const registryRequestBody = { + const payload = { relative_path: uploadStreamResponse[0] } logger.info({ apiId, resmsgid, message: `File uploaded to cloud provider successfully` }) - const registryResponse = await registerConnector(registryRequestBody); + const registryResponse = await registerConnector(payload); logger.info({ apiId, resmsgid, message: `Connector registered successfully` }) ResponseHandler.successResponse(req, res, { status: httpStatus.OK, data: { message: registryResponse?.data?.message } }) } catch (error: any) { From bd485f9b852f882d45c84c5457788b08619dbde5 Mon Sep 17 00:00:00 2001 From: yashashk Date: Mon, 1 Jul 2024 19:01:26 +0530 Subject: [PATCH 30/30] Sanketika-obsrv/issue-tracker: #OBS-150: changed route name and added connector register service in command api connection --- api-service/src/v2/connections/commandServiceConnection.ts | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/api-service/src/v2/connections/commandServiceConnection.ts b/api-service/src/v2/connections/commandServiceConnection.ts index 3f1582fa..541bded6 100644 --- a/api-service/src/v2/connections/commandServiceConnection.ts +++ b/api-service/src/v2/connections/commandServiceConnection.ts @@ -20,6 +20,6 @@ export const executeCommand = async (id: string, command: string) => { return commandHttpService.post(commandPath, payload) } -export const registerConnector = async (registryRequestBody: any) => { - return commandHttpService.post("/connector/v1/register", registryRequestBody) +export const registerConnector = async (requestBody: any) => { + return commandHttpService.post("/connector/v1/register", requestBody) }