-
Notifications
You must be signed in to change notification settings - Fork 280
/
sqlite-driver.ts
122 lines (97 loc) · 3.18 KB
/
sqlite-driver.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
import {
DatabaseConnection,
QueryResult,
} from '../../driver/database-connection.js'
import { Driver } from '../../driver/driver.js'
import { CompiledQuery } from '../../query-compiler/compiled-query.js'
import { freeze, isFunction } from '../../util/object-utils.js'
import { SqliteDatabase, SqliteDialectConfig } from './sqlite-dialect-config.js'
/**
 * Driver implementation backed by a single SQLite database handle.
 *
 * One connection object is created in {@link SqliteDriver.init} and handed
 * out to every caller of {@link SqliteDriver.acquireConnection}; a mutex
 * makes callers take turns with it.
 */
export class SqliteDriver implements Driver {
  readonly #config: SqliteDialectConfig
  readonly #mutex = new ConnectionMutex()

  #db?: SqliteDatabase
  #connection?: DatabaseConnection

  /**
   * @param config Dialect configuration. A frozen shallow copy is stored, so
   * later changes to the caller's object are not observed by the driver.
   */
  constructor(config: SqliteDialectConfig) {
    this.#config = freeze({ ...config })
  }

  /**
   * Resolves the database handle, wraps it in the shared connection and, if
   * configured, runs the `onCreateConnection` hook with that connection.
   */
  async init(): Promise<void> {
    // `database` is either the handle itself or a factory that produces it.
    if (isFunction(this.#config.database)) {
      this.#db = await this.#config.database()
    } else {
      this.#db = this.#config.database
    }

    this.#connection = new SqliteConnection(this.#db)

    if (!this.#config.onCreateConnection) {
      return
    }

    await this.#config.onCreateConnection(this.#connection)
  }

  /**
   * Waits for exclusive access and returns the shared connection.
   *
   * @returns The connection created by `init`.
   */
  async acquireConnection(): Promise<DatabaseConnection> {
    // There is exactly one connection, so acquiring it means waiting for
    // whoever currently holds the mutex to release it.
    await this.#mutex.lock()

    return this.#connection!
  }

  /** Runs a raw `begin` statement on the given connection. */
  async beginTransaction(connection: DatabaseConnection): Promise<void> {
    const begin = CompiledQuery.raw('begin')
    await connection.executeQuery(begin)
  }

  /** Runs a raw `commit` statement on the given connection. */
  async commitTransaction(connection: DatabaseConnection): Promise<void> {
    const commit = CompiledQuery.raw('commit')
    await connection.executeQuery(commit)
  }

  /** Runs a raw `rollback` statement on the given connection. */
  async rollbackTransaction(connection: DatabaseConnection): Promise<void> {
    const rollback = CompiledQuery.raw('rollback')
    await connection.executeQuery(rollback)
  }

  /** Gives up exclusive access so the next waiter can acquire the connection. */
  async releaseConnection(): Promise<void> {
    this.#mutex.unlock()
  }

  /** Closes the database handle if `init` has assigned one. */
  async destroy(): Promise<void> {
    this.#db?.close()
  }
}
/**
 * Connection wrapper that executes compiled queries against a SQLite
 * database handle through its prepared-statement API.
 */
class SqliteConnection implements DatabaseConnection {
  readonly #db: SqliteDatabase

  /**
   * @param db Database handle used to prepare and run statements.
   */
  constructor(db: SqliteDatabase) {
    this.#db = db
  }

  /**
   * Prepares and executes a compiled query.
   *
   * The method is `async` so that any exception thrown while preparing or
   * running the statement surfaces as a rejection of the returned promise
   * instead of being thrown synchronously at the call site.
   *
   * @param compiledQuery The SQL text and its bound parameters.
   * @returns For statements flagged as readers, the fetched `rows`. For all
   * other statements, an empty `rows` array plus `numAffectedRows` and
   * `insertId` as bigints (each `undefined` when the statement result does
   * not report the corresponding value).
   */
  async executeQuery<O>(compiledQuery: CompiledQuery): Promise<QueryResult<O>> {
    const { sql, parameters } = compiledQuery
    const stmt = this.#db.prepare(sql)

    if (stmt.reader) {
      return {
        rows: stmt.all(parameters) as O[],
      }
    }

    const { changes, lastInsertRowid } = stmt.run(parameters)

    const numAffectedRows =
      changes !== undefined && changes !== null ? BigInt(changes) : undefined

    return {
      // TODO: remove.
      numUpdatedOrDeletedRows: numAffectedRows,
      numAffectedRows,
      insertId:
        lastInsertRowid !== undefined && lastInsertRowid !== null
          ? BigInt(lastInsertRowid)
          : undefined,
      rows: [],
    }
  }

  /**
   * Streaming is not supported; the returned iterator throws as soon as it is
   * first advanced.
   */
  async *streamQuery<R>(): AsyncIterableIterator<QueryResult<R>> {
    throw new Error("Sqlite driver doesn't support streaming")
  }
}
/**
 * Minimal promise-based mutex.
 *
 * While the mutex is held, a pending promise is stored together with the
 * function that settles it. Waiters await that promise and re-check once it
 * settles, because another waiter may have taken the lock in the meantime.
 */
class ConnectionMutex {
  #pending?: Promise<void>
  #release?: () => void

  /**
   * Resolves once the caller holds the lock. If the lock is already held,
   * waits until it is released and then claims it.
   */
  async lock(): Promise<void> {
    // Re-read the field after every wake-up: a different waiter may already
    // have installed a fresh promise by the time this one resumes.
    for (
      let current = this.#pending;
      current !== undefined;
      current = this.#pending
    ) {
      await current
    }

    this.#pending = new Promise<void>((settle) => {
      this.#release = settle
    })
  }

  /**
   * Releases the lock and wakes every waiter. Does nothing when the mutex is
   * not currently held.
   */
  unlock(): void {
    const release = this.#release

    // Clear the state before settling so woken waiters observe a free mutex.
    this.#release = undefined
    this.#pending = undefined

    if (release !== undefined) {
      release()
    }
  }
}