diff --git a/package.json b/package.json
index 67806639..b3481c6e 100644
--- a/package.json
+++ b/package.json
@@ -19,6 +19,7 @@
     "http-status": "^1.5.3",
     "json-merger": "^1.1.9",
     "kafkajs": "^2.2.3",
+    "kafkajs-snappy": "^1.1.0",
     "knex": "^2.4.2",
     "lodash": "^4.17.21",
     "moment": "^2.29.4",
diff --git a/src/configs/Config.ts b/src/configs/Config.ts
index 327d839d..e245a8f4 100644
--- a/src/configs/Config.ts
+++ b/src/configs/Config.ts
@@ -12,7 +12,7 @@ export const config = {
     "kafka": {
         "config": {
             "brokers": [`${process.env.kafka_host || 'localhost'}:${process.env.kafka_port || 9092}`],
-            "clientId": process.env.clientId || "obsrv-apis",
+            "clientId": process.env.client_id || "obsrv-apis",
             "retry": {
                 "initialRetryTime": process.env.kafka_initial_retry_time ? parseInt(process.env.kafka_initial_retry_time) : 3000,
                 "retries": process.env.kafka_retries ? parseInt(process.env.kafka_retries) : 5
diff --git a/src/configs/RoutesConfig.ts b/src/configs/RoutesConfig.ts
index 6be6dd45..1635bada 100644
--- a/src/configs/RoutesConfig.ts
+++ b/src/configs/RoutesConfig.ts
@@ -40,6 +40,11 @@ export const routesConfig = {
             api_id: "obsrv.config.dataset.preset",
             method: "get",
             path: "/obsrv/v1/datasets/preset"
+        },
+        publish: {
+            api_id: "obsrv.config.dataset.publish",
+            method: "get",
+            path: "/obsrv/v1/datasets/publish/:datasetId"
         }
     },
     datasource: {
@@ -67,6 +72,11 @@
             api_id: "obsrv.config.datasource.preset",
             method: "get",
             path: "/obsrv/v1/datasources/preset"
+        },
+        publish: {
+            api_id: "obsrv.config.datasource.publish",
+            method: "get",
+            path: "/obsrv/v1/datasources/publish/:datasourceId"
         }
     }
 },
diff --git a/src/connectors/DbConnector.ts b/src/connectors/DbConnector.ts
index fec054a7..4b0b1efe 100644
--- a/src/connectors/DbConnector.ts
+++ b/src/connectors/DbConnector.ts
@@ -47,13 +47,14 @@ export class DbConnector implements IConnector {
         })
     }
 
-    private readRecord(table: string, fields: any) {
+    private async readRecord(table: string, fields: any) {
         const query = this.pool.from(table).select().where(fields.filters)
         const { offset, limit } = fields
         if (offset && limit) {
            return query.offset(offset).limit(limit)
         }
-        return query
+        const fetchedRecords = await query
+        return fetchedRecords.length > 0 ? fetchedRecords : (() => { throw new Error('No records found') })()
     }
 }
\ No newline at end of file
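Note on the `DbConnector` change above: `readRecord` now awaits the knex query and rejects with `No records found` on an empty result set, while the paginated branch (`offset`/`limit`) still resolves with whatever the query returns, including an empty array. Callers therefore need a rejection path. A minimal sketch of the new contract, assuming the `dbConnector` instance exported from `src/routes/Router.ts` (table name and error handling here are illustrative):

```ts
// Sketch: reading through DbConnector.execute("read", ...) after this change.
import { dbConnector } from "./src/routes/Router";

async function readDataset(id: string) {
    try {
        const rows = await dbConnector.execute("read", { table: "datasets", fields: { filters: { id } } });
        return rows[0];
    } catch (err) {
        // readRecord now throws Error('No records found') when the filter matches nothing
        console.error((err as Error).message);
        return undefined;
    }
}
```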
diff --git a/src/connectors/KafkaConnector.ts b/src/connectors/KafkaConnector.ts
index be7af1a4..995b42dd 100644
--- a/src/connectors/KafkaConnector.ts
+++ b/src/connectors/KafkaConnector.ts
@@ -1,5 +1,7 @@
-import { Kafka, Producer, KafkaConfig, CompressionTypes } from 'kafkajs'
+import { Kafka, Producer, KafkaConfig, CompressionTypes, CompressionCodecs } from 'kafkajs'
 import { IConnector } from "../models/IngestionModels"
+const SnappyCodec = require('kafkajs-snappy')
+CompressionCodecs[CompressionTypes.Snappy] = SnappyCodec
 
 export class KafkaConnector implements IConnector {
     private kafka: Kafka;
@@ -11,17 +13,19 @@ export class KafkaConnector implements IConnector {
         })
     }
     async connect() {
-        return await this.producer.connect()
+        return await this.producer.connect()
     }
     async execute(topic: string, config: any) {
         // TODO: Handle acks (which should be 3 for durability) and compression types
         return await this.producer.send({
             topic: topic,
-            messages: [{
-                value: config.value
-            }],
-            compression: CompressionTypes.Snappy
-        })
+            messages: [
+                {
+                    value: config.value,
+                },
+            ],
+            compression: CompressionTypes.Snappy,
+        });
     }
     async close() {
         return await this.producer.disconnect()
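kafkajs resolves compression through its module-level `CompressionCodecs` registry, so the one-time assignment above (executed at import) covers every producer and consumer in the process; without it, a Snappy `send()` fails because kafkajs ships no Snappy implementation of its own. A standalone sketch of the same wiring (broker address and topic are illustrative):

```ts
// Sketch: registering the Snappy codec before producing, mirroring the diff above.
import { Kafka, CompressionTypes, CompressionCodecs } from "kafkajs";
const SnappyCodec = require("kafkajs-snappy");

// Process-wide registration; must run before the first Snappy send()
CompressionCodecs[CompressionTypes.Snappy] = SnappyCodec;

const kafka = new Kafka({ clientId: "obsrv-apis", brokers: ["localhost:9092"] });
const producer = kafka.producer();

export async function send(topic: string, value: string) {
    await producer.connect();
    await producer.send({
        topic,
        messages: [{ value }],
        compression: CompressionTypes.Snappy,
    });
    await producer.disconnect();
}
```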
diff --git a/src/helpers/Datasets.ts b/src/helpers/Datasets.ts
index 9bee77b9..f45ecbab 100644
--- a/src/helpers/Datasets.ts
+++ b/src/helpers/Datasets.ts
@@ -5,6 +5,7 @@ import { SchemaMerger } from '../generators/SchemaMerger'
 let schemaMerger = new SchemaMerger()
 export class Datasets {
     private id: string
+    private dataset_name: string
     private validation_config: ValidationConfig
     private extraction_config: ExtractionConfig
     private dedup_config: DedupConfig
@@ -14,9 +15,11 @@ export class Datasets {
     private status: string
     private created_by: string
     private updated_by: string
+    private published_date: Date
 
     constructor(payload: any) {
         this.id = payload.id
+        this.dataset_name = payload.dataset_name
         this.validation_config = payload.validation_config
         this.extraction_config = payload.extraction_config
         this.dedup_config = payload.dedup_config
@@ -26,10 +29,11 @@ export class Datasets {
         this.status = payload.status
         this.created_by = payload.created_by
         this.updated_by = payload.updated_by
+        this.published_date = payload.published_date
     }
 
     public getValues() {
-        return Object.assign(this.removeNullValues({ id: this.id, validation_config: this.validation_config, extraction_config: this.extraction_config, dedup_config: this.dedup_config, data_schema: this.data_schema, router_config: this.router_config, denorm_config: this.denorm_config, status: this.status, created_by: this.created_by, updated_by: this.updated_by }), { "updated_date": new Date })
+        return Object.assign(this.removeNullValues({ id: this.id, dataset_name: this.dataset_name, validation_config: this.validation_config, extraction_config: this.extraction_config, dedup_config: this.dedup_config, data_schema: this.data_schema, router_config: this.router_config, denorm_config: this.denorm_config, status: this.status, created_by: this.created_by, updated_by: this.updated_by, published_date: this.published_date }), { "updated_date": new Date })
     }
 
     public setValues() {
diff --git a/src/helpers/Datasources.ts b/src/helpers/Datasources.ts
index 64a9bbbb..3c341d91 100644
--- a/src/helpers/Datasources.ts
+++ b/src/helpers/Datasources.ts
@@ -14,6 +14,7 @@ export class Datasources {
     private status: string
     private created_by: string
     private updated_by: string
+    private published_date: Date
 
     constructor(payload: any) {
         this.dataset_id = payload.dataset_id
@@ -26,9 +27,10 @@ export class Datasources {
         this.status = payload.status
         this.created_by = payload.created_by
         this.updated_by = payload.updated_by
+        this.published_date = payload.published_date
     }
 
     public getValues() {
-        return Object.assign(this.removeNullValues({ id: this.getDataSourceId(), dataset_id: this.dataset_id, ingestion_spec: this.ingestion_spec, datasource: this.datasource, retention_period: this.retention_period, archival_policy: this.archival_policy, purge_policy: this.purge_policy, backup_config: this.backup_config, status: this.status, created_by: this.created_by, updated_by: this.updated_by }), { "updated_date": new Date })
+        return Object.assign(this.removeNullValues({ id: this.getDataSourceId(), dataset_id: this.dataset_id, ingestion_spec: this.ingestion_spec, datasource: this.datasource, retention_period: this.retention_period, archival_policy: this.archival_policy, purge_policy: this.purge_policy, backup_config: this.backup_config, status: this.status, created_by: this.created_by, updated_by: this.updated_by, published_date: this.published_date }), { "updated_date": new Date })
     }
 
     public setValues() {
diff --git a/src/resources/Constants.json b/src/resources/Constants.json
index 6b18b665..124743b3 100644
--- a/src/resources/Constants.json
+++ b/src/resources/Constants.json
@@ -16,6 +16,8 @@
         "DATASET_SAVED":"The dataset configuration has been saved successfully",
         "DATASET_UPDATED":"The dataset configuration has been updated successfully",
         "DATASOURCE_SAVED": "The datasource configuration has been saved successfully",
-        "DATASOURCE_UPDATED": "The datasource configuration has been updated successfully"
+        "DATASOURCE_UPDATED": "The datasource configuration has been updated successfully",
+        "DATASET_PUBLISHED": "The dataset configuration has been published successfully",
+        "DATASOURCE_PUBLISHED": "The datasource configuration has been published successfully"
     }
 }
\ No newline at end of file
diff --git a/src/resources/schemas/DatasetConfigDefault.json b/src/resources/schemas/DatasetConfigDefault.json
index 930bfed8..005ca866 100644
--- a/src/resources/schemas/DatasetConfigDefault.json
+++ b/src/resources/schemas/DatasetConfigDefault.json
@@ -29,7 +29,7 @@
     "router_config": {
         "topic": ""
     },
-    "status": "ACTIVE",
+    "status": "DRAFT",
     "created_by": "SYSTEM",
     "updated_by": "SYSTEM"
 }
\ No newline at end of file
diff --git a/src/resources/schemas/DatasetSaveReq.json b/src/resources/schemas/DatasetSaveReq.json
index e807850d..56a9d23f 100644
--- a/src/resources/schemas/DatasetSaveReq.json
+++ b/src/resources/schemas/DatasetSaveReq.json
@@ -4,6 +4,9 @@
         "id": {
             "type": "string"
         },
+        "dataset_name": {
+            "type": "string"
+        },
         "extraction_config": {
             "type": "object",
             "properties": {
@@ -89,6 +92,9 @@
         },
         "updated_by": {
             "type": "string"
+        },
+        "published_date": {
+            "type": "string"
         }
     },
     "required": [
diff --git a/src/resources/schemas/DatasourceConfigDefault.json b/src/resources/schemas/DatasourceConfigDefault.json
index 3ba955ef..09e2fb80 100644
--- a/src/resources/schemas/DatasourceConfigDefault.json
+++ b/src/resources/schemas/DatasourceConfigDefault.json
@@ -1,15 +1,9 @@
 {
     "retention_period": {},
-    "archival_policy": {
-        "enabled": false
-    },
-    "purge_policy": {
-        "enabled": false
-    },
-    "backup_config": {
-        "enabled": false
-    },
-    "status": "ACTIVE",
+    "archival_policy": {},
+    "purge_policy": {},
+    "backup_config": {},
+    "status": "DRAFT",
     "created_by": "SYSTEM",
     "updated_by": "SYSTEM"
 }
\ No newline at end of file
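These helper and schema changes move both entities to a draft-first lifecycle: new records now default to `status: "DRAFT"`, and the new publish endpoints (below) flip the row to `LIVE` while stamping `published_date`. Note also that both `getValues()` implementations pass their fields through `removeNullValues`, so a save request without `published_date` never writes that column. A rough standalone illustration of that pattern (the helper is reimplemented here for the sketch; the real field list is longer):

```ts
// Sketch of the removeNullValues + Object.assign pattern in Datasets/Datasources.getValues().
const removeNullValues = (obj: Record<string, unknown>) =>
    Object.fromEntries(Object.entries(obj).filter(([, v]) => v !== null && v !== undefined));

const draft = { id: "telemetry", dataset_name: "Telemetry Events", published_date: undefined, status: "DRAFT" };
console.log(Object.assign(removeNullValues(draft), { updated_date: new Date() }));
// -> { id: 'telemetry', dataset_name: 'Telemetry Events', status: 'DRAFT', updated_date: ... }
```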
diff --git a/src/resources/schemas/DatasourceSaveReq.json b/src/resources/schemas/DatasourceSaveReq.json
index 45a014ea..b318f165 100644
--- a/src/resources/schemas/DatasourceSaveReq.json
+++ b/src/resources/schemas/DatasourceSaveReq.json
@@ -1,6 +1,9 @@
 {
     "type": "object",
     "properties": {
+        "id": {
+            "type": "string"
+        },
         "dataset_id": {
             "type": "string"
         },
@@ -30,6 +33,9 @@
         },
         "updated_by": {
             "type": "string"
+        },
+        "published_date": {
+            "type": "string"
         }
     },
     "required": [
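For reference, a body the updated `DatasourceSaveReq` schema would now accept; `id` and `published_date` are the newly allowed properties, all values are illustrative, and the schema's `required` list (not shown in the hunk) is unchanged:

```ts
// Illustrative datasource save payload under the updated schema.
const datasourceSaveRequest = {
    id: "telemetry-events_telemetry",   // hypothetical datasource identifier (new optional field)
    dataset_id: "telemetry-events",
    datasource: "telemetry-events",
    ingestion_spec: {},                 // druid ingestion spec elided
    retention_period: {},
    // published_date is accepted by the schema but is normally set server-side on publish
};
```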
diff --git a/src/routes/Router.ts b/src/routes/Router.ts
index e778f3e3..ea705dc2 100644
--- a/src/routes/Router.ts
+++ b/src/routes/Router.ts
@@ -15,13 +15,13 @@
 const validationService = new ValidationService();
 const queryService = new QueryService(new HTTPConnector(`${config.query_api.druid.host}:${config.query_api.druid.port}`));
 
-export const kafkaConnector = new KafkaConnector(config.dataset_api.kafka.config)
+export const kafkaConnector = new KafkaConnector(config.dataset_api.kafka.config);
 
-export const dbConnector = new DbConnector(config.db_connector_config)
+export const dbConnector = new DbConnector(config.db_connector_config);
 
 export const datasourceService = new DataSourceService(dbConnector);
 export const datasetService = new DatasetService(dbConnector);
-export const ingestorService = new IngestorService(kafkaConnector)
+export const ingestorService = new IngestorService(kafkaConnector);
 
 const router = express.Router();
 
@@ -35,15 +35,19 @@ router.post(`${routesConfig.data_ingest.path}`, ResponseHandler.setApiId(routesC
 /** Dataset APIs */
 router.post(`${routesConfig.config.dataset.save.path}`, ResponseHandler.setApiId(routesConfig.config.dataset.save.api_id), validationService.validateRequestBody, datasetService.save);
 router.patch(`${routesConfig.config.dataset.update.path}`, ResponseHandler.setApiId(routesConfig.config.dataset.update.api_id), validationService.validateRequestBody, datasetService.update);
+router.get(`${routesConfig.config.dataset.preset.path}`, ResponseHandler.setApiId(routesConfig.config.dataset.preset.api_id), datasetService.preset);
 router.get(`${routesConfig.config.dataset.read.path}`, ResponseHandler.setApiId(routesConfig.config.dataset.read.api_id), datasetService.read);
 router.post(`${routesConfig.config.dataset.list.path}`, ResponseHandler.setApiId(routesConfig.config.dataset.list.api_id), validationService.validateRequestBody, datasetService.list);
-router.get(`${routesConfig.config.dataset.preset.path}`, ResponseHandler.setApiId(routesConfig.config.dataset.preset.api_id), datasetService.preset)
+router.get(`${routesConfig.config.dataset.publish.path}`, ResponseHandler.setApiId(routesConfig.config.dataset.publish.api_id), datasetService.publish);
+
 
 /** DataSource API(s) */
 router.post(`${routesConfig.config.datasource.save.path}`, ResponseHandler.setApiId(routesConfig.config.datasource.save.api_id), validationService.validateRequestBody, datasourceService.save);
 router.patch(`${routesConfig.config.datasource.update.path}`, ResponseHandler.setApiId(routesConfig.config.datasource.update.api_id), validationService.validateRequestBody, datasourceService.update);
+router.get(`${routesConfig.config.datasource.preset.path}`, ResponseHandler.setApiId(routesConfig.config.datasource.preset.api_id), datasourceService.preset);
 router.get(`${routesConfig.config.datasource.read.path}`, ResponseHandler.setApiId(routesConfig.config.datasource.read.api_id), datasourceService.read);
 router.post(`${routesConfig.config.datasource.list.path}`, ResponseHandler.setApiId(routesConfig.config.datasource.list.api_id), validationService.validateRequestBody, datasourceService.list);
-router.get(`${routesConfig.config.datasource.preset.path}`, ResponseHandler.setApiId(routesConfig.config.datasource.preset.api_id), datasourceService.preset)
+router.get(`${routesConfig.config.datasource.publish.path}`, ResponseHandler.setApiId(routesConfig.config.datasource.publish.api_id), datasourceService.publish);
+
 
 export { router };
\ No newline at end of file
diff --git a/src/services/DataSourceService.ts b/src/services/DataSourceService.ts
index 29062915..d7b1940e 100644
--- a/src/services/DataSourceService.ts
+++ b/src/services/DataSourceService.ts
@@ -5,6 +5,7 @@ import errorResponse from "http-errors"
 import httpStatus from "http-status";
 import { IConnector } from "../models/IngestionModels";
 import { Datasources } from "../helpers/Datasources";
+import _ from "lodash";
 
 export class DataSourceService {
     private connector: any;
@@ -31,8 +32,8 @@ export class DataSourceService {
     }
     public read = (req: Request, res: Response, next: NextFunction) => {
         this.connector.execute("read", { "table": 'datasources', "fields": { "filters": { "id": req.params.datasourceId } } })
-            .then((data: any) => {
-                ResponseHandler.successResponse(req, res, { status: 200, data: data[0] })
+            .then((data: any[]) => {
+                ResponseHandler.successResponse(req, res, { status: 200, data: _.first(data) })
             }).catch((error: any) => {
                 next(errorResponse(httpStatus.INTERNAL_SERVER_ERROR, error.message))
             });
@@ -55,4 +56,12 @@ export class DataSourceService {
             next(errorResponse(httpStatus.INTERNAL_SERVER_ERROR, error.message))
         }
     }
+    public publish = (req: Request, res: Response, next: NextFunction) => {
+        this.connector.execute("update", { "table": 'datasources', "fields": { "filters": { "id": req.params.datasourceId }, "values": { "status": "LIVE", "updated_date": new Date, "published_date": new Date } } })
+            .then(() => {
+                ResponseHandler.successResponse(req, res, { status: 200, data: { "message": constants.CONFIG.DATASOURCE_PUBLISHED, "datasource_id": req.params.datasourceId } })
+            }).catch((error: any) => {
+                next(errorResponse(httpStatus.INTERNAL_SERVER_ERROR, error.message))
+            });
+    }
 }
\ No newline at end of file
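Note that the publish handler originally echoed `req.body.id` as `dataset_id`; since these routes are GETs with no body, the response above is corrected to return `req.params.datasourceId` as `datasource_id`. More broadly, a GET that mutates state is unconventional; a POST or PATCH may be worth considering. A sketch of invoking the new endpoint (base URL is an assumption, not from the diff; response envelope abbreviated):

```ts
// Sketch: calling the new datasource publish endpoint.
async function publishDatasource(datasourceId: string) {
    const res = await fetch(`http://localhost:3000/obsrv/v1/datasources/publish/${encodeURIComponent(datasourceId)}`);
    if (!res.ok) throw new Error(`publish failed with HTTP ${res.status}`);
    // On success the payload carries the DATASOURCE_PUBLISHED message from Constants.json
    return res.json();
}
```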
{"id": req.params.datasetId} } }) - .then((data: any) => { - ResponseHandler.successResponse(req, res, { status: 200, data: data[0] }) + this.dbConnector.execute("read", { "table": 'datasets', "fields": { "filters": { "id": req.params.datasetId } } }) + .then((data: any[]) => { + ResponseHandler.successResponse(req, res, { status: 200, data: _.first(data) }) }).catch((error: any) => { next(errorResponse(httpStatus.INTERNAL_SERVER_ERROR, error.message)) }); @@ -62,4 +60,12 @@ export class DatasetService { next(errorResponse(httpStatus.INTERNAL_SERVER_ERROR, error.message)) } } + public publish = (req: Request, res: Response, next: NextFunction) => { + this.dbConnector.execute("update", { "table": 'datasets', "fields": { "filters": { "id": req.params.datasetId }, "values": { "status": "LIVE", "updated_date": new Date, "published_date": new Date } } }) + .then(() => { + ResponseHandler.successResponse(req, res, { status: 200, data: { "message": constants.CONFIG.DATASET_PUBLISHED, "dataset_id": req.body.id } }) + }).catch((error: any) => { + next(errorResponse(httpStatus.INTERNAL_SERVER_ERROR, error.message)) + }); + } } \ No newline at end of file diff --git a/src/services/IngestorService.ts b/src/services/IngestorService.ts index eddec36d..80b52007 100644 --- a/src/services/IngestorService.ts +++ b/src/services/IngestorService.ts @@ -39,8 +39,8 @@ export class IngestorService { }) } private getDatasetId(req: Request) { - let datasetId = req.params.datasetId + let datasetId = req.params.datasetId.trim() if (!_.isEmpty(datasetId)) return datasetId - throw new Error("dataset parameter in url cannot be empty") + throw new Error("datasetId parameter in url cannot be empty") } } \ No newline at end of file diff --git a/src/services/ValidationService.ts b/src/services/ValidationService.ts index 7c31ea48..967c0afe 100644 --- a/src/services/ValidationService.ts +++ b/src/services/ValidationService.ts @@ -27,6 +27,7 @@ export class ValidationService { status.isValid ? next() : next(errorResponse((httpStatus.BAD_REQUEST, status?.message || ""))) - } + }; + }