Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

VSCODE-504: streams support in playground #633

Merged
merged 3 commits into from
Jan 4, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 4 additions & 28 deletions src/editors/playgroundController.ts
Original file line number Diff line number Diff line change
@@ -1,11 +1,10 @@
import * as vscode from 'vscode';
import path from 'path';
import type { OutputChannel, TextEditor } from 'vscode';
import type { TextEditor } from 'vscode';
import { ProgressLocation } from 'vscode';
import vm from 'vm';
import os from 'os';
import transpiler from 'bson-transpilers';
import util from 'util';

import type ActiveConnectionCodeLensProvider from './activeConnectionCodeLensProvider';
import type PlaygroundSelectedCodeActionProvider from './playgroundSelectedCodeActionProvider';
Expand Down Expand Up @@ -111,7 +110,6 @@ export default class PlaygroundController {

_isPartialRun = false;

_outputChannel: OutputChannel;
private _activeConnectionCodeLensProvider: ActiveConnectionCodeLensProvider;
private _playgroundResultViewColumn?: vscode.ViewColumn;
private _playgroundResultTextDocument?: vscode.TextDocument;
Expand Down Expand Up @@ -145,8 +143,6 @@ export default class PlaygroundController {
this._telemetryService = telemetryService;
this._statusView = statusView;
this._playgroundResultViewProvider = playgroundResultViewProvider;
this._outputChannel =
vscode.window.createOutputChannel('Playground output');
this._activeConnectionCodeLensProvider = activeConnectionCodeLensProvider;
this._exportToLanguageCodeLensProvider = exportToLanguageCodeLensProvider;
this._playgroundSelectedCodeActionProvider =
Expand Down Expand Up @@ -467,7 +463,7 @@ export default class PlaygroundController {
// If a user clicked the cancel button terminate all playground scripts.
this._languageServerController.cancelAll();

return { outputLines: undefined, result: undefined };
return { result: undefined };
});

// Run all playground scripts.
Expand All @@ -483,10 +479,7 @@ export default class PlaygroundController {
} catch (error) {
log.error('Evaluating playground with cancel modal failed', error);

return {
outputLines: undefined,
result: undefined,
};
return { result: undefined };
}
}

Expand Down Expand Up @@ -567,27 +560,10 @@ export default class PlaygroundController {
}
}

this._outputChannel.clear();

const evaluateResponse: ShellEvaluateResult =
await this._evaluateWithCancelModal();

if (evaluateResponse?.outputLines?.length) {
for (const line of evaluateResponse.outputLines) {
this._outputChannel.appendLine(
typeof line.content === 'string'
? line.content
: util.inspect(line.content)
);
}

this._outputChannel.show(true);
}

if (
!evaluateResponse ||
(!evaluateResponse.outputLines && !evaluateResponse.result)
) {
if (!evaluateResponse || !evaluateResponse.result) {
return false;
}

Expand Down
19 changes: 19 additions & 0 deletions src/language/languageServerController.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import {
} from 'vscode-languageclient/node';
import type { ExtensionContext } from 'vscode';
import { workspace } from 'vscode';
import util from 'util';

import { createLogger } from '../logging';
import type {
Expand All @@ -38,6 +39,9 @@ export default class LanguageServerController {
_currentConnectionString?: string;
_currentConnectionOptions?: MongoClientOptions;

_consoleOutputChannel =
vscode.window.createOutputChannel('Playground output');

constructor(context: ExtensionContext) {
this._context = context;

Expand Down Expand Up @@ -151,6 +155,19 @@ export default class LanguageServerController {
void vscode.window.showErrorMessage(messsage);
}
);

this._client.onNotification(
ServerCommands.SHOW_CONSOLE_OUTPUT,
(outputs) => {
for (const line of outputs) {
this._consoleOutputChannel.appendLine(
typeof line === 'string' ? line : util.inspect(line)
);
}

this._consoleOutputChannel.show(true);
}
);
}

deactivate(): Thenable<void> | undefined {
Expand All @@ -173,6 +190,8 @@ export default class LanguageServerController {
});
this._isExecutingInProgress = true;

this._consoleOutputChannel.clear();

// Instantiate a new CancellationTokenSource object
// that generates a cancellation token for each run of a playground.
this._source = new CancellationTokenSource();
Expand Down
143 changes: 128 additions & 15 deletions src/language/mongoDBService.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import parseSchema from 'mongodb-schema';
import path from 'path';
import { signatures } from '@mongosh/shell-api';
import translator from '@mongosh/i18n';
import { isAtlasStream } from 'mongodb-build-info';
import { Worker as WorkerThreads } from 'worker_threads';

import { ExportToLanguageMode } from '../types/playgroundType';
Expand Down Expand Up @@ -65,6 +66,7 @@ export default class MongoDBService {
_currentConnectionOptions?: MongoClientOptions;

_databaseCompletionItems: CompletionItem[] = [];
_streamProcessorCompletionItems: CompletionItem[] = [];
_shellSymbolCompletionItems: { [symbol: string]: CompletionItem[] } = {};
_globalSymbolCompletionItems: CompletionItem[] = [];
_collections: { [database: string]: string[] } = {};
Expand Down Expand Up @@ -147,6 +149,7 @@ export default class MongoDBService {
databases: true,
collections: true,
fields: true,
streamProcessors: true,
});
await this._closeCurrentConnection();
}
Expand Down Expand Up @@ -174,6 +177,22 @@ export default class MongoDBService {
);
}

if (isAtlasStream(connectionString || '')) {
await this._getAndCacheStreamProcessors();
} else {
await this._getAndCacheDatabases();
}

this._connection.console.log(
`CliServiceProvider active connection has changed: { connectionId: ${connectionId} }`
);
return {
successfullyConnected: true,
connectionId,
};
}

async _getAndCacheDatabases() {
try {
// Get database names for the current connection.
const databases = await this._getDatabases();
Expand All @@ -184,14 +203,17 @@ export default class MongoDBService {
`LS get databases error: ${util.inspect(error)}`
);
}
}

this._connection.console.log(
`CliServiceProvider active connection has changed: { connectionId: ${connectionId} }`
);
return {
successfullyConnected: true,
connectionId,
};
async _getAndCacheStreamProcessors() {
try {
const processors = await this._getStreamProcessors();
this._cacheStreamProcessorCompletionItems(processors);
} catch (error) {
this._connection.console.error(
`LS get stream processors error: ${util.inspect(error)}`
);
}
}

/**
Expand Down Expand Up @@ -245,9 +267,16 @@ export default class MongoDBService {
)
);

worker?.on(
'message',
({ error, data }: { data?: ShellEvaluateResult; error?: any }) => {
worker?.on('message', ({ name, payload }) => {
if (name === ServerCommands.SHOW_CONSOLE_OUTPUT) {
void this._connection.sendNotification(name, payload);
}

if (name === ServerCommands.CODE_EXECUTION_RESULT) {
const { error, data } = payload as {
data?: ShellEvaluateResult;
error?: any;
};
if (error) {
this._connection.console.error(
`WORKER error: ${util.inspect(error)}`
Expand All @@ -261,7 +290,7 @@ export default class MongoDBService {
resolve(data);
});
}
);
});

worker.postMessage({
name: ServerCommands.EXECUTE_CODE_FROM_PLAYGROUND,
Expand Down Expand Up @@ -294,6 +323,24 @@ export default class MongoDBService {
});
}

/**
* Get stream processors names for the current connection.
*/
async _getStreamProcessors(): Promise<Document[]> {
if (this._serviceProvider) {
try {
const cmd = { listStreamProcessors: 1 };
const result = await this._serviceProvider.runCommand('admin', cmd);
return result.streamProcessors ?? [];
} catch (error) {
this._connection.console.error(
`LS get stream processors error: ${error}`
);
}
}
return [];
}

/**
* Get database names for the current connection.
*/
Expand Down Expand Up @@ -377,7 +424,7 @@ export default class MongoDBService {
}

/**
* Return 'db' and 'use' completion items.
* Return 'db', 'sp' and 'use' completion items.
*/
_cacheGlobalSymbolCompletionItems() {
this._globalSymbolCompletionItems = [
Expand All @@ -386,6 +433,11 @@ export default class MongoDBService {
kind: CompletionItemKind.Method,
preselect: true,
},
{
label: 'sp',
kind: CompletionItemKind.Method,
preselect: true,
},
{
label: 'use',
kind: CompletionItemKind.Function,
Expand Down Expand Up @@ -783,6 +835,18 @@ export default class MongoDBService {
}
}

/**
* If the current node is 'sp.processor.<trigger>' or 'sp["processor"].<trigger>'.
*/
_provideStreamProcessorSymbolCompletionItems(state: CompletionState) {
if (state.isStreamProcessorSymbol) {
this._connection.console.log(
'VISITOR found stream processor symbol completions'
);
return this._shellSymbolCompletionItems.StreamProcessor;
}
}

/**
* If the current node is 'db.collection.find().<trigger>'.
*/
Expand Down Expand Up @@ -895,6 +959,37 @@ export default class MongoDBService {
}
}

/**
* If the current node is 'sp.<trigger>'.
*/
_provideSpSymbolCompletionItems(state: CompletionState) {
if (state.isSpSymbol) {
if (state.isStreamProcessorName) {
this._connection.console.log(
'VISITOR found sp symbol and stream processor name completions'
);
return this._shellSymbolCompletionItems.Streams.concat(
this._streamProcessorCompletionItems
);
}

this._connection.console.log('VISITOR found sp symbol completions');
return this._shellSymbolCompletionItems.Streams;
}
}

/**
* If the current node is 'sp.get(<trigger>)'.
*/
_provideStreamProcessorNameCompletionItems(state: CompletionState) {
if (state.isStreamProcessorName) {
this._connection.console.log(
'VISITOR found stream processor name completions'
);
return this._streamProcessorCompletionItems;
}
}

/**
* If the current node can be used as a collection name
* e.g. 'db.<trigger>.find()' or 'let a = db.<trigger>'.
Expand Down Expand Up @@ -965,6 +1060,7 @@ export default class MongoDBService {
this._provideIdentifierObjectValueCompletionItems.bind(this, state),
this._provideTextObjectValueCompletionItems.bind(this, state),
this._provideCollectionSymbolCompletionItems.bind(this, state),
this._provideStreamProcessorSymbolCompletionItems.bind(this, state),
Anemy marked this conversation as resolved.
Show resolved Hide resolved
this._provideFindCursorCompletionItems.bind(this, state),
this._provideAggregationCursorCompletionItems.bind(this, state),
this._provideGlobalSymbolCompletionItems.bind(this, state),
Expand All @@ -974,13 +1070,15 @@ export default class MongoDBService {
currentLineText,
position
),
this._provideSpSymbolCompletionItems.bind(this, state),
this._provideCollectionNameCompletionItems.bind(
this,
state,
currentLineText,
position
),
this._provideDbNameCompletionItems.bind(this, state),
this._provideStreamProcessorNameCompletionItems.bind(this, state),
];

for (const func of completionOptions) {
Expand Down Expand Up @@ -1117,6 +1215,18 @@ export default class MongoDBService {
this._collections[database] = collections.map((item) => item.name);
}

_cacheStreamProcessorCompletionItems(processors: Document[]): void {
this._streamProcessorCompletionItems = processors.map(({ name }) => ({
kind: CompletionItemKind.Folder,
preselect: true,
label: name,
}));
}

clearCachedStreamProcessors(): void {
this._streamProcessorCompletionItems = [];
}

clearCachedFields(): void {
this._fields = {};
}
Expand All @@ -1142,13 +1252,16 @@ export default class MongoDBService {

clearCachedCompletions(clear: ClearCompletionsCache): void {
if (clear.fields) {
this._fields = {};
this.clearCachedFields();
}
if (clear.databases) {
this._databaseCompletionItems = [];
this.clearCachedDatabases();
}
if (clear.collections) {
this._collections = {};
this.clearCachedCollections();
}
if (clear.streamProcessors) {
this.clearCachedStreamProcessors();
}
}
}
Loading
Loading