Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

1.0.1-GA to Main (#83) #84

Merged
merged 2 commits into from
Jan 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view

Large diffs are not rendered by default.

538 changes: 538 additions & 0 deletions api-service/redoc-static.html

Large diffs are not rendered by default.

6 changes: 6 additions & 0 deletions api-service/src/configs/RoutesConfig.ts
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,12 @@ export const routesConfig = {
path: "/obsrv/v1/data/create/:datasetId",
validation_schema: "DataIngestionReq.json",
},
tenant_ingest: {
api_id: "dataset.data.in",
method: "post",
path: "/data/tenant/in/:datasetId",
validation_schema: "DataIngestionReq.json",
},
exhaust: {
api_id: "obsrv.dataset.data.exhaust",
method: "get",
Expand Down
21 changes: 21 additions & 0 deletions api-service/src/helpers/Datasets.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ import { defaultConfig } from '../resources/schemas/DatasetConfigDefault'
import { SchemaMerger } from '../generators/SchemaMerger'
import { config } from '../configs/Config'
import { DatasetStatus } from '../models/DatasetModels'
import constants from "../resources/Constants.json";

let schemaMerger = new SchemaMerger()
export class Datasets {
private id: string
Expand Down Expand Up @@ -47,10 +49,12 @@ export class Datasets {
}

public getValues() {
this.validateDenormConfig();
return Object.assign(this.removeNullValues({ id: this.id, dataset_id: this.dataset_id, type: this.type, name: this.name, validation_config: this.validation_config, extraction_config: this.extraction_config, dedup_config: this.dedup_config, data_schema: this.data_schema, router_config: this.router_config, denorm_config: this.denorm_config, dataset_config: this.dataset_config, tags: this.tags, status: this.status, created_by: this.created_by, updated_by: this.updated_by, published_date: this.published_date }), { "updated_date": new Date })
}

public setValues() {
this.validateDenormConfig();
return schemaMerger.mergeSchema(this.getDefaults(), this.getValues())
}

Expand All @@ -69,4 +73,21 @@ export class Datasets {
return {...defaultConfig.dataset}
}
}

private validateDenormConfig() {
if (this.denorm_config && _.has(this.denorm_config, 'denorm_fields')) {
let duplicatesExist = false;
let denormFields: any = _.get(this.denorm_config, 'denorm_fields', []);
denormFields = _.map(denormFields, (denormField: Record<string, string | number>) => _.get(denormField, 'denorm_out_field'));
denormFields.map(
(denormField: string | number) => {
if(_.indexOf(denormFields, denormField) !== _.lastIndexOf(denormFields, denormField))
duplicatesExist = true;
}
);
if(duplicatesExist) {
throw constants.DUPLICATE_DENORM_FIELD;
}
}
}
}
5 changes: 5 additions & 0 deletions api-service/src/resources/Constants.json
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,11 @@
"status": 404,
"code": "NOT_FOUND"
},
"DUPLICATE_DENORM_FIELD": {
"message": "Duplicate found for denorm output key",
"status": 400,
"code": "BAD_REQUEST"
},
"INGESTION_SUBMITTED": "ingestion spec has been submitted successfully",
"INGESTION_FAILED_ON_SAVE": "Failed to submit Ingestion Spec, record is not saved"
}
1 change: 1 addition & 0 deletions api-service/src/routes/Router.ts
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ router.post([`${routesConfig.query.sql_query.path}`, `${routesConfig.query.sql_q

/** Ingestor API */
router.post(`${routesConfig.data_ingest.path}`, ResponseHandler.setApiId(routesConfig.data_ingest.api_id), telemetryAuditStart({ action: telemetryActions.ingestEvents, operationType: OperationType.CREATE }), onRequest({ entity: promEntities.data_in }), validationService.validateRequestBody, ingestorService.create);
router.post(`${routesConfig.tenant_ingest.path}`, ResponseHandler.setApiId(routesConfig.tenant_ingest.api_id), telemetryAuditStart({ action: telemetryActions.ingestEvents, operationType: OperationType.CREATE }), onRequest({ entity: promEntities.data_in }), validationService.validateRequestBody, ingestorService.tenant);

/** Dataset APIs */
router.post(`${routesConfig.config.dataset.save.path}`, ResponseHandler.setApiId(routesConfig.config.dataset.save.api_id), telemetryAuditStart({ action: telemetryActions.createDataset, operationType: OperationType.CREATE }), validationService.validateRequestBody, datasetService.save);
Expand Down
14 changes: 14 additions & 0 deletions api-service/src/services/IngestorService.ts
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,20 @@ export class IngestorService {
ResponseHandler.successResponse(req, res, { status: 200, data: { message: constants.DATASET.CREATED } });
} catch (error: any) { this.errorHandler.handleError(req, res, next, error, false) }
}

public tenant = async (req: Request, res: Response, next: NextFunction) => {
try {
let datasetId = this.getDatasetId(req);
const tenantId = _.get(req.headers, 'x-tenant-id', "default");
datasetId = `${tenantId}-${datasetId}`;
const validData = await this.validateData(req.body.data, datasetId);
req.body = { ...req.body.data, dataset: datasetId };
const topic = await this.getTopic(datasetId);
await this.kafkaConnector.execute(req, res, topic);
ResponseHandler.successResponse(req, res, { status: 200, data: { message: constants.DATASET.CREATED } });
} catch (error: any) { this.errorHandler.handleError(req, res, next, error, false) }
}

public submitIngestion = async (req: Request, res: Response, next: NextFunction) => {
try {
await wrapperService.submitIngestion(req.body)
Expand Down
2 changes: 1 addition & 1 deletion api-service/src/services/telemetry.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ export enum OperationType { CREATE = 1, UPDATE, PUBLISH, RETIRE, LIST, GET }

const kafka = new Kafka({ clientId: telemetryTopic, brokers: brokerServers });
const telemetryEventsProducer = kafka.producer();
telemetryEventsProducer.connect();
telemetryEventsProducer.connect().catch(err => console.error("Unable to connect to kafka", err.message));

const getDefaults = () => {
return {
Expand Down
42 changes: 40 additions & 2 deletions api-service/src/test/DatasetTestService.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ describe("Dataset create API", () => {
res.body.params.status.should.be.eq(constants.STATUS.FAILURE)
done();
});
})
});
it("should not insert record when given invalid schema", (done) => {
chai
.request(app)
Expand All @@ -123,7 +123,45 @@ describe("Dataset create API", () => {
res.body.params.status.should.be.eq(constants.STATUS.FAILURE)
done();
});
})
});
it("should not insert the record when there's a duplicate denorm out field", (done) => {
chai.spy.on(dbConnector, "execute", () => {
return Promise.resolve([])
})
chai
.request(app)
.post(config.apiDatasetSaveEndPoint)
.send(TestDataset.DUPLICATE_DENORM_OUT_FIELD)
.end((err, res) => {
res.should.have.status(httpStatus.BAD_REQUEST);
res.body.should.be.a("object")
res.body.responseCode.should.be.eq(httpStatus["400_NAME"]);
res.body.should.have.property("result");
res.body.id.should.be.eq(routesConfig.config.dataset.save.api_id);
res.body.params.status.should.be.eq(constants.STATUS.FAILURE);
chai.spy.restore(dbConnector, "execute");
done();
});
});
it("should insert the record when there's no duplicate denorm out field", (done) => {
chai.spy.on(dbConnector, "execute", () => {
return Promise.resolve([])
})
chai
.request(app)
.post(config.apiDatasetSaveEndPoint)
.send(TestDataset.VALID_DENORM_OUT_FIELD)
.end((err, res) => {
res.should.have.status(httpStatus.OK);
res.body.should.be.a("object")
res.body.responseCode.should.be.eq(httpStatus["200_NAME"]);
res.body.should.have.property("result");
res.body.id.should.be.eq(routesConfig.config.dataset.save.api_id);
res.body.params.status.should.be.eq(constants.STATUS.SUCCESS);
chai.spy.restore(dbConnector, "execute");
done();
});
});
})
describe("Dataset update API", () => {
beforeEach(() => {
Expand Down
4 changes: 3 additions & 1 deletion api-service/src/test/Fixtures.ts
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,9 @@ class TestDataset {
public static VALID_LIST_REQUEST_ACTIVE_STATUS = { "filters": { "status": [ DatasetStatus.Live ] } };
public static VALID_LIST_REQUEST_DISABLED_STATUS = { "filters": { "status": [ DatasetStatus.Retired ] } };
public static MISSING_REQUIRED_FIELDS_UPDATE = { "name": "telemetry-raw", "data_schema": { "type": "object", "properties": { "eid": { "type": "string" }, "ver": { "type": "string" }, "syncts": { "type": "integer" }, "ets": { "type": "integer" }, "mid": { "type": "string" }, "actor": { "type": "object", "properties": { "id": { "type": "string" }, "type": { "type": "string" } }, "required": [ "id", "type" ] }, "edata": { "type": "object", "properties": { "type": { "type": "string" } }, "required": [ "type" ] }, "@timestamp": { "type": "string" }, "context": { "type": "object", "properties": { "pdata": { "type": "object", "properties": { "ver": { "type": "string" }, "id": { "type": "string" }, "pid": { "type": "string" } }, "required": [ "ver", "id", "pid" ] }, "did": { "type": "string" }, "env": { "type": "string" }, "channel": { "type": "string" } }, "required": [ "pdata", "did", "env", "channel" ] }, "@version": { "type": "string" }, "object": { "type": "object", "properties": { "id": { "type": "string" }, "type": { "type": "string" } }, "required": [ "id", "type" ] } }, "required": [ "eid", "ver", "syncts", "ets", "mid", "actor", "edata", "@timestamp", "context", "@version", "object" ] }, "router_config": { "topic": "router.topic" }, "status": DatasetStatus.Live, "published_date": "2023-03-14T04:46:33.459Z" };
public static VALID_RECORD = { "type": "master-dataset", "dataset_id": "3f8b2ba7-9c74-4d7f-8b38-2b0d460b999c", "id": "observations", "name": "telemetry-raw", " validation_config": { "validate": true, "mode": "Strict" }, "extraction_config": { "is_batch_event": false, "extraction_key": "", "dedup_config": { "drop_duplicates": true, "dedup_key": "id", "dedup_period": 3 } }, "dedup_config": { "drop_duplicates": true, "dedup_key": "id", "dedup_period": 3 }, "data_schema": { "type": "object", "properties": { "eid": { "type": "string" }, "ver": { "type": "string" }, "syncts": { "type": "integer" }, "ets": { "type": "integer" }, "mid": { "type": "string" }, "actor": { "type": "object", "properties": { "id": { "type": "string" }, "type": { "type": "string" } }, "required": [ "id", "type" ] }, "edata": { "type": "object", "properties": { "type": { "type": "string" } }, "required": [ "type" ] }, "@timestamp": { "type": "string" }, "context": { "type": "object", "properties": { "pdata": { "type": "object", "properties": { "ver": { "type": "string" }, "id": { "type": "string" }, "pid": { "type": "string" } }, "required": [ "ver", "id", "pid" ] }, "did": { "type": "string" }, "env": { "type": "string" }, "channel": { "type": "string" } }, "required": [ "pdata", "did", "env", "channel" ] }, "@version": { "type": "string" }, "object": { "type": "object", "properties": { "id": { "type": "string" }, "type": { "type": "string" } }, "required": [ "id", "type" ] } }, "required": [ "eid", "ver", "syncts", "ets", "mid", "actor", "edata", "@timestamp", "context", "@version", "object" ] }, "denorm_config": { "redis_db_host": "redis_host", "redis_db_port": "redis_port", "denorm_fields": { "denorm_key": "", "redis_db": 1, "denorm_out_field": "metadata" } }, "tags": [], "router_config": { "topic": "router.topic" }, "client_state": {}, "status": DatasetStatus.Live, "created_by": "SYSTEM", "updated_by": "SYSTEM", "created_date": "2023-03-13T07:46:06.410Z", "updated_date": "2023-03-14T04:46:33.459Z", "published_date": "2023-03-14T04:46:33.459Z" }
public static VALID_RECORD = { "type": "master-dataset", "dataset_id": "3f8b2ba7-9c74-4d7f-8b38-2b0d460b999c", "id": "observations", "name": "telemetry-raw", " validation_config": { "validate": true, "mode": "Strict" }, "extraction_config": { "is_batch_event": false, "extraction_key": "", "dedup_config": { "drop_duplicates": true, "dedup_key": "id", "dedup_period": 3 } }, "dedup_config": { "drop_duplicates": true, "dedup_key": "id", "dedup_period": 3 }, "data_schema": { "type": "object", "properties": { "eid": { "type": "string" }, "ver": { "type": "string" }, "syncts": { "type": "integer" }, "ets": { "type": "integer" }, "mid": { "type": "string" }, "actor": { "type": "object", "properties": { "id": { "type": "string" }, "type": { "type": "string" } }, "required": [ "id", "type" ] }, "edata": { "type": "object", "properties": { "type": { "type": "string" } }, "required": [ "type" ] }, "@timestamp": { "type": "string" }, "context": { "type": "object", "properties": { "pdata": { "type": "object", "properties": { "ver": { "type": "string" }, "id": { "type": "string" }, "pid": { "type": "string" } }, "required": [ "ver", "id", "pid" ] }, "did": { "type": "string" }, "env": { "type": "string" }, "channel": { "type": "string" } }, "required": [ "pdata", "did", "env", "channel" ] }, "@version": { "type": "string" }, "object": { "type": "object", "properties": { "id": { "type": "string" }, "type": { "type": "string" } }, "required": [ "id", "type" ] } }, "required": [ "eid", "ver", "syncts", "ets", "mid", "actor", "edata", "@timestamp", "context", "@version", "object" ] }, "denorm_config": { "redis_db_host": "redis_host", "redis_db_port": "redis_port", "denorm_fields": [{ "denorm_key": "", "redis_db": 1, "denorm_out_field": "metadata" }]}, "tags": [], "router_config": { "topic": "router.topic" }, "client_state": {}, "status": DatasetStatus.Live, "created_by": "SYSTEM", "updated_by": "SYSTEM", "created_date": "2023-03-13T07:46:06.410Z", "updated_date": "2023-03-14T04:46:33.459Z", "published_date": "2023-03-14T04:46:33.459Z" };
public static DUPLICATE_DENORM_OUT_FIELD = { "type": "master-dataset", "dataset_id": "3f8b2ba7-9c74-4d7f-8b38-2b0d460b999c", "id": "observations", "name": "telemetry-raw", " validation_config": { "validate": true, "mode": "Strict" }, "extraction_config": { "is_batch_event": false, "extraction_key": "", "dedup_config": { "drop_duplicates": true, "dedup_key": "id", "dedup_period": 3 } }, "dedup_config": { "drop_duplicates": true, "dedup_key": "id", "dedup_period": 3 }, "data_schema": { "type": "object", "properties": { "eid": { "type": "string" }, "ver": { "type": "string" }, "syncts": { "type": "integer" }, "ets": { "type": "integer" }, "mid": { "type": "string" }, "actor": { "type": "object", "properties": { "id": { "type": "string" }, "type": { "type": "string" } }, "required": [ "id", "type" ] }, "edata": { "type": "object", "properties": { "type": { "type": "string" } }, "required": [ "type" ] }, "@timestamp": { "type": "string" }, "context": { "type": "object", "properties": { "pdata": { "type": "object", "properties": { "ver": { "type": "string" }, "id": { "type": "string" }, "pid": { "type": "string" } }, "required": [ "ver", "id", "pid" ] }, "did": { "type": "string" }, "env": { "type": "string" }, "channel": { "type": "string" } }, "required": [ "pdata", "did", "env", "channel" ] }, "@version": { "type": "string" }, "object": { "type": "object", "properties": { "id": { "type": "string" }, "type": { "type": "string" } }, "required": [ "id", "type" ] } }, "required": [ "eid", "ver", "syncts", "ets", "mid", "actor", "edata", "@timestamp", "context", "@version", "object" ] }, "denorm_config": { "redis_db_host": "redis_host", "redis_db_port": "redis_port", "denorm_fields": [{ "denorm_key": "test", "redis_db": 1, "denorm_out_field": "metadata" }, { "denorm_key": "test", "redis_db": 1, "denorm_out_field": "metadata" }]}, "tags": [], "router_config": { "topic": "router.topic" }, "client_state": {}, "status": DatasetStatus.Live, "created_by": "SYSTEM", "updated_by": "SYSTEM", "created_date": "2023-03-13T07:46:06.410Z", "updated_date": "2023-03-14T04:46:33.459Z", "published_date": "2023-03-14T04:46:33.459Z" };
public static VALID_DENORM_OUT_FIELD = { "type": "master-dataset", "dataset_id": "3f8b2ba7-9c74-4d7f-8b38-2b0d460b91ac", "id": "observctions", "name": "telemetay-raw", " validation_config": { "validate": true, "mode": "Strict" }, "extraction_config": { "is_batch_event": false, "extraction_key": "", "dedup_config": { "drop_duplicates": true, "dedup_key": "id", "dedup_period": 3 } }, "dedup_config": { "drop_duplicates": true, "dedup_key": "id", "dedup_period": 3 }, "data_schema": { "type": "object", "properties": { "eid": { "type": "string" }, "ver": { "type": "string" }, "syncts": { "type": "integer" }, "ets": { "type": "integer" }, "mid": { "type": "string" }, "actor": { "type": "object", "properties": { "id": { "type": "string" }, "type": { "type": "string" } }, "required": [ "id", "type" ] }, "edata": { "type": "object", "properties": { "type": { "type": "string" } }, "required": [ "type" ] }, "@timestamp": { "type": "string" }, "context": { "type": "object", "properties": { "pdata": { "type": "object", "properties": { "ver": { "type": "string" }, "id": { "type": "string" }, "pid": { "type": "string" } }, "required": [ "ver", "id", "pid" ] }, "did": { "type": "string" }, "env": { "type": "string" }, "channel": { "type": "string" } }, "required": [ "pdata", "did", "env", "channel" ] }, "@version": { "type": "string" }, "object": { "type": "object", "properties": { "id": { "type": "string" }, "type": { "type": "string" } }, "required": [ "id", "type" ] } }, "required": [ "eid", "ver", "syncts", "ets", "mid", "actor", "edata", "@timestamp", "context", "@version", "object" ] }, "denorm_config": { "redis_db_host": "redis_host", "redis_db_port": "redis_port", "denorm_fields": [{ "denorm_key": "test", "redis_db": 1, "denorm_out_field": "metadata2" }, { "denorm_key": "test", "redis_db": 1, "denorm_out_field": "metadata" }]}, "tags": [], "router_config": { "topic": "router.topic" }, "client_state": {}, "status": DatasetStatus.Live, "created_by": "SYSTEM", "updated_by": "SYSTEM", "created_date": "2023-03-13T07:46:06.410Z", "updated_date": "2023-03-14T04:46:33.459Z", "published_date": "2023-03-14T04:46:33.459Z" };
}

class TestDataSource {
Expand Down
1 change: 1 addition & 0 deletions api-service/src/validators/RequestsValidator.ts
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ export class RequestsValidator implements IValidator {
routesConfig.query.native_query,
routesConfig.query.sql_query,
routesConfig.data_ingest,
routesConfig.tenant_ingest,
routesConfig.config.dataset.save,
routesConfig.config.datasource.save,
routesConfig.config.dataset.list,
Expand Down
6 changes: 3 additions & 3 deletions command-service/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,11 @@ COPY --from=ubuntu /usr/local/bin /usr/local/bin

RUN apk update && apk add curl jq && curl -LO "https://dl.k8s.io/release/$(curl -L -s https://dl.k8s.io/release/stable.txt)/bin/linux/amd64/kubectl" && chmod +x kubectl && mv kubectl /usr/local/bin/

RUN apk add libcrypto3=3.1.4-r0
RUN apk add libcrypto3=3.1.4-r3
RUN apk upgrade
WORKDIR /app
COPY command-service/requirements.txt .
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
COPY command-service/src ./src
COPY src ./src
WORKDIR /app/src
CMD [ "uvicorn", "routes:app", "--host", "0.0.0.0" ]