queuing calls inside other functions #4

Open
jamieRowen opened this issue Apr 10, 2018 · 1 comment

@jamieRowen

I love this package: I am a big fan of the Python rq package, and this feels like the closest I can get in R at the moment. I notice that you have been doing a fair amount of work in this space, so thanks very much and kudos.

I have come across a problem that yields an error:

[2018-04-10 13:30:49] STOP ERROR
worker_1 | This is an uncaught error in rrq, probably a bug!
worker_1 | Error in storr_copy(dest, self, list, namespace, skip_missing) :
worker_1 | Missing values; can't copy:
worker_1 | - from namespace 'objects', key: '6717f2823d3202449301145073ab8719'
worker_1 | Calls: rrq_worker ... tryCatch -> tryCatchList -> tryCatchOne ->
worker_1 | Execution halted

As a reproducible example, I have a script file "myfuns.R":

slowdouble <- function(x) {
  Sys.sleep(x)
  print(x * 2)
  x*2
}

I create a context and save it separately to be loaded in both the master and worker with

context_holder = "context"
dir.create(context_holder)
root = paste0(context_holder, "/test_context")
out = paste0(context_holder,"/context.RData")
orig_source = "myfuns.R"

new_source = paste0(context_holder,"/",orig_source)
file.copy(orig_source,new_source)

context = context::context_save(root,sources = new_source)
context = context::context_load(context, new.env(parent = .GlobalEnv))
saveRDS(context, out)

Run the worker in one R process

library(rrq)
context = readRDS("context/context.RData")
rrq_worker(context, redux::hiredis(host = host, port = port))

Then in the master

library(rrq)
context = readRDS("context/context.RData")
obj = rrq_controller(context,redux::hiredis(host = host,port = port))

to create the controller object.

It works fine if I run

obj$enqueue(slowdouble(1))

but breaks if I do

f = function(x){
  obj$enqueue(slowdouble(x))
}
f(1)

Any ideas on why this might be or what I might do to fix it?
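
For anyone comparing the two calls: the thread does not confirm how rrq handles them internally, so the following is only a base-R sketch of one way they differ. The helper `capture_call` is hypothetical and is not part of rrq or context. It shows that `slowdouble(1)` is self-contained, while `slowdouble(x)` refers to a local variable that a queue would have to store somewhere and send to the worker along with the call.

```r
# Hypothetical helper (not an rrq/context function): capture an unevaluated
# call together with any variables it uses that are local to the caller.
capture_call <- function(expr, envir = parent.frame()) {
  force(envir)                      # pin down the caller's frame first
  expr <- substitute(expr)          # the call as written, unevaluated
  vars <- all.vars(expr)            # variable names used in the call
  is_local <- vapply(vars, exists, logical(1),
                     envir = envir, inherits = FALSE)
  list(expr = expr, locals = mget(vars[is_local], envir = envir))
}

# Called directly: the argument is a literal, so nothing extra is needed.
top <- capture_call(slowdouble(1))

# Called inside a function: `x` lives only in f's frame, so its value
# has to travel with the call.
f <- function(x) capture_call(slowdouble(x))
inner <- f(1)

print(names(top$locals))
print(names(inner$locals))
```

If the queue does something along these lines, the "Missing values; can't copy" message from namespace 'objects' would be consistent with the stored local not being visible to the worker, but that is an assumption rather than something shown in the error output.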

@richfitz
Member

Hi @jamieRowen - sorry for missing this; I don't see most GitHub notifications as they get a bit overwhelming!

This slightly modified version of your script works for me:

Sys.setenv(R_TESTS = "")
root <- tempfile()
context <- context::context_save(root, sources = "myfuns.R")
context <- context::context_load(context, new.env(parent = .GlobalEnv))
obj <- rrq_controller(context, redux::hiredis())
on.exit(obj$destroy())

## For testing, use: worker_command(obj)
wid <- workers_spawn(obj, timeout = 5, progress = PROGRESS)

t <- obj$enqueue(slowdouble(1))
obj$task_wait(t)

f <- function(x) {
  obj$enqueue(slowdouble(x))
}
t <- f(1)
obj$task_wait(t)

I think the issue is that using saveRDS/readRDS is messing up your environments.
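
To illustrate the general point about environments, here is a minimal base-R sketch. The list `ctx` is only a stand-in for a loaded context (its real structure may differ), and the one-line `slowdouble` is a simplified placeholder. It shows that an environment written with saveRDS and read back with readRDS comes back as a separate copy, not as the original environment.

```r
# Stand-in for a loaded context: a list that carries an environment.
# (Assumed structure for illustration only.)
envir <- new.env(parent = globalenv())
assign("slowdouble", function(x) x * 2, envir = envir)
ctx <- list(envir = envir)

# Round-trip the object through a file, as in the original script.
path <- tempfile(fileext = ".rds")
saveRDS(ctx, path)
restored <- readRDS(path)

# The restored environment is a new object, not the original one.
same_env <- identical(restored$envir, ctx$envir)

# Changes made to the original after saving are invisible in the copy.
assign("y", 1, envir = ctx$envir)
seen_in_copy <- exists("y", envir = restored$envir, inherits = FALSE)

print(c(same_env = same_env, seen_in_copy = seen_in_copy))
```

Both values should come out FALSE. Any code that relies on the context's environment being the very same object it was loaded into can therefore behave differently after a saveRDS/readRDS round trip. Calling context_load again in each process, as in the script above, avoids the round trip.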
