From b6bcd18d650b036cc2adbe5400a2a852f209b75e Mon Sep 17 00:00:00 2001 From: Robert Nishihara Date: Wed, 2 Jan 2019 21:33:01 -0800 Subject: [PATCH] Split profile table among many keys in the GCS. (#3676) * Divide profile table among many keys in GCS. * Fix, and remove --collect-profiling-data arg. * Remove reference in doc. --- doc/source/redis-memory-management.rst | 3 --- python/ray/experimental/state.py | 29 +++++++++++++++----------- python/ray/parameter.py | 3 --- python/ray/profiling.py | 13 ------------ python/ray/scripts/scripts.py | 19 +++++------------ python/ray/services.py | 8 +++---- python/ray/worker.py | 28 +++++++------------------ python/ray/workers/default_worker.py | 5 ----- src/ray/gcs/tables.cc | 28 +------------------------ src/ray/gcs/tables.h | 17 --------------- 10 files changed, 34 insertions(+), 119 deletions(-) diff --git a/doc/source/redis-memory-management.rst b/doc/source/redis-memory-management.rst index 5e6edcc02..196bc81b9 100644 --- a/doc/source/redis-memory-management.rst +++ b/doc/source/redis-memory-management.rst @@ -10,6 +10,3 @@ to out-of-memory (OOM) errors. In Ray `0.6.1+` Redis shards can be configured to LRU evict task and object metadata by setting ``redis_max_memory`` when starting Ray. This supercedes the previously documented flushing functionality. - -Note that profiling is disabled when ``redis_max_memory`` is set. This is because -profiling data cannot be LRU evicted. diff --git a/python/ray/experimental/state.py b/python/ray/experimental/state.py index 2048444fb..be202054f 100644 --- a/python/ray/experimental/state.py +++ b/python/ray/experimental/state.py @@ -406,20 +406,20 @@ class GlobalState(object): return ip_filename_file - def _profile_table(self, component_id): - """Get the profile events for a given component. + def _profile_table(self, batch_id): + """Get the profile events for a given batch of profile events. Args: - component_id: An identifier for a component. + batch_id: An identifier for a batch of profile events. Returns: - A list of the profile events for the specified process. + A list of the profile events for the specified batch. """ # TODO(rkn): This method should support limiting the number of log # events and should also support returning a window of events. - message = self._execute_command(component_id, "RAY.TABLE_LOOKUP", + message = self._execute_command(batch_id, "RAY.TABLE_LOOKUP", ray.gcs_utils.TablePrefix.PROFILE, "", - component_id.id()) + batch_id.id()) if message is None: return [] @@ -459,16 +459,21 @@ class GlobalState(object): def profile_table(self): profile_table_keys = self._keys( ray.gcs_utils.TablePrefix_PROFILE_string + "*") - component_identifiers_binary = [ + batch_identifiers_binary = [ key[len(ray.gcs_utils.TablePrefix_PROFILE_string):] for key in profile_table_keys ] - return { - binary_to_hex(component_id): self._profile_table( - binary_to_object_id(component_id)) - for component_id in component_identifiers_binary - } + result = defaultdict(list) + for batch_id in batch_identifiers_binary: + profile_data = self._profile_table(binary_to_object_id(batch_id)) + # Note that if keys are being evicted from Redis, then it is + # possible that the batch will be evicted before we get it. + if len(profile_data) > 0: + component_id = profile_data[0]["component_id"] + result[component_id].extend(profile_data) + + return dict(result) def _seconds_to_microseconds(self, time_in_seconds): """A helper function for converting seconds to microseconds.""" diff --git a/python/ray/parameter.py b/python/ray/parameter.py index 350b118e0..ee41153dd 100644 --- a/python/ray/parameter.py +++ b/python/ray/parameter.py @@ -52,7 +52,6 @@ class RayParams(object): node_manager_ports (list): A list of the ports to use for the node managers. There should be one per node manager being started on this node (typically just one). - collect_profiling_data: Whether to collect profiling data from workers. node_ip_address (str): The IP address of the node that we are on. object_id_seed (int): Used to seed the deterministic generation of object IDs. The same value can be used across multiple runs of the @@ -106,7 +105,6 @@ class RayParams(object): redis_shard_ports=None, object_manager_ports=None, node_manager_ports=None, - collect_profiling_data=True, node_ip_address=None, object_id_seed=None, num_workers=None, @@ -143,7 +141,6 @@ class RayParams(object): self.redis_shard_ports = redis_shard_ports self.object_manager_ports = object_manager_ports self.node_manager_ports = node_manager_ports - self.collect_profiling_data = collect_profiling_data self.node_ip_address = node_ip_address self.num_workers = num_workers self.local_mode = local_mode diff --git a/python/ray/profiling.py b/python/ray/profiling.py index d57d827cd..692c5752c 100644 --- a/python/ray/profiling.py +++ b/python/ray/profiling.py @@ -128,19 +128,6 @@ class Profiler(object): self.events.append(event) -class NoopProfiler(object): - """A no-op profile used when collect_profile_data=False.""" - - def start_flush_thread(self): - pass - - def flush_profile_data(self): - pass - - def add_event(self, event): - pass - - class RayLogSpanRaylet(object): """An object used to enable logging a span of events with a with statement. diff --git a/python/ray/scripts/scripts.py b/python/ray/scripts/scripts.py index 492eac51e..d5b6747d4 100644 --- a/python/ray/scripts/scripts.py +++ b/python/ray/scripts/scripts.py @@ -126,14 +126,6 @@ def cli(logging_level, logging_format): "for no limit. Once the limit is exceeded, redis will start LRU " "eviction of entries. This only applies to the sharded " "redis tables (task and object tables).")) -@click.option( - "--collect-profiling-data", - default=True, - type=bool, - help=("Whether to collect profiling data. Note that " - "profiling data cannot be LRU evicted, so if you set " - "redis_max_memory then profiling will also be disabled to prevent " - "it from consuming all available redis memory.")) @click.option( "--num-workers", required=False, @@ -220,11 +212,11 @@ def cli(logging_level, logging_format): def start(node_ip_address, redis_address, redis_port, num_redis_shards, redis_max_clients, redis_password, redis_shard_ports, object_manager_port, node_manager_port, object_store_memory, - redis_max_memory, collect_profiling_data, num_workers, num_cpus, - num_gpus, resources, head, no_ui, block, plasma_directory, - huge_pages, autoscaling_config, no_redirect_worker_output, - no_redirect_output, plasma_store_socket_name, raylet_socket_name, - temp_dir, internal_config): + redis_max_memory, num_workers, num_cpus, num_gpus, resources, head, + no_ui, block, plasma_directory, huge_pages, autoscaling_config, + no_redirect_worker_output, no_redirect_output, + plasma_store_socket_name, raylet_socket_name, temp_dir, + internal_config): # Convert hostnames to numerical IP address. if node_ip_address is not None: node_ip_address = services.address_to_ip(node_ip_address) @@ -292,7 +284,6 @@ def start(node_ip_address, redis_address, redis_port, num_redis_shards, redis_port=redis_port, redis_shard_ports=redis_shard_ports, redis_max_memory=redis_max_memory, - collect_profiling_data=collect_profiling_data, num_redis_shards=num_redis_shards, redis_max_clients=redis_max_clients, include_webui=(not no_ui), diff --git a/python/ray/services.py b/python/ray/services.py index 083810156..0e061e449 100644 --- a/python/ray/services.py +++ b/python/ray/services.py @@ -934,12 +934,10 @@ def start_raylet(ray_params, "--object-store-name={} " "--raylet-name={} " "--redis-address={} " - "--collect-profiling-data={} " "--temp-dir={}".format( sys.executable, ray_params.worker_path, ray_params.node_ip_address, plasma_store_name, - raylet_name, ray_params.redis_address, "1" - if ray_params.collect_profiling_data else "0", + raylet_name, ray_params.redis_address, get_temp_root())) if ray_params.redis_password: start_worker_command += " --redis-password {}".format( @@ -1498,8 +1496,8 @@ def start_ray_head(ray_params, cleanup=True): following parameters could be checked: address_info, object_manager_ports, node_manager_ports, node_ip_address, redis_port, redis_shard_ports, num_workers, num_local_schedulers, - object_store_memory, redis_max_memory, collect_profiling_data, - worker_path, cleanup, redirect_worker_output, redirect_output, + object_store_memory, redis_max_memory, worker_path, cleanup, + redirect_worker_output, redirect_output, start_workers_from_local_scheduler, resources, num_redis_shards, redis_max_clients, redis_password, include_webui, huge_pages, plasma_directory, autoscaling_config, plasma_store_socket_name, diff --git a/python/ray/worker.py b/python/ray/worker.py index ced1f1cb8..d8590f4a0 100644 --- a/python/ray/worker.py +++ b/python/ray/worker.py @@ -1282,12 +1282,12 @@ def _init(ray_params, driver_id=None): following parameters could be checked: address_info, start_ray_local, object_id_seed, num_workers, num_local_schedulers, object_store_memory, redis_max_memory, - collect_profiling_data, local_mode, redirect_worker_output, - driver_mode, redirect_output, start_workers_from_local_scheduler, - num_cpus, num_gpus, resources, num_redis_shards, - redis_max_clients, redis_password, plasma_directory, huge_pages, - include_webui, driver_id, plasma_store_socket_name, temp_dir, - raylet_socket_name, _internal_config + local_mode, redirect_worker_output, driver_mode, redirect_output, + start_workers_from_local_scheduler, num_cpus, num_gpus, resources, + num_redis_shards, redis_max_clients, redis_password, + plasma_directory, huge_pages, include_webui, driver_id, + plasma_store_socket_name, temp_dir, raylet_socket_name, + _internal_config driver_id: The ID of driver. Returns: @@ -1305,12 +1305,6 @@ def _init(ray_params, driver_id=None): else: ray_params.driver_mode = SCRIPT_MODE - if ray_params.redis_max_memory and ray_params.collect_profiling_data: - logger.warning( - "Profiling data cannot be LRU evicted, so it is disabled " - "when redis_max_memory is set.") - ray_params.collect_profiling_data = False - # Get addresses of existing services. if ray_params.address_info is None: ray_params.address_info = {} @@ -1436,7 +1430,6 @@ def init(redis_address=None, resources=None, object_store_memory=None, redis_max_memory=None, - collect_profiling_data=True, node_ip_address=None, object_id_seed=None, num_workers=None, @@ -1497,7 +1490,6 @@ def init(redis_address=None, to use, or None for no limit. Once the limit is exceeded, redis will start LRU eviction of entries. This only applies to the sharded redis tables (task and object tables). - collect_profiling_data: Whether to collect profiling data from workers. node_ip_address (str): The IP address of the node that we are on. object_id_seed (int): Used to seed the deterministic generation of object IDs. The same value can be used across multiple runs of the @@ -1599,7 +1591,6 @@ def init(redis_address=None, include_webui=include_webui, object_store_memory=object_store_memory, redis_max_memory=redis_max_memory, - collect_profiling_data=collect_profiling_data, plasma_store_socket_name=plasma_store_socket_name, raylet_socket_name=raylet_socket_name, temp_dir=temp_dir, @@ -1816,7 +1807,7 @@ def connect(ray_params, Args: ray_params (ray.params.RayParams): The RayParams instance. The following parameters could be checked: object_id_seed, - redis_password, collect_profiling_data + redis_password info (dict): A dictionary with address of the Redis server and the sockets of the plasma store and raylet. mode: The mode of the worker. One of SCRIPT_MODE, WORKER_MODE, and @@ -1833,10 +1824,7 @@ def connect(ray_params, if not faulthandler.is_enabled(): faulthandler.enable(all_threads=False) - if ray_params.collect_profiling_data: - worker.profiler = profiling.Profiler(worker) - else: - worker.profiler = profiling.NoopProfiler() + worker.profiler = profiling.Profiler(worker) # Initialize some fields. if mode is WORKER_MODE: diff --git a/python/ray/workers/default_worker.py b/python/ray/workers/default_worker.py index f165be673..335b228ec 100644 --- a/python/ray/workers/default_worker.py +++ b/python/ray/workers/default_worker.py @@ -51,11 +51,6 @@ parser.add_argument( type=str, default=ray_constants.LOGGER_FORMAT, help=ray_constants.LOGGER_FORMAT_HELP) -parser.add_argument( - "--collect-profiling-data", - type=int, # int since argparse can't handle bool values - default=1, - help="Whether to collect profiling data from workers.") parser.add_argument( "--temp-dir", required=False, diff --git a/src/ray/gcs/tables.cc b/src/ray/gcs/tables.cc index e5770effa..8e60c8e67 100644 --- a/src/ray/gcs/tables.cc +++ b/src/ray/gcs/tables.cc @@ -257,39 +257,13 @@ std::string ErrorTable::DebugString() const { return Log::DebugString(); } -Status ProfileTable::AddProfileEvent(const std::string &event_type, - const std::string &component_type, - const UniqueID &component_id, - const std::string &node_ip_address, - double start_time, double end_time, - const std::string &extra_data) { - auto data = std::make_shared(); - - ProfileEventT profile_event; - profile_event.event_type = event_type; - profile_event.start_time = start_time; - profile_event.end_time = end_time; - profile_event.extra_data = extra_data; - - data->component_type = component_type; - data->component_id = component_id.binary(); - data->node_ip_address = node_ip_address; - data->profile_events.emplace_back(new ProfileEventT(profile_event)); - - return Append(JobID::nil(), component_id, data, - [](ray::gcs::AsyncGcsClient *client, const JobID &id, - const ProfileTableDataT &data) { - RAY_LOG(DEBUG) << "Profile message pushed callback"; - }); -} - Status ProfileTable::AddProfileEventBatch(const ProfileTableData &profile_events) { auto data = std::make_shared(); // There is some room for optimization here because the Append function will just // call "Pack" and undo the "UnPack". profile_events.UnPackTo(data.get()); - return Append(JobID::nil(), from_flatbuf(*profile_events.component_id()), data, + return Append(JobID::nil(), UniqueID::from_random(), data, [](ray::gcs::AsyncGcsClient *client, const JobID &id, const ProfileTableDataT &data) { RAY_LOG(DEBUG) << "Profile message pushed callback"; diff --git a/src/ray/gcs/tables.h b/src/ray/gcs/tables.h index 1e17210cd..e879d3dd0 100644 --- a/src/ray/gcs/tables.h +++ b/src/ray/gcs/tables.h @@ -501,23 +501,6 @@ class ProfileTable : private Log { prefix_ = TablePrefix::PROFILE; }; - /// Add a single profile event to the profile table. - /// - /// \param event_type The type of the event. - /// \param component_type The type of the component that the event came from. - /// \param component_id An identifier for the component that generated the event. - /// \param node_ip_address The IP address of the node that generated the event. - /// \param start_time The timestamp of the event start, this should be in seconds since - /// the Unix epoch. - /// \param end_time The timestamp of the event end, this should be in seconds since - /// the Unix epoch. If the event is a point event, this should be equal to start_time. - /// \param extra_data Additional data to associate with the event. - /// \return Status. - Status AddProfileEvent(const std::string &event_type, const std::string &component_type, - const UniqueID &component_id, const std::string &node_ip_address, - double start_time, double end_time, - const std::string &extra_data); - /// Add a batch of profiling events to the profile table. /// /// \param profile_events The profile events to record.