From 2865128df0b28930cd5d178025150ecce0b7c2ec Mon Sep 17 00:00:00 2001 From: Robert Nishihara Date: Sun, 26 Nov 2017 18:29:10 -0800 Subject: [PATCH] =?UTF-8?q?Remove=20counter=20from=20run=5Ffunction=5Fon?= =?UTF-8?q?=5Fall=5Fworkers.=20Also=20remove=20utilitie=E2=80=A6=20(#1260)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * Remove counter from run_function_on_all_workers. Also remove utilities for copying directories across machines. * Fix linting. --- python/ray/experimental/__init__.py | 3 +- python/ray/experimental/utils.py | 82 ----------------------------- python/ray/worker.py | 15 ++---- test/runtest.py | 68 +++--------------------- 4 files changed, 10 insertions(+), 158 deletions(-) delete mode 100644 python/ray/experimental/utils.py diff --git a/python/ray/experimental/__init__.py b/python/ray/experimental/__init__.py index 94ab6498d..4f93e189f 100644 --- a/python/ray/experimental/__init__.py +++ b/python/ray/experimental/__init__.py @@ -2,7 +2,6 @@ from __future__ import absolute_import from __future__ import division from __future__ import print_function -from .utils import copy_directory from .tfutils import TensorFlowVariables -__all__ = ["copy_directory", "TensorFlowVariables"] +__all__ = ["TensorFlowVariables"] diff --git a/python/ray/experimental/utils.py b/python/ray/experimental/utils.py deleted file mode 100644 index f802b1430..000000000 --- a/python/ray/experimental/utils.py +++ /dev/null @@ -1,82 +0,0 @@ -from __future__ import absolute_import -from __future__ import division -from __future__ import print_function - -import io -import os -import tarfile -import sys - -import ray - - -def tarred_directory_as_bytes(source_dir): - """Tar a directory and return it as a byte string. - - Args: - source_dir (str): The name of the directory to tar. - - Returns: - A byte string representing the tarred file. - """ - # Get a BytesIO object. - string_file = io.BytesIO() - # Create an in-memory tarfile of the source directory. - with tarfile.open(mode="w:gz", fileobj=string_file) as tar: - tar.add(source_dir, arcname=os.path.basename(source_dir)) - string_file.seek(0) - return string_file.read() - - -def tarred_bytes_to_directory(tarred_bytes, target_dir): - """Take a byte string and untar it. - - Args: - tarred_bytes (str): A byte string representing the tarred file. This - should be the output of tarred_directory_as_bytes. - target_dir (str): The directory to create the untarred files in. - """ - string_file = io.BytesIO(tarred_bytes) - with tarfile.open(fileobj=string_file) as tar: - tar.extractall(path=target_dir) - - -def copy_directory(source_dir, target_dir=None): - """Copy a local directory to each machine in the Ray cluster. - - Note that both source_dir and target_dir must have the same basename). For - example, source_dir can be /a/b/c and target_dir can be /d/e/c. In this - case, the directory /d/e will be added to the Python path of each worker. - - Note that this method is not completely safe to use. For example, workers - that do not do the copying and only set their paths (only one worker per - node does the copying) may try to execute functions that use the files in - the directory being copied before the directory being copied has finished - untarring. - - Args: - source_dir (str): The directory to copy. - target_dir (str): The location to copy it to on the other machines. If - this is not provided, the source_dir will be used. If it is - provided and is different from source_dir, the source_dir also be - copied to the target_dir location on this machine. - """ - target_dir = source_dir if target_dir is None else target_dir - source_dir = os.path.abspath(source_dir) - target_dir = os.path.abspath(target_dir) - source_basename = os.path.basename(source_dir) - target_basename = os.path.basename(target_dir) - if source_basename != target_basename: - raise Exception("The source_dir and target_dir must have the same " - "base name, {} != {}".format(source_basename, - target_basename)) - tarred_bytes = tarred_directory_as_bytes(source_dir) - - def f(worker_info): - if worker_info["counter"] == 0: - tarred_bytes_to_directory(tarred_bytes, - os.path.dirname(target_dir)) - sys.path.append(os.path.dirname(target_dir)) - # Run this function on all workers to copy the directory to all nodes and - # to add the directory to the Python path of each worker. - ray.worker.global_worker.run_function_on_all_workers(f) diff --git a/python/ray/worker.py b/python/ray/worker.py index 50473868a..3d84b03ba 100644 --- a/python/ray/worker.py +++ b/python/ray/worker.py @@ -578,14 +578,9 @@ class Worker(object): function_to_run_id = hashlib.sha1(pickled_function).digest() key = b"FunctionsToRun:" + function_to_run_id - # First run the function on the driver. Pass in the number of - # workers on this node that have already started executing this - # remote function, and increment that value. Subtract 1 so that the - # counter starts at 0. - counter = self.redis_client.hincrby(self.node_ip_address, - key, 1) - 1 + # First run the function on the driver. # We always run the task locally. - function({"counter": counter, "worker": self}) + function({"worker": self}) # Check if the function has already been put into redis. function_exported = self.redis_client.setnx(b"Lock:" + key, 1) if not function_exported: @@ -1575,15 +1570,11 @@ def fetch_and_execute_function_to_run(key, worker=global_worker): """Run on arbitrary function on the worker.""" driver_id, serialized_function = worker.redis_client.hmget( key, ["driver_id", "function"]) - # Get the number of workers on this node that have already started - # executing this remote function, and increment that value. Subtract 1 so - # the counter starts at 0. - counter = worker.redis_client.hincrby(worker.node_ip_address, key, 1) - 1 try: # Deserialize the function. function = pickle.loads(serialized_function) # Run the function. - function({"counter": counter, "worker": worker}) + function({"worker": worker}) except Exception: # If an exception was thrown when the function was run, we record the # traceback and notify the scheduler of the failure. diff --git a/test/runtest.py b/test/runtest.py index 5b7672c12..5f4505781 100644 --- a/test/runtest.py +++ b/test/runtest.py @@ -2,7 +2,6 @@ from __future__ import absolute_import, division, print_function import os import re -import shutil import string import sys import time @@ -1062,69 +1061,17 @@ class PythonModeTest(unittest.TestCase): ray.worker.cleanup() -class UtilsTest(unittest.TestCase): - def testCopyingDirectory(self): - # The functionality being tested here is really multi-node - # functionality, but this test just uses a single node. - - ray.init(num_workers=1) - - source_text = "hello world" - - temp_dir1 = os.path.join(os.path.dirname(__file__), "temp_dir1") - source_dir = os.path.join(temp_dir1, "dir") - source_file = os.path.join(source_dir, "file.txt") - temp_dir2 = os.path.join(os.path.dirname(__file__), "temp_dir2") - target_dir = os.path.join(temp_dir2, "dir") - target_file = os.path.join(target_dir, "file.txt") - - def remove_temporary_files(): - if os.path.exists(temp_dir1): - shutil.rmtree(temp_dir1) - if os.path.exists(temp_dir2): - shutil.rmtree(temp_dir2) - - # Remove the relevant files if they are left over from a previous run - # of this test. - remove_temporary_files() - - # Create the source files. - os.mkdir(temp_dir1) - os.mkdir(source_dir) - with open(source_file, "w") as f: - f.write(source_text) - - # Copy the source directory to the target directory. - ray.experimental.copy_directory(source_dir, target_dir) - time.sleep(0.5) - - # Check that the target files exist and are the same as the source - # files. - self.assertTrue(os.path.exists(target_dir)) - self.assertTrue(os.path.exists(target_file)) - with open(target_file, "r") as f: - self.assertEqual(f.read(), source_text) - - # Remove the relevant files to clean up. - remove_temporary_files() - - ray.worker.cleanup() - - class ResourcesTest(unittest.TestCase): def testResourceConstraints(self): num_workers = 20 ray.init(num_workers=num_workers, num_cpus=10, num_gpus=2) - # Attempt to wait for all of the workers to start up. - ray.worker.global_worker.run_function_on_all_workers( - lambda worker_info: sys.path.append(worker_info["counter"])) - @ray.remote(num_cpus=0) def get_worker_id(): - time.sleep(1) - return sys.path[-1] + time.sleep(0.1) + return os.getpid() + # Attempt to wait for all of the workers to start up. while True: if len( set( @@ -1196,15 +1143,12 @@ class ResourcesTest(unittest.TestCase): num_workers = 20 ray.init(num_workers=num_workers, num_cpus=10, num_gpus=10) - # Attempt to wait for all of the workers to start up. - ray.worker.global_worker.run_function_on_all_workers( - lambda worker_info: sys.path.append(worker_info["counter"])) - @ray.remote(num_cpus=0) def get_worker_id(): - time.sleep(1) - return sys.path[-1] + time.sleep(0.1) + return os.getpid() + # Attempt to wait for all of the workers to start up. while True: if len( set(