[Core] Added support for submission-time task names. (#10449)

* Added support for submission-time task names.

* Suggestions from code review: add missing consts

Co-authored-by: SangBin Cho <rkooo567@gmail.com>

* Add num_returns arg to actor method options docstring example.

* Add process name line and proctitle assertion to submission-time task name section of advanced docs.

* Add submission-time task name --> proctitle test for Python worker.

* Added Python actor options tests for num_returns and name.

* Added Java test for submission-time task names.

* Add dashboard image to task name docs section.

* Move to fstrings.

Co-authored-by: SangBin Cho <rkooo567@gmail.com>
This commit is contained in:
Clark Zinzow
2020-09-03 12:45:24 -06:00
committed by GitHub
parent 71274954d1
commit 0c0b0d0a73
37 changed files with 361 additions and 135 deletions
+6 -4
View File
@@ -67,8 +67,9 @@ WaitResult AbstractRayRuntime::Wait(const std::vector<ObjectID> &ids, int num_ob
ObjectID AbstractRayRuntime::Call(RemoteFunctionPtrHolder &fptr,
std::shared_ptr<msgpack::sbuffer> args) {
InvocationSpec invocationSpec;
invocationSpec.task_id =
TaskID::ForFakeTask(); // TODO(Guyang Song): make it from different task
// TODO(Guyang Song): make it from different task
invocationSpec.task_id = TaskID::ForFakeTask();
invocationSpec.name = "";
invocationSpec.actor_id = ActorID::Nil();
invocationSpec.args = args;
invocationSpec.func_offset =
@@ -87,8 +88,9 @@ ObjectID AbstractRayRuntime::CallActor(const RemoteFunctionPtrHolder &fptr,
const ActorID &actor,
std::shared_ptr<msgpack::sbuffer> args) {
InvocationSpec invocationSpec;
invocationSpec.task_id =
TaskID::ForFakeTask(); // TODO(Guyang Song): make it from different task
// TODO(Guyang Song): make it from different task
invocationSpec.task_id = TaskID::ForFakeTask();
invocationSpec.name = "";
invocationSpec.actor_id = actor;
invocationSpec.args = args;
invocationSpec.func_offset =
@@ -11,6 +11,7 @@ namespace api {
class InvocationSpec {
public:
TaskID task_id;
std::string name;
ActorID actor_id;
int actor_counter;
/// Remote function offset from base address.
@@ -30,8 +30,10 @@ ObjectID LocalModeTaskSubmitter::Submit(const InvocationSpec &invocation, TaskTy
std::unordered_map<std::string, double> required_resources;
std::unordered_map<std::string, double> required_placement_resources;
TaskSpecBuilder builder;
builder.SetCommonTaskSpec(invocation.task_id, rpc::Language::CPP, functionDescriptor,
local_mode_ray_tuntime_.GetCurrentJobID(),
std::string task_name =
invocation.name.empty() ? functionDescriptor->DefaultTaskName() : invocation.name;
builder.SetCommonTaskSpec(invocation.task_id, task_name, rpc::Language::CPP,
functionDescriptor, local_mode_ray_tuntime_.GetCurrentJobID(),
local_mode_ray_tuntime_.GetCurrentTaskId(), 0,
local_mode_ray_tuntime_.GetCurrentTaskId(), address, 1,
required_resources, required_placement_resources,
+4 -3
View File
@@ -29,8 +29,8 @@ class DefaultWorker {
"", // driver_name
"", // stdout_file
"", // stderr_file
std::bind(&DefaultWorker::ExecuteTask, this, _1, _2, _3, _4, _5, _6,
_7), // task_execution_callback
std::bind(&DefaultWorker::ExecuteTask, this, _1, _2, _3, _4, _5, _6, _7,
_8), // task_execution_callback
nullptr, // check_signals
nullptr, // gc_collect
nullptr, // spill_objects
@@ -51,7 +51,8 @@ class DefaultWorker {
void RunTaskExecutionLoop() { CoreWorkerProcess::RunTaskExecutionLoop(); }
private:
Status ExecuteTask(TaskType task_type, const RayFunction &ray_function,
Status ExecuteTask(TaskType task_type, const std::string task_name,
const RayFunction &ray_function,
const std::unordered_map<std::string, double> &required_resources,
const std::vector<std::shared_ptr<RayObject>> &args,
const std::vector<ObjectID> &arg_reference_ids,
+20
View File
@@ -51,6 +51,26 @@ And vary the number of return values for tasks (and actor methods too):
assert ray.get(id1) == 0
assert ray.get(id2) == 1
And specify a name for tasks (and actor methods too) at task submission time:
.. code-block:: python
import setproctitle
@ray.remote
def f(x):
assert setproctitle.getproctitle() == "ray::special_f"
return x + 1
obj = f.options(name="special_f").remote(3)
assert ray.get(obj) == 4
This name appears as the task name in the machine view of the dashboard, as the
worker process name while the task is executing (for Python tasks only), and as the
task name in the logs.
.. image:: images/task_name_dashboard.png
Dynamic Custom Resources
------------------------
Binary file not shown.

After

Width:  |  Height:  |  Size: 37 KiB

@@ -12,9 +12,20 @@ public class BaseTaskCaller<T extends BaseTaskCaller<T>> {
private CallOptions.Builder builder = new CallOptions.Builder();
/**
* Set a custom resource requirement for resource {@code name}.
* This method can be called multiple times. If the same resource is set multiple times,
* the latest quantity will be used.
* Set a name for this task.
*
* @param name task name
* @return self
* @see CallOptions.Builder#setName(java.lang.String)
*/
public T setName(String name) {
builder.setName(name);
return self();
}
/**
* Set a custom resource requirement for resource {@code name}. This method can be called multiple
* times. If the same resource is set multiple times, the latest quantity will be used.
*
* @param name resource name
* @param value resource capacity
@@ -27,9 +38,8 @@ public class BaseTaskCaller<T extends BaseTaskCaller<T>> {
}
/**
* Set custom requirements for multiple resources.
* This method can be called multiple times. If the same resource is set multiple times,
* the latest quantity will be used.
* Set custom requirements for multiple resources. This method can be called multiple times. If
* the same resource is set multiple times, the latest quantity will be used.
*
* @param resources requirements for multiple resources.
* @return self
@@ -48,5 +58,4 @@ public class BaseTaskCaller<T extends BaseTaskCaller<T>> {
protected CallOptions buildOptions() {
return builder.build();
}
}
@@ -3,26 +3,36 @@ package io.ray.api.options;
import java.util.HashMap;
import java.util.Map;
/**
* The options for RayCall.
*/
/** The options for RayCall. */
public class CallOptions extends BaseTaskOptions {
private CallOptions(Map<String, Double> resources) {
public final String name;
private CallOptions(String name, Map<String, Double> resources) {
super(resources);
this.name = name;
}
/**
* This inner class for building CallOptions.
*/
/** This inner class for building CallOptions. */
public static class Builder {
private String name;
private Map<String, Double> resources = new HashMap<>();
/**
* Set a custom resource requirement for resource {@code name}.
* This method can be called multiple times. If the same resource is set multiple times,
* the latest quantity will be used.
* Set a name for this task.
*
* @param name task name
* @return self
*/
public Builder setName(String name) {
this.name = name;
return this;
}
/**
* Set a custom resource requirement for resource {@code name}. This method can be called
* multiple times. If the same resource is set multiple times, the latest quantity will be used.
*
* @param name resource name
* @param value resource capacity
@@ -34,9 +44,8 @@ public class CallOptions extends BaseTaskOptions {
}
/**
* Set custom requirements for multiple resources.
* This method can be called multiple times. If the same resource is set multiple times,
* the latest quantity will be used.
* Set custom requirements for multiple resources. This method can be called multiple times. If
* the same resource is set multiple times, the latest quantity will be used.
*
* @param resources requirements for multiple resources.
* @return self
@@ -47,7 +56,7 @@ public class CallOptions extends BaseTaskOptions {
}
public CallOptions build() {
return new CallOptions(resources);
return new CallOptions(name, resources);
}
}
}
@@ -0,0 +1,19 @@
package io.ray.test;
import io.ray.api.Ray;
import org.testng.Assert;
import org.testng.annotations.Test;
/** Checks that a task submitted with a custom name still executes and returns its result. */
public class TaskNameTest extends BaseTest {

  /** Trivial remote function; the test only needs it to run and return a known value. */
  private static int testFoo() {
    return 0;
  }

  /** Test setting task name at task submission time. */
  @Test
  public void testSetName() {
    int result = Ray.task(TaskNameTest::testFoo).setName("foo").remote().get();
    // TestNG's assertEquals takes (actual, expected); keeping that order makes a
    // failure message report the right value as "expected".
    Assert.assertEquals(result, 0);
  }
}
+16 -10
View File
@@ -343,6 +343,7 @@ def switch_worker_log_if_needed(worker, next_job_id):
cdef execute_task(
CTaskType task_type,
const c_string name,
const CRayFunction &ray_function,
const unordered_map[c_string, double] &c_resources,
const c_vector[shared_ptr[CRayObject]] &c_args,
@@ -386,16 +387,18 @@ cdef execute_task(
extra_data = (b'{"name": ' + function_name.encode("ascii") +
b' "task_id": ' + task_id.hex().encode("ascii") + b'}')
task_name = name.decode("utf-8")
title = f"ray::{task_name}"
if <int>task_type == <int>TASK_TYPE_NORMAL_TASK:
title = "ray::{}()".format(function_name)
next_title = "ray::IDLE"
function_executor = execution_info.function
else:
actor = worker.actors[core_worker.get_actor_id()]
class_name = actor.__class__.__name__
title = "ray::{}.{}()".format(class_name, function_name)
next_title = "ray::{}".format(class_name)
worker_name = "ray_{}_{}".format(class_name, os.getpid())
next_title = f"ray::{class_name}"
pid = os.getpid()
worker_name = f"ray_{class_name}_{pid}"
if c_resources.find(b"memory") != c_resources.end():
worker.memory_monitor.set_heap_limit(
worker_name,
@@ -470,8 +473,7 @@ cdef execute_task(
if (<int>task_type == <int>TASK_TYPE_ACTOR_CREATION_TASK):
actor = worker.actors[core_worker.get_actor_id()]
class_name = actor.__class__.__name__
actor_title = "{}({}, {})".format(
class_name, repr(args), repr(kwargs))
actor_title = f"{class_name}({args!r}, {kwargs!r})"
core_worker.set_actor_title(actor_title.encode("utf-8"))
# Execute the task.
with core_worker.profile_event(b"task:execute"):
@@ -535,6 +537,7 @@ cdef execute_task(
cdef CRayStatus task_execution_handler(
CTaskType task_type,
const c_string task_name,
const CRayFunction &ray_function,
const unordered_map[c_string, double] &c_resources,
const c_vector[shared_ptr[CRayObject]] &c_args,
@@ -547,8 +550,9 @@ cdef CRayStatus task_execution_handler(
try:
# The call to execute_task should never raise an exception. If
# it does, that indicates that there was an internal error.
execute_task(task_type, ray_function, c_resources, c_args,
c_arg_reference_ids, c_return_ids, returns)
execute_task(task_type, task_name, ray_function, c_resources,
c_args, c_arg_reference_ids, c_return_ids,
returns)
except Exception:
traceback_str = traceback.format_exc() + (
"An unexpected internal error occurred while the worker "
@@ -985,6 +989,7 @@ cdef class CoreWorker:
Language language,
FunctionDescriptor function_descriptor,
args,
c_string name,
int num_returns,
resources,
int max_retries,
@@ -1002,7 +1007,7 @@ cdef class CoreWorker:
with self.profile_event(b"submit_task"):
prepare_resources(resources, &c_resources)
task_options = CTaskOptions(
num_returns, c_resources)
name, num_returns, c_resources)
ray_function = CRayFunction(
language.lang, function_descriptor.descriptor)
prepare_args(self, language, args, &args_vector)
@@ -1112,6 +1117,7 @@ cdef class CoreWorker:
ActorID actor_id,
FunctionDescriptor function_descriptor,
args,
c_string name,
int num_returns,
double num_method_cpus):
@@ -1126,7 +1132,7 @@ cdef class CoreWorker:
with self.profile_event(b"submit_task"):
if num_method_cpus > 0:
c_resources[b"CPU"] = num_method_cpus
task_options = CTaskOptions(num_returns, c_resources)
task_options = CTaskOptions(name, num_returns, c_resources)
ray_function = CRayFunction(
language.lang, function_descriptor.descriptor)
prepare_args(self, language, args, &args_vector)
+29 -4
View File
@@ -100,7 +100,27 @@ class ActorMethod:
def remote(self, *args, **kwargs):
return self._remote(args, kwargs)
def _remote(self, args=None, kwargs=None, num_returns=None):
def options(self, **options):
    """Bind call options to this actor method before invoking it.

    Takes the same keyword options as ``_remote()`` (for example ``name``
    and ``num_returns``) and returns an object whose plain ``.remote()``
    forwards its positional and keyword arguments, together with the
    bound options, to ``_remote()``.

    Examples:
        # The following two calls are equivalent.
        >>> actor.my_method._remote(args=[x, y], name="foo", num_returns=2)
        >>> actor.my_method.options(name="foo", num_returns=2).remote(x, y)
    """
    # Capture the actor method under a distinct name: inside the wrapper,
    # ``self`` refers to the wrapper instance, not to this method object.
    actor_method = self

    class FuncWrapper:
        def remote(self, *args, **kwargs):
            return actor_method._remote(
                args=args, kwargs=kwargs, **options)

    return FuncWrapper()
def _remote(self, args=None, kwargs=None, name="", num_returns=None):
if num_returns is None:
num_returns = self._num_returns
@@ -112,6 +132,7 @@ class ActorMethod:
self._method_name,
args=args,
kwargs=kwargs,
name=name,
num_returns=num_returns)
# Apply the decorator if there is one.
@@ -317,8 +338,10 @@ class ActorClass:
max_task_retries, num_cpus, num_gpus, memory,
object_store_memory, resources):
for attribute in [
"remote", "_remote", "_ray_from_modified_class",
"_ray_from_function_descriptor"
"remote",
"_remote",
"_ray_from_modified_class",
"_ray_from_function_descriptor",
]:
if hasattr(modified_class, attribute):
logger.warning("Creating an actor from class "
@@ -679,6 +702,7 @@ class ActorHandle:
method_name,
args=None,
kwargs=None,
name="",
num_returns=None):
"""Method execution stub for an actor handle.
@@ -691,6 +715,7 @@ class ActorHandle:
method_name: The name of the actor method to execute.
args: A list of arguments for the actor method.
kwargs: A dictionary of keyword arguments for the actor method.
name (str): The name to give the actor method call task.
num_returns (int): The number of return values for the method.
Returns:
@@ -724,7 +749,7 @@ class ActorHandle:
object_refs = worker.core_worker.submit_actor_task(
self._ray_actor_language, self._ray_actor_id, function_descriptor,
list_args, num_returns, self._ray_actor_method_cpus)
list_args, name, num_returns, self._ray_actor_method_cpus)
if len(object_refs) == 1:
object_refs = object_refs[0]
+1 -1
View File
@@ -241,7 +241,7 @@ cdef extern from "ray/core_worker/common.h" nogil:
cdef cppclass CTaskOptions "ray::TaskOptions":
CTaskOptions()
CTaskOptions(int num_returns,
CTaskOptions(c_string name, int num_returns,
unordered_map[c_string, double] &resources)
cdef cppclass CActorCreationOptions "ray::ActorCreationOptions":
+1
View File
@@ -217,6 +217,7 @@ cdef extern from "ray/core_worker/core_worker.h" nogil:
c_string stderr_file
(CRayStatus(
CTaskType task_type,
const c_string name,
const CRayFunction &ray_function,
const unordered_map[c_string, double] &resources,
const c_vector[shared_ptr[CRayObject]] &args,
+3 -2
View File
@@ -152,7 +152,8 @@ class RemoteFunction:
resources=None,
max_retries=None,
placement_group=None,
placement_group_bundle_index=-1):
placement_group_bundle_index=-1,
name=""):
"""Submit the remote function for execution."""
worker = ray.worker.global_worker
worker.check_connected()
@@ -212,7 +213,7 @@ class RemoteFunction:
"Cross language remote function " \
"cannot be executed locally."
object_refs = worker.core_worker.submit_task(
self._language, self._function_descriptor, list_args,
self._language, self._function_descriptor, list_args, name,
num_returns, resources, max_retries, placement_group.id,
placement_group_bundle_index)
+31
View File
@@ -14,6 +14,10 @@ import ray
import ray.test_utils
import ray.cluster_utils
# NOTE: We have to import setproctitle after ray because we bundle setproctitle
# with ray.
import setproctitle
def test_caching_actors(shutdown_only):
# Test defining actors before ray.init() has been called.
@@ -673,6 +677,33 @@ def test_multiple_return_values(ray_start_regular_shared):
assert ray.get([id3a, id3b, id3c]) == [1, 2, 3]
def test_options_num_returns(ray_start_regular_shared):
    """Actor methods honor ``num_returns`` passed through ``.options()``."""

    @ray.remote
    class Foo:
        def method(self):
            return 1, 2

    actor = Foo.remote()

    # Without options the tuple comes back as one object.
    single_ref = actor.method.remote()
    assert ray.get(single_ref) == (1, 2)

    # With num_returns=2 each element gets its own object ref.
    first_ref, second_ref = actor.method.options(num_returns=2).remote()
    assert ray.get([first_ref, second_ref]) == [1, 2]
def test_options_name(ray_start_regular_shared):
    """A name given via ``.options(name=...)`` becomes the worker proctitle."""

    @ray.remote
    class Foo:
        def method(self, name):
            # Runs inside the actor process while the named task executes.
            assert setproctitle.getproctitle() == f"ray::{name}"

    actor = Foo.remote()
    # Submit the same method under two different names, one after the other.
    for task_name in ("foo", "bar"):
        ray.get(actor.method.options(name=task_name).remote(task_name))
def test_define_actor(ray_start_regular_shared):
@ray.remote
class Test:
+28 -6
View File
@@ -35,7 +35,7 @@ def attempt_to_load_balance(remote_function,
[remote_function.remote(*args) for _ in range(total_tasks)])
names = set(locations)
counts = [locations.count(name) for name in names]
logger.info("Counts are {}.".format(counts))
logger.info(f"Counts are {counts}.")
if (len(names) == num_nodes
and all(count >= minimum_count for count in counts)):
break
@@ -346,6 +346,28 @@ def test_ray_setproctitle(ray_start_2_cpus):
ray.get(unique_1.remote())
def test_ray_task_name_setproctitle(ray_start_2_cpus):
    """Submission-time task names show up as the worker process title.

    Covers an actor creation task (which keeps its default name), an actor
    method submitted with a custom name, and a normal task submitted with a
    custom name.
    """
    method_task_name = "foo"

    @ray.remote
    class UniqueName:
        def __init__(self):
            # No name was supplied for the creation task, so the default
            # title is expected here.
            assert setproctitle.getproctitle() == "ray::UniqueName.__init__()"

        def f(self):
            assert setproctitle.getproctitle() == f"ray::{method_task_name}"

    task_name = "bar"

    @ray.remote
    def unique_1():
        # Exact match, consistent with the actor-method check above; a
        # substring check would also accept titles that merely contain the
        # name.
        assert setproctitle.getproctitle() == f"ray::{task_name}"

    actor = UniqueName.remote()
    ray.get(actor.f.options(name=method_task_name).remote())
    ray.get(unique_1.options(name=task_name).remote())
@pytest.mark.skipif(
os.getenv("TRAVIS") is None,
reason="This test should only be run on Travis.")
@@ -508,7 +530,7 @@ def test_invalid_unicode_in_worker_log(shutdown_only):
# Wait till first worker log file is created.
while True:
log_file_paths = glob.glob("{}/worker*.out".format(logs_dir))
log_file_paths = glob.glob(f"{logs_dir}/worker*.out")
if len(log_file_paths) == 0:
time.sleep(0.2)
else:
@@ -546,13 +568,13 @@ def test_move_log_files_to_old(shutdown_only):
# Make sure no log files are in the "old" directory before the actors
# are killed.
assert len(glob.glob("{}/old/worker*.out".format(logs_dir))) == 0
assert len(glob.glob(f"{logs_dir}/old/worker*.out")) == 0
# Now kill the actors so the files get moved to logs/old/.
[a.__ray_terminate__.remote() for a in actors]
while True:
log_file_paths = glob.glob("{}/old/worker*.out".format(logs_dir))
log_file_paths = glob.glob(f"{logs_dir}/old/worker*.out")
if len(log_file_paths) > 0:
with open(log_file_paths[0], "r") as f:
assert "function f finished\n" in f.readlines()
@@ -641,7 +663,7 @@ Blacklisted: No
"""
constraints_dict = resource_spec._constraints_from_gpu_info(info_string)
expected_dict = {
"{}V100".format(ray_constants.RESOURCE_CONSTRAINT_PREFIX): 1
f"{ray_constants.RESOURCE_CONSTRAINT_PREFIX}V100": 1,
}
assert constraints_dict == expected_dict
@@ -658,7 +680,7 @@ Blacklisted: No
"""
constraints_dict = resource_spec._constraints_from_gpu_info(info_string)
expected_dict = {
"{}T4".format(ray_constants.RESOURCE_CONSTRAINT_PREFIX): 1
f"{ray_constants.RESOURCE_CONSTRAINT_PREFIX}T4": 1,
}
assert constraints_dict == expected_dict
+29 -3
View File
@@ -51,7 +51,14 @@ class FunctionDescriptorInterface : public MessageWrapper<rpc::FunctionDescripto
virtual std::string ToString() const = 0;
// A one-word summary of the function call site (e.g., __main__.foo).
virtual std::string CallSiteString() const { return ToString(); }
virtual std::string CallSiteString() const { return CallString(); }
// The function or method being called, e.g. "foo" or "Bar.foo", with no trailing "()"
// (DefaultTaskName() appends that). The Python descriptor omits the module name; the
// C++ descriptor returns "<lib_name>+<function_offset>".
virtual std::string CallString() const = 0;
// The default name for a task that executes this function.
virtual std::string DefaultTaskName() const { return CallString() + "()"; }
template <typename Subtype>
Subtype *As() {
@@ -79,6 +86,8 @@ class EmptyFunctionDescriptor : public FunctionDescriptorInterface {
inline bool operator!=(const EmptyFunctionDescriptor &other) const { return false; }
virtual std::string ToString() const { return "{type=EmptyFunctionDescriptor}"; }
virtual std::string CallString() const { return ""; }
};
class JavaFunctionDescriptor : public FunctionDescriptorInterface {
@@ -120,6 +129,12 @@ class JavaFunctionDescriptor : public FunctionDescriptorInterface {
", signature=" + typed_message_->signature() + "}";
}
// Returns "<class_name>.<function_name>", or just the function name when the
// descriptor carries no class name.
virtual std::string CallString() const {
  const std::string &cls = typed_message_->class_name();
  const std::string &fn = typed_message_->function_name();
  if (cls.empty()) {
    return fn;
  }
  return cls + "." + fn;
}
const std::string &ClassName() const { return typed_message_->class_name(); }
const std::string &FunctionName() const { return typed_message_->function_name(); }
@@ -174,8 +189,13 @@ class PythonFunctionDescriptor : public FunctionDescriptorInterface {
}
virtual std::string CallSiteString() const {
return typed_message_->module_name() + "." + typed_message_->class_name() + "." +
typed_message_->function_name();
return typed_message_->module_name() + "." + CallString();
}
// Returns "<class_name>.<function_name>" for a method, or just the function name when
// there is no class name. The module name is not included; CallSiteString() adds it.
virtual std::string CallString() const {
  const std::string &cls = typed_message_->class_name();
  const std::string &fn = typed_message_->function_name();
  if (cls.empty()) {
    return fn;
  }
  return cls + "." + fn;
}
const std::string &ModuleName() const { return typed_message_->module_name(); }
@@ -229,6 +249,12 @@ class CppFunctionDescriptor : public FunctionDescriptorInterface {
", exec_function_offset=" + typed_message_->exec_function_offset() + "}";
}
// Identifies the call as "<lib_name>+<function_offset>".
virtual std::string CallString() const {
  std::string call = typed_message_->lib_name();
  call += "+";
  call += typed_message_->function_offset();
  return call;
}
virtual std::string DefaultTaskName() const { return CallString(); }
const std::string &LibName() const { return typed_message_->lib_name(); }
const std::string &FunctionOffset() const { return typed_message_->function_offset(); }
+5 -2
View File
@@ -192,6 +192,8 @@ bool TaskSpecification::IsDriverTask() const {
return message_->type() == TaskType::DRIVER_TASK;
}
// Returns the task name stored in the underlying task spec message.
// NOTE(review): the return type is a const value, so every call yields a copy that
// callers cannot move from. Consider `std::string`, or `const std::string &` if
// message_->name() returns a reference (confirm); the header declaration must change too.
const std::string TaskSpecification::GetName() const { return message_->name(); }
Language TaskSpecification::GetLanguage() const { return message_->language(); }
bool TaskSpecification::IsNormalTask() const {
@@ -299,8 +301,9 @@ std::string TaskSpecification::DebugString() const {
// Print function descriptor.
stream << FunctionDescriptor()->ToString();
stream << ", task_id=" << TaskId() << ", job_id=" << JobId()
<< ", num_args=" << NumArgs() << ", num_returns=" << NumReturns();
stream << ", task_id=" << TaskId() << ", task_name=" << GetName()
<< ", job_id=" << JobId() << ", num_args=" << NumArgs()
<< ", num_returns=" << NumReturns();
if (IsActorCreationTask()) {
// Print actor creation task spec.
+3
View File
@@ -135,6 +135,9 @@ class TaskSpecification : public MessageWrapper<rpc::TaskSpec> {
Language GetLanguage() const;
// Returns the task's name.
const std::string GetName() const;
/// Whether this task is a normal task.
bool IsNormalTask() const;
+2 -1
View File
@@ -80,7 +80,7 @@ class TaskSpecBuilder {
///
/// \return Reference to the builder object itself.
TaskSpecBuilder &SetCommonTaskSpec(
const TaskID &task_id, const Language &language,
const TaskID &task_id, const std::string name, const Language &language,
const ray::FunctionDescriptor &function_descriptor, const JobID &job_id,
const TaskID &parent_task_id, uint64_t parent_counter, const TaskID &caller_id,
const rpc::Address &caller_address, uint64_t num_returns,
@@ -88,6 +88,7 @@ class TaskSpecBuilder {
const std::unordered_map<std::string, double> &required_placement_resources,
const PlacementGroupID &placement_group_id) {
message_->set_type(TaskType::NORMAL_TASK);
message_->set_name(name);
message_->set_language(language);
*message_->mutable_function_descriptor() = function_descriptor->GetMessage();
message_->set_job_id(job_id.Binary());
+5 -2
View File
@@ -54,9 +54,12 @@ class RayFunction {
/// Options for all tasks (actor and non-actor) except for actor creation.
struct TaskOptions {
TaskOptions() {}
TaskOptions(int num_returns, std::unordered_map<std::string, double> &resources)
: num_returns(num_returns), resources(resources) {}
TaskOptions(std::string name, int num_returns,
std::unordered_map<std::string, double> &resources)
: name(name), num_returns(num_returns), resources(resources) {}
/// The name of this task.
std::string name;
/// Number of returns of this task.
int num_returns = 1;
/// Resources required by this task.
+28 -14
View File
@@ -32,15 +32,16 @@ const int kInternalHeartbeatMillis = 1000;
void BuildCommonTaskSpec(
ray::TaskSpecBuilder &builder, const JobID &job_id, const TaskID &task_id,
const TaskID &current_task_id, const int task_index, const TaskID &caller_id,
const ray::rpc::Address &address, const ray::RayFunction &function,
const std::string name, const TaskID &current_task_id, const int task_index,
const TaskID &caller_id, const ray::rpc::Address &address,
const ray::RayFunction &function,
const std::vector<std::unique_ptr<ray::TaskArg>> &args, uint64_t num_returns,
const std::unordered_map<std::string, double> &required_resources,
const std::unordered_map<std::string, double> &required_placement_resources,
std::vector<ObjectID> *return_ids, const ray::PlacementGroupID &placement_group_id) {
// Build common task spec.
builder.SetCommonTaskSpec(
task_id, function.GetLanguage(), function.GetFunctionDescriptor(), job_id,
task_id, name, function.GetLanguage(), function.GetFunctionDescriptor(), job_id,
current_task_id, task_index, caller_id, address, num_returns, required_resources,
required_placement_resources, placement_group_id);
// Set task arguments.
@@ -1268,8 +1269,11 @@ void CoreWorker::SubmitTask(const RayFunction &function,
auto constrained_resources = AddPlacementGroupConstraint(
task_options.resources, placement_options.first, placement_options.second);
const std::unordered_map<std::string, double> required_resources;
auto task_name = task_options.name.empty()
? function.GetFunctionDescriptor()->DefaultTaskName()
: task_options.name;
// TODO(ekl) offload task building onto a thread pool for performance
BuildCommonTaskSpec(builder, worker_context_.GetCurrentJobID(), task_id,
BuildCommonTaskSpec(builder, worker_context_.GetCurrentJobID(), task_id, task_name,
worker_context_.GetCurrentTaskID(), next_task_index, GetCallerId(),
rpc_address_, function, args, task_options.num_returns,
constrained_resources, required_resources, return_ids,
@@ -1310,16 +1314,21 @@ Status CoreWorker::CreateActor(const RayFunction &function,
auto new_resource = AddPlacementGroupConstraint(
actor_creation_options.resources, actor_creation_options.placement_options.first,
actor_creation_options.placement_options.second);
BuildCommonTaskSpec(builder, job_id, actor_creation_task_id,
const auto actor_name = actor_creation_options.name;
const auto task_name =
actor_name.empty()
? function.GetFunctionDescriptor()->DefaultTaskName()
: actor_name + ":" + function.GetFunctionDescriptor()->CallString();
BuildCommonTaskSpec(builder, job_id, actor_creation_task_id, task_name,
worker_context_.GetCurrentTaskID(), next_task_index, GetCallerId(),
rpc_address_, function, args, 1, new_resource,
new_placement_resources, &return_ids,
actor_creation_options.placement_options.first);
builder.SetActorCreationTaskSpec(
actor_id, actor_creation_options.max_restarts,
actor_creation_options.dynamic_worker_options,
actor_creation_options.max_concurrency, actor_creation_options.is_detached,
actor_creation_options.name, actor_creation_options.is_asyncio, extension_data);
builder.SetActorCreationTaskSpec(actor_id, actor_creation_options.max_restarts,
actor_creation_options.dynamic_worker_options,
actor_creation_options.max_concurrency,
actor_creation_options.is_detached, actor_name,
actor_creation_options.is_asyncio, extension_data);
// Add the actor handle before we submit the actor creation task, since the
// actor handle must be in scope by the time the GCS sends the
@@ -1340,7 +1349,7 @@ Status CoreWorker::CreateActor(const RayFunction &function,
if (task_spec.IsDetachedActor()) {
// Since local mode doesn't pass GCS actor management code path,
// it just register actor names in memory.
local_mode_named_actor_registry_.emplace(actor_creation_options.name, actor_id);
local_mode_named_actor_registry_.emplace(actor_name, actor_id);
}
ExecuteTaskLocalMode(task_spec);
} else {
@@ -1412,7 +1421,10 @@ void CoreWorker::SubmitActorTask(const ActorID &actor_id, const RayFunction &fun
worker_context_.GetCurrentJobID(), worker_context_.GetCurrentTaskID(),
next_task_index, actor_handle->GetActorID());
const std::unordered_map<std::string, double> required_resources;
BuildCommonTaskSpec(builder, actor_handle->CreationJobID(), actor_task_id,
const auto task_name = task_options.name.empty()
? function.GetFunctionDescriptor()->DefaultTaskName()
: task_options.name;
BuildCommonTaskSpec(builder, actor_handle->CreationJobID(), actor_task_id, task_name,
worker_context_.GetCurrentTaskID(), next_task_index, GetCallerId(),
rpc_address_, function, args, num_returns, task_options.resources,
required_resources, return_ids, PlacementGroupID::Nil());
@@ -1687,8 +1699,9 @@ Status CoreWorker::ExecuteTask(const TaskSpecification &task_spec,
CoreWorkerProcess::SetCurrentThreadWorkerId(GetWorkerID());
status = options_.task_execution_callback(
task_type, func, task_spec.GetRequiredResources().GetResourceMap(), args,
arg_reference_ids, return_ids, return_objects);
task_type, task_spec.GetName(), func,
task_spec.GetRequiredResources().GetResourceMap(), args, arg_reference_ids,
return_ids, return_objects);
absl::optional<rpc::Address> caller_address(
options_.is_local_mode ? absl::optional<rpc::Address>()
@@ -2143,6 +2156,7 @@ void CoreWorker::HandleGetCoreWorkerStats(const rpc::GetCoreWorkerStatsRequest &
stats->set_task_queue_length(task_queue_length_);
stats->set_num_executed_tasks(num_executed_tasks_);
stats->set_num_object_refs_in_scope(reference_counter_->NumObjectIDsInScope());
stats->set_current_task_name(current_task_.GetName());
stats->set_current_task_func_desc(current_task_.FunctionDescriptor()->ToString());
stats->set_ip_address(rpc_address_.ip_address());
stats->set_port(rpc_address_.port());
+1 -1
View File
@@ -57,7 +57,7 @@ struct CoreWorkerOptions {
// Callback that must be implemented and provided by the language-specific worker
// frontend to execute tasks and return their results.
using TaskExecutionCallback = std::function<Status(
TaskType task_type, const RayFunction &ray_function,
TaskType task_type, const std::string task_name, const RayFunction &ray_function,
const std::unordered_map<std::string, double> &required_resources,
const std::vector<std::shared_ptr<RayObject>> &args,
const std::vector<ObjectID> &arg_reference_ids,
@@ -105,7 +105,8 @@ JNIEXPORT void JNICALL Java_io_ray_runtime_RayNativeRuntime_nativeInitialize(
RayConfig::instance().initialize(raylet_config);
auto task_execution_callback =
[](ray::TaskType task_type, const ray::RayFunction &ray_function,
[](ray::TaskType task_type, const std::string task_name,
const ray::RayFunction &ray_function,
const std::unordered_map<std::string, double> &required_resources,
const std::vector<std::shared_ptr<ray::RayObject>> &args,
const std::vector<ObjectID> &arg_reference_ids,
@@ -90,13 +90,18 @@ inline std::unordered_map<std::string, double> ToResources(JNIEnv *env,
inline ray::TaskOptions ToTaskOptions(JNIEnv *env, jint numReturns, jobject callOptions) {
std::unordered_map<std::string, double> resources;
std::string name = "";
if (callOptions) {
jobject java_resources =
env->GetObjectField(callOptions, java_base_task_options_resources);
resources = ToResources(env, java_resources);
auto java_name = (jstring)env->GetObjectField(callOptions, java_call_options_name);
if (java_name) {
name = JavaStringToNativeString(env, java_name);
}
}
ray::TaskOptions task_options{numReturns, resources};
ray::TaskOptions task_options{name, numReturns, resources};
return task_options;
}
+7
View File
@@ -75,6 +75,9 @@ jfieldID java_function_arg_value;
jclass java_base_task_options_class;
jfieldID java_base_task_options_resources;
jclass java_call_options_class;
jfieldID java_call_options_name;
jclass java_actor_creation_options_class;
jfieldID java_actor_creation_options_global;
jfieldID java_actor_creation_options_name;
@@ -198,6 +201,10 @@ jint JNI_OnLoad(JavaVM *vm, void *reserved) {
java_base_task_options_resources =
env->GetFieldID(java_base_task_options_class, "resources", "Ljava/util/Map;");
java_call_options_class = LoadClass(env, "io/ray/api/options/CallOptions");
java_call_options_name =
env->GetFieldID(java_call_options_class, "name", "Ljava/lang/String;");
java_placement_group_class =
LoadClass(env, "io/ray/runtime/placementgroup/PlacementGroupImpl");
java_placement_group_id =
+5
View File
@@ -128,6 +128,11 @@ extern jclass java_base_task_options_class;
/// resources field of BaseTaskOptions class
extern jfieldID java_base_task_options_resources;
/// CallOptions class
extern jclass java_call_options_class;
/// name field of CallOptions class
extern jfieldID java_call_options_name;
/// ActorCreationOptions class
extern jclass java_actor_creation_options_class;
/// global field of ActorCreationOptions class
+8 -8
View File
@@ -226,7 +226,7 @@ bool CoreWorkerTest::WaitForDirectCallActorState(const ActorID &actor_id, bool w
int CoreWorkerTest::GetActorPid(const ActorID &actor_id,
std::unordered_map<std::string, double> &resources) {
std::vector<std::unique_ptr<TaskArg>> args;
TaskOptions options{1, resources};
TaskOptions options{"", 1, resources};
std::vector<ObjectID> return_ids;
RayFunction func{Language::PYTHON, ray::FunctionDescriptorBuilder::BuildPython(
"GetWorkerPid", "", "", "")};
@@ -308,7 +308,7 @@ void CoreWorkerTest::TestActorTask(std::unordered_map<std::string, double> &reso
args.emplace_back(new TaskArgByValue(
std::make_shared<RayObject>(buffer2, nullptr, std::vector<ObjectID>())));
TaskOptions options{1, resources};
TaskOptions options{"", 1, resources};
std::vector<ObjectID> return_ids;
RayFunction func(ray::Language::PYTHON, ray::FunctionDescriptorBuilder::BuildPython(
"MergeInputArgsAsOutput", "", "", ""));
@@ -350,7 +350,7 @@ void CoreWorkerTest::TestActorTask(std::unordered_map<std::string, double> &reso
args.emplace_back(new TaskArgByValue(
std::make_shared<RayObject>(buffer2, nullptr, std::vector<ObjectID>())));
TaskOptions options{1, resources};
TaskOptions options{"", 1, resources};
std::vector<ObjectID> return_ids;
RayFunction func(ray::Language::PYTHON, ray::FunctionDescriptorBuilder::BuildPython(
"MergeInputArgsAsOutput", "", "", ""));
@@ -412,7 +412,7 @@ void CoreWorkerTest::TestActorRestart(
args.emplace_back(new TaskArgByValue(
std::make_shared<RayObject>(buffer1, nullptr, std::vector<ObjectID>())));
TaskOptions options{1, resources};
TaskOptions options{"", 1, resources};
std::vector<ObjectID> return_ids;
RayFunction func(ray::Language::PYTHON, ray::FunctionDescriptorBuilder::BuildPython(
"MergeInputArgsAsOutput", "", "", ""));
@@ -455,7 +455,7 @@ void CoreWorkerTest::TestActorFailure(
args.emplace_back(new TaskArgByValue(
std::make_shared<RayObject>(buffer1, nullptr, std::vector<ObjectID>())));
TaskOptions options{1, resources};
TaskOptions options{"", 1, resources};
std::vector<ObjectID> return_ids;
RayFunction func(ray::Language::PYTHON, ray::FunctionDescriptorBuilder::BuildPython(
"MergeInputArgsAsOutput", "", "", ""));
@@ -539,12 +539,12 @@ TEST_F(ZeroNodeTest, TestTaskSpecPerf) {
RAY_LOG(INFO) << "start creating " << num_tasks << " PushTaskRequests";
rpc::Address address;
for (int i = 0; i < num_tasks; i++) {
TaskOptions options{1, resources};
TaskOptions options{"", 1, resources};
std::vector<ObjectID> return_ids;
auto num_returns = options.num_returns;
TaskSpecBuilder builder;
builder.SetCommonTaskSpec(RandomTaskId(), function.GetLanguage(),
builder.SetCommonTaskSpec(RandomTaskId(), options.name, function.GetLanguage(),
function.GetFunctionDescriptor(), job_id, RandomTaskId(), 0,
RandomTaskId(), address, num_returns, resources, resources,
PlacementGroupID::Nil());
@@ -587,7 +587,7 @@ TEST_F(SingleNodeTest, TestDirectActorTaskSubmissionPerf) {
args.emplace_back(new TaskArgByValue(
std::make_shared<RayObject>(buffer, nullptr, std::vector<ObjectID>())));
TaskOptions options{1, resources};
TaskOptions options{"", 1, resources};
std::vector<ObjectID> return_ids;
RayFunction func(ray::Language::PYTHON, ray::FunctionDescriptorBuilder::BuildPython(
"MergeInputArgsAsOutput", "", "", ""));
@@ -324,9 +324,10 @@ TaskSpecification BuildTaskSpec(const std::unordered_map<std::string, double> &r
const ray::FunctionDescriptor &function_descriptor) {
TaskSpecBuilder builder;
rpc::Address empty_address;
builder.SetCommonTaskSpec(TaskID::Nil(), Language::PYTHON, function_descriptor,
JobID::Nil(), TaskID::Nil(), 0, TaskID::Nil(), empty_address,
1, resources, resources, PlacementGroupID::Nil());
builder.SetCommonTaskSpec(TaskID::Nil(), "dummy_task", Language::PYTHON,
function_descriptor, JobID::Nil(), TaskID::Nil(), 0,
TaskID::Nil(), empty_address, 1, resources, resources,
PlacementGroupID::Nil());
return builder.Build();
}
+4 -3
View File
@@ -50,8 +50,8 @@ class MockWorker {
"", // driver_name
"", // stdout_file
"", // stderr_file
std::bind(&MockWorker::ExecuteTask, this, _1, _2, _3, _4, _5, _6,
_7), // task_execution_callback
std::bind(&MockWorker::ExecuteTask, this, _1, _2, _3, _4, _5, _6, _7,
_8), // task_execution_callback
nullptr, // check_signals
nullptr, // gc_collect
nullptr, // spill_objects
@@ -71,7 +71,8 @@ class MockWorker {
void RunTaskExecutionLoop() { CoreWorkerProcess::RunTaskExecutionLoop(); }
private:
Status ExecuteTask(TaskType task_type, const RayFunction &ray_function,
Status ExecuteTask(TaskType task_type, const std::string task_name,
const RayFunction &ray_function,
const std::unordered_map<std::string, double> &required_resources,
const std::vector<std::shared_ptr<RayObject>> &args,
const std::vector<ObjectID> &arg_reference_ids,
+4 -3
View File
@@ -38,9 +38,10 @@ struct Mocker {
auto actor_id = ActorID::Of(job_id, RandomTaskId(), 0);
auto task_id = TaskID::ForActorCreationTask(actor_id);
auto resource = std::unordered_map<std::string, double>();
builder.SetCommonTaskSpec(task_id, Language::PYTHON, empty_descriptor, job_id,
TaskID::Nil(), 0, TaskID::Nil(), owner_address, 1, resource,
resource, PlacementGroupID::Nil());
builder.SetCommonTaskSpec(task_id, name + ":" + empty_descriptor->CallString(),
Language::PYTHON, empty_descriptor, job_id, TaskID::Nil(),
0, TaskID::Nil(), owner_address, 1, resource, resource,
PlacementGroupID::Nil());
builder.SetActorCreationTaskSpec(actor_id, max_restarts, {}, 1, detached, name);
return builder.Build();
}
+34 -30
View File
@@ -150,42 +150,44 @@ message RayException {
// NOTE(review): inserting `name` as field 2 shifts the number of every field
// after it. That is only safe if no serialized TaskSpec is exchanged between
// binaries built from different versions of this file -- confirm.
message TaskSpec {
  // Type of this task.
  TaskType type = 1;
  // Name of this task.
  string name = 2;
  // Language of this task.
  Language language = 3;
  // Function descriptor of this task; it uniquely describes the function to execute.
  FunctionDescriptor function_descriptor = 4;
  // ID of the job that this task belongs to.
  bytes job_id = 5;
  // Task ID of the task.
  bytes task_id = 6;
  // Task ID of the parent task.
  bytes parent_task_id = 7;
  // A count of the number of tasks submitted by the parent task before this one.
  uint64 parent_counter = 8;
  // Task ID of the caller. This is the same as parent_task_id for non-actors.
  // This is the actor ID (embedded in a nil task ID) for actors.
  bytes caller_id = 9;
  // Address of the caller.
  Address caller_address = 10;
  // Task arguments.
  repeated TaskArg args = 11;
  // Number of return objects.
  uint64 num_returns = 12;
  // Quantities of the different resources required by this task.
  map<string, double> required_resources = 13;
  // The resources required for placing this task on a node. If this is empty,
  // then the placement resources are equal to the required_resources.
  map<string, double> required_placement_resources = 14;
  // Task specification for an actor creation task.
  // This field is only valid when `type == ACTOR_CREATION_TASK`.
  ActorCreationTaskSpec actor_creation_task_spec = 15;
  // Task specification for an actor task.
  // This field is only valid when `type == ACTOR_TASK`.
  ActorTaskSpec actor_task_spec = 16;
  // Number of times this task may be retried on worker failure.
  int32 max_retries = 17;
  // Placement group that is associated with this task.
  bytes placement_group_id = 18;
}
message Bundle {
@@ -358,34 +360,36 @@ message CoreWorkerStats {
int32 num_pending_tasks = 2;
// Number of object refs in local scope.
int32 num_object_refs_in_scope = 3;
// Name of the currently executing task.
string current_task_name = 4;
// String representation of the function descriptor of the currently executing task.
string current_task_func_desc = 4;
string current_task_func_desc = 5;
// IP address of the core worker.
string ip_address = 6;
string ip_address = 7;
// Port of the core worker.
int64 port = 7;
int64 port = 8;
// Actor ID.
bytes actor_id = 8;
bytes actor_id = 9;
// A map from the resource name (e.g. "CPU") to its allocation.
map<string, ResourceAllocations> used_resources = 9;
map<string, ResourceAllocations> used_resources = 10;
// A string displayed on Dashboard.
map<string, string> webui_display = 10;
map<string, string> webui_display = 11;
// Number of objects that are IN_PLASMA_ERROR in the local memory store.
int32 num_in_plasma = 11;
int32 num_in_plasma = 12;
// Number of objects stored in local memory.
int32 num_local_objects = 12;
int32 num_local_objects = 13;
// Used local object store memory.
int64 used_object_store_memory = 13;
int64 used_object_store_memory = 14;
// Length of the task queue.
int32 task_queue_length = 14;
int32 task_queue_length = 15;
// Number of executed tasks.
int32 num_executed_tasks = 15;
int32 num_executed_tasks = 16;
// Actor constructor.
string actor_title = 16;
string actor_title = 17;
// Local reference table.
repeated ObjectRefInfo object_refs = 17;
repeated ObjectRefInfo object_refs = 18;
// Job ID.
bytes job_id = 18;
bytes job_id = 19;
}
message MetricPoint {
@@ -264,7 +264,7 @@ Task CreateTask(const std::unordered_map<std::string, double> &required_resource
TaskID id = RandomTaskId();
JobID job_id = RandomJobId();
rpc::Address address;
spec_builder.SetCommonTaskSpec(id, Language::PYTHON,
spec_builder.SetCommonTaskSpec(id, "dummy_task", Language::PYTHON,
FunctionDescriptorBuilder::BuildPython("", "", "", ""),
job_id, TaskID::Nil(), 0, TaskID::Nil(), address, 0,
required_resources, {}, PlacementGroupID::Nil());
@@ -489,4 +489,4 @@ int main(int argc, char **argv) {
}
} // namespace raylet
} // namespace ray
} // namespace ray
@@ -65,7 +65,7 @@ static inline Task ExampleTask(const std::vector<ObjectID> &arguments,
uint64_t num_returns) {
TaskSpecBuilder builder;
rpc::Address address;
builder.SetCommonTaskSpec(RandomTaskId(), Language::PYTHON,
builder.SetCommonTaskSpec(RandomTaskId(), "example_task", Language::PYTHON,
FunctionDescriptorBuilder::BuildPython("", "", "", ""),
JobID::Nil(), RandomTaskId(), 0, RandomTaskId(), address,
num_returns, {}, {}, PlacementGroupID::Nil());
+2 -1
View File
@@ -12,7 +12,8 @@ void Transport::SendInternal(std::shared_ptr<LocalMemoryBuffer> buffer,
RayFunction &function, int return_num,
std::vector<ObjectID> &return_ids) {
std::unordered_map<std::string, double> resources;
TaskOptions options{return_num, resources};
std::string name = function.GetFunctionDescriptor()->DefaultTaskName();
TaskOptions options{name, return_num, resources};
char meta_data[3] = {'R', 'A', 'W'};
std::shared_ptr<LocalMemoryBuffer> meta =
+4 -3
View File
@@ -496,8 +496,8 @@ class StreamingWorker {
"", // driver_name
"", // stdout_file
"", // stderr_file
std::bind(&StreamingWorker::ExecuteTask, this, _1, _2, _3, _4, _5, _6,
_7), // task_execution_callback
std::bind(&StreamingWorker::ExecuteTask, this, _1, _2, _3, _4, _5, _6, _7,
_8), // task_execution_callback
nullptr, // check_signals
nullptr, // gc_collect
nullptr, // spill_objects
@@ -521,7 +521,8 @@ class StreamingWorker {
}
private:
Status ExecuteTask(TaskType task_type, const RayFunction &ray_function,
Status ExecuteTask(TaskType task_type, const std::string task_name,
const RayFunction &ray_function,
const std::unordered_map<std::string, double> &required_resources,
const std::vector<std::shared_ptr<RayObject>> &args,
const std::vector<ObjectID> &arg_reference_ids,
+3 -3
View File
@@ -87,7 +87,7 @@ class StreamingQueueTestBase : public ::testing::TestWithParam<uint64_t> {
args.emplace_back(new TaskArgByValue(std::make_shared<RayObject>(
msg.ToBytes(), nullptr, std::vector<ObjectID>(), true)));
std::unordered_map<std::string, double> resources;
TaskOptions options{0, resources};
TaskOptions options{"", 0, resources};
std::vector<ObjectID> return_ids;
RayFunction func{ray::Language::PYTHON,
ray::FunctionDescriptorBuilder::BuildPython("", "", "init", "")};
@@ -103,7 +103,7 @@ class StreamingQueueTestBase : public ::testing::TestWithParam<uint64_t> {
args.emplace_back(new TaskArgByValue(
std::make_shared<RayObject>(buffer, nullptr, std::vector<ObjectID>(), true)));
std::unordered_map<std::string, double> resources;
TaskOptions options{0, resources};
TaskOptions options{"", 0, resources};
std::vector<ObjectID> return_ids;
RayFunction func{ray::Language::PYTHON, ray::FunctionDescriptorBuilder::BuildPython(
"", test, "execute_test", "")};
@@ -119,7 +119,7 @@ class StreamingQueueTestBase : public ::testing::TestWithParam<uint64_t> {
args.emplace_back(new TaskArgByValue(
std::make_shared<RayObject>(buffer, nullptr, std::vector<ObjectID>(), true)));
std::unordered_map<std::string, double> resources;
TaskOptions options{1, resources};
TaskOptions options{"", 1, resources};
std::vector<ObjectID> return_ids;
RayFunction func{ray::Language::PYTHON, ray::FunctionDescriptorBuilder::BuildPython(
"", "", "check_current_test_status", "")};