[Object Spilling] Introduce SpillWorker & RestoreWorker Pool to avoid IO worker deadlock. (#11885)

This commit is contained in:
SangBin Cho
2020-11-11 18:20:14 -08:00
committed by GitHub
parent de6df51bd2
commit f80d812799
18 changed files with 513 additions and 134 deletions
+4 -4
View File
@@ -86,10 +86,10 @@ from ray.state import (jobs, nodes, actors, objects, timeline,
object_transfer_timeline, cluster_resources,
available_resources) # noqa: E402
from ray.worker import ( # noqa: F401
LOCAL_MODE, SCRIPT_MODE, WORKER_MODE, IO_WORKER_MODE, cancel, connect,
disconnect, get, get_actor, get_gpu_ids, get_resource_ids,
get_dashboard_url, init, is_initialized, put, kill, remote, shutdown,
show_in_dashboard, wait,
LOCAL_MODE, SCRIPT_MODE, WORKER_MODE, RESTORE_WORKER_MODE,
SPILL_WORKER_MODE, cancel, connect, disconnect, get, get_actor,
get_gpu_ids, get_resource_ids, get_dashboard_url, init, is_initialized,
put, kill, remote, shutdown, show_in_dashboard, wait,
) # noqa: E402
import ray.internal # noqa: E402
# We import ray.actor because some code is run in actor.py which initializes
+17 -7
View File
@@ -64,7 +64,8 @@ from ray.includes.common cimport (
TASK_TYPE_ACTOR_TASK,
WORKER_TYPE_WORKER,
WORKER_TYPE_DRIVER,
WORKER_TYPE_IO_WORKER,
WORKER_TYPE_SPILL_WORKER,
WORKER_TYPE_RESTORE_WORKER,
PLACEMENT_STRATEGY_PACK,
PLACEMENT_STRATEGY_SPREAD,
PLACEMENT_STRATEGY_STRICT_PACK,
@@ -604,7 +605,10 @@ cdef c_vector[c_string] spill_objects_handler(
with gil:
object_refs = VectorToObjectRefs(object_ids_to_spill)
try:
urls = external_storage.spill_objects(object_refs)
with ray.worker._changeproctitle(
ray_constants.WORKER_PROCESS_TYPE_SPILL_WORKER,
ray_constants.WORKER_PROCESS_TYPE_SPILL_WORKER_IDLE):
urls = external_storage.spill_objects(object_refs)
for url in urls:
return_urls.push_back(url)
except Exception:
@@ -614,7 +618,7 @@ cdef c_vector[c_string] spill_objects_handler(
logger.exception(exception_str)
ray.utils.push_error_to_driver(
ray.worker.global_worker,
"io_worker_spill_objects_error",
"spill_objects_error",
traceback.format_exc() + exception_str,
job_id=None)
return return_urls
@@ -628,7 +632,10 @@ cdef void restore_spilled_objects_handler(
for i in range(size):
urls.append(object_urls[i])
try:
external_storage.restore_spilled_objects(urls)
with ray.worker._changeproctitle(
ray_constants.WORKER_PROCESS_TYPE_RESTORE_WORKER,
ray_constants.WORKER_PROCESS_TYPE_RESTORE_WORKER_IDLE):
external_storage.restore_spilled_objects(urls)
except Exception:
exception_str = (
"An unexpected internal error occurred while the IO worker "
@@ -636,7 +643,7 @@ cdef void restore_spilled_objects_handler(
logger.exception(exception_str)
ray.utils.push_error_to_driver(
ray.worker.global_worker,
"io_worker_retore_spilled_objects_error",
"restore_spilled_objects_error",
traceback.format_exc() + exception_str,
job_id=None)
@@ -722,9 +729,12 @@ cdef class CoreWorker:
elif worker_type == ray.WORKER_MODE:
self.is_driver = False
options.worker_type = WORKER_TYPE_WORKER
elif worker_type == ray.IO_WORKER_MODE:
elif worker_type == ray.SPILL_WORKER_MODE:
self.is_driver = False
options.worker_type = WORKER_TYPE_IO_WORKER
options.worker_type = WORKER_TYPE_SPILL_WORKER
elif worker_type == ray.RESTORE_WORKER_MODE:
self.is_driver = False
options.worker_type = WORKER_TYPE_RESTORE_WORKER
else:
raise ValueError(f"Unknown worker type: {worker_type}")
options.language = LANGUAGE_PYTHON
+2 -1
View File
@@ -162,7 +162,8 @@ cdef extern from "src/ray/protobuf/common.pb.h" nogil:
cdef extern from "src/ray/protobuf/common.pb.h" nogil:
cdef CWorkerType WORKER_TYPE_WORKER "ray::WorkerType::WORKER"
cdef CWorkerType WORKER_TYPE_DRIVER "ray::WorkerType::DRIVER"
cdef CWorkerType WORKER_TYPE_IO_WORKER "ray::WorkerType::IO_WORKER"
cdef CWorkerType WORKER_TYPE_SPILL_WORKER "ray::WorkerType::SPILL_WORKER"
cdef CWorkerType WORKER_TYPE_RESTORE_WORKER "ray::WorkerType::RESTORE_WORKER" # noqa: E501
cdef extern from "src/ray/protobuf/common.pb.h" nogil:
cdef CTaskType TASK_TYPE_NORMAL_TASK "ray::TaskType::NORMAL_TASK"
+10 -1
View File
@@ -163,7 +163,16 @@ PROCESS_TYPE_WEB_UI = "web_ui"
PROCESS_TYPE_GCS_SERVER = "gcs_server"
WORKER_PROCESS_TYPE_IDLE_WORKER = "ray::IDLE"
WORKER_PROCESS_TYPE_IO_WORKER = "ray::IOWorker"
WORKER_PROCESS_TYPE_SPILL_WORKER_NAME = "SpillWorker"
WORKER_PROCESS_TYPE_RESTORE_WORKER_NAME = "RestoreWorker"
WORKER_PROCESS_TYPE_SPILL_WORKER_IDLE = (
f"ray::IDLE_{WORKER_PROCESS_TYPE_SPILL_WORKER_NAME}")
WORKER_PROCESS_TYPE_RESTORE_WORKER_IDLE = (
f"ray::IDLE_{WORKER_PROCESS_TYPE_RESTORE_WORKER_NAME}")
WORKER_PROCESS_TYPE_SPILL_WORKER = (
f"ray::SPILL_{WORKER_PROCESS_TYPE_SPILL_WORKER_NAME}")
WORKER_PROCESS_TYPE_RESTORE_WORKER = (
f"ray::RESTORE_{WORKER_PROCESS_TYPE_RESTORE_WORKER_NAME}")
LOG_MONITOR_MAX_OPEN_FILES = 200
+43 -4
View File
@@ -151,7 +151,8 @@ def test_spill_objects_manually(object_spilling_config, shutdown_only):
x.cmdline()[0] for x in psutil.process_iter(attrs=["cmdline"])
if is_worker(x.info["cmdline"])
]
assert ray.ray_constants.WORKER_PROCESS_TYPE_IO_WORKER in processes
assert (
ray.ray_constants.WORKER_PROCESS_TYPE_SPILL_WORKER_IDLE in processes)
# Spill 2 more objects so we will always have enough space for
# restoring objects back.
@@ -164,6 +165,14 @@ def test_spill_objects_manually(object_spilling_config, shutdown_only):
sample = ray.get(ref)
assert np.array_equal(sample, arr)
# Make sure io workers are spawned with proper name.
processes = [
x.cmdline()[0] for x in psutil.process_iter(attrs=["cmdline"])
if is_worker(x.info["cmdline"])
]
assert (
ray.ray_constants.WORKER_PROCESS_TYPE_RESTORE_WORKER_IDLE in processes)
@pytest.mark.skipif(
platform.system() == "Windows", reason="Failing on Windows.")
@@ -317,9 +326,6 @@ def test_spill_during_get(object_spilling_config, shutdown_only):
object_store_memory=100 * 1024 * 1024,
_system_config={
"automatic_object_spilling_enabled": True,
# This test will deadlock if only one IO worker is allowed because
# the IO worker will try to restore an object, but this requires
# another object to be spilled, which also requires an IO worker.
"max_io_workers": 2,
"object_spilling_config": object_spilling_config,
},
@@ -341,5 +347,38 @@ def test_spill_during_get(object_spilling_config, shutdown_only):
print(ray.get(x).shape)
@pytest.mark.skipif(
platform.system() == "Windows", reason="Failing on Windows.")
def test_spill_deadlock(object_spilling_config, shutdown_only):
# Limit our object store to 75 MiB of memory.
ray.init(
object_store_memory=75 * 1024 * 1024,
_system_config={
"max_io_workers": 1,
"automatic_object_spilling_enabled": True,
"object_store_full_max_retries": 4,
"object_store_full_initial_delay_ms": 100,
"object_spilling_config": object_spilling_config,
})
arr = np.random.rand(1024 * 1024) # 8 MB data
replay_buffer = []
# Wait raylet for starting an IO worker.
time.sleep(1)
# Create objects of more than 400 MiB.
for _ in range(50):
ref = None
while ref is None:
ref = ray.put(arr)
replay_buffer.append(ref)
# This is doing random sampling with 50% prob.
if random.randint(0, 9) < 5:
for _ in range(5):
ref = random.choice(replay_buffer)
sample = ray.get(ref, timeout=0)
assert np.array_equal(sample, arr)
if __name__ == "__main__":
sys.exit(pytest.main(["-sv", __file__]))
+11 -5
View File
@@ -53,7 +53,8 @@ from ray.utils import (_random_string, check_oversized_pickle, is_cython,
SCRIPT_MODE = 0
WORKER_MODE = 1
LOCAL_MODE = 2
IO_WORKER_MODE = 3
SPILL_WORKER_MODE = 3
RESTORE_WORKER_MODE = 4
ERROR_KEY_PREFIX = b"Error:"
@@ -1165,7 +1166,7 @@ def connect(node,
worker.redis_client = node.create_redis_client()
# Initialize some fields.
if mode in (WORKER_MODE, IO_WORKER_MODE):
if mode in (WORKER_MODE, RESTORE_WORKER_MODE, SPILL_WORKER_MODE):
# We should not specify the job_id if it's `WORKER_MODE`.
assert job_id is None
job_id = JobID.nil()
@@ -1186,8 +1187,12 @@ def connect(node,
if mode is not SCRIPT_MODE and mode is not LOCAL_MODE and setproctitle:
process_name = ray_constants.WORKER_PROCESS_TYPE_IDLE_WORKER
if mode is IO_WORKER_MODE:
process_name = ray_constants.WORKER_PROCESS_TYPE_IO_WORKER
if mode is SPILL_WORKER_MODE:
process_name = (
ray_constants.WORKER_PROCESS_TYPE_SPILL_WORKER_IDLE)
elif mode is RESTORE_WORKER_MODE:
process_name = (
ray_constants.WORKER_PROCESS_TYPE_RESTORE_WORKER_IDLE)
setproctitle.setproctitle(process_name)
if not isinstance(job_id, JobID):
@@ -1222,7 +1227,8 @@ def connect(node,
import __main__ as main
driver_name = (main.__file__
if hasattr(main, "__file__") else "INTERACTIVE MODE")
elif mode == WORKER_MODE or mode == IO_WORKER_MODE:
elif (mode == WORKER_MODE or mode == SPILL_WORKER_MODE
or mode == RESTORE_WORKER_MODE):
# Check the RedirectOutput key in Redis and based on its value redirect
# worker output and error to their own files.
# This key is set in services.py when Redis is started.
+6 -4
View File
@@ -115,15 +115,17 @@ if __name__ == "__main__":
if args.worker_type == "WORKER":
mode = ray.WORKER_MODE
elif args.worker_type == "IO_WORKER":
mode = ray.IO_WORKER_MODE
elif args.worker_type == "SPILL_WORKER":
mode = ray.SPILL_WORKER_MODE
elif args.worker_type == "RESTORE_WORKER":
mode = ray.RESTORE_WORKER_MODE
else:
raise ValueError("Unknown worker type: " + args.worker_type)
# NOTE(suquark): We must initialize the external storage before we
# connect to raylet. Otherwise we may receive requests before the
# external storage is intialized.
if mode == ray.IO_WORKER_MODE:
if mode == ray.RESTORE_WORKER_MODE or mode == ray.SPILL_WORKER_MODE:
from ray import external_storage
if args.object_spilling_config:
object_spilling_config = base64.b64decode(
@@ -168,7 +170,7 @@ if __name__ == "__main__":
ray.worker.connect(node, mode=mode)
if mode == ray.WORKER_MODE:
ray.worker.global_worker.main_loop()
elif mode == ray.IO_WORKER_MODE:
elif mode == ray.RESTORE_WORKER_MODE or mode == ray.SPILL_WORKER_MODE:
# It is handled by another thread in the C++ core worker.
# We just need to keep the worker alive.
while True: