diff --git a/async/httpaf_async.ml b/async/httpaf_async.ml
index 4db6796..7f6aa3c 100644
--- a/async/httpaf_async.ml
+++ b/async/httpaf_async.ml
@@ -154,11 +154,12 @@ module Server = struct
 end
 
 module Client = struct
-  let request ?(config=Config.default) socket request ~error_handler ~response_handler =
+  type t = Client_connection.t
+
+  let create_connection ?(config=Config.default) socket =
     let fd     = Socket.fd socket in
     let writev = Faraday_async.writev_of_fd fd in
-    let request_body, conn   =
-      Client_connection.request request ~error_handler ~response_handler in
+    let conn   = Client_connection.create ~config in
     let read_complete = Ivar.create () in
     let buffer = Buffer.create config.read_buffer_size in
     let rec reader_thread () =
@@ -211,5 +212,11 @@ module Client = struct
       >>| fun () ->
         if not (Fd.is_closed fd)
         then don't_wait_for (Fd.close fd));
-    request_body
+    conn
+
+  let request = Client_connection.request
+
+  let shutdown = Client_connection.shutdown
+
+  let is_closed = Client_connection.is_closed
 end
diff --git a/async/httpaf_async.mli b/async/httpaf_async.mli
index f120624..670d669 100644
--- a/async/httpaf_async.mli
+++ b/async/httpaf_async.mli
@@ -14,11 +14,21 @@ module Server : sig
 end
 
 module Client : sig
-  val request
-    :  ?config          : Config.t
+  type t
+
+  val create_connection
+    : ?config:Config.t
     -> ([`Active], [< Socket.Address.t]) Socket.t
+    -> t
+
+  val request
+    :  t
     -> Request.t
     -> error_handler    : Client_connection.error_handler
     -> response_handler : Client_connection.response_handler
     -> [`write] Body.t
+
+  val shutdown : t -> unit
+
+  val is_closed : t -> bool
 end
diff --git a/examples/async/async_get.ml b/examples/async/async_get.ml
index da49267..7a0b537 100644
--- a/examples/async/async_get.ml
+++ b/examples/async/async_get.ml
@@ -13,11 +13,12 @@ let main port host () =
     let finished = Ivar.create () in
     let response_handler = Httpaf_examples.Client.print ~on_eof:(Ivar.fill finished) in
     let headers = Headers.of_list [ "host", host ] in
+    let connection = Client.create_connection socket in
     let request_body =
       Client.request
-        ~error_handler
+        connection
         ~response_handler
-        socket
+        ~error_handler
         (Request.create ~headers `GET "/")
     in
     Body.close_writer request_body;
diff --git a/examples/async/async_get_pipelined.ml b/examples/async/async_get_pipelined.ml
new file mode 100644
index 0000000..5382a58
--- /dev/null
+++ b/examples/async/async_get_pipelined.ml
@@ -0,0 +1,53 @@
+open! Core
+open Async
+
+open Httpaf
+open Httpaf_async
+
+let error_handler _ = assert false
+
+let main port host () =
+  let where_to_connect = Tcp.Where_to_connect.of_host_and_port { host; port } in
+  Tcp.connect_sock where_to_connect
+  >>= fun socket ->
+    let finished = Ivar.create () in
+    let response_handler = Httpaf_examples.Client.print ~on_eof:(Ivar.fill finished) in
+    let request_headers =
+      Request.create ~headers:(Headers.of_list [ "host", host ]) `GET "/"
+    in
+    let connection = Client.create_connection socket in
+    let request_body =
+      Client.request
+        connection
+        ~response_handler
+        ~error_handler
+        request_headers
+    in
+    let finished' = Ivar.create () in
+    let response_handler' =
+      Httpaf_examples.Client.print ~on_eof:(Ivar.fill finished')
+    in
+    let request_body' =
+      Client.request
+        connection
+        ~response_handler:response_handler'
+        ~error_handler
+        request_headers
+    in
+    Body.close_writer request_body';
+    Body.close_writer request_body;
+    Async.Deferred.all_unit [Ivar.read finished; Ivar.read finished'] >>| fun () ->
+      Client.shutdown connection
+;;
+
+let () =
+  Command.async
+    ~summary:"Start a hello world Async client"
+    Command.Param.(
+      map (both
+          (flag "-p" (optional_with_default 80 int)
+            ~doc:"int destination port")
+          (anon ("host" %: string)))
+        ~f:(fun (port, host) ->
+              (fun () -> main port host ())))
+  |> Command.run
diff --git a/examples/async/async_post.ml b/examples/async/async_post.ml
index c2e9702..2f34668 100644
--- a/examples/async/async_post.ml
+++ b/examples/async/async_post.ml
@@ -19,11 +19,12 @@ let main port host () =
       ; "host"             , host
       ]
     in
+    let connection = Client.create_connection socket in
     let request_body =
       Client.request
-        ~error_handler
+        connection
         ~response_handler
-        socket
+        ~error_handler
         (Request.create ~headers `POST "/")
     in
     let stdin = Lazy.force Reader.stdin in
diff --git a/examples/async/dune b/examples/async/dune
index 4008a21..868edbe 100644
--- a/examples/async/dune
+++ b/examples/async/dune
@@ -1,7 +1,8 @@
 (executables
  (libraries httpaf httpaf-async httpaf_examples async core)
- (names     async_echo_post async_get async_post))
+ (names async_echo_post async_get async_get_pipelined async_post))
 
 (alias
  (name examples)
- (deps (glob_files *.exe)))
+ (deps
+  (glob_files *.exe)))
diff --git a/examples/lwt/dune b/examples/lwt/dune
index 1b84bf1..1118a01 100644
--- a/examples/lwt/dune
+++ b/examples/lwt/dune
@@ -1,6 +1,7 @@
 (executables
  (libraries httpaf httpaf-lwt-unix httpaf_examples base stdio lwt lwt.unix)
- (names lwt_get lwt_post lwt_echo_post lwt_https_get lwt_https_server))
+ (names lwt_get lwt_get_pipelined lwt_post lwt_echo_post lwt_https_get
+   lwt_https_server))
 
 (alias
  (name examples)
diff --git a/examples/lwt/lwt_get.ml b/examples/lwt/lwt_get.ml
index c109d51..7671c94 100644
--- a/examples/lwt/lwt_get.ml
+++ b/examples/lwt/lwt_get.ml
@@ -18,15 +18,31 @@ let main port host =
     Httpaf_examples.Client.print ~on_eof:(Lwt.wakeup_later notify_finished)
   in
   let headers = Headers.of_list [ "host", host ] in
+  Client.create_connection socket >>= fun connection ->
   let request_body =
     Client.request
+      connection
+      ~response_handler
       ~error_handler
+      (Request.create ~headers `GET "/")
+  in
+  Body.close_writer request_body;
+  finished >>= fun () ->
+  let finished, notify_finished = Lwt.wait () in
+  let response_handler =
+    Httpaf_examples.Client.print ~on_eof:(Lwt.wakeup_later notify_finished)
+  in
+  let headers = Headers.of_list [ "host", host ] in
+  let request_body =
+    Client.request
+      connection
       ~response_handler
-      socket
+      ~error_handler
       (Request.create ~headers `GET "/")
   in
   Body.close_writer request_body;
-  finished
+  finished >|= fun () ->
+    Client.shutdown connection
 ;;
 
 let () =
diff --git a/examples/lwt/lwt_get_pipelined.ml b/examples/lwt/lwt_get_pipelined.ml
new file mode 100644
index 0000000..d029a1e
--- /dev/null
+++ b/examples/lwt/lwt_get_pipelined.ml
@@ -0,0 +1,61 @@
+open Base
+open Lwt.Infix
+module Arg = Caml.Arg
+
+open Httpaf
+open Httpaf_lwt_unix
+
+let error_handler _ = assert false
+
+let main port host =
+  Lwt_unix.getaddrinfo host (Int.to_string port) [Unix.(AI_FAMILY PF_INET)]
+  >>= fun addresses ->
+  let socket = Lwt_unix.socket Unix.PF_INET Unix.SOCK_STREAM 0 in
+  Lwt_unix.connect socket (List.hd_exn addresses).Unix.ai_addr
+  >>= fun () ->
+  let finished, notify_finished = Lwt.wait () in
+  let response_handler =
+    Httpaf_examples.Client.print ~on_eof:(Lwt.wakeup_later notify_finished)
+  in
+  let request_headers =
+    Request.create ~headers:(Headers.of_list [ "host", host ]) `GET "/"
+  in
+  Client.create_connection socket >>= fun connection ->
+  let request_body =
+    Client.request
+      connection
+      ~response_handler
+      ~error_handler
+      request_headers
+  in
+  let finished', notify_finished' = Lwt.wait () in
+  let response_handler' =
+    Httpaf_examples.Client.print ~on_eof:(Lwt.wakeup_later notify_finished')
+  in
+  let request_body' =
+    Client.request
+      connection
+      ~response_handler:response_handler'
+      ~error_handler
+      request_headers
+  in
+  Body.close_writer request_body';
+  Body.close_writer request_body;
+  Lwt.join [finished; finished'] >|= fun () ->
+    Client.shutdown connection
+;;
+
+let () =
+  let host = ref None in
+  let port = ref 80 in
+  Arg.parse
+    ["-p", Set_int port, " Port number (80 by default)"]
+    (fun host_argument -> host := Some host_argument)
+    "lwt_get.exe [-p N] HOST";
+  let host =
+    match !host with
+    | None -> failwith "No hostname provided"
+    | Some host -> host
+  in
+  Lwt_main.run (main !port host)
+;;
diff --git a/examples/lwt/lwt_https_get.ml b/examples/lwt/lwt_https_get.ml
index 6546f80..da257c9 100644
--- a/examples/lwt/lwt_https_get.ml
+++ b/examples/lwt/lwt_https_get.ml
@@ -18,12 +18,13 @@ let main port host =
     Httpaf_examples.Client.print ~on_eof:(Lwt.wakeup_later notify_finished)
   in
   let headers = Headers.of_list [ "host", host ] in
-  Client.TLS.request
+  Client.TLS.create_connection socket >>= fun connection ->
+  let request_body = Client.TLS.request
+    connection
     ~error_handler
     ~response_handler
-    socket
     (Request.create ~headers `GET "/")
-  >>= fun request_body ->
+  in
   Body.close_writer request_body;
   finished
 ;;
diff --git a/examples/lwt/lwt_post.ml b/examples/lwt/lwt_post.ml
index 91451ad..6f1af57 100644
--- a/examples/lwt/lwt_post.ml
+++ b/examples/lwt/lwt_post.ml
@@ -26,11 +26,12 @@ let main port host =
     ; "host"             , host
     ]
   in
+  Client.create_connection socket >>= fun connection ->
   let request_body =
     Client.request
-      ~error_handler
+      connection
       ~response_handler
-      socket
+      ~error_handler
       (Request.create ~headers `POST "/")
   in
   Body.write_string request_body body;
diff --git a/lib/body.ml b/lib/body.ml
index 60d3fd3..ff4513e 100644
--- a/lib/body.ml
+++ b/lib/body.ml
@@ -59,6 +59,7 @@ let create buffer =
 
 let create_empty () =
   let t = create Bigstringaf.empty in
+  t.write_final_if_chunked <- false;
   Faraday.close t.faraday;
   t
 
diff --git a/lib/client_connection.ml b/lib/client_connection.ml
index 502f729..d5f215e 100644
--- a/lib/client_connection.ml
+++ b/lib/client_connection.ml
@@ -1,5 +1,6 @@
 (*----------------------------------------------------------------------------
     Copyright (c) 2017-2019 Inhabited Type LLC.
+    Copyright (c) 2019 Antonio Nuno Monteiro.
 
     All rights reserved.
 
@@ -34,176 +35,238 @@
 module Reader = Parse.Reader
 module Writer = Serialize.Writer
 
-module Oneshot = struct
-  type error =
-    [ `Malformed_response of string | `Invalid_response_body_length of Response.t | `Exn of exn ]
-
-  type response_handler = Response.t -> [`read] Body.t  -> unit
-  type error_handler = error -> unit
-
-  type state =
-    | Awaiting_response
-    | Received_response of Response.t * [`read] Body.t
-    | Closed
-
-  type t =
-    { request          : Request.t
-    ; request_body     : [ `write ] Body.t
-    ; error_handler    : (error -> unit)
-    ; reader : Reader.response
-    ; writer : Writer.t
-    ; state  : state ref
-    ; mutable error_code : [ `Ok | error ]
-    }
-
-  let request ?(config=Config.default) request ~error_handler ~response_handler =
-    let state = ref Awaiting_response in
-    let request_method = request.Request.meth in
-    let handler response body =
-      state := Received_response(response, body);
-      response_handler response body
-    in
-    let request_body = Body.create (Bigstringaf.create config.request_body_buffer_size) in
-    let t =
-      { request
-      ; request_body
-      ; error_handler
-      ; error_code = `Ok
-      ; reader = Reader.response ~request_method handler
-      ; writer = Writer.create ()
-      ; state }
-    in
-    Writer.write_request t.writer request;
-    request_body, t
-  ;;
-
-  let flush_request_body t =
-    if Body.has_pending_output t.request_body
-    then
-      let encoding =
-        match Request.body_length t.request with
-        | `Fixed _ | `Chunked as encoding -> encoding
-        | `Error _ -> assert false (* XXX(seliopou): This needs to be handled properly *)
-      in
-      Body.transfer_to_writer_with_encoding t.request_body ~encoding t.writer
-  ;;
-
-  let set_error_and_handle_without_shutdown t error =
-    t.state := Closed;
-    t.error_code <- (error :> [`Ok | error]);
-    t.error_handler error;
-  ;;
-
-  let unexpected_eof t =
-    set_error_and_handle_without_shutdown t (`Malformed_response "unexpected eof");
-  ;;
-
-  let shutdown_reader t =
-    Reader.force_close t.reader;
-    begin match !(t.state) with
-    | Awaiting_response -> unexpected_eof t;
-    | Closed -> ()
-    | Received_response(_, response_body) ->
-      Body.close_reader response_body;
-      Body.execute_read response_body;
-    end;
-  ;;
-
-  let shutdown_writer t =
-    flush_request_body t;
-    Writer.close t.writer;
-    Body.close_writer t.request_body;
-  ;;
-
-  let shutdown t =
-    shutdown_reader t;
-    shutdown_writer t;
-  ;;
-
-  let set_error_and_handle t error =
-    Reader.force_close t.reader;
-    begin match !(t.state) with
-    | Closed -> ()
-    | Awaiting_response ->
-      set_error_and_handle_without_shutdown t error;
-    | Received_response(_, response_body) ->
-      Body.close_reader response_body;
-      Body.execute_read response_body;
-      set_error_and_handle_without_shutdown t error;
+type error =
+  [ `Malformed_response of string | `Invalid_response_body_length of Response.t | `Exn of exn ]
+
+type response_handler = Response.t -> [`read] Body.t  -> unit
+type error_handler = error -> unit
+
+type t =
+  { config : Config.t
+  ; reader : Reader.response
+  ; writer : Writer.t
+  ; request_queue : Respd.t Queue.t
+    (* invariant: If [request_queue] is not empty, then the head of the queue
+       has already written the request headers to the wire. *)
+  ; wakeup_writer  : (unit -> unit) list ref
+  }
+
+let is_closed t =
+  Reader.is_closed t.reader && Writer.is_closed t.writer
+
+let is_waiting t =
+  not (is_closed t) && Queue.is_empty t.request_queue
+
+let is_active t =
+  not (Queue.is_empty t.request_queue)
+
+let current_respd_exn t =
+  Queue.peek t.request_queue
+
+let on_wakeup_writer t k =
+  if is_closed t
+  then failwith "on_wakeup_writer on closed conn"
+  else t.wakeup_writer := k::!(t.wakeup_writer)
+
+let _wakeup_writer callbacks =
+  let fs = !callbacks in
+  callbacks := [];
+  List.iter (fun f -> f ()) fs
+
+let wakeup_writer t =
+  _wakeup_writer t.wakeup_writer
+
+let[@ocaml.warning "-16"] create ?(config=Config.default) =
+  let request_queue = Queue.create () in
+  { config
+  ; reader = Reader.response request_queue
+  ; writer = Writer.create ()
+  ; request_queue
+  ; wakeup_writer = ref []
+  }
+
+let request t request ~error_handler ~response_handler =
+  let request_body =
+    Body.create (Bigstringaf.create t.config.request_body_buffer_size)
+  in
+  let respd =
+    Respd.create error_handler request request_body t.writer response_handler in
+  let handle_now = Queue.is_empty t.request_queue in
+  Queue.push respd t.request_queue;
+  if handle_now then
+    Respd.write_request respd;
+  (* Not handling the request now means it may be pipelined.
+   * `advance_request_queue_if_necessary` will take care of it, but we still
+   * wanna wake up the writer so that the function gets called. *)
+  _wakeup_writer t.wakeup_writer;
+  request_body
+;;
+
+let flush_request_body t =
+  if is_active t then begin
+    let respd = current_respd_exn t in
+    Respd.flush_request_body respd
+  end
+
+let set_error_and_handle_without_shutdown t error =
+  if is_active t then begin
+    let respd = current_respd_exn t in
+    Respd.report_error respd error
+  end
+  (* TODO: not active?! can be because of a closed FD for example. *)
+;;
+
+let unexpected_eof t =
+  set_error_and_handle_without_shutdown t (`Malformed_response "unexpected eof");
+;;
+
+let shutdown_reader t =
+  Reader.force_close t.reader;
+  if is_active t
+  then Respd.close_response_body (current_respd_exn t)
+
+let shutdown_writer t =
+  flush_request_body t;
+  Writer.close t.writer;
+  if is_active t then begin
+    let respd = current_respd_exn t in
+    Body.close_writer respd.request_body;
+  end
+;;
+
+let shutdown t =
+  shutdown_reader t;
+  shutdown_writer t;
+;;
+
+(* TODO: Need to check in the RFC if reporting an error, e.g. in a malformed
+ * response causes the whole connection to shutdown. *)
+let set_error_and_handle t error =
+  shutdown t;
+  set_error_and_handle_without_shutdown t error;
+;;
+
+let report_exn t exn =
+  set_error_and_handle t (`Exn exn)
+;;
+
+exception Local
+
+let maybe_pipeline_queued_requests t =
+  (* Don't bother trying to pipeline if there aren't multiple requests in the
+   * queue. *)
+  if Queue.length t.request_queue > 1 then begin
+    match Queue.fold (fun prev respd ->
+      begin match prev with
+      | None -> ()
+      | Some prev ->
+        if respd.Respd.state = Uninitialized && not (Respd.requires_output prev)
+        then Respd.write_request respd
+        else
+          (* bail early. If we can't pipeline this request, we can't write
+           * next ones either. *)
+          raise Local
+      end;
+      Some respd)
+      None
+      t.request_queue
+    with
+    | _ -> ()
+    | exception Local -> ()
+  end
+
+let advance_request_queue_if_necessary t =
+  if is_active t then begin
+    let respd = current_respd_exn t in
+    if Respd.persistent_connection respd then begin
+      if Respd.is_complete respd then begin
+        ignore (Queue.take t.request_queue);
+        if not (Queue.is_empty t.request_queue) then begin
+          (* write request to the wire *)
+          let respd = current_respd_exn t in
+          Respd.write_request respd;
+        end;
+        wakeup_writer t;
+      end else if not (Respd.requires_output respd) then
+        (* From RFC7230ยง6.3.2:
+         *   A client that supports persistent connections MAY "pipeline" its
+         *   requests (i.e., send multiple requests without waiting for each
+         *   response). *)
+        maybe_pipeline_queued_requests t
+    end else begin
+      ignore (Queue.take t.request_queue);
+      Queue.iter Respd.close_response_body t.request_queue;
+      Queue.clear t.request_queue;
+      Queue.push respd t.request_queue;
+      wakeup_writer t;
+      if Respd.is_complete respd
+      then shutdown t
+      else if not (Respd.requires_output respd)
+      then shutdown_writer t
     end
-  ;;
-
-  let report_exn t exn =
-    set_error_and_handle t (`Exn exn)
-  ;;
-
-  let flush_response_body t =
-    match !(t.state) with
-    | Awaiting_response | Closed -> ()
-    | Received_response(_, response_body) ->
-      try Body.execute_read response_body
-      with exn -> report_exn t exn
-  ;;
-
-  let _next_read_operation t =
-    match !(t.state) with
-    | Awaiting_response | Closed -> Reader.next t.reader
-    | Received_response(_, response_body) ->
-      if not (Body.is_closed response_body)
-      then Reader.next t.reader
-      else begin
-        Reader.force_close t.reader;
-        Reader.next        t.reader
-      end
-  ;;
-
-  let next_read_operation t =
-    match _next_read_operation t with
-    | `Error (`Parse(marks, message)) ->
-      let message = String.concat "" [ String.concat ">" marks; ": "; message] in
-      set_error_and_handle t (`Malformed_response message);
-      `Close
-    | `Error (`Invalid_response_body_length _ as error) ->
-      set_error_and_handle t error;
-      `Close
-    | (`Read | `Close) as operation -> operation
-  ;;
-
-  let read_with_more t bs ~off ~len more =
-    let consumed = Reader.read_with_more t.reader bs ~off ~len more in
-    flush_response_body t;
-    consumed
-  ;;
-
-  let read t bs ~off ~len =
-    read_with_more t bs ~off ~len Incomplete
-
-  let read_eof t bs ~off ~len =
-    let bytes_read = read_with_more t bs ~off ~len Complete in
-    begin match !(t.state) with
+  end else if Reader.is_closed t.reader
+  then shutdown t
+
+let next_read_operation t =
+  advance_request_queue_if_necessary t;
+  match Reader.next t.reader with
+  | `Error (`Parse(marks, message)) ->
+    let message = String.concat "" [ String.concat ">" marks; ": "; message] in
+    set_error_and_handle t (`Malformed_response message);
+    `Close
+  | `Error (`Invalid_response_body_length _ as error) ->
+    set_error_and_handle t error;
+    `Close
+  | (`Read | `Close) as operation -> operation
+;;
+
+let read_with_more t bs ~off ~len more =
+  let consumed = Reader.read_with_more t.reader bs ~off ~len more in
+  if is_active t then
+    Respd.flush_response_body (current_respd_exn t);
+  consumed
+;;
+
+let read t bs ~off ~len =
+  read_with_more t bs ~off ~len Incomplete
+
+let read_eof t bs ~off ~len =
+  let bytes_read = read_with_more t bs ~off ~len Complete in
+  if is_active t then begin
+    let respd = current_respd_exn t in
+    (* TODO: could just check for `Respd.requires_input`? *)
+    match respd.state with
+    | Uninitialized -> assert false
     | Received_response _ | Closed -> ()
-    | Awaiting_response -> unexpected_eof t;
-    end;
-    bytes_read
-  ;;
-
-  let next_write_operation t =
-    flush_request_body t;
-    if Body.is_closed t.request_body
-    then Writer.close t.writer;
-    Writer.next t.writer
-  ;;
-
-  let yield_writer t k =
-    if Body.is_closed t.request_body
-    then begin
+    | Awaiting_response ->
+      (* TODO: review this. It makes sense to tear down the connection if an
+       * unexpected EOF is received. *)
+      shutdown t;
+      unexpected_eof t
+  end;
+  bytes_read
+;;
+
+let next_write_operation t =
+  advance_request_queue_if_necessary t;
+  flush_request_body t;
+  Writer.next t.writer
+;;
+
+let yield_writer t k =
+  if is_active t then begin
+    let respd = current_respd_exn t in
+    if Respd.requires_output respd then
+      Respd.on_more_output_available respd k
+    else if Respd.persistent_connection respd then
+      on_wakeup_writer t k
+    else begin
+      (*  TODO: call shutdown? *)
       Writer.close t.writer;
       k ()
-    end else
-      Body.when_ready_to_write t.request_body k
-
-  let report_write_result t result =
-    Writer.report_result t.writer result
+    end
+  end else
+    on_wakeup_writer t k
 
-  let is_closed t = Reader.is_closed t.reader && Writer.is_closed t.writer
-end
+let report_write_result t result =
+  Writer.report_result t.writer result
diff --git a/lib/httpaf.ml b/lib/httpaf.ml
index 6d16dab..da47492 100644
--- a/lib/httpaf.ml
+++ b/lib/httpaf.ml
@@ -10,7 +10,7 @@ module Body = Body
 module Config = Config
 
 module Server_connection = Server_connection
-module Client_connection = Client_connection.Oneshot
+module Client_connection = Client_connection
 
 module Httpaf_private = struct
   module Parse = Parse
diff --git a/lib/httpaf.mli b/lib/httpaf.mli
index b2c341f..3f15231 100644
--- a/lib/httpaf.mli
+++ b/lib/httpaf.mli
@@ -1,5 +1,6 @@
 (*----------------------------------------------------------------------------
     Copyright (c) 2017 Inhabited Type LLC.
+    Copyright (c) 2019 Antonio Nuno Monteiro.
 
     All rights reserved.
 
@@ -760,12 +761,14 @@ module Client_connection : sig
 
   type error_handler = error -> unit
 
+  val create : ?config:Config.t -> t
+
   val request
-    :  ?config:Config.t
+    :  t
     -> Request.t
     -> error_handler:error_handler
     -> response_handler:response_handler
-    -> [`write] Body.t * t
+    -> [`write] Body.t
 
   val next_read_operation : t -> [ `Read | `Close ]
   (** [next_read_operation t] returns a value describing the next operation
@@ -817,10 +820,15 @@ module Client_connection : sig
       may call its error handler before terminating the connection. *)
 
   val is_closed : t -> bool
+  (** [is_closed t] is [true] if both the read and write processors have been
+      shutdown. When this is the case {!next_read_operation} will return
+      [`Close _] and {!next_write_operation} will return [`Write _] until all
+      buffered output has been flushed, at which point it will also return
+      `Close. *)
 
-  (**/**)
   val shutdown : t -> unit
-  (**/**)
+  (** [shutdown connection] closes the underlying input and output channels of
+      the connection, rendering it unusable for any further communication. *)
 end
 
 (**/**)
diff --git a/lib/parse.ml b/lib/parse.ml
index cf9d0a2..2f870a3 100644
--- a/lib/parse.ml
+++ b/lib/parse.ml
@@ -252,19 +252,30 @@ module Reader = struct
     in
     create parser
 
-  let response ~request_method handler =
+  let response request_queue =
     let parser =
       response <* commit >>= fun response ->
+      assert (not (Queue.is_empty request_queue));
+      let exception Local of Respd.t in
+      let respd = match
+        (Queue.iter (fun respd ->
+          if respd.Respd.state = Awaiting_response then
+            raise (Local respd)) request_queue)
+        with
+        | exception Local respd -> respd
+        | _ -> assert false
+      in
+      let request = Respd.request respd in
       let proxy = false in
-      match Response.body_length ~request_method response with
+      match Response.body_length ~request_method:request.meth response with
       | `Error `Bad_gateway           -> assert (not proxy); assert false
       | `Error `Internal_server_error -> return (Error (`Invalid_response_body_length response))
       | `Fixed 0L ->
-        handler response Body.empty;
+        respd.response_handler response Body.empty;
         ok
       | `Fixed _ | `Chunked | `Close_delimited as encoding ->
         let response_body = Body.create Bigstringaf.empty in
-        handler response response_body;
+        respd.response_handler response response_body;
         body ~encoding response_body *> ok
     in
     create parser
diff --git a/lib/respd.ml b/lib/respd.ml
new file mode 100644
index 0000000..43e0f9d
--- /dev/null
+++ b/lib/respd.ml
@@ -0,0 +1,118 @@
+module Writer = Serialize.Writer
+
+type error =
+  [ `Malformed_response of string
+  | `Invalid_response_body_length of Response.t
+  | `Exn of exn ]
+
+type state =
+  | Uninitialized
+  | Awaiting_response
+  | Received_response of Response.t * [`read] Body.t
+  | Closed
+
+type t =
+  { request          : Request.t
+  ; request_body     : [ `write ] Body.t
+  ; response_handler : (Response.t -> [`read] Body.t -> unit)
+  ; error_handler    : (error -> unit)
+  ; mutable error_code : [ `Ok | error ]
+  ; writer : Writer.t
+  ; mutable state    : state
+  ; mutable persistent      : bool
+  }
+
+let create error_handler request request_body writer response_handler =
+  let rec handler response body =
+    let t = Lazy.force t in
+    if t.persistent then
+      t.persistent <- Response.persistent_connection response;
+    t.state <- Received_response(response, body);
+    response_handler response body
+  and t =
+    lazy
+    { request
+    ; request_body
+    ; response_handler = handler
+    ; error_handler
+    ; error_code = `Ok
+    ; writer
+    ; state = Uninitialized
+    ; persistent = Request.persistent_connection request
+    }
+  in
+  Lazy.force t
+
+let request { request; _ } = request
+
+let request_body { request_body; _ } = request_body
+
+let write_request t =
+  Writer.write_request t.writer t.request;
+  t.state <- Awaiting_response
+
+let on_more_output_available { request_body; _ } f =
+  Body.when_ready_to_write request_body f
+
+let report_error t error =
+  (* t.persistent <- false; *)
+  (* TODO: drain queue? *)
+  match t.state, t.error_code with
+  | (Uninitialized | Awaiting_response | Received_response _), `Ok ->
+    t.state <- Closed;
+    t.error_code <- (error :> [`Ok | error]);
+    t.error_handler error
+  | Uninitialized, `Exn _ ->
+    (* TODO(anmonteiro): Not entirely sure this is possible in the client. *)
+    failwith "httpaf.Reqd.report_exn: NYI"
+  | (Uninitialized | Awaiting_response | Received_response _ | Closed), _ ->
+    (* XXX(seliopou): Once additional logging support is added, log the error
+     * in case it is not spurious. *)
+    ()
+
+let persistent_connection t =
+  t.persistent
+
+let close_response_body t =
+  match t.state with
+  | Uninitialized
+  | Awaiting_response
+  | Closed -> ()
+  | Received_response (_, response_body) ->
+    Body.close_reader response_body
+
+let requires_input t =
+  match t.state with
+  | Uninitialized -> true
+  | Awaiting_response -> true
+  | Received_response (_, response_body) ->
+    not (Body.is_closed response_body)
+  | Closed -> false
+
+let requires_output { request_body; state; _ } =
+  state = Uninitialized ||
+  not (Body.is_closed request_body) ||
+  Body.has_pending_output request_body
+
+let is_complete t =
+  not (requires_input t || requires_output t)
+
+let flush_request_body { request; request_body; writer; _ } =
+  if Body.has_pending_output request_body
+  then
+    let encoding =
+      match Request.body_length request with
+      | `Fixed _ | `Chunked as encoding -> encoding
+      | `Error _ -> assert false (* XXX(seliopou): This needs to be handled properly *)
+    in
+    Body.transfer_to_writer_with_encoding request_body ~encoding writer
+
+let flush_response_body t =
+  match t.state with
+  | Uninitialized | Awaiting_response | Closed -> ()
+  | Received_response(_, response_body) ->
+    try Body.execute_read response_body
+    (* TODO: report_exn *)
+    with _exn ->
+      Format.eprintf "EXN@."
+    (* report_exn t exn *)
diff --git a/lib/server_connection.ml b/lib/server_connection.ml
index 3ab5944..e51cf7e 100644
--- a/lib/server_connection.ml
+++ b/lib/server_connection.ml
@@ -32,17 +32,6 @@
   ----------------------------------------------------------------------------*)
 
 
-module Queue = struct
-  include Queue
-
-  let peek_exn = peek
-
-  let peek t =
-    if is_empty t
-    then None
-    else Some (peek_exn t)
-end
-
 module Reader = Parse.Reader
 module Writer = Serialize.Writer
 
@@ -82,7 +71,7 @@ let is_active t =
   not (Queue.is_empty t.request_queue)
 
 let current_reqd_exn t =
-  Queue.peek_exn t.request_queue
+  Queue.peek t.request_queue
 
 let yield_reader t k =
   if is_closed t
diff --git a/lib_test/test_client_connection.ml b/lib_test/test_client_connection.ml
index f170e1b..65cdf88 100644
--- a/lib_test/test_client_connection.ml
+++ b/lib_test/test_client_connection.ml
@@ -72,30 +72,53 @@ let test_get () =
   let response = Response.create `OK in
 
   (* Single GET *)
-  let body, t =
+  let t = create ?config:None in
+  let body =
     request
+      t
       request'
       ~response_handler:(default_response_handler response)
       ~error_handler:no_error_handler
   in
   Body.close_writer body;
-  write_request  t request';
-  writer_closed  t;
-  read_response  t response;
+  write_request t request';
+  read_response t response;
+
+  (* Single GET, request closes the connection. *)
+  let request_close =
+    Request.create
+      ~headers:(Headers.of_list ["connection", "close"])
+      `GET "/"
+  in
+  let t = create ?config:None in
+  let body =
+    request
+      t
+      request_close
+      ~response_handler:(default_response_handler response)
+      ~error_handler:no_error_handler
+  in
+  Body.close_writer body;
+  write_request t request_close;
+  writer_closed t;
+  read_response t response;
 
   (* Single GET, response closes connection *)
   let response =
     Response.create `OK ~headers:(Headers.of_list [ "connection", "close" ])
   in
-  let body, t =
+  let t = create ?config:None in
+  let body =
     request
+      t
       request'
       ~response_handler:(default_response_handler response)
       ~error_handler:no_error_handler
   in
   Body.close_writer body;
-  write_request  t request';
-  read_response  t response;
+  write_request t request';
+  read_response t response;
+  writer_closed t;
   let c = read_eof t Bigstringaf.empty ~off:0 ~len:0 in
   Alcotest.(check int) "read_eof with no input returns 0" 0 c;
   connection_is_shutdown t;
@@ -104,16 +127,56 @@ let test_get () =
   let response =
     Response.create `OK ~headers:(Headers.of_list [ "transfer-encoding", "chunked" ])
   in
-  let body, t =
+  let t = create ?config:None in
+  let body =
     request
+      t
       request'
       ~response_handler:(default_response_handler response)
       ~error_handler:no_error_handler
   in
   Body.close_writer body;
-  write_request  t request';
-  read_response  t response;
-  read_string    t "d\r\nHello, world!\r\n0\r\n\r\n";
+  write_request t request';
+  read_response t response;
+  read_string t "d\r\nHello, world!\r\n0\r\n\r\n";
+;;
+
+let test_get_last_close () =
+  (* Multiple GET requests, the last one closes the connection *)
+  let request' = Request.create `GET "/" in
+  let response =
+    Response.create ~headers:(Headers.of_list ["content-length", "0"]) `OK
+  in
+  let t = create ?config:None in
+  let body =
+    request
+      t
+      request'
+      ~response_handler:(default_response_handler response)
+      ~error_handler:no_error_handler
+  in
+  Body.close_writer body;
+  write_request t request';
+  read_response t response;
+
+  let request'' =
+    Request.create ~headers:(Headers.of_list ["connection", "close"]) `GET "/"
+  in
+  let body' =
+    request
+      t
+      request''
+      ~response_handler:(default_response_handler response)
+      ~error_handler:no_error_handler
+  in
+  Body.close_writer body';
+  write_request t request'';
+  read_response t response;
+
+  writer_closed t;
+  let c = read_eof t Bigstringaf.empty ~off:0 ~len:0 in
+  Alcotest.(check int) "read_eof with no input returns 0" 0 c;
+  connection_is_shutdown t;
 ;;
 
 let test_response_eof () =
@@ -121,8 +184,10 @@ let test_response_eof () =
   let response = Response.create `OK in (* not actually writen to the channel *)
 
   let error_message = ref None in
-  let body, t =
+  let t = create ?config:None in
+  let body =
     request
+      t
       request'
       ~response_handler:(default_response_handler response)
       ~error_handler:(function
@@ -130,8 +195,7 @@ let test_response_eof () =
         | _ -> assert false)
   in
   Body.close_writer body;
-  write_request  t request';
-  writer_closed  t;
+  write_request t request';
   reader_ready t;
   let c = read_eof t Bigstringaf.empty ~off:0 ~len:0 in
   Alcotest.(check int) "read_eof with no input returns 0" 0 c;
@@ -141,6 +205,138 @@ let test_response_eof () =
     !error_message
 ;;
 
+let test_persistent_connection_requests () =
+  let request' = Request.create `GET "/" in
+  let response =
+    Response.create ~headers:(Headers.of_list [ "content-length", "0" ]) `OK
+  in
+  let t = create ?config:None in
+  let body =
+    request
+      t
+      request'
+      ~response_handler:(default_response_handler response)
+      ~error_handler:no_error_handler
+  in
+  Body.close_writer body;
+  write_request t request';
+  read_response t response;
+  writer_yielded t;
+  reader_ready t;
+  let body' =
+    request
+      t
+      request'
+      ~response_handler:(default_response_handler response)
+      ~error_handler:no_error_handler
+  in
+  Body.close_writer body';
+  write_request t request';
+  read_response t response;
+;;
+
+let test_persistent_connection_requests_pipelining () =
+  let request' = Request.create `GET "/" in
+  let response =
+    Response.create ~headers:(Headers.of_list [ "content-length", "0" ]) `OK
+  in
+  let t = create ?config:None in
+  let body =
+    request
+      t
+      request'
+      ~response_handler:(default_response_handler response)
+      ~error_handler:no_error_handler
+  in
+  Body.close_writer body;
+  write_request t request';
+  (* send the 2nd request without reading the response *)
+  let response' =
+    Response.create ~headers:(Headers.of_list [ "content-length", "0" ]) `Not_found
+  in
+  let body' =
+    request
+      t
+      request'
+      ~response_handler:(fun response body ->
+        (default_response_handler response' response body))
+      ~error_handler:no_error_handler
+  in
+  Body.close_writer body';
+  write_request t request';
+  read_response t response;
+  read_response t response';
+;;
+
+let test_persistent_connection_requests_pipelining_send_body () =
+  let request' =
+    Request.create ~headers:(Headers.of_list [ "content-length", "8" ]) `GET "/"
+  in
+  let response =
+    Response.create ~headers:(Headers.of_list [ "content-length", "0" ]) `OK
+  in
+  let t = create ?config:None in
+  let body =
+    request
+      t
+      request'
+      ~response_handler:(default_response_handler response)
+      ~error_handler:no_error_handler
+  in
+  write_request t request';
+  (* send the 2nd request without reading the response *)
+  let request'' = Request.create `GET "/" in
+  let response' =
+    Response.create ~headers:(Headers.of_list [ "content-length", "0" ]) `Not_found
+  in
+  let body' =
+    request
+      t
+      request''
+      ~response_handler:(fun response body ->
+        (default_response_handler response' response body))
+      ~error_handler:no_error_handler
+  in
+  Body.close_writer body';
+  Body.write_string body "a string";
+  Body.close_writer body;
+  write_string ~msg:"writes the body for the first request" t "a string";
+  write_request t request'';
+  read_response t response;
+  read_response t response';
+;;
+
+let test_persistent_connection_requests_body () =
+  let request' = Request.create `GET "/" in
+  let request'' = Request.create `GET "/second" in
+  let response =
+    Response.create ~headers:(Headers.of_list [ "content-length", "10" ]) `OK
+  in
+  let t = create ?config:None in
+  let body =
+    request
+      t
+      request'
+      ~response_handler:(default_response_handler response)
+      ~error_handler:no_error_handler
+  in
+  Body.close_writer body;
+  write_request t request';
+  let response' = Response.create `OK in
+  read_response t response;
+  read_string t "ten chars.";
+  let body' =
+    request
+      t
+      request''
+      ~response_handler:(default_response_handler response')
+      ~error_handler:no_error_handler
+  in
+  Body.close_writer body';
+  write_request t request'';
+  read_response t response';
+;;
+
 let test_response_header_order () =
   let request' = Request.create `GET "/" in
   let headers =
@@ -151,15 +347,17 @@ let test_response_header_order () =
   in
   let response = Response.create `OK ~headers:(Headers.of_list headers) in
   let received = ref None in
-  let body, t =
+  let t = create ?config:None in
+  let body =
     request
+      t
       request'
       ~response_handler:(fun response _ -> received := Some response)
       ~error_handler:no_error_handler
   in
   Body.close_writer body;
   write_request t request';
-  writer_closed t;
+  writer_yielded t;
   read_response t response;
   match !received with
   | None -> assert false
@@ -173,8 +371,10 @@ let test_report_exn () =
   let response = Response.create `OK in (* not actually writen to the channel *)
 
   let error_message = ref None in
-  let body, t =
+  let t = create ?config:None in
+  let body =
     request
+      t
       request'
       ~response_handler:(default_response_handler response)
       ~error_handler:(function
@@ -183,7 +383,7 @@ let test_report_exn () =
   in
   Body.close_writer body;
   write_request  t request';
-  writer_closed  t;
+  writer_yielded  t;
   reader_ready t;
   report_exn t (Failure "something went wrong");
   connection_is_shutdown t;
@@ -197,8 +397,10 @@ let test_input_shrunk () =
   let response = Response.create `OK in (* not actually writen to the channel *)
 
   let error_message = ref None in
-  let body, t =
+  let t = create ?config:None in
+  let body =
     request
+      t
       request'
       ~response_handler:(default_response_handler response)
       ~error_handler:(function
@@ -207,7 +409,7 @@ let test_input_shrunk () =
   in
   Body.close_writer body;
   write_request  t request';
-  writer_closed  t;
+  writer_yielded  t;
   reader_ready t;
   let c = feed_string  t "HTTP/1.1 200 OK\r\nDate" in
   Alcotest.(check int) "read the status line" c 17;
@@ -224,4 +426,15 @@ let tests =
   ; "Response header order preserved", `Quick, test_response_header_order
   ; "report_exn"  , `Quick, test_report_exn
   ; "input_shrunk", `Quick, test_input_shrunk
+  ; "multiple GET, last request closes connection", `Quick, test_get_last_close
+  ; "Persistent connection, multiple GETs", `Quick, test_persistent_connection_requests
+  ; "Persistent connection, request pipelining", `Quick, test_persistent_connection_requests_pipelining
+  ; "Persistent connection, first request includes body", `Quick, test_persistent_connection_requests_pipelining_send_body
+  ; "Persistent connections, read response body", `Quick, test_persistent_connection_requests_body
   ]
+
+(*
+ * TODO:
+ * - test client connection error handling
+ *
+ *)
diff --git a/lwt-unix/httpaf_lwt_unix.ml b/lwt-unix/httpaf_lwt_unix.ml
index 28d0547..4b84164 100644
--- a/lwt-unix/httpaf_lwt_unix.ml
+++ b/lwt-unix/httpaf_lwt_unix.ml
@@ -136,16 +136,22 @@ module Client = struct
   module TLS = struct
     include Httpaf_lwt.Client (Tls_io.Io)
 
-    let request ?client ?(config=Config.default) socket request_headers ~error_handler ~response_handler =
-      Tls_io.make_client ?client socket >|= fun tls_client ->
-      request ~config (socket, tls_client) request_headers ~error_handler ~response_handler
+    let create_connection ?client ?(config = Config.default) =
+      let make_tls_client = Tls_io.make_client ?client in
+      fun socket ->
+        make_tls_client socket >>= fun tls_client ->
+        create_connection
+          ~config
+          (socket, tls_client)
   end
 
   module SSL = struct
     include Httpaf_lwt.Client (Ssl_io.Io)
 
-    let request ?client ?(config=Config.default) socket request_headers ~error_handler ~response_handler =
-      Ssl_io.make_client ?client socket >|= fun ssl_client ->
-      request ~config ssl_client request_headers ~error_handler ~response_handler
+    let create_connection ?client ?(config = Config.default) =
+      let make_ssl_client = Ssl_io.make_client ?client in
+      fun socket ->
+        make_ssl_client socket >>= fun ssl_client ->
+        create_connection ~config ssl_client
   end
 end
diff --git a/lwt-unix/httpaf_lwt_unix.mli b/lwt-unix/httpaf_lwt_unix.mli
index f08c718..e48ffbb 100644
--- a/lwt-unix/httpaf_lwt_unix.mli
+++ b/lwt-unix/httpaf_lwt_unix.mli
@@ -77,33 +77,63 @@ end
 
 (* For an example, see [examples/lwt_get.ml]. *)
 module Client : sig
-  val request
-    :  ?config          : Config.t
+  type t
+
+  val create_connection
+    : ?config:Config.t
     -> Lwt_unix.file_descr
+    -> t Lwt.t
+
+  val request
+    :  t
     -> Request.t
     -> error_handler    : Client_connection.error_handler
     -> response_handler : Client_connection.response_handler
     -> [`write] Body.t
 
+  val shutdown : t -> unit
+
+  val is_closed : t -> bool
+
   module TLS : sig
-    val request
+    type t
+
+    val create_connection
       :  ?client          : Tls_io.client
       -> ?config          : Config.t
       -> Lwt_unix.file_descr
+      -> t Lwt.t
+
+    val request
+      :  t
       -> Request.t
       -> error_handler    : Client_connection.error_handler
       -> response_handler : Client_connection.response_handler
-      -> [`write] Body.t Lwt.t
+      -> [`write] Body.t
+
+    val shutdown : t -> unit
+
+    val is_closed : t -> bool
   end
 
   module SSL : sig
-    val request
+    type t
+
+    val create_connection
       :  ?client          : Ssl_io.client
       -> ?config          : Config.t
       -> Lwt_unix.file_descr
+      -> t Lwt.t
+
+    val request
+      :  t
       -> Request.t
       -> error_handler    : Client_connection.error_handler
       -> response_handler : Client_connection.response_handler
-      -> [`write] Body.t Lwt.t
+      -> [`write] Body.t
+
+    val shutdown : t -> unit
+
+    val is_closed : t -> bool
   end
 end
diff --git a/lwt/httpaf_lwt.ml b/lwt/httpaf_lwt.ml
index 75ee1b0..841fe92 100644
--- a/lwt/httpaf_lwt.ml
+++ b/lwt/httpaf_lwt.ml
@@ -199,10 +199,13 @@ end
 
 
 module Client (Io: IO) = struct
-  let request ?(config=Config.default) socket request ~error_handler ~response_handler =
-    let module Client_connection = Httpaf.Client_connection in
-    let request_body, connection =
-      Client_connection.request ~config request ~error_handler ~response_handler in
+  module Client_connection = Httpaf.Client_connection
+
+  type t = Client_connection.t
+
+  let create_connection ?(config=Config.default) socket =
+    let connection =
+      Client_connection.create ~config in
 
 
     let read_buffer = Buffer.create config.read_buffer_size in
@@ -276,5 +279,11 @@ module Client (Io: IO) = struct
       Lwt.join [read_loop_exited; write_loop_exited] >>= fun () ->
       Io.close socket);
 
-    request_body
+    Lwt.return connection
+
+  let request = Client_connection.request
+
+  let shutdown = Client_connection.shutdown
+
+  let is_closed = Client_connection.is_closed
 end
diff --git a/lwt/httpaf_lwt.mli b/lwt/httpaf_lwt.mli
index 7ad52e7..2fe00b9 100644
--- a/lwt/httpaf_lwt.mli
+++ b/lwt/httpaf_lwt.mli
@@ -74,11 +74,21 @@ end
 
 (* For an example, see [examples/lwt_get.ml]. *)
 module Client (Io: IO) : sig
-  val request
-    :  ?config          : Httpaf.Config.t
+  type t
+
+  val create_connection
+    : ?config          : Config.t
     -> Io.socket
+    -> t Lwt.t
+
+  val request
+    :  t
     -> Request.t
     -> error_handler    : Client_connection.error_handler
     -> response_handler : Client_connection.response_handler
-    -> [`write] Httpaf.Body.t
+    -> [`write] Body.t
+
+  val shutdown: t -> unit
+
+  val is_closed : t -> bool
 end
diff --git a/mirage/httpaf_mirage.mli b/mirage/httpaf_mirage.mli
index e038d6d..70cdfaa 100644
--- a/mirage/httpaf_mirage.mli
+++ b/mirage/httpaf_mirage.mli
@@ -32,13 +32,15 @@
     POSSIBILITY OF SUCH DAMAGE.
   ----------------------------------------------------------------------------*)
 
+open Httpaf
+
 module type Server_intf = sig
   type flow
 
   val create_connection_handler
-    :  ?config : Httpaf.Config.t
-    -> request_handler : Httpaf.Server_connection.request_handler
-    -> error_handler : Httpaf.Server_connection.error_handler
+    :  ?config : Config.t
+    -> request_handler : Server_connection.request_handler
+    -> error_handler : Server_connection.error_handler
     -> (flow -> unit Lwt.t)
 end
 
@@ -56,11 +58,21 @@ module Server_with_conduit : sig
 end
 
 module Client (Flow : Mirage_flow_lwt.S) : sig
-  val request
-    :  ?config : Httpaf.Config.t
+  type t
+
+  val create_connection
+    : ?config          : Config.t
     -> Flow.flow
-    -> Httpaf.Request.t
-    -> error_handler : Httpaf.Client_connection.error_handler
-    -> response_handler : Httpaf.Client_connection.response_handler
-      -> [`write] Httpaf.Body.t
+    -> t Lwt.t
+
+  val request
+    :  t
+    -> Request.t
+    -> error_handler    : Client_connection.error_handler
+    -> response_handler : Client_connection.response_handler
+    -> [`write] Body.t
+
+  val shutdown : t -> unit
+
+  val is_closed : t -> bool
 end