Remove legacy Ray code. (#3121)

* Remove legacy Ray code.

* Fix cmake and simplify monitor.

* Fix linting

* Updates

* Fix

* Implement some methods.

* Remove more plasma manager references.

* Fix

* Linting

* Fix

* Fix

* Make sure class IDs are strings.

* Some path fixes

* Fix

* Path fixes and update arrow

* Fixes.

* linting

* Fixes

* Java fixes

* Some java fixes

* TaskLanguage -> Language

* Minor

* Fix python test and remove unused method signature.

* Fix java tests

* Fix jenkins tests

* Remove commented out code.
This commit is contained in:
Robert Nishihara
2018-10-26 13:36:58 -07:00
committed by Philipp Moritz
parent 055daf17a0
commit 658c14282c
289 changed files with 2460 additions and 40708 deletions
+89 -306
View File
@@ -27,14 +27,13 @@ import ray.serialization as serialization
import ray.services as services
import ray.signature
import ray.tempfile_services as tempfile_services
import ray.local_scheduler
import ray.raylet
import ray.plasma
import ray.ray_constants as ray_constants
from ray import import_thread
from ray import profiling
from ray.function_manager import FunctionActorManager
from ray.utils import (
binary_to_hex,
check_oversized_pickle,
is_cython,
random_string,
@@ -56,14 +55,6 @@ NIL_ACTOR_ID = NIL_ID
NIL_ACTOR_HANDLE_ID = NIL_ID
NIL_CLIENT_ID = ray_constants.ID_SIZE * b"\xff"
# This must be kept in sync with the `error_types` array in
# common/state/error_table.h.
OBJECT_HASH_MISMATCH_ERROR_TYPE = b"object_hash_mismatch"
PUT_RECONSTRUCTION_ERROR_TYPE = b"put_reconstruction"
# This must be kept in sync with the `scheduling_state` enum in common/task.h.
TASK_STATUS_RUNNING = 8
# Default resource requirements for actors when no resource requirements are
# specified.
DEFAULT_ACTOR_METHOD_CPUS_SIMPLE_CASE = 1
@@ -461,13 +452,9 @@ class Worker(object):
]
for i in range(0, len(object_ids),
ray._config.worker_fetch_request_size()):
if not self.use_raylet:
self.plasma_client.fetch(plain_object_ids[i:(
i + ray._config.worker_fetch_request_size())])
else:
self.local_scheduler_client.reconstruct_objects(
object_ids[i:(
i + ray._config.worker_fetch_request_size())], True)
self.local_scheduler_client.reconstruct_objects(
object_ids[i:(i + ray._config.worker_fetch_request_size())],
True)
# Get the objects. We initially try to get the objects immediately.
final_results = self.retrieve_and_deserialize(plain_object_ids, 0)
@@ -497,25 +484,9 @@ class Worker(object):
ray._config.worker_fetch_request_size())
for i in range(0, len(object_ids_to_fetch),
fetch_request_size):
if not self.use_raylet:
for unready_id in ray_object_ids_to_fetch[i:(
i + fetch_request_size)]:
(self.local_scheduler_client.
reconstruct_objects([unready_id], False))
# Do another fetch for objects that aren't
# available locally yet, in case they were evicted
# since the last fetch. We divide the fetch into
# smaller fetches so as to not block the manager
# for a prolonged period of time in a single call.
# This is only necessary for legacy ray since
# reconstruction and fetch are implemented by
# different processes.
self.plasma_client.fetch(object_ids_to_fetch[i:(
i + fetch_request_size)])
else:
self.local_scheduler_client.reconstruct_objects(
ray_object_ids_to_fetch[i:(
i + fetch_request_size)], False)
self.local_scheduler_client.reconstruct_objects(
ray_object_ids_to_fetch[i:(
i + fetch_request_size)], False)
results = self.retrieve_and_deserialize(
object_ids_to_fetch,
max([
@@ -608,7 +579,7 @@ class Worker(object):
for arg in args:
if isinstance(arg, ray.ObjectID):
args_for_local_scheduler.append(arg)
elif ray.local_scheduler.check_simple_value(arg):
elif ray.raylet.check_simple_value(arg):
args_for_local_scheduler.append(arg)
else:
args_for_local_scheduler.append(put(arg))
@@ -641,14 +612,13 @@ class Worker(object):
task_index = self.task_index
self.task_index += 1
# Submit the task to local scheduler.
task = ray.local_scheduler.Task(
task = ray.raylet.Task(
driver_id, ray.ObjectID(
function_id.id()), args_for_local_scheduler,
num_return_vals, self.current_task_id, task_index,
actor_creation_id, actor_creation_dummy_object_id, actor_id,
actor_handle_id, actor_counter, is_actor_checkpoint_method,
execution_dependencies, resources, placement_resources,
self.use_raylet)
actor_handle_id, actor_counter, execution_dependencies,
resources, placement_resources)
self.local_scheduler_client.submit(task)
return task.returns()
@@ -925,26 +895,13 @@ class Worker(object):
# good to know where the system is hanging.
with self.lock:
function_name = execution_info.function_name
if not self.use_raylet:
extra_data = {
"function_name": function_name,
"task_id": task.task_id().hex(),
"worker_id": binary_to_hex(self.worker_id)
}
else:
extra_data = {
"name": function_name,
"task_id": task.task_id().hex()
}
extra_data = {
"name": function_name,
"task_id": task.task_id().hex()
}
with profiling.profile("task", extra_data=extra_data, worker=self):
self._process_task(task, execution_info)
# In the non-raylet code path, push all of the log events to the global
# state store. In the raylet code path, this is done periodically in a
# background thread.
if not self.use_raylet:
self.profiler.flush_profile_data()
# Increase the task execution counter.
self.function_actor_manager.increase_task_counter(
driver_id, function_id.id())
@@ -998,13 +955,10 @@ def get_gpu_ids():
raise Exception("ray.get_gpu_ids() currently does not work in PYTHON "
"MODE.")
if not global_worker.use_raylet:
assigned_ids = global_worker.local_scheduler_client.gpu_ids()
else:
all_resource_ids = global_worker.local_scheduler_client.resource_ids()
assigned_ids = [
resource_id for resource_id, _ in all_resource_ids.get("GPU", [])
]
all_resource_ids = global_worker.local_scheduler_client.resource_ids()
assigned_ids = [
resource_id for resource_id, _ in all_resource_ids.get("GPU", [])
]
# If the user had already set CUDA_VISIBLE_DEVICES, then respect that (in
# the sense that only GPU IDs that appear in CUDA_VISIBLE_DEVICES should be
# returned).
@@ -1019,17 +973,11 @@ def get_gpu_ids():
def get_resource_ids():
"""Get the IDs of the resources that are available to the worker.
This function is only supported in the raylet code path.
Returns:
A dictionary mapping the name of a resource to a list of pairs, where
each pair consists of the ID of a resource and the fraction of that
resource reserved for this worker.
"""
if not global_worker.use_raylet:
raise Exception("ray.get_resource_ids() is only supported in the "
"raylet code path.")
if _mode() == LOCAL_MODE:
raise Exception(
"ray.get_resource_ids() currently does not work in PYTHON "
@@ -1112,22 +1060,8 @@ def error_applies_to_driver(error_key, worker=global_worker):
def error_info(worker=global_worker):
"""Return information about failed tasks."""
worker.check_connected()
if worker.use_raylet:
return (global_state.error_messages(job_id=worker.task_driver_id) +
global_state.error_messages(job_id=ray_constants.NIL_JOB_ID))
error_keys = worker.redis_client.lrange("ErrorKeys", 0, -1)
errors = []
for error_key in error_keys:
if error_applies_to_driver(error_key, worker=worker):
error_contents = worker.redis_client.hgetall(error_key)
error_contents = {
"type": ray.utils.decode(error_contents[b"type"]),
"message": ray.utils.decode(error_contents[b"message"]),
"data": ray.utils.decode(error_contents[b"data"])
}
errors.append(error_contents)
return errors
return (global_state.error_messages(job_id=worker.task_driver_id) +
global_state.error_messages(job_id=ray_constants.NIL_JOB_ID))
def _initialize_serialization(driver_id, worker=global_worker):
@@ -1223,7 +1157,6 @@ def _initialize_serialization(driver_id, worker=global_worker):
def get_address_info_from_redis_helper(redis_address,
node_ip_address,
use_raylet=True,
redis_password=None):
redis_ip_address, redis_port = redis_address.split(":")
# For this command to work, some other client (on the same machine as
@@ -1231,118 +1164,50 @@ def get_address_info_from_redis_helper(redis_address,
redis_client = redis.StrictRedis(
host=redis_ip_address, port=int(redis_port), password=redis_password)
if not use_raylet:
# The client table prefix must be kept in sync with the file
# "src/common/redis_module/ray_redis_module.cc" where it is defined.
client_keys = redis_client.keys("{}*".format(
ray.gcs_utils.DB_CLIENT_PREFIX))
# Filter to live clients on the same node and do some basic checking.
plasma_managers = []
local_schedulers = []
for key in client_keys:
info = redis_client.hgetall(key)
# Ignore clients that were deleted.
deleted = info[b"deleted"]
deleted = bool(int(deleted))
if deleted:
continue
assert b"ray_client_id" in info
assert b"node_ip_address" in info
assert b"client_type" in info
client_node_ip_address = ray.utils.decode(info[b"node_ip_address"])
if (client_node_ip_address == node_ip_address or
(client_node_ip_address == "127.0.0.1"
and redis_ip_address == ray.services.get_node_ip_address())):
if ray.utils.decode(info[b"client_type"]) == "plasma_manager":
plasma_managers.append(info)
elif (ray.utils.decode(
info[b"client_type"]) == "local_scheduler"):
local_schedulers.append(info)
# Make sure that we got at least one plasma manager and local
# scheduler.
assert len(plasma_managers) >= 1
assert len(local_schedulers) >= 1
# Build the address information.
object_store_addresses = []
for manager in plasma_managers:
address = ray.utils.decode(manager[b"manager_address"])
port = services.get_port(address)
object_store_addresses.append(
services.ObjectStoreAddress(
name=ray.utils.decode(manager[b"store_socket_name"]),
manager_name=ray.utils.decode(
manager[b"manager_socket_name"]),
manager_port=port))
scheduler_names = [
ray.utils.decode(scheduler[b"local_scheduler_socket_name"])
for scheduler in local_schedulers
]
client_info = {
"node_ip_address": node_ip_address,
"redis_address": redis_address,
"object_store_addresses": object_store_addresses,
"local_scheduler_socket_names": scheduler_names,
# Web UI should be running.
"webui_url": _webui_url_helper(redis_client)
}
return client_info
# Handle the raylet case.
else:
# In the raylet code path, all client data is stored in a zset at the
# key for the nil client.
client_key = b"CLIENT" + NIL_CLIENT_ID
clients = redis_client.zrange(client_key, 0, -1)
raylets = []
for client_message in clients:
client = ray.gcs_utils.ClientTableData.GetRootAsClientTableData(
client_message, 0)
client_node_ip_address = ray.utils.decode(
client.NodeManagerAddress())
if (client_node_ip_address == node_ip_address or
(client_node_ip_address == "127.0.0.1"
and redis_ip_address == ray.services.get_node_ip_address())):
raylets.append(client)
# Make sure that at least one raylet has started locally.
# This handles a race condition where Redis has started but
# the raylet has not connected.
if len(raylets) == 0:
raise Exception(
"Redis has started but no raylets have registered yet.")
object_store_addresses = [
services.ObjectStoreAddress(
name=ray.utils.decode(raylet.ObjectStoreSocketName()),
manager_name=None,
manager_port=None) for raylet in raylets
]
raylet_socket_names = [
ray.utils.decode(raylet.RayletSocketName()) for raylet in raylets
]
return {
"node_ip_address": node_ip_address,
"redis_address": redis_address,
"object_store_addresses": object_store_addresses,
"raylet_socket_names": raylet_socket_names,
# Web UI should be running.
"webui_url": _webui_url_helper(redis_client)
}
# In the raylet code path, all client data is stored in a zset at the
# key for the nil client.
client_key = b"CLIENT" + NIL_CLIENT_ID
clients = redis_client.zrange(client_key, 0, -1)
raylets = []
for client_message in clients:
client = ray.gcs_utils.ClientTableData.GetRootAsClientTableData(
client_message, 0)
client_node_ip_address = ray.utils.decode(client.NodeManagerAddress())
if (client_node_ip_address == node_ip_address or
(client_node_ip_address == "127.0.0.1"
and redis_ip_address == ray.services.get_node_ip_address())):
raylets.append(client)
# Make sure that at least one raylet has started locally.
# This handles a race condition where Redis has started but
# the raylet has not connected.
if len(raylets) == 0:
raise Exception(
"Redis has started but no raylets have registered yet.")
object_store_addresses = [
ray.utils.decode(raylet.ObjectStoreSocketName()) for raylet in raylets
]
raylet_socket_names = [
ray.utils.decode(raylet.RayletSocketName()) for raylet in raylets
]
return {
"node_ip_address": node_ip_address,
"redis_address": redis_address,
"object_store_addresses": object_store_addresses,
"raylet_socket_names": raylet_socket_names,
# Web UI should be running.
"webui_url": _webui_url_helper(redis_client)
}
def get_address_info_from_redis(redis_address,
node_ip_address,
num_retries=5,
use_raylet=True,
redis_password=None):
counter = 0
while True:
try:
return get_address_info_from_redis_helper(
redis_address,
node_ip_address,
use_raylet=use_raylet,
redis_password=redis_password)
redis_address, node_ip_address, redis_password=redis_password)
except Exception:
if counter == num_retries:
raise
@@ -1414,7 +1279,6 @@ def _init(address_info=None,
plasma_directory=None,
huge_pages=False,
include_webui=True,
use_raylet=None,
plasma_store_socket_name=None,
raylet_socket_name=None,
temp_dir=None):
@@ -1474,7 +1338,6 @@ def _init(address_info=None,
Store with hugetlbfs support. Requires plasma_directory.
include_webui: Boolean flag indicating whether to start the web
UI, which is a Jupyter notebook.
use_raylet: True if the new raylet code path should be used.
plasma_store_socket_name (str): If provided, it will specify the socket
name used by the plasma store.
raylet_socket_name (str): If provided, it will specify the socket path
@@ -1497,16 +1360,6 @@ def _init(address_info=None,
else:
driver_mode = SCRIPT_MODE
if use_raylet is None:
if os.environ.get("RAY_USE_XRAY") == "0":
# This environment variable is used in our testing setup.
logger.info("Detected environment variable 'RAY_USE_XRAY' with "
"value {}. This turns OFF xray.".format(
os.environ.get("RAY_USE_XRAY")))
use_raylet = False
else:
use_raylet = True
# Get addresses of existing services.
if address_info is None:
address_info = {}
@@ -1561,7 +1414,6 @@ def _init(address_info=None,
plasma_directory=plasma_directory,
huge_pages=huge_pages,
include_webui=include_webui,
use_raylet=use_raylet,
plasma_store_socket_name=plasma_store_socket_name,
raylet_socket_name=raylet_socket_name,
temp_dir=temp_dir)
@@ -1610,10 +1462,7 @@ def _init(address_info=None,
node_ip_address = services.get_node_ip_address(redis_address)
# Get the address info of the processes to connect to from Redis.
address_info = get_address_info_from_redis(
redis_address,
node_ip_address,
use_raylet=use_raylet,
redis_password=redis_password)
redis_address, node_ip_address, redis_password=redis_password)
# Connect this driver to Redis, the object store, and the local scheduler.
# Choose the first object store and local scheduler if there are multiple.
@@ -1625,18 +1474,11 @@ def _init(address_info=None,
driver_address_info = {
"node_ip_address": node_ip_address,
"redis_address": address_info["redis_address"],
"store_socket_name": (
address_info["object_store_addresses"][0].name),
"store_socket_name": address_info["object_store_addresses"][0],
"webui_url": address_info["webui_url"]
}
if not use_raylet:
driver_address_info["manager_socket_name"] = (
address_info["object_store_addresses"][0].manager_name)
driver_address_info["local_scheduler_socket_name"] = (
address_info["local_scheduler_socket_names"][0])
else:
driver_address_info["raylet_socket_name"] = (
address_info["raylet_socket_names"][0])
driver_address_info["raylet_socket_name"] = (
address_info["raylet_socket_names"][0])
# We only pass `temp_dir` to a worker (WORKER_MODE).
# It can't be a worker here.
@@ -1645,7 +1487,6 @@ def _init(address_info=None,
object_id_seed=object_id_seed,
mode=driver_mode,
worker=global_worker,
use_raylet=use_raylet,
redis_password=redis_password)
return address_info
@@ -1669,7 +1510,6 @@ def init(redis_address=None,
plasma_directory=None,
huge_pages=False,
include_webui=True,
use_raylet=None,
configure_logging=True,
logging_level=logging.INFO,
logging_format=ray_constants.LOGGER_FORMAT,
@@ -1736,7 +1576,6 @@ def init(redis_address=None,
Store with hugetlbfs support. Requires plasma_directory.
include_webui: Boolean flag indicating whether to start the web
UI, which is a Jupyter notebook.
use_raylet: True if the new raylet code path should be used.
configure_logging: True if allow the logging cofiguration here.
Otherwise, the users may want to configure it by their own.
logging_level: Logging level, default will be loging.INFO.
@@ -1767,22 +1606,6 @@ def init(redis_address=None,
else:
raise Exception("Perhaps you called ray.init twice by accident?")
if use_raylet is None:
if os.environ.get("RAY_USE_XRAY") == "0":
# This environment variable is used in our testing setup.
logger.info("Detected environment variable 'RAY_USE_XRAY' with "
"value {}. This turns OFF xray.".format(
os.environ.get("RAY_USE_XRAY")))
use_raylet = False
else:
use_raylet = True
if not use_raylet and redis_password is not None:
raise Exception("Setting the 'redis_password' argument is not "
"supported in legacy Ray. To run Ray with "
"password-protected Redis ports, set "
"'use_raylet=True'.")
# Convert hostnames to numerical IP address.
if node_ip_address is not None:
node_ip_address = services.address_to_ip(node_ip_address)
@@ -1809,7 +1632,6 @@ def init(redis_address=None,
huge_pages=huge_pages,
include_webui=include_webui,
object_store_memory=object_store_memory,
use_raylet=use_raylet,
plasma_store_socket_name=plasma_store_socket_name,
raylet_socket_name=raylet_socket_name,
temp_dir=temp_dir)
@@ -1887,9 +1709,6 @@ def print_error_messages_raylet(worker):
This runs in a separate thread on the driver and prints error messages in
the background.
"""
if not worker.use_raylet:
raise Exception("This function is specific to the raylet code path.")
worker.error_message_pubsub_client = worker.redis_client.pubsub(
ignore_subscribe_messages=True)
# Exports that are published after the call to
@@ -2004,7 +1823,6 @@ def connect(info,
object_id_seed=None,
mode=WORKER_MODE,
worker=global_worker,
use_raylet=True,
redis_password=None):
"""Connect this worker to the local scheduler, to Plasma, and to Redis.
@@ -2015,7 +1833,6 @@ def connect(info,
deterministic.
mode: The mode of the worker. One of SCRIPT_MODE, WORKER_MODE, and
LOCAL_MODE.
use_raylet: True if the new raylet code path should be used.
redis_password (str): Prevents external clients without the password
from connecting to Redis if provided.
"""
@@ -2038,7 +1855,6 @@ def connect(info,
worker.actor_id = NIL_ACTOR_ID
worker.connected = True
worker.set_mode(mode)
worker.use_raylet = use_raylet
# If running Ray in LOCAL_MODE, there is no need to create call
# create_worker or to start the worker service.
@@ -2067,7 +1883,6 @@ def connect(info,
traceback_str = traceback.format_exc()
ray.utils.push_error_to_driver_through_redis(
worker.redis_client,
worker.use_raylet,
ray_constants.VERSION_MISMATCH_PUSH_ERROR,
traceback_str,
driver_id=None)
@@ -2108,7 +1923,6 @@ def connect(info,
"driver_id": worker.worker_id,
"start_time": time.time(),
"plasma_store_socket": info["store_socket_name"],
"plasma_manager_socket": info.get("manager_socket_name"),
"local_scheduler_socket": info.get("local_scheduler_socket_name"),
"raylet_socket": info.get("raylet_socket_name")
}
@@ -2123,7 +1937,6 @@ def connect(info,
worker_dict = {
"node_ip_address": worker.node_ip_address,
"plasma_store_socket": info["store_socket_name"],
"plasma_manager_socket": info["manager_socket_name"],
"local_scheduler_socket": info["local_scheduler_socket_name"]
}
if redirect_worker_output:
@@ -2135,18 +1948,10 @@ def connect(info,
raise Exception("This code should be unreachable.")
# Create an object store client.
if not worker.use_raylet:
worker.plasma_client = thread_safe_client(
plasma.connect(info["store_socket_name"],
info["manager_socket_name"], 64))
else:
worker.plasma_client = thread_safe_client(
plasma.connect(info["store_socket_name"], "", 64))
worker.plasma_client = thread_safe_client(
plasma.connect(info["store_socket_name"], "", 64))
if not worker.use_raylet:
local_scheduler_socket = info["local_scheduler_socket_name"]
else:
local_scheduler_socket = info["raylet_socket_name"]
local_scheduler_socket = info["raylet_socket_name"]
# If this is a driver, set the current task ID, the task driver ID, and set
# the task index to 0.
@@ -2177,28 +1982,22 @@ def connect(info,
# rerun the driver.
nil_actor_counter = 0
driver_task = ray.local_scheduler.Task(
worker.task_driver_id, ray.ObjectID(NIL_FUNCTION_ID), [], 0,
worker.current_task_id, worker.task_index,
ray.ObjectID(NIL_ACTOR_ID), ray.ObjectID(NIL_ACTOR_ID),
ray.ObjectID(NIL_ACTOR_ID), ray.ObjectID(NIL_ACTOR_ID),
nil_actor_counter, False, [], {"CPU": 0}, {}, worker.use_raylet)
driver_task = ray.raylet.Task(worker.task_driver_id,
ray.ObjectID(NIL_FUNCTION_ID), [], 0,
worker.current_task_id,
worker.task_index,
ray.ObjectID(NIL_ACTOR_ID),
ray.ObjectID(NIL_ACTOR_ID),
ray.ObjectID(NIL_ACTOR_ID),
ray.ObjectID(NIL_ACTOR_ID),
nil_actor_counter, [], {"CPU": 0}, {})
# Add the driver task to the task table.
if not worker.use_raylet:
global_state._execute_command(
driver_task.task_id(), "RAY.TASK_TABLE_ADD",
driver_task.task_id().id(), TASK_STATUS_RUNNING,
NIL_LOCAL_SCHEDULER_ID,
driver_task.execution_dependencies_string(), 0,
ray.local_scheduler.task_to_string(driver_task))
else:
global_state._execute_command(
driver_task.task_id(), "RAY.TABLE_ADD",
ray.gcs_utils.TablePrefix.RAYLET_TASK,
ray.gcs_utils.TablePubsub.RAYLET_TASK,
driver_task.task_id().id(),
driver_task._serialized_raylet_task())
global_state._execute_command(driver_task.task_id(), "RAY.TABLE_ADD",
ray.gcs_utils.TablePrefix.RAYLET_TASK,
ray.gcs_utils.TablePubsub.RAYLET_TASK,
driver_task.task_id().id(),
driver_task._serialized_raylet_task())
# Set the driver's current task ID to the task ID assigned to the
# driver task.
@@ -2207,9 +2006,9 @@ def connect(info,
# A non-driver worker begins without an assigned task.
worker.current_task_id = ray.ObjectID(NIL_ID)
worker.local_scheduler_client = ray.local_scheduler.LocalSchedulerClient(
worker.local_scheduler_client = ray.raylet.LocalSchedulerClient(
local_scheduler_socket, worker.worker_id, is_worker,
worker.current_task_id, worker.use_raylet)
worker.current_task_id)
# Start the import thread
import_thread.ImportThread(worker, mode).start()
@@ -2221,16 +2020,10 @@ def connect(info,
# temporarily using this implementation which constantly queries the
# scheduler for new error messages.
if mode == SCRIPT_MODE:
if not worker.use_raylet:
t = threading.Thread(
target=print_error_messages,
name="ray_print_error_messages",
args=(worker, ))
else:
t = threading.Thread(
target=print_error_messages_raylet,
name="ray_print_error_messages",
args=(worker, ))
t = threading.Thread(
target=print_error_messages_raylet,
name="ray_print_error_messages",
args=(worker, ))
# Making the thread a daemon causes it to exit when the main thread
# exits.
t.daemon = True
@@ -2238,7 +2031,7 @@ def connect(info,
# If we are using the raylet code path and we are not in local mode, start
# a background thread to periodically flush profiling data to the GCS.
if mode != LOCAL_MODE and worker.use_raylet:
if mode != LOCAL_MODE:
worker.profiler.start_flush_thread()
if mode == SCRIPT_MODE:
@@ -2395,6 +2188,9 @@ def register_custom_serializer(cls,
# worker and not across workers.
class_id = random_string()
# Make sure class_id is a string.
class_id = ray.utils.binary_to_hex(class_id)
if driver_id is None:
driver_id_bytes = worker.task_driver_id.id()
else:
@@ -2481,7 +2277,7 @@ def put(value, worker=global_worker):
# In LOCAL_MODE, ray.put is the identity operation.
return value
object_id = worker.local_scheduler_client.compute_put_id(
worker.current_task_id, worker.put_index, worker.use_raylet)
worker.current_task_id, worker.put_index)
worker.put_object(object_id, value)
worker.put_index += 1
return object_id
@@ -2554,21 +2350,8 @@ def wait(object_ids, num_returns=1, timeout=None, worker=global_worker):
raise Exception("num_returns cannot be greater than the number "
"of objects provided to ray.wait.")
timeout = timeout if timeout is not None else 2**30
if worker.use_raylet:
ready_ids, remaining_ids = worker.local_scheduler_client.wait(
object_ids, num_returns, timeout, False)
else:
object_id_strs = [
plasma.ObjectID(object_id.id()) for object_id in object_ids
]
ready_ids, remaining_ids = worker.plasma_client.wait(
object_id_strs, timeout, num_returns)
ready_ids = [
ray.ObjectID(object_id.binary()) for object_id in ready_ids
]
remaining_ids = [
ray.ObjectID(object_id.binary()) for object_id in remaining_ids
]
ready_ids, remaining_ids = worker.local_scheduler_client.wait(
object_ids, num_returns, timeout, False)
return ready_ids, remaining_ids