diff --git a/python/ray/experimental/state.py b/python/ray/experimental/state.py index ffc12173a..9b02f1cf9 100644 --- a/python/ray/experimental/state.py +++ b/python/ray/experimental/state.py @@ -229,6 +229,8 @@ class GlobalState(object): message = self._execute_command(object_id, "RAY.TABLE_LOOKUP", ray.gcs_utils.TablePrefix.OBJECT, "", object_id.binary()) + if message is None: + return {} gcs_entry = ray.gcs_utils.GcsTableEntry.GetRootAsGcsTableEntry( message, 0) @@ -292,6 +294,8 @@ class GlobalState(object): message = self._execute_command(task_id, "RAY.TABLE_LOOKUP", ray.gcs_utils.TablePrefix.RAYLET_TASK, "", task_id.binary()) + if message is None: + return {} gcs_entries = ray.gcs_utils.GcsTableEntry.GetRootAsGcsTableEntry( message, 0) diff --git a/test/monitor_test.py b/test/monitor_test.py index d21a6a3f3..248aa6fb0 100644 --- a/test/monitor_test.py +++ b/test/monitor_test.py @@ -25,6 +25,9 @@ def _test_cleanup_on_driver_exit(num_redis_shards): init_cmd = [m for m in lines if m.startswith("ray.init")] assert 1 == len(init_cmd) redis_address = init_cmd[0].split("redis_address=\"")[-1][:-2] + max_attempts_before_failing = 100 + # Wait for monitor.py to start working. + time.sleep(2) def StateSummary(): obj_tbl_len = len(ray.global_state.object_table()) @@ -40,8 +43,6 @@ def _test_cleanup_on_driver_exit(num_redis_shards): if (0, 1) != summary_start[:2]: success.value = False - max_attempts_before_failing = 100 - # Two new objects. ray.get(ray.put(1111)) ray.get(ray.put(1111)) @@ -83,18 +84,17 @@ def _test_cleanup_on_driver_exit(num_redis_shards): driver.start() # Wait for client to exit. driver.join() - time.sleep(3) - # Just make sure Driver() is run and succeeded. Note(rkn), if the below - # assertion starts failing, then the issue may be that the summary - # values computed in the Driver function are being updated slowly and - # so the call to StateSummary() is getting outdated values. This could - # be fixed by looping until StateSummary() returns the desired values. + # Just make sure Driver() is run and succeeded. assert success.value # Check that objects, tasks, and functions are cleaned up. ray.init(redis_address=redis_address) - # The assertion below can fail if the monitor is too slow to clean up - # the global state. + attempts = 0 + while (0, 1) != StateSummary()[:2]: + time.sleep(0.1) + attempts += 1 + if attempts == max_attempts_before_failing: + break assert (0, 1) == StateSummary()[:2] ray.shutdown()