Split profile table among many keys in the GCS. (#3676)

* Divide profile table among many keys in GCS.

* Fix, and remove --collect-profiling-data arg.

* Remove reference in doc.
This commit is contained in:
Robert Nishihara
2019-01-02 21:33:01 -08:00
committed by Philipp Moritz
parent 93e9d2b82c
commit b6bcd18d65
10 changed files with 34 additions and 119 deletions
+17 -12
View File
@@ -406,20 +406,20 @@ class GlobalState(object):
return ip_filename_file
def _profile_table(self, component_id):
"""Get the profile events for a given component.
def _profile_table(self, batch_id):
"""Get the profile events for a given batch of profile events.
Args:
component_id: An identifier for a component.
batch_id: An identifier for a batch of profile events.
Returns:
A list of the profile events for the specified process.
A list of the profile events for the specified batch.
"""
# TODO(rkn): This method should support limiting the number of log
# events and should also support returning a window of events.
message = self._execute_command(component_id, "RAY.TABLE_LOOKUP",
message = self._execute_command(batch_id, "RAY.TABLE_LOOKUP",
ray.gcs_utils.TablePrefix.PROFILE, "",
component_id.id())
batch_id.id())
if message is None:
return []
@@ -459,16 +459,21 @@ class GlobalState(object):
def profile_table(self):
profile_table_keys = self._keys(
ray.gcs_utils.TablePrefix_PROFILE_string + "*")
component_identifiers_binary = [
batch_identifiers_binary = [
key[len(ray.gcs_utils.TablePrefix_PROFILE_string):]
for key in profile_table_keys
]
return {
binary_to_hex(component_id): self._profile_table(
binary_to_object_id(component_id))
for component_id in component_identifiers_binary
}
result = defaultdict(list)
for batch_id in batch_identifiers_binary:
profile_data = self._profile_table(binary_to_object_id(batch_id))
# Note that if keys are being evicted from Redis, then it is
# possible that the batch will be evicted before we get it.
if len(profile_data) > 0:
component_id = profile_data[0]["component_id"]
result[component_id].extend(profile_data)
return dict(result)
def _seconds_to_microseconds(self, time_in_seconds):
"""A helper function for converting seconds to microseconds."""
-3
View File
@@ -52,7 +52,6 @@ class RayParams(object):
node_manager_ports (list): A list of the ports to use for the node
managers. There should be one per node manager being started on
this node (typically just one).
collect_profiling_data: Whether to collect profiling data from workers.
node_ip_address (str): The IP address of the node that we are on.
object_id_seed (int): Used to seed the deterministic generation of
object IDs. The same value can be used across multiple runs of the
@@ -106,7 +105,6 @@ class RayParams(object):
redis_shard_ports=None,
object_manager_ports=None,
node_manager_ports=None,
collect_profiling_data=True,
node_ip_address=None,
object_id_seed=None,
num_workers=None,
@@ -143,7 +141,6 @@ class RayParams(object):
self.redis_shard_ports = redis_shard_ports
self.object_manager_ports = object_manager_ports
self.node_manager_ports = node_manager_ports
self.collect_profiling_data = collect_profiling_data
self.node_ip_address = node_ip_address
self.num_workers = num_workers
self.local_mode = local_mode
-13
View File
@@ -128,19 +128,6 @@ class Profiler(object):
self.events.append(event)
class NoopProfiler(object):
"""A no-op profile used when collect_profile_data=False."""
def start_flush_thread(self):
pass
def flush_profile_data(self):
pass
def add_event(self, event):
pass
class RayLogSpanRaylet(object):
"""An object used to enable logging a span of events with a with statement.
+5 -14
View File
@@ -126,14 +126,6 @@ def cli(logging_level, logging_format):
"for no limit. Once the limit is exceeded, redis will start LRU "
"eviction of entries. This only applies to the sharded "
"redis tables (task and object tables)."))
@click.option(
"--collect-profiling-data",
default=True,
type=bool,
help=("Whether to collect profiling data. Note that "
"profiling data cannot be LRU evicted, so if you set "
"redis_max_memory then profiling will also be disabled to prevent "
"it from consuming all available redis memory."))
@click.option(
"--num-workers",
required=False,
@@ -220,11 +212,11 @@ def cli(logging_level, logging_format):
def start(node_ip_address, redis_address, redis_port, num_redis_shards,
redis_max_clients, redis_password, redis_shard_ports,
object_manager_port, node_manager_port, object_store_memory,
redis_max_memory, collect_profiling_data, num_workers, num_cpus,
num_gpus, resources, head, no_ui, block, plasma_directory,
huge_pages, autoscaling_config, no_redirect_worker_output,
no_redirect_output, plasma_store_socket_name, raylet_socket_name,
temp_dir, internal_config):
redis_max_memory, num_workers, num_cpus, num_gpus, resources, head,
no_ui, block, plasma_directory, huge_pages, autoscaling_config,
no_redirect_worker_output, no_redirect_output,
plasma_store_socket_name, raylet_socket_name, temp_dir,
internal_config):
# Convert hostnames to numerical IP address.
if node_ip_address is not None:
node_ip_address = services.address_to_ip(node_ip_address)
@@ -292,7 +284,6 @@ def start(node_ip_address, redis_address, redis_port, num_redis_shards,
redis_port=redis_port,
redis_shard_ports=redis_shard_ports,
redis_max_memory=redis_max_memory,
collect_profiling_data=collect_profiling_data,
num_redis_shards=num_redis_shards,
redis_max_clients=redis_max_clients,
include_webui=(not no_ui),
+3 -5
View File
@@ -934,12 +934,10 @@ def start_raylet(ray_params,
"--object-store-name={} "
"--raylet-name={} "
"--redis-address={} "
"--collect-profiling-data={} "
"--temp-dir={}".format(
sys.executable, ray_params.worker_path,
ray_params.node_ip_address, plasma_store_name,
raylet_name, ray_params.redis_address, "1"
if ray_params.collect_profiling_data else "0",
raylet_name, ray_params.redis_address,
get_temp_root()))
if ray_params.redis_password:
start_worker_command += " --redis-password {}".format(
@@ -1498,8 +1496,8 @@ def start_ray_head(ray_params, cleanup=True):
following parameters could be checked: address_info,
object_manager_ports, node_manager_ports, node_ip_address,
redis_port, redis_shard_ports, num_workers, num_local_schedulers,
object_store_memory, redis_max_memory, collect_profiling_data,
worker_path, cleanup, redirect_worker_output, redirect_output,
object_store_memory, redis_max_memory, worker_path, cleanup,
redirect_worker_output, redirect_output,
start_workers_from_local_scheduler, resources, num_redis_shards,
redis_max_clients, redis_password, include_webui, huge_pages,
plasma_directory, autoscaling_config, plasma_store_socket_name,
+8 -20
View File
@@ -1282,12 +1282,12 @@ def _init(ray_params, driver_id=None):
following parameters could be checked: address_info,
start_ray_local, object_id_seed, num_workers,
num_local_schedulers, object_store_memory, redis_max_memory,
collect_profiling_data, local_mode, redirect_worker_output,
driver_mode, redirect_output, start_workers_from_local_scheduler,
num_cpus, num_gpus, resources, num_redis_shards,
redis_max_clients, redis_password, plasma_directory, huge_pages,
include_webui, driver_id, plasma_store_socket_name, temp_dir,
raylet_socket_name, _internal_config
local_mode, redirect_worker_output, driver_mode, redirect_output,
start_workers_from_local_scheduler, num_cpus, num_gpus, resources,
num_redis_shards, redis_max_clients, redis_password,
plasma_directory, huge_pages, include_webui, driver_id,
plasma_store_socket_name, temp_dir, raylet_socket_name,
_internal_config
driver_id: The ID of driver.
Returns:
@@ -1305,12 +1305,6 @@ def _init(ray_params, driver_id=None):
else:
ray_params.driver_mode = SCRIPT_MODE
if ray_params.redis_max_memory and ray_params.collect_profiling_data:
logger.warning(
"Profiling data cannot be LRU evicted, so it is disabled "
"when redis_max_memory is set.")
ray_params.collect_profiling_data = False
# Get addresses of existing services.
if ray_params.address_info is None:
ray_params.address_info = {}
@@ -1436,7 +1430,6 @@ def init(redis_address=None,
resources=None,
object_store_memory=None,
redis_max_memory=None,
collect_profiling_data=True,
node_ip_address=None,
object_id_seed=None,
num_workers=None,
@@ -1497,7 +1490,6 @@ def init(redis_address=None,
to use, or None for no limit. Once the limit is exceeded, redis
will start LRU eviction of entries. This only applies to the
sharded redis tables (task and object tables).
collect_profiling_data: Whether to collect profiling data from workers.
node_ip_address (str): The IP address of the node that we are on.
object_id_seed (int): Used to seed the deterministic generation of
object IDs. The same value can be used across multiple runs of the
@@ -1599,7 +1591,6 @@ def init(redis_address=None,
include_webui=include_webui,
object_store_memory=object_store_memory,
redis_max_memory=redis_max_memory,
collect_profiling_data=collect_profiling_data,
plasma_store_socket_name=plasma_store_socket_name,
raylet_socket_name=raylet_socket_name,
temp_dir=temp_dir,
@@ -1816,7 +1807,7 @@ def connect(ray_params,
Args:
ray_params (ray.params.RayParams): The RayParams instance. The
following parameters could be checked: object_id_seed,
redis_password, collect_profiling_data
redis_password
info (dict): A dictionary with address of the Redis server and the
sockets of the plasma store and raylet.
mode: The mode of the worker. One of SCRIPT_MODE, WORKER_MODE, and
@@ -1833,10 +1824,7 @@ def connect(ray_params,
if not faulthandler.is_enabled():
faulthandler.enable(all_threads=False)
if ray_params.collect_profiling_data:
worker.profiler = profiling.Profiler(worker)
else:
worker.profiler = profiling.NoopProfiler()
worker.profiler = profiling.Profiler(worker)
# Initialize some fields.
if mode is WORKER_MODE:
-5
View File
@@ -51,11 +51,6 @@ parser.add_argument(
type=str,
default=ray_constants.LOGGER_FORMAT,
help=ray_constants.LOGGER_FORMAT_HELP)
parser.add_argument(
"--collect-profiling-data",
type=int, # int since argparse can't handle bool values
default=1,
help="Whether to collect profiling data from workers.")
parser.add_argument(
"--temp-dir",
required=False,