diff --git a/doc/source/tempfile.rst b/doc/source/tempfile.rst index 0666729b4..11b3a772c 100644 --- a/doc/source/tempfile.rst +++ b/doc/source/tempfile.rst @@ -47,7 +47,7 @@ A typical layout of temporary files could look like this: │   ├── monitor.out │   ├── plasma_store_0.err # array of plasma stores' outputs │   ├── plasma_store_0.out - │   ├── raylet_0.err # array of raylets' outputs. Control it with `--no-redirect-worker-output` (in Ray's command line) or `redirect_worker_output` (in ray.init()) + │   ├── raylet_0.err │   ├── raylet_0.out │   ├── redis-shard_0.err # array of redis shards' outputs │   ├── redis-shard_0.out @@ -80,4 +80,4 @@ The path you specified will be given as it is without being affected any other p Notes ----- -Temporary file policies are defined in ``python/ray/node.py``. \ No newline at end of file +Temporary file policies are defined in ``python/ray/node.py``. diff --git a/examples/lbfgs/driver.py b/examples/lbfgs/driver.py index 1a05f0013..9bdced7c7 100644 --- a/examples/lbfgs/driver.py +++ b/examples/lbfgs/driver.py @@ -35,6 +35,7 @@ class LinearModel(object): variables (TensorFlowVariables): Extracted variables and methods to manipulate them. """ + def __init__(self, shape): """Creates a LinearModel object.""" x = tf.placeholder(tf.float32, [None, shape[0]]) @@ -46,26 +47,33 @@ class LinearModel(object): y = tf.nn.softmax(tf.matmul(x, w) + b) y_ = tf.placeholder(tf.float32, [None, shape[1]]) self.y_ = y_ - cross_entropy = tf.reduce_mean(-tf.reduce_sum(y_ * tf.log(y), - reduction_indices=[1])) + cross_entropy = tf.reduce_mean( + -tf.reduce_sum(y_ * tf.log(y), reduction_indices=[1])) self.cross_entropy = cross_entropy self.cross_entropy_grads = tf.gradients(cross_entropy, [w, b]) self.sess = tf.Session() # In order to get and set the weights, we pass in the loss function to # Ray's TensorFlowVariables to automatically create methods to modify # the weights. - self.variables = ray.experimental.TensorFlowVariables(cross_entropy, - self.sess) + self.variables = ray.experimental.TensorFlowVariables( + cross_entropy, self.sess) def loss(self, xs, ys): """Computes the loss of the network.""" - return float(self.sess.run(self.cross_entropy, - feed_dict={self.x: xs, self.y_: ys})) + return float( + self.sess.run( + self.cross_entropy, feed_dict={ + self.x: xs, + self.y_: ys + })) def grad(self, xs, ys): """Computes the gradients of the network.""" - return self.sess.run(self.cross_entropy_grads, - feed_dict={self.x: xs, self.y_: ys}) + return self.sess.run( + self.cross_entropy_grads, feed_dict={ + self.x: xs, + self.y_: ys + }) @ray.remote @@ -110,7 +118,7 @@ def full_grad(theta): if __name__ == "__main__": - ray.init(redirect_output=True) + ray.init() # From the perspective of scipy.optimize.fmin_l_bfgs_b, full_loss is simply # a function which takes some parameters theta, and computes a loss. @@ -136,5 +144,5 @@ if __name__ == "__main__": # Use L-BFGS to minimize the loss function. print("Running L-BFGS.") - result = scipy.optimize.fmin_l_bfgs_b(full_loss, theta_init, maxiter=10, - fprime=full_grad, disp=True) + result = scipy.optimize.fmin_l_bfgs_b( + full_loss, theta_init, maxiter=10, fprime=full_grad, disp=True) diff --git a/examples/resnet/resnet_main.py b/examples/resnet/resnet_main.py index 0e052966e..3a6bb6a47 100644 --- a/examples/resnet/resnet_main.py +++ b/examples/resnet/resnet_main.py @@ -23,22 +23,41 @@ if (tf_major < 1) or (tf_major == 1 and tf_minor < 2): "update Tensorflow to the latest version.") parser = argparse.ArgumentParser(description="Run the ResNet example.") -parser.add_argument("--dataset", default="cifar10", type=str, - help="Dataset to use: cifar10 or cifar100.") -parser.add_argument("--train_data_path", - default="cifar-10-batches-bin/data_batch*", type=str, - help="Data path for the training data.") -parser.add_argument("--eval_data_path", - default="cifar-10-batches-bin/test_batch.bin", type=str, - help="Data path for the testing data.") -parser.add_argument("--eval_dir", default="/tmp/resnet-model/eval", type=str, - help="Data path for the tensorboard logs.") -parser.add_argument("--eval_batch_count", default=50, type=int, - help="Number of batches to evaluate over.") -parser.add_argument("--num_gpus", default=0, type=int, - help="Number of GPUs to use for training.") -parser.add_argument("--redis-address", default=None, type=str, - help="The Redis address of the cluster.") +parser.add_argument( + "--dataset", + default="cifar10", + type=str, + help="Dataset to use: cifar10 or cifar100.") +parser.add_argument( + "--train_data_path", + default="cifar-10-batches-bin/data_batch*", + type=str, + help="Data path for the training data.") +parser.add_argument( + "--eval_data_path", + default="cifar-10-batches-bin/test_batch.bin", + type=str, + help="Data path for the testing data.") +parser.add_argument( + "--eval_dir", + default="/tmp/resnet-model/eval", + type=str, + help="Data path for the tensorboard logs.") +parser.add_argument( + "--eval_batch_count", + default=50, + type=int, + help="Number of batches to evaluate over.") +parser.add_argument( + "--num_gpus", + default=0, + type=int, + help="Number of GPUs to use for training.") +parser.add_argument( + "--redis-address", + default=None, + type=str, + help="The Redis address of the cluster.") FLAGS = parser.parse_args() @@ -87,9 +106,8 @@ class ResNetTrainActor(object): with tf.device("/gpu:0" if num_gpus > 0 else "/cpu:0"): # Build the model. - images, labels = cifar_input.build_input(data, - hps.batch_size, dataset, - False) + images, labels = cifar_input.build_input(data, hps.batch_size, + dataset, False) self.model = resnet_model.ResNet(hps, images, labels, "train") self.model.build_graph() config = tf.ConfigProto(allow_soft_placement=True) @@ -131,9 +149,8 @@ class ResNetTestActor(object): num_gpus=0) with tf.device("/cpu:0"): # Builds the testing network. - images, labels = cifar_input.build_input(data, - hps.batch_size, dataset, - False) + images, labels = cifar_input.build_input(data, hps.batch_size, + dataset, False) self.model = resnet_model.ResNet(hps, images, labels, "eval") self.model.build_graph() config = tf.ConfigProto(allow_soft_placement=True) @@ -159,8 +176,7 @@ class ResNetTestActor(object): sess = self.model.variables.sess for _ in range(self.eval_batch_count): summaries, loss, predictions, truth = sess.run( - [model.summaries, model.cost, model.predictions, - model.labels]) + [model.summaries, model.cost, model.predictions, model.labels]) truth = np.argmax(truth, axis=1) predictions = np.argmax(predictions, axis=1) @@ -170,8 +186,7 @@ class ResNetTestActor(object): precision = 1.0 * correct_prediction / total_prediction self.best_precision = max(precision, self.best_precision) precision_summ = tf.Summary() - precision_summ.value.add( - tag="Precision", simple_value=precision) + precision_summ.value.add(tag="Precision", simple_value=precision) self.summary_writer.add_summary(precision_summ, train_step) best_precision_summ = tf.Summary() best_precision_summ.value.add( @@ -192,7 +207,7 @@ class ResNetTestActor(object): def train(): num_gpus = FLAGS.num_gpus if FLAGS.redis_address is None: - ray.init(num_gpus=num_gpus, redirect_output=True) + ray.init(num_gpus=num_gpus) else: ray.init(redis_address=FLAGS.redis_address) train_data = get_data.remote(FLAGS.train_data_path, 50000, FLAGS.dataset) @@ -200,15 +215,16 @@ def train(): # Creates an actor for each gpu, or one if only using the cpu. Each actor # has access to the dataset. if FLAGS.num_gpus > 0: - train_actors = [ResNetTrainActor.remote(train_data, FLAGS.dataset, - num_gpus) - for _ in range(num_gpus)] + train_actors = [ + ResNetTrainActor.remote(train_data, FLAGS.dataset, num_gpus) + for _ in range(num_gpus) + ] else: train_actors = [ResNetTrainActor.remote(train_data, FLAGS.dataset, 0)] test_actor = ResNetTestActor.remote(test_data, FLAGS.dataset, FLAGS.eval_batch_count, FLAGS.eval_dir) - print("The log files for tensorboard are stored at ip {}." - .format(ray.get(test_actor.get_ip_addr.remote()))) + print("The log files for tensorboard are stored at ip {}.".format( + ray.get(test_actor.get_ip_addr.remote()))) step = 0 weight_id = train_actors[0].get_weights.remote() acc_id = test_actor.accuracy.remote(weight_id, step) @@ -218,11 +234,13 @@ def train(): print("Starting training loop. Use Ctrl-C to exit.") try: while True: - all_weights = ray.get([actor.compute_steps.remote(weight_id) - for actor in train_actors]) - mean_weights = {k: (sum(weights[k] for weights in all_weights) / - num_gpus) - for k in all_weights[0]} + all_weights = ray.get([ + actor.compute_steps.remote(weight_id) for actor in train_actors + ]) + mean_weights = { + k: (sum(weights[k] for weights in all_weights) / num_gpus) + for k in all_weights[0] + } weight_id = ray.put(mean_weights) step += 10 if step % 200 == 0: diff --git a/examples/rl_pong/driver.py b/examples/rl_pong/driver.py index 0d1813f03..ab9a63446 100644 --- a/examples/rl_pong/driver.py +++ b/examples/rl_pong/driver.py @@ -146,17 +146,26 @@ class PongEnv(object): if __name__ == "__main__": parser = argparse.ArgumentParser(description="Train an RL agent on Pong.") - parser.add_argument("--batch-size", default=10, type=int, - help="The number of rollouts to do per batch.") - parser.add_argument("--redis-address", default=None, type=str, - help="The Redis address of the cluster.") - parser.add_argument("--iterations", default=-1, type=int, - help="The number of model updates to perform. By " - "default, training will not terminate.") + parser.add_argument( + "--batch-size", + default=10, + type=int, + help="The number of rollouts to do per batch.") + parser.add_argument( + "--redis-address", + default=None, + type=str, + help="The Redis address of the cluster.") + parser.add_argument( + "--iterations", + default=-1, + type=int, + help="The number of model updates to perform. By " + "default, training will not terminate.") args = parser.parse_args() batch_size = args.batch_size - ray.init(redis_address=args.redis_address, redirect_output=True) + ray.init(redis_address=args.redis_address) # Run the reinforcement learning. @@ -187,8 +196,8 @@ if __name__ == "__main__": # Accumulate the gradient over batch. for k in model: grad_buffer[k] += grad[k] - running_reward = (reward_sum if running_reward is None - else running_reward * 0.99 + reward_sum * 0.01) + running_reward = (reward_sum if running_reward is None else + running_reward * 0.99 + reward_sum * 0.01) end_time = time.time() print("Batch {} computed {} rollouts in {} seconds, " "running mean is {}".format(batch_num, batch_size, @@ -196,8 +205,8 @@ if __name__ == "__main__": running_reward)) for k, v in model.items(): g = grad_buffer[k] - rmsprop_cache[k] = (decay_rate * rmsprop_cache[k] + - (1 - decay_rate) * g ** 2) + rmsprop_cache[k] = ( + decay_rate * rmsprop_cache[k] + (1 - decay_rate) * g**2) model[k] += learning_rate * g / (np.sqrt(rmsprop_cache[k]) + 1e-5) # Reset the batch gradient buffer. grad_buffer[k] = np.zeros_like(v) diff --git a/python/ray/actor.py b/python/ray/actor.py index ecad3e44a..26ff93399 100644 --- a/python/ray/actor.py +++ b/python/ray/actor.py @@ -125,12 +125,6 @@ class ActorMethod(object): def remote(self, *args, **kwargs): return self._remote(args, kwargs) - def _submit(self, args, kwargs, num_return_vals=None): - logger.warning( - "WARNING: _submit() is being deprecated. Please use _remote().") - return self._remote( - args=args, kwargs=kwargs, num_return_vals=num_return_vals) - def _remote(self, args, kwargs, num_return_vals=None): if num_return_vals is None: num_return_vals = self._num_return_vals @@ -238,21 +232,6 @@ class ActorClass(object): """ return self._remote(args=args, kwargs=kwargs) - def _submit(self, - args, - kwargs, - num_cpus=None, - num_gpus=None, - resources=None): - logger.warning( - "WARNING: _submit() is being deprecated. Please use _remote().") - return self._remote( - args=args, - kwargs=kwargs, - num_cpus=num_cpus, - num_gpus=num_gpus, - resources=resources) - def _remote(self, args, kwargs, diff --git a/python/ray/experimental/api.py b/python/ray/experimental/api.py index 32ce48c73..ba396d8d6 100644 --- a/python/ray/experimental/api.py +++ b/python/ray/experimental/api.py @@ -6,7 +6,7 @@ import ray import numpy as np -def get(object_ids, worker=None): +def get(object_ids): """Get a single or a collection of remote objects from the object store. This method is identical to `ray.get` except it adds support for tuples, @@ -19,11 +19,8 @@ def get(object_ids, worker=None): Returns: A Python object, a list of Python objects or a dict of {key: object}. """ - # There is a dependency on ray.worker which prevents importing - # global_worker at the top of this file - worker = ray.worker.global_worker if worker is None else worker if isinstance(object_ids, (tuple, np.ndarray)): - return ray.get(list(object_ids), worker) + return ray.get(list(object_ids)) elif isinstance(object_ids, dict): keys_to_get = [ k for k, v in object_ids.items() if isinstance(v, ray.ObjectID) @@ -38,10 +35,10 @@ def get(object_ids, worker=None): result[key] = value return result else: - return ray.get(object_ids, worker) + return ray.get(object_ids) -def wait(object_ids, num_returns=1, timeout=None, worker=None): +def wait(object_ids, num_returns=1, timeout=None): """Return a list of IDs that are ready and a list of IDs that are not. This method is identical to `ray.wait` except it adds support for tuples @@ -59,13 +56,8 @@ def wait(object_ids, num_returns=1, timeout=None, worker=None): A list of object IDs that are ready and a list of the remaining object IDs. """ - worker = ray.worker.global_worker if worker is None else worker if isinstance(object_ids, (tuple, np.ndarray)): return ray.wait( - list(object_ids), - num_returns=num_returns, - timeout=timeout, - worker=worker) + list(object_ids), num_returns=num_returns, timeout=timeout) - return ray.wait( - object_ids, num_returns=num_returns, timeout=timeout, worker=worker) + return ray.wait(object_ids, num_returns=num_returns, timeout=timeout) diff --git a/python/ray/function_manager.py b/python/ray/function_manager.py index 3abed7ea6..df01088aa 100644 --- a/python/ray/function_manager.py +++ b/python/ray/function_manager.py @@ -441,7 +441,7 @@ class FunctionActorManager(object): # we spend too long in this loop. # The driver function may not be found in sys.path. Try to load # the function from GCS. - with profiling.profile("wait_for_function", worker=self._worker): + with profiling.profile("wait_for_function"): self._wait_for_function(function_descriptor, driver_id) try: info = self._function_execution_info[driver_id][function_id] diff --git a/python/ray/import_thread.py b/python/ray/import_thread.py index ba561919e..780db0be9 100644 --- a/python/ray/import_thread.py +++ b/python/ray/import_thread.py @@ -91,21 +91,18 @@ class ImportThread(object): # Handle the driver case first. if self.mode != ray.WORKER_MODE: if key.startswith(b"FunctionsToRun"): - with profiling.profile( - "fetch_and_run_function", worker=self.worker): + with profiling.profile("fetch_and_run_function"): self.fetch_and_execute_function_to_run(key) # Return because FunctionsToRun are the only things that # the driver should import. return if key.startswith(b"RemoteFunction"): - with profiling.profile( - "register_remote_function", worker=self.worker): + with profiling.profile("register_remote_function"): (self.worker.function_actor_manager. fetch_and_register_remote_function(key)) elif key.startswith(b"FunctionsToRun"): - with profiling.profile( - "fetch_and_run_function", worker=self.worker): + with profiling.profile("fetch_and_run_function"): self.fetch_and_execute_function_to_run(key) elif key.startswith(b"ActorClass"): # Keep track of the fact that this actor class has been diff --git a/python/ray/internal/internal_api.py b/python/ray/internal/internal_api.py index 65fce457b..f91bd0f53 100644 --- a/python/ray/internal/internal_api.py +++ b/python/ray/internal/internal_api.py @@ -8,7 +8,7 @@ from ray import profiling __all__ = ["free"] -def free(object_ids, local_only=False, worker=None): +def free(object_ids, local_only=False): """Free a list of IDs from object stores. This function is a low-level API which should be used in restricted @@ -26,8 +26,7 @@ def free(object_ids, local_only=False, worker=None): local_only (bool): Whether only deleting the list of objects in local object store or all object stores. """ - if worker is None: - worker = ray.worker.get_global_worker() + worker = ray.worker.get_global_worker() if isinstance(object_ids, ray.ObjectID): object_ids = [object_ids] @@ -37,7 +36,7 @@ def free(object_ids, local_only=False, worker=None): type(object_ids))) worker.check_connected() - with profiling.profile("ray.free", worker=worker): + with profiling.profile("ray.free"): if len(object_ids) == 0: return diff --git a/python/ray/node.py b/python/ray/node.py index ed4944898..d5c09a1c6 100644 --- a/python/ray/node.py +++ b/python/ray/node.py @@ -197,7 +197,7 @@ class Node(object): raise FileExistsError(errno.EEXIST, "No usable temporary filename found") - def new_log_files(self, name, redirect_output=None): + def new_log_files(self, name, redirect_output=True): """Generate partially randomized filenames for log files. Args: @@ -262,7 +262,7 @@ class Node(object): redis_shard_ports=self._ray_params.redis_shard_ports, num_redis_shards=self._ray_params.num_redis_shards, redis_max_clients=self._ray_params.redis_max_clients, - redirect_worker_output=self._ray_params.redirect_worker_output, + redirect_worker_output=True, password=self._ray_params.redis_password, redis_max_memory=self._ray_params.redis_max_memory) assert ( @@ -272,7 +272,7 @@ class Node(object): def start_log_monitor(self): """Start the log monitor.""" - stdout_file, stderr_file = self.new_log_files("log_monitor", True) + stdout_file, stderr_file = self.new_log_files("log_monitor") process_info = ray.services.start_log_monitor( self.redis_address, self._logs_dir, @@ -286,7 +286,7 @@ class Node(object): def start_ui(self): """Start the web UI.""" - stdout_file, stderr_file = self.new_log_files("webui", True) + stdout_file, stderr_file = self.new_log_files("webui") notebook_name = self._make_inc_temp( suffix=".ipynb", prefix="ray_ui", directory_name=self._temp_dir) self._webui_url, process_info = ray.services.start_ui( diff --git a/python/ray/parameter.py b/python/ray/parameter.py index e1eb82216..49d2ace4e 100644 --- a/python/ray/parameter.py +++ b/python/ray/parameter.py @@ -90,11 +90,10 @@ class RayParams(object): node_manager_port=None, node_ip_address=None, object_id_seed=None, - num_workers=None, local_mode=False, driver_mode=None, - redirect_worker_output=True, - redirect_output=True, + redirect_worker_output=None, + redirect_output=None, num_redis_shards=None, redis_max_clients=None, redis_password=None, @@ -124,7 +123,6 @@ class RayParams(object): self.object_manager_port = object_manager_port self.node_manager_port = node_manager_port self.node_ip_address = node_ip_address - self.num_workers = num_workers self.local_mode = local_mode self.driver_mode = driver_mode self.redirect_worker_output = redirect_worker_output @@ -186,10 +184,15 @@ class RayParams(object): "'GPU' should not be included in the resource dictionary. Use " "num_gpus instead.") - if self.num_workers is not None: - raise ValueError( - "The 'num_workers' argument is deprecated. Please use " - "'num_cpus' instead.") + if self.redirect_worker_output is not None: + raise DeprecationWarning( + "The redirect_worker_output argument is deprecated. To " + "control logging to the driver, use the 'log_to_driver' " + "argument to 'ray.init()'") + + if self.redirect_output is not None: + raise DeprecationWarning( + "The redirect_output argument is deprecated.") if self.include_java is None and self.java_worker_options is not None: raise ValueError("Should not specify `java-worker-options` " diff --git a/python/ray/profiling.py b/python/ray/profiling.py index c5bd4bdee..76921a139 100644 --- a/python/ray/profiling.py +++ b/python/ray/profiling.py @@ -27,7 +27,7 @@ class _NullLogSpan(object): NULL_LOG_SPAN = _NullLogSpan() -def profile(event_type, extra_data=None, worker=None): +def profile(event_type, extra_data=None): """Profile a span of time so that it appears in the timeline visualization. Note that this only works in the raylet code path. @@ -57,8 +57,7 @@ def profile(event_type, extra_data=None, worker=None): Returns: An object that can profile a span of time via a "with" statement. """ - if worker is None: - worker = ray.worker.global_worker + worker = ray.worker.global_worker return RayLogSpanRaylet(worker.profiler, event_type, extra_data=extra_data) diff --git a/python/ray/rllib/examples/carla/train_ppo.py b/python/ray/rllib/examples/carla/train_ppo.py index 6c4924014..130acf3a5 100644 --- a/python/ray/rllib/examples/carla/train_ppo.py +++ b/python/ray/rllib/examples/carla/train_ppo.py @@ -21,7 +21,7 @@ env_config.update({ }) register_carla_model() -ray.init(redirect_output=True) +ray.init() run_experiments({ "carla": { "run": "PPO", diff --git a/python/ray/scripts/scripts.py b/python/ray/scripts/scripts.py index 50a87f4e7..74ac739c1 100644 --- a/python/ray/scripts/scripts.py +++ b/python/ray/scripts/scripts.py @@ -123,15 +123,6 @@ def cli(logging_level, logging_format): "limit is exceeded, redis will start LRU eviction of entries. This only " "applies to the sharded redis tables (task, object, and profile tables). " "By default this is capped at 10GB but can be set higher.") -@click.option( - "--num-workers", - required=False, - type=int, - help=("The initial number of workers to start on this node, " - "note that the local scheduler may start additional " - "workers. If you wish to control the total number of " - "concurent tasks, then use --resources instead and " - "specify the CPU field.")) @click.option( "--num-cpus", required=False, @@ -220,8 +211,8 @@ def cli(logging_level, logging_format): def start(node_ip_address, redis_address, redis_port, num_redis_shards, redis_max_clients, redis_password, redis_shard_ports, object_manager_port, node_manager_port, object_store_memory, - redis_max_memory, num_workers, num_cpus, num_gpus, resources, head, - no_ui, block, plasma_directory, huge_pages, autoscaling_config, + redis_max_memory, num_cpus, num_gpus, resources, head, no_ui, block, + plasma_directory, huge_pages, autoscaling_config, no_redirect_worker_output, no_redirect_output, plasma_store_socket_name, raylet_socket_name, temp_dir, include_java, java_worker_options, internal_config): @@ -239,15 +230,16 @@ def start(node_ip_address, redis_address, redis_port, num_redis_shards, " --resources='{\"CustomResource1\": 3, " "\"CustomReseource2\": 2}'") + redirect_worker_output = None if not no_redirect_worker_output else True + redirect_output = None if not no_redirect_output else True ray_params = ray.parameter.RayParams( node_ip_address=node_ip_address, object_manager_port=object_manager_port, node_manager_port=node_manager_port, - num_workers=num_workers, object_store_memory=object_store_memory, redis_password=redis_password, - redirect_worker_output=not no_redirect_worker_output, - redirect_output=not no_redirect_output, + redirect_worker_output=redirect_worker_output, + redirect_output=redirect_output, num_cpus=num_cpus, num_gpus=num_gpus, resources=resources, diff --git a/python/ray/tune/examples/bayesopt_example.py b/python/ray/tune/examples/bayesopt_example.py index 94b797b39..06eacd5d0 100644 --- a/python/ray/tune/examples/bayesopt_example.py +++ b/python/ray/tune/examples/bayesopt_example.py @@ -30,7 +30,7 @@ if __name__ == '__main__': parser.add_argument( "--smoke-test", action="store_true", help="Finish quickly for testing") args, _ = parser.parse_known_args() - ray.init(redirect_output=True) + ray.init() register_trainable("exp", easy_objective) diff --git a/python/ray/tune/examples/genetic_example.py b/python/ray/tune/examples/genetic_example.py index 9d206f51a..696a94219 100644 --- a/python/ray/tune/examples/genetic_example.py +++ b/python/ray/tune/examples/genetic_example.py @@ -34,7 +34,7 @@ if __name__ == '__main__': parser.add_argument( "--smoke-test", action="store_true", help="Finish quickly for testing") args, _ = parser.parse_known_args() - ray.init(redirect_output=True) + ray.init() register_trainable("exp", michalewicz_function) diff --git a/python/ray/tune/examples/hyperopt_example.py b/python/ray/tune/examples/hyperopt_example.py index 0dce74d52..029000e0a 100644 --- a/python/ray/tune/examples/hyperopt_example.py +++ b/python/ray/tune/examples/hyperopt_example.py @@ -33,7 +33,7 @@ if __name__ == '__main__': parser.add_argument( "--smoke-test", action="store_true", help="Finish quickly for testing") args, _ = parser.parse_known_args() - ray.init(redirect_output=True) + ray.init() register_trainable("exp", easy_objective) diff --git a/python/ray/tune/examples/nevergrad_example.py b/python/ray/tune/examples/nevergrad_example.py index 1f85b254a..0e13e3430 100644 --- a/python/ray/tune/examples/nevergrad_example.py +++ b/python/ray/tune/examples/nevergrad_example.py @@ -31,7 +31,7 @@ if __name__ == '__main__': parser.add_argument( "--smoke-test", action="store_true", help="Finish quickly for testing") args, _ = parser.parse_known_args() - ray.init(redirect_output=True) + ray.init() register_trainable("exp", easy_objective) diff --git a/python/ray/tune/examples/sigopt_example.py b/python/ray/tune/examples/sigopt_example.py index e242a7ae5..8a583e48c 100644 --- a/python/ray/tune/examples/sigopt_example.py +++ b/python/ray/tune/examples/sigopt_example.py @@ -34,7 +34,7 @@ if __name__ == '__main__': parser.add_argument( "--smoke-test", action="store_true", help="Finish quickly for testing") args, _ = parser.parse_known_args() - ray.init(redirect_output=True) + ray.init() register_trainable("exp", easy_objective) diff --git a/python/ray/tune/examples/skopt_example.py b/python/ray/tune/examples/skopt_example.py index d2ce69221..87cadac6e 100644 --- a/python/ray/tune/examples/skopt_example.py +++ b/python/ray/tune/examples/skopt_example.py @@ -31,7 +31,7 @@ if __name__ == '__main__': parser.add_argument( "--smoke-test", action="store_true", help="Finish quickly for testing") args, _ = parser.parse_known_args() - ray.init(redirect_output=True) + ray.init() register_trainable("exp", easy_objective) diff --git a/python/ray/worker.py b/python/ray/worker.py index 8a7fe7b5a..01aebf44c 100644 --- a/python/ray/worker.py +++ b/python/ray/worker.py @@ -579,7 +579,7 @@ class Worker(object): Returns: The return object IDs for this task. """ - with profiling.profile("submit_task", worker=self): + with profiling.profile("submit_task"): if actor_id is None: assert actor_handle_id is None actor_id = ActorID.nil() @@ -828,7 +828,7 @@ class Worker(object): if function_name != "__ray_terminate__": self.reraise_actor_init_error() self.memory_monitor.raise_if_low_memory() - with profiling.profile("task:deserialize_arguments", worker=self): + with profiling.profile("task:deserialize_arguments"): arguments = self._get_arguments_for_execution( function_name, args) except RayTaskError as e: @@ -844,7 +844,7 @@ class Worker(object): # Execute the task. try: - with profiling.profile("task:execute", worker=self): + with profiling.profile("task:execute"): if (task.actor_id().is_nil() and task.actor_creation_id().is_nil()): outputs = function_executor(*arguments) @@ -867,7 +867,7 @@ class Worker(object): # Store the outputs in the local object store. try: - with profiling.profile("task:store_outputs", worker=self): + with profiling.profile("task:store_outputs"): # If this is an actor task, then the last object ID returned by # the task is a dummy output, not returned by the function # itself. Decrement to get the correct number of return values. @@ -952,7 +952,7 @@ class Worker(object): title = "ray_{}:{}()".format(actor.__class__.__name__, function_name) next_title = "ray_{}".format(actor.__class__.__name__) - with profiling.profile("task", extra_data=extra_data, worker=self): + with profiling.profile("task", extra_data=extra_data): with _changeproctitle(title, next_title): self._process_task(task, execution_info) # Reset the state fields so the next task can run. @@ -981,7 +981,7 @@ class Worker(object): Returns: A task from the local scheduler. """ - with profiling.profile("worker_idle", worker=self): + with profiling.profile("worker_idle"): task = self.raylet_client.get_task() # Automatically restrict the GPUs available to this task. @@ -993,7 +993,7 @@ class Worker(object): """The main loop a worker runs to receive and execute tasks.""" def exit(signum, frame): - shutdown(worker=self) + shutdown() sys.exit(0) signal.signal(signal.SIGTERM, exit) @@ -1109,8 +1109,9 @@ def print_failed_task(task_status): task_status["error_message"])) -def error_info(worker=global_worker): +def error_info(): """Return information about failed tasks.""" + worker = global_worker worker.check_connected() return (global_state.error_messages(job_id=worker.task_driver_id) + global_state.error_messages(job_id=DriverID.nil())) @@ -1251,11 +1252,9 @@ def init(redis_address=None, log_to_driver=True, node_ip_address=None, object_id_seed=None, - num_workers=None, local_mode=False, - driver_mode=None, - redirect_worker_output=True, - redirect_output=True, + redirect_worker_output=None, + redirect_output=None, ignore_reinit_error=False, num_redis_shards=None, redis_max_clients=None, @@ -1270,8 +1269,7 @@ def init(redis_address=None, plasma_store_socket_name=None, raylet_socket_name=None, temp_dir=None, - _internal_config=None, - use_raylet=None): + _internal_config=None): """Connect to an existing Ray cluster or start one and connect to it. This method handles two cases. Either a Ray cluster already exists and we @@ -1320,10 +1318,6 @@ def init(redis_address=None, manner. However, the same ID should not be used for different jobs. local_mode (bool): True if the code should be executed serially without Ray. This is useful for debugging. - redirect_worker_output: True if the stdout and stderr of worker - processes should be redirected to files. - redirect_output (bool): True if stdout and stderr for non-worker - processes should be redirected to files and false otherwise. ignore_reinit_error: True if we should suppress errors from calling ray.init() a second time. num_redis_shards: The number of Redis shards to start in addition to @@ -1364,18 +1358,6 @@ def init(redis_address=None, if configure_logging: setup_logger(logging_level, logging_format) - # Add the use_raylet option for backwards compatibility. - if use_raylet is not None: - if use_raylet: - logger.warning("WARNING: The use_raylet argument has been " - "deprecated. Please remove it.") - else: - raise DeprecationWarning("The use_raylet argument is deprecated. " - "Please remove it.") - - if driver_mode is not None: - raise Exception("The 'driver_mode' argument has been deprecated. " - "To run Ray in local mode, pass in local_mode=True.") if local_mode: driver_mode = LOCAL_MODE else: @@ -1424,7 +1406,6 @@ def init(redis_address=None, ray_params = ray.parameter.RayParams( redis_address=redis_address, node_ip_address=node_ip_address, - num_workers=num_workers, object_id_seed=object_id_seed, local_mode=local_mode, driver_mode=driver_mode, @@ -1458,9 +1439,6 @@ def init(redis_address=None, address_info["raylet_socket_name"] = _global_node.raylet_socket_name else: # In this case, we are connecting to an existing cluster. - if num_workers is not None: - raise Exception("When connecting to an existing cluster, " - "num_workers must not be provided.") if num_cpus is not None or num_gpus is not None: raise Exception("When connecting to an existing cluster, num_cpus " "and num_gpus must not be provided.") @@ -1548,13 +1526,7 @@ def init(redis_address=None, _post_init_hooks = [] -def cleanup(worker=global_worker): - raise DeprecationWarning( - "The function ray.worker.cleanup() has been deprecated. Instead, " - "please call ray.shutdown().") - - -def shutdown(worker=global_worker): +def shutdown(): """Disconnect the worker, and terminate processes started by ray.init(). This will automatically run at the end when a Python process that uses Ray @@ -1567,7 +1539,7 @@ def shutdown(worker=global_worker): need to redefine them. If they were defined in an imported module, then you will need to reload the module. """ - disconnect(worker) + disconnect() # Shut down the Ray processes. global _global_node @@ -1575,7 +1547,7 @@ def shutdown(worker=global_worker): _global_node.kill_all_processes(check_alive=False, allow_graceful=True) _global_node = None - worker.set_mode(None) + global_worker.set_mode(None) atexit.register(shutdown) @@ -2037,12 +2009,13 @@ def connect(info, worker.cached_functions_to_run = None -def disconnect(worker=global_worker): +def disconnect(): """Disconnect this worker from the scheduler and object store.""" # Reset the list of cached remote functions and actors so that if more # remote functions or actors are defined and then connect is called again, # the remote functions will be exported. This is mostly relevant for the # tests. + worker = global_worker if worker.connected: # Shutdown all of the threads that we've started. TODO(rkn): This # should be handled cleanly in the worker object's destructor and not @@ -2129,8 +2102,7 @@ def register_custom_serializer(cls, deserializer=None, local=False, driver_id=None, - class_id=None, - worker=global_worker): + class_id=None): """Enable serialization and deserialization for a particular class. This method runs the register_class function defined below on every worker, @@ -2159,6 +2131,7 @@ def register_custom_serializer(cls, be efficiently serialized by Ray. This can also raise an exception if use_dict is true and cls is not pickleable. """ + worker = global_worker assert (serializer is None) == (deserializer is None), ( "The serializer/deserializer arguments must both be provided or " "both not be provided.") @@ -2225,7 +2198,7 @@ def register_custom_serializer(cls, register_class_for_serialization({"worker": worker}) -def get(object_ids, worker=global_worker): +def get(object_ids): """Get a remote object or a list of remote objects from the object store. This method blocks until the object corresponding to the object ID is @@ -2245,8 +2218,9 @@ def get(object_ids, worker=global_worker): Exception: An exception is raised if the task that created the object or that created one of the objects raised an exception. """ + worker = global_worker worker.check_connected() - with profiling.profile("ray.get", worker=worker): + with profiling.profile("ray.get"): if worker.mode == LOCAL_MODE: # In LOCAL_MODE, ray.get is the identity operation (the input will # actually be a value not an objectid). @@ -2270,7 +2244,7 @@ def get(object_ids, worker=global_worker): return value -def put(value, worker=global_worker): +def put(value): """Store an object in the object store. Args: @@ -2279,8 +2253,9 @@ def put(value, worker=global_worker): Returns: The object ID assigned to this value. """ + worker = global_worker worker.check_connected() - with profiling.profile("ray.put", worker=worker): + with profiling.profile("ray.put"): if worker.mode == LOCAL_MODE: # In LOCAL_MODE, ray.put is the identity operation. return value @@ -2293,7 +2268,7 @@ def put(value, worker=global_worker): return object_id -def wait(object_ids, num_returns=1, timeout=None, worker=global_worker): +def wait(object_ids, num_returns=1, timeout=None): """Return a list of IDs that are ready and a list of IDs that are not. .. warning:: @@ -2327,6 +2302,7 @@ def wait(object_ids, num_returns=1, timeout=None, worker=global_worker): A list of object IDs that are ready and a list of the remaining object IDs. """ + worker = global_worker if isinstance(object_ids, ObjectID): raise TypeError( @@ -2356,7 +2332,7 @@ def wait(object_ids, num_returns=1, timeout=None, worker=global_worker): worker.check_connected() # TODO(swang): Check main thread. - with profiling.profile("ray.wait", worker=worker): + with profiling.profile("ray.wait"): # When Ray is run in LOCAL_MODE, all functions are run immediately, # so all objects in object_id are ready. if worker.mode == LOCAL_MODE: diff --git a/test/runtest.py b/test/runtest.py index d34b2be68..b2ff6c730 100644 --- a/test/runtest.py +++ b/test/runtest.py @@ -2502,7 +2502,7 @@ def test_not_logging_to_driver(shutdown_only): reason="New GCS API doesn't have a Python API yet.") def test_workers(shutdown_only): num_workers = 3 - ray.init(redirect_worker_output=True, num_cpus=num_workers) + ray.init(num_cpus=num_workers) @ray.remote def f(): diff --git a/test/stress_tests.py b/test/stress_tests.py index 117002a2f..0cf712a1a 100644 --- a/test/stress_tests.py +++ b/test/stress_tests.py @@ -204,7 +204,6 @@ def ray_start_reconstruction(request): "num_cpus": 1, "object_store_memory": plasma_store_memory // num_nodes, "redis_max_memory": 10**7, - "redirect_output": True, "_internal_config": json.dumps({ "initial_reconstruction_timeout_milliseconds": 200 }) @@ -213,7 +212,6 @@ def ray_start_reconstruction(request): cluster.add_node( num_cpus=1, object_store_memory=plasma_store_memory // num_nodes, - redirect_output=True, _internal_config=json.dumps({ "initial_reconstruction_timeout_milliseconds": 200 })) diff --git a/test/tempfile_test.py b/test/tempfile_test.py index 7bd167006..3016e8aa5 100644 --- a/test/tempfile_test.py +++ b/test/tempfile_test.py @@ -64,7 +64,7 @@ def test_temp_plasma_store_socket(): def test_raylet_tempfiles(): - ray.init(redirect_worker_output=False) + ray.init(num_cpus=0) node = ray.worker._global_node top_levels = set(os.listdir(node.get_temp_dir_path())) assert top_levels == {"ray_ui.ipynb", "sockets", "logs"} @@ -80,23 +80,7 @@ def test_raylet_tempfiles(): assert socket_files == {"plasma_store", "raylet"} ray.shutdown() - ray.init(redirect_worker_output=True, num_cpus=0) - node = ray.worker._global_node - top_levels = set(os.listdir(node.get_temp_dir_path())) - assert top_levels == {"ray_ui.ipynb", "sockets", "logs"} - log_files = set(os.listdir(node.get_logs_dir_path())) - assert log_files == { - "log_monitor.out", "log_monitor.err", "plasma_store.out", - "plasma_store.err", "webui.out", "webui.err", "monitor.out", - "monitor.err", "raylet_monitor.out", "raylet_monitor.err", - "redis-shard_0.out", "redis-shard_0.err", "redis.out", "redis.err", - "raylet.out", "raylet.err" - } # with raylet logs - socket_files = set(os.listdir(node.get_sockets_dir_path())) - assert socket_files == {"plasma_store", "raylet"} - ray.shutdown() - - ray.init(redirect_worker_output=True, num_cpus=2) + ray.init(num_cpus=2) node = ray.worker._global_node top_levels = set(os.listdir(node.get_temp_dir_path())) assert top_levels == {"ray_ui.ipynb", "sockets", "logs"}