diff --git a/ci/long_running_tests/workloads/many_tasks_serialized_ids.py b/ci/long_running_tests/workloads/many_tasks_serialized_ids.py index f1db7a6fa..c754b851f 100644 --- a/ci/long_running_tests/workloads/many_tasks_serialized_ids.py +++ b/ci/long_running_tests/workloads/many_tasks_serialized_ids.py @@ -36,7 +36,7 @@ for i in range(num_nodes): object_store_memory=object_store_memory, redis_max_memory=redis_max_memory, dashboard_host="0.0.0.0", - _internal_config=config, + _internal_config=config if i == 0 else None, ) ray.init(address=cluster.address) diff --git a/java/runtime/src/main/java/io/ray/runtime/RayNativeRuntime.java b/java/runtime/src/main/java/io/ray/runtime/RayNativeRuntime.java index d56e8d031..4d8b72c9e 100644 --- a/java/runtime/src/main/java/io/ray/runtime/RayNativeRuntime.java +++ b/java/runtime/src/main/java/io/ray/runtime/RayNativeRuntime.java @@ -56,6 +56,15 @@ public final class RayNativeRuntime extends AbstractRayRuntime { } catch (IOException e) { throw new RuntimeException("Failed to create the log directory.", e); } + + if (rayConfig.getRedisAddress() != null) { + GcsClient tempGcsClient = + new GcsClient(rayConfig.getRedisAddress(), rayConfig.redisPassword); + for (Map.Entry entry : + tempGcsClient.getInternalConfig().entrySet()) { + rayConfig.rayletConfigParameters.put(entry.getKey(), entry.getValue()); + } + } } private static void resetLibraryPath(RayConfig rayConfig) { diff --git a/java/runtime/src/main/java/io/ray/runtime/gcs/GcsClient.java b/java/runtime/src/main/java/io/ray/runtime/gcs/GcsClient.java index 294646f06..6398614b2 100644 --- a/java/runtime/src/main/java/io/ray/runtime/gcs/GcsClient.java +++ b/java/runtime/src/main/java/io/ray/runtime/gcs/GcsClient.java @@ -93,6 +93,17 @@ public class GcsClient { return new ArrayList<>(nodes.values()); } + public Map getInternalConfig() { + Gcs.StoredConfig storedConfig; + byte[] conf = globalStateAccessor.getInternalConfig(); + try { + storedConfig = Gcs.StoredConfig.parseFrom(conf); + } catch (InvalidProtocolBufferException e) { + throw new RuntimeException("Received invalid internal config protobuf from GCS."); + } + return storedConfig.getConfigMap(); + } + private Map getResourcesForClient(UniqueId clientId) { byte[] resourceMapBytes = globalStateAccessor.getNodeResourceInfo(clientId); Gcs.ResourceMap resourceMap; diff --git a/java/runtime/src/main/java/io/ray/runtime/gcs/GlobalStateAccessor.java b/java/runtime/src/main/java/io/ray/runtime/gcs/GlobalStateAccessor.java index a1c67b53d..57e344e03 100644 --- a/java/runtime/src/main/java/io/ray/runtime/gcs/GlobalStateAccessor.java +++ b/java/runtime/src/main/java/io/ray/runtime/gcs/GlobalStateAccessor.java @@ -78,6 +78,14 @@ public class GlobalStateAccessor { } } + public byte[] getInternalConfig() { + synchronized (GlobalStateAccessor.class) { + Preconditions.checkState(globalStateAccessorNativePointer != 0, + "Get internal config when global state accessor have been destroyed."); + return nativeGetInternalConfig(globalStateAccessorNativePointer); + } + } + /** * @return A list of actor info with ActorInfo protobuf schema. */ @@ -135,6 +143,8 @@ public class GlobalStateAccessor { private native byte[] nativeGetNodeResourceInfo(long nativePtr, byte[] nodeId); + private native byte[] nativeGetInternalConfig(long nativePtr); + private native List nativeGetAllActorInfo(long nativePtr); private native byte[] nativeGetActorInfo(long nativePtr, byte[] actorId); diff --git a/java/runtime/src/main/java/io/ray/runtime/runner/RunManager.java b/java/runtime/src/main/java/io/ray/runtime/runner/RunManager.java index 2c68feac3..55a55ef49 100644 --- a/java/runtime/src/main/java/io/ray/runtime/runner/RunManager.java +++ b/java/runtime/src/main/java/io/ray/runtime/runner/RunManager.java @@ -194,7 +194,7 @@ public class RunManager { startGcs(); } startObjectStore(); - startRaylet(); + startRaylet(isHead); LOGGER.info("All processes started @ {}.", rayConfig.nodeIp); } catch (Exception e) { // Clean up started processes. @@ -292,7 +292,7 @@ public class RunManager { return ip + ":" + port; } - private void startRaylet() throws IOException { + private void startRaylet(boolean isHead) throws IOException { int hardwareConcurrency = Runtime.getRuntime().availableProcessors(); int maximumStartupConcurrency = Math.max(1, Math.min(rayConfig.resources.getOrDefault("CPU", 0.0).intValue(), hardwareConcurrency)); @@ -325,7 +325,8 @@ public class RunManager { .collect(Collectors.joining(","))), String.format("--python_worker_command=%s", buildPythonWorkerCommand()), String.format("--java_worker_command=%s", buildWorkerCommand()), - String.format("--redis_password=%s", redisPasswordOption) + String.format("--redis_password=%s", redisPasswordOption), + isHead ? "--head_node" : "" ); startProcess(command, null, "raylet"); diff --git a/python/ray/node.py b/python/ray/node.py index 57e048d2e..b4c9e3bab 100644 --- a/python/ray/node.py +++ b/python/ray/node.py @@ -92,6 +92,11 @@ class Node: "The raylet IP address should only be different than the node " "IP address when connecting to an existing raylet; i.e., when " "head=False and connect_only=True.") + if ray_params._internal_config and len( + ray_params._internal_config) > 0 and (not head + and not connect_only): + raise ValueError( + "Internal config parameters can only be set on the head node.") self._raylet_ip_address = raylet_ip_address @@ -663,7 +668,8 @@ class Node: plasma_directory=self._ray_params.plasma_directory, huge_pages=self._ray_params.huge_pages, fate_share=self.kernel_fate_share, - socket_to_use=self.socket) + socket_to_use=self.socket, + head_node=self.head) assert ray_constants.PROCESS_TYPE_RAYLET not in self.all_processes self.all_processes[ray_constants.PROCESS_TYPE_RAYLET] = [process_info] diff --git a/python/ray/services.py b/python/ray/services.py index 41f91d37b..97cedae0f 100644 --- a/python/ray/services.py +++ b/python/ray/services.py @@ -1260,7 +1260,8 @@ def start_raylet(redis_address, plasma_directory=None, huge_pages=False, fate_share=None, - socket_to_use=None): + socket_to_use=None, + head_node=False): """Start a raylet, which is a combined local scheduler and object manager. Args: @@ -1402,6 +1403,8 @@ def start_raylet(redis_address, command.append("--huge_pages") if socket_to_use: socket_to_use.close() + if head_node: + command.append("--head_node") process_info = start_ray_process( command, ray_constants.PROCESS_TYPE_RAYLET, diff --git a/python/ray/tests/conftest.py b/python/ray/tests/conftest.py index 766fc7843..ffbfad1b9 100644 --- a/python/ray/tests/conftest.py +++ b/python/ray/tests/conftest.py @@ -111,7 +111,9 @@ def _ray_start_cluster(**kwargs): init_kwargs.update(kwargs) cluster = Cluster() remote_nodes = [] - for _ in range(num_nodes): + for i in range(num_nodes): + if i > 0 and "_internal_config" in init_kwargs: + del init_kwargs["_internal_config"] remote_nodes.append(cluster.add_node(**init_kwargs)) # We assume driver will connect to the head (first node), # so ray init will be invoked if do_init is true @@ -200,8 +202,7 @@ def two_node_cluster(): cluster = ray.cluster_utils.Cluster( head_node_args={"_internal_config": internal_config}) for _ in range(2): - remote_node = cluster.add_node( - num_cpus=1, _internal_config=internal_config) + remote_node = cluster.add_node(num_cpus=1) ray.init(address=cluster.address) yield cluster, remote_node diff --git a/python/ray/tests/test_actor_failures.py b/python/ray/tests/test_actor_failures.py index fd012bd0f..65d8192a7 100644 --- a/python/ray/tests/test_actor_failures.py +++ b/python/ray/tests/test_actor_failures.py @@ -291,7 +291,7 @@ def test_actor_restart_on_node_failure(ray_start_cluster): ray.init(address=cluster.address) # Node to place the actor. - actor_node = cluster.add_node(num_cpus=1, _internal_config=config) + actor_node = cluster.add_node(num_cpus=1) cluster.wait_for_nodes() @ray.remote(num_cpus=1, max_restarts=1, max_task_retries=-1) @@ -313,7 +313,7 @@ def test_actor_restart_on_node_failure(ray_start_cluster): results = [actor.increase.remote() for _ in range(100)] # Kill actor node, while the above task is still being executed. cluster.remove_node(actor_node) - cluster.add_node(num_cpus=1, _internal_config=config) + cluster.add_node(num_cpus=1) cluster.wait_for_nodes() # Check that none of the tasks failed and the actor is restarted. seq = list(range(1, 101)) @@ -442,7 +442,8 @@ def test_caller_task_reconstruction(ray_start_regular): @pytest.mark.parametrize( "ray_start_cluster_head", [ generate_internal_config_map( - initial_reconstruction_timeout_milliseconds=1000) + initial_reconstruction_timeout_milliseconds=1000, + num_heartbeats_timeout=10) ], indirect=True) def test_multiple_actor_restart(ray_start_cluster_head): @@ -454,14 +455,7 @@ def test_multiple_actor_restart(ray_start_cluster_head): num_actors_at_a_time = 3 num_function_calls_at_a_time = 10 - worker_nodes = [ - cluster.add_node( - num_cpus=3, - _internal_config=json.dumps({ - "initial_reconstruction_timeout_milliseconds": 200, - "num_heartbeats_timeout": 10, - })) for _ in range(num_nodes) - ] + worker_nodes = [cluster.add_node(num_cpus=3) for _ in range(num_nodes)] @ray.remote(max_restarts=-1, max_task_retries=-1) class SlowCounter: diff --git a/python/ray/tests/test_actor_resources.py b/python/ray/tests/test_actor_resources.py index 06ff4ade8..cd283ff00 100644 --- a/python/ray/tests/test_actor_resources.py +++ b/python/ray/tests/test_actor_resources.py @@ -242,7 +242,7 @@ def test_actor_multiple_gpus_from_multiple_tasks(ray_start_cluster): num_gpus=num_gpus_per_raylet, _internal_config=json.dumps({ "num_heartbeats_timeout": 1000 - })) + } if i == 0 else {})) ray.init(address=cluster.address) @ray.remote diff --git a/python/ray/tests/test_failure.py b/python/ray/tests/test_failure.py index 7813caa55..ffa763570 100644 --- a/python/ray/tests/test_failure.py +++ b/python/ray/tests/test_failure.py @@ -842,15 +842,15 @@ def test_connect_with_disconnected_node(shutdown_only): info = relevant_errors(ray_constants.REMOVED_NODE_ERROR) assert len(info) == 0 # This node is killed by SIGKILL, ray_monitor will mark it to dead. - dead_node = cluster.add_node(num_cpus=0, _internal_config=config) + dead_node = cluster.add_node(num_cpus=0) cluster.remove_node(dead_node, allow_graceful=False) wait_for_errors(ray_constants.REMOVED_NODE_ERROR, 1) # This node is killed by SIGKILL, ray_monitor will mark it to dead. - dead_node = cluster.add_node(num_cpus=0, _internal_config=config) + dead_node = cluster.add_node(num_cpus=0) cluster.remove_node(dead_node, allow_graceful=False) wait_for_errors(ray_constants.REMOVED_NODE_ERROR, 2) # This node is killed by SIGTERM, ray_monitor will not mark it again. - removing_node = cluster.add_node(num_cpus=0, _internal_config=config) + removing_node = cluster.add_node(num_cpus=0) cluster.remove_node(removing_node, allow_graceful=True) with pytest.raises(RayTestTimeoutException): wait_for_errors(ray_constants.REMOVED_NODE_ERROR, 3, timeout=2) diff --git a/python/ray/tests/test_multi_node_2.py b/python/ray/tests/test_multi_node_2.py index 27131537f..09df6308b 100644 --- a/python/ray/tests/test_multi_node_2.py +++ b/python/ray/tests/test_multi_node_2.py @@ -32,8 +32,11 @@ def test_shutdown(): @pytest.mark.parametrize( - "ray_start_cluster_head", - [generate_internal_config_map(num_heartbeats_timeout=20)], + "ray_start_cluster_head", [ + generate_internal_config_map( + num_heartbeats_timeout=20, + initial_reconstruction_timeout_milliseconds=12345) + ], indirect=True) def test_internal_config(ray_start_cluster_head): """Checks that the internal configuration setting works. @@ -41,12 +44,20 @@ def test_internal_config(ray_start_cluster_head): We set the cluster to timeout nodes after 2 seconds of no timeouts. We then remove a node, wait for 1 second to check that the cluster is out of sync, then wait another 2 seconds (giving 1 second of leeway) to check - that the client has timed out. + that the client has timed out. We also check to see if the config is set. """ cluster = ray_start_cluster_head worker = cluster.add_node() cluster.wait_for_nodes() + @ray.remote + def f(): + assert ray._config.initial_reconstruction_timeout_milliseconds( + ) == 12345 + assert ray._config.num_heartbeats_timeout() == 20 + + ray.get([f.remote() for _ in range(5)]) + cluster.remove_node(worker, allow_graceful=False) time.sleep(1) assert ray.cluster_resources()["CPU"] == 2 diff --git a/python/ray/tests/test_object_manager.py b/python/ray/tests/test_object_manager.py index ba946d591..efd915c04 100644 --- a/python/ray/tests/test_object_manager.py +++ b/python/ray/tests/test_object_manager.py @@ -214,10 +214,7 @@ def test_object_transfer_retry(ray_start_cluster): object_store_memory = 150 * 1024 * 1024 cluster.add_node( object_store_memory=object_store_memory, _internal_config=config) - cluster.add_node( - num_gpus=1, - object_store_memory=object_store_memory, - _internal_config=config) + cluster.add_node(num_gpus=1, object_store_memory=object_store_memory) ray.init(address=cluster.address) @ray.remote(num_gpus=1) diff --git a/python/ray/tests/test_reconstruction.py b/python/ray/tests/test_reconstruction.py index 3e05071d2..81e1324ed 100644 --- a/python/ray/tests/test_reconstruction.py +++ b/python/ray/tests/test_reconstruction.py @@ -63,15 +63,9 @@ def test_reconstruction_cached_dependency(ray_start_cluster, ray.init(address=cluster.address) # Node to place the initial object. node_to_kill = cluster.add_node( - num_cpus=1, - resources={"node1": 1}, - object_store_memory=10**8, - _internal_config=config) + num_cpus=1, resources={"node1": 1}, object_store_memory=10**8) cluster.add_node( - num_cpus=1, - resources={"node2": 1}, - object_store_memory=10**8, - _internal_config=config) + num_cpus=1, resources={"node2": 1}, object_store_memory=10**8) cluster.wait_for_nodes() @ray.remote(max_retries=0) @@ -92,10 +86,7 @@ def test_reconstruction_cached_dependency(ray_start_cluster, cluster.remove_node(node_to_kill, allow_graceful=False) cluster.add_node( - num_cpus=1, - resources={"node1": 1}, - object_store_memory=10**8, - _internal_config=config) + num_cpus=1, resources={"node1": 1}, object_store_memory=10**8) assert wait_for_condition( lambda: not all(node["Alive"] for node in ray.nodes()), timeout=10) @@ -125,15 +116,9 @@ def test_basic_reconstruction(ray_start_cluster, reconstruction_enabled): ray.init(address=cluster.address) # Node to place the initial object. node_to_kill = cluster.add_node( - num_cpus=1, - resources={"node1": 1}, - object_store_memory=10**8, - _internal_config=config) + num_cpus=1, resources={"node1": 1}, object_store_memory=10**8) cluster.add_node( - num_cpus=1, - resources={"node2": 1}, - object_store_memory=10**8, - _internal_config=config) + num_cpus=1, resources={"node2": 1}, object_store_memory=10**8) cluster.wait_for_nodes() @ray.remote(max_retries=1 if reconstruction_enabled else 0) @@ -149,10 +134,7 @@ def test_basic_reconstruction(ray_start_cluster, reconstruction_enabled): cluster.remove_node(node_to_kill, allow_graceful=False) cluster.add_node( - num_cpus=1, - resources={"node1": 1}, - object_store_memory=10**8, - _internal_config=config) + num_cpus=1, resources={"node1": 1}, object_store_memory=10**8) if reconstruction_enabled: ray.get(dependent_task.remote(obj)) @@ -177,15 +159,9 @@ def test_basic_reconstruction_put(ray_start_cluster, reconstruction_enabled): ray.init(address=cluster.address) # Node to place the initial object. node_to_kill = cluster.add_node( - num_cpus=1, - resources={"node1": 1}, - object_store_memory=10**8, - _internal_config=config) + num_cpus=1, resources={"node1": 1}, object_store_memory=10**8) cluster.add_node( - num_cpus=1, - resources={"node2": 1}, - object_store_memory=10**8, - _internal_config=config) + num_cpus=1, resources={"node2": 1}, object_store_memory=10**8) cluster.wait_for_nodes() @ray.remote(max_retries=1 if reconstruction_enabled else 0) @@ -203,10 +179,7 @@ def test_basic_reconstruction_put(ray_start_cluster, reconstruction_enabled): cluster.remove_node(node_to_kill, allow_graceful=False) cluster.add_node( - num_cpus=1, - resources={"node1": 1}, - object_store_memory=10**8, - _internal_config=config) + num_cpus=1, resources={"node1": 1}, object_store_memory=10**8) for _ in range(20): ray.put(np.zeros(10**7, dtype=np.uint8)) @@ -232,15 +205,9 @@ def test_multiple_downstream_tasks(ray_start_cluster, reconstruction_enabled): ray.init(address=cluster.address) # Node to place the initial object. node_to_kill = cluster.add_node( - num_cpus=1, - resources={"node1": 1}, - object_store_memory=10**8, - _internal_config=config) + num_cpus=1, resources={"node1": 1}, object_store_memory=10**8) cluster.add_node( - num_cpus=1, - resources={"node2": 1}, - object_store_memory=10**8, - _internal_config=config) + num_cpus=1, resources={"node2": 1}, object_store_memory=10**8) cluster.wait_for_nodes() @ray.remote(max_retries=1 if reconstruction_enabled else 0) @@ -262,10 +229,7 @@ def test_multiple_downstream_tasks(ray_start_cluster, reconstruction_enabled): cluster.remove_node(node_to_kill, allow_graceful=False) cluster.add_node( - num_cpus=1, - resources={"node1": 1}, - object_store_memory=10**8, - _internal_config=config) + num_cpus=1, resources={"node1": 1}, object_store_memory=10**8) if reconstruction_enabled: for obj in downstream: @@ -294,8 +258,7 @@ def test_reconstruction_chain(ray_start_cluster, reconstruction_enabled): cluster.add_node( num_cpus=0, _internal_config=config, object_store_memory=10**8) ray.init(address=cluster.address) - node_to_kill = cluster.add_node( - num_cpus=1, object_store_memory=10**8, _internal_config=config) + node_to_kill = cluster.add_node(num_cpus=1, object_store_memory=10**8) cluster.wait_for_nodes() @ray.remote(max_retries=1 if reconstruction_enabled else 0) @@ -316,8 +279,7 @@ def test_reconstruction_chain(ray_start_cluster, reconstruction_enabled): ray.get(dependent_task.remote(obj)) cluster.remove_node(node_to_kill, allow_graceful=False) - cluster.add_node( - num_cpus=1, object_store_memory=10**8, _internal_config=config) + cluster.add_node(num_cpus=1, object_store_memory=10**8) if reconstruction_enabled: ray.get(dependent_task.remote(obj)) @@ -343,15 +305,9 @@ def test_reconstruction_stress(ray_start_cluster): ray.init(address=cluster.address) # Node to place the initial object. node_to_kill = cluster.add_node( - num_cpus=1, - resources={"node1": 1}, - object_store_memory=10**8, - _internal_config=config) + num_cpus=1, resources={"node1": 1}, object_store_memory=10**8) cluster.add_node( - num_cpus=1, - resources={"node2": 1}, - object_store_memory=10**8, - _internal_config=config) + num_cpus=1, resources={"node2": 1}, object_store_memory=10**8) cluster.wait_for_nodes() @ray.remote @@ -379,10 +335,7 @@ def test_reconstruction_stress(ray_start_cluster): cluster.remove_node(node_to_kill, allow_graceful=False) node_to_kill = cluster.add_node( - num_cpus=1, - resources={"node1": 1}, - object_store_memory=10**8, - _internal_config=config) + num_cpus=1, resources={"node1": 1}, object_store_memory=10**8) i = 0 while outputs: diff --git a/python/ray/tests/test_stress_failure.py b/python/ray/tests/test_stress_failure.py index d247edd50..871582f0b 100644 --- a/python/ray/tests/test_stress_failure.py +++ b/python/ray/tests/test_stress_failure.py @@ -29,11 +29,7 @@ def ray_start_reconstruction(request): }) for i in range(num_nodes - 1): cluster.add_node( - num_cpus=1, - object_store_memory=plasma_store_memory // num_nodes, - _internal_config=json.dumps({ - "initial_reconstruction_timeout_milliseconds": 200 - })) + num_cpus=1, object_store_memory=plasma_store_memory // num_nodes) ray.init(address=cluster.address) yield plasma_store_memory, num_nodes, cluster diff --git a/python/ray/worker.py b/python/ray/worker.py index 03db98fa5..88bdfecb2 100644 --- a/python/ray/worker.py +++ b/python/ray/worker.py @@ -770,6 +770,9 @@ def init(address=None, if _internal_config is not None and len(_internal_config) != 0: raise ValueError("When connecting to an existing cluster, " "_internal_config must not be provided.") + if lru_evict: + raise ValueError("When connecting to an existing cluster, " + "lru_evict must not be provided.") # In this case, we only need to connect the node. ray_params = ray.parameter.RayParams( diff --git a/src/ray/core_worker/lib/java/io_ray_runtime_gcs_GlobalStateAccessor.cc b/src/ray/core_worker/lib/java/io_ray_runtime_gcs_GlobalStateAccessor.cc index 596e320a7..90a8d21d6 100644 --- a/src/ray/core_worker/lib/java/io_ray_runtime_gcs_GlobalStateAccessor.cc +++ b/src/ray/core_worker/lib/java/io_ray_runtime_gcs_GlobalStateAccessor.cc @@ -13,7 +13,9 @@ // limitations under the License. #include "ray/core_worker/lib/java/io_ray_runtime_gcs_GlobalStateAccessor.h" + #include + #include "ray/core_worker/common.h" #include "ray/core_worker/lib/java/jni_utils.h" #include "ray/gcs/gcs_client/global_state_accessor.h" @@ -79,6 +81,16 @@ Java_io_ray_runtime_gcs_GlobalStateAccessor_nativeGetNodeResourceInfo( return static_cast(NativeStringToJavaByteArray(env, node_resource_info)); } +JNIEXPORT jbyteArray JNICALL +Java_io_ray_runtime_gcs_GlobalStateAccessor_nativeGetInternalConfig( + JNIEnv *env, jobject o, jlong gcs_accessor_ptr) { + auto *gcs_accessor = + reinterpret_cast(gcs_accessor_ptr); + auto internal_config_string = gcs_accessor->GetInternalConfig(); + return static_cast( + NativeStringToJavaByteArray(env, internal_config_string)); +} + JNIEXPORT jobject JNICALL Java_io_ray_runtime_gcs_GlobalStateAccessor_nativeGetAllActorInfo( JNIEnv *env, jobject o, jlong gcs_accessor_ptr) { diff --git a/src/ray/core_worker/lib/java/io_ray_runtime_gcs_GlobalStateAccessor.h b/src/ray/core_worker/lib/java/io_ray_runtime_gcs_GlobalStateAccessor.h index 1606ab397..51ce96b83 100644 --- a/src/ray/core_worker/lib/java/io_ray_runtime_gcs_GlobalStateAccessor.h +++ b/src/ray/core_worker/lib/java/io_ray_runtime_gcs_GlobalStateAccessor.h @@ -75,6 +75,16 @@ Java_io_ray_runtime_gcs_GlobalStateAccessor_nativeGetAllNodeInfo(JNIEnv *, jobje JNIEXPORT jbyteArray JNICALL Java_io_ray_runtime_gcs_GlobalStateAccessor_nativeGetNodeResourceInfo(JNIEnv *, jobject, jlong, jbyteArray); + +/* + * Class: io_ray_runtime_gcs_GlobalStateAccessor + * Method: nativeGetInternalConfig + * Signature: (J)[B + */ +JNIEXPORT jbyteArray JNICALL +Java_io_ray_runtime_gcs_GlobalStateAccessor_nativeGetInternalConfig(JNIEnv *, jobject, + jlong); + /* * Class: io_ray_runtime_gcs_GlobalStateAccessor * Method: nativeGetAllActorInfo diff --git a/src/ray/gcs/accessor.h b/src/ray/gcs/accessor.h index 9f30606bd..b0b690776 100644 --- a/src/ray/gcs/accessor.h +++ b/src/ray/gcs/accessor.h @@ -574,6 +574,22 @@ class NodeInfoAccessor { /// \param is_pubsub_server_restarted Whether pubsub server is restarted. virtual void AsyncResubscribe(bool is_pubsub_server_restarted) = 0; + /// Set the internal config string that will be used by all nodes started in the + /// cluster. + /// + /// \param config Map of config options + /// \return Status + virtual Status AsyncSetInternalConfig( + std::unordered_map &config) = 0; + + /// Get the internal config string from GCS. + /// + /// \param callback Processes a map of config options + /// \return Status + virtual Status AsyncGetInternalConfig( + const OptionalItemCallback> + &callback) = 0; + protected: NodeInfoAccessor() = default; }; diff --git a/src/ray/gcs/gcs_client/global_state_accessor.cc b/src/ray/gcs/gcs_client/global_state_accessor.cc index f95d6febb..bef6aa685 100644 --- a/src/ray/gcs/gcs_client/global_state_accessor.cc +++ b/src/ray/gcs/gcs_client/global_state_accessor.cc @@ -12,10 +12,10 @@ // See the License for the specific language governing permissions and // limitations under the License. -#include - #include "global_state_accessor.h" +#include + namespace ray { namespace gcs { @@ -148,6 +148,27 @@ std::string GlobalStateAccessor::GetNodeResourceInfo(const ClientID &node_id) { return node_resource_map.SerializeAsString(); } +std::string GlobalStateAccessor::GetInternalConfig() { + rpc::StoredConfig config_proto; + std::promise promise; + auto on_done = [&config_proto, &promise]( + Status status, + const boost::optional> + stored_raylet_config) { + RAY_CHECK_OK(status); + if (stored_raylet_config.has_value()) { + config_proto.mutable_config()->insert(stored_raylet_config->begin(), + stored_raylet_config->end()); + } + promise.set_value(); + }; + + RAY_CHECK_OK(gcs_client_->Nodes().AsyncGetInternalConfig(on_done)); + promise.get_future().get(); + + return config_proto.SerializeAsString(); +} + std::vector GlobalStateAccessor::GetAllActorInfo() { std::vector actor_table_data; std::promise promise; diff --git a/src/ray/gcs/gcs_client/global_state_accessor.h b/src/ray/gcs/gcs_client/global_state_accessor.h index 977a87537..d6fff6387 100644 --- a/src/ray/gcs/gcs_client/global_state_accessor.h +++ b/src/ray/gcs/gcs_client/global_state_accessor.h @@ -86,6 +86,12 @@ class GlobalStateAccessor { /// deserialized with protobuf function. std::string GetNodeResourceInfo(const ClientID &node_id); + /// Get internal config from GCS Service. + /// + /// \return map of internal config keys and values. It is stored as a StoredConfig proto + /// and serialized as a string to allow multi-language support. + std::string GetInternalConfig(); + /// Get information of all actors from GCS Service. /// /// \return All actor info. To support multi-language, we serialize each ActorTableData diff --git a/src/ray/gcs/gcs_client/service_based_accessor.cc b/src/ray/gcs/gcs_client/service_based_accessor.cc index d39963986..651b3cecd 100644 --- a/src/ray/gcs/gcs_client/service_based_accessor.cc +++ b/src/ray/gcs/gcs_client/service_based_accessor.cc @@ -767,6 +767,39 @@ void ServiceBasedNodeInfoAccessor::AsyncResubscribe(bool is_pubsub_server_restar } } +Status ServiceBasedNodeInfoAccessor::AsyncSetInternalConfig( + std::unordered_map &config) { + rpc::SetInternalConfigRequest request; + request.mutable_config()->mutable_config()->insert(config.begin(), config.end()); + + client_impl_->GetGcsRpcClient().SetInternalConfig( + request, [](const Status &status, const rpc::SetInternalConfigReply &reply) { + if (!status.ok()) { + RAY_LOG(ERROR) << "Failed to set internal config: " << status.message(); + } + }); + return Status::OK(); +} + +Status ServiceBasedNodeInfoAccessor::AsyncGetInternalConfig( + const OptionalItemCallback> &callback) { + rpc::GetInternalConfigRequest request; + client_impl_->GetGcsRpcClient().GetInternalConfig( + request, + [callback](const Status &status, const rpc::GetInternalConfigReply &reply) { + if (status.ok() && reply.has_config()) { + RAY_LOG(DEBUG) << "Fetched internal config: " << reply.config().DebugString(); + callback(status, + std::unordered_map( + reply.config().config().begin(), reply.config().config().end())); + } else { + RAY_LOG(ERROR) << "Failed to get internal config: " << status.message(); + callback(status, boost::none); + } + }); + return Status::OK(); +} + ServiceBasedTaskInfoAccessor::ServiceBasedTaskInfoAccessor( ServiceBasedGcsClient *client_impl) : client_impl_(client_impl) {} diff --git a/src/ray/gcs/gcs_client/service_based_accessor.h b/src/ray/gcs/gcs_client/service_based_accessor.h index 3d7eda914..08479f0ed 100644 --- a/src/ray/gcs/gcs_client/service_based_accessor.h +++ b/src/ray/gcs/gcs_client/service_based_accessor.h @@ -15,6 +15,7 @@ #pragma once #include + #include "ray/gcs/accessor.h" #include "ray/gcs/subscription_executor.h" #include "ray/util/sequencer.h" @@ -196,6 +197,13 @@ class ServiceBasedNodeInfoAccessor : public NodeInfoAccessor { void AsyncResubscribe(bool is_pubsub_server_restarted) override; + Status AsyncSetInternalConfig( + std::unordered_map &config) override; + + Status AsyncGetInternalConfig( + const OptionalItemCallback> &callback) + override; + private: /// Save the subscribe operation in this function, so we can call it again when PubSub /// server restarts from a failure. diff --git a/src/ray/gcs/gcs_client/test/global_state_accessor_test.cc b/src/ray/gcs/gcs_client/test/global_state_accessor_test.cc index 38e6e894f..8dc83e4f5 100644 --- a/src/ray/gcs/gcs_client/test/global_state_accessor_test.cc +++ b/src/ray/gcs/gcs_client/test/global_state_accessor_test.cc @@ -13,6 +13,7 @@ // limitations under the License. #include "ray/gcs/gcs_client/global_state_accessor.h" + #include "gtest/gtest.h" #include "ray/common/test_util.h" #include "ray/gcs/gcs_server/gcs_server.h" @@ -168,6 +169,28 @@ TEST_F(GlobalStateAccessorTest, TestNodeResourceTable) { } } +TEST_F(GlobalStateAccessorTest, TestInternalConfig) { + rpc::StoredConfig initial_proto; + initial_proto.ParseFromString(global_state_->GetInternalConfig()); + ASSERT_EQ(initial_proto.config().size(), 0); + std::promise promise; + std::unordered_map begin_config; + begin_config["key1"] = "value1"; + begin_config["key2"] = "value2"; + RAY_CHECK_OK(gcs_client_->Nodes().AsyncSetInternalConfig(begin_config)); + std::string returned; + rpc::StoredConfig new_proto; + auto end = std::chrono::system_clock::now() + timeout_ms_; + while (std::chrono::system_clock::now() < end && new_proto.config().size() == 0) { + returned = global_state_->GetInternalConfig(); + new_proto.ParseFromString(returned); + } + ASSERT_EQ(new_proto.config().size(), begin_config.size()); + for (auto pair : new_proto.config()) { + ASSERT_EQ(pair.second, begin_config[pair.first]); + } +} + TEST_F(GlobalStateAccessorTest, TestProfileTable) { int profile_count = 100; ASSERT_EQ(global_state_->GetAllProfileInfo().size(), 0); diff --git a/src/ray/gcs/gcs_server/gcs_node_manager.cc b/src/ray/gcs/gcs_server/gcs_node_manager.cc index a3299e7bd..3e0efdd28 100644 --- a/src/ray/gcs/gcs_server/gcs_node_manager.cc +++ b/src/ray/gcs/gcs_server/gcs_node_manager.cc @@ -13,6 +13,7 @@ // limitations under the License. #include "gcs_node_manager.h" + #include #include #include @@ -288,6 +289,32 @@ void GcsNodeManager::HandleDeleteResources(const rpc::DeleteResourcesRequest &re } } +void GcsNodeManager::HandleSetInternalConfig(const rpc::SetInternalConfigRequest &request, + rpc::SetInternalConfigReply *reply, + rpc::SendReplyCallback send_reply_callback) { + auto on_done = [reply, send_reply_callback, request](const Status status) { + RAY_LOG(DEBUG) << "Set internal config: " << request.config().DebugString(); + GCS_RPC_SEND_REPLY(send_reply_callback, reply, status); + }; + RAY_CHECK_OK(gcs_table_storage_->InternalConfigTable().Put(UniqueID::Nil(), + request.config(), on_done)); +} + +void GcsNodeManager::HandleGetInternalConfig(const rpc::GetInternalConfigRequest &request, + rpc::GetInternalConfigReply *reply, + rpc::SendReplyCallback send_reply_callback) { + auto get_internal_config = [reply, send_reply_callback]( + ray::Status status, + const boost::optional &config) { + if (config.has_value()) { + reply->mutable_config()->CopyFrom(config.get()); + } + GCS_RPC_SEND_REPLY(send_reply_callback, reply, status); + }; + RAY_CHECK_OK(gcs_table_storage_->InternalConfigTable().Get(UniqueID::Nil(), + get_internal_config)); +} + std::shared_ptr GcsNodeManager::GetNode( const ray::ClientID &node_id) const { auto iter = alive_nodes_.find(node_id); diff --git a/src/ray/gcs/gcs_server/gcs_node_manager.h b/src/ray/gcs/gcs_server/gcs_node_manager.h index 5c1ee55ea..27ff5778f 100644 --- a/src/ray/gcs/gcs_server/gcs_node_manager.h +++ b/src/ray/gcs/gcs_server/gcs_node_manager.h @@ -19,6 +19,7 @@ #include #include #include + #include "absl/container/flat_hash_map.h" #include "absl/container/flat_hash_set.h" #include "gcs_table_storage.h" @@ -79,6 +80,16 @@ class GcsNodeManager : public rpc::NodeInfoHandler { rpc::DeleteResourcesReply *reply, rpc::SendReplyCallback send_reply_callback) override; + /// Handle setting internal config. + void HandleSetInternalConfig(const rpc::SetInternalConfigRequest &request, + rpc::SetInternalConfigReply *reply, + rpc::SendReplyCallback send_reply_callback) override; + + /// Handle getting internal config. + void HandleGetInternalConfig(const rpc::GetInternalConfigRequest &request, + rpc::GetInternalConfigReply *reply, + rpc::SendReplyCallback send_reply_callback) override; + /// Add an alive node. /// /// \param node The info of the node to be added. diff --git a/src/ray/gcs/gcs_server/gcs_table_storage.cc b/src/ray/gcs/gcs_server/gcs_table_storage.cc index fd4d4e780..a91fd8c48 100644 --- a/src/ray/gcs/gcs_server/gcs_table_storage.cc +++ b/src/ray/gcs/gcs_server/gcs_table_storage.cc @@ -13,6 +13,7 @@ // limitations under the License. #include "gcs_table_storage.h" + #include "ray/common/id.h" #include "ray/common/status.h" #include "ray/gcs/callback.h" @@ -120,6 +121,7 @@ template class GcsTable; template class GcsTable; template class GcsTable; template class GcsTable; +template class GcsTable; template class GcsTableWithJobId; template class GcsTableWithJobId; template class GcsTableWithJobId; diff --git a/src/ray/gcs/gcs_server/gcs_table_storage.h b/src/ray/gcs/gcs_server/gcs_table_storage.h index 494b09727..ddc9a3b81 100644 --- a/src/ray/gcs/gcs_server/gcs_table_storage.h +++ b/src/ray/gcs/gcs_server/gcs_table_storage.h @@ -36,6 +36,7 @@ using rpc::ObjectTableDataList; using rpc::ProfileTableData; using rpc::ResourceMap; using rpc::ResourceTableData; +using rpc::StoredConfig; using rpc::TaskLeaseData; using rpc::TaskReconstructionData; using rpc::TaskTableData; @@ -273,6 +274,14 @@ class GcsWorkerTable : public GcsTable { } }; +class GcsInternalConfigTable : public GcsTable { + public: + explicit GcsInternalConfigTable(std::shared_ptr &store_client) + : GcsTable(store_client) { + table_name_ = TablePrefix_Name(TablePrefix::INTERNAL_CONFIG); + } +}; + /// \class GcsTableStorage /// /// This class is not meant to be used directly. All gcs table storage classes should @@ -354,6 +363,11 @@ class GcsTableStorage { return *worker_table_; } + GcsInternalConfigTable &InternalConfigTable() { + RAY_CHECK(internal_config_table_ != nullptr); + return *internal_config_table_; + } + protected: std::shared_ptr store_client_; std::unique_ptr job_table_; @@ -371,6 +385,7 @@ class GcsTableStorage { std::unique_ptr error_info_table_; std::unique_ptr profile_table_; std::unique_ptr worker_table_; + std::unique_ptr internal_config_table_; }; /// \class RedisGcsTableStorage @@ -395,6 +410,7 @@ class RedisGcsTableStorage : public GcsTableStorage { error_info_table_.reset(new GcsErrorInfoTable(store_client_)); profile_table_.reset(new GcsProfileTable(store_client_)); worker_table_.reset(new GcsWorkerTable(store_client_)); + internal_config_table_.reset(new GcsInternalConfigTable(store_client_)); } }; @@ -420,6 +436,7 @@ class InMemoryGcsTableStorage : public GcsTableStorage { error_info_table_.reset(new GcsErrorInfoTable(store_client_)); profile_table_.reset(new GcsProfileTable(store_client_)); worker_table_.reset(new GcsWorkerTable(store_client_)); + internal_config_table_.reset(new GcsInternalConfigTable(store_client_)); } }; diff --git a/src/ray/gcs/redis_accessor.h b/src/ray/gcs/redis_accessor.h index c5c220fcf..8d1878d23 100644 --- a/src/ray/gcs/redis_accessor.h +++ b/src/ray/gcs/redis_accessor.h @@ -15,6 +15,7 @@ #pragma once #include + #include "ray/common/id.h" #include "ray/gcs/accessor.h" #include "ray/gcs/callback.h" @@ -374,6 +375,17 @@ class RedisNodeInfoAccessor : public NodeInfoAccessor { void AsyncResubscribe(bool is_pubsub_server_restarted) override {} + Status AsyncSetInternalConfig( + std::unordered_map &config) override { + return Status::NotImplemented("SetInternaConfig not implemented."); + } + + Status AsyncGetInternalConfig( + const OptionalItemCallback> &callback) + override { + return Status::NotImplemented("GetInternalConfig not implemented."); + } + private: RedisGcsClient *client_impl_{nullptr}; diff --git a/src/ray/gcs/redis_gcs_client.cc b/src/ray/gcs/redis_gcs_client.cc index 07ca06745..71a061323 100644 --- a/src/ray/gcs/redis_gcs_client.cc +++ b/src/ray/gcs/redis_gcs_client.cc @@ -15,6 +15,7 @@ #include "ray/gcs/redis_gcs_client.h" #include + #include "ray/common/ray_config.h" #include "ray/gcs/redis_accessor.h" #include "ray/gcs/redis_context.h" diff --git a/src/ray/gcs/tables.cc b/src/ray/gcs/tables.cc index 8d35b1911..11e80f27e 100644 --- a/src/ray/gcs/tables.cc +++ b/src/ray/gcs/tables.cc @@ -15,7 +15,6 @@ #include "ray/gcs/tables.h" #include "absl/time/clock.h" - #include "ray/common/common_protocol.h" #include "ray/common/grpc_util.h" #include "ray/common/ray_config.h" diff --git a/src/ray/protobuf/gcs.proto b/src/ray/protobuf/gcs.proto index bb092e980..91940a780 100644 --- a/src/ray/protobuf/gcs.proto +++ b/src/ray/protobuf/gcs.proto @@ -43,7 +43,8 @@ enum TablePrefix { DIRECT_ACTOR = 18; // WORKER is already used in WorkerType, so use WORKERS here. WORKERS = 19; - TABLE_PREFIX_MAX = 20; + INTERNAL_CONFIG = 20; + TABLE_PREFIX_MAX = 21; } // The channel that Add operations to the Table should be published on, if any. @@ -162,8 +163,8 @@ message ProfileTableData { string event_type = 1; // The start time of the event. double start_time = 2; - // The end time of the event. If the event is a point event, then this should - // be the same as the start time. + // The end time of the event. If the event is a point event, then this + // should be the same as the start time. double end_time = 3; // Additional data associated with the event. This data must be serialized // using JSON. @@ -316,6 +317,10 @@ message ResourceMap { map items = 1; } +message StoredConfig { + map config = 1; +} + message ObjectTableDataList { repeated ObjectTableData items = 1; } @@ -346,29 +351,33 @@ message PubSubMessage { bytes data = 2; } -// This enum type is used as object's metadata to indicate the object's creating -// task has failed because of a certain error. -// TODO(hchen): We may want to make these errors more specific. E.g., we may want -// to distinguish between intentional and expected actor failures, and between -// worker process failure and node failure. +// This enum type is used as object's metadata to indicate the object's +// creating task has failed because of a certain error. +// TODO(hchen): We may want to make these errors more specific. E.g., we may +// want to distinguish between intentional and expected actor failures, and +// between worker process failure and node failure. enum ErrorType { - // Indicates that a task failed because the worker died unexpectedly while executing it. + // Indicates that a task failed because the worker died unexpectedly while + // executing it. WORKER_DIED = 0; - // Indicates that a task failed because the actor died unexpectedly before finishing it. + // Indicates that a task failed because the actor died unexpectedly before + // finishing it. ACTOR_DIED = 1; // Indicates that an object is lost and cannot be restarted. - // Note, this currently only happens to actor objects. When the actor's state is already - // after the object's creating task, the actor cannot re-run the task. + // Note, this currently only happens to actor objects. When the actor's + // state is already after the object's creating task, the actor cannot + // re-run the task. // TODO(hchen): we may want to reuse this error type for more cases. E.g., // 1) A object that was put by the driver. - // 2) The object's creating task is already cleaned up from GCS (this currently + // 2) The object's creating task is already cleaned up from GCS (this + // currently // crashes raylet). OBJECT_UNRECONSTRUCTABLE = 2; // Indicates that a task failed due to user code failure. TASK_EXECUTION_EXCEPTION = 3; - // Indicates that the object has been placed in plasma. This error shouldn't ever be - // exposed to user code; it is only used internally to indicate the result of a direct - // call has been placed in plasma. + // Indicates that the object has been placed in plasma. This error shouldn't + // ever be exposed to user code; it is only used internally to indicate the + // result of a direct call has been placed in plasma. OBJECT_IN_PLASMA = 4; // Indicates that an object has been cancelled. TASK_CANCELLED = 5; diff --git a/src/ray/protobuf/gcs_service.proto b/src/ray/protobuf/gcs_service.proto index ecbd3c661..829fb049a 100644 --- a/src/ray/protobuf/gcs_service.proto +++ b/src/ray/protobuf/gcs_service.proto @@ -231,6 +231,22 @@ message DeleteResourcesReply { GcsStatus status = 1; } +message SetInternalConfigRequest { + StoredConfig config = 1; +} + +message SetInternalConfigReply { + GcsStatus status = 1; +} + +message GetInternalConfigRequest { +} + +message GetInternalConfigReply { + GcsStatus status = 1; + StoredConfig config = 2; +} + // Service for node info access. service NodeInfoGcsService { // Register a node to GCS Service. @@ -247,6 +263,10 @@ service NodeInfoGcsService { rpc UpdateResources(UpdateResourcesRequest) returns (UpdateResourcesReply); // Delete resources of a node in GCS Service. rpc DeleteResources(DeleteResourcesRequest) returns (DeleteResourcesReply); + // Set cluster internal config. + rpc SetInternalConfig(SetInternalConfigRequest) returns (SetInternalConfigReply); + // Get cluster internal config. + rpc GetInternalConfig(GetInternalConfigRequest) returns (GetInternalConfigReply); } message GetObjectLocationsRequest { diff --git a/src/ray/raylet/main.cc b/src/ray/raylet/main.cc index 91a0592dd..ab851224c 100644 --- a/src/ray/raylet/main.cc +++ b/src/ray/raylet/main.cc @@ -19,11 +19,10 @@ #include "ray/common/ray_config.h" #include "ray/common/status.h" #include "ray/common/task/task_common.h" +#include "ray/gcs/gcs_client/service_based_gcs_client.h" #include "ray/raylet/raylet.h" #include "ray/stats/stats.h" -#include "ray/gcs/gcs_client/service_based_gcs_client.h" - DEFINE_string(raylet_socket_name, "", "The socket name of raylet."); DEFINE_string(store_socket_name, "", "The socket name of object store."); DEFINE_int32(object_manager_port, -1, "The port of object manager."); @@ -48,6 +47,7 @@ DEFINE_bool(disable_stats, false, "Whether disable the stats."); DEFINE_string(stat_address, "127.0.0.1:8888", "The address that we report metrics to."); DEFINE_bool(enable_stdout_exporter, false, "Whether enable the stdout exporter for stats."); +DEFINE_bool(head_node, false, "Whether this is the head node of the cluster."); // store options DEFINE_int64(object_store_memory, -1, "The initial memory of the object store."); DEFINE_string(plasma_directory, "", "The shared memory directory of the object store."); @@ -85,6 +85,7 @@ int main(int argc, char *argv[]) { const bool disable_stats = FLAGS_disable_stats; const std::string stat_address = FLAGS_stat_address; const bool enable_stdout_exporter = FLAGS_enable_stdout_exporter; + const bool head_node = FLAGS_head_node; const int64_t object_store_memory = FLAGS_object_store_memory; const std::string plasma_directory = FLAGS_plasma_directory; const bool huge_pages = FLAGS_huge_pages; @@ -102,95 +103,13 @@ int main(int argc, char *argv[]) { std::unordered_map static_resource_conf; std::unordered_map raylet_config; - // Parse the configuration list. - std::istringstream config_string(config_list); - std::string config_name; - std::string config_value; - - while (std::getline(config_string, config_name, ',')) { - RAY_CHECK(std::getline(config_string, config_value, ',')); - // TODO(rkn): The line below could throw an exception. What should we do about this? - raylet_config[config_name] = config_value; - } - - RayConfig::instance().initialize(raylet_config); - - // Parse the resource list. - std::istringstream resource_string(static_resource_list); - std::string resource_name; - std::string resource_quantity; - - while (std::getline(resource_string, resource_name, ',')) { - RAY_CHECK(std::getline(resource_string, resource_quantity, ',')); - // TODO(rkn): The line below could throw an exception. What should we do about this? - static_resource_conf[resource_name] = std::stod(resource_quantity); - } - - node_manager_config.raylet_config = raylet_config; - node_manager_config.resource_config = ray::ResourceSet(std::move(static_resource_conf)); - RAY_LOG(DEBUG) << "Starting raylet with static resource configuration: " - << node_manager_config.resource_config.ToString(); - node_manager_config.node_manager_address = node_ip_address; - node_manager_config.node_manager_port = node_manager_port; - node_manager_config.num_initial_workers = num_initial_workers; - node_manager_config.maximum_startup_concurrency = maximum_startup_concurrency; - node_manager_config.min_worker_port = min_worker_port; - node_manager_config.max_worker_port = max_worker_port; - - if (!python_worker_command.empty()) { - node_manager_config.worker_commands.emplace( - make_pair(ray::Language::PYTHON, ParseCommandLine(python_worker_command))); - } - if (!java_worker_command.empty()) { - node_manager_config.worker_commands.emplace( - make_pair(ray::Language::JAVA, ParseCommandLine(java_worker_command))); - } - if (python_worker_command.empty() && java_worker_command.empty()) { - RAY_CHECK(0) - << "Either Python worker command or Java worker command should be provided."; - } - - node_manager_config.heartbeat_period_ms = - RayConfig::instance().raylet_heartbeat_timeout_milliseconds(); - node_manager_config.debug_dump_period_ms = - RayConfig::instance().debug_dump_period_milliseconds(); - node_manager_config.free_objects_period_ms = - RayConfig::instance().free_objects_period_milliseconds(); - node_manager_config.fair_queueing_enabled = - RayConfig::instance().fair_queueing_enabled(); - node_manager_config.object_pinning_enabled = - RayConfig::instance().object_pinning_enabled(); - node_manager_config.max_lineage_size = RayConfig::instance().max_lineage_size(); - node_manager_config.store_socket_name = store_socket_name; - node_manager_config.temp_dir = temp_dir; - node_manager_config.session_dir = session_dir; - - // Configuration for the object manager. - ray::ObjectManagerConfig object_manager_config; - object_manager_config.object_manager_port = object_manager_port; - object_manager_config.store_socket_name = store_socket_name; - object_manager_config.pull_timeout_ms = - RayConfig::instance().object_manager_pull_timeout_ms(); - object_manager_config.push_timeout_ms = - RayConfig::instance().object_manager_push_timeout_ms(); - object_manager_config.object_store_memory = object_store_memory; - object_manager_config.plasma_directory = plasma_directory; - object_manager_config.huge_pages = huge_pages; - - int num_cpus = static_cast(static_resource_conf["CPU"]); - object_manager_config.rpc_service_threads_number = - std::min(std::max(2, num_cpus / 4), 8); - object_manager_config.object_chunk_size = - RayConfig::instance().object_manager_default_chunk_size(); - - RAY_LOG(DEBUG) << "Starting object manager with configuration: \n" - << "rpc_service_threads_number = " - << object_manager_config.rpc_service_threads_number - << ", object_chunk_size = " << object_manager_config.object_chunk_size; - - // Initialize the node manager. + // IO Service for node manager. boost::asio::io_service main_service; + // Ensure that the IO service keeps running. Without this, the service will exit as soon + // as there is no more work to be processed. + boost::asio::io_service::work main_work(main_service); + // Initialize gcs client ray::gcs::GcsClientOptions client_options(redis_address, redis_port, redis_password); std::shared_ptr gcs_client; @@ -199,10 +118,123 @@ int main(int argc, char *argv[]) { RAY_CHECK_OK(gcs_client->Connect(main_service)); - std::unique_ptr server(new ray::raylet::Raylet( - main_service, raylet_socket_name, node_ip_address, redis_address, redis_port, - redis_password, node_manager_config, object_manager_config, gcs_client)); - server->Start(); + // The internal_config is only set on the head node--other nodes get it from GCS. + if (head_node) { + // Parse the configuration list. + std::istringstream config_string(config_list); + std::string config_name; + std::string config_value; + + while (std::getline(config_string, config_name, ',')) { + RAY_CHECK(std::getline(config_string, config_value, ',')); + // TODO(rkn): The line below could throw an exception. What should we do about this? + raylet_config[config_name] = config_value; + } + RAY_CHECK_OK(gcs_client->Nodes().AsyncSetInternalConfig(raylet_config)); + } + + std::unique_ptr server(nullptr); + + RAY_CHECK_OK(gcs_client->Nodes().AsyncGetInternalConfig( + [&](::ray::Status status, + const boost::optional> + stored_raylet_config) { + // NOTE: We update the raylet_config map from above. This avoids a race + // condition between AsyncSetInternalConfig and AsyncGetInternalConfig on the + // head node. There is an unlikely race condition where a second node calls + // AsyncGetInternalConfig before the head finishes AsyncSetInternalConfig. + RAY_CHECK_OK(status); + if (stored_raylet_config.has_value()) { + for (auto pair : stored_raylet_config.get()) { + raylet_config[pair.first] = pair.second; + } + } + + RayConfig::instance().initialize(raylet_config); + + // Parse the resource list. + std::istringstream resource_string(static_resource_list); + std::string resource_name; + std::string resource_quantity; + + while (std::getline(resource_string, resource_name, ',')) { + RAY_CHECK(std::getline(resource_string, resource_quantity, ',')); + // TODO(rkn): The line below could throw an exception. What should we do + // about this? + static_resource_conf[resource_name] = std::stod(resource_quantity); + } + + node_manager_config.raylet_config = raylet_config; + node_manager_config.resource_config = + ray::ResourceSet(std::move(static_resource_conf)); + RAY_LOG(DEBUG) << "Starting raylet with static resource configuration: " + << node_manager_config.resource_config.ToString(); + node_manager_config.node_manager_address = node_ip_address; + node_manager_config.node_manager_port = node_manager_port; + node_manager_config.num_initial_workers = num_initial_workers; + node_manager_config.maximum_startup_concurrency = maximum_startup_concurrency; + node_manager_config.min_worker_port = min_worker_port; + node_manager_config.max_worker_port = max_worker_port; + + if (!python_worker_command.empty()) { + node_manager_config.worker_commands.emplace( + make_pair(ray::Language::PYTHON, ParseCommandLine(python_worker_command))); + } + if (!java_worker_command.empty()) { + node_manager_config.worker_commands.emplace( + make_pair(ray::Language::JAVA, ParseCommandLine(java_worker_command))); + } + if (python_worker_command.empty() && java_worker_command.empty()) { + RAY_CHECK(0) << "Either Python worker command or Java worker command should be " + "provided."; + } + + node_manager_config.heartbeat_period_ms = + RayConfig::instance().raylet_heartbeat_timeout_milliseconds(); + node_manager_config.debug_dump_period_ms = + RayConfig::instance().debug_dump_period_milliseconds(); + node_manager_config.free_objects_period_ms = + RayConfig::instance().free_objects_period_milliseconds(); + node_manager_config.fair_queueing_enabled = + RayConfig::instance().fair_queueing_enabled(); + node_manager_config.object_pinning_enabled = + RayConfig::instance().object_pinning_enabled(); + node_manager_config.max_lineage_size = RayConfig::instance().max_lineage_size(); + node_manager_config.store_socket_name = store_socket_name; + node_manager_config.temp_dir = temp_dir; + node_manager_config.session_dir = session_dir; + + // Configuration for the object manager. + ray::ObjectManagerConfig object_manager_config; + object_manager_config.object_manager_port = object_manager_port; + object_manager_config.store_socket_name = store_socket_name; + object_manager_config.pull_timeout_ms = + RayConfig::instance().object_manager_pull_timeout_ms(); + object_manager_config.push_timeout_ms = + RayConfig::instance().object_manager_push_timeout_ms(); + object_manager_config.object_store_memory = object_store_memory; + object_manager_config.plasma_directory = plasma_directory; + object_manager_config.huge_pages = huge_pages; + + int num_cpus = static_cast(static_resource_conf["CPU"]); + object_manager_config.rpc_service_threads_number = + std::min(std::max(2, num_cpus / 4), 8); + object_manager_config.object_chunk_size = + RayConfig::instance().object_manager_default_chunk_size(); + + RAY_LOG(DEBUG) << "Starting object manager with configuration: \n" + << "rpc_service_threads_number = " + << object_manager_config.rpc_service_threads_number + << ", object_chunk_size = " + << object_manager_config.object_chunk_size; + + // Initialize the node manager. + server.reset(new ray::raylet::Raylet( + main_service, raylet_socket_name, node_ip_address, redis_address, redis_port, + redis_password, node_manager_config, object_manager_config, gcs_client)); + + server->Start(); + })); // Destroy the Raylet on a SIGTERM. The pointer to main_service is // guaranteed to be valid since this function will run the event loop diff --git a/src/ray/rpc/gcs_server/gcs_rpc_client.h b/src/ray/rpc/gcs_server/gcs_rpc_client.h index c5f3798b8..95e54b18c 100644 --- a/src/ray/rpc/gcs_server/gcs_rpc_client.h +++ b/src/ray/rpc/gcs_server/gcs_rpc_client.h @@ -178,6 +178,14 @@ class GcsRpcClient { VOID_GCS_RPC_CLIENT_METHOD(NodeInfoGcsService, DeleteResources, node_info_grpc_client_, ) + /// Set internal config of the cluster in the GCS Service. + VOID_GCS_RPC_CLIENT_METHOD(NodeInfoGcsService, SetInternalConfig, + node_info_grpc_client_, ) + + /// Get internal config of the node from the GCS Service. + VOID_GCS_RPC_CLIENT_METHOD(NodeInfoGcsService, GetInternalConfig, + node_info_grpc_client_, ) + /// Get object's locations from GCS Service. VOID_GCS_RPC_CLIENT_METHOD(ObjectInfoGcsService, GetObjectLocations, object_info_grpc_client_, ) diff --git a/src/ray/rpc/gcs_server/gcs_rpc_server.h b/src/ray/rpc/gcs_server/gcs_rpc_server.h index bb0a3eaa6..a8551f5d6 100644 --- a/src/ray/rpc/gcs_server/gcs_rpc_server.h +++ b/src/ray/rpc/gcs_server/gcs_rpc_server.h @@ -202,6 +202,14 @@ class NodeInfoGcsServiceHandler { virtual void HandleDeleteResources(const DeleteResourcesRequest &request, DeleteResourcesReply *reply, SendReplyCallback send_reply_callback) = 0; + + virtual void HandleSetInternalConfig(const SetInternalConfigRequest &request, + SetInternalConfigReply *reply, + SendReplyCallback send_reply_callback) = 0; + + virtual void HandleGetInternalConfig(const GetInternalConfigRequest &request, + GetInternalConfigReply *reply, + SendReplyCallback send_reply_callback) = 0; }; /// The `GrpcService` for `NodeInfoGcsService`. @@ -227,6 +235,8 @@ class NodeInfoGrpcService : public GrpcService { NODE_INFO_SERVICE_RPC_HANDLER(GetResources); NODE_INFO_SERVICE_RPC_HANDLER(UpdateResources); NODE_INFO_SERVICE_RPC_HANDLER(DeleteResources); + NODE_INFO_SERVICE_RPC_HANDLER(SetInternalConfig); + NODE_INFO_SERVICE_RPC_HANDLER(GetInternalConfig); } private: