Rename _submit -> _remote. (#3321)

This commit is contained in:
Robert Nishihara
2018-11-15 15:30:18 -08:00
committed by Philipp Moritz
parent 98edf752a9
commit d10cb570ab
3 changed files with 61 additions and 17 deletions
+26 -2
View File
@@ -5,6 +5,7 @@ from __future__ import print_function
import copy
import hashlib
import inspect
import logging
import os
import signal
import traceback
@@ -19,6 +20,8 @@ from ray.utils import _random_string
DEFAULT_ACTOR_METHOD_NUM_RETURN_VALS = 1
logger = logging.getLogger(__name__)
def compute_actor_handle_id(actor_handle_id, num_forks):
"""Deterministically compute an actor handle ID.
@@ -223,9 +226,15 @@ class ActorMethod(object):
self._method_name))
def remote(self, *args, **kwargs):
return self._submit(args, kwargs)
return self._remote(args, kwargs)
def _submit(self, args, kwargs, num_return_vals=None):
logger.warn(
"WARNING: _submit() is being deprecated. Please use _remote().")
return self._remote(
args=args, kwargs=kwargs, num_return_vals=num_return_vals)
def _remote(self, args, kwargs, num_return_vals=None):
if num_return_vals is None:
num_return_vals = self._num_return_vals
@@ -320,7 +329,7 @@ class ActorClass(object):
Returns:
A handle to the newly created actor.
"""
return self._submit(args=args, kwargs=kwargs)
return self._remote(args=args, kwargs=kwargs)
def _submit(self,
args,
@@ -328,6 +337,21 @@ class ActorClass(object):
num_cpus=None,
num_gpus=None,
resources=None):
logger.warn(
"WARNING: _submit() is being deprecated. Please use _remote().")
return self._remote(
args=args,
kwargs=kwargs,
num_cpus=num_cpus,
num_gpus=num_gpus,
resources=resources)
def _remote(self,
args,
kwargs,
num_cpus=None,
num_gpus=None,
resources=None):
"""Create an actor.
This method allows more flexibility than the remote method because
+21 -1
View File
@@ -5,6 +5,7 @@ from __future__ import print_function
import copy
import hashlib
import inspect
import logging
import ray.ray_constants as ray_constants
import ray.signature
@@ -14,6 +15,8 @@ DEFAULT_REMOTE_FUNCTION_CPUS = 1
DEFAULT_REMOTE_FUNCTION_NUM_RETURN_VALS = 1
DEFAULT_REMOTE_FUNCTION_MAX_CALLS = 0
logger = logging.getLogger(__name__)
def compute_function_id(function):
"""Compute an function ID for a function.
@@ -97,7 +100,7 @@ class RemoteFunction(object):
def remote(self, *args, **kwargs):
"""This runs immediately when a remote function is called."""
return self._submit(args=args, kwargs=kwargs)
return self._remote(args=args, kwargs=kwargs)
def _submit(self,
args=None,
@@ -106,6 +109,23 @@ class RemoteFunction(object):
num_cpus=None,
num_gpus=None,
resources=None):
logger.warn(
"WARNING: _submit() is being deprecated. Please use _remote().")
return self._remote(
args=args,
kwargs=kwargs,
num_return_vals=num_return_vals,
num_cpus=num_cpus,
num_gpus=num_gpus,
resources=resources)
def _remote(self,
args=None,
kwargs=None,
num_return_vals=None,
num_cpus=None,
num_gpus=None,
resources=None):
"""An experimental alternate way to submit remote functions."""
worker = ray.worker.get_global_worker()
worker.check_connected()
+14 -14
View File
@@ -739,18 +739,18 @@ def test_defining_remote_functions(shutdown_only):
def g():
return ray.get_gpu_ids()
assert f._submit([0], num_return_vals=0) is None
id1 = f._submit(args=[1], num_return_vals=1)
assert f._remote([0], num_return_vals=0) is None
id1 = f._remote(args=[1], num_return_vals=1)
assert ray.get(id1) == [0]
id1, id2 = f._submit(args=[2], num_return_vals=2)
id1, id2 = f._remote(args=[2], num_return_vals=2)
assert ray.get([id1, id2]) == [0, 1]
id1, id2, id3 = f._submit(args=[3], num_return_vals=3)
id1, id2, id3 = f._remote(args=[3], num_return_vals=3)
assert ray.get([id1, id2, id3]) == [0, 1, 2]
assert ray.get(
g._submit(
g._remote(
args=[], num_cpus=1, num_gpus=1,
resources={"Custom": 1})) == [0]
infeasible_id = g._submit(args=[], resources={"NonexistentCustom": 1})
infeasible_id = g._remote(args=[], resources={"NonexistentCustom": 1})
ready_ids, remaining_ids = ray.wait([infeasible_id], timeout=50)
assert len(ready_ids) == 0
assert len(remaining_ids) == 1
@@ -767,10 +767,10 @@ def test_defining_remote_functions(shutdown_only):
def gpu_ids(self):
return ray.get_gpu_ids()
a = Actor._submit(
a = Actor._remote(
args=[0], kwargs={"y": 1}, num_gpus=1, resources={"Custom": 1})
id1, id2, id3, id4 = a.method._submit(
id1, id2, id3, id4 = a.method._remote(
args=["test"], kwargs={"b": 2}, num_return_vals=4)
assert ray.get([id1, id2, id3, id4]) == [0, 1, "test", 2]
@@ -1064,13 +1064,13 @@ def test_object_transfer_dump(ray_start_cluster):
# These objects will live on different nodes.
object_ids = [
f._submit(args=[1], resources={str(i): 1}) for i in range(num_nodes)
f._remote(args=[1], resources={str(i): 1}) for i in range(num_nodes)
]
# Broadcast each object from each machine to each other machine.
for object_id in object_ids:
ray.get([
f._submit(args=[object_id], resources={str(i): 1})
f._remote(args=[object_id], resources={str(i): 1})
for i in range(num_nodes)
])
@@ -1715,17 +1715,17 @@ def test_fractional_resources(shutdown_only):
pass
# Create an actor that requires 0.7 of the custom resource.
f1 = Foo2._submit([], {}, resources={"Custom": 0.7})
f1 = Foo2._remote([], {}, resources={"Custom": 0.7})
ray.get(f1.method.remote())
# Make sure that we cannot create an actor that requires 0.7 of the
# custom resource. TODO(rkn): Re-enable this once ray.wait is
# implemented.
f2 = Foo2._submit([], {}, resources={"Custom": 0.7})
f2 = Foo2._remote([], {}, resources={"Custom": 0.7})
ready, _ = ray.wait([f2.method.remote()], timeout=500)
assert len(ready) == 0
# Make sure we can start an actor that requries only 0.3 of the custom
# resource.
f3 = Foo2._submit([], {}, resources={"Custom": 0.3})
f3 = Foo2._remote([], {}, resources={"Custom": 0.3})
ray.get(f3.method.remote())
del f1, f3
@@ -1741,7 +1741,7 @@ def test_fractional_resources(shutdown_only):
test.remote()
with pytest.raises(ValueError):
Foo2._submit([], {}, resources={"Custom": 1.5})
Foo2._remote([], {}, resources={"Custom": 1.5})
def test_multiple_local_schedulers(shutdown_only):