mirror of
https://github.com/wassname/ray.git
synced 2026-06-30 20:36:30 +08:00
[core] Pin lineage of plasma objects that are still in scope (#7499)
* Add a lineage_ref_count to References * Refactor TaskManager to store TaskEntry as a struct * Refactor to fix deadlock between TaskManager and ReferenceCounter Add references to task specs * Pin TaskEntries and References in the lineage of any ObjectIDs in scope * Fix deadlock, convert num_plasma_returns to a set of object IDs * fix unit tests * Feature flag * Do not release lineage for objects that were promoted to plasma * fix build * fix build * Remove num executions * Simplify num return values * Remove unused * doc * Set num returns * Move lineage pinning flag to ReferenceCounter * comments * Fixes * Remove irrelevant test (replaced by ref counting tests)
This commit is contained in:
@@ -1014,38 +1014,6 @@ def test_eviction(ray_start_cluster):
|
||||
ray.get(dependent_task.remote(obj))
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
"ray_start_cluster", [{
|
||||
"num_nodes": 1,
|
||||
"num_cpus": 2,
|
||||
}, {
|
||||
"num_nodes": 2,
|
||||
"num_cpus": 1,
|
||||
}],
|
||||
indirect=True)
|
||||
def test_serialized_id_eviction(ray_start_cluster):
|
||||
@ray.remote
|
||||
def large_object():
|
||||
return np.zeros(10 * 1024 * 1024)
|
||||
|
||||
@ray.remote
|
||||
def get(obj_ids):
|
||||
obj_id = obj_ids[0]
|
||||
assert (isinstance(ray.get(obj_id), np.ndarray))
|
||||
# Wait for the object to be evicted.
|
||||
ray.internal.free(obj_id)
|
||||
while ray.worker.global_worker.core_worker.object_exists(obj_id):
|
||||
time.sleep(1)
|
||||
with pytest.raises(ray.exceptions.UnreconstructableError):
|
||||
ray.get(obj_id)
|
||||
print("get done", obj_ids)
|
||||
|
||||
obj = large_object.remote()
|
||||
result = get.remote([obj])
|
||||
ray.internal.free(obj)
|
||||
ray.get(result)
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
"ray_start_cluster", [{
|
||||
"num_nodes": 2,
|
||||
|
||||
Reference in New Issue
Block a user