diff --git a/CMakeLists.txt b/CMakeLists.txt index 32dc488ad..c21ba4420 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -106,10 +106,11 @@ add_library(pynumbuf STATIC ${CMAKE_SOURCE_DIR}/thirdparty/numbuf/cpp/src/numbuf ${CMAKE_SOURCE_DIR}/thirdparty/numbuf/python/src/pynumbuf/adapters/python.cc) target_link_libraries(pynumbuf ${ARROW_LIB} ${PYTHON_LIBRARIES}) -add_executable(objstore src/objstore.cc src/ipc.cc ${GENERATED_PROTOBUF_FILES}) -target_link_libraries(objstore ${ARROW_LIB} pynumbuf) -add_executable(scheduler src/scheduler.cc src/computation_graph.cc ${GENERATED_PROTOBUF_FILES}) -add_library(raylib SHARED src/raylib.cc src/worker.cc src/ipc.cc ${GENERATED_PROTOBUF_FILES}) -target_link_libraries(raylib ${ARROW_LIB} pynumbuf) +add_executable(objstore src/objstore.cc src/ipc.cc src/utils.cc ${GENERATED_PROTOBUF_FILES}) +target_link_libraries(objstore ${ARROW_LIB} pynumbuf boost_system boost_filesystem) +add_executable(scheduler src/scheduler.cc src/computation_graph.cc src/utils.cc ${GENERATED_PROTOBUF_FILES}) +target_link_libraries(scheduler boost_system boost_filesystem) +add_library(raylib SHARED src/raylib.cc src/worker.cc src/ipc.cc src/utils.cc ${GENERATED_PROTOBUF_FILES}) +target_link_libraries(raylib ${ARROW_LIB} pynumbuf boost_system boost_filesystem) install(TARGETS objstore scheduler raylib DESTINATION ${CMAKE_SOURCE_DIR}/lib/python/ray) diff --git a/include/ray/logging.h b/include/ray/logging.h index 744a92e4d..cb650cdd4 100644 --- a/include/ray/logging.h +++ b/include/ray/logging.h @@ -1,3 +1,14 @@ +#include +#include +#include + +struct RayConfig { + bool log_to_file = false; + std::ofstream logfile; +}; + +extern RayConfig global_ray_config; + #define RAY_VERBOSE -1 #define RAY_INFO 0 #define RAY_DEBUG 1 @@ -10,11 +21,18 @@ \ } else if (LEVEL == RAY_FATAL) { \ std::cerr << "fatal error occured: " << MESSAGE << std::endl; \ + if (global_ray_config.log_to_file) { \ + global_ray_config.logfile << "fatal error occured: " << MESSAGE << std::endl; \ + } \ std::exit(1); \ } else if (LEVEL == RAY_DEBUG) { \ \ } else { \ - std::cout << MESSAGE << std::endl; \ + if (global_ray_config.log_to_file) { \ + global_ray_config.logfile << MESSAGE << std::endl; \ + } else { \ + std::cout << MESSAGE << std::endl; \ + } \ } #define RAY_CHECK(condition, message) \ diff --git a/lib/python/ray/arrays/distributed/core.py b/lib/python/ray/arrays/distributed/core.py index be57ba235..bd8da610c 100644 --- a/lib/python/ray/arrays/distributed/core.py +++ b/lib/python/ray/arrays/distributed/core.py @@ -198,7 +198,6 @@ def subblocks(a, *ranges): shape = [(len(ranges[i]) - 1) * BLOCK_SIZE + last_block_shape[i] for i in range(a.ndim)] result = DistArray(shape) for index in np.ndindex(*result.num_blocks): - print tuple([ranges[i][index[i]] for i in range(a.ndim)]) result.objrefs[index] = a.objrefs[tuple([ranges[i][index[i]] for i in range(a.ndim)])] return result diff --git a/lib/python/ray/config.py b/lib/python/ray/config.py new file mode 100644 index 000000000..f6a54ec2c --- /dev/null +++ b/lib/python/ray/config.py @@ -0,0 +1,2 @@ +LOG_DIRECTORY = "/tmp/raylogs/" +LOG_TIMESTAMP = "{:%Y-%m-%d=%H:%M:%S}" diff --git a/lib/python/ray/services.py b/lib/python/ray/services.py index d7cc98b72..bd35bc14b 100644 --- a/lib/python/ray/services.py +++ b/lib/python/ray/services.py @@ -2,9 +2,11 @@ import subprocess32 as subprocess import os import atexit import time +import datetime import ray import ray.worker as worker +from ray.config import LOG_DIRECTORY, LOG_TIMESTAMP _services_path = os.path.dirname(os.path.abspath(__file__)) @@ -66,11 +68,13 @@ def cleanup(): # atexit.register(cleanup) def start_scheduler(scheduler_address): - p = subprocess.Popen([os.path.join(_services_path, "scheduler"), scheduler_address]) + scheduler_log_filename = os.path.join(LOG_DIRECTORY, (LOG_TIMESTAMP + "-scheduler.log").format(datetime.datetime.now())) + p = subprocess.Popen([os.path.join(_services_path, "scheduler"), scheduler_address, "--log-file-name", scheduler_log_filename]) all_processes.append((p, scheduler_address)) def start_objstore(scheduler_address, objstore_address): - p = subprocess.Popen([os.path.join(_services_path, "objstore"), scheduler_address, objstore_address]) + objstore_log_filename = os.path.join(LOG_DIRECTORY, (LOG_TIMESTAMP + "-objstore-{}.log").format(datetime.datetime.now(), objstore_address)) + p = subprocess.Popen([os.path.join(_services_path, "objstore"), scheduler_address, objstore_address, "--log-file-name", objstore_log_filename]) all_processes.append((p, objstore_address)) def start_worker(test_path, scheduler_address, objstore_address, worker_address): diff --git a/lib/python/ray/worker.py b/lib/python/ray/worker.py index dd8b8f063..ca094c1da 100644 --- a/lib/python/ray/worker.py +++ b/lib/python/ray/worker.py @@ -1,3 +1,6 @@ +import datetime +import logging +import os from types import ModuleType import typing import funcsigs @@ -5,6 +8,7 @@ import numpy as np import pynumbuf import ray +from ray.config import LOG_DIRECTORY, LOG_TIMESTAMP import serialization class Worker(object): @@ -61,11 +65,11 @@ def task_info(worker=global_worker): return ray.lib.task_info(worker.handle); def register_module(module, recursive=False, worker=global_worker): - print "registering functions in module {}.".format(module.__name__) + logging.info("registering functions in module {}.".format(module.__name__)) for name in dir(module): val = getattr(module, name) if hasattr(val, "is_remote") and val.is_remote: - print "registering {}.".format(val.func_name) + logging.info("registering {}.".format(val.func_name)) worker.register_function(val) # elif recursive and isinstance(val, ModuleType): # register_module(val, recursive, worker) @@ -74,6 +78,10 @@ def connect(scheduler_addr, objstore_addr, worker_addr, worker=global_worker): if hasattr(worker, "handle"): del worker.handle worker.handle = ray.lib.create_worker(scheduler_addr, objstore_addr, worker_addr) + FORMAT = "%(asctime)-15s %(message)s" + log_basename = os.path.join(LOG_DIRECTORY, (LOG_TIMESTAMP + "-worker-{}").format(datetime.datetime.now(), worker_addr)) + logging.basicConfig(level=logging.DEBUG, format=FORMAT, filename=log_basename + ".log") + ray.lib.set_log_config(log_basename + "-c++.log") def disconnect(worker=global_worker): ray.lib.disconnect(worker.handle) @@ -109,10 +117,10 @@ def remote(arg_types, return_types, worker=global_worker): def remote_decorator(func): def func_executor(arguments): """This is what gets executed remotely on a worker after a remote function is scheduled by the scheduler.""" - print "Calling function {}".format(func.__name__) + logging.info("Calling function {}".format(func.__name__)) result = func(*arguments) check_return_values(func_call, result) # throws an exception if result is invalid - print "Finished executing function {}".format(func.__name__) + logging.info("Finished executing function {}".format(func.__name__)) return result def func_call(*args, **kwargs): """This is what gets run immediately when a worker calls a remote function.""" @@ -204,9 +212,9 @@ def get_arguments_for_execution(function, args, worker=global_worker): if isinstance(arg, ray.lib.ObjRef): # get the object from the local object store - print "Getting argument {} for function {}.".format(i, function.__name__) + logging.info("Getting argument {} for function {}.".format(i, function.__name__)) argument = worker.get_object(arg) - print "Successfully retrieved argument {} for function {}.".format(i, function.__name__) + logging.info("Successfully retrieved argument {} for function {}.".format(i, function.__name__)) else: # pass the argument by value argument = arg @@ -224,7 +232,7 @@ def store_outputs_in_objstore(objrefs, outputs, worker=global_worker): for i in range(len(objrefs)): if isinstance(outputs[i], ray.lib.ObjRef): # An ObjRef is being returned, so we must alias objrefs[i] so that it refers to the same object that outputs[i] refers to - print "Aliasing objrefs {} and {}".format(objrefs[i].val, outputs[i].val) + logging.info("Aliasing objrefs {} and {}".format(objrefs[i].val, outputs[i].val)) worker.alias_objrefs(objrefs[i], outputs[i]) pass else: diff --git a/src/objstore.cc b/src/objstore.cc index 286ef229c..0cb4072e6 100644 --- a/src/objstore.cc +++ b/src/objstore.cc @@ -335,8 +335,23 @@ void start_objstore(const char* scheduler_addr, const char* objstore_addr) { server->Wait(); } +RayConfig global_ray_config; + int main(int argc, char** argv) { - RAY_CHECK_EQ(argc, 3, "object store: expected two arguments (scheduler ip address and object store ip address)"); + RAY_CHECK_GE(argc, 3, "object store: expected at least two arguments (scheduler ip address and object store ip address)"); + + if (argc > 3) { + const char* log_file_name = get_cmd_option(argv, argv + argc, "--log-file-name"); + if (log_file_name) { + std::cout << "object store: writing to log file " << log_file_name << std::endl; + create_log_dir_or_die(log_file_name); + global_ray_config.log_to_file = true; + global_ray_config.logfile.open(log_file_name); + } else { + std::cout << "object store: writing logs to stdout; you can change this by passing --log-file-name to ./scheduler" << std::endl; + global_ray_config.log_to_file = false; + } + } start_objstore(argv[1], argv[2]); diff --git a/src/raylib.cc b/src/raylib.cc index 684e9e1d2..c727e67d4 100644 --- a/src/raylib.cc +++ b/src/raylib.cc @@ -11,6 +11,9 @@ #include "types.pb.h" #include "worker.h" +#include "utils.h" + +RayConfig global_ray_config; extern "C" { @@ -812,6 +815,17 @@ PyObject* task_info(PyObject* self, PyObject* args) { return dict; } +PyObject* set_log_config(PyObject* self, PyObject* args) { + const char* log_file_name; + if (!PyArg_ParseTuple(args, "s", &log_file_name)) { + return NULL; + } + create_log_dir_or_die(log_file_name); + global_ray_config.log_to_file = true; + global_ray_config.logfile.open(log_file_name); + Py_RETURN_NONE; +} + static PyMethodDef RayLibMethods[] = { { "serialize_object", serialize_object, METH_VARARGS, "serialize an object to protocol buffers" }, { "deserialize_object", deserialize_object, METH_VARARGS, "deserialize an object from protocol buffers" }, @@ -835,6 +849,7 @@ static PyMethodDef RayLibMethods[] = { { "start_worker_service", start_worker_service, METH_VARARGS, "start the worker service" }, { "scheduler_info", scheduler_info, METH_VARARGS, "get info about scheduler state" }, { "task_info", task_info, METH_VARARGS, "get task statuses" }, + { "set_log_config", set_log_config, METH_VARARGS, "set filename for raylib logging" }, { NULL, NULL, 0, NULL } }; diff --git a/src/scheduler.cc b/src/scheduler.cc index cfaad0f67..5f58dc16f 100644 --- a/src/scheduler.cc +++ b/src/scheduler.cc @@ -794,26 +794,30 @@ void start_scheduler_service(const char* service_addr, SchedulingAlgorithmType s server->Wait(); } -char* get_cmd_option(char** begin, char** end, const std::string& option) { - char** it = std::find(begin, end, option); - if (it != end && ++it != end) { - return *it; - } - return 0; -} +RayConfig global_ray_config; int main(int argc, char** argv) { SchedulingAlgorithmType scheduling_algorithm = SCHEDULING_ALGORITHM_LOCALITY_AWARE; RAY_CHECK_GE(argc, 2, "scheduler: expected at least one argument (scheduler ip address)"); if (argc > 2) { - char* scheduling_algorithm_name = get_cmd_option(argv, argv + argc, "--scheduler-algorithm"); + const char* log_file_name = get_cmd_option(argv, argv + argc, "--log-file-name"); + if (log_file_name) { + std::cout << "scheduler: writing to log file " << log_file_name << std::endl; + create_log_dir_or_die(log_file_name); + global_ray_config.log_to_file = true; + global_ray_config.logfile.open(log_file_name); + } else { + std::cout << "scheduler: writing logs to stdout; you can change this by passing --log-file-name to ./scheduler" << std::endl; + global_ray_config.log_to_file = false; + } + const char* scheduling_algorithm_name = get_cmd_option(argv, argv + argc, "--scheduler-algorithm"); if (scheduling_algorithm_name) { - if(std::string(scheduling_algorithm_name) == "naive") { - std::cout << "using 'naive' scheduler" << std::endl; + if (std::string(scheduling_algorithm_name) == "naive") { + RAY_LOG(RAY_INFO, "scheduler: using 'naive' scheduler" << std::endl); scheduling_algorithm = SCHEDULING_ALGORITHM_NAIVE; } - if(std::string(scheduling_algorithm_name) == "locality_aware") { - std::cout << "using 'locality aware' scheduler" << std::endl; + if (std::string(scheduling_algorithm_name) == "locality_aware") { + RAY_LOG(RAY_INFO, "scheduler: using 'locality aware' scheduler" << std::endl); scheduling_algorithm = SCHEDULING_ALGORITHM_LOCALITY_AWARE; } } diff --git a/src/utils.cc b/src/utils.cc new file mode 100644 index 000000000..ba5e85989 --- /dev/null +++ b/src/utils.cc @@ -0,0 +1,39 @@ +#include "utils.h" + +#include + +#include "ray/ray.h" + +std::string::iterator split_ip_address(std::string& ip_address) { + if (ip_address[0] == '[') { // IPv6 + auto split_end = std::find(ip_address.begin() + 1, ip_address.end(), ']'); + if(split_end != ip_address.end()) { + split_end++; + } + if(split_end != ip_address.end() && *split_end == ':') { + return split_end; + } + RAY_CHECK(false, "ip address should contain a port number"); + } else { // IPv4 + auto split_point = std::find(ip_address.rbegin(), ip_address.rend(), ':').base(); + RAY_CHECK_NEQ(split_point, ip_address.begin(), "ip address should contain a port number"); + return split_point; + } +} + +const char* get_cmd_option(char** begin, char** end, const std::string& option) { + char** it = std::find(begin, end, option); + if (it != end && ++it != end) { + return *it; + } + return 0; +} + +void create_log_dir_or_die(const char* log_file_name) { + boost::filesystem::path log_file_path(log_file_name); + boost::system::error_code returned_error; + boost::filesystem::create_directories(log_file_path.parent_path(), returned_error); + if (returned_error) { + RAY_CHECK(false, "Failed to create directory for " << log_file_name); + } +} diff --git a/src/utils.h b/src/utils.h index f7d024d71..3e77e0a60 100644 --- a/src/utils.h +++ b/src/utils.h @@ -1,21 +1,12 @@ #ifndef RAY_UTILS_H #define RAY_UTILS_H -inline std::string::iterator split_ip_address(std::string& ip_address) { - if (ip_address[0] == '[') { // IPv6 - auto split_end = std::find(ip_address.begin() + 1, ip_address.end(), ']'); - if(split_end != ip_address.end()) { - split_end++; - } - if(split_end != ip_address.end() && *split_end == ':') { - return split_end; - } - RAY_CHECK(false, "ip address should contain a port number"); - } else { // IPv4 - auto split_point = std::find(ip_address.rbegin(), ip_address.rend(), ':').base(); - RAY_CHECK_NEQ(split_point, ip_address.begin(), "ip address should contain a port number"); - return split_point; - } -} +#include + +std::string::iterator split_ip_address(std::string& ip_address); + +const char* get_cmd_option(char** begin, char** end, const std::string& option); + +void create_log_dir_or_die(const char* log_file_name); #endif