Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Raw query response support #6

Merged
merged 3 commits into from
Oct 4, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,19 @@ curl --location --request POST 'https://starbasedb.YOUR-ID-HERE.workers.dev/quer
</code>
</pre>

<h3>Raw Query Response</h3>
<pre>
<code>
curl --location --request POST 'https://starbasedb.YOUR-ID-HERE.workers.dev/query/raw' \
--header 'Content-Type: application/json' \
--header 'Authorization: Bearer ABC123' \
--data-raw '{
"sql": "SELECT * FROM artist;",
"params": []
}'
</code>
</pre>

<br />
<h2>🤝 Contributing</h2>
<p>We welcome contributions! Please refer to our <a href="./CONTRIBUTING.md">Contribution Guide</a> for more details.</p>
Expand Down
19 changes: 9 additions & 10 deletions src/index.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import { DurableObject } from "cloudflare:workers";
import { createResponse, QueryRequest, QueryTransactionRequest } from './utils';
import { enqueueOperation, processNextOperation } from './operation';
import { enqueueOperation, OperationQueueItem, processNextOperation } from './operation';

const DURABLE_OBJECT_ID = 'sql-durable-object';

Expand All @@ -9,12 +9,7 @@ export class DatabaseDurableObject extends DurableObject {
public sql: SqlStorage;

// Queue of operations to be processed, with each operation containing a list of queries to be executed
private operationQueue: Array<{
queries: { sql: string; params?: any[] }[];
isTransaction: boolean;
resolve: (value: Response) => void;
reject: (reason?: any) => void;
}> = [];
private operationQueue: Array<OperationQueueItem> = [];

// Flag to indicate if an operation is currently being processed
private processingOperation: { value: boolean } = { value: false };
Expand All @@ -31,7 +26,7 @@ export class DatabaseDurableObject extends DurableObject {
this.sql = ctx.storage.sql;
}

async queryRoute(request: Request): Promise<Response> {
async queryRoute(request: Request, isRaw: boolean): Promise<Response> {
try {
const contentType = request.headers.get('Content-Type') || '';
if (!contentType.includes('application/json')) {
Expand All @@ -56,6 +51,7 @@ export class DatabaseDurableObject extends DurableObject {
const response = await enqueueOperation(
queries,
true,
isRaw,
this.operationQueue,
() => processNextOperation(this.sql, this.operationQueue, this.ctx, this.processingOperation)
);
Expand All @@ -70,6 +66,7 @@ export class DatabaseDurableObject extends DurableObject {
const response = await enqueueOperation(
queries,
false,
isRaw,
this.operationQueue,
() => processNextOperation(this.sql, this.operationQueue, this.ctx, this.processingOperation)
);
Expand All @@ -91,8 +88,10 @@ export class DatabaseDurableObject extends DurableObject {
async fetch(request: Request): Promise<Response> {
const url = new URL(request.url);

if (request.method === 'POST' && url.pathname === '/query') {
return this.queryRoute(request);
if (request.method === 'POST' && url.pathname === '/query/raw') {
return this.queryRoute(request, true);
} else if (request.method === 'POST' && url.pathname === '/query') {
return this.queryRoute(request, false);
} else if (request.method === 'GET' && url.pathname === '/status') {
return this.statusRoute(request);
} else {
Expand Down
51 changes: 43 additions & 8 deletions src/operation.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,25 @@
import { createResponse } from './utils';

export function executeQuery(sql: string, params: any[] | undefined, sqlInstance: any): any[] {
export type OperationQueueItem = {
queries: { sql: string; params?: any[] }[];
isTransaction: boolean;
isRaw: boolean;
resolve: (value: Response) => void;
reject: (reason?: any) => void;
}

export type RawQueryResponse = {
columns: string[];
rows: any[];
meta: {
rows_read: number;
rows_written: number;
}
}

export type QueryResponse = any[] | RawQueryResponse;

export function executeQuery(sql: string, params: any[] | undefined, isRaw: boolean, sqlInstance: any): QueryResponse {
try {
let cursor;

Expand All @@ -10,15 +29,29 @@ export function executeQuery(sql: string, params: any[] | undefined, sqlInstance
cursor = sqlInstance.exec(sql);
}

const result = cursor.toArray();
let result;

if (isRaw) {
result = {
columns: cursor.columnNames,
rows: cursor.raw().toArray(),
meta: {
rows_read: cursor.rowsRead,
rows_written: cursor.rowsWritten,
},
};
} else {
result = cursor.toArray();
}

return result;
} catch (error) {
console.error('SQL Execution Error:', error);
throw error;
}
}

export async function executeTransaction(queries: { sql: string; params?: any[] }[], sqlInstance: any, ctx: any): Promise<any[]> {
export async function executeTransaction(queries: { sql: string; params?: any[] }[], isRaw: boolean, sqlInstance: any, ctx: any): Promise<any[]> {
const results = [];
let transactionBookmark: any | null = null;

Expand All @@ -28,7 +61,7 @@ export async function executeTransaction(queries: { sql: string; params?: any[]

for (const queryObj of queries) {
const { sql, params } = queryObj;
const result = executeQuery(sql, params, sqlInstance);
const result = executeQuery(sql, params, isRaw, sqlInstance);
results.push(result);
}

Expand All @@ -54,6 +87,7 @@ export async function executeTransaction(queries: { sql: string; params?: any[]
export async function enqueueOperation(
queries: { sql: string; params?: any[] }[],
isTransaction: boolean,
isRaw: boolean,
operationQueue: any[],
processNextOperation: () => Promise<void>
): Promise<Response> {
Expand All @@ -66,6 +100,7 @@ export async function enqueueOperation(
operationQueue.push({
queries,
isTransaction,
isRaw,
resolve: (value: Response) => {
clearTimeout(timeout);
resolve(value);
Expand All @@ -84,7 +119,7 @@ export async function enqueueOperation(

export async function processNextOperation(
sqlInstance: any,
operationQueue: any[],
operationQueue: OperationQueueItem[],
ctx: any,
processingOperation: { value: boolean }
) {
Expand All @@ -99,16 +134,16 @@ export async function processNextOperation(
}

processingOperation.value = true;
const { queries, isTransaction, resolve, reject } = operationQueue.shift()!;
const { queries, isTransaction, isRaw, resolve, reject } = operationQueue.shift()!;

try {
let result;

if (isTransaction) {
result = await executeTransaction(queries, sqlInstance, ctx);
result = await executeTransaction(queries, isRaw, sqlInstance, ctx);
} else {
const { sql, params } = queries[0];
result = executeQuery(sql, params, sqlInstance);
result = executeQuery(sql, params, isRaw, sqlInstance);
}

resolve(createResponse(result, undefined, 200));
Expand Down