Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Sanketika-Obsrv/issue-tracker#OBS-143: feat: Transaction management and swagger doc for v2 apis #194

Merged
merged 6 commits into from
Jun 24, 2024
2 changes: 1 addition & 1 deletion api-service/src/v2/connections/databaseConnection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ const { database, host, password, port, username } = connectionConfig.postgres
export const sequelize = new Sequelize({
database, password, username: username, dialect: "postgres", host, port: +port, pool: {
max: 5,
min: 0,
min: 2,
acquire: 30000,
idle: 10000
}
Expand Down
23 changes: 12 additions & 11 deletions api-service/src/v2/controllers/DatasetCreate/DatasetCreate.ts
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
import { Request, Response } from "express";
import logger from "../../logger";
import { generateDataSource, getDefaultValue, getDraftDataset, getDuplicateConfigs, getDuplicateDenormKey, setReqDatasetId } from "../../services/DatasetService";
import { getDefaultValue, getDraftDataset, getDuplicateConfigs, getDuplicateDenormKey, setReqDatasetId } from "../../services/DatasetService";
import _ from "lodash";
import DatasetCreate from "./DatasetCreateValidationSchema.json";
import { schemaValidation } from "../../services/ValidationService";
import { DatasetDraft } from "../../models/DatasetDraft";
import { ResponseHandler } from "../../helpers/ResponseHandler";
import httpStatus from "http-status";
import { sequelize } from "../../connections/databaseConnection";
import { ErrorObject } from "../../types/ResponseModel";
import { DatasourceDraft } from "../../models/DatasourceDraft";
import { DatasetTransformationsDraft } from "../../models/TransformationDraft";

export const apiId = "api.datasets.create"
Expand All @@ -17,6 +17,7 @@ const datasetCreate = async (req: Request, res: Response) => {
const requestBody = req.body
const msgid = _.get(req, ["body", "params", "msgid"]);
const resmsgid = _.get(res, "resmsgid");
let transact;
try {
const datasetId = _.get(req, ["body", "request", "dataset_id"])
setReqDatasetId(req, datasetId)
Expand Down Expand Up @@ -61,24 +62,24 @@ const datasetCreate = async (req: Request, res: Response) => {

const datasetPayload: any = await getDefaultValue(datasetBody);
const data = { ...datasetPayload, version_key: Date.now().toString() }
const transformationConfig: any = getTransformationConfig({ transformationPayload: _.get(datasetBody, "transformations_config"), datasetId: _.get(datasetPayload, "id") })

const response = await DatasetDraft.create(data)
transact = await sequelize.transaction()

const { dataset_config, denorm_config, transformation_config, data_schema, id, dataset_id } = data
const datasourcePayload = await generateDataSource({ indexCol: _.get(dataset_config, ["timestamp_key"]), data_schema, id, dataset_id, denorm_config, transformation_config, action:"create" })
await DatasourceDraft.create(datasourcePayload)
logger.info({ apiId, message: `Datasource created successsfully for the dataset:${id}` })
const response = await DatasetDraft.create(data, { transaction: transact })
logger.info({ apiId, message: `Dataset created successsfully for the dataset:${_.get(data, "id")}` })

const transformationConfig: any = getTransformationConfig({ transformationPayload: _.get(datasetBody, "transformations_config"), datasetId: _.get(datasetPayload, "id") })
if (!_.isEmpty(transformationConfig)) {
await DatasetTransformationsDraft.bulkCreate(transformationConfig);
logger.info({ apiId, message: `Dataset transformations records created successsfully for dataset:${id}` })
await DatasetTransformationsDraft.bulkCreate(transformationConfig, { transaction: transact });
logger.info({ apiId, message: `Dataset transformations records created successsfully for dataset:${_.get(data, "id")}` })
}


await transact.commit()
const responseData = { id: _.get(response, ["dataValues", "id"]) || "", version_key: data.version_key }
logger.info({ apiId, msgid, requestBody, resmsgid, message: `Dataset Created Successfully with id:${_.get(response, ["dataValues", "id"])}`, response: responseData })
ResponseHandler.successResponse(req, res, { status: httpStatus.OK, data: responseData });
} catch (error: any) {
transact && await transact.rollback()
const code = _.get(error, "code") || "DATASET_CREATION_FAILURE"
logger.error({ ...error, apiId, code, msgid, requestBody, resmsgid })
let errorMessage = error;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import { Datasource } from "../../models/Datasource";
import { DatasetTransformations } from "../../models/Transformation";
import { executeCommand } from "../../connections/commandServiceConnection";
import { druidHttpService } from "../../connections/druidConnection";
import { sequelize } from "../../connections/databaseConnection";

export const apiId = "api.datasets.status-transition";
export const errorCode = "DATASET_STATUS_TRANSITION_FAILURE"
Expand All @@ -41,64 +42,68 @@ const datasetStatusTransition = async (req: Request, res: Response) => {
const requestBody = req.body
const msgid = _.get(req, ["body", "params", "msgid"]);
const resmsgid = _.get(res, "resmsgid");
try {
const { dataset_id, status } = _.get(requestBody, "request");
setReqDatasetId(req, dataset_id)

const isRequestValid: Record<string, any> = schemaValidation(req.body, StatusTransitionSchema)
if (!isRequestValid.isValid) {
const code = "DATASET_STATUS_TRANSITION_INVALID_INPUT"
logger.error({ code, apiId, msgid, requestBody, resmsgid, message: isRequestValid.message })
return ResponseHandler.errorResponse({
code,
message: isRequestValid.message,
statusCode: 400,
errCode: "BAD_REQUEST"
} as ErrorObject, req, res);
}
let transact;
try {
const { dataset_id, status } = _.get(requestBody, "request");
setReqDatasetId(req, dataset_id)

const datasetRecord = await fetchDataset({ status, dataset_id })
if (_.isEmpty(datasetRecord)) {
const code = "DATASET_NOT_FOUND"
const errorMessage = getErrorMessage(status, code)
logger.error({ code, apiId, msgid, requestBody, resmsgid, message: `${errorMessage} for dataset:${dataset_id}` })
return ResponseHandler.errorResponse({
code,
message: errorMessage,
statusCode: 404,
errCode: "NOT_FOUND"
} as ErrorObject, req, res);
}
const isRequestValid: Record<string, any> = schemaValidation(req.body, StatusTransitionSchema)
if (!isRequestValid.isValid) {
const code = "DATASET_STATUS_TRANSITION_INVALID_INPUT"
logger.error({ code, apiId, msgid, requestBody, resmsgid, message: isRequestValid.message })
return ResponseHandler.errorResponse({
code,
message: isRequestValid.message,
statusCode: 400,
errCode: "BAD_REQUEST"
} as ErrorObject, req, res);
}

const allowedStatus = _.get(allowedTransitions, status)
const datasetStatus = _.get(datasetRecord, "status")
if (!_.includes(allowedStatus, datasetStatus)) {
const code = `DATASET_${_.toUpper(status)}_FAILURE`
const errorMessage = getErrorMessage(status, "STATUS_INVALID")
logger.error({ code, apiId, msgid, requestBody, resmsgid, message: `${errorMessage} for dataset:${dataset_id} status:${datasetStatus} with status transition to ${status}` })
return ResponseHandler.errorResponse({
code,
message: errorMessage,
statusCode: 400,
errCode: "BAD_REQUEST"
} as ErrorObject, req, res);
}
const datasetRecord = await fetchDataset({ status, dataset_id })
if (_.isEmpty(datasetRecord)) {
const code = "DATASET_NOT_FOUND"
const errorMessage = getErrorMessage(status, code)
logger.error({ code, apiId, msgid, requestBody, resmsgid, message: `${errorMessage} for dataset:${dataset_id}` })
return ResponseHandler.errorResponse({
code,
message: errorMessage,
statusCode: 404,
errCode: "NOT_FOUND"
} as ErrorObject, req, res);
}

const transitionCommands = _.get(statusTransitionCommands, status)
await executeTransition({ transitionCommands, dataset: datasetRecord })

logger.info({ apiId, msgid, requestBody, resmsgid, message: `Dataset status transition to ${status} successful with id:${dataset_id}` })
ResponseHandler.successResponse(req, res, { status: httpStatus.OK, data: { message: `Dataset status transition to ${status} successful`, dataset_id } });
} catch (error: any) {
const code = _.get(error, "code") || errorCode
logger.error(error, apiId, msgid, code, requestBody, resmsgid)
let errorMessage = error;
const statusCode = _.get(error, "statusCode")
if (!statusCode || statusCode == 500) {
errorMessage = { code, message: "Failed to perform status transition on datasets" }
}
ResponseHandler.errorResponse(errorMessage, req, res);
const allowedStatus = _.get(allowedTransitions, status)
const datasetStatus = _.get(datasetRecord, "status")
if (!_.includes(allowedStatus, datasetStatus)) {
const code = `DATASET_${_.toUpper(status)}_FAILURE`
const errorMessage = getErrorMessage(status, "STATUS_INVALID")
logger.error({ code, apiId, msgid, requestBody, resmsgid, message: `${errorMessage} for dataset:${dataset_id} status:${datasetStatus} with status transition to ${status}` })
return ResponseHandler.errorResponse({
code,
message: errorMessage,
statusCode: 400,
errCode: "BAD_REQUEST"
} as ErrorObject, req, res);
}

const transitionCommands = _.get(statusTransitionCommands, status)
transact = await sequelize.transaction()
await executeTransition({ transitionCommands, dataset: datasetRecord, transact })

await transact.commit();
logger.info({ apiId, msgid, requestBody, resmsgid, message: `Dataset status transition to ${status} successful with id:${dataset_id}` })
ResponseHandler.successResponse(req, res, { status: httpStatus.OK, data: { message: `Dataset status transition to ${status} successful`, dataset_id } });
} catch (error: any) {
transact && await transact.rollback();
const code = _.get(error, "code") || errorCode
logger.error(error, apiId, msgid, code, requestBody, resmsgid)
let errorMessage = error;
const statusCode = _.get(error, "statusCode")
if (!statusCode || statusCode == 500) {
errorMessage = { code, message: "Failed to perform status transition on datasets" }
}
ResponseHandler.errorResponse(errorMessage, req, res);
}
}

const fetchDataset = async (configs: Record<string, any>) => {
Expand All @@ -115,10 +120,10 @@ const fetchDataset = async (configs: Record<string, any>) => {
}

const executeTransition = async (configs: Record<string, any>) => {
const { transitionCommands, dataset } = configs
const { transitionCommands, dataset, transact } = configs
const transitionPromises = _.map(transitionCommands, async command => {
const commandWorkflow = _.get(commandExecutors, command)
return commandWorkflow({ dataset })
return commandWorkflow({ dataset, transact })
})
await Promise.all(transitionPromises)
}
Expand All @@ -140,17 +145,17 @@ const validateDataset = async (configs: Record<string, any>) => {

//DELETE_DRAFT_DATASETS
const deleteDataset = async (configs: Record<string, any>) => {
const { dataset } = configs
const { dataset, transact } = configs
const { id } = dataset
await deleteDraftRecords({ dataset_id: id })
await deleteDraftRecords({ dataset_id: id, transact })
}

const deleteDraftRecords = async (config: Record<string, any>) => {
const { dataset_id } = config;
await DatasetTransformationsDraft.destroy({ where: { dataset_id } })
await DatasetSourceConfigDraft.destroy({ where: { dataset_id } })
await DatasourceDraft.destroy({ where: { dataset_id } })
await DatasetDraft.destroy({ where: { id: dataset_id } })
const { dataset_id, transact } = config;
await DatasetTransformationsDraft.destroy({ where: { dataset_id }, transaction: transact })
await DatasetSourceConfigDraft.destroy({ where: { dataset_id }, transaction: transact })
await DatasourceDraft.destroy({ where: { dataset_id }, transaction: transact })
await DatasetDraft.destroy({ where: { id: dataset_id }, transaction: transact })
}

//PUBLISH_DATASET
Expand Down Expand Up @@ -185,12 +190,12 @@ const checkDatasetDenorm = async (payload: Record<string, any>) => {

//SET_DATASET_TO_RETIRE
const setDatasetRetired = async (config: Record<string, any>) => {
const { dataset } = config;
const { dataset, transact } = config;
const { dataset_id } = dataset
await Dataset.update({ status: DatasetStatus.Retired }, { where: { dataset_id } })
await DatasetSourceConfig.update({ status: DatasetStatus.Retired }, { where: { dataset_id } })
await Datasource.update({ status: DatasetStatus.Retired }, { where: { dataset_id } })
await DatasetTransformations.update({ status: DatasetStatus.Retired }, { where: { dataset_id } })
await Dataset.update({ status: DatasetStatus.Retired }, { where: { dataset_id }, transaction: transact })
await DatasetSourceConfig.update({ status: DatasetStatus.Retired }, { where: { dataset_id }, transaction: transact })
await Datasource.update({ status: DatasetStatus.Retired }, { where: { dataset_id }, transaction: transact })
await DatasetTransformations.update({ status: DatasetStatus.Retired }, { where: { dataset_id }, transaction: transact })
}

//DELETE_SUPERVISORS
Expand Down
28 changes: 12 additions & 16 deletions api-service/src/v2/controllers/DatasetUpdate/DatasetUpdate.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,9 @@ import { DatasetDraft } from "../../models/DatasetDraft";
import logger from "../../logger";
import { defaultDatasetConfig } from "../../configs/DatasetConfigDefault";
import { DatasetTransformationsDraft } from "../../models/TransformationDraft";
import { generateDataSource, getDefaultValue, getDraftTransformations, getDuplicateConfigs, setReqDatasetId } from "../../services/DatasetService";
import { DatasourceDraft } from "../../models/DatasourceDraft";
import { getDefaultValue, getDraftTransformations, getDuplicateConfigs, setReqDatasetId } from "../../services/DatasetService";
import { ingestionConfig } from "../../configs/IngestionConfig";
import { sequelize } from "../../connections/databaseConnection";

export const apiId = "api.datasets.update";
export const invalidInputErrCode = "DATASET_UPDATE_INPUT_INVALID"
Expand All @@ -22,6 +22,7 @@ const datasetUpdate = async (req: Request, res: Response) => {
const requestBody = req.body
const msgid = _.get(req, ["body", "params", "msgid"]);
const resmsgid = _.get(res, "resmsgid");
let transact;
try {
const datasetId = _.get(req, ["body", "request", "dataset_id"])
setReqDatasetId(req, datasetId)
Expand Down Expand Up @@ -101,22 +102,17 @@ const datasetUpdate = async (req: Request, res: Response) => {
const updatedPayload = await getDefaultValue(datasetPayload)
logger.debug({ datasetPayload })
const transformationConfigs = _.get(datasetBody, "transformation_config")
await manageTransformations(transformationConfigs, dataset_id);

const { data_schema } = datasetBody
if (data_schema) {
const { transformation_config } = datasetBody
const { dataset_config, id, dataset_id, denorm_config } = updatedPayload
const datasourcePayload = await generateDataSource({ indexCol: _.get(dataset_config, ["timestamp_key"]), transformation_config, denorm_config, data_schema, id, dataset_id, action: "edit" })
await DatasourceDraft.update(datasourcePayload, { where: { id: _.get(datasourcePayload, "id") }})
}

await DatasetDraft.update(updatedPayload, { where: { id: dataset_id }})
transact = await sequelize.transaction()
await manageTransformations(transformationConfigs, dataset_id, transact);
await DatasetDraft.update(updatedPayload, { where: { id: dataset_id }, transaction: transact })

await transact.commit();
const responsedata = { message: "Dataset is updated successfully", id: dataset_id, version_key: _.get(datasetPayload, "version_key") }
logger.info({ apiId, msgid, requestBody, resmsgid, message: `Dataset updated successfully with id:${dataset_id}`, response: responsedata })
ResponseHandler.successResponse(req, res, { status: httpStatus.OK, data: responsedata });
} catch (error: any) {
transact && await transact.rollback();
const code = _.get(error, "code") || errorCode
logger.error({ ...error, apiId, code, msgid, requestBody, resmsgid })
let errorMessage = error;
Expand All @@ -128,22 +124,22 @@ const datasetUpdate = async (req: Request, res: Response) => {
}
}

const manageTransformations = async (transformations: Record<string, any>, datasetId: string) => {
const manageTransformations = async (transformations: Record<string, any>, datasetId: string, transact: any) => {
if (transformations) {
const transformationConfigs = await getTransformationConfigs(transformations, datasetId);
if (transformationConfigs) {
const { addTransformation, updateTransformation, deleteTransformation } = transformationConfigs;

if (!_.isEmpty(addTransformation)) {
await DatasetTransformationsDraft.bulkCreate(addTransformation);
await DatasetTransformationsDraft.bulkCreate(addTransformation, { transaction: transact });
}

if (!_.isEmpty(updateTransformation)) {
await Promise.all(updateTransformation.map((record: any) => DatasetTransformationsDraft.update(record, { where: { id: record.id }})));
await Promise.all(updateTransformation.map((record: any) => DatasetTransformationsDraft.update(record, { where: { id: record.id }, transaction: transact })));
}

if (!_.isEmpty(deleteTransformation)) {
await DatasetTransformationsDraft.destroy({ where: { id: deleteTransformation }});
await DatasetTransformationsDraft.destroy({ where: { id: deleteTransformation }, transaction: transact });
}
}
}
Expand Down
Loading