diff --git a/python/ray/_raylet.pyx b/python/ray/_raylet.pyx index 92a11650e..2413879c3 100644 --- a/python/ray/_raylet.pyx +++ b/python/ray/_raylet.pyx @@ -93,10 +93,6 @@ from ray.exceptions import ( ) from ray.experimental.no_return import NoReturn from ray.utils import decode -from ray.ray_constants import ( - DEFAULT_PUT_OBJECT_DELAY, - DEFAULT_PUT_OBJECT_RETRIES, -) cimport cpython @@ -673,32 +669,17 @@ cdef class CoreWorker: size_t data_size, ObjectID object_id, c_vector[CObjectID] contained_ids, CObjectID *c_object_id, shared_ptr[CBuffer] *data): - delay = ray_constants.DEFAULT_PUT_OBJECT_DELAY - for attempt in reversed( - range(ray_constants.DEFAULT_PUT_OBJECT_RETRIES)): - try: - if object_id is None: - with nogil: - check_status(self.core_worker.get().Create( - metadata, data_size, contained_ids, - c_object_id, data)) - else: - c_object_id[0] = object_id.native() - with nogil: - check_status(self.core_worker.get().Create( - metadata, data_size, - c_object_id[0], data)) - break - except ObjectStoreFullError as e: - if attempt: - logger.warning("Waiting {} seconds for space to free up " - "in the object store.".format(delay)) - gc.collect() - time.sleep(delay) - delay *= 2 - else: - self.dump_object_store_memory_usage() - raise e + if object_id is None: + with nogil: + check_status(self.core_worker.get().Create( + metadata, data_size, contained_ids, + c_object_id, data)) + else: + c_object_id[0] = object_id.native() + with nogil: + check_status(self.core_worker.get().Create( + metadata, data_size, + c_object_id[0], data)) # If data is nullptr, that means the ObjectID already existed, # which we ignore. diff --git a/python/ray/ray_constants.py b/python/ray/ray_constants.py index 762b29caa..c96dbeb26 100644 --- a/python/ray/ray_constants.py +++ b/python/ray/ray_constants.py @@ -22,12 +22,6 @@ ID_SIZE = 20 # The default maximum number of bytes to allocate to the object store unless # overridden by the user. 
DEFAULT_OBJECT_STORE_MAX_MEMORY_BYTES = 20 * 10**9 -# The default number of retries to call `put` when the object store is full. -DEFAULT_PUT_OBJECT_RETRIES = 5 -# The default seconds for delay between calls to retry `put` when -# the object store is full. This delay is exponentially doubled up to -# DEFAULT_PUT_OBJECT_RETRIES times. -DEFAULT_PUT_OBJECT_DELAY = 1 # The smallest cap on the memory used by the object store that we allow. # This must be greater than MEMORY_RESOURCE_UNIT_BYTES * 0.7 OBJECT_STORE_MINIMUM_MEMORY_BYTES = 75 * 1024 * 1024 diff --git a/python/ray/tests/test_failure.py b/python/ray/tests/test_failure.py index b5a4a842a..bdace4d54 100644 --- a/python/ray/tests/test_failure.py +++ b/python/ray/tests/test_failure.py @@ -892,30 +892,33 @@ def test_connect_with_disconnected_node(shutdown_only): @pytest.mark.parametrize( "ray_start_cluster_head", [{ "num_cpus": 5, - "object_store_memory": 10**8 + "object_store_memory": 10**8, + "_internal_config": json.dumps({ + "object_store_full_max_retries": 0 + }) }], indirect=True) -@pytest.mark.parametrize("num_actors", [1, 2, 5]) -def test_parallel_actor_fill_plasma_retry(ray_start_cluster_head, num_actors): +def test_parallel_actor_fill_plasma_retry(ray_start_cluster_head): @ray.remote class LargeMemoryActor: def some_expensive_task(self): return np.zeros(10**8 // 2, dtype=np.uint8) - actors = [LargeMemoryActor.remote() for _ in range(num_actors)] + actors = [LargeMemoryActor.remote() for _ in range(5)] for _ in range(10): pending = [a.some_expensive_task.remote() for a in actors] while pending: [done], pending = ray.wait(pending, num_returns=1) -@pytest.mark.parametrize( - "ray_start_cluster_head", [{ - "num_cpus": 2, - "object_store_memory": 10**8 - }], - indirect=True) -def test_fill_object_store_exception(ray_start_cluster_head): +def test_fill_object_store_exception(shutdown_only): + ray.init( + num_cpus=2, + object_store_memory=10**8, + _internal_config=json.dumps({ + "object_store_full_max_retries": 
0 + })) + @ray.remote def expensive_task(): return np.zeros((10**8) // 10, dtype=np.uint8) diff --git a/python/ray/tests/test_reference_counting.py b/python/ray/tests/test_reference_counting.py index 9eea21634..c4553795d 100644 --- a/python/ray/tests/test_reference_counting.py +++ b/python/ray/tests/test_reference_counting.py @@ -25,6 +25,7 @@ logger = logging.getLogger(__name__) def one_worker_100MiB(request): config = json.dumps({ "distributed_ref_counting_enabled": 1, + "object_store_full_max_retries": 1, }) yield ray.init( num_cpus=1, diff --git a/src/ray/common/ray_config_def.h b/src/ray/common/ray_config_def.h index c68b178ef..7c47e2a27 100644 --- a/src/ray/common/ray_config_def.h +++ b/src/ray/common/ray_config_def.h @@ -238,3 +238,10 @@ RAY_CONFIG(uint32_t, object_store_get_max_ids_to_print_in_warning, 20) /// Note: this only takes effect when gcs service is enabled. RAY_CONFIG(int64_t, gcs_service_connect_retries, 50) RAY_CONFIG(int64_t, gcs_service_connect_wait_milliseconds, 100) + +/// Maximum number of times to retry putting an object when the plasma store is full. +/// Any negative value (e.g. -1) enables unlimited retries. +RAY_CONFIG(int32_t, object_store_full_max_retries, 5) +/// Initial delay, in milliseconds, to sleep after failing to put an object in plasma +/// because it is full. The delay is doubled after each failed attempt. 
+RAY_CONFIG(uint32_t, object_store_full_initial_delay_ms, 1000) diff --git a/src/ray/core_worker/store_provider/plasma_store_provider.cc b/src/ray/core_worker/store_provider/plasma_store_provider.cc index 672cc3820..8d08fc4b3 100644 --- a/src/ray/core_worker/store_provider/plasma_store_provider.cc +++ b/src/ray/core_worker/store_provider/plasma_store_provider.cc @@ -55,28 +55,46 @@ Status CoreWorkerPlasmaStoreProvider::Create(const std::shared_ptr &meta const size_t data_size, const ObjectID &object_id, std::shared_ptr *data) { - auto plasma_id = object_id.ToPlasmaId(); - std::shared_ptr arrow_buffer; - { - std::lock_guard guard(store_client_mutex_); - arrow::Status status = - store_client_.Create(plasma_id, data_size, metadata ? metadata->Data() : nullptr, - metadata ? metadata->Size() : 0, &arrow_buffer); - if (plasma::IsPlasmaObjectExists(status)) { - RAY_LOG(WARNING) << "Trying to put an object that already existed in plasma: " - << object_id << "."; - return Status::OK(); + int32_t retries = 0; + int32_t max_retries = RayConfig::instance().object_store_full_max_retries(); + uint32_t delay = RayConfig::instance().object_store_full_initial_delay_ms(); + Status status; + bool should_retry = true; + while (should_retry) { + should_retry = false; + arrow::Status plasma_status; + std::shared_ptr arrow_buffer; + { + std::lock_guard guard(store_client_mutex_); + plasma_status = store_client_.Create( + object_id.ToPlasmaId(), data_size, metadata ? metadata->Data() : nullptr, + metadata ? metadata->Size() : 0, &arrow_buffer); } - if (plasma::IsPlasmaStoreFull(status)) { + if (plasma::IsPlasmaStoreFull(plasma_status)) { std::ostringstream message; message << "Failed to put object " << object_id << " in object store because it " << "is full. 
Object size is " << data_size << " bytes."; - return Status::ObjectStoreFull(message.str()); + status = Status::ObjectStoreFull(message.str()); + if (max_retries < 0 || retries < max_retries) { + RAY_LOG(ERROR) << message.str() << " Plasma store status:\n" + << MemoryUsageString() << "\nWaiting " << delay + << "ms for space to free up..."; + usleep(1000 * delay); + delay *= 2; + retries += 1; + should_retry = true; + } + } else if (plasma::IsPlasmaObjectExists(plasma_status)) { + RAY_LOG(WARNING) << "Trying to put an object that already existed in plasma: " + << object_id << "."; + status = Status::OK(); + } else { + RAY_ARROW_RETURN_NOT_OK(plasma_status); + *data = std::make_shared(PlasmaBuffer(arrow_buffer)); + status = Status::OK(); } - RAY_ARROW_RETURN_NOT_OK(status); } - *data = std::make_shared(PlasmaBuffer(arrow_buffer)); - return Status::OK(); + return status; } Status CoreWorkerPlasmaStoreProvider::Seal(const ObjectID &object_id) {