[DataFrame] Adding read methods and tests (#1712)

* Adding read methods and tests

* Referencing internal partition method so constructors are more canonical with Pandas

* Fixing to reference from_pandas in utils

* Cleaning up unused imports

* rerunning tests

* fixing flake8

* resolving errors

* Added sql and sas test

* updating

* Temporarily phasing out read_csv code for wrapper while diagnosing, added io tests to travis

* Adding travis

* restoring distributed read csv

* resolving rebases

* lint

* Sampling out HD test

* adding dep

* fix pathing

* Flagging out tests

* resolving read_method issues

* fix build issue

* move additional dependencies to extras

* fixing lint

* removing IO dependencies

* updated requirements doc
This commit is contained in:
adgirish
2018-04-20 18:33:08 -07:00
committed by Devin Petersohn
parent cffda73da1
commit 3c48783a16
4 changed files with 518 additions and 11 deletions
+212
View File
@@ -6,12 +6,14 @@ from itertools import chain
from io import BytesIO
import os
import re
import warnings
from pyarrow.parquet import ParquetFile
import pandas as pd
from .dataframe import ray, DataFrame
from . import get_npartitions
from .utils import from_pandas
# Parquet
@@ -189,6 +191,7 @@ def read_csv(filepath,
We only support local files for now.
kwargs: Keyword arguments in pandas::from_csv
"""
kwargs = dict(
sep=sep,
delimiter=delimiter,
@@ -260,3 +263,212 @@ def read_csv(filepath,
df_obj_ids.append(df)
return DataFrame(row_partitions=df_obj_ids, columns=columns)
def read_json(path_or_buf=None,
orient=None,
typ='frame',
dtype=True,
convert_axes=True,
convert_dates=True,
keep_default_dates=True,
numpy=False,
precise_float=False,
date_unit=None,
encoding=None,
lines=False,
chunksize=None,
compression='infer'):
warnings.warn("Defaulting to Pandas implementation",
PendingDeprecationWarning)
port_frame = pd.read_json(path_or_buf, orient, typ, dtype,
convert_axes, convert_dates, keep_default_dates,
numpy, precise_float, date_unit, encoding,
lines, chunksize, compression)
ray_frame = from_pandas(port_frame, get_npartitions())
return ray_frame
def read_html(io,
match='.+',
flavor=None,
header=None,
index_col=None,
skiprows=None,
attrs=None,
parse_dates=False,
tupleize_cols=None,
thousands=',',
encoding=None,
decimal='.',
converters=None,
na_values=None,
keep_default_na=True):
warnings.warn("Defaulting to Pandas implementation",
PendingDeprecationWarning)
port_frame = pd.read_html(io, match, flavor, header, index_col,
skiprows, attrs, parse_dates, tupleize_cols,
thousands, encoding, decimal, converters,
na_values, keep_default_na)
ray_frame = from_pandas(port_frame[0], get_npartitions())
return ray_frame
def read_clipboard(sep=r'\s+'):
warnings.warn("Defaulting to Pandas implementation",
PendingDeprecationWarning)
port_frame = pd.read_clipboard(sep)
ray_frame = from_pandas(port_frame, get_npartitions())
return ray_frame
def read_excel(io,
sheet_name=0,
header=0,
skiprows=None,
skip_footer=0,
index_col=None,
names=None,
usecols=None,
parse_dates=False,
date_parser=None,
na_values=None,
thousands=None,
convert_float=True,
converters=None,
dtype=None,
true_values=None,
false_values=None,
engine=None,
squeeze=False):
warnings.warn("Defaulting to Pandas implementation",
PendingDeprecationWarning)
port_frame = pd.read_excel(io, sheet_name, header, skiprows, skip_footer,
index_col, names, usecols, parse_dates,
date_parser, na_values, thousands,
convert_float, converters, dtype, true_values,
false_values, engine, squeeze)
ray_frame = from_pandas(port_frame, get_npartitions())
return ray_frame
def read_hdf(path_or_buf,
key=None,
mode='r'):
warnings.warn("Defaulting to Pandas implementation",
PendingDeprecationWarning)
port_frame = pd.read_hdf(path_or_buf, key, mode)
ray_frame = from_pandas(port_frame, get_npartitions())
return ray_frame
def read_feather(path,
nthreads=1):
warnings.warn("Defaulting to Pandas implementation",
PendingDeprecationWarning)
port_frame = pd.read_feather(path)
ray_frame = from_pandas(port_frame, get_npartitions())
return ray_frame
def read_msgpack(path_or_buf,
encoding='utf-8',
iterator=False):
warnings.warn("Defaulting to Pandas implementation",
PendingDeprecationWarning)
port_frame = pd.read_msgpack(path_or_buf, encoding, iterator)
ray_frame = from_pandas(port_frame, get_npartitions())
return ray_frame
def read_stata(filepath_or_buffer,
convert_dates=True,
convert_categoricals=True,
encoding=None,
index_col=None,
convert_missing=False,
preserve_dtypes=True,
columns=None,
order_categoricals=True,
chunksize=None,
iterator=False):
warnings.warn("Defaulting to Pandas implementation",
PendingDeprecationWarning)
port_frame = pd.read_stata(filepath_or_buffer, convert_dates,
convert_categoricals, encoding, index_col,
convert_missing, preserve_dtypes, columns,
order_categoricals, chunksize, iterator)
ray_frame = from_pandas(port_frame, get_npartitions())
return ray_frame
def read_sas(filepath_or_buffer,
format=None,
index=None,
encoding=None,
chunksize=None,
iterator=False):
warnings.warn("Defaulting to Pandas implementation",
PendingDeprecationWarning)
port_frame = pd.read_sas(filepath_or_buffer, format, index, encoding,
chunksize, iterator)
ray_frame = from_pandas(port_frame, get_npartitions())
return ray_frame
def read_pickle(path,
compression='infer'):
warnings.warn("Defaulting to Pandas implementation",
PendingDeprecationWarning)
port_frame = pd.read_pickle(path, compression)
ray_frame = from_pandas(port_frame, get_npartitions())
return ray_frame
def read_sql(sql,
con,
index_col=None,
coerce_float=True,
params=None,
parse_dates=None,
columns=None,
chunksize=None):
warnings.warn("Defaulting to Pandas implementation",
PendingDeprecationWarning)
port_frame = pd.read_sql(sql, con, index_col, coerce_float, params,
parse_dates, columns, chunksize)
ray_frame = from_pandas(port_frame, get_npartitions())
return ray_frame
+297 -7
View File
@@ -5,13 +5,23 @@ from __future__ import print_function
import pytest
import numpy as np
import pandas as pd
from ray.dataframe.utils import to_pandas
import ray.dataframe.io as io
import os
from ray.dataframe.utils import to_pandas
import sqlite3
TEST_PARQUET_FILENAME = 'test.parquet'
TEST_CSV_FILENAME = 'test.csv'
TEST_JSON_FILENAME = 'test.json'
TEST_HTML_FILENAME = 'test.html'
TEST_EXCEL_FILENAME = 'test.xlsx'
TEST_FEATHER_FILENAME = 'test.feather'
TEST_HDF_FILENAME = 'test.hdf'
TEST_MSGPACK_FILENAME = 'test.msg'
TEST_STATA_FILENAME = 'test.dta'
TEST_PICKLE_FILENAME = 'test.pkl'
TEST_SAS_FILENAME = os.getcwd() + '/data/test1.sas7bdat'
TEST_SQL_FILENAME = 'test.db'
SMALL_ROW_SIZE = 2000
LARGE_ROW_SIZE = 7e6
@@ -57,6 +67,178 @@ def teardown_csv_file():
os.remove(TEST_CSV_FILENAME)
@pytest.fixture
def setup_json_file(row_size, force=False):
if os.path.exists(TEST_JSON_FILENAME) and not force:
pass
else:
df = pd.DataFrame({
'col1': np.arange(row_size),
'col2': np.arange(row_size)
})
df.to_json(TEST_JSON_FILENAME)
@pytest.fixture
def teardown_json_file():
if os.path.exists(TEST_JSON_FILENAME):
os.remove(TEST_JSON_FILENAME)
@pytest.fixture
def setup_html_file(row_size, force=False):
if os.path.exists(TEST_HTML_FILENAME) and not force:
pass
else:
df = pd.DataFrame({
'col1': np.arange(row_size),
'col2': np.arange(row_size)
})
df.to_html(TEST_HTML_FILENAME)
@pytest.fixture
def teardown_html_file():
if os.path.exists(TEST_HTML_FILENAME):
os.remove(TEST_HTML_FILENAME)
@pytest.fixture
def setup_clipboard(row_size, force=False):
df = pd.DataFrame({
'col1': np.arange(row_size),
'col2': np.arange(row_size)
})
df.to_clipboard()
@pytest.fixture
def setup_excel_file(row_size, force=False):
if os.path.exists(TEST_EXCEL_FILENAME) and not force:
pass
else:
df = pd.DataFrame({
'col1': np.arange(row_size),
'col2': np.arange(row_size)
})
df.to_excel(TEST_EXCEL_FILENAME)
@pytest.fixture
def teardown_excel_file():
if os.path.exists(TEST_EXCEL_FILENAME):
os.remove(TEST_EXCEL_FILENAME)
@pytest.fixture
def setup_feather_file(row_size, force=False):
if os.path.exists(TEST_FEATHER_FILENAME) and not force:
pass
else:
df = pd.DataFrame({
'col1': np.arange(row_size),
'col2': np.arange(row_size)
})
df.to_feather(TEST_FEATHER_FILENAME)
@pytest.fixture
def teardown_feather_file():
if os.path.exists(TEST_FEATHER_FILENAME):
os.remove(TEST_FEATHER_FILENAME)
@pytest.fixture
def setup_hdf_file(row_size, force=False):
if os.path.exists(TEST_HDF_FILENAME) and not force:
pass
else:
df = pd.DataFrame({
'col1': np.arange(row_size),
'col2': np.arange(row_size)
})
df.to_hdf(TEST_HDF_FILENAME, 'test')
@pytest.fixture
def teardown_hdf_file():
if os.path.exists(TEST_HDF_FILENAME):
os.remove(TEST_HDF_FILENAME)
@pytest.fixture
def setup_msgpack_file(row_size, force=False):
if os.path.exists(TEST_MSGPACK_FILENAME) and not force:
pass
else:
df = pd.DataFrame({
'col1': np.arange(row_size),
'col2': np.arange(row_size)
})
df.to_msgpack(TEST_MSGPACK_FILENAME)
@pytest.fixture
def teardown_msgpack_file():
if os.path.exists(TEST_MSGPACK_FILENAME):
os.remove(TEST_MSGPACK_FILENAME)
@pytest.fixture
def setup_stata_file(row_size, force=False):
if os.path.exists(TEST_STATA_FILENAME) and not force:
pass
else:
df = pd.DataFrame({
'col1': np.arange(row_size),
'col2': np.arange(row_size)
})
df.to_stata(TEST_STATA_FILENAME)
@pytest.fixture
def teardown_stata_file():
if os.path.exists(TEST_STATA_FILENAME):
os.remove(TEST_STATA_FILENAME)
@pytest.fixture
def setup_pickle_file(row_size, force=False):
if os.path.exists(TEST_PICKLE_FILENAME) and not force:
pass
else:
df = pd.DataFrame({
'col1': np.arange(row_size),
'col2': np.arange(row_size)
})
df.to_pickle(TEST_PICKLE_FILENAME)
@pytest.fixture
def teardown_pickle_file():
if os.path.exists(TEST_PICKLE_FILENAME):
os.remove(TEST_PICKLE_FILENAME)
@pytest.fixture
def setup_sql_file(conn, force=False):
if os.path.exists(TEST_SQL_FILENAME) and not force:
pass
else:
df = pd.DataFrame({'col1': [0, 1, 2, 3],
'col2': [4, 5, 6, 7],
'col3': [8, 9, 10, 11],
'col4': [12, 13, 14, 15],
'col5': [0, 0, 0, 0]})
df.to_sql(TEST_SQL_FILENAME.split(".")[0], conn)
@pytest.fixture
def teardown_sql_file():
if os.path.exists(TEST_SQL_FILENAME):
os.remove(TEST_SQL_FILENAME)
def test_from_parquet_small():
setup_parquet_file(SMALL_ROW_SIZE)
@@ -90,12 +272,120 @@ def test_from_csv():
teardown_csv_file()
def test_from_csv_delimiter():
setup_csv_file(SMALL_ROW_SIZE, delimiter='|')
def test_from_json():
setup_json_file(SMALL_ROW_SIZE)
pd_df = pd.read_csv(TEST_CSV_FILENAME)
ray_df = io.read_csv(TEST_CSV_FILENAME)
pd_df = pd.read_json(TEST_JSON_FILENAME)
ray_df = io.read_json(TEST_JSON_FILENAME)
assert ray_df_equals_pandas(ray_df, pd_df)
teardown_csv_file()
teardown_json_file()
def test_from_html():
setup_html_file(SMALL_ROW_SIZE)
pd_df = pd.read_html(TEST_HTML_FILENAME)[0]
ray_df = io.read_html(TEST_HTML_FILENAME)
assert ray_df_equals_pandas(ray_df, pd_df)
teardown_html_file()
@pytest.mark.skip(reason="No clipboard on Travis")
def test_from_clipboard():
setup_clipboard(SMALL_ROW_SIZE)
pd_df = pd.read_clipboard()
ray_df = io.read_clipboard()
assert ray_df_equals_pandas(ray_df, pd_df)
def test_from_excel():
setup_excel_file(SMALL_ROW_SIZE)
pd_df = pd.read_excel(TEST_EXCEL_FILENAME)
ray_df = io.read_excel(TEST_EXCEL_FILENAME)
assert ray_df_equals_pandas(ray_df, pd_df)
teardown_excel_file()
def test_from_feather():
setup_feather_file(SMALL_ROW_SIZE)
pd_df = pd.read_feather(TEST_FEATHER_FILENAME)
ray_df = io.read_feather(TEST_FEATHER_FILENAME)
assert ray_df_equals_pandas(ray_df, pd_df)
teardown_feather_file()
@pytest.mark.skip(reason="Memory overflow on Travis")
def test_from_hdf():
setup_hdf_file(SMALL_ROW_SIZE)
pd_df = pd.read_hdf(TEST_HDF_FILENAME, key='test')
ray_df = io.read_hdf(TEST_HDF_FILENAME, key='test')
assert ray_df_equals_pandas(ray_df, pd_df)
teardown_hdf_file()
def test_from_msgpack():
setup_msgpack_file(SMALL_ROW_SIZE)
pd_df = pd.read_msgpack(TEST_MSGPACK_FILENAME)
ray_df = io.read_msgpack(TEST_MSGPACK_FILENAME)
assert ray_df_equals_pandas(ray_df, pd_df)
teardown_msgpack_file()
def test_from_stata():
setup_stata_file(SMALL_ROW_SIZE)
pd_df = pd.read_stata(TEST_STATA_FILENAME)
ray_df = io.read_stata(TEST_STATA_FILENAME)
assert ray_df_equals_pandas(ray_df, pd_df)
teardown_stata_file()
def test_from_pickle():
setup_pickle_file(SMALL_ROW_SIZE)
pd_df = pd.read_pickle(TEST_PICKLE_FILENAME)
ray_df = io.read_pickle(TEST_PICKLE_FILENAME)
assert ray_df_equals_pandas(ray_df, pd_df)
teardown_pickle_file()
def test_from_sql():
conn = sqlite3.connect(TEST_SQL_FILENAME)
setup_sql_file(conn, True)
pd_df = pd.read_sql("select * from test", conn)
ray_df = io.read_sql("select * from test", conn)
assert ray_df_equals_pandas(ray_df, pd_df)
teardown_sql_file()
@pytest.mark.skip(reason="No SAS write methods in Pandas")
def test_from_sas():
pd_df = pd.read_sas(TEST_SAS_FILENAME)
ray_df = io.read_sas(TEST_SAS_FILENAME)
assert ray_df_equals_pandas(ray_df, pd_df)