
Commit

Sanketika-Obsrv/issue-tracker#180 feat: API changes to support type column in datasources
GayathriSrividya committed May 20, 2024
1 parent 3d55381 commit e08fb87
Showing 8 changed files with 44 additions and 19 deletions.
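In short: the commit adds a "type" column to datasources, restricted to "druid" or "datalake" by the updated request schemas, and uses it so that Druid ingestion submission and Druid-specific validation run only for druid-typed datasources. A minimal sketch of a save payload carrying the new field (the field values are illustrative; only "type" is introduced by this commit):

// Illustrative payload only; values are assumptions, "type" is the new column.
const datasourcePayload = {
    dataset_id: "sample-dataset",
    datasource: "sample-datasource",
    ingestion_spec: { /* Druid ingestion spec, unchanged by this commit */ },
    metadata: { aggregated: false, granularity: "day" },
    type: "druid" // or "datalake"; other values are rejected by the request schemas
};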
4 changes: 4 additions & 0 deletions api-service/src/configs/Config.ts
@@ -58,6 +58,10 @@ export const config = {
normalDataset: "dataset",
masterDataset: "master-dataset"
},
"datasource_storage_types": {
druid: "druid",
datalake: "datalake"
},
"redis_config": {
"redis_host": process.env.redis_host || 'localhost',
"redis_port": process.env.redis_port || 6379
10 changes: 5 additions & 5 deletions api-service/src/connectors/DbConnector.ts
@@ -57,7 +57,7 @@ export class DbConnector implements IConnector {

public async insertRecord(table: string, fields: any) {
await this.pool.transaction(async (dbTransaction) => {
await this.submit_ingestion(_.get(fields, 'ingestion_spec'), table)
await this.submit_ingestion(_.get(fields, 'ingestion_spec'), table, _.get(fields, "type"))
await dbTransaction(table).insert(fields).on('query-error', (error: any) => {
this.log_error(OP_TYPES.INSERT, error, table, fields);
throw {...constants.FAILED_RECORD_CREATE, "errCode": error.code}
@@ -89,7 +89,7 @@ export class DbConnector implements IConnector {
const existingRecord = await this.pool(table).select().where(filters).first()
if (!_.isUndefined(existingRecord)) {
await this.pool.transaction(async (dbTransaction) => {
await this.submit_ingestion(_.get(values, 'ingestion_spec'), table)
await this.submit_ingestion(_.get(values, 'ingestion_spec'), table, _.get(values, "type"))
await dbTransaction(table).where(filters).update(schemaMerger.mergeSchema(existingRecord, values)).on('query-error', (error: any) => {
this.log_error(OP_TYPES.UPSERT, error, table, values);
throw {...constants.FAILED_RECORD_UPDATE, "errCode": error.code}
@@ -100,7 +100,7 @@
})
} else {
await this.pool.transaction(async (dbTransaction) => {
await this.submit_ingestion(_.get(values, 'ingestion_spec'), table)
await this.submit_ingestion(_.get(values, 'ingestion_spec'), table, _.get(values, "type"))
await dbTransaction(table).insert(values).on('query-error', (error: any) => {
this.log_error(OP_TYPES.UPSERT, error, table, values);
throw {...constants.FAILED_RECORD_CREATE, "errCode": error.code}
@@ -144,8 +144,8 @@ export class DbConnector implements IConnector {
})
}

private async submit_ingestion(ingestion_spec: Record<string, any>, table: string) {
if (appConfig.table_names.datasources === table) {
private async submit_ingestion(ingestion_spec: Record<string, any>, table: string, storage_type: string) {
if (appConfig.table_names.datasources === table && storage_type === appConfig.datasource_storage_types.druid) {
return await wrapperService.submitIngestion(ingestion_spec)
.catch((error: any) => {
console.error(constants.INGESTION_FAILED_ON_SAVE)
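Net effect of the extra argument: wrapperService.submitIngestion is now called only when the record targets the datasources table and its "type" is "druid"; datalake datasources are persisted without any Druid ingestion call. A hypothetical call site ("dbConnector", "spec", and the ids are illustrative names; the table literal is assumed to match appConfig.table_names.datasources):

// Hypothetical usage sketch, assuming DbConnector is imported from ../connectors/DbConnector.
async function saveDatasources(dbConnector: DbConnector, spec: Record<string, any>) {
    // Persisted without any Druid ingestion call:
    await dbConnector.insertRecord("datasources", { id: "ds-1", ingestion_spec: spec, type: "datalake" });
    // ingestion_spec is submitted to Druid before the row is inserted:
    await dbConnector.insertRecord("datasources", { id: "ds-2", ingestion_spec: spec, type: "druid" });
}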
4 changes: 3 additions & 1 deletion api-service/src/helpers/Datasources.ts
@@ -8,6 +8,7 @@ export class Datasources {
private id: string
private dataset_id: string
private ingestion_spec: object
private type: string
private datasource: string
private datasource_ref: string
private retention_period: object
@@ -30,6 +31,7 @@
}
this.dataset_id = payload.dataset_id
this.ingestion_spec = payload.ingestion_spec
this.type = payload.type
this.datasource = payload.datasource
this.datasource_ref = payload.datasource_ref
this.retention_period = payload.retentionPeriod
@@ -44,7 +46,7 @@
this.metadata = payload.metadata
}
public getValues() {
return Object.assign(this.removeNullValues({ id: this.id, dataset_id: this.dataset_id, ingestion_spec: this.ingestion_spec, datasource: this.datasource, datasource_ref: this.datasource_ref, retention_period: this.retention_period, archival_policy: this.archival_policy, purge_policy: this.purge_policy, backup_config: this.backup_config, status: this.status, version: this.version, created_by: this.created_by, updated_by: this.updated_by, published_date: this.published_date, metadata: this.metadata }), { "updated_date": new Date })
return Object.assign(this.removeNullValues({ id: this.id, dataset_id: this.dataset_id, ingestion_spec: this.ingestion_spec, type: this.type, datasource: this.datasource, datasource_ref: this.datasource_ref, retention_period: this.retention_period, archival_policy: this.archival_policy, purge_policy: this.purge_policy, backup_config: this.backup_config, status: this.status, version: this.version, created_by: this.created_by, updated_by: this.updated_by, published_date: this.published_date, metadata: this.metadata }), { "updated_date": new Date })
}

public setValues() {
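With the new field, the helper simply carries payload.type through to the values it emits for persistence. A small sketch, assuming Datasources is imported from ../helpers/Datasources and that the constructor accepts a minimal payload like the one shown (other constructor checks are not visible in this diff):

// Sketch only; the payload shape mirrors the updated fixture below.
const ds = new Datasources({ dataset_id: "sample-dataset", ingestion_spec: {}, type: "datalake", metadata: { aggregated: false, granularity: "day" } });
const values = ds.getValues(); // now includes type: "datalake" alongside updated_date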
@@ -17,5 +17,6 @@
"metadata": {
"aggregated": false,
"granularity": "day"
}
},
"type": "druid"
}
4 changes: 4 additions & 0 deletions api-service/src/resources/schemas/DatasourceSaveReq.json
@@ -48,6 +48,10 @@
"type": "string"
}
}
},
"type": {
"type": "string",
"enum": ["druid", "datalake"]
}
},
"required": [
4 changes: 4 additions & 0 deletions api-service/src/resources/schemas/DatasourceUpdateReq.json
@@ -48,6 +48,10 @@
"type": "string"
}
}
},
"type": {
"type": "string",
"enum": ["druid", "datalake"]
}
},
"required": [
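Both request schemas gain the same enum-constrained "type" property. As an isolated illustration of what the enum enforces (Ajv is used here only as an example validator; the project's actual schema-validation wiring is not part of this diff):

// Standalone illustration of the added enum constraint, using Ajv as an assumed validator.
import Ajv from "ajv";

const typeProperty = { type: "string", enum: ["druid", "datalake"] }; // as added to DatasourceSaveReq/DatasourceUpdateReq
const validateType = new Ajv().compile(typeProperty);

console.log(validateType("druid"));         // true
console.log(validateType("datalake"));      // true
console.log(validateType("elasticsearch")); // false: not an allowed storage type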
6 changes: 4 additions & 2 deletions api-service/src/services/DataSourceService.ts
@@ -7,6 +7,8 @@ import constants from "../resources/Constants.json";
import { ingestorService } from "../routes/Router";
import { ErrorResponseHandler } from "../helpers/ErrorResponseHandler";
import { DatasetStatus, IConnector } from "../models/DatasetModels";
import { config } from "../configs/Config";


const telemetryObject = { id: null, type: "datasource", ver: "1.0.0" };

@@ -28,15 +30,15 @@ export class DataSourceService {
const datasources = new Datasources(req.body)
const payload: any = datasources.setValues()
updateTelemetryAuditEvent({ request: req, object: { ...telemetryObject, id: _.get(payload, 'id'), } });
await this.validateDatasource(payload)
if(payload.type === config.datasource_storage_types.druid) await this.validateDatasource(payload)
await this.dbUtil.save(req, res, next, payload)
} catch (error: any) { this.errorHandler.handleError(req, res, next, error) }
}
public update = async (req: Request, res: Response, next: NextFunction) => {
try {
const datasources = new Datasources(req.body)
const payload: Record<string, any> = datasources.setValues()
await this.validateDatasource(payload)
if(payload.type === config.datasource_storage_types.druid) await this.validateDatasource(payload)
await findAndSetExistingRecord({ dbConnector: this.dbConnector, table: this.table, request: req, filters: { "id": _.get(payload, 'id') }, object: { ...telemetryObject, id: _.get(payload, 'id') } });
await this.dbUtil.upsert(req, res, next, payload)
} catch (error: any) { this.errorHandler.handleError(req, res, next, error) }
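The practical effect for the save and update handlers: Druid-specific datasource validation runs only when payload.type is "druid", so datalake datasources go straight to persistence. An illustrative helper capturing the new branching rule (names are assumptions; the real handlers call this.validateDatasource and this.dbUtil directly):

// Illustrative only; "druid" mirrors config.datasource_storage_types.druid.
const druidType = "druid";
const needsDruidValidation = (payload: { type?: string }): boolean => payload.type === druidType;

console.log(needsDruidValidation({ type: "druid" }));    // true  -> validateDatasource runs before save/upsert
console.log(needsDruidValidation({ type: "datalake" })); // false -> payload is persisted without Druid validation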
28 changes: 18 additions & 10 deletions api-service/src/validators/QueryValidator.ts
@@ -63,7 +63,8 @@ export class QueryValidator implements IValidator {
if (!isFromClausePresent) {
return { isValid: false, message: "Invalid SQL Query", code: httpStatus["400_NAME"] };
}
const dataset = query.substring(query.indexOf("FROM")).split(" ")[1].replace(/\\/g, "");
const fromIndex = query.search(fromClause);
const dataset = query.substring(fromIndex + 4).trim().split(/\s+/)[0].replace(/\\/g, "");
if (_.isEmpty(dataset)) {
return { isValid: false, message: "Dataset name must be present in the SQL Query", code: httpStatus["400_NAME"] };
}
@@ -85,7 +86,9 @@
fromDate = moment(extractedDateRange[0], this.momentFormat);
toDate = moment(extractedDateRange[1], this.momentFormat);
} else {
let vocabulary = queryPayload.querySql.query.split(" ");
let query = queryPayload.querySql.query;
query = query.toUpperCase().replace(/\s+/g, " ").trim();
let vocabulary = query.split(/\s+/);
let fromDateIndex = vocabulary.indexOf("TIMESTAMP");
let toDateIndex = vocabulary.lastIndexOf("TIMESTAMP");
fromDate = moment(vocabulary[fromDateIndex + 1], this.momentFormat);
@@ -100,8 +103,9 @@
if (queryPayload.querySql) {
let query = queryPayload.querySql.query;
query = query.replace(/\s+/g, " ").trim();
let dataSource = query.substring(query.indexOf("FROM")).split(" ")[1].replace(/\\/g, "");
return dataSource.replace(/"/g, "");
const fromIndex = query.search(/\bFROM\b/i);
const dataSource = query.substring(fromIndex).split(/\s+/)[1].replace(/\\/g, "").replace(/"/g, "");
return dataSource;
} else {
const dataSourceField: any = queryPayload.query.dataSource
if (typeof dataSourceField == 'object') { return dataSourceField.name }
@@ -141,14 +145,18 @@
queryPayload.query.limit = limits.maxResultRowLimit;
}
} else {
let vocabulary = queryPayload.querySql.query.split(" ");
let queryLimitIndex = vocabulary.indexOf("LIMIT");
let queryLimit = Number(vocabulary[queryLimitIndex + 1]);
const limitClause = /\bLIMIT\b/i;
const vocabulary = queryPayload.querySql.query.split(/\s+/); // Splitting the query by whitespace
const queryLimitIndex = vocabulary.findIndex(word => limitClause.test(word));
const queryLimit = Number(vocabulary[queryLimitIndex + 1]);

if (isNaN(queryLimit)) {
const updatedVocabulary = [...vocabulary, "LIMIT", limits.maxResultRowLimit].join(" ");
queryPayload.querySql.query = updatedVocabulary;
// If "LIMIT" clause doesn't exist or its value is not a number, update the query
const updatedVocabulary = [...vocabulary, "LIMIT", limits.maxResultRowLimit];
queryPayload.querySql.query = updatedVocabulary.join(" ");
} else {
let newLimit = this.getLimit(queryLimit, limits.maxResultRowLimit);
// If "LIMIT" clause exists and its value is a number, update the limit
const newLimit = this.getLimit(queryLimit, limits.maxResultRowLimit);
vocabulary[queryLimitIndex + 1] = newLimit.toString();
queryPayload.querySql.query = vocabulary.join(" ");
}
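The reworked SQL handling is now case-insensitive and whitespace-tolerant when locating the FROM datasource and the LIMIT clause. A standalone reproduction of the datasource-extraction logic, for illustration only (not the class method itself):

// Mirrors the getDataSource logic above; queries are illustrative.
const getDataSourceFromSql = (sql: string): string => {
    const query = sql.replace(/\s+/g, " ").trim();
    const fromIndex = query.search(/\bFROM\b/i);
    return query.substring(fromIndex).split(/\s+/)[1].replace(/\\/g, "").replace(/"/g, "");
};

console.log(getDataSourceFromSql('select * from "telemetry-events" limit 10')); // telemetry-events
console.log(getDataSourceFromSql('SELECT count(*) FROM system_metrics'));       // system_metrics

The LIMIT handling follows the same pattern: if no LIMIT keyword is found (matched case-insensitively), " LIMIT <maxResultRowLimit>" is appended to the query; otherwise the existing numeric value is adjusted via this.getLimit against maxResultRowLimit.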
