Sanketika-Obsrv/issue-tracker#OBS-151: feat: Generate ingestion spec before publish dataset #197

Merged (4 commits) on Jun 26, 2024
Changes from all commits
@@ -130,9 +130,13 @@
         "denorm_out_field": {
           "type": "string",
           "minLength": 1
+        },
+        "dataset_id": {
+          "type": "string",
+          "minLength": 1
         }
       },
-      "required": ["denorm_key", "denorm_out_field"],
+      "required": ["denorm_key", "denorm_out_field", "dataset_id"],
       "additionalProperties": false
     }
   }
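Note: with this change, every denorm_fields entry must name the master dataset it joins against, and unknown keys are rejected. A minimal sketch of a conforming entry, with all values hypothetical:

// Hypothetical denorm_config fragment; dataset_id is now required
// alongside denorm_key and denorm_out_field.
const denormConfig = {
    denorm_fields: [
        {
            denorm_key: "actor.id",            // event field to join on
            denorm_out_field: "user_metadata", // field the joined record lands in
            dataset_id: "user-master"          // master dataset backing the join
        }
    ]
};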
@@ -171,7 +175,29 @@
         "minLength": 1
       },
       "transformation_function": {
-        "type": "object"
+        "type": "object",
+        "properties": {
+          "type": {
+            "type": "string",
+            "minLength": 1
+          },
+          "expr": {
+            "type": "string",
+            "minLength": 1
+          },
+          "condition": {
+            "oneOf": [
+              {
+                "type": "string"
+              },
+              {
+                "const": null
+              }
+            ]
+          }
+        },
+        "additionalProperties": false,
+        "required": ["type", "expr"]
       },
       "mode": {
         "type": "string",
@@ -188,16 +214,15 @@
       "required": [
         "field_key",
         "transformation_function",
-        "mode",
-        "metadata"
+        "mode"
       ]
     }
   },
"tags": {
"type": "array",
"items": {
"type": "string",
"minLength":1
"minLength": 1
}
}
},
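Note: the transformation_function object is now fully specified instead of being a free-form object. A sketch of a transformation_config item the new schema would accept (the type, expr and mode values are illustrative, not taken from this PR):

// Illustrative transformation entry; condition may be a string or null,
// and properties beyond type/expr/condition are rejected.
const transformation = {
    field_key: "full_name",
    transformation_function: {
        type: "jsonata",
        expr: "firstName & ' ' & lastName",
        condition: null
    },
    mode: "Strict" // metadata is no longer part of the required list
};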
@@ -2,7 +2,7 @@ import { Request, Response } from "express";
 import _ from "lodash";
 import logger from "../../logger";
 import { ResponseHandler } from "../../helpers/ResponseHandler";
-import { getDataset, getDraftDataset, setReqDatasetId } from "../../services/DatasetService";
+import { generateDataSource, getDataset, getDraftDataset, setReqDatasetId } from "../../services/DatasetService";
 import { ErrorObject } from "../../types/ResponseModel";
 import { schemaValidation } from "../../services/ValidationService";
 import StatusTransitionSchema from "./RequestValidationSchema.json";
@@ -34,7 +34,7 @@ const allowedTransitions = {
 const statusTransitionCommands = {
     Delete: ["DELETE_DRAFT_DATASETS"],
     ReadyToPublish: ["VALIDATE_DATASET_CONFIGS"],
-    Live: ["PUBLISH_DATASET"],
+    Live: ["GENERATE_INGESTION_SPEC", "PUBLISH_DATASET"],
     Retire: ["CHECK_DATASET_IS_DENORM", "SET_DATASET_TO_RETIRE", "DELETE_SUPERVISORS", "RESTART_PIPELINE"]
 }

@@ -121,11 +121,10 @@ const fetchDataset = async (configs: Record<string, any>) => {

 const executeTransition = async (configs: Record<string, any>) => {
     const { transitionCommands, dataset, transact } = configs
-    const transitionPromises = _.map(transitionCommands, async command => {
-        const commandWorkflow = _.get(commandExecutors, command)
-        return commandWorkflow({ dataset, transact })
-    })
-    await Promise.all(transitionPromises)
+    for (const command of transitionCommands) {
+        const commandWorkflow = _.get(commandExecutors, command);
+        await commandWorkflow({ dataset, transact });
+    }
 }
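Note: swapping Promise.all for a sequential loop makes the command order part of the contract, which the new Live transition relies on: the ingestion spec must be written before PUBLISH_DATASET reads it. A reduced sketch of that ordering guarantee, with the executor bodies stubbed out:

// Stubbed executors; the real ones live in commandExecutors below.
const stubExecutors: Record<string, () => Promise<void>> = {
    GENERATE_INGESTION_SPEC: async () => { /* upserts the draft datasource */ },
    PUBLISH_DATASET: async () => { /* expects the draft datasource to exist */ }
};

const runInOrder = async (commands: string[]) => {
    for (const command of commands) {
        await stubExecutors[command](); // each command finishes before the next starts
    }
};

// GENERATE_INGESTION_SPEC completes before PUBLISH_DATASET begins.
void runInOrder(["GENERATE_INGESTION_SPEC", "PUBLISH_DATASET"]);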

//VALIDATE_DATASET_CONFIGS
@@ -158,6 +157,13 @@ const deleteDraftRecords = async (config: Record<string, any>) => {
     await DatasetDraft.destroy({ where: { id: dataset_id }, transaction: transact })
 }

+//GENERATE_INGESTION_SPEC
+const generateIngestionSpec = async (config: Record<string, any>) => {
+    const { dataset } = config;
+    const dataSource = await generateDataSource(dataset);
+    return DatasourceDraft.upsert(dataSource)
+}
+
 //PUBLISH_DATASET
 const publishDataset = async (configs: Record<string, any>) => {
     const { dataset } = configs
@@ -225,6 +231,7 @@ const restartPipeline = async (config: Record<string, any>) => {
 const commandExecutors = {
     DELETE_DRAFT_DATASETS: deleteDataset,
     PUBLISH_DATASET: publishDataset,
+    GENERATE_INGESTION_SPEC: generateIngestionSpec,
     CHECK_DATASET_IS_DENORM: checkDatasetDenorm,
     SET_DATASET_TO_RETIRE: setDatasetRetired,
     DELETE_SUPERVISORS: deleteSupervisors,
4 changes: 2 additions & 2 deletions api-service/src/v2/controllers/DatasetUpdate/DatasetUpdate.ts
@@ -150,7 +150,7 @@ const checkDatasetExists = async (dataset_id: string, version_key: string): Prom
     if (datasetExists) {
         const validVersionKey = _.get(datasetExists, "version_key")
         const apiVersion = _.get(datasetExists, "api_version")
-        if (validVersionKey !== version_key && apiVersion) {
+        if (validVersionKey !== version_key && apiVersion === "v2") {
             return { isDatasetExists: true, datasetStatus: datasetExists.status, invalidVersionKey: true, validVersionKey }
         }
         return { isDatasetExists: true, datasetStatus: datasetExists.status }
@@ -344,7 +344,7 @@ const setDenormConfigs = (newDenormPayload: Record<string, any>, datasetDenormCo
     if (_.isEmpty(denorm_fields)) {
         return { denorm_fields }
     }

     const getDenormPayload = (action: string) => {
         return _.compact(_.flatten(_.map(denorm_fields, payload => {
             if (payload.action == action) return payload.values
@@ -218,7 +218,29 @@
         "minLength": 1
       },
       "transformation_function": {
-        "type": "object"
+        "type": "object",
+        "properties": {
+          "type": {
+            "type": "string",
+            "minLength": 1
+          },
+          "expr": {
+            "type": "string",
+            "minLength": 1
+          },
+          "condition": {
+            "oneOf": [
+              {
+                "type": "string"
+              },
+              {
+                "const": null
+              }
+            ]
+          }
+        },
+        "additionalProperties": false,
+        "required": ["type", "expr"]
       },
       "mode": {
         "type": "string",
@@ -293,9 +315,14 @@
         "denorm_out_field": {
           "type": "string",
           "minLength": 1
+        },
+        "dataset_id": {
+          "type": "string",
+          "minLength": 1
         }
       },
-      "required": ["denorm_out_field"]
+      "required": ["denorm_out_field"],
+      "additionalProperties": false
     },
     "action": {
       "type": "string",
@@ -314,7 +341,7 @@
     "then": {
       "properties": {
         "values": {
-          "required": ["denorm_key"]
+          "required": ["denorm_key", "dataset_id"]
         }
       }
     }
26 changes: 25 additions & 1 deletion api-service/src/v2/services/DatasetService.ts
@@ -10,6 +10,7 @@ import { getUpdatedSchema } from "./DatasourceService";
 import { DatasetType } from "../types/DatasetModels";
 import { defaultDatasetConfig, defaultMasterConfig } from "../configs/DatasetConfigDefault";
 import { query } from "../connections/databaseConnection";
+import { ErrorObject } from "../types/ResponseModel";

 export const getDataset = async (datasetId: string, raw = false): Promise<any> => {
     const dataset = await Dataset.findOne({
@@ -76,7 +77,8 @@ const getDataSource = (ingestionPayload: Record<string, any>) => {

 const getDatasetDefaults = async (payload: Record<string, any>): Promise<Record<string, any>> => {
     const datasetPayload = mergeDatasetConfigs(defaultDatasetConfig, payload)
-    return datasetPayload
+    const denormPayload = await updateDenormFields(_.get(datasetPayload, "denorm_config"))
+    return { ...datasetPayload, denorm_config: denormPayload }
 }

 const setRedisDBConfig = async (datasetConfig: Record<string, any>): Promise<Record<string, any>> => {
@@ -115,4 +117,26 @@ const mergeDatasetConfigs = (defaultConfig: Record<string, any>, requestPayload:
     const defaults = _.cloneDeep(defaultConfig)
     const datasetConfigs = _.merge(defaults, modifyPayload)
     return datasetConfigs
 }
+
+const updateDenormFields = async (denormConfigs: Record<string, any>) => {
+    const denormFields = _.get(denormConfigs, "denorm_fields")
+    if (_.isEmpty(denormFields)) {
+        return denormConfigs
+    }
+    const masterDatasets = await Dataset.findAll({ where: { type: DatasetType.MasterDataset }, raw: true });
+    const updatedFields = _.map(denormFields, fields => {
+        const master = _.find(masterDatasets, dataset => _.get(dataset, ["dataset_id"]) === fields.dataset_id)
+        if (_.isEmpty(master)) {
+            throw {
+                code: "DATASET_DENORM_NOT_FOUND",
+                message: "Denorm Master dataset not found",
+                statusCode: 404,
+                errCode: "NOT_FOUND"
+            } as ErrorObject
+        }
+        const redis_db = _.get(master, ["dataset_config", "redis_db"])
+        return { ...fields, redis_db }
+    })
+    return { ...denormConfigs, denorm_fields: updatedFields }
+}
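Note: updateDenormFields enriches each denorm field with the redis_db of its master dataset and fails fast when the referenced master does not exist. A hypothetical call, assuming a live master dataset "user-master" whose dataset_config carries redis_db 15:

// Inside an async context:
const enriched = await updateDenormFields({
    denorm_fields: [
        { denorm_key: "actor.id", denorm_out_field: "user", dataset_id: "user-master" }
    ]
});
// enriched.denorm_fields[0].redis_db === 15; an unknown dataset_id instead
// throws { code: "DATASET_DENORM_NOT_FOUND", statusCode: 404 }.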
33 changes: 6 additions & 27 deletions api-service/src/v2/services/DatasourceService.ts
@@ -63,36 +63,15 @@ export const getDatasource = async (datasetId: string) => {
 }

 export const getUpdatedSchema = async (configs: Record<string, any>) => {
-    const { id, transformation_config, denorm_config, data_schema, action, indexCol = ingestionConfig.indexCol["Event Arrival Time"] } = configs
-    const existingTransformations = await DatasetTransformationsDraft.findAll({ where: { dataset_id: id }, raw: true })
-    let resultantTransformations: any[] = []
-    if (action === "edit") {
-        const toDeleteTransformations = _.compact(_.map(transformation_config, config => {
-            if (_.includes(["update", "remove"], _.get(config, "action"))) {
-                return _.get(config, ["value", "field_key"])
-            }
-        }))
-        const updatedExistingTransformations = _.compact(_.map(existingTransformations, configs => {
-            if (!_.includes(toDeleteTransformations, _.get(configs, "field_key"))) {
-                return configs
-            }
-        })) || []
-        const newTransformations = _.compact(_.map(transformation_config, config => {
-            if (_.includes(["update", "add"], _.get(config, "action"))) {
-                return config
-            }
-        })) || []
-        resultantTransformations = [...updatedExistingTransformations, ...newTransformations]
-    }
-    if (action === "create") {
-        resultantTransformations = transformation_config || []
-    }
+    const { id, denorm_config, data_schema, dataset_config } = configs
+    const indexCol = _.get(dataset_config, "timestamp_key")
+    const datasetTransformations = await DatasetTransformationsDraft.findAll({ where: { dataset_id: id }, raw: true })
     let denormFields = _.get(denorm_config, "denorm_fields")
     let updatedColumns = flattenSchema(data_schema)
-    const transformedFields = _.filter(resultantTransformations, field => _.get(field, ["metadata", "section"]) === "transformation")
-    let additionalFields = _.filter(resultantTransformations, field => _.get(field, ["metadata", "section"]) === "additionalFields")
+    const transformedFields = _.filter(datasetTransformations, field => _.get(field, ["metadata", "section"]) === "transformation")
+    let additionalFields = _.filter(datasetTransformations, field => _.get(field, ["metadata", "section"]) === "additionalFields")
     updatedColumns = _.map(updatedColumns, (item) => {
-        const transformedData = _.find(transformedFields, { field_key: item.column });
+        const transformedData: any = _.find(transformedFields, { field_key: item.column });
         if (transformedData) {
             const data = _.get(transformedData, "metadata")
             return {
23 changes: 8 additions & 15 deletions api-service/src/v2/services/IngestionService.ts
@@ -33,7 +33,8 @@ const connectorInstanceSpecObj = {
 }

 export const generateIngestionSpec = (payload: Record<string, any>) => {
-    const { indexCol = defaultIndexCol, data_schema, id, dataset_id } = payload
+    const { data_schema, id, dataset_id, dataset_config } = payload
+    const indexCol = _.get(dataset_config, "timestamp_key") || defaultIndexCol
     const isValidTimestamp = checkTimestampCol({ indexCol, data_schema })
     if (!isValidTimestamp) {
         throw {
@@ -44,7 +45,7 @@ export const generateIngestionSpec = (payload: Record<string, any>) => {
         } as ErrorObject
     }
     const simplifiedSpec = generateExpression(_.get(data_schema, "properties"), indexCol);
-    const generatedSpec = process(simplifiedSpec, indexCol)
+    const generatedSpec = process(simplifiedSpec)
     const ingestionTemplate = generateIngestionTemplate({ generatedSpec, id, indexCol, dataset_id, type: "druid" })
     return ingestionTemplate
 }
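Note: the index column is no longer read from the payload root; it now comes from dataset_config.timestamp_key, falling back to the default arrival-time column. A hypothetical payload fragment showing where the new lookup points:

// Hypothetical dataset payload; the spec will index on "ets" here,
// or on defaultIndexCol when timestamp_key is absent.
const payload = {
    id: "telemetry.1",
    dataset_id: "telemetry",
    dataset_config: { timestamp_key: "ets" },
    data_schema: { type: "object", properties: { ets: { type: "string" } } }
};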
@@ -69,26 +70,18 @@ const checkTimestampCol = (schema: Record<string, any>) => {
     return true
 }

-const process = (spec: Map<string, any>, indexCol: string): IngestionSpecModel => {
+const process = (spec: Map<string, any>): IngestionSpecModel => {
     const colValues = Array.from(spec.values())
     const dimensions = filterDimensionCols(colValues)
     return <IngestionSpecModel>{
         "dimensions": getObjByKey(dimensions, "dimensions"),
         "metrics": filterMetricsCols(spec),
-        "flattenSpec": filterFlattenSpec(colValues, indexCol)
+        "flattenSpec": filterFlattenSpec(colValues)
     }
 }

-const filterFlattenSpec = (column: Record<string, any>, indexCol: string) => {
-    let flattenSpec = getObjByKey(column, "flattenSpec")
-    if (indexCol === defaultIndexCol) {
-        const indexColDefaultSpec = {
-            "expr": ingestionConfig.syncts_path,
-            "name": ingestionConfig.indexCol["Event Arrival Time"],
-            "type": "path"
-        }
-        flattenSpec = _.concat(flattenSpec, indexColDefaultSpec)
-    }
+const filterFlattenSpec = (column: Record<string, any>) => {
+    const flattenSpec = getObjByKey(column, "flattenSpec")
     return flattenSpec
 }

@@ -218,7 +211,7 @@ export const getDruidIngestionTemplate = (payload: Record<string, any>) => {
         "dimensionsSpec": { "dimensions": dimensions },
         "timestampSpec": { "column": indexCol, "format": "auto" },
         "metricsSpec": metrics,
-        "granularitySpec": getGranularityObj(),
+        "granularitySpec": getGranularityObj()
     },
     "tuningConfig": {
         "type": "kafka",
@@ -37,13 +37,19 @@ describe("DATASET CREATE API", () => {
         return Promise.resolve({ dataValues: { id: "telemetry" } })
     })
     chai.spy.on(Dataset, "findOne", () => {
-        return Promise.resolve({ "data_schema": {"$schema": "https://json-schema.org/draft/2020-12/schema","type": "object",
-            "properties": {
-                "eid": {"type": "string"},
-                "ets": {"type": "string"}
-            },
-            "additionalProperties": true
-        },})
+        return Promise.resolve({
+            "data_schema": {
+                "$schema": "https://json-schema.org/draft/2020-12/schema", "type": "object",
+                "properties": {
+                    "eid": { "type": "string" },
+                    "ets": { "type": "string" }
+                },
+                "additionalProperties": true
+            },
+        })
     })
+    chai.spy.on(Dataset, "findAll", () => {
+        return Promise.resolve([{ "dataset_id": "master-telemetry", "dataset_config": { "redis_db": 15 } }])
+    })
     chai.spy.on(DatasetTransformationsDraft, "findAll", () => {
         return Promise.resolve()
@@ -156,4 +162,28 @@
         });
     });

+    it("Failure: Master dataset not found as denorm", (done) => {
+        chai.spy.on(DatasetDraft, "findOne", () => {
+            return Promise.resolve(null)
+        })
+        chai.spy.on(Dataset, "findAll", () => {
+            return Promise.resolve([{ "dataset_id": "trip-data-master", "dataset_config": { "redis_db": 15 } }])
+        })
+
+        chai
+            .request(app)
+            .post("/v2/datasets/create")
+            .send(TestInputsForDatasetCreate.VALID_DATASET)
+            .end((err, res) => {
+                res.should.have.status(httpStatus.NOT_FOUND);
+                res.body.should.be.a("object")
+                res.body.id.should.be.eq(apiId);
+                res.body.params.status.should.be.eq("FAILED")
+                res.body.params.msgid.should.be.eq(msgid)
+                res.body.error.message.should.be.eq("Denorm Master dataset not found")
+                res.body.error.code.should.be.eq("DATASET_DENORM_NOT_FOUND")
+                done();
+            });
+    });
+
 })