Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

#OBS-I77 : query api changes to check datasource availability v2 #245

Merged
merged 8 commits into from
Sep 17, 2024
35 changes: 27 additions & 8 deletions api-service/src/controllers/DataOut/QueryValidator.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,18 +4,19 @@ import * as _ from "lodash";
import moment from "moment";
import { getDatasourceList } from "../../services/DatasourceService";
import logger from "../../logger";
import { getDatasourceListFromDruid } from "../../connections/druidConnection";
import { druidHttpService, getDatasourceListFromDruid } from "../../connections/druidConnection";
import { apiId } from "./DataOutController";
import { ErrorObject } from "../../types/ResponseModel";
import { Parser } from "node-sql-parser";
import { obsrvError } from "../../types/ObsrvError";
const parser = new Parser();

const momentFormat = "YYYY-MM-DD HH:MM:SS";
let dataset_id: string;
let requestBody: any;
let msgid: string;
const errCode = {
notFound: "DATA_OUT_SOURCE_NOT_FOUND",
notFound: "DATASOURCE_NOT_FOUND",
invalidDateRange: "DATA_OUT_INVALID_DATE_RANGE"
}

Expand Down Expand Up @@ -155,23 +156,41 @@ const validateQueryRules = (queryPayload: any, limits: any) => {
: { message: "Invalid date range! the date range cannot be a null value", statusCode: 400, errCode: "BAD_REQUEST", code: errCode.invalidDateRange };
};

const getDataSourceRef = async (datasetId: string, granularity?: string) => {
const getDataSourceRef = async (datasetId: string, requestGranularity?: string) => {
const dataSources = await getDatasourceList(datasetId)
if (_.isEmpty(dataSources)) {
logger.error({ apiId, requestBody, msgid, dataset_id, message: `Datasource ${datasetId} not available in datasource live table`, code: errCode.notFound })
throw { message: `Datasource ${datasetId} not available for querying`, statusCode: 404, errCode: "NOT_FOUND", code: errCode.notFound } as ErrorObject;
}
const record = dataSources.filter((record: any) => {
const aggregatedRecord = _.get(record, "dataValues.metadata.aggregated")
if (granularity)
return aggregatedRecord && _.get(record, "dataValues.metadata.granularity") === granularity;
const record = dataSources.find((record: any) => {
const metadata = _.get(record, "dataValues.metadata", {});
const { aggregated, granularity } = metadata;
if (!aggregated) {
return true;
}
return aggregated && requestGranularity ? granularity === requestGranularity : false;
});
return record[0]?.dataValues?.datasource_ref
return _.get(record, ["dataValues", "datasource_ref"])
}

const checkSupervisorAvailability = async (datasourceRef: string) => {
const { data } = await druidHttpService.get("/druid/coordinator/v1/loadstatus");
const datasourceAvailability = _.get(data, datasourceRef)
if (_.isUndefined(datasourceAvailability)) {
throw obsrvError("", "DATASOURCE_NOT_AVAILABLE", "Datasource not available for querying", "NOT_FOUND", 404)
}
if (datasourceAvailability !== 100) {
throw obsrvError("", "DATASOURCE_NOT_FULLY_AVAILABLE", "Datasource not fully available for querying", "RANGE_NOT_SATISFIABLE", 416)
}
}

const setDatasourceRef = async (datasetId: string, payload: any): Promise<any> => {
const granularity = _.get(payload, "context.aggregationLevel")
const datasourceRef = await getDataSourceRef(datasetId, granularity);
if (!datasourceRef) {
throw obsrvError("", "DATASOURCE_NOT_FOUND", "Datasource not found to query", "NOT_FOUND", 404)
}
await checkSupervisorAvailability(datasourceRef)
const existingDatasources = await getDatasourceListFromDruid();

if (!_.includes(existingDatasources.data, datasourceRef)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import { config } from "../../../configs/Config";
import chaiSpies from "chai-spies"
import { describe, it } from "mocha";
import { Datasource } from "../../../models/Datasource";
import { druidHttpService } from "../../../connections/druidConnection";
chai.use(chaiSpies)
chai.should();
chai.use(chaiHttp);
Expand All @@ -18,6 +19,7 @@ const nativeQueryEndpointDruid = config?.query_api?.druid?.native_query_path;
const sqlQueryEndpoint = config?.query_api?.druid?.sql_query_path;

const response = [{ dataValues: { datasource_ref: "test.1_rollup_week", metadata: { aggregated: true, granularity: "week" } } }]
const invalidResponse = [{ dataValues: { datasource_ref: "test.1_rollup_week", metadata: { aggregated: true, granularity: "n/a" } } }]
const msgid = "e180ecac-8f41-4f21-9a21-0b3a1a368917";

describe("QUERY API TESTS", () => {
Expand All @@ -33,6 +35,11 @@ describe("QUERY API TESTS", () => {
response
)
})
chai.spy.on(druidHttpService, "get", () => {
return Promise.resolve({
data: { "test.1_rollup_week": 100 }
})
})
nock(druidHost + ":" + druidPort)
.get(listDruidDatasources)
.reply(200, ["telemetry-events.1_rollup"])
Expand All @@ -46,7 +53,7 @@ describe("QUERY API TESTS", () => {
res.body.params.msgid.should.be.eq(msgid);
res.body.responseCode.should.be.eq("NOT_FOUND");
res.body.error.message.should.be.eq("Dataset telemetry-events with table week is not available for querying");
res.body.error.code.should.be.eq("DATA_OUT_SOURCE_NOT_FOUND");
res.body.error.code.should.be.eq("DATASOURCE_NOT_FOUND");
done();
});
});
Expand All @@ -65,7 +72,80 @@ describe("QUERY API TESTS", () => {
res.body.responseCode.should.be.eq("NOT_FOUND");
res.body.params.msgid.should.be.eq(msgid);
res.body.error.message.should.be.eq("Datasource telemetry-events not available for querying");
res.body.error.code.should.be.eq("DATA_OUT_SOURCE_NOT_FOUND");
res.body.error.code.should.be.eq("DATASOURCE_NOT_FOUND");
done();
});
});

it("Query api failure: Datasource not available in druid", (done) => {
chai.spy.on(Datasource, "findAll", () => {
return Promise.resolve(
response
)
})
chai.spy.on(druidHttpService, "get", () => {
return Promise.resolve({
data: { "telemetry_events": 100 }
})
})
chai
.request(app)
.post("/v2/data/query/telemetry-events")
.send(JSON.parse(TestQueries.VALID_QUERY))
.end((err, res) => {
res.should.have.status(404);
res.body.params.status.should.be.eq("FAILED");
res.body.responseCode.should.be.eq("NOT_FOUND");
res.body.params.msgid.should.be.eq(msgid);
res.body.error.message.should.be.eq("Datasource not available for querying");
res.body.error.code.should.be.eq("DATASOURCE_NOT_AVAILABLE");
done();
});
});

it("Query api failure: Datasource not fully loaded in druid", (done) => {
chai.spy.on(Datasource, "findAll", () => {
return Promise.resolve(
response
)
})
chai.spy.on(druidHttpService, "get", () => {
return Promise.resolve({
data: { "test.1_rollup_week": 20 }
})
})
chai
.request(app)
.post("/v2/data/query/telemetry-events")
.send(JSON.parse(TestQueries.VALID_QUERY))
.end((err, res) => {
res.should.have.status(416);
res.body.params.status.should.be.eq("FAILED");
res.body.responseCode.should.be.eq("RANGE_NOT_SATISFIABLE");
res.body.params.msgid.should.be.eq(msgid);
res.body.error.message.should.be.eq("Datasource not fully available for querying");
res.body.error.code.should.be.eq("DATASOURCE_NOT_FULLY_AVAILABLE");
done();
});
});

it("Query api failure: Datasource not found", (done) => {
chai.spy.on(Datasource, "findAll", () => {
return Promise.resolve(
invalidResponse
)
})
chai
.request(app)
.post("/v2/data/query/telemetry-events")
.send(JSON.parse(TestQueries.VALID_QUERY))
.end((err, res) => {
res.should.have.status(404);
res.body.params.status.should.be.eq("FAILED");
res.body.responseCode.should.be.eq("NOT_FOUND");
res.body.params.msgid.should.be.eq(msgid);
res.body.error.message.should.be.eq("Datasource not found to query");
res.body.error.code.should.be.eq("DATASOURCE_NOT_FOUND");
done();
});
});
Expand All @@ -74,6 +154,11 @@ describe("QUERY API TESTS", () => {
chai.spy.on(Datasource, "findAll", () => {
return Promise.resolve(response)
})
chai.spy.on(druidHttpService, "get", () => {
return Promise.resolve({
data: { "test.1_rollup_week": 100 }
})
})
nock(druidHost + ":" + druidPort)
.get(listDruidDatasources)
.reply(200, ["test.1_rollup_week"])
Expand All @@ -100,6 +185,11 @@ describe("QUERY API TESTS", () => {
chai.spy.on(Datasource, "findAll", () => {
return Promise.resolve(response)
})
chai.spy.on(druidHttpService, "get", () => {
return Promise.resolve({
data: { "test.1_rollup_week": 100 }
})
})
nock(druidHost + ":" + druidPort)
.get(listDruidDatasources)
.reply(200, ["test.1_rollup_week"])
Expand All @@ -126,6 +216,11 @@ describe("QUERY API TESTS", () => {
chai.spy.on(Datasource, "findAll", () => {
return Promise.resolve(response)
})
chai.spy.on(druidHttpService, "get", () => {
return Promise.resolve({
data: { "test.1_rollup_week": 100 }
})
})
nock(druidHost + ":" + druidPort)
.get(listDruidDatasources)
.reply(200, ["test.1_rollup_week"])
Expand Down Expand Up @@ -153,6 +248,11 @@ describe("QUERY API TESTS", () => {
chai.spy.on(Datasource, "findAll", () => {
return Promise.resolve(response)
})
chai.spy.on(druidHttpService, "get", () => {
return Promise.resolve({
data: { "test.1_rollup_week": 100 }
})
})
nock(druidHost + ":" + druidPort)
.get(listDruidDatasources)
.reply(200, ["test.1_rollup_week"])
Expand Down Expand Up @@ -180,6 +280,11 @@ describe("QUERY API TESTS", () => {
chai.spy.on(Datasource, "findAll", () => {
return Promise.resolve(response)
})
chai.spy.on(druidHttpService, "get", () => {
return Promise.resolve({
data: { "test.1_rollup_week": 100 }
})
})
nock(druidHost + ":" + druidPort)
.get(listDruidDatasources)
.reply(200, ["test.1_rollup_week"])
Expand Down Expand Up @@ -223,6 +328,11 @@ describe("QUERY API TESTS", () => {
chai.spy.on(Datasource, "findAll", () => {
return Promise.resolve(response)
})
chai.spy.on(druidHttpService, "get", () => {
return Promise.resolve({
data: { "test.1_rollup_week": 100 }
})
})
nock(druidHost + ":" + druidPort)
.get(listDruidDatasources)
.reply(200, ["test.1_rollup_week"])
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import { Datasource } from "../../../models/Datasource";
import nock from "nock";
import { config } from "../../../configs/Config";
import { templateQueryApiFixtures } from "./Fixtures";
import { druidHttpService } from "../../../connections/druidConnection";
const apiId = "api.query.template.query";
const msgid = "4a7f14c3-d61e-4d4f-be78-181834eeff6d"

Expand Down Expand Up @@ -50,6 +51,11 @@ describe("QUERY TEMPLATE API", () => {
return Promise.resolve(response)
})

chai.spy.on(druidHttpService, "get", () => {
return Promise.resolve({
data: { "test.1_rollup_month": 100 }
})
})
nock(druidHost + ":" + druidPort)
.get(listDruidDatasources)
.reply(200, ["test.1_rollup_month"])
Expand Down Expand Up @@ -93,6 +99,11 @@ describe("QUERY TEMPLATE API", () => {
return Promise.resolve(response)
})

chai.spy.on(druidHttpService, "get", () => {
return Promise.resolve({
data: { "test.1_rollup_month": 100 }
})
})
nock(druidHost + ":" + druidPort)
.get(listDruidDatasources)
.reply(200, ["test.1_rollup_month"])
Expand Down