[Multi-tenancy] Delete flag enable_multi_tenancy and remove old code path (#10573)

This commit is contained in:
Kai Yang
2020-12-10 19:01:40 +08:00
committed by GitHub
parent d681991773
commit e3b5deb741
47 changed files with 279 additions and 530 deletions
+20 -14
View File
@@ -21,26 +21,34 @@ Clusters are started with the :ref:`Ray Cluster Launcher <ref-automatic-cluster>
You can also create a Ray cluster using a standard cluster manager such as :ref:`Kubernetes <ray-k8s-deploy>`, :ref:`YARN <ray-yarn-deploy>`, or :ref:`SLURM <ray-slurm-deploy>`.
After a cluster is started, you need to connect your program to the Ray cluster.
After a cluster is started, you need to connect your program to the Ray cluster by starting a driver process on the same node as where you ran ``ray start``:
.. tabs::
.. group-tab:: python
.. code-tab:: python
You can connect to this Ray runtime by starting a Python process that calls the following on the same node as where you ran ``ray start``:
.. code-block:: python
# This must
import ray
ray.init(address='auto')
# This must
import ray
ray.init(address='auto')
.. group-tab:: java
If you want to run Java code, you need to specify the classpath via the ``--code-search-path`` option. See :ref:`code_search_path` for more details.
.. code-block:: java
import io.ray.api.Ray;
public class MyRayApp {
public static void main(String[] args) {
Ray.init();
...
}
}
.. code-block:: bash
$ ray start ... --code-search-path=/path/to/jars
java -classpath <classpath> \
-Dray.address=<address> \
<classname> <args>
and then the rest of your script should be able to leverage Ray as a distributed framework!
@@ -74,8 +82,6 @@ The most preferable way to run a Ray cluster is via the :ref:`Ray Cluster Launch
This section assumes that you have a list of machines and that the nodes in the cluster can communicate with each other. It also assumes that Ray is installed
on each machine. To install Ray, follow the `installation instructions`_.
To configure the Ray cluster to run Java code, you need to add the ``--code-search-path`` option. See :ref:`code_search_path` for more details.
.. _`installation instructions`: http://docs.ray.io/en/master/installation.html
Starting Ray on each machine
@@ -199,7 +205,7 @@ To run a distributed Ray program, you'll need to execute your program on the sam
.. code-block:: bash
java -classpath /path/to/jars/ \
java -classpath <classpath> \
-Dray.address=<address> \
<classname> <args>
+21 -6
View File
@@ -245,24 +245,32 @@ Java Applications
Code Search Path
~~~~~~~~~~~~~~~~
If you want to run a Java application in cluster mode, you must first run ``ray start`` to start the Ray cluster. In addition to any ``ray start`` parameters mentioned above, you must add ``--code-search-path`` to tell Ray where to load jars when starting Java workers. Your jar files must be distributed to all nodes of the Ray cluster before running your code, and this parameter must be set on both the head node and non-head nodes.
If you want to run a Java application in a multi-node cluster, you must specify the code search path in your driver. The code search path is to tell Ray where to load jars when starting Java workers. Your jar files must be distributed to the same path(s) on all nodes of the Ray cluster before running your code.
.. code-block:: bash
$ ray start ... --code-search-path=/path/to/jars
$ java -classpath <classpath> \
-Dray.address=<address> \
-Dray.job.code-search-path=/path/to/jars/ \
<classname> <args>
The ``/path/to/jars`` here points to a directory which contains jars. All jars in the directory will be loaded by workers. You can also provide multiple directories for this parameter.
The ``/path/to/jars/`` here points to a directory which contains jars. All jars in the directory will be loaded by workers. You can also provide multiple directories for this parameter.
.. code-block:: bash
$ ray start ... --code-search-path=/path/to/jars1:/path/to/jars2:/path/to/pys1:/path/to/pys2
$ java -classpath <classpath> \
-Dray.address=<address> \
-Dray.job.code-search-path=/path/to/jars1:/path/to/jars2:/path/to/pys1:/path/to/pys2 \
<classname> <args>
Code search path is also used for loading Python code if it's specified. This is required for :ref:`cross_language`. If code search path is specified, you can only run Python remote functions which can be found in the code search path.
You don't need to configure code search path if you run a Java application in a single-node cluster.
You don't need to configure code search path if you run a Java application in single machine mode.
See ``ray.job.code-search-path`` under :ref:`Driver Options <java-driver-options>` for more information.
.. note:: Currently we don't provide a way to configure Ray when running a Java application in single machine mode. If you need to configure Ray, run ``ray start`` to start the Ray cluster first.
.. _java-driver-options:
Driver Options
~~~~~~~~~~~~~~
@@ -289,4 +297,11 @@ The list of available driver options:
- Type: ``Boolean``
- Default: ``false``
- ``ray.job.code-search-path``
- The paths for Java workers to load code from. Currently only directories are supported. You can specify one or more directories split by a ``:``. You don't need to configure code search path if you run a Java application in single machine mode or local mode. Code search path is also used for loading Python code if it's specified. This is required for :ref:`cross_language`. If code search path is specified, you can only run Python remote functions which can be found in the code search path.
- Type: ``String``
- Default: empty string.
- Example: ``/path/to/jars1:/path/to/jars2:/path/to/pys1:/path/to/pys2``
.. _`Apache Arrow`: https://arrow.apache.org/
+32 -6
View File
@@ -5,20 +5,46 @@ Cross-language programming
This page will show you how to use Ray's cross-language programming feature.
Setup the cluster
Setup the driver
-----------------
We need to set the ``--code-search-path`` option on ``ray start`` command. See :ref:`code_search_path` for more details.
We need to set :ref:`code_search_path` in your driver.
.. code-block:: bash
.. tabs::
ray start ... --code-search-path=/path/to/code
.. group-tab:: Python
.. code-block:: python
ray.init(job_config=ray.job_config.JobConfig(code_search_path="/path/to/code"))
.. group-tab:: Java
.. code-block:: bash
java -classpath <classpath> \
-Dray.address=<address> \
-Dray.job.code-search-path=/path/to/code/ \
<classname> <args>
You may want to include multiple directories to load both Python and Java code for workers, if they are placed in different directories.
.. code-block:: bash
.. tabs::
ray start ... --code-search-path=/path/to/jars:/path/to/pys
.. group-tab:: Python
.. code-block:: python
ray.init(job_config=ray.job_config.JobConfig(code_search_path="/path/to/jars:/path/to/pys"))
.. group-tab:: Java
.. code-block:: bash
java -classpath <classpath> \
-Dray.address=<address> \
-Dray.job.code-search-path=/path/to/jars:/path/to/pys \
<classname> <args>
Python calling Java
-------------------
+19 -10
View File
@@ -125,25 +125,34 @@ Use ``ray start`` from the CLI to start a 1 node ray runtime on a machine. This
...
You can connect to this Ray runtime by starting a driver process on the same node as where you ran ``ray start``:
.. tabs::
.. group-tab:: python
.. code-tab:: python
You can connect to this Ray runtime by starting a Python process that calls the following on the same node as where you ran ``ray start``:
.. code-block:: python
# This must
import ray
ray.init(address='auto')
# This must
import ray
ray.init(address='auto')
.. group-tab:: java
.. code-block:: java
If you want to run Java code, you need to specify the classpath via the ``--code-search-path`` option. See :ref:`code_search_path` for more details.
import io.ray.api.Ray;
public class MyRayApp {
public static void main(String[] args) {
Ray.init();
...
}
}
.. code-block:: bash
$ ray start ... --code-search-path=/path/to/jars
java -classpath <classpath> \
-Dray.address=<address> \
<classname> <args>
You can connect other nodes to the head node, creating a Ray cluster by also calling ``ray start`` on those nodes. See :ref:`manual-cluster` for more details. Calling ``ray.init(address="auto")`` on any of the cluster machines will connect to the ray cluster.
@@ -13,7 +13,6 @@ import io.ray.runtime.generated.Common.WorkerType;
import io.ray.runtime.util.NetworkUtil;
import java.io.File;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -174,34 +173,12 @@ public class RayConfig {
if (config.hasPath("ray.job.code-search-path")) {
codeSearchPathString = config.getString("ray.job.code-search-path");
}
if (!StringUtils.isEmpty(codeSearchPathString)) {
codeSearchPath = Arrays.asList(codeSearchPathString.split(":"));
} else {
codeSearchPath = Collections.emptyList();
if (StringUtils.isEmpty(codeSearchPathString)) {
codeSearchPathString = System.getProperty("java.class.path");
}
codeSearchPath = Arrays.asList(codeSearchPathString.split(":"));
boolean enableMultiTenancy;
if (config.hasPath("ray.raylet.config.enable_multi_tenancy")) {
enableMultiTenancy =
Boolean.valueOf(config.getString("ray.raylet.config.enable_multi_tenancy"));
} else {
String envString = System.getenv("RAY_ENABLE_MULTI_TENANCY");
if (StringUtils.isNotBlank(envString)) {
enableMultiTenancy = "1".equals(envString);
} else {
enableMultiTenancy = true; // Default value
}
}
if (!enableMultiTenancy) {
if (!isDriver) {
numWorkersPerProcess = config.getInt("ray.raylet.config.num_workers_per_process_java");
} else {
numWorkersPerProcess = 1; // Actually this value isn't used in RayNativeRuntime.
}
} else {
numWorkersPerProcess = config.getInt("ray.job.num-java-workers-per-process");
}
numWorkersPerProcess = config.getInt("ray.job.num-java-workers-per-process");
headArgs = config.getStringList("ray.head-args");
@@ -5,7 +5,6 @@ import com.google.gson.Gson;
import com.google.gson.JsonObject;
import com.google.gson.JsonParser;
import io.ray.runtime.config.RayConfig;
import java.io.File;
import java.io.IOException;
import java.nio.charset.Charset;
import java.util.ArrayList;
@@ -31,12 +30,6 @@ public class RunManager {
*/
public static void startRayHead(RayConfig rayConfig) {
LOGGER.debug("Starting ray runtime @ {}.", rayConfig.nodeIp);
String codeSearchPath;
if (!rayConfig.codeSearchPath.isEmpty()) {
codeSearchPath = Joiner.on(File.pathSeparator).join(rayConfig.codeSearchPath);
} else {
codeSearchPath = System.getProperty("java.class.path");
}
List<String> command = new ArrayList<>();
command.add("ray");
command.add("start");
@@ -44,7 +37,6 @@ public class RunManager {
command.add("--redis-password");
command.add(rayConfig.redisPassword);
command.add("--system-config=" + new Gson().toJson(rayConfig.rayletConfigParameters));
command.add("--code-search-path=" + codeSearchPath);
command.addAll(rayConfig.headArgs);
String output;
try {
+2 -2
View File
@@ -39,7 +39,7 @@ echo "Build test jar."
bazel build //java:all_tests_deploy.jar
# Enable multi-worker feature in Java test
TEST_ARGS=(-Dray.raylet.config.num_workers_per_process_java=10 -Dray.job.num-java-workers-per-process=10)
TEST_ARGS=(-Dray.job.num-java-workers-per-process=10)
echo "Running tests under cluster mode."
# TODO(hchen): Ideally, we should use the following bazel command to run Java tests. However, if there're skipped tests,
@@ -57,7 +57,7 @@ case "${OSTYPE}" in
darwin*) ip=$(ipconfig getifaddr en0);;
*) echo "Can't get ip address for ${OSTYPE}"; exit 1;;
esac
RAY_BACKEND_LOG_LEVEL=debug ray start --head --port=6379 --redis-password=123456 --code-search-path="$PWD/bazel-bin/java/all_tests_deploy.jar"
RAY_BACKEND_LOG_LEVEL=debug ray start --head --port=6379 --redis-password=123456
RAY_BACKEND_LOG_LEVEL=debug java -cp bazel-bin/java/all_tests_deploy.jar -Dray.address="$ip:6379"\
-Dray.redis.password='123456' -Dray.job.code-search-path="$PWD/bazel-bin/java/all_tests_deploy.jar" io.ray.test.MultiDriverTest
ray stop
@@ -30,13 +30,13 @@ public class FailureTest extends BaseTest {
// This is needed by `testGetThrowsQuicklyWhenFoundException`.
// Set one worker per process. Otherwise, if `badFunc2` and `slowFunc` run in the same
// process, `sleep` will delay `System.exit`.
oldNumWorkersPerProcess = System.getProperty("ray.raylet.config.num_workers_per_process_java");
System.setProperty("ray.raylet.config.num_workers_per_process_java", "1");
oldNumWorkersPerProcess = System.getProperty("ray.job.num-java-workers-per-process");
System.setProperty("ray.job.num-java-workers-per-process", "1");
}
@AfterClass
public void tearDown() {
System.setProperty("ray.raylet.config.num_workers_per_process_java", oldNumWorkersPerProcess);
System.setProperty("ray.job.num-java-workers-per-process", oldNumWorkersPerProcess);
}
public static int badFunc() {
@@ -14,7 +14,6 @@ public class JobConfigTest extends BaseTest {
@BeforeClass
public void setupJobConfig() {
System.setProperty("ray.raylet.config.enable_multi_tenancy", "true");
oldNumWorkersPerProcess = System.getProperty("ray.job.num-java-workers-per-process");
System.setProperty("ray.job.num-java-workers-per-process", "3");
System.setProperty("ray.job.jvm-options.0", "-DX=999");
@@ -25,7 +24,6 @@ public class JobConfigTest extends BaseTest {
@AfterClass
public void tearDownJobConfig() {
System.clearProperty("ray.raylet.config.enable_multi_tenancy");
System.setProperty("ray.job.num-java-workers-per-process", oldNumWorkersPerProcess);
System.clearProperty("ray.job.jvm-options.0");
System.clearProperty("ray.job.jvm-options.1");
@@ -18,13 +18,13 @@ public class KillActorTest extends BaseTest {
@BeforeClass
public void setUp() {
oldNumWorkersPerProcess = System.getProperty("ray.raylet.config.num_workers_per_process_java");
System.setProperty("ray.raylet.config.num_workers_per_process_java", "1");
oldNumWorkersPerProcess = System.getProperty("ray.job.num-java-workers-per-process");
System.setProperty("ray.job.num-java-workers-per-process", "1");
}
@AfterClass
public void tearDown() {
System.setProperty("ray.raylet.config.num_workers_per_process_java", oldNumWorkersPerProcess);
System.setProperty("ray.job.num-java-workers-per-process", oldNumWorkersPerProcess);
}
public static class HangActor {
@@ -15,8 +15,6 @@ import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
@Test(groups = {"cluster"})
@@ -27,16 +25,6 @@ public class MultiDriverTest extends BaseTest {
private static final int ACTOR_COUNT_PER_DRIVER = 10;
private static final String PID_LIST_PREFIX = "PID: ";
@BeforeClass
public void setUpClass() {
System.setProperty("ray.raylet.config.enable_multi_tenancy", "true");
}
@AfterClass
public void tearDownClass() {
System.clearProperty("ray.raylet.config.enable_multi_tenancy");
}
static int getPid() {
return SystemUtil.pid();
}
+12 -26
View File
@@ -136,7 +136,7 @@ def find_redis_address(address=None):
# --redis_address=123.456.78.910 --node_ip_address=123.456.78.910
# --raylet_socket_name=... --store_socket_name=... --object_manager_port=0
# --min_worker_port=10000 --max_worker_port=10999
# --node_manager_port=58578 --redis_port=6379 --num_initial_workers=8
# --node_manager_port=58578 --redis_port=6379
# --maximum_startup_concurrency=8
# --static_resource_list=node:123.456.78.910,1.0,object_store_memory,66
# --config_list=plasma_store_as_thread,True
@@ -1253,13 +1253,11 @@ def start_raylet(redis_address,
stderr_file=None,
config=None,
java_worker_options=None,
load_code_from_local=False,
huge_pages=False,
fate_share=None,
socket_to_use=None,
head_node=False,
start_initial_python_workers_for_first_job=False,
code_search_path=None):
start_initial_python_workers_for_first_job=False):
"""Start a raylet, which is a combined local scheduler and object manager.
Args:
@@ -1296,9 +1294,6 @@ def start_raylet(redis_address,
config (dict|None): Optional Raylet configuration that will
override defaults in RayConfig.
java_worker_options (list): The command options for Java worker.
code_search_path (list): Code search path for worker. code_search_path
is added to worker command in non-multi-tenancy mode and job_config
in multi-tenancy mode.
Returns:
ProcessInfo for the process that was started.
"""
@@ -1311,7 +1306,6 @@ def start_raylet(redis_address,
raise ValueError("Cannot use valgrind and profiler at the same time.")
assert resource_spec.resolved()
num_initial_workers = resource_spec.num_cpus
static_resources = resource_spec.to_resource_dict()
# Limit the number of workers that can be started in parallel by the
@@ -1348,7 +1342,6 @@ def start_raylet(redis_address,
raylet_name,
redis_password,
session_dir,
code_search_path,
)
else:
java_worker_command = []
@@ -1368,15 +1361,18 @@ def start_raylet(redis_address,
# Create the command that the Raylet will use to start workers.
start_worker_command = [
sys.executable, worker_path, f"--node-ip-address={node_ip_address}",
sys.executable,
worker_path,
f"--node-ip-address={node_ip_address}",
f"--node-manager-port={node_manager_port}",
f"--object-store-name={plasma_store_name}",
f"--raylet-name={raylet_name}", f"--redis-address={redis_address}",
f"--config-list={config_str}", f"--temp-dir={temp_dir}",
f"--metrics-agent-port={metrics_agent_port}"
f"--raylet-name={raylet_name}",
f"--redis-address={redis_address}",
f"--config-list={config_str}",
f"--temp-dir={temp_dir}",
f"--metrics-agent-port={metrics_agent_port}",
"RAY_WORKER_DYNAMIC_OPTION_PLACEHOLDER",
]
if code_search_path:
start_worker_command.append(f"--code-search-path={code_search_path}")
if redis_password:
start_worker_command += [f"--redis-password={redis_password}"]
@@ -1391,12 +1387,6 @@ def start_raylet(redis_address,
if max_worker_port is None:
max_worker_port = 0
if code_search_path is not None and len(code_search_path) > 0:
load_code_from_local = True
if load_code_from_local:
start_worker_command += ["--load-code-from-local"]
# Create agent command
agent_command = [
sys.executable,
@@ -1427,7 +1417,6 @@ def start_raylet(redis_address,
f"--node_ip_address={node_ip_address}",
f"--redis_address={gcs_ip_address}",
f"--redis_port={gcs_port}",
f"--num_initial_workers={num_initial_workers}",
f"--maximum_startup_concurrency={maximum_startup_concurrency}",
f"--static_resource_list={resource_argument}",
f"--config_list={config_str}",
@@ -1487,8 +1476,7 @@ def get_ray_jars_dir():
def build_java_worker_command(java_worker_options, redis_address,
node_manager_port, plasma_store_name,
raylet_name, redis_password, session_dir,
code_search_path):
raylet_name, redis_password, session_dir):
"""This method assembles the command used to start a Java worker.
Args:
@@ -1499,7 +1487,6 @@ def build_java_worker_command(java_worker_options, redis_address,
raylet_name (str): The name of the raylet socket to create.
redis_password (str): The password of connect to redis.
session_dir (str): The path of this session.
code_search_path (list): Teh job code search path.
Returns:
The command string for starting Java worker.
"""
@@ -1520,7 +1507,6 @@ def build_java_worker_command(java_worker_options, redis_address,
pairs.append(("ray.home", RAY_HOME))
pairs.append(("ray.logging.dir", os.path.join(session_dir, "logs")))
pairs.append(("ray.session-dir", session_dir))
pairs.append(("ray.job.code-search-path", code_search_path))
command = ["java"] + ["-D{}={}".format(*pair) for pair in pairs]
command += ["RAY_WORKER_RAYLET_CONFIG_PLACEHOLDER"]
+4 -4
View File
@@ -2,7 +2,7 @@
cluster_name: java
# The minimum number of workers nodes to launch in addition to the head
# node. This number should be >= 0.
min_workers: 1
min_workers: 1
# The maximum number of workers nodes to launch in addition to the head
# node. This takes precedence over min_workers.
max_workers: 1
@@ -72,10 +72,10 @@ worker_setup_commands: []
# Command to start ray on the head node. You don't need to change this.
head_start_ray_commands:
- ray stop
- ulimit -n 65536; ray start --head --port=6379 --object-manager-port=8076 --autoscaling-config=~/ray_bootstrap_config.yaml --code-search-path=~/ray-word-count/target
- ulimit -n 65536; ray start --head --port=6379 --object-manager-port=8076 --autoscaling-config=~/ray_bootstrap_config.yaml
# Command to start ray on worker nodes. You don't need to change this.
worker_start_ray_commands:
- ray stop
- ulimit -n 65536; ray start --address=$RAY_HEAD_IP:6379 --object-manager-port=8076 --code-search-path=ray-word-count/target
- ulimit -n 65536; ray start --address=$RAY_HEAD_IP:6379 --object-manager-port=8076
# To run the program, run `ray exec java.yaml "java -jar ray-word-count/target/ray-word-count-1.0-SNAPSHOT-jar-with-dependencies.jar"`
# To run the program, run `ray exec java.yaml "java -jar ray-word-count/target/ray-word-count-1.0-SNAPSHOT-jar-with-dependencies.jar -Dray.job.code-search-path=ray-word-count/target"`
-4
View File
@@ -51,10 +51,6 @@ cdef extern from "ray/common/ray_config.h" nogil:
uint64_t object_manager_default_chunk_size() const
int num_workers_per_process_python() const
int num_workers_per_process_java() const
uint32_t maximum_gcs_deletion_batch_size() const
int64_t max_direct_call_object_size() const
-8
View File
@@ -88,14 +88,6 @@ cdef class Config:
def object_manager_default_chunk_size():
return RayConfig.instance().object_manager_default_chunk_size()
@staticmethod
def num_workers_per_process_python():
return RayConfig.instance().num_workers_per_process_python()
@staticmethod
def num_workers_per_process_java():
return RayConfig.instance().num_workers_per_process_java()
@staticmethod
def maximum_gcs_deletion_batch_size():
return RayConfig.instance().maximum_gcs_deletion_batch_size()
+1 -7
View File
@@ -339,10 +339,6 @@ class Node:
"""Get the cluster Redis password"""
return self._ray_params.redis_password
@property
def load_code_from_local(self):
return self._ray_params.load_code_from_local
@property
def object_ref_seed(self):
"""Get the seed for deterministic generation of object refs"""
@@ -723,14 +719,12 @@ class Node:
stderr_file=stderr_file,
config=self._config,
java_worker_options=self._ray_params.java_worker_options,
load_code_from_local=self._ray_params.load_code_from_local,
huge_pages=self._ray_params.huge_pages,
fate_share=self.kernel_fate_share,
socket_to_use=self.socket,
head_node=self.head,
start_initial_python_workers_for_first_job=self._ray_params.
start_initial_python_workers_for_first_job,
code_search_path=self._ray_params.code_search_path)
start_initial_python_workers_for_first_job)
assert ray_constants.PROCESS_TYPE_RAYLET not in self.all_processes
self.all_processes[ray_constants.PROCESS_TYPE_RAYLET] = [process_info]
+1 -8
View File
@@ -89,7 +89,6 @@ class RayParams:
contents to Redis.
autoscaling_config: path to autoscaling config file.
java_worker_options (list): The command options for Java worker.
load_code_from_local: Whether load code from local file or from GCS.
metrics_agent_port(int): The port to bind metrics agent.
metrics_export_port(int): The port at which metrics are exposed
through a Prometheus endpoint.
@@ -142,14 +141,12 @@ class RayParams:
include_log_monitor=None,
autoscaling_config=None,
java_worker_options=None,
load_code_from_local=False,
start_initial_python_workers_for_first_job=False,
_system_config=None,
enable_object_reconstruction=False,
metrics_agent_port=None,
metrics_export_port=None,
lru_evict=False,
code_search_path=None):
lru_evict=False):
self.object_ref_seed = object_ref_seed
self.redis_address = redis_address
self.num_cpus = num_cpus
@@ -186,7 +183,6 @@ class RayParams:
self.include_log_monitor = include_log_monitor
self.autoscaling_config = autoscaling_config
self.java_worker_options = java_worker_options
self.load_code_from_local = load_code_from_local
self.metrics_agent_port = metrics_agent_port
self.metrics_export_port = metrics_export_port
self.start_initial_python_workers_for_first_job = (
@@ -195,9 +191,6 @@ class RayParams:
self._lru_evict = lru_evict
self._enable_object_reconstruction = enable_object_reconstruction
self._check_usage()
self.code_search_path = code_search_path
if code_search_path is None:
self.code_search_path = []
# Set the internal config options for LRU eviction.
if lru_evict:
+1 -17
View File
@@ -389,25 +389,12 @@ def debug(address):
default=None,
type=str,
help="Overwrite the options to start Java workers.")
@click.option(
"--code-search-path",
default=None,
hidden=True,
type=str,
help="A list of directories or jar files separated by colon that specify "
"the search path for user code. This will be used as `CLASSPATH` in "
"Java and `PYTHONPATH` in Python.")
@click.option(
"--system-config",
default=None,
hidden=True,
type=json.loads,
help="Override system configuration defaults.")
@click.option(
"--load-code-from-local",
is_flag=True,
default=False,
help="Specify whether load code from local file or GCS serialization.")
@click.option(
"--lru-evict",
is_flag=True,
@@ -436,8 +423,7 @@ def start(node_ip_address, address, port, redis_password, redis_shard_ports,
head, include_dashboard, dashboard_host, dashboard_port, block,
plasma_directory, autoscaling_config, no_redirect_worker_output,
no_redirect_output, plasma_store_socket_name, raylet_socket_name,
temp_dir, java_worker_options, load_code_from_local,
code_search_path, system_config, lru_evict,
temp_dir, java_worker_options, system_config, lru_evict,
enable_object_reconstruction, metrics_export_port, log_style,
log_color, verbose):
"""Start Ray processes manually on the local machine."""
@@ -496,8 +482,6 @@ def start(node_ip_address, address, port, redis_password, redis_shard_ports,
dashboard_host=dashboard_host,
dashboard_port=dashboard_port,
java_worker_options=java_worker_options,
load_code_from_local=load_code_from_local,
code_search_path=code_search_path,
_system_config=system_config,
lru_evict=lru_evict,
enable_object_reconstruction=enable_object_reconstruction,
+3 -2
View File
@@ -1,4 +1,5 @@
import pytest
import sys
import ray
import ray.cluster_utils
@@ -6,7 +7,7 @@ import ray.test_utils
def test_cross_language_raise_kwargs(shutdown_only):
ray.init(_load_code_from_local=True)
ray.init(job_config=ray.job_config.JobConfig(code_search_path=sys.path))
with pytest.raises(Exception, match="kwargs"):
ray.java_function("a", "b").remote(x="arg1")
@@ -16,7 +17,7 @@ def test_cross_language_raise_kwargs(shutdown_only):
def test_cross_language_raise_exception(shutdown_only):
ray.init(_load_code_from_local=True)
ray.init(job_config=ray.job_config.JobConfig(code_search_path=sys.path))
class PythonObject(object):
pass
+5 -9
View File
@@ -109,6 +109,7 @@ class Worker:
# by the worker should drop into the debugger at the specified
# breakpoint ID.
self.debugger_get_breakpoint = b""
self._load_code_from_local = False
@property
def connected(self):
@@ -122,7 +123,7 @@ class Worker:
@property
def load_code_from_local(self):
self.check_connected()
return self.node.load_code_from_local
return self._load_code_from_local
@property
def current_job_id(self):
@@ -222,6 +223,9 @@ class Worker:
"""
self.mode = mode
def set_load_code_from_local(self, load_code_from_local):
self._load_code_from_local = load_code_from_local
def put_object(self, value, object_ref=None, pin_object=True):
"""Put value in the local object store with object reference `object_ref`.
@@ -489,9 +493,7 @@ def init(
_memory=None,
_redis_password=ray_constants.REDIS_DEFAULT_PASSWORD,
_java_worker_options=None,
_code_search_path=None,
_temp_dir=None,
_load_code_from_local=False,
_lru_evict=False,
_metrics_export_port=None,
_system_config=None):
@@ -579,10 +581,7 @@ def init(
_temp_dir (str): If provided, specifies the root temporary
directory for the Ray process. Defaults to an OS-specific
conventional location, e.g., "/tmp/ray".
_load_code_from_local: Whether code should be loaded from a local
module or from the GCS.
_java_worker_options: Overwrite the options to start Java workers.
_code_search_path (list): Java classpath or python import path.
_lru_evict (bool): If True, when an object store is full, it will evict
objects in LRU order to make more space and when under memory
pressure, ray.ObjectLostError may be thrown. If False, then
@@ -701,9 +700,7 @@ def init(
redis_max_memory=_redis_max_memory,
plasma_store_socket_name=None,
temp_dir=_temp_dir,
load_code_from_local=_load_code_from_local,
java_worker_options=_java_worker_options,
code_search_path=_code_search_path,
start_initial_python_workers_for_first_job=True,
_system_config=_system_config,
lru_evict=_lru_evict,
@@ -749,7 +746,6 @@ def init(
redis_password=_redis_password,
object_ref_seed=None,
temp_dir=_temp_dir,
load_code_from_local=_load_code_from_local,
_system_config=_system_config,
lru_evict=_lru_evict,
enable_object_reconstruction=_enable_object_reconstruction,
+3 -1
View File
@@ -145,11 +145,14 @@ if __name__ == "__main__":
raylet_ip_address = args.node_ip_address
code_search_path = args.code_search_path
load_code_from_local = False
if code_search_path is not None:
load_code_from_local = True
for p in code_search_path.split(":"):
if os.path.isfile(p):
p = os.path.dirname(p)
sys.path.append(p)
ray.worker.global_worker.set_load_code_from_local(load_code_from_local)
ray_params = RayParams(
node_ip_address=args.node_ip_address,
@@ -160,7 +163,6 @@ if __name__ == "__main__":
plasma_store_socket_name=args.object_store_name,
raylet_socket_name=args.raylet_name,
temp_dir=args.temp_dir,
load_code_from_local=args.load_code_from_local,
metrics_agent_port=args.metrics_agent_port,
)
-14
View File
@@ -196,15 +196,6 @@ RAY_CONFIG(uint64_t, object_manager_default_chunk_size, 5 * 1024 * 1024)
/// excessive memory usage during object broadcast to many receivers.
RAY_CONFIG(uint64_t, object_manager_max_bytes_in_flight, 2L * 1024 * 1024 * 1024)
/// Number of workers per Python worker process
RAY_CONFIG(int, num_workers_per_process_python, 1)
/// Number of workers per Java worker process
RAY_CONFIG(int, num_workers_per_process_java, 1)
/// Number of workers per CPP worker process
RAY_CONFIG(int, num_workers_per_process_cpp, 1)
/// Maximum number of ids in one batch to send to GCS to delete keys.
RAY_CONFIG(uint32_t, maximum_gcs_deletion_batch_size, 1000)
@@ -301,11 +292,6 @@ RAY_CONFIG(bool, report_worker_backlog, true)
/// The timeout for synchronous GCS requests in seconds.
RAY_CONFIG(int64_t, gcs_server_request_timeout_seconds, 5)
/// Whether to enable multi tenancy features.
RAY_CONFIG(bool, enable_multi_tenancy,
getenv("RAY_ENABLE_MULTI_TENANCY") == nullptr ||
getenv("RAY_ENABLE_MULTI_TENANCY") == std::string("1"))
/// Whether to enable worker prestarting: https://github.com/ray-project/ray/issues/12052
RAY_CONFIG(bool, enable_worker_prestart,
getenv("RAY_ENABLE_WORKER_PRESTART") == nullptr ||
+1 -2
View File
@@ -148,8 +148,7 @@ std::string TestSetupUtil::StartRaylet(const std::string &store_socket_name,
"--node_manager_port=" + std::to_string(port),
"--node_ip_address=" + node_ip_address, "--redis_address=" + redis_address,
"--redis_port=6379", "--min-worker-port=0", "--max-worker-port=0",
"--num_initial_workers=1", "--maximum_startup_concurrency=10",
"--static_resource_list=" + resource,
"--maximum_startup_concurrency=10", "--static_resource_list=" + resource,
"--python_worker_command=" +
CreateCommandLine({TEST_MOCK_WORKER_EXEC_PATH, store_socket_name,
raylet_socket_name, std::to_string(port)}),
+3 -25
View File
@@ -111,9 +111,7 @@ WorkerContext::WorkerContext(WorkerType worker_type, const WorkerID &worker_id,
const JobID &job_id)
: worker_type_(worker_type),
worker_id_(worker_id),
current_job_id_(RayConfig::instance().enable_multi_tenancy()
? job_id
: (worker_type_ == WorkerType::DRIVER ? job_id : JobID::Nil())),
current_job_id_(job_id),
current_actor_id_(ActorID::Nil()),
current_actor_placement_group_id_(PlacementGroupID::Nil()),
placement_group_capture_child_tasks_(true),
@@ -163,29 +161,17 @@ const std::unordered_map<std::string, std::string>
return override_environment_variables_;
}
void WorkerContext::SetCurrentJobId(const JobID &job_id) {
RAY_CHECK(!RayConfig::instance().enable_multi_tenancy());
current_job_id_ = job_id;
}
void WorkerContext::SetCurrentTaskId(const TaskID &task_id) {
GetThreadContext().SetCurrentTaskId(task_id);
}
void WorkerContext::SetCurrentTask(const TaskSpecification &task_spec) {
GetThreadContext().SetCurrentTask(task_spec);
RAY_CHECK(current_job_id_ == task_spec.JobId());
if (task_spec.IsNormalTask()) {
if (!RayConfig::instance().enable_multi_tenancy()) {
RAY_CHECK(current_job_id_.IsNil());
SetCurrentJobId(task_spec.JobId());
}
current_task_is_direct_call_ = true;
override_environment_variables_ = task_spec.OverrideEnvironmentVariables();
} else if (task_spec.IsActorCreationTask()) {
if (!RayConfig::instance().enable_multi_tenancy()) {
RAY_CHECK(current_job_id_.IsNil());
SetCurrentJobId(task_spec.JobId());
}
RAY_CHECK(current_actor_id_.IsNil());
current_actor_id_ = task_spec.ActorCreationId();
current_actor_is_direct_call_ = true;
@@ -196,21 +182,13 @@ void WorkerContext::SetCurrentTask(const TaskSpecification &task_spec) {
placement_group_capture_child_tasks_ = task_spec.PlacementGroupCaptureChildTasks();
override_environment_variables_ = task_spec.OverrideEnvironmentVariables();
} else if (task_spec.IsActorTask()) {
if (!RayConfig::instance().enable_multi_tenancy()) {
RAY_CHECK(current_job_id_ == task_spec.JobId());
}
RAY_CHECK(current_actor_id_ == task_spec.ActorId());
} else {
RAY_CHECK(false);
}
}
void WorkerContext::ResetCurrentTask(const TaskSpecification &task_spec) {
GetThreadContext().ResetCurrentTask();
if (!RayConfig::instance().enable_multi_tenancy() && task_spec.IsNormalTask()) {
SetCurrentJobId(JobID::Nil());
}
}
void WorkerContext::ResetCurrentTask() { GetThreadContext().ResetCurrentTask(); }
std::shared_ptr<const TaskSpecification> WorkerContext::GetCurrentTask() const {
return GetThreadContext().GetCurrentTask();
+1 -5
View File
@@ -42,16 +42,12 @@ class WorkerContext {
const std::unordered_map<std::string, std::string>
&GetCurrentOverrideEnvironmentVariables() const;
// TODO(kfstorm): Remove this once `enable_multi_tenancy` is deleted.
// TODO(edoakes): remove this once Python core worker uses the task interfaces.
void SetCurrentJobId(const JobID &job_id);
// TODO(edoakes): remove this once Python core worker uses the task interfaces.
void SetCurrentTaskId(const TaskID &task_id);
void SetCurrentTask(const TaskSpecification &task_spec);
void ResetCurrentTask(const TaskSpecification &task_spec);
void ResetCurrentTask();
std::shared_ptr<const TaskSpecification> GetCurrentTask() const;
+2 -10
View File
@@ -69,15 +69,7 @@ ray::JobID GetProcessJobID(const ray::CoreWorkerOptions &options) {
if (options.worker_type == ray::WorkerType::WORKER) {
// For workers, the job ID is assigned by Raylet via an environment variable.
const char *job_id_env = std::getenv(kEnvVarKeyJobId);
// TODO(kfstorm): Use `RAY_CHECK` instead once the `enable_multi_tenancy` flag is
// removed.
// RAY_CHECK(job_id_env);
if (!job_id_env) {
// Multi-tenancy is disabled.
// NOTE(kfstorm): We can't read `RayConfig::instance().enable_multi_tenancy()` here
// because `RayConfig` is not initialized yet.
return ray::JobID::Nil();
}
RAY_CHECK(job_id_env);
return ray::JobID::FromHex(job_id_env);
}
return options.job_id;
@@ -1859,7 +1851,7 @@ Status CoreWorker::ExecuteTask(const TaskSpecification &task_spec,
if (!options_.is_local_mode) {
SetCurrentTaskId(TaskID::Nil());
worker_context_.ResetCurrentTask(task_spec);
worker_context_.ResetCurrentTask();
}
{
absl::MutexLock lock(&mutex_);
+1 -1
View File
@@ -198,7 +198,7 @@ struct CoreWorkerOptions {
/// thread with a worker. You can obtain the worker ID via
/// `CoreWorkerProcess::GetCoreWorker()->GetWorkerID()`. Currently a Java worker process
/// starts multiple workers by default, but can be configured to start only 1 worker by
/// overwriting the internal config `num_workers_per_process_java`.
/// speicifying `num_java_workers_per_process` in the job config.
///
/// If only 1 worker is started (either because the worker type is driver, or the
/// `num_workers` in `CoreWorkerOptions` is set to 1), all threads will be automatically
+2 -1
View File
@@ -620,8 +620,9 @@ TEST_F(ZeroNodeTest, TestWorkerContext) {
TaskSpecification task_spec;
size_t num_returns = 3;
task_spec.GetMutableMessage().set_job_id(job_id.Binary());
task_spec.GetMutableMessage().set_num_returns(num_returns);
context.ResetCurrentTask(task_spec);
context.ResetCurrentTask();
context.SetCurrentTask(task_spec);
ASSERT_EQ(context.GetCurrentTaskID(), task_spec.TaskId());
-3
View File
@@ -38,7 +38,6 @@ DEFINE_int32(max_worker_port, 0,
"The highest port that workers' gRPC servers will bind on.");
DEFINE_string(worker_port_list, "",
"An explicit list of ports that workers' gRPC servers will bind on.");
DEFINE_int32(num_initial_workers, 0, "Number of initial workers.");
DEFINE_int32(num_initial_python_workers_for_first_job, 0,
"Number of initial Python workers for the first job.");
DEFINE_int32(maximum_startup_concurrency, 1, "Maximum startup concurrency");
@@ -78,7 +77,6 @@ int main(int argc, char *argv[]) {
const int min_worker_port = static_cast<int>(FLAGS_min_worker_port);
const int max_worker_port = static_cast<int>(FLAGS_max_worker_port);
const std::string worker_port_list = FLAGS_worker_port_list;
const int num_initial_workers = static_cast<int>(FLAGS_num_initial_workers);
const int num_initial_python_workers_for_first_job =
static_cast<int>(FLAGS_num_initial_python_workers_for_first_job);
const int maximum_startup_concurrency =
@@ -183,7 +181,6 @@ int main(int argc, char *argv[]) {
<< node_manager_config.resource_config.ToString();
node_manager_config.node_manager_address = node_ip_address;
node_manager_config.node_manager_port = node_manager_port;
node_manager_config.num_initial_workers = num_initial_workers;
node_manager_config.num_workers_soft_limit = num_cpus;
node_manager_config.num_initial_python_workers_for_first_job =
num_initial_python_workers_for_first_job;
+11 -24
View File
@@ -133,7 +133,7 @@ NodeManager::NodeManager(boost::asio::io_service &io_service, const NodeID &self
initial_config_(config),
local_available_resources_(config.resource_config),
worker_pool_(
io_service, config.num_initial_workers, config.num_workers_soft_limit,
io_service, config.num_workers_soft_limit,
config.num_initial_python_workers_for_first_job,
config.maximum_startup_concurrency, config.min_worker_port,
config.max_worker_port, config.worker_ports, gcs_client_,
@@ -345,15 +345,13 @@ void NodeManager::HandleJobStarted(const JobID &job_id, const JobTableData &job_
RAY_CHECK(!job_data.is_dead());
worker_pool_.HandleJobStarted(job_id, job_data.config());
if (RayConfig::instance().enable_multi_tenancy()) {
// Tasks of this job may already arrived but failed to pop a worker because the job
// config is not local yet. So we trigger dispatching again here to try to
// reschedule these tasks.
if (new_scheduler_enabled_) {
ScheduleAndDispatch();
} else {
DispatchTasks(local_queues_.GetReadyTasksByClass());
}
// Tasks of this job may already arrived but failed to pop a worker because the job
// config is not local yet. So we trigger dispatching again here to try to
// reschedule these tasks.
if (new_scheduler_enabled_) {
ScheduleAndDispatch();
} else {
DispatchTasks(local_queues_.GetReadyTasksByClass());
}
}
@@ -1193,8 +1191,7 @@ void NodeManager::ProcessRegisterClientRequestMessage(
std::string worker_ip_address = string_from_flatbuf(*message->ip_address());
// TODO(suquark): Use `WorkerType` in `common.proto` without type converting.
rpc::WorkerType worker_type = static_cast<rpc::WorkerType>(message->worker_type());
if ((RayConfig::instance().enable_multi_tenancy() &&
(worker_type != rpc::WorkerType::SPILL_WORKER &&
if (((worker_type != rpc::WorkerType::SPILL_WORKER &&
worker_type != rpc::WorkerType::RESTORE_WORKER)) ||
worker_type == rpc::WorkerType::DRIVER) {
RAY_CHECK(!job_id.IsNil());
@@ -1627,9 +1624,7 @@ void NodeManager::HandleRequestWorkerLease(const rpc::RequestWorkerLeaseRequest
RAY_CHECK_OK(gcs_client_->Tasks().AsyncAdd(data, nullptr));
}
// Prestart optimization is only needed when multi-tenancy is on.
if (RayConfig::instance().enable_multi_tenancy() &&
RayConfig::instance().enable_worker_prestart()) {
if (RayConfig::instance().enable_worker_prestart()) {
auto task_spec = task.GetTaskSpecification();
worker_pool_.PrestartWorkers(task_spec, request.backlog_size());
}
@@ -2522,12 +2517,6 @@ bool NodeManager::FinishAssignedTask(const std::shared_ptr<WorkerInterface> &wor
task_dependency_manager_.UnsubscribeGetDependencies(spec.TaskId());
task_dependency_manager_.TaskCanceled(task_id);
if (!RayConfig::instance().enable_multi_tenancy()) {
// Unset the worker's assigned job Id if this is not an actor.
if (!spec.IsActorCreationTask()) {
worker.AssignJobId(JobID::Nil());
}
}
if (!spec.IsActorCreationTask()) {
// Unset the worker's assigned task. We keep the assigned task ID for
// direct actor creation calls because this ID is used later if the actor
@@ -2776,9 +2765,7 @@ void NodeManager::FinishAssignTask(const std::shared_ptr<WorkerInterface> &worke
// We successfully assigned the task to the worker.
worker->AssignTaskId(spec.TaskId());
worker->SetOwnerAddress(spec.CallerAddress());
if (!RayConfig::instance().enable_multi_tenancy()) {
worker->AssignJobId(spec.JobId());
}
RAY_CHECK(worker->GetAssignedJobId() == spec.JobId());
// TODO(swang): For actors with multiple actor handles, to
// guarantee that tasks are replayed in the same order after a
// failure, we must update the task's execution dependency to be
-2
View File
@@ -70,8 +70,6 @@ struct NodeManagerConfig {
/// An explicit list of open ports that workers started will bind
/// on. This takes precedence over min_worker_port and max_worker_port.
std::vector<int> worker_ports;
/// The initial number of workers to create.
int num_initial_workers;
/// The soft limit of the number of workers.
int num_workers_soft_limit;
/// Number of initial Python workers for the first job.
@@ -40,7 +40,6 @@ class TestObjectManagerBase : public ::testing::Test {
static_resource_conf = {{"CPU", 1}, {"GPU", 1}};
node_manager_config.resource_config =
ray::raylet::ResourceSet(std::move(static_resource_conf));
node_manager_config.num_initial_workers = 0;
// Use a default worker that can execute empty tasks with dependencies.
std::vector<std::string> py_worker_command;
py_worker_command.push_back("python");
@@ -186,9 +186,6 @@ bool ClusterTaskManager::AttemptDispatchWork(const Work &work,
worker->SetAllocatedInstances(allocated_instances);
}
worker->AssignTaskId(spec.TaskId());
if (!RayConfig::instance().enable_multi_tenancy()) {
worker->AssignJobId(spec.JobId());
}
worker->SetAssignedTask(task);
*worker_leased = true;
dispatched = true;
-3
View File
@@ -33,9 +33,6 @@ class MockWorker : public WorkerInterface {
void AssignTaskId(const TaskID &task_id) {}
// TODO(kfstorm): Remove this once `enable_multi_tenancy` is deleted.
void AssignJobId(const JobID &job_id) {}
void SetAssignedTask(const Task &assigned_task) {}
const std::string IpAddress() const { return address_.ip_address(); }
-5
View File
@@ -111,11 +111,6 @@ const std::unordered_set<TaskID> &Worker::GetBlockedTaskIds() const {
return blocked_task_ids_;
}
void Worker::AssignJobId(const JobID &job_id) {
RAY_CHECK(!RayConfig::instance().enable_multi_tenancy());
assigned_job_id_ = job_id;
}
const JobID &Worker::GetAssignedJobId() const { return assigned_job_id_; }
void Worker::AssignActorId(const ActorID &actor_id) {
-4
View File
@@ -60,8 +60,6 @@ class WorkerInterface {
virtual bool AddBlockedTaskId(const TaskID &task_id) = 0;
virtual bool RemoveBlockedTaskId(const TaskID &task_id) = 0;
virtual const std::unordered_set<TaskID> &GetBlockedTaskIds() const = 0;
// TODO(kfstorm): Remove this once `enable_multi_tenancy` is deleted.
virtual void AssignJobId(const JobID &job_id) = 0;
virtual const JobID &GetAssignedJobId() const = 0;
virtual void AssignActorId(const ActorID &actor_id) = 0;
virtual const ActorID &GetActorId() const = 0;
@@ -151,8 +149,6 @@ class Worker : public WorkerInterface {
bool AddBlockedTaskId(const TaskID &task_id);
bool RemoveBlockedTaskId(const TaskID &task_id);
const std::unordered_set<TaskID> &GetBlockedTaskIds() const;
// TODO(kfstorm): Remove this once `enable_multi_tenancy` is deleted.
void AssignJobId(const JobID &job_id);
const JobID &GetAssignedJobId() const;
void AssignActorId(const ActorID &actor_id);
const ActorID &GetActorId() const;
+54 -135
View File
@@ -54,8 +54,7 @@ namespace ray {
namespace raylet {
WorkerPool::WorkerPool(boost::asio::io_service &io_service, int num_workers,
int num_workers_soft_limit,
WorkerPool::WorkerPool(boost::asio::io_service &io_service, int num_workers_soft_limit,
int num_initial_python_workers_for_first_job,
int maximum_startup_concurrency, int min_worker_port,
int max_worker_port, const std::vector<int> &worker_ports,
@@ -83,33 +82,7 @@ WorkerPool::WorkerPool(boost::asio::io_service &io_service, int num_workers,
for (const auto &entry : worker_commands) {
// Initialize the pool state for this language.
auto &state = states_by_lang_[entry.first];
if (!RayConfig::instance().enable_multi_tenancy()) {
switch (entry.first) {
case Language::PYTHON:
state.num_workers_per_process =
RayConfig::instance().num_workers_per_process_python();
break;
case Language::JAVA:
state.num_workers_per_process =
RayConfig::instance().num_workers_per_process_java();
break;
case Language::CPP:
state.num_workers_per_process =
RayConfig::instance().num_workers_per_process_cpp();
break;
default:
RAY_LOG(FATAL) << "The number of workers per process for "
<< Language_Name(entry.first) << " worker is not set.";
}
RAY_CHECK(state.num_workers_per_process > 0)
<< "Number of workers per process of language " << Language_Name(entry.first)
<< " must be positive.";
state.multiple_for_warning =
std::max(state.num_workers_per_process,
std::max(num_workers, maximum_startup_concurrency));
} else {
state.multiple_for_warning = maximum_startup_concurrency;
}
state.multiple_for_warning = maximum_startup_concurrency;
// Set worker command for this language.
state.worker_command = entry.second;
RAY_CHECK(!state.worker_command.empty()) << "Worker command must not be empty.";
@@ -131,27 +104,7 @@ WorkerPool::WorkerPool(boost::asio::io_service &io_service, int num_workers,
free_ports_->push(port);
}
}
if (!RayConfig::instance().enable_multi_tenancy()) {
Start(num_workers);
} else {
ScheduleIdleWorkerKilling();
}
}
void WorkerPool::Start(int num_workers) {
RAY_CHECK(!RayConfig::instance().enable_multi_tenancy());
for (auto &entry : states_by_lang_) {
if (entry.first == Language::JAVA) {
// Disable initial workers for Java.
continue;
}
auto &state = entry.second;
int num_worker_processes = static_cast<int>(
std::ceil(static_cast<double>(num_workers) / state.num_workers_per_process));
for (int i = 0; i < num_worker_processes; i++) {
StartWorkerProcess(entry.first, rpc::WorkerType::WORKER, JobID::Nil());
}
}
ScheduleIdleWorkerKilling();
}
WorkerPool::~WorkerPool() {
@@ -178,7 +131,7 @@ Process WorkerPool::StartWorkerProcess(
std::vector<std::string> dynamic_options,
std::unordered_map<std::string, std::string> override_environment_variables) {
rpc::JobConfig *job_config = nullptr;
if (RayConfig::instance().enable_multi_tenancy() && !IsIOWorkerType(worker_type)) {
if (!IsIOWorkerType(worker_type)) {
RAY_CHECK(!job_id.IsNil());
auto it = all_jobs_.find(job_id);
if (it == all_jobs_.end()) {
@@ -212,15 +165,12 @@ Process WorkerPool::StartWorkerProcess(
int workers_to_start = 1;
if (dynamic_options.empty()) {
if (!RayConfig::instance().enable_multi_tenancy()) {
workers_to_start = state.num_workers_per_process;
} else if (language == Language::JAVA) {
if (language == Language::JAVA) {
workers_to_start = job_config->num_java_workers_per_process();
}
}
// For non-multi-tenancy mode, job code search path is embedded in worker_command.
if (RayConfig::instance().enable_multi_tenancy() && job_config) {
if (job_config) {
// Note that we push the item to the front of the vector to make
// sure this is the freshest option than others.
if (!job_config->jvm_options().empty()) {
@@ -272,9 +222,6 @@ Process WorkerPool::StartWorkerProcess(
switch (language) {
case Language::JAVA:
for (auto &entry : raylet_config_) {
if (entry.first == "num_workers_per_process_java") {
continue;
}
std::string arg;
arg.append("-Dray.raylet.config.");
arg.append(entry.first);
@@ -282,17 +229,8 @@ Process WorkerPool::StartWorkerProcess(
arg.append(entry.second);
worker_command_args.push_back(arg);
}
if (!RayConfig::instance().enable_multi_tenancy()) {
// The value of `num_workers_per_process_java` may change depends on whether
// dynamic options is empty, so we can't use the value in `RayConfig`. We always
// overwrite the value here.
worker_command_args.push_back(
"-Dray.raylet.config.num_workers_per_process_java=" +
std::to_string(workers_to_start));
} else {
worker_command_args.push_back("-Dray.job.num-java-workers-per-process=" +
std::to_string(workers_to_start));
}
worker_command_args.push_back("-Dray.job.num-java-workers-per-process=" +
std::to_string(workers_to_start));
break;
default:
RAY_LOG(FATAL)
@@ -327,12 +265,12 @@ Process WorkerPool::StartWorkerProcess(
}
ProcessEnvironment env;
if (RayConfig::instance().enable_multi_tenancy() && !IsIOWorkerType(worker_type)) {
if (!IsIOWorkerType(worker_type)) {
// We pass the job ID to worker processes via an environment variable, so we don't
// need to add a new CLI parameter for both Python and Java workers.
env.emplace(kEnvVarKeyJobId, job_id.Hex());
}
if (RayConfig::instance().enable_multi_tenancy() && job_config) {
if (job_config) {
env.insert(job_config->worker_env().begin(), job_config->worker_env().end());
}
@@ -516,18 +454,16 @@ void WorkerPool::OnWorkerStarted(const std::shared_ptr<WorkerInterface> &worker)
io_worker_state.num_starting_io_workers--;
}
if (RayConfig::instance().enable_multi_tenancy()) {
// This is a workaround to finish driver registration after all initial workers are
// registered to Raylet if and only if Raylet is started by a Python driver and the
// job config is not set in `ray.init(...)`.
if (first_job_ == worker->GetAssignedJobId() &&
worker->GetLanguage() == Language::PYTHON) {
if (++first_job_registered_python_worker_count_ ==
first_job_driver_wait_num_python_workers_) {
if (first_job_send_register_client_reply_to_driver_) {
first_job_send_register_client_reply_to_driver_();
first_job_send_register_client_reply_to_driver_ = nullptr;
}
// This is a workaround to finish driver registration after all initial workers are
// registered to Raylet if and only if Raylet is started by a Python driver and the
// job config is not set in `ray.init(...)`.
if (first_job_ == worker->GetAssignedJobId() &&
worker->GetLanguage() == Language::PYTHON) {
if (++first_job_registered_python_worker_count_ ==
first_job_driver_wait_num_python_workers_) {
if (first_job_send_register_client_reply_to_driver_) {
first_job_send_register_client_reply_to_driver_();
first_job_send_register_client_reply_to_driver_ = nullptr;
}
}
}
@@ -554,18 +490,15 @@ Status WorkerPool::RegisterDriver(const std::shared_ptr<WorkerInterface> &driver
// Invoke the `send_reply_callback` later to only finish driver
// registration after all initial workers are registered to Raylet.
bool delay_callback = false;
// Multi-tenancy is enabled.
if (RayConfig().instance().enable_multi_tenancy()) {
// If this is the first job.
if (first_job_.IsNil()) {
first_job_ = job_id;
// If the number of Python workers we need to wait is positive.
if (num_initial_python_workers_for_first_job_ > 0) {
delay_callback = true;
// Start initial Python workers for the first job.
for (int i = 0; i < num_initial_python_workers_for_first_job_; i++) {
StartWorkerProcess(Language::PYTHON, rpc::WorkerType::WORKER, job_id);
}
// If this is the first job.
if (first_job_.IsNil()) {
first_job_ = job_id;
// If the number of Python workers we need to wait is positive.
if (num_initial_python_workers_for_first_job_ > 0) {
delay_callback = true;
// Start initial Python workers for the first job.
for (int i = 0; i < num_initial_python_workers_for_first_job_; i++) {
StartWorkerProcess(Language::PYTHON, rpc::WorkerType::WORKER, job_id);
}
}
}
@@ -695,11 +628,9 @@ void WorkerPool::PushWorker(const std::shared_ptr<WorkerInterface> &worker) {
// Put the worker to the corresponding idle pool.
if (worker->GetActorId().IsNil()) {
state.idle.insert(worker);
if (RayConfig::instance().enable_multi_tenancy()) {
int64_t now = current_time_ms();
idle_of_all_languages_.emplace_back(worker, now);
idle_of_all_languages_map_[worker] = now;
}
int64_t now = current_time_ms();
idle_of_all_languages_.emplace_back(worker, now);
idle_of_all_languages_map_[worker] = now;
} else {
state.idle_actor[worker->GetActorId()] = worker;
}
@@ -858,40 +789,28 @@ std::shared_ptr<WorkerInterface> WorkerPool::PopWorker(
}
} else if (!task_spec.IsActorTask()) {
// Code path of normal task or actor creation task without dynamic worker options.
if (!RayConfig::instance().enable_multi_tenancy()) {
if (!state.idle.empty()) {
worker = std::move(*state.idle.begin());
state.idle.erase(state.idle.begin());
} else {
// There are no more non-actor workers available to execute this task.
// Start a new worker process.
proc = StartWorkerProcess(task_spec.GetLanguage(), rpc::WorkerType::WORKER,
JobID::Nil());
}
} else {
// Find an available worker which is already assigned to this job.
// Try to pop the most recently pushed worker.
for (auto it = idle_of_all_languages_.rbegin(); it != idle_of_all_languages_.rend();
it++) {
if (task_spec.GetLanguage() != it->first->GetLanguage() ||
it->first->GetAssignedJobId() != task_spec.JobId()) {
continue;
}
state.idle.erase(it->first);
// We can't erase a reverse_iterator.
auto lit = it.base();
lit--;
worker = std::move(lit->first);
idle_of_all_languages_.erase(lit);
idle_of_all_languages_map_.erase(worker);
break;
}
if (worker == nullptr) {
// There are no more non-actor workers available to execute this task.
// Start a new worker process.
proc = StartWorkerProcess(task_spec.GetLanguage(), rpc::WorkerType::WORKER,
task_spec.JobId());
// Find an available worker which is already assigned to this job.
// Try to pop the most recently pushed worker.
for (auto it = idle_of_all_languages_.rbegin(); it != idle_of_all_languages_.rend();
it++) {
if (task_spec.GetLanguage() != it->first->GetLanguage() ||
it->first->GetAssignedJobId() != task_spec.JobId()) {
continue;
}
state.idle.erase(it->first);
// We can't erase a reverse_iterator.
auto lit = it.base();
lit--;
worker = std::move(lit->first);
idle_of_all_languages_.erase(lit);
idle_of_all_languages_map_.erase(worker);
break;
}
if (worker == nullptr) {
// There are no more non-actor workers available to execute this task.
// Start a new worker process.
proc = StartWorkerProcess(task_spec.GetLanguage(), rpc::WorkerType::WORKER,
task_spec.JobId());
}
} else {
// Code path of actor task.
@@ -907,7 +826,7 @@ std::shared_ptr<WorkerInterface> WorkerPool::PopWorker(
WarnAboutSize();
}
if (RayConfig::instance().enable_multi_tenancy() && worker) {
if (worker) {
RAY_CHECK(worker->GetAssignedJobId() == task_spec.JobId());
}
return worker;
+4 -13
View File
@@ -95,7 +95,6 @@ class WorkerPool : public WorkerPoolInterface, public IOWorkerPoolInterface {
/// process should create and register the specified number of workers, and add them to
/// the pool.
///
/// \param num_workers The number of workers to start, per language.
/// \param num_workers_soft_limit The soft limit of the number of workers.
/// \param num_initial_python_workers_for_first_job The number of initial Python
/// workers for the first job.
@@ -113,8 +112,8 @@ class WorkerPool : public WorkerPoolInterface, public IOWorkerPoolInterface {
/// \param raylet_config The raylet config list of this node.
/// \param starting_worker_timeout_callback The callback that will be triggered once
/// it times out to start a worker.
WorkerPool(boost::asio::io_service &io_service, int num_workers,
int num_workers_soft_limit, int num_initial_python_workers_for_first_job,
WorkerPool(boost::asio::io_service &io_service, int num_workers_soft_limit,
int num_initial_python_workers_for_first_job,
int maximum_startup_concurrency, int min_worker_port, int max_worker_port,
const std::vector<int> &worker_ports,
std::shared_ptr<gcs::GcsClient> gcs_client,
@@ -251,7 +250,7 @@ class WorkerPool : public WorkerPoolInterface, public IOWorkerPoolInterface {
/// Try to prestart a number of workers suitable the given task spec. Prestarting
/// is needed since core workers request one lease at a time, if starting is slow,
/// then it means it takes a long time to scale up when multi-tenancy is on.
/// then it means it takes a long time to scale up.
///
/// \param task_spec The returned worker must be able to execute this task.
/// \param backlog_size The number of tasks in the client backlog of this shape.
@@ -306,7 +305,7 @@ class WorkerPool : public WorkerPoolInterface, public IOWorkerPoolInterface {
protected:
/// Asynchronously start a new worker process. Once the worker process has
/// registered with an external server, the process should create and
/// register num_workers_per_process workers, then add them to the pool.
/// register N workers, then add them to the pool.
/// Failure to start the worker process is a fatal error. If too many workers
/// are already being started, then this function will return without starting
/// any workers.
@@ -354,8 +353,6 @@ class WorkerPool : public WorkerPoolInterface, public IOWorkerPoolInterface {
struct State {
/// The commands and arguments used to start the worker process
std::vector<std::string> worker_command;
/// The number of workers per process.
int num_workers_per_process;
/// The pool of dedicated workers for actor creation tasks
/// with prefix or suffix worker command.
std::unordered_map<TaskID, std::shared_ptr<WorkerInterface>> idle_dedicated_workers;
@@ -392,12 +389,6 @@ class WorkerPool : public WorkerPoolInterface, public IOWorkerPoolInterface {
std::unordered_map<Language, State, std::hash<int>> states_by_lang_;
private:
/// Force-start at least num_workers workers for this language. Used for internal and
/// test purpose only.
///
/// \param num_workers The number of workers to start, per language.
void Start(int num_workers);
/// A helper function that returns the reference of the pool state
/// for a given language.
State &GetStateForLanguage(const Language &language);
+48 -83
View File
@@ -36,12 +36,9 @@ class WorkerPoolMock : public WorkerPool {
public:
explicit WorkerPoolMock(boost::asio::io_service &io_service,
const WorkerCommandMap &worker_commands)
: WorkerPool(io_service, 0, POOL_SIZE_SOFT_LIMIT, 0, MAXIMUM_STARTUP_CONCURRENCY, 0,
0, {}, nullptr, worker_commands, {}, []() {}),
last_worker_process_() {
states_by_lang_[ray::Language::JAVA].num_workers_per_process =
NUM_WORKERS_PER_PROCESS_JAVA;
}
: WorkerPool(io_service, POOL_SIZE_SOFT_LIMIT, 0, MAXIMUM_STARTUP_CONCURRENCY, 0, 0,
{}, nullptr, worker_commands, {}, []() {}),
last_worker_process_() {}
~WorkerPoolMock() {
// Avoid killing real processes
@@ -103,14 +100,11 @@ class WorkerPoolMock : public WorkerPool {
std::unordered_map<Process, std::vector<std::string>> worker_commands_by_proc_;
};
class WorkerPoolTest : public ::testing::TestWithParam<bool> {
class WorkerPoolTest : public ::testing::Test {
public:
WorkerPoolTest() : error_message_type_(1), client_call_manager_(io_service_) {
bool enable_multi_tenancy = GetParam();
RayConfig::instance().initialize(
{{"enable_multi_tenancy", std::to_string(enable_multi_tenancy)},
{"num_workers_per_process_java", std::to_string(NUM_WORKERS_PER_PROCESS_JAVA)},
{"object_spilling_config", "mock_config"},
{{"object_spilling_config", "mock_config"},
{"max_io_workers", std::to_string(MAX_IO_WORKER_SIZE)}});
SetWorkerCommands(
{{Language::PYTHON, {"dummy_py_worker_command"}},
@@ -236,16 +230,10 @@ static inline TaskSpecification ExampleTaskSpec(
}
static inline std::string GetNumJavaWorkersPerProcessSystemProperty(int num) {
std::string key;
if (RayConfig::instance().enable_multi_tenancy()) {
key = "ray.job.num-java-workers-per-process";
} else {
key = "ray.raylet.config.num_workers_per_process_java";
}
return std::string("-D") + key + "=" + std::to_string(num);
return std::string("-Dray.job.num-java-workers-per-process=") + std::to_string(num);
}
TEST_P(WorkerPoolTest, CompareWorkerProcessObjects) {
TEST_F(WorkerPoolTest, CompareWorkerProcessObjects) {
typedef Process T;
T a(T::CreateNewDummy()), b(T::CreateNewDummy()), empty = T();
ASSERT_TRUE(empty.IsNull());
@@ -259,7 +247,7 @@ TEST_P(WorkerPoolTest, CompareWorkerProcessObjects) {
ASSERT_TRUE(!std::equal_to<T>()(a, empty));
}
TEST_P(WorkerPoolTest, HandleWorkerRegistration) {
TEST_F(WorkerPoolTest, HandleWorkerRegistration) {
Process proc =
worker_pool_->StartWorkerProcess(Language::JAVA, rpc::WorkerType::WORKER, JOB_ID);
std::vector<std::shared_ptr<WorkerInterface>> workers;
@@ -286,59 +274,49 @@ TEST_P(WorkerPoolTest, HandleWorkerRegistration) {
}
}
TEST_P(WorkerPoolTest, HandleUnknownWorkerRegistration) {
TEST_F(WorkerPoolTest, HandleUnknownWorkerRegistration) {
auto worker = CreateWorker(Process(), Language::PYTHON);
auto status = worker_pool_->RegisterWorker(worker, 1234, [](Status, int) {});
ASSERT_FALSE(status.ok());
}
TEST_P(WorkerPoolTest, StartupPythonWorkerProcessCount) {
TEST_F(WorkerPoolTest, StartupPythonWorkerProcessCount) {
TestStartupWorkerProcessCount(Language::PYTHON, 1, {"dummy_py_worker_command"});
}
TEST_P(WorkerPoolTest, StartupJavaWorkerProcessCount) {
TEST_F(WorkerPoolTest, StartupJavaWorkerProcessCount) {
TestStartupWorkerProcessCount(
Language::JAVA, NUM_WORKERS_PER_PROCESS_JAVA,
{"dummy_java_worker_command",
GetNumJavaWorkersPerProcessSystemProperty(NUM_WORKERS_PER_PROCESS_JAVA)});
}
TEST_P(WorkerPoolTest, InitialWorkerProcessCount) {
if (!RayConfig::instance().enable_multi_tenancy()) {
worker_pool_->Start(1);
// Here we try to start only 1 worker for each worker language. But since we disabled
// initial workers for Java, we expect to see only 1 worker which is a Python worker.
ASSERT_EQ(worker_pool_->NumWorkersStarting(), 1);
ASSERT_EQ(worker_pool_->NumWorkerProcessesStarting(), 1);
} else {
ASSERT_EQ(worker_pool_->NumWorkersStarting(), 0);
ASSERT_EQ(worker_pool_->NumWorkerProcessesStarting(), 0);
}
TEST_F(WorkerPoolTest, InitialWorkerProcessCount) {
ASSERT_EQ(worker_pool_->NumWorkersStarting(), 0);
ASSERT_EQ(worker_pool_->NumWorkerProcessesStarting(), 0);
}
TEST_P(WorkerPoolTest, TestPrestartingWorkers) {
if (RayConfig::instance().enable_multi_tenancy()) {
const auto task_spec = ExampleTaskSpec();
// Prestarts 2 workers.
worker_pool_->PrestartWorkers(task_spec, 2);
ASSERT_EQ(worker_pool_->NumWorkersStarting(), 2);
ASSERT_EQ(worker_pool_->NumWorkerProcessesStarting(), 2);
// Prestarts 1 more worker.
worker_pool_->PrestartWorkers(task_spec, 3);
ASSERT_EQ(worker_pool_->NumWorkersStarting(), 3);
ASSERT_EQ(worker_pool_->NumWorkerProcessesStarting(), 3);
// No more needed.
worker_pool_->PrestartWorkers(task_spec, 1);
ASSERT_EQ(worker_pool_->NumWorkersStarting(), 3);
ASSERT_EQ(worker_pool_->NumWorkerProcessesStarting(), 3);
// Capped by soft limit of 5.
worker_pool_->PrestartWorkers(task_spec, 20);
ASSERT_EQ(worker_pool_->NumWorkersStarting(), 5);
ASSERT_EQ(worker_pool_->NumWorkerProcessesStarting(), 5);
}
TEST_F(WorkerPoolTest, TestPrestartingWorkers) {
const auto task_spec = ExampleTaskSpec();
// Prestarts 2 workers.
worker_pool_->PrestartWorkers(task_spec, 2);
ASSERT_EQ(worker_pool_->NumWorkersStarting(), 2);
ASSERT_EQ(worker_pool_->NumWorkerProcessesStarting(), 2);
// Prestarts 1 more worker.
worker_pool_->PrestartWorkers(task_spec, 3);
ASSERT_EQ(worker_pool_->NumWorkersStarting(), 3);
ASSERT_EQ(worker_pool_->NumWorkerProcessesStarting(), 3);
// No more needed.
worker_pool_->PrestartWorkers(task_spec, 1);
ASSERT_EQ(worker_pool_->NumWorkersStarting(), 3);
ASSERT_EQ(worker_pool_->NumWorkerProcessesStarting(), 3);
// Capped by soft limit of 5.
worker_pool_->PrestartWorkers(task_spec, 20);
ASSERT_EQ(worker_pool_->NumWorkersStarting(), 5);
ASSERT_EQ(worker_pool_->NumWorkerProcessesStarting(), 5);
}
TEST_P(WorkerPoolTest, HandleWorkerPushPop) {
TEST_F(WorkerPoolTest, HandleWorkerPushPop) {
// Try to pop a worker from the empty pool and make sure we don't get one.
std::shared_ptr<WorkerInterface> popped_worker;
const auto task_spec = ExampleTaskSpec();
@@ -365,7 +343,7 @@ TEST_P(WorkerPoolTest, HandleWorkerPushPop) {
ASSERT_EQ(popped_worker, nullptr);
}
TEST_P(WorkerPoolTest, PopActorWorker) {
TEST_F(WorkerPoolTest, PopActorWorker) {
// Create a worker.
auto worker = CreateWorker(Process::CreateNewDummy());
// Add the worker to the pool.
@@ -387,7 +365,7 @@ TEST_P(WorkerPoolTest, PopActorWorker) {
ASSERT_EQ(actor->GetActorId(), actor_id);
}
TEST_P(WorkerPoolTest, PopWorkersOfMultipleLanguages) {
TEST_F(WorkerPoolTest, PopWorkersOfMultipleLanguages) {
// Create a Python Worker, and add it to the pool
auto py_worker = CreateWorker(Process::CreateNewDummy(), Language::PYTHON);
worker_pool_->PushWorker(py_worker);
@@ -405,7 +383,7 @@ TEST_P(WorkerPoolTest, PopWorkersOfMultipleLanguages) {
ASSERT_NE(worker_pool_->PopWorker(java_task_spec), nullptr);
}
TEST_P(WorkerPoolTest, StartWorkerWithDynamicOptionsCommand) {
TEST_F(WorkerPoolTest, StartWorkerWithDynamicOptionsCommand) {
const std::vector<std::string> java_worker_command = {
"RAY_WORKER_DYNAMIC_OPTION_PLACEHOLDER", "dummy_java_worker_command",
"RAY_WORKER_RAYLET_CONFIG_PLACEHOLDER"};
@@ -426,25 +404,15 @@ TEST_P(WorkerPoolTest, StartWorkerWithDynamicOptionsCommand) {
const auto real_command =
worker_pool_->GetWorkerCommand(worker_pool_->LastStartedWorkerProcess());
if (RayConfig::instance().enable_multi_tenancy()) {
ASSERT_EQ(
real_command,
std::vector<std::string>(
{"test_op_0", "test_op_1", "-Dray.job.code-search-path=/test/code_serch_path",
"dummy_java_worker_command", GetNumJavaWorkersPerProcessSystemProperty(1)}));
} else {
ASSERT_EQ(real_command, std::vector<std::string>(
{"test_op_0", "test_op_1", "dummy_java_worker_command",
GetNumJavaWorkersPerProcessSystemProperty(1)}));
}
ASSERT_EQ(
real_command,
std::vector<std::string>(
{"test_op_0", "test_op_1", "-Dray.job.code-search-path=/test/code_serch_path",
"dummy_java_worker_command", GetNumJavaWorkersPerProcessSystemProperty(1)}));
worker_pool_->HandleJobFinished(JOB_ID);
}
TEST_P(WorkerPoolTest, PopWorkerMultiTenancy) {
if (!RayConfig::instance().enable_multi_tenancy()) {
return;
}
TEST_F(WorkerPoolTest, PopWorkerMultiTenancy) {
auto job_id1 = JOB_ID;
auto job_id2 = JobID::FromInt(2);
ASSERT_NE(job_id1, job_id2);
@@ -505,7 +473,7 @@ TEST_P(WorkerPoolTest, PopWorkerMultiTenancy) {
}
}
TEST_P(WorkerPoolTest, MaximumStartupConcurrency) {
TEST_F(WorkerPoolTest, MaximumStartupConcurrency) {
auto task_spec = ExampleTaskSpec();
std::vector<Process> started_processes;
@@ -550,7 +518,7 @@ TEST_P(WorkerPoolTest, MaximumStartupConcurrency) {
ASSERT_EQ(0, worker_pool_->NumWorkerProcessesStarting());
}
TEST_P(WorkerPoolTest, HandleIOWorkersPushPop) {
TEST_F(WorkerPoolTest, HandleIOWorkersPushPop) {
std::unordered_set<std::shared_ptr<WorkerInterface>> spill_pushed_worker;
std::unordered_set<std::shared_ptr<WorkerInterface>> restore_pushed_worker;
auto spill_worker_callback =
@@ -603,7 +571,7 @@ TEST_P(WorkerPoolTest, HandleIOWorkersPushPop) {
ASSERT_EQ(restore_pushed_worker.size(), 1);
}
TEST_P(WorkerPoolTest, MaxIOWorkerSimpleTest) {
TEST_F(WorkerPoolTest, MaxIOWorkerSimpleTest) {
// Make sure max number of spill workers are respected.
auto callback = [](std::shared_ptr<WorkerInterface> worker) {};
std::vector<Process> started_processes;
@@ -633,7 +601,7 @@ TEST_P(WorkerPoolTest, MaxIOWorkerSimpleTest) {
ASSERT_EQ(worker_pool_->NumSpillWorkerStarting(), 0);
}
TEST_P(WorkerPoolTest, MaxIOWorkerComplicateTest) {
TEST_F(WorkerPoolTest, MaxIOWorkerComplicateTest) {
// Make sure max number of restore workers are respected.
// This test will test a little more complicated scneario.
// For example, it tests scenarios where there are
@@ -685,7 +653,7 @@ TEST_P(WorkerPoolTest, MaxIOWorkerComplicateTest) {
ASSERT_EQ(worker_pool_->NumSpillWorkerStarting(), 0);
}
TEST_P(WorkerPoolTest, MaxSpillRestoreWorkersIntegrationTest) {
TEST_F(WorkerPoolTest, MaxSpillRestoreWorkersIntegrationTest) {
auto callback = [](std::shared_ptr<WorkerInterface> worker) {};
// Run many pop spill/restore workers and make sure the max worker size doesn't exceed.
std::vector<Process> started_restore_processes;
@@ -732,7 +700,7 @@ TEST_P(WorkerPoolTest, MaxSpillRestoreWorkersIntegrationTest) {
ASSERT_EQ(worker_pool_->GetProcessSize(), 2 * MAX_IO_WORKER_SIZE);
}
TEST_P(WorkerPoolTest, DeleteWorkerPushPop) {
TEST_F(WorkerPoolTest, DeleteWorkerPushPop) {
/// Make sure delete workers always pop an I/O worker that has more idle worker in their
/// pools.
// 2 spill worker and 1 restore worker.
@@ -770,9 +738,6 @@ TEST_P(WorkerPoolTest, DeleteWorkerPushPop) {
});
}
INSTANTIATE_TEST_CASE_P(WorkerPoolMultiTenancyTest, WorkerPoolTest,
::testing::Values(true, false));
} // namespace raylet
} // namespace ray
@@ -12,10 +12,8 @@ class ClusterStarter {
static synchronized void startCluster(boolean isLocal) {
Preconditions.checkArgument(!Ray.isInitialized());
if (!isLocal) {
System.setProperty("ray.raylet.config.num_workers_per_process_java", "1");
System.setProperty("ray.run-mode", "CLUSTER");
} else {
System.clearProperty("ray.raylet.config.num_workers_per_process_java");
System.setProperty("ray.run-mode", "SINGLE_PROCESS");
}
@@ -25,7 +23,6 @@ class ClusterStarter {
public static synchronized void stopCluster() {
// Disconnect to the cluster.
Ray.shutdown();
System.clearProperty("ray.raylet.config.num_workers_per_process_java");
System.clearProperty("ray.run-mode");
}
}
@@ -60,7 +60,6 @@ public class StreamingQueueTest extends BaseUnitTest implements Serializable {
Ray.shutdown();
System.setProperty("ray.head-args.0", "--num-cpus=4");
System.setProperty("ray.head-args.1", "--resources={\"RES-A\":4}");
System.setProperty("ray.raylet.config.num_workers_per_process_java", "1");
System.setProperty("ray.run-mode", "CLUSTER");
System.setProperty("ray.redirect-output", "true");
Ray.init();
@@ -82,7 +81,6 @@ public class StreamingQueueTest extends BaseUnitTest implements Serializable {
Ray.shutdown();
System.setProperty("ray.head-args.0", "--num-cpus=4");
System.setProperty("ray.head-args.1", "--resources={\"RES-A\":4}");
System.setProperty("ray.raylet.config.num_workers_per_process_java", "1");
System.setProperty("ray.run-mode", "CLUSTER");
System.setProperty("ray.redirect-output", "true");
@@ -139,7 +137,6 @@ public class StreamingQueueTest extends BaseUnitTest implements Serializable {
Ray.shutdown();
System.setProperty("ray.head-args.0", "--num-cpus=4");
System.setProperty("ray.head-args.1", "--resources={\"RES-A\":4}");
System.setProperty("ray.raylet.config.num_workers_per_process_java", "1");
System.setProperty("ray.run-mode", "CLUSTER");
System.setProperty("ray.redirect-output", "true");
+2 -1
View File
@@ -1,5 +1,6 @@
import argparse
import logging
import sys
import time
import ray
@@ -65,7 +66,7 @@ if __name__ == "__main__":
args = parser.parse_args()
titles_file = str(args.titles_file)
ray.init(_load_code_from_local=True)
ray.init(job_config=ray.job_config.JobConfig(code_search_path=sys.path))
ctx = StreamingContext.Builder() \
.option(Config.CHANNEL_TYPE, Config.NATIVE_CHANNEL) \
+3 -1
View File
@@ -1,4 +1,5 @@
import subprocess
import sys
import time
from typing import List
@@ -8,7 +9,8 @@ from ray.streaming import StreamingContext
def test_word_count():
try:
ray.init(_load_code_from_local=True)
ray.init(
job_config=ray.job_config.JobConfig(code_search_path=sys.path))
# time.sleep(10) # for gdb to attach
ctx = StreamingContext.Builder() \
.option("streaming.context-backend.type", "local_file") \
+3 -3
View File
@@ -1,6 +1,7 @@
import json
import os
import subprocess
import sys
import ray
from ray.streaming import StreamingContext
@@ -34,9 +35,8 @@ def test_hybrid_stream():
print("java_worker_options", java_worker_options)
assert not ray.is_initialized()
ray.init(
_load_code_from_local=True,
_java_worker_options=java_worker_options,
_system_config={"num_workers_per_process_java": 1})
job_config=ray.job_config.JobConfig(code_search_path=sys.path),
_java_worker_options=java_worker_options)
sink_file = "/tmp/ray_streaming_test_hybrid_stream.txt"
if os.path.exists(sink_file):
+5 -3
View File
@@ -1,9 +1,11 @@
import sys
import ray
from ray.streaming import StreamingContext
def test_data_stream():
ray.init(_load_code_from_local=True)
ray.init(job_config=ray.job_config.JobConfig(code_search_path=sys.path))
ctx = StreamingContext.Builder().build()
stream = ctx.from_values(1, 2, 3)
java_stream = stream.as_java_stream()
@@ -17,7 +19,7 @@ def test_data_stream():
def test_key_data_stream():
ray.init(_load_code_from_local=True)
ray.init(job_config=ray.job_config.JobConfig(code_search_path=sys.path))
ctx = StreamingContext.Builder().build()
key_stream = ctx.from_values(
"a", "b", "c").map(lambda x: (x, 1)).key_by(lambda x: x[0])
@@ -32,7 +34,7 @@ def test_key_data_stream():
def test_stream_config():
ray.init(_load_code_from_local=True)
ray.init(job_config=ray.job_config.JobConfig(code_search_path=sys.path))
ctx = StreamingContext.Builder().build()
stream = ctx.from_values(1, 2, 3)
stream.with_config("k1", "v1")
+2 -1
View File
@@ -1,11 +1,12 @@
import os
import sys
import ray
from ray.streaming import StreamingContext
def test_union_stream():
ray.init(_load_code_from_local=True)
ray.init(job_config=ray.job_config.JobConfig(code_search_path=sys.path))
ctx = StreamingContext.Builder() \
.option("streaming.metrics.reporters", "") \
.build()
+3 -2
View File
@@ -1,11 +1,12 @@
import os
import sys
import ray
from ray.streaming import StreamingContext
from ray.test_utils import wait_for_condition
def test_word_count():
ray.init(_load_code_from_local=True)
ray.init(job_config=ray.job_config.JobConfig(code_search_path=sys.path))
ctx = StreamingContext.Builder() \
.build()
ctx.read_text_file(__file__) \
@@ -24,7 +25,7 @@ def test_word_count():
def test_simple_word_count():
ray.init(_load_code_from_local=True)
ray.init(job_config=ray.job_config.JobConfig(code_search_path=sys.path))
ctx = StreamingContext.Builder() \
.build()
sink_file = "/tmp/ray_streaming_test_simple_word_count.txt"