Skip to content


Update embedded async
Browse files Browse the repository at this point in the history
To have HTTP event emitters and SSE.
  • Loading branch information
gaborcsardi committed Mar 18, 2024
1 parent c1ac71f commit 6938e69
Showing 1 changed file with 231 additions and 7 deletions.
238 changes: 231 additions & 7 deletions R/aaa-async.R
Original file line number Diff line number Diff line change
Expand Up @@ -1087,9 +1087,10 @@ deferred <- R6Class(
initialize = function(action = NULL, on_progress = NULL, on_cancel = NULL,
parents = NULL, parent_resolve = NULL,
parent_reject = NULL, type = NULL,
call =
call =, event_emitter = NULL)
async_def_init(self, private, action, on_progress, on_cancel,
parents, parent_resolve, parent_reject, type, call),
parents, parent_resolve, parent_reject, type, call,
then = function(on_fulfilled)
def_then(self, private, on_fulfilled),
catch = function(...)
Expand All @@ -1098,7 +1099,9 @@ deferred <- R6Class(
def_finally(self, private, on_finally),
cancel = function(reason = "Cancelled")
def_cancel(self, private, reason),
share = function() { private$shared <<- TRUE; invisible(self) }
share = function() { private$shared <<- TRUE; invisible(self) },

event_emitter = NULL

private = list(
Expand Down Expand Up @@ -1150,14 +1153,15 @@ deferred <- R6Class(

async_def_init <- function(self, private, action, on_progress,
on_cancel, parents, parent_resolve,
parent_reject, type, call) {
parent_reject, type, call, event_emitter) {

private$type <- type
private$id <- get_id()
private$event_loop <- get_default_event_loop()
private$parents <- parents
private$action <- action
private$mycall <- call
self$event_emitter <- event_emitter

"!DEBUG NEW `private$id` (`type`)"

Expand Down Expand Up @@ -1773,12 +1777,15 @@ el_add_http <- function(self, private, handle, callback, progress, file,
pool = private$pool,
done = function(response) {
task <- private$tasks[[id]]
private$tasks[[id]] <- NULL
response$content <-, as.list(content))
response$file <- outfile
task$callback(NULL, response)
data = function(bytes, ...) {
task <- private$tasks[[id]]
task$data$data$event_emitter$emit("data", bytes)
if (!is.null(outfile)) {
## R runs out of connections very quickly, especially because they
## are not removed until a gc(). However, calling gc() is
Expand Down Expand Up @@ -2268,7 +2275,7 @@ el__update_curl_data <- function(self, private) {
#' In an error happen within an `error` listener, then the same happens,
#' the last `synchronise()` or `run_event_loop()` call fails. You can
#' want to wrap the body of the error listeners in a `tryCatch()` call,
#' wrap the body of the error listeners in a `tryCatch()` call,
#' if you want to avoid this.
#' @noRd
Expand Down Expand Up @@ -2491,12 +2498,200 @@ async_reject <- function(.x, .p, ...) {

async_reject <- mark_as_async(async_reject)
#' HTTP event emitter for server-sent events
#' Server-sent events are a technique to stream events from a web server
#' to a client, through an open HTTP connection.
#' This class implements an event emitter on an async HTTP query created
#' with [http_get()] and friends, that fires an `"event"` event when the
#' server sends an event. An `"end"` event is emitted when the server
#' closes the connection.
#' An event is a named character vector, the names are the keys of the
#' events.
#' Example using our built-in toy web app:
#' ```r
#' http <- webfakes::new_app_process(async:::sseapp())
#' stream_events <- function() {
#' query <- http_get(http$url("/sse"))
#' sse <- sse_events$new(query)
#' sse$
#' listen_on("event", function(event) {
#' writeLines("Got an event:")
#' print(event)
#' })$
#' listen_on("end", function() {
#' writeLines("Done.")
#' })
#' query
#' }
#' response <- synchronise(stream_events())
#' ```
#' @noRd

sse_events <- R6Class(
inherit = event_emitter,
public = list(
initialize = function(http_handle) {
http_handle$event_emitter$listen_on("data", function(bytes) {
private$data <- c(private$data, bytes)
http_handle$event_emitter$listen_on("end", function() {

private = list(
data = NULL,
sep = as.raw(c(0xaL, 0xaL)),
emit_events = function() {
evs <- chunk_sse_events(private$data, private$sep)
private$data <- evs$rest
for (ev in evs$events) {
self$emit("event", ev)

chunk_sse_events <- function(data, sep = NULL) {
# skip leading \n
no <- 0L
while (no <= length(data) && data[no + 1] == 0x0a) {
no <- no + 1L
if (no > 0) {
data <- data[(no + 1L):length(data)]
sep <- sep %||% as.raw(c(0xaL, 0xaL))
mtch <- grepRaw(sep, data, fixed = TRUE, all = TRUE)
# shortcut for no events
if (length(mtch) == 0) {
return(list(events = list(), rest = data))

events <- vector("list", length(mtch))
for (p in seq_along(mtch)) {
from <- if (p == 1) 1L else mtch[p - 1] + 2L
to <- mtch[p] - 1L
events[[p]] <- parse_sse_event(data[from:to])
events <- drop_nulls(events)

restfrom <- mtch[length(mtch)] + 2L
rest <- if (restfrom <= length(data)) {
} else {
list(events = events, rest = rest)

parse_sse_event <- function(data) {
txt <- rawToChar(data)
Encoding(txt) <- "UTF-8"
lines <- strsplit(txt, "\n", fixed = TRUE)[[1]]
lines <- lines[lines != ""]
if (length(lines) == 0) {
keys <- sub(":.*$", "", lines)
vals <- sub("^[^:]*:[ ]*", "", lines)
structure(vals, names = keys)

drop_nulls <- function(x) {
x[!vapply(x, is.null, logical(1))]

sseapp <- function() {
app <- webfakes::new_app()
app$get("/sse", function(req, res) {
`%||%` <- function(l, r) if (is.null(l)) r else l
if (is.null(res$locals$sse)) {
duration <- as.double(req$query$duration %||% 2)
delay <- as.double(req$query$delay %||% 0)
numevents <- as.integer(req$query$numevents %||% 5)
pause <- max(duration / numevents, 0.01)
res$locals$sse <- list(
sent = 0,
numevents = numevents,
pause = pause

set_header("cache-control", "no-cache")$
set_header("content-type", "text/event-stream")$
set_header("access-control-allow-origin", "*")$
set_header("connection", "keep-alive")$

if (delay > 0) {

msg <- paste0(
"event: ", res$locals$sse$sent + 1L, "\n",
"message: live long and prosper\n\n"
res$locals$sse$sent <- res$locals$sse$sent + 1L

if (res$locals$sse$sent == res$locals$sse$numevents) {
} else {

#' Asynchronous HTTP GET request
#' Start an HTTP GET request in the background, and report its completion
#' via a deferred.
#' @section HTTP event emitters:
#' An async HTTP deferred object is also an event emitter, see
#' [event_emitter]. Use `$event_emitter` to access the event emitter API,
#' and call `$event_emitter$listen_on()` etc. to listen on HTTP events,
#' etc.
#' * `"data"` is emitted when we receive data from the server, the data is
#' passed on to the listeners as a raw vector. Note that zero-length
#' raw vectors might also happen.
#' * `"end"` is emitted at the end of the HTTP data stream, without
#' additional arguments (Also on error.)
#' Here is an example, that uses the web server from the webfakes
#' package:
#' ```r
#' http <- webfakes::new_app_process(webfakes::httpbin_app())
#' stream_http <- function() {
#' query <- http_get(http$url("/drip?duration=3&numbytes=10"))
#' query$event_emitter$
#' listen_on("data", function(bytes) {
#' writeLines(paste("Got", length(bytes), "byte(s):"))
#' print(bytes)
#' })$
#' listen_on("end", function() {
#' writeLines("Done.")
#' })
#' query
#' }
#' response <- synchronise(stream_http())
#' ```
#' @param url URL to connect to.
#' @param headers HTTP headers to send.
#' @param file If not `NULL`, it must be a string, specifying a file.
Expand Down Expand Up @@ -2571,6 +2766,9 @@ http_get <- mark_as_async(http_get)

#' Asynchronous HTTP HEAD request
#' An async HTTP deferred object is also an event emitter, see
#' [http_get()] for details, and also [event_emitter].
#' @inheritParams http_get
#' @return Deferred object.
Expand Down Expand Up @@ -2619,6 +2817,9 @@ http_head <- mark_as_async(http_head)
#' Start an HTTP POST request in the background, and report its completion
#' via a deferred value.
#' An async HTTP deferred object is also an event emitter, see
#' [http_get()] for details, and also [event_emitter].
#' @inheritParams http_get
#' @param data Data to send. Either a raw vector, or a character string
#' that will be converted to raw with [base::charToRaw]. At most one of
Expand Down Expand Up @@ -2721,9 +2922,30 @@ get_default_curl_options <- function(options) {

http_events <- R6Class(
inherit = event_emitter,
public = list(
listen_on = function(event, callback) {
super$listen_on(event, callback)
listen_off = function(event, callback) {
super$listen_off(event, callback)
private = list(
check = function(event) {
stopifnot(event %in% c("data", "end"))

make_deferred_http <- function(cb, file) {
cb; file
id <- NULL
ee <- http_events$new()
type = "http", call =,
action = function(resolve, progress) {
Expand All @@ -2738,11 +2960,13 @@ make_deferred_http <- function(cb, file) {
function(err, res) if (is.null(err)) resolve(res) else reject(err),
data = ho$options)
data = c(ho$options, list(event_emitter = ee))
on_cancel = function(reason) {
if (!is.null(id)) get_default_event_loop()$cancel(id)
event_emitter = ee

Expand Down

0 comments on commit 6938e69

Please sign in to comment.