diff --git a/images/dark/stream-processor.svg b/images/dark/stream-processor.svg new file mode 100644 index 000000000..f3268bcd7 --- /dev/null +++ b/images/dark/stream-processor.svg @@ -0,0 +1,11 @@ + + + + + + + + + + + diff --git a/images/light/stream-processor.svg b/images/light/stream-processor.svg new file mode 100644 index 000000000..4d4d58316 --- /dev/null +++ b/images/light/stream-processor.svg @@ -0,0 +1,11 @@ + + + + + + + + + + + diff --git a/package-lock.json b/package-lock.json index 081f2119d..c95435af8 100644 --- a/package-lock.json +++ b/package-lock.json @@ -35,7 +35,7 @@ "mongodb-build-info": "^1.6.2", "mongodb-cloud-info": "^2.1.0", "mongodb-connection-string-url": "^2.6.0", - "mongodb-data-service": "^22.17.0", + "mongodb-data-service": "^22.17.1", "mongodb-data-service-legacy": "npm:mongodb-data-service@22.8.0", "mongodb-log-writer": "^1.4.0", "mongodb-query-parser": "^3.1.3", @@ -17545,9 +17545,9 @@ } }, "node_modules/mongodb-data-service": { - "version": "22.17.0", - "resolved": "https://registry.npmjs.org/mongodb-data-service/-/mongodb-data-service-22.17.0.tgz", - "integrity": "sha512-5HO571xTjFIXZRwDEYMIefJvqQj8X8hvckrXB7U1LUEEVfIKh7ysmE1bO9eH3A50qr6i4L4bgWOV7OhzoATD6Q==", + "version": "22.17.1", + "resolved": "https://registry.npmjs.org/mongodb-data-service/-/mongodb-data-service-22.17.1.tgz", + "integrity": "sha512-9TpxFMtMYuPVUOmwzNFnO+1rjzuiQkkAh1xwcat8xEFiIXY4O89h+2k7kbYiNWnwWD4LznvLyRcPqgreeTlQbg==", "dependencies": { "@mongodb-js/compass-logging": "^1.2.9", "@mongodb-js/compass-utils": "^0.5.8", @@ -38689,9 +38689,9 @@ } }, "mongodb-data-service": { - "version": "22.17.0", - "resolved": "https://registry.npmjs.org/mongodb-data-service/-/mongodb-data-service-22.17.0.tgz", - "integrity": "sha512-5HO571xTjFIXZRwDEYMIefJvqQj8X8hvckrXB7U1LUEEVfIKh7ysmE1bO9eH3A50qr6i4L4bgWOV7OhzoATD6Q==", + "version": "22.17.1", + "resolved": "https://registry.npmjs.org/mongodb-data-service/-/mongodb-data-service-22.17.1.tgz", + "integrity": "sha512-9TpxFMtMYuPVUOmwzNFnO+1rjzuiQkkAh1xwcat8xEFiIXY4O89h+2k7kbYiNWnwWD4LznvLyRcPqgreeTlQbg==", "requires": { "@mongodb-js/compass-logging": "^1.2.9", "@mongodb-js/compass-utils": "^0.5.8", diff --git a/package.json b/package.json index 5b616a37f..9a65eaf7e 100644 --- a/package.json +++ b/package.json @@ -397,6 +397,26 @@ { "command": "mdb.deleteDocumentFromTreeView", "title": "Delete Document..." + }, + { + "command": "mdb.addStreamProcessor", + "title": "Add StreamProcessor...", + "icon": { + "light": "images/light/plus-circle.svg", + "dark": "images/dark/plus-circle.svg" + } + }, + { + "command": "mdb.startStreamProcessor", + "title": "Start Stream Processor" + }, + { + "command": "mdb.stopStreamProcessor", + "title": "Stop Stream Processor" + }, + { + "command": "mdb.dropStreamProcessor", + "title": "Drop Stream Processor..." } ], "menus": { @@ -428,12 +448,22 @@ "view/item/context": [ { "command": "mdb.addDatabase", - "when": "view == mongoDBConnectionExplorer && viewItem == connectedConnectionTreeItem", + "when": "view == mongoDBConnectionExplorer && viewItem == connectedConnectionTreeItem && mdb.isAtlasStreams == false", "group": "inline" }, { "command": "mdb.addDatabase", - "when": "view == mongoDBConnectionExplorer && viewItem == connectedConnectionTreeItem", + "when": "view == mongoDBConnectionExplorer && viewItem == connectedConnectionTreeItem && mdb.isAtlasStreams == false", + "group": "1@1" + }, + { + "command": "mdb.addStreamProcessor", + "when": "view == mongoDBConnectionExplorer && viewItem == connectedConnectionTreeItem && mdb.isAtlasStreams == true", + "group": "inline" + }, + { + "command": "mdb.addStreamProcessor", + "when": "view == mongoDBConnectionExplorer && viewItem == connectedConnectionTreeItem && mdb.isAtlasStreams == true", "group": "1@1" }, { @@ -611,6 +641,18 @@ "command": "mdb.deleteDocumentFromTreeView", "when": "view == mongoDBConnectionExplorer && viewItem == documentTreeItem", "group": "3@1" + }, + { + "command": "mdb.startStreamProcessor", + "when": "view == mongoDBConnectionExplorer && viewItem == streamProcessorTreeItem" + }, + { + "command": "mdb.stopStreamProcessor", + "when": "view == mongoDBConnectionExplorer && viewItem == streamProcessorTreeItem" + }, + { + "command": "mdb.dropStreamProcessor", + "when": "view == mongoDBConnectionExplorer && viewItem == streamProcessorTreeItem" } ], "editor/title": [ @@ -639,27 +681,27 @@ }, { "command": "mdb.exportToRuby", - "when": "mdb.isPlayground == true && mdb.connectedToMongoDB == true" + "when": "mdb.isPlayground == true && mdb.connectedToMongoDB == true && mdb.isAtlasStreams == false" }, { "command": "mdb.exportToPython", - "when": "mdb.isPlayground == true && mdb.connectedToMongoDB == true" + "when": "mdb.isPlayground == true && mdb.connectedToMongoDB == true && mdb.isAtlasStreams == false" }, { "command": "mdb.exportToJava", - "when": "mdb.isPlayground == true && mdb.connectedToMongoDB == true" + "when": "mdb.isPlayground == true && mdb.connectedToMongoDB == true && mdb.isAtlasStreams == false" }, { "command": "mdb.exportToCsharp", - "when": "mdb.isPlayground == true && mdb.connectedToMongoDB == true" + "when": "mdb.isPlayground == true && mdb.connectedToMongoDB == true && mdb.isAtlasStreams == false" }, { "command": "mdb.exportToNode", - "when": "mdb.isPlayground == true && mdb.connectedToMongoDB == true" + "when": "mdb.isPlayground == true && mdb.connectedToMongoDB == true && mdb.isAtlasStreams == false" }, { "command": "mdb.exportToGo", - "when": "mdb.isPlayground == true && mdb.connectedToMongoDB == true" + "when": "mdb.isPlayground == true && mdb.connectedToMongoDB == true && mdb.isAtlasStreams == false" }, { "command": "mdb.refreshPlaygroundsFromTreeView", @@ -804,6 +846,22 @@ { "command": "mdb.deleteDocumentFromTreeView", "when": "false" + }, + { + "command": "mdb.addStreamProcessor", + "when": "false" + }, + { + "command": "mdb.startStreamProcessor", + "when": "false" + }, + { + "command": "mdb.stopStreamProcessor", + "when": "false" + }, + { + "command": "mdb.dropStreamProcessor", + "when": "false" } ] }, @@ -1002,7 +1060,7 @@ "mongodb-build-info": "^1.6.2", "mongodb-cloud-info": "^2.1.0", "mongodb-connection-string-url": "^2.6.0", - "mongodb-data-service": "^22.17.0", + "mongodb-data-service": "^22.17.1", "mongodb-data-service-legacy": "npm:mongodb-data-service@22.8.0", "mongodb-log-writer": "^1.4.0", "mongodb-query-parser": "^3.1.3", diff --git a/src/commands/index.ts b/src/commands/index.ts index 0b8dcbc89..ede46c20c 100644 --- a/src/commands/index.ts +++ b/src/commands/index.ts @@ -66,6 +66,10 @@ enum EXTENSION_COMMANDS { MDB_COPY_DOCUMENT_CONTENTS_FROM_TREE_VIEW = 'mdb.copyDocumentContentsFromTreeView', MDB_CLONE_DOCUMENT_FROM_TREE_VIEW = 'mdb.cloneDocumentFromTreeView', MDB_DELETE_DOCUMENT_FROM_TREE_VIEW = 'mdb.deleteDocumentFromTreeView', + MDB_ADD_STREAM_PROCESSOR = 'mdb.addStreamProcessor', + MDB_START_STREAM_PROCESSOR = 'mdb.startStreamProcessor', + MDB_STOP_STREAM_PROCESSOR = 'mdb.stopStreamProcessor', + MDB_DROP_STREAM_PROCESSOR = 'mdb.dropStreamProcessor', } export default EXTENSION_COMMANDS; diff --git a/src/connectionController.ts b/src/connectionController.ts index f831773a7..5e72ff334 100644 --- a/src/connectionController.ts +++ b/src/connectionController.ts @@ -32,6 +32,7 @@ import { openLink } from './utils/linkHelper'; import type { LoadedConnection } from './storage/connectionStorage'; import { ConnectionStorage } from './storage/connectionStorage'; import LINKS from './utils/links'; +import { isAtlasStream } from 'mongodb-build-info'; // eslint-disable-next-line @typescript-eslint/no-var-requires const packageJSON = require('../package.json'); @@ -397,6 +398,12 @@ export default class ConnectionController { true ); + void vscode.commands.executeCommand( + 'setContext', + 'mdb.isAtlasStreams', + this.isConnectedToAtlasStreams() + ); + void this.onConnectSuccess({ connectionInfo, dataService, @@ -542,6 +549,11 @@ export default class ConnectionController { 'mdb.connectedToMongoDB', false ); + void vscode.commands.executeCommand( + 'setContext', + 'mdb.isAtlasStreams', + false + ); } catch (error) { // Show an error, however we still reset the active connection to free up the extension. void vscode.window.showErrorMessage( @@ -769,6 +781,13 @@ export default class ConnectionController { return connectionStringData.toString(); } + isConnectedToAtlasStreams() { + return ( + this.isCurrentlyConnected() && + isAtlasStream(this.getActiveConnectionString()) + ); + } + getActiveConnectionString(): string { const mongoClientConnectionOptions = this.getMongoClientConnectionOptions(); const connectionString = mongoClientConnectionOptions?.url; diff --git a/src/editors/playgroundController.ts b/src/editors/playgroundController.ts index 49f0c6adb..6e932d433 100644 --- a/src/editors/playgroundController.ts +++ b/src/editors/playgroundController.ts @@ -20,6 +20,8 @@ import playgroundCreateIndexTemplate from '../templates/playgroundCreateIndexTem import playgroundCreateCollectionTemplate from '../templates/playgroundCreateCollectionTemplate'; import playgroundCloneDocumentTemplate from '../templates/playgroundCloneDocumentTemplate'; import playgroundInsertDocumentTemplate from '../templates/playgroundInsertDocumentTemplate'; +import playgroundStreamsTemplate from '../templates/playgroundStreamsTemplate'; +import playgroundCreateStreamProcessorTemplate from '../templates/playgroundCreateStreamProcessorTemplate'; import type { PlaygroundResult, ShellEvaluateResult, @@ -391,11 +393,25 @@ export default class PlaygroundController { return this._createPlaygroundFileWithContent(content); } + async createPlaygroundForCreateStreamProcessor( + element: ConnectionTreeItem + ): Promise { + const content = playgroundCreateStreamProcessorTemplate; + + element.cacheIsUpToDate = false; + + this._telemetryService.trackPlaygroundCreated('createStreamProcessor'); + + return this._createPlaygroundFileWithContent(content); + } + async createPlayground(): Promise { const useDefaultTemplate = !!vscode.workspace .getConfiguration('mdb') .get('useDefaultTemplateForPlayground'); - const content = useDefaultTemplate ? playgroundTemplate : ''; + const isStreams = this._connectionController.isConnectedToAtlasStreams(); + const template = isStreams ? playgroundStreamsTemplate : playgroundTemplate; + const content = useDefaultTemplate ? template : ''; this._telemetryService.trackPlaygroundCreated('crud'); return this._createPlaygroundFileWithContent(content); diff --git a/src/explorer/connectionTreeItem.ts b/src/explorer/connectionTreeItem.ts index e110892ea..2ca9a7d9a 100644 --- a/src/explorer/connectionTreeItem.ts +++ b/src/explorer/connectionTreeItem.ts @@ -1,11 +1,13 @@ import * as vscode from 'vscode'; import path from 'path'; +import type { StreamProcessor } from 'mongodb-data-service/lib/data-service'; import DatabaseTreeItem from './databaseTreeItem'; import type ConnectionController from '../connectionController'; import formatError from '../utils/formatError'; import { getImagesPath } from '../extensionConstants'; import type TreeItemParent from './treeItemParentInterface'; +import StreamProcessorTreeItem from './streamProcessorTreeItem'; export enum ConnectionItemContextValues { disconnected = 'disconnectedConnectionTreeItem', @@ -36,7 +38,9 @@ export default class ConnectionTreeItem { contextValue = ConnectionItemContextValues.disconnected; - private _childrenCache: { [key: string]: DatabaseTreeItem }; + private _childrenCache: { + [key: string]: DatabaseTreeItem | StreamProcessorTreeItem; + }; cacheIsUpToDate: boolean; private _connectionController: ConnectionController; @@ -57,7 +61,9 @@ export default class ConnectionTreeItem isExpanded: boolean; connectionController: ConnectionController; cacheIsUpToDate: boolean; - childrenCache: { [key: string]: DatabaseTreeItem }; // Existing cache. + childrenCache: { + [key: string]: DatabaseTreeItem | StreamProcessorTreeItem; + }; // Existing cache. }) { super( connectionController.getSavedConnectionName(connectionId), @@ -118,6 +124,23 @@ export default class ConnectionTreeItem } } + async listStreamProcessors(): Promise { + const dataService = this._connectionController.getActiveDataService(); + + if (dataService === null) { + throw new Error('Not currently connected.'); + } + + try { + const processors = await dataService.listStreamProcessors(); + return processors; + } catch (error) { + throw new Error( + `Unable to list stream processors: ${formatError(error).message}` + ); + } + } + async getChildren(): Promise { if ( !this.isExpanded || @@ -133,67 +156,107 @@ export default class ConnectionTreeItem throw new Error('Not currently connected.'); } + const isAtlasStreams = + this._connectionController.isConnectedToAtlasStreams(); + if (this.cacheIsUpToDate) { const pastChildrenCache = this._childrenCache; this._childrenCache = {}; - // We create a new database tree item here instead of reusing the + // We create a new tree item here instead of reusing the // cached one in order to ensure the expanded state is set. - Object.keys(pastChildrenCache).forEach((databaseName) => { - const prevChild = pastChildrenCache[databaseName]; + Object.keys(pastChildrenCache).forEach((childName) => { + const prevChild = pastChildrenCache[childName]; if (prevChild.isDropped) { return; } - this._childrenCache[databaseName] = new DatabaseTreeItem({ - databaseName, - dataService, - isExpanded: prevChild.isExpanded, - cacheIsUpToDate: prevChild.cacheIsUpToDate, - childrenCache: prevChild.getChildrenCache(), - }); + if (isAtlasStreams) { + const spItem = prevChild as StreamProcessorTreeItem; + this._childrenCache[childName] = new StreamProcessorTreeItem({ + dataService, + isExpanded: spItem.isExpanded, + streamProcessorName: spItem.streamProcessorName, + streamProcessorState: spItem.streamProcessorState, + }); + } else { + const dbItem = prevChild as DatabaseTreeItem; + this._childrenCache[childName] = new DatabaseTreeItem({ + databaseName: childName, + dataService, + isExpanded: dbItem.isExpanded, + cacheIsUpToDate: dbItem.cacheIsUpToDate, + childrenCache: dbItem.getChildrenCache(), + }); + } }); return Object.values(this._childrenCache); } - const databases = await this.listDatabases(); - databases.sort((a: string, b: string) => { - return a.localeCompare(b); - }); + if (isAtlasStreams) { + const processors = await this.listStreamProcessors(); + processors.sort((a: StreamProcessor, b: StreamProcessor) => { + return a.name.localeCompare(b.name); + }); - this.cacheIsUpToDate = true; + this.cacheIsUpToDate = true; - if (!databases) { + const pastChildrenCache = this._childrenCache; this._childrenCache = {}; - return []; - } - - const pastChildrenCache = this._childrenCache; - this._childrenCache = {}; - - databases.forEach((name: string) => { - if (pastChildrenCache[name]) { + processors.forEach((sp) => { + const cachedItem = pastChildrenCache[ + sp.name + ] as StreamProcessorTreeItem; // We create a new element here instead of reusing the cached one // in order to ensure the expanded state is set. - this._childrenCache[name] = new DatabaseTreeItem({ - databaseName: name, + this._childrenCache[sp.name] = new StreamProcessorTreeItem({ dataService, - isExpanded: pastChildrenCache[name].isExpanded, - cacheIsUpToDate: pastChildrenCache[name].cacheIsUpToDate, - childrenCache: pastChildrenCache[name].getChildrenCache(), - }); - } else { - this._childrenCache[name] = new DatabaseTreeItem({ - databaseName: name, - dataService, - isExpanded: false, - cacheIsUpToDate: false, // Cache is not up to date (no cache). - childrenCache: {}, // No existing cache. + streamProcessorName: sp.name, + streamProcessorState: sp.state, + isExpanded: cachedItem ? cachedItem.isExpanded : false, }); + }); + } else { + const databases = await this.listDatabases(); + databases.sort((a: string, b: string) => { + return a.localeCompare(b); + }); + + this.cacheIsUpToDate = true; + + if (!databases) { + this._childrenCache = {}; + return []; } - }); + + const pastChildrenCache = this._childrenCache; + this._childrenCache = {}; + + databases.forEach((name: string) => { + const cachedItem = pastChildrenCache[name] as DatabaseTreeItem; + if (cachedItem) { + // We create a new element here instead of reusing the cached one + // in order to ensure the expanded state is set. + this._childrenCache[name] = new DatabaseTreeItem({ + databaseName: name, + dataService, + isExpanded: cachedItem.isExpanded, + cacheIsUpToDate: cachedItem.cacheIsUpToDate, + childrenCache: cachedItem.getChildrenCache(), + }); + } else { + this._childrenCache[name] = new DatabaseTreeItem({ + databaseName: name, + dataService, + isExpanded: false, + cacheIsUpToDate: false, // Cache is not up to date (no cache). + childrenCache: {}, // No existing cache. + }); + } + }); + } return Object.values(this._childrenCache); } @@ -235,7 +298,9 @@ export default class ConnectionTreeItem this.cacheIsUpToDate = false; } - getChildrenCache(): { [key: string]: DatabaseTreeItem } { + getChildrenCache(): { + [key: string]: DatabaseTreeItem | StreamProcessorTreeItem; + } { return this._childrenCache; } } diff --git a/src/explorer/index.ts b/src/explorer/index.ts index dd67393e9..849da95a3 100644 --- a/src/explorer/index.ts +++ b/src/explorer/index.ts @@ -7,6 +7,7 @@ import DocumentTreeItem from './documentTreeItem'; import HelpExplorer from './helpExplorer'; import SchemaTreeItem from './schemaTreeItem'; import PlaygroundsExplorer from './playgroundsExplorer'; +import StreamProcessorTreeItem from './streamProcessorTreeItem'; export { CollectionTreeItem, @@ -18,4 +19,5 @@ export { HelpExplorer, PlaygroundsExplorer, SchemaTreeItem, + StreamProcessorTreeItem, }; diff --git a/src/explorer/streamProcessorTreeItem.ts b/src/explorer/streamProcessorTreeItem.ts new file mode 100644 index 000000000..a728cb50e --- /dev/null +++ b/src/explorer/streamProcessorTreeItem.ts @@ -0,0 +1,149 @@ +import * as vscode from 'vscode'; +import path from 'path'; +import type { DataService } from 'mongodb-data-service'; + +import formatError from '../utils/formatError'; +import { getImagesPath } from '../extensionConstants'; +import type TreeItemParent from './treeItemParentInterface'; + +function getIconPath(): { light: string; dark: string } { + const LIGHT = path.join(getImagesPath(), 'light'); + const DARK = path.join(getImagesPath(), 'dark'); + + return { + light: path.join(LIGHT, 'stream-processor.svg'), + dark: path.join(DARK, 'stream-processor.svg'), + }; +} + +export default class StreamProcessorTreeItem + extends vscode.TreeItem + implements TreeItemParent, vscode.TreeDataProvider +{ + contextValue = 'streamProcessorTreeItem' as const; + cacheIsUpToDate = true; + isExpanded: boolean; + isDropped = false; + + streamProcessorName: string; + streamProcessorState: string; + + private _dataService: DataService; + + constructor({ + streamProcessorName, + streamProcessorState, + dataService, + isExpanded, + }: { + streamProcessorName: string; + streamProcessorState: string; + dataService: DataService; + isExpanded: boolean; + }) { + super( + streamProcessorName, + isExpanded + ? vscode.TreeItemCollapsibleState.Expanded + : vscode.TreeItemCollapsibleState.Collapsed + ); + + this._dataService = dataService; + this.iconPath = getIconPath(); + this.isExpanded = isExpanded; + this.tooltip = streamProcessorName; + this.streamProcessorName = streamProcessorName; + this.streamProcessorState = streamProcessorState; + } + + getTreeItem(element: StreamProcessorTreeItem): StreamProcessorTreeItem { + return element; + } + + getChildren(): Promise { + return Promise.resolve( + !this.isExpanded + ? [] + : [ + new vscode.TreeItem( + `State: ${this.streamProcessorState}`, + vscode.TreeItemCollapsibleState.None + ), + ] + ); + } + + onDidCollapse(): void { + this.isExpanded = false; + this.cacheIsUpToDate = false; + } + + onDidExpand(): Promise { + this.cacheIsUpToDate = false; + this.isExpanded = true; + return Promise.resolve(true); + } + + async onStartClicked(): Promise { + try { + await this._dataService.startStreamProcessor(this.streamProcessorName); + this.streamProcessorState = 'STARTED'; + return true; + } catch (error) { + void vscode.window.showErrorMessage( + `Start stream processor failed: ${formatError(error).message}` + ); + return false; + } + } + + async onStopClicked(): Promise { + try { + await this._dataService.stopStreamProcessor(this.streamProcessorName); + this.streamProcessorState = 'STOPPED'; + return true; + } catch (error) { + void vscode.window.showErrorMessage( + `Stop stream processor failed: ${formatError(error).message}` + ); + return false; + } + } + + // Prompt the user to input the name to confirm the drop, then drop. + async onDropClicked(): Promise { + let inputtedName: string | undefined; + try { + inputtedName = await vscode.window.showInputBox({ + value: '', + placeHolder: 'e.g. myStreamProcessor', + prompt: `Are you sure you wish to drop this stream processor? Enter the stream processor name '${this.streamProcessorName}' to confirm.`, + validateInput: (inputName) => { + if (inputName && this.streamProcessorName !== inputName) { + return 'Stream processor name does not match'; + } + return null; + }, + }); + } catch (e) { + return Promise.reject( + new Error(`An error occured parsing the stream processor name: ${e}`) + ); + } + + if (this.streamProcessorName !== inputtedName) { + return Promise.resolve(false); + } + + try { + await this._dataService.dropStreamProcessor(this.streamProcessorName); + this.streamProcessorState = 'DROPPED'; + this.isDropped = true; + } catch (error) { + void vscode.window.showErrorMessage( + `Drop stream processor failed: ${formatError(error).message}` + ); + } + return this.isDropped; + } +} diff --git a/src/mdbExtensionController.ts b/src/mdbExtensionController.ts index 65305a710..d4f29d481 100644 --- a/src/mdbExtensionController.ts +++ b/src/mdbExtensionController.ts @@ -39,6 +39,7 @@ import PlaygroundResultProvider from './editors/playgroundResultProvider'; import WebviewController from './views/webviewController'; import { createIdFactory, generateId } from './utils/objectIdHelper'; import { ConnectionStorage } from './storage/connectionStorage'; +import type StreamProcessorTreeItem from './explorer/streamProcessorTreeItem'; // This class is the top-level controller for our extension. // Commands which the extensions handles are defined in the function `activate`. @@ -665,6 +666,91 @@ export default class MDBExtensionController implements vscode.Disposable { return true; } ); + this.registerCommand( + EXTENSION_COMMANDS.MDB_ADD_STREAM_PROCESSOR, + async (element: ConnectionTreeItem): Promise => { + if (!element) { + void vscode.window.showErrorMessage( + 'Please wait for the connection to finish loading before adding a stream processor.' + ); + + return false; + } + + if ( + element.connectionId !== + this._connectionController.getActiveConnectionId() + ) { + void vscode.window.showErrorMessage( + 'Please connect to this connection before adding a stream processor.' + ); + + return false; + } + + if (this._connectionController.isDisconnecting()) { + void vscode.window.showErrorMessage( + 'Unable to add stream processor: currently disconnecting.' + ); + + return false; + } + + if (this._connectionController.isConnecting()) { + void vscode.window.showErrorMessage( + 'Unable to add stream processor: currently connecting.' + ); + + return false; + } + + return this._playgroundController.createPlaygroundForCreateStreamProcessor( + element + ); + } + ); + this.registerCommand( + EXTENSION_COMMANDS.MDB_START_STREAM_PROCESSOR, + async (element: StreamProcessorTreeItem): Promise => { + const started = await element.onStartClicked(); + if (started) { + void vscode.window.showInformationMessage( + 'Stream processor successfully started.' + ); + // Refresh explorer view after a processor is started. + this._explorerController.refresh(); + } + return started; + } + ); + this.registerCommand( + EXTENSION_COMMANDS.MDB_STOP_STREAM_PROCESSOR, + async (element: StreamProcessorTreeItem): Promise => { + const stopped = await element.onStopClicked(); + if (stopped) { + void vscode.window.showInformationMessage( + 'Stream processor successfully stopped.' + ); + // Refresh explorer view after a processor is stopped. + this._explorerController.refresh(); + } + return stopped; + } + ); + this.registerCommand( + EXTENSION_COMMANDS.MDB_DROP_STREAM_PROCESSOR, + async (element: StreamProcessorTreeItem): Promise => { + const dropped = await element.onDropClicked(); + if (dropped) { + void vscode.window.showInformationMessage( + 'Stream processor successfully dropped.' + ); + // Refresh explorer view after a processor is dropped. + this._explorerController.refresh(); + } + return dropped; + } + ); } showOverviewPageIfRecentlyInstalled(): void { diff --git a/src/templates/playgroundCreateStreamProcessorTemplate.ts b/src/templates/playgroundCreateStreamProcessorTemplate.ts new file mode 100644 index 000000000..db2e9275b --- /dev/null +++ b/src/templates/playgroundCreateStreamProcessorTemplate.ts @@ -0,0 +1,28 @@ +const template = `/* global sp */ +// MongoDB Playground +// Use Ctrl+Space inside a snippet or a string literal to trigger completions. + +// create a new stream processor +/* sp.createStreamProcessor('newStreamProcessor', [ + { + $source: { + "connectionName": "myKafka", + "topic": "source" + } + }, + { + $match: { temperature: 46 } + }, + { + $emit: { + "connectionName": "mySink", + "topic" : "target", + } + } +]); */ + +// More information on the \`createStreamProcessor\` command can be found at: +// https://www.mongodb.com/docs/atlas/atlas-sp/manage-stream-processor/#create-a-stream-processor +`; + +export default template; diff --git a/src/templates/playgroundStreamsTemplate.ts b/src/templates/playgroundStreamsTemplate.ts new file mode 100644 index 000000000..14de553ce --- /dev/null +++ b/src/templates/playgroundStreamsTemplate.ts @@ -0,0 +1,26 @@ +const template = `/* global sp */ +// MongoDB Playground +// To disable this template go to Settings | MongoDB | Use Default Template For Playground. +// Make sure you are connected to enable completions and to be able to run a playground. +// Use Ctrl+Space inside a snippet or a string literal to trigger completions. +// The result of the last command run in a playground is shown on the results panel. +// Use 'console.log()' to print to the debug output. +// For more documentation on playgrounds please refer to +// https://www.mongodb.com/docs/mongodb-vscode/playgrounds/ + +// Connection can be added in Atlas UI or using Atlas CLI. See doc linked below for more info +// https://www.mongodb.com/docs/atlas/atlas-sp/manage-processing-instance/#add-a-connection-to-the-connection-registry +// List avialable connections +sp.listConnections(); + +// Use process to quickly test out the stream processing +sp.process([ + { + $source:{ + "connectionName": "sample_stream_solar" + } + } +]); +`; + +export default template; diff --git a/src/test/suite/explorer/connectionTreeItem.test.ts b/src/test/suite/explorer/connectionTreeItem.test.ts index b4455529b..13f7bd157 100644 --- a/src/test/suite/explorer/connectionTreeItem.test.ts +++ b/src/test/suite/explorer/connectionTreeItem.test.ts @@ -99,6 +99,54 @@ suite('ConnectionTreeItem Test Suite', () => { ); } }); + + suite('when connected to a Stream Processing Instnace', function () { + beforeEach(function () { + sandbox.replace( + mdbTestExtension.testExtensionController._connectionController, + 'isConnectedToAtlasStreams', + () => true + ); + }); + + test('returns stream processor tree items', async () => { + sandbox.replace( + mdbTestExtension.testExtensionController._connectionController, + 'getActiveDataService', + () => new DataServiceStub() as unknown as DataService + ); + + const spItems = await testConnectionTreeItem.getChildren(); + + assert.strictEqual(spItems.length, 2); + assert.strictEqual(spItems[0].label, 'mockStreamProcessor1'); + assert.strictEqual(spItems[1].label, 'mockStreamProcessor2'); + }); + + test('when listStreamProcessors errors it wraps it in a nice message', async () => { + sandbox.replace( + mdbTestExtension.testExtensionController._connectionController, + 'getActiveDataService', + () => + ({ + listStreamProcessors: () => + new Promise(() => { + throw Error('peaches'); + }), + } as unknown as DataService) + ); + + try { + await testConnectionTreeItem.getChildren(); + assert(false); + } catch (error) { + assert.strictEqual( + formatError(error).message, + 'Unable to list stream processors: peaches' + ); + } + }); + }); }); suite('#listDatabases', () => { diff --git a/src/test/suite/explorer/streamProcessorTreeItem.test.ts b/src/test/suite/explorer/streamProcessorTreeItem.test.ts new file mode 100644 index 000000000..10ef03387 --- /dev/null +++ b/src/test/suite/explorer/streamProcessorTreeItem.test.ts @@ -0,0 +1,72 @@ +import * as vscode from 'vscode'; +import assert from 'assert'; +import type { DataService } from 'mongodb-data-service'; + +import StreamProcessorTreeItem from '../../../explorer/streamProcessorTreeItem'; +import { DataServiceStub, mockStreamProcessors } from '../stubs'; + +// eslint-disable-next-line @typescript-eslint/no-var-requires +const { contributes } = require('../../../../package.json'); + +function getTestTreeItem( + options?: Partial[0]> +) { + const { name, state } = mockStreamProcessors[1]; + return new StreamProcessorTreeItem({ + streamProcessorName: name, + streamProcessorState: state, + dataService: new DataServiceStub() as unknown as DataService, + isExpanded: false, + ...options, + }); +} + +suite('StreamProcessorTreeItem Test Suite', () => { + test('its context value should be in the package json', () => { + let spRegisteredCommandInPackageJson = false; + + const testStreamProcessorTreeItem = getTestTreeItem(); + + contributes.menus['view/item/context'].forEach((contextItem) => { + if (contextItem.when.includes(testStreamProcessorTreeItem.contextValue)) { + spRegisteredCommandInPackageJson = true; + } + }); + + assert( + spRegisteredCommandInPackageJson, + 'Expected stream processor tree item to be registered with a command in package json' + ); + }); + + test('when not expanded it does not show state', async () => { + const testStreamProcessorTreeItem = getTestTreeItem(); + + const children = await testStreamProcessorTreeItem.getChildren(); + assert.strictEqual( + children.length, + 0, + `Expected no state, recieved ${children.length}` + ); + }); + + test('when expanded shows the state as a child item in tree', async () => { + const testStreamProcessorTreeItem = getTestTreeItem(); + + await testStreamProcessorTreeItem.onDidExpand(); + + const children = await testStreamProcessorTreeItem.getChildren(); + assert( + children.length === 1, + `Expected exactly one state item to be returned, recieved ${children.length}` + ); + assert.strictEqual( + children[0].label, + `State: ${mockStreamProcessors[1].state}` + ); + assert.strictEqual( + children[0].collapsibleState, + vscode.TreeItemCollapsibleState.None + ); + }); +}); diff --git a/src/test/suite/extension.test.ts b/src/test/suite/extension.test.ts index f0379aded..7402e7d5c 100644 --- a/src/test/suite/extension.test.ts +++ b/src/test/suite/extension.test.ts @@ -55,6 +55,10 @@ suite('Extension Test Suite', () => { 'mdb.copyDocumentContentsFromTreeView', 'mdb.cloneDocumentFromTreeView', 'mdb.deleteDocumentFromTreeView', + 'mdb.addStreamProcessor', + 'mdb.startStreamProcessor', + 'mdb.stopStreamProcessor', + 'mdb.dropStreamProcessor', // Editor commands. 'mdb.codeLens.showMoreDocumentsClicked', diff --git a/src/test/suite/mdbExtensionController.test.ts b/src/test/suite/mdbExtensionController.test.ts index 388931535..a6d4dfefe 100644 --- a/src/test/suite/mdbExtensionController.test.ts +++ b/src/test/suite/mdbExtensionController.test.ts @@ -14,6 +14,7 @@ import { DatabaseTreeItem, DocumentTreeItem, SchemaTreeItem, + StreamProcessorTreeItem, } from '../../explorer'; import EXTENSION_COMMANDS from '../../commands'; import FieldTreeItem from '../../explorer/fieldTreeItem'; @@ -75,6 +76,18 @@ function getTestDatabaseTreeItem( }); } +function getTestStreamProcessorTreeItem( + options?: Partial[0]> +) { + return new StreamProcessorTreeItem({ + streamProcessorName: 'zebra', + streamProcessorState: 'CREATED', + dataService: {} as DataService, + isExpanded: false, + ...options, + }); +} + function getTestFieldTreeItem() { return new FieldTreeItem({ field: { @@ -1395,6 +1408,193 @@ suite('MDBExtensionController Test Suite', function () { assert.strictEqual(result, true); }); + test('mdb.addStreamProcessor should create a MongoDB playground with create stream processor template', async () => { + const testConnectionTreeItem = getTestConnectionTreeItem(); + await vscode.commands.executeCommand( + 'mdb.addStreamProcessor', + testConnectionTreeItem + ); + + const content = fakeCreatePlaygroundFileWithContent.firstCall.args[0]; + assert(content.includes('// create a new stream processor')); + assert(content.includes("sp.createStreamProcessor('newStreamProcessor'")); + }); + + test('mdb.startStreamProcessor starts the stream processor', async () => { + let calledProcessorName = ''; + const testProcessorTreeItem = getTestStreamProcessorTreeItem({ + dataService: { + startStreamProcessor: (spName: string) => { + calledProcessorName = spName; + return Promise.resolve(true); + }, + } as unknown as DataService, + }); + + const started = await vscode.commands.executeCommand( + 'mdb.startStreamProcessor', + testProcessorTreeItem + ); + + assert.strictEqual(started, true); + assert.strictEqual(testProcessorTreeItem.streamProcessorState, 'STARTED'); + assert.strictEqual( + calledProcessorName, + testProcessorTreeItem.streamProcessorName + ); + assert.strictEqual( + showInformationMessageStub.firstCall.args[0], + 'Stream processor successfully started.' + ); + }); + + test('mdb.startStreamProcessor shows error when fails', async () => { + let calledProcessorName = ''; + const testProcessorTreeItem = getTestStreamProcessorTreeItem({ + dataService: { + startStreamProcessor: (spName: string) => { + calledProcessorName = spName; + return Promise.reject(new Error('Fake test error')); + }, + } as unknown as DataService, + }); + + const started = await vscode.commands.executeCommand( + 'mdb.startStreamProcessor', + testProcessorTreeItem + ); + + assert.strictEqual(started, false); + assert.strictEqual(testProcessorTreeItem.streamProcessorState, 'CREATED'); + assert.strictEqual( + calledProcessorName, + testProcessorTreeItem.streamProcessorName + ); + assert.strictEqual( + showErrorMessageStub.firstCall.args[0], + 'Start stream processor failed: Fake test error' + ); + }); + + test('mdb.stopStreamProcessor stops the stream processor', async () => { + let calledProcessorName = ''; + const testProcessorTreeItem = getTestStreamProcessorTreeItem({ + dataService: { + stopStreamProcessor: (spName: string) => { + calledProcessorName = spName; + return Promise.resolve(true); + }, + } as unknown as DataService, + }); + + const stopped = await vscode.commands.executeCommand( + 'mdb.stopStreamProcessor', + testProcessorTreeItem + ); + + assert.strictEqual(stopped, true); + assert.strictEqual(testProcessorTreeItem.streamProcessorState, 'STOPPED'); + assert.strictEqual( + calledProcessorName, + testProcessorTreeItem.streamProcessorName + ); + assert.strictEqual( + showInformationMessageStub.firstCall.args[0], + 'Stream processor successfully stopped.' + ); + }); + + test('mdb.stopStreamProcessor shows error when fails', async () => { + let calledProcessorName = ''; + const testProcessorTreeItem = getTestStreamProcessorTreeItem({ + dataService: { + stopStreamProcessor: (spName: string) => { + calledProcessorName = spName; + return Promise.reject(new Error('Fake test error')); + }, + } as unknown as DataService, + }); + + const stopped = await vscode.commands.executeCommand( + 'mdb.stopStreamProcessor', + testProcessorTreeItem + ); + + assert.strictEqual(stopped, false); + assert.strictEqual(testProcessorTreeItem.streamProcessorState, 'CREATED'); + assert.strictEqual( + calledProcessorName, + testProcessorTreeItem.streamProcessorName + ); + assert.strictEqual( + showErrorMessageStub.firstCall.args[0], + 'Stop stream processor failed: Fake test error' + ); + }); + + test('mdb.dropStreamProcessor drops the stream processor after inputting the name', async () => { + let calledProcessorName = ''; + const testProcessorTreeItem = getTestStreamProcessorTreeItem({ + streamProcessorName: 'iMissTangerineAltoids', + dataService: { + dropStreamProcessor: (spName): Promise => { + calledProcessorName = spName; + return Promise.resolve(true); + }, + } as unknown as DataService, + }); + + const inputBoxResolvesStub = sandbox.stub(); + inputBoxResolvesStub.onCall(0).resolves('iMissTangerineAltoids'); + sandbox.replace(vscode.window, 'showInputBox', inputBoxResolvesStub); + + const dropped = await vscode.commands.executeCommand( + 'mdb.dropStreamProcessor', + testProcessorTreeItem + ); + assert.strictEqual(dropped, true); + assert.strictEqual(calledProcessorName, 'iMissTangerineAltoids'); + assert.strictEqual(testProcessorTreeItem.streamProcessorState, 'DROPPED'); + assert.strictEqual( + showInformationMessageStub.firstCall.args[0], + 'Stream processor successfully dropped.' + ); + }); + + test('mdb.dropStreamProcessor shows error when fails', async () => { + let calledProcessorName = ''; + const testProcessorTreeItem = getTestStreamProcessorTreeItem({ + dataService: { + dropStreamProcessor: (spName: string) => { + calledProcessorName = spName; + return Promise.reject(new Error('Fake test error')); + }, + } as unknown as DataService, + }); + + const inputBoxResolvesStub = sandbox.stub(); + inputBoxResolvesStub + .onCall(0) + .resolves(testProcessorTreeItem.streamProcessorName); + sandbox.replace(vscode.window, 'showInputBox', inputBoxResolvesStub); + + const started = await vscode.commands.executeCommand( + 'mdb.dropStreamProcessor', + testProcessorTreeItem + ); + + assert.strictEqual(started, false); + assert.strictEqual(testProcessorTreeItem.streamProcessorState, 'CREATED'); + assert.strictEqual( + calledProcessorName, + testProcessorTreeItem.streamProcessorName + ); + assert.strictEqual( + showErrorMessageStub.firstCall.args[0], + 'Drop stream processor failed: Fake test error' + ); + }); + suite('with mock execute command', function () { let executeCommandStub: SinonStub; diff --git a/src/test/suite/stubs.ts b/src/test/suite/stubs.ts index 41bfcd85b..ada97c86e 100644 --- a/src/test/suite/stubs.ts +++ b/src/test/suite/stubs.ts @@ -157,7 +157,22 @@ for (let i = 0; i < numberOfDocumentsToMock; i++) { }); } +const mockStreamProcessors = [ + { + name: 'mockStreamProcessor1', + state: 'STARTED', + }, + { + name: 'mockStreamProcessor2', + state: 'STOPPPED', + }, +]; + class DataServiceStub { + listStreamProcessors(): Promise { + return Promise.resolve(mockStreamProcessors); + } + listDatabases(): Promise { return Promise.resolve( mockDatabaseNames.map((dbName) => ({ name: dbName })) @@ -377,6 +392,7 @@ export { mockTextEditor, mockDatabaseNames, mockDatabases, + mockStreamProcessors, mockVSCodeTextDocument, DataServiceStub, ExtensionContextStub,