diff --git a/src/ray/common/client_connection.cc b/src/ray/common/client_connection.cc index 0018b4afa..1dc7c88d4 100644 --- a/src/ray/common/client_connection.cc +++ b/src/ray/common/client_connection.cc @@ -189,7 +189,7 @@ ClientConnection::ClientConnection(MessageHandler &message_handler, debug_label_(debug_label) {} template -const ClientID &ClientConnection::GetClientID() { +const ClientID &ClientConnection::GetClientId() { return client_id_; } diff --git a/src/ray/common/client_connection.h b/src/ray/common/client_connection.h index ae33d3aa5..86f0f20c1 100644 --- a/src/ray/common/client_connection.h +++ b/src/ray/common/client_connection.h @@ -152,7 +152,7 @@ class ClientConnection : public ServerConnection { } /// \return The ClientID of the remote client. - const ClientID &GetClientID(); + const ClientID &GetClientId(); /// \param client_id The ClientID of the remote client. void SetClientID(const ClientID &client_id); diff --git a/src/ray/object_manager/connection_pool.cc b/src/ray/object_manager/connection_pool.cc index c4c7c8df8..dcfb2a77b 100644 --- a/src/ray/object_manager/connection_pool.cc +++ b/src/ray/object_manager/connection_pool.cc @@ -19,7 +19,7 @@ void ConnectionPool::RegisterReceiver(ConnectionType type, const ClientID &clien void ConnectionPool::RemoveReceiver(std::shared_ptr conn) { std::unique_lock guard(connection_mutex); - ClientID client_id = conn->GetClientID(); + const ClientID client_id = conn->GetClientId(); if (message_receive_connections_.count(client_id) != 0) { Remove(message_receive_connections_, client_id, conn); } @@ -40,7 +40,7 @@ void ConnectionPool::RegisterSender(ConnectionType type, const ClientID &client_ void ConnectionPool::RemoveSender(const std::shared_ptr &conn) { std::unique_lock guard(connection_mutex); - ClientID client_id = conn->GetClientID(); + const ClientID client_id = conn->GetClientId(); if (message_send_connections_.count(client_id) != 0) { Remove(message_send_connections_, client_id, conn); } @@ -68,7 +68,7 @@ void ConnectionPool::ReleaseSender(ConnectionType type, SenderMapType &conn_map = (type == ConnectionType::MESSAGE) ? available_message_send_connections_ : available_transfer_send_connections_; - Return(conn_map, conn->GetClientID(), conn); + Return(conn_map, conn->GetClientId(), conn); } void ConnectionPool::Add(ReceiverMapType &conn_map, const ClientID &client_id, diff --git a/src/ray/object_manager/object_manager.cc b/src/ray/object_manager/object_manager.cc index cf68dd114..e9edf5f84 100644 --- a/src/ray/object_manager/object_manager.cc +++ b/src/ray/object_manager/object_manager.cc @@ -22,7 +22,8 @@ ObjectManager::ObjectManager(asio::io_service &main_service, /*release_delay=*/2 * config_.max_sends), send_work_(send_service_), receive_work_(receive_service_), - connection_pool_() { + connection_pool_(), + gen_(std::chrono::high_resolution_clock::now().time_since_epoch().count()) { RAY_CHECK(config_.max_sends > 0); RAY_CHECK(config_.max_receives > 0); main_service_ = &main_service; @@ -47,7 +48,8 @@ ObjectManager::ObjectManager(asio::io_service &main_service, /*release_delay=*/2 * config_.max_sends), send_work_(send_service_), receive_work_(receive_service_), - connection_pool_() { + connection_pool_(), + gen_(std::chrono::high_resolution_clock::now().time_since_epoch().count()) { RAY_CHECK(config_.max_sends > 0); RAY_CHECK(config_.max_receives > 0); // TODO(hme) Client ID is never set with this constructor. @@ -94,7 +96,7 @@ void ObjectManager::HandleObjectAdded( // Notify the object directory that the object has been added to this node. ObjectID object_id = ObjectID::from_binary(object_info.object_id); RAY_CHECK(local_objects_.count(object_id) == 0); - local_objects_[object_id] = object_info; + local_objects_[object_id].object_info = object_info; ray::Status status = object_directory_->ReportObjectAdded(object_id, client_id_, object_info); @@ -174,15 +176,10 @@ ray::Status ObjectManager::Pull(const ObjectID &object_id) { it->second.timer_set = false; } } else { - // New object locations were found. - if (!it->second.timer_set) { - // The timer was not set, which means that we weren't trying any - // clients. We now have some clients to try, so begin trying to - // Pull from one. If we fail to receive an object within the pull - // timeout, then this will try the rest of the clients in the list - // in succession. - TryPull(object_id); - } + // New object locations were found, so begin trying to pull from a + // client. This will be called every time a new client location + // appears. + TryPull(object_id); } }); } @@ -193,19 +190,30 @@ void ObjectManager::TryPull(const ObjectID &object_id) { return; } - // The timer should never fire if there are no expected client locations. - RAY_CHECK(!it->second.client_locations.empty()); - RAY_CHECK(local_objects_.count(object_id) == 0); + auto &client_vector = it->second.client_locations; - // Get the next client to try. - const ClientID client_id = std::move(it->second.client_locations.back()); - it->second.client_locations.pop_back(); + // The timer should never fire if there are no expected client locations. + RAY_CHECK(!client_vector.empty()); + RAY_CHECK(local_objects_.count(object_id) == 0); + // Make sure that there is at least one client which is not the local client. + // TODO(rkn): It may actually be possible for this check to fail. + RAY_CHECK(client_vector.size() != 1 || client_vector[0] != client_id_); + + // Choose a random client to pull the object from. + // Generate a random index. + std::uniform_int_distribution distribution(0, client_vector.size() - 1); + int client_index = distribution(gen_); + ClientID client_id = client_vector[client_index]; + // If the object manager somehow ended up choosing itself, choose a different + // object manager. if (client_id == client_id_) { - // If we're trying to pull from ourselves, skip this client and try the - // next one. - RAY_LOG(ERROR) << client_id_ << " attempted to pull an object from itself."; - const ClientID client_id = std::move(it->second.client_locations.back()); - it->second.client_locations.pop_back(); + std::swap(client_vector[client_index], client_vector[client_vector.size() - 1]); + client_vector.pop_back(); + RAY_LOG(ERROR) << "The object manager with client ID " << client_id_ + << " is trying to pull object " << object_id + << " but the object table suggests that this object manager " + << "already has the object."; + client_id = client_vector[client_index % client_vector.size()]; RAY_CHECK(client_id != client_id_); } @@ -379,10 +387,33 @@ void ObjectManager::Push(const ObjectID &object_id, const ClientID &client_id) { return; } + // If we haven't pushed this object to this same object manager yet, then push + // it. If we have, but it was a long time ago, then push it. If we have and it + // was recent, then don't do it again. + auto &recent_pushes = local_objects_[object_id].recent_pushes; + auto it = recent_pushes.find(client_id); + if (it == recent_pushes.end()) { + // We haven't pushed this specific object to this specific object manager + // yet (or if we have then the object must have been evicted and recreated + // locally). + recent_pushes[client_id] = current_sys_time_ms(); + } else { + int64_t current_time = current_sys_time_ms(); + if (current_time - it->second <= + RayConfig::instance().object_manager_repeated_push_delay_ms()) { + // We pushed this object to the object manager recently, so don't do it + // again. + return; + } else { + it->second = current_time; + } + } + RemoteConnectionInfo connection_info(client_id); object_directory_->LookupRemoteConnectionInfo(connection_info); if (connection_info.Connected()) { - const object_manager::protocol::ObjectInfoT &object_info = local_objects_[object_id]; + const object_manager::protocol::ObjectInfoT &object_info = + local_objects_[object_id].object_info; uint64_t data_size = static_cast(object_info.data_size + object_info.metadata_size); uint64_t metadata_size = static_cast(object_info.metadata_size); @@ -397,11 +428,11 @@ void ObjectManager::Push(const ObjectID &object_id, const ClientID &client_id) { // object manager from another object manager. ray::Status status = ExecuteSendObject( client_id, object_id, data_size, metadata_size, chunk_index, connection_info); + double end_time = current_sys_time_seconds(); // Notify the main thread that we have finished sending the chunk. main_service_->post( - [this, object_id, client_id, chunk_index, start_time, status]() { - double end_time = current_sys_time_seconds(); + [this, object_id, client_id, chunk_index, start_time, end_time, status]() { HandleSendFinished(object_id, client_id, chunk_index, start_time, end_time, status); }); @@ -746,6 +777,9 @@ void ObjectManager::ConnectClient(std::shared_ptr &conn, void ObjectManager::DisconnectClient(std::shared_ptr &conn, const uint8_t *message) { connection_pool_.RemoveReceiver(conn); + + // We don't need to clean up unfulfilled_push_requests_ because the + // unfulfilled push timers will fire and clean it up. } void ObjectManager::ReceivePullRequest(std::shared_ptr &conn, @@ -777,15 +811,16 @@ void ObjectManager::ReceivePushRequest(std::shared_ptr &con uint64_t metadata_size = object_header->metadata_size(); receive_service_.post([this, object_id, data_size, metadata_size, chunk_index, conn]() { double start_time = current_sys_time_seconds(); - const ClientID client_id = conn->GetClientID(); + const ClientID client_id = conn->GetClientId(); auto status = ExecuteReceiveObject(client_id, object_id, data_size, metadata_size, chunk_index, *conn); + double end_time = current_sys_time_seconds(); // Notify the main thread that we have finished receiving the object. - main_service_->post([this, object_id, client_id, chunk_index, start_time, status]() { - double end_time = current_sys_time_seconds(); - HandleReceiveFinished(object_id, client_id, chunk_index, start_time, end_time, - status); - }); + main_service_->post( + [this, object_id, client_id, chunk_index, start_time, end_time, status]() { + HandleReceiveFinished(object_id, client_id, chunk_index, start_time, end_time, + status); + }); }); } diff --git a/src/ray/object_manager/object_manager.h b/src/ray/object_manager/object_manager.h index bbbe0603f..b25833767 100644 --- a/src/ray/object_manager/object_manager.h +++ b/src/ray/object_manager/object_manager.h @@ -6,6 +6,7 @@ #include #include #include +#include #include #include @@ -51,6 +52,14 @@ struct ObjectManagerConfig { int push_timeout_ms; }; +struct LocalObjectInfo { + /// Information from the object store about the object. + object_manager::protocol::ObjectInfoT object_info; + /// A map from the ID of a remote object manager to the timestamp of when + /// the object was last pushed to that object manager (if a push took place). + std::unordered_map recent_pushes; +}; + class ObjectManagerInterface { public: virtual ray::Status Pull(const ObjectID &object_id) = 0; @@ -102,7 +111,9 @@ class ObjectManager : public ObjectManagerInterface { /// \return Status of whether adding the subscription succeeded. ray::Status SubscribeObjDeleted(std::function callback); - /// Push an object to to the node manager on the node corresponding to client id. + /// Consider pushing an object to a remote object manager. This object manager + /// may choose to ignore the Push call (e.g., if Push is called twice in a row + /// on the same object, the second one might be ignored). /// /// \param object_id The object's object id. /// \param client_id The remote node's client id. @@ -382,8 +393,9 @@ class ObjectManager : public ObjectManagerInterface { /// Connection pool for reusing outgoing connections to remote object managers. ConnectionPool connection_pool_; - /// Cache of locally available objects. - std::unordered_map local_objects_; + /// Mapping from locally available objects to information about those objects + /// including when the object was last pushed to other object managers. + std::unordered_map local_objects_; /// This is used as the callback identifier in Pull for /// SubscribeObjectLocations. We only need one identifier because we never need to @@ -400,11 +412,16 @@ class ObjectManager : public ObjectManagerInterface { std::unordered_map>> unfulfilled_push_requests_; + /// The objects that this object manager is currently trying to fetch from + /// remote object managers. std::unordered_map pull_requests_; /// Profiling events that are to be batched together and added to the profile /// table in the GCS. std::vector profile_events_; + + /// Internally maintained random number generator. + std::mt19937_64 gen_; }; } // namespace ray diff --git a/src/ray/object_manager/object_manager_client_connection.h b/src/ray/object_manager/object_manager_client_connection.h index 5ecc9d36a..3b00b1a30 100644 --- a/src/ray/object_manager/object_manager_client_connection.h +++ b/src/ray/object_manager/object_manager_client_connection.h @@ -74,7 +74,7 @@ class SenderConnection : public boost::enable_shared_from_this } /// \return The ClientID of this connection. - const ClientID &GetClientID() { return client_id_; } + const ClientID &GetClientId() { return client_id_; } private: bool operator==(const SenderConnection &rhs) const { diff --git a/src/ray/object_manager/test/object_manager_stress_test.cc b/src/ray/object_manager/test/object_manager_stress_test.cc index 31ea5aed7..c7f38b0c7 100644 --- a/src/ray/object_manager/test/object_manager_stress_test.cc +++ b/src/ray/object_manager/test/object_manager_stress_test.cc @@ -120,7 +120,7 @@ class TestObjectManagerBase : public ::testing::Test { store_id_1 = StartStore(UniqueID::from_random().hex()); store_id_2 = StartStore(UniqueID::from_random().hex()); - uint pull_timeout_ms = 1; + uint pull_timeout_ms = 1000; int max_sends_a = 2; int max_receives_a = 2; int max_sends_b = 3; diff --git a/src/ray/ray_config.h b/src/ray/ray_config.h index da30fdebd..2306b89c3 100644 --- a/src/ray/ray_config.h +++ b/src/ray/ray_config.h @@ -102,6 +102,9 @@ class RayConfig { int object_manager_push_timeout_ms() const { return object_manager_push_timeout_ms_; } + int object_manager_repeated_push_delay_ms() const { + return object_manager_repeated_push_delay_ms_; + } uint64_t object_manager_default_chunk_size() const { return object_manager_default_chunk_size_; } @@ -183,6 +186,8 @@ class RayConfig { object_manager_push_timeout_ms_ = pair.second; } else if (pair.first == "object_manager_default_chunk_size") { object_manager_default_chunk_size_ = pair.second; + } else if (pair.first == "object_manager_repeated_push_delay_ms") { + object_manager_repeated_push_delay_ms_ = pair.second; } else { RAY_LOG(FATAL) << "Received unexpected config parameter " << pair.first; } @@ -224,8 +229,9 @@ class RayConfig { max_tasks_to_spillback_(10), actor_creation_num_spillbacks_warning_(100), node_manager_forward_task_retry_timeout_milliseconds_(1000), - object_manager_pull_timeout_ms_(100), + object_manager_pull_timeout_ms_(10000), object_manager_push_timeout_ms_(10000), + object_manager_repeated_push_delay_ms_(60000), object_manager_default_chunk_size_(1000000), num_workers_per_process_(1), initialized_(false) {} @@ -348,6 +354,10 @@ class RayConfig { /// 0: giving up retrying immediately. int object_manager_push_timeout_ms_; + /// The period of time that an object manager will wait before pushing the + /// same object again to a specific object manager. + int object_manager_repeated_push_delay_ms_; + /// Default chunk size for multi-chunk transfers to use in the object manager. /// In the object manager, no single thread is permitted to transfer more /// data than what is specified by the chunk size unless the number of object diff --git a/src/ray/raylet/node_manager.cc b/src/ray/raylet/node_manager.cc index f426da740..e5ce98456 100644 --- a/src/ray/raylet/node_manager.cc +++ b/src/ray/raylet/node_manager.cc @@ -763,7 +763,7 @@ void NodeManager::ProcessDisconnectClientMessage( DispatchTasks(local_queues_.GetReadyTasks()); } else if (is_driver) { // The client is a driver. - RAY_CHECK_OK(gcs_client_->driver_table().AppendDriverData(client->GetClientID(), + RAY_CHECK_OK(gcs_client_->driver_table().AppendDriverData(client->GetClientId(), /*is_dead=*/true)); auto driver_id = worker->GetAssignedTaskId(); RAY_CHECK(!driver_id.is_nil()); diff --git a/test/jenkins_tests/run_multi_node_tests.sh b/test/jenkins_tests/run_multi_node_tests.sh index 41187d4d2..e8383cdf1 100755 --- a/test/jenkins_tests/run_multi_node_tests.sh +++ b/test/jenkins_tests/run_multi_node_tests.sh @@ -359,6 +359,8 @@ docker run --rm --shm-size=10G --memory=10G $DOCKER_SHA \ --stop '{"training_iteration": 2}' \ --config '{"num_workers": 2, "use_pytorch": true, "sample_async": false}' +docker run --rm --shm-size=10G --memory=10G $DOCKER_SHA python -m pytest /ray/test/object_manager_test.py + python3 $ROOT_DIR/multi_node_docker_test.py \ --docker-image=$DOCKER_SHA \ --num-nodes=5 \ diff --git a/test/object_manager_test.py b/test/object_manager_test.py new file mode 100644 index 000000000..928d0dcd8 --- /dev/null +++ b/test/object_manager_test.py @@ -0,0 +1,307 @@ +from __future__ import absolute_import +from __future__ import division +from __future__ import print_function + +from collections import defaultdict +import json +import multiprocessing +import numpy as np +import pytest +import time +import warnings + +import ray +from ray.test.cluster_utils import Cluster + +if (multiprocessing.cpu_count() < 40 + or ray.utils.get_system_memory() < 50 * 10**9): + warnings.warn("This test must be run on large machines.") + + +def create_cluster(num_nodes): + cluster = Cluster() + for i in range(num_nodes): + cluster.add_node(resources={str(i): 100}, object_store_memory=10**9) + + ray.init(redis_address=cluster.redis_address) + return cluster + + +@pytest.fixture() +def ray_start_cluster(): + num_nodes = 5 + cluster = create_cluster(num_nodes) + yield cluster, num_nodes + + # The code after the yield will run as teardown code. + ray.shutdown() + cluster.shutdown() + + +@pytest.fixture() +def ray_start_empty_cluster(): + cluster = Cluster() + yield cluster + + # The code after the yield will run as teardown code. + ray.shutdown() + cluster.shutdown() + + +# This test is here to make sure that when we broadcast an object to a bunch of +# machines, we don't have too many excess object transfers. +def test_object_broadcast(ray_start_cluster): + cluster, num_nodes = ray_start_cluster + + @ray.remote + def f(x): + return + + x = np.zeros(10**8, dtype=np.uint8) + + @ray.remote + def create_object(): + return np.zeros(10**8, dtype=np.uint8) + + object_ids = [] + + for _ in range(3): + # Broadcast an object to all machines. + x_id = ray.put(x) + object_ids.append(x_id) + ray.get([ + f._remote(args=[x_id], resources={str(i % num_nodes): 1}) + for i in range(10 * num_nodes) + ]) + + for _ in range(3): + # Broadcast an object to all machines. + x_id = create_object.remote() + object_ids.append(x_id) + ray.get([ + f._remote(args=[x_id], resources={str(i % num_nodes): 1}) + for i in range(10 * num_nodes) + ]) + + # Wait for profiling information to be pushed to the profile table. + time.sleep(1) + transfer_events = ray.global_state.chrome_tracing_object_transfer_dump() + + # Make sure that each object was transferred a reasonable number of times. + for x_id in object_ids: + relevant_events = [ + event for event in transfer_events + if event["cat"] == "transfer_send" + and event["args"][0] == x_id.hex() and event["args"][2] == 1 + ] + + # NOTE: Each event currently appears twice because we duplicate the + # send and receive boxes to underline them with a box (black if it is a + # send and gray if it is a receive). So we need to remove these extra + # boxes here. + deduplicated_relevant_events = [ + event for event in relevant_events if event["cname"] != "black" + ] + assert len(deduplicated_relevant_events) * 2 == len(relevant_events) + relevant_events = deduplicated_relevant_events + + # Each object must have been broadcast to each remote machine. + assert len(relevant_events) >= num_nodes - 1 + # If more object transfers than necessary have been done, print a + # warning. + if len(relevant_events) > num_nodes - 1: + warnings.warn("This object was transferred {} times, when only {} " + "transfers were required.".format( + len(relevant_events), num_nodes - 1)) + # Each object should not have been broadcast more than once from every + # machine to every other machine. Also, a pair of machines should not + # both have sent the object to each other. + assert len(relevant_events) <= (num_nodes - 1) * num_nodes / 2 + + # Make sure that no object was sent multiple times between the same + # pair of object managers. + send_counts = defaultdict(int) + for event in relevant_events: + # The pid identifies the sender and the tid identifies the + # receiver. + send_counts[(event["pid"], event["tid"])] += 1 + assert all(value == 1 for value in send_counts.values()) + + +# When submitting an actor method, we try to pre-emptively push its arguments +# to the actor's object manager. However, in the past we did not deduplicate +# the pushes and so the same object could get shipped to the same object +# manager many times. This test checks that that isn't happening. +def test_actor_broadcast(ray_start_cluster): + cluster, num_nodes = ray_start_cluster + + @ray.remote + class Actor(object): + def ready(self): + pass + + def set_weights(self, x): + pass + + actors = [ + Actor._remote(args=[], kwargs={}, resources={str(i % num_nodes): 1}) + for i in range(100) + ] + + # Wait for the actors to start up. + ray.get([a.ready.remote() for a in actors]) + + object_ids = [] + + # Broadcast a large object to all actors. + for _ in range(10): + x_id = ray.put(np.zeros(10**7, dtype=np.uint8)) + object_ids.append(x_id) + # Pass the object into a method for every actor. + ray.get([a.set_weights.remote(x_id) for a in actors]) + + # Wait for profiling information to be pushed to the profile table. + time.sleep(1) + transfer_events = ray.global_state.chrome_tracing_object_transfer_dump() + + # Make sure that each object was transferred a reasonable number of times. + for x_id in object_ids: + relevant_events = [ + event for event in transfer_events + if event["cat"] == "transfer_send" + and event["args"][0] == x_id.hex() and event["args"][2] == 1 + ] + + # NOTE: Each event currently appears twice because we duplicate the + # send and receive boxes to underline them with a box (black if it is a + # send and gray if it is a receive). So we need to remove these extra + # boxes here. + deduplicated_relevant_events = [ + event for event in relevant_events if event["cname"] != "black" + ] + assert len(deduplicated_relevant_events) * 2 == len(relevant_events) + relevant_events = deduplicated_relevant_events + + # Each object must have been broadcast to each remote machine. + assert len(relevant_events) >= num_nodes - 1 + # If more object transfers than necessary have been done, print a + # warning. + if len(relevant_events) > num_nodes - 1: + warnings.warn("This object was transferred {} times, when only {} " + "transfers were required.".format( + len(relevant_events), num_nodes - 1)) + # Each object should not have been broadcast more than once from every + # machine to every other machine. Also, a pair of machines should not + # both have sent the object to each other. + assert len(relevant_events) <= (num_nodes - 1) * num_nodes / 2 + + # Make sure that no object was sent multiple times between the same + # pair of object managers. + send_counts = defaultdict(int) + for event in relevant_events: + # The pid identifies the sender and the tid identifies the + # receiver. + send_counts[(event["pid"], event["tid"])] += 1 + assert all(value == 1 for value in send_counts.values()) + + +# The purpose of this test is to make sure that an object that was already been +# transferred to a node can be transferred again. +def test_object_transfer_retry(ray_start_empty_cluster): + cluster = ray_start_empty_cluster + + repeated_push_delay = 4 + + config = json.dumps({ + "object_manager_repeated_push_delay_ms": repeated_push_delay * 1000 + }) + cluster.add_node(_internal_config=config) + cluster.add_node(resources={"GPU": 1}, _internal_config=config) + ray.init(redis_address=cluster.redis_address) + + @ray.remote(num_gpus=1) + def f(size): + return np.zeros(size, dtype=np.uint8) + + x_ids = [f.remote(10**i) for i in [1, 2, 3, 4, 5, 6, 7]] + assert not any( + ray.worker.global_worker.plasma_client.contains( + ray.pyarrow.plasma.ObjectID(x_id.id())) for x_id in x_ids) + + start_time = time.time() + + # Get the objects locally to cause them to be transferred. + xs = ray.get(x_ids) + + # Cause all objects to be flushed. + del xs + x = np.zeros(10**7, dtype=np.uint8) + for _ in range(10): + ray.put(x) + assert not any( + ray.worker.global_worker.plasma_client.contains( + ray.pyarrow.plasma.ObjectID(x_id.id())) for x_id in x_ids) + + end_time = time.time() + + # Get the objects again and make sure they get transferred. + xs = ray.get(x_ids) + + end_transfer_time = time.time() + + # Make sure that the object was retransferred before the object manager + # repeated push delay expired. + if end_time - start_time <= repeated_push_delay: + warnings.warn("This test didn't really fail, but the timing is such " + "that it is not testing the thing it should be testing.") + # We should have had to wait for the repeated push delay. + assert end_transfer_time - start_time >= repeated_push_delay + + # Flush the objects again and wait longer than the repeated push delay and + # make sure that the objects are transferred again. + del xs + for _ in range(10): + ray.put(x) + assert not any( + ray.worker.global_worker.plasma_client.contains( + ray.pyarrow.plasma.ObjectID(x_id.id())) for x_id in x_ids) + + time.sleep(repeated_push_delay) + ray.get(x_ids) + + +# The purpose of this test is to make sure we can transfer many objects. In the +# past, this has caused failures in which object managers create too many open +# files and run out of resources. +def test_many_small_transfers(ray_start_cluster): + cluster, num_nodes = ray_start_cluster + + @ray.remote + def f(*args): + pass + + # This function creates 1000 objects on each machine and then transfers + # each object to every other machine. + def do_transfers(): + id_lists = [] + for i in range(num_nodes): + id_lists.append([ + f._remote(args=[], kwargs={}, resources={str(i): 1}) + for _ in range(1000) + ]) + ids = [] + for i in range(num_nodes): + for j in range(num_nodes): + if i == j: + continue + ids.append( + f._remote( + args=id_lists[j], kwargs={}, resources={str(i): 1})) + + # Wait for all of the transfers to finish. + ray.get(ids) + + do_transfers() + do_transfers() + do_transfers() + do_transfers() diff --git a/test/runtest.py b/test/runtest.py index b6e05042c..cea970905 100644 --- a/test/runtest.py +++ b/test/runtest.py @@ -1231,6 +1231,7 @@ def test_multithreading(shutdown_only): def test_free_objects_multi_node(shutdown_only): + config = json.dumps({"object_manager_repeated_push_delay_ms": 1000}) ray.worker._init( start_ray_local=True, num_local_schedulers=3, @@ -1241,7 +1242,8 @@ def test_free_objects_multi_node(shutdown_only): "Custom1": 1 }, { "Custom2": 1 - }]) + }], + _internal_config=config) @ray.remote(resources={"Custom0": 1}) def run_on_0():