[Core] put small objects in memory store (#8972)

* remove the put in memory store

* put small objects directly in memory store

* cast data type

* fix another place that uses Put to spill to plasma store

* fix multiple tests related to memory limits

* partially fix test_metrics

* remove non-functioning code

* fix core_worker_test

* refactor put-to-plasma code

* add a flag for the new feature

* add flag to more places

* do a warmup round for the plasma store

* lint

* lint again

* fix warmup store

* Update _raylet.pyx

Co-authored-by: Eric Liang <ekhliang@gmail.com>
This commit is contained in:
Zhuohan Li
2020-07-09 15:39:40 -07:00
committed by GitHub
parent 34b85659d4
commit 8a76f4cbb5
18 changed files with 132 additions and 51 deletions
@@ -102,6 +102,8 @@ ray {
// See src/ray/ray_config_def.h for options.
config {
num_workers_per_process_java: 10
// TODO(zhuohan): enable this for java
put_small_object_in_memory_store: false
}
}
+9 -2
View File
@@ -774,9 +774,14 @@ cdef class CoreWorker:
CObjectID c_object_id
shared_ptr[CBuffer] data
shared_ptr[CBuffer] metadata
int64_t put_threshold
c_bool put_small_object_in_memory_store
c_vector[CObjectID] c_object_id_vector
metadata = string_to_buffer(serialized_object.metadata)
put_threshold = RayConfig.instance().max_direct_call_object_size()
put_small_object_in_memory_store = (
RayConfig.instance().put_small_object_in_memory_store())
total_bytes = serialized_object.total_bytes
object_already_exists = self._create_put_buffer(
metadata, total_bytes, object_id,
@@ -787,7 +792,8 @@ cdef class CoreWorker:
if total_bytes > 0:
(<SerializedObject>serialized_object).write_to(
Buffer.make(data))
if self.is_local_mode:
if self.is_local_mode or (put_small_object_in_memory_store
and <int64_t>total_bytes < put_threshold):
c_object_id_vector.push_back(c_object_id)
check_status(CCoreWorkerProcess.GetCoreWorker().Put(
CRayObject(data, metadata, c_object_id_vector),
@@ -1103,7 +1109,8 @@ cdef class CoreWorker:
cdef:
CObjectID c_object_id = object_id.native()
CAddress c_owner_address = CAddress()
CCoreWorkerProcess.GetCoreWorker().PromoteToPlasmaAndGetOwnershipInfo(
CCoreWorkerProcess.GetCoreWorker().PromoteObjectToPlasma(c_object_id)
CCoreWorkerProcess.GetCoreWorker().GetOwnershipInfo(
c_object_id, &c_owner_address)
return (object_id,
c_owner_address.SerializeAsString())
+4 -2
View File
@@ -127,11 +127,13 @@ cdef extern from "ray/core_worker/core_worker.h" nogil:
const CActorHandle* GetNamedActorHandle(const c_string &name)
void AddLocalReference(const CObjectID &object_id)
void RemoveLocalReference(const CObjectID &object_id)
void PutObjectIntoPlasma(const CRayObject &object,
const CObjectID &object_id)
const CAddress &GetRpcAddress() const
CAddress GetOwnerAddress(const CObjectID &object_id) const
void PromoteObjectToPlasma(const CObjectID &object_id)
void PromoteToPlasmaAndGetOwnershipInfo(const CObjectID &object_id,
CAddress *owner_address)
void GetOwnershipInfo(const CObjectID &object_id,
CAddress *owner_address)
void RegisterOwnershipInfoAndResolveFuture(
const CObjectID &object_id,
const CObjectID &outer_object_id,
+2
View File
@@ -88,3 +88,5 @@ cdef extern from "ray/common/ray_config.h" nogil:
int64_t max_direct_call_object_size() const
c_bool gcs_actor_service_enabled() const
c_bool put_small_object_in_memory_store() const
+4
View File
@@ -157,3 +157,7 @@ cdef class Config:
@staticmethod
def maximum_gcs_deletion_batch_size():
    """Return RayConfig's `maximum_gcs_deletion_batch_size` setting."""
    batch_size = RayConfig.instance().maximum_gcs_deletion_batch_size()
    return batch_size
@staticmethod
def put_small_object_in_memory_store():
    """Return RayConfig's `put_small_object_in_memory_store` flag."""
    enabled = RayConfig.instance().put_small_object_in_memory_store()
    return enabled
+6 -5
View File
@@ -484,15 +484,16 @@ def test_shutdown_disconnect_global_state():
@pytest.mark.parametrize(
"ray_start_object_store_memory", [150 * 1024 * 1024], indirect=True)
def test_put_pins_object(ray_start_object_store_memory):
x_id = ray.put("HI")
obj = np.ones(200 * 1024, dtype=np.uint8)
x_id = ray.put(obj)
x_binary = x_id.binary()
assert ray.get(ray.ObjectID(x_binary)) == "HI"
assert (ray.get(ray.ObjectID(x_binary)) == obj).all()
# x cannot be evicted since x_id pins it
for _ in range(10):
ray.put(np.zeros(10 * 1024 * 1024))
assert ray.get(x_id) == "HI"
assert ray.get(ray.ObjectID(x_binary)) == "HI"
assert (ray.get(x_id) == obj).all()
assert (ray.get(ray.ObjectID(x_binary)) == obj).all()
# once x_id is deleted the object can be evicted, since x_binary alone does not pin it
del x_id
@@ -502,7 +503,7 @@ def test_put_pins_object(ray_start_object_store_memory):
ray.ObjectID(x_binary))
# weakref put
y_id = ray.put("HI", weakref=True)
y_id = ray.put(obj, weakref=True)
for _ in range(10):
ray.put(np.zeros(10 * 1024 * 1024))
with pytest.raises(ray.exceptions.UnreconstructableError):
+9 -4
View File
@@ -2,6 +2,7 @@ import os
import signal
import sys
import time
import numpy as np
import pytest
@@ -54,7 +55,8 @@ def test_dying_worker_get(ray_start_2_cpus):
assert len(ready_ids) == 0
# Seal the object so the store attempts to notify the worker that the
# get has been fulfilled.
ray.worker.global_worker.put_object(1, x_id)
obj = np.ones(200 * 1024, dtype=np.uint8)
ray.worker.global_worker.put_object(obj, x_id)
time.sleep(0.1)
# Make sure that nothing has died.
@@ -97,7 +99,8 @@ ray.get(ray.ObjectID(ray.utils.hex_to_binary("{}")))
assert len(ready_ids) == 0
# Seal the object so the store attempts to notify the worker that the
# get has been fulfilled.
ray.worker.global_worker.put_object(1, x_id)
obj = np.ones(200 * 1024, dtype=np.uint8)
ray.worker.global_worker.put_object(obj, x_id)
time.sleep(0.1)
# Make sure that nothing has died.
@@ -137,7 +140,8 @@ def test_dying_worker_wait(ray_start_2_cpus):
time.sleep(0.1)
# Create the object.
ray.worker.global_worker.put_object(1, x_id)
obj = np.ones(200 * 1024, dtype=np.uint8)
ray.worker.global_worker.put_object(obj, x_id)
time.sleep(0.1)
# Make sure that nothing has died.
@@ -180,7 +184,8 @@ ray.wait([ray.ObjectID(ray.utils.hex_to_binary("{}"))])
assert len(ready_ids) == 0
# Seal the object so the store attempts to notify the worker that the
# wait can return.
ray.worker.global_worker.put_object(1, x_id)
obj = np.ones(200 * 1024, dtype=np.uint8)
ray.worker.global_worker.put_object(obj, x_id)
time.sleep(0.1)
# Make sure that nothing has died.
+1 -1
View File
@@ -819,7 +819,7 @@ def test_raylet_crash_when_get(ray_start_regular):
time.sleep(2)
ray.worker._global_node.kill_raylet()
object_id = ray.put(None)
object_id = ray.put(np.zeros(200 * 1024, dtype=np.uint8))
ray.internal.free(object_id)
while ray.worker.global_worker.core_worker.object_exists(object_id):
time.sleep(1)
+2 -1
View File
@@ -62,7 +62,8 @@ class TestMemoryLimits(unittest.TestCase):
num_cpus=1,
object_store_memory=300 * MB,
driver_object_store_memory=driver_quota)
z = ray.put("hi", weakref=True)
obj = np.ones(200 * 1024, dtype=np.uint8)
z = ray.put(obj, weakref=True)
a = LightActor._remote(object_store_memory=a_quota)
b = GreedyActor._remote(object_store_memory=b_quota)
for _ in range(5):
+4 -4
View File
@@ -4,6 +4,7 @@ import grpc
import pytest
import requests
import time
import numpy as np
import ray
from ray.core.generated import node_manager_pb2
@@ -180,7 +181,7 @@ def test_raylet_info_endpoint(shutdown_only):
self.local_storage = [f.remote() for _ in range(10)]
def remote_store(self):
self.remote_storage = ray.put("test")
self.remote_storage = ray.put(np.zeros(200 * 1024, dtype=np.uint8))
def getpid(self):
return os.getpid()
@@ -443,9 +444,8 @@ def test_memory_dashboard(shutdown_only):
return True
def test_object_pineed_in_memory():
import numpy as np
a = ray.put(np.zeros(1))
a = ray.put(np.zeros(200 * 1024, dtype=np.uint8))
b = ray.get(a) # Noqa F841
del a
@@ -469,7 +469,7 @@ def test_memory_dashboard(shutdown_only):
def f(arg):
time.sleep(1)
a = ray.put(None) # Noqa F841
a = ray.put(np.zeros(200 * 1024, dtype=np.uint8)) # Noqa F841
b = f.remote(a) # Noqa F841
wait_for_condition(memory_table_ready)
+3 -1
View File
@@ -174,8 +174,10 @@ def test_cleanup_on_driver_exit(call_ray_start):
driver_script = """
import time
import ray
import numpy as np
ray.init(address="{}")
object_ids = [ray.put(i) for i in range(1000)]
object_ids = [ray.put(np.zeros(200 * 1024, dtype=np.uint8))
for i in range(1000)]
start_time = time.time()
while time.time() - start_time < 30:
if len(ray.objects()) == 1000:
-8
View File
@@ -1286,14 +1286,6 @@ def connect(node,
worker.core_worker.set_object_store_client_options(
"ray_driver_{}".format(os.getpid()), driver_object_store_memory)
# Put something in the plasma store so that subsequent plasma store
# accesses will be faster. Currently the first access is always slow, and
# we don't want the user to experience this.
if mode != LOCAL_MODE:
temporary_object_id = ray.ObjectID.from_random()
worker.put_object(1, object_id=temporary_object_id)
ray.internal.free([temporary_object_id])
# Start the import thread
worker.import_thread = import_thread.ImportThread(worker, mode,
worker.threads_stopped)
+3
View File
@@ -315,3 +315,6 @@ RAY_CONFIG(int64_t, gcs_service_address_check_interval_milliseconds, 1000)
RAY_CONFIG(bool, gcs_actor_service_enabled,
getenv("RAY_GCS_ACTOR_SERVICE_ENABLED") != nullptr &&
getenv("RAY_GCS_ACTOR_SERVICE_ENABLED") == std::string("true"))
/// Whether to put objects smaller than max_direct_call_object_size directly
/// into the in-memory store instead of the plasma store.
RAY_CONFIG(bool, put_small_object_in_memory_store, false)
+41 -14
View File
@@ -360,9 +360,9 @@ CoreWorker::CoreWorker(const CoreWorkerOptions &options, const WorkerID &worker_
boost::bind(&CoreWorker::TriggerGlobalGC, this),
boost::bind(&CoreWorker::CurrentCallSite, this)));
memory_store_.reset(new CoreWorkerMemoryStore(
[this](const RayObject &obj, const ObjectID &obj_id) {
RAY_LOG(DEBUG) << "Promoting object to plasma " << obj_id;
RAY_CHECK_OK(Put(obj, /*contained_object_ids=*/{}, obj_id, /*pin_object=*/true));
[this](const RayObject &object, const ObjectID &object_id) {
PutObjectIntoPlasma(object, object_id);
return Status::OK();
},
options_.ref_counting_enabled ? reference_counter_ : nullptr, local_raylet_client_,
options_.check_signals));
@@ -708,6 +708,33 @@ CoreWorker::GetAllReferenceCounts() const {
return counts;
}
void CoreWorker::PutObjectIntoPlasma(const RayObject &object, const ObjectID &object_id) {
bool object_exists;
RAY_CHECK_OK(plasma_store_provider_->Put(object, object_id, &object_exists));
if (!object_exists) {
// Tell the raylet to pin the object **after** it is created.
RAY_LOG(DEBUG) << "Pinning put object " << object_id;
RAY_CHECK_OK(local_raylet_client_->PinObjectIDs(
rpc_address_, {object_id},
[this, object_id](const Status &status, const rpc::PinObjectIDsReply &reply) {
// Only release the object once the raylet has responded to avoid the race
// condition that the object could be evicted before the raylet pins it.
if (!plasma_store_provider_->Release(object_id).ok()) {
RAY_LOG(ERROR) << "Failed to release ObjectID (" << object_id
<< "), might cause a leak in plasma.";
}
}));
}
RAY_CHECK(memory_store_->Put(RayObject(rpc::ErrorType::OBJECT_IN_PLASMA), object_id));
}
void CoreWorker::PromoteObjectToPlasma(const ObjectID &object_id) {
  // Ask the memory store for the object's value, if it currently has one.
  const auto local_value = memory_store_->GetOrPromoteToPlasma(object_id);
  if (!local_value) {
    // No value was returned, so there is nothing to copy into plasma here.
    return;
  }
  PutObjectIntoPlasma(*local_value, object_id);
}
const rpc::Address &CoreWorker::GetRpcAddress() const {
  // Returned by reference: callers see the worker's stored address directly.
  return rpc_address_;
}
rpc::Address CoreWorker::GetOwnerAddress(const ObjectID &object_id) const {
@@ -722,15 +749,8 @@ rpc::Address CoreWorker::GetOwnerAddress(const ObjectID &object_id) const {
return owner_address;
}
void CoreWorker::PromoteToPlasmaAndGetOwnershipInfo(const ObjectID &object_id,
rpc::Address *owner_address) {
auto value = memory_store_->GetOrPromoteToPlasma(object_id);
if (value) {
RAY_LOG(DEBUG) << "Storing object promoted to plasma " << object_id;
RAY_CHECK_OK(
Put(*value, /*contained_object_ids=*/{}, object_id, /*pin_object=*/true));
}
void CoreWorker::GetOwnershipInfo(const ObjectID &object_id,
rpc::Address *owner_address) {
auto has_owner = reference_counter_->GetOwner(object_id, owner_address);
RAY_CHECK(has_owner)
<< "Object IDs generated randomly (ObjectID.from_random()) or out-of-band "
@@ -773,7 +793,11 @@ Status CoreWorker::Put(const RayObject &object,
const std::vector<ObjectID> &contained_object_ids,
const ObjectID &object_id, bool pin_object) {
bool object_exists;
if (options_.is_local_mode) {
if (options_.is_local_mode ||
(RayConfig::instance().put_small_object_in_memory_store() &&
static_cast<int64_t>(object.GetSize()) <
RayConfig::instance().max_direct_call_object_size())) {
RAY_LOG(DEBUG) << "Put " << object_id << " in memory store";
RAY_CHECK(memory_store_->Put(object, object_id));
return Status::OK();
}
@@ -805,7 +829,10 @@ Status CoreWorker::Create(const std::shared_ptr<Buffer> &metadata, const size_t
ObjectID *object_id, std::shared_ptr<Buffer> *data) {
*object_id = ObjectID::ForPut(worker_context_.GetCurrentTaskID(),
worker_context_.GetNextPutIndex());
if (options_.is_local_mode) {
if (options_.is_local_mode ||
(RayConfig::instance().put_small_object_in_memory_store() &&
static_cast<int64_t>(data_size) <
RayConfig::instance().max_direct_call_object_size())) {
*data = std::make_shared<LocalMemoryBuffer>(data_size);
} else {
RAY_RETURN_NOT_OK(
+19 -6
View File
@@ -338,6 +338,22 @@ class CoreWorker : public rpc::CoreWorkerServiceHandler {
/// (local, submitted_task) reference counts. For debugging purposes.
std::unordered_map<ObjectID, std::pair<size_t, size_t>> GetAllReferenceCounts() const;
/// Put an object into plasma. It's a version of Put that always writes the
/// object to the plasma store. If the object did not already exist there,
/// the local raylet is also asked to pin it. An OBJECT_IN_PLASMA marker is
/// then recorded for the ID in the in-memory store.
///
/// \param[in] object The ray object.
/// \param[in] object_id The ID under which the object is stored.
void PutObjectIntoPlasma(const RayObject &object, const ObjectID &object_id);
/// Promote an object to plasma. If the object already exists locally, it
/// will be put into the plasma store. If it doesn't yet exist, it will be
/// spilled to plasma once available.
///
/// \param[in] object_id The ID of the object to promote.
void PromoteObjectToPlasma(const ObjectID &object_id);
/// Get the RPC address of this worker.
///
/// \param[out] The RPC address of this worker.
@@ -351,11 +367,9 @@ class CoreWorker : public rpc::CoreWorkerServiceHandler {
/// \param[out] The RPC address of the worker that owns this object.
rpc::Address GetOwnerAddress(const ObjectID &object_id) const;
/// Promote an object to plasma and get its owner information. This should be
/// Get the owner information of an object. This should be
/// called when serializing an object ID, and the returned information should
/// be stored with the serialized object ID. For plasma promotion, if the
/// object already exists locally, it will be put into the plasma store. If
/// it doesn't yet exist, it will be spilled to plasma once available.
/// be stored with the serialized object ID.
///
/// This can only be called on object IDs that we created via task
/// submission, ray.put, or object IDs that we deserialized. It cannot be
@@ -368,8 +382,7 @@ class CoreWorker : public rpc::CoreWorkerServiceHandler {
/// appended to the serialized object ID.
/// \param[out] owner_address The address of the object's owner. This should
/// be appended to the serialized object ID.
void PromoteToPlasmaAndGetOwnershipInfo(const ObjectID &object_id,
rpc::Address *owner_address);
void GetOwnershipInfo(const ObjectID &object_id, rpc::Address *owner_address);
/// Add a reference to an ObjectID that was deserialized by the language
/// frontend. This will also start the process to resolve the future.
@@ -40,6 +40,7 @@ CoreWorkerPlasmaStoreProvider::CoreWorkerPlasmaStoreProvider(
}
buffer_tracker_ = std::make_shared<BufferTracker>();
RAY_CHECK_OK(store_client_.Connect(store_socket));
RAY_CHECK_OK(WarmupStore());
}
CoreWorkerPlasmaStoreProvider::~CoreWorkerPlasmaStoreProvider() {
@@ -419,4 +420,14 @@ void CoreWorkerPlasmaStoreProvider::WarnIfAttemptedTooManyTimes(
}
}
Status CoreWorkerPlasmaStoreProvider::WarmupStore() {
  // Size passed to Create for the throwaway warm-up object.
  constexpr int kWarmupObjectSize = 8;

  // A random ID is used for the temporary object.
  const ObjectID warmup_id = ObjectID::FromRandom();
  std::shared_ptr<Buffer> warmup_buffer;

  // Run one full create -> seal -> release -> delete cycle against the store.
  // Each step returns early with its status if it fails.
  RAY_RETURN_NOT_OK(Create(nullptr, kWarmupObjectSize, warmup_id, &warmup_buffer));
  RAY_RETURN_NOT_OK(Seal(warmup_id));
  RAY_RETURN_NOT_OK(Release(warmup_id));
  RAY_RETURN_NOT_OK(Delete({warmup_id}, false, false));

  return Status::OK();
}
} // namespace ray
@@ -138,6 +138,12 @@ class CoreWorkerPlasmaStoreProvider {
static void WarnIfAttemptedTooManyTimes(int num_attempts,
const absl::flat_hash_set<ObjectID> &remaining);
/// Put something in the plasma store so that subsequent plasma store accesses
/// will be faster. Currently the first access is always slow, and we don't
/// want the user to experience this.
///
/// A small temporary object with a random ID is created, sealed, released
/// and then deleted again.
///
/// \return Status::OK() if every step succeeds; otherwise the status of the
/// step that failed.
Status WarmupStore();
const std::shared_ptr<raylet::RayletClient> raylet_client_;
plasma::PlasmaClient store_client_;
/// Used to look up a plasma object's owner.
+6 -3
View File
@@ -774,14 +774,15 @@ TEST_F(SingleNodeTest, TestObjectInterface) {
auto &core_worker = CoreWorkerProcess::GetCoreWorker();
uint8_t array1[] = {1, 2, 3, 4, 5, 6, 7, 8};
uint8_t array2[] = {10, 11, 12, 13, 14, 15};
const size_t array2_size = 200 * 1024;
uint8_t *array2 = new uint8_t[array2_size];
std::vector<RayObject> buffers;
buffers.emplace_back(std::make_shared<LocalMemoryBuffer>(array1, sizeof(array1)),
std::make_shared<LocalMemoryBuffer>(array1, sizeof(array1) / 2),
std::vector<ObjectID>());
buffers.emplace_back(std::make_shared<LocalMemoryBuffer>(array2, sizeof(array2)),
std::make_shared<LocalMemoryBuffer>(array2, sizeof(array2) / 2),
buffers.emplace_back(std::make_shared<LocalMemoryBuffer>(array2, array2_size),
std::make_shared<LocalMemoryBuffer>(array2, array2_size / 2),
std::vector<ObjectID>());
std::vector<ObjectID> ids(buffers.size());
@@ -822,6 +823,8 @@ TEST_F(SingleNodeTest, TestObjectInterface) {
// to process the command.
usleep(200 * 1000);
ASSERT_TRUE(core_worker.Get(ids, 0, &results).IsTimedOut());
// Since array2 has been deleted from the plasma store, the Get should
// timeout and return nullptr for all results.
ASSERT_EQ(results.size(), 2);
ASSERT_TRUE(!results[0]);
ASSERT_TRUE(!results[1]);