Process Simulator!

This commit is contained in:
Stephen Diehl
2012-05-26 15:54:43 -04:00
parent b47a3d8241
commit 3446c116ab
5 changed files with 55 additions and 54 deletions
+1
View File
@@ -46,6 +46,7 @@ class TradeSimulationClient(Component):
def open(self):
self.result_feed = self.connect_result()
self.perf.open(self.context)
def do_work(self):
# poll all the sockets
+11 -9
View File
@@ -27,6 +27,8 @@ LOGGER = logging.getLogger('ZiplineLogger')
from zipline.exceptions import ComponentNoInit
from zipline.transitions import WorkflowMeta
# LOGBOOK - embed PID in log output
class Component(object):
"""
@@ -287,8 +289,6 @@ class Component(object):
Tear down ( fast ) as a mode of failure in the simulation or on
service halt.
Context specific.
"""
raise NotImplementedError
@@ -318,14 +318,16 @@ class Component(object):
self._exception = exc
exc_type, exc_value, exc_traceback = sys.exc_info()
trace = '\n>>>'.join(traceback.format_exception(exc_type, exc_value, exc_traceback))
sys.stdout.write(trace)
exception_frame = CONTROL_FRAME(
CONTROL_PROTOCOL.EXCEPTION,
trace
)
self.control_out.send(exception_frame)
if hasattr(self, 'control_out'):
exception_frame = CONTROL_FRAME(
CONTROL_PROTOCOL.EXCEPTION,
trace
)
self.control_out.send(exception_frame)
LOGGER.exception("Unexpected error in run for {id}.".format(id=self.get_id))
#LOGGER.exception("Unexpected error in run for {id}.".format(id=self.get_id))
def signal_done(self):
"""
@@ -472,7 +474,7 @@ class Component(object):
DEPRECATED, left in for compatability for now.
"""
LOGGER.debug("Connecting sync client for {id}".format(id=self.get_id))
#LOGGER.debug("Connecting sync client for {id}".format(id=self.get_id))
self.sync_socket = self.context.socket(self.zmq.REQ)
self.sync_socket.connect(self.addresses['sync_address'])
+8 -4
View File
@@ -89,7 +89,7 @@ class ComponentHost(Component):
"""
Setup the sync socket and poller. ( Bind )
"""
LOGGER.debug("Connecting sync server.")
#LOGGER.debug("Connecting sync server.")
self.sync_socket = self.context.socket(self.zmq.REP)
self.sync_socket.bind(self.addresses['sync_address'])
@@ -100,8 +100,15 @@ class ComponentHost(Component):
self.sockets.append(self.sync_socket)
def open(self):
LOGGER.info('== Roll Call ==\n')
for component in self.components.itervalues():
LOGGER.info(component)
LOGGER.info('== End Roll Call ==\n')
for component in self.components.itervalues():
self.launch_component(component)
self.launch_controller()
def is_running(self):
@@ -151,6 +158,3 @@ class ComponentHost(Component):
def launch_component(self, component):
raise NotImplementedError
def teardown_component(self, component):
raise NotImplementedError
+33 -33
View File
@@ -125,7 +125,7 @@ import zipline.finance.risk as risk
LOGGER = logging.getLogger('ZiplineLogger')
class PerformanceTracker():
class PerformanceTracker(object):
"""
Tracks the performance of the zipline as it is running in
the simulator, relays this out to the Deluge broker and then
@@ -139,7 +139,6 @@ class PerformanceTracker():
def __init__(self, trading_environment):
self.trading_environment = trading_environment
self.trading_day = datetime.timedelta(hours = 6, minutes = 30)
self.calendar_day = datetime.timedelta(hours = 24)
@@ -157,11 +156,13 @@ class PerformanceTracker():
self.returns = []
self.txn_count = 0
self.event_count = 0
self.result_stream = None
self.last_dict = None
self.order_log = []
self.exceeded_max_loss = False
self.results_socket = None
self.results_addr = None
# this performance period will span the entire simulation.
self.cumulative_performance = PerformancePeriod(
# initial positions are empty
@@ -193,19 +194,18 @@ class PerformanceTracker():
def get_portfolio(self):
return self.cumulative_performance.to_ndict()
def publish_to(self, zmq_socket, context=None):
def open(self, context):
sock = context.socket(zmq.PUSH)
sock.connect(self.results_addr)
self.results_socket = sock
def publish_to(self, results_addr):
"""
Publish the performance results asynchronously to a
socket.
"""
if isinstance(zmq_socket, zmq.Socket):
self.result_stream = zmq_socket
else:
ctx = context or zmq.Context.instance()
sock = ctx.socket(zmq.PUSH)
sock.connect(zmq_socket)
self.result_stream = sock
assert isinstance(results_addr, basestring), type(results_addr)
self.results_addr = results_addr
def to_dict(self):
"""
@@ -274,9 +274,9 @@ class PerformanceTracker():
self.progress = self.day_count / self.total_days
# Output results
if self.result_stream:
if self.results_socket:
msg = zp.PERF_FRAME(self.to_dict())
self.result_stream.send(msg)
self.results_socket.send(msg)
#
if self.trading_environment.max_drawdown:
@@ -315,7 +315,7 @@ class PerformanceTracker():
def handle_simulation_end(self):
"""
When the simulation is complete, run the full period risk report
and send it out on the result_stream.
and send it out on the results socket.
"""
log_msg = "Simulated {n} trading days out of {m}."
@@ -334,24 +334,24 @@ class PerformanceTracker():
exceeded_max_loss = self.exceeded_max_loss
)
if self.result_stream:
if self.results_socket:
LOGGER.info("about to stream the risk report...")
risk_dict = self.risk_report.to_dict()
msg = zp.RISK_FRAME(risk_dict)
self.result_stream.send(msg)
self.results_socket.send(msg)
# this signals that the simulation is complete.
self.result_stream.send("DONE")
self.results_socket.send("DONE")
class Position():
class Position(object):
def __init__(self, sid):
self.sid = sid
self.amount = 0
self.cost_basis = 0.0 ##per share
self.sid = sid
self.amount = 0
self.cost_basis = 0.0 ##per share
self.last_sale_price = None
self.last_sale_date = None
self.last_sale_date = None
def update(self, txn):
if(self.sid != txn.sid):
@@ -362,12 +362,12 @@ class Position():
self.cost_basis = 0.0
self.amount = 0
else:
prev_cost = self.cost_basis*self.amount
txn_cost = txn.amount*txn.price
total_cost = prev_cost + txn_cost
total_shares = self.amount + txn.amount
prev_cost = self.cost_basis*self.amount
txn_cost = txn.amount*txn.price
total_cost = prev_cost + txn_cost
total_shares = self.amount + txn.amount
self.cost_basis = total_cost/total_shares
self.amount = self.amount + txn.amount
self.amount = self.amount + txn.amount
def currentValue(self):
return self.amount * self.last_sale_price
@@ -377,10 +377,10 @@ class Position():
template = "sid: {sid}, amount: {amount}, cost_basis: {cost_basis}, \
last_sale_price: {last_sale_price}"
return template.format(
sid=self.sid,
amount=self.amount,
cost_basis=self.cost_basis,
last_sale_price=self.last_sale_price
sid = self.sid,
amount = self.amount,
cost_basis = self.cost_basis,
last_sale_price = self.last_sale_price
)
def to_dict(self):
@@ -396,7 +396,7 @@ class Position():
}
class PerformancePeriod():
class PerformancePeriod(object):
def __init__(
self,
+2 -8
View File
@@ -165,12 +165,9 @@ class SimulatedTrading(object):
#self.add_transform(self.transaction_sim)
self.sim.register_controller( self.con )
self.sim.on_done = self.shutdown()
self.trading_client.set_algorithm(self.algorithm)
@staticmethod
def create_test_zipline(**config):
"""
@@ -322,20 +319,17 @@ class SimulatedTrading(object):
assert n > 0
leased = self.allocator.lease(n)
self.leased_sockets.extend(leased)
return leased
def simulate(self, blocking=False):
self.started = True
self.sim_context = self.sim.simulate()
if blocking:
self.sim_context.join()
def shutdown(self):
pass
#self.allocator.reaquire(*self.leased_sockets)
#--------------------------------
# Component property accessors
#--------------------------------