mirror of
https://github.com/wassname/ray.git
synced 2026-06-28 01:16:06 +08:00
[core] Pin lineage of plasma objects that are still in scope (#7690)
* Fix deadlock in DrainAndShutdown
* Revert "[core] Revert lineage pinning (#7499) (#7692)"
This reverts commit ba86a02b37.
* debug rllib
* debug rllib
* turn on all rllib tests again
* debug rllib
* Fix drain bug, check number of pending tasks
* revert rllib debug
* remove todo
* Trigger rllib tests
* revert rllib debug commit
This commit is contained in:
@@ -154,6 +154,17 @@ def test_background_tasks_with_max_calls(shutdown_only):
|
||||
# wait for g to finish before exiting.
|
||||
ray.get([x[0] for x in nested])
|
||||
|
||||
@ray.remote(max_calls=1, max_retries=0)
|
||||
def f():
|
||||
return os.getpid(), g.remote()
|
||||
|
||||
nested = ray.get([f.remote() for _ in range(10)])
|
||||
while nested:
|
||||
pid, g_id = nested.pop(0)
|
||||
ray.get(g_id)
|
||||
del g_id
|
||||
ray.test_utils.wait_for_pid_to_exit(pid)
|
||||
|
||||
|
||||
def test_fair_queueing(shutdown_only):
|
||||
ray.init(
|
||||
|
||||
@@ -1014,38 +1014,6 @@ def test_eviction(ray_start_cluster):
|
||||
ray.get(dependent_task.remote(obj))
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
"ray_start_cluster", [{
|
||||
"num_nodes": 1,
|
||||
"num_cpus": 2,
|
||||
}, {
|
||||
"num_nodes": 2,
|
||||
"num_cpus": 1,
|
||||
}],
|
||||
indirect=True)
|
||||
def test_serialized_id_eviction(ray_start_cluster):
|
||||
@ray.remote
|
||||
def large_object():
|
||||
return np.zeros(10 * 1024 * 1024)
|
||||
|
||||
@ray.remote
|
||||
def get(obj_ids):
|
||||
obj_id = obj_ids[0]
|
||||
assert (isinstance(ray.get(obj_id), np.ndarray))
|
||||
# Wait for the object to be evicted.
|
||||
ray.internal.free(obj_id)
|
||||
while ray.worker.global_worker.core_worker.object_exists(obj_id):
|
||||
time.sleep(1)
|
||||
with pytest.raises(ray.exceptions.UnreconstructableError):
|
||||
ray.get(obj_id)
|
||||
print("get done", obj_ids)
|
||||
|
||||
obj = large_object.remote()
|
||||
result = get.remote([obj])
|
||||
ray.internal.free(obj)
|
||||
ray.get(result)
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
"ray_start_cluster", [{
|
||||
"num_nodes": 2,
|
||||
|
||||
Reference in New Issue
Block a user