
[aDAG] support buffered input #47272

Merged on Sep 10, 2024 (33 commits)

Conversation

rkooo567
Contributor

@rkooo567 rkooo567 commented Aug 22, 2024

Why are these changes needed?

Based on https://docs.google.com/document/d/1Ka_HFwPBNIY1u3kuroHOSZMEQ8AgwpYciZ4n08HJ0Xc/edit

When there are many in-flight requests (pipelining inputs to the DAG), two problems occur.

  • Input submitter timeout. InputSubmitter.write() waits until the buffer is read by downstream tasks. Since the timeout clock starts as soon as InputSubmitter.write() is called, when there are many in-flight requests the later requests are likely to time out.
  • Pipeline bubble. The output fetcher doesn’t read the channel until CompiledDagRef.get is called. This means the upstream task (actor 2) is blocked until .get is called from the driver, even though it could be executing tasks.

This PR solves the problem by providing multiple buffers per shm channel. Note that buffering is not supported for nccl yet (we can add it when we overlap compute/comm).

Main changes

  • Introduce BufferedSharedMemoryChannel, which creates multiple buffers per channel (10 by default). Reads and writes proceed in a round-robin manner (see the sketch after this list).
  • When there are more in-flight requests than the buffer size, the DAG can still hit a timeout error. To make debugging easy and the behavior straightforward, we introduce a max_buffered_inputs_ argument. If more than max_buffered_inputs_ requests are submitted to the DAG without ray.get, it immediately raises an exception.
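
To illustrate, here is a minimal sketch of the round-robin idea (hypothetical, simplified names; `Channel` stands in for the real shm channel, and this is not the actual implementation):

    from typing import Any, List, Optional

    class Channel:
        """Stand-in for a single-buffer shm channel with blocking read/write."""
        def write(self, value: Any, timeout: Optional[float] = None) -> None: ...
        def read(self, timeout: Optional[float] = None) -> Any: ...

    class BufferedChannel:
        """Fans writes/reads out over N inner channels in round-robin order."""
        def __init__(self, num_buffers: int = 10) -> None:
            self._channels: List[Channel] = [Channel() for _ in range(num_buffers)]
            self._next_write_index = 0  # advanced only by the writer
            self._next_read_index = 0   # advanced only by the reader

        def write(self, value: Any, timeout: Optional[float] = None) -> None:
            # A slow reader blocks the writer only after all slots are in flight.
            self._channels[self._next_write_index].write(value, timeout)
            self._next_write_index = (self._next_write_index + 1) % len(self._channels)

        def read(self, timeout: Optional[float] = None) -> Any:
            value = self._channels[self._next_read_index].read(timeout)
            self._next_read_index = (self._next_read_index + 1) % len(self._channels)
            return value

Because the reader drains buffers in the same order they were written, up to num_buffers requests can be in flight before a write has to wait.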

Q&A

  • What's going to happen if we don't have max_buffered_inputs_?
    • In this case, users can have more than max_buffered_inputs_ in-flight requests without a timeout. But when a timeout actually occurs, it is very difficult to tell users whether it is due to the pipeline being full or to other errors (such as a deadlock). By limiting max_buffered_inputs_ at the DAG level, we can tell users clearly which case applies. Users can simply increase max_buffered_inputs_ if they want to submit more tasks (see the usage sketch after this list).
  • Why do you raise an exception from the DAG when max_buffered_inputs_ is full, instead of per channel?
    • I tried that approach at first, but it was difficult to reliably raise a max-capacity exception because of many race conditions. It complicated the PR, so I decided to keep it simple.
  • Why did you choose the many-channel approach over the additional-thread approach?
    • An additional thread with Python's queue introduces very high overhead (almost 30us just to coordinate synchronization), and the thread synchronization would have to be implemented in C++. I thought it is much simpler to use multiple channels instead.
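
To make the capacity behavior concrete, a hypothetical driver-side example (the knob defaults and the exception name below are taken from this thread and were renamed during review, so treat them as illustrative rather than the final API):

    import ray
    from ray.dag import InputNode

    @ray.remote
    class Worker:
        def process(self, x):
            return x + 1

    actor = Worker.remote()
    with InputNode() as inp:
        dag = actor.process.bind(inp)

    compiled_dag = dag.experimental_compile()  # assume 10 buffers per shm channel

    # Submitting more requests than the configured capacity without consuming
    # results raises immediately, instead of timing out much later.
    refs = [compiled_dag.execute(i) for i in range(10)]
    try:
        compiled_dag.execute(10)  # 11th in-flight request
    except ray.exceptions.RayChannelBufferAtMaxCapacity:
        results = [ray.get(ref) for ref in refs]  # drain before submitting more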

Related issue number

Closes #47097
Closes #43826

Checks

  • I've signed off every commit (by using the -s flag, i.e., git commit -s) in this PR.
  • I've run scripts/format.sh to lint the changes in this PR.
  • I've included any doc changes needed for https://docs.ray.io/en/master/.
    • I've added any new APIs to the API Reference. For example, if I added a
      method in Tune, I've added it in doc/source/tune/api/ under the
      corresponding .rst file.
  • I've made sure the tests are passing. Note that there might be a few flaky tests, see the recent failures at https://flakey-tests.ray.io/
  • Testing Strategy
    • Unit tests
    • Release tests
    • This PR is not tested :(

@rkooo567 added the `go` label (add ONLY when ready to merge, run all tests) on Aug 22, 2024
Comment on lines 511 to 513
if not self._background_thread_started:
    self.start()
    self._background_thread_started = True
Contributor

Do we really need additional threads per channel? Can we just use the caller's thread to do read and write?

Contributor Author

if there's a clever way to make it non-blocking...



# @DeveloperAPI
class BufferedRemoteChannel(ChannelInterface):
Contributor

Maybe this is just a draft PR, but wondering why you call it a remote channel

Contributor Author

The intra-process channel doesn't need this mechanism because it is read/written immediately.

    self.start()
    self._background_thread_started = True

self.queue.put((value, timeout))
Contributor

Not sure if buffering at the Python level would work. I was thinking the buffering probably happens at the C++ level.

Contributor Author

At least self.queue.put seems too slow. Exploring other approaches now.
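
For context, the approach being abandoned here looked roughly like this (a sketch under the assumptions visible in the diff: a per-channel background thread lazily started on the first write, draining a Python queue):

    import queue
    import threading
    from typing import Any, Optional

    class ThreadBufferedWriter:
        """Rejected design: buffer writes in a queue.Queue and drain them on a
        background thread. The put/get synchronization alone costs tens of
        microseconds per item, which motivated the multi-channel design."""

        def __init__(self, channel) -> None:
            self._channel = channel
            self._queue: queue.Queue = queue.Queue()
            self._background_thread_started = False

        def write(self, value: Any, timeout: Optional[float] = None) -> None:
            if not self._background_thread_started:
                self._thread = threading.Thread(target=self._drain, daemon=True)
                self._thread.start()
                self._background_thread_started = True
            self._queue.put((value, timeout))  # returns immediately, no backpressure

        def _drain(self) -> None:
            while True:
                value, timeout = self._queue.get()
                self._channel.write(value, timeout)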

@rkooo567
Contributor Author

@ruisearch42 @kevin85421 the PR is ready for a review

) -> ChannelInterface:
    """Generic actor method to allocate an output channel.

    Args:
        reader_and_node_list: A list of tuples, where each tuple contains a reader
            actor handle and the node ID where the actor is located.
        typ: The output type hint for the channel.
        num_shm_buffers: The number of shared memory buffer per channel.
            It is currently ignored for nccl channel.
Contributor

nit: remove "currently".
If the variable is called shm buffers, then it should never affect nccl.

Comment on lines 492 to 494
exhausted. The caller of the API should guarantee to consume buffers
before they are exhausted, otherwise write/read raises
`RayChannelTimeoutError`.
Contributor

Maybe saying something like the following is more accurate:
If the buffer is full for writes or empty for reads and the operation blocks longer than configured timeouts, RayChannelTimeoutError will be raised.

Contributor Author

updated accordingly.

Comment on lines 546 to 547
max_buffered_inputs: The maximum number of in-flight requests that
    are allowed to be buffered. Before submitting more requests,
Contributor

Hmm, this is not the max number of in-flight requests but rather the number of buffers per channel. For example, if there are two channels, then the max in-flight requests is at least 2 * channel_num_buffers.

Contributor Author

Actually, I'd like to keep the semantics of max_in_flight_requests. What about:

  • we change it to num_in_flight_requests
  • we use num_in_flight_requests * 2 as the buffer size, only for the shm channel?

Contributor

It would be even better if we could avoid adding the new config and just use self._max_buffered_results. They are trying to accomplish the same thing, I believe, just whether we buffer on the input or output.

Contributor

Also, it's okay if the max buffered results is conservative, i.e. the DAG has more capacity than what is requested through max buffered results. This can always be improved later.

For now, I would say the best is to determine the number of buffers per channel to use based on the existing max_buffered_results config (which we can rename to something like max_inflight_executions) and additionally add a "global memory limit" for all buffers' sizes, in case the user puts something very high like 1M max in-flight executions.

Contributor Author

Also, it's okay if the max buffered results is conservative, i.e. the DAG has more capacity than what is requested through max buffered results. This can always be improved later.

yeah +1 on this! And the plan we discussed exactly aligns with what you suggested (rename it to max_inflight_executions and set the buffer size the same, which is the most conservative config)

Contributor Author

For the global limit, let me follow up in another PR actually. I tried it, and it involves some extra logic, so I think it is cleaner to separate it. (I don't think anyone is going to increase this very high yet, so it should be okay.)

Contributor

Sounds great! How about just filing issues for now?

  • global memory limit for buffers
  • dynamically allocating additional buffers

@@ -1542,6 +1562,18 @@ def run(self):
    monitor.start()
    return monitor

def raise_if_too_many_inflight_requests(self):
    num_in_flight_requests = self._execution_index - self._max_execution_index
    if num_in_flight_requests > self._max_buffered_inputs:
Contributor

The comparison would be more involved: max_num_in_flight_requests is not max_buffered_inputs; it is also related to the number of channels.

See the other comment as well.

Contributor Author

it is cleaner with a new name max_num_in_flight_requests

@@ -77,6 +77,7 @@ def create_channel(
    self,
    writer: Optional["ray.actor.ActorHandle"],
    reader_and_node_list: List[Tuple["ray.actor.ActorHandle", str]],
    num_shm_buffers: Optional[int] = None,
Contributor

At the channel constructor, we can just call it num_buffers to be more general and not refer to a specific implementation of shared memory. We can still assert this is None for nccl.

Contributor Author

Instead of this, I moved it to a constructor arg of SharedMemoryType, so it only applies to the shm channel. I currently don't see the need for buffering in nccl. We can change it in the future if needed.

def raise_if_too_many_inflight_requests(self):
    num_in_flight_requests = self._execution_index - self._max_execution_index
    if num_in_flight_requests > self._max_buffered_inputs:
        raise ray.exceptions.RayChannelBufferAtMaxCapacity(
Member

What happens if the condition self.num_in_flight_requests > self._max_buffered_inputs is met? Will it cause a deadlock, or is it possible for the execution to still finish but slower?

If it is still possible to finish, maybe we should consider using a warning message instead of raising an exception. Consider the case where a job has already been running for a long time but finally fails because it executes too frequently in the last few steps.

Contributor Author

Then the semantics are the same as before: it can time out depending on how many channels are available. (This is the problem we want to fix; basically problem 1 in this doc https://docs.google.com/document/d/1Ka_HFwPBNIY1u3kuroHOSZMEQ8AgwpYciZ4n08HJ0Xc/edit#heading=h.8j5v3i9ykgsy.)

This raises when .execute() is called, so the problem you mentioned doesn't actually occur.

# output. The CPU memory overhead per shared memory channel is
# DEFAULT_BUFFER_SIZE_BYTES * DEFAULT_MAX_BUFFERED_INPUTS even when channel is unused.
# There's no additional memory impact on Nccl channels.
DEFAULT_MAX_BUFFERED_INPUTS = int(os.environ.get("RAY_DAG_max_buffered_results", 10))
Member

Suggested change
DEFAULT_MAX_BUFFERED_INPUTS = int(os.environ.get("RAY_DAG_max_buffered_results", 10))
DEFAULT_MAX_BUFFERED_INPUTS = int(os.environ.get("RAY_DAG_max_buffered_inputs", 10))

Contributor Author

oops good catch

self._channels = [
    Channel(writer, reader_and_node_list, typ) for _ in range(num_shm_buffers)
]
self._next_write_index = 0
Member

Can you add some comments to explain how _next_write_index works? Maybe we could use self.execution_index instead and increment it whenever write or read is called on the writer and reader. When retrieving the buffer, we can use execution_index % num_shm_buffers. Using execution_index will also make debugging easier.

Contributor Author

Are you saying we should pass self.execution_index from a caller?
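
For reference, the reviewer's suggestion would look roughly like this (a hypothetical sketch, assuming a monotonically increasing execution index is tracked by or passed to the channel):

    # Derive the buffer slot from the execution index instead of keeping a
    # separate cursor, so the slot in use is visible next to the execution
    # index when debugging.
    class BufferedChannel:
        def __init__(self, channels):
            self._channels = channels
            self._execution_index = 0  # incremented on every write (writer side)

        def write(self, value, timeout=None):
            channel = self._channels[self._execution_index % len(self._channels)]
            channel.write(value, timeout)
            self._execution_index += 1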

@@ -470,39 +471,6 @@ def test_chain_dag(ray_start_regular, num_actors):
    compiled_dag.teardown()


def test_execution_timeout(ray_start_regular):
Contributor Author

The test is no longer relevant because it relies on buffering not being possible.

Contributor

Can we keep it while disabling buffering?

Contributor Author

It is not possible to disable buffering; it would raise an exception in this case.

Contributor Author

the test should be done by test_channel.py

@rkooo567
Contributor Author

Discussed offline.

  1. max_buffered_inputs is more like the capacity of the DAG. I will change the name to max_in_flight_requests. We are using the most conservative capacity heuristic (the same as the number of shm buffers), which has no timeout in any case.
  2. num_shm_buffer is specific to shm. I will move the logic to SharedMemoryType.

Contributor

@stephanie-wang stephanie-wang left a comment

Looking good, I agree with the approach of having multiple channels, and it'd be great if we can later allocate them dynamically too. Will take another look once the current comments are addressed!

@@ -1036,6 +1055,7 @@ def _get_or_compile(
    self,
    reader_and_node_list,
    typ=type_hint,
    num_shm_buffers=self._max_buffered_inputs,
Contributor

Does the num buffers apply only to the DAG input, or to all channels in the DAG?

Contributor Author

It is applied to all channels. This way, there's an additional benefit: the last stage is not blocked when ray.get is not called.

@@ -109,6 +110,8 @@ def create_channel(
writer: The actor that may write to the channel. None signifies the driver.
reader_and_node_list: A list of tuples, where each tuple contains a reader
    actor handle and the node ID where the actor is located.
num_shm_buffers: The number of shared memory buffer per channel.
    It is currently ignored for nccl channel.
Contributor

Instead of adding this note, just raise NotImplementedError for NCCL channels.

@@ -480,6 +484,71 @@ def close(self) -> None:
    self._worker.core_worker.experimental_channel_set_error(self._reader_ref)


@DeveloperAPI
class BufferedSharedMemoryChannel(ChannelInterface):
Contributor

Why a new class instead of extending Channel?

Contributor Author

I don't think we really need to inherit any implementation, so I made a new class.

Contributor

Oh I meant just to add directly to Channel class, instead of making a new class or subclass.

Contributor Author

oh I see. I found it a little cleaner to implement it this way. But no strong opinion if you prefer that way. Let me know if you want me to change this!

Contributor

Hmm I see, yeah that makes sense. Initially, I was thinking that having too many different classes gets a bit complicated once we start wanting to wrap or subclass multiple of them. But for now this seems fine, probably we should just refactor ChannelInterface at some point...

@rkooo567 rkooo567 requested a review from a team as a code owner August 28, 2024 06:19
Contributor Author

@rkooo567 rkooo567 left a comment

@stephanie-wang @ruisearch42 @kevin85421 every comment addressed. Let's try merging it today if possible!

  • Now we are using max_in_flight_requests.
  • As discussed, we set the buffer size to max_in_flight_requests, which is the most conservative policy, but there are no false-positive timeouts.
  • The shm_buffer arg moved to the constructor of SharedMemoryType.

Comment on lines +536 to +537
# A single channel is not supposed to read and write at the same time.
assert self._next_read_index == 0
Contributor

QQ: why is the value 0 special here? Should we just check if the write index equals the read index?

Contributor Author

One channel doesn't call both read and write. So if your channel writes, there's no read.

Contributor

How about instead moving this logic to ensure_registered_as_writer/reader? You can initialize the indices to -1, and check the other index in those methods.

Contributor Author

I tried this, but I feel it is not the best approach because technically we don't require ensure_* to be called before read/write. So I feel asserting in read/write is actually better. Let me know if you disagree with this; I will follow up in another PR.

        self._next_write_index %= self._num_shm_buffers

    def read(self, timeout: Optional[float] = None) -> Any:
        """Read a value to a channel.
Contributor

nit: from a channel

ujjawal-khare pushed a commit to ujjawal-khare-27/ray that referenced this pull request Oct 15, 2024
Signed-off-by: ujjawal-khare <[email protected]>
ujjawal-khare pushed a commit to ujjawal-khare-27/ray that referenced this pull request Oct 15, 2024
…project#47320)

When users do read(timeout=0) or write(timeout=0), one may expect it to return immediately if the buffer is readable/writable. That is not true in some cases (I found this from ray-project#47272).

The root cause is that we apply the timeout when we acquire headers. Even when a buffer is writable (its object buffer is not acquired), write(timeout=0) can fail if there are readers trying to acquire a header, because we apply the same timeout when we acquire the header. This PR fixes the issue by not applying the timeout in this case.

This is okay because acquiring the header takes a very short time, and it is immediately released.

Signed-off-by: ujjawal-khare <[email protected]>
Labels
go add ONLY when ready to merge, run all tests