From 7f38cc1d03c1dedd43cb40ed58d9b50d9c74204b Mon Sep 17 00:00:00 2001 From: Stephanie Wang Date: Mon, 23 Mar 2020 13:02:14 -0700 Subject: [PATCH] Debug statements and increase timeout for test array (#7713) --- python/ray/tests/test_array.py | 13 +++++++++++++ src/ray/core_worker/core_worker.cc | 4 +++- src/ray/core_worker/reference_count.cc | 15 +++++++++------ 3 files changed, 25 insertions(+), 7 deletions(-) diff --git a/python/ray/tests/test_array.py b/python/ray/tests/test_array.py index 62abafd1c..d5bd0166c 100644 --- a/python/ray/tests/test_array.py +++ b/python/ray/tests/test_array.py @@ -3,6 +3,7 @@ import numpy as np from numpy.testing import assert_equal, assert_almost_equal import pytest import sys +import json import ray import ray.experimental.array.remote as ra @@ -55,6 +56,18 @@ def test_distributed_array_assemble(ray_start_2_cpus, reload_modules): ])) +@pytest.mark.parametrize( + "ray_start_cluster_2_nodes", + [{ + "_internal_config": json.dumps({ + # NOTE(swang): If plasma store notifications to the raylet for new + # objects are delayed long enough, then this causes concurrent + # fetch calls to time out and mistakenly mark the object as lost. + # Set the timeout very high to prevent this. 
+ "initial_reconstruction_timeout_milliseconds": 60000, + }) + }], + indirect=True) def test_distributed_array_methods(ray_start_cluster_2_nodes, reload_modules): x = da.zeros.remote([9, 25, 51], "float") assert_equal(ray.get(da.assemble.remote(x)), np.zeros([9, 25, 51])) diff --git a/src/ray/core_worker/core_worker.cc b/src/ray/core_worker/core_worker.cc index 5dea9ca3d..2a01d2ef8 100644 --- a/src/ray/core_worker/core_worker.cc +++ b/src/ray/core_worker/core_worker.cc @@ -115,7 +115,6 @@ CoreWorker::CoreWorker(const WorkerType worker_type, const Language language, RayLog::StartRayLog(app_name.str(), RayLogLevel::INFO, log_dir_); RayLog::InstallFailureSignalHandler(); } - RAY_LOG(INFO) << "Initializing worker " << worker_context_.GetWorkerID(); // Initialize gcs client. if (getenv("RAY_GCS_SERVICE_ENABLED") != nullptr) { gcs_client_ = std::make_shared(gcs_options); @@ -169,6 +168,9 @@ CoreWorker::CoreWorker(const WorkerType worker_type, const Language language, rpc_address_.set_port(core_worker_server_.GetPort()); rpc_address_.set_raylet_id(local_raylet_id.Binary()); rpc_address_.set_worker_id(worker_context_.GetWorkerID().Binary()); + RAY_LOG(INFO) << "Initializing worker at address: " << rpc_address_.ip_address() << ":" + << rpc_address_.port() << ", worker ID " << worker_context_.GetWorkerID() + << ", raylet " << local_raylet_id; reference_counter_ = std::make_shared( rpc_address_, RayConfig::instance().distributed_ref_counting_enabled(), diff --git a/src/ray/core_worker/reference_count.cc b/src/ray/core_worker/reference_count.cc index a47b542d6..3b88de9f3 100644 --- a/src/ray/core_worker/reference_count.cc +++ b/src/ray/core_worker/reference_count.cc @@ -509,8 +509,8 @@ void ReferenceCounter::MergeRemoteBorrowers(const ObjectID &object_id, auto inserted = it->second.borrowers.insert(worker_addr).second; // If we are the owner of id, then send WaitForRefRemoved to borrower. 
if (inserted) { - RAY_LOG(DEBUG) << "Adding borrower " << worker_addr.ip_address << " to id " - << object_id; + RAY_LOG(DEBUG) << "Adding borrower " << worker_addr.ip_address << ":" + << worker_addr.port << " to id " << object_id; new_borrowers.push_back(worker_addr); } } @@ -519,8 +519,8 @@ void ReferenceCounter::MergeRemoteBorrowers(const ObjectID &object_id, for (const auto &nested_borrower : borrower_ref.borrowers) { auto inserted = it->second.borrowers.insert(nested_borrower).second; if (inserted) { - RAY_LOG(DEBUG) << "Adding borrower " << nested_borrower.ip_address << " to id " - << object_id; + RAY_LOG(DEBUG) << "Adding borrower " << nested_borrower.ip_address << ":" + << nested_borrower.port << " to id " << object_id; new_borrowers.push_back(nested_borrower); } } @@ -624,8 +624,9 @@ void ReferenceCounter::AddNestedObjectIdsInternal( // We do not own object_id. This is the case where we returned an object ID // from a task, and the task's caller executed in a remote process. for (const auto &inner_id : inner_ids) { - RAY_LOG(DEBUG) << "Adding borrower " << owner_address.ip_address << " to id " - << inner_id << ", borrower owns outer ID " << object_id; + RAY_LOG(DEBUG) << "Adding borrower " << owner_address.ip_address << ":" + << owner_address.port << " to id " << inner_id + << ", borrower owns outer ID " << object_id; auto inner_it = object_id_refs_.find(inner_id); RAY_CHECK(inner_it != object_id_refs_.end()); // Add the task's caller as a borrower. @@ -692,6 +693,8 @@ void ReferenceCounter::SetRefRemovedCallback( } if (it->second.RefCount() == 0) { + RAY_LOG(DEBUG) << "Ref count for borrowed object " << object_id + << " is already 0, responding to WaitForRefRemoved"; // We already stopped borrowing the object ID. Respond to the owner // immediately. ref_removed_callback(object_id);