[Core] Add option to override environment variables for tasks and actors (#11619)

This commit is contained in:
architkulkarni
2020-10-29 12:22:44 -07:00
committed by GitHub
parent e82ff08b0c
commit 4175569d96
16 changed files with 286 additions and 50 deletions
+19 -11
View File
@@ -1001,28 +1001,32 @@ cdef class CoreWorker:
int max_retries,
PlacementGroupID placement_group_id,
int64_t placement_group_bundle_index,
c_bool placement_group_capture_child_tasks):
c_bool placement_group_capture_child_tasks,
override_environment_variables):
cdef:
unordered_map[c_string, double] c_resources
CTaskOptions task_options
CRayFunction ray_function
c_vector[unique_ptr[CTaskArg]] args_vector
c_vector[CObjectID] return_ids
CPlacementGroupID c_placement_group_id = \
placement_group_id.native()
unordered_map[c_string, c_string] \
c_override_environment_variables = \
override_environment_variables
with self.profile_event(b"submit_task"):
prepare_resources(resources, &c_resources)
task_options = CTaskOptions(
name, num_returns, c_resources)
ray_function = CRayFunction(
language.lang, function_descriptor.descriptor)
prepare_args(self, language, args, &args_vector)
with nogil:
CCoreWorkerProcess.GetCoreWorker().SubmitTask(
ray_function, args_vector, task_options, &return_ids,
max_retries, c_pair[CPlacementGroupID, int64_t](
ray_function, args_vector, CTaskOptions(
name, num_returns, c_resources,
c_override_environment_variables),
&return_ids, max_retries,
c_pair[CPlacementGroupID, int64_t](
c_placement_group_id, placement_group_bundle_index),
placement_group_capture_child_tasks)
@@ -1043,7 +1047,8 @@ cdef class CoreWorker:
PlacementGroupID placement_group_id,
int64_t placement_group_bundle_index,
c_bool placement_group_capture_child_tasks,
c_string extension_data
c_string extension_data,
override_environment_variables
):
cdef:
CRayFunction ray_function
@@ -1054,6 +1059,9 @@ cdef class CoreWorker:
CActorID c_actor_id
CPlacementGroupID c_placement_group_id = \
placement_group_id.native()
unordered_map[c_string, c_string] \
c_override_environment_variables = \
override_environment_variables
with self.profile_event(b"submit_task"):
prepare_resources(resources, &c_resources)
@@ -1072,7 +1080,8 @@ cdef class CoreWorker:
c_pair[CPlacementGroupID, int64_t](
c_placement_group_id,
placement_group_bundle_index),
placement_group_capture_child_tasks),
placement_group_capture_child_tasks,
c_override_environment_variables),
extension_data,
&c_actor_id))
@@ -1134,7 +1143,6 @@ cdef class CoreWorker:
cdef:
CActorID c_actor_id = actor_id.native()
unordered_map[c_string, double] c_resources
CTaskOptions task_options
CRayFunction ray_function
c_vector[unique_ptr[CTaskArg]] args_vector
c_vector[CObjectID] return_ids
@@ -1142,7 +1150,6 @@ cdef class CoreWorker:
with self.profile_event(b"submit_task"):
if num_method_cpus > 0:
c_resources[b"CPU"] = num_method_cpus
task_options = CTaskOptions(name, num_returns, c_resources)
ray_function = CRayFunction(
language.lang, function_descriptor.descriptor)
prepare_args(self, language, args, &args_vector)
@@ -1151,7 +1158,8 @@ cdef class CoreWorker:
CCoreWorkerProcess.GetCoreWorker().SubmitActorTask(
c_actor_id,
ray_function,
args_vector, task_options, &return_ids)
args_vector, CTaskOptions(name, num_returns, c_resources),
&return_ids)
return VectorToObjectRefs(return_ids)
+13 -4
View File
@@ -418,7 +418,8 @@ class ActorClass:
lifetime=None,
placement_group=None,
placement_group_bundle_index=-1,
placement_group_capture_child_tasks=None):
placement_group_capture_child_tasks=None,
override_environment_variables=None):
"""Configures and overrides the actor instantiation parameters.
The arguments are the same as those that can be passed
@@ -458,7 +459,9 @@ class ActorClass:
placement_group=placement_group,
placement_group_bundle_index=placement_group_bundle_index,
placement_group_capture_child_tasks=(
placement_group_capture_child_tasks))
placement_group_capture_child_tasks),
override_environment_variables=(
override_environment_variables))
return ActorOptionWrapper()
@@ -478,7 +481,8 @@ class ActorClass:
lifetime=None,
placement_group=None,
placement_group_bundle_index=-1,
placement_group_capture_child_tasks=None):
placement_group_capture_child_tasks=None,
override_environment_variables=None):
"""Create an actor.
This method allows more flexibility than the remote method because
@@ -515,6 +519,9 @@ class ActorClass:
placement_group_capture_child_tasks: Whether or not children tasks
of this actor should implicitly use the same placement group
as its parent. It is True by default.
override_environment_variables: Environment variables to override
and/or introduce for this actor. This is a dictionary mapping
variable names to their values.
Returns:
A handle to the newly created actor.
@@ -661,7 +668,9 @@ class ActorClass:
placement_group_bundle_index,
placement_group_capture_child_tasks,
# Store actor_method_cpu in actor handle's extension data.
extension_data=str(actor_method_cpu))
extension_data=str(actor_method_cpu),
override_environment_variables=override_environment_variables
or dict())
actor_handle = ActorHandle(
meta.language,
+7 -1
View File
@@ -243,6 +243,10 @@ cdef extern from "ray/core_worker/common.h" nogil:
CTaskOptions()
CTaskOptions(c_string name, int num_returns,
unordered_map[c_string, double] &resources)
CTaskOptions(c_string name, int num_returns,
unordered_map[c_string, double] &resources,
const unordered_map[c_string, c_string]
&override_environment_variables)
cdef cppclass CActorCreationOptions "ray::ActorCreationOptions":
CActorCreationOptions()
@@ -255,7 +259,9 @@ cdef extern from "ray/core_worker/common.h" nogil:
const c_vector[c_string] &dynamic_worker_options,
c_bool is_detached, c_string &name, c_bool is_asyncio,
c_pair[CPlacementGroupID, int64_t] placement_options,
c_bool placement_group_capture_child_tasks)
c_bool placement_group_capture_child_tasks,
const unordered_map[c_string, c_string]
&override_environment_variables)
cdef cppclass CPlacementGroupCreationOptions \
"ray::PlacementGroupCreationOptions":
+17 -5
View File
@@ -138,10 +138,12 @@ class RemoteFunction:
placement_group=None,
placement_group_bundle_index=-1,
placement_group_capture_child_tasks=None,
override_environment_variables=None,
name=""):
"""Configures and overrides the task invocation parameters.
Options are overlapping values provided by :obj:`ray.remote`.
The arguments are the same as those that can be passed to
:obj:`ray.remote`.
Examples:
@@ -173,6 +175,8 @@ class RemoteFunction:
placement_group_bundle_index=placement_group_bundle_index,
placement_group_capture_child_tasks=(
placement_group_capture_child_tasks),
override_environment_variables=(
override_environment_variables),
name=name)
return FuncWrapper()
@@ -191,6 +195,7 @@ class RemoteFunction:
placement_group=None,
placement_group_bundle_index=-1,
placement_group_capture_child_tasks=None,
override_environment_variables=None,
name=""):
"""Submit the remote function for execution."""
worker = ray.worker.global_worker
@@ -260,11 +265,18 @@ class RemoteFunction:
"Cross language remote function " \
"cannot be executed locally."
object_refs = worker.core_worker.submit_task(
self._language, self._function_descriptor, list_args, name,
num_returns, resources, max_retries, placement_group.id,
self._language,
self._function_descriptor,
list_args,
name,
num_returns,
resources,
max_retries,
placement_group.id,
placement_group_bundle_index,
placement_group_capture_child_tasks)
placement_group_capture_child_tasks,
override_environment_variables=override_environment_variables
or dict())
if len(object_refs) == 1:
return object_refs[0]
elif len(object_refs) > 1:
+121
View File
@@ -790,6 +790,127 @@ def test_detect_docker_cpus():
cpuset_file_name=cpuset_file.name) == 0.42
def test_override_environment_variables_task(ray_start_regular):
@ray.remote
def get_env(key):
return os.environ.get(key)
assert (ray.get(
get_env.options(override_environment_variables={
"a": "b"
}).remote("a")) == "b")
def test_override_environment_variables_actor(ray_start_regular):
@ray.remote
class EnvGetter:
def get(self, key):
return os.environ.get(key)
a = EnvGetter.options(override_environment_variables={
"a": "b",
"c": "d"
}).remote()
assert (ray.get(a.get.remote("a")) == "b")
assert (ray.get(a.get.remote("c")) == "d")
def test_override_environment_variables_nested_task(ray_start_regular):
@ray.remote
def get_env(key):
return os.environ.get(key)
@ray.remote
def get_env_wrapper(key):
return ray.get(get_env.remote(key))
assert (ray.get(
get_env_wrapper.options(override_environment_variables={
"a": "b"
}).remote("a")) == "b")
def test_override_environment_variables_multitenancy(shutdown_only):
ray.init(
job_config=ray.job_config.JobConfig(worker_env={
"foo1": "bar1",
"foo2": "bar2"
}))
@ray.remote
def get_env(key):
return os.environ.get(key)
assert ray.get(get_env.remote("foo1")) == "bar1"
assert ray.get(get_env.remote("foo2")) == "bar2"
assert ray.get(
get_env.options(override_environment_variables={
"foo1": "baz1"
}).remote("foo1")) == "baz1"
assert ray.get(
get_env.options(override_environment_variables={
"foo1": "baz1"
}).remote("foo2")) == "bar2"
def test_override_environment_variables_complex(shutdown_only):
ray.init(
job_config=ray.job_config.JobConfig(worker_env={
"a": "job_a",
"b": "job_b",
"z": "job_z"
}))
@ray.remote
def get_env(key):
return os.environ.get(key)
@ray.remote
class NestedEnvGetter:
def get(self, key):
return os.environ.get(key)
def get_task(self, key):
return ray.get(get_env.remote(key))
@ray.remote
class EnvGetter:
def get(self, key):
return os.environ.get(key)
def get_task(self, key):
return ray.get(get_env.remote(key))
def nested_get(self, key):
aa = NestedEnvGetter.options(override_environment_variables={
"c": "e",
"d": "dd"
}).remote()
return ray.get(aa.get.remote(key))
a = EnvGetter.options(override_environment_variables={
"a": "b",
"c": "d"
}).remote()
assert (ray.get(a.get.remote("a")) == "b")
assert (ray.get(a.get_task.remote("a")) == "b")
assert (ray.get(a.nested_get.remote("a")) == "b")
assert (ray.get(a.nested_get.remote("c")) == "e")
assert (ray.get(a.nested_get.remote("d")) == "dd")
assert (ray.get(
get_env.options(override_environment_variables={
"a": "b"
}).remote("a")) == "b")
assert (ray.get(a.get.remote("z")) == "job_z")
assert (ray.get(a.get_task.remote("z")) == "job_z")
assert (ray.get(a.nested_get.remote("z")) == "job_z")
assert (ray.get(
get_env.options(override_environment_variables={
"a": "b"
}).remote("z")) == "job_z")
if __name__ == "__main__":
import pytest
sys.exit(pytest.main(["-v", __file__]))
+6
View File
@@ -1857,6 +1857,12 @@ def remote(*args, **kwargs):
crashes unexpectedly. The minimum valid value is 0,
the default is 4 (default), and a value of -1 indicates
infinite retries.
override_environment_variables (Dict[str, str]): This specifies
environment variables to override for the actor or task. The
overrides are propagated to all child actors and tasks. This
is a dictionary mapping variable names to their values. Existing
variables can be overridden, new ones can be created, and an
existing variable can be unset by setting it to an empty string.
"""
worker = global_worker