Merge branch 'develop' of github.com:enigmampc/catalyst into develop

This commit is contained in:
Victor Grau Serrat
2017-12-08 13:18:45 -07:00
2 changed files with 97 additions and 13 deletions
+56 -8
View File
@@ -40,7 +40,8 @@ from catalyst.exchange.exchange_utils import (
group_assets_by_exchange, )
from catalyst.exchange.live_graph_clock import LiveGraphClock
from catalyst.exchange.simple_clock import SimpleClock
from catalyst.exchange.stats_utils import get_pretty_stats, stats_to_s3
from catalyst.exchange.stats_utils import get_pretty_stats, stats_to_s3, \
stats_to_algo_folder
from catalyst.finance.execution import MarketOrder
from catalyst.finance.performance.period import calc_period_stats
from catalyst.gens.tradesimulation import AlgorithmSimulator
@@ -64,6 +65,8 @@ class ExchangeTradingAlgorithmBase(TradingAlgorithm):
super(ExchangeTradingAlgorithmBase, self).__init__(*args, **kwargs)
self.current_day = None
if self.simulate_orders is None \
and self.sim_params.arena == 'backtest':
self.simulate_orders = True
@@ -284,6 +287,8 @@ class ExchangeTradingAlgorithmBacktest(ExchangeTradingAlgorithmBase):
)
self.frame_stats.append(frame_stats)
self.current_day = data.current_dt.floor('1D')
def _create_stats_df(self):
stats = pd.DataFrame(self.frame_stats)
stats.set_index('period_close', inplace=True, drop=False)
@@ -311,7 +316,7 @@ class ExchangeTradingAlgorithmLive(ExchangeTradingAlgorithmBase):
self.stats_output = kwargs.pop('stats_output', None)
self._clock = None
self.frame_stats = deque(maxlen=60)
self.frame_stats = list()
self.pnl_stats = get_algo_df(self.algo_namespace, 'pnl_stats')
@@ -329,7 +334,7 @@ class ExchangeTradingAlgorithmLive(ExchangeTradingAlgorithmBase):
self.retry_order = 2
self.retry_delay = 5
self.stats_minutes = 20
self.stats_minutes = 10
super(ExchangeTradingAlgorithmLive, self).__init__(*args, **kwargs)
@@ -441,27 +446,49 @@ class ExchangeTradingAlgorithmLive(ExchangeTradingAlgorithmBase):
return self.perf_tracker.get_account(False)
def synchronize_portfolio(self, attempt_index=0):
"""
Synchronizes the portfolio tracked by the algorithm to refresh
its current value.
This includes updating the last_sale_price of all tracked
positions, returning the available cash, and raising error
if the data goes out of sync.
Parameters
----------
attempt_index: int
Returns
-------
float
The amount of base currency available for trading.
float
The total value of all tracked positions.
"""
tracker = self.perf_tracker.position_tracker
total_cash = 0.0
total_positions_value = 0.0
try:
# Position keys correspond to assets
assets = list(tracker.positions)
positions = self.portfolio.positions
assets = list(positions)
exchange_assets = group_assets_by_exchange(assets)
for exchange_name in self.exchanges:
assets = exchange_assets[exchange_name] \
if exchange_name in exchange_assets else []
exchange_positions = \
[tracker.positions[asset] for asset in assets]
[positions[asset] for asset in assets]
exchange = self.exchanges[exchange_name] # Type: Exchange
cash, positions_value = \
exchange.calculate_totals(exchange_positions)
total_cash += cash
total_positions_value += total_positions_value
total_positions_value += positions_value
for position in exchange_positions:
tracker.update_position(
@@ -474,6 +501,7 @@ class ExchangeTradingAlgorithmLive(ExchangeTradingAlgorithmBase):
raise ValueError('Cash on exchanges is lower than the algo.')
return total_cash, total_positions_value
except ExchangeRequestError as e:
log.warn(
'update portfolio attempt {}: {}'.format(attempt_index, e)
@@ -577,6 +605,11 @@ class ExchangeTradingAlgorithmLive(ExchangeTradingAlgorithmBase):
if not self.is_running:
return
# Resetting the frame stats every day to minimize memory footprint
today = data.current_dt.floor('1D')
if self.current_day is not None and today > self.current_day:
self.frame_stats = list()
new_transactions, new_commissions, closed_orders = \
self.blotter.get_transactions(data)
@@ -629,7 +662,6 @@ class ExchangeTradingAlgorithmLive(ExchangeTradingAlgorithmBase):
)
))
today = pd.to_datetime('today', utc=True)
daily_stats = self.prepare_period_stats(
start_dt=today,
end_dt=pd.Timestamp.utcnow()
@@ -643,6 +675,17 @@ class ExchangeTradingAlgorithmLive(ExchangeTradingAlgorithmBase):
except Exception as e:
log.warn('unable to calculate performance: {}'.format(e))
csv_bytes = None
try:
csv_bytes = stats_to_algo_folder(
stats=self.frame_stats,
algo_namespace=self.algo_namespace,
recorded_cols=recorded_cols,
)
except Exception as e:
log.warn('unable save stats locally: {}'.format(e))
try:
if self.stats_output is not None:
if 's3://' in self.stats_output:
@@ -651,14 +694,17 @@ class ExchangeTradingAlgorithmLive(ExchangeTradingAlgorithmBase):
stats=self.frame_stats,
algo_namespace=self.algo_namespace,
recorded_cols=recorded_cols,
bytes_to_write=csv_bytes
)
else:
raise ValueError(
'Only S3 stats output is supported for now.'
)
except Exception as e:
log.warn('unable save stats: {}'.format(e))
log.warn('unable save stats externally: {}'.format(e))
# TODO: pickle does not seem to work in python 3
try:
save_algo_object(
@@ -669,6 +715,8 @@ class ExchangeTradingAlgorithmLive(ExchangeTradingAlgorithmBase):
except Exception as e:
log.warn('unable to save minute perfs to disk: {}'.format(e))
self.current_day = data.current_dt.floor('1D')
@api_method
def batch_market_order(self, share_counts):
raise NotImplementedError()
+41 -5
View File
@@ -3,12 +3,15 @@ import numbers
import copy
import numpy as np
import os
import pandas as pd
import boto3
import time
from catalyst.assets._assets import TradingPair
from catalyst.exchange.exchange_utils import get_algo_folder
s3 = boto3.resource('s3')
@@ -183,7 +186,7 @@ def prepare_stats(stats, recorded_cols=list()):
"""
asset_cols = list()
stats = copy.deepcopy(list(stats))
stats = copy.deepcopy(stats)
# Using a copy since we are adding rows inside the loop.
for row_index, row_data in enumerate(list(stats)):
assets = [p['sid'] for p in row_data['positions']]
@@ -301,18 +304,51 @@ def get_csv_stats(stats, recorded_cols=None):
).encode()
def stats_to_s3(uri, stats, algo_namespace, recorded_cols=None):
bytes_to_write = get_csv_stats(stats, recorded_cols=recorded_cols)
def stats_to_s3(uri, stats, algo_namespace, recorded_cols=None,
folder='catalyst/stats', bytes_to_write=None):
"""
Uploads the performance stats to a S3 bucket.
Parameters
----------
uri: str
stats: list[Object]
algo_namespace: str
recorded_cols: list[str]
folder: str
bytes_to_write: str
Option to reuse bytes instead of re-computing the csv
Returns
-------
"""
if bytes_to_write is None:
bytes_to_write = get_csv_stats(stats, recorded_cols=recorded_cols)
timestr = time.strftime('%Y%m%d')
parts = uri.split('//')
obj = s3.Object(parts[1], 'stats/{}-{}.csv'.format(
timestr, algo_namespace
obj = s3.Object(parts[1], '{}/{}-{}.csv'.format(
folder, timestr, algo_namespace
))
obj.put(Body=bytes_to_write)
def stats_to_algo_folder(stats, algo_namespace, recorded_cols=None):
bytes_to_write = get_csv_stats(stats, recorded_cols=recorded_cols)
timestr = time.strftime('%Y%m%d')
folder = get_algo_folder(algo_namespace)
filename = os.path.join(folder, '{}-{}.csv'.format(timestr, 'frames'))
with open(filename, 'wb') as handle:
handle.write(bytes_to_write)
return bytes_to_write
def df_to_string(df):
"""
Create a formatted str representation of the DataFrame.