mirror of
https://github.com/wassname/ray.git
synced 2026-06-29 00:55:31 +08:00
Fix actor ID collision in local mode (#5863)
* Fixed local mode actor id * Update python/ray/actor.py Co-Authored-By: Edward Oakes <ed.nmi.oakes@gmail.com> * Added hyphen to match comments * Added tests to test_local_mode * Helloworld * Better test naming * lint
This commit is contained in:
+1
-3
@@ -369,9 +369,7 @@ class ActorClass(object):
|
||||
# Instead, instantiate the actor locally and add it to the worker's
|
||||
# dictionary
|
||||
if worker.mode == ray.LOCAL_MODE:
|
||||
actor_id = ActorID.of(worker.current_job_id,
|
||||
worker.current_task_id,
|
||||
worker.task_context.task_index + 1)
|
||||
actor_id = ActorID.from_random()
|
||||
worker.actors[actor_id] = meta.modified_class(
|
||||
*copy.deepcopy(args), **copy.deepcopy(kwargs))
|
||||
core_handle = ray._raylet.ActorHandle(
|
||||
|
||||
@@ -211,30 +211,35 @@ cdef class TaskID(BaseID):
|
||||
|
||||
@classmethod
|
||||
def for_driver_task(cls, job_id):
|
||||
return cls(CTaskID.ForDriverTask(CJobID.FromBinary(job_id.binary())).Binary())
|
||||
return cls(CTaskID.ForDriverTask(
|
||||
CJobID.FromBinary(job_id.binary())).Binary())
|
||||
|
||||
@classmethod
|
||||
def for_actor_creation_task(cls, actor_id):
|
||||
assert isinstance(actor_id, ActorID)
|
||||
return cls(CTaskID.ForActorCreationTask(CActorID.FromBinary(actor_id.binary())).Binary())
|
||||
return cls(CTaskID.ForActorCreationTask(
|
||||
CActorID.FromBinary(actor_id.binary())).Binary())
|
||||
|
||||
@classmethod
|
||||
def for_actor_task(cls, job_id, parent_task_id, parent_task_counter, actor_id):
|
||||
def for_actor_task(cls, job_id, parent_task_id,
|
||||
parent_task_counter, actor_id):
|
||||
assert isinstance(job_id, JobID)
|
||||
assert isinstance(parent_task_id, TaskID)
|
||||
assert isinstance(actor_id, ActorID)
|
||||
return cls(CTaskID.ForActorTask(CJobID.FromBinary(job_id.binary()),
|
||||
CTaskID.FromBinary(parent_task_id.binary()),
|
||||
parent_task_counter,
|
||||
CActorID.FromBinary(actor_id.binary())).Binary())
|
||||
return cls(CTaskID.ForActorTask(
|
||||
CJobID.FromBinary(job_id.binary()),
|
||||
CTaskID.FromBinary(parent_task_id.binary()),
|
||||
parent_task_counter,
|
||||
CActorID.FromBinary(actor_id.binary())).Binary())
|
||||
|
||||
@classmethod
|
||||
def for_normal_task(cls, job_id, parent_task_id, parent_task_counter):
|
||||
assert isinstance(job_id, JobID)
|
||||
assert isinstance(parent_task_id, TaskID)
|
||||
return cls(CTaskID.ForNormalTask(CJobID.FromBinary(job_id.binary()),
|
||||
CTaskID.FromBinary(parent_task_id.binary()),
|
||||
parent_task_counter).Binary())
|
||||
return cls(CTaskID.ForNormalTask(
|
||||
CJobID.FromBinary(job_id.binary()),
|
||||
CTaskID.FromBinary(parent_task_id.binary()),
|
||||
parent_task_counter).Binary())
|
||||
|
||||
cdef class ClientID(UniqueID):
|
||||
|
||||
@@ -314,6 +319,10 @@ cdef class ActorID(BaseID):
|
||||
def nil(cls):
|
||||
return cls(CActorID.Nil().Binary())
|
||||
|
||||
@classmethod
|
||||
def from_random(cls):
|
||||
return cls(os.urandom(CActorID.Size()))
|
||||
|
||||
@classmethod
|
||||
def size(cls):
|
||||
return CActorID.Size()
|
||||
|
||||
@@ -1766,6 +1766,28 @@ def test_local_mode(shutdown_only):
|
||||
with pytest.raises(Exception, match=exception_str):
|
||||
ray.get(obj2)
|
||||
|
||||
# Check that Actors are not overwritten by remote calls from different
|
||||
# classes.
|
||||
@ray.remote
|
||||
class RemoteActor1(object):
|
||||
def __init__(self):
|
||||
pass
|
||||
|
||||
def function1(self):
|
||||
return 0
|
||||
|
||||
@ray.remote
|
||||
class RemoteActor2(object):
|
||||
def __init__(self):
|
||||
pass
|
||||
|
||||
def function2(self):
|
||||
return 1
|
||||
|
||||
actor1 = RemoteActor1.remote()
|
||||
_ = RemoteActor2.remote()
|
||||
assert ray.get(actor1.function1.remote()) == 0
|
||||
|
||||
|
||||
def test_resource_constraints(shutdown_only):
|
||||
num_workers = 20
|
||||
|
||||
Reference in New Issue
Block a user