Only install ray python packages. (#330)

* Only install ray python packages.

* Add some __init__.py files.

* Install Ray before building documentation.

* Fix install-ray.sh.

* Fix.
This commit is contained in:
Robert Nishihara
2017-03-01 23:34:44 -08:00
committed by Philipp Moritz
parent 39b7abefc5
commit 6a4bde54dc
39 changed files with 79 additions and 78 deletions
+1 -1
View File
@@ -6,10 +6,10 @@ import hashlib
import inspect
import json
import numpy as np
import local_scheduler
import random
import traceback
import ray.local_scheduler as local_scheduler
import ray.pickling as pickling
import ray.worker
import ray.experimental.state as state
View File
+303
View File
@@ -0,0 +1,303 @@
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import os
import random
import subprocess
import sys
import time
import unittest
import redis
import ray.services
# String prefixes used by the tests in this file. OBJECT_CHANNEL_PREFIX and
# TASK_PREFIX are used below to build pubsub subscription patterns; the other
# prefixes are not referenced by the tests in this file.
# NOTE(review): presumably these must stay in sync with the prefixes used by
# the Ray Redis module -- confirm against that source.
OBJECT_INFO_PREFIX = "OI:"
OBJECT_LOCATION_PREFIX = "OL:"
OBJECT_SUBSCRIBE_PREFIX = "OS:"
TASK_PREFIX = "TT:"
OBJECT_CHANNEL_PREFIX = "OC:"
def integerToAsciiHex(num, numbytes):
    """Encode the low bytes of num as a byte string, least significant first.

    Despite the name, the result holds raw byte values rather than hexadecimal
    digits.

    Args:
        num: The integer to encode. Only its lowest numbytes bytes are used.
        numbytes: The number of bytes to emit. Must be 4 or 8.

    Returns:
        A byte string of length numbytes.
    """
    # Support 32 and 64 bit architecture.
    assert numbytes in (4, 8)
    encoded = bytearray((num >> (8 * position)) & 0xff
                        for position in range(numbytes))
    return bytes(encoded)
def get_next_message(pubsub_client, timeout_seconds=10):
    """Block until the next message is available on the pubsub channel.

    Args:
        pubsub_client: An object whose get_message() method returns the next
            message, or None when no message is available yet.
        timeout_seconds: How long to keep polling before giving up.

    Returns:
        The first non-None value returned by pubsub_client.get_message().

    Raises:
        Exception: If no message arrived within the timeout.
    """
    polling_started = time.time()
    message = pubsub_client.get_message()
    while message is None:
        # Nothing yet: wait briefly, then give up if we ran out of time.
        time.sleep(0.1)
        if time.time() - polling_started > timeout_seconds:
            raise Exception("Timed out while waiting for next message.")
        message = pubsub_client.get_message()
    return message
class TestGlobalStateStore(unittest.TestCase):
    """Tests for the RAY.* Redis commands, run against a Redis server that is
    started in setUp and cleaned up in tearDown."""

    def setUp(self):
        # Start a Redis server through ray.services and connect a client.
        port, _ = ray.services.start_redis()
        self.redis = redis.StrictRedis(host="localhost", port=port, db=0)

    def tearDown(self):
        ray.services.cleanup()

    def testInvalidObjectTableAdd(self):
        execute = self.redis.execute_command
        # Check that Redis returns an error when RAY.OBJECT_TABLE_ADD is
        # called with the wrong arguments.
        malformed_calls = [
            (),
            ("hello",),
            ("object_id2", "one", "hash2", "manager_id1"),
            ("object_id2", 1, "hash2", "manager_id1", "extra argument"),
        ]
        for bad_args in malformed_calls:
            with self.assertRaises(redis.ResponseError):
                execute("RAY.OBJECT_TABLE_ADD", *bad_args)
        # Check that Redis returns an error when RAY.OBJECT_TABLE_ADD adds an
        # object ID that is already present with a different hash.
        execute("RAY.OBJECT_TABLE_ADD", "object_id1", 1, "hash1",
                "manager_id1")
        with self.assertRaises(redis.ResponseError):
            execute("RAY.OBJECT_TABLE_ADD", "object_id1", 1, "hash2",
                    "manager_id1")
        # Check that it is fine if we add the same object ID multiple times
        # with the same hash.
        repeated_adds = [
            (1, "manager_id1"),
            (1, "manager_id1"),
            (1, "manager_id2"),
            (2, "manager_id2"),
        ]
        for size, manager in repeated_adds:
            execute("RAY.OBJECT_TABLE_ADD", "object_id1", size, "hash1",
                    manager)

    def testObjectTableAddAndLookup(self):
        execute = self.redis.execute_command

        def lookup():
            return execute("RAY.OBJECT_TABLE_LOOKUP", "object_id1")

        both_managers = {b"manager_id1", b"manager_id2"}
        # Try calling RAY.OBJECT_TABLE_LOOKUP with an object ID that has not
        # been added yet.
        self.assertIsNone(lookup())
        # Add some managers and try again.
        execute("RAY.OBJECT_TABLE_ADD", "object_id1", 1, "hash1",
                "manager_id1")
        execute("RAY.OBJECT_TABLE_ADD", "object_id1", 1, "hash1",
                "manager_id2")
        self.assertEqual(set(lookup()), both_managers)
        # Add a manager that already exists again and try again.
        execute("RAY.OBJECT_TABLE_ADD", "object_id1", 1, "hash1",
                "manager_id2")
        self.assertEqual(set(lookup()), both_managers)
        # Check that we properly handle NULL characters. In the past, NULL
        # characters were handled improperly causing a "hash mismatch" error
        # if two object IDs that agreed up to the NULL character were inserted
        # with different hashes.
        execute("RAY.OBJECT_TABLE_ADD", "\x00object_id3", 1, "hash1",
                "manager_id1")
        execute("RAY.OBJECT_TABLE_ADD", "\x00object_id4", 1, "hash2",
                "manager_id1")
        # Check that NULL characters in the hash are handled properly.
        execute("RAY.OBJECT_TABLE_ADD", "object_id3", 1, "\x00hash1",
                "manager_id1")
        with self.assertRaises(redis.ResponseError):
            execute("RAY.OBJECT_TABLE_ADD", "object_id3", 1, "\x00hash2",
                    "manager_id1")

    def testObjectTableAddAndRemove(self):
        execute = self.redis.execute_command

        def lookup():
            return execute("RAY.OBJECT_TABLE_LOOKUP", "object_id1")

        # Try removing a manager from an object ID that has not been added
        # yet.
        with self.assertRaises(redis.ResponseError):
            execute("RAY.OBJECT_TABLE_REMOVE", "object_id1", "manager_id1")
        # Try calling RAY.OBJECT_TABLE_LOOKUP with an object ID that has not
        # been added yet.
        self.assertIsNone(lookup())
        # Add some managers and try again.
        execute("RAY.OBJECT_TABLE_ADD", "object_id1", 1, "hash1",
                "manager_id1")
        execute("RAY.OBJECT_TABLE_ADD", "object_id1", 1, "hash1",
                "manager_id2")
        self.assertEqual(set(lookup()), {b"manager_id1", b"manager_id2"})
        # Each step removes one manager and then checks the remaining set:
        #  - removing a manager that doesn't exist leaves the set unchanged,
        #  - removing an existing manager works the first time and does
        #    nothing the second time,
        #  - removing the last manager leaves an empty set,
        #  - removing from an empty set still leaves an empty set.
        removal_steps = [
            ("manager_id3", {b"manager_id1", b"manager_id2"}),
            ("manager_id1", {b"manager_id2"}),
            ("manager_id1", {b"manager_id2"}),
            ("manager_id2", set()),
            ("manager_id3", set()),
        ]
        for manager, remaining in removal_steps:
            execute("RAY.OBJECT_TABLE_REMOVE", "object_id1", manager)
            self.assertEqual(set(lookup()), remaining)

    def testObjectTableSubscribeToNotifications(self):
        execute = self.redis.execute_command
        data_size = 0xf1f0
        size_bytes = integerToAsciiHex(data_size, 8)
        pubsub = self.redis.pubsub()

        def assert_next_data(expected):
            self.assertEqual(get_next_message(pubsub)["data"], expected)

        # Subscribe to an object ID.
        pubsub.psubscribe("{}manager_id1".format(OBJECT_CHANNEL_PREFIX))
        execute("RAY.OBJECT_TABLE_ADD", "object_id1", data_size, "hash1",
                "manager_id2")
        # Receive the acknowledgement message.
        assert_next_data(1)
        # Request a notification and receive the data.
        execute("RAY.OBJECT_TABLE_REQUEST_NOTIFICATIONS", "manager_id1",
                "object_id1")
        assert_next_data(b"object_id1 %s MANAGERS manager_id2" % size_bytes)
        # Request a notification for an object that isn't there. Then add the
        # object and receive the data. Only the first call to
        # RAY.OBJECT_TABLE_ADD should trigger notifications.
        execute("RAY.OBJECT_TABLE_REQUEST_NOTIFICATIONS", "manager_id1",
                "object_id2", "object_id3")
        for manager in ("manager_id1", "manager_id2", "manager_id3"):
            execute("RAY.OBJECT_TABLE_ADD", "object_id3", data_size, "hash1",
                    manager)
        assert_next_data(b"object_id3 %s MANAGERS manager_id1" % size_bytes)
        execute("RAY.OBJECT_TABLE_ADD", "object_id2", data_size, "hash1",
                "manager_id3")
        assert_next_data(b"object_id2 %s MANAGERS manager_id3" % size_bytes)
        # Request notifications for object_id3 again.
        execute("RAY.OBJECT_TABLE_REQUEST_NOTIFICATIONS", "manager_id1",
                "object_id3")
        assert_next_data(
            b"object_id3 %s MANAGERS manager_id1 manager_id2 manager_id3"
            % size_bytes)

    def testResultTableAddAndLookup(self):
        execute = self.redis.execute_command

        def lookup(object_id):
            return execute("RAY.RESULT_TABLE_LOOKUP", object_id)

        # Try looking up something in the result table before anything is
        # added.
        self.assertIsNone(lookup("object_id1"))
        # Adding the object to the object table should have no effect.
        execute("RAY.OBJECT_TABLE_ADD", "object_id1", 1, "hash1",
                "manager_id1")
        self.assertIsNone(lookup("object_id1"))
        # Add the result to the result table. The lookup now returns the task
        # ID.
        first_task = b"task_id1"
        execute("RAY.RESULT_TABLE_ADD", "object_id1", first_task)
        self.assertEqual(lookup("object_id1"), first_task)
        # Doing it again should still work.
        self.assertEqual(lookup("object_id1"), first_task)
        # Try another result table lookup. This should succeed.
        second_task = b"task_id2"
        execute("RAY.RESULT_TABLE_ADD", "object_id2", second_task)
        self.assertEqual(lookup("object_id2"), second_task)

    def testInvalidTaskTableAdd(self):
        execute = self.redis.execute_command
        # Check that Redis returns an error when RAY.TASK_TABLE_ADD is called
        # with the wrong arguments.
        malformed_adds = [
            (),
            ("hello",),
            ("task_id", 3, "node_id"),
            # Non-integer scheduling states should not be added.
            ("task_id", "invalid_state", "node_id", "task_spec"),
            # Scheduling states with invalid width should not be added.
            ("task_id", 101, "node_id", "task_spec"),
        ]
        for bad_args in malformed_adds:
            with self.assertRaises(redis.ResponseError):
                execute("RAY.TASK_TABLE_ADD", *bad_args)
        with self.assertRaises(redis.ResponseError):
            # Should not be able to update a non-existent task.
            execute("RAY.TASK_TABLE_UPDATE", "task_id", 10, "node_id")

    def testTaskTableAddAndLookup(self):
        TASK_STATUS_WAITING = 1
        TASK_STATUS_SCHEDULED = 2
        TASK_STATUS_QUEUED = 4
        execute = self.redis.execute_command

        def get_task():
            return execute("RAY.TASK_TABLE_GET", "task_id")

        # Check that task table adds, updates, and lookups work correctly.
        entry = [TASK_STATUS_WAITING, b"node_id", b"task_spec"]
        execute("RAY.TASK_TABLE_ADD", "task_id", *entry)
        self.assertEqual(get_task(), entry)
        entry[0] = TASK_STATUS_SCHEDULED
        execute("RAY.TASK_TABLE_UPDATE", "task_id", *entry[:2])
        self.assertEqual(get_task(), entry)

        # From here on the list is [test value, set value, node, spec]; the
        # first three items are sent to RAY.TASK_TABLE_TEST_AND_UPDATE and the
        # last three are what the stored task is compared against.
        update_args = [entry[0]] + entry

        def compare_and_set():
            return execute("RAY.TASK_TABLE_TEST_AND_UPDATE", "task_id",
                           *update_args[:3])

        # If the current value, test value, and set value are all the same,
        # the update happens, and the response is still the same task.
        self.assertEqual(compare_and_set(), update_args[1:])
        # Check that the task entry is still the same.
        self.assertEqual(get_task(), update_args[1:])

        # If the current value is the same as the test value, and the set
        # value is different, the update happens, and the response is the
        # entire task.
        update_args[1] = TASK_STATUS_QUEUED
        self.assertEqual(compare_and_set(), update_args[1:])
        # Check that the update happened.
        queued_entry = get_task()
        self.assertEqual(queued_entry, update_args[1:])

        # If the current value is no longer the same as the test value, the
        # response is nil.
        update_args[1] = TASK_STATUS_WAITING
        self.assertIsNone(compare_and_set())
        # Check that the update did not happen.
        unchanged_entry = get_task()
        self.assertEqual(unchanged_entry, queued_entry)
        self.assertNotEqual(unchanged_entry, update_args[1:])

        # If the test value is a bitmask that matches the current value, the
        # update happens.
        update_args[0] = TASK_STATUS_SCHEDULED | TASK_STATUS_QUEUED
        matched_response = compare_and_set()
        self.assertEqual(matched_response, update_args[1:])

        # If the test value is a bitmask that does not match the current
        # value, the update does not happen.
        update_args[1] = TASK_STATUS_SCHEDULED
        self.assertIsNone(compare_and_set())
        # Check that the update did not happen.
        final_entry = get_task()
        self.assertEqual(final_entry, matched_response)
        self.assertNotEqual(final_entry, update_args[1:])

    def testTaskTableSubscribe(self):
        scheduling_state = 1
        node_id = "node_id"
        # Subscribe to the task table.
        pubsub = self.redis.pubsub()
        patterns = [
            "{prefix}*:*".format(prefix=TASK_PREFIX),
            "{prefix}*:{state: >2}".format(prefix=TASK_PREFIX,
                                           state=scheduling_state),
            "{prefix}{node}:*".format(prefix=TASK_PREFIX, node=node_id),
        ]
        for pattern in patterns:
            pubsub.psubscribe(pattern)
        task_args = [b"task_id", scheduling_state, node_id.encode("ascii"),
                     b"task_spec"]
        self.redis.execute_command("RAY.TASK_TABLE_ADD", *task_args)
        # Receive the acknowledgement message.
        for subscription_count in (1, 2, 3):
            self.assertEqual(get_next_message(pubsub)["data"],
                             subscription_count)
        # Receive the actual data.
        for _ in range(3):
            fields = get_next_message(pubsub)["data"].split()
            fields[1] = int(fields[1])
            self.assertEqual(fields, task_args)
# Run the tests in this file with verbose output when executed as a script.
if __name__ == "__main__":
unittest.main(verbosity=2)
+170
View File
@@ -0,0 +1,170 @@
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import numpy as np
import pickle
import sys
import unittest
import ray.local_scheduler as local_scheduler
ID_SIZE = 20
def random_object_id():
    """Return a local_scheduler.ObjectID built from ID_SIZE random bytes."""
    raw_id = np.random.bytes(ID_SIZE)
    return local_scheduler.ObjectID(raw_id)
def random_function_id():
    """Return a random function ID.

    The ID is represented as a local_scheduler.ObjectID built from ID_SIZE
    random bytes.
    """
    raw_id = np.random.bytes(ID_SIZE)
    return local_scheduler.ObjectID(raw_id)
def random_driver_id():
    """Return a random driver ID.

    The ID is represented as a local_scheduler.ObjectID built from ID_SIZE
    random bytes.
    """
    raw_id = np.random.bytes(ID_SIZE)
    return local_scheduler.ObjectID(raw_id)
def random_task_id():
    """Return a random task ID.

    The ID is represented as a local_scheduler.ObjectID built from ID_SIZE
    random bytes.
    """
    raw_id = np.random.bytes(ID_SIZE)
    return local_scheduler.ObjectID(raw_id)
# Values for which local_scheduler.check_simple_value is expected to return
# True (see TestSerialization), plus one-element list, tuple and dict wrappers
# around each of them.
BASE_SIMPLE_OBJECTS = [
    0, 1, 100000,
    0.0, 0.5, 0.9, 100000.1,
    (), [], {},
    "", 990 * "h", u"", 990 * u"h",
]
if sys.version_info < (3, 0):
    # The long type only exists on Python 2.
    BASE_SIMPLE_OBJECTS.extend([long(0), long(1), long(100000),
                                long(1 << 100)])
LIST_SIMPLE_OBJECTS = [[value] for value in BASE_SIMPLE_OBJECTS]
TUPLE_SIMPLE_OBJECTS = [(value,) for value in BASE_SIMPLE_OBJECTS]
DICT_SIMPLE_OBJECTS = [{(): value} for value in BASE_SIMPLE_OBJECTS]
SIMPLE_OBJECTS = (BASE_SIMPLE_OBJECTS + LIST_SIMPLE_OBJECTS +
                  TUPLE_SIMPLE_OBJECTS + DICT_SIMPLE_OBJECTS)
# Create some complex objects that cannot be serialized by value in tasks.
# The list l contains itself.
l = []
l.append(l)


class Foo(object):
    """Empty user-defined class used as one of the complex objects."""

    def __init__(self):
        pass


BASE_COMPLEX_OBJECTS = [
    999 * "h",
    999 * u"h",
    l,
    Foo(),
    10 * [10 * [10 * [1]]],
]
LIST_COMPLEX_OBJECTS = [[value] for value in BASE_COMPLEX_OBJECTS]
TUPLE_COMPLEX_OBJECTS = [(value,) for value in BASE_COMPLEX_OBJECTS]
DICT_COMPLEX_OBJECTS = [{(): value} for value in BASE_COMPLEX_OBJECTS]
COMPLEX_OBJECTS = (BASE_COMPLEX_OBJECTS + LIST_COMPLEX_OBJECTS +
                   TUPLE_COMPLEX_OBJECTS + DICT_COMPLEX_OBJECTS)
class TestSerialization(unittest.TestCase):
    """Checks local_scheduler.check_simple_value against the module-level
    SIMPLE_OBJECTS and COMPLEX_OBJECTS lists."""

    def test_serialize_by_value(self):
        is_simple = local_scheduler.check_simple_value
        # Every simple object must be accepted ...
        for simple_value in SIMPLE_OBJECTS:
            self.assertTrue(is_simple(simple_value))
        # ... and every complex object must be rejected.
        for complex_value in COMPLEX_OBJECTS:
            self.assertFalse(is_simple(complex_value))
class TestObjectID(unittest.TestCase):
    """Tests for creating, pickling, comparing and hashing
    local_scheduler.ObjectID instances."""

    def test_create_object_id(self):
        # Only checks that construction does not raise.
        random_object_id()

    def test_cannot_pickle_object_ids(self):
        # The module-level imports of this file do not define the name
        # "pickling". Without this import every pickling.dumps call below
        # raised NameError, which assertRaises(Exception, ...) accepted, so
        # the test passed without ever exercising the pickler.
        import ray.pickling as pickling

        object_ids = [random_object_id() for _ in range(256)]

        def f():
            return object_ids

        def g(val=object_ids):
            return 1

        def h():
            # The assignment is what makes h close over an object ID.
            x = object_ids[0]
            return 1

        # Make sure that object IDs cannot be pickled (including functions
        # that close over object IDs). pickling.dumps is resolved before
        # assertRaises is entered, so a missing attribute is reported as an
        # error instead of being mistaken for the expected failure.
        for unpicklable in (object_ids[0], object_ids, f, g, h):
            self.assertRaises(Exception, pickling.dumps, unpicklable)

    def test_equality_comparisons(self):
        x1 = local_scheduler.ObjectID(ID_SIZE * b"a")
        x2 = local_scheduler.ObjectID(ID_SIZE * b"a")
        y1 = local_scheduler.ObjectID(ID_SIZE * b"b")
        y2 = local_scheduler.ObjectID(ID_SIZE * b"b")
        self.assertEqual(x1, x2)
        self.assertEqual(y1, y2)
        self.assertNotEqual(x1, y1)

        # Two independently constructed lists of IDs built from the same byte
        # strings must collapse to the same 256 set members.
        random_strings = [np.random.bytes(ID_SIZE) for _ in range(256)]
        object_ids1 = [local_scheduler.ObjectID(s) for s in random_strings]
        object_ids2 = [local_scheduler.ObjectID(s) for s in random_strings]
        self.assertEqual(len(set(object_ids1)), 256)
        self.assertEqual(len(set(object_ids1 + object_ids2)), 256)
        self.assertEqual(set(object_ids1), set(object_ids2))

    def test_hashability(self):
        # Only checks that object IDs can be used as dict keys and set
        # members without raising.
        x = random_object_id()
        y = random_object_id()
        {x: y}
        {x, y}
class TestTask(unittest.TestCase):
    """Tests that local_scheduler.Task objects keep their function ID,
    arguments and number of return values, both directly after construction
    and after a task_to_string / task_from_string round trip."""

    def check_task(self, task, function_id, num_return_vals, args):
        """Assert that task matches the values it was created from.

        Args:
            task: The task to inspect.
            function_id: The function ID the task was created with.
            num_return_vals: The expected number of return values.
            args: The argument list the task was created with.
        """
        self.assertEqual(function_id.id(), task.function_id().id())
        retrieved_args = task.arguments()
        self.assertEqual(num_return_vals, len(task.returns()))
        self.assertEqual(len(args), len(retrieved_args))
        # The lengths were just asserted to be equal, so zip drops nothing.
        for retrieved, expected in zip(retrieved_args, args):
            if isinstance(retrieved, local_scheduler.ObjectID):
                self.assertEqual(retrieved.id(), expected.id())
            else:
                self.assertEqual(retrieved, expected)

    def test_create_and_serialize_task(self):
        # TODO(rkn): The function ID should be a FunctionID object, not an
        # ObjectID.
        driver_id = random_driver_id()
        parent_id = random_task_id()
        function_id = random_function_id()
        object_ids = [random_object_id() for _ in range(256)]

        repeat_counts = (1, 10, 100, 1000)
        prefix_lengths = (1, 2, 3, 4, 5, 10, 100, 256)
        args_list = [[]]
        # Lists of plain values of increasing length.
        args_list += [count * [1] for count in repeat_counts]
        args_list += [count * ["a"] for count in repeat_counts]
        args_list.append([1, 1.3, 2, 1 << 100, "hi", u"hi", [1, 2]])
        # Lists made only of object IDs.
        args_list += [object_ids[:length] for length in prefix_lengths]
        # Lists mixing object IDs and plain values.
        args_list += [
            [1, object_ids[0]],
            [object_ids[0], "a"],
            [1, object_ids[0], "a"],
            [object_ids[0], 1, object_ids[1], "a"],
            object_ids[:3] + [1, "hi", 2.3] + object_ids[:5],
            object_ids + 100 * ["a"] + object_ids,
        ]

        return_counts = [0, 1, 2, 3, 5, 10, 100]
        for args in args_list:
            for num_return_vals in return_counts:
                task = local_scheduler.Task(driver_id, function_id, args,
                                            num_return_vals, parent_id, 0)
                self.check_task(task, function_id, num_return_vals, args)
                # The task must survive a serialization round trip.
                serialized = local_scheduler.task_to_string(task)
                restored = local_scheduler.task_from_string(serialized)
                self.check_task(restored, function_id, num_return_vals, args)
# Run the tests in this file with verbose output when executed as a script.
if __name__ == "__main__":
unittest.main(verbosity=2)
View File
View File
View File
+5
View File
@@ -0,0 +1,5 @@
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
from .global_scheduler_services import *
@@ -0,0 +1,47 @@
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import os
import subprocess
import time
def start_global_scheduler(redis_address, use_valgrind=False,
                           use_profiler=False, stdout_file=None,
                           stderr_file=None):
    """Start a global scheduler process.

    Args:
        redis_address (str): The address of the Redis instance.
        use_valgrind (bool): True if the global scheduler should be started
            inside of valgrind. If this is True, use_profiler must be False.
        use_profiler (bool): True if the global scheduler should be started
            inside a profiler. If this is True, use_valgrind must be False.
        stdout_file: A file handle opened for writing to redirect stdout to.
            If no redirection should happen, then this should be None.
        stderr_file: A file handle opened for writing to redirect stderr to.
            If no redirection should happen, then this should be None.

    Return:
        The subprocess.Popen object of the global scheduler process.

    Raises:
        Exception: If both use_valgrind and use_profiler are True.
    """
    if use_valgrind and use_profiler:
        raise Exception("Cannot use valgrind and profiler at the same time.")

    module_dir = os.path.abspath(os.path.dirname(__file__))
    executable = os.path.join(
        module_dir, "../core/src/global_scheduler/global_scheduler")

    # Pick the wrapper command (if any) and how long to sleep after launching.
    if use_valgrind:
        wrapper = ["valgrind",
                   "--track-origins=yes",
                   "--leak-check=full",
                   "--show-leak-kinds=all",
                   "--error-exitcode=1"]
        startup_delay = 1.0
    elif use_profiler:
        wrapper = ["valgrind", "--tool=callgrind"]
        startup_delay = 1.0
    else:
        wrapper = []
        startup_delay = 0.1

    process = subprocess.Popen(wrapper + [executable, "-r", redis_address],
                               stdout=stdout_file, stderr=stderr_file)
    time.sleep(startup_delay)
    return process
+284
View File
@@ -0,0 +1,284 @@
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import numpy as np
import os
import random
import redis
import signal
import subprocess
import sys
import threading
import time
import unittest
import ray.global_scheduler as global_scheduler
import ray.local_scheduler as local_scheduler
import ray.plasma as plasma
from ray.plasma.utils import random_object_id, generate_metadata, write_to_data_buffer, create_object_with_id, create_object
from ray import services
# Whether the global scheduler is started inside valgrind. The __main__ guard
# at the bottom of this file sets it to True when the last command-line
# argument is "valgrind".
USE_VALGRIND = False
# NOTE(review): not referenced by the code visible in this file -- presumably
# the plasma store memory size in bytes; confirm before relying on it.
PLASMA_STORE_MEMORY = 1000000000
# Number of random bytes used to build the IDs in the random_*_id helpers.
ID_SIZE = 20
# Number of (plasma store, plasma manager, local scheduler) groups that setUp
# starts.
NUM_CLUSTER_NODES = 2
# Actor ID passed when creating local scheduler clients and tasks in these
# tests.
NIL_ACTOR_ID = 20 * b"\xff"
# These constants must match the scheduling state enum in task.h.
TASK_STATUS_WAITING = 1
TASK_STATUS_SCHEDULED = 2
TASK_STATUS_QUEUED = 4
TASK_STATUS_RUNNING = 8
TASK_STATUS_DONE = 16
# These constants are an implementation detail of ray_redis_module.c, so this
# must be kept in sync with that file.
DB_CLIENT_PREFIX = "CL:"
TASK_PREFIX = "TT:"
def random_driver_id():
    """Return a random driver ID.

    The ID is represented as a local_scheduler.ObjectID built from ID_SIZE
    random bytes.
    """
    raw_id = np.random.bytes(ID_SIZE)
    return local_scheduler.ObjectID(raw_id)
def random_task_id():
    """Return a random task ID.

    The ID is represented as a local_scheduler.ObjectID built from ID_SIZE
    random bytes.
    """
    raw_id = np.random.bytes(ID_SIZE)
    return local_scheduler.ObjectID(raw_id)
def random_function_id():
    """Return a random function ID.

    The ID is represented as a local_scheduler.ObjectID built from ID_SIZE
    random bytes.
    """
    raw_id = np.random.bytes(ID_SIZE)
    return local_scheduler.ObjectID(raw_id)
def random_object_id():
    """Return a local_scheduler.ObjectID built from ID_SIZE random bytes.

    NOTE: this definition replaces the random_object_id name imported from
    ray.plasma.utils at the top of this file.
    """
    raw_id = np.random.bytes(ID_SIZE)
    return local_scheduler.ObjectID(raw_id)
def new_port():
    """Return a random integer port number between 10000 and 65535 inclusive.

    The port is only chosen at random; nothing checks whether it is free.
    """
    lowest_port = 10000
    highest_port = 65535
    return random.randrange(lowest_port, highest_port + 1)
class TestGlobalScheduler(unittest.TestCase):
    """Integration tests for the global scheduler.

    Every test runs against one Redis server, one global scheduler and
    NUM_CLUSTER_NODES groups of (plasma store, plasma manager, local
    scheduler), all started in setUp and killed in tearDown.
    """

    def setUp(self):
        # Start one Redis server and N pairs of (plasma, local_scheduler)
        node_ip_address = "127.0.0.1"
        redis_port, self.redis_process = services.start_redis(cleanup=False)
        redis_address = services.address(node_ip_address, redis_port)
        # Create a Redis client.
        self.redis_client = redis.StrictRedis(host=node_ip_address,
                                              port=redis_port)
        # Start one global scheduler.
        self.p1 = global_scheduler.start_global_scheduler(
            redis_address, use_valgrind=USE_VALGRIND)
        self.plasma_store_pids = []
        self.plasma_manager_pids = []
        self.local_scheduler_pids = []
        self.plasma_clients = []
        self.local_scheduler_clients = []

        for _ in range(NUM_CLUSTER_NODES):
            # Start the Plasma store. Plasma store name is randomly generated.
            plasma_store_name, p2 = plasma.start_plasma_store()
            self.plasma_store_pids.append(p2)
            # Start the Plasma manager.
            # Assumption: Plasma manager name and port are randomly generated
            # by the plasma module.
            plasma_manager_name, p3, plasma_manager_port = (
                plasma.start_plasma_manager(plasma_store_name, redis_address))
            self.plasma_manager_pids.append(p3)
            plasma_address = "{}:{}".format(node_ip_address,
                                            plasma_manager_port)
            plasma_client = plasma.PlasmaClient(plasma_store_name,
                                                plasma_manager_name)
            self.plasma_clients.append(plasma_client)
            # Start the local scheduler.
            local_scheduler_name, p4 = local_scheduler.start_local_scheduler(
                plasma_store_name,
                plasma_manager_name=plasma_manager_name,
                plasma_address=plasma_address,
                redis_address=redis_address,
                static_resource_list=[10, 0])
            # Connect to the scheduler.
            local_scheduler_client = local_scheduler.LocalSchedulerClient(
                local_scheduler_name, NIL_ACTOR_ID)
            self.local_scheduler_clients.append(local_scheduler_client)
            self.local_scheduler_pids.append(p4)

    def tearDown(self):
        # Check that the processes are still alive.
        self.assertEqual(self.p1.poll(), None)
        for store in self.plasma_store_pids:
            self.assertEqual(store.poll(), None)
        for manager in self.plasma_manager_pids:
            self.assertEqual(manager.poll(), None)
        for scheduler in self.local_scheduler_pids:
            self.assertEqual(scheduler.poll(), None)
        self.assertEqual(self.redis_process.poll(), None)

        # Kill the global scheduler.
        if USE_VALGRIND:
            self.p1.send_signal(signal.SIGTERM)
            self.p1.wait()
            if self.p1.returncode != 0:
                os._exit(-1)
        else:
            self.p1.kill()
        # Kill local schedulers, plasma managers, and plasma stores.
        for scheduler in self.local_scheduler_pids:
            scheduler.kill()
        for manager in self.plasma_manager_pids:
            manager.kill()
        for store in self.plasma_store_pids:
            store.kill()
        # Kill Redis. In the event that we are using valgrind, this needs to
        # happen after we kill the global scheduler.
        self.redis_process.kill()

    def get_plasma_manager_id(self):
        """Get the db_client_id with client_type equal to plasma_manager.

        Iterates over all the client table keys and returns the first key
        whose client_type field equals plasma_manager. The returned key still
        carries the client table prefix; callers strip it themselves.
        TODO(atumanov): write a separate function to get all plasma manager
        client IDs.

        Returns:
            The db_client_id if one is found and otherwise None.
        """
        client_keys = self.redis_client.keys("{}*".format(DB_CLIENT_PREFIX))
        for client_key in client_keys:
            client_type = self.redis_client.hget(client_key, b"client_type")
            if client_type == b"plasma_manager":
                return client_key
        return None

    def test_task_default_resources(self):
        # A task created without explicit resources requires [1.0, 0.0].
        task1 = local_scheduler.Task(random_driver_id(), random_function_id(),
                                     [random_object_id()], 0,
                                     random_task_id(), 0)
        self.assertEqual(task1.required_resources(), [1.0, 0.0])
        # Explicitly passed resources are reported back unchanged.
        task2 = local_scheduler.Task(random_driver_id(), random_function_id(),
                                     [random_object_id()], 0,
                                     random_task_id(), 0,
                                     local_scheduler.ObjectID(NIL_ACTOR_ID),
                                     0, [1.0, 2.0])
        self.assertEqual(task2.required_resources(), [1.0, 2.0])

    def test_redis_only_single_task(self):
        """
        Tests global scheduler functionality by interacting with Redis and
        checking task state transitions in Redis only. TODO(atumanov):
        implement.
        """
        # Check precondition for this test:
        # There should be 2n+1 db clients: the global scheduler + one local
        # scheduler and one plasma per node.
        client_keys = self.redis_client.keys("{}*".format(DB_CLIENT_PREFIX))
        self.assertEqual(len(client_keys), 2 * NUM_CLUSTER_NODES + 1)
        db_client_id = self.get_plasma_manager_id()
        # Use unittest assertions rather than bare assert statements, which
        # are stripped when Python runs with -O.
        self.assertIsNotNone(db_client_id)
        self.assertTrue(db_client_id.startswith(b"CL:"))
        db_client_id = db_client_id[len(b"CL:"):]  # Remove the CL: prefix.

    def test_integration_single_task(self):
        # There should be 2n+1 db clients: the global scheduler + one local
        # scheduler and one plasma manager per node.
        client_keys = self.redis_client.keys("{}*".format(DB_CLIENT_PREFIX))
        self.assertEqual(len(client_keys), 2 * NUM_CLUSTER_NODES + 1)

        num_return_vals = [0, 1, 2, 3, 5, 10]
        # Insert the object into Redis.
        data_size = 0xf1f0
        metadata_size = 0x40
        plasma_client = self.plasma_clients[0]
        object_dep, _, _ = create_object(plasma_client, data_size,
                                         metadata_size, seal=True)

        # Sleep before submitting task to local scheduler.
        time.sleep(0.1)
        # Submit a task to Redis.
        task = local_scheduler.Task(random_driver_id(), random_function_id(),
                                    [local_scheduler.ObjectID(object_dep)],
                                    num_return_vals[0], random_task_id(), 0)
        self.local_scheduler_clients[0].submit(task)
        time.sleep(0.1)

        # There should now be a task in Redis, and it should get assigned to
        # the local scheduler
        num_retries = 10
        # Initialised up front: if the task never shows up in Redis there is
        # no status to report, and the check after the loop must not fail
        # with an unbound local variable.
        task_status = None
        for _ in range(num_retries):
            task_entries = self.redis_client.keys("{}*".format(TASK_PREFIX))
            self.assertLessEqual(len(task_entries), 1)
            if len(task_entries) == 1:
                task_contents = self.redis_client.hgetall(task_entries[0])
                task_status = int(task_contents[b"state"])
                self.assertIn(task_status, [TASK_STATUS_WAITING,
                                            TASK_STATUS_SCHEDULED,
                                            TASK_STATUS_QUEUED])
                if task_status == TASK_STATUS_QUEUED:
                    break
                print(task_status)
            print("The task has not been scheduled yet, trying again.")
            time.sleep(1)
        else:
            # Failed to submit and schedule a single task. Report it through
            # unittest so that tearDown runs exactly once, instead of calling
            # tearDown by hand and exiting the interpreter.
            self.fail("The task was not queued after {} retries (last "
                      "observed state: {}).".format(num_retries, task_status))

    def integration_many_tasks_helper(self, timesync=True):
        """Submit many tasks and check that all of them end up queued.

        Args:
            timesync (bool): If True, sleep 10ms after creating each object
                and before submitting the task that depends on it.
        """
        # There should be 2n+1 db clients: the global scheduler + one local
        # scheduler and one plasma manager per node.
        client_keys = self.redis_client.keys("{}*".format(DB_CLIENT_PREFIX))
        self.assertEqual(len(client_keys), 2 * NUM_CLUSTER_NODES + 1)
        num_return_vals = [0, 1, 2, 3, 5, 10]

        # Submit a bunch of tasks to Redis.
        num_tasks = 1000
        for _ in range(num_tasks):
            # Create a new object for each task.
            data_size = np.random.randint(1 << 20)
            metadata_size = np.random.randint(1 << 10)
            plasma_client = self.plasma_clients[0]
            object_dep, _, _ = create_object(plasma_client, data_size,
                                             metadata_size, seal=True)
            if timesync:
                # Give 10ms for object info handler to fire (long enough to
                # yield CPU).
                time.sleep(0.010)
            task = local_scheduler.Task(
                random_driver_id(), random_function_id(),
                [local_scheduler.ObjectID(object_dep)],
                num_return_vals[0], random_task_id(), 0)
            self.local_scheduler_clients[0].submit(task)

        # Check that there are the correct number of tasks in Redis and that
        # they all get assigned to the local scheduler.
        allowed_statuses = [TASK_STATUS_WAITING,
                            TASK_STATUS_SCHEDULED,
                            TASK_STATUS_QUEUED]
        num_retries = 10
        num_tasks_done = 0
        while num_retries > 0:
            task_entries = self.redis_client.keys("{}*".format(TASK_PREFIX))
            self.assertLessEqual(len(task_entries), num_tasks)
            # First, check if all tasks made it to Redis.
            if len(task_entries) == num_tasks:
                task_contents = [self.redis_client.hgetall(entry)
                                 for entry in task_entries]
                task_statuses = [int(contents[b"state"])
                                 for contents in task_contents]
                self.assertTrue(all(status in allowed_statuses
                                    for status in task_statuses))
                num_tasks_done = task_statuses.count(TASK_STATUS_QUEUED)
                num_tasks_scheduled = task_statuses.count(
                    TASK_STATUS_SCHEDULED)
                num_tasks_waiting = task_statuses.count(TASK_STATUS_WAITING)
                print("tasks in Redis = {}, tasks waiting = {}, tasks scheduled = {}, tasks queued = {}, retries left = {}"
                      .format(len(task_entries), num_tasks_waiting,
                              num_tasks_scheduled, num_tasks_done,
                              num_retries))
                if num_tasks_done == num_tasks:
                    # We're done, so pass.
                    break
            num_retries -= 1
            time.sleep(0.1)

        if num_tasks_done != num_tasks:
            # At least one of the tasks failed to schedule. Report it through
            # unittest so that tearDown runs exactly once, instead of calling
            # tearDown by hand and exiting the interpreter.
            self.fail("Only {} of {} tasks were queued.".format(
                num_tasks_done, num_tasks))

    def test_integration_many_tasks_handler_sync(self):
        self.integration_many_tasks_helper(timesync=True)

    def test_integration_many_tasks(self):
        # More realistic case: should handle out of order object and task
        # notifications.
        self.integration_many_tasks_helper(timesync=False)
if __name__ == "__main__":
    # A trailing "valgrind" command-line argument runs the global scheduler
    # under valgrind.
    if len(sys.argv) > 1 and sys.argv[-1] == "valgrind":
        # Pop the argument so we don't mess with unittest's own argument
        # parser.
        sys.argv.pop()
        USE_VALGRIND = True
        print("Using valgrind for tests")
    unittest.main(verbosity=2)
+6
View File
@@ -0,0 +1,6 @@
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
from ray.core.src.local_scheduler.liblocal_scheduler_library import *
from .local_scheduler_services import *
@@ -0,0 +1,111 @@
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import os
import random
import subprocess
import time
def random_name():
    """Return a random integer from [0, 99999999] as a decimal string."""
    suffix = random.randint(0, 99999999)
    return str(suffix)
def start_local_scheduler(plasma_store_name,
                          plasma_manager_name=None,
                          worker_path=None,
                          plasma_address=None,
                          node_ip_address="127.0.0.1",
                          redis_address=None,
                          use_valgrind=False,
                          use_profiler=False,
                          stdout_file=None,
                          stderr_file=None,
                          static_resource_list=None,
                          num_workers=0):
    """Start a local scheduler process.

    Args:
        plasma_store_name (str): The name of the plasma store socket to
            connect to.
        plasma_manager_name (str): The name of the plasma manager to connect
            to. This does not need to be provided, but if it is, then the
            Redis address must be provided as well.
        worker_path (str): The path of the worker script to use when the
            local scheduler starts up new workers. If given, the plasma
            manager name and the Redis address are required too.
        plasma_address (str): The address of the plasma manager to connect
            to. This is only used by the global scheduler to figure out which
            plasma managers are connected to which local schedulers.
        node_ip_address (str): The address of the node that this local
            scheduler is running on.
        redis_address (str): The address of the Redis instance to connect
            to. If this is not provided, then the local scheduler will not
            connect to Redis.
        use_valgrind (bool): True if the local scheduler should be started
            inside of valgrind. If this is True, use_profiler must be False.
        use_profiler (bool): True if the local scheduler should be started
            inside a profiler. If this is True, use_valgrind must be False.
        stdout_file: A file handle opened for writing to redirect stdout to.
            If no redirection should happen, then this should be None.
        stderr_file: A file handle opened for writing to redirect stderr to.
            If no redirection should happen, then this should be None.
        static_resource_list (list): A list of numbers specifying the local
            scheduler's resource capacities. The resources should appear in
            an order matching the order defined in task.h.
        num_workers (int): The number of workers that the local scheduler
            should start.

    Returns:
        A tuple of the name of the local scheduler socket and the
        subprocess.Popen object of the local scheduler process.

    Raises:
        Exception: If exactly one of plasma_manager_name and redis_address is
            provided, or if use_valgrind and use_profiler are both set.
    """
    if (plasma_manager_name is None) != (redis_address is None):
        raise Exception("If one of the plasma_manager_name and the redis_address is provided, then both must be provided.")
    if use_valgrind and use_profiler:
        raise Exception("Cannot use valgrind and profiler at the same time.")
    local_scheduler_executable = os.path.join(os.path.dirname(os.path.abspath(__file__)), "../core/src/local_scheduler/local_scheduler")
    local_scheduler_name = "/tmp/scheduler{}".format(random_name())
    command = [local_scheduler_executable,
               "-s", local_scheduler_name,
               "-p", plasma_store_name,
               "-h", node_ip_address,
               "-n", str(num_workers)]
    if plasma_manager_name is not None:
        command += ["-m", plasma_manager_name]
    if worker_path is not None:
        # The worker command line embeds all of these, so they must be set.
        assert plasma_store_name is not None
        assert plasma_manager_name is not None
        assert redis_address is not None
        start_worker_command = ("python {} "
                                "--node-ip-address={} "
                                "--object-store-name={} "
                                "--object-store-manager-name={} "
                                "--local-scheduler-name={} "
                                "--redis-address={}").format(worker_path,
                                                             node_ip_address,
                                                             plasma_store_name,
                                                             plasma_manager_name,
                                                             local_scheduler_name,
                                                             redis_address)
        command += ["-w", start_worker_command]
    if redis_address is not None:
        command += ["-r", redis_address]
    if plasma_address is not None:
        command += ["-a", plasma_address]
    if static_resource_list is not None:
        assert all(isinstance(resource, (int, float)) for resource in static_resource_list)
        command += ["-c", ",".join(str(resource) for resource in static_resource_list)]
    # Pick the wrapper command and how long to wait for the process to come
    # up; the valgrind-based modes get a longer pause than a plain launch.
    if use_valgrind:
        wrapper = ["valgrind",
                   "--track-origins=yes",
                   "--leak-check=full",
                   "--show-leak-kinds=all",
                   "--error-exitcode=1"]
        startup_delay = 1.0
    elif use_profiler:
        wrapper = ["valgrind", "--tool=callgrind"]
        startup_delay = 1.0
    else:
        wrapper = []
        startup_delay = 0.1
    pid = subprocess.Popen(wrapper + command, stdout=stdout_file, stderr=stderr_file)
    time.sleep(startup_delay)
    return local_scheduler_name, pid
+205
View File
@@ -0,0 +1,205 @@
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import numpy as np
import os
import random
import signal
import subprocess
import sys
import threading
import time
import unittest
import ray.local_scheduler as local_scheduler
import ray.plasma as plasma
USE_VALGRIND = False
ID_SIZE = 20
NIL_ACTOR_ID = 20 * b"\xff"
def random_object_id():
    """Return a local_scheduler.ObjectID built from ID_SIZE random bytes."""
    raw_id = np.random.bytes(ID_SIZE)
    return local_scheduler.ObjectID(raw_id)
def random_driver_id():
    """Return a local_scheduler.ObjectID built from ID_SIZE random bytes, for use as a driver ID."""
    raw_id = np.random.bytes(ID_SIZE)
    return local_scheduler.ObjectID(raw_id)
def random_task_id():
    """Return a local_scheduler.ObjectID built from ID_SIZE random bytes, for use as a task ID."""
    raw_id = np.random.bytes(ID_SIZE)
    return local_scheduler.ObjectID(raw_id)
def random_function_id():
    """Return a local_scheduler.ObjectID built from ID_SIZE random bytes, for use as a function ID."""
    raw_id = np.random.bytes(ID_SIZE)
    return local_scheduler.ObjectID(raw_id)
class TestLocalSchedulerClient(unittest.TestCase):
    """Tests that drive a LocalSchedulerClient against a plasma store and a
    local scheduler that are started fresh for every test.
    """

    def setUp(self):
        # Bring up a plasma store and connect a client to it.
        store_name, self.p1 = plasma.start_plasma_store()
        self.plasma_client = plasma.PlasmaClient(store_name)
        # Bring up a local scheduler attached to that store.
        socket_name, self.p2 = local_scheduler.start_local_scheduler(store_name, use_valgrind=USE_VALGRIND)
        # Connect to the scheduler.
        self.local_scheduler_client = local_scheduler.LocalSchedulerClient(socket_name, NIL_ACTOR_ID)

    def tearDown(self):
        # Neither process should have exited while the test was running.
        self.assertEqual(self.p1.poll(), None)
        self.assertEqual(self.p2.poll(), None)
        # Kill Plasma.
        self.p1.kill()
        if not USE_VALGRIND:
            # Kill the local scheduler.
            self.p2.kill()
            return
        # Under valgrind, ask the scheduler to terminate and wait for it, then
        # abort the whole test process if it reported a non-zero exit code.
        self.p2.send_signal(signal.SIGTERM)
        self.p2.wait()
        if self.p2.returncode != 0:
            os._exit(-1)

    def test_submit_and_get_task(self):
        function_id = random_function_id()
        object_ids = [random_object_id() for _ in range(256)]
        # Create and seal the objects in the object store so that every task
        # submitted below can be scheduled.
        for oid in object_ids:
            self.plasma_client.create(oid.id(), 0)
            self.plasma_client.seal(oid.id())
        # Argument lists to build tasks from.
        args_list = [
            [],
            #{},
            #(),
            1 * [1],
            10 * [1],
            100 * [1],
            1000 * [1],
            1 * ["a"],
            10 * ["a"],
            100 * ["a"],
            1000 * ["a"],
            [1, 1.3, 1 << 100, "hi", u"hi", [1, 2]],
            object_ids[:1],
            object_ids[:2],
            object_ids[:3],
            object_ids[:4],
            object_ids[:5],
            object_ids[:10],
            object_ids[:100],
            object_ids[:256],
            [1, object_ids[0]],
            [object_ids[0], "a"],
            [1, object_ids[0], "a"],
            [object_ids[0], 1, object_ids[1], "a"],
            object_ids[:3] + [1, "hi", 2.3] + object_ids[:5],
            object_ids + 100 * ["a"] + object_ids
        ]
        return_counts = [0, 1, 2, 3, 5, 10, 100]

        # Round-trip the tasks one at a time and compare what comes back.
        for args in args_list:
            for num_return_vals in return_counts:
                task = local_scheduler.Task(random_driver_id(), function_id, args, num_return_vals, random_task_id(), 0)
                # Submit a task.
                self.local_scheduler_client.submit(task)
                # Get the task.
                new_task = self.local_scheduler_client.get_task()
                self.assertEqual(task.function_id().id(), new_task.function_id().id())
                retrieved_args = new_task.arguments()
                returns = new_task.returns()
                self.assertEqual(len(args), len(retrieved_args))
                self.assertEqual(num_return_vals, len(returns))
                # Object IDs are compared by their id(); everything else is
                # compared directly.
                for expected, actual in zip(args, retrieved_args):
                    if isinstance(expected, local_scheduler.ObjectID):
                        self.assertEqual(expected.id(), actual.id())
                    else:
                        self.assertEqual(expected, actual)

        # Submit all of the tasks.
        for args in args_list:
            for num_return_vals in return_counts:
                task = local_scheduler.Task(random_driver_id(), function_id, args, num_return_vals, random_task_id(), 0)
                self.local_scheduler_client.submit(task)
        # Get all of the tasks, one get_task call per submitted task.
        for _ in range(len(args_list) * len(return_counts)):
            self.local_scheduler_client.get_task()

    def test_scheduling_when_objects_ready(self):
        # Create a task that depends on one object and submit it.
        object_id = random_object_id()
        task = local_scheduler.Task(random_driver_id(), random_function_id(), [object_id], 0, random_task_id(), 0)
        self.local_scheduler_client.submit(task)

        # Launch a thread to get the task.
        def fetch_task():
            self.local_scheduler_client.get_task()

        getter = threading.Thread(target=fetch_task)
        getter.start()
        # Sleep to give the thread time to call get_task.
        time.sleep(0.1)
        # Create and seal the object ID in the object store. This should
        # trigger a scheduling event.
        self.plasma_client.create(object_id.id(), 0)
        self.plasma_client.seal(object_id.id())
        # Wait until the thread finishes so that we know the task was
        # scheduled.
        getter.join()

    def test_scheduling_when_objects_evicted(self):
        # Create a task with two dependencies and submit it.
        object_id1 = random_object_id()
        object_id2 = random_object_id()
        task = local_scheduler.Task(random_driver_id(), random_function_id(), [object_id1, object_id2], 0, random_task_id(), 0)
        self.local_scheduler_client.submit(task)

        # Launch a thread to get the task.
        def fetch_task():
            self.local_scheduler_client.get_task()

        getter = threading.Thread(target=fetch_task)
        getter.start()

        def assert_still_waiting():
            # Pause briefly, then check that get_task has not returned yet.
            time.sleep(0.1)
            self.assertTrue(getter.is_alive())

        # Make one of the dependencies available.
        self.plasma_client.create(object_id1.id(), 1)
        self.plasma_client.seal(object_id1.id())
        assert_still_waiting()
        # Force eviction of the first dependency by filling the store.
        num_objects = 4
        object_size = plasma.DEFAULT_PLASMA_STORE_MEMORY // num_objects
        for _ in range(num_objects + 1):
            object_id = random_object_id()
            self.plasma_client.create(object_id.id(), object_size)
            self.plasma_client.seal(object_id.id())
        assert_still_waiting()
        # Check that the first object dependency was evicted.
        object1 = self.plasma_client.get([object_id1.id()], timeout_ms=0)
        self.assertEqual(object1, [None])
        assert_still_waiting()
        # Create the second dependency.
        self.plasma_client.create(object_id2.id(), 1)
        self.plasma_client.seal(object_id2.id())
        assert_still_waiting()
        # Create the first dependency again. Both dependencies are now
        # available.
        self.plasma_client.create(object_id1.id(), 1)
        self.plasma_client.seal(object_id1.id())
        # Wait until the thread finishes so that we know the task was
        # scheduled.
        getter.join()
if __name__ == "__main__":
    # A trailing "valgrind" argument is consumed here so that it never reaches
    # unittest's own argument parser.
    if len(sys.argv) > 1 and sys.argv[-1] == "valgrind":
        sys.argv.pop()
        USE_VALGRIND = True
        print("Using valgrind for tests")
    unittest.main(verbosity=2)
+29
View File
@@ -0,0 +1,29 @@
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
# See https://github.com/ray-project/ray/issues/131.
helpful_message = """
If you are using Anaconda, try fixing this problem by running:
conda install libgcc
"""
# Import the numbuf extension. If the import fails with a message mentioning
# libstdc++ or CXX, helpful_message is appended to the exception before it is
# re-raised; every ImportError is re-raised either way.
try:
    from ray.core.src.numbuf.libnumbuf import *
except ImportError as e:
    if hasattr(e, "msg") and isinstance(e.msg, str) and ("libstdc++" in e.msg or "CXX" in e.msg):
        # This code path should be taken with Python 3.
        e.msg += helpful_message
    elif hasattr(e, "message") and isinstance(e.message, str) and ("libstdc++" in e.message or "CXX" in e.message):
        # This code path should be taken with Python 2.
        if hasattr(e, "args") and isinstance(e.args, tuple) and len(e.args) == 1 and isinstance(e.args[0], str):
            # Single string argument: extend that string in place.
            e.args = (e.args[0] + helpful_message,)
        else:
            # Otherwise coerce args into a tuple and append the hint as an
            # additional element.
            if not hasattr(e, "args"):
                e.args = ()
            elif not isinstance(e.args, tuple):
                e.args = (e.args,)
            e.args += (helpful_message,)
    # Re-raise the (possibly augmented) ImportError.
    raise
View File
+5
View File
@@ -0,0 +1,5 @@
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
from ray.plasma.plasma import *
+404
View File
@@ -0,0 +1,404 @@
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import os
import random
import subprocess
import sys
import time
import ray.core.src.plasma.libplasma as libplasma
from ray.core.src.plasma.libplasma import plasma_object_exists_error
from ray.core.src.plasma.libplasma import plasma_out_of_memory_error
PLASMA_ID_SIZE = 20
PLASMA_WAIT_TIMEOUT = 2 ** 30
class PlasmaBuffer(object):
    """Wrapper for the buffers handed out by a PlasmaClient.

    A dedicated class is used instead of returning the raw buffer so that a
    destructor can notify Plasma once the object is no longer in use, which
    allows the memory backing it in the Plasma store to potentially be freed.

    Attributes:
        buffer (buffer): A buffer containing an object in the Plasma store.
        plasma_id (PlasmaID): The ID of the object in the buffer.
        plasma_client (PlasmaClient): The PlasmaClient that we use to
            communicate with the store and manager.
    """

    def __init__(self, buff, plasma_id, plasma_client):
        """Store the wrapped buffer, its object ID and the owning client."""
        self.buffer = buff
        self.plasma_id = plasma_id
        self.plasma_client = plasma_client

    def __del__(self):
        """Release the object in Plasma, unless the client was shut down."""
        if not self.plasma_client.alive:
            return
        libplasma.release(self.plasma_client.conn, self.plasma_id)

    def __getitem__(self, index):
        """Return the element at index, as a one-character string on Python 3."""
        # Slicing is deliberately unsupported for now: a slice could share
        # memory with the object store while the original plasma buffer goes
        # out of scope, leaving the slice pointing at inaccessible memory.
        assert not isinstance(index, slice)
        element = self.buffer[index]
        on_python3 = sys.version_info >= (3, 0)
        if on_python3 and not isinstance(index, slice):
            return chr(element)
        return element

    def __setitem__(self, index, value):
        """Store value at index, converting it with ord() on Python 3.

        For a read-only underlying buffer the assignment is expected to fail.
        """
        # Slicing is deliberately unsupported for now: a slice could share
        # memory with the object store while the original plasma buffer goes
        # out of scope, leaving the slice pointing at inaccessible memory.
        assert not isinstance(index, slice)
        on_python3 = sys.version_info >= (3, 0)
        if on_python3 and not isinstance(index, slice):
            self.buffer[index] = ord(value)
        else:
            self.buffer[index] = value

    def __len__(self):
        """Return the length of the wrapped buffer."""
        return len(self.buffer)
def buffers_equal(buff1, buff2):
    """Return True if the two buffers hold equal contents.

    Either argument may be a PlasmaBuffer, in which case its underlying
    buffer is compared. This helper is meant for the tests only: comparing
    full slices is much faster than comparing element by element, but slicing
    is not exposed on PlasmaBuffer itself because it is currently not safe.
    """
    def unwrap(buff):
        # Reach through a PlasmaBuffer to the raw buffer it wraps.
        if isinstance(buff, PlasmaBuffer):
            return buff.buffer
        return buff

    left = unwrap(buff1)
    right = unwrap(buff2)
    return left[:] == right[:]
class PlasmaClient(object):
    """Client interface to a plasma store and, optionally, a plasma manager.

    The PlasmaClient can ask the PlasmaStore to allocate a new buffer, seal a
    buffer, and get a buffer. Buffers are referred to by object IDs, which
    are strings.
    """

    def __init__(self, store_socket_name, manager_socket_name=None, release_delay=64):
        """Connect to the plasma store and, if given, the plasma manager.

        Args:
            store_socket_name (str): Name of the socket the plasma store is
                listening at.
            manager_socket_name (str): Name of the socket the plasma manager
                is listening at. When this is None, an empty string is passed
                to libplasma.connect in its place.
            release_delay: Passed through unchanged to libplasma.connect.
        """
        self.store_socket_name = store_socket_name
        self.manager_socket_name = manager_socket_name
        self.alive = True
        manager_name = "" if manager_socket_name is None else manager_socket_name
        self.conn = libplasma.connect(store_socket_name, manager_name, release_delay)

    def shutdown(self):
        """Disconnect the client so that it stops sending messages.

        If the Plasma store and Plasma manager that this client is connected
        to are killed, this method can be used to prevent the client from
        trying to send messages to the killed processes. Calling it again
        after the first call does nothing.
        """
        if not self.alive:
            return
        libplasma.disconnect(self.conn)
        self.alive = False

    def create(self, object_id, size, metadata=None):
        """Create a new buffer in the PlasmaStore for a particular object ID.

        The returned buffer is mutable until seal is called.

        Args:
            object_id (str): A string used to identify an object.
            size (int): The size in bytes of the created buffer.
            metadata (buffer): An optional buffer encoding whatever metadata
                the user wishes to encode. An empty bytearray is used when
                this is None.

        Returns:
            A PlasmaBuffer wrapping the newly created buffer.

        Raises:
            plasma_object_exists_error: This exception is raised if the
                object could not be created because there already is an
                object with the same ID in the plasma store.
            plasma_out_of_memory_error: This exception is raised if the
                object could not be created because the plasma store is
                unable to evict enough objects to create room for it.
        """
        if metadata is None:
            metadata = bytearray(b"")
        raw_buffer = libplasma.create(self.conn, object_id, size, metadata)
        return PlasmaBuffer(raw_buffer, object_id, self)

    def _get_wrapped(self, object_ids, timeout_ms, slot):
        """Call libplasma.get and wrap entry[slot] of each result.

        Objects for which libplasma.get returned None stay None in the
        returned list.
        """
        results = libplasma.get(self.conn, object_ids, timeout_ms)
        assert len(object_ids) == len(results)
        wrapped = []
        for i, object_id in enumerate(object_ids):
            entry = results[i]
            if entry is None:
                wrapped.append(None)
            else:
                wrapped.append(PlasmaBuffer(entry[slot], object_id, self))
        return wrapped

    def get(self, object_ids, timeout_ms=-1):
        """Get the data buffers of some objects from the PlasmaStore.

        If an object has not been sealed yet, this call will block. The
        retrieved buffers are immutable.

        Args:
            object_ids (List[str]): A list of strings used to identify some
                objects.
            timeout_ms (int): The number of milliseconds that the get call
                should block before timing out and returning. Pass -1 if the
                call should block and 0 if the call should return
                immediately.

        Returns:
            A list with one entry per object ID: a PlasmaBuffer, or None for
            an object that libplasma.get returned no result for.
        """
        return self._get_wrapped(object_ids, timeout_ms, 0)

    def get_metadata(self, object_ids, timeout_ms=-1):
        """Get the metadata buffers of some objects from the PlasmaStore.

        If an object has not been sealed yet, this call will block until the
        object has been sealed. The retrieved buffers are immutable.

        Args:
            object_ids (List[str]): A list of strings used to identify some
                objects.
            timeout_ms (int): The number of milliseconds that the get call
                should block before timing out and returning. Pass -1 if the
                call should block and 0 if the call should return
                immediately.

        Returns:
            A list with one entry per object ID: a PlasmaBuffer, or None for
            an object that libplasma.get returned no result for.
        """
        return self._get_wrapped(object_ids, timeout_ms, 1)

    def contains(self, object_id):
        """Check if the object is present and has been sealed in the PlasmaStore.

        Args:
            object_id (str): A string used to identify an object.
        """
        return libplasma.contains(self.conn, object_id)

    def hash(self, object_id):
        """Compute the hash of an object in the object store.

        Args:
            object_id (str): A string used to identify an object.

        Returns:
            A digest string of the object's SHA256 hash. If the object isn't
            in the object store, the string will have length zero.
        """
        return libplasma.hash(self.conn, object_id)

    def seal(self, object_id):
        """Seal the buffer in the PlasmaStore for a particular object ID.

        Once a buffer has been sealed, the buffer is immutable and can only
        be accessed through get.

        Args:
            object_id (str): A string used to identify an object.
        """
        libplasma.seal(self.conn, object_id)

    def delete(self, object_id):
        """Delete the buffer in the PlasmaStore for a particular object ID.

        Once a buffer has been deleted, the buffer is no longer accessible.

        Args:
            object_id (str): A string used to identify an object.
        """
        libplasma.delete(self.conn, object_id)

    def evict(self, num_bytes):
        """Evict objects from the store to recover some bytes.

        Recover at least num_bytes bytes if possible.

        Args:
            num_bytes (int): The number of bytes to attempt to recover.
        """
        return libplasma.evict(self.conn, num_bytes)

    def transfer(self, addr, port, object_id):
        """Transfer the local object with ID object_id to another plasma instance.

        Args:
            addr (str): IPv4 address of the plasma instance the object is
                sent to.
            port (int): Port number of the plasma instance the object is sent
                to.
            object_id (str): A string used to identify an object.
        """
        return libplasma.transfer(self.conn, object_id, addr, port)

    def fetch(self, object_ids):
        """Fetch the objects with the given IDs from other plasma manager instances.

        Args:
            object_ids (List[str]): A list of strings used to identify the
                objects.
        """
        return libplasma.fetch(self.conn, object_ids)

    def wait(self, object_ids, timeout=PLASMA_WAIT_TIMEOUT, num_returns=1):
        """Wait until num_returns objects in object_ids are ready.

        Currently, the object ID arguments to wait must be unique.

        Args:
            object_ids (List[str]): List of object IDs to wait for.
            timeout (int): Return to the caller after timeout milliseconds.
            num_returns (int): We are waiting for this number of objects to
                be ready.

        Returns:
            ready_ids, waiting_ids (List[str], List[str]): List of object IDs
                that are ready and list of object IDs we might still wait on
                respectively.

        Raises:
            Exception: If object_ids contains duplicates.
        """
        # The plasma manager currently crashes if given duplicate object IDs,
        # so reject them up front.
        unique_count = len(set(object_ids))
        if unique_count != len(object_ids):
            raise Exception("Wait requires a list of unique object IDs.")
        ready_ids, waiting_ids = libplasma.wait(self.conn, object_ids, timeout, num_returns)
        return ready_ids, list(waiting_ids)

    def subscribe(self):
        """Subscribe to notifications about sealed objects."""
        self.notification_fd = libplasma.subscribe(self.conn)

    def get_next_notification(self):
        """Get the next notification from the notification socket."""
        return libplasma.receive_notification(self.notification_fd)
DEFAULT_PLASMA_STORE_MEMORY = 10 ** 9
def random_name():
    """Return a random integer from [0, 99999999] as a decimal string."""
    suffix = random.randint(0, 99999999)
    return str(suffix)
def start_plasma_store(plasma_store_memory=DEFAULT_PLASMA_STORE_MEMORY,
                       use_valgrind=False, use_profiler=False,
                       stdout_file=None, stderr_file=None):
    """Start a plasma store process.

    Args:
        plasma_store_memory: Value passed to the plasma store executable via
            its "-m" option.
        use_valgrind (bool): True if the plasma store should be started
            inside of valgrind. If this is True, use_profiler must be False.
        use_profiler (bool): True if the plasma store should be started
            inside a profiler. If this is True, use_valgrind must be False.
        stdout_file: A file handle opened for writing to redirect stdout to.
            If no redirection should happen, then this should be None.
        stderr_file: A file handle opened for writing to redirect stderr to.
            If no redirection should happen, then this should be None.

    Returns:
        A tuple of the name of the plasma store socket and the
        subprocess.Popen object of the plasma store process.

    Raises:
        Exception: If use_valgrind and use_profiler are both set.
    """
    if use_valgrind and use_profiler:
        raise Exception("Cannot use valgrind and profiler at the same time.")
    plasma_store_executable = os.path.join(os.path.abspath(os.path.dirname(__file__)), "../core/src/plasma/plasma_store")
    plasma_store_name = "/tmp/plasma_store{}".format(random_name())
    command = [plasma_store_executable, "-s", plasma_store_name, "-m", str(plasma_store_memory)]
    # Choose the wrapper command and the pause that follows the launch; the
    # valgrind-based modes get a longer pause than a plain launch.
    if use_valgrind:
        launcher = ["valgrind",
                    "--track-origins=yes",
                    "--leak-check=full",
                    "--show-leak-kinds=all",
                    "--error-exitcode=1"]
        pause = 1.0
    elif use_profiler:
        launcher = ["valgrind", "--tool=callgrind"]
        pause = 1.0
    else:
        launcher = []
        pause = 0.1
    pid = subprocess.Popen(launcher + command, stdout=stdout_file, stderr=stderr_file)
    time.sleep(pause)
    return plasma_store_name, pid
def new_port():
    """Return a random port number from the range [10000, 65535]."""
    port = random.randint(10000, 65535)
    return port
def start_plasma_manager(store_name, redis_address, node_ip_address="127.0.0.1",
                         plasma_manager_port=None, num_retries=20,
                         use_valgrind=False, run_profiler=False,
                         stdout_file=None, stderr_file=None):
    """Start a plasma manager and return the port it listens on.

    Args:
        store_name (str): The name of the plasma store socket.
        redis_address (str): The address of the Redis server.
        node_ip_address (str): The IP address of the node.
        plasma_manager_port (int): The port to use for the plasma manager. If
            this is not provided, a port will be generated at random. If it
            is provided, num_retries must be passed explicitly as 1.
        num_retries (int): The maximum number of launch attempts. After a
            failed attempt a new random port is picked for the next one.
        use_valgrind (bool): True if the Plasma manager should be started
            inside of valgrind and False otherwise. Takes precedence over
            run_profiler.
        run_profiler (bool): True if the Plasma manager should be started
            under valgrind's callgrind tool.
        stdout_file: A file handle opened for writing to redirect stdout to.
            If no redirection should happen, then this should be None.
        stderr_file: A file handle opened for writing to redirect stderr to.
            If no redirection should happen, then this should be None.

    Returns:
        A tuple of the Plasma manager socket name, the subprocess.Popen
        object of the Plasma manager process, and the port that the manager
        is listening on.

    Raises:
        Exception: An exception is raised if a port is specified while
            num_retries is not 1, or if the manager could not be started.
    """
    # Reject inconsistent arguments before doing any other work.
    if plasma_manager_port is not None and num_retries != 1:
        raise Exception("num_retries must be 1 if port is specified.")
    plasma_manager_executable = os.path.join(os.path.abspath(os.path.dirname(__file__)), "../core/src/plasma/plasma_manager")
    plasma_manager_name = "/tmp/plasma_manager{}".format(random_name())
    if plasma_manager_port is None:
        plasma_manager_port = new_port()
    # The wrapper command does not change between attempts.
    if use_valgrind:
        wrapper = ["valgrind",
                   "--track-origins=yes",
                   "--leak-check=full",
                   "--show-leak-kinds=all",
                   "--error-exitcode=1"]
    elif run_profiler:
        wrapper = ["valgrind", "--tool=callgrind"]
    else:
        wrapper = []
    counter = 0
    while counter < num_retries:
        if counter > 0:
            print("Plasma manager failed to start, retrying now.")
        command = [plasma_manager_executable,
                   "-s", store_name,
                   "-m", plasma_manager_name,
                   "-h", node_ip_address,
                   "-p", str(plasma_manager_port),
                   "-r", redis_address]
        process = subprocess.Popen(wrapper + command, stdout=stdout_file,
                                   stderr=stderr_file)
        # This sleep is critical. If the plasma_manager fails to start because
        # the port is already in use, then we need it to fail within 0.1
        # seconds.
        time.sleep(0.1)
        # A process that is still running counts as a successful start.
        if process.poll() is None:
            return plasma_manager_name, process, plasma_manager_port
        # Generate a new port and try again.
        plasma_manager_port = new_port()
        counter += 1
    raise Exception("Couldn't start plasma manager.")
+865
View File
@@ -0,0 +1,865 @@
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import numpy as np
import os
import random
import signal
import socket
import struct
import subprocess
import sys
import tempfile
import threading
import time
import unittest
import ray.plasma as plasma
from ray.plasma.utils import random_object_id, generate_metadata, write_to_data_buffer, create_object_with_id, create_object
from ray import services
USE_VALGRIND = False
PLASMA_STORE_MEMORY = 1000000000
def assert_get_object_equal(unit_test, client1, client2, object_id, memory_buffer=None, metadata=None):
    """Assert that two clients see the same data and metadata for an object.

    Args:
        unit_test: The test case whose assert methods are used.
        client1: First client; get and get_metadata are called on it.
        client2: Second client; get and get_metadata are called on it.
        object_id: ID of the object to fetch through both clients.
        memory_buffer: Optional reference data buffer, compared against the
            data obtained through client1.
        metadata: Optional reference metadata buffer, compared against the
            metadata obtained through client1.
    """
    ids = [object_id]
    data1 = client1.get(ids)[0]
    data2 = client2.get(ids)[0]
    meta1 = client1.get_metadata(ids)[0]
    meta2 = client2.get_metadata(ids)[0]
    # Lengths first, then contents, for both the data and the metadata.
    unit_test.assertEqual(len(data1), len(data2))
    unit_test.assertEqual(len(meta1), len(meta2))
    unit_test.assertTrue(plasma.buffers_equal(data1, data2))
    unit_test.assertTrue(plasma.buffers_equal(meta1, meta2))
    # Compare against the reference buffers when they were supplied.
    if memory_buffer is not None:
        unit_test.assertTrue(plasma.buffers_equal(memory_buffer, data1))
    if metadata is not None:
        unit_test.assertTrue(plasma.buffers_equal(metadata, meta1))
class TestPlasmaClient(unittest.TestCase):
def setUp(self):
    # Launch a plasma store, under valgrind when USE_VALGRIND is set.
    store_name, self.p = plasma.start_plasma_store(use_valgrind=USE_VALGRIND)
    # Main client: no manager socket, release delay of 64.
    self.plasma_client = plasma.PlasmaClient(store_name, manager_socket_name=None, release_delay=64)
    # Second client with a release delay of 0, for the eviction test.
    self.plasma_client2 = plasma.PlasmaClient(store_name, manager_socket_name=None, release_delay=0)
def tearDown(self):
    # The Plasma store must not have exited while the test was running.
    self.assertEqual(self.p.poll(), None)
    if not USE_VALGRIND:
        # Kill the plasma store process.
        self.p.kill()
        return
    # Under valgrind, ask the store to terminate and wait for it, then abort
    # the whole test process if it reported a non-zero exit code.
    self.p.send_signal(signal.SIGTERM)
    self.p.wait()
    if self.p.returncode != 0:
        os._exit(-1)
def test_create(self):
    # Round-trip a small object: write a byte pattern into a freshly created
    # buffer, seal it, then read the same pattern back through get.
    object_id = random_object_id()
    length = 50
    expected = [chr(i % 256) for i in range(length)]
    # Create a new buffer and write to it.
    memory_buffer = self.plasma_client.create(object_id, length)
    for i, char in enumerate(expected):
        memory_buffer[i] = char
    # Seal the object.
    self.plasma_client.seal(object_id)
    # Get the object.
    memory_buffer = self.plasma_client.get([object_id])[0]
    for i, char in enumerate(expected):
        self.assertEqual(memory_buffer[i], char)
def test_create_with_metadata(self):
    # For every length below 1000, round-trip an object of that size together
    # with metadata generated for the same length.
    for length in range(1000):
        object_id = random_object_id()
        # Create a random metadata string.
        metadata = generate_metadata(length)
        expected = [chr(i % 256) for i in range(length)]
        # Create a new buffer and write to it.
        memory_buffer = self.plasma_client.create(object_id, length, metadata)
        for i, char in enumerate(expected):
            memory_buffer[i] = char
        # Seal the object.
        self.plasma_client.seal(object_id)
        # Get the object.
        memory_buffer = self.plasma_client.get([object_id])[0]
        for i, char in enumerate(expected):
            self.assertEqual(memory_buffer[i], char)
        # Get the metadata and compare it element by element.
        metadata_buffer = self.plasma_client.get_metadata([object_id])[0]
        self.assertEqual(len(metadata), len(metadata_buffer))
        for i in range(len(metadata)):
            self.assertEqual(chr(metadata[i]), metadata_buffer[i])
def test_create_existing(self):
    # This test is partially used to test the code path in which we create an
    # object with an ID that already exists.
    length = 100
    for _ in range(1000):
        object_id = random_object_id()
        self.plasma_client.create(object_id, length, generate_metadata(length))
        # Creating the same ID a second time must be rejected by the store;
        # assertRaises reports the expected exception type when it is not.
        with self.assertRaises(plasma.plasma_object_exists_error):
            self.plasma_client.create(object_id, length, generate_metadata(length))
def test_get(self):
    num_object_ids = 100
    timeouts = [0, 10, 100, 1000]
    # Test timing out of get with various timeouts. None of these objects
    # exist, so every entry must come back as None.
    for timeout in timeouts:
        object_ids = [random_object_id() for _ in range(num_object_ids)]
        results = self.plasma_client.get(object_ids, timeout_ms=timeout)
        self.assertEqual(results, num_object_ids * [None])

    # Create only the even-indexed objects of the last batch of IDs.
    data_buffers = []
    metadata_buffers = []
    for i in range(num_object_ids):
        if i % 2 == 0:
            data_buffer, metadata_buffer = create_object_with_id(self.plasma_client, object_ids[i], 2000, 2000)
            data_buffers.append(data_buffer)
            metadata_buffers.append(metadata_buffer)

    # Test timing out from some but not all get calls with various timeouts.
    for timeout in timeouts:
        data_results = self.plasma_client.get(object_ids, timeout_ms=timeout)
        metadata_results = self.plasma_client.get_metadata(object_ids, timeout_ms=timeout)
        for i in range(num_object_ids):
            if i % 2 == 0:
                self.assertTrue(plasma.buffers_equal(data_buffers[i // 2], data_results[i]))
                # TODO(rkn): We should compare the metadata as well. But
                # currently the types are different (e.g., memoryview versus
                # bytearray).
                # self.assertTrue(plasma.buffers_equal(metadata_buffers[i // 2], metadata_results[i]))
            else:
                # Check the results of THIS round; the odd-indexed objects
                # were never created, so both lookups must have timed out.
                self.assertIsNone(data_results[i])
                self.assertIsNone(metadata_results[i])
def test_store_full(self):
    """Check that `create` raises once the store has no room left.

    The store is started with 1GB, so make sure that create throws an
    exception when it is full.
    """
    def assert_create_raises_plasma_full(unit_test, size):
        # Split `size` at a random point between data and metadata; the
        # create must be rejected with the out-of-memory error.
        partial_size = np.random.randint(size)
        with unit_test.assertRaises(plasma.plasma_out_of_memory_error):
            create_object(unit_test.plasma_client, partial_size,
                          size - partial_size)

    # Create a list to keep some of the buffers in scope.
    memory_buffers = []
    _, memory_buffer, _ = create_object(self.plasma_client, 9 * 10 ** 8, 0)
    memory_buffers.append(memory_buffer)
    # Remaining space is 10 ** 8. Make sure that we can't create an object of
    # size 10 ** 8 + 1, but we can create one of size 10 ** 8.
    assert_create_raises_plasma_full(self, 10 ** 8 + 1)
    # Do it twice, dropping the buffer each time so that nothing keeps the
    # object referenced from this client.
    for _attempt in range(2):
        _, memory_buffer, _ = create_object(self.plasma_client, 10 ** 8, 0)
        del memory_buffer
    assert_create_raises_plasma_full(self, 10 ** 8 + 1)

    # Fill nine tenths of what is left at every step, keeping each buffer
    # alive, and check that an object one byte larger than the remaining
    # space is rejected.
    for exponent in range(7, -1, -1):
        _, memory_buffer, _ = create_object(self.plasma_client,
                                            9 * 10 ** exponent, 0)
        memory_buffers.append(memory_buffer)
        # Remaining space is 10 ** exponent.
        assert_create_raises_plasma_full(self, 10 ** exponent + 1)

    # Exactly one byte is left, so this last create must succeed.
    _, memory_buffer, _ = create_object(self.plasma_client, 1, 0)
def test_contains(self):
    """Check `contains` for sealed objects and for IDs that were never created."""
    fake_object_ids = [random_object_id() for _ in range(100)]
    real_object_ids = [random_object_id() for _ in range(100)]
    client = self.plasma_client

    # Each real ID is unknown until its object has been created and sealed.
    for real_id in real_object_ids:
        self.assertFalse(client.contains(real_id))
        held_buffer = client.create(real_id, 100)
        client.seal(real_id)
        self.assertTrue(client.contains(real_id))

    # IDs that were never created must not be reported as present ...
    for fake_id in fake_object_ids:
        self.assertFalse(client.contains(fake_id))
    # ... while all created objects are still present afterwards.
    for real_id in real_object_ids:
        self.assertTrue(client.contains(real_id))
def test_hash(self):
    """Check that an object's hash depends on both its data and its metadata."""
    client = self.plasma_client
    length = 1000

    def create_filled(object_id, metadata, shift):
        # Create an object whose i-th element is chr((i + shift) % 256), seal
        # it and hand back the buffer.
        # NOTE(review): chr() yields a str; a Python 3 byte buffer would
        # expect an int here -- confirm which buffer type create() returns.
        buf = client.create(object_id, length, metadata)
        for index in range(length):
            buf[index] = chr((index + shift) % 256)
        client.seal(object_id)
        return buf

    # Hashing an object that doesn't exist must not blow up.
    object_id1 = random_object_id()
    unused_hash = client.hash(object_id1)

    # First object: hashing it repeatedly always gives the same value.
    metadata = generate_metadata(length)
    memory_buffer = create_filled(object_id1, metadata, 0)
    self.assertEqual(client.hash(object_id1), client.hash(object_id1))

    # Second object: same data and same metadata as the first, so the hashes
    # must agree.
    object_id2 = random_object_id()
    memory_buffer = create_filled(object_id2, metadata, 0)
    self.assertEqual(client.hash(object_id1), client.hash(object_id2))

    # Third object: different data (and freshly generated metadata), so its
    # hash must differ from the first one.
    object_id3 = random_object_id()
    metadata = generate_metadata(length)
    memory_buffer = create_filled(object_id3, metadata, 1)
    self.assertNotEqual(client.hash(object_id1), client.hash(object_id3))

    # Fourth object: same data as the third but its own metadata. Its hash
    # must differ from both the first and the third object's hash.
    object_id4 = random_object_id()
    metadata4 = generate_metadata(length)
    memory_buffer = create_filled(object_id4, metadata4, 1)
    self.assertNotEqual(client.hash(object_id1), client.hash(object_id4))
    self.assertNotEqual(client.hash(object_id3), client.hash(object_id4))
def test_many_hashes(self):
    """Check that many slightly different objects all hash to distinct values."""
    client = self.plasma_client
    hashes = []
    length = 2 ** 10

    def seal_and_record(object_id):
        # Seal the object and remember its hash for the uniqueness check.
        client.seal(object_id)
        hashes.append(client.hash(object_id))

    # 256 objects of equal length, each filled with one constant value.
    for value in range(256):
        object_id = random_object_id()
        memory_buffer = client.create(object_id, length)
        for position in range(length):
            memory_buffer[position] = chr(value)
        seal_and_record(object_id)

    # `length` objects of equal length that are all zeros except for a single
    # one, placed at a different position in each object.
    for marked in range(length):
        object_id = random_object_id()
        memory_buffer = client.create(object_id, length)
        for position in range(length):
            memory_buffer[position] = chr(0)
        memory_buffer[marked] = chr(1)
        seal_and_record(object_id)

    # All-zero objects of every size from 0 up to length - 1.
    for size in range(length):
        object_id = random_object_id()
        memory_buffer = client.create(object_id, size)
        for position in range(size):
            memory_buffer[position] = chr(0)
        seal_and_record(object_id)

    # Check that all hashes were unique.
    self.assertEqual(len(set(hashes)), 256 + 2 * length)
# def test_individual_delete(self):
# length = 100
# # Create an object id string.
# object_id = random_object_id()
# # Create a random metadata string.
# metadata = generate_metadata(100)
# # Create a new buffer and write to it.
# memory_buffer = self.plasma_client.create(object_id, length, metadata)
# for i in range(length):
# memory_buffer[i] = chr(i % 256)
# # Seal the object.
# self.plasma_client.seal(object_id)
# # Check that the object is present.
# self.assertTrue(self.plasma_client.contains(object_id))
# # Delete the object.
# self.plasma_client.delete(object_id)
# # Make sure the object is no longer present.
# self.assertFalse(self.plasma_client.contains(object_id))
#
# def test_delete(self):
# # Create some objects.
# object_ids = [random_object_id() for _ in range(100)]
# for object_id in object_ids:
# length = 100
# # Create a random metadata string.
# metadata = generate_metadata(100)
# # Create a new buffer and write to it.
# memory_buffer = self.plasma_client.create(object_id, length, metadata)
# for i in range(length):
# memory_buffer[i] = chr(i % 256)
# # Seal the object.
# self.plasma_client.seal(object_id)
# # Check that the object is present.
# self.assertTrue(self.plasma_client.contains(object_id))
#
# # Delete the objects and make sure they are no longer present.
# for object_id in object_ids:
# # Delete the object.
# self.plasma_client.delete(object_id)
# # Make sure the object is no longer present.
# self.assertFalse(self.plasma_client.contains(object_id))
def test_illegal_functionality(self):
    """Check that out-of-bounds reads and writes to a fetched object fail."""
    # Create an object id string.
    object_id = random_object_id()
    # Create a new buffer.
    length = 1000
    memory_buffer = self.plasma_client.create(object_id, length)
    # Indexing one past the end of the buffer must raise.
    with self.assertRaises(Exception):
        memory_buffer[length]
    # Seal the object.
    self.plasma_client.seal(object_id)
    # This test is commented out because it currently fails.
    # # Make sure the object is read only now.
    # def illegal_assignment():
    #   memory_buffer[0] = chr(0)
    # self.assertRaises(Exception, illegal_assignment)
    # Fetch the sealed object back from the store.
    memory_buffer = self.plasma_client.get([object_id])[0]
    # Writing into the fetched buffer must raise.
    # NOTE(review): on Python 3 assigning a str from chr() may raise a
    # TypeError regardless of writability -- confirm this still checks
    # read-only-ness there.
    with self.assertRaises(Exception):
        memory_buffer[0] = chr(0)
def test_evict(self):
    """Check the byte counts reported by `evict` for released, sealed objects."""
    store = self.plasma_client2

    def create_sealed_and_release(size):
        # Create and seal an object of `size` bytes; the buffer goes out of
        # scope on return, so this client no longer holds it.
        new_id = random_object_id()
        buf = store.create(new_id, size)
        store.seal(new_id)
        del buf

    # A single released object is reported with its full size.
    create_sealed_and_release(1000)
    self.assertEqual(store.evict(1), 1000)

    # The second object is created but not sealed yet and its buffer is still
    # held; only the sealed and released third object is reported.
    id2 = random_object_id()
    id3 = random_object_id()
    buf2 = store.create(id2, 999)
    buf3 = store.create(id3, 998)
    store.seal(id3)
    del buf3
    self.assertEqual(store.evict(1000), 998)

    # Release the fourth object first and the second one afterwards; the test
    # expects them to be evicted one at a time in that order.
    create_sealed_and_release(997)
    store.seal(id2)
    del buf2
    self.assertEqual(store.evict(1), 997)
    self.assertEqual(store.evict(1), 999)

    # Three more objects: a request for 2000 bytes is expected to evict all
    # of them.
    sizes = [996, 995, 994]
    ids = [random_object_id() for _ in sizes]
    held = [store.create(ids[k], sizes[k]) for k in range(len(sizes))]
    for sealed_id in ids:
        store.seal(sealed_id)
    # Drop the buffers front to back, i.e. in creation order.
    while held:
        del held[0]
    self.assertEqual(store.evict(2000), sum(sizes))
def test_subscribe(self):
    """Check that one notification arrives, in order, for every sealed object."""
    client = self.plasma_client
    # Subscribe to notifications from the Plasma Store. The return value is
    # not used below; it is kept bound for the duration of the test.
    sock = client.subscribe()
    for batch_size in [1, 10, 100, 1000, 10000, 100000]:
        object_ids = [random_object_id() for _ in range(batch_size)]
        metadata_sizes = [np.random.randint(1000) for _ in range(batch_size)]
        data_sizes = [np.random.randint(1000) for _ in range(batch_size)]

        # Create and seal the whole batch first ...
        for object_id, data_size, metadata_size in zip(object_ids, data_sizes,
                                                       metadata_sizes):
            client.create(
                object_id, size=data_size,
                metadata=bytearray(np.random.bytes(metadata_size)))
            client.seal(object_id)

        # ... then check that the notifications come back in creation order
        # and carry the matching sizes.
        for object_id, data_size, metadata_size in zip(object_ids, data_sizes,
                                                       metadata_sizes):
            recv_objid, recv_dsize, recv_msize = client.get_next_notification()
            self.assertEqual(object_id, recv_objid)
            self.assertEqual(data_size, recv_dsize)
            self.assertEqual(metadata_size, recv_msize)
def test_subscribe_deletions(self):
    """Check that evictions produce deletion notifications (sizes of -1)."""
    # We use plasma_client2 to make sure that all used objects will get
    # evicted properly.
    client = self.plasma_client2
    # The return value is not used below; it is kept bound for the test.
    sock = client.subscribe()

    def create_and_release(object_id, data_size, metadata_size):
        # Create and seal one object, then drop the buffer immediately.
        buf = client.create(
            object_id, size=data_size,
            metadata=bytearray(np.random.bytes(metadata_size)))
        client.seal(object_id)
        del buf

    def expect_notification(object_id, data_size, metadata_size):
        # Pull the next notification and compare all three fields.
        recv_objid, recv_dsize, recv_msize = client.get_next_notification()
        self.assertEqual(object_id, recv_objid)
        self.assertEqual(data_size, recv_dsize)
        self.assertEqual(metadata_size, recv_msize)

    for batch_size in [1, 10, 100, 1000, 10000, 100000]:
        object_ids = [random_object_id() for _ in range(batch_size)]
        # Add 1 to the sizes to make sure we have nonzero object sizes.
        metadata_sizes = [np.random.randint(1000) + 1
                          for _ in range(batch_size)]
        data_sizes = [np.random.randint(1000) + 1 for _ in range(batch_size)]
        for k in range(batch_size):
            create_and_release(object_ids[k], data_sizes[k],
                               metadata_sizes[k])
        # Check that we received notifications for creating all objects.
        for k in range(batch_size):
            expect_notification(object_ids[k], data_sizes[k],
                                metadata_sizes[k])
        # Evict the objects one at a time and check that each eviction is
        # followed by the matching deletion notification.
        for k in range(batch_size):
            self.assertEqual(client.evict(1),
                             data_sizes[k] + metadata_sizes[k])
            expect_notification(object_ids[k], -1, -1)

    # Test multiple deletion notifications. The first 9 object IDs have size
    # 0, and the last has a nonzero size. The test expects that evicting 1
    # byte evicts all of them, so there should be a deletion notification for
    # each.
    num_object_ids = 10
    object_ids = [random_object_id() for _ in range(num_object_ids)]
    metadata_sizes = [0] * (num_object_ids - 1) + [np.random.randint(1000)]
    data_sizes = [0] * (num_object_ids - 1) + [np.random.randint(1000)]
    for k in range(num_object_ids):
        create_and_release(object_ids[k], data_sizes[k], metadata_sizes[k])
    for k in range(num_object_ids):
        expect_notification(object_ids[k], data_sizes[k], metadata_sizes[k])
    self.assertEqual(client.evict(1), data_sizes[-1] + metadata_sizes[-1])
    for k in range(num_object_ids):
        expect_notification(object_ids[k], -1, -1)
class TestPlasmaManager(unittest.TestCase):
def setUp(self):
# Start two PlasmaStores.
store_name1, self.p2 = plasma.start_plasma_store(use_valgrind=USE_VALGRIND)
store_name2, self.p3 = plasma.start_plasma_store(use_valgrind=USE_VALGRIND)
# Start a Redis server.
redis_address = services.address("127.0.0.1", services.start_redis()[0])
# Start two PlasmaManagers.
manager_name1, self.p4, self.port1 = plasma.start_plasma_manager(store_name1, redis_address, use_valgrind=USE_VALGRIND)
manager_name2, self.p5, self.port2 = plasma.start_plasma_manager(store_name2, redis_address, use_valgrind=USE_VALGRIND)
# Connect two PlasmaClients.
self.client1 = plasma.PlasmaClient(store_name1, manager_name1)
self.client2 = plasma.PlasmaClient(store_name2, manager_name2)
# Store the processes that will be explicitly killed during tearDown so
# that a test case can remove ones that will be killed during the test.
# NOTE: If this specific order is changed, valgrind will fail.
self.processes_to_kill = [self.p4, self.p5, self.p2, self.p3]
def tearDown(self):
# Check that the processes are still alive.
for process in self.processes_to_kill:
self.assertEqual(process.poll(), None)
# Kill the Plasma store and Plasma manager processes.
if USE_VALGRIND:
time.sleep(1) # give processes opportunity to finish work
for process in self.processes_to_kill:
process.send_signal(signal.SIGTERM)
process.wait()
if process.returncode != 0:
print("aborting due to valgrind error")
os._exit(-1)
else:
for process in self.processes_to_kill:
process.kill()
# Clean up the Redis server.
services.cleanup()
def test_fetch(self):
for _ in range(10):
# Create an object.
object_id1, memory_buffer1, metadata1 = create_object(self.client1, 2000, 2000)
self.client1.fetch([object_id1])
self.assertEqual(self.client1.contains(object_id1), True)
self.assertEqual(self.client2.contains(object_id1), False)
# Fetch the object from the other plasma manager.
# TODO(rkn): Right now we must wait for the object table to be updated.
while not self.client2.contains(object_id1):
self.client2.fetch([object_id1])
# Compare the two buffers.
assert_get_object_equal(self, self.client1, self.client2, object_id1,
memory_buffer=memory_buffer1, metadata=metadata1)
# Test that we can call fetch on object IDs that don't exist yet.
object_id2 = random_object_id()
self.client1.fetch([object_id2])
self.assertEqual(self.client1.contains(object_id2), False)
memory_buffer2, metadata2 = create_object_with_id(self.client2, object_id2, 2000, 2000)
# # Check that the object has been fetched.
# self.assertEqual(self.client1.contains(object_id2), True)
# Compare the two buffers.
# assert_get_object_equal(self, self.client1, self.client2, object_id2,
# memory_buffer=memory_buffer2, metadata=metadata2)
# Test calling the same fetch request a bunch of times.
object_id3 = random_object_id()
self.assertEqual(self.client1.contains(object_id3), False)
self.assertEqual(self.client2.contains(object_id3), False)
for _ in range(10):
self.client1.fetch([object_id3])
self.client2.fetch([object_id3])
memory_buffer3, metadata3 = create_object_with_id(self.client1, object_id3, 2000, 2000)
for _ in range(10):
self.client1.fetch([object_id3])
self.client2.fetch([object_id3])
#TODO(rkn): Right now we must wait for the object table to be updated.
while not self.client2.contains(object_id3):
self.client2.fetch([object_id3])
assert_get_object_equal(self, self.client1, self.client2, object_id3,
memory_buffer=memory_buffer3, metadata=metadata3)
def test_fetch_multiple(self):
for _ in range(20):
# Create two objects and a third fake one that doesn't exist.
object_id1, memory_buffer1, metadata1 = create_object(self.client1, 2000, 2000)
missing_object_id = random_object_id()
object_id2, memory_buffer2, metadata2 = create_object(self.client1, 2000, 2000)
object_ids = [object_id1, missing_object_id, object_id2]
# Fetch the objects from the other plasma store. The second object ID
# should timeout since it does not exist.
# TODO(rkn): Right now we must wait for the object table to be updated.
while (not self.client2.contains(object_id1)) or (not self.client2.contains(object_id2)):
self.client2.fetch(object_ids)
# Compare the buffers of the objects that do exist.
assert_get_object_equal(self, self.client1, self.client2, object_id1,
memory_buffer=memory_buffer1, metadata=metadata1)
assert_get_object_equal(self, self.client1, self.client2, object_id2,
memory_buffer=memory_buffer2, metadata=metadata2)
# Fetch in the other direction. The fake object still does not exist.
self.client1.fetch(object_ids)
assert_get_object_equal(self, self.client2, self.client1, object_id1,
memory_buffer=memory_buffer1, metadata=metadata1)
assert_get_object_equal(self, self.client2, self.client1, object_id2,
memory_buffer=memory_buffer2, metadata=metadata2)
# Check that we can call fetch with duplicated object IDs.
object_id3 = random_object_id()
self.client1.fetch([object_id3, object_id3])
object_id4, memory_buffer4, metadata4 = create_object(self.client1, 2000, 2000)
time.sleep(0.1)
# TODO(rkn): Right now we must wait for the object table to be updated.
while not self.client2.contains(object_id4):
self.client2.fetch([object_id3, object_id3, object_id4, object_id4])
assert_get_object_equal(self, self.client2, self.client1, object_id4,
memory_buffer=memory_buffer4, metadata=metadata4)
def test_wait(self):
# Test timeout.
obj_id0 = random_object_id()
self.client1.wait([obj_id0], timeout=100, num_returns=1)
# If we get here, the test worked.
# Test wait if local objects available.
obj_id1 = random_object_id()
self.client1.create(obj_id1, 1000)
self.client1.seal(obj_id1)
ready, waiting = self.client1.wait([obj_id1], timeout=100, num_returns=1)
self.assertEqual(set(ready), set([obj_id1]))
self.assertEqual(waiting, [])
# Test wait if only one object available and only one object waited for.
obj_id2 = random_object_id()
self.client1.create(obj_id2, 1000)
# Don't seal.
ready, waiting = self.client1.wait([obj_id2, obj_id1], timeout=100, num_returns=1)
self.assertEqual(set(ready), set([obj_id1]))
self.assertEqual(set(waiting), set([obj_id2]))
# Test wait if object is sealed later.
obj_id3 = random_object_id()
def finish():
self.client2.create(obj_id3, 1000)
self.client2.seal(obj_id3)
t = threading.Timer(0.1, finish)
t.start()
ready, waiting = self.client1.wait([obj_id3, obj_id2, obj_id1], timeout=1000, num_returns=2)
self.assertEqual(set(ready), set([obj_id1, obj_id3]))
self.assertEqual(set(waiting), set([obj_id2]))
# Test if the appropriate number of objects is shown if some objects are not ready
ready, waiting = self.client1.wait([obj_id3, obj_id2, obj_id1], 100, 3)
self.assertEqual(set(ready), set([obj_id1, obj_id3]))
self.assertEqual(set(waiting), set([obj_id2]))
# Don't forget to seal obj_id2.
self.client1.seal(obj_id2)
# Test calling wait a bunch of times.
object_ids = []
# TODO(rkn): Increasing n to 100 (or larger) will cause failures. The
# problem appears to be that the number of timers added to the manager event
# loop slow down the manager so much that some of the asynchronous Redis
# commands timeout triggering fatal failure callbacks.
n = 40
for i in range(n * (n + 1) // 2):
if i % 2 == 0:
object_id, _, _ = create_object(self.client1, 200, 200)
else:
object_id, _, _ = create_object(self.client2, 200, 200)
object_ids.append(object_id)
# Try waiting for all of the object IDs on the first client.
waiting = object_ids
retrieved = []
for i in range(1, n + 1):
ready, waiting = self.client1.wait(waiting, timeout=1000, num_returns=i)
self.assertEqual(len(ready), i)
retrieved += ready
self.assertEqual(set(retrieved), set(object_ids))
ready, waiting = self.client1.wait(object_ids, timeout=1000, num_returns=len(object_ids))
self.assertEqual(set(ready), set(object_ids))
self.assertEqual(waiting, [])
# Try waiting for all of the object IDs on the second client.
waiting = object_ids
retrieved = []
for i in range(1, n + 1):
ready, waiting = self.client2.wait(waiting, timeout=1000, num_returns=i)
self.assertEqual(len(ready), i)
retrieved += ready
self.assertEqual(set(retrieved), set(object_ids))
ready, waiting = self.client2.wait(object_ids, timeout=1000, num_returns=len(object_ids))
self.assertEqual(set(ready), set(object_ids))
self.assertEqual(waiting, [])
# Make sure that wait returns when the requested number of object IDs are
# available and does not wait for all object IDs to be available.
object_ids = [random_object_id() for _ in range(9)] + [20 * b'\x00']
object_ids_perm = object_ids[:]
random.shuffle(object_ids_perm)
for i in range(10):
if i % 2 == 0:
create_object_with_id(self.client1, object_ids_perm[i], 2000, 2000)
else:
create_object_with_id(self.client2, object_ids_perm[i], 2000, 2000)
ready, waiting = self.client1.wait(object_ids, num_returns=(i + 1))
self.assertEqual(set(ready), set(object_ids_perm[:(i + 1)]))
self.assertEqual(set(waiting), set(object_ids_perm[(i + 1):]))
def test_transfer(self):
for _ in range(100):
# Create an object.
object_id1, memory_buffer1, metadata1 = create_object(self.client1, 2000, 2000)
# Transfer the buffer to the the other PlasmaStore.
self.client1.transfer("127.0.0.1", self.port2, object_id1)
# Compare the two buffers.
assert_get_object_equal(self, self.client1, self.client2, object_id1,
memory_buffer=memory_buffer1, metadata=metadata1)
# # Transfer the buffer again.
# self.client1.transfer("127.0.0.1", self.port2, object_id1)
# # Compare the two buffers.
# assert_get_object_equal(self, self.client1, self.client2, object_id1,
# memory_buffer=memory_buffer1, metadata=metadata1)
# Create an object.
object_id2, memory_buffer2, metadata2 = create_object(self.client2, 20000, 20000)
# Transfer the buffer to the the other PlasmaStore.
self.client2.transfer("127.0.0.1", self.port1, object_id2)
# Compare the two buffers.
assert_get_object_equal(self, self.client1, self.client2, object_id2,
memory_buffer=memory_buffer2, metadata=metadata2)
def test_illegal_put(self):
"""
Test doing a put at the same object ID, but with different object data. The
first put should succeed. The second put should cause the plasma manager to
exit with a fatal error.
"""
if USE_VALGRIND:
# Don't run this test when we are using valgrind because when processes
# die without freeing up their state, valgrind complains.
return
# Create and seal the first object.
length = 1000
object_id = random_object_id()
memory_buffer1 = self.client1.create(object_id, length)
for i in range(length):
memory_buffer1[i] = chr(i % 256)
self.client1.seal(object_id)
# Create and seal the second object. It has all the same data as the first
# object, with one bit flipped.
memory_buffer2 = self.client2.create(object_id, length)
for i in range(length):
j = i
if j == 0:
j += 1
memory_buffer2[i] = chr(j % 256)
self.client2.seal(object_id)
# Make sure that one of the plasma managers exited (the second one to call
# RAY.OBJECT_TABLE_ADD should have exited). In the vast majority of cases,
# this should be p5. However, on Travis, it is frequently p4.
time_left = 100
while time_left > 0:
if self.p5.poll() != None:
self.processes_to_kill.remove(self.p5)
break
if self.p4.poll() != None:
self.processes_to_kill.remove(self.p4)
break
time_left -= 0.1
time.sleep(0.1)
print("Time waiting for plasma manager to fail = {:.2}".format(100 - time_left))
# Check that exactly one of the plasma managers has died.
self.assertEqual([self.p5.poll(), self.p4.poll()].count(None), 1)
def test_illegal_functionality(self):
# Create an object id string.
object_id = random_object_id()
# Create a new buffer.
# memory_buffer = self.client1.create(object_id, 20000)
# This test is commented out because it currently fails.
# # Transferring the buffer before sealing it should fail.
# self.assertRaises(Exception, lambda : self.manager1.transfer(1, object_id))
def test_stresstest(self):
a = time.time()
object_ids = []
for i in range(10000): # TODO(pcm): increase this to 100000
object_id = random_object_id()
object_ids.append(object_id)
self.client1.create(object_id, 1)
self.client1.seal(object_id)
for object_id in object_ids:
self.client1.transfer("127.0.0.1", self.port2, object_id)
b = time.time() - a
print("it took", b, "seconds to put and transfer the objects")
class TestPlasmaManagerRecovery(unittest.TestCase):
def setUp(self):
# Start a Plasma store.
self.store_name, self.p2 = plasma.start_plasma_store(use_valgrind=USE_VALGRIND)
# Start a Redis server.
self.redis_address = services.address("127.0.0.1", services.start_redis()[0])
# Start a PlasmaManagers.
manager_name, self.p3, self.port1 = plasma.start_plasma_manager(
self.store_name,
self.redis_address,
use_valgrind=USE_VALGRIND)
# Connect a PlasmaClient.
self.client = plasma.PlasmaClient(self.store_name, manager_name)
# Store the processes that will be explicitly killed during tearDown so
# that a test case can remove ones that will be killed during the test.
self.processes_to_kill = [self.p2, self.p3]
def tearDown(self):
# Check that the processes are still alive.
for process in self.processes_to_kill:
self.assertEqual(process.poll(), None)
# Kill the Plasma store and Plasma manager processes.
if USE_VALGRIND:
time.sleep(1) # give processes opportunity to finish work
for process in self.processes_to_kill:
process.send_signal(signal.SIGTERM)
process.wait()
if process.returncode != 0:
print("aborting due to valgrind error")
os._exit(-1)
else:
for process in self.processes_to_kill:
process.kill()
# Clean up the Redis server.
services.cleanup()
def test_delayed_start(self):
num_objects = 10
# Create some objects using one client.
object_ids = [random_object_id() for _ in range(num_objects)]
for i in range(10):
create_object_with_id(self.client, object_ids[i], 2000, 2000)
# Wait until the objects have been sealed in the store.
ready, waiting = self.client.wait(object_ids, num_returns=num_objects)
self.assertEqual(set(ready), set(object_ids))
self.assertEqual(waiting, [])
# Start a second plasma manager attached to the same store.
manager_name, self.p5, self.port2 = plasma.start_plasma_manager(self.store_name, self.redis_address, use_valgrind=USE_VALGRIND)
self.processes_to_kill.append(self.p5)
# Check that the second manager knows about existing objects.
client2 = plasma.PlasmaClient(self.store_name, manager_name)
ready, waiting = [], object_ids
while True:
ready, waiting = client2.wait(object_ids, num_returns=num_objects, timeout=0)
if len(ready) == len(object_ids):
break
self.assertEqual(set(ready), set(object_ids))
self.assertEqual(waiting, [])
if __name__ == "__main__":
    # A trailing "valgrind" argument switches the tests to valgrind mode. It
    # is popped so we don't mess with unittest's own argument parser.
    if len(sys.argv) > 1 and sys.argv[-1] == "valgrind":
        arg = sys.argv.pop()
        USE_VALGRIND = True
        print("Using valgrind for tests")
    unittest.main(verbosity=2)
+38
View File
@@ -0,0 +1,38 @@
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import numpy as np
import random
def random_object_id():
    """Return 20 random bytes from numpy's global RNG, used as an object ID."""
    object_id = np.random.bytes(20)
    return object_id
def generate_metadata(length):
    """Return a bytearray of `length` bytes with random values sprinkled in.

    The buffer starts out zeroed. For a positive length the first and the
    last byte are set to random values, followed by 100 writes of a random
    value to a random position. A zero length yields an empty bytearray.
    """
    metadata_buffer = bytearray(length)
    if length <= 0:
        return metadata_buffer
    metadata_buffer[0] = random.randint(0, 255)
    metadata_buffer[-1] = random.randint(0, 255)
    for _ in range(100):
        # Draw the value before the position, matching the evaluation order
        # of a subscript assignment (right-hand side first).
        value = random.randint(0, 255)
        position = random.randint(0, length - 1)
        metadata_buffer[position] = value
    return metadata_buffer
def write_to_data_buffer(buff, length):
    """Write random byte values into the writable buffer `buff`, in place.

    For a positive `length` the first and the last element are overwritten,
    followed by 100 writes of a random value to a random index in
    [0, length - 1]. Nothing is written when `length` is not positive.

    On Python 2 the values are written as one-character strings (as before);
    on Python 3 they are written as ints, because assigning the str returned
    by chr() into a bytearray or memoryview raises a TypeError there.
    """
    import sys  # Local import: only needed to pick the element type.

    if length <= 0:
        return
    to_element = int if sys.version_info >= (3, 0) else chr
    buff[0] = to_element(random.randint(0, 255))
    buff[-1] = to_element(random.randint(0, 255))
    for _ in range(100):
        # The right-hand side is evaluated first, so the value is drawn
        # before the index, exactly as in the original implementation.
        buff[random.randint(0, length - 1)] = to_element(
            random.randint(0, 255))
def create_object_with_id(client, object_id, data_size, metadata_size, seal=True):
    """Create an object under `object_id` with random metadata and data.

    The metadata comes from generate_metadata(metadata_size). The data buffer
    returned by client.create is filled via write_to_data_buffer. The object
    is sealed unless `seal` is false.

    Returns:
      A (data buffer, metadata) pair.
    """
    object_metadata = generate_metadata(metadata_size)
    data_buffer = client.create(object_id, data_size, object_metadata)
    write_to_data_buffer(data_buffer, data_size)
    result = (data_buffer, object_metadata)
    if not seal:
        return result
    client.seal(object_id)
    return result
def create_object(client, data_size, metadata_size, seal=True):
    """Create an object under a fresh random ID.

    Returns:
      An (object ID, data buffer, metadata) triple; the latter two are what
      create_object_with_id returned for that ID.
    """
    new_object_id = random_object_id()
    data_buffer, object_metadata = create_object_with_id(
        client, new_object_id, data_size, metadata_size, seal=seal)
    return new_object_id, data_buffer, object_metadata
+1 -1
View File
@@ -3,8 +3,8 @@ from __future__ import division
from __future__ import print_function
import numpy as np
import numbuf
import ray.numbuf as numbuf
import ray.pickling as pickling
def check_serializable(cls):
+5 -5
View File
@@ -17,9 +17,9 @@ import time
import threading
# Ray modules
import local_scheduler
import plasma
import global_scheduler
import ray.local_scheduler as local_scheduler
import ray.plasma as plasma
import ray.global_scheduler as global_scheduler
PROCESS_TYPE_WORKER = "worker"
PROCESS_TYPE_LOCAL_SCHEDULER = "local_scheduler"
@@ -217,8 +217,8 @@ def start_redis(port=None, num_retries=20, stdout_file=None, stderr_file=None,
Raises:
Exception: An exception is raised if Redis could not be started.
"""
redis_filepath = os.path.join(os.path.dirname(os.path.abspath(__file__)), "../core/src/common/thirdparty/redis/src/redis-server")
redis_module = os.path.join(os.path.dirname(os.path.abspath(__file__)), "../core/src/common/redis_module/libray_redis_module.so")
redis_filepath = os.path.join(os.path.dirname(os.path.abspath(__file__)), "./core/src/common/thirdparty/redis/src/redis-server")
redis_module = os.path.join(os.path.dirname(os.path.abspath(__file__)), "./core/src/common/redis_module/libray_redis_module.so")
assert os.path.isfile(redis_filepath)
assert os.path.isfile(redis_module)
counter = 0
+3 -3
View File
@@ -25,9 +25,9 @@ import traceback
import ray.pickling as pickling
import ray.serialization as serialization
import ray.services as services
import numbuf
import local_scheduler
import plasma
import ray.numbuf as numbuf
import ray.local_scheduler as local_scheduler
import ray.plasma as plasma
SCRIPT_MODE = 0
WORKER_MODE = 1
+1 -1
View File
@@ -37,7 +37,7 @@ if __name__ == "__main__":
error_explanation = """
This error is unexpected and should not have happened. Somehow a worker crashed
in an unanticipated way causing the main_loop to throw an exception, which is
being caught in "lib/python/ray/workers/default_worker.py".
being caught in "python/ray/workers/default_worker.py".
"""
while True: