Skip to content

Commit

Permalink
Sanketika-Obsrv/issue-tracker#154: switch for generating different in…
Browse files Browse the repository at this point in the history
…gestion template
  • Loading branch information
JeraldJF committed May 20, 2024
1 parent 427e54a commit 5f2face
Show file tree
Hide file tree
Showing 4 changed files with 36 additions and 26 deletions.
2 changes: 1 addition & 1 deletion api-service/src/configs/IngestionConfig.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ export const ingestionConfig = {
"indexCol": { "Event Arrival Time": "obsrv_meta.syncts" },
"granularitySpec": {
"rollup": false,
"segmentGranularity": env.segment_granularity || "day"
"segmentGranularity": env.segment_granularity || "DAY"
},
"ioconfig": { "topic": "", "taskDuration": "PT1H" },
"tuningConfig": { "maxRowPerSegment": 5000000, "taskCount": 1 },
Expand Down
24 changes: 1 addition & 23 deletions api-service/src/controllers/DatasetCreate/DatasetCreate.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import { Request, Response } from "express";
import logger from "../../logger";
import { getDraftDataset, getDuplicateConfigs, getDuplicateDenormKey, setReqDatasetId } from "../../services/DatasetService";
import { generateDataSource, getDraftDataset, getDuplicateConfigs, getDuplicateDenormKey, setReqDatasetId } from "../../services/DatasetService";
import _ from "lodash";
import DatasetCreate from "./DatasetCreateValidationSchema.json";
import { schemaValidation } from "../../services/ValidationService";
Expand All @@ -11,9 +11,7 @@ import { defaultDatasetConfig, defaultMasterConfig } from "../../configs/Dataset
import { DatasetType } from "../../types/DatasetModels";
import { query, sequelize } from "../../connections/databaseConnection";
import { ErrorObject } from "../../types/ResponseModel";
import { generateIngestionSpec } from "../../services/IngestionService";
import { DatasourceDraft } from "../../models/DatasourceDraft";
import { ingestionConfig } from "../../configs/IngestionConfig";
import { DatasetTransformationsDraft } from "../../models/TransformationDraft";

export const apiId = "api.datasets.create"
Expand Down Expand Up @@ -174,24 +172,4 @@ const getTransformationConfig = (configs: Record<string, any>): Record<string, a
return []
}

const generateDataSource = (payload: Record<string, any>) => {
const { id } = payload
const ingestionSpec = generateIngestionSpec(payload)
const dataSource = getDataSource({ ingestionSpec, id })
return dataSource
}

// Assembles the datasource record stored alongside a draft dataset.
// The datasource name is "<dataset id>_<segment granularity>" (granularity
// taken from the shared ingestion config).
// NOTE(review): the record id becomes "<id>_<id>_<granularity>" — the dataset
// id appears twice; confirm this doubling is intentional.
const getDataSource = (ingestionPayload: Record<string, any>) => {
const { ingestionSpec, id } = ingestionPayload
const datasourceName = `${id}_${ingestionConfig.granularitySpec.segmentGranularity}`
return {
id: `${id}_${datasourceName}`,
datasource: datasourceName,
dataset_id: id,
ingestion_spec: ingestionSpec,
datasource_ref: datasourceName
}
}

export default datasetCreate;
22 changes: 22 additions & 0 deletions api-service/src/services/DatasetService.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ import _ from "lodash";
import { DatasetDraft } from "../models/DatasetDraft";
import { DatasetTransformationsDraft } from "../models/TransformationDraft";
import { Request } from "express";
import { generateIngestionSpec } from "./IngestionService";
import { ingestionConfig } from "../configs/IngestionConfig";

export const getDataset = async (datasetId: string): Promise<any> => {
const dataset = await Dataset.findOne({
Expand Down Expand Up @@ -39,4 +41,24 @@ export const setReqDatasetId = (req: Request, dataset_id: string) => {

export const getDuplicateConfigs = (configs: Array<string | any>) => {
return _.filter(configs, (item: string, index: number) => _.indexOf(configs, item) !== index);
}

export const generateDataSource = (payload: Record<string, any>) => {
const { id } = payload
const ingestionSpec = generateIngestionSpec(payload)
const dataSource = getDataSource({ ingestionSpec, id })
return dataSource
}

// Builds the datasource record for a draft dataset. The datasource name is
// "<dataset id>_<segment granularity>", with the configured granularity
// lower-cased (config value is "DAY", stored names use "day").
// NOTE(review): the record id becomes "<id>_<id>_<granularity>" — the dataset
// id appears twice; confirm this doubling is intentional.
const getDataSource = (ingestionPayload: Record<string, any>) => {
const { ingestionSpec, id } = ingestionPayload
const granularity = _.toLower(ingestionConfig.granularitySpec.segmentGranularity)
const datasourceName = `${id}_${granularity}`
return {
id: `${id}_${datasourceName}`,
datasource: datasourceName,
dataset_id: id,
ingestion_spec: ingestionSpec,
datasource_ref: datasourceName
}
}
14 changes: 12 additions & 2 deletions api-service/src/services/IngestionService.ts
Original file line number Diff line number Diff line change
Expand Up @@ -44,10 +44,20 @@ export const generateIngestionSpec = (payload: Record<string, any>) => {
}
const simplifiedSpec = generateExpression(_.get(data_schema, "properties"), indexCol);
const generatedSpec = process(simplifiedSpec, indexCol)
const ingestionTemplate = getIngestionTemplate({ generatedSpec, id, indexCol, dataset_id })
const ingestionTemplate = generateIngestionTemplate({ generatedSpec, id, indexCol, dataset_id, type: "druid" })
return ingestionTemplate
}

// Dispatches ingestion-template generation by target store type.
// Currently only "druid" is supported; any other type yields null so the
// caller can decide how to handle an unknown target.
const generateIngestionTemplate = (payload: Record<string, any>) => {
const { type, ...templatePayload } = payload
if (type === 'druid') {
return getDruidIngestionTemplate(templatePayload)
}
return null
}

const checkTimestampCol = (schema: Record<string, any>) => {
const { indexCol, data_schema } = schema
if (indexCol !== defaultIndexCol) {
Expand Down Expand Up @@ -194,7 +204,7 @@ const getObjectType = (type: string): string => {
}
}

export const getIngestionTemplate = (payload: Record<string, any>) => {
export const getDruidIngestionTemplate = (payload: Record<string, any>) => {
const { id, generatedSpec, indexCol, dataset_id } = payload
const { dimensions, metrics, flattenSpec } = generatedSpec
const dataSource = `${id}_${ingestionConfig.granularitySpec.segmentGranularity}`
Expand Down

0 comments on commit 5f2face

Please sign in to comment.