diff --git a/doc/aliasing.md b/doc/aliasing.md
deleted file mode 100644
index d2763d02b..000000000
--- a/doc/aliasing.md
+++ /dev/null
@@ -1,95 +0,0 @@
-# Aliasing
-
-An important feature of Ray is that a remote call sent to the scheduler
-immediately returns object ids for the outputs of the task, and the actual
-outputs of the task are only associated with the relevant object ids
-after the task has been executed and the outputs have been computed. This allows
-the worker to continue without blocking.
-
-However, to provide a more flexible API, we allow tasks to not only return
-values, but to also return object ids to values. As an examples, consider
-the following code.
-```python
-@ray.remote
-def f()
-  return np.zeros(5)
-
-@ray.remote
-def g()
-  return f()
-
-@ray.remote
-def h()
-  return g()
-```
-A call to `h` will immediate return an object id `ref_h` for the return
-value of `h`. The task of executing `h` (call it `task_h`) will then be
-scheduled for execution. When `task_h` is executed, it will call `g`, which will
-immediately return an object id `ref_g` for the output of `g`. Then two
-things will happen and can happen in any order: `AliasObjectIDs(ref_h, ref_g)`
-will be called and `task_g` will be scheduled and executed. When `task_g` is
-executed, it will call `f`, and immediately obtain an object id `ref_f`
-for the output of `f`. Then two things will happen and can happen in either
-order, `AliasObjectIDs(ref_g, ref_f)` will be called, and `f` will be executed.
-When `f` is executed, it will create an actual array and `put_object` will be
-called, which will store the array in the object store (it will also call
-`SchedulerService::AddCanonicalObjectID(ref_f)`).
-
-From the scheduler's perspective, there are three important calls,
-`AliasObjectIDs(ref_h, ref_g)`, `AliasObjectIDs(ref_g, ref_f)`, and
-`AddCanonicalObjectID(ref_f)`. These three calls can happen in any order.
-
-The scheduler maintains a data structure called `target_objectids_`, which keeps
-track of which object ids have been aliased together (`target_objectids_`
-is a vector, but we can think of it as a graph). The call
-`AliasObjectIDs(ref_h, ref_g)` updates `target_objectids_` with `ref_h -> ref_g`.
-The call `AliasObjectIDs(ref_g, ref_f)` updates it with `ref_g -> ref_f`, and the
-call `AddCanonicalObjectID(ref_f)` updates it with `ref_f -> ref_f`. The data
-structure is initialized with `ref -> UNINITIALIZED_ALIAS` for each object
-id `ref`.
-
-We refer to `ref_f` as a "canonical" object id. And in a pair such as
-`ref_h -> ref_g`, we refer to `ref_h` as the "alias" object id and to
-`ref_g` as the "target" object id. These details are available to the
-scheduler, but a worker process just has an object id and doesn't know if
-it is canonical or not.
-
-We also maintain a data structure `reverse_target_objectids_`, which maps in the
-reverse direction (in the above example, we would have `ref_g -> ref_h`,
-`ref_f -> ref_g`, and `ref_h -> UNINITIALIZED_ALIAS`). This data structure is
-not particuarly important for the task of aliasing, but when we do id
-counting and attempt to deallocate an object, we need to be able to determine
-all of the object ids that refer to the same object, and this data
-structure comes in handy for that purpose.
-
-## Gets and Remote Calls
-
-When a worker calls `ray.get(ref)`, it first sends a message to the scheduler
-asking the scheduler to ship the object referred to by `ref` to the worker's
-local object store. Then the worker asks its local object store for the object
-referred to by `ref`. If `ref` is a canonical object id, then that's all
-there is too it. However, if `ref` is not a canonical object id but
-rather is an alias for the canonical object id `c_ref`, then the
-scheduler also notifies the worker's local object store that `ref` is an
-alias for `c_ref`. This is important because the object store does not keep
-track of aliasing on its own (it only knows the bits about aliasing that the
-scheduler tells it). Lastly, if the scheduler does not yet have enough
-information to determine if `ref` is canonical, or if the scheduler cannot
-yet determine what the canonical object id for `ref` is, then the
-scheduler will wait until it has the relevant information.
-
-Similar things happen when a worker performs a remote call. If an object
-id is passed to a remote call, the object referred to by that object
-id will be shipped to the local object store of the worker that executes
-the task. The scheduler will notify that object store about any aliasing that it
-needs to be aware of.
-
-## Passing Object ids by Value
-Currently, the graph of aliasing looks like a collection of chains, as in the
-above example with `ref_h -> ref_g -> ref_f -> ref_f`. In the future, we will
-allow object ids to be passed by value to remote calls (so the worker
-has access to the object id object and not the object that the object
-id refers to). If an object id that is passed by value is then
-returned by the task, it is possible that a given object id could be
-the target of multiple alias object ids. In this case, the graph of
-aliasing will be a tree.
diff --git a/doc/reference-counting.md b/doc/reference-counting.md
index 11819dc05..3731f4554 100644
--- a/doc/reference-counting.md
+++ b/doc/reference-counting.md
@@ -50,29 +50,6 @@ these internal object IDs in the `contained_objectids_` data structure).
 3. To handle the third case, we increment in the `serialize_task` method and
 decrement in the `deserialize_task` method.
 
-## How to Handle Aliasing
-Reference counting interacts with aliasing. Since multiple object IDs
-may refer to the same object, we cannot deallocate that object until all of the
-object IDs that refer to it have reference counts of 0. We keep track of
-the number of separate aliases separately. If two object IDs refer to the
-same object, the scheduler keeps track the number of occurrences of each of
-those object IDs separately. This simplifies the scheduler's job because
-it may not always know if two object IDs refer to the same object or not
-(since it assigns them before hearing back about what they refer to).
-
-When we decrement the count for an object ID, if the count reaches 0,
-we compute all object IDs that the scheduler knows to reference the same
-object. If these object IDs all have count 0, then we deallocate the
-object. Otherwise, we do not deallocate the object.
-
-You may ask, what if there is some object ID with a nonzero count which
-refers to the same object, but the scheduler does not know it? This cannot
-happen because the following invariant holds. If `a` and `b` are object
-references that will be aliased together (through a call to
-`AliasObjectIDs(a, b)`), then either the call has already happened, or both `a`
-and `b` have positive reference counts (they must have positive reference counts
-because they must be passed into `AliasObjectIDs` at some point).
-
 ## Complications
 The following problem has not yet been resolved. In the following code, the
 result `x` will be garbage.
diff --git a/lib/python/ray/array/distributed/linalg.py b/lib/python/ray/array/distributed/linalg.py
index 270441999..d569cefe4 100644
--- a/lib/python/ray/array/distributed/linalg.py
+++ b/lib/python/ray/array/distributed/linalg.py
@@ -72,7 +72,7 @@ def tsqr(a):
       q_block_current = ra.dot.remote(q_block_current, ra.subarray.remote(q_tree[ith_index, j], lower, upper))
     q_result.objectids[i] = q_block_current
   r = current_rs[0]
-  return q_result, r
+  return q_result, ray.get(r)
 
 # TODO(rkn): This is unoptimized, we really want a block version of this.
 @ray.remote(num_return_vals=3)
@@ -103,7 +103,7 @@ def modified_lu(q):
   for i in range(b):
     L[i, i] = 1
   U = np.triu(q_work)[:b, :]
-  return numpy_to_dist.remote(ray.put(L)), U, S # TODO(rkn): get rid of put
+  return ray.get(numpy_to_dist.remote(ray.put(L))), U, S # TODO(rkn): get rid of put
 
 @ray.remote(num_return_vals=2)
 def tsqr_hr_helper1(u, s, y_top_block, b):
@@ -125,7 +125,7 @@ def tsqr_hr(a):
   y_blocked = ray.get(y)
   t, y_top = tsqr_hr_helper1.remote(u, s, y_blocked.objectids[0, 0], a.shape[1])
   r = tsqr_hr_helper2.remote(s, r_temp)
-  return y, t, y_top, r
+  return ray.get(y), ray.get(t), ray.get(y_top), ray.get(r)
 
 @ray.remote
 def qr_helper1(a_rc, y_ri, t, W_c):
@@ -183,4 +183,4 @@ def qr(a):
     y_col_block = subblocks.remote(y_res, [], [i])
     q = subtract.remote(q, dot.remote(y_col_block, dot.remote(Ts[i], dot.remote(transpose.remote(y_col_block), q))))
 
-  return q, r_res
+  return ray.get(q), r_res
diff --git a/lib/python/ray/worker.py b/lib/python/ray/worker.py
index 7037bcb6f..ead96319b 100644
--- a/lib/python/ray/worker.py
+++ b/lib/python/ray/worker.py
@@ -1249,9 +1249,6 @@ def store_outputs_in_objstore(objectids, outputs, worker=global_worker):
   """
   for i in range(len(objectids)):
     if isinstance(outputs[i], raylib.ObjectID):
-      # An ObjectID is being returned, so we must alias objectids[i] so that it refers to the same object that outputs[i] refers to
-      _logger().info("Aliasing objectids {} and {}".format(objectids[i].id, outputs[i].id))
-      worker.alias_objectids(objectids[i], outputs[i])
-      pass
-    else:
-      worker.put_object(objectids[i], outputs[i])
+      raise Exception("This remote function returned an ObjectID as its {}th return value. This is not allowed.".format(i))
+  for i in range(len(objectids)):
+    worker.put_object(objectids[i], outputs[i])
diff --git a/test/failure_test.py b/test/failure_test.py
index 87c9d33a3..48017b322 100644
--- a/test/failure_test.py
+++ b/test/failure_test.py
@@ -23,7 +23,6 @@ class TaskStatusTest(unittest.TestCase):
     reload(test_functions)
     ray.init(start_ray_local=True, num_workers=3, driver_mode=ray.SILENT_MODE)
 
-    test_functions.test_alias_f.remote()
     test_functions.throw_exception_fct1.remote()
     test_functions.throw_exception_fct1.remote()
     for _ in range(100): # Retry if we need to wait longer.
diff --git a/test/runtest.py b/test/runtest.py
index 839b0d0c0..7819e6d77 100644
--- a/test/runtest.py
+++ b/test/runtest.py
@@ -182,19 +182,6 @@ class WorkerTest(unittest.TestCase):
 
 class APITest(unittest.TestCase):
 
-  def testObjectIDAliasing(self):
-    reload(test_functions)
-    ray.init(start_ray_local=True, num_workers=3)
-
-    ref = test_functions.test_alias_f.remote()
-    assert_equal(ray.get(ref), np.ones([3, 4, 5]))
-    ref = test_functions.test_alias_g.remote()
-    assert_equal(ray.get(ref), np.ones([3, 4, 5]))
-    ref = test_functions.test_alias_h.remote()
-    assert_equal(ray.get(ref), np.ones([3, 4, 5]))
-
-    ray.worker.cleanup()
-
   def testKeywordArgs(self):
     reload(test_functions)
     ray.init(start_ray_local=True, num_workers=1)
@@ -259,7 +246,7 @@ class APITest(unittest.TestCase):
     ray.worker.cleanup()
 
   def testDefiningRemoteFunctions(self):
-    ray.init(start_ray_local=True, num_workers=2)
+    ray.init(start_ray_local=True, num_workers=3)
 
     # Test that we can define a remote function in the shell.
     @ray.remote
@@ -296,7 +283,7 @@ class APITest(unittest.TestCase):
       return x + 1
     @ray.remote
     def l(x):
-      return k.remote(x)
+      return ray.get(k.remote(x))
     @ray.remote
     def m(x):
       return ray.get(l.remote(x))
@@ -403,24 +390,6 @@ class ReferenceCountingTest(unittest.TestCase):
       reload(module)
     ray.init(start_ray_local=True, num_workers=1)
 
-    x = test_functions.test_alias_f.remote()
-    ray.get(x)
-    time.sleep(0.1)
-    objectid_val = x.id
-    self.assertEqual(ray.scheduler_info()["reference_counts"][objectid_val], 1)
-
-    del x
-    self.assertEqual(ray.scheduler_info()["reference_counts"][objectid_val], -1) # -1 indicates deallocated
-
-    y = test_functions.test_alias_h.remote()
-    ray.get(y)
-    time.sleep(0.1)
-    objectid_val = y.id
-    self.assertEqual(ray.scheduler_info()["reference_counts"][objectid_val:(objectid_val + 3)], [1, 0, 0])
-
-    del y
-    self.assertEqual(ray.scheduler_info()["reference_counts"][objectid_val:(objectid_val + 3)], [-1, -1, -1])
-
     z = da.zeros.remote([da.BLOCK_SIZE, 2 * da.BLOCK_SIZE])
     time.sleep(0.1)
     objectid_val = z.id
@@ -493,7 +462,10 @@ class PythonModeTest(unittest.TestCase):
     reload(test_functions)
     ray.init(start_ray_local=True, driver_mode=ray.PYTHON_MODE)
 
-    xref = test_functions.test_alias_h.remote()
+    @ray.remote
+    def f():
+      return np.ones([3, 4, 5])
+    xref = f.remote()
     assert_equal(xref, np.ones([3, 4, 5])) # remote functions should return by value
     assert_equal(xref, ray.get(xref)) # ray.get should be the identity
     y = np.random.normal(size=[11, 12])
diff --git a/test/test_functions.py b/test/test_functions.py
index d12585730..b8a242ac0 100644
--- a/test/test_functions.py
+++ b/test/test_functions.py
@@ -8,20 +8,6 @@ import numpy as np
 def handle_int(a, b):
   return a + 1, b + 1
 
-# Test aliasing
-
-@ray.remote
-def test_alias_f():
-  return np.ones([3, 4, 5])
-
-@ray.remote
-def test_alias_g():
-  return test_alias_f.remote()
-
-@ray.remote
-def test_alias_h():
-  return test_alias_g.remote()
-
 # Test timing
 
 @ray.remote