Bug Fixes #13

Merged · 9 commits · Mar 2, 2023
1 change: 1 addition & 0 deletions package.json
@@ -19,6 +19,7 @@
"http-status": "^1.5.3",
"json-merger": "^1.1.9",
"kafkajs": "^2.2.3",
"kafkajs-snappy": "^1.1.0",
"knex": "^2.4.2",
"lodash": "^4.17.21",
"moment": "^2.29.4",
2 changes: 1 addition & 1 deletion src/configs/Config.ts
@@ -12,7 +12,7 @@ export const config = {
"kafka": {
"config": {
"brokers": [`${process.env.kafka_host || 'localhost'}:${process.env.kafka_port || 9092}`],
"clientId": process.env.clientId || "obsrv-apis",
"clientId": process.env.client_id || "obsrv-apis",
"retry": {
"initialRetryTime": process.env.kafka_initial_retry_time ? parseInt(process.env.kafka_initial_retry_time) : 3000,
"retries": process.env.kafka_retries ? parseInt(process.env.kafka_retries) : 5
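The retry block above repeats a parse-int-from-env-with-fallback pattern. A small helper in the same spirit could de-duplicate it. This is a sketch only; the helper name and placement are assumptions, not part of this PR:

// Hypothetical helper, not in this PR: parse an integer env var with a fallback.
const intFromEnv = (name: string, fallback: number): number => {
    const raw = process.env[name]
    return raw ? parseInt(raw, 10) : fallback
}

// e.g. "initialRetryTime": intFromEnv("kafka_initial_retry_time", 3000)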
10 changes: 10 additions & 0 deletions src/configs/RoutesConfig.ts
@@ -40,6 +40,11 @@ export const routesConfig = {
api_id: "obsrv.config.dataset.preset",
method: "get",
path: "/obsrv/v1/datasets/preset"
+},
+publish:{
+api_id: "obsrv.config.dataset.publish",
+method: "get",
+path: "/obsrv/v1/datasets/publish/:datasetId"
}
},
datasource: {
@@ -67,6 +72,11 @@
api_id: "obsrv.config.datasource.preset",
method: "get",
path: "/obsrv/v1/datasources/preset"
+},
+publish:{
+api_id: "obsrv.config.datasource.publish",
+method: "get",
+path: "/obsrv/v1/datasources/publish/:datasourceId"
}
}
},
5 changes: 3 additions & 2 deletions src/connectors/DbConnector.ts
@@ -47,13 +47,14 @@ export class DbConnector implements IConnector {
})
}

-private readRecord(table: string, fields: any) {
+private async readRecord(table: string, fields: any) {
const query = this.pool.from(table).select().where(fields.filters)
const { offset, limit } = fields
if (offset && limit) {
return query.offset(offset).limit(limit)
}
-return query
+const fetchedRecords = await query
+return fetchedRecords.length > 0 ? fetchedRecords : (() => { throw new Error('No records found') })()
Review comment (Member): Are we handling these exceptions being thrown here?

}

}
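On the review question above: with readRecord now throwing when no rows match, callers that previously received an empty array will instead see a rejected promise, which the services currently surface as a 500. A minimal sketch, as a method of one of the calling services, of how this case could be mapped to a 404 instead; the NOT_FOUND branch is an assumption, not something this PR adds:

// Sketch only: distinguishing the connector's empty-result error from real failures.
public read = (req: Request, res: Response, next: NextFunction) => {
    this.connector.execute("read", { "table": 'datasets', "fields": { "filters": { "id": req.params.datasetId } } })
        .then((data: any[]) => {
            ResponseHandler.successResponse(req, res, { status: 200, data: _.first(data) })
        })
        .catch((error: any) => {
            // 'No records found' comes from DbConnector.readRecord; everything else is unexpected.
            const status = error.message === 'No records found' ? httpStatus.NOT_FOUND : httpStatus.INTERNAL_SERVER_ERROR
            next(errorResponse(status, error.message))
        });
}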
18 changes: 11 additions & 7 deletions src/connectors/KafkaConnector.ts
@@ -1,5 +1,7 @@
-import { Kafka, Producer, KafkaConfig, CompressionTypes } from 'kafkajs'
+import { Kafka, Producer, KafkaConfig, CompressionTypes, CompressionCodecs } from 'kafkajs'
import { IConnector } from "../models/IngestionModels"
+const SnappyCodec = require('kafkajs-snappy')
+CompressionCodecs[CompressionTypes.Snappy] = SnappyCodec

export class KafkaConnector implements IConnector {
private kafka: Kafka;
@@ -11,17 +13,19 @@ export class KafkaConnector implements IConnector {
})
}
async connect() {
return await this.producer.connect()
}
async execute(topic: string, config: any) {
// TODO: Handle acks (which should be 3 for durability) and compression types
return await this.producer.send({
topic: topic,
-messages: [{
-value: config.value
-}],
-compression: CompressionTypes.Snappy
-})
+messages: [
+{
+value: config.value,
+},
+],
+compression: CompressionTypes.Snappy,
+});
}
async close() {
return await this.producer.disconnect()
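For context, kafkajs ships with gzip support only; Snappy requires an external codec, which is what the kafkajs-snappy registration above provides, and the same registration also lets consumers decompress Snappy messages. A self-contained sketch of the wiring; the broker address, topic name, and payload are assumptions:

import { Kafka, CompressionTypes, CompressionCodecs } from 'kafkajs'
const SnappyCodec = require('kafkajs-snappy')

// Register the codec once per process before producing or consuming Snappy-compressed messages.
CompressionCodecs[CompressionTypes.Snappy] = SnappyCodec

async function demo() {
    const kafka = new Kafka({ clientId: 'obsrv-apis', brokers: ['localhost:9092'] })
    const producer = kafka.producer()
    await producer.connect()
    await producer.send({
        topic: 'telemetry',   // assumed topic name
        messages: [{ value: JSON.stringify({ event: 'ping' }) }],
        compression: CompressionTypes.Snappy,
    })
    await producer.disconnect()
}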
6 changes: 5 additions & 1 deletion src/helpers/Datasets.ts
@@ -5,6 +5,7 @@ import { SchemaMerger } from '../generators/SchemaMerger'
let schemaMerger = new SchemaMerger()
export class Datasets {
private id: string
+private dataset_name: string
private validation_config: ValidationConfig
private extraction_config: ExtractionConfig
private dedup_config: DedupConfig
@@ -14,9 +15,11 @@
private status: string
private created_by: string
private updated_by: string
+private published_date: Date

constructor(payload: any) {
this.id = payload.id
+this.dataset_name = payload.dataset_name
this.validation_config = payload.validation_config
this.extraction_config = payload.extraction_config
this.dedup_config = payload.dedup_config
@@ -26,10 +29,11 @@
this.status = payload.status
this.created_by = payload.created_by
this.updated_by = payload.updated_by
+this.published_date = payload.published_date
}

public getValues() {
-return Object.assign(this.removeNullValues({ id: this.id, validation_config: this.validation_config, extraction_config: this.extraction_config, dedup_config: this.dedup_config, data_schema: this.data_schema, router_config: this.router_config, denorm_config: this.denorm_config, status: this.status, created_by: this.created_by, updated_by: this.updated_by }), { "updated_date": new Date })
+return Object.assign(this.removeNullValues({ id: this.id, dataset_name: this.dataset_name, validation_config: this.validation_config, extraction_config: this.extraction_config, dedup_config: this.dedup_config, data_schema: this.data_schema, router_config: this.router_config, denorm_config: this.denorm_config, status: this.status, created_by: this.created_by, updated_by: this.updated_by, published_date: this.published_date }), { "updated_date": new Date })
}

public setValues() {
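getValues here (and in Datasources below) relies on removeNullValues, which is not shown in this diff; presumably it strips null and undefined entries so that a partial payload does not overwrite existing columns. A plausible lodash-based sketch of that helper; the real implementation may differ:

// Assumed shape of the helper used by getValues(); not part of this diff.
private removeNullValues(payload: any) {
    return _.omitBy(payload, _.isNil)   // drops both null and undefined values
}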
4 changes: 3 additions & 1 deletion src/helpers/Datasources.ts
@@ -14,6 +14,7 @@ export class Datasources {
private status: string
private created_by: string
private updated_by: string
+private published_date: Date

constructor(payload: any) {
this.dataset_id = payload.dataset_id
@@ -26,9 +27,10 @@
this.status = payload.status
this.created_by = payload.created_by
this.updated_by = payload.updated_by
+this.published_date = payload.published_date
}
public getValues() {
-return Object.assign(this.removeNullValues({ id: this.getDataSourceId(), dataset_id: this.dataset_id, ingestion_spec: this.ingestion_spec, datasource: this.datasource, retention_period: this.retention_period, archival_policy: this.archival_policy, purge_policy: this.purge_policy, backup_config: this.backup_config, status: this.status, created_by: this.created_by, updated_by: this.updated_by }), { "updated_date": new Date })
+return Object.assign(this.removeNullValues({ id: this.getDataSourceId(), dataset_id: this.dataset_id, ingestion_spec: this.ingestion_spec, datasource: this.datasource, retention_period: this.retention_period, archival_policy: this.archival_policy, purge_policy: this.purge_policy, backup_config: this.backup_config, status: this.status, created_by: this.created_by, updated_by: this.updated_by, published_date: this.published_date }), { "updated_date": new Date })
}

public setValues() {
4 changes: 3 additions & 1 deletion src/resources/Constants.json
@@ -16,6 +16,8 @@
"DATASET_SAVED":"The dataset configuration has been saved successfully",
"DATASET_UPDATED":"The dataset configuration has been updated successfully",
"DATASOURCE_SAVED": "The datasource configuration has been saved successfully",
"DATASOURCE_UPDATED": "The datasource configuration has been updated successfully"
"DATASOURCE_UPDATED": "The datasource configuration has been updated successfully",
"DATASET_PUBLISHED": "The dataset configuration has been published successfully",
"DATASOURCE_PUBLISHED": "The datasource configuration has been published successfully"
}
}
2 changes: 1 addition & 1 deletion src/resources/schemas/DatasetConfigDefault.json
@@ -29,7 +29,7 @@
"router_config": {
"topic": ""
},
"status": "ACTIVE",
"status": "DRAFT",
"created_by": "SYSTEM",
"updated_by": "SYSTEM"
}
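Together with the new publish endpoints, the changed default implies a two-step lifecycle: configurations are saved as DRAFT and flipped to LIVE on publish. Captured as a type, using only the states visible in this PR:

type ConfigStatus = "DRAFT" | "LIVE"   // other states, if any, are not shown in this diff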
6 changes: 6 additions & 0 deletions src/resources/schemas/DatasetSaveReq.json
@@ -4,6 +4,9 @@
"id": {
"type": "string"
},
"dataset_name":{
"type": "string"
},
"extraction_config": {
"type": "object",
"properties": {
@@ -89,6 +92,9 @@
},
"updated_by": {
"type": "string"
+},
+"published_date": {
+"type": "string"
}
},
"required": [
14 changes: 4 additions & 10 deletions src/resources/schemas/DatasourceConfigDefault.json
@@ -1,15 +1,9 @@
{
"retention_period": {},
"archival_policy": {
"enabled": false
},
"purge_policy": {
"enabled": false
},
"backup_config": {
"enabled": false
},
"status": "ACTIVE",
"archival_policy": {},
"purge_policy": {},
"backup_config": {},
"status": "DRAFT",
"created_by": "SYSTEM",
"updated_by": "SYSTEM"
}
6 changes: 6 additions & 0 deletions src/resources/schemas/DatasourceSaveReq.json
@@ -1,6 +1,9 @@
{
"type": "object",
"properties": {
"id": {
"type": "string"
},
"dataset_id": {
"type": "string"
},
@@ -30,6 +33,9 @@
},
"updated_by": {
"type": "string"
+},
+"published_date": {
+"type": "string"
}
},
"required": [
14 changes: 9 additions & 5 deletions src/routes/Router.ts
@@ -15,13 +15,13 @@ const validationService = new ValidationService();

const queryService = new QueryService(new HTTPConnector(`${config.query_api.druid.host}:${config.query_api.druid.port}`));

-export const kafkaConnector = new KafkaConnector(config.dataset_api.kafka.config)
+export const kafkaConnector = new KafkaConnector(config.dataset_api.kafka.config);

-export const dbConnector = new DbConnector(config.db_connector_config)
+export const dbConnector = new DbConnector(config.db_connector_config);

export const datasourceService = new DataSourceService(dbConnector);
export const datasetService = new DatasetService(dbConnector);
-export const ingestorService = new IngestorService(kafkaConnector)
+export const ingestorService = new IngestorService(kafkaConnector);

const router = express.Router();

@@ -35,15 +35,19 @@ router.post(`${routesConfig.data_ingest.path}`, ResponseHandler.setApiId(routesC
/** Dataset APIs */
router.post(`${routesConfig.config.dataset.save.path}`, ResponseHandler.setApiId(routesConfig.config.dataset.save.api_id), validationService.validateRequestBody, datasetService.save);
router.patch(`${routesConfig.config.dataset.update.path}`, ResponseHandler.setApiId(routesConfig.config.dataset.update.api_id), validationService.validateRequestBody, datasetService.update);
+router.get(`${routesConfig.config.dataset.preset.path}`, ResponseHandler.setApiId(routesConfig.config.dataset.preset.api_id), datasetService.preset);
router.get(`${routesConfig.config.dataset.read.path}`, ResponseHandler.setApiId(routesConfig.config.dataset.read.api_id), datasetService.read);
router.post(`${routesConfig.config.dataset.list.path}`, ResponseHandler.setApiId(routesConfig.config.dataset.list.api_id), validationService.validateRequestBody, datasetService.list);
-router.get(`${routesConfig.config.dataset.preset.path}`, ResponseHandler.setApiId(routesConfig.config.dataset.preset.api_id), datasetService.preset)
+router.get(`${routesConfig.config.dataset.publish.path}`, ResponseHandler.setApiId(routesConfig.config.dataset.publish.api_id), datasetService.publish);


/** DataSource API(s) */
router.post(`${routesConfig.config.datasource.save.path}`, ResponseHandler.setApiId(routesConfig.config.datasource.save.api_id), validationService.validateRequestBody, datasourceService.save);
router.patch(`${routesConfig.config.datasource.update.path}`, ResponseHandler.setApiId(routesConfig.config.datasource.update.api_id), validationService.validateRequestBody, datasourceService.update);
+router.get(`${routesConfig.config.datasource.preset.path}`, ResponseHandler.setApiId(routesConfig.config.datasource.preset.api_id), datasourceService.preset);
router.get(`${routesConfig.config.datasource.read.path}`, ResponseHandler.setApiId(routesConfig.config.datasource.read.api_id), datasourceService.read);
router.post(`${routesConfig.config.datasource.list.path}`, ResponseHandler.setApiId(routesConfig.config.datasource.list.api_id), validationService.validateRequestBody, datasourceService.list);
-router.get(`${routesConfig.config.datasource.preset.path}`, ResponseHandler.setApiId(routesConfig.config.datasource.preset.api_id), datasourceService.preset)
+router.get(`${routesConfig.config.datasource.publish.path}`, ResponseHandler.setApiId(routesConfig.config.datasource.publish.api_id), datasourceService.publish);


export { router };
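A hypothetical client call against one of the new routes; the host, port, and dataset id are assumptions, and a fetch-capable runtime (Node 18+ or a browser) is assumed:

// GET /obsrv/v1/datasets/publish/:datasetId
async function publishDataset(datasetId: string) {
    const res = await fetch(`http://localhost:3000/obsrv/v1/datasets/publish/${datasetId}`)
    return res.json()   // expected to carry constants.CONFIG.DATASET_PUBLISHED on success
}

Note that publish mutates state (it updates status and published_date) but is exposed as a GET; a POST or PATCH would be more conventional for a state-changing operation.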
13 changes: 11 additions & 2 deletions src/services/DataSourceService.ts
@@ -5,6 +5,7 @@ import errorResponse from "http-errors"
import httpStatus from "http-status";
import { IConnector } from "../models/IngestionModels";
import { Datasources } from "../helpers/Datasources";
+import _ from "lodash";

export class DataSourceService {
private connector: any;
@@ -31,8 +32,8 @@
}
public read = (req: Request, res: Response, next: NextFunction) => {
this.connector.execute("read", { "table": 'datasources', "fields": { "filters": { "id": req.params.datasourceId } } })
-.then((data: any) => {
-ResponseHandler.successResponse(req, res, { status: 200, data: data[0] })
+.then((data: any[]) => {
+ResponseHandler.successResponse(req, res, { status: 200, data: _.first(data) })
}).catch((error: any) => {
next(errorResponse(httpStatus.INTERNAL_SERVER_ERROR, error.message))
});
@@ -55,4 +56,12 @@
next(errorResponse(httpStatus.INTERNAL_SERVER_ERROR, error.message))
}
}
+public publish = (req: Request, res: Response, next: NextFunction) => {
+this.connector.execute("update", { "table": 'datasources', "fields": { "filters": { "id": req.params.datasourceId }, "values": { "status": "LIVE", "updated_date": new Date, "published_date": new Date } } })
Review comment (Member): Why are we hardcoding status = LIVE here?

+.then(() => {
+ResponseHandler.successResponse(req, res, { status: 200, data: { "message": constants.CONFIG.DATASOURCE_PUBLISHED, "dataset_id": req.body.id } })
+}).catch((error: any) => {
+next(errorResponse(httpStatus.INTERNAL_SERVER_ERROR, error.message))
+});
+}
}
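On the review question about the hardcoded status: one option, purely a sketch and not part of this PR, is to validate a requested status against an allow-list and default to LIVE to preserve the current behaviour. Written as a method of DataSourceService; the allow-list and query parameter are assumptions:

// Sketch only: ALLOWED_PUBLISH_STATUSES and req.query.status are assumptions.
const ALLOWED_PUBLISH_STATUSES = ["LIVE"]

public publish = (req: Request, res: Response, next: NextFunction) => {
    const status = (req.query.status as string) || "LIVE"   // default keeps today's behaviour
    if (!ALLOWED_PUBLISH_STATUSES.includes(status)) {
        return next(errorResponse(httpStatus.BAD_REQUEST, `invalid status: ${status}`))
    }
    this.connector.execute("update", { "table": 'datasources', "fields": { "filters": { "id": req.params.datasourceId }, "values": { "status": status, "updated_date": new Date(), "published_date": new Date() } } })
        .then(() => {
            ResponseHandler.successResponse(req, res, { status: 200, data: { "message": constants.CONFIG.DATASOURCE_PUBLISHED, "dataset_id": req.params.datasourceId } })
        }).catch((error: any) => {
            next(errorResponse(httpStatus.INTERNAL_SERVER_ERROR, error.message))
        });
}

Separately, the handler in the diff responds with req.body.id, which will be undefined on a GET request; req.params.datasourceId is presumably what was intended, and the sketch uses it.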
18 changes: 12 additions & 6 deletions src/services/DatasetService.ts
@@ -7,8 +7,6 @@ import _ from 'lodash'
import { Datasets } from "../helpers/Datasets";
import { IConnector } from "../models/IngestionModels";

-const responseHandler = ResponseHandler;
export class DatasetService {
private dbConnector: IConnector;
constructor(dbConnector: IConnector) {
@@ -30,17 +28,17 @@
}
public update = (req: Request, res: Response, next: NextFunction) => {
const dataset = new Datasets(req.body)
this.dbConnector.execute("update", { "table": 'datasets', "fields": { "filters": { "id": req.body.id }, "values": dataset.setValues() } })
this.dbConnector.execute("update", { "table": 'datasets', "fields": { "filters": { "id": req.body.id }, "values": dataset.getValues() } })
.then(() => {
ResponseHandler.successResponse(req, res, { status: 200, data: { "message": constants.CONFIG.DATASET_UPDATED, "dataset_id": req.body.id } })
}).catch((error: any) => {
next(errorResponse(httpStatus.INTERNAL_SERVER_ERROR, error.message))
});
}
public read = (req: Request, res: Response, next: NextFunction) => {
this.dbConnector.execute("read", { "table": 'datasets', "fields": { "filters": {"id": req.params.datasetId} } })
.then((data: any) => {
ResponseHandler.successResponse(req, res, { status: 200, data: data[0] })
this.dbConnector.execute("read", { "table": 'datasets', "fields": { "filters": { "id": req.params.datasetId } } })
.then((data: any[]) => {
ResponseHandler.successResponse(req, res, { status: 200, data: _.first(data) })
}).catch((error: any) => {
next(errorResponse(httpStatus.INTERNAL_SERVER_ERROR, error.message))
});
@@ -62,4 +60,12 @@
next(errorResponse(httpStatus.INTERNAL_SERVER_ERROR, error.message))
}
}
+public publish = (req: Request, res: Response, next: NextFunction) => {
+this.dbConnector.execute("update", { "table": 'datasets', "fields": { "filters": { "id": req.params.datasetId }, "values": { "status": "LIVE", "updated_date": new Date, "published_date": new Date } } })
Review comment (Member): Same here as well.

+.then(() => {
+ResponseHandler.successResponse(req, res, { status: 200, data: { "message": constants.CONFIG.DATASET_PUBLISHED, "dataset_id": req.body.id } })
+}).catch((error: any) => {
+next(errorResponse(httpStatus.INTERNAL_SERVER_ERROR, error.message))
+});
+}
}
4 changes: 2 additions & 2 deletions src/services/IngestorService.ts
@@ -39,8 +39,8 @@ export class IngestorService {
})
}
private getDatasetId(req: Request) {
-let datasetId = req.params.datasetId
+let datasetId = req.params.datasetId.trim()
if (!_.isEmpty(datasetId)) return datasetId
throw new Error("dataset parameter in url cannot be empty")
throw new Error("datasetId parameter in url cannot be empty")
}
}
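The .trim() added above will throw a TypeError rather than the intended message if the route parameter is missing altogether, for example if this method is ever reused outside the route that defines :datasetId. A slightly more defensive sketch:

// Sketch only: guard against an absent parameter before trimming.
private getDatasetId(req: Request) {
    const datasetId = (req.params.datasetId || "").trim()
    if (!_.isEmpty(datasetId)) return datasetId
    throw new Error("datasetId parameter in url cannot be empty")
}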
3 changes: 2 additions & 1 deletion src/services/ValidationService.ts
@@ -27,6 +27,7 @@ export class ValidationService {
status.isValid ?
next()
: next(errorResponse(httpStatus.BAD_REQUEST, status?.message || ""))
-}
+};


}