Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Health reset API #212

Merged
merged 10 commits into from
Jul 26, 2024
57 changes: 57 additions & 0 deletions api-service/src/controllers/DatasetReset/DatasetReset.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
import { Request, Response } from "express";
import _ from "lodash";
import { schemaValidation } from "../../services/ValidationService";
import DatasetResetRequestSchema from "./DatasetResetValidationSchema.json"
import { ResponseHandler } from "../../helpers/ResponseHandler";
import { HealthStatus } from "../../types/DatasetModels";
import { getDruidIndexers, getFlinkHealthStatus, restartDruidIndexers } from "../../services/DatasetHealthService";
import { restartPipeline } from "../DatasetStatusTransition/DatasetStatusTransition";
import { obsrvError } from "../../types/ObsrvError";
import { datasetService } from "../../services/DatasetService";
import httpStatus from "http-status";

// API identifier attached to the request context / response envelope for this endpoint.
export const apiId = "api.dataset.reset";

/**
 * Validates a dataset-reset request: the body must match the reset request
 * schema, and the dataset named in the URL must exist.
 * Throws an ObsrvError (400 on schema failure, 404 on missing dataset).
 */
const validateRequest = async (req: Request) => {

    // Schema check first — reject malformed payloads before touching the DB.
    const validationReport: Record<string, any> = schemaValidation(req.body, DatasetResetRequestSchema)
    if (!validationReport.isValid) {
        throw obsrvError("", "DATASET_INVALID_INPUT", validationReport.message, "BAD_REQUEST", 400)
    }

    // Then confirm the dataset referenced in the route actually exists.
    const datasetId = _.get(req, ["params", "datasetId"])
    const exists = await datasetService.checkDatasetExists(datasetId);
    if (!exists) {
        throw obsrvError(datasetId, "DATASET_NOT_FOUND", `Dataset not exists with id:${datasetId}`, httpStatus[httpStatus.NOT_FOUND], 404)
    }
}

/**
 * Resets the unhealthy parts of a dataset's serving infrastructure.
 * - category "processing": restarts the Flink pipeline if it is unhealthy.
 * - category "query": resumes any SUSPENDED Druid supervisors for the dataset.
 * Always responds with { status: "Completed" } on success.
 */
const datasetReset = async (req: Request, res: Response) => {

    const category = _.get(req, ["body", "request", "category"]);
    // BUG FIX: the path was previously ["params", " datasetId"] — a single key
    // with a leading space — so datasetId always resolved to undefined and the
    // restart actions below operated on no dataset.
    const datasetId = _.get(req, ["params", "datasetId"]);

    await validateRequest(req);
    if (category == "processing") {
        const pipeLineStatus = await getFlinkHealthStatus()
        if (pipeLineStatus == HealthStatus.UnHealthy) {
            // NOTE(review): restartPipeline reads `dataset.id`, but this payload
            // nests the id under "dataset.dataset_id" — confirm the expected shape.
            await restartPipeline({ "dataset": { "dataset_id": datasetId } })
        }
    } else if (category == "query") {
        const datasources = await datasetService.findDatasources({ "dataset_id": datasetId })
        if (!_.isEmpty(datasources)) {
            // Only resume supervisors that are both unhealthy and SUSPENDED.
            const unHealthySupervisors = await getDruidIndexers(datasources, HealthStatus.UnHealthy)
            const unHealthyDataSources = _.filter(unHealthySupervisors, (supervisor: any) => supervisor?.state == "SUSPENDED")
            if (!_.isEmpty(unHealthyDataSources)) {
                await restartDruidIndexers(unHealthyDataSources)
            }
        }
    }

    return ResponseHandler.successResponse(req, res, {
        status: 200, data: {
            "status": "Completed"
        }
    });
}

export default datasetReset;
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
{
"type": "object",
"properties": {
"id": {
"type": "string"
},
"ver": {
"type": "string"
},
"ts": {
"type": "string"
},
"params": {
"type": "object",
"properties": {
"msgid": {
"type": "string"
}
},
"required": [
"msgid"
],
"additionalProperties": false
},
"request": {
"type": "object",
"properties": {
"category": {
"type": "string",
"enum": ["processing", "query"]
}
},
"required": [
"category"
]
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -221,7 +221,7 @@ const canRetireIfMasterDataset = async (dataset: Record<string, any>) => {
}
}

const restartPipeline = async (dataset: Record<string, any>) => {
export const restartPipeline = async (dataset: Record<string, any>) => {
return executeCommand(dataset.id, "RESTART_PIPELINE")
}

Expand Down
2 changes: 2 additions & 0 deletions api-service/src/routes/Router.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import { sqlQuery } from "../controllers/QueryWrapper/SqlQueryWrapper";
import DatasetStatusTansition from "../controllers/DatasetStatusTransition/DatasetStatusTransition";
import datasetHealth from "../controllers/DatasetHealth/DatasetHealth";
import DataSchemaGenerator from "../controllers/GenerateDataSchema/GenerateDataSchema";
import datasetReset from "../controllers/DatasetReset/DatasetReset";

export const router = express.Router();

Expand All @@ -41,6 +42,7 @@ router.post("/template/query/:templateId", setDataToRequestObject("api.query.tem
router.post("/files/generate-url", setDataToRequestObject("api.files.generate-url"), onRequest({ entity: Entity.Management }), GenerateSignedURL);
router.post("/datasets/status-transition", setDataToRequestObject("api.datasets.status-transition"), onRequest({ entity: Entity.Management }), DatasetStatusTansition);
router.post("/dataset/health", setDataToRequestObject("api.dataset.health"), onRequest({ entity: Entity.Management }), datasetHealth);
router.post("/dataset/reset/:datasetId", setDataToRequestObject("api.dataset.reset"), onRequest({ entity: Entity.Management }), datasetReset);
router.post("/datasets/dataschema", setDataToRequestObject("api.datasets.dataschema"), onRequest({ entity: Entity.Management }), DataSchemaGenerator);


Expand Down
40 changes: 34 additions & 6 deletions api-service/src/services/DatasetHealthService.ts
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ const queryMetrics = async (datasetId: string, query: string) => {
export const getInfraHealth = async (isMasterDataset: boolean): Promise<{ components: any, status: string }> => {
const postgres = await getPostgresStatus()
const druid = await getDruidHealthStatus()
const flink = await getFlinkHealthStaus()
const flink = await getFlinkHealthStatus()
const kafka = await getKafkaHealthStatus()
let redis = HealthStatus.Healthy
const components = [
Expand All @@ -104,7 +104,7 @@ export const getInfraHealth = async (isMasterDataset: boolean): Promise<{ compon
export const getProcessingHealth = async (dataset: any): Promise<{ components: any, status: string }> => {
const dataset_id = _.get(dataset, "dataset_id")
const isMasterDataset = _.get(dataset, "type") == DatasetType.master;
const flink = await getFlinkHealthStaus()
const flink = await getFlinkHealthStatus()
const { count, health } = await getEventsProcessedToday(dataset_id, isMasterDataset)
const processingDefaultThreshold = await SystemConfig.getThresholds("processing")
// eslint-disable-next-line prefer-const
Expand Down Expand Up @@ -279,8 +279,6 @@ const getDruidIndexerStatus = async (datasources: any,) => {
logger.error(error)
return { value: [], status: HealthStatus.UnHealthy }
}


}

const getDruidDataourceStatus = async (datasourceId: string) => {
Expand Down Expand Up @@ -335,7 +333,7 @@ const getKafkaHealthStatus = async () => {

}

const getFlinkHealthStaus = async () => {
export const getFlinkHealthStatus = async () => {
try {
const responses = await Promise.all(
[axios.get(config?.flink_job_configs?.masterdata_processor_job_manager_url as string + "/jobs"),
Expand Down Expand Up @@ -482,4 +480,34 @@ const getAvgProcessingSpeedInSec = async (datasetId: string, isMasterDataset: bo
logger.error(error)
return { count: 0, health: HealthStatus.UnHealthy }
}
}
}

/**
 * Fetches the Druid supervisor status of every datasource (in parallel) and
 * returns descriptors for those whose health matches `status`
 * (defaults to Healthy). A supervisor is Healthy only when its state is RUNNING.
 */
export const getDruidIndexers = async (datasources: any, status = HealthStatus.Healthy) => {
    const statuses = await Promise.all(
        _.map(datasources, (datasource) => getDruidDataourceStatus(datasource["datasource"]))
    )
    const matching: any = []
    for (const result of statuses) {
        logger.debug({ result })
        const state = _.get(result, "payload.state")
        const sourceStatus = state == "RUNNING" ? HealthStatus.Healthy : HealthStatus.UnHealthy
        logger.debug({ sourceStatus })
        if (sourceStatus == status) {
            matching.push({
                "type": "druid",
                "datasource": _.get(result, "id"),
                "status": sourceStatus,
                "state": state
            })
        }
    }
    return matching
}

// Resumes a single (suspended) Druid supervisor so ingestion restarts for the datasource.
const restartDruidSupervisors = async (datasourceId: string) => {
    const response = await druidHttpService.post(`/druid/indexer/v1/supervisor/${datasourceId}/resume`)
    return response.data;
}

// Resumes all given Druid supervisors concurrently; resolves when every resume call completes.
export const restartDruidIndexers = async (datasources: any) => {
    const resumeTasks = _.map(datasources, (datasource) => restartDruidSupervisors(datasource["datasource"]))
    await Promise.all(resumeTasks)
}
Loading