Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use logs for logging #38

Open
wants to merge 9 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions Makefile
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
all:
dune build @install @runtest
dune build --root . @install @tests/runtest

clean:
dune clean
dune clean --root .
4 changes: 2 additions & 2 deletions ppx_sqlexpr.opam
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
opam-version: "1.2"
opam-version: "2.0"
maintainer: "[email protected]"
authors: ["Mauricio Fernandez <[email protected]>"]
license: "LGPL-2.1 with OCaml linking exception"
Expand All @@ -9,7 +9,7 @@ doc: "doc"
build: [
[ "dune" "build" "-p" name "-j" jobs ]
]
build-test: [["dune" "runtest" "-p" name "-j" jobs]]
run-test: [["dune" "runtest" "-p" name "-j" jobs]]
depends: [
"dune" {build & >= "1.1.1"}
"ppx_tools_versioned"
Expand Down
10 changes: 6 additions & 4 deletions sqlexpr.opam
Original file line number Diff line number Diff line change
@@ -1,22 +1,24 @@
opam-version: "1.2"
opam-version: "2.0"
maintainer: "[email protected]"
authors: ["Mauricio Fernandez <[email protected]>"]
license: "LGPL-2.1 with OCaml linking exception"
homepage: "http://github.com/mfp/ocaml-sqlexpr"
dev-repo: "https://github.com/mfp/ocaml-sqlexpr.git"
bug-reports: "https://github.com/mfp/ocaml-sqlexpr/issues"
available: ocaml-version >= "4.02.0"

build: [
[ "env" "ESTRING=%{estring:enable}%" "dune" "build" "-p" name "-j" jobs ]
]
depends: [
"ocaml" {>= "4.02.0"}
"dune" {build & >= "1.1.1"}
"csv"
"lwt" {>= "2.2.0"}
"lwt"
"lwt_ppx" {build}
("sqlite3" {>= "2.0.4"} | "sqlite3" {= "2.0.3"})
"sqlite3"
"base-unix"
"ppx_sqlexpr"
"logs"
]
depopts: [ "estring" ]

Expand Down
2 changes: 1 addition & 1 deletion src/dune
Original file line number Diff line number Diff line change
Expand Up @@ -4,5 +4,5 @@
(synopsis "SQLite database access.")
(wrapped false)
(flags (:standard -w -9-3))
(libraries csv sqlite3 lwt lwt.unix unix threads)
(libraries csv sqlite3 lwt lwt.unix unix threads logs.lwt)
(preprocess (pps lwt_ppx)))
3 changes: 1 addition & 2 deletions src/sqlexpr_sqlite.ml
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@

open Sqlexpr_utils
open Printf

module Option = Sqlexpr_utils.Option
exception Error of string * exn
exception Sqlite_error of string * Sqlite3.Rc.t

Expand Down
91 changes: 49 additions & 42 deletions src/sqlexpr_sqlite_lwt.ml
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
open Printf
open Sqlexpr_sqlite
open Lwt
open Lwt.Infix

let src = Logs.Src.create "sqlexpr_sqlite_lwt"

module Option = Sqlexpr_utils.Option
module CONC = Sqlexpr_concurrency.Lwt
Expand Down Expand Up @@ -101,7 +102,7 @@ struct

(* will be set to [detach] later, done this way to avoid cumbersome gigantic
* let rec definition *)
let do_detach = ref (fun _ _ _ -> return ())
let do_detach = ref (fun _ _ _ -> Lwt.return_unit)

let rec close_db db =
db.db_finished <- true;
Expand All @@ -118,7 +119,7 @@ struct
WT.iter (fun stmt -> Stmt.finalize stmt) w.stmts;
ignore (Sqlite3.db_close handle))
()
with _ -> return () (* FIXME: log? *)
with _ -> Lwt.return_unit (* FIXME: log? *)
)

let new_id =
Expand All @@ -139,7 +140,7 @@ struct
tx_key = Lwt.new_key ();
}
in
Lwt_gc.finalise (fun db -> close_db db; return ()) r;
Lwt_gc.finalise (fun db -> close_db db; Lwt.return_unit) r;
r

let rec thread_loop thread =
Expand Down Expand Up @@ -175,7 +176,7 @@ struct

let check_worker_finished worker =
if worker.db.db_finished then
failwith (sprintf "db %d:%S is closed" worker.db.id worker.db.file)
failwith (Printf.sprintf "db %d:%S is closed" worker.db.id worker.db.file)

let detach worker f args =
let result = ref `Nothing in
Expand All @@ -184,17 +185,17 @@ struct
result := `Success (f dbh args)
with exn ->
result := `Failure exn in
let waiter, wakener = wait () in
let waiter, wakener = Lwt.wait () in
let id =
Lwt_unix.make_notification ~once:true
(fun () ->
match !result with
| `Nothing ->
wakeup_exn wakener (Failure "Sqlexpr_sqlite.detach")
Lwt.wakeup_exn wakener (Failure "Sqlexpr_sqlite.detach")
| `Success value ->
wakeup wakener value
Lwt.wakeup wakener value
| `Failure exn ->
wakeup_exn wakener exn) in
Lwt.wakeup_exn wakener exn) in
let thread = worker.worker_thread in
(
WSet.remove worker.db.free_workers worker;
Expand All @@ -208,13 +209,13 @@ struct
thread.tasks <- (id, task worker.handle) :: thread.tasks;
Mutex.unlock thread.pmutex;
Condition.signal thread.cv;
return ()
with e -> wakeup_exn wakener e; return ())
Lwt.return_unit
with e -> Lwt.wakeup_exn wakener e; Lwt.return_unit)
in
waiter
)[%finally
WSet.add worker.db.free_workers worker;
return ()
Lwt.return_unit
]

let () = do_detach := detach
Expand All @@ -223,20 +224,20 @@ struct
let add_thread thread =
match Lwt_sequence.take_opt_l waiters with
| None -> Queue.add thread threads
| Some t -> wakeup t thread
| Some t -> Lwt.wakeup t thread

(* Add a worker to the pool: *)
let add_worker db worker =
match Lwt_sequence.take_opt_l db.db_waiters with
| None -> WSet.add db.free_workers worker
| Some w -> wakeup w worker
| Some w -> Lwt.wakeup w worker

(* Wait for thread to be available, then return it: *)
let get_thread () =
if not (Queue.is_empty threads) then
return (Queue.take threads)
Lwt.return (Queue.take threads)
else if !thread_count < !max_threads then
return (make_thread ())
Lwt.return (make_thread ())
else begin
let (res, w) = Lwt.task () in
let node = Lwt_sequence.add_r w waiters in
Expand Down Expand Up @@ -267,30 +268,30 @@ struct
in worker.handle <- handle;
db.workers <- worker :: db.workers;
add_worker db worker;
return worker
Lwt.return worker
with e ->
db.worker_count <- db.worker_count - 1;
Lwt.fail e
)
[%finally
add_thread thread;
return ()
Lwt.return_unit
]

let do_raise_error ?sql ?params ?errmsg errcode =
let msg = Sqlite3.Rc.to_string errcode ^ Option.map_default ((^) " ") "" errmsg in
let msg = match sql with
None -> msg
| Some sql -> sprintf "%s in %s" msg (prettify_sql_stmt sql) in
| Some sql -> Printf.sprintf "%s in %s" msg (prettify_sql_stmt sql) in
let msg = match params with
None | Some [] -> msg
| Some params ->
sprintf "%s with params %s" msg (string_of_params (List.rev params))
Printf.sprintf "%s with params %s" msg (string_of_params (List.rev params))
in raise (Error (msg, Sqlite_error (msg, errcode)))

let raise_error worker ?sql ?params ?errmsg errcode =
let%lwt errmsg = match errmsg with
Some e -> return e
Some e -> Lwt.return e
| None -> detach worker (fun dbh () -> Sqlite3.errmsg dbh) ()
in
Lwt.catch
Expand All @@ -299,24 +300,27 @@ struct

let rec run ?(retry_on_busy = !retry_on_busy) ?stmt ?sql ?params worker f x =
detach worker f x >>= function
Sqlite3.Rc.OK | Sqlite3.Rc.ROW | Sqlite3.Rc.DONE as r -> return r
Sqlite3.Rc.OK | Sqlite3.Rc.ROW | Sqlite3.Rc.DONE as r -> Lwt.return r
| Sqlite3.Rc.BUSY when retry_on_busy ->
let%lwt () = Logs_lwt.err ~src (fun m -> m "BUSY") in
let%lwt () = Lwt_unix.sleep 0.010 in run ~retry_on_busy ?sql ?stmt ?params worker f x
| code ->
let%lwt errmsg = detach worker (fun dbh () -> Sqlite3.errmsg dbh) () in
let%lwt () =
Logs_lwt.err ~src (fun m -> m "%s: %s" (Sqlite3.Rc.to_string code) errmsg) in
let%lwt () = begin match stmt with
None -> return ()
| Some stmt -> let%lwt _ = detach worker (fun _dbh -> Stmt.reset) stmt in return ()
None -> Lwt.return_unit
| Some stmt -> let%lwt _ = detach worker (fun _dbh -> Stmt.reset) stmt in Lwt.return_unit
end in
raise_error worker ?sql ?params ~errmsg code

let check_ok ?retry_on_busy ?stmt ?sql ?params worker f x =
let%lwt _ = run ?retry_on_busy ?stmt ?sql ?params worker f x in return ()
let%lwt _ = run ?retry_on_busy ?stmt ?sql ?params worker f x in Lwt.return_unit

(* Wait for worker to be available, then return it: *)
let get_worker db =
if not (WSet.is_empty db.free_workers) then
return (WSet.take db.free_workers)
Lwt.return (WSet.take db.free_workers)
else if db.worker_count < db.max_workers then
make_worker db
else begin
Expand All @@ -330,7 +334,7 @@ struct
let%lwt worker = get_worker db in
let%lwt () =
try%lwt
return (check_worker_finished worker)
Lwt.return (check_worker_finished worker)
with exn -> Lwt.fail exn
in
let%lwt stmt =
Expand All @@ -341,7 +345,7 @@ struct
(fun () ->
let%lwt stmt = detach worker Stmt.prepare sql in
WT.add worker.stmts stmt;
return stmt)
Lwt.return stmt)
| Some id ->
match Stmt_cache.find_remove_stmt worker.stmt_cache id with
Some stmt ->
Expand All @@ -354,16 +358,16 @@ struct
Lwt.fail e
end
in
return stmt
Lwt.return stmt
| None ->
profile_prepare_stmt sql
(fun () ->
let%lwt stmt = detach worker Stmt.prepare sql in
WT.add worker.stmts stmt;
return stmt)
Lwt.return stmt)
with e ->
add_worker db worker;
let s = sprintf "Error with SQL statement %s" sql in
let s = Printf.sprintf "Error with SQL statement %s" sql in
Lwt.fail (Error (s, e)) in

let%lwt () =
Expand All @@ -379,16 +383,16 @@ struct
stmt
)[%finally
add_worker db worker;
return ()
Lwt.return_unit
]
in
profile_execute_sql sql ~params
(fun () ->
(f (worker, stmt) sql params)
[%finally
match stmt_id with
Some id -> Stmt_cache.add_stmt worker.stmt_cache id stmt; return ()
| None -> return ()
Some id -> Stmt_cache.add_stmt worker.stmt_cache id stmt; Lwt.return_unit
| None -> Lwt.return_unit
]
)

Expand All @@ -406,7 +410,7 @@ struct
[%finally
db'.workers <- [];
close_db db';
return ()
Lwt.return_unit
]

let steal_worker db f =
Expand All @@ -423,7 +427,7 @@ struct
db'.workers <- [];
close_db db';
add_worker db worker;
return ()
Lwt.return_unit
]

let step ?sql ?params (worker, stmt) =
Expand All @@ -436,7 +440,7 @@ struct
let reset_with_errcode (worker, stmt) =
detach worker (fun _ -> Stmt.reset) stmt

let reset x = let%lwt _ = reset_with_errcode x in return ()
let reset x = let%lwt _ = reset_with_errcode x in Lwt.return_unit

let row_data (worker, stmt) = detach worker (fun _ -> Stmt.row_data) stmt

Expand All @@ -445,7 +449,7 @@ struct
(check_ok ?retry_on_busy ~sql worker (fun dbh sql -> Sqlite3.exec dbh sql) sql)
[%finally
add_worker db worker;
return ()
Lwt.return_unit
]

let raise_error (worker, _) ?sql ?params ?errmsg errcode =
Expand All @@ -454,12 +458,12 @@ struct
type 'a ret = OK of 'a | Error of exn

let bad_maxrows fname batch =
Invalid_argument (sprintf "Sqlexpr_sqlite.%s: bad batch size (%d)" fname batch)
Invalid_argument (Printf.sprintf "Sqlexpr_sqlite.%s: bad batch size (%d)" fname batch)

let read_rows ~fname (worker, stmt) ~sql params ?(batch = 1000) ~cols read =
let open Sqlexpr_sqlite.Types in

if batch < 0 then fail @@ bad_maxrows fname batch else
if batch < 0 then Lwt.fail @@ bad_maxrows fname batch else

detach worker
(fun dbh () ->
Expand All @@ -472,7 +476,7 @@ struct
let cols' = Array.length data in
if cols' <> cols then
let msg =
sprintf
Printf.sprintf
"Sqlexpr_sqlite.%s: wrong number of columns \
(expected %d, got %d) in SQL: %s" fname cols cols' sql
in
Expand All @@ -482,6 +486,9 @@ struct
| OK row -> read_rows_loop (n - 1) (row :: l)
| Error exn -> Batch_error (List.rev l, exn)
end
| Sqlite3.Rc.BUSY when !retry_on_busy ->
Thread.delay 0.01;
read_rows_loop n l
| Sqlite3.Rc.DONE -> Batch_complete (List.rev l)
| rc ->
let errmsg = Sqlite3.errmsg dbh in
Expand Down
12 changes: 1 addition & 11 deletions src/sqlexpr_utils.ml
Original file line number Diff line number Diff line change
@@ -1,17 +1,7 @@


module Option = struct
let may f = function None -> () | Some x -> f x
let map f = function None -> None | Some x -> Some (f x)
let map_default f d o = match o with
| None -> d
| Some x -> f x
end

module List = struct
let init n f =
let rec init i =
if i=n then []
else f i :: init (i+1)
in init 0
| Some x -> f x
end
Loading