diff --git a/images/dark/stream-processor.svg b/images/dark/stream-processor.svg
new file mode 100644
index 000000000..f3268bcd7
--- /dev/null
+++ b/images/dark/stream-processor.svg
@@ -0,0 +1,11 @@
+
diff --git a/images/light/stream-processor.svg b/images/light/stream-processor.svg
new file mode 100644
index 000000000..4d4d58316
--- /dev/null
+++ b/images/light/stream-processor.svg
@@ -0,0 +1,11 @@
+
diff --git a/package-lock.json b/package-lock.json
index 2ac376da9..e1eaca309 100644
--- a/package-lock.json
+++ b/package-lock.json
@@ -34,7 +34,7 @@
"mongodb-build-info": "^1.6.2",
"mongodb-cloud-info": "^2.1.0",
"mongodb-connection-string-url": "^2.6.0",
- "mongodb-data-service": "^22.17.0",
+ "mongodb-data-service": "^22.17.1",
"mongodb-data-service-legacy": "npm:mongodb-data-service@22.8.0",
"mongodb-log-writer": "^1.4.0",
"mongodb-query-parser": "^3.1.3",
@@ -17544,9 +17544,9 @@
}
},
"node_modules/mongodb-data-service": {
- "version": "22.17.0",
- "resolved": "https://registry.npmjs.org/mongodb-data-service/-/mongodb-data-service-22.17.0.tgz",
- "integrity": "sha512-5HO571xTjFIXZRwDEYMIefJvqQj8X8hvckrXB7U1LUEEVfIKh7ysmE1bO9eH3A50qr6i4L4bgWOV7OhzoATD6Q==",
+ "version": "22.17.1",
+ "resolved": "https://registry.npmjs.org/mongodb-data-service/-/mongodb-data-service-22.17.1.tgz",
+ "integrity": "sha512-9TpxFMtMYuPVUOmwzNFnO+1rjzuiQkkAh1xwcat8xEFiIXY4O89h+2k7kbYiNWnwWD4LznvLyRcPqgreeTlQbg==",
"dependencies": {
"@mongodb-js/compass-logging": "^1.2.9",
"@mongodb-js/compass-utils": "^0.5.8",
@@ -38688,9 +38688,9 @@
}
},
"mongodb-data-service": {
- "version": "22.17.0",
- "resolved": "https://registry.npmjs.org/mongodb-data-service/-/mongodb-data-service-22.17.0.tgz",
- "integrity": "sha512-5HO571xTjFIXZRwDEYMIefJvqQj8X8hvckrXB7U1LUEEVfIKh7ysmE1bO9eH3A50qr6i4L4bgWOV7OhzoATD6Q==",
+ "version": "22.17.1",
+ "resolved": "https://registry.npmjs.org/mongodb-data-service/-/mongodb-data-service-22.17.1.tgz",
+ "integrity": "sha512-9TpxFMtMYuPVUOmwzNFnO+1rjzuiQkkAh1xwcat8xEFiIXY4O89h+2k7kbYiNWnwWD4LznvLyRcPqgreeTlQbg==",
"requires": {
"@mongodb-js/compass-logging": "^1.2.9",
"@mongodb-js/compass-utils": "^0.5.8",
diff --git a/package.json b/package.json
index 78b7cad08..3ede8fd02 100644
--- a/package.json
+++ b/package.json
@@ -397,6 +397,26 @@
{
"command": "mdb.deleteDocumentFromTreeView",
"title": "Delete Document..."
+ },
+ {
+ "command": "mdb.addStreamProcessor",
+ "title": "Add StreamProcessor...",
+ "icon": {
+ "light": "images/light/plus-circle.svg",
+ "dark": "images/dark/plus-circle.svg"
+ }
+ },
+ {
+ "command": "mdb.startStreamProcessor",
+ "title": "Start Stream Processor"
+ },
+ {
+ "command": "mdb.stopStreamProcessor",
+ "title": "Stop Stream Processor"
+ },
+ {
+ "command": "mdb.dropStreamProcessor",
+ "title": "Drop Stream Processor..."
}
],
"menus": {
@@ -428,12 +448,22 @@
"view/item/context": [
{
"command": "mdb.addDatabase",
- "when": "view == mongoDBConnectionExplorer && viewItem == connectedConnectionTreeItem",
+ "when": "view == mongoDBConnectionExplorer && viewItem == connectedConnectionTreeItem && mdb.isAtlasStreams == false",
"group": "inline"
},
{
"command": "mdb.addDatabase",
- "when": "view == mongoDBConnectionExplorer && viewItem == connectedConnectionTreeItem",
+ "when": "view == mongoDBConnectionExplorer && viewItem == connectedConnectionTreeItem && mdb.isAtlasStreams == false",
+ "group": "1@1"
+ },
+ {
+ "command": "mdb.addStreamProcessor",
+ "when": "view == mongoDBConnectionExplorer && viewItem == connectedConnectionTreeItem && mdb.isAtlasStreams == true",
+ "group": "inline"
+ },
+ {
+ "command": "mdb.addStreamProcessor",
+ "when": "view == mongoDBConnectionExplorer && viewItem == connectedConnectionTreeItem && mdb.isAtlasStreams == true",
"group": "1@1"
},
{
@@ -611,6 +641,18 @@
"command": "mdb.deleteDocumentFromTreeView",
"when": "view == mongoDBConnectionExplorer && viewItem == documentTreeItem",
"group": "3@1"
+ },
+ {
+ "command": "mdb.startStreamProcessor",
+ "when": "view == mongoDBConnectionExplorer && viewItem == streamProcessorTreeItem"
+ },
+ {
+ "command": "mdb.stopStreamProcessor",
+ "when": "view == mongoDBConnectionExplorer && viewItem == streamProcessorTreeItem"
+ },
+ {
+ "command": "mdb.dropStreamProcessor",
+ "when": "view == mongoDBConnectionExplorer && viewItem == streamProcessorTreeItem"
}
],
"editor/title": [
@@ -639,27 +681,27 @@
},
{
"command": "mdb.exportToRuby",
- "when": "mdb.isPlayground == true && mdb.connectedToMongoDB == true"
+ "when": "mdb.isPlayground == true && mdb.connectedToMongoDB == true && mdb.isAtlasStreams == false"
},
{
"command": "mdb.exportToPython",
- "when": "mdb.isPlayground == true && mdb.connectedToMongoDB == true"
+ "when": "mdb.isPlayground == true && mdb.connectedToMongoDB == true && mdb.isAtlasStreams == false"
},
{
"command": "mdb.exportToJava",
- "when": "mdb.isPlayground == true && mdb.connectedToMongoDB == true"
+ "when": "mdb.isPlayground == true && mdb.connectedToMongoDB == true && mdb.isAtlasStreams == false"
},
{
"command": "mdb.exportToCsharp",
- "when": "mdb.isPlayground == true && mdb.connectedToMongoDB == true"
+ "when": "mdb.isPlayground == true && mdb.connectedToMongoDB == true && mdb.isAtlasStreams == false"
},
{
"command": "mdb.exportToNode",
- "when": "mdb.isPlayground == true && mdb.connectedToMongoDB == true"
+ "when": "mdb.isPlayground == true && mdb.connectedToMongoDB == true && mdb.isAtlasStreams == false"
},
{
"command": "mdb.exportToGo",
- "when": "mdb.isPlayground == true && mdb.connectedToMongoDB == true"
+ "when": "mdb.isPlayground == true && mdb.connectedToMongoDB == true && mdb.isAtlasStreams == false"
},
{
"command": "mdb.refreshPlaygroundsFromTreeView",
@@ -804,6 +846,22 @@
{
"command": "mdb.deleteDocumentFromTreeView",
"when": "false"
+ },
+ {
+ "command": "mdb.addStreamProcessor",
+ "when": "false"
+ },
+ {
+ "command": "mdb.startStreamProcessor",
+ "when": "false"
+ },
+ {
+ "command": "mdb.stopStreamProcessor",
+ "when": "false"
+ },
+ {
+ "command": "mdb.dropStreamProcessor",
+ "when": "false"
}
]
},
@@ -996,7 +1054,7 @@
"mongodb-build-info": "^1.6.2",
"mongodb-cloud-info": "^2.1.0",
"mongodb-connection-string-url": "^2.6.0",
- "mongodb-data-service": "^22.17.0",
+ "mongodb-data-service": "^22.17.1",
"mongodb-data-service-legacy": "npm:mongodb-data-service@22.8.0",
"mongodb-log-writer": "^1.4.0",
"mongodb-query-parser": "^3.1.3",
diff --git a/src/commands/index.ts b/src/commands/index.ts
index 0b8dcbc89..ede46c20c 100644
--- a/src/commands/index.ts
+++ b/src/commands/index.ts
@@ -66,6 +66,10 @@ enum EXTENSION_COMMANDS {
MDB_COPY_DOCUMENT_CONTENTS_FROM_TREE_VIEW = 'mdb.copyDocumentContentsFromTreeView',
MDB_CLONE_DOCUMENT_FROM_TREE_VIEW = 'mdb.cloneDocumentFromTreeView',
MDB_DELETE_DOCUMENT_FROM_TREE_VIEW = 'mdb.deleteDocumentFromTreeView',
+ MDB_ADD_STREAM_PROCESSOR = 'mdb.addStreamProcessor',
+ MDB_START_STREAM_PROCESSOR = 'mdb.startStreamProcessor',
+ MDB_STOP_STREAM_PROCESSOR = 'mdb.stopStreamProcessor',
+ MDB_DROP_STREAM_PROCESSOR = 'mdb.dropStreamProcessor',
}
export default EXTENSION_COMMANDS;
diff --git a/src/connectionController.ts b/src/connectionController.ts
index bd3d14ea9..3aba4630c 100644
--- a/src/connectionController.ts
+++ b/src/connectionController.ts
@@ -29,6 +29,7 @@ import type TelemetryService from './telemetry/telemetryService';
import type { LoadedConnection } from './storage/connectionStorage';
import { ConnectionStorage } from './storage/connectionStorage';
import LINKS from './utils/links';
+import { isAtlasStream } from 'mongodb-build-info';
export function launderConnectionOptionTypeFromLegacyToCurrent(
opts: ConnectionOptionsFromLegacyDS
@@ -346,6 +347,12 @@ export default class ConnectionController {
true
);
+ void vscode.commands.executeCommand(
+ 'setContext',
+ 'mdb.isAtlasStreams',
+ this.isConnectedToAtlasStreams()
+ );
+
return {
successfullyConnected: true,
connectionErrorMessage: '',
@@ -408,6 +415,11 @@ export default class ConnectionController {
'mdb.connectedToMongoDB',
false
);
+ void vscode.commands.executeCommand(
+ 'setContext',
+ 'mdb.isAtlasStreams',
+ false
+ );
} catch (error) {
// Show an error, however we still reset the active connection to free up the extension.
void vscode.window.showErrorMessage(
@@ -635,6 +647,13 @@ export default class ConnectionController {
return connectionStringData.toString();
}
+ isConnectedToAtlasStreams() {
+ return (
+ this.isCurrentlyConnected() &&
+ isAtlasStream(this.getActiveConnectionString())
+ );
+ }
+
getActiveConnectionString(): string {
const mongoClientConnectionOptions = this.getMongoClientConnectionOptions();
const connectionString = mongoClientConnectionOptions?.url;
diff --git a/src/editors/playgroundController.ts b/src/editors/playgroundController.ts
index 49f0c6adb..6e932d433 100644
--- a/src/editors/playgroundController.ts
+++ b/src/editors/playgroundController.ts
@@ -20,6 +20,8 @@ import playgroundCreateIndexTemplate from '../templates/playgroundCreateIndexTem
import playgroundCreateCollectionTemplate from '../templates/playgroundCreateCollectionTemplate';
import playgroundCloneDocumentTemplate from '../templates/playgroundCloneDocumentTemplate';
import playgroundInsertDocumentTemplate from '../templates/playgroundInsertDocumentTemplate';
+import playgroundStreamsTemplate from '../templates/playgroundStreamsTemplate';
+import playgroundCreateStreamProcessorTemplate from '../templates/playgroundCreateStreamProcessorTemplate';
import type {
PlaygroundResult,
ShellEvaluateResult,
@@ -391,11 +393,25 @@ export default class PlaygroundController {
return this._createPlaygroundFileWithContent(content);
}
+ async createPlaygroundForCreateStreamProcessor(
+ element: ConnectionTreeItem
+ ): Promise {
+ const content = playgroundCreateStreamProcessorTemplate;
+
+ element.cacheIsUpToDate = false;
+
+ this._telemetryService.trackPlaygroundCreated('createStreamProcessor');
+
+ return this._createPlaygroundFileWithContent(content);
+ }
+
async createPlayground(): Promise {
const useDefaultTemplate = !!vscode.workspace
.getConfiguration('mdb')
.get('useDefaultTemplateForPlayground');
- const content = useDefaultTemplate ? playgroundTemplate : '';
+ const isStreams = this._connectionController.isConnectedToAtlasStreams();
+ const template = isStreams ? playgroundStreamsTemplate : playgroundTemplate;
+ const content = useDefaultTemplate ? template : '';
this._telemetryService.trackPlaygroundCreated('crud');
return this._createPlaygroundFileWithContent(content);
diff --git a/src/explorer/connectionTreeItem.ts b/src/explorer/connectionTreeItem.ts
index e110892ea..2ca9a7d9a 100644
--- a/src/explorer/connectionTreeItem.ts
+++ b/src/explorer/connectionTreeItem.ts
@@ -1,11 +1,13 @@
import * as vscode from 'vscode';
import path from 'path';
+import type { StreamProcessor } from 'mongodb-data-service/lib/data-service';
import DatabaseTreeItem from './databaseTreeItem';
import type ConnectionController from '../connectionController';
import formatError from '../utils/formatError';
import { getImagesPath } from '../extensionConstants';
import type TreeItemParent from './treeItemParentInterface';
+import StreamProcessorTreeItem from './streamProcessorTreeItem';
export enum ConnectionItemContextValues {
disconnected = 'disconnectedConnectionTreeItem',
@@ -36,7 +38,9 @@ export default class ConnectionTreeItem
{
contextValue = ConnectionItemContextValues.disconnected;
- private _childrenCache: { [key: string]: DatabaseTreeItem };
+ private _childrenCache: {
+ [key: string]: DatabaseTreeItem | StreamProcessorTreeItem;
+ };
cacheIsUpToDate: boolean;
private _connectionController: ConnectionController;
@@ -57,7 +61,9 @@ export default class ConnectionTreeItem
isExpanded: boolean;
connectionController: ConnectionController;
cacheIsUpToDate: boolean;
- childrenCache: { [key: string]: DatabaseTreeItem }; // Existing cache.
+ childrenCache: {
+ [key: string]: DatabaseTreeItem | StreamProcessorTreeItem;
+ }; // Existing cache.
}) {
super(
connectionController.getSavedConnectionName(connectionId),
@@ -118,6 +124,23 @@ export default class ConnectionTreeItem
}
}
+ async listStreamProcessors(): Promise {
+ const dataService = this._connectionController.getActiveDataService();
+
+ if (dataService === null) {
+ throw new Error('Not currently connected.');
+ }
+
+ try {
+ const processors = await dataService.listStreamProcessors();
+ return processors;
+ } catch (error) {
+ throw new Error(
+ `Unable to list stream processors: ${formatError(error).message}`
+ );
+ }
+ }
+
async getChildren(): Promise {
if (
!this.isExpanded ||
@@ -133,67 +156,107 @@ export default class ConnectionTreeItem
throw new Error('Not currently connected.');
}
+ const isAtlasStreams =
+ this._connectionController.isConnectedToAtlasStreams();
+
if (this.cacheIsUpToDate) {
const pastChildrenCache = this._childrenCache;
this._childrenCache = {};
- // We create a new database tree item here instead of reusing the
+ // We create a new tree item here instead of reusing the
// cached one in order to ensure the expanded state is set.
- Object.keys(pastChildrenCache).forEach((databaseName) => {
- const prevChild = pastChildrenCache[databaseName];
+ Object.keys(pastChildrenCache).forEach((childName) => {
+ const prevChild = pastChildrenCache[childName];
if (prevChild.isDropped) {
return;
}
- this._childrenCache[databaseName] = new DatabaseTreeItem({
- databaseName,
- dataService,
- isExpanded: prevChild.isExpanded,
- cacheIsUpToDate: prevChild.cacheIsUpToDate,
- childrenCache: prevChild.getChildrenCache(),
- });
+ if (isAtlasStreams) {
+ const spItem = prevChild as StreamProcessorTreeItem;
+ this._childrenCache[childName] = new StreamProcessorTreeItem({
+ dataService,
+ isExpanded: spItem.isExpanded,
+ streamProcessorName: spItem.streamProcessorName,
+ streamProcessorState: spItem.streamProcessorState,
+ });
+ } else {
+ const dbItem = prevChild as DatabaseTreeItem;
+ this._childrenCache[childName] = new DatabaseTreeItem({
+ databaseName: childName,
+ dataService,
+ isExpanded: dbItem.isExpanded,
+ cacheIsUpToDate: dbItem.cacheIsUpToDate,
+ childrenCache: dbItem.getChildrenCache(),
+ });
+ }
});
return Object.values(this._childrenCache);
}
- const databases = await this.listDatabases();
- databases.sort((a: string, b: string) => {
- return a.localeCompare(b);
- });
+ if (isAtlasStreams) {
+ const processors = await this.listStreamProcessors();
+ processors.sort((a: StreamProcessor, b: StreamProcessor) => {
+ return a.name.localeCompare(b.name);
+ });
- this.cacheIsUpToDate = true;
+ this.cacheIsUpToDate = true;
- if (!databases) {
+ const pastChildrenCache = this._childrenCache;
this._childrenCache = {};
- return [];
- }
-
- const pastChildrenCache = this._childrenCache;
- this._childrenCache = {};
-
- databases.forEach((name: string) => {
- if (pastChildrenCache[name]) {
+ processors.forEach((sp) => {
+ const cachedItem = pastChildrenCache[
+ sp.name
+ ] as StreamProcessorTreeItem;
// We create a new element here instead of reusing the cached one
// in order to ensure the expanded state is set.
- this._childrenCache[name] = new DatabaseTreeItem({
- databaseName: name,
+ this._childrenCache[sp.name] = new StreamProcessorTreeItem({
dataService,
- isExpanded: pastChildrenCache[name].isExpanded,
- cacheIsUpToDate: pastChildrenCache[name].cacheIsUpToDate,
- childrenCache: pastChildrenCache[name].getChildrenCache(),
- });
- } else {
- this._childrenCache[name] = new DatabaseTreeItem({
- databaseName: name,
- dataService,
- isExpanded: false,
- cacheIsUpToDate: false, // Cache is not up to date (no cache).
- childrenCache: {}, // No existing cache.
+ streamProcessorName: sp.name,
+ streamProcessorState: sp.state,
+ isExpanded: cachedItem ? cachedItem.isExpanded : false,
});
+ });
+ } else {
+ const databases = await this.listDatabases();
+ databases.sort((a: string, b: string) => {
+ return a.localeCompare(b);
+ });
+
+ this.cacheIsUpToDate = true;
+
+ if (!databases) {
+ this._childrenCache = {};
+ return [];
}
- });
+
+ const pastChildrenCache = this._childrenCache;
+ this._childrenCache = {};
+
+ databases.forEach((name: string) => {
+ const cachedItem = pastChildrenCache[name] as DatabaseTreeItem;
+ if (cachedItem) {
+ // We create a new element here instead of reusing the cached one
+ // in order to ensure the expanded state is set.
+ this._childrenCache[name] = new DatabaseTreeItem({
+ databaseName: name,
+ dataService,
+ isExpanded: cachedItem.isExpanded,
+ cacheIsUpToDate: cachedItem.cacheIsUpToDate,
+ childrenCache: cachedItem.getChildrenCache(),
+ });
+ } else {
+ this._childrenCache[name] = new DatabaseTreeItem({
+ databaseName: name,
+ dataService,
+ isExpanded: false,
+ cacheIsUpToDate: false, // Cache is not up to date (no cache).
+ childrenCache: {}, // No existing cache.
+ });
+ }
+ });
+ }
return Object.values(this._childrenCache);
}
@@ -235,7 +298,9 @@ export default class ConnectionTreeItem
this.cacheIsUpToDate = false;
}
- getChildrenCache(): { [key: string]: DatabaseTreeItem } {
+ getChildrenCache(): {
+ [key: string]: DatabaseTreeItem | StreamProcessorTreeItem;
+ } {
return this._childrenCache;
}
}
diff --git a/src/explorer/index.ts b/src/explorer/index.ts
index dd67393e9..849da95a3 100644
--- a/src/explorer/index.ts
+++ b/src/explorer/index.ts
@@ -7,6 +7,7 @@ import DocumentTreeItem from './documentTreeItem';
import HelpExplorer from './helpExplorer';
import SchemaTreeItem from './schemaTreeItem';
import PlaygroundsExplorer from './playgroundsExplorer';
+import StreamProcessorTreeItem from './streamProcessorTreeItem';
export {
CollectionTreeItem,
@@ -18,4 +19,5 @@ export {
HelpExplorer,
PlaygroundsExplorer,
SchemaTreeItem,
+ StreamProcessorTreeItem,
};
diff --git a/src/explorer/streamProcessorTreeItem.ts b/src/explorer/streamProcessorTreeItem.ts
new file mode 100644
index 000000000..a728cb50e
--- /dev/null
+++ b/src/explorer/streamProcessorTreeItem.ts
@@ -0,0 +1,149 @@
+import * as vscode from 'vscode';
+import path from 'path';
+import type { DataService } from 'mongodb-data-service';
+
+import formatError from '../utils/formatError';
+import { getImagesPath } from '../extensionConstants';
+import type TreeItemParent from './treeItemParentInterface';
+
+function getIconPath(): { light: string; dark: string } {
+ const LIGHT = path.join(getImagesPath(), 'light');
+ const DARK = path.join(getImagesPath(), 'dark');
+
+ return {
+ light: path.join(LIGHT, 'stream-processor.svg'),
+ dark: path.join(DARK, 'stream-processor.svg'),
+ };
+}
+
+export default class StreamProcessorTreeItem
+ extends vscode.TreeItem
+ implements TreeItemParent, vscode.TreeDataProvider
+{
+ contextValue = 'streamProcessorTreeItem' as const;
+ cacheIsUpToDate = true;
+ isExpanded: boolean;
+ isDropped = false;
+
+ streamProcessorName: string;
+ streamProcessorState: string;
+
+ private _dataService: DataService;
+
+ constructor({
+ streamProcessorName,
+ streamProcessorState,
+ dataService,
+ isExpanded,
+ }: {
+ streamProcessorName: string;
+ streamProcessorState: string;
+ dataService: DataService;
+ isExpanded: boolean;
+ }) {
+ super(
+ streamProcessorName,
+ isExpanded
+ ? vscode.TreeItemCollapsibleState.Expanded
+ : vscode.TreeItemCollapsibleState.Collapsed
+ );
+
+ this._dataService = dataService;
+ this.iconPath = getIconPath();
+ this.isExpanded = isExpanded;
+ this.tooltip = streamProcessorName;
+ this.streamProcessorName = streamProcessorName;
+ this.streamProcessorState = streamProcessorState;
+ }
+
+ getTreeItem(element: StreamProcessorTreeItem): StreamProcessorTreeItem {
+ return element;
+ }
+
+ getChildren(): Promise {
+ return Promise.resolve(
+ !this.isExpanded
+ ? []
+ : [
+ new vscode.TreeItem(
+ `State: ${this.streamProcessorState}`,
+ vscode.TreeItemCollapsibleState.None
+ ),
+ ]
+ );
+ }
+
+ onDidCollapse(): void {
+ this.isExpanded = false;
+ this.cacheIsUpToDate = false;
+ }
+
+ onDidExpand(): Promise {
+ this.cacheIsUpToDate = false;
+ this.isExpanded = true;
+ return Promise.resolve(true);
+ }
+
+ async onStartClicked(): Promise {
+ try {
+ await this._dataService.startStreamProcessor(this.streamProcessorName);
+ this.streamProcessorState = 'STARTED';
+ return true;
+ } catch (error) {
+ void vscode.window.showErrorMessage(
+ `Start stream processor failed: ${formatError(error).message}`
+ );
+ return false;
+ }
+ }
+
+ async onStopClicked(): Promise {
+ try {
+ await this._dataService.stopStreamProcessor(this.streamProcessorName);
+ this.streamProcessorState = 'STOPPED';
+ return true;
+ } catch (error) {
+ void vscode.window.showErrorMessage(
+ `Stop stream processor failed: ${formatError(error).message}`
+ );
+ return false;
+ }
+ }
+
+ // Prompt the user to input the name to confirm the drop, then drop.
+ async onDropClicked(): Promise {
+ let inputtedName: string | undefined;
+ try {
+ inputtedName = await vscode.window.showInputBox({
+ value: '',
+ placeHolder: 'e.g. myStreamProcessor',
+ prompt: `Are you sure you wish to drop this stream processor? Enter the stream processor name '${this.streamProcessorName}' to confirm.`,
+ validateInput: (inputName) => {
+ if (inputName && this.streamProcessorName !== inputName) {
+ return 'Stream processor name does not match';
+ }
+ return null;
+ },
+ });
+ } catch (e) {
+ return Promise.reject(
+ new Error(`An error occured parsing the stream processor name: ${e}`)
+ );
+ }
+
+ if (this.streamProcessorName !== inputtedName) {
+ return Promise.resolve(false);
+ }
+
+ try {
+ await this._dataService.dropStreamProcessor(this.streamProcessorName);
+ this.streamProcessorState = 'DROPPED';
+ this.isDropped = true;
+ } catch (error) {
+ void vscode.window.showErrorMessage(
+ `Drop stream processor failed: ${formatError(error).message}`
+ );
+ }
+ return this.isDropped;
+ }
+}
diff --git a/src/mdbExtensionController.ts b/src/mdbExtensionController.ts
index 65305a710..d4f29d481 100644
--- a/src/mdbExtensionController.ts
+++ b/src/mdbExtensionController.ts
@@ -39,6 +39,7 @@ import PlaygroundResultProvider from './editors/playgroundResultProvider';
import WebviewController from './views/webviewController';
import { createIdFactory, generateId } from './utils/objectIdHelper';
import { ConnectionStorage } from './storage/connectionStorage';
+import type StreamProcessorTreeItem from './explorer/streamProcessorTreeItem';
// This class is the top-level controller for our extension.
// Commands which the extensions handles are defined in the function `activate`.
@@ -665,6 +666,91 @@ export default class MDBExtensionController implements vscode.Disposable {
return true;
}
);
+ this.registerCommand(
+ EXTENSION_COMMANDS.MDB_ADD_STREAM_PROCESSOR,
+ async (element: ConnectionTreeItem): Promise => {
+ if (!element) {
+ void vscode.window.showErrorMessage(
+ 'Please wait for the connection to finish loading before adding a stream processor.'
+ );
+
+ return false;
+ }
+
+ if (
+ element.connectionId !==
+ this._connectionController.getActiveConnectionId()
+ ) {
+ void vscode.window.showErrorMessage(
+ 'Please connect to this connection before adding a stream processor.'
+ );
+
+ return false;
+ }
+
+ if (this._connectionController.isDisconnecting()) {
+ void vscode.window.showErrorMessage(
+ 'Unable to add stream processor: currently disconnecting.'
+ );
+
+ return false;
+ }
+
+ if (this._connectionController.isConnecting()) {
+ void vscode.window.showErrorMessage(
+ 'Unable to add stream processor: currently connecting.'
+ );
+
+ return false;
+ }
+
+ return this._playgroundController.createPlaygroundForCreateStreamProcessor(
+ element
+ );
+ }
+ );
+ this.registerCommand(
+ EXTENSION_COMMANDS.MDB_START_STREAM_PROCESSOR,
+ async (element: StreamProcessorTreeItem): Promise => {
+ const started = await element.onStartClicked();
+ if (started) {
+ void vscode.window.showInformationMessage(
+ 'Stream processor successfully started.'
+ );
+ // Refresh explorer view after a processor is started.
+ this._explorerController.refresh();
+ }
+ return started;
+ }
+ );
+ this.registerCommand(
+ EXTENSION_COMMANDS.MDB_STOP_STREAM_PROCESSOR,
+ async (element: StreamProcessorTreeItem): Promise => {
+ const stopped = await element.onStopClicked();
+ if (stopped) {
+ void vscode.window.showInformationMessage(
+ 'Stream processor successfully stopped.'
+ );
+ // Refresh explorer view after a processor is stopped.
+ this._explorerController.refresh();
+ }
+ return stopped;
+ }
+ );
+ this.registerCommand(
+ EXTENSION_COMMANDS.MDB_DROP_STREAM_PROCESSOR,
+ async (element: StreamProcessorTreeItem): Promise => {
+ const dropped = await element.onDropClicked();
+ if (dropped) {
+ void vscode.window.showInformationMessage(
+ 'Stream processor successfully dropped.'
+ );
+ // Refresh explorer view after a processor is dropped.
+ this._explorerController.refresh();
+ }
+ return dropped;
+ }
+ );
}
showOverviewPageIfRecentlyInstalled(): void {
diff --git a/src/templates/playgroundCreateStreamProcessorTemplate.ts b/src/templates/playgroundCreateStreamProcessorTemplate.ts
new file mode 100644
index 000000000..db2e9275b
--- /dev/null
+++ b/src/templates/playgroundCreateStreamProcessorTemplate.ts
@@ -0,0 +1,28 @@
+const template = `/* global sp */
+// MongoDB Playground
+// Use Ctrl+Space inside a snippet or a string literal to trigger completions.
+
+// create a new stream processor
+/* sp.createStreamProcessor('newStreamProcessor', [
+ {
+ $source: {
+ "connectionName": "myKafka",
+ "topic": "source"
+ }
+ },
+ {
+ $match: { temperature: 46 }
+ },
+ {
+ $emit: {
+ "connectionName": "mySink",
+ "topic" : "target",
+ }
+ }
+]); */
+
+// More information on the \`createStreamProcessor\` command can be found at:
+// https://www.mongodb.com/docs/atlas/atlas-sp/manage-stream-processor/#create-a-stream-processor
+`;
+
+export default template;
diff --git a/src/templates/playgroundStreamsTemplate.ts b/src/templates/playgroundStreamsTemplate.ts
new file mode 100644
index 000000000..14de553ce
--- /dev/null
+++ b/src/templates/playgroundStreamsTemplate.ts
@@ -0,0 +1,26 @@
+const template = `/* global sp */
+// MongoDB Playground
+// To disable this template go to Settings | MongoDB | Use Default Template For Playground.
+// Make sure you are connected to enable completions and to be able to run a playground.
+// Use Ctrl+Space inside a snippet or a string literal to trigger completions.
+// The result of the last command run in a playground is shown on the results panel.
+// Use 'console.log()' to print to the debug output.
+// For more documentation on playgrounds please refer to
+// https://www.mongodb.com/docs/mongodb-vscode/playgrounds/
+
+// Connection can be added in Atlas UI or using Atlas CLI. See doc linked below for more info
+// https://www.mongodb.com/docs/atlas/atlas-sp/manage-processing-instance/#add-a-connection-to-the-connection-registry
+// List avialable connections
+sp.listConnections();
+
+// Use process to quickly test out the stream processing
+sp.process([
+ {
+ $source:{
+ "connectionName": "sample_stream_solar"
+ }
+ }
+]);
+`;
+
+export default template;
diff --git a/src/test/suite/explorer/connectionTreeItem.test.ts b/src/test/suite/explorer/connectionTreeItem.test.ts
index b4455529b..13f7bd157 100644
--- a/src/test/suite/explorer/connectionTreeItem.test.ts
+++ b/src/test/suite/explorer/connectionTreeItem.test.ts
@@ -99,6 +99,54 @@ suite('ConnectionTreeItem Test Suite', () => {
);
}
});
+
+ suite('when connected to a Stream Processing Instnace', function () {
+ beforeEach(function () {
+ sandbox.replace(
+ mdbTestExtension.testExtensionController._connectionController,
+ 'isConnectedToAtlasStreams',
+ () => true
+ );
+ });
+
+ test('returns stream processor tree items', async () => {
+ sandbox.replace(
+ mdbTestExtension.testExtensionController._connectionController,
+ 'getActiveDataService',
+ () => new DataServiceStub() as unknown as DataService
+ );
+
+ const spItems = await testConnectionTreeItem.getChildren();
+
+ assert.strictEqual(spItems.length, 2);
+ assert.strictEqual(spItems[0].label, 'mockStreamProcessor1');
+ assert.strictEqual(spItems[1].label, 'mockStreamProcessor2');
+ });
+
+ test('when listStreamProcessors errors it wraps it in a nice message', async () => {
+ sandbox.replace(
+ mdbTestExtension.testExtensionController._connectionController,
+ 'getActiveDataService',
+ () =>
+ ({
+ listStreamProcessors: () =>
+ new Promise(() => {
+ throw Error('peaches');
+ }),
+ } as unknown as DataService)
+ );
+
+ try {
+ await testConnectionTreeItem.getChildren();
+ assert(false);
+ } catch (error) {
+ assert.strictEqual(
+ formatError(error).message,
+ 'Unable to list stream processors: peaches'
+ );
+ }
+ });
+ });
});
suite('#listDatabases', () => {
diff --git a/src/test/suite/explorer/streamProcessorTreeItem.test.ts b/src/test/suite/explorer/streamProcessorTreeItem.test.ts
new file mode 100644
index 000000000..10ef03387
--- /dev/null
+++ b/src/test/suite/explorer/streamProcessorTreeItem.test.ts
@@ -0,0 +1,72 @@
+import * as vscode from 'vscode';
+import assert from 'assert';
+import type { DataService } from 'mongodb-data-service';
+
+import StreamProcessorTreeItem from '../../../explorer/streamProcessorTreeItem';
+import { DataServiceStub, mockStreamProcessors } from '../stubs';
+
+// eslint-disable-next-line @typescript-eslint/no-var-requires
+const { contributes } = require('../../../../package.json');
+
+function getTestTreeItem(
+ options?: Partial[0]>
+) {
+ const { name, state } = mockStreamProcessors[1];
+ return new StreamProcessorTreeItem({
+ streamProcessorName: name,
+ streamProcessorState: state,
+ dataService: new DataServiceStub() as unknown as DataService,
+ isExpanded: false,
+ ...options,
+ });
+}
+
+suite('StreamProcessorTreeItem Test Suite', () => {
+ test('its context value should be in the package json', () => {
+ let spRegisteredCommandInPackageJson = false;
+
+ const testStreamProcessorTreeItem = getTestTreeItem();
+
+ contributes.menus['view/item/context'].forEach((contextItem) => {
+ if (contextItem.when.includes(testStreamProcessorTreeItem.contextValue)) {
+ spRegisteredCommandInPackageJson = true;
+ }
+ });
+
+ assert(
+ spRegisteredCommandInPackageJson,
+ 'Expected stream processor tree item to be registered with a command in package json'
+ );
+ });
+
+ test('when not expanded it does not show state', async () => {
+ const testStreamProcessorTreeItem = getTestTreeItem();
+
+ const children = await testStreamProcessorTreeItem.getChildren();
+ assert.strictEqual(
+ children.length,
+ 0,
+ `Expected no state, recieved ${children.length}`
+ );
+ });
+
+ test('when expanded shows the state as a child item in tree', async () => {
+ const testStreamProcessorTreeItem = getTestTreeItem();
+
+ await testStreamProcessorTreeItem.onDidExpand();
+
+ const children = await testStreamProcessorTreeItem.getChildren();
+ assert(
+ children.length === 1,
+ `Expected exactly one state item to be returned, recieved ${children.length}`
+ );
+ assert.strictEqual(
+ children[0].label,
+ `State: ${mockStreamProcessors[1].state}`
+ );
+ assert.strictEqual(
+ children[0].collapsibleState,
+ vscode.TreeItemCollapsibleState.None
+ );
+ });
+});
diff --git a/src/test/suite/extension.test.ts b/src/test/suite/extension.test.ts
index f0379aded..7402e7d5c 100644
--- a/src/test/suite/extension.test.ts
+++ b/src/test/suite/extension.test.ts
@@ -55,6 +55,10 @@ suite('Extension Test Suite', () => {
'mdb.copyDocumentContentsFromTreeView',
'mdb.cloneDocumentFromTreeView',
'mdb.deleteDocumentFromTreeView',
+ 'mdb.addStreamProcessor',
+ 'mdb.startStreamProcessor',
+ 'mdb.stopStreamProcessor',
+ 'mdb.dropStreamProcessor',
// Editor commands.
'mdb.codeLens.showMoreDocumentsClicked',
diff --git a/src/test/suite/mdbExtensionController.test.ts b/src/test/suite/mdbExtensionController.test.ts
index 388931535..a6d4dfefe 100644
--- a/src/test/suite/mdbExtensionController.test.ts
+++ b/src/test/suite/mdbExtensionController.test.ts
@@ -14,6 +14,7 @@ import {
DatabaseTreeItem,
DocumentTreeItem,
SchemaTreeItem,
+ StreamProcessorTreeItem,
} from '../../explorer';
import EXTENSION_COMMANDS from '../../commands';
import FieldTreeItem from '../../explorer/fieldTreeItem';
@@ -75,6 +76,18 @@ function getTestDatabaseTreeItem(
});
}
+function getTestStreamProcessorTreeItem(
+ options?: Partial[0]>
+) {
+ return new StreamProcessorTreeItem({
+ streamProcessorName: 'zebra',
+ streamProcessorState: 'CREATED',
+ dataService: {} as DataService,
+ isExpanded: false,
+ ...options,
+ });
+}
+
function getTestFieldTreeItem() {
return new FieldTreeItem({
field: {
@@ -1395,6 +1408,193 @@ suite('MDBExtensionController Test Suite', function () {
assert.strictEqual(result, true);
});
+ test('mdb.addStreamProcessor should create a MongoDB playground with create stream processor template', async () => {
+ const testConnectionTreeItem = getTestConnectionTreeItem();
+ await vscode.commands.executeCommand(
+ 'mdb.addStreamProcessor',
+ testConnectionTreeItem
+ );
+
+ const content = fakeCreatePlaygroundFileWithContent.firstCall.args[0];
+ assert(content.includes('// create a new stream processor'));
+ assert(content.includes("sp.createStreamProcessor('newStreamProcessor'"));
+ });
+
+ test('mdb.startStreamProcessor starts the stream processor', async () => {
+ let calledProcessorName = '';
+ const testProcessorTreeItem = getTestStreamProcessorTreeItem({
+ dataService: {
+ startStreamProcessor: (spName: string) => {
+ calledProcessorName = spName;
+ return Promise.resolve(true);
+ },
+ } as unknown as DataService,
+ });
+
+ const started = await vscode.commands.executeCommand(
+ 'mdb.startStreamProcessor',
+ testProcessorTreeItem
+ );
+
+ assert.strictEqual(started, true);
+ assert.strictEqual(testProcessorTreeItem.streamProcessorState, 'STARTED');
+ assert.strictEqual(
+ calledProcessorName,
+ testProcessorTreeItem.streamProcessorName
+ );
+ assert.strictEqual(
+ showInformationMessageStub.firstCall.args[0],
+ 'Stream processor successfully started.'
+ );
+ });
+
+ test('mdb.startStreamProcessor shows error when fails', async () => {
+ let calledProcessorName = '';
+ const testProcessorTreeItem = getTestStreamProcessorTreeItem({
+ dataService: {
+ startStreamProcessor: (spName: string) => {
+ calledProcessorName = spName;
+ return Promise.reject(new Error('Fake test error'));
+ },
+ } as unknown as DataService,
+ });
+
+ const started = await vscode.commands.executeCommand(
+ 'mdb.startStreamProcessor',
+ testProcessorTreeItem
+ );
+
+ assert.strictEqual(started, false);
+ assert.strictEqual(testProcessorTreeItem.streamProcessorState, 'CREATED');
+ assert.strictEqual(
+ calledProcessorName,
+ testProcessorTreeItem.streamProcessorName
+ );
+ assert.strictEqual(
+ showErrorMessageStub.firstCall.args[0],
+ 'Start stream processor failed: Fake test error'
+ );
+ });
+
+ test('mdb.stopStreamProcessor stops the stream processor', async () => {
+ let calledProcessorName = '';
+ const testProcessorTreeItem = getTestStreamProcessorTreeItem({
+ dataService: {
+ stopStreamProcessor: (spName: string) => {
+ calledProcessorName = spName;
+ return Promise.resolve(true);
+ },
+ } as unknown as DataService,
+ });
+
+ const stopped = await vscode.commands.executeCommand(
+ 'mdb.stopStreamProcessor',
+ testProcessorTreeItem
+ );
+
+ assert.strictEqual(stopped, true);
+ assert.strictEqual(testProcessorTreeItem.streamProcessorState, 'STOPPED');
+ assert.strictEqual(
+ calledProcessorName,
+ testProcessorTreeItem.streamProcessorName
+ );
+ assert.strictEqual(
+ showInformationMessageStub.firstCall.args[0],
+ 'Stream processor successfully stopped.'
+ );
+ });
+
+ test('mdb.stopStreamProcessor shows error when fails', async () => {
+ let calledProcessorName = '';
+ const testProcessorTreeItem = getTestStreamProcessorTreeItem({
+ dataService: {
+ stopStreamProcessor: (spName: string) => {
+ calledProcessorName = spName;
+ return Promise.reject(new Error('Fake test error'));
+ },
+ } as unknown as DataService,
+ });
+
+ const stopped = await vscode.commands.executeCommand(
+ 'mdb.stopStreamProcessor',
+ testProcessorTreeItem
+ );
+
+ assert.strictEqual(stopped, false);
+ assert.strictEqual(testProcessorTreeItem.streamProcessorState, 'CREATED');
+ assert.strictEqual(
+ calledProcessorName,
+ testProcessorTreeItem.streamProcessorName
+ );
+ assert.strictEqual(
+ showErrorMessageStub.firstCall.args[0],
+ 'Stop stream processor failed: Fake test error'
+ );
+ });
+
+ test('mdb.dropStreamProcessor drops the stream processor after inputting the name', async () => {
+ let calledProcessorName = '';
+ const testProcessorTreeItem = getTestStreamProcessorTreeItem({
+ streamProcessorName: 'iMissTangerineAltoids',
+ dataService: {
+ dropStreamProcessor: (spName): Promise => {
+ calledProcessorName = spName;
+ return Promise.resolve(true);
+ },
+ } as unknown as DataService,
+ });
+
+ const inputBoxResolvesStub = sandbox.stub();
+ inputBoxResolvesStub.onCall(0).resolves('iMissTangerineAltoids');
+ sandbox.replace(vscode.window, 'showInputBox', inputBoxResolvesStub);
+
+ const dropped = await vscode.commands.executeCommand(
+ 'mdb.dropStreamProcessor',
+ testProcessorTreeItem
+ );
+ assert.strictEqual(dropped, true);
+ assert.strictEqual(calledProcessorName, 'iMissTangerineAltoids');
+ assert.strictEqual(testProcessorTreeItem.streamProcessorState, 'DROPPED');
+ assert.strictEqual(
+ showInformationMessageStub.firstCall.args[0],
+ 'Stream processor successfully dropped.'
+ );
+ });
+
+ test('mdb.dropStreamProcessor shows error when fails', async () => {
+ let calledProcessorName = '';
+ const testProcessorTreeItem = getTestStreamProcessorTreeItem({
+ dataService: {
+ dropStreamProcessor: (spName: string) => {
+ calledProcessorName = spName;
+ return Promise.reject(new Error('Fake test error'));
+ },
+ } as unknown as DataService,
+ });
+
+ const inputBoxResolvesStub = sandbox.stub();
+ inputBoxResolvesStub
+ .onCall(0)
+ .resolves(testProcessorTreeItem.streamProcessorName);
+ sandbox.replace(vscode.window, 'showInputBox', inputBoxResolvesStub);
+
+ const started = await vscode.commands.executeCommand(
+ 'mdb.dropStreamProcessor',
+ testProcessorTreeItem
+ );
+
+ assert.strictEqual(started, false);
+ assert.strictEqual(testProcessorTreeItem.streamProcessorState, 'CREATED');
+ assert.strictEqual(
+ calledProcessorName,
+ testProcessorTreeItem.streamProcessorName
+ );
+ assert.strictEqual(
+ showErrorMessageStub.firstCall.args[0],
+ 'Drop stream processor failed: Fake test error'
+ );
+ });
+
suite('with mock execute command', function () {
let executeCommandStub: SinonStub;
diff --git a/src/test/suite/stubs.ts b/src/test/suite/stubs.ts
index 41bfcd85b..ada97c86e 100644
--- a/src/test/suite/stubs.ts
+++ b/src/test/suite/stubs.ts
@@ -157,7 +157,22 @@ for (let i = 0; i < numberOfDocumentsToMock; i++) {
});
}
+const mockStreamProcessors = [
+ {
+ name: 'mockStreamProcessor1',
+ state: 'STARTED',
+ },
+ {
+ name: 'mockStreamProcessor2',
+ state: 'STOPPPED',
+ },
+];
+
class DataServiceStub {
+ listStreamProcessors(): Promise {
+ return Promise.resolve(mockStreamProcessors);
+ }
+
listDatabases(): Promise {
return Promise.resolve(
mockDatabaseNames.map((dbName) => ({ name: dbName }))
@@ -377,6 +392,7 @@ export {
mockTextEditor,
mockDatabaseNames,
mockDatabases,
+ mockStreamProcessors,
mockVSCodeTextDocument,
DataServiceStub,
ExtensionContextStub,