diff --git a/.gitignore b/.gitignore index e8ae93323..fa9fe8f69 100644 --- a/.gitignore +++ b/.gitignore @@ -13,6 +13,7 @@ /src/thirdparty/catapult/ /src/thirdparty/flatbuffers/ /src/thirdparty/parquet-cpp +/thirdparty/pkg/ # Files generated by flatc should be ignored /src/common/format/*.py @@ -143,3 +144,6 @@ build # Pytest Cache **/.pytest_cache + +# Vscode +.vscode/ diff --git a/src/common/lib/python/common_extension.cc b/src/common/lib/python/common_extension.cc index 230a576e6..82e811281 100644 --- a/src/common/lib/python/common_extension.cc +++ b/src/common/lib/python/common_extension.cc @@ -195,8 +195,8 @@ static PyObject *PyObjectID_richcompare(PyObjectID *self, static PyObject *PyObjectID_redis_shard_hash(PyObjectID *self) { /* NOTE: The hash function used here must match the one in get_redis_context * in src/common/state/redis.cc. Changes to the hash function should only be - * made through UniqueIDHasher in src/common/common.h */ - UniqueIDHasher hash; + * made through std::hash in src/common/common.h */ + std::hash hash; return PyLong_FromSize_t(hash(self->object_id)); } diff --git a/src/common/state/redis.cc b/src/common/state/redis.cc index 62d814a26..e167510ed 100644 --- a/src/common/state/redis.cc +++ b/src/common/state/redis.cc @@ -80,13 +80,13 @@ redisAsyncContext *get_redis_context(DBHandle *db, UniqueID id) { /* NOTE: The hash function used here must match the one in * PyObjectID_redis_shard_hash in src/common/lib/python/common_extension.cc. * Changes to the hash function should only be made through - * UniqueIDHasher in src/common/common.h */ - UniqueIDHasher index; + * std::hash in src/common/common.h */ + std::hash index; return db->contexts[index(id) % db->contexts.size()]; } redisAsyncContext *get_redis_subscribe_context(DBHandle *db, UniqueID id) { - UniqueIDHasher index; + std::hash index; return db->subscribe_contexts[index(id) % db->subscribe_contexts.size()]; } diff --git a/src/common/state/redis.h b/src/common/state/redis.h index dc879eb82..164069740 100644 --- a/src/common/state/redis.h +++ b/src/common/state/redis.h @@ -48,7 +48,7 @@ struct DBHandle { int64_t db_index; /** Cache for the IP addresses of db clients. This is an unordered map mapping * client IDs to addresses. */ - std::unordered_map db_client_cache; + std::unordered_map db_client_cache; /** Redis context for synchronous connections. This should only be used very * rarely, it is not asynchronous. */ redisContext *sync_context; diff --git a/src/global_scheduler/global_scheduler.cc b/src/global_scheduler/global_scheduler.cc index 3dcb12593..db97e76eb 100644 --- a/src/global_scheduler/global_scheduler.cc +++ b/src/global_scheduler/global_scheduler.cc @@ -234,11 +234,9 @@ void add_local_scheduler(GlobalSchedulerState *state, handle_new_local_scheduler(state, state->policy_state, db_client_id); } -std::unordered_map::iterator -remove_local_scheduler( +std::unordered_map::iterator remove_local_scheduler( GlobalSchedulerState *state, - std::unordered_map::iterator - it) { + std::unordered_map::iterator it) { RAY_CHECK(it != state->local_schedulers.end()); DBClientID local_scheduler_id = it->first; it = state->local_schedulers.erase(it); diff --git a/src/global_scheduler/global_scheduler.h b/src/global_scheduler/global_scheduler.h index 30a64b2fd..194559393 100644 --- a/src/global_scheduler/global_scheduler.h +++ b/src/global_scheduler/global_scheduler.h @@ -55,18 +55,15 @@ typedef struct { ray::gcs::AsyncGcsClient gcs_client; /** A hash table mapping local scheduler ID to the local schedulers that are * connected to Redis. */ - std::unordered_map - local_schedulers; + std::unordered_map local_schedulers; /** The state managed by the scheduling policy. */ GlobalSchedulerPolicyState *policy_state; /** The plasma_manager ip:port -> local_scheduler_db_client_id association. */ std::unordered_map plasma_local_scheduler_map; /** The local_scheduler_db_client_id -> plasma_manager ip:port association. */ - std::unordered_map - local_scheduler_plasma_map; + std::unordered_map local_scheduler_plasma_map; /** Objects cached by this global scheduler instance. */ - std::unordered_map - scheduler_object_info_table; + std::unordered_map scheduler_object_info_table; /** An array of tasks that haven't been scheduled yet. */ std::vector pending_tasks; } GlobalSchedulerState; diff --git a/src/local_scheduler/local_scheduler.cc b/src/local_scheduler/local_scheduler.cc index 241d3e4fe..279c2bb8b 100644 --- a/src/local_scheduler/local_scheduler.cc +++ b/src/local_scheduler/local_scheduler.cc @@ -1052,8 +1052,8 @@ void handle_set_actor_frontier(LocalSchedulerState *state, ActorFrontier const &frontier) { /* Parse the ActorFrontier flatbuffer. */ ActorID actor_id = from_flatbuf(*frontier.actor_id()); - std::unordered_map task_counters; - std::unordered_map frontier_dependencies; + std::unordered_map task_counters; + std::unordered_map frontier_dependencies; for (size_t i = 0; i < frontier.handle_ids()->size(); ++i) { ActorID handle_id = from_flatbuf(*frontier.handle_ids()->Get(i)); task_counters[handle_id] = frontier.task_counters()->Get(i); diff --git a/src/local_scheduler/local_scheduler_algorithm.cc b/src/local_scheduler/local_scheduler_algorithm.cc index 223b27ca5..d16b46f7d 100644 --- a/src/local_scheduler/local_scheduler_algorithm.cc +++ b/src/local_scheduler/local_scheduler_algorithm.cc @@ -53,13 +53,12 @@ typedef struct { * handle. This is used to guarantee execution of tasks on actors in the * order that the tasks were submitted, per handle. Tasks from different * handles to the same actor may be interleaved. */ - std::unordered_map task_counters; + std::unordered_map task_counters; /** These are the execution dependencies that make up the frontier of the * actor's runnable tasks. For each actor handle, we store the object ID * that represents the execution dependency for the next runnable task * submitted by that handle. */ - std::unordered_map - frontier_dependencies; + std::unordered_map frontier_dependencies; /** The return value of the most recently executed task. The next task to * execute should take this as an execution dependency at dispatch time. Set * to nil if there are no execution dependencies (e.g., this is the first @@ -85,12 +84,12 @@ struct SchedulingAlgorithmState { /** This is a hash table from actor ID to information about that actor. In * particular, a queue of tasks that are waiting to execute on that actor. * This is only used for actors that exist locally. */ - std::unordered_map local_actor_infos; + std::unordered_map local_actor_infos; /** This is a set of the IDs of the actors that have tasks waiting to run. * The purpose is to make it easier to dispatch tasks without looping over * all of the actors. Note that this is an optimization and is not strictly * necessary. */ - std::unordered_set actors_with_pending_tasks; + std::unordered_set actors_with_pending_tasks; /** A vector of actor tasks that have been submitted but this local scheduler * doesn't know which local scheduler is responsible for them, so cannot * assign them to the correct local scheduler yet. Whenever a notification @@ -112,13 +111,13 @@ struct SchedulingAlgorithmState { std::vector blocked_workers; /** A hash map of the objects that are available in the local Plasma store. * The key is the object ID. This information could be a little stale. */ - std::unordered_map local_objects; + std::unordered_map local_objects; /** A hash map of the objects that are not available locally. These are * currently being fetched by this local scheduler. The key is the object * ID. Every local_scheduler_fetch_timeout_milliseconds, a Plasma fetch * request will be sent the object IDs in this table. Each entry also holds * an array of queued tasks that are dependent on it. */ - std::unordered_map remote_objects; + std::unordered_map remote_objects; }; SchedulingAlgorithmState *SchedulingAlgorithmState_init(void) { @@ -809,7 +808,7 @@ int rerun_actor_creation_tasks_timeout_handler(event_loop *loop, // Create a set of the dummy object IDs for the actor creation tasks to // reconstruct. - std::unordered_set actor_dummy_objects; + std::unordered_set actor_dummy_objects; for (auto const &execution_spec : state->algorithm_state->cached_submitted_actor_tasks) { ObjectID actor_creation_dummy_object_id = @@ -1805,9 +1804,9 @@ void print_worker_info(const char *message, << " blocked"; } -std::unordered_map -get_actor_task_counters(SchedulingAlgorithmState *algorithm_state, - ActorID actor_id) { +std::unordered_map get_actor_task_counters( + SchedulingAlgorithmState *algorithm_state, + ActorID actor_id) { RAY_CHECK(algorithm_state->local_actor_infos.count(actor_id) != 0); return algorithm_state->local_actor_infos[actor_id].task_counters; } @@ -1815,8 +1814,7 @@ get_actor_task_counters(SchedulingAlgorithmState *algorithm_state, void set_actor_task_counters( SchedulingAlgorithmState *algorithm_state, ActorID actor_id, - const std::unordered_map - &task_counters) { + const std::unordered_map &task_counters) { RAY_CHECK(algorithm_state->local_actor_infos.count(actor_id) != 0); /* Overwrite the current task counters for the actor. This is necessary * during reconstruction when resuming from a checkpoint so that we can @@ -1860,7 +1858,7 @@ void set_actor_task_counters( } } -std::unordered_map get_actor_frontier( +std::unordered_map get_actor_frontier( SchedulingAlgorithmState *algorithm_state, ActorID actor_id) { RAY_CHECK(algorithm_state->local_actor_infos.count(actor_id) != 0); @@ -1871,8 +1869,7 @@ void set_actor_frontier( LocalSchedulerState *state, SchedulingAlgorithmState *algorithm_state, ActorID actor_id, - const std::unordered_map - &frontier_dependencies) { + const std::unordered_map &frontier_dependencies) { RAY_CHECK(algorithm_state->local_actor_infos.count(actor_id) != 0); auto entry = algorithm_state->local_actor_infos[actor_id]; entry.frontier_dependencies = frontier_dependencies; diff --git a/src/local_scheduler/local_scheduler_algorithm.h b/src/local_scheduler/local_scheduler_algorithm.h index 6ad0558c3..9238d5db5 100644 --- a/src/local_scheduler/local_scheduler_algorithm.h +++ b/src/local_scheduler/local_scheduler_algorithm.h @@ -362,9 +362,9 @@ void print_worker_info(const char *message, * @return A map from handle ID to the number of tasks submitted by that handle * that have executed so far. */ -std::unordered_map -get_actor_task_counters(SchedulingAlgorithmState *algorithm_state, - ActorID actor_id); +std::unordered_map get_actor_task_counters( + SchedulingAlgorithmState *algorithm_state, + ActorID actor_id); /** * Set the number of tasks, per actor handle, that have been executed on an @@ -381,8 +381,7 @@ get_actor_task_counters(SchedulingAlgorithmState *algorithm_state, void set_actor_task_counters( SchedulingAlgorithmState *algorithm_state, ActorID actor_id, - const std::unordered_map - &task_counters); + const std::unordered_map &task_counters); /** * Get the actor's frontier of task dependencies. @@ -395,7 +394,7 @@ void set_actor_task_counters( * @return A map from handle ID to execution dependency for the earliest * runnable task submitted through that handle. */ -std::unordered_map get_actor_frontier( +std::unordered_map get_actor_frontier( SchedulingAlgorithmState *algorithm_state, ActorID actor_id); @@ -414,8 +413,7 @@ void set_actor_frontier( LocalSchedulerState *state, SchedulingAlgorithmState *algorithm_state, ActorID actor_id, - const std::unordered_map - &frontier_dependencies); + const std::unordered_map &frontier_dependencies); /** The following methods are for testing purposes only. */ #ifdef LOCAL_SCHEDULER_TEST diff --git a/src/local_scheduler/local_scheduler_shared.h b/src/local_scheduler/local_scheduler_shared.h index 762518a32..013cf7a78 100644 --- a/src/local_scheduler/local_scheduler_shared.h +++ b/src/local_scheduler/local_scheduler_shared.h @@ -48,16 +48,16 @@ struct LocalSchedulerState { std::list workers; /** A set of driver IDs corresponding to drivers that have been removed. This * is used to make sure we don't execute any tasks belong to dead drivers. */ - std::unordered_set removed_drivers; + std::unordered_set removed_drivers; /** A set of actors IDs corresponding to local actors that have been removed. * This ensures we can reject any tasks destined for dead actors. */ - std::unordered_set removed_actors; + std::unordered_set removed_actors; /** List of the process IDs for child processes (workers) started by the * local scheduler that have not sent a REGISTER_PID message yet. */ std::vector child_pids; /** A hash table mapping actor IDs to the db_client_id of the local scheduler * that is responsible for the actor. */ - std::unordered_map actor_mapping; + std::unordered_map actor_mapping; /** The handle to the database. */ DBHandle *db; /** The handle to the GCS (modern version of the above). */ diff --git a/src/plasma/plasma_manager.cc b/src/plasma/plasma_manager.cc index 6da9e28db..8529e1610 100644 --- a/src/plasma/plasma_manager.cc +++ b/src/plasma/plasma_manager.cc @@ -221,20 +221,20 @@ struct PlasmaManagerState { int port; /** Unordered map of outstanding fetch requests. The key is the object ID. The * value is the data needed to perform the fetch. */ - std::unordered_map fetch_requests; + std::unordered_map fetch_requests; /** Unordered map of outstanding wait requests. The key is the object ID. The * value is the vector of wait requests that are waiting for the object to * arrive locally. */ - std::unordered_map, UniqueIDHasher> + std::unordered_map> object_wait_requests_local; /** Unordered map of outstanding wait requests. The key is the object ID. The * value is the vector of wait requests that are waiting for the object to * be available somewhere in the system. */ - std::unordered_map, UniqueIDHasher> + std::unordered_map> object_wait_requests_remote; /** Initialize an empty unordered set for the cache of local available object. */ - std::unordered_set local_available_objects; + std::unordered_set local_available_objects; /** The time (in milliseconds since the Unix epoch) when the most recent * heartbeat was sent. */ int64_t previous_heartbeat_time; @@ -247,7 +247,7 @@ struct PlasmaManagerState { * object is removed. If object transfers between managers is parallelized, * then all objects being received from a remote manager will need to be * removed if the connection to the remote manager fails. */ - std::unordered_set receives_in_progress; + std::unordered_set receives_in_progress; }; PlasmaManagerState *g_manager_state = NULL; @@ -265,8 +265,7 @@ struct ClientConnection { /* A set of object IDs which are queued in the transfer_queue and waiting to * be sent. This is used to avoid sending the same object ID to the same * manager multiple times. */ - std::unordered_map - pending_object_transfers; + std::unordered_map pending_object_transfers; /** Buffer used to receive transfers (data fetches) we want to ignore */ PlasmaRequestBuffer *ignore_buffer; /** File descriptor for the socket connected to the other @@ -317,7 +316,7 @@ bool ClientConnection_request_finished(ClientConnection *client_conn) { return client_conn->cursor == -1; } -std::unordered_map, UniqueIDHasher> & +std::unordered_map> & object_wait_requests_from_type(PlasmaManagerState *manager_state, int type) { /* We use different types of hash tables for different requests. */ RAY_CHECK(type == plasma::PLASMA_QUERY_LOCAL || diff --git a/src/plasma/plasma_protocol.cc b/src/plasma/plasma_protocol.cc index 1f97d9398..42fa84cc3 100644 --- a/src/plasma/plasma_protocol.cc +++ b/src/plasma/plasma_protocol.cc @@ -370,11 +370,10 @@ Status ReadGetRequest(uint8_t *data, return Status::OK(); } -Status SendGetReply( - int sock, - ObjectID object_ids[], - std::unordered_map &plasma_objects, - int64_t num_objects) { +Status SendGetReply(int sock, + ObjectID object_ids[], + std::unordered_map &plasma_objects, + int64_t num_objects) { flatbuffers::FlatBufferBuilder fbb; std::vector objects; diff --git a/src/ray/gcs/tables.h b/src/ray/gcs/tables.h index e46ccbddf..d5c4df088 100644 --- a/src/ray/gcs/tables.h +++ b/src/ray/gcs/tables.h @@ -517,7 +517,7 @@ class ClientTable : private Log { /// The callback to call when a client is removed. ClientTableCallback client_removed_callback_; /// A cache for information about all clients. - std::unordered_map client_cache_; + std::unordered_map client_cache_; }; } // namespace gcs diff --git a/src/ray/id.cc b/src/ray/id.cc index e872aa294..4d9634623 100644 --- a/src/ray/id.cc +++ b/src/ray/id.cc @@ -81,6 +81,12 @@ bool UniqueID::operator==(const UniqueID &rhs) const { return std::memcmp(data(), rhs.data(), kUniqueIDSize) == 0; } +size_t UniqueID::hash() const { + size_t result; + std::memcpy(&result, id_, sizeof(size_t)); + return result; +} + std::ostream &operator<<(std::ostream &os, const UniqueID &id) { os << id.hex(); return os; diff --git a/src/ray/id.h b/src/ray/id.h index 51aa52f93..beb15fc5e 100644 --- a/src/ray/id.h +++ b/src/ray/id.h @@ -19,6 +19,7 @@ class RAY_EXPORT UniqueID { static UniqueID from_random(); static UniqueID from_binary(const std::string &binary); static const UniqueID nil(); + size_t hash() const; bool is_nil() const; bool operator==(const UniqueID &rhs) const; const uint8_t *data() const; @@ -35,15 +36,6 @@ class RAY_EXPORT UniqueID { static_assert(std::is_standard_layout::value, "UniqueID must be standard"); -struct UniqueIDHasher { - // ID hashing function. - size_t operator()(const UniqueID &id) const { - size_t result; - std::memcpy(&result, id.data(), sizeof(size_t)); - return result; - } -}; - std::ostream &operator<<(std::ostream &os, const UniqueID &id); typedef UniqueID TaskID; @@ -98,4 +90,15 @@ int64_t ComputeObjectIndex(const ObjectID &object_id); } // namespace ray +namespace std { +template <> +struct hash<::ray::UniqueID> { + size_t operator()(const ::ray::UniqueID &id) const { return id.hash(); } +}; + +template <> +struct hash { + size_t operator()(const ::ray::UniqueID &id) const { return id.hash(); } +}; +} #endif // RAY_ID_H_ diff --git a/src/ray/object_manager/connection_pool.h b/src/ray/object_manager/connection_pool.h index 4ce2133d4..15774a287 100644 --- a/src/ray/object_manager/connection_pool.h +++ b/src/ray/object_manager/connection_pool.h @@ -91,11 +91,10 @@ class ConnectionPool { private: /// A container type that maps ClientID to a connection type. using SenderMapType = - std::unordered_map>, - ray::UniqueIDHasher>; + std::unordered_map>>; using ReceiverMapType = - std::unordered_map>, - ray::UniqueIDHasher>; + std::unordered_map>>; /// Adds a receiver for ClientID to the given map. void Add(ReceiverMapType &conn_map, const ClientID &client_id, diff --git a/src/ray/object_manager/object_buffer_pool.h b/src/ray/object_manager/object_buffer_pool.h index 3edc1be30..4030e09e7 100644 --- a/src/ray/object_manager/object_buffer_pool.h +++ b/src/ray/object_manager/object_buffer_pool.h @@ -179,11 +179,9 @@ class ObjectBufferPool { /// Determines the maximum chunk size to be transferred by a single thread. const uint64_t chunk_size_; /// The state of a buffer that's currently being used. - std::unordered_map - get_buffer_state_; + std::unordered_map get_buffer_state_; /// The state of a buffer that's currently being used. - std::unordered_map - create_buffer_state_; + std::unordered_map create_buffer_state_; /// Plasma client pool. plasma::PlasmaClient store_client_; diff --git a/src/ray/object_manager/object_directory.cc b/src/ray/object_manager/object_directory.cc index 77f9a3a03..e7a6c8504 100644 --- a/src/ray/object_manager/object_directory.cc +++ b/src/ray/object_manager/object_directory.cc @@ -77,7 +77,7 @@ void ObjectDirectory::GetLocationsComplete( return; } // Build the set of current locations based on the entries in the log. - std::unordered_set locations; + std::unordered_set locations; for (auto entry : location_entries) { ClientID client_id = ClientID::from_binary(entry.manager); if (!entry.is_eviction) { diff --git a/src/ray/object_manager/object_directory.h b/src/ray/object_manager/object_directory.h index df326125c..7eca8c550 100644 --- a/src/ray/object_manager/object_directory.h +++ b/src/ray/object_manager/object_directory.h @@ -123,7 +123,7 @@ class ObjectDirectory : public ObjectDirectoryInterface { const std::vector &location_entries); /// Maintain map of in-flight GetLocation requests. - std::unordered_map existing_requests_; + std::unordered_map existing_requests_; /// Reference to the gcs client. std::shared_ptr gcs_client_; }; diff --git a/src/ray/object_manager/object_manager.h b/src/ray/object_manager/object_manager.h index 1dad93f60..117a3073d 100644 --- a/src/ray/object_manager/object_manager.h +++ b/src/ray/object_manager/object_manager.h @@ -179,12 +179,11 @@ class ObjectManager { ConnectionPool connection_pool_; /// Timeout for failed pull requests. - std::unordered_map, - UniqueIDHasher> + std::unordered_map> pull_requests_; /// Cache of locally available objects. - std::unordered_map local_objects_; + std::unordered_map local_objects_; /// Handle starting, running, and stopping asio io_service. void StartIOService(); diff --git a/src/ray/raylet/actor_registration.cc b/src/ray/raylet/actor_registration.cc index c1e6303fb..efb5699e6 100644 --- a/src/ray/raylet/actor_registration.cc +++ b/src/ray/raylet/actor_registration.cc @@ -23,7 +23,7 @@ const ObjectID ActorRegistration::GetExecutionDependency() const { return execution_dependency_; } -const std::unordered_map +const std::unordered_map &ActorRegistration::GetFrontier() const { return frontier_; } diff --git a/src/ray/raylet/actor_registration.h b/src/ray/raylet/actor_registration.h index 486be2719..e5721b686 100644 --- a/src/ray/raylet/actor_registration.h +++ b/src/ray/raylet/actor_registration.h @@ -63,8 +63,7 @@ class ActorRegistration { /// /// \return The actor frontier, a map from handle ID to execution state for /// that handle. - const std::unordered_map &GetFrontier() - const; + const std::unordered_map &GetFrontier() const; /// Extend the frontier of the actor by a single task. This should be called /// whenever the actor executes a task. @@ -86,7 +85,7 @@ class ActorRegistration { /// The execution frontier of the actor, which represents which tasks have /// executed so far and which tasks may execute next, based on execution /// dependencies. This is indexed by handle. - std::unordered_map frontier_; + std::unordered_map frontier_; }; } // namespace raylet diff --git a/src/ray/raylet/lineage_cache.cc b/src/ray/raylet/lineage_cache.cc index 11aef6007..592c26481 100644 --- a/src/ray/raylet/lineage_cache.cc +++ b/src/ray/raylet/lineage_cache.cc @@ -27,9 +27,8 @@ const TaskID LineageEntry::GetEntryId() const { return task_.GetTaskSpecification().TaskId(); } -const std::unordered_set LineageEntry::GetParentTaskIds() - const { - std::unordered_set parent_ids; +const std::unordered_set LineageEntry::GetParentTaskIds() const { + std::unordered_set parent_ids; // A task's parents are the tasks that created its arguments. auto dependencies = task_.GetDependencies(); for (auto &dependency : dependencies) { @@ -104,8 +103,7 @@ boost::optional Lineage::PopEntry(const UniqueID &task_id) { } } -const std::unordered_map - &Lineage::GetEntries() const { +const std::unordered_map &Lineage::GetEntries() const { return entries_; } diff --git a/src/ray/raylet/lineage_cache.h b/src/ray/raylet/lineage_cache.h index 3b40f8d88..44b9b62f4 100644 --- a/src/ray/raylet/lineage_cache.h +++ b/src/ray/raylet/lineage_cache.h @@ -76,7 +76,7 @@ class LineageEntry { /// that created its arguments. /// /// \return The IDs of the parent entries. - const std::unordered_set GetParentTaskIds() const; + const std::unordered_set GetParentTaskIds() const; /// Get the task data. /// @@ -85,7 +85,6 @@ class LineageEntry { Task &TaskDataMutable(); - private: /// The current state of this entry according to its status in the GCS. GcsStatus status_; /// The task data to be written to the GCS. This is nullptr if the entry is @@ -139,8 +138,7 @@ class Lineage { /// Get all entries in the lineage. /// /// \return A const reference to the lineage entries. - const std::unordered_map &GetEntries() - const; + const std::unordered_map &GetEntries() const; /// Serialize this lineage to a ForwardTaskRequest flatbuffer. /// @@ -153,7 +151,7 @@ class Lineage { private: /// The lineage entries. - std::unordered_map entries_; + std::unordered_map entries_; }; /// \class LineageCache @@ -226,13 +224,13 @@ class LineageCache { // which tasks are flushable, to avoid iterating over tasks that are in // UNCOMMITTED_READY, but that have dependencies that have not been committed // yet. - std::unordered_set uncommitted_ready_tasks_; + std::unordered_set uncommitted_ready_tasks_; /// All tasks and objects that we are responsible for writing back to the /// GCS, and the tasks and objects in their lineage. Lineage lineage_; /// The tasks that we've subscribed to notifications for from the pubsub /// storage system. We will receive a notification for these tasks on commit. - std::unordered_set subscribed_tasks_; + std::unordered_set subscribed_tasks_; }; } // namespace raylet diff --git a/src/ray/raylet/lineage_cache_test.cc b/src/ray/raylet/lineage_cache_test.cc index de89a5b57..9a51a3cf9 100644 --- a/src/ray/raylet/lineage_cache_test.cc +++ b/src/ray/raylet/lineage_cache_test.cc @@ -68,21 +68,17 @@ class MockGcs : public gcs::TableInterface, callbacks_.clear(); } - const std::unordered_map, UniqueIDHasher> - &TaskTable() const { + const std::unordered_map> &TaskTable() const { return task_table_; } - const std::unordered_set &SubscribedTasks() const { - return subscribed_tasks_; - } + const std::unordered_set &SubscribedTasks() const { return subscribed_tasks_; } private: - std::unordered_map, UniqueIDHasher> - task_table_; + std::unordered_map> task_table_; std::vector> callbacks_; gcs::raylet::TaskTable::WriteCallback notification_callback_; - std::unordered_set subscribed_tasks_; + std::unordered_set subscribed_tasks_; }; class LineageCacheTest : public ::testing::Test { diff --git a/src/ray/raylet/mock_gcs_client.cc b/src/ray/raylet/mock_gcs_client.cc index c8c370227..69b197899 100644 --- a/src/ray/raylet/mock_gcs_client.cc +++ b/src/ray/raylet/mock_gcs_client.cc @@ -31,7 +31,7 @@ ray::Status ObjectTable::Add(const ObjectID &object_id, const ClientID &client_i const DoneCallback &done_callback) { if (client_lookup.count(object_id) == 0) { RAY_LOG(DEBUG) << "Add ObjectID set " << object_id; - client_lookup[object_id] = std::unordered_set(); + client_lookup[object_id] = std::unordered_set(); } else if (client_lookup[object_id].count(client_id) != 0) { return ray::Status::KeyError("ClientID already exists."); } diff --git a/src/ray/raylet/mock_gcs_client.h b/src/ray/raylet/mock_gcs_client.h index b519e6fea..f84b57dbc 100644 --- a/src/ray/raylet/mock_gcs_client.h +++ b/src/ray/raylet/mock_gcs_client.h @@ -31,9 +31,7 @@ class ObjectTable { private: std::vector empty_set_; - std::unordered_map, - UniqueIDHasher> - client_lookup; + std::unordered_map> client_lookup; }; class ClientInformation { @@ -53,7 +51,7 @@ class ClientInformation { class ClientTable { public: - typedef std::unordered_map info_type; + typedef std::unordered_map info_type; using ClientIDsCallback = std::function)>; using SingleInfoCallback = std::function; diff --git a/src/ray/raylet/monitor.h b/src/ray/raylet/monitor.h index 3b383160e..21bad9b3e 100644 --- a/src/ray/raylet/monitor.h +++ b/src/ray/raylet/monitor.h @@ -44,9 +44,9 @@ class Monitor { boost::asio::deadline_timer heartbeat_timer_; /// For each Raylet that we receive a heartbeat from, the number of ticks /// that may pass before the Raylet will be declared dead. - std::unordered_map heartbeats_; + std::unordered_map heartbeats_; /// The Raylets that have been marked as dead in the client table. - std::unordered_set dead_clients_; + std::unordered_set dead_clients_; }; } // namespace raylet diff --git a/src/ray/raylet/node_manager.cc b/src/ray/raylet/node_manager.cc index c346d82d5..82740f1f5 100644 --- a/src/ray/raylet/node_manager.cc +++ b/src/ray/raylet/node_manager.cc @@ -32,8 +32,7 @@ RAY_CHECK_ENUM(protocol::MessageType_SetActorFrontier, MessageType_SetActorFront /// A helper function to determine whether a given actor task has already been executed /// according to the given actor registry. Returns true if the task is a duplicate. bool CheckDuplicateActorTask( - const std::unordered_map - &actor_registry, + const std::unordered_map &actor_registry, const ray::raylet::TaskSpecification &spec) { auto actor_entry = actor_registry.find(spec.ActorId()); RAY_CHECK(actor_entry != actor_registry.end()); @@ -262,7 +261,7 @@ void NodeManager::HandleActorCreation(const ActorID &actor_id, // Dequeue any methods that were submitted before the actor's location was // known. const auto &methods = local_queues_.GetUncreatedActorMethods(); - std::unordered_set created_actor_method_ids; + std::unordered_set created_actor_method_ids; for (const auto &method : methods) { if (method.GetTaskSpecification().ActorId() == actor_id) { created_actor_method_ids.insert(method.GetTaskSpecification().TaskId()); @@ -482,7 +481,7 @@ void NodeManager::ScheduleTasks() { } // Extract decision for this local scheduler. - std::unordered_set local_task_ids; + std::unordered_set local_task_ids; // Iterate over (taskid, clientid) pairs, extract tasks assigned to the local node. for (const auto &task_schedule : policy_decision) { TaskID task_id = task_schedule.first; diff --git a/src/ray/raylet/node_manager.h b/src/ray/raylet/node_manager.h index e63aa1f40..3cf77327d 100644 --- a/src/ray/raylet/node_manager.h +++ b/src/ray/raylet/node_manager.h @@ -100,7 +100,7 @@ class NodeManager { /// The resources local to this node. const SchedulingResources local_resources_; // TODO(atumanov): Add resource information from other nodes. - std::unordered_map cluster_resource_map_; + std::unordered_map cluster_resource_map_; /// A pool of workers. WorkerPool worker_pool_; /// A set of queues to maintain tasks. @@ -114,9 +114,8 @@ class NodeManager { /// The lineage cache for the GCS object and task tables. LineageCache lineage_cache_; std::vector remote_clients_; - std::unordered_map - remote_server_connections_; - std::unordered_map actor_registry_; + std::unordered_map remote_server_connections_; + std::unordered_map actor_registry_; }; } // namespace raylet diff --git a/src/ray/raylet/scheduling_policy.cc b/src/ray/raylet/scheduling_policy.cc index 0ec8f3d08..4d13fdc4c 100644 --- a/src/ray/raylet/scheduling_policy.cc +++ b/src/ray/raylet/scheduling_policy.cc @@ -9,12 +9,11 @@ namespace raylet { SchedulingPolicy::SchedulingPolicy(const SchedulingQueue &scheduling_queue) : scheduling_queue_(scheduling_queue), gen_(rd_()) {} -std::unordered_map SchedulingPolicy::Schedule( - const std::unordered_map - &cluster_resources, +std::unordered_map SchedulingPolicy::Schedule( + const std::unordered_map &cluster_resources, const ClientID &local_client_id, const std::vector &others) { // The policy decision to be returned. - std::unordered_map decision; + std::unordered_map decision; // TODO(atumanov): protect DEBUG code blocks with ifdef DEBUG RAY_LOG(DEBUG) << "[Schedule] cluster resource map: "; for (const auto &client_resource_pair : cluster_resources) { diff --git a/src/ray/raylet/scheduling_policy.h b/src/ray/raylet/scheduling_policy.h index f049cfc22..6785f189f 100644 --- a/src/ray/raylet/scheduling_policy.h +++ b/src/ray/raylet/scheduling_policy.h @@ -28,9 +28,8 @@ class SchedulingPolicy { /// \param cluster_resources: a set of cluster resources representing /// configured and current resource capacity on each node. /// \return Scheduling decision, mapping tasks to node managers for placement. - std::unordered_map Schedule( - const std::unordered_map - &cluster_resources, + std::unordered_map Schedule( + const std::unordered_map &cluster_resources, const ClientID &local_client_id, const std::vector &others); /// \brief SchedulingPolicy destructor. diff --git a/src/ray/raylet/scheduling_queue.cc b/src/ray/raylet/scheduling_queue.cc index 1ff6963cb..f8cd9d785 100644 --- a/src/ray/raylet/scheduling_queue.cc +++ b/src/ray/raylet/scheduling_queue.cc @@ -36,8 +36,7 @@ const std::list &SchedulingQueue::GetReadyMethods() const { // Helper function to remove tasks in the given set of task_ids from a // queue, and append them to the given vector removed_tasks. -void removeTasksFromQueue(std::list &queue, - std::unordered_set &task_ids, +void removeTasksFromQueue(std::list &queue, std::unordered_set &task_ids, std::vector &removed_tasks) { for (auto it = queue.begin(); it != queue.end();) { auto task_id = task_ids.find(it->GetTaskSpecification().TaskId()); @@ -58,8 +57,7 @@ void queueTasks(std::list &queue, const std::vector &tasks) { } } -std::vector SchedulingQueue::RemoveTasks( - std::unordered_set task_ids) { +std::vector SchedulingQueue::RemoveTasks(std::unordered_set task_ids) { // List of removed tasks to be returned. std::vector removed_tasks; diff --git a/src/ray/raylet/scheduling_queue.h b/src/ray/raylet/scheduling_queue.h index 068bfceb6..7da3050f3 100644 --- a/src/ray/raylet/scheduling_queue.h +++ b/src/ray/raylet/scheduling_queue.h @@ -77,7 +77,7 @@ class SchedulingQueue { /// \param tasks The set of task IDs to remove from the queue. The /// corresponding tasks must be contained in the queue. /// \return A vector of the tasks that were removed. - std::vector RemoveTasks(std::unordered_set tasks); + std::vector RemoveTasks(std::unordered_set tasks); /// Queue tasks that are destined for actors that have not yet been created. /// diff --git a/src/ray/raylet/task_dependency_manager.h b/src/ray/raylet/task_dependency_manager.h index e45a826b1..b336ed4db 100644 --- a/src/ray/raylet/task_dependency_manager.h +++ b/src/ray/raylet/task_dependency_manager.h @@ -73,14 +73,12 @@ class TaskDependencyManager { ObjectManager &object_manager_; /// A mapping from task ID of each subscribed task to its list of /// dependencies. - std::unordered_map, UniqueIDHasher> - task_dependencies_; + std::unordered_map> task_dependencies_; // A mapping from object ID of each object that is not locally available to // the list of subscribed tasks that are dependent on it. - std::unordered_map, UniqueIDHasher> - remote_object_dependencies_; + std::unordered_map> remote_object_dependencies_; // The set of locally available objects. - std::unordered_set local_objects_; + std::unordered_set local_objects_; // The callback to call when a subscribed task becomes ready. std::function task_ready_callback_; }; diff --git a/src/ray/raylet/worker_pool.h b/src/ray/raylet/worker_pool.h index 5cb5bb2bf..8b6ef1e54 100644 --- a/src/ray/raylet/worker_pool.h +++ b/src/ray/raylet/worker_pool.h @@ -103,7 +103,7 @@ class WorkerPool { /// The pool of idle workers. std::list> pool_; /// The pool of idle actor workers. - std::unordered_map, UniqueIDHasher> actor_pool_; + std::unordered_map> actor_pool_; /// All workers that have registered and are still connected, including both /// idle and executing. // TODO(swang): Make this a map to make GetRegisteredWorker faster.