Merge pull request #197 from Sanketika-Obsrv/#234api-integration
Sanketika-Obsrv/issue-tracker#OBS-151: feat: Generate ingestion spec before publish dataset
HarishGangula authored Jun 26, 2024
2 parents 163cd33 + b30bcc9 commit 5adbe2c
Showing 17 changed files with 517 additions and 283 deletions.
@@ -130,9 +130,13 @@
"denorm_out_field": {
"type": "string",
"minLength": 1
},
"dataset_id": {
"type": "string",
"minLength": 1
}
},
"required": ["denorm_key", "denorm_out_field"],
"required": ["denorm_key", "denorm_out_field", "dataset_id"],
"additionalProperties": false
}
}
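With this change, every denorm field in a create request must also name the master dataset it joins against. A minimal entry satisfying the updated schema might look like the following sketch (all values are illustrative, not taken from this diff):

    // Hypothetical denorm field; "dataset_id" is now mandatory
    const denormField = {
        denorm_key: "actor.id",            // event field to join on
        denorm_out_field: "user_metadata", // output field for the denormalized record
        dataset_id: "user-master"          // master dataset supplying the data
    };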
@@ -171,7 +175,29 @@
"minLength": 1
},
"transformation_function": {
"type": "object"
"type": "object",
"properties": {
"type": {
"type": "string",
"minLength": 1
},
"expr": {
"type": "string",
"minLength": 1
},
"condition": {
"oneOf": [
{
"type": "string"
},
{
"const": null
}
]
}
},
"additionalProperties": false,
"required": ["type", "expr"]
},
"mode": {
"type": "string",
@@ -188,16 +214,15 @@
"required": [
"field_key",
"transformation_function",
"mode",
"metadata"
"mode"
]
}
},
"tags": {
"type": "array",
"items": {
"type": "string",
"minLength":1
"minLength": 1
}
}
},
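transformation_function, previously an unconstrained object, now requires type and expr and allows an optional condition that must be a string or null; metadata has also been dropped from the required list of a transformation entry. A conforming fragment might look like this sketch (the type and expr values are assumptions, not taken from this diff):

    // Hypothetical transformation entry under the tightened schema
    const transformation = {
        field_key: "maskedEmail",
        transformation_function: {
            type: "mask",       // assumed transformation type
            expr: "$.email",    // assumed JSONPath-style expression
            condition: null     // optional; must be a string or null
        },
        mode: "Strict"          // metadata is no longer required alongside mode
    };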
@@ -2,7 +2,7 @@ import { Request, Response } from "express";
import _ from "lodash";
import logger from "../../logger";
import { ResponseHandler } from "../../helpers/ResponseHandler";
import { getDataset, getDraftDataset, setReqDatasetId } from "../../services/DatasetService";
import { generateDataSource, getDataset, getDraftDataset, setReqDatasetId } from "../../services/DatasetService";
import { ErrorObject } from "../../types/ResponseModel";
import { schemaValidation } from "../../services/ValidationService";
import StatusTransitionSchema from "./RequestValidationSchema.json";
@@ -34,7 +34,7 @@ const allowedTransitions = {
const statusTransitionCommands = {
Delete: ["DELETE_DRAFT_DATASETS"],
ReadyToPublish: ["VALIDATE_DATASET_CONFIGS"],
Live: ["PUBLISH_DATASET"],
Live: ["GENERATE_INGESTION_SPEC", "PUBLISH_DATASET"],
Retire: ["CHECK_DATASET_IS_DENORM", "SET_DATASET_TO_RETIRE", "DELETE_SUPERVISORS", "RESTART_PIPELINE"]
}
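The Live transition now runs two commands, and their order matters: the ingestion spec must exist before the dataset is published. The sequential executor below is what guarantees that ordering.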

@@ -121,11 +121,10 @@ const fetchDataset = async (configs: Record<string, any>) => {

const executeTransition = async (configs: Record<string, any>) => {
const { transitionCommands, dataset, transact } = configs
const transitionPromises = _.map(transitionCommands, async command => {
const commandWorkflow = _.get(commandExecutors, command)
return commandWorkflow({ dataset, transact })
})
await Promise.all(transitionPromises)
for (const command of transitionCommands) {
const commandWorkflow = _.get(commandExecutors, command);
await commandWorkflow({ dataset, transact });
}
}

//VALIDATE_DATASET_CONFIGS
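Replacing Promise.all with a for...of loop trades concurrency for determinism: each command completes before the next begins, which is what lets GENERATE_INGESTION_SPEC reliably precede PUBLISH_DATASET. The command lists are short, so the lost parallelism should be negligible.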
@@ -158,6 +157,13 @@ const deleteDraftRecords = async (config: Record<string, any>) => {
await DatasetDraft.destroy({ where: { id: dataset_id }, transaction: transact })
}

//GENERATE_INGESTION_SPEC
const generateIngestionSpec = async (config: Record<string, any>) => {
const { dataset } = config;
const dataSource = await generateDataSource(dataset);
return DatasourceDraft.upsert(dataSource)
}

//PUBLISH_DATASET
const publishDataset = async (configs: Record<string, any>) => {
const { dataset } = configs
@@ -225,6 +231,7 @@ const restartPipeline = async (config: Record<string, any>) => {
const commandExecutors = {
DELETE_DRAFT_DATASETS: deleteDataset,
PUBLISH_DATASET: publishDataset,
GENERATE_INGESTION_SPEC: generateIngestionSpec,
CHECK_DATASET_IS_DENORM: checkDatasetDenorm,
SET_DATASET_TO_RETIRE: setDatasetRetired,
DELETE_SUPERVISORS: deleteSupervisors,
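The new GENERATE_INGESTION_SPEC executor delegates to generateDataSource and upserts the result, so re-running the Live transition should be idempotent: the draft datasource row is overwritten rather than duplicated.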
4 changes: 2 additions & 2 deletions api-service/src/v2/controllers/DatasetUpdate/DatasetUpdate.ts
@@ -150,7 +150,7 @@ const checkDatasetExists = async (dataset_id: string, version_key: string): Prom
if (datasetExists) {
const validVersionKey = _.get(datasetExists, "version_key")
const apiVersion = _.get(datasetExists, "api_version")
if (validVersionKey !== version_key && apiVersion) {
if (validVersionKey !== version_key && apiVersion === "v2") {
return { isDatasetExists: true, datasetStatus: datasetExists.status, invalidVersionKey: true, validVersionKey }
}
return { isDatasetExists: true, datasetStatus: datasetExists.status }
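Scoping the version-key guard with === "v2" narrows it: the old truthiness check would flag a stale version_key for any dataset with a non-empty api_version, while the strict comparison leaves non-v2 datasets unaffected.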
@@ -344,7 +344,7 @@ const setDenormConfigs = (newDenormPayload: Record<string, any>, datasetDenormCo
if (_.isEmpty(denorm_fields)) {
return { denorm_fields }
}

const getDenormPayload = (action: string) => {
return _.compact(_.flatten(_.map(denorm_fields, payload => {
if (payload.action == action) return payload.values
@@ -218,7 +218,29 @@
"minLength": 1
},
"transformation_function": {
"type": "object"
"type": "object",
"properties": {
"type": {
"type": "string",
"minLength": 1
},
"expr": {
"type": "string",
"minLength": 1
},
"condition": {
"oneOf": [
{
"type": "string"
},
{
"const": null
}
]
}
},
"additionalProperties": false,
"required": ["type", "expr"]
},
"mode": {
"type": "string",
@@ -293,9 +315,14 @@
"denorm_out_field": {
"type": "string",
"minLength": 1
},
"dataset_id": {
"type": "string",
"minLength": 1
}
},
"required": ["denorm_out_field"]
"required": ["denorm_out_field"],
"additionalProperties": false
},
"action": {
"type": "string",
@@ -314,7 +341,7 @@
"then": {
"properties": {
"values": {
"required": ["denorm_key"]
"required": ["denorm_key", "dataset_id"]
}
}
}
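For updates, removing a denorm field still requires only denorm_out_field, but the conditional branch above (its if clause falls outside this hunk, presumably matching additions) now demands dataset_id alongside denorm_key. A sketch of a fragment that would pass the stricter rule (the action value is an assumption; only the values shape is shown in this diff):

    // Hypothetical denorm update fragment
    const denormUpdate = {
        denorm_fields: [{
            action: "upsert",    // assumed action name
            values: {
                denorm_key: "actor.id",
                denorm_out_field: "user_metadata",
                dataset_id: "user-master"
            }
        }]
    };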
26 changes: 25 additions & 1 deletion api-service/src/v2/services/DatasetService.ts
@@ -10,6 +10,7 @@ import { getUpdatedSchema } from "./DatasourceService";
import { DatasetType } from "../types/DatasetModels";
import { defaultDatasetConfig, defaultMasterConfig } from "../configs/DatasetConfigDefault";
import { query } from "../connections/databaseConnection";
import { ErrorObject } from "../types/ResponseModel";

export const getDataset = async (datasetId: string, raw = false): Promise<any> => {
const dataset = await Dataset.findOne({
@@ -76,7 +77,8 @@ const getDataSource = (ingestionPayload: Record<string, any>) => {

const getDatasetDefaults = async (payload: Record<string, any>): Promise<Record<string, any>> => {
const datasetPayload = mergeDatasetConfigs(defaultDatasetConfig, payload)
return datasetPayload
const denormPayload = await updateDenormFields(_.get(datasetPayload, "denorm_config"))
return { ...datasetPayload, denorm_config: denormPayload }
}

const setRedisDBConfig = async (datasetConfig: Record<string, any>): Promise<Record<string, any>> => {
@@ -115,4 +117,26 @@ const mergeDatasetConfigs = (defaultConfig: Record<string, any>, requestPayload:
const defaults = _.cloneDeep(defaultConfig)
const datasetConfigs = _.merge(defaults, modifyPayload)
return datasetConfigs
}

const updateDenormFields = async (denormConfigs: Record<string, any>) => {
const denormFields = _.get(denormConfigs, "denorm_fields")
if (_.isEmpty(denormFields)) {
return denormConfigs
}
const masterDatasets = await Dataset.findAll({ where: { type: DatasetType.MasterDataset }, raw: true });
const updatedFields = _.map(denormFields, fields => {
const master = _.find(masterDatasets, dataset => _.get(dataset, ["dataset_id"]) === fields.dataset_id)
if (_.isEmpty(master)) {
throw {
code: "DATASET_DENORM_NOT_FOUND",
message: "Denorm Master dataset not found",
statusCode: 404,
errCode: "NOT_FOUND"
} as ErrorObject
}
const redis_db = _.get(master, ["dataset_config", "redis_db"])
return { ...fields, redis_db }
})
return { ...denormConfigs, denorm_fields: updatedFields }
}
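updateDenormFields resolves every referenced master dataset at create time, failing fast with a 404 rather than deferring the error to ingestion, and stamps the master's redis_db onto each field. A sketch of the expected shapes (values are illustrative):

    // Hypothetical input from the create payload
    const denormConfig = {
        denorm_fields: [
            { denorm_key: "actor.id", denorm_out_field: "user_metadata", dataset_id: "user-master" }
        ]
    };
    // If "user-master" exists with dataset_config.redis_db = 15,
    // the stored config becomes:
    // { denorm_fields: [{ ...field, redis_db: 15 }] }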
33 changes: 6 additions & 27 deletions api-service/src/v2/services/DatasourceService.ts
@@ -63,36 +63,15 @@ export const getDatasource = async (datasetId: string) => {
}

export const getUpdatedSchema = async (configs: Record<string, any>) => {
const { id, transformation_config, denorm_config, data_schema, action, indexCol = ingestionConfig.indexCol["Event Arrival Time"] } = configs
const existingTransformations = await DatasetTransformationsDraft.findAll({ where: { dataset_id: id }, raw: true })
let resultantTransformations: any[] = []
if (action === "edit") {
const toDeleteTransformations = _.compact(_.map(transformation_config, config => {
if (_.includes(["update", "remove"], _.get(config, "action"))) {
return _.get(config, ["value", "field_key"])
}
}))
const updatedExistingTransformations = _.compact(_.map(existingTransformations, configs => {
if (!_.includes(toDeleteTransformations, _.get(configs, "field_key"))) {
return configs
}
})) || []
const newTransformations = _.compact(_.map(transformation_config, config => {
if (_.includes(["update", "add"], _.get(config, "action"))) {
return config
}
})) || []
resultantTransformations = [...updatedExistingTransformations, ...newTransformations]
}
if (action === "create") {
resultantTransformations = transformation_config || []
}
const { id, denorm_config, data_schema, dataset_config } = configs
const indexCol = _.get(dataset_config, "timestamp_key")
const datasetTransformations = await DatasetTransformationsDraft.findAll({ where: { dataset_id: id }, raw: true })
let denormFields = _.get(denorm_config, "denorm_fields")
let updatedColumns = flattenSchema(data_schema)
const transformedFields = _.filter(resultantTransformations, field => _.get(field, ["metadata", "section"]) === "transformation")
let additionalFields = _.filter(resultantTransformations, field => _.get(field, ["metadata", "section"]) === "additionalFields")
const transformedFields = _.filter(datasetTransformations, field => _.get(field, ["metadata", "section"]) === "transformation")
let additionalFields = _.filter(datasetTransformations, field => _.get(field, ["metadata", "section"]) === "additionalFields")
updatedColumns = _.map(updatedColumns, (item) => {
const transformedData = _.find(transformedFields, { field_key: item.column });
const transformedData: any = _.find(transformedFields, { field_key: item.column });
if (transformedData) {
const data = _.get(transformedData, "metadata")
return {
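The rewritten getUpdatedSchema drops the create/edit branching entirely: instead of reconciling an incoming transformation_config against existing drafts, it reads the current DatasetTransformationsDraft rows directly and takes the index column from dataset_config.timestamp_key, presumably safe because by the time the spec is generated at the Live transition the draft rows already reflect every edit.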
23 changes: 8 additions & 15 deletions api-service/src/v2/services/IngestionService.ts
@@ -33,7 +33,8 @@ const connectorInstanceSpecObj = {
}

export const generateIngestionSpec = (payload: Record<string, any>) => {
const { indexCol = defaultIndexCol, data_schema, id, dataset_id } = payload
const { data_schema, id, dataset_id, dataset_config } = payload
const indexCol = _.get(dataset_config, "timestamp_key") || defaultIndexCol
const isValidTimestamp = checkTimestampCol({ indexCol, data_schema })
if (!isValidTimestamp) {
throw {
@@ -44,7 +45,7 @@
} as ErrorObject
}
const simplifiedSpec = generateExpression(_.get(data_schema, "properties"), indexCol);
const generatedSpec = process(simplifiedSpec, indexCol)
const generatedSpec = process(simplifiedSpec)
const ingestionTemplate = generateIngestionTemplate({ generatedSpec, id, indexCol, dataset_id, type: "druid" })
return ingestionTemplate
}
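The index column now comes from the dataset's own configuration rather than the request payload, falling back to the default arrival-time column. A dataset indexing on its own event timestamp would carry something like this (the field value is illustrative):

    // Hypothetical dataset_config fragment; timestamp_key drives the Druid timestampSpec column
    const dataset_config = { timestamp_key: "ets" };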
@@ -69,26 +70,18 @@ const checkTimestampCol = (schema: Record<string, any>) => {
return true
}

const process = (spec: Map<string, any>, indexCol: string): IngestionSpecModel => {
const process = (spec: Map<string, any>): IngestionSpecModel => {
const colValues = Array.from(spec.values())
const dimensions = filterDimensionCols(colValues)
return <IngestionSpecModel>{
"dimensions": getObjByKey(dimensions, "dimensions"),
"metrics": filterMetricsCols(spec),
"flattenSpec": filterFlattenSpec(colValues, indexCol)
"flattenSpec": filterFlattenSpec(colValues)
}
}

const filterFlattenSpec = (column: Record<string, any>, indexCol: string) => {
let flattenSpec = getObjByKey(column, "flattenSpec")
if (indexCol === defaultIndexCol) {
const indexColDefaultSpec = {
"expr": ingestionConfig.syncts_path,
"name": ingestionConfig.indexCol["Event Arrival Time"],
"type": "path"
}
flattenSpec = _.concat(flattenSpec, indexColDefaultSpec)
}
const filterFlattenSpec = (column: Record<string, any>) => {
const flattenSpec = getObjByKey(column, "flattenSpec")
return flattenSpec
}
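filterFlattenSpec is correspondingly simplified: the special-case flatten entry for the default arrival-time column is no longer appended here, and the indexCol parameter goes away with it. Where that path is produced now is not visible in this diff.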

@@ -218,7 +211,7 @@ export const getDruidIngestionTemplate = (payload: Record<string, any>) => {
"dimensionsSpec": { "dimensions": dimensions },
"timestampSpec": { "column": indexCol, "format": "auto" },
"metricsSpec": metrics,
"granularitySpec": getGranularityObj(),
"granularitySpec": getGranularityObj()
},
"tuningConfig": {
"type": "kafka",
@@ -37,13 +37,19 @@ describe("DATASET CREATE API", () => {
return Promise.resolve({ dataValues: { id: "telemetry" } })
})
chai.spy.on(Dataset, "findOne", () => {
return Promise.resolve({ "data_schema": {"$schema": "https://json-schema.org/draft/2020-12/schema","type": "object",
"properties": {
"eid": {"type": "string"},
"ets": {"type": "string"}
return Promise.resolve({
"data_schema": {
"$schema": "https://json-schema.org/draft/2020-12/schema", "type": "object",
"properties": {
"eid": { "type": "string" },
"ets": { "type": "string" }
},
"additionalProperties": true
},
"additionalProperties": true
},})
})
})
chai.spy.on(Dataset, "findAll", () => {
return Promise.resolve([{ "dataset_id": "master-telemetry", "dataset_config": { "redis_db": 15 } }])
})
chai.spy.on(DatasetTransformationsDraft, "findAll", () => {
return Promise.resolve()
@@ -156,4 +162,28 @@
});
});

it("Failure: Master dataset not found as denorm", (done) => {
chai.spy.on(DatasetDraft, "findOne", () => {
return Promise.resolve(null)
})
chai.spy.on(Dataset, "findAll", () => {
return Promise.resolve([{ "dataset_id": "trip-data-master", "dataset_config": { "redis_db": 15 } }])
})

chai
.request(app)
.post("/v2/datasets/create")
.send(TestInputsForDatasetCreate.VALID_DATASET)
.end((err, res) => {
res.should.have.status(httpStatus.NOT_FOUND);
res.body.should.be.a("object")
res.body.id.should.be.eq(apiId);
res.body.params.status.should.be.eq("FAILED")
res.body.params.msgid.should.be.eq(msgid)
res.body.error.message.should.be.eq("Denorm Master dataset not found")
res.body.error.code.should.be.eq("DATASET_DENORM_NOT_FOUND")
done();
});
});

})
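The added negative test stubs Dataset.findAll to return only a trip-data-master record, so the denorm reference in VALID_DATASET (which presumably points at a different master) cannot be resolved, and the API is expected to answer 404 with code DATASET_DENORM_NOT_FOUND.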