From 7cd00741b1da10570a5316365b8f0e3dbeaf9342 Mon Sep 17 00:00:00 2001 From: Robert Nishihara Date: Fri, 7 Apr 2017 23:19:24 -0700 Subject: [PATCH] Suppress irrelevant Redis connection errors. (#434) * Suppress error messages in worker import thread when Redis terminates. * Suppress some warnings from one of the tests. --- python/ray/worker.py | 59 ++++++++++++++++++++++++-------------------- test/actor_test.py | 2 +- 2 files changed, 33 insertions(+), 28 deletions(-) diff --git a/python/ray/worker.py b/python/ray/worker.py index be20030bd..a56507818 100644 --- a/python/ray/worker.py +++ b/python/ray/worker.py @@ -1295,33 +1295,38 @@ def import_thread(worker): raise Exception("This code should be unreachable.") num_imported += 1 - for msg in worker.import_pubsub_client.listen(): - with worker.lock: - if msg["type"] == "psubscribe": - continue - assert msg["data"] == b"rpush" - num_imports = worker.redis_client.llen("Exports") - assert num_imports >= num_imported - for i in range(num_imported, num_imports): - key = worker.redis_client.lindex("Exports", i) - if key.startswith(b"RemoteFunction"): - with log_span("ray:import_remote_function", worker=worker): - fetch_and_register_remote_function(key, worker=worker) - elif key.startswith(b"EnvironmentVariables"): - with log_span("ray:import_environment_variable", worker=worker): - fetch_and_register_environment_variable(key, worker=worker) - elif key.startswith(b"FunctionsToRun"): - with log_span("ray:import_function_to_run", worker=worker): - fetch_and_execute_function_to_run(key, worker=worker) - elif key.startswith(b"Actor"): - # Only get the actor if the actor ID matches the actor ID of this - # worker. - actor_id, = worker.redis_client.hmget(key, "actor_id") - if worker.actor_id == actor_id: - worker.fetch_and_register["Actor"](key, worker) - else: - raise Exception("This code should be unreachable.") - num_imported += 1 + try: + for msg in worker.import_pubsub_client.listen(): + with worker.lock: + if msg["type"] == "psubscribe": + continue + assert msg["data"] == b"rpush" + num_imports = worker.redis_client.llen("Exports") + assert num_imports >= num_imported + for i in range(num_imported, num_imports): + key = worker.redis_client.lindex("Exports", i) + if key.startswith(b"RemoteFunction"): + with log_span("ray:import_remote_function", worker=worker): + fetch_and_register_remote_function(key, worker=worker) + elif key.startswith(b"EnvironmentVariables"): + with log_span("ray:import_environment_variable", worker=worker): + fetch_and_register_environment_variable(key, worker=worker) + elif key.startswith(b"FunctionsToRun"): + with log_span("ray:import_function_to_run", worker=worker): + fetch_and_execute_function_to_run(key, worker=worker) + elif key.startswith(b"Actor"): + # Only get the actor if the actor ID matches the actor ID of this + # worker. + actor_id, = worker.redis_client.hmget(key, "actor_id") + if worker.actor_id == actor_id: + worker.fetch_and_register["Actor"](key, worker) + else: + raise Exception("This code should be unreachable.") + num_imported += 1 + except redis.ConnectionError: + # When Redis terminates the listen call will throw a ConnectionError, which + # we catch here. + pass def connect(info, object_id_seed=None, mode=WORKER_MODE, worker=global_worker, diff --git a/test/actor_test.py b/test/actor_test.py index 066b07db4..9f3f95302 100644 --- a/test/actor_test.py +++ b/test/actor_test.py @@ -699,7 +699,7 @@ class ActorsWithGPUs(unittest.TestCase): num_gpus_per_scheduler = 10 ray.worker._init( start_ray_local=True, num_workers=0, - num_local_schedulers=num_local_schedulers, + num_local_schedulers=num_local_schedulers, redirect_output=True, num_gpus=(num_local_schedulers * [num_gpus_per_scheduler])) @ray.remote