Miscellaneous bug fixes to throw unreconstructable errors for direct calls (#6245)

* Test cases

* Fix InPlasmaError

* raylet fixes to force errors for direct calls

* Disable lineage logging and task pending checks for direct calls

* move todo

* Clean up tests

* Fix bugs in object store for Contains and Delete

* Use direct call in tests

* Fixes, separate actor creation direct call from normal direct call spec
This commit is contained in:
Stephanie Wang
2019-11-23 15:05:49 -08:00
committed by GitHub
parent c4fa3b3afb
commit d2662fecea
14 changed files with 169 additions and 14 deletions
+104
View File
@@ -799,3 +799,107 @@ def test_fill_object_store_exception(ray_start_cluster_head):
with pytest.raises(ray.exceptions.ObjectStoreFullError):
ray.put(np.zeros(10**8 + 2, dtype=np.uint8))
@pytest.mark.parametrize(
"ray_start_cluster", [{
"num_nodes": 1,
"num_cpus": 2,
}, {
"num_nodes": 2,
"num_cpus": 1,
}],
indirect=True)
def test_direct_call_eviction(ray_start_cluster):
@ray.remote
def large_object():
return np.zeros(10 * 1024 * 1024)
large_object = large_object.options(is_direct_call=True)
obj = large_object.remote()
assert (isinstance(ray.get(obj), np.ndarray))
# Evict the object.
ray.internal.free([obj])
while ray.worker.global_worker.core_worker.object_exists(obj):
time.sleep(1)
# ray.get throws an exception.
with pytest.raises(ray.exceptions.UnreconstructableError):
ray.get(obj)
@ray.remote
def dependent_task(x):
return
dependent_task = dependent_task.options(is_direct_call=True)
# If the object is passed by reference, the task throws an
# exception.
with pytest.raises(ray.exceptions.RayTaskError):
ray.get(dependent_task.remote(obj))
@pytest.mark.parametrize(
"ray_start_cluster", [{
"num_nodes": 1,
"num_cpus": 2,
}, {
"num_nodes": 2,
"num_cpus": 1,
}],
indirect=True)
def test_direct_call_serialized_id_eviction(ray_start_cluster):
@ray.remote
def large_object():
return np.zeros(10 * 1024 * 1024)
@ray.remote
def get(obj_ids):
print("get", obj_ids)
obj_id = obj_ids[0]
assert (isinstance(ray.get(obj_id), np.ndarray))
# Evict the object.
ray.internal.free(obj_ids)
while ray.worker.global_worker.core_worker.object_exists(obj_id):
time.sleep(1)
with pytest.raises(ray.exceptions.UnreconstructableError):
ray.get(obj_id)
print("get done", obj_ids)
large_object = large_object.options(is_direct_call=True)
get = get.options(is_direct_call=True)
obj = large_object.remote()
ray.get(get.remote([obj]))
@pytest.mark.skip(
"Uncomment once eviction errors for serialized IDs are implemented")
@pytest.mark.parametrize(
"ray_start_cluster", [{
"num_nodes": 2,
"num_cpus": 10,
}, {
"num_nodes": 1,
"num_cpus": 20,
}],
indirect=True)
def test_direct_call_serialized_id(ray_start_cluster):
@ray.remote
def small_object():
# Sleep a bit before creating the object to force a timeout
# at the getter.
time.sleep(1)
return 1
@ray.remote
def get(obj_ids):
print("get", obj_ids)
obj_id = obj_ids[0]
assert ray.get(obj_id) == 1
small_object = small_object.options(is_direct_call=True)
get = get.options(is_direct_call=True)
obj = small_object.remote()
ray.get(get.remote([obj]))