diff --git a/java/runtime/src/main/resources/ray.default.conf b/java/runtime/src/main/resources/ray.default.conf index 43df8234a..79f3e3355 100644 --- a/java/runtime/src/main/resources/ray.default.conf +++ b/java/runtime/src/main/resources/ray.default.conf @@ -102,6 +102,8 @@ ray { // See src/ray/ray_config_def.h for options. config { num_workers_per_process_java: 10 + // TODO(zhuohan): enable this for java + put_small_object_in_memory_store: false } } diff --git a/python/ray/_raylet.pyx b/python/ray/_raylet.pyx index fc4f23a6f..93535f970 100644 --- a/python/ray/_raylet.pyx +++ b/python/ray/_raylet.pyx @@ -774,9 +774,14 @@ cdef class CoreWorker: CObjectID c_object_id shared_ptr[CBuffer] data shared_ptr[CBuffer] metadata + int64_t put_threshold + c_bool put_small_object_in_memory_store c_vector[CObjectID] c_object_id_vector metadata = string_to_buffer(serialized_object.metadata) + put_threshold = RayConfig.instance().max_direct_call_object_size() + put_small_object_in_memory_store = ( + RayConfig.instance().put_small_object_in_memory_store()) total_bytes = serialized_object.total_bytes object_already_exists = self._create_put_buffer( metadata, total_bytes, object_id, @@ -787,7 +792,8 @@ cdef class CoreWorker: if total_bytes > 0: (serialized_object).write_to( Buffer.make(data)) - if self.is_local_mode: + if self.is_local_mode or (put_small_object_in_memory_store + and total_bytes < put_threshold): c_object_id_vector.push_back(c_object_id) check_status(CCoreWorkerProcess.GetCoreWorker().Put( CRayObject(data, metadata, c_object_id_vector), @@ -1103,7 +1109,8 @@ cdef class CoreWorker: cdef: CObjectID c_object_id = object_id.native() CAddress c_owner_address = CAddress() - CCoreWorkerProcess.GetCoreWorker().PromoteToPlasmaAndGetOwnershipInfo( + CCoreWorkerProcess.GetCoreWorker().PromoteObjectToPlasma(c_object_id) + CCoreWorkerProcess.GetCoreWorker().GetOwnershipInfo( c_object_id, &c_owner_address) return (object_id, c_owner_address.SerializeAsString()) diff --git a/python/ray/includes/libcoreworker.pxd b/python/ray/includes/libcoreworker.pxd index d34fa37cd..a46356094 100644 --- a/python/ray/includes/libcoreworker.pxd +++ b/python/ray/includes/libcoreworker.pxd @@ -127,11 +127,13 @@ cdef extern from "ray/core_worker/core_worker.h" nogil: const CActorHandle* GetNamedActorHandle(const c_string &name) void AddLocalReference(const CObjectID &object_id) void RemoveLocalReference(const CObjectID &object_id) + void PutObjectIntoPlasma(const CRayObject &object, + const CObjectID &object_id) const CAddress &GetRpcAddress() const CAddress GetOwnerAddress(const CObjectID &object_id) const void PromoteObjectToPlasma(const CObjectID &object_id) - void PromoteToPlasmaAndGetOwnershipInfo(const CObjectID &object_id, - CAddress *owner_address) + void GetOwnershipInfo(const CObjectID &object_id, + CAddress *owner_address) void RegisterOwnershipInfoAndResolveFuture( const CObjectID &object_id, const CObjectID &outer_object_id, diff --git a/python/ray/includes/ray_config.pxd b/python/ray/includes/ray_config.pxd index ff81b531c..e47d3359c 100644 --- a/python/ray/includes/ray_config.pxd +++ b/python/ray/includes/ray_config.pxd @@ -88,3 +88,5 @@ cdef extern from "ray/common/ray_config.h" nogil: int64_t max_direct_call_object_size() const c_bool gcs_actor_service_enabled() const + + c_bool put_small_object_in_memory_store() const diff --git a/python/ray/includes/ray_config.pxi b/python/ray/includes/ray_config.pxi index 8014052d4..3b3b0ccb0 100644 --- a/python/ray/includes/ray_config.pxi +++ b/python/ray/includes/ray_config.pxi @@ -157,3 +157,7 @@ cdef class Config: @staticmethod def maximum_gcs_deletion_batch_size(): return RayConfig.instance().maximum_gcs_deletion_batch_size() + + @staticmethod + def put_small_object_in_memory_store(): + return RayConfig.instance().put_small_object_in_memory_store() diff --git a/python/ray/tests/test_advanced_3.py b/python/ray/tests/test_advanced_3.py index 7d7095d8e..47392aba1 100644 --- a/python/ray/tests/test_advanced_3.py +++ b/python/ray/tests/test_advanced_3.py @@ -484,15 +484,16 @@ def test_shutdown_disconnect_global_state(): @pytest.mark.parametrize( "ray_start_object_store_memory", [150 * 1024 * 1024], indirect=True) def test_put_pins_object(ray_start_object_store_memory): - x_id = ray.put("HI") + obj = np.ones(200 * 1024, dtype=np.uint8) + x_id = ray.put(obj) x_binary = x_id.binary() - assert ray.get(ray.ObjectID(x_binary)) == "HI" + assert (ray.get(ray.ObjectID(x_binary)) == obj).all() # x cannot be evicted since x_id pins it for _ in range(10): ray.put(np.zeros(10 * 1024 * 1024)) - assert ray.get(x_id) == "HI" - assert ray.get(ray.ObjectID(x_binary)) == "HI" + assert (ray.get(x_id) == obj).all() + assert (ray.get(ray.ObjectID(x_binary)) == obj).all() # now it can be evicted since x_id pins it but x_binary does not del x_id @@ -502,7 +503,7 @@ def test_put_pins_object(ray_start_object_store_memory): ray.ObjectID(x_binary)) # weakref put - y_id = ray.put("HI", weakref=True) + y_id = ray.put(obj, weakref=True) for _ in range(10): ray.put(np.zeros(10 * 1024 * 1024)) with pytest.raises(ray.exceptions.UnreconstructableError): diff --git a/python/ray/tests/test_component_failures.py b/python/ray/tests/test_component_failures.py index 256ad072a..f076f9b41 100644 --- a/python/ray/tests/test_component_failures.py +++ b/python/ray/tests/test_component_failures.py @@ -2,6 +2,7 @@ import os import signal import sys import time +import numpy as np import pytest @@ -54,7 +55,8 @@ def test_dying_worker_get(ray_start_2_cpus): assert len(ready_ids) == 0 # Seal the object so the store attempts to notify the worker that the # get has been fulfilled. - ray.worker.global_worker.put_object(1, x_id) + obj = np.ones(200 * 1024, dtype=np.uint8) + ray.worker.global_worker.put_object(obj, x_id) time.sleep(0.1) # Make sure that nothing has died. @@ -97,7 +99,8 @@ ray.get(ray.ObjectID(ray.utils.hex_to_binary("{}"))) assert len(ready_ids) == 0 # Seal the object so the store attempts to notify the worker that the # get has been fulfilled. - ray.worker.global_worker.put_object(1, x_id) + obj = np.ones(200 * 1024, dtype=np.uint8) + ray.worker.global_worker.put_object(obj, x_id) time.sleep(0.1) # Make sure that nothing has died. @@ -137,7 +140,8 @@ def test_dying_worker_wait(ray_start_2_cpus): time.sleep(0.1) # Create the object. - ray.worker.global_worker.put_object(1, x_id) + obj = np.ones(200 * 1024, dtype=np.uint8) + ray.worker.global_worker.put_object(obj, x_id) time.sleep(0.1) # Make sure that nothing has died. @@ -180,7 +184,8 @@ ray.wait([ray.ObjectID(ray.utils.hex_to_binary("{}"))]) assert len(ready_ids) == 0 # Seal the object so the store attempts to notify the worker that the # wait can return. - ray.worker.global_worker.put_object(1, x_id) + obj = np.ones(200 * 1024, dtype=np.uint8) + ray.worker.global_worker.put_object(obj, x_id) time.sleep(0.1) # Make sure that nothing has died. diff --git a/python/ray/tests/test_failure.py b/python/ray/tests/test_failure.py index ffa763570..d903454b6 100644 --- a/python/ray/tests/test_failure.py +++ b/python/ray/tests/test_failure.py @@ -819,7 +819,7 @@ def test_raylet_crash_when_get(ray_start_regular): time.sleep(2) ray.worker._global_node.kill_raylet() - object_id = ray.put(None) + object_id = ray.put(np.zeros(200 * 1024, dtype=np.uint8)) ray.internal.free(object_id) while ray.worker.global_worker.core_worker.object_exists(object_id): time.sleep(1) diff --git a/python/ray/tests/test_memory_limits.py b/python/ray/tests/test_memory_limits.py index 091f1b4bd..3167f1354 100644 --- a/python/ray/tests/test_memory_limits.py +++ b/python/ray/tests/test_memory_limits.py @@ -62,7 +62,8 @@ class TestMemoryLimits(unittest.TestCase): num_cpus=1, object_store_memory=300 * MB, driver_object_store_memory=driver_quota) - z = ray.put("hi", weakref=True) + obj = np.ones(200 * 1024, dtype=np.uint8) + z = ray.put(obj, weakref=True) a = LightActor._remote(object_store_memory=a_quota) b = GreedyActor._remote(object_store_memory=b_quota) for _ in range(5): diff --git a/python/ray/tests/test_metrics.py b/python/ray/tests/test_metrics.py index da24017b3..0d03812a7 100644 --- a/python/ray/tests/test_metrics.py +++ b/python/ray/tests/test_metrics.py @@ -4,6 +4,7 @@ import grpc import pytest import requests import time +import numpy as np import ray from ray.core.generated import node_manager_pb2 @@ -180,7 +181,7 @@ def test_raylet_info_endpoint(shutdown_only): self.local_storage = [f.remote() for _ in range(10)] def remote_store(self): - self.remote_storage = ray.put("test") + self.remote_storage = ray.put(np.zeros(200 * 1024, dtype=np.uint8)) def getpid(self): return os.getpid() @@ -443,9 +444,8 @@ def test_memory_dashboard(shutdown_only): return True def test_object_pineed_in_memory(): - import numpy as np - a = ray.put(np.zeros(1)) + a = ray.put(np.zeros(200 * 1024, dtype=np.uint8)) b = ray.get(a) # Noqa F841 del a @@ -469,7 +469,7 @@ def test_memory_dashboard(shutdown_only): def f(arg): time.sleep(1) - a = ray.put(None) # Noqa F841 + a = ray.put(np.zeros(200 * 1024, dtype=np.uint8)) # Noqa F841 b = f.remote(a) # Noqa F841 wait_for_condition(memory_table_ready) diff --git a/python/ray/tests/test_multi_node.py b/python/ray/tests/test_multi_node.py index 397ecc3d1..0c3142c6d 100644 --- a/python/ray/tests/test_multi_node.py +++ b/python/ray/tests/test_multi_node.py @@ -174,8 +174,10 @@ def test_cleanup_on_driver_exit(call_ray_start): driver_script = """ import time import ray +import numpy as np ray.init(address="{}") -object_ids = [ray.put(i) for i in range(1000)] +object_ids = [ray.put(np.zeros(200 * 1024, dtype=np.uint8)) + for i in range(1000)] start_time = time.time() while time.time() - start_time < 30: if len(ray.objects()) == 1000: diff --git a/python/ray/worker.py b/python/ray/worker.py index 88bdfecb2..4fe8b8ae8 100644 --- a/python/ray/worker.py +++ b/python/ray/worker.py @@ -1286,14 +1286,6 @@ def connect(node, worker.core_worker.set_object_store_client_options( "ray_driver_{}".format(os.getpid()), driver_object_store_memory) - # Put something in the plasma store so that subsequent plasma store - # accesses will be faster. Currently the first access is always slow, and - # we don't want the user to experience this. - if mode != LOCAL_MODE: - temporary_object_id = ray.ObjectID.from_random() - worker.put_object(1, object_id=temporary_object_id) - ray.internal.free([temporary_object_id]) - # Start the import thread worker.import_thread = import_thread.ImportThread(worker, mode, worker.threads_stopped) diff --git a/src/ray/common/ray_config_def.h b/src/ray/common/ray_config_def.h index df443a999..222a85bec 100644 --- a/src/ray/common/ray_config_def.h +++ b/src/ray/common/ray_config_def.h @@ -315,3 +315,6 @@ RAY_CONFIG(int64_t, gcs_service_address_check_interval_milliseconds, 1000) RAY_CONFIG(bool, gcs_actor_service_enabled, getenv("RAY_GCS_ACTOR_SERVICE_ENABLED") != nullptr && getenv("RAY_GCS_ACTOR_SERVICE_ENABLED") == std::string("true")) + +/// Whether start the Plasma Store as a Raylet thread. +RAY_CONFIG(bool, put_small_object_in_memory_store, false) diff --git a/src/ray/core_worker/core_worker.cc b/src/ray/core_worker/core_worker.cc index a9f6e3023..83df0889d 100644 --- a/src/ray/core_worker/core_worker.cc +++ b/src/ray/core_worker/core_worker.cc @@ -360,9 +360,9 @@ CoreWorker::CoreWorker(const CoreWorkerOptions &options, const WorkerID &worker_ boost::bind(&CoreWorker::TriggerGlobalGC, this), boost::bind(&CoreWorker::CurrentCallSite, this))); memory_store_.reset(new CoreWorkerMemoryStore( - [this](const RayObject &obj, const ObjectID &obj_id) { - RAY_LOG(DEBUG) << "Promoting object to plasma " << obj_id; - RAY_CHECK_OK(Put(obj, /*contained_object_ids=*/{}, obj_id, /*pin_object=*/true)); + [this](const RayObject &object, const ObjectID &object_id) { + PutObjectIntoPlasma(object, object_id); + return Status::OK(); }, options_.ref_counting_enabled ? reference_counter_ : nullptr, local_raylet_client_, options_.check_signals)); @@ -708,6 +708,33 @@ CoreWorker::GetAllReferenceCounts() const { return counts; } +void CoreWorker::PutObjectIntoPlasma(const RayObject &object, const ObjectID &object_id) { + bool object_exists; + RAY_CHECK_OK(plasma_store_provider_->Put(object, object_id, &object_exists)); + if (!object_exists) { + // Tell the raylet to pin the object **after** it is created. + RAY_LOG(DEBUG) << "Pinning put object " << object_id; + RAY_CHECK_OK(local_raylet_client_->PinObjectIDs( + rpc_address_, {object_id}, + [this, object_id](const Status &status, const rpc::PinObjectIDsReply &reply) { + // Only release the object once the raylet has responded to avoid the race + // condition that the object could be evicted before the raylet pins it. + if (!plasma_store_provider_->Release(object_id).ok()) { + RAY_LOG(ERROR) << "Failed to release ObjectID (" << object_id + << "), might cause a leak in plasma."; + } + })); + } + RAY_CHECK(memory_store_->Put(RayObject(rpc::ErrorType::OBJECT_IN_PLASMA), object_id)); +} + +void CoreWorker::PromoteObjectToPlasma(const ObjectID &object_id) { + auto value = memory_store_->GetOrPromoteToPlasma(object_id); + if (value) { + PutObjectIntoPlasma(*value, object_id); + } +} + const rpc::Address &CoreWorker::GetRpcAddress() const { return rpc_address_; } rpc::Address CoreWorker::GetOwnerAddress(const ObjectID &object_id) const { @@ -722,15 +749,8 @@ rpc::Address CoreWorker::GetOwnerAddress(const ObjectID &object_id) const { return owner_address; } -void CoreWorker::PromoteToPlasmaAndGetOwnershipInfo(const ObjectID &object_id, - rpc::Address *owner_address) { - auto value = memory_store_->GetOrPromoteToPlasma(object_id); - if (value) { - RAY_LOG(DEBUG) << "Storing object promoted to plasma " << object_id; - RAY_CHECK_OK( - Put(*value, /*contained_object_ids=*/{}, object_id, /*pin_object=*/true)); - } - +void CoreWorker::GetOwnershipInfo(const ObjectID &object_id, + rpc::Address *owner_address) { auto has_owner = reference_counter_->GetOwner(object_id, owner_address); RAY_CHECK(has_owner) << "Object IDs generated randomly (ObjectID.from_random()) or out-of-band " @@ -773,7 +793,11 @@ Status CoreWorker::Put(const RayObject &object, const std::vector &contained_object_ids, const ObjectID &object_id, bool pin_object) { bool object_exists; - if (options_.is_local_mode) { + if (options_.is_local_mode || + (RayConfig::instance().put_small_object_in_memory_store() && + static_cast(object.GetSize()) < + RayConfig::instance().max_direct_call_object_size())) { + RAY_LOG(DEBUG) << "Put " << object_id << " in memory store"; RAY_CHECK(memory_store_->Put(object, object_id)); return Status::OK(); } @@ -805,7 +829,10 @@ Status CoreWorker::Create(const std::shared_ptr &metadata, const size_t ObjectID *object_id, std::shared_ptr *data) { *object_id = ObjectID::ForPut(worker_context_.GetCurrentTaskID(), worker_context_.GetNextPutIndex()); - if (options_.is_local_mode) { + if (options_.is_local_mode || + (RayConfig::instance().put_small_object_in_memory_store() && + static_cast(data_size) < + RayConfig::instance().max_direct_call_object_size())) { *data = std::make_shared(data_size); } else { RAY_RETURN_NOT_OK( diff --git a/src/ray/core_worker/core_worker.h b/src/ray/core_worker/core_worker.h index c5d9f51f9..65db7724c 100644 --- a/src/ray/core_worker/core_worker.h +++ b/src/ray/core_worker/core_worker.h @@ -338,6 +338,22 @@ class CoreWorker : public rpc::CoreWorkerServiceHandler { /// (local, submitted_task) reference counts. For debugging purposes. std::unordered_map> GetAllReferenceCounts() const; + /// Put an object into plasma. It's a version of Put that directly put the + /// object into plasma and also pin the object. + /// + /// \param[in] The ray object. + /// \param[in] object_id The object ID to serialize. + /// appended to the serialized object ID. + void PutObjectIntoPlasma(const RayObject &object, const ObjectID &object_id); + + /// Promote an object to plasma. If the + /// object already exists locally, it will be put into the plasma store. If + /// it doesn't yet exist, it will be spilled to plasma once available. + /// + /// \param[in] object_id The object ID to serialize. + /// appended to the serialized object ID. + void PromoteObjectToPlasma(const ObjectID &object_id); + /// Get the RPC address of this worker. /// /// \param[out] The RPC address of this worker. @@ -351,11 +367,9 @@ class CoreWorker : public rpc::CoreWorkerServiceHandler { /// \param[out] The RPC address of the worker that owns this object. rpc::Address GetOwnerAddress(const ObjectID &object_id) const; - /// Promote an object to plasma and get its owner information. This should be + /// Get the owner information of an object. This should be /// called when serializing an object ID, and the returned information should - /// be stored with the serialized object ID. For plasma promotion, if the - /// object already exists locally, it will be put into the plasma store. If - /// it doesn't yet exist, it will be spilled to plasma once available. + /// be stored with the serialized object ID. /// /// This can only be called on object IDs that we created via task /// submission, ray.put, or object IDs that we deserialized. It cannot be @@ -368,8 +382,7 @@ class CoreWorker : public rpc::CoreWorkerServiceHandler { /// appended to the serialized object ID. /// \param[out] owner_address The address of the object's owner. This should /// be appended to the serialized object ID. - void PromoteToPlasmaAndGetOwnershipInfo(const ObjectID &object_id, - rpc::Address *owner_address); + void GetOwnershipInfo(const ObjectID &object_id, rpc::Address *owner_address); /// Add a reference to an ObjectID that was deserialized by the language /// frontend. This will also start the process to resolve the future. diff --git a/src/ray/core_worker/store_provider/plasma_store_provider.cc b/src/ray/core_worker/store_provider/plasma_store_provider.cc index 616f8ed54..4adb6d3f3 100644 --- a/src/ray/core_worker/store_provider/plasma_store_provider.cc +++ b/src/ray/core_worker/store_provider/plasma_store_provider.cc @@ -40,6 +40,7 @@ CoreWorkerPlasmaStoreProvider::CoreWorkerPlasmaStoreProvider( } buffer_tracker_ = std::make_shared(); RAY_CHECK_OK(store_client_.Connect(store_socket)); + RAY_CHECK_OK(WarmupStore()); } CoreWorkerPlasmaStoreProvider::~CoreWorkerPlasmaStoreProvider() { @@ -419,4 +420,14 @@ void CoreWorkerPlasmaStoreProvider::WarnIfAttemptedTooManyTimes( } } +Status CoreWorkerPlasmaStoreProvider::WarmupStore() { + ObjectID object_id = ObjectID::FromRandom(); + std::shared_ptr data; + RAY_RETURN_NOT_OK(Create(nullptr, 8, object_id, &data)); + RAY_RETURN_NOT_OK(Seal(object_id)); + RAY_RETURN_NOT_OK(Release(object_id)); + RAY_RETURN_NOT_OK(Delete({object_id}, false, false)); + return Status::OK(); +} + } // namespace ray diff --git a/src/ray/core_worker/store_provider/plasma_store_provider.h b/src/ray/core_worker/store_provider/plasma_store_provider.h index 1ebc1f5b3..47fb54144 100644 --- a/src/ray/core_worker/store_provider/plasma_store_provider.h +++ b/src/ray/core_worker/store_provider/plasma_store_provider.h @@ -138,6 +138,12 @@ class CoreWorkerPlasmaStoreProvider { static void WarnIfAttemptedTooManyTimes(int num_attempts, const absl::flat_hash_set &remaining); + /// Put something in the plasma store so that subsequent plasma store accesses + /// will be faster. Currently the first access is always slow, and we don't + /// want the user to experience this. + /// \return status + Status WarmupStore(); + const std::shared_ptr raylet_client_; plasma::PlasmaClient store_client_; /// Used to look up a plasma object's owner. diff --git a/src/ray/core_worker/test/core_worker_test.cc b/src/ray/core_worker/test/core_worker_test.cc index f7a66476a..18ecec934 100644 --- a/src/ray/core_worker/test/core_worker_test.cc +++ b/src/ray/core_worker/test/core_worker_test.cc @@ -774,14 +774,15 @@ TEST_F(SingleNodeTest, TestObjectInterface) { auto &core_worker = CoreWorkerProcess::GetCoreWorker(); uint8_t array1[] = {1, 2, 3, 4, 5, 6, 7, 8}; - uint8_t array2[] = {10, 11, 12, 13, 14, 15}; + const size_t array2_size = 200 * 1024; + uint8_t *array2 = new uint8_t[array2_size]; std::vector buffers; buffers.emplace_back(std::make_shared(array1, sizeof(array1)), std::make_shared(array1, sizeof(array1) / 2), std::vector()); - buffers.emplace_back(std::make_shared(array2, sizeof(array2)), - std::make_shared(array2, sizeof(array2) / 2), + buffers.emplace_back(std::make_shared(array2, array2_size), + std::make_shared(array2, array2_size / 2), std::vector()); std::vector ids(buffers.size()); @@ -822,6 +823,8 @@ TEST_F(SingleNodeTest, TestObjectInterface) { // to process the command. usleep(200 * 1000); ASSERT_TRUE(core_worker.Get(ids, 0, &results).IsTimedOut()); + // Since array2 has been deleted from the plasma store, the Get should + // timeout and return nullptr for all results. ASSERT_EQ(results.size(), 2); ASSERT_TRUE(!results[0]); ASSERT_TRUE(!results[1]);