diff --git a/doc/source/cluster/index.rst b/doc/source/cluster/index.rst index 529c4993d..c95eca1cb 100644 --- a/doc/source/cluster/index.rst +++ b/doc/source/cluster/index.rst @@ -21,26 +21,34 @@ Clusters are started with the :ref:`Ray Cluster Launcher You can also create a Ray cluster using a standard cluster manager such as :ref:`Kubernetes `, :ref:`YARN `, or :ref:`SLURM `. -After a cluster is started, you need to connect your program to the Ray cluster. +After a cluster is started, you need to connect your program to the Ray cluster by starting a driver process on the same node as where you ran ``ray start``: .. tabs:: - .. group-tab:: python + .. code-tab:: python - You can connect to this Ray runtime by starting a Python process that calls the following on the same node as where you ran ``ray start``: - - .. code-block:: python - - # This must - import ray - ray.init(address='auto') + # This must + import ray + ray.init(address='auto') .. group-tab:: java - If you want to run Java code, you need to specify the classpath via the ``--code-search-path`` option. See :ref:`code_search_path` for more details. + .. code-block:: java + + import io.ray.api.Ray; + + public class MyRayApp { + + public static void main(String[] args) { + Ray.init(); + ... + } + } .. code-block:: bash - $ ray start ... --code-search-path=/path/to/jars + java -classpath \ + -Dray.address=
\ + and then the rest of your script should be able to leverage Ray as a distributed framework! @@ -74,8 +82,6 @@ The most preferable way to run a Ray cluster is via the :ref:`Ray Cluster Launch This section assumes that you have a list of machines and that the nodes in the cluster can communicate with each other. It also assumes that Ray is installed on each machine. To install Ray, follow the `installation instructions`_. -To configure the Ray cluster to run Java code, you need to add the ``--code-search-path`` option. See :ref:`code_search_path` for more details. - .. _`installation instructions`: http://docs.ray.io/en/master/installation.html Starting Ray on each machine @@ -199,7 +205,7 @@ To run a distributed Ray program, you'll need to execute your program on the sam .. code-block:: bash - java -classpath /path/to/jars/ \ + java -classpath \ -Dray.address=
\ diff --git a/doc/source/configure.rst b/doc/source/configure.rst index b1cadc2fa..5683bf269 100644 --- a/doc/source/configure.rst +++ b/doc/source/configure.rst @@ -245,24 +245,32 @@ Java Applications Code Search Path ~~~~~~~~~~~~~~~~ -If you want to run a Java application in cluster mode, you must first run ``ray start`` to start the Ray cluster. In addition to any ``ray start`` parameters mentioned above, you must add ``--code-search-path`` to tell Ray where to load jars when starting Java workers. Your jar files must be distributed to all nodes of the Ray cluster before running your code, and this parameter must be set on both the head node and non-head nodes. +If you want to run a Java application in a multi-node cluster, you must specify the code search path in your driver. The code search path is to tell Ray where to load jars when starting Java workers. Your jar files must be distributed to the same path(s) on all nodes of the Ray cluster before running your code. .. code-block:: bash - $ ray start ... --code-search-path=/path/to/jars + $ java -classpath \ + -Dray.address=
\ + -Dray.job.code-search-path=/path/to/jars/ \ + -The ``/path/to/jars`` here points to a directory which contains jars. All jars in the directory will be loaded by workers. You can also provide multiple directories for this parameter. +The ``/path/to/jars/`` here points to a directory which contains jars. All jars in the directory will be loaded by workers. You can also provide multiple directories for this parameter. .. code-block:: bash - $ ray start ... --code-search-path=/path/to/jars1:/path/to/jars2:/path/to/pys1:/path/to/pys2 + $ java -classpath \ + -Dray.address=
\ + -Dray.job.code-search-path=/path/to/jars1:/path/to/jars2:/path/to/pys1:/path/to/pys2 \ + -Code search path is also used for loading Python code if it's specified. This is required for :ref:`cross_language`. If code search path is specified, you can only run Python remote functions which can be found in the code search path. +You don't need to configure code search path if you run a Java application in a single-node cluster. -You don't need to configure code search path if you run a Java application in single machine mode. +See ``ray.job.code-search-path`` under :ref:`Driver Options ` for more information. .. note:: Currently we don't provide a way to configure Ray when running a Java application in single machine mode. If you need to configure Ray, run ``ray start`` to start the Ray cluster first. +.. _java-driver-options: + Driver Options ~~~~~~~~~~~~~~ @@ -289,4 +297,11 @@ The list of available driver options: - Type: ``Boolean`` - Default: ``false`` +- ``ray.job.code-search-path`` + + - The paths for Java workers to load code from. Currently only directories are supported. You can specify one or more directories split by a ``:``. You don't need to configure code search path if you run a Java application in single machine mode or local mode. Code search path is also used for loading Python code if it's specified. This is required for :ref:`cross_language`. If code search path is specified, you can only run Python remote functions which can be found in the code search path. + - Type: ``String`` + - Default: empty string. + - Example: ``/path/to/jars1:/path/to/jars2:/path/to/pys1:/path/to/pys2`` + .. _`Apache Arrow`: https://arrow.apache.org/ diff --git a/doc/source/cross-language.rst b/doc/source/cross-language.rst index f982233e6..0b742866e 100644 --- a/doc/source/cross-language.rst +++ b/doc/source/cross-language.rst @@ -5,20 +5,46 @@ Cross-language programming This page will show you how to use Ray's cross-language programming feature. -Setup the cluster +Setup the driver ----------------- -We need to set the ``--code-search-path`` option on ``ray start`` command. See :ref:`code_search_path` for more details. +We need to set :ref:`code_search_path` in your driver. -.. code-block:: bash +.. tabs:: - ray start ... --code-search-path=/path/to/code + .. group-tab:: Python + + .. code-block:: python + + ray.init(job_config=ray.job_config.JobConfig(code_search_path="/path/to/code")) + + .. group-tab:: Java + + .. code-block:: bash + + java -classpath \ + -Dray.address=
\ + -Dray.job.code-search-path=/path/to/code/ \ + You may want to include multiple directories to load both Python and Java code for workers, if they are placed in different directories. -.. code-block:: bash +.. tabs:: - ray start ... --code-search-path=/path/to/jars:/path/to/pys + .. group-tab:: Python + + .. code-block:: python + + ray.init(job_config=ray.job_config.JobConfig(code_search_path="/path/to/jars:/path/to/pys")) + + .. group-tab:: Java + + .. code-block:: bash + + java -classpath \ + -Dray.address=
\ + -Dray.job.code-search-path=/path/to/jars:/path/to/pys \ + Python calling Java ------------------- diff --git a/doc/source/starting-ray.rst b/doc/source/starting-ray.rst index f631d2c78..4a198e27e 100644 --- a/doc/source/starting-ray.rst +++ b/doc/source/starting-ray.rst @@ -125,25 +125,34 @@ Use ``ray start`` from the CLI to start a 1 node ray runtime on a machine. This ... +You can connect to this Ray runtime by starting a driver process on the same node as where you ran ``ray start``: + .. tabs:: - .. group-tab:: python + .. code-tab:: python - You can connect to this Ray runtime by starting a Python process that calls the following on the same node as where you ran ``ray start``: - - .. code-block:: python - - # This must - import ray - ray.init(address='auto') + # This must + import ray + ray.init(address='auto') .. group-tab:: java + .. code-block:: java - If you want to run Java code, you need to specify the classpath via the ``--code-search-path`` option. See :ref:`code_search_path` for more details. + import io.ray.api.Ray; + + public class MyRayApp { + + public static void main(String[] args) { + Ray.init(); + ... + } + } .. code-block:: bash - $ ray start ... --code-search-path=/path/to/jars + java -classpath \ + -Dray.address=
\ + You can connect other nodes to the head node, creating a Ray cluster by also calling ``ray start`` on those nodes. See :ref:`manual-cluster` for more details. Calling ``ray.init(address="auto")`` on any of the cluster machines will connect to the ray cluster. diff --git a/java/runtime/src/main/java/io/ray/runtime/config/RayConfig.java b/java/runtime/src/main/java/io/ray/runtime/config/RayConfig.java index 17f1458f7..f82a7a124 100644 --- a/java/runtime/src/main/java/io/ray/runtime/config/RayConfig.java +++ b/java/runtime/src/main/java/io/ray/runtime/config/RayConfig.java @@ -13,7 +13,6 @@ import io.ray.runtime.generated.Common.WorkerType; import io.ray.runtime.util.NetworkUtil; import java.io.File; import java.util.Arrays; -import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -174,34 +173,12 @@ public class RayConfig { if (config.hasPath("ray.job.code-search-path")) { codeSearchPathString = config.getString("ray.job.code-search-path"); } - if (!StringUtils.isEmpty(codeSearchPathString)) { - codeSearchPath = Arrays.asList(codeSearchPathString.split(":")); - } else { - codeSearchPath = Collections.emptyList(); + if (StringUtils.isEmpty(codeSearchPathString)) { + codeSearchPathString = System.getProperty("java.class.path"); } + codeSearchPath = Arrays.asList(codeSearchPathString.split(":")); - boolean enableMultiTenancy; - if (config.hasPath("ray.raylet.config.enable_multi_tenancy")) { - enableMultiTenancy = - Boolean.valueOf(config.getString("ray.raylet.config.enable_multi_tenancy")); - } else { - String envString = System.getenv("RAY_ENABLE_MULTI_TENANCY"); - if (StringUtils.isNotBlank(envString)) { - enableMultiTenancy = "1".equals(envString); - } else { - enableMultiTenancy = true; // Default value - } - } - - if (!enableMultiTenancy) { - if (!isDriver) { - numWorkersPerProcess = config.getInt("ray.raylet.config.num_workers_per_process_java"); - } else { - numWorkersPerProcess = 1; // Actually this value isn't used in RayNativeRuntime. - } - } else { - numWorkersPerProcess = config.getInt("ray.job.num-java-workers-per-process"); - } + numWorkersPerProcess = config.getInt("ray.job.num-java-workers-per-process"); headArgs = config.getStringList("ray.head-args"); diff --git a/java/runtime/src/main/java/io/ray/runtime/runner/RunManager.java b/java/runtime/src/main/java/io/ray/runtime/runner/RunManager.java index 6e0e66977..b6ed41f7f 100644 --- a/java/runtime/src/main/java/io/ray/runtime/runner/RunManager.java +++ b/java/runtime/src/main/java/io/ray/runtime/runner/RunManager.java @@ -5,7 +5,6 @@ import com.google.gson.Gson; import com.google.gson.JsonObject; import com.google.gson.JsonParser; import io.ray.runtime.config.RayConfig; -import java.io.File; import java.io.IOException; import java.nio.charset.Charset; import java.util.ArrayList; @@ -31,12 +30,6 @@ public class RunManager { */ public static void startRayHead(RayConfig rayConfig) { LOGGER.debug("Starting ray runtime @ {}.", rayConfig.nodeIp); - String codeSearchPath; - if (!rayConfig.codeSearchPath.isEmpty()) { - codeSearchPath = Joiner.on(File.pathSeparator).join(rayConfig.codeSearchPath); - } else { - codeSearchPath = System.getProperty("java.class.path"); - } List command = new ArrayList<>(); command.add("ray"); command.add("start"); @@ -44,7 +37,6 @@ public class RunManager { command.add("--redis-password"); command.add(rayConfig.redisPassword); command.add("--system-config=" + new Gson().toJson(rayConfig.rayletConfigParameters)); - command.add("--code-search-path=" + codeSearchPath); command.addAll(rayConfig.headArgs); String output; try { diff --git a/java/test.sh b/java/test.sh index ef9510d49..3352f5b33 100755 --- a/java/test.sh +++ b/java/test.sh @@ -39,7 +39,7 @@ echo "Build test jar." bazel build //java:all_tests_deploy.jar # Enable multi-worker feature in Java test -TEST_ARGS=(-Dray.raylet.config.num_workers_per_process_java=10 -Dray.job.num-java-workers-per-process=10) +TEST_ARGS=(-Dray.job.num-java-workers-per-process=10) echo "Running tests under cluster mode." # TODO(hchen): Ideally, we should use the following bazel command to run Java tests. However, if there're skipped tests, @@ -57,7 +57,7 @@ case "${OSTYPE}" in darwin*) ip=$(ipconfig getifaddr en0);; *) echo "Can't get ip address for ${OSTYPE}"; exit 1;; esac -RAY_BACKEND_LOG_LEVEL=debug ray start --head --port=6379 --redis-password=123456 --code-search-path="$PWD/bazel-bin/java/all_tests_deploy.jar" +RAY_BACKEND_LOG_LEVEL=debug ray start --head --port=6379 --redis-password=123456 RAY_BACKEND_LOG_LEVEL=debug java -cp bazel-bin/java/all_tests_deploy.jar -Dray.address="$ip:6379"\ -Dray.redis.password='123456' -Dray.job.code-search-path="$PWD/bazel-bin/java/all_tests_deploy.jar" io.ray.test.MultiDriverTest ray stop diff --git a/java/test/src/main/java/io/ray/test/FailureTest.java b/java/test/src/main/java/io/ray/test/FailureTest.java index 6529b1beb..923875d5f 100644 --- a/java/test/src/main/java/io/ray/test/FailureTest.java +++ b/java/test/src/main/java/io/ray/test/FailureTest.java @@ -30,13 +30,13 @@ public class FailureTest extends BaseTest { // This is needed by `testGetThrowsQuicklyWhenFoundException`. // Set one worker per process. Otherwise, if `badFunc2` and `slowFunc` run in the same // process, `sleep` will delay `System.exit`. - oldNumWorkersPerProcess = System.getProperty("ray.raylet.config.num_workers_per_process_java"); - System.setProperty("ray.raylet.config.num_workers_per_process_java", "1"); + oldNumWorkersPerProcess = System.getProperty("ray.job.num-java-workers-per-process"); + System.setProperty("ray.job.num-java-workers-per-process", "1"); } @AfterClass public void tearDown() { - System.setProperty("ray.raylet.config.num_workers_per_process_java", oldNumWorkersPerProcess); + System.setProperty("ray.job.num-java-workers-per-process", oldNumWorkersPerProcess); } public static int badFunc() { diff --git a/java/test/src/main/java/io/ray/test/JobConfigTest.java b/java/test/src/main/java/io/ray/test/JobConfigTest.java index f7a7b79c3..d4725131e 100644 --- a/java/test/src/main/java/io/ray/test/JobConfigTest.java +++ b/java/test/src/main/java/io/ray/test/JobConfigTest.java @@ -14,7 +14,6 @@ public class JobConfigTest extends BaseTest { @BeforeClass public void setupJobConfig() { - System.setProperty("ray.raylet.config.enable_multi_tenancy", "true"); oldNumWorkersPerProcess = System.getProperty("ray.job.num-java-workers-per-process"); System.setProperty("ray.job.num-java-workers-per-process", "3"); System.setProperty("ray.job.jvm-options.0", "-DX=999"); @@ -25,7 +24,6 @@ public class JobConfigTest extends BaseTest { @AfterClass public void tearDownJobConfig() { - System.clearProperty("ray.raylet.config.enable_multi_tenancy"); System.setProperty("ray.job.num-java-workers-per-process", oldNumWorkersPerProcess); System.clearProperty("ray.job.jvm-options.0"); System.clearProperty("ray.job.jvm-options.1"); diff --git a/java/test/src/main/java/io/ray/test/KillActorTest.java b/java/test/src/main/java/io/ray/test/KillActorTest.java index cf88896d8..88b5b483b 100644 --- a/java/test/src/main/java/io/ray/test/KillActorTest.java +++ b/java/test/src/main/java/io/ray/test/KillActorTest.java @@ -18,13 +18,13 @@ public class KillActorTest extends BaseTest { @BeforeClass public void setUp() { - oldNumWorkersPerProcess = System.getProperty("ray.raylet.config.num_workers_per_process_java"); - System.setProperty("ray.raylet.config.num_workers_per_process_java", "1"); + oldNumWorkersPerProcess = System.getProperty("ray.job.num-java-workers-per-process"); + System.setProperty("ray.job.num-java-workers-per-process", "1"); } @AfterClass public void tearDown() { - System.setProperty("ray.raylet.config.num_workers_per_process_java", oldNumWorkersPerProcess); + System.setProperty("ray.job.num-java-workers-per-process", oldNumWorkersPerProcess); } public static class HangActor { diff --git a/java/test/src/main/java/io/ray/test/MultiDriverTest.java b/java/test/src/main/java/io/ray/test/MultiDriverTest.java index f976ce18c..80d24d79d 100644 --- a/java/test/src/main/java/io/ray/test/MultiDriverTest.java +++ b/java/test/src/main/java/io/ray/test/MultiDriverTest.java @@ -15,8 +15,6 @@ import java.util.List; import java.util.Set; import java.util.stream.Collectors; import org.testng.Assert; -import org.testng.annotations.AfterClass; -import org.testng.annotations.BeforeClass; import org.testng.annotations.Test; @Test(groups = {"cluster"}) @@ -27,16 +25,6 @@ public class MultiDriverTest extends BaseTest { private static final int ACTOR_COUNT_PER_DRIVER = 10; private static final String PID_LIST_PREFIX = "PID: "; - @BeforeClass - public void setUpClass() { - System.setProperty("ray.raylet.config.enable_multi_tenancy", "true"); - } - - @AfterClass - public void tearDownClass() { - System.clearProperty("ray.raylet.config.enable_multi_tenancy"); - } - static int getPid() { return SystemUtil.pid(); } diff --git a/python/ray/_private/services.py b/python/ray/_private/services.py index a33f9c7f0..62adfd072 100644 --- a/python/ray/_private/services.py +++ b/python/ray/_private/services.py @@ -136,7 +136,7 @@ def find_redis_address(address=None): # --redis_address=123.456.78.910 --node_ip_address=123.456.78.910 # --raylet_socket_name=... --store_socket_name=... --object_manager_port=0 # --min_worker_port=10000 --max_worker_port=10999 - # --node_manager_port=58578 --redis_port=6379 --num_initial_workers=8 + # --node_manager_port=58578 --redis_port=6379 # --maximum_startup_concurrency=8 # --static_resource_list=node:123.456.78.910,1.0,object_store_memory,66 # --config_list=plasma_store_as_thread,True @@ -1253,13 +1253,11 @@ def start_raylet(redis_address, stderr_file=None, config=None, java_worker_options=None, - load_code_from_local=False, huge_pages=False, fate_share=None, socket_to_use=None, head_node=False, - start_initial_python_workers_for_first_job=False, - code_search_path=None): + start_initial_python_workers_for_first_job=False): """Start a raylet, which is a combined local scheduler and object manager. Args: @@ -1296,9 +1294,6 @@ def start_raylet(redis_address, config (dict|None): Optional Raylet configuration that will override defaults in RayConfig. java_worker_options (list): The command options for Java worker. - code_search_path (list): Code search path for worker. code_search_path - is added to worker command in non-multi-tenancy mode and job_config - in multi-tenancy mode. Returns: ProcessInfo for the process that was started. """ @@ -1311,7 +1306,6 @@ def start_raylet(redis_address, raise ValueError("Cannot use valgrind and profiler at the same time.") assert resource_spec.resolved() - num_initial_workers = resource_spec.num_cpus static_resources = resource_spec.to_resource_dict() # Limit the number of workers that can be started in parallel by the @@ -1348,7 +1342,6 @@ def start_raylet(redis_address, raylet_name, redis_password, session_dir, - code_search_path, ) else: java_worker_command = [] @@ -1368,15 +1361,18 @@ def start_raylet(redis_address, # Create the command that the Raylet will use to start workers. start_worker_command = [ - sys.executable, worker_path, f"--node-ip-address={node_ip_address}", + sys.executable, + worker_path, + f"--node-ip-address={node_ip_address}", f"--node-manager-port={node_manager_port}", f"--object-store-name={plasma_store_name}", - f"--raylet-name={raylet_name}", f"--redis-address={redis_address}", - f"--config-list={config_str}", f"--temp-dir={temp_dir}", - f"--metrics-agent-port={metrics_agent_port}" + f"--raylet-name={raylet_name}", + f"--redis-address={redis_address}", + f"--config-list={config_str}", + f"--temp-dir={temp_dir}", + f"--metrics-agent-port={metrics_agent_port}", + "RAY_WORKER_DYNAMIC_OPTION_PLACEHOLDER", ] - if code_search_path: - start_worker_command.append(f"--code-search-path={code_search_path}") if redis_password: start_worker_command += [f"--redis-password={redis_password}"] @@ -1391,12 +1387,6 @@ def start_raylet(redis_address, if max_worker_port is None: max_worker_port = 0 - if code_search_path is not None and len(code_search_path) > 0: - load_code_from_local = True - - if load_code_from_local: - start_worker_command += ["--load-code-from-local"] - # Create agent command agent_command = [ sys.executable, @@ -1427,7 +1417,6 @@ def start_raylet(redis_address, f"--node_ip_address={node_ip_address}", f"--redis_address={gcs_ip_address}", f"--redis_port={gcs_port}", - f"--num_initial_workers={num_initial_workers}", f"--maximum_startup_concurrency={maximum_startup_concurrency}", f"--static_resource_list={resource_argument}", f"--config_list={config_str}", @@ -1487,8 +1476,7 @@ def get_ray_jars_dir(): def build_java_worker_command(java_worker_options, redis_address, node_manager_port, plasma_store_name, - raylet_name, redis_password, session_dir, - code_search_path): + raylet_name, redis_password, session_dir): """This method assembles the command used to start a Java worker. Args: @@ -1499,7 +1487,6 @@ def build_java_worker_command(java_worker_options, redis_address, raylet_name (str): The name of the raylet socket to create. redis_password (str): The password of connect to redis. session_dir (str): The path of this session. - code_search_path (list): Teh job code search path. Returns: The command string for starting Java worker. """ @@ -1520,7 +1507,6 @@ def build_java_worker_command(java_worker_options, redis_address, pairs.append(("ray.home", RAY_HOME)) pairs.append(("ray.logging.dir", os.path.join(session_dir, "logs"))) pairs.append(("ray.session-dir", session_dir)) - pairs.append(("ray.job.code-search-path", code_search_path)) command = ["java"] + ["-D{}={}".format(*pair) for pair in pairs] command += ["RAY_WORKER_RAYLET_CONFIG_PLACEHOLDER"] diff --git a/python/ray/autoscaler/aws/example-java.yaml b/python/ray/autoscaler/aws/example-java.yaml index e9bd11ad3..3d27807b2 100644 --- a/python/ray/autoscaler/aws/example-java.yaml +++ b/python/ray/autoscaler/aws/example-java.yaml @@ -2,7 +2,7 @@ cluster_name: java # The minimum number of workers nodes to launch in addition to the head # node. This number should be >= 0. -min_workers: 1 +min_workers: 1 # The maximum number of workers nodes to launch in addition to the head # node. This takes precedence over min_workers. max_workers: 1 @@ -72,10 +72,10 @@ worker_setup_commands: [] # Command to start ray on the head node. You don't need to change this. head_start_ray_commands: - ray stop - - ulimit -n 65536; ray start --head --port=6379 --object-manager-port=8076 --autoscaling-config=~/ray_bootstrap_config.yaml --code-search-path=~/ray-word-count/target + - ulimit -n 65536; ray start --head --port=6379 --object-manager-port=8076 --autoscaling-config=~/ray_bootstrap_config.yaml # Command to start ray on worker nodes. You don't need to change this. worker_start_ray_commands: - ray stop - - ulimit -n 65536; ray start --address=$RAY_HEAD_IP:6379 --object-manager-port=8076 --code-search-path=ray-word-count/target + - ulimit -n 65536; ray start --address=$RAY_HEAD_IP:6379 --object-manager-port=8076 -# To run the program, run `ray exec java.yaml "java -jar ray-word-count/target/ray-word-count-1.0-SNAPSHOT-jar-with-dependencies.jar"` +# To run the program, run `ray exec java.yaml "java -jar ray-word-count/target/ray-word-count-1.0-SNAPSHOT-jar-with-dependencies.jar -Dray.job.code-search-path=ray-word-count/target"` diff --git a/python/ray/includes/ray_config.pxd b/python/ray/includes/ray_config.pxd index f13757816..1efce3343 100644 --- a/python/ray/includes/ray_config.pxd +++ b/python/ray/includes/ray_config.pxd @@ -51,10 +51,6 @@ cdef extern from "ray/common/ray_config.h" nogil: uint64_t object_manager_default_chunk_size() const - int num_workers_per_process_python() const - - int num_workers_per_process_java() const - uint32_t maximum_gcs_deletion_batch_size() const int64_t max_direct_call_object_size() const diff --git a/python/ray/includes/ray_config.pxi b/python/ray/includes/ray_config.pxi index 09b0ad497..766782fa1 100644 --- a/python/ray/includes/ray_config.pxi +++ b/python/ray/includes/ray_config.pxi @@ -88,14 +88,6 @@ cdef class Config: def object_manager_default_chunk_size(): return RayConfig.instance().object_manager_default_chunk_size() - @staticmethod - def num_workers_per_process_python(): - return RayConfig.instance().num_workers_per_process_python() - - @staticmethod - def num_workers_per_process_java(): - return RayConfig.instance().num_workers_per_process_java() - @staticmethod def maximum_gcs_deletion_batch_size(): return RayConfig.instance().maximum_gcs_deletion_batch_size() diff --git a/python/ray/node.py b/python/ray/node.py index 13d73956c..ac12809ff 100644 --- a/python/ray/node.py +++ b/python/ray/node.py @@ -339,10 +339,6 @@ class Node: """Get the cluster Redis password""" return self._ray_params.redis_password - @property - def load_code_from_local(self): - return self._ray_params.load_code_from_local - @property def object_ref_seed(self): """Get the seed for deterministic generation of object refs""" @@ -723,14 +719,12 @@ class Node: stderr_file=stderr_file, config=self._config, java_worker_options=self._ray_params.java_worker_options, - load_code_from_local=self._ray_params.load_code_from_local, huge_pages=self._ray_params.huge_pages, fate_share=self.kernel_fate_share, socket_to_use=self.socket, head_node=self.head, start_initial_python_workers_for_first_job=self._ray_params. - start_initial_python_workers_for_first_job, - code_search_path=self._ray_params.code_search_path) + start_initial_python_workers_for_first_job) assert ray_constants.PROCESS_TYPE_RAYLET not in self.all_processes self.all_processes[ray_constants.PROCESS_TYPE_RAYLET] = [process_info] diff --git a/python/ray/parameter.py b/python/ray/parameter.py index 6c9114603..f6bbc243c 100644 --- a/python/ray/parameter.py +++ b/python/ray/parameter.py @@ -89,7 +89,6 @@ class RayParams: contents to Redis. autoscaling_config: path to autoscaling config file. java_worker_options (list): The command options for Java worker. - load_code_from_local: Whether load code from local file or from GCS. metrics_agent_port(int): The port to bind metrics agent. metrics_export_port(int): The port at which metrics are exposed through a Prometheus endpoint. @@ -142,14 +141,12 @@ class RayParams: include_log_monitor=None, autoscaling_config=None, java_worker_options=None, - load_code_from_local=False, start_initial_python_workers_for_first_job=False, _system_config=None, enable_object_reconstruction=False, metrics_agent_port=None, metrics_export_port=None, - lru_evict=False, - code_search_path=None): + lru_evict=False): self.object_ref_seed = object_ref_seed self.redis_address = redis_address self.num_cpus = num_cpus @@ -186,7 +183,6 @@ class RayParams: self.include_log_monitor = include_log_monitor self.autoscaling_config = autoscaling_config self.java_worker_options = java_worker_options - self.load_code_from_local = load_code_from_local self.metrics_agent_port = metrics_agent_port self.metrics_export_port = metrics_export_port self.start_initial_python_workers_for_first_job = ( @@ -195,9 +191,6 @@ class RayParams: self._lru_evict = lru_evict self._enable_object_reconstruction = enable_object_reconstruction self._check_usage() - self.code_search_path = code_search_path - if code_search_path is None: - self.code_search_path = [] # Set the internal config options for LRU eviction. if lru_evict: diff --git a/python/ray/scripts/scripts.py b/python/ray/scripts/scripts.py index fd0c286f2..bed882c5a 100644 --- a/python/ray/scripts/scripts.py +++ b/python/ray/scripts/scripts.py @@ -389,25 +389,12 @@ def debug(address): default=None, type=str, help="Overwrite the options to start Java workers.") -@click.option( - "--code-search-path", - default=None, - hidden=True, - type=str, - help="A list of directories or jar files separated by colon that specify " - "the search path for user code. This will be used as `CLASSPATH` in " - "Java and `PYTHONPATH` in Python.") @click.option( "--system-config", default=None, hidden=True, type=json.loads, help="Override system configuration defaults.") -@click.option( - "--load-code-from-local", - is_flag=True, - default=False, - help="Specify whether load code from local file or GCS serialization.") @click.option( "--lru-evict", is_flag=True, @@ -436,8 +423,7 @@ def start(node_ip_address, address, port, redis_password, redis_shard_ports, head, include_dashboard, dashboard_host, dashboard_port, block, plasma_directory, autoscaling_config, no_redirect_worker_output, no_redirect_output, plasma_store_socket_name, raylet_socket_name, - temp_dir, java_worker_options, load_code_from_local, - code_search_path, system_config, lru_evict, + temp_dir, java_worker_options, system_config, lru_evict, enable_object_reconstruction, metrics_export_port, log_style, log_color, verbose): """Start Ray processes manually on the local machine.""" @@ -496,8 +482,6 @@ def start(node_ip_address, address, port, redis_password, redis_shard_ports, dashboard_host=dashboard_host, dashboard_port=dashboard_port, java_worker_options=java_worker_options, - load_code_from_local=load_code_from_local, - code_search_path=code_search_path, _system_config=system_config, lru_evict=lru_evict, enable_object_reconstruction=enable_object_reconstruction, diff --git a/python/ray/tests/test_cross_language.py b/python/ray/tests/test_cross_language.py index 3904f63df..10766b18b 100644 --- a/python/ray/tests/test_cross_language.py +++ b/python/ray/tests/test_cross_language.py @@ -1,4 +1,5 @@ import pytest +import sys import ray import ray.cluster_utils @@ -6,7 +7,7 @@ import ray.test_utils def test_cross_language_raise_kwargs(shutdown_only): - ray.init(_load_code_from_local=True) + ray.init(job_config=ray.job_config.JobConfig(code_search_path=sys.path)) with pytest.raises(Exception, match="kwargs"): ray.java_function("a", "b").remote(x="arg1") @@ -16,7 +17,7 @@ def test_cross_language_raise_kwargs(shutdown_only): def test_cross_language_raise_exception(shutdown_only): - ray.init(_load_code_from_local=True) + ray.init(job_config=ray.job_config.JobConfig(code_search_path=sys.path)) class PythonObject(object): pass diff --git a/python/ray/worker.py b/python/ray/worker.py index ffc4b4f2f..306a25465 100644 --- a/python/ray/worker.py +++ b/python/ray/worker.py @@ -109,6 +109,7 @@ class Worker: # by the worker should drop into the debugger at the specified # breakpoint ID. self.debugger_get_breakpoint = b"" + self._load_code_from_local = False @property def connected(self): @@ -122,7 +123,7 @@ class Worker: @property def load_code_from_local(self): self.check_connected() - return self.node.load_code_from_local + return self._load_code_from_local @property def current_job_id(self): @@ -222,6 +223,9 @@ class Worker: """ self.mode = mode + def set_load_code_from_local(self, load_code_from_local): + self._load_code_from_local = load_code_from_local + def put_object(self, value, object_ref=None, pin_object=True): """Put value in the local object store with object reference `object_ref`. @@ -489,9 +493,7 @@ def init( _memory=None, _redis_password=ray_constants.REDIS_DEFAULT_PASSWORD, _java_worker_options=None, - _code_search_path=None, _temp_dir=None, - _load_code_from_local=False, _lru_evict=False, _metrics_export_port=None, _system_config=None): @@ -579,10 +581,7 @@ def init( _temp_dir (str): If provided, specifies the root temporary directory for the Ray process. Defaults to an OS-specific conventional location, e.g., "/tmp/ray". - _load_code_from_local: Whether code should be loaded from a local - module or from the GCS. _java_worker_options: Overwrite the options to start Java workers. - _code_search_path (list): Java classpath or python import path. _lru_evict (bool): If True, when an object store is full, it will evict objects in LRU order to make more space and when under memory pressure, ray.ObjectLostError may be thrown. If False, then @@ -701,9 +700,7 @@ def init( redis_max_memory=_redis_max_memory, plasma_store_socket_name=None, temp_dir=_temp_dir, - load_code_from_local=_load_code_from_local, java_worker_options=_java_worker_options, - code_search_path=_code_search_path, start_initial_python_workers_for_first_job=True, _system_config=_system_config, lru_evict=_lru_evict, @@ -749,7 +746,6 @@ def init( redis_password=_redis_password, object_ref_seed=None, temp_dir=_temp_dir, - load_code_from_local=_load_code_from_local, _system_config=_system_config, lru_evict=_lru_evict, enable_object_reconstruction=_enable_object_reconstruction, diff --git a/python/ray/workers/default_worker.py b/python/ray/workers/default_worker.py index 5574b2e4c..8b63b2c81 100644 --- a/python/ray/workers/default_worker.py +++ b/python/ray/workers/default_worker.py @@ -145,11 +145,14 @@ if __name__ == "__main__": raylet_ip_address = args.node_ip_address code_search_path = args.code_search_path + load_code_from_local = False if code_search_path is not None: + load_code_from_local = True for p in code_search_path.split(":"): if os.path.isfile(p): p = os.path.dirname(p) sys.path.append(p) + ray.worker.global_worker.set_load_code_from_local(load_code_from_local) ray_params = RayParams( node_ip_address=args.node_ip_address, @@ -160,7 +163,6 @@ if __name__ == "__main__": plasma_store_socket_name=args.object_store_name, raylet_socket_name=args.raylet_name, temp_dir=args.temp_dir, - load_code_from_local=args.load_code_from_local, metrics_agent_port=args.metrics_agent_port, ) diff --git a/src/ray/common/ray_config_def.h b/src/ray/common/ray_config_def.h index 11770be3a..75208e5c5 100644 --- a/src/ray/common/ray_config_def.h +++ b/src/ray/common/ray_config_def.h @@ -196,15 +196,6 @@ RAY_CONFIG(uint64_t, object_manager_default_chunk_size, 5 * 1024 * 1024) /// excessive memory usage during object broadcast to many receivers. RAY_CONFIG(uint64_t, object_manager_max_bytes_in_flight, 2L * 1024 * 1024 * 1024) -/// Number of workers per Python worker process -RAY_CONFIG(int, num_workers_per_process_python, 1) - -/// Number of workers per Java worker process -RAY_CONFIG(int, num_workers_per_process_java, 1) - -/// Number of workers per CPP worker process -RAY_CONFIG(int, num_workers_per_process_cpp, 1) - /// Maximum number of ids in one batch to send to GCS to delete keys. RAY_CONFIG(uint32_t, maximum_gcs_deletion_batch_size, 1000) @@ -301,11 +292,6 @@ RAY_CONFIG(bool, report_worker_backlog, true) /// The timeout for synchronous GCS requests in seconds. RAY_CONFIG(int64_t, gcs_server_request_timeout_seconds, 5) -/// Whether to enable multi tenancy features. -RAY_CONFIG(bool, enable_multi_tenancy, - getenv("RAY_ENABLE_MULTI_TENANCY") == nullptr || - getenv("RAY_ENABLE_MULTI_TENANCY") == std::string("1")) - /// Whether to enable worker prestarting: https://github.com/ray-project/ray/issues/12052 RAY_CONFIG(bool, enable_worker_prestart, getenv("RAY_ENABLE_WORKER_PRESTART") == nullptr || diff --git a/src/ray/common/test_util.cc b/src/ray/common/test_util.cc index bac0183d9..653955dde 100644 --- a/src/ray/common/test_util.cc +++ b/src/ray/common/test_util.cc @@ -148,8 +148,7 @@ std::string TestSetupUtil::StartRaylet(const std::string &store_socket_name, "--node_manager_port=" + std::to_string(port), "--node_ip_address=" + node_ip_address, "--redis_address=" + redis_address, "--redis_port=6379", "--min-worker-port=0", "--max-worker-port=0", - "--num_initial_workers=1", "--maximum_startup_concurrency=10", - "--static_resource_list=" + resource, + "--maximum_startup_concurrency=10", "--static_resource_list=" + resource, "--python_worker_command=" + CreateCommandLine({TEST_MOCK_WORKER_EXEC_PATH, store_socket_name, raylet_socket_name, std::to_string(port)}), diff --git a/src/ray/core_worker/context.cc b/src/ray/core_worker/context.cc index b2a59ca41..685871c93 100644 --- a/src/ray/core_worker/context.cc +++ b/src/ray/core_worker/context.cc @@ -111,9 +111,7 @@ WorkerContext::WorkerContext(WorkerType worker_type, const WorkerID &worker_id, const JobID &job_id) : worker_type_(worker_type), worker_id_(worker_id), - current_job_id_(RayConfig::instance().enable_multi_tenancy() - ? job_id - : (worker_type_ == WorkerType::DRIVER ? job_id : JobID::Nil())), + current_job_id_(job_id), current_actor_id_(ActorID::Nil()), current_actor_placement_group_id_(PlacementGroupID::Nil()), placement_group_capture_child_tasks_(true), @@ -163,29 +161,17 @@ const std::unordered_map return override_environment_variables_; } -void WorkerContext::SetCurrentJobId(const JobID &job_id) { - RAY_CHECK(!RayConfig::instance().enable_multi_tenancy()); - current_job_id_ = job_id; -} - void WorkerContext::SetCurrentTaskId(const TaskID &task_id) { GetThreadContext().SetCurrentTaskId(task_id); } void WorkerContext::SetCurrentTask(const TaskSpecification &task_spec) { GetThreadContext().SetCurrentTask(task_spec); + RAY_CHECK(current_job_id_ == task_spec.JobId()); if (task_spec.IsNormalTask()) { - if (!RayConfig::instance().enable_multi_tenancy()) { - RAY_CHECK(current_job_id_.IsNil()); - SetCurrentJobId(task_spec.JobId()); - } current_task_is_direct_call_ = true; override_environment_variables_ = task_spec.OverrideEnvironmentVariables(); } else if (task_spec.IsActorCreationTask()) { - if (!RayConfig::instance().enable_multi_tenancy()) { - RAY_CHECK(current_job_id_.IsNil()); - SetCurrentJobId(task_spec.JobId()); - } RAY_CHECK(current_actor_id_.IsNil()); current_actor_id_ = task_spec.ActorCreationId(); current_actor_is_direct_call_ = true; @@ -196,21 +182,13 @@ void WorkerContext::SetCurrentTask(const TaskSpecification &task_spec) { placement_group_capture_child_tasks_ = task_spec.PlacementGroupCaptureChildTasks(); override_environment_variables_ = task_spec.OverrideEnvironmentVariables(); } else if (task_spec.IsActorTask()) { - if (!RayConfig::instance().enable_multi_tenancy()) { - RAY_CHECK(current_job_id_ == task_spec.JobId()); - } RAY_CHECK(current_actor_id_ == task_spec.ActorId()); } else { RAY_CHECK(false); } } -void WorkerContext::ResetCurrentTask(const TaskSpecification &task_spec) { - GetThreadContext().ResetCurrentTask(); - if (!RayConfig::instance().enable_multi_tenancy() && task_spec.IsNormalTask()) { - SetCurrentJobId(JobID::Nil()); - } -} +void WorkerContext::ResetCurrentTask() { GetThreadContext().ResetCurrentTask(); } std::shared_ptr WorkerContext::GetCurrentTask() const { return GetThreadContext().GetCurrentTask(); diff --git a/src/ray/core_worker/context.h b/src/ray/core_worker/context.h index 11c481b06..3b28cef3c 100644 --- a/src/ray/core_worker/context.h +++ b/src/ray/core_worker/context.h @@ -42,16 +42,12 @@ class WorkerContext { const std::unordered_map &GetCurrentOverrideEnvironmentVariables() const; - // TODO(kfstorm): Remove this once `enable_multi_tenancy` is deleted. - // TODO(edoakes): remove this once Python core worker uses the task interfaces. - void SetCurrentJobId(const JobID &job_id); - // TODO(edoakes): remove this once Python core worker uses the task interfaces. void SetCurrentTaskId(const TaskID &task_id); void SetCurrentTask(const TaskSpecification &task_spec); - void ResetCurrentTask(const TaskSpecification &task_spec); + void ResetCurrentTask(); std::shared_ptr GetCurrentTask() const; diff --git a/src/ray/core_worker/core_worker.cc b/src/ray/core_worker/core_worker.cc index afda32b9b..af901f8d6 100644 --- a/src/ray/core_worker/core_worker.cc +++ b/src/ray/core_worker/core_worker.cc @@ -69,15 +69,7 @@ ray::JobID GetProcessJobID(const ray::CoreWorkerOptions &options) { if (options.worker_type == ray::WorkerType::WORKER) { // For workers, the job ID is assigned by Raylet via an environment variable. const char *job_id_env = std::getenv(kEnvVarKeyJobId); - // TODO(kfstorm): Use `RAY_CHECK` instead once the `enable_multi_tenancy` flag is - // removed. - // RAY_CHECK(job_id_env); - if (!job_id_env) { - // Multi-tenancy is disabled. - // NOTE(kfstorm): We can't read `RayConfig::instance().enable_multi_tenancy()` here - // because `RayConfig` is not initialized yet. - return ray::JobID::Nil(); - } + RAY_CHECK(job_id_env); return ray::JobID::FromHex(job_id_env); } return options.job_id; @@ -1859,7 +1851,7 @@ Status CoreWorker::ExecuteTask(const TaskSpecification &task_spec, if (!options_.is_local_mode) { SetCurrentTaskId(TaskID::Nil()); - worker_context_.ResetCurrentTask(task_spec); + worker_context_.ResetCurrentTask(); } { absl::MutexLock lock(&mutex_); diff --git a/src/ray/core_worker/core_worker.h b/src/ray/core_worker/core_worker.h index 1b877ec2f..8e8167eae 100644 --- a/src/ray/core_worker/core_worker.h +++ b/src/ray/core_worker/core_worker.h @@ -198,7 +198,7 @@ struct CoreWorkerOptions { /// thread with a worker. You can obtain the worker ID via /// `CoreWorkerProcess::GetCoreWorker()->GetWorkerID()`. Currently a Java worker process /// starts multiple workers by default, but can be configured to start only 1 worker by -/// overwriting the internal config `num_workers_per_process_java`. +/// speicifying `num_java_workers_per_process` in the job config. /// /// If only 1 worker is started (either because the worker type is driver, or the /// `num_workers` in `CoreWorkerOptions` is set to 1), all threads will be automatically diff --git a/src/ray/core_worker/test/core_worker_test.cc b/src/ray/core_worker/test/core_worker_test.cc index 626fb2d59..8591fa5df 100644 --- a/src/ray/core_worker/test/core_worker_test.cc +++ b/src/ray/core_worker/test/core_worker_test.cc @@ -620,8 +620,9 @@ TEST_F(ZeroNodeTest, TestWorkerContext) { TaskSpecification task_spec; size_t num_returns = 3; + task_spec.GetMutableMessage().set_job_id(job_id.Binary()); task_spec.GetMutableMessage().set_num_returns(num_returns); - context.ResetCurrentTask(task_spec); + context.ResetCurrentTask(); context.SetCurrentTask(task_spec); ASSERT_EQ(context.GetCurrentTaskID(), task_spec.TaskId()); diff --git a/src/ray/raylet/main.cc b/src/ray/raylet/main.cc index a8e5ad5ca..60de24705 100644 --- a/src/ray/raylet/main.cc +++ b/src/ray/raylet/main.cc @@ -38,7 +38,6 @@ DEFINE_int32(max_worker_port, 0, "The highest port that workers' gRPC servers will bind on."); DEFINE_string(worker_port_list, "", "An explicit list of ports that workers' gRPC servers will bind on."); -DEFINE_int32(num_initial_workers, 0, "Number of initial workers."); DEFINE_int32(num_initial_python_workers_for_first_job, 0, "Number of initial Python workers for the first job."); DEFINE_int32(maximum_startup_concurrency, 1, "Maximum startup concurrency"); @@ -78,7 +77,6 @@ int main(int argc, char *argv[]) { const int min_worker_port = static_cast(FLAGS_min_worker_port); const int max_worker_port = static_cast(FLAGS_max_worker_port); const std::string worker_port_list = FLAGS_worker_port_list; - const int num_initial_workers = static_cast(FLAGS_num_initial_workers); const int num_initial_python_workers_for_first_job = static_cast(FLAGS_num_initial_python_workers_for_first_job); const int maximum_startup_concurrency = @@ -183,7 +181,6 @@ int main(int argc, char *argv[]) { << node_manager_config.resource_config.ToString(); node_manager_config.node_manager_address = node_ip_address; node_manager_config.node_manager_port = node_manager_port; - node_manager_config.num_initial_workers = num_initial_workers; node_manager_config.num_workers_soft_limit = num_cpus; node_manager_config.num_initial_python_workers_for_first_job = num_initial_python_workers_for_first_job; diff --git a/src/ray/raylet/node_manager.cc b/src/ray/raylet/node_manager.cc index f20956cea..4c43d3595 100644 --- a/src/ray/raylet/node_manager.cc +++ b/src/ray/raylet/node_manager.cc @@ -133,7 +133,7 @@ NodeManager::NodeManager(boost::asio::io_service &io_service, const NodeID &self initial_config_(config), local_available_resources_(config.resource_config), worker_pool_( - io_service, config.num_initial_workers, config.num_workers_soft_limit, + io_service, config.num_workers_soft_limit, config.num_initial_python_workers_for_first_job, config.maximum_startup_concurrency, config.min_worker_port, config.max_worker_port, config.worker_ports, gcs_client_, @@ -345,15 +345,13 @@ void NodeManager::HandleJobStarted(const JobID &job_id, const JobTableData &job_ RAY_CHECK(!job_data.is_dead()); worker_pool_.HandleJobStarted(job_id, job_data.config()); - if (RayConfig::instance().enable_multi_tenancy()) { - // Tasks of this job may already arrived but failed to pop a worker because the job - // config is not local yet. So we trigger dispatching again here to try to - // reschedule these tasks. - if (new_scheduler_enabled_) { - ScheduleAndDispatch(); - } else { - DispatchTasks(local_queues_.GetReadyTasksByClass()); - } + // Tasks of this job may already arrived but failed to pop a worker because the job + // config is not local yet. So we trigger dispatching again here to try to + // reschedule these tasks. + if (new_scheduler_enabled_) { + ScheduleAndDispatch(); + } else { + DispatchTasks(local_queues_.GetReadyTasksByClass()); } } @@ -1193,8 +1191,7 @@ void NodeManager::ProcessRegisterClientRequestMessage( std::string worker_ip_address = string_from_flatbuf(*message->ip_address()); // TODO(suquark): Use `WorkerType` in `common.proto` without type converting. rpc::WorkerType worker_type = static_cast(message->worker_type()); - if ((RayConfig::instance().enable_multi_tenancy() && - (worker_type != rpc::WorkerType::SPILL_WORKER && + if (((worker_type != rpc::WorkerType::SPILL_WORKER && worker_type != rpc::WorkerType::RESTORE_WORKER)) || worker_type == rpc::WorkerType::DRIVER) { RAY_CHECK(!job_id.IsNil()); @@ -1627,9 +1624,7 @@ void NodeManager::HandleRequestWorkerLease(const rpc::RequestWorkerLeaseRequest RAY_CHECK_OK(gcs_client_->Tasks().AsyncAdd(data, nullptr)); } - // Prestart optimization is only needed when multi-tenancy is on. - if (RayConfig::instance().enable_multi_tenancy() && - RayConfig::instance().enable_worker_prestart()) { + if (RayConfig::instance().enable_worker_prestart()) { auto task_spec = task.GetTaskSpecification(); worker_pool_.PrestartWorkers(task_spec, request.backlog_size()); } @@ -2522,12 +2517,6 @@ bool NodeManager::FinishAssignedTask(const std::shared_ptr &wor task_dependency_manager_.UnsubscribeGetDependencies(spec.TaskId()); task_dependency_manager_.TaskCanceled(task_id); - if (!RayConfig::instance().enable_multi_tenancy()) { - // Unset the worker's assigned job Id if this is not an actor. - if (!spec.IsActorCreationTask()) { - worker.AssignJobId(JobID::Nil()); - } - } if (!spec.IsActorCreationTask()) { // Unset the worker's assigned task. We keep the assigned task ID for // direct actor creation calls because this ID is used later if the actor @@ -2776,9 +2765,7 @@ void NodeManager::FinishAssignTask(const std::shared_ptr &worke // We successfully assigned the task to the worker. worker->AssignTaskId(spec.TaskId()); worker->SetOwnerAddress(spec.CallerAddress()); - if (!RayConfig::instance().enable_multi_tenancy()) { - worker->AssignJobId(spec.JobId()); - } + RAY_CHECK(worker->GetAssignedJobId() == spec.JobId()); // TODO(swang): For actors with multiple actor handles, to // guarantee that tasks are replayed in the same order after a // failure, we must update the task's execution dependency to be diff --git a/src/ray/raylet/node_manager.h b/src/ray/raylet/node_manager.h index b73eb6876..428b5549e 100644 --- a/src/ray/raylet/node_manager.h +++ b/src/ray/raylet/node_manager.h @@ -70,8 +70,6 @@ struct NodeManagerConfig { /// An explicit list of open ports that workers started will bind /// on. This takes precedence over min_worker_port and max_worker_port. std::vector worker_ports; - /// The initial number of workers to create. - int num_initial_workers; /// The soft limit of the number of workers. int num_workers_soft_limit; /// Number of initial Python workers for the first job. diff --git a/src/ray/raylet/object_manager_integration_test.cc b/src/ray/raylet/object_manager_integration_test.cc index 5f950f46e..0a1aae200 100644 --- a/src/ray/raylet/object_manager_integration_test.cc +++ b/src/ray/raylet/object_manager_integration_test.cc @@ -40,7 +40,6 @@ class TestObjectManagerBase : public ::testing::Test { static_resource_conf = {{"CPU", 1}, {"GPU", 1}}; node_manager_config.resource_config = ray::raylet::ResourceSet(std::move(static_resource_conf)); - node_manager_config.num_initial_workers = 0; // Use a default worker that can execute empty tasks with dependencies. std::vector py_worker_command; py_worker_command.push_back("python"); diff --git a/src/ray/raylet/scheduling/cluster_task_manager.cc b/src/ray/raylet/scheduling/cluster_task_manager.cc index 9cd6832f0..a62eaabba 100644 --- a/src/ray/raylet/scheduling/cluster_task_manager.cc +++ b/src/ray/raylet/scheduling/cluster_task_manager.cc @@ -186,9 +186,6 @@ bool ClusterTaskManager::AttemptDispatchWork(const Work &work, worker->SetAllocatedInstances(allocated_instances); } worker->AssignTaskId(spec.TaskId()); - if (!RayConfig::instance().enable_multi_tenancy()) { - worker->AssignJobId(spec.JobId()); - } worker->SetAssignedTask(task); *worker_leased = true; dispatched = true; diff --git a/src/ray/raylet/test/util.h b/src/ray/raylet/test/util.h index 459a31e02..4d64507b8 100644 --- a/src/ray/raylet/test/util.h +++ b/src/ray/raylet/test/util.h @@ -33,9 +33,6 @@ class MockWorker : public WorkerInterface { void AssignTaskId(const TaskID &task_id) {} - // TODO(kfstorm): Remove this once `enable_multi_tenancy` is deleted. - void AssignJobId(const JobID &job_id) {} - void SetAssignedTask(const Task &assigned_task) {} const std::string IpAddress() const { return address_.ip_address(); } diff --git a/src/ray/raylet/worker.cc b/src/ray/raylet/worker.cc index acce4c984..81e8a98ac 100644 --- a/src/ray/raylet/worker.cc +++ b/src/ray/raylet/worker.cc @@ -111,11 +111,6 @@ const std::unordered_set &Worker::GetBlockedTaskIds() const { return blocked_task_ids_; } -void Worker::AssignJobId(const JobID &job_id) { - RAY_CHECK(!RayConfig::instance().enable_multi_tenancy()); - assigned_job_id_ = job_id; -} - const JobID &Worker::GetAssignedJobId() const { return assigned_job_id_; } void Worker::AssignActorId(const ActorID &actor_id) { diff --git a/src/ray/raylet/worker.h b/src/ray/raylet/worker.h index a40855abe..7cd19868e 100644 --- a/src/ray/raylet/worker.h +++ b/src/ray/raylet/worker.h @@ -60,8 +60,6 @@ class WorkerInterface { virtual bool AddBlockedTaskId(const TaskID &task_id) = 0; virtual bool RemoveBlockedTaskId(const TaskID &task_id) = 0; virtual const std::unordered_set &GetBlockedTaskIds() const = 0; - // TODO(kfstorm): Remove this once `enable_multi_tenancy` is deleted. - virtual void AssignJobId(const JobID &job_id) = 0; virtual const JobID &GetAssignedJobId() const = 0; virtual void AssignActorId(const ActorID &actor_id) = 0; virtual const ActorID &GetActorId() const = 0; @@ -151,8 +149,6 @@ class Worker : public WorkerInterface { bool AddBlockedTaskId(const TaskID &task_id); bool RemoveBlockedTaskId(const TaskID &task_id); const std::unordered_set &GetBlockedTaskIds() const; - // TODO(kfstorm): Remove this once `enable_multi_tenancy` is deleted. - void AssignJobId(const JobID &job_id); const JobID &GetAssignedJobId() const; void AssignActorId(const ActorID &actor_id); const ActorID &GetActorId() const; diff --git a/src/ray/raylet/worker_pool.cc b/src/ray/raylet/worker_pool.cc index bc649ac02..a6640dc5b 100644 --- a/src/ray/raylet/worker_pool.cc +++ b/src/ray/raylet/worker_pool.cc @@ -54,8 +54,7 @@ namespace ray { namespace raylet { -WorkerPool::WorkerPool(boost::asio::io_service &io_service, int num_workers, - int num_workers_soft_limit, +WorkerPool::WorkerPool(boost::asio::io_service &io_service, int num_workers_soft_limit, int num_initial_python_workers_for_first_job, int maximum_startup_concurrency, int min_worker_port, int max_worker_port, const std::vector &worker_ports, @@ -83,33 +82,7 @@ WorkerPool::WorkerPool(boost::asio::io_service &io_service, int num_workers, for (const auto &entry : worker_commands) { // Initialize the pool state for this language. auto &state = states_by_lang_[entry.first]; - if (!RayConfig::instance().enable_multi_tenancy()) { - switch (entry.first) { - case Language::PYTHON: - state.num_workers_per_process = - RayConfig::instance().num_workers_per_process_python(); - break; - case Language::JAVA: - state.num_workers_per_process = - RayConfig::instance().num_workers_per_process_java(); - break; - case Language::CPP: - state.num_workers_per_process = - RayConfig::instance().num_workers_per_process_cpp(); - break; - default: - RAY_LOG(FATAL) << "The number of workers per process for " - << Language_Name(entry.first) << " worker is not set."; - } - RAY_CHECK(state.num_workers_per_process > 0) - << "Number of workers per process of language " << Language_Name(entry.first) - << " must be positive."; - state.multiple_for_warning = - std::max(state.num_workers_per_process, - std::max(num_workers, maximum_startup_concurrency)); - } else { - state.multiple_for_warning = maximum_startup_concurrency; - } + state.multiple_for_warning = maximum_startup_concurrency; // Set worker command for this language. state.worker_command = entry.second; RAY_CHECK(!state.worker_command.empty()) << "Worker command must not be empty."; @@ -131,27 +104,7 @@ WorkerPool::WorkerPool(boost::asio::io_service &io_service, int num_workers, free_ports_->push(port); } } - if (!RayConfig::instance().enable_multi_tenancy()) { - Start(num_workers); - } else { - ScheduleIdleWorkerKilling(); - } -} - -void WorkerPool::Start(int num_workers) { - RAY_CHECK(!RayConfig::instance().enable_multi_tenancy()); - for (auto &entry : states_by_lang_) { - if (entry.first == Language::JAVA) { - // Disable initial workers for Java. - continue; - } - auto &state = entry.second; - int num_worker_processes = static_cast( - std::ceil(static_cast(num_workers) / state.num_workers_per_process)); - for (int i = 0; i < num_worker_processes; i++) { - StartWorkerProcess(entry.first, rpc::WorkerType::WORKER, JobID::Nil()); - } - } + ScheduleIdleWorkerKilling(); } WorkerPool::~WorkerPool() { @@ -178,7 +131,7 @@ Process WorkerPool::StartWorkerProcess( std::vector dynamic_options, std::unordered_map override_environment_variables) { rpc::JobConfig *job_config = nullptr; - if (RayConfig::instance().enable_multi_tenancy() && !IsIOWorkerType(worker_type)) { + if (!IsIOWorkerType(worker_type)) { RAY_CHECK(!job_id.IsNil()); auto it = all_jobs_.find(job_id); if (it == all_jobs_.end()) { @@ -212,15 +165,12 @@ Process WorkerPool::StartWorkerProcess( int workers_to_start = 1; if (dynamic_options.empty()) { - if (!RayConfig::instance().enable_multi_tenancy()) { - workers_to_start = state.num_workers_per_process; - } else if (language == Language::JAVA) { + if (language == Language::JAVA) { workers_to_start = job_config->num_java_workers_per_process(); } } - // For non-multi-tenancy mode, job code search path is embedded in worker_command. - if (RayConfig::instance().enable_multi_tenancy() && job_config) { + if (job_config) { // Note that we push the item to the front of the vector to make // sure this is the freshest option than others. if (!job_config->jvm_options().empty()) { @@ -272,9 +222,6 @@ Process WorkerPool::StartWorkerProcess( switch (language) { case Language::JAVA: for (auto &entry : raylet_config_) { - if (entry.first == "num_workers_per_process_java") { - continue; - } std::string arg; arg.append("-Dray.raylet.config."); arg.append(entry.first); @@ -282,17 +229,8 @@ Process WorkerPool::StartWorkerProcess( arg.append(entry.second); worker_command_args.push_back(arg); } - if (!RayConfig::instance().enable_multi_tenancy()) { - // The value of `num_workers_per_process_java` may change depends on whether - // dynamic options is empty, so we can't use the value in `RayConfig`. We always - // overwrite the value here. - worker_command_args.push_back( - "-Dray.raylet.config.num_workers_per_process_java=" + - std::to_string(workers_to_start)); - } else { - worker_command_args.push_back("-Dray.job.num-java-workers-per-process=" + - std::to_string(workers_to_start)); - } + worker_command_args.push_back("-Dray.job.num-java-workers-per-process=" + + std::to_string(workers_to_start)); break; default: RAY_LOG(FATAL) @@ -327,12 +265,12 @@ Process WorkerPool::StartWorkerProcess( } ProcessEnvironment env; - if (RayConfig::instance().enable_multi_tenancy() && !IsIOWorkerType(worker_type)) { + if (!IsIOWorkerType(worker_type)) { // We pass the job ID to worker processes via an environment variable, so we don't // need to add a new CLI parameter for both Python and Java workers. env.emplace(kEnvVarKeyJobId, job_id.Hex()); } - if (RayConfig::instance().enable_multi_tenancy() && job_config) { + if (job_config) { env.insert(job_config->worker_env().begin(), job_config->worker_env().end()); } @@ -516,18 +454,16 @@ void WorkerPool::OnWorkerStarted(const std::shared_ptr &worker) io_worker_state.num_starting_io_workers--; } - if (RayConfig::instance().enable_multi_tenancy()) { - // This is a workaround to finish driver registration after all initial workers are - // registered to Raylet if and only if Raylet is started by a Python driver and the - // job config is not set in `ray.init(...)`. - if (first_job_ == worker->GetAssignedJobId() && - worker->GetLanguage() == Language::PYTHON) { - if (++first_job_registered_python_worker_count_ == - first_job_driver_wait_num_python_workers_) { - if (first_job_send_register_client_reply_to_driver_) { - first_job_send_register_client_reply_to_driver_(); - first_job_send_register_client_reply_to_driver_ = nullptr; - } + // This is a workaround to finish driver registration after all initial workers are + // registered to Raylet if and only if Raylet is started by a Python driver and the + // job config is not set in `ray.init(...)`. + if (first_job_ == worker->GetAssignedJobId() && + worker->GetLanguage() == Language::PYTHON) { + if (++first_job_registered_python_worker_count_ == + first_job_driver_wait_num_python_workers_) { + if (first_job_send_register_client_reply_to_driver_) { + first_job_send_register_client_reply_to_driver_(); + first_job_send_register_client_reply_to_driver_ = nullptr; } } } @@ -554,18 +490,15 @@ Status WorkerPool::RegisterDriver(const std::shared_ptr &driver // Invoke the `send_reply_callback` later to only finish driver // registration after all initial workers are registered to Raylet. bool delay_callback = false; - // Multi-tenancy is enabled. - if (RayConfig().instance().enable_multi_tenancy()) { - // If this is the first job. - if (first_job_.IsNil()) { - first_job_ = job_id; - // If the number of Python workers we need to wait is positive. - if (num_initial_python_workers_for_first_job_ > 0) { - delay_callback = true; - // Start initial Python workers for the first job. - for (int i = 0; i < num_initial_python_workers_for_first_job_; i++) { - StartWorkerProcess(Language::PYTHON, rpc::WorkerType::WORKER, job_id); - } + // If this is the first job. + if (first_job_.IsNil()) { + first_job_ = job_id; + // If the number of Python workers we need to wait is positive. + if (num_initial_python_workers_for_first_job_ > 0) { + delay_callback = true; + // Start initial Python workers for the first job. + for (int i = 0; i < num_initial_python_workers_for_first_job_; i++) { + StartWorkerProcess(Language::PYTHON, rpc::WorkerType::WORKER, job_id); } } } @@ -695,11 +628,9 @@ void WorkerPool::PushWorker(const std::shared_ptr &worker) { // Put the worker to the corresponding idle pool. if (worker->GetActorId().IsNil()) { state.idle.insert(worker); - if (RayConfig::instance().enable_multi_tenancy()) { - int64_t now = current_time_ms(); - idle_of_all_languages_.emplace_back(worker, now); - idle_of_all_languages_map_[worker] = now; - } + int64_t now = current_time_ms(); + idle_of_all_languages_.emplace_back(worker, now); + idle_of_all_languages_map_[worker] = now; } else { state.idle_actor[worker->GetActorId()] = worker; } @@ -858,40 +789,28 @@ std::shared_ptr WorkerPool::PopWorker( } } else if (!task_spec.IsActorTask()) { // Code path of normal task or actor creation task without dynamic worker options. - if (!RayConfig::instance().enable_multi_tenancy()) { - if (!state.idle.empty()) { - worker = std::move(*state.idle.begin()); - state.idle.erase(state.idle.begin()); - } else { - // There are no more non-actor workers available to execute this task. - // Start a new worker process. - proc = StartWorkerProcess(task_spec.GetLanguage(), rpc::WorkerType::WORKER, - JobID::Nil()); - } - } else { - // Find an available worker which is already assigned to this job. - // Try to pop the most recently pushed worker. - for (auto it = idle_of_all_languages_.rbegin(); it != idle_of_all_languages_.rend(); - it++) { - if (task_spec.GetLanguage() != it->first->GetLanguage() || - it->first->GetAssignedJobId() != task_spec.JobId()) { - continue; - } - state.idle.erase(it->first); - // We can't erase a reverse_iterator. - auto lit = it.base(); - lit--; - worker = std::move(lit->first); - idle_of_all_languages_.erase(lit); - idle_of_all_languages_map_.erase(worker); - break; - } - if (worker == nullptr) { - // There are no more non-actor workers available to execute this task. - // Start a new worker process. - proc = StartWorkerProcess(task_spec.GetLanguage(), rpc::WorkerType::WORKER, - task_spec.JobId()); + // Find an available worker which is already assigned to this job. + // Try to pop the most recently pushed worker. + for (auto it = idle_of_all_languages_.rbegin(); it != idle_of_all_languages_.rend(); + it++) { + if (task_spec.GetLanguage() != it->first->GetLanguage() || + it->first->GetAssignedJobId() != task_spec.JobId()) { + continue; } + state.idle.erase(it->first); + // We can't erase a reverse_iterator. + auto lit = it.base(); + lit--; + worker = std::move(lit->first); + idle_of_all_languages_.erase(lit); + idle_of_all_languages_map_.erase(worker); + break; + } + if (worker == nullptr) { + // There are no more non-actor workers available to execute this task. + // Start a new worker process. + proc = StartWorkerProcess(task_spec.GetLanguage(), rpc::WorkerType::WORKER, + task_spec.JobId()); } } else { // Code path of actor task. @@ -907,7 +826,7 @@ std::shared_ptr WorkerPool::PopWorker( WarnAboutSize(); } - if (RayConfig::instance().enable_multi_tenancy() && worker) { + if (worker) { RAY_CHECK(worker->GetAssignedJobId() == task_spec.JobId()); } return worker; diff --git a/src/ray/raylet/worker_pool.h b/src/ray/raylet/worker_pool.h index 78ae85564..62dfd20a5 100644 --- a/src/ray/raylet/worker_pool.h +++ b/src/ray/raylet/worker_pool.h @@ -95,7 +95,6 @@ class WorkerPool : public WorkerPoolInterface, public IOWorkerPoolInterface { /// process should create and register the specified number of workers, and add them to /// the pool. /// - /// \param num_workers The number of workers to start, per language. /// \param num_workers_soft_limit The soft limit of the number of workers. /// \param num_initial_python_workers_for_first_job The number of initial Python /// workers for the first job. @@ -113,8 +112,8 @@ class WorkerPool : public WorkerPoolInterface, public IOWorkerPoolInterface { /// \param raylet_config The raylet config list of this node. /// \param starting_worker_timeout_callback The callback that will be triggered once /// it times out to start a worker. - WorkerPool(boost::asio::io_service &io_service, int num_workers, - int num_workers_soft_limit, int num_initial_python_workers_for_first_job, + WorkerPool(boost::asio::io_service &io_service, int num_workers_soft_limit, + int num_initial_python_workers_for_first_job, int maximum_startup_concurrency, int min_worker_port, int max_worker_port, const std::vector &worker_ports, std::shared_ptr gcs_client, @@ -251,7 +250,7 @@ class WorkerPool : public WorkerPoolInterface, public IOWorkerPoolInterface { /// Try to prestart a number of workers suitable the given task spec. Prestarting /// is needed since core workers request one lease at a time, if starting is slow, - /// then it means it takes a long time to scale up when multi-tenancy is on. + /// then it means it takes a long time to scale up. /// /// \param task_spec The returned worker must be able to execute this task. /// \param backlog_size The number of tasks in the client backlog of this shape. @@ -306,7 +305,7 @@ class WorkerPool : public WorkerPoolInterface, public IOWorkerPoolInterface { protected: /// Asynchronously start a new worker process. Once the worker process has /// registered with an external server, the process should create and - /// register num_workers_per_process workers, then add them to the pool. + /// register N workers, then add them to the pool. /// Failure to start the worker process is a fatal error. If too many workers /// are already being started, then this function will return without starting /// any workers. @@ -354,8 +353,6 @@ class WorkerPool : public WorkerPoolInterface, public IOWorkerPoolInterface { struct State { /// The commands and arguments used to start the worker process std::vector worker_command; - /// The number of workers per process. - int num_workers_per_process; /// The pool of dedicated workers for actor creation tasks /// with prefix or suffix worker command. std::unordered_map> idle_dedicated_workers; @@ -392,12 +389,6 @@ class WorkerPool : public WorkerPoolInterface, public IOWorkerPoolInterface { std::unordered_map> states_by_lang_; private: - /// Force-start at least num_workers workers for this language. Used for internal and - /// test purpose only. - /// - /// \param num_workers The number of workers to start, per language. - void Start(int num_workers); - /// A helper function that returns the reference of the pool state /// for a given language. State &GetStateForLanguage(const Language &language); diff --git a/src/ray/raylet/worker_pool_test.cc b/src/ray/raylet/worker_pool_test.cc index 833be632b..ee8f3356b 100644 --- a/src/ray/raylet/worker_pool_test.cc +++ b/src/ray/raylet/worker_pool_test.cc @@ -36,12 +36,9 @@ class WorkerPoolMock : public WorkerPool { public: explicit WorkerPoolMock(boost::asio::io_service &io_service, const WorkerCommandMap &worker_commands) - : WorkerPool(io_service, 0, POOL_SIZE_SOFT_LIMIT, 0, MAXIMUM_STARTUP_CONCURRENCY, 0, - 0, {}, nullptr, worker_commands, {}, []() {}), - last_worker_process_() { - states_by_lang_[ray::Language::JAVA].num_workers_per_process = - NUM_WORKERS_PER_PROCESS_JAVA; - } + : WorkerPool(io_service, POOL_SIZE_SOFT_LIMIT, 0, MAXIMUM_STARTUP_CONCURRENCY, 0, 0, + {}, nullptr, worker_commands, {}, []() {}), + last_worker_process_() {} ~WorkerPoolMock() { // Avoid killing real processes @@ -103,14 +100,11 @@ class WorkerPoolMock : public WorkerPool { std::unordered_map> worker_commands_by_proc_; }; -class WorkerPoolTest : public ::testing::TestWithParam { +class WorkerPoolTest : public ::testing::Test { public: WorkerPoolTest() : error_message_type_(1), client_call_manager_(io_service_) { - bool enable_multi_tenancy = GetParam(); RayConfig::instance().initialize( - {{"enable_multi_tenancy", std::to_string(enable_multi_tenancy)}, - {"num_workers_per_process_java", std::to_string(NUM_WORKERS_PER_PROCESS_JAVA)}, - {"object_spilling_config", "mock_config"}, + {{"object_spilling_config", "mock_config"}, {"max_io_workers", std::to_string(MAX_IO_WORKER_SIZE)}}); SetWorkerCommands( {{Language::PYTHON, {"dummy_py_worker_command"}}, @@ -236,16 +230,10 @@ static inline TaskSpecification ExampleTaskSpec( } static inline std::string GetNumJavaWorkersPerProcessSystemProperty(int num) { - std::string key; - if (RayConfig::instance().enable_multi_tenancy()) { - key = "ray.job.num-java-workers-per-process"; - } else { - key = "ray.raylet.config.num_workers_per_process_java"; - } - return std::string("-D") + key + "=" + std::to_string(num); + return std::string("-Dray.job.num-java-workers-per-process=") + std::to_string(num); } -TEST_P(WorkerPoolTest, CompareWorkerProcessObjects) { +TEST_F(WorkerPoolTest, CompareWorkerProcessObjects) { typedef Process T; T a(T::CreateNewDummy()), b(T::CreateNewDummy()), empty = T(); ASSERT_TRUE(empty.IsNull()); @@ -259,7 +247,7 @@ TEST_P(WorkerPoolTest, CompareWorkerProcessObjects) { ASSERT_TRUE(!std::equal_to()(a, empty)); } -TEST_P(WorkerPoolTest, HandleWorkerRegistration) { +TEST_F(WorkerPoolTest, HandleWorkerRegistration) { Process proc = worker_pool_->StartWorkerProcess(Language::JAVA, rpc::WorkerType::WORKER, JOB_ID); std::vector> workers; @@ -286,59 +274,49 @@ TEST_P(WorkerPoolTest, HandleWorkerRegistration) { } } -TEST_P(WorkerPoolTest, HandleUnknownWorkerRegistration) { +TEST_F(WorkerPoolTest, HandleUnknownWorkerRegistration) { auto worker = CreateWorker(Process(), Language::PYTHON); auto status = worker_pool_->RegisterWorker(worker, 1234, [](Status, int) {}); ASSERT_FALSE(status.ok()); } -TEST_P(WorkerPoolTest, StartupPythonWorkerProcessCount) { +TEST_F(WorkerPoolTest, StartupPythonWorkerProcessCount) { TestStartupWorkerProcessCount(Language::PYTHON, 1, {"dummy_py_worker_command"}); } -TEST_P(WorkerPoolTest, StartupJavaWorkerProcessCount) { +TEST_F(WorkerPoolTest, StartupJavaWorkerProcessCount) { TestStartupWorkerProcessCount( Language::JAVA, NUM_WORKERS_PER_PROCESS_JAVA, {"dummy_java_worker_command", GetNumJavaWorkersPerProcessSystemProperty(NUM_WORKERS_PER_PROCESS_JAVA)}); } -TEST_P(WorkerPoolTest, InitialWorkerProcessCount) { - if (!RayConfig::instance().enable_multi_tenancy()) { - worker_pool_->Start(1); - // Here we try to start only 1 worker for each worker language. But since we disabled - // initial workers for Java, we expect to see only 1 worker which is a Python worker. - ASSERT_EQ(worker_pool_->NumWorkersStarting(), 1); - ASSERT_EQ(worker_pool_->NumWorkerProcessesStarting(), 1); - } else { - ASSERT_EQ(worker_pool_->NumWorkersStarting(), 0); - ASSERT_EQ(worker_pool_->NumWorkerProcessesStarting(), 0); - } +TEST_F(WorkerPoolTest, InitialWorkerProcessCount) { + ASSERT_EQ(worker_pool_->NumWorkersStarting(), 0); + ASSERT_EQ(worker_pool_->NumWorkerProcessesStarting(), 0); } -TEST_P(WorkerPoolTest, TestPrestartingWorkers) { - if (RayConfig::instance().enable_multi_tenancy()) { - const auto task_spec = ExampleTaskSpec(); - // Prestarts 2 workers. - worker_pool_->PrestartWorkers(task_spec, 2); - ASSERT_EQ(worker_pool_->NumWorkersStarting(), 2); - ASSERT_EQ(worker_pool_->NumWorkerProcessesStarting(), 2); - // Prestarts 1 more worker. - worker_pool_->PrestartWorkers(task_spec, 3); - ASSERT_EQ(worker_pool_->NumWorkersStarting(), 3); - ASSERT_EQ(worker_pool_->NumWorkerProcessesStarting(), 3); - // No more needed. - worker_pool_->PrestartWorkers(task_spec, 1); - ASSERT_EQ(worker_pool_->NumWorkersStarting(), 3); - ASSERT_EQ(worker_pool_->NumWorkerProcessesStarting(), 3); - // Capped by soft limit of 5. - worker_pool_->PrestartWorkers(task_spec, 20); - ASSERT_EQ(worker_pool_->NumWorkersStarting(), 5); - ASSERT_EQ(worker_pool_->NumWorkerProcessesStarting(), 5); - } +TEST_F(WorkerPoolTest, TestPrestartingWorkers) { + const auto task_spec = ExampleTaskSpec(); + // Prestarts 2 workers. + worker_pool_->PrestartWorkers(task_spec, 2); + ASSERT_EQ(worker_pool_->NumWorkersStarting(), 2); + ASSERT_EQ(worker_pool_->NumWorkerProcessesStarting(), 2); + // Prestarts 1 more worker. + worker_pool_->PrestartWorkers(task_spec, 3); + ASSERT_EQ(worker_pool_->NumWorkersStarting(), 3); + ASSERT_EQ(worker_pool_->NumWorkerProcessesStarting(), 3); + // No more needed. + worker_pool_->PrestartWorkers(task_spec, 1); + ASSERT_EQ(worker_pool_->NumWorkersStarting(), 3); + ASSERT_EQ(worker_pool_->NumWorkerProcessesStarting(), 3); + // Capped by soft limit of 5. + worker_pool_->PrestartWorkers(task_spec, 20); + ASSERT_EQ(worker_pool_->NumWorkersStarting(), 5); + ASSERT_EQ(worker_pool_->NumWorkerProcessesStarting(), 5); } -TEST_P(WorkerPoolTest, HandleWorkerPushPop) { +TEST_F(WorkerPoolTest, HandleWorkerPushPop) { // Try to pop a worker from the empty pool and make sure we don't get one. std::shared_ptr popped_worker; const auto task_spec = ExampleTaskSpec(); @@ -365,7 +343,7 @@ TEST_P(WorkerPoolTest, HandleWorkerPushPop) { ASSERT_EQ(popped_worker, nullptr); } -TEST_P(WorkerPoolTest, PopActorWorker) { +TEST_F(WorkerPoolTest, PopActorWorker) { // Create a worker. auto worker = CreateWorker(Process::CreateNewDummy()); // Add the worker to the pool. @@ -387,7 +365,7 @@ TEST_P(WorkerPoolTest, PopActorWorker) { ASSERT_EQ(actor->GetActorId(), actor_id); } -TEST_P(WorkerPoolTest, PopWorkersOfMultipleLanguages) { +TEST_F(WorkerPoolTest, PopWorkersOfMultipleLanguages) { // Create a Python Worker, and add it to the pool auto py_worker = CreateWorker(Process::CreateNewDummy(), Language::PYTHON); worker_pool_->PushWorker(py_worker); @@ -405,7 +383,7 @@ TEST_P(WorkerPoolTest, PopWorkersOfMultipleLanguages) { ASSERT_NE(worker_pool_->PopWorker(java_task_spec), nullptr); } -TEST_P(WorkerPoolTest, StartWorkerWithDynamicOptionsCommand) { +TEST_F(WorkerPoolTest, StartWorkerWithDynamicOptionsCommand) { const std::vector java_worker_command = { "RAY_WORKER_DYNAMIC_OPTION_PLACEHOLDER", "dummy_java_worker_command", "RAY_WORKER_RAYLET_CONFIG_PLACEHOLDER"}; @@ -426,25 +404,15 @@ TEST_P(WorkerPoolTest, StartWorkerWithDynamicOptionsCommand) { const auto real_command = worker_pool_->GetWorkerCommand(worker_pool_->LastStartedWorkerProcess()); - if (RayConfig::instance().enable_multi_tenancy()) { - ASSERT_EQ( - real_command, - std::vector( - {"test_op_0", "test_op_1", "-Dray.job.code-search-path=/test/code_serch_path", - "dummy_java_worker_command", GetNumJavaWorkersPerProcessSystemProperty(1)})); - } else { - ASSERT_EQ(real_command, std::vector( - {"test_op_0", "test_op_1", "dummy_java_worker_command", - GetNumJavaWorkersPerProcessSystemProperty(1)})); - } + ASSERT_EQ( + real_command, + std::vector( + {"test_op_0", "test_op_1", "-Dray.job.code-search-path=/test/code_serch_path", + "dummy_java_worker_command", GetNumJavaWorkersPerProcessSystemProperty(1)})); worker_pool_->HandleJobFinished(JOB_ID); } -TEST_P(WorkerPoolTest, PopWorkerMultiTenancy) { - if (!RayConfig::instance().enable_multi_tenancy()) { - return; - } - +TEST_F(WorkerPoolTest, PopWorkerMultiTenancy) { auto job_id1 = JOB_ID; auto job_id2 = JobID::FromInt(2); ASSERT_NE(job_id1, job_id2); @@ -505,7 +473,7 @@ TEST_P(WorkerPoolTest, PopWorkerMultiTenancy) { } } -TEST_P(WorkerPoolTest, MaximumStartupConcurrency) { +TEST_F(WorkerPoolTest, MaximumStartupConcurrency) { auto task_spec = ExampleTaskSpec(); std::vector started_processes; @@ -550,7 +518,7 @@ TEST_P(WorkerPoolTest, MaximumStartupConcurrency) { ASSERT_EQ(0, worker_pool_->NumWorkerProcessesStarting()); } -TEST_P(WorkerPoolTest, HandleIOWorkersPushPop) { +TEST_F(WorkerPoolTest, HandleIOWorkersPushPop) { std::unordered_set> spill_pushed_worker; std::unordered_set> restore_pushed_worker; auto spill_worker_callback = @@ -603,7 +571,7 @@ TEST_P(WorkerPoolTest, HandleIOWorkersPushPop) { ASSERT_EQ(restore_pushed_worker.size(), 1); } -TEST_P(WorkerPoolTest, MaxIOWorkerSimpleTest) { +TEST_F(WorkerPoolTest, MaxIOWorkerSimpleTest) { // Make sure max number of spill workers are respected. auto callback = [](std::shared_ptr worker) {}; std::vector started_processes; @@ -633,7 +601,7 @@ TEST_P(WorkerPoolTest, MaxIOWorkerSimpleTest) { ASSERT_EQ(worker_pool_->NumSpillWorkerStarting(), 0); } -TEST_P(WorkerPoolTest, MaxIOWorkerComplicateTest) { +TEST_F(WorkerPoolTest, MaxIOWorkerComplicateTest) { // Make sure max number of restore workers are respected. // This test will test a little more complicated scneario. // For example, it tests scenarios where there are @@ -685,7 +653,7 @@ TEST_P(WorkerPoolTest, MaxIOWorkerComplicateTest) { ASSERT_EQ(worker_pool_->NumSpillWorkerStarting(), 0); } -TEST_P(WorkerPoolTest, MaxSpillRestoreWorkersIntegrationTest) { +TEST_F(WorkerPoolTest, MaxSpillRestoreWorkersIntegrationTest) { auto callback = [](std::shared_ptr worker) {}; // Run many pop spill/restore workers and make sure the max worker size doesn't exceed. std::vector started_restore_processes; @@ -732,7 +700,7 @@ TEST_P(WorkerPoolTest, MaxSpillRestoreWorkersIntegrationTest) { ASSERT_EQ(worker_pool_->GetProcessSize(), 2 * MAX_IO_WORKER_SIZE); } -TEST_P(WorkerPoolTest, DeleteWorkerPushPop) { +TEST_F(WorkerPoolTest, DeleteWorkerPushPop) { /// Make sure delete workers always pop an I/O worker that has more idle worker in their /// pools. // 2 spill worker and 1 restore worker. @@ -770,9 +738,6 @@ TEST_P(WorkerPoolTest, DeleteWorkerPushPop) { }); } -INSTANTIATE_TEST_CASE_P(WorkerPoolMultiTenancyTest, WorkerPoolTest, - ::testing::Values(true, false)); - } // namespace raylet } // namespace ray diff --git a/streaming/java/streaming-api/src/main/java/io/ray/streaming/api/context/ClusterStarter.java b/streaming/java/streaming-api/src/main/java/io/ray/streaming/api/context/ClusterStarter.java index 559e9f223..c303b4994 100644 --- a/streaming/java/streaming-api/src/main/java/io/ray/streaming/api/context/ClusterStarter.java +++ b/streaming/java/streaming-api/src/main/java/io/ray/streaming/api/context/ClusterStarter.java @@ -12,10 +12,8 @@ class ClusterStarter { static synchronized void startCluster(boolean isLocal) { Preconditions.checkArgument(!Ray.isInitialized()); if (!isLocal) { - System.setProperty("ray.raylet.config.num_workers_per_process_java", "1"); System.setProperty("ray.run-mode", "CLUSTER"); } else { - System.clearProperty("ray.raylet.config.num_workers_per_process_java"); System.setProperty("ray.run-mode", "SINGLE_PROCESS"); } @@ -25,7 +23,6 @@ class ClusterStarter { public static synchronized void stopCluster() { // Disconnect to the cluster. Ray.shutdown(); - System.clearProperty("ray.raylet.config.num_workers_per_process_java"); System.clearProperty("ray.run-mode"); } } diff --git a/streaming/java/streaming-runtime/src/test/java/io/ray/streaming/runtime/streamingqueue/StreamingQueueTest.java b/streaming/java/streaming-runtime/src/test/java/io/ray/streaming/runtime/streamingqueue/StreamingQueueTest.java index e8d58b00a..c93d19911 100644 --- a/streaming/java/streaming-runtime/src/test/java/io/ray/streaming/runtime/streamingqueue/StreamingQueueTest.java +++ b/streaming/java/streaming-runtime/src/test/java/io/ray/streaming/runtime/streamingqueue/StreamingQueueTest.java @@ -60,7 +60,6 @@ public class StreamingQueueTest extends BaseUnitTest implements Serializable { Ray.shutdown(); System.setProperty("ray.head-args.0", "--num-cpus=4"); System.setProperty("ray.head-args.1", "--resources={\"RES-A\":4}"); - System.setProperty("ray.raylet.config.num_workers_per_process_java", "1"); System.setProperty("ray.run-mode", "CLUSTER"); System.setProperty("ray.redirect-output", "true"); Ray.init(); @@ -82,7 +81,6 @@ public class StreamingQueueTest extends BaseUnitTest implements Serializable { Ray.shutdown(); System.setProperty("ray.head-args.0", "--num-cpus=4"); System.setProperty("ray.head-args.1", "--resources={\"RES-A\":4}"); - System.setProperty("ray.raylet.config.num_workers_per_process_java", "1"); System.setProperty("ray.run-mode", "CLUSTER"); System.setProperty("ray.redirect-output", "true"); @@ -139,7 +137,6 @@ public class StreamingQueueTest extends BaseUnitTest implements Serializable { Ray.shutdown(); System.setProperty("ray.head-args.0", "--num-cpus=4"); System.setProperty("ray.head-args.1", "--resources={\"RES-A\":4}"); - System.setProperty("ray.raylet.config.num_workers_per_process_java", "1"); System.setProperty("ray.run-mode", "CLUSTER"); System.setProperty("ray.redirect-output", "true"); diff --git a/streaming/python/examples/wordcount.py b/streaming/python/examples/wordcount.py index 2f62b19da..d10782b52 100644 --- a/streaming/python/examples/wordcount.py +++ b/streaming/python/examples/wordcount.py @@ -1,5 +1,6 @@ import argparse import logging +import sys import time import ray @@ -65,7 +66,7 @@ if __name__ == "__main__": args = parser.parse_args() titles_file = str(args.titles_file) - ray.init(_load_code_from_local=True) + ray.init(job_config=ray.job_config.JobConfig(code_search_path=sys.path)) ctx = StreamingContext.Builder() \ .option(Config.CHANNEL_TYPE, Config.NATIVE_CHANNEL) \ diff --git a/streaming/python/tests/test_failover.py b/streaming/python/tests/test_failover.py index adab217e0..264a40099 100644 --- a/streaming/python/tests/test_failover.py +++ b/streaming/python/tests/test_failover.py @@ -1,4 +1,5 @@ import subprocess +import sys import time from typing import List @@ -8,7 +9,8 @@ from ray.streaming import StreamingContext def test_word_count(): try: - ray.init(_load_code_from_local=True) + ray.init( + job_config=ray.job_config.JobConfig(code_search_path=sys.path)) # time.sleep(10) # for gdb to attach ctx = StreamingContext.Builder() \ .option("streaming.context-backend.type", "local_file") \ diff --git a/streaming/python/tests/test_hybrid_stream.py b/streaming/python/tests/test_hybrid_stream.py index e257f0d9f..1a18b3fb1 100644 --- a/streaming/python/tests/test_hybrid_stream.py +++ b/streaming/python/tests/test_hybrid_stream.py @@ -1,6 +1,7 @@ import json import os import subprocess +import sys import ray from ray.streaming import StreamingContext @@ -34,9 +35,8 @@ def test_hybrid_stream(): print("java_worker_options", java_worker_options) assert not ray.is_initialized() ray.init( - _load_code_from_local=True, - _java_worker_options=java_worker_options, - _system_config={"num_workers_per_process_java": 1}) + job_config=ray.job_config.JobConfig(code_search_path=sys.path), + _java_worker_options=java_worker_options) sink_file = "/tmp/ray_streaming_test_hybrid_stream.txt" if os.path.exists(sink_file): diff --git a/streaming/python/tests/test_stream.py b/streaming/python/tests/test_stream.py index f99033d19..27febb822 100644 --- a/streaming/python/tests/test_stream.py +++ b/streaming/python/tests/test_stream.py @@ -1,9 +1,11 @@ +import sys + import ray from ray.streaming import StreamingContext def test_data_stream(): - ray.init(_load_code_from_local=True) + ray.init(job_config=ray.job_config.JobConfig(code_search_path=sys.path)) ctx = StreamingContext.Builder().build() stream = ctx.from_values(1, 2, 3) java_stream = stream.as_java_stream() @@ -17,7 +19,7 @@ def test_data_stream(): def test_key_data_stream(): - ray.init(_load_code_from_local=True) + ray.init(job_config=ray.job_config.JobConfig(code_search_path=sys.path)) ctx = StreamingContext.Builder().build() key_stream = ctx.from_values( "a", "b", "c").map(lambda x: (x, 1)).key_by(lambda x: x[0]) @@ -32,7 +34,7 @@ def test_key_data_stream(): def test_stream_config(): - ray.init(_load_code_from_local=True) + ray.init(job_config=ray.job_config.JobConfig(code_search_path=sys.path)) ctx = StreamingContext.Builder().build() stream = ctx.from_values(1, 2, 3) stream.with_config("k1", "v1") diff --git a/streaming/python/tests/test_union_stream.py b/streaming/python/tests/test_union_stream.py index 0c655b1d0..bab75e624 100644 --- a/streaming/python/tests/test_union_stream.py +++ b/streaming/python/tests/test_union_stream.py @@ -1,11 +1,12 @@ import os +import sys import ray from ray.streaming import StreamingContext def test_union_stream(): - ray.init(_load_code_from_local=True) + ray.init(job_config=ray.job_config.JobConfig(code_search_path=sys.path)) ctx = StreamingContext.Builder() \ .option("streaming.metrics.reporters", "") \ .build() diff --git a/streaming/python/tests/test_word_count.py b/streaming/python/tests/test_word_count.py index 13b09912c..8275ac6f6 100644 --- a/streaming/python/tests/test_word_count.py +++ b/streaming/python/tests/test_word_count.py @@ -1,11 +1,12 @@ import os +import sys import ray from ray.streaming import StreamingContext from ray.test_utils import wait_for_condition def test_word_count(): - ray.init(_load_code_from_local=True) + ray.init(job_config=ray.job_config.JobConfig(code_search_path=sys.path)) ctx = StreamingContext.Builder() \ .build() ctx.read_text_file(__file__) \ @@ -24,7 +25,7 @@ def test_word_count(): def test_simple_word_count(): - ray.init(_load_code_from_local=True) + ray.init(job_config=ray.job_config.JobConfig(code_search_path=sys.path)) ctx = StreamingContext.Builder() \ .build() sink_file = "/tmp/ray_streaming_test_simple_word_count.txt"