mirror of
https://github.com/wassname/ray.git
synced 2026-07-03 10:19:18 +08:00
Remove ObjectID aliasing from the API. (#406)
* Remove ObjectID aliasing from the API. * Update documentation to remove aliasing.
This commit is contained in:
committed by
Philipp Moritz
parent
efb61ca9c7
commit
81f40774a7
@@ -1,95 +0,0 @@
|
||||
# Aliasing
|
||||
|
||||
An important feature of Ray is that a remote call sent to the scheduler
|
||||
immediately returns object ids for the outputs of the task, and the actual
|
||||
outputs of the task are only associated with the relevant object ids
|
||||
after the task has been executed and the outputs have been computed. This allows
|
||||
the worker to continue without blocking.
|
||||
|
||||
However, to provide a more flexible API, we allow tasks to not only return
|
||||
values, but to also return object ids to values. As an examples, consider
|
||||
the following code.
|
||||
```python
|
||||
@ray.remote
|
||||
def f()
|
||||
return np.zeros(5)
|
||||
|
||||
@ray.remote
|
||||
def g()
|
||||
return f()
|
||||
|
||||
@ray.remote
|
||||
def h()
|
||||
return g()
|
||||
```
|
||||
A call to `h` will immediate return an object id `ref_h` for the return
|
||||
value of `h`. The task of executing `h` (call it `task_h`) will then be
|
||||
scheduled for execution. When `task_h` is executed, it will call `g`, which will
|
||||
immediately return an object id `ref_g` for the output of `g`. Then two
|
||||
things will happen and can happen in any order: `AliasObjectIDs(ref_h, ref_g)`
|
||||
will be called and `task_g` will be scheduled and executed. When `task_g` is
|
||||
executed, it will call `f`, and immediately obtain an object id `ref_f`
|
||||
for the output of `f`. Then two things will happen and can happen in either
|
||||
order, `AliasObjectIDs(ref_g, ref_f)` will be called, and `f` will be executed.
|
||||
When `f` is executed, it will create an actual array and `put_object` will be
|
||||
called, which will store the array in the object store (it will also call
|
||||
`SchedulerService::AddCanonicalObjectID(ref_f)`).
|
||||
|
||||
From the scheduler's perspective, there are three important calls,
|
||||
`AliasObjectIDs(ref_h, ref_g)`, `AliasObjectIDs(ref_g, ref_f)`, and
|
||||
`AddCanonicalObjectID(ref_f)`. These three calls can happen in any order.
|
||||
|
||||
The scheduler maintains a data structure called `target_objectids_`, which keeps
|
||||
track of which object ids have been aliased together (`target_objectids_`
|
||||
is a vector, but we can think of it as a graph). The call
|
||||
`AliasObjectIDs(ref_h, ref_g)` updates `target_objectids_` with `ref_h -> ref_g`.
|
||||
The call `AliasObjectIDs(ref_g, ref_f)` updates it with `ref_g -> ref_f`, and the
|
||||
call `AddCanonicalObjectID(ref_f)` updates it with `ref_f -> ref_f`. The data
|
||||
structure is initialized with `ref -> UNINITIALIZED_ALIAS` for each object
|
||||
id `ref`.
|
||||
|
||||
We refer to `ref_f` as a "canonical" object id. And in a pair such as
|
||||
`ref_h -> ref_g`, we refer to `ref_h` as the "alias" object id and to
|
||||
`ref_g` as the "target" object id. These details are available to the
|
||||
scheduler, but a worker process just has an object id and doesn't know if
|
||||
it is canonical or not.
|
||||
|
||||
We also maintain a data structure `reverse_target_objectids_`, which maps in the
|
||||
reverse direction (in the above example, we would have `ref_g -> ref_h`,
|
||||
`ref_f -> ref_g`, and `ref_h -> UNINITIALIZED_ALIAS`). This data structure is
|
||||
not particuarly important for the task of aliasing, but when we do id
|
||||
counting and attempt to deallocate an object, we need to be able to determine
|
||||
all of the object ids that refer to the same object, and this data
|
||||
structure comes in handy for that purpose.
|
||||
|
||||
## Gets and Remote Calls
|
||||
|
||||
When a worker calls `ray.get(ref)`, it first sends a message to the scheduler
|
||||
asking the scheduler to ship the object referred to by `ref` to the worker's
|
||||
local object store. Then the worker asks its local object store for the object
|
||||
referred to by `ref`. If `ref` is a canonical object id, then that's all
|
||||
there is too it. However, if `ref` is not a canonical object id but
|
||||
rather is an alias for the canonical object id `c_ref`, then the
|
||||
scheduler also notifies the worker's local object store that `ref` is an
|
||||
alias for `c_ref`. This is important because the object store does not keep
|
||||
track of aliasing on its own (it only knows the bits about aliasing that the
|
||||
scheduler tells it). Lastly, if the scheduler does not yet have enough
|
||||
information to determine if `ref` is canonical, or if the scheduler cannot
|
||||
yet determine what the canonical object id for `ref` is, then the
|
||||
scheduler will wait until it has the relevant information.
|
||||
|
||||
Similar things happen when a worker performs a remote call. If an object
|
||||
id is passed to a remote call, the object referred to by that object
|
||||
id will be shipped to the local object store of the worker that executes
|
||||
the task. The scheduler will notify that object store about any aliasing that it
|
||||
needs to be aware of.
|
||||
|
||||
## Passing Object ids by Value
|
||||
Currently, the graph of aliasing looks like a collection of chains, as in the
|
||||
above example with `ref_h -> ref_g -> ref_f -> ref_f`. In the future, we will
|
||||
allow object ids to be passed by value to remote calls (so the worker
|
||||
has access to the object id object and not the object that the object
|
||||
id refers to). If an object id that is passed by value is then
|
||||
returned by the task, it is possible that a given object id could be
|
||||
the target of multiple alias object ids. In this case, the graph of
|
||||
aliasing will be a tree.
|
||||
@@ -50,29 +50,6 @@ these internal object IDs in the `contained_objectids_` data structure).
|
||||
3. To handle the third case, we increment in the `serialize_task` method and
|
||||
decrement in the `deserialize_task` method.
|
||||
|
||||
## How to Handle Aliasing
|
||||
Reference counting interacts with aliasing. Since multiple object IDs
|
||||
may refer to the same object, we cannot deallocate that object until all of the
|
||||
object IDs that refer to it have reference counts of 0. We keep track of
|
||||
the number of separate aliases separately. If two object IDs refer to the
|
||||
same object, the scheduler keeps track the number of occurrences of each of
|
||||
those object IDs separately. This simplifies the scheduler's job because
|
||||
it may not always know if two object IDs refer to the same object or not
|
||||
(since it assigns them before hearing back about what they refer to).
|
||||
|
||||
When we decrement the count for an object ID, if the count reaches 0,
|
||||
we compute all object IDs that the scheduler knows to reference the same
|
||||
object. If these object IDs all have count 0, then we deallocate the
|
||||
object. Otherwise, we do not deallocate the object.
|
||||
|
||||
You may ask, what if there is some object ID with a nonzero count which
|
||||
refers to the same object, but the scheduler does not know it? This cannot
|
||||
happen because the following invariant holds. If `a` and `b` are object
|
||||
references that will be aliased together (through a call to
|
||||
`AliasObjectIDs(a, b)`), then either the call has already happened, or both `a`
|
||||
and `b` have positive reference counts (they must have positive reference counts
|
||||
because they must be passed into `AliasObjectIDs` at some point).
|
||||
|
||||
## Complications
|
||||
The following problem has not yet been resolved. In the following code, the
|
||||
result `x` will be garbage.
|
||||
|
||||
@@ -72,7 +72,7 @@ def tsqr(a):
|
||||
q_block_current = ra.dot.remote(q_block_current, ra.subarray.remote(q_tree[ith_index, j], lower, upper))
|
||||
q_result.objectids[i] = q_block_current
|
||||
r = current_rs[0]
|
||||
return q_result, r
|
||||
return q_result, ray.get(r)
|
||||
|
||||
# TODO(rkn): This is unoptimized, we really want a block version of this.
|
||||
@ray.remote(num_return_vals=3)
|
||||
@@ -103,7 +103,7 @@ def modified_lu(q):
|
||||
for i in range(b):
|
||||
L[i, i] = 1
|
||||
U = np.triu(q_work)[:b, :]
|
||||
return numpy_to_dist.remote(ray.put(L)), U, S # TODO(rkn): get rid of put
|
||||
return ray.get(numpy_to_dist.remote(ray.put(L))), U, S # TODO(rkn): get rid of put
|
||||
|
||||
@ray.remote(num_return_vals=2)
|
||||
def tsqr_hr_helper1(u, s, y_top_block, b):
|
||||
@@ -125,7 +125,7 @@ def tsqr_hr(a):
|
||||
y_blocked = ray.get(y)
|
||||
t, y_top = tsqr_hr_helper1.remote(u, s, y_blocked.objectids[0, 0], a.shape[1])
|
||||
r = tsqr_hr_helper2.remote(s, r_temp)
|
||||
return y, t, y_top, r
|
||||
return ray.get(y), ray.get(t), ray.get(y_top), ray.get(r)
|
||||
|
||||
@ray.remote
|
||||
def qr_helper1(a_rc, y_ri, t, W_c):
|
||||
@@ -183,4 +183,4 @@ def qr(a):
|
||||
y_col_block = subblocks.remote(y_res, [], [i])
|
||||
q = subtract.remote(q, dot.remote(y_col_block, dot.remote(Ts[i], dot.remote(transpose.remote(y_col_block), q))))
|
||||
|
||||
return q, r_res
|
||||
return ray.get(q), r_res
|
||||
|
||||
@@ -1249,9 +1249,6 @@ def store_outputs_in_objstore(objectids, outputs, worker=global_worker):
|
||||
"""
|
||||
for i in range(len(objectids)):
|
||||
if isinstance(outputs[i], raylib.ObjectID):
|
||||
# An ObjectID is being returned, so we must alias objectids[i] so that it refers to the same object that outputs[i] refers to
|
||||
_logger().info("Aliasing objectids {} and {}".format(objectids[i].id, outputs[i].id))
|
||||
worker.alias_objectids(objectids[i], outputs[i])
|
||||
pass
|
||||
else:
|
||||
worker.put_object(objectids[i], outputs[i])
|
||||
raise Exception("This remote function returned an ObjectID as its {}th return value. This is not allowed.".format(i))
|
||||
for i in range(len(objectids)):
|
||||
worker.put_object(objectids[i], outputs[i])
|
||||
|
||||
@@ -23,7 +23,6 @@ class TaskStatusTest(unittest.TestCase):
|
||||
reload(test_functions)
|
||||
ray.init(start_ray_local=True, num_workers=3, driver_mode=ray.SILENT_MODE)
|
||||
|
||||
test_functions.test_alias_f.remote()
|
||||
test_functions.throw_exception_fct1.remote()
|
||||
test_functions.throw_exception_fct1.remote()
|
||||
for _ in range(100): # Retry if we need to wait longer.
|
||||
|
||||
+6
-34
@@ -182,19 +182,6 @@ class WorkerTest(unittest.TestCase):
|
||||
|
||||
class APITest(unittest.TestCase):
|
||||
|
||||
def testObjectIDAliasing(self):
|
||||
reload(test_functions)
|
||||
ray.init(start_ray_local=True, num_workers=3)
|
||||
|
||||
ref = test_functions.test_alias_f.remote()
|
||||
assert_equal(ray.get(ref), np.ones([3, 4, 5]))
|
||||
ref = test_functions.test_alias_g.remote()
|
||||
assert_equal(ray.get(ref), np.ones([3, 4, 5]))
|
||||
ref = test_functions.test_alias_h.remote()
|
||||
assert_equal(ray.get(ref), np.ones([3, 4, 5]))
|
||||
|
||||
ray.worker.cleanup()
|
||||
|
||||
def testKeywordArgs(self):
|
||||
reload(test_functions)
|
||||
ray.init(start_ray_local=True, num_workers=1)
|
||||
@@ -259,7 +246,7 @@ class APITest(unittest.TestCase):
|
||||
ray.worker.cleanup()
|
||||
|
||||
def testDefiningRemoteFunctions(self):
|
||||
ray.init(start_ray_local=True, num_workers=2)
|
||||
ray.init(start_ray_local=True, num_workers=3)
|
||||
|
||||
# Test that we can define a remote function in the shell.
|
||||
@ray.remote
|
||||
@@ -296,7 +283,7 @@ class APITest(unittest.TestCase):
|
||||
return x + 1
|
||||
@ray.remote
|
||||
def l(x):
|
||||
return k.remote(x)
|
||||
return ray.get(k.remote(x))
|
||||
@ray.remote
|
||||
def m(x):
|
||||
return ray.get(l.remote(x))
|
||||
@@ -403,24 +390,6 @@ class ReferenceCountingTest(unittest.TestCase):
|
||||
reload(module)
|
||||
ray.init(start_ray_local=True, num_workers=1)
|
||||
|
||||
x = test_functions.test_alias_f.remote()
|
||||
ray.get(x)
|
||||
time.sleep(0.1)
|
||||
objectid_val = x.id
|
||||
self.assertEqual(ray.scheduler_info()["reference_counts"][objectid_val], 1)
|
||||
|
||||
del x
|
||||
self.assertEqual(ray.scheduler_info()["reference_counts"][objectid_val], -1) # -1 indicates deallocated
|
||||
|
||||
y = test_functions.test_alias_h.remote()
|
||||
ray.get(y)
|
||||
time.sleep(0.1)
|
||||
objectid_val = y.id
|
||||
self.assertEqual(ray.scheduler_info()["reference_counts"][objectid_val:(objectid_val + 3)], [1, 0, 0])
|
||||
|
||||
del y
|
||||
self.assertEqual(ray.scheduler_info()["reference_counts"][objectid_val:(objectid_val + 3)], [-1, -1, -1])
|
||||
|
||||
z = da.zeros.remote([da.BLOCK_SIZE, 2 * da.BLOCK_SIZE])
|
||||
time.sleep(0.1)
|
||||
objectid_val = z.id
|
||||
@@ -493,7 +462,10 @@ class PythonModeTest(unittest.TestCase):
|
||||
reload(test_functions)
|
||||
ray.init(start_ray_local=True, driver_mode=ray.PYTHON_MODE)
|
||||
|
||||
xref = test_functions.test_alias_h.remote()
|
||||
@ray.remote
|
||||
def f():
|
||||
return np.ones([3, 4, 5])
|
||||
xref = f.remote()
|
||||
assert_equal(xref, np.ones([3, 4, 5])) # remote functions should return by value
|
||||
assert_equal(xref, ray.get(xref)) # ray.get should be the identity
|
||||
y = np.random.normal(size=[11, 12])
|
||||
|
||||
@@ -8,20 +8,6 @@ import numpy as np
|
||||
def handle_int(a, b):
|
||||
return a + 1, b + 1
|
||||
|
||||
# Test aliasing
|
||||
|
||||
@ray.remote
|
||||
def test_alias_f():
|
||||
return np.ones([3, 4, 5])
|
||||
|
||||
@ray.remote
|
||||
def test_alias_g():
|
||||
return test_alias_f.remote()
|
||||
|
||||
@ray.remote
|
||||
def test_alias_h():
|
||||
return test_alias_g.remote()
|
||||
|
||||
# Test timing
|
||||
|
||||
@ray.remote
|
||||
|
||||
Reference in New Issue
Block a user