Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Raw query response support #6

Merged
merged 3 commits into from
Oct 4, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 9 additions & 10 deletions src/index.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import { DurableObject } from "cloudflare:workers";
import { createResponse, QueryRequest, QueryTransactionRequest } from './utils';
import { enqueueOperation, processNextOperation } from './operation';
import { enqueueOperation, OperationQueueItem, processNextOperation } from './operation';

const DURABLE_OBJECT_ID = 'sql-durable-object';

Expand All @@ -9,12 +9,7 @@ export class DatabaseDurableObject extends DurableObject {
public sql: SqlStorage;

// Queue of operations to be processed, with each operation containing a list of queries to be executed
private operationQueue: Array<{
queries: { sql: string; params?: any[] }[];
isTransaction: boolean;
resolve: (value: Response) => void;
reject: (reason?: any) => void;
}> = [];
private operationQueue: Array<OperationQueueItem> = [];

// Flag to indicate if an operation is currently being processed
private processingOperation: { value: boolean } = { value: false };
Expand All @@ -31,7 +26,7 @@ export class DatabaseDurableObject extends DurableObject {
this.sql = ctx.storage.sql;
}

async queryRoute(request: Request): Promise<Response> {
async queryRoute(request: Request, isRaw: boolean): Promise<Response> {
try {
const contentType = request.headers.get('Content-Type') || '';
if (!contentType.includes('application/json')) {
Expand All @@ -56,6 +51,7 @@ export class DatabaseDurableObject extends DurableObject {
const response = await enqueueOperation(
queries,
true,
isRaw,
this.operationQueue,
() => processNextOperation(this.sql, this.operationQueue, this.ctx, this.processingOperation)
);
Expand All @@ -70,6 +66,7 @@ export class DatabaseDurableObject extends DurableObject {
const response = await enqueueOperation(
queries,
false,
isRaw,
this.operationQueue,
() => processNextOperation(this.sql, this.operationQueue, this.ctx, this.processingOperation)
);
Expand All @@ -91,8 +88,10 @@ export class DatabaseDurableObject extends DurableObject {
async fetch(request: Request): Promise<Response> {
const url = new URL(request.url);

if (request.method === 'POST' && url.pathname === '/query') {
return this.queryRoute(request);
if (request.method === 'POST' && url.pathname === '/query/raw') {
return this.queryRoute(request, true);
} else if (request.method === 'POST' && url.pathname === '/query') {
return this.queryRoute(request, false);
} else if (request.method === 'GET' && url.pathname === '/status') {
return this.statusRoute(request);
} else {
Expand Down
59 changes: 51 additions & 8 deletions src/operation.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,25 @@
import { createResponse } from './utils';

export function executeQuery(sql: string, params: any[] | undefined, sqlInstance: any): any[] {
export type OperationQueueItem = {
queries: { sql: string; params?: any[] }[];
isTransaction: boolean;
isRaw: boolean;
resolve: (value: Response) => void;
reject: (reason?: any) => void;
}

export type RawQueryResponse = {
columns: string[];
rows: any[];
meta: {
rows_read: number;
rows_written: number;
}
}

export type QueryResponse = any[] | RawQueryResponse;

export function executeQuery(sql: string, params: any[] | undefined, isRaw: boolean, sqlInstance: any): QueryResponse {
try {
let cursor;

Expand All @@ -10,15 +29,37 @@ export function executeQuery(sql: string, params: any[] | undefined, sqlInstance
cursor = sqlInstance.exec(sql);
}

const result = cursor.toArray();
let result;

if (isRaw) {
const results = cursor.toArray();
if (!results.length) {
return { columns: [], rows: [], meta: { rows_read: 0, rows_written: 0 } };
}

const columnNames = results.length ? Object.keys(results[0]) : [];
const rows = results.map((row: any) => Object.values(row));

result = {
columns: columnNames,
rows: rows,
meta: {
rows_read: cursor.rowsRead,
rows_written: cursor.rowsWritten
}
};
Brayden marked this conversation as resolved.
Show resolved Hide resolved
} else {
result = cursor.toArray();
}

return result;
} catch (error) {
console.error('SQL Execution Error:', error);
throw error;
}
}

export async function executeTransaction(queries: { sql: string; params?: any[] }[], sqlInstance: any, ctx: any): Promise<any[]> {
export async function executeTransaction(queries: { sql: string; params?: any[] }[], isRaw: boolean, sqlInstance: any, ctx: any): Promise<any[]> {
const results = [];
let transactionBookmark: any | null = null;

Expand All @@ -28,7 +69,7 @@ export async function executeTransaction(queries: { sql: string; params?: any[]

for (const queryObj of queries) {
const { sql, params } = queryObj;
const result = executeQuery(sql, params, sqlInstance);
const result = executeQuery(sql, params, isRaw, sqlInstance);
results.push(result);
}

Expand All @@ -54,6 +95,7 @@ export async function executeTransaction(queries: { sql: string; params?: any[]
export async function enqueueOperation(
queries: { sql: string; params?: any[] }[],
isTransaction: boolean,
isRaw: boolean,
operationQueue: any[],
processNextOperation: () => Promise<void>
): Promise<Response> {
Expand All @@ -66,6 +108,7 @@ export async function enqueueOperation(
operationQueue.push({
queries,
isTransaction,
isRaw,
resolve: (value: Response) => {
clearTimeout(timeout);
resolve(value);
Expand All @@ -84,7 +127,7 @@ export async function enqueueOperation(

export async function processNextOperation(
sqlInstance: any,
operationQueue: any[],
operationQueue: OperationQueueItem[],
ctx: any,
processingOperation: { value: boolean }
) {
Expand All @@ -99,16 +142,16 @@ export async function processNextOperation(
}

processingOperation.value = true;
const { queries, isTransaction, resolve, reject } = operationQueue.shift()!;
const { queries, isTransaction, isRaw, resolve, reject } = operationQueue.shift()!;

try {
let result;

if (isTransaction) {
result = await executeTransaction(queries, sqlInstance, ctx);
result = await executeTransaction(queries, isRaw, sqlInstance, ctx);
} else {
const { sql, params } = queries[0];
result = executeQuery(sql, params, sqlInstance);
result = executeQuery(sql, params, isRaw, sqlInstance);
}

resolve(createResponse(result, undefined, 200));
Expand Down