From 4150970226afd6e021eaf80eb8d1e52c4ab70f8f Mon Sep 17 00:00:00 2001 From: Hao Zhang Date: Mon, 4 Jan 2021 23:57:37 -0500 Subject: [PATCH] [Collective][PR 2/6] Driver program declarative interfaces (#12874) * scaffold of the code * some scratch and options change * NCCL mostly done, supporting API#1 * interface 2.1 2.2 scratch * put code into ray and fix some importing issues * add an additional Rendezvous class to safely meet at named actor * fix some small bugs in nccl_util * some small fix * scaffold of the code * some scratch and options change * NCCL mostly done, supporting API#1 * interface 2.1 2.2 scratch * put code into ray and fix some importing issues * add an additional Rendezvous class to safely meet at named actor * fix some small bugs in nccl_util * some small fix * add a Backend class to make Backend string more robust * add several useful APIs * add some tests * added allreduce test * fix typos * fix several bugs found via unittests * fix and update torch test * changed back actor * rearrange a bit before importing distributed test * add distributed test * remove scratch code * auto-linting * linting 2 * linting 2 * linting 3 * linting 4 * linting 5 * linting 6 * 2.1 2.2 * fix small bugs * minor updates * linting again * auto linting * linting 2 * final linting * Update python/ray/util/collective_utils.py Co-authored-by: Richard Liaw * Update python/ray/util/collective_utils.py Co-authored-by: Richard Liaw * Update python/ray/util/collective_utils.py Co-authored-by: Richard Liaw * added actor test * lint * remove local sh * address most of richard's comments * minor update * remove the actor.option() interface to avoid changes in ray core * minor updates Co-authored-by: YLJALDC Co-authored-by: Richard Liaw --- python/ray/util/collective/collective.py | 73 ++++++++++++++++++- ...reduce_example_declare_collective_group.py | 34 +++++++++ python/ray/util/collective/util.py | 25 +++++++ 3 files changed, 130 insertions(+), 2 deletions(-) create mode 100644 
python/ray/util/collective/examples/nccl_allreduce_example_declare_collective_group.py diff --git a/python/ray/util/collective/collective.py b/python/ray/util/collective/collective.py index e2263648b..360498425 100644 --- a/python/ray/util/collective/collective.py +++ b/python/ray/util/collective/collective.py @@ -1,5 +1,7 @@ """APIs exposed under the namespace ray.util.collective.""" import logging +import os +from typing import List import numpy as np import ray @@ -124,6 +126,49 @@ def init_collective_group(world_size: int, _group_mgr.create_collective_group(backend, world_size, rank, group_name) +def declare_collective_group(actors, + world_size: int, + ranks: List[int], + backend=types.Backend.NCCL, + group_name: str = "default"): + """Declare a list of actors as a collective group. + + Note: This function should be called in a driver process. + + Args: + actors (list): a list of actors to be set in a collective group. + group_options (dict): a dictionary that contains group_name(str), + world_size(int), rank(list of int, e.g. [0,1] + means the first actor is rank 0, and the second + actor is rank 1), backend(str). 
+ """ + backend = types.Backend(backend) + _check_backend_availability(backend) + + name = "info_" + group_name + try: + ray.get_actor(name) + raise RuntimeError("Trying to initialize a group twice.") + except ValueError: + pass + + if len(ranks) != len(actors): + raise RuntimeError("Each actor should correspond to one rank.") + + if set(ranks) != set(range(len(ranks))): + raise RuntimeError("Rank must be a permutation from 0 to len-1.") + + assert world_size > 0 + assert all(ranks) >= 0 and all(ranks) < world_size + + from ray.util.collective.util import Info + # store the information into a NamedActor that can be accessed later/ + name = "info_" + group_name + actors_id = [a._ray_actor_id for a in actors] + info = Info.options(name=name, lifetime="detached").remote() + ray.wait([info.set_info.remote(actors_id, world_size, ranks, backend)]) + + def destroy_collective_group(group_name: str = "default") -> None: """Destroy a collective group given its group name.""" _check_inside_actor() @@ -342,9 +387,33 @@ def recv(tensor, src_rank: int, group_name: str = "default"): def _check_and_get_group(group_name): """Check the existence and return the group handle.""" _check_inside_actor() + global _group_mgr if not is_group_initialized(group_name): - raise RuntimeError("The collective group '{}' is not " - "initialized in the process.".format(group_name)) + # try loading from remote info store + try: + # if the information is stored in an Info object, + # get and create the group. 
+ name = "info_" + group_name + mgr = ray.get_actor(name=name) + ids, world_size, rank, backend = ray.get(mgr.get_info.remote()) + worker = ray.worker.global_worker + id_ = worker.core_worker.get_actor_id() + r = rank[ids.index(id_)] + _group_mgr.create_collective_group(backend, world_size, r, + group_name) + except ValueError as exc: + # check if this group is initialized using options() + if "collective_group_name" in os.environ and \ + os.environ["collective_group_name"] == group_name: + rank = int(os.environ["collective_rank"]) + world_size = int(os.environ["collective_world_size"]) + backend = os.environ["collective_backend"] + _group_mgr.create_collective_group(backend, world_size, rank, + group_name) + else: + raise RuntimeError( + "The collective group '{}' is not " + "initialized in the process.".format(group_name)) from exc g = _group_mgr.get_group_by_name(group_name) return g diff --git a/python/ray/util/collective/examples/nccl_allreduce_example_declare_collective_group.py b/python/ray/util/collective/examples/nccl_allreduce_example_declare_collective_group.py new file mode 100644 index 000000000..9d0335dba --- /dev/null +++ b/python/ray/util/collective/examples/nccl_allreduce_example_declare_collective_group.py @@ -0,0 +1,34 @@ +import cupy as cp +import ray + +import ray.util.collective as collective + + +@ray.remote(num_gpus=1) +class Worker: + def __init__(self): + self.send = cp.ones((4, ), dtype=cp.float32) + + def compute(self): + collective.allreduce(self.send, "177") + return self.send + + +if __name__ == "__main__": + ray.init(num_gpus=2) + + num_workers = 2 + workers = [] + for i in range(num_workers): + w = Worker.remote() + workers.append(w) + _options = { + "group_name": "177", + "world_size": 2, + "ranks": [0, 1], + "backend": "nccl" + } + collective.declare_collective_group(workers, **_options) + results = ray.get([w.compute.remote() for w in workers]) + print(results) + ray.shutdown() diff --git a/python/ray/util/collective/util.py 
b/python/ray/util/collective/util.py index e591e9b93..56a80673a 100644 --- a/python/ray/util/collective/util.py +++ b/python/ray/util/collective/util.py @@ -40,3 +40,28 @@ class NCCLUniqueIDStore: logger.warning("The NCCL ID has not been " "set yet for store {}.".format(self.name)) return self.nccl_id + + +@ray.remote +class Info: + """Store the group information created via `declare_collective_group`. + + Note: Should be used as a NamedActor. + """ + + def __init__(self): + self.ids = None + self.world_size = -1 + self.rank = -1 + self.backend = None + + def set_info(self, ids, world_size, rank, backend): + """Store collective information.""" + self.ids = ids + self.world_size = world_size + self.rank = rank + self.backend = backend + + def get_info(self): + """Get previously stored collective information.""" + return self.ids, self.world_size, self.rank, self.backend