diff --git a/api-service/src/configs/Config.ts b/api-service/src/configs/Config.ts index 81c6dc44..1aba2066 100644 --- a/api-service/src/configs/Config.ts +++ b/api-service/src/configs/Config.ts @@ -58,6 +58,10 @@ export const config = { normalDataset: "dataset", masterDataset: "master-dataset" }, + "datasource_storage_types": { + druid: "druid", + datalake: "datalake" + }, "redis_config": { "redis_host": process.env.redis_host || 'localhost', "redis_port": process.env.redis_port || 6379 diff --git a/api-service/src/connectors/DbConnector.ts b/api-service/src/connectors/DbConnector.ts index fde799d3..3a73ec75 100644 --- a/api-service/src/connectors/DbConnector.ts +++ b/api-service/src/connectors/DbConnector.ts @@ -57,7 +57,7 @@ export class DbConnector implements IConnector { public async insertRecord(table: string, fields: any) { await this.pool.transaction(async (dbTransaction) => { - await this.submit_ingestion(_.get(fields, 'ingestion_spec'), table) + await this.submit_ingestion(_.get(fields, 'ingestion_spec'), table, _.get(fields, "type")) await dbTransaction(table).insert(fields).on('query-error', (error: any) => { this.log_error(OP_TYPES.INSERT, error, table, fields); throw {...constants.FAILED_RECORD_CREATE, "errCode": error.code} @@ -89,7 +89,7 @@ export class DbConnector implements IConnector { const existingRecord = await this.pool(table).select().where(filters).first() if (!_.isUndefined(existingRecord)) { await this.pool.transaction(async (dbTransaction) => { - await this.submit_ingestion(_.get(values, 'ingestion_spec'), table) + await this.submit_ingestion(_.get(values, 'ingestion_spec'), table, _.get(values, "type")) await dbTransaction(table).where(filters).update(schemaMerger.mergeSchema(existingRecord, values)).on('query-error', (error: any) => { this.log_error(OP_TYPES.UPSERT, error, table, values); throw {...constants.FAILED_RECORD_UPDATE, "errCode": error.code} @@ -100,7 +100,7 @@ export class DbConnector implements IConnector { }) } else { await this.pool.transaction(async (dbTransaction) => { - await this.submit_ingestion(_.get(values, 'ingestion_spec'), table) + await this.submit_ingestion(_.get(values, 'ingestion_spec'), table, _.get(values, "type")) await dbTransaction(table).insert(values).on('query-error', (error: any) => { this.log_error(OP_TYPES.UPSERT, error, table, values); throw {...constants.FAILED_RECORD_CREATE, "errCode": error.code} @@ -144,8 +144,8 @@ export class DbConnector implements IConnector { }) } - private async submit_ingestion(ingestion_spec: Record, table: string) { - if (appConfig.table_names.datasources === table) { + private async submit_ingestion(ingestion_spec: Record, table: string, storage_type: string) { + if (appConfig.table_names.datasources === table && storage_type === appConfig.datasource_storage_types.druid) { return await wrapperService.submitIngestion(ingestion_spec) .catch((error: any) => { console.error(constants.INGESTION_FAILED_ON_SAVE) diff --git a/api-service/src/helpers/Datasources.ts b/api-service/src/helpers/Datasources.ts index 1a28b7bf..2a3d1051 100644 --- a/api-service/src/helpers/Datasources.ts +++ b/api-service/src/helpers/Datasources.ts @@ -8,6 +8,7 @@ export class Datasources { private id: string private dataset_id: string private ingestion_spec: object + private type: string private datasource: string private datasource_ref: string private retention_period: object @@ -30,6 +31,7 @@ export class Datasources { } this.dataset_id = payload.dataset_id this.ingestion_spec = payload.ingestion_spec + this.type = payload.type this.datasource = payload.datasource this.datasource_ref = payload.datasource_ref this.retention_period = payload.retentionPeriod @@ -44,7 +46,7 @@ export class Datasources { this.metadata = payload.metadata } public getValues() { - return Object.assign(this.removeNullValues({ id: this.id, dataset_id: this.dataset_id, ingestion_spec: this.ingestion_spec, datasource: this.datasource, datasource_ref: this.datasource_ref, retention_period: this.retention_period, archival_policy: this.archival_policy, purge_policy: this.purge_policy, backup_config: this.backup_config, status: this.status, version: this.version, created_by: this.created_by, updated_by: this.updated_by, published_date: this.published_date, metadata: this.metadata }), { "updated_date": new Date }) + return Object.assign(this.removeNullValues({ id: this.id, dataset_id: this.dataset_id, ingestion_spec: this.ingestion_spec, type: this.type, datasource: this.datasource, datasource_ref: this.datasource_ref, retention_period: this.retention_period, archival_policy: this.archival_policy, purge_policy: this.purge_policy, backup_config: this.backup_config, status: this.status, version: this.version, created_by: this.created_by, updated_by: this.updated_by, published_date: this.published_date, metadata: this.metadata }), { "updated_date": new Date }) } public setValues() { diff --git a/api-service/src/resources/schemas/DatasourceConfigDefault.json b/api-service/src/resources/schemas/DatasourceConfigDefault.json index 96babc63..48645bfd 100644 --- a/api-service/src/resources/schemas/DatasourceConfigDefault.json +++ b/api-service/src/resources/schemas/DatasourceConfigDefault.json @@ -17,5 +17,6 @@ "metadata": { "aggregated": false, "granularity": "day" - } + }, + "type": "druid" } diff --git a/api-service/src/resources/schemas/DatasourceSaveReq.json b/api-service/src/resources/schemas/DatasourceSaveReq.json index 52dec1b0..6e3c4b5c 100644 --- a/api-service/src/resources/schemas/DatasourceSaveReq.json +++ b/api-service/src/resources/schemas/DatasourceSaveReq.json @@ -48,6 +48,10 @@ "type": "string" } } + }, + "type": { + "type": "string", + "enum": ["druid", "datalake"] } }, "required": [ diff --git a/api-service/src/resources/schemas/DatasourceUpdateReq.json b/api-service/src/resources/schemas/DatasourceUpdateReq.json index 85e0ca33..4faa3c1a 100644 --- a/api-service/src/resources/schemas/DatasourceUpdateReq.json +++ b/api-service/src/resources/schemas/DatasourceUpdateReq.json @@ -48,6 +48,10 @@ "type": "string" } } + }, + "type": { + "type": "string", + "enum": ["druid", "datalake"] } }, "required": [ diff --git a/api-service/src/services/DataSourceService.ts b/api-service/src/services/DataSourceService.ts index 4230c1f3..e3cb8499 100644 --- a/api-service/src/services/DataSourceService.ts +++ b/api-service/src/services/DataSourceService.ts @@ -7,6 +7,8 @@ import constants from "../resources/Constants.json"; import { ingestorService } from "../routes/Router"; import { ErrorResponseHandler } from "../helpers/ErrorResponseHandler"; import { DatasetStatus, IConnector } from "../models/DatasetModels"; +import { config } from "../configs/Config"; + const telemetryObject = { id: null, type: "datasource", ver: "1.0.0" }; @@ -28,7 +30,7 @@ export class DataSourceService { const datasources = new Datasources(req.body) const payload: any = datasources.setValues() updateTelemetryAuditEvent({ request: req, object: { ...telemetryObject, id: _.get(payload, 'id'), } }); - await this.validateDatasource(payload) + if(payload.type === config.datasource_storage_types.druid) await this.validateDatasource(payload) await this.dbUtil.save(req, res, next, payload) } catch (error: any) { this.errorHandler.handleError(req, res, next, error) } } @@ -36,7 +38,7 @@ export class DataSourceService { try { const datasources = new Datasources(req.body) const payload: Record = datasources.setValues() - await this.validateDatasource(payload) + if(payload.type === config.datasource_storage_types.druid) await this.validateDatasource(payload) await findAndSetExistingRecord({ dbConnector: this.dbConnector, table: this.table, request: req, filters: { "id": _.get(payload, 'id') }, object: { ...telemetryObject, id: _.get(payload, 'id') } }); await this.dbUtil.upsert(req, res, next, payload) } catch (error: any) { this.errorHandler.handleError(req, res, next, error) } diff --git a/api-service/src/validators/QueryValidator.ts b/api-service/src/validators/QueryValidator.ts index bf306e0b..312c11d4 100644 --- a/api-service/src/validators/QueryValidator.ts +++ b/api-service/src/validators/QueryValidator.ts @@ -63,7 +63,8 @@ export class QueryValidator implements IValidator { if (!isFromClausePresent) { return { isValid: false, message: "Invalid SQL Query", code: httpStatus["400_NAME"] }; } - const dataset = query.substring(query.indexOf("FROM")).split(" ")[1].replace(/\\/g, ""); + const fromIndex = query.search(fromClause); + const dataset = query.substring(fromIndex + 4).trim().split(/\s+/)[0].replace(/\\/g, ""); if (_.isEmpty(dataset)) { return { isValid: false, message: "Dataset name must be present in the SQL Query", code: httpStatus["400_NAME"] }; } @@ -85,7 +86,9 @@ export class QueryValidator implements IValidator { fromDate = moment(extractedDateRange[0], this.momentFormat); toDate = moment(extractedDateRange[1], this.momentFormat); } else { - let vocabulary = queryPayload.querySql.query.split(" "); + let query = queryPayload.querySql.query; + query = query.toUpperCase().replace(/\s+/g, " ").trim(); + let vocabulary = query.split(/\s+/); let fromDateIndex = vocabulary.indexOf("TIMESTAMP"); let toDateIndex = vocabulary.lastIndexOf("TIMESTAMP"); fromDate = moment(vocabulary[fromDateIndex + 1], this.momentFormat); @@ -100,8 +103,9 @@ export class QueryValidator implements IValidator { if (queryPayload.querySql) { let query = queryPayload.querySql.query; query = query.replace(/\s+/g, " ").trim(); - let dataSource = query.substring(query.indexOf("FROM")).split(" ")[1].replace(/\\/g, ""); - return dataSource.replace(/"/g, ""); + const fromIndex = query.search(/\bFROM\b/i); + const dataSource = query.substring(fromIndex).split(/\s+/)[1].replace(/\\/g, "").replace(/"/g, ""); + return dataSource; } else { const dataSourceField: any = queryPayload.query.dataSource if (typeof dataSourceField == 'object') { return dataSourceField.name } @@ -141,14 +145,18 @@ export class QueryValidator implements IValidator { queryPayload.query.limit = limits.maxResultRowLimit; } } else { - let vocabulary = queryPayload.querySql.query.split(" "); - let queryLimitIndex = vocabulary.indexOf("LIMIT"); - let queryLimit = Number(vocabulary[queryLimitIndex + 1]); + const limitClause = /\bLIMIT\b/i; + const vocabulary = queryPayload.querySql.query.split(/\s+/); // Splitting the query by whitespace + const queryLimitIndex = vocabulary.findIndex(word => limitClause.test(word)); + const queryLimit = Number(vocabulary[queryLimitIndex + 1]); + if (isNaN(queryLimit)) { - const updatedVocabulary = [...vocabulary, "LIMIT", limits.maxResultRowLimit].join(" "); - queryPayload.querySql.query = updatedVocabulary; + // If "LIMIT" clause doesn't exist or its value is not a number, update the query + const updatedVocabulary = [...vocabulary, "LIMIT", limits.maxResultRowLimit]; + queryPayload.querySql.query = updatedVocabulary.join(" "); } else { - let newLimit = this.getLimit(queryLimit, limits.maxResultRowLimit); + // If "LIMIT" clause exists and its value is a number, update the limit + const newLimit = this.getLimit(queryLimit, limits.maxResultRowLimit); vocabulary[queryLimitIndex + 1] = newLimit.toString(); queryPayload.querySql.query = vocabulary.join(" "); }