mirror of
https://github.com/wassname/ray.git
synced 2026-06-28 04:07:01 +08:00
[Refactor] Rename ClientId to NodeId (#10992)
* rename ClientId to NodeId * format lint * format lint * fix conflicts * rename new ClientId to NodeId * update lint * make same version of clang-format with travis ci
This commit is contained in:
@@ -66,7 +66,7 @@ from ray._raylet import (
|
||||
ActorCheckpointID,
|
||||
ActorClassID,
|
||||
ActorID,
|
||||
ClientID,
|
||||
NodeID,
|
||||
Config as _Config,
|
||||
JobID,
|
||||
WorkerID,
|
||||
@@ -151,7 +151,7 @@ __all__ += [
|
||||
"ActorCheckpointID",
|
||||
"ActorClassID",
|
||||
"ActorID",
|
||||
"ClientID",
|
||||
"NodeID",
|
||||
"JobID",
|
||||
"WorkerID",
|
||||
"FunctionID",
|
||||
|
||||
@@ -76,7 +76,7 @@ from ray.includes.unique_ids cimport (
|
||||
CActorID,
|
||||
CActorCheckpointID,
|
||||
CObjectID,
|
||||
CClientID,
|
||||
CNodeID,
|
||||
CPlacementGroupID,
|
||||
)
|
||||
from ray.includes.libcoreworker cimport (
|
||||
@@ -784,7 +784,7 @@ cdef class CoreWorker:
|
||||
CCoreWorkerProcess.GetCoreWorker().GetCurrentJobId().Binary())
|
||||
|
||||
def get_current_node_id(self):
|
||||
return ClientID(
|
||||
return NodeID(
|
||||
CCoreWorkerProcess.GetCoreWorker().GetCurrentNodeId().Binary())
|
||||
|
||||
def get_actor_id(self):
|
||||
@@ -1479,10 +1479,10 @@ cdef class CoreWorker:
|
||||
actor_id.native(), checkpoint_id.native()))
|
||||
|
||||
def set_resource(self, basestring resource_name,
|
||||
double capacity, ClientID client_id):
|
||||
double capacity, NodeID client_id):
|
||||
CCoreWorkerProcess.GetCoreWorker().SetResource(
|
||||
resource_name.encode("ascii"), capacity,
|
||||
CClientID.FromBinary(client_id.binary()))
|
||||
CNodeID.FromBinary(client_id.binary()))
|
||||
|
||||
def force_spill_objects(self, object_refs):
|
||||
cdef c_vector[CObjectID] object_ids
|
||||
|
||||
@@ -4,17 +4,17 @@ import ray
|
||||
def set_resource(resource_name, capacity, client_id=None):
|
||||
""" Set a resource to a specified capacity.
|
||||
|
||||
This creates, updates or deletes a custom resource for a target clientId.
|
||||
This creates, updates or deletes a custom resource for a target NodeID.
|
||||
If the resource already exists, it's capacity is updated to the new value.
|
||||
If the capacity is set to 0, the resource is deleted.
|
||||
If ClientID is not specified or set to None,
|
||||
If NodeID is not specified or set to None,
|
||||
the resource is created on the local client where the actor is running.
|
||||
|
||||
Args:
|
||||
resource_name (str): Name of the resource to be created
|
||||
capacity (int): Capacity of the new resource. Resource is deleted if
|
||||
capacity is 0.
|
||||
client_id (str): The ClientId of the node where the resource is to be
|
||||
client_id (str): The NodeID of the node where the resource is to be
|
||||
set.
|
||||
|
||||
Returns:
|
||||
@@ -25,9 +25,9 @@ def set_resource(resource_name, capacity, client_id=None):
|
||||
specified.
|
||||
"""
|
||||
if client_id is not None:
|
||||
client_id_obj = ray.ClientID(ray.utils.hex_to_binary(client_id))
|
||||
client_id_obj = ray.NodeID(ray.utils.hex_to_binary(client_id))
|
||||
else:
|
||||
client_id_obj = ray.ClientID.nil()
|
||||
client_id_obj = ray.NodeID.nil()
|
||||
if (capacity < 0) or (capacity != int(capacity)):
|
||||
raise ValueError(
|
||||
"Capacity {} must be a non-negative integer.".format(capacity))
|
||||
|
||||
@@ -4,7 +4,7 @@ from libcpp.vector cimport vector as c_vector
|
||||
from libcpp.memory cimport unique_ptr
|
||||
from ray.includes.unique_ids cimport (
|
||||
CActorID,
|
||||
CClientID,
|
||||
CNodeID,
|
||||
CObjectID,
|
||||
CWorkerID,
|
||||
CPlacementGroupID,
|
||||
@@ -24,7 +24,7 @@ cdef extern from "ray/gcs/gcs_client/global_state_accessor.h" nogil:
|
||||
unique_ptr[c_string] GetObjectInfo(const CObjectID &object_id)
|
||||
c_vector[c_string] GetAllActorInfo()
|
||||
unique_ptr[c_string] GetActorInfo(const CActorID &actor_id)
|
||||
c_string GetNodeResourceInfo(const CClientID &node_id)
|
||||
c_string GetNodeResourceInfo(const CNodeID &node_id)
|
||||
unique_ptr[c_string] GetWorkerInfo(const CWorkerID &worker_id)
|
||||
c_vector[c_string] GetAllWorkerInfo()
|
||||
c_bool AddWorkerInfo(const c_string &serialized_string)
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
from ray.includes.unique_ids cimport (
|
||||
CActorID,
|
||||
CClientID,
|
||||
CNodeID,
|
||||
CObjectID,
|
||||
CWorkerID,
|
||||
CPlacementGroupID
|
||||
@@ -89,7 +89,7 @@ cdef class GlobalStateAccessor:
|
||||
|
||||
def get_node_resource_info(self, node_id):
|
||||
cdef c_string result
|
||||
cdef CClientID cnode_id = CClientID.FromBinary(node_id.binary())
|
||||
cdef CNodeID cnode_id = CNodeID.FromBinary(node_id.binary())
|
||||
with nogil:
|
||||
result = self.inner.get().GetNodeResourceInfo(cnode_id)
|
||||
return result
|
||||
|
||||
@@ -14,7 +14,7 @@ from libcpp.vector cimport vector as c_vector
|
||||
from ray.includes.unique_ids cimport (
|
||||
CActorID,
|
||||
CActorCheckpointID,
|
||||
CClientID,
|
||||
CNodeID,
|
||||
CJobID,
|
||||
CTaskID,
|
||||
CObjectID,
|
||||
@@ -121,7 +121,7 @@ cdef extern from "ray/core_worker/core_worker.h" nogil:
|
||||
|
||||
CJobID GetCurrentJobId()
|
||||
CTaskID GetCurrentTaskId()
|
||||
CClientID GetCurrentNodeId()
|
||||
CNodeID GetCurrentNodeId()
|
||||
CPlacementGroupID GetCurrentPlacementGroupId()
|
||||
const CActorID &GetActorId()
|
||||
void SetActorTitle(const c_string &title)
|
||||
@@ -196,7 +196,7 @@ cdef extern from "ray/core_worker/core_worker.h" nogil:
|
||||
const CActorID &actor_id, const CActorCheckpointID &checkpoint_id)
|
||||
CRayStatus SetResource(const c_string &resource_name,
|
||||
const double capacity,
|
||||
const CClientID &client_Id)
|
||||
const CNodeID &client_Id)
|
||||
CRayStatus SpillObjects(const c_vector[CObjectID] &object_ids)
|
||||
CRayStatus ForceRestoreSpilledObjects(
|
||||
const c_vector[CObjectID] &object_ids)
|
||||
|
||||
@@ -65,10 +65,10 @@ cdef extern from "ray/common/id.h" namespace "ray" nogil:
|
||||
CActorID Of(CJobID job_id, CTaskID parent_task_id,
|
||||
int64_t parent_task_counter)
|
||||
|
||||
cdef cppclass CClientID "ray::ClientID"(CUniqueID):
|
||||
cdef cppclass CNodeID "ray::NodeID"(CUniqueID):
|
||||
|
||||
@staticmethod
|
||||
CClientID FromBinary(const c_string &binary)
|
||||
CNodeID FromBinary(const c_string &binary)
|
||||
|
||||
cdef cppclass CConfigID "ray::ConfigID"(CUniqueID):
|
||||
|
||||
|
||||
@@ -12,7 +12,7 @@ from ray.includes.unique_ids cimport (
|
||||
CActorCheckpointID,
|
||||
CActorClassID,
|
||||
CActorID,
|
||||
CClientID,
|
||||
CNodeID,
|
||||
CConfigID,
|
||||
CJobID,
|
||||
CFunctionID,
|
||||
@@ -199,14 +199,14 @@ cdef class TaskID(BaseID):
|
||||
CTaskID.FromBinary(parent_task_id.binary()),
|
||||
parent_task_counter).Binary())
|
||||
|
||||
cdef class ClientID(UniqueID):
|
||||
cdef class NodeID(UniqueID):
|
||||
|
||||
def __init__(self, id):
|
||||
check_id(id)
|
||||
self.data = CClientID.FromBinary(<c_string>id)
|
||||
self.data = CNodeID.FromBinary(<c_string>id)
|
||||
|
||||
cdef CClientID native(self):
|
||||
return <CClientID>self.data
|
||||
cdef CNodeID native(self):
|
||||
return <CNodeID>self.data
|
||||
|
||||
|
||||
cdef class JobID(BaseID):
|
||||
@@ -373,7 +373,7 @@ _ID_TYPES = [
|
||||
ActorCheckpointID,
|
||||
ActorClassID,
|
||||
ActorID,
|
||||
ClientID,
|
||||
NodeID,
|
||||
JobID,
|
||||
WorkerID,
|
||||
FunctionID,
|
||||
|
||||
+1
-1
@@ -266,7 +266,7 @@ class GlobalState:
|
||||
"""
|
||||
self._check_connected()
|
||||
|
||||
node_id = ray.ClientID(hex_to_binary(node_id))
|
||||
node_id = ray.NodeID(hex_to_binary(node_id))
|
||||
node_resource_bytes = \
|
||||
self.global_state_accessor.get_node_resource_info(node_id)
|
||||
if node_resource_bytes is None:
|
||||
|
||||
Reference in New Issue
Block a user