diff --git a/.travis.yml b/.travis.yml index 323433dcb..c07b8a043 100644 --- a/.travis.yml +++ b/.travis.yml @@ -104,7 +104,6 @@ install: - bash ../../../src/ray/test/run_gcs_tests.sh # Raylet tests. - bash ../../../src/ray/test/run_object_manager_tests.sh - - bash ../../../src/ray/test/run_task_test.sh - ./src/ray/raylet/task_test - ./src/ray/raylet/worker_pool_test - ./src/ray/raylet/lineage_cache_test @@ -123,6 +122,8 @@ script: - python python/ray/local_scheduler/test/test.py - python python/ray/global_scheduler/test/test.py + - python -m pytest test/xray_test.py + - python test/runtest.py - python test/array_test.py - python test/actor_test.py diff --git a/doc/source/conf.py b/doc/source/conf.py index 2b83481fe..6accfe56b 100644 --- a/doc/source/conf.py +++ b/doc/source/conf.py @@ -38,7 +38,8 @@ MOCK_MODULES = ["gym", "ray.core.generated.TaskInfo", "ray.core.generated.TaskReply", "ray.core.generated.ResultTableReply", - "ray.core.generated.TaskExecutionDependencies"] + "ray.core.generated.TaskExecutionDependencies", + "ray.core.generated.ClientTableData"] for mod_name in MOCK_MODULES: sys.modules[mod_name] = mock.Mock() diff --git a/python/ray/scripts/scripts.py b/python/ray/scripts/scripts.py index 5dbc79ddd..af07a2420 100644 --- a/python/ray/scripts/scripts.py +++ b/python/ray/scripts/scripts.py @@ -86,11 +86,13 @@ def cli(): help="enable support for huge pages in the object store") @click.option("--autoscaling-config", required=False, type=str, help="the file that contains the autoscaling config") +@click.option("--use-raylet", is_flag=True, default=False, + help="use the raylet code path, this is not supported yet") def start(node_ip_address, redis_address, redis_port, num_redis_shards, redis_max_clients, redis_shard_ports, object_manager_port, object_store_memory, num_workers, num_cpus, num_gpus, resources, head, no_ui, block, plasma_directory, huge_pages, - autoscaling_config): + autoscaling_config, use_raylet): # Convert hostnames to numerical IP 
address. if node_ip_address is not None: node_ip_address = services.address_to_ip(node_ip_address) @@ -161,7 +163,8 @@ def start(node_ip_address, redis_address, redis_port, num_redis_shards, include_webui=(not no_ui), plasma_directory=plasma_directory, huge_pages=huge_pages, - autoscaling_config=autoscaling_config) + autoscaling_config=autoscaling_config, + use_raylet=use_raylet) print(address_info) print("\nStarted Ray on this node. You can add additional nodes to " "the cluster by calling\n\n" @@ -227,7 +230,8 @@ def start(node_ip_address, redis_address, redis_port, num_redis_shards, redirect_output=True, resources=resources, plasma_directory=plasma_directory, - huge_pages=huge_pages) + huge_pages=huge_pages, + use_raylet=use_raylet) print(address_info) print("\nStarted Ray on this node. If you wish to terminate the " "processes that have been started, run\n\n" @@ -242,7 +246,7 @@ def start(node_ip_address, redis_address, redis_port, num_redis_shards, @click.command() def stop(): subprocess.call(["killall global_scheduler plasma_store plasma_manager " - "local_scheduler"], shell=True) + "local_scheduler raylet"], shell=True) # Find the PID of the monitor process and kill it. 
subprocess.call(["kill $(ps aux | grep monitor.py | grep -v grep | " diff --git a/python/ray/services.py b/python/ray/services.py index 1209047d3..ab1ce70e9 100644 --- a/python/ray/services.py +++ b/python/ray/services.py @@ -28,6 +28,7 @@ import ray.global_scheduler as global_scheduler PROCESS_TYPE_MONITOR = "monitor" PROCESS_TYPE_LOG_MONITOR = "log_monitor" PROCESS_TYPE_WORKER = "worker" +PROCESS_TYPE_RAYLET = "raylet" PROCESS_TYPE_LOCAL_SCHEDULER = "local_scheduler" PROCESS_TYPE_PLASMA_MANAGER = "plasma_manager" PROCESS_TYPE_PLASMA_STORE = "plasma_store" @@ -43,6 +44,7 @@ PROCESS_TYPE_WEB_UI = "web_ui" all_processes = OrderedDict([(PROCESS_TYPE_MONITOR, []), (PROCESS_TYPE_LOG_MONITOR, []), (PROCESS_TYPE_WORKER, []), + (PROCESS_TYPE_RAYLET, []), (PROCESS_TYPE_LOCAL_SCHEDULER, []), (PROCESS_TYPE_PLASMA_MANAGER, []), (PROCESS_TYPE_PLASMA_STORE, []), @@ -51,6 +53,7 @@ all_processes = OrderedDict([(PROCESS_TYPE_MONITOR, []), (PROCESS_TYPE_WEB_UI, [])],) # True if processes are run in the valgrind profiler. +RUN_RAYLET_PROFILER = False RUN_LOCAL_SCHEDULER_PROFILER = False RUN_PLASMA_MANAGER_PROFILER = False RUN_PLASMA_STORE_PROFILER = False @@ -74,6 +77,10 @@ CREDIS_MEMBER_MODULE = os.path.join( os.path.abspath(os.path.dirname(__file__)), "core/src/credis/build/src/libmember.so") +# Location of the raylet executable. +RAYLET_EXECUTABLE = os.path.join( + os.path.abspath(os.path.dirname(__file__)), + "core/src/ray/raylet/raylet") # ObjectStoreAddress tuples contain all information necessary to connect to an # object store. The fields are: @@ -123,8 +130,8 @@ def kill_process(p): if p.poll() is not None: # The process has already terminated. return True - if any([RUN_LOCAL_SCHEDULER_PROFILER, RUN_PLASMA_MANAGER_PROFILER, - RUN_PLASMA_STORE_PROFILER]): + if any([RUN_RAYLET_PROFILER, RUN_LOCAL_SCHEDULER_PROFILER, + RUN_PLASMA_MANAGER_PROFILER, RUN_PLASMA_STORE_PROFILER]): # Give process signal to write profiler data. 
os.kill(p.pid, signal.SIGINT) # Wait for profiling data to be written. @@ -860,12 +867,73 @@ def start_local_scheduler(redis_address, return local_scheduler_name +def start_raylet(redis_address, + node_ip_address, + plasma_store_name, + worker_path, + stdout_file=None, + stderr_file=None, + cleanup=True): + """Start a raylet, which is a combined local scheduler and object manager. + + Args: + redis_address (str): The address of the Redis instance. + node_ip_address (str): The IP address of the node that this local + scheduler is running on. + plasma_store_name (str): The name of the plasma store socket to connect + to. + worker_path (str): The path of the script to use when the local + scheduler starts up new workers. + stdout_file: A file handle opened for writing to redirect stdout to. If + no redirection should happen, then this should be None. + stderr_file: A file handle opened for writing to redirect stderr to. If + no redirection should happen, then this should be None. + cleanup (bool): True if using Ray in local mode. If cleanup is true, + then this process will be killed by services.cleanup() when the + Python process that imported services exits. + + Returns: + The raylet socket name. + """ + gcs_ip_address, gcs_port = redis_address.split(":") + raylet_name = "/tmp/raylet{}".format(random_name()) + + # Create the command that the Raylet will use to start workers. 
+ start_worker_command = ("{} {} " + "--node-ip-address={} " + "--object-store-name={} " + "--raylet-name={} " + "--redis-address={}" + .format(sys.executable, + worker_path, + node_ip_address, + plasma_store_name, + raylet_name, + redis_address)) + + command = [RAYLET_EXECUTABLE, + raylet_name, + plasma_store_name, + node_ip_address, + gcs_ip_address, + gcs_port, + start_worker_command] + pid = subprocess.Popen(command, stdout=stdout_file, stderr=stderr_file) + + if cleanup: + all_processes[PROCESS_TYPE_RAYLET].append(pid) + record_log_files_in_redis(redis_address, node_ip_address, + [stdout_file, stderr_file]) + + return raylet_name + + def start_objstore(node_ip_address, redis_address, object_manager_port=None, store_stdout_file=None, store_stderr_file=None, manager_stdout_file=None, manager_stderr_file=None, objstore_memory=None, cleanup=True, plasma_directory=None, - huge_pages=False): + huge_pages=False, use_raylet=False): """This method starts an object store process. Args: @@ -893,6 +961,8 @@ def start_objstore(node_ip_address, redis_address, be created. huge_pages: Boolean flag indicating whether to start the Object Store with hugetlbfs support. Requires plasma_directory. + use_raylet: True if the new raylet code path should be used. This is + not supported yet. Return: A tuple of the Plasma store socket name, the Plasma manager socket @@ -936,33 +1006,41 @@ def start_objstore(node_ip_address, redis_address, plasma_directory=plasma_directory, huge_pages=huge_pages) # Start the plasma manager. 
- if object_manager_port is not None: - (plasma_manager_name, p2, - plasma_manager_port) = ray.plasma.start_plasma_manager( - plasma_store_name, - redis_address, - plasma_manager_port=object_manager_port, - node_ip_address=node_ip_address, - num_retries=1, - run_profiler=RUN_PLASMA_MANAGER_PROFILER, - stdout_file=manager_stdout_file, - stderr_file=manager_stderr_file) - assert plasma_manager_port == object_manager_port + if not use_raylet: + if object_manager_port is not None: + (plasma_manager_name, p2, + plasma_manager_port) = ray.plasma.start_plasma_manager( + plasma_store_name, + redis_address, + plasma_manager_port=object_manager_port, + node_ip_address=node_ip_address, + num_retries=1, + run_profiler=RUN_PLASMA_MANAGER_PROFILER, + stdout_file=manager_stdout_file, + stderr_file=manager_stderr_file) + assert plasma_manager_port == object_manager_port + else: + (plasma_manager_name, p2, + plasma_manager_port) = ray.plasma.start_plasma_manager( + plasma_store_name, + redis_address, + node_ip_address=node_ip_address, + run_profiler=RUN_PLASMA_MANAGER_PROFILER, + stdout_file=manager_stdout_file, + stderr_file=manager_stderr_file) else: - (plasma_manager_name, p2, - plasma_manager_port) = ray.plasma.start_plasma_manager( - plasma_store_name, - redis_address, - node_ip_address=node_ip_address, - run_profiler=RUN_PLASMA_MANAGER_PROFILER, - stdout_file=manager_stdout_file, - stderr_file=manager_stderr_file) + plasma_manager_port = None + plasma_manager_name = None + if cleanup: all_processes[PROCESS_TYPE_PLASMA_STORE].append(p1) - all_processes[PROCESS_TYPE_PLASMA_MANAGER].append(p2) record_log_files_in_redis(redis_address, node_ip_address, - [store_stdout_file, store_stderr_file, - manager_stdout_file, manager_stderr_file]) + [store_stdout_file, store_stderr_file]) + if not use_raylet: + if cleanup: + all_processes[PROCESS_TYPE_PLASMA_MANAGER].append(p2) + record_log_files_in_redis(redis_address, node_ip_address, + [manager_stdout_file, manager_stderr_file]) return 
ObjectStoreAddress(plasma_store_name, plasma_manager_name, plasma_manager_port) @@ -1059,7 +1137,8 @@ def start_ray_processes(address_info=None, resources=None, plasma_directory=None, huge_pages=False, - autoscaling_config=None): + autoscaling_config=None, + use_raylet=False): """Helper method to start Ray processes. Args: @@ -1112,6 +1191,8 @@ def start_ray_processes(address_info=None, huge_pages: Boolean flag indicating whether to start the Object Store with hugetlbfs support. Requires plasma_directory. autoscaling_config: path to autoscaling config file. + use_raylet: True if the new raylet code path should be used. This is + not supported yet. Returns: A dictionary of the address information for the processes that were @@ -1193,7 +1274,7 @@ def start_ray_processes(address_info=None, cleanup=cleanup) # Start the global scheduler, if necessary. - if include_global_scheduler: + if include_global_scheduler and not use_raylet: global_scheduler_stdout_file, global_scheduler_stderr_file = ( new_log_files("global_scheduler", redirect_output)) start_global_scheduler(redis_address, @@ -1235,71 +1316,90 @@ def start_ray_processes(address_info=None, manager_stderr_file=plasma_manager_stderr_file, objstore_memory=object_store_memory, cleanup=cleanup, plasma_directory=plasma_directory, - huge_pages=huge_pages) + huge_pages=huge_pages, + use_raylet=use_raylet) object_store_addresses.append(object_store_address) time.sleep(0.1) # Start any local schedulers that do not yet exist. - for i in range(len(local_scheduler_socket_names), num_local_schedulers): - # Connect the local scheduler to the object store at the same index. - object_store_address = object_store_addresses[i] - plasma_address = "{}:{}".format(node_ip_address, - object_store_address.manager_port) - # Determine how many workers this local scheduler should start. 
- if start_workers_from_local_scheduler: - num_local_scheduler_workers = workers_per_local_scheduler[i] - workers_per_local_scheduler[i] = 0 - else: - # If we're starting the workers from Python, the local scheduler - # should not start any workers. - num_local_scheduler_workers = 0 - # Start the local scheduler. Note that if we do not wish to redirect - # the worker output, then we cannot redirect the local scheduler - # output. - local_scheduler_stdout_file, local_scheduler_stderr_file = ( - new_log_files("local_scheduler_{}".format(i), - redirect_output=redirect_worker_output)) - local_scheduler_name = start_local_scheduler( + if not use_raylet: + for i in range(len(local_scheduler_socket_names), + num_local_schedulers): + # Connect the local scheduler to the object store at the same + # index. + object_store_address = object_store_addresses[i] + plasma_address = "{}:{}".format(node_ip_address, + object_store_address.manager_port) + # Determine how many workers this local scheduler should start. + if start_workers_from_local_scheduler: + num_local_scheduler_workers = workers_per_local_scheduler[i] + workers_per_local_scheduler[i] = 0 + else: + # If we're starting the workers from Python, the local + # scheduler should not start any workers. + num_local_scheduler_workers = 0 + # Start the local scheduler. Note that if we do not wish to + # redirect the worker output, then we cannot redirect the local + # scheduler output. 
+ local_scheduler_stdout_file, local_scheduler_stderr_file = ( + new_log_files("local_scheduler_{}".format(i), + redirect_output=redirect_worker_output)) + local_scheduler_name = start_local_scheduler( + redis_address, + node_ip_address, + object_store_address.name, + object_store_address.manager_name, + worker_path, + plasma_address=plasma_address, + stdout_file=local_scheduler_stdout_file, + stderr_file=local_scheduler_stderr_file, + cleanup=cleanup, + resources=resources[i], + num_workers=num_local_scheduler_workers) + local_scheduler_socket_names.append(local_scheduler_name) + + # Make sure that we have exactly num_local_schedulers instances of + # object stores and local schedulers. + assert len(object_store_addresses) == num_local_schedulers + assert len(local_scheduler_socket_names) == num_local_schedulers + + else: + # Start the raylet. TODO(rkn): Modify this to allow starting + # multiple raylets on the same machine. + raylet_stdout_file, raylet_stderr_file = ( + new_log_files("raylet_{}".format(i), + redirect_output=redirect_output)) + address_info["raylet_socket_name"] = start_raylet( redis_address, node_ip_address, - object_store_address.name, - object_store_address.manager_name, + object_store_addresses[i].name, worker_path, - plasma_address=plasma_address, - stdout_file=local_scheduler_stdout_file, - stderr_file=local_scheduler_stderr_file, - cleanup=cleanup, - resources=resources[i], - num_workers=num_local_scheduler_workers) - local_scheduler_socket_names.append(local_scheduler_name) - time.sleep(0.1) + stdout_file=None, + stderr_file=None, + cleanup=cleanup) - # Make sure that we have exactly num_local_schedulers instances of object - # stores and local schedulers. - assert len(object_store_addresses) == num_local_schedulers - assert len(local_scheduler_socket_names) == num_local_schedulers + if not use_raylet: + # Start any workers that the local scheduler has not already started. 
+ for i, num_local_scheduler_workers in enumerate( + workers_per_local_scheduler): + object_store_address = object_store_addresses[i] + local_scheduler_name = local_scheduler_socket_names[i] + for j in range(num_local_scheduler_workers): + worker_stdout_file, worker_stderr_file = new_log_files( + "worker_{}_{}".format(i, j), redirect_output) + start_worker(node_ip_address, + object_store_address.name, + object_store_address.manager_name, + local_scheduler_name, + redis_address, + worker_path, + stdout_file=worker_stdout_file, + stderr_file=worker_stderr_file, + cleanup=cleanup) + workers_per_local_scheduler[i] -= 1 - # Start any workers that the local scheduler has not already started. - for i, num_local_scheduler_workers in enumerate( - workers_per_local_scheduler): - object_store_address = object_store_addresses[i] - local_scheduler_name = local_scheduler_socket_names[i] - for j in range(num_local_scheduler_workers): - worker_stdout_file, worker_stderr_file = new_log_files( - "worker_{}_{}".format(i, j), redirect_output) - start_worker(node_ip_address, - object_store_address.name, - object_store_address.manager_name, - local_scheduler_name, - redis_address, - worker_path, - stdout_file=worker_stdout_file, - stderr_file=worker_stderr_file, - cleanup=cleanup) - workers_per_local_scheduler[i] -= 1 - - # Make sure that we've started all the workers. - assert(sum(workers_per_local_scheduler) == 0) + # Make sure that we've started all the workers. + assert(sum(workers_per_local_scheduler) == 0) # Try to start the web UI. if include_webui: @@ -1327,7 +1427,8 @@ def start_ray_node(node_ip_address, redirect_output=False, resources=None, plasma_directory=None, - huge_pages=False): + huge_pages=False, + use_raylet=False): """Start the Ray processes for a single node. This assumes that the Ray processes on some master node have already been @@ -1360,6 +1461,8 @@ def start_ray_node(node_ip_address, be created. 
huge_pages: Boolean flag indicating whether to start the Object Store with hugetlbfs support. Requires plasma_directory. + use_raylet: True if the new raylet code path should be used. This is + not supported yet. Returns: A dictionary of the address information for the processes that were @@ -1400,7 +1503,8 @@ def start_ray_head(address_info=None, include_webui=True, plasma_directory=None, huge_pages=False, - autoscaling_config=None): + autoscaling_config=None, + use_raylet=False): """Start Ray in local mode. Args: @@ -1447,6 +1551,8 @@ def start_ray_head(address_info=None, huge_pages: Boolean flag indicating whether to start the Object Store with hugetlbfs support. Requires plasma_directory. autoscaling_config: path to autoscaling config file. + use_raylet: True if the new raylet code path should be used. This is + not supported yet. Returns: A dictionary of the address information for the processes that were @@ -1474,7 +1580,8 @@ def start_ray_head(address_info=None, redis_max_clients=redis_max_clients, plasma_directory=plasma_directory, huge_pages=huge_pages, - autoscaling_config=autoscaling_config) + autoscaling_config=autoscaling_config, + use_raylet=use_raylet) def try_to_create_directory(directory_path): diff --git a/python/ray/worker.py b/python/ray/worker.py index 33f650b4b..ee4ecbedf 100644 --- a/python/ray/worker.py +++ b/python/ray/worker.py @@ -31,6 +31,9 @@ import ray.plasma from ray.utils import (FunctionProperties, random_string, binary_to_hex, is_cython) +# Import flatbuffer bindings. +from ray.core.generated.ClientTableData import ClientTableData + SCRIPT_MODE = 0 WORKER_MODE = 1 PYTHON_MODE = 2 @@ -50,6 +53,7 @@ NIL_LOCAL_SCHEDULER_ID = NIL_ID NIL_FUNCTION_ID = NIL_ID NIL_ACTOR_ID = NIL_ID NIL_ACTOR_HANDLE_ID = NIL_ID +NIL_CLIENT_ID = 20 * b"\xff" # This must be kept in sync with the `error_types` array in # common/state/error_table.h. 
@@ -452,9 +456,12 @@ class Worker(object): for object_id in object_ids] for i in range(0, len(object_ids), ray._config.worker_fetch_request_size()): - self.plasma_client.fetch( - plain_object_ids[i:(i + - ray._config.worker_fetch_request_size())]) + if not self.use_raylet: + self.plasma_client.fetch( + plain_object_ids + [i:(i + ray._config.worker_fetch_request_size())]) + else: + print("plasma_client.fetch has not been implemented yet") # Get the objects. We initially try to get the objects immediately. final_results = self.retrieve_and_deserialize(plain_object_ids, 0) @@ -478,9 +485,12 @@ class Worker(object): plasma.ObjectID, unready_ids.keys())) for i in range(0, len(object_ids_to_fetch), ray._config.worker_fetch_request_size()): - self.plasma_client.fetch( - object_ids_to_fetch[i:( - i + ray._config.worker_fetch_request_size())]) + if not self.use_raylet: + self.plasma_client.fetch( + object_ids_to_fetch[i:( + i + ray._config.worker_fetch_request_size())]) + else: + print("plasma_client.fetch has not been implemented yet") results = self.retrieve_and_deserialize( object_ids_to_fetch, max([ray._config.get_timeout_milliseconds(), @@ -496,7 +506,7 @@ class Worker(object): # If there were objects that we weren't able to get locally, let the # local scheduler know that we're now unblocked. - if was_blocked: + if was_blocked and not self.use_raylet: self.local_scheduler_client.notify_unblocked() assert len(final_results) == len(object_ids) @@ -1150,70 +1160,108 @@ def _initialize_serialization(worker=global_worker): use_dict=True) -def get_address_info_from_redis_helper(redis_address, node_ip_address): +def get_address_info_from_redis_helper(redis_address, node_ip_address, + use_raylet=False): redis_ip_address, redis_port = redis_address.split(":") # For this command to work, some other client (on the same machine as # Redis) must have run "CONFIG SET protected-mode no". 
redis_client = redis.StrictRedis(host=redis_ip_address, port=int(redis_port)) - # The client table prefix must be kept in sync with the file - # "src/common/redis_module/ray_redis_module.cc" where it is defined. - REDIS_CLIENT_TABLE_PREFIX = "CL:" - client_keys = redis_client.keys("{}*".format(REDIS_CLIENT_TABLE_PREFIX)) - # Filter to live clients on the same node and do some basic checking. - plasma_managers = [] - local_schedulers = [] - for key in client_keys: - info = redis_client.hgetall(key) - # Ignore clients that were deleted. - deleted = info[b"deleted"] - deleted = bool(int(deleted)) - if deleted: - continue + if not use_raylet: + # The client table prefix must be kept in sync with the file + # "src/common/redis_module/ray_redis_module.cc" where it is defined. + REDIS_CLIENT_TABLE_PREFIX = "CL:" + client_keys = redis_client.keys( + "{}*".format(REDIS_CLIENT_TABLE_PREFIX)) + # Filter to live clients on the same node and do some basic checking. + plasma_managers = [] + local_schedulers = [] + for key in client_keys: + info = redis_client.hgetall(key) - assert b"ray_client_id" in info - assert b"node_ip_address" in info - assert b"client_type" in info - client_node_ip_address = info[b"node_ip_address"].decode("ascii") - if (client_node_ip_address == node_ip_address or - (client_node_ip_address == "127.0.0.1" and - redis_ip_address == ray.services.get_node_ip_address())): - if info[b"client_type"].decode("ascii") == "plasma_manager": - plasma_managers.append(info) - elif info[b"client_type"].decode("ascii") == "local_scheduler": - local_schedulers.append(info) - # Make sure that we got at least one plasma manager and local scheduler. - assert len(plasma_managers) >= 1 - assert len(local_schedulers) >= 1 - # Build the address information. 
- object_store_addresses = [] - for manager in plasma_managers: - address = manager[b"manager_address"].decode("ascii") - port = services.get_port(address) - object_store_addresses.append( - services.ObjectStoreAddress( - name=manager[b"store_socket_name"].decode("ascii"), - manager_name=manager[b"manager_socket_name"].decode("ascii"), - manager_port=port)) - scheduler_names = [ - scheduler[b"local_scheduler_socket_name"].decode("ascii") - for scheduler in local_schedulers] - client_info = {"node_ip_address": node_ip_address, - "redis_address": redis_address, - "object_store_addresses": object_store_addresses, - "local_scheduler_socket_names": scheduler_names, - # Web UI should be running. - "webui_url": _webui_url_helper(redis_client)} - return client_info + # Ignore clients that were deleted. + deleted = info[b"deleted"] + deleted = bool(int(deleted)) + if deleted: + continue + + assert b"ray_client_id" in info + assert b"node_ip_address" in info + assert b"client_type" in info + client_node_ip_address = info[b"node_ip_address"].decode("ascii") + if (client_node_ip_address == node_ip_address or + (client_node_ip_address == "127.0.0.1" and + redis_ip_address == ray.services.get_node_ip_address())): + if info[b"client_type"].decode("ascii") == "plasma_manager": + plasma_managers.append(info) + elif info[b"client_type"].decode("ascii") == "local_scheduler": + local_schedulers.append(info) + # Make sure that we got at least one plasma manager and local + # scheduler. + assert len(plasma_managers) >= 1 + assert len(local_schedulers) >= 1 + # Build the address information. 
+ object_store_addresses = [] + for manager in plasma_managers: + address = manager[b"manager_address"].decode("ascii") + port = services.get_port(address) + object_store_addresses.append( + services.ObjectStoreAddress( + name=manager[b"store_socket_name"].decode("ascii"), + manager_name=manager[b"manager_socket_name"].decode( + "ascii"), + manager_port=port)) + scheduler_names = [ + scheduler[b"local_scheduler_socket_name"].decode("ascii") + for scheduler in local_schedulers] + client_info = {"node_ip_address": node_ip_address, + "redis_address": redis_address, + "object_store_addresses": object_store_addresses, + "local_scheduler_socket_names": scheduler_names, + # Web UI should be running. + "webui_url": _webui_url_helper(redis_client)} + return client_info + + # Handle the raylet case. + else: + # In the raylet code path, all client data is stored in a zset at the + # key for the nil client. + client_key = b"CLIENT:" + NIL_CLIENT_ID + clients = redis_client.zrange(client_key, 0, -1) + raylets = [] + for client_message in clients: + client = ClientTableData.GetRootAsClientTableData(client_message, + 0) + client_node_ip_address = client.NodeManagerAddress().decode( + "ascii") + if (client_node_ip_address == node_ip_address or + (client_node_ip_address == "127.0.0.1" and + redis_ip_address == ray.services.get_node_ip_address())): + raylets.append(client) + + # TODO(rkn): The ObjectStoreSocketName field does not exist. + object_store_addresses = [ + raylet.ObjectStoreSocketName().decode("ascii") + for raylet in raylets] + raylet_socket_names = [raylet.NodeManagerAddress().decode("ascii") for + raylet in raylets] + return {"node_ip_address": node_ip_address, + "redis_address": redis_address, + "object_store_addresses": object_store_addresses, + "raylet_socket_names": raylet_socket_names, + # Web UI should be running. 
+ "webui_url": _webui_url_helper(redis_client)} -def get_address_info_from_redis(redis_address, node_ip_address, num_retries=5): +def get_address_info_from_redis(redis_address, node_ip_address, num_retries=5, + use_raylet=False): counter = 0 while True: try: return get_address_info_from_redis_helper(redis_address, - node_ip_address) + node_ip_address, + use_raylet=use_raylet) except Exception as e: if counter == num_retries: raise @@ -1281,7 +1329,8 @@ def _init(address_info=None, redis_max_clients=None, plasma_directory=None, huge_pages=False, - include_webui=True): + include_webui=True, + use_raylet=False): """Helper method to connect to an existing Ray cluster or start a new one. This method handles two cases. Either a Ray cluster already exists and we @@ -1336,6 +1385,8 @@ def _init(address_info=None, Store with hugetlbfs support. Requires plasma_directory. include_webui: Boolean flag indicating whether to start the web UI, which is a Jupyter notebook. + use_raylet: True if the new raylet code path should be used. This is + not supported yet. Returns: Address information about the started processes. @@ -1402,7 +1453,8 @@ def _init(address_info=None, redis_max_clients=redis_max_clients, plasma_directory=plasma_directory, huge_pages=huge_pages, - include_webui=include_webui) + include_webui=include_webui, + use_raylet=use_raylet) else: if redis_address is None: raise Exception("When connecting to an existing cluster, " @@ -1439,7 +1491,8 @@ def _init(address_info=None, node_ip_address = services.get_node_ip_address(redis_address) # Get the address info of the processes to connect to from Redis. address_info = get_address_info_from_redis(redis_address, - node_ip_address) + node_ip_address, + use_raylet=use_raylet) # Connect this driver to Redis, the object store, and the local scheduler. # Choose the first object store and local scheduler if there are multiple. 
@@ -1453,13 +1506,17 @@ def _init(address_info=None, "redis_address": address_info["redis_address"], "store_socket_name": ( address_info["object_store_addresses"][0].name), - "manager_socket_name": ( - address_info["object_store_addresses"][0].manager_name), - "local_scheduler_socket_name": ( - address_info["local_scheduler_socket_names"][0]), "webui_url": address_info["webui_url"]} + if not use_raylet: + driver_address_info["manager_socket_name"] = ( + address_info["object_store_addresses"][0].manager_name) + driver_address_info["local_scheduler_socket_name"] = ( + address_info["local_scheduler_socket_names"][0]) + else: + driver_address_info["raylet_socket_name"] = ( + address_info["raylet_socket_name"]) connect(driver_address_info, object_id_seed=object_id_seed, - mode=driver_mode, worker=global_worker) + mode=driver_mode, worker=global_worker, use_raylet=use_raylet) return address_info @@ -1469,7 +1526,8 @@ def init(redis_address=None, node_ip_address=None, object_id_seed=None, num_cpus=None, num_gpus=None, resources=None, num_custom_resource=None, num_redis_shards=None, redis_max_clients=None, plasma_directory=None, - huge_pages=False, include_webui=True, object_store_memory=None): + huge_pages=False, include_webui=True, object_store_memory=None, + use_raylet=False): """Connect to an existing Ray cluster or start one and connect to it. This method handles two cases. Either a Ray cluster already exists and we @@ -1513,6 +1571,9 @@ def init(redis_address=None, node_ip_address=None, object_id_seed=None, UI, which is a Jupyter notebook. object_store_memory: The amount of memory (in bytes) to start the object store with. + use_raylet: True if the new raylet code path should be used. This is + not supported yet. + Returns: Address information about the started processes. 
@@ -1539,7 +1600,8 @@ def init(redis_address=None, node_ip_address=None, object_id_seed=None, plasma_directory=plasma_directory, huge_pages=huge_pages, include_webui=include_webui, - object_store_memory=object_store_memory) + object_store_memory=object_store_memory, + use_raylet=use_raylet) def cleanup(worker=global_worker): @@ -1818,7 +1880,8 @@ def import_thread(worker, mode): pass -def connect(info, object_id_seed=None, mode=WORKER_MODE, worker=global_worker): +def connect(info, object_id_seed=None, mode=WORKER_MODE, worker=global_worker, + use_raylet=False): """Connect this worker to the local scheduler, to Plasma, and to Redis. Args: @@ -1828,6 +1891,8 @@ def connect(info, object_id_seed=None, mode=WORKER_MODE, worker=global_worker): deterministic. mode: The mode of the worker. One of SCRIPT_MODE, WORKER_MODE, PYTHON_MODE, and SILENT_MODE. + use_raylet: True if the new raylet code path should be used. This is + not supported yet. """ check_main_thread() # Do some basic checking to make sure we didn't call ray.init twice. @@ -1842,6 +1907,7 @@ def connect(info, object_id_seed=None, mode=WORKER_MODE, worker=global_worker): worker.actor_id = NIL_ACTOR_ID worker.connected = True worker.set_mode(mode) + worker.use_raylet = use_raylet # The worker.events field is used to aggregate logging information and # display it in the web UI. 
Note that Python lists protected by the GIL, # which is important because we will append to this field from multiple @@ -1909,8 +1975,9 @@ def connect(info, object_id_seed=None, mode=WORKER_MODE, worker=global_worker): "driver_id": worker.worker_id, "start_time": time.time(), "plasma_store_socket": info["store_socket_name"], - "plasma_manager_socket": info["manager_socket_name"], - "local_scheduler_socket": info["local_scheduler_socket_name"]} + "plasma_manager_socket": info.get("manager_socket_name"), + "local_scheduler_socket": info.get("local_scheduler_socket_name"), + "raylet_socket": info.get("raylet_socket_name")} driver_info["name"] = (main.__file__ if hasattr(main, "__file__") else "INTERACTIVE MODE") worker.redis_client.hmset(b"Drivers:" + worker.worker_id, driver_info) @@ -1933,11 +2000,22 @@ def connect(info, object_id_seed=None, mode=WORKER_MODE, worker=global_worker): raise Exception("This code should be unreachable.") # Create an object store client. - worker.plasma_client = plasma.connect(info["store_socket_name"], - info["manager_socket_name"], - 64) + if not worker.use_raylet: + worker.plasma_client = plasma.connect(info["store_socket_name"], + info["manager_socket_name"], + 64) + else: + worker.plasma_client = plasma.connect(info["store_socket_name"], + "", + 64) + + if not worker.use_raylet: + local_scheduler_socket = info["local_scheduler_socket_name"] + else: + local_scheduler_socket = info["raylet_socket_name"] + worker.local_scheduler_client = ray.local_scheduler.LocalSchedulerClient( - info["local_scheduler_socket_name"], worker.worker_id, is_worker) + local_scheduler_socket, worker.worker_id, is_worker) # If this is a driver, set the current task ID, the task driver ID, and set # the task index to 0. 
@@ -2275,9 +2353,10 @@ def flush_log(worker=global_worker): """Send the logged worker events to the global state store.""" event_log_key = b"event_log:" + worker.worker_id event_log_value = json.dumps(worker.events) - worker.local_scheduler_client.log_event(event_log_key, - event_log_value, - time.time()) + if not worker.use_raylet: + worker.local_scheduler_client.log_event(event_log_key, + event_log_value, + time.time()) worker.events = [] @@ -2367,6 +2446,9 @@ def wait(object_ids, num_returns=1, timeout=None, worker=global_worker): A list of object IDs that are ready and a list of the remaining object IDs. """ + if worker.use_raylet: + print("plasma_client.wait has not been implemented yet") + return if isinstance(object_ids, ray.local_scheduler.ObjectID): raise TypeError( diff --git a/python/ray/workers/default_worker.py b/python/ray/workers/default_worker.py index 8551f118f..56f472ef2 100644 --- a/python/ray/workers/default_worker.py +++ b/python/ray/workers/default_worker.py @@ -16,10 +16,12 @@ parser.add_argument("--redis-address", required=True, type=str, help="the address to use for Redis") parser.add_argument("--object-store-name", required=True, type=str, help="the object store's name") -parser.add_argument("--object-store-manager-name", required=True, type=str, +parser.add_argument("--object-store-manager-name", required=False, type=str, help="the object store manager's name") -parser.add_argument("--local-scheduler-name", required=True, type=str, +parser.add_argument("--local-scheduler-name", required=False, type=str, help="the local scheduler's name") +parser.add_argument("--raylet-name", required=False, type=str, + help="the raylet's name") if __name__ == "__main__": @@ -29,9 +31,11 @@ if __name__ == "__main__": "redis_address": args.redis_address, "store_socket_name": args.object_store_name, "manager_socket_name": args.object_store_manager_name, - "local_scheduler_socket_name": args.local_scheduler_name} + "local_scheduler_socket_name": 
args.local_scheduler_name, + "raylet_socket_name": args.raylet_name} - ray.worker.connect(info, mode=ray.WORKER_MODE) + ray.worker.connect(info, mode=ray.WORKER_MODE, + use_raylet=(args.raylet_name is not None)) error_explanation = """ This error is unexpected and should not have happened. Somehow a worker diff --git a/python/setup.py b/python/setup.py index 093c73917..2dbbffc22 100644 --- a/python/setup.py +++ b/python/setup.py @@ -23,6 +23,7 @@ ray_files = [ "ray/core/src/local_scheduler/local_scheduler", "ray/core/src/local_scheduler/liblocal_scheduler_library.so", "ray/core/src/global_scheduler/global_scheduler", + "ray/core/src/ray/raylet/raylet", "ray/WebUI.ipynb" ] diff --git a/src/ray/gcs/CMakeLists.txt b/src/ray/gcs/CMakeLists.txt index a3df33102..015051971 100644 --- a/src/ray/gcs/CMakeLists.txt +++ b/src/ray/gcs/CMakeLists.txt @@ -19,6 +19,15 @@ add_custom_command( add_custom_target(gen_gcs_fbs DEPENDS ${GCS_FBS_OUTPUT_FILES}) +# Generate Python bindings for the flatbuffers objects. 
+set(PYTHON_OUTPUT_DIR ${CMAKE_BINARY_DIR}/generated/) +add_custom_command( + TARGET gen_gcs_fbs + COMMAND ${FLATBUFFERS_COMPILER} -p -o ${PYTHON_OUTPUT_DIR} ${GCS_FBS_SRC} + DEPENDS ${FBS_DEPENDS} + COMMENT "Running flatc compiler on ${GCS_FBS_SRC}" + VERBATIM) + ADD_RAY_TEST(client_test STATIC_LINK_LIBS ray_static ${PLASMA_STATIC_LIB} ${ARROW_STATIC_LIB} gtest gtest_main pthread ${Boost_SYSTEM_LIBRARY}) ADD_RAY_TEST(asio_test STATIC_LINK_LIBS ray_static ${PLASMA_STATIC_LIB} ${ARROW_STATIC_LIB} gtest gtest_main pthread ${Boost_SYSTEM_LIBRARY}) diff --git a/src/ray/python/default_worker.py b/src/ray/python/default_worker.py deleted file mode 100644 index 877c6220e..000000000 --- a/src/ray/python/default_worker.py +++ /dev/null @@ -1,18 +0,0 @@ -from __future__ import absolute_import -from __future__ import division -from __future__ import print_function - -import argparse - -from worker import Worker - -parser = argparse.ArgumentParser() -parser.add_argument("raylet_socket_name") -parser.add_argument("object_store_socket_name") - -if __name__ == '__main__': - args = parser.parse_args() - - worker = Worker(args.raylet_socket_name, args.object_store_socket_name, - is_worker=True) - worker.main_loop() diff --git a/src/ray/python/one_test_driver.py b/src/ray/python/one_test_driver.py deleted file mode 100644 index b18dc5b20..000000000 --- a/src/ray/python/one_test_driver.py +++ /dev/null @@ -1,33 +0,0 @@ -import argparse - -import ray -from worker import Worker, logger -from ray.utils import random_string - - -parser = argparse.ArgumentParser() -parser.add_argument("raylet_socket_name") -parser.add_argument("object_store_socket_name") - -if __name__ == '__main__': - args = parser.parse_args() - - driver = Worker(args.raylet_socket_name, args.object_store_socket_name, - is_worker=False) - - task1 = ray.local_scheduler.Task( - ray.local_scheduler.ObjectID(random_string()), - ray.local_scheduler.ObjectID(random_string()), - [], - 1, - 
ray.local_scheduler.ObjectID(random_string()), - 0) - logger.debug("submitting", task1.task_id()) - driver.node_manager_client.submit(task1) - - logger.debug("Return values were", task1.returns()) - print("[DRIVER] Return values were", task1.returns()) - # Make sure the tasks get executed and we can get the result of the - # last task - obj = driver.get(task1.returns(), timeout_ms=1000) - print("[DRIVER]: task1 driver.get result ", obj) diff --git a/src/ray/python/test_driver.py b/src/ray/python/test_driver.py deleted file mode 100644 index 5850f4d78..000000000 --- a/src/ray/python/test_driver.py +++ /dev/null @@ -1,40 +0,0 @@ -import argparse - -import ray -from worker import Worker, logger -from ray.utils import random_string - -parser = argparse.ArgumentParser() -parser.add_argument("raylet_socket_name") -parser.add_argument("object_store_socket_name") - -if __name__ == '__main__': - args = parser.parse_args() - - driver = Worker(args.raylet_socket_name, args.object_store_socket_name, - is_worker=False) - - task = ray.local_scheduler.Task( - ray.local_scheduler.ObjectID(random_string()), - ray.local_scheduler.ObjectID(random_string()), - [], - 1, - ray.local_scheduler.ObjectID(random_string()), - 0) - logger.debug("submitting %s", task.task_id()) - driver.node_manager_client.submit(task) - - logger.debug("Return values were %s", task.returns()) - task2 = ray.local_scheduler.Task( - ray.local_scheduler.ObjectID(random_string()), - ray.local_scheduler.ObjectID(random_string()), - task.returns(), - 1, - ray.local_scheduler.ObjectID(random_string()), - 0) - logger.debug("Submitting dependent task 2 %s", task2.task_id()) - driver.node_manager_client.submit(task2) - - # Make sure the tasks get executed and we can get the result of the last - # task. 
- obj = driver.get(task2.returns(), timeout_ms=1000) diff --git a/src/ray/python/test_driver_taskchains.py b/src/ray/python/test_driver_taskchains.py deleted file mode 100644 index 8a14c2f68..000000000 --- a/src/ray/python/test_driver_taskchains.py +++ /dev/null @@ -1,115 +0,0 @@ -import argparse - -import ray -from worker import Worker, logger -from ray.utils import random_string - - -parser = argparse.ArgumentParser() -parser.add_argument("raylet_socket_name") -parser.add_argument("object_store_socket_name") - - -def submit_task_withdep(driver_handle, task_object_dependencies=[]): - ''' submit a task that depend on a list of @args''' - task = ray.local_scheduler.Task( - ray.local_scheduler.ObjectID(random_string()), - ray.local_scheduler.ObjectID(random_string()), - task_object_dependencies, - 1, # num_returns - ray.local_scheduler.ObjectID(random_string()), - 0) - logger.debug("[DRIVER]: submitting task ", task.task_id()) - driver_handle.node_manager_client.submit(task) - logger.debug("[DRIVER]: task return values", task.returns()) - return task.returns() - - -def submit_tasks_nodep(driver_handle, num_tasks): - ''' submit a task that depend on a list of @args''' - for i in range(num_tasks): - task = ray.local_scheduler.Task( - ray.local_scheduler.ObjectID(random_string()), - ray.local_scheduler.ObjectID(random_string()), - [], - 1, # num_returns - ray.local_scheduler.ObjectID(random_string()), - 0) - - logger.debug("[DRIVER]: submitting task ", task.task_id()) - driver_handle.node_manager_client.submit(task) - logger.debug("[DRIVER]: task return values", task.returns()) - - -def submit_task_chains(num_chains, tasks_per_chain): - # return task placement map on output - chain_returns = [] - task_placement_map_ = {} - for chain_num in range(num_chains): - last_task_returns = [] - task_placement_map_[chain_num] = [] - for i in range(tasks_per_chain): - task_returns = submit_task_withdep( - driver, - task_object_dependencies=last_task_returns) - last_task_returns = 
task_returns - task_placement_map_[chain_num].append(task_returns[0]) - chain_returns.append(last_task_returns) - - logger.debug("chain_returns=", chain_returns) - chain_results = driver.get([r[0] for r in chain_returns], timeout_ms=5000) - print("[DRIVER]: chain return values: ", chain_results) - - return task_placement_map_ - - -def TEST_run_task_chains(num_chains, tasks_per_chain): - task_placement_map = submit_task_chains(num_chains=num_chains, - tasks_per_chain=tasks_per_chain) - logger.debug("[DRIVER]: task placement information, per chain:") - task_placement_total = [] - for chain_num in range(len(task_placement_map)): - task_placement_list = driver.get(task_placement_map[chain_num], - timeout_ms=5000) - task_placement_total += [t[1] for t in task_placement_list] - logger.debug(chain_num, task_placement_list) - logger.debug("task placement overall: ", task_placement_total) - task_placement_stats = [(v, task_placement_total.count(v)) - for v in set(task_placement_total)] - num_total_tasks = sum([t[1] for t in task_placement_stats]) - print("total tasks executed = ", num_total_tasks) - assert(num_total_tasks == num_chains * tasks_per_chain) - print("task placement breakdown: total=", task_placement_stats) - - -def TEST_run_tasks_nodep(num_tasks): - # This test is the same as having num_tasks chains with 1 task per chain - # In this test we assume the num_tasks x 1 chain structure. 
- task_placement_map = submit_task_chains(num_chains=num_tasks, - tasks_per_chain=1) - logger.debug("[DRIVER]: task placement information, per chain:") - task_placement_total = [] - for chain_num in range(len(task_placement_map)): - task_placement_list = driver.get(task_placement_map[chain_num], - timeout_ms=5000) - task_placement_total += [t[1] for t in task_placement_list] - logger.debug(chain_num, task_placement_list) - logger.debug("task placement overall: ", task_placement_total) - task_placement_stats = [(v, task_placement_total.count(v)) for v in - set(task_placement_total)] - num_total_tasks = sum([t[1] for t in task_placement_stats]) - print("total tasks executed = ", num_total_tasks) - assert(num_total_tasks == num_tasks) - print("task placement breakdown: total=", task_placement_stats) - - -if __name__ == '__main__': - args = parser.parse_args() - - driver = Worker(args.raylet_socket_name, args.object_store_socket_name, - is_worker=False) - - # Set up the experiment : number of chains and tasks per chain. - # TEST_run_task_chains(num_chains=10, tasks_per_chain=100) - - TEST_run_tasks_nodep(10000) diff --git a/src/ray/python/worker.py b/src/ray/python/worker.py deleted file mode 100644 index 072b386f9..000000000 --- a/src/ray/python/worker.py +++ /dev/null @@ -1,67 +0,0 @@ -import logging - -import ray -import pyarrow -import pyarrow.plasma as plasma -from ray.utils import random_string - - -logging.basicConfig() -logger = logging.getLogger(__name__) - -# The default return value to put in the object store. -RETURN_VALUE = 0 - - -class Worker(object): - - total_task_count = 0 - - def __init__(self, raylet_socket_name, object_store_socket_name, - is_worker): - # Connect to the Raylet and object store. 
- self.node_manager_client = ray.local_scheduler.LocalSchedulerClient( - raylet_socket_name, random_string(), is_worker) - self.plasma_client = plasma.connect(object_store_socket_name, "", 0) - self.serialization_context = pyarrow.default_serialization_context() - self.raylet_socket_name = raylet_socket_name - self.object_store_socket_name = object_store_socket_name - - def main_loop(self): - while True: - self.get_task() - - def get(self, object_ids, timeout_ms=-1): - for object_id in object_ids: - self.node_manager_client.reconstruct_object(object_id.id()) - plasma_ids = [plasma.ObjectID(argument.id()) for argument in - object_ids] - values = self.plasma_client.get(plasma_ids, timeout_ms, - self.serialization_context) - assert(all(value[0] == RETURN_VALUE for value in values)) - return values - - def get_task(self): - logger.debug("[WORKER] waiting for task") - task = self.node_manager_client.get_task() - logger.debug("Worker assigned %s with arguments %s", - ray.utils.binary_to_hex(task.task_id().id()), - " ".join([ray.utils.binary_to_hex(argument.id()) for - argument in task.arguments()])) - - # Get the arguments. NOTE(swang): This will hang forever if the - # arguments have been evicted. - arguments = self.get(task.arguments()) - - for object_id in task.returns(): - self.plasma_client.put((RETURN_VALUE, self.raylet_socket_name), - plasma.ObjectID(object_id.id())) - objval = self.plasma_client.get([plasma.ObjectID(object_id.id())]) - assert(all([o[0] == RETURN_VALUE for o in objval])) - - logger.debug("Worker returned %s", - " ".join([ray.utils.binary_to_hex(return_id.id()) for - return_id in task.returns()])) - - # Release the arguments. 
- del arguments diff --git a/src/ray/raylet/main.cc b/src/ray/raylet/main.cc index 503599030..dd219f26a 100644 --- a/src/ray/raylet/main.cc +++ b/src/ray/raylet/main.cc @@ -5,13 +5,14 @@ #ifndef RAYLET_TEST int main(int argc, char *argv[]) { - RAY_CHECK(argc == 6); + RAY_CHECK(argc == 7); const std::string raylet_socket_name = std::string(argv[1]); const std::string store_socket_name = std::string(argv[2]); const std::string node_ip_address = std::string(argv[3]); const std::string redis_address = std::string(argv[4]); int redis_port = std::stoi(argv[5]); + const std::string worker_command = std::string(argv[6]); // Configuration for the node manager. ray::raylet::NodeManagerConfig node_manager_config; @@ -21,11 +22,13 @@ int main(int argc, char *argv[]) { ray::raylet::ResourceSet(std::move(static_resource_conf)); node_manager_config.num_initial_workers = 0; // Use a default worker that can execute empty tasks with dependencies. - node_manager_config.worker_command.push_back("python"); - node_manager_config.worker_command.push_back( - "../../../src/ray/python/default_worker.py"); - node_manager_config.worker_command.push_back(raylet_socket_name.c_str()); - node_manager_config.worker_command.push_back(store_socket_name.c_str()); + + std::stringstream worker_command_stream(worker_command); + std::string token; + while (getline(worker_command_stream, token, ' ')) { + node_manager_config.worker_command.push_back(token); + } + // TODO(swang): Set this from a global config. node_manager_config.heartbeat_period_ms = 100; @@ -41,6 +44,7 @@ int main(int argc, char *argv[]) { // Initialize the node manager. 
boost::asio::io_service main_service; std::unique_ptr object_manager_service; + object_manager_service.reset(new boost::asio::io_service()); ray::raylet::Raylet server(main_service, std::move(object_manager_service), raylet_socket_name, node_ip_address, redis_address, diff --git a/src/ray/raylet/node_manager.h b/src/ray/raylet/node_manager.h index 1b356c366..f69f43244 100644 --- a/src/ray/raylet/node_manager.h +++ b/src/ray/raylet/node_manager.h @@ -21,7 +21,7 @@ namespace raylet { struct NodeManagerConfig { ResourceSet resource_config; int num_initial_workers; - std::vector worker_command; + std::vector worker_command; uint64_t heartbeat_period_ms; }; diff --git a/src/ray/raylet/raylet.cc b/src/ray/raylet/raylet.cc index f1030a1a2..af6c15082 100644 --- a/src/ray/raylet/raylet.cc +++ b/src/ray/raylet/raylet.cc @@ -69,8 +69,8 @@ ray::Status Raylet::RegisterGcs(const std::string &node_ip_address, client_info.resources_total_capacity.push_back(resource_pair.second); } - RAY_LOG(DEBUG) << "NM LISTENING ON: IP " << client_info.node_manager_address << " PORT " - << client_info.node_manager_port; + RAY_LOG(DEBUG) << "Node manager listening on: IP " << client_info.node_manager_address + << " port " << client_info.node_manager_port; RAY_RETURN_NOT_OK(gcs_client_->client_table().Connect(client_info)); auto node_manager_client_added = [this](gcs::AsyncGcsClient *client, const UniqueID &id, diff --git a/src/ray/raylet/worker_pool.cc b/src/ray/raylet/worker_pool.cc index c9551c2c5..666c95e2c 100644 --- a/src/ray/raylet/worker_pool.cc +++ b/src/ray/raylet/worker_pool.cc @@ -8,9 +8,8 @@ namespace ray { namespace raylet { /// A constructor that initializes a worker pool with num_workers workers. -WorkerPool::WorkerPool(int num_workers, const std::vector &worker_command) +WorkerPool::WorkerPool(int num_workers, const std::vector &worker_command) : worker_command_(worker_command) { - worker_command_.push_back(NULL); // Ignore SIGCHLD signals. 
If we don't do this, then worker processes will // become zombies instead of dying gracefully. signal(SIGCHLD, SIG_IGN); @@ -37,9 +36,17 @@ void WorkerPool::StartWorker() { // Reset the SIGCHLD handler for the worker. signal(SIGCHLD, SIG_DFL); - // Try to execute the worker command. - int rv = execvp(worker_command_[0], (char *const *)worker_command_.data()); + // Extract pointers from the worker command to pass into execvp. + std::vector worker_command_args; + for (auto const &token : worker_command_) { + worker_command_args.push_back(token.c_str()); + } + worker_command_args.push_back(nullptr); + + // Try to execute the worker command. + int rv = execvp(worker_command_args[0], + const_cast(worker_command_args.data())); // The worker failed to start. This is a fatal error. RAY_LOG(FATAL) << "Failed to start worker with return value " << rv; } diff --git a/src/ray/raylet/worker_pool.h b/src/ray/raylet/worker_pool.h index 7c76eea11..a9ce9824a 100644 --- a/src/ray/raylet/worker_pool.h +++ b/src/ray/raylet/worker_pool.h @@ -25,7 +25,7 @@ class WorkerPool { /// pool. /// /// \param num_workers The number of workers to start. - WorkerPool(int num_workers, const std::vector &worker_command); + WorkerPool(int num_workers, const std::vector &worker_command); /// Destructor responsible for freeing a set of workers owned by this class. ~WorkerPool(); @@ -74,7 +74,7 @@ class WorkerPool { std::shared_ptr PopWorker(); private: - std::vector worker_command_; + std::vector worker_command_; /// The pool of idle workers. std::list> pool_; /// All workers that have registered and are still connected, including both diff --git a/src/ray/test/run_task_test.sh b/src/ray/test/run_task_test.sh deleted file mode 100644 index 56a593a8d..000000000 --- a/src/ray/test/run_task_test.sh +++ /dev/null @@ -1,23 +0,0 @@ -#!/usr/bin/env bash - -# This needs to be run in the build tree, which is normally ray/python/ray/core - -# Cause the script to exit if a single command fails. 
-set -e -set -x - -# Tear down the Raylet. -#bash ../../../src/ray/test/stop_raylets.sh - -# Set up a single Raylet. -bash ../../../src/ray/test/start_raylets.sh - -sleep 1 - -# Connect a driver to the raylet and make sure it completes. -python ../../../src/ray/python/test_driver.py /tmp/raylet1 /tmp/store1 - -sleep 1 - -./src/common/thirdparty/redis/src/redis-cli -p 6379 shutdown -bash ../../../src/ray/test/stop_raylets.sh diff --git a/src/ray/test/start_redis.sh b/src/ray/test/start_redis.sh deleted file mode 100644 index 0b1c26ed9..000000000 --- a/src/ray/test/start_redis.sh +++ /dev/null @@ -1,11 +0,0 @@ -#!/usr/bin/env bash - -# This needs to be run in the build tree, which is normally ray/python/ray/core - -# Cause the script to exit if a single command fails. -set -e - -# Start the GCS. -./src/common/thirdparty/redis/src/redis-server --loglevel warning --loadmodule ./src/common/redis_module/libray_redis_module.so --port 6379 >/dev/null & -sleep 1s - diff --git a/src/ray/test/stop_raylets.sh b/src/ray/test/stop_raylets.sh deleted file mode 100644 index 8387430ef..000000000 --- a/src/ray/test/stop_raylets.sh +++ /dev/null @@ -1,9 +0,0 @@ -#!/usr/bin/env bash - -killall raylet -sleep 1 -killall plasma_store -sleep 1 -killall redis-server -sleep 1 -rm /tmp/store* /tmp/raylet* diff --git a/test/xray_test.py b/test/xray_test.py new file mode 100644 index 000000000..7e5fc9699 --- /dev/null +++ b/test/xray_test.py @@ -0,0 +1,49 @@ +from __future__ import absolute_import +from __future__ import division +from __future__ import print_function + +import pytest +import ray + + +@pytest.fixture +def ray_start(): + # Start the Ray processes. + ray.init(num_cpus=1, use_raylet=True) + yield None + # The code after the yield will run as teardown code. + ray.worker.cleanup() + + +def test_basic_task_api(ray_start): + + # Test a simple function. + + @ray.remote + def f_simple(): + return 1 + + assert ray.get(f_simple.remote()) == 1 + + # Test multiple return values. 
+ + @ray.remote(num_return_vals=3) + def f_multiple_returns(): + return 1, 2, 3 + + x_id1, x_id2, x_id3 = f_multiple_returns.remote() + assert ray.get([x_id1, x_id2, x_id3]) == [1, 2, 3] + + # Test arguments passed by value. + + @ray.remote + def f_args_by_value(x): + return x + + args = [1, 1.0, "test", b"test", (0, 1), [0, 1], {0: 1}] + for arg in args: + assert ray.get(f_args_by_value.remote(arg)) == arg + + # Test arguments passed by ID. + + # Test keyword arguments.