From 3d224c4edfc4f96d72e8b281fc39ca3573ed8382 Mon Sep 17 00:00:00 2001 From: Philipp Moritz Date: Tue, 26 Dec 2017 16:22:04 -0800 Subject: [PATCH] Second Part of Internal API Refactor (#1326) --- CMakeLists.txt | 15 ++- cmake/Modules/FindRay.cmake | 77 ++++++++++++ .../global_scheduler_services.py | 1 + src/common/CMakeLists.txt | 3 +- src/common/common.cc | 55 --------- src/common/common.h | 115 +---------------- src/common/common_protocol.cc | 21 ++-- src/common/common_protocol.h | 10 +- src/common/lib/python/common_extension.cc | 36 +++--- src/common/lib/python/common_extension.h | 12 +- src/common/logging.cc | 4 +- src/common/logging.h | 12 +- src/common/state/actor_notification_table.cc | 2 +- src/common/state/db_client_table.cc | 7 +- src/common/state/driver_table.cc | 2 +- src/common/state/error_table.cc | 13 +- src/common/state/error_table.h | 2 +- src/common/state/local_scheduler_table.cc | 7 +- src/common/state/object_table.cc | 6 +- src/common/state/redis.cc | 116 +++++++++--------- src/common/task.cc | 26 ++-- src/common/task.h | 16 ++- src/common/test/common_tests.cc | 24 ---- src/common/test/db_tests.cc | 14 ++- src/common/test/example_task.h | 16 +-- src/common/test/object_table_tests.cc | 51 ++++---- src/common/test/redis_tests.cc | 6 +- src/common/test/run_tests.sh | 1 - src/common/test/run_valgrind.sh | 13 +- src/common/test/task_table_tests.cc | 30 ++--- src/common/test/task_tests.cc | 92 +++++++------- src/global_scheduler/CMakeLists.txt | 2 +- src/global_scheduler/global_scheduler.cc | 35 +++--- .../global_scheduler_algorithm.cc | 9 +- src/local_scheduler/CMakeLists.txt | 9 +- src/local_scheduler/local_scheduler.cc | 95 +++++++------- .../local_scheduler_algorithm.cc | 58 ++++----- src/local_scheduler/local_scheduler_client.cc | 4 +- .../test/local_scheduler_tests.cc | 18 +-- src/local_scheduler/test/run_valgrind.sh | 2 +- src/plasma/CMakeLists.txt | 12 +- src/plasma/plasma_manager.cc | 5 +- src/plasma/plasma_manager.h | 10 +- src/plasma/test/manager_tests.cc | 8 +- src/plasma/test/run_valgrind.sh | 2 +- src/ray/CMakeLists.txt | 6 +- src/ray/api.h | 0 src/ray/gcs/CMakeLists.txt | 2 +- src/ray/gcs/client.cc | 2 +- src/ray/gcs/client.h | 2 +- src/ray/gcs/client_test.cc | 6 +- src/ray/gcs/format/gcs.fbs | 13 +- src/ray/gcs/tables.h | 10 +- src/ray/id.cc | 25 +++- src/ray/id.h | 10 +- src/ray/status.h | 17 --- src/ray/util/logging.h | 45 ------- src/thirdparty/download_thirdparty.sh | 2 +- 58 files changed, 537 insertions(+), 677 deletions(-) create mode 100644 cmake/Modules/FindRay.cmake delete mode 100644 src/common/test/common_tests.cc create mode 100644 src/ray/api.h diff --git a/CMakeLists.txt b/CMakeLists.txt index c2f6aebf7..87b0cc01c 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -4,6 +4,11 @@ project(ray) set(CMAKE_MODULE_PATH ${CMAKE_MODULE_PATH} "${CMAKE_SOURCE_DIR}/cmake/Modules") +list(APPEND CMAKE_MODULE_PATH ${CMAKE_CURRENT_LIST_DIR}/src/thirdparty/arrow/python/cmake_modules) + +find_package(Arrow) +find_package(Plasma) + # This ensures that things like gnu++11 get passed correctly set(CMAKE_CXX_STANDARD 11) @@ -11,11 +16,11 @@ set(CMAKE_CXX_STANDARD 11) set(CMAKE_CXX_STANDARD_REQUIRED ON) option(RAY_BUILD_STATIC - "Build the libarrow static libraries" + "Build the libray static libraries" ON) option(RAY_BUILD_SHARED - "Build the libarrow shared libraries" + "Build the libray shared libraries" ON) option(RAY_BUILD_TESTS @@ -29,10 +34,8 @@ enable_testing() include(ThirdpartyToolchain) -set(ARROW_DIR "${CMAKE_CURRENT_LIST_DIR}/src/thirdparty/arrow/" - CACHE STRING "Path of the arrow source directory") - -include_directories("${ARROW_DIR}/cpp/src/") +include_directories(SYSTEM ${ARROW_INCLUDE_DIR}) +include_directories(SYSTEM ${PLASMA_INCLUDE_DIR}) include_directories("${CMAKE_CURRENT_LIST_DIR}/src/") add_subdirectory(${CMAKE_CURRENT_LIST_DIR}/src/ray/) diff --git a/cmake/Modules/FindRay.cmake b/cmake/Modules/FindRay.cmake new file mode 100644 index 000000000..815a5aa42 --- /dev/null +++ b/cmake/Modules/FindRay.cmake @@ -0,0 +1,77 @@ +# - Find RAY +# This module defines +# RAY_INCLUDE_DIR, directory containing headers +# RAY_LIBS, directory containing ray libraries +# RAY_STATIC_LIB, path to static library +# RAY_SHARED_LIB, path to shared library +# RAY_FOUND, whether ray has been found + +include(FindPkgConfig) + +if ("$ENV{RAY_HOME}" STREQUAL "") + pkg_check_modules(RAY ray) + if (RAY_FOUND) + pkg_get_variable(RAY_ABI_VERSION ray abi_version) + message(STATUS "Ray ABI version: ${RAY_ABI_VERSION}") + pkg_get_variable(RAY_SO_VERSION ray so_version) + message(STATUS "Ray SO version: ${RAY_SO_VERSION}") + set(RAY_INCLUDE_DIR ${RAY_INCLUDE_DIRS}) + set(RAY_LIBS ${RAY_LIBRARY_DIRS}) + set(RAY_SEARCH_LIB_PATH ${RAY_LIBRARY_DIRS}) + endif() +else() + set(RAY_HOME "$ENV{RAY_HOME}") + + set(RAY_SEARCH_HEADER_PATHS + ${RAY_HOME}/include + ) + + set(RAY_SEARCH_LIB_PATH + ${RAY_HOME}/lib + ) + + find_path(RAY_INCLUDE_DIR ray/gcs/client.h PATHS + ${RAY_SEARCH_HEADER_PATHS} + # make sure we don't accidentally pick up a different version + NO_DEFAULT_PATH + ) +endif() + +find_library(RAY_LIB_PATH NAMES ray + PATHS + ${RAY_SEARCH_LIB_PATH} + NO_DEFAULT_PATH) +get_filename_component(RAY_LIBS ${RAY_LIB_PATH} DIRECTORY) + +if (RAY_INCLUDE_DIR AND RAY_LIBS) + set(RAY_FOUND TRUE) + set(RAY_LIB_NAME ray) + + set(RAY_STATIC_LIB ${RAY_LIBS}/lib${RAY_LIB_NAME}.a) + + set(RAY_SHARED_LIB ${RAY_LIBS}/lib${RAY_LIB_NAME}${CMAKE_SHARED_LIBRARY_SUFFIX}) +endif() + +if (RAY_FOUND) + if (NOT Ray_FIND_QUIETLY) + message(STATUS "Found the Ray core library: ${RAY_LIB_PATH}") + endif () +else () + if (NOT Ray_FIND_QUIETLY) + set(RAY_ERR_MSG "Could not find the Ray library. Looked for headers") + set(RAY_ERR_MSG "${RAY_ERR_MSG} in ${RAY_SEARCH_HEADER_PATHS}, and for libs") + set(RAY_ERR_MSG "${RAY_ERR_MSG} in ${RAY_SEARCH_LIB_PATH}") + if (Ray_FIND_REQUIRED) + message(FATAL_ERROR "${RAY_ERR_MSG}") + else (Ray_FIND_REQUIRED) + message(STATUS "${RAY_ERR_MSG}") + endif (Ray_FIND_REQUIRED) + endif () + set(RAY_FOUND FALSE) +endif () + +mark_as_advanced( + RAY_INCLUDE_DIR + RAY_STATIC_LIB + RAY_SHARED_LIB +) diff --git a/python/ray/global_scheduler/global_scheduler_services.py b/python/ray/global_scheduler/global_scheduler_services.py index a7001bb9d..db318f2ad 100644 --- a/python/ray/global_scheduler/global_scheduler_services.py +++ b/python/ray/global_scheduler/global_scheduler_services.py @@ -41,6 +41,7 @@ def start_global_scheduler(redis_address, node_ip_address, "--track-origins=yes", "--leak-check=full", "--show-leak-kinds=all", + "--leak-check-heuristics=stdstring", "--error-exitcode=1"] + command, stdout=stdout_file, stderr=stderr_file) time.sleep(1.0) diff --git a/src/common/CMakeLists.txt b/src/common/CMakeLists.txt index 4ac69ed5c..0d61673ee 100644 --- a/src/common/CMakeLists.txt +++ b/src/common/CMakeLists.txt @@ -78,11 +78,10 @@ target_link_libraries(common "${CMAKE_CURRENT_LIST_DIR}/thirdparty/hiredis/libhi function(define_test test_name library) add_executable(${test_name} test/${test_name}.cc ${ARGN}) add_dependencies(${test_name} hiredis flatbuffers_ep) - target_link_libraries(${test_name} common ${FLATBUFFERS_STATIC_LIB} ${ARROW_DIR}/cpp/build/release/libarrow.a ${library} -lpthread) + target_link_libraries(${test_name} common ${FLATBUFFERS_STATIC_LIB} ray_static ${PLASMA_STATIC_LIB} ${ARROW_STATIC_LIB} ${library} -lpthread) target_compile_options(${test_name} PUBLIC "-DPLASMA_TEST -DLOCAL_SCHEDULER_TEST -DCOMMON_TEST -DRAY_COMMON_LOG_LEVEL=4") endfunction() -define_test(common_tests "") define_test(db_tests "") define_test(io_tests "") define_test(task_tests "") diff --git a/src/common/common.cc b/src/common/common.cc index 3588ab7d3..0a6da6a29 100644 --- a/src/common/common.cc +++ b/src/common/common.cc @@ -10,63 +10,8 @@ #include "io.h" #include -const UniqueID NIL_ID = UniqueID::nil(); - const unsigned char NIL_DIGEST[DIGEST_SIZE] = {0}; -UniqueID globally_unique_id(void) { - /* Use /dev/urandom for "real" randomness. */ - int fd; - int const flags = 0 /* for Windows compatibility */; - if ((fd = open("/dev/urandom", O_RDONLY, flags)) == -1) { - LOG_ERROR("Could not generate random number"); - } - UniqueID result; - CHECK(read_bytes(fd, &result.id[0], UNIQUE_ID_SIZE) >= 0); - close(fd); - return result; -} - -/* ObjectID equality function. */ -bool operator==(const ObjectID &x, const ObjectID &y) { - return UNIQUE_ID_EQ(x, y); -} - -bool ObjectID_equal(ObjectID first_id, ObjectID second_id) { - return UNIQUE_ID_EQ(first_id, second_id); -} - -bool ObjectID_is_nil(ObjectID id) { - return ObjectID_equal(id, NIL_OBJECT_ID); -} - -bool DBClientID_equal(DBClientID first_id, DBClientID second_id) { - return UNIQUE_ID_EQ(first_id, second_id); -} - -bool DBClientID_is_nil(DBClientID id) { - return IS_NIL_ID(id); -} - -bool WorkerID_equal(WorkerID first_id, WorkerID second_id) { - return UNIQUE_ID_EQ(first_id, second_id); -} - -char *ObjectID_to_string(ObjectID obj_id, char *id_string, int id_length) { - CHECK(id_length >= ID_STRING_SIZE); - static const char hex[] = "0123456789abcdef"; - char *buf = id_string; - - for (int i = 0; i < UNIQUE_ID_SIZE; i++) { - unsigned int val = obj_id.id[i]; - *buf++ = hex[val >> 4]; - *buf++ = hex[val & 0xf]; - } - *buf = '\0'; - - return id_string; -} - int64_t current_time_ms() { std::chrono::milliseconds ms_since_epoch = std::chrono::duration_cast( diff --git a/src/common/common.h b/src/common/common.h index 1b7b71b50..0ea258113 100644 --- a/src/common/common.h +++ b/src/common/common.h @@ -22,8 +22,9 @@ extern "C" { } #endif -#include "plasma/common.h" #include "arrow/util/macros.h" +#include "plasma/common.h" +#include "ray/id.h" #include "state/ray_config.h" @@ -113,118 +114,10 @@ extern "C" { * and is responsible for freeing it. */ #define OWNER -/** Definitions for unique ID types. */ -#define UNIQUE_ID_SIZE 20 - -#define UNIQUE_ID_EQ(id1, id2) (memcmp((id1).id, (id2).id, UNIQUE_ID_SIZE) == 0) - -#define IS_NIL_ID(id) UNIQUE_ID_EQ(id, NIL_ID) - -struct UniqueID { - unsigned char id[UNIQUE_ID_SIZE]; - UniqueID(const plasma::UniqueID &from) { - memcpy(&id[0], from.data(), UNIQUE_ID_SIZE); - } - UniqueID() {} - static const UniqueID nil() { - UniqueID result; - std::fill_n(result.id, UNIQUE_ID_SIZE, 255); - return result; - } - plasma::UniqueID to_plasma_id() { - plasma::UniqueID result; - memcpy(result.mutable_data(), &id[0], UNIQUE_ID_SIZE); - return result; - } -}; - -extern const UniqueID NIL_ID; - -/* Generate a globally unique ID. */ -UniqueID globally_unique_id(void); - -#define NIL_OBJECT_ID NIL_ID -#define NIL_WORKER_ID NIL_ID - -/** The object ID is the type used to identify objects. */ -typedef UniqueID ObjectID; - -#ifdef __cplusplus - -struct UniqueIDHasher { - /* ObjectID hashing function. */ - size_t operator()(const UniqueID &id) const { - size_t result; - memcpy(&result, id.id, sizeof(size_t)); - return result; - } -}; - -bool operator==(const ObjectID &x, const ObjectID &y); -#endif - -#define ID_STRING_SIZE (2 * UNIQUE_ID_SIZE + 1) - -/** - * Convert an object ID to a hexdecimal string. This function assumes that - * buffer points to an already allocated char array of size ID_STRING_SIZE. And - * it writes a null-terminated hex-formatted string to id_string. - * - * @param obj_id The object ID to convert to a string. - * @param id_string A buffer to write the string to. It is assumed that this is - * managed by the caller and is sufficiently long to store the object ID - * string. - * @param id_length The length of the id_string buffer. - */ -char *ObjectID_to_string(ObjectID obj_id, char *id_string, int id_length); - -/** - * Compare two object IDs. - * - * @param first_id The first object ID to compare. - * @param second_id The first object ID to compare. - * @return True if the object IDs are the same and false otherwise. - */ -bool ObjectID_equal(ObjectID first_id, ObjectID second_id); - -/** - * Compare a object ID to the nil ID. - * - * @param id The object ID to compare to nil. - * @return True if the object ID is equal to nil. - */ -bool ObjectID_is_nil(ObjectID id); - /** The worker ID is the ID of a worker or driver. */ -typedef UniqueID WorkerID; +typedef ray::UniqueID WorkerID; -/** - * Compare two worker IDs. - * - * @param first_id The first worker ID to compare. - * @param second_id The first worker ID to compare. - * @return True if the worker IDs are the same and false otherwise. - */ -bool WorkerID_equal(WorkerID first_id, WorkerID second_id); - -typedef UniqueID DBClientID; - -/** - * Compare two db client IDs. - * - * @param first_id The first db client ID to compare. - * @param second_id The first db client ID to compare. - * @return True if the db client IDs are the same and false otherwise. - */ -bool DBClientID_equal(DBClientID first_id, DBClientID second_id); - -/** - * Compare a db client ID to the nil ID. - * - * @param id The db client ID to compare to nil. - * @return True if the db client ID is equal to nil. - */ -bool DBClientID_is_nil(ObjectID id); +typedef ray::UniqueID DBClientID; #define MAX(x, y) ((x) >= (y) ? (x) : (y)) #define MIN(x, y) ((x) <= (y) ? (x) : (y)) diff --git a/src/common/common_protocol.cc b/src/common/common_protocol.cc index d27efa2ce..49716bbb0 100644 --- a/src/common/common_protocol.cc +++ b/src/common/common_protocol.cc @@ -2,21 +2,22 @@ flatbuffers::Offset to_flatbuf( flatbuffers::FlatBufferBuilder &fbb, - ObjectID object_id) { - return fbb.CreateString((char *) &object_id.id[0], sizeof(object_id.id)); + ray::ObjectID object_id) { + return fbb.CreateString(reinterpret_cast(object_id.data()), + sizeof(ray::ObjectID)); } -ObjectID from_flatbuf(const flatbuffers::String &string) { - ObjectID object_id; - CHECK(string.size() == sizeof(object_id.id)); - memcpy(&object_id.id[0], string.data(), sizeof(object_id.id)); +ray::ObjectID from_flatbuf(const flatbuffers::String &string) { + ray::ObjectID object_id; + CHECK(string.size() == sizeof(ray::ObjectID)); + memcpy(object_id.mutable_data(), string.data(), sizeof(ray::ObjectID)); return object_id; } -const std::vector from_flatbuf( +const std::vector from_flatbuf( const flatbuffers::Vector> &vector) { - std::vector object_ids; + std::vector object_ids; for (int64_t i = 0; i < vector.Length(); i++) { object_ids.push_back(from_flatbuf(*vector.Get(i))); } @@ -26,7 +27,7 @@ const std::vector from_flatbuf( flatbuffers::Offset< flatbuffers::Vector>> to_flatbuf(flatbuffers::FlatBufferBuilder &fbb, - ObjectID object_ids[], + ray::ObjectID object_ids[], int64_t num_objects) { std::vector> results; for (int64_t i = 0; i < num_objects; i++) { @@ -38,7 +39,7 @@ to_flatbuf(flatbuffers::FlatBufferBuilder &fbb, flatbuffers::Offset< flatbuffers::Vector>> to_flatbuf(flatbuffers::FlatBufferBuilder &fbb, - const std::vector &object_ids) { + const std::vector &object_ids) { std::vector> results; for (auto object_id : object_ids) { results.push_back(to_flatbuf(fbb, object_id)); diff --git a/src/common/common_protocol.h b/src/common/common_protocol.h index 3ad8ffd5a..6c2c177dd 100644 --- a/src/common/common_protocol.h +++ b/src/common/common_protocol.h @@ -16,19 +16,19 @@ /// @return The flatbuffer string contining the object ID. flatbuffers::Offset to_flatbuf( flatbuffers::FlatBufferBuilder &fbb, - ObjectID object_id); + ray::ObjectID object_id); /// Convert a flatbuffer string to an object ID. /// /// @param string The flatbuffer string. /// @return The object ID. -ObjectID from_flatbuf(const flatbuffers::String &string); +ray::ObjectID from_flatbuf(const flatbuffers::String &string); /// Convert a flatbuffer vector of strings to a vector of object IDs. /// /// @param vector The flatbuffer vector. /// @return The vector of object IDs. -const std::vector from_flatbuf( +const std::vector from_flatbuf( const flatbuffers::Vector> &vector); @@ -41,7 +41,7 @@ const std::vector from_flatbuf( flatbuffers::Offset< flatbuffers::Vector>> to_flatbuf(flatbuffers::FlatBufferBuilder &fbb, - ObjectID object_ids[], + ray::ObjectID object_ids[], int64_t num_objects); /// Convert a vector of object IDs to a flatbuffer vector of strings. @@ -52,7 +52,7 @@ to_flatbuf(flatbuffers::FlatBufferBuilder &fbb, flatbuffers::Offset< flatbuffers::Vector>> to_flatbuf(flatbuffers::FlatBufferBuilder &fbb, - const std::vector &object_ids); + const std::vector &object_ids); /// Convert a flatbuffer string to a std::string. /// diff --git a/src/common/lib/python/common_extension.cc b/src/common/lib/python/common_extension.cc index 7806f4df0..a93a7ab48 100644 --- a/src/common/lib/python/common_extension.cc +++ b/src/common/lib/python/common_extension.cc @@ -44,7 +44,8 @@ TaskBuilder *g_task_builder = NULL; int PyStringToUniqueID(PyObject *object, ObjectID *object_id) { if (PyBytes_Check(object)) { - memcpy(&object_id->id[0], PyBytes_AsString(object), UNIQUE_ID_SIZE); + std::memcpy(object_id->mutable_data(), PyBytes_AsString(object), + sizeof(*object_id)); return 1; } else { PyErr_SetString(PyExc_TypeError, "must be a 20 character string"); @@ -73,7 +74,7 @@ static int PyObjectID_init(PyObjectID *self, PyObject *args, PyObject *kwds) { "ObjectID: object id string needs to have length 20"); return -1; } - memcpy(&self->object_id.id[0], data, sizeof(self->object_id.id)); + std::memcpy(self->object_id.mutable_data(), data, sizeof(self->object_id)); return 0; } @@ -132,15 +133,14 @@ PyObject *PyTask_to_string(PyObject *self, PyObject *args) { static PyObject *PyObjectID_id(PyObject *self) { PyObjectID *s = (PyObjectID *) self; - return PyBytes_FromStringAndSize((char *) &s->object_id.id[0], - sizeof(s->object_id.id)); + return PyBytes_FromStringAndSize((const char *) s->object_id.data(), + sizeof(s->object_id)); } static PyObject *PyObjectID_hex(PyObject *self) { PyObjectID *s = (PyObjectID *) self; - char hex_id[ID_STRING_SIZE]; - ObjectID_to_string(s->object_id, hex_id, ID_STRING_SIZE); - PyObject *result = PyUnicode_FromString(hex_id); + std::string hex_id = s->object_id.hex(); + PyObject *result = PyUnicode_FromString(hex_id.c_str()); return result; } @@ -160,12 +160,10 @@ static PyObject *PyObjectID_richcompare(PyObjectID *self, result = Py_NotImplemented; break; case Py_EQ: - result = ObjectID_equal(self->object_id, other_id->object_id) ? Py_True - : Py_False; + result = self->object_id == other_id->object_id ? Py_True : Py_False; break; case Py_NE: - result = !ObjectID_equal(self->object_id, other_id->object_id) ? Py_True - : Py_False; + result = !(self->object_id == other_id->object_id) ? Py_True : Py_False; break; case Py_GT: result = Py_NotImplemented; @@ -188,9 +186,11 @@ static PyObject *PyObjectID_redis_shard_hash(PyObjectID *self) { } static long PyObjectID_hash(PyObjectID *self) { - PyObject *tuple = PyTuple_New(UNIQUE_ID_SIZE); - for (int i = 0; i < UNIQUE_ID_SIZE; ++i) { - PyTuple_SetItem(tuple, i, PyLong_FromLong(self->object_id.id[i])); + // TODO(pcm): Replace this with a faster hash function. This currently + // creates a tuple of length 20 and hashes it, which is slow + PyObject *tuple = PyTuple_New(kUniqueIDSize); + for (int i = 0; i < kUniqueIDSize; ++i) { + PyTuple_SetItem(tuple, i, PyLong_FromLong(self->object_id.data()[i])); } long hash = PyObject_Hash(tuple); Py_XDECREF(tuple); @@ -198,9 +198,7 @@ static long PyObjectID_hash(PyObjectID *self) { } static PyObject *PyObjectID_repr(PyObjectID *self) { - char hex_id[ID_STRING_SIZE]; - ObjectID_to_string(self->object_id, hex_id, ID_STRING_SIZE); - std::string repr = "ObjectID(" + std::string(hex_id) + ")"; + std::string repr = "ObjectID(" + self->object_id.hex() + ")"; PyObject *result = PyUnicode_FromString(repr.c_str()); return result; } @@ -274,9 +272,9 @@ static int PyTask_init(PyTask *self, PyObject *args, PyObject *kwds) { /* ID of the driver that this task originates from. */ UniqueID driver_id; /* ID of the actor this task should run on. */ - UniqueID actor_id = NIL_ACTOR_ID; + UniqueID actor_id = UniqueID::nil(); /* ID of the actor handle used to submit this task. */ - UniqueID actor_handle_id = NIL_ACTOR_ID; + UniqueID actor_handle_id = UniqueID::nil(); /* How many tasks have been launched on the actor so far? */ int actor_counter = 0; /* True if this is an actor checkpoint task and false otherwise. */ diff --git a/src/common/lib/python/common_extension.h b/src/common/lib/python/common_extension.h index b3e8e4f89..c72f3e8ea 100644 --- a/src/common/lib/python/common_extension.h +++ b/src/common/lib/python/common_extension.h @@ -9,7 +9,7 @@ #include "common.h" -typedef uint8_t TaskSpec; +typedef char TaskSpec; class TaskBuilder; extern PyObject *CommonError; @@ -17,14 +17,14 @@ extern PyObject *CommonError; // clang-format off typedef struct { PyObject_HEAD - ObjectID object_id; + ray::ObjectID object_id; } PyObjectID; typedef struct { PyObject_HEAD int64_t size; TaskSpec *spec; - std::vector *execution_dependencies; + std::vector *execution_dependencies; } PyTask; // clang-format on @@ -41,11 +41,11 @@ void init_pickle_module(void); extern TaskBuilder *g_task_builder; -int PyStringToUniqueID(PyObject *object, ObjectID *object_id); +int PyStringToUniqueID(PyObject *object, ray::ObjectID *object_id); -int PyObjectToUniqueID(PyObject *object, ObjectID *object_id); +int PyObjectToUniqueID(PyObject *object, ray::ObjectID *object_id); -PyObject *PyObjectID_make(ObjectID object_id); +PyObject *PyObjectID_make(ray::ObjectID object_id); PyObject *check_simple_value(PyObject *self, PyObject *args); diff --git a/src/common/logging.cc b/src/common/logging.cc index 500e24d19..9802dd3d0 100644 --- a/src/common/logging.cc +++ b/src/common/logging.cc @@ -50,7 +50,7 @@ void RayLogger_log(RayLogger *logger, if (log_level < logger->log_level) { return; } - if (log_level < RAY_DEBUG || log_level > RAY_FATAL) { + if (log_level < RAY_LOG_DEBUG || log_level > RAY_LOG_FATAL) { return; } struct timeval tv; @@ -79,7 +79,7 @@ void RayLogger_log(RayLogger *logger, int status = redisAsyncCommand(context, NULL, NULL, formatted_message, - (char *) db->client.id, sizeof(db->client.id)); + (char *) db->client.data(), sizeof(db->client)); if ((status == REDIS_ERR) || context->err) { LOG_REDIS_DEBUG(context, "error while logging message to log table"); } diff --git a/src/common/logging.h b/src/common/logging.h index b122621e6..1fa57a60c 100644 --- a/src/common/logging.h +++ b/src/common/logging.h @@ -1,12 +1,12 @@ #ifndef LOGGING_H #define LOGGING_H -#define RAY_VERBOSE -1 -#define RAY_DEBUG 0 -#define RAY_INFO 1 -#define RAY_WARNING 2 -#define RAY_ERROR 3 -#define RAY_FATAL 4 +#define RAY_LOG_VERBOSE -1 +#define RAY_LOG_DEBUG 0 +#define RAY_LOG_INFO 1 +#define RAY_LOG_WARNING 2 +#define RAY_LOG_ERROR 3 +#define RAY_LOG_FATAL 4 /* Entity types. */ #define RAY_FUNCTION "FUNCTION" diff --git a/src/common/state/actor_notification_table.cc b/src/common/state/actor_notification_table.cc index fdbcba913..e99811960 100644 --- a/src/common/state/actor_notification_table.cc +++ b/src/common/state/actor_notification_table.cc @@ -12,7 +12,7 @@ void actor_notification_table_subscribe( sub_data->subscribe_callback = subscribe_callback; sub_data->subscribe_context = subscribe_context; - init_table_callback(db_handle, NIL_ID, __func__, + init_table_callback(db_handle, UniqueID::nil(), __func__, new CommonCallbackData(sub_data), retry, NULL, redis_actor_notification_table_subscribe, NULL); } diff --git a/src/common/state/db_client_table.cc b/src/common/state/db_client_table.cc index 03c5042ea..ac9a809b9 100644 --- a/src/common/state/db_client_table.cc +++ b/src/common/state/db_client_table.cc @@ -24,7 +24,7 @@ void db_client_table_subscribe( sub_data->subscribe_callback = subscribe_callback; sub_data->subscribe_context = subscribe_context; - init_table_callback(db_handle, NIL_ID, __func__, + init_table_callback(db_handle, UniqueID::nil(), __func__, new CommonCallbackData(sub_data), retry, (table_done_callback) done_callback, redis_db_client_table_subscribe, user_context); @@ -71,7 +71,7 @@ void db_client_table_cache_init(DBHandle *db_handle) { } DBClient db_client_table_cache_get(DBHandle *db_handle, DBClientID client_id) { - CHECK(!DBClientID_is_nil(client_id)); + CHECK(!client_id.is_nil()); return redis_cache_get_db_client(db_handle, client_id); } @@ -82,7 +82,8 @@ void plasma_manager_send_heartbeat(DBHandle *db_handle) { RayConfig::instance().heartbeat_timeout_milliseconds(); heartbeat_retry.fail_callback = NULL; - init_table_callback(db_handle, NIL_ID, __func__, new CommonCallbackData(NULL), + init_table_callback(db_handle, UniqueID::nil(), __func__, + new CommonCallbackData(NULL), (RetryInfo *) &heartbeat_retry, NULL, redis_plasma_manager_send_heartbeat, NULL); } diff --git a/src/common/state/driver_table.cc b/src/common/state/driver_table.cc index d458c6d40..b8732e986 100644 --- a/src/common/state/driver_table.cc +++ b/src/common/state/driver_table.cc @@ -9,7 +9,7 @@ void driver_table_subscribe(DBHandle *db_handle, (DriverTableSubscribeData *) malloc(sizeof(DriverTableSubscribeData)); sub_data->subscribe_callback = subscribe_callback; sub_data->subscribe_context = subscribe_context; - init_table_callback(db_handle, NIL_ID, __func__, + init_table_callback(db_handle, UniqueID::nil(), __func__, new CommonCallbackData(sub_data), retry, NULL, redis_driver_table_subscribe, NULL); } diff --git a/src/common/state/error_table.cc b/src/common/state/error_table.cc index f7d30196e..582c84884 100644 --- a/src/common/state/error_table.cc +++ b/src/common/state/error_table.cc @@ -13,7 +13,7 @@ void push_error(DBHandle *db_handle, DBClientID driver_id, int error_index, size_t data_length, - unsigned char *data) { + const unsigned char *data) { CHECK(error_index >= 0 && error_index < MAX_ERROR_INDEX); /* Allocate a struct to hold the error information. */ ErrorInfo *info = (ErrorInfo *) malloc(sizeof(ErrorInfo) + data_length); @@ -22,10 +22,11 @@ void push_error(DBHandle *db_handle, info->data_length = data_length; memcpy(info->data, data, data_length); /* Generate a random key to identify this error message. */ - CHECK(sizeof(info->error_key) >= UNIQUE_ID_SIZE); - UniqueID error_key = globally_unique_id(); - memcpy(info->error_key, error_key.id, sizeof(info->error_key)); + CHECK(sizeof(info->error_key) >= sizeof(UniqueID)); + UniqueID error_key = UniqueID::from_random(); + memcpy(info->error_key, error_key.data(), sizeof(info->error_key)); - init_table_callback(db_handle, NIL_ID, __func__, new CommonCallbackData(info), - NULL, NULL, redis_push_error, NULL); + init_table_callback(db_handle, UniqueID::nil(), __func__, + new CommonCallbackData(info), NULL, NULL, + redis_push_error, NULL); } diff --git a/src/common/state/error_table.h b/src/common/state/error_table.h index 0b297283f..415ffbe4f 100644 --- a/src/common/state/error_table.h +++ b/src/common/state/error_table.h @@ -48,6 +48,6 @@ void push_error(DBHandle *db_handle, DBClientID driver_id, int error_index, size_t data_length, - unsigned char *data); + const unsigned char *data); #endif diff --git a/src/common/state/local_scheduler_table.cc b/src/common/state/local_scheduler_table.cc index 9ae058d36..bacf90253 100644 --- a/src/common/state/local_scheduler_table.cc +++ b/src/common/state/local_scheduler_table.cc @@ -13,7 +13,7 @@ void local_scheduler_table_subscribe( sub_data->subscribe_callback = subscribe_callback; sub_data->subscribe_context = subscribe_context; - init_table_callback(db_handle, NIL_ID, __func__, + init_table_callback(db_handle, UniqueID::nil(), __func__, new CommonCallbackData(sub_data), retry, NULL, redis_local_scheduler_table_subscribe, NULL); } @@ -37,8 +37,9 @@ void local_scheduler_table_send_info(DBHandle *db_handle, data->size = fbb.GetSize(); memcpy(&data->flatbuffer_data[0], fbb.GetBufferPointer(), fbb.GetSize()); - init_table_callback(db_handle, NIL_ID, __func__, new CommonCallbackData(data), - retry, NULL, redis_local_scheduler_table_send_info, NULL); + init_table_callback(db_handle, UniqueID::nil(), __func__, + new CommonCallbackData(data), retry, NULL, + redis_local_scheduler_table_send_info, NULL); } void local_scheduler_table_disconnect(DBHandle *db_handle) { diff --git a/src/common/state/object_table.cc b/src/common/state/object_table.cc index 0c6d55252..13a45b834 100644 --- a/src/common/state/object_table.cc +++ b/src/common/state/object_table.cc @@ -67,7 +67,7 @@ void object_table_subscribe_to_notifications( sub_data->subscribe_all = subscribe_all; init_table_callback( - db_handle, NIL_OBJECT_ID, __func__, new CommonCallbackData(sub_data), + db_handle, ObjectID::nil(), __func__, new CommonCallbackData(sub_data), retry, (table_done_callback) done_callback, redis_object_table_subscribe_to_notifications, user_context); } @@ -85,7 +85,7 @@ void object_table_request_notifications(DBHandle *db_handle, data->num_object_ids = num_object_ids; memcpy(data->object_ids, object_ids, num_object_ids * sizeof(ObjectID)); - init_table_callback(db_handle, NIL_OBJECT_ID, __func__, + init_table_callback(db_handle, ObjectID::nil(), __func__, new CommonCallbackData(data), retry, NULL, redis_object_table_request_notifications, NULL); } @@ -101,7 +101,7 @@ void object_info_subscribe(DBHandle *db_handle, sub_data->subscribe_callback = subscribe_callback; sub_data->subscribe_context = subscribe_context; - init_table_callback(db_handle, NIL_OBJECT_ID, __func__, + init_table_callback(db_handle, ObjectID::nil(), __func__, new CommonCallbackData(sub_data), retry, (table_done_callback) done_callback, redis_object_info_subscribe, user_context); diff --git a/src/common/state/redis.cc b/src/common/state/redis.cc index e9a05888d..9d7e0ef1a 100644 --- a/src/common/state/redis.cc +++ b/src/common/state/redis.cc @@ -207,8 +207,8 @@ void db_connect_shard(const std::string &db_address, argv[0] = "RAY.CONNECT"; argvlen[0] = strlen(argv[0]); /* Set the client ID argument. */ - argv[1] = (char *) client.id; - argvlen[1] = sizeof(client.id); + argv[1] = (char *) client.data(); + argvlen[1] = sizeof(client); /* Set the node IP address argument. */ argv[2] = node_ip_address; argvlen[2] = strlen(node_ip_address); @@ -265,7 +265,7 @@ DBHandle *db_connect(const std::string &db_primary_address, } /* Create a client ID for this client. */ - DBClientID client = globally_unique_id(); + DBClientID client = DBClientID::from_random(); DBHandle *db = new DBHandle(); @@ -325,7 +325,7 @@ void db_disconnect(DBHandle *db) { * reconnect and get assigned a different client ID. */ redisReply *reply = (redisReply *) redisCommand(db->sync_context, "RAY.DISCONNECT %b", - db->client.id, sizeof(db->client.id)); + db->client.data(), sizeof(db->client)); CHECK(reply->type != REDIS_REPLY_ERROR); CHECKM(strcmp(reply->str, "OK") == 0, "reply->str is %s", reply->str); freeReplyObject(reply); @@ -408,8 +408,8 @@ void redis_object_table_add(TableCallbackData *callback_data) { int status = redisAsyncCommand( context, redis_object_table_add_callback, (void *) callback_data->timer_id, "RAY.OBJECT_TABLE_ADD %b %lld %b %b", - obj_id.id, sizeof(obj_id.id), (long long) object_size, digest, - (size_t) DIGEST_SIZE, db->client.id, sizeof(db->client.id)); + obj_id.data(), sizeof(obj_id), (long long) object_size, digest, + (size_t) DIGEST_SIZE, db->client.data(), sizeof(db->client)); if ((status == REDIS_ERR) || context->err) { LOG_REDIS_DEBUG(context, "error in redis_object_table_add"); @@ -456,7 +456,7 @@ void redis_object_table_remove(TableCallbackData *callback_data) { int status = redisAsyncCommand( context, redis_object_table_remove_callback, (void *) callback_data->timer_id, "RAY.OBJECT_TABLE_REMOVE %b %b", - obj_id.id, sizeof(obj_id.id), client_id->id, sizeof(client_id->id)); + obj_id.data(), sizeof(obj_id), client_id->data(), sizeof(*client_id)); if ((status == REDIS_ERR) || context->err) { LOG_REDIS_DEBUG(context, "error in redis_object_table_remove"); @@ -473,8 +473,8 @@ void redis_object_table_lookup(TableCallbackData *callback_data) { int status = redisAsyncCommand(context, redis_object_table_lookup_callback, (void *) callback_data->timer_id, - "RAY.OBJECT_TABLE_LOOKUP %b", obj_id.id, - sizeof(obj_id.id)); + "RAY.OBJECT_TABLE_LOOKUP %b", obj_id.data(), + sizeof(obj_id)); if ((status == REDIS_ERR) || context->err) { LOG_REDIS_DEBUG(context, "error in object_table lookup"); } @@ -508,10 +508,11 @@ void redis_result_table_add(TableCallbackData *callback_data) { redisAsyncContext *context = get_redis_context(db, id); /* Add the result entry to the result table. */ - int status = redisAsyncCommand( - context, redis_result_table_add_callback, - (void *) callback_data->timer_id, "RAY.RESULT_TABLE_ADD %b %b %d", id.id, - sizeof(id.id), info->task_id.id, sizeof(info->task_id.id), is_put); + int status = + redisAsyncCommand(context, redis_result_table_add_callback, + (void *) callback_data->timer_id, + "RAY.RESULT_TABLE_ADD %b %b %d", id.data(), sizeof(id), + info->task_id.data(), sizeof(info->task_id), is_put); if ((status == REDIS_ERR) || context->err) { LOG_REDIS_DEBUG(context, "Error in result table add"); } @@ -554,7 +555,7 @@ void redis_result_table_lookup_callback(redisAsyncContext *c, "Unexpected reply type %d in redis_result_table_lookup_callback", reply->type); /* Parse the task from the reply. */ - TaskID result_id = NIL_TASK_ID; + TaskID result_id = TaskID::nil(); bool is_put = false; if (reply->type == REDIS_REPLY_STRING) { auto message = flatbuffers::GetRoot(reply->str); @@ -581,14 +582,14 @@ void redis_result_table_lookup(TableCallbackData *callback_data) { int status = redisAsyncCommand(context, redis_result_table_lookup_callback, (void *) callback_data->timer_id, - "RAY.RESULT_TABLE_LOOKUP %b", id.id, sizeof(id.id)); + "RAY.RESULT_TABLE_LOOKUP %b", id.data(), sizeof(id)); if ((status == REDIS_ERR) || context->err) { LOG_REDIS_DEBUG(context, "Error in result table lookup"); } } DBClient redis_db_client_table_get(DBHandle *db, - unsigned char *client_id, + const unsigned char *client_id, size_t client_id_len) { redisReply *reply = (redisReply *) redisCommand(db->sync_context, "HGETALL %s%b", @@ -602,7 +603,7 @@ DBClient redis_db_client_table_get(DBHandle *db, const char *key = reply->element[j]->str; const char *value = reply->element[j + 1]->str; if (strcmp(key, "ray_client_id") == 0) { - memcpy(db_client.id.id, value, sizeof(db_client.id)); + memcpy(db_client.id.mutable_data(), value, sizeof(db_client.id)); num_fields++; } else if (strcmp(key, "client_type") == 0) { db_client.client_type = std::string(value); @@ -637,8 +638,8 @@ void redis_cache_set_db_client(DBHandle *db, DBClient client) { DBClient redis_cache_get_db_client(DBHandle *db, DBClientID db_client_id) { auto it = db->db_client_cache.find(db_client_id); if (it == db->db_client_cache.end()) { - DBClient db_client = - redis_db_client_table_get(db, db_client_id.id, sizeof(db_client_id.id)); + DBClient db_client = redis_db_client_table_get(db, db_client_id.data(), + sizeof(db_client_id)); db->db_client_cache[db_client_id] = db_client; it = db->db_client_cache.find(db_client_id); } @@ -672,7 +673,8 @@ void redis_object_table_lookup_callback(redisAsyncContext *c, for (size_t j = 0; j < reply->elements; ++j) { CHECK(reply->element[j]->type == REDIS_REPLY_STRING); DBClientID manager_id; - memcpy(manager_id.id, reply->element[j]->str, sizeof(manager_id.id)); + memcpy(manager_id.mutable_data(), reply->element[j]->str, + sizeof(manager_id)); manager_ids.push_back(manager_id); } @@ -743,7 +745,7 @@ void object_table_redis_subscribe_to_notifications_callback( if (callback_data->done_callback != NULL) { object_table_lookup_done_callback done_callback = (object_table_lookup_done_callback) callback_data->done_callback; - done_callback(NIL_ID, false, std::vector(), + done_callback(ray::UniqueID::nil(), false, std::vector(), callback_data->user_context); } /* If the initial SUBSCRIBE was successful, clean up the timer, but don't @@ -783,7 +785,7 @@ void redis_object_table_subscribe_to_notifications( db->subscribe_contexts[i], object_table_redis_subscribe_to_notifications_callback, (void *) callback_data->timer_id, "SUBSCRIBE %s%b", - object_channel_prefix, db->client.id, sizeof(db->client.id)); + object_channel_prefix, db->client.data(), sizeof(db->client)); } if ((status == REDIS_ERR) || db->subscribe_contexts[i]->err) { @@ -827,11 +829,11 @@ void redis_object_table_request_notifications( argv[0] = "RAY.OBJECT_TABLE_REQUEST_NOTIFICATIONS"; argvlen[0] = strlen(argv[0]); /* Set the client ID argument. */ - argv[1] = (char *) db->client.id; - argvlen[1] = sizeof(db->client.id); + argv[1] = (char *) db->client.data(); + argvlen[1] = sizeof(db->client); /* Set the object ID arguments. */ - argv[2] = (char *) object_ids[i].id; - argvlen[2] = sizeof(object_ids[i].id); + argv[2] = (char *) object_ids[i].data(); + argvlen[2] = sizeof(object_ids[i]); int status = redisAsyncCommandArgv( context, redis_object_table_request_notifications_callback, @@ -881,8 +883,8 @@ void redis_task_table_get_task(TableCallbackData *callback_data) { int status = redisAsyncCommand(context, redis_task_table_get_task_callback, (void *) callback_data->timer_id, - "RAY.TASK_TABLE_GET %b", task_id.id, - sizeof(task_id.id)); + "RAY.TASK_TABLE_GET %b", task_id.data(), + sizeof(task_id)); if ((status == REDIS_ERR) || context->err) { LOG_REDIS_DEBUG(context, "error in redis_task_table_get_task"); } @@ -942,8 +944,8 @@ void redis_task_table_add_task(TableCallbackData *callback_data) { int status = redisAsyncCommand( context, redis_task_table_add_task_callback, (void *) callback_data->timer_id, "RAY.TASK_TABLE_ADD %b %d %b %b %b", - task_id.id, sizeof(task_id.id), state, local_scheduler_id.id, - sizeof(local_scheduler_id.id), fbb.GetBufferPointer(), + task_id.data(), sizeof(task_id), state, local_scheduler_id.data(), + sizeof(local_scheduler_id), fbb.GetBufferPointer(), (size_t) fbb.GetSize(), spec, execution_spec->SpecSize()); if ((status == REDIS_ERR) || context->err) { LOG_REDIS_DEBUG(context, "error in redis_task_table_add_task"); @@ -1004,8 +1006,8 @@ void redis_task_table_update(TableCallbackData *callback_data) { int status = redisAsyncCommand( context, redis_task_table_update_callback, (void *) callback_data->timer_id, "RAY.TASK_TABLE_UPDATE %b %d %b %b", - task_id.id, sizeof(task_id.id), state, local_scheduler_id.id, - sizeof(local_scheduler_id.id), fbb.GetBufferPointer(), + task_id.data(), sizeof(task_id), state, local_scheduler_id.data(), + sizeof(local_scheduler_id), fbb.GetBufferPointer(), (size_t) fbb.GetSize()); if ((status == REDIS_ERR) || context->err) { LOG_REDIS_DEBUG(context, "error in redis_task_table_update"); @@ -1055,24 +1057,24 @@ void redis_task_table_test_and_update(TableCallbackData *callback_data) { int status; /* If the test local scheduler ID is NIL, then ignore it. */ - if (IS_NIL_ID(update_data->test_local_scheduler_id)) { + if (update_data->test_local_scheduler_id.is_nil()) { status = redisAsyncCommand( context, redis_task_table_test_and_update_callback, (void *) callback_data->timer_id, - "RAY.TASK_TABLE_TEST_AND_UPDATE %b %d %d %b", task_id.id, - sizeof(task_id.id), update_data->test_state_bitmask, - update_data->update_state, update_data->local_scheduler_id.id, - sizeof(update_data->local_scheduler_id.id)); + "RAY.TASK_TABLE_TEST_AND_UPDATE %b %d %d %b", task_id.data(), + sizeof(task_id), update_data->test_state_bitmask, + update_data->update_state, update_data->local_scheduler_id.data(), + sizeof(update_data->local_scheduler_id)); } else { status = redisAsyncCommand( context, redis_task_table_test_and_update_callback, (void *) callback_data->timer_id, - "RAY.TASK_TABLE_TEST_AND_UPDATE %b %d %d %b %b", task_id.id, - sizeof(task_id.id), update_data->test_state_bitmask, - update_data->update_state, update_data->local_scheduler_id.id, - sizeof(update_data->local_scheduler_id.id), - update_data->test_local_scheduler_id.id, - sizeof(update_data->test_local_scheduler_id.id)); + "RAY.TASK_TABLE_TEST_AND_UPDATE %b %d %d %b %b", task_id.data(), + sizeof(task_id), update_data->test_state_bitmask, + update_data->update_state, update_data->local_scheduler_id.data(), + sizeof(update_data->local_scheduler_id), + update_data->test_local_scheduler_id.data(), + sizeof(update_data->test_local_scheduler_id)); } if ((status == REDIS_ERR) || context->err) { @@ -1152,7 +1154,7 @@ void redis_task_table_subscribe(TableCallbackData *callback_data) { const char *TASK_CHANNEL_PREFIX = "TT:"; for (auto subscribe_context : db->subscribe_contexts) { int status; - if (IS_NIL_ID(data->local_scheduler_id)) { + if (data->local_scheduler_id.is_nil()) { /* TODO(swang): Implement the state_filter by translating the bitmask into * a Redis key-matching pattern. */ status = redisAsyncCommand( @@ -1164,8 +1166,8 @@ void redis_task_table_subscribe(TableCallbackData *callback_data) { status = redisAsyncCommand( subscribe_context, redis_task_table_subscribe_callback, (void *) callback_data->timer_id, "SUBSCRIBE %s%b:%d", - TASK_CHANNEL_PREFIX, (char *) local_scheduler_id.id, - sizeof(local_scheduler_id.id), data->state_filter); + TASK_CHANNEL_PREFIX, (char *) local_scheduler_id.data(), + sizeof(local_scheduler_id), data->state_filter); } if ((status == REDIS_ERR) || subscribe_context->err) { LOG_REDIS_DEBUG(subscribe_context, "error in redis_task_table_subscribe"); @@ -1201,7 +1203,7 @@ void redis_db_client_table_remove(TableCallbackData *callback_data) { int status = redisAsyncCommand(db->context, redis_db_client_table_remove_callback, (void *) callback_data->timer_id, "RAY.DISCONNECT %b", - callback_data->id.id, sizeof(callback_data->id.id)); + callback_data->id.data(), sizeof(callback_data->id)); if ((status == REDIS_ERR) || db->context->err) { LOG_REDIS_DEBUG(db->context, "error in db_client_table_remove"); } @@ -1512,7 +1514,7 @@ void redis_plasma_manager_send_heartbeat(TableCallbackData *callback_data) { * memory for callback data each time. */ int status = redisAsyncCommand( db->context, NULL, (void *) callback_data->timer_id, - "PUBLISH plasma_managers %b", db->client.id, sizeof(db->client.id)); + "PUBLISH plasma_managers %b", db->client.data(), sizeof(db->client)); if ((status == REDIS_ERR) || db->context->err) { LOG_REDIS_DEBUG(db->context, "error in redis_plasma_manager_send_heartbeat"); @@ -1598,7 +1600,7 @@ void redis_actor_notification_table_subscribe( void redis_actor_table_mark_removed(DBHandle *db, ActorID actor_id) { int status = redisAsyncCommand(db->context, NULL, NULL, "HSET Actor:%b removed \"1\"", - actor_id.id, sizeof(actor_id.id)); + actor_id.data(), sizeof(actor_id)); if ((status == REDIS_ERR) || db->subscribe_context->err) { LOG_REDIS_DEBUG(db->context, "error in redis_actor_table_mark_removed"); } @@ -1633,7 +1635,7 @@ void redis_object_info_subscribe_callback(redisAsyncContext *c, ObjectInfoSubscribeData *data = (ObjectInfoSubscribeData *) callback_data->data->Get(); ObjectID object_id; - memcpy(object_id.id, payload->str, sizeof(object_id.id)); + memcpy(object_id.mutable_data(), payload->str, sizeof(object_id)); /* payload->str should have the format: "ObjectID:object_size_int" */ LOG_DEBUG("obj:info channel received message <%s>", payload->str); if (data->subscribe_callback) { @@ -1676,11 +1678,11 @@ void redis_push_error_hmset_callback(redisAsyncContext *c, /* Add the error to this driver's list of errors. */ ErrorInfo *info = (ErrorInfo *) callback_data->data->Get(); - int status = redisAsyncCommand(db->context, redis_push_error_rpush_callback, - (void *) callback_data->timer_id, - "RPUSH ErrorKeys Error:%b:%b", - info->driver_id.id, sizeof(info->driver_id.id), - info->error_key, sizeof(info->error_key)); + int status = redisAsyncCommand( + db->context, redis_push_error_rpush_callback, + (void *) callback_data->timer_id, "RPUSH ErrorKeys Error:%b:%b", + info->driver_id.data(), sizeof(info->driver_id), info->error_key, + sizeof(info->error_key)); if ((status == REDIS_ERR) || db->subscribe_context->err) { LOG_REDIS_DEBUG(db->subscribe_context, "error in redis_push_error rpush"); } @@ -1698,8 +1700,8 @@ void redis_push_error(TableCallbackData *callback_data) { int status = redisAsyncCommand( db->context, redis_push_error_hmset_callback, (void *) callback_data->timer_id, - "HMSET Error:%b:%b type %s message %s data %b", info->driver_id.id, - sizeof(info->driver_id.id), info->error_key, sizeof(info->error_key), + "HMSET Error:%b:%b type %s message %s data %b", info->driver_id.data(), + sizeof(info->driver_id), info->error_key, sizeof(info->error_key), error_type, error_message, info->data, info->data_length); if ((status == REDIS_ERR) || db->subscribe_context->err) { LOG_REDIS_DEBUG(db->subscribe_context, "error in redis_push_error hmset"); diff --git a/src/common/task.cc b/src/common/task.cc index 235a71b14..0cdd0f095 100644 --- a/src/common/task.cc +++ b/src/common/task.cc @@ -145,23 +145,23 @@ void free_task_builder(TaskBuilder *builder) { } bool TaskID_equal(TaskID first_id, TaskID second_id) { - return UNIQUE_ID_EQ(first_id, second_id); + return first_id == second_id; } bool TaskID_is_nil(TaskID id) { - return TaskID_equal(id, NIL_TASK_ID); + return id.is_nil(); } bool ActorID_equal(ActorID first_id, ActorID second_id) { - return UNIQUE_ID_EQ(first_id, second_id); + return first_id == second_id; } bool FunctionID_equal(FunctionID first_id, FunctionID second_id) { - return UNIQUE_ID_EQ(first_id, second_id); + return first_id == second_id; } bool FunctionID_is_nil(FunctionID id) { - return FunctionID_equal(id, NIL_FUNCTION_ID); + return id.is_nil(); } /* Functions for building tasks. */ @@ -181,8 +181,8 @@ void TaskSpec_start_construct(TaskBuilder *builder, function_id, num_returns); } -uint8_t *TaskSpec_finish_construct(TaskBuilder *builder, int64_t *size) { - return builder->Finish(size); +TaskSpec *TaskSpec_finish_construct(TaskBuilder *builder, int64_t *size) { + return reinterpret_cast(builder->Finish(size)); } void TaskSpec_args_add_ref(TaskBuilder *builder, @@ -205,7 +205,7 @@ void TaskSpec_set_required_resource(TaskBuilder *builder, /* Functions for reading tasks. */ -TaskID TaskSpec_task_id(TaskSpec *spec) { +TaskID TaskSpec_task_id(const TaskSpec *spec) { CHECK(spec); auto message = flatbuffers::GetRoot(spec); return from_flatbuf(*message->task_id()); @@ -230,7 +230,7 @@ ActorID TaskSpec_actor_handle_id(TaskSpec *spec) { } bool TaskSpec_is_actor_task(TaskSpec *spec) { - return !ActorID_equal(TaskSpec_actor_id(spec), NIL_ACTOR_ID); + return !TaskSpec_actor_id(spec).is_nil(); } int64_t TaskSpec_actor_counter(TaskSpec *spec) { @@ -258,13 +258,13 @@ bool TaskSpec_arg_is_actor_dummy_object(TaskSpec *spec, int64_t arg_index) { } } -UniqueID TaskSpec_driver_id(TaskSpec *spec) { +UniqueID TaskSpec_driver_id(const TaskSpec *spec) { CHECK(spec); auto message = flatbuffers::GetRoot(spec); return from_flatbuf(*message->driver_id()); } -TaskID TaskSpec_parent_task_id(TaskSpec *spec) { +TaskID TaskSpec_parent_task_id(const TaskSpec *spec) { CHECK(spec); auto message = flatbuffers::GetRoot(spec); return from_flatbuf(*message->parent_task_id()); @@ -451,14 +451,14 @@ bool TaskExecutionSpec::DependsOn(ObjectID object_id) { int count = TaskSpec_arg_id_count(spec, i); for (int j = 0; j < count; j++) { ObjectID arg_id = TaskSpec_arg_id(spec, i, j); - if (ObjectID_equal(arg_id, object_id)) { + if (arg_id == object_id) { return true; } } } // Iterate through the execution dependencies to see if it contains object_id. for (auto dependency_id : execution_dependencies_) { - if (ObjectID_equal(dependency_id, object_id)) { + if (dependency_id == object_id) { return true; } } diff --git a/src/common/task.h b/src/common/task.h index 1773a22f5..d52abf139 100644 --- a/src/common/task.h +++ b/src/common/task.h @@ -11,7 +11,9 @@ #include "format/common_generated.h" -typedef uint8_t TaskSpec; +using namespace ray; + +typedef char TaskSpec; class TaskExecutionSpec { public: @@ -82,10 +84,6 @@ class TaskExecutionSpec { class TaskBuilder; -#define NIL_TASK_ID NIL_ID -#define NIL_ACTOR_ID NIL_ID -#define NIL_FUNCTION_ID NIL_ID - typedef UniqueID FunctionID; /** The task ID is a deterministic hash of the function ID that the task @@ -189,7 +187,7 @@ void TaskSpec_start_construct(TaskBuilder *B, * @param spec The task spec whose ID and return object IDs should be computed. * @return Void. */ -uint8_t *TaskSpec_finish_construct(TaskBuilder *builder, int64_t *size); +TaskSpec *TaskSpec_finish_construct(TaskBuilder *builder, int64_t *size); /** * Return the function ID of the task. @@ -256,7 +254,7 @@ bool TaskSpec_arg_is_actor_dummy_object(TaskSpec *spec, int64_t arg_index); * @param spec The task_spec in question. * @return The driver ID of the task. */ -UniqueID TaskSpec_driver_id(TaskSpec *spec); +UniqueID TaskSpec_driver_id(const TaskSpec *spec); /** * Return the task ID of the parent task. @@ -264,7 +262,7 @@ UniqueID TaskSpec_driver_id(TaskSpec *spec); * @param spec The task_spec in question. * @return The task ID of the parent task. */ -TaskID TaskSpec_parent_task_id(TaskSpec *spec); +TaskID TaskSpec_parent_task_id(const TaskSpec *spec); /** * Return the task counter of the parent task. For example, this equals 5 if @@ -281,7 +279,7 @@ int64_t TaskSpec_parent_counter(TaskSpec *spec); * @param spec The task_spec in question. * @return The task ID of the task. */ -TaskID TaskSpec_task_id(TaskSpec *spec); +TaskID TaskSpec_task_id(const TaskSpec *spec); /** * Get the number of arguments to this task. diff --git a/src/common/test/common_tests.cc b/src/common/test/common_tests.cc deleted file mode 100644 index 8acdc335e..000000000 --- a/src/common/test/common_tests.cc +++ /dev/null @@ -1,24 +0,0 @@ -#include "greatest.h" - -#include "common.h" - -SUITE(common_tests); - -TEST sha1_test(void) { - static char hex[ID_STRING_SIZE]; - UniqueID uid = globally_unique_id(); - ObjectID_to_string((ObjectID) uid, &hex[0], ID_STRING_SIZE); - PASS(); -} - -SUITE(common_tests) { - RUN_TEST(sha1_test); -} - -GREATEST_MAIN_DEFS(); - -int main(int argc, char **argv) { - GREATEST_MAIN_BEGIN(); - RUN_SUITE(common_tests); - GREATEST_MAIN_END(); -} diff --git a/src/common/test/db_tests.cc b/src/common/test/db_tests.cc index 5ed016404..1df7a37a4 100644 --- a/src/common/test/db_tests.cc +++ b/src/common/test/db_tests.cc @@ -81,7 +81,7 @@ TEST object_table_lookup_test(void) { manager_addr, db_connect_args2); db_attach(db1, loop, false); db_attach(db2, loop, false); - UniqueID id = globally_unique_id(); + UniqueID id = UniqueID::from_random(); RetryInfo retry = { .num_retries = NUM_RETRIES, .timeout = TIMEOUT, @@ -149,7 +149,7 @@ TEST task_table_test(void) { DBHandle *db = db_connect(std::string("127.0.0.1"), 6379, "local_scheduler", "127.0.0.1", std::vector()); db_attach(db, loop, false); - DBClientID local_scheduler_id = globally_unique_id(); + DBClientID local_scheduler_id = DBClientID::from_random(); TaskExecutionSpec spec = example_task_execution_spec(1, 1); task_table_test_task = Task_alloc(spec, TASK_STATUS_SCHEDULED, local_scheduler_id); @@ -185,12 +185,14 @@ TEST task_table_all_test(void) { db_attach(db, loop, false); TaskExecutionSpec spec = example_task_execution_spec(1, 1); /* Schedule two tasks on different local local schedulers. */ - Task *task1 = Task_alloc(spec, TASK_STATUS_SCHEDULED, globally_unique_id()); - Task *task2 = Task_alloc(spec, TASK_STATUS_SCHEDULED, globally_unique_id()); + Task *task1 = + Task_alloc(spec, TASK_STATUS_SCHEDULED, DBClientID::from_random()); + Task *task2 = + Task_alloc(spec, TASK_STATUS_SCHEDULED, DBClientID::from_random()); RetryInfo retry = { .num_retries = NUM_RETRIES, .timeout = TIMEOUT, .fail_callback = NULL, }; - task_table_subscribe(db, NIL_ID, TASK_STATUS_SCHEDULED, + task_table_subscribe(db, UniqueID::nil(), TASK_STATUS_SCHEDULED, task_table_all_test_callback, NULL, &retry, NULL, NULL); event_loop_add_timer(loop, 50, (event_loop_timer_handler) timeout_handler, NULL); @@ -221,7 +223,7 @@ TEST unique_client_id_test(void) { } for (int i = 0; i < num_conns; ++i) { for (int j = 0; j < i; ++j) { - ASSERT(!DBClientID_equal(ids[i], ids[j])); + ASSERT(!(ids[i] == ids[j])); } } PASS(); diff --git a/src/common/test/example_task.h b/src/common/test/example_task.h index 8d44cd205..194766b8b 100644 --- a/src/common/test/example_task.h +++ b/src/common/test/example_task.h @@ -11,15 +11,15 @@ static inline TaskExecutionSpec example_task_execution_spec_with_args( int64_t num_args, int64_t num_returns, ObjectID arg_ids[]) { - TaskID parent_task_id = globally_unique_id(); - FunctionID func_id = globally_unique_id(); - TaskSpec_start_construct(g_task_builder, NIL_ID, parent_task_id, 0, - NIL_ACTOR_ID, NIL_ACTOR_ID, 0, false, func_id, + TaskID parent_task_id = TaskID::from_random(); + FunctionID func_id = FunctionID::from_random(); + TaskSpec_start_construct(g_task_builder, UniqueID::nil(), parent_task_id, 0, + ActorID::nil(), ActorID::nil(), 0, false, func_id, num_returns); for (int64_t i = 0; i < num_args; ++i) { ObjectID arg_id; if (arg_ids == NULL) { - arg_id = globally_unique_id(); + arg_id = ObjectID::from_random(); } else { arg_id = arg_ids[i]; } @@ -46,7 +46,7 @@ static inline Task *example_task_with_args(int64_t num_args, ObjectID arg_ids[]) { TaskExecutionSpec spec = example_task_execution_spec_with_args(num_args, num_returns, arg_ids); - Task *instance = Task_alloc(spec, task_state, NIL_ID); + Task *instance = Task_alloc(spec, task_state, UniqueID::nil()); return instance; } @@ -54,7 +54,7 @@ static inline Task *example_task(int64_t num_args, int64_t num_returns, int task_state) { TaskExecutionSpec spec = example_task_execution_spec(num_args, num_returns); - Task *instance = Task_alloc(spec, task_state, NIL_ID); + Task *instance = Task_alloc(spec, task_state, UniqueID::nil()); return instance; } @@ -62,7 +62,7 @@ static inline bool Task_equals(Task *task1, Task *task2) { if (task1->state != task2->state) { return false; } - if (!DBClientID_equal(task1->local_scheduler_id, task2->local_scheduler_id)) { + if (!(task1->local_scheduler_id == task2->local_scheduler_id)) { return false; } auto execution_spec1 = Task_task_execution_spec(task1); diff --git a/src/common/test/object_table_tests.cc b/src/common/test/object_table_tests.cc index 584ba9475..4999c0037 100644 --- a/src/common/test/object_table_tests.cc +++ b/src/common/test/object_table_tests.cc @@ -38,13 +38,13 @@ void new_object_done_callback(ObjectID object_id, bool is_put, void *user_context) { new_object_succeeded = 1; - CHECK(ObjectID_equal(object_id, new_object_id)); - CHECK(TaskID_equal(task_id, new_object_task_id)); + CHECK(object_id == new_object_id); + CHECK(task_id == new_object_task_id); event_loop_stop(g_loop); } void new_object_lookup_callback(ObjectID object_id, void *user_context) { - CHECK(ObjectID_equal(object_id, new_object_id)); + CHECK(object_id == new_object_id); RetryInfo retry = { .num_retries = 5, .timeout = 100, @@ -78,7 +78,7 @@ void task_table_subscribe_done(TaskID task_id, void *user_context) { TEST new_object_test(void) { new_object_failed = 0; new_object_succeeded = 0; - new_object_id = globally_unique_id(); + new_object_id = ObjectID::from_random(); new_object_task = example_task(1, 1, TASK_STATUS_WAITING); new_object_task_spec = Task_task_execution_spec(new_object_task)->Spec(); new_object_task_id = TaskSpec_task_id(new_object_task_spec); @@ -91,8 +91,8 @@ TEST new_object_test(void) { .timeout = 100, .fail_callback = new_object_fail_callback, }; - task_table_subscribe(db, NIL_ID, TASK_STATUS_WAITING, NULL, NULL, &retry, - task_table_subscribe_done, db); + task_table_subscribe(db, UniqueID::nil(), TASK_STATUS_WAITING, NULL, NULL, + &retry, task_table_subscribe_done, db); event_loop_run(g_loop); db_disconnect(db); destroy_outstanding_callbacks(g_loop); @@ -109,15 +109,15 @@ void new_object_no_task_callback(ObjectID object_id, bool is_put, void *user_context) { new_object_succeeded = 1; - CHECK(IS_NIL_ID(task_id)); + CHECK(task_id.is_nil()); event_loop_stop(g_loop); } TEST new_object_no_task_test(void) { new_object_failed = 0; new_object_succeeded = 0; - new_object_id = globally_unique_id(); - new_object_task_id = globally_unique_id(); + new_object_id = ObjectID::from_random(); + new_object_task_id = TaskID::from_random(); g_loop = event_loop_create(); DBHandle *db = db_connect(std::string("127.0.0.1"), 6379, "plasma_manager", "127.0.0.1", std::vector()); @@ -167,7 +167,7 @@ TEST lookup_timeout_test(void) { RetryInfo retry = { .num_retries = 5, .timeout = 100, .fail_callback = lookup_fail_callback, }; - object_table_lookup(db, NIL_ID, &retry, lookup_done_callback, + object_table_lookup(db, UniqueID::nil(), &retry, lookup_done_callback, (void *) lookup_timeout_context); /* Disconnect the database to see if the lookup times out. */ close(db->context->c.fd); @@ -206,7 +206,7 @@ TEST add_timeout_test(void) { RetryInfo retry = { .num_retries = 5, .timeout = 100, .fail_callback = add_fail_callback, }; - object_table_add(db, NIL_ID, 0, (unsigned char *) NIL_DIGEST, &retry, + object_table_add(db, UniqueID::nil(), 0, (unsigned char *) NIL_DIGEST, &retry, add_done_callback, (void *) add_timeout_context); /* Disconnect the database to see if the lookup times out. */ close(db->context->c.fd); @@ -327,7 +327,7 @@ void add_lookup_callback(ObjectID object_id, bool success, void *user_context) { .timeout = 100, .fail_callback = lookup_retry_fail_callback, }; - object_table_lookup(db, NIL_ID, &retry, add_lookup_done_callback, + object_table_lookup(db, UniqueID::nil(), &retry, add_lookup_done_callback, (void *) db); } @@ -346,7 +346,7 @@ TEST add_lookup_test(void) { .timeout = 100, .fail_callback = lookup_retry_fail_callback, }; - object_table_add(db, NIL_ID, 0, (unsigned char *) NIL_DIGEST, &retry, + object_table_add(db, UniqueID::nil(), 0, (unsigned char *) NIL_DIGEST, &retry, add_lookup_callback, (void *) db); /* Install handler for terminating the event loop. */ event_loop_add_timer(g_loop, 750, @@ -381,7 +381,8 @@ void add_remove_lookup_callback(ObjectID object_id, .timeout = 100, .fail_callback = lookup_retry_fail_callback, }; - object_table_lookup(db, NIL_ID, &retry, add_remove_lookup_done_callback, + object_table_lookup(db, UniqueID::nil(), &retry, + add_remove_lookup_done_callback, (void *) lookup_retry_context); } @@ -393,8 +394,8 @@ void add_remove_callback(ObjectID object_id, bool success, void *user_context) { .timeout = 100, .fail_callback = lookup_retry_fail_callback, }; - object_table_remove(db, NIL_ID, NULL, &retry, add_remove_lookup_callback, - (void *) db); + object_table_remove(db, UniqueID::nil(), NULL, &retry, + add_remove_lookup_callback, (void *) db); } TEST add_remove_lookup_test(void) { @@ -408,7 +409,7 @@ TEST add_remove_lookup_test(void) { .timeout = 100, .fail_callback = lookup_retry_fail_callback, }; - object_table_add(db, NIL_ID, 0, (unsigned char *) NIL_DIGEST, &retry, + object_table_add(db, UniqueID::nil(), 0, (unsigned char *) NIL_DIGEST, &retry, add_remove_callback, (void *) db); /* Install handler for terminating the event loop. */ event_loop_add_timer(g_loop, 750, @@ -454,7 +455,7 @@ TEST lookup_late_test(void) { .timeout = 0, .fail_callback = lookup_late_fail_callback, }; - object_table_lookup(db, NIL_ID, &retry, lookup_late_done_callback, + object_table_lookup(db, UniqueID::nil(), &retry, lookup_late_done_callback, (void *) lookup_late_context); /* Install handler for terminating the event loop. */ event_loop_add_timer(g_loop, 750, @@ -496,7 +497,7 @@ TEST add_late_test(void) { RetryInfo retry = { .num_retries = 0, .timeout = 0, .fail_callback = add_late_fail_callback, }; - object_table_add(db, NIL_ID, 0, (unsigned char *) NIL_DIGEST, &retry, + object_table_add(db, UniqueID::nil(), 0, (unsigned char *) NIL_DIGEST, &retry, add_late_done_callback, (void *) add_late_context); /* Install handler for terminating the event loop. */ event_loop_add_timer(g_loop, 750, @@ -594,7 +595,7 @@ void subscribe_success_object_available_callback( const std::vector &manager_vector, void *user_context) { CHECK(user_context == (void *) subscribe_success_context); - CHECK(ObjectID_equal(object_id, subscribe_id)); + CHECK(object_id == subscribe_id); CHECK(manager_vector.size() == 1); subscribe_success_succeeded = 1; } @@ -609,7 +610,7 @@ TEST subscribe_success_test(void) { DBHandle *db = db_connect(std::string("127.0.0.1"), 6379, "plasma_manager", "127.0.0.1", db_connect_args); db_attach(db, g_loop, false); - subscribe_id = globally_unique_id(); + subscribe_id = ObjectID::from_random(); RetryInfo retry = { .num_retries = 0, @@ -679,7 +680,7 @@ TEST subscribe_object_present_test(void) { DBHandle *db = db_connect(std::string("127.0.0.1"), 6379, "plasma_manager", "127.0.0.1", db_connect_args); db_attach(db, g_loop, false); - UniqueID id = globally_unique_id(); + UniqueID id = UniqueID::from_random(); RetryInfo retry = { .num_retries = 0, .timeout = 100, .fail_callback = fatal_fail_callback, }; @@ -730,7 +731,7 @@ TEST subscribe_object_not_present_test(void) { DBHandle *db = db_connect(std::string("127.0.0.1"), 6379, "plasma_manager", "127.0.0.1", std::vector()); db_attach(db, g_loop, false); - UniqueID id = globally_unique_id(); + UniqueID id = UniqueID::from_random(); RetryInfo retry = { .num_retries = 0, .timeout = 100, .fail_callback = NULL, }; @@ -795,7 +796,7 @@ TEST subscribe_object_available_later_test(void) { DBHandle *db = db_connect(std::string("127.0.0.1"), 6379, "plasma_manager", "127.0.0.1", db_connect_args); db_attach(db, g_loop, false); - UniqueID id = globally_unique_id(); + UniqueID id = UniqueID::from_random(); RetryInfo retry = { .num_retries = 0, .timeout = 100, .fail_callback = NULL, }; @@ -850,7 +851,7 @@ TEST subscribe_object_available_subscribe_all(void) { DBHandle *db = db_connect(std::string("127.0.0.1"), 6379, "plasma_manager", "127.0.0.1", db_connect_args); db_attach(db, g_loop, false); - UniqueID id = globally_unique_id(); + UniqueID id = UniqueID::from_random(); RetryInfo retry = { .num_retries = 0, .timeout = 100, .fail_callback = NULL, }; diff --git a/src/common/test/redis_tests.cc b/src/common/test/redis_tests.cc index 46fead5bb..9acd82981 100644 --- a/src/common/test/redis_tests.cc +++ b/src/common/test/redis_tests.cc @@ -167,7 +167,7 @@ void logging_read_callback(event_loop *loop, DBHandle *conn = (DBHandle *) context; char *cmd = read_log_message(fd); redisAsyncCommand(conn->context, logging_test_callback, NULL, cmd, - (char *) conn->client.id, sizeof(conn->client.id)); + (char *) conn->client.data(), sizeof(conn->client)); free(cmd); } @@ -200,8 +200,8 @@ TEST logging_test(void) { int client_fd = connect_ipc_sock(socket_pathname); ASSERT(client_fd >= 0); connections.push_back(client_fd); - RayLogger *logger = RayLogger_init("worker", RAY_INFO, 0, &client_fd); - RayLogger_log(logger, RAY_INFO, "TEST", "Message"); + RayLogger *logger = RayLogger_init("worker", RAY_LOG_INFO, 0, &client_fd); + RayLogger_log(logger, RAY_LOG_INFO, "TEST", "Message"); event_loop_add_file(loop, socket_fd, EVENT_LOOP_READ, logging_accept_callback, conn); diff --git a/src/common/test/run_tests.sh b/src/common/test/run_tests.sh index cc5c73eb2..036d7264d 100644 --- a/src/common/test/run_tests.sh +++ b/src/common/test/run_tests.sh @@ -13,7 +13,6 @@ sleep 1s ./src/common/thirdparty/redis/src/redis-cli set NumRedisShards 1 ./src/common/thirdparty/redis/src/redis-cli rpush RedisShards 127.0.0.1:6380 -./src/common/common_tests ./src/common/db_tests ./src/common/io_tests ./src/common/task_tests diff --git a/src/common/test/run_valgrind.sh b/src/common/test/run_valgrind.sh index 9424d9a8e..db906b008 100644 --- a/src/common/test/run_valgrind.sh +++ b/src/common/test/run_valgrind.sh @@ -15,13 +15,12 @@ sleep 1s ./src/common/thirdparty/redis/src/redis-cli set NumRedisShards 1 ./src/common/thirdparty/redis/src/redis-cli rpush RedisShards 127.0.0.1:6380 -valgrind --leak-check=full --error-exitcode=1 ./src/common/common_tests -valgrind --leak-check=full --error-exitcode=1 ./src/common/db_tests -valgrind --leak-check=full --error-exitcode=1 ./src/common/io_tests -valgrind --leak-check=full --error-exitcode=1 ./src/common/task_tests -valgrind --leak-check=full --error-exitcode=1 ./src/common/redis_tests -valgrind --leak-check=full --error-exitcode=1 ./src/common/task_table_tests -valgrind --leak-check=full --error-exitcode=1 ./src/common/object_table_tests +valgrind --track-origins=yes --leak-check=full --show-leak-kinds=all --leak-check-heuristics=stdstring --error-exitcode=1 ./src/common/db_tests +valgrind --track-origins=yes --leak-check=full --show-leak-kinds=all --leak-check-heuristics=stdstring --error-exitcode=1 ./src/common/io_tests +valgrind --track-origins=yes --leak-check=full --show-leak-kinds=all --leak-check-heuristics=stdstring --error-exitcode=1 ./src/common/task_tests +valgrind --track-origins=yes --leak-check=full --show-leak-kinds=all --leak-check-heuristics=stdstring --error-exitcode=1 ./src/common/redis_tests +valgrind --track-origins=yes --leak-check=full --show-leak-kinds=all --leak-check-heuristics=stdstring --error-exitcode=1 ./src/common/task_table_tests +valgrind --track-origins=yes --leak-check=full --show-leak-kinds=all --leak-check-heuristics=stdstring --error-exitcode=1 ./src/common/object_table_tests ./src/common/thirdparty/redis/src/redis-cli shutdown ./src/common/thirdparty/redis/src/redis-cli -p 6380 shutdown diff --git a/src/common/test/task_table_tests.cc b/src/common/test/task_table_tests.cc index 1b9eccd37..e9adf466f 100644 --- a/src/common/test/task_table_tests.cc +++ b/src/common/test/task_table_tests.cc @@ -38,7 +38,7 @@ void lookup_nil_success_callback(Task *task, void *context) { } TEST lookup_nil_test(void) { - lookup_nil_id = globally_unique_id(); + lookup_nil_id = TaskID::from_random(); g_loop = event_loop_create(); DBHandle *db = db_connect(std::string("127.0.0.1"), 6379, "plasma_manager", "127.0.0.1", std::vector()); @@ -116,8 +116,8 @@ TEST add_lookup_test(void) { .fail_callback = add_lookup_fail_callback, }; /* Wait for subscription to succeed before adding the task. */ - task_table_subscribe(db, NIL_ID, TASK_STATUS_WAITING, NULL, NULL, &retry, - subscribe_success_callback, (void *) db); + task_table_subscribe(db, UniqueID::nil(), TASK_STATUS_WAITING, NULL, NULL, + &retry, subscribe_success_callback, (void *) db); /* Disconnect the database to see if the lookup times out. */ event_loop_run(g_loop); db_disconnect(db); @@ -156,8 +156,8 @@ TEST subscribe_timeout_test(void) { .timeout = 100, .fail_callback = subscribe_fail_callback, }; - task_table_subscribe(db, NIL_ID, TASK_STATUS_WAITING, NULL, NULL, &retry, - subscribe_done_callback, + task_table_subscribe(db, UniqueID::nil(), TASK_STATUS_WAITING, NULL, NULL, + &retry, subscribe_done_callback, (void *) subscribe_timeout_context); /* Disconnect the database to see if the subscribe times out. */ close(db->subscribe_context->c.fd); @@ -198,8 +198,8 @@ TEST publish_timeout_test(void) { RetryInfo retry = { .num_retries = 5, .timeout = 100, .fail_callback = publish_fail_callback, }; - task_table_subscribe(db, NIL_ID, TASK_STATUS_WAITING, NULL, NULL, &retry, - NULL, NULL); + task_table_subscribe(db, UniqueID::nil(), TASK_STATUS_WAITING, NULL, NULL, + &retry, NULL, NULL); task_table_add_task(db, task, &retry, publish_done_callback, (void *) publish_timeout_context); /* Disconnect the database to see if the publish times out. */ @@ -270,8 +270,8 @@ TEST subscribe_retry_test(void) { .timeout = 100, .fail_callback = subscribe_retry_fail_callback, }; - task_table_subscribe(db, NIL_ID, TASK_STATUS_WAITING, NULL, NULL, &retry, - subscribe_retry_done_callback, + task_table_subscribe(db, UniqueID::nil(), TASK_STATUS_WAITING, NULL, NULL, + &retry, subscribe_retry_done_callback, (void *) subscribe_retry_context); /* Disconnect the database to see if the subscribe times out. */ close(db->subscribe_context->c.fd); @@ -321,8 +321,8 @@ TEST publish_retry_test(void) { .timeout = 100, .fail_callback = publish_retry_fail_callback, }; - task_table_subscribe(db, NIL_ID, TASK_STATUS_WAITING, NULL, NULL, &retry, - NULL, NULL); + task_table_subscribe(db, UniqueID::nil(), TASK_STATUS_WAITING, NULL, NULL, + &retry, NULL, NULL); task_table_add_task(db, task, &retry, publish_retry_done_callback, (void *) publish_retry_context); /* Disconnect the database to see if the publish times out. */ @@ -374,8 +374,8 @@ TEST subscribe_late_test(void) { .timeout = 0, .fail_callback = subscribe_late_fail_callback, }; - task_table_subscribe(db, NIL_ID, TASK_STATUS_WAITING, NULL, NULL, &retry, - subscribe_late_done_callback, + task_table_subscribe(db, UniqueID::nil(), TASK_STATUS_WAITING, NULL, NULL, + &retry, subscribe_late_done_callback, (void *) subscribe_late_context); /* Install handler for terminating the event loop. */ event_loop_add_timer(g_loop, 750, @@ -420,8 +420,8 @@ TEST publish_late_test(void) { .timeout = 0, .fail_callback = publish_late_fail_callback, }; - task_table_subscribe(db, NIL_ID, TASK_STATUS_WAITING, NULL, NULL, NULL, NULL, - NULL); + task_table_subscribe(db, UniqueID::nil(), TASK_STATUS_WAITING, NULL, NULL, + NULL, NULL, NULL); task_table_add_task(db, task, &retry, publish_late_done_callback, (void *) publish_late_context); /* Install handler for terminating the event loop. */ diff --git a/src/common/test/task_tests.cc b/src/common/test/task_tests.cc index 303ec6392..a18c67bc9 100644 --- a/src/common/test/task_tests.cc +++ b/src/common/test/task_tests.cc @@ -12,31 +12,32 @@ SUITE(task_tests); TEST task_test(void) { - TaskID parent_task_id = globally_unique_id(); - FunctionID func_id = globally_unique_id(); + TaskID parent_task_id = TaskID::from_random(); + FunctionID func_id = FunctionID::from_random(); TaskBuilder *builder = make_task_builder(); - TaskSpec_start_construct(builder, NIL_ID, parent_task_id, 0, NIL_ACTOR_ID, - NIL_ACTOR_ID, 0, false, func_id, 2); + TaskSpec_start_construct(builder, DriverID::nil(), parent_task_id, 0, + ActorID::nil(), ActorID::nil(), 0, false, func_id, + 2); - UniqueID arg1 = globally_unique_id(); + UniqueID arg1 = UniqueID::from_random(); TaskSpec_args_add_ref(builder, &arg1, 1); TaskSpec_args_add_val(builder, (uint8_t *) "hello", 5); - UniqueID arg2 = globally_unique_id(); + UniqueID arg2 = UniqueID::from_random(); TaskSpec_args_add_ref(builder, &arg2, 1); TaskSpec_args_add_val(builder, (uint8_t *) "world", 5); /* Finish constructing the spec. This constructs the task ID and the * return IDs. */ int64_t size; - uint8_t *spec = TaskSpec_finish_construct(builder, &size); + TaskSpec *spec = TaskSpec_finish_construct(builder, &size); /* Check that the spec was constructed as expected. */ ASSERT(TaskSpec_num_args(spec) == 4); ASSERT(TaskSpec_num_returns(spec) == 2); ASSERT(FunctionID_equal(TaskSpec_function(spec), func_id)); - ASSERT(ObjectID_equal(TaskSpec_arg_id(spec, 0, 0), arg1)); + ASSERT(TaskSpec_arg_id(spec, 0, 0) == arg1); ASSERT(memcmp(TaskSpec_arg_val(spec, 1), (uint8_t *) "hello", TaskSpec_arg_length(spec, 1)) == 0); - ASSERT(ObjectID_equal(TaskSpec_arg_id(spec, 2, 0), arg2)); + ASSERT(TaskSpec_arg_id(spec, 2, 0) == arg2); ASSERT(memcmp(TaskSpec_arg_val(spec, 3), (uint8_t *) "world", TaskSpec_arg_length(spec, 3)) == 0); @@ -48,22 +49,24 @@ TEST task_test(void) { TEST deterministic_ids_test(void) { TaskBuilder *builder = make_task_builder(); /* Define the inputs to the task construction. */ - TaskID parent_task_id = globally_unique_id(); - FunctionID func_id = globally_unique_id(); - UniqueID arg1 = globally_unique_id(); + TaskID parent_task_id = TaskID::from_random(); + FunctionID func_id = FunctionID::from_random(); + UniqueID arg1 = UniqueID::from_random(); uint8_t *arg2 = (uint8_t *) "hello world"; /* Construct a first task. */ - TaskSpec_start_construct(builder, NIL_ID, parent_task_id, 0, NIL_ACTOR_ID, - NIL_ACTOR_ID, 0, false, func_id, 3); + TaskSpec_start_construct(builder, DriverID::nil(), parent_task_id, 0, + ActorID::nil(), ActorID::nil(), 0, false, func_id, + 3); TaskSpec_args_add_ref(builder, &arg1, 1); TaskSpec_args_add_val(builder, arg2, 11); int64_t size1; TaskSpec *spec1 = TaskSpec_finish_construct(builder, &size1); /* Construct a second identical task. */ - TaskSpec_start_construct(builder, NIL_ID, parent_task_id, 0, NIL_ACTOR_ID, - NIL_ACTOR_ID, 0, false, func_id, 3); + TaskSpec_start_construct(builder, DriverID::nil(), parent_task_id, 0, + ActorID::nil(), ActorID::nil(), 0, false, func_id, + 3); TaskSpec_args_add_ref(builder, &arg1, 1); TaskSpec_args_add_val(builder, arg2, 11); int64_t size2; @@ -71,52 +74,57 @@ TEST deterministic_ids_test(void) { /* Check that these tasks have the same task IDs and the same return IDs. */ ASSERT(TaskID_equal(TaskSpec_task_id(spec1), TaskSpec_task_id(spec2))); - ASSERT(ObjectID_equal(TaskSpec_return(spec1, 0), TaskSpec_return(spec2, 0))); - ASSERT(ObjectID_equal(TaskSpec_return(spec1, 1), TaskSpec_return(spec2, 1))); - ASSERT(ObjectID_equal(TaskSpec_return(spec1, 2), TaskSpec_return(spec2, 2))); + ASSERT(TaskSpec_return(spec1, 0) == TaskSpec_return(spec2, 0)); + ASSERT(TaskSpec_return(spec1, 1) == TaskSpec_return(spec2, 1)); + ASSERT(TaskSpec_return(spec1, 2) == TaskSpec_return(spec2, 2)); /* Check that the return IDs are all distinct. */ - ASSERT(!ObjectID_equal(TaskSpec_return(spec1, 0), TaskSpec_return(spec2, 1))); - ASSERT(!ObjectID_equal(TaskSpec_return(spec1, 0), TaskSpec_return(spec2, 2))); - ASSERT(!ObjectID_equal(TaskSpec_return(spec1, 1), TaskSpec_return(spec2, 2))); + ASSERT(!(TaskSpec_return(spec1, 0) == TaskSpec_return(spec2, 1))); + ASSERT(!(TaskSpec_return(spec1, 0) == TaskSpec_return(spec2, 2))); + ASSERT(!(TaskSpec_return(spec1, 1) == TaskSpec_return(spec2, 2))); /* Create more tasks that are only mildly different. */ /* Construct a task with a different parent task ID. */ - TaskSpec_start_construct(builder, NIL_ID, globally_unique_id(), 0, - NIL_ACTOR_ID, NIL_ACTOR_ID, 0, false, func_id, 3); + TaskSpec_start_construct(builder, DriverID::nil(), TaskID::from_random(), 0, + ActorID::nil(), ActorID::nil(), 0, false, func_id, + 3); TaskSpec_args_add_ref(builder, &arg1, 1); TaskSpec_args_add_val(builder, arg2, 11); int64_t size3; TaskSpec *spec3 = TaskSpec_finish_construct(builder, &size3); /* Construct a task with a different parent counter. */ - TaskSpec_start_construct(builder, NIL_ID, parent_task_id, 1, NIL_ACTOR_ID, - NIL_ACTOR_ID, 0, false, func_id, 3); + TaskSpec_start_construct(builder, DriverID::nil(), parent_task_id, 1, + ActorID::nil(), ActorID::nil(), 0, false, func_id, + 3); TaskSpec_args_add_ref(builder, &arg1, 1); TaskSpec_args_add_val(builder, arg2, 11); int64_t size4; TaskSpec *spec4 = TaskSpec_finish_construct(builder, &size4); /* Construct a task with a different function ID. */ - TaskSpec_start_construct(builder, NIL_ID, parent_task_id, 0, NIL_ACTOR_ID, - NIL_ACTOR_ID, 0, false, globally_unique_id(), 3); + TaskSpec_start_construct(builder, DriverID::nil(), parent_task_id, 0, + ActorID::nil(), ActorID::nil(), 0, false, + FunctionID::from_random(), 3); TaskSpec_args_add_ref(builder, &arg1, 1); TaskSpec_args_add_val(builder, arg2, 11); int64_t size5; TaskSpec *spec5 = TaskSpec_finish_construct(builder, &size5); /* Construct a task with a different object ID argument. */ - TaskSpec_start_construct(builder, NIL_ID, parent_task_id, 0, NIL_ACTOR_ID, - NIL_ACTOR_ID, 0, false, func_id, 3); - ObjectID object_id = globally_unique_id(); + TaskSpec_start_construct(builder, DriverID::nil(), parent_task_id, 0, + ActorID::nil(), ActorID::nil(), 0, false, func_id, + 3); + ObjectID object_id = ObjectID::from_random(); TaskSpec_args_add_ref(builder, &object_id, 1); TaskSpec_args_add_val(builder, arg2, 11); int64_t size6; TaskSpec *spec6 = TaskSpec_finish_construct(builder, &size6); /* Construct a task with a different value argument. */ - TaskSpec_start_construct(builder, NIL_ID, parent_task_id, 0, NIL_ACTOR_ID, - NIL_ACTOR_ID, 0, false, func_id, 3); + TaskSpec_start_construct(builder, DriverID::nil(), parent_task_id, 0, + ActorID::nil(), ActorID::nil(), 0, false, func_id, + 3); TaskSpec_args_add_ref(builder, &arg1, 1); TaskSpec_args_add_val(builder, (uint8_t *) "hello_world", 11); int64_t size7; @@ -136,9 +144,8 @@ TEST deterministic_ids_test(void) { for (int task_index2 = 0; task_index2 < 6; ++task_index2) { for (int return_index2 = 0; return_index2 < 3; ++return_index2) { if (task_index1 != task_index2 && return_index1 != return_index2) { - ASSERT(!ObjectID_equal( - TaskSpec_return(specs[task_index1], return_index1), - TaskSpec_return(specs[task_index2], return_index2))); + ASSERT(!(TaskSpec_return(specs[task_index1], return_index1) == + TaskSpec_return(specs[task_index2], return_index2))); } } } @@ -158,15 +165,16 @@ TEST deterministic_ids_test(void) { TEST send_task(void) { TaskBuilder *builder = make_task_builder(); - TaskID parent_task_id = globally_unique_id(); - FunctionID func_id = globally_unique_id(); - TaskSpec_start_construct(builder, NIL_ID, parent_task_id, 0, NIL_ACTOR_ID, - NIL_ACTOR_ID, 0, false, func_id, 2); - ObjectID object_id = globally_unique_id(); + TaskID parent_task_id = TaskID::from_random(); + FunctionID func_id = FunctionID::from_random(); + TaskSpec_start_construct(builder, DriverID::nil(), parent_task_id, 0, + ActorID::nil(), ActorID::nil(), 0, false, func_id, + 2); + ObjectID object_id = ObjectID::from_random(); TaskSpec_args_add_ref(builder, &object_id, 1); TaskSpec_args_add_val(builder, (uint8_t *) "Hello", 5); TaskSpec_args_add_val(builder, (uint8_t *) "World", 5); - object_id = globally_unique_id(); + object_id = ObjectID::from_random(); TaskSpec_args_add_ref(builder, &object_id, 1); int64_t size; TaskSpec *spec = TaskSpec_finish_construct(builder, &size); diff --git a/src/global_scheduler/CMakeLists.txt b/src/global_scheduler/CMakeLists.txt index bd6365bb8..eaf5c11c2 100644 --- a/src/global_scheduler/CMakeLists.txt +++ b/src/global_scheduler/CMakeLists.txt @@ -7,4 +7,4 @@ include(${CMAKE_CURRENT_LIST_DIR}/../common/cmake/Common.cmake) set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Werror -Wall") add_executable(global_scheduler global_scheduler.cc global_scheduler_algorithm.cc) -target_link_libraries(global_scheduler common ${HIREDIS_LIB}) +target_link_libraries(global_scheduler common ${HIREDIS_LIB} ray_static ${PLASMA_STATIC_LIB} ${ARROW_STATIC_LIB}) diff --git a/src/global_scheduler/global_scheduler.cc b/src/global_scheduler/global_scheduler.cc index 989d09188..8a0a64a80 100644 --- a/src/global_scheduler/global_scheduler.cc +++ b/src/global_scheduler/global_scheduler.cc @@ -64,15 +64,14 @@ void assign_task_to_local_scheduler_retry(UniqueID id, void assign_task_to_local_scheduler(GlobalSchedulerState *state, Task *task, DBClientID local_scheduler_id) { - char id_string[ID_STRING_SIZE]; + std::string id_string = local_scheduler_id.hex(); TaskSpec *spec = Task_task_execution_spec(task)->Spec(); - LOG_DEBUG("assigning task to local_scheduler_id = %s", - ObjectID_to_string(local_scheduler_id, id_string, ID_STRING_SIZE)); + LOG_DEBUG("assigning task to local_scheduler_id = %s", local_scheduler_id, + id_string.c_str()); Task_set_state(task, TASK_STATUS_SCHEDULED); Task_set_local_scheduler(task, local_scheduler_id); - LOG_DEBUG("Issuing a task table update for task = %s", - ObjectID_to_string(Task_task_id(task), id_string, ID_STRING_SIZE)); - ARROW_UNUSED(id_string); + id_string = Task_task_id(task).hex(); + LOG_DEBUG("Issuing a task table update for task = %s", id_string.c_str()); auto retryInfo = RetryInfo{ .num_retries = 0, // This value is unused. .timeout = 0, // This value is unused. @@ -254,10 +253,8 @@ remove_local_scheduler( */ void process_new_db_client(DBClient *db_client, void *user_context) { GlobalSchedulerState *state = (GlobalSchedulerState *) user_context; - char id_string[ID_STRING_SIZE]; - LOG_DEBUG("db client table callback for db client = %s", - ObjectID_to_string(db_client->id, id_string, ID_STRING_SIZE)); - ARROW_UNUSED(id_string); + std::string id_string = db_client->id.hex(); + LOG_DEBUG("db client table callback for db client = %s", id_string.c_str()); if (strncmp(db_client->client_type.c_str(), "local_scheduler", strlen("local_scheduler")) == 0) { bool local_scheduler_present = @@ -296,10 +293,9 @@ void object_table_subscribe_callback(ObjectID object_id, void *user_context) { /* Extract global scheduler state from the callback context. */ GlobalSchedulerState *state = (GlobalSchedulerState *) user_context; - char id_string[ID_STRING_SIZE]; + std::string id_string = object_id.hex(); LOG_DEBUG("object table subscribe callback for OBJECT = %s", - ObjectID_to_string(object_id, id_string, ID_STRING_SIZE)); - ARROW_UNUSED(id_string); + id_string.c_str()); const std::vector managers = db_client_table_get_ip_addresses(state->db, manager_ids); @@ -315,8 +311,9 @@ void object_table_subscribe_callback(ObjectID object_id, state->scheduler_object_info_table[object_id]; obj_info_entry.data_size = data_size; + id_string = object_id.hex(); LOG_DEBUG("New object added to object_info_table with id = %s", - ObjectID_to_string(object_id, id_string, ID_STRING_SIZE)); + id_string.c_str()); LOG_DEBUG("\tmanager locations:"); for (size_t i = 0; i < managers.size(); i++) { LOG_DEBUG("\t\t%s", managers[i]); @@ -339,11 +336,9 @@ void local_scheduler_table_handler(DBClientID client_id, /* Extract global scheduler state from the callback context. */ GlobalSchedulerState *state = (GlobalSchedulerState *) user_context; ARROW_UNUSED(state); - char id_string[ID_STRING_SIZE]; - LOG_DEBUG( - "Local scheduler heartbeat from db_client_id %s", - ObjectID_to_string((ObjectID) client_id, id_string, ID_STRING_SIZE)); - ARROW_UNUSED(id_string); + std::string id_string = client_id.hex(); + LOG_DEBUG("Local scheduler heartbeat from db_client_id %s", + id_string.c_str()); LOG_DEBUG( "total workers = %d, task queue length = %d, available workers = %d", info.total_num_workers, info.task_queue_length, info.available_workers); @@ -436,7 +431,7 @@ void start_server(const char *node_ip_address, * submits tasks to the global scheduler before the global scheduler * successfully subscribes, then the local scheduler that submitted the tasks * will retry. */ - task_table_subscribe(g_state->db, NIL_ID, TASK_STATUS_WAITING, + task_table_subscribe(g_state->db, UniqueID::nil(), TASK_STATUS_WAITING, process_task_waiting, (void *) g_state, NULL, NULL, NULL); diff --git a/src/global_scheduler/global_scheduler_algorithm.cc b/src/global_scheduler/global_scheduler_algorithm.cc index 03d978bd9..0581f2792 100644 --- a/src/global_scheduler/global_scheduler_algorithm.cc +++ b/src/global_scheduler/global_scheduler_algorithm.cc @@ -128,7 +128,8 @@ bool handle_task_waiting(GlobalSchedulerState *state, double best_local_scheduler_score = INT32_MIN; CHECKM(best_local_scheduler_score < 0, "We might have a floating point underflow"); - DBClientID best_local_scheduler_id = NIL_ID; /* best node to send this task */ + DBClientID best_local_scheduler_id = + DBClientID::nil(); /* best node to send this task */ for (auto it = state->local_schedulers.begin(); it != state->local_schedulers.end(); it++) { /* For each local scheduler, calculate its score. Check hard constraints @@ -147,15 +148,15 @@ bool handle_task_waiting(GlobalSchedulerState *state, } if (!task_feasible) { - char id_string[ID_STRING_SIZE]; + std::string id_string = Task_task_id(task).hex(); LOG_ERROR( "Infeasible task. No nodes satisfy hard constraints for task = %s", - ObjectID_to_string(Task_task_id(task), id_string, ID_STRING_SIZE)); + id_string.c_str()); /* TODO(atumanov): propagate this error to the task's driver and/or * cache the task in case new local schedulers satisfy it in the future. */ return false; } - CHECKM(!IS_NIL_ID(best_local_scheduler_id), + CHECKM(!best_local_scheduler_id.is_nil(), "Task is feasible, but doesn't have a local scheduler assigned."); /* A local scheduler ID was found, so assign the task. */ assign_task_to_local_scheduler(state, task, best_local_scheduler_id); diff --git a/src/local_scheduler/CMakeLists.txt b/src/local_scheduler/CMakeLists.txt index 2513bae22..c2b011557 100644 --- a/src/local_scheduler/CMakeLists.txt +++ b/src/local_scheduler/CMakeLists.txt @@ -8,6 +8,7 @@ include(${CMAKE_CURRENT_LIST_DIR}/../common/cmake/Common.cmake) # Include plasma list(APPEND CMAKE_MODULE_PATH ${CMAKE_CURRENT_LIST_DIR}/../thirdparty/arrow/python/cmake_modules) +find_package(Arrow) find_package(Plasma) include_directories(SYSTEM ${PLASMA_INCLUDE_DIR}) @@ -61,18 +62,18 @@ add_library(local_scheduler_client STATIC local_scheduler_client.cc) add_dependencies(local_scheduler_client gen_local_scheduler_fbs) if(APPLE) - target_link_libraries(local_scheduler_library "-undefined dynamic_lookup" local_scheduler_client common) + target_link_libraries(local_scheduler_library "-undefined dynamic_lookup" local_scheduler_client common ray_static ${PLASMA_STATIC_LIB} ${ARROW_STATIC_LIB}) else(APPLE) - target_link_libraries(local_scheduler_library local_scheduler_client common) + target_link_libraries(local_scheduler_library local_scheduler_client common ray_static ${PLASMA_STATIC_LIB} ${ARROW_STATIC_LIB}) endif(APPLE) add_dependencies(local_scheduler_library gen_local_scheduler_fbs) add_executable(local_scheduler local_scheduler.cc local_scheduler_algorithm.cc) -target_link_libraries(local_scheduler local_scheduler_client common ${HIREDIS_LIB} ${PLASMA_STATIC_LIB} ${ARROW_DIR}/cpp/build/release/libarrow.a -lpthread) +target_link_libraries(local_scheduler local_scheduler_client common ${HIREDIS_LIB} ${PLASMA_STATIC_LIB} ray_static ${ARROW_STATIC_LIB} -lpthread) add_executable(local_scheduler_tests test/local_scheduler_tests.cc local_scheduler.cc local_scheduler_algorithm.cc) -target_link_libraries(local_scheduler_tests local_scheduler_client common ${HIREDIS_LIB} ${PLASMA_STATIC_LIB} ${ARROW_DIR}/cpp/build/release/libarrow.a -lpthread) +target_link_libraries(local_scheduler_tests local_scheduler_client common ${HIREDIS_LIB} ${PLASMA_STATIC_LIB} ray_static ${ARROW_STATIC_LIB} -lpthread) target_compile_options(local_scheduler_tests PUBLIC "-DLOCAL_SCHEDULER_TEST") install(TARGETS local_scheduler_library DESTINATION ${CMAKE_SOURCE_DIR}/local_scheduler) diff --git a/src/local_scheduler/local_scheduler.cc b/src/local_scheduler/local_scheduler.cc index 364c8f646..fa7c8d1f8 100644 --- a/src/local_scheduler/local_scheduler.cc +++ b/src/local_scheduler/local_scheduler.cc @@ -87,7 +87,7 @@ void kill_worker(LocalSchedulerState *state, release_resources(state, worker, worker->resources_in_use); /* Erase the algorithm state's reference to the worker. */ - if (ActorID_equal(worker->actor_id, NIL_ACTOR_ID)) { + if (worker->actor_id.is_nil()) { handle_worker_removed(state, state->algorithm_state, worker); } else { /* Let the scheduling algorithm process the absence of this worker. */ @@ -130,7 +130,7 @@ void kill_worker(LocalSchedulerState *state, TaskSpec *spec = Task_task_execution_spec(worker->task_in_progress)->Spec(); TaskID task_id = TaskSpec_task_id(spec); push_error(state->db, TaskSpec_driver_id(spec), WORKER_DIED_ERROR_INDEX, - sizeof(task_id), task_id.id); + sizeof(task_id), task_id.data()); } /* Clean up the task in progress. */ @@ -229,7 +229,7 @@ void start_worker(LocalSchedulerState *state, ActorID actor_id, bool reconstruct) { /* Non-actors can't be started in reconstruct mode. */ - if (ActorID_equal(actor_id, NIL_ACTOR_ID)) { + if (actor_id.is_nil()) { CHECK(!reconstruct); } /* We can't start a worker if we don't have the path to the worker script. */ @@ -256,10 +256,9 @@ void start_worker(LocalSchedulerState *state, /* Pass in the worker's actor ID. */ const char *actor_id_string = "--actor-id"; - char id_string[ID_STRING_SIZE]; - ObjectID_to_string(actor_id, id_string, ID_STRING_SIZE); + std::string id_string = actor_id.hex(); command_vector.push_back(actor_id_string); - command_vector.push_back((const char *) id_string); + command_vector.push_back(id_string.c_str()); /* Add a flag for reconstructing the actor if necessary. */ const char *reconstruct_string = "--reconstruct"; @@ -405,7 +404,7 @@ LocalSchedulerState *LocalSchedulerState_init( /* Start the initial set of workers. */ for (int i = 0; i < num_workers; ++i) { - start_worker(state, NIL_ACTOR_ID, false); + start_worker(state, ActorID::nil(), false); } /* Initialize the time at which the previous heartbeat was sent. */ @@ -533,12 +532,12 @@ void assign_task_to_worker(LocalSchedulerState *state, // Check that actor tasks don't have non-CPU requirements. Any necessary // non-CPU resources (in particular, GPUs) should already have been acquired // by the actor worker. - if (!ActorID_equal(worker->actor_id, NIL_ACTOR_ID)) { + if (!worker->actor_id.is_nil()) { CHECK(required_resources.size() == 1); CHECK(required_resources.count("CPU") == 1); } - CHECK(ActorID_equal(worker->actor_id, TaskSpec_actor_id(spec))); + CHECK(worker->actor_id == TaskSpec_actor_id(spec)); /* Make sure the driver for this task is still alive. */ WorkerID driver_id = TaskSpec_driver_id(spec); CHECK(is_driver_alive(state, driver_id)); @@ -564,8 +563,9 @@ void assign_task_to_worker(LocalSchedulerState *state, } } - Task *task = Task_alloc(execution_spec, TASK_STATUS_RUNNING, - state->db ? get_db_client_id(state->db) : NIL_ID); + Task *task = + Task_alloc(execution_spec, TASK_STATUS_RUNNING, + state->db ? get_db_client_id(state->db) : DBClientID::nil()); /* Record which task this worker is executing. This will be freed in * process_message when the worker sends a GetTask message to the local * scheduler. */ @@ -586,7 +586,7 @@ void finish_task(LocalSchedulerState *state, /* Return dynamic resources back for the task in progress. */ CHECK(worker->resources_in_use["CPU"] == TaskSpec_get_required_resource(spec, "CPU")); - if (ActorID_equal(worker->actor_id, NIL_ACTOR_ID)) { + if (worker->actor_id.is_nil()) { CHECK(worker->gpus_in_use.size() == TaskSpec_get_required_resource(spec, "GPU")); release_resources(state, worker, worker->resources_in_use); @@ -647,7 +647,7 @@ void reconstruct_task_update_callback(Task *task, /* The test-and-set failed. The task is either: (1) not finished yet, (2) * lost, but not yet updated, or (3) already being reconstructed. */ DBClientID current_local_scheduler_id = Task_local_scheduler(task); - if (!DBClientID_is_nil(current_local_scheduler_id)) { + if (!current_local_scheduler_id.is_nil()) { DBClient current_local_scheduler = db_client_table_cache_get(state->db, current_local_scheduler_id); if (!current_local_scheduler.is_alive) { @@ -669,7 +669,7 @@ void reconstruct_task_update_callback(Task *task, * to ensure that reconstruction will happen. */ TaskExecutionSpec *execution_spec = Task_task_execution_spec(task); TaskSpec *spec = execution_spec->Spec(); - if (ActorID_equal(TaskSpec_actor_id(spec), NIL_ACTOR_ID)) { + if (TaskSpec_actor_id(spec).is_nil()) { handle_task_submitted(state, state->algorithm_state, *execution_spec); } else { handle_actor_task_submitted(state, state->algorithm_state, *execution_spec); @@ -694,7 +694,7 @@ void reconstruct_put_task_update_callback(Task *task, /* The test-and-set failed. The task is either: (1) not finished yet, (2) * lost, but not yet updated, or (3) already being reconstructed. */ DBClientID current_local_scheduler_id = Task_local_scheduler(task); - if (!DBClientID_is_nil(current_local_scheduler_id)) { + if (!current_local_scheduler_id.is_nil()) { DBClient current_local_scheduler = db_client_table_cache_get(state->db, current_local_scheduler_id); if (!current_local_scheduler.is_alive) { @@ -713,7 +713,7 @@ void reconstruct_put_task_update_callback(Task *task, FunctionID function = TaskSpec_function(spec); push_error(state->db, TaskSpec_driver_id(spec), PUT_RECONSTRUCTION_ERROR_INDEX, sizeof(function), - function.id); + function.data()); } } else { /* (1) The task is still executing and it is the driver task. We cannot @@ -722,7 +722,8 @@ void reconstruct_put_task_update_callback(Task *task, TaskSpec *spec = Task_task_execution_spec(task)->Spec(); FunctionID function = TaskSpec_function(spec); push_error(state->db, TaskSpec_driver_id(spec), - PUT_RECONSTRUCTION_ERROR_INDEX, sizeof(function), function.id); + PUT_RECONSTRUCTION_ERROR_INDEX, sizeof(function), + function.data()); } } else { /* The update to TASK_STATUS_RECONSTRUCTING succeeded, so continue with @@ -735,7 +736,7 @@ void reconstruct_evicted_result_lookup_callback(ObjectID reconstruct_object_id, TaskID task_id, bool is_put, void *user_context) { - CHECKM(!IS_NIL_ID(task_id), + CHECKM(!task_id.is_nil(), "No task information found for object during reconstruction"); LocalSchedulerState *state = (LocalSchedulerState *) user_context; @@ -752,16 +753,17 @@ void reconstruct_evicted_result_lookup_callback(ObjectID reconstruct_object_id, } /* If there are no other instances of the task running, it's safe for us to * claim responsibility for reconstruction. */ - task_table_test_and_update( - state->db, task_id, NIL_ID, (TASK_STATUS_DONE | TASK_STATUS_LOST), - TASK_STATUS_RECONSTRUCTING, NULL, done_callback, state); + task_table_test_and_update(state->db, task_id, DBClientID::nil(), + (TASK_STATUS_DONE | TASK_STATUS_LOST), + TASK_STATUS_RECONSTRUCTING, NULL, done_callback, + state); } void reconstruct_failed_result_lookup_callback(ObjectID reconstruct_object_id, TaskID task_id, bool is_put, void *user_context) { - if (IS_NIL_ID(task_id)) { + if (task_id.is_nil()) { /* NOTE(swang): For some reason, the result table update sometimes happens * after this lookup returns, possibly due to concurrent clients. In most * cases, this is okay because the initial execution is probably still @@ -774,8 +776,8 @@ void reconstruct_failed_result_lookup_callback(ObjectID reconstruct_object_id, LocalSchedulerState *state = (LocalSchedulerState *) user_context; /* If the task failed to finish, it's safe for us to claim responsibility for * reconstruction. */ - task_table_test_and_update(state->db, task_id, NIL_ID, TASK_STATUS_LOST, - TASK_STATUS_RECONSTRUCTING, NULL, + task_table_test_and_update(state->db, task_id, DBClientID::nil(), + TASK_STATUS_LOST, TASK_STATUS_RECONSTRUCTING, NULL, reconstruct_task_update_callback, state); } @@ -859,7 +861,7 @@ void handle_client_register(LocalSchedulerState *state, CHECK(!worker->registered); worker->registered = true; worker->is_worker = message->is_worker(); - CHECK(WorkerID_equal(worker->client_id, NIL_WORKER_ID)); + CHECK(worker->client_id.is_nil()); worker->client_id = from_flatbuf(*message->client_id()); /* Register the worker or driver. */ @@ -868,14 +870,14 @@ void handle_client_register(LocalSchedulerState *state, * running on the worker). */ worker->pid = message->worker_pid(); ActorID actor_id = from_flatbuf(*message->actor_id()); - if (!ActorID_equal(actor_id, NIL_ACTOR_ID)) { + if (!actor_id.is_nil()) { /* Make sure that the local scheduler is aware that it is responsible for * this actor. */ CHECK(state->actor_mapping.count(actor_id) == 1); - CHECK(DBClientID_equal(state->actor_mapping[actor_id].local_scheduler_id, - get_db_client_id(state->db))); + CHECK(state->actor_mapping[actor_id].local_scheduler_id == + get_db_client_id(state->db)); /* Update the worker struct with this actor ID. */ - CHECK(ActorID_equal(worker->actor_id, NIL_ACTOR_ID)); + CHECK(worker->actor_id.is_nil()); worker->actor_id = actor_id; /* Let the scheduling algorithm process the presence of this new * worker. */ @@ -917,7 +919,7 @@ void handle_client_register(LocalSchedulerState *state, /* If the worker is an actor that corresponds to a driver that has been * removed, then kill the worker. */ - if (!ActorID_equal(actor_id, NIL_ACTOR_ID)) { + if (!actor_id.is_nil()) { WorkerID driver_id = state->actor_mapping[actor_id].driver_id; if (state->removed_drivers.count(driver_id) == 1) { kill_worker(state, worker, false, false); @@ -945,17 +947,17 @@ void handle_driver_removed_callback(WorkerID driver_id, void *user_context) { ActorID actor_id = (*it)->actor_id; Task *task = (*it)->task_in_progress; - if (!ActorID_equal(actor_id, NIL_ACTOR_ID)) { + if (!actor_id.is_nil()) { /* This is an actor. */ CHECK(state->actor_mapping.count(actor_id) == 1); - if (WorkerID_equal(state->actor_mapping[actor_id].driver_id, driver_id)) { + if (state->actor_mapping[actor_id].driver_id == driver_id) { /* This actor was created by the removed driver, so kill the actor. */ LOG_DEBUG("Killing an actor for a removed driver."); kill_worker(state, *it, false, true); } } else if (task != NULL) { TaskSpec *spec = Task_task_execution_spec(task)->Spec(); - if (WorkerID_equal(TaskSpec_driver_id(spec), driver_id)) { + if (TaskSpec_driver_id(spec) == driver_id) { LOG_DEBUG("Killing a worker executing a task for a removed driver."); kill_worker(state, *it, false, true); } @@ -1018,7 +1020,7 @@ void process_message(event_loop *loop, } /* Handle the task submission. */ - if (ActorID_equal(TaskSpec_actor_id(spec), NIL_ACTOR_ID)) { + if (TaskSpec_actor_id(spec).is_nil()) { handle_task_submitted(state, state->algorithm_state, execution_spec); } else { handle_actor_task_submitted(state, state->algorithm_state, @@ -1033,8 +1035,8 @@ void process_message(event_loop *loop, worker->disconnected = true; /* If the disconnected worker was not an actor, start a new worker to make * sure there are enough workers in the pool. */ - if (ActorID_equal(worker->actor_id, NIL_ACTOR_ID)) { - start_worker(state, NIL_ACTOR_ID, false); + if (worker->actor_id.is_nil()) { + start_worker(state, ActorID::nil(), false); } } break; case MessageType_EventLogMessage: { @@ -1059,7 +1061,7 @@ void process_message(event_loop *loop, finish_task(state, worker, actor_checkpoint_failed); /* Let the scheduling algorithm process the fact that there is an available * worker. */ - if (ActorID_equal(worker->actor_id, NIL_ACTOR_ID)) { + if (worker->actor_id.is_nil()) { handle_worker_available(state, state->algorithm_state, worker); } else { handle_actor_worker_available(state, state->algorithm_state, worker, @@ -1080,7 +1082,7 @@ void process_message(event_loop *loop, release_resources(state, worker, cpu_resources); /* Let the scheduling algorithm process the fact that the worker is * blocked. */ - if (ActorID_equal(worker->actor_id, NIL_ACTOR_ID)) { + if (worker->actor_id.is_nil()) { handle_worker_blocked(state, state->algorithm_state, worker); } else { handle_actor_worker_blocked(state, state->algorithm_state, worker); @@ -1113,7 +1115,7 @@ void process_message(event_loop *loop, acquire_resources(state, worker, cpu_resources); /* Let the scheduling algorithm process the fact that the worker is * unblocked. */ - if (ActorID_equal(worker->actor_id, NIL_ACTOR_ID)) { + if (worker->actor_id.is_nil()) { handle_worker_unblocked(state, state->algorithm_state, worker); } else { handle_actor_worker_unblocked(state, state->algorithm_state, worker); @@ -1156,12 +1158,12 @@ void new_client_connection(event_loop *loop, /* We don't know whether this is a worker or not, so just initialize is_worker * to false. */ worker->is_worker = true; - worker->client_id = NIL_WORKER_ID; + worker->client_id = WorkerID::nil(); worker->task_in_progress = NULL; worker->is_blocked = false; worker->pid = 0; worker->is_child = false; - worker->actor_id = NIL_ACTOR_ID; + worker->actor_id = ActorID::nil(); worker->local_scheduler_state = state; state->workers.push_back(worker); event_loop_add_file(loop, new_socket, EVENT_LOOP_READ, process_message, @@ -1203,7 +1205,7 @@ void handle_task_scheduled_callback(Task *original_task, return; } - if (ActorID_equal(TaskSpec_actor_id(spec), NIL_ACTOR_ID)) { + if (TaskSpec_actor_id(spec).is_nil()) { /* This task does not involve an actor. Handle it normally. */ handle_task_scheduled(state, state->algorithm_state, *execution_spec); } else { @@ -1249,12 +1251,11 @@ void handle_actor_creation_callback(ActorID actor_id, * changed but that the local scheduler has. */ auto it = state->actor_mapping.find(actor_id); CHECK(it != state->actor_mapping.end()); - CHECK(WorkerID_equal(it->second.driver_id, driver_id)); - CHECK(!DBClientID_equal(it->second.local_scheduler_id, local_scheduler_id)); + CHECK(it->second.driver_id == driver_id); + CHECK(!(it->second.local_scheduler_id == local_scheduler_id)); /* If the actor was previously assigned to this local scheduler, kill the * actor. */ - if (DBClientID_equal(it->second.local_scheduler_id, - get_db_client_id(state->db))) { + if (it->second.local_scheduler_id == get_db_client_id(state->db)) { /* TODO(rkn): We should kill the actor here if it is still around. Also, * if it hasn't registered yet, we should keep track of its PID so we can * kill it anyway. */ @@ -1272,7 +1273,7 @@ void handle_actor_creation_callback(ActorID actor_id, /* If this local scheduler is responsible for the actor, then start a new * worker for the actor. */ - if (DBClientID_equal(local_scheduler_id, get_db_client_id(state->db))) { + if (local_scheduler_id == get_db_client_id(state->db)) { start_worker(state, actor_id, reconstruct); } /* Let the scheduling algorithm process the fact that a new actor has been diff --git a/src/local_scheduler/local_scheduler_algorithm.cc b/src/local_scheduler/local_scheduler_algorithm.cc index c6fbf2bc5..ff1cf2645 100644 --- a/src/local_scheduler/local_scheduler_algorithm.cc +++ b/src/local_scheduler/local_scheduler_algorithm.cc @@ -218,9 +218,9 @@ void create_actor(SchedulingAlgorithmState *algorithm_state, ActorID actor_id, LocalSchedulerClient *worker) { LocalActorInfo entry; - entry.task_counters[NIL_ACTOR_ID] = 0; + entry.task_counters[ActorID::nil()] = 0; entry.assigned_task_counter = -1; - entry.assigned_task_handle_id = NIL_ACTOR_ID; + entry.assigned_task_handle_id = ActorID::nil(); entry.task_queue = new std::list(); entry.worker = worker; entry.worker_available = false; @@ -229,10 +229,8 @@ void create_actor(SchedulingAlgorithmState *algorithm_state, algorithm_state->local_actor_infos[actor_id] = entry; /* Log some useful information about the actor that we created. */ - char id_string[ID_STRING_SIZE]; - LOG_DEBUG("Creating actor with ID %s.", - ObjectID_to_string(actor_id, id_string, ID_STRING_SIZE)); - ARROW_UNUSED(id_string); + std::string id_string = actor_id.hex(); + LOG_DEBUG("Creating actor with ID %s.", id_string.c_str()); } void remove_actor(SchedulingAlgorithmState *algorithm_state, ActorID actor_id) { @@ -241,14 +239,12 @@ void remove_actor(SchedulingAlgorithmState *algorithm_state, ActorID actor_id) { algorithm_state->local_actor_infos.find(actor_id)->second; /* Log some useful information about the actor that we're removing. */ - char id_string[ID_STRING_SIZE]; + std::string id_string = actor_id.hex(); size_t count = entry.task_queue->size(); if (count > 0) { LOG_WARN("Removing actor with ID %s and %lld remaining tasks.", - ObjectID_to_string(actor_id, id_string, ID_STRING_SIZE), - (long long) count); + id_string.c_str(), (long long) count); } - ARROW_UNUSED(id_string); entry.task_queue->clear(); delete entry.task_queue; @@ -271,7 +267,7 @@ bool dispatch_actor_task(LocalSchedulerState *state, SchedulingAlgorithmState *algorithm_state, ActorID actor_id) { /* Make sure this worker actually is an actor. */ - CHECK(!ActorID_equal(actor_id, NIL_ACTOR_ID)); + CHECK(!actor_id.is_nil()); /* Return if this actor doesn't have any pending tasks. */ if (algorithm_state->actors_with_pending_tasks.find(actor_id) == algorithm_state->actors_with_pending_tasks.end()) { @@ -283,8 +279,8 @@ bool dispatch_actor_task(LocalSchedulerState *state, * scheduler. This should be rare. */ return false; } - CHECK(DBClientID_equal(state->actor_mapping[actor_id].local_scheduler_id, - get_db_client_id(state->db))) + CHECK(state->actor_mapping[actor_id].local_scheduler_id == + get_db_client_id(state->db)); /* Get the local actor entry for this actor. */ CHECK(algorithm_state->local_actor_infos.count(actor_id) != 0); @@ -370,7 +366,7 @@ void finish_killed_task(LocalSchedulerState *state, // TODO(ekl): this writes an invalid arrow object, which is sufficient to // signal that the worker failed, but it would be nice to return more // detailed failure metadata in the future. - Status status = + arrow::Status status = state->plasma_conn->Create(object_id.to_plasma_id(), 1, NULL, 0, &data); if (!status.IsPlasmaObjectExists()) { ARROW_CHECK_OK(status); @@ -444,8 +440,7 @@ void insert_actor_task_queue(LocalSchedulerState *state, for (; it != entry.task_queue->end(); it++) { TaskSpec *pending_task_spec = it->Spec(); /* Skip tasks submitted by a different handle. */ - if (!ActorID_equal(task_handle_id, - TaskSpec_actor_handle_id(pending_task_spec))) { + if (!(task_handle_id == TaskSpec_actor_handle_id(pending_task_spec))) { continue; } /* A duplicate task submitted by the same handle. */ @@ -485,7 +480,7 @@ void queue_actor_task(LocalSchedulerState *state, bool from_global_scheduler) { TaskSpec *spec = execution_spec.Spec(); ActorID actor_id = TaskSpec_actor_id(spec); - DCHECK(!ActorID_equal(actor_id, NIL_ACTOR_ID)); + DCHECK(!actor_id.is_nil()); /* Update the task table. */ if (state->db != NULL) { @@ -768,7 +763,7 @@ void dispatch_tasks(LocalSchedulerState *state, if (state->child_pids.size() == 0) { /* If there are no workers, including those pending PID registration, * then we must start a new one to replenish the worker pool. */ - start_worker(state, NIL_ACTOR_ID, false); + start_worker(state, ActorID::nil(), false); } return; } @@ -986,7 +981,7 @@ void give_task_to_local_scheduler(LocalSchedulerState *state, SchedulingAlgorithmState *algorithm_state, TaskExecutionSpec &execution_spec, DBClientID local_scheduler_id) { - if (DBClientID_equal(local_scheduler_id, get_db_client_id(state->db))) { + if (local_scheduler_id == get_db_client_id(state->db)) { LOG_WARN("Local scheduler is trying to assign a task to itself."); } CHECK(state->db != NULL); @@ -1034,7 +1029,8 @@ void give_task_to_global_scheduler(LocalSchedulerState *state, } /* Pass on the task to the global scheduler. */ DCHECK(state->config.global_scheduler_exists); - Task *task = Task_alloc(execution_spec, TASK_STATUS_WAITING, NIL_ID); + Task *task = + Task_alloc(execution_spec, TASK_STATUS_WAITING, DBClientID::nil()); DCHECK(state->db != NULL); auto retryInfo = RetryInfo{ .num_retries = 0, // This value is unused. @@ -1102,8 +1098,8 @@ void handle_actor_task_submitted(LocalSchedulerState *state, return; } - if (DBClientID_equal(state->actor_mapping[actor_id].local_scheduler_id, - get_db_client_id(state->db))) { + if (state->actor_mapping[actor_id].local_scheduler_id == + get_db_client_id(state->db)) { /* This local scheduler is responsible for the actor, so handle the task * locally. */ queue_task_locally(state, algorithm_state, execution_spec, false); @@ -1167,8 +1163,8 @@ void handle_actor_task_scheduled(LocalSchedulerState *state, DCHECK(TaskSpec_is_actor_task(spec)); ActorID actor_id = TaskSpec_actor_id(spec); if (state->actor_mapping.count(actor_id) == 1) { - DCHECK(DBClientID_equal(state->actor_mapping[actor_id].local_scheduler_id, - get_db_client_id(state->db))); + DCHECK(state->actor_mapping[actor_id].local_scheduler_id == + get_db_client_id(state->db)); } else { /* This means that an actor has been assigned to this local scheduler, and a * task for that actor has been received by this local scheduler, but this @@ -1212,7 +1208,7 @@ void handle_worker_removed(LocalSchedulerState *state, SchedulingAlgorithmState *algorithm_state, LocalSchedulerClient *worker) { /* Make sure this is not an actor. */ - CHECK(ActorID_equal(worker->actor_id, NIL_ACTOR_ID)); + CHECK(worker->actor_id.is_nil()); /* Make sure that we remove the worker at most once. */ int num_times_removed = 0; @@ -1281,7 +1277,7 @@ void handle_actor_worker_available(LocalSchedulerState *state, LocalSchedulerClient *worker, bool actor_checkpoint_failed) { ActorID actor_id = worker->actor_id; - CHECK(!ActorID_equal(actor_id, NIL_ACTOR_ID)); + CHECK(!actor_id.is_nil()); /* Get the actor info for this worker. */ CHECK(algorithm_state->local_actor_infos.count(actor_id) == 1); LocalActorInfo &entry = @@ -1302,7 +1298,7 @@ void handle_actor_worker_available(LocalSchedulerState *state, } } entry.assigned_task_counter = -1; - entry.assigned_task_handle_id = NIL_ACTOR_ID; + entry.assigned_task_handle_id = ActorID::nil(); entry.worker_available = true; /* Assign new tasks if possible. */ dispatch_all_tasks(state, algorithm_state); @@ -1455,7 +1451,7 @@ void handle_object_removed(LocalSchedulerState *state, int count = it->DependencyIdCount(i); for (int j = 0; j < count; ++j) { ObjectID dependency_id = it->DependencyId(i, j); - if (ObjectID_equal(dependency_id, removed_object_id)) { + if (dependency_id == removed_object_id) { /* Do not request a transfer from other plasma managers if this is an * execution dependency. */ bool request_transfer = it->IsStaticDependency(i); @@ -1483,7 +1479,7 @@ void handle_driver_removed(LocalSchedulerState *state, /* If the dependent task was a task for the removed driver, remove it from * this vector. */ TaskSpec *spec = (*task_it_it)->Spec(); - if (WorkerID_equal(TaskSpec_driver_id(spec), driver_id)) { + if (TaskSpec_driver_id(spec) == driver_id) { task_it_it = it->second.dependent_tasks.erase(task_it_it); } else { task_it_it++; @@ -1502,7 +1498,7 @@ void handle_driver_removed(LocalSchedulerState *state, auto it = algorithm_state->waiting_task_queue->begin(); while (it != algorithm_state->waiting_task_queue->end()) { TaskSpec *spec = it->Spec(); - if (WorkerID_equal(TaskSpec_driver_id(spec), driver_id)) { + if (TaskSpec_driver_id(spec) == driver_id) { it = algorithm_state->waiting_task_queue->erase(it); } else { it++; @@ -1513,7 +1509,7 @@ void handle_driver_removed(LocalSchedulerState *state, it = algorithm_state->dispatch_task_queue->begin(); while (it != algorithm_state->dispatch_task_queue->end()) { TaskSpec *spec = it->Spec(); - if (WorkerID_equal(TaskSpec_driver_id(spec), driver_id)) { + if (TaskSpec_driver_id(spec) == driver_id) { it = algorithm_state->dispatch_task_queue->erase(it); } else { it++; diff --git a/src/local_scheduler/local_scheduler_client.cc b/src/local_scheduler/local_scheduler_client.cc index 32fa40614..1d63d8eb1 100644 --- a/src/local_scheduler/local_scheduler_client.cc +++ b/src/local_scheduler/local_scheduler_client.cc @@ -49,7 +49,7 @@ LocalSchedulerConnection *LocalSchedulerConnection_init( result->gpu_ids.push_back(reply_message->gpu_ids()->Get(i)); } /* If the worker is not an actor, there should not be any GPU IDs here. */ - if (ActorID_equal(result->actor_id, NIL_ACTOR_ID)) { + if (ActorID_equal(result->actor_id, ActorID::nil())) { CHECK(reply_message->gpu_ids()->size() == 0); } @@ -127,7 +127,7 @@ TaskSpec *local_scheduler_get_task(LocalSchedulerConnection *conn, /* Set the GPU IDs for this task. We only do this for non-actor tasks because * for actors the GPUs are associated with the actor itself and not with the * actor methods. */ - if (ActorID_equal(conn->actor_id, NIL_ACTOR_ID)) { + if (ActorID_equal(conn->actor_id, ActorID::nil())) { conn->gpu_ids.clear(); for (size_t i = 0; i < reply_message->gpu_ids()->size(); ++i) { conn->gpu_ids.push_back(reply_message->gpu_ids()->Get(i)); diff --git a/src/local_scheduler/test/local_scheduler_tests.cc b/src/local_scheduler/test/local_scheduler_tests.cc index 6e532c3c6..792245ef6 100644 --- a/src/local_scheduler/test/local_scheduler_tests.cc +++ b/src/local_scheduler/test/local_scheduler_tests.cc @@ -126,7 +126,7 @@ LocalSchedulerMock *LocalSchedulerMock_init(int num_workers, for (int i = 0; i < num_mock_workers; ++i) { mock->conns[i] = LocalSchedulerConnection_init(local_scheduler_socket_name.c_str(), - NIL_WORKER_ID, NIL_ACTOR_ID, true, 0); + WorkerID::nil(), ActorID::nil(), true, 0); } background_thread.join(); @@ -194,11 +194,11 @@ TEST object_reconstruction_test(void) { ASSERT(db_shards_addresses.size() == 1); context = redisConnect(db_shards_addresses[0].c_str(), db_shards_ports[0]); redisReply *reply = (redisReply *) redisCommand( - context, "RAY.OBJECT_TABLE_ADD %b %ld %b %s", return_id.id, - sizeof(return_id.id), 1, NIL_DIGEST, (size_t) DIGEST_SIZE, client_id); + context, "RAY.OBJECT_TABLE_ADD %b %ld %b %s", return_id.data(), + sizeof(return_id), 1, NIL_DIGEST, (size_t) DIGEST_SIZE, client_id); freeReplyObject(reply); reply = (redisReply *) redisCommand(context, "RAY.OBJECT_TABLE_REMOVE %b %s", - return_id.id, sizeof(return_id.id), + return_id.data(), sizeof(return_id), client_id); freeReplyObject(reply); redisFree(context); @@ -293,12 +293,12 @@ TEST object_reconstruction_recursive_test(void) { for (int i = 0; i < NUM_TASKS; ++i) { ObjectID return_id = TaskSpec_return(specs[i].Spec(), 0); redisReply *reply = (redisReply *) redisCommand( - context, "RAY.OBJECT_TABLE_ADD %b %ld %b %s", return_id.id, - sizeof(return_id.id), 1, NIL_DIGEST, (size_t) DIGEST_SIZE, client_id); + context, "RAY.OBJECT_TABLE_ADD %b %ld %b %s", return_id.data(), + sizeof(return_id), 1, NIL_DIGEST, (size_t) DIGEST_SIZE, client_id); freeReplyObject(reply); reply = (redisReply *) redisCommand( - context, "RAY.OBJECT_TABLE_REMOVE %b %s", return_id.id, - sizeof(return_id.id), client_id); + context, "RAY.OBJECT_TABLE_REMOVE %b %s", return_id.data(), + sizeof(return_id), client_id); freeReplyObject(reply); } redisFree(context); @@ -634,7 +634,7 @@ TEST start_kill_workers_test(void) { static_cast(num_workers - 1)); /* Start a worker after the local scheduler has been initialized. */ - start_worker(local_scheduler->local_scheduler_state, NIL_ACTOR_ID, false); + start_worker(local_scheduler->local_scheduler_state, ActorID::nil(), false); /* Accept the workers as clients to the plasma manager. */ int new_worker_fd = accept_client(local_scheduler->plasma_manager_fd); /* The new worker should register its process ID. */ diff --git a/src/local_scheduler/test/run_valgrind.sh b/src/local_scheduler/test/run_valgrind.sh index 180f039c7..912b0cdbe 100644 --- a/src/local_scheduler/test/run_valgrind.sh +++ b/src/local_scheduler/test/run_valgrind.sh @@ -17,7 +17,7 @@ sleep 1s ./src/plasma/plasma_store -s /tmp/plasma_store_socket_1 -m 100000000 & sleep 0.5s -valgrind --leak-check=full --show-leak-kinds=all --error-exitcode=1 ./src/local_scheduler/local_scheduler_tests +valgrind --track-origins=yes --leak-check=full --show-leak-kinds=all --leak-check-heuristics=stdstring --error-exitcode=1 ./src/local_scheduler/local_scheduler_tests ./src/common/thirdparty/redis/src/redis-cli shutdown ./src/common/thirdparty/redis/src/redis-cli -p 6380 shutdown killall plasma_store diff --git a/src/plasma/CMakeLists.txt b/src/plasma/CMakeLists.txt index d3748ada4..56c79f9c3 100644 --- a/src/plasma/CMakeLists.txt +++ b/src/plasma/CMakeLists.txt @@ -5,12 +5,6 @@ project(plasma) # Recursively include common include(${CMAKE_CURRENT_LIST_DIR}/../common/cmake/Common.cmake) -# Include plasma -list(APPEND CMAKE_MODULE_PATH ${CMAKE_CURRENT_LIST_DIR}/../thirdparty/arrow/python/cmake_modules) - -find_package(Plasma) -include_directories(SYSTEM ${PLASMA_INCLUDE_DIR}) - set(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} --std=c99 -D_XOPEN_SOURCE=500 -D_POSIX_C_SOURCE=200809L -O3") set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} --std=c++11 -D_XOPEN_SOURCE=500 -D_POSIX_C_SOURCE=200809L -O3 -Werror -Wall") @@ -28,7 +22,7 @@ set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -fPIC") add_executable(plasma_manager plasma_manager.cc) -target_link_libraries(plasma_manager common ${PLASMA_STATIC_LIB} ${ARROW_DIR}/cpp/build/release/libarrow.a -lpthread) +target_link_libraries(plasma_manager common ${PLASMA_STATIC_LIB} ray_static ${ARROW_STATIC_LIB} -lpthread) -define_test(client_tests ${PLASMA_STATIC_LIB}) -define_test(manager_tests ${PLASMA_STATIC_LIB} plasma_manager.cc) +define_test(client_tests "") +define_test(manager_tests "" plasma_manager.cc) diff --git a/src/plasma/plasma_manager.cc b/src/plasma/plasma_manager.cc index f5ad1260b..e7e648dbd 100644 --- a/src/plasma/plasma_manager.cc +++ b/src/plasma/plasma_manager.cc @@ -1256,14 +1256,15 @@ void log_object_hash_mismatch_error_task_callback(Task *task, /* Push the error to the Python driver that caused the nondeterministic task * to be submitted. */ push_error(state->db, TaskSpec_driver_id(spec), - OBJECT_HASH_MISMATCH_ERROR_INDEX, sizeof(function), function.id); + OBJECT_HASH_MISMATCH_ERROR_INDEX, sizeof(function), + function.data()); } void log_object_hash_mismatch_error_result_callback(ObjectID object_id, TaskID task_id, bool is_put, void *user_context) { - CHECK(!IS_NIL_ID(task_id)); + CHECK(!task_id.is_nil()); PlasmaManagerState *state = (PlasmaManagerState *) user_context; /* Get the specification for the nondeterministic task. */ task_table_get_task(state->db, task_id, NULL, diff --git a/src/plasma/plasma_manager.h b/src/plasma/plasma_manager.h index c147dfb4a..0b3f374eb 100644 --- a/src/plasma/plasma_manager.h +++ b/src/plasma/plasma_manager.h @@ -59,7 +59,7 @@ void PlasmaManagerState_free(PlasmaManagerState *state); * manager. */ void process_transfer(event_loop *loop, - ObjectID object_id, + ray::ObjectID object_id, uint8_t addr[4], int port, ClientConnection *conn); @@ -81,7 +81,7 @@ void process_transfer(event_loop *loop, */ void process_data(event_loop *loop, int client_sock, - ObjectID object_id, + ray::ObjectID object_id, int64_t data_size, int64_t metadata_size, ClientConnection *conn); @@ -157,7 +157,7 @@ ClientConnection *ClientConnection_listen(event_loop *loop, /* Buffer for requests between plasma managers. */ typedef struct PlasmaRequestBuffer { int type; - ObjectID object_id; + ray::ObjectID object_id; uint8_t *data; int64_t data_size; uint8_t *metadata; @@ -175,7 +175,7 @@ typedef struct PlasmaRequestBuffer { * @param context The plasma manager state. * @return Void. */ -void call_request_transfer(ObjectID object_id, +void call_request_transfer(ray::ObjectID object_id, const std::vector &manager_vector, void *context); @@ -272,6 +272,6 @@ int get_client_sock(ClientConnection *conn); * @return A bool that is true if the requested object is local and false * otherwise. */ -bool is_object_local(PlasmaManagerState *state, ObjectID object_id); +bool is_object_local(PlasmaManagerState *state, ray::ObjectID object_id); #endif /* PLASMA_MANAGER_H */ diff --git a/src/plasma/test/manager_tests.cc b/src/plasma/test/manager_tests.cc index d3fb24290..22633c0af 100644 --- a/src/plasma/test/manager_tests.cc +++ b/src/plasma/test/manager_tests.cc @@ -135,7 +135,7 @@ TEST request_transfer_test(void) { int port; ARROW_CHECK_OK(plasma::ReadDataRequest( request_data.data(), request_data.size(), &object_id2, &address, &port)); - ASSERT(ObjectID_equal(object_id, object_id2)); + ASSERT(object_id == object_id2); free(address); /* Clean up. */ destroy_plasma_mock(remote_mock); @@ -188,7 +188,7 @@ TEST request_transfer_retry_test(void) { ARROW_CHECK_OK(plasma::ReadDataRequest( request_data.data(), request_data.size(), &object_id2, &address, &port)); free(address); - ASSERT(ObjectID_equal(object_id, object_id2)); + ASSERT(object_id == object_id2); /* Clean up. */ destroy_plasma_mock(remote_mock2); destroy_plasma_mock(remote_mock1); @@ -254,9 +254,9 @@ TEST object_notifications_test(void) { int flags = fcntl(fd[1], F_GETFL, 0); CHECK(fcntl(fd[1], F_SETFL, flags | O_NONBLOCK) == 0); - ObjectID object_id = globally_unique_id(); + ObjectID object_id = ObjectID::from_random(); ObjectInfoT info; - info.object_id = std::string((char *) &object_id.id[0], sizeof(object_id)); + info.object_id = object_id.binary(); info.data_size = 10; info.metadata_size = 1; info.create_time = 0; diff --git a/src/plasma/test/run_valgrind.sh b/src/plasma/test/run_valgrind.sh index 58263a932..8addb4354 100644 --- a/src/plasma/test/run_valgrind.sh +++ b/src/plasma/test/run_valgrind.sh @@ -7,5 +7,5 @@ set -e ./src/plasma/plasma_store -s /tmp/plasma_store_socket_1 -m 0 & sleep 1 -valgrind --leak-check=full --error-exitcode=1 ./src/plasma/manager_tests +valgrind --track-origins=yes --leak-check=full --show-leak-kinds=all --leak-check-heuristics=stdstring --error-exitcode=1 ./src/plasma/manager_tests killall plasma_store diff --git a/src/ray/CMakeLists.txt b/src/ray/CMakeLists.txt index c1be6700b..e40d8cc10 100644 --- a/src/ray/CMakeLists.txt +++ b/src/ray/CMakeLists.txt @@ -46,7 +46,5 @@ install( ADD_RAY_LIB(ray SOURCES ${RAY_SRCS} ${AE_SRCS} ${HIREDIS_SRCS} DEPENDENCIES gen_gcs_fbs - SHARED_LINK_LIBS - STATIC_LINK_LIBS) - -set(RAY_TEST_LINK_LIBS ray_static gtest gtest_main) + SHARED_LINK_LIBS "" + STATIC_LINK_LIBS "") diff --git a/src/ray/api.h b/src/ray/api.h new file mode 100644 index 000000000..e69de29bb diff --git a/src/ray/gcs/CMakeLists.txt b/src/ray/gcs/CMakeLists.txt index 624f78264..c537f0f5c 100644 --- a/src/ray/gcs/CMakeLists.txt +++ b/src/ray/gcs/CMakeLists.txt @@ -19,7 +19,7 @@ add_custom_command( add_custom_target(gen_gcs_fbs DEPENDS ${GCS_FBS_OUTPUT_FILES}) -ADD_RAY_TEST(client_test STATIC_LINK_LIBS ray_static gtest gtest_main pthread) +ADD_RAY_TEST(client_test STATIC_LINK_LIBS ray_static ${PLASMA_STATIC_LIB} ${ARROW_STATIC_LIB} gtest gtest_main pthread) install(FILES client.h diff --git a/src/ray/gcs/client.cc b/src/ray/gcs/client.cc index 601468a67..dcd310ad0 100644 --- a/src/ray/gcs/client.cc +++ b/src/ray/gcs/client.cc @@ -12,7 +12,7 @@ AsyncGcsClient::~AsyncGcsClient() {} Status AsyncGcsClient::Connect(const std::string &address, int port) { context_.reset(new RedisContext()); - RETURN_NOT_OK(context_->Connect(address, port)); + RAY_RETURN_NOT_OK(context_->Connect(address, port)); object_table_.reset(new ObjectTable(context_)); task_table_.reset(new TaskTable(context_)); return Status::OK(); diff --git a/src/ray/gcs/client.h b/src/ray/gcs/client.h index a1db98b30..643ca3e99 100644 --- a/src/ray/gcs/client.h +++ b/src/ray/gcs/client.h @@ -16,7 +16,7 @@ namespace gcs { class RedisContext; -class AsyncGcsClient { +class RAY_EXPORT AsyncGcsClient { public: AsyncGcsClient(); ~AsyncGcsClient(); diff --git a/src/ray/gcs/client_test.cc b/src/ray/gcs/client_test.cc index 14f593e42..76962254e 100644 --- a/src/ray/gcs/client_test.cc +++ b/src/ray/gcs/client_test.cc @@ -57,13 +57,13 @@ TEST_F(TestGcs, TestObjectTable) { void TaskAdded(gcs::AsyncGcsClient *client, const TaskID &id, std::shared_ptr data) { - ASSERT_EQ(data->scheduling_state, 3); + ASSERT_EQ(data->scheduling_state, SchedulingState_SCHEDULED); } void TaskLookup(gcs::AsyncGcsClient *client, const TaskID &id, std::shared_ptr data) { - ASSERT_EQ(data->scheduling_state, 3); + ASSERT_EQ(data->scheduling_state, SchedulingState_SCHEDULED); aeStop(loop); } @@ -71,7 +71,7 @@ TEST_F(TestGcs, TestTaskTable) { loop = aeCreateEventLoop(1024); RAY_CHECK_OK(client_.context()->AttachToEventLoop(loop)); auto data = std::make_shared(); - data->scheduling_state = 3; + data->scheduling_state = SchedulingState_SCHEDULED; TaskID task_id = TaskID::from_random(); RAY_CHECK_OK(client_.task_table().Add(job_id_, task_id, data, &TaskAdded)); RAY_CHECK_OK( diff --git a/src/ray/gcs/format/gcs.fbs b/src/ray/gcs/format/gcs.fbs index 333f2f233..e15d7714b 100644 --- a/src/ray/gcs/format/gcs.fbs +++ b/src/ray/gcs/format/gcs.fbs @@ -18,8 +18,19 @@ table ObjectTableData { managers: [string]; } +enum SchedulingState:int { + NONE = 0, + WAITING = 1, + SCHEDULED = 2, + QUEUED = 4, + RUNNING = 8, + DONE = 16, + LOST = 32, + RECONSTRUCTING = 64 +} + table TaskTableData { - scheduling_state: int; + scheduling_state: SchedulingState; scheduler_id: string; execution_arg_ids: [string]; task_info: string; diff --git a/src/ray/gcs/tables.h b/src/ray/gcs/tables.h index c93af861a..e9cdcb216 100644 --- a/src/ray/gcs/tables.h +++ b/src/ray/gcs/tables.h @@ -51,9 +51,9 @@ class Table { const std::string &data) { (d->callback)(d->client, d->id, d->data); }); flatbuffers::FlatBufferBuilder fbb; fbb.Finish(Data::Pack(fbb, data.get())); - RETURN_NOT_OK(context_->RunAsync("RAY.TABLE_ADD", id, - fbb.GetBufferPointer(), fbb.GetSize(), - callback_index)); + RAY_RETURN_NOT_OK(context_->RunAsync("RAY.TABLE_ADD", id, + fbb.GetBufferPointer(), fbb.GetSize(), + callback_index)); return Status::OK(); } @@ -72,8 +72,8 @@ class Table { (d->callback)(d->client, d->id, result); }); std::vector nil; - RETURN_NOT_OK(context_->RunAsync("RAY.TABLE_LOOKUP", id, nil.data(), - nil.size(), callback_index)); + RAY_RETURN_NOT_OK(context_->RunAsync("RAY.TABLE_LOOKUP", id, nil.data(), + nil.size(), callback_index)); return Status::OK(); } diff --git a/src/ray/id.cc b/src/ray/id.cc index f15a702b2..1437b7e49 100644 --- a/src/ray/id.cc +++ b/src/ray/id.cc @@ -4,6 +4,10 @@ namespace ray { +UniqueID::UniqueID(const plasma::UniqueID &from) { + std::memcpy(&id_, from.data(), kUniqueIDSize); +} + UniqueID UniqueID::from_random() { UniqueID id; uint8_t *data = id.mutable_data(); @@ -22,16 +26,19 @@ UniqueID UniqueID::from_binary(const std::string &binary) { const UniqueID UniqueID::nil() { UniqueID result; - std::fill_n(result.id_, kUniqueIDSize, 255); + uint8_t *data = result.mutable_data(); + std::fill_n(data, kUniqueIDSize, 255); return result; } -bool is_nil(const UniqueID &rhs) { - int i = kUniqueIDSize; - const uint8_t *data = rhs.data(); - while (--i > 0 && data[i] == 255) { +bool UniqueID::is_nil() const { + const uint8_t *d = data(); + for (int i = 0; i < kUniqueIDSize; ++i) { + if (d[i] != 255) { + return false; + } } - return i != 0; + return true; } const uint8_t *UniqueID::data() const { @@ -61,6 +68,12 @@ std::string UniqueID::hex() const { return result; } +plasma::UniqueID UniqueID::to_plasma_id() { + plasma::UniqueID result; + std::memcpy(result.mutable_data(), &id_, kUniqueIDSize); + return result; +} + bool UniqueID::operator==(const UniqueID &rhs) const { return std::memcmp(data(), rhs.data(), kUniqueIDSize) == 0; } diff --git a/src/ray/id.h b/src/ray/id.h index 299e5f006..2e3cf5582 100644 --- a/src/ray/id.h +++ b/src/ray/id.h @@ -6,6 +6,7 @@ #include #include +#include "plasma/common.h" #include "ray/constants.h" #include "ray/util/visibility.h" @@ -13,22 +14,26 @@ namespace ray { class RAY_EXPORT UniqueID { public: + UniqueID() {} + UniqueID(const plasma::UniqueID &from); static UniqueID from_random(); static UniqueID from_binary(const std::string &binary); static const UniqueID nil(); - bool is_nil(const UniqueID &rhs) const; + bool is_nil() const; bool operator==(const UniqueID &rhs) const; const uint8_t *data() const; uint8_t *mutable_data(); size_t size() const; std::string binary() const; std::string hex() const; + plasma::UniqueID to_plasma_id(); private: uint8_t id_[kUniqueIDSize]; }; -static_assert(std::is_pod::value, "UniqueID must be plain old data"); +static_assert(std::is_standard_layout::value, + "UniqueID must be standard"); struct UniqueIDHasher { // ID hashing function. @@ -47,6 +52,7 @@ typedef UniqueID ClassID; typedef UniqueID ActorID; typedef UniqueID ActorHandleID; typedef UniqueID WorkerID; +typedef UniqueID DriverID; typedef UniqueID DBClientID; typedef UniqueID ConfigID; diff --git a/src/ray/status.h b/src/ray/status.h index a8c3d2715..74f3e76b4 100644 --- a/src/ray/status.h +++ b/src/ray/status.h @@ -45,23 +45,6 @@ namespace ray { -#define RETURN_NOT_OK(s) \ - do { \ - Status _s = (s); \ - if (RAY_PREDICT_FALSE(!_s.ok())) { \ - return _s; \ - } \ - } while (0) - -#define RETURN_NOT_OK_ELSE(s, else_) \ - do { \ - Status _s = (s); \ - if (!_s.ok()) { \ - else_; \ - return _s; \ - } \ - } while (0) - enum class StatusCode : char { OK = 0, OutOfMemory = 1, diff --git a/src/ray/util/logging.h b/src/ray/util/logging.h index 5d6b6cc27..aa65fc116 100644 --- a/src/ray/util/logging.h +++ b/src/ray/util/logging.h @@ -34,51 +34,6 @@ namespace ray { << __FILE__ << __LINE__ \ << " Check failed: " #condition " " -#ifdef NDEBUG -#define RAY_DFATAL RAY_WARNING - -#define DCHECK(condition) \ - RAY_IGNORE_EXPR(condition) \ - while (false) \ - ::ray::internal::NullLog() -#define DCHECK_EQ(val1, val2) \ - RAY_IGNORE_EXPR(val1) \ - while (false) \ - ::ray::internal::NullLog() -#define DCHECK_NE(val1, val2) \ - RAY_IGNORE_EXPR(val1) \ - while (false) \ - ::ray::internal::NullLog() -#define DCHECK_LE(val1, val2) \ - RAY_IGNORE_EXPR(val1) \ - while (false) \ - ::ray::internal::NullLog() -#define DCHECK_LT(val1, val2) \ - RAY_IGNORE_EXPR(val1) \ - while (false) \ - ::ray::internal::NullLog() -#define DCHECK_GE(val1, val2) \ - RAY_IGNORE_EXPR(val1) \ - while (false) \ - ::ray::internal::NullLog() -#define DCHECK_GT(val1, val2) \ - RAY_IGNORE_EXPR(val1) \ - while (false) \ - ::ray::internal::NullLog() - -#else -#define RAY_DFATAL RAY_FATAL - -#define DCHECK(condition) RAY_CHECK(condition) -#define DCHECK_EQ(val1, val2) RAY_CHECK((val1) == (val2)) -#define DCHECK_NE(val1, val2) RAY_CHECK((val1) != (val2)) -#define DCHECK_LE(val1, val2) RAY_CHECK((val1) <= (val2)) -#define DCHECK_LT(val1, val2) RAY_CHECK((val1) < (val2)) -#define DCHECK_GE(val1, val2) RAY_CHECK((val1) >= (val2)) -#define DCHECK_GT(val1, val2) RAY_CHECK((val1) > (val2)) - -#endif // NDEBUG - namespace internal { class NullLog { diff --git a/src/thirdparty/download_thirdparty.sh b/src/thirdparty/download_thirdparty.sh index 76e33f7af..42af51f87 100755 --- a/src/thirdparty/download_thirdparty.sh +++ b/src/thirdparty/download_thirdparty.sh @@ -13,4 +13,4 @@ fi cd $TP_DIR/arrow git fetch origin master -git checkout 9895181610c0a5ef1ba836300ea2036e1a3e5621 +git checkout 65f5add61829bd413aa8175ad2644cdd362d0c50