mirror of
https://github.com/wassname/ray.git
synced 2026-06-27 18:06:25 +08:00
Introduce constant for ID_SIZE in python code. (#2517)
This commit is contained in:
committed by
Philipp Moritz
parent
64d00ff39e
commit
909d7172b1
+3
-3
@@ -48,7 +48,7 @@ def compute_actor_handle_id(actor_handle_id, num_forks):
|
||||
handle_id_hash.update(actor_handle_id.id())
|
||||
handle_id_hash.update(str(num_forks).encode("ascii"))
|
||||
handle_id = handle_id_hash.digest()
|
||||
assert len(handle_id) == 20
|
||||
assert len(handle_id) == ray_constants.ID_SIZE
|
||||
return ray.ObjectID(handle_id)
|
||||
|
||||
|
||||
@@ -80,7 +80,7 @@ def compute_actor_handle_id_non_forked(actor_id, actor_handle_id,
|
||||
handle_id_hash.update(actor_handle_id.id())
|
||||
handle_id_hash.update(current_task_id.id())
|
||||
handle_id = handle_id_hash.digest()
|
||||
assert len(handle_id) == 20
|
||||
assert len(handle_id) == ray_constants.ID_SIZE
|
||||
return ray.ObjectID(handle_id)
|
||||
|
||||
|
||||
@@ -110,7 +110,7 @@ def compute_actor_method_function_id(class_name, attr):
|
||||
function_id_hash.update(class_name.encode("ascii"))
|
||||
function_id_hash.update(attr.encode("ascii"))
|
||||
function_id = function_id_hash.digest()
|
||||
assert len(function_id) == 20
|
||||
assert len(function_id) == ray_constants.ID_SIZE
|
||||
return ray.ObjectID(function_id)
|
||||
|
||||
|
||||
|
||||
@@ -8,24 +8,23 @@ import sys
|
||||
import unittest
|
||||
|
||||
import ray.local_scheduler as local_scheduler
|
||||
|
||||
ID_SIZE = 20
|
||||
import ray.ray_constants as ray_constants
|
||||
|
||||
|
||||
def random_object_id():
|
||||
return local_scheduler.ObjectID(np.random.bytes(ID_SIZE))
|
||||
return local_scheduler.ObjectID(np.random.bytes(ray_constants.ID_SIZE))
|
||||
|
||||
|
||||
def random_function_id():
|
||||
return local_scheduler.ObjectID(np.random.bytes(ID_SIZE))
|
||||
return local_scheduler.ObjectID(np.random.bytes(ray_constants.ID_SIZE))
|
||||
|
||||
|
||||
def random_driver_id():
|
||||
return local_scheduler.ObjectID(np.random.bytes(ID_SIZE))
|
||||
return local_scheduler.ObjectID(np.random.bytes(ray_constants.ID_SIZE))
|
||||
|
||||
|
||||
def random_task_id():
|
||||
return local_scheduler.ObjectID(np.random.bytes(ID_SIZE))
|
||||
return local_scheduler.ObjectID(np.random.bytes(ray_constants.ID_SIZE))
|
||||
|
||||
|
||||
BASE_SIMPLE_OBJECTS = [
|
||||
@@ -110,15 +109,17 @@ class TestObjectID(unittest.TestCase):
|
||||
self.assertRaises(Exception, lambda: pickle.dumps(h))
|
||||
|
||||
def test_equality_comparisons(self):
|
||||
x1 = local_scheduler.ObjectID(ID_SIZE * b"a")
|
||||
x2 = local_scheduler.ObjectID(ID_SIZE * b"a")
|
||||
y1 = local_scheduler.ObjectID(ID_SIZE * b"b")
|
||||
y2 = local_scheduler.ObjectID(ID_SIZE * b"b")
|
||||
x1 = local_scheduler.ObjectID(ray_constants.ID_SIZE * b"a")
|
||||
x2 = local_scheduler.ObjectID(ray_constants.ID_SIZE * b"a")
|
||||
y1 = local_scheduler.ObjectID(ray_constants.ID_SIZE * b"b")
|
||||
y2 = local_scheduler.ObjectID(ray_constants.ID_SIZE * b"b")
|
||||
self.assertEqual(x1, x2)
|
||||
self.assertEqual(y1, y2)
|
||||
self.assertNotEqual(x1, y1)
|
||||
|
||||
random_strings = [np.random.bytes(ID_SIZE) for _ in range(256)]
|
||||
random_strings = [
|
||||
np.random.bytes(ray_constants.ID_SIZE) for _ in range(256)
|
||||
]
|
||||
object_ids1 = [
|
||||
local_scheduler.ObjectID(random_strings[i]) for i in range(256)
|
||||
]
|
||||
|
||||
@@ -13,6 +13,7 @@ import time
|
||||
|
||||
import ray
|
||||
import ray.gcs_utils
|
||||
import ray.ray_constants as ray_constants
|
||||
from ray.utils import (decode, binary_to_object_id, binary_to_hex,
|
||||
hex_to_binary)
|
||||
|
||||
@@ -496,7 +497,7 @@ class GlobalState(object):
|
||||
|
||||
else:
|
||||
# This is the raylet code path.
|
||||
NIL_CLIENT_ID = 20 * b"\xff"
|
||||
NIL_CLIENT_ID = ray_constants.ID_SIZE * b"\xff"
|
||||
message = self.redis_client.execute_command(
|
||||
"RAY.TABLE_LOOKUP", ray.gcs_utils.TablePrefix.CLIENT, "",
|
||||
NIL_CLIENT_ID)
|
||||
@@ -1235,7 +1236,7 @@ class GlobalState(object):
|
||||
for key in actor_keys:
|
||||
info = self.redis_client.hgetall(key)
|
||||
actor_id = key[len("Actor:"):]
|
||||
assert len(actor_id) == 20
|
||||
assert len(actor_id) == ray_constants.ID_SIZE
|
||||
actor_info[binary_to_hex(actor_id)] = {
|
||||
"class_id": binary_to_hex(info[b"class_id"]),
|
||||
"driver_id": binary_to_hex(info[b"driver_id"]),
|
||||
|
||||
@@ -18,32 +18,32 @@ import ray.plasma as plasma
|
||||
from ray.plasma.utils import create_object
|
||||
from ray import services
|
||||
from ray.experimental import state
|
||||
import ray.ray_constants as ray_constants
|
||||
import pyarrow as pa
|
||||
|
||||
USE_VALGRIND = False
|
||||
PLASMA_STORE_MEMORY = 1000000000
|
||||
ID_SIZE = 20
|
||||
NUM_CLUSTER_NODES = 2
|
||||
|
||||
NIL_WORKER_ID = 20 * b"\xff"
|
||||
NIL_OBJECT_ID = 20 * b"\xff"
|
||||
NIL_ACTOR_ID = 20 * b"\xff"
|
||||
NIL_WORKER_ID = ray_constants.ID_SIZE * b"\xff"
|
||||
NIL_OBJECT_ID = ray_constants.ID_SIZE * b"\xff"
|
||||
NIL_ACTOR_ID = ray_constants.ID_SIZE * b"\xff"
|
||||
|
||||
|
||||
def random_driver_id():
|
||||
return local_scheduler.ObjectID(np.random.bytes(ID_SIZE))
|
||||
return local_scheduler.ObjectID(np.random.bytes(ray_constants.ID_SIZE))
|
||||
|
||||
|
||||
def random_task_id():
|
||||
return local_scheduler.ObjectID(np.random.bytes(ID_SIZE))
|
||||
return local_scheduler.ObjectID(np.random.bytes(ray_constants.ID_SIZE))
|
||||
|
||||
|
||||
def random_function_id():
|
||||
return local_scheduler.ObjectID(np.random.bytes(ID_SIZE))
|
||||
return local_scheduler.ObjectID(np.random.bytes(ray_constants.ID_SIZE))
|
||||
|
||||
|
||||
def random_object_id():
|
||||
return local_scheduler.ObjectID(np.random.bytes(ID_SIZE))
|
||||
return local_scheduler.ObjectID(np.random.bytes(ray_constants.ID_SIZE))
|
||||
|
||||
|
||||
def new_port():
|
||||
|
||||
@@ -12,28 +12,28 @@ import unittest
|
||||
|
||||
import ray.local_scheduler as local_scheduler
|
||||
import ray.plasma as plasma
|
||||
import ray.ray_constants as ray_constants
|
||||
import pyarrow as pa
|
||||
|
||||
USE_VALGRIND = False
|
||||
ID_SIZE = 20
|
||||
|
||||
NIL_WORKER_ID = 20 * b"\xff"
|
||||
NIL_WORKER_ID = ray_constants.ID_SIZE * b"\xff"
|
||||
|
||||
|
||||
def random_object_id():
|
||||
return local_scheduler.ObjectID(np.random.bytes(ID_SIZE))
|
||||
return local_scheduler.ObjectID(np.random.bytes(ray_constants.ID_SIZE))
|
||||
|
||||
|
||||
def random_driver_id():
|
||||
return local_scheduler.ObjectID(np.random.bytes(ID_SIZE))
|
||||
return local_scheduler.ObjectID(np.random.bytes(ray_constants.ID_SIZE))
|
||||
|
||||
|
||||
def random_task_id():
|
||||
return local_scheduler.ObjectID(np.random.bytes(ID_SIZE))
|
||||
return local_scheduler.ObjectID(np.random.bytes(ray_constants.ID_SIZE))
|
||||
|
||||
|
||||
def random_function_id():
|
||||
return local_scheduler.ObjectID(np.random.bytes(ID_SIZE))
|
||||
return local_scheduler.ObjectID(np.random.bytes(ray_constants.ID_SIZE))
|
||||
|
||||
|
||||
class TestLocalSchedulerClient(unittest.TestCase):
|
||||
|
||||
@@ -16,14 +16,14 @@ from ray.autoscaler.autoscaler import LoadMetrics, StandardAutoscaler
|
||||
import ray.cloudpickle as pickle
|
||||
import ray.gcs_utils
|
||||
import ray.utils
|
||||
import ray.ray_constants as ray_constants
|
||||
from ray.services import get_ip_address, get_port
|
||||
from ray.utils import binary_to_hex, binary_to_object_id, hex_to_binary
|
||||
from ray.worker import NIL_ACTOR_ID
|
||||
|
||||
# These variables must be kept in sync with the C codebase.
|
||||
# common/common.h
|
||||
DB_CLIENT_ID_SIZE = 20
|
||||
NIL_ID = b"\xff" * DB_CLIENT_ID_SIZE
|
||||
NIL_ID = b"\xff" * ray_constants.ID_SIZE
|
||||
|
||||
# common/task.h
|
||||
TASK_STATUS_LOST = 32
|
||||
@@ -348,8 +348,8 @@ class Monitor(object):
|
||||
This resets the number of heartbeats that we've missed from this plasma
|
||||
manager.
|
||||
"""
|
||||
# The first DB_CLIENT_ID_SIZE characters are the client ID.
|
||||
db_client_id = data[:DB_CLIENT_ID_SIZE]
|
||||
# The first ray_constants.ID_SIZE characters are the client ID.
|
||||
db_client_id = data[:ray_constants.ID_SIZE]
|
||||
# Reset the number of heartbeats that we've missed from this plasma
|
||||
# manager.
|
||||
self.live_plasma_managers[db_client_id] = 0
|
||||
|
||||
@@ -18,6 +18,7 @@ import unittest
|
||||
import ray
|
||||
from ray.plasma.utils import (random_object_id, create_object_with_id,
|
||||
create_object)
|
||||
import ray.ray_constants as ray_constants
|
||||
from ray import services
|
||||
import pyarrow as pa
|
||||
import pyarrow.plasma as plasma
|
||||
@@ -377,7 +378,7 @@ class TestPlasmaManager(unittest.TestCase):
|
||||
# Make sure that wait returns when the requested number of object IDs
|
||||
# are available and does not wait for all object IDs to be available.
|
||||
object_ids = [random_object_id() for _ in range(9)] + \
|
||||
[plasma.ObjectID(20 * b'\x00')]
|
||||
[plasma.ObjectID(ray_constants.ID_SIZE * b'\x00')]
|
||||
object_ids_perm = object_ids[:]
|
||||
random.shuffle(object_ids_perm)
|
||||
for i in range(10):
|
||||
|
||||
@@ -6,10 +6,11 @@ import numpy as np
|
||||
import random
|
||||
|
||||
import pyarrow.plasma as plasma
|
||||
import ray.ray_constants as ray_constants
|
||||
|
||||
|
||||
def random_object_id():
|
||||
return plasma.ObjectID(np.random.bytes(20))
|
||||
return plasma.ObjectID(np.random.bytes(ray_constants.ID_SIZE))
|
||||
|
||||
|
||||
def generate_metadata(length):
|
||||
|
||||
@@ -6,6 +6,7 @@ import copy
|
||||
import hashlib
|
||||
import inspect
|
||||
|
||||
import ray.ray_constants as ray_constants
|
||||
import ray.signature
|
||||
|
||||
# Default parameters for remote functions.
|
||||
@@ -37,7 +38,7 @@ def compute_function_id(function):
|
||||
pass
|
||||
# Compute the function ID.
|
||||
function_id = function_id_hash.digest()
|
||||
assert len(function_id) == 20
|
||||
assert len(function_id) == ray_constants.ID_SIZE
|
||||
function_id = ray.ObjectID(function_id)
|
||||
|
||||
return function_id
|
||||
|
||||
+3
-4
@@ -17,14 +17,13 @@ import ray.local_scheduler
|
||||
import ray.ray_constants as ray_constants
|
||||
|
||||
ERROR_KEY_PREFIX = b"Error:"
|
||||
DRIVER_ID_LENGTH = 20
|
||||
|
||||
|
||||
def _random_string():
|
||||
id_hash = hashlib.sha1()
|
||||
id_hash.update(uuid.uuid4().bytes)
|
||||
id_bytes = id_hash.digest()
|
||||
assert len(id_bytes) == 20
|
||||
assert len(id_bytes) == ray_constants.ID_SIZE
|
||||
return id_bytes
|
||||
|
||||
|
||||
@@ -157,14 +156,14 @@ def random_string():
|
||||
deterministic manner, then we will need to make some changes here.
|
||||
|
||||
Returns:
|
||||
A random byte string of length 20.
|
||||
A random byte string of length ray_constants.ID_SIZE.
|
||||
"""
|
||||
# Get the state of the numpy random number generator.
|
||||
numpy_state = np.random.get_state()
|
||||
# Try to use true randomness.
|
||||
np.random.seed(None)
|
||||
# Generate the random ID.
|
||||
random_id = np.random.bytes(20)
|
||||
random_id = np.random.bytes(ray_constants.ID_SIZE)
|
||||
# Reset the state of the numpy random number generator.
|
||||
np.random.set_state(numpy_state)
|
||||
return random_id
|
||||
|
||||
+9
-10
@@ -47,16 +47,14 @@ SILENT_MODE = 3
|
||||
PYTHON_MODE = 4
|
||||
|
||||
ERROR_KEY_PREFIX = b"Error:"
|
||||
DRIVER_ID_LENGTH = 20
|
||||
ERROR_ID_LENGTH = 20
|
||||
|
||||
# This must match the definition of NIL_ACTOR_ID in task.h.
|
||||
NIL_ID = 20 * b"\xff"
|
||||
NIL_ID = ray_constants.ID_SIZE * b"\xff"
|
||||
NIL_LOCAL_SCHEDULER_ID = NIL_ID
|
||||
NIL_FUNCTION_ID = NIL_ID
|
||||
NIL_ACTOR_ID = NIL_ID
|
||||
NIL_ACTOR_HANDLE_ID = NIL_ID
|
||||
NIL_CLIENT_ID = 20 * b"\xff"
|
||||
NIL_CLIENT_ID = ray_constants.ID_SIZE * b"\xff"
|
||||
|
||||
# This must be kept in sync with the `error_types` array in
|
||||
# common/state/error_table.h.
|
||||
@@ -1207,13 +1205,13 @@ def error_applies_to_driver(error_key, worker=global_worker):
|
||||
"""Return True if the error is for this driver and false otherwise."""
|
||||
# TODO(rkn): Should probably check that this is only called on a driver.
|
||||
# Check that the error key is formatted as in push_error_to_driver.
|
||||
assert len(error_key) == (len(ERROR_KEY_PREFIX) + DRIVER_ID_LENGTH + 1 +
|
||||
ERROR_ID_LENGTH), error_key
|
||||
assert len(error_key) == (len(ERROR_KEY_PREFIX) + ray_constants.ID_SIZE + 1
|
||||
+ ray_constants.ID_SIZE), error_key
|
||||
# If the driver ID in the error message is a sequence of all zeros, then
|
||||
# the message is intended for all drivers.
|
||||
generic_driver_id = DRIVER_ID_LENGTH * b"\x00"
|
||||
generic_driver_id = ray_constants.ID_SIZE * b"\x00"
|
||||
driver_id = error_key[len(ERROR_KEY_PREFIX):(
|
||||
len(ERROR_KEY_PREFIX) + DRIVER_ID_LENGTH)]
|
||||
len(ERROR_KEY_PREFIX) + ray_constants.ID_SIZE)]
|
||||
return (driver_id == worker.task_driver_id.id()
|
||||
or driver_id == generic_driver_id)
|
||||
|
||||
@@ -1951,7 +1949,7 @@ def print_error_messages_raylet(worker):
|
||||
assert gcs_entry.EntriesLength() == 1
|
||||
error_data = ray.gcs_utils.ErrorTableData.GetRootAsErrorTableData(
|
||||
gcs_entry.Entries(0), 0)
|
||||
NIL_JOB_ID = 20 * b"\x00"
|
||||
NIL_JOB_ID = ray_constants.ID_SIZE * b"\x00"
|
||||
job_id = error_data.JobId()
|
||||
if job_id not in [worker.task_driver_id.id(), NIL_JOB_ID]:
|
||||
continue
|
||||
@@ -2182,7 +2180,8 @@ def connect(info,
|
||||
else:
|
||||
# Try to use true randomness.
|
||||
np.random.seed(None)
|
||||
worker.current_task_id = ray.ObjectID(np.random.bytes(20))
|
||||
worker.current_task_id = ray.ObjectID(
|
||||
np.random.bytes(ray_constants.ID_SIZE))
|
||||
# Reset the state of the numpy random number generator.
|
||||
np.random.set_state(numpy_state)
|
||||
# Set other fields needed for computing task IDs.
|
||||
|
||||
+6
-7
@@ -13,6 +13,7 @@ from collections import defaultdict, namedtuple, OrderedDict
|
||||
import numpy as np
|
||||
|
||||
import ray
|
||||
import ray.ray_constants as ray_constants
|
||||
import ray.test.test_utils
|
||||
|
||||
|
||||
@@ -2131,8 +2132,6 @@ class GlobalStateAPI(unittest.TestCase):
|
||||
|
||||
self.assertEqual(ray.global_state.object_table(), {})
|
||||
|
||||
ID_SIZE = 20
|
||||
|
||||
driver_id = ray.experimental.state.binary_to_hex(
|
||||
ray.worker.global_worker.worker_id)
|
||||
driver_task_id = ray.experimental.state.binary_to_hex(
|
||||
@@ -2150,14 +2149,14 @@ class GlobalStateAPI(unittest.TestCase):
|
||||
self.assertEqual(task_table[driver_task_id]["TaskSpec"]["TaskID"],
|
||||
driver_task_id)
|
||||
self.assertEqual(task_table[driver_task_id]["TaskSpec"]["ActorID"],
|
||||
ID_SIZE * "ff")
|
||||
ray_constants.ID_SIZE * "ff")
|
||||
self.assertEqual(task_table[driver_task_id]["TaskSpec"]["Args"],
|
||||
[])
|
||||
self.assertEqual(
|
||||
task_table[driver_task_id]["TaskSpec"]["DriverID"], driver_id)
|
||||
self.assertEqual(
|
||||
task_table[driver_task_id]["TaskSpec"]["FunctionID"],
|
||||
ID_SIZE * "ff")
|
||||
ray_constants.ID_SIZE * "ff")
|
||||
self.assertEqual(
|
||||
(task_table[driver_task_id]["TaskSpec"]["ReturnObjectIDs"]),
|
||||
[])
|
||||
@@ -2169,7 +2168,7 @@ class GlobalStateAPI(unittest.TestCase):
|
||||
driver_task_id)
|
||||
self.assertEqual(
|
||||
task_table[driver_task_id][0]["TaskSpec"]["ActorID"],
|
||||
ID_SIZE * "ff")
|
||||
ray_constants.ID_SIZE * "ff")
|
||||
self.assertEqual(task_table[driver_task_id][0]["TaskSpec"]["Args"],
|
||||
[])
|
||||
self.assertEqual(
|
||||
@@ -2177,7 +2176,7 @@ class GlobalStateAPI(unittest.TestCase):
|
||||
driver_id)
|
||||
self.assertEqual(
|
||||
task_table[driver_task_id][0]["TaskSpec"]["FunctionID"],
|
||||
ID_SIZE * "ff")
|
||||
ray_constants.ID_SIZE * "ff")
|
||||
self.assertEqual(
|
||||
(task_table[driver_task_id][0]["TaskSpec"]["ReturnObjectIDs"]),
|
||||
[])
|
||||
@@ -2222,7 +2221,7 @@ class GlobalStateAPI(unittest.TestCase):
|
||||
task_spec = task_table[task_id]["TaskSpec"]
|
||||
else:
|
||||
task_spec = task_table[task_id][0]["TaskSpec"]
|
||||
self.assertEqual(task_spec["ActorID"], ID_SIZE * "ff")
|
||||
self.assertEqual(task_spec["ActorID"], ray_constants.ID_SIZE * "ff")
|
||||
self.assertEqual(task_spec["Args"], [1, "hi", x_id])
|
||||
self.assertEqual(task_spec["DriverID"], driver_id)
|
||||
self.assertEqual(task_spec["ReturnObjectIDs"], [result_id])
|
||||
|
||||
Reference in New Issue
Block a user