Clean up Ray processes after cluster util exits (#3278)

This commit is contained in:
Richard Liaw
2018-11-13 13:18:12 -08:00
committed by Eric Liang
parent c3a2c7ebed
commit 97f423781b
2 changed files with 22 additions and 4 deletions
+13 -3
View File
@@ -2,6 +2,7 @@ from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import atexit
import logging
import time
@@ -15,7 +16,8 @@ class Cluster(object):
def __init__(self,
initialize_head=False,
connect=False,
head_node_args=None):
head_node_args=None,
shutdown_at_exit=True):
"""Initializes the cluster.
Args:
@@ -24,8 +26,10 @@ class Cluster(object):
connect (bool): If `initialize_head=True` and `connect=True`,
ray.init will be called with the redis address of this cluster
passed in.
head_node_args (kwargs): Arguments to be passed into
head_node_args (dict): Arguments to be passed into
`start_ray_head` via `self.add_node`.
shutdown_at_exit (bool): If True, registers an exit hook
for shutting down all started processes.
"""
self.head_node = None
self.worker_nodes = {}
@@ -41,6 +45,8 @@ class Cluster(object):
ray.init(
redis_address=self.redis_address,
redis_password=redis_password)
if shutdown_at_exit:
atexit.register(self.shutdown)
def add_node(self, **override_kwargs):
"""Adds a node to the local Ray Cluster.
@@ -158,12 +164,16 @@ class Cluster(object):
return nodes
def shutdown(self):
    """Removes all nodes.

    Every entry of `self.worker_nodes` is passed to `self.remove_node`,
    followed by `self.head_node` if one is set. The head node is skipped
    when it is None, so a cluster without a head node never hands None
    to `remove_node`, and the head node is removed at most once per call.

    NOTE(review): this method may also be registered as an exit hook, so
    it can run again after a manual call. That second run is only a no-op
    if `remove_node` clears `self.head_node` and `self.worker_nodes` --
    presumably it does; confirm against `remove_node`.
    """
    # We create a list here as a copy because `remove_node`
    # modifies `self.worker_nodes`.
    all_nodes = list(self.worker_nodes)
    for node in all_nodes:
        self.remove_node(node)

    # Remove the head node exactly once, and only if there is one.
    if self.head_node is not None:
        self.remove_node(self.head_node)
class Node(object):