[Object spilling] Look up the location of the primary raylet from the owner's metadata (#10197)

* Get the primary copy from the owner, python test, some node manager fixes

* fixes and todo

* update

* lint

* fix build
This commit is contained in:
Stephanie Wang
2020-08-20 14:46:59 -07:00
committed by GitHub
parent 0baf992a4f
commit 85e57a7a98
18 changed files with 262 additions and 101 deletions
+1 -1
View File
@@ -1460,7 +1460,7 @@ cdef class CoreWorker:
object_ids = ObjectRefsToVector(object_refs)
with nogil:
check_status(CCoreWorkerProcess.GetCoreWorker()
.ForceSpillObjects(object_ids))
.SpillObjects(object_ids))
def force_restore_spilled_objects(self, object_refs):
cdef c_vector[CObjectID] object_ids
+1 -1
View File
@@ -195,7 +195,7 @@ cdef extern from "ray/core_worker/core_worker.h" nogil:
CRayStatus SetResource(const c_string &resource_name,
const double capacity,
const CClientID &client_Id)
CRayStatus ForceSpillObjects(const c_vector[CObjectID] &object_ids)
CRayStatus SpillObjects(const c_vector[CObjectID] &object_ids)
CRayStatus ForceRestoreSpilledObjects(
const c_vector[CObjectID] &object_ids)
+47
View File
@@ -107,6 +107,53 @@ def test_spill_objects_manually_with_workers(shutdown_only):
assert np.array_equal(restored, arr)
@pytest.mark.parametrize(
"ray_start_cluster_head", [{
"num_cpus": 0,
"object_store_memory": 75 * 1024 * 1024,
"object_spilling_config": {
"type": "filesystem",
"params": {
"directory_path": "/tmp"
}
},
"_internal_config": json.dumps({
"object_store_full_max_retries": 0,
"max_io_workers": 4,
}),
}],
indirect=True)
def test_spill_remote_object(ray_start_cluster_head):
cluster = ray_start_cluster_head
cluster.add_node(
object_store_memory=75 * 1024 * 1024,
object_spilling_config={
"type": "filesystem",
"params": {
"directory_path": "/tmp"
}
})
@ray.remote
def put():
return np.random.rand(5 * 1024 * 1024) # 40 MB data
# Create 2 objects. Only 1 should fit.
ref = put.remote()
ray.get(ref)
with pytest.raises(ray.exceptions.RayTaskError):
ray.get(put.remote())
time.sleep(1)
# Spill 1 object. The second should now fit.
ray.experimental.force_spill_objects([ref])
ray.get(put.remote())
# TODO(swang): Restoring from the object directory is not yet supported.
# ray.experimental.force_restore_spilled_objects([ref])
# sample = ray.get(ref)
# assert np.array_equal(sample, copy)
@pytest.mark.skip(reason="have not been fully implemented")
def test_spill_objects_automatically(shutdown_only):
# Limit our object store to 75 MiB of memory.