diff --git a/zipline/components/tradesimulation.py b/zipline/components/tradesimulation.py index 06aa7272..068cda09 100644 --- a/zipline/components/tradesimulation.py +++ b/zipline/components/tradesimulation.py @@ -46,6 +46,7 @@ class TradeSimulationClient(Component): def open(self): self.result_feed = self.connect_result() + self.perf.open(self.context) def do_work(self): # poll all the sockets diff --git a/zipline/core/component.py b/zipline/core/component.py index 023018d0..fec82966 100644 --- a/zipline/core/component.py +++ b/zipline/core/component.py @@ -27,6 +27,8 @@ LOGGER = logging.getLogger('ZiplineLogger') from zipline.exceptions import ComponentNoInit from zipline.transitions import WorkflowMeta +# LOGBOOK - embed PID in log output + class Component(object): """ @@ -287,8 +289,6 @@ class Component(object): Tear down ( fast ) as a mode of failure in the simulation or on service halt. - - Context specific. """ raise NotImplementedError @@ -318,14 +318,16 @@ class Component(object): self._exception = exc exc_type, exc_value, exc_traceback = sys.exc_info() trace = '\n>>>'.join(traceback.format_exception(exc_type, exc_value, exc_traceback)) + sys.stdout.write(trace) - exception_frame = CONTROL_FRAME( - CONTROL_PROTOCOL.EXCEPTION, - trace - ) - self.control_out.send(exception_frame) + if hasattr(self, 'control_out'): + exception_frame = CONTROL_FRAME( + CONTROL_PROTOCOL.EXCEPTION, + trace + ) + self.control_out.send(exception_frame) - LOGGER.exception("Unexpected error in run for {id}.".format(id=self.get_id)) + #LOGGER.exception("Unexpected error in run for {id}.".format(id=self.get_id)) def signal_done(self): """ @@ -472,7 +474,7 @@ class Component(object): DEPRECATED, left in for compatability for now. """ - LOGGER.debug("Connecting sync client for {id}".format(id=self.get_id)) + #LOGGER.debug("Connecting sync client for {id}".format(id=self.get_id)) self.sync_socket = self.context.socket(self.zmq.REQ) self.sync_socket.connect(self.addresses['sync_address']) diff --git a/zipline/core/host.py b/zipline/core/host.py index 052e2d5c..16126465 100644 --- a/zipline/core/host.py +++ b/zipline/core/host.py @@ -89,7 +89,7 @@ class ComponentHost(Component): """ Setup the sync socket and poller. ( Bind ) """ - LOGGER.debug("Connecting sync server.") + #LOGGER.debug("Connecting sync server.") self.sync_socket = self.context.socket(self.zmq.REP) self.sync_socket.bind(self.addresses['sync_address']) @@ -100,8 +100,15 @@ class ComponentHost(Component): self.sockets.append(self.sync_socket) def open(self): + LOGGER.info('== Roll Call ==\n') + for component in self.components.itervalues(): + LOGGER.info(component) + + LOGGER.info('== End Roll Call ==\n') + for component in self.components.itervalues(): self.launch_component(component) + self.launch_controller() def is_running(self): @@ -151,6 +158,3 @@ class ComponentHost(Component): def launch_component(self, component): raise NotImplementedError - - def teardown_component(self, component): - raise NotImplementedError diff --git a/zipline/finance/performance.py b/zipline/finance/performance.py index d72c229d..59bf0adb 100644 --- a/zipline/finance/performance.py +++ b/zipline/finance/performance.py @@ -125,7 +125,7 @@ import zipline.finance.risk as risk LOGGER = logging.getLogger('ZiplineLogger') -class PerformanceTracker(): +class PerformanceTracker(object): """ Tracks the performance of the zipline as it is running in the simulator, relays this out to the Deluge broker and then @@ -139,7 +139,6 @@ class PerformanceTracker(): def __init__(self, trading_environment): - self.trading_environment = trading_environment self.trading_day = datetime.timedelta(hours = 6, minutes = 30) self.calendar_day = datetime.timedelta(hours = 24) @@ -157,11 +156,13 @@ class PerformanceTracker(): self.returns = [] self.txn_count = 0 self.event_count = 0 - self.result_stream = None self.last_dict = None self.order_log = [] self.exceeded_max_loss = False + self.results_socket = None + self.results_addr = None + # this performance period will span the entire simulation. self.cumulative_performance = PerformancePeriod( # initial positions are empty @@ -193,19 +194,18 @@ class PerformanceTracker(): def get_portfolio(self): return self.cumulative_performance.to_ndict() - def publish_to(self, zmq_socket, context=None): + def open(self, context): + sock = context.socket(zmq.PUSH) + sock.connect(self.results_addr) + self.results_socket = sock + + def publish_to(self, results_addr): """ Publish the performance results asynchronously to a socket. """ - if isinstance(zmq_socket, zmq.Socket): - self.result_stream = zmq_socket - else: - ctx = context or zmq.Context.instance() - sock = ctx.socket(zmq.PUSH) - sock.connect(zmq_socket) - - self.result_stream = sock + assert isinstance(results_addr, basestring), type(results_addr) + self.results_addr = results_addr def to_dict(self): """ @@ -274,9 +274,9 @@ class PerformanceTracker(): self.progress = self.day_count / self.total_days # Output results - if self.result_stream: + if self.results_socket: msg = zp.PERF_FRAME(self.to_dict()) - self.result_stream.send(msg) + self.results_socket.send(msg) # if self.trading_environment.max_drawdown: @@ -315,7 +315,7 @@ class PerformanceTracker(): def handle_simulation_end(self): """ When the simulation is complete, run the full period risk report - and send it out on the result_stream. + and send it out on the results socket. """ log_msg = "Simulated {n} trading days out of {m}." @@ -334,24 +334,24 @@ class PerformanceTracker(): exceeded_max_loss = self.exceeded_max_loss ) - if self.result_stream: + if self.results_socket: LOGGER.info("about to stream the risk report...") risk_dict = self.risk_report.to_dict() msg = zp.RISK_FRAME(risk_dict) - self.result_stream.send(msg) + self.results_socket.send(msg) # this signals that the simulation is complete. - self.result_stream.send("DONE") + self.results_socket.send("DONE") -class Position(): +class Position(object): def __init__(self, sid): - self.sid = sid - self.amount = 0 - self.cost_basis = 0.0 ##per share + self.sid = sid + self.amount = 0 + self.cost_basis = 0.0 ##per share self.last_sale_price = None - self.last_sale_date = None + self.last_sale_date = None def update(self, txn): if(self.sid != txn.sid): @@ -362,12 +362,12 @@ class Position(): self.cost_basis = 0.0 self.amount = 0 else: - prev_cost = self.cost_basis*self.amount - txn_cost = txn.amount*txn.price - total_cost = prev_cost + txn_cost - total_shares = self.amount + txn.amount + prev_cost = self.cost_basis*self.amount + txn_cost = txn.amount*txn.price + total_cost = prev_cost + txn_cost + total_shares = self.amount + txn.amount self.cost_basis = total_cost/total_shares - self.amount = self.amount + txn.amount + self.amount = self.amount + txn.amount def currentValue(self): return self.amount * self.last_sale_price @@ -377,10 +377,10 @@ class Position(): template = "sid: {sid}, amount: {amount}, cost_basis: {cost_basis}, \ last_sale_price: {last_sale_price}" return template.format( - sid=self.sid, - amount=self.amount, - cost_basis=self.cost_basis, - last_sale_price=self.last_sale_price + sid = self.sid, + amount = self.amount, + cost_basis = self.cost_basis, + last_sale_price = self.last_sale_price ) def to_dict(self): @@ -396,7 +396,7 @@ class Position(): } -class PerformancePeriod(): +class PerformancePeriod(object): def __init__( self, diff --git a/zipline/lines.py b/zipline/lines.py index 0fbd5dc4..070931c5 100644 --- a/zipline/lines.py +++ b/zipline/lines.py @@ -165,12 +165,9 @@ class SimulatedTrading(object): #self.add_transform(self.transaction_sim) self.sim.register_controller( self.con ) - self.sim.on_done = self.shutdown() - self.trading_client.set_algorithm(self.algorithm) - @staticmethod def create_test_zipline(**config): """ @@ -322,20 +319,17 @@ class SimulatedTrading(object): assert n > 0 leased = self.allocator.lease(n) - self.leased_sockets.extend(leased) + return leased def simulate(self, blocking=False): self.started = True self.sim_context = self.sim.simulate() + if blocking: self.sim_context.join() - def shutdown(self): - pass - #self.allocator.reaquire(*self.leased_sockets) - #-------------------------------- # Component property accessors #--------------------------------