Skip to content
This repository has been archived by the owner on Nov 15, 2022. It is now read-only.

Commit

Permalink
Http.fs dependency replaced with self written Http module
Browse files Browse the repository at this point in the history
  • Loading branch information
xkrt committed Aug 14, 2015
1 parent 6112cca commit 35bba06
Show file tree
Hide file tree
Showing 7 changed files with 187 additions and 67 deletions.
4 changes: 2 additions & 2 deletions src/InfluxDB.FSharp.IntegrationTests/Tests.fs
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ let shouldNotFailA achoice = achoice |> notFailA |> ignore
let get = function Ok x -> x | Fail e -> failwithf "Unexpected Fail %+A" e

let machine = Environment.MachineName.ToLower()
let fiddler = { Address = "localhost"; Port = 8888us; Credentials = None }
let fiddler = { Address = "localhost"; Port = 8888us; Credentials = ProxyCredentials.No }
let fmtTimestamp (value: DateTime) = value.ToString("yyyy-MM-ddTHH:mm:ssZ")

[<TestFixtureSetUp>]
Expand Down Expand Up @@ -246,7 +246,7 @@ let ``query have wrong syntax => error in response`` () =
let result = client.Query("nevermind", "SELECT wrong wrong wrong") |> Async.RunSynchronously

match result with
| Fail (HttpError (HttpStatusCode.BadRequest, Some msg)) -> msg =? "error parsing query: found wrong, expected FROM at line 1, char 14"
| Fail (HttpError (BadStatusCode (HttpStatusCode.BadRequest, Some msg))) -> msg =? "error parsing query: found wrong, expected FROM at line 1, char 14"
| x -> failwithf "Unexpected result: %+A" x

[<Test>]
Expand Down
84 changes: 33 additions & 51 deletions src/InfluxDB.FSharp/Client.fs
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,9 @@

open System
open System.Net
open HttpClient
open InfluxDB.FSharp.Choice
open InfluxDB.FSharp.AsyncChoice
open InfluxDB.FSharp.Http

module internal Contracts =
open System.Collections.Generic
Expand Down Expand Up @@ -63,100 +63,84 @@ module internal Contracts =
type Client (host: string, ?port: uint16, ?credentials: Credentials, ?proxy: InfluxDB.FSharp.Proxy) =
let port = defaultArg port 8086us
let baseUri = Uri(sprintf "http://%s:%d" host port)
let url (path: string) = Uri(baseUri, path).AbsoluteUri
let uri (path: string) = Uri(baseUri, path)

let createRequest =
match proxy with
| Some proxy ->
let httpfsProxy: HttpClient.Proxy =
{ Address = proxy.Address
Port = int proxy.Port
Credentials = match proxy.Credentials with
| Some creds -> ProxyCredentials.Custom { username = creds.Username; password = creds.Password }
| None -> ProxyCredentials.None }
fun action url -> createRequest action url |> withProxy httpfsProxy
| None -> createRequest

let withQueryStringItems items request =
items |> List.fold (swap withQueryStringItem) request
| Some proxy -> fun ``method`` url -> Http.createRequest ``method`` url |> Http.withProxy proxy
| None -> Http.createRequest

let buildError (response: Response) =
let code = enum response.StatusCode
let msg = match response.EntityBody with
let msg = match response.Body with
| Some body ->
match Contracts.deserialize<Contracts.Response> body with
| Ok resp -> Some resp.Error
| Fail _ -> Some body
| None -> None
Fail (HttpError (code, msg))
Fail (BadStatusCode (response.StatusCode, msg))

let query db qstr mapOk = asyncChoice {
let withDb =
match db with
| Some db -> withQueryStringItem { name="db"; value=db }
| Some db -> withQueryStringItem "db" db
| None -> id

let! response =
createRequest Get (url "query")
createRequest Get (uri "query")
|> withDb
|> withQueryStringItem { name="q"; value=qstr }
|> getResponseAsync
|> Async.Catch
<!!> TransportError
|> withQueryStringItem "q" qstr
|> getResponse
<!!> HttpError

return!
match enum response.StatusCode with
match response.StatusCode with
| HttpStatusCode.OK -> mapOk response
| _ -> buildError response
| _ -> buildError response <!> HttpError
}

let write query body = asyncChoice {
let! response =
createRequest Post (url "write")
createRequest (Post body) (uri "write")
|> withQueryStringItems query
|> withBody body
|> getResponseAsync
|> Async.Catch
<!!> TransportError
|> getResponse
<!!> HttpError

return!
match enum response.StatusCode with
match response.StatusCode with
| HttpStatusCode.NoContent -> Ok ()
| _ -> buildError response
| _ -> buildError response <!> HttpError
}

let ping () = asyncChoice {
let sw = Diagnostics.Stopwatch()
do sw.Start()

let! response =
createRequest Get (url "ping")
|> getResponseAsync
|> Async.Catch
<!!> TransportError
createRequest Get (uri "ping")
|> getResponse
<!!> HttpError
do sw.Stop()

let! version =
response.Headers
|> Map.tryFind (NonStandard "X-Influxdb-Version")
|> Map.tryFind "X-Influxdb-Version"
|> function
| Some version -> Ok version
| None -> Fail (OtherError "No version header in response")
| None -> Fail (ResponseParseError "No version header in response")

return sw.Elapsed, version
}

let showDbs () =
query None "SHOW DATABASES" <| fun resp ->
choice {
let! json = resp.EntityBody |> Choice.ofOption |> Choice.mapFail (fun _ -> OtherError "Response doesnt contain body")
let! response = Contracts.deserialize<Contracts.Response> json <!~> ResponseParseError
let! json = resp.Body |> Choice.ofOption <!~> ResponseParseError "Response doesnt contain body"
let! response = Contracts.deserialize<Contracts.Response> json <!~> ResponseParseError "Cant parse response contract"
return!
response.Results
|> Seq.trySingle
|> Option.bind (fun r -> Seq.trySingle r.Series)
|> Choice.ofOption
|> Choice.mapFail (konst ResponseParseError)
<!~> ResponseParseError "TODO"
|> function
| Ok series ->
match Option.ofNull series.Values with
Expand All @@ -169,9 +153,9 @@ type Client (host: string, ?port: uint16, ?credentials: Credentials, ?proxy: Inf
// todo: refact "<!~> ResponseParseError" somehow
let checkForError resp =
choice {
let! json = resp.EntityBody |> Choice.ofOption <!~> ResponseParseError
let! resp = Contracts.deserialize<Contracts.Response> json <!~> ResponseParseError
let! result = resp.Results |> Seq.trySingleC <!~> ResponseParseError
let! json = resp.Body |> Choice.ofOption <!~> ResponseParseError "TODO"
let! resp = Contracts.deserialize<Contracts.Response> json <!~> ResponseParseError "TODO"
let! result = resp.Results |> Seq.trySingleC <!~> ResponseParseError "TODO"
return!
match result.Error with
| null -> Ok ()
Expand Down Expand Up @@ -200,8 +184,7 @@ type Client (host: string, ?port: uint16, ?credentials: Credentials, ?proxy: Inf
let doWrite db (point: Point.T) precision =
let line = Point.toLine precision point
let precision = toStr precision
let query = [ { name="db"; value=db }
{ name="precision"; value=precision } ]
let query = [ "db", db; "precision", precision ]
write query line

let doWriteMany db (points: Point.T[]) precision =
Expand All @@ -210,15 +193,14 @@ type Client (host: string, ?port: uint16, ?credentials: Credentials, ?proxy: Inf
|> Array.map (Point.toLine precision)
|> String.concat "\n"
let precision = toStr precision
let query = [ { name="db"; value=db }
{ name="precision"; value=precision } ]
let query = [ "db", db; "precision", precision ]
write query lines

let doQuery db querystr =
query (Some db) querystr <| fun (resp: Response) ->
choice {
let! body = Choice.ofOption resp.EntityBody <!~> ResponseParseError
let! qresp = Contracts.deserialize<Contracts.Response> body <!~> ResponseParseError
let! body = Choice.ofOption resp.Body <!~> ResponseParseError "TODO"
let! qresp = Contracts.deserialize<Contracts.Response> body <!~> ResponseParseError "TODO"
let response =
match Option.ofNull qresp.Error with
| Some errormsg -> Fail (ServerError errormsg)
Expand Down
126 changes: 126 additions & 0 deletions src/InfluxDB.FSharp/Http.fs
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
module InfluxDB.FSharp.Http

open System
open System.Net
open InfluxDB.FSharp.AsyncChoice

do if ServicePointManager.Expect100Continue then ServicePointManager.Expect100Continue <- false

type HttpMethod =
| Get
| Post of body: string

type Request =
{ Uri : Uri
Method : HttpMethod
Query : Map<string,string>
Timeout : int<msec>
Proxy : Proxy option }

type Response =
{ StatusCode : HttpStatusCode
Headers : Map<string,string>
Body : string option }

let createRequest ``method`` (uri : Uri) =
{ Uri = uri
Method = ``method``
Query = Map.empty
Timeout = 100000<msec> // https://msdn.microsoft.com/en-us/library/system.net.httpwebrequest.timeout%28v=vs.110%29.aspx
Proxy = None }

let withQueryStringItem name value request =
{ request with Query = request.Query.Add(name,value) }

let withQueryStringItems items request =
items |> List.fold (fun req (n, v) -> withQueryStringItem n v req) request

let withProxy proxy request =
{ request with Proxy = Some proxy }

let private writeBody (body: string) (webRequest: HttpWebRequest) = asyncChoice {
use! reqStream = webRequest.GetRequestStreamAsync() |> Async.AwaitTask |> Async.Catch
let bytes = Text.Encoding.UTF8.GetBytes(body)
return!
reqStream.WriteAsync(bytes, 0, bytes.Length)
|> Async.AwaitTaskVoid
|> Async.Catch
}

let private toWebRequest request =
let uri =
let b = UriBuilder request.Uri
b.Query <-
request.Query
|> Map.toSeq
|> Seq.map (fun (k,v) -> sprintf "%s=%s" (Uri.EscapeDataString k) (Uri.EscapeDataString v))
|> String.concat "&"
b.Uri

let webRequest = HttpWebRequest.Create(uri) :?> HttpWebRequest
webRequest.AllowAutoRedirect <- true
webRequest.Timeout <- int request.Timeout

request.Proxy |> Option.iter (fun proxy ->
let webProxy = WebProxy(proxy.Address, int proxy.Port)
match proxy.Credentials with
| ProxyCredentials.No -> webProxy.Credentials <- null
| ProxyCredentials.Default -> webProxy.UseDefaultCredentials <- true
| ProxyCredentials.Custom { Username = name; Password = pwd } ->
webProxy.Credentials <- NetworkCredential(name, pwd)
webRequest.Proxy <- webProxy)

match request.Method with
| Get ->
webRequest.Method <- "GET"
asyncChoice.Return webRequest
| Post body ->
webRequest.Method <- "POST"
writeBody body webRequest
|> AsyncChoice.map (fun () -> webRequest)
|> AsyncChoice.mapFail HttpError.otherErrorExn

let private getResponseNoException (webRequest: HttpWebRequest) = async {
let! webResponse = webRequest.AsyncGetResponse() |> Async.Catch
return
match webResponse with
| Ok webResponse -> Ok (webResponse :?> HttpWebResponse)
| Fail e ->
match e with
| :? WebException as wex ->
if isNull wex.Response then
Fail (OtherError (e.Message, Some e))
else
Ok (wex.Response :?> HttpWebResponse)
| e -> Fail (OtherError (e.Message, Some e))
}

let private getHeaders (response: HttpWebResponse) =
response.Headers.Keys
|> Seq.cast<string>
|> Seq.map (fun key -> key, response.Headers.Item(key))
|> Map.ofSeq

let private readResponseBody (webResponse: HttpWebResponse) = asyncChoice {
if webResponse.ContentLength > 0L then
use stream = webResponse.GetResponseStream()
use sr = new IO.StreamReader(stream)
let! body =
sr.ReadToEndAsync()
|> Async.AwaitTask
|> Async.Catch
<!!> HttpError.otherErrorExn
return Some body
else
return None
}

let getResponse request : Async<Choice<Response,HttpError>> = asyncChoice {
let! webRequest = toWebRequest request
let! webResponse = getResponseNoException webRequest
let! responseBody = readResponseBody webResponse
return
{ StatusCode = webResponse.StatusCode
Headers = getHeaders webResponse
Body = responseBody }
}
6 changes: 1 addition & 5 deletions src/InfluxDB.FSharp/InfluxDB.FSharp.fsproj
Original file line number Diff line number Diff line change
Expand Up @@ -53,22 +53,18 @@
</Choose>
<Import Project="$(FSharpTargetsPath)" />
<ItemGroup>
<Content Include="packages.config" />
<Content Include="app.config" />
<Compile Include="InternalsVisibleTo.fs" />
<Compile Include="Prelude.fs" />
<Compile Include="AsyncChoiceBuilder.fs" />
<Compile Include="ChoiceBuilder.fs" />
<Compile Include="Types.fs" />
<Compile Include="Http.fs" />
<Compile Include="Point.fsi" />
<Compile Include="Point.fs" />
<Compile Include="Client.fs" />
</ItemGroup>
<ItemGroup>
<Reference Include="HttpClient">
<HintPath>..\packages\Http.fs.1.5.1\lib\net40\HttpClient.dll</HintPath>
<Private>True</Private>
</Reference>
<Reference Include="mscorlib" />
<Reference Include="FSharp.Core, Version=$(TargetFSharpCoreVersion), Culture=neutral, PublicKeyToken=b03f5f7f11d50a3a">
<Private>True</Private>
Expand Down
11 changes: 11 additions & 0 deletions src/InfluxDB.FSharp/Prelude.fs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ module internal Prelude =

let invCulture = System.Globalization.CultureInfo.InvariantCulture

[<Measure>] type msec


module internal Array =
let emptyIfNull array =
Expand Down Expand Up @@ -135,13 +137,22 @@ module internal Option =


module internal Async =
open System.Threading.Tasks

let map (mapping : 'T -> 'U) (value : Async<'T>) : Async<'U> = async {
let! x = value
return mapping x
}

/// Await void Task, rethrow exception if it occurs
let AwaitTaskVoid (task: Task) =
task.ContinueWith<unit> (fun t -> if t.IsFaulted then raise t.Exception) |> Async.AwaitTask


module internal AsyncChoice =
let inline map (fn: 'a -> 'b) (value: Async<Choice<'a, 'c>>) : Async<Choice<'b, 'c>> =
value |> Async.map (Choice.map fn)

let inline mapFail (fn: 'b -> 'c) (value: Async<Choice<'a, 'b>>) : Async<Choice<'a, 'c>> =
value |> Async.map (Choice.mapFail fn)

Expand Down
Loading

0 comments on commit 35bba06

Please sign in to comment.