Remove unnecessary pip installs. (#21)

* Small cleanups in worker.py.

* Remove dependencies on subprocess32, graphviz, protobuf, and ipython.

* Retry starting the plasma manager if the port is in use.

* Whitespace

* Move start_plasma_manager into plasma.py.
This commit is contained in:
Robert Nishihara
2016-11-02 16:40:37 -07:00
committed by Philipp Moritz
parent 072f442c1f
commit 681ec570ba
8 changed files with 66 additions and 55 deletions
+2 -8
View File
@@ -9,6 +9,7 @@ import random
# Ray modules
import config
import plasma
# all_processes is a list of the scheduler, object store, and worker processes
# that have been started by this services module if Ray is being used in local
@@ -80,15 +81,8 @@ def start_objstore(node_ip_address, redis_address, cleanup):
store_name = "/tmp/ray_plasma_store{}".format(random_name())
p1 = subprocess.Popen([plasma_store_executable, "-s", store_name])
plasma_manager_executable = os.path.join(os.path.abspath(os.path.dirname(__file__)), "../plasma/build/plasma_manager")
manager_name = "/tmp/ray_plasma_manager{}".format(random_name())
manager_port = new_port()
p2 = subprocess.Popen([plasma_manager_executable,
"-s", store_name,
"-m", manager_name,
"-h", node_ip_address,
"-p", str(manager_port),
"-r", redis_address])
p2, manager_port = plasma.start_plasma_manager(store_name, manager_name, redis_address)
if cleanup:
all_processes.append(p1)
+3 -6
View File
@@ -1086,8 +1086,7 @@ def main_loop(worker=global_worker):
# We record the traceback and notify the scheduler.
traceback_str = format_error_message(traceback.format_exc())
error_key = "ReusableVariableReinitializeError:{}".format(random_string())
worker.redis_client.hmset(error_key, {"task_instance_id": "NOTIMPLEMENTED",
"task_id": "NOTIMPLEMENTED",
worker.redis_client.hmset(error_key, {"task_id": "NOTIMPLEMENTED",
"function_id": function_id.id(),
"function_name": function_name,
"message": traceback_str})
@@ -1151,7 +1150,6 @@ def _export_reusable_variable(name, reusable, worker=global_worker):
"""
if _mode(worker) not in [SCRIPT_MODE, SILENT_MODE]:
raise Exception("_export_reusable_variable can only be called on a driver.")
reusable_variable_id = name
key = "ReusableVariables:{}".format(reusable_variable_id)
worker.redis_client.hmset(key, {"name": name,
@@ -1161,11 +1159,10 @@ def _export_reusable_variable(name, reusable, worker=global_worker):
worker.driver_export_counter += 1
def export_remote_function(function_id, func_name, func, num_return_vals, worker=global_worker):
if _mode(worker) not in [SCRIPT_MODE, SILENT_MODE]:
raise Exception("export_remote_function can only be called on a driver.")
key = "RemoteFunction:{}".format(function_id.id())
worker.num_return_vals[function_id.id()] = num_return_vals
pickled_func = pickling.dumps(func)
worker.redis_client.hmset(key, {"function_id": function_id.id(),
"name": func_name,