From 6b04664645a20b9c786faa60f840c717cc6cd764 Mon Sep 17 00:00:00 2001 From: Simon Mo Date: Fri, 29 May 2020 09:55:47 -0700 Subject: [PATCH] [Serve] Add Tutorial for Batch Inference (#8490) --- doc/source/serve/advanced.rst | 5 +- doc/source/serve/tutorials/batch.rst | 119 ++++++++++++++++++ doc/source/serve/tutorials/index.rst | 3 +- doc/source/walkthrough.rst | 6 +- python/ray/serve/BUILD | 8 ++ .../ray/serve/examples/doc/tutorial_batch.py | 110 ++++++++++++++++ 6 files changed, 248 insertions(+), 3 deletions(-) create mode 100644 doc/source/serve/tutorials/batch.rst create mode 100644 python/ray/serve/examples/doc/tutorial_batch.py diff --git a/doc/source/serve/advanced.rst b/doc/source/serve/advanced.rst index 2081e2df6..8dba3314d 100644 --- a/doc/source/serve/advanced.rst +++ b/doc/source/serve/advanced.rst @@ -57,7 +57,7 @@ Batching to improve performance =============================== You can also have Ray Serve batch requests for performance. In order to do use this feature, you need to: -1. Set the `max_batch_size` in the `BackendConfig`. +1. Set the ``max_batch_size`` in the ``config`` dictionary. 2. Modify your backend implementation to accept a list of requests and return a list of responses instead of handling a single request. @@ -80,6 +80,9 @@ You can also have Ray Serve batch requests for performance. In order to do use t serve.create_backend("counter1", BatchingExample, config=config) serve.set_traffic("counter1", {"counter1": 1.0}) +Please take a look at :ref:`Batching Tutorial<serve-batch-tutorial>` for a deep +dive. + .. _`serve-split-traffic`: Splitting Traffic and A/B Testing diff --git a/doc/source/serve/tutorials/batch.rst b/doc/source/serve/tutorials/batch.rst new file mode 100644 index 000000000..d09e95b13 --- /dev/null +++ b/doc/source/serve/tutorials/batch.rst @@ -0,0 +1,119 @@ +.. _serve-batch-tutorial: + +Batching Tutorial +================= + +In this guide, we will deploy a simple vectorized adder that takes +a batch of queries and adds them at once. 
In particular, we show: + +- How to implement and deploy a Ray Serve model that accepts batches. +- How to configure the batch size. +- How to query the model in Python. + +This tutorial should help the following use cases: + +- You want to perform offline batch inference on a cluster of machines. +- You want to serve online queries and your model can take advantage of batching. + For example, linear regressions and neural networks use CPU and GPU's + vectorized instructions to perform computation in parallel. Performing + inference with batching can increase the *throughput* of the model as well as + *utilization* of the hardware. + + +Let's import Ray Serve and some other helpers. + +.. literalinclude:: ../../../../python/ray/serve/examples/doc/tutorial_batch.py + :start-after: __doc_import_begin__ + :end-before: __doc_import_end__ + +You can use the ``@serve.accept_batch`` decorator to annotate a function or a class. +This annotation is needed because batched backends have different APIs compared +to single request backends. In a batched backend, the inputs are a list of values. + +For a single-query backend, the input types are a single Flask request or a Python +argument: + +.. code-block:: python + + def single_request( + flask_request: Flask.Request, + *, + python_arg: int = 0 + ): + pass + +For a batched backend, the input types are converted to lists of their original +types: + +.. code-block:: python + + @serve.accept_batch + def batched_request( + flask_request: List[Flask.Request], + *, + python_arg: List[int] + ): + pass + +Let's define the backend function. We will take in a list of requests, extract +the input values, convert them into an array, and use NumPy to add 1 to each element. + +.. literalinclude:: ../../../../python/ray/serve/examples/doc/tutorial_batch.py + :start-after: __doc_define_servable_v0_begin__ + :end-before: __doc_define_servable_v0_end__ + +Let's deploy it. 
Note that in the ``config`` section of ``create_backend``, we +are specifying the maximum batch size via ``config={"max_batch_size": 4}``. This +configuration option limits the maximum possible batch size sent to the backend. + +.. note:: + Ray Serve performs *opportunistic batching*. When a worker is free to evaluate + the next batch, Ray Serve will look at the pending queries and take + ``min(number_of_pending_queries, max_batch_size)`` queries to form a batch. + +.. literalinclude:: ../../../../python/ray/serve/examples/doc/tutorial_batch.py + :start-after: __doc_deploy_begin__ + :end-before: __doc_deploy_end__ + +Let's define a :ref:`Ray remote task<ray-remote-functions>` to send queries in +parallel. As you can see, the first batch has a batch size of 1, and the subsequent +batches have a batch size of 4. Even though each query is issued independently, +Ray Serve was able to evaluate them in batches. + +.. literalinclude:: ../../../../python/ray/serve/examples/doc/tutorial_batch.py + :start-after: __doc_query_begin__ + :end-before: __doc_query_end__ + +What if you want to evaluate a whole batch in Python? Ray Serve allows you to send +queries via the Python API. You can use the boolean value ``serve.context.web`` to +distinguish the origin of the queries. A batch of queries can either come from +the web server or the Python API. Ray Serve will guarantee there won't be queries +with mixed origins. + +When the batch of requests comes from the web API, Ray Serve will fill the first +argument ``flask_requests`` with a list of ``Flask.Request`` objects and set +``serve.context.web = True``. When the batch of requests comes from the Python API, +Ray Serve will fill the ``flask_requests`` argument with placeholders, and directly inject +Python objects into the keyword arguments. In this case, the ``numbers`` argument +will be a list of Python integers. + +.. 
literalinclude:: ../../../../python/ray/serve/examples/doc/tutorial_batch.py + :start-after: __doc_define_servable_v1_begin__ + :end-before: __doc_define_servable_v1_end__ + +Let's deploy the new version to the same endpoint. Don't forget to set +``max_batch_size``! + +.. literalinclude:: ../../../../python/ray/serve/examples/doc/tutorial_batch.py + :start-after: __doc_deploy_v1_begin__ + :end-before: __doc_deploy_v1_end__ + +To query the backend via the Python API, we can use ``serve.get_handle`` to receive +a handle to the corresponding "endpoint". To enqueue a query, you can call +``handle.remote(argument_name=argument_value)``. This call returns immediately +with a :ref:`Ray ObjectID<ray-object-ids>`. You can call ``ray.get`` to retrieve +the result. + +.. literalinclude:: ../../../../python/ray/serve/examples/doc/tutorial_batch.py + :start-after: __doc_query_handle_begin__ + :end-before: __doc_query_handle_end__ \ No newline at end of file diff --git a/doc/source/serve/tutorials/index.rst b/doc/source/serve/tutorials/index.rst index 20d62dbee..34380019f 100644 --- a/doc/source/serve/tutorials/index.rst +++ b/doc/source/serve/tutorials/index.rst @@ -2,7 +2,7 @@ Tutorials ========= -Below are a list of tutorials that you can use to learn more about the different pieces of +Below are a list of tutorials that you can use to learn more about the different pieces of Ray Serve functionality and how to integrate different modeling frameworks. .. toctree:: @@ -13,6 +13,7 @@ Ray Serve functionality and how to integrate different modeling frameworks. tensorflow.rst pytorch.rst sklearn.rst + batch.rst Other Topics: diff --git a/doc/source/walkthrough.rst b/doc/source/walkthrough.rst index f39f3604b..bc3805252 100644 --- a/doc/source/walkthrough.rst +++ b/doc/source/walkthrough.rst @@ -34,6 +34,8 @@ Ray will then be able to utilize all cores of your machine. Find out how to conf To start a multi-node Ray cluster, see the `cluster setup page `__. +.. 
_ray-remote-functions: + Remote functions (Tasks) ------------------------ @@ -87,6 +89,8 @@ All computation is performed in the background, driven by Ray's internal event l See the `ray.remote package reference `__ page for specific documentation on how to use ``ray.remote``. +.. _ray-object-ids: + **Object IDs** can also be passed into remote functions. When the function actually gets executed, **the argument will be a retrieved as a regular Python object**. For example, take this function: .. code:: python @@ -176,7 +180,7 @@ Remote functions can be canceled by calling ``ray.cancel`` on the returned Objec def blocking_operation(): time.sleep(10e6) return 100 - + obj_id = blocking_operation.remote() ray.cancel(obj_id) diff --git a/python/ray/serve/BUILD b/python/ray/serve/BUILD index b7ae8e7ee..f60ef2aaf 100644 --- a/python/ray/serve/BUILD +++ b/python/ray/serve/BUILD @@ -89,6 +89,14 @@ py_test( deps = [":serve_lib"] ) +py_test( + name = "tutorial_batch", + size = "small", + srcs = glob(["examples/doc/*.py"]), + tags = ["exclusive"], + deps = [":serve_lib"] +) + # Disable the deployment tutorial test because it requires # ray start --head in the background. 
# py_test( diff --git a/python/ray/serve/examples/doc/tutorial_batch.py b/python/ray/serve/examples/doc/tutorial_batch.py new file mode 100644 index 000000000..1bfaa6e64 --- /dev/null +++ b/python/ray/serve/examples/doc/tutorial_batch.py @@ -0,0 +1,110 @@
# yapf: disable
# __doc_import_begin__
import time
from typing import List, Optional

import numpy as np
import ray
import requests
from ray import serve
# __doc_import_end__
# yapf: enable


# __doc_define_servable_v0_begin__
@serve.accept_batch
def batch_adder_v0(flask_requests: List):
    """Add 1 to the ``number`` query parameter of every request in a batch.

    Args:
        flask_requests: The batched web requests. Each request is expected
            to carry an integer ``number`` in its query arguments.

    Returns:
        A list of ints, one per request, in the same order as the input.
    """
    numbers = [int(request.args["number"]) for request in flask_requests]

    input_array = np.array(numbers)
    print("Our input array has shape:", input_array.shape)
    # Sleep for 200ms, this could be performing CPU intensive computation
    # in real models
    time.sleep(0.2)
    output_array = input_array + 1
    return output_array.astype(int).tolist()


# __doc_define_servable_v0_end__

# __doc_deploy_begin__
serve.init()
serve.create_endpoint("adder", "/adder", methods=["GET"])
serve.create_backend("adder:v0", batch_adder_v0, config={"max_batch_size": 4})
serve.set_traffic("adder", {"adder:v0": 1})
# __doc_deploy_end__


# __doc_query_begin__
@ray.remote
def send_query(number):
    """Send one HTTP query to the ``adder`` endpoint and return its result.

    Args:
        number: The value to send as the ``number`` query parameter.

    Returns:
        The response body parsed as an int.

    Raises:
        requests.HTTPError: If the endpoint answers with an error status.
    """
    resp = requests.get(
        "http://localhost:8000/adder", params={"number": number})
    # Fail loudly on an HTTP error instead of trying to parse an error page
    # as an integer.
    resp.raise_for_status()
    return int(resp.text)


# Let's use Ray to send all queries in parallel
results = ray.get([send_query.remote(i) for i in range(9)])
print("Result returned:", results)
# Output
# (pid=...) Our input array has shape: (1,)
# (pid=...) Our input array has shape: (4,)
# (pid=...) Our input array has shape: (4,)
# Result returned: [1, 2, 3, 4, 5, 6, 7, 8, 9]
# __doc_query_end__


# __doc_define_servable_v1_begin__
@serve.accept_batch
def batch_adder_v1(flask_requests: List, *, numbers: Optional[List] = None):
    """Add 1 to every number in a batch coming from the web or from Python.

    Args:
        flask_requests: The batched requests. In the web context each one
            is expected to carry an integer ``number`` in its query
            arguments; in the Python context this argument is not read.
        numbers: The batched Python arguments, used only when the batch was
            not issued from the web.

    Returns:
        A list of ints, one per query, in the same order as the input.
    """
    # Depending on request context, we process the input data differently.
    print("Current context is", "web" if serve.context.web else "python")
    if serve.context.web:
        # If the requests come from web request, we parse the flask request
        # to numbers
        numbers = [int(request.args["number"]) for request in flask_requests]
    elif numbers is None:
        # Otherwise, we are processing requests invoked directly from Python
        # and use ``numbers`` as given. A fresh list stands in for a missing
        # argument so no default list object is shared between calls.
        numbers = []

    input_array = np.array(numbers)
    print("Our input array has shape:", input_array.shape)
    # Sleep for 200ms, this could be performing CPU intensive computation
    # in real models
    time.sleep(0.2)
    output_array = input_array + 1
    return output_array.astype(int).tolist()


# __doc_define_servable_v1_end__

# __doc_deploy_v1_begin__
serve.create_backend("adder:v1", batch_adder_v1, config={"max_batch_size": 4})
serve.set_traffic("adder", {"adder:v1": 1})
# __doc_deploy_v1_end__

# __doc_query_handle_begin__
handle = serve.get_handle("adder")
print(handle)
# Output
# RayServeHandle(
#    Endpoint="adder",
#    Traffic={'adder:v1': 1}
# )

input_batch = list(range(9))
print("Input batch is", input_batch)
# Input batch is [0, 1, 2, 3, 4, 5, 6, 7, 8]

result_batch = ray.get([handle.remote(numbers=i) for i in input_batch])
# Output
# (pid=...) Current context is python
# (pid=...) Our input array has shape: (1,)
# (pid=...) Current context is python
# (pid=...) Our input array has shape: (4,)
# (pid=...) Current context is python
# (pid=...) Our input array has shape: (4,)

print("Result batch is", result_batch)
# Result batch is [1, 2, 3, 4, 5, 6, 7, 8, 9]
# __doc_query_handle_end__