Change Python's ObjectID to ObjectRef (#9353)

This commit is contained in:
Hao Chen
2020-07-10 17:49:04 +08:00
committed by GitHub
parent 6311e5a947
commit d49dadf891
91 changed files with 959 additions and 907 deletions
+2
View File
@@ -65,6 +65,7 @@ from ray._raylet import (
WorkerID,
FunctionID,
ObjectID,
ObjectRef,
TaskID,
UniqueID,
Language,
@@ -166,6 +167,7 @@ __all__ += [
"WorkerID",
"FunctionID",
"ObjectID",
"ObjectRef",
"TaskID",
"UniqueID",
]
+3 -3
View File
@@ -65,10 +65,10 @@ cdef class BaseID:
# here `cdef size_t` is required.
cdef size_t hash(self)
cdef class ObjectID(BaseID):
cdef class ObjectRef(BaseID):
cdef:
CObjectID data
# Flag indicating whether or not this object ID was added to the set
# Flag indicating whether or not this object ref was added to the set
# of active IDs in the core worker so we know whether we should clean
# it up.
c_bool in_core_worker
@@ -91,7 +91,7 @@ cdef class CoreWorker:
c_bool is_local_mode
cdef _create_put_buffer(self, shared_ptr[CBuffer] &metadata,
size_t data_size, ObjectID object_id,
size_t data_size, ObjectRef object_ref,
c_vector[CObjectID] contained_ids,
CObjectID *c_object_id, shared_ptr[CBuffer] *data)
cdef store_task_outputs(
+75 -71
View File
@@ -103,6 +103,7 @@ import msgpack
cimport cpython
include "includes/object_ref.pxi"
include "includes/unique_ids.pxi"
include "includes/ray_config.pxi"
include "includes/function_descriptor.pxi"
@@ -162,31 +163,31 @@ cdef RayObjectsToDataMetadataPairs(
return data_metadata_pairs
cdef VectorToObjectIDs(const c_vector[CObjectID] &object_ids):
cdef VectorToObjectRefs(const c_vector[CObjectID] &object_refs):
result = []
for i in range(object_ids.size()):
result.append(ObjectID(object_ids[i].Binary()))
for i in range(object_refs.size()):
result.append(ObjectRef(object_refs[i].Binary()))
return result
cdef c_vector[CObjectID] ObjectIDsToVector(object_ids):
"""A helper function that converts a Python list of object IDs to a vector.
cdef c_vector[CObjectID] ObjectRefsToVector(object_refs):
"""A helper function that converts a Python list of object refs to a vector.
Args:
object_ids (list): The Python list of object IDs.
object_refs (list): The Python list of object refs.
Returns:
The output vector.
"""
cdef:
c_vector[CObjectID] result
for object_id in object_ids:
result.push_back((<ObjectID>object_id).native())
for object_ref in object_refs:
result.push_back((<ObjectRef>object_ref).native())
return result
def compute_task_id(ObjectID object_id):
return TaskID(object_id.native().TaskId().Binary())
def compute_task_id(ObjectRef object_ref):
return TaskID(object_ref.native().TaskId().Binary())
cdef increase_recursion_limit():
@@ -273,8 +274,8 @@ cdef prepare_args(
worker = ray.worker.global_worker
put_threshold = RayConfig.instance().max_direct_call_object_size()
for arg in args:
if isinstance(arg, ObjectID):
c_arg = (<ObjectID>arg).native()
if isinstance(arg, ObjectRef):
c_arg = (<ObjectRef>arg).native()
args_vector.push_back(
unique_ptr[CTaskArg](new CTaskArgByReference(
c_arg,
@@ -292,9 +293,9 @@ cdef prepare_args(
metadata, language))
size = serialized_arg.total_bytes
# TODO(edoakes): any objects containing ObjectIDs are spilled to
# TODO(edoakes): any objects containing ObjectRefs are spilled to
# plasma here. This is inefficient for small objects, but inlined
# arguments aren't associated ObjectIDs right now so this is a
# arguments aren't associated ObjectRefs right now so this is a
# simple fix for reference counting purposes.
if <int64_t>size <= put_threshold:
arg_data = dynamic_pointer_cast[CBuffer, LocalMemoryBuffer](
@@ -302,8 +303,8 @@ cdef prepare_args(
if size > 0:
(<SerializedObject>serialized_arg).write_to(
Buffer.make(arg_data))
for object_id in serialized_arg.contained_object_ids:
inlined_ids.push_back((<ObjectID>object_id).native())
for object_ref in serialized_arg.contained_object_refs:
inlined_ids.push_back((<ObjectRef>object_ref).native())
args_vector.push_back(
unique_ptr[CTaskArg](new CTaskArgByValue(
make_shared[CRayObject](
@@ -436,7 +437,7 @@ cdef execute_task(
args, kwargs = [], {}
else:
metadata_pairs = RayObjectsToDataMetadataPairs(c_args)
object_ids = VectorToObjectIDs(c_arg_reference_ids)
object_refs = VectorToObjectRefs(c_arg_reference_ids)
if core_worker.current_actor_is_asyncio():
# We deserialize objects in event loop thread to
@@ -444,12 +445,12 @@ cdef execute_task(
async def deserialize_args():
return (ray.worker.global_worker
.deserialize_objects(
metadata_pairs, object_ids))
metadata_pairs, object_refs))
args = core_worker.run_async_func_in_event_loop(
deserialize_args)
else:
args = ray.worker.global_worker.deserialize_objects(
metadata_pairs, object_ids)
metadata_pairs, object_refs)
for arg in args:
if isinstance(arg, RayError):
@@ -721,12 +722,12 @@ cdef class CoreWorker:
def get_plasma_event_handler(self):
return self.plasma_event_handler
def get_objects(self, object_ids, TaskID current_task_id,
def get_objects(self, object_refs, TaskID current_task_id,
int64_t timeout_ms=-1):
cdef:
c_vector[shared_ptr[CRayObject]] results
CTaskID c_task_id = current_task_id.native()
c_vector[CObjectID] c_object_ids = ObjectIDsToVector(object_ids)
c_vector[CObjectID] c_object_ids = ObjectRefsToVector(object_refs)
with nogil:
check_status(CCoreWorkerProcess.GetCoreWorker().Get(
@@ -734,10 +735,10 @@ cdef class CoreWorker:
return RayObjectsToDataMetadataPairs(results)
def object_exists(self, ObjectID object_id):
def object_exists(self, ObjectRef object_ref):
cdef:
c_bool has_object
CObjectID c_object_id = object_id.native()
CObjectID c_object_id = object_ref.native()
with nogil:
check_status(CCoreWorkerProcess.GetCoreWorker().Contains(
@@ -746,29 +747,29 @@ cdef class CoreWorker:
return has_object
cdef _create_put_buffer(self, shared_ptr[CBuffer] &metadata,
size_t data_size, ObjectID object_id,
size_t data_size, ObjectRef object_ref,
c_vector[CObjectID] contained_ids,
CObjectID *c_object_id, shared_ptr[CBuffer] *data):
if object_id is None:
if object_ref is None:
with nogil:
check_status(CCoreWorkerProcess.GetCoreWorker().Create(
metadata, data_size, contained_ids,
c_object_id, data))
else:
c_object_id[0] = object_id.native()
c_object_id[0] = object_ref.native()
with nogil:
check_status(CCoreWorkerProcess.GetCoreWorker().Create(
metadata, data_size,
c_object_id[0], data))
# If data is nullptr, that means the ObjectID already existed,
# If data is nullptr, that means the ObjectRef already existed,
# which we ignore.
# TODO(edoakes): this is hacky, we should return the error instead
# and deal with it here.
return data.get() == NULL
def put_serialized_object(self, serialized_object,
ObjectID object_id=None,
ObjectRef object_ref=None,
c_bool pin_object=True):
cdef:
CObjectID c_object_id
@@ -784,8 +785,8 @@ cdef class CoreWorker:
RayConfig.instance().put_small_object_in_memory_store())
total_bytes = serialized_object.total_bytes
object_already_exists = self._create_put_buffer(
metadata, total_bytes, object_id,
ObjectIDsToVector(serialized_object.contained_object_ids),
metadata, total_bytes, object_ref,
ObjectRefsToVector(serialized_object.contained_object_refs),
&c_object_id, &data)
if not object_already_exists:
@@ -800,42 +801,42 @@ cdef class CoreWorker:
c_object_id_vector, c_object_id))
else:
with nogil:
# Using custom object IDs is not supported because we can't
# track their lifecycle, so we don't pin the object in this
# case.
# Using custom object refs is not supported because we
# can't track their lifecycle, so we don't pin the object
# in this case.
check_status(CCoreWorkerProcess.GetCoreWorker().Seal(
c_object_id,
pin_object and object_id is None))
pin_object and object_ref is None))
return c_object_id.Binary()
def wait(self, object_ids, int num_returns, int64_t timeout_ms,
def wait(self, object_refs, int num_returns, int64_t timeout_ms,
TaskID current_task_id):
cdef:
c_vector[CObjectID] wait_ids
c_vector[c_bool] results
CTaskID c_task_id = current_task_id.native()
wait_ids = ObjectIDsToVector(object_ids)
wait_ids = ObjectRefsToVector(object_refs)
with nogil:
check_status(CCoreWorkerProcess.GetCoreWorker().Wait(
wait_ids, num_returns, timeout_ms, &results))
assert len(results) == len(object_ids)
assert len(results) == len(object_refs)
ready, not_ready = [], []
for i, object_id in enumerate(object_ids):
for i, object_ref in enumerate(object_refs):
if results[i]:
ready.append(object_id)
ready.append(object_ref)
else:
not_ready.append(object_id)
not_ready.append(object_ref)
return ready, not_ready
def free_objects(self, object_ids, c_bool local_only,
def free_objects(self, object_refs, c_bool local_only,
c_bool delete_creating_tasks):
cdef:
c_vector[CObjectID] free_ids = ObjectIDsToVector(object_ids)
c_vector[CObjectID] free_ids = ObjectRefsToVector(object_refs)
with nogil:
check_status(CCoreWorkerProcess.GetCoreWorker().Delete(
@@ -894,7 +895,7 @@ cdef class CoreWorker:
ray_function, args_vector, task_options, &return_ids,
max_retries)
return VectorToObjectIDs(return_ids)
return VectorToObjectRefs(return_ids)
def create_actor(self,
Language language,
@@ -966,7 +967,7 @@ cdef class CoreWorker:
ray_function,
args_vector, task_options, &return_ids)
return VectorToObjectIDs(return_ids)
return VectorToObjectRefs(return_ids)
def kill_actor(self, ActorID actor_id, c_bool no_restart):
cdef:
@@ -976,9 +977,9 @@ cdef class CoreWorker:
check_status(CCoreWorkerProcess.GetCoreWorker().KillActor(
c_actor_id, True, no_restart))
def cancel_task(self, ObjectID object_id, c_bool force_kill):
def cancel_task(self, ObjectRef object_ref, c_bool force_kill):
cdef:
CObjectID c_object_id = object_id.native()
CObjectID c_object_id = object_ref.native()
CRayStatus status = CRayStatus.OK()
status = CCoreWorkerProcess.GetCoreWorker().CancelTask(
@@ -1061,11 +1062,11 @@ cdef class CoreWorker:
worker.current_session_and_job)
def deserialize_and_register_actor_handle(self, const c_string &bytes,
ObjectID
outer_object_id):
ObjectRef
outer_object_ref):
cdef:
CObjectID c_outer_object_id = (outer_object_id.native() if
outer_object_id else
CObjectID c_outer_object_id = (outer_object_ref.native() if
outer_object_ref else
CObjectID.Nil())
c_actor_id = (CCoreWorkerProcess
.GetCoreWorker()
@@ -1093,35 +1094,37 @@ cdef class CoreWorker:
CObjectID c_actor_handle_id
check_status(CCoreWorkerProcess.GetCoreWorker().SerializeActorHandle(
actor_id.native(), &output, &c_actor_handle_id))
return output, ObjectID(c_actor_handle_id.Binary())
return output, ObjectRef(c_actor_handle_id.Binary())
def add_object_id_reference(self, ObjectID object_id):
def add_object_ref_reference(self, ObjectRef object_ref):
# Note: faster to not release GIL for short-running op.
CCoreWorkerProcess.GetCoreWorker().AddLocalReference(
object_id.native())
object_ref.native())
def remove_object_id_reference(self, ObjectID object_id):
def remove_object_ref_reference(self, ObjectRef object_ref):
# Note: faster to not release GIL for short-running op.
CCoreWorkerProcess.GetCoreWorker().RemoveLocalReference(
object_id.native())
object_ref.native())
def serialize_and_promote_object_id(self, ObjectID object_id):
def serialize_and_promote_object_ref(self, ObjectRef object_ref):
cdef:
CObjectID c_object_id = object_id.native()
CObjectID c_object_id = object_ref.native()
CAddress c_owner_address = CAddress()
CCoreWorkerProcess.GetCoreWorker().PromoteObjectToPlasma(c_object_id)
CCoreWorkerProcess.GetCoreWorker().GetOwnershipInfo(
c_object_id, &c_owner_address)
return (object_id,
return (object_ref,
c_owner_address.SerializeAsString())
def deserialize_and_register_object_id(
self, const c_string &object_id_binary, ObjectID outer_object_id,
const c_string &serialized_owner_address):
def deserialize_and_register_object_ref(
self, const c_string &object_ref_binary,
ObjectRef outer_object_ref,
const c_string &serialized_owner_address,
):
cdef:
CObjectID c_object_id = CObjectID.FromBinary(object_id_binary)
CObjectID c_outer_object_id = (outer_object_id.native() if
outer_object_id else
CObjectID c_object_id = CObjectID.FromBinary(object_ref_binary)
CObjectID c_outer_object_id = (outer_object_ref.native() if
outer_object_ref else
CObjectID.Nil())
CAddress c_owner_address = CAddress()
@@ -1158,7 +1161,8 @@ cdef class CoreWorker:
string_to_buffer(serialized_object.metadata))
serialized_objects.append(serialized_object)
contained_ids.push_back(
ObjectIDsToVector(serialized_object.contained_object_ids))
ObjectRefsToVector(serialized_object.contained_object_refs)
)
with nogil:
check_status(CCoreWorkerProcess.GetCoreWorker()
@@ -1239,18 +1243,18 @@ cdef class CoreWorker:
ref_counts = {}
while it != c_ref_counts.end():
object_id = dereference(it).first.Hex()
ref_counts[object_id] = {
object_ref = dereference(it).first.Hex()
ref_counts[object_ref] = {
"local": dereference(it).second.first,
"submitted": dereference(it).second.second}
postincrement(it)
return ref_counts
def get_async(self, ObjectID object_id, future):
def get_async(self, ObjectRef object_ref, future):
cpython.Py_INCREF(future)
CCoreWorkerProcess.GetCoreWorker().GetAsync(
object_id.native(),
object_ref.native(),
async_set_result,
<void*>future)
@@ -1287,7 +1291,7 @@ cdef class CoreWorker:
CClientID.FromBinary(client_id.binary()))
cdef void async_set_result(shared_ptr[CRayObject] obj,
CObjectID object_id,
CObjectID object_ref,
void *future) with gil:
cdef:
c_vector[shared_ptr[CRayObject]] objects_to_deserialize
@@ -1299,7 +1303,7 @@ cdef void async_set_result(shared_ptr[CRayObject] obj,
objects_to_deserialize.push_back(obj)
data_metadata_pairs = RayObjectsToDataMetadataPairs(
objects_to_deserialize)
ids_to_deserialize = [ObjectID(object_id.Binary())]
ids_to_deserialize = [ObjectRef(object_ref.Binary())]
result = ray.worker.global_worker.deserialize_objects(
data_metadata_pairs, ids_to_deserialize)[0]
+15 -15
View File
@@ -31,7 +31,7 @@ def method(*args, **kwargs):
_, _ = f.bar.remote()
Args:
num_return_vals: The number of object IDs that should be returned by
num_return_vals: The number of object refs that should be returned by
invocations of this actor method.
"""
assert len(args) == 0
@@ -64,7 +64,7 @@ class ActorMethod:
invoking the method. The decorator must return a function that
takes in two arguments ("args" and "kwargs"). In most cases, it
should call the function that was passed into the decorator and
return the resulting ObjectIDs. For an example, see
return the resulting ObjectRefs. For an example, see
"test_decorated_method" in "python/ray/tests/test_actor.py".
"""
@@ -81,7 +81,7 @@ class ActorMethod:
# opposed to the function execution). The decorator must return a
# function that takes in two arguments ("args" and "kwargs"). In most
# cases, it should call the function that was passed into the decorator
# and return the resulting ObjectIDs.
# and return the resulting ObjectRefs.
self._decorator = decorator
# Acquire a hard ref to the actor, this is useful mainly when passing
@@ -685,7 +685,7 @@ class ActorHandle:
num_return_vals (int): The number of return values for the method.
Returns:
object_ids: A list of object IDs returned by the remote actor
object_refs: A list of object refs returned by the remote actor
method.
"""
worker = ray.worker.global_worker
@@ -713,16 +713,16 @@ class ActorHandle:
"Cross language remote actor method " \
"cannot be executed locally."
object_ids = worker.core_worker.submit_actor_task(
object_refs = worker.core_worker.submit_actor_task(
self._ray_actor_language, self._ray_actor_id, function_descriptor,
list_args, num_return_vals, self._ray_actor_method_cpus)
if len(object_ids) == 1:
object_ids = object_ids[0]
elif len(object_ids) == 0:
object_ids = None
if len(object_refs) == 1:
object_refs = object_refs[0]
elif len(object_refs) == 0:
object_refs = None
return object_ids
return object_refs
def __getattr__(self, item):
if not self._ray_is_cross_language:
@@ -795,14 +795,14 @@ class ActorHandle:
return state
@classmethod
def _deserialization_helper(cls, state, outer_object_id=None):
def _deserialization_helper(cls, state, outer_object_ref=None):
"""This is defined in order to make pickling work.
Args:
state: The serialized state of the actor handle.
outer_object_id: The ObjectID that the serialized actor handle was
contained in, if any. This is used for counting references to
the actor handle.
outer_object_ref: The ObjectRef that the serialized actor handle
was contained in, if any. This is used for counting references
to the actor handle.
"""
worker = ray.worker.global_worker
@@ -811,7 +811,7 @@ class ActorHandle:
if hasattr(worker, "core_worker"):
# Non-local mode
return worker.core_worker.deserialize_and_register_actor_handle(
state, outer_object_id)
state, outer_object_ref)
else:
# Local mode
return cls(
+4 -4
View File
@@ -33,13 +33,13 @@ def sync_to_async(func):
return wrapper
def get_async(object_id):
def get_async(object_ref):
"""C++ Asyncio version of ray.get"""
loop = asyncio.get_event_loop()
core_worker = ray.worker.global_worker.core_worker
future = loop.create_future()
core_worker.get_async(object_id, future)
# A hack to keep a reference to the object ID for ref counting.
future.object_id = object_id
core_worker.get_async(object_ref, future)
# A hack to keep a reference to the object ref for ref counting.
future.object_ref = object_ref
return future
+2 -2
View File
@@ -166,7 +166,7 @@ export type RayletActorInfo =
nodeId: string;
numExecutedTasks: number;
numLocalObjects: number;
numObjectIdsInScope: number;
numObjectRefsInScope: number;
pid: number;
port: number;
state:
@@ -356,7 +356,7 @@ export type MemoryTableEntry = {
node_ip_address: string;
pid: number;
type: string;
object_id: string;
object_ref: string;
object_size: number;
reference_type: string;
call_site: string;
@@ -160,10 +160,10 @@ class Actor extends React.Component<Props & WithStyles<typeof styles>, State> {
"The number of tasks this actor has executed throughout its lifetimes.",
},
{
label: "Number of ObjectIDs in scope",
value: actor.numObjectIdsInScope.toLocaleString(),
label: "Number of ObjectRefs in scope",
value: actor.numObjectRefsInScope.toLocaleString(),
tooltip:
"The number of ObjectIDs that this actor is keeping in scope via its internal state. " +
"The number of ObjectRefs that this actor is keeping in scope via its internal state. " +
"This does not imply that the objects are in active use or colocated on the node with the actor " +
`currently. This can be useful for debugging memory leaks. See the docs at ${memoryDebuggingDocLink} ` +
"for more information.",
@@ -75,7 +75,7 @@ const memoryHeaderInfo: HeaderInfo<MemoryTableEntry>[] = [
{ id: "node_ip_address", label: "IP Address", numeric: true },
{ id: "pid", label: "pid", numeric: true },
{ id: "type", label: "Type", numeric: false },
{ id: "object_id", label: "Object ID", numeric: false },
{ id: "object_ref", label: "Object Ref", numeric: false },
{ id: "object_size", label: "Object Size (B)", numeric: true },
{ id: "reference_type", label: "Reference Type", numeric: false },
{ id: "call_site", label: "Call Site", numeric: false },
@@ -48,7 +48,7 @@ const MemoryRowGroup: React.FC<MemoryRowGroupProps> = ({
"node_ip_address",
"pid",
"type",
"object_id",
"object_ref",
"object_size",
"reference_type",
"call_site",
@@ -18,7 +18,7 @@ export const MemoryTableRow = (props: Props) => {
memoryTableEntry.node_ip_address,
memoryTableEntry.pid,
memoryTableEntry.type,
memoryTableEntry.object_id,
memoryTableEntry.object_ref,
object_size,
memoryTableEntry.reference_type,
memoryTableEntry.call_site,
+27 -27
View File
@@ -8,7 +8,7 @@ import ray
from ray._raylet import (TaskID, ActorID, JobID)
# These values are used to calculate if objectIDs are actor handles.
# These values are used to calculate if objectRefs are actor handles.
TASKID_BYTES_SIZE = TaskID.size()
ACTORID_BYTES_SIZE = ActorID.size()
JOBID_BYTES_SIZE = JobID.size()
@@ -17,21 +17,21 @@ TASKID_RANDOM_BITS_SIZE = (TASKID_BYTES_SIZE - ACTORID_BYTES_SIZE) * 2
ACTORID_RANDOM_BITS_SIZE = (ACTORID_BYTES_SIZE - JOBID_BYTES_SIZE) * 2
def decode_object_id_if_needed(object_id: str) -> bytes:
"""Decode objectID bytes string.
def decode_object_ref_if_needed(object_ref: str) -> bytes:
"""Decode objectRef bytes string.
gRPC reply contains an objectID that is encodded by Base64.
This function is used to decode the objectID.
Note that there are times that objectID is already decoded as
gRPC reply contains an objectRef that is encodded by Base64.
This function is used to decode the objectRef.
Note that there are times that objectRef is already decoded as
a hex string. In this case, just convert it to a binary number.
"""
if object_id.endswith("="):
# If the object id ends with =, that means it is base64 encoded.
# Object ids will always have = as a padding
# when it is base64 encoded because objectID is always 20B.
return base64.standard_b64decode(object_id)
if object_ref.endswith("="):
# If the object ref ends with =, that means it is base64 encoded.
# Object refs will always have = as a padding
# when it is base64 encoded because objectRef is always 20B.
return base64.standard_b64decode(object_ref)
else:
return ray.utils.hex_to_binary(object_id)
return ray.utils.hex_to_binary(object_ref)
class SortingType(Enum):
@@ -65,8 +65,8 @@ class MemoryTableEntry:
# object info
self.object_size = int(object_ref.get("objectSize", -1))
self.call_site = object_ref.get("callSite", "<Unknown>")
self.object_id = ray.ObjectID(
decode_object_id_if_needed(object_ref["objectId"]))
self.object_ref = ray.ObjectRef(
decode_object_ref_if_needed(object_ref["objectId"]))
# reference info
self.local_ref_count = int(object_ref.get("localRefCount", 0))
@@ -74,19 +74,19 @@ class MemoryTableEntry:
self.submitted_task_ref_count = int(
object_ref.get("submittedTaskRefCount", 0))
self.contained_in_owned = [
ray.ObjectID(decode_object_id_if_needed(object_id))
for object_id in object_ref.get("containedInOwned", [])
ray.ObjectRef(decode_object_ref_if_needed(object_ref))
for object_ref in object_ref.get("containedInOwned", [])
]
self.reference_type = self._get_reference_type()
def is_valid(self) -> bool:
# If the entry doesn't have a reference type or some invalid state,
# (e.g., no object ID presented), it is considered invalid.
# (e.g., no object ref presented), it is considered invalid.
if (not self.pinned_in_memory and self.local_ref_count == 0
and self.submitted_task_ref_count == 0
and len(self.contained_in_owned) == 0):
return False
elif self.object_id.is_nil():
elif self.object_ref.is_nil():
return False
else:
return True
@@ -99,7 +99,7 @@ class MemoryTableEntry:
"group by type {} is invalid.".format(group_by_type))
def _get_reference_type(self) -> str:
if self._is_object_id_actor_handle():
if self._is_object_ref_actor_handle():
return ReferenceType.ACTOR_HANDLE
if self.pinned_in_memory:
return ReferenceType.PINNED_IN_MEMORY
@@ -112,18 +112,18 @@ class MemoryTableEntry:
else:
return ReferenceType.UNKNOWN_STATUS
def _is_object_id_actor_handle(self) -> bool:
object_id_hex = self.object_id.hex()
def _is_object_ref_actor_handle(self) -> bool:
object_ref_hex = self.object_ref.hex()
# random (8B) | ActorID(6B) | flag (2B) | index (6B)
# ActorID(6B) == ActorRandomByte(4B) + JobID(2B)
# If random bytes are all 'f', but ActorRandomBytes
# are not all 'f', that means it is an actor creation
# task, which is an actor handle.
random_bits = object_id_hex[:TASKID_RANDOM_BITS_SIZE]
actor_random_bits = object_id_hex[TASKID_RANDOM_BITS_SIZE:
TASKID_RANDOM_BITS_SIZE +
ACTORID_RANDOM_BITS_SIZE]
random_bits = object_ref_hex[:TASKID_RANDOM_BITS_SIZE]
actor_random_bits = object_ref_hex[TASKID_RANDOM_BITS_SIZE:
TASKID_RANDOM_BITS_SIZE +
ACTORID_RANDOM_BITS_SIZE]
if (random_bits == "f" * 16 and not actor_random_bits == "f" * 8):
return True
else:
@@ -131,7 +131,7 @@ class MemoryTableEntry:
def __dict__(self):
return {
"object_id": self.object_id.hex(),
"object_ref": self.object_ref.hex(),
"pid": self.pid,
"node_ip_address": self.node_address,
"object_size": self.object_size,
@@ -141,7 +141,7 @@ class MemoryTableEntry:
"pinned_in_memory": self.pinned_in_memory,
"submitted_task_ref_count": self.submitted_task_ref_count,
"contained_in_owned": [
object_id.hex() for object_id in self.contained_in_owned
object_ref.hex() for object_ref in self.contained_in_owned
],
"type": "Driver" if self.is_driver else "Worker"
}
+1 -1
View File
@@ -36,7 +36,7 @@ class NodeStats(threading.Thread):
"jobId": "",
"numExecutedTasks": 0,
"numLocalObjects": 0,
"numObjectIdsInScope": 0,
"numObjectRefsInScope": 0,
"port": 0,
"state": 0,
"taskQueueLength": 0,
+4 -4
View File
@@ -170,11 +170,11 @@ class UnreconstructableError(RayError):
reconstruct the object.
Attributes:
object_id: ID of the object.
object_ref: ID of the object.
"""
def __init__(self, object_id):
self.object_id = object_id
def __init__(self, object_ref):
self.object_ref = object_ref
def __str__(self):
return (
@@ -183,7 +183,7 @@ class UnreconstructableError(RayError):
"memory available with ray.init(object_store_memory=<bytes>) "
"or setting object store limits with "
"ray.remote(object_store_memory=<bytes>). See also: {}".format(
self.object_id.hex(),
self.object_ref.hex(),
"https://docs.ray.io/en/latest/memory-management.html"))
+18 -18
View File
@@ -2,58 +2,58 @@ import ray
import numpy as np
def get(object_ids):
def get(object_refs):
"""Get a single or a collection of remote objects from the object store.
This method is identical to `ray.get` except it adds support for tuples,
ndarrays and dictionaries.
Args:
object_ids: Object ID of the object to get, a list, tuple, ndarray of
object IDs to get or a dict of {key: object ID}.
object_refs: Object ref of the object to get, a list, tuple, ndarray of
object refs to get or a dict of {key: object ref}.
Returns:
A Python object, a list of Python objects or a dict of {key: object}.
"""
if isinstance(object_ids, (tuple, np.ndarray)):
return ray.get(list(object_ids))
elif isinstance(object_ids, dict):
if isinstance(object_refs, (tuple, np.ndarray)):
return ray.get(list(object_refs))
elif isinstance(object_refs, dict):
keys_to_get = [
k for k, v in object_ids.items() if isinstance(v, ray.ObjectID)
k for k, v in object_refs.items() if isinstance(v, ray.ObjectRef)
]
ids_to_get = [
v for k, v in object_ids.items() if isinstance(v, ray.ObjectID)
v for k, v in object_refs.items() if isinstance(v, ray.ObjectRef)
]
values = ray.get(ids_to_get)
result = object_ids.copy()
result = object_refs.copy()
for key, value in zip(keys_to_get, values):
result[key] = value
return result
else:
return ray.get(object_ids)
return ray.get(object_refs)
def wait(object_ids, num_returns=1, timeout=None):
def wait(object_refs, num_returns=1, timeout=None):
"""Return a list of IDs that are ready and a list of IDs that are not.
This method is identical to `ray.wait` except it adds support for tuples
and ndarrays.
Args:
object_ids (List[ObjectID], Tuple(ObjectID), np.array(ObjectID)):
List like of object IDs for objects that may or may not be ready.
object_refs (List[ObjectRef], Tuple(ObjectRef), np.array(ObjectRef)):
List like of object refs for objects that may or may not be ready.
Note that these IDs must be unique.
num_returns (int): The number of object IDs that should be returned.
num_returns (int): The number of object refs that should be returned.
timeout (float): The maximum amount of time in seconds to wait before
returning.
Returns:
A list of object IDs that are ready and a list of the remaining object
A list of object refs that are ready and a list of the remaining object
IDs.
"""
if isinstance(object_ids, (tuple, np.ndarray)):
if isinstance(object_refs, (tuple, np.ndarray)):
return ray.wait(
list(object_ids), num_returns=num_returns, timeout=timeout)
list(object_refs), num_returns=num_returns, timeout=timeout)
return ray.wait(object_ids, num_returns=num_returns, timeout=timeout)
return ray.wait(object_refs, num_returns=num_returns, timeout=timeout)
@@ -6,21 +6,22 @@ BLOCK_SIZE = 10
class DistArray:
def __init__(self, shape, objectids=None):
def __init__(self, shape, object_refs=None):
self.shape = shape
self.ndim = len(shape)
self.num_blocks = [
int(np.ceil(1.0 * a / BLOCK_SIZE)) for a in self.shape
]
if objectids is not None:
self.objectids = objectids
if object_refs is not None:
self.object_refs = object_refs
else:
self.objectids = np.empty(self.num_blocks, dtype=object)
if self.num_blocks != list(self.objectids.shape):
raise Exception("The fields `num_blocks` and `objectids` are "
"inconsistent, `num_blocks` is {} and `objectids` "
"has shape {}".format(self.num_blocks,
list(self.objectids.shape)))
self.object_refs = np.empty(self.num_blocks, dtype=object)
if self.num_blocks != list(self.object_refs.shape):
raise Exception(
"The fields `num_blocks` and `object_refs` are "
"inconsistent, `num_blocks` is {} and `object_refs` "
"has shape {}".format(self.num_blocks,
list(self.object_refs.shape)))
@staticmethod
def compute_block_lower(index, shape):
@@ -52,14 +53,14 @@ class DistArray:
return [int(np.ceil(1.0 * a / BLOCK_SIZE)) for a in shape]
def assemble(self):
"""Assemble an array from a distributed array of object IDs."""
first_block = ray.get(self.objectids[(0, ) * self.ndim])
"""Assemble an array from a distributed array of object refs."""
first_block = ray.get(self.object_refs[(0, ) * self.ndim])
dtype = first_block.dtype
result = np.zeros(self.shape, dtype=dtype)
for index in np.ndindex(*self.num_blocks):
lower = DistArray.compute_block_lower(index, self.shape)
upper = DistArray.compute_block_upper(index, self.shape)
value = ray.get(self.objectids[index])
value = ray.get(self.object_refs[index])
result[tuple(slice(l, u) for (l, u) in zip(lower, upper))] = value
return result
@@ -83,7 +84,7 @@ def numpy_to_dist(a):
lower = DistArray.compute_block_lower(index, a.shape)
upper = DistArray.compute_block_upper(index, a.shape)
idx = tuple(slice(l, u) for (l, u) in zip(lower, upper))
result.objectids[index] = ray.put(a[idx])
result.object_refs[index] = ray.put(a[idx])
return result
@@ -91,7 +92,7 @@ def numpy_to_dist(a):
def zeros(shape, dtype_name="float"):
result = DistArray(shape)
for index in np.ndindex(*result.num_blocks):
result.objectids[index] = ra.zeros.remote(
result.object_refs[index] = ra.zeros.remote(
DistArray.compute_block_shape(index, shape), dtype_name=dtype_name)
return result
@@ -100,7 +101,7 @@ def zeros(shape, dtype_name="float"):
def ones(shape, dtype_name="float"):
result = DistArray(shape)
for index in np.ndindex(*result.num_blocks):
result.objectids[index] = ra.ones.remote(
result.object_refs[index] = ra.ones.remote(
DistArray.compute_block_shape(index, shape), dtype_name=dtype_name)
return result
@@ -111,7 +112,7 @@ def copy(a):
for index in np.ndindex(*result.num_blocks):
# We don't need to actually copy the objects because remote objects are
# immutable.
result.objectids[index] = a.objectids[index]
result.object_refs[index] = a.object_refs[index]
return result
@@ -123,10 +124,10 @@ def eye(dim1, dim2=-1, dtype_name="float"):
for (i, j) in np.ndindex(*result.num_blocks):
block_shape = DistArray.compute_block_shape([i, j], shape)
if i == j:
result.objectids[i, j] = ra.eye.remote(
result.object_refs[i, j] = ra.eye.remote(
block_shape[0], block_shape[1], dtype_name=dtype_name)
else:
result.objectids[i, j] = ra.zeros.remote(
result.object_refs[i, j] = ra.zeros.remote(
block_shape, dtype_name=dtype_name)
return result
@@ -139,11 +140,12 @@ def triu(a):
result = DistArray(a.shape)
for (i, j) in np.ndindex(*result.num_blocks):
if i < j:
result.objectids[i, j] = ra.copy.remote(a.objectids[i, j])
result.object_refs[i, j] = ra.copy.remote(a.object_refs[i, j])
elif i == j:
result.objectids[i, j] = ra.triu.remote(a.objectids[i, j])
result.object_refs[i, j] = ra.triu.remote(a.object_refs[i, j])
else:
result.objectids[i, j] = ra.zeros_like.remote(a.objectids[i, j])
result.object_refs[i, j] = ra.zeros_like.remote(
a.object_refs[i, j])
return result
@@ -155,11 +157,12 @@ def tril(a):
result = DistArray(a.shape)
for (i, j) in np.ndindex(*result.num_blocks):
if i > j:
result.objectids[i, j] = ra.copy.remote(a.objectids[i, j])
result.object_refs[i, j] = ra.copy.remote(a.object_refs[i, j])
elif i == j:
result.objectids[i, j] = ra.tril.remote(a.objectids[i, j])
result.object_refs[i, j] = ra.tril.remote(a.object_refs[i, j])
else:
result.objectids[i, j] = ra.zeros_like.remote(a.objectids[i, j])
result.object_refs[i, j] = ra.zeros_like.remote(
a.object_refs[i, j])
return result
@@ -191,8 +194,8 @@ def dot(a, b):
shape = [a.shape[0], b.shape[1]]
result = DistArray(shape)
for (i, j) in np.ndindex(*result.num_blocks):
args = list(a.objectids[i, :]) + list(b.objectids[:, j])
result.objectids[i, j] = blockwise_dot.remote(*args)
args = list(a.object_refs[i, :]) + list(b.object_refs[:, j])
result.object_refs[i, j] = blockwise_dot.remote(*args)
return result
@@ -203,9 +206,9 @@ def subblocks(a, *ranges):
the `a`. The result and `a` will have the same number of dimensions. For
example,
subblocks(a, [0, 1], [2, 4])
will produce a DistArray whose objectids are
[[a.objectids[0, 2], a.objectids[0, 4]],
[a.objectids[1, 2], a.objectids[1, 4]]]
will produce a DistArray whose object_refs are
[[a.object_refs[0, 2], a.object_refs[0, 4]],
[a.object_refs[1, 2], a.object_refs[1, 4]]]
We allow the user to pass in an empty list [] to indicate the full range.
"""
ranges = list(ranges)
@@ -236,7 +239,7 @@ def subblocks(a, *ranges):
for i in range(a.ndim)]
result = DistArray(shape)
for index in np.ndindex(*result.num_blocks):
result.objectids[index] = a.objectids[tuple(
result.object_refs[index] = a.object_refs[tuple(
ranges[i][index[i]] for i in range(a.ndim))]
return result
@@ -250,7 +253,7 @@ def transpose(a):
result = DistArray([a.shape[1], a.shape[0]])
for i in range(result.num_blocks[0]):
for j in range(result.num_blocks[1]):
result.objectids[i, j] = ra.transpose.remote(a.objectids[j, i])
result.object_refs[i, j] = ra.transpose.remote(a.object_refs[j, i])
return result
@@ -263,8 +266,8 @@ def add(x1, x2):
x1.shape, x2.shape))
result = DistArray(x1.shape)
for index in np.ndindex(*result.num_blocks):
result.objectids[index] = ra.add.remote(x1.objectids[index],
x2.objectids[index])
result.object_refs[index] = ra.add.remote(x1.object_refs[index],
x2.object_refs[index])
return result
@@ -277,6 +280,6 @@ def subtract(x1, x2):
.format(x1.shape, x2.shape))
result = DistArray(x1.shape)
for index in np.ndindex(*result.num_blocks):
result.objectids[index] = ra.subtract.remote(x1.objectids[index],
x2.objectids[index])
result.object_refs[index] = ra.subtract.remote(x1.object_refs[index],
x2.object_refs[index])
return result
@@ -35,7 +35,7 @@ def tsqr(a):
q_tree = np.empty((num_blocks, K), dtype=object)
current_rs = []
for i in range(num_blocks):
block = a.objectids[i, 0]
block = a.object_refs[i, 0]
q, r = ra.linalg.qr.remote(block)
q_tree[i, 0] = q
current_rs.append(r)
@@ -57,8 +57,8 @@ def tsqr(a):
else:
q_shape = [a.shape[0], a.shape[0]]
q_num_blocks = core.DistArray.compute_num_blocks(q_shape)
q_objectids = np.empty(q_num_blocks, dtype=object)
q_result = core.DistArray(q_shape, q_objectids)
q_object_refs = np.empty(q_num_blocks, dtype=object)
q_result = core.DistArray(q_shape, q_object_refs)
# reconstruct output
for i in range(num_blocks):
@@ -75,7 +75,7 @@ def tsqr(a):
q_block_current = ra.dot.remote(
q_block_current,
ra.subarray.remote(q_tree[ith_index, j], lower, upper))
q_result.objectids[i] = q_block_current
q_result.object_refs[i] = q_block_current
r = current_rs[0]
return q_result, ray.get(r)
@@ -142,7 +142,7 @@ def tsqr_hr(a):
q, r_temp = tsqr.remote(a)
y, u, s = modified_lu.remote(q)
y_blocked = ray.get(y)
t, y_top = tsqr_hr_helper1.remote(u, s, y_blocked.objectids[0, 0],
t, y_top = tsqr_hr_helper1.remote(u, s, y_blocked.object_refs[0, 0],
a.shape[1])
r = tsqr_hr_helper2.remote(s, r_temp)
return ray.get(y), ray.get(t), ray.get(y_top), ray.get(r)
@@ -167,9 +167,9 @@ def qr(a):
k = min(m, n)
# we will store our scratch work in a_work
a_work = core.DistArray(a.shape, np.copy(a.objectids))
a_work = core.DistArray(a.shape, np.copy(a.object_refs))
result_dtype = np.linalg.qr(ray.get(a.objectids[0, 0]))[0].dtype.name
result_dtype = np.linalg.qr(ray.get(a.object_refs[0, 0]))[0].dtype.name
# TODO(rkn): It would be preferable not to get this right after creating
# it.
r_res = ray.get(core.zeros.remote([k, n], result_dtype))
@@ -188,28 +188,29 @@ def qr(a):
y_val = ray.get(y)
for j in range(i, a.num_blocks[0]):
y_res.objectids[j, i] = y_val.objectids[j - i, 0]
y_res.object_refs[j, i] = y_val.object_refs[j - i, 0]
if a.shape[0] > a.shape[1]:
# in this case, R needs to be square
R_shape = ray.get(ra.shape.remote(R))
eye_temp = ra.eye.remote(
R_shape[1], R_shape[0], dtype_name=result_dtype)
r_res.objectids[i, i] = ra.dot.remote(eye_temp, R)
r_res.object_refs[i, i] = ra.dot.remote(eye_temp, R)
else:
r_res.objectids[i, i] = R
r_res.object_refs[i, i] = R
Ts.append(core.numpy_to_dist.remote(t))
for c in range(i + 1, a.num_blocks[1]):
W_rcs = []
for r in range(i, a.num_blocks[0]):
y_ri = y_val.objectids[r - i, 0]
W_rcs.append(qr_helper2.remote(y_ri, a_work.objectids[r, c]))
y_ri = y_val.object_refs[r - i, 0]
W_rcs.append(qr_helper2.remote(y_ri, a_work.object_refs[r, c]))
W_c = ra.sum_list.remote(*W_rcs)
for r in range(i, a.num_blocks[0]):
y_ri = y_val.objectids[r - i, 0]
A_rc = qr_helper1.remote(a_work.objectids[r, c], y_ri, t, W_c)
a_work.objectids[r, c] = A_rc
r_res.objectids[i, c] = a_work.objectids[i, c]
y_ri = y_val.object_refs[r - i, 0]
A_rc = qr_helper1.remote(a_work.object_refs[r, c], y_ri, t,
W_c)
a_work.object_refs[r, c] = A_rc
r_res.object_refs[i, c] = a_work.object_refs[i, c]
# construct q_res from Ys and Ts
q = core.eye.remote(m, k, dtype_name=result_dtype)
@@ -8,9 +8,9 @@ from .core import DistArray
@ray.remote
def normal(shape):
num_blocks = DistArray.compute_num_blocks(shape)
objectids = np.empty(num_blocks, dtype=object)
object_refs = np.empty(num_blocks, dtype=object)
for index in np.ndindex(*num_blocks):
objectids[index] = ra.random.normal.remote(
object_refs[index] = ra.random.normal.remote(
DistArray.compute_block_shape(index, shape))
result = DistArray(shape, objectids)
result = DistArray(shape, object_refs)
return result
+2 -2
View File
@@ -253,7 +253,7 @@ cdef class PythonFunctionDescriptor(FunctionDescriptor):
"""Get the function id calculated from this descriptor.
Returns:
The value of ray.ObjectID that represents the function id.
The value of ray.ObjectRef that represents the function id.
"""
if not self._function_id:
self._function_id = self._get_function_id()
@@ -266,7 +266,7 @@ cdef class PythonFunctionDescriptor(FunctionDescriptor):
descriptor.
Returns:
ray.ObjectID to represent the function descriptor.
ray.ObjectRef to represent the function descriptor.
"""
function_id_hash = hashlib.sha1()
# Include the function module and name in the hash.
@@ -12,8 +12,8 @@ from ray.includes.unique_ids cimport (
cdef extern from "ray/gcs/gcs_client/global_state_accessor.h" nogil:
cdef cppclass CGlobalStateAccessor "ray::gcs::GlobalStateAccessor":
CGlobalStateAccessor(const c_string &redis_address,
const c_string &redis_password,
c_bool is_test)
const c_string &redis_password,
c_bool is_test)
c_bool Connect()
void Disconnect()
c_vector[c_string] GetAllJobInfo()
+17 -8
View File
@@ -16,12 +16,17 @@ cdef class GlobalStateAccessor:
cdef:
unique_ptr[CGlobalStateAccessor] inner
def __init__(self, redis_address, redis_password, c_bool is_test_client=False):
def __init__(self, redis_address, redis_password,
c_bool is_test_client=False):
if not redis_password:
redis_password = ""
self.inner.reset(
new CGlobalStateAccessor(redis_address.encode("ascii"),
redis_password.encode("ascii"), is_test_client))
new CGlobalStateAccessor(
redis_address.encode("ascii"),
redis_password.encode("ascii"),
is_test_client,
),
)
def connect(self):
return self.inner.get().Connect()
@@ -41,8 +46,9 @@ cdef class GlobalStateAccessor:
def get_object_table(self):
return self.inner.get().GetAllObjectInfo()
def get_object_info(self, object_id):
object_info = self.inner.get().GetObjectInfo(CObjectID.FromBinary(object_id.binary()))
def get_object_info(self, object_ref):
object_info = self.inner.get().GetObjectInfo(
CObjectID.FromBinary(object_ref.binary()))
if object_info:
return c_string(object_info.get().data(), object_info.get().size())
return None
@@ -51,19 +57,22 @@ cdef class GlobalStateAccessor:
return self.inner.get().GetAllActorInfo()
def get_actor_info(self, actor_id):
actor_info = self.inner.get().GetActorInfo(CActorID.FromBinary(actor_id.binary()))
actor_info = self.inner.get().GetActorInfo(
CActorID.FromBinary(actor_id.binary()))
if actor_info:
return c_string(actor_info.get().data(), actor_info.get().size())
return None
def get_node_resource_info(self, node_id):
return self.inner.get().GetNodeResourceInfo(CClientID.FromBinary(node_id.binary()))
return self.inner.get().GetNodeResourceInfo(
CClientID.FromBinary(node_id.binary()))
def get_worker_table(self):
return self.inner.get().GetAllWorkerInfo()
def get_worker_info(self, worker_id):
worker_info = self.inner.get().GetWorkerInfo(CWorkerID.FromBinary(worker_id.binary()))
worker_info = self.inner.get().GetWorkerInfo(
CWorkerID.FromBinary(worker_id.binary()))
if worker_info:
return c_string(worker_info.get().data(), worker_info.get().size())
return None
+72
View File
@@ -0,0 +1,72 @@
from ray.includes.unique_ids cimport CObjectID
import ray
cdef class ObjectRef(BaseID):
def __init__(self, id):
check_id(id)
self.data = CObjectID.FromBinary(<c_string>id)
self.in_core_worker = False
worker = ray.worker.global_worker
# TODO(edoakes): We should be able to remove the in_core_worker flag.
# But there are still some dummy object refs being created outside the
# context of a core worker.
if hasattr(worker, "core_worker"):
worker.core_worker.add_object_ref_reference(self)
self.in_core_worker = True
def __dealloc__(self):
if self.in_core_worker:
try:
worker = ray.worker.global_worker
worker.core_worker.remove_object_ref_reference(self)
except Exception as e:
# There is a strange error in rllib that causes the above to
# fail. Somehow the global 'ray' variable corresponding to the
# imported package is None when this gets called. Unfortunately
# this is hard to debug because __dealloc__ is called during
# garbage collection so we can't get a good stack trace. In any
# case, there's not much we can do besides ignore it
# (re-importing ray won't help).
pass
cdef CObjectID native(self):
return <CObjectID>self.data
def size(self):
return CObjectID.Size()
def binary(self):
return self.data.Binary()
def hex(self):
return decode(self.data.Hex())
def is_nil(self):
return self.data.IsNil()
def task_id(self):
return TaskID(self.data.TaskId().Binary())
cdef size_t hash(self):
return self.data.Hash()
@classmethod
def nil(cls):
return cls(CObjectID.Nil().Binary())
@classmethod
def from_random(cls):
return cls(CObjectID.FromRandom().Binary())
def __await__(self):
# Delayed import because this can only be imported in py3.
from ray.async_compat import get_async
return get_async(self).__await__()
def as_future(self):
# Delayed import because this can only be imported in py3.
from ray.async_compat import get_async
return get_async(self)
+17 -12
View File
@@ -177,7 +177,8 @@ cdef class MessagePackSerializer(object):
raise Exception('Unrecognized ext type id: {}'.format(code))
try:
gc.disable() # Performance optimization for msgpack.
return msgpack.loads(s, ext_hook=_ext_hook, raw=False, strict_map_key=False)
return msgpack.loads(s, ext_hook=_ext_hook, raw=False,
strict_map_key=False)
finally:
gc.enable()
@@ -371,11 +372,11 @@ cdef class Pickle5Writer:
cdef class SerializedObject(object):
cdef:
object _metadata
object _contained_object_ids
object _contained_object_refs
def __init__(self, metadata, contained_object_ids=None):
def __init__(self, metadata, contained_object_refs=None):
self._metadata = metadata
self._contained_object_ids = contained_object_ids or []
self._contained_object_refs = contained_object_refs or []
@property
def total_bytes(self):
@@ -387,8 +388,8 @@ cdef class SerializedObject(object):
return self._metadata
@property
def contained_object_ids(self):
return self._contained_object_ids
def contained_object_refs(self):
return self._contained_object_refs
@cython.boundscheck(False)
@cython.wraparound(False)
@@ -404,9 +405,9 @@ cdef class Pickle5SerializedObject(SerializedObject):
object _total_bytes
def __init__(self, metadata, inband, Pickle5Writer writer,
contained_object_ids):
contained_object_refs):
super(Pickle5SerializedObject, self).__init__(metadata,
contained_object_ids)
contained_object_refs)
self.inband = inband
self.writer = writer
# cached total bytes
@@ -438,13 +439,17 @@ cdef class MessagePackSerializedObject(SerializedObject):
def __init__(self, metadata, msgpack_data,
SerializedObject nest_serialized_object=None):
if nest_serialized_object:
contained_object_ids = nest_serialized_object.contained_object_ids
contained_object_refs = (
nest_serialized_object.contained_object_refs
)
total_bytes = nest_serialized_object.total_bytes
else:
contained_object_ids = []
contained_object_refs = []
total_bytes = 0
super(MessagePackSerializedObject, self).__init__(metadata,
contained_object_ids)
super(MessagePackSerializedObject, self).__init__(
metadata,
contained_object_refs,
)
self.nest_serialized_object = nest_serialized_object
self.msgpack_header = msgpack_header = msgpack.dumps(len(msgpack_data))
self.msgpack_data = msgpack_data
+3 -69
View File
@@ -126,75 +126,6 @@ cdef class UniqueID(BaseID):
return self.data.Hash()
cdef class ObjectID(BaseID):
def __init__(self, id):
check_id(id)
self.data = CObjectID.FromBinary(<c_string>id)
self.in_core_worker = False
worker = ray.worker.global_worker
# TODO(edoakes): We should be able to remove the in_core_worker flag.
# But there are still some dummy object IDs being created outside the
# context of a core worker.
if hasattr(worker, "core_worker"):
worker.core_worker.add_object_id_reference(self)
self.in_core_worker = True
def __dealloc__(self):
if self.in_core_worker:
try:
worker = ray.worker.global_worker
worker.core_worker.remove_object_id_reference(self)
except Exception as e:
# There is a strange error in rllib that causes the above to
# fail. Somehow the global 'ray' variable corresponding to the
# imported package is None when this gets called. Unfortunately
# this is hard to debug because __dealloc__ is called during
# garbage collection so we can't get a good stack trace. In any
# case, there's not much we can do besides ignore it
# (re-importing ray won't help).
pass
cdef CObjectID native(self):
return <CObjectID>self.data
def size(self):
return CObjectID.Size()
def binary(self):
return self.data.Binary()
def hex(self):
return decode(self.data.Hex())
def is_nil(self):
return self.data.IsNil()
def task_id(self):
return TaskID(self.data.TaskId().Binary())
cdef size_t hash(self):
return self.data.Hash()
@classmethod
def nil(cls):
return cls(CObjectID.Nil().Binary())
@classmethod
def from_random(cls):
return cls(CObjectID.FromRandom().Binary())
def __await__(self):
# Delayed import because this can only be imported in py3.
from ray.async_compat import get_async
return get_async(self).__await__()
def as_future(self):
# Delayed import because this can only be imported in py3.
from ray.async_compat import get_async
return get_async(self)
cdef class TaskID(BaseID):
cdef CTaskID data
@@ -397,6 +328,9 @@ cdef class ActorClassID(UniqueID):
cdef CActorClassID native(self):
return <CActorClassID>self.data
# This type alias is for backward compatibility.
ObjectID = ObjectRef
_ID_TYPES = [
ActorCheckpointID,
ActorClassID,
+15 -14
View File
@@ -29,7 +29,7 @@ def memory_summary():
return reply.memory_summary
def free(object_ids, local_only=False, delete_creating_tasks=False):
def free(object_refs, local_only=False, delete_creating_tasks=False):
"""Free a list of IDs from object stores.
This function is a low-level API which should be used in restricted
@@ -48,7 +48,7 @@ def free(object_ids, local_only=False, delete_creating_tasks=False):
>>> free([x_id]) # unpin & delete x globally
Args:
object_ids (List[ObjectID]): List of object IDs to delete.
object_refs (List[ObjectRef]): List of object refs to delete.
local_only (bool): Whether only deleting the list of objects in local
object store or all object stores.
delete_creating_tasks (bool): Whether also delete the object creating
@@ -56,23 +56,24 @@ def free(object_ids, local_only=False, delete_creating_tasks=False):
"""
worker = ray.worker.global_worker
if isinstance(object_ids, ray.ObjectID):
object_ids = [object_ids]
if isinstance(object_refs, ray.ObjectRef):
object_refs = [object_refs]
if not isinstance(object_ids, list):
raise TypeError("free() expects a list of ObjectID, got {}".format(
type(object_ids)))
if not isinstance(object_refs, list):
raise TypeError("free() expects a list of ObjectRef, got {}".format(
type(object_refs)))
# Make sure that the values are object IDs.
for object_id in object_ids:
if not isinstance(object_id, ray.ObjectID):
raise TypeError("Attempting to call `free` on the value {}, "
"which is not an ray.ObjectID.".format(object_id))
# Make sure that the values are object refs.
for object_ref in object_refs:
if not isinstance(object_ref, ray.ObjectRef):
raise TypeError(
"Attempting to call `free` on the value {}, "
"which is not an ray.ObjectRef.".format(object_ref))
worker.check_connected()
with profiling.profile("ray.free"):
if len(object_ids) == 0:
if len(object_refs) == 0:
return
worker.core_worker.free_objects(object_ids, local_only,
worker.core_worker.free_objects(object_refs, local_only,
delete_creating_tasks)
+3 -3
View File
@@ -283,9 +283,9 @@ class Node:
return self._ray_params.load_code_from_local
@property
def object_id_seed(self):
"""Get the seed for deterministic generation of object IDs"""
return self._ray_params.object_id_seed
def object_ref_seed(self):
"""Get the seed for deterministic generation of object refs"""
return self._ray_params.object_ref_seed
@property
def plasma_store_socket_name(self):
+5 -5
View File
@@ -40,9 +40,9 @@ class RayParams:
on. If not set or set to 0, random ports will be chosen.
max_worker_port (int): The highest port number that workers will bind
on. If set, min_worker_port must also be set.
object_id_seed (int): Used to seed the deterministic generation of
object IDs. The same value can be used across multiple runs of the
same job in order to generate the object IDs in a consistent
object_ref_seed (int): Used to seed the deterministic generation of
object refs. The same value can be used across multiple runs of the
same job in order to generate the object refs in a consistent
manner. However, the same ID should not be used for different jobs.
redirect_worker_output: True if the stdout and stderr of worker
processes should be redirected to files.
@@ -107,7 +107,7 @@ class RayParams:
raylet_ip_address=None,
min_worker_port=None,
max_worker_port=None,
object_id_seed=None,
object_ref_seed=None,
driver_mode=None,
redirect_worker_output=None,
redirect_output=None,
@@ -131,7 +131,7 @@ class RayParams:
java_worker_options=None,
load_code_from_local=False,
_internal_config=None):
self.object_id_seed = object_id_seed
self.object_ref_seed = object_ref_seed
self.redis_address = redis_address
self.num_cpus = num_cpus
self.num_gpus = num_gpus
+7 -7
View File
@@ -18,23 +18,23 @@ def main():
cluster.add_node(
object_store_memory=20 * 1024 * 1024 * 1024, num_gpus=1, num_cpus=16)
object_id_list = []
object_ref_list = []
for i in range(0, 10):
object_id = ray.put(np.random.rand(1024 * 128, 1024))
object_id_list.append(object_id)
object_ref = ray.put(np.random.rand(1024 * 128, 1024))
object_ref_list.append(object_ref)
@ray.remote(num_gpus=1)
def f(object_id_list):
def f(object_ref_list):
diffs = []
for object_id in object_id_list:
for object_ref in object_ref_list:
before = time.time()
ray.get(object_id)
ray.get(object_ref)
after = time.time()
diffs.append(after - before)
time.sleep(1)
return np.mean(diffs), np.std(diffs)
time_diff, time_diff_std = ray.get(f.remote(object_id_list))
time_diff, time_diff_std = ray.get(f.remote(object_ref_list))
print("latency to get an 1G object over network", round(time_diff, 2),
"+-", round(time_diff_std, 2))
+6 -6
View File
@@ -47,7 +47,7 @@ class RemoteFunction:
invoking the function. The decorator must return a function that
takes in two arguments ("args" and "kwargs"). In most cases, it
should call the function that was passed into the decorator and
return the resulting ObjectIDs. For an example, see
return the resulting ObjectRefs. For an example, see
"test_decorated_function" in "python/ray/tests/test_basic.py".
_function_signature: The function signature.
_last_export_session_and_job: A pair of the last exported session
@@ -203,14 +203,14 @@ class RemoteFunction:
assert not self._is_cross_language, \
"Cross language remote function " \
"cannot be executed locally."
object_ids = worker.core_worker.submit_task(
object_refs = worker.core_worker.submit_task(
self._language, self._function_descriptor, list_args,
num_return_vals, resources, max_retries)
if len(object_ids) == 1:
return object_ids[0]
elif len(object_ids) > 1:
return object_ids
if len(object_refs) == 1:
return object_refs[0]
elif len(object_refs) > 1:
return object_refs
if self._decorator is not None:
invocation = self._decorator(invocation)
+51 -50
View File
@@ -88,7 +88,7 @@ def _try_to_compute_deterministic_class_id(cls, depth=5):
class SerializationContext:
"""Initialize the serialization library.
This defines a custom serializer for object IDs and also tells ray to
This defines a custom serializer for object refs and also tells ray to
serialize several exception classes that we define for error handling.
"""
@@ -99,14 +99,14 @@ class SerializationContext:
def actor_handle_serializer(obj):
serialized, actor_handle_id = obj._serialization_helper()
# Update ref counting for the actor handle
self.add_contained_object_id(actor_handle_id)
self.add_contained_object_ref(actor_handle_id)
return serialized
def actor_handle_deserializer(serialized_obj):
# If this actor handle was stored in another object, then tell the
# core worker.
context = ray.worker.global_worker.get_serialization_context()
outer_id = context.get_outer_object_id()
outer_id = context.get_outer_object_ref()
return ray.actor.ActorHandle._deserialization_helper(
serialized_obj, outer_id)
@@ -121,22 +121,22 @@ class SerializationContext:
def id_deserializer(serialized_obj):
return serialized_obj[0](*serialized_obj[1])
def object_id_serializer(obj):
self.add_contained_object_id(obj)
def object_ref_serializer(obj):
self.add_contained_object_ref(obj)
worker = ray.worker.global_worker
worker.check_connected()
obj, owner_address = (
worker.core_worker.serialize_and_promote_object_id(obj))
worker.core_worker.serialize_and_promote_object_ref(obj))
obj = id_serializer(obj)
return obj, owner_address
def object_id_deserializer(serialized_obj):
obj_id, owner_address = serialized_obj
def object_ref_deserializer(serialized_obj):
obj_ref, owner_address = serialized_obj
# NOTE(swang): Must deserialize the object first before asking
# the core worker to resolve the value. This is to make sure
# that the ref count for the ObjectID is greater than 0 by the
# that the ref count for the ObjectRef is greater than 0 by the
# time the core worker resolves the value of the object.
deserialized_object_id = id_deserializer(obj_id)
deserialized_object_ref = id_deserializer(obj_ref)
# TODO(edoakes): we should be able to just capture a reference
# to 'self' here instead, but this function is itself pickled
# somewhere, which causes an error.
@@ -146,19 +146,19 @@ class SerializationContext:
worker.check_connected()
# UniqueIDs are serialized as
# (class name, (unique bytes,)).
outer_id = context.get_outer_object_id()
# outer_id is None in the case that this ObjectID was closed
outer_id = context.get_outer_object_ref()
# outer_id is None in the case that this ObjectRef was closed
# over in a function or pickled directly using pickle.dumps().
if outer_id is None:
outer_id = ray.ObjectID.nil()
worker.core_worker.deserialize_and_register_object_id(
obj_id[1][0], outer_id, owner_address)
return deserialized_object_id
outer_id = ray.ObjectRef.nil()
worker.core_worker.deserialize_and_register_object_ref(
obj_ref[1][0], outer_id, owner_address)
return deserialized_object_ref
for id_type in ray._raylet._ID_TYPES:
if id_type == ray._raylet.ObjectID:
if id_type == ray._raylet.ObjectRef:
self._register_cloudpickle_serializer(
id_type, object_id_serializer, object_id_deserializer)
id_type, object_ref_serializer, object_ref_deserializer)
else:
self._register_cloudpickle_serializer(id_type, id_serializer,
id_deserializer)
@@ -180,36 +180,36 @@ class SerializationContext:
def set_out_of_band_serialization(self):
self._thread_local.in_band = False
def set_outer_object_id(self, outer_object_id):
self._thread_local.outer_object_id = outer_object_id
def set_outer_object_ref(self, outer_object_ref):
self._thread_local.outer_object_ref = outer_object_ref
def get_outer_object_id(self):
return getattr(self._thread_local, "outer_object_id", None)
def get_outer_object_ref(self):
return getattr(self._thread_local, "outer_object_ref", None)
def get_and_clear_contained_object_ids(self):
if not hasattr(self._thread_local, "object_ids"):
self._thread_local.object_ids = set()
def get_and_clear_contained_object_refs(self):
if not hasattr(self._thread_local, "object_refs"):
self._thread_local.object_refs = set()
return set()
object_ids = self._thread_local.object_ids
self._thread_local.object_ids = set()
return object_ids
object_refs = self._thread_local.object_refs
self._thread_local.object_refs = set()
return object_refs
def add_contained_object_id(self, object_id):
def add_contained_object_ref(self, object_ref):
if self.is_in_band_serialization():
# This object ID is being stored in an object. Add the ID to the
# This object ref is being stored in an object. Add the ID to the
# list of IDs contained in the object so that we keep the inner
# object value alive as long as the outer object is in scope.
if not hasattr(self._thread_local, "object_ids"):
self._thread_local.object_ids = set()
self._thread_local.object_ids.add(object_id)
if not hasattr(self._thread_local, "object_refs"):
self._thread_local.object_refs = set()
self._thread_local.object_refs.add(object_ref)
else:
# If this serialization is out-of-band (e.g., from a call to
# cloudpickle directly or captured in a remote function/actor),
# then pin the object for the lifetime of this worker by adding
# a local reference that won't ever be removed.
ray.worker.global_worker.core_worker.add_object_id_reference(
object_id)
ray.worker.global_worker.core_worker.add_object_ref_reference(
object_ref)
def _deserialize_pickle5_data(self, data):
try:
@@ -242,7 +242,7 @@ class SerializationContext:
raise DeserializationError()
return obj
def _deserialize_object(self, data, metadata, object_id):
def _deserialize_object(self, data, metadata, object_ref):
if metadata:
if metadata in [
ray_constants.OBJECT_METADATA_TYPE_CROSS_LANGUAGE,
@@ -261,7 +261,7 @@ class SerializationContext:
except Exception:
raise Exception(
"Can't deserialize object: {}, metadata: {}".format(
object_id, metadata))
object_ref, metadata))
# RayTaskError is serialized with pickle5 in the data field.
# TODO (kfstorm): exception serialization should be language
@@ -277,7 +277,8 @@ class SerializationContext:
elif error_type == ErrorType.Value("TASK_CANCELLED"):
return RayCancellationError()
elif error_type == ErrorType.Value("OBJECT_UNRECONSTRUCTABLE"):
return UnreconstructableError(ray.ObjectID(object_id.binary()))
return UnreconstructableError(
ray.ObjectRef(object_ref.binary()))
else:
assert error_type != ErrorType.Value("OBJECT_IN_PLASMA"), \
"Tried to get object that has been promoted to plasma."
@@ -293,22 +294,22 @@ class SerializationContext:
def deserialize_objects(self,
data_metadata_pairs,
object_ids,
object_refs,
error_timeout=10):
assert len(data_metadata_pairs) == len(object_ids)
assert len(data_metadata_pairs) == len(object_refs)
start_time = time.time()
results = []
warning_sent = False
i = 0
while i < len(object_ids):
object_id = object_ids[i]
while i < len(object_refs):
object_ref = object_refs[i]
data, metadata = data_metadata_pairs[i]
assert self.get_outer_object_id() is None
self.set_outer_object_id(object_id)
assert self.get_outer_object_ref() is None
self.set_outer_object_ref(object_ref)
try:
results.append(
self._deserialize_object(data, metadata, object_id))
self._deserialize_object(data, metadata, object_ref))
i += 1
except DeserializationError:
# Wait a little bit for the import thread to import the class.
@@ -330,27 +331,27 @@ class SerializationContext:
job_id=self.worker.current_job_id)
warning_sent = True
finally:
# Must clear ObjectID to not hold a reference.
self.set_outer_object_id(None)
# Must clear ObjectRef to not hold a reference.
self.set_outer_object_ref(None)
return results
def _serialize_to_pickle5(self, metadata, value):
writer = Pickle5Writer()
# TODO(swang): Check that contained_object_ids is empty.
# TODO(swang): Check that contained_object_refs is empty.
try:
self.set_in_band_serialization()
inband = pickle.dumps(
value, protocol=5, buffer_callback=writer.buffer_callback)
except Exception as e:
self.get_and_clear_contained_object_ids()
self.get_and_clear_contained_object_refs()
raise e
finally:
self.set_out_of_band_serialization()
return Pickle5SerializedObject(
metadata, inband, writer,
self.get_and_clear_contained_object_ids())
self.get_and_clear_contained_object_refs())
def _serialize_to_msgpack(self, metadata, value):
python_objects = []
+1 -1
View File
@@ -20,7 +20,7 @@ class RayServeHandle:
Traffic=...
)
>>> handle.remote(my_request_content)
ObjectID(...)
ObjectRef(...)
>>> ray.get(handle.remote(...))
# result
>>> ray.get(handle.remote(let_it_crash_request))
+1 -1
View File
@@ -198,7 +198,7 @@ class Router:
self.endpoint_queues[endpoint].appendleft(query)
self.flush_endpoint_queue(endpoint)
# Note: a future change can be to directly return the ObjectID from
# Note: a future change can be to directly return the ObjectRef from
# replica task submission
try:
result = await query.async_future
+21 -20
View File
@@ -131,7 +131,7 @@ class GlobalState:
"""Execute a Redis command on the appropriate Redis shard based on key.
Args:
key: The object ID or the task ID that the query is about.
key: The object ref or the task ID that the query is about.
args: The command to run.
Returns:
@@ -155,11 +155,11 @@ class GlobalState:
result.extend(list(client.scan_iter(match=pattern)))
return result
def object_table(self, object_id=None):
"""Fetch and parse the object table info for one or more object IDs.
def object_table(self, object_ref=None):
"""Fetch and parse the object table info for one or more object refs.
Args:
object_id: An object ID to fetch information about. If this is
object_ref: An object ref to fetch information about. If this is
None, then the entire object table is fetched.
Returns:
@@ -167,9 +167,10 @@ class GlobalState:
"""
self._check_connected()
if object_id is not None:
object_id = ray.ObjectID(hex_to_binary(object_id))
object_info = self.global_state_accessor.get_object_info(object_id)
if object_ref is not None:
object_ref = ray.ObjectRef(hex_to_binary(object_ref))
object_info = self.global_state_accessor.get_object_info(
object_ref)
if object_info is None:
return {}
else:
@@ -182,7 +183,7 @@ class GlobalState:
for i in range(len(object_table)):
object_location_info = gcs_utils.ObjectLocationInfo.FromString(
object_table[i])
results[binary_to_hex(object_location_info.object_id)] = \
results[binary_to_hex(object_location_info.object_ref)] = \
self._gen_object_info(object_location_info)
return results
@@ -196,8 +197,8 @@ class GlobalState:
locations.append(ray.utils.binary_to_hex(location.manager))
object_info = {
"ObjectID": ray.utils.binary_to_hex(
object_location_info.object_id),
"ObjectRef": ray.utils.binary_to_hex(
object_location_info.object_ref),
"Locations": locations,
}
return object_info
@@ -538,21 +539,21 @@ class GlobalState:
for event in items:
if event["event_type"] == "transfer_send":
object_id, remote_node_id, _, _ = event["extra_data"]
object_ref, remote_node_id, _, _ = event["extra_data"]
elif event["event_type"] == "transfer_receive":
object_id, remote_node_id, _, _ = event["extra_data"]
object_ref, remote_node_id, _, _ = event["extra_data"]
elif event["event_type"] == "receive_pull_request":
object_id, remote_node_id = event["extra_data"]
object_ref, remote_node_id = event["extra_data"]
else:
assert False, "This should be unreachable."
# Choose a color by reading the first couple of hex digits of
# the object ID as an integer and turning that into a color.
object_id_int = int(object_id[:2], 16)
color = self._chrome_tracing_colors[object_id_int % len(
# the object ref as an integer and turning that into a color.
object_ref_int = int(object_ref[:2], 16)
color = self._chrome_tracing_colors[object_ref_int % len(
self._chrome_tracing_colors)]
new_event = {
@@ -921,17 +922,17 @@ def actors(actor_id=None):
return state.actor_table(actor_id=actor_id)
def objects(object_id=None):
"""Fetch and parse the object table info for one or more object IDs.
def objects(object_ref=None):
"""Fetch and parse the object table info for one or more object refs.
Args:
object_id: An object ID to fetch information about. If this is None,
object_ref: An object ref to fetch information about. If this is None,
then the entire object table is fetched.
Returns:
Information from the object table.
"""
return state.object_table(object_id=object_id)
return state.object_table(object_ref=object_ref)
def timeline(filename=None):
@@ -72,8 +72,8 @@ def test_task_forward(benchmark, num_tasks):
benchmark(benchmark_task_forward, f, num_tasks)
def benchmark_transfer_object(actor, object_ids):
ray.get(actor.f.remote(object_ids))
def benchmark_transfer_object(actor, object_refs):
ray.get(actor.f.remote(object_refs))
@pytest.mark.benchmark
@@ -87,14 +87,14 @@ def test_transfer_performance(benchmark, ray_start_cluster_head, object_number,
@ray.remote(resources={"my_resource": 1})
class ObjectActor:
def f(self, object_ids):
ray.get(object_ids)
def f(self, object_refs):
ray.get(object_refs)
# setup remote actor
actor = ObjectActor.remote()
actor.f.remote([])
data = bytes(1) * data_size
object_ids = [ray.put(data) for _ in range(object_number)]
object_refs = [ray.put(data) for _ in range(object_number)]
benchmark(benchmark_transfer_object, actor, object_ids)
benchmark(benchmark_transfer_object, actor, object_refs)
+4 -4
View File
@@ -614,11 +614,11 @@ def test_remote_function_within_actor(ray_start_10_cpus):
def __init__(self, x):
self.x = x
self.y = val2
self.object_ids = [f.remote(i) for i in range(5)]
self.object_refs = [f.remote(i) for i in range(5)]
self.values2 = ray.get([f.remote(i) for i in range(5)])
def get_values(self):
return self.x, self.y, self.object_ids, self.values2
return self.x, self.y, self.object_refs, self.values2
def f(self):
return [f.remote(i) for i in range(5)]
@@ -626,8 +626,8 @@ def test_remote_function_within_actor(ray_start_10_cpus):
def g(self):
return ray.get([g.remote(i) for i in range(5)])
def h(self, object_ids):
return ray.get(object_ids)
def h(self, object_refs):
return ray.get(object_refs)
actor = Actor.remote(1)
values = ray.get(actor.get_values.remote())
+4 -4
View File
@@ -473,7 +473,7 @@ def test_multiple_actor_restart(ray_start_cluster_head):
# Wait for the actors to start up.
time.sleep(1)
# This is a mapping from actor handles to object IDs returned by
# This is a mapping from actor handles to object refs returned by
# methods on that actor.
result_ids = collections.defaultdict(lambda: [])
@@ -813,10 +813,10 @@ def test_decorated_method(ray_start_regular):
a = Actor.remote()
object_id, extra = a.decorated_method.remote(3, kwarg=3)
assert isinstance(object_id, ray.ObjectID)
object_ref, extra = a.decorated_method.remote(3, kwarg=3)
assert isinstance(object_ref, ray.ObjectRef)
assert extra == {"kwarg": 3}
assert ray.get(object_id) == 7 # 2 * 3 + 1
assert ray.get(object_ref) == 7 # 2 * 3 + 1
@pytest.mark.parametrize(
+3 -3
View File
@@ -631,12 +631,12 @@ def test_creating_more_actors_than_resources(shutdown_only):
results = []
for _ in range(3):
actor = ResourceActor2.remote()
object_id = actor.method.remote()
results.append(object_id)
object_ref = actor.method.remote()
results.append(object_ref)
# Wait for the task to execute. We do this because otherwise it may
# be possible for the __ray_terminate__ task to execute before the
# method.
ray.wait([object_id])
ray.wait([object_ref])
ray.get(results)
+19 -19
View File
@@ -34,10 +34,10 @@ def test_internal_free(shutdown_only):
sampler = Sampler.remote()
# Free does not delete from in-memory store.
obj_id = sampler.sample.remote()
ray.get(obj_id)
ray.internal.free(obj_id)
assert ray.get(obj_id) == [1, 2, 3, 4, 5]
obj_ref = sampler.sample.remote()
ray.get(obj_ref)
ray.internal.free(obj_ref)
assert ray.get(obj_ref) == [1, 2, 3, 4, 5]
# Free deletes big objects from plasma store.
big_id = sampler.sample_big.remote()
@@ -54,17 +54,17 @@ def test_wait_iterables(ray_start_regular):
time.sleep(delay)
return 1
objectids = (f.remote(1.0), f.remote(0.5), f.remote(0.5), f.remote(0.5))
ready_ids, remaining_ids = ray.experimental.wait(objectids)
object_refs = (f.remote(1.0), f.remote(0.5), f.remote(0.5), f.remote(0.5))
ready_ids, remaining_ids = ray.experimental.wait(object_refs)
assert len(ready_ids) == 1
assert len(remaining_ids) == 3
objectids = np.array(
object_refs = np.array(
[f.remote(1.0),
f.remote(0.5),
f.remote(0.5),
f.remote(0.5)])
ready_ids, remaining_ids = ray.experimental.wait(objectids)
ready_ids, remaining_ids = ray.experimental.wait(object_refs)
assert len(ready_ids) == 1
assert len(remaining_ids) == 3
@@ -81,20 +81,20 @@ def test_multiple_waits_and_gets(shutdown_only):
@ray.remote
def g(l):
# The argument l should be a list containing one object ID.
# The argument l should be a list containing one object ref.
ray.wait([l[0]])
@ray.remote
def h(l):
# The argument l should be a list containing one object ID.
# The argument l should be a list containing one object ref.
ray.get(l[0])
# Make sure that multiple wait requests involving the same object ID
# Make sure that multiple wait requests involving the same object ref
# all return.
x = f.remote(1)
ray.get([g.remote([x]), g.remote([x])])
# Make sure that multiple get requests involving the same object ID all
# Make sure that multiple get requests involving the same object ref all
# return.
x = f.remote(1)
ray.get([h.remote([x]), h.remote([x])])
@@ -179,9 +179,9 @@ def test_profiling_api(ray_start_2_cpus):
pass
ray.put(1)
object_id = f.remote()
ray.wait([object_id])
ray.get(object_id)
object_ref = f.remote()
ray.wait([object_ref])
ray.get(object_ref)
# Wait until all of the profiling information appears in the profile
# table.
@@ -259,14 +259,14 @@ def test_object_transfer_dump(ray_start_cluster):
return
# These objects will live on different nodes.
object_ids = [
object_refs = [
f._remote(args=[1], resources={str(i): 1}) for i in range(num_nodes)
]
# Broadcast each object from each machine to each other machine.
for object_id in object_ids:
for object_ref in object_refs:
ray.get([
f._remote(args=[object_id], resources={str(i): 1})
f._remote(args=[object_ref], resources={str(i): 1})
for i in range(num_nodes)
])
@@ -355,7 +355,7 @@ def test_identical_function_names(ray_start_regular):
def test_illegal_api_calls(ray_start_regular):
# Verify that we cannot call put on an ObjectID.
# Verify that we cannot call put on an ObjectRef.
x = ray.put(1)
with pytest.raises(Exception):
ray.put(x)
+4 -4
View File
@@ -690,15 +690,15 @@ def test_blocking_tasks(ray_start_regular):
def g(i):
# Each instance of g submits and blocks on the result of another
# remote task.
object_ids = [f.remote(i, j) for j in range(2)]
return ray.get(object_ids)
object_refs = [f.remote(i, j) for j in range(2)]
return ray.get(object_refs)
@ray.remote
def h(i):
# Each instance of g submits and blocks on the result of another
# remote task using ray.wait.
object_ids = [f.remote(i, j) for j in range(2)]
return ray.wait(object_ids, num_returns=len(object_ids))
object_refs = [f.remote(i, j) for j in range(2)]
return ray.wait(object_refs, num_returns=len(object_refs))
ray.get([h.remote(i) for i in range(4)])
+21 -21
View File
@@ -119,7 +119,7 @@ def test_global_state_api(shutdown_only):
# A driver/worker creates a temporary object during startup. Although the
# temporary object is freed immediately, in a rare case, we can still find
# the object ID in GCS because Raylet removes the object ID from GCS
# the object ref in GCS because Raylet removes the object ref from GCS
# asynchronously.
# Because we can't control when workers create the temporary objects, so
# We can't assert that `ray.objects()` returns an empty dict. Here we just
@@ -281,22 +281,22 @@ def test_specific_job_id():
ray.shutdown()
def test_object_id_properties():
def test_object_ref_properties():
id_bytes = b"00112233445566778899"
object_id = ray.ObjectID(id_bytes)
assert object_id.binary() == id_bytes
object_id = ray.ObjectID.nil()
assert object_id.is_nil()
object_ref = ray.ObjectRef(id_bytes)
assert object_ref.binary() == id_bytes
object_ref = ray.ObjectRef.nil()
assert object_ref.is_nil()
with pytest.raises(ValueError, match=r".*needs to have length 20.*"):
ray.ObjectID(id_bytes + b"1234")
ray.ObjectRef(id_bytes + b"1234")
with pytest.raises(ValueError, match=r".*needs to have length 20.*"):
ray.ObjectID(b"0123456789")
object_id = ray.ObjectID.from_random()
assert not object_id.is_nil()
assert object_id.binary() != id_bytes
id_dumps = pickle.dumps(object_id)
ray.ObjectRef(b"0123456789")
object_ref = ray.ObjectRef.from_random()
assert not object_ref.is_nil()
assert object_ref.binary() != id_bytes
id_dumps = pickle.dumps(object_ref)
id_from_dumps = pickle.loads(id_dumps)
assert id_from_dumps == object_id
assert id_from_dumps == object_ref
@pytest.fixture
@@ -434,7 +434,7 @@ def test_pandas_parquet_serialization():
def test_socket_dir_not_existing(shutdown_only):
random_name = ray.ObjectID.from_random().hex()
random_name = ray.ObjectRef.from_random().hex()
temp_raylet_socket_dir = os.path.join(ray.utils.get_ray_temp_dir(),
"tests", random_name)
temp_raylet_socket_name = os.path.join(temp_raylet_socket_dir,
@@ -487,20 +487,20 @@ def test_put_pins_object(ray_start_object_store_memory):
obj = np.ones(200 * 1024, dtype=np.uint8)
x_id = ray.put(obj)
x_binary = x_id.binary()
assert (ray.get(ray.ObjectID(x_binary)) == obj).all()
assert (ray.get(ray.ObjectRef(x_binary)) == obj).all()
# x cannot be evicted since x_id pins it
for _ in range(10):
ray.put(np.zeros(10 * 1024 * 1024))
assert (ray.get(x_id) == obj).all()
assert (ray.get(ray.ObjectID(x_binary)) == obj).all()
assert (ray.get(ray.ObjectRef(x_binary)) == obj).all()
# now it can be evicted since x_id pins it but x_binary does not
del x_id
for _ in range(10):
ray.put(np.zeros(10 * 1024 * 1024))
assert not ray.worker.global_worker.core_worker.object_exists(
ray.ObjectID(x_binary))
ray.ObjectRef(x_binary))
# weakref put
y_id = ray.put(obj, weakref=True)
@@ -530,7 +530,7 @@ def test_decorated_function(ray_start_regular):
def test_get_postprocess(ray_start_regular):
def get_postprocessor(object_ids, values):
def get_postprocessor(object_refs, values):
return [value for value in values if value > 0]
ray.worker.global_worker._post_get_hooks.append(get_postprocessor)
@@ -657,10 +657,10 @@ def test_lease_request_leak(shutdown_only):
# from the raylet.
tasks = []
for _ in range(10):
oid = ray.put(1)
obj_ref = ray.put(1)
for _ in range(2):
tasks.append(f.remote(oid))
del oid
tasks.append(f.remote(obj_ref))
del obj_ref
ray.get(tasks)
time.sleep(
+5 -5
View File
@@ -19,13 +19,13 @@ def reload_modules():
def test_remote_array_methods(ray_start_2_cpus, reload_modules):
# test eye
object_id = ra.eye.remote(3)
val = ray.get(object_id)
object_ref = ra.eye.remote(3)
val = ray.get(object_ref)
assert_almost_equal(val, np.eye(3))
# test zeros
object_id = ra.zeros.remote([3, 4, 5])
val = ray.get(object_id)
object_ref = ra.zeros.remote([3, 4, 5])
val = ray.get(object_ref)
assert_equal(val, np.zeros([3, 4, 5]))
# test qr - pass by value
@@ -35,7 +35,7 @@ def test_remote_array_methods(ray_start_2_cpus, reload_modules):
r_val = ray.get(r_id)
assert_almost_equal(np.dot(q_val, r_val), a_val)
# test qr - pass by objectid
# test qr - pass by object_ref
a = ra.random.normal.remote([10, 13])
q_id, r_id = ra.linalg.qr.remote(a)
a_val = ray.get(a)
+3 -3
View File
@@ -40,7 +40,7 @@ def test_simple(init):
def test_gather(init):
loop = asyncio.get_event_loop()
tasks = gen_tasks()
futures = [obj_id.as_future() for obj_id in tasks]
futures = [obj_ref.as_future() for obj_ref in tasks]
results = loop.run_until_complete(asyncio.gather(*futures))
assert all(a[0] == b[0] for a, b in zip(results, ray.get(tasks)))
@@ -48,7 +48,7 @@ def test_gather(init):
def test_wait(init):
loop = asyncio.get_event_loop()
tasks = gen_tasks()
futures = [obj_id.as_future() for obj_id in tasks]
futures = [obj_ref.as_future() for obj_ref in tasks]
results, _ = loop.run_until_complete(asyncio.wait(futures))
assert set(results) == set(futures)
@@ -56,7 +56,7 @@ def test_wait(init):
def test_wait_timeout(init):
loop = asyncio.get_event_loop()
tasks = gen_tasks(10)
futures = [obj_id.as_future() for obj_id in tasks]
futures = [obj_ref.as_future() for obj_ref in tasks]
fut = asyncio.wait(futures, timeout=5)
results, _ = loop.run_until_complete(fut)
assert list(results)[0] == futures[0]
+24 -9
View File
@@ -441,7 +441,7 @@ def test_passing_arguments_by_value_out_of_the_box(ray_start_regular):
"local_mode": False
}],
indirect=True)
def test_putting_object_that_closes_over_object_id(ray_start_regular):
def test_putting_object_that_closes_over_object_ref(ray_start_regular):
# This test is here to prevent a regression of
# https://github.com/ray-project/ray/issues/1317.
@@ -461,26 +461,26 @@ def test_put_get(shutdown_only):
for i in range(100):
value_before = i * 10**6
objectid = ray.put(value_before)
value_after = ray.get(objectid)
object_ref = ray.put(value_before)
value_after = ray.get(object_ref)
assert value_before == value_after
for i in range(100):
value_before = i * 10**6 * 1.0
objectid = ray.put(value_before)
value_after = ray.get(objectid)
object_ref = ray.put(value_before)
value_after = ray.get(object_ref)
assert value_before == value_after
for i in range(100):
value_before = "h" * i
objectid = ray.put(value_before)
value_after = ray.get(objectid)
object_ref = ray.put(value_before)
value_after = ray.get(object_ref)
assert value_before == value_after
for i in range(100):
value_before = [1] * i
objectid = ray.put(value_before)
value_after = ray.get(objectid)
object_ref = ray.put(value_before)
value_after = ray.get(object_ref)
assert value_before == value_after
@@ -728,6 +728,21 @@ def test_args_stars_after(ray_start_regular):
ray.get(remote_test_function.remote(local_method, actor_method))
def test_object_id_backward_compatibility(ray_start_regular):
# We've renamed Python's `ObjectID` to `ObjectRef`, and added a type
# alias for backward compatibility.
# This test is to make sure legacy code can still use `ObjectID`.
# TODO(hchen): once we completely remove Python's `ObjectID`,
# this test can be removed as well.
# Check that these 2 types are the same.
assert ray.ObjectID == ray.ObjectRef
object_ref = ray.put(1)
# Check that users can use either type in `isinstance`
assert isinstance(object_ref, ray.ObjectID)
assert isinstance(object_ref, ray.ObjectRef)
if __name__ == "__main__":
import pytest
sys.exit(pytest.main(["-v", __file__]))
+29 -28
View File
@@ -196,13 +196,13 @@ def test_redefining_remote_functions(shutdown_only):
}],
indirect=True)
def test_get_multiple(ray_start_regular):
object_ids = [ray.put(i) for i in range(10)]
assert ray.get(object_ids) == list(range(10))
object_refs = [ray.put(i) for i in range(10)]
assert ray.get(object_refs) == list(range(10))
# Get a random choice of object IDs with duplicates.
# Get a random choice of object refs with duplicates.
indices = list(np.random.choice(range(10), 5))
indices += indices
results = ray.get([object_ids[i] for i in indices])
results = ray.get([object_refs[i] for i in indices])
assert results == indices
@@ -214,13 +214,13 @@ def test_get_multiple(ray_start_regular):
}],
indirect=True)
def test_get_multiple_experimental(ray_start_regular):
object_ids = [ray.put(i) for i in range(10)]
object_refs = [ray.put(i) for i in range(10)]
object_ids_tuple = tuple(object_ids)
assert ray.experimental.get(object_ids_tuple) == list(range(10))
object_refs_tuple = tuple(object_refs)
assert ray.experimental.get(object_refs_tuple) == list(range(10))
object_ids_nparray = np.array(object_ids)
assert ray.experimental.get(object_ids_nparray) == list(range(10))
object_refs_nparray = np.array(object_refs)
assert ray.experimental.get(object_refs_nparray) == list(range(10))
@pytest.mark.parametrize(
@@ -412,10 +412,10 @@ def test_skip_plasma(ray_start_regular):
return x * 2
a = Actor.remote()
obj_id = a.f.remote(1)
obj_ref = a.f.remote(1)
# it is not stored in plasma
assert not ray.worker.global_worker.core_worker.object_exists(obj_id)
assert ray.get(obj_id) == 2
assert not ray.worker.global_worker.core_worker.object_exists(obj_ref)
assert ray.get(obj_ref) == 2
def test_actor_call_order(shutdown_only):
@@ -452,12 +452,12 @@ def test_actor_large_objects(ray_start_regular):
return np.zeros(10000000)
a = Actor.remote()
obj_id = a.f.remote()
assert not ray.worker.global_worker.core_worker.object_exists(obj_id)
done, _ = ray.wait([obj_id])
obj_ref = a.f.remote()
assert not ray.worker.global_worker.core_worker.object_exists(obj_ref)
done, _ = ray.wait([obj_ref])
assert len(done) == 1
assert ray.worker.global_worker.core_worker.object_exists(obj_id)
assert isinstance(ray.get(obj_id), np.ndarray)
assert ray.worker.global_worker.core_worker.object_exists(obj_ref)
assert isinstance(ray.get(obj_ref), np.ndarray)
def test_actor_pass_by_ref(ray_start_regular):
@@ -580,20 +580,21 @@ def test_wait(ray_start_regular):
time.sleep(delay)
return
object_ids = [f.remote(0), f.remote(0), f.remote(0), f.remote(0)]
ready_ids, remaining_ids = ray.wait(object_ids)
object_refs = [f.remote(0), f.remote(0), f.remote(0), f.remote(0)]
ready_ids, remaining_ids = ray.wait(object_refs)
assert len(ready_ids) == 1
assert len(remaining_ids) == 3
ready_ids, remaining_ids = ray.wait(object_ids, num_returns=4)
assert set(ready_ids) == set(object_ids)
ready_ids, remaining_ids = ray.wait(object_refs, num_returns=4)
assert set(ready_ids) == set(object_refs)
assert remaining_ids == []
object_ids = [f.remote(0), f.remote(5)]
ready_ids, remaining_ids = ray.wait(object_ids, timeout=0.5, num_returns=2)
object_refs = [f.remote(0), f.remote(5)]
ready_ids, remaining_ids = ray.wait(
object_refs, timeout=0.5, num_returns=2)
assert len(ready_ids) == 1
assert len(remaining_ids) == 1
# Verify that calling wait with duplicate object IDs throws an
# Verify that calling wait with duplicate object refs throws an
# exception.
x = ray.put(1)
with pytest.raises(Exception):
@@ -605,8 +606,8 @@ def test_wait(ray_start_regular):
assert remaining_ids == []
# Test semantics of num_returns with no timeout.
oids = [ray.put(i) for i in range(10)]
(found, rest) = ray.wait(oids, num_returns=2)
obj_refs = [ray.put(i) for i in range(10)]
(found, rest) = ray.wait(obj_refs, num_returns=2)
assert len(found) == 2
assert len(rest) == 8
@@ -665,14 +666,14 @@ def test_internal_config_when_connecting(ray_start_cluster):
# Check that the config was picked up (object pinning is disabled).
ray.init(address=cluster.address)
oid = ray.put(np.zeros(40 * 1024 * 1024, dtype=np.uint8))
obj_ref = ray.put(np.zeros(40 * 1024 * 1024, dtype=np.uint8))
for _ in range(5):
ray.put(np.zeros(40 * 1024 * 1024, dtype=np.uint8))
# This would not raise an exception if object pinning was enabled.
with pytest.raises(ray.exceptions.UnreconstructableError):
ray.get(oid)
ray.get(obj_ref)
def test_get_correct_node_ip():
+2 -2
View File
@@ -223,9 +223,9 @@ def test_fast(shutdown_only, use_force):
if random.random() > 0.95:
ray.cancel(ids[idx], use_force)
signaler.send.remote()
for obj_id in ids:
for obj_ref in ids:
try:
ray.get(obj_id)
ray.get(obj_ref)
except Exception as e:
assert isinstance(e, valid_exceptions(use_force))
+4 -4
View File
@@ -81,7 +81,7 @@ def test_dying_driver_get(ray_start_regular):
driver = """
import ray
ray.init("{}")
ray.get(ray.ObjectID(ray.utils.hex_to_binary("{}")))
ray.get(ray.ObjectRef(ray.utils.hex_to_binary("{}")))
""".format(address_info["redis_address"], x_id.hex())
p = run_string_as_driver_nonblocking(driver)
@@ -128,8 +128,8 @@ def test_dying_worker_wait(ray_start_2_cpus):
worker_pid = ray.get(get_pid.remote())
@ray.remote
def block_in_wait(object_id_in_list):
ray.wait(object_id_in_list)
def block_in_wait(object_ref_in_list):
ray.wait(object_ref_in_list)
# Have the worker wait in a wait call.
block_in_wait.remote([x_id])
@@ -166,7 +166,7 @@ def test_dying_driver_wait(ray_start_regular):
driver = """
import ray
ray.init("{}")
ray.wait([ray.ObjectID(ray.utils.hex_to_binary("{}"))])
ray.wait([ray.ObjectRef(ray.utils.hex_to_binary("{}"))])
""".format(address_info["redis_address"], x_id.hex())
p = run_string_as_driver_nonblocking(driver)
@@ -58,8 +58,8 @@ def test_worker_failed(ray_start_workers_separate_multinode):
# Submit more tasks than there are workers so that all workers and
# cores are utilized.
object_ids = [f.remote(i) for i in range(num_initial_workers * num_nodes)]
object_ids += [f.remote(object_id) for object_id in object_ids]
object_refs = [f.remote(i) for i in range(num_initial_workers * num_nodes)]
object_refs += [f.remote(object_ref) for object_ref in object_refs]
# Allow the tasks some time to begin executing.
time.sleep(0.1)
# Kill the workers as the tasks execute.
@@ -68,9 +68,9 @@ def test_worker_failed(ray_start_workers_separate_multinode):
time.sleep(0.1)
# Make sure that we either get the object or we get an appropriate
# exception.
for object_id in object_ids:
for object_ref in object_refs:
try:
ray.get(object_id)
ray.get(object_ref)
except (ray.exceptions.RayTaskError, ray.exceptions.RayWorkerError):
pass
+2 -2
View File
@@ -66,7 +66,7 @@ def test_dynamic_res_infeasible_rescheduling(ray_start_regular):
return 1
remote_task = ray.remote(resources={res_name: res_capacity})(f)
oid = remote_task.remote() # This is infeasible
obj_ref = remote_task.remote() # This is infeasible
ray.get(set_res.remote(res_name, res_capacity)) # Now should be feasible
def check_resources():
@@ -75,7 +75,7 @@ def test_dynamic_res_infeasible_rescheduling(ray_start_regular):
wait_for_condition(check_resources)
successful, unsuccessful = ray.wait([oid], timeout=1)
successful, unsuccessful = ray.wait([obj_ref], timeout=1)
assert successful # The task completed
+19 -19
View File
@@ -819,15 +819,15 @@ def test_raylet_crash_when_get(ray_start_regular):
time.sleep(2)
ray.worker._global_node.kill_raylet()
object_id = ray.put(np.zeros(200 * 1024, dtype=np.uint8))
ray.internal.free(object_id)
while ray.worker.global_worker.core_worker.object_exists(object_id):
object_ref = ray.put(np.zeros(200 * 1024, dtype=np.uint8))
ray.internal.free(object_ref)
while ray.worker.global_worker.core_worker.object_exists(object_ref):
time.sleep(1)
thread = threading.Thread(target=sleep_to_kill_raylet)
thread.start()
with pytest.raises(ray.exceptions.UnreconstructableError):
ray.get(object_id)
ray.get(object_ref)
thread.join()
@@ -938,11 +938,11 @@ def test_fill_object_store_lru_fallback(shutdown_only):
end = time.time()
assert end - start < 3
oids = []
obj_refs = []
for _ in range(3):
oid = expensive_task.remote()
ray.get(oid)
oids.append(oid)
obj_ref = expensive_task.remote()
ray.get(obj_ref)
obj_refs.append(obj_ref)
@ray.remote
class LargeMemoryActor:
@@ -954,16 +954,16 @@ def test_fill_object_store_lru_fallback(shutdown_only):
actor = LargeMemoryActor.remote()
for _ in range(3):
oid = actor.some_expensive_task.remote()
ray.get(oid)
oids.append(oid)
obj_ref = actor.some_expensive_task.remote()
ray.get(obj_ref)
obj_refs.append(obj_ref)
# Make sure actor does not die
ray.get(actor.test.remote())
for _ in range(3):
oid = ray.put(np.zeros(10**8 // 2, dtype=np.uint8))
ray.get(oid)
oids.append(oid)
obj_ref = ray.put(np.zeros(10**8 // 2, dtype=np.uint8))
ray.get(obj_ref)
obj_refs.append(obj_ref)
@pytest.mark.parametrize(
@@ -1022,13 +1022,13 @@ def test_serialized_id(ray_start_cluster):
return x
@ray.remote
def get(obj_ids, test_dependent_task):
print("get", obj_ids)
obj_id = obj_ids[0]
def get(obj_refs, test_dependent_task):
print("get", obj_refs)
obj_ref = obj_refs[0]
if test_dependent_task:
assert ray.get(dependent_task.remote(obj_id)) == 1
assert ray.get(dependent_task.remote(obj_ref)) == 1
else:
assert ray.get(obj_id) == 1
assert ray.get(obj_ref) == 1
obj = small_object.remote()
ray.get(get.remote([obj], False))
+2 -2
View File
@@ -97,7 +97,7 @@ def test_global_gc_when_full(shutdown_only):
# GC should be triggered for all workers, including the local driver,
# when the driver tries to ray.put a value that doesn't fit in the
# object store. This should cause the captured ObjectIDs' numpy arrays
# object store. This should cause the captured ObjectRefs' numpy arrays
# to be evicted.
ray.put(np.zeros(80 * 1024 * 1024, dtype=np.uint8))
@@ -116,7 +116,7 @@ def test_global_gc_when_full(shutdown_only):
# GC should be triggered for all workers, including the local driver,
# when a remote task tries to put a return value that doesn't fit in
# the object store. This should cause the captured ObjectIDs' numpy
# the object store. This should cause the captured ObjectRefs' numpy
# arrays to be evicted.
ray.get(actors[0].return_large_array.remote())
+9 -9
View File
@@ -11,7 +11,7 @@ from ray.core.generated import node_manager_pb2
from ray.core.generated import node_manager_pb2_grpc
from ray.core.generated import reporter_pb2
from ray.core.generated import reporter_pb2_grpc
from ray.dashboard.memory import (ReferenceType, decode_object_id_if_needed,
from ray.dashboard.memory import (ReferenceType, decode_object_ref_if_needed,
MemoryTableEntry, MemoryTable, SortingType)
from ray.test_utils import (RayTestTimeoutException,
wait_until_succeeded_without_exception,
@@ -210,7 +210,7 @@ def test_raylet_info_endpoint(shutdown_only):
try:
assert len(actor_info) == 1
_, parent_actor_info = actor_info.popitem()
assert parent_actor_info["numObjectIdsInScope"] == 13
assert parent_actor_info["numObjectRefsInScope"] == 13
assert parent_actor_info["numLocalObjects"] == 10
children = parent_actor_info["children"]
assert len(children) == 2
@@ -486,7 +486,7 @@ def test_memory_dashboard(shutdown_only):
stop_memory_table()
return True
def test_serialized_object_id_reference():
def test_serialized_object_ref_reference():
@ray.remote
def f(arg):
time.sleep(1)
@@ -508,7 +508,7 @@ def test_memory_dashboard(shutdown_only):
stop_memory_table()
return True
def test_captured_object_id_reference():
def test_captured_object_ref_reference():
a = ray.put(None)
b = ray.put([a]) # Noqa F841
del a
@@ -562,12 +562,12 @@ def test_memory_dashboard(shutdown_only):
True)
assert (wait_for_condition(
test_serialized_object_id_reference,
test_serialized_object_ref_reference,
timeout=30000,
retry_interval_ms=1000) is True)
assert (wait_for_condition(
test_captured_object_id_reference,
test_captured_object_ref_reference,
timeout=30000,
retry_interval_ms=1000) is True)
@@ -583,7 +583,7 @@ IS_DRIVER = True
PID = 1
OBJECT_ID = "7wpsIhgZiBz/////AQAAyAEAAAA="
ACTOR_ID = "fffffffffffffffff66d17ba010000c801000000"
DECODED_ID = decode_object_id_if_needed(OBJECT_ID)
DECODED_ID = decode_object_ref_if_needed(OBJECT_ID)
OBJECT_SIZE = 100
@@ -721,8 +721,8 @@ def test_invalid_memory_entry():
def test_valid_reference_memory_entry():
memory_entry = build_local_reference_entry()
assert memory_entry.reference_type == ReferenceType.LOCAL_REFERENCE
assert memory_entry.object_id == ray.ObjectID(
decode_object_id_if_needed(OBJECT_ID))
assert memory_entry.object_ref == ray.ObjectRef(
decode_object_ref_if_needed(OBJECT_ID))
assert memory_entry.is_valid() is True
+1 -1
View File
@@ -41,7 +41,7 @@ def test_put_api(ray_start_regular):
for obj in test_values:
assert ray.get(ray.put(obj)) == obj
# Test putting object IDs.
# Test putting object refs.
x_id = ray.put(0)
for obj in [[x_id], (x_id, ), {x_id: x_id}]:
assert ray.get(ray.put(obj)) == obj
+5 -5
View File
@@ -176,7 +176,7 @@ import time
import ray
import numpy as np
ray.init(address="{}")
object_ids = [ray.put(np.zeros(200 * 1024, dtype=np.uint8))
object_refs = [ray.put(np.zeros(200 * 1024, dtype=np.uint8))
for i in range(1000)]
start_time = time.time()
while time.time() - start_time < 30:
@@ -582,7 +582,7 @@ ray.init(address="{}")
@ray.remote
def g(x):
return
g.remote(ray.ObjectID(ray.utils.hex_to_binary("{}")))
g.remote(ray.ObjectRef(ray.utils.hex_to_binary("{}")))
time.sleep(1)
print("success")
"""
@@ -590,7 +590,7 @@ print("success")
# Create some drivers and let them exit and make sure everything is
# still alive.
for _ in range(3):
nonexistent_id = ray.ObjectID.from_random()
nonexistent_id = ray.ObjectRef.from_random()
driver_script = driver_script_template.format(address,
nonexistent_id.hex())
out = run_string_as_driver(driver_script)
@@ -606,7 +606,7 @@ import ray
ray.init(address="{}")
@ray.remote
def g():
ray.wait(ray.ObjectID(ray.utils.hex_to_binary("{}")))
ray.wait(ray.ObjectRef(ray.utils.hex_to_binary("{}")))
g.remote()
time.sleep(1)
print("success")
@@ -615,7 +615,7 @@ print("success")
# Create some drivers and let them exit and make sure everything is
# still alive.
for _ in range(3):
nonexistent_id = ray.ObjectID.from_random()
nonexistent_id = ray.ObjectRef.from_random()
driver_script = driver_script_template.format(address,
nonexistent_id.hex())
out = run_string_as_driver(driver_script)
+4 -4
View File
@@ -58,8 +58,8 @@ def test_worker_failed(ray_start_workers_separate_multinode):
# Submit more tasks than there are workers so that all workers and
# cores are utilized.
object_ids = [f.remote(i) for i in range(num_initial_workers * num_nodes)]
object_ids += [f.remote(object_id) for object_id in object_ids]
object_refs = [f.remote(i) for i in range(num_initial_workers * num_nodes)]
object_refs += [f.remote(object_ref) for object_ref in object_refs]
# Allow the tasks some time to begin executing.
time.sleep(0.1)
# Kill the workers as the tasks execute.
@@ -68,9 +68,9 @@ def test_worker_failed(ray_start_workers_separate_multinode):
time.sleep(0.1)
# Make sure that we either get the object or we get an appropriate
# exception.
for object_id in object_ids:
for object_ref in object_refs:
try:
ray.get(object_id)
ray.get(object_ref)
except (ray.exceptions.RayTaskError, ray.exceptions.RayWorkerError):
pass
+3 -3
View File
@@ -232,7 +232,7 @@ def test_apply_async(pool):
with pytest.raises(Exception):
pool.apply_async(f, (1, 2), {"kwarg1": 3}).get()
# Won't return until the input ObjectID is fulfilled.
# Won't return until the input ObjectRef is fulfilled.
def ten_over(args):
signal, val = args
ray.get(signal.wait.remote())
@@ -245,7 +245,7 @@ def test_apply_async(pool):
with pytest.raises(TimeoutError):
result.get(timeout=0.01)
# Fulfill the ObjectID.
# Fulfill the ObjectRef.
ray.get(signal.send.remote())
result.wait(timeout=10)
assert result.ready()
@@ -257,7 +257,7 @@ def test_apply_async(pool):
with pytest.raises(ValueError, match="not ready"):
result.successful()
# Fulfill the ObjectID with 0, causing the task to fail (divide by zero).
# Fulfill the ObjectRef with 0, causing the task to fail (divide by zero).
ray.get(signal.send.remote())
result.wait(timeout=10)
assert result.ready()
+7 -7
View File
@@ -50,12 +50,12 @@ def test_object_broadcast(ray_start_cluster_with_resource):
def create_object():
return np.zeros(1024 * 1024, dtype=np.uint8)
object_ids = []
object_refs = []
for _ in range(3):
# Broadcast an object to all machines.
x_id = ray.put(x)
object_ids.append(x_id)
object_refs.append(x_id)
ray.get([
f._remote(args=[x_id], resources={str(i % num_nodes): 1})
for i in range(10 * num_nodes)
@@ -64,7 +64,7 @@ def test_object_broadcast(ray_start_cluster_with_resource):
for _ in range(3):
# Broadcast an object to all machines.
x_id = create_object.remote()
object_ids.append(x_id)
object_refs.append(x_id)
ray.get([
f._remote(args=[x_id], resources={str(i % num_nodes): 1})
for i in range(10 * num_nodes)
@@ -75,7 +75,7 @@ def test_object_broadcast(ray_start_cluster_with_resource):
transfer_events = ray.object_transfer_timeline()
# Make sure that each object was transferred a reasonable number of times.
for x_id in object_ids:
for x_id in object_refs:
relevant_events = [
event for event in transfer_events
if event["cat"] == "transfer_send"
@@ -141,12 +141,12 @@ def test_actor_broadcast(ray_start_cluster_with_resource):
# Wait for the actors to start up.
ray.get([a.ready.remote() for a in actors])
object_ids = []
object_refs = []
# Broadcast a large object to all actors.
for _ in range(5):
x_id = ray.put(np.zeros(1024 * 1024, dtype=np.uint8))
object_ids.append(x_id)
object_refs.append(x_id)
# Pass the object into a method for every actor.
ray.get([a.set_weights.remote(x_id) for a in actors])
@@ -155,7 +155,7 @@ def test_actor_broadcast(ray_start_cluster_with_resource):
transfer_events = ray.object_transfer_timeline()
# Make sure that each object was transferred a reasonable number of times.
for x_id in object_ids:
for x_id in object_refs:
relevant_events = [
event for event in transfer_events
if event["cat"] == "transfer_send"
+4 -4
View File
@@ -28,8 +28,8 @@ class TestRedisPassword:
redis_ip, redis_port = address.split(":")
# Check that we can run a task
object_id = f.remote()
ray.get(object_id)
object_ref = f.remote()
ray.get(object_ref)
# Check that Redis connections require a password
redis_client = redis.StrictRedis(
@@ -55,8 +55,8 @@ class TestRedisPassword:
initialize_head=True, connect=True, head_node_args=node_args)
cluster.add_node(**node_args)
object_id = f.remote()
ray.get(object_id)
object_ref = f.remote()
ray.get(object_ref)
def test_redis_port(self, shutdown_only):
@ray.remote
+35 -35
View File
@@ -29,28 +29,28 @@ def one_worker_100MiB(request):
ray.shutdown()
def _fill_object_store_and_get(oid, succeed=True, object_MiB=40,
def _fill_object_store_and_get(obj, succeed=True, object_MiB=40,
num_objects=5):
for _ in range(num_objects):
ray.put(np.zeros(object_MiB * 1024 * 1024, dtype=np.uint8))
if type(oid) is bytes:
oid = ray.ObjectID(oid)
if type(obj) is bytes:
obj = ray.ObjectRef(obj)
if succeed:
wait_for_condition(
lambda: ray.worker.global_worker.core_worker.object_exists(oid))
lambda: ray.worker.global_worker.core_worker.object_exists(obj))
else:
wait_for_condition(
lambda: not ray.worker.global_worker.core_worker.object_exists(oid)
lambda: not ray.worker.global_worker.core_worker.object_exists(obj)
)
def _check_refcounts(expected):
actual = ray.worker.global_worker.core_worker.get_all_reference_counts()
assert len(expected) == len(actual)
for object_id, (local, submitted) in expected.items():
hex_id = object_id.hex().encode("ascii")
for object_ref, (local, submitted) in expected.items():
hex_id = object_ref.hex().encode("ascii")
assert hex_id in actual
assert local == actual[hex_id]["local"]
assert submitted == actual[hex_id]["submitted"]
@@ -70,13 +70,13 @@ def check_refcounts(expected, timeout=10):
def test_local_refcounts(ray_start_regular):
oid1 = ray.put(None)
check_refcounts({oid1: (1, 0)})
oid1_copy = copy.copy(oid1)
check_refcounts({oid1: (2, 0)})
del oid1
check_refcounts({oid1_copy: (1, 0)})
del oid1_copy
obj_ref1 = ray.put(None)
check_refcounts({obj_ref1: (1, 0)})
obj_ref1_copy = copy.copy(obj_ref1)
check_refcounts({obj_ref1: (2, 0)})
del obj_ref1
check_refcounts({obj_ref1_copy: (1, 0)})
del obj_ref1_copy
check_refcounts({})
@@ -233,13 +233,13 @@ def test_pending_task_dependency_pinning(one_worker_100MiB):
# store.
np_array = np.zeros(40 * 1024 * 1024, dtype=np.uint8)
signal = SignalActor.remote()
oid = pending.remote(np_array, signal.wait.remote())
obj_ref = pending.remote(np_array, signal.wait.remote())
for _ in range(2):
ray.put(np.zeros(40 * 1024 * 1024, dtype=np.uint8))
ray.get(signal.send.remote())
ray.get(oid)
ray.get(obj_ref)
def test_feature_flag(shutdown_only):
@@ -273,20 +273,20 @@ def test_feature_flag(shutdown_only):
_fill_object_store_and_get(actor.get_large_object.remote(), succeed=False)
def test_out_of_band_serialized_object_id(one_worker_100MiB):
def test_out_of_band_serialized_object_ref(one_worker_100MiB):
assert len(
ray.worker.global_worker.core_worker.get_all_reference_counts()) == 0
oid = ray.put("hello")
_check_refcounts({oid: (1, 0)})
oid_str = ray.cloudpickle.dumps(oid)
_check_refcounts({oid: (2, 0)})
del oid
obj_ref = ray.put("hello")
_check_refcounts({obj_ref: (1, 0)})
obj_ref_str = ray.cloudpickle.dumps(obj_ref)
_check_refcounts({obj_ref: (2, 0)})
del obj_ref
assert len(
ray.worker.global_worker.core_worker.get_all_reference_counts()) == 1
assert ray.get(ray.cloudpickle.loads(oid_str)) == "hello"
assert ray.get(ray.cloudpickle.loads(obj_ref_str)) == "hello"
def test_captured_object_id(one_worker_100MiB):
def test_captured_object_ref(one_worker_100MiB):
captured_id = ray.put(np.zeros(10 * 1024 * 1024, dtype=np.uint8))
@ray.remote
@@ -295,16 +295,16 @@ def test_captured_object_id(one_worker_100MiB):
ray.get(captured_id) # noqa: F821
signal = SignalActor.remote()
oid = f.remote(signal)
obj_ref = f.remote(signal)
# Delete local references.
del f
del captured_id
# Test that the captured object ID is pinned despite having no local
# Test that the captured object ref is pinned despite having no local
# references.
ray.get(signal.send.remote())
_fill_object_store_and_get(oid)
_fill_object_store_and_get(obj_ref)
captured_id = ray.put(np.zeros(10 * 1024 * 1024, dtype=np.uint8))
@@ -316,16 +316,16 @@ def test_captured_object_id(one_worker_100MiB):
signal = SignalActor.remote()
actor = Actor.remote()
oid = actor.get.remote(signal)
obj_ref = actor.get.remote(signal)
# Delete local references.
del Actor
del captured_id
# Test that the captured object ID is pinned despite having no local
# Test that the captured object ref is pinned despite having no local
# references.
ray.get(signal.send.remote())
_fill_object_store_and_get(oid)
_fill_object_store_and_get(obj_ref)
# Remote function takes serialized reference and doesn't hold onto it after
@@ -343,7 +343,7 @@ def test_basic_serialized_reference(one_worker_100MiB, use_ray_put, failure):
array_oid = put_object(
np.zeros(40 * 1024 * 1024, dtype=np.uint8), use_ray_put)
signal = SignalActor.remote()
oid = pending.remote([array_oid], signal.wait.remote())
obj_ref = pending.remote([array_oid], signal.wait.remote())
# Remove the local reference.
array_oid_bytes = array_oid.binary()
@@ -355,7 +355,7 @@ def test_basic_serialized_reference(one_worker_100MiB, use_ray_put, failure):
# Fulfill the dependency, causing the task to finish.
ray.get(signal.send.remote())
try:
ray.get(oid)
ray.get(obj_ref)
assert not failure
except ray.exceptions.RayWorkerError:
assert failure
@@ -415,7 +415,7 @@ def test_recursive_serialized_reference(one_worker_100MiB, use_ray_put,
# Test that a passed reference held by an actor after the method finishes
# is kept until the reference is removed from the actor. Also tests giving
# the actor a duplicate reference to the same object ID.
# the actor a duplicate reference to the same object ref.
@pytest.mark.parametrize("use_ray_put,failure", [(False, False), (False, True),
(True, False), (True, True)])
def test_actor_holding_serialized_reference(one_worker_100MiB, use_ray_put,
@@ -471,7 +471,7 @@ def test_actor_holding_serialized_reference(one_worker_100MiB, use_ray_put,
# Test that a passed reference held by an actor after a task finishes
# is kept until the reference is removed from the worker. Also tests giving
# the worker a duplicate reference to the same object ID.
# the worker a duplicate reference to the same object ref.
@pytest.mark.parametrize("use_ray_put,failure", [(False, False), (False, True),
(True, False), (True, True)])
def test_worker_holding_serialized_reference(one_worker_100MiB, use_ray_put,
@@ -512,7 +512,7 @@ def test_worker_holding_serialized_reference(one_worker_100MiB, use_ray_put,
_fill_object_store_and_get(array_oid_bytes, succeed=False)
# Test that an object containing object IDs within it pins the inner IDs.
# Test that an object containing object refs within it pins the inner IDs.
def test_basic_nested_ids(one_worker_100MiB):
inner_oid = ray.put(np.zeros(40 * 1024 * 1024, dtype=np.uint8))
outer_oid = ray.put([inner_oid])
+18 -18
View File
@@ -32,24 +32,24 @@ def one_worker_100MiB(request):
ray.shutdown()
def _fill_object_store_and_get(oid, succeed=True, object_MiB=40,
def _fill_object_store_and_get(obj, succeed=True, object_MiB=40,
num_objects=5):
for _ in range(num_objects):
ray.put(np.zeros(object_MiB * 1024 * 1024, dtype=np.uint8))
if type(oid) is bytes:
oid = ray.ObjectID(oid)
if type(obj) is bytes:
obj = ray.ObjectRef(obj)
if succeed:
wait_for_condition(
lambda: ray.worker.global_worker.core_worker.object_exists(oid))
lambda: ray.worker.global_worker.core_worker.object_exists(obj))
else:
wait_for_condition(
lambda: not ray.worker.global_worker.core_worker.object_exists(oid)
lambda: not ray.worker.global_worker.core_worker.object_exists(obj)
)
# Test that an object containing object IDs within it pins the inner IDs
# Test that an object containing object refs within it pins the inner IDs
# recursively and for submitted tasks.
@pytest.mark.parametrize("use_ray_put,failure", [(False, False), (False, True),
(True, False), (True, True)])
@@ -99,11 +99,11 @@ def test_recursively_nest_ids(one_worker_100MiB, use_ray_put, failure):
_fill_object_store_and_get(array_oid_bytes, succeed=False)
# Test that serialized objectIDs returned from remote tasks are pinned until
# Test that serialized ObjectRefs returned from remote tasks are pinned until
# they go out of scope on the caller side.
@pytest.mark.parametrize("use_ray_put,failure", [(False, False), (False, True),
(True, False), (True, True)])
def test_return_object_id(one_worker_100MiB, use_ray_put, failure):
def test_return_object_ref(one_worker_100MiB, use_ray_put, failure):
@ray.remote
def return_an_id():
return [
@@ -139,11 +139,11 @@ def test_return_object_id(one_worker_100MiB, use_ray_put, failure):
_fill_object_store_and_get(inner_oid_binary, succeed=False)
# Test that serialized objectIDs returned from remote tasks are pinned if
# Test that serialized ObjectRefs returned from remote tasks are pinned if
# passed into another remote task by the caller.
@pytest.mark.parametrize("use_ray_put,failure", [(False, False), (False, True),
(True, False), (True, True)])
def test_pass_returned_object_id(one_worker_100MiB, use_ray_put, failure):
def test_pass_returned_object_ref(one_worker_100MiB, use_ray_put, failure):
@ray.remote
def return_an_id():
return [
@@ -179,7 +179,7 @@ def test_pass_returned_object_id(one_worker_100MiB, use_ray_put, failure):
def ref_not_exists():
worker = ray.worker.global_worker
inner_oid = ray.ObjectID(inner_oid_binary)
inner_oid = ray.ObjectRef(inner_oid_binary)
return not worker.core_worker.object_exists(inner_oid)
assert wait_for_condition(ref_not_exists)
@@ -191,8 +191,8 @@ def test_pass_returned_object_id(one_worker_100MiB, use_ray_put, failure):
# it finishes.
@pytest.mark.parametrize("use_ray_put,failure", [(False, False), (False, True),
(True, False), (True, True)])
def test_recursively_pass_returned_object_id(one_worker_100MiB, use_ray_put,
failure):
def test_recursively_pass_returned_object_ref(one_worker_100MiB, use_ray_put,
failure):
@ray.remote
def return_an_id():
return put_object(
@@ -246,14 +246,14 @@ def test_recursively_pass_returned_object_id(one_worker_100MiB, use_ray_put,
# Call a recursive chain of tasks. The final task in the chain returns an
# ObjectID returned by a task that it submitted. Every other task in the chain
# returns the same ObjectID by calling ray.get() on its submitted task and
# ObjectRef returned by a task that it submitted. Every other task in the chain
# returns the same ObjectRef by calling ray.get() on its submitted task and
# returning the result. The reference should still exist while the driver has a
# reference to the final task's ObjectID.
# reference to the final task's ObjectRef.
@pytest.mark.parametrize("use_ray_put,failure", [(False, False), (False, True),
(True, False), (True, True)])
def test_recursively_return_borrowed_object_id(one_worker_100MiB, use_ray_put,
failure):
def test_recursively_return_borrowed_object_ref(one_worker_100MiB, use_ray_put,
failure):
@ray.remote
def recursive(num_tasks_left):
if num_tasks_left == 0:
+2 -2
View File
@@ -355,8 +355,8 @@ def test_driver_put_errors(ray_start_object_store_memory):
# def g(i):
# # Each instance of g submits and blocks on the result of another remote
# # task.
# object_ids = [f.remote(i, j) for j in range(10)]
# return ray.get(object_ids)
# object_refs = [f.remote(i, j) for j in range(10)]
# return ray.get(object_refs)
#
# ray.init(num_workers=1)
# ray.get([g.remote(i) for i in range(1000)])
+1 -1
View File
@@ -102,7 +102,7 @@ class RayTrialExecutor(TrialExecutor):
self._trial_queued = False
self._running = {}
# Since trial resume after paused should not run
# trial.train.remote(), thus no more new remote object id generated.
# trial.train.remote(), thus no more new remote object ref generated.
# We use self._paused to store paused trials here.
self._paused = {}
+3 -3
View File
@@ -97,9 +97,9 @@ class UtilMonitor(Thread):
def pin_in_object_store(obj):
"""Deprecated, use ray.put(value, weakref=False) instead."""
obj_id = ray.put(obj, weakref=False)
_pinned_objects.append(obj_id)
return obj_id
obj_ref = ray.put(obj, weakref=False)
_pinned_objects.append(obj_ref)
return obj_ref
def get_pinned_object(pinned_id):
+6 -6
View File
@@ -42,8 +42,8 @@ class ActorPool:
Arguments:
fn (func): Function that takes (actor, value) as argument and
returns an ObjectID computing the result over the value. The
actor will be considered busy until the ObjectID completes.
returns an ObjectRef computing the result over the value. The
actor will be considered busy until the ObjectRef completes.
values (list): List of values that fn(actor, value) should be
applied to.
@@ -69,8 +69,8 @@ class ActorPool:
Arguments:
fn (func): Function that takes (actor, value) as argument and
returns an ObjectID computing the result over the value. The
actor will be considered busy until the ObjectID completes.
returns an ObjectRef computing the result over the value. The
actor will be considered busy until the ObjectRef completes.
values (list): List of values that fn(actor, value) should be
applied to.
@@ -96,8 +96,8 @@ class ActorPool:
Arguments:
fn (func): Function that takes (actor, value) as argument and
returns an ObjectID computing the result over the value. The
actor will be considered busy until the ObjectID completes.
returns an ObjectRef computing the result over the value. The
actor will be considered busy until the ObjectRef completes.
value (object): Value to compute a result for.
Examples:
+6 -6
View File
@@ -402,10 +402,10 @@ class ParallelIterator(Generic[T]):
else:
ready, _ = ray.wait(
pending, num_returns=len(pending), timeout=timeout)
for obj_id in ready:
actor = futures.pop(obj_id)
for obj_ref in ready:
actor = futures.pop(obj_ref)
try:
batch = ray.get(obj_id)
batch = ray.get(obj_ref)
futures[actor.par_iter_slice_batch.remote(
step=num_partitions,
start=partition_index,
@@ -545,11 +545,11 @@ class ParallelIterator(Generic[T]):
else:
ready, _ = ray.wait(
pending, num_returns=len(pending), timeout=timeout)
for obj_id in ready:
actor = futures.pop(obj_id)
for obj_ref in ready:
actor = futures.pop(obj_ref)
try:
local_iter.shared_metrics.get().current_actor = actor
batch = ray.get(obj_id)
batch = ray.get(obj_ref)
futures[actor.par_iter_next_batch.remote(
batch_ms)] = actor
for item in batch:
+34 -34
View File
@@ -27,45 +27,45 @@ class PoolTaskError(Exception):
class ResultThread(threading.Thread):
def __init__(self,
object_ids,
object_refs,
callback=None,
error_callback=None,
total_object_ids=None):
total_object_refs=None):
threading.Thread.__init__(self, daemon=True)
self._got_error = False
self._object_ids = []
self._object_refs = []
self._num_ready = 0
self._results = []
self._ready_index_queue = queue.Queue()
self._callback = callback
self._error_callback = error_callback
self._total_object_ids = total_object_ids or len(object_ids)
self._total_object_refs = total_object_refs or len(object_refs)
self._indices = {}
# Thread-safe queue used to add ObjectIDs to fetch after creating
# Thread-safe queue used to add ObjectRefs to fetch after creating
# this thread (used to lazily submit for imap and imap_unordered).
self._new_object_ids = queue.Queue()
for object_id in object_ids:
self._add_object_id(object_id)
self._new_object_refs = queue.Queue()
for object_ref in object_refs:
self._add_object_ref(object_ref)
def _add_object_id(self, object_id):
self._indices[object_id] = len(self._object_ids)
self._object_ids.append(object_id)
def _add_object_ref(self, object_ref):
self._indices[object_ref] = len(self._object_refs)
self._object_refs.append(object_ref)
self._results.append(None)
def add_object_id(self, object_id):
self._new_object_ids.put(object_id)
def add_object_ref(self, object_ref):
self._new_object_refs.put(object_ref)
def run(self):
unready = copy.copy(self._object_ids)
while self._num_ready < self._total_object_ids:
unready = copy.copy(self._object_refs)
while self._num_ready < self._total_object_refs:
# Get as many new IDs from the queue as possible without blocking,
# unless we have no IDs to wait on, in which case we block.
while True:
try:
block = len(unready) == 0
new_object_id = self._new_object_ids.get(block=block)
self._add_object_id(new_object_id)
unready.append(new_object_id)
new_object_ref = self._new_object_refs.get(block=block)
self._add_object_ref(new_object_ref)
unready.append(new_object_ref)
except queue.Empty:
# queue.Empty means no result was retrieved if block=False.
break
@@ -114,12 +114,12 @@ class AsyncResult:
"""
def __init__(self,
chunk_object_ids,
chunk_object_refs,
callback=None,
error_callback=None,
single_result=False):
self._single_result = single_result
self._result_thread = ResultThread(chunk_object_ids, callback,
self._result_thread = ResultThread(chunk_object_refs, callback,
error_callback)
self._result_thread.start()
@@ -189,7 +189,7 @@ class IMapIterator:
self._chunksize = chunksize or pool._calculate_chunksize(iterable)
self._total_chunks = div_round_up(len(iterable), chunksize)
self._result_thread = ResultThread(
[], total_object_ids=self._total_chunks)
[], total_object_refs=self._total_chunks)
self._result_thread.start()
for _ in range(len(self._pool._actor_pool)):
@@ -204,7 +204,7 @@ class IMapIterator:
new_chunk_id = self._pool._submit_chunk(self._func, self._iterator,
self._chunksize, actor_index)
self._submitted_chunks.append(False)
self._result_thread.add_object_id(new_chunk_id)
self._result_thread.add_object_ref(new_chunk_id)
def __iter__(self):
return self
@@ -411,14 +411,14 @@ class Pool:
# Batch should be a list of tuples: (args, kwargs).
def _run_batch(self, actor_index, func, batch):
actor, count = self._actor_pool[actor_index]
object_id = actor.run_batch.remote(func, batch)
object_ref = actor.run_batch.remote(func, batch)
count += 1
assert self._maxtasksperchild == -1 or count <= self._maxtasksperchild
if count == self._maxtasksperchild:
self._stop_actor(actor)
actor, count = self._new_actor_entry()
self._actor_pool[actor_index] = (actor, count)
return object_id
return object_ref
def apply(self, func, args=None, kwargs=None):
"""Run the given function on a random actor process and return the
@@ -459,10 +459,10 @@ class Pool:
"""
self._check_running()
object_id = self._run_batch(self._random_actor_index(), func,
[(args, kwargs)])
object_ref = self._run_batch(self._random_actor_index(), func,
[(args, kwargs)])
return AsyncResult(
[object_id], callback, error_callback, single_result=True)
[object_ref], callback, error_callback, single_result=True)
def _calculate_chunksize(self, iterable):
chunksize, extra = divmod(len(iterable), len(self._actor_pool) * 4)
@@ -500,10 +500,10 @@ class Pool:
chunksize = self._calculate_chunksize(iterable)
iterator = iter(iterable)
chunk_object_ids = []
while len(chunk_object_ids) * chunksize < len(iterable):
actor_index = len(chunk_object_ids) % len(self._actor_pool)
chunk_object_ids.append(
chunk_object_refs = []
while len(chunk_object_refs) * chunksize < len(iterable):
actor_index = len(chunk_object_refs) % len(self._actor_pool)
chunk_object_refs.append(
self._submit_chunk(
func,
iterator,
@@ -511,7 +511,7 @@ class Pool:
actor_index,
unpack_args=unpack_args))
return chunk_object_ids
return chunk_object_refs
def _map_async(self,
func,
@@ -521,9 +521,9 @@ class Pool:
callback=None,
error_callback=None):
self._check_running()
object_ids = self._chunk_and_run(
object_refs = self._chunk_and_run(
func, iterable, chunksize=chunksize, unpack_args=unpack_args)
return AsyncResult(object_ids, callback, error_callback)
return AsyncResult(object_refs, callback, error_callback)
def map(self, func, iterable, chunksize=None):
"""Run the given function on each element in the iterable round-robin
+1 -1
View File
@@ -201,7 +201,7 @@ def check_for_failure(remote_values):
"""Checks remote values for any that returned and failed.
Args:
remote_values (list): List of object IDs representing functions
remote_values (list): List of object refs representing functions
that may fail in the middle of execution. For example, running
a SGD training loop in multiple parallel actor calls.
+2 -2
View File
@@ -233,8 +233,8 @@ def ensure_str(s, encoding="utf-8", errors="strict"):
return s.decode(encoding, errors)
def binary_to_object_id(binary_object_id):
return ray.ObjectID(binary_object_id)
def binary_to_object_ref(binary_object_ref):
return ray.ObjectRef(binary_object_ref)
def binary_to_task_id(binary_task_id):
+91 -89
View File
@@ -33,7 +33,7 @@ import ray.state
from ray import (
ActorID,
JobID,
ObjectID,
ObjectRef,
Language,
)
from ray import import_thread
@@ -121,7 +121,7 @@ class Worker:
# increment every time when `ray.shutdown` is called.
self._session_index = 0
# Functions to run to process the values returned by ray.get. Each
# postprocessor must take two arguments ("object_ids", and "values").
# postprocessor must take two arguments ("object_refs", and "values").
self._post_get_hooks = []
@property
@@ -224,10 +224,10 @@ class Worker:
"""
self.mode = mode
def put_object(self, value, object_id=None, pin_object=True):
"""Put value in the local object store with object id `objectid`.
def put_object(self, value, object_ref=None, pin_object=True):
"""Put value in the local object store with object reference `object_ref`.
This assumes that the value for `objectid` has not yet been placed in
This assumes that the value for `object_ref` has not yet been placed in
the local object store. If the plasma store is full, the worker will
automatically retry up to DEFAULT_PUT_OBJECT_RETRIES times. Each
retry will delay for an exponentially doubling amount of time,
@@ -236,70 +236,71 @@ class Worker:
Args:
value: The value to put in the object store.
object_id (object_id.ObjectID): The object ID of the value to be
object_ref (ObjectRef): The object ref of the value to be
put. If None, one will be generated.
pin_object: If set, the object will be pinned at the raylet.
Returns:
object_id.ObjectID: The object ID the object was put under.
ObjectRef: The object ref the object was put under.
Raises:
ray.exceptions.ObjectStoreFullError: This is raised if the attempt
to store the object fails because the object store is full even
after multiple retries.
"""
# Make sure that the value is not an object ID.
if isinstance(value, ObjectID):
# Make sure that the value is not an object ref.
if isinstance(value, ObjectRef):
raise TypeError(
"Calling 'put' on an ray.ObjectID is not allowed "
"(similarly, returning an ray.ObjectID from a remote "
"Calling 'put' on an ray.ObjectRef is not allowed "
"(similarly, returning an ray.ObjectRef from a remote "
"function is not allowed). If you really want to "
"do this, you can wrap the ray.ObjectID in a list and "
"do this, you can wrap the ray.ObjectRef in a list and "
"call 'put' on it (or return it).")
if self.mode == LOCAL_MODE:
assert object_id is None, ("Local Mode does not support "
"inserting with an objectID")
assert object_ref is None, ("Local Mode does not support "
"inserting with an ObjectRef")
serialized_value = self.get_serialization_context().serialize(value)
# This *must* be the first place that we construct this python
# ObjectID because an entry with 0 local references is created when
# ObjectRef because an entry with 0 local references is created when
# the object is Put() in the core worker, expecting that this python
# reference will be created. If another reference is created and
# removed before this one, it will corrupt the state in the
# reference counter.
return ray.ObjectID(
return ray.ObjectRef(
self.core_worker.put_serialized_object(
serialized_value, object_id=object_id, pin_object=pin_object))
serialized_value, object_ref=object_ref,
pin_object=pin_object))
def deserialize_objects(self, data_metadata_pairs, object_ids):
def deserialize_objects(self, data_metadata_pairs, object_refs):
context = self.get_serialization_context()
return context.deserialize_objects(data_metadata_pairs, object_ids)
return context.deserialize_objects(data_metadata_pairs, object_refs)
def get_objects(self, object_ids, timeout=None):
def get_objects(self, object_refs, timeout=None):
"""Get the values in the object store associated with the IDs.
Return the values from the local object store for object_ids. This will
block until all the values for object_ids have been written to the
local object store.
Return the values from the local object store for object_refs. This
will block until all the values for object_refs have been written to
the local object store.
Args:
object_ids (List[object_id.ObjectID]): A list of the object IDs
object_refs (List[object_ref.ObjectRef]): A list of the object refs
whose values should be retrieved.
timeout (float): timeout (float): The maximum amount of time in
seconds to wait before returning.
"""
# Make sure that the values are object IDs.
for object_id in object_ids:
if not isinstance(object_id, ObjectID):
# Make sure that the values are object refs.
for object_ref in object_refs:
if not isinstance(object_ref, ObjectRef):
raise TypeError(
"Attempting to call `get` on the value {}, "
"which is not an ray.ObjectID.".format(object_id))
"which is not an ray.ObjectRef.".format(object_ref))
timeout_ms = int(timeout * 1000) if timeout else -1
data_metadata_pairs = self.core_worker.get_objects(
object_ids, self.current_task_id, timeout_ms)
return self.deserialize_objects(data_metadata_pairs, object_ids)
object_refs, self.current_task_id, timeout_ms)
return self.deserialize_objects(data_metadata_pairs, object_refs)
def run_function_on_all_workers(self, function,
run_on_other_drivers=False):
@@ -470,7 +471,7 @@ def init(address=None,
redis_max_memory=None,
log_to_driver=True,
node_ip_address=ray_constants.NODE_DEFAULT_IP,
object_id_seed=None,
object_ref_seed=None,
local_mode=False,
redirect_worker_output=None,
redirect_output=None,
@@ -551,9 +552,9 @@ def init(address=None,
log_to_driver (bool): If true, the output from all of the worker
processes on all nodes will be directed to the driver.
node_ip_address (str): The IP address of the node that we are on.
object_id_seed (int): Used to seed the deterministic generation of
object IDs. The same value can be used across multiple runs of the
same driver in order to generate the object IDs in a consistent
object_ref_seed (int): Used to seed the deterministic generation of
object refs. The same value can be used across multiple runs of the
same driver in order to generate the object refs in a consistent
manner. However, the same ID should not be used for different
drivers.
local_mode (bool): If true, the code will be executed serially. This
@@ -690,7 +691,7 @@ def init(address=None,
redis_port=redis_port,
node_ip_address=node_ip_address,
raylet_ip_address=raylet_ip_address,
object_id_seed=object_id_seed,
object_ref_seed=object_ref_seed,
driver_mode=driver_mode,
redirect_worker_output=redirect_worker_output,
redirect_output=redirect_output,
@@ -780,7 +781,7 @@ def init(address=None,
raylet_ip_address=raylet_ip_address,
redis_address=redis_address,
redis_password=redis_password,
object_id_seed=object_id_seed,
object_ref_seed=object_ref_seed,
temp_dir=temp_dir,
load_code_from_local=load_code_from_local,
_internal_config=_internal_config)
@@ -1463,22 +1464,22 @@ def show_in_webui(message, key="", dtype="text"):
blocking_get_inside_async_warned = False
def get(object_ids, timeout=None):
def get(object_refs, timeout=None):
"""Get a remote object or a list of remote objects from the object store.
This method blocks until the object corresponding to the object ID is
This method blocks until the object corresponding to the object ref is
available in the local object store. If this object is not in the local
object store, it will be shipped from an object store that has it (once the
object has been created). If object_ids is a list, then the objects
object has been created). If object_refs is a list, then the objects
corresponding to each object in the list will be returned.
This method will issue a warning if it's running inside async context,
you can use ``await object_id`` instead of ``ray.get(object_id)``. For
a list of object ids, you can use ``await asyncio.gather(*object_ids)``.
you can use ``await object_ref`` instead of ``ray.get(object_ref)``. For
a list of object refs, you can use ``await asyncio.gather(*object_refs)``.
Args:
object_ids: Object ID of the object to get or a list of object IDs to
get.
object_refs: Object ref of the object to get or a list of object refs
to get.
timeout (Optional[float]): The maximum amount of time in seconds to
wait before returning.
@@ -1501,22 +1502,22 @@ def get(object_ids, timeout=None):
if not blocking_get_inside_async_warned:
logger.debug("Using blocking ray.get inside async actor. "
"This blocks the event loop. Please use `await` "
"on object id with asyncio.gather if you want to "
"on object ref with asyncio.gather if you want to "
"yield execution to the event loop instead.")
blocking_get_inside_async_warned = True
with profiling.profile("ray.get"):
is_individual_id = isinstance(object_ids, ray.ObjectID)
is_individual_id = isinstance(object_refs, ray.ObjectRef)
if is_individual_id:
object_ids = [object_ids]
object_refs = [object_refs]
if not isinstance(object_ids, list):
raise ValueError("'object_ids' must either be an object ID "
"or a list of object IDs.")
if not isinstance(object_refs, list):
raise ValueError("'object_refs' must either be an object ref "
"or a list of object refs.")
global last_task_error_raise_time
# TODO(ujvl): Consider how to allow user to retrieve the ready objects.
values = worker.get_objects(object_ids, timeout=timeout)
values = worker.get_objects(object_refs, timeout=timeout)
for i, value in enumerate(values):
if isinstance(value, RayError):
last_task_error_raise_time = time.time()
@@ -1529,7 +1530,7 @@ def get(object_ids, timeout=None):
# Run post processors.
for post_processor in worker._post_get_hooks:
values = post_processor(object_ids, values)
values = post_processor(object_refs, values)
if is_individual_id:
values = values[0]
@@ -1549,56 +1550,56 @@ def put(value, weakref=False):
It allows Ray to more aggressively reclaim memory.
Returns:
The object ID assigned to this value.
The object ref assigned to this value.
"""
worker = global_worker
worker.check_connected()
with profiling.profile("ray.put"):
try:
object_id = worker.put_object(value, pin_object=not weakref)
object_ref = worker.put_object(value, pin_object=not weakref)
except ObjectStoreFullError:
logger.info(
"Put failed since the value was either too large or the "
"store was full of pinned objects.")
raise
return object_id
return object_ref
# Global variable to make sure we only send out the warning once.
blocking_wait_inside_async_warned = False
def wait(object_ids, num_returns=1, timeout=None):
def wait(object_refs, num_returns=1, timeout=None):
"""Return a list of IDs that are ready and a list of IDs that are not.
If timeout is set, the function returns either when the requested number of
IDs are ready or when the timeout is reached, whichever occurs first. If it
is not set, the function simply waits until that number of objects is ready
and returns that exact number of object IDs.
and returns that exact number of object refs.
This method returns two lists. The first list consists of object IDs that
This method returns two lists. The first list consists of object refs that
correspond to objects that are available in the object store. The second
list corresponds to the rest of the object IDs (which may or may not be
list corresponds to the rest of the object refs (which may or may not be
ready).
Ordering of the input list of object IDs is preserved. That is, if A
Ordering of the input list of object refs is preserved. That is, if A
precedes B in the input list, and both are in the ready list, then A will
precede B in the ready list. This also holds true if A and B are both in
the remaining list.
This method will issue a warning if it's running inside an async context.
Instead of ``ray.wait(object_ids)``, you can use
``await asyncio.wait(object_ids)``.
Instead of ``ray.wait(object_refs)``, you can use
``await asyncio.wait(object_refs)``.
Args:
object_ids (List[ObjectID]): List of object IDs for objects that may or
may not be ready. Note that these IDs must be unique.
num_returns (int): The number of object IDs that should be returned.
object_refs (List[ObjectRef]): List of object refs for objects that may
or may not be ready. Note that these IDs must be unique.
num_returns (int): The number of object refs that should be returned.
timeout (float): The maximum amount of time in seconds to wait before
returning.
Returns:
A list of object IDs that are ready and a list of the remaining object
A list of object refs that are ready and a list of the remaining object
IDs.
"""
worker = global_worker
@@ -1610,26 +1611,27 @@ def wait(object_ids, num_returns=1, timeout=None):
if not blocking_wait_inside_async_warned:
logger.debug("Using blocking ray.wait inside async method. "
"This blocks the event loop. Please use `await` "
"on object id with asyncio.wait. ")
"on object ref with asyncio.wait. ")
blocking_wait_inside_async_warned = True
if isinstance(object_ids, ObjectID):
raise TypeError("wait() expected a list of ray.ObjectID, got a single "
"ray.ObjectID")
if not isinstance(object_ids, list):
if isinstance(object_refs, ObjectRef):
raise TypeError(
"wait() expected a list of ray.ObjectID, got {}".format(
type(object_ids)))
"wait() expected a list of ray.ObjectRef, got a single "
"ray.ObjectRef")
if not isinstance(object_refs, list):
raise TypeError(
"wait() expected a list of ray.ObjectRef, got {}".format(
type(object_refs)))
if timeout is not None and timeout < 0:
raise ValueError("The 'timeout' argument must be nonnegative. "
"Received {}".format(timeout))
for object_id in object_ids:
if not isinstance(object_id, ObjectID):
raise TypeError("wait() expected a list of ray.ObjectID, "
"got list containing {}".format(type(object_id)))
for object_ref in object_refs:
if not isinstance(object_ref, ObjectRef):
raise TypeError("wait() expected a list of ray.ObjectRef, "
"got list containing {}".format(type(object_ref)))
worker.check_connected()
# TODO(swang): Check main thread.
@@ -1638,22 +1640,22 @@ def wait(object_ids, num_returns=1, timeout=None):
# TODO(rkn): This is a temporary workaround for
# https://github.com/ray-project/ray/issues/997. However, it should be
# fixed in Arrow instead of here.
if len(object_ids) == 0:
if len(object_refs) == 0:
return [], []
if len(object_ids) != len(set(object_ids)):
raise ValueError("Wait requires a list of unique object IDs.")
if len(object_refs) != len(set(object_refs)):
raise ValueError("Wait requires a list of unique object refs.")
if num_returns <= 0:
raise ValueError(
"Invalid number of objects to return %d." % num_returns)
if num_returns > len(object_ids):
if num_returns > len(object_refs):
raise ValueError("num_returns cannot be greater than the number "
"of objects provided to ray.wait.")
timeout = timeout if timeout is not None else 10**6
timeout_milliseconds = int(timeout * 1000)
ready_ids, remaining_ids = worker.core_worker.wait(
object_ids,
object_refs,
num_returns,
timeout_milliseconds,
worker.current_task_id,
@@ -1702,7 +1704,7 @@ def kill(actor, no_restart=True):
worker.core_worker.kill_actor(actor._ray_actor_id, no_restart)
def cancel(object_id, force=False):
def cancel(object_ref, force=False):
"""Cancels a task according to the following conditions.
If the specified task is pending execution, it will not be executed. If
@@ -1717,7 +1719,7 @@ def cancel(object_id, force=False):
Calling ray.get on a canceled task will raise a RayCancellationError.
Args:
object_id (ObjectID): ObjectID returned by the task
object_ref (ObjectRef): ObjectRef returned by the task
that should be canceled.
force (boolean): Whether to force-kill a running task by killing
the worker that is running the task.
@@ -1727,11 +1729,11 @@ def cancel(object_id, force=False):
worker = ray.worker.global_worker
worker.check_connected()
if not isinstance(object_id, ray.ObjectID):
if not isinstance(object_ref, ray.ObjectRef):
raise TypeError(
"ray.cancel() only supported for non-actor object IDs. "
"Got: {}.".format(type(object_id)))
return worker.core_worker.cancel_task(object_id, force)
"ray.cancel() only supported for non-actor object refs. "
"Got: {}.".format(type(object_ref)))
return worker.core_worker.cancel_task(object_ref, force)
def _mode(worker=global_worker):
@@ -1810,7 +1812,7 @@ def remote(*args, **kwargs):
It can also be used with specific keyword arguments:
* **num_return_vals:** This is only for *remote functions*. It specifies
the number of object IDs returned by the remote function invocation.
the number of object refs returned by the remote function invocation.
* **num_cpus:** The quantity of CPU cores to reserve for this task or for
the lifetime of the actor.
* **num_gpus:** The quantity of GPUs to reserve for this task or for the