From 86100bc119e589406e4fc39712908152d75daa23 Mon Sep 17 00:00:00 2001 From: Eric Liang Date: Wed, 29 Jan 2020 14:56:09 -0800 Subject: [PATCH] Revert "Support of scikit-learn with ray joblib backend (#6925)" (#6957) This reverts commit a7ecda60172f862c8baba887adfd6bd571462682. --- doc/source/joblib.rst | 69 -------- python/ray/experimental/joblib/__init__.py | 17 -- python/ray/experimental/joblib/ray_backend.py | 58 ------- python/ray/tests/BUILD | 8 - python/ray/tests/test_joblib.py | 159 ------------------ 5 files changed, 311 deletions(-) delete mode 100644 doc/source/joblib.rst delete mode 100644 python/ray/experimental/joblib/__init__.py delete mode 100644 python/ray/experimental/joblib/ray_backend.py delete mode 100644 python/ray/tests/test_joblib.py diff --git a/doc/source/joblib.rst b/doc/source/joblib.rst deleted file mode 100644 index 54b6027d0..000000000 --- a/doc/source/joblib.rst +++ /dev/null @@ -1,69 +0,0 @@ -sklearn Ray Backend API (Experimental) -======================================= - -.. warning:: - - Support for running scikit-learn on Ray is an experimental feature, - so it may be changed at any time without warning. If you encounter any - bugs/shortcomings/incompatibilities, please file an `issue on GitHub`_. - Contributions are always welcome! - -.. _`issue on GitHub`: https://github.com/ray-project/ray/issues - -Ray supports running distributed `scikit-learn`_ programs by -implementing a Ray backend for `joblib`_ using `Ray Actors `__ -instead of local processes. This makes it easy to scale existing applications -that use scikit-learn from a single node to a cluster. - -.. _`joblib`: https://joblib.readthedocs.io -.. _`scikit-learn`: https://scikit-learn.org - -Quickstart ----------- - -To get started, first `install Ray `__, then use -``from ray.experimental.joblib import register_ray`` and run ``register_ray()``. -This will register Ray as a joblib backend for scikit-learn to use. -Then run your original scikit-learn code inside -``with joblib.parallel_backend('ray')``. This will start a local Ray cluster. -See the `Run on a Cluster`_ section below for instructions to run on -a multi-node Ray cluster instead. - -.. code-block:: python - - import numpy as np - from sklearn.datasets import load_digits - from sklearn.model_selection import RandomizedSearchCV - from sklearn.svm import SVC - digits = load_digits() - param_space = { - 'C': np.logspace(-6, 6, 30), - 'gamma': np.logspace(-8, 8, 30), - 'tol': np.logspace(-4, -1, 30), - 'class_weight': [None, 'balanced'], - } - model = SVC(kernel='rbf') - search = RandomizedSearchCV(model, param_space, cv=5, n_iter=300, verbose=10) - - import joblib - from ray.experimental.joblib import register_ray - register_ray() - with joblib.parallel_backend('ray'): - search.fit(digits.data, digits.target) - -Run on a Cluster ----------------- - -This section assumes that you have a running Ray cluster. To start a Ray cluster, -please refer to the `cluster setup `__ instructions. - -To connect a scikit-learn to a running Ray cluster, you have to specify the address of the -head node by setting the ``RAY_ADDRESS`` environment variable. - -You can also start Ray manually by calling ``ray.init()`` (with any of its supported -configuration options) before calling ``with joblib.parallel_backend('ray')``. - -.. warning:: - - If you do not set the ``RAY_ADDRESS`` environment variable and do not provide - ``address`` in ``ray.init(address=
)`` then scikit-learn will run on a SINGLE node! diff --git a/python/ray/experimental/joblib/__init__.py b/python/ray/experimental/joblib/__init__.py deleted file mode 100644 index 275cae2ea..000000000 --- a/python/ray/experimental/joblib/__init__.py +++ /dev/null @@ -1,17 +0,0 @@ -from joblib.parallel import register_parallel_backend - - -def register_ray(): - """ Register Ray Backend to be called with parallel_backend("ray"). """ - try: - from ray.experimental.joblib.ray_backend import RayBackend - register_parallel_backend("ray", RayBackend) - except ImportError: - msg = ("To use the ray backend you must install ray." - "Try running 'pip install ray'." - "See https://ray.readthedocs.io/en/latest/installation.html" - "for more information.") - raise ImportError(msg) - - -__all__ = ["register_ray"] diff --git a/python/ray/experimental/joblib/ray_backend.py b/python/ray/experimental/joblib/ray_backend.py deleted file mode 100644 index 56c1d7878..000000000 --- a/python/ray/experimental/joblib/ray_backend.py +++ /dev/null @@ -1,58 +0,0 @@ -from joblib._parallel_backends import MultiprocessingBackend -from joblib.pool import PicklingPool -import logging - -from ray.experimental.multiprocessing.pool import Pool -import ray - -RAY_ADDRESS_ENV = "RAY_ADDRESS" - -logger = logging.getLogger(__name__) - - -class RayBackend(MultiprocessingBackend): - """Ray backend uses ray, a system for scalable distributed computing. - More info about Ray is available here: https://ray.readthedocs.io. - """ - - def configure(self, - n_jobs=1, - parallel=None, - prefer=None, - require=None, - **memmappingpool_args): - """Make Ray Pool the father class of PicklingPool. PicklingPool is a - father class that inherits Pool from multiprocessing.pool. The next - line is a patch, which changes the inheritance of Pool to be from - ray.experimental.multiprocessing.pool. - """ - PicklingPool.__bases__ = (Pool, ) - """Use all available resources when n_jobs == -1. Must set RAY_ADDRESS - variable in the environment or run ray.init(address=..) to run on - multiple nodes. - """ - if n_jobs == -1: - if not ray.is_initialized(): - import os - if RAY_ADDRESS_ENV in os.environ: - ray_address = os.environ[RAY_ADDRESS_ENV] - logger.info( - "Connecting to ray cluster at address='{}'".format( - ray_address)) - ray.init(address=ray_address) - else: - logger.info("Starting local ray cluster") - ray.init() - ray_cpus = int(ray.state.cluster_resources()["CPU"]) - n_jobs = ray_cpus - - eff_n_jobs = super(RayBackend, self).configure( - n_jobs, parallel, prefer, require, **memmappingpool_args) - return eff_n_jobs - - def effective_n_jobs(self, n_jobs): - eff_n_jobs = super(RayBackend, self).effective_n_jobs(n_jobs) - if n_jobs == -1: - ray_cpus = int(ray.state.cluster_resources()["CPU"]) - eff_n_jobs = ray_cpus - return eff_n_jobs diff --git a/python/ray/tests/BUILD b/python/ray/tests/BUILD index 9a1bbde6d..94b94933c 100644 --- a/python/ray/tests/BUILD +++ b/python/ray/tests/BUILD @@ -259,14 +259,6 @@ py_test( deps = ["//:ray_lib"], ) -py_test( - name = "test_joblib", - size = "medium", - srcs = ["test_joblib.py"], - tags = ["exclusive"], - deps = ["//:ray_lib"], -) - py_test( name = "test_multi_node_2", size = "medium", diff --git a/python/ray/tests/test_joblib.py b/python/ray/tests/test_joblib.py deleted file mode 100644 index a5e0d972b..000000000 --- a/python/ray/tests/test_joblib.py +++ /dev/null @@ -1,159 +0,0 @@ -import numpy as np -import joblib -from sklearn.datasets import load_digits, load_iris -from sklearn.model_selection import RandomizedSearchCV -from time import time -from sklearn.datasets import fetch_openml -from sklearn.ensemble import ExtraTreesClassifier -from sklearn.ensemble import RandomForestClassifier -from sklearn.kernel_approximation import Nystroem -from sklearn.kernel_approximation import RBFSampler -from sklearn.pipeline import make_pipeline -from sklearn.svm import LinearSVC, SVC -from sklearn.tree import DecisionTreeClassifier -from sklearn.utils import check_array -from sklearn.linear_model import LogisticRegression -from sklearn.neural_network import MLPClassifier -from sklearn.model_selection import cross_val_score - -from ray.experimental.joblib import register_ray -import ray - - -def test_register_ray(): - register_ray() - assert "ray" in joblib.parallel.BACKENDS - assert not ray.is_initialized() - - -def test_ray_backend(shutdown_only): - register_ray() - from ray.experimental.joblib.ray_backend import RayBackend - with joblib.parallel_backend("ray"): - assert type(joblib.parallel.get_active_backend()[0]) == RayBackend - - -def test_svm_single_node(shutdown_only): - digits = load_digits() - param_space = { - "C": np.logspace(-6, 6, 10), - "gamma": np.logspace(-8, 8, 10), - "tol": np.logspace(-4, -1, 3), - "class_weight": [None, "balanced"], - } - - model = SVC(kernel="rbf") - search = RandomizedSearchCV( - model, param_space, cv=3, n_iter=50, verbose=10) - register_ray() - with joblib.parallel_backend("ray"): - search.fit(digits.data, digits.target) - assert ray.is_initialized() - - -def test_svm_multiple_nodes(ray_start_cluster_2_nodes): - digits = load_digits() - param_space = { - "C": np.logspace(-6, 6, 30), - "gamma": np.logspace(-8, 8, 30), - "tol": np.logspace(-4, -1, 30), - "class_weight": [None, "balanced"], - } - - model = SVC(kernel="rbf") - search = RandomizedSearchCV( - model, param_space, cv=5, n_iter=100, verbose=10) - register_ray() - with joblib.parallel_backend("ray"): - search.fit(digits.data, digits.target) - assert ray.is_initialized() - - -"""This test only makes sure the different sklearn classifiers are supported -and do not fail. It can be improved to check for accuracy similar to -'test_cross_validation' but the classifiers need to be improved (to improve -the accuracy), which results in longer test time. -""" - - -def test_sklearn_benchmarks(ray_start_cluster_2_nodes): - ESTIMATORS = { - "CART": DecisionTreeClassifier(), - "ExtraTrees": ExtraTreesClassifier(n_estimators=10), - "RandomForest": RandomForestClassifier(), - "Nystroem-SVM": make_pipeline( - Nystroem(gamma=0.015, n_components=1000), LinearSVC(C=1)), - "SampledRBF-SVM": make_pipeline( - RBFSampler(gamma=0.015, n_components=1000), LinearSVC(C=1)), - "LogisticRegression-SAG": LogisticRegression( - solver="sag", tol=1e-1, C=1e4), - "LogisticRegression-SAGA": LogisticRegression( - solver="saga", tol=1e-1, C=1e4), - "MultilayerPerceptron": MLPClassifier( - hidden_layer_sizes=(32, 32), - max_iter=100, - alpha=1e-4, - solver="sgd", - learning_rate_init=0.2, - momentum=0.9, - verbose=1, - tol=1e-2, - random_state=1), - "MLP-adam": MLPClassifier( - hidden_layer_sizes=(32, 32), - max_iter=100, - alpha=1e-4, - solver="adam", - learning_rate_init=0.001, - verbose=1, - tol=1e-2, - random_state=1) - } - # Load dataset. - print("Loading dataset...") - data = fetch_openml("mnist_784") - X = check_array(data["data"], dtype=np.float32, order="C") - y = data["target"] - - # Normalize features. - X = X / 255 - - # Create train-test split. - print("Creating train-test split...") - n_train = 6000 - X_train = X[:n_train] - y_train = y[:n_train] - register_ray() - - train_time = {} - random_seed = 0 - # Use two workers per classifier. - num_jobs = 2 - with joblib.parallel_backend("ray"): - for name in sorted(ESTIMATORS.keys()): - print("Training %s ... " % name, end="") - estimator = ESTIMATORS[name] - estimator_params = estimator.get_params() - estimator.set_params( - **{ - p: random_seed - for p in estimator_params if p.endswith("random_state") - }) - - if "n_jobs" in estimator_params: - estimator.set_params(n_jobs=num_jobs) - time_start = time() - estimator.fit(X_train, y_train) - train_time[name] = time() - time_start - print("training", name, "took", train_time[name], "seconds") - - -def test_cross_validation(shutdown_only): - register_ray() - iris = load_iris() - clf = SVC(kernel="linear", C=1, random_state=0) - with joblib.parallel_backend("ray", n_jobs=5): - accuracy = cross_val_score(clf, iris.data, iris.target, cv=5) - assert len(accuracy) == 5 - for result in accuracy: - assert result > 0.95