diff --git a/doc/aliasing.md b/doc/aliasing.md index e88e928bf..38c5daa3b 100644 --- a/doc/aliasing.md +++ b/doc/aliasing.md @@ -62,9 +62,9 @@ counting and attempt to deallocate an object, we need to be able to determine all of the object references that refer to the same object, and this data structure comes in handy for that purpose. -## Pulls and Remote Calls +## Gets and Remote Calls -When a worker calls `pull(ref)`, it first sends a message to the scheduler +When a worker calls `ray.get(ref)`, it first sends a message to the scheduler asking the scheduler to ship the object referred to by `ref` to the worker's local object store. Then the worker asks its local object store for the object referred to by `ref`. If `ref` is a canonical object reference, then that's all diff --git a/doc/reference-counting.md b/doc/reference-counting.md index dc4cb4b56..9b6919acb 100644 --- a/doc/reference-counting.md +++ b/doc/reference-counting.md @@ -77,13 +77,13 @@ because they must be passed into `AliasObjRefs` at some point). The following problem has not yet been resolved. In the following code, the result `x` will be garbage. ```python -x = ray.pull(ra.zeros([10, 10], "float")) +x = ray.get(ra.zeros([10, 10], "float")) ``` When `ra.zeros` is called, a worker will create an array of zeros and store it in an object store. An object reference to the output is returned. The call -to `ray.pull` will not copy data from the object store process to the worker +to `ray.get` will not copy data from the object store process to the worker process, but will instead give the worker process a pointer to shared memory. -After the `ray.pull` call completes, the object reference returned by +After the `ray.get` call completes, the object reference returned by `ra.zeros` will go out of scope, and the object it refers to will be deallocated from the object store. This will cause the memory that `x` points to to be garbage. diff --git a/examples/imagenet/driver.py b/examples/imagenet/driver.py index ee54ee221..44e6ab846 100644 --- a/examples/imagenet/driver.py +++ b/examples/imagenet/driver.py @@ -26,7 +26,7 @@ if __name__ == "__main__": x = imagenet.load_tarfiles_from_s3(args.s3_bucket, map(str, images), [256, 256]) # TODO(pcm): implement unicode serialization mean_image = functions.compute_mean_image(x) - mean_image = ray.pull(mean_image) + mean_image = ray.get(mean_image) print "The mean image is:" print mean_image diff --git a/examples/imagenet/functions.py b/examples/imagenet/functions.py index 5efe3da7e..ce26220f9 100644 --- a/examples/imagenet/functions.py +++ b/examples/imagenet/functions.py @@ -6,13 +6,13 @@ import ray.arrays.remote as ra @ray.remote([List[ray.ObjRef]], [int]) def num_images(batches): shape_refs = [ra.shape(batch) for batch in batches] - return sum([ray.pull(shape_ref)[0] for shape_ref in shape_refs]) + return sum([ray.get(shape_ref)[0] for shape_ref in shape_refs]) @ray.remote([List[ray.ObjRef]], [np.ndarray]) def compute_mean_image(batches): if len(batches) == 0: raise Exception("No images were passed into `compute_mean_image`.") sum_image_refs = [ra.sum(batch, axis=0) for batch in batches] - sum_images = [ray.pull(ref) for ref in sum_image_refs] + sum_images = [ray.get(ref) for ref in sum_image_refs] n_images = num_images(batches) - return np.sum(sum_images, axis=0).astype("float64") / ray.pull(n_images) + return np.sum(sum_images, axis=0).astype("float64") / ray.get(n_images) diff --git a/examples/lbfgs/driver.py b/examples/lbfgs/driver.py index b5b719012..f54314e6a 100644 --- a/examples/lbfgs/driver.py +++ b/examples/lbfgs/driver.py @@ -22,8 +22,8 @@ if __name__ == "__main__": worker_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), "worker.py") services.start_singlenode_cluster(return_drivers=False, num_workers_per_objstore=16, worker_path=worker_path) - x_batches = [ray.push(batches[i][0]) for i in range(num_batches)] - y_batches = [ray.push(batches[i][1]) for i in range(num_batches)] + x_batches = [ray.put(batches[i][0]) for i in range(num_batches)] + y_batches = [ray.put(batches[i][1]) for i in range(num_batches)] # From the perspective of scipy.optimize.fmin_l_bfgs_b, full_loss is simply a # function which takes some parameters theta, and computes a loss. Similarly, @@ -35,14 +35,14 @@ if __name__ == "__main__": # from scipy.optimize.fmin_l_bfgs_b, which simply uses it to run the L-BFGS # algorithm. def full_loss(theta): - theta_ref = ray.push(theta) + theta_ref = ray.put(theta) val_ref = ra.sum_list(*[functions.loss(theta_ref, x_batches[i], y_batches[i]) for i in range(num_batches)]) - return ray.pull(val_ref) + return ray.get(val_ref) def full_grad(theta): - theta_ref = ray.push(theta) + theta_ref = ray.put(theta) grad_ref = ra.sum_list(*[functions.grad(theta_ref, x_batches[i], y_batches[i]) for i in range(num_batches)]) - return ray.pull(grad_ref).astype("float64") # This conversion is necessary for use with fmin_l_bfgs_b. + return ray.get(grad_ref).astype("float64") # This conversion is necessary for use with fmin_l_bfgs_b. theta_init = np.zeros(functions.dim) diff --git a/examples/rl_pong/driver.py b/examples/rl_pong/driver.py index a9328c05a..015fc0fd7 100644 --- a/examples/rl_pong/driver.py +++ b/examples/rl_pong/driver.py @@ -34,12 +34,12 @@ grad_buffer = {k: np.zeros_like(v) for k, v in model.iteritems()} # update buffe rmsprop_cache = {k: np.zeros_like(v) for k, v in model.iteritems()} # rmsprop memory while True: - modelref = ray.push(model) + modelref = ray.put(model) grads = [] for i in range(batch_size): grads.append(functions.compgrad(modelref)) for i in range(batch_size): - grad = ray.pull(grads[i]) + grad = ray.get(grads[i]) for k in model: grad_buffer[k] += grad[0][k] # accumulate grad over batch running_reward = grad[1] if running_reward is None else running_reward * 0.99 + grad[1] * 0.01 print "Batch {}. episode reward total was {}. running mean: {}".format(batch_num, grad[1], running_reward) diff --git a/lib/python/ray/__init__.py b/lib/python/ray/__init__.py index 72a510e97..ec5faea2e 100644 --- a/lib/python/ray/__init__.py +++ b/lib/python/ray/__init__.py @@ -1,4 +1,4 @@ import libraylib as lib import serialization -from worker import scheduler_info, task_info, register_module, connect, disconnect, pull, push, remote +from worker import scheduler_info, task_info, register_module, connect, disconnect, get, put, remote from libraylib import ObjRef diff --git a/lib/python/ray/arrays/distributed/core.py b/lib/python/ray/arrays/distributed/core.py index bd8da610c..29ce03028 100644 --- a/lib/python/ray/arrays/distributed/core.py +++ b/lib/python/ray/arrays/distributed/core.py @@ -55,13 +55,13 @@ class DistArray(object): def assemble(self): """Assemble an array on this node from a distributed array object reference.""" - first_block = ray.pull(self.objrefs[(0,) * self.ndim]) + first_block = ray.get(self.objrefs[(0,) * self.ndim]) dtype = first_block.dtype result = np.zeros(self.shape, dtype=dtype) for index in np.ndindex(*self.num_blocks): lower = DistArray.compute_block_lower(index, self.shape) upper = DistArray.compute_block_upper(index, self.shape) - result[[slice(l, u) for (l, u) in zip(lower, upper)]] = ray.pull(self.objrefs[index]) + result[[slice(l, u) for (l, u) in zip(lower, upper)]] = ray.get(self.objrefs[index]) return result def __getitem__(self, sliced): @@ -80,7 +80,7 @@ def numpy_to_dist(a): for index in np.ndindex(*result.num_blocks): lower = DistArray.compute_block_lower(index, a.shape) upper = DistArray.compute_block_upper(index, a.shape) - result.objrefs[index] = ray.push(a[[slice(l, u) for (l, u) in zip(lower, upper)]]) + result.objrefs[index] = ray.put(a[[slice(l, u) for (l, u) in zip(lower, upper)]]) return result @ray.remote([List[int], str], [DistArray]) diff --git a/lib/python/ray/arrays/distributed/linalg.py b/lib/python/ray/arrays/distributed/linalg.py index a3d509416..2f082b2c0 100644 --- a/lib/python/ray/arrays/distributed/linalg.py +++ b/lib/python/ray/arrays/distributed/linalg.py @@ -17,10 +17,10 @@ def tsqr(a): a.shape == (M, N) K == min(M, N) return values: - q: DistArray, if q_full = ray.context.pull(DistArray, q).assemble(), then + q: DistArray, if q_full = ray.get(DistArray, q).assemble(), then q_full.shape == (M, K) np.allclose(np.dot(q_full.T, q_full), np.eye(K)) == True - r: np.ndarray, if r_val = ray.context.pull(np.ndarray, r), then + r: np.ndarray, if r_val = ray.get(np.ndarray, r), then r_val.shape == (K, N) np.allclose(r, np.triu(r)) == True """ @@ -108,7 +108,7 @@ def modified_lu(q): for i in range(b): L[i, i] = 1 U = np.triu(q_work)[:b, :] - return numpy_to_dist(ray.push(L)), U, S # TODO(rkn): get rid of push and pull + return numpy_to_dist(ray.put(L)), U, S # TODO(rkn): get rid of put @ray.remote([np.ndarray, np.ndarray, np.ndarray, int], [np.ndarray, np.ndarray]) def tsqr_hr_helper1(u, s, y_top_block, b): @@ -127,7 +127,7 @@ def tsqr_hr(a): """Algorithm 6 from http://www.eecs.berkeley.edu/Pubs/TechRpts/2013/EECS-2013-175.pdf""" q, r_temp = tsqr(a) y, u, s = modified_lu(q) - y_blocked = ray.pull(y) + y_blocked = ray.get(y) t, y_top = tsqr_hr_helper1(u, s, y_blocked.objrefs[0, 0], a.shape[1]) r = tsqr_hr_helper2(s, r_temp) return y, t, y_top, r @@ -150,21 +150,21 @@ def qr(a): a_work = DistArray() a_work.construct(a.shape, np.copy(a.objrefs)) - result_dtype = np.linalg.qr(ray.pull(a.objrefs[0, 0]))[0].dtype.name - r_res = ray.pull(zeros([k, n], result_dtype)) # TODO(rkn): It would be preferable not to pull this right after creating it. - y_res = ray.pull(zeros([m, k], result_dtype)) # TODO(rkn): It would be preferable not to pull this right after creating it. + result_dtype = np.linalg.qr(ray.get(a.objrefs[0, 0]))[0].dtype.name + r_res = ray.get(zeros([k, n], result_dtype)) # TODO(rkn): It would be preferable not to get this right after creating it. + y_res = ray.get(zeros([m, k], result_dtype)) # TODO(rkn): It would be preferable not to get this right after creating it. Ts = [] for i in range(min(a.num_blocks[0], a.num_blocks[1])): # this differs from the paper, which says "for i in range(a.num_blocks[1])", but that doesn't seem to make any sense when a.num_blocks[1] > a.num_blocks[0] sub_dist_array = subblocks(a_work, range(i, a_work.num_blocks[0]), [i]) y, t, _, R = tsqr_hr(sub_dist_array) - y_val = ray.pull(y) + y_val = ray.get(y) for j in range(i, a.num_blocks[0]): y_res.objrefs[j, i] = y_val.objrefs[j - i, 0] if a.shape[0] > a.shape[1]: # in this case, R needs to be square - R_shape = ray.pull(ra.shape(R)) + R_shape = ray.get(ra.shape(R)) eye_temp = ra.eye(R_shape[1], R_shape[0], dtype_name=result_dtype) r_res.objrefs[i, i] = ra.dot(eye_temp, R) else: diff --git a/lib/python/ray/worker.py b/lib/python/ray/worker.py index 3ecd68986..976121455 100644 --- a/lib/python/ray/worker.py +++ b/lib/python/ray/worker.py @@ -72,7 +72,7 @@ class Worker(object): elif result == None: return None # can't subclass None and don't need to because there is a global None # TODO(pcm): close the associated memory segment; if we don't, this leaks memory (but very little, so it is ok for now) - result.ray_objref = objref # TODO(pcm): This could be done only for the "pull" case in the future if we want to increase performance + result.ray_objref = objref # TODO(pcm): This could be done only for the "get" case in the future if we want to increase performance result.ray_deallocator = RayDealloc(self.handle, segmentid) return result @@ -141,13 +141,13 @@ def connect(scheduler_addr, objstore_addr, worker_addr, worker=global_worker, pr def disconnect(worker=global_worker): ray.lib.disconnect(worker.handle) -def pull(objref, worker=global_worker): +def get(objref, worker=global_worker): ray.lib.request_object(worker.handle, objref) if worker.print_task_info: print_task_info(ray.lib.task_info(worker.handle)) return worker.get_object(objref) -def push(value, worker=global_worker): +def put(value, worker=global_worker): objref = ray.lib.get_objref(worker.handle) worker.put_object(objref, value) if worker.print_task_info: diff --git a/protos/ray.proto b/protos/ray.proto index a9ab18230..32f0f2097 100644 --- a/protos/ray.proto +++ b/protos/ray.proto @@ -29,8 +29,8 @@ service Scheduler { rpc IncrementCount(ChangeCountRequest) returns (AckReply); // Decrement the count of the object reference rpc DecrementCount(ChangeCountRequest) returns (AckReply); - // Request an object reference for an object that will be pushed to an object store - rpc PushObj(PushObjRequest) returns (PushObjReply); + // Request an object reference for an object that will be put in an object store + rpc PutObj(PutObjRequest) returns (PutObjReply); // Request delivery of an object from an object store that holds the object to the local object store rpc RequestObj(RequestObjRequest) returns (AckReply); // Used by the worker to tell the scheduler that two objrefs should refer to the same object @@ -93,11 +93,11 @@ message RequestObjRequest { uint64 objref = 2; // Object reference of the object being requested } -message PushObjRequest { - uint64 workerid = 1; // Worker that tries to push an object +message PutObjRequest { + uint64 workerid = 1; // Worker that tries to put an object } -message PushObjReply { +message PutObjReply { uint64 objref = 1; // Object reference assigned by the scheduler to the object } @@ -158,7 +158,7 @@ message SchedulerInfoReply { // Object stores service ObjStore { - // Tell the object store to begin pulling an object from another object store (called by the scheduler) + // Tell the object store to begin getting an object from another object store (called by the scheduler) rpc StartDelivery(StartDeliveryRequest) returns (AckReply); // Accept incoming data from another object store, as a stream of object chunks rpc StreamObjTo(StreamObjToRequest) returns (stream ObjChunk); @@ -171,7 +171,7 @@ service ObjStore { } message StartDeliveryRequest { - string objstore_address = 1; // Object store to pull the object from + string objstore_address = 1; // Object store to get the object from uint64 objref = 2; // Reference of object that gets delivered } diff --git a/protos/types.proto b/protos/types.proto index 85d40f859..2491a28e3 100644 --- a/protos/types.proto +++ b/protos/types.proto @@ -71,18 +71,18 @@ message Task { repeated uint64 result = 3; // Object references for result } -message Push { - uint64 objref = 1; // The objref for the pushed object +message Put { + uint64 objref = 1; // The objref for the object that was put } // This is used internally by the scheduler. From the scheduler's perspective, -// the submission of tasks (via SubmitTask) and the submission of pushes (via -// PushObj) look very similar, and so it is useful to be able to handle them +// the submission of tasks (via SubmitTask) and the submission of puts (via +// PutObj) look very similar, and so it is useful to be able to handle them // together (for example in the computation graph). message Operation { Task task = 1; - Push push = 2; - uint64 creator_operationid = 3; // The id of the task that called this task or push. + Put put = 2; + uint64 creator_operationid = 3; // The id of the task that called this task or put. } message TaskStatus { diff --git a/src/computation_graph.cc b/src/computation_graph.cc index ce13bd6bc..f2db39a29 100644 --- a/src/computation_graph.cc +++ b/src/computation_graph.cc @@ -16,6 +16,6 @@ const Task& ComputationGraph::get_task(OperationId operationid) { RAY_CHECK_NEQ(operationid, ROOT_OPERATION, "ComputationGraph attempting to get_task with operationid == ROOT_OPERATION"); RAY_CHECK_NEQ(operationid, NO_OPERATION, "ComputationGraph attempting to get_task with operationid == NO_OPERATION"); RAY_CHECK_LT(operationid, operations_.size(), "ComputationGraph attempting to get_task with operationid " << operationid << ", but operationid >= operations_.size()."); - RAY_CHECK(operations_[operationid]->has_task(), "Calling get_task with operationid " << operationid << ", but this corresponds to a push not a task."); + RAY_CHECK(operations_[operationid]->has_task(), "Calling get_task with operationid " << operationid << ", but this corresponds to a put not a task."); return operations_[operationid]->task(); } diff --git a/src/computation_graph.h b/src/computation_graph.h index 25c83534e..98ead96f3 100644 --- a/src/computation_graph.h +++ b/src/computation_graph.h @@ -19,10 +19,10 @@ public: // the new operation. This method takes ownership over operation. OperationId add_operation(std::unique_ptr operation); // Return the task corresponding to a particular OperationId. If operationid - // corresponds to a push, then fail. + // corresponds to a put, then fail. const Task& get_task(OperationId operationid); private: - // maps an OperationId to the corresponding task or push + // maps an OperationId to the corresponding task or put std::vector > operations_; // spawned_operations_[operationid] is a vector of the OperationIds of the // operations spawned by the task with OperationId operationid diff --git a/src/objstore.cc b/src/objstore.cc index 0cb4072e6..242488e88 100644 --- a/src/objstore.cc +++ b/src/objstore.cc @@ -7,8 +7,8 @@ const size_t ObjStoreService::CHUNK_SIZE = 8 * 1024; // this method needs to be protected by a objstore_lock_ // TODO(rkn): Make sure that we do not in fact need the objstore_lock_. We want multiple deliveries to be able to happen simultaneously. -void ObjStoreService::pull_data_from(ObjRef objref, ObjStore::Stub& stub) { - RAY_LOG(RAY_DEBUG, "Objstore " << objstoreid_ << " is beginning to pull objref " << objref); +void ObjStoreService::get_data_from(ObjRef objref, ObjStore::Stub& stub) { + RAY_LOG(RAY_DEBUG, "Objstore " << objstoreid_ << " is beginning to get objref " << objref); ObjChunk chunk; ClientContext context; StreamObjToRequest stream_request; @@ -76,7 +76,7 @@ Status ObjStoreService::StartDelivery(ServerContext* context, const StartDeliver } else { RAY_CHECK_NEQ(memory_[objref].second, MemoryStatusType::DEALLOCATED, "Objstore " << objstoreid_ << " is attempting to get objref " << objref << ", but memory_[objref] == DEALLOCATED."); - RAY_LOG(RAY_DEBUG, "Objstore " << objstoreid_ << " already has objref " << objref << " or it is already being shipped, so no need to pull it again."); + RAY_LOG(RAY_DEBUG, "Objstore " << objstoreid_ << " already has objref " << objref << " or it is already being shipped, so no need to get it again."); return Status::OK; } memory_[objref].second = MemoryStatusType::PRE_ALLOCED; @@ -84,7 +84,7 @@ Status ObjStoreService::StartDelivery(ServerContext* context, const StartDeliver delivery_threads_.push_back(std::make_shared([this, address, objref]() { std::lock_guard objstores_lock(objstores_lock_); ObjStore::Stub& stub = get_objstore_stub(address); - pull_data_from(objref, stub); + get_data_from(objref, stub); })); return Status::OK; } @@ -173,14 +173,14 @@ Status ObjStoreService::DeallocateObject(ServerContext* context, const Deallocat // -------------+-------------+------------------+---------------------------- // NOT_PRESENT | ALLOC | NOT_READY | allocate object // NOT_READY | WORKER_DONE | READY | send ObjReady to scheduler -// NOT_READY | GET | NOT_READY | add to pull queue +// NOT_READY | GET | NOT_READY | add to get queue // READY | GET | READY | return handle // READY | DEALLOC | DEALLOCATED | deallocate // -------------+-------------+------------------+---------------------------- void ObjStoreService::process_objstore_request(const ObjRequest request) { switch (request.type) { case ObjRequestType::ALIAS_DONE: { - process_pulls_for_objref(request.objref); + process_gets_for_objref(request.objref); } break; default: { @@ -216,8 +216,8 @@ void ObjStoreService::process_worker_request(const ObjRequest request) { RAY_LOG(RAY_DEBUG, "Responding to GET request: returning objref " << request.objref); send_queues_[request.workerid].send(&item.first); } else if (item.second == MemoryStatusType::NOT_READY || item.second == MemoryStatusType::NOT_PRESENT || item.second == MemoryStatusType::PRE_ALLOCED) { - std::lock_guard lock(pull_queue_lock_); - pull_queue_.push_back(std::make_pair(request.workerid, request.objref)); + std::lock_guard lock(get_queue_lock_); + get_queue_.push_back(std::make_pair(request.workerid, request.objref)); } else { RAY_CHECK(false, "A worker requested objref " << request.objref << ", but memory_[objref].second = " << memory_[request.objref].second); } @@ -265,16 +265,16 @@ void ObjStoreService::process_requests() { } } -void ObjStoreService::process_pulls_for_objref(ObjRef objref) { +void ObjStoreService::process_gets_for_objref(ObjRef objref) { std::pair& item = memory_[objref]; - std::lock_guard pull_queue_lock(pull_queue_lock_); - for (size_t i = 0; i < pull_queue_.size(); ++i) { - if (pull_queue_[i].second == objref) { + std::lock_guard get_queue_lock(get_queue_lock_); + for (size_t i = 0; i < get_queue_.size(); ++i) { + if (get_queue_[i].second == objref) { ObjHandle& elem = memory_[objref].first; - send_queues_[pull_queue_[i].first].send(&item.first); - // Remove the pull task from the queue - std::swap(pull_queue_[i], pull_queue_[pull_queue_.size() - 1]); - pull_queue_.pop_back(); + send_queues_[get_queue_[i].first].send(&item.first); + // Remove the get task from the queue + std::swap(get_queue_[i], get_queue_[get_queue_.size() - 1]); + get_queue_.pop_back(); i -= 1; } } @@ -300,7 +300,7 @@ void ObjStoreService::object_ready(ObjRef objref, size_t metadata_offset) { item.first.set_metadata_offset(metadata_offset); item.second = MemoryStatusType::READY; } - process_pulls_for_objref(objref); + process_gets_for_objref(objref); // Tell the scheduler that the object arrived // TODO(pcm): put this in a separate thread so we don't have to pay the latency here ClientContext objready_context; diff --git a/src/objstore.h b/src/objstore.h index bb7ce4a47..f0088781b 100644 --- a/src/objstore.h +++ b/src/objstore.h @@ -46,13 +46,13 @@ public: Status ObjStoreInfo(ServerContext* context, const ObjStoreInfoRequest* request, ObjStoreInfoReply* reply) override; void start_objstore_service(); private: - void pull_data_from(ObjRef objref, ObjStore::Stub& stub); + void get_data_from(ObjRef objref, ObjStore::Stub& stub); // check if we already connected to the other objstore, if yes, return reference to connection, otherwise connect ObjStore::Stub& get_objstore_stub(const std::string& objstore_address); void process_worker_request(const ObjRequest request); void process_objstore_request(const ObjRequest request); void process_requests(); - void process_pulls_for_objref(ObjRef objref); + void process_gets_for_objref(ObjRef objref); ObjHandle alloc(ObjRef objref, size_t size); void object_ready(ObjRef objref, size_t metadata_offset); @@ -66,8 +66,8 @@ private: std::unordered_map> objstores_; std::mutex objstores_lock_; std::unique_ptr scheduler_stub_; - std::vector > pull_queue_; - std::mutex pull_queue_lock_; + std::vector > get_queue_; + std::mutex get_queue_lock_; MessageQueue recv_queue_; // This queue is used by workers to send tasks to the object store. std::vector > send_queues_; // This maps workerid -> queue. The object store uses these queues to send replies to the relevant workers. std::thread communicator_thread_; diff --git a/src/scheduler.cc b/src/scheduler.cc index 9f93f7ce1..e1d4e02c8 100644 --- a/src/scheduler.cc +++ b/src/scheduler.cc @@ -57,7 +57,7 @@ Status SchedulerService::SubmitTask(ServerContext* context, const SubmitTaskRequ return Status::OK; } -Status SchedulerService::PushObj(ServerContext* context, const PushObjRequest* request, PushObjReply* reply) { +Status SchedulerService::PutObj(ServerContext* context, const PutObjRequest* request, PutObjReply* reply) { ObjRef objref = register_new_object(); ObjStoreId objstoreid = get_store(request->workerid()); reply->set_objref(objref); @@ -74,8 +74,8 @@ Status SchedulerService::RequestObj(ServerContext* context, const RequestObjRequ ObjRef objref = request->objref(); RAY_CHECK_LT(objref, size, "internal error: no object with objref " << objref << " exists"); { - std::lock_guard pull_queue_lock(pull_queue_lock_); - pull_queue_.push_back(std::make_pair(request->workerid(), objref)); + std::lock_guard get_queue_lock(get_queue_lock_); + get_queue_.push_back(std::make_pair(request->workerid(), objref)); } schedule(); return Status::OK; @@ -313,7 +313,7 @@ void SchedulerService::deliver_object(ObjRef canonical_objref, ObjStoreId from, void SchedulerService::schedule() { // TODO(rkn): Do this more intelligently. - perform_pulls(); // See what we can do in pull_queue_ + perform_gets(); // See what we can do in get_queue_ if (scheduling_algorithm_ == SCHEDULING_ALGORITHM_NAIVE) { schedule_tasks_naively(); // See what we can do in task_queue_ } else if (scheduling_algorithm_ == SCHEDULING_ALGORITHM_LOCALITY_AWARE) { @@ -513,20 +513,20 @@ bool SchedulerService::is_canonical(ObjRef objref) { return objref == target_objrefs_[objref]; } -void SchedulerService::perform_pulls() { - std::lock_guard pull_queue_lock(pull_queue_lock_); - // Complete all pull tasks that can be completed. - for (int i = 0; i < pull_queue_.size(); ++i) { - const std::pair& pull = pull_queue_[i]; - ObjRef objref = pull.second; - WorkerId workerid = pull.first; +void SchedulerService::perform_gets() { + std::lock_guard get_queue_lock(get_queue_lock_); + // Complete all get tasks that can be completed. + for (int i = 0; i < get_queue_.size(); ++i) { + const std::pair& get = get_queue_[i]; + ObjRef objref = get.second; + WorkerId workerid = get.first; ObjStoreId objstoreid = get_store(workerid); if (!has_canonical_objref(objref)) { RAY_LOG(RAY_ALIAS, "objref " << objref << " does not have a canonical_objref, so continuing"); continue; } ObjRef canonical_objref = get_canonical_objref(objref); - RAY_LOG(RAY_DEBUG, "attempting to pull objref " << pull.second << " with canonical objref " << canonical_objref << " to objstore " << get_store(workerid)); + RAY_LOG(RAY_DEBUG, "attempting to get objref " << get.second << " with canonical objref " << canonical_objref << " to objstore " << get_store(workerid)); int num_stores; { std::lock_guard objects_lock(objects_lock_); @@ -539,9 +539,9 @@ void SchedulerService::perform_pulls() { std::lock_guard alias_notification_queue_lock(alias_notification_queue_lock_); alias_notification_queue_.push_back(std::make_pair(get_store(workerid), std::make_pair(objref, canonical_objref))); } - // Remove the pull task from the queue - std::swap(pull_queue_[i], pull_queue_[pull_queue_.size() - 1]); - pull_queue_.pop_back(); + // Remove the get task from the queue + std::swap(get_queue_[i], get_queue_[get_queue_.size() - 1]); + get_queue_.pop_back(); i -= 1; } } @@ -782,7 +782,7 @@ void SchedulerService::do_on_locks(bool lock) { std::mutex *mutexes[] = { &successful_tasks_lock_, &failed_tasks_lock_, - &pull_queue_lock_, + &get_queue_lock_, &computation_graph_lock_, &fntable_lock_, &avail_workers_lock_, diff --git a/src/scheduler.h b/src/scheduler.h index f04f86ee7..ca5ef16fc 100644 --- a/src/scheduler.h +++ b/src/scheduler.h @@ -55,7 +55,7 @@ public: SchedulerService(SchedulingAlgorithmType scheduling_algorithm); Status SubmitTask(ServerContext* context, const SubmitTaskRequest* request, SubmitTaskReply* reply) override; - Status PushObj(ServerContext* context, const PushObjRequest* request, PushObjReply* reply) override; + Status PutObj(ServerContext* context, const PutObjRequest* request, PutObjReply* reply) override; Status RequestObj(ServerContext* context, const RequestObjRequest* request, AckReply* reply) override; Status AliasObjRefs(ServerContext* context, const AliasObjRefsRequest* request, AckReply* reply) override; Status RegisterObjStore(ServerContext* context, const RegisterObjStoreRequest* request, RegisterObjStoreReply* reply) override; @@ -101,7 +101,7 @@ private: // checks if objref is a canonical objref bool is_canonical(ObjRef objref); - void perform_pulls(); + void perform_gets(); // schedule tasks using the naive algorithm void schedule_tasks_naively(); // schedule tasks using a scheduling algorithm that takes into account data locality @@ -174,9 +174,9 @@ private: // List of pending tasks. std::deque task_queue_; std::mutex task_queue_lock_; - // List of pending pull calls. - std::vector > pull_queue_; - std::mutex pull_queue_lock_; + // List of pending get calls. + std::vector > get_queue_; + std::mutex get_queue_lock_; // List of failed tasks std::vector failed_tasks_; std::mutex failed_tasks_lock_; diff --git a/src/worker.cc b/src/worker.cc index 861c0b1d7..58eb31506 100644 --- a/src/worker.cc +++ b/src/worker.cc @@ -73,11 +73,11 @@ void Worker::request_object(ObjRef objref) { ObjRef Worker::get_objref() { // first get objref for the new object RAY_CHECK(connected_, "Attempted to perform get_objref but failed."); - PushObjRequest push_request; - PushObjReply push_reply; - ClientContext push_context; - Status push_status = scheduler_stub_->PushObj(&push_context, push_request, &push_reply); - return push_reply.objref(); + PutObjRequest request; + PutObjReply reply; + ClientContext context; + Status status = scheduler_stub_->PutObj(&context, request, &reply); + return reply.objref(); } slice Worker::get_object(ObjRef objref) { diff --git a/test/arrays_test.py b/test/arrays_test.py index 106d72337..12b1cf537 100644 --- a/test/arrays_test.py +++ b/test/arrays_test.py @@ -19,27 +19,27 @@ class ArraysSingleTest(unittest.TestCase): # test eye ref = ra.eye(3) - val = ray.pull(ref) + val = ray.get(ref) self.assertTrue(np.alltrue(val == np.eye(3))) # test zeros ref = ra.zeros([3, 4, 5]) - val = ray.pull(ref) + val = ray.get(ref) self.assertTrue(np.alltrue(val == np.zeros([3, 4, 5]))) # test qr - pass by value val_a = np.random.normal(size=[10, 13]) ref_q, ref_r = ra.linalg.qr(val_a) - val_q = ray.pull(ref_q) - val_r = ray.pull(ref_r) + val_q = ray.get(ref_q) + val_r = ray.get(ref_r) self.assertTrue(np.allclose(np.dot(val_q, val_r), val_a)) # test qr - pass by objref a = ra.random.normal([10, 13]) ref_q, ref_r = ra.linalg.qr(a) - val_a = ray.pull(a) - val_q = ray.pull(ref_q) - val_r = ray.pull(ref_r) + val_a = ray.get(a) + val_q = ray.get(ref_q) + val_r = ray.get(ref_r) self.assertTrue(np.allclose(np.dot(val_q, val_r), val_a)) services.cleanup() @@ -50,7 +50,7 @@ class ArraysDistTest(unittest.TestCase): [w] = services.start_singlenode_cluster(return_drivers=True) x = da.DistArray() - x.construct([2, 3, 4], np.array([[[ray.push(0, w)]]])) + x.construct([2, 3, 4], np.array([[[ray.put(0, w)]]])) capsule, _ = serialization.serialize(w.handle, x) # TODO(rkn): THIS REQUIRES A WORKER_HANDLE y = serialization.deserialize(w.handle, capsule) # TODO(rkn): THIS REQUIRES A WORKER_HANDLE self.assertEqual(x.shape, y.shape) @@ -75,25 +75,25 @@ class ArraysDistTest(unittest.TestCase): services.start_singlenode_cluster(return_drivers=False, num_objstores=2, num_workers_per_objstore=5, worker_path=worker_path) x = da.zeros([9, 25, 51], "float") - self.assertTrue(np.alltrue(ray.pull(da.assemble(x)) == np.zeros([9, 25, 51]))) + self.assertTrue(np.alltrue(ray.get(da.assemble(x)) == np.zeros([9, 25, 51]))) x = da.ones([11, 25, 49], dtype_name="float") - self.assertTrue(np.alltrue(ray.pull(da.assemble(x)) == np.ones([11, 25, 49]))) + self.assertTrue(np.alltrue(ray.get(da.assemble(x)) == np.ones([11, 25, 49]))) x = da.random.normal([11, 25, 49]) y = da.copy(x) - self.assertTrue(np.alltrue(ray.pull(da.assemble(x)) == ray.pull(da.assemble(y)))) + self.assertTrue(np.alltrue(ray.get(da.assemble(x)) == ray.get(da.assemble(y)))) x = da.eye(25, dtype_name="float") - self.assertTrue(np.alltrue(ray.pull(da.assemble(x)) == np.eye(25))) + self.assertTrue(np.alltrue(ray.get(da.assemble(x)) == np.eye(25))) x = da.random.normal([25, 49]) y = da.triu(x) - self.assertTrue(np.alltrue(ray.pull(da.assemble(y)) == np.triu(ray.pull(da.assemble(x))))) + self.assertTrue(np.alltrue(ray.get(da.assemble(y)) == np.triu(ray.get(da.assemble(x))))) x = da.random.normal([25, 49]) y = da.tril(x) - self.assertTrue(np.alltrue(ray.pull(da.assemble(y)) == np.tril(ray.pull(da.assemble(x))))) + self.assertTrue(np.alltrue(ray.get(da.assemble(y)) == np.tril(ray.get(da.assemble(x))))) x = da.random.normal([25, 49]) y = da.random.normal([49, 18]) @@ -101,42 +101,42 @@ class ArraysDistTest(unittest.TestCase): w = da.assemble(z) u = da.assemble(x) v = da.assemble(y) - np.allclose(ray.pull(w), np.dot(ray.pull(u), ray.pull(v))) - self.assertTrue(np.allclose(ray.pull(w), np.dot(ray.pull(u), ray.pull(v)))) + np.allclose(ray.get(w), np.dot(ray.get(u), ray.get(v))) + self.assertTrue(np.allclose(ray.get(w), np.dot(ray.get(u), ray.get(v)))) # test add x = da.random.normal([23, 42]) y = da.random.normal([23, 42]) z = da.add(x, y) - self.assertTrue(np.allclose(ray.pull(da.assemble(z)), ray.pull(da.assemble(x)) + ray.pull(da.assemble(y)))) + self.assertTrue(np.allclose(ray.get(da.assemble(z)), ray.get(da.assemble(x)) + ray.get(da.assemble(y)))) # test subtract x = da.random.normal([33, 40]) y = da.random.normal([33, 40]) z = da.subtract(x, y) - self.assertTrue(np.allclose(ray.pull(da.assemble(z)), ray.pull(da.assemble(x)) - ray.pull(da.assemble(y)))) + self.assertTrue(np.allclose(ray.get(da.assemble(z)), ray.get(da.assemble(x)) - ray.get(da.assemble(y)))) # test transpose x = da.random.normal([234, 432]) y = da.transpose(x) - self.assertTrue(np.alltrue(ray.pull(da.assemble(x)).T == ray.pull(da.assemble(y)))) + self.assertTrue(np.alltrue(ray.get(da.assemble(x)).T == ray.get(da.assemble(y)))) # test numpy_to_dist x = da.random.normal([23, 45]) y = da.assemble(x) z = da.numpy_to_dist(y) w = da.assemble(z) - self.assertTrue(np.alltrue(ray.pull(da.assemble(x)) == ray.pull(da.assemble(z)))) - self.assertTrue(np.alltrue(ray.pull(y) == ray.pull(w))) + self.assertTrue(np.alltrue(ray.get(da.assemble(x)) == ray.get(da.assemble(z)))) + self.assertTrue(np.alltrue(ray.get(y) == ray.get(w))) # test da.tsqr for shape in [[123, da.BLOCK_SIZE], [7, da.BLOCK_SIZE], [da.BLOCK_SIZE, da.BLOCK_SIZE], [da.BLOCK_SIZE, 7], [10 * da.BLOCK_SIZE, da.BLOCK_SIZE]]: x = da.random.normal(shape) K = min(shape) q, r = da.linalg.tsqr(x) - x_val = ray.pull(da.assemble(x)) - q_val = ray.pull(da.assemble(q)) - r_val = ray.pull(r) + x_val = ray.get(da.assemble(x)) + q_val = ray.get(da.assemble(q)) + r_val = ray.get(r) self.assertTrue(r_val.shape == (K, shape[1])) self.assertTrue(np.alltrue(r_val == np.triu(r_val))) self.assertTrue(np.allclose(x_val, np.dot(q_val, r_val))) @@ -150,11 +150,11 @@ class ArraysDistTest(unittest.TestCase): m = ra.random.normal([d1, d2]) q, r = ra.linalg.qr(m) l, u, s = da.linalg.modified_lu(da.numpy_to_dist(q)) - q_val = ray.pull(q) - r_val = ray.pull(r) - l_val = ray.pull(da.assemble(l)) - u_val = ray.pull(u) - s_val = ray.pull(s) + q_val = ray.get(q) + r_val = ray.get(r) + l_val = ray.get(da.assemble(l)) + u_val = ray.get(u) + s_val = ray.get(s) s_mat = np.zeros((d1, d2)) for i in range(len(s_val)): s_mat[i, i] = s_val[i] @@ -170,11 +170,11 @@ class ArraysDistTest(unittest.TestCase): print "testing dist_tsqr_hr with d1 = " + str(d1) + ", d2 = " + str(d2) a = da.random.normal([d1, d2]) y, t, y_top, r = da.linalg.tsqr_hr(a) - a_val = ray.pull(da.assemble(a)) - y_val = ray.pull(da.assemble(y)) - t_val = ray.pull(t) - y_top_val = ray.pull(y_top) - r_val = ray.pull(r) + a_val = ray.get(da.assemble(a)) + y_val = ray.get(da.assemble(y)) + t_val = ray.get(t) + y_top_val = ray.get(y_top) + r_val = ray.get(r) tall_eye = np.zeros((d1, min(d1, d2))) np.fill_diagonal(tall_eye, 1) q = tall_eye - np.dot(y_val, np.dot(t_val, y_top_val.T)) @@ -189,9 +189,9 @@ class ArraysDistTest(unittest.TestCase): a = da.random.normal([d1, d2]) K = min(d1, d2) q, r = da.linalg.qr(a) - a_val = ray.pull(da.assemble(a)) - q_val = ray.pull(da.assemble(q)) - r_val = ray.pull(da.assemble(r)) + a_val = ray.get(da.assemble(a)) + q_val = ray.get(da.assemble(q)) + r_val = ray.get(da.assemble(r)) self.assertTrue(q_val.shape == (d1, K)) self.assertTrue(r_val.shape == (K, d2)) self.assertTrue(np.allclose(np.dot(q_val.T, q_val), np.eye(K))) diff --git a/test/microbenchmarks.py b/test/microbenchmarks.py index 03d34eaf3..22ed29035 100644 --- a/test/microbenchmarks.py +++ b/test/microbenchmarks.py @@ -45,33 +45,33 @@ class MicroBenchmarkTest(unittest.TestCase): print " worst: {}".format(elapsed_times[999]) self.assertTrue(average_elapsed_time < 0.002) # should take 0.001 - # measure the time required to submit a remote task to the scheduler and pull the result + # measure the time required to submit a remote task to the scheduler and get the result elapsed_times = [] for _ in range(1000): start_time = time.time() x = test_functions.trivial_function() - ray.pull(x) + ray.get(x) end_time = time.time() elapsed_times.append(end_time - start_time) elapsed_times = np.sort(elapsed_times) average_elapsed_time = sum(elapsed_times) / 1000 - print "Time required to submit a trivial function call and pull the result:" + print "Time required to submit a trivial function call and get the result:" print " Average: {}".format(average_elapsed_time) print " 90th percentile: {}".format(elapsed_times[900]) print " 99th percentile: {}".format(elapsed_times[990]) print " worst: {}".format(elapsed_times[999]) self.assertTrue(average_elapsed_time < 0.002) # should take 0.0013 - # measure the time required to do do a push + # measure the time required to do do a put elapsed_times = [] for _ in range(1000): start_time = time.time() - ray.push(1) + ray.put(1) end_time = time.time() elapsed_times.append(end_time - start_time) elapsed_times = np.sort(elapsed_times) average_elapsed_time = sum(elapsed_times) / 1000 - print "Time required to push an int:" + print "Time required to put an int:" print " Average: {}".format(average_elapsed_time) print " 90th percentile: {}".format(elapsed_times[900]) print " 99th percentile: {}".format(elapsed_times[990]) diff --git a/test/runtest.py b/test/runtest.py index 1caa7c0e6..e9c7542a7 100644 --- a/test/runtest.py +++ b/test/runtest.py @@ -59,10 +59,10 @@ class SerializationTest(unittest.TestCase): self.numpyTypeTest(w, 'float32') self.numpyTypeTest(w, 'float64') - ref0 = ray.push(0, w) - ref1 = ray.push(0, w) - ref2 = ray.push(0, w) - ref3 = ray.push(0, w) + ref0 = ray.put(0, w) + ref1 = ray.put(0, w) + ref2 = ray.put(0, w) + ref3 = ray.put(0, w) a = np.array([[ref0, ref1], [ref2, ref3]]) capsule, _ = serialization.serialize(w.handle, a) @@ -82,45 +82,45 @@ class ObjStoreTest(unittest.TestCase): def testObjStore(self): [w1, w2] = services.start_singlenode_cluster(return_drivers=True, num_objstores=2, num_workers_per_objstore=0) - # pushing and pulling an object shouldn't change it + # putting and getting an object shouldn't change it for data in ["h", "h" * 10000, 0, 0.0]: - objref = ray.push(data, w1) - result = ray.pull(objref, w1) + objref = ray.put(data, w1) + result = ray.get(objref, w1) self.assertEqual(result, data) - # pushing an object, shipping it to another worker, and pulling it shouldn't change it + # putting an object, shipping it to another worker, and getting it shouldn't change it for data in ["h", "h" * 10000, 0, 0.0, [1, 2, 3, "a", (1, 2)], ("a", ("b", 3))]: - objref = worker.push(data, w1) - result = worker.pull(objref, w2) + objref = worker.put(data, w1) + result = worker.get(objref, w2) self.assertEqual(result, data) - # pushing an array, shipping it to another worker, and pulling it shouldn't change it + # putting an array, shipping it to another worker, and getting it shouldn't change it for data in [np.zeros([10, 20]), np.random.normal(size=[45, 25])]: - objref = worker.push(data, w1) - result = worker.pull(objref, w2) + objref = worker.put(data, w1) + result = worker.get(objref, w2) self.assertTrue(np.alltrue(result == data)) """ - # pulling multiple times shouldn't matter + # getting multiple times shouldn't matter for data in [np.zeros([10, 20]), np.random.normal(size=[45, 25]), np.zeros([10, 20], dtype=np.dtype("float64")), np.zeros([10, 20], dtype=np.dtype("float32")), np.zeros([10, 20], dtype=np.dtype("int64")), np.zeros([10, 20], dtype=np.dtype("int32"))]: - objref = worker.push(data, w1) - result = worker.pull(objref, w2) - result = worker.pull(objref, w2) - result = worker.pull(objref, w2) + objref = worker.put(data, w1) + result = worker.get(objref, w2) + result = worker.get(objref, w2) + result = worker.get(objref, w2) self.assertTrue(np.alltrue(result == data)) """ # shipping a numpy array inside something else should be fine data = ("a", np.random.normal(size=[10, 10])) - objref = worker.push(data, w1) - result = worker.pull(objref, w2) + objref = worker.put(data, w1) + result = worker.get(objref, w2) self.assertTrue(data[0] == result[0]) self.assertTrue(np.alltrue(data[1] == result[1])) # shipping a numpy array inside something else should be fine data = ["a", np.random.normal(size=[10, 10])] - objref = worker.push(data, w1) - result = worker.pull(objref, w2) + objref = worker.put(data, w1) + result = worker.get(objref, w2) self.assertTrue(data[0] == result[0]) self.assertTrue(np.alltrue(data[1] == result[1])) @@ -128,31 +128,31 @@ class ObjStoreTest(unittest.TestCase): class WorkerTest(unittest.TestCase): - def testPushPull(self): + def testPutGet(self): [w] = services.start_singlenode_cluster(return_drivers=True) for i in range(100): value_before = i * 10 ** 6 - objref = ray.push(value_before, w) - value_after = ray.pull(objref, w) + objref = ray.put(value_before, w) + value_after = ray.get(objref, w) self.assertEqual(value_before, value_after) for i in range(100): value_before = i * 10 ** 6 * 1.0 - objref = ray.push(value_before, w) - value_after = ray.pull(objref, w) + objref = ray.put(value_before, w) + value_after = ray.get(objref, w) self.assertEqual(value_before, value_after) for i in range(100): value_before = "h" * i - objref = ray.push(value_before, w) - value_after = ray.pull(objref, w) + objref = ray.put(value_before, w) + value_after = ray.get(objref, w) self.assertEqual(value_before, value_after) for i in range(100): value_before = [1] * i - objref = ray.push(value_before, w) - value_after = ray.pull(objref, w) + objref = ray.put(value_before, w) + value_after = ray.get(objref, w) self.assertEqual(value_before, value_after) services.cleanup() @@ -164,11 +164,11 @@ class APITest(unittest.TestCase): [w] = services.start_singlenode_cluster(return_drivers=True, num_workers_per_objstore=3, worker_path=worker_path) objref = w.submit_task("test_functions.test_alias_f", []) - self.assertTrue(np.alltrue(ray.pull(objref[0], w) == np.ones([3, 4, 5]))) + self.assertTrue(np.alltrue(ray.get(objref[0], w) == np.ones([3, 4, 5]))) objref = w.submit_task("test_functions.test_alias_g", []) - self.assertTrue(np.alltrue(ray.pull(objref[0], w) == np.ones([3, 4, 5]))) + self.assertTrue(np.alltrue(ray.get(objref[0], w) == np.ones([3, 4, 5]))) objref = w.submit_task("test_functions.test_alias_h", []) - self.assertTrue(np.alltrue(ray.pull(objref[0], w) == np.ones([3, 4, 5]))) + self.assertTrue(np.alltrue(ray.get(objref[0], w) == np.ones([3, 4, 5]))) services.cleanup() @@ -177,35 +177,35 @@ class APITest(unittest.TestCase): services.start_singlenode_cluster(return_drivers=False, num_workers_per_objstore=1, worker_path=worker_path) x = test_functions.keyword_fct1(1) - self.assertEqual(ray.pull(x), "1 hello") + self.assertEqual(ray.get(x), "1 hello") x = test_functions.keyword_fct1(1, "hi") - self.assertEqual(ray.pull(x), "1 hi") + self.assertEqual(ray.get(x), "1 hi") x = test_functions.keyword_fct1(1, b="world") - self.assertEqual(ray.pull(x), "1 world") + self.assertEqual(ray.get(x), "1 world") x = test_functions.keyword_fct2(a="w", b="hi") - self.assertEqual(ray.pull(x), "w hi") + self.assertEqual(ray.get(x), "w hi") x = test_functions.keyword_fct2(b="hi", a="w") - self.assertEqual(ray.pull(x), "w hi") + self.assertEqual(ray.get(x), "w hi") x = test_functions.keyword_fct2(a="w") - self.assertEqual(ray.pull(x), "w world") + self.assertEqual(ray.get(x), "w world") x = test_functions.keyword_fct2(b="hi") - self.assertEqual(ray.pull(x), "hello hi") + self.assertEqual(ray.get(x), "hello hi") x = test_functions.keyword_fct2("w") - self.assertEqual(ray.pull(x), "w world") + self.assertEqual(ray.get(x), "w world") x = test_functions.keyword_fct2("w", "hi") - self.assertEqual(ray.pull(x), "w hi") + self.assertEqual(ray.get(x), "w hi") x = test_functions.keyword_fct3(0, 1, c="w", d="hi") - self.assertEqual(ray.pull(x), "0 1 w hi") + self.assertEqual(ray.get(x), "0 1 w hi") x = test_functions.keyword_fct3(0, 1, d="hi", c="w") - self.assertEqual(ray.pull(x), "0 1 w hi") + self.assertEqual(ray.get(x), "0 1 w hi") x = test_functions.keyword_fct3(0, 1, c="w") - self.assertEqual(ray.pull(x), "0 1 w world") + self.assertEqual(ray.get(x), "0 1 w world") x = test_functions.keyword_fct3(0, 1, d="hi") - self.assertEqual(ray.pull(x), "0 1 hello hi") + self.assertEqual(ray.get(x), "0 1 hello hi") x = test_functions.keyword_fct3(0, 1) - self.assertEqual(ray.pull(x), "0 1 hello world") + self.assertEqual(ray.get(x), "0 1 hello world") services.cleanup() @@ -214,9 +214,9 @@ class APITest(unittest.TestCase): services.start_singlenode_cluster(return_drivers=False, num_workers_per_objstore=1, worker_path=worker_path) x = test_functions.varargs_fct1(0, 1, 2) - self.assertEqual(ray.pull(x), "0 1 2") + self.assertEqual(ray.get(x), "0 1 2") x = test_functions.varargs_fct2(0, 1, 2) - self.assertEqual(ray.pull(x), "1 2") + self.assertEqual(ray.get(x), "1 2") self.assertTrue(test_functions.kwargs_exception_thrown) self.assertTrue(test_functions.varargs_and_kwargs_exception_thrown) @@ -241,14 +241,14 @@ class TaskStatusTest(unittest.TestCase): self.assertTrue(task['operationid'] not in task_ids) task_ids.add(task['operationid']) -def check_pull_deallocated(data): - x = ray.push(data) - ray.pull(x) +def check_get_deallocated(data): + x = ray.put(data) + ray.get(x) return x.val -def check_pull_not_deallocated(data): - x = ray.push(data) - y = ray.pull(x) +def check_get_not_deallocated(data): + x = ray.put(data) + y = ray.get(x) return y, x.val class ReferenceCountingTest(unittest.TestCase): @@ -258,7 +258,7 @@ class ReferenceCountingTest(unittest.TestCase): services.start_singlenode_cluster(return_drivers=False, num_workers_per_objstore=3, worker_path=worker_path) x = test_functions.test_alias_f() - ray.pull(x) + ray.get(x) time.sleep(0.1) objref_val = x.val self.assertTrue(ray.scheduler_info()["reference_counts"][objref_val] == 1) @@ -267,7 +267,7 @@ class ReferenceCountingTest(unittest.TestCase): self.assertTrue(ray.scheduler_info()["reference_counts"][objref_val] == -1) # -1 indicates deallocated y = test_functions.test_alias_h() - ray.pull(y) + ray.get(y) time.sleep(0.1) objref_val = y.val self.assertTrue(ray.scheduler_info()["reference_counts"][objref_val:(objref_val + 3)] == [1, 0, 0]) @@ -303,22 +303,22 @@ class ReferenceCountingTest(unittest.TestCase): services.cleanup() - def testPull(self): + def testGet(self): worker_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), "test_worker.py") services.start_singlenode_cluster(return_drivers=False, num_workers_per_objstore=3, worker_path=worker_path) for val in RAY_TEST_OBJECTS + [np.zeros((2, 2)), UserDefinedType()]: - objref_val = check_pull_deallocated(val) + objref_val = check_get_deallocated(val) self.assertEqual(ray.scheduler_info()["reference_counts"][objref_val], -1) if not isinstance(val, bool) and val is not None: - x, objref_val = check_pull_not_deallocated(val) + x, objref_val = check_get_not_deallocated(val) self.assertEqual(ray.scheduler_info()["reference_counts"][objref_val], 1) services.cleanup() @unittest.expectedFailure - def testPullFailing(self): + def testGetFailing(self): worker_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), "test_worker.py") services.start_singlenode_cluster(return_drivers=False, num_workers_per_objstore=3, worker_path=worker_path) @@ -326,10 +326,10 @@ class ReferenceCountingTest(unittest.TestCase): # refcounts and therefore cannot keep the refcount up # (see 5281bd414f6b404f61e1fe25ec5f6651defee206). # The resulting behavior is still correct however because True, False and - # None are returned by pull "by value" and therefore can be reclaimed from + # None are returned by get "by value" and therefore can be reclaimed from # the object store safely. for val in [True, False, None]: - x, objref_val = check_pull_not_deallocated(val) + x, objref_val = check_get_not_deallocated(val) self.assertEqual(ray.scheduler_info()["reference_counts"][objref_val], 1) services.cleanup()