Skip to content

Commit

Permalink
frontend: Websocket backward compatibility
Browse files Browse the repository at this point in the history
This adds a new way to use new way to run websocket multiplexer. Default
way would be the legacy way which creates multiple websocket connection.
This adds a new flag `REACT_APP_ENABLE_WEBSOCKET_MULTIPLEXER` to run the
new API.

Signed-off-by: Kautilya Tripathi <[email protected]>
  • Loading branch information
knrt10 committed Nov 29, 2024
1 parent d72ca38 commit 31ba0ac
Show file tree
Hide file tree
Showing 7 changed files with 584 additions and 234 deletions.
1 change: 1 addition & 0 deletions frontend/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,7 @@
"build": "cross-env PUBLIC_URL=./ NODE_OPTIONS=--max-old-space-size=8096 vite build && npx shx rm -f build/frontend/index.baseUrl.html",
"pretest": "npm run make-version",
"test": "vitest",
"start-with-multiplexer": "cross-env REACT_APP_ENABLE_WEBSOCKET_MULTIPLEXER=true npm run start",
"lint": "eslint --cache -c package.json --ext .js,.ts,.tsx src/ ../app/electron ../plugins/headlamp-plugin --ignore-pattern ../plugins/headlamp-plugin/template --ignore-pattern ../plugins/headlamp-plugin/lib/",
"format": "prettier --config package.json --write --cache src ../app/electron ../app/tsconfig.json ../app/scripts ../plugins/headlamp-plugin/bin ../plugins/headlamp-plugin/config ../plugins/headlamp-plugin/template ../plugins/headlamp-plugin/test*.js ../plugins/headlamp-plugin/*.json ../plugins/headlamp-plugin/*.js",
"format-check": "prettier --config package.json --check --cache src ../app/electron ../app/tsconfig.json ../app/scripts ../plugins/headlamp-plugin/bin ../plugins/headlamp-plugin/config ../plugins/headlamp-plugin/template ../plugins/headlamp-plugin/test*.js ../plugins/headlamp-plugin/*.json ../plugins/headlamp-plugin/*.js",
Expand Down
2 changes: 1 addition & 1 deletion frontend/src/components/common/Resource/index.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ function getFilesToVerify() {
const filesToVerify: string[] = [];
fs.readdirSync(__dirname).forEach(file => {
const fileNoSuffix = file.replace(/\.[^/.]+$/, '');
if (!avoidCheck.find(suffix => fileNoSuffix.endsWith(suffix))) {
if (fileNoSuffix && !avoidCheck.find(suffix => fileNoSuffix.endsWith(suffix))) {
filesToVerify.push(fileNoSuffix);
}
});
Expand Down
2 changes: 1 addition & 1 deletion frontend/src/components/common/index.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ function getFilesToVerify() {
const filesToVerify: string[] = [];
fs.readdirSync(__dirname).forEach(file => {
const fileNoSuffix = file.replace(/\.[^/.]+$/, '');
if (!avoidCheck.find(suffix => fileNoSuffix.endsWith(suffix))) {
if (fileNoSuffix && !avoidCheck.find(suffix => fileNoSuffix.endsWith(suffix))) {
filesToVerify.push(fileNoSuffix);
}
});
Expand Down
9 changes: 9 additions & 0 deletions frontend/src/helpers/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -352,6 +352,14 @@ function loadTableSettings(tableId: string): { id: string; show: boolean }[] {
return settings;
}

/**
* @returns true if the websocket multiplexer is enabled.
* defaults to false. This is a feature flag to enable the websocket multiplexer.
*/
export function getWebsocketMultiplexerEnabled(): boolean {
return import.meta.env.REACT_APP_ENABLE_WEBSOCKET_MULTIPLEXER === 'true';
}

/**
* The backend token to use when making API calls from Headlamp when running as an app.
* The app opens the index.html?backendToken=... and passes the token to the frontend
Expand Down Expand Up @@ -393,6 +401,7 @@ const exportFunctions = {
storeClusterSettings,
loadClusterSettings,
getHeadlampAPIHeaders,
getWebsocketMultiplexerEnabled,
storeTableSettings,
loadTableSettings,
};
Expand Down
221 changes: 182 additions & 39 deletions frontend/src/lib/k8s/api/v2/useKubeObjectList.ts
Original file line number Diff line number Diff line change
@@ -1,14 +1,15 @@
import { QueryObserverOptions, useQueries, useQueryClient } from '@tanstack/react-query';
import { useEffect, useMemo, useRef, useState } from 'react';
import { useCallback,useEffect, useMemo, useRef, useState } from 'react';
import { getWebsocketMultiplexerEnabled } from '../../../../helpers';
import { KubeObject, KubeObjectClass } from '../../KubeObject';
import { ApiError } from '../v1/clusterRequests';
import { QueryParameters } from '../v1/queryParameters';
import { clusterFetch } from './fetch';
import { QueryListResponse, useEndpoints } from './hooks';
import { KubeList } from './KubeList';
import { KubeList, KubeListUpdateEvent } from './KubeList';
import { KubeObjectEndpoint } from './KubeObjectEndpoint';
import { makeUrl } from './makeUrl';
import { BASE_WS_URL, WebSocketManager } from './webSocket';
import { BASE_WS_URL, useWebSockets, WebSocketManager } from './webSocket';

/**
* Object representing a List of Kube object
Expand Down Expand Up @@ -112,76 +113,218 @@ export function useWatchKubeObjectLists<K extends KubeObject>({
/** Which clusters and namespaces to watch */
lists: Array<{ cluster: string; namespace?: string; resourceVersion: string }>;
}) {
const websocketMultiplexerStatus = getWebsocketMultiplexerEnabled();

if (websocketMultiplexerStatus) {
return useWatchKubeObjectListsMultiplexed({
kubeObjectClass,
endpoint,
lists,
queryParams,
});
} else {
return useWatchKubeObjectListsLegacy({
kubeObjectClass,
endpoint,
lists,
queryParams,
});
}
}

/**
* Watches Kubernetes resource lists using multiplexed WebSocket connections.
* Efficiently manages subscriptions and updates to prevent unnecessary re-renders
* and WebSocket reconnections.
*
* @template K - Type extending KubeObject for the resources being watched
* @param kubeObjectClass - Class constructor for the Kubernetes resource type
* @param endpoint - API endpoint information for the resource
* @param lists - Array of cluster, namespace, and resourceVersion combinations to watch
* @param queryParams - Optional query parameters for the WebSocket URL
*/
function useWatchKubeObjectListsMultiplexed<K extends KubeObject>({
kubeObjectClass,
endpoint,
lists,
queryParams,
}: {
kubeObjectClass: (new (...args: any) => K) & typeof KubeObject<any>;
endpoint?: KubeObjectEndpoint | null;
lists: Array<{ cluster: string; namespace?: string; resourceVersion: string }>;
queryParams?: QueryParameters;
}): void {
const client = useQueryClient();

// Track the latest resource versions to prevent duplicate updates
const latestResourceVersions = useRef<Record<string, string>>({});

// Create URLs for all lists
// Stabilize queryParams to prevent unnecessary effect triggers
// Only update when the stringified params change
const stableQueryParams = useMemo(() => queryParams, [JSON.stringify(queryParams)]);

// Create stable connection URLs for each list
// Updates only when endpoint, lists, or stableQueryParams change
const connections = useMemo(() => {
if (!endpoint) return [];
if (!endpoint) {
return [];
}

return lists.map(list => {
const key = `${list.cluster}:${list.namespace || ''}`;
// Only update resourceVersion if it's newer
if (
!latestResourceVersions.current[key] ||
parseInt(list.resourceVersion) > parseInt(latestResourceVersions.current[key])
) {
latestResourceVersions.current[key] = list.resourceVersion;

// Update resource version if newer one is available
const currentVersion = latestResourceVersions.current[key];
const newVersion = list.resourceVersion;
if (!currentVersion || parseInt(newVersion) > parseInt(currentVersion)) {
latestResourceVersions.current[key] = newVersion;
}

// Construct WebSocket URL with current parameters
return {
url: makeUrl([KubeObjectEndpoint.toUrl(endpoint, list.namespace)], {
...queryParams,
...stableQueryParams,
watch: 1,
resourceVersion: latestResourceVersions.current[key],
}),
cluster: list.cluster,
namespace: list.namespace,
};
});
}, [endpoint, lists, queryParams]);
}, [endpoint, lists, stableQueryParams]);

// Create stable update handler to process WebSocket messages
// Re-create only when dependencies change
const handleUpdate = useCallback(
(update: any, cluster: string, namespace: string | undefined) => {
if (!update || typeof update !== 'object' || !endpoint) {
return;
}

const key = `${cluster}:${namespace || ''}`;

// Update resource version from incoming message
if (update.object?.metadata?.resourceVersion) {
latestResourceVersions.current[key] = update.object.metadata.resourceVersion;
}

// Create query key for React Query cache
const queryKey = kubeObjectListQuery<K>(
kubeObjectClass,
endpoint,
namespace,
cluster,
stableQueryParams ?? {}
).queryKey;

// Update React Query cache with new data
client.setQueryData(queryKey, (oldResponse: ListResponse<any> | undefined | null) => {
if (!oldResponse) {
return oldResponse;
}

const newList = KubeList.applyUpdate(oldResponse.list, update, kubeObjectClass);

// Only update if the list actually changed
if (newList === oldResponse.list) {
return oldResponse;
}

return { ...oldResponse, list: newList };
});
},
[client, kubeObjectClass, endpoint, stableQueryParams]
);

// Set up WebSocket subscriptions
useEffect(() => {
if (!endpoint || connections.length === 0) return;
if (!endpoint || connections.length === 0) {
return;
}

const cleanups: (() => void)[] = [];

// Create subscriptions for each connection
connections.forEach(({ url, cluster, namespace }) => {
const parsedUrl = new URL(url, BASE_WS_URL);
const key = `${cluster}:${namespace || ''}`;

WebSocketManager.subscribe(cluster, parsedUrl.pathname, parsedUrl.search.slice(1), update => {
if (!update || typeof update !== 'object') return;

// Update latest resourceVersion
if (update.object?.metadata?.resourceVersion) {
latestResourceVersions.current[key] = update.object.metadata.resourceVersion;
}

const queryKey = kubeObjectListQuery<K>(
kubeObjectClass,
endpoint,
namespace,
cluster,
queryParams ?? {}
).queryKey;

client.setQueryData(queryKey, (oldResponse: ListResponse<any> | undefined | null) => {
if (!oldResponse) return oldResponse;
const newList = KubeList.applyUpdate(oldResponse.list, update, kubeObjectClass);
if (newList === oldResponse.list) return oldResponse;
return { ...oldResponse, list: newList };
});
}).then(
// Subscribe to WebSocket updates
WebSocketManager.subscribe(cluster, parsedUrl.pathname, parsedUrl.search.slice(1), update =>
handleUpdate(update, cluster, namespace)
).then(
cleanup => cleanups.push(cleanup),
error => console.error('WebSocket subscription failed:', error)
);
});

// Cleanup subscriptions when effect re-runs or unmounts
return () => {
cleanups.forEach(cleanup => cleanup());
};
}, [connections, endpoint, client, kubeObjectClass, queryParams]);
}, [connections, endpoint, handleUpdate]);
}

/**
* Accepts a list of lists to watch.
* Upon receiving update it will modify query data for list query
* @param kubeObjectClass - KubeObject class of the watched resource list
* @param endpoint - Kube resource API endpoint information
* @param lists - Which clusters and namespaces to watch
* @param queryParams - Query parameters for the WebSocket connection URL
*/
function useWatchKubeObjectListsLegacy<K extends KubeObject>({
kubeObjectClass,
endpoint,
lists,
queryParams,
}: {
/** KubeObject class of the watched resource list */
kubeObjectClass: (new (...args: any) => K) & typeof KubeObject<any>;
/** Query parameters for the WebSocket connection URL */
queryParams?: QueryParameters;
/** Kube resource API endpoint information */
endpoint?: KubeObjectEndpoint | null;
/** Which clusters and namespaces to watch */
lists: Array<{ cluster: string; namespace?: string; resourceVersion: string }>;
}) {
const client = useQueryClient();

const connections = useMemo(() => {
if (!endpoint) return [];

return lists.map(({ cluster, namespace, resourceVersion }) => {
const url = makeUrl([KubeObjectEndpoint.toUrl(endpoint!, namespace)], {
...queryParams,
watch: 1,
resourceVersion,
});

return {
cluster,
url,
onMessage(update: KubeListUpdateEvent<K>) {
const key = kubeObjectListQuery<K>(
kubeObjectClass,
endpoint,
namespace,
cluster,
queryParams ?? {}
).queryKey;
client.setQueryData(key, (oldResponse: ListResponse<any> | undefined | null) => {
if (!oldResponse) return oldResponse;

const newList = KubeList.applyUpdate(oldResponse.list, update, kubeObjectClass);
return { ...oldResponse, list: newList };
});
},
};
});
}, [lists, kubeObjectClass, endpoint]);

useWebSockets<KubeListUpdateEvent<K>>({
enabled: !!endpoint,
connections,
});
}

/**
Expand Down
Loading

0 comments on commit 31ba0ac

Please sign in to comment.