diff --git a/src/clients/lsp.ts b/src/clients/lsp.ts index 9fbe7f8..4bda10b 100644 --- a/src/clients/lsp.ts +++ b/src/clients/lsp.ts @@ -27,6 +27,21 @@ const SERVER_DECOMPRESS_PATH: string = path.join(os.tmpdir(), "mz", "bin", "mz-l /// The final server binary path. const SERVER_PATH: string = path.join(__dirname, "bin", "mz-lsp-server"); + +/// Represents the structure a client uses to understand +export interface ExecuteCommandParseStatement { + /// The sql content in the statement + sql: string, + /// The type of statement. + /// Represents the String version of [Statement]. + kind: string, +} + +/// Represents the response from the parse command. +interface ExecuteCommandParseResponse { + statements: Array +} + /// This class implements the Language Server Protocol (LSP) client for Materialize. /// The LSP is downloaded for an endpoint an it is out of the bundle. Binaries are heavy-weight /// and is preferable to download on the first activation. @@ -280,4 +295,31 @@ export default class LspClient { stop() { this.client && this.client.stop(); } + + /** + * Sends a request to the LSP server to execute the parse command. + * The parse command returns the list of statements in an array, + * including their corresponding SQL and type (e.g., select, create_table, etc.). + * + * For more information about commands: https://microsoft.github.io/language-server-protocol/specifications/lsp/3.17/specification/#workspace_executeCommand + */ + async parseSql(sql: string): Promise> { + if (this.client) { + console.log("[LSP]", "Setting on request handler."); + + // Setup the handler. + this.client.onRequest("workspace/executeCommand", (...params) => { + console.log("[LSP]", "Response params: ", params); + }); + + // Send request + const { statements } = await this.client.sendRequest("workspace/executeCommand", { command: "parse", arguments: [ + sql + ]}) as ExecuteCommandParseResponse; + + return statements; + } else { + throw new Error("Client is not yet available."); + } + } } \ No newline at end of file diff --git a/src/clients/sql.ts b/src/clients/sql.ts index 75a727a..26db066 100644 --- a/src/clients/sql.ts +++ b/src/clients/sql.ts @@ -1,4 +1,4 @@ -import { Pool, QueryResult } from "pg"; +import { Pool, PoolClient, PoolConfig, QueryResult } from "pg"; import AdminClient from "./admin"; import CloudClient from "./cloud"; import { Profile } from "../context/config"; @@ -6,6 +6,7 @@ import AsyncContext from "../context/asyncContext"; export default class SqlClient { private pool: Promise; + private privateClient: Promise; private adminClient: AdminClient; private cloudClient: CloudClient; private context: AsyncContext; @@ -45,6 +46,19 @@ export default class SqlClient { asyncOp(); }); + + this.privateClient = new Promise((res, rej) => { + const asyncOp = async () => { + try { + const pool = await this.pool; + this.privateClient = pool.connect(); + } catch (err) { + console.error("[SqlClient]", "Error awaiting the pool: ", err); + } + }; + + asyncOp(); + }); } async connectErr() { @@ -74,7 +88,7 @@ export default class SqlClient { return connectionOptions.join(" "); } - private async buildPoolConfig() { + private async buildPoolConfig(): Promise { console.log("[SqlClient]", "Loading host."); const hostPromise = this.cloudClient?.getHost(this.profile.region); console.log("[SqlClient]", "Loading user email."); @@ -94,46 +108,49 @@ export default class SqlClient { password: await this.context.getAppPassword(), // Disable SSL for tests ssl: (host && host.startsWith("localhost")) ? false : true, + keepAlive: true }; } - async query(statement: string, values?: Array): Promise> { + /** + * Internal queries are intended for exploring cases. + * Like quering the catalog, or information about Materialize. + * Queries goes to the pool, and no client is kept. + * @param statement + * @param values + * @returns query results + */ + async internalQuery(statement: string, values?: Array): Promise> { const pool = await this.pool; const results = await pool.query(statement, values); return results; } - async* cursorQuery(statement: string): AsyncGenerator { - const pool = await this.pool; - const client = await pool.connect(); - try { - const batchSize = 100; // Number of rows to fetch in each batch - - await client.query("BEGIN"); - await client.query(`DECLARE c CURSOR FOR ${statement}`); - let finish = false; - - // Run the query - while (!finish) { - let results: QueryResult = await client.query(`FETCH ${batchSize} c;`); - const { rowCount } = results; + /** + * Private queries are intended for the user. A private query reuses always the same client. + * In this way, it functions like a shell, processing one statement after another. + * @param statement + * @param values + * @returns query results + */ + async privateQuery(statement: string, values?: Array): Promise> { + const client = await this.privateClient; + const results = await client.query(statement, values); - if (rowCount === 0) { - finish = true; - } + return results; + } - yield results; - } - } finally { - try { - await client.query("COMMIT;"); - } catch (err) { - console.error("[SqlClient]", "Error commiting transaction.", err); - } - // Release the client and pool resources - client.release(); + /** + * Shut down cleanly the pool. + */ + async end() { + try { + const pool = await this.pool; + await pool.end(); + } catch (err) { + console.error("[SqlClient]", "Error ending the pool: ", err); } } } \ No newline at end of file diff --git a/src/context/asyncContext.ts b/src/context/asyncContext.ts index 24a18e0..03fd9f1 100644 --- a/src/context/asyncContext.ts +++ b/src/context/asyncContext.ts @@ -6,6 +6,7 @@ import AppPassword from "./appPassword"; import { ActivityLogTreeProvider, AuthProvider, DatabaseTreeProvider, ResultsProvider } from "../providers"; import * as vscode from 'vscode'; import { QueryResult } from "pg"; +import { ExecuteCommandParseStatement } from "../clients/lsp"; /** * Represents the different providers available in the extension. @@ -148,6 +149,10 @@ export default class AsyncContext extends Context { } else if (!profile) { throw new Error(Errors.unconfiguredProfile); } else { + // Clean the previous [SqlClient] connection. + if (this.clients.sql) { + this.clients.sql.end(); + } this.clients.sql = new SqlClient(this.clients.admin, this.clients.cloud, profile, this); try { @@ -160,12 +165,12 @@ export default class AsyncContext extends Context { // Set environment if (!this.environment) { const environmentPromises = [ - this.query("SHOW CLUSTER;"), - this.query("SHOW DATABASE;"), - this.query("SHOW SCHEMA;"), - this.query(`SELECT id, name, owner_id as "ownerId" FROM mz_clusters;`), - this.query(`SELECT id, name, owner_id as "ownerId" FROM mz_databases;`), - this.query(`SELECT id, name, database_id as "databaseId", owner_id as "ownerId" FROM mz_schemas`), + this.internalQuery("SHOW CLUSTER;"), + this.internalQuery("SHOW DATABASE;"), + this.internalQuery("SHOW SCHEMA;"), + this.internalQuery(`SELECT id, name, owner_id as "ownerId" FROM mz_clusters;`), + this.internalQuery(`SELECT id, name, owner_id as "ownerId" FROM mz_databases;`), + this.internalQuery(`SELECT id, name, database_id as "databaseId", owner_id as "ownerId" FROM mz_schemas`), ]; try { @@ -199,8 +204,8 @@ export default class AsyncContext extends Context { if (reloadSchema && this.environment) { console.log("[AsyncContext]", "Reloading schema."); const schemaPromises = [ - this.query("SHOW SCHEMA;"), - this.query(`SELECT id, name, database_id as "databaseId", owner_id as "ownerId" FROM mz_schemas`) + this.internalQuery("SHOW SCHEMA;"), + this.internalQuery(`SELECT id, name, database_id as "databaseId", owner_id as "ownerId" FROM mz_schemas`) ]; const [ { rows: [{ schema }] }, @@ -376,16 +381,45 @@ export default class AsyncContext extends Context { } /** - * Runs a query in the SQL client. + * Internal queries are intended for exploring cases. + * Like quering the catalog, or information about Materialize. + * Queries goes to the pool, and no client is kept. * - * WARNING: If using this method handle exceptions carefuly. * @param text * @param vals - * @returns + * @returns query results */ - async query(text: string, vals?: Array): Promise> { + async internalQuery(text: string, vals?: Array) { const client = await this.getSqlClient(); - return await client.query(text, vals); + + return await client.internalQuery(text, vals); + } + + /** + * Private queries are intended for the user. + * A private query reuses always the same client. + * In this way, it functions like a shell, processing one statement after another. + * + * @param text + * @param vals + * @returns query results + */ + async privateQuery(text: string, vals?: Array) { + const client = await this.getSqlClient(); + + return await client.privateQuery(text, vals); + } + + /** + * Sends a request to the LSP server to execute the parse command. + * The parse command returns the list of statements in an array, + * including their corresponding SQL and type (e.g., select, create_table, etc.). + * + * @param sql + * @returns {Promise>} + */ + async parseSql(sql: string): Promise> { + return this.clients.lsp.parseSql(sql); } /** diff --git a/src/providers/query.ts b/src/providers/query.ts index 59914b4..be857ce 100644 --- a/src/providers/query.ts +++ b/src/providers/query.ts @@ -2,6 +2,95 @@ import * as vscode from 'vscode'; import { randomUUID } from 'crypto'; import AsyncContext from "../context/asyncContext"; +// vscode.commands.executeCommand('queryResults.focus').then(async () => { +// const document = activeEditor.document; +// const selection = activeEditor.selection; +// const textSelected = activeEditor.document.getText(selection).trim(); +// const query = textSelected ? textSelected : document.getText(); +// const fileName = document.fileName; + +// // Identify the query to not overlap results. +// // When a user press many times the run query button +// // the results from one query can overlap the results +// // from another. We only want to display the last results. +// const id = randomUUID(); + +// try { +// // Clean the results by emitting a newQuery event. +// context.emit("event", { type: EventType.newQuery, data: { id } }); + +// try { +// const statements = await context.parseSql(query); + +// console.log("[RunSQLCommand]", "Running statements: ", statements); + +// const lastStatement = statements[statements.length - 1]; +// for (const statement of statements) { +// console.log("[RunSQLCommand]", "Running statement: ", statement); + +// // Benchmark +// const startTime = Date.now(); +// try { +// const results = await context.privateQuery(statement.sql); +// const endTime = Date.now(); +// const elapsedTime = endTime - startTime; + +// console.log("[RunSQLCommand]", "Results: ", results); +// console.log("[RunSQLCommand]", "Emitting results."); + +// // Only display the results from the last statement. +// if (lastStatement === statement) { +// if (Array.isArray(results)) { +// context.emit("event", { type: EventType.queryResults, data: { ...results[0], elapsedTime, id } }); +// } else { +// context.emit("event", { type: EventType.queryResults, data: { ...results, elapsedTime, id } }); +// } +// } +// activityLogProvider.addLog({ +// status: "success", +// latency: elapsedTime, // assuming elapsedTime holds the time taken for the query to execute +// sql: statement.sql +// }); +// } catch (error: any) { +// console.log("[RunSQLCommand]", error.toString()); +// console.log("[RunSQLCommand]", JSON.stringify(error)); +// const endTime = Date.now(); +// const elapsedTime = endTime - startTime; + +// activityLogProvider.addLog({ +// status: "failure", +// latency: elapsedTime, // assuming elapsedTime holds the time taken before the error was caught +// sql: statement.sql +// }); + +// context.emit("event", { type: EventType.queryResults, data: { id, rows: [], fields: [], error: { +// message: error.toString(), +// position: error.position, +// query, +// }, elapsedTime }}); +// break; +// } finally { +// resultsProvider._view?.show(); +// } +// } +// } catch (err) { + // context.emit("event", { type: EventType.queryResults, data: { id, rows: [], fields: [], error: { + // message: "Syntax errors are present. For more information, please refer to the \"Problems\" tab.", + // position: undefined, + // query, + // }, elapsedTime: undefined }}); + + // console.error("[RunSQLCommand]", "Error running statement: ", err); +// } +// } catch (err) { +// context.emit("event", { type: EventType.queryResults, data: { id, rows: [], fields: [], error: { +// message: "Error connecting to Materialize.", +// position: undefined, +// query, +// }, elapsedTime: undefined }}); +// } +// }); + export const buildRunSQLCommand = (context: AsyncContext) => { const sqlCommand = async () => { const { @@ -29,56 +118,80 @@ export const buildRunSQLCommand = (context: AsyncContext) => { const textSelected = activeEditor.document.getText(selection).trim(); const query = textSelected ? textSelected : document.getText(); - console.log("[RunSQLCommand]", "Running query: ", query); - // Identify the query to not overlap results. // When a user press many times the run query button // the results from one query can overlap the results // from another. We only want to display the last results. const id = randomUUID(); - resultsProvider.setQueryId(id); - - // Benchmark - const startTime = Date.now(); try { - const results = await context.query(query); - const endTime = Date.now(); - const elapsedTime = endTime - startTime; + resultsProvider.setQueryId(id); + try { + const statements = await context.parseSql(query); + console.log("[RunSQLCommand]", "Running statements: ", statements); + const lastStatement = statements[statements.length - 1]; - console.log("[RunSQLCommand]", "Results: ", results); - console.log("[RunSQLCommand]", "Emitting results."); + for (const statement of statements) { + console.log("[RunSQLCommand]", "Running statement: ", statement); - if (Array.isArray(results)) { - resultsProvider.setResults(id, { ...results[0], elapsedTime, id }); - } else { - resultsProvider.setResults(id, { ...results, elapsedTime, id }); - } + // Benchmark + const startTime = Date.now(); + try { + const results = await context.privateQuery(statement.sql); + const endTime = Date.now(); + const elapsedTime = endTime - startTime; + + console.log("[RunSQLCommand]", "Results: ", results); + console.log("[RunSQLCommand]", "Emitting results."); + + // Only display the results from the last statement. + if (lastStatement === statement) { + if (Array.isArray(results)) { + resultsProvider.setResults(id, { ...results[0], elapsedTime, id }); + } else { + resultsProvider.setResults(id, { ...results, elapsedTime, id }); + } + } - activityProvider.addLog({ - status: "success", - latency: elapsedTime, - sql: query - }); - } catch (error: any) { - console.log("[RunSQLCommand]", error.toString()); - console.log("[RunSQLCommand]", JSON.stringify(error)); - const endTime = Date.now(); - const elapsedTime = endTime - startTime; - - activityProvider.addLog({ - status: "failure", - latency: elapsedTime, // assuming elapsedTime holds the time taken before the error was caught - sql: query - }); - - - resultsProvider.setResults(id, - undefined, - { - message: error.toString(), - position: error.position, - query, - }); + activityProvider.addLog({ + status: "success", + latency: elapsedTime, // assuming elapsedTime holds the time taken for the query to execute + sql: statement.sql + }); + } catch (error: any) { + console.log("[RunSQLCommand]", JSON.stringify(error)); + const endTime = Date.now(); + const elapsedTime = endTime - startTime; + + activityProvider.addLog({ + status: "failure", + latency: elapsedTime, // assuming elapsedTime holds the time taken before the error was caught + sql: statement.sql + }); + + resultsProvider.setResults(id, + undefined, + { + message: error.toString(), + position: error.position, + query, + }); + + // Break for-loop. + break; + } + } + } catch (err) { + resultsProvider.setResults(id, + undefined, + { + message: "Syntax errors are present. For more information, please refer to the \"Problems\" tab.", + position: 0, + query, + } + ); + + console.error("[RunSQLCommand]", "Error running statement: ", err); + } } finally { resultsProvider._view?.show(); } diff --git a/src/providers/schema.ts b/src/providers/schema.ts index 5d0495f..3fc1c1e 100644 --- a/src/providers/schema.ts +++ b/src/providers/schema.ts @@ -94,7 +94,7 @@ export default class DatabaseTreeProvider implements vscode.TreeDataProvider): Promise> { try { - const { rows } = await this.context.query(text, vals); + const { rows } = await this.context.internalQuery(text, vals); return rows; } catch (err) { diff --git a/src/test/suite/extension.test.ts b/src/test/suite/extension.test.ts index 4e7505f..d0ec600 100644 --- a/src/test/suite/extension.test.ts +++ b/src/test/suite/extension.test.ts @@ -212,7 +212,7 @@ suite('Extension Test Suite', () => { test('Test query execution', async () => { const _context: AsyncContext = await extension.activate(); - const rows = _context.query("SELECT 100"); + const rows = _context.privateQuery("SELECT 100"); assert.ok((await rows).rowCount > 0); },); @@ -223,7 +223,7 @@ suite('Extension Test Suite', () => { const altClusterName = context.getClusters()?.find(x => x.name !== clusterName); assert.ok(typeof altClusterName?.name === "string"); context.setCluster(altClusterName.name); - const rows = await context.query("SHOW CLUSTER;"); + const rows = await context.internalQuery("SHOW CLUSTER;"); assert.ok(rows.rows[0].cluster === altClusterName.name); }).timeout(10000);