diff --git a/cpp/src/ray/runtime/abstract_ray_runtime.cc b/cpp/src/ray/runtime/abstract_ray_runtime.cc index 92125a580..d2016863c 100644 --- a/cpp/src/ray/runtime/abstract_ray_runtime.cc +++ b/cpp/src/ray/runtime/abstract_ray_runtime.cc @@ -67,8 +67,9 @@ WaitResult AbstractRayRuntime::Wait(const std::vector &ids, int num_ob ObjectID AbstractRayRuntime::Call(RemoteFunctionPtrHolder &fptr, std::shared_ptr args) { InvocationSpec invocationSpec; - invocationSpec.task_id = - TaskID::ForFakeTask(); // TODO(Guyang Song): make it from different task + // TODO(Guyang Song): make it from different task + invocationSpec.task_id = TaskID::ForFakeTask(); + invocationSpec.name = ""; invocationSpec.actor_id = ActorID::Nil(); invocationSpec.args = args; invocationSpec.func_offset = @@ -87,8 +88,9 @@ ObjectID AbstractRayRuntime::CallActor(const RemoteFunctionPtrHolder &fptr, const ActorID &actor, std::shared_ptr args) { InvocationSpec invocationSpec; - invocationSpec.task_id = - TaskID::ForFakeTask(); // TODO(Guyang Song): make it from different task + // TODO(Guyang Song): make it from different task + invocationSpec.task_id = TaskID::ForFakeTask(); + invocationSpec.name = ""; invocationSpec.actor_id = actor; invocationSpec.args = args; invocationSpec.func_offset = diff --git a/cpp/src/ray/runtime/task/invocation_spec.h b/cpp/src/ray/runtime/task/invocation_spec.h index b131f2fd0..b436ef5fc 100644 --- a/cpp/src/ray/runtime/task/invocation_spec.h +++ b/cpp/src/ray/runtime/task/invocation_spec.h @@ -11,6 +11,7 @@ namespace api { class InvocationSpec { public: TaskID task_id; + std::string name; ActorID actor_id; int actor_counter; /// Remote function offset from base address. diff --git a/cpp/src/ray/runtime/task/local_mode_task_submitter.cc b/cpp/src/ray/runtime/task/local_mode_task_submitter.cc index 6603dc1b6..8f2f28f2d 100644 --- a/cpp/src/ray/runtime/task/local_mode_task_submitter.cc +++ b/cpp/src/ray/runtime/task/local_mode_task_submitter.cc @@ -30,8 +30,10 @@ ObjectID LocalModeTaskSubmitter::Submit(const InvocationSpec &invocation, TaskTy std::unordered_map required_resources; std::unordered_map required_placement_resources; TaskSpecBuilder builder; - builder.SetCommonTaskSpec(invocation.task_id, rpc::Language::CPP, functionDescriptor, - local_mode_ray_tuntime_.GetCurrentJobID(), + std::string task_name = + invocation.name.empty() ? functionDescriptor->DefaultTaskName() : invocation.name; + builder.SetCommonTaskSpec(invocation.task_id, task_name, rpc::Language::CPP, + functionDescriptor, local_mode_ray_tuntime_.GetCurrentJobID(), local_mode_ray_tuntime_.GetCurrentTaskId(), 0, local_mode_ray_tuntime_.GetCurrentTaskId(), address, 1, required_resources, required_placement_resources, diff --git a/cpp/src/ray/worker/default_worker.cc b/cpp/src/ray/worker/default_worker.cc index 2b841012e..81104509d 100644 --- a/cpp/src/ray/worker/default_worker.cc +++ b/cpp/src/ray/worker/default_worker.cc @@ -29,8 +29,8 @@ class DefaultWorker { "", // driver_name "", // stdout_file "", // stderr_file - std::bind(&DefaultWorker::ExecuteTask, this, _1, _2, _3, _4, _5, _6, - _7), // task_execution_callback + std::bind(&DefaultWorker::ExecuteTask, this, _1, _2, _3, _4, _5, _6, _7, + _8), // task_execution_callback nullptr, // check_signals nullptr, // gc_collect nullptr, // spill_objects @@ -51,7 +51,8 @@ class DefaultWorker { void RunTaskExecutionLoop() { CoreWorkerProcess::RunTaskExecutionLoop(); } private: - Status ExecuteTask(TaskType task_type, const RayFunction &ray_function, + Status ExecuteTask(TaskType task_type, const std::string task_name, + const RayFunction &ray_function, const std::unordered_map &required_resources, const std::vector> &args, const std::vector &arg_reference_ids, diff --git a/doc/source/advanced.rst b/doc/source/advanced.rst index 52ce58bd2..3083fd9db 100644 --- a/doc/source/advanced.rst +++ b/doc/source/advanced.rst @@ -51,6 +51,26 @@ And vary the number of return values for tasks (and actor methods too): assert ray.get(id1) == 0 assert ray.get(id2) == 1 +And specify a name for tasks (and actor methods too) at task submission time: + +.. code-block:: python + + import setproctitle + + @ray.remote + def f(x): + assert setproctitle.getproctitle() == "ray::special_f" + return x + 1 + + obj = f.options(name="special_f").remote(3) + assert ray.get(obj) == 4 + +This name will appear as the task name in the machine view of the dashboard, will appear +as the worker process name when this task is executing (if a Python task), and will +appear as the task name in the logs. + +.. image:: images/task_name_dashboard.png + Dynamic Custom Resources ------------------------ diff --git a/doc/source/images/task_name_dashboard.png b/doc/source/images/task_name_dashboard.png new file mode 100644 index 000000000..4465e557b Binary files /dev/null and b/doc/source/images/task_name_dashboard.png differ diff --git a/java/api/src/main/java/io/ray/api/call/BaseTaskCaller.java b/java/api/src/main/java/io/ray/api/call/BaseTaskCaller.java index 0da239ca4..88c58e053 100644 --- a/java/api/src/main/java/io/ray/api/call/BaseTaskCaller.java +++ b/java/api/src/main/java/io/ray/api/call/BaseTaskCaller.java @@ -12,9 +12,20 @@ public class BaseTaskCaller> { private CallOptions.Builder builder = new CallOptions.Builder(); /** - * Set a custom resource requirement for resource {@code name}. - * This method can be called multiple times. If the same resource is set multiple times, - * the latest quantity will be used. + * Set a name for this task. + * + * @param name task name + * @return self + * @see CallOptions.Builder#setName(java.lang.String) + */ + public T setName(String name) { + builder.setName(name); + return self(); + } + + /** + * Set a custom resource requirement for resource {@code name}. This method can be called multiple + * times. If the same resource is set multiple times, the latest quantity will be used. * * @param name resource name * @param value resource capacity @@ -27,9 +38,8 @@ public class BaseTaskCaller> { } /** - * Set custom requirements for multiple resources. - * This method can be called multiple times. If the same resource is set multiple times, - * the latest quantity will be used. + * Set custom requirements for multiple resources. This method can be called multiple times. If + * the same resource is set multiple times, the latest quantity will be used. * * @param resources requirements for multiple resources. * @return self @@ -48,5 +58,4 @@ public class BaseTaskCaller> { protected CallOptions buildOptions() { return builder.build(); } - } diff --git a/java/api/src/main/java/io/ray/api/options/CallOptions.java b/java/api/src/main/java/io/ray/api/options/CallOptions.java index 7f086a267..37e474d55 100644 --- a/java/api/src/main/java/io/ray/api/options/CallOptions.java +++ b/java/api/src/main/java/io/ray/api/options/CallOptions.java @@ -3,26 +3,36 @@ package io.ray.api.options; import java.util.HashMap; import java.util.Map; -/** - * The options for RayCall. - */ +/** The options for RayCall. */ public class CallOptions extends BaseTaskOptions { - private CallOptions(Map resources) { + public final String name; + + private CallOptions(String name, Map resources) { super(resources); + this.name = name; } - /** - * This inner class for building CallOptions. - */ + /** This inner class for building CallOptions. */ public static class Builder { + private String name; private Map resources = new HashMap<>(); /** - * Set a custom resource requirement for resource {@code name}. - * This method can be called multiple times. If the same resource is set multiple times, - * the latest quantity will be used. + * Set a name for this task. + * + * @param name task name + * @return self + */ + public Builder setName(String name) { + this.name = name; + return this; + } + + /** + * Set a custom resource requirement for resource {@code name}. This method can be called + * multiple times. If the same resource is set multiple times, the latest quantity will be used. * * @param name resource name * @param value resource capacity @@ -34,9 +44,8 @@ public class CallOptions extends BaseTaskOptions { } /** - * Set custom requirements for multiple resources. - * This method can be called multiple times. If the same resource is set multiple times, - * the latest quantity will be used. + * Set custom requirements for multiple resources. This method can be called multiple times. If + * the same resource is set multiple times, the latest quantity will be used. * * @param resources requirements for multiple resources. * @return self @@ -47,7 +56,7 @@ public class CallOptions extends BaseTaskOptions { } public CallOptions build() { - return new CallOptions(resources); + return new CallOptions(name, resources); } } } diff --git a/java/test/src/main/java/io/ray/test/TaskNameTest.java b/java/test/src/main/java/io/ray/test/TaskNameTest.java new file mode 100644 index 000000000..502bb80d3 --- /dev/null +++ b/java/test/src/main/java/io/ray/test/TaskNameTest.java @@ -0,0 +1,19 @@ +package io.ray.test; + +import io.ray.api.Ray; +import org.testng.Assert; +import org.testng.annotations.Test; + +/** Task Name Test. */ +public class TaskNameTest extends BaseTest { + + private static int testFoo() { + return 0; + } + + /** Test setting task name at task submission time. */ + @Test + public void testSetName() { + Assert.assertEquals(0, (int) Ray.task(TaskNameTest::testFoo).setName("foo").remote().get()); + } +} diff --git a/python/ray/_raylet.pyx b/python/ray/_raylet.pyx index b5c4722d2..110e6f881 100644 --- a/python/ray/_raylet.pyx +++ b/python/ray/_raylet.pyx @@ -343,6 +343,7 @@ def switch_worker_log_if_needed(worker, next_job_id): cdef execute_task( CTaskType task_type, + const c_string name, const CRayFunction &ray_function, const unordered_map[c_string, double] &c_resources, const c_vector[shared_ptr[CRayObject]] &c_args, @@ -386,16 +387,18 @@ cdef execute_task( extra_data = (b'{"name": ' + function_name.encode("ascii") + b' "task_id": ' + task_id.hex().encode("ascii") + b'}') + task_name = name.decode("utf-8") + title = f"ray::{task_name}" + if task_type == TASK_TYPE_NORMAL_TASK: - title = "ray::{}()".format(function_name) next_title = "ray::IDLE" function_executor = execution_info.function else: actor = worker.actors[core_worker.get_actor_id()] class_name = actor.__class__.__name__ - title = "ray::{}.{}()".format(class_name, function_name) - next_title = "ray::{}".format(class_name) - worker_name = "ray_{}_{}".format(class_name, os.getpid()) + next_title = f"ray::{class_name}" + pid = os.getpid() + worker_name = f"ray_{class_name}_{pid}" if c_resources.find(b"memory") != c_resources.end(): worker.memory_monitor.set_heap_limit( worker_name, @@ -470,8 +473,7 @@ cdef execute_task( if (task_type == TASK_TYPE_ACTOR_CREATION_TASK): actor = worker.actors[core_worker.get_actor_id()] class_name = actor.__class__.__name__ - actor_title = "{}({}, {})".format( - class_name, repr(args), repr(kwargs)) + actor_title = f"{class_name}({args!r}, {kwargs!r})" core_worker.set_actor_title(actor_title.encode("utf-8")) # Execute the task. with core_worker.profile_event(b"task:execute"): @@ -535,6 +537,7 @@ cdef execute_task( cdef CRayStatus task_execution_handler( CTaskType task_type, + const c_string task_name, const CRayFunction &ray_function, const unordered_map[c_string, double] &c_resources, const c_vector[shared_ptr[CRayObject]] &c_args, @@ -547,8 +550,9 @@ cdef CRayStatus task_execution_handler( try: # The call to execute_task should never raise an exception. If # it does, that indicates that there was an internal error. - execute_task(task_type, ray_function, c_resources, c_args, - c_arg_reference_ids, c_return_ids, returns) + execute_task(task_type, task_name, ray_function, c_resources, + c_args, c_arg_reference_ids, c_return_ids, + returns) except Exception: traceback_str = traceback.format_exc() + ( "An unexpected internal error occurred while the worker " @@ -985,6 +989,7 @@ cdef class CoreWorker: Language language, FunctionDescriptor function_descriptor, args, + c_string name, int num_returns, resources, int max_retries, @@ -1002,7 +1007,7 @@ cdef class CoreWorker: with self.profile_event(b"submit_task"): prepare_resources(resources, &c_resources) task_options = CTaskOptions( - num_returns, c_resources) + name, num_returns, c_resources) ray_function = CRayFunction( language.lang, function_descriptor.descriptor) prepare_args(self, language, args, &args_vector) @@ -1112,6 +1117,7 @@ cdef class CoreWorker: ActorID actor_id, FunctionDescriptor function_descriptor, args, + c_string name, int num_returns, double num_method_cpus): @@ -1126,7 +1132,7 @@ cdef class CoreWorker: with self.profile_event(b"submit_task"): if num_method_cpus > 0: c_resources[b"CPU"] = num_method_cpus - task_options = CTaskOptions(num_returns, c_resources) + task_options = CTaskOptions(name, num_returns, c_resources) ray_function = CRayFunction( language.lang, function_descriptor.descriptor) prepare_args(self, language, args, &args_vector) diff --git a/python/ray/actor.py b/python/ray/actor.py index 7a60ec1ec..bc6959d7d 100644 --- a/python/ray/actor.py +++ b/python/ray/actor.py @@ -100,7 +100,27 @@ class ActorMethod: def remote(self, *args, **kwargs): return self._remote(args, kwargs) - def _remote(self, args=None, kwargs=None, num_returns=None): + def options(self, **options): + """Convenience method for executing an actor method call with options. + + Same arguments as func._remote(), but returns a wrapped function + that a non-underscore .remote() can be called on. + + Examples: + # The following two calls are equivalent. + >>> actor.my_method._remote(args=[x, y], name="foo", num_returns=2) + >>> actor.my_method.options(name="foo", num_returns=2).remote(x, y) + """ + + func_cls = self + + class FuncWrapper: + def remote(self, *args, **kwargs): + return func_cls._remote(args=args, kwargs=kwargs, **options) + + return FuncWrapper() + + def _remote(self, args=None, kwargs=None, name="", num_returns=None): if num_returns is None: num_returns = self._num_returns @@ -112,6 +132,7 @@ class ActorMethod: self._method_name, args=args, kwargs=kwargs, + name=name, num_returns=num_returns) # Apply the decorator if there is one. @@ -317,8 +338,10 @@ class ActorClass: max_task_retries, num_cpus, num_gpus, memory, object_store_memory, resources): for attribute in [ - "remote", "_remote", "_ray_from_modified_class", - "_ray_from_function_descriptor" + "remote", + "_remote", + "_ray_from_modified_class", + "_ray_from_function_descriptor", ]: if hasattr(modified_class, attribute): logger.warning("Creating an actor from class " @@ -679,6 +702,7 @@ class ActorHandle: method_name, args=None, kwargs=None, + name="", num_returns=None): """Method execution stub for an actor handle. @@ -691,6 +715,7 @@ class ActorHandle: method_name: The name of the actor method to execute. args: A list of arguments for the actor method. kwargs: A dictionary of keyword arguments for the actor method. + name (str): The name to give the actor method call task. num_returns (int): The number of return values for the method. Returns: @@ -724,7 +749,7 @@ class ActorHandle: object_refs = worker.core_worker.submit_actor_task( self._ray_actor_language, self._ray_actor_id, function_descriptor, - list_args, num_returns, self._ray_actor_method_cpus) + list_args, name, num_returns, self._ray_actor_method_cpus) if len(object_refs) == 1: object_refs = object_refs[0] diff --git a/python/ray/includes/common.pxd b/python/ray/includes/common.pxd index de489a9fe..d234c356c 100644 --- a/python/ray/includes/common.pxd +++ b/python/ray/includes/common.pxd @@ -241,7 +241,7 @@ cdef extern from "ray/core_worker/common.h" nogil: cdef cppclass CTaskOptions "ray::TaskOptions": CTaskOptions() - CTaskOptions(int num_returns, + CTaskOptions(c_string name, int num_returns, unordered_map[c_string, double] &resources) cdef cppclass CActorCreationOptions "ray::ActorCreationOptions": diff --git a/python/ray/includes/libcoreworker.pxd b/python/ray/includes/libcoreworker.pxd index c2cb1f575..f7a0d14c5 100644 --- a/python/ray/includes/libcoreworker.pxd +++ b/python/ray/includes/libcoreworker.pxd @@ -217,6 +217,7 @@ cdef extern from "ray/core_worker/core_worker.h" nogil: c_string stderr_file (CRayStatus( CTaskType task_type, + const c_string name, const CRayFunction &ray_function, const unordered_map[c_string, double] &resources, const c_vector[shared_ptr[CRayObject]] &args, diff --git a/python/ray/remote_function.py b/python/ray/remote_function.py index 02a4735a2..7ddad8a0a 100644 --- a/python/ray/remote_function.py +++ b/python/ray/remote_function.py @@ -152,7 +152,8 @@ class RemoteFunction: resources=None, max_retries=None, placement_group=None, - placement_group_bundle_index=-1): + placement_group_bundle_index=-1, + name=""): """Submit the remote function for execution.""" worker = ray.worker.global_worker worker.check_connected() @@ -212,7 +213,7 @@ class RemoteFunction: "Cross language remote function " \ "cannot be executed locally." object_refs = worker.core_worker.submit_task( - self._language, self._function_descriptor, list_args, + self._language, self._function_descriptor, list_args, name, num_returns, resources, max_retries, placement_group.id, placement_group_bundle_index) diff --git a/python/ray/tests/test_actor.py b/python/ray/tests/test_actor.py index 6b7cc662b..5d885715d 100644 --- a/python/ray/tests/test_actor.py +++ b/python/ray/tests/test_actor.py @@ -14,6 +14,10 @@ import ray import ray.test_utils import ray.cluster_utils +# NOTE: We have to import setproctitle after ray because we bundle setproctitle +# with ray. +import setproctitle + def test_caching_actors(shutdown_only): # Test defining actors before ray.init() has been called. @@ -673,6 +677,33 @@ def test_multiple_return_values(ray_start_regular_shared): assert ray.get([id3a, id3b, id3c]) == [1, 2, 3] +def test_options_num_returns(ray_start_regular_shared): + @ray.remote + class Foo: + def method(self): + return 1, 2 + + f = Foo.remote() + + obj = f.method.remote() + assert ray.get(obj) == (1, 2) + + obj1, obj2 = f.method.options(num_returns=2).remote() + assert ray.get([obj1, obj2]) == [1, 2] + + +def test_options_name(ray_start_regular_shared): + @ray.remote + class Foo: + def method(self, name): + assert setproctitle.getproctitle() == f"ray::{name}" + + f = Foo.remote() + + ray.get(f.method.options(name="foo").remote("foo")) + ray.get(f.method.options(name="bar").remote("bar")) + + def test_define_actor(ray_start_regular_shared): @ray.remote class Test: diff --git a/python/ray/tests/test_advanced_3.py b/python/ray/tests/test_advanced_3.py index ffb6e0cea..14ef5b634 100644 --- a/python/ray/tests/test_advanced_3.py +++ b/python/ray/tests/test_advanced_3.py @@ -35,7 +35,7 @@ def attempt_to_load_balance(remote_function, [remote_function.remote(*args) for _ in range(total_tasks)]) names = set(locations) counts = [locations.count(name) for name in names] - logger.info("Counts are {}.".format(counts)) + logger.info(f"Counts are {counts}.") if (len(names) == num_nodes and all(count >= minimum_count for count in counts)): break @@ -346,6 +346,28 @@ def test_ray_setproctitle(ray_start_2_cpus): ray.get(unique_1.remote()) +def test_ray_task_name_setproctitle(ray_start_2_cpus): + method_task_name = "foo" + + @ray.remote + class UniqueName: + def __init__(self): + assert setproctitle.getproctitle() == "ray::UniqueName.__init__()" + + def f(self): + assert setproctitle.getproctitle() == f"ray::{method_task_name}" + + task_name = "bar" + + @ray.remote + def unique_1(): + assert task_name in setproctitle.getproctitle() + + actor = UniqueName.remote() + ray.get(actor.f.options(name=method_task_name).remote()) + ray.get(unique_1.options(name=task_name).remote()) + + @pytest.mark.skipif( os.getenv("TRAVIS") is None, reason="This test should only be run on Travis.") @@ -508,7 +530,7 @@ def test_invalid_unicode_in_worker_log(shutdown_only): # Wait till first worker log file is created. while True: - log_file_paths = glob.glob("{}/worker*.out".format(logs_dir)) + log_file_paths = glob.glob(f"{logs_dir}/worker*.out") if len(log_file_paths) == 0: time.sleep(0.2) else: @@ -546,13 +568,13 @@ def test_move_log_files_to_old(shutdown_only): # Make sure no log files are in the "old" directory before the actors # are killed. - assert len(glob.glob("{}/old/worker*.out".format(logs_dir))) == 0 + assert len(glob.glob(f"{logs_dir}/old/worker*.out")) == 0 # Now kill the actors so the files get moved to logs/old/. [a.__ray_terminate__.remote() for a in actors] while True: - log_file_paths = glob.glob("{}/old/worker*.out".format(logs_dir)) + log_file_paths = glob.glob(f"{logs_dir}/old/worker*.out") if len(log_file_paths) > 0: with open(log_file_paths[0], "r") as f: assert "function f finished\n" in f.readlines() @@ -641,7 +663,7 @@ Blacklisted: No """ constraints_dict = resource_spec._constraints_from_gpu_info(info_string) expected_dict = { - "{}V100".format(ray_constants.RESOURCE_CONSTRAINT_PREFIX): 1 + f"{ray_constants.RESOURCE_CONSTRAINT_PREFIX}V100": 1, } assert constraints_dict == expected_dict @@ -658,7 +680,7 @@ Blacklisted: No """ constraints_dict = resource_spec._constraints_from_gpu_info(info_string) expected_dict = { - "{}T4".format(ray_constants.RESOURCE_CONSTRAINT_PREFIX): 1 + f"{ray_constants.RESOURCE_CONSTRAINT_PREFIX}T4": 1, } assert constraints_dict == expected_dict diff --git a/src/ray/common/function_descriptor.h b/src/ray/common/function_descriptor.h index 17abcdc2c..149a4063d 100644 --- a/src/ray/common/function_descriptor.h +++ b/src/ray/common/function_descriptor.h @@ -51,7 +51,14 @@ class FunctionDescriptorInterface : public MessageWrapper Subtype *As() { @@ -79,6 +86,8 @@ class EmptyFunctionDescriptor : public FunctionDescriptorInterface { inline bool operator!=(const EmptyFunctionDescriptor &other) const { return false; } virtual std::string ToString() const { return "{type=EmptyFunctionDescriptor}"; } + + virtual std::string CallString() const { return ""; } }; class JavaFunctionDescriptor : public FunctionDescriptorInterface { @@ -120,6 +129,12 @@ class JavaFunctionDescriptor : public FunctionDescriptorInterface { ", signature=" + typed_message_->signature() + "}"; } + virtual std::string CallString() const { + const std::string &class_name = typed_message_->class_name(); + const std::string &function_name = typed_message_->function_name(); + return class_name.empty() ? function_name : class_name + "." + function_name; + } + const std::string &ClassName() const { return typed_message_->class_name(); } const std::string &FunctionName() const { return typed_message_->function_name(); } @@ -174,8 +189,13 @@ class PythonFunctionDescriptor : public FunctionDescriptorInterface { } virtual std::string CallSiteString() const { - return typed_message_->module_name() + "." + typed_message_->class_name() + "." + - typed_message_->function_name(); + return typed_message_->module_name() + "." + CallString(); + } + + virtual std::string CallString() const { + const std::string &class_name = typed_message_->class_name(); + const std::string &function_name = typed_message_->function_name(); + return class_name.empty() ? function_name : class_name + "." + function_name; } const std::string &ModuleName() const { return typed_message_->module_name(); } @@ -229,6 +249,12 @@ class CppFunctionDescriptor : public FunctionDescriptorInterface { ", exec_function_offset=" + typed_message_->exec_function_offset() + "}"; } + virtual std::string CallString() const { + return typed_message_->lib_name() + "+" + typed_message_->function_offset(); + } + + virtual std::string DefaultTaskName() const { return CallString(); } + const std::string &LibName() const { return typed_message_->lib_name(); } const std::string &FunctionOffset() const { return typed_message_->function_offset(); } diff --git a/src/ray/common/task/task_spec.cc b/src/ray/common/task/task_spec.cc index e9fa7ab4e..8f880e077 100644 --- a/src/ray/common/task/task_spec.cc +++ b/src/ray/common/task/task_spec.cc @@ -192,6 +192,8 @@ bool TaskSpecification::IsDriverTask() const { return message_->type() == TaskType::DRIVER_TASK; } +const std::string TaskSpecification::GetName() const { return message_->name(); } + Language TaskSpecification::GetLanguage() const { return message_->language(); } bool TaskSpecification::IsNormalTask() const { @@ -299,8 +301,9 @@ std::string TaskSpecification::DebugString() const { // Print function descriptor. stream << FunctionDescriptor()->ToString(); - stream << ", task_id=" << TaskId() << ", job_id=" << JobId() - << ", num_args=" << NumArgs() << ", num_returns=" << NumReturns(); + stream << ", task_id=" << TaskId() << ", task_name=" << GetName() + << ", job_id=" << JobId() << ", num_args=" << NumArgs() + << ", num_returns=" << NumReturns(); if (IsActorCreationTask()) { // Print actor creation task spec. diff --git a/src/ray/common/task/task_spec.h b/src/ray/common/task/task_spec.h index e300c7624..d079627ee 100644 --- a/src/ray/common/task/task_spec.h +++ b/src/ray/common/task/task_spec.h @@ -135,6 +135,9 @@ class TaskSpecification : public MessageWrapper { Language GetLanguage() const; + // Returns the task's name. + const std::string GetName() const; + /// Whether this task is a normal task. bool IsNormalTask() const; diff --git a/src/ray/common/task/task_util.h b/src/ray/common/task/task_util.h index c0670fd2b..195e5c8dd 100644 --- a/src/ray/common/task/task_util.h +++ b/src/ray/common/task/task_util.h @@ -80,7 +80,7 @@ class TaskSpecBuilder { /// /// \return Reference to the builder object itself. TaskSpecBuilder &SetCommonTaskSpec( - const TaskID &task_id, const Language &language, + const TaskID &task_id, const std::string name, const Language &language, const ray::FunctionDescriptor &function_descriptor, const JobID &job_id, const TaskID &parent_task_id, uint64_t parent_counter, const TaskID &caller_id, const rpc::Address &caller_address, uint64_t num_returns, @@ -88,6 +88,7 @@ class TaskSpecBuilder { const std::unordered_map &required_placement_resources, const PlacementGroupID &placement_group_id) { message_->set_type(TaskType::NORMAL_TASK); + message_->set_name(name); message_->set_language(language); *message_->mutable_function_descriptor() = function_descriptor->GetMessage(); message_->set_job_id(job_id.Binary()); diff --git a/src/ray/core_worker/common.h b/src/ray/core_worker/common.h index 2cd161552..f2b1765c6 100644 --- a/src/ray/core_worker/common.h +++ b/src/ray/core_worker/common.h @@ -54,9 +54,12 @@ class RayFunction { /// Options for all tasks (actor and non-actor) except for actor creation. struct TaskOptions { TaskOptions() {} - TaskOptions(int num_returns, std::unordered_map &resources) - : num_returns(num_returns), resources(resources) {} + TaskOptions(std::string name, int num_returns, + std::unordered_map &resources) + : name(name), num_returns(num_returns), resources(resources) {} + /// The name of this task. + std::string name; /// Number of returns of this task. int num_returns = 1; /// Resources required by this task. diff --git a/src/ray/core_worker/core_worker.cc b/src/ray/core_worker/core_worker.cc index abd3d7712..9315e07ea 100644 --- a/src/ray/core_worker/core_worker.cc +++ b/src/ray/core_worker/core_worker.cc @@ -32,15 +32,16 @@ const int kInternalHeartbeatMillis = 1000; void BuildCommonTaskSpec( ray::TaskSpecBuilder &builder, const JobID &job_id, const TaskID &task_id, - const TaskID ¤t_task_id, const int task_index, const TaskID &caller_id, - const ray::rpc::Address &address, const ray::RayFunction &function, + const std::string name, const TaskID ¤t_task_id, const int task_index, + const TaskID &caller_id, const ray::rpc::Address &address, + const ray::RayFunction &function, const std::vector> &args, uint64_t num_returns, const std::unordered_map &required_resources, const std::unordered_map &required_placement_resources, std::vector *return_ids, const ray::PlacementGroupID &placement_group_id) { // Build common task spec. builder.SetCommonTaskSpec( - task_id, function.GetLanguage(), function.GetFunctionDescriptor(), job_id, + task_id, name, function.GetLanguage(), function.GetFunctionDescriptor(), job_id, current_task_id, task_index, caller_id, address, num_returns, required_resources, required_placement_resources, placement_group_id); // Set task arguments. @@ -1268,8 +1269,11 @@ void CoreWorker::SubmitTask(const RayFunction &function, auto constrained_resources = AddPlacementGroupConstraint( task_options.resources, placement_options.first, placement_options.second); const std::unordered_map required_resources; + auto task_name = task_options.name.empty() + ? function.GetFunctionDescriptor()->DefaultTaskName() + : task_options.name; // TODO(ekl) offload task building onto a thread pool for performance - BuildCommonTaskSpec(builder, worker_context_.GetCurrentJobID(), task_id, + BuildCommonTaskSpec(builder, worker_context_.GetCurrentJobID(), task_id, task_name, worker_context_.GetCurrentTaskID(), next_task_index, GetCallerId(), rpc_address_, function, args, task_options.num_returns, constrained_resources, required_resources, return_ids, @@ -1310,16 +1314,21 @@ Status CoreWorker::CreateActor(const RayFunction &function, auto new_resource = AddPlacementGroupConstraint( actor_creation_options.resources, actor_creation_options.placement_options.first, actor_creation_options.placement_options.second); - BuildCommonTaskSpec(builder, job_id, actor_creation_task_id, + const auto actor_name = actor_creation_options.name; + const auto task_name = + actor_name.empty() + ? function.GetFunctionDescriptor()->DefaultTaskName() + : actor_name + ":" + function.GetFunctionDescriptor()->CallString(); + BuildCommonTaskSpec(builder, job_id, actor_creation_task_id, task_name, worker_context_.GetCurrentTaskID(), next_task_index, GetCallerId(), rpc_address_, function, args, 1, new_resource, new_placement_resources, &return_ids, actor_creation_options.placement_options.first); - builder.SetActorCreationTaskSpec( - actor_id, actor_creation_options.max_restarts, - actor_creation_options.dynamic_worker_options, - actor_creation_options.max_concurrency, actor_creation_options.is_detached, - actor_creation_options.name, actor_creation_options.is_asyncio, extension_data); + builder.SetActorCreationTaskSpec(actor_id, actor_creation_options.max_restarts, + actor_creation_options.dynamic_worker_options, + actor_creation_options.max_concurrency, + actor_creation_options.is_detached, actor_name, + actor_creation_options.is_asyncio, extension_data); // Add the actor handle before we submit the actor creation task, since the // actor handle must be in scope by the time the GCS sends the @@ -1340,7 +1349,7 @@ Status CoreWorker::CreateActor(const RayFunction &function, if (task_spec.IsDetachedActor()) { // Since local mode doesn't pass GCS actor management code path, // it just register actor names in memory. - local_mode_named_actor_registry_.emplace(actor_creation_options.name, actor_id); + local_mode_named_actor_registry_.emplace(actor_name, actor_id); } ExecuteTaskLocalMode(task_spec); } else { @@ -1412,7 +1421,10 @@ void CoreWorker::SubmitActorTask(const ActorID &actor_id, const RayFunction &fun worker_context_.GetCurrentJobID(), worker_context_.GetCurrentTaskID(), next_task_index, actor_handle->GetActorID()); const std::unordered_map required_resources; - BuildCommonTaskSpec(builder, actor_handle->CreationJobID(), actor_task_id, + const auto task_name = task_options.name.empty() + ? function.GetFunctionDescriptor()->DefaultTaskName() + : task_options.name; + BuildCommonTaskSpec(builder, actor_handle->CreationJobID(), actor_task_id, task_name, worker_context_.GetCurrentTaskID(), next_task_index, GetCallerId(), rpc_address_, function, args, num_returns, task_options.resources, required_resources, return_ids, PlacementGroupID::Nil()); @@ -1687,8 +1699,9 @@ Status CoreWorker::ExecuteTask(const TaskSpecification &task_spec, CoreWorkerProcess::SetCurrentThreadWorkerId(GetWorkerID()); status = options_.task_execution_callback( - task_type, func, task_spec.GetRequiredResources().GetResourceMap(), args, - arg_reference_ids, return_ids, return_objects); + task_type, task_spec.GetName(), func, + task_spec.GetRequiredResources().GetResourceMap(), args, arg_reference_ids, + return_ids, return_objects); absl::optional caller_address( options_.is_local_mode ? absl::optional() @@ -2143,6 +2156,7 @@ void CoreWorker::HandleGetCoreWorkerStats(const rpc::GetCoreWorkerStatsRequest & stats->set_task_queue_length(task_queue_length_); stats->set_num_executed_tasks(num_executed_tasks_); stats->set_num_object_refs_in_scope(reference_counter_->NumObjectIDsInScope()); + stats->set_current_task_name(current_task_.GetName()); stats->set_current_task_func_desc(current_task_.FunctionDescriptor()->ToString()); stats->set_ip_address(rpc_address_.ip_address()); stats->set_port(rpc_address_.port()); diff --git a/src/ray/core_worker/core_worker.h b/src/ray/core_worker/core_worker.h index 78e786930..5d4c68134 100644 --- a/src/ray/core_worker/core_worker.h +++ b/src/ray/core_worker/core_worker.h @@ -57,7 +57,7 @@ struct CoreWorkerOptions { // Callback that must be implemented and provided by the language-specific worker // frontend to execute tasks and return their results. using TaskExecutionCallback = std::function &required_resources, const std::vector> &args, const std::vector &arg_reference_ids, diff --git a/src/ray/core_worker/lib/java/io_ray_runtime_RayNativeRuntime.cc b/src/ray/core_worker/lib/java/io_ray_runtime_RayNativeRuntime.cc index e1acf26f5..7f510b2c1 100644 --- a/src/ray/core_worker/lib/java/io_ray_runtime_RayNativeRuntime.cc +++ b/src/ray/core_worker/lib/java/io_ray_runtime_RayNativeRuntime.cc @@ -105,7 +105,8 @@ JNIEXPORT void JNICALL Java_io_ray_runtime_RayNativeRuntime_nativeInitialize( RayConfig::instance().initialize(raylet_config); auto task_execution_callback = - [](ray::TaskType task_type, const ray::RayFunction &ray_function, + [](ray::TaskType task_type, const std::string task_name, + const ray::RayFunction &ray_function, const std::unordered_map &required_resources, const std::vector> &args, const std::vector &arg_reference_ids, diff --git a/src/ray/core_worker/lib/java/io_ray_runtime_task_NativeTaskSubmitter.cc b/src/ray/core_worker/lib/java/io_ray_runtime_task_NativeTaskSubmitter.cc index 0541b0a4b..118340e39 100644 --- a/src/ray/core_worker/lib/java/io_ray_runtime_task_NativeTaskSubmitter.cc +++ b/src/ray/core_worker/lib/java/io_ray_runtime_task_NativeTaskSubmitter.cc @@ -90,13 +90,18 @@ inline std::unordered_map ToResources(JNIEnv *env, inline ray::TaskOptions ToTaskOptions(JNIEnv *env, jint numReturns, jobject callOptions) { std::unordered_map resources; + std::string name = ""; if (callOptions) { jobject java_resources = env->GetObjectField(callOptions, java_base_task_options_resources); resources = ToResources(env, java_resources); + auto java_name = (jstring)env->GetObjectField(callOptions, java_call_options_name); + if (java_name) { + name = JavaStringToNativeString(env, java_name); + } } - ray::TaskOptions task_options{numReturns, resources}; + ray::TaskOptions task_options{name, numReturns, resources}; return task_options; } diff --git a/src/ray/core_worker/lib/java/jni_init.cc b/src/ray/core_worker/lib/java/jni_init.cc index 1213de4f6..ed1860b15 100644 --- a/src/ray/core_worker/lib/java/jni_init.cc +++ b/src/ray/core_worker/lib/java/jni_init.cc @@ -75,6 +75,9 @@ jfieldID java_function_arg_value; jclass java_base_task_options_class; jfieldID java_base_task_options_resources; +jclass java_call_options_class; +jfieldID java_call_options_name; + jclass java_actor_creation_options_class; jfieldID java_actor_creation_options_global; jfieldID java_actor_creation_options_name; @@ -198,6 +201,10 @@ jint JNI_OnLoad(JavaVM *vm, void *reserved) { java_base_task_options_resources = env->GetFieldID(java_base_task_options_class, "resources", "Ljava/util/Map;"); + java_call_options_class = LoadClass(env, "io/ray/api/options/CallOptions"); + java_call_options_name = + env->GetFieldID(java_call_options_class, "name", "Ljava/lang/String;"); + java_placement_group_class = LoadClass(env, "io/ray/runtime/placementgroup/PlacementGroupImpl"); java_placement_group_id = diff --git a/src/ray/core_worker/lib/java/jni_utils.h b/src/ray/core_worker/lib/java/jni_utils.h index 37b7b0544..a5c14d76f 100644 --- a/src/ray/core_worker/lib/java/jni_utils.h +++ b/src/ray/core_worker/lib/java/jni_utils.h @@ -128,6 +128,11 @@ extern jclass java_base_task_options_class; /// resources field of BaseTaskOptions class extern jfieldID java_base_task_options_resources; +/// CallOptions class +extern jclass java_call_options_class; +/// name field of CallOptions class +extern jfieldID java_call_options_name; + /// ActorCreationOptions class extern jclass java_actor_creation_options_class; /// global field of ActorCreationOptions class diff --git a/src/ray/core_worker/test/core_worker_test.cc b/src/ray/core_worker/test/core_worker_test.cc index fdae29a66..b03218270 100644 --- a/src/ray/core_worker/test/core_worker_test.cc +++ b/src/ray/core_worker/test/core_worker_test.cc @@ -226,7 +226,7 @@ bool CoreWorkerTest::WaitForDirectCallActorState(const ActorID &actor_id, bool w int CoreWorkerTest::GetActorPid(const ActorID &actor_id, std::unordered_map &resources) { std::vector> args; - TaskOptions options{1, resources}; + TaskOptions options{"", 1, resources}; std::vector return_ids; RayFunction func{Language::PYTHON, ray::FunctionDescriptorBuilder::BuildPython( "GetWorkerPid", "", "", "")}; @@ -308,7 +308,7 @@ void CoreWorkerTest::TestActorTask(std::unordered_map &reso args.emplace_back(new TaskArgByValue( std::make_shared(buffer2, nullptr, std::vector()))); - TaskOptions options{1, resources}; + TaskOptions options{"", 1, resources}; std::vector return_ids; RayFunction func(ray::Language::PYTHON, ray::FunctionDescriptorBuilder::BuildPython( "MergeInputArgsAsOutput", "", "", "")); @@ -350,7 +350,7 @@ void CoreWorkerTest::TestActorTask(std::unordered_map &reso args.emplace_back(new TaskArgByValue( std::make_shared(buffer2, nullptr, std::vector()))); - TaskOptions options{1, resources}; + TaskOptions options{"", 1, resources}; std::vector return_ids; RayFunction func(ray::Language::PYTHON, ray::FunctionDescriptorBuilder::BuildPython( "MergeInputArgsAsOutput", "", "", "")); @@ -412,7 +412,7 @@ void CoreWorkerTest::TestActorRestart( args.emplace_back(new TaskArgByValue( std::make_shared(buffer1, nullptr, std::vector()))); - TaskOptions options{1, resources}; + TaskOptions options{"", 1, resources}; std::vector return_ids; RayFunction func(ray::Language::PYTHON, ray::FunctionDescriptorBuilder::BuildPython( "MergeInputArgsAsOutput", "", "", "")); @@ -455,7 +455,7 @@ void CoreWorkerTest::TestActorFailure( args.emplace_back(new TaskArgByValue( std::make_shared(buffer1, nullptr, std::vector()))); - TaskOptions options{1, resources}; + TaskOptions options{"", 1, resources}; std::vector return_ids; RayFunction func(ray::Language::PYTHON, ray::FunctionDescriptorBuilder::BuildPython( "MergeInputArgsAsOutput", "", "", "")); @@ -539,12 +539,12 @@ TEST_F(ZeroNodeTest, TestTaskSpecPerf) { RAY_LOG(INFO) << "start creating " << num_tasks << " PushTaskRequests"; rpc::Address address; for (int i = 0; i < num_tasks; i++) { - TaskOptions options{1, resources}; + TaskOptions options{"", 1, resources}; std::vector return_ids; auto num_returns = options.num_returns; TaskSpecBuilder builder; - builder.SetCommonTaskSpec(RandomTaskId(), function.GetLanguage(), + builder.SetCommonTaskSpec(RandomTaskId(), options.name, function.GetLanguage(), function.GetFunctionDescriptor(), job_id, RandomTaskId(), 0, RandomTaskId(), address, num_returns, resources, resources, PlacementGroupID::Nil()); @@ -587,7 +587,7 @@ TEST_F(SingleNodeTest, TestDirectActorTaskSubmissionPerf) { args.emplace_back(new TaskArgByValue( std::make_shared(buffer, nullptr, std::vector()))); - TaskOptions options{1, resources}; + TaskOptions options{"", 1, resources}; std::vector return_ids; RayFunction func(ray::Language::PYTHON, ray::FunctionDescriptorBuilder::BuildPython( "MergeInputArgsAsOutput", "", "", "")); diff --git a/src/ray/core_worker/test/direct_task_transport_test.cc b/src/ray/core_worker/test/direct_task_transport_test.cc index ff21188ad..83ab5cdd7 100644 --- a/src/ray/core_worker/test/direct_task_transport_test.cc +++ b/src/ray/core_worker/test/direct_task_transport_test.cc @@ -324,9 +324,10 @@ TaskSpecification BuildTaskSpec(const std::unordered_map &r const ray::FunctionDescriptor &function_descriptor) { TaskSpecBuilder builder; rpc::Address empty_address; - builder.SetCommonTaskSpec(TaskID::Nil(), Language::PYTHON, function_descriptor, - JobID::Nil(), TaskID::Nil(), 0, TaskID::Nil(), empty_address, - 1, resources, resources, PlacementGroupID::Nil()); + builder.SetCommonTaskSpec(TaskID::Nil(), "dummy_task", Language::PYTHON, + function_descriptor, JobID::Nil(), TaskID::Nil(), 0, + TaskID::Nil(), empty_address, 1, resources, resources, + PlacementGroupID::Nil()); return builder.Build(); } diff --git a/src/ray/core_worker/test/mock_worker.cc b/src/ray/core_worker/test/mock_worker.cc index d705d97e5..29b6a26f0 100644 --- a/src/ray/core_worker/test/mock_worker.cc +++ b/src/ray/core_worker/test/mock_worker.cc @@ -50,8 +50,8 @@ class MockWorker { "", // driver_name "", // stdout_file "", // stderr_file - std::bind(&MockWorker::ExecuteTask, this, _1, _2, _3, _4, _5, _6, - _7), // task_execution_callback + std::bind(&MockWorker::ExecuteTask, this, _1, _2, _3, _4, _5, _6, _7, + _8), // task_execution_callback nullptr, // check_signals nullptr, // gc_collect nullptr, // spill_objects @@ -71,7 +71,8 @@ class MockWorker { void RunTaskExecutionLoop() { CoreWorkerProcess::RunTaskExecutionLoop(); } private: - Status ExecuteTask(TaskType task_type, const RayFunction &ray_function, + Status ExecuteTask(TaskType task_type, const std::string task_name, + const RayFunction &ray_function, const std::unordered_map &required_resources, const std::vector> &args, const std::vector &arg_reference_ids, diff --git a/src/ray/gcs/test/gcs_test_util.h b/src/ray/gcs/test/gcs_test_util.h index 3c63e9fb6..fea0df303 100644 --- a/src/ray/gcs/test/gcs_test_util.h +++ b/src/ray/gcs/test/gcs_test_util.h @@ -38,9 +38,10 @@ struct Mocker { auto actor_id = ActorID::Of(job_id, RandomTaskId(), 0); auto task_id = TaskID::ForActorCreationTask(actor_id); auto resource = std::unordered_map(); - builder.SetCommonTaskSpec(task_id, Language::PYTHON, empty_descriptor, job_id, - TaskID::Nil(), 0, TaskID::Nil(), owner_address, 1, resource, - resource, PlacementGroupID::Nil()); + builder.SetCommonTaskSpec(task_id, name + ":" + empty_descriptor->CallString(), + Language::PYTHON, empty_descriptor, job_id, TaskID::Nil(), + 0, TaskID::Nil(), owner_address, 1, resource, resource, + PlacementGroupID::Nil()); builder.SetActorCreationTaskSpec(actor_id, max_restarts, {}, 1, detached, name); return builder.Build(); } diff --git a/src/ray/protobuf/common.proto b/src/ray/protobuf/common.proto index fa2cf15a6..e879bc424 100644 --- a/src/ray/protobuf/common.proto +++ b/src/ray/protobuf/common.proto @@ -150,42 +150,44 @@ message RayException { message TaskSpec { // Type of this task. TaskType type = 1; + // Name of this task. + string name = 2; // Language of this task. - Language language = 2; + Language language = 3; // Function descriptor of this task uniquely describe the function to execute. - FunctionDescriptor function_descriptor = 3; + FunctionDescriptor function_descriptor = 4; // ID of the job that this task belongs to. - bytes job_id = 4; + bytes job_id = 5; // Task ID of the task. - bytes task_id = 5; + bytes task_id = 6; // Task ID of the parent task. - bytes parent_task_id = 6; + bytes parent_task_id = 7; // A count of the number of tasks submitted by the parent task before this one. - uint64 parent_counter = 7; + uint64 parent_counter = 8; // Task ID of the caller. This is the same as parent_task_id for non-actors. // This is the actor ID (embedded in a nil task ID) for actors. - bytes caller_id = 8; + bytes caller_id = 9; /// Address of the caller. - Address caller_address = 9; + Address caller_address = 10; // Task arguments. - repeated TaskArg args = 10; + repeated TaskArg args = 11; // Number of return objects. - uint64 num_returns = 11; + uint64 num_returns = 12; // Quantities of the different resources required by this task. - map required_resources = 12; + map required_resources = 13; // The resources required for placing this task on a node. If this is empty, // then the placement resources are equal to the required_resources. - map required_placement_resources = 13; + map required_placement_resources = 14; // Task specification for an actor creation task. // This field is only valid when `type == ACTOR_CREATION_TASK`. - ActorCreationTaskSpec actor_creation_task_spec = 14; + ActorCreationTaskSpec actor_creation_task_spec = 15; // Task specification for an actor task. // This field is only valid when `type == ACTOR_TASK`. - ActorTaskSpec actor_task_spec = 15; + ActorTaskSpec actor_task_spec = 16; // Number of times this task may be retried on worker failure. - int32 max_retries = 16; + int32 max_retries = 17; // Placement group that is associated with this task. - bytes placement_group_id = 17; + bytes placement_group_id = 18; } message Bundle { @@ -358,34 +360,36 @@ message CoreWorkerStats { int32 num_pending_tasks = 2; // Number of object refs in local scope. int32 num_object_refs_in_scope = 3; + // Name of the currently executing task. + string current_task_name = 4; // String representation of the function descriptor of the currently executing task. - string current_task_func_desc = 4; + string current_task_func_desc = 5; // IP address of the core worker. - string ip_address = 6; + string ip_address = 7; // Port of the core worker. - int64 port = 7; + int64 port = 8; // Actor ID. - bytes actor_id = 8; + bytes actor_id = 9; // A map from the resource name (e.g. "CPU") to its allocation. - map used_resources = 9; + map used_resources = 10; // A string displayed on Dashboard. - map webui_display = 10; + map webui_display = 11; // Number of objects that are IN_PLASMA_ERROR in the local memory store. - int32 num_in_plasma = 11; + int32 num_in_plasma = 12; // Number of objects stored in local memory. - int32 num_local_objects = 12; + int32 num_local_objects = 13; // Used local object store memory. - int64 used_object_store_memory = 13; + int64 used_object_store_memory = 14; // Length of the task queue. - int32 task_queue_length = 14; + int32 task_queue_length = 15; // Number of executed tasks. - int32 num_executed_tasks = 15; + int32 num_executed_tasks = 16; // Actor constructor. - string actor_title = 16; + string actor_title = 17; // Local reference table. - repeated ObjectRefInfo object_refs = 17; + repeated ObjectRefInfo object_refs = 18; // Job ID. - bytes job_id = 18; + bytes job_id = 19; } message MetricPoint { diff --git a/src/ray/raylet/scheduling/cluster_task_manager_test.cc b/src/ray/raylet/scheduling/cluster_task_manager_test.cc index 57d329ca1..d09fe612b 100644 --- a/src/ray/raylet/scheduling/cluster_task_manager_test.cc +++ b/src/ray/raylet/scheduling/cluster_task_manager_test.cc @@ -264,7 +264,7 @@ Task CreateTask(const std::unordered_map &required_resource TaskID id = RandomTaskId(); JobID job_id = RandomJobId(); rpc::Address address; - spec_builder.SetCommonTaskSpec(id, Language::PYTHON, + spec_builder.SetCommonTaskSpec(id, "dummy_task", Language::PYTHON, FunctionDescriptorBuilder::BuildPython("", "", "", ""), job_id, TaskID::Nil(), 0, TaskID::Nil(), address, 0, required_resources, {}, PlacementGroupID::Nil()); @@ -489,4 +489,4 @@ int main(int argc, char **argv) { } } // namespace raylet -} // namespace ray \ No newline at end of file +} // namespace ray diff --git a/src/ray/raylet/task_dependency_manager_test.cc b/src/ray/raylet/task_dependency_manager_test.cc index 27129ae50..52378e24d 100644 --- a/src/ray/raylet/task_dependency_manager_test.cc +++ b/src/ray/raylet/task_dependency_manager_test.cc @@ -65,7 +65,7 @@ static inline Task ExampleTask(const std::vector &arguments, uint64_t num_returns) { TaskSpecBuilder builder; rpc::Address address; - builder.SetCommonTaskSpec(RandomTaskId(), Language::PYTHON, + builder.SetCommonTaskSpec(RandomTaskId(), "example_task", Language::PYTHON, FunctionDescriptorBuilder::BuildPython("", "", "", ""), JobID::Nil(), RandomTaskId(), 0, RandomTaskId(), address, num_returns, {}, {}, PlacementGroupID::Nil()); diff --git a/streaming/src/queue/transport.cc b/streaming/src/queue/transport.cc index 01841b324..cd30955fa 100644 --- a/streaming/src/queue/transport.cc +++ b/streaming/src/queue/transport.cc @@ -12,7 +12,8 @@ void Transport::SendInternal(std::shared_ptr buffer, RayFunction &function, int return_num, std::vector &return_ids) { std::unordered_map resources; - TaskOptions options{return_num, resources}; + std::string name = function.GetFunctionDescriptor()->DefaultTaskName(); + TaskOptions options{name, return_num, resources}; char meta_data[3] = {'R', 'A', 'W'}; std::shared_ptr meta = diff --git a/streaming/src/test/mock_actor.cc b/streaming/src/test/mock_actor.cc index 0c15d72c6..9bbcad803 100644 --- a/streaming/src/test/mock_actor.cc +++ b/streaming/src/test/mock_actor.cc @@ -496,8 +496,8 @@ class StreamingWorker { "", // driver_name "", // stdout_file "", // stderr_file - std::bind(&StreamingWorker::ExecuteTask, this, _1, _2, _3, _4, _5, _6, - _7), // task_execution_callback + std::bind(&StreamingWorker::ExecuteTask, this, _1, _2, _3, _4, _5, _6, _7, + _8), // task_execution_callback nullptr, // check_signals nullptr, // gc_collect nullptr, // spill_objects @@ -521,7 +521,8 @@ class StreamingWorker { } private: - Status ExecuteTask(TaskType task_type, const RayFunction &ray_function, + Status ExecuteTask(TaskType task_type, const std::string task_name, + const RayFunction &ray_function, const std::unordered_map &required_resources, const std::vector> &args, const std::vector &arg_reference_ids, diff --git a/streaming/src/test/queue_tests_base.h b/streaming/src/test/queue_tests_base.h index f57c31e10..25c4c0061 100644 --- a/streaming/src/test/queue_tests_base.h +++ b/streaming/src/test/queue_tests_base.h @@ -87,7 +87,7 @@ class StreamingQueueTestBase : public ::testing::TestWithParam { args.emplace_back(new TaskArgByValue(std::make_shared( msg.ToBytes(), nullptr, std::vector(), true))); std::unordered_map resources; - TaskOptions options{0, resources}; + TaskOptions options{"", 0, resources}; std::vector return_ids; RayFunction func{ray::Language::PYTHON, ray::FunctionDescriptorBuilder::BuildPython("", "", "init", "")}; @@ -103,7 +103,7 @@ class StreamingQueueTestBase : public ::testing::TestWithParam { args.emplace_back(new TaskArgByValue( std::make_shared(buffer, nullptr, std::vector(), true))); std::unordered_map resources; - TaskOptions options{0, resources}; + TaskOptions options{"", 0, resources}; std::vector return_ids; RayFunction func{ray::Language::PYTHON, ray::FunctionDescriptorBuilder::BuildPython( "", test, "execute_test", "")}; @@ -119,7 +119,7 @@ class StreamingQueueTestBase : public ::testing::TestWithParam { args.emplace_back(new TaskArgByValue( std::make_shared(buffer, nullptr, std::vector(), true))); std::unordered_map resources; - TaskOptions options{1, resources}; + TaskOptions options{"", 1, resources}; std::vector return_ids; RayFunction func{ray::Language::PYTHON, ray::FunctionDescriptorBuilder::BuildPython( "", "", "check_current_test_status", "")};