diff --git a/python/ray/rllib/utils/compression.py b/python/ray/rllib/utils/compression.py index dcf07eb6a..24176285b 100644 --- a/python/ray/rllib/utils/compression.py +++ b/python/ray/rllib/utils/compression.py @@ -2,31 +2,60 @@ from __future__ import absolute_import from __future__ import division from __future__ import print_function +import time import base64 +import numpy as np import pyarrow try: - import snappy - SNAPPY_ENABLED = True + import lz4.frame + LZ4_ENABLED = True except ImportError: - print("WARNING: python-snappy not available, disabling sample compression") - SNAPPY_ENABLED = False + print( + "WARNING: lz4 not available, disabling sample compression. " + "This will significantly impact RLlib performance. " + "To install lz4, run `pip install lz4`.") + LZ4_ENABLED = False def pack(data): - if SNAPPY_ENABLED: - data = snappy.compress( - pyarrow.serialize(data).to_buffer().to_pybytes()) + if LZ4_ENABLED: + data = pyarrow.serialize(data).to_buffer().to_pybytes() + data = lz4.frame.compress(data) # TODO(ekl) we shouldn't need to base64 encode this data, but this # seems to not survive a transfer through the object store if we don't. - return base64.b64encode(data) - else: - return data + data = base64.b64encode(data) + return data def unpack(data): - if SNAPPY_ENABLED: + if LZ4_ENABLED: data = base64.b64decode(data) - return pyarrow.deserialize(snappy.decompress(data)) - else: - return data + data = lz4.frame.decompress(data) + data = pyarrow.deserialize(data) + return data + + +# Intel(R) Core(TM) i7-4600U CPU @ 2.10GHz +# Compression speed: 753.664 MB/s +# Compression ratio: 87.4839812046 +# Decompression speed: 910.9504 MB/s +if __name__ == "__main__": + size = 32 * 80 * 80 * 4 + data = np.ones(size).reshape((32, 80, 80, 4)) + + count = 0 + start = time.time() + while time.time() - start < 1: + pack(data) + count += 1 + compressed = pack(data) + print("Compression speed: {} MB/s".format(count * size * 4 / 1e6)) + print("Compression ratio: {}".format(round(size * 4 / len(compressed), 2))) + + count = 0 + start = time.time() + while time.time() - start < 1: + unpack(compressed) + count += 1 + print("Decompression speed: {} MB/s".format(count * size * 4 / 1e6)) diff --git a/python/setup.py b/python/setup.py index ff736dacd..f6dcfd62d 100644 --- a/python/setup.py +++ b/python/setup.py @@ -56,7 +56,7 @@ optional_ray_files += ray_autoscaler_files extras = { "rllib": [ "tensorflow", "pyyaml", "gym[atari]", "opencv-python", - "python-snappy", "scipy"] + "lz4", "scipy"] }