From 1ae7e7d29ef894660e444c9295babb5ce92accf1 Mon Sep 17 00:00:00 2001 From: Robert Nishihara Date: Mon, 27 Feb 2017 12:24:07 -0800 Subject: [PATCH] Rename photon -> local scheduler. (#322) --- .travis.yml | 8 +- CMakeLists.txt | 2 +- python/common/test/test.py | 34 +- .../{photon => local_scheduler}/__init__.py | 0 python/global_scheduler/test/test.py | 40 +-- .../{photon => local_scheduler}/__init__.py | 4 +- .../build/.gitkeep | 0 .../local_scheduler_services.py} | 2 +- .../{photon => local_scheduler}/test/test.py | 42 +-- python/ray/actor.py | 8 +- python/ray/experimental/state.py | 2 +- python/ray/services.py | 29 +- python/ray/worker.py | 75 ++--- python/setup.py | 4 +- scripts/stop_ray.sh | 2 +- src/common/CMakeLists.txt | 2 +- src/common/lib/python/common_extension.c | 4 +- src/global_scheduler/global_scheduler.c | 70 ++-- src/global_scheduler/global_scheduler.h | 28 +- .../global_scheduler_algorithm.c | 90 +++--- src/local_scheduler/CMakeLists.txt | 45 +++ .../build/.gitkeep | 0 .../local_scheduler.c} | 17 +- .../local_scheduler.h} | 8 +- .../local_scheduler_algorithm.c} | 9 +- .../local_scheduler_algorithm.h} | 10 +- .../local_scheduler_client.c} | 38 +-- .../local_scheduler_client.h} | 48 +-- .../local_scheduler_extension.c | 234 ++++++++++++++ .../local_scheduler_shared.h} | 10 +- .../test/local_scheduler_tests.c} | 303 ++++++++++-------- .../test/run_tests.sh | 2 +- .../test/run_valgrind.sh | 2 +- src/photon/CMakeLists.txt | 45 --- src/photon/photon_extension.c | 225 ------------- webui/backend/ray_ui.py | 4 +- 36 files changed, 758 insertions(+), 688 deletions(-) rename python/core/src/{photon => local_scheduler}/__init__.py (100%) rename python/{photon => local_scheduler}/__init__.py (50%) rename python/{photon => local_scheduler}/build/.gitkeep (100%) rename python/{photon/photon_services.py => local_scheduler/local_scheduler_services.py} (98%) rename python/{photon => local_scheduler}/test/test.py (79%) create mode 100644 src/local_scheduler/CMakeLists.txt rename src/{photon => local_scheduler}/build/.gitkeep (100%) rename src/{photon/photon_scheduler.c => local_scheduler/local_scheduler.c} (98%) rename src/{photon/photon_scheduler.h => local_scheduler/local_scheduler.h} (97%) rename src/{photon/photon_algorithm.c => local_scheduler/local_scheduler_algorithm.c} (99%) rename src/{photon/photon_algorithm.h => local_scheduler/local_scheduler_algorithm.h} (98%) rename src/{photon/photon_client.c => local_scheduler/local_scheduler_client.c} (62%) rename src/{photon/photon_client.h => local_scheduler/local_scheduler_client.h} (58%) create mode 100644 src/local_scheduler/local_scheduler_extension.c rename src/{photon/photon.h => local_scheduler/local_scheduler_shared.h} (95%) rename src/{photon/test/photon_tests.c => local_scheduler/test/local_scheduler_tests.c} (67%) rename src/{photon => local_scheduler}/test/run_tests.sh (91%) rename src/{photon => local_scheduler}/test/run_valgrind.sh (91%) delete mode 100644 src/photon/CMakeLists.txt delete mode 100644 src/photon/photon_extension.c diff --git a/.travis.yml b/.travis.yml index d6e3a5d87..853e83b74 100644 --- a/.travis.yml +++ b/.travis.yml @@ -41,12 +41,12 @@ matrix: - cd python/core - bash ../../src/common/test/run_valgrind.sh - bash ../../src/plasma/test/run_valgrind.sh - - bash ../../src/photon/test/run_valgrind.sh + - bash ../../src/local_scheduler/test/run_valgrind.sh - cd ../.. script: - python ./python/plasma/test/test.py valgrind - - python ./python/photon/test/test.py valgrind + - python ./python/local_scheduler/test/test.py valgrind - python ./python/global_scheduler/test/test.py valgrind install: @@ -58,7 +58,7 @@ install: - cd python/core - bash ../../src/common/test/run_tests.sh - bash ../../src/plasma/test/run_tests.sh - - bash ../../src/photon/test/run_tests.sh + - bash ../../src/local_scheduler/test/run_tests.sh - cd ../.. script: @@ -69,7 +69,7 @@ script: - python python/common/test/test.py - python python/common/redis_module/runtest.py - python python/plasma/test/test.py - - python python/photon/test/test.py + - python python/local_scheduler/test/test.py - python python/global_scheduler/test/test.py - python test/runtest.py diff --git a/CMakeLists.txt b/CMakeLists.txt index 6b69562aa..2c2bc9294 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -4,6 +4,6 @@ project(ray) add_subdirectory(${CMAKE_CURRENT_LIST_DIR}/src/common/) add_subdirectory(${CMAKE_CURRENT_LIST_DIR}/src/plasma/) -add_subdirectory(${CMAKE_CURRENT_LIST_DIR}/src/photon/) +add_subdirectory(${CMAKE_CURRENT_LIST_DIR}/src/local_scheduler/) add_subdirectory(${CMAKE_CURRENT_LIST_DIR}/src/global_scheduler/) add_subdirectory(${CMAKE_CURRENT_LIST_DIR}/src/numbuf/) diff --git a/python/common/test/test.py b/python/common/test/test.py index 14b76e556..8bc27daab 100644 --- a/python/common/test/test.py +++ b/python/common/test/test.py @@ -7,21 +7,21 @@ import pickle import sys import unittest -import photon +import local_scheduler ID_SIZE = 20 def random_object_id(): - return photon.ObjectID(np.random.bytes(ID_SIZE)) + return local_scheduler.ObjectID(np.random.bytes(ID_SIZE)) def random_function_id(): - return photon.ObjectID(np.random.bytes(ID_SIZE)) + return local_scheduler.ObjectID(np.random.bytes(ID_SIZE)) def random_driver_id(): - return photon.ObjectID(np.random.bytes(ID_SIZE)) + return local_scheduler.ObjectID(np.random.bytes(ID_SIZE)) def random_task_id(): - return photon.ObjectID(np.random.bytes(ID_SIZE)) + return local_scheduler.ObjectID(np.random.bytes(ID_SIZE)) BASE_SIMPLE_OBJECTS = [ 0, 1, 100000, 0.0, 0.5, 0.9, 100000.1, (), [], {}, @@ -65,9 +65,9 @@ class TestSerialization(unittest.TestCase): def test_serialize_by_value(self): for val in SIMPLE_OBJECTS: - self.assertTrue(photon.check_simple_value(val)) + self.assertTrue(local_scheduler.check_simple_value(val)) for val in COMPLEX_OBJECTS: - self.assertFalse(photon.check_simple_value(val)) + self.assertFalse(local_scheduler.check_simple_value(val)) class TestObjectID(unittest.TestCase): @@ -92,17 +92,17 @@ class TestObjectID(unittest.TestCase): self.assertRaises(Exception, lambda : pickling.dumps(h)) def test_equality_comparisons(self): - x1 = photon.ObjectID(ID_SIZE * b"a") - x2 = photon.ObjectID(ID_SIZE * b"a") - y1 = photon.ObjectID(ID_SIZE * b"b") - y2 = photon.ObjectID(ID_SIZE * b"b") + x1 = local_scheduler.ObjectID(ID_SIZE * b"a") + x2 = local_scheduler.ObjectID(ID_SIZE * b"a") + y1 = local_scheduler.ObjectID(ID_SIZE * b"b") + y2 = local_scheduler.ObjectID(ID_SIZE * b"b") self.assertEqual(x1, x2) self.assertEqual(y1, y2) self.assertNotEqual(x1, y1) random_strings = [np.random.bytes(ID_SIZE) for _ in range(256)] - object_ids1 = [photon.ObjectID(random_strings[i]) for i in range(256)] - object_ids2 = [photon.ObjectID(random_strings[i]) for i in range(256)] + object_ids1 = [local_scheduler.ObjectID(random_strings[i]) for i in range(256)] + object_ids2 = [local_scheduler.ObjectID(random_strings[i]) for i in range(256)] self.assertEqual(len(set(object_ids1)), 256) self.assertEqual(len(set(object_ids1 + object_ids2)), 256) self.assertEqual(set(object_ids1), set(object_ids2)) @@ -121,7 +121,7 @@ class TestTask(unittest.TestCase): self.assertEqual(num_return_vals, len(task.returns())) self.assertEqual(len(args), len(retrieved_args)) for i in range(len(retrieved_args)): - if isinstance(retrieved_args[i], photon.ObjectID): + if isinstance(retrieved_args[i], local_scheduler.ObjectID): self.assertEqual(retrieved_args[i].id(), args[i].id()) else: self.assertEqual(retrieved_args[i], args[i]) @@ -160,10 +160,10 @@ class TestTask(unittest.TestCase): ] for args in args_list: for num_return_vals in [0, 1, 2, 3, 5, 10, 100]: - task = photon.Task(driver_id, function_id, args, num_return_vals, parent_id, 0) + task = local_scheduler.Task(driver_id, function_id, args, num_return_vals, parent_id, 0) self.check_task(task, function_id, num_return_vals, args) - data = photon.task_to_string(task) - task2 = photon.task_from_string(data) + data = local_scheduler.task_to_string(task) + task2 = local_scheduler.task_from_string(data) self.check_task(task2, function_id, num_return_vals, args) if __name__ == "__main__": diff --git a/python/core/src/photon/__init__.py b/python/core/src/local_scheduler/__init__.py similarity index 100% rename from python/core/src/photon/__init__.py rename to python/core/src/local_scheduler/__init__.py diff --git a/python/global_scheduler/test/test.py b/python/global_scheduler/test/test.py index 6245abfb0..dac0064d9 100644 --- a/python/global_scheduler/test/test.py +++ b/python/global_scheduler/test/test.py @@ -14,7 +14,7 @@ import time import unittest import global_scheduler -import photon +import local_scheduler import plasma from plasma.utils import random_object_id, generate_metadata, write_to_data_buffer, create_object_with_id, create_object @@ -40,16 +40,16 @@ DB_CLIENT_PREFIX = "CL:" TASK_PREFIX = "TT:" def random_driver_id(): - return photon.ObjectID(np.random.bytes(ID_SIZE)) + return local_scheduler.ObjectID(np.random.bytes(ID_SIZE)) def random_task_id(): - return photon.ObjectID(np.random.bytes(ID_SIZE)) + return local_scheduler.ObjectID(np.random.bytes(ID_SIZE)) def random_function_id(): - return photon.ObjectID(np.random.bytes(ID_SIZE)) + return local_scheduler.ObjectID(np.random.bytes(ID_SIZE)) def random_object_id(): - return photon.ObjectID(np.random.bytes(ID_SIZE)) + return local_scheduler.ObjectID(np.random.bytes(ID_SIZE)) def new_port(): return random.randint(10000, 65535) @@ -57,7 +57,7 @@ def new_port(): class TestGlobalScheduler(unittest.TestCase): def setUp(self): - # Start one Redis server and N pairs of (plasma, photon) + # Start one Redis server and N pairs of (plasma, local_scheduler) node_ip_address = "127.0.0.1" redis_port, self.redis_process = services.start_redis(cleanup=False) redis_address = services.address(node_ip_address, redis_port) @@ -69,7 +69,7 @@ class TestGlobalScheduler(unittest.TestCase): self.plasma_manager_pids = [] self.local_scheduler_pids = [] self.plasma_clients = [] - self.photon_clients = [] + self.local_scheduler_clients = [] for i in range(NUM_CLUSTER_NODES): # Start the Plasma store. Plasma store name is randomly generated. @@ -83,15 +83,15 @@ class TestGlobalScheduler(unittest.TestCase): plasma_client = plasma.PlasmaClient(plasma_store_name, plasma_manager_name) self.plasma_clients.append(plasma_client) # Start the local scheduler. - local_scheduler_name, p4 = photon.start_local_scheduler( + local_scheduler_name, p4 = local_scheduler.start_local_scheduler( plasma_store_name, plasma_manager_name=plasma_manager_name, plasma_address=plasma_address, redis_address=redis_address, static_resource_list=[10, 0]) # Connect to the scheduler. - photon_client = photon.PhotonClient(local_scheduler_name, NIL_ACTOR_ID) - self.photon_clients.append(photon_client) + local_scheduler_client = local_scheduler.LocalSchedulerClient(local_scheduler_name, NIL_ACTOR_ID) + self.local_scheduler_clients.append(local_scheduler_client) self.local_scheduler_pids.append(p4) def tearDown(self): @@ -148,11 +148,11 @@ class TestGlobalScheduler(unittest.TestCase): return db_client_id def test_task_default_resources(self): - task1 = photon.Task(random_driver_id(), random_function_id(), [random_object_id()], 0, random_task_id(), 0) + task1 = local_scheduler.Task(random_driver_id(), random_function_id(), [random_object_id()], 0, random_task_id(), 0) self.assertEqual(task1.required_resources(), [1.0, 0.0]) - task2 = photon.Task(random_driver_id(), random_function_id(), - [random_object_id()], 0, random_task_id(), 0, - photon.ObjectID(NIL_ACTOR_ID), 0, [1.0, 2.0]) + task2 = local_scheduler.Task(random_driver_id(), random_function_id(), + [random_object_id()], 0, random_task_id(), 0, + local_scheduler.ObjectID(NIL_ACTOR_ID), 0, [1.0, 2.0]) self.assertEqual(task2.required_resources(), [1.0, 2.0]) def test_redis_only_single_task(self): @@ -161,7 +161,7 @@ class TestGlobalScheduler(unittest.TestCase): task state transitions in Redis only. TODO(atumanov): implement. """ # Check precondition for this test: - # There should be 2n+1 db clients: the global scheduler + one photon and one plasma per node. + # There should be 2n+1 db clients: the global scheduler + one local scheduler and one plasma per node. self.assertEqual(len(self.redis_client.keys("{}*".format(DB_CLIENT_PREFIX))), 2 * NUM_CLUSTER_NODES + 1) db_client_id = self.get_plasma_manager_id() @@ -182,11 +182,11 @@ class TestGlobalScheduler(unittest.TestCase): plasma_client = self.plasma_clients[0] object_dep, memory_buffer, metadata = create_object(plasma_client, data_size, metadata_size, seal=True) - # Sleep before submitting task to photon. + # Sleep before submitting task to local scheduler. time.sleep(0.1) # Submit a task to Redis. - task = photon.Task(random_driver_id(), random_function_id(), [photon.ObjectID(object_dep)], num_return_vals[0], random_task_id(), 0) - self.photon_clients[0].submit(task) + task = local_scheduler.Task(random_driver_id(), random_function_id(), [local_scheduler.ObjectID(object_dep)], num_return_vals[0], random_task_id(), 0) + self.local_scheduler_clients[0].submit(task) time.sleep(0.1) # There should now be a task in Redis, and it should get assigned to the # local scheduler @@ -231,8 +231,8 @@ class TestGlobalScheduler(unittest.TestCase): if timesync: # Give 10ms for object info handler to fire (long enough to yield CPU). time.sleep(0.010) - task = photon.Task(random_driver_id(), random_function_id(), [photon.ObjectID(object_dep)], num_return_vals[0], random_task_id(), 0) - self.photon_clients[0].submit(task) + task = local_scheduler.Task(random_driver_id(), random_function_id(), [local_scheduler.ObjectID(object_dep)], num_return_vals[0], random_task_id(), 0) + self.local_scheduler_clients[0].submit(task) # Check that there are the correct number of tasks in Redis and that they # all get assigned to the local scheduler. num_retries = 10 diff --git a/python/photon/__init__.py b/python/local_scheduler/__init__.py similarity index 50% rename from python/photon/__init__.py rename to python/local_scheduler/__init__.py index b2f2a1b11..4cd8b6d03 100644 --- a/python/photon/__init__.py +++ b/python/local_scheduler/__init__.py @@ -2,5 +2,5 @@ from __future__ import absolute_import from __future__ import division from __future__ import print_function -from core.src.photon.libphoton import * -from .photon_services import * +from core.src.local_scheduler.liblocal_scheduler_library import * +from .local_scheduler_services import * diff --git a/python/photon/build/.gitkeep b/python/local_scheduler/build/.gitkeep similarity index 100% rename from python/photon/build/.gitkeep rename to python/local_scheduler/build/.gitkeep diff --git a/python/photon/photon_services.py b/python/local_scheduler/local_scheduler_services.py similarity index 98% rename from python/photon/photon_services.py rename to python/local_scheduler/local_scheduler_services.py index 9e2a856e8..f338b22b5 100644 --- a/python/photon/photon_services.py +++ b/python/local_scheduler/local_scheduler_services.py @@ -60,7 +60,7 @@ def start_local_scheduler(plasma_store_name, raise Exception("If one of the plasma_manager_name and the redis_address is provided, then both must be provided.") if use_valgrind and use_profiler: raise Exception("Cannot use valgrind and profiler at the same time.") - local_scheduler_executable = os.path.join(os.path.dirname(os.path.abspath(__file__)), "../core/src/photon/photon_scheduler") + local_scheduler_executable = os.path.join(os.path.dirname(os.path.abspath(__file__)), "../core/src/local_scheduler/local_scheduler") local_scheduler_name = "/tmp/scheduler{}".format(random_name()) command = [local_scheduler_executable, "-s", local_scheduler_name, diff --git a/python/photon/test/test.py b/python/local_scheduler/test/test.py similarity index 79% rename from python/photon/test/test.py rename to python/local_scheduler/test/test.py index 47adfe24a..6ae557acf 100644 --- a/python/photon/test/test.py +++ b/python/local_scheduler/test/test.py @@ -12,7 +12,7 @@ import threading import time import unittest -import photon +import local_scheduler import plasma USE_VALGRIND = False @@ -21,27 +21,27 @@ ID_SIZE = 20 NIL_ACTOR_ID = 20 * b"\xff" def random_object_id(): - return photon.ObjectID(np.random.bytes(ID_SIZE)) + return local_scheduler.ObjectID(np.random.bytes(ID_SIZE)) def random_driver_id(): - return photon.ObjectID(np.random.bytes(ID_SIZE)) + return local_scheduler.ObjectID(np.random.bytes(ID_SIZE)) def random_task_id(): - return photon.ObjectID(np.random.bytes(ID_SIZE)) + return local_scheduler.ObjectID(np.random.bytes(ID_SIZE)) def random_function_id(): - return photon.ObjectID(np.random.bytes(ID_SIZE)) + return local_scheduler.ObjectID(np.random.bytes(ID_SIZE)) -class TestPhotonClient(unittest.TestCase): +class TestLocalSchedulerClient(unittest.TestCase): def setUp(self): # Start Plasma store. plasma_store_name, self.p1 = plasma.start_plasma_store() self.plasma_client = plasma.PlasmaClient(plasma_store_name) # Start a local scheduler. - scheduler_name, self.p2 = photon.start_local_scheduler(plasma_store_name, use_valgrind=USE_VALGRIND) + scheduler_name, self.p2 = local_scheduler.start_local_scheduler(plasma_store_name, use_valgrind=USE_VALGRIND) # Connect to the scheduler. - self.photon_client = photon.PhotonClient(scheduler_name, NIL_ACTOR_ID) + self.local_scheduler_client = local_scheduler.LocalSchedulerClient(scheduler_name, NIL_ACTOR_ID) def tearDown(self): # Check that the processes are still alive. @@ -99,18 +99,18 @@ class TestPhotonClient(unittest.TestCase): for args in args_list: for num_return_vals in [0, 1, 2, 3, 5, 10, 100]: - task = photon.Task(random_driver_id(), function_id, args, num_return_vals, random_task_id(), 0) + task = local_scheduler.Task(random_driver_id(), function_id, args, num_return_vals, random_task_id(), 0) # Submit a task. - self.photon_client.submit(task) + self.local_scheduler_client.submit(task) # Get the task. - new_task = self.photon_client.get_task() + new_task = self.local_scheduler_client.get_task() self.assertEqual(task.function_id().id(), new_task.function_id().id()) retrieved_args = new_task.arguments() returns = new_task.returns() self.assertEqual(len(args), len(retrieved_args)) self.assertEqual(num_return_vals, len(returns)) for i in range(len(retrieved_args)): - if isinstance(args[i], photon.ObjectID): + if isinstance(args[i], local_scheduler.ObjectID): self.assertEqual(args[i].id(), retrieved_args[i].id()) else: self.assertEqual(args[i], retrieved_args[i]) @@ -118,21 +118,21 @@ class TestPhotonClient(unittest.TestCase): # Submit all of the tasks. for args in args_list: for num_return_vals in [0, 1, 2, 3, 5, 10, 100]: - task = photon.Task(random_driver_id(), function_id, args, num_return_vals, random_task_id(), 0) - self.photon_client.submit(task) + task = local_scheduler.Task(random_driver_id(), function_id, args, num_return_vals, random_task_id(), 0) + self.local_scheduler_client.submit(task) # Get all of the tasks. for args in args_list: for num_return_vals in [0, 1, 2, 3, 5, 10, 100]: - new_task = self.photon_client.get_task() + new_task = self.local_scheduler_client.get_task() def test_scheduling_when_objects_ready(self): # Create a task and submit it. object_id = random_object_id() - task = photon.Task(random_driver_id(), random_function_id(), [object_id], 0, random_task_id(), 0) - self.photon_client.submit(task) + task = local_scheduler.Task(random_driver_id(), random_function_id(), [object_id], 0, random_task_id(), 0) + self.local_scheduler_client.submit(task) # Launch a thread to get the task. def get_task(): - self.photon_client.get_task() + self.local_scheduler_client.get_task() t = threading.Thread(target=get_task) t.start() # Sleep to give the thread time to call get_task. @@ -148,12 +148,12 @@ class TestPhotonClient(unittest.TestCase): # Create a task with two dependencies and submit it. object_id1 = random_object_id() object_id2 = random_object_id() - task = photon.Task(random_driver_id(), random_function_id(), [object_id1, object_id2], 0, random_task_id(), 0) - self.photon_client.submit(task) + task = local_scheduler.Task(random_driver_id(), random_function_id(), [object_id1, object_id2], 0, random_task_id(), 0) + self.local_scheduler_client.submit(task) # Launch a thread to get the task. def get_task(): - self.photon_client.get_task() + self.local_scheduler_client.get_task() t = threading.Thread(target=get_task) t.start() diff --git a/python/ray/actor.py b/python/ray/actor.py index 796650383..1088ce7e2 100644 --- a/python/ray/actor.py +++ b/python/ray/actor.py @@ -6,7 +6,7 @@ import hashlib import inspect import json import numpy as np -import photon +import local_scheduler import random import traceback @@ -30,7 +30,7 @@ def random_string(): return np.random.bytes(20) def random_actor_id(): - return photon.ObjectID(random_string()) + return local_scheduler.ObjectID(random_string()) def get_actor_method_function_id(attr): """Get the function ID corresponding to an actor method. @@ -45,13 +45,13 @@ def get_actor_method_function_id(attr): function_id_hash.update(attr.encode("ascii")) function_id = function_id_hash.digest() assert len(function_id) == 20 - return photon.ObjectID(function_id) + return local_scheduler.ObjectID(function_id) def fetch_and_register_actor(key, worker): """Import an actor.""" driver_id, actor_id_str, actor_name, module, pickled_class, assigned_gpu_ids, actor_method_names = \ worker.redis_client.hmget(key, ["driver_id", "actor_id", "name", "module", "class", "gpu_ids", "actor_method_names"]) - actor_id = photon.ObjectID(actor_id_str) + actor_id = local_scheduler.ObjectID(actor_id_str) actor_name = actor_name.decode("ascii") module = module.decode("ascii") actor_method_names = json.loads(actor_method_names.decode("ascii")) diff --git a/python/ray/experimental/state.py b/python/ray/experimental/state.py index 05ce00b59..1059e13fc 100644 --- a/python/ray/experimental/state.py +++ b/python/ray/experimental/state.py @@ -6,6 +6,6 @@ def get_local_schedulers(worker): local_schedulers = [] for client in worker.redis_client.keys("CL:*"): client_info = worker.redis_client.hgetall(client) - if client_info[b"client_type"] == b"photon": + if client_info[b"client_type"] == b"local_scheduler": local_schedulers.append(client_info) return local_schedulers diff --git a/python/ray/services.py b/python/ray/services.py index b66262e8a..5d3ad1ad4 100644 --- a/python/ray/services.py +++ b/python/ray/services.py @@ -17,7 +17,7 @@ import time import threading # Ray modules -import photon +import local_scheduler import plasma import global_scheduler @@ -43,7 +43,7 @@ all_processes = OrderedDict([(PROCESS_TYPE_WORKER, []), (PROCESS_TYPE_WEB_UI, [])]) # True if processes are run in the valgrind profiler. -RUN_PHOTON_PROFILER = False +RUN_LOCAL_SCHEDULER_PROFILER = False RUN_PLASMA_MANAGER_PROFILER = False RUN_PLASMA_STORE_PROFILER = False @@ -90,7 +90,7 @@ def kill_process(p): """ if p.poll() is not None: # process has already terminated return True - if RUN_PHOTON_PROFILER or RUN_PLASMA_MANAGER_PROFILER or RUN_PLASMA_STORE_PROFILER: + if RUN_LOCAL_SCHEDULER_PROFILER or RUN_PLASMA_MANAGER_PROFILER or RUN_PLASMA_STORE_PROFILER: os.kill(p.pid, signal.SIGINT) # Give process signal to write profiler data. time.sleep(0.1) # Wait for profiling data to be written. @@ -415,17 +415,18 @@ def start_local_scheduler(redis_address, if num_gpus is None: # By default, assume this node has no GPUs. num_gpus = 0 - local_scheduler_name, p = photon.start_local_scheduler(plasma_store_name, - plasma_manager_name, - worker_path=worker_path, - node_ip_address=node_ip_address, - redis_address=redis_address, - plasma_address=plasma_address, - use_profiler=RUN_PHOTON_PROFILER, - stdout_file=stdout_file, - stderr_file=stderr_file, - static_resource_list=[num_cpus, num_gpus], - num_workers=num_workers) + local_scheduler_name, p = local_scheduler.start_local_scheduler( + plasma_store_name, + plasma_manager_name, + worker_path=worker_path, + node_ip_address=node_ip_address, + redis_address=redis_address, + plasma_address=plasma_address, + use_profiler=RUN_LOCAL_SCHEDULER_PROFILER, + stdout_file=stdout_file, + stderr_file=stderr_file, + static_resource_list=[num_cpus, num_gpus], + num_workers=num_workers) if cleanup: all_processes[PROCESS_TYPE_LOCAL_SCHEDULER].append(p) return local_scheduler_name diff --git a/python/ray/worker.py b/python/ray/worker.py index 9823ba0cd..f43ae56bf 100644 --- a/python/ray/worker.py +++ b/python/ray/worker.py @@ -26,7 +26,7 @@ import ray.pickling as pickling import ray.serialization as serialization import ray.services as services import numbuf -import photon +import local_scheduler import plasma SCRIPT_MODE = 0 @@ -53,7 +53,7 @@ def random_string(): return np.random.bytes(20) def random_object_id(): - return photon.ObjectID(random_string()) + return local_scheduler.ObjectID(random_string()) class FunctionID(object): def __init__(self, function_id): @@ -478,7 +478,7 @@ class Worker(object): # until GET_TIMEOUT_MILLISECONDS milliseconds passes, then repeat. while len(unready_ids) > 0: for unready_id in unready_ids: - self.photon_client.reconstruct_object(unready_id) + self.local_scheduler_client.reconstruct_object(unready_id) # Do another fetch for objects that aren't available locally yet, in case # they were evicted since the last fetch. self.plasma_client.fetch(list(unready_ids.keys())) @@ -496,7 +496,7 @@ class Worker(object): # If there were objects that we weren't able to get locally, let the local # scheduler know that we're now unblocked. if was_blocked: - self.photon_client.notify_unblocked() + self.local_scheduler_client.notify_unblocked() # Unwrap the object from the list (it was wrapped put_object). assert len(final_results) == len(object_ids) @@ -504,7 +504,7 @@ class Worker(object): assert final_results[i][0] == object_ids[i].id() return [result[1][0] for result in final_results] - def submit_task(self, function_id, func_name, args, actor_id=photon.ObjectID(NIL_ACTOR_ID)): + def submit_task(self, function_id, func_name, args, actor_id=local_scheduler.ObjectID(NIL_ACTOR_ID)): """Submit a remote task to the scheduler. Tell the scheduler to schedule the execution of the function with name @@ -521,32 +521,33 @@ class Worker(object): check_main_thread() # Put large or complex arguments that are passed by value in the object # store first. - args_for_photon = [] + args_for_local_scheduler = [] for arg in args: - if isinstance(arg, photon.ObjectID): - args_for_photon.append(arg) - elif photon.check_simple_value(arg): - args_for_photon.append(arg) + if isinstance(arg, local_scheduler.ObjectID): + args_for_local_scheduler.append(arg) + elif local_scheduler.check_simple_value(arg): + args_for_local_scheduler.append(arg) else: - args_for_photon.append(put(arg)) + args_for_local_scheduler.append(put(arg)) # Look up the various function properties. num_return_vals, num_cpus, num_gpus = self.function_properties[self.task_driver_id.id()][function_id.id()] - # Submit the task to Photon. - task = photon.Task(self.task_driver_id, - photon.ObjectID(function_id.id()), - args_for_photon, - num_return_vals, - self.current_task_id, - self.task_index, - actor_id, self.actor_counters[actor_id], - [num_cpus, num_gpus]) + # Submit the task to local scheduler. + task = local_scheduler.Task( + self.task_driver_id, + local_scheduler.ObjectID(function_id.id()), + args_for_local_scheduler, + num_return_vals, + self.current_task_id, + self.task_index, + actor_id, self.actor_counters[actor_id], + [num_cpus, num_gpus]) # Increment the worker's task index to track how many tasks have been # submitted by the current task so far. self.task_index += 1 self.actor_counters[actor_id] += 1 - self.photon_client.submit(task) + self.local_scheduler_client.submit(task) return task.returns() @@ -691,8 +692,8 @@ def initialize_numbuf(worker=global_worker): contained_objectids.append(obj) return obj.id() def objectid_custom_deserializer(serialized_obj): - return photon.ObjectID(serialized_obj) - serialization.add_class_to_whitelist(photon.ObjectID, pickle=False, custom_serializer=objectid_custom_serializer, custom_deserializer=objectid_custom_deserializer) + return local_scheduler.ObjectID(serialized_obj) + serialization.add_class_to_whitelist(local_scheduler.ObjectID, pickle=False, custom_serializer=objectid_custom_serializer, custom_deserializer=objectid_custom_deserializer) if worker.mode in [SCRIPT_MODE, SILENT_MODE]: # These should only be called on the driver because register_class will @@ -721,7 +722,7 @@ def get_address_info_from_redis_helper(redis_address, node_ip_address): if info[b"node_ip_address"].decode("ascii") == node_ip_address: if info[b"client_type"].decode("ascii") == "plasma_manager": plasma_managers.append(info) - elif info[b"client_type"].decode("ascii") == "photon": + elif info[b"client_type"].decode("ascii") == "local_scheduler": local_schedulers.append(info) # Make sure that we got at one plasma manager and local scheduler. assert len(plasma_managers) >= 1 @@ -945,8 +946,8 @@ def cleanup(worker=global_worker): clusters in the tests, but the import and exit only happen once. """ disconnect(worker) - if hasattr(worker, "photon_client"): - del worker.photon_client + if hasattr(worker, "local_scheduler_client"): + del worker.local_scheduler_client if hasattr(worker, "plasma_client"): worker.plasma_client.shutdown() @@ -1040,7 +1041,7 @@ def fetch_and_register_remote_function(key, worker=global_worker): "module", "num_cpus", "num_gpus"]) - function_id = photon.ObjectID(function_id_str) + function_id = local_scheduler.ObjectID(function_id_str) function_name = function_name.decode("ascii") num_return_vals = int(num_return_vals) num_cpus = int(num_cpus) @@ -1208,7 +1209,7 @@ def connect(info, object_id_seed=None, mode=WORKER_MODE, worker=global_worker, a # Create an object store client. worker.plasma_client = plasma.PlasmaClient(info["store_socket_name"], info["manager_socket_name"]) # Create the local scheduler client. - worker.photon_client = photon.PhotonClient(info["local_scheduler_socket_name"], worker.actor_id) + worker.local_scheduler_client = local_scheduler.LocalSchedulerClient(info["local_scheduler_socket_name"], worker.actor_id) # Register the worker with Redis. if mode in [SCRIPT_MODE, SILENT_MODE]: # The concept of a driver is the same as the concept of a "job". Register @@ -1244,12 +1245,12 @@ def connect(info, object_id_seed=None, mode=WORKER_MODE, worker=global_worker, a else: # Try to use true randomness. np.random.seed(None) - worker.current_task_id = photon.ObjectID(np.random.bytes(20)) + worker.current_task_id = local_scheduler.ObjectID(np.random.bytes(20)) # When tasks are executed on remote workers in the context of multiple # drivers, the task driver ID is used to keep track of which driver is # responsible for the task so that error messages will be propagated to the # correct driver. - worker.task_driver_id = photon.ObjectID(worker.worker_id) + worker.task_driver_id = local_scheduler.ObjectID(worker.worker_id) # Reset the state of the numpy random number generator. np.random.set_state(numpy_state) # Set other fields needed for computing task IDs. @@ -1411,7 +1412,7 @@ def flush_log(worker=global_worker): """Send the logged worker events to the global state store.""" event_log_key = b"event_log:" + worker.worker_id + b":" + worker.current_task_id.id() event_log_value = json.dumps(worker.events) - worker.photon_client.log_event(event_log_key, event_log_value) + worker.local_scheduler_client.log_event(event_log_key, event_log_value) worker.events = [] def get(object_ids, worker=global_worker): @@ -1466,7 +1467,7 @@ def put(value, worker=global_worker): if worker.mode == PYTHON_MODE: # In PYTHON_MODE, ray.put is the identity operation return value - object_id = photon.compute_put_id(worker.current_task_id, worker.put_index) + object_id = local_scheduler.compute_put_id(worker.current_task_id, worker.put_index) worker.put_object(object_id, value) worker.put_index += 1 return object_id @@ -1499,8 +1500,8 @@ def wait(object_ids, num_returns=1, timeout=None, worker=global_worker): object_id_strs = [object_id.id() for object_id in object_ids] timeout = timeout if timeout is not None else 2 ** 30 ready_ids, remaining_ids = worker.plasma_client.wait(object_id_strs, timeout, num_returns) - ready_ids = [photon.ObjectID(object_id) for object_id in ready_ids] - remaining_ids = [photon.ObjectID(object_id) for object_id in remaining_ids] + ready_ids = [local_scheduler.ObjectID(object_id) for object_id in ready_ids] + remaining_ids = [local_scheduler.ObjectID(object_id) for object_id in remaining_ids] return ready_ids, remaining_ids def wait_for_function(function_id, driver_id, timeout=5, worker=global_worker): @@ -1660,7 +1661,7 @@ def main_loop(worker=global_worker): check_main_thread() while True: with log_span("ray:get_task", worker=worker): - task = worker.photon_client.get_task() + task = worker.local_scheduler_client.get_task() function_id = task.function_id() # Wait until the function to be executed has actually been registered on @@ -1927,7 +1928,7 @@ def get_arguments_for_execution(function_name, serialized_args, worker=global_wo """ arguments = [] for (i, arg) in enumerate(serialized_args): - if isinstance(arg, photon.ObjectID): + if isinstance(arg, local_scheduler.ObjectID): # get the object from the local object store argument = worker.get_object([arg])[0] if isinstance(argument, RayTaskError): @@ -1961,7 +1962,7 @@ def store_outputs_in_objstore(objectids, outputs, worker=global_worker): function. """ for i in range(len(objectids)): - if isinstance(outputs[i], photon.ObjectID): + if isinstance(outputs[i], local_scheduler.ObjectID): raise Exception("This remote function returned an ObjectID as its {}th return value. This is not allowed.".format(i)) for i in range(len(objectids)): worker.put_object(objectids[i], outputs[i]) diff --git a/python/setup.py b/python/setup.py index 42640d441..0d746cb73 100644 --- a/python/setup.py +++ b/python/setup.py @@ -24,8 +24,8 @@ setup(name="ray", "src/plasma/plasma_store", "src/plasma/plasma_manager", "src/plasma/libplasma.so", - "src/photon/photon_scheduler", - "src/photon/libphoton.so", + "src/local_scheduler/local_scheduler", + "src/local_scheduler/liblocal_scheduler_library.so", "src/numbuf/libarrow.so", "src/numbuf/libnumbuf.so", "src/global_scheduler/global_scheduler"]}, diff --git a/scripts/stop_ray.sh b/scripts/stop_ray.sh index 4ef16d6dc..59765829c 100755 --- a/scripts/stop_ray.sh +++ b/scripts/stop_ray.sh @@ -1,6 +1,6 @@ #!/usr/bin/env bash -killall global_scheduler plasma_store plasma_manager photon_scheduler +killall global_scheduler plasma_store plasma_manager local_scheduler # Find the PID of the Redis process and kill it. kill $(ps aux | grep redis-server | awk '{ print $2 }') 2> /dev/null diff --git a/src/common/CMakeLists.txt b/src/common/CMakeLists.txt index 67b17f974..f6a27f653 100644 --- a/src/common/CMakeLists.txt +++ b/src/common/CMakeLists.txt @@ -36,7 +36,7 @@ function(define_test test_name library) add_executable(${test_name} test/${test_name}.c ${ARGN}) add_dependencies(${test_name} hiredis flatcc) target_link_libraries(${test_name} common ${FLATBUFFERS_STATIC_LIB} ${library}) - target_compile_options(${test_name} PUBLIC "-DPLASMA_TEST -DPHOTON_TEST -DCOMMON_TEST -DRAY_COMMON_LOG_LEVEL=4 -DRAY_TIMEOUT=50") + target_compile_options(${test_name} PUBLIC "-DPLASMA_TEST -DLOCAL_SCHEDULER_TEST -DCOMMON_TEST -DRAY_COMMON_LOG_LEVEL=4 -DRAY_TIMEOUT=50") endfunction() define_test(common_tests "") diff --git a/src/common/lib/python/common_extension.c b/src/common/lib/python/common_extension.c index 4d1963c5f..94368d32d 100644 --- a/src/common/lib/python/common_extension.c +++ b/src/common/lib/python/common_extension.c @@ -83,7 +83,7 @@ PyObject *PyObjectID_make(ObjectID object_id) { * * This is called from Python like * - * task = photon.task_from_string("...") + * task = local_scheduler.task_from_string("...") * * @param task_string String representation of the task specification. * @return Python task specification object. @@ -112,7 +112,7 @@ PyObject *PyTask_from_string(PyObject *self, PyObject *args) { * * This is called from Python like * - * s = photon.task_to_string(task) + * s = local_scheduler.task_to_string(task) * * @param task Ray task specification Python object. * @return String representing the task specification. diff --git a/src/global_scheduler/global_scheduler.c b/src/global_scheduler/global_scheduler.c index f2bb8d3ec..3a7378f25 100644 --- a/src/global_scheduler/global_scheduler.c +++ b/src/global_scheduler/global_scheduler.c @@ -83,17 +83,21 @@ void GlobalSchedulerState_free(GlobalSchedulerState *state) { db_disconnect(state->db); utarray_free(state->local_schedulers); GlobalSchedulerPolicyState_free(state->policy_state); - /* Delete the plasma to photon association map. */ - HASH_ITER(plasma_photon_hh, state->plasma_photon_map, entry, tmp) { - HASH_DELETE(plasma_photon_hh, state->plasma_photon_map, entry); - /* The hash entry is shared with the photon_plasma hashmap and will be freed - * there. */ + /* Delete the plasma to local scheduler association map. */ + HASH_ITER(plasma_local_scheduler_hh, state->plasma_local_scheduler_map, entry, + tmp) { + HASH_DELETE(plasma_local_scheduler_hh, state->plasma_local_scheduler_map, + entry); + /* The hash entry is shared with the local_scheduler_plasma hashmap and will + * be freed there. */ free(entry->aux_address); } - /* Delete the photon to plasma association map. */ - HASH_ITER(photon_plasma_hh, state->photon_plasma_map, entry, tmp) { - HASH_DELETE(photon_plasma_hh, state->photon_plasma_map, entry); + /* Delete the local scheduler to plasma association map. */ + HASH_ITER(local_scheduler_plasma_hh, state->local_scheduler_plasma_map, entry, + tmp) { + HASH_DELETE(local_scheduler_plasma_hh, state->local_scheduler_plasma_map, + entry); /* Now free the shared hash entry -- no longer needed. */ free(entry); } @@ -135,13 +139,13 @@ void signal_handler(int signal) { /* End of the cleanup code. */ LocalScheduler *get_local_scheduler(GlobalSchedulerState *state, - DBClientID photon_id) { + DBClientID local_scheduler_id) { LocalScheduler *local_scheduler_ptr; for (int i = 0; i < utarray_len(state->local_schedulers); ++i) { local_scheduler_ptr = (LocalScheduler *) utarray_eltptr(state->local_schedulers, i); - if (DBClientID_equal(local_scheduler_ptr->id, photon_id)) { - LOG_DEBUG("photon_id matched cached local scheduler entry."); + if (DBClientID_equal(local_scheduler_ptr->id, local_scheduler_id)) { + LOG_DEBUG("local_scheduler_id matched cached local scheduler entry."); return local_scheduler_ptr; } } @@ -176,32 +180,36 @@ void process_new_db_client(DBClientID db_client_id, LOG_DEBUG("db client table callback for db client = %s", ObjectID_to_string(db_client_id, id_string, ID_STRING_SIZE)); UNUSED(id_string); - if (strncmp(client_type, "photon", strlen("photon")) == 0) { - /* Add plasma_manager ip:port -> photon_db_client_id association to state. - */ - AuxAddressEntry *plasma_photon_entry = calloc(1, sizeof(AuxAddressEntry)); - plasma_photon_entry->aux_address = strdup(aux_address); - plasma_photon_entry->photon_db_client_id = db_client_id; - HASH_ADD_KEYPTR(plasma_photon_hh, state->plasma_photon_map, - plasma_photon_entry->aux_address, - strlen(plasma_photon_entry->aux_address), - plasma_photon_entry); + if (strncmp(client_type, "local_scheduler", strlen("local_scheduler")) == 0) { + /* Add plasma_manager ip:port -> local_scheduler_db_client_id association to + * state. */ + AuxAddressEntry *plasma_local_scheduler_entry = + calloc(1, sizeof(AuxAddressEntry)); + plasma_local_scheduler_entry->aux_address = strdup(aux_address); + plasma_local_scheduler_entry->local_scheduler_db_client_id = db_client_id; + HASH_ADD_KEYPTR(plasma_local_scheduler_hh, + state->plasma_local_scheduler_map, + plasma_local_scheduler_entry->aux_address, + strlen(plasma_local_scheduler_entry->aux_address), + plasma_local_scheduler_entry); - /* Add photon_db_client_id -> plasma_manager ip:port association to state. - */ - HASH_ADD(photon_plasma_hh, state->photon_plasma_map, photon_db_client_id, - sizeof(plasma_photon_entry->photon_db_client_id), - plasma_photon_entry); + /* Add local_scheduler_db_client_id -> plasma_manager ip:port association to + * state. */ + HASH_ADD(local_scheduler_plasma_hh, state->local_scheduler_plasma_map, + local_scheduler_db_client_id, + sizeof(plasma_local_scheduler_entry->local_scheduler_db_client_id), + plasma_local_scheduler_entry); #if (RAY_COMMON_LOG_LEVEL <= RAY_COMMON_DEBUG) { - /* Print the photon to plasma association map so far. */ + /* Print the local scheduler to plasma association map so far. */ AuxAddressEntry *entry, *tmp; - LOG_DEBUG("Photon to Plasma hash map so far:"); - HASH_ITER(plasma_photon_hh, state->plasma_photon_map, entry, tmp) { + LOG_DEBUG("Local scheduler to plasma hash map so far:"); + HASH_ITER(plasma_local_scheduler_hh, state->plasma_local_scheduler_map, + entry, tmp) { LOG_DEBUG("%s -> %s", entry->aux_address, - ObjectID_to_string(entry->photon_db_client_id, id_string, - ID_STRING_SIZE)); + ObjectID_to_string(entry->local_scheduler_db_client_id, + id_string, ID_STRING_SIZE)); } } #endif diff --git a/src/global_scheduler/global_scheduler.h b/src/global_scheduler/global_scheduler.h index 04a25aff6..6bd0cc2bc 100644 --- a/src/global_scheduler/global_scheduler.h +++ b/src/global_scheduler/global_scheduler.h @@ -44,17 +44,17 @@ typedef struct { } SchedulerObjectInfo; /** - * A struct used for caching Photon to Plasma association. + * A struct used for caching local scheduler to Plasma association. */ typedef struct { /** IP:port string for the plasma_manager. */ char *aux_address; - /** Photon db client id. */ - DBClientID photon_db_client_id; - /** Plasma_manager ip:port -> photon_db_client_id. */ - UT_hash_handle plasma_photon_hh; - /** Photon_db_client_id -> plasma_manager ip:port. */ - UT_hash_handle photon_plasma_hh; + /** Local scheduler db client id. */ + DBClientID local_scheduler_db_client_id; + /** Plasma_manager ip:port -> local_scheduler_db_client_id. */ + UT_hash_handle plasma_local_scheduler_hh; + /** local_scheduler_db_client_id -> plasma_manager ip:port. */ + UT_hash_handle local_scheduler_plasma_hh; } AuxAddressEntry; /** @@ -71,10 +71,10 @@ typedef struct { UT_array *local_schedulers; /** The state managed by the scheduling policy. */ GlobalSchedulerPolicyState *policy_state; - /** The plasma_manager ip:port -> photon_db_client_id association. */ - AuxAddressEntry *plasma_photon_map; - /** The photon_db_client_id -> plasma_manager ip:port association. */ - AuxAddressEntry *photon_plasma_map; + /** The plasma_manager ip:port -> local_scheduler_db_client_id association. */ + AuxAddressEntry *plasma_local_scheduler_map; + /** The local_scheduler_db_client_id -> plasma_manager ip:port association. */ + AuxAddressEntry *local_scheduler_plasma_map; /** Objects cached by this global scheduler instance. */ SchedulerObjectInfo *scheduler_object_info_table; /** An array of tasks that haven't been scheduled yet. */ @@ -83,15 +83,15 @@ typedef struct { /** * This is a helper method to look up the local scheduler struct that - * corresponds to a particular photon_id. + * corresponds to a particular local_scheduler_id. * * @param state The state of the global scheduler. - * @param The photon_id of the local scheduler. + * @param The local_scheduler_id of the local scheduler. * @return The corresponding local scheduler struct. If the global scheduler is * not aware of the local scheduler, then this will be NULL. */ LocalScheduler *get_local_scheduler(GlobalSchedulerState *state, - DBClientID photon_id); + DBClientID local_scheduler_id); /** * Assign the given task to the local scheduler, update Redis and scheduler data diff --git a/src/global_scheduler/global_scheduler_algorithm.c b/src/global_scheduler/global_scheduler_algorithm.c index 2b06a3f2b..3ddccc1db 100644 --- a/src/global_scheduler/global_scheduler_algorithm.c +++ b/src/global_scheduler/global_scheduler_algorithm.c @@ -154,42 +154,48 @@ void free_object_size_hashmap(ObjectSizeEntry *object_size_table) { } } -DBClientID get_photon_id(GlobalSchedulerState *state, - const char *plasma_location) { +DBClientID get_local_scheduler_id(GlobalSchedulerState *state, + const char *plasma_location) { AuxAddressEntry *aux_entry = NULL; - DBClientID photon_id = NIL_ID; + DBClientID local_scheduler_id = NIL_ID; if (plasma_location != NULL) { LOG_DEBUG("max object size location found : %s", plasma_location); - /* Lookup association of plasma location to photon. */ - HASH_FIND(plasma_photon_hh, state->plasma_photon_map, plasma_location, - uthash_strlen(plasma_location), aux_entry); + /* Lookup association of plasma location to local scheduler. */ + HASH_FIND(plasma_local_scheduler_hh, state->plasma_local_scheduler_map, + plasma_location, uthash_strlen(plasma_location), aux_entry); if (aux_entry) { - LOG_DEBUG("found photon db client association for plasma ip:port = %s", - aux_entry->aux_address); - /* Plasma to photon db client ID association found, get photon ID. */ - photon_id = aux_entry->photon_db_client_id; + LOG_DEBUG( + "found local scheduler db client association for plasma ip:port = %s", + aux_entry->aux_address); + /* Plasma to local scheduler db client ID association found, get local + * scheduler ID. */ + local_scheduler_id = aux_entry->local_scheduler_db_client_id; } else { - LOG_ERROR("photon db client association not found for plasma ip:port=%s", - plasma_location); + LOG_ERROR( + "local scheduler db client association not found for plasma " + "ip:port=%s", + plasma_location); } } char id_string[ID_STRING_SIZE]; - LOG_DEBUG("photon ID found = %s", - ObjectID_to_string(photon_id, id_string, ID_STRING_SIZE)); + LOG_DEBUG("local scheduler ID found = %s", + ObjectID_to_string(local_scheduler_id, id_string, ID_STRING_SIZE)); UNUSED(id_string); - if (IS_NIL_ID(photon_id)) { - return photon_id; + if (IS_NIL_ID(local_scheduler_id)) { + return local_scheduler_id; } - /* Check to make sure this photon_db_client_id matches one of the + /* Check to make sure this local_scheduler_db_client_id matches one of the * schedulers. */ - LocalScheduler *local_scheduler_ptr = get_local_scheduler(state, photon_id); + LocalScheduler *local_scheduler_ptr = + get_local_scheduler(state, local_scheduler_id); if (local_scheduler_ptr == NULL) { - LOG_WARN("photon_id didn't match any cached local scheduler entries"); + LOG_WARN( + "local_scheduler_id didn't match any cached local scheduler entries"); } - return photon_id; + return local_scheduler_id; } double inner_product(double a[], double b[], int size) { @@ -210,22 +216,25 @@ double calculate_object_size_fraction(GlobalSchedulerState *state, double object_size_fraction = 0; if (total_task_object_size > 0) { /* Does this node contribute anything to this task object size? */ - /* Lookup scheduler->id in photon_plasma_map to get plasma aux address, - * which is used as the key for object_size_table. - * This uses the plasma aux address to locate the object_size this node - * contributes. */ - AuxAddressEntry *photon_plasma_pair = NULL; - HASH_FIND(photon_plasma_hh, state->photon_plasma_map, &(scheduler->id), - sizeof(scheduler->id), photon_plasma_pair); - if (photon_plasma_pair != NULL) { + /* Lookup scheduler->id in local_scheduler_plasma_map to get plasma aux + * address, which is used as the key for object_size_table. This uses the + * plasma aux address to locate the object_size this node contributes. */ + AuxAddressEntry *local_scheduler_plasma_pair = NULL; + HASH_FIND(local_scheduler_plasma_hh, state->local_scheduler_plasma_map, + &(scheduler->id), sizeof(scheduler->id), + local_scheduler_plasma_pair); + if (local_scheduler_plasma_pair != NULL) { ObjectSizeEntry *s = NULL; - /* Found this node's photon to plasma mapping. Use the corresponding - * plasma key to see if this node has any cached objects for this task. */ - HASH_FIND_STR(object_size_table, photon_plasma_pair->aux_address, s); + /* Found this node's local scheduler to plasma mapping. Use the + * corresponding plasma key to see if this node has any cached objects for + * this task. */ + HASH_FIND_STR(object_size_table, local_scheduler_plasma_pair->aux_address, + s); if (s != NULL) { /* This node has some of this task's objects. Calculate what fraction. */ - CHECK(strcmp(s->object_location, photon_plasma_pair->aux_address) == 0); + CHECK(strcmp(s->object_location, + local_scheduler_plasma_pair->aux_address) == 0); object_size_fraction = MIN(1, (double) (s->total_object_size) / total_task_object_size); } @@ -286,9 +295,10 @@ bool handle_task_waiting(GlobalSchedulerState *state, /* Go through all the nodes, calculate the score for each, pick max score. */ LocalScheduler *scheduler = NULL; - double best_photon_score = INT32_MIN; - CHECKM(best_photon_score < 0, "We might have a floating point underflow"); - DBClientID best_photon_id = NIL_ID; /* best node to send this task */ + double best_local_scheduler_score = INT32_MIN; + CHECKM(best_local_scheduler_score < 0, + "We might have a floating point underflow"); + DBClientID best_local_scheduler_id = NIL_ID; /* best node to send this task */ for (scheduler = (LocalScheduler *) utarray_front(state->local_schedulers); scheduler != NULL; scheduler = (LocalScheduler *) utarray_next( state->local_schedulers, scheduler)) { @@ -300,9 +310,9 @@ bool handle_task_waiting(GlobalSchedulerState *state, task_feasible = true; /* This node satisfies the hard capacity constraint. Calculate its score. */ double score = -1 * calculate_cost_pending(state, scheduler); - if (score > best_photon_score) { - best_photon_score = score; - best_photon_id = scheduler->id; + if (score > best_local_scheduler_score) { + best_local_scheduler_score = score; + best_local_scheduler_id = scheduler->id; } } /* For each local scheduler. */ @@ -317,10 +327,10 @@ bool handle_task_waiting(GlobalSchedulerState *state, * cache the task in case new local schedulers satisfy it in the future. */ return false; } - CHECKM(!IS_NIL_ID(best_photon_id), + CHECKM(!IS_NIL_ID(best_local_scheduler_id), "Task is feasible, but doesn't have a local scheduler assigned."); /* A local scheduler ID was found, so assign the task. */ - assign_task_to_local_scheduler(state, task, best_photon_id); + assign_task_to_local_scheduler(state, task, best_local_scheduler_id); return true; } diff --git a/src/local_scheduler/CMakeLists.txt b/src/local_scheduler/CMakeLists.txt new file mode 100644 index 000000000..22c066077 --- /dev/null +++ b/src/local_scheduler/CMakeLists.txt @@ -0,0 +1,45 @@ +cmake_minimum_required(VERSION 2.8) + +project(local_scheduler) + +# Recursively include common +include(${CMAKE_CURRENT_LIST_DIR}/../common/cmake/Common.cmake) + +if(APPLE) + SET(CMAKE_SHARED_LIBRARY_SUFFIX ".so") +endif(APPLE) + +include_directories("${PYTHON_INCLUDE_DIRS}") + +# set(CMAKE_C_FLAGS "${CMAKE_CXX_FLAGS} --std=c99 -Werror") + +if(UNIX AND NOT APPLE) + link_libraries(rt) +endif() + +include_directories("${CMAKE_CURRENT_LIST_DIR}/") +include_directories("${CMAKE_CURRENT_LIST_DIR}/../") +include_directories("${CMAKE_CURRENT_LIST_DIR}/../plasma/") + +add_library(local_scheduler_library SHARED + local_scheduler_extension.c + ../common/lib/python/common_extension.c) + +get_filename_component(PYTHON_SHARED_LIBRARY ${PYTHON_LIBRARIES} NAME) +if(APPLE) + add_custom_command(TARGET local_scheduler_library + POST_BUILD COMMAND ${CMAKE_INSTALL_NAME_TOOL} -change ${PYTHON_SHARED_LIBRARY} ${PYTHON_LIBRARIES} liblocal_scheduler_library.so) +endif(APPLE) + +add_library(local_scheduler_client STATIC local_scheduler_client.c) + +target_link_libraries(local_scheduler_library local_scheduler_client ${COMMON_LIB} ${PYTHON_LIBRARIES}) + +add_executable(local_scheduler local_scheduler.c local_scheduler_algorithm.c) +target_link_libraries(local_scheduler local_scheduler_client common ${HIREDIS_LIB} plasma_lib) + +add_executable(local_scheduler_tests test/local_scheduler_tests.c local_scheduler.c local_scheduler_algorithm.c ) +target_link_libraries(local_scheduler_tests local_scheduler_client common ${HIREDIS_LIB} plasma_lib) +target_compile_options(local_scheduler_tests PUBLIC "-DLOCAL_SCHEDULER_TEST") + +install(TARGETS local_scheduler_library DESTINATION ${CMAKE_SOURCE_DIR}/local_scheduler) diff --git a/src/photon/build/.gitkeep b/src/local_scheduler/build/.gitkeep similarity index 100% rename from src/photon/build/.gitkeep rename to src/local_scheduler/build/.gitkeep diff --git a/src/photon/photon_scheduler.c b/src/local_scheduler/local_scheduler.c similarity index 98% rename from src/photon/photon_scheduler.c rename to src/local_scheduler/local_scheduler.c index 563b33831..a588a8b21 100644 --- a/src/photon/photon_scheduler.c +++ b/src/local_scheduler/local_scheduler.c @@ -12,9 +12,9 @@ #include "io.h" #include "logging.h" #include "object_info.h" -#include "photon.h" -#include "photon_scheduler.h" -#include "photon_algorithm.h" +#include "local_scheduler_shared.h" +#include "local_scheduler.h" +#include "local_scheduler_algorithm.h" #include "state/actor_notification_table.h" #include "state/db.h" #include "state/task_table.h" @@ -361,8 +361,8 @@ LocalSchedulerState *LocalSchedulerState_init( db_connect_args[4] = "num_gpus"; db_connect_args[5] = utstring_body(num_gpus); } - state->db = db_connect(redis_addr, redis_port, "photon", node_ip_address, - num_args, db_connect_args); + state->db = db_connect(redis_addr, redis_port, "local_scheduler", + node_ip_address, num_args, db_connect_args); utstring_free(num_cpus); utstring_free(num_gpus); free(db_connect_args); @@ -417,7 +417,7 @@ void update_dynamic_resources(LocalSchedulerState *state, if (!return_resources && state->dynamic_resources[i] < 0) { /* We are using more resources than we have been allocated. */ - LOG_WARN("photon dynamic resources dropped to %8.4f\t%8.4f\n", + LOG_WARN("local_scheduler dynamic resources dropped to %8.4f\t%8.4f\n", state->dynamic_resources[0], state->dynamic_resources[1]); } CHECK(state->dynamic_resources[i] <= state->static_resources[i]); @@ -911,7 +911,7 @@ void start_server(const char *node_ip_address, /* Only declare the main function if we are not in testing mode, since the test * suite has its own declaration of main. */ -#ifndef PHOTON_TEST +#ifndef LOCAL_SCHEDULER_TEST int main(int argc, char *argv[]) { signal(SIGTERM, signal_handler); /* Path of the listening socket of the local scheduler. */ @@ -922,7 +922,8 @@ int main(int argc, char *argv[]) { char *plasma_store_socket_name = NULL; /* Socket name for the local Plasma manager. */ char *plasma_manager_socket_name = NULL; - /* Address for the plasma manager associated with this Photon instance. */ + /* Address for the plasma manager associated with this local scheduler + * instance. */ char *plasma_manager_address = NULL; /* The IP address of the node that this local scheduler is running on. */ char *node_ip_address = NULL; diff --git a/src/photon/photon_scheduler.h b/src/local_scheduler/local_scheduler.h similarity index 97% rename from src/photon/photon_scheduler.h rename to src/local_scheduler/local_scheduler.h index 25a8dc35f..443cdb297 100644 --- a/src/photon/photon_scheduler.h +++ b/src/local_scheduler/local_scheduler.h @@ -1,5 +1,5 @@ -#ifndef PHOTON_SCHEDULER_H -#define PHOTON_SCHEDULER_H +#ifndef LOCAL_SCHEDULER_H +#define LOCAL_SCHEDULER_H #include "task.h" #include "event_loop.h" @@ -110,7 +110,7 @@ void update_dynamic_resources(LocalSchedulerState *state, bool return_resources); /** The following methods are for testing purposes only. */ -#ifdef PHOTON_TEST +#ifdef LOCAL_SCHEDULER_TEST LocalSchedulerState *LocalSchedulerState_init( const char *node_ip_address, event_loop *loop, @@ -136,4 +136,4 @@ void process_message(event_loop *loop, #endif -#endif /* PHOTON_SCHEDULER_H */ +#endif /* LOCAL_SCHEDULER_H */ diff --git a/src/photon/photon_algorithm.c b/src/local_scheduler/local_scheduler_algorithm.c similarity index 99% rename from src/photon/photon_algorithm.c rename to src/local_scheduler/local_scheduler_algorithm.c index f2d1c57e4..75760a363 100644 --- a/src/photon/photon_algorithm.c +++ b/src/local_scheduler/local_scheduler_algorithm.c @@ -1,4 +1,4 @@ -#include "photon_algorithm.h" +#include "local_scheduler_algorithm.h" #include #include "utarray.h" @@ -7,8 +7,8 @@ #include "state/task_table.h" #include "state/local_scheduler_table.h" #include "state/object_table.h" -#include "photon.h" -#include "photon_scheduler.h" +#include "local_scheduler_shared.h" +#include "local_scheduler.h" #include "common/task.h" /* Declared for convenience. */ @@ -65,7 +65,8 @@ typedef struct { UT_hash_handle hh; } LocalActorInfo; -/** Part of the photon state that is maintained by the scheduling algorithm. */ +/** Part of the local scheduler state that is maintained by the scheduling + * algorithm. */ struct SchedulingAlgorithmState { /** An array of pointers to tasks that are waiting for dependencies. */ task_queue_entry *waiting_task_queue; diff --git a/src/photon/photon_algorithm.h b/src/local_scheduler/local_scheduler_algorithm.h similarity index 98% rename from src/photon/photon_algorithm.h rename to src/local_scheduler/local_scheduler_algorithm.h index ede77a123..de4225fe7 100644 --- a/src/photon/photon_algorithm.h +++ b/src/local_scheduler/local_scheduler_algorithm.h @@ -1,7 +1,7 @@ -#ifndef PHOTON_ALGORITHM_H -#define PHOTON_ALGORITHM_H +#ifndef LOCAL_SCHEDULER_ALGORITHM_H +#define LOCAL_SCHEDULER_ALGORITHM_H -#include "photon.h" +#include "local_scheduler_shared.h" #include "common/task.h" #include "state/local_scheduler_table.h" @@ -250,7 +250,7 @@ void print_worker_info(const char *message, SchedulingAlgorithmState *algorithm_state); /** The following methods are for testing purposes only. */ -#ifdef PHOTON_TEST +#ifdef LOCAL_SCHEDULER_TEST /** * Get the number of tasks currently waiting for object dependencies to become * available locally. @@ -269,4 +269,4 @@ int num_waiting_tasks(SchedulingAlgorithmState *algorithm_state); int num_dispatch_tasks(SchedulingAlgorithmState *algorithm_state); #endif -#endif /* PHOTON_ALGORITHM_H */ +#endif /* LOCAL_SCHEDULER_ALGORITHM_H */ diff --git a/src/photon/photon_client.c b/src/local_scheduler/local_scheduler_client.c similarity index 62% rename from src/photon/photon_client.c rename to src/local_scheduler/local_scheduler_client.c index 6adf736e2..cd550418e 100644 --- a/src/photon/photon_client.c +++ b/src/local_scheduler/local_scheduler_client.c @@ -1,14 +1,15 @@ -#include "photon_client.h" +#include "local_scheduler_client.h" #include "common/io.h" #include "common/task.h" #include -PhotonConnection *PhotonConnection_init(const char *photon_socket, - ActorID actor_id) { - PhotonConnection *result = - (PhotonConnection *) malloc(sizeof(PhotonConnection)); - result->conn = connect_ipc_sock_retry(photon_socket, -1, -1); +LocalSchedulerConnection *LocalSchedulerConnection_init( + const char *local_scheduler_socket, + ActorID actor_id) { + LocalSchedulerConnection *result = + (LocalSchedulerConnection *) malloc(sizeof(LocalSchedulerConnection)); + result->conn = connect_ipc_sock_retry(local_scheduler_socket, -1, -1); register_worker_info info; memset(&info, 0, sizeof(info)); /* Register the process ID with the local scheduler. */ @@ -20,16 +21,16 @@ PhotonConnection *PhotonConnection_init(const char *photon_socket, return result; } -void PhotonConnection_free(PhotonConnection *conn) { +void LocalSchedulerConnection_free(LocalSchedulerConnection *conn) { close(conn->conn); free(conn); } -void photon_log_event(PhotonConnection *conn, - uint8_t *key, - int64_t key_length, - uint8_t *value, - int64_t value_length) { +void local_scheduler_log_event(LocalSchedulerConnection *conn, + uint8_t *key, + int64_t key_length, + uint8_t *value, + int64_t value_length) { int64_t message_length = sizeof(key_length) + sizeof(value_length) + key_length + value_length; uint8_t *message = (uint8_t *) malloc(message_length); @@ -47,12 +48,12 @@ void photon_log_event(PhotonConnection *conn, free(message); } -void photon_submit(PhotonConnection *conn, task_spec *task) { +void local_scheduler_submit(LocalSchedulerConnection *conn, task_spec *task) { write_message(conn->conn, SUBMIT_TASK, task_spec_size(task), (uint8_t *) task); } -task_spec *photon_get_task(PhotonConnection *conn) { +task_spec *local_scheduler_get_task(LocalSchedulerConnection *conn) { write_message(conn->conn, GET_TASK, 0, NULL); int64_t type; int64_t length; @@ -66,19 +67,20 @@ task_spec *photon_get_task(PhotonConnection *conn) { return task; } -void photon_task_done(PhotonConnection *conn) { +void local_scheduler_task_done(LocalSchedulerConnection *conn) { write_message(conn->conn, TASK_DONE, 0, NULL); } -void photon_reconstruct_object(PhotonConnection *conn, ObjectID object_id) { +void local_scheduler_reconstruct_object(LocalSchedulerConnection *conn, + ObjectID object_id) { write_message(conn->conn, RECONSTRUCT_OBJECT, sizeof(object_id), (uint8_t *) &object_id); } -void photon_log_message(PhotonConnection *conn) { +void local_scheduler_log_message(LocalSchedulerConnection *conn) { write_message(conn->conn, LOG_MESSAGE, 0, NULL); } -void photon_notify_unblocked(PhotonConnection *conn) { +void local_scheduler_notify_unblocked(LocalSchedulerConnection *conn) { write_message(conn->conn, NOTIFY_UNBLOCKED, 0, NULL); } diff --git a/src/photon/photon_client.h b/src/local_scheduler/local_scheduler_client.h similarity index 58% rename from src/photon/photon_client.h rename to src/local_scheduler/local_scheduler_client.h index 86fe19f72..12dd7744a 100644 --- a/src/photon/photon_client.h +++ b/src/local_scheduler/local_scheduler_client.h @@ -1,33 +1,36 @@ -#ifndef PHOTON_CLIENT_H -#define PHOTON_CLIENT_H +#ifndef LOCAL_SCHEDULER_CLIENT_H +#define LOCAL_SCHEDULER_CLIENT_H #include "common/task.h" -#include "photon.h" +#include "local_scheduler_shared.h" typedef struct { - /* File descriptor of the Unix domain socket that connects to photon. */ + /** File descriptor of the Unix domain socket that connects to local + * scheduler. */ int conn; -} PhotonConnection; +} LocalSchedulerConnection; /** * Connect to the local scheduler. * - * @param photon_socket The name of the socket to use to connect to the local - * scheduler. + * @param local_scheduler_socket The name of the socket to use to connect to the + * local scheduler. * @param actor_id The ID of the actor running on this worker. If no actor is * running on this actor, this should be NIL_ACTOR_ID. * @return The connection information. */ -PhotonConnection *PhotonConnection_init(const char *photon_socket, - ActorID actor_id); +LocalSchedulerConnection *LocalSchedulerConnection_init( + const char *local_scheduler_socket, + ActorID actor_id); /** * Disconnect from the local scheduler. * - * @param conn Photon connection information returned by PhotonConnection_init. + * @param conn Local scheduler connection information returned by + * LocalSchedulerConnection_init. * @return Void. */ -void PhotonConnection_free(PhotonConnection *conn); +void LocalSchedulerConnection_free(LocalSchedulerConnection *conn); /** * Submit a task to the local scheduler. @@ -36,7 +39,7 @@ void PhotonConnection_free(PhotonConnection *conn); * @param task The address of the task to submit. * @return Void. */ -void photon_submit(PhotonConnection *conn, task_spec *task); +void local_scheduler_submit(LocalSchedulerConnection *conn, task_spec *task); /** * Log an event to the event log. This will call RPUSH key value. We use RPUSH @@ -51,11 +54,11 @@ void photon_submit(PhotonConnection *conn, task_spec *task); * @param value_length The length of the value. * @return Void. */ -void photon_log_event(PhotonConnection *conn, - uint8_t *key, - int64_t key_length, - uint8_t *value, - int64_t value_length); +void local_scheduler_log_event(LocalSchedulerConnection *conn, + uint8_t *key, + int64_t key_length, + uint8_t *value, + int64_t value_length); /** * Get next task for this client. This will block until the scheduler assigns @@ -67,7 +70,7 @@ void photon_log_event(PhotonConnection *conn, * @param conn The connection information. * @return The address of the assigned task. */ -task_spec *photon_get_task(PhotonConnection *conn); +task_spec *local_scheduler_get_task(LocalSchedulerConnection *conn); /** * Tell the local scheduler that the client has finished executing a task. @@ -75,7 +78,7 @@ task_spec *photon_get_task(PhotonConnection *conn); * @param conn The connection information. * @return Void. */ -void photon_task_done(PhotonConnection *conn); +void local_scheduler_task_done(LocalSchedulerConnection *conn); /** * Tell the local scheduler to reconstruct an object. @@ -84,7 +87,8 @@ void photon_task_done(PhotonConnection *conn); * @param object_id The ID of the object to reconstruct. * @return Void. */ -void photon_reconstruct_object(PhotonConnection *conn, ObjectID object_id); +void local_scheduler_reconstruct_object(LocalSchedulerConnection *conn, + ObjectID object_id); /** * Send a log message to the local scheduler. @@ -92,7 +96,7 @@ void photon_reconstruct_object(PhotonConnection *conn, ObjectID object_id); * @param conn The connection information. * @return Void. */ -void photon_log_message(PhotonConnection *conn); +void local_scheduler_log_message(LocalSchedulerConnection *conn); /** * Notify the local scheduler that this client (worker) is no longer blocked. @@ -100,6 +104,6 @@ void photon_log_message(PhotonConnection *conn); * @param conn The connection information. * @return Void. */ -void photon_notify_unblocked(PhotonConnection *conn); +void local_scheduler_notify_unblocked(LocalSchedulerConnection *conn); #endif diff --git a/src/local_scheduler/local_scheduler_extension.c b/src/local_scheduler/local_scheduler_extension.c new file mode 100644 index 000000000..168d41d74 --- /dev/null +++ b/src/local_scheduler/local_scheduler_extension.c @@ -0,0 +1,234 @@ +#include + +#include "common_extension.h" +#include "local_scheduler_client.h" +#include "task.h" + +PyObject *LocalSchedulerError; + +// clang-format off +typedef struct { + PyObject_HEAD + LocalSchedulerConnection *local_scheduler_connection; +} PyLocalSchedulerClient; +// clang-format on + +static int PyLocalSchedulerClient_init(PyLocalSchedulerClient *self, + PyObject *args, + PyObject *kwds) { + char *socket_name; + ActorID actor_id; + if (!PyArg_ParseTuple(args, "sO&", &socket_name, PyStringToUniqueID, + &actor_id)) { + return -1; + } + /* Connect to the local scheduler. */ + self->local_scheduler_connection = + LocalSchedulerConnection_init(socket_name, actor_id); + return 0; +} + +static void PyLocalSchedulerClient_dealloc(PyLocalSchedulerClient *self) { + LocalSchedulerConnection_free( + ((PyLocalSchedulerClient *) self)->local_scheduler_connection); + Py_TYPE(self)->tp_free((PyObject *) self); +} + +static PyObject *PyLocalSchedulerClient_submit(PyObject *self, PyObject *args) { + PyObject *py_task; + if (!PyArg_ParseTuple(args, "O", &py_task)) { + return NULL; + } + local_scheduler_submit( + ((PyLocalSchedulerClient *) self)->local_scheduler_connection, + ((PyTask *) py_task)->spec); + Py_RETURN_NONE; +} + +// clang-format off +static PyObject *PyLocalSchedulerClient_get_task(PyObject *self) { + task_spec *task_spec; + /* Drop the global interpreter lock while we get a task because + * local_scheduler_get_task may block for a long time. */ + Py_BEGIN_ALLOW_THREADS + task_spec = local_scheduler_get_task( + ((PyLocalSchedulerClient *) self)->local_scheduler_connection); + Py_END_ALLOW_THREADS + return PyTask_make(task_spec); +} +// clang-format on + +static PyObject *PyLocalSchedulerClient_reconstruct_object(PyObject *self, + PyObject *args) { + ObjectID object_id; + if (!PyArg_ParseTuple(args, "O&", PyStringToUniqueID, &object_id)) { + return NULL; + } + local_scheduler_reconstruct_object( + ((PyLocalSchedulerClient *) self)->local_scheduler_connection, object_id); + Py_RETURN_NONE; +} + +static PyObject *PyLocalSchedulerClient_log_event(PyObject *self, + PyObject *args) { + const char *key; + int key_length; + const char *value; + int value_length; + if (!PyArg_ParseTuple(args, "s#s#", &key, &key_length, &value, + &value_length)) { + return NULL; + } + local_scheduler_log_event( + ((PyLocalSchedulerClient *) self)->local_scheduler_connection, + (uint8_t *) key, key_length, (uint8_t *) value, value_length); + Py_RETURN_NONE; +} + +static PyObject *PyLocalSchedulerClient_notify_unblocked(PyObject *self) { + local_scheduler_notify_unblocked( + ((PyLocalSchedulerClient *) self)->local_scheduler_connection); + Py_RETURN_NONE; +} + +static PyMethodDef PyLocalSchedulerClient_methods[] = { + {"submit", (PyCFunction) PyLocalSchedulerClient_submit, METH_VARARGS, + "Submit a task to the local scheduler."}, + {"get_task", (PyCFunction) PyLocalSchedulerClient_get_task, METH_NOARGS, + "Get a task from the local scheduler."}, + {"reconstruct_object", + (PyCFunction) PyLocalSchedulerClient_reconstruct_object, METH_VARARGS, + "Ask the local scheduler to reconstruct an object."}, + {"log_event", (PyCFunction) PyLocalSchedulerClient_log_event, METH_VARARGS, + "Log an event to the event log through the local scheduler."}, + {"notify_unblocked", (PyCFunction) PyLocalSchedulerClient_notify_unblocked, + METH_NOARGS, "Notify the local scheduler that we are unblocked."}, + {NULL} /* Sentinel */ +}; + +static PyTypeObject PyLocalSchedulerClientType = { + PyVarObject_HEAD_INIT(NULL, 0) /* ob_size */ + "local_scheduler.LocalSchedulerClient", /* tp_name */ + sizeof(PyLocalSchedulerClient), /* tp_basicsize */ + 0, /* tp_itemsize */ + (destructor) PyLocalSchedulerClient_dealloc, /* tp_dealloc */ + 0, /* tp_print */ + 0, /* tp_getattr */ + 0, /* tp_setattr */ + 0, /* tp_compare */ + 0, /* tp_repr */ + 0, /* tp_as_number */ + 0, /* tp_as_sequence */ + 0, /* tp_as_mapping */ + 0, /* tp_hash */ + 0, /* tp_call */ + 0, /* tp_str */ + 0, /* tp_getattro */ + 0, /* tp_setattro */ + 0, /* tp_as_buffer */ + Py_TPFLAGS_DEFAULT, /* tp_flags */ + "LocalSchedulerClient object", /* tp_doc */ + 0, /* tp_traverse */ + 0, /* tp_clear */ + 0, /* tp_richcompare */ + 0, /* tp_weaklistoffset */ + 0, /* tp_iter */ + 0, /* tp_iternext */ + PyLocalSchedulerClient_methods, /* tp_methods */ + 0, /* tp_members */ + 0, /* tp_getset */ + 0, /* tp_base */ + 0, /* tp_dict */ + 0, /* tp_descr_get */ + 0, /* tp_descr_set */ + 0, /* tp_dictoffset */ + (initproc) PyLocalSchedulerClient_init, /* tp_init */ + 0, /* tp_alloc */ + PyType_GenericNew, /* tp_new */ +}; + +static PyMethodDef local_scheduler_methods[] = { + {"check_simple_value", check_simple_value, METH_VARARGS, + "Should the object be passed by value?"}, + {"compute_put_id", compute_put_id, METH_VARARGS, + "Return the object ID for a put call within a task."}, + {"task_from_string", PyTask_from_string, METH_VARARGS, + "Creates a Python PyTask object from a string representation of " + "task_spec."}, + {"task_to_string", PyTask_to_string, METH_VARARGS, + "Translates a PyTask python object to a byte string."}, + {NULL} /* Sentinel */ +}; + +#if PY_MAJOR_VERSION >= 3 +static struct PyModuleDef moduledef = { + PyModuleDef_HEAD_INIT, + "liblocal_scheduler", /* m_name */ + "A module for the local scheduler.", /* m_doc */ + 0, /* m_size */ + local_scheduler_methods, /* m_methods */ + NULL, /* m_reload */ + NULL, /* m_traverse */ + NULL, /* m_clear */ + NULL, /* m_free */ +}; +#endif + +#if PY_MAJOR_VERSION >= 3 +#define INITERROR return NULL +#else +#define INITERROR return +#endif + +#ifndef PyMODINIT_FUNC /* declarations for DLL import/export */ +#define PyMODINIT_FUNC void +#endif + +#if PY_MAJOR_VERSION >= 3 +#define MOD_INIT(name) PyMODINIT_FUNC PyInit_##name(void) +#else +#define MOD_INIT(name) PyMODINIT_FUNC init##name(void) +#endif + +MOD_INIT(liblocal_scheduler_library) { + if (PyType_Ready(&PyTaskType) < 0) { + INITERROR; + } + + if (PyType_Ready(&PyObjectIDType) < 0) { + INITERROR; + } + + if (PyType_Ready(&PyLocalSchedulerClientType) < 0) { + INITERROR; + } + +#if PY_MAJOR_VERSION >= 3 + PyObject *m = PyModule_Create(&moduledef); +#else + PyObject *m = + Py_InitModule3("liblocal_scheduler_library", local_scheduler_methods, + "A module for the local scheduler."); +#endif + + init_pickle_module(); + + Py_INCREF(&PyTaskType); + PyModule_AddObject(m, "Task", (PyObject *) &PyTaskType); + + Py_INCREF(&PyObjectIDType); + PyModule_AddObject(m, "ObjectID", (PyObject *) &PyObjectIDType); + + Py_INCREF(&PyLocalSchedulerClientType); + PyModule_AddObject(m, "LocalSchedulerClient", + (PyObject *) &PyLocalSchedulerClientType); + + char local_scheduler_error[] = "local_scheduler.error"; + LocalSchedulerError = PyErr_NewException(local_scheduler_error, NULL, NULL); + Py_INCREF(LocalSchedulerError); + PyModule_AddObject(m, "local_scheduler_error", LocalSchedulerError); + +#if PY_MAJOR_VERSION >= 3 + return m; +#endif +} diff --git a/src/photon/photon.h b/src/local_scheduler/local_scheduler_shared.h similarity index 95% rename from src/photon/photon.h rename to src/local_scheduler/local_scheduler_shared.h index ca80c4600..cf117b64b 100644 --- a/src/photon/photon.h +++ b/src/local_scheduler/local_scheduler_shared.h @@ -1,5 +1,5 @@ -#ifndef PHOTON_H -#define PHOTON_H +#ifndef LOCAL_SCHEDULER_SHARED_H +#define LOCAL_SCHEDULER_SHARED_H #include "common/task.h" #include "common/state/table.h" @@ -8,7 +8,7 @@ #include "utarray.h" #include "uthash.h" -enum photon_message_type { +enum local_scheduler_message_type { /** Notify the local scheduler that a task has finished. */ TASK_DONE = 64, /** Get a new task from the local scheduler. */ @@ -34,7 +34,7 @@ UT_icd workers_icd; UT_icd pid_t_icd; /** This struct is used to register a new worker with the local scheduler. - * It is shipped as part of photon_connect */ + * It is shipped as part of local_scheduler_connect */ typedef struct { /** The ID of the actor. This is NIL_ACTOR_ID if the worker is not an actor. */ @@ -123,4 +123,4 @@ typedef struct { LocalSchedulerState *local_scheduler_state; } LocalSchedulerClient; -#endif /* PHOTON_H */ +#endif /* LOCAL_SCHEDULER_SHARED_H */ diff --git a/src/photon/test/photon_tests.c b/src/local_scheduler/test/local_scheduler_tests.c similarity index 67% rename from src/photon/test/photon_tests.c rename to src/local_scheduler/test/local_scheduler_tests.c index 70f341cc4..9c33a28ab 100644 --- a/src/photon/test/photon_tests.c +++ b/src/local_scheduler/test/local_scheduler_tests.c @@ -16,16 +16,17 @@ #include "state/object_table.h" #include "state/task_table.h" -#include "photon.h" -#include "photon_scheduler.h" -#include "photon_algorithm.h" -#include "photon_client.h" +#include "local_scheduler_shared.h" +#include "local_scheduler.h" +#include "local_scheduler_algorithm.h" +#include "local_scheduler_client.h" -SUITE(photon_tests); +SUITE(local_scheduler_tests); const char *plasma_store_socket_name = "/tmp/plasma_store_socket_1"; const char *plasma_manager_socket_name_format = "/tmp/plasma_manager_socket_%d"; -const char *photon_socket_name_format = "/tmp/photon_socket_%d"; +const char *local_scheduler_socket_name_format = + "/tmp/local_scheduler_socket_%d"; int64_t timeout_handler(event_loop *loop, int64_t id, void *context) { event_loop_stop(loop); @@ -38,35 +39,36 @@ typedef struct { int plasma_manager_fd; /** A socket to communicate with the Plasma store. */ int plasma_store_fd; - /** Photon's socket for IPC requests. */ - int photon_fd; - /** Photon's local scheduler state. */ - LocalSchedulerState *photon_state; - /** Photon's event loop. */ + /** Local scheduler's socket for IPC requests. */ + int local_scheduler_fd; + /** Local scheduler's local scheduler state. */ + LocalSchedulerState *local_scheduler_state; + /** Local scheduler's event loop. */ event_loop *loop; - /** Number of Photon client connections, or mock workers. */ - int num_photon_conns; - /** Photon client connections. */ - PhotonConnection **conns; -} PhotonMock; + /** Number of local scheduler client connections, or mock workers. */ + int num_local_scheduler_conns; + /** Local scheduler client connections. */ + LocalSchedulerConnection **conns; +} LocalSchedulerMock; -PhotonMock *PhotonMock_init(int num_workers, int num_mock_workers) { +LocalSchedulerMock *LocalSchedulerMock_init(int num_workers, + int num_mock_workers) { const char *node_ip_address = "127.0.0.1"; const char *redis_addr = node_ip_address; int redis_port = 6379; const double static_resource_conf[MAX_RESOURCE_INDEX] = {DEFAULT_NUM_CPUS, DEFAULT_NUM_GPUS}; - PhotonMock *mock = malloc(sizeof(PhotonMock)); - memset(mock, 0, sizeof(PhotonMock)); + LocalSchedulerMock *mock = malloc(sizeof(LocalSchedulerMock)); + memset(mock, 0, sizeof(LocalSchedulerMock)); mock->loop = event_loop_create(); - /* Bind to the Photon port and initialize the Photon scheduler. */ + /* Bind to the local scheduler port and initialize the local scheduler. */ UT_string *plasma_manager_socket_name = bind_ipc_sock_retry( plasma_manager_socket_name_format, &mock->plasma_manager_fd); mock->plasma_store_fd = connect_ipc_sock_retry(plasma_store_socket_name, 5, 100); - UT_string *photon_socket_name = - bind_ipc_sock_retry(photon_socket_name_format, &mock->photon_fd); - CHECK(mock->plasma_store_fd >= 0 && mock->photon_fd >= 0); + UT_string *local_scheduler_socket_name = bind_ipc_sock_retry( + local_scheduler_socket_name_format, &mock->local_scheduler_fd); + CHECK(mock->plasma_store_fd >= 0 && mock->local_scheduler_fd >= 0); UT_string *worker_command; utstring_new(worker_command); @@ -77,11 +79,12 @@ PhotonMock *PhotonMock_init(int num_workers, int num_mock_workers) { "--redis-address=%s:%d", node_ip_address, plasma_store_socket_name, utstring_body(plasma_manager_socket_name), - utstring_body(photon_socket_name), redis_addr, redis_port); + utstring_body(local_scheduler_socket_name), redis_addr, + redis_port); - mock->photon_state = LocalSchedulerState_init( + mock->local_scheduler_state = LocalSchedulerState_init( "127.0.0.1", mock->loop, redis_addr, redis_port, - utstring_body(photon_socket_name), plasma_store_socket_name, + utstring_body(local_scheduler_socket_name), plasma_store_socket_name, utstring_body(plasma_manager_socket_name), NULL, false, static_resource_conf, utstring_body(worker_command), num_workers); @@ -90,50 +93,50 @@ PhotonMock *PhotonMock_init(int num_workers, int num_mock_workers) { accept_client(mock->plasma_manager_fd); } - /* Connect a Photon client. */ - mock->num_photon_conns = num_mock_workers; - mock->conns = malloc(sizeof(PhotonConnection *) * num_mock_workers); + /* Connect a local scheduler client. */ + mock->num_local_scheduler_conns = num_mock_workers; + mock->conns = malloc(sizeof(LocalSchedulerConnection *) * num_mock_workers); for (int i = 0; i < num_mock_workers; ++i) { - mock->conns[i] = - PhotonConnection_init(utstring_body(photon_socket_name), NIL_ACTOR_ID); - new_client_connection(mock->loop, mock->photon_fd, - (void *) mock->photon_state, 0); + mock->conns[i] = LocalSchedulerConnection_init( + utstring_body(local_scheduler_socket_name), NIL_ACTOR_ID); + new_client_connection(mock->loop, mock->local_scheduler_fd, + (void *) mock->local_scheduler_state, 0); } utstring_free(worker_command); utstring_free(plasma_manager_socket_name); - utstring_free(photon_socket_name); + utstring_free(local_scheduler_socket_name); return mock; } -void PhotonMock_free(PhotonMock *mock) { +void LocalSchedulerMock_free(LocalSchedulerMock *mock) { /* Disconnect clients. */ - for (int i = 0; i < mock->num_photon_conns; ++i) { - PhotonConnection_free(mock->conns[i]); + for (int i = 0; i < mock->num_local_scheduler_conns; ++i) { + LocalSchedulerConnection_free(mock->conns[i]); } free(mock->conns); /* Kill all the workers and run the event loop again so that the task table * updates propagate and the tasks in progress are freed. */ - LocalSchedulerClient **worker = - (LocalSchedulerClient **) utarray_eltptr(mock->photon_state->workers, 0); + LocalSchedulerClient **worker = (LocalSchedulerClient **) utarray_eltptr( + mock->local_scheduler_state->workers, 0); while (worker != NULL) { kill_worker(*worker, true); worker = (LocalSchedulerClient **) utarray_eltptr( - mock->photon_state->workers, 0); + mock->local_scheduler_state->workers, 0); } event_loop_add_timer(mock->loop, 500, (event_loop_timer_handler) timeout_handler, NULL); event_loop_run(mock->loop); /* This also frees mock->loop. */ - LocalSchedulerState_free(mock->photon_state); + LocalSchedulerState_free(mock->local_scheduler_state); close(mock->plasma_store_fd); close(mock->plasma_manager_fd); free(mock); } -void reset_worker(PhotonMock *mock, LocalSchedulerClient *worker) { +void reset_worker(LocalSchedulerMock *mock, LocalSchedulerClient *worker) { if (worker->task_in_progress) { Task_free(worker->task_in_progress); worker->task_in_progress = NULL; @@ -146,8 +149,8 @@ void reset_worker(PhotonMock *mock, LocalSchedulerClient *worker) { * value, the task should get assigned to a worker again. */ TEST object_reconstruction_test(void) { - PhotonMock *photon = PhotonMock_init(0, 1); - PhotonConnection *worker = photon->conns[0]; + LocalSchedulerMock *local_scheduler = LocalSchedulerMock_init(0, 1); + LocalSchedulerConnection *worker = local_scheduler->conns[0]; /* Create a task with zero dependencies and one return value. */ task_spec *spec = example_task_spec(0, 1); @@ -170,41 +173,47 @@ TEST object_reconstruction_test(void) { if (pid == 0) { /* Make sure we receive the task twice. First from the initial submission, * and second from the reconstruct request. */ - photon_submit(worker, spec); - task_spec *task_assigned = photon_get_task(worker); + local_scheduler_submit(worker, spec); + task_spec *task_assigned = local_scheduler_get_task(worker); ASSERT_EQ(memcmp(task_assigned, spec, task_spec_size(spec)), 0); - task_spec *reconstruct_task = photon_get_task(worker); + task_spec *reconstruct_task = local_scheduler_get_task(worker); ASSERT_EQ(memcmp(reconstruct_task, spec, task_spec_size(spec)), 0); /* Clean up. */ free_task_spec(reconstruct_task); free_task_spec(task_assigned); free_task_spec(spec); - PhotonMock_free(photon); + LocalSchedulerMock_free(local_scheduler); exit(0); } else { /* Run the event loop. NOTE: OSX appears to require the parent process to * listen for events on the open file descriptors. */ - event_loop_add_timer(photon->loop, 500, + event_loop_add_timer(local_scheduler->loop, 500, (event_loop_timer_handler) timeout_handler, NULL); - event_loop_run(photon->loop); + event_loop_run(local_scheduler->loop); /* Set the task's status to TASK_STATUS_DONE to prevent the race condition * that would suppress object reconstruction. */ - Task *task = Task_alloc(spec, TASK_STATUS_DONE, - get_db_client_id(photon->photon_state->db)); - task_table_add_task(photon->photon_state->db, task, NULL, NULL, NULL); + Task *task = Task_alloc( + spec, TASK_STATUS_DONE, + get_db_client_id(local_scheduler->local_scheduler_state->db)); + task_table_add_task(local_scheduler->local_scheduler_state->db, task, NULL, + NULL, NULL); /* Trigger reconstruction, and run the event loop again. */ ObjectID return_id = task_return(spec, 0); - photon_reconstruct_object(worker, return_id); - event_loop_add_timer(photon->loop, 500, + local_scheduler_reconstruct_object(worker, return_id); + event_loop_add_timer(local_scheduler->loop, 500, (event_loop_timer_handler) timeout_handler, NULL); - event_loop_run(photon->loop); + event_loop_run(local_scheduler->loop); /* Wait for the child process to exit and check that there are no tasks * left in the local scheduler's task queue. Then, clean up. */ wait(NULL); free_task_spec(spec); - ASSERT_EQ(num_waiting_tasks(photon->photon_state->algorithm_state), 0); - ASSERT_EQ(num_dispatch_tasks(photon->photon_state->algorithm_state), 0); - PhotonMock_free(photon); + ASSERT_EQ(num_waiting_tasks( + local_scheduler->local_scheduler_state->algorithm_state), + 0); + ASSERT_EQ(num_dispatch_tasks( + local_scheduler->local_scheduler_state->algorithm_state), + 0); + LocalSchedulerMock_free(local_scheduler); PASS(); } } @@ -215,8 +224,8 @@ TEST object_reconstruction_test(void) { * should trigger reconstruction of all previous tasks in the lineage. */ TEST object_reconstruction_recursive_test(void) { - PhotonMock *photon = PhotonMock_init(0, 1); - PhotonConnection *worker = photon->conns[0]; + LocalSchedulerMock *local_scheduler = LocalSchedulerMock_init(0, 1); + LocalSchedulerConnection *worker = local_scheduler->conns[0]; /* Create a chain of tasks, each one dependent on the one before it. Mark * each object as available so that tasks will run immediately. */ const int NUM_TASKS = 10; @@ -224,8 +233,9 @@ TEST object_reconstruction_recursive_test(void) { specs[0] = example_task_spec(0, 1); for (int i = 1; i < NUM_TASKS; ++i) { ObjectID arg_id = task_return(specs[i - 1], 0); - handle_object_available(photon->photon_state, - photon->photon_state->algorithm_state, arg_id); + handle_object_available( + local_scheduler->local_scheduler_state, + local_scheduler->local_scheduler_state->algorithm_state, arg_id); specs[i] = example_task_spec_with_args(1, 1, &arg_id); } @@ -249,11 +259,11 @@ TEST object_reconstruction_recursive_test(void) { if (pid == 0) { /* Submit the tasks, and make sure each one gets assigned to a worker. */ for (int i = 0; i < NUM_TASKS; ++i) { - photon_submit(worker, specs[i]); + local_scheduler_submit(worker, specs[i]); } /* Make sure we receive each task from the initial submission. */ for (int i = 0; i < NUM_TASKS; ++i) { - task_spec *task_assigned = photon_get_task(worker); + task_spec *task_assigned = local_scheduler_get_task(worker); ASSERT_EQ(memcmp(task_assigned, specs[i], task_spec_size(task_assigned)), 0); free_task_spec(task_assigned); @@ -261,7 +271,7 @@ TEST object_reconstruction_recursive_test(void) { /* Check that the workers receive all tasks in the final return object's * lineage during reconstruction. */ for (int i = 0; i < NUM_TASKS; ++i) { - task_spec *task_assigned = photon_get_task(worker); + task_spec *task_assigned = local_scheduler_get_task(worker); bool found = false; for (int j = 0; j < NUM_TASKS; ++j) { if (specs[j] == NULL) { @@ -277,35 +287,41 @@ TEST object_reconstruction_recursive_test(void) { free_task_spec(task_assigned); ASSERT(found); } - PhotonMock_free(photon); + LocalSchedulerMock_free(local_scheduler); exit(0); } else { /* Run the event loop. NOTE: OSX appears to require the parent process to * listen for events on the open file descriptors. */ - event_loop_add_timer(photon->loop, 500, + event_loop_add_timer(local_scheduler->loop, 500, (event_loop_timer_handler) timeout_handler, NULL); - event_loop_run(photon->loop); + event_loop_run(local_scheduler->loop); /* Set the final task's status to TASK_STATUS_DONE to prevent the race * condition that would suppress object reconstruction. */ - Task *last_task = Task_alloc(specs[NUM_TASKS - 1], TASK_STATUS_DONE, - get_db_client_id(photon->photon_state->db)); - task_table_add_task(photon->photon_state->db, last_task, NULL, NULL, NULL); + Task *last_task = Task_alloc( + specs[NUM_TASKS - 1], TASK_STATUS_DONE, + get_db_client_id(local_scheduler->local_scheduler_state->db)); + task_table_add_task(local_scheduler->local_scheduler_state->db, last_task, + NULL, NULL, NULL); /* Trigger reconstruction for the last object, and run the event loop * again. */ ObjectID return_id = task_return(specs[NUM_TASKS - 1], 0); - photon_reconstruct_object(worker, return_id); - event_loop_add_timer(photon->loop, 500, + local_scheduler_reconstruct_object(worker, return_id); + event_loop_add_timer(local_scheduler->loop, 500, (event_loop_timer_handler) timeout_handler, NULL); - event_loop_run(photon->loop); + event_loop_run(local_scheduler->loop); /* Wait for the child process to exit and check that there are no tasks * left in the local scheduler's task queue. Then, clean up. */ wait(NULL); - ASSERT_EQ(num_waiting_tasks(photon->photon_state->algorithm_state), 0); - ASSERT_EQ(num_dispatch_tasks(photon->photon_state->algorithm_state), 0); + ASSERT_EQ(num_waiting_tasks( + local_scheduler->local_scheduler_state->algorithm_state), + 0); + ASSERT_EQ(num_dispatch_tasks( + local_scheduler->local_scheduler_state->algorithm_state), + 0); for (int i = 0; i < NUM_TASKS; ++i) { free_task_spec(specs[i]); } - PhotonMock_free(photon); + LocalSchedulerMock_free(local_scheduler); PASS(); } } @@ -319,13 +335,13 @@ task_spec *object_reconstruction_suppression_spec; void object_reconstruction_suppression_callback(ObjectID object_id, void *user_context) { /* Submit the task after adding the object to the object table. */ - PhotonConnection *worker = user_context; - photon_submit(worker, object_reconstruction_suppression_spec); + LocalSchedulerConnection *worker = user_context; + local_scheduler_submit(worker, object_reconstruction_suppression_spec); } TEST object_reconstruction_suppression_test(void) { - PhotonMock *photon = PhotonMock_init(0, 1); - PhotonConnection *worker = photon->conns[0]; + LocalSchedulerMock *local_scheduler = LocalSchedulerMock_init(0, 1); + LocalSchedulerConnection *worker = local_scheduler->conns[0]; object_reconstruction_suppression_spec = example_task_spec(0, 1); ObjectID return_id = task_return(object_reconstruction_suppression_spec, 0); @@ -333,48 +349,52 @@ TEST object_reconstruction_suppression_test(void) { if (pid == 0) { /* Make sure we receive the task once. This will block until the * object_table_add callback completes. */ - task_spec *task_assigned = photon_get_task(worker); + task_spec *task_assigned = local_scheduler_get_task(worker); ASSERT_EQ(memcmp(task_assigned, object_reconstruction_suppression_spec, task_spec_size(object_reconstruction_suppression_spec)), 0); /* Trigger a reconstruction. We will check that no tasks get queued as a * result of this line in the event loop process. */ - photon_reconstruct_object(worker, return_id); + local_scheduler_reconstruct_object(worker, return_id); /* Clean up. */ free_task_spec(task_assigned); free_task_spec(object_reconstruction_suppression_spec); - PhotonMock_free(photon); + LocalSchedulerMock_free(local_scheduler); exit(0); } else { /* Connect a plasma manager client so we can call object_table_add. */ const char *db_connect_args[] = {"address", "127.0.0.1:12346"}; DBHandle *db = db_connect("127.0.0.1", 6379, "plasma_manager", "127.0.0.1", 2, db_connect_args); - db_attach(db, photon->loop, false); + db_attach(db, local_scheduler->loop, false); /* Add the object to the object table. */ object_table_add(db, return_id, 1, (unsigned char *) NIL_DIGEST, NULL, object_reconstruction_suppression_callback, (void *) worker); /* Run the event loop. NOTE: OSX appears to require the parent process to * listen for events on the open file descriptors. */ - event_loop_add_timer(photon->loop, 1000, + event_loop_add_timer(local_scheduler->loop, 1000, (event_loop_timer_handler) timeout_handler, NULL); - event_loop_run(photon->loop); + event_loop_run(local_scheduler->loop); /* Wait for the child process to exit and check that there are no tasks * left in the local scheduler's task queue. Then, clean up. */ wait(NULL); - ASSERT_EQ(num_waiting_tasks(photon->photon_state->algorithm_state), 0); - ASSERT_EQ(num_dispatch_tasks(photon->photon_state->algorithm_state), 0); + ASSERT_EQ(num_waiting_tasks( + local_scheduler->local_scheduler_state->algorithm_state), + 0); + ASSERT_EQ(num_dispatch_tasks( + local_scheduler->local_scheduler_state->algorithm_state), + 0); free_task_spec(object_reconstruction_suppression_spec); db_disconnect(db); - PhotonMock_free(photon); + LocalSchedulerMock_free(local_scheduler); PASS(); } } TEST task_dependency_test(void) { - PhotonMock *photon = PhotonMock_init(0, 1); - LocalSchedulerState *state = photon->photon_state; + LocalSchedulerMock *local_scheduler = LocalSchedulerMock_init(0, 1); + LocalSchedulerState *state = local_scheduler->local_scheduler_state; SchedulingAlgorithmState *algorithm_state = state->algorithm_state; /* Get the first worker. */ LocalSchedulerClient *worker = @@ -395,7 +415,7 @@ TEST task_dependency_test(void) { handle_worker_available(state, algorithm_state, worker); ASSERT_EQ(num_waiting_tasks(algorithm_state), 0); ASSERT_EQ(num_dispatch_tasks(algorithm_state), 0); - reset_worker(photon, worker); + reset_worker(local_scheduler, worker); /* Check that the task gets queued in the waiting queue if the task is * submitted and a worker is available, but the input is not. */ @@ -408,7 +428,7 @@ TEST task_dependency_test(void) { handle_object_available(state, algorithm_state, oid); ASSERT_EQ(num_waiting_tasks(algorithm_state), 0); ASSERT_EQ(num_dispatch_tasks(algorithm_state), 0); - reset_worker(photon, worker); + reset_worker(local_scheduler, worker); /* Check that the task gets queued in the dispatch queue if the task is * submitted and the input is available, but no worker is available yet. */ @@ -419,7 +439,7 @@ TEST task_dependency_test(void) { handle_worker_available(state, algorithm_state, worker); ASSERT_EQ(num_waiting_tasks(algorithm_state), 0); ASSERT_EQ(num_dispatch_tasks(algorithm_state), 0); - reset_worker(photon, worker); + reset_worker(local_scheduler, worker); /* If an object gets removed, check the first scenario again, where the task * gets queued in the waiting task if the task is submitted and a worker is @@ -443,13 +463,13 @@ TEST task_dependency_test(void) { ASSERT_EQ(num_dispatch_tasks(algorithm_state), 0); free_task_spec(spec); - PhotonMock_free(photon); + LocalSchedulerMock_free(local_scheduler); PASS(); } TEST task_multi_dependency_test(void) { - PhotonMock *photon = PhotonMock_init(0, 1); - LocalSchedulerState *state = photon->photon_state; + LocalSchedulerMock *local_scheduler = LocalSchedulerMock_init(0, 1); + LocalSchedulerState *state = local_scheduler->local_scheduler_state; SchedulingAlgorithmState *algorithm_state = state->algorithm_state; /* Get the first worker. */ LocalSchedulerClient *worker = @@ -476,7 +496,7 @@ TEST task_multi_dependency_test(void) { handle_worker_available(state, algorithm_state, worker); ASSERT_EQ(num_waiting_tasks(algorithm_state), 0); ASSERT_EQ(num_dispatch_tasks(algorithm_state), 0); - reset_worker(photon, worker); + reset_worker(local_scheduler, worker); /* Check that the task gets queued in the dispatch queue if the task is * submitted and the inputs are available, but no worker is available yet. */ @@ -514,77 +534,90 @@ TEST task_multi_dependency_test(void) { handle_worker_available(state, algorithm_state, worker); ASSERT_EQ(num_waiting_tasks(algorithm_state), 0); ASSERT_EQ(num_dispatch_tasks(algorithm_state), 0); - reset_worker(photon, worker); + reset_worker(local_scheduler, worker); free_task_spec(spec); - PhotonMock_free(photon); + LocalSchedulerMock_free(local_scheduler); PASS(); } TEST start_kill_workers_test(void) { /* Start some workers. */ int num_workers = 4; - PhotonMock *photon = PhotonMock_init(num_workers, 0); + LocalSchedulerMock *local_scheduler = LocalSchedulerMock_init(num_workers, 0); /* We start off with num_workers children processes, but no workers * registered yet. */ - ASSERT_EQ(utarray_len(photon->photon_state->child_pids), num_workers); - ASSERT_EQ(utarray_len(photon->photon_state->workers), 0); + ASSERT_EQ(utarray_len(local_scheduler->local_scheduler_state->child_pids), + num_workers); + ASSERT_EQ(utarray_len(local_scheduler->local_scheduler_state->workers), 0); - /* Make sure that each worker connects to the photon scheduler. This for loop - * will hang if one of the workers does not connect. */ + /* Make sure that each worker connects to the local_scheduler scheduler. This + * for loop will hang if one of the workers does not connect. */ for (int i = 0; i < num_workers; ++i) { - new_client_connection(photon->loop, photon->photon_fd, - (void *) photon->photon_state, 0); + new_client_connection(local_scheduler->loop, + local_scheduler->local_scheduler_fd, + (void *) local_scheduler->local_scheduler_state, 0); } /* After handling each worker's initial connection, we should now have all * workers accounted for, but we haven't yet matched up process IDs with our * children processes. */ - ASSERT_EQ(utarray_len(photon->photon_state->child_pids), num_workers); - ASSERT_EQ(utarray_len(photon->photon_state->workers), num_workers); + ASSERT_EQ(utarray_len(local_scheduler->local_scheduler_state->child_pids), + num_workers); + ASSERT_EQ(utarray_len(local_scheduler->local_scheduler_state->workers), + num_workers); /* Each worker should register its process ID. */ - for (int i = 0; i < utarray_len(photon->photon_state->workers); ++i) { + for (int i = 0; + i < utarray_len(local_scheduler->local_scheduler_state->workers); ++i) { LocalSchedulerClient *worker = *(LocalSchedulerClient **) utarray_eltptr( - photon->photon_state->workers, i); - process_message(photon->photon_state->loop, worker->sock, worker, 0); + local_scheduler->local_scheduler_state->workers, i); + process_message(local_scheduler->local_scheduler_state->loop, worker->sock, + worker, 0); } - ASSERT_EQ(utarray_len(photon->photon_state->child_pids), 0); - ASSERT_EQ(utarray_len(photon->photon_state->workers), num_workers); + ASSERT_EQ(utarray_len(local_scheduler->local_scheduler_state->child_pids), 0); + ASSERT_EQ(utarray_len(local_scheduler->local_scheduler_state->workers), + num_workers); /* After killing a worker, its state is cleaned up. */ LocalSchedulerClient *worker = *(LocalSchedulerClient **) utarray_eltptr( - photon->photon_state->workers, 0); + local_scheduler->local_scheduler_state->workers, 0); kill_worker(worker, false); - ASSERT_EQ(utarray_len(photon->photon_state->child_pids), 0); - ASSERT_EQ(utarray_len(photon->photon_state->workers), num_workers - 1); + ASSERT_EQ(utarray_len(local_scheduler->local_scheduler_state->child_pids), 0); + ASSERT_EQ(utarray_len(local_scheduler->local_scheduler_state->workers), + num_workers - 1); /* Start a worker after the local scheduler has been initialized. */ - start_worker(photon->photon_state, NIL_ACTOR_ID); + start_worker(local_scheduler->local_scheduler_state, NIL_ACTOR_ID); /* Accept the workers as clients to the plasma manager. */ - int new_worker_fd = accept_client(photon->plasma_manager_fd); + int new_worker_fd = accept_client(local_scheduler->plasma_manager_fd); /* The new worker should register its process ID. */ - ASSERT_EQ(utarray_len(photon->photon_state->child_pids), 1); - ASSERT_EQ(utarray_len(photon->photon_state->workers), num_workers - 1); - /* Make sure the new worker connects to the photon scheduler. */ - new_client_connection(photon->loop, photon->photon_fd, - (void *) photon->photon_state, 0); - ASSERT_EQ(utarray_len(photon->photon_state->child_pids), 1); - ASSERT_EQ(utarray_len(photon->photon_state->workers), num_workers); + ASSERT_EQ(utarray_len(local_scheduler->local_scheduler_state->child_pids), 1); + ASSERT_EQ(utarray_len(local_scheduler->local_scheduler_state->workers), + num_workers - 1); + /* Make sure the new worker connects to the local_scheduler scheduler. */ + new_client_connection(local_scheduler->loop, + local_scheduler->local_scheduler_fd, + (void *) local_scheduler->local_scheduler_state, 0); + ASSERT_EQ(utarray_len(local_scheduler->local_scheduler_state->child_pids), 1); + ASSERT_EQ(utarray_len(local_scheduler->local_scheduler_state->workers), + num_workers); /* Make sure that the new worker registers its process ID. */ worker = *(LocalSchedulerClient **) utarray_eltptr( - photon->photon_state->workers, num_workers - 1); - process_message(photon->photon_state->loop, worker->sock, worker, 0); - ASSERT_EQ(utarray_len(photon->photon_state->child_pids), 0); - ASSERT_EQ(utarray_len(photon->photon_state->workers), num_workers); + local_scheduler->local_scheduler_state->workers, num_workers - 1); + process_message(local_scheduler->local_scheduler_state->loop, worker->sock, + worker, 0); + ASSERT_EQ(utarray_len(local_scheduler->local_scheduler_state->child_pids), 0); + ASSERT_EQ(utarray_len(local_scheduler->local_scheduler_state->workers), + num_workers); /* Clean up. */ close(new_worker_fd); - PhotonMock_free(photon); + LocalSchedulerMock_free(local_scheduler); PASS(); } -SUITE(photon_tests) { +SUITE(local_scheduler_tests) { RUN_REDIS_TEST(object_reconstruction_test); RUN_REDIS_TEST(object_reconstruction_recursive_test); RUN_REDIS_TEST(object_reconstruction_suppression_test); @@ -597,6 +630,6 @@ GREATEST_MAIN_DEFS(); int main(int argc, char **argv) { GREATEST_MAIN_BEGIN(); - RUN_SUITE(photon_tests); + RUN_SUITE(local_scheduler_tests); GREATEST_MAIN_END(); } diff --git a/src/photon/test/run_tests.sh b/src/local_scheduler/test/run_tests.sh similarity index 91% rename from src/photon/test/run_tests.sh rename to src/local_scheduler/test/run_tests.sh index fe88aeb63..36f31566c 100644 --- a/src/photon/test/run_tests.sh +++ b/src/local_scheduler/test/run_tests.sh @@ -9,6 +9,6 @@ set -e sleep 1s ./src/plasma/plasma_store -s /tmp/plasma_store_socket_1 -m 100000000 & sleep 0.5s -./src/photon/photon_tests +./src/local_scheduler/local_scheduler_tests ./src/common/thirdparty/redis/src/redis-cli shutdown killall plasma_store diff --git a/src/photon/test/run_valgrind.sh b/src/local_scheduler/test/run_valgrind.sh similarity index 91% rename from src/photon/test/run_valgrind.sh rename to src/local_scheduler/test/run_valgrind.sh index 3ffeb66d5..4aee85aed 100644 --- a/src/photon/test/run_valgrind.sh +++ b/src/local_scheduler/test/run_valgrind.sh @@ -9,6 +9,6 @@ set -e sleep 1s ./src/plasma/plasma_store -s /tmp/plasma_store_socket_1 -m 100000000 & sleep 0.5s -valgrind --leak-check=full --show-leak-kinds=all --error-exitcode=1 ./src/photon/photon_tests +valgrind --leak-check=full --show-leak-kinds=all --error-exitcode=1 ./src/local_scheduler/local_scheduler_tests ./src/common/thirdparty/redis/src/redis-cli shutdown killall plasma_store diff --git a/src/photon/CMakeLists.txt b/src/photon/CMakeLists.txt deleted file mode 100644 index ac4c0fd10..000000000 --- a/src/photon/CMakeLists.txt +++ /dev/null @@ -1,45 +0,0 @@ -cmake_minimum_required(VERSION 2.8) - -project(photon) - -# Recursively include common -include(${CMAKE_CURRENT_LIST_DIR}/../common/cmake/Common.cmake) - -if(APPLE) - SET(CMAKE_SHARED_LIBRARY_SUFFIX ".so") -endif(APPLE) - -include_directories("${PYTHON_INCLUDE_DIRS}") - -# set(CMAKE_C_FLAGS "${CMAKE_CXX_FLAGS} --std=c99 -Werror") - -if(UNIX AND NOT APPLE) - link_libraries(rt) -endif() - -include_directories("${CMAKE_CURRENT_LIST_DIR}/") -include_directories("${CMAKE_CURRENT_LIST_DIR}/../") -include_directories("${CMAKE_CURRENT_LIST_DIR}/../plasma/") - -add_library(photon SHARED - photon_extension.c - ../common/lib/python/common_extension.c) - -get_filename_component(PYTHON_SHARED_LIBRARY ${PYTHON_LIBRARIES} NAME) -if(APPLE) - add_custom_command(TARGET photon - POST_BUILD COMMAND ${CMAKE_INSTALL_NAME_TOOL} -change ${PYTHON_SHARED_LIBRARY} ${PYTHON_LIBRARIES} libphoton.so) -endif(APPLE) - -add_library(photon_client STATIC photon_client.c) - -target_link_libraries(photon photon_client ${COMMON_LIB} ${PYTHON_LIBRARIES}) - -add_executable(photon_scheduler photon_scheduler.c photon_algorithm.c) -target_link_libraries(photon_scheduler photon_client common ${HIREDIS_LIB} plasma_lib) - -add_executable(photon_tests test/photon_tests.c photon_scheduler.c photon_algorithm.c ) -target_link_libraries(photon_tests photon_client common ${HIREDIS_LIB} plasma_lib) -target_compile_options(photon_tests PUBLIC "-DPHOTON_TEST") - -install(TARGETS photon DESTINATION ${CMAKE_SOURCE_DIR}/photon) diff --git a/src/photon/photon_extension.c b/src/photon/photon_extension.c deleted file mode 100644 index c60d58996..000000000 --- a/src/photon/photon_extension.c +++ /dev/null @@ -1,225 +0,0 @@ -#include - -#include "common_extension.h" -#include "photon_client.h" -#include "task.h" - -PyObject *PhotonError; - -// clang-format off -typedef struct { - PyObject_HEAD - PhotonConnection *photon_connection; -} PyPhotonClient; -// clang-format on - -static int PyPhotonClient_init(PyPhotonClient *self, - PyObject *args, - PyObject *kwds) { - char *socket_name; - ActorID actor_id; - if (!PyArg_ParseTuple(args, "sO&", &socket_name, PyStringToUniqueID, - &actor_id)) { - return -1; - } - /* Connect to the Photon scheduler. */ - self->photon_connection = PhotonConnection_init(socket_name, actor_id); - return 0; -} - -static void PyPhotonClient_dealloc(PyPhotonClient *self) { - PhotonConnection_free(((PyPhotonClient *) self)->photon_connection); - Py_TYPE(self)->tp_free((PyObject *) self); -} - -static PyObject *PyPhotonClient_submit(PyObject *self, PyObject *args) { - PyObject *py_task; - if (!PyArg_ParseTuple(args, "O", &py_task)) { - return NULL; - } - photon_submit(((PyPhotonClient *) self)->photon_connection, - ((PyTask *) py_task)->spec); - Py_RETURN_NONE; -} - -// clang-format off -static PyObject *PyPhotonClient_get_task(PyObject *self) { - task_spec *task_spec; - /* Drop the global interpreter lock while we get a task because - * photon_get_task may block for a long time. */ - Py_BEGIN_ALLOW_THREADS - task_spec = photon_get_task(((PyPhotonClient *) self)->photon_connection); - Py_END_ALLOW_THREADS - return PyTask_make(task_spec); -} -// clang-format on - -static PyObject *PyPhotonClient_reconstruct_object(PyObject *self, - PyObject *args) { - ObjectID object_id; - if (!PyArg_ParseTuple(args, "O&", PyStringToUniqueID, &object_id)) { - return NULL; - } - photon_reconstruct_object(((PyPhotonClient *) self)->photon_connection, - object_id); - Py_RETURN_NONE; -} - -static PyObject *PyPhotonClient_log_event(PyObject *self, PyObject *args) { - const char *key; - int key_length; - const char *value; - int value_length; - if (!PyArg_ParseTuple(args, "s#s#", &key, &key_length, &value, - &value_length)) { - return NULL; - } - photon_log_event(((PyPhotonClient *) self)->photon_connection, - (uint8_t *) key, key_length, (uint8_t *) value, - value_length); - Py_RETURN_NONE; -} - -static PyObject *PyPhotonClient_notify_unblocked(PyObject *self) { - photon_notify_unblocked(((PyPhotonClient *) self)->photon_connection); - Py_RETURN_NONE; -} - -static PyMethodDef PyPhotonClient_methods[] = { - {"submit", (PyCFunction) PyPhotonClient_submit, METH_VARARGS, - "Submit a task to the local scheduler."}, - {"get_task", (PyCFunction) PyPhotonClient_get_task, METH_NOARGS, - "Get a task from the local scheduler."}, - {"reconstruct_object", (PyCFunction) PyPhotonClient_reconstruct_object, - METH_VARARGS, "Ask the local scheduler to reconstruct an object."}, - {"log_event", (PyCFunction) PyPhotonClient_log_event, METH_VARARGS, - "Log an event to the event log through the local scheduler."}, - {"notify_unblocked", (PyCFunction) PyPhotonClient_notify_unblocked, - METH_NOARGS, "Notify the local scheduler that we are unblocked."}, - {NULL} /* Sentinel */ -}; - -static PyTypeObject PyPhotonClientType = { - PyVarObject_HEAD_INIT(NULL, 0) /* ob_size */ - "photon.PhotonClient", /* tp_name */ - sizeof(PyPhotonClient), /* tp_basicsize */ - 0, /* tp_itemsize */ - (destructor) PyPhotonClient_dealloc, /* tp_dealloc */ - 0, /* tp_print */ - 0, /* tp_getattr */ - 0, /* tp_setattr */ - 0, /* tp_compare */ - 0, /* tp_repr */ - 0, /* tp_as_number */ - 0, /* tp_as_sequence */ - 0, /* tp_as_mapping */ - 0, /* tp_hash */ - 0, /* tp_call */ - 0, /* tp_str */ - 0, /* tp_getattro */ - 0, /* tp_setattro */ - 0, /* tp_as_buffer */ - Py_TPFLAGS_DEFAULT, /* tp_flags */ - "PhotonClient object", /* tp_doc */ - 0, /* tp_traverse */ - 0, /* tp_clear */ - 0, /* tp_richcompare */ - 0, /* tp_weaklistoffset */ - 0, /* tp_iter */ - 0, /* tp_iternext */ - PyPhotonClient_methods, /* tp_methods */ - 0, /* tp_members */ - 0, /* tp_getset */ - 0, /* tp_base */ - 0, /* tp_dict */ - 0, /* tp_descr_get */ - 0, /* tp_descr_set */ - 0, /* tp_dictoffset */ - (initproc) PyPhotonClient_init, /* tp_init */ - 0, /* tp_alloc */ - PyType_GenericNew, /* tp_new */ -}; - -static PyMethodDef photon_methods[] = { - {"check_simple_value", check_simple_value, METH_VARARGS, - "Should the object be passed by value?"}, - {"compute_put_id", compute_put_id, METH_VARARGS, - "Return the object ID for a put call within a task."}, - {"task_from_string", PyTask_from_string, METH_VARARGS, - "Creates a Python PyTask object from a string representation of " - "task_spec."}, - {"task_to_string", PyTask_to_string, METH_VARARGS, - "Translates a PyTask python object to a byte string."}, - {NULL} /* Sentinel */ -}; - -#if PY_MAJOR_VERSION >= 3 -static struct PyModuleDef moduledef = { - PyModuleDef_HEAD_INIT, - "libphoton", /* m_name */ - "A module for the local scheduler.", /* m_doc */ - 0, /* m_size */ - photon_methods, /* m_methods */ - NULL, /* m_reload */ - NULL, /* m_traverse */ - NULL, /* m_clear */ - NULL, /* m_free */ -}; -#endif - -#if PY_MAJOR_VERSION >= 3 -#define INITERROR return NULL -#else -#define INITERROR return -#endif - -#ifndef PyMODINIT_FUNC /* declarations for DLL import/export */ -#define PyMODINIT_FUNC void -#endif - -#if PY_MAJOR_VERSION >= 3 -#define MOD_INIT(name) PyMODINIT_FUNC PyInit_##name(void) -#else -#define MOD_INIT(name) PyMODINIT_FUNC init##name(void) -#endif - -MOD_INIT(libphoton) { - if (PyType_Ready(&PyTaskType) < 0) { - INITERROR; - } - - if (PyType_Ready(&PyObjectIDType) < 0) { - INITERROR; - } - - if (PyType_Ready(&PyPhotonClientType) < 0) { - INITERROR; - } - -#if PY_MAJOR_VERSION >= 3 - PyObject *m = PyModule_Create(&moduledef); -#else - PyObject *m = Py_InitModule3("libphoton", photon_methods, - "A module for the local scheduler."); -#endif - - init_pickle_module(); - - Py_INCREF(&PyTaskType); - PyModule_AddObject(m, "Task", (PyObject *) &PyTaskType); - - Py_INCREF(&PyObjectIDType); - PyModule_AddObject(m, "ObjectID", (PyObject *) &PyObjectIDType); - - Py_INCREF(&PyPhotonClientType); - PyModule_AddObject(m, "PhotonClient", (PyObject *) &PyPhotonClientType); - - char photon_error[] = "photon.error"; - PhotonError = PyErr_NewException(photon_error, NULL, NULL); - Py_INCREF(PhotonError); - PyModule_AddObject(m, "photon_error", PhotonError); - -#if PY_MAJOR_VERSION >= 3 - return m; -#endif -} diff --git a/webui/backend/ray_ui.py b/webui/backend/ray_ui.py index dd1241723..f12382858 100644 --- a/webui/backend/ray_ui.py +++ b/webui/backend/ray_ui.py @@ -86,7 +86,7 @@ async def handle_get_statistics(websocket, redis_conn): for client_key in client_keys: client_fields = await hgetall_as_dict(redis_conn, client_key) clients.append(client_fields) - ip_addresses = list(set([client[b"node_ip_address"].decode("ascii") for client in clients if client[b"client_type"] == b"photon"])) + ip_addresses = list(set([client[b"node_ip_address"].decode("ascii") for client in clients if client[b"client_type"] == b"local_scheduler"])) num_nodes = len(ip_addresses) reply = {"uptime": uptime, "start_date": start_date, @@ -222,7 +222,7 @@ async def send_heartbeats(websocket, redis_conn): clients = [] for client_key in client_keys: client_fields = await hgetall_as_dict(redis_conn, client_key) - if client_fields[b"client_type"] == b"photon": + if client_fields[b"client_type"] == b"local_scheduler": local_scheduler_id = hex_identifier(client_fields[b"ray_client_id"]) local_schedulers[local_scheduler_id] = {"node_ip_address": client_fields[b"node_ip_address"].decode("ascii"), "local_scheduler_socket_name": client_fields[b"local_scheduler_socket_name"].decode("ascii"),