Removed data_scraper

This commit is contained in:
Juan Pablo Amoroso
2020-03-20 15:47:25 -03:00
parent be20c56478
commit 562e43bd63
21 changed files with 0 additions and 4761 deletions
-8
View File
@@ -1,8 +0,0 @@
{
"cboe": {
"mute_notifications": []
},
"notifications": {
"slack_webhook": ""
}
}
View File
-48
View File
@@ -1,48 +0,0 @@
import logging.config
import os
import argparse
from data_scraper import cboe, tiingo, backup
parser = argparse.ArgumentParser(prog="data_scraper.py")
parser.add_argument("-t", "--symbols", nargs="+", help="Symbols to fetch")
parser.add_argument(
"-s",
"--scraper",
choices=["cboe", "tiingo"],
default="cboe",
help="Scraper to use")
parser.add_argument(
"-v", "--verbose", action="store_true", help="Enable logging")
parser.add_argument(
"-a",
"--aggregate",
action="store_true",
help="Aggregate daily data files")
parser.add_argument(
"-b", "--backup", action="store_true", help="Backup files in S3 bucket")
args = parser.parse_args()
module_dir = os.path.join(os.getcwd(), os.path.dirname(__file__))
if args.verbose:
config_file = os.path.realpath(os.path.join(module_dir, "logconfig.ini"))
logging.config.fileConfig(fname=config_file)
if args.aggregate:
if args.symbols:
cboe.aggregate_monthly_data(args.symbols)
else:
cboe.aggregate_monthly_data()
elif args.backup:
backup.backup_data()
else:
if args.scraper == "tiingo":
scraper = tiingo
else:
scraper = cboe
if args.symbols:
scraper.fetch_data(args.symbols)
else:
scraper.fetch_data()
-114
View File
@@ -1,114 +0,0 @@
import logging
import os
import boto3
from botocore.exceptions import ClientError
from data_scraper import utils
from data_scraper.notifications import slack_notification, Status
logger = logging.getLogger(__name__)
def backup_data():
"""Uploads scraped files to S3 bucket.
Set bucket name in environment variable $S3_BUCKET
"""
try:
bucket_name = utils.get_environment_var("S3_BUCKET")
except EnvironmentError as e:
logger.error(str(e))
slack_notification("Backup failed. Set $S3_BUCKET env variable",
__name__)
raise e
s3 = boto3.resource("s3")
bucket = s3.Bucket(bucket_name)
data_path = utils.get_save_data_path()
cboe_data = os.path.join(data_path, "cboe")
cboe_folders = []
if os.path.exists(cboe_data):
cboe_folders = [
os.path.join(cboe_data, folder) for folder in os.listdir(cboe_data)
if not folder.endswith("daily")
]
tiingo_data = os.path.join(data_path, "tiingo")
tiingo_folders = []
if os.path.exists(tiingo_data):
tiingo_folders = [
os.path.join(tiingo_data, folder)
for folder in os.listdir(tiingo_data)
]
done_cboe, fail_cboe = _upload_folders(
bucket, "cboe", cboe_folders, remove_files=False)
done_tiingo, fail_tiingo = _upload_folders(
bucket, "tiingo", tiingo_folders, remove_files=True)
done = done_cboe + done_tiingo
failed = fail_cboe + fail_tiingo
if len(done) > 0:
msg = "Successful backup of symbols: " + ", ".join(done)
slack_notification(msg, __name__, status=Status.Success)
if len(failed) > 0:
msg = "Unable to backup symbols: " + ", ".join(done)
slack_notification(msg, __name__, status=Status.Warning)
def _upload_folders(bucket, scraper, folders, remove_files=False):
"""Uploads folders to S3 bucket and (optionally) removes old files"""
data_path = utils.get_save_data_path()
done, failed = [], []
for folder in folders:
symbol = os.path.basename(folder)
try:
if remove_files:
_remove_old_files(bucket, prefix=scraper + "/" + symbol)
_upload_folder(bucket, folder, data_path)
except Exception:
failed.append(os.path.basename(folder))
else:
done.append(os.path.basename(folder))
return (done, failed)
def _upload_folder(bucket, folder, data_path):
"""Uploads folder contents to S3 bucket"""
if not os.path.isdir(folder):
return
for root, dirs, files in os.walk(folder):
for file in files:
file_path = os.path.join(root, file)
key = os.path.relpath(file_path, data_path)
if _key_exists(bucket, key):
logger.debug("File already exists in S3")
continue
try:
bucket.upload_file(file_path, key)
logger.debug("Uploaded file %s to S3", file)
except Exception as e:
msg = "Error uploading data file {} to S3.\nReceived exception message {}".format(
file_path, str(e))
logger.error(msg, exc_info=True)
slack_notification(msg, __name__)
raise e
def _key_exists(bucket, key):
try:
bucket.Object(key).load()
except ClientError as e:
return int(e.response["Error"]["Code"]) != 404
return False
def _remove_old_files(bucket, prefix):
old_files = bucket.objects.filter(Prefix=prefix)
for file in old_files:
file.delete()
-249
View File
@@ -1,249 +0,0 @@
import logging
import os
from datetime import date
from io import StringIO
from itertools import groupby
from bs4 import BeautifulSoup
import requests
import pandas as pd
from . import utils, validation
from .notifications import slack_notification, Status
logger = logging.getLogger(__name__)
url = "http://www.cboe.com/delayedquote/quote-table-download"
def fetch_data(symbols=None):
"""Fetches options data for a given list of symbols"""
symbols = symbols or _get_all_listed_symbols()
options = utils.get_module_config("cboe")
mute_notifications = options.get("mute_notifications", [])
try:
form_data = _form_data()
except requests.ConnectionError as ce:
msg = "Connection error trying to reach {}".format(url)
logger.error(msg)
slack_notification(msg, __name__)
raise ce
except Exception as e:
msg = "Error parsing response"
logger.error(msg, exc_info=True)
slack_notification(msg, __name__)
raise e
headers = {"Referer": url}
file_url = "http://www.cboe.com/delayedquote/quotedata.dat"
symbols = [symbol.upper() for symbol in symbols]
done, failed = [], []
for symbol in symbols:
form_data["ctl00$ContentTop$C005$txtTicker"] = symbol
try:
response = requests.post(url,
data=form_data,
headers=headers,
allow_redirects=False)
symbol_req = requests.get(file_url,
cookies=response.cookies,
headers=headers)
symbol_data = symbol_req.text
if symbol_data == "" or symbol_data.startswith(" <!DOCTYPE"):
raise Exception
except Exception:
failed.append(symbol)
msg = "Error fetching symbol {} data".format(symbol)
logger.error(msg, exc_info=True)
if symbol not in mute_notifications:
slack_notification(msg, __name__)
else:
_save_data(symbol, symbol_data)
done.append(symbol)
if len(done) > 0:
msg = "Successfully scraped symbols: " + ", ".join(done)
slack_notification(msg, __name__, status=Status.Success)
if len(failed) > 0:
msg = "Failed to scrape symbols: " + ", ".join(failed)
slack_notification(msg, __name__, status=Status.Warning)
def aggregate_monthly_data(symbols=None):
"""Aggregate daily snapshots into monthly files and validate data"""
symbols = symbols or _get_all_listed_symbols()
save_data_path = utils.get_save_data_path()
scraper_dir = os.path.join(save_data_path, "cboe")
symbols = [symbol.upper() for symbol in symbols]
for symbol in symbols:
daily_dir = os.path.join(scraper_dir, symbol + "_daily")
if not os.path.exists(daily_dir):
msg = "Error aggregating data. Dir {} not found.".format(daily_dir)
logger.error(msg)
slack_notification(msg, __name__)
continue
monthly_dir = os.path.join(scraper_dir, symbol)
symbol_files = [
file for file in os.listdir(daily_dir) if file.endswith(".csv")
]
for month, files in groupby(symbol_files, _monthly_grouper):
file_names = list(files)
daily_files = [
os.path.join(daily_dir, name) for name in file_names
]
try:
symbol_df = concatenate_files(daily_files)
except Exception:
msg = "Error concatenating daily files for period " + month
logger.error(msg, exc_info=True)
slack_notification(msg, __name__)
continue
date_range = pd.to_datetime(symbol_df["quotedate"].unique())
if not validation.validate_dates_in_month(symbol, date_range):
today = pd.Timestamp.today()
first_date = date_range[0]
if first_date.year != today.year or first_date.month != today.month:
msg = "Some trading dates where missing for symbol {}".format(
symbol)
slack_notification(msg, __name__)
continue
if not os.path.exists(monthly_dir):
os.makedirs(monthly_dir)
logger.debug("Symbol dir %s created", monthly_dir)
file_name = _monthly_filename(file_names)
monthly_file = os.path.join(monthly_dir, file_name)
symbol_df.to_csv(monthly_file, index=False)
if not validation.validate_aggregate_file(monthly_file,
daily_files):
utils.remove_file(monthly_file)
msg = "Data in {} differs from the daily files".format(
monthly_file)
logger.error(msg)
slack_notification(msg, __name__)
continue
logger.debug("Saved monthly data %s", monthly_file)
for file in daily_files:
utils.remove_file(file, logger)
def _get_all_listed_symbols():
"""Returns array of all listed symbols.
http://www.cboe.com/publish/scheduledtask/mktdata/cboesymboldir2.csv
"""
current_dir = os.path.join(os.getcwd(), os.path.dirname(__file__))
symbols_file = os.path.realpath(
os.path.join(current_dir, "cboesymboldir2.csv"))
symbols_df = pd.read_csv(symbols_file, skiprows=1)
return symbols_df["Stock Symbol"].array
def concatenate_files(files):
"""Returns a dataframe of the concatenated data from `files`."""
df_generator = (pd.read_csv(file) for file in sorted(files))
return pd.concat(df_generator, ignore_index=True)
def _form_data():
"""Return validation form data"""
homepage = requests.get(url)
soup = BeautifulSoup(homepage.content, "lxml")
data = {
"__VIEWSTATE": soup.select_one("#__VIEWSTATE")["value"],
"__EVENTVALIDATION": soup.select_one("#__EVENTVALIDATION")["value"]
}
return data
def _save_data(symbol, symbol_data):
"""Saves the contents of `symbol_data` to
`$SAVE_DATA_PATH/cboe/{symbol}_daily/{symbol}_{%date}.csv`
"""
filename = date.today().strftime(symbol + "_%Y%m%d.csv")
save_data_path = utils.get_save_data_path()
symbol_dir = os.path.join(save_data_path, "cboe", symbol + "_daily")
if not os.path.exists(symbol_dir):
os.makedirs(symbol_dir)
logger.debug("Symbol dir %s created", symbol_dir)
file_path = os.path.join(symbol_dir, filename)
if os.path.exists(file_path) and validation.file_hash_matches_data(
file_path, symbol_data):
logger.debug("File %s already downloaded", file_path)
else:
daily_df = _wrangle_data(symbol, symbol_data)
daily_df.to_csv(file_path, index=False)
logger.debug("Saved daily symbol data as %s", file_path)
def _wrangle_data(symbol, symbol_data):
"""Returns a properly formated (_tidy_) dataframe"""
string_data = StringIO(symbol_data)
first_line = string_data.readline()
spot_price = float(first_line.split(",")[-2])
quote_date = date.today().strftime("%m/%d/%Y")
data = pd.read_csv(string_data, skiprows=1)
call_columns = [
"Calls", "Expiration Date", "Strike", "Last Sale", "Net", "Bid", "Ask",
"Vol", "Open Int", "IV", "Delta", "Gamma"
]
calls = data[call_columns]
put_columns = [
"Puts", "Expiration Date", "Strike", "Last Sale.1", "Net.1", "Bid.1",
"Ask.1", "Vol.1", "Open Int.1", "IV.1", "Delta.1", "Gamma.1"
]
puts = data[put_columns]
renamed_columns = [
"optionroot", "expiration", "strike", "last", "net", "bid", "ask",
"volume", "openinterest", "impliedvol", "delta", "gamma"
]
calls.columns = renamed_columns
calls.insert(loc=1, column="type", value="call")
puts.columns = renamed_columns
puts.insert(loc=1, column="type", value="put")
merged = pd.concat([calls, puts])
merged.insert(loc=0, column="underlying", value=symbol)
merged.insert(loc=1, column="underlying_last", value=spot_price)
merged.insert(loc=2, column="exchange", value="CBOE")
merged.insert(loc=6, column="quotedate", value=quote_date)
return merged
def _monthly_grouper(filename):
"""Returns `{year}{month}` string. Used to group files by month."""
basename = filename.split(".")[0]
file_date = basename.split("_")[1]
return file_date[:-2]
def _monthly_filename(filenames):
"""Returns filename of monthly aggregate file in the form
`{symbol}_{start_date}_to_{end_date}.csv`
"""
sorted_files = list(sorted(filenames))
first_file = sorted_files[0]
last_file = sorted_files[-1]
last_day = last_file.split(".")[0][-8:] # Get only the date
file_name = first_file.split(".")[0] + "_to_" + last_day + ".csv"
return file_name
-66
View File
@@ -1,66 +0,0 @@
# CBOE data scraper
# Requires Selenium and a headless Chrome driver
import tempfile
import time
import os
import shutil
from datetime import date
from selenium import webdriver
class CBOE():
"""CBOE data downloader."""
url = "http://www.cboe.com/delayedquote/quote-table-download"
def __init__(self):
self.data_path = self._get_data_path()
self.tmp_dir = tempfile.TemporaryDirectory()
self.driver = self._initilize_driver(self.tmp_dir.name)
def _get_data_path():
path = os.getenv("OPTIONS_DATA_PATH")
if not path:
raise EnvironmentError("Environment variable $OPTIONS_DATA_PATH not set")
return os.path.expanduser(path)
def _initilize_driver(download_dir):
"""Initilizes the Chrome driver to silently download files
to a temporary directory.
"""
options = webdriver.ChromeOptions()
options.add_argument("headless")
options.add_argument("disable-gpu")
driver = webdriver.Chrome(options=options)
driver.command_executor._commands["send_command"] = (
"POST",
"/session/$sessionId/chromium/send_command"
)
params = {
"cmd": "Page.setDownloadBehavior",
"params": {
"behavior": "allow",
"downloadPath": download_dir
}
}
driver.execute("send_command", params)
driver.implicitly_wait(10)
return driver
def fetch_data(self, symbols):
"""Fetches options data for a given list of symbols"""
self.driver.get(CBOE.url)
for symbol in symbols:
ticker = self.driver.find_element_by_css_selector("input#txtTicker")
ticker.send_keys(symbol)
submit = self.driver.find_element_by_css_selector("input#cmdSubmit")
submit.click()
time.sleep(15) # Horrible hack
download_path = os.path.join(self.tmp_dir.name, "quotedata.dat")
renamed_file = date.today().strftime(symbol + "_%Y%m%d.csv")
full_path = os.path.join(self.data_path, renamed_file)
shutil.move(download_path, full_path)
def __del__(self):
self.tmp_dir.cleanup()
File diff suppressed because it is too large Load Diff
-39
View File
@@ -1,39 +0,0 @@
[loggers]
keys=root,tiingo,cboe
[handlers]
keys=consoleHandler,fileHandler
[formatters]
keys=simpleFormatter
[logger_root]
level=ERROR
handlers=consoleHandler
[logger_tiingo]
level=DEBUG
handlers=consoleHandler,fileHandler
qualname=data_scraper.tiingo
propagate=0
[logger_cboe]
level=DEBUG
handlers=consoleHandler,fileHandler
qualname=data_scraper.cboe
propagate=0
[handler_consoleHandler]
class=StreamHandler
level=ERROR
formatter=simpleFormatter
args=(sys.stdout,)
[handler_fileHandler]
class=handlers.RotatingFileHandler
level=DEBUG
formatter=simpleFormatter
args=("data_scraper.log", "a", 3000000, 10)
[formatter_simpleFormatter]
format=%(asctime)s - %(name)s - %(levelname)s - %(message)s
-59
View File
@@ -1,59 +0,0 @@
import logging
from datetime import datetime
from enum import Enum
import requests
from .utils import get_module_config
logger = logging.getLogger(__name__)
Status = Enum("Status", "Success Warning Error")
options = get_module_config("notifications")
try:
webhook = options["slack_webhook"]
except KeyError as e:
logger.error("Missing slack webhook from configuration file")
raise e
payload = {
"channel": "#algotrading",
"username": "Talebot",
"icon_emoji": ":taleb:",
"attachments": [{
"footer": "Talebot"
}]
}
def slack_notification(text, scraper, status=Status.Error):
"""Post Slack notification"""
if status == Status.Error:
emoji = ":thumbsdown: "
title = "data_scraper error"
color = "#B22222"
else:
title = "data_scraper status report"
if status == Status.Success:
emoji = ":thumbsup: "
color = "#49C39E"
else:
emoji = ":warning: "
color = "#EDB625"
msg = emoji + text
payload["attachments"][0]["fallback"] = msg
payload["attachments"][0]["text"] = msg
payload["attachments"][0]["color"] = color
payload["attachments"][0]["title"] = title
payload["attachments"][0]["fields"] = [{"title": scraper}]
payload["attachments"][0]["ts"] = datetime.today().timestamp()
response = requests.post(webhook, json=payload)
if response.status_code != 200:
msg = "Error connecting to Slack {}. Response is:\n{}".format(
response.status_code, response.text)
logger.error(msg)
-41
View File
@@ -1,41 +0,0 @@
#!/bin/bash
## This script downloads stock options data,
## veryfies zipfile integrity, saves md5 signature
## and uploads them to S3.
## To run, pass a list of files to download:
## $> ./backup.sh files.txt
TMPDIR=tmp
NOW=$(date +"%m-%d-%Y-%H%M%S")
RETRY="${NOW}.txt"
MD5SUMS=md5sums.txt
mkdir -p $TMPDIR
while read filename
do
echo "Downloading file $filename to $TMPDIR"
wget --quiet -P $TMPDIR "ftp://l3_hdall:JKNRH7LYXV@ftp.deltaneutral.com/${filename}"
newpath="$TMPDIR/$filename"
echo "Verifying zipfile $newpath"
if unzip -t -q $newpath
then
echo "File check OK"
else
echo "ERROR: File check failed for $newfile"
echo $filename >> $RETRY
rm $newpath
continue
fi
echo "Appending md5 sum for $f"
md5sum $newpath >> $MD5SUMS
echo "Copying $newpath to S3 bucket"
rclone copy -v $newpath longueduree:longueduree
echo "Deleting $newpath"
rm $newpath
done <$1
View File
-104
View File
@@ -1,104 +0,0 @@
import logging
import unittest
from unittest.mock import patch
import os
import shutil
from requests import ConnectionError
import pandas as pd
from data_scraper import cboe
logging.disable(level=logging.CRITICAL)
class TestCBOE(unittest.TestCase):
"""Tests CBOE data scraper"""
test_dir = os.path.join(os.getcwd(), os.path.dirname(__file__))
test_data_path = os.path.realpath(os.path.join(test_dir, "data"))
cboe_data_path = os.path.join(test_data_path, "cboe")
spx_data_path = os.path.join(cboe_data_path, "SPX_March_2019.csv")
@classmethod
def setUpClass(cls):
cls.save_data_path = os.environ.get("SAVE_DATA_PATH", None)
os.environ["SAVE_DATA_PATH"] = cls.test_data_path
@classmethod
def tearDownClass(cls):
if cls.save_data_path:
os.environ["SAVE_DATA_PATH"] = cls.save_data_path
@patch("data_scraper.cboe.slack_notification", return_value=None)
def test_fetch_spy(self, mocked_notification):
"""Fetch todays SPY quote"""
cboe.fetch_data(["SPY"])
spy_dir = os.path.join(TestCBOE.cboe_data_path, "SPY_daily")
self.addCleanup(TestCBOE.remove_files, spy_dir)
if self.assertTrue(os.path.exists(spy_dir)):
self.assertTrue(mocked_notification.called)
file_name = "SPY_" + pd.Timestamp.today().strftime(
"%Y%m%d") + ".csv"
file_path = os.path.join(spy_dir, file_name)
spy_df = pd.read_csv(file_path, parse_dates=["quotedate"])
self.assertTrue(all(spy_df["underlying"] == "SPX"))
self.assertEqual(spy_df["quotedate"].nunique(), 1)
counts = spy_df["type"].value_counts()
self.assertEqual(counts["put"] + counts["call"], len(spy_df))
@patch("data_scraper.cboe.slack_notification", return_value=None)
def test_fetch_invalid_symbol(self, mocked_notification):
"""Fetching invalid symbol should send notification"""
cboe.fetch_data(["FOOBAR"])
self.assertTrue(mocked_notification.called)
@patch("data_scraper.cboe.url", new="http://www.aldkfjaskldfjsa.com")
@patch("data_scraper.cboe.slack_notification", return_value=None)
def test_no_connection(self, mocked_notification):
"""Raise ConnectionError and send notification when host is unreachable"""
with self.assertRaises(ConnectionError):
cboe.fetch_data(["SPX"])
self.assertTrue(mocked_notification.called)
@patch("data_scraper.cboe.utils.remove_file", return_value=None)
@patch("data_scraper.cboe.slack_notification", return_value=None)
def test_data_aggregation(self, mocked_notification, mocked_remove):
"""Test data aggregation happy path"""
cboe.aggregate_monthly_data(["SPX"])
aggregate_file = os.path.join(TestCBOE.cboe_data_path, "SPX",
"SPX_20190301_to_20190329.csv")
self.addCleanup(TestCBOE.remove_files, os.path.dirname(aggregate_file))
self.assertTrue(mocked_remove.called)
self.assertFalse(mocked_notification.called)
if self.assertTrue(os.path.exists(aggregate_file)):
spx_df = pd.read_csv(TestCBOE.spx_data_path)
aggregate_df = pd.read_csv(aggregate_file)
self.assertTrue(spx_df.equals(aggregate_df))
@patch("data_scraper.cboe.utils.remove_file", return_value=None)
@patch("data_scraper.cboe.slack_notification", return_value=None)
def test_aggregate_missing_days(self, mocked_notification, mocked_remove):
"""Data aggregation should send notification when there are missing days"""
cboe.aggregate_monthly_data(["GOOG"])
self.assertTrue(mocked_notification.called)
self.assertFalse(mocked_remove.called)
@patch("data_scraper.cboe.utils.remove_file", return_value=None)
@patch("data_scraper.cboe.slack_notification", return_value=None)
def test_aggregate_invalid_symbol(self, mocked_notification,
mocked_remove):
"""Data aggregation should fail and send notification on invalid symbol"""
cboe.aggregate_monthly_data(["FOOBAR"])
self.assertTrue(mocked_notification.called)
self.assertFalse(mocked_remove.called)
def remove_files(file_path):
if os.path.exists(file_path):
shutil.rmtree(file_path)
if __name__ == "__main__":
unittest.main()
-75
View File
@@ -1,75 +0,0 @@
import logging
import unittest
from unittest.mock import patch
import os
import shutil
import pandas as pd
from data_scraper import tiingo
logging.disable(level=logging.CRITICAL)
class TestTiingo(unittest.TestCase):
"""Tests Tiingo data scraper"""
test_dir = os.path.join(os.getcwd(), os.path.dirname(__file__))
test_data_path = os.path.realpath(os.path.join(test_dir, "data"))
tiingo_data_path = os.path.join(test_data_path, "tiingo")
@classmethod
def setUpClass(cls):
assert "TIINGO_API_KEY" in os.environ, "$TIINGO_API_KEY env variable must be set"
cls.save_data_path = os.environ.get("SAVE_DATA_PATH", None)
os.environ["SAVE_DATA_PATH"] = cls.test_data_path
@classmethod
def tearDownClass(cls):
if cls.save_data_path:
os.environ["SAVE_DATA_PATH"] = cls.save_data_path
@patch("data_scraper.tiingo.slack_notification", return_value=None)
def test_fetch_gld(self, mocked_notification):
"""Fetch GLD data"""
tiingo.fetch_data(["GLD"])
gld_dir = os.path.join(TestTiingo.tiingo_data_path, "GLD")
self.addCleanup(TestTiingo.remove_files, gld_dir)
if self.assertTrue(os.path.exists(gld_dir)):
self.assertTrue(mocked_notification.called)
file_name = "GLD_" + pd.Timestamp.today().strftime(
"%Y%m%d") + ".csv"
file_path = os.path.join(gld_dir, file_name)
gld_df = pd.read_csv(file_path)
self.assertTrue(all(gld_df["symbol"] == "GLD"))
expected_columns = [
"symbol", "date", "adjClose", "adjHigh", "adjLow", "adjOpen",
"adjVolume", "close", "divCash", "high", "low", "open",
"splitFactor", "volume"
]
self.assertEqual(gld_df.columns, expected_columns)
@patch("data_scraper.tiingo.slack_notification", return_value=None)
def test_fetch_invalid_symbol(self, mocked_notification):
"""Fetching invalid symbol data should send notification"""
tiingo.fetch_data(["FOOBAR"])
self.assertTrue(mocked_notification.called)
@patch("data_scraper.tiingo.pdr.get_data_tiingo") # mock pandas_datareader
@patch("data_scraper.tiingo.slack_notification", return_value=None)
def test_no_connection(self, mocked_notification, mocked_pdr):
"""Raise ConnectionError and send notification when host is unreachable"""
mocked_pdr.side_effect = ConnectionError("This is a test")
with self.assertRaises(ConnectionError):
tiingo.fetch_data(["IBM"])
self.assertTrue(mocked_notification.called)
def remove_files(file_path):
if os.path.exists(file_path):
shutil.rmtree(file_path)
if __name__ == "__main__":
unittest.main()
-122
View File
@@ -1,122 +0,0 @@
import logging
import os
from datetime import date
import pandas as pd
import pandas_datareader as pdr
from . import utils, validation
from .notifications import slack_notification, Status
logger = logging.getLogger(__name__)
# Default symbols to fetch
assets = [
"VTSMX", "VFINX", "VIVAX", "VIGRX", "VIMSX", "VMVIX", "VMGIX", "NAESX",
"VISVX", "VISGX", "BRSIX", "VGTSX", "VTMGX", "VFSVX", "EFV", "VEURX",
"VPACX", "VEIEX", "VFISX", "VFITX", "IEF", "VUSTX", "VBMFX", "VIPSX",
"PIGLX", "PGBIX", "VFSTX", "LQD", "VWESX", "VWEHX", "VWSTX", "VWITX",
"VWLTX", "VGSIX", "GLD", "PSAU", "GSG"
]
def fetch_data(symbols=assets):
"""Fetches historical data for given symbols from Tiingo"""
api_key = utils.get_environment_var("TIINGO_API_KEY")
symbols = [symbol.upper() for symbol in symbols]
done, failed = [], []
for symbol in symbols:
try:
symbol_data = pdr.get_data_tiingo(symbol, api_key=api_key)
except ConnectionError as ce:
msg = "Unable to connect to api.tiingo.com when fetching symbol {}".format(
symbol)
logger.error(msg, exc_info=True)
slack_notification(msg, __name__)
raise ce
except TypeError:
# pandas_datareader raises TypeError when fetching invalid symbol
failed.append(symbol)
msg = "Attempted to fetch invalid symbol {}".format(symbol)
logger.error(msg, exc_info=True)
slack_notification(msg, __name__)
except Exception:
msg = "Error fetching symbol {}".format(symbol)
logger.error(msg, exc_info=True)
slack_notification(msg, __name__)
else:
_save_data(symbol, symbol_data.reset_index())
done.append(symbol)
if len(done) > 0:
msg = "Successfully scraped symbols: " + ", ".join(done)
slack_notification(msg, __name__, status=Status.Success)
if len(failed) > 0:
msg = "Failed to scrape symbols: " + ", ".join(failed)
slack_notification(msg, __name__, status=Status.Warning)
def _save_data(symbol, symbol_df):
"""Saves the contents of `symbol_df` to
`$SAVE_DATA_PATH/tiingo/{symbol}/{symbol}_{%date}.csv`"""
filename = date.today().strftime(symbol + "_%Y%m%d.csv")
save_data_path = utils.get_save_data_path()
symbol_dir = os.path.join(save_data_path, "tiingo", symbol)
if not os.path.exists(symbol_dir):
os.makedirs(symbol_dir)
logger.debug("Symbol dir %s created", symbol_dir)
file_path = os.path.join(symbol_dir, filename)
if os.path.exists(file_path) and validation.file_hash_matches_data(
file_path, symbol_df.to_csv()):
logger.debug("File %s already downloaded", file_path)
else:
expected_columns = [
"symbol", "date", "adjClose", "adjHigh", "adjLow", "adjOpen",
"adjVolume", "close", "divCash", "high", "low", "open",
"splitFactor", "volume"
]
if validation.validate_historical_dates(
symbol, symbol_df["date"]) and validation.validate_columns(
expected_columns, symbol_df.columns):
merged_df = _merge(symbol, symbol_df)
pattern = symbol + "_*"
utils.remove_files(symbol_dir, pattern, logger)
merged_df.to_csv(file_path, index=False)
logger.debug("Saved symbol data as %s", file_path)
def _merge(symbol, symbol_df):
"""Merge `symbol_df` with previous data file."""
save_data_path = utils.get_save_data_path()
symbol_dir = os.path.join(save_data_path, "tiingo", symbol)
files = os.listdir(symbol_dir)
if len(files) == 0:
return symbol_df
last_file = sorted(files)[-1]
old_df = pd.read_csv(os.path.join(symbol_dir, last_file),
parse_dates=["date"],
index_col="date")
symbol_df.index = symbol_df["date"]
diffs = old_df.index.difference(symbol_df.index)
if diffs.empty:
return symbol_df
else:
msg = """Old data included dates not present in scraped file for symbol {}
Merged new data with previous file.""".format(symbol)
logger.error(msg)
slack_notification(msg, __name__)
merged_df = pd.concat([symbol_df, old_df.loc[diffs]])
merged_df.sort_index(inplace=True)
return merged_df.reset_index()
-52
View File
@@ -1,52 +0,0 @@
import glob
import json
import os
def get_environment_var(variable):
"""Returns the value of a given environment variable.
Raises `EnvironmentError` if not found.
"""
if variable not in os.environ:
raise EnvironmentError(
"Environment variable {} not set".format(variable))
return os.path.expanduser(os.environ[variable])
def get_save_data_path():
"""Reads data path from environment variable `$SAVE_DATA_PATH`.
If it is not set, defaults to `./data/scraped`.
"""
try:
data_dir = get_environment_var("SAVE_DATA_PATH")
except EnvironmentError:
data_dir = "data/scraped"
os.makedirs(data_dir)
return data_dir
def get_module_config(module, config_file="data_scraper.conf"):
"""Parses configuration file and returns the configuration options
for the chosen `module`.
"""
options = {}
if os.path.exists(config_file):
with open(config_file) as file:
config = json.load(file)
options = config.get(module, {})
return options
def remove_files(data_dir, pattern, logger=None):
"""Removes files in `data_dir` that match `pattern`"""
for file in glob.glob(os.path.join(data_dir, pattern)):
remove_file(file, logger)
def remove_file(file, logger=None):
os.remove(file)
if logger:
logger.debug("Removed file %s", file)
-93
View File
@@ -1,93 +0,0 @@
import logging
import hashlib
import pandas as pd
import pandas_market_calendars as mcal
from . import cboe
from .notifications import slack_notification
logger = logging.getLogger(__name__)
def file_hash_matches_data(file_path, data):
file_hash = file_md5(file_path)
data_md5 = hashlib.md5(data.encode()).hexdigest()
return file_hash == data_md5
def file_md5(file, chunk_size=4096):
md5 = hashlib.md5()
with open(file, "rb") as f:
for chunk in iter(lambda: f.read(chunk_size), b""):
md5.update(chunk)
return md5.hexdigest()
def validate_dates_in_month(symbol, date_range):
"""Compares `date_range` (month) with NYSE trading calendar.
Returns `True` if there are no missing days.
"""
# NYSE and CBOE have the same trading calendar
# https://www.nyse.com/markets/hours-calendars
# http://cfe.cboe.com/about-cfe/holiday-calendar
nyse = mcal.get_calendar("NYSE")
first_date = date_range[0]
period = pd.Period(year=first_date.year, month=first_date.month, freq="M")
trading_days = nyse.valid_days(start_date=period.start_time,
end_date=period.end_time)
# Remove timezone info
trading_days = trading_days.tz_convert(tz=None)
missing_days = trading_days.difference(date_range)
if not missing_days.empty:
logger.error("Error validating monthly dates. Missing: %s",
missing_days)
return missing_days.empty
def validate_historical_dates(symbol, date_range):
"""Compares `date_range` (any time range) with trading calendar.
Returns `True` if there are no missing days.
"""
nyse = mcal.get_calendar("NYSE")
start_date = date_range.min()
end_date = date_range.max()
trading_days = nyse.valid_days(start_date=start_date, end_date=end_date)
# Remove timezone info
trading_days = trading_days.tz_convert(tz=None)
date_range = date_range.dt.tz_convert(tz=None)
missing_days = trading_days.difference(date_range)
if not missing_days.empty:
logger.error("Error validating historical dates. Missing: %s",
missing_days)
return missing_days.empty
def validate_columns(expected, received):
"""Verify that the `received` columns scraped are equal to `expected`"""
valid = all(expected == received)
if not valid:
expected_cols = ", ".join(expected)
received_cols = ", ".join(received)
msg = """Columns expected differ from those received.
Expected: {}
Received: {}""".format(expected_cols, received_cols)
logger.error(msg)
slack_notification(msg, __name__)
return valid
def validate_aggregate_file(aggregate_file, daily_files):
"""Compares `aggregate_file` with the data from `daily_files`."""
aggregate_df = pd.read_csv(aggregate_file)
recreated_df = cboe.concatenate_files(daily_files)
return aggregate_df.equals(recreated_df)
-19
View File
@@ -1,19 +0,0 @@
FROM debian:latest
MAINTAINER Juan Pablo Amoroso <jamoroso@lambdaclass.com>
RUN apt-get update
RUN apt-get install -y python3 python3-pip make cron build-essential pkg-config openssl libssl-dev
RUN python3 -m pip install pipenv
ENV LC_ALL=C.UTF-8 LANG=C.UTF-8
COPY . /finance
WORKDIR /finance
RUN make init
COPY ./docker/data_scraper/crontab /etc/cron.d/scraper-cron
COPY ./docker/data_scraper/entrypoint.sh /usr/bin/entrypoint.sh
COPY ./docker/data_scraper/run-task.sh /usr/bin/run-task
RUN chmod 0644 /etc/cron.d/scraper-cron && crontab /etc/cron.d/scraper-cron
ENTRYPOINT ["entrypoint.sh"]
CMD ["cron", "-f"]
-4
View File
@@ -1,4 +0,0 @@
0 19 * * 1-5 root cd /finance && run-task make scrape scraper=cboe
0 19 * * 1-5 root cd /finance && run-task make scrape scraper=tiingo
0 0 1 * * root cd /finance && run-task make aggregate && run-task make backup
-5
View File
@@ -1,5 +0,0 @@
#!/bin/bash
# cron does not read env, save it here
env > /root/env
exec "$@"
-4
View File
@@ -1,4 +0,0 @@
#!/bin/bash
# import env vars that were written in entrypoint
env - `cat /root/env` $@
-8
View File
@@ -1,8 +0,0 @@
version: "3"
services:
scraper:
image: data_scraper:latest
container_name: data_scraper
volumes:
- ~/finance/data:/finance/data