diff --git a/.gitignore b/.gitignore index 38739d4f..43e75541 100644 --- a/.gitignore +++ b/.gitignore @@ -35,6 +35,14 @@ develop-eggs coverage.xml nosetests.xml +# C Extensions +*.o +*.so +*.out +# git add -f if needed +*.c + +# Vim *.swp *.swo diff --git a/README.md b/README.md index 88c1bec3..ae9fb91b 100644 --- a/README.md +++ b/README.md @@ -1,7 +1,7 @@ Zipline ======= -Zipline is a distributed realtime stream processing system. +Zipline is a realtime stream processing system. System Setup ============== @@ -16,7 +16,7 @@ Initial `virtualenv` setup:: $ mkvirtualenv zipline $ workon zipline # Go get coffee, the following will compile a heap of C/C++ code - $ ./etc/ordered_pip.sh etc/requirements_sci.txt + $ ./etc/ordered_pip.sh etc/requirements_sci.txt $ ./etc/ordered_pip.sh etc/requirements.txt # And optionally: $ ./etc/ordered_pip.sh etc/requirements_dev.txt @@ -30,7 +30,7 @@ To run tests:: $ nosetests To build documentation:: - + $ paver apidocs html # outputs to docs/_build/html diff --git a/etc/requirements.txt b/etc/requirements.txt index c7090540..caa0d0c6 100644 --- a/etc/requirements.txt +++ b/etc/requirements.txt @@ -1,7 +1,12 @@ -#zeromq related -pyzmq==2.1.11 -gevent-zeromq==0.2.2 msgpack-python==0.1.12 humanhash==0.0.1 ujson==1.18 iso8601==0.1.4 + +# ZeroMQ +pyzmq==2.1.11 +gevent-zeromq==0.2.2 + +# Packaging +distribute==0.6.27 +setuptools==0.6c11 diff --git a/etc/requirements_sci.txt b/etc/requirements_sci.txt index 4ed7d007..b74b3e57 100644 --- a/etc/requirements_sci.txt +++ b/etc/requirements_sci.txt @@ -1,18 +1,19 @@ -#date related +# Date Related pytz==2011n python-dateutil==1.5 -#core scientific python -numpy==1.6.1 -scipy==0.10.0 +# Core scientific python +numpy>=1.6.1 +pandas>=0.7.0rc1 +scipy>=0.10.0 + matplotlib==1.1.0 #http://sourceforge.net/projects/matplotlib/files/matplotlib/matplotlib-1.1.0/matplotlib-1.1.0.tar.gz + numexpr==2.0.1 Cython==0.15.1 -tables==2.3.1 -scikits.statsmodels==0.3.1 -pandas==0.7.0rc1 +#tables>=2.3.1 +#scikits.statsmodels>=0.3.1 -#zeromq related +# ZeroMQ pyzmq==2.1.11 - diff --git a/pavement.py b/pavement.py index 0e89f226..d4b4bd7a 100644 --- a/pavement.py +++ b/pavement.py @@ -1,22 +1,56 @@ -import os, sys -import time +import os import re -import platform +import sys +import glob +import time +from distutils.dep_util import newer +#from distutils.extension import Extension +from setuptools.extension import Extension + +from paver.easy import options, Bunch, task, needs, path, info +from paver.setuputils import install_distutils_tasks, \ + find_packages, find_package_data from subprocess import call +install_distutils_tasks() -from paver.easy import * -from paver.doctools import * -from paver.setuputils import * +# ========= +# Compilers +# ========= -from paved import * -from paved.util import * -from paved.pycheck import * +try: + from Cython.Compiler.Main import compile + from Cython.Distutils import build_ext + have_cython = True +except ImportError: + have_cython = False -#add setuputils tasks -paver.setuputils.install_distutils_tasks() +try: + import numpy as np + have_numpy = True +except: + have_numpy = False -operating_system = platform.system() +# =================== +# Release Information +# =================== + +PACKAGE = 'zipline' +SRC_PATH = 'zipline' + +MAJOR = 0 +MINOR = 1 +MICRO = 0 +DEVELOPMENT = True + +if DEVELOPMENT: + VERSION = '%d.%d.%d dev' % (MAJOR, MINOR, MICRO) +else: + VERSION = '%d.%d.%d' % (MAJOR, MINOR, MICRO) + +# The PyPi page +DESCRIPTION = open('README.md').read() +EMAIL='dev@quantopian.com' # =========== # Setuputils @@ -35,39 +69,114 @@ def parse_requirements(file_name): requirements.append(line) return requirements -version='dev' -install_requires = parse_requirements('./etc/requirements.txt') + parse_requirements('./etc/requirements_sci.txt') +example = Extension( + "zipline/speedups/example", ["zipline/speedups/example.pyx"], + #include_dirs=[np.get_include()], +) + +# ============ +# Dependencies +# ============ + +install_requires = ( + parse_requirements('./etc/requirements.txt') + + parse_requirements('./etc/requirements_sci.txt') +) tests_require = install_requires + parse_requirements('./etc/requirements_dev.txt') +# ======== +# seutp.py +# ======== + +if have_numpy and have_cython: + cext = [example] +else: + cext = [] options( - sphinx=Bunch( + sphinx = Bunch( builddir="_build", sourcedir="" ), - setup = Bunch(name='zipline', - version = version, - classifiers = [], - packages = find_packages(), - package_data = find_package_data("zipline", package="zipline", - only_in_packages=False), - install_requires = install_requires, - tests_require = tests_require, - test_suite = 'nose.collector', - include_package_data = True, - zip_safe = False, - ), + setup = Bunch( + name = PACKAGE, + version = VERSION, + packages = find_packages(), + package_data = find_package_data( + SRC_PATH, + package = PACKAGE, + only_in_packages = False + ), + long_description = DESCRIPTION, + install_requires = install_requires, + tests_require = tests_require, + test_suite = 'nose.collector', + include_package_data = True, + zip_safe = False, + classifiers = [ + 'Development Status :: 2 - Pre-Alpha', + 'License :: OSI Approved :: BSD License', + 'Natural Language :: English', + 'Programming Language :: Python', + 'Programming Language :: Python :: 2.7', + 'Programming Language :: C', + 'Programming Language :: Cython', + 'Operating System :: OS Independent', + 'Intended Audience :: Science/Research', + 'Topic :: Office/Business :: Financial', + 'Topic :: Scientific/Engineering :: Information Analysis', + 'Topic :: System :: Distributed Computing', + ], + ext_modules = cext, + cmdclass = { + 'build_ext': build_ext + }, + entry_points = { + 'console_scripts': [ + 'zipline = zipline.core.interpreter:main', + ] + }, + ), ) -options.paved.clean.patterns.extend([ - #'*.swp', # vim related - #'*.swo', # vim related - 'nosetests.xml', - '.coverage', - ',coverage', - '*.lprof', - '*.prof', -]) +# ============ +# C Extensions +# ============ + +@task +def clean_inplace(): + """ + Remove shared objects and C files from the extension + directory. + """ + for fn in glob.glob(os.path.join(SRC_PATH, 'speedups', '*.c')): + p = path(fn) + p.remove() + + for fn in glob.glob(os.path.join(SRC_PATH, 'speedups', '*.so')): + p = path(fn) + p.remove() + +@task +def build_cython(): + for fn in glob.glob(os.path.join(SRC_PATH, 'speedups', '*.pyx')): + p = path(fn) + + modname = p.splitext()[0].basename() + dest = p.splitext()[0] + '.c' + + if newer(p.abspath(), dest.abspath()): + info('cython %s -o %s'%(p, dest.basename())) + compile(p.abspath(), full_module_name=modname) + +@task +@needs(['build_cython', 'setuptools.command.build_ext']) +def build_ext(): + pass + +# ====== +# Tasks +# ====== # Because I'm lazy stuff_i_want_in_my_debug_shell = [ @@ -75,10 +184,6 @@ stuff_i_want_in_my_debug_shell = [ ('zmq', 'zmq', []), ] -# ====== -# Tasks -# ====== - @task def coverage(): """ diff --git a/zipline/components/tradesimulation.py b/zipline/components/tradesimulation.py index 827a8d00..79312002 100644 --- a/zipline/components/tradesimulation.py +++ b/zipline/components/tradesimulation.py @@ -46,6 +46,7 @@ class TradeSimulationClient(Component): def open(self): self.result_feed = self.connect_result() + self.perf.open(self.context) def do_work(self): # poll all the sockets diff --git a/zipline/core/component.py b/zipline/core/component.py index 023018d0..d5b14d96 100644 --- a/zipline/core/component.py +++ b/zipline/core/component.py @@ -10,6 +10,7 @@ import socket import logging import traceback import humanhash +from setproctitle import setproctitle # pyzmq import zmq @@ -27,6 +28,8 @@ LOGGER = logging.getLogger('ZiplineLogger') from zipline.exceptions import ComponentNoInit from zipline.transitions import WorkflowMeta +# LOGBOOK - embed PID in log output + class Component(object): """ @@ -165,6 +168,8 @@ class Component(object): self.zmq = zmq self.context = self.zmq.Context() self.zmq_poller = self.zmq.Poller + # The the process title so you can watch it in top + setproctitle(self.__class__.__name__) return if flavor == 'thread': self.zmq = zmq @@ -287,8 +292,6 @@ class Component(object): Tear down ( fast ) as a mode of failure in the simulation or on service halt. - - Context specific. """ raise NotImplementedError @@ -318,14 +321,16 @@ class Component(object): self._exception = exc exc_type, exc_value, exc_traceback = sys.exc_info() trace = '\n>>>'.join(traceback.format_exception(exc_type, exc_value, exc_traceback)) + sys.stdout.write(trace) - exception_frame = CONTROL_FRAME( - CONTROL_PROTOCOL.EXCEPTION, - trace - ) - self.control_out.send(exception_frame) + if hasattr(self, 'control_out'): + exception_frame = CONTROL_FRAME( + CONTROL_PROTOCOL.EXCEPTION, + trace + ) + self.control_out.send(exception_frame) - LOGGER.exception("Unexpected error in run for {id}.".format(id=self.get_id)) + #LOGGER.exception("Unexpected error in run for {id}.".format(id=self.get_id)) def signal_done(self): """ @@ -472,7 +477,7 @@ class Component(object): DEPRECATED, left in for compatability for now. """ - LOGGER.debug("Connecting sync client for {id}".format(id=self.get_id)) + #LOGGER.debug("Connecting sync client for {id}".format(id=self.get_id)) self.sync_socket = self.context.socket(self.zmq.REQ) self.sync_socket.connect(self.addresses['sync_address']) diff --git a/zipline/core/devsimulator.py b/zipline/core/devsimulator.py index b1aee965..85ddc44e 100644 --- a/zipline/core/devsimulator.py +++ b/zipline/core/devsimulator.py @@ -4,7 +4,7 @@ See :py:method"" """ import threading -from zipline.core import ComponentHost +from zipline.core.simulatorref import SimulatorBase class AddressAllocator(object): """ @@ -28,13 +28,13 @@ class AddressAllocator(object): pass -class Simulator(ComponentHost): +class Simulator(SimulatorBase): zmq_flavor = 'thread' def __init__(self, addresses): # TODO: rethink this - ComponentHost.__init__(self, addresses) + SimulatorBase.__init__(self, addresses) self.subthreads = [] self.running = False diff --git a/zipline/core/host.py b/zipline/core/host.py index 052e2d5c..16126465 100644 --- a/zipline/core/host.py +++ b/zipline/core/host.py @@ -89,7 +89,7 @@ class ComponentHost(Component): """ Setup the sync socket and poller. ( Bind ) """ - LOGGER.debug("Connecting sync server.") + #LOGGER.debug("Connecting sync server.") self.sync_socket = self.context.socket(self.zmq.REP) self.sync_socket.bind(self.addresses['sync_address']) @@ -100,8 +100,15 @@ class ComponentHost(Component): self.sockets.append(self.sync_socket) def open(self): + LOGGER.info('== Roll Call ==\n') + for component in self.components.itervalues(): + LOGGER.info(component) + + LOGGER.info('== End Roll Call ==\n') + for component in self.components.itervalues(): self.launch_component(component) + self.launch_controller() def is_running(self): @@ -151,6 +158,3 @@ class ComponentHost(Component): def launch_component(self, component): raise NotImplementedError - - def teardown_component(self, component): - raise NotImplementedError diff --git a/zipline/core/interpreter.py b/zipline/core/interpreter.py new file mode 100644 index 00000000..6bcd4eed --- /dev/null +++ b/zipline/core/interpreter.py @@ -0,0 +1,82 @@ +import sys +import yaml +import argparse +import fileinput +from cStringIO import StringIO +from zipline.utils.date_utils import EPOCH, date_to_datetime + +def interpret(args): + print 'Reading {ifile}'.format(ifile=args.file) + + metastart = False + metadone = False + + metadata = StringIO() + algorithm = StringIO() + + for line in fileinput.input(args.file): + if line.startswith('---'): + if metastart: + metastart = False + metadone = False + else: + metastart = True + metadone = False + metadata.write(line) + + elif metastart: + metadata.write(line) + else: + algorithm.write(line) + + #print 'Metadata:' + #print metadata.getvalue() + + #print 'Algorithm:' + #print algorithm.getvalue() + + try: + meta = yaml.load_all(metadata.getvalue()) + except yaml.error.YAMLError, e: + print e + sys.exit(0) + + try: + meta = meta.next() + except StopIteration: + raise RuntimeError("No metadata in file.") + + algocode = algorithm.getvalue() + + start = meta['start_date'] + end = meta['end_date'] + + meta['start_date'] = EPOCH(date_to_datetime(start)) + meta['end_date'] = EPOCH(date_to_datetime(end)) + meta['algocode'] = algocode + + print end - start + + ns = {} + + # -- Sanity check -- + exec(algocode) in ns + + assert ns['initialize'] + assert ns['get_sid_filter'] + assert ns['handle_data'] + + return algocode, meta + +def main(): + parser = argparse.ArgumentParser() + parser.add_argument('file', metavar='file', help='Algorithm file.') + args = parser.parse_args() + + if not args.file: + print parser.print_help() + sys.exit(0) + interpret(args) + +if __name__ == '__main__': + main() diff --git a/zipline/core/simulatorref.py b/zipline/core/simulatorref.py new file mode 100644 index 00000000..abe87379 --- /dev/null +++ b/zipline/core/simulatorref.py @@ -0,0 +1,94 @@ +""" + +The reference simulator for all of Quantopian infastructure. + +If a subclass does not conform to the API it will fail at +compiletime. + +Subclasses: + + - (partial) zipline.devsimulator.Simulator + - ( full ) qexec.executor.simulator.ProcessSimulator + - ( full ) qexec.executor.simulator.ThreadSimulator + - ( full ) qexec.executor.simulator.GreenletSimulator + +""" + +import abc +from zipline.core.host import ComponentHost + +class SimulatorBase(ComponentHost): + + __metaclass__ = abc.ABCMeta + + def __init__(self, addresses): + """ + Initailizes the simulator. + """ + ComponentHost.__init__(self, addresses) + + @abc.abstractproperty + def get_id(self): + """Human readable name of the simulator.""" + return "Reference Simulator" + + @abc.abstractmethod + def launch_component(self, component): + """ Launch an indvidiaul component in the simulation. """ + raise NotImplementedError + + @abc.abstractmethod + def launch_controller(self): + """ Launch the controller for the simulation. """ + raise NotImplementedError + + @abc.abstractmethod + def simulate(self): + """ Run a simulation. """ + raise NotImplementedError + + @abc.abstractmethod + def shutdown(self): + """ Normal shutdown procedure. """ + raise NotImplementedError + + def cancel(self): + """ Soft shutdown """ + self.controller.shutdown(soft=True) + + def kill(self): + """ Hard shutdown """ + self.controller.shutdown(hard=True) + + # Extension Methods + # ----------------- + # Provided by some simulators, those that do not will degrade + # gracefully. + + # - ``did_clean_shutdown`` + # - ``point_of_failure`` + # - ``launch_debugger`` + + def did_clean_shutdown(self): + """ + Returns True if all the subcomponents in the simulation yielded + cleanly. + """ + return False + + def point_of_failure(self): + """ Returns the point of failure of the code. """ + failures = [ + c for c in self._components.values() + if c.exception + ] + + # Sort by failure time so we can follow the failure + # through the system. + return sorted(failures, key=lambda c: c.fail_time) + + def launch_debugger(self): + """ + Launches a remote debug shell in the context of the failed component. + """ + pass diff --git a/zipline/finance/performance.py b/zipline/finance/performance.py index 76231d2e..5a2aeaef 100644 --- a/zipline/finance/performance.py +++ b/zipline/finance/performance.py @@ -123,7 +123,7 @@ import zipline.finance.risk as risk LOGGER = logging.getLogger('ZiplineLogger') -class PerformanceTracker(): +class PerformanceTracker(object): """ Tracks the performance of the zipline as it is running in the simulator, relays this out to the Deluge broker and then @@ -137,7 +137,6 @@ class PerformanceTracker(): def __init__(self, trading_environment): - self.trading_environment = trading_environment self.trading_day = datetime.timedelta(hours = 6, minutes = 30) self.calendar_day = datetime.timedelta(hours = 24) @@ -155,11 +154,13 @@ class PerformanceTracker(): self.returns = [] self.txn_count = 0 self.event_count = 0 - self.result_stream = None self.last_dict = None self.order_log = [] self.exceeded_max_loss = False + self.results_socket = None + self.results_addr = None + # this performance period will span the entire simulation. self.cumulative_performance = PerformancePeriod( # initial positions are empty @@ -191,19 +192,21 @@ class PerformanceTracker(): def get_portfolio(self): return self.cumulative_performance.to_ndict() - def publish_to(self, zmq_socket, context=None): + def open(self, context): + if self.results_addr: + sock = context.socket(zmq.PUSH) + sock.connect(self.results_addr) + self.results_socket = sock + else: + LOGGER.warn("Not streaming results because no results socket given") + + def publish_to(self, results_addr): """ Publish the performance results asynchronously to a socket. """ - if isinstance(zmq_socket, zmq.Socket): - self.result_stream = zmq_socket - else: - ctx = context or zmq.Context.instance() - sock = ctx.socket(zmq.PUSH) - sock.connect(zmq_socket) - - self.result_stream = sock + assert isinstance(results_addr, basestring), type(results_addr) + self.results_addr = results_addr def to_dict(self): """ @@ -272,9 +275,9 @@ class PerformanceTracker(): self.progress = self.day_count / self.total_days # Output results - if self.result_stream: + if self.results_socket: msg = zp.PERF_FRAME(self.to_dict()) - self.result_stream.send(msg) + self.results_socket.send(msg) # if self.trading_environment.max_drawdown: @@ -313,7 +316,7 @@ class PerformanceTracker(): def handle_simulation_end(self): """ When the simulation is complete, run the full period risk report - and send it out on the result_stream. + and send it out on the results socket. """ log_msg = "Simulated {n} trading days out of {m}." @@ -332,24 +335,24 @@ class PerformanceTracker(): exceeded_max_loss = self.exceeded_max_loss ) - if self.result_stream: + if self.results_socket: LOGGER.info("about to stream the risk report...") risk_dict = self.risk_report.to_dict() msg = zp.RISK_FRAME(risk_dict) - self.result_stream.send(msg) + self.results_socket.send(msg) # this signals that the simulation is complete. - self.result_stream.send("DONE") + self.results_socket.send("DONE") -class Position(): +class Position(object): def __init__(self, sid): - self.sid = sid - self.amount = 0 - self.cost_basis = 0.0 ##per share + self.sid = sid + self.amount = 0 + self.cost_basis = 0.0 ##per share self.last_sale_price = None - self.last_sale_date = None + self.last_sale_date = None def update(self, txn): if(self.sid != txn.sid): @@ -360,12 +363,12 @@ class Position(): self.cost_basis = 0.0 self.amount = 0 else: - prev_cost = self.cost_basis*self.amount - txn_cost = txn.amount*txn.price - total_cost = prev_cost + txn_cost - total_shares = self.amount + txn.amount + prev_cost = self.cost_basis*self.amount + txn_cost = txn.amount*txn.price + total_cost = prev_cost + txn_cost + total_shares = self.amount + txn.amount self.cost_basis = total_cost/total_shares - self.amount = self.amount + txn.amount + self.amount = self.amount + txn.amount def currentValue(self): return self.amount * self.last_sale_price @@ -375,10 +378,10 @@ class Position(): template = "sid: {sid}, amount: {amount}, cost_basis: {cost_basis}, \ last_sale_price: {last_sale_price}" return template.format( - sid=self.sid, - amount=self.amount, - cost_basis=self.cost_basis, - last_sale_price=self.last_sale_price + sid = self.sid, + amount = self.amount, + cost_basis = self.cost_basis, + last_sale_price = self.last_sale_price ) def to_dict(self): @@ -394,7 +397,7 @@ class Position(): } -class PerformancePeriod(): +class PerformancePeriod(object): def __init__( self, diff --git a/zipline/lines.py b/zipline/lines.py index 0fbd5dc4..070931c5 100644 --- a/zipline/lines.py +++ b/zipline/lines.py @@ -165,12 +165,9 @@ class SimulatedTrading(object): #self.add_transform(self.transaction_sim) self.sim.register_controller( self.con ) - self.sim.on_done = self.shutdown() - self.trading_client.set_algorithm(self.algorithm) - @staticmethod def create_test_zipline(**config): """ @@ -322,20 +319,17 @@ class SimulatedTrading(object): assert n > 0 leased = self.allocator.lease(n) - self.leased_sockets.extend(leased) + return leased def simulate(self, blocking=False): self.started = True self.sim_context = self.sim.simulate() + if blocking: self.sim_context.join() - def shutdown(self): - pass - #self.allocator.reaquire(*self.leased_sockets) - #-------------------------------- # Component property accessors #-------------------------------- diff --git a/zipline/speedups/__init__.py b/zipline/speedups/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/zipline/speedups/example.pyx b/zipline/speedups/example.pyx new file mode 100644 index 00000000..befd4ae7 --- /dev/null +++ b/zipline/speedups/example.pyx @@ -0,0 +1,3 @@ +from libc.stdio cimport printf + +printf("Hello World!") diff --git a/zipline/utils/date_utils.py b/zipline/utils/date_utils.py index 131dd61b..b80ee867 100644 --- a/zipline/utils/date_utils.py +++ b/zipline/utils/date_utils.py @@ -127,3 +127,8 @@ if __name__ == '__main__': for day in trading_days(now, now30): print day print time.time() - tic + +def date_to_datetime(t): + dt = datetime.fromordinal(t.toordinal()) + dt = dt.replace(tzinfo = pytz.utc) + return dt diff --git a/zipline/utils/protocol_utils.py b/zipline/utils/protocol_utils.py index c0b7ad16..7149a7c1 100644 --- a/zipline/utils/protocol_utils.py +++ b/zipline/utils/protocol_utils.py @@ -141,7 +141,7 @@ class ndict(MutableMapping): self.__internal.update(other_nd.__internal) def __repr__(self): - return "namedict: " + str(self.__internal) + return "ndict(%s)" % str(self.__internal) # Faster dictionary comparison? #def __eq__(self, other):