mirror of
https://github.com/wassname/ray.git
synced 2026-06-28 15:06:28 +08:00
Merge pull request #137 from amplab/memoryfix
Preparation to deallocate objects properly from object stores
This commit is contained in:
@@ -1,7 +1,34 @@
|
||||
import importlib
|
||||
import numpy as np
|
||||
|
||||
import ray
|
||||
|
||||
# The following definitions are required because Python doesn't allow custom
|
||||
# attributes for primitive types. We need custom attributes for (a) implementing
|
||||
# destructors that close the shared memory segment that the object resides in
|
||||
# and (b) fixing https://github.com/amplab/ray/issues/72.
|
||||
|
||||
class Int(int):
|
||||
pass
|
||||
|
||||
class Float(float):
|
||||
pass
|
||||
|
||||
class List(list):
|
||||
pass
|
||||
|
||||
class Dict(dict):
|
||||
pass
|
||||
|
||||
class Tuple(tuple):
|
||||
pass
|
||||
|
||||
class Str(str):
|
||||
pass
|
||||
|
||||
class NDArray(np.ndarray):
|
||||
pass
|
||||
|
||||
def to_primitive(obj):
|
||||
if hasattr(obj, "serialize"):
|
||||
primitive_obj = ((type(obj).__module__, type(obj).__name__), obj.serialize())
|
||||
|
||||
@@ -12,6 +12,15 @@ import ray
|
||||
from ray.config import LOG_DIRECTORY, LOG_TIMESTAMP
|
||||
import serialization
|
||||
|
||||
class RayDealloc(object):
|
||||
def __init__(self, handle, segmentid):
|
||||
self.handle = handle
|
||||
self.segmentid = segmentid
|
||||
|
||||
def __del__(self):
|
||||
# TODO(pcm): This will be used to free the segment
|
||||
pass
|
||||
|
||||
class Worker(object):
|
||||
"""The methods in this class are considered unexposed to the user. The functions outside of this class are considered exposed."""
|
||||
|
||||
@@ -39,10 +48,33 @@ class Worker(object):
|
||||
WARNING: get_object can only be called on a canonical objref.
|
||||
"""
|
||||
if ray.lib.is_arrow(self.handle, objref):
|
||||
return ray.lib.get_arrow(self.handle, objref)
|
||||
result, segmentid = ray.lib.get_arrow(self.handle, objref)
|
||||
else:
|
||||
object_capsule = ray.lib.get_object(self.handle, objref)
|
||||
return serialization.deserialize(self.handle, object_capsule)
|
||||
object_capsule, segmentid = ray.lib.get_object(self.handle, objref)
|
||||
result = serialization.deserialize(self.handle, object_capsule)
|
||||
if isinstance(result, int):
|
||||
result = serialization.Int(result)
|
||||
elif isinstance(result, float):
|
||||
result = serialization.Float(result)
|
||||
elif isinstance(result, bool):
|
||||
return result # can't subclass bool, and don't need to because there is a global True/False
|
||||
# TODO(pcm): close the associated memory segment; if we don't, this leaks memory (but very little, so it is ok for now)
|
||||
elif isinstance(result, list):
|
||||
result = serialization.List(result)
|
||||
elif isinstance(result, dict):
|
||||
result = serialization.Dict(result)
|
||||
elif isinstance(result, tuple):
|
||||
result = serialization.Tuple(result)
|
||||
elif isinstance(result, str):
|
||||
result = serialization.Str(result)
|
||||
elif isinstance(result, np.ndarray):
|
||||
result = result.view(serialization.NDArray)
|
||||
elif result == None:
|
||||
return None # can't subclass None and don't need to because there is a global None
|
||||
# TODO(pcm): close the associated memory segment; if we don't, this leaks memory (but very little, so it is ok for now)
|
||||
# TODO(pcm): Here, we can add the object reference to fix https://github.com/amplab/ray/issues/72
|
||||
result.ray_deallocator = RayDealloc(self.handle, segmentid)
|
||||
return result
|
||||
|
||||
def alias_objrefs(self, alias_objref, target_objref):
|
||||
"""Make `alias_objref` refer to the same object that `target_objref` refers to."""
|
||||
|
||||
Reference in New Issue
Block a user