Remove CoreWorkerObjectInterface (#6023)

This commit is contained in:
Edward Oakes
2019-10-28 12:48:41 -05:00
committed by Eric Liang
parent e08b5d0cae
commit c1418b04df
22 changed files with 568 additions and 921 deletions
+13 -12
View File
@@ -510,7 +510,8 @@ cdef execute_task(
if not execution_info:
function_descriptor = FunctionDescriptor.from_bytes_list(
ray_function.GetFunctionDescriptor())
execution_info = manager.get_execution_info(job_id, function_descriptor)
execution_info = manager.get_execution_info(
job_id, function_descriptor)
execution_infos[descriptor] = execution_info
function_name = execution_info.function_name
@@ -696,7 +697,7 @@ cdef class CoreWorker:
c_vector[CObjectID] c_object_ids = ObjectIDsToVector(object_ids)
with nogil:
check_status(self.core_worker.get().Objects().Get(
check_status(self.core_worker.get().Get(
c_object_ids, timeout_ms, &results))
return RayObjectsToDataMetadataPairs(results)
@@ -707,7 +708,7 @@ cdef class CoreWorker:
CObjectID c_object_id = object_id.native()
with nogil:
check_status(self.core_worker.get().Objects().Contains(
check_status(self.core_worker.get().Contains(
c_object_id, &has_object))
return has_object
@@ -721,12 +722,12 @@ cdef class CoreWorker:
try:
if object_id is None:
with nogil:
check_status(self.core_worker.get().Objects().Create(
check_status(self.core_worker.get().Create(
metadata, data_size, c_object_id, data))
else:
c_object_id[0] = object_id.native()
with nogil:
check_status(self.core_worker.get().Objects().Create(
check_status(self.core_worker.get().Create(
metadata, data_size, c_object_id[0], data))
break
except ObjectStoreFullError as e:
@@ -763,7 +764,7 @@ cdef class CoreWorker:
with nogil:
check_status(
self.core_worker.get().Objects().Seal(c_object_id))
self.core_worker.get().Seal(c_object_id))
return ObjectID(c_object_id.Binary())
@@ -788,7 +789,7 @@ cdef class CoreWorker:
with nogil:
check_status(
self.core_worker.get().Objects().Seal(c_object_id))
self.core_worker.get().Seal(c_object_id))
return ObjectID(c_object_id.Binary())
@@ -811,7 +812,7 @@ cdef class CoreWorker:
writer.write_to(inband, data, memcopy_threads)
with nogil:
check_status(
self.core_worker.get().Objects().Seal(c_object_id))
self.core_worker.get().Seal(c_object_id))
return ObjectID(c_object_id.Binary())
@@ -825,7 +826,7 @@ cdef class CoreWorker:
wait_ids = ObjectIDsToVector(object_ids)
with nogil:
check_status(self.core_worker.get().Objects().Wait(
check_status(self.core_worker.get().Wait(
wait_ids, num_returns, timeout_ms, &results))
assert len(results) == len(object_ids)
@@ -845,7 +846,7 @@ cdef class CoreWorker:
c_vector[CObjectID] free_ids = ObjectIDsToVector(object_ids)
with nogil:
check_status(self.core_worker.get().Objects().Delete(
check_status(self.core_worker.get().Delete(
free_ids, local_only, delete_creating_tasks))
def set_object_store_client_options(self, client_name,
@@ -853,7 +854,7 @@ cdef class CoreWorker:
try:
logger.debug("Setting plasma memory limit to {} for {}".format(
limit_bytes, client_name))
check_status(self.core_worker.get().Objects().SetClientOptions(
check_status(self.core_worker.get().SetClientOptions(
client_name.encode("ascii"), limit_bytes))
except RayError as e:
self.dump_object_store_memory_usage()
@@ -866,7 +867,7 @@ cdef class CoreWorker:
limit_bytes, client_name, e))
def dump_object_store_memory_usage(self):
message = self.core_worker.get().Objects().MemoryUsageString()
message = self.core_worker.get().MemoryUsageString()
logger.warning("Local object store memory usage:\n{}\n".format(
message.decode("utf-8")))
+19 -22
View File
@@ -48,27 +48,6 @@ cdef extern from "ray/core_worker/profiling.h" nogil:
cdef cppclass CProfileEvent "ray::worker::ProfileEvent":
void SetExtraData(const c_string &extra_data)
cdef extern from "ray/core_worker/object_interface.h" nogil:
cdef cppclass CObjectInterface "ray::CoreWorkerObjectInterface":
CRayStatus SetClientOptions(c_string client_name, int64_t limit)
CRayStatus Put(const CRayObject &object, CObjectID *object_id)
CRayStatus Put(const CRayObject &object, const CObjectID &object_id)
CRayStatus Create(const shared_ptr[CBuffer] &metadata,
const size_t data_size, CObjectID *object_id,
shared_ptr[CBuffer] *data)
CRayStatus Create(const shared_ptr[CBuffer] &metadata,
const size_t data_size, const CObjectID &object_id,
shared_ptr[CBuffer] *data)
CRayStatus Seal(const CObjectID &object_id)
CRayStatus Get(const c_vector[CObjectID] &ids, int64_t timeout_ms,
c_vector[shared_ptr[CRayObject]] *results)
CRayStatus Contains(const CObjectID &object_id, c_bool *has_object)
CRayStatus Wait(const c_vector[CObjectID] &object_ids, int num_objects,
int64_t timeout_ms, c_vector[c_bool] *results)
CRayStatus Delete(const c_vector[CObjectID] &object_ids,
c_bool local_only, c_bool delete_creating_tasks)
c_string MemoryUsageString()
cdef extern from "ray/core_worker/core_worker.h" nogil:
cdef cppclass CCoreWorker "ray::CoreWorker":
CCoreWorker(const CWorkerType worker_type, const CLanguage language,
@@ -88,7 +67,6 @@ cdef extern from "ray/core_worker/core_worker.h" nogil:
void Disconnect()
CWorkerType &GetWorkerType()
CLanguage &GetLanguage()
CObjectInterface &Objects()
void StartExecutingTasks()
@@ -119,3 +97,22 @@ cdef extern from "ray/core_worker/core_worker.h" nogil:
*bytes)
void AddActiveObjectID(const CObjectID &object_id)
void RemoveActiveObjectID(const CObjectID &object_id)
CRayStatus SetClientOptions(c_string client_name, int64_t limit)
CRayStatus Put(const CRayObject &object, CObjectID *object_id)
CRayStatus Put(const CRayObject &object, const CObjectID &object_id)
CRayStatus Create(const shared_ptr[CBuffer] &metadata,
const size_t data_size, CObjectID *object_id,
shared_ptr[CBuffer] *data)
CRayStatus Create(const shared_ptr[CBuffer] &metadata,
const size_t data_size, const CObjectID &object_id,
shared_ptr[CBuffer] *data)
CRayStatus Seal(const CObjectID &object_id)
CRayStatus Get(const c_vector[CObjectID] &ids, int64_t timeout_ms,
c_vector[shared_ptr[CRayObject]] *results)
CRayStatus Contains(const CObjectID &object_id, c_bool *has_object)
CRayStatus Wait(const c_vector[CObjectID] &object_ids, int num_objects,
int64_t timeout_ms, c_vector[c_bool] *results)
CRayStatus Delete(const c_vector[CObjectID] &object_ids,
c_bool local_only, c_bool delete_creating_tasks)
c_string MemoryUsageString()