Integrate ref count module into local memory store (#6122)

This commit is contained in:
Eric Liang
2019-11-15 10:52:19 -08:00
committed by GitHub
parent 62cbc043b4
commit 7d33e9949b
10 changed files with 190 additions and 61 deletions
+1 -1
View File
@@ -720,7 +720,7 @@ cdef class CoreWorker:
raylet_socket.encode("ascii"), job_id.native(),
gcs_options.native()[0], log_dir.encode("utf-8"),
node_ip_address.encode("utf-8"), node_manager_port,
task_execution_handler, check_signals, exit_handler))
task_execution_handler, check_signals, exit_handler, True))
def disconnect(self):
with nogil:
+2 -1
View File
@@ -65,7 +65,8 @@ cdef extern from "ray/core_worker/core_worker.h" nogil:
const c_vector[CObjectID] &return_ids,
c_vector[shared_ptr[CRayObject]] *returns) nogil,
CRayStatus() nogil,
void () nogil)
void () nogil,
c_bool ref_counting_enabled)
void Disconnect()
CWorkerType &GetWorkerType()
CLanguage &GetLanguage()
+22 -3
View File
@@ -1211,13 +1211,32 @@ def test_direct_call_simple(ray_start_regular):
return x + 1
f_direct = f.options(is_direct_call=True)
print("a")
assert ray.get(f_direct.remote(2)) == 3
print("b")
assert ray.get([f_direct.remote(i) for i in range(100)]) == list(
range(1, 101))
def test_direct_call_refcount(ray_start_regular):
@ray.remote
def f(x):
return x + 1
@ray.remote
def sleep():
time.sleep(.1)
return 1
# Multiple gets should not hang with ref counting enabled.
f_direct = f.options(is_direct_call=True)
x = f_direct.remote(2)
ray.get(x)
ray.get(x)
# Temporary objects should be retained for chained callers.
y = f_direct.remote(sleep.options(is_direct_call=True).remote())
assert ray.get(y) == 2
def test_direct_call_matrix(shutdown_only):
ray.init(object_store_memory=1000 * 1024 * 1024)
@@ -1407,7 +1426,7 @@ def test_direct_actor_recursive(ray_start_regular):
return x * 2
a = Actor._remote(is_direct_call=True)
b = Actor._remote(args=[a], is_direct_call=False)
b = Actor._remote(args=[a], is_direct_call=True)
c = Actor._remote(args=[b], is_direct_call=True)
result = ray.get([c.f.remote(i) for i in range(100)])