Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: Rework internals #54

Merged
merged 12 commits into from
Dec 21, 2024
Merged
Show file tree
Hide file tree
Changes from 11 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,10 @@
"cf-typegen": "wrangler types"
},
"devDependencies": {
"@cloudflare/workers-types": "^4.20240925.0",
"@cloudflare/workers-types": "^4.20241216.0",
"@types/pg": "^8.11.10",
"typescript": "^5.5.2",
"wrangler": "^3.60.3"
"typescript": "^5.7.2",
"wrangler": "^3.96.0"
},
"dependencies": {
"@libsql/client": "^0.14.0",
Expand Down
375 changes: 137 additions & 238 deletions pnpm-lock.yaml

Large diffs are not rendered by default.

145 changes: 77 additions & 68 deletions src/allowlist/index.ts
Original file line number Diff line number Diff line change
@@ -1,86 +1,95 @@
import { HandlerConfig } from "../handler";
import { StarbaseDBConfiguration } from "../handler";
import { DataSource } from "../types";

const parser = new (require('node-sql-parser').Parser)();
const parser = new (require("node-sql-parser").Parser)();

let allowlist: string[] | null = null;
let normalizedAllowlist: any[] | null = null;

function normalizeSQL(sql: string) {
// Remove trailing semicolon. This allows a user to send a SQL statement that has
// a semicolon where the allow list might not include it but both statements can
// equate to being the same. AST seems to have an issue with matching the difference
// when included in one query vs another.
return sql.trim().replace(/;\s*$/, '');
// Remove trailing semicolon. This allows a user to send a SQL statement that has
// a semicolon where the allow list might not include it but both statements can
// equate to being the same. AST seems to have an issue with matching the difference
// when included in one query vs another.
return sql.trim().replace(/;\s*$/, "");
}

async function loadAllowlist(dataSource?: DataSource): Promise<string[]> {
try {
const statement = 'SELECT sql_statement FROM tmp_allowlist_queries'
const result = await dataSource?.internalConnection?.durableObject.executeQuery(statement, [], false) as any[];
return result.map((row: any) => row.sql_statement);
} catch (error) {
console.error('Error loading allowlist:', error);
return [];
}
async function loadAllowlist(dataSource: DataSource): Promise<string[]> {
try {
const statement = "SELECT sql_statement FROM tmp_allowlist_queries";
const result = await dataSource.rpc.executeQuery({ sql: statement });
return result.map((row) => String(row.sql_statement));
} catch (error) {
console.error("Error loading allowlist:", error);
return [];
}
}

export async function isQueryAllowed(sql: string, isEnabled: boolean, dataSource?: DataSource, config?: HandlerConfig): Promise<boolean | Error> {
// If the feature is not turned on then by default the query is allowed
if (!isEnabled) return true;
export async function isQueryAllowed(opts: {
sql: string;
isEnabled: boolean;
dataSource: DataSource;
config: StarbaseDBConfiguration;
}): Promise<boolean | Error> {
const { sql, isEnabled, dataSource, config } = opts;

// If the feature is not turned on then by default the query is allowed
if (!isEnabled) return true;

// If we are using the administrative AUTHORIZATION token value, this request is allowed.
// We want database UIs to be able to have more free rein to run queries so we can load
// tables, run queries, and more. If you want to block queries with the allowlist then we
// advise you to do so by implementing user authentication with JWT.
if (config.role === "admin") {
return true;
}

// If we are using the administrative AUTHORIZATION token value, this request is allowed.
// We want database UI's to be able to have more free reign to run queries so we can load
// tables, run queries, and more. If you want to block queries with the allowlist then we
// advise you to do so by implementing user authentication with JWT.
if (dataSource?.request.headers.get('Authorization') === `Bearer ${config?.adminAuthorizationToken}`) {
return true;
allowlist = await loadAllowlist(dataSource);
normalizedAllowlist = allowlist.map((query) =>
parser.astify(normalizeSQL(query))
);

try {
if (!sql) {
return Error("No SQL provided for allowlist check");
}

allowlist = await loadAllowlist(dataSource);
normalizedAllowlist = allowlist.map(query => parser.astify(normalizeSQL(query)));

try {
if (!sql) {
return Error('No SQL provided for allowlist check')
}
const normalizedQuery = parser.astify(normalizeSQL(sql));

// Compare ASTs while ignoring specific values
const isCurrentAllowed = normalizedAllowlist?.some((allowedQuery) => {
// Create deep copies to avoid modifying original ASTs
const allowedAst = JSON.parse(JSON.stringify(allowedQuery));
const queryAst = JSON.parse(JSON.stringify(normalizedQuery));

const normalizedQuery = parser.astify(normalizeSQL(sql));

// Compare ASTs while ignoring specific values
const isCurrentAllowed = normalizedAllowlist?.some(allowedQuery => {
// Create deep copies to avoid modifying original ASTs
const allowedAst = JSON.parse(JSON.stringify(allowedQuery));
const queryAst = JSON.parse(JSON.stringify(normalizedQuery));

// Remove or normalize value fields from both ASTs
const normalizeAst = (ast: any) => {
if (Array.isArray(ast)) {
ast.forEach(normalizeAst);
} else if (ast && typeof ast === 'object') {
// Remove or normalize fields that contain specific values
if ('value' in ast) {
ast.value = '?';
}

Object.values(ast).forEach(normalizeAst);
}

return ast;
};

normalizeAst(allowedAst);
normalizeAst(queryAst);

return JSON.stringify(allowedAst) === JSON.stringify(queryAst);
});

if (!isCurrentAllowed) {
throw new Error("Query not allowed");
// Remove or normalize value fields from both ASTs
const normalizeAst = (ast: any) => {
if (Array.isArray(ast)) {
ast.forEach(normalizeAst);
} else if (ast && typeof ast === "object") {
// Remove or normalize fields that contain specific values
if ("value" in ast) {
ast.value = "?";
}

Object.values(ast).forEach(normalizeAst);
}

return true;
} catch (error: any) {
throw new Error(error?.message ?? 'Error');
return ast;
};

normalizeAst(allowedAst);
normalizeAst(queryAst);

return JSON.stringify(allowedAst) === JSON.stringify(queryAst);
});

if (!isCurrentAllowed) {
throw new Error("Query not allowed");
}
}

return true;
} catch (error: any) {
throw new Error(error?.message ?? "Error");
}
}
188 changes: 118 additions & 70 deletions src/cache/index.ts
Original file line number Diff line number Diff line change
@@ -1,52 +1,79 @@
import { DataSource, Source } from "../types";
const parser = new (require('node-sql-parser').Parser)();
import { StarbaseDBConfiguration } from "../handler";
import { DataSource } from "../types";
import sqlparser from "node-sql-parser";
const parser = new sqlparser.Parser();

function hasModifyingStatement(ast: any): boolean {
// Check if current node is a modifying statement
if (ast.type && ['insert', 'update', 'delete'].includes(ast.type.toLowerCase())) {
return true;
}
// Check if current node is a modifying statement
if (
ast.type &&
["insert", "update", "delete"].includes(ast.type.toLowerCase())
) {
return true;
}

// Recursively check all properties of the AST
for (const key in ast) {
if (typeof ast[key] === 'object' && ast[key] !== null) {
if (Array.isArray(ast[key])) {
if (ast[key].some(item => hasModifyingStatement(item))) {
return true;
}
} else if (hasModifyingStatement(ast[key])) {
return true;
}
// Recursively check all properties of the AST
for (const key in ast) {
if (typeof ast[key] === "object" && ast[key] !== null) {
if (Array.isArray(ast[key])) {
if (ast[key].some((item) => hasModifyingStatement(item))) {
return true;
}
} else if (hasModifyingStatement(ast[key])) {
return true;
}
}
}

return false;
return false;
}

export async function beforeQueryCache(sql: string, params?: any[], dataSource?: DataSource, dialect?: string): Promise<any | null> {
// Currently we do not support caching queries that have dynamic parameters
if (params?.length) return null
if (dataSource?.source === Source.internal || !dataSource?.request.headers.has('X-Starbase-Cache')) return null
export async function beforeQueryCache(opts: {
sql: string;
params?: unknown[];
dataSource: DataSource;
}): Promise<unknown | null> {
const { sql, params = [], dataSource } = opts;

if (!dialect) dialect = 'sqlite'
if (dialect.toLowerCase() === 'postgres') dialect = 'postgresql'
// Currently we do not support caching queries that have dynamic parameters
if (params.length) {
return null;
}

let ast = parser.astify(sql, { database: dialect });
if (hasModifyingStatement(ast)) return null

const fetchCacheStatement = `SELECT timestamp, ttl, query, results FROM tmp_cache WHERE query = ?`
const result = await dataSource.internalConnection?.durableObject.executeQuery(fetchCacheStatement, [sql], false) as any[];

if (result?.length) {
const { timestamp, ttl, results } = result[0];
const expirationTime = new Date(timestamp).getTime() + (ttl * 1000);

if (Date.now() < expirationTime) {
return JSON.parse(results)
}
// If it's an internal request, or cache is not enabled, return null.
if (dataSource.source === "internal" || !dataSource.cache) {
return null;
}

const dialect =
dataSource.source === "external" ? dataSource.external!.dialect : "sqlite";

let ast = parser.astify(sql, { database: dialect });
if (hasModifyingStatement(ast)) return null;

const fetchCacheStatement = `SELECT timestamp, ttl, query, results FROM tmp_cache WHERE query = ?`;

type QueryResult = {
timestamp: string;
ttl: number;
results: string;
};

const result = await dataSource.rpc.executeQuery({
sql: fetchCacheStatement,
params: [sql],
});

if (result?.length) {
const { timestamp, ttl, results } = result[0] as QueryResult;
const expirationTime = new Date(timestamp).getTime() + ttl * 1000;

if (Date.now() < expirationTime) {
return JSON.parse(results);
}
}

return null
return null;
}

// Serialized RPC arguments are limited to 1MiB in size at the moment for Cloudflare
Expand All @@ -55,36 +82,57 @@ export async function beforeQueryCache(sql: string, params?: any[], dataSource?:
// to look into include using Cloudflare Cache but need to find a good way to cache the
// response in a safe way for our use case. Another option is another service for queues
// or another way to ingest it directly to the Durable Object.
export async function afterQueryCache(sql: string, params: any[] | undefined, result: any, dataSource?: DataSource, dialect?: string) {
// Currently we do not support caching queries that have dynamic parameters
if (params?.length) return;
if (dataSource?.source === Source.internal || !dataSource?.request.headers.has('X-Starbase-Cache')) return null

try {
if (!dialect) dialect = 'sqlite'
if (dialect.toLowerCase() === 'postgres') dialect = 'postgresql'

let ast = parser.astify(sql, { database: dialect });

// If any modifying query exists within our SQL statement then we shouldn't proceed
if (hasModifyingStatement(ast)) return;

const timestamp = Date.now();
const results = JSON.stringify(result);

const exists = await dataSource.internalConnection?.durableObject.executeQuery(
'SELECT 1 FROM tmp_cache WHERE query = ? LIMIT 1',
[sql],
false
) as any[];

const query = exists?.length
? { sql: 'UPDATE tmp_cache SET timestamp = ?, results = ? WHERE query = ?', params: [timestamp, results, sql] }
: { sql: 'INSERT INTO tmp_cache (timestamp, ttl, query, results) VALUES (?, ?, ?, ?)', params: [timestamp, 60, sql, results] };

await dataSource.internalConnection?.durableObject.executeQuery(query.sql, query.params, false);
} catch (error) {
console.error('Error in cache operation:', error);
return;
}
}
export async function afterQueryCache(opts: {
sql: string;
params: unknown[] | undefined;
result: unknown;
dataSource: DataSource;
}) {
const { sql, params, result, dataSource } = opts;

// Currently we do not support caching queries that have dynamic parameters
if (params?.length) return;

// If it's an internal request, or cache is not enabled, return null.
if (dataSource.source === "internal" || !dataSource.cache) {
return null;
}

try {
const dialect =
dataSource.source === "external"
? dataSource.external!.dialect
: "sqlite";

let ast = parser.astify(sql, { database: dialect });

// If any modifying query exists within our SQL statement then we shouldn't proceed
if (hasModifyingStatement(ast)) return;

const timestamp = Date.now();
const results = JSON.stringify(result);

const exists = await dataSource.rpc.executeQuery({
sql: "SELECT 1 FROM tmp_cache WHERE query = ? LIMIT 1",
params: [sql],
});

const query = exists?.length
? {
sql: "UPDATE tmp_cache SET timestamp = ?, results = ? WHERE query = ?",
params: [timestamp, results, sql],
}
: {
sql: "INSERT INTO tmp_cache (timestamp, ttl, query, results) VALUES (?, ?, ?, ?)",
params: [timestamp, dataSource.cacheTTL ?? 60, sql, results],
};

await dataSource.rpc.executeQuery({
sql: query.sql,
params: query.params,
});
} catch (error) {
console.error("Error in cache operation:", error);
return;
}
}
Loading