"'errorValue' int 7 | Object closed" with transient sockets and transient workers #50

Closed
wlandau opened this issue Apr 6, 2023 · 14 comments

Comments

@wlandau

wlandau commented Apr 6, 2023

As I mentioned in #48 (comment), while working on wlandau/crew#61 and testing crew's transient workers, I see a lot of "'errorValue' int 7 | Object closed" errors (about 50% of tasks in that test). The following example reproduces the same issue (at least on my Ubuntu machine) with just mirai and nanonext. I suspect it has to do with using transient single-task workers and recycling the websocket with saisei() after each task.

library(mirai)
library(nanonext)
packageVersion("mirai")
#> [1] ‘0.8.2.9007’
packageVersion("nanonext")
#> [1] ‘0.8.1.9008’
daemons(n = 1L, url = "ws://127.0.0.1:5000")
count <- 0L
while (TRUE) {
  count <- count + 1L
  m <- mirai("done")
  launch(
    sprintf(
      "mirai::server(url = '%s', asyncdial = FALSE)",
      saisei(i = 1L)
    )
  )
  while (unresolved(m)) {
    msleep(10)
  }
  stopifnot(identical(m$data, "done"))
}
count
#> 64
m$data
#> 'errorValue' int 7 | Object closed
daemons(n = 0L)
@wlandau
Author

wlandau commented Apr 6, 2023

On my MacBook, I see 'errorValue' int 7 | Object closed as soon as count == 2.

@shikokuchuo
Owner

You are getting those errors because you are using saisei() improperly. If the socket is closed while a mirai is still ongoing, that mirai will return 7 Object closed.

saisei() should only be called when there is no existing connection at the socket; otherwise you risk losing mirai tasks. I have added this safeguard in c351fc6.

I am sure this ties in with your needs as well. The above example now just runs indefinitely, as saisei() has no effect while a server is still connected.
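A minimal sketch of the guarded-rotation pattern described above, assuming (as noted later in this thread) that saisei() returns NULL instead of a new URL while a server is still connected. `rotate_when_free()` is a hypothetical helper, not part of mirai; its `rotate` argument defaults to `mirai::saisei(i = 1L)` but can be replaced by a stub, which keeps the polling logic testable without a live dispatcher.

```r
# Hypothetical helper (not part of mirai): keep trying to rotate the
# websocket at dispatcher slot 1 until no server is connected there.
# Assumption: saisei() returns NULL while a connection is live and a
# character URL once the rotation succeeds.
rotate_when_free <- function(rotate = function() mirai::saisei(i = 1L),
                             timeout = 5, interval = 0.01) {
  start <- Sys.time()
  repeat {
    url <- rotate()
    if (is.character(url)) return(url)  # rotation succeeded: fresh URL
    elapsed <- as.numeric(difftime(Sys.time(), start, units = "secs"))
    if (elapsed >= timeout) return(NULL)  # still busy: give up, do not launch
    Sys.sleep(interval)
  }
}

# Usage with a live dispatcher, as in the reprexes in this thread (not run):
# url <- rotate_when_free()
# if (!is.null(url)) {
#   launch(sprintf("mirai::server(url = '%s', maxtasks = 1L)", url))
# }
```

Returning NULL on timeout, rather than erroring, lets the caller simply skip the launch and try again on its next polling cycle.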

@wlandau
Author

wlandau commented Apr 6, 2023

Thanks for the safeguard! In my case, I cannot ignore the possibility that a worker suddenly connects between the time I check it and the time I try to rotate the path, so this is really helpful.

Things on my end are working slightly better, although I am noticing a rare race condition where crew sometimes loses tasks, and I still see frequent instances of 'errorValue' int 7 | Object closed in the transient worker throughput test. This is really tough to debug.

@wlandau
Author

wlandau commented Apr 6, 2023

Could be somewhat related: this test does not call saisei() at all, and every so often, the task is lost even though the worker is running. I am having trouble reproducing it without crew.

library(crew)
x <- crew_controller_local()
x$start()
x$router$poll()
d <- x$router$daemons
x$push(command = ps::ps_pid(), name = "task_pid")
x$wait(seconds_timeout = 5)
out <- x$pop(scale = FALSE)
stopifnot(!is.null(out)) # should not be NULL
x$terminate()

@shikokuchuo
Owner

You will only get 7 Object closed if the socket is closed at the dispatcher (i.e. at the listener), as this is treated as intentional.
If the server times out, the connection is dropped, but the listener detects this and queues the task for retry.
I hope this helps.
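Given those semantics, a caller can separate a deliberate close from other failures by inspecting the resolved value. This is a hedged base-R sketch: it assumes, from the printed output `'errorValue' int 7 | Object closed`, that such errors are integers carrying class `"errorValue"`. `classify_result()` is a hypothetical helper, and the stub value below only imitates that shape.

```r
# Hypothetical helper: label a resolved mirai value.
# Assumption: errors are integers with class "errorValue" (as printed in
# this thread), and code 7 means "Object closed" at the listener.
classify_result <- function(x) {
  if (!inherits(x, "errorValue")) return("ok")
  if (identical(as.integer(unclass(x)), 7L)) return("object_closed")
  "other_error"
}

# Stub imitating the value seen in the reprexes in this thread.
closed <- structure(7L, class = "errorValue")
classify_result("done")   # "ok"
classify_result(closed)   # "object_closed"
```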

@wlandau
Author

wlandau commented Apr 6, 2023

That's what I thought.

Looking at my original reprex from the top of the thread, I forgot to add maxtasks = 1L to represent single-task servers. The following example hits the "object closed" error at count = 2 on my MacBook:

library(mirai)
library(nanonext)
packageVersion("mirai")
#> [1] ‘0.8.2.9009’
packageVersion("nanonext")
#> [1] ‘0.8.1.9008’
daemons(n = 1L, url = "ws://127.0.0.1:5000")
count <- 0L
while (TRUE) {
  count <- count + 1L

  # Submit a new task.
  m <- mirai("done")

  # Try to rotate the websocket.
  socket <- saisei(i = 1L)

  # Wait for the previous server to disconnect and exit on its own.
  start <- mclock()
  while (!is.character(socket)) {
    socket <- saisei(i = 1L)
    if (mclock() - start > 5000) break
    msleep(10)
  }
  stopifnot(is.character(socket))

  # Launch the server with a fresh websocket.
  launch(
    sprintf(
      "mirai::server(url = '%s', asyncdial = FALSE, maxtasks = 1L)",
      saisei(i = 1L)
    )
  )

  # Try to get the result of the task.
  start <- mclock()
  while (unresolved(m)) {
    if (mclock() - start > 5000) break
    msleep(10)
  }
  stopifnot(identical(m$data, "done"))
}

# State when I get "object closed":
socket 
#> [1] "ws://127.0.0.1:5000/44317ae525dbbcbbd84c06467854758ea87ed1c7"
daemons()$daemons
#>                                                              online instance assigned complete
#> ws://127.0.0.1:5000/572271fb4da78b7a25859c9f00e3b8b7e66cfbcd      1        1        0        0
count
#> 2
m$data
#> 'errorValue' int 7 | Object closed
daemons(n = 0L)

@wlandau
Author

wlandau commented Apr 6, 2023

I fixed a bug in my example above, and now I am getting 'errorValue' int 7 | Object closed instead of losing the worker.

@shikokuchuo
Owner

The loop logic needs a look; I think you mean something like:

while (unresolved(m) || mclock() - start < 5000)

Otherwise you risk calling saisei() while the connection is still live, in which case it returns NULL.
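The `&&` and `||` forms of this loop condition behave quite differently, which is easy to miss in a reprex. The base-R sketch below contrasts them; `poll_until()` is a hypothetical helper, and it measures time with `Sys.time()` in seconds rather than nanonext's `mclock()` in milliseconds. With `&&` the loop is a bounded wait (stop on success or on timeout); with `||`, as suggested above, it is a minimum wait (keep going until success and the full period have both been reached), which gives a previous server time to disconnect but never gives up if the condition stays FALSE.

```r
# Hypothetical helper (not part of mirai/nanonext): poll `condition()`
# every `interval` seconds and report how the loop ended.
#   mode = "bounded": loop while (!condition() && elapsed < timeout),
#     i.e. stop on success or give up once `timeout` has passed.
#   mode = "minimum": loop while (!condition() || elapsed < timeout),
#     i.e. stop only after success AND at least `timeout` seconds;
#     this never returns if condition() stays FALSE.
poll_until <- function(condition, timeout, interval = 0.01,
                       mode = c("bounded", "minimum")) {
  mode <- match.arg(mode)
  start <- Sys.time()
  elapsed <- function() as.numeric(difftime(Sys.time(), start, units = "secs"))
  repeat {
    done <- condition()
    keep_going <- switch(
      mode,
      bounded = !done && elapsed() < timeout,
      minimum = !done || elapsed() < timeout
    )
    if (!keep_going) break
    Sys.sleep(interval)
  }
  list(done = done, elapsed = elapsed())
}

# A condition that is already TRUE shows the difference:
fast <- poll_until(function() TRUE, timeout = 0.2, mode = "bounded")
slow <- poll_until(function() TRUE, timeout = 0.2, mode = "minimum")
```

In the reprexes here, `condition` would be something like `function() !unresolved(m)`.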

@wlandau
Author

wlandau commented Apr 6, 2023

Thanks, I changed the loop logic in the example below, and I am no longer getting "object closed" errors. Sometimes it runs for several iterations, but just now, the dispatcher exited at around count == 8.

library(mirai)
library(nanonext)
packageVersion("mirai")
#> [1] ‘0.8.2.9009’
packageVersion("nanonext")
#> [1] ‘0.8.1.9008’
daemons(n = 1L, url = "ws://127.0.0.1:5000")
count <- 0L
while (TRUE) {
  count <- count + 1L
  
  # Submit a new task.
  m <- mirai("done")
  
  # Try to rotate the websocket.
  socket <- saisei(i = 1L)
  
  # Wait for the previous server to disconnect and exit on its own.
  start <- mclock()
  while (!is.character(socket) || mclock() - start < 5000) {
    socket <- saisei(i = 1L)
    msleep(10)
  }
  stopifnot(is.character(socket))
  
  # Launch the server with a fresh websocket.
  launch(
    sprintf(
      "mirai::server(url = '%s', asyncdial = FALSE, maxtasks = 1L)",
      saisei(i = 1L)
    )
  )
  
  # Try to get the result of the task.
  start <- mclock()
  while (unresolved(m) || mclock() - start < 5000) {
    msleep(10)
  }
  stopifnot(identical(m$data, "done"))
}

@wlandau
Author

wlandau commented Apr 6, 2023

So I found that #51 is part of the reason for "object closed" errors, but not the whole reason. If I run the following test in branch 61 of crew with informative print statements at https://github.com/wlandau/crew/blob/eea4cd93ce6dc8f813cd99b76ce8d1bdfb5bc2cf/R/crew_controller.R#L94, I find that saisei() still gives me a websocket even when the callr handle tells me the server process at that PID is still alive.

library(crew)
x <- crew_controller_local(tasks_max = 1L)
x$start()
n <- 4L
for (index in seq_len(n)) x$push(command = ps::ps_pid())
x$wait()
results <- list()
while (length(x$results) > 0L || length(x$queue) > 0L) {
  out <- x$pop()
  if (!is.null(out)) results[[length(results) + 1L]] <- out
}
results <- tibble::as_tibble(do.call(rbind, results))
results$result <- as.integer(results$result)
results$error
View(x$summary())
x$terminate()

@wlandau
Author

wlandau commented Apr 6, 2023

Hmm... maybe that's not right either. I tried suppressing launches while the callr process at that PID is alive, and that didn't seem to solve it. Something about crew and quick successive calls to saisei() seems off.

@shikokuchuo
Owner

I was going to suggest checking the 'connections' column of daemons()$daemons rather than repeatedly calling saisei(), but didn't see any difference. I did make a small change in fae12d9 to send back something instead of nothing, as it's handled at the function anyway.
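A sketch of checking the status matrix instead of probing with saisei(). Assumptions: daemons()$daemons is a matrix with one row per URL, as in the output printed earlier in this thread; the owner refers to a 'connections' column while that output shows 'online', so the column name is a parameter here. `slot_is_free()` is hypothetical, and the matrix below is a hand-built stand-in, not real output.

```r
# Hypothetical helper: TRUE if no server is connected at row `i` of a
# daemons status matrix. The column name is an assumption (see above).
slot_is_free <- function(status, i = 1L, column = "online") {
  stopifnot(column %in% colnames(status))
  status[i, column] == 0L
}

# Hand-built stand-in shaped like the daemons()$daemons output in this thread.
status <- matrix(
  c(1L, 1L, 0L, 0L),
  nrow = 1L,
  dimnames = list(
    "ws://127.0.0.1:5000/572271fb4da78b7a25859c9f00e3b8b7e66cfbcd",
    c("online", "instance", "assigned", "complete")
  )
)
slot_is_free(status)  # FALSE: a server is still connected
```

Reading status avoids side effects, whereas every successful saisei() call rotates the URL; note, though, that the status can still change between the check and the rotation.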

So just to clarify, you are still getting 7 Object closed?

@wlandau
Author

wlandau commented Apr 6, 2023

Thanks for continuing to work on this. I am still getting "object closed", both with 0.8.2.9000 and 0.8.2.9011. Here is a reprex that uses only mirai and nanonext:

library(mirai)
library(nanonext)
daemons(n = 1L, url = "ws://127.0.0.1:5000", dispatcher = TRUE, token = TRUE)
msleep(5000)
tasks <- lapply(seq_len(2), function(x) mirai("done"))
while (any(vapply(tasks, unresolved, FUN.VALUE = logical(1L)))) {
  socket <- saisei(i = 1L)
  if (!is.null(socket)) {
    command <- sprintf(
      "mirai::server(url = '%s', asyncdial = FALSE, maxtasks = 1L)",
      socket
    )
    launch(command)
  }
  msleep(1000)
}
tasks[[1]]$data
#> [1] "done"
tasks[[2]]$data
#> 'errorValue' int 7 | Object closed
daemons(n = 0L)

@shikokuchuo
Owner

The above now works with v0.8.2.9012.

Labels
None yet
Projects
None yet
Development

No branches or pull requests

2 participants