Basic Async Actor Call (#6183)

* Start trying to figure out where to put fibers

* Pass is_async flag from python to context

* Just running things in fiber works

* Yield implemented, need some debugging to make it work

* It worked!

* Remove debug prints

* Lint

* Revert the clang-format

* Remove unnecessary log

* Remove unncessary import

* Add attribution

* Address comment

* Add test

* Missed a merge conflict

* Make test pass and compile

* Address comment

* Rename async -> asyncio

* Move async test to py3 only

* Fix ignore path
This commit is contained in:
Simon Mo
2019-11-21 11:56:46 -08:00
committed by GitHub
parent c4132b501b
commit 29ba6bfc64
20 changed files with 242 additions and 28 deletions
+46 -4
View File
@@ -5,8 +5,14 @@
from cpython.exc cimport PyErr_CheckSignals
try:
import asyncio
except ImportError:
# Python2 doesn't have asyncio
asyncio = None
import numpy
import gc
import inspect
import threading
import time
import logging
@@ -71,6 +77,7 @@ from ray.includes.libcoreworker cimport (
CCoreWorker,
CTaskOptions,
ResourceMappingType,
CFiberEvent
)
from ray.includes.task cimport CTaskSpec
from ray.includes.ray_config cimport RayConfig
@@ -120,6 +127,7 @@ include "includes/libcoreworker.pxi"
logger = logging.getLogger(__name__)
MEMCOPY_THREADS = 12
PY3 = cpython.PY_MAJOR_VERSION >= 3
if cpython.PY_MAJOR_VERSION >= 3:
@@ -494,6 +502,7 @@ cdef execute_task(
CoreWorker core_worker = worker.core_worker
JobID job_id = core_worker.get_current_job_id()
CTaskID task_id = core_worker.core_worker.get().GetCurrentTaskId()
CFiberEvent fiber_event
# Automatically restrict the GPUs available to this task.
ray.utils.set_cuda_visible_devices(ray.get_gpu_ids())
@@ -547,7 +556,24 @@ cdef execute_task(
c_resources.find(b"object_store_memory")).second)))
def function_executor(*arguments, **kwarguments):
return execution_info.function(actor, *arguments, **kwarguments)
function = execution_info.function
result_or_coroutine = function(actor, *arguments, **kwarguments)
if PY3 and inspect.iscoroutine(result_or_coroutine):
coroutine = result_or_coroutine
loop = core_worker.create_or_get_event_loop()
future = asyncio.run_coroutine_threadsafe(coroutine, loop)
future.add_done_callback(
lambda future: fiber_event.Notify())
with nogil:
(core_worker.core_worker.get()
.YieldCurrentFiber(fiber_event))
return future.result()
return result_or_coroutine
with core_worker.profile_event(b"task", extra_data=extra_data):
try:
@@ -702,7 +728,10 @@ cdef write_serialized_object(
cdef class CoreWorker:
cdef unique_ptr[CCoreWorker] core_worker
cdef:
unique_ptr[CCoreWorker] core_worker
object async_thread
object async_event_loop
def __cinit__(self, is_driver, store_socket, raylet_socket,
JobID job_id, GcsClientOptions gcs_options, log_dir,
@@ -901,7 +930,8 @@ cdef class CoreWorker:
placement_resources,
c_bool is_direct_call,
int32_t max_concurrency,
c_bool is_detached):
c_bool is_detached,
c_bool is_asyncio):
cdef:
CRayFunction ray_function
c_vector[CTaskArg] args_vector
@@ -923,7 +953,7 @@ cdef class CoreWorker:
CActorCreationOptions(
max_reconstructions, is_direct_call, max_concurrency,
c_resources, c_placement_resources,
dynamic_worker_options, is_detached),
dynamic_worker_options, is_detached, is_asyncio),
&c_actor_id))
return ActorID(c_actor_id.Binary())
@@ -1060,3 +1090,15 @@ cdef class CoreWorker:
else:
write_serialized_object(
serialized_object, returns[0][i].get().GetData())
def create_or_get_event_loop(self):
if self.async_event_loop is None:
self.async_event_loop = asyncio.new_event_loop()
asyncio.set_event_loop(self.async_event_loop)
if self.async_thread is None:
self.async_thread = threading.Thread(
target=lambda: self.async_event_loop.run_forever()
)
self.async_thread.start()
return self.async_event_loop
+11 -2
View File
@@ -358,7 +358,8 @@ class ActorClass(object):
is_direct_call=None,
max_concurrency=None,
name=None,
detached=False):
detached=False,
is_asyncio=False):
"""Create an actor.
This method allows more flexibility than the remote method because
@@ -381,6 +382,8 @@ class ActorClass(object):
name: The globally unique name for the actor.
detached: Whether the actor should be kept alive after driver
exits.
is_asyncio: Turn on async actor calls. This only works with direct
actor calls.
Returns:
A handle to the newly created actor.
@@ -400,6 +403,12 @@ class ActorClass(object):
if max_concurrency < 1:
raise ValueError("max_concurrency must be >= 1")
if is_asyncio and not is_direct_call:
raise ValueError(
"Setting is_asyncio requires is_direct_call=True.")
if is_asyncio and max_concurrency != 1:
raise ValueError("Setting is_asyncio requires max_concurrency=1.")
worker = ray.worker.get_global_worker()
if worker.mode is None:
raise Exception("Actors cannot be created before ray.init() "
@@ -487,7 +496,7 @@ class ActorClass(object):
function_descriptor.get_function_descriptor_list(),
creation_args, meta.max_reconstructions, resources,
actor_placement_resources, is_direct_call, max_concurrency,
detached)
detached, is_asyncio)
actor_handle = ActorHandle(
actor_id,
+1 -1
View File
@@ -217,7 +217,7 @@ cdef extern from "ray/core_worker/common.h" nogil:
const unordered_map[c_string, double] &resources,
const unordered_map[c_string, double] &placement_resources,
const c_vector[c_string] &dynamic_worker_options,
c_bool is_detached)
c_bool is_detached, c_bool is_asyncio)
cdef extern from "ray/gcs/gcs_client_interface.h" nogil:
cdef cppclass CGcsClientOptions "ray::gcs::GcsClientOptions":
+8
View File
@@ -48,6 +48,12 @@ cdef extern from "ray/core_worker/profiling.h" nogil:
cdef cppclass CProfileEvent "ray::worker::ProfileEvent":
void SetExtraData(const c_string &extra_data)
cdef extern from "ray/core_worker/transport/direct_actor_transport.h" nogil:
cdef cppclass CFiberEvent "ray::FiberEvent":
CFiberEvent()
void Wait()
void Notify()
cdef extern from "ray/core_worker/core_worker.h" nogil:
cdef cppclass CCoreWorker "ray::CoreWorker":
CCoreWorker(const CWorkerType worker_type, const CLanguage language,
@@ -125,3 +131,5 @@ cdef extern from "ray/core_worker/core_worker.h" nogil:
CRayStatus Delete(const c_vector[CObjectID] &object_ids,
c_bool local_only, c_bool delete_creating_tasks)
c_string MemoryUsageString()
void YieldCurrentFiber(CFiberEvent &coroutine_done)
@@ -3,6 +3,7 @@ from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import asyncio
import pytest
import ray
@@ -94,3 +95,32 @@ def test_args_intertwined(ray_start_regular):
local_method = local_actor.cls_args_intertwined
test_function(local_method, actor_method)
ray.get(remote_test_function.remote(local_method, actor_method))
def test_asyncio_actor(ray_start_regular):
@ray.remote
class AsyncBatcher(object):
def __init__(self):
self.batch = []
# The event currently need to be created from the same thread.
# We currently run async coroutines from a different thread.
self.event = None
async def add(self, x):
if self.event is None:
self.event = asyncio.Event()
self.batch.append(x)
if len(self.batch) >= 3:
self.event.set()
else:
await self.event.wait()
return sorted(self.batch)
a = AsyncBatcher.options(is_direct_call=True, is_asyncio=True).remote()
x1 = a.add.remote(1)
x2 = a.add.remote(2)
x3 = a.add.remote(3)
r1 = ray.get(x1)
r2 = ray.get(x2)
r3 = ray.get(x3)
assert r1 == [1, 2, 3]
assert r1 == r2 == r3
+1
View File
@@ -2852,3 +2852,4 @@ ray.get(actor.ping.remote())
run_string_as_driver(driver_script)
detached_actor = ray.experimental.get_actor(actor_name)
assert ray.get(detached_actor.ping.remote()) == "pong"