From b29fc0c4819d7dea95c67cb7539e70e2b05cadf2 Mon Sep 17 00:00:00 2001 From: Robert Nishihara Date: Mon, 15 Aug 2016 15:55:34 -0700 Subject: [PATCH] Use random string for worker c++ logfile. (#378) --- lib/python/ray/worker.py | 10 ++++++---- src/raylib.cc | 21 ++++++++------------- src/scheduler.cc | 4 ++-- src/worker.cc | 10 +++++----- 4 files changed, 21 insertions(+), 24 deletions(-) diff --git a/lib/python/ray/worker.py b/lib/python/ray/worker.py index 39dab8f88..4a136dad2 100644 --- a/lib/python/ray/worker.py +++ b/lib/python/ray/worker.py @@ -10,6 +10,7 @@ import numpy as np import colorama import atexit import threading +import string # Ray modules import config @@ -729,10 +730,13 @@ def connect(node_ip_address, scheduler_address, objstore_address=None, worker=gl return worker.scheduler_address = scheduler_address + random_string = "".join(np.random.choice(list(string.ascii_uppercase + string.digits)) for _ in range(10)) + cpp_log_file_name = config.get_log_file_path("-".join(["worker", random_string, "c++"]) + ".log") + python_log_file_name = config.get_log_file_path("-".join(["worker", random_string]) + ".log") # Create a worker object. This also creates the worker service, which can # receive commands from the scheduler. This call also sets up a queue between # the worker and the worker service. - worker.handle, worker.worker_address = raylib.create_worker(node_ip_address, scheduler_address, objstore_address if objstore_address is not None else "", mode) + worker.handle, worker.worker_address = raylib.create_worker(node_ip_address, scheduler_address, objstore_address if objstore_address is not None else "", mode, cpp_log_file_name) # If this is a driver running in SCRIPT_MODE, start a thread to print error # messages asynchronously in the background. Ideally the scheduler would push # messages to the driver's worker service, but we ran into bugs when trying to @@ -749,14 +753,12 @@ def connect(node_ip_address, scheduler_address, objstore_address=None, worker=gl # Configure the Python logging module. Note that if we do not provide our own # logger, then our logging will interfere with other Python modules that also # use the logging module. - log_handler = logging.FileHandler(config.get_log_file_path("-".join(["worker", worker.worker_address]) + ".log")) + log_handler = logging.FileHandler(python_log_file_name) log_handler.setLevel(logging.DEBUG) log_handler.setFormatter(logging.Formatter(FORMAT)) _logger().addHandler(log_handler) _logger().setLevel(logging.DEBUG) _logger().propagate = False - # Configure the logging from the worker C++ code. - raylib.set_log_config(config.get_log_file_path("-".join(["worker", worker.worker_address, "c++"]) + ".log")) if mode in [raylib.SCRIPT_MODE, raylib.SILENT_MODE]: for function_name, function_to_export in worker.cached_remote_functions: raylib.export_remote_function(worker.handle, function_name, function_to_export) diff --git a/src/raylib.cc b/src/raylib.cc index de7f1ab7c..f95b17fe2 100644 --- a/src/raylib.cc +++ b/src/raylib.cc @@ -666,11 +666,18 @@ static PyObject* create_worker(PyObject* self, PyObject* args) { // scheduler will choose the object store address. const char* objstore_address; int mode; - if (!PyArg_ParseTuple(args, "sssi", &node_ip_address, &scheduler_address, &objstore_address, &mode)) { + const char* log_file_name; + if (!PyArg_ParseTuple(args, "sssis", &node_ip_address, &scheduler_address, &objstore_address, &mode, &log_file_name)) { return NULL; } + // Set the logging file. + create_log_dir_or_die(log_file_name); + global_ray_config.log_to_file = true; + global_ray_config.logfile.open(log_file_name); + // Create the worker. bool is_driver = (mode != Mode::WORKER_MODE); Worker* worker = new Worker(std::string(node_ip_address), std::string(scheduler_address), static_cast(mode)); + // Register the worker. worker->register_worker(std::string(node_ip_address), std::string(objstore_address), is_driver); PyObject* t = PyTuple_New(2); @@ -1023,17 +1030,6 @@ static PyObject* dump_computation_graph(PyObject* self, PyObject* args) { Py_RETURN_NONE; } -static PyObject* set_log_config(PyObject* self, PyObject* args) { - const char* log_file_name; - if (!PyArg_ParseTuple(args, "s", &log_file_name)) { - return NULL; - } - create_log_dir_or_die(log_file_name); - global_ray_config.log_to_file = true; - global_ray_config.logfile.open(log_file_name); - Py_RETURN_NONE; -} - static PyObject* kill_workers(PyObject* self, PyObject* args) { Worker* worker; if (!PyArg_ParseTuple(args, "O&", &PyObjectToWorker, &worker)) { @@ -1074,7 +1070,6 @@ static PyMethodDef RayLibMethods[] = { { "export_remote_function", export_remote_function, METH_VARARGS, "export a remote function to workers" }, { "export_reusable_variable", export_reusable_variable, METH_VARARGS, "export a reusable variable to the workers" }, { "dump_computation_graph", dump_computation_graph, METH_VARARGS, "dump the current computation graph to a file" }, - { "set_log_config", set_log_config, METH_VARARGS, "set filename for raylib logging" }, { "kill_workers", kill_workers, METH_VARARGS, "kills all of the workers" }, { NULL, NULL, 0, NULL } }; diff --git a/src/scheduler.cc b/src/scheduler.cc index d4bc238f1..b179141da 100644 --- a/src/scheduler.cc +++ b/src/scheduler.cc @@ -1138,11 +1138,11 @@ int main(int argc, char** argv) { const char* scheduling_algorithm_name = get_cmd_option(argv, argv + argc, "--scheduler-algorithm"); if (scheduling_algorithm_name) { if (std::string(scheduling_algorithm_name) == "naive") { - RAY_LOG(RAY_INFO, "scheduler: using 'naive' scheduler" << std::endl); + RAY_LOG(RAY_INFO, "scheduler: using 'naive' scheduler"); scheduling_algorithm = SCHEDULING_ALGORITHM_NAIVE; } if (std::string(scheduling_algorithm_name) == "locality_aware") { - RAY_LOG(RAY_INFO, "scheduler: using 'locality aware' scheduler" << std::endl); + RAY_LOG(RAY_INFO, "scheduler: using 'locality aware' scheduler"); scheduling_algorithm = SCHEDULING_ALGORITHM_LOCALITY_AWARE; } } diff --git a/src/worker.cc b/src/worker.cc index 8c5fdf61d..f0aa3cbbf 100644 --- a/src/worker.cc +++ b/src/worker.cc @@ -13,7 +13,7 @@ extern "C" { inline WorkerServiceImpl::WorkerServiceImpl(const std::string& send_queue_name, Mode mode) : mode_(mode) { - RAY_LOG(RAY_DEBUG, "Worker service connecting to queue " << send_queue_name); + RAY_LOG(RAY_INFO, "Worker service connecting to queue " << send_queue_name); RAY_CHECK(send_queue_.connect(send_queue_name, false), "error connecting send_queue_"); } @@ -101,7 +101,7 @@ Worker::Worker(const std::string& node_ip_address, const std::string& scheduler_ std::mt19937 rng(rd()); std::uniform_int_distribution queue_name_generator(0, 10000000); receive_queue_name_ = "worker_receive_queue:" + std::to_string(queue_name_generator(rng)); - RAY_LOG(RAY_DEBUG, "Worker creating queue " << receive_queue_name_ << std::endl); + RAY_LOG(RAY_INFO, "Worker creating queue " << receive_queue_name_); RAY_CHECK(receive_queue_.connect(receive_queue_name_, true), "error connecting receive_queue_"); } @@ -162,11 +162,11 @@ void Worker::register_worker(const std::string& node_ip_address, const std::stri segmentpool_ = std::make_shared(objstoreid_, objstore_address_, false); // Connect to the queue for sending requests to the object store. std::string request_obj_queue_name = std::string("queue:") + objstore_address_ + std::string(":obj"); - RAY_LOG(RAY_DEBUG, "Worker connecting to queue with name " << request_obj_queue_name << " to send requests to the object store."); + RAY_LOG(RAY_INFO, "Worker connecting to queue with name " << request_obj_queue_name << " to send requests to the object store."); RAY_CHECK(request_obj_queue_.connect(request_obj_queue_name, false), "error connecting request_obj_queue_"); // Create a queue for receiving messages from the object store. std::string receive_obj_queue_name = std::string("queue:") + objstore_address_ + std::string(":worker:") + std::to_string(workerid_) + std::string(":obj"); - RAY_LOG(RAY_DEBUG, "Worker creating queue with name " << receive_obj_queue_name << " to receive messages from the object store."); + RAY_LOG(RAY_INFO, "Worker creating queue with name " << receive_obj_queue_name << " to receive messages from the object store."); RAY_CHECK(receive_obj_queue_.connect(receive_obj_queue_name, true), "error connecting receive_obj_queue_"); connected_ = true; return; @@ -481,7 +481,7 @@ void Worker::start_worker_service(Mode mode) { // Wait for the worker service to start. This essentially implements a // condition variable using atomics, but that failed on Mac OS X on Travis. while (!worker_service_started.load()) { - RAY_LOG(RAY_DEBUG, "Looping while waiting for the worker service to start."); + RAY_LOG(RAY_INFO, "Looping while waiting for the worker service to start."); std::this_thread::sleep_for(std::chrono::milliseconds(100)); } }