diff --git a/tests/test_exception_handling.py b/tests/test_exception_handling.py index b33a2acb..144568d1 100644 --- a/tests/test_exception_handling.py +++ b/tests/test_exception_handling.py @@ -58,7 +58,6 @@ class ExceptionTestCase(TestCase): self.assertTrue(payload['date']) del payload['date'] check(self, payload, INITIALIZE_TB) - self.assertTrue(zipline.sim.ready()) self.assertFalse(zipline.sim.exception) @@ -85,7 +84,6 @@ class ExceptionTestCase(TestCase): self.assertTrue(payload['date']) del payload['date'] check(self, payload, HANDLE_DATA_TB) - self.assertTrue(zipline.sim.ready()) self.assertFalse(zipline.sim.exception) @@ -109,7 +107,6 @@ class ExceptionTestCase(TestCase): self.assertTrue(payload['date']) del payload['date'] check(self, payload, ZERO_DIV_TB) - self.assertTrue(zipline.sim.ready()) self.assertFalse(zipline.sim.exception) @@ -123,20 +120,20 @@ class ExceptionTestCase(TestCase): INITIALIZE_TB =\ {'message': 'Algo exception in initialize', 'name': 'Exception', - 'stack': [{'filename': '/zipline/core/component.py', 'line': 'self._run()', 'lineno': 204, 'method': 'run'}, - {'filename': '/zipline/core/component.py', 'line': 'self.loop()', 'lineno': 195, 'method': '_run'}, - {'filename': '/zipline/core/component.py', 'line': 'self.do_work()', 'lineno': 235, 'method': 'loop'}, + 'stack': [{'filename': '/zipline/core/component.py', 'line': 'self._run()', 'lineno': 210, 'method': 'run'}, + {'filename': '/zipline/core/component.py', 'line': 'self.loop()', 'lineno': 201, 'method': '_run'}, + {'filename': '/zipline/core/component.py', 'line': 'self.do_work()', 'lineno': 241, 'method': 'loop'}, {'filename': '/zipline/components/tradesimulation.py', 'line': 'self.initialize_algo()', - 'lineno': 97, + 'lineno': 91, 'method': 'do_work'}, {'filename': '/zipline/components/tradesimulation.py', 'line': 'self.do_op(self.algorithm.initialize)', - 'lineno': 80, + 'lineno': 74, 'method': 'initialize_algo'}, {'filename': '/zipline/components/tradesimulation.py', 'line': 'callable_op(*args, **kwargs)', - 'lineno': 210, + 'lineno': 194, 'method': 'do_op'}, {'filename': '/zipline/test_algorithms.py', 'line': 'raise Exception("Algo exception in initialize")', @@ -144,54 +141,52 @@ INITIALIZE_TB =\ 'method': 'initialize'}]} HANDLE_DATA_TB =\ -{ - 'message': 'Algo exception in handle_data', +{'message': 'Algo exception in handle_data', 'name': 'Exception', - 'stack': [{'filename': '/zipline/core/component.py', 'line': 'self._run()', 'lineno': 204, 'method': 'run'}, - {'filename': '/zipline/core/component.py', 'line': 'self.loop()', 'lineno': 195, 'method': '_run'}, - {'filename': '/zipline/core/component.py', 'line': 'self.do_work()', 'lineno': 235, 'method': 'loop'}, + 'stack': [{'filename': '/zipline/core/component.py', 'line': 'self._run()', 'lineno': 210, 'method': 'run'}, + {'filename': '/zipline/core/component.py', 'line': 'self.loop()', 'lineno': 201, 'method': '_run'}, + {'filename': '/zipline/core/component.py', 'line': 'self.do_work()', 'lineno': 241, 'method': 'loop'}, {'filename': '/zipline/components/tradesimulation.py', 'line': 'self.process_event(event)', - 'lineno': 116, + 'lineno': 110, 'method': 'do_work'}, {'filename': '/zipline/components/tradesimulation.py', 'line': 'self.run_algorithm()', - 'lineno': 164, + 'lineno': 158, 'method': 'process_event'}, {'filename': '/zipline/components/tradesimulation.py', 'line': 'self.do_op(self.algorithm.handle_data, data)', - 'lineno': 186, + 'lineno': 180, 'method': 'run_algorithm'}, {'filename': '/zipline/components/tradesimulation.py', 'line': 'callable_op(*args, **kwargs)', - 'lineno': 210, + 'lineno': 194, 'method': 'do_op'}, {'filename': '/zipline/test_algorithms.py', 'line': 'raise Exception("Algo exception in handle_data")', 'lineno': 187, 'method': 'handle_data'}]} - ZERO_DIV_TB= \ {'message': 'integer division or modulo by zero', 'name': 'ZeroDivisionError', - 'stack': [{'filename': '/zipline/core/component.py', 'line': 'self._run()', 'lineno': 204, 'method': 'run'}, - {'filename': '/zipline/core/component.py', 'line': 'self.loop()', 'lineno': 195, 'method': '_run'}, - {'filename': '/zipline/core/component.py', 'line': 'self.do_work()', 'lineno': 235, 'method': 'loop'}, + 'stack': [{'filename': '/zipline/core/component.py', 'line': 'self._run()', 'lineno': 210, 'method': 'run'}, + {'filename': '/zipline/core/component.py', 'line': 'self.loop()', 'lineno': 201, 'method': '_run'}, + {'filename': '/zipline/core/component.py', 'line': 'self.do_work()', 'lineno': 241, 'method': 'loop'}, {'filename': '/zipline/components/tradesimulation.py', 'line': 'self.process_event(event)', - 'lineno': 116, + 'lineno': 110, 'method': 'do_work'}, {'filename': '/zipline/components/tradesimulation.py', 'line': 'self.run_algorithm()', - 'lineno': 164, + 'lineno': 158, 'method': 'process_event'}, {'filename': '/zipline/components/tradesimulation.py', 'line': 'self.do_op(self.algorithm.handle_data, data)', - 'lineno': 186, + 'lineno': 180, 'method': 'run_algorithm'}, {'filename': '/zipline/components/tradesimulation.py', 'line': 'callable_op(*args, **kwargs)', - 'lineno': 210, + 'lineno': 194, 'method': 'do_op'}, {'filename': '/zipline/test_algorithms.py', 'line': '5/0', 'lineno': 218, 'method': 'handle_data'}]} diff --git a/zipline/components/tradesimulation.py b/zipline/components/tradesimulation.py index aa8ca937..eabddaec 100644 --- a/zipline/components/tradesimulation.py +++ b/zipline/components/tradesimulation.py @@ -12,7 +12,7 @@ from zipline.utils.protocol_utils import ndict from zipline.utils.log_utils import ZeroMQLogHandler, stdout_only_pipe -from logbook import Logger, NestedSetup, Processor, queues +from logbook import Logger, NestedSetup, Processor log = logbook.Logger('TradeSimulation') @@ -20,7 +20,7 @@ log = logbook.Logger('TradeSimulation') class TradeSimulationClient(Component): - def init(self, trading_environment, sim_style, results_socket): + def init(self, trading_environment, sim_style, results_socket, algorithm): self.received_count = 0 self.prev_dt = None self.event_queue = None @@ -29,13 +29,20 @@ class TradeSimulationClient(Component): self.trading_environment = trading_environment self.current_dt = trading_environment.period_start self.last_iteration_dur = datetime.timedelta(seconds=0) - self.algorithm = None + self.algorithm = algorithm + self.algorithm.set_order(self.order) self.max_wait = datetime.timedelta(seconds=60) self.last_msg_dt = datetime.datetime.utcnow() - self.txn_sim = TransactionSimulator(sim_style) + self.txn_sim = TransactionSimulator( + open_orders={}, + style=sim_style + ) self.event_data = ndict() - self.perf = perf.PerformanceTracker(self.trading_environment) + self.perf = perf.PerformanceTracker( + self.trading_environment, + self.algorithm.get_sid_filter() + ) self.zmq_out = None self.results_socket = results_socket self.algo_initialized = False @@ -44,19 +51,6 @@ class TradeSimulationClient(Component): def get_id(self): return str(zp.FINANCE_COMPONENT.TRADING_CLIENT) - def set_algorithm(self, algorithm): - """ - :param algorithm: must implement the algorithm protocol. See - :py:mod:`zipline.test.algorithm` - """ - self.algorithm = algorithm - # register the client's order method with the algorithm - self.algorithm.set_order(self.order) - # we need to provide the performance tracker with the - # sids referenced in the algorithm, so portfolio can - # initialize with all possible sids. - self.perf.set_sids(self.algorithm.get_sid_filter()) - def open(self): self.result_feed = self.connect_result() if self.results_socket: @@ -185,16 +179,6 @@ class TradeSimulationClient(Component): # LOG_EXTRA_FIELDS in zipline/protocol.py self.do_op(self.algorithm.handle_data, data) - def exception_callback(self, exc_type, exc_value, exc_traceback): - if self.results_socket: - log.info("Sending exception frame") - msg = zp.EXCEPTION_FRAME( - exc_traceback, - exc_type.__name__, - exc_value.message - ) - self.out_socket.send(msg) - def do_op(self, callable_op, *args, **kwargs): """ Wrap a callable operation with the zmq logbook handler if it exits.""" diff --git a/zipline/core/component.py b/zipline/core/component.py index fc859307..962966f4 100644 --- a/zipline/core/component.py +++ b/zipline/core/component.py @@ -17,8 +17,14 @@ import zmq from zipline.core.monitor import PARAMETERS -from zipline.protocol import CONTROL_PROTOCOL, COMPONENT_STATE, \ - COMPONENT_FAILURE, CONTROL_FRAME, CONTROL_UNFRAME +from zipline.protocol import ( + CONTROL_PROTOCOL, + COMPONENT_STATE, + COMPONENT_FAILURE, + CONTROL_FRAME, + CONTROL_UNFRAME, + EXCEPTION_FRAME +) log = logbook.Logger('Component') @@ -491,7 +497,6 @@ class Component(object): self._exception = exc exc_type, exc_value, exc_traceback = sys.exc_info() - trace = ''.join(traceback.format_exception(exc_type, exc_value, exc_traceback)) # if a downstream component fails, this component may try # sending when there are zero connections to the socket, @@ -506,14 +511,19 @@ class Component(object): # sys.stdout.write(trace) log.exception("Unexpected error in run for {id}.".format(id=self.get_id)) - self.relay_exception(exc_type, exc_value, exc_traceback) - if hasattr(self, 'control_out') and self.control_out: try: - log.info('{id} sending exception to controller'.format(id=self.get_id)) + log.info('{id} sending exception to controller'\ + .format(id=self.get_id)) + msg = EXCEPTION_FRAME( + exc_traceback, + exc_type.__name__, + exc_value.message + ) + exception_frame = CONTROL_FRAME( CONTROL_PROTOCOL.EXCEPTION, - trace + msg ) self.control_out.send(exception_frame, self.zmq.NOBLOCK) # The controller should relay the exception back @@ -525,16 +535,12 @@ class Component(object): self.heartbeat(timeout=1000) log.warn("{id} never heard back from monitor."\ .format(id=self.get_id)) + except KillSignal: + log.info("{id} received confirmation from controller"\ + .format(id=self.get_id)) except: log.exception("Exception waiting for controller reply") - def relay_exception(self, exc_type, exc_value, exc_traceback): - if hasattr(self, 'exception_callback') and self.exception_callback: - log.info('{id} making exception callback'.format(id=self.get_id)) - self.exception_callback(exc_type, exc_value, exc_traceback) - - - def signal_done(self): """ Notify down stream components that we're done. diff --git a/zipline/core/monitor.py b/zipline/core/monitor.py index 65c37d32..ff6adf0c 100644 --- a/zipline/core/monitor.py +++ b/zipline/core/monitor.py @@ -9,8 +9,13 @@ from signal import SIGHUP, SIGINT from collections import OrderedDict, Counter -from zipline.protocol import CONTROL_PROTOCOL, CONTROL_FRAME, \ - CONTROL_UNFRAME, CONTROL_STATES, INVALID_CONTROL_FRAME \ +from zipline.protocol import ( + CONTROL_PROTOCOL, + CONTROL_FRAME, + CONTROL_UNFRAME, + CONTROL_STATES, + INVALID_CONTROL_FRAME +) from zipline.utils.protocol_utils import ndict @@ -248,6 +253,11 @@ class Controller(object): self.router.bind(self.route_socket) self.router.setsockopt(zmq.LINGER, 0) + # -- Exception Out -- + # =================== + self.ex_out = self.context.socket(self.zmq.PUSH) + self.ex_out.connect(self.exception_socket) + poller = self.zmq.Poller() poller.register(self.router, self.zmq.POLLIN) #poller.register(self.cancel, self.zmq.POLLIN) @@ -379,6 +389,7 @@ class Controller(object): """ if not self.send_sighup: log.warning("Skipping SIGINT") + return ppid = os.getpid() log.warning("Sending SIGINT") os.kill(ppid, SIGINT) @@ -417,8 +428,7 @@ class Controller(object): log.info('New component %r' % component) for component in bad: - self.fail(component) - + self.timed_out(component) for component in missing: @@ -465,22 +475,16 @@ class Controller(object): # Epic Fail Handling # ------------------ - def fail_universal(self): - # TODO: this requires higher order functionality - log.error('Component missed heartbeat, Monitor shutting down system') - self.kill() - - def fail(self, component): + def timed_out(self, component): if self.state is CONTROL_STATES.TERMINATE: return - universal = self.fail_universal - fail_handlers = { } - if component in (self.topology - self.finished) or self.freeform: log.warning('Component "%s" missed heartbeat' % component) - self.tracked.remove(component) - fail_handlers.get(component, universal)() + # we treat a time out as a severe failure, and + # conduct a rapid shutdown + self.kill() + # ------------------- # Completion Handling @@ -494,26 +498,16 @@ class Controller(object): # -------------- # Error Handling # -------------- - - def exception_universal(self): - """ - Shutdown the system on failure. - """ - log.error('System in exception state, shutting down') + def exception(self, component, exception_data): + self.error_replay[(component, time.time())] = exception_data + log.error('Component in exception state: %s. Shutting down system and sending exception data to listeners.'\ + % component) + # Send the exception message out to listeners. + self.ex_out.send(exception_data) + # An exception in one component is treated as a hard + # failure, and we conduct a rapid shutdown. self.kill() - def exception(self, component, failure): - universal = self.exception_universal - exception_handlers = { } - - if component in self.topology or self.freeform: - self.error_replay[(component, time.time())] = failure - log.error('Component in exception state: %s' % component) - - exception_handlers.get(component, universal)() - else: - raise UnknownChatter(component) - # ----------------- # Protocol Handling # ----------------- @@ -560,7 +554,19 @@ class Controller(object): # A component is telling us it failed, and how if id is CONTROL_PROTOCOL.EXCEPTION: - self.exception(identity, status) + # status should be a msgpack emitted from + # EXCEPTION_FRAME + try: + exception_data = status + self.exception(identity, exception_data) + except: + # if an exception occurs when we try to handle + # the exception, signal the parent that we need + # to go down + # TODO: should we attempt to call self.exception? + log.exception("Unexpected exception sending exception data") + self.kill() + return # A component is telling us its done with work and won't @@ -615,6 +621,8 @@ class Controller(object): log.info('Component Log for -- %s --:\n%s' % (component, error)) def kill(self): + """Aggressively exit the whole zipline. + """ if self.state is CONTROL_STATES.TERMINATE: return @@ -622,6 +630,9 @@ class Controller(object): self.send_hardkill() self.state = CONTROL_STATES.TERMINATE self.alive = False + # send burrito an interrupt, instructing it to kill all + # child processes assocated with this zipline. + time.sleep(3) self.signal_interrupt() def shutdown(self): diff --git a/zipline/finance/performance.py b/zipline/finance/performance.py index fdca878d..6871cd07 100644 --- a/zipline/finance/performance.py +++ b/zipline/finance/performance.py @@ -144,7 +144,7 @@ class PerformanceTracker(object): """ - def __init__(self, trading_environment): + def __init__(self, trading_environment, sid_list): self.trading_environment = trading_environment self.trading_day = datetime.timedelta(hours = 6, minutes = 30) @@ -164,7 +164,6 @@ class PerformanceTracker(object): self.txn_count = 0 self.event_count = 0 self.last_dict = None - self.order_log = [] self.exceeded_max_loss = False self.results_socket = None @@ -198,9 +197,14 @@ class PerformanceTracker(object): keep_transactions = True ) - def set_sids(self, sid_list): for sid in sid_list: self.cumulative_performance.positions[sid] = Position(sid) + self.todays_performance.positions[sid] = Position(sid) + + def update(self, event): + event.perf_message = self.process_event() + event.portfolio = self.get_portfolio + return event def get_portfolio(self): return self.cumulative_performance.as_portfolio() @@ -238,8 +242,6 @@ class PerformanceTracker(object): 'cumulative_risk_metrics' : self.cumulative_risk_metrics.to_dict() } - def log_order(self, order): - self.order_log.append(order) def process_event(self, event): @@ -288,6 +290,8 @@ class PerformanceTracker(object): # calculate progress of test self.progress = self.day_count / self.total_days + # TODO!!!! + # Output results if self.results_socket: msg = zp.PERF_FRAME(self.to_dict()) @@ -584,7 +588,6 @@ class PerformancePeriod(object): return positions - # def get_positions_list(self): positions = [] for sid, pos in self.positions.iteritems(): diff --git a/zipline/finance/trading.py b/zipline/finance/trading.py index bf3a5374..9cae6e72 100644 --- a/zipline/finance/trading.py +++ b/zipline/finance/trading.py @@ -10,9 +10,8 @@ log = logbook.Logger('Transaction Simulator') class TransactionSimulator(object): - def __init__(self, style=SIMULATION_STYLE.PARTIAL_VOLUME): - self.open_orders = {} - self.order_count = 0 + def __init__(self, open_orders, style=SIMULATION_STYLE.PARTIAL_VOLUME): + self.open_orders = open_orders self.txn_count = 0 self.trade_window = datetime.timedelta(seconds=30) self.orderTTL = datetime.timedelta(days=1) @@ -27,28 +26,12 @@ class TransactionSimulator(object): elif style == SIMULATION_STYLE.NOOP: self.apply_trade_to_open_orders = self.simulate_noop - def add_open_order(self, event): - # Orders are captured in a buffer by sid. No calculations are done here. - # Amount is explicitly converted to an int. - # Orders of amount zero are ignored. - - self.order_count += 1 - event.amount = int(event.amount) - - if event.amount == 0: - log = "requested to trade zero shares of {sid}".format( - sid=event.sid - ) - log.debug(log) - return - - if not self.open_orders.has_key(event.sid): - self.open_orders[event.sid] = [] - - # set the filled property to zero - event.filled = 0 - self.open_orders[event.sid].append(event) - + def update(self, event): + event.txn = None + if event.type == zp.DATASOURCE_TYPE.TRADE: + event.txn = self.apply_trade_to_open_orders(event) + return event + def simulate_buy_all(self, event): txn = self.create_transaction( event.sid, @@ -81,7 +64,7 @@ class TransactionSimulator(object): txn = self.create_transaction( event.sid, amount, - event.price + 0.10, + event.price + 0.10, # Magic constant? event.dt, direction ) diff --git a/zipline/gens/examples.py b/zipline/gens/examples.py new file mode 100644 index 00000000..d9051b10 --- /dev/null +++ b/zipline/gens/examples.py @@ -0,0 +1,38 @@ +from zipline.gens.composites import + +if __name__ == "__main__": + + filter = [1,2,3,4] + #Set up source a. One hour between events. + args_a = tuple() + kwargs_a = {'sids' : [1,2,3,4], + 'start' : datetime(2012,6,6,0), + 'delta' : timedelta(minutes = ), + 'filter' : filter + } + #Set up source b. One day between events. + args_b = tuple() + kwargs_b = {'sids' : [1,2,3,4], + 'start' : datetime(2012,6,6,0), + 'delta' : timedelta(days = 1), + 'filter' : filter + } + #Set up source c. One minute between events. + args_c = tuple() + kwargs_c = {'sids' : [1,2,3,4], + 'start' : datetime(2012,6,6,0), + 'delta' : timedelta(minutes = 1), + 'filter' : filter + } + + sources = (SpecificEquityTrades,) * 4 + source_args = (args_a, args_b, args_c, args_d) + source_kwargs = (kwargs_a, kwargs_b, kwargs_c, kwargs_d) + + # Generate our expected source_ids. + zip_args = zip(source_args, source_kwargs) + expected_ids = ["SpecificEquityTrades" + hash_args(*args, **kwargs) + for args, kwargs in zip_args] + + # Pipe our sources into sort. + sort_out = date_sorted_sources(sources, source_args, source_kwargs) diff --git a/zipline/gens/tradesimulation.py b/zipline/gens/tradesimulation.py index 65523a41..07bd48d6 100644 --- a/zipline/gens/tradesimulation.py +++ b/zipline/gens/tradesimulation.py @@ -13,7 +13,7 @@ def trade_simulation_client(stream_in, algo, environment, sim_style): our algorithm's simulated universe. Results are fed to the user's algorithm, which directly inserts transactions into the TransactionSimulator's order book. - + TransactionSimulator maintains a dictionary from sids to the unfulfilled orders placed by the user's algorithm. As trade events arrive, if the algorithm has open orders against the @@ -37,7 +37,7 @@ def trade_simulation_client(stream_in, algo, environment, sim_style): overwritten so that only the most recent snapshot of the universe is sent to the algo. """ - + #============ # Algo Setup #============ @@ -46,10 +46,10 @@ def trade_simulation_client(stream_in, algo, environment, sim_style): # reference it from within the user's algorithm. sids = algo.get_sid_filter() open_orders = {} - + for sid in sids: open_orders[sids] = [] - + # Closure to pass into the user's algo to allow placing orders # into the txn_sim's dict of open orders. def order(self, sid, amount): @@ -70,51 +70,48 @@ def trade_simulation_client(stream_in, algo, environment, sim_style): return open_orders[sid].append(event) - + # Set the algo's order method. algo.set_order(order) - + # Provide a logbook logging interface to user code. algo.set_logger(Logger("Algolog")) # Call user-defined initialize method before we process any # events. algo.initialize() - + # Pipe the in stream into the transaction simulator. # Creates a TRANSACTION field on the event containing transaction # information if we filled any pending orders on the event's sid. # TRANSACTION is None if we didn't fill any orders. - with_txns = stateful_transform(stream_in, - TransactionSimulator, - open_orders, - style = sim_style) - - + with_txns = stateful_transform( + stream_in, + TransactionSimulator, + open_orders, + style = sim_style + ) + + # Pipe the events with transactions to perf. This will remove the # TRANSACTION field added by TransactionSimulator and replace it with # a portfolio object to be passed to the user's algorithm. Also adds # a PERF_MESSAGE field which is usually none, but contains an update # message once per day. - with_portfolio_and_perf_msg = stateful_transform(stream_with_txns, - PerformanceTracker, - trading_environment, - sids) - + with_portfolio_and_perf_msg = stateful_transform( + stream_with_txns, + PerformanceTracker, + trading_environment, + sids + ) + # Batch the event stream by dt to be processed by the user's algo. # Will also set the PERF_MESSAGE field if the batch contains a perf # message. - + batches = batcher(with_portfolio_and_perf_msg) - + for batch in batches: algo.handle_data(batch.data) if batch.perf_message: yield perf_message - - - - - - - diff --git a/zipline/gens/zmq_gens.py b/zipline/gens/zmq_gens.py index 524852a7..e60dae2b 100644 --- a/zipline/gens/zmq_gens.py +++ b/zipline/gens/zmq_gens.py @@ -2,15 +2,17 @@ import zmq import zipline.protocol as zp -def gen_from_zmq(poller, unframe): +def gen_from_zmq(poller, unframe, namestring): """ A generator that takes an initialized zmq poller and yields messages from the poller until it gets a zp.CONTROL_PROTOCOL.DONE. """ while True: message = poller.recv() - if message = zp.CONTROL_PROTOCOL.DONE: - yield "DONE" + # Done protocol should now be a message type so that + # done messages can also have source_ids. + if message.type == zp.CONTROL_PROTOCOL.DONE: + yield done_message(message.source_id) break else: yield unframe(message) diff --git a/zipline/lines.py b/zipline/lines.py index 2dc43e2f..6a70bc14 100644 --- a/zipline/lines.py +++ b/zipline/lines.py @@ -132,9 +132,15 @@ class SimulatedTrading(object): } self.con = Controller( + # pub socket sockets[5], + # route socket sockets[6], - self.send_sighup + # exception socket to match tradesimclient's result + # socket, because we want to relay exceptions to the + # same listener + config['results_socket'], + send_sighup=self.send_sighup ) self.started = False @@ -146,7 +152,8 @@ class SimulatedTrading(object): self.trading_client = TradeSimulationClient( self.trading_environment, self.sim_style, - config['results_socket'] + config['results_socket'], + self.algorithm ) self.add_client(self.trading_client) @@ -158,7 +165,6 @@ class SimulatedTrading(object): self.sim.register_controller( self.con ) - self.trading_client.set_algorithm(self.algorithm) @staticmethod def create_test_zipline(**config):