def start_node(scheduler_address, node_ip_address, num_workers, worker_path=None):
    """Start an object store and its workers on this node of a larger cluster.

    Assumes the scheduler has already been started. After the object store and
    the workers have been launched, the calling process itself is connected to
    the scheduler and the new object store via ``orchpy.connect``.

    :param scheduler_address: ip address and port of the scheduler (which may run on a different node)
    :param node_ip_address: ip address (without port) of the node this function is run on
    :param num_workers: the number of workers to be started on this node
    :param worker_path: path of the source code that will be run on the worker
    :raises Exception: if ``num_workers`` is positive but ``worker_path`` is None
    """
    # Fail before any process is spawned, mirroring the check done by
    # start_singlenode_cluster, instead of failing inside start_worker after
    # the object store is already running.
    if num_workers > 0 and worker_path is None:
        raise Exception("Attempting to start a node with {} workers, but `worker_path` is None.".format(num_workers))

    objstore_address = address(node_ip_address, new_objstore_port())
    start_objstore(scheduler_address, objstore_address)
    time.sleep(0.2)  # presumably lets the object store come up before workers attach -- confirm

    for _ in range(num_workers):
        # Every worker gets its own freshly allocated port on this node.
        start_worker(worker_path, scheduler_address, objstore_address, address(node_ip_address, new_worker_port()))
    # NOTE(review): indentation was not recoverable from the reviewed text; this
    # pause is assumed to follow the loop rather than run once per worker.
    time.sleep(0.3)

    # Connect the calling process too, using one more fresh worker port.
    orchpy.connect(scheduler_address, objstore_address, address(node_ip_address, new_worker_port()))
    time.sleep(0.5)
ObjStoreService::CHUNK_SIZE = 8 * 1024; @@ -262,14 +264,17 @@ void ObjStoreService::process_requests() { recv_queue_.receive(&request); switch (request.type) { case ObjRequestType::ALLOC: { + ORCH_LOG(ORCH_VERBOSE, "Request (worker " << request.workerid << " to objstore " << objstoreid_ << "): Allocate object with objref " << request.objref << " and size " << request.size); process_worker_request(request); } break; case ObjRequestType::GET: { + ORCH_LOG(ORCH_VERBOSE, "Request (worker " << request.workerid << " to objstore " << objstoreid_ << "): Get object with objref " << request.objref); process_worker_request(request); } break; case ObjRequestType::WORKER_DONE: { + ORCH_LOG(ORCH_VERBOSE, "Request (worker " << request.workerid << " to objstore " << objstoreid_ << "): Finalize object with objref " << request.objref); process_worker_request(request); } break; @@ -304,6 +309,7 @@ ObjHandle ObjStoreService::alloc(ObjRef objref, size_t size) { ObjHandle handle = segmentpool_->allocate(size); segmentpool_lock_.unlock(); std::lock_guard memory_lock(memory_lock_); + ORCH_LOG(ORCH_VERBOSE, "Allocating space for objref " << objref << " on object store " << objstoreid_); if (memory_[objref].second != MemoryStatusType::NOT_PRESENT && memory_[objref].second != MemoryStatusType::PRE_ALLOCED) { ORCH_LOG(ORCH_FATAL, "Attempting to allocate space for objref " << objref << ", but memory_[objref].second = " << memory_[objref].second); } @@ -346,8 +352,11 @@ void start_objstore(const char* scheduler_addr, const char* objstore_addr) { std::string objstore_address(objstore_addr); ObjStoreService service(objstore_address, scheduler_channel); service.start_objstore_service(); + std::string::iterator split_point = split_ip_address(objstore_address); + std::string port; + port.assign(split_point, objstore_address.end()); ServerBuilder builder; - builder.AddListeningPort(std::string(objstore_addr), grpc::InsecureServerCredentials()); + builder.AddListeningPort(std::string("0.0.0.0:") + 
port, grpc::InsecureServerCredentials()); builder.RegisterService(&service); std::unique_ptr server(builder.BuildAndStart()); @@ -356,6 +365,7 @@ void start_objstore(const char* scheduler_addr, const char* objstore_addr) { int main(int argc, char** argv) { if (argc != 3) { + ORCH_LOG(ORCH_FATAL, "object store: expected two arguments (scheduler ip address and object store ip address)"); return 1; } diff --git a/src/scheduler.cc b/src/scheduler.cc index 8f0bdd0f1..49c1224c7 100644 --- a/src/scheduler.cc +++ b/src/scheduler.cc @@ -1,8 +1,10 @@ +#include "scheduler.h" + #include #include #include -#include "scheduler.h" +#include "utils.h" Status SchedulerService::RemoteCall(ServerContext* context, const RemoteCallRequest* request, RemoteCallReply* reply) { std::unique_ptr task(new Call(request->call())); // need to copy, because request is const @@ -661,18 +663,24 @@ void SchedulerService::get_equivalent_objrefs(ObjRef objref, std::vector upstream_objrefs(downstream_objref, equivalent_objrefs); } -void start_scheduler_service(const char* server_address) { +void start_scheduler_service(const char* service_addr) { + std::string service_address(service_addr); + std::string::iterator split_point = split_ip_address(service_address); + std::string port; + port.assign(split_point, service_address.end()); SchedulerService service; ServerBuilder builder; - builder.AddListeningPort(std::string(server_address), grpc::InsecureServerCredentials()); + builder.AddListeningPort(std::string("0.0.0.0:") + port, grpc::InsecureServerCredentials()); builder.RegisterService(&service); std::unique_ptr server(builder.BuildAndStart()); server->Wait(); } int main(int argc, char** argv) { - if (argc != 2) + if (argc != 2) { + ORCH_LOG(ORCH_FATAL, "scheduler: expected one argument (scheduler ip address)"); return 1; + } start_scheduler_service(argv[1]); return 0; } diff --git a/src/utils.h b/src/utils.h new file mode 100644 index 000000000..e5809d17a --- /dev/null +++ b/src/utils.h @@ -0,0 +1,24 
@@ +#ifndef ORCHESTRA_UTILS_H +#define ORCHESTRA_UTILS_H + +std::string::iterator split_ip_address(std::string& ip_address) { + if (ip_address[0] == '[') { // IPv6 + auto split_end = std::find(ip_address.begin() + 1, ip_address.end(), ']'); + if(split_end != ip_address.end()) { + split_end++; + } + if(split_end != ip_address.end() && *split_end == ':') { + return split_end; + } + ORCH_LOG(ORCH_FATAL, "ip address should contain a port number"); + } else { // IPv4 + auto split_point = std::find(ip_address.rbegin(), ip_address.rend(), ':').base(); + if (split_point == ip_address.begin()) { + ORCH_LOG(ORCH_FATAL, "ip address should contain a port number"); + } else { + return split_point; + } + } +} + +#endif diff --git a/src/worker.cc b/src/worker.cc index ec28aa56b..0c4c9f03e 100644 --- a/src/worker.cc +++ b/src/worker.cc @@ -1,5 +1,7 @@ #include "worker.h" +#include "utils.h" + #include extern "C" { @@ -289,14 +291,18 @@ void Worker::scheduler_info(ClientContext &context, SchedulerInfoRequest &reques // (in our case running in the main thread), whereas the WorkerService will // run in a separate thread and potentially utilize multiple threads. 
void Worker::start_worker_service() { - const char* server_address = worker_address_.c_str(); - worker_server_thread_ = std::thread([server_address]() { - WorkerServiceImpl service(server_address); + const char* service_addr = worker_address_.c_str(); + worker_server_thread_ = std::thread([service_addr]() { + std::string service_address(service_addr); + std::string::iterator split_point = split_ip_address(service_address); + std::string port; + port.assign(split_point, service_address.end()); + WorkerServiceImpl service(service_address); ServerBuilder builder; - builder.AddListeningPort(server_address, grpc::InsecureServerCredentials()); + builder.AddListeningPort(std::string("0.0.0.0:") + port, grpc::InsecureServerCredentials()); builder.RegisterService(&service); std::unique_ptr server(builder.BuildAndStart()); - ORCH_LOG(ORCH_INFO, "worker server listening on " << server_address); + ORCH_LOG(ORCH_INFO, "worker server listening on " << service_address); server->Wait(); }); } diff --git a/test/arrays_test.py b/test/arrays_test.py index 685474b1f..18e53486b 100644 --- a/test/arrays_test.py +++ b/test/arrays_test.py @@ -22,7 +22,7 @@ class ArraysSingleTest(unittest.TestCase): def testMethods(self): test_dir = os.path.dirname(os.path.abspath(__file__)) test_path = os.path.join(test_dir, "testrecv.py") - services.start_cluster(return_drivers=False, num_workers_per_objstore=1, worker_path=test_path) + services.start_singlenode_cluster(return_drivers=False, num_workers_per_objstore=1, worker_path=test_path) # test eye ref = single.eye(3, "float") @@ -54,7 +54,7 @@ class ArraysSingleTest(unittest.TestCase): class ArraysDistTest(unittest.TestCase): def testSerialization(self): - [w] = services.start_cluster(return_drivers=True) + [w] = services.start_singlenode_cluster(return_drivers=True) x = dist.DistArray() x.construct([2, 3, 4], np.array([[[orchpy.push(0, w)]]])) @@ -68,7 +68,7 @@ class ArraysDistTest(unittest.TestCase): def testAssemble(self): test_dir = 
os.path.dirname(os.path.abspath(__file__)) test_path = os.path.join(test_dir, "testrecv.py") - services.start_cluster(return_drivers=False, num_workers_per_objstore=1, worker_path=test_path) + services.start_singlenode_cluster(return_drivers=False, num_workers_per_objstore=1, worker_path=test_path) a = single.ones([dist.BLOCK_SIZE, dist.BLOCK_SIZE], "float") b = single.zeros([dist.BLOCK_SIZE, dist.BLOCK_SIZE], "float") @@ -81,7 +81,7 @@ class ArraysDistTest(unittest.TestCase): def testMethods(self): test_dir = os.path.dirname(os.path.abspath(__file__)) test_path = os.path.join(test_dir, "testrecv.py") - services.start_cluster(return_drivers=False, num_objstores=2, num_workers_per_objstore=8, worker_path=test_path) + services.start_singlenode_cluster(return_drivers=False, num_objstores=2, num_workers_per_objstore=8, worker_path=test_path) x = dist.zeros([9, 25, 51], "float") y = dist.assemble(x) diff --git a/test/runtest.py b/test/runtest.py index 349c00167..c91c4bbeb 100644 --- a/test/runtest.py +++ b/test/runtest.py @@ -31,7 +31,7 @@ class SerializationTest(unittest.TestCase): self.assertTrue((a == c).all()) def testSerialize(self): - [w] = services.start_cluster(return_drivers=True) + [w] = services.start_singlenode_cluster(return_drivers=True) self.roundTripTest(w, [1, "hello", 3.0]) self.roundTripTest(w, 42) @@ -69,7 +69,7 @@ class ObjStoreTest(unittest.TestCase): # Test setting up object stores, transfering data between them and retrieving data to a client def testObjStore(self): - [w1, w2] = services.start_cluster(return_drivers=True, num_objstores=2, num_workers_per_objstore=0) + [w1, w2] = services.start_singlenode_cluster(return_drivers=True, num_objstores=2, num_workers_per_objstore=0) # pushing and pulling an object shouldn't change it for data in ["h", "h" * 10000, 0, 0.0]: @@ -120,7 +120,7 @@ class SchedulerTest(unittest.TestCase): def testCall(self): test_dir = os.path.dirname(os.path.abspath(__file__)) test_path = os.path.join(test_dir, 
"testrecv.py") - [w] = services.start_cluster(return_drivers=True, num_workers_per_objstore=1, worker_path=test_path) + [w] = services.start_singlenode_cluster(return_drivers=True, num_workers_per_objstore=1, worker_path=test_path) value_before = "test_string" objref = w.remote_call("test_functions.print_string", [value_before]) @@ -137,7 +137,7 @@ class SchedulerTest(unittest.TestCase): class WorkerTest(unittest.TestCase): def testPushPull(self): - [w] = services.start_cluster(return_drivers=True) + [w] = services.start_singlenode_cluster(return_drivers=True) for i in range(100): value_before = i * 10 ** 6 @@ -170,7 +170,7 @@ class APITest(unittest.TestCase): def testObjRefAliasing(self): test_dir = os.path.dirname(os.path.abspath(__file__)) test_path = os.path.join(test_dir, "testrecv.py") - [w] = services.start_cluster(return_drivers=True, num_workers_per_objstore=3, worker_path=test_path) + [w] = services.start_singlenode_cluster(return_drivers=True, num_workers_per_objstore=3, worker_path=test_path) objref = w.remote_call("test_functions.test_alias_f", []) self.assertTrue(np.alltrue(orchpy.pull(objref[0], w) == np.ones([3, 4, 5]))) @@ -186,7 +186,7 @@ class ReferenceCountingTest(unittest.TestCase): def testDeallocation(self): test_dir = os.path.dirname(os.path.abspath(__file__)) test_path = os.path.join(test_dir, "testrecv.py") - services.start_cluster(return_drivers=False, num_workers_per_objstore=3, worker_path=test_path) + services.start_singlenode_cluster(return_drivers=False, num_workers_per_objstore=3, worker_path=test_path) x = test_functions.test_alias_f() orchpy.pull(x)