From aebe9f937451bfa10aa0f2a41bafcf4747fb60f0 Mon Sep 17 00:00:00 2001 From: Stephanie Wang Date: Thu, 5 Oct 2017 00:55:33 -0700 Subject: [PATCH] Fix actor garbage collection by breaking cyclic references (#1064) * Fix bug in wait_for_pid_to_exit, add test for actor deletion. * Fix actor garbage collection by breaking cyclic references * Add test for calling actor method immediately after actor creation. * Fix bug, must dispatch tasks when workers are killed. * Fix python test * Fix cyclic reference problem by creating ActorMethod objects on the fly. * Try simply increasing the time allowed for many_drivers_test.py. --- python/ray/actor.py | 25 ++++++------- python/ray/test/test_utils.py | 1 + .../local_scheduler_algorithm.cc | 18 ++++++---- test/actor_test.py | 36 +++++++++++++++++++ .../multi_node_tests/many_drivers_test.py | 8 ++--- 5 files changed, 66 insertions(+), 22 deletions(-) diff --git a/python/ray/actor.py b/python/ray/actor.py index 7967d17de..36845216b 100644 --- a/python/ray/actor.py +++ b/python/ray/actor.py @@ -251,6 +251,8 @@ def make_actor(cls, num_cpus, num_gpus, checkpoint_interval): # constructor. exported = [] + # Create objects to wrap method invocations. This is done so that we can + # invoke methods with actor.method.remote() instead of actor.method(). class ActorMethod(object): def __init__(self, actor, method_name, method_signature): self.actor = actor @@ -307,14 +309,6 @@ def make_actor(cls, num_cpus, num_gpus, checkpoint_interval): self._ray_method_signatures[k] = signature.extract_signature( v, ignore_first=True) - # Create objects to wrap method invocations. This is done so that - # we can invoke methods with actor.method.remote() instead of - # actor.method(). - self._actor_method_invokers = dict() - for k, v in self._ray_actor_methods.items(): - self._actor_method_invokers[k] = ActorMethod( - self, k, self._ray_method_signatures[k]) - # Do not export the actor class or the actor if run in PYTHON_MODE # Instead, instantiate the actor locally and add it to # global_worker's dictionary @@ -390,10 +384,17 @@ def make_actor(cls, num_cpus, num_gpus, checkpoint_interval): "_actor_method_call"]: return object.__getattribute__(self, attr) if attr in self._ray_actor_methods.keys(): - return self._actor_method_invokers[attr] - # There is no method with this name, so raise an exception. - raise AttributeError("'{}' Actor object has no attribute '{}'" - .format(Class, attr)) + # We create the ActorMethod on the fly here so that the + # ActorHandle doesn't need a reference to the ActorMethod. The + # ActorMethod has a reference to the ActorHandle and this was + # causing cyclic references which were prevent object + # deallocation from behaving in a predictable manner. + return ActorMethod(self, attr, + self._ray_method_signatures[attr]) + else: + # There is no method with this name, so raise an exception. + raise AttributeError("'{}' Actor object has no attribute '{}'" + .format(Class, attr)) def __repr__(self): return "Actor(" + self._ray_actor_id.hex() + ")" diff --git a/python/ray/test/test_utils.py b/python/ray/test/test_utils.py index adca6b00e..1e0ef174c 100644 --- a/python/ray/test/test_utils.py +++ b/python/ray/test/test_utils.py @@ -118,6 +118,7 @@ def _pid_alive(pid): """ try: os.kill(pid, 0) + return True except OSError: return False diff --git a/src/local_scheduler/local_scheduler_algorithm.cc b/src/local_scheduler/local_scheduler_algorithm.cc index 7a50e0c4e..4ab796ca2 100644 --- a/src/local_scheduler/local_scheduler_algorithm.cc +++ b/src/local_scheduler/local_scheduler_algorithm.cc @@ -356,12 +356,6 @@ void handle_actor_worker_connect(LocalSchedulerState *state, dispatch_actor_task(state, algorithm_state, actor_id); } -void handle_actor_worker_disconnect(LocalSchedulerState *state, - SchedulingAlgorithmState *algorithm_state, - ActorID actor_id) { - remove_actor(algorithm_state, actor_id); -} - /** * This will add a task to the task queue for an actor. If this is the first * task being processed for this actor, it is possible that the LocalActorInfo @@ -1156,6 +1150,18 @@ void handle_worker_removed(LocalSchedulerState *state, /* Make sure we removed the worker at most once. */ CHECK(num_times_removed <= 1); + + /* Attempt to dispatch some tasks because some resources may have freed up. */ + dispatch_all_tasks(state, algorithm_state); +} + +void handle_actor_worker_disconnect(LocalSchedulerState *state, + SchedulingAlgorithmState *algorithm_state, + ActorID actor_id) { + remove_actor(algorithm_state, actor_id); + + /* Attempt to dispatch some tasks because some resources may have freed up. */ + dispatch_all_tasks(state, algorithm_state); } void handle_actor_worker_available(LocalSchedulerState *state, diff --git a/test/actor_test.py b/test/actor_test.py index 612c92157..df4a1ae5a 100644 --- a/test/actor_test.py +++ b/test/actor_test.py @@ -5,10 +5,12 @@ from __future__ import print_function import collections import random import numpy as np +import os import time import unittest import ray +import ray.test.test_utils class ActorAPI(unittest.TestCase): @@ -279,6 +281,40 @@ class ActorMethods(unittest.TestCase): ray.worker.cleanup() + def testActorDeletion(self): + ray.init(num_workers=0) + + # Make sure that when an actor handles goes out of scope, the actor + # destructor is called. + + @ray.remote + class Actor(object): + def getpid(self): + return os.getpid() + + a = Actor.remote() + pid = ray.get(a.getpid.remote()) + a = None + ray.test.test_utils.wait_for_pid_to_exit(pid) + + actors = [Actor.remote() for _ in range(10)] + pids = ray.get([a.getpid.remote() for a in actors]) + a = None + actors = None + [ray.test.test_utils.wait_for_pid_to_exit(pid) for pid in pids] + + @ray.remote + class Actor(object): + def method(self): + return 1 + + # Make sure that if we create an actor and call a method on it + # immediately, the actor doesn't get killed before the method is + # called. + self.assertEqual(ray.get(Actor.remote().method.remote()), 1) + + ray.worker.cleanup() + def testActorState(self): ray.init() diff --git a/test/jenkins_tests/multi_node_tests/many_drivers_test.py b/test/jenkins_tests/multi_node_tests/many_drivers_test.py index ed9b24776..cc73d325b 100644 --- a/test/jenkins_tests/multi_node_tests/many_drivers_test.py +++ b/test/jenkins_tests/multi_node_tests/many_drivers_test.py @@ -30,10 +30,10 @@ class Actor1(object): def driver(redis_address, driver_index): - """The script for driver 0. + """The script for all drivers. - This driver should create five actors that each use one GPU and some actors - that use no GPUs. After a while, it should exit. + This driver should create five actors that each use one GPU. After a while, + it should exit. """ ray.init(redis_address=redis_address) @@ -44,7 +44,7 @@ def driver(redis_address, driver_index): for i in range(driver_index - max_concurrent_drivers + 1): _wait_for_event("DRIVER_{}_DONE".format(i), redis_address) - def try_to_create_actor(actor_class, timeout=100): + def try_to_create_actor(actor_class, timeout=500): # Try to create an actor, but allow failures while we wait for the # monitor to release the resources for the removed drivers. start_time = time.time()