Fix actor garbage collection by breaking cyclic references (#1064)

* Fix bug in wait_for_pid_to_exit, add test for actor deletion.

* Fix actor garbage collection by breaking cyclic references

* Add test for calling actor method immediately after actor creation.

* Fix bug, must dispatch tasks when workers are killed.

* Fix python test

* Fix cyclic reference problem by creating ActorMethod objects on the fly.

* Try simply increasing the time allowed for many_drivers_test.py.
This commit is contained in:
Stephanie Wang
2017-10-05 00:55:33 -07:00
committed by Robert Nishihara
parent 971becc905
commit aebe9f9374
5 changed files with 66 additions and 22 deletions
+13 -12
View File
@@ -251,6 +251,8 @@ def make_actor(cls, num_cpus, num_gpus, checkpoint_interval):
# constructor.
exported = []
# Create objects to wrap method invocations. This is done so that we can
# invoke methods with actor.method.remote() instead of actor.method().
class ActorMethod(object):
def __init__(self, actor, method_name, method_signature):
self.actor = actor
@@ -307,14 +309,6 @@ def make_actor(cls, num_cpus, num_gpus, checkpoint_interval):
self._ray_method_signatures[k] = signature.extract_signature(
v, ignore_first=True)
# Create objects to wrap method invocations. This is done so that
# we can invoke methods with actor.method.remote() instead of
# actor.method().
self._actor_method_invokers = dict()
for k, v in self._ray_actor_methods.items():
self._actor_method_invokers[k] = ActorMethod(
self, k, self._ray_method_signatures[k])
# Do not export the actor class or the actor if run in PYTHON_MODE
# Instead, instantiate the actor locally and add it to
# global_worker's dictionary
@@ -390,10 +384,17 @@ def make_actor(cls, num_cpus, num_gpus, checkpoint_interval):
"_actor_method_call"]:
return object.__getattribute__(self, attr)
if attr in self._ray_actor_methods.keys():
return self._actor_method_invokers[attr]
# There is no method with this name, so raise an exception.
raise AttributeError("'{}' Actor object has no attribute '{}'"
.format(Class, attr))
# We create the ActorMethod on the fly here so that the
# ActorHandle doesn't need a reference to the ActorMethod. The
# ActorMethod has a reference to the ActorHandle and this was
# causing cyclic references which were preventing object
# deallocation from behaving in a predictable manner.
return ActorMethod(self, attr,
self._ray_method_signatures[attr])
else:
# There is no method with this name, so raise an exception.
raise AttributeError("'{}' Actor object has no attribute '{}'"
.format(Class, attr))
def __repr__(self):
return "Actor(" + self._ray_actor_id.hex() + ")"
+1
View File
@@ -118,6 +118,7 @@ def _pid_alive(pid):
"""
try:
os.kill(pid, 0)
return True
except OSError:
return False
@@ -356,12 +356,6 @@ void handle_actor_worker_connect(LocalSchedulerState *state,
dispatch_actor_task(state, algorithm_state, actor_id);
}
void handle_actor_worker_disconnect(LocalSchedulerState *state,
SchedulingAlgorithmState *algorithm_state,
ActorID actor_id) {
remove_actor(algorithm_state, actor_id);
}
/**
* This will add a task to the task queue for an actor. If this is the first
* task being processed for this actor, it is possible that the LocalActorInfo
@@ -1156,6 +1150,18 @@ void handle_worker_removed(LocalSchedulerState *state,
/* Make sure we removed the worker at most once. */
CHECK(num_times_removed <= 1);
/* Attempt to dispatch some tasks because some resources may have freed up. */
dispatch_all_tasks(state, algorithm_state);
}
/**
 * Handle the disconnection of an actor's worker. The actor is removed from
 * the scheduling algorithm state, and then a dispatch pass is attempted over
 * all tasks.
 *
 * @param state The local scheduler state, passed through to the dispatch.
 * @param algorithm_state The scheduling algorithm state that the actor is
 *        removed from.
 * @param actor_id The ID of the actor to remove.
 * @return Void.
 */
void handle_actor_worker_disconnect(LocalSchedulerState *state,
SchedulingAlgorithmState *algorithm_state,
ActorID actor_id) {
remove_actor(algorithm_state, actor_id);
/* Attempt to dispatch some tasks because some resources may have freed up. */
dispatch_all_tasks(state, algorithm_state);
}
void handle_actor_worker_available(LocalSchedulerState *state,
+36
View File
@@ -5,10 +5,12 @@ from __future__ import print_function
import collections
import random
import numpy as np
import os
import time
import unittest
import ray
import ray.test.test_utils
class ActorAPI(unittest.TestCase):
@@ -279,6 +281,40 @@ class ActorMethods(unittest.TestCase):
ray.worker.cleanup()
def testActorDeletion(self):
    """Check that actor processes exit once their handles are dropped.

    Covers three cases: a single actor whose handle is released, a batch
    of ten actors released together, and a method call made on a
    temporary handle immediately after the actor is created.
    """
    ray.init(num_workers=0)

    # Make sure that when an actor handle goes out of scope, the actor
    # destructor is called.
    @ray.remote
    class Actor(object):
        def getpid(self):
            return os.getpid()

    a = Actor.remote()
    pid = ray.get(a.getpid.remote())
    # Drop the only handle to the actor, then wait for its process to
    # exit.
    a = None
    ray.test.test_utils.wait_for_pid_to_exit(pid)

    actors = [Actor.remote() for _ in range(10)]
    pids = ray.get([a.getpid.remote() for a in actors])
    # Clear "a" as well as the list: on Python 2 the comprehension
    # variable leaks into this scope and would otherwise keep the last
    # actor handle alive.
    a = None
    actors = None
    # A plain loop is used because the calls are made only for their
    # side effect (blocking until each process is gone).
    for pid in pids:
        ray.test.test_utils.wait_for_pid_to_exit(pid)

    @ray.remote
    class Actor(object):
        def method(self):
            return 1

    # Make sure that if we create an actor and call a method on it
    # immediately, the actor doesn't get killed before the method is
    # called.
    self.assertEqual(ray.get(Actor.remote().method.remote()), 1)

    ray.worker.cleanup()
def testActorState(self):
ray.init()
@@ -30,10 +30,10 @@ class Actor1(object):
def driver(redis_address, driver_index):
"""The script for driver 0.
"""The script for all drivers.
This driver should create five actors that each use one GPU and some actors
that use no GPUs. After a while, it should exit.
This driver should create five actors that each use one GPU. After a while,
it should exit.
"""
ray.init(redis_address=redis_address)
@@ -44,7 +44,7 @@ def driver(redis_address, driver_index):
for i in range(driver_index - max_concurrent_drivers + 1):
_wait_for_event("DRIVER_{}_DONE".format(i), redis_address)
def try_to_create_actor(actor_class, timeout=100):
def try_to_create_actor(actor_class, timeout=500):
# Try to create an actor, but allow failures while we wait for the
# monitor to release the resources for the removed drivers.
start_time = time.time()