Add job table to state API (#5076)

This commit is contained in:
Philipp Moritz
2019-07-06 00:05:48 -07:00
committed by Robert Nishihara
parent 53d5a8a45f
commit c5253cc300
12 changed files with 143 additions and 17 deletions
+2 -1
View File
@@ -67,7 +67,7 @@ from ray._raylet import (
_config = _Config()
from ray.profiling import profile # noqa: E402
from ray.state import (global_state, nodes, tasks, objects, timeline,
from ray.state import (global_state, jobs, nodes, tasks, objects, timeline,
object_transfer_timeline, cluster_resources,
available_resources, errors) # noqa: E402
from ray.worker import (
@@ -101,6 +101,7 @@ __version__ = "0.8.0.dev2"
__all__ = [
"global_state",
"jobs",
"nodes",
"tasks",
"objects",
+1
View File
@@ -59,6 +59,7 @@ TablePrefix_RAYLET_TASK_string = "RAYLET_TASK"
TablePrefix_OBJECT_string = "OBJECT"
TablePrefix_ERROR_INFO_string = "ERROR_INFO"
TablePrefix_PROFILE_string = "PROFILE"
TablePrefix_JOB_string = "JOB"
def construct_error_message(job_id, error_type, message, timestamp):
+8 -7
View File
@@ -198,8 +198,8 @@ class Monitor(object):
"entries from redis shard {}.".format(
len(keys) - num_deleted, shard_index))
def xray_job_removed_handler(self, unused_channel, data):
"""Handle a notification that a job has been removed.
def xray_job_notification_handler(self, unused_channel, data):
"""Handle a notification that a job has been added or removed.
Args:
unused_channel: The message channel.
@@ -209,10 +209,11 @@ class Monitor(object):
job_data = gcs_entries.entries[0]
message = ray.gcs_utils.JobTableData.FromString(job_data)
job_id = message.job_id
logger.info("Monitor: "
"XRay Driver {} has been removed.".format(
binary_to_hex(job_id)))
self._xray_clean_up_entries_for_job(job_id)
if message.is_dead:
logger.info("Monitor: "
"XRay Driver {} has been removed.".format(
binary_to_hex(job_id)))
self._xray_clean_up_entries_for_job(job_id)
def process_messages(self, max_messages=10000):
"""Process all messages ready in the subscription channels.
@@ -242,7 +243,7 @@ class Monitor(object):
message_handler = self.xray_heartbeat_batch_handler
elif channel == ray.gcs_utils.XRAY_JOB_CHANNEL:
# Handles driver death.
message_handler = self.xray_job_removed_handler
message_handler = self.xray_job_notification_handler
else:
raise Exception("This code should be unreachable.")
+84
View File
@@ -385,6 +385,76 @@ class GlobalState(object):
return _parse_client_table(self.redis_client)
def _job_table(self, job_id):
"""Fetch and parse the job table information for a single job ID.
Args:
job_id: A job ID or hex string to get information about.
Returns:
A dictionary with information about the job ID in question.
"""
# Allow the argument to be either a JobID or a hex string.
if not isinstance(job_id, ray.JobID):
assert isinstance(job_id, str)
job_id = ray.JobID(hex_to_binary(job_id))
# Return information about a single job ID.
message = self.redis_client.execute_command(
"RAY.TABLE_LOOKUP", gcs_utils.TablePrefix.Value("JOB"), "",
job_id.binary())
if message is None:
return {}
gcs_entry = gcs_utils.GcsEntry.FromString(message)
assert len(gcs_entry.entries) > 0
job_info = {}
for i in range(len(gcs_entry.entries)):
entry = gcs_utils.JobTableData.FromString(gcs_entry.entries[i])
assert entry.job_id == job_id.binary()
job_info["JobID"] = job_id.hex()
job_info["NodeManagerAddress"] = entry.node_manager_address
job_info["DriverPid"] = entry.driver_pid
if entry.is_dead:
job_info["StopTime"] = entry.timestamp
else:
job_info["StartTime"] = entry.timestamp
return job_info
def job_table(self):
"""Fetch and parse the Redis job table.
Returns:
Information about the Ray jobs in the cluster,
namely a list of dicts with keys:
- "JobID" (identifier for the job),
- "NodeManagerAddress" (IP address of the driver for this job),
- "DriverPid" (process ID of the driver for this job),
- "StartTime" (UNIX timestamp of the start time of this job),
- "StopTime" (UNIX timestamp of the stop time of this job, if any)
"""
self._check_connected()
job_keys = self.redis_client.keys(gcs_utils.TablePrefix_JOB_string +
"*")
job_ids_binary = {
key[len(gcs_utils.TablePrefix_JOB_string):]
for key in job_keys
}
results = []
for job_id_binary in job_ids_binary:
results.append(self._job_table(binary_to_hex(job_id_binary)))
return results
def _profile_table(self, batch_id):
"""Get the profile events for a given batch of profile events.
@@ -982,6 +1052,20 @@ state = GlobalState()
global_state = DeprecatedGlobalState()
def jobs():
"""Get a list of the jobs in the cluster.
Returns:
Information from the job table, namely a list of dicts with keys:
- "JobID" (identifier for the job),
- "NodeManagerAddress" (IP address of the driver for this job),
- "DriverPid" (process ID of the driver for this job),
- "StartTime" (UNIX timestamp of the start time of this job),
- "StopTime" (UNIX timestamp of the stop time of this job, if any)
"""
return state.job_table()
def nodes():
"""Get a list of the nodes in the cluster.
+16 -3
View File
@@ -2423,15 +2423,22 @@ def wait_for_num_objects(num_objects, timeout=10):
os.environ.get("RAY_USE_NEW_GCS") == "on",
reason="New GCS API doesn't have a Python API yet.")
def test_global_state_api(shutdown_only):
with pytest.raises(Exception):
error_message = ("The ray global state API cannot be used "
"before ray.init has been called.")
with pytest.raises(Exception, match=error_message):
ray.objects()
with pytest.raises(Exception):
with pytest.raises(Exception, match=error_message):
ray.tasks()
with pytest.raises(Exception):
with pytest.raises(Exception, match=error_message):
ray.nodes()
with pytest.raises(Exception, match=error_message):
ray.jobs()
ray.init(num_cpus=5, num_gpus=3, resources={"CustomResource": 1})
resources = {"CPU": 5, "GPU": 3, "CustomResource": 1}
@@ -2509,6 +2516,12 @@ def test_global_state_api(shutdown_only):
object_table_entry = ray.objects(result_id)
assert object_table[result_id] == object_table_entry
job_table = ray.jobs()
assert len(job_table) == 1
assert job_table[0]["JobID"] == job_id
assert job_table[0]["NodeManagerAddress"] == node_ip_address
# TODO(rkn): Pytest actually has tools for capturing stdout and stderr, so we
# should use those, but they seem to conflict with Ray's use of faulthandler.