From 362ffa1f3ce7be66d2977a9e8bc70ee95171247c Mon Sep 17 00:00:00 2001 From: Wapaul1 Date: Wed, 10 Aug 2016 19:08:38 -0700 Subject: [PATCH] Changing hard coded ports for objstore and workers to choose unused ports (#365) * let grpc choose unused worker and object store ports * Add objstore addresses to scheduler info to bring back test --- lib/python/ray/services.py | 42 ++++++---------- lib/python/ray/worker.py | 25 ++++------ protos/ray.proto | 9 ++-- protos/types.proto | 5 ++ src/ipc.cc | 12 +++-- src/ipc.h | 6 ++- src/objstore.cc | 79 +++++++++++++++++++----------- src/objstore.h | 8 +++- src/raylib.cc | 32 ++++++++++--- src/scheduler.cc | 28 +++++++---- src/worker.cc | 98 +++++++++++++++++++++++++------------- src/worker.h | 29 +++++++---- test/runtest.py | 47 ++++++++++-------- 13 files changed, 259 insertions(+), 161 deletions(-) diff --git a/lib/python/ray/services.py b/lib/python/ray/services.py index 9ab3a9605..841c398d9 100644 --- a/lib/python/ray/services.py +++ b/lib/python/ray/services.py @@ -2,6 +2,7 @@ import os import sys import time import subprocess32 as subprocess +import numpy as np # Ray modules import config @@ -19,17 +20,8 @@ TIMEOUT_SECONDS = 5 def address(host, port): return host + ":" + str(port) -scheduler_port_counter = 0 def new_scheduler_port(): - global scheduler_port_counter - scheduler_port_counter += 1 - return 10000 + scheduler_port_counter - -objstore_port_counter = 0 -def new_objstore_port(): - global objstore_port_counter - objstore_port_counter += 1 - return 20000 + objstore_port_counter + return np.random.randint(10000, 65536) def cleanup(): """When running in local mode, shutdown the Ray processes. @@ -68,26 +60,28 @@ def start_scheduler(scheduler_address, cleanup): this process will be killed by serices.cleanup() when the Python process that imported services exits. """ - p = subprocess.Popen(["scheduler", scheduler_address, "--log-file-name", config.get_log_file_path("scheduler.log")], env=_services_env) + scheduler_port = scheduler_address.split(":")[1] + p = subprocess.Popen(["scheduler", scheduler_address, "--log-file-name", config.get_log_file_path("scheduler-" + scheduler_port + ".log")], env=_services_env) if cleanup: all_processes.append(p) -def start_objstore(scheduler_address, objstore_address, cleanup): +def start_objstore(scheduler_address, node_ip_address, cleanup): """This method starts an object store process. Args: scheduler_address (str): The ip address and port of the scheduler to connect to. - objstore_address (str): The ip address and port to use for the object store. + node_ip_address (str): The ip address of the node running the object store. + The object store's port number will be chosen by the object store process. cleanup (bool): True if using Ray in local mode. If cleanup is true, then this process will be killed by serices.cleanup() when the Python process that imported services exits. """ - p = subprocess.Popen(["objstore", scheduler_address, objstore_address, "--log-file-name", config.get_log_file_path("-".join(["objstore", objstore_address]) + ".log")], env=_services_env) + p = subprocess.Popen(["objstore", scheduler_address, node_ip_address, "--log-file-prefix", config.get_log_file_path("")], env=_services_env) if cleanup: all_processes.append(p) -def start_worker(node_ip_address, worker_path, scheduler_address, objstore_address=None, cleanup=True, user_source_directory=None): +def start_worker(node_ip_address, worker_path, scheduler_address, cleanup=True, user_source_directory=None): """This method starts a worker process. Args: @@ -96,8 +90,6 @@ def start_worker(node_ip_address, worker_path, scheduler_address, objstore_addre run. scheduler_address (str): The ip address and port of the scheduler to connect to. - objstore_address (Optional[str]): The ip address and port of the object - store to connect to. cleanup (Optional[bool]): True if using Ray in local mode. If cleanup is true, then this process will be killed by serices.cleanup() when the Python process that imported services exits. This is True by default. @@ -114,8 +106,6 @@ def start_worker(node_ip_address, worker_path, scheduler_address, objstore_addre "--node-ip-address=" + node_ip_address, "--user-source-directory=" + user_source_directory, "--scheduler-address=" + scheduler_address] - if objstore_address is not None: - command.append("--objstore-address=" + objstore_address) p = subprocess.Popen(command) if cleanup: all_processes.append(p) @@ -139,13 +129,12 @@ def start_node(scheduler_address, node_ip_address, num_workers, worker_path=None cleanup (bool): If cleanup is True, then the processes started by this command will be killed when the process that imported services exits. """ - objstore_address = address(node_ip_address, new_objstore_port()) - start_objstore(scheduler_address, objstore_address, cleanup=cleanup) + start_objstore(scheduler_address, node_ip_address, cleanup=cleanup) time.sleep(0.2) if worker_path is None: worker_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), "../../../scripts/default_worker.py") for _ in range(num_workers): - start_worker(node_ip_address, worker_path, scheduler_address, objstore_address=objstore_address, user_source_directory=user_source_directory, cleanup=cleanup) + start_worker(node_ip_address, worker_path, scheduler_address, user_source_directory=user_source_directory, cleanup=cleanup) time.sleep(0.5) def start_workers(scheduler_address, objstore_address, num_workers, worker_path): @@ -191,12 +180,9 @@ def start_ray_local(node_ip_address="127.0.0.1", num_objstores=1, num_workers=0, scheduler_address = address(node_ip_address, new_scheduler_port()) start_scheduler(scheduler_address, cleanup=True) time.sleep(0.1) - objstore_addresses = [] # create objstores for i in range(num_objstores): - objstore_address = address(node_ip_address, new_objstore_port()) - objstore_addresses.append(objstore_address) - start_objstore(scheduler_address, objstore_address, cleanup=True) + start_objstore(scheduler_address, node_ip_address, cleanup=True) time.sleep(0.2) if i < num_objstores - 1: num_workers_to_start = num_workers / num_objstores @@ -205,7 +191,7 @@ def start_ray_local(node_ip_address="127.0.0.1", num_objstores=1, num_workers=0, # remaining number of workers. num_workers_to_start = num_workers - (num_objstores - 1) * (num_workers / num_objstores) for _ in range(num_workers_to_start): - start_worker(node_ip_address, worker_path, scheduler_address, objstore_address=objstore_address, cleanup=True) + start_worker(node_ip_address, worker_path, scheduler_address, cleanup=True) time.sleep(0.3) - return scheduler_address, objstore_addresses + return scheduler_address diff --git a/lib/python/ray/worker.py b/lib/python/ray/worker.py index b3523d16d..3dbb00ac8 100644 --- a/lib/python/ray/worker.py +++ b/lib/python/ray/worker.py @@ -651,7 +651,7 @@ def init(start_ray_local=False, num_workers=None, num_objstores=None, scheduler_ num_objstores = 1 if num_objstores is None else num_objstores # Start the scheduler, object store, and some workers. These will be killed # by the call to cleanup(), which happens when the Python script exits. - scheduler_address, _ = services.start_ray_local(num_objstores=num_objstores, num_workers=num_workers, worker_path=None) + scheduler_address = services.start_ray_local(num_objstores=num_objstores, num_workers=num_workers, worker_path=None) else: # In this case, there is an existing scheduler and object store, and we do # not need to start any processes. @@ -662,7 +662,7 @@ def init(start_ray_local=False, num_workers=None, num_objstores=None, scheduler_ # Connect this driver to the scheduler and object store. The corresponing call # to disconnect will happen in the call to cleanup() when the Python script # exits. - connect(node_ip_address, scheduler_address, is_driver=True, worker=global_worker, mode=driver_mode) + connect(node_ip_address, scheduler_address, worker=global_worker, mode=driver_mode) def cleanup(worker=global_worker): """Disconnect the driver, and terminate any processes started in init. @@ -678,7 +678,7 @@ def cleanup(worker=global_worker): atexit.register(cleanup) -def connect(node_ip_address, scheduler_address, objstore_address=None, is_driver=False, worker=global_worker, mode=raylib.WORKER_MODE): +def connect(node_ip_address, scheduler_address, objstore_address=None, worker=global_worker, mode=raylib.WORKER_MODE): """Connect this worker to the scheduler and an object store. Args: @@ -687,7 +687,6 @@ def connect(node_ip_address, scheduler_address, objstore_address=None, is_driver objstore_address (Optional[str]): The ip address and port of the local object store. Normally, this argument should be omitted and the scheduler will tell the worker what object store to connect to. - is_driver (bool): True if this worker is a driver and false otherwise. mode: The mode of the worker. One of SCRIPT_MODE, WORKER_MODE, PYTHON_MODE, and SILENT_MODE. """ @@ -699,7 +698,10 @@ def connect(node_ip_address, scheduler_address, objstore_address=None, is_driver return worker.scheduler_address = scheduler_address - worker.handle, worker.worker_address = raylib.create_worker(node_ip_address, scheduler_address, objstore_address if objstore_address is not None else "", is_driver) + # Create a worker object. This also creates the worker service, which can + # receive commands from the scheduler. This call also sets up a queue between + # the worker and the worker service. + worker.handle, worker.worker_address = raylib.create_worker(node_ip_address, scheduler_address, objstore_address if objstore_address is not None else "", mode) worker.set_mode(mode) FORMAT = "%(asctime)-15s %(message)s" # Configure the Python logging module. Note that if we do not provide our own @@ -720,12 +722,6 @@ def connect(node_ip_address, scheduler_address, objstore_address=None, is_driver _export_reusable_variable(name, reusable_variable) worker.cached_remote_functions = None reusables._cached_reusables = None - # Start the driver's WorkerService (if this is a driver). This will receive - # GRPC commands from the scheduler to print error messages. We pass in the - # mode below. This tells the WorkerService whether it is operating for a - # driver or a worker and whether it should surpress errors or not. - if is_driver: - raylib.start_worker_service(worker.handle, mode) def disconnect(worker=global_worker): """Disconnect this worker from the scheduler and object store.""" @@ -844,9 +840,8 @@ def main_loop(worker=global_worker): """ if not raylib.connected(worker.handle): raise Exception("Worker is attempting to enter main_loop but has not been connected yet.") - # We pass in raylib.WORKER_MODE below to indicate that the WorkerService is - # operating for a worker and not a driver. - raylib.start_worker_service(worker.handle, raylib.WORKER_MODE) + # Notify the scheduler that the worker is ready to start receiving tasks. + raylib.ready_for_new_task(worker.handle) def process_task(task): # wrapping these lines in a function should cause the local variables to go out of scope more quickly, which is useful for inspecting reference counts """Execute a task assigned to this worker. @@ -880,7 +875,7 @@ def main_loop(worker=global_worker): store_outputs_in_objstore(return_objectids, outputs, worker) # store output in local object store # Notify the scheduler that the task is done. This happens regardless of # whether the task succeeded or failed. - raylib.notify_task_completed(worker.handle) + raylib.ready_for_new_task(worker.handle) try: # Reinitialize the values of reusable variables that were used in the task # above so that changes made to their state do not affect other tasks. diff --git a/protos/ray.proto b/protos/ray.proto index 98b681aad..e1d29b775 100644 --- a/protos/ray.proto +++ b/protos/ray.proto @@ -65,15 +65,15 @@ message AckReply { message RegisterWorkerRequest { string node_ip_address = 1; // The IP address of the node the worker is running on. - string objstore_address = 2; // The address of the object store the worker should connect to. If omitted, this will be assigned by the scheduler. - bool is_driver = 3; // True if the worker is a driver, and false otherwise. + string worker_address = 2; // The address of the worker. + string objstore_address = 3; // The address of the object store the worker should connect to. If omitted, this will be assigned by the scheduler. + bool is_driver = 4; // True if the worker is a driver, and false otherwise. } message RegisterWorkerReply { uint64 workerid = 1; // Worker ID assigned by the scheduler uint64 objstoreid = 2; // The Object store ID of the worker's local object store - string worker_address = 3; // IP address of the worker being registered - string objstore_address = 4; // IP address of the object store the worker should connect to + string objstore_address = 3; // IP address of the object store the worker should connect to } message RegisterObjStoreRequest { @@ -165,6 +165,7 @@ message SchedulerInfoReply { repeated uint64 target_objectid = 4; // The target_objectids_ data structure repeated uint64 reference_count = 5; // The reference_counts_ data structure CompGraph computation_graph = 6; // The computation graph constructed so far + repeated ObjstoreData objstore = 7; // Information about the object stores } // Object stores diff --git a/protos/types.proto b/protos/types.proto index 538066668..054925401 100644 --- a/protos/types.proto +++ b/protos/types.proto @@ -66,6 +66,11 @@ message Failure { string error_message = 5; // The error message from the failure. } +message ObjstoreData { + uint64 objstoreid = 1; // The ID of the object store. + string address = 2; // The address of the object store. +} + // Union of possible object types message Obj { String string_data = 1; diff --git a/src/ipc.cc b/src/ipc.cc index dc5bcbe1d..b4f7ca17a 100644 --- a/src/ipc.cc +++ b/src/ipc.cc @@ -6,6 +6,7 @@ #include #include "ray/ray.h" +#include "utils.h" ObjHandle::ObjHandle(SegmentId segmentid, size_t size, IpcPointer ipcpointer, size_t metadata_offset) : segmentid_(segmentid), size_(size), ipcpointer_(ipcpointer), metadata_offset_(metadata_offset) @@ -82,13 +83,16 @@ bool MessageQueue<>::receive(void * object, size_t size) { return true; } -MemorySegmentPool::MemorySegmentPool(ObjStoreId objstoreid, bool create) : objstoreid_(objstoreid), create_mode_(create) { } +MemorySegmentPool::MemorySegmentPool(ObjStoreId objstoreid, std::string& objstore_address, bool create) : objstoreid_(objstoreid), objstore_address_(objstore_address), create_mode_(create) { + std::string::iterator split_point = split_ip_address(objstore_address); + objstore_port_.assign(split_point, objstore_address.end()); +} // creates a memory segment if it is not already there; if the pool is in create mode, // space is allocated, if it is in open mode, the shared memory is mapped into the process void MemorySegmentPool::open_segment(SegmentId segmentid, size_t size) { - RAY_LOG(RAY_DEBUG, "Opening segmentid " << segmentid << " on object store " << objstoreid_ << " with create_mode_ = " << create_mode_); - RAY_CHECK(segmentid == segments_.size() || !create_mode_, "Object store " << objstoreid_ << " is attempting to open segmentid " << segmentid << " on the object store, but segments_.size() = " << segments_.size()); + RAY_LOG(RAY_DEBUG, "Opening segmentid " << segmentid << " on object store " << objstoreid_ << " with port " << objstore_port_ << " with create_mode_ = " << create_mode_); + RAY_CHECK(segmentid == segments_.size() || !create_mode_, "Object store " << objstoreid_ << " with port " << objstore_port_ << " is attempting to open segmentid " << segmentid << " on the object store, but segments_.size() = " << segments_.size()); if (segmentid >= segments_.size()) { // resize and initialize segments_ int current_size = segments_.size(); segments_.resize(segmentid + 1); @@ -156,7 +160,7 @@ uint8_t* MemorySegmentPool::get_address(ObjHandle pointer) { // returns the name of the segment std::string MemorySegmentPool::get_segment_name(SegmentId segmentid) { - return std::string("ray-{BC200A09-2465-431D-AEC7-2F8530B04535}-objstore-") + std::to_string(objstoreid_) + std::string("-segment-") + std::to_string(segmentid); + return std::string("ray-{BC200A09-2465-431D-AEC7-2F8530B04535}-objstore-") + std::to_string(objstoreid_) + "-" + objstore_port_ + std::string("-segment-") + std::to_string(segmentid); } MemorySegmentPool::~MemorySegmentPool() { diff --git a/src/ipc.h b/src/ipc.h index fce649ae5..6ce42f4f9 100644 --- a/src/ipc.h +++ b/src/ipc.h @@ -117,7 +117,7 @@ enum SegmentStatusType {UNOPENED = 0, OPENED = 1, CLOSED = 2}; class MemorySegmentPool { public: - MemorySegmentPool(ObjStoreId objstoreid, bool create); // can be used in two modes: create mode and open mode (see above) + MemorySegmentPool(ObjStoreId objstoreid, std::string& objstore_address, bool create); // can be used in two modes: create mode and open mode (see above) ~MemorySegmentPool(); ObjHandle allocate(size_t nbytes); // allocate memory, potentially creating a new segment (only run on object store) void deallocate(ObjHandle pointer); // deallocate object, potentially deallocating a new segment (only run on object store) @@ -131,6 +131,10 @@ private: void close_segment(SegmentId segmentid); // close a segment bool create_mode_; // true in the object stores, false on the workers ObjStoreId objstoreid_; // the identity of the associated object store + // The address of the object store. + std::string objstore_address_; + // The port of the object store. This is used to help avoid name collisions. + std::string objstore_port_; size_t page_size_ = bip::mapped_region::get_page_size(); std::vector, SegmentStatusType> > segments_; }; diff --git a/src/objstore.cc b/src/objstore.cc index c5c120776..d6a030d14 100644 --- a/src/objstore.cc +++ b/src/objstore.cc @@ -39,16 +39,28 @@ void ObjStoreService::get_data_from(ObjectID objectid, ObjStore::Stub& stub) { RAY_LOG(RAY_DEBUG, "finished streaming data, objectid was " << objectid << " and size was " << num_bytes); } -ObjStoreService::ObjStoreService(const std::string& objstore_address, std::shared_ptr scheduler_channel) - : scheduler_stub_(Scheduler::NewStub(scheduler_channel)), objstore_address_(objstore_address) { - RAY_CHECK(recv_queue_.connect(std::string("queue:") + objstore_address + std::string(":obj"), true), "error connecting recv_queue_"); +ObjStoreService::ObjStoreService(const std::string& scheduler_address) + : scheduler_address_(scheduler_address) { +} + +void ObjStoreService::register_objstore() { + RAY_CHECK(!objstore_address_.empty(), "The object store address must be set before register_objstore is called."); + // Create the scheduler stub. + auto scheduler_channel = grpc::CreateChannel(scheduler_address_, grpc::InsecureChannelCredentials()); + scheduler_stub_ = Scheduler::NewStub(scheduler_channel); + + // Create message queue to receive requests from workers. + std::string recv_queue_name = std::string("queue:") + objstore_address_ + std::string(":obj"); + RAY_LOG(RAY_INFO, "Object store creating queue with name " << recv_queue_name << " to receive requests from workers."); + RAY_CHECK(recv_queue_.connect(recv_queue_name, true), "error connecting recv_queue_"); + // Register the objecet store with the scheduler. ClientContext context; RegisterObjStoreRequest request; - request.set_objstore_address(objstore_address); + request.set_objstore_address(objstore_address_); RegisterObjStoreReply reply; scheduler_stub_->RegisterObjStore(&context, request, &reply); objstoreid_ = reply.objstoreid(); - segmentpool_ = std::make_shared(objstoreid_, true); + segmentpool_ = std::make_shared(objstoreid_, objstore_address_, true); } // this method needs to be protected by a objstores_lock_ @@ -319,20 +331,41 @@ void ObjStoreService::start_objstore_service() { }); } -void start_objstore(const char* scheduler_addr, const char* objstore_addr) { - auto scheduler_channel = grpc::CreateChannel(scheduler_addr, grpc::InsecureChannelCredentials()); - RAY_LOG(RAY_INFO, "object store " << objstore_addr << " connected to scheduler " << scheduler_addr); - std::string objstore_address(objstore_addr); - ObjStoreService service(objstore_address, scheduler_channel); - service.start_objstore_service(); - std::string::iterator split_point = split_ip_address(objstore_address); - std::string port; - port.assign(split_point, objstore_address.end()); +void set_logfile(const char* log_file_prefix, const std::string& node_ip_address, int port) { + if (log_file_prefix) { + std::string log_file_name = std::string(log_file_prefix) + "objstore-" + node_ip_address + "-" + std::to_string(port) + ".log"; + create_log_dir_or_die(log_file_name.c_str()); + global_ray_config.log_to_file = true; + global_ray_config.logfile.open(log_file_name); + } else { + std::cout << "object store: writing logs to stdout; you can change this by passing --log-file-prefix to ./objstore" << std::endl; + global_ray_config.log_to_file = false; + } +} + +void start_objstore(const std::string& scheduler_address, const std::string& node_ip_address, const char* log_file_prefix) { + // Initialize the object store. + ObjStoreService service(scheduler_address); + int port; ServerBuilder builder; - builder.AddListeningPort(std::string("0.0.0.0:") + port, grpc::InsecureServerCredentials()); + // Get GRPC to assign an unused port. + builder.AddListeningPort(std::string("0.0.0.0:0"), grpc::InsecureServerCredentials(), &port); builder.RegisterService(&service); std::unique_ptr server(builder.BuildAndStart()); - + if (server == nullptr) { + RAY_CHECK(false, "Failed to create the object store server.") + } + // Set the object store address. + service.set_objstore_address(node_ip_address + ":" + std::to_string(port)); + // Set the logfile. + set_logfile(log_file_prefix, node_ip_address, port); + // Register the object store with the scheduler. + service.register_objstore(); + // Launch a thread to process incoming messages in the message queue from + // the workers. + service.start_objstore_service(); + // Process incoming GRPC calls. These may come from the schedeler or from + // other object stores. This method does not return. server->Wait(); } @@ -341,20 +374,12 @@ RayConfig global_ray_config; int main(int argc, char** argv) { RAY_CHECK_GE(argc, 3, "object store: expected at least two arguments (scheduler ip address and object store ip address)"); + const char* log_file_prefix = nullptr; if (argc > 3) { - const char* log_file_name = get_cmd_option(argv, argv + argc, "--log-file-name"); - if (log_file_name) { - std::cout << "object store: writing to log file " << log_file_name << std::endl; - create_log_dir_or_die(log_file_name); - global_ray_config.log_to_file = true; - global_ray_config.logfile.open(log_file_name); - } else { - std::cout << "object store: writing logs to stdout; you can change this by passing --log-file-name to ./scheduler" << std::endl; - global_ray_config.log_to_file = false; - } + log_file_prefix = get_cmd_option(argv, argv + argc, "--log-file-prefix"); } - start_objstore(argv[1], argv[2]); + start_objstore(argv[1], argv[2], log_file_prefix); return 0; } diff --git a/src/objstore.h b/src/objstore.h index 81a3531e2..e35566c9c 100644 --- a/src/objstore.h +++ b/src/objstore.h @@ -37,7 +37,12 @@ enum MemoryStatusType {READY = 0, NOT_READY = 1, DEALLOCATED = 2, NOT_PRESENT = class ObjStoreService final : public ObjStore::Service { public: - ObjStoreService(const std::string& objstore_address, std::shared_ptr scheduler_channel); + ObjStoreService(const std::string& scheduler_address); + // Create the scheduler stub, register the object store with the scheduler, + // and create a message queue for workers to connect to. + void register_objstore(); + // Set the object store address. + void set_objstore_address(const std::string& objstore_address) { objstore_address_ = objstore_address; } Status StartDelivery(ServerContext* context, const StartDeliveryRequest* request, AckReply* reply) override; Status StreamObjTo(ServerContext* context, const StreamObjToRequest* request, ServerWriter* writer) override; @@ -57,6 +62,7 @@ private: void object_ready(ObjectID objectid, size_t metadata_offset); static const size_t CHUNK_SIZE; + std::string scheduler_address_; std::string objstore_address_; ObjStoreId objstoreid_; // id of this objectstore in the scheduler object store table std::shared_ptr segmentpool_; diff --git a/src/raylib.cc b/src/raylib.cc index 9ce43b2e3..a16786741 100644 --- a/src/raylib.cc +++ b/src/raylib.cc @@ -665,12 +665,12 @@ static PyObject* create_worker(PyObject* self, PyObject* args) { // The object store address can be the empty string, in which case the // scheduler will choose the object store address. const char* objstore_address; - PyObject* is_driver_obj; - if (!PyArg_ParseTuple(args, "sssO", &node_ip_address, &scheduler_address, &objstore_address, &is_driver_obj)) { + Mode mode; + if (!PyArg_ParseTuple(args, "sssi", &node_ip_address, &scheduler_address, &objstore_address, &mode)) { return NULL; } - bool is_driver = PyObject_IsTrue(is_driver_obj); - Worker* worker = new Worker(std::string(scheduler_address)); + bool is_driver = (mode != Mode::WORKER_MODE); + Worker* worker = new Worker(std::string(node_ip_address), std::string(scheduler_address), mode); worker->register_worker(std::string(node_ip_address), std::string(objstore_address), is_driver); PyObject* t = PyTuple_New(2); @@ -800,12 +800,12 @@ static PyObject* submit_task(PyObject* self, PyObject* args) { return list; } -static PyObject* notify_task_completed(PyObject* self, PyObject* args) { +static PyObject* ready_for_new_task(PyObject* self, PyObject* args) { Worker* worker; if (!PyArg_ParseTuple(args, "O&", &PyObjectToWorker, &worker)) { return NULL; } - worker->notify_task_completed(); + worker->ready_for_new_task(); Py_RETURN_NONE; } @@ -920,18 +920,36 @@ static PyObject* scheduler_info(PyObject* self, PyObject* args) { SchedulerInfoReply reply; worker->scheduler_info(context, request, reply); + // Unpack the target object reference information. PyObject* target_objectid_list = PyList_New(reply.target_objectid_size()); for (size_t i = 0; i < reply.target_objectid_size(); ++i) { PyList_SetItem(target_objectid_list, i, PyInt_FromLong(reply.target_objectid(i))); } + // Unpack the reference count information. PyObject* reference_count_list = PyList_New(reply.reference_count_size()); for (size_t i = 0; i < reply.reference_count_size(); ++i) { PyList_SetItem(reference_count_list, i, PyInt_FromLong(reply.reference_count(i))); } + // Unpack the available worker information. + PyObject* available_worker_list = PyList_New(reply.avail_worker_size()); + for (size_t i = 0; i < reply.avail_worker_size(); ++i) { + PyList_SetItem(available_worker_list, i, PyInt_FromLong(reply.avail_worker(i))); + } + // Unpack the object store information. + PyObject* objstore_list = PyList_New(reply.objstore_size()); + for (size_t i = 0; i < reply.objstore_size(); ++i) { + PyObject* objstore_data = PyDict_New(); + set_dict_item_and_transfer_ownership(objstore_data, PyString_FromString("objstoreid"), PyInt_FromLong(reply.objstore(i).objstoreid())); + set_dict_item_and_transfer_ownership(objstore_data, PyString_FromString("address"), PyString_FromStringAndSize(reply.objstore(i).address().data(), reply.objstore(i).address().size())); + PyList_SetItem(objstore_list, i, objstore_data); + } + // Store the unpacked values in a dictionary to return. PyObject* dict = PyDict_New(); set_dict_item_and_transfer_ownership(dict, PyString_FromString("target_objectids"), target_objectid_list); set_dict_item_and_transfer_ownership(dict, PyString_FromString("reference_counts"), reference_count_list); + set_dict_item_and_transfer_ownership(dict, PyString_FromString("available_workers"), available_worker_list); + set_dict_item_and_transfer_ownership(dict, PyString_FromString("objstores"), objstore_list); return dict; } @@ -1059,7 +1077,7 @@ static PyMethodDef RayLibMethods[] = { { "alias_objectids", alias_objectids, METH_VARARGS, "make two objectids refer to the same object" }, { "wait_for_next_message", wait_for_next_message, METH_VARARGS, "get next message from scheduler (blocking)" }, { "submit_task", submit_task, METH_VARARGS, "call a remote function" }, - { "notify_task_completed", notify_task_completed, METH_VARARGS, "notify the scheduler that a task has been completed" }, + { "ready_for_new_task", ready_for_new_task, METH_VARARGS, "notify the scheduler that a task has been completed" }, { "start_worker_service", start_worker_service, METH_VARARGS, "start the worker service" }, { "scheduler_info", scheduler_info, METH_VARARGS, "get info about scheduler state" }, { "task_info", task_info, METH_VARARGS, "get information about task statuses and failures" }, diff --git a/src/scheduler.cc b/src/scheduler.cc index 636c198be..495b2de64 100644 --- a/src/scheduler.cc +++ b/src/scheduler.cc @@ -215,6 +215,7 @@ Status SchedulerService::RegisterObjStore(ServerContext* context, const Register } Status SchedulerService::RegisterWorker(ServerContext* context, const RegisterWorkerRequest* request, RegisterWorkerReply* reply) { + std::string worker_address = request->worker_address(); std::string objstore_address = request->objstore_address(); std::string node_ip_address = request->node_ip_address(); bool is_driver = request->is_driver(); @@ -250,19 +251,11 @@ Status SchedulerService::RegisterWorker(ServerContext* context, const RegisterWo } else { RAY_CHECK_NEQ(objstoreid, std::numeric_limits::max(), "Object store with address " << objstore_address << " not yet registered."); } - // Populate the worker information and generate a worker address. + // Populate the worker information. WorkerId workerid; - std::string worker_address; { auto workers = GET(workers_); workerid = workers->size(); - // Generate a random port number. This is currently a hack to avoid reusing - // port numbers when we run the tests. - std::random_device rd; - std::mt19937 rng(rd()); - std::uniform_int_distribution uni(0, 10000); - int port_number = 40000 + uni(rng); - worker_address = node_ip_address + ":" + std::to_string(port_number); workers->push_back(WorkerHandle()); auto channel = grpc::CreateChannel(worker_address, grpc::InsecureChannelCredentials()); (*workers)[workerid].channel = channel; @@ -279,7 +272,6 @@ Status SchedulerService::RegisterWorker(ServerContext* context, const RegisterWo RAY_LOG(RAY_INFO, "Finished registering worker with workerid " << workerid << ", worker address " << worker_address << " on node with IP address " << node_ip_address << ", is_driver = " << is_driver << ", assigned to object store with id " << objstoreid << " and address " << objstore_address); reply->set_workerid(workerid); reply->set_objstoreid(objstoreid); - reply->set_worker_address(worker_address); reply->set_objstore_address(objstore_address); schedule(); return Status::OK; @@ -724,27 +716,40 @@ void SchedulerService::get_info(const SchedulerInfoRequest& request, SchedulerIn auto avail_workers = GET(avail_workers_); auto task_queue = GET(task_queue_); auto reference_counts = GET(reference_counts_); + auto objstores = GET(objstores_); auto target_objectids = GET(target_objectids_); auto function_table = reply->mutable_function_table(); + // Return info about the reference counts. for (int i = 0; i < reference_counts->size(); ++i) { reply->add_reference_count((*reference_counts)[i]); } + // Return info about the target objectids. for (int i = 0; i < target_objectids->size(); ++i) { reply->add_target_objectid((*target_objectids)[i]); } + // Return info about the function table. for (const auto& entry : *fntable) { (*function_table)[entry.first].set_num_return_vals(entry.second.num_return_vals()); for (const WorkerId& worker : entry.second.workers()) { (*function_table)[entry.first].add_workerid(worker); } } + // Return info about the task queue. for (const auto& entry : *task_queue) { reply->add_operationid(entry); } + // Return info about the available workers. for (const WorkerId& entry : *avail_workers) { reply->add_avail_worker(entry); } + // Return info about the computation graph. computation_graph->to_protobuf(reply->mutable_computation_graph()); + // Return info about the object stores. + for (int i = 0; i < objstores->size(); ++i) { + ObjstoreData* objstore_data = reply->add_objstore(); + objstore_data->set_objstoreid(i); + objstore_data->set_address((*objstores)[i].address); + } } // pick_objstore must be called with a canonical_objectid @@ -1064,6 +1069,9 @@ void start_scheduler_service(const char* service_addr, SchedulingAlgorithmType s builder.AddListeningPort(std::string("0.0.0.0:") + port, grpc::InsecureServerCredentials()); builder.RegisterService(&service); std::unique_ptr server(builder.BuildAndStart()); + if (server == nullptr) { + RAY_CHECK(false, "Failed to create the scheduler server.") + } server->Wait(); } diff --git a/src/worker.cc b/src/worker.cc index bff9a07f2..c504a7d0a 100644 --- a/src/worker.cc +++ b/src/worker.cc @@ -9,11 +9,8 @@ extern "C" { static PyObject *RayError; } -inline WorkerServiceImpl::WorkerServiceImpl(const std::string& worker_address, Mode mode) - : worker_address_(worker_address), - mode_(mode) { - RAY_CHECK(send_queue_.connect(worker_address_, false), "error connecting send_queue_"); -} +inline WorkerServiceImpl::WorkerServiceImpl(Mode mode) + : mode_(mode) {} Status WorkerServiceImpl::ExecuteTask(ServerContext* context, const ExecuteTaskRequest* request, AckReply* reply) { RAY_CHECK(mode_ == Mode::WORKER_MODE, "ExecuteTask can only be called on workers."); @@ -87,10 +84,23 @@ Status WorkerServiceImpl::PrintErrorMessage(ServerContext* context, const PrintE return Status::OK; } -Worker::Worker(const std::string& scheduler_address) - : scheduler_address_(scheduler_address) { - auto scheduler_channel = grpc::CreateChannel(scheduler_address, grpc::InsecureChannelCredentials()); +void WorkerServiceImpl::connect_to_queue() { + RAY_LOG(RAY_DEBUG, "Worker service creating queue with name " << worker_address_ << " to commmunicate with worker."); + RAY_CHECK(send_queue_.connect(worker_address_, true), "error connecting send_queue_"); +} + +Worker::Worker(const std::string& node_ip_address, const std::string& scheduler_address, Mode mode) + : node_ip_address_(node_ip_address), + scheduler_address_(scheduler_address), + mode_(mode) { + // Connect to the scheduler service. + RAY_LOG(RAY_DEBUG, "Worker creating a scheduler stub.") + auto scheduler_channel = grpc::CreateChannel(scheduler_address_, grpc::InsecureChannelCredentials()); scheduler_stub_ = Scheduler::NewStub(scheduler_channel); + // Start the worker service. This will find an unused port which is stored in + // worker_port_. This also sets up a message queue between the worker and the + // worker service. + start_worker_service(mode_); } @@ -122,6 +132,7 @@ void Worker::register_worker(const std::string& node_ip_address, const std::stri unsigned int retry_wait_milliseconds = 20; RegisterWorkerRequest request; request.set_node_ip_address(node_ip_address); + request.set_worker_address(worker_address_); // The object store address can be the empty string, in which case the // scheduler will assign an object store address. request.set_objstore_address(objstore_address); @@ -142,11 +153,15 @@ void Worker::register_worker(const std::string& node_ip_address, const std::stri workerid_ = reply.workerid(); objstoreid_ = reply.objstoreid(); objstore_address_ = reply.objstore_address(); - worker_address_ = reply.worker_address(); - segmentpool_ = std::make_shared(objstoreid_, false); - RAY_CHECK(receive_queue_.connect(worker_address_, true), "error connecting receive_queue_"); - RAY_CHECK(request_obj_queue_.connect(std::string("queue:") + objstore_address_ + std::string(":obj"), false), "error connecting request_obj_queue_"); - RAY_CHECK(receive_obj_queue_.connect(std::string("queue:") + objstore_address_ + std::string(":worker:") + std::to_string(workerid_) + std::string(":obj"), true), "error connecting receive_obj_queue_"); + segmentpool_ = std::make_shared(objstoreid_, objstore_address_, false); + // Connect to the queue for sending requests to the object store. + std::string request_obj_queue_name = std::string("queue:") + objstore_address_ + std::string(":obj"); + RAY_LOG(RAY_DEBUG, "Worker connecting to queue with name " << request_obj_queue_name << " to send requests to the object store."); + RAY_CHECK(request_obj_queue_.connect(request_obj_queue_name, false), "error connecting request_obj_queue_"); + // Create a queue for receiving messages from the object store. + std::string receive_obj_queue_name = std::string("queue:") + objstore_address_ + std::string(":worker:") + std::to_string(workerid_) + std::string(":obj"); + RAY_LOG(RAY_DEBUG, "Worker creating queue with name " << receive_obj_queue_name << " to receive messages from the object store."); + RAY_CHECK(receive_obj_queue_.connect(receive_obj_queue_name, true), "error connecting receive_obj_queue_"); connected_ = true; return; } @@ -374,7 +389,7 @@ std::unique_ptr Worker::receive_next_message() { return std::unique_ptr(message_ptr); } -void Worker::notify_task_completed() { +void Worker::ready_for_new_task() { RAY_CHECK(connected_, "Attempted to perform notify_task_completed but failed."); ClientContext context; ReadyForNewTaskRequest request; @@ -389,7 +404,7 @@ void Worker::disconnect() { // return. server_ptr_->Shutdown(); // Wait for the thread that launched the worker service to return. - worker_server_thread_->join(); + worker_server_thread_.join(); } // TODO(rkn): Should we be using pointers or references? And should they be const? @@ -430,34 +445,49 @@ void Worker::export_reusable_variable(const std::string& name, const std::string // (in our case running in the main thread), whereas the WorkerService will // run in a separate thread and potentially utilize multiple threads. void Worker::start_worker_service(Mode mode) { - const char* service_addr = worker_address_.c_str(); + RAY_LOG(RAY_DEBUG, "Worker is starting the worker service."); + // Signal when the worker service has started. + std::condition_variable worker_service_started; + // Lock for the above condition. + std::mutex worker_service_started_mutex; // Launch a new thread for running the worker service. We store this as a // field so that we can clean it up when we disconnect the worker. - worker_server_thread_ = std::unique_ptr(new std::thread([this, service_addr, mode]() { - std::string service_address(service_addr); - std::string::iterator split_point = split_ip_address(service_address); - std::string port; - port.assign(split_point, service_address.end()); - // Create the worker service. - WorkerServiceImpl service(service_address, mode); + worker_server_thread_ = std::thread([this, mode, &worker_service_started]() { ServerBuilder builder; - builder.AddListeningPort(std::string("0.0.0.0:") + port, grpc::InsecureServerCredentials()); + // Get GRPC to assign an unused port number. + builder.AddListeningPort(std::string("0.0.0.0:0"), grpc::InsecureServerCredentials(), &worker_port_); + // Create and start the worker service. + WorkerServiceImpl service(mode); builder.RegisterService(&service); std::unique_ptr server(builder.BuildAndStart()); server_ptr_ = server.get(); - RAY_LOG(RAY_INFO, "worker server listening on " << service_address); - // If this is part of a worker process (and not a driver process), then tell - // the scheduler that it is ready to start receiving tasks. - if (mode == Mode::WORKER_MODE) { - ClientContext context; - ReadyForNewTaskRequest request; - request.set_workerid(workerid_); - AckReply reply; - scheduler_stub_->ReadyForNewTask(&context, request, &reply); + if (server == nullptr) { + RAY_CHECK(false, "Failed to create the worker server.") } + RAY_LOG(RAY_DEBUG, "Worker service listening on " << worker_address_); + worker_address_ = node_ip_address_ + ":" + std::to_string(worker_port_); + service.set_worker_address(worker_address_); + // Connect the worker service by a queue to the worker object. + service.connect_to_queue(); + // Use the condition variable to notify the outside thread that the worker + // service has been started. + // TODO(rkn): Once this has been called, the outside thread will notify the + // scheduler that the worker is ready to receive tasks. This can happen + // before server->Wait() is called below. What happens to messages sent from + // the scheduler before the call to server->Wait()? + worker_service_started.notify_all(); // Wait for work and process work. This method does not return until // Shutdown is called from a different thread. server->Wait(); RAY_LOG(RAY_INFO, "Worker service thread returning.") - })); + }); + { + // Wait until we know the worker service has been started. + std::unique_lock lock(worker_service_started_mutex); + worker_service_started.wait(lock); + } + // Connect to the queue for receiving messages from the worker service. + std::string receive_queue_name = worker_address_; + RAY_LOG(RAY_DEBUG, "Worker connecting to queue with name " << receive_queue_name << " to commmunicate with worker service."); + RAY_CHECK(receive_queue_.connect(receive_queue_name, false), "error connecting receive_queue_"); } diff --git a/src/worker.h b/src/worker.h index cc84ead30..af480bbd9 100644 --- a/src/worker.h +++ b/src/worker.h @@ -1,6 +1,8 @@ #ifndef RAY_WORKER_H #define RAY_WORKER_H +#include +#include #include #include #include @@ -30,12 +32,16 @@ enum Mode {SCRIPT_MODE, WORKER_MODE, PYTHON_MODE, SILENT_MODE}; class WorkerServiceImpl final : public WorkerService::Service { public: - WorkerServiceImpl(const std::string& worker_address, Mode mode); + WorkerServiceImpl(Mode mode); Status ExecuteTask(ServerContext* context, const ExecuteTaskRequest* request, AckReply* reply) override; Status ImportRemoteFunction(ServerContext* context, const ImportRemoteFunctionRequest* request, AckReply* reply) override; Status Die(ServerContext* context, const DieRequest* request, AckReply* reply) override; Status ImportReusableVariable(ServerContext* context, const ImportReusableVariableRequest* request, AckReply* reply) override; Status PrintErrorMessage(ServerContext* context, const PrintErrorMessageRequest* request, AckReply* reply) override; + // Set worker address. + void set_worker_address(const std::string& worker_address) { worker_address_ = worker_address; } + // Connect the worker service to the worker object via a queue. + void connect_to_queue(); private: std::string worker_address_; MessageQueue send_queue_; @@ -46,8 +52,10 @@ private: class Worker { public: - Worker(const std::string& scheduler_address); - + // This constructor constructs a stub for the scheduler service. It also + // starts the worker service, which also sets up a message queue between the + // worker and the worker service. + Worker(const std::string& node_ip_address, const std::string& scheduler_address, Mode mode); // Submit a remote task to the scheduler. If the function in the task is not // registered with the scheduler, we will sleep for retry_wait_milliseconds // and try to resubmit the task to the scheduler up to max_retries more times. @@ -84,16 +92,16 @@ class Worker { void register_remote_function(const std::string& name, size_t num_return_vals); // Notify the scheduler that a failure has occurred. void notify_failure(FailedType type, const std::string& name, const std::string& error_message); - // Start the worker server which accepts commands from the scheduler. For - // workers, these commands are stored in the message queue, which is read by - // the Python interpreter. For drivers, these commands are only for printing - // error messages. + // Start the worker server which accepts commands from the scheduler. This + // also creates a message queue that worker service uses to send messages to + // the worker. The queue is read by the Python interpreter. For drivers, these + // commands are only for printing error messages. void start_worker_service(Mode mode); // wait for next task from the RPC system. If null, it means there are no more tasks and the worker should shut down. std::unique_ptr receive_next_message(); // tell the scheduler that we are done with the current task and request the // next one. - void notify_task_completed(); + void ready_for_new_task(); // disconnect the worker void disconnect(); // return connected_ @@ -113,8 +121,8 @@ class Worker { bool connected_; const size_t CHUNK_SIZE = 8 * 1024; std::unique_ptr scheduler_stub_; - std::unique_ptr worker_server_thread_; Server* server_ptr_; + std::thread worker_server_thread_; MessageQueue receive_queue_; bip::managed_shared_memory segment_; WorkerId workerid_; @@ -122,6 +130,9 @@ class Worker { std::string scheduler_address_; std::string objstore_address_; std::string worker_address_; + std::string node_ip_address_; + int worker_port_; + Mode mode_; MessageQueue request_obj_queue_; MessageQueue receive_obj_queue_; std::shared_ptr segmentpool_; diff --git a/test/runtest.py b/test/runtest.py index c3cb985c3..7b9b60d2a 100644 --- a/test/runtest.py +++ b/test/runtest.py @@ -81,32 +81,51 @@ class ObjStoreTest(unittest.TestCase): # Test setting up object stores, transfering data between them and retrieving data to a client def testObjStore(self): - scheduler_address, objstore_addresses = ray.services.start_ray_local(num_objstores=2, num_workers=0, worker_path=None) + node_ip_address = "127.0.0.1" + scheduler_address = ray.services.start_ray_local(num_objstores=2, num_workers=0, worker_path=None) + ray.connect(node_ip_address, scheduler_address, mode=ray.SCRIPT_MODE) + objstore_addresses = [objstore_info["address"] for objstore_info in ray.scheduler_info()["objstores"]] w1 = ray.worker.Worker() w2 = ray.worker.Worker() - node_ip_address = "127.0.0.1" - ray.connect(node_ip_address, scheduler_address, objstore_addresses[0], is_driver=True, mode=ray.SCRIPT_MODE, worker=w1) ray.reusables._cached_reusables = [] # This is a hack to make the test run. - ray.connect(node_ip_address, scheduler_address, objstore_addresses[1], is_driver=True, mode=ray.SCRIPT_MODE, worker=w2) + ray.connect(node_ip_address, scheduler_address, objstore_address=objstore_addresses[0], mode=ray.SCRIPT_MODE, worker=w1) + ray.reusables._cached_reusables = [] # This is a hack to make the test run. + ray.connect(node_ip_address, scheduler_address, objstore_address=objstore_addresses[1], mode=ray.SCRIPT_MODE, worker=w2) # putting and getting an object shouldn't change it - for data in ["h", "h" * 10000, 0, 0.0]: + for data in RAY_TEST_OBJECTS: objectid = ray.put(data, w1) result = ray.get(objectid, w1) self.assertEqual(result, data) # putting an object, shipping it to another worker, and getting it shouldn't change it - for data in ["h", "h" * 10000, 0, 0.0, [1, 2, 3, "a", (1, 2)], ("a", ("b", 3))]: + for data in RAY_TEST_OBJECTS: objectid = ray.put(data, w1) result = ray.get(objectid, w2) self.assertEqual(result, data) + # putting an object, shipping it to another worker, and getting it shouldn't change it + for data in RAY_TEST_OBJECTS: + objectid = ray.put(data, w2) + result = ray.get(objectid, w1) + self.assertEqual(result, data) + + ARRAY_TEST_OBJECTS = [np.zeros([10, 20]), np.random.normal(size=[45, 25]), + ("a", np.random.normal(size=[10, 10])), + ["a", np.random.normal(size=[10, 10])]] + # putting an array, shipping it to another worker, and getting it shouldn't change it - for data in [np.zeros([10, 20]), np.random.normal(size=[45, 25])]: + for data in ARRAY_TEST_OBJECTS: objectid = ray.put(data, w1) result = ray.get(objectid, w2) assert_equal(result, data) + # putting an array, shipping it to another worker, and getting it shouldn't change it + for data in ARRAY_TEST_OBJECTS: + objectid = ray.put(data, w2) + result = ray.get(objectid, w1) + assert_equal(result, data) + # This test fails. See https://github.com/amplab/ray/issues/159. # getting multiple times shouldn't matter # for data in [np.zeros([10, 20]), np.random.normal(size=[45, 25]), np.zeros([10, 20], dtype=np.dtype("float64")), np.zeros([10, 20], dtype=np.dtype("float32")), np.zeros([10, 20], dtype=np.dtype("int64")), np.zeros([10, 20], dtype=np.dtype("int32"))]: @@ -116,20 +135,6 @@ class ObjStoreTest(unittest.TestCase): # result = worker.get(objectid, w2) # assert_equal(result, data) - # shipping a numpy array inside something else should be fine - data = ("a", np.random.normal(size=[10, 10])) - objectid = ray.put(data, w1) - result = ray.get(objectid, w2) - self.assertEqual(data[0], result[0]) - assert_equal(data[1], result[1]) - - # shipping a numpy array inside something else should be fine - data = ["a", np.random.normal(size=[10, 10])] - objectid = ray.put(data, w1) - result = ray.get(objectid, w2) - self.assertEqual(data[0], result[0]) - assert_equal(data[1], result[1]) - # Getting a buffer after modifying it before it finishes should return updated buffer objectid = ray.libraylib.get_objectid(w1.handle) buf = ray.libraylib.allocate_buffer(w1.handle, objectid, 100)