From e56eb354eb2ea65493ba25c0f52be04a4b2033a1 Mon Sep 17 00:00:00 2001 From: Eric Liang Date: Sat, 18 Aug 2018 13:19:52 -0700 Subject: [PATCH] [tune] Remove hack to serve pin requests off thread (#2680) * nopin * fix --- python/ray/tune/function_runner.py | 2 -- python/ray/tune/util.py | 28 ---------------------------- 2 files changed, 30 deletions(-) diff --git a/python/ray/tune/function_runner.py b/python/ray/tune/function_runner.py index 6d7e38ed2..c7ccf8850 100644 --- a/python/ray/tune/function_runner.py +++ b/python/ray/tune/function_runner.py @@ -9,7 +9,6 @@ import traceback from ray.tune import TuneError from ray.tune.trainable import Trainable from ray.tune.result import TIMESTEPS_TOTAL -from ray.tune.util import _serve_get_pin_requests class StatusReporter(object): @@ -108,7 +107,6 @@ class FunctionRunner(Trainable): self._default_config["script_min_iter_time_s"])) result = self._status_reporter._get_and_clear_status() while result is None: - _serve_get_pin_requests() time.sleep(1) result = self._status_reporter._get_and_clear_status() diff --git a/python/ray/tune/util.py b/python/ray/tune/util.py index f6ae3ab28..691d25adb 100644 --- a/python/ray/tune/util.py +++ b/python/ray/tune/util.py @@ -2,15 +2,12 @@ from __future__ import absolute_import from __future__ import division from __future__ import print_function -from six.moves import queue import base64 import numpy as np -import threading import ray _pinned_objects = [] -_fetch_requests = queue.Queue() PINNED_OBJECT_PREFIX = "ray.tune.PinnedObject:" @@ -33,36 +30,11 @@ def get_pinned_object(pinned_id): from ray.local_scheduler import ObjectID - if threading.current_thread().getName() != "MainThread": - placeholder = queue.Queue() - _fetch_requests.put((placeholder, pinned_id)) - print("Requesting main thread to fetch pinned object", pinned_id) - return placeholder.get() - return _from_pinnable( ray.get( ObjectID(base64.b64decode(pinned_id[len(PINNED_OBJECT_PREFIX):])))) -def _serve_get_pin_requests(): - """This is hack to avoid ray.get() on the function runner thread. - - The issue is that we run trainable functions on a separate thread, - which cannot access Ray API methods. So instead, that thread puts the - fetch in a queue that is periodically checked from the main thread. - """ - - assert threading.current_thread().getName() == "MainThread" - - try: - while not _fetch_requests.empty(): - (placeholder, pinned_id) = _fetch_requests.get_nowait() - print("Fetching pinned object from main thread", pinned_id) - placeholder.put(get_pinned_object(pinned_id)) - except queue.Empty: - pass - - def _to_pinnable(obj): """Converts obj to a form that can be pinned in object store memory.