mirror of
https://github.com/wassname/ray.git
synced 2026-07-02 03:50:57 +08:00
Write logs to files instead of printing them to stdout
This commit is contained in:
@@ -198,7 +198,6 @@ def subblocks(a, *ranges):
|
||||
shape = [(len(ranges[i]) - 1) * BLOCK_SIZE + last_block_shape[i] for i in range(a.ndim)]
|
||||
result = DistArray(shape)
|
||||
for index in np.ndindex(*result.num_blocks):
|
||||
print tuple([ranges[i][index[i]] for i in range(a.ndim)])
|
||||
result.objrefs[index] = a.objrefs[tuple([ranges[i][index[i]] for i in range(a.ndim)])]
|
||||
return result
|
||||
|
||||
|
||||
@@ -0,0 +1,2 @@
|
||||
LOG_DIRECTORY = "/tmp/raylogs/"
|
||||
LOG_TIMESTAMP = "{:%Y-%m-%d=%H:%M:%S}"
|
||||
@@ -2,9 +2,11 @@ import subprocess32 as subprocess
|
||||
import os
|
||||
import atexit
|
||||
import time
|
||||
import datetime
|
||||
|
||||
import ray
|
||||
import ray.worker as worker
|
||||
from ray.config import LOG_DIRECTORY, LOG_TIMESTAMP
|
||||
|
||||
_services_path = os.path.dirname(os.path.abspath(__file__))
|
||||
|
||||
@@ -66,11 +68,13 @@ def cleanup():
|
||||
# atexit.register(cleanup)
|
||||
|
||||
def start_scheduler(scheduler_address):
|
||||
p = subprocess.Popen([os.path.join(_services_path, "scheduler"), scheduler_address])
|
||||
scheduler_log_filename = os.path.join(LOG_DIRECTORY, (LOG_TIMESTAMP + "-scheduler.log").format(datetime.datetime.now()))
|
||||
p = subprocess.Popen([os.path.join(_services_path, "scheduler"), scheduler_address, "--log-file-name", scheduler_log_filename])
|
||||
all_processes.append((p, scheduler_address))
|
||||
|
||||
def start_objstore(scheduler_address, objstore_address):
|
||||
p = subprocess.Popen([os.path.join(_services_path, "objstore"), scheduler_address, objstore_address])
|
||||
objstore_log_filename = os.path.join(LOG_DIRECTORY, (LOG_TIMESTAMP + "-objstore-{}.log").format(datetime.datetime.now(), objstore_address))
|
||||
p = subprocess.Popen([os.path.join(_services_path, "objstore"), scheduler_address, objstore_address, "--log-file-name", objstore_log_filename])
|
||||
all_processes.append((p, objstore_address))
|
||||
|
||||
def start_worker(test_path, scheduler_address, objstore_address, worker_address):
|
||||
|
||||
@@ -1,3 +1,6 @@
|
||||
import datetime
|
||||
import logging
|
||||
import os
|
||||
from types import ModuleType
|
||||
import typing
|
||||
import funcsigs
|
||||
@@ -5,6 +8,7 @@ import numpy as np
|
||||
import pynumbuf
|
||||
|
||||
import ray
|
||||
from ray.config import LOG_DIRECTORY, LOG_TIMESTAMP
|
||||
import serialization
|
||||
|
||||
class Worker(object):
|
||||
@@ -61,11 +65,11 @@ def task_info(worker=global_worker):
|
||||
return ray.lib.task_info(worker.handle);
|
||||
|
||||
def register_module(module, recursive=False, worker=global_worker):
|
||||
print "registering functions in module {}.".format(module.__name__)
|
||||
logging.info("registering functions in module {}.".format(module.__name__))
|
||||
for name in dir(module):
|
||||
val = getattr(module, name)
|
||||
if hasattr(val, "is_remote") and val.is_remote:
|
||||
print "registering {}.".format(val.func_name)
|
||||
logging.info("registering {}.".format(val.func_name))
|
||||
worker.register_function(val)
|
||||
# elif recursive and isinstance(val, ModuleType):
|
||||
# register_module(val, recursive, worker)
|
||||
@@ -74,6 +78,10 @@ def connect(scheduler_addr, objstore_addr, worker_addr, worker=global_worker):
|
||||
if hasattr(worker, "handle"):
|
||||
del worker.handle
|
||||
worker.handle = ray.lib.create_worker(scheduler_addr, objstore_addr, worker_addr)
|
||||
FORMAT = "%(asctime)-15s %(message)s"
|
||||
log_basename = os.path.join(LOG_DIRECTORY, (LOG_TIMESTAMP + "-worker-{}").format(datetime.datetime.now(), worker_addr))
|
||||
logging.basicConfig(level=logging.DEBUG, format=FORMAT, filename=log_basename + ".log")
|
||||
ray.lib.set_log_config(log_basename + "-c++.log")
|
||||
|
||||
def disconnect(worker=global_worker):
|
||||
ray.lib.disconnect(worker.handle)
|
||||
@@ -109,10 +117,10 @@ def remote(arg_types, return_types, worker=global_worker):
|
||||
def remote_decorator(func):
|
||||
def func_executor(arguments):
|
||||
"""This is what gets executed remotely on a worker after a remote function is scheduled by the scheduler."""
|
||||
print "Calling function {}".format(func.__name__)
|
||||
logging.info("Calling function {}".format(func.__name__))
|
||||
result = func(*arguments)
|
||||
check_return_values(func_call, result) # throws an exception if result is invalid
|
||||
print "Finished executing function {}".format(func.__name__)
|
||||
logging.info("Finished executing function {}".format(func.__name__))
|
||||
return result
|
||||
def func_call(*args, **kwargs):
|
||||
"""This is what gets run immediately when a worker calls a remote function."""
|
||||
@@ -204,9 +212,9 @@ def get_arguments_for_execution(function, args, worker=global_worker):
|
||||
|
||||
if isinstance(arg, ray.lib.ObjRef):
|
||||
# get the object from the local object store
|
||||
print "Getting argument {} for function {}.".format(i, function.__name__)
|
||||
logging.info("Getting argument {} for function {}.".format(i, function.__name__))
|
||||
argument = worker.get_object(arg)
|
||||
print "Successfully retrieved argument {} for function {}.".format(i, function.__name__)
|
||||
logging.info("Successfully retrieved argument {} for function {}.".format(i, function.__name__))
|
||||
else:
|
||||
# pass the argument by value
|
||||
argument = arg
|
||||
@@ -224,7 +232,7 @@ def store_outputs_in_objstore(objrefs, outputs, worker=global_worker):
|
||||
for i in range(len(objrefs)):
|
||||
if isinstance(outputs[i], ray.lib.ObjRef):
|
||||
# An ObjRef is being returned, so we must alias objrefs[i] so that it refers to the same object that outputs[i] refers to
|
||||
print "Aliasing objrefs {} and {}".format(objrefs[i].val, outputs[i].val)
|
||||
logging.info("Aliasing objrefs {} and {}".format(objrefs[i].val, outputs[i].val))
|
||||
worker.alias_objrefs(objrefs[i], outputs[i])
|
||||
pass
|
||||
else:
|
||||
|
||||
Reference in New Issue
Block a user