diff --git a/python/ray/experimental/__init__.py b/python/ray/experimental/__init__.py index 4f93e189f..8f50b5b8a 100644 --- a/python/ray/experimental/__init__.py +++ b/python/ray/experimental/__init__.py @@ -3,5 +3,6 @@ from __future__ import division from __future__ import print_function from .tfutils import TensorFlowVariables +from .features import flush_redis_unsafe -__all__ = ["TensorFlowVariables"] +__all__ = ["TensorFlowVariables", "flush_redis_unsafe"] diff --git a/python/ray/experimental/features.py b/python/ray/experimental/features.py new file mode 100644 index 000000000..b6c3e77b3 --- /dev/null +++ b/python/ray/experimental/features.py @@ -0,0 +1,37 @@ +from __future__ import absolute_import +from __future__ import division +from __future__ import print_function + +import ray + + +def flush_redis_unsafe(): + """This removes some non-critical state from the primary Redis shard. + + This removes the log files as well as the event log from Redis. This can + be used to try to address out-of-memory errors caused by the accumulation + of metadata in Redis. However, it will only partially address the issue as + much of the data is in the task table (and object table), which are not + flushed. + """ + if not hasattr(ray.worker.global_worker, "redis_client"): + raise Exception("ray.experimental.flush_redis_unsafe cannot be called " + "before ray.init() has been called.") + + redis_client = ray.worker.global_worker.redis_client + + # Delete the log files from the primary Redis shard. + keys = redis_client.keys("LOGFILE:*") + if len(keys) > 0: + num_deleted = redis_client.delete(*keys) + else: + num_deleted = 0 + print("Deleted {} log files from Redis.".format(num_deleted)) + + # Delete the event log from the primary Redis shard. + keys = redis_client.keys("event_log:*") + if len(keys) > 0: + num_deleted = redis_client.delete(*keys) + else: + num_deleted = 0 + print("Deleted {} event logs from Redis.".format(num_deleted))