From 3c48783a164b42de69f85d3f1076156de2bcbef4 Mon Sep 17 00:00:00 2001 From: adgirish Date: Fri, 20 Apr 2018 18:33:08 -0700 Subject: [PATCH] [DataFrame] Adding read methods and tests (#1712) * Adding read methods and tests * Referencing internal partition method so constructors are more canonical with Pandas * Fixing to reference from_pandas in utils * Cleaning up unused imports * rerunning tests * fixing flake8 * resolving errors * Added sql and sas test * updating * Temporarily phasing out read_csv code for wrapper while diagnosing, added io tests to travis * Adding travis * restoring distributed read csv * resolving rebases * lint * Sampling out HD test * adding dep * fix pathing * Flagging out tests * resolving read_method issues * fix build issue * move additional dependencies to extras * fixing lint * removing IO dependencies * updated requirements doc --- .travis.yml | 1 + .travis/install-dependencies.sh | 12 +- python/ray/dataframe/io.py | 212 +++++++++++++++++++ python/ray/dataframe/test/test_io.py | 304 ++++++++++++++++++++++++++- 4 files changed, 518 insertions(+), 11 deletions(-) diff --git a/.travis.yml b/.travis.yml index aa04e3a7c..ca5650f89 100644 --- a/.travis.yml +++ b/.travis.yml @@ -146,6 +146,7 @@ script: - python -m pytest python/ray/dataframe/test/test_dataframe.py - python -m pytest python/ray/dataframe/test/test_series.py - python -m pytest python/ray/dataframe/test/test_concat.py + - python -m pytest python/ray/dataframe/test/test_io.py # ray tune tests - python python/ray/tune/test/dependency_test.py diff --git a/.travis/install-dependencies.sh b/.travis/install-dependencies.sh index 37e2b2b2b..388c3574b 100755 --- a/.travis/install-dependencies.sh +++ b/.travis/install-dependencies.sh @@ -24,7 +24,8 @@ if [[ "$PYTHON" == "2.7" ]] && [[ "$platform" == "linux" ]]; then wget https://repo.continuum.io/miniconda/Miniconda2-latest-Linux-x86_64.sh -O miniconda.sh -nv bash miniconda.sh -b -p $HOME/miniconda export PATH="$HOME/miniconda/bin:$PATH" - pip install -q cython==0.27.3 cmake tensorflow gym opencv-python pyyaml pandas requests + pip install -q cython==0.27.3 cmake tensorflow gym opencv-python pyyaml pandas requests \ + feather-format lxml openpyxl xlrd elif [[ "$PYTHON" == "3.5" ]] && [[ "$platform" == "linux" ]]; then sudo apt-get update sudo apt-get install -y cmake pkg-config python-dev python-numpy build-essential autoconf curl libtool unzip @@ -32,7 +33,8 @@ elif [[ "$PYTHON" == "3.5" ]] && [[ "$platform" == "linux" ]]; then wget https://repo.continuum.io/miniconda/Miniconda3-latest-Linux-x86_64.sh -O miniconda.sh -nv bash miniconda.sh -b -p $HOME/miniconda export PATH="$HOME/miniconda/bin:$PATH" - pip install -q cython==0.27.3 cmake tensorflow gym opencv-python pyyaml pandas requests + pip install -q cython==0.27.3 cmake tensorflow gym opencv-python pyyaml pandas requests \ + feather-format lxml openpyxl xlrd elif [[ "$PYTHON" == "2.7" ]] && [[ "$platform" == "macosx" ]]; then # check that brew is installed which -s brew @@ -48,7 +50,8 @@ elif [[ "$PYTHON" == "2.7" ]] && [[ "$platform" == "macosx" ]]; then wget https://repo.continuum.io/miniconda/Miniconda2-latest-MacOSX-x86_64.sh -O miniconda.sh -nv bash miniconda.sh -b -p $HOME/miniconda export PATH="$HOME/miniconda/bin:$PATH" - pip install -q cython==0.27.3 cmake tensorflow gym opencv-python pyyaml pandas requests + pip install -q cython==0.27.3 cmake tensorflow gym opencv-python pyyaml pandas requests \ + feather-format lxml openpyxl xlrd elif [[ "$PYTHON" == "3.5" ]] && [[ "$platform" == "macosx" ]]; then # check that brew is installed which -s brew @@ -64,7 +67,8 @@ elif [[ "$PYTHON" == "3.5" ]] && [[ "$platform" == "macosx" ]]; then wget https://repo.continuum.io/miniconda/Miniconda3-latest-MacOSX-x86_64.sh -O miniconda.sh -nv bash miniconda.sh -b -p $HOME/miniconda export PATH="$HOME/miniconda/bin:$PATH" - pip install -q cython==0.27.3 cmake tensorflow gym opencv-python pyyaml pandas requests + pip install -q cython==0.27.3 cmake tensorflow gym opencv-python pyyaml pandas requests \ + feather-format lxml openpyxl xlrd elif [[ "$LINT" == "1" ]]; then sudo apt-get update sudo apt-get install -y cmake build-essential autoconf curl libtool unzip diff --git a/python/ray/dataframe/io.py b/python/ray/dataframe/io.py index fdfe04cfc..c1abc0ec4 100644 --- a/python/ray/dataframe/io.py +++ b/python/ray/dataframe/io.py @@ -6,12 +6,14 @@ from itertools import chain from io import BytesIO import os import re +import warnings from pyarrow.parquet import ParquetFile import pandas as pd from .dataframe import ray, DataFrame from . import get_npartitions +from .utils import from_pandas # Parquet @@ -189,6 +191,7 @@ def read_csv(filepath, We only support local files for now. kwargs: Keyword arguments in pandas::from_csv """ + kwargs = dict( sep=sep, delimiter=delimiter, @@ -260,3 +263,212 @@ def read_csv(filepath, df_obj_ids.append(df) return DataFrame(row_partitions=df_obj_ids, columns=columns) + + +def read_json(path_or_buf=None, + orient=None, + typ='frame', + dtype=True, + convert_axes=True, + convert_dates=True, + keep_default_dates=True, + numpy=False, + precise_float=False, + date_unit=None, + encoding=None, + lines=False, + chunksize=None, + compression='infer'): + + warnings.warn("Defaulting to Pandas implementation", + PendingDeprecationWarning) + + port_frame = pd.read_json(path_or_buf, orient, typ, dtype, + convert_axes, convert_dates, keep_default_dates, + numpy, precise_float, date_unit, encoding, + lines, chunksize, compression) + ray_frame = from_pandas(port_frame, get_npartitions()) + + return ray_frame + + +def read_html(io, + match='.+', + flavor=None, + header=None, + index_col=None, + skiprows=None, + attrs=None, + parse_dates=False, + tupleize_cols=None, + thousands=',', + encoding=None, + decimal='.', + converters=None, + na_values=None, + keep_default_na=True): + + warnings.warn("Defaulting to Pandas implementation", + PendingDeprecationWarning) + + port_frame = pd.read_html(io, match, flavor, header, index_col, + skiprows, attrs, parse_dates, tupleize_cols, + thousands, encoding, decimal, converters, + na_values, keep_default_na) + ray_frame = from_pandas(port_frame[0], get_npartitions()) + + return ray_frame + + +def read_clipboard(sep=r'\s+'): + + warnings.warn("Defaulting to Pandas implementation", + PendingDeprecationWarning) + + port_frame = pd.read_clipboard(sep) + ray_frame = from_pandas(port_frame, get_npartitions()) + + return ray_frame + + +def read_excel(io, + sheet_name=0, + header=0, + skiprows=None, + skip_footer=0, + index_col=None, + names=None, + usecols=None, + parse_dates=False, + date_parser=None, + na_values=None, + thousands=None, + convert_float=True, + converters=None, + dtype=None, + true_values=None, + false_values=None, + engine=None, + squeeze=False): + + warnings.warn("Defaulting to Pandas implementation", + PendingDeprecationWarning) + + port_frame = pd.read_excel(io, sheet_name, header, skiprows, skip_footer, + index_col, names, usecols, parse_dates, + date_parser, na_values, thousands, + convert_float, converters, dtype, true_values, + false_values, engine, squeeze) + ray_frame = from_pandas(port_frame, get_npartitions()) + + return ray_frame + + +def read_hdf(path_or_buf, + key=None, + mode='r'): + + warnings.warn("Defaulting to Pandas implementation", + PendingDeprecationWarning) + + port_frame = pd.read_hdf(path_or_buf, key, mode) + ray_frame = from_pandas(port_frame, get_npartitions()) + + return ray_frame + + +def read_feather(path, + nthreads=1): + + warnings.warn("Defaulting to Pandas implementation", + PendingDeprecationWarning) + + port_frame = pd.read_feather(path) + ray_frame = from_pandas(port_frame, get_npartitions()) + + return ray_frame + + +def read_msgpack(path_or_buf, + encoding='utf-8', + iterator=False): + + warnings.warn("Defaulting to Pandas implementation", + PendingDeprecationWarning) + + port_frame = pd.read_msgpack(path_or_buf, encoding, iterator) + ray_frame = from_pandas(port_frame, get_npartitions()) + + return ray_frame + + +def read_stata(filepath_or_buffer, + convert_dates=True, + convert_categoricals=True, + encoding=None, + index_col=None, + convert_missing=False, + preserve_dtypes=True, + columns=None, + order_categoricals=True, + chunksize=None, + iterator=False): + + warnings.warn("Defaulting to Pandas implementation", + PendingDeprecationWarning) + + port_frame = pd.read_stata(filepath_or_buffer, convert_dates, + convert_categoricals, encoding, index_col, + convert_missing, preserve_dtypes, columns, + order_categoricals, chunksize, iterator) + ray_frame = from_pandas(port_frame, get_npartitions()) + + return ray_frame + + +def read_sas(filepath_or_buffer, + format=None, + index=None, + encoding=None, + chunksize=None, + iterator=False): + + warnings.warn("Defaulting to Pandas implementation", + PendingDeprecationWarning) + + port_frame = pd.read_sas(filepath_or_buffer, format, index, encoding, + chunksize, iterator) + ray_frame = from_pandas(port_frame, get_npartitions()) + + return ray_frame + + +def read_pickle(path, + compression='infer'): + + warnings.warn("Defaulting to Pandas implementation", + PendingDeprecationWarning) + + port_frame = pd.read_pickle(path, compression) + ray_frame = from_pandas(port_frame, get_npartitions()) + + return ray_frame + + +def read_sql(sql, + con, + index_col=None, + coerce_float=True, + params=None, + parse_dates=None, + columns=None, + chunksize=None): + + warnings.warn("Defaulting to Pandas implementation", + PendingDeprecationWarning) + + port_frame = pd.read_sql(sql, con, index_col, coerce_float, params, + parse_dates, columns, chunksize) + ray_frame = from_pandas(port_frame, get_npartitions()) + + return ray_frame diff --git a/python/ray/dataframe/test/test_io.py b/python/ray/dataframe/test/test_io.py index 00b0a06ad..2bfbf7c43 100644 --- a/python/ray/dataframe/test/test_io.py +++ b/python/ray/dataframe/test/test_io.py @@ -5,13 +5,23 @@ from __future__ import print_function import pytest import numpy as np import pandas as pd +from ray.dataframe.utils import to_pandas import ray.dataframe.io as io import os - -from ray.dataframe.utils import to_pandas +import sqlite3 TEST_PARQUET_FILENAME = 'test.parquet' TEST_CSV_FILENAME = 'test.csv' +TEST_JSON_FILENAME = 'test.json' +TEST_HTML_FILENAME = 'test.html' +TEST_EXCEL_FILENAME = 'test.xlsx' +TEST_FEATHER_FILENAME = 'test.feather' +TEST_HDF_FILENAME = 'test.hdf' +TEST_MSGPACK_FILENAME = 'test.msg' +TEST_STATA_FILENAME = 'test.dta' +TEST_PICKLE_FILENAME = 'test.pkl' +TEST_SAS_FILENAME = os.getcwd() + '/data/test1.sas7bdat' +TEST_SQL_FILENAME = 'test.db' SMALL_ROW_SIZE = 2000 LARGE_ROW_SIZE = 7e6 @@ -57,6 +67,178 @@ def teardown_csv_file(): os.remove(TEST_CSV_FILENAME) +@pytest.fixture +def setup_json_file(row_size, force=False): + if os.path.exists(TEST_JSON_FILENAME) and not force: + pass + else: + df = pd.DataFrame({ + 'col1': np.arange(row_size), + 'col2': np.arange(row_size) + }) + df.to_json(TEST_JSON_FILENAME) + + +@pytest.fixture +def teardown_json_file(): + if os.path.exists(TEST_JSON_FILENAME): + os.remove(TEST_JSON_FILENAME) + + +@pytest.fixture +def setup_html_file(row_size, force=False): + if os.path.exists(TEST_HTML_FILENAME) and not force: + pass + else: + df = pd.DataFrame({ + 'col1': np.arange(row_size), + 'col2': np.arange(row_size) + }) + df.to_html(TEST_HTML_FILENAME) + + +@pytest.fixture +def teardown_html_file(): + if os.path.exists(TEST_HTML_FILENAME): + os.remove(TEST_HTML_FILENAME) + + +@pytest.fixture +def setup_clipboard(row_size, force=False): + df = pd.DataFrame({ + 'col1': np.arange(row_size), + 'col2': np.arange(row_size) + }) + df.to_clipboard() + + +@pytest.fixture +def setup_excel_file(row_size, force=False): + if os.path.exists(TEST_EXCEL_FILENAME) and not force: + pass + else: + df = pd.DataFrame({ + 'col1': np.arange(row_size), + 'col2': np.arange(row_size) + }) + df.to_excel(TEST_EXCEL_FILENAME) + + +@pytest.fixture +def teardown_excel_file(): + if os.path.exists(TEST_EXCEL_FILENAME): + os.remove(TEST_EXCEL_FILENAME) + + +@pytest.fixture +def setup_feather_file(row_size, force=False): + if os.path.exists(TEST_FEATHER_FILENAME) and not force: + pass + else: + df = pd.DataFrame({ + 'col1': np.arange(row_size), + 'col2': np.arange(row_size) + }) + df.to_feather(TEST_FEATHER_FILENAME) + + +@pytest.fixture +def teardown_feather_file(): + if os.path.exists(TEST_FEATHER_FILENAME): + os.remove(TEST_FEATHER_FILENAME) + + +@pytest.fixture +def setup_hdf_file(row_size, force=False): + if os.path.exists(TEST_HDF_FILENAME) and not force: + pass + else: + df = pd.DataFrame({ + 'col1': np.arange(row_size), + 'col2': np.arange(row_size) + }) + df.to_hdf(TEST_HDF_FILENAME, 'test') + + +@pytest.fixture +def teardown_hdf_file(): + if os.path.exists(TEST_HDF_FILENAME): + os.remove(TEST_HDF_FILENAME) + + +@pytest.fixture +def setup_msgpack_file(row_size, force=False): + if os.path.exists(TEST_MSGPACK_FILENAME) and not force: + pass + else: + df = pd.DataFrame({ + 'col1': np.arange(row_size), + 'col2': np.arange(row_size) + }) + df.to_msgpack(TEST_MSGPACK_FILENAME) + + +@pytest.fixture +def teardown_msgpack_file(): + if os.path.exists(TEST_MSGPACK_FILENAME): + os.remove(TEST_MSGPACK_FILENAME) + + +@pytest.fixture +def setup_stata_file(row_size, force=False): + if os.path.exists(TEST_STATA_FILENAME) and not force: + pass + else: + df = pd.DataFrame({ + 'col1': np.arange(row_size), + 'col2': np.arange(row_size) + }) + df.to_stata(TEST_STATA_FILENAME) + + +@pytest.fixture +def teardown_stata_file(): + if os.path.exists(TEST_STATA_FILENAME): + os.remove(TEST_STATA_FILENAME) + + +@pytest.fixture +def setup_pickle_file(row_size, force=False): + if os.path.exists(TEST_PICKLE_FILENAME) and not force: + pass + else: + df = pd.DataFrame({ + 'col1': np.arange(row_size), + 'col2': np.arange(row_size) + }) + df.to_pickle(TEST_PICKLE_FILENAME) + + +@pytest.fixture +def teardown_pickle_file(): + if os.path.exists(TEST_PICKLE_FILENAME): + os.remove(TEST_PICKLE_FILENAME) + + +@pytest.fixture +def setup_sql_file(conn, force=False): + if os.path.exists(TEST_SQL_FILENAME) and not force: + pass + else: + df = pd.DataFrame({'col1': [0, 1, 2, 3], + 'col2': [4, 5, 6, 7], + 'col3': [8, 9, 10, 11], + 'col4': [12, 13, 14, 15], + 'col5': [0, 0, 0, 0]}) + df.to_sql(TEST_SQL_FILENAME.split(".")[0], conn) + + +@pytest.fixture +def teardown_sql_file(): + if os.path.exists(TEST_SQL_FILENAME): + os.remove(TEST_SQL_FILENAME) + + def test_from_parquet_small(): setup_parquet_file(SMALL_ROW_SIZE) @@ -90,12 +272,120 @@ def test_from_csv(): teardown_csv_file() -def test_from_csv_delimiter(): - setup_csv_file(SMALL_ROW_SIZE, delimiter='|') +def test_from_json(): + setup_json_file(SMALL_ROW_SIZE) - pd_df = pd.read_csv(TEST_CSV_FILENAME) - ray_df = io.read_csv(TEST_CSV_FILENAME) + pd_df = pd.read_json(TEST_JSON_FILENAME) + ray_df = io.read_json(TEST_JSON_FILENAME) assert ray_df_equals_pandas(ray_df, pd_df) - teardown_csv_file() + teardown_json_file() + + +def test_from_html(): + setup_html_file(SMALL_ROW_SIZE) + + pd_df = pd.read_html(TEST_HTML_FILENAME)[0] + ray_df = io.read_html(TEST_HTML_FILENAME) + + assert ray_df_equals_pandas(ray_df, pd_df) + + teardown_html_file() + + +@pytest.mark.skip(reason="No clipboard on Travis") +def test_from_clipboard(): + setup_clipboard(SMALL_ROW_SIZE) + + pd_df = pd.read_clipboard() + ray_df = io.read_clipboard() + + assert ray_df_equals_pandas(ray_df, pd_df) + + +def test_from_excel(): + setup_excel_file(SMALL_ROW_SIZE) + + pd_df = pd.read_excel(TEST_EXCEL_FILENAME) + ray_df = io.read_excel(TEST_EXCEL_FILENAME) + + assert ray_df_equals_pandas(ray_df, pd_df) + + teardown_excel_file() + + +def test_from_feather(): + setup_feather_file(SMALL_ROW_SIZE) + + pd_df = pd.read_feather(TEST_FEATHER_FILENAME) + ray_df = io.read_feather(TEST_FEATHER_FILENAME) + + assert ray_df_equals_pandas(ray_df, pd_df) + + teardown_feather_file() + + +@pytest.mark.skip(reason="Memory overflow on Travis") +def test_from_hdf(): + setup_hdf_file(SMALL_ROW_SIZE) + + pd_df = pd.read_hdf(TEST_HDF_FILENAME, key='test') + ray_df = io.read_hdf(TEST_HDF_FILENAME, key='test') + + assert ray_df_equals_pandas(ray_df, pd_df) + + teardown_hdf_file() + + +def test_from_msgpack(): + setup_msgpack_file(SMALL_ROW_SIZE) + + pd_df = pd.read_msgpack(TEST_MSGPACK_FILENAME) + ray_df = io.read_msgpack(TEST_MSGPACK_FILENAME) + + assert ray_df_equals_pandas(ray_df, pd_df) + + teardown_msgpack_file() + + +def test_from_stata(): + setup_stata_file(SMALL_ROW_SIZE) + + pd_df = pd.read_stata(TEST_STATA_FILENAME) + ray_df = io.read_stata(TEST_STATA_FILENAME) + + assert ray_df_equals_pandas(ray_df, pd_df) + + teardown_stata_file() + + +def test_from_pickle(): + setup_pickle_file(SMALL_ROW_SIZE) + + pd_df = pd.read_pickle(TEST_PICKLE_FILENAME) + ray_df = io.read_pickle(TEST_PICKLE_FILENAME) + + assert ray_df_equals_pandas(ray_df, pd_df) + + teardown_pickle_file() + + +def test_from_sql(): + conn = sqlite3.connect(TEST_SQL_FILENAME) + setup_sql_file(conn, True) + + pd_df = pd.read_sql("select * from test", conn) + ray_df = io.read_sql("select * from test", conn) + + assert ray_df_equals_pandas(ray_df, pd_df) + + teardown_sql_file() + + +@pytest.mark.skip(reason="No SAS write methods in Pandas") +def test_from_sas(): + pd_df = pd.read_sas(TEST_SAS_FILENAME) + ray_df = io.read_sas(TEST_SAS_FILENAME) + + assert ray_df_equals_pandas(ray_df, pd_df)