Skip to content

Commit

Permalink
Merge pull request #6 from Brayden/bwilmoth/raw-query
Browse files Browse the repository at this point in the history
Raw query response support
  • Loading branch information
Brayden authored Oct 4, 2024
2 parents 8e06f10 + 46be137 commit 5f5255c
Show file tree
Hide file tree
Showing 3 changed files with 65 additions and 18 deletions.
13 changes: 13 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,19 @@ curl --location --request POST 'https://starbasedb.YOUR-ID-HERE.workers.dev/quer
</code>
</pre>

<h3>Raw Query Response</h3>
<pre>
<code>
curl --location --request POST 'https://starbasedb.YOUR-ID-HERE.workers.dev/query/raw' \
--header 'Content-Type: application/json' \
--header 'Authorization: Bearer ABC123' \
--data-raw '{
"sql": "SELECT * FROM artist;",
"params": []
}'
</code>
</pre>

<br />
<h2>🤝 Contributing</h2>
<p>We welcome contributions! Please refer to our <a href="./CONTRIBUTING.md">Contribution Guide</a> for more details.</p>
Expand Down
19 changes: 9 additions & 10 deletions src/index.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import { DurableObject } from "cloudflare:workers";
import { createResponse, QueryRequest, QueryTransactionRequest } from './utils';
import { enqueueOperation, processNextOperation } from './operation';
import { enqueueOperation, OperationQueueItem, processNextOperation } from './operation';

const DURABLE_OBJECT_ID = 'sql-durable-object';

Expand All @@ -9,12 +9,7 @@ export class DatabaseDurableObject extends DurableObject {
public sql: SqlStorage;

// Queue of operations to be processed, with each operation containing a list of queries to be executed
private operationQueue: Array<{
queries: { sql: string; params?: any[] }[];
isTransaction: boolean;
resolve: (value: Response) => void;
reject: (reason?: any) => void;
}> = [];
private operationQueue: Array<OperationQueueItem> = [];

// Flag to indicate if an operation is currently being processed
private processingOperation: { value: boolean } = { value: false };
Expand All @@ -31,7 +26,7 @@ export class DatabaseDurableObject extends DurableObject {
this.sql = ctx.storage.sql;
}

async queryRoute(request: Request): Promise<Response> {
async queryRoute(request: Request, isRaw: boolean): Promise<Response> {
try {
const contentType = request.headers.get('Content-Type') || '';
if (!contentType.includes('application/json')) {
Expand All @@ -56,6 +51,7 @@ export class DatabaseDurableObject extends DurableObject {
const response = await enqueueOperation(
queries,
true,
isRaw,
this.operationQueue,
() => processNextOperation(this.sql, this.operationQueue, this.ctx, this.processingOperation)
);
Expand All @@ -70,6 +66,7 @@ export class DatabaseDurableObject extends DurableObject {
const response = await enqueueOperation(
queries,
false,
isRaw,
this.operationQueue,
() => processNextOperation(this.sql, this.operationQueue, this.ctx, this.processingOperation)
);
Expand All @@ -91,8 +88,10 @@ export class DatabaseDurableObject extends DurableObject {
async fetch(request: Request): Promise<Response> {
const url = new URL(request.url);

if (request.method === 'POST' && url.pathname === '/query') {
return this.queryRoute(request);
if (request.method === 'POST' && url.pathname === '/query/raw') {
return this.queryRoute(request, true);
} else if (request.method === 'POST' && url.pathname === '/query') {
return this.queryRoute(request, false);
} else if (request.method === 'GET' && url.pathname === '/status') {
return this.statusRoute(request);
} else {
Expand Down
51 changes: 43 additions & 8 deletions src/operation.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,25 @@
import { createResponse } from './utils';

export function executeQuery(sql: string, params: any[] | undefined, sqlInstance: any): any[] {
export type OperationQueueItem = {
queries: { sql: string; params?: any[] }[];
isTransaction: boolean;
isRaw: boolean;
resolve: (value: Response) => void;
reject: (reason?: any) => void;
}

export type RawQueryResponse = {
columns: string[];
rows: any[];
meta: {
rows_read: number;
rows_written: number;
}
}

export type QueryResponse = any[] | RawQueryResponse;

export function executeQuery(sql: string, params: any[] | undefined, isRaw: boolean, sqlInstance: any): QueryResponse {
try {
let cursor;

Expand All @@ -10,15 +29,29 @@ export function executeQuery(sql: string, params: any[] | undefined, sqlInstance
cursor = sqlInstance.exec(sql);
}

const result = cursor.toArray();
let result;

if (isRaw) {
result = {
columns: cursor.columnNames,
rows: cursor.raw().toArray(),
meta: {
rows_read: cursor.rowsRead,
rows_written: cursor.rowsWritten,
},
};
} else {
result = cursor.toArray();
}

return result;
} catch (error) {
console.error('SQL Execution Error:', error);
throw error;
}
}

export async function executeTransaction(queries: { sql: string; params?: any[] }[], sqlInstance: any, ctx: any): Promise<any[]> {
export async function executeTransaction(queries: { sql: string; params?: any[] }[], isRaw: boolean, sqlInstance: any, ctx: any): Promise<any[]> {
const results = [];
let transactionBookmark: any | null = null;

Expand All @@ -28,7 +61,7 @@ export async function executeTransaction(queries: { sql: string; params?: any[]

for (const queryObj of queries) {
const { sql, params } = queryObj;
const result = executeQuery(sql, params, sqlInstance);
const result = executeQuery(sql, params, isRaw, sqlInstance);
results.push(result);
}

Expand All @@ -54,6 +87,7 @@ export async function executeTransaction(queries: { sql: string; params?: any[]
export async function enqueueOperation(
queries: { sql: string; params?: any[] }[],
isTransaction: boolean,
isRaw: boolean,
operationQueue: any[],
processNextOperation: () => Promise<void>
): Promise<Response> {
Expand All @@ -66,6 +100,7 @@ export async function enqueueOperation(
operationQueue.push({
queries,
isTransaction,
isRaw,
resolve: (value: Response) => {
clearTimeout(timeout);
resolve(value);
Expand All @@ -84,7 +119,7 @@ export async function enqueueOperation(

export async function processNextOperation(
sqlInstance: any,
operationQueue: any[],
operationQueue: OperationQueueItem[],
ctx: any,
processingOperation: { value: boolean }
) {
Expand All @@ -99,16 +134,16 @@ export async function processNextOperation(
}

processingOperation.value = true;
const { queries, isTransaction, resolve, reject } = operationQueue.shift()!;
const { queries, isTransaction, isRaw, resolve, reject } = operationQueue.shift()!;

try {
let result;

if (isTransaction) {
result = await executeTransaction(queries, sqlInstance, ctx);
result = await executeTransaction(queries, isRaw, sqlInstance, ctx);
} else {
const { sql, params } = queries[0];
result = executeQuery(sql, params, sqlInstance);
result = executeQuery(sql, params, isRaw, sqlInstance);
}

resolve(createResponse(result, undefined, 200));
Expand Down

0 comments on commit 5f5255c

Please sign in to comment.