# Extracted from the patch "[DataFrame] Implement Memoizer (#2157)"
# (commit a139a5df8cce608fc488b365d694eea60f17b974). The patch adds the code
# below to python/ray/dataframe/utils.py and stacks ``@memoize`` on top of the
# ``@ray.remote`` functions ``_blocks_to_col`` and ``_blocks_to_row``.
import collections

_MEMOIZER_CAPACITY = 1000  # Capacity per function


class LRUCache(object):
    """A least-recently-used cache backed by collections.OrderedDict.

    Notes:
        - OrderedDict records the order in which items are inserted.
        - The head of the queue holds the least recently used items; the
          tail holds the most recently used ones.

    Args:
        capacity: Maximum number of entries to retain. A capacity of zero or
            less disables storage entirely (every lookup is a miss).
    """

    def __init__(self, capacity):
        self.capacity = capacity
        self.cache = collections.OrderedDict()

    def __contains__(self, key):
        """Return True if key is cached. Does not refresh its recency."""
        return key in self.cache

    def __len__(self):
        """Return the number of entries currently cached."""
        return len(self.cache)

    def __getitem__(self, key):
        """Retrieve item from cache and re-insert it at the back of the queue.

        Raises:
            KeyError: If key is not in the cache.
        """
        # pop + re-insert (rather than OrderedDict.move_to_end) keeps this
        # working on Python 2 as well.
        value = self.cache.pop(key)
        self.cache[key] = value
        return value

    def __setitem__(self, key, value):
        """Store value under key, evicting the LRU entry if the cache is full.
        """
        if self.capacity <= 0:
            # Nothing can be retained. Without this guard, popitem() below
            # would raise KeyError on the empty OrderedDict.
            return

        # Drop any existing entry so the key is re-inserted at the back.
        self.cache.pop(key, None)

        if len(self.cache) >= self.capacity:
            # Pop the oldest item at the beginning of the queue
            self.cache.popitem(last=False)

        self.cache[key] = value


class memoize(object):
    """A basic memoizer that caches the input and output of a remote function

    Notes:
        - How is this implemented?
            This memoizer is implemented by adding a caching layer to the
            remote function's remote attribute. When the user calls
            f.remote(*args), we first check against the cache, and only call
            the ray remote function if the return value is not cached.
        - When should this be used?
            This should be used when we anticipate temporal locality for the
            function. For example, we can reasonably assume users will
            perform columnar operations repetitively over time (like sum()
            or loc[]).
        - Unhashable arguments
            The argument tuple is used as the cache key, so it must be
            hashable. If any argument is unhashable (e.g. a list or dict),
            the cache is bypassed and the remote function is called
            directly.
        - Caveat
            Don't use this decorator if a *hashable* argument to the remote
            function can be mutated in place (e.g. an instance of a class
            that uses the default identity-based hash). The following
            snippet returns a stale result:
            ```py
            @memoize
            @ray.remote
            def f(obj):
                ...

            class Box(object):
                def __init__(self):
                    self.items = [1]

            box = Box()
            oid_1 = f.remote(box)  # will be cached

            box.items.append(3)
            oid_2 = f.remote(box)  # cache hit!

            oid_1 == oid_2  # True!
            ```
            In short, use this function sparingly. The ideal case is that
            all inputs are ray ObjectIDs because they are immutable objects.
        - Future Development
            - Fix the mutability caveat for hashable mutable objects
            - Dynamic cache size (Fixed as 1000 for now)
    """

    def __init__(self, f):
        # Save the original remote function; only its ``remote`` attribute
        # is wrapped.
        self.old_remote_func = f.remote
        self.cache = LRUCache(capacity=_MEMOIZER_CAPACITY)

    def remote(self, *args):
        """Return cached result if the arguments are cached

        Args:
            *args: Positional arguments forwarded to the wrapped function's
                ``remote``.

        Returns:
            Whatever the wrapped ``remote`` returned for these arguments,
            either freshly computed or taken from the cache.
        """
        try:
            hash(args)
        except TypeError:
            # Unhashable arguments cannot be used as a cache key; fall back
            # to an uncached call instead of crashing.
            return self.old_remote_func(*args)

        if args in self.cache:
            return self.cache[args]

        result = self.old_remote_func(*args)
        self.cache[args] = result
        return result