diff --git a/python/ray/experimental/state.py b/python/ray/experimental/state.py index c9d4ae8ce..538e52652 100644 --- a/python/ray/experimental/state.py +++ b/python/ray/experimental/state.py @@ -1151,8 +1151,6 @@ class GlobalState(object): worker_id = binary_to_hex(worker_key[len("Workers:"):]) workers_data[worker_id] = { - "local_scheduler_socket": (decode( - worker_info[b"local_scheduler_socket"])), "node_ip_address": decode(worker_info[b"node_ip_address"]), "plasma_store_socket": decode( worker_info[b"plasma_store_socket"]) diff --git a/python/ray/import_thread.py b/python/ray/import_thread.py index 85fe0b89d..70dba3223 100644 --- a/python/ray/import_thread.py +++ b/python/ray/import_thread.py @@ -110,7 +110,7 @@ class ImportThread(object): run_on_other_drivers) = self.redis_client.hmget( key, ["driver_id", "function", "run_on_other_drivers"]) - if (run_on_other_drivers == "False" + if (utils.decode(run_on_other_drivers) == "False" and self.worker.mode == ray.SCRIPT_MODE and driver_id != self.worker.task_driver_id.id()): return diff --git a/python/ray/services.py b/python/ray/services.py index 981d06160..841fababd 100644 --- a/python/ray/services.py +++ b/python/ray/services.py @@ -1162,7 +1162,6 @@ def start_worker(node_ip_address, sys.executable, "-u", worker_path, "--node-ip-address=" + node_ip_address, "--object-store-name=" + object_store_name, - "--local-scheduler-name=" + local_scheduler_name, "--redis-address=" + str(redis_address), "--temp-dir=" + get_temp_root() ] diff --git a/python/ray/worker.py b/python/ray/worker.py index bceaa6aab..f68bb4288 100644 --- a/python/ray/worker.py +++ b/python/ray/worker.py @@ -713,7 +713,7 @@ class Worker(object): "driver_id": self.task_driver_id.id(), "function_id": function_to_run_id, "function": pickled_function, - "run_on_other_drivers": run_on_other_drivers + "run_on_other_drivers": str(run_on_other_drivers) }) self.redis_client.rpush("Exports", key) # TODO(rkn): If the worker fails after it calls setnx and before it @@ -1446,12 +1446,8 @@ def _init(address_info=None, # Use 1 local scheduler if num_local_schedulers is not provided. If # existing local schedulers are provided, use that count as # num_local_schedulers. - local_schedulers = address_info.get("local_scheduler_socket_names", []) if num_local_schedulers is None: - if len(local_schedulers) > 0: - num_local_schedulers = len(local_schedulers) - else: - num_local_schedulers = 1 + num_local_schedulers = 1 # Use 1 additional redis shard if num_redis_shards is not provided. num_redis_shards = 1 if num_redis_shards is None else num_redis_shards @@ -2013,13 +2009,13 @@ def connect(info, "driver_id": worker.worker_id, "start_time": time.time(), "plasma_store_socket": info["store_socket_name"], - "local_scheduler_socket": info.get("local_scheduler_socket_name"), "raylet_socket": info.get("raylet_socket_name") } driver_info["name"] = (main.__file__ if hasattr(main, "__file__") else "INTERACTIVE MODE") worker.redis_client.hmset(b"Drivers:" + worker.worker_id, driver_info) - if not worker.redis_client.exists("webui"): + if (not worker.redis_client.exists("webui") + and info["webui_url"] is not None): worker.redis_client.hmset("webui", {"url": info["webui_url"]}) is_worker = False elif mode == WORKER_MODE: @@ -2027,7 +2023,6 @@ def connect(info, worker_dict = { "node_ip_address": worker.node_ip_address, "plasma_store_socket": info["store_socket_name"], - "local_scheduler_socket": info["local_scheduler_socket_name"] } if redirect_worker_output: worker_dict["stdout_file"] = os.path.abspath(log_stdout_file.name) @@ -2041,7 +2036,7 @@ def connect(info, worker.plasma_client = thread_safe_client( plasma.connect(info["store_socket_name"], "", 64)) - local_scheduler_socket = info["raylet_socket_name"] + raylet_socket = info["raylet_socket_name"] # If this is a driver, set the current task ID, the task driver ID, and set # the task index to 0. @@ -2100,8 +2095,7 @@ def connect(info, worker.multithreading_warned = False worker.local_scheduler_client = ray.raylet.LocalSchedulerClient( - local_scheduler_socket, worker.worker_id, is_worker, - worker.current_task_id) + raylet_socket, worker.worker_id, is_worker, worker.current_task_id) # Start the import thread import_thread.ImportThread(worker, mode).start() diff --git a/python/ray/workers/default_worker.py b/python/ray/workers/default_worker.py index 4ec9e4d14..b9c9500e7 100644 --- a/python/ray/workers/default_worker.py +++ b/python/ray/workers/default_worker.py @@ -40,11 +40,6 @@ parser.add_argument( required=False, type=str, help="the object store manager's name") -parser.add_argument( - "--local-scheduler-name", - required=False, - type=str, - help="the local scheduler's name") parser.add_argument( "--raylet-name", required=False, type=str, help="the raylet's name") parser.add_argument( @@ -76,7 +71,6 @@ if __name__ == "__main__": "redis_password": args.redis_password, "store_socket_name": args.object_store_name, "manager_socket_name": args.object_store_manager_name, - "local_scheduler_socket_name": args.local_scheduler_name, "raylet_socket_name": args.raylet_name } diff --git a/python/setup.py b/python/setup.py index 11c434b29..7198de232 100644 --- a/python/setup.py +++ b/python/setup.py @@ -138,7 +138,7 @@ requires = [ "colorama", "pytest", "pyyaml", - "redis~=2.10.6", + "redis", "setproctitle", # The six module is required by pyarrow. "six >= 1.0.0", diff --git a/test/runtest.py b/test/runtest.py index cea970905..767960a16 100644 --- a/test/runtest.py +++ b/test/runtest.py @@ -2326,7 +2326,6 @@ def test_workers(shutdown_only): assert len(worker_info) >= num_workers for worker_id, info in worker_info.items(): assert "node_ip_address" in info - assert "local_scheduler_socket" in info assert "plasma_store_socket" in info assert "stderr_file" in info assert "stdout_file" in info