Skip to content

Commit

Permalink
chore: autocomplete support for streams
Browse files Browse the repository at this point in the history
  • Loading branch information
shaketbaby committed Dec 29, 2023
1 parent 9228ff0 commit 2bda50b
Show file tree
Hide file tree
Showing 5 changed files with 820 additions and 12 deletions.
128 changes: 117 additions & 11 deletions src/language/mongoDBService.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import parseSchema from 'mongodb-schema';
import path from 'path';
import { signatures } from '@mongosh/shell-api';
import translator from '@mongosh/i18n';
import { isAtlasStream } from 'mongodb-build-info';
import { Worker as WorkerThreads } from 'worker_threads';

import { ExportToLanguageMode } from '../types/playgroundType';
Expand Down Expand Up @@ -65,6 +66,7 @@ export default class MongoDBService {
_currentConnectionOptions?: MongoClientOptions;

_databaseCompletionItems: CompletionItem[] = [];
_streamProcessorCompletionItems: CompletionItem[] = [];
_shellSymbolCompletionItems: { [symbol: string]: CompletionItem[] } = {};
_globalSymbolCompletionItems: CompletionItem[] = [];
_collections: { [database: string]: string[] } = {};
Expand Down Expand Up @@ -147,6 +149,7 @@ export default class MongoDBService {
databases: true,
collections: true,
fields: true,
streamProcessors: true,
});
await this._closeCurrentConnection();
}
Expand Down Expand Up @@ -174,6 +177,22 @@ export default class MongoDBService {
);
}

if (isAtlasStream(connectionString || '')) {
await this._getAndCacheStreamProcessors();
} else {
await this._getAndCacheDatabases();
}

this._connection.console.log(
`CliServiceProvider active connection has changed: { connectionId: ${connectionId} }`
);
return {
successfullyConnected: true,
connectionId,
};
}

async _getAndCacheDatabases() {
try {
// Get database names for the current connection.
const databases = await this._getDatabases();
Expand All @@ -184,14 +203,17 @@ export default class MongoDBService {
`LS get databases error: ${util.inspect(error)}`
);
}
}

this._connection.console.log(
`CliServiceProvider active connection has changed: { connectionId: ${connectionId} }`
);
return {
successfullyConnected: true,
connectionId,
};
async _getAndCacheStreamProcessors() {
try {
const processors = await this._getStreamProcessors();
this._cacheStreamProcessorCompletionItems(processors);
} catch (error) {
this._connection.console.error(
`LS get stream processors error: ${util.inspect(error)}`
);
}
}

/**
Expand Down Expand Up @@ -294,6 +316,24 @@ export default class MongoDBService {
});
}

/**
* Get stream processors names for the current connection.
*/
async _getStreamProcessors(): Promise<Document[]> {
if (this._serviceProvider) {
try {
const cmd = { listStreamProcessors: 1 };
const result = await this._serviceProvider.runCommand('admin', cmd);
return result.streamProcessors ?? [];
} catch (error) {
this._connection.console.error(
`LS get stream processors error: ${error}`
);
}
}
return [];
}

/**
* Get database names for the current connection.
*/
Expand Down Expand Up @@ -377,7 +417,7 @@ export default class MongoDBService {
}

/**
* Return 'db' and 'use' completion items.
* Return 'db', 'sp' and 'use' completion items.
*/
_cacheGlobalSymbolCompletionItems() {
this._globalSymbolCompletionItems = [
Expand All @@ -386,6 +426,11 @@ export default class MongoDBService {
kind: CompletionItemKind.Method,
preselect: true,
},
{
label: 'sp',
kind: CompletionItemKind.Method,
preselect: true,
},
{
label: 'use',
kind: CompletionItemKind.Function,
Expand Down Expand Up @@ -783,6 +828,18 @@ export default class MongoDBService {
}
}

/**
* If the current node is 'sp.processor.<trigger>' or 'sp["processor"].<trigger>'.
*/
_provideStreamProcessorSymbolCompletionItems(state: CompletionState) {
if (state.isStreamProcessorSymbol) {
this._connection.console.log(
'VISITOR found stream processor symbol completions'
);
return this._shellSymbolCompletionItems.StreamProcessor;
}
}

/**
* If the current node is 'db.collection.find().<trigger>'.
*/
Expand Down Expand Up @@ -895,6 +952,37 @@ export default class MongoDBService {
}
}

/**
* If the current node is 'sp.<trigger>'.
*/
_provideSpSymbolCompletionItems(state: CompletionState) {
if (state.isSpSymbol) {
if (state.isStreamProcessorName) {
this._connection.console.log(
'VISITOR found sp symbol and stream processor name completions'
);
return this._shellSymbolCompletionItems.Streams.concat(
this._streamProcessorCompletionItems
);
}

this._connection.console.log('VISITOR found sp symbol completions');
return this._shellSymbolCompletionItems.Streams;
}
}

/**
* If the current node is 'sp.get(<trigger>)'.
*/
_provideStreamProcessorNameCompletionItems(state: CompletionState) {
if (state.isStreamProcessorName) {
this._connection.console.log(
'VISITOR found stream processor name completions'
);
return this._streamProcessorCompletionItems;
}
}

/**
* If the current node can be used as a collection name
* e.g. 'db.<trigger>.find()' or 'let a = db.<trigger>'.
Expand Down Expand Up @@ -965,6 +1053,7 @@ export default class MongoDBService {
this._provideIdentifierObjectValueCompletionItems.bind(this, state),
this._provideTextObjectValueCompletionItems.bind(this, state),
this._provideCollectionSymbolCompletionItems.bind(this, state),
this._provideStreamProcessorSymbolCompletionItems.bind(this, state),
this._provideFindCursorCompletionItems.bind(this, state),
this._provideAggregationCursorCompletionItems.bind(this, state),
this._provideGlobalSymbolCompletionItems.bind(this, state),
Expand All @@ -974,13 +1063,15 @@ export default class MongoDBService {
currentLineText,
position
),
this._provideSpSymbolCompletionItems.bind(this, state),
this._provideCollectionNameCompletionItems.bind(
this,
state,
currentLineText,
position
),
this._provideDbNameCompletionItems.bind(this, state),
this._provideStreamProcessorNameCompletionItems.bind(this, state),
];

for (const func of completionOptions) {
Expand Down Expand Up @@ -1117,6 +1208,18 @@ export default class MongoDBService {
this._collections[database] = collections.map((item) => item.name);
}

_cacheStreamProcessorCompletionItems(processors: Document[]): void {
this._streamProcessorCompletionItems = processors.map(({ name }) => ({
kind: CompletionItemKind.Folder,
preselect: true,
label: name,
}));
}

clearCachedStreamProcessors(): void {
this._streamProcessorCompletionItems = [];
}

clearCachedFields(): void {
this._fields = {};
}
Expand All @@ -1142,13 +1245,16 @@ export default class MongoDBService {

clearCachedCompletions(clear: ClearCompletionsCache): void {
if (clear.fields) {
this._fields = {};
this.clearCachedFields();
}
if (clear.databases) {
this._databaseCompletionItems = [];
this.clearCachedDatabases();
}
if (clear.collections) {
this._collections = {};
this.clearCachedCollections();
}
if (clear.streamProcessors) {
this.clearCachedStreamProcessors();
}
}
}
Loading

0 comments on commit 2bda50b

Please sign in to comment.