Merge pull request #53 from quantopian/cheetah

Process simulator
This commit is contained in:
fawce
2012-05-29 07:30:23 -07:00
17 changed files with 423 additions and 113 deletions
+8
View File
@@ -35,6 +35,14 @@ develop-eggs
coverage.xml
nosetests.xml
# C Extensions
*.o
*.so
*.out
# git add -f if needed
*.c
# Vim
*.swp
*.swo
+3 -3
View File
@@ -1,7 +1,7 @@
Zipline
=======
Zipline is a distributed realtime stream processing system.
Zipline is a realtime stream processing system.
System Setup
==============
@@ -16,7 +16,7 @@ Initial `virtualenv` setup::
$ mkvirtualenv zipline
$ workon zipline
# Go get coffee, the following will compile a heap of C/C++ code
$ ./etc/ordered_pip.sh etc/requirements_sci.txt
$ ./etc/ordered_pip.sh etc/requirements_sci.txt
$ ./etc/ordered_pip.sh etc/requirements.txt
# And optionally:
$ ./etc/ordered_pip.sh etc/requirements_dev.txt
@@ -30,7 +30,7 @@ To run tests::
$ nosetests
To build documentation::
$ paver apidocs html
# outputs to docs/_build/html
+8 -3
View File
@@ -1,7 +1,12 @@
#zeromq related
pyzmq==2.1.11
gevent-zeromq==0.2.2
msgpack-python==0.1.12
humanhash==0.0.1
ujson==1.18
iso8601==0.1.4
# ZeroMQ
pyzmq==2.1.11
gevent-zeromq==0.2.2
# Packaging
distribute==0.6.27
setuptools==0.6c11
+10 -9
View File
@@ -1,18 +1,19 @@
#date related
# Date Related
pytz==2011n
python-dateutil==1.5
#core scientific python
numpy==1.6.1
scipy==0.10.0
# Core scientific python
numpy>=1.6.1
pandas>=0.7.0rc1
scipy>=0.10.0
matplotlib==1.1.0
#http://sourceforge.net/projects/matplotlib/files/matplotlib/matplotlib-1.1.0/matplotlib-1.1.0.tar.gz
numexpr==2.0.1
Cython==0.15.1
tables==2.3.1
scikits.statsmodels==0.3.1
pandas==0.7.0rc1
#tables>=2.3.1
#scikits.statsmodels>=0.3.1
#zeromq related
# ZeroMQ
pyzmq==2.1.11
+145 -40
View File
@@ -1,22 +1,56 @@
import os, sys
import time
import os
import re
import platform
import sys
import glob
import time
from distutils.dep_util import newer
#from distutils.extension import Extension
from setuptools.extension import Extension
from paver.easy import options, Bunch, task, needs, path, info
from paver.setuputils import install_distutils_tasks, \
find_packages, find_package_data
from subprocess import call
install_distutils_tasks()
from paver.easy import *
from paver.doctools import *
from paver.setuputils import *
# =========
# Compilers
# =========
from paved import *
from paved.util import *
from paved.pycheck import *
try:
from Cython.Compiler.Main import compile
from Cython.Distutils import build_ext
have_cython = True
except ImportError:
have_cython = False
#add setuputils tasks
paver.setuputils.install_distutils_tasks()
try:
import numpy as np
have_numpy = True
except:
have_numpy = False
operating_system = platform.system()
# ===================
# Release Information
# ===================
PACKAGE = 'zipline'
SRC_PATH = 'zipline'
MAJOR = 0
MINOR = 1
MICRO = 0
DEVELOPMENT = True
if DEVELOPMENT:
VERSION = '%d.%d.%d dev' % (MAJOR, MINOR, MICRO)
else:
VERSION = '%d.%d.%d' % (MAJOR, MINOR, MICRO)
# The PyPi page
DESCRIPTION = open('README.md').read()
EMAIL='dev@quantopian.com'
# ===========
# Setuputils
@@ -35,39 +69,114 @@ def parse_requirements(file_name):
requirements.append(line)
return requirements
version='dev'
install_requires = parse_requirements('./etc/requirements.txt') + parse_requirements('./etc/requirements_sci.txt')
example = Extension(
"zipline/speedups/example", ["zipline/speedups/example.pyx"],
#include_dirs=[np.get_include()],
)
# ============
# Dependencies
# ============
install_requires = (
parse_requirements('./etc/requirements.txt') +
parse_requirements('./etc/requirements_sci.txt')
)
tests_require = install_requires + parse_requirements('./etc/requirements_dev.txt')
# ========
# seutp.py
# ========
if have_numpy and have_cython:
cext = [example]
else:
cext = []
options(
sphinx=Bunch(
sphinx = Bunch(
builddir="_build",
sourcedir=""
),
setup = Bunch(name='zipline',
version = version,
classifiers = [],
packages = find_packages(),
package_data = find_package_data("zipline", package="zipline",
only_in_packages=False),
install_requires = install_requires,
tests_require = tests_require,
test_suite = 'nose.collector',
include_package_data = True,
zip_safe = False,
),
setup = Bunch(
name = PACKAGE,
version = VERSION,
packages = find_packages(),
package_data = find_package_data(
SRC_PATH,
package = PACKAGE,
only_in_packages = False
),
long_description = DESCRIPTION,
install_requires = install_requires,
tests_require = tests_require,
test_suite = 'nose.collector',
include_package_data = True,
zip_safe = False,
classifiers = [
'Development Status :: 2 - Pre-Alpha',
'License :: OSI Approved :: BSD License',
'Natural Language :: English',
'Programming Language :: Python',
'Programming Language :: Python :: 2.7',
'Programming Language :: C',
'Programming Language :: Cython',
'Operating System :: OS Independent',
'Intended Audience :: Science/Research',
'Topic :: Office/Business :: Financial',
'Topic :: Scientific/Engineering :: Information Analysis',
'Topic :: System :: Distributed Computing',
],
ext_modules = cext,
cmdclass = {
'build_ext': build_ext
},
entry_points = {
'console_scripts': [
'zipline = zipline.core.interpreter:main',
]
},
),
)
options.paved.clean.patterns.extend([
#'*.swp', # vim related
#'*.swo', # vim related
'nosetests.xml',
'.coverage',
',coverage',
'*.lprof',
'*.prof',
])
# ============
# C Extensions
# ============
@task
def clean_inplace():
"""
Remove shared objects and C files from the extension
directory.
"""
for fn in glob.glob(os.path.join(SRC_PATH, 'speedups', '*.c')):
p = path(fn)
p.remove()
for fn in glob.glob(os.path.join(SRC_PATH, 'speedups', '*.so')):
p = path(fn)
p.remove()
@task
def build_cython():
for fn in glob.glob(os.path.join(SRC_PATH, 'speedups', '*.pyx')):
p = path(fn)
modname = p.splitext()[0].basename()
dest = p.splitext()[0] + '.c'
if newer(p.abspath(), dest.abspath()):
info('cython %s -o %s'%(p, dest.basename()))
compile(p.abspath(), full_module_name=modname)
@task
@needs(['build_cython', 'setuptools.command.build_ext'])
def build_ext():
pass
# ======
# Tasks
# ======
# Because I'm lazy
stuff_i_want_in_my_debug_shell = [
@@ -75,10 +184,6 @@ stuff_i_want_in_my_debug_shell = [
('zmq', 'zmq', []),
]
# ======
# Tasks
# ======
@task
def coverage():
"""
+1
View File
@@ -46,6 +46,7 @@ class TradeSimulationClient(Component):
def open(self):
self.result_feed = self.connect_result()
self.perf.open(self.context)
def do_work(self):
# poll all the sockets
+14 -9
View File
@@ -10,6 +10,7 @@ import socket
import logging
import traceback
import humanhash
from setproctitle import setproctitle
# pyzmq
import zmq
@@ -27,6 +28,8 @@ LOGGER = logging.getLogger('ZiplineLogger')
from zipline.exceptions import ComponentNoInit
from zipline.transitions import WorkflowMeta
# LOGBOOK - embed PID in log output
class Component(object):
"""
@@ -165,6 +168,8 @@ class Component(object):
self.zmq = zmq
self.context = self.zmq.Context()
self.zmq_poller = self.zmq.Poller
# The the process title so you can watch it in top
setproctitle(self.__class__.__name__)
return
if flavor == 'thread':
self.zmq = zmq
@@ -287,8 +292,6 @@ class Component(object):
Tear down ( fast ) as a mode of failure in the simulation or on
service halt.
Context specific.
"""
raise NotImplementedError
@@ -318,14 +321,16 @@ class Component(object):
self._exception = exc
exc_type, exc_value, exc_traceback = sys.exc_info()
trace = '\n>>>'.join(traceback.format_exception(exc_type, exc_value, exc_traceback))
sys.stdout.write(trace)
exception_frame = CONTROL_FRAME(
CONTROL_PROTOCOL.EXCEPTION,
trace
)
self.control_out.send(exception_frame)
if hasattr(self, 'control_out'):
exception_frame = CONTROL_FRAME(
CONTROL_PROTOCOL.EXCEPTION,
trace
)
self.control_out.send(exception_frame)
LOGGER.exception("Unexpected error in run for {id}.".format(id=self.get_id))
#LOGGER.exception("Unexpected error in run for {id}.".format(id=self.get_id))
def signal_done(self):
"""
@@ -472,7 +477,7 @@ class Component(object):
DEPRECATED, left in for compatability for now.
"""
LOGGER.debug("Connecting sync client for {id}".format(id=self.get_id))
#LOGGER.debug("Connecting sync client for {id}".format(id=self.get_id))
self.sync_socket = self.context.socket(self.zmq.REQ)
self.sync_socket.connect(self.addresses['sync_address'])
+3 -3
View File
@@ -4,7 +4,7 @@ See :py:method""
"""
import threading
from zipline.core import ComponentHost
from zipline.core.simulatorref import SimulatorBase
class AddressAllocator(object):
"""
@@ -28,13 +28,13 @@ class AddressAllocator(object):
pass
class Simulator(ComponentHost):
class Simulator(SimulatorBase):
zmq_flavor = 'thread'
def __init__(self, addresses):
# TODO: rethink this
ComponentHost.__init__(self, addresses)
SimulatorBase.__init__(self, addresses)
self.subthreads = []
self.running = False
+8 -4
View File
@@ -89,7 +89,7 @@ class ComponentHost(Component):
"""
Setup the sync socket and poller. ( Bind )
"""
LOGGER.debug("Connecting sync server.")
#LOGGER.debug("Connecting sync server.")
self.sync_socket = self.context.socket(self.zmq.REP)
self.sync_socket.bind(self.addresses['sync_address'])
@@ -100,8 +100,15 @@ class ComponentHost(Component):
self.sockets.append(self.sync_socket)
def open(self):
LOGGER.info('== Roll Call ==\n')
for component in self.components.itervalues():
LOGGER.info(component)
LOGGER.info('== End Roll Call ==\n')
for component in self.components.itervalues():
self.launch_component(component)
self.launch_controller()
def is_running(self):
@@ -151,6 +158,3 @@ class ComponentHost(Component):
def launch_component(self, component):
raise NotImplementedError
def teardown_component(self, component):
raise NotImplementedError
+82
View File
@@ -0,0 +1,82 @@
import sys
import yaml
import argparse
import fileinput
from cStringIO import StringIO
from zipline.utils.date_utils import EPOCH, date_to_datetime
def interpret(args):
print 'Reading {ifile}'.format(ifile=args.file)
metastart = False
metadone = False
metadata = StringIO()
algorithm = StringIO()
for line in fileinput.input(args.file):
if line.startswith('---'):
if metastart:
metastart = False
metadone = False
else:
metastart = True
metadone = False
metadata.write(line)
elif metastart:
metadata.write(line)
else:
algorithm.write(line)
#print 'Metadata:'
#print metadata.getvalue()
#print 'Algorithm:'
#print algorithm.getvalue()
try:
meta = yaml.load_all(metadata.getvalue())
except yaml.error.YAMLError, e:
print e
sys.exit(0)
try:
meta = meta.next()
except StopIteration:
raise RuntimeError("No metadata in file.")
algocode = algorithm.getvalue()
start = meta['start_date']
end = meta['end_date']
meta['start_date'] = EPOCH(date_to_datetime(start))
meta['end_date'] = EPOCH(date_to_datetime(end))
meta['algocode'] = algocode
print end - start
ns = {}
# -- Sanity check --
exec(algocode) in ns
assert ns['initialize']
assert ns['get_sid_filter']
assert ns['handle_data']
return algocode, meta
def main():
parser = argparse.ArgumentParser()
parser.add_argument('file', metavar='file', help='Algorithm file.')
args = parser.parse_args()
if not args.file:
print parser.print_help()
sys.exit(0)
interpret(args)
if __name__ == '__main__':
main()
+94
View File
@@ -0,0 +1,94 @@
"""
The reference simulator for all of Quantopian infastructure.
If a subclass does not conform to the API it will fail at
compiletime.
Subclasses:
- (partial) zipline.devsimulator.Simulator
- ( full ) qexec.executor.simulator.ProcessSimulator
- ( full ) qexec.executor.simulator.ThreadSimulator
- ( full ) qexec.executor.simulator.GreenletSimulator
"""
import abc
from zipline.core.host import ComponentHost
class SimulatorBase(ComponentHost):
__metaclass__ = abc.ABCMeta
def __init__(self, addresses):
"""
Initailizes the simulator.
"""
ComponentHost.__init__(self, addresses)
@abc.abstractproperty
def get_id(self):
"""Human readable name of the simulator."""
return "Reference Simulator"
@abc.abstractmethod
def launch_component(self, component):
""" Launch an indvidiaul component in the simulation. """
raise NotImplementedError
@abc.abstractmethod
def launch_controller(self):
""" Launch the controller for the simulation. """
raise NotImplementedError
@abc.abstractmethod
def simulate(self):
""" Run a simulation. """
raise NotImplementedError
@abc.abstractmethod
def shutdown(self):
""" Normal shutdown procedure. """
raise NotImplementedError
def cancel(self):
""" Soft shutdown """
self.controller.shutdown(soft=True)
def kill(self):
""" Hard shutdown """
self.controller.shutdown(hard=True)
# Extension Methods
# -----------------
# Provided by some simulators, those that do not will degrade
# gracefully.
# - ``did_clean_shutdown``
# - ``point_of_failure``
# - ``launch_debugger``
def did_clean_shutdown(self):
"""
Returns True if all the subcomponents in the simulation yielded
cleanly.
"""
return False
def point_of_failure(self):
""" Returns the point of failure of the code. """
failures = [
c for c in self._components.values()
if c.exception
]
# Sort by failure time so we can follow the failure
# through the system.
return sorted(failures, key=lambda c: c.fail_time)
def launch_debugger(self):
"""
Launches a remote debug shell in the context of the failed component.
"""
pass
+36 -33
View File
@@ -123,7 +123,7 @@ import zipline.finance.risk as risk
LOGGER = logging.getLogger('ZiplineLogger')
class PerformanceTracker():
class PerformanceTracker(object):
"""
Tracks the performance of the zipline as it is running in
the simulator, relays this out to the Deluge broker and then
@@ -137,7 +137,6 @@ class PerformanceTracker():
def __init__(self, trading_environment):
self.trading_environment = trading_environment
self.trading_day = datetime.timedelta(hours = 6, minutes = 30)
self.calendar_day = datetime.timedelta(hours = 24)
@@ -155,11 +154,13 @@ class PerformanceTracker():
self.returns = []
self.txn_count = 0
self.event_count = 0
self.result_stream = None
self.last_dict = None
self.order_log = []
self.exceeded_max_loss = False
self.results_socket = None
self.results_addr = None
# this performance period will span the entire simulation.
self.cumulative_performance = PerformancePeriod(
# initial positions are empty
@@ -191,19 +192,21 @@ class PerformanceTracker():
def get_portfolio(self):
return self.cumulative_performance.to_ndict()
def publish_to(self, zmq_socket, context=None):
def open(self, context):
if self.results_addr:
sock = context.socket(zmq.PUSH)
sock.connect(self.results_addr)
self.results_socket = sock
else:
LOGGER.warn("Not streaming results because no results socket given")
def publish_to(self, results_addr):
"""
Publish the performance results asynchronously to a
socket.
"""
if isinstance(zmq_socket, zmq.Socket):
self.result_stream = zmq_socket
else:
ctx = context or zmq.Context.instance()
sock = ctx.socket(zmq.PUSH)
sock.connect(zmq_socket)
self.result_stream = sock
assert isinstance(results_addr, basestring), type(results_addr)
self.results_addr = results_addr
def to_dict(self):
"""
@@ -272,9 +275,9 @@ class PerformanceTracker():
self.progress = self.day_count / self.total_days
# Output results
if self.result_stream:
if self.results_socket:
msg = zp.PERF_FRAME(self.to_dict())
self.result_stream.send(msg)
self.results_socket.send(msg)
#
if self.trading_environment.max_drawdown:
@@ -313,7 +316,7 @@ class PerformanceTracker():
def handle_simulation_end(self):
"""
When the simulation is complete, run the full period risk report
and send it out on the result_stream.
and send it out on the results socket.
"""
log_msg = "Simulated {n} trading days out of {m}."
@@ -332,24 +335,24 @@ class PerformanceTracker():
exceeded_max_loss = self.exceeded_max_loss
)
if self.result_stream:
if self.results_socket:
LOGGER.info("about to stream the risk report...")
risk_dict = self.risk_report.to_dict()
msg = zp.RISK_FRAME(risk_dict)
self.result_stream.send(msg)
self.results_socket.send(msg)
# this signals that the simulation is complete.
self.result_stream.send("DONE")
self.results_socket.send("DONE")
class Position():
class Position(object):
def __init__(self, sid):
self.sid = sid
self.amount = 0
self.cost_basis = 0.0 ##per share
self.sid = sid
self.amount = 0
self.cost_basis = 0.0 ##per share
self.last_sale_price = None
self.last_sale_date = None
self.last_sale_date = None
def update(self, txn):
if(self.sid != txn.sid):
@@ -360,12 +363,12 @@ class Position():
self.cost_basis = 0.0
self.amount = 0
else:
prev_cost = self.cost_basis*self.amount
txn_cost = txn.amount*txn.price
total_cost = prev_cost + txn_cost
total_shares = self.amount + txn.amount
prev_cost = self.cost_basis*self.amount
txn_cost = txn.amount*txn.price
total_cost = prev_cost + txn_cost
total_shares = self.amount + txn.amount
self.cost_basis = total_cost/total_shares
self.amount = self.amount + txn.amount
self.amount = self.amount + txn.amount
def currentValue(self):
return self.amount * self.last_sale_price
@@ -375,10 +378,10 @@ class Position():
template = "sid: {sid}, amount: {amount}, cost_basis: {cost_basis}, \
last_sale_price: {last_sale_price}"
return template.format(
sid=self.sid,
amount=self.amount,
cost_basis=self.cost_basis,
last_sale_price=self.last_sale_price
sid = self.sid,
amount = self.amount,
cost_basis = self.cost_basis,
last_sale_price = self.last_sale_price
)
def to_dict(self):
@@ -394,7 +397,7 @@ class Position():
}
class PerformancePeriod():
class PerformancePeriod(object):
def __init__(
self,
+2 -8
View File
@@ -165,12 +165,9 @@ class SimulatedTrading(object):
#self.add_transform(self.transaction_sim)
self.sim.register_controller( self.con )
self.sim.on_done = self.shutdown()
self.trading_client.set_algorithm(self.algorithm)
@staticmethod
def create_test_zipline(**config):
"""
@@ -322,20 +319,17 @@ class SimulatedTrading(object):
assert n > 0
leased = self.allocator.lease(n)
self.leased_sockets.extend(leased)
return leased
def simulate(self, blocking=False):
self.started = True
self.sim_context = self.sim.simulate()
if blocking:
self.sim_context.join()
def shutdown(self):
pass
#self.allocator.reaquire(*self.leased_sockets)
#--------------------------------
# Component property accessors
#--------------------------------
View File
+3
View File
@@ -0,0 +1,3 @@
from libc.stdio cimport printf
printf("Hello World!")
+5
View File
@@ -127,3 +127,8 @@ if __name__ == '__main__':
for day in trading_days(now, now30):
print day
print time.time() - tic
def date_to_datetime(t):
dt = datetime.fromordinal(t.toordinal())
dt = dt.replace(tzinfo = pytz.utc)
return dt
+1 -1
View File
@@ -141,7 +141,7 @@ class ndict(MutableMapping):
self.__internal.update(other_nd.__internal)
def __repr__(self):
return "namedict: " + str(self.__internal)
return "ndict(%s)" % str(self.__internal)
# Faster dictionary comparison?
#def __eq__(self, other):