Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Download streams #41

Merged
merged 8 commits into from
Aug 31, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
94 changes: 72 additions & 22 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ the [API doc](http://hmil.github.io/RosHTTP/docs/index.html) too.
<!--- test: "Main example" -->
```scala
import fr.hmil.roshttp.HttpRequest
import scala.concurrent.ExecutionContext.Implicits.global
import monifu.concurrent.Implicits.globalScheduler

/* ... */

Expand All @@ -36,26 +36,31 @@ val request = HttpRequest("https://schema.org/WebPage")
request.send().map(response => println(response.body))
```

When you `send()` a request, you get a `Future[HttpResponse]` which resolves to
an HttpResponse if everything went fine or fails with an HttpException if a
network error occurred or if a statusCode > 400 was received.
When you `send()` a request, you get a `Future` which resolves to
a SimpleHttpResponse if everything went fine or fails with a HttpNetworkException if a
network error occurred or with a HttpResponseException if a statusCode > 400 was received.
When applicable, the response body of a failed request can be read:

<!--- test: "Error handling" -->
```scala
HttpRequest("http://hmil.github.io/foobar")
.send()
.onFailure {
case e:HttpResponseError =>
s"Got a status: ${e.response.statusCode}" ==> "Got a status: 404"
// An HttpResponseException always provides a response
case e:HttpResponseException =>
"Got a status: ${e.response.statusCode}"
// An HttpTimeoutException may provide a partial response which contains
// response headers as well as any piece of body received before the timeout.
case HttpTimeoutException(partialResponse: Some[SimpleHttpResponse]) =>
s"Body received before timeout: ${partialResponse.get.body}"
}
```


## Configuring requests

Every aspect of a request can be customized using `.withXXX` methods. These are
meant to be chained, they do not modify the original request.
Requests can be built using `.withXXX` methods. These are meant to be chained,
they do not modify the original request.

### URI

Expand Down Expand Up @@ -110,7 +115,14 @@ request
*/
```

### Request headers
### HTTP Method

```scala
// Set the request method to GET, POST, PUT, etc...
request.withMethod(Method.PUT).send()
```

### Headers

Set individual headers using `.withHeader`
```scala
Expand All @@ -124,16 +136,31 @@ request.withHeaders(
)
```

### Response headers
### Backend configuration

Some low-level configuration settings are available in [BackendConfig](http://hmil.github.io/RosHTTP/docs/index.html#fr.hmil.roshttp.BackendConfig).
Each request can use a specific backend configuration using `.withBackendConfig`.

example:
```scala
HttpRequest(url)
.withBackendConfig(BackendConfig(
// Uses stream chunks of at most 1024 bytes
maxChunkSize = 1024
))
.stream()
```

## Response headers

A map of response headers is available on the [[HttpResponse]] object:
A map of response headers is available on the `HttpResponse` object:
```scala
request.send().map({res =>
println(res.headers("Set-Cookie"))
})
```

### Sending data
## Sending data

An HTTP request can send data wrapped in an implementation of `BodyPart`. The most common
formats are already provided but you can create your own as well.
Expand Down Expand Up @@ -163,7 +190,7 @@ val data = JSONObject(
request.post(data)
```

#### File upload
### File upload

To send file data you must turn a file into a ByteBuffer and then send it in a
StreamBody. For instance, on the jvm you could do:
Expand All @@ -176,7 +203,7 @@ request.post(ByteBuffer.wrap(bytes))
Note that the codec argument is important to read the file as-is and avoid side-effects
due to character interpretation.

#### Multipart
### Multipart

Use the `MultiPartBody` to compose request bodies arbitrarily. It allows for instance
to send binary data with some textual data.
Expand All @@ -203,39 +230,62 @@ request.post(MultiPartBody(
))
```

### HTTP Method
## Streaming

### Download streams

Streaming a response is as simple as calling `.stream()` instead of `.send()`.
`HttpRequest#stream()` returns a Future of `StreamHttpResponse`. A `StreamHttpResponse`
is just like a `SimpleHttpResponse` except that its `body` property is an
[Observable](https://monix.io/api/1.2/#monifu.reactive.Observable).
The observable will spit out a stream of `ByteBuffer`s as shown in this example:

```scala
// Set the request method to GET, POST, PUT, etc...
request.withMethod(Method.PUT).send()
import fr.hmil.roshttp.util.Utils._
HttpRequest(s"$SERVER_URL")
.stream()
.map({ r =>
r.body.foreach(buffer => println(getStringFromBuffer(buffer, "UTF-8")))
})
```
_Note that special care should be taken when converting chunks into strings because
multibyte characters may span multiple chunks._
_In general streaming is used for binary data and any reasonable quantity
of text can safely be handled by the non-streaming API_

---

Watch the [issues](https://github.com/hmil/RosHTTP/issues)
for upcoming features. Feedback is very welcome so feel free to file an issue if you
see something that is missing.

## Known limitations
# Known limitations

- Response streaming is emulated in the browser, meaning that streaming large responses in the
browser will consume large amounts of memory and might fail.
This [problem has a solution](https://github.com/hmil/RosHTTP/issues/46)
- `bodyCollectTimeout` is ignored on Chrome.
- Some headers cannot be set in the browser ([list](https://developer.mozilla.org/en-US/docs/Glossary/Forbidden_header_name)).
- There is no way to avoid redirects in the browser. This is a W3C spec.
- Chrome does not allow userspace handling of a 407 status code. It is treated
like a network error. See [chromium issue](https://bugs.chromium.org/p/chromium/issues/detail?id=372136).
- The `TRACE` HTTP method does not work in browsers and `PATCH` does not work in the JVM.

## Contributing
# Contributing

Please read the [contributing guide](https://github.com/hmil/RosHTTP/blob/master/CONTRIBUTING.md).

## Changelog
# Changelog

**v2.0.0**

- Timeout errors on body
- Rename *Error classes to *Exception
- Add streaming API
- Add implicit Scheduler parameter
- Add implicit execution context parameter

**v1.1.0**

- Fix bug on responses without Content-Type header
- Detect key-value pairs during query string escapement

Expand All @@ -262,6 +312,6 @@ Please read the [contributing guide](https://github.com/hmil/RosHTTP/blob/master
**v0.1.0**
- First release

## License
# License

MIT
1 change: 1 addition & 0 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ lazy val scalaHttp = crossProject.in(file("."))
pomIncludeRepository := { _ => false },

libraryDependencies += "com.lihaoyi" %%% "utest" % "0.4.3",
libraryDependencies += "org.monifu" %%% "monifu" % "1.2",

testFrameworks += new TestFramework("utest.runner.Framework")
)
Expand Down
66 changes: 51 additions & 15 deletions js/src/main/scala/fr/hmil/roshttp/BrowserDriver.scala
Original file line number Diff line number Diff line change
@@ -1,51 +1,87 @@
package fr.hmil.roshttp

import java.nio.ByteBuffer

import fr.hmil.roshttp.ByteBufferChopper.Finite
import fr.hmil.roshttp.exceptions.{HttpNetworkException, HttpResponseException}
import fr.hmil.roshttp.response.{HttpResponse, HttpResponseFactory}
import fr.hmil.roshttp.util.HeaderMap
import monifu.concurrent.Scheduler
import org.scalajs.dom
import org.scalajs.dom.ext.Ajax
import org.scalajs.dom.raw.ErrorEvent

import scala.concurrent.{ExecutionContext, Future, Promise}
import scala.concurrent.{Future, Promise}
import scala.scalajs.js.JSConverters._
import scala.scalajs.js.JavaScriptException
import scala.scalajs.js.typedarray.{ArrayBuffer, TypedArrayBuffer}

private object BrowserDriver extends DriverTrait {

def send(req: HttpRequest)(implicit ec: ExecutionContext): Future[HttpResponse] = {
val p: Promise[HttpResponse] = Promise[HttpResponse]()
override def send[T <: HttpResponse](req: HttpRequest, factory: HttpResponseFactory[T])
(implicit scheduler: Scheduler): Future[T] = {
val p: Promise[T] = Promise[T]()

val xhr = new dom.XMLHttpRequest()
xhr.open(req.method.toString, req.url)
xhr.responseType = "arraybuffer"
req.headers.foreach(t => xhr.setRequestHeader(t._1, t._2))

xhr.onerror = { (e: ErrorEvent) =>
p.failure(new HttpNetworkError(new JavaScriptException(e)))
p.failure(new HttpNetworkException(new JavaScriptException(e)))
}

val bufferQueue = new ByteBufferQueue()

xhr.onreadystatechange = { (e: dom.Event) =>
if (xhr.readyState == dom.XMLHttpRequest.DONE) {
if (xhr.readyState == dom.XMLHttpRequest.HEADERS_RECEIVED) {
val headers = xhr.getAllResponseHeaders() match {
case null => Map[String, String]()
case s: String => s.split("\r\n").map({ s =>
val split = s.split(": ")
(split.head, split.tail.mkString.trim)
}).toMap[String, String]
}
val charset = HttpUtils.charsetFromContentType(headers.getOrElse("content-type", null))
val response = new HttpResponse(
xhr.status,
TypedArrayBuffer.wrap(xhr.response.asInstanceOf[ArrayBuffer]),
HeaderMap(headers))
if (xhr.status >= 400) {
p.failure(HttpResponseError.badStatus(response))
} else {
p.success(response)
}

p.completeWith(
factory(
xhr.status,
HeaderMap(headers),
bufferQueue.observable,
req.backendConfig)
.map({response =>
if (xhr.status >= 400) {
throw HttpResponseException.badStatus(response)
} else {
response
}
})
)
} else if (xhr.readyState == dom.XMLHttpRequest.DONE) {
bufferQueue.push(chopChunk())
bufferQueue.end()
}
}

def chopChunk(): Seq[ByteBuffer] = {
val buffer = xhr.response.asInstanceOf[ArrayBuffer]
val buffers = ByteBufferChopper.chop(
new FiniteArrayBuffer(buffer),
req.backendConfig.maxChunkSize,
readChunk)
buffers
}

def readChunk(buffer: FiniteArrayBuffer, start: Int, length: Int): ByteBuffer = {
TypedArrayBuffer.wrap(buffer.buffer, start, length)
}

xhr.send(req.body.map(b => Ajax.InputData.byteBuffer2ajax(b.content)).orUndefined)

p.future
}

private class FiniteArrayBuffer(val buffer: ArrayBuffer) extends Finite {
override def length: Int = buffer.byteLength
}
}
25 changes: 25 additions & 0 deletions js/src/main/scala/fr/hmil/roshttp/ByteBufferChopper.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
package fr.hmil.roshttp

import java.nio.ByteBuffer


object ByteBufferChopper {

def chop[T <: Finite](buffer: T, maxChunkSize: Int, read: (T, Int, Int) => ByteBuffer): Seq[ByteBuffer] = {
val nb_buffers = (buffer.length + maxChunkSize - 1) / maxChunkSize
val buffers = new Array[ByteBuffer](nb_buffers)
var currentPosition = 0
var i = 0
while (i < nb_buffers) {
val length = Math.min(maxChunkSize, buffer.length - currentPosition)
buffers(i) = read(buffer, currentPosition, length)
currentPosition += length
i = i + 1
}
buffers
}

trait Finite {
def length: Int
}
}
63 changes: 63 additions & 0 deletions js/src/main/scala/fr/hmil/roshttp/ByteBufferQueue.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
package fr.hmil.roshttp

import java.nio.ByteBuffer

import monifu.reactive.Ack.{Cancel, Continue}
import monifu.reactive.{Ack, Observable, Subscriber}

import scala.collection.mutable
import scala.concurrent.ExecutionContext
import scala.util.{Failure, Success, Try}

class ByteBufferQueue(implicit ec: ExecutionContext) {
var subscriber: Option[Subscriber[ByteBuffer]] = None
val bufferQueue = mutable.Queue[ByteBuffer]()
var hasEnd = false

def propagate(): Unit = subscriber.foreach(_.onNext(bufferQueue.dequeue()).onComplete(handleAck))

def handleAck(ack: Try[Ack]): Unit = ack match {
case Success(Cancel) =>
subscriber = None
case Success(Continue) =>
if (bufferQueue.nonEmpty) {
propagate()
} else if (hasEnd) {
stop()
}
case Failure(ex) =>
subscriber = None
subscriber.foreach(_.onError(ex))
}

def push(byteBuffer: Seq[ByteBuffer]): Unit = {
bufferQueue.enqueue(byteBuffer:_*)
if (bufferQueue.nonEmpty) {
subscriber.foreach(_ => propagate())
}
}

def end(): Unit = {
hasEnd = true
if (bufferQueue.isEmpty) {
stop()
}
}

val observable = new Observable[ByteBuffer]() {
override def onSubscribe(sub: Subscriber[ByteBuffer]): Unit = {
if (subscriber.isDefined) {
throw new IllegalStateException("A subscriber is already defined")
}
subscriber = Some(sub)
if (bufferQueue.nonEmpty) {
propagate()
}
}
}

private def stop(): Unit = {
subscriber.foreach(_.onComplete())
subscriber = None
}
}
Loading