diff --git a/python/ray/_raylet.pyx b/python/ray/_raylet.pyx index 2a67364bf..4c0535e29 100644 --- a/python/ray/_raylet.pyx +++ b/python/ray/_raylet.pyx @@ -657,7 +657,7 @@ cdef class CoreWorker: raylet_socket.encode("ascii"), job_id.native(), gcs_options.native()[0], log_dir.encode("utf-8"), node_ip_address.encode("utf-8"), task_execution_handler, - check_signals, False)) + check_signals)) def disconnect(self): with nogil: diff --git a/python/ray/includes/libcoreworker.pxd b/python/ray/includes/libcoreworker.pxd index 505bf1982..9b513ab36 100644 --- a/python/ray/includes/libcoreworker.pxd +++ b/python/ray/includes/libcoreworker.pxd @@ -84,8 +84,7 @@ cdef extern from "ray/core_worker/core_worker.h" nogil: const c_vector[CObjectID] &arg_reference_ids, const c_vector[CObjectID] &return_ids, c_vector[shared_ptr[CRayObject]] *returns) nogil, - CRayStatus() nogil, - c_bool use_memory_store_) + CRayStatus() nogil) void Disconnect() CWorkerType &GetWorkerType() CLanguage &GetLanguage() diff --git a/python/ray/includes/unique_ids.pxd b/python/ray/includes/unique_ids.pxd index 29fd7932d..764c86e10 100644 --- a/python/ray/includes/unique_ids.pxd +++ b/python/ray/includes/unique_ids.pxd @@ -4,9 +4,6 @@ from libc.stdint cimport uint8_t, uint32_t, int64_t cdef extern from "ray/common/id.h" namespace "ray" nogil: cdef cppclass CBaseID[T]: - @staticmethod - T from_random() - @staticmethod T FromBinary(const c_string &binary) @@ -32,7 +29,7 @@ cdef extern from "ray/common/id.h" namespace "ray" nogil: size_t Size() @staticmethod - CUniqueID from_random() + CUniqueID FromRandom() @staticmethod CUniqueID FromBinary(const c_string &binary) @@ -133,6 +130,9 @@ cdef extern from "ray/common/id.h" namespace "ray" nogil: @staticmethod CObjectID FromBinary(const c_string &binary) + @staticmethod + CObjectID FromRandom() + @staticmethod const CObjectID Nil() diff --git a/python/ray/includes/unique_ids.pxi b/python/ray/includes/unique_ids.pxi index f18ab5352..0f98709d7 100644 --- 
a/python/ray/includes/unique_ids.pxi +++ b/python/ray/includes/unique_ids.pxi @@ -110,7 +110,7 @@ cdef class UniqueID(BaseID): @classmethod def from_random(cls): - return cls(os.urandom(CUniqueID.Size())) + return cls(CUniqueID.FromRandom().Binary()) def size(self): return CUniqueID.Size() @@ -197,7 +197,7 @@ cdef class ObjectID(BaseID): @classmethod def from_random(cls): - return cls(os.urandom(CObjectID.Size())) + return cls(CObjectID.FromRandom().Binary()) cdef class TaskID(BaseID): diff --git a/python/ray/tests/test_basic.py b/python/ray/tests/test_basic.py index e19927c3f..c09942393 100644 --- a/python/ray/tests/test_basic.py +++ b/python/ray/tests/test_basic.py @@ -9,7 +9,6 @@ import glob import io import json import logging -from multiprocessing import Process import os import random import re @@ -2998,27 +2997,6 @@ def test_object_id_properties(): id_dumps = pickle.dumps(object_id) id_from_dumps = pickle.loads(id_dumps) assert id_from_dumps == object_id - file_prefix = "test_object_id_properties" - - # Make sure the ids are fork safe. 
- def write(index): - str = ray.ObjectID.from_random().hex() - with open("{}{}".format(file_prefix, index), "w") as fo: - fo.write(str) - - def read(index): - with open("{}{}".format(file_prefix, index), "r") as fi: - for line in fi: - return line - - processes = [Process(target=write, args=(_, )) for _ in range(4)] - for process in processes: - process.start() - for process in processes: - process.join() - hexes = {read(i) for i in range(4)} - [os.remove("{}{}".format(file_prefix, i)) for i in range(4)] - assert len(hexes) == 4 @pytest.fixture diff --git a/python/ray/worker.py b/python/ray/worker.py index bc7b3edfc..fa8ed1df4 100644 --- a/python/ray/worker.py +++ b/python/ray/worker.py @@ -11,7 +11,6 @@ import inspect import io import json import logging -import numpy as np import os import redis import signal @@ -1491,7 +1490,7 @@ def connect(node, # Put something in the plasma store so that subsequent plasma store # accesses will be faster. Currently the first access is always slow, and # we don't want the user to experience this. - temporary_object_id = ray.ObjectID(np.random.bytes(20)) + temporary_object_id = ray.ObjectID.from_random() worker.put_object(1, object_id=temporary_object_id) ray.internal.free([temporary_object_id]) diff --git a/src/ray/common/id.h b/src/ray/common/id.h index 0c611f405..bc82b206d 100644 --- a/src/ray/common/id.h +++ b/src/ray/common/id.h @@ -56,6 +56,7 @@ template class BaseID { public: BaseID(); + // Warning: this can duplicate IDs after a fork() call. We assume this never happens. static T FromRandom(); static T FromBinary(const std::string &binary); static const T &Nil(); @@ -102,6 +103,7 @@ class JobID : public BaseID { static size_t Size() { return kLength; } + // Warning: this can duplicate IDs after a fork() call. We assume this never happens. static JobID FromRandom() = delete; JobID() : BaseID() {} @@ -140,6 +142,7 @@ class ActorID : public BaseID { /// \return The `ActorID` with unique bytes being nil. 
static ActorID NilFromJob(const JobID &job_id); + // Warning: this can duplicate IDs after a fork() call. We assume this never happens. static ActorID FromRandom() = delete; /// Constructor of `ActorID`. @@ -167,6 +170,7 @@ class TaskID : public BaseID { static TaskID ComputeDriverTaskId(const WorkerID &driver_id); + // Warning: this can duplicate IDs after a fork() call. We assume this never happens. static TaskID FromRandom() = delete; /// The ID generated for driver task. @@ -310,6 +314,9 @@ class ObjectID : public BaseID { /// Create an object id randomly. /// + /// Warning: this can duplicate IDs after a fork() call. We assume this + /// never happens. + /// /// \param transport_type Which type of the transport that is used to /// transfer this object. /// diff --git a/src/ray/core_worker/core_worker.cc b/src/ray/core_worker/core_worker.cc index 894118d6c..951f9048d 100644 --- a/src/ray/core_worker/core_worker.cc +++ b/src/ray/core_worker/core_worker.cc @@ -46,7 +46,7 @@ CoreWorker::CoreWorker(const WorkerType worker_type, const Language language, const JobID &job_id, const gcs::GcsClientOptions &gcs_options, const std::string &log_dir, const std::string &node_ip_address, const TaskExecutionCallback &task_execution_callback, - std::function check_signals, bool use_memory_store) + std::function check_signals) : worker_type_(worker_type), language_(language), log_dir_(log_dir), @@ -55,8 +55,7 @@ CoreWorker::CoreWorker(const WorkerType worker_type, const Language language, heartbeat_timer_(io_service_), worker_server_(WorkerTypeString(worker_type), 0 /* let grpc choose a port */), gcs_client_(gcs_options), - object_interface_(worker_context_, raylet_client_, store_socket, use_memory_store, - check_signals), + object_interface_(worker_context_, raylet_client_, store_socket, check_signals), task_execution_service_work_(task_execution_service_), task_execution_callback_(task_execution_callback) { // Initialize logging if log_dir is passed. 
Otherwise, it must be initialized diff --git a/src/ray/core_worker/core_worker.h b/src/ray/core_worker/core_worker.h index a8a2b8715..f3ba51011 100644 --- a/src/ray/core_worker/core_worker.h +++ b/src/ray/core_worker/core_worker.h @@ -46,20 +46,14 @@ class CoreWorker { /// \parma[in] check_signals Language worker function to check for signals and handle /// them. If the function returns anything but StatusOK, any long-running /// operations in the core worker will short circuit and return that status. - /// \param[in] use_memory_store Whether or not to use the in-memory object store - /// in addition to the plasma store. /// /// NOTE(zhijunfu): the constructor would throw if a failure happens. - /// NOTE(edoakes): the use_memory_store flag is a stop-gap solution to the issue - /// that randomly generated ObjectIDs may use the memory store - /// instead of the plasma store. CoreWorker(const WorkerType worker_type, const Language language, const std::string &store_socket, const std::string &raylet_socket, const JobID &job_id, const gcs::GcsClientOptions &gcs_options, const std::string &log_dir, const std::string &node_ip_address, const TaskExecutionCallback &task_execution_callback, - std::function check_signals = nullptr, - bool use_memory_store = true); + std::function check_signals = nullptr); ~CoreWorker(); diff --git a/src/ray/core_worker/object_interface.cc b/src/ray/core_worker/object_interface.cc index e5582442d..43be90e11 100644 --- a/src/ray/core_worker/object_interface.cc +++ b/src/ray/core_worker/object_interface.cc @@ -25,7 +25,7 @@ void CoreWorkerObjectInterface::GroupObjectIdsByStoreProvider( // and are only used locally. // Thus we need to check whether this object is a task return object in additional // to whether it's from direct actor call before we can choose memory store provider. 
- if (use_memory_store_ && object_id.IsReturnObject() && + if (object_id.IsReturnObject() && object_id.GetTransportType() == static_cast(TaskTransportType::DIRECT_ACTOR)) { type = StoreProviderType::MEMORY; @@ -37,12 +37,10 @@ void CoreWorkerObjectInterface::GroupObjectIdsByStoreProvider( CoreWorkerObjectInterface::CoreWorkerObjectInterface( WorkerContext &worker_context, std::unique_ptr &raylet_client, - const std::string &store_socket, bool use_memory_store, - std::function check_signals) + const std::string &store_socket, std::function check_signals) : worker_context_(worker_context), raylet_client_(raylet_client), store_socket_(store_socket), - use_memory_store_(use_memory_store), memory_store_(std::make_shared()) { check_signals_ = check_signals; AddStoreProvider(StoreProviderType::PLASMA); diff --git a/src/ray/core_worker/object_interface.h b/src/ray/core_worker/object_interface.h index da5900810..48ae5de62 100644 --- a/src/ray/core_worker/object_interface.h +++ b/src/ray/core_worker/object_interface.h @@ -20,11 +20,9 @@ class CoreWorkerObjectInterface { public: /// \param[in] worker_context WorkerContext of the parent CoreWorker. /// \param[in] store_socket Path to the plasma store socket. - /// \param[in] use_memory_store Whether or not to use the in-memory object store - /// in addition to the plasma store. CoreWorkerObjectInterface(WorkerContext &worker_context, std::unique_ptr &raylet_client, - const std::string &store_socket, bool use_memory_store = true, + const std::string &store_socket, std::function check_signals = nullptr); /// Set options for this client's interactions with the object store. @@ -164,7 +162,6 @@ class CoreWorkerObjectInterface { std::unique_ptr &raylet_client_; std::string store_socket_; - bool use_memory_store_; /// In-memory store for return objects. This is used for `MEMORY` store provider. 
std::shared_ptr memory_store_; diff --git a/src/ray/util/util.h b/src/ray/util/util.h index 1b10ffa8d..4e4f71ccd 100644 --- a/src/ray/util/util.h +++ b/src/ray/util/util.h @@ -89,6 +89,7 @@ template using EnumUnorderedMap = std::unordered_map; /// A helper function to fill random bytes into the `data`. +/// Warning: this is not fork-safe; we need to re-seed after a fork() call. template void FillRandom(T *data) { RAY_CHECK(data != nullptr);