Skip to content

Commit

Permalink
Merge pull request #54 from Ehesp/ehesp/rework
Browse files Browse the repository at this point in the history
Added type safety, feature toggles and shared RPC
  • Loading branch information
Brayden authored Dec 21, 2024
2 parents 7c5a783 + 2ccce1b commit 99567b9
Show file tree
Hide file tree
Showing 22 changed files with 2,228 additions and 1,822 deletions.
6 changes: 3 additions & 3 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,10 @@
"cf-typegen": "wrangler types"
},
"devDependencies": {
"@cloudflare/workers-types": "^4.20240925.0",
"@cloudflare/workers-types": "^4.20241216.0",
"@types/pg": "^8.11.10",
"typescript": "^5.5.2",
"wrangler": "^3.60.3"
"typescript": "^5.7.2",
"wrangler": "^3.96.0"
},
"dependencies": {
"@libsql/client": "^0.14.0",
Expand Down
375 changes: 137 additions & 238 deletions pnpm-lock.yaml

Large diffs are not rendered by default.

147 changes: 78 additions & 69 deletions src/allowlist/index.ts
Original file line number Diff line number Diff line change
@@ -1,86 +1,95 @@
import { HandlerConfig } from "../handler";
import { DataSource } from "../types";
import { StarbaseDBConfiguration } from "../handler";
import { DataSource, QueryResult } from "../types";

const parser = new (require('node-sql-parser').Parser)();
const parser = new (require("node-sql-parser").Parser)();

let allowlist: string[] | null = null;
let normalizedAllowlist: any[] | null = null;

function normalizeSQL(sql: string) {
// Remove trailing semicolon. This allows a user to send a SQL statement that has
// a semicolon where the allow list might not include it but both statements can
// equate to being the same. AST seems to have an issue with matching the difference
// when included in one query vs another.
return sql.trim().replace(/;\s*$/, '');
// Remove trailing semicolon. This allows a user to send a SQL statement that has
// a semicolon where the allow list might not include it but both statements can
// equate to being the same. AST seems to have an issue with matching the difference
// when included in one query vs another.
return sql.trim().replace(/;\s*$/, "");
}

async function loadAllowlist(dataSource?: DataSource): Promise<string[]> {
try {
const statement = 'SELECT sql_statement FROM tmp_allowlist_queries'
const result = await dataSource?.internalConnection?.durableObject.executeQuery(statement, [], false) as any[];
return result.map((row: any) => row.sql_statement);
} catch (error) {
console.error('Error loading allowlist:', error);
return [];
}
async function loadAllowlist(dataSource: DataSource): Promise<string[]> {
try {
const statement = "SELECT sql_statement FROM tmp_allowlist_queries";
const result = await dataSource.rpc.executeQuery({ sql: statement }) as QueryResult[]
return result.map((row) => String(row.sql_statement));
} catch (error) {
console.error("Error loading allowlist:", error);
return [];
}
}

export async function isQueryAllowed(sql: string, isEnabled: boolean, dataSource?: DataSource, config?: HandlerConfig): Promise<boolean | Error> {
// If the feature is not turned on then by default the query is allowed
if (!isEnabled) return true;
export async function isQueryAllowed(opts: {
sql: string;
isEnabled: boolean;
dataSource: DataSource;
config: StarbaseDBConfiguration;
}): Promise<boolean | Error> {
const { sql, isEnabled, dataSource, config } = opts;

// If the feature is not turned on then by default the query is allowed
if (!isEnabled) return true;

// If we are using the administrative AUTHORIZATION token value, this request is allowed.
// We want database UI's to be able to have more free reign to run queries so we can load
// tables, run queries, and more. If you want to block queries with the allowlist then we
// advise you to do so by implementing user authentication with JWT.
if (config.role === "admin") {
return true;
}

// If we are using the administrative AUTHORIZATION token value, this request is allowed.
// We want database UI's to be able to have more free reign to run queries so we can load
// tables, run queries, and more. If you want to block queries with the allowlist then we
// advise you to do so by implementing user authentication with JWT.
if (dataSource?.request.headers.get('Authorization') === `Bearer ${config?.adminAuthorizationToken}`) {
return true;
allowlist = await loadAllowlist(dataSource);
normalizedAllowlist = allowlist.map((query) =>
parser.astify(normalizeSQL(query))
);

try {
if (!sql) {
return Error("No SQL provided for allowlist check");
}

allowlist = await loadAllowlist(dataSource);
normalizedAllowlist = allowlist.map(query => parser.astify(normalizeSQL(query)));

try {
if (!sql) {
return Error('No SQL provided for allowlist check')
}
const normalizedQuery = parser.astify(normalizeSQL(sql));

// Compare ASTs while ignoring specific values
const isCurrentAllowed = normalizedAllowlist?.some((allowedQuery) => {
// Create deep copies to avoid modifying original ASTs
const allowedAst = JSON.parse(JSON.stringify(allowedQuery));
const queryAst = JSON.parse(JSON.stringify(normalizedQuery));

const normalizedQuery = parser.astify(normalizeSQL(sql));

// Compare ASTs while ignoring specific values
const isCurrentAllowed = normalizedAllowlist?.some(allowedQuery => {
// Create deep copies to avoid modifying original ASTs
const allowedAst = JSON.parse(JSON.stringify(allowedQuery));
const queryAst = JSON.parse(JSON.stringify(normalizedQuery));

// Remove or normalize value fields from both ASTs
const normalizeAst = (ast: any) => {
if (Array.isArray(ast)) {
ast.forEach(normalizeAst);
} else if (ast && typeof ast === 'object') {
// Remove or normalize fields that contain specific values
if ('value' in ast) {
ast.value = '?';
}

Object.values(ast).forEach(normalizeAst);
}

return ast;
};

normalizeAst(allowedAst);
normalizeAst(queryAst);

return JSON.stringify(allowedAst) === JSON.stringify(queryAst);
});

if (!isCurrentAllowed) {
throw new Error("Query not allowed");
// Remove or normalize value fields from both ASTs
const normalizeAst = (ast: any) => {
if (Array.isArray(ast)) {
ast.forEach(normalizeAst);
} else if (ast && typeof ast === "object") {
// Remove or normalize fields that contain specific values
if ("value" in ast) {
ast.value = "?";
}

Object.values(ast).forEach(normalizeAst);
}

return true;
} catch (error: any) {
throw new Error(error?.message ?? 'Error');
return ast;
};

normalizeAst(allowedAst);
normalizeAst(queryAst);

return JSON.stringify(allowedAst) === JSON.stringify(queryAst);
});

if (!isCurrentAllowed) {
throw new Error("Query not allowed");
}
}

return true;
} catch (error: any) {
throw new Error(error?.message ?? "Error");
}
}
188 changes: 118 additions & 70 deletions src/cache/index.ts
Original file line number Diff line number Diff line change
@@ -1,52 +1,79 @@
import { DataSource, Source } from "../types";
const parser = new (require('node-sql-parser').Parser)();
import { StarbaseDBConfiguration } from "../handler";
import { DataSource, QueryResult } from "../types";
import sqlparser from "node-sql-parser";
const parser = new sqlparser.Parser();

function hasModifyingStatement(ast: any): boolean {
// Check if current node is a modifying statement
if (ast.type && ['insert', 'update', 'delete'].includes(ast.type.toLowerCase())) {
return true;
}
// Check if current node is a modifying statement
if (
ast.type &&
["insert", "update", "delete"].includes(ast.type.toLowerCase())
) {
return true;
}

// Recursively check all properties of the AST
for (const key in ast) {
if (typeof ast[key] === 'object' && ast[key] !== null) {
if (Array.isArray(ast[key])) {
if (ast[key].some(item => hasModifyingStatement(item))) {
return true;
}
} else if (hasModifyingStatement(ast[key])) {
return true;
}
// Recursively check all properties of the AST
for (const key in ast) {
if (typeof ast[key] === "object" && ast[key] !== null) {
if (Array.isArray(ast[key])) {
if (ast[key].some((item) => hasModifyingStatement(item))) {
return true;
}
} else if (hasModifyingStatement(ast[key])) {
return true;
}
}
}

return false;
return false;
}

export async function beforeQueryCache(sql: string, params?: any[], dataSource?: DataSource, dialect?: string): Promise<any | null> {
// Currently we do not support caching queries that have dynamic parameters
if (params?.length) return null
if (dataSource?.source === Source.internal || !dataSource?.request.headers.has('X-Starbase-Cache')) return null
export async function beforeQueryCache(opts: {
sql: string;
params?: unknown[];
dataSource: DataSource;
}): Promise<unknown | null> {
const { sql, params = [], dataSource } = opts;

if (!dialect) dialect = 'sqlite'
if (dialect.toLowerCase() === 'postgres') dialect = 'postgresql'
// Currently we do not support caching queries that have dynamic parameters
if (params.length) {
return null;
}

let ast = parser.astify(sql, { database: dialect });
if (hasModifyingStatement(ast)) return null

const fetchCacheStatement = `SELECT timestamp, ttl, query, results FROM tmp_cache WHERE query = ?`
const result = await dataSource.internalConnection?.durableObject.executeQuery(fetchCacheStatement, [sql], false) as any[];

if (result?.length) {
const { timestamp, ttl, results } = result[0];
const expirationTime = new Date(timestamp).getTime() + (ttl * 1000);

if (Date.now() < expirationTime) {
return JSON.parse(results)
}
// If it's an internal request, or cache is not enabled, return null.
if (dataSource.source === "internal" || !dataSource.cache) {
return null;
}

const dialect =
dataSource.source === "external" ? dataSource.external!.dialect : "sqlite";

let ast = parser.astify(sql, { database: dialect });
if (hasModifyingStatement(ast)) return null;

const fetchCacheStatement = `SELECT timestamp, ttl, query, results FROM tmp_cache WHERE query = ?`;

type QueryResult = {
timestamp: string;
ttl: number;
results: string;
};

const result = await dataSource.rpc.executeQuery({
sql: fetchCacheStatement,
params: [sql],
}) as any[];

if (result?.length) {
const { timestamp, ttl, results } = result[0] as QueryResult;
const expirationTime = new Date(timestamp).getTime() + ttl * 1000;

if (Date.now() < expirationTime) {
return JSON.parse(results);
}
}

return null
return null;
}

// Serialized RPC arguemnts are limited to 1MiB in size at the moment for Cloudflare
Expand All @@ -55,36 +82,57 @@ export async function beforeQueryCache(sql: string, params?: any[], dataSource?:
// to look into include using Cloudflare Cache but need to find a good way to cache the
// response in a safe way for our use case. Another option is another service for queues
// or another way to ingest it directly to the Durable Object.
export async function afterQueryCache(sql: string, params: any[] | undefined, result: any, dataSource?: DataSource, dialect?: string) {
// Currently we do not support caching queries that have dynamic parameters
if (params?.length) return;
if (dataSource?.source === Source.internal || !dataSource?.request.headers.has('X-Starbase-Cache')) return null

try {
if (!dialect) dialect = 'sqlite'
if (dialect.toLowerCase() === 'postgres') dialect = 'postgresql'

let ast = parser.astify(sql, { database: dialect });

// If any modifying query exists within our SQL statement then we shouldn't proceed
if (hasModifyingStatement(ast)) return;

const timestamp = Date.now();
const results = JSON.stringify(result);

const exists = await dataSource.internalConnection?.durableObject.executeQuery(
'SELECT 1 FROM tmp_cache WHERE query = ? LIMIT 1',
[sql],
false
) as any[];

const query = exists?.length
? { sql: 'UPDATE tmp_cache SET timestamp = ?, results = ? WHERE query = ?', params: [timestamp, results, sql] }
: { sql: 'INSERT INTO tmp_cache (timestamp, ttl, query, results) VALUES (?, ?, ?, ?)', params: [timestamp, 60, sql, results] };

await dataSource.internalConnection?.durableObject.executeQuery(query.sql, query.params, false);
} catch (error) {
console.error('Error in cache operation:', error);
return;
}
}
export async function afterQueryCache(opts: {
sql: string;
params: unknown[] | undefined;
result: unknown;
dataSource: DataSource;
}) {
const { sql, params, result, dataSource } = opts;

// Currently we do not support caching queries that have dynamic parameters
if (params?.length) return;

// If it's an internal request, or cache is not enabled, return null.
if (dataSource.source === "internal" || !dataSource.cache) {
return null;
}

try {
const dialect =
dataSource.source === "external"
? dataSource.external!.dialect
: "sqlite";

let ast = parser.astify(sql, { database: dialect });

// If any modifying query exists within our SQL statement then we shouldn't proceed
if (hasModifyingStatement(ast)) return;

const timestamp = Date.now();
const results = JSON.stringify(result);

const exists = await dataSource.rpc.executeQuery({
sql: "SELECT 1 FROM tmp_cache WHERE query = ? LIMIT 1",
params: [sql],
}) as QueryResult[];

const query = exists?.length
? {
sql: "UPDATE tmp_cache SET timestamp = ?, results = ? WHERE query = ?",
params: [timestamp, results, sql],
}
: {
sql: "INSERT INTO tmp_cache (timestamp, ttl, query, results) VALUES (?, ?, ?, ?)",
params: [timestamp, dataSource.cacheTTL ?? 60, sql, results],
};

await dataSource.rpc.executeQuery({
sql: query.sql,
params: query.params,
});
} catch (error) {
console.error("Error in cache operation:", error);
return;
}
}
Loading

0 comments on commit 99567b9

Please sign in to comment.