Signal #374
Conversation
This implements Signals in a publish/subscriber pattern. The main goal is to give somewhat better semantics than pure POSIX signals while still working in scenarios where a native signal is needed (think a debugger and whatnot).

Rationale
~~~~~~~~~

POSIX semantics for multithreading are very weak: signal dispositions remain process-wide while signal masking is pthread-wide, so when a signal arrives for the process PID, any thread can end up handling it. Together with the usual signal semantics of running on whatever altstack and interrupting arbitrary code, this makes it difficult to use signals properly in places like EIO.

I propose a publish/subscriber way of dealing with signals, a bit inspired by how Libuv already handles it: users subscribe to a signal number in a signal box, and this box can be waited on for signals and integrates well with EIO Fibers, blocking and so on.

A signal box (subscription) is always linked to a Switch.t in order to guarantee the subscription is removed when the handler goes out of scope.

Multithreading is allowed: any Domain can create subscriptions. This makes it interesting for things like every domain installing a handler for SIGTERM and then shutting down properly when needed, or handling a SIGHUP and parking everyone until the main domain reconfigures and whatnot.

Signal Numbers
~~~~~~~~~~~~~~

The signals from Sys are in a negative form and the runtime translates them to the system signal number. This would be fine if Sys exported all the signals we want, but it doesn't, so we have to discover our signal numbers. Sys actually works with its own idea of a signal number (the negative) as well as the positive (the system's), with some minor inconveniences.

Since not every system has every signal, signals that are not guaranteed to exist are exposed as an optional type. This forces the user to acknowledge "ok, this is not portable" and also prevents them from using an invalid number. The actual number is hidden from the user as a private int, but can be acquired via Signal.signum_to_int.

Libuv/Luv Implementation
~~~~~~~~~~~~~~~~~~~~~~~~

Libuv provides pretty much the publish/subscriber mechanism already, so we just make use of that. In luv, signals are not really processed in the signal handler; they just schedule something to be processed in the main loop.

Uring Implementation
~~~~~~~~~~~~~~~~~~~~

In uring we implement the publish/subscriber mechanism ourselves. Since a signal can have multiple listeners, we have to be careful to install a new handler when the first listener is added and restore the original one when the last one is removed. We do not do sigprocmask dances, as they become a nightmare due to inheritance.

The signal handler in uring must wake the signal box (an Eio.Stream.t), which means it must go into the scheduler. If we just handled it from the signal handler, we would end up recursing, which means we wouldn't be able to grab locks, which means we wouldn't be able to do Eio.Stream.add.

We solve this with the old pipe trick: the signal handler just writes a byte to a pipe via Unix.write (this must happen outside of EIO). We then have an extra Fiber (like the eventfd listener) running by default that reads from the other end of the pipe, gets the signal and publishes it to all listeners via Eio.Stream.add.
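(For illustration, here is a minimal standalone sketch of that pipe trick. The reader side is just a blocking read standing in for the extra fiber, and all names here are illustrative, not the PR's actual code:)

```ocaml
(* Self-pipe trick: the handler only writes the signal number as one byte;
   everything else happens on the reader side, outside signal context. *)
let r, w = Unix.pipe ~cloexec:true ()

let install signum =
  Sys.set_signal signum
    (Sys.Signal_handle
       (fun _ ->
          let b = Bytes.make 1 '\000' in
          Bytes.set_uint8 b 0 signum;
          ignore (Unix.write w b 0 1)))

(* Stand-in for the extra fiber: read one byte, recover the signal number,
   and then publish it to the subscribed boxes. *)
let read_one () =
  let b = Bytes.create 1 in
  assert (Unix.read r b 0 1 = 1);
  Bytes.get_uint8 b 0
```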
Why Not signalfd
~~~~~~~~~~~~~~~~

I actually wrote one version with signalfd, but it's a pretty terrible API if you need multithreading: to use it you basically have to mask all signals on every Domain, which means you have to decide beforehand which ones you will mask, possibly killing uses like catching SIGSEGV/SIGSTOP. The pipe trick gives us what we wanted from signalfd and is also portable if we want it in other backends. I still have the signalfd code in a branch if anyone wants it.

There's a half-decent rant here: https://ldpreload.com/blog/signalfd-is-useless

Why Stream.addcanfail
~~~~~~~~~~~~~~~~~~~~~

I've added addcanfail to Stream because signals can be lost: we don't want to block the sender, and we also don't want to be forking senders and/or buffering signals indefinitely. Actual POSIX signals can only be pending; there are never "two SIGINT" to be processed, two SIGINTs would be coalesced into one. We do a bit better: with the pipe trick we end up buffering signals, so two SIGINTs really are two SIGINTs, bounded by the pipe buffer size. Since draining the pipe does not depend on the user calling anything, it's virtually impossible to saturate it.

We still need addcanfail because the pipe reader cannot block on publish: imagine you have multiple signals but the user never reads from one of the boxes; if the pipe reader blocked trying to send, it would not publish anything else.

Worth noting that libuv as well as libevent do exactly the same, as it's an old solution.
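(A minimal sketch of that publish step. `add_nonblocking` is passed in as a parameter and stands in for the addcanfail operation; its exact name and signature in this PR may differ:)

```ocaml
(* Publish a received signal to every subscribed box without ever blocking.
   [add_nonblocking] is assumed to return [false] (dropping the item)
   instead of suspending the caller when the stream is full. *)
let publish ~add_nonblocking boxes signum =
  List.iter
    (fun box ->
       if not (add_nonblocking box signum) then
         (* The subscriber hasn't drained its box. Dropping mirrors POSIX
            coalescing of pending signals; blocking here would stall
            delivery to every other box. *)
         ())
    boxes
```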
Also cc #301
Well, that's annoying! The
This PR seems a bit ad-hoc, though. Perhaps we can start with something simpler. The basic building block we need is the ability to schedule something to run in the main event loop, e.g.

```ocaml
let enqueue fn =
  Lf_queue.push st.run_q (Fn fn);
  if Atomic.get st.need_wakeup then wakeup st
```

where

```ocaml
| Some Fn fn -> fn (); schedule st
```

Then you could handle signals like this:

```ocaml
open Eio.Std

let () =
  Eio_main.run @@ fun env ->
  let enqueue = (* get it somehow *) in
  let handler (_ : int) =
    (* (running in signal handler) *)
    enqueue @@ fun () ->
    (* (called from event loop) *)
    traceln "Interrupt!"
  in
  Sys.(set_signal sigint) (Sys.Signal_handle handler);
  traceln "Sleeping for 5 seconds...";
  Eio.Time.Mono.sleep env#mono_clock 5.
```

That would give people the flexibility to try different schemes, e.g. they could set a flag and signal a condition if it was previously clear. Possibly we should store (note:
What do you mean by ad-hoc, specifically?
This could also be done with a Stream, and it was one of the ideas, but it's a bit tricky: you either have to fork a fiber indefinitely so the handler can block freely, or you constrain the handler to not be able to block, which is kinda bad. I like the idea of a generic "please, domain X, run Y for me", but the semantics need to be defined and it's not something users should have to learn. Most of my design is about freeing the user from any constraint and also making sure we're bounded and not forking things indefinitely. I don't like the idea of the user having to learn different contexts; most of EIO is waiting for things, so why not channel signals through the same mechanism they are already used to?
Not sure I follow this, but I assume you don't mean this:
That's not entirely true, as the pipe is only closed after all fibers have finished, basically when the domain exits. Also, closing the pipe would just trigger an error on the write, which would not lead to any damage. The recursive mutex is a real issue (wakeup recursing into the mutex) and I don't see how we could start with that. I fail to see what's wrong with the pipe, but if that is not wanted, the proper solution is an Atomic.t that is modified by the signal handler and pokes the main loop via eventfd, and we would check that Atomic.t at every loop iteration. I prefer the pipe, though.
Not sure if I was clear, but you can do things like
It can, though, because another domain could open a different file reusing the FD. Then writing to the pipe will corrupt that file. This is why
There are other ways of solving this problem. I think the easiest is to keep a free list of eventfds instead of closing them. Then you might just get a spurious wakeup, which doesn't matter.
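(A rough sketch of that free-list idea: a mutex-guarded pool of descriptors, with the eventfd constructor left abstract; all names here are illustrative:)

```ocaml
(* Keep retired eventfds on a free list instead of closing them, so a stale
   write from a signal handler can at worst cause a spurious wakeup and can
   never hit an unrelated file that reused the descriptor. *)
type pool = {
  mutex : Mutex.t;
  mutable free : Unix.file_descr list;
  make : unit -> Unix.file_descr;  (* e.g. the backend's eventfd constructor *)
}

let create make = { mutex = Mutex.create (); free = []; make }

let get t =
  Mutex.lock t.mutex;
  let reused =
    match t.free with
    | fd :: rest -> t.free <- rest; Some fd
    | [] -> None
  in
  Mutex.unlock t.mutex;
  match reused with
  | Some fd -> fd
  | None -> t.make ()

(* Instead of [Unix.close]: park the fd for reuse. *)
let release t fd =
  Mutex.lock t.mutex;
  t.free <- fd :: t.free;
  Mutex.unlock t.mutex
```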
I mean it's doing a load of complicated stuff that seems only useful for some specific use. Ideally, I think it would look like this:

```ocaml
open Eio.Std

let () =
  Eio_main.run @@ fun _ ->
  let interrupted = Eio.Condition.create () in
  Sys.(set_signal sigint) (Sys.Signal_handle (fun _ -> Eio.Condition.broadcast interrupted));
  Fiber.first
    (fun () -> Eio.Condition.await_no_mutex interrupted; traceln "Interrupted!")
    (fun () ->
       traceln "Running job (Ctrl-C to cancel)";
       Fiber.await_cancel ()
    );
  traceln "Done"
```

Note that the above doesn't require any new APIs at all. It almost works already; the only problem is that
Ack, true, I was mixing up with channels, where there is a flag for closed, but my point stands: see below.
I don't see it as a specific use tbh; I see it as the very general way of handling signals. It avoids exposing anything that is not safe to the user, while profiting from the same patterns they would use to wait for IO.
Not only that, Condition is a bit more complex: you'd either have to have one condition per listener, or keep flipping the condition state all the time. Would each listener flip its own condition to wait for the next signal? Would we accumulate? (I need to read Condition better, but so far that's what I see.) IIRC our Condition implementation is more suitable for "I'm going to wait for this to happen and I'm done". But do you want to expose all that to the user, or would it be hidden? There is virtually nothing Eio-wise they can do safely from an actual signal handler; I see it as a step backwards to expose something they will shoot themselves in the foot with.
…on, to make the pipe always local to the Domain. This also fixes the race between the domain exiting and a handler writing to the output pipe, as the handler will now match and get 'None' instead of seeing the opened pipe.
I'm not sure what you mean. Any number of fibers can wait on a condition. e.g. with two fibers:
I get:
Also, it doesn't matter if the domain where the signal was registered exits or is busy when the signal is delivered. (the condition we're waiting for is "more interrupts have been received than when we started waiting", but we don't need to keep a count or check it, since the condition will always be true when the await returns)
I was referring to keeping the "interrupt signaled" state somewhere, since you don't want to lose signals while there are no listeners. In your example (IIUC), if the broadcast happens before the waiter is on
Well, I do want that. Ctrl-C should cancel the jobs that are running when I press it, not any future jobs I start later. If I press Ctrl-C three times trying to kill one job, it shouldn't then kill the next two as well. Of course, if I wanted that then I could use a stream instead.
There might be cases where you want that, but it's not wanted in this job example. Pressing Ctrl-C a second time while one job is cancelling will cancel any new jobs started since then, but not jobs started after the second Ctrl-C. The point is, the user needs to be able to choose the handling scheme that makes sense for their use.
That's why my buffering is done per box, and not per signal. A
Yes, but I don't see how my scheme doesn't provide them with both schemes.
BTW, here's the config-reload example using Eio.Condition (^C to reload, for easier testing):

```ocaml
open Eio.Std

let () =
  Eio_main.run @@ fun _ ->
  let interrupted = Eio.Condition.create () in
  Sys.(set_signal sigint) (Sys.Signal_handle (fun _ -> Eio.Condition.broadcast interrupted));
  while true do
    Fiber.both
      (fun () -> Eio.Condition.await_no_mutex interrupted)
      (fun () ->
         traceln "Reading configuration...";
         Eio_unix.sleep 2.0;
         traceln "Finished reading configuration";
      )
  done
```

(we reload the configuration when we've both finished the previous load and received a signal since that started)

(line-breaks added for clarity)
Ack, that works, but my points still stand:

```ocaml
while true do
  Eio.Condition.await_no_mutex interrupted;
  traceln "Reading configuration...";
  Eio_unix.sleep 2.0;
  traceln "Finished reading configuration";
done
```

Which is wrong, as they will only see interrupts every two seconds. I don't see why we should burden the user with semantics that are complex (imho) and more error-prone. This is how it looks currently:

```ocaml
let case_one () =
  Eio_main.run @@ fun _ ->
  Eio.Switch.run @@ fun sw ->
  traceln "case_one";
  let interrupted = Eio.Signal.(subscribe ~sw sigint) in
  while true do
    Eio.Signal.wait interrupted;
    traceln "Reading configuration...";
    Eio_unix.sleep 2.0;
    traceln "Finished reading configuration";
  done

let case_two () =
  Eio_main.run @@ fun env ->
  traceln "case_two";
  Eio.Fiber.first
    (fun () -> Eio.Flow.single_read (Eio.Stdenv.stdin env) (Cstruct.create 1) |> ignore)
    (fun () -> Eio.Signal.(wait_one sigint))
  (* as a bonus this is clean, the handler is not installed in the system anymore *)
```

I don't wanna be a pain in the ass (although I am and I blame my parents), but I fail to see how your model is preferable. The recent PR #381 is nice and I can cut the pipe, although I'll have to
Sometimes though there are other things you want to do, such as updating an atomic counter, or dumping debug info (which might not be completely safe, but still good enough for debugging why your signal handler isn't waking the main loop). But whether we provide a high-level API or not, we can still implement it this way without needing special support in the backends.
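(For example, a handler that only bumps an atomic counter needs nothing from the backend at all; a trivial sketch, names illustrative:)

```ocaml
(* Count interrupts without touching the scheduler at all. *)
let interrupts = Atomic.make 0

let () =
  Sys.(set_signal sigint) (Sys.Signal_handle (fun _ -> Atomic.incr interrupts))

(* Any fiber (or thread) can read the count whenever it likes. *)
let interrupts_seen () = Atomic.get interrupts
```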
I don't think there is. You can certainly argue that's a bit subtle and should be wrapped in something. However, if so, we should be providing an API that wraps Eio.Condition (not just signals). There are other places where we need to recalculate something when a condition changes. For example, OCurrent needs to recalculate when it receives a web-hook, including if it gets another web-hook event while recalculating.
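(A possible shape for such a wrapper, just abstracting the Fiber.both pattern from the reload example above; the name is illustrative:)

```ocaml
(* Rerun [f] whenever [cond] is broadcast, coalescing broadcasts that
   arrive while [f] is still running (same shape as the reload loop above). *)
let on_each_change cond f =
  while true do
    Eio.Fiber.both
      (fun () -> Eio.Condition.await_no_mutex cond)
      f
  done
```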
I disagree with that. My version uses only OCaml signals and Eio.Condition, which are both things you might know anyway. Sigbox is a new abstraction, used only for signals. And I think it is quite surprising:
I think there is. Remember, the broadcast happens from any domain, and since there is no state being kept, lost wakeups [1] can occur at any time the signal is broadcast before or after someone waits:

```ocaml
let () =
  Eio_main.run @@ fun _ ->
  let interrupted = Eio.Condition.create () in
  Sys.(set_signal sigint) (Sys.Signal_handle (fun _ -> Eio.Condition.broadcast interrupted));
  (* -----> signal broadcast here is lost since there is no one waiting *)
  while true do
    (* -----> signal broadcast here is lost since there is no one waiting *)
    Fiber.both
      (fun () -> Eio.Condition.await_no_mutex interrupted)
      (fun () ->
         traceln "Reading configuration...";
         Eio_unix.sleep 2.0;
         traceln "Finished reading configuration";
      )
    (* -----> signal broadcast here is lost since there is no one waiting *)
  done
```

The wait/broadcast dance needs to keep state, not just get a signal while it happens to be waiting. That's what I meant earlier about having to keep state: you need pretty much the example documented in condition.mli (adapted to Mutex), which correctly keeps state:
```ocaml
let x = ref 0
let cond = Eio.Condition.create ()
let mutex = Eio.Mutex.create ()

let set_x value =
  Eio.Mutex.use_rw ~protect:false mutex (fun () -> x := value);
  Eio.Condition.broadcast cond

let await_x p =
  Eio.Mutex.use_ro mutex (fun () ->
      while not (p !x) do (* [x] cannot change, as mutex is locked. *)
        Eio.Condition.await ~mutex cond (* Mutex is unlocked while suspended. *)
      done
    )
```
This is incorrect. Each Fiber can register for signals with a sigbox, and this Fiber can be in whatever Domain. The first sigbox that is created installs the hard handler, which doesn't require any action from the user code.

This is incorrect: each sigbox is independently buffered; the hard handler never blocks on write, or waits for anyone, if the user creates a sigbox and never

This is incorrect. If the switch containing the sigbox ends, that sigbox is destroyed; it has no impact on other sigboxes or further signals of that type. If this was the last sigbox, the hard handler is unregistered.

[1] https://docs.oracle.com/cd/E19120-01/open.solaris/816-5137/sync-30/index.html
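(To illustrate that independence, a sketch reusing the API from case_one above; behaviour as described in this PR, not tested here:)

```ocaml
open Eio.Std

let two_subscribers () =
  Eio_main.run @@ fun _ ->
  Fiber.both
    (fun () ->
       Switch.run @@ fun sw ->
       let box = Eio.Signal.(subscribe ~sw sigint) in
       Eio.Signal.wait box;
       traceln "short-lived subscriber: got one SIGINT"
       (* its switch ends here: only this box is removed *))
    (fun () ->
       Switch.run @@ fun sw ->
       let box = Eio.Signal.(subscribe ~sw sigint) in
       Eio.Signal.wait box;
       Eio.Signal.wait box;  (* independently buffered: a second ^C reaches here *)
       traceln "long-lived subscriber: got two SIGINTs")
```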
It doesn't matter if the signal is lost in any of the places you marked. If we're at any of those points then we haven't yet started reading the configuration, so we will read the configuration after the signal was sent, as required.

```ocaml
let () =
  Eio_main.run @@ fun _ ->
  let interrupted = Eio.Condition.create () in
  Sys.(set_signal sigint) (Sys.Signal_handle (fun _ -> Eio.Condition.broadcast interrupted));
  (* -----> signal broadcast here is lost since there is no one waiting *)
  (* -----> doesn't matter; we're about to enter the loop and read the configuration anyway *)
  while true do
    (* -----> signal broadcast here is lost since there is no one waiting *)
    (* -----> doesn't matter; we're about to start reading anyway *)
    Fiber.both
      (fun () -> Eio.Condition.await_no_mutex interrupted)
      (fun () ->
         traceln "Reading configuration...";
         Eio_unix.sleep 2.0;
         traceln "Finished reading configuration";
      )
    (* -----> signal broadcast here is lost since there is no one waiting *)
    (* -----> doesn't matter; we're about to loop and read the config again anyway *)
  done
```
I'm looking at this code in this PR:

```ocaml
let subscribe signum box =
  match Domain.DLS.get signal_pipe_key with
  | None -> ()
  | Some pipe ->
    with_signal (fun () ->
        let ol = Atomic.get sigmap.(signum) in
        let first = ol = [] in
        Atomic.set sigmap.(signum) (box :: ol);
        if first then
          Sys.set_signal signum
            (Sys.Signal_handle
               (fun _sysnum ->
                  let b = Bytes.create 1 in
                  Bytes.set_uint8 b 0 signum;
                  let n = Unix.single_write (snd pipe) b 0 1 in
                  assert (n = 1))))
```

Let's say:
I have to disagree here; it does matter. The configuration file changed again and the signal happened while you were in the blank spots: you miss the event. That's the whole point of signals being level-triggered and not edge-triggered. Your code makes them all edge-triggered.
Yes, if you mean that the signal can be delayed until the receiving Domain blocks on a Fiber, then yes, it is delayed. If this is a problem, then the whole assumption of a concurrent scheduler is broken and we should delete it all and make everything preemptive through SIGALRM tricks :).
Perhaps you could give an example sequence of actions where it doesn't work? If the user changes the config file and then we reach one of the spots you indicated, we will certainly read the updated configuration (in fact, we'll do that even if the user forgets to send a signal at all, in those specific cases). The only difficult case is where a signal occurs after we start reading the configuration and before we start watching for the signal. But that can't happen because we have already started watching for the signal before starting to read the config.
I would expect this to work (and it will, with Eio.Condition and lock-free waiters). In the example I gave:
I can't, but you end up processing the file twice for one signal, no?

1 - user changed the configuration
What I might be missing is that in your example, after the start, one Fiber has always finished, but the one waiting for the signal has not, so Fiber.both is always half complete.
Pokes at #321