diff --git a/python/ray/tune/sync_client.py b/python/ray/tune/sync_client.py index b21a0b06a..c8c509aa6 100644 --- a/python/ray/tune/sync_client.py +++ b/python/ray/tune/sync_client.py @@ -135,6 +135,10 @@ class SyncClient: """Resets state.""" pass + def close(self): + """Clean up hook.""" + pass + class FunctionBasedClient(SyncClient): def __init__(self, sync_up_func, sync_down_func, delete_func=None): @@ -179,6 +183,7 @@ class CommandBasedClient(SyncClient): self.sync_down_template = sync_down_template self.delete_template = delete_template self.logfile = None + self._closed = False self.cmd_process = None def set_logdir(self, logdir): @@ -189,6 +194,16 @@ class CommandBasedClient(SyncClient): """ self.logfile = tempfile.NamedTemporaryFile( prefix="log_sync_out", dir=logdir, suffix=".log", delete=False) + self._closed = False + + def _get_logfile(self): + if self._closed: + raise RuntimeError( + "[internalerror] The client has been closed. " + "Please report this stacktrace + your cluster configuration " + "on Github!") + else: + return self.logfile def sync_up(self, source, target): return self._execute(self.sync_up_template, source, target) @@ -203,7 +218,10 @@ class CommandBasedClient(SyncClient): final_cmd = self.delete_template.format(target=quote(target)) logger.debug("Running delete: {}".format(final_cmd)) self.cmd_process = subprocess.Popen( - final_cmd, shell=True, stderr=subprocess.PIPE, stdout=self.logfile) + final_cmd, + shell=True, + stderr=subprocess.PIPE, + stdout=self._get_logfile()) return True def wait(self): @@ -223,6 +241,13 @@ class CommandBasedClient(SyncClient): logger.warning("Sync process still running but resetting anyways.") self.cmd_process = None + def close(self): + if self.logfile: + logger.debug(f"Closing the logfile: {str(self.logfile)}") + self.logfile.close() + self.logfile = None + self._closed = True + @property def is_running(self): """Returns whether a sync or delete process is running.""" @@ -240,7 +265,10 @@ class CommandBasedClient(SyncClient): source=quote(source), target=quote(target)) logger.debug("Running sync: {}".format(final_cmd)) self.cmd_process = subprocess.Popen( - final_cmd, shell=True, stderr=subprocess.PIPE, stdout=self.logfile) + final_cmd, + shell=True, + stderr=subprocess.PIPE, + stdout=self._get_logfile()) return True @staticmethod diff --git a/python/ray/tune/syncer.py b/python/ray/tune/syncer.py index 16518edbf..16d068f85 100644 --- a/python/ray/tune/syncer.py +++ b/python/ray/tune/syncer.py @@ -206,6 +206,9 @@ class Syncer: self.last_sync_down_time = float("-inf") self.sync_client.reset() + def close(self): + self.sync_client.close() + @property def _remote_path(self): return self._remote_dir @@ -445,6 +448,7 @@ class SyncerCallback(Callback): trainable_ip = ray.get(trial.runner.get_current_ip.remote()) trial_syncer.set_worker_ip(trainable_ip) trial_syncer.sync_down_if_needed() + trial_syncer.close() def on_checkpoint(self, iteration: int, trials: List["Trial"], trial: "Trial", checkpoint: Checkpoint, **info): diff --git a/python/ray/tune/utils/mock.py b/python/ray/tune/utils/mock.py index 337482fe9..1ec925519 100644 --- a/python/ray/tune/utils/mock.py +++ b/python/ray/tune/utils/mock.py @@ -2,6 +2,7 @@ import os import numpy as np import json import random +import uuid import ray.utils @@ -20,7 +21,12 @@ LOCAL_DELETE_TEMPLATE = "rm -rf {target}" def mock_storage_client(): """Mocks storage client that treats a local dir as durable storage.""" - return get_sync_client(LOCAL_SYNC_TEMPLATE, LOCAL_DELETE_TEMPLATE) + client = get_sync_client(LOCAL_SYNC_TEMPLATE, LOCAL_DELETE_TEMPLATE) + path = os.path.join(ray.utils.get_user_temp_dir(), + f"mock-client-{uuid.uuid4().hex[:4]}") + os.makedirs(path, exist_ok=True) + client.set_logdir(path) + return client class MockNodeSyncer(NodeSyncer):