From 89f9a1179ea7d65cbf4cb5e01a077139cc127500 Mon Sep 17 00:00:00 2001 From: fredfortier Date: Fri, 8 Dec 2017 15:15:18 -0500 Subject: [PATCH] BLD: improvements to stats output --- catalyst/exchange/exchange_algorithm.py | 64 +++++++++++++++++++++---- catalyst/exchange/stats_utils.py | 46 ++++++++++++++++-- 2 files changed, 97 insertions(+), 13 deletions(-) diff --git a/catalyst/exchange/exchange_algorithm.py b/catalyst/exchange/exchange_algorithm.py index 147d87e4..2ce0357f 100644 --- a/catalyst/exchange/exchange_algorithm.py +++ b/catalyst/exchange/exchange_algorithm.py @@ -36,7 +36,8 @@ from catalyst.exchange.exchange_utils import save_algo_object, get_algo_object, save_algo_df, group_assets_by_exchange from catalyst.exchange.live_graph_clock import LiveGraphClock from catalyst.exchange.simple_clock import SimpleClock -from catalyst.exchange.stats_utils import get_pretty_stats, stats_to_s3 +from catalyst.exchange.stats_utils import get_pretty_stats, stats_to_s3, \ + stats_to_algo_folder from catalyst.finance.execution import MarketOrder from catalyst.finance.performance.period import calc_period_stats from catalyst.gens.tradesimulation import AlgorithmSimulator @@ -61,6 +62,8 @@ class ExchangeTradingAlgorithmBase(TradingAlgorithm): super(ExchangeTradingAlgorithmBase, self).__init__(*args, **kwargs) + self.current_day = None + if self.simulate_orders is None \ and self.sim_params.arena == 'backtest': self.simulate_orders = True @@ -281,6 +284,8 @@ class ExchangeTradingAlgorithmBacktest(ExchangeTradingAlgorithmBase): ) self.frame_stats.append(frame_stats) + self.current_day = data.current_dt.floor('1D') + def _create_stats_df(self): stats = pd.DataFrame(self.frame_stats) stats.set_index('period_close', inplace=True, drop=False) @@ -308,7 +313,7 @@ class ExchangeTradingAlgorithmLive(ExchangeTradingAlgorithmBase): self.stats_output = kwargs.pop('stats_output', None) self._clock = None - self.frame_stats = deque(maxlen=60) + self.frame_stats = list() self.pnl_stats = get_algo_df(self.algo_namespace, 'pnl_stats') @@ -326,7 +331,7 @@ class ExchangeTradingAlgorithmLive(ExchangeTradingAlgorithmBase): self.retry_order = 2 self.retry_delay = 5 - self.stats_minutes = 20 + self.stats_minutes = 10 super(ExchangeTradingAlgorithmLive, self).__init__(*args, **kwargs) @@ -438,27 +443,49 @@ class ExchangeTradingAlgorithmLive(ExchangeTradingAlgorithmBase): return self.perf_tracker.get_account(False) def synchronize_portfolio(self, attempt_index=0): + """ + Synchronizes the portfolio tracked by the algorithm to refresh + its current value. + + This includes updating the last_sale_price of all tracked + positions, returning the available cash, and raising error + if the data goes out of sync. + + Parameters + ---------- + attempt_index: int + + Returns + ------- + float + The amount of base currency available for trading. + + float + The total value of all tracked positions. + + """ tracker = self.perf_tracker.position_tracker total_cash = 0.0 total_positions_value = 0.0 try: # Position keys correspond to assets - assets = list(tracker.positions) + positions = self.portfolio.positions + assets = list(positions) exchange_assets = group_assets_by_exchange(assets) for exchange_name in self.exchanges: assets = exchange_assets[exchange_name] \ if exchange_name in exchange_assets else [] exchange_positions = \ - [tracker.positions[asset] for asset in assets] + [positions[asset] for asset in assets] exchange = self.exchanges[exchange_name] # Type: Exchange cash, positions_value = \ exchange.calculate_totals(exchange_positions) total_cash += cash - total_positions_value += total_positions_value + total_positions_value += positions_value for position in exchange_positions: tracker.update_position( @@ -471,6 +498,7 @@ class ExchangeTradingAlgorithmLive(ExchangeTradingAlgorithmBase): raise ValueError('Cash on exchanges is lower than the algo.') return total_cash, total_positions_value + except ExchangeRequestError as e: log.warn( 'update portfolio attempt {}: {}'.format(attempt_index, e) @@ -574,6 +602,11 @@ class ExchangeTradingAlgorithmLive(ExchangeTradingAlgorithmBase): if not self.is_running: return + # Resetting the frame stats every day to minimize memory footprint + today = data.current_dt.floor('1D') + if self.current_day is not None and today > self.current_day: + self.frame_stats = list() + new_transactions, new_commissions, closed_orders = \ self.blotter.get_transactions(data) @@ -625,7 +658,6 @@ class ExchangeTradingAlgorithmLive(ExchangeTradingAlgorithmBase): ) )) - today = pd.to_datetime('today', utc=True) daily_stats = self.prepare_period_stats( start_dt=today, end_dt=pd.Timestamp.utcnow() @@ -639,6 +671,17 @@ class ExchangeTradingAlgorithmLive(ExchangeTradingAlgorithmBase): except Exception as e: log.warn('unable to calculate performance: {}'.format(e)) + csv_bytes = None + try: + csv_bytes = stats_to_algo_folder( + stats=self.frame_stats, + algo_namespace=self.algo_namespace, + recorded_cols=recorded_cols, + ) + + except Exception as e: + log.warn('unable save stats locally: {}'.format(e)) + try: if self.stats_output is not None: if 's3://' in self.stats_output: @@ -647,14 +690,17 @@ class ExchangeTradingAlgorithmLive(ExchangeTradingAlgorithmBase): stats=self.frame_stats, algo_namespace=self.algo_namespace, recorded_cols=recorded_cols, + bytes_to_write=csv_bytes ) else: raise ValueError( 'Only S3 stats output is supported for now.' ) + except Exception as e: - log.warn('unable save stats: {}'.format(e)) + log.warn('unable save stats externally: {}'.format(e)) + # TODO: pickle does not seem to work in python 3 try: save_algo_object( @@ -665,6 +711,8 @@ class ExchangeTradingAlgorithmLive(ExchangeTradingAlgorithmBase): except Exception as e: log.warn('unable to save minute perfs to disk: {}'.format(e)) + self.current_day = data.current_dt.floor('1D') + @api_method def batch_market_order(self, share_counts): raise NotImplementedError() diff --git a/catalyst/exchange/stats_utils.py b/catalyst/exchange/stats_utils.py index ec89a340..b5a080dc 100644 --- a/catalyst/exchange/stats_utils.py +++ b/catalyst/exchange/stats_utils.py @@ -3,12 +3,15 @@ import numbers import copy import numpy as np +import os import pandas as pd import boto3 import time from catalyst.assets._assets import TradingPair +from catalyst.exchange.exchange_utils import get_algo_folder + s3 = boto3.resource('s3') @@ -183,7 +186,7 @@ def prepare_stats(stats, recorded_cols=list()): """ asset_cols = list() - stats = copy.deepcopy(list(stats)) + stats = copy.deepcopy(stats) # Using a copy since we are adding rows inside the loop. for row_index, row_data in enumerate(list(stats)): assets = [p['sid'] for p in row_data['positions']] @@ -301,18 +304,51 @@ def get_csv_stats(stats, recorded_cols=None): ).encode() -def stats_to_s3(uri, stats, algo_namespace, recorded_cols=None): - bytes_to_write = get_csv_stats(stats, recorded_cols=recorded_cols) +def stats_to_s3(uri, stats, algo_namespace, recorded_cols=None, + folder='catalyst/stats', bytes_to_write=None): + """ + Uploads the performance stats to a S3 bucket. + + Parameters + ---------- + uri: str + stats: list[Object] + algo_namespace: str + recorded_cols: list[str] + folder: str + bytes_to_write: str + Option to reuse bytes instead of re-computing the csv + + Returns + ------- + + """ + if bytes_to_write is None: + bytes_to_write = get_csv_stats(stats, recorded_cols=recorded_cols) timestr = time.strftime('%Y%m%d') parts = uri.split('//') - obj = s3.Object(parts[1], 'stats/{}-{}.csv'.format( - timestr, algo_namespace + obj = s3.Object(parts[1], '{}/{}-{}.csv'.format( + folder, timestr, algo_namespace )) obj.put(Body=bytes_to_write) +def stats_to_algo_folder(stats, algo_namespace, recorded_cols=None): + bytes_to_write = get_csv_stats(stats, recorded_cols=recorded_cols) + + timestr = time.strftime('%Y%m%d') + folder = get_algo_folder(algo_namespace) + + filename = os.path.join(folder, '{}-{}.csv'.format(timestr, 'frames')) + + with open(filename, 'wb') as handle: + handle.write(bytes_to_write) + + return bytes_to_write + + def df_to_string(df): """ Create a formatted str representation of the DataFrame.