Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add status API to get executor status #2928

Merged
merged 1 commit into from
Jun 1, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 21 additions & 0 deletions backend/src/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -413,6 +413,27 @@ async def kill(request: Request):
return json(error_response("Error killing execution!", exception), status=500)


@app.route("/status")
async def status(_request: Request):
await nodes_available()

ctx = AppContext.get(app)

executor_status: str
if ctx.executor:
e = ctx.executor
if e.progress.aborted:
executor_status = "killing"
elif e.progress.paused:
executor_status = "paused"
else:
executor_status = "running"
else:
executor_status = "ready"

return json({"executor": executor_status})


@app.route("/packages", methods=["GET"])
async def get_packages(request: Request):
await nodes_available()
Expand Down
20 changes: 19 additions & 1 deletion backend/src/server_host.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
from dataclasses import asdict, dataclass
from functools import cached_property
from json import dumps as stringify
from json import loads as json_parse
from typing import Final

import psutil
Expand Down Expand Up @@ -349,7 +350,24 @@ async def shutdown(request: Request):
@app.get("/status")
async def status(request: Request):
ctx = AppContext.get(request.app)
return json({"ready": ctx.is_ready})

worker_status = None
if ctx.is_ready:
try:
worker = ctx.get_worker_unmanaged()
worker_status = await worker.proxy_request(request)
if worker_status.body is not None:
# decode JSOn body
worker_status = json_parse(worker_status.body)
except Exception as ex:
worker_status = {"success": False, "error": str(ex)}

return json(
{
"ready": ctx.is_ready,
"worker": worker_status,
}
)


async def import_packages(
Expand Down
9 changes: 8 additions & 1 deletion src/common/Backend.ts
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,13 @@ export interface BackendRunIndividualRequest {
schemaId: SchemaId;
options: PackageSettings;
}
export interface BackendWorkerStatusResponse {
executor: 'running' | 'killing' | 'paused' | 'ready';
}
export interface BackendStatusResponse {
ready: boolean;
worker: null | BackendError | BackendWorkerStatusResponse;
}

export type BackendResult<T> = BackendSuccess<T> | BackendError;
export interface BackendSuccess<T> {
Expand Down Expand Up @@ -256,7 +263,7 @@ export class Backend {
return this.fetchJson('/shutdown', 'POST');
}

status(): Promise<{ ready: boolean }> {
status(): Promise<BackendStatusResponse> {
return this.fetchJson('/status', 'GET');
}
}
Expand Down
11 changes: 8 additions & 3 deletions src/renderer/contexts/BackendContext.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,12 @@ import React, {
import { useTranslation } from 'react-i18next';
import { useQuery, useQueryClient } from 'react-query';
import { createContext, useContext } from 'use-context-selector';
import { Backend, BackendNodesResponse, getBackend } from '../../common/Backend';
import {
Backend,
BackendNodesResponse,
BackendStatusResponse,
getBackend,
} from '../../common/Backend';
import { CategoryMap } from '../../common/CategoryMap';
import {
CategoryId,
Expand Down Expand Up @@ -304,7 +309,7 @@ export const BackendProvider = memo(
}, [isBackendReady]);
const statusQuery = useQuery({
queryKey: ['status', backend.url],
queryFn: async (): Promise<{ ready: boolean }> => {
queryFn: async (): Promise<BackendStatusResponse> => {
try {
// spin until we're no longer restarting
while (backendDownRef.current) {
Expand All @@ -314,7 +319,7 @@ export const BackendProvider = memo(

return await backend.status();
} catch (error) {
return { ready: false };
return { ready: false, worker: null };
}
},
cacheTime: 0,
Expand Down
43 changes: 42 additions & 1 deletion src/renderer/contexts/ExecutionContext.tsx
Original file line number Diff line number Diff line change
@@ -1,8 +1,14 @@
import { evaluate } from '@chainner/navi';
import { memo, useCallback, useEffect, useMemo, useRef, useState } from 'react';
import { useQuery } from 'react-query';
import { useReactFlow } from 'reactflow';
import { createContext, useContext, useContextSelector } from 'use-context-selector';
import { BackendEventMap } from '../../common/Backend';
import {
Backend,
BackendEventMap,
BackendStatusResponse,
BackendWorkerStatusResponse,
} from '../../common/Backend';
import { EdgeData, NodeData, OutputId } from '../../common/common-types';
import { formatExecutionErrorMessage } from '../../common/formatExecutionErrorMessage';
import { log } from '../../common/log';
Expand Down Expand Up @@ -130,6 +136,28 @@ const useRegisterNodeEvents = (
);
};

const useOnBackendStatus = (backend: Backend, onChange: (data: BackendStatusResponse) => void) => {
const statusQuery = useQuery({
queryKey: ['status', backend.url],
queryFn: async () => {
return { ...(await backend.status()), timestamp: Date.now() };
},
cacheTime: 0,
retry: 25,
refetchOnWindowFocus: true,
refetchInterval: 3000,
});

const lastDataRef = useRef<BackendStatusResponse | null>(null);

useEffect(() => {
if (statusQuery.data && statusQuery.data !== lastDataRef.current) {
lastDataRef.current = statusQuery.data;
onChange(statusQuery.data);
}
}, [statusQuery.data, onChange]);
};

// eslint-disable-next-line @typescript-eslint/ban-types
export const ExecutionProvider = memo(({ children }: React.PropsWithChildren<{}>) => {
const {
Expand Down Expand Up @@ -193,6 +221,19 @@ export const ExecutionProvider = memo(({ children }: React.PropsWithChildren<{}>
[nodeStatusMap]
);

useOnBackendStatus(backend, (data) => {
if (data.worker && !('error' in data.worker)) {
const executorStatus = data.worker.executor;
const statusMapping = {
running: ExecutionStatus.RUNNING,
paused: ExecutionStatus.PAUSED,
killing: ExecutionStatus.KILLING,
ready: ExecutionStatus.READY,
} satisfies Record<BackendWorkerStatusResponse['executor'], ExecutionStatus>;
setStatus(statusMapping[executorStatus]);
}
});

useEffect(() => {
if (status === ExecutionStatus.RUNNING || status === ExecutionStatus.PAUSED) {
ipcRenderer.send('set-progress-bar', totalChainProgress);
Expand Down
Loading