diff --git a/python/ray/__init__.py b/python/ray/__init__.py index 03d976751..5e111ef94 100644 --- a/python/ray/__init__.py +++ b/python/ray/__init__.py @@ -69,6 +69,7 @@ from ray._raylet import ( TaskID, UniqueID, Language, + PlacementGroupID, ) # noqa: E402 _config = _Config() @@ -170,4 +171,5 @@ __all__ += [ "ObjectRef", "TaskID", "UniqueID", + "PlacementGroupID", ] diff --git a/python/ray/_raylet.pyx b/python/ray/_raylet.pyx index c70e76c54..3b106f5f2 100644 --- a/python/ray/_raylet.pyx +++ b/python/ray/_raylet.pyx @@ -38,6 +38,7 @@ from libcpp.string cimport string as c_string from libcpp.utility cimport pair from libcpp.unordered_map cimport unordered_map from libcpp.vector cimport vector as c_vector +from libcpp.pair cimport pair as c_pair from cython.operator import dereference, postincrement @@ -52,6 +53,7 @@ from ray.includes.common cimport ( CTaskArgByReference, CTaskArgByValue, CTaskType, + CPlacementStrategy, CRayFunction, LocalMemoryBuffer, move, @@ -64,15 +66,19 @@ from ray.includes.common cimport ( TASK_TYPE_ACTOR_TASK, WORKER_TYPE_WORKER, WORKER_TYPE_DRIVER, + PLACEMENT_STRATEGY_PACK, + PLACEMENT_STRATEGY_SPREAD, ) from ray.includes.unique_ids cimport ( CActorID, CActorCheckpointID, CObjectID, CClientID, + CPlacementGroupID, ) from ray.includes.libcoreworker cimport ( CActorCreationOptions, + CPlacementGroupCreationOptions, CCoreWorkerOptions, CCoreWorkerProcess, CTaskOptions, @@ -907,7 +913,10 @@ cdef class CoreWorker: c_bool is_detached, c_string name, c_bool is_asyncio, - c_string extension_data): + PlacementGroupID placement_group_id, + int64_t placement_group_bundle_index, + c_string extension_data + ): cdef: CRayFunction ray_function c_vector[unique_ptr[CTaskArg]] args_vector @@ -915,6 +924,8 @@ cdef class CoreWorker: unordered_map[c_string, double] c_resources unordered_map[c_string, double] c_placement_resources CActorID c_actor_id + CPlacementGroupID c_placement_group_id = \ + placement_group_id.native() with 
self.profile_event(b"submit_task"): prepare_resources(resources, &c_resources) @@ -929,12 +940,43 @@ cdef class CoreWorker: CActorCreationOptions( max_restarts, max_task_retries, max_concurrency, c_resources, c_placement_resources, - dynamic_worker_options, is_detached, name, is_asyncio), + dynamic_worker_options, is_detached, name, is_asyncio, + c_pair[CPlacementGroupID, int64_t](c_placement_group_id, placement_group_bundle_index)), extension_data, &c_actor_id)) return ActorID(c_actor_id.Binary()) + def create_placement_group( + self, + c_string name, + c_vector[unordered_map[c_string, double]] bundles, + c_string strategy): + cdef: + CPlacementGroupID c_placement_group_id + CPlacementStrategy c_strategy + + if strategy == b"PACK": + c_strategy = PLACEMENT_STRATEGY_PACK + else: + if strategy == b"SPREAD": + c_strategy = PLACEMENT_STRATEGY_SPREAD + else: + raise TypeError(strategy) + + with nogil: + check_status( + CCoreWorkerProcess.GetCoreWorker(). + CreatePlacementGroup( + CPlacementGroupCreationOptions( + name, + c_strategy, + bundles + ), + &c_placement_group_id)) + + return PlacementGroupID(c_placement_group_id.Binary()) + def submit_actor_task(self, Language language, ActorID actor_id, diff --git a/python/ray/actor.py b/python/ray/actor.py index f6b418b71..095bb2fa1 100644 --- a/python/ray/actor.py +++ b/python/ray/actor.py @@ -411,7 +411,9 @@ class ActorClass: max_restarts=None, max_task_retries=None, name=None, - detached=False): + detached=False, + placement_group_id=None, + placement_group_bundle_index=None): """Create an actor. This method allows more flexibility than the remote method because @@ -436,6 +438,10 @@ class ActorClass: guaranteed when max_concurrency > 1. name: The globally unique name for the actor. detached: DEPRECATED. + placement_group_id: the placement group this actor belongs to, + or None if it doesn't belong to any group. + placement_group_bundle_index: the index of the bundle + if the actor belongs to a placement group. 
Returns: A handle to the newly created actor. @@ -446,7 +452,6 @@ class ActorClass: kwargs = {} if is_direct_call is not None and not is_direct_call: raise ValueError("Non-direct call actors are no longer supported.") - meta = self.__ray_metadata__ actor_has_async_methods = len( inspect.getmembers( @@ -498,6 +503,11 @@ class ActorClass: else: detached = False + if placement_group_id is not None and placement_group_bundle_index is \ + None: + raise ValueError("The placement_group_id is set, " + "but placement_group_bundle_index is not set.") + # Set the actor's default resources if not already set. First three # conditions are to check that no resources were specified in the # decorator. Last three conditions are to check that no resources were @@ -568,6 +578,10 @@ class ActorClass: detached, name if name is not None else "", is_asyncio, + placement_group_id + if placement_group_id is not None else ray.PlacementGroupID.nil(), + placement_group_bundle_index + if placement_group_bundle_index is not None else -1, # Store actor_method_cpu in actor handle's extension data. extension_data=str(actor_method_cpu)) diff --git a/python/ray/experimental/__init__.py b/python/ray/experimental/__init__.py index 81e7f995c..64db9e7fc 100644 --- a/python/ray/experimental/__init__.py +++ b/python/ray/experimental/__init__.py @@ -1,8 +1,10 @@ from .api import get, wait from .dynamic_resources import set_resource - +from .placement_group import ( + placement_group, ) __all__ = [ "get", "wait", "set_resource", + "placement_group", ] diff --git a/python/ray/experimental/placement_group.py b/python/ray/experimental/placement_group.py new file mode 100644 index 000000000..105a33672 --- /dev/null +++ b/python/ray/experimental/placement_group.py @@ -0,0 +1,26 @@ +import ray +from typing import (List, Dict) + + +def placement_group(bundles: List[Dict[str, float]], + strategy: str = "PACK", + name: str = None): + """ + Create a placement group. + + This method is the api to create placement group. 
+ + Args: + bundles: A list of bundles which represent the resources needed. + strategy: The strategy to create the placement group. + There are two built-in strategies for the time being. + PACK: Packs Bundles close together inside processes or nodes as + tight as possible. + SPREAD: Places Bundles across distinct nodes or processes as even + as possible. + name: The name of the placement group; None means an empty name. + """ + worker = ray.worker.global_worker + placement_group_id = worker.core_worker.create_placement_group( + name if name is not None else "", bundles, strategy) + return placement_group_id diff --git a/python/ray/includes/common.pxd b/python/ray/includes/common.pxd index 18c3b8d81..0fdf3036f 100644 --- a/python/ray/includes/common.pxd +++ b/python/ray/includes/common.pxd @@ -5,6 +5,7 @@ from libcpp.string cimport string as c_string from libc.stdint cimport uint8_t, int32_t, uint64_t, int64_t from libcpp.unordered_map cimport unordered_map from libcpp.vector cimport vector as c_vector +from libcpp.pair cimport pair as c_pair from ray.includes.unique_ids cimport ( CActorID, @@ -12,6 +13,7 @@ from ray.includes.unique_ids cimport ( CWorkerID, CObjectID, CTaskID, + CPlacementGroupID, ) from ray.includes.function_descriptor cimport ( CFunctionDescriptor, @@ -142,6 +144,8 @@ cdef extern from "src/ray/protobuf/common.pb.h" nogil: pass cdef cppclass CTaskType "ray::TaskType": pass + cdef cppclass CPlacementStrategy "ray::PlacementStrategy": + pass cdef cppclass CAddress "ray::rpc::Address": CAddress() const c_string &SerializeAsString() @@ -164,6 +168,11 @@ cdef extern from "src/ray/protobuf/common.pb.h" nogil: cdef CTaskType TASK_TYPE_ACTOR_CREATION_TASK "ray::TaskType::ACTOR_CREATION_TASK" # noqa: E501 cdef CTaskType TASK_TYPE_ACTOR_TASK "ray::TaskType::ACTOR_TASK" +cdef extern from "src/ray/protobuf/common.pb.h" nogil: + cdef CPlacementStrategy PLACEMENT_STRATEGY_PACK \ + "ray::PlacementStrategy::PACK" + cdef CPlacementStrategy PLACEMENT_STRATEGY_SPREAD \ + "ray::PlacementStrategy::SPREAD" cdef extern 
from "ray/common/task/scheduling_resources.h" nogil: cdef cppclass ResourceSet "ray::ResourceSet": @@ -239,7 +248,17 @@ cdef extern from "ray/core_worker/common.h" nogil: const unordered_map[c_string, double] &resources, const unordered_map[c_string, double] &placement_resources, const c_vector[c_string] &dynamic_worker_options, - c_bool is_detached, c_string &name, c_bool is_asyncio) + c_bool is_detached, c_string &name, c_bool is_asyncio, + c_pair[CPlacementGroupID, int64_t] placement_options) + + cdef cppclass CPlacementGroupCreationOptions \ + "ray::PlacementGroupCreationOptions": + CPlacementGroupCreationOptions() + CPlacementGroupCreationOptions( + const c_string &name, + CPlacementStrategy strategy, + const c_vector[unordered_map[c_string, double]] &bundles + ) cdef extern from "ray/gcs/gcs_client.h" nogil: cdef cppclass CGcsClientOptions "ray::gcs::GcsClientOptions": diff --git a/python/ray/includes/libcoreworker.pxd b/python/ray/includes/libcoreworker.pxd index de6f71be3..3a419f6b2 100644 --- a/python/ray/includes/libcoreworker.pxd +++ b/python/ray/includes/libcoreworker.pxd @@ -17,11 +17,13 @@ from ray.includes.unique_ids cimport ( CJobID, CTaskID, CObjectID, + CPlacementGroupID, ) from ray.includes.common cimport ( CAddress, CActorCreationOptions, CBuffer, + CPlacementGroupCreationOptions, CRayFunction, CRayObject, CRayStatus, @@ -91,6 +93,9 @@ cdef extern from "ray/core_worker/core_worker.h" nogil: const c_vector[unique_ptr[CTaskArg]] &args, const CActorCreationOptions &options, const c_string &extension_data, CActorID *actor_id) + CRayStatus CreatePlacementGroup( + const CPlacementGroupCreationOptions &options, + CPlacementGroupID *placement_group_id) void SubmitActorTask( const CActorID &actor_id, const CRayFunction &function, const c_vector[unique_ptr[CTaskArg]] &args, @@ -225,9 +230,12 @@ cdef extern from "ray/core_worker/core_worker.h" nogil: void Initialize(const CCoreWorkerOptions &options) # Only call this in CoreWorker.__cinit__, # use 
CoreWorker.core_worker to access C++ CoreWorker. + @staticmethod CCoreWorker &GetCoreWorker() + @staticmethod void Shutdown() + @staticmethod void RunTaskExecutionLoop() diff --git a/python/ray/includes/unique_ids.pxd b/python/ray/includes/unique_ids.pxd index 1f3b62822..22cbacac8 100644 --- a/python/ray/includes/unique_ids.pxd +++ b/python/ray/includes/unique_ids.pxd @@ -158,3 +158,18 @@ cdef extern from "ray/common/id.h" namespace "ray" nogil: @staticmethod CWorkerID FromBinary(const c_string &binary) + + cdef cppclass CPlacementGroupID "ray::PlacementGroupID" \ + (CBaseID[CPlacementGroupID]): + + @staticmethod + CPlacementGroupID FromBinary(const c_string &binary) + + @staticmethod + const CPlacementGroupID Nil() + + @staticmethod + size_t Size() + + @staticmethod + CPlacementGroupID FromRandom() diff --git a/python/ray/includes/unique_ids.pxi b/python/ray/includes/unique_ids.pxi index eebbe6d47..1fdc0e863 100644 --- a/python/ray/includes/unique_ids.pxi +++ b/python/ray/includes/unique_ids.pxi @@ -19,7 +19,8 @@ from ray.includes.unique_ids cimport ( CObjectID, CTaskID, CUniqueID, - CWorkerID + CWorkerID, + CPlacementGroupID ) import ray @@ -331,6 +332,43 @@ cdef class ActorClassID(UniqueID): # This type alias is for backward compatibility. 
ObjectID = ObjectRef +cdef class PlacementGroupID(BaseID): + cdef CPlacementGroupID data + + def __init__(self, id): + check_id(id, CPlacementGroupID.Size()) + self.data = CPlacementGroupID.FromBinary(id) + + cdef CPlacementGroupID native(self): + return self.data + + @classmethod + def from_random(cls): + return cls(CPlacementGroupID.FromRandom().Binary()) + + @classmethod + def nil(cls): + return cls(CPlacementGroupID.Nil().Binary()) + + @classmethod + def size(cls): + return CPlacementGroupID.Size() + + def binary(self): + return self.data.Binary() + + def hex(self): + return decode(self.data.Hex()) + + def size(self): + return CPlacementGroupID.Size() + + def is_nil(self): + return self.data.IsNil() + + cdef size_t hash(self): + return self.data.Hash() + _ID_TYPES = [ ActorCheckpointID, ActorClassID, @@ -342,4 +380,5 @@ _ID_TYPES = [ ObjectID, TaskID, UniqueID, + PlacementGroupID, ] diff --git a/python/ray/state.py b/python/ray/state.py index 593ee2650..d99e9b0fe 100644 --- a/python/ray/state.py +++ b/python/ray/state.py @@ -246,11 +246,14 @@ class GlobalState: "JobID": binary_to_hex(actor_table_data.job_id), "Address": { "IPAddress": actor_table_data.address.ip_address, - "Port": actor_table_data.address.port + "Port": actor_table_data.address.port, + "NodeID": binary_to_hex(actor_table_data.address.raylet_id), }, "OwnerAddress": { "IPAddress": actor_table_data.owner_address.ip_address, - "Port": actor_table_data.owner_address.port + "Port": actor_table_data.owner_address.port, + "NodeID": binary_to_hex( + actor_table_data.owner_address.raylet_id), }, "State": actor_table_data.state, "Timestamp": actor_table_data.timestamp, diff --git a/python/ray/tests/BUILD b/python/ray/tests/BUILD index 9a1132769..eadb90dae 100644 --- a/python/ray/tests/BUILD +++ b/python/ray/tests/BUILD @@ -487,3 +487,11 @@ py_test( tags = ["exclusive"], deps = ["//:ray_lib"], ) + +py_test( + name = "test_placement_group", + size = "medium", + srcs = SRCS + ["test_placement_group.py"], + 
tags = ["exclusive"], + deps = ["//:ray_lib"], +) diff --git a/python/ray/tests/test_placement_group.py b/python/ray/tests/test_placement_group.py new file mode 100644 index 000000000..1e8de3199 --- /dev/null +++ b/python/ray/tests/test_placement_group.py @@ -0,0 +1,149 @@ +import pytest +try: + import pytest_timeout +except ImportError: + pytest_timeout = None +import sys +import os + +import ray +import ray.test_utils +import ray.cluster_utils + + +@pytest.mark.skipif( + os.environ.get("RAY_GCS_ACTOR_SERVICE_ENABLED") != "true", + reason=("This edge case is not handled when GCS actor management is off. " + "We won't fix this because GCS actor management " + "will be on by default anyway.")) +def test_placement_group_pack(ray_start_cluster): + @ray.remote(num_cpus=2) + class Actor(object): + def __init__(self): + self.n = 0 + + def value(self): + return self.n + + cluster = ray_start_cluster + num_nodes = 2 + for _ in range(num_nodes): + cluster.add_node(num_cpus=4) + ray.init(address=cluster.address) + + placement_group_id = ray.experimental.placement_group( + name="name", strategy="PACK", bundles=[{ + "CPU": 2 + }, { + "CPU": 2 + }]) + actor_1 = Actor.options( + placement_group_id=placement_group_id, + placement_group_bundle_index=0).remote() + actor_2 = Actor.options( + placement_group_id=placement_group_id, + placement_group_bundle_index=1).remote() + + print(ray.get(actor_1.value.remote())) + print(ray.get(actor_2.value.remote())) + + # Get all actors. + actor_infos = ray.actors() + + # Make sure all actors in counter_list are collocated in one node. 
+ actor_info_1 = actor_infos.get(actor_1._actor_id.hex()) + actor_info_2 = actor_infos.get(actor_2._actor_id.hex()) + + assert actor_info_1 and actor_info_2 + + node_of_actor_1 = actor_info_1["Address"]["NodeID"] + node_of_actor_2 = actor_info_2["Address"]["NodeID"] + assert node_of_actor_1 == node_of_actor_2 + + +@pytest.mark.skipif( + os.environ.get("RAY_GCS_ACTOR_SERVICE_ENABLED") != "true", + reason=("This edge case is not handled when GCS actor management is off. " + "We won't fix this because GCS actor management " + "will be on by default anyway.")) +def test_placement_group_pack_best_effort(ray_start_cluster): + @ray.remote(num_cpus=2) + class Actor(object): + def __init__(self): + self.n = 0 + + def value(self): + return self.n + + # TODO(Shanly): + pass + + +@pytest.mark.skipif( + os.environ.get("RAY_GCS_ACTOR_SERVICE_ENABLED") != "true", + reason=("This edge case is not handled when GCS actor management is off. " + "We won't fix this because GCS actor management " + "will be on by default anyway.")) +def test_placement_group_spread(ray_start_cluster): + @ray.remote(num_cpus=2) + class Actor(object): + def __init__(self): + self.n = 0 + + def value(self): + return self.n + + cluster = ray_start_cluster + num_nodes = 2 + for _ in range(num_nodes): + cluster.add_node(num_cpus=4) + ray.init(address=cluster.address) + + placement_group_id = ray.experimental.placement_group( + name="name", strategy="SPREAD", bundles=[{ + "CPU": 2 + }, { + "CPU": 2 + }]) + actor_1 = Actor.options(placement_group_id=placement_group_id, + placement_group_bundle_index=0).remote() + actor_2 = Actor.options(placement_group_id=placement_group_id, + placement_group_bundle_index=1).remote() + + print(ray.get(actor_1.value.remote())) + print(ray.get(actor_2.value.remote())) + + # Get all actors. + actor_infos = ray.actors() + + # Make sure the two actors are placed on different nodes. 
+ actor_info_1 = actor_infos.get(actor_1._actor_id.hex()) + actor_info_2 = actor_infos.get(actor_2._actor_id.hex()) + + assert actor_info_1 and actor_info_2 + + node_of_actor_1 = actor_info_1["Address"]["NodeID"] + node_of_actor_2 = actor_info_2["Address"]["NodeID"] + assert node_of_actor_1 != node_of_actor_2 + + +@pytest.mark.skipif( + os.environ.get("RAY_GCS_ACTOR_SERVICE_ENABLED") != "true", + reason=("This edge case is not handled when GCS actor management is off. " + "We won't fix this because GCS actor management " + "will be on by default anyway.")) +def test_placement_group_spread_best_effort(ray_start_cluster): + @ray.remote(num_cpus=2) + class Actor(object): + def __init__(self): + self.n = 0 + + def value(self): + return self.n + + # TODO(Shanly): + pass + + +if __name__ == "__main__": + sys.exit(pytest.main(["-v", __file__])) diff --git a/src/ray/gcs/gcs_server/gcs_placement_group_manager.cc b/src/ray/gcs/gcs_server/gcs_placement_group_manager.cc index 3550327f9..4110d98e6 100644 --- a/src/ray/gcs/gcs_server/gcs_placement_group_manager.cc +++ b/src/ray/gcs/gcs_server/gcs_placement_group_manager.cc @@ -154,14 +154,17 @@ void GcsPlacementGroupManager::HandleCreatePlacementGroup( ray::rpc::SendReplyCallback send_reply_callback) { auto placement_group_id = PlacementGroupID::FromBinary(request.placement_group_spec().placement_group_id()); - + const auto &strategy = request.placement_group_spec().strategy(); + const auto &name = request.placement_group_spec().name(); RAY_LOG(INFO) << "Registering placement group, placement group id = " - << placement_group_id; + << placement_group_id << ", name = " << name + << ", strategy = " << PlacementStrategy_Name(strategy); RegisterPlacementGroup( request, [reply, send_reply_callback, placement_group_id]( std::shared_ptr placement_group) { RAY_LOG(INFO) << "Registered placement group, placement group id = " - << placement_group_id; + << placement_group_id << ", name = " << placement_group->GetName() + << ", strategy = " 
<< placement_group->GetStrategy(); GCS_RPC_SEND_REPLY(send_reply_callback, reply, Status::OK()); }); auto placement_group = std::make_shared(request); @@ -171,7 +174,7 @@ void GcsPlacementGroupManager::HandleCreatePlacementGroup( } void GcsPlacementGroupManager::ScheduleTick() { - reschedule_timer_.expires_from_now(boost::posix_time::milliseconds(5)); + reschedule_timer_.expires_from_now(boost::posix_time::milliseconds(500)); reschedule_timer_.async_wait([this](const boost::system::error_code &error) { if (error == boost::system::errc::operation_canceled) { return; diff --git a/src/ray/gcs/gcs_server/gcs_placement_group_scheduler.cc b/src/ray/gcs/gcs_server/gcs_placement_group_scheduler.cc index 2230cf525..a2a6f0735 100644 --- a/src/ray/gcs/gcs_server/gcs_placement_group_scheduler.cc +++ b/src/ray/gcs/gcs_server/gcs_placement_group_scheduler.cc @@ -154,14 +154,11 @@ void GcsPlacementGroupScheduler::ReserveResourceFromNode( std::shared_ptr node, ReserveResourceCallback callback) { RAY_CHECK(node); - auto node_id = ClientID::FromBinary(node->node_id()); - RAY_LOG(DEBUG) << "Start leasing resource from node " << node_id << " for bundle " - << bundle->BundleId().first << std::to_string(bundle->BundleId().second); - rpc::Address remote_address; remote_address.set_raylet_id(node->node_id()); remote_address.set_ip_address(node->node_manager_address()); remote_address.set_port(node->node_manager_port()); + auto node_id = ClientID::FromBinary(node->node_id()); auto lease_client = GetOrConnectLeaseClient(remote_address); RAY_LOG(DEBUG) << "Start leasing resource from node " << node_id << " for bundle " << bundle->BundleId().first << bundle->BundleId().second; @@ -183,7 +180,7 @@ void GcsPlacementGroupScheduler::ReserveResourceFromNode( << " for bundle " << bundle->BundleId().first << bundle->BundleId().second; } - // Remove the actor from the leasing map as the reply is returned from the + // Remove the bundle from the leasing map as the reply is returned from the // remote 
node. iter->second.erase(bundle_iter); if (iter->second.empty()) { diff --git a/src/ray/gcs/gcs_server/gcs_server.h b/src/ray/gcs/gcs_server/gcs_server.h index aa208750e..3258993f8 100644 --- a/src/ray/gcs/gcs_server/gcs_server.h +++ b/src/ray/gcs/gcs_server/gcs_server.h @@ -152,7 +152,6 @@ class GcsServer { /// Worker info service std::unique_ptr worker_info_service_; /// Placement Group info handler and service - std::unique_ptr placement_group_info_handler_; std::unique_ptr placement_group_info_service_; /// Backend client std::shared_ptr redis_gcs_client_; diff --git a/src/ray/gcs/gcs_server/gcs_table_storage.h b/src/ray/gcs/gcs_server/gcs_table_storage.h index eb5e806f2..cc266e334 100644 --- a/src/ray/gcs/gcs_server/gcs_table_storage.h +++ b/src/ray/gcs/gcs_server/gcs_table_storage.h @@ -410,8 +410,8 @@ class GcsTableStorage { std::unique_ptr object_table_; std::unique_ptr node_table_; std::unique_ptr node_resource_table_; - std::unique_ptr heartbeat_table_; std::unique_ptr placement_group_schedule_table_; + std::unique_ptr heartbeat_table_; std::unique_ptr heartbeat_batch_table_; std::unique_ptr error_info_table_; std::unique_ptr profile_table_; diff --git a/src/ray/gcs/redis_accessor.cc b/src/ray/gcs/redis_accessor.cc index 89007cba1..ac10f87c1 100644 --- a/src/ray/gcs/redis_accessor.cc +++ b/src/ray/gcs/redis_accessor.cc @@ -835,6 +835,11 @@ Status RedisWorkerInfoAccessor::AsyncAdd( return Status::Invalid("Not implemented"); } +Status RedisPlacementGroupInfoAccessor::AsyncCreatePlacementGroup( + const PlacementGroupSpecification &placement_group_spec) { + return Status::Invalid("Not implemented"); +} + } // namespace gcs } // namespace ray diff --git a/src/ray/gcs/redis_accessor.h b/src/ray/gcs/redis_accessor.h index f279e6056..4869df8e9 100644 --- a/src/ray/gcs/redis_accessor.h +++ b/src/ray/gcs/redis_accessor.h @@ -473,6 +473,14 @@ class RedisWorkerInfoAccessor : public WorkerInfoAccessor { WorkerFailureSubscriptionExecutor worker_failure_sub_executor_; 
}; +class RedisPlacementGroupInfoAccessor : public PlacementGroupInfoAccessor { + public: + virtual ~RedisPlacementGroupInfoAccessor() = default; + + Status AsyncCreatePlacementGroup( + const PlacementGroupSpecification &placement_group_spec) override; +}; + } // namespace gcs } // namespace ray diff --git a/src/ray/gcs/redis_gcs_client.cc b/src/ray/gcs/redis_gcs_client.cc index 6a8865a05..b9a6d1d7f 100644 --- a/src/ray/gcs/redis_gcs_client.cc +++ b/src/ray/gcs/redis_gcs_client.cc @@ -82,6 +82,7 @@ Status RedisGcsClient::Connect(boost::asio::io_service &io_service) { error_accessor_.reset(new RedisErrorInfoAccessor(this)); stats_accessor_.reset(new RedisStatsInfoAccessor(this)); worker_accessor_.reset(new RedisWorkerInfoAccessor(this)); + placement_group_accessor_.reset(new RedisPlacementGroupInfoAccessor()); is_connected_ = true; diff --git a/src/ray/raylet/node_manager.cc b/src/ray/raylet/node_manager.cc index bb6d29cc1..3244bf146 100644 --- a/src/ray/raylet/node_manager.cc +++ b/src/ray/raylet/node_manager.cc @@ -1986,8 +1986,8 @@ ResourceIdSet NodeManager::ScheduleBundle( std::string resource_name = bundle_id_str + "_" + resource.first; local_available_resources_.AddBundleResource(resource_name, resource.second); } - cluster_resource_map_[self_node_id_].UpdateBundleResource( - bundle_id_str, bundle_spec.GetRequiredResources()); + resource_map[self_node_id_].UpdateBundleResource(bundle_id_str, + bundle_spec.GetRequiredResources()); } return acquired_resources; } @@ -2566,7 +2566,6 @@ void NodeManager::AssignTask(const std::shared_ptr &worker, auto acquired_resources = local_available_resources_.Acquire(spec.GetRequiredResources()); cluster_resource_map_[self_node_id_].Acquire(spec.GetRequiredResources()); - if (spec.IsActorCreationTask()) { // Check that the actor's placement resource requirements are satisfied. RAY_CHECK(spec.GetRequiredPlacementResources().IsSubset(