mirror of
https://github.com/wassname/ray.git
synced 2026-06-29 11:01:06 +08:00
[gRPC] Migrate raylet client implementation to grpc (#5120)
This commit is contained in:
@@ -5,6 +5,13 @@ from __future__ import print_function
|
||||
import os
|
||||
import sys
|
||||
|
||||
# MUST import ray._raylet before pyarrow to initialize some global variables.
|
||||
# It seems the library related to memory allocation in pyarrow will destroy the
|
||||
# initialization of grpc if we import pyarrow first.
|
||||
# NOTE(JoeyJiang): See https://github.com/ray-project/ray/issues/5219 for more
|
||||
# details.
|
||||
import ray._raylet
|
||||
|
||||
if "pyarrow" in sys.modules:
|
||||
raise ImportError("Ray must be imported before pyarrow because Ray "
|
||||
"requires a specific version of pyarrow (which is "
|
||||
|
||||
@@ -220,14 +220,14 @@ cdef class RayletClient:
|
||||
cdef unique_ptr[CRayletClient] client
|
||||
|
||||
def __cinit__(self, raylet_socket,
|
||||
ClientID client_id,
|
||||
WorkerID worker_id,
|
||||
c_bool is_worker,
|
||||
JobID job_id):
|
||||
# We know that we are using Python, so just skip the language
|
||||
# parameter.
|
||||
# TODO(suquark): Should we allow unicode chars in "raylet_socket"?
|
||||
self.client.reset(new CRayletClient(
|
||||
raylet_socket.encode("ascii"), client_id.native(), is_worker,
|
||||
raylet_socket.encode("ascii"), worker_id.native(), is_worker,
|
||||
job_id.native(), LANGUAGE_PYTHON))
|
||||
|
||||
def disconnect(self):
|
||||
@@ -374,7 +374,7 @@ cdef class RayletClient:
|
||||
|
||||
@property
|
||||
def client_id(self):
|
||||
return ClientID(self.client.get().GetClientID().Binary())
|
||||
return ClientID(self.client.get().GetWorkerId().Binary())
|
||||
|
||||
@property
|
||||
def job_id(self):
|
||||
|
||||
@@ -88,16 +88,16 @@ cdef extern from "ray/common/id.h" namespace "ray" nogil:
|
||||
|
||||
|
||||
cdef extern from "ray/protobuf/common.pb.h" nogil:
|
||||
cdef cppclass CLanguage "Language":
|
||||
cdef cppclass CLanguage "ray::rpc::Language":
|
||||
pass
|
||||
|
||||
|
||||
# This is a workaround for C++ enum class since Cython has no corresponding
|
||||
# representation.
|
||||
cdef extern from "ray/protobuf/common.pb.h" namespace "Language" nogil:
|
||||
cdef CLanguage LANGUAGE_PYTHON "Language::PYTHON"
|
||||
cdef CLanguage LANGUAGE_CPP "Language::CPP"
|
||||
cdef CLanguage LANGUAGE_JAVA "Language::JAVA"
|
||||
cdef extern from "ray/protobuf/common.pb.h" namespace "ray::rpc::Language" nogil:
|
||||
cdef CLanguage LANGUAGE_PYTHON "ray::rpc::Language::PYTHON"
|
||||
cdef CLanguage LANGUAGE_CPP "ray::rpc::Language::CPP"
|
||||
cdef CLanguage LANGUAGE_JAVA "ray::rpc::Language::JAVA"
|
||||
|
||||
|
||||
cdef extern from "ray/common/task/scheduling_resources.h" \
|
||||
|
||||
@@ -23,14 +23,14 @@ from ray.includes.task cimport CTaskSpec
|
||||
|
||||
|
||||
cdef extern from "ray/protobuf/gcs.pb.h" nogil:
|
||||
cdef cppclass GCSProfileEvent "ProfileTableData::ProfileEvent":
|
||||
cdef cppclass GCSProfileEvent "ray::rpc::ProfileTableData::ProfileEvent":
|
||||
void set_event_type(const c_string &value)
|
||||
void set_start_time(double value)
|
||||
void set_end_time(double value)
|
||||
c_string set_extra_data(const c_string &value)
|
||||
GCSProfileEvent()
|
||||
|
||||
cdef cppclass GCSProfileTableData "ProfileTableData":
|
||||
cdef cppclass GCSProfileTableData "ray::rpc::ProfileTableData":
|
||||
void set_component_type(const c_string &value)
|
||||
void set_component_id(const c_string &value)
|
||||
void set_node_ip_address(const c_string &value)
|
||||
@@ -43,13 +43,12 @@ ctypedef unordered_map[c_string, c_vector[pair[int64_t, double]]] \
|
||||
ctypedef pair[c_vector[CObjectID], c_vector[CObjectID]] WaitResultPair
|
||||
|
||||
|
||||
cdef extern from "ray/raylet/raylet_client.h" nogil:
|
||||
cdef cppclass CRayletClient "RayletClient":
|
||||
cdef extern from "ray/rpc/raylet/raylet_client.h" namespace "ray::rpc" nogil:
|
||||
cdef cppclass CRayletClient "ray::rpc::RayletClient":
|
||||
CRayletClient(const c_string &raylet_socket,
|
||||
const CClientID &client_id,
|
||||
const CWorkerID &worker_id,
|
||||
c_bool is_worker, const CJobID &job_id,
|
||||
const CLanguage &language)
|
||||
CRayStatus Disconnect()
|
||||
CRayStatus SubmitTask(const CTaskSpec &task_spec)
|
||||
CRayStatus GetTask(unique_ptr[CTaskSpec] *task_spec)
|
||||
CRayStatus TaskDone()
|
||||
@@ -73,7 +72,8 @@ cdef extern from "ray/raylet/raylet_client.h" nogil:
|
||||
const CActorID &actor_id, const CActorCheckpointID &checkpoint_id)
|
||||
CRayStatus SetResource(const c_string &resource_name, const double capacity, const CClientID &client_Id)
|
||||
CLanguage GetLanguage() const
|
||||
CClientID GetClientID() const
|
||||
CWorkerID GetWorkerId() const
|
||||
CJobID GetJobID() const
|
||||
c_bool IsWorker() const
|
||||
CRayStatus Disconnect()
|
||||
const ResourceMappingType &GetResourceIDs() const
|
||||
|
||||
@@ -458,11 +458,9 @@ print("success")
|
||||
# Make sure the first driver ran to completion.
|
||||
assert "success" in out
|
||||
|
||||
nonexistent_id_bytes = _random_string()
|
||||
nonexistent_id_hex = ray.utils.binary_to_hex(nonexistent_id_bytes)
|
||||
# Define a driver that creates one task that depends on a nonexistent
|
||||
# object. This task will be queued as waiting to execute.
|
||||
driver_script = """
|
||||
driver_script_template = """
|
||||
import time
|
||||
import ray
|
||||
ray.init(redis_address="{}")
|
||||
@@ -472,11 +470,15 @@ def g(x):
|
||||
g.remote(ray.ObjectID(ray.utils.hex_to_binary("{}")))
|
||||
time.sleep(1)
|
||||
print("success")
|
||||
""".format(redis_address, nonexistent_id_hex)
|
||||
"""
|
||||
|
||||
# Create some drivers and let them exit and make sure everything is
|
||||
# still alive.
|
||||
for _ in range(3):
|
||||
nonexistent_id_bytes = _random_string()
|
||||
nonexistent_id_hex = ray.utils.binary_to_hex(nonexistent_id_bytes)
|
||||
driver_script = driver_script_template.format(redis_address,
|
||||
nonexistent_id_hex)
|
||||
out = run_string_as_driver(driver_script)
|
||||
# Simulate the nonexistent dependency becoming available.
|
||||
ray.worker.global_worker.put_object(
|
||||
@@ -484,10 +486,8 @@ print("success")
|
||||
# Make sure the first driver ran to completion.
|
||||
assert "success" in out
|
||||
|
||||
nonexistent_id_bytes = _random_string()
|
||||
nonexistent_id_hex = ray.utils.binary_to_hex(nonexistent_id_bytes)
|
||||
# Define a driver that calls `ray.wait` on a nonexistent object.
|
||||
driver_script = """
|
||||
driver_script_template = """
|
||||
import time
|
||||
import ray
|
||||
ray.init(redis_address="{}")
|
||||
@@ -497,11 +497,15 @@ def g():
|
||||
g.remote()
|
||||
time.sleep(1)
|
||||
print("success")
|
||||
""".format(redis_address, nonexistent_id_hex)
|
||||
"""
|
||||
|
||||
# Create some drivers and let them exit and make sure everything is
|
||||
# still alive.
|
||||
for _ in range(3):
|
||||
nonexistent_id_bytes = _random_string()
|
||||
nonexistent_id_hex = ray.utils.binary_to_hex(nonexistent_id_bytes)
|
||||
driver_script = driver_script_template.format(redis_address,
|
||||
nonexistent_id_hex)
|
||||
out = run_string_as_driver(driver_script)
|
||||
# Simulate the nonexistent dependency becoming available.
|
||||
ray.worker.global_worker.put_object(
|
||||
|
||||
@@ -41,7 +41,6 @@ import ray.state
|
||||
from ray import (
|
||||
ActorHandleID,
|
||||
ActorID,
|
||||
ClientID,
|
||||
WorkerID,
|
||||
JobID,
|
||||
ObjectID,
|
||||
@@ -1923,7 +1922,7 @@ def connect(node,
|
||||
|
||||
worker.raylet_client = ray._raylet.RayletClient(
|
||||
node.raylet_socket_name,
|
||||
ClientID(worker.worker_id),
|
||||
WorkerID(worker.worker_id),
|
||||
(mode == WORKER_MODE),
|
||||
worker.current_job_id,
|
||||
)
|
||||
|
||||
Reference in New Issue
Block a user