diff --git a/doc/source/package-ref.rst b/doc/source/package-ref.rst index 2b67a5f64..2fa9e55f6 100644 --- a/doc/source/package-ref.rst +++ b/doc/source/package-ref.rst @@ -158,13 +158,6 @@ ray.available_resources .. autofunction:: ray.available_resources -.. _ray-errors-ref: - -ray.errors -~~~~~~~~~~ - -.. autofunction:: ray.errors - Experimental APIs ----------------- diff --git a/python/ray/__init__.py b/python/ray/__init__.py index 5e111ef94..1118313ae 100644 --- a/python/ray/__init__.py +++ b/python/ray/__init__.py @@ -77,7 +77,7 @@ _config = _Config() from ray.profiling import profile # noqa: E402 from ray.state import (jobs, nodes, actors, objects, timeline, object_transfer_timeline, cluster_resources, - available_resources, errors) # noqa: E402 + available_resources) # noqa: E402 from ray.worker import ( LOCAL_MODE, SCRIPT_MODE, @@ -122,7 +122,6 @@ __all__ = [ "object_transfer_timeline", "cluster_resources", "available_resources", - "errors", "LOCAL_MODE", "PYTHON_MODE", "SCRIPT_MODE", diff --git a/python/ray/dashboard/node_stats.py b/python/ray/dashboard/node_stats.py index f8c401292..76f99c67c 100644 --- a/python/ray/dashboard/node_stats.py +++ b/python/ray/dashboard/node_stats.py @@ -175,8 +175,8 @@ class NodeStats(threading.Thread): p.subscribe(log_channel) logger.info("NodeStats: subscribed to {}".format(log_channel)) - error_channel = ray.gcs_utils.TablePubsub.Value("ERROR_INFO_PUBSUB") - p.subscribe(error_channel) + error_channel = ray.gcs_utils.RAY_ERROR_PUBSUB_PATTERN + p.psubscribe(error_channel) logger.info("NodeStats: subscribed to {}".format(error_channel)) actor_channel = ray.gcs_utils.RAY_ACTOR_PUBSUB_PATTERN @@ -211,9 +211,10 @@ class NodeStats(threading.Thread): pid = str(data["pid"]) self._logs[ip][pid].extend(data["lines"]) elif channel == str(error_channel): - gcs_entry = ray.gcs_utils.GcsEntry.FromString(data) + pubsub_msg = ray.gcs_utils.PubSubMessage.FromString( + data) error_data = ray.gcs_utils.ErrorTableData.FromString( - gcs_entry.entries[0]) + pubsub_msg.data) message = error_data.error_message message = re.sub(r"\x1b\[\d+m", "", message) match = re.search(r"\(pid=(\d+), ip=(.*?)\)", message) diff --git a/python/ray/gcs_utils.py b/python/ray/gcs_utils.py index 2429b9c39..8e9fa979d 100644 --- a/python/ray/gcs_utils.py +++ b/python/ray/gcs_utils.py @@ -60,13 +60,14 @@ RAY_ACTOR_PUBSUB_PATTERN = "ACTOR:*".encode("ascii") # Reporter pub/sub updates RAY_REPORTER_PUBSUB_PATTERN = "RAY_REPORTER.*".encode("ascii") +RAY_ERROR_PUBSUB_PATTERN = "ERROR_INFO:*".encode("ascii") + # These prefixes must be kept up-to-date with the TablePrefix enum in # gcs.proto. # TODO(rkn): We should use scoped enums, in which case we should be able to # just access the flatbuffer generated values. TablePrefix_RAYLET_TASK_string = "RAYLET_TASK" TablePrefix_OBJECT_string = "OBJECT" -TablePrefix_ERROR_INFO_string = "ERROR_INFO" TablePrefix_PROFILE_string = "PROFILE" TablePrefix_JOB_string = "JOB" TablePrefix_ACTOR_string = "ACTOR" diff --git a/python/ray/state.py b/python/ray/state.py index d99e9b0fe..f9a1ddaef 100644 --- a/python/ray/state.py +++ b/python/ray/state.py @@ -760,67 +760,6 @@ class GlobalState: return dict(total_available_resources) - def _error_messages(self, job_id): - """Get the error messages for a specific driver. - - Args: - job_id: The ID of the job to get the errors for. - - Returns: - A list of the error messages for this driver. - """ - assert isinstance(job_id, ray.JobID) - message = self.redis_client.execute_command( - "RAY.TABLE_LOOKUP", gcs_utils.TablePrefix.Value("ERROR_INFO"), "", - job_id.binary()) - - # If there are no errors, return early. - if message is None: - return [] - - gcs_entries = gcs_utils.GcsEntry.FromString(message) - error_messages = [] - for entry in gcs_entries.entries: - error_data = gcs_utils.ErrorTableData.FromString(entry) - assert job_id.binary() == error_data.job_id - error_message = { - "type": error_data.type, - "message": error_data.error_message, - "timestamp": error_data.timestamp, - } - error_messages.append(error_message) - return error_messages - - def error_messages(self, job_id=None): - """Get the error messages for all drivers or a specific driver. - - Args: - job_id: The specific job to get the errors for. If this is - None, then this method retrieves the errors for all jobs. - - Returns: - A list of the error messages for the specified driver if one was - given, or a dictionary mapping from job ID to a list of error - messages for that driver otherwise. - """ - self._check_connected() - - if job_id is not None: - assert isinstance(job_id, ray.JobID) - return self._error_messages(job_id) - - error_table_keys = self.redis_client.keys( - gcs_utils.TablePrefix_ERROR_INFO_string + "*") - job_ids = [ - key[len(gcs_utils.TablePrefix_ERROR_INFO_string):] - for key in error_table_keys - ] - - return { - binary_to_hex(job_id): self._error_messages(ray.JobID(job_id)) - for job_id in job_ids - } - def actor_checkpoint_info(self, actor_id): """Get checkpoint info for the given actor id. Args: @@ -1001,24 +940,3 @@ def available_resources(): resource in the cluster. """ return state.available_resources() - - -def errors(all_jobs=False): - """Get error messages from the cluster. - - Args: - all_jobs: False if we should only include error messages for this - specific job, or True if we should include error messages for all - jobs. - - Returns: - Error messages pushed from the cluster. This will be a single list if - all_jobs is False, or a dictionary mapping from job ID to a list of - error messages for that job if all_jobs is True. - """ - if not all_jobs: - worker = ray.worker.global_worker - error_messages = state.error_messages(job_id=worker.current_job_id) - else: - error_messages = state.error_messages(job_id=None) - return error_messages diff --git a/python/ray/test_utils.py b/python/ray/test_utils.py index 35114d8ee..8c74f1314 100644 --- a/python/ray/test_utils.py +++ b/python/ray/test_utils.py @@ -208,17 +208,6 @@ def run_string_as_driver_nonblocking(driver_script): return proc -def flat_errors(): - errors = [] - for job_errors in ray.errors(all_jobs=True).values(): - errors.extend(job_errors) - return errors - - -def relevant_errors(error_type): - return [error for error in flat_errors() if error["type"] == error_type] - - def wait_for_num_actors(num_actors, timeout=10): start_time = time.time() while time.time() - start_time < timeout: @@ -228,16 +217,6 @@ def wait_for_num_actors(num_actors, timeout=10): raise RayTestTimeoutException("Timed out while waiting for global state.") -def wait_for_errors(error_type, num_errors, timeout=20): - start_time = time.time() - while time.time() - start_time < timeout: - if len(relevant_errors(error_type)) >= num_errors: - return - time.sleep(0.1) - raise RayTestTimeoutException("Timed out waiting for {} {} errors.".format( - num_errors, error_type)) - - def wait_for_condition(condition_predictor, timeout=30, retry_interval_ms=100): """Wait until a condition is met or time out with an exception. @@ -404,3 +383,31 @@ def get_other_nodes(cluster, exclude_head=False): def get_non_head_nodes(cluster): """Get all non-head nodes.""" return list(filter(lambda x: x.head is False, cluster.list_all_nodes())) + + +def init_error_pubsub(): + """Initialize redis error info pub/sub""" + p = ray.worker.global_worker.redis_client.pubsub( + ignore_subscribe_messages=True) + error_pubsub_channel = ray.gcs_utils.RAY_ERROR_PUBSUB_PATTERN + p.psubscribe(error_pubsub_channel) + return p + + +def get_error_message(pub_sub, num, error_type=None, timeout=10): + """Get errors through pub/sub.""" + start_time = time.time() + msgs = [] + while time.time() - start_time < timeout and len(msgs) < num: + msg = pub_sub.get_message() + if msg is None: + time.sleep(0.01) + continue + pubsub_msg = ray.gcs_utils.PubSubMessage.FromString(msg["data"]) + error_data = ray.gcs_utils.ErrorTableData.FromString(pubsub_msg.data) + if error_type is None or error_type == error_data.type: + msgs.append(error_data) + else: + time.sleep(0.01) + + return msgs diff --git a/python/ray/tests/conftest.py b/python/ray/tests/conftest.py index ffbfad1b9..a67b88c82 100644 --- a/python/ray/tests/conftest.py +++ b/python/ray/tests/conftest.py @@ -9,6 +9,7 @@ import subprocess import ray from ray.cluster_utils import Cluster +from ray.test_utils import init_error_pubsub @pytest.fixture @@ -209,3 +210,10 @@ def two_node_cluster(): # The code after the yield will run as teardown code. ray.shutdown() cluster.shutdown() + + +@pytest.fixture() +def error_pubsub(): + p = init_error_pubsub() + yield p + p.close() diff --git a/python/ray/tests/test_actor_failures.py b/python/ray/tests/test_actor_failures.py index ce0129145..22765dd95 100644 --- a/python/ray/tests/test_actor_failures.py +++ b/python/ray/tests/test_actor_failures.py @@ -12,13 +12,12 @@ import ray.ray_constants as ray_constants import ray.test_utils import ray.cluster_utils from ray.test_utils import ( - relevant_errors, wait_for_condition, - wait_for_errors, wait_for_pid_to_exit, generate_internal_config_map, get_other_nodes, SignalActor, + get_error_message, ) SIGKILL = signal.SIGKILL if sys.platform != "win32" else signal.SIGTERM @@ -627,10 +626,12 @@ def test_checkpointing_on_node_failure(ray_start_cluster_2_nodes, @pytest.mark.skip(reason="TODO: Actor checkpointing") -def test_checkpointing_save_exception(ray_start_regular, +def test_checkpointing_save_exception(ray_start_regular, error_pubsub, ray_checkpointable_actor_cls): """Test actor can still be recovered if checkpoints fail to complete.""" + p = error_pubsub + @ray.remote(max_restarts=2) class RemoteCheckpointableActor(ray_checkpointable_actor_cls): def save_checkpoint(self, actor_id, checkpoint_context): @@ -663,14 +664,18 @@ def test_checkpointing_save_exception(ray_start_regular, assert ray.get(actor.was_resumed_from_checkpoint.remote()) is False # Check that the checkpoint error was pushed to the driver. - wait_for_errors(ray_constants.CHECKPOINT_PUSH_ERROR, 1) + errors = get_error_message(p, 1, ray_constants.CHECKPOINT_PUSH_ERROR) + assert len(errors) == 1 + assert errors[0].type == ray_constants.CHECKPOINT_PUSH_ERROR @pytest.mark.skip(reason="TODO: Actor checkpointing") -def test_checkpointing_load_exception(ray_start_regular, +def test_checkpointing_load_exception(ray_start_regular, error_pubsub, ray_checkpointable_actor_cls): """Test actor can still be recovered if checkpoints fail to load.""" + p = error_pubsub + @ray.remote(max_restarts=2) class RemoteCheckpointableActor(ray_checkpointable_actor_cls): def load_checkpoint(self, actor_id, checkpoints): @@ -704,7 +709,9 @@ def test_checkpointing_load_exception(ray_start_regular, assert ray.get(actor.was_resumed_from_checkpoint.remote()) is False # Check that the checkpoint error was pushed to the driver. - wait_for_errors(ray_constants.CHECKPOINT_PUSH_ERROR, 1) + errors = get_error_message(p, 1, ray_constants.CHECKPOINT_PUSH_ERROR) + assert len(errors) == 1 + assert errors[0].type == ray_constants.CHECKPOINT_PUSH_ERROR @pytest.mark.parametrize( @@ -759,14 +766,16 @@ def test_bad_checkpointable_actor_class(): return True -def test_init_exception_in_checkpointable_actor(ray_start_regular, - ray_checkpointable_actor_cls): +def test_init_exception_in_checkpointable_actor( + ray_start_regular, error_pubsub, ray_checkpointable_actor_cls): # This test is similar to test_failure.py::test_failed_actor_init. # This test is used to guarantee that checkpointable actor does not # break the same logic. error_message1 = "actor constructor failed" error_message2 = "actor method failed" + p = error_pubsub + @ray.remote class CheckpointableFailedActor(ray_checkpointable_actor_cls): def __init__(self): @@ -781,17 +790,15 @@ def test_init_exception_in_checkpointable_actor(ray_start_regular, a = CheckpointableFailedActor.remote() # Make sure that we get errors from a failed constructor. - wait_for_errors(ray_constants.TASK_PUSH_ERROR, 1) - errors = relevant_errors(ray_constants.TASK_PUSH_ERROR) + errors = get_error_message(p, 1, ray_constants.TASK_PUSH_ERROR) assert len(errors) == 1 - assert error_message1 in errors[0]["message"] + assert error_message1 in errors[0].error_message # Make sure that we get errors from a failed method. a.fail_method.remote() - wait_for_errors(ray_constants.TASK_PUSH_ERROR, 2) - errors = relevant_errors(ray_constants.TASK_PUSH_ERROR) - assert len(errors) == 2 - assert error_message1 in errors[1]["message"] + errors = get_error_message(p, 1, ray_constants.TASK_PUSH_ERROR) + assert len(errors) == 1 + assert error_message1 in errors[0].error_message def test_decorated_method(ray_start_regular): diff --git a/python/ray/tests/test_advanced_3.py b/python/ray/tests/test_advanced_3.py index 276ebd552..f769f993c 100644 --- a/python/ray/tests/test_advanced_3.py +++ b/python/ray/tests/test_advanced_3.py @@ -349,31 +349,6 @@ def test_ray_setproctitle(ray_start_2_cpus): ray.get(unique_1.remote()) -def test_duplicate_error_messages(shutdown_only): - ray.init(num_cpus=0) - - driver_id = ray.WorkerID.nil() - error_data = ray.gcs_utils.construct_error_message(driver_id, "test", - "message", 0) - - # Push the same message to the GCS twice (they are the same because we - # do not include a timestamp). - - r = ray.worker.global_worker.redis_client - - r.execute_command("RAY.TABLE_APPEND", - ray.gcs_utils.TablePrefix.Value("ERROR_INFO"), - ray.gcs_utils.TablePubsub.Value("ERROR_INFO_PUBSUB"), - driver_id.binary(), error_data) - - # Before https://github.com/ray-project/ray/pull/3316 this would - # give an error - r.execute_command("RAY.TABLE_APPEND", - ray.gcs_utils.TablePrefix.Value("ERROR_INFO"), - ray.gcs_utils.TablePubsub.Value("ERROR_INFO_PUBSUB"), - driver_id.binary(), error_data) - - @pytest.mark.skipif( os.getenv("TRAVIS") is None, reason="This test should only be run on Travis.") diff --git a/python/ray/tests/test_failure.py b/python/ray/tests/test_failure.py index 16ffbe02e..6772a5937 100644 --- a/python/ray/tests/test_failure.py +++ b/python/ray/tests/test_failure.py @@ -14,15 +14,14 @@ import ray import ray.ray_constants as ray_constants from ray.cluster_utils import Cluster from ray.test_utils import ( - relevant_errors, wait_for_condition, - wait_for_errors, - RayTestTimeoutException, SignalActor, + init_error_pubsub, + get_error_message, ) -def test_failed_task(ray_start_regular): +def test_failed_task(ray_start_regular, error_pubsub): @ray.remote def throw_exception_fct1(): raise Exception("Test function 1 intentionally failed.") @@ -35,13 +34,15 @@ def test_failed_task(ray_start_regular): def throw_exception_fct3(x): raise Exception("Test function 3 intentionally failed.") + p = error_pubsub + throw_exception_fct1.remote() throw_exception_fct1.remote() - wait_for_errors(ray_constants.TASK_PUSH_ERROR, 2) - assert len(relevant_errors(ray_constants.TASK_PUSH_ERROR)) == 2 - for task in relevant_errors(ray_constants.TASK_PUSH_ERROR): - msg = task.get("message") - assert "Test function 1 intentionally failed." in msg + + msgs = get_error_message(p, 2, ray_constants.TASK_PUSH_ERROR) + assert len(msgs) == 2 + for msg in msgs: + assert "Test function 1 intentionally failed." in msg.error_message x = throw_exception_fct2.remote() try: @@ -120,7 +121,8 @@ def test_get_throws_quickly_when_found_exception(ray_start_regular): ray.get(signal2.send.remote()) -def test_fail_importing_remote_function(ray_start_2_cpus): +def test_fail_importing_remote_function(ray_start_2_cpus, error_pubsub): + p = error_pubsub # Create the contents of a temporary Python file. temporary_python_file = """ def temporary_helper_function(): @@ -150,11 +152,11 @@ def temporary_helper_function(): # Invoke the function so that the definition is exported. g.remote(1, y=2) - wait_for_errors(ray_constants.REGISTER_REMOTE_FUNCTION_PUSH_ERROR, 2) - errors = relevant_errors(ray_constants.REGISTER_REMOTE_FUNCTION_PUSH_ERROR) - assert len(errors) >= 2, errors - assert "No module named" in errors[0]["message"] - assert "No module named" in errors[1]["message"] + errors = get_error_message( + p, 2, ray_constants.REGISTER_REMOTE_FUNCTION_PUSH_ERROR) + assert errors[0].type == ray_constants.REGISTER_REMOTE_FUNCTION_PUSH_ERROR + assert "No module named" in errors[0].error_message + assert "No module named" in errors[1].error_message # Check that if we try to call the function it throws an exception and # does not hang. @@ -164,26 +166,28 @@ def temporary_helper_function(): ray.get(g.remote(1, y=2)) f.close() - # Clean up the junk we added to sys.path. sys.path.pop(-1) -def test_failed_function_to_run(ray_start_2_cpus): +def test_failed_function_to_run(ray_start_2_cpus, error_pubsub): + p = error_pubsub + def f(worker): if ray.worker.global_worker.mode == ray.WORKER_MODE: raise Exception("Function to run failed.") ray.worker.global_worker.run_function_on_all_workers(f) - wait_for_errors(ray_constants.FUNCTION_TO_RUN_PUSH_ERROR, 2) # Check that the error message is in the task info. - errors = relevant_errors(ray_constants.FUNCTION_TO_RUN_PUSH_ERROR) + errors = get_error_message(p, 2, ray_constants.FUNCTION_TO_RUN_PUSH_ERROR) assert len(errors) == 2 - assert "Function to run failed." in errors[0]["message"] - assert "Function to run failed." in errors[1]["message"] + assert errors[0].type == ray_constants.FUNCTION_TO_RUN_PUSH_ERROR + assert "Function to run failed." in errors[0].error_message + assert "Function to run failed." in errors[1].error_message -def test_fail_importing_actor(ray_start_regular): +def test_fail_importing_actor(ray_start_regular, error_pubsub): + p = error_pubsub # Create the contents of a temporary Python file. temporary_python_file = """ def temporary_helper_function(): @@ -210,21 +214,22 @@ def temporary_helper_function(): return 1 # There should be no errors yet. - assert len(ray.errors()) == 0 - + errors = get_error_message(p, 2) + assert len(errors) == 0 # Create an actor. foo = Foo.remote(3, arg2=0) - # Wait for the error to arrive. - wait_for_errors(ray_constants.REGISTER_ACTOR_PUSH_ERROR, 1) - errors = relevant_errors(ray_constants.REGISTER_ACTOR_PUSH_ERROR) - assert "No module named" in errors[0]["message"] + errors = get_error_message(p, 2) + assert len(errors) == 2 - # Wait for the error from when the __init__ tries to run. - wait_for_errors(ray_constants.TASK_PUSH_ERROR, 1) - errors = relevant_errors(ray_constants.TASK_PUSH_ERROR) - assert ("failed to be imported, and so cannot execute this method" in - errors[0]["message"]) + for error in errors: + # Wait for the error to arrive. + if error.type == ray_constants.REGISTER_ACTOR_PUSH_ERROR: + assert "No module named" in error.error_message + else: + # Wait for the error from when the __init__ tries to run. + assert ("failed to be imported, and so cannot execute this method" + in error.error_message) # Check that if we try to get the function it throws an exception and # does not hang. @@ -232,10 +237,11 @@ def temporary_helper_function(): ray.get(foo.get_val.remote(1, arg2=2)) # Wait for the error from when the call to get_val. - wait_for_errors(ray_constants.TASK_PUSH_ERROR, 2) - errors = relevant_errors(ray_constants.TASK_PUSH_ERROR) + errors = get_error_message(p, 1, ray_constants.TASK_PUSH_ERROR) + assert len(errors) == 1 + assert errors[0].type == ray_constants.TASK_PUSH_ERROR assert ("failed to be imported, and so cannot execute this method" in - errors[1]["message"]) + errors[0].error_message) f.close() @@ -243,7 +249,8 @@ def temporary_helper_function(): sys.path.pop(-1) -def test_failed_actor_init(ray_start_regular): +def test_failed_actor_init(ray_start_regular, error_pubsub): + p = error_pubsub error_message1 = "actor constructor failed" error_message2 = "actor method failed" @@ -258,20 +265,21 @@ def test_failed_actor_init(ray_start_regular): a = FailedActor.remote() # Make sure that we get errors from a failed constructor. - wait_for_errors(ray_constants.TASK_PUSH_ERROR, 1) - errors = relevant_errors(ray_constants.TASK_PUSH_ERROR) + errors = get_error_message(p, 1, ray_constants.TASK_PUSH_ERROR) assert len(errors) == 1 - assert error_message1 in errors[0]["message"] + assert errors[0].type == ray_constants.TASK_PUSH_ERROR + assert error_message1 in errors[0].error_message # Make sure that we get errors from a failed method. a.fail_method.remote() - wait_for_errors(ray_constants.TASK_PUSH_ERROR, 2) - errors = relevant_errors(ray_constants.TASK_PUSH_ERROR) - assert len(errors) == 2 - assert error_message1 in errors[1]["message"] + errors = get_error_message(p, 1, ray_constants.TASK_PUSH_ERROR) + assert len(errors) == 1 + assert errors[0].type == ray_constants.TASK_PUSH_ERROR + assert error_message1 in errors[0].error_message -def test_failed_actor_method(ray_start_regular): +def test_failed_actor_method(ray_start_regular, error_pubsub): + p = error_pubsub error_message2 = "actor method failed" @ray.remote @@ -286,10 +294,10 @@ def test_failed_actor_method(ray_start_regular): # Make sure that we get errors from a failed method. a.fail_method.remote() - wait_for_errors(ray_constants.TASK_PUSH_ERROR, 1) - errors = relevant_errors(ray_constants.TASK_PUSH_ERROR) + errors = get_error_message(p, 1, ray_constants.TASK_PUSH_ERROR) assert len(errors) == 1 - assert error_message2 in errors[0]["message"] + assert errors[0].type == ray_constants.TASK_PUSH_ERROR + assert error_message2 in errors[0].error_message def test_incorrect_method_calls(ray_start_regular): @@ -328,7 +336,9 @@ def test_incorrect_method_calls(ray_start_regular): a.nonexistent_method.remote() -def test_worker_raising_exception(ray_start_regular): +def test_worker_raising_exception(ray_start_regular, error_pubsub): + p = error_pubsub + @ray.remote(max_calls=2) def f(): # This is the only reasonable variable we can set here that makes the @@ -339,12 +349,15 @@ def test_worker_raising_exception(ray_start_regular): # Running this task should cause the worker to raise an exception after # the task has successfully completed. f.remote() - - wait_for_errors(ray_constants.WORKER_CRASH_PUSH_ERROR, 1) + errors = get_error_message(p, 1, ray_constants.WORKER_CRASH_PUSH_ERROR) + assert len(errors) == 1 + assert errors[0].type == ray_constants.WORKER_CRASH_PUSH_ERROR -def test_worker_dying(ray_start_regular): +def test_worker_dying(ray_start_regular, error_pubsub): + p = error_pubsub # Define a remote function that will kill the worker that runs it. + @ray.remote(max_retries=0) def f(): eval("exit()") @@ -352,14 +365,15 @@ def test_worker_dying(ray_start_regular): with pytest.raises(ray.exceptions.RayWorkerError): ray.get(f.remote()) - wait_for_errors(ray_constants.WORKER_DIED_PUSH_ERROR, 1) - - errors = relevant_errors(ray_constants.WORKER_DIED_PUSH_ERROR) + errors = get_error_message(p, 1, ray_constants.WORKER_DIED_PUSH_ERROR) assert len(errors) == 1 - assert "died or was killed while executing" in errors[0]["message"] + assert errors[0].type == ray_constants.WORKER_DIED_PUSH_ERROR + assert "died or was killed while executing" in errors[0].error_message -def test_actor_worker_dying(ray_start_regular): +def test_actor_worker_dying(ray_start_regular, error_pubsub): + p = error_pubsub + @ray.remote class Actor: def kill(self): @@ -375,10 +389,14 @@ def test_actor_worker_dying(ray_start_regular): ray.get(obj) with pytest.raises(ray.exceptions.RayTaskError): ray.get(consume.remote(obj)) - wait_for_errors(ray_constants.WORKER_DIED_PUSH_ERROR, 1) + errors = get_error_message(p, 1, ray_constants.WORKER_DIED_PUSH_ERROR) + assert len(errors) == 1 + assert errors[0].type == ray_constants.WORKER_DIED_PUSH_ERROR -def test_actor_worker_dying_future_tasks(ray_start_regular): +def test_actor_worker_dying_future_tasks(ray_start_regular, error_pubsub): + p = error_pubsub + @ray.remote(max_restarts=0) class Actor: def getpid(self): @@ -397,7 +415,9 @@ def test_actor_worker_dying_future_tasks(ray_start_regular): with pytest.raises(Exception): ray.get(obj) - wait_for_errors(ray_constants.WORKER_DIED_PUSH_ERROR, 1) + errors = get_error_message(p, 1, ray_constants.WORKER_DIED_PUSH_ERROR) + assert len(errors) == 1 + assert errors[0].type == ray_constants.WORKER_DIED_PUSH_ERROR def test_actor_worker_dying_nothing_in_progress(ray_start_regular): @@ -415,7 +435,10 @@ def test_actor_worker_dying_nothing_in_progress(ray_start_regular): ray.get(task2) -def test_actor_scope_or_intentionally_killed_message(ray_start_regular): +def test_actor_scope_or_intentionally_killed_message(ray_start_regular, + error_pubsub): + p = error_pubsub + @ray.remote class Actor: pass @@ -424,15 +447,16 @@ def test_actor_scope_or_intentionally_killed_message(ray_start_regular): a = Actor.remote() a.__ray_terminate__.remote() time.sleep(1) - assert len( - ray.errors()) == 0, ("Should not have propogated an error - {}".format( - ray.errors())) + errors = get_error_message(p, 1) + assert len(errors) == 0, "Should not have propogated an error - {}".format( + errors) @pytest.mark.skip("This test does not work yet.") @pytest.mark.parametrize( "ray_start_object_store_memory", [10**6], indirect=True) -def test_put_error1(ray_start_object_store_memory): +def test_put_error1(ray_start_object_store_memory, error_pubsub): + p = error_pubsub num_objects = 3 object_size = 4 * 10**5 @@ -470,7 +494,10 @@ def test_put_error1(ray_start_object_store_memory): put_arg_task.remote() # Make sure we receive the correct error message. - wait_for_errors(ray_constants.PUT_RECONSTRUCTION_PUSH_ERROR, 1) + errors = get_error_message(p, 1, + ray_constants.PUT_RECONSTRUCTION_PUSH_ERROR) + assert len(errors) == 1 + assert errors[0].type == ray_constants.PUT_RECONSTRUCTION_PUSH_ERROR @pytest.mark.skip("This test does not work yet.") @@ -514,22 +541,28 @@ def test_put_error2(ray_start_object_store_memory): put_task.remote() # Make sure we receive the correct error message. - wait_for_errors(ray_constants.PUT_RECONSTRUCTION_PUSH_ERROR, 1) + # get_error_message(ray_constants.PUT_RECONSTRUCTION_PUSH_ERROR, 1) -def test_version_mismatch(shutdown_only): +@pytest.mark.skip("Publish happeds before we subscribe it") +def test_version_mismatch(error_pubsub, shutdown_only): ray_version = ray.__version__ ray.__version__ = "fake ray version" ray.init(num_cpus=1) + p = error_pubsub - wait_for_errors(ray_constants.VERSION_MISMATCH_PUSH_ERROR, 1) + errors = get_error_message(p, 1, ray_constants.VERSION_MISMATCH_PUSH_ERROR) + assert False, errors + assert len(errors) == 1 + assert errors[0].type == ray_constants.VERSION_MISMATCH_PUSH_ERROR # Reset the version. ray.__version__ = ray_version -def test_export_large_objects(ray_start_regular): +def test_export_large_objects(ray_start_regular, error_pubsub): + p = error_pubsub import ray.ray_constants as ray_constants large_object = np.zeros(2 * ray_constants.PICKLE_OBJECT_WARNING_SIZE) @@ -542,7 +575,10 @@ def test_export_large_objects(ray_start_regular): f.remote() # Make sure that a warning is generated. - wait_for_errors(ray_constants.PICKLING_LARGE_OBJECT_PUSH_ERROR, 1) + errors = get_error_message(p, 1, + ray_constants.PICKLING_LARGE_OBJECT_PUSH_ERROR) + assert len(errors) == 1 + assert errors[0].type == ray_constants.PICKLING_LARGE_OBJECT_PUSH_ERROR @ray.remote class Foo: @@ -552,11 +588,15 @@ def test_export_large_objects(ray_start_regular): Foo.remote() # Make sure that a warning is generated. - wait_for_errors(ray_constants.PICKLING_LARGE_OBJECT_PUSH_ERROR, 2) + errors = get_error_message(p, 1, + ray_constants.PICKLING_LARGE_OBJECT_PUSH_ERROR) + assert len(errors) == 1 + assert errors[0].type == ray_constants.PICKLING_LARGE_OBJECT_PUSH_ERROR @pytest.mark.skip(reason="TODO detect resource deadlock") -def test_warning_for_resource_deadlock(shutdown_only): +def test_warning_for_resource_deadlock(error_pubsub, shutdown_only): + p = error_pubsub # Check that we get warning messages for infeasible tasks. ray.init(num_cpus=1) @@ -574,10 +614,13 @@ def test_warning_for_resource_deadlock(shutdown_only): # Run in a task to check we handle the blocked task case correctly f.remote() - wait_for_errors(ray_constants.RESOURCE_DEADLOCK_ERROR, 1, timeout=30) + errors = get_error_message(p, 1, ray_constants.RESOURCE_DEADLOCK_ERROR) + assert len(errors) == 1 + assert errors[0].type == ray_constants.RESOURCE_DEADLOCK_ERROR -def test_warning_for_infeasible_tasks(ray_start_regular): +def test_warning_for_infeasible_tasks(ray_start_regular, error_pubsub): + p = error_pubsub # Check that we get warning messages for infeasible tasks. @ray.remote(num_gpus=1) @@ -590,11 +633,15 @@ def test_warning_for_infeasible_tasks(ray_start_regular): # This task is infeasible. f.remote() - wait_for_errors(ray_constants.INFEASIBLE_TASK_ERROR, 1) + errors = get_error_message(p, 1, ray_constants.INFEASIBLE_TASK_ERROR) + assert len(errors) == 1 + assert errors[0].type == ray_constants.INFEASIBLE_TASK_ERROR # This actor placement task is infeasible. Foo.remote() - wait_for_errors(ray_constants.INFEASIBLE_TASK_ERROR, 2) + errors = get_error_message(p, 1, ray_constants.INFEASIBLE_TASK_ERROR) + assert len(errors) == 1 + assert errors[0].type == ray_constants.INFEASIBLE_TASK_ERROR def test_warning_for_infeasible_zero_cpu_actor(shutdown_only): @@ -603,6 +650,7 @@ def test_warning_for_infeasible_zero_cpu_actor(shutdown_only): # requires no CPUs). ray.init(num_cpus=0) + p = init_error_pubsub() @ray.remote class Foo: @@ -610,7 +658,10 @@ def test_warning_for_infeasible_zero_cpu_actor(shutdown_only): # The actor creation should be infeasible. Foo.remote() - wait_for_errors(ray_constants.INFEASIBLE_TASK_ERROR, 1) + errors = get_error_message(p, 1, ray_constants.INFEASIBLE_TASK_ERROR) + assert len(errors) == 1 + assert errors[0].type == ray_constants.INFEASIBLE_TASK_ERROR + p.close() def test_warning_for_too_many_actors(shutdown_only): @@ -619,15 +670,23 @@ def test_warning_for_too_many_actors(shutdown_only): num_cpus = 2 ray.init(num_cpus=num_cpus) + p = init_error_pubsub() + @ray.remote class Foo: def __init__(self): time.sleep(1000) [Foo.remote() for _ in range(num_cpus * 3)] - wait_for_errors(ray_constants.WORKER_POOL_LARGE_ERROR, 1) + errors = get_error_message(p, 1, ray_constants.WORKER_POOL_LARGE_ERROR) + assert len(errors) == 1 + assert errors[0].type == ray_constants.WORKER_POOL_LARGE_ERROR + [Foo.remote() for _ in range(num_cpus)] - wait_for_errors(ray_constants.WORKER_POOL_LARGE_ERROR, 2) + errors = get_error_message(p, 1, ray_constants.WORKER_POOL_LARGE_ERROR) + assert len(errors) == 1 + assert errors[0].type == ray_constants.WORKER_POOL_LARGE_ERROR + p.close() def test_warning_for_too_many_nested_tasks(shutdown_only): @@ -635,6 +694,7 @@ def test_warning_for_too_many_nested_tasks(shutdown_only): # started that we will receive a warning. num_cpus = 2 ray.init(num_cpus=num_cpus) + p = init_error_pubsub() @ray.remote def f(): @@ -654,7 +714,10 @@ def test_warning_for_too_many_nested_tasks(shutdown_only): ray.get(h.remote()) [g.remote() for _ in range(num_cpus * 4)] - wait_for_errors(ray_constants.WORKER_POOL_LARGE_ERROR, 1) + errors = get_error_message(p, 1, ray_constants.WORKER_POOL_LARGE_ERROR) + assert len(errors) == 1 + assert errors[0].type == ray_constants.WORKER_POOL_LARGE_ERROR + p.close() def test_warning_for_many_duplicate_remote_functions_and_actors(shutdown_only): @@ -786,9 +849,10 @@ def test_redis_module_failure(ray_start_regular): # Note that this test will take at least 10 seconds because it must wait for # the monitor to detect enough missed heartbeats. -def test_warning_for_dead_node(ray_start_cluster_2_nodes): +def test_warning_for_dead_node(ray_start_cluster_2_nodes, error_pubsub): cluster = ray_start_cluster_2_nodes cluster.wait_for_nodes() + p = error_pubsub node_ids = {item["NodeID"] for item in ray.nodes()} @@ -801,14 +865,11 @@ def test_warning_for_dead_node(ray_start_cluster_2_nodes): cluster.list_all_nodes()[0].kill_raylet() # Check that we get warning messages for both raylets. - wait_for_errors(ray_constants.REMOVED_NODE_ERROR, 2, timeout=40) + errors = get_error_message(p, 2, ray_constants.REMOVED_NODE_ERROR, 40) # Extract the client IDs from the error messages. This will need to be # changed if the error message changes. - warning_node_ids = { - item["message"].split(" ")[5] - for item in relevant_errors(ray_constants.REMOVED_NODE_ERROR) - } + warning_node_ids = {error.error_message.split(" ")[5] for error in errors} assert node_ids == warning_node_ids @@ -837,24 +898,28 @@ def test_connect_with_disconnected_node(shutdown_only): cluster = Cluster() cluster.add_node(num_cpus=0, _internal_config=config) ray.init(address=cluster.address) - info = relevant_errors(ray_constants.REMOVED_NODE_ERROR) - assert len(info) == 0 + p = init_error_pubsub() + errors = get_error_message(p, 1, timeout=5) + assert len(errors) == 0 # This node is killed by SIGKILL, ray_monitor will mark it to dead. dead_node = cluster.add_node(num_cpus=0) cluster.remove_node(dead_node, allow_graceful=False) - wait_for_errors(ray_constants.REMOVED_NODE_ERROR, 1) + errors = get_error_message(p, 1, ray_constants.REMOVED_NODE_ERROR) + assert len(errors) == 1 # This node is killed by SIGKILL, ray_monitor will mark it to dead. dead_node = cluster.add_node(num_cpus=0) cluster.remove_node(dead_node, allow_graceful=False) - wait_for_errors(ray_constants.REMOVED_NODE_ERROR, 2) + errors = get_error_message(p, 1, ray_constants.REMOVED_NODE_ERROR) + assert len(errors) == 1 # This node is killed by SIGTERM, ray_monitor will not mark it again. removing_node = cluster.add_node(num_cpus=0) cluster.remove_node(removing_node, allow_graceful=True) - with pytest.raises(RayTestTimeoutException): - wait_for_errors(ray_constants.REMOVED_NODE_ERROR, 3, timeout=2) + errors = get_error_message(p, 1, timeout=2) + assert len(errors) == 0 # There is no connection error to a dead node. - info = relevant_errors(ray_constants.RAYLET_CONNECTION_ERROR) - assert len(info) == 0 + errors = get_error_message(p, 1, timeout=2) + assert len(errors) == 0 + p.close() @pytest.mark.parametrize( diff --git a/python/ray/tests/test_multi_node.py b/python/ray/tests/test_multi_node.py index f8fa26279..99f894865 100644 --- a/python/ray/tests/test_multi_node.py +++ b/python/ray/tests/test_multi_node.py @@ -8,16 +8,19 @@ import ray from ray.test_utils import ( RayTestTimeoutException, check_call_ray, run_string_as_driver, run_string_as_driver_nonblocking, wait_for_children_of_pid, - wait_for_children_of_pid_to_exit, kill_process_by_name, Semaphore) + wait_for_children_of_pid_to_exit, kill_process_by_name, Semaphore, + init_error_pubsub, get_error_message) def test_error_isolation(call_ray_start): address = call_ray_start # Connect a driver to the Ray cluster. ray.init(address=address) + p = init_error_pubsub() # There shouldn't be any errors yet. - assert len(ray.errors()) == 0 + errors = get_error_message(p, 1, 2) + assert len(errors) == 0 error_string1 = "error_string1" error_string2 = "error_string2" @@ -31,13 +34,11 @@ def test_error_isolation(call_ray_start): ray.get(f.remote()) # Wait for the error to appear in Redis. - while len(ray.errors()) != 1: - time.sleep(0.1) - print("Waiting for error to appear.") + errors = get_error_message(p, 1) # Make sure we got the error. - assert len(ray.errors()) == 1 - assert error_string1 in ray.errors()[0]["message"] + assert len(errors) == 1 + assert error_string1 in errors[0].error_message # Start another driver and make sure that it does not receive this # error. Make the other driver throw an error, and make sure it @@ -45,11 +46,13 @@ def test_error_isolation(call_ray_start): driver_script = """ import ray import time +from ray.test_utils import (init_error_pubsub, get_error_message) ray.init(address="{}") - +p = init_error_pubsub() time.sleep(1) -assert len(ray.errors()) == 0 +errors = get_error_message(p, 1, 2) +assert len(errors) == 0 @ray.remote def f(): @@ -60,12 +63,10 @@ try: except Exception as e: pass -while len(ray.errors()) != 1: - print(len(ray.errors())) - time.sleep(0.1) -assert len(ray.errors()) == 1 +errors = get_error_message(p, 1) +assert len(errors) == 1 -assert "{}" in ray.errors()[0]["message"] +assert "{}" in errors[0].error_message print("success") """.format(address, error_string2, error_string2) @@ -76,8 +77,9 @@ print("success") # Make sure that the other error message doesn't show up for this # driver. - assert len(ray.errors()) == 1 - assert error_string1 in ray.errors()[0]["message"] + errors = get_error_message(p, 1) + assert len(errors) == 1 + p.close() def test_remote_function_isolation(call_ray_start): diff --git a/python/ray/tests/test_stress_failure.py b/python/ray/tests/test_stress_failure.py index 2b792004d..8dee9020e 100644 --- a/python/ray/tests/test_stress_failure.py +++ b/python/ray/tests/test_stress_failure.py @@ -7,8 +7,8 @@ import time import ray from ray.cluster_utils import Cluster -from ray.test_utils import flat_errors import ray.ray_constants as ray_constants +from ray.test_utils import get_error_message @pytest.fixture(params=[1, 4]) @@ -203,12 +203,12 @@ def test_multiple_recursive(ray_start_reconstruction): assert cluster.remaining_processes_alive() -def wait_for_errors(error_check): +def wait_for_errors(p, error_check): # Wait for errors from all the nondeterministic tasks. errors = [] time_left = 100 while time_left > 0: - errors = flat_errors() + errors.extend(get_error_message(p, 1)) if error_check(errors): break time_left -= 1 @@ -223,7 +223,8 @@ def wait_for_errors(error_check): @pytest.mark.skipif( os.environ.get("RAY_USE_NEW_GCS") == "on", reason="Failing with new GCS API on Linux.") -def test_nondeterministic_task(ray_start_reconstruction): +def test_nondeterministic_task(ray_start_reconstruction, error_pubsub): + p = error_pubsub plasma_store_memory, num_nodes, cluster = ray_start_reconstruction # Define the size of one task's return argument so that the combined # sum of all objects' sizes is at least twice the plasma stores' @@ -280,9 +281,9 @@ def test_nondeterministic_task(ray_start_reconstruction): min_errors = 1 return len(errors) >= min_errors - errors = wait_for_errors(error_check) + errors = wait_for_errors(p, error_check) # Make sure all the errors have the correct type. - assert all(error["type"] == ray_constants.HASH_MISMATCH_PUSH_ERROR + assert all(error.type == ray_constants.HASH_MISMATCH_PUSH_ERROR for error in errors) assert cluster.remaining_processes_alive() @@ -293,7 +294,8 @@ def test_nondeterministic_task(ray_start_reconstruction): reason="Failing with new GCS API on Linux.") @pytest.mark.parametrize( "ray_start_object_store_memory", [10**9], indirect=True) -def test_driver_put_errors(ray_start_object_store_memory): +def test_driver_put_errors(ray_start_object_store_memory, error_pubsub): + p = error_pubsub plasma_store_memory = ray_start_object_store_memory # Define the size of one task's return argument so that the combined # sum of all objects' sizes is at least twice the plasma stores' @@ -333,10 +335,11 @@ def test_driver_put_errors(ray_start_object_store_memory): def error_check(errors): return len(errors) > 1 - errors = wait_for_errors(error_check) - assert all(error["type"] == ray_constants.PUT_RECONSTRUCTION_PUSH_ERROR - or "ray.exceptions.UnreconstructableError" in error["message"] - for error in errors) + errors = wait_for_errors(p, error_check) + assert all( + error.type == ray_constants.PUT_RECONSTRUCTION_PUSH_ERROR + or "ray.exceptions.UnreconstructableError" in error.error_messages + for error in errors) # NOTE(swang): This test tries to launch 1000 workers and breaks. diff --git a/python/ray/utils.py b/python/ray/utils.py index 985dc7ab4..1ce21ba5c 100644 --- a/python/ray/utils.py +++ b/python/ray/utils.py @@ -117,10 +117,11 @@ def push_error_to_driver_through_redis(redis_client, # of through the raylet. error_data = ray.gcs_utils.construct_error_message(job_id, error_type, message, time.time()) - redis_client.execute_command( - "RAY.TABLE_APPEND", ray.gcs_utils.TablePrefix.Value("ERROR_INFO"), - ray.gcs_utils.TablePubsub.Value("ERROR_INFO_PUBSUB"), job_id.binary(), - error_data) + pubsub_msg = ray.gcs_utils.PubSubMessage() + pubsub_msg.id = job_id.hex() + pubsub_msg.data = error_data + redis_client.publish("ERROR_INFO:" + job_id.hex(), + pubsub_msg.SerializeAsString()) def is_cython(obj): diff --git a/python/ray/worker.py b/python/ray/worker.py index d2151fcbc..9cf7df3ec 100644 --- a/python/ray/worker.py +++ b/python/ray/worker.py @@ -1095,16 +1095,11 @@ def listen_error_messages_raylet(worker, task_error_queue, threads_stopped): # Really we should just subscribe to the errors for this specific job. # However, currently all errors seem to be published on the same channel. - error_pubsub_channel = str( - ray.gcs_utils.TablePubsub.Value("ERROR_INFO_PUBSUB")).encode("ascii") - worker.error_message_pubsub_client.subscribe(error_pubsub_channel) - # worker.error_message_pubsub_client.psubscribe("*") + error_pubsub_channel = ray.gcs_utils.RAY_ERROR_PUBSUB_PATTERN + worker.error_message_pubsub_client.psubscribe(error_pubsub_channel) try: # Get the errors that occurred before the call to subscribe. - error_messages = ray.errors() - for error_message in error_messages: - logger.error(error_message) while True: # Exit if we received a signal that we should stop. @@ -1115,10 +1110,9 @@ def listen_error_messages_raylet(worker, task_error_queue, threads_stopped): if msg is None: threads_stopped.wait(timeout=0.01) continue - gcs_entry = ray.gcs_utils.GcsEntry.FromString(msg["data"]) - assert len(gcs_entry.entries) == 1 + pubsub_msg = ray.gcs_utils.PubSubMessage.FromString(msg["data"]) error_data = ray.gcs_utils.ErrorTableData.FromString( - gcs_entry.entries[0]) + pubsub_msg.data) job_id = error_data.job_id if job_id not in [ worker.current_job_id.binary(), diff --git a/src/ray/gcs/gcs_client/service_based_accessor.cc b/src/ray/gcs/gcs_client/service_based_accessor.cc index ea2b1d810..e3a2ed637 100644 --- a/src/ray/gcs/gcs_client/service_based_accessor.cc +++ b/src/ray/gcs/gcs_client/service_based_accessor.cc @@ -1328,21 +1328,12 @@ ServiceBasedErrorInfoAccessor::ServiceBasedErrorInfoAccessor( Status ServiceBasedErrorInfoAccessor::AsyncReportJobError( const std::shared_ptr &data_ptr, const StatusCallback &callback) { - JobID job_id = JobID::FromBinary(data_ptr->job_id()); - std::string type = data_ptr->type(); - RAY_LOG(DEBUG) << "Reporting job error, job id = " << job_id << ", type = " << type; - rpc::ReportJobErrorRequest request; - request.mutable_error_data()->CopyFrom(*data_ptr); - client_impl_->GetGcsRpcClient().ReportJobError( - request, [job_id, type, callback](const Status &status, - const rpc::ReportJobErrorReply &reply) { - if (callback) { - callback(status); - } - RAY_LOG(DEBUG) << "Finished reporting job error, status = " << status - << ", job id = " << job_id << ", type = " << type; - }); - return Status::OK(); + auto job_id = JobID::FromBinary(data_ptr->job_id()); + RAY_LOG(DEBUG) << "Publishing job error, job id = " << job_id; + Status status = client_impl_->GetGcsPubSub().Publish( + ERROR_INFO_CHANNEL, job_id.Hex(), data_ptr->SerializeAsString(), callback); + RAY_LOG(DEBUG) << "Finished publishing job error, job id = " << job_id; + return status; } ServiceBasedWorkerInfoAccessor::ServiceBasedWorkerInfoAccessor( diff --git a/src/ray/gcs/gcs_server/error_info_handler_impl.cc b/src/ray/gcs/gcs_server/error_info_handler_impl.cc deleted file mode 100644 index 274970075..000000000 --- a/src/ray/gcs/gcs_server/error_info_handler_impl.cc +++ /dev/null @@ -1,45 +0,0 @@ -// Copyright 2017 The Ray Authors. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -#include "ray/gcs/gcs_server/error_info_handler_impl.h" - -namespace ray { -namespace rpc { - -void DefaultErrorInfoHandler::HandleReportJobError( - const ReportJobErrorRequest &request, ReportJobErrorReply *reply, - SendReplyCallback send_reply_callback) { - JobID job_id = JobID::FromBinary(request.error_data().job_id()); - std::string type = request.error_data().type(); - RAY_LOG(DEBUG) << "Reporting job error, job id = " << job_id << ", type = " << type; - auto error_table_data = std::make_shared(); - error_table_data->CopyFrom(request.error_data()); - auto on_done = [job_id, type, reply, send_reply_callback](Status status) { - if (!status.ok()) { - RAY_LOG(ERROR) << "Failed to report job error, job id = " << job_id - << ", type = " << type; - } - GCS_RPC_SEND_REPLY(send_reply_callback, reply, status); - }; - - Status status = gcs_client_.Errors().AsyncReportJobError(error_table_data, on_done); - if (!status.ok()) { - on_done(status); - } - RAY_LOG(DEBUG) << "Finished reporting job error, job id = " << job_id - << ", type = " << type; -} - -} // namespace rpc -} // namespace ray diff --git a/src/ray/gcs/gcs_server/error_info_handler_impl.h b/src/ray/gcs/gcs_server/error_info_handler_impl.h deleted file mode 100644 index 5830dc797..000000000 --- a/src/ray/gcs/gcs_server/error_info_handler_impl.h +++ /dev/null @@ -1,38 +0,0 @@ -// Copyright 2017 The Ray Authors. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -#pragma once - -#include "ray/gcs/redis_gcs_client.h" -#include "ray/rpc/gcs_server/gcs_rpc_server.h" - -namespace ray { -namespace rpc { - -/// This implementation class of `ErrorInfoHandler`. -class DefaultErrorInfoHandler : public rpc::ErrorInfoHandler { - public: - explicit DefaultErrorInfoHandler(gcs::RedisGcsClient &gcs_client) - : gcs_client_(gcs_client) {} - - void HandleReportJobError(const ReportJobErrorRequest &request, - ReportJobErrorReply *reply, - SendReplyCallback send_reply_callback) override; - - private: - gcs::RedisGcsClient &gcs_client_; -}; - -} // namespace rpc -} // namespace ray diff --git a/src/ray/gcs/gcs_server/gcs_node_manager.cc b/src/ray/gcs/gcs_server/gcs_node_manager.cc index c6652cb1c..77d12595c 100644 --- a/src/ray/gcs/gcs_server/gcs_node_manager.cc +++ b/src/ray/gcs/gcs_server/gcs_node_manager.cc @@ -140,11 +140,9 @@ void GcsNodeManager::NodeFailureDetector::ScheduleTick() { ////////////////////////////////////////////////////////////////////////////////////////// GcsNodeManager::GcsNodeManager(boost::asio::io_service &main_io_service, boost::asio::io_service &node_failure_detector_io_service, - gcs::ErrorInfoAccessor &error_info_accessor, std::shared_ptr gcs_pub_sub, std::shared_ptr gcs_table_storage) - : error_info_accessor_(error_info_accessor), - main_io_service_(main_io_service), + : main_io_service_(main_io_service), node_failure_detector_(new NodeFailureDetector( node_failure_detector_io_service, gcs_table_storage, gcs_pub_sub, [this](const ClientID &node_id) { @@ -406,7 +404,8 @@ std::shared_ptr GcsNodeManager::RemoveNode( << " has missed too many heartbeats from it."; auto error_data_ptr = gcs::CreateErrorTableData(type, error_message.str(), current_time_ms()); - RAY_CHECK_OK(error_info_accessor_.AsyncReportJobError(error_data_ptr, nullptr)); + RAY_CHECK_OK(gcs_pub_sub_->Publish(ERROR_INFO_CHANNEL, node_id.Hex(), + error_data_ptr->SerializeAsString(), nullptr)); } // Notify all listeners. diff --git a/src/ray/gcs/gcs_server/gcs_node_manager.h b/src/ray/gcs/gcs_server/gcs_node_manager.h index 615952f5a..6a4d96518 100644 --- a/src/ray/gcs/gcs_server/gcs_node_manager.h +++ b/src/ray/gcs/gcs_server/gcs_node_manager.h @@ -36,13 +36,11 @@ class GcsNodeManager : public rpc::NodeInfoHandler { /// /// \param main_io_service The main event loop. /// \param node_failure_detector_io_service The event loop of node failure detector. - /// \param error_info_accessor The error info accessor, which is used to report error. /// \param gcs_pub_sub GCS message publisher. /// \param gcs_table_storage GCS table external storage accessor. /// when detecting the death of nodes. explicit GcsNodeManager(boost::asio::io_service &main_io_service, boost::asio::io_service &node_failure_detector_io_service, - gcs::ErrorInfoAccessor &error_info_accessor, std::shared_ptr gcs_pub_sub, std::shared_ptr gcs_table_storage); @@ -219,8 +217,6 @@ class GcsNodeManager : public rpc::NodeInfoHandler { }; private: - /// Error info accessor. - gcs::ErrorInfoAccessor &error_info_accessor_; /// The main event loop for node failure detector. boost::asio::io_service &main_io_service_; /// Detector to detect the failure of node. diff --git a/src/ray/gcs/gcs_server/gcs_server.cc b/src/ray/gcs/gcs_server/gcs_server.cc index 2d9c698ba..6617a2b48 100644 --- a/src/ray/gcs/gcs_server/gcs_server.cc +++ b/src/ray/gcs/gcs_server/gcs_server.cc @@ -16,7 +16,6 @@ #include "ray/common/network_util.h" #include "ray/common/ray_config.h" -#include "ray/gcs/gcs_server/error_info_handler_impl.h" #include "ray/gcs/gcs_server/gcs_actor_manager.h" #include "ray/gcs/gcs_server/gcs_job_manager.h" #include "ray/gcs/gcs_server/gcs_node_manager.h" @@ -95,11 +94,6 @@ void GcsServer::Start() { stats_service_.reset(new rpc::StatsGrpcService(main_service_, *stats_handler_)); rpc_server_.RegisterService(*stats_service_); - error_info_handler_ = InitErrorInfoHandler(); - error_info_service_.reset( - new rpc::ErrorInfoGrpcService(main_service_, *error_info_handler_)); - rpc_server_.RegisterService(*error_info_service_); - gcs_worker_manager_ = InitGcsWorkerManager(); worker_info_service_.reset( new rpc::WorkerInfoGrpcService(main_service_, *gcs_worker_manager_)); @@ -166,8 +160,7 @@ void GcsServer::InitGcsNodeManager() { node_manager_io_service_.run(); })); gcs_node_manager_ = std::make_shared( - main_service_, node_manager_io_service_, redis_gcs_client_->Errors(), gcs_pub_sub_, - gcs_table_storage_); + main_service_, node_manager_io_service_, gcs_pub_sub_, gcs_table_storage_); } void GcsServer::InitGcsActorManager() { @@ -279,11 +272,6 @@ std::unique_ptr GcsServer::InitStatsHandler() { new rpc::DefaultStatsHandler(gcs_table_storage_)); } -std::unique_ptr GcsServer::InitErrorInfoHandler() { - return std::unique_ptr( - new rpc::DefaultErrorInfoHandler(*redis_gcs_client_)); -} - std::unique_ptr GcsServer::InitGcsWorkerManager() { return std::unique_ptr( new GcsWorkerManager(gcs_table_storage_, gcs_pub_sub_)); diff --git a/src/ray/gcs/gcs_server/gcs_server.h b/src/ray/gcs/gcs_server/gcs_server.h index 4186d26e1..ffeb8a6e4 100644 --- a/src/ray/gcs/gcs_server/gcs_server.h +++ b/src/ray/gcs/gcs_server/gcs_server.h @@ -98,9 +98,6 @@ class GcsServer { /// The stats handler virtual std::unique_ptr InitStatsHandler(); - /// The error info handler - virtual std::unique_ptr InitErrorInfoHandler(); - /// The worker manager virtual std::unique_ptr InitGcsWorkerManager(); @@ -148,9 +145,6 @@ class GcsServer { /// Stats handler and service std::unique_ptr stats_handler_; std::unique_ptr stats_service_; - /// Error info handler and service - std::unique_ptr error_info_handler_; - std::unique_ptr error_info_service_; /// The gcs worker manager std::unique_ptr gcs_worker_manager_; /// Worker info service diff --git a/src/ray/gcs/gcs_server/gcs_table_storage.h b/src/ray/gcs/gcs_server/gcs_table_storage.h index cc266e334..726144bc6 100644 --- a/src/ray/gcs/gcs_server/gcs_table_storage.h +++ b/src/ray/gcs/gcs_server/gcs_table_storage.h @@ -269,14 +269,6 @@ class GcsHeartbeatBatchTable : public GcsTable { - public: - explicit GcsErrorInfoTable(std::shared_ptr &store_client) - : GcsTable(store_client) { - table_name_ = TablePrefix_Name(TablePrefix::ERROR_INFO); - } -}; - class GcsProfileTable : public GcsTable { public: explicit GcsProfileTable(std::shared_ptr &store_client) @@ -377,11 +369,6 @@ class GcsTableStorage { return *heartbeat_batch_table_; } - GcsErrorInfoTable &ErrorInfoTable() { - RAY_CHECK(error_info_table_ != nullptr); - return *error_info_table_; - } - GcsProfileTable &ProfileTable() { RAY_CHECK(profile_table_ != nullptr); return *profile_table_; @@ -413,7 +400,6 @@ class GcsTableStorage { std::unique_ptr placement_group_schedule_table_; std::unique_ptr heartbeat_table_; std::unique_ptr heartbeat_batch_table_; - std::unique_ptr error_info_table_; std::unique_ptr profile_table_; std::unique_ptr worker_table_; std::unique_ptr internal_config_table_; @@ -443,7 +429,6 @@ class RedisGcsTableStorage : public GcsTableStorage { placement_group_schedule_table_.reset( new GcsPlacementGroupScheduleTable(store_client_)); heartbeat_batch_table_.reset(new GcsHeartbeatBatchTable(store_client_)); - error_info_table_.reset(new GcsErrorInfoTable(store_client_)); profile_table_.reset(new GcsProfileTable(store_client_)); worker_table_.reset(new GcsWorkerTable(store_client_)); internal_config_table_.reset(new GcsInternalConfigTable(store_client_)); @@ -472,7 +457,6 @@ class InMemoryGcsTableStorage : public GcsTableStorage { new GcsPlacementGroupScheduleTable(store_client_)); heartbeat_table_.reset(new GcsHeartbeatTable(store_client_)); heartbeat_batch_table_.reset(new GcsHeartbeatBatchTable(store_client_)); - error_info_table_.reset(new GcsErrorInfoTable(store_client_)); profile_table_.reset(new GcsProfileTable(store_client_)); worker_table_.reset(new GcsWorkerTable(store_client_)); internal_config_table_.reset(new GcsInternalConfigTable(store_client_)); diff --git a/src/ray/gcs/gcs_server/test/gcs_actor_scheduler_test.cc b/src/ray/gcs/gcs_server/test/gcs_actor_scheduler_test.cc index 74d316905..5d8f098bc 100644 --- a/src/ray/gcs/gcs_server/test/gcs_actor_scheduler_test.cc +++ b/src/ray/gcs/gcs_server/test/gcs_actor_scheduler_test.cc @@ -28,7 +28,7 @@ class GcsActorSchedulerTest : public ::testing::Test { gcs_pub_sub_ = std::make_shared(redis_client_); gcs_table_storage_ = std::make_shared(redis_client_); gcs_node_manager_ = std::make_shared( - io_service_, io_service_, error_info_accessor_, gcs_pub_sub_, gcs_table_storage_); + io_service_, io_service_, gcs_pub_sub_, gcs_table_storage_); store_client_ = std::make_shared(io_service_); gcs_actor_table_ = std::make_shared(store_client_); @@ -52,9 +52,6 @@ class GcsActorSchedulerTest : public ::testing::Test { boost::asio::io_service io_service_; std::shared_ptr store_client_; std::shared_ptr gcs_actor_table_; - - GcsServerMocker::MockedErrorInfoAccessor error_info_accessor_; - std::shared_ptr raylet_client_; std::shared_ptr worker_client_; std::shared_ptr gcs_node_manager_; diff --git a/src/ray/gcs/gcs_server/test/gcs_node_manager_test.cc b/src/ray/gcs/gcs_server/test/gcs_node_manager_test.cc index b193d7d61..6a493f743 100644 --- a/src/ray/gcs/gcs_server/test/gcs_node_manager_test.cc +++ b/src/ray/gcs/gcs_server/test/gcs_node_manager_test.cc @@ -20,16 +20,21 @@ namespace ray { class GcsNodeManagerTest : public ::testing::Test { + public: + GcsNodeManagerTest() { + gcs_pub_sub_ = std::make_shared(redis_client_); + } + protected: - std::shared_ptr gcs_pub_sub_; + std::shared_ptr gcs_pub_sub_; + std::shared_ptr redis_client_; std::shared_ptr gcs_table_storage_; }; TEST_F(GcsNodeManagerTest, TestManagement) { boost::asio::io_service io_service; - auto error_info_accessor = GcsServerMocker::MockedErrorInfoAccessor(); - gcs::GcsNodeManager node_manager(io_service, io_service, error_info_accessor, - gcs_pub_sub_, gcs_table_storage_); + gcs::GcsNodeManager node_manager(io_service, io_service, gcs_pub_sub_, + gcs_table_storage_); // Test Add/Get/Remove functionality. auto node = Mocker::GenNodeInfo(); auto node_id = ClientID::FromBinary(node->node_id()); @@ -43,9 +48,8 @@ TEST_F(GcsNodeManagerTest, TestManagement) { TEST_F(GcsNodeManagerTest, TestListener) { boost::asio::io_service io_service; - auto error_info_accessor = GcsServerMocker::MockedErrorInfoAccessor(); - gcs::GcsNodeManager node_manager(io_service, io_service, error_info_accessor, - gcs_pub_sub_, gcs_table_storage_); + gcs::GcsNodeManager node_manager(io_service, io_service, gcs_pub_sub_, + gcs_table_storage_); // Test AddNodeAddedListener. int node_count = 1000; std::vector> added_nodes; diff --git a/src/ray/gcs/gcs_server/test/gcs_object_manager_test.cc b/src/ray/gcs/gcs_server/test/gcs_object_manager_test.cc index d9179c4fb..9fc2bd321 100644 --- a/src/ray/gcs/gcs_server/test/gcs_object_manager_test.cc +++ b/src/ray/gcs/gcs_server/test/gcs_object_manager_test.cc @@ -55,7 +55,7 @@ class GcsObjectManagerTest : public ::testing::Test { void SetUp() override { gcs_table_storage_ = std::make_shared(io_service_); gcs_node_manager_ = std::make_shared( - io_service_, io_service_, error_info_accessor_, gcs_pub_sub_, gcs_table_storage_); + io_service_, io_service_, gcs_pub_sub_, gcs_table_storage_); gcs_object_manager_ = std::make_shared( gcs_table_storage_, gcs_pub_sub_, *gcs_node_manager_); GenTestData(); @@ -83,7 +83,6 @@ class GcsObjectManagerTest : public ::testing::Test { protected: boost::asio::io_service io_service_; - GcsServerMocker::MockedErrorInfoAccessor error_info_accessor_; std::shared_ptr gcs_node_manager_; std::shared_ptr gcs_client_; std::shared_ptr gcs_pub_sub_; diff --git a/src/ray/gcs/gcs_server/test/gcs_placement_group_scheduler_test.cc b/src/ray/gcs/gcs_server/test/gcs_placement_group_scheduler_test.cc index dc72070a5..e90dea612 100644 --- a/src/ray/gcs/gcs_server/test/gcs_placement_group_scheduler_test.cc +++ b/src/ray/gcs/gcs_server/test/gcs_placement_group_scheduler_test.cc @@ -27,7 +27,7 @@ class GcsPlacementGroupSchedulerTest : public ::testing::Test { gcs_table_storage_ = std::make_shared(redis_client_); gcs_pub_sub_ = std::make_shared(redis_client_); gcs_node_manager_ = std::make_shared( - io_service_, io_service_, error_info_accessor_, gcs_pub_sub_, gcs_table_storage_); + io_service_, io_service_, gcs_pub_sub_, gcs_table_storage_); gcs_table_storage_ = std::make_shared(io_service_); store_client_ = std::make_shared(io_service_); gcs_placement_group_scheduler_ = @@ -39,7 +39,6 @@ class GcsPlacementGroupSchedulerTest : public ::testing::Test { protected: boost::asio::io_service io_service_; - GcsServerMocker::MockedErrorInfoAccessor error_info_accessor_; std::shared_ptr store_client_; std::shared_ptr raylet_client_; diff --git a/src/ray/gcs/gcs_server/test/gcs_server_rpc_test.cc b/src/ray/gcs/gcs_server/test/gcs_server_rpc_test.cc index 4c1e2a9d4..25a46fea8 100644 --- a/src/ray/gcs/gcs_server/test/gcs_server_rpc_test.cc +++ b/src/ray/gcs/gcs_server/test/gcs_server_rpc_test.cc @@ -375,16 +375,6 @@ class GcsServerTest : public ::testing::Test { return WaitReady(promise.get_future(), timeout_ms_); } - bool ReportJobError(const rpc::ReportJobErrorRequest &request) { - std::promise promise; - client_->ReportJobError( - request, [&promise](const Status &status, const rpc::ReportJobErrorReply &reply) { - RAY_CHECK_OK(status); - promise.set_value(true); - }); - return WaitReady(promise.get_future(), timeout_ms_); - } - bool ReportWorkerFailure(const rpc::ReportWorkerFailureRequest &request) { std::promise promise; client_->ReportWorkerFailure( @@ -755,16 +745,6 @@ TEST_F(GcsServerTest, TestStats) { ASSERT_TRUE(AddProfileData(add_profile_data_request)); } -TEST_F(GcsServerTest, TestErrorInfo) { - // Report error - rpc::ReportJobErrorRequest report_error_request; - rpc::ErrorTableData error_table_data; - JobID job_id = JobID::FromInt(1); - error_table_data.set_job_id(job_id.Binary()); - report_error_request.mutable_error_data()->CopyFrom(error_table_data); - ASSERT_TRUE(ReportJobError(report_error_request)); -} - TEST_F(GcsServerTest, TestWorkerInfo) { // Report worker failure auto worker_failure_data = Mocker::GenWorkerTableData(); diff --git a/src/ray/gcs/gcs_server/test/gcs_server_test_util.h b/src/ray/gcs/gcs_server/test/gcs_server_test_util.h index effb7a455..c24f5b6bc 100644 --- a/src/ray/gcs/gcs_server/test/gcs_server_test_util.h +++ b/src/ray/gcs/gcs_server/test/gcs_server_test_util.h @@ -369,17 +369,6 @@ struct GcsServerMocker { void AsyncResubscribe(bool is_pubsub_server_restarted) override {} }; - class MockedErrorInfoAccessor : public gcs::ErrorInfoAccessor { - public: - Status AsyncReportJobError(const std::shared_ptr &data_ptr, - const gcs::StatusCallback &callback) override { - if (callback) { - callback(Status::OK()); - } - return Status::OK(); - } - }; - class MockGcsPubSub : public gcs::GcsPubSub { public: MockGcsPubSub(std::shared_ptr redis_client) diff --git a/src/ray/gcs/pubsub/gcs_pub_sub.h b/src/ray/gcs/pubsub/gcs_pub_sub.h index 3338eb224..4ac1a6256 100644 --- a/src/ray/gcs/pubsub/gcs_pub_sub.h +++ b/src/ray/gcs/pubsub/gcs_pub_sub.h @@ -34,6 +34,7 @@ namespace gcs { #define TASK_LEASE_CHANNEL "TASK_LEASE" #define HEARTBEAT_CHANNEL "HEARTBEAT" #define HEARTBEAT_BATCH_CHANNEL "HEARTBEAT_BATCH" +#define ERROR_INFO_CHANNEL "ERROR_INFO" /// \class GcsPubSub /// diff --git a/src/ray/gcs/redis_accessor.cc b/src/ray/gcs/redis_accessor.cc index ac10f87c1..53a40247f 100644 --- a/src/ray/gcs/redis_accessor.cc +++ b/src/ray/gcs/redis_accessor.cc @@ -763,20 +763,11 @@ Status RedisNodeInfoAccessor::AsyncSubscribeToResources( return resource_sub_executor_.AsyncSubscribeAll(ClientID::Nil(), on_subscribe, done); } -RedisErrorInfoAccessor::RedisErrorInfoAccessor(RedisGcsClient *client_impl) - : client_impl_(client_impl) {} +RedisErrorInfoAccessor::RedisErrorInfoAccessor(RedisGcsClient *client_impl) {} Status RedisErrorInfoAccessor::AsyncReportJobError( const std::shared_ptr &data_ptr, const StatusCallback &callback) { - ErrorTable::WriteCallback on_done = nullptr; - if (callback != nullptr) { - on_done = [callback](RedisGcsClient *client, const JobID &job_id, - const ErrorTableData &data) { callback(Status::OK()); }; - } - - JobID job_id = JobID::FromBinary(data_ptr->job_id()); - ErrorTable &error_table = client_impl_->error_table(); - return error_table.Append(job_id, job_id, data_ptr, on_done); + return Status::Invalid("Not implemented"); } RedisStatsInfoAccessor::RedisStatsInfoAccessor(RedisGcsClient *client_impl) diff --git a/src/ray/gcs/redis_accessor.h b/src/ray/gcs/redis_accessor.h index 4869df8e9..04c5b2b32 100644 --- a/src/ray/gcs/redis_accessor.h +++ b/src/ray/gcs/redis_accessor.h @@ -414,9 +414,6 @@ class RedisErrorInfoAccessor : public ErrorInfoAccessor { Status AsyncReportJobError(const std::shared_ptr &data_ptr, const StatusCallback &callback) override; - - private: - RedisGcsClient *client_impl_{nullptr}; }; /// \class RedisStatsInfoAccessor diff --git a/src/ray/gcs/redis_gcs_client.cc b/src/ray/gcs/redis_gcs_client.cc index b9a6d1d7f..8af7ca40e 100644 --- a/src/ray/gcs/redis_gcs_client.cc +++ b/src/ray/gcs/redis_gcs_client.cc @@ -54,7 +54,6 @@ Status RedisGcsClient::Connect(boost::asio::io_service &io_service) { // For raylet, NodeID should be initialized in raylet layer(not here). client_table_.reset(new ClientTable({primary_context}, this)); - error_table_.reset(new ErrorTable({primary_context}, this)); job_table_.reset(new JobTable({primary_context}, this)); heartbeat_batch_table_.reset(new HeartbeatBatchTable({primary_context}, this)); // Tables below would be sharded. @@ -107,7 +106,6 @@ std::string RedisGcsClient::DebugString() const { result << "\n- TaskReconstructionLog: " << task_reconstruction_log_->DebugString(); result << "\n- TaskLeaseTable: " << task_lease_table_->DebugString(); result << "\n- HeartbeatTable: " << heartbeat_table_->DebugString(); - result << "\n- ErrorTable: " << error_table_->DebugString(); result << "\n- ProfileTable: " << profile_table_->DebugString(); result << "\n- ClientTable: " << client_table_->DebugString(); result << "\n- JobTable: " << job_table_->DebugString(); @@ -140,8 +138,6 @@ HeartbeatBatchTable &RedisGcsClient::heartbeat_batch_table() { return *heartbeat_batch_table_; } -ErrorTable &RedisGcsClient::error_table() { return *error_table_; } - JobTable &RedisGcsClient::job_table() { return *job_table_; } ProfileTable &RedisGcsClient::profile_table() { return *profile_table_; } diff --git a/src/ray/gcs/redis_gcs_client.h b/src/ray/gcs/redis_gcs_client.h index 7b01c0ba1..b266f0c34 100644 --- a/src/ray/gcs/redis_gcs_client.h +++ b/src/ray/gcs/redis_gcs_client.h @@ -101,9 +101,6 @@ class RAY_EXPORT RedisGcsClient : public GcsClient { virtual raylet::TaskTable &raylet_task_table(); TaskLeaseTable &task_lease_table(); TaskReconstructionLog &task_reconstruction_log(); - /// Implements the Errors() interface. - // TODO: Some API for getting the error on the driver - ErrorTable &error_table(); /// Implements the Stats() interface. ProfileTable &profile_table(); /// Implements the Workers() interface. @@ -124,7 +121,6 @@ class RAY_EXPORT RedisGcsClient : public GcsClient { std::unique_ptr task_lease_table_; std::unique_ptr heartbeat_table_; std::unique_ptr heartbeat_batch_table_; - std::unique_ptr error_table_; std::unique_ptr profile_table_; std::unique_ptr client_table_; std::unique_ptr actor_checkpoint_table_; diff --git a/src/ray/gcs/tables.cc b/src/ray/gcs/tables.cc index 5f295a996..6e9283b78 100644 --- a/src/ray/gcs/tables.cc +++ b/src/ray/gcs/tables.cc @@ -532,10 +532,6 @@ Status Hash::Subscribe(const JobID &job_id, const ClientID &client_id, return Status::OK(); } -std::string ErrorTable::DebugString() const { - return Log::DebugString(); -} - std::string ProfileTable::DebugString() const { return Log::DebugString(); } @@ -869,7 +865,6 @@ template class Log; template class Table; template class Table; template class Table; -template class Log; template class Log; template class Log; template class Log; diff --git a/src/ray/gcs/tables.h b/src/ray/gcs/tables.h index b44653d2d..6faea1e5e 100644 --- a/src/ray/gcs/tables.h +++ b/src/ray/gcs/tables.h @@ -860,21 +860,6 @@ class TaskTable : public Table { } // namespace raylet -class ErrorTable : public Log { - public: - ErrorTable(const std::vector> &contexts, - RedisGcsClient *client) - : Log(contexts, client) { - pubsub_channel_ = TablePubsub::ERROR_INFO_PUBSUB; - prefix_ = TablePrefix::ERROR_INFO; - }; - - /// Returns debug string for class. - /// - /// \return string. - std::string DebugString() const; -}; - class ProfileTable : public Log { public: ProfileTable(const std::vector> &contexts, diff --git a/src/ray/protobuf/gcs.proto b/src/ray/protobuf/gcs.proto index af090c7bc..2f668f32b 100644 --- a/src/ray/protobuf/gcs.proto +++ b/src/ray/protobuf/gcs.proto @@ -33,20 +33,19 @@ enum TablePrefix { TASK_RECONSTRUCTION = 8; HEARTBEAT = 9; HEARTBEAT_BATCH = 10; - ERROR_INFO = 11; - JOB = 12; - PROFILE = 13; - TASK_LEASE = 14; - ACTOR_CHECKPOINT = 15; - ACTOR_CHECKPOINT_ID = 16; - NODE_RESOURCE = 17; - DIRECT_ACTOR = 18; + JOB = 11; + PROFILE = 12; + TASK_LEASE = 13; + ACTOR_CHECKPOINT = 14; + ACTOR_CHECKPOINT_ID = 15; + NODE_RESOURCE = 16; + DIRECT_ACTOR = 17; // WORKER is already used in WorkerType, so use WORKERS here. - WORKERS = 19; - INTERNAL_CONFIG = 20; - TABLE_PREFIX_MAX = 21; - PLACEMENT_GROUP_SCHEDULE = 22; - PLACEMENT_GROUP = 23; + WORKERS = 18; + INTERNAL_CONFIG = 29; + TABLE_PREFIX_MAX = 20; + PLACEMENT_GROUP_SCHEDULE = 21; + PLACEMENT_GROUP = 22; } // The channel that Add operations to the Table should be published on, if any. @@ -60,13 +59,12 @@ enum TablePubsub { ACTOR_PUBSUB = 6; HEARTBEAT_PUBSUB = 7; HEARTBEAT_BATCH_PUBSUB = 8; - ERROR_INFO_PUBSUB = 9; - TASK_LEASE_PUBSUB = 10; - JOB_PUBSUB = 11; - NODE_RESOURCE_PUBSUB = 12; - DIRECT_ACTOR_PUBSUB = 13; - WORKER_FAILURE_PUBSUB = 14; - TABLE_PUBSUB_MAX = 15; + TASK_LEASE_PUBSUB = 9; + JOB_PUBSUB = 10; + NODE_RESOURCE_PUBSUB = 11; + DIRECT_ACTOR_PUBSUB = 12; + WORKER_FAILURE_PUBSUB = 13; + TABLE_PUBSUB_MAX = 14; } enum GcsChangeMode { diff --git a/src/ray/protobuf/gcs_service.proto b/src/ray/protobuf/gcs_service.proto index b796fa743..1797126ac 100644 --- a/src/ray/protobuf/gcs_service.proto +++ b/src/ray/protobuf/gcs_service.proto @@ -425,20 +425,6 @@ service StatsGcsService { rpc GetAllProfileInfo(GetAllProfileInfoRequest) returns (GetAllProfileInfoReply); } -message ReportJobErrorRequest { - ErrorTableData error_data = 1; -} - -message ReportJobErrorReply { - GcsStatus status = 1; -} - -// Service for error info access. -service ErrorInfoGcsService { - // Report a job error to GCS Service. - rpc ReportJobError(ReportJobErrorRequest) returns (ReportJobErrorReply); -} - message ReportWorkerFailureRequest { WorkerTableData worker_failure = 1; } diff --git a/src/ray/rpc/gcs_server/gcs_rpc_client.h b/src/ray/rpc/gcs_server/gcs_rpc_client.h index aadbc5813..35bbecb73 100644 --- a/src/ray/rpc/gcs_server/gcs_rpc_client.h +++ b/src/ray/rpc/gcs_server/gcs_rpc_client.h @@ -103,8 +103,6 @@ class GcsRpcClient { new GrpcClient(address, port, client_call_manager)); stats_grpc_client_ = std::unique_ptr>( new GrpcClient(address, port, client_call_manager)); - error_info_grpc_client_ = std::unique_ptr>( - new GrpcClient(address, port, client_call_manager)); worker_info_grpc_client_ = std::unique_ptr>( new GrpcClient(address, port, client_call_manager)); placement_group_info_grpc_client_ = @@ -233,10 +231,6 @@ class GcsRpcClient { /// Get information of all profiles from GCS Service. VOID_GCS_RPC_CLIENT_METHOD(StatsGcsService, GetAllProfileInfo, stats_grpc_client_, ) - /// Report a job error to GCS Service. - VOID_GCS_RPC_CLIENT_METHOD(ErrorInfoGcsService, ReportJobError, - error_info_grpc_client_, ) - /// Report a worker failure to GCS Service. VOID_GCS_RPC_CLIENT_METHOD(WorkerInfoGcsService, ReportWorkerFailure, worker_info_grpc_client_, ) @@ -267,7 +261,6 @@ class GcsRpcClient { std::unique_ptr> object_info_grpc_client_; std::unique_ptr> task_info_grpc_client_; std::unique_ptr> stats_grpc_client_; - std::unique_ptr> error_info_grpc_client_; std::unique_ptr> worker_info_grpc_client_; std::unique_ptr> placement_group_info_grpc_client_; diff --git a/src/ray/rpc/gcs_server/gcs_rpc_server.h b/src/ray/rpc/gcs_server/gcs_rpc_server.h index 030c0d3f4..c1981b436 100644 --- a/src/ray/rpc/gcs_server/gcs_rpc_server.h +++ b/src/ray/rpc/gcs_server/gcs_rpc_server.h @@ -38,9 +38,6 @@ namespace rpc { #define STATS_SERVICE_RPC_HANDLER(HANDLER) RPC_SERVICE_HANDLER(StatsGcsService, HANDLER) -#define ERROR_INFO_SERVICE_RPC_HANDLER(HANDLER) \ - RPC_SERVICE_HANDLER(ErrorInfoGcsService, HANDLER) - #define WORKER_INFO_SERVICE_RPC_HANDLER(HANDLER) \ RPC_SERVICE_HANDLER(WorkerInfoGcsService, HANDLER) @@ -402,41 +399,6 @@ class StatsGrpcService : public GrpcService { StatsGcsServiceHandler &service_handler_; }; -class ErrorInfoGcsServiceHandler { - public: - virtual ~ErrorInfoGcsServiceHandler() = default; - - virtual void HandleReportJobError(const ReportJobErrorRequest &request, - ReportJobErrorReply *reply, - SendReplyCallback send_reply_callback) = 0; -}; - -/// The `GrpcService` for `ErrorInfoGcsService`. -class ErrorInfoGrpcService : public GrpcService { - public: - /// Constructor. - /// - /// \param[in] handler The service handler that actually handle the requests. - explicit ErrorInfoGrpcService(boost::asio::io_service &io_service, - ErrorInfoGcsServiceHandler &handler) - : GrpcService(io_service), service_handler_(handler){}; - - protected: - grpc::Service &GetGrpcService() override { return service_; } - - void InitServerCallFactories( - const std::unique_ptr &cq, - std::vector> *server_call_factories) override { - ERROR_INFO_SERVICE_RPC_HANDLER(ReportJobError); - } - - private: - /// The grpc async service object. - ErrorInfoGcsService::AsyncService service_; - /// The service handler that actually handle the requests. - ErrorInfoGcsServiceHandler &service_handler_; -}; - class WorkerInfoGcsServiceHandler { public: virtual ~WorkerInfoGcsServiceHandler() = default; @@ -528,7 +490,6 @@ using NodeInfoHandler = NodeInfoGcsServiceHandler; using ObjectInfoHandler = ObjectInfoGcsServiceHandler; using TaskInfoHandler = TaskInfoGcsServiceHandler; using StatsHandler = StatsGcsServiceHandler; -using ErrorInfoHandler = ErrorInfoGcsServiceHandler; using WorkerInfoHandler = WorkerInfoGcsServiceHandler; using PlacementGroupInfoHandler = PlacementGroupInfoGcsServiceHandler;