Skip to content

Commit

Permalink
Merge pull request #13 from Sanketika-Obsrv/config-services-v2
Browse files Browse the repository at this point in the history
Bug Fixes
  • Loading branch information
manjudr authored Mar 2, 2023
2 parents d544f45 + 801f8d5 commit 8eb49b0
Show file tree
Hide file tree
Showing 17 changed files with 90 additions and 40 deletions.
1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
"http-status": "^1.5.3",
"json-merger": "^1.1.9",
"kafkajs": "^2.2.3",
"kafkajs-snappy": "^1.1.0",
"knex": "^2.4.2",
"lodash": "^4.17.21",
"moment": "^2.29.4",
Expand Down
2 changes: 1 addition & 1 deletion src/configs/Config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ export const config = {
"kafka": {
"config": {
"brokers": [`${process.env.kafka_host || 'localhost'}:${process.env.kafka_port || 9092}`],
"clientId": process.env.clientId || "obsrv-apis",
"clientId": process.env.client_id || "obsrv-apis",
"retry": {
"initialRetryTime": process.env.kafka_initial_retry_time ? parseInt(process.env.kafka_initial_retry_time) : 3000,
"retries": process.env.kafka_retries ? parseInt(process.env.kafka_retries) : 5
Expand Down
10 changes: 10 additions & 0 deletions src/configs/RoutesConfig.ts
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,11 @@ export const routesConfig = {
api_id: "obsrv.config.dataset.preset",
method: "get",
path: "/obsrv/v1/datasets/preset"
},
publish:{
api_id: "obsrv.config.dataset.publish",
method: "get",
path: "/obsrv/v1/datasets/publish/:datasetId"
}
},
datasource: {
Expand Down Expand Up @@ -67,6 +72,11 @@ export const routesConfig = {
api_id: "obsrv.config.datasource.preset",
method: "get",
path: "/obsrv/v1/datasources/preset"
},
publish:{
api_id: "obsrv.config.datasource.publish",
method: "get",
path: "/obsrv/v1/datasources/publish/:datasourceId"
}
}
},
Expand Down
5 changes: 3 additions & 2 deletions src/connectors/DbConnector.ts
Original file line number Diff line number Diff line change
Expand Up @@ -47,13 +47,14 @@ export class DbConnector implements IConnector {
})
}

private readRecord(table: string, fields: any) {
private async readRecord(table: string, fields: any) {
const query = this.pool.from(table).select().where(fields.filters)
const { offset, limit } = fields
if (offset && limit) {
return query.offset(offset).limit(limit)
}
return query
const fetchedRecords = await query
return fetchedRecords.length > 0 ? fetchedRecords : (() => { throw new Error('No records found') })()
}

}
18 changes: 11 additions & 7 deletions src/connectors/KafkaConnector.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
import { Kafka, Producer, KafkaConfig, CompressionTypes } from 'kafkajs'
import { Kafka, Producer, KafkaConfig, CompressionTypes, CompressionCodecs } from 'kafkajs'
import { IConnector } from "../models/IngestionModels"
const SnappyCodec = require('kafkajs-snappy')
CompressionCodecs[CompressionTypes.Snappy] = SnappyCodec

export class KafkaConnector implements IConnector {
private kafka: Kafka;
Expand All @@ -11,17 +13,19 @@ export class KafkaConnector implements IConnector {
})
}
async connect() {
return await this.producer.connect()
return await this.producer.connect()
}
async execute(topic: string, config: any) {
// TODO: Handle acks (which should be 3 for durability) and compression types
return await this.producer.send({
topic: topic,
messages: [{
value: config.value
}],
compression: CompressionTypes.Snappy
})
messages: [
{
value: config.value,
},
],
compression: CompressionTypes.Snappy,
});
}
async close() {
return await this.producer.disconnect()
Expand Down
6 changes: 5 additions & 1 deletion src/helpers/Datasets.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import { SchemaMerger } from '../generators/SchemaMerger'
let schemaMerger = new SchemaMerger()
export class Datasets {
private id: string
private dataset_name: string
private validation_config: ValidationConfig
private extraction_config: ExtractionConfig
private dedup_config: DedupConfig
Expand All @@ -14,9 +15,11 @@ export class Datasets {
private status: string
private created_by: string
private updated_by: string
private published_date: Date

constructor(payload: any) {
this.id = payload.id
this.dataset_name = payload.dataset_name
this.validation_config = payload.validation_config
this.extraction_config = payload.extraction_config
this.dedup_config = payload.dedup_config
Expand All @@ -26,10 +29,11 @@ export class Datasets {
this.status = payload.status
this.created_by = payload.created_by
this.updated_by = payload.updated_by
this.published_date = payload.published_date
}

public getValues() {
return Object.assign(this.removeNullValues({ id: this.id, validation_config: this.validation_config, extraction_config: this.extraction_config, dedup_config: this.dedup_config, data_schema: this.data_schema, router_config: this.router_config, denorm_config: this.denorm_config, status: this.status, created_by: this.created_by, updated_by: this.updated_by }), { "updated_date": new Date })
return Object.assign(this.removeNullValues({ id: this.id, dataset_name: this.dataset_name, validation_config: this.validation_config, extraction_config: this.extraction_config, dedup_config: this.dedup_config, data_schema: this.data_schema, router_config: this.router_config, denorm_config: this.denorm_config, status: this.status, created_by: this.created_by, updated_by: this.updated_by, published_date: this.published_date }), { "updated_date": new Date })
}

public setValues() {
Expand Down
4 changes: 3 additions & 1 deletion src/helpers/Datasources.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ export class Datasources {
private status: string
private created_by: string
private updated_by: string
private published_date: Date

constructor(payload: any) {
this.dataset_id = payload.dataset_id
Expand All @@ -26,9 +27,10 @@ export class Datasources {
this.status = payload.status
this.created_by = payload.created_by
this.updated_by = payload.updated_by
this.published_date = payload.published_date
}
public getValues() {
return Object.assign(this.removeNullValues({ id: this.getDataSourceId(), dataset_id: this.dataset_id, ingestion_spec: this.ingestion_spec, datasource: this.datasource, retention_period: this.retention_period, archival_policy: this.archival_policy, purge_policy: this.purge_policy, backup_config: this.backup_config, status: this.status, created_by: this.created_by, updated_by: this.updated_by }), { "updated_date": new Date })
return Object.assign(this.removeNullValues({ id: this.getDataSourceId(), dataset_id: this.dataset_id, ingestion_spec: this.ingestion_spec, datasource: this.datasource, retention_period: this.retention_period, archival_policy: this.archival_policy, purge_policy: this.purge_policy, backup_config: this.backup_config, status: this.status, created_by: this.created_by, updated_by: this.updated_by, published_date: this.published_date }), { "updated_date": new Date })
}

public setValues() {
Expand Down
4 changes: 3 additions & 1 deletion src/resources/Constants.json
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
"DATASET_SAVED":"The dataset configuration has been saved successfully",
"DATASET_UPDATED":"The dataset configuration has been updated successfully",
"DATASOURCE_SAVED": "The datasource configuration has been saved successfully",
"DATASOURCE_UPDATED": "The datasource configuration has been updated successfully"
"DATASOURCE_UPDATED": "The datasource configuration has been updated successfully",
"DATASET_PUBLISHED": "The dataset configuration has been published successfully",
"DATASOURCE_PUBLISHED": "The datasource configuration has been published successfully"
}
}
2 changes: 1 addition & 1 deletion src/resources/schemas/DatasetConfigDefault.json
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
"router_config": {
"topic": ""
},
"status": "ACTIVE",
"status": "DRAFT",
"created_by": "SYSTEM",
"updated_by": "SYSTEM"
}
6 changes: 6 additions & 0 deletions src/resources/schemas/DatasetSaveReq.json
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,9 @@
"id": {
"type": "string"
},
"dataset_name":{
"type": "string"
},
"extraction_config": {
"type": "object",
"properties": {
Expand Down Expand Up @@ -89,6 +92,9 @@
},
"updated_by": {
"type": "string"
},
"published_date": {
"type": "string"
}
},
"required": [
Expand Down
14 changes: 4 additions & 10 deletions src/resources/schemas/DatasourceConfigDefault.json
Original file line number Diff line number Diff line change
@@ -1,15 +1,9 @@
{
"retention_period": {},
"archival_policy": {
"enabled": false
},
"purge_policy": {
"enabled": false
},
"backup_config": {
"enabled": false
},
"status": "ACTIVE",
"archival_policy": {},
"purge_policy": {},
"backup_config": {},
"status": "DRAFT",
"created_by": "SYSTEM",
"updated_by": "SYSTEM"
}
6 changes: 6 additions & 0 deletions src/resources/schemas/DatasourceSaveReq.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
{
"type": "object",
"properties": {
"id": {
"type": "string"
},
"dataset_id": {
"type": "string"
},
Expand Down Expand Up @@ -30,6 +33,9 @@
},
"updated_by": {
"type": "string"
},
"published_date": {
"type": "string"
}
},
"required": [
Expand Down
14 changes: 9 additions & 5 deletions src/routes/Router.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,13 @@ const validationService = new ValidationService();

const queryService = new QueryService(new HTTPConnector(`${config.query_api.druid.host}:${config.query_api.druid.port}`));

export const kafkaConnector = new KafkaConnector(config.dataset_api.kafka.config)
export const kafkaConnector = new KafkaConnector(config.dataset_api.kafka.config);

export const dbConnector = new DbConnector(config.db_connector_config)
export const dbConnector = new DbConnector(config.db_connector_config);

export const datasourceService = new DataSourceService(dbConnector);
export const datasetService = new DatasetService(dbConnector);
export const ingestorService = new IngestorService(kafkaConnector)
export const ingestorService = new IngestorService(kafkaConnector);

const router = express.Router();

Expand All @@ -35,15 +35,19 @@ router.post(`${routesConfig.data_ingest.path}`, ResponseHandler.setApiId(routesC
/** Dataset APIs */
router.post(`${routesConfig.config.dataset.save.path}`, ResponseHandler.setApiId(routesConfig.config.dataset.save.api_id), validationService.validateRequestBody, datasetService.save);
router.patch(`${routesConfig.config.dataset.update.path}`, ResponseHandler.setApiId(routesConfig.config.dataset.update.api_id), validationService.validateRequestBody, datasetService.update);
router.get(`${routesConfig.config.dataset.preset.path}`, ResponseHandler.setApiId(routesConfig.config.dataset.preset.api_id), datasetService.preset);
router.get(`${routesConfig.config.dataset.read.path}`, ResponseHandler.setApiId(routesConfig.config.dataset.read.api_id), datasetService.read);
router.post(`${routesConfig.config.dataset.list.path}`, ResponseHandler.setApiId(routesConfig.config.dataset.list.api_id), validationService.validateRequestBody, datasetService.list);
router.get(`${routesConfig.config.dataset.preset.path}`, ResponseHandler.setApiId(routesConfig.config.dataset.preset.api_id), datasetService.preset)
router.get(`${routesConfig.config.dataset.publish.path}`, ResponseHandler.setApiId(routesConfig.config.dataset.publish.api_id), datasetService.publish);


/** DataSource API(s) */
router.post(`${routesConfig.config.datasource.save.path}`, ResponseHandler.setApiId(routesConfig.config.datasource.save.api_id), validationService.validateRequestBody, datasourceService.save);
router.patch(`${routesConfig.config.datasource.update.path}`, ResponseHandler.setApiId(routesConfig.config.datasource.update.api_id), validationService.validateRequestBody, datasourceService.update);
router.get(`${routesConfig.config.datasource.preset.path}`, ResponseHandler.setApiId(routesConfig.config.datasource.preset.api_id), datasourceService.preset);
router.get(`${routesConfig.config.datasource.read.path}`, ResponseHandler.setApiId(routesConfig.config.datasource.read.api_id), datasourceService.read);
router.post(`${routesConfig.config.datasource.list.path}`, ResponseHandler.setApiId(routesConfig.config.datasource.list.api_id), validationService.validateRequestBody, datasourceService.list);
router.get(`${routesConfig.config.datasource.preset.path}`, ResponseHandler.setApiId(routesConfig.config.datasource.preset.api_id), datasourceService.preset)
router.get(`${routesConfig.config.datasource.publish.path}`, ResponseHandler.setApiId(routesConfig.config.datasource.publish.api_id), datasourceService.publish);


export { router };
13 changes: 11 additions & 2 deletions src/services/DataSourceService.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import errorResponse from "http-errors"
import httpStatus from "http-status";
import { IConnector } from "../models/IngestionModels";
import { Datasources } from "../helpers/Datasources";
import _ from "lodash";

export class DataSourceService {
private connector: any;
Expand All @@ -31,8 +32,8 @@ export class DataSourceService {
}
public read = (req: Request, res: Response, next: NextFunction) => {
this.connector.execute("read", { "table": 'datasources', "fields": { "filters": { "id": req.params.datasourceId } } })
.then((data: any) => {
ResponseHandler.successResponse(req, res, { status: 200, data: data[0] })
.then((data: any[]) => {
ResponseHandler.successResponse(req, res, { status: 200, data: _.first(data) })
}).catch((error: any) => {
next(errorResponse(httpStatus.INTERNAL_SERVER_ERROR, error.message))
});
Expand All @@ -55,4 +56,12 @@ export class DataSourceService {
next(errorResponse(httpStatus.INTERNAL_SERVER_ERROR, error.message))
}
}
public publish = (req: Request, res: Response, next: NextFunction) => {
this.connector.execute("update", { "table": 'datasources', "fields": { "filters": { "id": req.params.datasourceId }, "values": { "status": "LIVE", "updated_date": new Date, "published_date": new Date } } })
.then(() => {
ResponseHandler.successResponse(req, res, { status: 200, data: { "message": constants.CONFIG.DATASOURCE_PUBLISHED, "dataset_id": req.body.id } })
}).catch((error: any) => {
next(errorResponse(httpStatus.INTERNAL_SERVER_ERROR, error.message))
});
}
}
18 changes: 12 additions & 6 deletions src/services/DatasetService.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,6 @@ import _ from 'lodash'
import { Datasets } from "../helpers/Datasets";
import { IConnector } from "../models/IngestionModels";

const responseHandler = ResponseHandler;

export class DatasetService {
private dbConnector: IConnector;
constructor(dbConnector: IConnector) {
Expand All @@ -30,17 +28,17 @@ export class DatasetService {
}
public update = (req: Request, res: Response, next: NextFunction) => {
const dataset = new Datasets(req.body)
this.dbConnector.execute("update", { "table": 'datasets', "fields": { "filters": { "id": req.body.id }, "values": dataset.setValues() } })
this.dbConnector.execute("update", { "table": 'datasets', "fields": { "filters": { "id": req.body.id }, "values": dataset.getValues() } })
.then(() => {
ResponseHandler.successResponse(req, res, { status: 200, data: { "message": constants.CONFIG.DATASET_UPDATED, "dataset_id": req.body.id } })
}).catch((error: any) => {
next(errorResponse(httpStatus.INTERNAL_SERVER_ERROR, error.message))
});
}
public read = (req: Request, res: Response, next: NextFunction) => {
this.dbConnector.execute("read", { "table": 'datasets', "fields": { "filters": {"id": req.params.datasetId} } })
.then((data: any) => {
ResponseHandler.successResponse(req, res, { status: 200, data: data[0] })
this.dbConnector.execute("read", { "table": 'datasets', "fields": { "filters": { "id": req.params.datasetId } } })
.then((data: any[]) => {
ResponseHandler.successResponse(req, res, { status: 200, data: _.first(data) })
}).catch((error: any) => {
next(errorResponse(httpStatus.INTERNAL_SERVER_ERROR, error.message))
});
Expand All @@ -62,4 +60,12 @@ export class DatasetService {
next(errorResponse(httpStatus.INTERNAL_SERVER_ERROR, error.message))
}
}
public publish = (req: Request, res: Response, next: NextFunction) => {
this.dbConnector.execute("update", { "table": 'datasets', "fields": { "filters": { "id": req.params.datasetId }, "values": { "status": "LIVE", "updated_date": new Date, "published_date": new Date } } })
.then(() => {
ResponseHandler.successResponse(req, res, { status: 200, data: { "message": constants.CONFIG.DATASET_PUBLISHED, "dataset_id": req.body.id } })
}).catch((error: any) => {
next(errorResponse(httpStatus.INTERNAL_SERVER_ERROR, error.message))
});
}
}
4 changes: 2 additions & 2 deletions src/services/IngestorService.ts
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,8 @@ export class IngestorService {
})
}
private getDatasetId(req: Request) {
let datasetId = req.params.datasetId
let datasetId = req.params.datasetId.trim()
if (!_.isEmpty(datasetId)) return datasetId
throw new Error("dataset parameter in url cannot be empty")
throw new Error("datasetId parameter in url cannot be empty")
}
}
3 changes: 2 additions & 1 deletion src/services/ValidationService.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ export class ValidationService {
status.isValid ?
next()
: next(errorResponse((httpStatus.BAD_REQUEST, status?.message || "")))
}
};


}

0 comments on commit 8eb49b0

Please sign in to comment.