Skip to content

Commit

Permalink
Add D1 API dialect
Browse files Browse the repository at this point in the history
  • Loading branch information
tom-sherman committed Nov 21, 2022
1 parent 4817141 commit 31d4a90
Show file tree
Hide file tree
Showing 3 changed files with 222 additions and 0 deletions.
144 changes: 144 additions & 0 deletions src/d1-api-dialect.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,144 @@
import {
CompiledQuery,
DatabaseConnection,
DatabaseIntrospector,
Dialect,
Driver,
Kysely,
SqliteAdapter,
SqliteIntrospector,
SqliteQueryCompiler,
QueryCompiler,
QueryResult,
} from 'kysely';
import { D1Api } from './d1-api';

export interface D1DialectConfig {
apiKey: string;
accountId: string;
databaseName: string;
}

export class D1APIDialect implements Dialect {
#config;

constructor(config: D1DialectConfig) {
this.#config = config;
}

createAdapter() {
return new SqliteAdapter();
}

createDriver(): Driver {
return new D1Driver(this.#config);
}

createQueryCompiler(): QueryCompiler {
return new SqliteQueryCompiler();
}

createIntrospector(db: Kysely<any>): DatabaseIntrospector {
return new SqliteIntrospector(db);
}
}

class D1Driver implements Driver {
#config: D1DialectConfig;

constructor(config: D1DialectConfig) {
this.#config = config;
}

async init(): Promise<void> {}

async acquireConnection(): Promise<DatabaseConnection> {
const apiClient = new D1Api(this.#config.apiKey, this.#config.accountId);
const database = await apiClient.databseFromName(this.#config.databaseName);

if (!database) {
throw new Error(`Database ${this.#config.databaseName} not found`);
}

return new D1Connection({
apiClient: apiClient,
databaseId: database.uuid,
});
}

async beginTransaction(conn: D1Connection): Promise<void> {
return await conn.beginTransaction();
}

async commitTransaction(conn: D1Connection): Promise<void> {
return await conn.commitTransaction();
}

async rollbackTransaction(conn: D1Connection): Promise<void> {
return await conn.rollbackTransaction();
}

async releaseConnection(_conn: D1Connection): Promise<void> {}

async destroy(): Promise<void> {}
}

interface D1ConnectionConfig {
apiClient: D1Api;
databaseId: string;
}

class D1Connection implements DatabaseConnection {
#config;
// #transactionClient?: D1Connection

constructor(config: D1ConnectionConfig) {
this.#config = config;
}

async executeQuery<O>(compiledQuery: CompiledQuery): Promise<QueryResult<O>> {
// Transactions are not supported yet.
// if (this.#transactionClient) return this.#transactionClient.executeQuery(compiledQuery)

const queryResult = await this.#config.apiClient.queryDatabase(
this.#config.databaseId,
compiledQuery.sql,
compiledQuery.parameters
);
if (queryResult.success === false) {
throw new Error(queryResult.error);
}

const result = queryResult.data.result[0];

return {
insertId: result.meta.last_row_id !== null ? BigInt(result.meta.last_row_id) : undefined,
rows: (result?.results as O[]) ?? [],
numUpdatedOrDeletedRows: result.meta.changes > 0 ? BigInt(result.meta.changes) : undefined,
};
}

async beginTransaction() {
// this.#transactionClient = this.#transactionClient ?? new PlanetScaleConnection(this.#config)
// this.#transactionClient.#conn.execute('BEGIN')
throw new Error('Transactions are not supported yet.');
}

async commitTransaction() {
// if (!this.#transactionClient) throw new Error('No transaction to commit')
// this.#transactionClient.#conn.execute('COMMIT')
// this.#transactionClient = undefined
throw new Error('Transactions are not supported yet.');
}

async rollbackTransaction() {
// if (!this.#transactionClient) throw new Error('No transaction to rollback')
// this.#transactionClient.#conn.execute('ROLLBACK')
// this.#transactionClient = undefined
throw new Error('Transactions are not supported yet.');
}

async *streamQuery<O>(_compiledQuery: CompiledQuery, _chunkSize: number): AsyncIterableIterator<QueryResult<O>> {
throw new Error('D1 Driver does not support streaming');
}
}
75 changes: 75 additions & 0 deletions src/d1-api.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
const BASE_URL = 'https://api.cloudflare.com/client/v4';

export class D1Api {
#apiToken;
#accountId;

constructor(apiToken: string, accountId: string) {
this.#apiToken = apiToken;
this.#accountId = accountId;
}

#fetch(input: Request | string, init?: RequestInit) {
const headers = new Headers(init?.headers);

headers.set('Authorization', `Bearer ${this.#apiToken}`);
headers.set('Content-Type', 'application/json');
return fetch(input, {
...init,
headers: headers,
});
}

async listDatabases() {
const perPage = 10;
const databases: { uuid: string; name: string }[] = [];
let page = 1;
while (databases.length % perPage === 0) {
const params = new URLSearchParams({
per_page: perPage.toString(),
page: page.toString(),
});
const response = await this.#fetch(`${BASE_URL}/accounts/${this.#accountId}/d1/database?${params.toString()}`);
const json = (await response.json()) as any;
const results = json.result;
databases.push(...results);
page++;
if (results.length < perPage) {
break;
}
}
return databases;
}

async databseFromName(name: string) {
const allDBs = await this.listDatabases();
const matchingDB = allDBs.find((db: { uuid: string; name: string }) => db.name === name);
return matchingDB ?? null;
}

async queryDatabase(
databaseId: string,
query: string,
params: readonly unknown[]
): Promise<{ success: true; data: any } | { success: false; error: any }> {
try {
const reply = await this.#fetch(`${BASE_URL}/accounts/${this.#accountId}/d1/database/${databaseId}/query`, {
method: 'POST',
body: JSON.stringify({ sql: query, params }),
});
if (reply.ok) {
const jsonData = (await reply.json()) as any;
return { success: true, data: jsonData };
} else {
try {
const jsonData = (await reply.json()) as any;
return { success: false, error: jsonData.errors[0].message };
} catch (e) {
return { success: false, error: reply.statusText };
}
}
} catch (e: any) {
return { success: false, error: e.message };
}
}
}
3 changes: 3 additions & 0 deletions src/index.ts
Original file line number Diff line number Diff line change
@@ -1 +1,4 @@
export * from './d1-dialect';

// Marked as unstable as the D1 HTTP API is currently undocumented and may change.
export { D1APIDialect as unstable_D1APIDialect, D1DialectConfig as unstable_D1DialectConfig } from './d1-api-dialect';

0 comments on commit 31d4a90

Please sign in to comment.