[Collective][PR 2/6] Driver program declarative interfaces (#12874)

* scaffold of the code

* some scratch and options change

* NCCL mostly done, supporting API#1

* interface 2.1 2.2 scratch

* put code into ray and fix some importing issues

* add an addtional Rendezvous class to safely meet at named actor

* fix some small bugs in nccl_util

* some small fix

* scaffold of the code

* some scratch and options change

* NCCL mostly done, supporting API#1

* interface 2.1 2.2 scratch

* put code into ray and fix some importing issues

* add an addtional Rendezvous class to safely meet at named actor

* fix some small bugs in nccl_util

* some small fix

* add a Backend class to make Backend string more robust

* add several useful APIs

* add some tests

* added allreduce test

* fix typos

* fix several bugs found via unittests

* fix and update torch test

* changed back actor

* rearange a bit before importing distributed test

* add distributed test

* remove scratch code

* auto-linting

* linting 2

* linting 2

* linting 3

* linting 4

* linting 5

* linting 6

* 2.1 2.2

* fix small bugs

* minor updates

* linting again

* auto linting

* linting 2

* final linting

* Update python/ray/util/collective_utils.py

Co-authored-by: Richard Liaw <rliaw@berkeley.edu>

* Update python/ray/util/collective_utils.py

Co-authored-by: Richard Liaw <rliaw@berkeley.edu>

* Update python/ray/util/collective_utils.py

Co-authored-by: Richard Liaw <rliaw@berkeley.edu>

* added actor test

* lint

* remove local sh

* address most of richard's comments

* minor update

* remove the actor.option() interface to avoid changes in ray core

* minor updates

Co-authored-by: YLJALDC <dal177@ucsd.edu>
Co-authored-by: Richard Liaw <rliaw@berkeley.edu>
This commit is contained in:
Hao Zhang
2021-01-04 23:57:37 -05:00
committed by GitHub
parent c617291b27
commit 4150970226
3 changed files with 130 additions and 2 deletions
+71 -2
View File
@@ -1,5 +1,7 @@
"""APIs exposed under the namespace ray.util.collective."""
import logging
import os
from typing import List
import numpy as np
import ray
@@ -124,6 +126,49 @@ def init_collective_group(world_size: int,
_group_mgr.create_collective_group(backend, world_size, rank, group_name)
def declare_collective_group(actors,
world_size: int,
ranks: List[int],
backend=types.Backend.NCCL,
group_name: str = "default"):
"""Declare a list of actors as a collective group.
Note: This function should be called in a driver process.
Args:
actors (list): a list of actors to be set in a collective group.
group_options (dict): a dictionary that contains group_name(str),
world_size(int), rank(list of int, e.g. [0,1]
means the first actor is rank 0, and the second
actor is rank 1), backend(str).
"""
backend = types.Backend(backend)
_check_backend_availability(backend)
name = "info_" + group_name
try:
ray.get_actor(name)
raise RuntimeError("Trying to initialize a group twice.")
except ValueError:
pass
if len(ranks) != len(actors):
raise RuntimeError("Each actor should correspond to one rank.")
if set(ranks) != set(range(len(ranks))):
raise RuntimeError("Rank must be a permutation from 0 to len-1.")
assert world_size > 0
assert all(ranks) >= 0 and all(ranks) < world_size
from ray.util.collective.util import Info
# store the information into a NamedActor that can be accessed later/
name = "info_" + group_name
actors_id = [a._ray_actor_id for a in actors]
info = Info.options(name=name, lifetime="detached").remote()
ray.wait([info.set_info.remote(actors_id, world_size, ranks, backend)])
def destroy_collective_group(group_name: str = "default") -> None:
"""Destroy a collective group given its group name."""
_check_inside_actor()
@@ -342,9 +387,33 @@ def recv(tensor, src_rank: int, group_name: str = "default"):
def _check_and_get_group(group_name):
"""Check the existence and return the group handle."""
_check_inside_actor()
global _group_mgr
if not is_group_initialized(group_name):
raise RuntimeError("The collective group '{}' is not "
"initialized in the process.".format(group_name))
# try loading from remote info store
try:
# if the information is stored in an Info object,
# get and create the group.
name = "info_" + group_name
mgr = ray.get_actor(name=name)
ids, world_size, rank, backend = ray.get(mgr.get_info.remote())
worker = ray.worker.global_worker
id_ = worker.core_worker.get_actor_id()
r = rank[ids.index(id_)]
_group_mgr.create_collective_group(backend, world_size, r,
group_name)
except ValueError as exc:
# check if this group is initialized using options()
if "collective_group_name" in os.environ and \
os.environ["collective_group_name"] == group_name:
rank = int(os.environ["collective_rank"])
world_size = int(os.environ["collective_world_size"])
backend = os.environ["collective_backend"]
_group_mgr.create_collective_group(backend, world_size, rank,
group_name)
else:
raise RuntimeError(
"The collective group '{}' is not "
"initialized in the process.".format(group_name)) from exc
g = _group_mgr.get_group_by_name(group_name)
return g
@@ -0,0 +1,34 @@
import cupy as cp
import ray
import ray.util.collective as collective
@ray.remote(num_gpus=1)
class Worker:
def __init__(self):
self.send = cp.ones((4, ), dtype=cp.float32)
def compute(self):
collective.allreduce(self.send, "177")
return self.send
if __name__ == "__main__":
ray.init(num_gpus=2)
num_workers = 2
workers = []
for i in range(num_workers):
w = Worker.remote()
workers.append(w)
_options = {
"group_name": "177",
"world_size": 2,
"ranks": [0, 1],
"backend": "nccl"
}
collective.declare_collective_group(workers, **_options)
results = ray.get([w.compute.remote() for w in workers])
print(results)
ray.shutdown()
+25
View File
@@ -40,3 +40,28 @@ class NCCLUniqueIDStore:
logger.warning("The NCCL ID has not been "
"set yet for store {}.".format(self.name))
return self.nccl_id
@ray.remote
class Info:
"""Store the group information created via `declare_collective_group`.
Note: Should be used as a NamedActor.
"""
def __init__(self):
self.ids = None
self.world_size = -1
self.rank = -1
self.backend = None
def set_info(self, ids, world_size, rank, backend):
"""Store collective information."""
self.ids = ids
self.world_size = world_size
self.rank = rank
self.backend = backend
def get_info(self):
"""Get previously stored collective information."""
return self.ids, self.world_size, self.rank, self.backend