diff --git a/doc/source/example-streaming.rst b/doc/source/example-streaming.rst new file mode 100644 index 000000000..a066f2188 --- /dev/null +++ b/doc/source/example-streaming.rst @@ -0,0 +1,151 @@ +Streaming MapReduce +=================== + +This document walks through how to implement a simple streaming application +using Ray's actor capabilities. It implements a streaming MapReduce which +computes word counts on wikipedia articles. + +You can view the `code for this example`_. + +.. _`code for this example`: https://github.com/ray-project/ray/tree/master/examples/streaming + +To run the example, you need to install the dependencies + +.. code-block:: bash + + pip install wikipedia + + +and then execute the script as follows: + +.. code-block:: bash + + python ray/examples/streaming/streaming.py + +For each round of articles read, the script will output +the top 10 words in these articles together with their word count: + +.. code-block:: text + + article index = 0 + the 2866 + of 1688 + and 1448 + in 1101 + to 593 + a 553 + is 509 + as 325 + are 284 + by 261 + article index = 1 + the 3597 + of 1971 + and 1735 + in 1429 + to 670 + a 623 + is 578 + as 401 + by 293 + for 285 + article index = 2 + the 3910 + of 2123 + and 1890 + in 1468 + to 658 + a 653 + is 488 + as 364 + by 362 + for 297 + article index = 3 + the 2962 + of 1667 + and 1472 + in 1220 + a 546 + to 538 + is 516 + as 307 + by 253 + for 243 + article index = 4 + the 3523 + of 1866 + and 1690 + in 1475 + to 645 + a 583 + is 572 + as 352 + by 318 + for 306 + ... + +Note that this examples uses `distributed actor handles`_, which are still +considered experimental. + +.. _`distributed actor handles`: http://ray.readthedocs.io/en/latest/actors.html + +There is a ``Mapper`` actor, which has a method ``get_range`` used to retrieve +word counts for words in a certain range: + +.. code-block:: python + + @ray.remote + class Mapper(object): + + def __init__(self, title_stream): + # Constructor, the title stream parameter is a stream of wikipedia + # article titles that will be read by this mapper + + def get_range(self, article_index, keys): + # Return counts of all the words with first + # letter between keys[0] and keys[1] in the + # articles that haven't been read yet with index + # up to article_index + +The ``Reducer`` actor holds a list of mappers, calls ``get_range`` on them +and accumulates the results. + +.. code-block:: python + + @ray.remote + class Reducer(object): + + def __init__(self, keys, *mappers): + # Constructor for a reducer that gets input from the list of mappers + # in the argument and accumulates word counts for words with first + # letter between keys[0] and keys[1] + + def next_reduce_result(self, article_index): + # Get articles up to article_index that haven't been read yet, + # accumulate the word counts and return them + +On the driver, we then create a number of mappers and reducers and run the +streaming MapReduce: + +.. code-block:: python + + streams = # Create list of num_mappers streams + keys = # Partition the keys among the reducers. + + # Create a number of mappers. + mappers = [Mapper.remote(stream) for stream in streams] + + # Create a number of reduces, each responsible for a different range of keys. + # This gives each Reducer actor a handle to each Mapper actor. + reducers = [Reducer.remote(key, *mappers) for key in keys] + + article_index = 0 + while True: + counts = ray.get([reducer.next_reduce_result.remote(article_index) + for reducer in reducers]) + article_index += 1 + +The actual example reads a list of articles and creates a stream object which +produces an infinite stream of articles from the list. This is a toy example +meant to illustrate the idea. In practice we would produce a stream of +non-repeating items for each mapper. diff --git a/doc/source/index.rst b/doc/source/index.rst index 402b0ccef..80c351044 100644 --- a/doc/source/index.rst +++ b/doc/source/index.rst @@ -59,6 +59,7 @@ Example Program example-lbfgs.rst example-evolution-strategies.rst example-cython.rst + example-streaming.rst using-ray-with-tensorflow.rst .. toctree:: diff --git a/examples/streaming/articles.txt b/examples/streaming/articles.txt new file mode 100644 index 000000000..0bb455fd3 --- /dev/null +++ b/examples/streaming/articles.txt @@ -0,0 +1,8 @@ +New York City +Berlin +London +Paris +United States +Germany +France +United Kingdom diff --git a/examples/streaming/streaming.py b/examples/streaming/streaming.py new file mode 100644 index 000000000..92184ef9d --- /dev/null +++ b/examples/streaming/streaming.py @@ -0,0 +1,104 @@ +import argparse +from collections import Counter, defaultdict +import heapq +import numpy as np +import os +import ray +import wikipedia + +parser = argparse.ArgumentParser() +parser.add_argument("--num-mappers", + help="number of mapper actors used", default=3) +parser.add_argument("--num-reducers", + help="number of reducer actors used", default=4) + + +@ray.remote +class Mapper(object): + def __init__(self, title_stream): + self.title_stream = title_stream + self.num_articles_processed = 0 + self.articles = [] + self.word_counts = [] + + def get_new_article(self): + # Get the next wikipedia article. + article = wikipedia.page(self.title_stream.next()).content + # Count the words and store the result. + self.word_counts.append(Counter(article.split(" "))) + self.num_articles_processed += 1 + + def get_range(self, article_index, keys): + # Process more articles if this Mapper hasn't processed enough yet. + while self.num_articles_processed < article_index + 1: + self.get_new_article() + # Return the word counts from within a given character range. + return [(k, v) for k, v in self.word_counts[article_index].items() + if len(k) >= 1 and k[0] >= keys[0] and k[0] <= keys[1]] + + +@ray.remote +class Reducer(object): + def __init__(self, keys, *mappers): + self.mappers = mappers + self.keys = keys + + def next_reduce_result(self, article_index): + word_count_sum = defaultdict(lambda: 0) + # Get the word counts for this Reducer's keys from all of the Mappers + # and aggregate the results. + count_ids = [mapper.get_range.remote(article_index, self.keys) + for mapper in self.mappers] + # TODO(rkn): We should process these out of order using ray.wait. + for count_id in count_ids: + for k, v in ray.get(count_id): + word_count_sum[k] += v + return word_count_sum + + +class Stream(object): + def __init__(self, elements): + self.elements = elements + + def next(self): + i = np.random.randint(0, len(self.elements)) + return self.elements[i] + + +if __name__ == "__main__": + args = parser.parse_args() + + ray.init() + + # Create one streaming source of articles per mapper. + directory = os.path.dirname(os.path.realpath(__file__)) + streams = [] + for _ in range(args.num_mappers): + with open(os.path.join(directory, "articles.txt")) as f: + streams.append(Stream([line.strip() for line in f.readlines()])) + + # Partition the keys among the reducers. + chunks = np.array_split([chr(i) for i in range(ord("a"), ord("z") + 1)], + args.num_reducers) + keys = [[chunk[0], chunk[-1]] for chunk in chunks] + + # Create a number of mappers. + mappers = [Mapper.remote(stream) for stream in streams] + + # Create a number of reduces, each responsible for a different range of + # keys. This gives each Reducer actor a handle to each Mapper actor. + reducers = [Reducer.remote(key, *mappers) for key in keys] + + article_index = 0 + while True: + print("article index = {}".format(article_index)) + wordcounts = dict() + counts = ray.get([reducer.next_reduce_result.remote(article_index) + for reducer in reducers]) + for count in counts: + wordcounts.update(count) + most_frequent_words = heapq.nlargest(10, wordcounts, + key=wordcounts.get) + for word in most_frequent_words: + print(" ", word, wordcounts[word]) + article_index += 1