[RLlib] Decentralized multi-agent learning; PR #01 #21421

Merged

Conversation

sven1977
Contributor

@sven1977 sven1977 commented Jan 6, 2022

Decentralized multi-agent learning; preparatory PR.

  • Adds an asynchronous_parallel_sampling utility fn for async algos, analogous to the existing synchronous_parallel_sampling already used by PGTrainer (see the sketch below).
  • Adds missing docstrings to some rollout utility functions.
  • Enhances create_colocated via a new, more generic create_colocated_actors utility function, which allows users to co-locate any (different) types of actors on the same node.
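
For readers new to the pattern: the difference between the synchronous and the asynchronous flavor boils down to ray.get-on-everything vs. ray.wait-with-timeout. The sketch below is illustrative only; the dummy `Worker` actor and its `sample()` method are assumptions made for this example, not the RLlib API.

```python
import random
import time

import ray

ray.init(ignore_reinit_error=True)


@ray.remote
class Worker:
    """Stand-in for an RLlib RolloutWorker (assumption, for illustration only)."""

    def sample(self) -> str:
        time.sleep(random.random())  # simulate a rollout of varying duration
        return "sample_batch"


workers = [Worker.remote() for _ in range(4)]

# Synchronous parallel sampling: block until *all* workers have returned.
sync_batches = ray.get([w.sample.remote() for w in workers])

# Asynchronous parallel sampling: wait up to a timeout and process whatever
# is ready; slow workers no longer stall the whole training iteration.
refs = [w.sample.remote() for w in workers]
ready, _ = ray.wait(refs, num_returns=len(refs), timeout=0.5)
async_batches = ray.get(ready)  # may be empty if nothing finished in time
```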

Why are these changes needed?

Related issue number

Checks

  • I've run scripts/format.sh to lint the changes in this PR.
  • I've included any doc changes needed for https://docs.ray.io/en/master/.
  • I've made sure the tests are passing. Note that there might be a few flaky tests, see the recent failures at https://flakey-tests.ray.io/
  • Testing Strategy
    • Unit tests
    • Release tests
    • This PR is not tested :(

…decentralized_multi_agent_learning_01

# Conflicts:
#	rllib/policy/policy.py
@sven1977 sven1977 changed the title [RLlib] Decentralized multi-agent learning; PR #01 [WIP; RLlib] Decentralized multi-agent learning; PR #01 Jan 6, 2022
@sven1977 sven1977 changed the title [WIP; RLlib] Decentralized multi-agent learning; PR #01 [RLlib] Decentralized multi-agent learning; PR #01 Jan 10, 2022
…ntralized_multi_agent_learning_01

# Conflicts:
#	rllib/agents/trainer.py
#	rllib/evaluation/rollout_worker.py
#	rllib/policy/policy.py
Member

@gjoliver gjoliver left a comment

a few questions and comments. thanks.

@@ -31,6 +35,7 @@
        "worker_side_prioritization": True,
        "min_iter_time_s": 30,
    },
    _allow_unknown_configs=True,
Member

Why do we need this?

Contributor Author

@sven1977 sven1977 Jan 12, 2022

We are using Trainer's merge utility. It requires that you set this to True if the second config (APEX-DDPG's) contains new keys.
Otherwise, it would complain about any such new key not being found in the first config (DDPG's).
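
A pure-Python sketch of the behavior described above (this is not the actual Trainer merge utility, just an illustration of why the flag is needed):

```python
def deep_merge(base: dict, overrides: dict, allow_unknown: bool = False) -> dict:
    """Recursively merge `overrides` into a copy of `base`.

    Raises if `overrides` contains a key unknown to `base`, unless
    `allow_unknown` is True (mirrors the behavior described above).
    """
    merged = dict(base)
    for key, value in overrides.items():
        if key not in base:
            if not allow_unknown:
                raise KeyError(f"Unknown config key: {key!r}")
            merged[key] = value
        elif isinstance(base[key], dict) and isinstance(value, dict):
            merged[key] = deep_merge(base[key], value, allow_unknown)
        else:
            merged[key] = value
    return merged


ddpg_config = {"lr": 1e-3, "optimizer": {"num_replay_buffer_shards": 1}}
apex_overrides = {
    "optimizer": {"num_replay_buffer_shards": 4},
    "min_iter_time_s": 30,  # new key, not present in the base config
}

# Without allow_unknown=True this would raise KeyError("min_iter_time_s").
apex_config = deep_merge(ddpg_config, apex_overrides, allow_unknown=True)
```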

Member

👌 👌

@@ -129,7 +136,8 @@ def execution_plan(workers: WorkerSet, config: dict,
    # Create a number of replay buffer actors.
    num_replay_buffer_shards = config["optimizer"][
        "num_replay_buffer_shards"]
    replay_actors = create_colocated(ReplayActor, [

    args = [
Member

Naming this replay_actor_args may be clearer.

Contributor Author

done

def drop_colocated(actors):
    colocated, non_colocated = split_colocated(actors)
    for a in colocated:
        a.__ray_terminate__.remote()
    return non_colocated


def split_colocated(actors):
    localhost = platform.node()
def split_colocated(actors, node=None):
Member

Do we really need to allow the node param to be None?
The only user of this util provides the node parameter when calling it.
Also, I'm not sure it's intuitive that split_colocated would split based on the node of the first actor if the node param is not specified.
This behavior feels a bit random?

Contributor Author

@sven1977 sven1977 Jan 12, 2022

Fixed this. It should behave backward-compatibly now, with node="localhost" being the default (same behavior as before, when the node arg didn't exist and we always tried to co-locate on localhost).
Also added docstrings.
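
For illustration, a simplified re-implementation of the splitting behavior described above (the DummyActor/get_node helper is an assumption made for this sketch; the real utility in rllib/utils/actors.py may query actor locations differently):

```python
import platform

import ray

ray.init(ignore_reinit_error=True)


@ray.remote
class DummyActor:
    def get_node(self) -> str:
        # Hostname of the node this actor was scheduled on.
        return platform.node()


def split_colocated(actors, node="localhost"):
    """Split `actors` into (on `node`, not on `node`).

    node="localhost" means "the node this driver runs on".
    """
    target = platform.node() if node == "localhost" else node
    hosts = ray.get([a.get_node.remote() for a in actors])
    colocated = [a for a, h in zip(actors, hosts) if h == target]
    non_colocated = [a for a, h in zip(actors, hosts) if h != target]
    return colocated, non_colocated


actors = [DummyActor.remote() for _ in range(4)]
on_driver_node, elsewhere = split_colocated(actors)  # default: "localhost"
```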



def try_create_colocated(cls, args, count, kwargs=None, node=None):
    kwargs = kwargs or {}
Member

why don't we make {} the default value for the argument?

Contributor Author

Dangerous, as Python will keep that {} dict around: if you change it in one function call (add key/value pairs to it), then the next time you call the function w/o providing the arg, the function will use the altered dict (the one with the added key/value pairs).
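
The classic Python pitfall being referred to, in a minimal standalone example:

```python
def append_bad(value, bucket={}):  # the single dict is created once, at def time
    bucket[value] = True
    return bucket


def append_good(value, bucket=None):  # idiomatic: create a fresh dict per call
    bucket = bucket or {}
    bucket[value] = True
    return bucket


print(append_bad("a"))   # {'a': True}
print(append_bad("b"))   # {'a': True, 'b': True}  <- state leaked between calls
print(append_good("a"))  # {'a': True}
print(append_good("b"))  # {'b': True}
```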

Member

Oh right, I forgot that {} is actually banned as a default. Thanks.

    return ok


def try_create_colocated(cls, args, count, kwargs=None, node=None):
Member

Can we add a comment here about node being None? Seems it's a pretty important detail that if node is None, it means we don't care which node these actors are co-located on, as long as they are together.

Contributor Author

Yes, I'll add (better) docstrings to all these.

Contributor Author

done

# Whether all shards of the replay buffer must be co-located
# with the learner process (running the execution plan).
# If False, replay shards may be created on different node(s).
"replay_buffer_shards_colocated_with_driver": True,
Member

actually I wonder, why do they need to be on the same node?

Contributor Author

For APEX, the local node is the learner, so data (once in the buffer shards) never has to travel again. I think that's the sole intention here.

Member

I see, I see. To be honest, this doesn't feel like a requirement to me, more like an optimization.
Since we don't have a guarantee from Ray core here, if it were up to me, I would do this as a best-effort thing:
try to colocate everything, and if that fails, schedule the other RB shards anywhere.
Then we don't need the while loop, and this scheduling can finish in at most 2 steps.

That is obviously too big of a change here, though. Maybe just add a note/TODO somewhere?

As written, I am a little worried a stack may fail with a mysterious error message like "failed to schedule RB actors" even though the cluster has enough CPUs overall, just a small head node.
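
A sketch of the best-effort alternative suggested here (purely illustrative; it reuses the simplified split_colocated from the earlier sketch and is not what this PR implements):

```python
def create_colocated_best_effort(actor_cls, args, count, node="localhost"):
    """Best-effort co-location, as suggested above: at most two steps.

    Step 1: create `count` actors and see which ones landed on `node`.
    Step 2: instead of retrying in a loop, keep the non-colocated actors
    where they are -- co-location is treated as an optimization, not a
    hard requirement.

    Assumes split_colocated() from the sketch above.
    """
    actors = [actor_cls.remote(*args) for _ in range(count)]
    colocated, elsewhere = split_colocated(actors, node=node)
    if elsewhere:
        print(
            f"Only {len(colocated)}/{count} actors co-located on {node}; "
            f"keeping the remaining {len(elsewhere)} where they are."
        )
    return colocated + elsewhere
```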

Contributor Author

Just a note: This is nothing new that I introduced here for APEX. We have always forced all replay shards to be located on the driver. This change actually allows users (via setting this new flag to False) to relax this constraint.

Contributor Author

I can add a comment to explain this more. ...

Contributor Author

done

Member

appreciate!

# Maps types to lists of already co-located actors.
ok = [[] for _ in range(len(actor_specs))]
attempt = 1
while attempt < max_attempts:
Member

If these actors don't fit on the same node the first time, why would they fit when we try a second time?
This seems like a case where we need PACK scheduling.

Contributor Author

We do use "PACK" strategy by default, so this should be ok. But it's a good question: Could still be that ray places the actor on a different node (bundle), no? And then we have to try again. I would love to use a ray tool to force placing an actor on a given node, but I don't think this exists.
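
For reference, a minimal sketch of expressing the co-location intent via Ray placement groups. The scheduling-strategy API shown requires a reasonably recent Ray version, and the ReplayShard actor is a dummy stand-in, not RLlib's ReplayActor. Note that PACK is a soft constraint (bundles may still be spread if they don't fit on one node), while STRICT_PACK would fail instead of spreading.

```python
import ray
from ray.util.placement_group import placement_group
from ray.util.scheduling_strategies import PlacementGroupSchedulingStrategy

ray.init(ignore_reinit_error=True)


@ray.remote(num_cpus=1)
class ReplayShard:
    def ping(self) -> str:
        return "ok"


# One bundle per shard; PACK tries to put all bundles on as few nodes as
# possible, but may fall back to spreading them if they don't fit.
pg = placement_group([{"CPU": 1}] * 3, strategy="PACK")
ray.get(pg.ready())

shards = [
    ReplayShard.options(
        scheduling_strategy=PlacementGroupSchedulingStrategy(
            placement_group=pg, placement_group_bundle_index=i
        )
    ).remote()
    for i in range(3)
]
ray.get([s.ping.remote() for s in shards])
```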

Member

yeah, I would love to show core folks this as an example use case when this PR is in.

assert len(remote_kwargs) == len(actors)

# Create a map inside Trainer instance that maps actorss to sets of open
# requests (object refs). This way, we keep track, of which actorss have
Member

are you intentionally writing actorss here and above?
or are they typos?

Contributor Author

:D definitely typos

Contributor Author

done

# already been sent how many requests
# (`max_remote_requests_in_flight_per_actor` arg).
if not hasattr(trainer, "_remote_requests_in_flight"):
    trainer._remote_requests_in_flight = defaultdict(set)
Member

Initialize this in the trainer's init() or setup()? It is confusing if some other instance creates private (_) members of another class instance.

Also, instead of passing the entire trainer, why not pass the _remote_requests_in_flight dict into this function, so this rollout op doesn't have access to everything we have?

Contributor Author

+1 Makes all sense. I'll clean up.

Contributor Author

done

This is only needed in the next PR (where we introduce the AlphaStar agent). In there, I'll set this property up in setup(). 👍
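
A sketch of the bookkeeping pattern discussed in this thread, with only the in-flight dict passed in rather than the whole Trainer (names such as async_sample and the actors' sample() method are assumptions for this illustration, not the final RLlib API):

```python
from collections import defaultdict
from typing import DefaultDict, Set

import ray
from ray import ObjectRef
from ray.actor import ActorHandle


def async_sample(
    in_flight: DefaultDict[ActorHandle, Set[ObjectRef]],
    actors,
    max_requests_per_actor: int = 2,
    timeout: float = 0.1,
):
    """Send new sample requests (respecting the per-actor cap) and return
    whatever results are ready within `timeout`; None if nothing is ready."""
    # 1) Issue new requests only to actors that still have capacity.
    for actor in actors:
        while len(in_flight[actor]) < max_requests_per_actor:
            in_flight[actor].add(actor.sample.remote())

    # 2) Poll all outstanding requests.
    pending = [ref for refs in in_flight.values() for ref in refs]
    if not pending:
        return None
    ready, _ = ray.wait(pending, num_returns=len(pending), timeout=timeout)
    if not ready:
        return None

    # 3) Bookkeeping: completed refs are no longer in flight.
    ready_set = set(ready)
    for refs in in_flight.values():
        refs -= ready_set
    return ray.get(ready)


# Usage (assuming `workers` is a list of actors exposing a .sample() method):
# in_flight = defaultdict(set)
# batches = async_sample(in_flight, workers)
```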


# Return None if nothing ready after the timeout.
if not ready:
    return
Member

should we return [] or at least None?

Contributor Author

return is the same as return None, no? I can add the None to make it more explicit.

Contributor Author

done

@sven1977 sven1977 requested a review from gjoliver January 12, 2022 10:41
Member

@gjoliver gjoliver left a comment

Thanks for all the updates, looks great now.
One more minor suggestion: delete the util func that is not used anymore.
Also, thanks for digging into the logic for co-located actor scheduling. It's not pretty, for sure.

# Whether all shards of the replay buffer must be co-located
# with the learner process (running the execution plan).
# If False, replay shards may be created on different node(s).
"replay_buffer_shards_colocated_with_driver": True,
Member

appreciate!

co_located, non_co_located = split_colocated(actors, node=node)
logger.info("Got {} colocated actors of {}".format(len(co_located), count))
for a in non_co_located:
    a.__ray_terminate__.remote()
Member

Yeah, I double-checked with Clark; they have an open issue to make __ray_terminate__ a public API.



@Deprecated(error=False)
def drop_colocated(actors: List[ActorHandle]) -> List[ActorHandle]:
Member

delete this util? it's not used anywhere.

Contributor Author

Maybe some user is using it somewhere :D
I marked it @Deprecated.

@sven1977 sven1977 merged commit 90c6b10 into ray-project:master Jan 13, 2022
@sven1977 sven1977 deleted the decentralized_multi_agent_learning_01 branch June 2, 2023 20:17