Skip to content

Commit

Permalink
lsp: single client + sequential requests (#133)
Browse files Browse the repository at this point in the history
This PR executes SQL sequentially in a single client.
  • Loading branch information
joacoc authored Nov 14, 2023
1 parent b47e7e2 commit 7afdb79
Show file tree
Hide file tree
Showing 6 changed files with 293 additions and 87 deletions.
42 changes: 42 additions & 0 deletions src/clients/lsp.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,21 @@ const SERVER_DECOMPRESS_PATH: string = path.join(os.tmpdir(), "mz", "bin", "mz-l
/// The final server binary path.
const SERVER_PATH: string = path.join(__dirname, "bin", "mz-lsp-server");


/// Represents the structure a client uses to understand
export interface ExecuteCommandParseStatement {
/// The sql content in the statement
sql: string,
/// The type of statement.
/// Represents the String version of [Statement].
kind: string,
}

/// Represents the response from the parse command.
interface ExecuteCommandParseResponse {
statements: Array<ExecuteCommandParseStatement>
}

/// This class implements the Language Server Protocol (LSP) client for Materialize.
/// The LSP is downloaded for an endpoint an it is out of the bundle. Binaries are heavy-weight
/// and is preferable to download on the first activation.
Expand Down Expand Up @@ -280,4 +295,31 @@ export default class LspClient {
stop() {
this.client && this.client.stop();
}

/**
* Sends a request to the LSP server to execute the parse command.
* The parse command returns the list of statements in an array,
* including their corresponding SQL and type (e.g., select, create_table, etc.).
*
* For more information about commands: https://microsoft.github.io/language-server-protocol/specifications/lsp/3.17/specification/#workspace_executeCommand
*/
async parseSql(sql: string): Promise<Array<ExecuteCommandParseStatement>> {
if (this.client) {
console.log("[LSP]", "Setting on request handler.");

// Setup the handler.
this.client.onRequest("workspace/executeCommand", (...params) => {
console.log("[LSP]", "Response params: ", params);
});

// Send request
const { statements } = await this.client.sendRequest("workspace/executeCommand", { command: "parse", arguments: [
sql
]}) as ExecuteCommandParseResponse;

return statements;
} else {
throw new Error("Client is not yet available.");
}
}
}
77 changes: 47 additions & 30 deletions src/clients/sql.ts
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
import { Pool, QueryResult } from "pg";
import { Pool, PoolClient, PoolConfig, QueryResult } from "pg";
import AdminClient from "./admin";
import CloudClient from "./cloud";
import { Profile } from "../context/config";
import AsyncContext from "../context/asyncContext";

export default class SqlClient {
private pool: Promise<Pool>;
private privateClient: Promise<PoolClient>;
private adminClient: AdminClient;
private cloudClient: CloudClient;
private context: AsyncContext;
Expand Down Expand Up @@ -45,6 +46,19 @@ export default class SqlClient {

asyncOp();
});

this.privateClient = new Promise((res, rej) => {
const asyncOp = async () => {
try {
const pool = await this.pool;
this.privateClient = pool.connect();
} catch (err) {
console.error("[SqlClient]", "Error awaiting the pool: ", err);
}
};

asyncOp();
});
}

async connectErr() {
Expand Down Expand Up @@ -74,7 +88,7 @@ export default class SqlClient {
return connectionOptions.join(" ");
}

private async buildPoolConfig() {
private async buildPoolConfig(): Promise<PoolConfig> {
console.log("[SqlClient]", "Loading host.");
const hostPromise = this.cloudClient?.getHost(this.profile.region);
console.log("[SqlClient]", "Loading user email.");
Expand All @@ -94,46 +108,49 @@ export default class SqlClient {
password: await this.context.getAppPassword(),
// Disable SSL for tests
ssl: (host && host.startsWith("localhost")) ? false : true,
keepAlive: true
};
}

async query(statement: string, values?: Array<any>): Promise<QueryResult<any>> {
/**
* Internal queries are intended for exploring cases.
* Like quering the catalog, or information about Materialize.
* Queries goes to the pool, and no client is kept.
* @param statement
* @param values
* @returns query results
*/
async internalQuery(statement: string, values?: Array<any>): Promise<QueryResult<any>> {
const pool = await this.pool;
const results = await pool.query(statement, values);

return results;
}

async* cursorQuery(statement: string): AsyncGenerator<QueryResult> {
const pool = await this.pool;
const client = await pool.connect();

try {
const batchSize = 100; // Number of rows to fetch in each batch

await client.query("BEGIN");
await client.query(`DECLARE c CURSOR FOR ${statement}`);
let finish = false;

// Run the query
while (!finish) {
let results: QueryResult = await client.query(`FETCH ${batchSize} c;`);
const { rowCount } = results;
/**
* Private queries are intended for the user. A private query reuses always the same client.
* In this way, it functions like a shell, processing one statement after another.
* @param statement
* @param values
* @returns query results
*/
async privateQuery(statement: string, values?: Array<any>): Promise<QueryResult<any>> {
const client = await this.privateClient;
const results = await client.query(statement, values);

if (rowCount === 0) {
finish = true;
}
return results;
}

yield results;
}
} finally {
try {
await client.query("COMMIT;");
} catch (err) {
console.error("[SqlClient]", "Error commiting transaction.", err);
}
// Release the client and pool resources
client.release();
/**
* Shut down cleanly the pool.
*/
async end() {
try {
const pool = await this.pool;
await pool.end();
} catch (err) {
console.error("[SqlClient]", "Error ending the pool: ", err);
}
}
}
60 changes: 47 additions & 13 deletions src/context/asyncContext.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import AppPassword from "./appPassword";
import { ActivityLogTreeProvider, AuthProvider, DatabaseTreeProvider, ResultsProvider } from "../providers";
import * as vscode from 'vscode';
import { QueryResult } from "pg";
import { ExecuteCommandParseStatement } from "../clients/lsp";

/**
* Represents the different providers available in the extension.
Expand Down Expand Up @@ -148,6 +149,10 @@ export default class AsyncContext extends Context {
} else if (!profile) {
throw new Error(Errors.unconfiguredProfile);
} else {
// Clean the previous [SqlClient] connection.
if (this.clients.sql) {
this.clients.sql.end();
}
this.clients.sql = new SqlClient(this.clients.admin, this.clients.cloud, profile, this);

try {
Expand All @@ -160,12 +165,12 @@ export default class AsyncContext extends Context {
// Set environment
if (!this.environment) {
const environmentPromises = [
this.query("SHOW CLUSTER;"),
this.query("SHOW DATABASE;"),
this.query("SHOW SCHEMA;"),
this.query(`SELECT id, name, owner_id as "ownerId" FROM mz_clusters;`),
this.query(`SELECT id, name, owner_id as "ownerId" FROM mz_databases;`),
this.query(`SELECT id, name, database_id as "databaseId", owner_id as "ownerId" FROM mz_schemas`),
this.internalQuery("SHOW CLUSTER;"),
this.internalQuery("SHOW DATABASE;"),
this.internalQuery("SHOW SCHEMA;"),
this.internalQuery(`SELECT id, name, owner_id as "ownerId" FROM mz_clusters;`),
this.internalQuery(`SELECT id, name, owner_id as "ownerId" FROM mz_databases;`),
this.internalQuery(`SELECT id, name, database_id as "databaseId", owner_id as "ownerId" FROM mz_schemas`),
];

try {
Expand Down Expand Up @@ -199,8 +204,8 @@ export default class AsyncContext extends Context {
if (reloadSchema && this.environment) {
console.log("[AsyncContext]", "Reloading schema.");
const schemaPromises = [
this.query("SHOW SCHEMA;"),
this.query(`SELECT id, name, database_id as "databaseId", owner_id as "ownerId" FROM mz_schemas`)
this.internalQuery("SHOW SCHEMA;"),
this.internalQuery(`SELECT id, name, database_id as "databaseId", owner_id as "ownerId" FROM mz_schemas`)
];
const [
{ rows: [{ schema }] },
Expand Down Expand Up @@ -376,16 +381,45 @@ export default class AsyncContext extends Context {
}

/**
* Runs a query in the SQL client.
* Internal queries are intended for exploring cases.
* Like quering the catalog, or information about Materialize.
* Queries goes to the pool, and no client is kept.
*
* WARNING: If using this method handle exceptions carefuly.
* @param text
* @param vals
* @returns
* @returns query results
*/
async query(text: string, vals?: Array<any>): Promise<QueryResult<any>> {
async internalQuery(text: string, vals?: Array<any>) {
const client = await this.getSqlClient();
return await client.query(text, vals);

return await client.internalQuery(text, vals);
}

/**
* Private queries are intended for the user.
* A private query reuses always the same client.
* In this way, it functions like a shell, processing one statement after another.
*
* @param text
* @param vals
* @returns query results
*/
async privateQuery(text: string, vals?: Array<any>) {
const client = await this.getSqlClient();

return await client.privateQuery(text, vals);
}

/**
* Sends a request to the LSP server to execute the parse command.
* The parse command returns the list of statements in an array,
* including their corresponding SQL and type (e.g., select, create_table, etc.).
*
* @param sql
* @returns {Promise<Array<ExecuteCommandParseStatement>>}
*/
async parseSql(sql: string): Promise<Array<ExecuteCommandParseStatement>> {
return this.clients.lsp.parseSql(sql);
}

/**
Expand Down
Loading

0 comments on commit 7afdb79

Please sign in to comment.