
Dispatcher support for crew #168

Open
wlandau opened this issue Nov 12, 2024 · 25 comments

Comments

@wlandau

wlandau commented Nov 12, 2024

c.f. #163 (comment)

crew relies on features of the process dispatcher:

  • The online, assigned, and complete counters from status(), the latter two of which are cumulative (IIRC they do not reset on saisei()).
  • If a task fails, reassign it to the same listener on retry, even if the worker exited when the task failed and needs to relaunch.

There might be more, but these are the important ones I can think of at the moment.
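
For reference, a minimal sketch of how crew reads these counters under the current dispatcher model; the field and column names here are from memory and may differ across mirai versions:

library(mirai)
daemons(n = 2L)    # process dispatcher enabled by default
s <- status()
s$connections      # number of daemons currently connected
s$daemons          # per-listener counters, including online, assigned, and complete
                   # (assigned and complete accumulate; IIRC they do not reset on saisei())
daemons(0L)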

@wlandau
Author

wlandau commented Dec 1, 2024

  • A third item: the existence of saisei() to rotate listeners

@shikokuchuo changed the title from "threaded dispatcher support for crew" to "Dispatcher support for crew" on Dec 1, 2024
@shikokuchuo
Owner

shikokuchuo commented Dec 1, 2024

Just to note that integrating crew into the new default dispatcher is the more immediate priority. Porting the new architecture across to a thread should be much easier when the time comes.

Of course, mirai will continue to support crew using the existing dispatcher model for as long as needed. There are absolutely no changes required at crew for that.

@shikokuchuo
Owner

Adding: (i) retries and (ii) nextflow-style retries from #163.

@wlandau
Author

wlandau commented Jan 6, 2025

From our conversation today:

  • idletime, maxtasks, and walltime implemented directly in daemon().
  • Verbose daemon-specific messages for the above, e.g. "exited due to idling for 30 seconds".
  • Dispatcher query returns the number of running tasks, the number of queued tasks, and the pipe ID and custom (e.g. crew-specified) ID of each daemon.
  • Dispatcher query returns custom IDs of daemons that connected and then disconnected (which could happen instantaneously between queries).
  • Optionally opt out of automatic task retries.
  • If opting out of retries, relinquish the local copy of the task dependency data when a daemon receives that task. This should hugely improve memory efficiency in targets.
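
To illustrate the first bullet, this is roughly the daemon() call I have in mind once those settings live in daemon() itself. A hypothetical sketch: the exact argument names and the millisecond units are assumptions on my part.

mirai::daemon(
  url = "tcp://10.0.0.1:5555",
  output = TRUE,       # let message() output reach the cluster log files
  idletime = 30000,    # exit after idling for 30 seconds
  walltime = 3600000,  # exit after 1 hour of total runtime
  maxtasks = 100L      # exit after completing 100 tasks
)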

Things to test:

  • Dispatcher queries do not fail when the dispatcher is busy with a task (i.e. serializing or uploading a large payload).
  • A tokenized websocket can share the same port as a TCP socket.

@shikokuchuo
Owner

shikokuchuo commented Jan 7, 2025

Tracking implementation status:

  • idletime, maxtasks, and walltime implemented directly in daemon().
  • Verbose daemon-specific messages for the above, e.g. "exited due to idling for 30 seconds". (Not messages, but a return value)
  • Dispatcher query returns the number of running tasks, the number of queued tasks, and the pipe ID and custom (e.g. crew-specified) ID of each daemon.
  • Dispatcher query returns custom IDs of daemons that connected and then disconnected (which could happen instantaneously between queries).
  • Optionally opt out of automatic task retries. (This is not optional).
  • If opting out of retries, relinquish the local copy of the task dependency data when a daemon receives that task. This should hugely improve memory efficiency in targets.

@shikokuchuo
Owner

Things to test:

  • Dispatcher queries do not fail when the dispatcher is busy with a task (i.e. serializing or uploading a large payload).

The update shouldn't make this situation any worse. All sends have been synchronous for quite some time, so a query would fall either before or after such an event. If dispatcher receives a query, it sends the reply straight away, so it doesn't get blocked doing something else in the interim.

  • A tokenized websocket can share the same port as a TCP socket.

You can create non-dispatcher websocket connections that share one single port. That port needs to be different to the TCP port used by dispatcher, as the websocket sharing must be in the same process (they share the same HTTP server). Note, this is no different to when you use different compute profiles; those would need to be on different ports.
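
In other words, a sketch of the working configuration (ports and profile names are arbitrary): each compute profile runs its own dispatcher, so each one listens on its own port.

# One port per compute profile, since each profile has its own dispatcher:
mirai::daemons(url = "tcp://127.0.0.1:5701", .compute = "profile_a")
mirai::daemons(url = "tcp://127.0.0.1:5702", .compute = "profile_b")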

@shikokuchuo
Owner

  • Verbose daemon-specific messages for the above, e.g. "exited due to idling for 30 seconds".

Also I'm not sure what you had in mind for this, as daemons are by default silent (output sunk to the null device). How would you receive / log this? Does daemon() returning a value help?

@shikokuchuo
Owner

@wlandau with #182 merged, you should have everything you need to start updating crew now. I'm going to proceed with the mirai v2 release, so you can work off that.

@wlandau
Author

wlandau commented Jan 8, 2025

Also I'm not sure what you had in mind for this, as daemons are by default silent (output sinked to the null device). How would you receive / log this? Does daemon() returning a value help?

On clusters and cloud services, users can access logs which contain stdout and stderr printed from individual jobs. Simple message() calls from daemon() would be logged that way. It would be extremely helpful to have these simple messages.

with #182 merged, you should have everything you need to start updating crew now. I'm going to proceed with the mirai v2 release, so you can work off that.

Thanks @shikokuchuo. I will try to get started in the next few days.

@wlandau
Author

wlandau commented Jan 8, 2025

For the upcoming release: crew tests pass using 3cf84b6

@shikokuchuo
Owner

shikokuchuo commented Jan 8, 2025

On clusters and cloud services, users can access logs which contain stdout and stderr printed from individual jobs. Simple message() calls from daemon() would be logged that way. It would be extremely helpful to have these simple messages.

Does crew always set mirai's output = TRUE when logging is enabled? Because if mirai runs within your launcher process, it should have the last word about where to redirect stdout / stderr. I had thought that you logged within crew_eval(), but that doesn't seem to be the case.

The issue I'm trying to get my head around is that, if there is no active diversion, those messages would actually appear in a user's terminal (not desirable). But then, mirai doesn't handle logging itself.

@wlandau
Author

wlandau commented Jan 8, 2025

crew always sets output = TRUE in daemon() (singular) and custom messages from the user's tasks correctly show up in the cluster log files.

@shikokuchuo
Owner

In #183 I've implemented daemon() now returning the reason for termination as an integer exit code.

As mirai doesn't handle logging itself, it would be out of place for it to output messages directly, as people do use it in the terminal with output on. However, based on the return value, the launcher could then choose to log this.

@shikokuchuo
Owner

As the features we discussed have been implemented, I'm going to close this issue to prevent it becoming too long. Feel free to open new ones as you come across things in your implementation. Good luck!

@wlandau
Author

wlandau commented Jan 10, 2025

As mirai doesn't handle logging itself, it would be out of place for it to output messages directly as people do use it in the terminal with output on. However, based on the return value, the launcher could then choose to log this.

Is there a way to interpret these integers as human-friendly informative messages? If so, crew::crew_worker() could detect the integer code and print the message.

@shikokuchuo
Owner

Yes, the return values are documented in the function, but it's probably better that I provide a translation function.

@shikokuchuo reopened this Jan 10, 2025
@shikokuchuo
Owner

Provided through nextcode() in c0d6ecd.
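
A sketch of what this enables on the crew side, assuming nextcode() is exported from mirai and turns the integer that daemon() returns into a human-readable string:

# Hypothetical use inside crew::crew_worker(); url comes from the launcher.
# The cluster captures stdout/stderr, so this message() lands in the job's log file.
code <- mirai::daemon(url = url, output = TRUE)
message("daemon exited: ", mirai::nextcode(code))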

@wlandau
Author

wlandau commented Jan 13, 2025

As part of this migration, is there an opportunity to eliminate the dispatcher altogether? If we're using a different protocol now, can we opt out of round-robin dispatch? Looks like mirai 2.0.0.9000 uses round-robin (details below).

is_resolved <- function(tasks) {
  !as.logical(lapply(tasks, unresolved))
}

print_tasks <- function(tasks, resolved, seconds) {
  text <- sprintf(
    "seconds: %s | odd: %s | even: %s",
    seconds,
    sum(resolved[c(1L, 3L, 5L, 7L)]),
    sum(resolved[c(2L, 4L, 6L, 8L)])
  )
  message(text)
}

test_dispatch <- function(dispatcher) {
  url <- "ws://127.0.0.1:57000"
  daemons(url = url, dispatcher = dispatcher)
  
  # 2 workers
  workers <- replicate(
    2L,
    callr::r_bg(
      func = \(dispatcher) {
        mirai::daemon(url = "ws://127.0.0.1:57000", dispatcher = dispatcher)
      },
      args = list(dispatcher = dispatcher)
    )
  )
  while (status()$connections < 2L) {
    Sys.sleep(0.25)
  }
  
  # Submit 8 tasks in order.
  # Odd-numbered tasks take 10 seconds each,
  # while even-numbered tasks take 1 second each.
  tasks <- list()
  for (index in seq_len(8L)) {
    if (index %% 2L) {
      tasks[[index]] <- mirai(Sys.sleep(10))
    } else {
      tasks[[index]] <- mirai(Sys.sleep(1))
    }
    Sys.sleep(0.25)
  }
  
  # Count how many odd vs even tasks are resolved each second.
  seconds <- 0
  while (!all(resolved <- is_resolved(tasks))) {
    print_tasks(tasks, resolved, seconds)
    Sys.sleep(1)
    seconds <- seconds + 1
  }
  print_tasks(tasks, resolved, seconds)
  daemons(0L)
  list(seconds = seconds, workers = workers, tasks = tasks)
}

library(mirai)
out <- test_dispatch(dispatcher = FALSE)
#> seconds: 0 | odd: 0 | even: 1
#> seconds: 1 | odd: 0 | even: 2
#> seconds: 2 | odd: 0 | even: 3
#> seconds: 3 | odd: 0 | even: 4
#> seconds: 4 | odd: 0 | even: 4
#> seconds: 5 | odd: 0 | even: 4
#> seconds: 6 | odd: 0 | even: 4
#> seconds: 7 | odd: 0 | even: 4
#> seconds: 8 | odd: 1 | even: 4
#> seconds: 9 | odd: 1 | even: 4
#> seconds: 10 | odd: 1 | even: 4
#> seconds: 11 | odd: 1 | even: 4
#> seconds: 12 | odd: 1 | even: 4
#> seconds: 13 | odd: 1 | even: 4
#> seconds: 14 | odd: 1 | even: 4
#> seconds: 15 | odd: 1 | even: 4
#> seconds: 16 | odd: 1 | even: 4
#> seconds: 17 | odd: 1 | even: 4
#> seconds: 18 | odd: 2 | even: 4
#> seconds: 19 | odd: 2 | even: 4
#> seconds: 20 | odd: 2 | even: 4
#> seconds: 21 | odd: 2 | even: 4
#> seconds: 22 | odd: 2 | even: 4
#> seconds: 23 | odd: 2 | even: 4
#> seconds: 24 | odd: 2 | even: 4
#> seconds: 25 | odd: 2 | even: 4
#> seconds: 26 | odd: 2 | even: 4
#> seconds: 27 | odd: 2 | even: 4
#> seconds: 28 | odd: 3 | even: 4
#> seconds: 29 | odd: 3 | even: 4
#> seconds: 30 | odd: 3 | even: 4
#> seconds: 31 | odd: 3 | even: 4
#> seconds: 32 | odd: 3 | even: 4
#> seconds: 33 | odd: 3 | even: 4
#> seconds: 34 | odd: 3 | even: 4
#> seconds: 35 | odd: 3 | even: 4
#> seconds: 36 | odd: 3 | even: 4
#> seconds: 37 | odd: 3 | even: 4
#> seconds: 38 | odd: 4 | even: 4

out <- test_dispatch(dispatcher = TRUE)
#> seconds: 0 | odd: 0 | even: 1
#> seconds: 1 | odd: 0 | even: 1
#> seconds: 2 | odd: 0 | even: 1
#> seconds: 3 | odd: 0 | even: 1
#> seconds: 4 | odd: 0 | even: 1
#> seconds: 5 | odd: 0 | even: 1
#> seconds: 6 | odd: 0 | even: 1
#> seconds: 7 | odd: 0 | even: 1
#> seconds: 8 | odd: 1 | even: 1
#> seconds: 9 | odd: 1 | even: 2
#> seconds: 10 | odd: 2 | even: 2
#> seconds: 11 | odd: 2 | even: 3
#> seconds: 12 | odd: 2 | even: 3
#> seconds: 13 | odd: 2 | even: 3
#> seconds: 14 | odd: 2 | even: 3
#> seconds: 15 | odd: 2 | even: 3
#> seconds: 16 | odd: 2 | even: 3
#> seconds: 17 | odd: 2 | even: 3
#> seconds: 18 | odd: 2 | even: 3
#> seconds: 19 | odd: 3 | even: 3
#> seconds: 20 | odd: 3 | even: 4
#> seconds: 21 | odd: 4 | even: 4

@wlandau
Author

wlandau commented Jan 13, 2025

Even if it isn't possible now, I think the redesign gets us much closer in case NNG supports that someday.

@shikokuchuo
Owner

Looks like mirai 2.0.0.9000 uses round-robin (details below).

You mean in the non-dispatcher case, right? That's still the same req/rep, and it's likely to be retained for the same purpose it serves now: providing a very efficient, lightweight HTTP-server-like solution.

Even if it isn't possible now, I think the redesign gets us much closer in case NNG supports that someday.

You're right, and it is definitely in the pipeline to remove dispatcher (the process) altogether.

@wlandau
Author

wlandau commented Jan 13, 2025

You mean in the non-dispatcher case right?

Yes, in the non-dispatcher case.

That's still the same req/rep and it's likely to be retained...
it is definitely in the pipeline to remove dispatcher (the process) altogether.

Just so I understand: do you mean that the process dispatcher will go away but the threaded dispatcher will replace it?

for the same purpose it serves now in providing a very efficient/lightweight HTTP-server like solution.

Would you elaborate? From my perspective in crew, it seems simpler, more efficient, and much more robust to switch to a non-round-robin protocol instead of implementing a whole new threaded dispatcher.

@wlandau
Author

wlandau commented Jan 17, 2025

The crew redesign is running beautifully, and noticeably faster than before. The only thing I haven't implemented yet is a retry mechanism for tasks. This is going to be harder than I thought because different compute profiles apparently can't share the same TCP port:

mirai::daemons(url = "ws://127.0.0.1:57000/token1", .compute = "token1")
#> [1] 0

mirai::daemons(url = "ws://127.0.0.1:57000/token2", .compute = "token2")
#> Error in mirai::daemons(url = "ws://127.0.0.1:57000/token2", .compute = "token2") : 
#>   initial sync with dispatcher timed out after 10s

@shikokuchuo reopened this Jan 18, 2025
@shikokuchuo
Owner

The crew redesign is running beautifully, and noticeably faster than before.

That's great to hear. The performance gains are probably from a combination of not using websockets and the more efficient dispatcher design, e.g. using a single socket for all daemon connections.

The only thing I haven't implemented yet is a retry mechanism for tasks. This is going to be harder than I thought because different compute profiles apparently can't share the same TCP port:

This is essentially what I mentioned above. The way websockets work is that there is one HTTP server, so everything that shares it would need to be in the same process.

@wlandau
Author

wlandau commented Jan 21, 2025

Will the threaded dispatcher allow two different compute profiles created in the same R process to share a TCP port? If not, should crew switch from ws:// URLs to tcp:// ones for performance and robustness?

Either way, I should probably rethink crew's approach to wlandau/crew.cluster#48. Controller groups are more idiomatic than ad hoc compute profiles.

@shikokuchuo
Owner

I'd make the switch to tcp, for performance and robustness as you say.

Threaded dispatcher could help, but if you really wanted to go that route, you could model that now using non-dispatcher daemons all using ws on another port. I don't think 2 ports vs. 1 will be too onerous.
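
For concreteness, a sketch of that 2-port arrangement (ports, paths, and profile names made up): the main pool under dispatcher over tcp, plus non-dispatcher daemons all sharing one ws port as a second compute profile.

# Main pool: dispatcher daemons over TCP.
mirai::daemons(url = "tcp://127.0.0.1:5701", dispatcher = TRUE, .compute = "main")
# Second pool: non-dispatcher daemons sharing a single websocket port.
mirai::daemons(url = "ws://127.0.0.1:5702/extra", dispatcher = FALSE, .compute = "extra")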
