mirror of
https://github.com/wassname/ray.git
synced 2026-07-01 04:33:40 +08:00
Set the process title in workers and actors (#3219)
This commit is contained in:
committed by
Robert Nishihara
parent
f3efcd2342
commit
725df3a485
@@ -165,11 +165,14 @@ class TestPolicyEvaluator(unittest.TestCase):
|
||||
},
|
||||
})
|
||||
pg.train()
|
||||
self.assertEqual(counts["sample"], 1)
|
||||
pg.train()
|
||||
pg.train()
|
||||
pg.train()
|
||||
self.assertEqual(counts["sample"], 4)
|
||||
self.assertGreater(counts["start"], 0)
|
||||
self.assertGreater(counts["end"], 0)
|
||||
self.assertGreater(counts["step"], 50)
|
||||
self.assertLess(counts["step"], 100)
|
||||
self.assertGreater(counts["step"], 200)
|
||||
self.assertLess(counts["step"], 400)
|
||||
|
||||
def testQueryEvaluators(self):
|
||||
register_env("test", lambda _: gym.make("CartPole-v0"))
|
||||
|
||||
@@ -388,6 +388,12 @@ def stop():
|
||||
"grep -v grep | awk '{ print $2 }') 2> /dev/null"
|
||||
],
|
||||
shell=True)
|
||||
subprocess.call(
|
||||
[
|
||||
"kill -9 $(ps aux | grep ' ray_' | "
|
||||
"grep -v grep | awk '{ print $2 }') 2> /dev/null"
|
||||
],
|
||||
shell=True)
|
||||
|
||||
# Find the PID of the Ray log monitor process and kill it.
|
||||
subprocess.call(
|
||||
@@ -591,7 +597,7 @@ export IFS="
|
||||
# Call sudo to prompt for password before anything has been printed.
|
||||
sudo true
|
||||
workers=$(
|
||||
ps aux | grep default_worker.py | grep -v grep | grep -v raylet/raylet
|
||||
ps aux | grep ' ray_' | grep -v grep
|
||||
)
|
||||
for worker in $workers; do
|
||||
echo "Stack dump for $worker";
|
||||
|
||||
+19
-1
@@ -2,6 +2,7 @@ from __future__ import absolute_import
|
||||
from __future__ import division
|
||||
from __future__ import print_function
|
||||
|
||||
from contextlib import contextmanager
|
||||
import atexit
|
||||
import colorama
|
||||
import hashlib
|
||||
@@ -10,6 +11,7 @@ import logging
|
||||
import numpy as np
|
||||
import os
|
||||
import redis
|
||||
import setproctitle
|
||||
import signal
|
||||
import sys
|
||||
import threading
|
||||
@@ -899,8 +901,15 @@ class Worker(object):
|
||||
"name": function_name,
|
||||
"task_id": task.task_id().hex()
|
||||
}
|
||||
if task.actor_id().id() == NIL_ACTOR_ID:
|
||||
title = "ray_worker:{}()".format(function_name)
|
||||
else:
|
||||
actor = self.actors[task.actor_id().id()]
|
||||
title = "ray_{}:{}()".format(actor.__class__.__name__,
|
||||
function_name)
|
||||
with profiling.profile("task", extra_data=extra_data, worker=self):
|
||||
self._process_task(task, execution_info)
|
||||
with _changeproctitle(title):
|
||||
self._process_task(task, execution_info)
|
||||
|
||||
# Increase the task execution counter.
|
||||
self.function_actor_manager.increase_task_counter(
|
||||
@@ -1844,6 +1853,7 @@ def connect(info,
|
||||
# Initialize some fields.
|
||||
if mode is WORKER_MODE:
|
||||
worker.worker_id = random_string()
|
||||
setproctitle.setproctitle("ray_worker")
|
||||
else:
|
||||
# This is the code path of driver mode.
|
||||
if driver_id is None:
|
||||
@@ -2087,6 +2097,14 @@ def disconnect(worker=global_worker):
|
||||
worker.serialization_context_map.clear()
|
||||
|
||||
|
||||
@contextmanager
|
||||
def _changeproctitle(title):
|
||||
old_title = setproctitle.getproctitle()
|
||||
setproctitle.setproctitle(title)
|
||||
yield
|
||||
setproctitle.setproctitle(old_title)
|
||||
|
||||
|
||||
def _try_to_compute_deterministic_class_id(cls, depth=5):
|
||||
"""Attempt to produce a deterministic class ID for a given class.
|
||||
|
||||
|
||||
@@ -151,6 +151,7 @@ setup(
|
||||
"pytest",
|
||||
"pyyaml",
|
||||
"redis",
|
||||
"setproctitle",
|
||||
# The six module is required by pyarrow.
|
||||
"six >= 1.0.0",
|
||||
"flatbuffers"
|
||||
|
||||
Reference in New Issue
Block a user