Moved bundle download methods into catalyst/data/bundles/core

This commit is contained in:
Conner Fromknecht
2017-06-30 11:34:55 -07:00
parent 01a650f2a4
commit bc9d60a8d7
2 changed files with 56 additions and 58 deletions
+53
View File
@@ -1,6 +1,7 @@
from collections import namedtuple
import errno
import os
import requests
import shutil
import warnings
@@ -32,6 +33,7 @@ import catalyst.utils.paths as pth
from catalyst.utils.preprocess import preprocess
from catalyst.utils.calendars import get_calendar
ONE_MEGABYTE = 1024 * 1024
def asset_db_path(bundle_name, timestr, environ=None, db_version=None):
return pth.data_path(
@@ -131,6 +133,57 @@ def ingestions_for_bundle(bundle, environ=None):
reverse=True,
)
def download_with_progress(url, chunk_size, **progress_kwargs):
"""
Download streaming data from a URL, printing progress information to the
terminal.
Parameters
----------
url : str
A URL that can be understood by ``requests.get``.
chunk_size : int
Number of bytes to read at a time from requests.
**progress_kwargs
Forwarded to click.progressbar.
Returns
-------
data : BytesIO
A BytesIO containing the downloaded data.
"""
resp = requests.get(url, stream=True)
resp.raise_for_status()
total_size = int(resp.headers['content-length'])
data = BytesIO()
with click.progressbar(length=total_size, **progress_kwargs) as pbar:
for chunk in resp.iter_content(chunk_size=chunk_size):
data.write(chunk)
pbar.update(len(chunk))
data.seek(0)
return data
def download_without_progress(url):
"""
Download data from a URL, returning a BytesIO containing the loaded data.
Parameters
----------
url : str
A URL that can be understood by ``requests.get``.
Returns
-------
data : BytesIO
A BytesIO containing the downloaded data.
"""
resp = requests.get(url)
resp.raise_for_status()
return BytesIO(resp.content)
RegisteredBundle = namedtuple(
'RegisteredBundle',
+3 -58
View File
@@ -7,10 +7,8 @@ import tarfile
from time import time, sleep
from datetime import datetime
from click import progressbar
from logbook import Logger
import pandas as pd
import requests
from six.moves.urllib.parse import urlencode
from catalyst.utils.calendars import register_calendar_alias
@@ -334,62 +332,9 @@ def quandl_bundle(environ,
)
def download_with_progress(url, chunk_size, **progress_kwargs):
"""
Download streaming data from a URL, printing progress information to the
terminal.
Parameters
----------
url : str
A URL that can be understood by ``requests.get``.
chunk_size : int
Number of bytes to read at a time from requests.
**progress_kwargs
Forwarded to click.progressbar.
Returns
-------
data : BytesIO
A BytesIO containing the downloaded data.
"""
resp = requests.get(url, stream=True)
resp.raise_for_status()
total_size = int(resp.headers['content-length'])
data = BytesIO()
with progressbar(length=total_size, **progress_kwargs) as pbar:
for chunk in resp.iter_content(chunk_size=chunk_size):
data.write(chunk)
pbar.update(len(chunk))
data.seek(0)
return data
def download_without_progress(url):
"""
Download data from a URL, returning a BytesIO containing the loaded data.
Parameters
----------
url : str
A URL that can be understood by ``requests.get``.
Returns
-------
data : BytesIO
A BytesIO containing the downloaded data.
"""
resp = requests.get(url)
resp.raise_for_status()
return BytesIO(resp.content)
QUANTOPIAN_QUANDL_URL = (
'https://s3.amazonaws.com/quantopian-public-zipline-data/quandl'
)
ONE_MEGABYTE = 1024 * 1024
@bundles.register('quantopian-quandl', create_writers=False)
@@ -405,13 +350,13 @@ def quantopian_quandl_bundle(environ,
show_progress,
output_dir):
if show_progress:
data = download_with_progress(
data = bundles.download_with_progress(
QUANTOPIAN_QUANDL_URL,
chunk_size=ONE_MEGABYTE,
chunk_size=bundles.ONE_MEGABYTE,
label="Downloading Bundle: quantopian-quandl",
)
else:
data = download_without_progress(QUANTOPIAN_QUANDL_URL)
data = bundles.download_without_progress(QUANTOPIAN_QUANDL_URL)
with tarfile.open('r', fileobj=data) as tar:
if show_progress: