Second Part of Internal API Refactor (#1326)

This commit is contained in:
Philipp Moritz
2017-12-26 16:22:04 -08:00
committed by Robert Nishihara
parent 4bb5b6bd5b
commit 3d224c4edf
58 changed files with 537 additions and 677 deletions
+9 -6
View File
@@ -4,6 +4,11 @@ project(ray)
set(CMAKE_MODULE_PATH ${CMAKE_MODULE_PATH} "${CMAKE_SOURCE_DIR}/cmake/Modules")
list(APPEND CMAKE_MODULE_PATH ${CMAKE_CURRENT_LIST_DIR}/src/thirdparty/arrow/python/cmake_modules)
find_package(Arrow)
find_package(Plasma)
# This ensures that things like gnu++11 get passed correctly
set(CMAKE_CXX_STANDARD 11)
@@ -11,11 +16,11 @@ set(CMAKE_CXX_STANDARD 11)
set(CMAKE_CXX_STANDARD_REQUIRED ON)
option(RAY_BUILD_STATIC
"Build the libarrow static libraries"
"Build the libray static libraries"
ON)
option(RAY_BUILD_SHARED
"Build the libarrow shared libraries"
"Build the libray shared libraries"
ON)
option(RAY_BUILD_TESTS
@@ -29,10 +34,8 @@ enable_testing()
include(ThirdpartyToolchain)
set(ARROW_DIR "${CMAKE_CURRENT_LIST_DIR}/src/thirdparty/arrow/"
CACHE STRING "Path of the arrow source directory")
include_directories("${ARROW_DIR}/cpp/src/")
include_directories(SYSTEM ${ARROW_INCLUDE_DIR})
include_directories(SYSTEM ${PLASMA_INCLUDE_DIR})
include_directories("${CMAKE_CURRENT_LIST_DIR}/src/")
add_subdirectory(${CMAKE_CURRENT_LIST_DIR}/src/ray/)
+77
View File
@@ -0,0 +1,77 @@
# - Find RAY
#
# Locates the Ray core library and its headers.
#
# Search strategy:
#   * If the RAY_HOME environment variable is set, only ${RAY_HOME}/include
#     and ${RAY_HOME}/lib are searched.
#   * Otherwise pkg-config is asked for the "ray" package and the directories
#     it reports are searched.
#
# This module defines
#  RAY_INCLUDE_DIR, directory containing headers
#  RAY_LIBS, directory containing ray libraries
#  RAY_STATIC_LIB, path to static library
#  RAY_SHARED_LIB, path to shared library
#  RAY_FOUND, whether ray has been found

include(FindPkgConfig)

if ("$ENV{RAY_HOME}" STREQUAL "")
  pkg_check_modules(RAY ray)
  if (RAY_FOUND)
    pkg_get_variable(RAY_ABI_VERSION ray abi_version)
    message(STATUS "Ray ABI version: ${RAY_ABI_VERSION}")
    pkg_get_variable(RAY_SO_VERSION ray so_version)
    message(STATUS "Ray SO version: ${RAY_SO_VERSION}")
    set(RAY_INCLUDE_DIR ${RAY_INCLUDE_DIRS})
    # Recorded so the "not found" message below can name the header location.
    set(RAY_SEARCH_HEADER_PATHS ${RAY_INCLUDE_DIRS})
    set(RAY_SEARCH_LIB_PATH ${RAY_LIBRARY_DIRS})
  endif()
else()
  set(RAY_HOME "$ENV{RAY_HOME}")

  set(RAY_SEARCH_HEADER_PATHS
    ${RAY_HOME}/include
    )

  set(RAY_SEARCH_LIB_PATH
    ${RAY_HOME}/lib
    )

  find_path(RAY_INCLUDE_DIR ray/gcs/client.h PATHS
    ${RAY_SEARCH_HEADER_PATHS}
    # make sure we don't accidentally pick up a different version
    NO_DEFAULT_PATH
    )
endif()

find_library(RAY_LIB_PATH NAMES ray
  PATHS
  ${RAY_SEARCH_LIB_PATH}
  NO_DEFAULT_PATH)

# RAY_LIBS is the directory that holds the library that was actually located.
# It is left empty when find_library() failed.
if (RAY_LIB_PATH)
  get_filename_component(RAY_LIBS "${RAY_LIB_PATH}" DIRECTORY)
else()
  set(RAY_LIBS "")
endif()

# pkg_check_modules() may already have set RAY_FOUND. The final verdict must
# depend on both the headers and the library being present, so it is always
# reassigned here instead of being left at a stale value.
if (RAY_INCLUDE_DIR AND RAY_LIBS)
  set(RAY_FOUND TRUE)
  set(RAY_LIB_NAME ray)
  set(RAY_STATIC_LIB ${RAY_LIBS}/lib${RAY_LIB_NAME}.a)
  set(RAY_SHARED_LIB ${RAY_LIBS}/lib${RAY_LIB_NAME}${CMAKE_SHARED_LIBRARY_SUFFIX})
else()
  set(RAY_FOUND FALSE)
endif()

if (RAY_FOUND)
  if (NOT Ray_FIND_QUIETLY)
    message(STATUS "Found the Ray core library: ${RAY_LIB_PATH}")
  endif()
else()
  set(RAY_ERR_MSG "Could not find the Ray library. Looked for headers")
  set(RAY_ERR_MSG "${RAY_ERR_MSG} in ${RAY_SEARCH_HEADER_PATHS}, and for libs")
  set(RAY_ERR_MSG "${RAY_ERR_MSG} in ${RAY_SEARCH_LIB_PATH}")
  # REQUIRED must abort configuration even when QUIET was requested; QUIET
  # only suppresses the informational message.
  if (Ray_FIND_REQUIRED)
    message(FATAL_ERROR "${RAY_ERR_MSG}")
  elseif (NOT Ray_FIND_QUIETLY)
    message(STATUS "${RAY_ERR_MSG}")
  endif()
endif()

mark_as_advanced(
  RAY_INCLUDE_DIR
  RAY_LIB_PATH
  RAY_STATIC_LIB
  RAY_SHARED_LIB
)
@@ -41,6 +41,7 @@ def start_global_scheduler(redis_address, node_ip_address,
"--track-origins=yes",
"--leak-check=full",
"--show-leak-kinds=all",
"--leak-check-heuristics=stdstring",
"--error-exitcode=1"] + command,
stdout=stdout_file, stderr=stderr_file)
time.sleep(1.0)
+1 -2
View File
@@ -78,11 +78,10 @@ target_link_libraries(common "${CMAKE_CURRENT_LIST_DIR}/thirdparty/hiredis/libhi
# define_test(<test_name> <library> [<extra_source>...])
#
# Creates the executable <test_name> from test/<test_name>.cc plus any extra
# source files given after the two named arguments (forwarded via ${ARGN}),
# and links it against `common`, the flatbuffers/ray/plasma/arrow libraries
# and <library> (which may be an empty string).
function(define_test test_name library)
add_executable(${test_name} test/${test_name}.cc ${ARGN})
# Build-ordering only: make sure the hiredis and flatbuffers_ep targets are
# built before the test; this call does not link anything.
add_dependencies(${test_name} hiredis flatbuffers_ep)
# NOTE(review): two link lines are present. The first one hard-codes
# ${ARROW_DIR}/cpp/build/release/libarrow.a, the second one uses
# ${ARROW_STATIC_LIB} together with ray_static and ${PLASMA_STATIC_LIB}.
# Confirm whether the first line is a leftover that should be removed.
target_link_libraries(${test_name} common ${FLATBUFFERS_STATIC_LIB} ${ARROW_DIR}/cpp/build/release/libarrow.a ${library} -lpthread)
target_link_libraries(${test_name} common ${FLATBUFFERS_STATIC_LIB} ray_static ${PLASMA_STATIC_LIB} ${ARROW_STATIC_LIB} ${library} -lpthread)
# All -D definitions are passed as one quoted argument.
# NOTE(review): confirm the generator in use splits this into separate flags.
target_compile_options(${test_name} PUBLIC "-DPLASMA_TEST -DLOCAL_SCHEDULER_TEST -DCOMMON_TEST -DRAY_COMMON_LOG_LEVEL=4")
endfunction()
define_test(common_tests "")
define_test(db_tests "")
define_test(io_tests "")
define_test(task_tests "")
-55
View File
@@ -10,63 +10,8 @@
#include "io.h"
#include <functional>
const UniqueID NIL_ID = UniqueID::nil();
const unsigned char NIL_DIGEST[DIGEST_SIZE] = {0};
/* Build a UniqueID whose UNIQUE_ID_SIZE bytes are read from /dev/urandom.
 *
 * @return The freshly generated ID.
 */
UniqueID globally_unique_id(void) {
  /* Use /dev/urandom for "real" randomness. */
  int const flags = 0 /* for Windows compatibility */;
  int fd = open("/dev/urandom", O_RDONLY, flags);
  if (fd == -1) {
    LOG_ERROR("Could not generate random number");
  }
  /* Fail the check on a bad descriptor instead of handing -1 to read_bytes()
   * and close() below. */
  CHECK(fd != -1);
  UniqueID result;
  CHECK(read_bytes(fd, &result.id[0], UNIQUE_ID_SIZE) >= 0);
  close(fd);
  return result;
}
/* ObjectID equality function: true when UNIQUE_ID_EQ reports that the two
 * IDs match. */
bool operator==(const ObjectID &x, const ObjectID &y) {
return UNIQUE_ID_EQ(x, y);
}
/* Named form of the ObjectID comparison; same UNIQUE_ID_EQ test as
 * operator==. */
bool ObjectID_equal(ObjectID first_id, ObjectID second_id) {
return UNIQUE_ID_EQ(first_id, second_id);
}
/* True when id compares equal to NIL_OBJECT_ID. */
bool ObjectID_is_nil(ObjectID id) {
return ObjectID_equal(id, NIL_OBJECT_ID);
}
/* True when the two db client IDs match under UNIQUE_ID_EQ. */
bool DBClientID_equal(DBClientID first_id, DBClientID second_id) {
return UNIQUE_ID_EQ(first_id, second_id);
}
/* True when id satisfies IS_NIL_ID. */
bool DBClientID_is_nil(DBClientID id) {
return IS_NIL_ID(id);
}
/* True when the two worker IDs match under UNIQUE_ID_EQ. */
bool WorkerID_equal(WorkerID first_id, WorkerID second_id) {
return UNIQUE_ID_EQ(first_id, second_id);
}
/* Render obj_id as a null-terminated, lowercase hexadecimal string.
 *
 * Two characters are written for each of the UNIQUE_ID_SIZE bytes of the ID,
 * followed by the terminating '\0'.
 *
 * @param obj_id The ID to render.
 * @param id_string Caller-owned output buffer.
 * @param id_length Size of id_string; checked to be at least ID_STRING_SIZE.
 * @return id_string.
 */
char *ObjectID_to_string(ObjectID obj_id, char *id_string, int id_length) {
  CHECK(id_length >= ID_STRING_SIZE);
  static const char digits[] = "0123456789abcdef";
  int pos = 0;
  for (int byte_index = 0; byte_index < UNIQUE_ID_SIZE; ++byte_index) {
    const unsigned int byte = obj_id.id[byte_index];
    /* High nibble first, then low nibble. */
    id_string[pos++] = digits[byte >> 4];
    id_string[pos++] = digits[byte & 0xf];
  }
  id_string[pos] = '\0';
  return id_string;
}
int64_t current_time_ms() {
std::chrono::milliseconds ms_since_epoch =
std::chrono::duration_cast<std::chrono::milliseconds>(
+4 -111
View File
@@ -22,8 +22,9 @@ extern "C" {
}
#endif
#include "plasma/common.h"
#include "arrow/util/macros.h"
#include "plasma/common.h"
#include "ray/id.h"
#include "state/ray_config.h"
@@ -113,118 +114,10 @@ extern "C" {
* and is responsible for freeing it. */
#define OWNER
/** Definitions for unique ID types. */
#define UNIQUE_ID_SIZE 20
#define UNIQUE_ID_EQ(id1, id2) (memcmp((id1).id, (id2).id, UNIQUE_ID_SIZE) == 0)
#define IS_NIL_ID(id) UNIQUE_ID_EQ(id, NIL_ID)
/* Fixed-size identifier made of UNIQUE_ID_SIZE raw bytes, convertible to and
 * from plasma::UniqueID. */
struct UniqueID {
/* Raw bytes of the identifier. */
unsigned char id[UNIQUE_ID_SIZE];
/* Construct from a plasma ID by copying UNIQUE_ID_SIZE bytes out of it. */
UniqueID(const plasma::UniqueID &from) {
memcpy(&id[0], from.data(), UNIQUE_ID_SIZE);
}
/* Default constructor; does not write to id. */
UniqueID() {}
/* Return the nil ID: every byte set to 255. */
static const UniqueID nil() {
UniqueID result;
std::fill_n(result.id, UNIQUE_ID_SIZE, 255);
return result;
}
/* Return a plasma::UniqueID holding a copy of this ID's bytes. */
plasma::UniqueID to_plasma_id() {
plasma::UniqueID result;
memcpy(result.mutable_data(), &id[0], UNIQUE_ID_SIZE);
return result;
}
};
extern const UniqueID NIL_ID;
/* Generate a globally unique ID. */
UniqueID globally_unique_id(void);
#define NIL_OBJECT_ID NIL_ID
#define NIL_WORKER_ID NIL_ID
/** The object ID is the type used to identify objects. */
typedef UniqueID ObjectID;
#ifdef __cplusplus
/* Hash functor for UniqueID, usable as the hasher of unordered containers. */
struct UniqueIDHasher {
/* ObjectID hashing function. */
/* The hash is the first sizeof(size_t) bytes of the ID, copied verbatim. */
size_t operator()(const UniqueID &id) const {
size_t result;
memcpy(&result, id.id, sizeof(size_t));
return result;
}
};
bool operator==(const ObjectID &x, const ObjectID &y);
#endif
#define ID_STRING_SIZE (2 * UNIQUE_ID_SIZE + 1)
/**
* Convert an object ID to a hexadecimal string. This function assumes that
* buffer points to an already allocated char array of size ID_STRING_SIZE. And
* it writes a null-terminated hex-formatted string to id_string.
*
* @param obj_id The object ID to convert to a string.
* @param id_string A buffer to write the string to. It is assumed that this is
* managed by the caller and is sufficiently long to store the object ID
* string.
* @param id_length The length of the id_string buffer.
*/
char *ObjectID_to_string(ObjectID obj_id, char *id_string, int id_length);
/**
* Compare two object IDs.
*
* @param first_id The first object ID to compare.
* @param second_id The second object ID to compare.
* @return True if the object IDs are the same and false otherwise.
*/
bool ObjectID_equal(ObjectID first_id, ObjectID second_id);
/**
* Compare an object ID to the nil ID.
*
* @param id The object ID to compare to nil.
* @return True if the object ID is equal to nil.
*/
bool ObjectID_is_nil(ObjectID id);
/** The worker ID is the ID of a worker or driver. */
typedef UniqueID WorkerID;
typedef ray::UniqueID WorkerID;
/**
* Compare two worker IDs.
*
* @param first_id The first worker ID to compare.
* @param second_id The second worker ID to compare.
* @return True if the worker IDs are the same and false otherwise.
*/
bool WorkerID_equal(WorkerID first_id, WorkerID second_id);
typedef UniqueID DBClientID;
/**
* Compare two db client IDs.
*
* @param first_id The first db client ID to compare.
* @param second_id The second db client ID to compare.
* @return True if the db client IDs are the same and false otherwise.
*/
bool DBClientID_equal(DBClientID first_id, DBClientID second_id);
/**
* Compare a db client ID to the nil ID.
*
* @param id The db client ID to compare to nil.
* @return True if the db client ID is equal to nil.
*/
bool DBClientID_is_nil(ObjectID id);
typedef ray::UniqueID DBClientID;
#define MAX(x, y) ((x) >= (y) ? (x) : (y))
#define MIN(x, y) ((x) <= (y) ? (x) : (y))
+11 -10
View File
@@ -2,21 +2,22 @@
flatbuffers::Offset<flatbuffers::String> to_flatbuf(
flatbuffers::FlatBufferBuilder &fbb,
ObjectID object_id) {
return fbb.CreateString((char *) &object_id.id[0], sizeof(object_id.id));
ray::ObjectID object_id) {
return fbb.CreateString(reinterpret_cast<const char *>(object_id.data()),
sizeof(ray::ObjectID));
}
ObjectID from_flatbuf(const flatbuffers::String &string) {
ObjectID object_id;
CHECK(string.size() == sizeof(object_id.id));
memcpy(&object_id.id[0], string.data(), sizeof(object_id.id));
ray::ObjectID from_flatbuf(const flatbuffers::String &string) {
ray::ObjectID object_id;
CHECK(string.size() == sizeof(ray::ObjectID));
memcpy(object_id.mutable_data(), string.data(), sizeof(ray::ObjectID));
return object_id;
}
const std::vector<ObjectID> from_flatbuf(
const std::vector<ray::ObjectID> from_flatbuf(
const flatbuffers::Vector<flatbuffers::Offset<flatbuffers::String>>
&vector) {
std::vector<ObjectID> object_ids;
std::vector<ray::ObjectID> object_ids;
for (int64_t i = 0; i < vector.Length(); i++) {
object_ids.push_back(from_flatbuf(*vector.Get(i)));
}
@@ -26,7 +27,7 @@ const std::vector<ObjectID> from_flatbuf(
flatbuffers::Offset<
flatbuffers::Vector<flatbuffers::Offset<flatbuffers::String>>>
to_flatbuf(flatbuffers::FlatBufferBuilder &fbb,
ObjectID object_ids[],
ray::ObjectID object_ids[],
int64_t num_objects) {
std::vector<flatbuffers::Offset<flatbuffers::String>> results;
for (int64_t i = 0; i < num_objects; i++) {
@@ -38,7 +39,7 @@ to_flatbuf(flatbuffers::FlatBufferBuilder &fbb,
flatbuffers::Offset<
flatbuffers::Vector<flatbuffers::Offset<flatbuffers::String>>>
to_flatbuf(flatbuffers::FlatBufferBuilder &fbb,
const std::vector<ObjectID> &object_ids) {
const std::vector<ray::ObjectID> &object_ids) {
std::vector<flatbuffers::Offset<flatbuffers::String>> results;
for (auto object_id : object_ids) {
results.push_back(to_flatbuf(fbb, object_id));
+5 -5
View File
@@ -16,19 +16,19 @@
/// @return The flatbuffer string containing the object ID.
flatbuffers::Offset<flatbuffers::String> to_flatbuf(
flatbuffers::FlatBufferBuilder &fbb,
ObjectID object_id);
ray::ObjectID object_id);
/// Convert a flatbuffer string to an object ID.
///
/// @param string The flatbuffer string.
/// @return The object ID.
ObjectID from_flatbuf(const flatbuffers::String &string);
ray::ObjectID from_flatbuf(const flatbuffers::String &string);
/// Convert a flatbuffer vector of strings to a vector of object IDs.
///
/// @param vector The flatbuffer vector.
/// @return The vector of object IDs.
const std::vector<ObjectID> from_flatbuf(
const std::vector<ray::ObjectID> from_flatbuf(
const flatbuffers::Vector<flatbuffers::Offset<flatbuffers::String>>
&vector);
@@ -41,7 +41,7 @@ const std::vector<ObjectID> from_flatbuf(
flatbuffers::Offset<
flatbuffers::Vector<flatbuffers::Offset<flatbuffers::String>>>
to_flatbuf(flatbuffers::FlatBufferBuilder &fbb,
ObjectID object_ids[],
ray::ObjectID object_ids[],
int64_t num_objects);
/// Convert a vector of object IDs to a flatbuffer vector of strings.
@@ -52,7 +52,7 @@ to_flatbuf(flatbuffers::FlatBufferBuilder &fbb,
flatbuffers::Offset<
flatbuffers::Vector<flatbuffers::Offset<flatbuffers::String>>>
to_flatbuf(flatbuffers::FlatBufferBuilder &fbb,
const std::vector<ObjectID> &object_ids);
const std::vector<ray::ObjectID> &object_ids);
/// Convert a flatbuffer string to a std::string.
///
+17 -19
View File
@@ -44,7 +44,8 @@ TaskBuilder *g_task_builder = NULL;
int PyStringToUniqueID(PyObject *object, ObjectID *object_id) {
if (PyBytes_Check(object)) {
memcpy(&object_id->id[0], PyBytes_AsString(object), UNIQUE_ID_SIZE);
std::memcpy(object_id->mutable_data(), PyBytes_AsString(object),
sizeof(*object_id));
return 1;
} else {
PyErr_SetString(PyExc_TypeError, "must be a 20 character string");
@@ -73,7 +74,7 @@ static int PyObjectID_init(PyObjectID *self, PyObject *args, PyObject *kwds) {
"ObjectID: object id string needs to have length 20");
return -1;
}
memcpy(&self->object_id.id[0], data, sizeof(self->object_id.id));
std::memcpy(self->object_id.mutable_data(), data, sizeof(self->object_id));
return 0;
}
@@ -132,15 +133,14 @@ PyObject *PyTask_to_string(PyObject *self, PyObject *args) {
static PyObject *PyObjectID_id(PyObject *self) {
PyObjectID *s = (PyObjectID *) self;
return PyBytes_FromStringAndSize((char *) &s->object_id.id[0],
sizeof(s->object_id.id));
return PyBytes_FromStringAndSize((const char *) s->object_id.data(),
sizeof(s->object_id));
}
static PyObject *PyObjectID_hex(PyObject *self) {
PyObjectID *s = (PyObjectID *) self;
char hex_id[ID_STRING_SIZE];
ObjectID_to_string(s->object_id, hex_id, ID_STRING_SIZE);
PyObject *result = PyUnicode_FromString(hex_id);
std::string hex_id = s->object_id.hex();
PyObject *result = PyUnicode_FromString(hex_id.c_str());
return result;
}
@@ -160,12 +160,10 @@ static PyObject *PyObjectID_richcompare(PyObjectID *self,
result = Py_NotImplemented;
break;
case Py_EQ:
result = ObjectID_equal(self->object_id, other_id->object_id) ? Py_True
: Py_False;
result = self->object_id == other_id->object_id ? Py_True : Py_False;
break;
case Py_NE:
result = !ObjectID_equal(self->object_id, other_id->object_id) ? Py_True
: Py_False;
result = !(self->object_id == other_id->object_id) ? Py_True : Py_False;
break;
case Py_GT:
result = Py_NotImplemented;
@@ -188,9 +186,11 @@ static PyObject *PyObjectID_redis_shard_hash(PyObjectID *self) {
}
static long PyObjectID_hash(PyObjectID *self) {
PyObject *tuple = PyTuple_New(UNIQUE_ID_SIZE);
for (int i = 0; i < UNIQUE_ID_SIZE; ++i) {
PyTuple_SetItem(tuple, i, PyLong_FromLong(self->object_id.id[i]));
// TODO(pcm): Replace this with a faster hash function. This currently
// creates a tuple of length 20 and hashes it, which is slow
PyObject *tuple = PyTuple_New(kUniqueIDSize);
for (int i = 0; i < kUniqueIDSize; ++i) {
PyTuple_SetItem(tuple, i, PyLong_FromLong(self->object_id.data()[i]));
}
long hash = PyObject_Hash(tuple);
Py_XDECREF(tuple);
@@ -198,9 +198,7 @@ static long PyObjectID_hash(PyObjectID *self) {
}
static PyObject *PyObjectID_repr(PyObjectID *self) {
char hex_id[ID_STRING_SIZE];
ObjectID_to_string(self->object_id, hex_id, ID_STRING_SIZE);
std::string repr = "ObjectID(" + std::string(hex_id) + ")";
std::string repr = "ObjectID(" + self->object_id.hex() + ")";
PyObject *result = PyUnicode_FromString(repr.c_str());
return result;
}
@@ -274,9 +272,9 @@ static int PyTask_init(PyTask *self, PyObject *args, PyObject *kwds) {
/* ID of the driver that this task originates from. */
UniqueID driver_id;
/* ID of the actor this task should run on. */
UniqueID actor_id = NIL_ACTOR_ID;
UniqueID actor_id = UniqueID::nil();
/* ID of the actor handle used to submit this task. */
UniqueID actor_handle_id = NIL_ACTOR_ID;
UniqueID actor_handle_id = UniqueID::nil();
/* How many tasks have been launched on the actor so far? */
int actor_counter = 0;
/* True if this is an actor checkpoint task and false otherwise. */
+6 -6
View File
@@ -9,7 +9,7 @@
#include "common.h"
typedef uint8_t TaskSpec;
typedef char TaskSpec;
class TaskBuilder;
extern PyObject *CommonError;
@@ -17,14 +17,14 @@ extern PyObject *CommonError;
// clang-format off
typedef struct {
PyObject_HEAD
ObjectID object_id;
ray::ObjectID object_id;
} PyObjectID;
typedef struct {
PyObject_HEAD
int64_t size;
TaskSpec *spec;
std::vector<ObjectID> *execution_dependencies;
std::vector<ray::ObjectID> *execution_dependencies;
} PyTask;
// clang-format on
@@ -41,11 +41,11 @@ void init_pickle_module(void);
extern TaskBuilder *g_task_builder;
int PyStringToUniqueID(PyObject *object, ObjectID *object_id);
int PyStringToUniqueID(PyObject *object, ray::ObjectID *object_id);
int PyObjectToUniqueID(PyObject *object, ObjectID *object_id);
int PyObjectToUniqueID(PyObject *object, ray::ObjectID *object_id);
PyObject *PyObjectID_make(ObjectID object_id);
PyObject *PyObjectID_make(ray::ObjectID object_id);
PyObject *check_simple_value(PyObject *self, PyObject *args);
+2 -2
View File
@@ -50,7 +50,7 @@ void RayLogger_log(RayLogger *logger,
if (log_level < logger->log_level) {
return;
}
if (log_level < RAY_DEBUG || log_level > RAY_FATAL) {
if (log_level < RAY_LOG_DEBUG || log_level > RAY_LOG_FATAL) {
return;
}
struct timeval tv;
@@ -79,7 +79,7 @@ void RayLogger_log(RayLogger *logger,
int status =
redisAsyncCommand(context, NULL, NULL, formatted_message,
(char *) db->client.id, sizeof(db->client.id));
(char *) db->client.data(), sizeof(db->client));
if ((status == REDIS_ERR) || context->err) {
LOG_REDIS_DEBUG(context, "error while logging message to log table");
}
+6 -6
View File
@@ -1,12 +1,12 @@
#ifndef LOGGING_H
#define LOGGING_H
#define RAY_VERBOSE -1
#define RAY_DEBUG 0
#define RAY_INFO 1
#define RAY_WARNING 2
#define RAY_ERROR 3
#define RAY_FATAL 4
#define RAY_LOG_VERBOSE -1
#define RAY_LOG_DEBUG 0
#define RAY_LOG_INFO 1
#define RAY_LOG_WARNING 2
#define RAY_LOG_ERROR 3
#define RAY_LOG_FATAL 4
/* Entity types. */
#define RAY_FUNCTION "FUNCTION"
+1 -1
View File
@@ -12,7 +12,7 @@ void actor_notification_table_subscribe(
sub_data->subscribe_callback = subscribe_callback;
sub_data->subscribe_context = subscribe_context;
init_table_callback(db_handle, NIL_ID, __func__,
init_table_callback(db_handle, UniqueID::nil(), __func__,
new CommonCallbackData(sub_data), retry, NULL,
redis_actor_notification_table_subscribe, NULL);
}
+4 -3
View File
@@ -24,7 +24,7 @@ void db_client_table_subscribe(
sub_data->subscribe_callback = subscribe_callback;
sub_data->subscribe_context = subscribe_context;
init_table_callback(db_handle, NIL_ID, __func__,
init_table_callback(db_handle, UniqueID::nil(), __func__,
new CommonCallbackData(sub_data), retry,
(table_done_callback) done_callback,
redis_db_client_table_subscribe, user_context);
@@ -71,7 +71,7 @@ void db_client_table_cache_init(DBHandle *db_handle) {
}
DBClient db_client_table_cache_get(DBHandle *db_handle, DBClientID client_id) {
CHECK(!DBClientID_is_nil(client_id));
CHECK(!client_id.is_nil());
return redis_cache_get_db_client(db_handle, client_id);
}
@@ -82,7 +82,8 @@ void plasma_manager_send_heartbeat(DBHandle *db_handle) {
RayConfig::instance().heartbeat_timeout_milliseconds();
heartbeat_retry.fail_callback = NULL;
init_table_callback(db_handle, NIL_ID, __func__, new CommonCallbackData(NULL),
init_table_callback(db_handle, UniqueID::nil(), __func__,
new CommonCallbackData(NULL),
(RetryInfo *) &heartbeat_retry, NULL,
redis_plasma_manager_send_heartbeat, NULL);
}
+1 -1
View File
@@ -9,7 +9,7 @@ void driver_table_subscribe(DBHandle *db_handle,
(DriverTableSubscribeData *) malloc(sizeof(DriverTableSubscribeData));
sub_data->subscribe_callback = subscribe_callback;
sub_data->subscribe_context = subscribe_context;
init_table_callback(db_handle, NIL_ID, __func__,
init_table_callback(db_handle, UniqueID::nil(), __func__,
new CommonCallbackData(sub_data), retry, NULL,
redis_driver_table_subscribe, NULL);
}
+7 -6
View File
@@ -13,7 +13,7 @@ void push_error(DBHandle *db_handle,
DBClientID driver_id,
int error_index,
size_t data_length,
unsigned char *data) {
const unsigned char *data) {
CHECK(error_index >= 0 && error_index < MAX_ERROR_INDEX);
/* Allocate a struct to hold the error information. */
ErrorInfo *info = (ErrorInfo *) malloc(sizeof(ErrorInfo) + data_length);
@@ -22,10 +22,11 @@ void push_error(DBHandle *db_handle,
info->data_length = data_length;
memcpy(info->data, data, data_length);
/* Generate a random key to identify this error message. */
CHECK(sizeof(info->error_key) >= UNIQUE_ID_SIZE);
UniqueID error_key = globally_unique_id();
memcpy(info->error_key, error_key.id, sizeof(info->error_key));
CHECK(sizeof(info->error_key) >= sizeof(UniqueID));
UniqueID error_key = UniqueID::from_random();
memcpy(info->error_key, error_key.data(), sizeof(info->error_key));
init_table_callback(db_handle, NIL_ID, __func__, new CommonCallbackData(info),
NULL, NULL, redis_push_error, NULL);
init_table_callback(db_handle, UniqueID::nil(), __func__,
new CommonCallbackData(info), NULL, NULL,
redis_push_error, NULL);
}
+1 -1
View File
@@ -48,6 +48,6 @@ void push_error(DBHandle *db_handle,
DBClientID driver_id,
int error_index,
size_t data_length,
unsigned char *data);
const unsigned char *data);
#endif
+4 -3
View File
@@ -13,7 +13,7 @@ void local_scheduler_table_subscribe(
sub_data->subscribe_callback = subscribe_callback;
sub_data->subscribe_context = subscribe_context;
init_table_callback(db_handle, NIL_ID, __func__,
init_table_callback(db_handle, UniqueID::nil(), __func__,
new CommonCallbackData(sub_data), retry, NULL,
redis_local_scheduler_table_subscribe, NULL);
}
@@ -37,8 +37,9 @@ void local_scheduler_table_send_info(DBHandle *db_handle,
data->size = fbb.GetSize();
memcpy(&data->flatbuffer_data[0], fbb.GetBufferPointer(), fbb.GetSize());
init_table_callback(db_handle, NIL_ID, __func__, new CommonCallbackData(data),
retry, NULL, redis_local_scheduler_table_send_info, NULL);
init_table_callback(db_handle, UniqueID::nil(), __func__,
new CommonCallbackData(data), retry, NULL,
redis_local_scheduler_table_send_info, NULL);
}
void local_scheduler_table_disconnect(DBHandle *db_handle) {
+3 -3
View File
@@ -67,7 +67,7 @@ void object_table_subscribe_to_notifications(
sub_data->subscribe_all = subscribe_all;
init_table_callback(
db_handle, NIL_OBJECT_ID, __func__, new CommonCallbackData(sub_data),
db_handle, ObjectID::nil(), __func__, new CommonCallbackData(sub_data),
retry, (table_done_callback) done_callback,
redis_object_table_subscribe_to_notifications, user_context);
}
@@ -85,7 +85,7 @@ void object_table_request_notifications(DBHandle *db_handle,
data->num_object_ids = num_object_ids;
memcpy(data->object_ids, object_ids, num_object_ids * sizeof(ObjectID));
init_table_callback(db_handle, NIL_OBJECT_ID, __func__,
init_table_callback(db_handle, ObjectID::nil(), __func__,
new CommonCallbackData(data), retry, NULL,
redis_object_table_request_notifications, NULL);
}
@@ -101,7 +101,7 @@ void object_info_subscribe(DBHandle *db_handle,
sub_data->subscribe_callback = subscribe_callback;
sub_data->subscribe_context = subscribe_context;
init_table_callback(db_handle, NIL_OBJECT_ID, __func__,
init_table_callback(db_handle, ObjectID::nil(), __func__,
new CommonCallbackData(sub_data), retry,
(table_done_callback) done_callback,
redis_object_info_subscribe, user_context);
+59 -57
View File
@@ -207,8 +207,8 @@ void db_connect_shard(const std::string &db_address,
argv[0] = "RAY.CONNECT";
argvlen[0] = strlen(argv[0]);
/* Set the client ID argument. */
argv[1] = (char *) client.id;
argvlen[1] = sizeof(client.id);
argv[1] = (char *) client.data();
argvlen[1] = sizeof(client);
/* Set the node IP address argument. */
argv[2] = node_ip_address;
argvlen[2] = strlen(node_ip_address);
@@ -265,7 +265,7 @@ DBHandle *db_connect(const std::string &db_primary_address,
}
/* Create a client ID for this client. */
DBClientID client = globally_unique_id();
DBClientID client = DBClientID::from_random();
DBHandle *db = new DBHandle();
@@ -325,7 +325,7 @@ void db_disconnect(DBHandle *db) {
* reconnect and get assigned a different client ID. */
redisReply *reply =
(redisReply *) redisCommand(db->sync_context, "RAY.DISCONNECT %b",
db->client.id, sizeof(db->client.id));
db->client.data(), sizeof(db->client));
CHECK(reply->type != REDIS_REPLY_ERROR);
CHECKM(strcmp(reply->str, "OK") == 0, "reply->str is %s", reply->str);
freeReplyObject(reply);
@@ -408,8 +408,8 @@ void redis_object_table_add(TableCallbackData *callback_data) {
int status = redisAsyncCommand(
context, redis_object_table_add_callback,
(void *) callback_data->timer_id, "RAY.OBJECT_TABLE_ADD %b %lld %b %b",
obj_id.id, sizeof(obj_id.id), (long long) object_size, digest,
(size_t) DIGEST_SIZE, db->client.id, sizeof(db->client.id));
obj_id.data(), sizeof(obj_id), (long long) object_size, digest,
(size_t) DIGEST_SIZE, db->client.data(), sizeof(db->client));
if ((status == REDIS_ERR) || context->err) {
LOG_REDIS_DEBUG(context, "error in redis_object_table_add");
@@ -456,7 +456,7 @@ void redis_object_table_remove(TableCallbackData *callback_data) {
int status = redisAsyncCommand(
context, redis_object_table_remove_callback,
(void *) callback_data->timer_id, "RAY.OBJECT_TABLE_REMOVE %b %b",
obj_id.id, sizeof(obj_id.id), client_id->id, sizeof(client_id->id));
obj_id.data(), sizeof(obj_id), client_id->data(), sizeof(*client_id));
if ((status == REDIS_ERR) || context->err) {
LOG_REDIS_DEBUG(context, "error in redis_object_table_remove");
@@ -473,8 +473,8 @@ void redis_object_table_lookup(TableCallbackData *callback_data) {
int status = redisAsyncCommand(context, redis_object_table_lookup_callback,
(void *) callback_data->timer_id,
"RAY.OBJECT_TABLE_LOOKUP %b", obj_id.id,
sizeof(obj_id.id));
"RAY.OBJECT_TABLE_LOOKUP %b", obj_id.data(),
sizeof(obj_id));
if ((status == REDIS_ERR) || context->err) {
LOG_REDIS_DEBUG(context, "error in object_table lookup");
}
@@ -508,10 +508,11 @@ void redis_result_table_add(TableCallbackData *callback_data) {
redisAsyncContext *context = get_redis_context(db, id);
/* Add the result entry to the result table. */
int status = redisAsyncCommand(
context, redis_result_table_add_callback,
(void *) callback_data->timer_id, "RAY.RESULT_TABLE_ADD %b %b %d", id.id,
sizeof(id.id), info->task_id.id, sizeof(info->task_id.id), is_put);
int status =
redisAsyncCommand(context, redis_result_table_add_callback,
(void *) callback_data->timer_id,
"RAY.RESULT_TABLE_ADD %b %b %d", id.data(), sizeof(id),
info->task_id.data(), sizeof(info->task_id), is_put);
if ((status == REDIS_ERR) || context->err) {
LOG_REDIS_DEBUG(context, "Error in result table add");
}
@@ -554,7 +555,7 @@ void redis_result_table_lookup_callback(redisAsyncContext *c,
"Unexpected reply type %d in redis_result_table_lookup_callback",
reply->type);
/* Parse the task from the reply. */
TaskID result_id = NIL_TASK_ID;
TaskID result_id = TaskID::nil();
bool is_put = false;
if (reply->type == REDIS_REPLY_STRING) {
auto message = flatbuffers::GetRoot<ResultTableReply>(reply->str);
@@ -581,14 +582,14 @@ void redis_result_table_lookup(TableCallbackData *callback_data) {
int status =
redisAsyncCommand(context, redis_result_table_lookup_callback,
(void *) callback_data->timer_id,
"RAY.RESULT_TABLE_LOOKUP %b", id.id, sizeof(id.id));
"RAY.RESULT_TABLE_LOOKUP %b", id.data(), sizeof(id));
if ((status == REDIS_ERR) || context->err) {
LOG_REDIS_DEBUG(context, "Error in result table lookup");
}
}
DBClient redis_db_client_table_get(DBHandle *db,
unsigned char *client_id,
const unsigned char *client_id,
size_t client_id_len) {
redisReply *reply =
(redisReply *) redisCommand(db->sync_context, "HGETALL %s%b",
@@ -602,7 +603,7 @@ DBClient redis_db_client_table_get(DBHandle *db,
const char *key = reply->element[j]->str;
const char *value = reply->element[j + 1]->str;
if (strcmp(key, "ray_client_id") == 0) {
memcpy(db_client.id.id, value, sizeof(db_client.id));
memcpy(db_client.id.mutable_data(), value, sizeof(db_client.id));
num_fields++;
} else if (strcmp(key, "client_type") == 0) {
db_client.client_type = std::string(value);
@@ -637,8 +638,8 @@ void redis_cache_set_db_client(DBHandle *db, DBClient client) {
DBClient redis_cache_get_db_client(DBHandle *db, DBClientID db_client_id) {
auto it = db->db_client_cache.find(db_client_id);
if (it == db->db_client_cache.end()) {
DBClient db_client =
redis_db_client_table_get(db, db_client_id.id, sizeof(db_client_id.id));
DBClient db_client = redis_db_client_table_get(db, db_client_id.data(),
sizeof(db_client_id));
db->db_client_cache[db_client_id] = db_client;
it = db->db_client_cache.find(db_client_id);
}
@@ -672,7 +673,8 @@ void redis_object_table_lookup_callback(redisAsyncContext *c,
for (size_t j = 0; j < reply->elements; ++j) {
CHECK(reply->element[j]->type == REDIS_REPLY_STRING);
DBClientID manager_id;
memcpy(manager_id.id, reply->element[j]->str, sizeof(manager_id.id));
memcpy(manager_id.mutable_data(), reply->element[j]->str,
sizeof(manager_id));
manager_ids.push_back(manager_id);
}
@@ -743,7 +745,7 @@ void object_table_redis_subscribe_to_notifications_callback(
if (callback_data->done_callback != NULL) {
object_table_lookup_done_callback done_callback =
(object_table_lookup_done_callback) callback_data->done_callback;
done_callback(NIL_ID, false, std::vector<DBClientID>(),
done_callback(ray::UniqueID::nil(), false, std::vector<DBClientID>(),
callback_data->user_context);
}
/* If the initial SUBSCRIBE was successful, clean up the timer, but don't
@@ -783,7 +785,7 @@ void redis_object_table_subscribe_to_notifications(
db->subscribe_contexts[i],
object_table_redis_subscribe_to_notifications_callback,
(void *) callback_data->timer_id, "SUBSCRIBE %s%b",
object_channel_prefix, db->client.id, sizeof(db->client.id));
object_channel_prefix, db->client.data(), sizeof(db->client));
}
if ((status == REDIS_ERR) || db->subscribe_contexts[i]->err) {
@@ -827,11 +829,11 @@ void redis_object_table_request_notifications(
argv[0] = "RAY.OBJECT_TABLE_REQUEST_NOTIFICATIONS";
argvlen[0] = strlen(argv[0]);
/* Set the client ID argument. */
argv[1] = (char *) db->client.id;
argvlen[1] = sizeof(db->client.id);
argv[1] = (char *) db->client.data();
argvlen[1] = sizeof(db->client);
/* Set the object ID arguments. */
argv[2] = (char *) object_ids[i].id;
argvlen[2] = sizeof(object_ids[i].id);
argv[2] = (char *) object_ids[i].data();
argvlen[2] = sizeof(object_ids[i]);
int status = redisAsyncCommandArgv(
context, redis_object_table_request_notifications_callback,
@@ -881,8 +883,8 @@ void redis_task_table_get_task(TableCallbackData *callback_data) {
int status = redisAsyncCommand(context, redis_task_table_get_task_callback,
(void *) callback_data->timer_id,
"RAY.TASK_TABLE_GET %b", task_id.id,
sizeof(task_id.id));
"RAY.TASK_TABLE_GET %b", task_id.data(),
sizeof(task_id));
if ((status == REDIS_ERR) || context->err) {
LOG_REDIS_DEBUG(context, "error in redis_task_table_get_task");
}
@@ -942,8 +944,8 @@ void redis_task_table_add_task(TableCallbackData *callback_data) {
int status = redisAsyncCommand(
context, redis_task_table_add_task_callback,
(void *) callback_data->timer_id, "RAY.TASK_TABLE_ADD %b %d %b %b %b",
task_id.id, sizeof(task_id.id), state, local_scheduler_id.id,
sizeof(local_scheduler_id.id), fbb.GetBufferPointer(),
task_id.data(), sizeof(task_id), state, local_scheduler_id.data(),
sizeof(local_scheduler_id), fbb.GetBufferPointer(),
(size_t) fbb.GetSize(), spec, execution_spec->SpecSize());
if ((status == REDIS_ERR) || context->err) {
LOG_REDIS_DEBUG(context, "error in redis_task_table_add_task");
@@ -1004,8 +1006,8 @@ void redis_task_table_update(TableCallbackData *callback_data) {
int status = redisAsyncCommand(
context, redis_task_table_update_callback,
(void *) callback_data->timer_id, "RAY.TASK_TABLE_UPDATE %b %d %b %b",
task_id.id, sizeof(task_id.id), state, local_scheduler_id.id,
sizeof(local_scheduler_id.id), fbb.GetBufferPointer(),
task_id.data(), sizeof(task_id), state, local_scheduler_id.data(),
sizeof(local_scheduler_id), fbb.GetBufferPointer(),
(size_t) fbb.GetSize());
if ((status == REDIS_ERR) || context->err) {
LOG_REDIS_DEBUG(context, "error in redis_task_table_update");
@@ -1055,24 +1057,24 @@ void redis_task_table_test_and_update(TableCallbackData *callback_data) {
int status;
/* If the test local scheduler ID is NIL, then ignore it. */
if (IS_NIL_ID(update_data->test_local_scheduler_id)) {
if (update_data->test_local_scheduler_id.is_nil()) {
status = redisAsyncCommand(
context, redis_task_table_test_and_update_callback,
(void *) callback_data->timer_id,
"RAY.TASK_TABLE_TEST_AND_UPDATE %b %d %d %b", task_id.id,
sizeof(task_id.id), update_data->test_state_bitmask,
update_data->update_state, update_data->local_scheduler_id.id,
sizeof(update_data->local_scheduler_id.id));
"RAY.TASK_TABLE_TEST_AND_UPDATE %b %d %d %b", task_id.data(),
sizeof(task_id), update_data->test_state_bitmask,
update_data->update_state, update_data->local_scheduler_id.data(),
sizeof(update_data->local_scheduler_id));
} else {
status = redisAsyncCommand(
context, redis_task_table_test_and_update_callback,
(void *) callback_data->timer_id,
"RAY.TASK_TABLE_TEST_AND_UPDATE %b %d %d %b %b", task_id.id,
sizeof(task_id.id), update_data->test_state_bitmask,
update_data->update_state, update_data->local_scheduler_id.id,
sizeof(update_data->local_scheduler_id.id),
update_data->test_local_scheduler_id.id,
sizeof(update_data->test_local_scheduler_id.id));
"RAY.TASK_TABLE_TEST_AND_UPDATE %b %d %d %b %b", task_id.data(),
sizeof(task_id), update_data->test_state_bitmask,
update_data->update_state, update_data->local_scheduler_id.data(),
sizeof(update_data->local_scheduler_id),
update_data->test_local_scheduler_id.data(),
sizeof(update_data->test_local_scheduler_id));
}
if ((status == REDIS_ERR) || context->err) {
@@ -1152,7 +1154,7 @@ void redis_task_table_subscribe(TableCallbackData *callback_data) {
const char *TASK_CHANNEL_PREFIX = "TT:";
for (auto subscribe_context : db->subscribe_contexts) {
int status;
if (IS_NIL_ID(data->local_scheduler_id)) {
if (data->local_scheduler_id.is_nil()) {
/* TODO(swang): Implement the state_filter by translating the bitmask into
* a Redis key-matching pattern. */
status = redisAsyncCommand(
@@ -1164,8 +1166,8 @@ void redis_task_table_subscribe(TableCallbackData *callback_data) {
status = redisAsyncCommand(
subscribe_context, redis_task_table_subscribe_callback,
(void *) callback_data->timer_id, "SUBSCRIBE %s%b:%d",
TASK_CHANNEL_PREFIX, (char *) local_scheduler_id.id,
sizeof(local_scheduler_id.id), data->state_filter);
TASK_CHANNEL_PREFIX, (char *) local_scheduler_id.data(),
sizeof(local_scheduler_id), data->state_filter);
}
if ((status == REDIS_ERR) || subscribe_context->err) {
LOG_REDIS_DEBUG(subscribe_context, "error in redis_task_table_subscribe");
@@ -1201,7 +1203,7 @@ void redis_db_client_table_remove(TableCallbackData *callback_data) {
int status =
redisAsyncCommand(db->context, redis_db_client_table_remove_callback,
(void *) callback_data->timer_id, "RAY.DISCONNECT %b",
callback_data->id.id, sizeof(callback_data->id.id));
callback_data->id.data(), sizeof(callback_data->id));
if ((status == REDIS_ERR) || db->context->err) {
LOG_REDIS_DEBUG(db->context, "error in db_client_table_remove");
}
@@ -1512,7 +1514,7 @@ void redis_plasma_manager_send_heartbeat(TableCallbackData *callback_data) {
* memory for callback data each time. */
int status = redisAsyncCommand(
db->context, NULL, (void *) callback_data->timer_id,
"PUBLISH plasma_managers %b", db->client.id, sizeof(db->client.id));
"PUBLISH plasma_managers %b", db->client.data(), sizeof(db->client));
if ((status == REDIS_ERR) || db->context->err) {
LOG_REDIS_DEBUG(db->context,
"error in redis_plasma_manager_send_heartbeat");
@@ -1598,7 +1600,7 @@ void redis_actor_notification_table_subscribe(
void redis_actor_table_mark_removed(DBHandle *db, ActorID actor_id) {
int status =
redisAsyncCommand(db->context, NULL, NULL, "HSET Actor:%b removed \"1\"",
actor_id.id, sizeof(actor_id.id));
actor_id.data(), sizeof(actor_id));
if ((status == REDIS_ERR) || db->subscribe_context->err) {
LOG_REDIS_DEBUG(db->context, "error in redis_actor_table_mark_removed");
}
@@ -1633,7 +1635,7 @@ void redis_object_info_subscribe_callback(redisAsyncContext *c,
ObjectInfoSubscribeData *data =
(ObjectInfoSubscribeData *) callback_data->data->Get();
ObjectID object_id;
memcpy(object_id.id, payload->str, sizeof(object_id.id));
memcpy(object_id.mutable_data(), payload->str, sizeof(object_id));
/* payload->str should have the format: "ObjectID:object_size_int" */
LOG_DEBUG("obj:info channel received message <%s>", payload->str);
if (data->subscribe_callback) {
@@ -1676,11 +1678,11 @@ void redis_push_error_hmset_callback(redisAsyncContext *c,
/* Add the error to this driver's list of errors. */
ErrorInfo *info = (ErrorInfo *) callback_data->data->Get();
int status = redisAsyncCommand(db->context, redis_push_error_rpush_callback,
(void *) callback_data->timer_id,
"RPUSH ErrorKeys Error:%b:%b",
info->driver_id.id, sizeof(info->driver_id.id),
info->error_key, sizeof(info->error_key));
int status = redisAsyncCommand(
db->context, redis_push_error_rpush_callback,
(void *) callback_data->timer_id, "RPUSH ErrorKeys Error:%b:%b",
info->driver_id.data(), sizeof(info->driver_id), info->error_key,
sizeof(info->error_key));
if ((status == REDIS_ERR) || db->subscribe_context->err) {
LOG_REDIS_DEBUG(db->subscribe_context, "error in redis_push_error rpush");
}
@@ -1698,8 +1700,8 @@ void redis_push_error(TableCallbackData *callback_data) {
int status = redisAsyncCommand(
db->context, redis_push_error_hmset_callback,
(void *) callback_data->timer_id,
"HMSET Error:%b:%b type %s message %s data %b", info->driver_id.id,
sizeof(info->driver_id.id), info->error_key, sizeof(info->error_key),
"HMSET Error:%b:%b type %s message %s data %b", info->driver_id.data(),
sizeof(info->driver_id), info->error_key, sizeof(info->error_key),
error_type, error_message, info->data, info->data_length);
if ((status == REDIS_ERR) || db->subscribe_context->err) {
LOG_REDIS_DEBUG(db->subscribe_context, "error in redis_push_error hmset");
+13 -13
View File
@@ -145,23 +145,23 @@ void free_task_builder(TaskBuilder *builder) {
}
bool TaskID_equal(TaskID first_id, TaskID second_id) {
return UNIQUE_ID_EQ(first_id, second_id);
return first_id == second_id;
}
bool TaskID_is_nil(TaskID id) {
return TaskID_equal(id, NIL_TASK_ID);
return id.is_nil();
}
bool ActorID_equal(ActorID first_id, ActorID second_id) {
return UNIQUE_ID_EQ(first_id, second_id);
return first_id == second_id;
}
bool FunctionID_equal(FunctionID first_id, FunctionID second_id) {
return UNIQUE_ID_EQ(first_id, second_id);
return first_id == second_id;
}
bool FunctionID_is_nil(FunctionID id) {
return FunctionID_equal(id, NIL_FUNCTION_ID);
return id.is_nil();
}
/* Functions for building tasks. */
@@ -181,8 +181,8 @@ void TaskSpec_start_construct(TaskBuilder *builder,
function_id, num_returns);
}
uint8_t *TaskSpec_finish_construct(TaskBuilder *builder, int64_t *size) {
return builder->Finish(size);
TaskSpec *TaskSpec_finish_construct(TaskBuilder *builder, int64_t *size) {
return reinterpret_cast<TaskSpec *>(builder->Finish(size));
}
void TaskSpec_args_add_ref(TaskBuilder *builder,
@@ -205,7 +205,7 @@ void TaskSpec_set_required_resource(TaskBuilder *builder,
/* Functions for reading tasks. */
TaskID TaskSpec_task_id(TaskSpec *spec) {
TaskID TaskSpec_task_id(const TaskSpec *spec) {
CHECK(spec);
auto message = flatbuffers::GetRoot<TaskInfo>(spec);
return from_flatbuf(*message->task_id());
@@ -230,7 +230,7 @@ ActorID TaskSpec_actor_handle_id(TaskSpec *spec) {
}
bool TaskSpec_is_actor_task(TaskSpec *spec) {
return !ActorID_equal(TaskSpec_actor_id(spec), NIL_ACTOR_ID);
return !TaskSpec_actor_id(spec).is_nil();
}
int64_t TaskSpec_actor_counter(TaskSpec *spec) {
@@ -258,13 +258,13 @@ bool TaskSpec_arg_is_actor_dummy_object(TaskSpec *spec, int64_t arg_index) {
}
}
UniqueID TaskSpec_driver_id(TaskSpec *spec) {
UniqueID TaskSpec_driver_id(const TaskSpec *spec) {
CHECK(spec);
auto message = flatbuffers::GetRoot<TaskInfo>(spec);
return from_flatbuf(*message->driver_id());
}
TaskID TaskSpec_parent_task_id(TaskSpec *spec) {
TaskID TaskSpec_parent_task_id(const TaskSpec *spec) {
CHECK(spec);
auto message = flatbuffers::GetRoot<TaskInfo>(spec);
return from_flatbuf(*message->parent_task_id());
@@ -451,14 +451,14 @@ bool TaskExecutionSpec::DependsOn(ObjectID object_id) {
int count = TaskSpec_arg_id_count(spec, i);
for (int j = 0; j < count; j++) {
ObjectID arg_id = TaskSpec_arg_id(spec, i, j);
if (ObjectID_equal(arg_id, object_id)) {
if (arg_id == object_id) {
return true;
}
}
}
// Iterate through the execution dependencies to see if it contains object_id.
for (auto dependency_id : execution_dependencies_) {
if (ObjectID_equal(dependency_id, object_id)) {
if (dependency_id == object_id) {
return true;
}
}
+7 -9
View File
@@ -11,7 +11,9 @@
#include "format/common_generated.h"
typedef uint8_t TaskSpec;
using namespace ray;
typedef char TaskSpec;
class TaskExecutionSpec {
public:
@@ -82,10 +84,6 @@ class TaskExecutionSpec {
class TaskBuilder;
#define NIL_TASK_ID NIL_ID
#define NIL_ACTOR_ID NIL_ID
#define NIL_FUNCTION_ID NIL_ID
typedef UniqueID FunctionID;
/** The task ID is a deterministic hash of the function ID that the task
@@ -189,7 +187,7 @@ void TaskSpec_start_construct(TaskBuilder *B,
* @param spec The task spec whose ID and return object IDs should be computed.
* @return Void.
*/
uint8_t *TaskSpec_finish_construct(TaskBuilder *builder, int64_t *size);
TaskSpec *TaskSpec_finish_construct(TaskBuilder *builder, int64_t *size);
/**
* Return the function ID of the task.
@@ -256,7 +254,7 @@ bool TaskSpec_arg_is_actor_dummy_object(TaskSpec *spec, int64_t arg_index);
* @param spec The task_spec in question.
* @return The driver ID of the task.
*/
UniqueID TaskSpec_driver_id(TaskSpec *spec);
UniqueID TaskSpec_driver_id(const TaskSpec *spec);
/**
* Return the task ID of the parent task.
@@ -264,7 +262,7 @@ UniqueID TaskSpec_driver_id(TaskSpec *spec);
* @param spec The task_spec in question.
* @return The task ID of the parent task.
*/
TaskID TaskSpec_parent_task_id(TaskSpec *spec);
TaskID TaskSpec_parent_task_id(const TaskSpec *spec);
/**
* Return the task counter of the parent task. For example, this equals 5 if
@@ -281,7 +279,7 @@ int64_t TaskSpec_parent_counter(TaskSpec *spec);
* @param spec The task_spec in question.
* @return The task ID of the task.
*/
TaskID TaskSpec_task_id(TaskSpec *spec);
TaskID TaskSpec_task_id(const TaskSpec *spec);
/**
* Get the number of arguments to this task.
-24
View File
@@ -1,24 +0,0 @@
#include "greatest.h"
#include "common.h"
SUITE(common_tests);
TEST sha1_test(void) {
static char hex[ID_STRING_SIZE];
UniqueID uid = globally_unique_id();
ObjectID_to_string((ObjectID) uid, &hex[0], ID_STRING_SIZE);
PASS();
}
SUITE(common_tests) {
RUN_TEST(sha1_test);
}
GREATEST_MAIN_DEFS();
int main(int argc, char **argv) {
GREATEST_MAIN_BEGIN();
RUN_SUITE(common_tests);
GREATEST_MAIN_END();
}
+8 -6
View File
@@ -81,7 +81,7 @@ TEST object_table_lookup_test(void) {
manager_addr, db_connect_args2);
db_attach(db1, loop, false);
db_attach(db2, loop, false);
UniqueID id = globally_unique_id();
UniqueID id = UniqueID::from_random();
RetryInfo retry = {
.num_retries = NUM_RETRIES,
.timeout = TIMEOUT,
@@ -149,7 +149,7 @@ TEST task_table_test(void) {
DBHandle *db = db_connect(std::string("127.0.0.1"), 6379, "local_scheduler",
"127.0.0.1", std::vector<std::string>());
db_attach(db, loop, false);
DBClientID local_scheduler_id = globally_unique_id();
DBClientID local_scheduler_id = DBClientID::from_random();
TaskExecutionSpec spec = example_task_execution_spec(1, 1);
task_table_test_task =
Task_alloc(spec, TASK_STATUS_SCHEDULED, local_scheduler_id);
@@ -185,12 +185,14 @@ TEST task_table_all_test(void) {
db_attach(db, loop, false);
TaskExecutionSpec spec = example_task_execution_spec(1, 1);
/* Schedule two tasks on different local schedulers. */
Task *task1 = Task_alloc(spec, TASK_STATUS_SCHEDULED, globally_unique_id());
Task *task2 = Task_alloc(spec, TASK_STATUS_SCHEDULED, globally_unique_id());
Task *task1 =
Task_alloc(spec, TASK_STATUS_SCHEDULED, DBClientID::from_random());
Task *task2 =
Task_alloc(spec, TASK_STATUS_SCHEDULED, DBClientID::from_random());
RetryInfo retry = {
.num_retries = NUM_RETRIES, .timeout = TIMEOUT, .fail_callback = NULL,
};
task_table_subscribe(db, NIL_ID, TASK_STATUS_SCHEDULED,
task_table_subscribe(db, UniqueID::nil(), TASK_STATUS_SCHEDULED,
task_table_all_test_callback, NULL, &retry, NULL, NULL);
event_loop_add_timer(loop, 50, (event_loop_timer_handler) timeout_handler,
NULL);
@@ -221,7 +223,7 @@ TEST unique_client_id_test(void) {
}
for (int i = 0; i < num_conns; ++i) {
for (int j = 0; j < i; ++j) {
ASSERT(!DBClientID_equal(ids[i], ids[j]));
ASSERT(!(ids[i] == ids[j]));
}
}
PASS();
+8 -8
View File
@@ -11,15 +11,15 @@ static inline TaskExecutionSpec example_task_execution_spec_with_args(
int64_t num_args,
int64_t num_returns,
ObjectID arg_ids[]) {
TaskID parent_task_id = globally_unique_id();
FunctionID func_id = globally_unique_id();
TaskSpec_start_construct(g_task_builder, NIL_ID, parent_task_id, 0,
NIL_ACTOR_ID, NIL_ACTOR_ID, 0, false, func_id,
TaskID parent_task_id = TaskID::from_random();
FunctionID func_id = FunctionID::from_random();
TaskSpec_start_construct(g_task_builder, UniqueID::nil(), parent_task_id, 0,
ActorID::nil(), ActorID::nil(), 0, false, func_id,
num_returns);
for (int64_t i = 0; i < num_args; ++i) {
ObjectID arg_id;
if (arg_ids == NULL) {
arg_id = globally_unique_id();
arg_id = ObjectID::from_random();
} else {
arg_id = arg_ids[i];
}
@@ -46,7 +46,7 @@ static inline Task *example_task_with_args(int64_t num_args,
ObjectID arg_ids[]) {
TaskExecutionSpec spec =
example_task_execution_spec_with_args(num_args, num_returns, arg_ids);
Task *instance = Task_alloc(spec, task_state, NIL_ID);
Task *instance = Task_alloc(spec, task_state, UniqueID::nil());
return instance;
}
@@ -54,7 +54,7 @@ static inline Task *example_task(int64_t num_args,
int64_t num_returns,
int task_state) {
TaskExecutionSpec spec = example_task_execution_spec(num_args, num_returns);
Task *instance = Task_alloc(spec, task_state, NIL_ID);
Task *instance = Task_alloc(spec, task_state, UniqueID::nil());
return instance;
}
@@ -62,7 +62,7 @@ static inline bool Task_equals(Task *task1, Task *task2) {
if (task1->state != task2->state) {
return false;
}
if (!DBClientID_equal(task1->local_scheduler_id, task2->local_scheduler_id)) {
if (!(task1->local_scheduler_id == task2->local_scheduler_id)) {
return false;
}
auto execution_spec1 = Task_task_execution_spec(task1);
+26 -25
View File
@@ -38,13 +38,13 @@ void new_object_done_callback(ObjectID object_id,
bool is_put,
void *user_context) {
new_object_succeeded = 1;
CHECK(ObjectID_equal(object_id, new_object_id));
CHECK(TaskID_equal(task_id, new_object_task_id));
CHECK(object_id == new_object_id);
CHECK(task_id == new_object_task_id);
event_loop_stop(g_loop);
}
void new_object_lookup_callback(ObjectID object_id, void *user_context) {
CHECK(ObjectID_equal(object_id, new_object_id));
CHECK(object_id == new_object_id);
RetryInfo retry = {
.num_retries = 5,
.timeout = 100,
@@ -78,7 +78,7 @@ void task_table_subscribe_done(TaskID task_id, void *user_context) {
TEST new_object_test(void) {
new_object_failed = 0;
new_object_succeeded = 0;
new_object_id = globally_unique_id();
new_object_id = ObjectID::from_random();
new_object_task = example_task(1, 1, TASK_STATUS_WAITING);
new_object_task_spec = Task_task_execution_spec(new_object_task)->Spec();
new_object_task_id = TaskSpec_task_id(new_object_task_spec);
@@ -91,8 +91,8 @@ TEST new_object_test(void) {
.timeout = 100,
.fail_callback = new_object_fail_callback,
};
task_table_subscribe(db, NIL_ID, TASK_STATUS_WAITING, NULL, NULL, &retry,
task_table_subscribe_done, db);
task_table_subscribe(db, UniqueID::nil(), TASK_STATUS_WAITING, NULL, NULL,
&retry, task_table_subscribe_done, db);
event_loop_run(g_loop);
db_disconnect(db);
destroy_outstanding_callbacks(g_loop);
@@ -109,15 +109,15 @@ void new_object_no_task_callback(ObjectID object_id,
bool is_put,
void *user_context) {
new_object_succeeded = 1;
CHECK(IS_NIL_ID(task_id));
CHECK(task_id.is_nil());
event_loop_stop(g_loop);
}
TEST new_object_no_task_test(void) {
new_object_failed = 0;
new_object_succeeded = 0;
new_object_id = globally_unique_id();
new_object_task_id = globally_unique_id();
new_object_id = ObjectID::from_random();
new_object_task_id = TaskID::from_random();
g_loop = event_loop_create();
DBHandle *db = db_connect(std::string("127.0.0.1"), 6379, "plasma_manager",
"127.0.0.1", std::vector<std::string>());
@@ -167,7 +167,7 @@ TEST lookup_timeout_test(void) {
RetryInfo retry = {
.num_retries = 5, .timeout = 100, .fail_callback = lookup_fail_callback,
};
object_table_lookup(db, NIL_ID, &retry, lookup_done_callback,
object_table_lookup(db, UniqueID::nil(), &retry, lookup_done_callback,
(void *) lookup_timeout_context);
/* Disconnect the database to see if the lookup times out. */
close(db->context->c.fd);
@@ -206,7 +206,7 @@ TEST add_timeout_test(void) {
RetryInfo retry = {
.num_retries = 5, .timeout = 100, .fail_callback = add_fail_callback,
};
object_table_add(db, NIL_ID, 0, (unsigned char *) NIL_DIGEST, &retry,
object_table_add(db, UniqueID::nil(), 0, (unsigned char *) NIL_DIGEST, &retry,
add_done_callback, (void *) add_timeout_context);
/* Disconnect the database to see if the add times out. */
close(db->context->c.fd);
@@ -327,7 +327,7 @@ void add_lookup_callback(ObjectID object_id, bool success, void *user_context) {
.timeout = 100,
.fail_callback = lookup_retry_fail_callback,
};
object_table_lookup(db, NIL_ID, &retry, add_lookup_done_callback,
object_table_lookup(db, UniqueID::nil(), &retry, add_lookup_done_callback,
(void *) db);
}
@@ -346,7 +346,7 @@ TEST add_lookup_test(void) {
.timeout = 100,
.fail_callback = lookup_retry_fail_callback,
};
object_table_add(db, NIL_ID, 0, (unsigned char *) NIL_DIGEST, &retry,
object_table_add(db, UniqueID::nil(), 0, (unsigned char *) NIL_DIGEST, &retry,
add_lookup_callback, (void *) db);
/* Install handler for terminating the event loop. */
event_loop_add_timer(g_loop, 750,
@@ -381,7 +381,8 @@ void add_remove_lookup_callback(ObjectID object_id,
.timeout = 100,
.fail_callback = lookup_retry_fail_callback,
};
object_table_lookup(db, NIL_ID, &retry, add_remove_lookup_done_callback,
object_table_lookup(db, UniqueID::nil(), &retry,
add_remove_lookup_done_callback,
(void *) lookup_retry_context);
}
@@ -393,8 +394,8 @@ void add_remove_callback(ObjectID object_id, bool success, void *user_context) {
.timeout = 100,
.fail_callback = lookup_retry_fail_callback,
};
object_table_remove(db, NIL_ID, NULL, &retry, add_remove_lookup_callback,
(void *) db);
object_table_remove(db, UniqueID::nil(), NULL, &retry,
add_remove_lookup_callback, (void *) db);
}
TEST add_remove_lookup_test(void) {
@@ -408,7 +409,7 @@ TEST add_remove_lookup_test(void) {
.timeout = 100,
.fail_callback = lookup_retry_fail_callback,
};
object_table_add(db, NIL_ID, 0, (unsigned char *) NIL_DIGEST, &retry,
object_table_add(db, UniqueID::nil(), 0, (unsigned char *) NIL_DIGEST, &retry,
add_remove_callback, (void *) db);
/* Install handler for terminating the event loop. */
event_loop_add_timer(g_loop, 750,
@@ -454,7 +455,7 @@ TEST lookup_late_test(void) {
.timeout = 0,
.fail_callback = lookup_late_fail_callback,
};
object_table_lookup(db, NIL_ID, &retry, lookup_late_done_callback,
object_table_lookup(db, UniqueID::nil(), &retry, lookup_late_done_callback,
(void *) lookup_late_context);
/* Install handler for terminating the event loop. */
event_loop_add_timer(g_loop, 750,
@@ -496,7 +497,7 @@ TEST add_late_test(void) {
RetryInfo retry = {
.num_retries = 0, .timeout = 0, .fail_callback = add_late_fail_callback,
};
object_table_add(db, NIL_ID, 0, (unsigned char *) NIL_DIGEST, &retry,
object_table_add(db, UniqueID::nil(), 0, (unsigned char *) NIL_DIGEST, &retry,
add_late_done_callback, (void *) add_late_context);
/* Install handler for terminating the event loop. */
event_loop_add_timer(g_loop, 750,
@@ -594,7 +595,7 @@ void subscribe_success_object_available_callback(
const std::vector<DBClientID> &manager_vector,
void *user_context) {
CHECK(user_context == (void *) subscribe_success_context);
CHECK(ObjectID_equal(object_id, subscribe_id));
CHECK(object_id == subscribe_id);
CHECK(manager_vector.size() == 1);
subscribe_success_succeeded = 1;
}
@@ -609,7 +610,7 @@ TEST subscribe_success_test(void) {
DBHandle *db = db_connect(std::string("127.0.0.1"), 6379, "plasma_manager",
"127.0.0.1", db_connect_args);
db_attach(db, g_loop, false);
subscribe_id = globally_unique_id();
subscribe_id = ObjectID::from_random();
RetryInfo retry = {
.num_retries = 0,
@@ -679,7 +680,7 @@ TEST subscribe_object_present_test(void) {
DBHandle *db = db_connect(std::string("127.0.0.1"), 6379, "plasma_manager",
"127.0.0.1", db_connect_args);
db_attach(db, g_loop, false);
UniqueID id = globally_unique_id();
UniqueID id = UniqueID::from_random();
RetryInfo retry = {
.num_retries = 0, .timeout = 100, .fail_callback = fatal_fail_callback,
};
@@ -730,7 +731,7 @@ TEST subscribe_object_not_present_test(void) {
DBHandle *db = db_connect(std::string("127.0.0.1"), 6379, "plasma_manager",
"127.0.0.1", std::vector<std::string>());
db_attach(db, g_loop, false);
UniqueID id = globally_unique_id();
UniqueID id = UniqueID::from_random();
RetryInfo retry = {
.num_retries = 0, .timeout = 100, .fail_callback = NULL,
};
@@ -795,7 +796,7 @@ TEST subscribe_object_available_later_test(void) {
DBHandle *db = db_connect(std::string("127.0.0.1"), 6379, "plasma_manager",
"127.0.0.1", db_connect_args);
db_attach(db, g_loop, false);
UniqueID id = globally_unique_id();
UniqueID id = UniqueID::from_random();
RetryInfo retry = {
.num_retries = 0, .timeout = 100, .fail_callback = NULL,
};
@@ -850,7 +851,7 @@ TEST subscribe_object_available_subscribe_all(void) {
DBHandle *db = db_connect(std::string("127.0.0.1"), 6379, "plasma_manager",
"127.0.0.1", db_connect_args);
db_attach(db, g_loop, false);
UniqueID id = globally_unique_id();
UniqueID id = UniqueID::from_random();
RetryInfo retry = {
.num_retries = 0, .timeout = 100, .fail_callback = NULL,
};
+3 -3
View File
@@ -167,7 +167,7 @@ void logging_read_callback(event_loop *loop,
DBHandle *conn = (DBHandle *) context;
char *cmd = read_log_message(fd);
redisAsyncCommand(conn->context, logging_test_callback, NULL, cmd,
(char *) conn->client.id, sizeof(conn->client.id));
(char *) conn->client.data(), sizeof(conn->client));
free(cmd);
}
@@ -200,8 +200,8 @@ TEST logging_test(void) {
int client_fd = connect_ipc_sock(socket_pathname);
ASSERT(client_fd >= 0);
connections.push_back(client_fd);
RayLogger *logger = RayLogger_init("worker", RAY_INFO, 0, &client_fd);
RayLogger_log(logger, RAY_INFO, "TEST", "Message");
RayLogger *logger = RayLogger_init("worker", RAY_LOG_INFO, 0, &client_fd);
RayLogger_log(logger, RAY_LOG_INFO, "TEST", "Message");
event_loop_add_file(loop, socket_fd, EVENT_LOOP_READ, logging_accept_callback,
conn);
-1
View File
@@ -13,7 +13,6 @@ sleep 1s
./src/common/thirdparty/redis/src/redis-cli set NumRedisShards 1
./src/common/thirdparty/redis/src/redis-cli rpush RedisShards 127.0.0.1:6380
./src/common/common_tests
./src/common/db_tests
./src/common/io_tests
./src/common/task_tests
+6 -7
View File
@@ -15,13 +15,12 @@ sleep 1s
./src/common/thirdparty/redis/src/redis-cli set NumRedisShards 1
./src/common/thirdparty/redis/src/redis-cli rpush RedisShards 127.0.0.1:6380
valgrind --leak-check=full --error-exitcode=1 ./src/common/common_tests
valgrind --leak-check=full --error-exitcode=1 ./src/common/db_tests
valgrind --leak-check=full --error-exitcode=1 ./src/common/io_tests
valgrind --leak-check=full --error-exitcode=1 ./src/common/task_tests
valgrind --leak-check=full --error-exitcode=1 ./src/common/redis_tests
valgrind --leak-check=full --error-exitcode=1 ./src/common/task_table_tests
valgrind --leak-check=full --error-exitcode=1 ./src/common/object_table_tests
valgrind --track-origins=yes --leak-check=full --show-leak-kinds=all --leak-check-heuristics=stdstring --error-exitcode=1 ./src/common/db_tests
valgrind --track-origins=yes --leak-check=full --show-leak-kinds=all --leak-check-heuristics=stdstring --error-exitcode=1 ./src/common/io_tests
valgrind --track-origins=yes --leak-check=full --show-leak-kinds=all --leak-check-heuristics=stdstring --error-exitcode=1 ./src/common/task_tests
valgrind --track-origins=yes --leak-check=full --show-leak-kinds=all --leak-check-heuristics=stdstring --error-exitcode=1 ./src/common/redis_tests
valgrind --track-origins=yes --leak-check=full --show-leak-kinds=all --leak-check-heuristics=stdstring --error-exitcode=1 ./src/common/task_table_tests
valgrind --track-origins=yes --leak-check=full --show-leak-kinds=all --leak-check-heuristics=stdstring --error-exitcode=1 ./src/common/object_table_tests
./src/common/thirdparty/redis/src/redis-cli shutdown
./src/common/thirdparty/redis/src/redis-cli -p 6380 shutdown
+15 -15
View File
@@ -38,7 +38,7 @@ void lookup_nil_success_callback(Task *task, void *context) {
}
TEST lookup_nil_test(void) {
lookup_nil_id = globally_unique_id();
lookup_nil_id = TaskID::from_random();
g_loop = event_loop_create();
DBHandle *db = db_connect(std::string("127.0.0.1"), 6379, "plasma_manager",
"127.0.0.1", std::vector<std::string>());
@@ -116,8 +116,8 @@ TEST add_lookup_test(void) {
.fail_callback = add_lookup_fail_callback,
};
/* Wait for subscription to succeed before adding the task. */
task_table_subscribe(db, NIL_ID, TASK_STATUS_WAITING, NULL, NULL, &retry,
subscribe_success_callback, (void *) db);
task_table_subscribe(db, UniqueID::nil(), TASK_STATUS_WAITING, NULL, NULL,
&retry, subscribe_success_callback, (void *) db);
/* Disconnect the database to see if the lookup times out. */
event_loop_run(g_loop);
db_disconnect(db);
@@ -156,8 +156,8 @@ TEST subscribe_timeout_test(void) {
.timeout = 100,
.fail_callback = subscribe_fail_callback,
};
task_table_subscribe(db, NIL_ID, TASK_STATUS_WAITING, NULL, NULL, &retry,
subscribe_done_callback,
task_table_subscribe(db, UniqueID::nil(), TASK_STATUS_WAITING, NULL, NULL,
&retry, subscribe_done_callback,
(void *) subscribe_timeout_context);
/* Disconnect the database to see if the subscribe times out. */
close(db->subscribe_context->c.fd);
@@ -198,8 +198,8 @@ TEST publish_timeout_test(void) {
RetryInfo retry = {
.num_retries = 5, .timeout = 100, .fail_callback = publish_fail_callback,
};
task_table_subscribe(db, NIL_ID, TASK_STATUS_WAITING, NULL, NULL, &retry,
NULL, NULL);
task_table_subscribe(db, UniqueID::nil(), TASK_STATUS_WAITING, NULL, NULL,
&retry, NULL, NULL);
task_table_add_task(db, task, &retry, publish_done_callback,
(void *) publish_timeout_context);
/* Disconnect the database to see if the publish times out. */
@@ -270,8 +270,8 @@ TEST subscribe_retry_test(void) {
.timeout = 100,
.fail_callback = subscribe_retry_fail_callback,
};
task_table_subscribe(db, NIL_ID, TASK_STATUS_WAITING, NULL, NULL, &retry,
subscribe_retry_done_callback,
task_table_subscribe(db, UniqueID::nil(), TASK_STATUS_WAITING, NULL, NULL,
&retry, subscribe_retry_done_callback,
(void *) subscribe_retry_context);
/* Disconnect the database to see if the subscribe times out. */
close(db->subscribe_context->c.fd);
@@ -321,8 +321,8 @@ TEST publish_retry_test(void) {
.timeout = 100,
.fail_callback = publish_retry_fail_callback,
};
task_table_subscribe(db, NIL_ID, TASK_STATUS_WAITING, NULL, NULL, &retry,
NULL, NULL);
task_table_subscribe(db, UniqueID::nil(), TASK_STATUS_WAITING, NULL, NULL,
&retry, NULL, NULL);
task_table_add_task(db, task, &retry, publish_retry_done_callback,
(void *) publish_retry_context);
/* Disconnect the database to see if the publish times out. */
@@ -374,8 +374,8 @@ TEST subscribe_late_test(void) {
.timeout = 0,
.fail_callback = subscribe_late_fail_callback,
};
task_table_subscribe(db, NIL_ID, TASK_STATUS_WAITING, NULL, NULL, &retry,
subscribe_late_done_callback,
task_table_subscribe(db, UniqueID::nil(), TASK_STATUS_WAITING, NULL, NULL,
&retry, subscribe_late_done_callback,
(void *) subscribe_late_context);
/* Install handler for terminating the event loop. */
event_loop_add_timer(g_loop, 750,
@@ -420,8 +420,8 @@ TEST publish_late_test(void) {
.timeout = 0,
.fail_callback = publish_late_fail_callback,
};
task_table_subscribe(db, NIL_ID, TASK_STATUS_WAITING, NULL, NULL, NULL, NULL,
NULL);
task_table_subscribe(db, UniqueID::nil(), TASK_STATUS_WAITING, NULL, NULL,
NULL, NULL, NULL);
task_table_add_task(db, task, &retry, publish_late_done_callback,
(void *) publish_late_context);
/* Install handler for terminating the event loop. */
+50 -42
View File
@@ -12,31 +12,32 @@
SUITE(task_tests);
TEST task_test(void) {
TaskID parent_task_id = globally_unique_id();
FunctionID func_id = globally_unique_id();
TaskID parent_task_id = TaskID::from_random();
FunctionID func_id = FunctionID::from_random();
TaskBuilder *builder = make_task_builder();
TaskSpec_start_construct(builder, NIL_ID, parent_task_id, 0, NIL_ACTOR_ID,
NIL_ACTOR_ID, 0, false, func_id, 2);
TaskSpec_start_construct(builder, DriverID::nil(), parent_task_id, 0,
ActorID::nil(), ActorID::nil(), 0, false, func_id,
2);
UniqueID arg1 = globally_unique_id();
UniqueID arg1 = UniqueID::from_random();
TaskSpec_args_add_ref(builder, &arg1, 1);
TaskSpec_args_add_val(builder, (uint8_t *) "hello", 5);
UniqueID arg2 = globally_unique_id();
UniqueID arg2 = UniqueID::from_random();
TaskSpec_args_add_ref(builder, &arg2, 1);
TaskSpec_args_add_val(builder, (uint8_t *) "world", 5);
/* Finish constructing the spec. This constructs the task ID and the
* return IDs. */
int64_t size;
uint8_t *spec = TaskSpec_finish_construct(builder, &size);
TaskSpec *spec = TaskSpec_finish_construct(builder, &size);
/* Check that the spec was constructed as expected. */
ASSERT(TaskSpec_num_args(spec) == 4);
ASSERT(TaskSpec_num_returns(spec) == 2);
ASSERT(FunctionID_equal(TaskSpec_function(spec), func_id));
ASSERT(ObjectID_equal(TaskSpec_arg_id(spec, 0, 0), arg1));
ASSERT(TaskSpec_arg_id(spec, 0, 0) == arg1);
ASSERT(memcmp(TaskSpec_arg_val(spec, 1), (uint8_t *) "hello",
TaskSpec_arg_length(spec, 1)) == 0);
ASSERT(ObjectID_equal(TaskSpec_arg_id(spec, 2, 0), arg2));
ASSERT(TaskSpec_arg_id(spec, 2, 0) == arg2);
ASSERT(memcmp(TaskSpec_arg_val(spec, 3), (uint8_t *) "world",
TaskSpec_arg_length(spec, 3)) == 0);
@@ -48,22 +49,24 @@ TEST task_test(void) {
TEST deterministic_ids_test(void) {
TaskBuilder *builder = make_task_builder();
/* Define the inputs to the task construction. */
TaskID parent_task_id = globally_unique_id();
FunctionID func_id = globally_unique_id();
UniqueID arg1 = globally_unique_id();
TaskID parent_task_id = TaskID::from_random();
FunctionID func_id = FunctionID::from_random();
UniqueID arg1 = UniqueID::from_random();
uint8_t *arg2 = (uint8_t *) "hello world";
/* Construct a first task. */
TaskSpec_start_construct(builder, NIL_ID, parent_task_id, 0, NIL_ACTOR_ID,
NIL_ACTOR_ID, 0, false, func_id, 3);
TaskSpec_start_construct(builder, DriverID::nil(), parent_task_id, 0,
ActorID::nil(), ActorID::nil(), 0, false, func_id,
3);
TaskSpec_args_add_ref(builder, &arg1, 1);
TaskSpec_args_add_val(builder, arg2, 11);
int64_t size1;
TaskSpec *spec1 = TaskSpec_finish_construct(builder, &size1);
/* Construct a second identical task. */
TaskSpec_start_construct(builder, NIL_ID, parent_task_id, 0, NIL_ACTOR_ID,
NIL_ACTOR_ID, 0, false, func_id, 3);
TaskSpec_start_construct(builder, DriverID::nil(), parent_task_id, 0,
ActorID::nil(), ActorID::nil(), 0, false, func_id,
3);
TaskSpec_args_add_ref(builder, &arg1, 1);
TaskSpec_args_add_val(builder, arg2, 11);
int64_t size2;
@@ -71,52 +74,57 @@ TEST deterministic_ids_test(void) {
/* Check that these tasks have the same task IDs and the same return IDs. */
ASSERT(TaskID_equal(TaskSpec_task_id(spec1), TaskSpec_task_id(spec2)));
ASSERT(ObjectID_equal(TaskSpec_return(spec1, 0), TaskSpec_return(spec2, 0)));
ASSERT(ObjectID_equal(TaskSpec_return(spec1, 1), TaskSpec_return(spec2, 1)));
ASSERT(ObjectID_equal(TaskSpec_return(spec1, 2), TaskSpec_return(spec2, 2)));
ASSERT(TaskSpec_return(spec1, 0) == TaskSpec_return(spec2, 0));
ASSERT(TaskSpec_return(spec1, 1) == TaskSpec_return(spec2, 1));
ASSERT(TaskSpec_return(spec1, 2) == TaskSpec_return(spec2, 2));
/* Check that the return IDs are all distinct. */
ASSERT(!ObjectID_equal(TaskSpec_return(spec1, 0), TaskSpec_return(spec2, 1)));
ASSERT(!ObjectID_equal(TaskSpec_return(spec1, 0), TaskSpec_return(spec2, 2)));
ASSERT(!ObjectID_equal(TaskSpec_return(spec1, 1), TaskSpec_return(spec2, 2)));
ASSERT(!(TaskSpec_return(spec1, 0) == TaskSpec_return(spec2, 1)));
ASSERT(!(TaskSpec_return(spec1, 0) == TaskSpec_return(spec2, 2)));
ASSERT(!(TaskSpec_return(spec1, 1) == TaskSpec_return(spec2, 2)));
/* Create more tasks that are only mildly different. */
/* Construct a task with a different parent task ID. */
TaskSpec_start_construct(builder, NIL_ID, globally_unique_id(), 0,
NIL_ACTOR_ID, NIL_ACTOR_ID, 0, false, func_id, 3);
TaskSpec_start_construct(builder, DriverID::nil(), TaskID::from_random(), 0,
ActorID::nil(), ActorID::nil(), 0, false, func_id,
3);
TaskSpec_args_add_ref(builder, &arg1, 1);
TaskSpec_args_add_val(builder, arg2, 11);
int64_t size3;
TaskSpec *spec3 = TaskSpec_finish_construct(builder, &size3);
/* Construct a task with a different parent counter. */
TaskSpec_start_construct(builder, NIL_ID, parent_task_id, 1, NIL_ACTOR_ID,
NIL_ACTOR_ID, 0, false, func_id, 3);
TaskSpec_start_construct(builder, DriverID::nil(), parent_task_id, 1,
ActorID::nil(), ActorID::nil(), 0, false, func_id,
3);
TaskSpec_args_add_ref(builder, &arg1, 1);
TaskSpec_args_add_val(builder, arg2, 11);
int64_t size4;
TaskSpec *spec4 = TaskSpec_finish_construct(builder, &size4);
/* Construct a task with a different function ID. */
TaskSpec_start_construct(builder, NIL_ID, parent_task_id, 0, NIL_ACTOR_ID,
NIL_ACTOR_ID, 0, false, globally_unique_id(), 3);
TaskSpec_start_construct(builder, DriverID::nil(), parent_task_id, 0,
ActorID::nil(), ActorID::nil(), 0, false,
FunctionID::from_random(), 3);
TaskSpec_args_add_ref(builder, &arg1, 1);
TaskSpec_args_add_val(builder, arg2, 11);
int64_t size5;
TaskSpec *spec5 = TaskSpec_finish_construct(builder, &size5);
/* Construct a task with a different object ID argument. */
TaskSpec_start_construct(builder, NIL_ID, parent_task_id, 0, NIL_ACTOR_ID,
NIL_ACTOR_ID, 0, false, func_id, 3);
ObjectID object_id = globally_unique_id();
TaskSpec_start_construct(builder, DriverID::nil(), parent_task_id, 0,
ActorID::nil(), ActorID::nil(), 0, false, func_id,
3);
ObjectID object_id = ObjectID::from_random();
TaskSpec_args_add_ref(builder, &object_id, 1);
TaskSpec_args_add_val(builder, arg2, 11);
int64_t size6;
TaskSpec *spec6 = TaskSpec_finish_construct(builder, &size6);
/* Construct a task with a different value argument. */
TaskSpec_start_construct(builder, NIL_ID, parent_task_id, 0, NIL_ACTOR_ID,
NIL_ACTOR_ID, 0, false, func_id, 3);
TaskSpec_start_construct(builder, DriverID::nil(), parent_task_id, 0,
ActorID::nil(), ActorID::nil(), 0, false, func_id,
3);
TaskSpec_args_add_ref(builder, &arg1, 1);
TaskSpec_args_add_val(builder, (uint8_t *) "hello_world", 11);
int64_t size7;
@@ -136,9 +144,8 @@ TEST deterministic_ids_test(void) {
for (int task_index2 = 0; task_index2 < 6; ++task_index2) {
for (int return_index2 = 0; return_index2 < 3; ++return_index2) {
if (task_index1 != task_index2 && return_index1 != return_index2) {
ASSERT(!ObjectID_equal(
TaskSpec_return(specs[task_index1], return_index1),
TaskSpec_return(specs[task_index2], return_index2)));
ASSERT(!(TaskSpec_return(specs[task_index1], return_index1) ==
TaskSpec_return(specs[task_index2], return_index2)));
}
}
}
@@ -158,15 +165,16 @@ TEST deterministic_ids_test(void) {
TEST send_task(void) {
TaskBuilder *builder = make_task_builder();
TaskID parent_task_id = globally_unique_id();
FunctionID func_id = globally_unique_id();
TaskSpec_start_construct(builder, NIL_ID, parent_task_id, 0, NIL_ACTOR_ID,
NIL_ACTOR_ID, 0, false, func_id, 2);
ObjectID object_id = globally_unique_id();
TaskID parent_task_id = TaskID::from_random();
FunctionID func_id = FunctionID::from_random();
TaskSpec_start_construct(builder, DriverID::nil(), parent_task_id, 0,
ActorID::nil(), ActorID::nil(), 0, false, func_id,
2);
ObjectID object_id = ObjectID::from_random();
TaskSpec_args_add_ref(builder, &object_id, 1);
TaskSpec_args_add_val(builder, (uint8_t *) "Hello", 5);
TaskSpec_args_add_val(builder, (uint8_t *) "World", 5);
object_id = globally_unique_id();
object_id = ObjectID::from_random();
TaskSpec_args_add_ref(builder, &object_id, 1);
int64_t size;
TaskSpec *spec = TaskSpec_finish_construct(builder, &size);
+1 -1
View File
@@ -7,4 +7,4 @@ include(${CMAKE_CURRENT_LIST_DIR}/../common/cmake/Common.cmake)
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Werror -Wall")
add_executable(global_scheduler global_scheduler.cc global_scheduler_algorithm.cc)
target_link_libraries(global_scheduler common ${HIREDIS_LIB})
target_link_libraries(global_scheduler common ${HIREDIS_LIB} ray_static ${PLASMA_STATIC_LIB} ${ARROW_STATIC_LIB})
+15 -20
View File
@@ -64,15 +64,14 @@ void assign_task_to_local_scheduler_retry(UniqueID id,
void assign_task_to_local_scheduler(GlobalSchedulerState *state,
Task *task,
DBClientID local_scheduler_id) {
char id_string[ID_STRING_SIZE];
std::string id_string = local_scheduler_id.hex();
TaskSpec *spec = Task_task_execution_spec(task)->Spec();
LOG_DEBUG("assigning task to local_scheduler_id = %s",
ObjectID_to_string(local_scheduler_id, id_string, ID_STRING_SIZE));
/* Log the hex form of the local scheduler ID. The format string has a single
 * %s conversion, so exactly one C-string argument is passed; the ID object
 * itself must not be forwarded through the variadic argument list. */
LOG_DEBUG("assigning task to local_scheduler_id = %s", id_string.c_str());
Task_set_state(task, TASK_STATUS_SCHEDULED);
Task_set_local_scheduler(task, local_scheduler_id);
LOG_DEBUG("Issuing a task table update for task = %s",
ObjectID_to_string(Task_task_id(task), id_string, ID_STRING_SIZE));
ARROW_UNUSED(id_string);
id_string = Task_task_id(task).hex();
LOG_DEBUG("Issuing a task table update for task = %s", id_string.c_str());
auto retryInfo = RetryInfo{
.num_retries = 0, // This value is unused.
.timeout = 0, // This value is unused.
@@ -254,10 +253,8 @@ remove_local_scheduler(
*/
void process_new_db_client(DBClient *db_client, void *user_context) {
GlobalSchedulerState *state = (GlobalSchedulerState *) user_context;
char id_string[ID_STRING_SIZE];
LOG_DEBUG("db client table callback for db client = %s",
ObjectID_to_string(db_client->id, id_string, ID_STRING_SIZE));
ARROW_UNUSED(id_string);
std::string id_string = db_client->id.hex();
LOG_DEBUG("db client table callback for db client = %s", id_string.c_str());
if (strncmp(db_client->client_type.c_str(), "local_scheduler",
strlen("local_scheduler")) == 0) {
bool local_scheduler_present =
@@ -296,10 +293,9 @@ void object_table_subscribe_callback(ObjectID object_id,
void *user_context) {
/* Extract global scheduler state from the callback context. */
GlobalSchedulerState *state = (GlobalSchedulerState *) user_context;
char id_string[ID_STRING_SIZE];
std::string id_string = object_id.hex();
LOG_DEBUG("object table subscribe callback for OBJECT = %s",
ObjectID_to_string(object_id, id_string, ID_STRING_SIZE));
ARROW_UNUSED(id_string);
id_string.c_str());
const std::vector<std::string> managers =
db_client_table_get_ip_addresses(state->db, manager_ids);
@@ -315,8 +311,9 @@ void object_table_subscribe_callback(ObjectID object_id,
state->scheduler_object_info_table[object_id];
obj_info_entry.data_size = data_size;
id_string = object_id.hex();
LOG_DEBUG("New object added to object_info_table with id = %s",
ObjectID_to_string(object_id, id_string, ID_STRING_SIZE));
id_string.c_str());
LOG_DEBUG("\tmanager locations:");
for (size_t i = 0; i < managers.size(); i++) {
/* managers[i] is a std::string; %s needs a C string, so pass c_str(). */
LOG_DEBUG("\t\t%s", managers[i].c_str());
@@ -339,11 +336,9 @@ void local_scheduler_table_handler(DBClientID client_id,
/* Extract global scheduler state from the callback context. */
GlobalSchedulerState *state = (GlobalSchedulerState *) user_context;
ARROW_UNUSED(state);
char id_string[ID_STRING_SIZE];
LOG_DEBUG(
"Local scheduler heartbeat from db_client_id %s",
ObjectID_to_string((ObjectID) client_id, id_string, ID_STRING_SIZE));
ARROW_UNUSED(id_string);
std::string id_string = client_id.hex();
LOG_DEBUG("Local scheduler heartbeat from db_client_id %s",
id_string.c_str());
LOG_DEBUG(
"total workers = %d, task queue length = %d, available workers = %d",
info.total_num_workers, info.task_queue_length, info.available_workers);
@@ -436,7 +431,7 @@ void start_server(const char *node_ip_address,
* submits tasks to the global scheduler before the global scheduler
* successfully subscribes, then the local scheduler that submitted the tasks
* will retry. */
task_table_subscribe(g_state->db, NIL_ID, TASK_STATUS_WAITING,
task_table_subscribe(g_state->db, UniqueID::nil(), TASK_STATUS_WAITING,
process_task_waiting, (void *) g_state, NULL, NULL,
NULL);
@@ -128,7 +128,8 @@ bool handle_task_waiting(GlobalSchedulerState *state,
double best_local_scheduler_score = INT32_MIN;
CHECKM(best_local_scheduler_score < 0,
"We might have a floating point underflow");
DBClientID best_local_scheduler_id = NIL_ID; /* best node to send this task */
DBClientID best_local_scheduler_id =
DBClientID::nil(); /* best node to send this task */
for (auto it = state->local_schedulers.begin();
it != state->local_schedulers.end(); it++) {
/* For each local scheduler, calculate its score. Check hard constraints
@@ -147,15 +148,15 @@ bool handle_task_waiting(GlobalSchedulerState *state,
}
if (!task_feasible) {
char id_string[ID_STRING_SIZE];
std::string id_string = Task_task_id(task).hex();
LOG_ERROR(
"Infeasible task. No nodes satisfy hard constraints for task = %s",
ObjectID_to_string(Task_task_id(task), id_string, ID_STRING_SIZE));
id_string.c_str());
/* TODO(atumanov): propagate this error to the task's driver and/or
* cache the task in case new local schedulers satisfy it in the future. */
return false;
}
CHECKM(!IS_NIL_ID(best_local_scheduler_id),
CHECKM(!best_local_scheduler_id.is_nil(),
"Task is feasible, but doesn't have a local scheduler assigned.");
/* A local scheduler ID was found, so assign the task. */
assign_task_to_local_scheduler(state, task, best_local_scheduler_id);
+5 -4
View File
@@ -8,6 +8,7 @@ include(${CMAKE_CURRENT_LIST_DIR}/../common/cmake/Common.cmake)
# Include plasma
list(APPEND CMAKE_MODULE_PATH ${CMAKE_CURRENT_LIST_DIR}/../thirdparty/arrow/python/cmake_modules)
find_package(Arrow)
find_package(Plasma)
include_directories(SYSTEM ${PLASMA_INCLUDE_DIR})
@@ -61,18 +62,18 @@ add_library(local_scheduler_client STATIC local_scheduler_client.cc)
add_dependencies(local_scheduler_client gen_local_scheduler_fbs)
if(APPLE)
target_link_libraries(local_scheduler_library "-undefined dynamic_lookup" local_scheduler_client common)
target_link_libraries(local_scheduler_library "-undefined dynamic_lookup" local_scheduler_client common ray_static ${PLASMA_STATIC_LIB} ${ARROW_STATIC_LIB})
else(APPLE)
target_link_libraries(local_scheduler_library local_scheduler_client common)
target_link_libraries(local_scheduler_library local_scheduler_client common ray_static ${PLASMA_STATIC_LIB} ${ARROW_STATIC_LIB})
endif(APPLE)
add_dependencies(local_scheduler_library gen_local_scheduler_fbs)
add_executable(local_scheduler local_scheduler.cc local_scheduler_algorithm.cc)
target_link_libraries(local_scheduler local_scheduler_client common ${HIREDIS_LIB} ${PLASMA_STATIC_LIB} ${ARROW_DIR}/cpp/build/release/libarrow.a -lpthread)
target_link_libraries(local_scheduler local_scheduler_client common ${HIREDIS_LIB} ${PLASMA_STATIC_LIB} ray_static ${ARROW_STATIC_LIB} -lpthread)
add_executable(local_scheduler_tests test/local_scheduler_tests.cc local_scheduler.cc local_scheduler_algorithm.cc)
target_link_libraries(local_scheduler_tests local_scheduler_client common ${HIREDIS_LIB} ${PLASMA_STATIC_LIB} ${ARROW_DIR}/cpp/build/release/libarrow.a -lpthread)
target_link_libraries(local_scheduler_tests local_scheduler_client common ${HIREDIS_LIB} ${PLASMA_STATIC_LIB} ray_static ${ARROW_STATIC_LIB} -lpthread)
target_compile_options(local_scheduler_tests PUBLIC "-DLOCAL_SCHEDULER_TEST")
install(TARGETS local_scheduler_library DESTINATION ${CMAKE_SOURCE_DIR}/local_scheduler)
+48 -47
View File
@@ -87,7 +87,7 @@ void kill_worker(LocalSchedulerState *state,
release_resources(state, worker, worker->resources_in_use);
/* Erase the algorithm state's reference to the worker. */
if (ActorID_equal(worker->actor_id, NIL_ACTOR_ID)) {
if (worker->actor_id.is_nil()) {
handle_worker_removed(state, state->algorithm_state, worker);
} else {
/* Let the scheduling algorithm process the absence of this worker. */
@@ -130,7 +130,7 @@ void kill_worker(LocalSchedulerState *state,
TaskSpec *spec = Task_task_execution_spec(worker->task_in_progress)->Spec();
TaskID task_id = TaskSpec_task_id(spec);
push_error(state->db, TaskSpec_driver_id(spec), WORKER_DIED_ERROR_INDEX,
sizeof(task_id), task_id.id);
sizeof(task_id), task_id.data());
}
/* Clean up the task in progress. */
@@ -229,7 +229,7 @@ void start_worker(LocalSchedulerState *state,
ActorID actor_id,
bool reconstruct) {
/* Non-actors can't be started in reconstruct mode. */
if (ActorID_equal(actor_id, NIL_ACTOR_ID)) {
if (actor_id.is_nil()) {
CHECK(!reconstruct);
}
/* We can't start a worker if we don't have the path to the worker script. */
@@ -256,10 +256,9 @@ void start_worker(LocalSchedulerState *state,
/* Pass in the worker's actor ID. */
const char *actor_id_string = "--actor-id";
char id_string[ID_STRING_SIZE];
ObjectID_to_string(actor_id, id_string, ID_STRING_SIZE);
std::string id_string = actor_id.hex();
command_vector.push_back(actor_id_string);
command_vector.push_back((const char *) id_string);
command_vector.push_back(id_string.c_str());
/* Add a flag for reconstructing the actor if necessary. */
const char *reconstruct_string = "--reconstruct";
@@ -405,7 +404,7 @@ LocalSchedulerState *LocalSchedulerState_init(
/* Start the initial set of workers. */
for (int i = 0; i < num_workers; ++i) {
start_worker(state, NIL_ACTOR_ID, false);
start_worker(state, ActorID::nil(), false);
}
/* Initialize the time at which the previous heartbeat was sent. */
@@ -533,12 +532,12 @@ void assign_task_to_worker(LocalSchedulerState *state,
// Check that actor tasks don't have non-CPU requirements. Any necessary
// non-CPU resources (in particular, GPUs) should already have been acquired
// by the actor worker.
if (!ActorID_equal(worker->actor_id, NIL_ACTOR_ID)) {
if (!worker->actor_id.is_nil()) {
CHECK(required_resources.size() == 1);
CHECK(required_resources.count("CPU") == 1);
}
CHECK(ActorID_equal(worker->actor_id, TaskSpec_actor_id(spec)));
CHECK(worker->actor_id == TaskSpec_actor_id(spec));
/* Make sure the driver for this task is still alive. */
WorkerID driver_id = TaskSpec_driver_id(spec);
CHECK(is_driver_alive(state, driver_id));
@@ -564,8 +563,9 @@ void assign_task_to_worker(LocalSchedulerState *state,
}
}
Task *task = Task_alloc(execution_spec, TASK_STATUS_RUNNING,
state->db ? get_db_client_id(state->db) : NIL_ID);
Task *task =
Task_alloc(execution_spec, TASK_STATUS_RUNNING,
state->db ? get_db_client_id(state->db) : DBClientID::nil());
/* Record which task this worker is executing. This will be freed in
* process_message when the worker sends a GetTask message to the local
* scheduler. */
@@ -586,7 +586,7 @@ void finish_task(LocalSchedulerState *state,
/* Return dynamic resources back for the task in progress. */
CHECK(worker->resources_in_use["CPU"] ==
TaskSpec_get_required_resource(spec, "CPU"));
if (ActorID_equal(worker->actor_id, NIL_ACTOR_ID)) {
if (worker->actor_id.is_nil()) {
CHECK(worker->gpus_in_use.size() ==
TaskSpec_get_required_resource(spec, "GPU"));
release_resources(state, worker, worker->resources_in_use);
@@ -647,7 +647,7 @@ void reconstruct_task_update_callback(Task *task,
/* The test-and-set failed. The task is either: (1) not finished yet, (2)
* lost, but not yet updated, or (3) already being reconstructed. */
DBClientID current_local_scheduler_id = Task_local_scheduler(task);
if (!DBClientID_is_nil(current_local_scheduler_id)) {
if (!current_local_scheduler_id.is_nil()) {
DBClient current_local_scheduler =
db_client_table_cache_get(state->db, current_local_scheduler_id);
if (!current_local_scheduler.is_alive) {
@@ -669,7 +669,7 @@ void reconstruct_task_update_callback(Task *task,
* to ensure that reconstruction will happen. */
TaskExecutionSpec *execution_spec = Task_task_execution_spec(task);
TaskSpec *spec = execution_spec->Spec();
if (ActorID_equal(TaskSpec_actor_id(spec), NIL_ACTOR_ID)) {
if (TaskSpec_actor_id(spec).is_nil()) {
handle_task_submitted(state, state->algorithm_state, *execution_spec);
} else {
handle_actor_task_submitted(state, state->algorithm_state, *execution_spec);
@@ -694,7 +694,7 @@ void reconstruct_put_task_update_callback(Task *task,
/* The test-and-set failed. The task is either: (1) not finished yet, (2)
* lost, but not yet updated, or (3) already being reconstructed. */
DBClientID current_local_scheduler_id = Task_local_scheduler(task);
if (!DBClientID_is_nil(current_local_scheduler_id)) {
if (!current_local_scheduler_id.is_nil()) {
DBClient current_local_scheduler =
db_client_table_cache_get(state->db, current_local_scheduler_id);
if (!current_local_scheduler.is_alive) {
@@ -713,7 +713,7 @@ void reconstruct_put_task_update_callback(Task *task,
FunctionID function = TaskSpec_function(spec);
push_error(state->db, TaskSpec_driver_id(spec),
PUT_RECONSTRUCTION_ERROR_INDEX, sizeof(function),
function.id);
function.data());
}
} else {
/* (1) The task is still executing and it is the driver task. We cannot
@@ -722,7 +722,8 @@ void reconstruct_put_task_update_callback(Task *task,
TaskSpec *spec = Task_task_execution_spec(task)->Spec();
FunctionID function = TaskSpec_function(spec);
push_error(state->db, TaskSpec_driver_id(spec),
PUT_RECONSTRUCTION_ERROR_INDEX, sizeof(function), function.id);
PUT_RECONSTRUCTION_ERROR_INDEX, sizeof(function),
function.data());
}
} else {
/* The update to TASK_STATUS_RECONSTRUCTING succeeded, so continue with
@@ -735,7 +736,7 @@ void reconstruct_evicted_result_lookup_callback(ObjectID reconstruct_object_id,
TaskID task_id,
bool is_put,
void *user_context) {
CHECKM(!IS_NIL_ID(task_id),
CHECKM(!task_id.is_nil(),
"No task information found for object during reconstruction");
LocalSchedulerState *state = (LocalSchedulerState *) user_context;
@@ -752,16 +753,17 @@ void reconstruct_evicted_result_lookup_callback(ObjectID reconstruct_object_id,
}
/* If there are no other instances of the task running, it's safe for us to
* claim responsibility for reconstruction. */
task_table_test_and_update(
state->db, task_id, NIL_ID, (TASK_STATUS_DONE | TASK_STATUS_LOST),
TASK_STATUS_RECONSTRUCTING, NULL, done_callback, state);
task_table_test_and_update(state->db, task_id, DBClientID::nil(),
(TASK_STATUS_DONE | TASK_STATUS_LOST),
TASK_STATUS_RECONSTRUCTING, NULL, done_callback,
state);
}
void reconstruct_failed_result_lookup_callback(ObjectID reconstruct_object_id,
TaskID task_id,
bool is_put,
void *user_context) {
if (IS_NIL_ID(task_id)) {
if (task_id.is_nil()) {
/* NOTE(swang): For some reason, the result table update sometimes happens
* after this lookup returns, possibly due to concurrent clients. In most
* cases, this is okay because the initial execution is probably still
@@ -774,8 +776,8 @@ void reconstruct_failed_result_lookup_callback(ObjectID reconstruct_object_id,
LocalSchedulerState *state = (LocalSchedulerState *) user_context;
/* If the task failed to finish, it's safe for us to claim responsibility for
* reconstruction. */
task_table_test_and_update(state->db, task_id, NIL_ID, TASK_STATUS_LOST,
TASK_STATUS_RECONSTRUCTING, NULL,
task_table_test_and_update(state->db, task_id, DBClientID::nil(),
TASK_STATUS_LOST, TASK_STATUS_RECONSTRUCTING, NULL,
reconstruct_task_update_callback, state);
}
@@ -859,7 +861,7 @@ void handle_client_register(LocalSchedulerState *state,
CHECK(!worker->registered);
worker->registered = true;
worker->is_worker = message->is_worker();
CHECK(WorkerID_equal(worker->client_id, NIL_WORKER_ID));
CHECK(worker->client_id.is_nil());
worker->client_id = from_flatbuf(*message->client_id());
/* Register the worker or driver. */
@@ -868,14 +870,14 @@ void handle_client_register(LocalSchedulerState *state,
* running on the worker). */
worker->pid = message->worker_pid();
ActorID actor_id = from_flatbuf(*message->actor_id());
if (!ActorID_equal(actor_id, NIL_ACTOR_ID)) {
if (!actor_id.is_nil()) {
/* Make sure that the local scheduler is aware that it is responsible for
* this actor. */
CHECK(state->actor_mapping.count(actor_id) == 1);
CHECK(DBClientID_equal(state->actor_mapping[actor_id].local_scheduler_id,
get_db_client_id(state->db)));
CHECK(state->actor_mapping[actor_id].local_scheduler_id ==
get_db_client_id(state->db));
/* Update the worker struct with this actor ID. */
CHECK(ActorID_equal(worker->actor_id, NIL_ACTOR_ID));
CHECK(worker->actor_id.is_nil());
worker->actor_id = actor_id;
/* Let the scheduling algorithm process the presence of this new
* worker. */
@@ -917,7 +919,7 @@ void handle_client_register(LocalSchedulerState *state,
/* If the worker is an actor that corresponds to a driver that has been
* removed, then kill the worker. */
if (!ActorID_equal(actor_id, NIL_ACTOR_ID)) {
if (!actor_id.is_nil()) {
WorkerID driver_id = state->actor_mapping[actor_id].driver_id;
if (state->removed_drivers.count(driver_id) == 1) {
kill_worker(state, worker, false, false);
@@ -945,17 +947,17 @@ void handle_driver_removed_callback(WorkerID driver_id, void *user_context) {
ActorID actor_id = (*it)->actor_id;
Task *task = (*it)->task_in_progress;
if (!ActorID_equal(actor_id, NIL_ACTOR_ID)) {
if (!actor_id.is_nil()) {
/* This is an actor. */
CHECK(state->actor_mapping.count(actor_id) == 1);
if (WorkerID_equal(state->actor_mapping[actor_id].driver_id, driver_id)) {
if (state->actor_mapping[actor_id].driver_id == driver_id) {
/* This actor was created by the removed driver, so kill the actor. */
LOG_DEBUG("Killing an actor for a removed driver.");
kill_worker(state, *it, false, true);
}
} else if (task != NULL) {
TaskSpec *spec = Task_task_execution_spec(task)->Spec();
if (WorkerID_equal(TaskSpec_driver_id(spec), driver_id)) {
if (TaskSpec_driver_id(spec) == driver_id) {
LOG_DEBUG("Killing a worker executing a task for a removed driver.");
kill_worker(state, *it, false, true);
}
@@ -1018,7 +1020,7 @@ void process_message(event_loop *loop,
}
/* Handle the task submission. */
if (ActorID_equal(TaskSpec_actor_id(spec), NIL_ACTOR_ID)) {
if (TaskSpec_actor_id(spec).is_nil()) {
handle_task_submitted(state, state->algorithm_state, execution_spec);
} else {
handle_actor_task_submitted(state, state->algorithm_state,
@@ -1033,8 +1035,8 @@ void process_message(event_loop *loop,
worker->disconnected = true;
/* If the disconnected worker was not an actor, start a new worker to make
* sure there are enough workers in the pool. */
if (ActorID_equal(worker->actor_id, NIL_ACTOR_ID)) {
start_worker(state, NIL_ACTOR_ID, false);
if (worker->actor_id.is_nil()) {
start_worker(state, ActorID::nil(), false);
}
} break;
case MessageType_EventLogMessage: {
@@ -1059,7 +1061,7 @@ void process_message(event_loop *loop,
finish_task(state, worker, actor_checkpoint_failed);
/* Let the scheduling algorithm process the fact that there is an available
* worker. */
if (ActorID_equal(worker->actor_id, NIL_ACTOR_ID)) {
if (worker->actor_id.is_nil()) {
handle_worker_available(state, state->algorithm_state, worker);
} else {
handle_actor_worker_available(state, state->algorithm_state, worker,
@@ -1080,7 +1082,7 @@ void process_message(event_loop *loop,
release_resources(state, worker, cpu_resources);
/* Let the scheduling algorithm process the fact that the worker is
* blocked. */
if (ActorID_equal(worker->actor_id, NIL_ACTOR_ID)) {
if (worker->actor_id.is_nil()) {
handle_worker_blocked(state, state->algorithm_state, worker);
} else {
handle_actor_worker_blocked(state, state->algorithm_state, worker);
@@ -1113,7 +1115,7 @@ void process_message(event_loop *loop,
acquire_resources(state, worker, cpu_resources);
/* Let the scheduling algorithm process the fact that the worker is
* unblocked. */
if (ActorID_equal(worker->actor_id, NIL_ACTOR_ID)) {
if (worker->actor_id.is_nil()) {
handle_worker_unblocked(state, state->algorithm_state, worker);
} else {
handle_actor_worker_unblocked(state, state->algorithm_state, worker);
@@ -1156,12 +1158,12 @@ void new_client_connection(event_loop *loop,
/* We don't know yet whether this client is a worker or a driver, so is_worker
 * is given a placeholder value of true here; handle_client_register later
 * overwrites it with the value from the registration message.
 * NOTE(review): this comment previously said the field is initialized to
 * false, which contradicts the assignment below -- confirm which default is
 * intended. */
worker->is_worker = true;
worker->client_id = NIL_WORKER_ID;
worker->client_id = WorkerID::nil();
worker->task_in_progress = NULL;
worker->is_blocked = false;
worker->pid = 0;
worker->is_child = false;
worker->actor_id = NIL_ACTOR_ID;
worker->actor_id = ActorID::nil();
worker->local_scheduler_state = state;
state->workers.push_back(worker);
event_loop_add_file(loop, new_socket, EVENT_LOOP_READ, process_message,
@@ -1203,7 +1205,7 @@ void handle_task_scheduled_callback(Task *original_task,
return;
}
if (ActorID_equal(TaskSpec_actor_id(spec), NIL_ACTOR_ID)) {
if (TaskSpec_actor_id(spec).is_nil()) {
/* This task does not involve an actor. Handle it normally. */
handle_task_scheduled(state, state->algorithm_state, *execution_spec);
} else {
@@ -1249,12 +1251,11 @@ void handle_actor_creation_callback(ActorID actor_id,
* changed but that the local scheduler has. */
auto it = state->actor_mapping.find(actor_id);
CHECK(it != state->actor_mapping.end());
CHECK(WorkerID_equal(it->second.driver_id, driver_id));
CHECK(!DBClientID_equal(it->second.local_scheduler_id, local_scheduler_id));
CHECK(it->second.driver_id == driver_id);
CHECK(!(it->second.local_scheduler_id == local_scheduler_id));
/* If the actor was previously assigned to this local scheduler, kill the
* actor. */
if (DBClientID_equal(it->second.local_scheduler_id,
get_db_client_id(state->db))) {
if (it->second.local_scheduler_id == get_db_client_id(state->db)) {
/* TODO(rkn): We should kill the actor here if it is still around. Also,
* if it hasn't registered yet, we should keep track of its PID so we can
* kill it anyway. */
@@ -1272,7 +1273,7 @@ void handle_actor_creation_callback(ActorID actor_id,
/* If this local scheduler is responsible for the actor, then start a new
* worker for the actor. */
if (DBClientID_equal(local_scheduler_id, get_db_client_id(state->db))) {
if (local_scheduler_id == get_db_client_id(state->db)) {
start_worker(state, actor_id, reconstruct);
}
/* Let the scheduling algorithm process the fact that a new actor has been
@@ -218,9 +218,9 @@ void create_actor(SchedulingAlgorithmState *algorithm_state,
ActorID actor_id,
LocalSchedulerClient *worker) {
LocalActorInfo entry;
entry.task_counters[NIL_ACTOR_ID] = 0;
entry.task_counters[ActorID::nil()] = 0;
entry.assigned_task_counter = -1;
entry.assigned_task_handle_id = NIL_ACTOR_ID;
entry.assigned_task_handle_id = ActorID::nil();
entry.task_queue = new std::list<TaskExecutionSpec>();
entry.worker = worker;
entry.worker_available = false;
@@ -229,10 +229,8 @@ void create_actor(SchedulingAlgorithmState *algorithm_state,
algorithm_state->local_actor_infos[actor_id] = entry;
/* Log some useful information about the actor that we created. */
char id_string[ID_STRING_SIZE];
LOG_DEBUG("Creating actor with ID %s.",
ObjectID_to_string(actor_id, id_string, ID_STRING_SIZE));
ARROW_UNUSED(id_string);
std::string id_string = actor_id.hex();
LOG_DEBUG("Creating actor with ID %s.", id_string.c_str());
}
void remove_actor(SchedulingAlgorithmState *algorithm_state, ActorID actor_id) {
@@ -241,14 +239,12 @@ void remove_actor(SchedulingAlgorithmState *algorithm_state, ActorID actor_id) {
algorithm_state->local_actor_infos.find(actor_id)->second;
/* Log some useful information about the actor that we're removing. */
char id_string[ID_STRING_SIZE];
std::string id_string = actor_id.hex();
size_t count = entry.task_queue->size();
if (count > 0) {
LOG_WARN("Removing actor with ID %s and %lld remaining tasks.",
ObjectID_to_string(actor_id, id_string, ID_STRING_SIZE),
(long long) count);
id_string.c_str(), (long long) count);
}
ARROW_UNUSED(id_string);
entry.task_queue->clear();
delete entry.task_queue;
@@ -271,7 +267,7 @@ bool dispatch_actor_task(LocalSchedulerState *state,
SchedulingAlgorithmState *algorithm_state,
ActorID actor_id) {
/* Make sure this worker actually is an actor. */
CHECK(!ActorID_equal(actor_id, NIL_ACTOR_ID));
CHECK(!actor_id.is_nil());
/* Return if this actor doesn't have any pending tasks. */
if (algorithm_state->actors_with_pending_tasks.find(actor_id) ==
algorithm_state->actors_with_pending_tasks.end()) {
@@ -283,8 +279,8 @@ bool dispatch_actor_task(LocalSchedulerState *state,
* scheduler. This should be rare. */
return false;
}
CHECK(DBClientID_equal(state->actor_mapping[actor_id].local_scheduler_id,
get_db_client_id(state->db)))
CHECK(state->actor_mapping[actor_id].local_scheduler_id ==
get_db_client_id(state->db));
/* Get the local actor entry for this actor. */
CHECK(algorithm_state->local_actor_infos.count(actor_id) != 0);
@@ -370,7 +366,7 @@ void finish_killed_task(LocalSchedulerState *state,
// TODO(ekl): this writes an invalid arrow object, which is sufficient to
// signal that the worker failed, but it would be nice to return more
// detailed failure metadata in the future.
Status status =
arrow::Status status =
state->plasma_conn->Create(object_id.to_plasma_id(), 1, NULL, 0, &data);
if (!status.IsPlasmaObjectExists()) {
ARROW_CHECK_OK(status);
@@ -444,8 +440,7 @@ void insert_actor_task_queue(LocalSchedulerState *state,
for (; it != entry.task_queue->end(); it++) {
TaskSpec *pending_task_spec = it->Spec();
/* Skip tasks submitted by a different handle. */
if (!ActorID_equal(task_handle_id,
TaskSpec_actor_handle_id(pending_task_spec))) {
if (!(task_handle_id == TaskSpec_actor_handle_id(pending_task_spec))) {
continue;
}
/* A duplicate task submitted by the same handle. */
@@ -485,7 +480,7 @@ void queue_actor_task(LocalSchedulerState *state,
bool from_global_scheduler) {
TaskSpec *spec = execution_spec.Spec();
ActorID actor_id = TaskSpec_actor_id(spec);
DCHECK(!ActorID_equal(actor_id, NIL_ACTOR_ID));
DCHECK(!actor_id.is_nil());
/* Update the task table. */
if (state->db != NULL) {
@@ -768,7 +763,7 @@ void dispatch_tasks(LocalSchedulerState *state,
if (state->child_pids.size() == 0) {
/* If there are no workers, including those pending PID registration,
* then we must start a new one to replenish the worker pool. */
start_worker(state, NIL_ACTOR_ID, false);
start_worker(state, ActorID::nil(), false);
}
return;
}
@@ -986,7 +981,7 @@ void give_task_to_local_scheduler(LocalSchedulerState *state,
SchedulingAlgorithmState *algorithm_state,
TaskExecutionSpec &execution_spec,
DBClientID local_scheduler_id) {
if (DBClientID_equal(local_scheduler_id, get_db_client_id(state->db))) {
if (local_scheduler_id == get_db_client_id(state->db)) {
LOG_WARN("Local scheduler is trying to assign a task to itself.");
}
CHECK(state->db != NULL);
@@ -1034,7 +1029,8 @@ void give_task_to_global_scheduler(LocalSchedulerState *state,
}
/* Pass on the task to the global scheduler. */
DCHECK(state->config.global_scheduler_exists);
Task *task = Task_alloc(execution_spec, TASK_STATUS_WAITING, NIL_ID);
Task *task =
Task_alloc(execution_spec, TASK_STATUS_WAITING, DBClientID::nil());
DCHECK(state->db != NULL);
auto retryInfo = RetryInfo{
.num_retries = 0, // This value is unused.
@@ -1102,8 +1098,8 @@ void handle_actor_task_submitted(LocalSchedulerState *state,
return;
}
if (DBClientID_equal(state->actor_mapping[actor_id].local_scheduler_id,
get_db_client_id(state->db))) {
if (state->actor_mapping[actor_id].local_scheduler_id ==
get_db_client_id(state->db)) {
/* This local scheduler is responsible for the actor, so handle the task
* locally. */
queue_task_locally(state, algorithm_state, execution_spec, false);
@@ -1167,8 +1163,8 @@ void handle_actor_task_scheduled(LocalSchedulerState *state,
DCHECK(TaskSpec_is_actor_task(spec));
ActorID actor_id = TaskSpec_actor_id(spec);
if (state->actor_mapping.count(actor_id) == 1) {
DCHECK(DBClientID_equal(state->actor_mapping[actor_id].local_scheduler_id,
get_db_client_id(state->db)));
DCHECK(state->actor_mapping[actor_id].local_scheduler_id ==
get_db_client_id(state->db));
} else {
/* This means that an actor has been assigned to this local scheduler, and a
* task for that actor has been received by this local scheduler, but this
@@ -1212,7 +1208,7 @@ void handle_worker_removed(LocalSchedulerState *state,
SchedulingAlgorithmState *algorithm_state,
LocalSchedulerClient *worker) {
/* Make sure this is not an actor. */
CHECK(ActorID_equal(worker->actor_id, NIL_ACTOR_ID));
CHECK(worker->actor_id.is_nil());
/* Make sure that we remove the worker at most once. */
int num_times_removed = 0;
@@ -1281,7 +1277,7 @@ void handle_actor_worker_available(LocalSchedulerState *state,
LocalSchedulerClient *worker,
bool actor_checkpoint_failed) {
ActorID actor_id = worker->actor_id;
CHECK(!ActorID_equal(actor_id, NIL_ACTOR_ID));
CHECK(!actor_id.is_nil());
/* Get the actor info for this worker. */
CHECK(algorithm_state->local_actor_infos.count(actor_id) == 1);
LocalActorInfo &entry =
@@ -1302,7 +1298,7 @@ void handle_actor_worker_available(LocalSchedulerState *state,
}
}
entry.assigned_task_counter = -1;
entry.assigned_task_handle_id = NIL_ACTOR_ID;
entry.assigned_task_handle_id = ActorID::nil();
entry.worker_available = true;
/* Assign new tasks if possible. */
dispatch_all_tasks(state, algorithm_state);
@@ -1455,7 +1451,7 @@ void handle_object_removed(LocalSchedulerState *state,
int count = it->DependencyIdCount(i);
for (int j = 0; j < count; ++j) {
ObjectID dependency_id = it->DependencyId(i, j);
if (ObjectID_equal(dependency_id, removed_object_id)) {
if (dependency_id == removed_object_id) {
/* Do not request a transfer from other plasma managers if this is an
* execution dependency. */
bool request_transfer = it->IsStaticDependency(i);
@@ -1483,7 +1479,7 @@ void handle_driver_removed(LocalSchedulerState *state,
/* If the dependent task was a task for the removed driver, remove it from
* this vector. */
TaskSpec *spec = (*task_it_it)->Spec();
if (WorkerID_equal(TaskSpec_driver_id(spec), driver_id)) {
if (TaskSpec_driver_id(spec) == driver_id) {
task_it_it = it->second.dependent_tasks.erase(task_it_it);
} else {
task_it_it++;
@@ -1502,7 +1498,7 @@ void handle_driver_removed(LocalSchedulerState *state,
auto it = algorithm_state->waiting_task_queue->begin();
while (it != algorithm_state->waiting_task_queue->end()) {
TaskSpec *spec = it->Spec();
if (WorkerID_equal(TaskSpec_driver_id(spec), driver_id)) {
if (TaskSpec_driver_id(spec) == driver_id) {
it = algorithm_state->waiting_task_queue->erase(it);
} else {
it++;
@@ -1513,7 +1509,7 @@ void handle_driver_removed(LocalSchedulerState *state,
it = algorithm_state->dispatch_task_queue->begin();
while (it != algorithm_state->dispatch_task_queue->end()) {
TaskSpec *spec = it->Spec();
if (WorkerID_equal(TaskSpec_driver_id(spec), driver_id)) {
if (TaskSpec_driver_id(spec) == driver_id) {
it = algorithm_state->dispatch_task_queue->erase(it);
} else {
it++;
@@ -49,7 +49,7 @@ LocalSchedulerConnection *LocalSchedulerConnection_init(
result->gpu_ids.push_back(reply_message->gpu_ids()->Get(i));
}
/* If the worker is not an actor, there should not be any GPU IDs here. */
if (ActorID_equal(result->actor_id, NIL_ACTOR_ID)) {
if (ActorID_equal(result->actor_id, ActorID::nil())) {
CHECK(reply_message->gpu_ids()->size() == 0);
}
@@ -127,7 +127,7 @@ TaskSpec *local_scheduler_get_task(LocalSchedulerConnection *conn,
/* Set the GPU IDs for this task. We only do this for non-actor tasks because
* for actors the GPUs are associated with the actor itself and not with the
* actor methods. */
if (ActorID_equal(conn->actor_id, NIL_ACTOR_ID)) {
if (ActorID_equal(conn->actor_id, ActorID::nil())) {
conn->gpu_ids.clear();
for (size_t i = 0; i < reply_message->gpu_ids()->size(); ++i) {
conn->gpu_ids.push_back(reply_message->gpu_ids()->Get(i));
@@ -126,7 +126,7 @@ LocalSchedulerMock *LocalSchedulerMock_init(int num_workers,
for (int i = 0; i < num_mock_workers; ++i) {
mock->conns[i] =
LocalSchedulerConnection_init(local_scheduler_socket_name.c_str(),
NIL_WORKER_ID, NIL_ACTOR_ID, true, 0);
WorkerID::nil(), ActorID::nil(), true, 0);
}
background_thread.join();
@@ -194,11 +194,11 @@ TEST object_reconstruction_test(void) {
ASSERT(db_shards_addresses.size() == 1);
context = redisConnect(db_shards_addresses[0].c_str(), db_shards_ports[0]);
redisReply *reply = (redisReply *) redisCommand(
context, "RAY.OBJECT_TABLE_ADD %b %ld %b %s", return_id.id,
sizeof(return_id.id), 1, NIL_DIGEST, (size_t) DIGEST_SIZE, client_id);
context, "RAY.OBJECT_TABLE_ADD %b %ld %b %s", return_id.data(),
sizeof(return_id), 1, NIL_DIGEST, (size_t) DIGEST_SIZE, client_id);
freeReplyObject(reply);
reply = (redisReply *) redisCommand(context, "RAY.OBJECT_TABLE_REMOVE %b %s",
return_id.id, sizeof(return_id.id),
return_id.data(), sizeof(return_id),
client_id);
freeReplyObject(reply);
redisFree(context);
@@ -293,12 +293,12 @@ TEST object_reconstruction_recursive_test(void) {
for (int i = 0; i < NUM_TASKS; ++i) {
ObjectID return_id = TaskSpec_return(specs[i].Spec(), 0);
redisReply *reply = (redisReply *) redisCommand(
context, "RAY.OBJECT_TABLE_ADD %b %ld %b %s", return_id.id,
sizeof(return_id.id), 1, NIL_DIGEST, (size_t) DIGEST_SIZE, client_id);
context, "RAY.OBJECT_TABLE_ADD %b %ld %b %s", return_id.data(),
sizeof(return_id), 1, NIL_DIGEST, (size_t) DIGEST_SIZE, client_id);
freeReplyObject(reply);
reply = (redisReply *) redisCommand(
context, "RAY.OBJECT_TABLE_REMOVE %b %s", return_id.id,
sizeof(return_id.id), client_id);
context, "RAY.OBJECT_TABLE_REMOVE %b %s", return_id.data(),
sizeof(return_id), client_id);
freeReplyObject(reply);
}
redisFree(context);
@@ -634,7 +634,7 @@ TEST start_kill_workers_test(void) {
static_cast<size_t>(num_workers - 1));
/* Start a worker after the local scheduler has been initialized. */
start_worker(local_scheduler->local_scheduler_state, NIL_ACTOR_ID, false);
start_worker(local_scheduler->local_scheduler_state, ActorID::nil(), false);
/* Accept the workers as clients to the plasma manager. */
int new_worker_fd = accept_client(local_scheduler->plasma_manager_fd);
/* The new worker should register its process ID. */
+1 -1
View File
@@ -17,7 +17,7 @@ sleep 1s
./src/plasma/plasma_store -s /tmp/plasma_store_socket_1 -m 100000000 &
sleep 0.5s
valgrind --leak-check=full --show-leak-kinds=all --error-exitcode=1 ./src/local_scheduler/local_scheduler_tests
valgrind --track-origins=yes --leak-check=full --show-leak-kinds=all --leak-check-heuristics=stdstring --error-exitcode=1 ./src/local_scheduler/local_scheduler_tests
./src/common/thirdparty/redis/src/redis-cli shutdown
./src/common/thirdparty/redis/src/redis-cli -p 6380 shutdown
killall plasma_store
+3 -9
View File
@@ -5,12 +5,6 @@ project(plasma)
# Recursively include common
include(${CMAKE_CURRENT_LIST_DIR}/../common/cmake/Common.cmake)
# Include plasma
list(APPEND CMAKE_MODULE_PATH ${CMAKE_CURRENT_LIST_DIR}/../thirdparty/arrow/python/cmake_modules)
find_package(Plasma)
include_directories(SYSTEM ${PLASMA_INCLUDE_DIR})
set(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} --std=c99 -D_XOPEN_SOURCE=500 -D_POSIX_C_SOURCE=200809L -O3")
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} --std=c++11 -D_XOPEN_SOURCE=500 -D_POSIX_C_SOURCE=200809L -O3 -Werror -Wall")
@@ -28,7 +22,7 @@ set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -fPIC")
add_executable(plasma_manager
plasma_manager.cc)
target_link_libraries(plasma_manager common ${PLASMA_STATIC_LIB} ${ARROW_DIR}/cpp/build/release/libarrow.a -lpthread)
target_link_libraries(plasma_manager common ${PLASMA_STATIC_LIB} ray_static ${ARROW_STATIC_LIB} -lpthread)
define_test(client_tests ${PLASMA_STATIC_LIB})
define_test(manager_tests ${PLASMA_STATIC_LIB} plasma_manager.cc)
define_test(client_tests "")
define_test(manager_tests "" plasma_manager.cc)
+3 -2
View File
@@ -1256,14 +1256,15 @@ void log_object_hash_mismatch_error_task_callback(Task *task,
/* Push the error to the Python driver that caused the nondeterministic task
* to be submitted. */
push_error(state->db, TaskSpec_driver_id(spec),
OBJECT_HASH_MISMATCH_ERROR_INDEX, sizeof(function), function.id);
OBJECT_HASH_MISMATCH_ERROR_INDEX, sizeof(function),
function.data());
}
void log_object_hash_mismatch_error_result_callback(ObjectID object_id,
TaskID task_id,
bool is_put,
void *user_context) {
CHECK(!IS_NIL_ID(task_id));
CHECK(!task_id.is_nil());
PlasmaManagerState *state = (PlasmaManagerState *) user_context;
/* Get the specification for the nondeterministic task. */
task_table_get_task(state->db, task_id, NULL,
+5 -5
View File
@@ -59,7 +59,7 @@ void PlasmaManagerState_free(PlasmaManagerState *state);
* manager.
*/
void process_transfer(event_loop *loop,
ObjectID object_id,
ray::ObjectID object_id,
uint8_t addr[4],
int port,
ClientConnection *conn);
@@ -81,7 +81,7 @@ void process_transfer(event_loop *loop,
*/
void process_data(event_loop *loop,
int client_sock,
ObjectID object_id,
ray::ObjectID object_id,
int64_t data_size,
int64_t metadata_size,
ClientConnection *conn);
@@ -157,7 +157,7 @@ ClientConnection *ClientConnection_listen(event_loop *loop,
/* Buffer for requests between plasma managers. */
typedef struct PlasmaRequestBuffer {
int type;
ObjectID object_id;
ray::ObjectID object_id;
uint8_t *data;
int64_t data_size;
uint8_t *metadata;
@@ -175,7 +175,7 @@ typedef struct PlasmaRequestBuffer {
* @param context The plasma manager state.
* @return Void.
*/
void call_request_transfer(ObjectID object_id,
void call_request_transfer(ray::ObjectID object_id,
const std::vector<std::string> &manager_vector,
void *context);
@@ -272,6 +272,6 @@ int get_client_sock(ClientConnection *conn);
* @return A bool that is true if the requested object is local and false
* otherwise.
*/
bool is_object_local(PlasmaManagerState *state, ObjectID object_id);
bool is_object_local(PlasmaManagerState *state, ray::ObjectID object_id);
#endif /* PLASMA_MANAGER_H */
+4 -4
View File
@@ -135,7 +135,7 @@ TEST request_transfer_test(void) {
int port;
ARROW_CHECK_OK(plasma::ReadDataRequest(
request_data.data(), request_data.size(), &object_id2, &address, &port));
ASSERT(ObjectID_equal(object_id, object_id2));
ASSERT(object_id == object_id2);
free(address);
/* Clean up. */
destroy_plasma_mock(remote_mock);
@@ -188,7 +188,7 @@ TEST request_transfer_retry_test(void) {
ARROW_CHECK_OK(plasma::ReadDataRequest(
request_data.data(), request_data.size(), &object_id2, &address, &port));
free(address);
ASSERT(ObjectID_equal(object_id, object_id2));
ASSERT(object_id == object_id2);
/* Clean up. */
destroy_plasma_mock(remote_mock2);
destroy_plasma_mock(remote_mock1);
@@ -254,9 +254,9 @@ TEST object_notifications_test(void) {
int flags = fcntl(fd[1], F_GETFL, 0);
CHECK(fcntl(fd[1], F_SETFL, flags | O_NONBLOCK) == 0);
ObjectID object_id = globally_unique_id();
ObjectID object_id = ObjectID::from_random();
ObjectInfoT info;
info.object_id = std::string((char *) &object_id.id[0], sizeof(object_id));
info.object_id = object_id.binary();
info.data_size = 10;
info.metadata_size = 1;
info.create_time = 0;
+1 -1
View File
@@ -7,5 +7,5 @@ set -e
./src/plasma/plasma_store -s /tmp/plasma_store_socket_1 -m 0 &
sleep 1
valgrind --leak-check=full --error-exitcode=1 ./src/plasma/manager_tests
valgrind --track-origins=yes --leak-check=full --show-leak-kinds=all --leak-check-heuristics=stdstring --error-exitcode=1 ./src/plasma/manager_tests
killall plasma_store
+2 -4
View File
@@ -46,7 +46,5 @@ install(
ADD_RAY_LIB(ray
SOURCES ${RAY_SRCS} ${AE_SRCS} ${HIREDIS_SRCS}
DEPENDENCIES gen_gcs_fbs
SHARED_LINK_LIBS
STATIC_LINK_LIBS)
set(RAY_TEST_LINK_LIBS ray_static gtest gtest_main)
SHARED_LINK_LIBS ""
STATIC_LINK_LIBS "")
View File
+1 -1
View File
@@ -19,7 +19,7 @@ add_custom_command(
add_custom_target(gen_gcs_fbs DEPENDS ${GCS_FBS_OUTPUT_FILES})
ADD_RAY_TEST(client_test STATIC_LINK_LIBS ray_static gtest gtest_main pthread)
ADD_RAY_TEST(client_test STATIC_LINK_LIBS ray_static ${PLASMA_STATIC_LIB} ${ARROW_STATIC_LIB} gtest gtest_main pthread)
install(FILES
client.h
+1 -1
View File
@@ -12,7 +12,7 @@ AsyncGcsClient::~AsyncGcsClient() {}
Status AsyncGcsClient::Connect(const std::string &address, int port) {
context_.reset(new RedisContext());
RETURN_NOT_OK(context_->Connect(address, port));
RAY_RETURN_NOT_OK(context_->Connect(address, port));
object_table_.reset(new ObjectTable(context_));
task_table_.reset(new TaskTable(context_));
return Status::OK();
+1 -1
View File
@@ -16,7 +16,7 @@ namespace gcs {
class RedisContext;
class AsyncGcsClient {
class RAY_EXPORT AsyncGcsClient {
public:
AsyncGcsClient();
~AsyncGcsClient();
+3 -3
View File
@@ -57,13 +57,13 @@ TEST_F(TestGcs, TestObjectTable) {
void TaskAdded(gcs::AsyncGcsClient *client,
const TaskID &id,
std::shared_ptr<TaskTableDataT> data) {
ASSERT_EQ(data->scheduling_state, 3);
ASSERT_EQ(data->scheduling_state, SchedulingState_SCHEDULED);
}
void TaskLookup(gcs::AsyncGcsClient *client,
const TaskID &id,
std::shared_ptr<TaskTableDataT> data) {
ASSERT_EQ(data->scheduling_state, 3);
ASSERT_EQ(data->scheduling_state, SchedulingState_SCHEDULED);
aeStop(loop);
}
@@ -71,7 +71,7 @@ TEST_F(TestGcs, TestTaskTable) {
loop = aeCreateEventLoop(1024);
RAY_CHECK_OK(client_.context()->AttachToEventLoop(loop));
auto data = std::make_shared<TaskTableDataT>();
data->scheduling_state = 3;
data->scheduling_state = SchedulingState_SCHEDULED;
TaskID task_id = TaskID::from_random();
RAY_CHECK_OK(client_.task_table().Add(job_id_, task_id, data, &TaskAdded));
RAY_CHECK_OK(
+12 -1
View File
@@ -18,8 +18,19 @@ table ObjectTableData {
managers: [string];
}
// Scheduling state of a task, stored in TaskTableData.scheduling_state.
// The underlying type is int. NONE is 0 and every other member is a distinct
// power of two (1, 2, 4, ..., 64).
// NOTE(review): the power-of-two values look intended for use as a bitmask
// and to mirror the legacy task status constants -- confirm against the
// callers before renumbering, since these values are serialized.
enum SchedulingState:int {
NONE = 0,
WAITING = 1,
SCHEDULED = 2,
QUEUED = 4,
RUNNING = 8,
DONE = 16,
LOST = 32,
RECONSTRUCTING = 64
}
table TaskTableData {
scheduling_state: int;
scheduling_state: SchedulingState;
scheduler_id: string;
execution_arg_ids: [string];
task_info: string;
+5 -5
View File
@@ -51,9 +51,9 @@ class Table {
const std::string &data) { (d->callback)(d->client, d->id, d->data); });
flatbuffers::FlatBufferBuilder fbb;
fbb.Finish(Data::Pack(fbb, data.get()));
RETURN_NOT_OK(context_->RunAsync("RAY.TABLE_ADD", id,
fbb.GetBufferPointer(), fbb.GetSize(),
callback_index));
RAY_RETURN_NOT_OK(context_->RunAsync("RAY.TABLE_ADD", id,
fbb.GetBufferPointer(), fbb.GetSize(),
callback_index));
return Status::OK();
}
@@ -72,8 +72,8 @@ class Table {
(d->callback)(d->client, d->id, result);
});
std::vector<uint8_t> nil;
RETURN_NOT_OK(context_->RunAsync("RAY.TABLE_LOOKUP", id, nil.data(),
nil.size(), callback_index));
RAY_RETURN_NOT_OK(context_->RunAsync("RAY.TABLE_LOOKUP", id, nil.data(),
nil.size(), callback_index));
return Status::OK();
}
+19 -6
View File
@@ -4,6 +4,10 @@
namespace ray {
// Construct a ray::UniqueID from a plasma::UniqueID by copying kUniqueIDSize
// bytes from the plasma ID's buffer into id_.
// NOTE(review): assumes the buffer returned by from.data() holds at least
// kUniqueIDSize bytes -- confirm against plasma/common.h.
UniqueID::UniqueID(const plasma::UniqueID &from) {
std::memcpy(&id_, from.data(), kUniqueIDSize);
}
UniqueID UniqueID::from_random() {
UniqueID id;
uint8_t *data = id.mutable_data();
@@ -22,16 +26,19 @@ UniqueID UniqueID::from_binary(const std::string &binary) {
const UniqueID UniqueID::nil() {
UniqueID result;
std::fill_n(result.id_, kUniqueIDSize, 255);
uint8_t *data = result.mutable_data();
std::fill_n(data, kUniqueIDSize, 255);
return result;
}
bool is_nil(const UniqueID &rhs) {
int i = kUniqueIDSize;
const uint8_t *data = rhs.data();
while (--i > 0 && data[i] == 255) {
bool UniqueID::is_nil() const {
const uint8_t *d = data();
for (int i = 0; i < kUniqueIDSize; ++i) {
if (d[i] != 255) {
return false;
}
}
return i != 0;
return true;
}
const uint8_t *UniqueID::data() const {
@@ -61,6 +68,12 @@ std::string UniqueID::hex() const {
return result;
}
// Convert this ID into a plasma::UniqueID. A fresh plasma ID is created,
// kUniqueIDSize bytes of id_ are copied into its mutable buffer, and the
// result is returned by value; this object is not modified.
// NOTE(review): assumes plasma::UniqueID's buffer holds at least
// kUniqueIDSize bytes -- confirm against plasma/common.h.
plasma::UniqueID UniqueID::to_plasma_id() {
plasma::UniqueID result;
std::memcpy(result.mutable_data(), &id_, kUniqueIDSize);
return result;
}
// Equality is a bytewise comparison: two IDs are equal exactly when their
// first kUniqueIDSize bytes, as exposed by data(), are identical.
bool UniqueID::operator==(const UniqueID &rhs) const {
return std::memcmp(data(), rhs.data(), kUniqueIDSize) == 0;
}
+8 -2
View File
@@ -6,6 +6,7 @@
#include <cstring>
#include <string>
#include "plasma/common.h"
#include "ray/constants.h"
#include "ray/util/visibility.h"
@@ -13,22 +14,26 @@ namespace ray {
class RAY_EXPORT UniqueID {
public:
UniqueID() {}
UniqueID(const plasma::UniqueID &from);
static UniqueID from_random();
static UniqueID from_binary(const std::string &binary);
static const UniqueID nil();
bool is_nil(const UniqueID &rhs) const;
bool is_nil() const;
bool operator==(const UniqueID &rhs) const;
const uint8_t *data() const;
uint8_t *mutable_data();
size_t size() const;
std::string binary() const;
std::string hex() const;
plasma::UniqueID to_plasma_id();
private:
uint8_t id_[kUniqueIDSize];
};
static_assert(std::is_pod<UniqueID>::value, "UniqueID must be plain old data");
static_assert(std::is_standard_layout<UniqueID>::value,
"UniqueID must be standard");
struct UniqueIDHasher {
// ID hashing function.
@@ -47,6 +52,7 @@ typedef UniqueID ClassID;
typedef UniqueID ActorID;
typedef UniqueID ActorHandleID;
typedef UniqueID WorkerID;
typedef UniqueID DriverID;
typedef UniqueID DBClientID;
typedef UniqueID ConfigID;
-17
View File
@@ -45,23 +45,6 @@
namespace ray {
#define RETURN_NOT_OK(s) \
do { \
Status _s = (s); \
if (RAY_PREDICT_FALSE(!_s.ok())) { \
return _s; \
} \
} while (0)
#define RETURN_NOT_OK_ELSE(s, else_) \
do { \
Status _s = (s); \
if (!_s.ok()) { \
else_; \
return _s; \
} \
} while (0)
enum class StatusCode : char {
OK = 0,
OutOfMemory = 1,
-45
View File
@@ -34,51 +34,6 @@ namespace ray {
<< __FILE__ << __LINE__ \
<< " Check failed: " #condition " "
#ifdef NDEBUG
#define RAY_DFATAL RAY_WARNING
#define DCHECK(condition) \
RAY_IGNORE_EXPR(condition) \
while (false) \
::ray::internal::NullLog()
#define DCHECK_EQ(val1, val2) \
RAY_IGNORE_EXPR(val1) \
while (false) \
::ray::internal::NullLog()
#define DCHECK_NE(val1, val2) \
RAY_IGNORE_EXPR(val1) \
while (false) \
::ray::internal::NullLog()
#define DCHECK_LE(val1, val2) \
RAY_IGNORE_EXPR(val1) \
while (false) \
::ray::internal::NullLog()
#define DCHECK_LT(val1, val2) \
RAY_IGNORE_EXPR(val1) \
while (false) \
::ray::internal::NullLog()
#define DCHECK_GE(val1, val2) \
RAY_IGNORE_EXPR(val1) \
while (false) \
::ray::internal::NullLog()
#define DCHECK_GT(val1, val2) \
RAY_IGNORE_EXPR(val1) \
while (false) \
::ray::internal::NullLog()
#else
#define RAY_DFATAL RAY_FATAL
#define DCHECK(condition) RAY_CHECK(condition)
#define DCHECK_EQ(val1, val2) RAY_CHECK((val1) == (val2))
#define DCHECK_NE(val1, val2) RAY_CHECK((val1) != (val2))
#define DCHECK_LE(val1, val2) RAY_CHECK((val1) <= (val2))
#define DCHECK_LT(val1, val2) RAY_CHECK((val1) < (val2))
#define DCHECK_GE(val1, val2) RAY_CHECK((val1) >= (val2))
#define DCHECK_GT(val1, val2) RAY_CHECK((val1) > (val2))
#endif // NDEBUG
namespace internal {
class NullLog {
+1 -1
View File
@@ -13,4 +13,4 @@ fi
cd $TP_DIR/arrow
git fetch origin master
git checkout 9895181610c0a5ef1ba836300ea2036e1a3e5621
git checkout 65f5add61829bd413aa8175ad2644cdd362d0c50