diff --git a/CMakeLists.txt b/CMakeLists.txt index a6734e62c..4779a03fc 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -101,7 +101,7 @@ if ("${CMAKE_RAY_LANG_PYTHON}" STREQUAL "YES") set(ray_file_list "src/ray/thirdparty/redis/src/redis-server" "src/ray/gcs/redis_module/libray_redis_module.so" - "src/ray/raylet/liblocal_scheduler_library_python.so" + "src/ray/raylet/libraylet_library_python.so" "src/ray/raylet/raylet_monitor" "src/ray/raylet/raylet") @@ -128,8 +128,8 @@ if ("${CMAKE_RAY_LANG_PYTHON}" STREQUAL "YES") add_dependencies(copy_ray copy_ray_files) # Make sure that the Python extensions are built before copying the files. - get_local_scheduler_library("python" LOCAL_SCHEDULER_LIBRARY_PYTHON) - add_dependencies(copy_ray ${LOCAL_SCHEDULER_LIBRARY_PYTHON}) + get_raylet_library("python" RAYLET_LIBRARY_PYTHON) + add_dependencies(copy_ray ${RAYLET_LIBRARY_PYTHON}) foreach(file ${ray_file_list}) add_custom_command(TARGET copy_ray POST_BUILD @@ -146,8 +146,8 @@ if ("${CMAKE_RAY_LANG_PYTHON}" STREQUAL "YES") endif() if ("${CMAKE_RAY_LANG_JAVA}" STREQUAL "YES") - get_local_scheduler_library("java" LOCAL_SCHEDULER_LIBRARY_JAVA) - add_dependencies(copy_ray ${LOCAL_SCHEDULER_LIBRARY_JAVA}) + get_raylet_library("java" RAYLET_LIBRARY_JAVA) + add_dependencies(copy_ray ${RAYLET_LIBRARY_JAVA}) # copy libplasma_java files add_custom_command(TARGET copy_ray POST_BUILD diff --git a/java/prepare.sh b/java/prepare.sh index 9554e500a..dcb325c4b 100755 --- a/java/prepare.sh +++ b/java/prepare.sh @@ -50,7 +50,7 @@ declare -a nativeBinaries=( declare -a nativeLibraries=( "./src/ray/gcs/redis_module/libray_redis_module.so" - "./src/ray/raylet/liblocal_scheduler_library_java.*" + "./src/ray/raylet/libraylet_library_java.*" "./src/plasma/libplasma_java.*" "./src/ray/raylet/*lib.a" ) diff --git a/java/runtime/src/main/java/org/ray/runtime/RayNativeRuntime.java b/java/runtime/src/main/java/org/ray/runtime/RayNativeRuntime.java index 2036f2319..fd88bde35 100644 --- a/java/runtime/src/main/java/org/ray/runtime/RayNativeRuntime.java +++ b/java/runtime/src/main/java/org/ray/runtime/RayNativeRuntime.java @@ -61,7 +61,7 @@ public final class RayNativeRuntime extends AbstractRayRuntime { // Load native libraries. try { resetLibaryPath(); - System.loadLibrary("local_scheduler_library_java"); + System.loadLibrary("raylet_library_java"); System.loadLibrary("plasma_java"); } catch (Exception e) { LOGGER.error("Failed to load native libraries.", e); diff --git a/python/ray/actor.py b/python/ray/actor.py index 41c0d8205..66fecb118 100644 --- a/python/ray/actor.py +++ b/python/ray/actor.py @@ -791,7 +791,7 @@ def make_actor(cls, num_cpus, num_gpus, resources, actor_method_cpus, # Disconnect the worker from the local scheduler. The point of # this is so that when the worker kills itself below, the local # scheduler won't push an error message to the driver. - worker.local_scheduler_client.disconnect() + worker.raylet_client.disconnect() sys.exit(0) assert False, "This process should have terminated." @@ -832,8 +832,7 @@ def make_actor(cls, num_cpus, num_gpus, resources, actor_method_cpus, # the local scheduler will not be included, and may not be runnable # on checkpoint resumption. actor_id = ray.ObjectID(worker.actor_id) - frontier = worker.local_scheduler_client.get_actor_frontier( - actor_id) + frontier = worker.raylet_client.get_actor_frontier(actor_id) # Save the checkpoint in Redis. TODO(rkn): Checkpoints # should not be stored in Redis. Fix this. set_actor_checkpoint(worker, worker.actor_id, checkpoint_index, @@ -863,7 +862,7 @@ def make_actor(cls, num_cpus, num_gpus, resources, actor_method_cpus, # Set the number of tasks executed so far. worker.actor_task_counter = checkpoint_index # Set the actor frontier in the local scheduler. - worker.local_scheduler_client.set_actor_frontier(frontier) + worker.raylet_client.set_actor_frontier(frontier) checkpoint_resumed = True return checkpoint_resumed diff --git a/python/ray/experimental/sgd/util.py b/python/ray/experimental/sgd/util.py index d8cc2d8f0..57549d2df 100644 --- a/python/ray/experimental/sgd/util.py +++ b/python/ray/experimental/sgd/util.py @@ -36,10 +36,10 @@ def warmup(): def fetch(oids): - local_sched_client = ray.worker.global_worker.local_scheduler_client + raylet_client = ray.worker.global_worker.raylet_client for o in oids: ray_obj_id = ray.ObjectID(o) - local_sched_client.fetch_or_reconstruct([ray_obj_id], True) + raylet_client.fetch_or_reconstruct([ray_obj_id], True) def run_timeline(sess, ops, feed_dict=None, write_timeline=False, name=""): diff --git a/python/ray/internal/internal_api.py b/python/ray/internal/internal_api.py index 777297431..c4acbbbf6 100644 --- a/python/ray/internal/internal_api.py +++ b/python/ray/internal/internal_api.py @@ -42,4 +42,4 @@ def free(object_ids, local_only=False, worker=None): if len(object_ids) == 0: return - worker.local_scheduler_client.free(object_ids, local_only) + worker.raylet_client.free_objects(object_ids, local_only) diff --git a/python/ray/profiling.py b/python/ray/profiling.py index 8cdd8296e..d57d827cd 100644 --- a/python/ray/profiling.py +++ b/python/ray/profiling.py @@ -119,7 +119,7 @@ class Profiler(object): else: component_type = "driver" - self.worker.local_scheduler_client.push_profile_events( + self.worker.raylet_client.push_profile_events( component_type, ray.ObjectID(self.worker.worker_id), self.worker.node_ip_address, events) diff --git a/python/ray/raylet/__init__.py b/python/ray/raylet/__init__.py index 8757f5974..69545f5c6 100644 --- a/python/ray/raylet/__init__.py +++ b/python/ray/raylet/__init__.py @@ -2,12 +2,12 @@ from __future__ import absolute_import from __future__ import division from __future__ import print_function -from ray.core.src.ray.raylet.liblocal_scheduler_library_python import ( - Task, LocalSchedulerClient, ObjectID, check_simple_value, compute_task_id, +from ray.core.src.ray.raylet.libraylet_library_python import ( + Task, RayletClient, ObjectID, check_simple_value, compute_task_id, task_from_string, task_to_string, _config, common_error) __all__ = [ - "Task", "LocalSchedulerClient", "ObjectID", "check_simple_value", + "Task", "RayletClient", "ObjectID", "check_simple_value", "compute_task_id", "task_from_string", "task_to_string", "start_local_scheduler", "_config", "common_error" ] diff --git a/python/ray/rllib/utils/actors.py b/python/ray/rllib/utils/actors.py index 701807331..689aa945c 100644 --- a/python/ray/rllib/utils/actors.py +++ b/python/ray/rllib/utils/actors.py @@ -39,8 +39,8 @@ class TaskPool(object): for worker, obj_id in self.completed(): plasma_id = ray.pyarrow.plasma.ObjectID(obj_id.id()) - (ray.worker.global_worker.local_scheduler_client. - fetch_or_reconstruct([obj_id], True)) + (ray.worker.global_worker.raylet_client.fetch_or_reconstruct( + [obj_id], True)) self._fetching.append((worker, obj_id)) remaining = [] diff --git a/python/ray/utils.py b/python/ray/utils.py index e75e00672..854393570 100644 --- a/python/ray/utils.py +++ b/python/ray/utils.py @@ -69,7 +69,7 @@ def push_error_to_driver(worker, if driver_id is None: driver_id = ray_constants.NIL_JOB_ID.id() data = {} if data is None else data - worker.local_scheduler_client.push_error( + worker.raylet_client.push_error( ray.ObjectID(driver_id), error_type, message, time.time()) diff --git a/python/ray/worker.py b/python/ray/worker.py index 0393ccc23..e9a6aac4e 100644 --- a/python/ray/worker.py +++ b/python/ray/worker.py @@ -455,7 +455,7 @@ class Worker(object): ] for i in range(0, len(object_ids), ray._config.worker_fetch_request_size()): - self.local_scheduler_client.fetch_or_reconstruct( + self.raylet_client.fetch_or_reconstruct( object_ids[i:(i + ray._config.worker_fetch_request_size())], True) @@ -490,7 +490,7 @@ class Worker(object): ray._config.worker_fetch_request_size()) for i in range(0, len(object_ids_to_fetch), fetch_request_size): - self.local_scheduler_client.fetch_or_reconstruct( + self.raylet_client.fetch_or_reconstruct( ray_object_ids_to_fetch[i:( i + fetch_request_size)], False, current_task_id) @@ -511,7 +511,7 @@ class Worker(object): # If there were objects that we weren't able to get locally, # let the local scheduler know that we're now unblocked. - self.local_scheduler_client.notify_unblocked(current_task_id) + self.raylet_client.notify_unblocked(current_task_id) assert len(final_results) == len(object_ids) return final_results @@ -628,7 +628,7 @@ class Worker(object): actor_creation_id, actor_creation_dummy_object_id, actor_id, actor_handle_id, actor_counter, execution_dependencies, resources, placement_resources) - self.local_scheduler_client.submit(task) + self.raylet_client.submit_task(task) return task.returns() @@ -936,7 +936,7 @@ class Worker(object): reached_max_executions = (self.function_actor_manager.get_task_counter( driver_id, function_id.id()) == execution_info.max_calls) if reached_max_executions: - self.local_scheduler_client.disconnect() + self.raylet_client.disconnect() sys.exit(0) def _get_next_task_from_local_scheduler(self): @@ -946,7 +946,7 @@ class Worker(object): A task from the local scheduler. """ with profiling.profile("worker_idle", worker=self): - task = self.local_scheduler_client.get_task() + task = self.raylet_client.get_task() # Automatically restrict the GPUs available to this task. ray.utils.set_cuda_visible_devices(ray.get_gpu_ids()) @@ -982,7 +982,7 @@ def get_gpu_ids(): raise Exception("ray.get_gpu_ids() currently does not work in PYTHON " "MODE.") - all_resource_ids = global_worker.local_scheduler_client.resource_ids() + all_resource_ids = global_worker.raylet_client.resource_ids() assigned_ids = [ resource_id for resource_id, _ in all_resource_ids.get("GPU", []) ] @@ -1010,7 +1010,7 @@ def get_resource_ids(): "ray.get_resource_ids() currently does not work in PYTHON " "MODE.") - return global_worker.local_scheduler_client.resource_ids() + return global_worker.raylet_client.resource_ids() def _webui_url_helper(client): @@ -1733,8 +1733,8 @@ def shutdown(worker=global_worker): will need to reload the module. """ disconnect(worker) - if hasattr(worker, "local_scheduler_client"): - del worker.local_scheduler_client + if hasattr(worker, "raylet_client"): + del worker.raylet_client if hasattr(worker, "plasma_client"): worker.plasma_client.disconnect() @@ -2120,7 +2120,7 @@ def connect(info, # multithreading per worker. worker.multithreading_warned = False - worker.local_scheduler_client = ray.raylet.LocalSchedulerClient( + worker.raylet_client = ray.raylet.RayletClient( raylet_socket, worker.worker_id, is_worker, worker.current_task_id) # Start the import thread @@ -2406,7 +2406,7 @@ def put(value, worker=global_worker): if worker.mode == LOCAL_MODE: # In LOCAL_MODE, ray.put is the identity operation. return value - object_id = worker.local_scheduler_client.compute_put_id( + object_id = worker.raylet_client.compute_put_id( worker.current_task_id, worker.put_index) worker.put_object(object_id, value) worker.put_index += 1 @@ -2486,7 +2486,7 @@ def wait(object_ids, num_returns=1, timeout=None, worker=global_worker): current_task_id = worker.get_current_thread_task_id() timeout = timeout if timeout is not None else 2**30 - ready_ids, remaining_ids = worker.local_scheduler_client.wait( + ready_ids, remaining_ids = worker.raylet_client.wait( object_ids, num_returns, timeout, False, current_task_id) return ready_ids, remaining_ids diff --git a/python/setup.py b/python/setup.py index 680182536..a323501d3 100644 --- a/python/setup.py +++ b/python/setup.py @@ -23,7 +23,7 @@ ray_files = [ "ray/core/src/ray/thirdparty/redis/src/redis-server", "ray/core/src/ray/gcs/redis_module/libray_redis_module.so", "ray/core/src/plasma/plasma_store_server", - "ray/core/src/ray/raylet/liblocal_scheduler_library_python.so", + "ray/core/src/ray/raylet/libraylet_library_python.so", "ray/core/src/ray/raylet/raylet_monitor", "ray/core/src/ray/raylet/raylet", "ray/WebUI.ipynb" ] diff --git a/src/ray/raylet/CMakeLists.txt b/src/ray/raylet/CMakeLists.txt index 2faac13d0..ed202a849 100644 --- a/src/ray/raylet/CMakeLists.txt +++ b/src/ray/raylet/CMakeLists.txt @@ -44,12 +44,12 @@ include_directories(${GCS_FBS_OUTPUT_DIRECTORY}) add_library(rayletlib raylet.cc ${NODE_MANAGER_FBS_OUTPUT_FILES}) target_link_libraries(rayletlib ray_static ${Boost_SYSTEM_LIBRARY}) -add_library(local_scheduler_client STATIC local_scheduler_client.cc) +add_library(raylet_client STATIC raylet_client.cc) # Encode the fact that some things require some autogenerated flatbuffer files # to be created first. add_dependencies(rayletlib gen_gcs_fbs) -add_dependencies(local_scheduler_client gen_gcs_fbs arrow_ep gen_node_manager_fbs) +add_dependencies(raylet_client gen_gcs_fbs arrow_ep gen_node_manager_fbs) add_executable(raylet main.cc) target_link_libraries(raylet rayletlib ${Boost_SYSTEM_LIBRARY} pthread) @@ -61,35 +61,35 @@ install(FILES raylet DESTINATION "${CMAKE_INSTALL_INCLUDEDIR}/ray/raylet") -macro(get_local_scheduler_library LANG VAR) - set(${VAR} "local_scheduler_library_${LANG}") +macro(get_raylet_library LANG VAR) + set(${VAR} "raylet_library_${LANG}") endmacro() -macro(set_local_scheduler_library LANG) - get_local_scheduler_library(${LANG} LOCAL_SCHEDULER_LIBRARY_${LANG}) - set(LOCAL_SCHEDULER_LIBRARY_LANG ${LOCAL_SCHEDULER_LIBRARY_${LANG}}) +macro(set_raylet_library LANG) + get_raylet_library(${LANG} RAYLET_LIBRARY_${LANG}) + set(RAYLET_LIBRARY_LANG ${RAYLET_LIBRARY_${LANG}}) - file(GLOB LOCAL_SCHEDULER_LIBRARY_${LANG}_SRC + file(GLOB RAYLET_LIBRARY_${LANG}_SRC lib/${LANG}/*.cc) - add_library(${LOCAL_SCHEDULER_LIBRARY_LANG} SHARED - ${LOCAL_SCHEDULER_LIBRARY_${LANG}_SRC}) + add_library(${RAYLET_LIBRARY_LANG} SHARED + ${RAYLET_LIBRARY_${LANG}_SRC}) if(APPLE) if ("${LANG}" STREQUAL "python") - SET_TARGET_PROPERTIES(${LOCAL_SCHEDULER_LIBRARY_LANG} PROPERTIES SUFFIX .so) + SET_TARGET_PROPERTIES(${RAYLET_LIBRARY_LANG} PROPERTIES SUFFIX .so) endif() - target_link_libraries(${LOCAL_SCHEDULER_LIBRARY_LANG} "-undefined dynamic_lookup" local_scheduler_client ray_static ${PLASMA_STATIC_LIB} ${ARROW_STATIC_LIB} ${Boost_SYSTEM_LIBRARY}) + target_link_libraries(${RAYLET_LIBRARY_LANG} "-undefined dynamic_lookup" raylet_client ray_static ${PLASMA_STATIC_LIB} ${ARROW_STATIC_LIB} ${Boost_SYSTEM_LIBRARY}) else(APPLE) - target_link_libraries(${LOCAL_SCHEDULER_LIBRARY_LANG} local_scheduler_client ray_static ${PLASMA_STATIC_LIB} ${ARROW_STATIC_LIB} ${Boost_SYSTEM_LIBRARY}) + target_link_libraries(${RAYLET_LIBRARY_LANG} raylet_client ray_static ${PLASMA_STATIC_LIB} ${ARROW_STATIC_LIB} ${Boost_SYSTEM_LIBRARY}) endif(APPLE) - add_dependencies(${LOCAL_SCHEDULER_LIBRARY_LANG} gen_node_manager_fbs) + add_dependencies(${RAYLET_LIBRARY_LANG} gen_node_manager_fbs) - install(TARGETS ${LOCAL_SCHEDULER_LIBRARY_LANG} DESTINATION ${CMAKE_SOURCE_DIR}/local_scheduler) + install(TARGETS ${RAYLET_LIBRARY_LANG} DESTINATION ${CMAKE_SOURCE_DIR}/raylet) endmacro() if ("${CMAKE_RAY_LANG_PYTHON}" STREQUAL "YES") - set_local_scheduler_library("python") + set_raylet_library("python") include_directories("${PYTHON_INCLUDE_DIRS}") include_directories("${NUMPY_INCLUDE_DIR}") endif() @@ -103,5 +103,5 @@ if ("${CMAKE_RAY_LANG_JAVA}" STREQUAL "YES") else() # linux add_compile_options("-I$ENV{JAVA_HOME}/include/linux") endif() - set_local_scheduler_library("java") + set_raylet_library("java") endif() diff --git a/src/ray/raylet/format/node_manager.fbs b/src/ray/raylet/format/node_manager.fbs index 1e62202d7..7725b9bc7 100644 --- a/src/ray/raylet/format/node_manager.fbs +++ b/src/ray/raylet/format/node_manager.fbs @@ -118,7 +118,7 @@ table GetTaskReply { } // This struct is used to register a new worker with the local scheduler. -// It is shipped as part of local_scheduler_connect. +// It is shipped as part of raylet_connect. table RegisterClientRequest { // True if the client is a worker and false if the client is a driver. is_worker: bool; diff --git a/src/ray/raylet/lib/java/org_ray_runtime_raylet_RayletClientImpl.cc b/src/ray/raylet/lib/java/org_ray_runtime_raylet_RayletClientImpl.cc index c3b1b4475..212f91a84 100644 --- a/src/ray/raylet/lib/java/org_ray_runtime_raylet_RayletClientImpl.cc +++ b/src/ray/raylet/lib/java/org_ray_runtime_raylet_RayletClientImpl.cc @@ -3,7 +3,7 @@ #include #include "ray/id.h" -#include "ray/raylet/local_scheduler_client.h" +#include "ray/raylet/raylet_client.h" #include "ray/util/logging.h" #ifdef __cplusplus @@ -42,10 +42,10 @@ JNIEXPORT jlong JNICALL Java_org_ray_runtime_raylet_RayletClientImpl_nativeInit( UniqueIdFromJByteArray worker_id(env, workerId); UniqueIdFromJByteArray driver_id(env, driverId); const char *nativeString = env->GetStringUTFChars(sockName, JNI_FALSE); - auto client = LocalSchedulerConnection_init(nativeString, *worker_id.PID, isWorker, - *driver_id.PID, Language::JAVA); + auto raylet_client = new RayletClient(nativeString, *worker_id.PID, isWorker, + *driver_id.PID, Language::JAVA); env->ReleaseStringUTFChars(sockName, nativeString); - return reinterpret_cast(client); + return reinterpret_cast(raylet_client); } /* @@ -56,7 +56,7 @@ JNIEXPORT jlong JNICALL Java_org_ray_runtime_raylet_RayletClientImpl_nativeInit( JNIEXPORT void JNICALL Java_org_ray_runtime_raylet_RayletClientImpl_nativeSubmitTask( JNIEnv *env, jclass, jlong client, jbyteArray cursorId, jobject taskBuff, jint pos, jint taskSize) { - auto conn = reinterpret_cast(client); + auto raylet_client = reinterpret_cast(client); std::vector execution_dependencies; if (cursorId != nullptr) { @@ -66,7 +66,8 @@ JNIEXPORT void JNICALL Java_org_ray_runtime_raylet_RayletClientImpl_nativeSubmit auto data = reinterpret_cast(env->GetDirectBufferAddress(taskBuff)) + pos; ray::raylet::TaskSpecification task_spec(std::string(data, taskSize)); - local_scheduler_submit_raylet(conn, execution_dependencies, task_spec); + auto status = raylet_client->SubmitTask(execution_dependencies, task_spec); + RAY_CHECK_OK_PREPEND(status, "[RayletClient] Failed to submit a task to raylet."); } /* @@ -76,10 +77,12 @@ JNIEXPORT void JNICALL Java_org_ray_runtime_raylet_RayletClientImpl_nativeSubmit */ JNIEXPORT jbyteArray JNICALL Java_org_ray_runtime_raylet_RayletClientImpl_nativeGetTask( JNIEnv *env, jclass, jlong client) { - auto conn = reinterpret_cast(client); + auto raylet_client = reinterpret_cast(client); // TODO: handle actor failure later - ray::raylet::TaskSpecification *spec = local_scheduler_get_task_raylet(conn); + std::unique_ptr spec; + auto status = raylet_client->GetTask(&spec); + RAY_CHECK_OK_PREPEND(status, "[RayletClient] Failed to get a task from raylet."); // We serialize the task specification using flatbuffers and then parse the // resulting string. This awkwardness is due to the fact that the Java @@ -100,7 +103,6 @@ JNIEXPORT jbyteArray JNICALL Java_org_ray_runtime_raylet_RayletClientImpl_native result, 0, task_message->size(), reinterpret_cast(const_cast(task_message->data()))); - delete spec; return result; } @@ -111,9 +113,10 @@ JNIEXPORT jbyteArray JNICALL Java_org_ray_runtime_raylet_RayletClientImpl_native */ JNIEXPORT void JNICALL Java_org_ray_runtime_raylet_RayletClientImpl_nativeDestroy( JNIEnv *, jclass, jlong client) { - auto conn = reinterpret_cast(client); - local_scheduler_disconnect_client(conn); - LocalSchedulerConnection_free(conn); + auto raylet_client = reinterpret_cast(client); + RAY_CHECK_OK_PREPEND(raylet_client->Disconnect(), + "[RayletClient] Failed to disconnect."); + delete raylet_client; } /* @@ -135,9 +138,10 @@ Java_org_ray_runtime_raylet_RayletClientImpl_nativeFetchOrReconstruct( env->DeleteLocalRef(object_id_bytes); } UniqueIdFromJByteArray current_task_id(env, currentTaskId); - auto conn = reinterpret_cast(client); - return local_scheduler_fetch_or_reconstruct(conn, object_ids, fetchOnly, - *current_task_id.PID); + auto raylet_client = reinterpret_cast(client); + auto status = + raylet_client->FetchOrReconstruct(object_ids, fetchOnly, *current_task_id.PID); + return static_cast(status.code()); } /* @@ -148,8 +152,9 @@ Java_org_ray_runtime_raylet_RayletClientImpl_nativeFetchOrReconstruct( JNIEXPORT void JNICALL Java_org_ray_runtime_raylet_RayletClientImpl_nativeNotifyUnblocked( JNIEnv *env, jclass, jlong client, jbyteArray currentTaskId) { UniqueIdFromJByteArray current_task_id(env, currentTaskId); - auto conn = reinterpret_cast(client); - local_scheduler_notify_unblocked(conn, *current_task_id.PID); + auto raylet_client = reinterpret_cast(client); + auto status = raylet_client->NotifyUnblocked(*current_task_id.PID); + RAY_CHECK_OK_PREPEND(status, "[RayletClient] Failed to notify unblocked."); } /* @@ -172,12 +177,14 @@ Java_org_ray_runtime_raylet_RayletClientImpl_nativeWaitObject( } UniqueIdFromJByteArray current_task_id(env, currentTaskId); - auto conn = reinterpret_cast(client); + auto raylet_client = reinterpret_cast(client); // Invoke wait. - std::pair, std::vector> result = - local_scheduler_wait(conn, object_ids, numReturns, timeoutMillis, - static_cast(isWaitLocal), *current_task_id.PID); + WaitResultPair result; + auto status = + raylet_client->Wait(object_ids, numReturns, timeoutMillis, + static_cast(isWaitLocal), *current_task_id.PID, &result); + RAY_CHECK_OK_PREPEND(status, "[RayletClient] Failed to wait for objects."); // Convert result to java object. jboolean put_value = true; @@ -246,8 +253,9 @@ Java_org_ray_runtime_raylet_RayletClientImpl_nativeFreePlasmaObjects( object_ids.push_back(*object_id.PID); env->DeleteLocalRef(object_id_bytes); } - auto conn = reinterpret_cast(client); - local_scheduler_free_objects_in_object_store(conn, object_ids, localOnly); + auto raylet_client = reinterpret_cast(client); + auto status = raylet_client->FreeObjects(object_ids, localOnly); + RAY_CHECK_OK_PREPEND(status, "[RayletClient] Failed to free objects."); } #ifdef __cplusplus diff --git a/src/ray/raylet/lib/python/common_extension.cc b/src/ray/raylet/lib/python/common_extension.cc index f4979620c..2ac9d2caf 100644 --- a/src/ray/raylet/lib/python/common_extension.cc +++ b/src/ray/raylet/lib/python/common_extension.cc @@ -117,7 +117,7 @@ TaskSpec *TaskSpec_copy(TaskSpec *spec, int64_t task_spec_size) { * * This is called from Python like * - * task = local_scheduler.task_from_string("...") + * task = raylet.task_from_string("...") * * @param task_string String representation of the task specification. * @return Python task specification object. @@ -142,7 +142,7 @@ PyObject *PyTask_from_string(PyObject *self, PyObject *args) { * * This is called from Python like * - * s = local_scheduler.task_to_string(task) + * s = raylet.task_to_string(task) * * @param task Ray task specification Python object. * @return String representing the task specification. diff --git a/src/ray/raylet/lib/python/local_scheduler_extension.cc b/src/ray/raylet/lib/python/raylet_extension.cc similarity index 55% rename from src/ray/raylet/lib/python/local_scheduler_extension.cc rename to src/ray/raylet/lib/python/raylet_extension.cc index 8d0480092..1a6fc9b29 100644 --- a/src/ray/raylet/lib/python/local_scheduler_extension.cc +++ b/src/ray/raylet/lib/python/raylet_extension.cc @@ -3,78 +3,73 @@ #include "common_extension.h" #include "config_extension.h" -#include "ray/raylet/local_scheduler_client.h" +#include "ray/raylet/raylet_client.h" PyObject *LocalSchedulerError; // clang-format off typedef struct { PyObject_HEAD - LocalSchedulerConnection *local_scheduler_connection; -} PyLocalSchedulerClient; + RayletClient *raylet_client; +} PyRayletClient; // clang-format on -static int PyLocalSchedulerClient_init(PyLocalSchedulerClient *self, PyObject *args, - PyObject *kwds) { +static int PyRayletClient_init(PyRayletClient *self, PyObject *args, PyObject *kwds) { char *socket_name; UniqueID client_id; PyObject *is_worker; JobID driver_id; if (!PyArg_ParseTuple(args, "sO&OO&", &socket_name, PyStringToUniqueID, &client_id, &is_worker, &PyObjectToUniqueID, &driver_id)) { - self->local_scheduler_connection = NULL; + self->raylet_client = NULL; return -1; } /* Connect to the local scheduler. */ - self->local_scheduler_connection = LocalSchedulerConnection_init( - socket_name, client_id, static_cast(PyObject_IsTrue(is_worker)), driver_id, - Language::PYTHON); + self->raylet_client = new RayletClient(socket_name, client_id, + static_cast(PyObject_IsTrue(is_worker)), + driver_id, Language::PYTHON); return 0; } -static void PyLocalSchedulerClient_dealloc(PyLocalSchedulerClient *self) { - if (self->local_scheduler_connection != NULL) { - LocalSchedulerConnection_free(self->local_scheduler_connection); +static void PyRayletClient_dealloc(PyRayletClient *self) { + if (self->raylet_client != NULL) { + delete self->raylet_client; } Py_TYPE(self)->tp_free((PyObject *)self); } -static PyObject *PyLocalSchedulerClient_disconnect(PyObject *self) { - local_scheduler_disconnect_client( - ((PyLocalSchedulerClient *)self)->local_scheduler_connection); +static PyObject *PyRayletClient_Disconnect(PyRayletClient *self) { + auto status = self->raylet_client->Disconnect(); + RAY_CHECK_OK_PREPEND(status, "[RayletClient] Failed to disconnect."); Py_RETURN_NONE; } -static PyObject *PyLocalSchedulerClient_submit(PyObject *self, PyObject *args) { +static PyObject *PyRayletClient_SubmitTask(PyRayletClient *self, PyObject *args) { PyObject *py_task; if (!PyArg_ParseTuple(args, "O", &py_task)) { return NULL; } - LocalSchedulerConnection *connection = - reinterpret_cast(self)->local_scheduler_connection; PyTask *task = reinterpret_cast(py_task); - - local_scheduler_submit_raylet(connection, *task->execution_dependencies, - *task->task_spec); - + auto status = + self->raylet_client->SubmitTask(*task->execution_dependencies, *task->task_spec); + RAY_CHECK_OK_PREPEND(status, "[RayletClient] Failed to submit a task to raylet."); Py_RETURN_NONE; } // clang-format off -static PyObject *PyLocalSchedulerClient_get_task(PyObject *self) { - ray::raylet::TaskSpecification *task_spec; +static PyObject *PyRayletClient_GetTask(PyRayletClient *self) { + std::unique_ptr task_spec; /* Drop the global interpreter lock while we get a task because - * local_scheduler_get_task may block for a long time. */ + * raylet_GetTask may block for a long time. */ Py_BEGIN_ALLOW_THREADS - task_spec = local_scheduler_get_task_raylet( - reinterpret_cast(self)->local_scheduler_connection); + auto status = self->raylet_client->GetTask(&task_spec); + RAY_CHECK_OK_PREPEND(status, "[RayletClient] Failed to get a task from raylet."); Py_END_ALLOW_THREADS - return PyTask_make(task_spec); + return PyTask_make(task_spec.release()); } // clang-format on -static PyObject *PyLocalSchedulerClient_fetch_or_reconstruct(PyObject *self, - PyObject *args) { +static PyObject *PyRayletClient_FetchOrReconstruct(PyRayletClient *self, PyObject *args) { PyObject *py_object_ids; PyObject *py_fetch_only; std::vector object_ids; @@ -93,32 +88,31 @@ static PyObject *PyLocalSchedulerClient_fetch_or_reconstruct(PyObject *self, } object_ids.push_back(object_id); } - int ret = local_scheduler_fetch_or_reconstruct( - reinterpret_cast(self)->local_scheduler_connection, - object_ids, fetch_only, current_task_id); - if (ret == 0) { + auto status = + self->raylet_client->FetchOrReconstruct(object_ids, fetch_only, current_task_id); + if (status.ok()) { Py_RETURN_NONE; } else { std::ostringstream stream; - stream << "local_scheduler_fetch_or_reconstruct failed: " - << "local scheduler connection may be closed, " - << "check raylet status. return value: " << ret; + stream << "[RayletClient] FetchOrReconstruct failed: " + << "raylet client may be closed, check raylet status. error message: " + << status.ToString(); PyErr_SetString(CommonError, stream.str().c_str()); Py_RETURN_NONE; } } -static PyObject *PyLocalSchedulerClient_notify_unblocked(PyObject *self, PyObject *args) { +static PyObject *PyRayletClient_NotifyUnblocked(PyRayletClient *self, PyObject *args) { TaskID current_task_id; if (!PyArg_ParseTuple(args, "O&", &PyObjectToUniqueID, ¤t_task_id)) { return NULL; } - local_scheduler_notify_unblocked( - ((PyLocalSchedulerClient *)self)->local_scheduler_connection, current_task_id); + auto status = self->raylet_client->NotifyUnblocked(current_task_id); + RAY_CHECK_OK_PREPEND(status, "[RayletClient] Failed to notify unblocked."); Py_RETURN_NONE; } -static PyObject *PyLocalSchedulerClient_compute_put_id(PyObject *self, PyObject *args) { +static PyObject *PyRayletClient_compute_put_id(PyObject *self, PyObject *args) { int put_index; TaskID task_id; if (!PyArg_ParseTuple(args, "O&i", &PyObjectToUniqueID, &task_id, &put_index)) { @@ -128,25 +122,10 @@ static PyObject *PyLocalSchedulerClient_compute_put_id(PyObject *self, PyObject return PyObjectID_make(put_id); } -static PyObject *PyLocalSchedulerClient_gpu_ids(PyObject *self) { - /* Construct a Python list of GPU IDs. */ - std::vector gpu_ids = - ((PyLocalSchedulerClient *)self)->local_scheduler_connection->gpu_ids; - int num_gpu_ids = gpu_ids.size(); - PyObject *gpu_ids_list = PyList_New((Py_ssize_t)num_gpu_ids); - for (int i = 0; i < num_gpu_ids; ++i) { - PyList_SetItem(gpu_ids_list, i, PyLong_FromLong(gpu_ids[i])); - } - return gpu_ids_list; -} - -// NOTE(rkn): This function only makes sense for the raylet code path. -static PyObject *PyLocalSchedulerClient_resource_ids(PyObject *self) { +static PyObject *PyRayletClient_resource_ids(PyRayletClient *self) { // Construct a Python dictionary of resource IDs and resource fractions. PyObject *resource_ids = PyDict_New(); - - for (auto const &resource_info : reinterpret_cast(self) - ->local_scheduler_connection->resource_ids_) { + for (auto const &resource_info : self->raylet_client->GetResourceIDs()) { auto const &resource_name = resource_info.first; auto const &ids_and_fractions = resource_info.second; @@ -171,7 +150,7 @@ static PyObject *PyLocalSchedulerClient_resource_ids(PyObject *self) { return resource_ids; } -static PyObject *PyLocalSchedulerClient_wait(PyObject *self, PyObject *args) { +static PyObject *PyRayletClient_Wait(PyRayletClient *self, PyObject *args) { PyObject *py_object_ids; int num_returns; int64_t timeout_ms; @@ -205,9 +184,10 @@ static PyObject *PyLocalSchedulerClient_wait(PyObject *self, PyObject *args) { } // Invoke wait. - std::pair, std::vector> result = local_scheduler_wait( - reinterpret_cast(self)->local_scheduler_connection, - object_ids, num_returns, timeout_ms, wait_local, current_task_id); + WaitResultPair result; + auto status = self->raylet_client->Wait(object_ids, num_returns, timeout_ms, wait_local, + current_task_id, &result); + RAY_CHECK_OK_PREPEND(status, "[RayletClient] Failed to wait for objects."); // Convert result to py object. PyObject *py_found = PyList_New(static_cast(result.first.size())); @@ -221,7 +201,7 @@ static PyObject *PyLocalSchedulerClient_wait(PyObject *self, PyObject *args) { return Py_BuildValue("(OO)", py_found, py_remaining); } -static PyObject *PyLocalSchedulerClient_push_error(PyObject *self, PyObject *args) { +static PyObject *PyRayletClient_PushError(PyRayletClient *self, PyObject *args) { JobID job_id; const char *type; int type_length; @@ -234,11 +214,10 @@ static PyObject *PyLocalSchedulerClient_push_error(PyObject *self, PyObject *arg return NULL; } - local_scheduler_push_error( - reinterpret_cast(self)->local_scheduler_connection, + auto status = self->raylet_client->PushError( job_id, std::string(type, type_length), std::string(error_message, error_message_length), timestamp); - + RAY_CHECK_OK_PREPEND(status, "[RayletClient] Failed to push errors to raylet."); Py_RETURN_NONE; } @@ -258,8 +237,7 @@ int PyBytes_or_PyUnicode_to_string(PyObject *py_string, std::string &out) { return 0; } -static PyObject *PyLocalSchedulerClient_push_profile_events(PyObject *self, - PyObject *args) { +static PyObject *PyRayletClient_PushProfileEvents(PyRayletClient *self, PyObject *args) { const char *component_type; int component_type_length; UniqueID component_id; @@ -331,14 +309,12 @@ static PyObject *PyLocalSchedulerClient_push_profile_events(PyObject *self, profile_info.profile_events.emplace_back(new ProfileEventT(profile_event)); } - local_scheduler_push_profile_events( - reinterpret_cast(self)->local_scheduler_connection, - profile_info); - + auto status = self->raylet_client->PushProfileEvents(profile_info); + RAY_CHECK_OK_PREPEND(status, "[RayletClient] Failed to push profile events to raylet."); Py_RETURN_NONE; } -static PyObject *PyLocalSchedulerClient_free(PyObject *self, PyObject *args) { +static PyObject *PyRayletClient_FreeObjects(PyRayletClient *self, PyObject *args) { PyObject *py_object_ids; PyObject *py_local_only; @@ -367,83 +343,80 @@ static PyObject *PyLocalSchedulerClient_free(PyObject *self, PyObject *args) { object_ids.push_back(object_id); } - // Invoke local_scheduler_free_objects_in_object_store. - local_scheduler_free_objects_in_object_store( - reinterpret_cast(self)->local_scheduler_connection, - object_ids, local_only); + // Invoke raylet_FreeObjects. + auto status = self->raylet_client->FreeObjects(object_ids, local_only); + RAY_CHECK_OK_PREPEND(status, "[RayletClient] Failed to free objects."); Py_RETURN_NONE; } -static PyMethodDef PyLocalSchedulerClient_methods[] = { - {"disconnect", (PyCFunction)PyLocalSchedulerClient_disconnect, METH_NOARGS, +static PyMethodDef PyRayletClient_methods[] = { + {"disconnect", (PyCFunction)PyRayletClient_Disconnect, METH_NOARGS, "Notify the local scheduler that this client is exiting gracefully."}, - {"submit", (PyCFunction)PyLocalSchedulerClient_submit, METH_VARARGS, + {"submit_task", (PyCFunction)PyRayletClient_SubmitTask, METH_VARARGS, "Submit a task to the local scheduler."}, - {"get_task", (PyCFunction)PyLocalSchedulerClient_get_task, METH_NOARGS, + {"get_task", (PyCFunction)PyRayletClient_GetTask, METH_NOARGS, "Get a task from the local scheduler."}, - {"fetch_or_reconstruct", (PyCFunction)PyLocalSchedulerClient_fetch_or_reconstruct, - METH_VARARGS, "Ask the local scheduler to reconstruct an object."}, - {"notify_unblocked", (PyCFunction)PyLocalSchedulerClient_notify_unblocked, - METH_VARARGS, "Notify the local scheduler that we are unblocked."}, - {"compute_put_id", (PyCFunction)PyLocalSchedulerClient_compute_put_id, METH_VARARGS, + {"fetch_or_reconstruct", (PyCFunction)PyRayletClient_FetchOrReconstruct, METH_VARARGS, + "Ask the local scheduler to reconstruct an object."}, + {"notify_unblocked", (PyCFunction)PyRayletClient_NotifyUnblocked, METH_VARARGS, + "Notify the local scheduler that we are unblocked."}, + {"compute_put_id", (PyCFunction)PyRayletClient_compute_put_id, METH_VARARGS, "Return the object ID for a put call within a task."}, - {"gpu_ids", (PyCFunction)PyLocalSchedulerClient_gpu_ids, METH_NOARGS, - "Get the IDs of the GPUs that are reserved for this client."}, - {"resource_ids", (PyCFunction)PyLocalSchedulerClient_resource_ids, METH_NOARGS, + {"resource_ids", (PyCFunction)PyRayletClient_resource_ids, METH_NOARGS, "Get the IDs of the resources that are reserved for this client."}, - {"wait", (PyCFunction)PyLocalSchedulerClient_wait, METH_VARARGS, + {"wait", (PyCFunction)PyRayletClient_Wait, METH_VARARGS, "Wait for a list of objects to be created."}, - {"push_error", (PyCFunction)PyLocalSchedulerClient_push_error, METH_VARARGS, + {"push_error", (PyCFunction)PyRayletClient_PushError, METH_VARARGS, "Push an error message to the relevant driver."}, - {"push_profile_events", (PyCFunction)PyLocalSchedulerClient_push_profile_events, - METH_VARARGS, "Store some profiling events in the GCS."}, - {"free", (PyCFunction)PyLocalSchedulerClient_free, METH_VARARGS, + {"push_profile_events", (PyCFunction)PyRayletClient_PushProfileEvents, METH_VARARGS, + "Store some profiling events in the GCS."}, + {"free_objects", (PyCFunction)PyRayletClient_FreeObjects, METH_VARARGS, "Free a list of objects from object stores."}, {NULL} /* Sentinel */ }; -static PyTypeObject PyLocalSchedulerClientType = { - PyVarObject_HEAD_INIT(NULL, 0) /* ob_size */ - "local_scheduler.LocalSchedulerClient", /* tp_name */ - sizeof(PyLocalSchedulerClient), /* tp_basicsize */ - 0, /* tp_itemsize */ - (destructor)PyLocalSchedulerClient_dealloc, /* tp_dealloc */ - 0, /* tp_print */ - 0, /* tp_getattr */ - 0, /* tp_setattr */ - 0, /* tp_compare */ - 0, /* tp_repr */ - 0, /* tp_as_number */ - 0, /* tp_as_sequence */ - 0, /* tp_as_mapping */ - 0, /* tp_hash */ - 0, /* tp_call */ - 0, /* tp_str */ - 0, /* tp_getattro */ - 0, /* tp_setattro */ - 0, /* tp_as_buffer */ - Py_TPFLAGS_DEFAULT, /* tp_flags */ - "LocalSchedulerClient object", /* tp_doc */ - 0, /* tp_traverse */ - 0, /* tp_clear */ - 0, /* tp_richcompare */ - 0, /* tp_weaklistoffset */ - 0, /* tp_iter */ - 0, /* tp_iternext */ - PyLocalSchedulerClient_methods, /* tp_methods */ - 0, /* tp_members */ - 0, /* tp_getset */ - 0, /* tp_base */ - 0, /* tp_dict */ - 0, /* tp_descr_get */ - 0, /* tp_descr_set */ - 0, /* tp_dictoffset */ - (initproc)PyLocalSchedulerClient_init, /* tp_init */ - 0, /* tp_alloc */ - PyType_GenericNew, /* tp_new */ +static PyTypeObject PyRayletClientType = { + PyVarObject_HEAD_INIT(NULL, 0) /* ob_size */ + "raylet.RayletClient", /* tp_name */ + sizeof(PyRayletClient), /* tp_basicsize */ + 0, /* tp_itemsize */ + (destructor)PyRayletClient_dealloc, /* tp_dealloc */ + 0, /* tp_print */ + 0, /* tp_getattr */ + 0, /* tp_setattr */ + 0, /* tp_compare */ + 0, /* tp_repr */ + 0, /* tp_as_number */ + 0, /* tp_as_sequence */ + 0, /* tp_as_mapping */ + 0, /* tp_hash */ + 0, /* tp_call */ + 0, /* tp_str */ + 0, /* tp_getattro */ + 0, /* tp_setattro */ + 0, /* tp_as_buffer */ + Py_TPFLAGS_DEFAULT, /* tp_flags */ + "RayletClient object", /* tp_doc */ + 0, /* tp_traverse */ + 0, /* tp_clear */ + 0, /* tp_richcompare */ + 0, /* tp_weaklistoffset */ + 0, /* tp_iter */ + 0, /* tp_iternext */ + PyRayletClient_methods, /* tp_methods */ + 0, /* tp_members */ + 0, /* tp_getset */ + 0, /* tp_base */ + 0, /* tp_dict */ + 0, /* tp_descr_get */ + 0, /* tp_descr_set */ + 0, /* tp_dictoffset */ + (initproc)PyRayletClient_init, /* tp_init */ + 0, /* tp_alloc */ + PyType_GenericNew, /* tp_new */ }; -static PyMethodDef local_scheduler_methods[] = { +static PyMethodDef raylet_methods[] = { {"check_simple_value", check_simple_value, METH_VARARGS, "Should the object be passed by value?"}, {"compute_task_id", compute_task_id, METH_VARARGS, @@ -459,14 +432,14 @@ static PyMethodDef local_scheduler_methods[] = { #if PY_MAJOR_VERSION >= 3 static struct PyModuleDef moduledef = { PyModuleDef_HEAD_INIT, - "liblocal_scheduler", /* m_name */ - "A module for the local scheduler.", /* m_doc */ - 0, /* m_size */ - local_scheduler_methods, /* m_methods */ - NULL, /* m_reload */ - NULL, /* m_traverse */ - NULL, /* m_clear */ - NULL, /* m_free */ + "libraylet", /* m_name */ + "A module for the raylet.", /* m_doc */ + 0, /* m_size */ + raylet_methods, /* m_methods */ + NULL, /* m_reload */ + NULL, /* m_traverse */ + NULL, /* m_clear */ + NULL, /* m_free */ }; #endif @@ -486,7 +459,7 @@ static struct PyModuleDef moduledef = { #define MOD_INIT(name) PyMODINIT_FUNC init##name(void) #endif -MOD_INIT(liblocal_scheduler_library_python) { +MOD_INIT(libraylet_library_python) { if (PyType_Ready(&PyTaskType) < 0) { INITERROR; } @@ -495,7 +468,7 @@ MOD_INIT(liblocal_scheduler_library_python) { INITERROR; } - if (PyType_Ready(&PyLocalSchedulerClientType) < 0) { + if (PyType_Ready(&PyRayletClientType) < 0) { INITERROR; } @@ -506,9 +479,8 @@ MOD_INIT(liblocal_scheduler_library_python) { #if PY_MAJOR_VERSION >= 3 PyObject *m = PyModule_Create(&moduledef); #else - PyObject *m = - Py_InitModule3("liblocal_scheduler_library_python", local_scheduler_methods, - "A module for the local scheduler."); + PyObject *m = Py_InitModule3("libraylet_library_python", raylet_methods, + "A module for the raylet."); #endif init_numpy_module(); @@ -520,8 +492,8 @@ MOD_INIT(liblocal_scheduler_library_python) { Py_INCREF(&PyObjectIDType); PyModule_AddObject(m, "ObjectID", (PyObject *)&PyObjectIDType); - Py_INCREF(&PyLocalSchedulerClientType); - PyModule_AddObject(m, "LocalSchedulerClient", (PyObject *)&PyLocalSchedulerClientType); + Py_INCREF(&PyRayletClientType); + PyModule_AddObject(m, "RayletClient", (PyObject *)&PyRayletClientType); char common_error[] = "common.error"; CommonError = PyErr_NewException(common_error, NULL, NULL); diff --git a/src/ray/raylet/local_scheduler_client.cc b/src/ray/raylet/local_scheduler_client.cc deleted file mode 100644 index 584379090..000000000 --- a/src/ray/raylet/local_scheduler_client.cc +++ /dev/null @@ -1,414 +0,0 @@ -#include "local_scheduler_client.h" - -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include - -#include "ray/common/common_protocol.h" -#include "ray/ray_config.h" -#include "ray/raylet/format/node_manager_generated.h" -#include "ray/raylet/task_spec.h" -#include "ray/util/logging.h" - -using MessageType = ray::protocol::MessageType; - -// TODO(rkn): The io methods below should be removed. - -int connect_ipc_sock(const char *socket_pathname) { - struct sockaddr_un socket_address; - int socket_fd; - - socket_fd = socket(AF_UNIX, SOCK_STREAM, 0); - if (socket_fd < 0) { - RAY_LOG(ERROR) << "socket() failed for pathname " << socket_pathname; - return -1; - } - - memset(&socket_address, 0, sizeof(socket_address)); - socket_address.sun_family = AF_UNIX; - if (strlen(socket_pathname) + 1 > sizeof(socket_address.sun_path)) { - RAY_LOG(ERROR) << "Socket pathname is too long."; - return -1; - } - strncpy(socket_address.sun_path, socket_pathname, strlen(socket_pathname) + 1); - - if (connect(socket_fd, (struct sockaddr *)&socket_address, sizeof(socket_address)) != - 0) { - close(socket_fd); - return -1; - } - - return socket_fd; -} - -int connect_ipc_sock_retry(const char *socket_pathname, int num_retries, - int64_t timeout) { - /* Pick the default values if the user did not specify. */ - if (num_retries < 0) { - num_retries = RayConfig::instance().num_connect_attempts(); - } - if (timeout < 0) { - timeout = RayConfig::instance().connect_timeout_milliseconds(); - } - - RAY_CHECK(socket_pathname); - int fd = -1; - for (int num_attempts = 0; num_attempts < num_retries; ++num_attempts) { - fd = connect_ipc_sock(socket_pathname); - if (fd >= 0) { - break; - } - if (num_attempts > 0) { - RAY_LOG(ERROR) << "Retrying to connect to socket for pathname " << socket_pathname - << " (num_attempts = " << num_attempts - << ", num_retries = " << num_retries << ")"; - } - /* Sleep for timeout milliseconds. */ - usleep(timeout * 1000); - } - /* If we could not connect to the socket, exit. */ - if (fd == -1) { - RAY_LOG(FATAL) << "Could not connect to socket " << socket_pathname; - } - return fd; -} - -int read_bytes(int fd, uint8_t *cursor, size_t length) { - ssize_t nbytes = 0; - /* Termination condition: EOF or read 'length' bytes total. */ - size_t bytesleft = length; - size_t offset = 0; - while (bytesleft > 0) { - nbytes = read(fd, cursor + offset, bytesleft); - if (nbytes < 0) { - if (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR) { - continue; - } - return -1; /* Errno will be set. */ - } else if (0 == nbytes) { - /* Encountered early EOF. */ - return -1; - } - RAY_CHECK(nbytes > 0); - bytesleft -= nbytes; - offset += nbytes; - } - - return 0; -} - -void read_message(int fd, int64_t *type, int64_t *length, uint8_t **bytes) { - int64_t version; - int closed = read_bytes(fd, (uint8_t *)&version, sizeof(version)); - if (closed) { - goto disconnected; - } - RAY_CHECK(version == RayConfig::instance().ray_protocol_version()); - closed = read_bytes(fd, (uint8_t *)type, sizeof(*type)); - if (closed) { - goto disconnected; - } - closed = read_bytes(fd, (uint8_t *)length, sizeof(*length)); - if (closed) { - goto disconnected; - } - *bytes = (uint8_t *)malloc(*length * sizeof(uint8_t)); - closed = read_bytes(fd, *bytes, *length); - if (closed) { - free(*bytes); - goto disconnected; - } - return; - -disconnected: - /* Handle the case in which the socket is closed. */ - *type = static_cast(MessageType::DisconnectClient); - *length = 0; - *bytes = NULL; - return; -} - -int write_bytes(int fd, uint8_t *cursor, size_t length) { - ssize_t nbytes = 0; - size_t bytesleft = length; - size_t offset = 0; - while (bytesleft > 0) { - /* While we haven't written the whole message, write to the file - * descriptor, advance the cursor, and decrease the amount left to write. */ - nbytes = write(fd, cursor + offset, bytesleft); - if (nbytes < 0) { - if (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR) { - continue; - } - return -1; /* Errno will be set. */ - } else if (0 == nbytes) { - /* Encountered early EOF. */ - return -1; - } - RAY_CHECK(nbytes > 0); - bytesleft -= nbytes; - offset += nbytes; - } - - return 0; -} - -int do_write_message(int fd, int64_t type, int64_t length, uint8_t *bytes) { - int64_t version = RayConfig::instance().ray_protocol_version(); - int closed; - closed = write_bytes(fd, (uint8_t *)&version, sizeof(version)); - if (closed) { - return closed; - } - closed = write_bytes(fd, (uint8_t *)&type, sizeof(type)); - if (closed) { - return closed; - } - closed = write_bytes(fd, (uint8_t *)&length, sizeof(length)); - if (closed) { - return closed; - } - closed = write_bytes(fd, bytes, length * sizeof(char)); - if (closed) { - return closed; - } - return 0; -} - -int write_message(int fd, int64_t type, int64_t length, uint8_t *bytes, - std::mutex *mutex) { - if (mutex != NULL) { - std::unique_lock guard(*mutex); - return do_write_message(fd, type, length, bytes); - } else { - return do_write_message(fd, type, length, bytes); - } -} - -LocalSchedulerConnection *LocalSchedulerConnection_init( - const char *local_scheduler_socket, const UniqueID &client_id, bool is_worker, - const JobID &driver_id, const Language &language) { - LocalSchedulerConnection *result = new LocalSchedulerConnection(); - result->conn = connect_ipc_sock_retry(local_scheduler_socket, -1, -1); - - /* Register with the local scheduler. - * NOTE(swang): If the local scheduler exits and we are registered as a - * worker, we will get killed. */ - flatbuffers::FlatBufferBuilder fbb; - auto message = ray::protocol::CreateRegisterClientRequest( - fbb, is_worker, to_flatbuf(fbb, client_id), getpid(), to_flatbuf(fbb, driver_id), - language); - fbb.Finish(message); - /* Register the process ID with the local scheduler. */ - int success = write_message( - result->conn, static_cast(MessageType::RegisterClientRequest), - fbb.GetSize(), fbb.GetBufferPointer(), &result->write_mutex); - RAY_CHECK(success == 0) << "Unable to register worker with local scheduler"; - - return result; -} - -void LocalSchedulerConnection_free(LocalSchedulerConnection *conn) { - close(conn->conn); - delete conn; -} - -void local_scheduler_disconnect_client(LocalSchedulerConnection *conn) { - flatbuffers::FlatBufferBuilder fbb; - auto message = ray::protocol::CreateDisconnectClient(fbb); - fbb.Finish(message); - write_message(conn->conn, - static_cast(MessageType::IntentionalDisconnectClient), - fbb.GetSize(), fbb.GetBufferPointer(), &conn->write_mutex); -} - -void local_scheduler_submit_raylet(LocalSchedulerConnection *conn, - const std::vector &execution_dependencies, - const ray::raylet::TaskSpecification &task_spec) { - flatbuffers::FlatBufferBuilder fbb; - auto execution_dependencies_message = to_flatbuf(fbb, execution_dependencies); - auto message = ray::protocol::CreateSubmitTaskRequest( - fbb, execution_dependencies_message, task_spec.ToFlatbuffer(fbb)); - fbb.Finish(message); - write_message(conn->conn, static_cast(MessageType::SubmitTask), fbb.GetSize(), - fbb.GetBufferPointer(), &conn->write_mutex); -} - -ray::raylet::TaskSpecification *local_scheduler_get_task_raylet( - LocalSchedulerConnection *conn) { - int64_t type; - int64_t reply_size; - uint8_t *reply; - { - std::unique_lock guard(conn->mutex); - write_message(conn->conn, static_cast(MessageType::GetTask), 0, NULL, - &conn->write_mutex); - // Receive a task from the local scheduler. This will block until the local - // scheduler gives this client a task. - read_message(conn->conn, &type, &reply_size, &reply); - } - if (type == static_cast(MessageType::DisconnectClient)) { - RAY_LOG(DEBUG) << "Exiting because local scheduler closed connection."; - exit(1); - } - if (type != static_cast(MessageType::ExecuteTask)) { - RAY_LOG(FATAL) << "Problem communicating with raylet from worker: check logs or " - "dmesg for previous errors."; - } - - // Parse the flatbuffer object. - auto reply_message = flatbuffers::GetRoot(reply); - - // Set the resource IDs for this task. - conn->resource_ids_.clear(); - for (size_t i = 0; i < reply_message->fractional_resource_ids()->size(); ++i) { - auto const &fractional_resource_ids = - reply_message->fractional_resource_ids()->Get(i); - auto &acquired_resources = conn->resource_ids_[string_from_flatbuf( - *fractional_resource_ids->resource_name())]; - - size_t num_resource_ids = fractional_resource_ids->resource_ids()->size(); - size_t num_resource_fractions = fractional_resource_ids->resource_fractions()->size(); - RAY_CHECK(num_resource_ids == num_resource_fractions); - RAY_CHECK(num_resource_ids > 0); - for (size_t j = 0; j < num_resource_ids; ++j) { - int64_t resource_id = fractional_resource_ids->resource_ids()->Get(j); - double resource_fraction = fractional_resource_ids->resource_fractions()->Get(j); - if (num_resource_ids > 1) { - int64_t whole_fraction = resource_fraction; - RAY_CHECK(whole_fraction == resource_fraction); - } - acquired_resources.push_back(std::make_pair(resource_id, resource_fraction)); - } - } - - ray::raylet::TaskSpecification *task_spec = new ray::raylet::TaskSpecification( - string_from_flatbuf(*reply_message->task_spec())); - - // Free the original message from the local scheduler. - free(reply); - - // Return the copy of the task spec and pass ownership to the caller. - return task_spec; -} - -void local_scheduler_task_done(LocalSchedulerConnection *conn) { - write_message(conn->conn, static_cast(MessageType::TaskDone), 0, NULL, - &conn->write_mutex); -} - -int local_scheduler_fetch_or_reconstruct(LocalSchedulerConnection *conn, - const std::vector &object_ids, - bool fetch_only, const TaskID ¤t_task_id) { - flatbuffers::FlatBufferBuilder fbb; - auto object_ids_message = to_flatbuf(fbb, object_ids); - auto message = ray::protocol::CreateFetchOrReconstruct( - fbb, object_ids_message, fetch_only, to_flatbuf(fbb, current_task_id)); - fbb.Finish(message); - return write_message(conn->conn, static_cast(MessageType::FetchOrReconstruct), - fbb.GetSize(), fbb.GetBufferPointer(), &conn->write_mutex); -} - -void local_scheduler_notify_unblocked(LocalSchedulerConnection *conn, - const TaskID ¤t_task_id) { - flatbuffers::FlatBufferBuilder fbb; - auto message = - ray::protocol::CreateNotifyUnblocked(fbb, to_flatbuf(fbb, current_task_id)); - fbb.Finish(message); - write_message(conn->conn, static_cast(MessageType::NotifyUnblocked), - fbb.GetSize(), fbb.GetBufferPointer(), &conn->write_mutex); -} - -std::pair, std::vector> local_scheduler_wait( - LocalSchedulerConnection *conn, const std::vector &object_ids, - int num_returns, int64_t timeout_milliseconds, bool wait_local, - const TaskID ¤t_task_id) { - // Write request. - flatbuffers::FlatBufferBuilder fbb; - auto message = ray::protocol::CreateWaitRequest( - fbb, to_flatbuf(fbb, object_ids), num_returns, timeout_milliseconds, wait_local, - to_flatbuf(fbb, current_task_id)); - fbb.Finish(message); - int64_t type; - int64_t reply_size; - uint8_t *reply; - { - std::unique_lock guard(conn->mutex); - write_message(conn->conn, - static_cast(ray::protocol::MessageType::WaitRequest), - fbb.GetSize(), fbb.GetBufferPointer(), &conn->write_mutex); - // Read result. - read_message(conn->conn, &type, &reply_size, &reply); - } - if (static_cast(type) != - ray::protocol::MessageType::WaitReply) { - RAY_LOG(FATAL) << "Problem communicating with raylet from worker: check logs or " - "dmesg for previous errors."; - } - auto reply_message = flatbuffers::GetRoot(reply); - // Convert result. - std::pair, std::vector> result; - auto found = reply_message->found(); - for (uint i = 0; i < found->size(); i++) { - ObjectID object_id = ObjectID::from_binary(found->Get(i)->str()); - result.first.push_back(object_id); - } - auto remaining = reply_message->remaining(); - for (uint i = 0; i < remaining->size(); i++) { - ObjectID object_id = ObjectID::from_binary(remaining->Get(i)->str()); - result.second.push_back(object_id); - } - /* Free the original message from the local scheduler. */ - free(reply); - return result; -} - -void local_scheduler_push_error(LocalSchedulerConnection *conn, const JobID &job_id, - const std::string &type, const std::string &error_message, - double timestamp) { - flatbuffers::FlatBufferBuilder fbb; - auto message = ray::protocol::CreatePushErrorRequest( - fbb, to_flatbuf(fbb, job_id), fbb.CreateString(type), - fbb.CreateString(error_message), timestamp); - fbb.Finish(message); - - write_message(conn->conn, - static_cast(ray::protocol::MessageType::PushErrorRequest), - fbb.GetSize(), fbb.GetBufferPointer(), &conn->write_mutex); -} - -void local_scheduler_push_profile_events(LocalSchedulerConnection *conn, - const ProfileTableDataT &profile_events) { - flatbuffers::FlatBufferBuilder fbb; - - auto message = CreateProfileTableData(fbb, &profile_events); - fbb.Finish(message); - - write_message(conn->conn, static_cast( - ray::protocol::MessageType::PushProfileEventsRequest), - fbb.GetSize(), fbb.GetBufferPointer(), &conn->write_mutex); -} - -void local_scheduler_free_objects_in_object_store( - LocalSchedulerConnection *conn, const std::vector &object_ids, - bool local_only) { - flatbuffers::FlatBufferBuilder fbb; - auto message = ray::protocol::CreateFreeObjectsRequest(fbb, local_only, - to_flatbuf(fbb, object_ids)); - fbb.Finish(message); - - int success = write_message( - conn->conn, - static_cast(ray::protocol::MessageType::FreeObjectsInObjectStoreRequest), - fbb.GetSize(), fbb.GetBufferPointer(), &conn->write_mutex); - RAY_CHECK(success == 0) << "Failed to write message to raylet."; -} diff --git a/src/ray/raylet/local_scheduler_client.h b/src/ray/raylet/local_scheduler_client.h deleted file mode 100644 index 0f975de5c..000000000 --- a/src/ray/raylet/local_scheduler_client.h +++ /dev/null @@ -1,185 +0,0 @@ -#ifndef LOCAL_SCHEDULER_CLIENT_H -#define LOCAL_SCHEDULER_CLIENT_H - -#include - -#include "ray/raylet/task_spec.h" - -using ray::ObjectID; -using ray::JobID; -using ray::TaskID; -using ray::ActorID; -using ray::UniqueID; - -struct LocalSchedulerConnection { - /** File descriptor of the Unix domain socket that connects to local - * scheduler. */ - int conn; - /** The IDs of the GPUs that this client can use. NOTE(rkn): This is only used - * by legacy Ray and will be deprecated. */ - std::vector gpu_ids; - /// A map from resource name to the resource IDs that are currently reserved - /// for this worker. Each pair consists of the resource ID and the fraction - /// of that resource allocated for this worker. - std::unordered_map>> resource_ids_; - /// A mutex to protect stateful operations of the local scheduler client. - std::mutex mutex; - /// A mutext to protect write operations of the local scheduler client. - std::mutex write_mutex; -}; - -/** - * Connect to the local scheduler. - * - * @param local_scheduler_socket The name of the socket to use to connect to the - * local scheduler. - * @param worker_id A unique ID to represent the worker. - * @param is_worker Whether this client is a worker. If it is a worker, an - * additional message will be sent to register as one. - * @param driver_id The ID of the driver. This is non-nil if the client is a - * driver. - * @return The connection information. - */ -LocalSchedulerConnection *LocalSchedulerConnection_init( - const char *local_scheduler_socket, const UniqueID &worker_id, bool is_worker, - const JobID &driver_id, const Language &language); - -/** - * Disconnect from the local scheduler. - * - * @param conn Local scheduler connection information returned by - * LocalSchedulerConnection_init. - * @return Void. - */ -void LocalSchedulerConnection_free(LocalSchedulerConnection *conn); - -/// Submit a task using the raylet code path. -/// -/// \param The connection information. -/// \param The execution dependencies. -/// \param The task specification. -/// \return Void. -void local_scheduler_submit_raylet(LocalSchedulerConnection *conn, - const std::vector &execution_dependencies, - const ray::raylet::TaskSpecification &task_spec); - -/** - * Notify the local scheduler that this client is disconnecting gracefully. This - * is used by actors to exit gracefully so that the local scheduler doesn't - * propagate an error message to the driver. - * - * @param conn The connection information. - * @return Void. - */ -void local_scheduler_disconnect_client(LocalSchedulerConnection *conn); - -/// Get next task for this client. This will block until the scheduler assigns -/// a task to this worker. The caller takes ownership of the returned task -/// specification and must free it. -/// -/// \param conn The connection information. -/// \return The assigned task. -ray::raylet::TaskSpecification *local_scheduler_get_task_raylet( - LocalSchedulerConnection *conn); - -/** - * Tell the local scheduler that the client has finished executing a task. - * - * @param conn The connection information. - * @return Void. - */ -void local_scheduler_task_done(LocalSchedulerConnection *conn); - -/** - * Tell the local scheduler to reconstruct or fetch objects. - * - * @param conn The connection information. - * @param object_ids The IDs of the objects to reconstruct. - * @param fetch_only Only fetch objects, do not reconstruct them. - * @param current_task_id The task that needs the objects. - * @return int 0 means correct, other numbers mean error. - */ -int local_scheduler_fetch_or_reconstruct(LocalSchedulerConnection *conn, - const std::vector &object_ids, - bool fetch_only, const TaskID ¤t_task_id); - -/** - * Notify the local scheduler that this client (worker) is no longer blocked. - * - * @param conn The connection information. - * @param current_task_id The task that is no longer blocked. - * @return Void. - */ -void local_scheduler_notify_unblocked(LocalSchedulerConnection *conn, - const TaskID ¤t_task_id); - -// /** -// * Get an actor's current task frontier. -// * -// * @param conn The connection information. -// * @param actor_id The ID of the actor whose frontier is returned. -// * @return A byte vector that can be traversed as an ActorFrontier flatbuffer. -// */ -// const std::vector local_scheduler_get_actor_frontier( -// LocalSchedulerConnection *conn, -// ActorID actor_id); - -// /** -// * Set an actor's current task frontier. -// * -// * @param conn The connection information. -// * @param frontier An ActorFrontier flatbuffer to set the frontier to. -// * @return Void. -// */ -// void local_scheduler_set_actor_frontier(LocalSchedulerConnection *conn, -// const std::vector &frontier); - -/// Wait for the given objects until timeout expires or num_return objects are -/// found. -/// -/// \param conn The connection information. -/// \param object_ids The objects to wait for. -/// \param num_returns The number of objects to wait for. -/// \param timeout_milliseconds Duration, in milliseconds, to wait before -/// returning. -/// \param wait_local Whether to wait for objects to appear on this node. -/// \param current_task_id The task that called wait. -/// \return A pair with the first element containing the object ids that were -/// found, and the second element the objects that were not found. -std::pair, std::vector> local_scheduler_wait( - LocalSchedulerConnection *conn, const std::vector &object_ids, - int num_returns, int64_t timeout_milliseconds, bool wait_local, - const TaskID ¤t_task_id); - -/// Push an error to the relevant driver. -/// -/// \param conn The connection information. -/// \param The ID of the job that the error is for. -/// \param The type of the error. -/// \param The error message. -/// \param The timestamp of the error. -/// \return Void. -void local_scheduler_push_error(LocalSchedulerConnection *conn, const JobID &job_id, - const std::string &type, const std::string &error_message, - double timestamp); - -/// Store some profile events in the GCS. -/// -/// \param conn The connection information. -/// \param profile_events A batch of profiling event information. -/// \return Void. -void local_scheduler_push_profile_events(LocalSchedulerConnection *conn, - const ProfileTableDataT &profile_events); - -/// Free a list of objects from object stores. -/// -/// \param conn The connection information. -/// \param object_ids A list of ObjectsIDs to be deleted. -/// \param local_only Whether keep this request with local object store -/// or send it to all the object stores. -/// \return Void. -void local_scheduler_free_objects_in_object_store( - LocalSchedulerConnection *conn, const std::vector &object_ids, - bool local_only); - -#endif diff --git a/src/ray/raylet/raylet_client.cc b/src/ray/raylet/raylet_client.cc new file mode 100644 index 000000000..884fc1f4f --- /dev/null +++ b/src/ray/raylet/raylet_client.cc @@ -0,0 +1,360 @@ +#include "raylet_client.h" + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include "ray/common/common_protocol.h" +#include "ray/ray_config.h" +#include "ray/raylet/format/node_manager_generated.h" +#include "ray/raylet/task_spec.h" +#include "ray/util/logging.h" + +using MessageType = ray::protocol::MessageType; + +// TODO(rkn): The io methods below should be removed. +int connect_ipc_sock(const std::string &socket_pathname) { + struct sockaddr_un socket_address; + int socket_fd; + + socket_fd = socket(AF_UNIX, SOCK_STREAM, 0); + if (socket_fd < 0) { + RAY_LOG(ERROR) << "socket() failed for pathname " << socket_pathname; + return -1; + } + + memset(&socket_address, 0, sizeof(socket_address)); + socket_address.sun_family = AF_UNIX; + if (socket_pathname.length() + 1 > sizeof(socket_address.sun_path)) { + RAY_LOG(ERROR) << "Socket pathname is too long."; + close(socket_fd); + return -1; + } + strncpy(socket_address.sun_path, socket_pathname.c_str(), socket_pathname.length() + 1); + + if (connect(socket_fd, (struct sockaddr *)&socket_address, sizeof(socket_address)) != + 0) { + close(socket_fd); + return -1; + } + return socket_fd; +} + +int read_bytes(int socket_fd, uint8_t *cursor, size_t length) { + ssize_t nbytes = 0; + // Termination condition: EOF or read 'length' bytes total. + size_t bytesleft = length; + size_t offset = 0; + while (bytesleft > 0) { + nbytes = read(socket_fd, cursor + offset, bytesleft); + if (nbytes < 0) { + if (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR) { + continue; + } + return -1; // Errno will be set. + } else if (0 == nbytes) { + // Encountered early EOF. + return -1; + } + RAY_CHECK(nbytes > 0); + bytesleft -= nbytes; + offset += nbytes; + } + return 0; +} + +int write_bytes(int socket_fd, uint8_t *cursor, size_t length) { + ssize_t nbytes = 0; + size_t bytesleft = length; + size_t offset = 0; + while (bytesleft > 0) { + // While we haven't written the whole message, write to the file + // descriptor, advance the cursor, and decrease the amount left to write. + nbytes = write(socket_fd, cursor + offset, bytesleft); + if (nbytes < 0) { + if (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR) { + continue; + } + return -1; // Errno will be set. + } else if (0 == nbytes) { + // Encountered early EOF. + return -1; + } + RAY_CHECK(nbytes > 0); + bytesleft -= nbytes; + offset += nbytes; + } + return 0; +} + +RayletConnection::RayletConnection(const std::string &raylet_socket, int num_retries, + int64_t timeout) { + // Pick the default values if the user did not specify. + if (num_retries < 0) { + num_retries = RayConfig::instance().num_connect_attempts(); + } + if (timeout < 0) { + timeout = RayConfig::instance().connect_timeout_milliseconds(); + } + RAY_CHECK(!raylet_socket.empty()); + conn_ = -1; + for (int num_attempts = 0; num_attempts < num_retries; ++num_attempts) { + conn_ = connect_ipc_sock(raylet_socket); + if (conn_ >= 0) break; + if (num_attempts > 0) { + RAY_LOG(ERROR) << "Retrying to connect to socket for pathname " << raylet_socket + << " (num_attempts = " << num_attempts + << ", num_retries = " << num_retries << ")"; + } + // Sleep for timeout milliseconds. + usleep(timeout * 1000); + } + // If we could not connect to the socket, exit. + if (conn_ == -1) { + RAY_LOG(FATAL) << "Could not connect to socket " << raylet_socket; + } +} + +ray::Status RayletConnection::Disconnect() { + flatbuffers::FlatBufferBuilder fbb; + auto message = ray::protocol::CreateDisconnectClient(fbb); + fbb.Finish(message); + auto status = WriteMessage(MessageType::IntentionalDisconnectClient, &fbb); + // Don't be too strict for disconnection errors. + // Just create logs and prevent it from crash. + if (!status.ok()) { + RAY_LOG(ERROR) << status.ToString() + << " [RayletClient] Failed to disconnect from raylet."; + } + return ray::Status::OK(); +} + +ray::Status RayletConnection::ReadMessage(MessageType type, + std::unique_ptr &message) { + int64_t version; + int64_t type_field; + int64_t length; + int closed = read_bytes(conn_, (uint8_t *)&version, sizeof(version)); + if (closed) goto disconnected; + RAY_CHECK(version == RayConfig::instance().ray_protocol_version()); + closed = read_bytes(conn_, (uint8_t *)&type_field, sizeof(type_field)); + if (closed) goto disconnected; + closed = read_bytes(conn_, (uint8_t *)&length, sizeof(length)); + if (closed) goto disconnected; + message = std::unique_ptr(new uint8_t[length]); + closed = read_bytes(conn_, message.get(), length); + if (closed) { + // Handle the case in which the socket is closed. + message.reset(nullptr); + disconnected: + message = nullptr; + type_field = static_cast(MessageType::DisconnectClient); + length = 0; + } + if (type_field == static_cast(MessageType::DisconnectClient)) { + return ray::Status::IOError("[RayletClient] Raylet connection closed."); + } + if (type_field != static_cast(type)) { + return ray::Status::TypeError( + std::string("[RayletClient] Raylet connection corrupted. ") + + "Expected message type: " + std::to_string(static_cast(type)) + + "; got message type: " + std::to_string(type_field) + + ". Check logs or dmesg for previous errors."); + } + return ray::Status::OK(); +} + +ray::Status RayletConnection::WriteMessage(MessageType type, + flatbuffers::FlatBufferBuilder *fbb) { + std::unique_lock guard(write_mutex_); + int64_t version = RayConfig::instance().ray_protocol_version(); + int64_t length = fbb ? fbb->GetSize() : 0; + uint8_t *bytes = fbb ? fbb->GetBufferPointer() : nullptr; + int64_t type_field = static_cast(type); + auto io_error = ray::Status::IOError("[RayletClient] Connection closed unexpectedly."); + int closed; + closed = write_bytes(conn_, (uint8_t *)&version, sizeof(version)); + if (closed) return io_error; + closed = write_bytes(conn_, (uint8_t *)&type_field, sizeof(type_field)); + if (closed) return io_error; + closed = write_bytes(conn_, (uint8_t *)&length, sizeof(length)); + if (closed) return io_error; + closed = write_bytes(conn_, bytes, length * sizeof(char)); + if (closed) return io_error; + return ray::Status::OK(); +} + +ray::Status RayletConnection::AtomicRequestReply( + MessageType request_type, MessageType reply_type, + std::unique_ptr &reply_message, flatbuffers::FlatBufferBuilder *fbb) { + std::unique_lock guard(mutex_); + auto status = WriteMessage(request_type, fbb); + if (!status.ok()) return status; + return ReadMessage(reply_type, reply_message); +} + +RayletClient::RayletClient(const std::string &raylet_socket, const UniqueID &client_id, + bool is_worker, const JobID &driver_id, + const Language &language) + : client_id_(client_id), + is_worker_(is_worker), + driver_id_(driver_id), + language_(language) { + // For C++14, we could use std::make_unique + conn_ = std::unique_ptr(new RayletConnection(raylet_socket, -1, -1)); + + flatbuffers::FlatBufferBuilder fbb; + auto message = ray::protocol::CreateRegisterClientRequest( + fbb, is_worker, to_flatbuf(fbb, client_id), getpid(), to_flatbuf(fbb, driver_id), + language); + fbb.Finish(message); + // Register the process ID with the raylet. + // NOTE(swang): If raylet exits and we are registered as a worker, we will get killed. + auto status = conn_->WriteMessage(MessageType::RegisterClientRequest, &fbb); + RAY_CHECK_OK_PREPEND(status, "[RayletClient] Unable to register worker with raylet."); +} + +ray::Status RayletClient::SubmitTask(const std::vector &execution_dependencies, + const ray::raylet::TaskSpecification &task_spec) { + flatbuffers::FlatBufferBuilder fbb; + auto execution_dependencies_message = to_flatbuf(fbb, execution_dependencies); + auto message = ray::protocol::CreateSubmitTaskRequest( + fbb, execution_dependencies_message, task_spec.ToFlatbuffer(fbb)); + fbb.Finish(message); + return conn_->WriteMessage(MessageType::SubmitTask, &fbb); +} + +ray::Status RayletClient::GetTask( + std::unique_ptr *task_spec) { + std::unique_ptr reply; + // Receive a task from the raylet. This will block until the local + // scheduler gives this client a task. + auto status = + conn_->AtomicRequestReply(MessageType::GetTask, MessageType::ExecuteTask, reply); + if (!status.ok()) return status; + // Parse the flatbuffer object. + auto reply_message = flatbuffers::GetRoot(reply.get()); + // Set the resource IDs for this task. + resource_ids_.clear(); + for (size_t i = 0; i < reply_message->fractional_resource_ids()->size(); ++i) { + auto const &fractional_resource_ids = + reply_message->fractional_resource_ids()->Get(i); + auto &acquired_resources = + resource_ids_[string_from_flatbuf(*fractional_resource_ids->resource_name())]; + + size_t num_resource_ids = fractional_resource_ids->resource_ids()->size(); + size_t num_resource_fractions = fractional_resource_ids->resource_fractions()->size(); + RAY_CHECK(num_resource_ids == num_resource_fractions); + RAY_CHECK(num_resource_ids > 0); + for (size_t j = 0; j < num_resource_ids; ++j) { + int64_t resource_id = fractional_resource_ids->resource_ids()->Get(j); + double resource_fraction = fractional_resource_ids->resource_fractions()->Get(j); + if (num_resource_ids > 1) { + int64_t whole_fraction = resource_fraction; + RAY_CHECK(whole_fraction == resource_fraction); + } + acquired_resources.push_back(std::make_pair(resource_id, resource_fraction)); + } + } + + // Return the copy of the task spec and pass ownership to the caller. + task_spec->reset(new ray::raylet::TaskSpecification( + string_from_flatbuf(*reply_message->task_spec()))); + return ray::Status::OK(); +} + +ray::Status RayletClient::TaskDone() { + return conn_->WriteMessage(MessageType::TaskDone); +} + +ray::Status RayletClient::FetchOrReconstruct(const std::vector &object_ids, + bool fetch_only, + const TaskID ¤t_task_id) { + flatbuffers::FlatBufferBuilder fbb; + auto object_ids_message = to_flatbuf(fbb, object_ids); + auto message = ray::protocol::CreateFetchOrReconstruct( + fbb, object_ids_message, fetch_only, to_flatbuf(fbb, current_task_id)); + fbb.Finish(message); + auto status = conn_->WriteMessage(MessageType::FetchOrReconstruct, &fbb); + return status; +} + +ray::Status RayletClient::NotifyUnblocked(const TaskID ¤t_task_id) { + flatbuffers::FlatBufferBuilder fbb; + auto message = + ray::protocol::CreateNotifyUnblocked(fbb, to_flatbuf(fbb, current_task_id)); + fbb.Finish(message); + return conn_->WriteMessage(MessageType::NotifyUnblocked, &fbb); +} + +ray::Status RayletClient::Wait(const std::vector &object_ids, int num_returns, + int64_t timeout_milliseconds, bool wait_local, + const TaskID ¤t_task_id, WaitResultPair *result) { + // Write request. + flatbuffers::FlatBufferBuilder fbb; + auto message = ray::protocol::CreateWaitRequest( + fbb, to_flatbuf(fbb, object_ids), num_returns, timeout_milliseconds, wait_local, + to_flatbuf(fbb, current_task_id)); + fbb.Finish(message); + std::unique_ptr reply; + auto status = conn_->AtomicRequestReply(MessageType::WaitRequest, + MessageType::WaitReply, reply, &fbb); + if (!status.ok()) return status; + // Parse the flatbuffer object. + auto reply_message = flatbuffers::GetRoot(reply.get()); + auto found = reply_message->found(); + for (uint i = 0; i < found->size(); i++) { + ObjectID object_id = ObjectID::from_binary(found->Get(i)->str()); + result->first.push_back(object_id); + } + auto remaining = reply_message->remaining(); + for (uint i = 0; i < remaining->size(); i++) { + ObjectID object_id = ObjectID::from_binary(remaining->Get(i)->str()); + result->second.push_back(object_id); + } + return ray::Status::OK(); +} + +ray::Status RayletClient::PushError(const JobID &job_id, const std::string &type, + const std::string &error_message, double timestamp) { + flatbuffers::FlatBufferBuilder fbb; + auto message = ray::protocol::CreatePushErrorRequest( + fbb, to_flatbuf(fbb, job_id), fbb.CreateString(type), + fbb.CreateString(error_message), timestamp); + fbb.Finish(message); + + return conn_->WriteMessage(MessageType::PushErrorRequest, &fbb); +} + +ray::Status RayletClient::PushProfileEvents(const ProfileTableDataT &profile_events) { + flatbuffers::FlatBufferBuilder fbb; + auto message = CreateProfileTableData(fbb, &profile_events); + fbb.Finish(message); + + auto status = conn_->WriteMessage(MessageType::PushProfileEventsRequest, &fbb); + // Don't be too strict for profile errors. Just create logs and prevent it from crash. + if (!status.ok()) { + RAY_LOG(ERROR) << status.ToString() + << " [RayletClient] Failed to push profile events."; + } + return ray::Status::OK(); +} + +ray::Status RayletClient::FreeObjects(const std::vector &object_ids, + bool local_only) { + flatbuffers::FlatBufferBuilder fbb; + auto message = ray::protocol::CreateFreeObjectsRequest(fbb, local_only, + to_flatbuf(fbb, object_ids)); + fbb.Finish(message); + + auto status = conn_->WriteMessage(MessageType::FreeObjectsInObjectStoreRequest, &fbb); + return status; +} diff --git a/src/ray/raylet/raylet_client.h b/src/ray/raylet/raylet_client.h new file mode 100644 index 000000000..9d9391971 --- /dev/null +++ b/src/ray/raylet/raylet_client.h @@ -0,0 +1,171 @@ +#ifndef RAYLET_CLIENT_H +#define RAYLET_CLIENT_H + +#include +#include +#include +#include + +#include "ray/raylet/task_spec.h" +#include "ray/status.h" + +using ray::ActorID; +using ray::JobID; +using ray::ObjectID; +using ray::TaskID; +using ray::UniqueID; + +using MessageType = ray::protocol::MessageType; +using ResourceMappingType = + std::unordered_map>>; +using WaitResultPair = std::pair, std::vector>; + +class RayletConnection { + public: + /// Connect to the raylet. + /// + /// \param raylet_socket The name of the socket to use to connect to the raylet. + /// \param worker_id A unique ID to represent the worker. + /// \param is_worker Whether this client is a worker. If it is a worker, an + /// additional message will be sent to register as one. + /// \param driver_id The ID of the driver. This is non-nil if the client is a + /// driver. + /// \return The connection information. + RayletConnection(const std::string &raylet_socket, int num_retries, int64_t timeout); + + ~RayletConnection() { close(conn_); } + /// Notify the raylet that this client is disconnecting gracefully. This + /// is used by actors to exit gracefully so that the raylet doesn't + /// propagate an error message to the driver. + /// + /// \return ray::Status. + ray::Status Disconnect(); + ray::Status ReadMessage(MessageType type, std::unique_ptr &message); + ray::Status WriteMessage(MessageType type, + flatbuffers::FlatBufferBuilder *fbb = nullptr); + ray::Status AtomicRequestReply(MessageType request_type, MessageType reply_type, + std::unique_ptr &reply_message, + flatbuffers::FlatBufferBuilder *fbb = nullptr); + + private: + /// File descriptor of the Unix domain socket that connects to raylet. + int conn_; + /// A mutex to protect stateful operations of the raylet client. + std::mutex mutex_; + /// A mutex to protect write operations of the raylet client. + std::mutex write_mutex_; +}; + +class RayletClient { + public: + /// Connect to the raylet. + /// + /// \param raylet_socket The name of the socket to use to connect to the raylet. + /// \param worker_id A unique ID to represent the worker. + /// \param is_worker Whether this client is a worker. If it is a worker, an + /// additional message will be sent to register as one. + /// \param driver_id The ID of the driver. This is non-nil if the client is a driver. + /// \return The connection information. + RayletClient(const std::string &raylet_socket, const UniqueID &client_id, + bool is_worker, const JobID &driver_id, const Language &language); + + ray::Status Disconnect() { return conn_->Disconnect(); }; + + /// Submit a task using the raylet code path. + /// + /// \param The execution dependencies. + /// \param The task specification. + /// \return ray::Status. + ray::Status SubmitTask(const std::vector &execution_dependencies, + const ray::raylet::TaskSpecification &task_spec); + + /// Get next task for this client. This will block until the scheduler assigns + /// a task to this worker. The caller takes ownership of the returned task + /// specification and must free it. + /// + /// \param task_spec The assigned task. + /// \return ray::Status. + ray::Status GetTask(std::unique_ptr *task_spec); + + /// Tell the raylet that the client has finished executing a task. + /// + /// \return ray::Status. + ray::Status TaskDone(); + + /// Tell the raylet to reconstruct or fetch objects. + /// + /// \param object_ids The IDs of the objects to reconstruct. + /// \param fetch_only Only fetch objects, do not reconstruct them. + /// \param current_task_id The task that needs the objects. + /// \return int 0 means correct, other numbers mean error. + ray::Status FetchOrReconstruct(const std::vector &object_ids, bool fetch_only, + const TaskID ¤t_task_id); + /// Notify the raylet that this client (worker) is no longer blocked. + /// + /// \param current_task_id The task that is no longer blocked. + /// \return ray::Status. + ray::Status NotifyUnblocked(const TaskID ¤t_task_id); + + /// Wait for the given objects until timeout expires or num_return objects are + /// found. + /// + /// \param object_ids The objects to wait for. + /// \param num_returns The number of objects to wait for. + /// \param timeout_milliseconds Duration, in milliseconds, to wait before returning. + /// \param wait_local Whether to wait for objects to appear on this node. + /// \param current_task_id The task that called wait. + /// \param result A pair with the first element containing the object ids that were + /// found, and the second element the objects that were not found. + /// \return ray::Status. + ray::Status Wait(const std::vector &object_ids, int num_returns, + int64_t timeout_milliseconds, bool wait_local, + const TaskID ¤t_task_id, WaitResultPair *result); + + /// Push an error to the relevant driver. + /// + /// \param The ID of the job that the error is for. + /// \param The type of the error. + /// \param The error message. + /// \param The timestamp of the error. + /// \return ray::Status. + ray::Status PushError(const JobID &job_id, const std::string &type, + const std::string &error_message, double timestamp); + + /// Store some profile events in the GCS. + /// + /// \param profile_events A batch of profiling event information. + /// \return ray::Status. + ray::Status PushProfileEvents(const ProfileTableDataT &profile_events); + + /// Free a list of objects from object stores. + /// + /// \param object_ids A list of ObjectsIDs to be deleted. + /// \param local_only Whether keep this request with local object store + /// or send it to all the object stores. + /// \return ray::Status. + ray::Status FreeObjects(const std::vector &object_ids, bool local_only); + + Language GetLanguage() const { return language_; } + + JobID GetClientID() const { return client_id_; } + + JobID GetDriverID() const { return driver_id_; } + + bool IsWorker() const { return is_worker_; } + + const ResourceMappingType &GetResourceIDs() const { return resource_ids_; } + + private: + const UniqueID client_id_; + const bool is_worker_; + const JobID driver_id_; + const Language language_; + /// A map from resource name to the resource IDs that are currently reserved + /// for this worker. Each pair consists of the resource ID and the fraction + /// of that resource allocated for this worker. + ResourceMappingType resource_ids_; + /// The connection to the raylet server. + std::unique_ptr conn_; +}; + +#endif diff --git a/test/stress_tests.py b/test/stress_tests.py index c65da083c..3771f5805 100644 --- a/test/stress_tests.py +++ b/test/stress_tests.py @@ -541,8 +541,8 @@ def test_driver_put_errors(ray_start_driver_put_errors): # were evicted and whose originating tasks are still running, this # for-loop should hang on its first iteration and push an error to the # driver. - ray.worker.global_worker.local_scheduler_client.fetch_or_reconstruct( - [args[0]], False) + ray.worker.global_worker.raylet_client.fetch_or_reconstruct([args[0]], + False) def error_check(errors): return len(errors) > 1