Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Download streams #41

Merged
merged 8 commits into from
Aug 31, 2016
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -232,6 +232,7 @@ Please read the [contributing guide](https://github.com/hmil/RosHTTP/blob/master

**v2.0.0**

- Add streaming API
- Add implicit execution context parameter
- Fix bug on responses without Content-Type header

Expand Down
1 change: 1 addition & 0 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ lazy val scalaHttp = crossProject.in(file("."))
pomIncludeRepository := { _ => false },

libraryDependencies += "com.lihaoyi" %%% "utest" % "0.4.3",
libraryDependencies += "org.monifu" %%% "monifu" % "1.2",

testFrameworks += new TestFramework("utest.runner.Framework")
)
Expand Down
44 changes: 33 additions & 11 deletions js/src/main/scala/fr/hmil/roshttp/BrowserDriver.scala
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
package fr.hmil.roshttp

import java.nio.ByteBuffer

import monifu.reactive.Ack.{Cancel, Continue}
import monifu.reactive.Observable
import org.scalajs.dom
import org.scalajs.dom.ext.Ajax
import org.scalajs.dom.raw.ErrorEvent
Expand All @@ -8,11 +12,13 @@ import scala.concurrent.{ExecutionContext, Future, Promise}
import scala.scalajs.js.JSConverters._
import scala.scalajs.js.JavaScriptException
import scala.scalajs.js.typedarray.{ArrayBuffer, TypedArrayBuffer}
import scala.util.{Failure, Success}

private object BrowserDriver extends DriverTrait {

def send(req: HttpRequest)(implicit ec: ExecutionContext): Future[HttpResponse] = {
val p: Promise[HttpResponse] = Promise[HttpResponse]()
def send[T <: HttpResponse](req: HttpRequest, factory: HttpResponseFactory[T])(implicit ec: ExecutionContext):
Future[T] = {
val p: Promise[T] = Promise[T]()

val xhr = new dom.XMLHttpRequest()
xhr.open(req.method.toString, req.url)
Expand All @@ -32,15 +38,31 @@ private object BrowserDriver extends DriverTrait {
}).toMap[String, String]
}
val charset = HttpUtils.charsetFromContentType(headers.getOrElse("content-type", null))
val response = new HttpResponse(
xhr.status,
TypedArrayBuffer.wrap(xhr.response.asInstanceOf[ArrayBuffer]),
HeaderMap(headers))
if (xhr.status >= 400) {
p.failure(HttpResponseError.badStatus(response))
} else {
p.success(response)
}
val responseBytes = TypedArrayBuffer.wrap(xhr.response.asInstanceOf[ArrayBuffer])

factory(
xhr.status,
Observable.create[ByteBuffer] { sub =>
implicit val s = sub.scheduler
// note we must apply back-pressure
// when calling `onNext`
sub.onNext(responseBytes).onComplete {
case Success(Cancel) =>
() // do nothing
case Success(Continue) =>
sub.onComplete()
case Failure(ex) =>
sub.onError(ex)
}
},
HeaderMap(headers))
.foreach({response =>
if (xhr.status >= 400) {
p.failure(HttpResponseError.badStatus(response))
} else {
p.success(response)
}
})
}
}

Expand Down
5 changes: 3 additions & 2 deletions js/src/main/scala/fr/hmil/roshttp/HttpDriver.scala
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,9 @@ private object HttpDriver extends DriverTrait {

private var _driver: Option[DriverTrait] = None

def send(req: HttpRequest)(implicit ec: ExecutionContext): Future[HttpResponse] = {
_driver.getOrElse(chooseBackend()).send(req)
def send[T <: HttpResponse](req: HttpRequest, factory: HttpResponseFactory[T])(implicit ec: ExecutionContext):
Future[T] = {
_driver.getOrElse(chooseBackend()).send(req, factory)
}

private def chooseBackend(): DriverTrait = {
Expand Down
114 changes: 59 additions & 55 deletions js/src/main/scala/fr/hmil/roshttp/NodeDriver.scala
Original file line number Diff line number Diff line change
Expand Up @@ -6,43 +6,18 @@ import java.nio.ByteBuffer
import fr.hmil.roshttp.node.Modules.{http, https}
import fr.hmil.roshttp.node.buffer.Buffer
import fr.hmil.roshttp.node.http.{IncomingMessage, RequestOptions}
import monifu.reactive.Ack.{Cancel, Continue}
import monifu.reactive.{Observable, Subscriber}

import scala.concurrent.{ExecutionContext, Future, Promise}
import scala.scalajs.js
import scala.scalajs.js.JSConverters._
import scala.util.{Failure, Success}

private object NodeDriver extends DriverTrait {

// Accumulates chunks received by the request and turns them into a ByteBuffer
private class BufferAccumulator {
private var acc = List[Array[Byte]]()

def append(buf: Buffer): Unit = {
val length = buf.length
var i = 0
val chunk = new Array[Byte](length)
while(i < length) {
chunk(i) = buf.readInt8(i).toByte
i += 1
}
acc ::= chunk
}

def collect(): ByteBuffer = {
val length = acc.foldRight(0)((chunk, l) => l + chunk.length)
val buffer = ByteBuffer.allocate(length)
acc.foreach(chunk => {
var i = 0
while (i < chunk.length) {
buffer.put(chunk(i))
i += 1
}
})
buffer
}
}

def makeRequest(req: HttpRequest, p: Promise[HttpResponse]): Unit = {
def makeRequest[T <: HttpResponse](req: HttpRequest, factory: HttpResponseFactory[T], p: Promise[T])
(implicit ec: ExecutionContext): Unit = {
val module = {
if (req.protocol == Protocol.HTTP)
http
Expand All @@ -56,36 +31,52 @@ private object NodeDriver extends DriverTrait {
headers = js.Dictionary(req.headers.toSeq: _*),
path = req.longPath
), (message: IncomingMessage) => {

val charset = HttpUtils.charsetFromContentType(message.headers.get("content-type").orNull)

if (message.statusCode >= 300 && message.statusCode < 400 && message.headers.contains("location")) {
makeRequest(req.withURL(message.headers("location")), p)
makeRequest(req.withURL(message.headers("location")), factory, p)
} else {
val body = new BufferAccumulator()

message.on("data", { (s: js.Dynamic) =>
val buf = s.asInstanceOf[Buffer]
body.append(buf)
()
var subscribers = Set[Subscriber[ByteBuffer]]()

message.on("data", { (nodeBuffer: js.Dynamic) =>
val byteBuffer = byteBufferFromNodeBuffer(nodeBuffer)

// Send data to subscribers
subscribers.foreach(sub => sub.onNext(byteBuffer)
// And interprete their response
.onComplete {
case Success(Cancel) =>
subscribers -= sub
case Success(Continue) =>
()
case Failure(ex) =>
subscribers -= sub
sub.onError(ex)
})
})

val headers = message.headers.toMap[String, String]

message.on("end", { (s: js.Dynamic) =>
val headers = message.headers.toMap[String, String]
subscribers.foreach(_.onComplete())
subscribers = Set() // Clear the references for GC
})

val charset = HttpUtils.charsetFromContentType(headers.getOrElse("content-type", null))
val response = new HttpResponse(
val bufferStream = new Observable[ByteBuffer] {
override def onSubscribe(subscriber: Subscriber[ByteBuffer]): Unit = {
subscribers += subscriber
}
}

p.completeWith(factory(
message.statusCode,
body.collect(),
bufferStream,
HeaderMap(headers))

if (message.statusCode < 400) {
p.success(response)
} else {
p.failure(HttpResponseError.badStatus(response))
}
()
})
.map({ response =>
if (message.statusCode < 400) {
response
} else {
throw HttpResponseError.badStatus(response)
}
}))
}
()
})
Expand All @@ -102,12 +93,25 @@ private object NodeDriver extends DriverTrait {
nodeRequest.end()
}

def send(req: HttpRequest)(implicit ec: ExecutionContext): Future[HttpResponse] = {
val p: Promise[HttpResponse] = Promise[HttpResponse]()
def send[T <: HttpResponse](req: HttpRequest, factory: HttpResponseFactory[T])(implicit ec: ExecutionContext):
Future[T] = {
val p: Promise[T] = Promise[T]()

makeRequest(req, p)
makeRequest(req, factory, p)

p.future
}

private def byteBufferFromNodeBuffer(nodeBuffer: js.Any): ByteBuffer = {
val buf = nodeBuffer.asInstanceOf[Buffer]
val byteBuffer = ByteBuffer.allocate(buf.length)
var i = 0
while (i < buf.length) {
byteBuffer.put(buf.readInt8(i).toByte)
i += 1
}
byteBuffer.rewind()
byteBuffer
}

}
71 changes: 52 additions & 19 deletions jvm/src/main/scala/fr/hmil/roshttp/HttpDriver.scala
Original file line number Diff line number Diff line change
Expand Up @@ -3,24 +3,27 @@ package fr.hmil.roshttp
import java.net.{HttpURLConnection, URL}
import java.nio.ByteBuffer

import fr.hmil.roshttp.tools.io.IO
import monifu.reactive.Observable

import scala.concurrent.ExecutionContext
import scala.concurrent.{Future, blocking}

import scala.concurrent.{ExecutionContext, Future, blocking}

private object HttpDriver extends DriverTrait {

def send(req: HttpRequest)(implicit ec: ExecutionContext): Future[HttpResponse] = {
def send[T <: HttpResponse](req: HttpRequest, responseFactory: HttpResponseFactory[T])(implicit ec: ExecutionContext):
Future[T] = {
concurrent.Future {
try {
blocking {
blocking {
try {
val connection = prepareConnection(req)
readResponse(connection)
readResponse(connection, responseFactory, req.backendConfig)
} catch {
case e: HttpResponseError => throw e
case e: Throwable => throw new HttpNetworkError(e)
}
} catch {
case e: HttpResponseError => throw e
case e: Throwable => throw new HttpNetworkError(e)
}
}
}.flatMap(f => f)
}

private def prepareConnection(req: HttpRequest): HttpURLConnection = {
Expand All @@ -36,7 +39,9 @@ private object HttpDriver extends DriverTrait {
connection
}

private def readResponse(connection: HttpURLConnection): HttpResponse = {
private def readResponse[T <: HttpResponse](
connection: HttpURLConnection, responseFactory: HttpResponseFactory[T], config: HttpConfig)
(implicit ec: ExecutionContext): Future[T] = {
val code = connection.getResponseCode
val headerMap = HeaderMap(Iterator.from(0)
.map(i => (i, connection.getHeaderField(i)))
Expand All @@ -47,22 +52,50 @@ private object HttpDriver extends DriverTrait {
case key => Some((key, t._2.mkString.trim))
}
}).toMap[String, String])
val charset = HttpUtils.charsetFromContentType(headerMap.getOrElse("content-type", null))

if (code < 400) {
new HttpResponse(
responseFactory(
code,
ByteBuffer.wrap(IO.readInputStreamToByteArray(connection.getInputStream)),
inputStreamToObservable(connection.getInputStream, config.streamChunkSize),
headerMap
)
} else {
throw HttpResponseError.badStatus(new HttpResponse(
responseFactory(
code,
ByteBuffer.wrap(Option(connection.getErrorStream)
.map(IO.readInputStreamToByteArray)
.getOrElse(Array.empty)),
Option(connection.getErrorStream)
.map(is => inputStreamToObservable(is, config.streamChunkSize))
.getOrElse(Observable.from(ByteBuffer.allocate(0))),
headerMap
))
).map(response => throw HttpResponseError.badStatus(response))
}
}

private def inputStreamToObservable(in: java.io.InputStream, chunkSize: Int): Observable[ByteBuffer] = {
val iterator = new Iterator[ByteBuffer] {
private[this] var buffer: Array[Byte] = null
private[this] var lastCount = 0

def hasNext: Boolean =
lastCount match {
case 0 =>
buffer = new Array[Byte](chunkSize)
lastCount = in.read(buffer)
lastCount >= 0
case nr =>
nr >= 0
}

def next(): ByteBuffer = {
if (lastCount < 0)
throw new NoSuchElementException
else {
val result = ByteBuffer.wrap(buffer, 0, lastCount)
lastCount = 0
result
}
}
}

Observable.fromIterator(iterator)
}
}
3 changes: 2 additions & 1 deletion shared/src/main/scala/fr/hmil/roshttp/DriverTrait.scala
Original file line number Diff line number Diff line change
Expand Up @@ -3,5 +3,6 @@ package fr.hmil.roshttp
import scala.concurrent.{ExecutionContext, Future}

private trait DriverTrait {
def send(req: HttpRequest)(implicit ec: ExecutionContext): Future[HttpResponse]
def send[T <: HttpResponse](req: HttpRequest, factory: HttpResponseFactory[T])(implicit ec: ExecutionContext):
Future[T]
}
25 changes: 25 additions & 0 deletions shared/src/main/scala/fr/hmil/roshttp/HttpConfig.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
package fr.hmil.roshttp

class HttpConfig private(
/** Maximum size of each data chunk in a Streamed response.
*
* TODO: This is only enforced on JVM. Enforce on other envs as well
*/
val streamChunkSize: Int,

/** Timeout for collecting the request body in a SimpleHttpResponse */
val bodyCollectTimeout: Int
)

object HttpConfig {

def apply(
streamChunkSize: Int = 4096,
bodyCollectTimeout: Int = 10
): HttpConfig = new HttpConfig(
streamChunkSize = streamChunkSize,
bodyCollectTimeout = bodyCollectTimeout
)

implicit val default = HttpConfig()
}
Loading