From 3e492a79ec7b67dd1137535cead690339effc2ac Mon Sep 17 00:00:00 2001 From: Eric Liang Date: Fri, 18 Dec 2020 15:59:03 -0800 Subject: [PATCH] Increase the number of unique bits for actors to avoid handle collisions (#12894) --- .../tests/test_stats_collector.py | 18 ++++++--------- dashboard/tests/test_memory_utils.py | 5 +++-- .../src/main/java/io/ray/api/id/ActorId.java | 2 +- .../src/main/java/io/ray/api/id/ObjectId.java | 2 +- .../src/main/java/io/ray/api/id/UniqueId.java | 2 +- .../java/io/ray/runtime/UniqueIdTest.java | 22 ++++++------------- python/ray/exceptions.py | 6 +++-- python/ray/includes/function_descriptor.pxi | 13 +++++++---- python/ray/includes/unique_ids.pxi | 2 +- python/ray/log_monitor.py | 2 +- python/ray/ray_constants.py | 2 +- python/ray/serialization.py | 5 +++-- python/ray/tests/test_advanced_3.py | 6 ++--- python/ray/tests/test_multi_node.py | 8 +++---- python/ray/utils.py | 4 ++-- python/ray/worker.py | 3 ++- src/ray/common/constants.h | 2 +- src/ray/common/id.h | 2 +- src/ray/core_worker/actor_manager.cc | 2 ++ 19 files changed, 54 insertions(+), 54 deletions(-) diff --git a/dashboard/modules/stats_collector/tests/test_stats_collector.py b/dashboard/modules/stats_collector/tests/test_stats_collector.py index f4246770a..bed6d650f 100644 --- a/dashboard/modules/stats_collector/tests/test_stats_collector.py +++ b/dashboard/modules/stats_collector/tests/test_stats_collector.py @@ -112,20 +112,16 @@ def test_memory_table(disable_aiohttp_cache, ray_start_with_dashboard): def check_mem_table(): resp = requests.get(f"{webui_url}/memory/memory_table") resp_data = resp.json() - if not resp_data["result"]: - return False + assert resp_data["result"] latest_memory_table = resp_data["data"]["memoryTable"] summary = latest_memory_table["summary"] - try: - # 1 ref per handle and per object the actor has a ref to - assert summary["totalActorHandles"] == len(actors) * 2 - # 1 ref for my_obj - assert summary["totalLocalRefCount"] == 1 - return True - except AssertionError: - return False + # 1 ref per handle and per object the actor has a ref to + assert summary["totalActorHandles"] == len(actors) * 2 + # 1 ref for my_obj + assert summary["totalLocalRefCount"] == 1 - wait_for_condition(check_mem_table, 10) + wait_until_succeeded_without_exception( + check_mem_table, (AssertionError, ), timeout_ms=1000) def test_get_all_node_details(disable_aiohttp_cache, ray_start_with_dashboard): diff --git a/dashboard/tests/test_memory_utils.py b/dashboard/tests/test_memory_utils.py index f58ecd8ae..212eeefad 100644 --- a/dashboard/tests/test_memory_utils.py +++ b/dashboard/tests/test_memory_utils.py @@ -7,8 +7,9 @@ from ray.new_dashboard.memory_utils import ( NODE_ADDRESS = "127.0.0.1" IS_DRIVER = True PID = 1 -OBJECT_ID = "7wpsIhgZiBz/////AQAAyAEAAAA=" -ACTOR_ID = "fffffffffffffffff66d17ba010000c801000000" + +OBJECT_ID = "ZmZmZmZmZmZmZmZmZmZmZmZmZmZmZmZmZmZmZg==" +ACTOR_ID = "fffffffffffffffffffffffffffffffff66d17ba010000c801000000" DECODED_ID = decode_object_ref_if_needed(OBJECT_ID) OBJECT_SIZE = 100 diff --git a/java/api/src/main/java/io/ray/api/id/ActorId.java b/java/api/src/main/java/io/ray/api/id/ActorId.java index 65a0cf19a..a21d4e79f 100644 --- a/java/api/src/main/java/io/ray/api/id/ActorId.java +++ b/java/api/src/main/java/io/ray/api/id/ActorId.java @@ -7,7 +7,7 @@ import java.util.Random; public class ActorId extends BaseId implements Serializable { - private static final int UNIQUE_BYTES_LENGTH = 4; + private static final int UNIQUE_BYTES_LENGTH = 12; public static final int LENGTH = JobId.LENGTH + UNIQUE_BYTES_LENGTH; diff --git a/java/api/src/main/java/io/ray/api/id/ObjectId.java b/java/api/src/main/java/io/ray/api/id/ObjectId.java index 9b1fa246f..78b677ac8 100644 --- a/java/api/src/main/java/io/ray/api/id/ObjectId.java +++ b/java/api/src/main/java/io/ray/api/id/ObjectId.java @@ -10,7 +10,7 @@ import java.util.Random; */ public class ObjectId extends BaseId implements Serializable { - public static final int LENGTH = 20; + public static final int LENGTH = 28; /** * Create an ObjectId from a ByteBuffer. diff --git a/java/api/src/main/java/io/ray/api/id/UniqueId.java b/java/api/src/main/java/io/ray/api/id/UniqueId.java index 03de53943..44b19f6a7 100644 --- a/java/api/src/main/java/io/ray/api/id/UniqueId.java +++ b/java/api/src/main/java/io/ray/api/id/UniqueId.java @@ -11,7 +11,7 @@ import java.util.Random; */ public class UniqueId extends BaseId implements Serializable { - public static final int LENGTH = 20; + public static final int LENGTH = 28; public static final UniqueId NIL = genNil(); /** diff --git a/java/runtime/src/test/java/io/ray/runtime/UniqueIdTest.java b/java/runtime/src/test/java/io/ray/runtime/UniqueIdTest.java index 25704f321..7496f1baf 100644 --- a/java/runtime/src/test/java/io/ray/runtime/UniqueIdTest.java +++ b/java/runtime/src/test/java/io/ray/runtime/UniqueIdTest.java @@ -1,7 +1,6 @@ package io.ray.runtime; import io.ray.api.id.UniqueId; -import io.ray.runtime.util.IdUtil; import java.nio.ByteBuffer; import java.util.Arrays; import javax.xml.bind.DatatypeConverter; @@ -13,12 +12,12 @@ public class UniqueIdTest { @Test public void testConstructUniqueId() { // Test `fromHexString()` - UniqueId id1 = UniqueId.fromHexString("00000000123456789ABCDEF123456789ABCDEF00"); - Assert.assertEquals("00000000123456789abcdef123456789abcdef00", id1.toString()); + UniqueId id1 = UniqueId.fromHexString("00000000123456789ABCDEF123456789ABCDEF0123456789ABCDEF00"); + Assert.assertEquals("00000000123456789abcdef123456789abcdef0123456789abcdef00", id1.toString()); Assert.assertFalse(id1.isNil()); try { - UniqueId id2 = UniqueId.fromHexString("000000123456789ABCDEF123456789ABCDEF00"); + UniqueId id2 = UniqueId.fromHexString("000000123456789ABCDEF123456789ABCDEF0123456789ABCDEF00"); // This shouldn't be happened. Assert.assertTrue(false); } catch (IllegalArgumentException e) { @@ -34,23 +33,16 @@ public class UniqueIdTest { } // Test `fromByteBuffer()` - byte[] bytes = DatatypeConverter.parseHexBinary("0123456789ABCDEF0123456789ABCDEF01234567"); - ByteBuffer byteBuffer = ByteBuffer.wrap(bytes, 0, 20); + byte[] bytes = DatatypeConverter.parseHexBinary("0123456789ABCDEF0123456789ABCDEF012345670123456789ABCDEF"); + ByteBuffer byteBuffer = ByteBuffer.wrap(bytes, 0, 28); UniqueId id4 = UniqueId.fromByteBuffer(byteBuffer); Assert.assertTrue(Arrays.equals(bytes, id4.getBytes())); - Assert.assertEquals("0123456789abcdef0123456789abcdef01234567", id4.toString()); + Assert.assertEquals("0123456789abcdef0123456789abcdef012345670123456789abcdef", id4.toString()); // Test `genNil()` UniqueId id6 = UniqueId.NIL; - Assert.assertEquals("FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF".toLowerCase(), id6.toString()); + Assert.assertEquals("FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF".toLowerCase(), id6.toString()); Assert.assertTrue(id6.isNil()); } - - @Test - void testMurmurHash() { - UniqueId id = UniqueId.fromHexString("3131313131313131313132323232323232323232"); - long remainder = Long.remainderUnsigned(IdUtil.murmurHashCode(id), 1000000000); - Assert.assertEquals(remainder, 787616861); - } } diff --git a/python/ray/exceptions.py b/python/ray/exceptions.py index b5a0b477c..56e943db6 100644 --- a/python/ray/exceptions.py +++ b/python/ray/exceptions.py @@ -142,7 +142,8 @@ class WorkerCrashedError(RayError): """Indicates that the worker died unexpectedly while executing a task.""" def __str__(self): - return "The worker died unexpectedly while executing this task." + return ("The worker died unexpectedly while executing this task. " + "Check python-core-worker-*.log files for more information.") class RayActorError(RayError): @@ -153,7 +154,8 @@ class RayActorError(RayError): """ def __str__(self): - return "The actor died unexpectedly before finishing this task." + return ("The actor died unexpectedly before finishing this task. " + "Check python-core-worker-*.log files for more information.") class RaySystemError(RayError): diff --git a/python/ray/includes/function_descriptor.pxi b/python/ray/includes/function_descriptor.pxi index a9ac11fdb..d2c4cbbf4 100644 --- a/python/ray/includes/function_descriptor.pxi +++ b/python/ray/includes/function_descriptor.pxi @@ -12,6 +12,7 @@ import hashlib import cython import inspect import uuid +import ray.ray_constants as ray_constants ctypedef object (*FunctionDescriptor_from_cpp)(const CFunctionDescriptor &) @@ -188,7 +189,8 @@ cdef class PythonFunctionDescriptor(FunctionDescriptor): function_name = function.__name__ class_name = "" - pickled_function_hash = hashlib.sha1(pickled_function).hexdigest() + pickled_function_hash = hashlib.shake_128(pickled_function).hexdigest( + ray_constants.ID_SIZE) return cls(module_name, function_name, class_name, pickled_function_hash) @@ -208,7 +210,10 @@ cdef class PythonFunctionDescriptor(FunctionDescriptor): module_name = target_class.__module__ class_name = target_class.__name__ # Use a random uuid as function hash to solve actor name conflict. - return cls(module_name, "__init__", class_name, str(uuid.uuid4())) + return cls( + module_name, "__init__", class_name, + hashlib.shake_128( + uuid.uuid4().bytes).hexdigest(ray_constants.ID_SIZE)) @property def module_name(self): @@ -268,14 +273,14 @@ cdef class PythonFunctionDescriptor(FunctionDescriptor): Returns: ray.ObjectRef to represent the function descriptor. """ - function_id_hash = hashlib.sha1() + function_id_hash = hashlib.shake_128() # Include the function module and name in the hash. function_id_hash.update(self.typed_descriptor.ModuleName()) function_id_hash.update(self.typed_descriptor.FunctionName()) function_id_hash.update(self.typed_descriptor.ClassName()) function_id_hash.update(self.typed_descriptor.FunctionHash()) # Compute the function ID. - function_id = function_id_hash.digest() + function_id = function_id_hash.digest(ray_constants.ID_SIZE) return ray.FunctionID(function_id) def is_actor_method(self): diff --git a/python/ray/includes/unique_ids.pxi b/python/ray/includes/unique_ids.pxi index bcf766829..52a6730e6 100644 --- a/python/ray/includes/unique_ids.pxi +++ b/python/ray/includes/unique_ids.pxi @@ -31,7 +31,7 @@ def check_id(b, size=kUniqueIDSize): raise TypeError("Unsupported type: " + str(type(b))) if len(b) != size: raise ValueError("ID string needs to have length " + - str(size)) + str(size) + ", got " + str(len(b))) cdef extern from "ray/common/constants.h" nogil: diff --git a/python/ray/log_monitor.py b/python/ray/log_monitor.py index ac5fa5296..d6b3a314e 100644 --- a/python/ray/log_monitor.py +++ b/python/ray/log_monitor.py @@ -22,7 +22,7 @@ from ray.ray_logging import setup_component_logger logger = logging.getLogger(__name__) # The groups are worker id, job id, and pid. -JOB_LOG_PATTERN = re.compile(".*worker-([0-9a-f]{40})-(\d+)-(\d+)") +JOB_LOG_PATTERN = re.compile(".*worker-([0-9a-f]+)-(\d+)-(\d+)") class LogFileInfo: diff --git a/python/ray/ray_constants.py b/python/ray/ray_constants.py index be717ca3c..30b3b5c7b 100644 --- a/python/ray/ray_constants.py +++ b/python/ray/ray_constants.py @@ -19,7 +19,7 @@ def env_bool(key, default): return default -ID_SIZE = 20 +ID_SIZE = 28 # The default maximum number of bytes to allocate to the object store unless # overridden by the user. diff --git a/python/ray/serialization.py b/python/ray/serialization.py index dc9a2c40e..9a24f3ccc 100644 --- a/python/ray/serialization.py +++ b/python/ray/serialization.py @@ -74,7 +74,8 @@ def _try_to_compute_deterministic_class_id(cls, depth=5): new_class_id = pickle.dumps(pickle.loads(class_id)) if new_class_id == class_id: # We appear to have reached a fix point, so use this as the ID. - return hashlib.sha1(new_class_id).digest() + return hashlib.shake_128(new_class_id).digest( + ray_constants.ID_SIZE) class_id = new_class_id # We have not reached a fixed point, so we may end up with a different @@ -82,7 +83,7 @@ def _try_to_compute_deterministic_class_id(cls, depth=5): # same class definition being exported many many times. logger.warning( f"WARNING: Could not produce a deterministic class ID for class {cls}") - return hashlib.sha1(new_class_id).digest() + return hashlib.shake_128(new_class_id).digest(ray_constants.ID_SIZE) def object_ref_deserializer(reduced_obj_ref, owner_address): diff --git a/python/ray/tests/test_advanced_3.py b/python/ray/tests/test_advanced_3.py index 7f1e8e639..b1bc25fbb 100644 --- a/python/ray/tests/test_advanced_3.py +++ b/python/ray/tests/test_advanced_3.py @@ -284,14 +284,14 @@ def test_workers(shutdown_only): def test_object_ref_properties(): - id_bytes = b"00112233445566778899" + id_bytes = b"0011223344556677889900001111" object_ref = ray.ObjectRef(id_bytes) assert object_ref.binary() == id_bytes object_ref = ray.ObjectRef.nil() assert object_ref.is_nil() - with pytest.raises(ValueError, match=r".*needs to have length 20.*"): + with pytest.raises(ValueError, match=r".*needs to have length.*"): ray.ObjectRef(id_bytes + b"1234") - with pytest.raises(ValueError, match=r".*needs to have length 20.*"): + with pytest.raises(ValueError, match=r".*needs to have length.*"): ray.ObjectRef(b"0123456789") object_ref = ray.ObjectRef.from_random() assert not object_ref.is_nil() diff --git a/python/ray/tests/test_multi_node.py b/python/ray/tests/test_multi_node.py index cb206112d..fbce475c1 100644 --- a/python/ray/tests/test_multi_node.py +++ b/python/ray/tests/test_multi_node.py @@ -741,10 +741,10 @@ ray.get(main_wait.release.remote()) driver1_out_split = driver1_out.split("\n") driver2_out_split = driver2_out.split("\n") - assert driver1_out_split[0][-1] == "1" - assert driver1_out_split[1][-1] == "2" - assert driver2_out_split[0][-1] == "3" - assert driver2_out_split[1][-1] == "4" + assert driver1_out_split[0][-1] == "1", driver1_out_split + assert driver1_out_split[1][-1] == "2", driver1_out_split + assert driver2_out_split[0][-1] == "3", driver2_out_split + assert driver2_out_split[1][-1] == "4", driver2_out_split if __name__ == "__main__": diff --git a/python/ray/utils.py b/python/ray/utils.py index a3940d6e8..2704e07cc 100644 --- a/python/ray/utils.py +++ b/python/ray/utils.py @@ -50,9 +50,9 @@ def get_ray_temp_dir(): def _random_string(): - id_hash = hashlib.sha1() + id_hash = hashlib.shake_128() id_hash.update(uuid.uuid4().bytes) - id_bytes = id_hash.digest() + id_bytes = id_hash.digest(ray_constants.ID_SIZE) assert len(id_bytes) == ray_constants.ID_SIZE return id_bytes diff --git a/python/ray/worker.py b/python/ray/worker.py index 495478ad7..627037098 100644 --- a/python/ray/worker.py +++ b/python/ray/worker.py @@ -345,7 +345,8 @@ class Worker: # actually run the function locally. pickled_function = pickle.dumps(function) - function_to_run_id = hashlib.sha1(pickled_function).digest() + function_to_run_id = hashlib.shake_128(pickled_function).digest( + ray_constants.ID_SIZE) key = b"FunctionsToRun:" + function_to_run_id # First run the function on the driver. # We always run the task locally. diff --git a/src/ray/common/constants.h b/src/ray/common/constants.h index 1636846f0..3a3461f2c 100644 --- a/src/ray/common/constants.h +++ b/src/ray/common/constants.h @@ -18,7 +18,7 @@ #include /// Length of Ray full-length IDs in bytes. -constexpr size_t kUniqueIDSize = 20; +constexpr size_t kUniqueIDSize = 28; /// An ObjectID's bytes are split into the task ID itself and the index of the /// object's creation. This is the maximum width of the object index in bits. diff --git a/src/ray/common/id.h b/src/ray/common/id.h index d12ba550d..bd55b27e5 100644 --- a/src/ray/common/id.h +++ b/src/ray/common/id.h @@ -124,7 +124,7 @@ class JobID : public BaseID { class ActorID : public BaseID { private: - static constexpr size_t kUniqueBytesLength = 4; + static constexpr size_t kUniqueBytesLength = 12; public: /// Length of `ActorID` in bytes. diff --git a/src/ray/core_worker/actor_manager.cc b/src/ray/core_worker/actor_manager.cc index e6ef4fc87..6b931082a 100644 --- a/src/ray/core_worker/actor_manager.cc +++ b/src/ray/core_worker/actor_manager.cc @@ -91,6 +91,8 @@ bool ActorManager::AddActorHandle(std::unique_ptr actor_handle, std::placeholders::_1, std::placeholders::_2); RAY_CHECK_OK(gcs_client_->Actors().AsyncSubscribe( actor_id, actor_notification_callback, nullptr)); + } else { + RAY_LOG(ERROR) << "Actor handle already exists " << actor_id.Hex(); } return inserted;