mirror of
https://github.com/wassname/catalyst.git
synced 2026-06-28 15:55:02 +08:00
Remove old component.py
This commit is contained in:
@@ -1,563 +0,0 @@
|
||||
"""
|
||||
Commonly used messaging components.
|
||||
|
||||
Contains the base class for all components.
|
||||
"""
|
||||
|
||||
import os
|
||||
import sys
|
||||
import uuid
|
||||
import time
|
||||
import socket
|
||||
import gevent
|
||||
import logging
|
||||
import traceback
|
||||
import humanhash
|
||||
|
||||
# pyzmq
|
||||
import zmq
|
||||
# gevent_zeromq
|
||||
import gevent_zeromq
|
||||
# zmq_ctypes
|
||||
#import zmq_ctypes
|
||||
|
||||
from datetime import datetime
|
||||
|
||||
from utils.gpoll import _Poller as GeventPoller
|
||||
from zipline.protocol import CONTROL_PROTOCOL, COMPONENT_STATE, \
|
||||
COMPONENT_FAILURE, BACKTEST_STATE, CONTROL_FRAME
|
||||
|
||||
# Module-level logger; every component in this file logs through the
# shared 'ZiplineLogger' channel.
LOGGER = logging.getLogger('ZiplineLogger')
|
||||
|
||||
class Component(object):
    """
    Base class for components. Defines the base messaging
    interface for components.

    ``self.addresses`` is a dict of name_string -> zmq address strings.
    The bind/connect helpers read the following entries:

    :param sync_address: socket address used for synchronizing the start of
                         all workers, heartbeating, and exit notification
                         will be used in REP/REQ sockets. Bind is always on
                         the REP side.

    :param data_address: socket address used for data sources to stream
                         their records. Will be used in PUSH/PULL sockets
                         between data sources and a Feed. Bind will always
                         be on the PULL side (we always have N producers and
                         1 consumer)

    :param feed_address: socket address used to publish consolidated feed
                         from serialization of data sources
                         will be used in PUB/SUB sockets between Feed and
                         Transforms. Bind is always on the PUB side.

    :param merge_address: socket address used to publish transformed
                          values. Will be used in PUSH/PULL from many
                          transforms to one Merge. Bind will always be on
                          the PULL side (we always have N producers and
                          1 consumer)

    :param result_address: socket address used to publish merged data
                           source feed and transforms to clients. Will be
                           used in PUB/SUB from one Merge to one or many
                           clients. Bind is always on the PUB side.

    bind/connect methods will return the correct socket type for each
    address.

    Subclasses must provide ``open``, ``do_work``, ``kill``, ``get_id`` and
    ``get_type``; ``_run`` additionally reads ``self.zmq_flavor``, which is
    not assigned anywhere in this class.
    """

    def __init__(self):
        self.zmq = None
        self.context = None
        self.addresses = None

        self.out_socket = None
        self.killed = False
        self.controller = None
        # Control channel endpoints. They stay None unless setup_control()
        # runs with a controller attached, so the signal_* methods can
        # check them instead of failing with AttributeError.
        self.control_in = None
        self.control_out = None
        # timeout after a full minute (milliseconds, handed to poll())
        self.heartbeat_timeout = 60 * 1000
        self.state_flag = COMPONENT_STATE.OK
        self.error_state = COMPONENT_FAILURE.NOFAILURE
        self.on_done = None

        self._exception = None
        self.fail_time = None
        self.start_tic = None
        self.stop_tic = None
        # NOTE: this instance attribute shadows the ``note`` method below.
        self.note = None
        self.confirmed = False

        # Humanhashes make this way easier to debug because they
        # stick in your mind unlike a 32 byte string of random hex.
        self.guid = uuid.uuid4()
        self.huid = humanhash.humanize(self.guid.hex)

        self.init()

    def init(self):
        """
        Subclasses should override this to extend the setup for
        the class. Shouldn't have side effects.

        Called as the last step of ``__init__``.
        """
        pass

    # ------------
    # Core Methods
    # ------------

    def open(self):
        """
        Open the connections needed to start doing work.

        Must be implemented by subclasses; called by ``_run`` after the
        poller has been created.
        """
        raise NotImplementedError

    def ready(self):
        """
        Return ``True`` if and only if the component has finished execution,
        i.e. its state flag is ``DONE`` or ``EXCEPTION``.
        """
        return self.state_flag in [COMPONENT_STATE.DONE,
                                   COMPONENT_STATE.EXCEPTION]

    def successful(self):
        """
        Return ``True`` if and only if the component has finished execution
        successfully, that is, without raising an error.
        """
        return self.state_flag == COMPONENT_STATE.DONE and not \
            self.exception

    @property
    def exception(self):
        """
        Holds the exception that the component failed on, or
        ``None`` if the component has not failed.
        """
        return self._exception

    def do_work(self):
        """
        Perform one unit of work. Called repeatedly by ``loop`` while
        ``working()`` is true. Must be implemented by subclasses.
        """
        raise NotImplementedError

    def init_zmq(self, flavor):
        """
        ZMQ in all flavors. Have it your way.

        mp     - Distinct contexts | pyzmq
        thread - Same context      | pyzmq
        green  - Same context      | gevent_zeromq
        pypy   - Same context      | zmq_ctypes

        Sets ``self.zmq``, ``self.context`` and ``self.zmq_poller`` (the
        poller *class*, instantiated later by ``setup_poller``).

        :param flavor: one of ``'mp'``, ``'thread'``, ``'green'``, ``'pypy'``.
        :raises Exception: if ``flavor`` is not one of the above.
        """
        if flavor == 'mp':
            # Separate processes cannot share a context: make a fresh one.
            self.zmq = zmq
            self.context = self.zmq.Context()
            self.zmq_poller = self.zmq.Poller
        elif flavor in ('thread', 'pypy'):
            # The zmq_ctypes import is commented out at module level, so
            # 'pypy' currently uses plain pyzmq exactly like 'thread'.
            self.zmq = zmq
            self.context = self.zmq.Context.instance()
            self.zmq_poller = self.zmq.Poller
        elif flavor == 'green':
            self.zmq = gevent_zeromq.zmq
            self.context = self.zmq.Context.instance()
            self.zmq_poller = GeventPoller
        else:
            raise Exception("Unknown ZeroMQ Flavor")

    def _run(self):
        """
        Full component life cycle: initialise zmq, open sockets, run the
        work loop, then shut down. Records start/stop timestamps.
        """
        self.start_tic = time.time()

        self.done = False  # TODO: use state flag
        self.sockets = []

        self.init_zmq(self.zmq_flavor)

        self.setup_poller()

        self.open()
        self.setup_sync()
        self.setup_control()

        self.loop()
        self.shutdown()

        self.stop_tic = time.time()

    def run(self, catch_exceptions=True):
        """
        Run the component.

        Optionally takes an argument to catch and log all exceptions raised
        during execution. Use this with care since it makes it very hard to
        debug since it mucks up your stacktraces.

        :param catch_exceptions: when true, a failure is reported through
            ``signal_exception`` before being re-raised. When false the
            exception propagates untouched. Previously a false value
            skipped execution entirely; the component now runs either way.

        ``shutdown`` and ``teardown_sockets`` always run afterwards.
        NOTE(review): ``_run`` also calls ``shutdown`` after a clean loop,
        so ``on_done`` fires twice on success -- confirm that is intended.
        """
        try:
            self._run()
        except Exception as exc:
            if catch_exceptions:
                self.signal_exception(exc)
            # A bare raise re-raises the active exception with its original
            # traceback, replacing the Python-2-only three-argument raise.
            raise
        finally:
            self.shutdown()
            self.teardown_sockets()

    def working(self):
        """
        Controls when the work loop will start and end

        If we encounter an exception or signal done exit.

        Overload for higher order behavior.
        """
        return (not self.done)

    def loop(self, lockstep=True):
        """
        Loop to do work while we still have work to do.

        :param lockstep: accepted for interface compatibility; not used by
            this base implementation.
        """
        while self.working():
            self.confirm()
            self.do_work()

    def confirm(self):
        """
        Send a synchronization request to the host.

        Only the first call does anything: it sends ``<id>:RUN`` on the sync
        socket, waits for the acknowledgement, and marks the component as
        confirmed.
        """
        if not self.confirmed:
            # TODO: proper framing
            self.sync_socket.send(self.get_id + ":RUN")

            self.receive_sync_ack()  # blocking
            self.confirmed = True

    def runtime(self):
        """
        Return the elapsed seconds between start and stop, or ``None`` when
        the component has not finished or the timestamps are missing.
        """
        if self.ready() and self.start_tic and self.stop_tic:
            return self.stop_tic - self.start_tic
        return None

    # ----------------------------
    # Cleanup & Modes of Failure
    # ----------------------------

    def teardown_sockets(self):
        """
        Close all zmq sockets safely. This is universal, no matter
        where this is running it will need the sockets closed.
        """
        # close all the sockets
        for sock in self.sockets:
            sock.close()

    def shutdown(self):
        """
        Clean shutdown.

        Tear down after normal operation. Invokes the ``on_done`` callback
        when one has been set.
        """
        if self.on_done:
            self.on_done()

    def kill(self):
        """
        Unclean shutdown.

        Tear down ( fast ) as a mode of failure in the
        simulation or on service halt.

        Context specific.
        """
        raise NotImplementedError

    # ----------------------
    # Internal Maintenance
    # ----------------------

    def signal_exception(self, exc=None, scope=None):
        """
        This is a *very* important error tracking handler.

        Will inform the system that the component has failed and
        how it has failed.

        :param exc: the exception instance to record on the component.
        :param scope: ``'algo'`` marks the failure as an algorithm
            exception; anything else is recorded as a host exception.

        Intended to be called from inside an ``except`` block: the traceback
        sent to the controller is taken from ``sys.exc_info()``.
        """
        if scope == 'algo':
            self.error_state = COMPONENT_FAILURE.ALGOEXCEPT
        else:
            self.error_state = COMPONENT_FAILURE.HOSTEXCEPT

        self.state_flag = COMPONENT_STATE.EXCEPTION
        # mark the time of failure so we can track the failure
        # propagation through the system.
        self.stop_tic = time.time()

        self._exception = exc
        exc_type, exc_value, exc_traceback = sys.exc_info()
        trace = '\n>>>'.join(
            traceback.format_exception(exc_type, exc_value, exc_traceback)
        )

        # The control socket only exists once setup_control() has run with
        # a controller. Without this guard a failure in open()/setup (or a
        # controller-less run) would raise AttributeError here and hide
        # the real error.
        control_out = getattr(self, 'control_out', None)
        if control_out is not None:
            exception_frame = CONTROL_FRAME(
                CONTROL_PROTOCOL.EXCEPTION,
                trace
            )
            control_out.send(exception_frame)

        LOGGER.exception("Unexpected error in run for {id}.".format(id=self.get_id))

    def signal_done(self):
        """
        Notify down stream components that we're done.

        Sends DONE on the output socket (if any), on the sync socket and,
        when a controller is attached, on the control channel; then waits
        for the sync acknowledgement and flags the work loop to stop.
        """
        self.state_flag = COMPONENT_STATE.DONE

        if self.out_socket:
            self.out_socket.send(str(CONTROL_PROTOCOL.DONE))

        # notify host we're done
        # TODO: proper framing
        self.sync_socket.send(self.get_id + ":" + str(CONTROL_PROTOCOL.DONE))

        # notify controller we're done (skipped when no controller is set
        # up, since setup_control() then never creates control_out)
        control_out = getattr(self, 'control_out', None)
        if control_out is not None:
            done_frame = CONTROL_FRAME(
                CONTROL_PROTOCOL.DONE,
                ''
            )
            control_out.send(done_frame)

        self.receive_sync_ack()
        # notify internal work loop that we're done
        self.done = True  # TODO: use state flag

        LOGGER.info("[%s] DONE" % self.get_id)

    # -----------
    #  Messaging
    # -----------

    def setup_poller(self):
        """
        Setup the poller used for multiplexing the incoming data
        handling sockets.
        """
        # Initializes the poller class specified by the flavor of
        # ZeroMQ. Either zmq.Poller or gpoll.Poller .
        self.poll = self.zmq_poller()

    def receive_sync_ack(self):
        """
        Wait for synchronization reply from the host.

        Polls the sync socket for up to ``heartbeat_timeout`` and consumes
        the reply if one arrived. A timeout is currently ignored silently
        (the raise that used to report it is disabled).

        DEPRECATED, left in for compatibility for now.
        """
        socks = dict(self.sync_poller.poll(self.heartbeat_timeout))
        if self.sync_socket in socks and socks[self.sync_socket] == self.zmq.POLLIN:
            # The payload itself is not used; the reply is only read off
            # the socket.
            self.sync_socket.recv()
        #else:
            #raise Exception("Sync ack timed out on response for {id}".format(id=self.get_id))

    def bind_data(self):
        """Bind and return the PULL socket for ``data_address``."""
        return self.bind_pull_socket(self.addresses['data_address'])

    def connect_data(self):
        """Connect and return a PUSH socket to ``data_address``."""
        return self.connect_push_socket(self.addresses['data_address'])

    def bind_feed(self):
        """Bind and return the PUB socket for ``feed_address``."""
        return self.bind_pub_socket(self.addresses['feed_address'])

    def connect_feed(self):
        """Connect and return a SUB socket to ``feed_address``."""
        return self.connect_sub_socket(self.addresses['feed_address'])

    def bind_merge(self):
        """Bind and return the PULL socket for ``merge_address``."""
        return self.bind_pull_socket(self.addresses['merge_address'])

    def connect_merge(self):
        """Connect and return a PUSH socket to ``merge_address``."""
        return self.connect_push_socket(self.addresses['merge_address'])

    def bind_result(self):
        """Bind and return the PUB socket for ``result_address``."""
        return self.bind_pub_socket(self.addresses['result_address'])

    def connect_result(self):
        """Connect and return a SUB socket to ``result_address``."""
        return self.connect_sub_socket(self.addresses['result_address'])

    def bind_pull_socket(self, addr):
        """
        Create a PULL socket bound to ``addr``, register it with the
        poller for input, track it for teardown and return it.
        """
        pull_socket = self.context.socket(self.zmq.PULL)
        pull_socket.bind(addr)
        self.poll.register(pull_socket, self.zmq.POLLIN)

        self.sockets.append(pull_socket)

        return pull_socket

    def connect_push_socket(self, addr):
        """
        Create a PUSH socket connected to ``addr``, track it for teardown,
        make it the component's ``out_socket`` and return it.
        """
        push_socket = self.context.socket(self.zmq.PUSH)
        push_socket.connect(addr)
        #push_socket.setsockopt(self.zmq.LINGER,0)
        self.sockets.append(push_socket)
        self.out_socket = push_socket

        return push_socket

    def bind_pub_socket(self, addr):
        """
        Create a PUB socket bound to ``addr``, track it for teardown,
        make it the component's ``out_socket`` and return it.
        """
        pub_socket = self.context.socket(self.zmq.PUB)
        pub_socket.bind(addr)
        #pub_socket.setsockopt(self.zmq.LINGER,0)
        # Track the socket like the other helpers do; previously it was
        # never added, so teardown_sockets() left it open.
        self.sockets.append(pub_socket)
        self.out_socket = pub_socket

        return pub_socket

    def connect_sub_socket(self, addr):
        """
        Create a SUB socket connected to ``addr`` and subscribed with the
        empty prefix, track it for teardown, register it with the poller
        for input and return it.
        """
        sub_socket = self.context.socket(self.zmq.SUB)
        sub_socket.connect(addr)
        sub_socket.setsockopt(self.zmq.SUBSCRIBE,'')
        self.sockets.append(sub_socket)

        self.poll.register(sub_socket, self.zmq.POLLIN)

        return sub_socket

    def setup_control(self):
        """
        Set up the control socket. Used to monitor the
        overall status of the simulation and to forcefully tear
        down the simulation in case of a failure.
        """
        # Allow for the possibility of not having a controller,
        # possibly the zipline devsimulator may not want this.
        if not self.controller:
            return

        self.control_out = self.controller.message_sender(
            identity = self.get_id,
            context = self.context,
        )

        self.control_in = self.controller.message_listener(
            context = self.context
        )

        self.poll.register(self.control_in, self.zmq.POLLIN)
        self.sockets.extend([self.control_in, self.control_out])

    def setup_sync(self):
        """
        Setup the sync socket and poller. ( Connect )

        Creates a REQ socket connected to ``sync_address`` with its own
        dedicated poller, and tracks the socket for teardown.

        DEPRECATED, left in for compatibility for now.
        """
        LOGGER.debug("Connecting sync client for {id}".format(id=self.get_id))

        self.sync_socket = self.context.socket(self.zmq.REQ)
        self.sync_socket.connect(self.addresses['sync_address'])
        #self.sync_socket.setsockopt(self.zmq.LINGER,0)

        self.sync_poller = self.zmq_poller()
        self.sync_poller.register(self.sync_socket, self.zmq.POLLIN)

        self.sockets.append(self.sync_socket)

    # ---------------------
    # Description and Debug
    # ---------------------

    def extern_logger(self):
        """
        Pipe logs out to a provided logging interface.

        No-op in the base class.
        """
        pass

    def setup_extern_logger(self):
        """
        Pipe logs out to a provided logging interface.

        No-op in the base class.
        """
        pass

    @property
    def get_id(self):
        """
        The descriptive name of the component.

        Must be overridden by subclasses.
        """
        # Prevents the bug that Thomas ran into
        raise NotImplementedError

    @property
    def get_type(self):
        """
        The data flow type of the component.

        - ``SOURCE``
        - ``CONDUIT``
        - ``SINK``

        Must be overridden by subclasses.
        """
        raise NotImplementedError

    @property
    def get_pure(self):
        """
        Describes whether this component is purely functional,
        i.e. for a given set of inputs is it guaranteed to
        always give the same output. Components that are
        side-effectful are, generally, not pure.
        """
        return False

    def note(self):
        """
        Information about the component. Mostly used for testing.

        ``__init__`` assigns ``self.note = None``, so on a normally
        constructed instance this method is shadowed by that attribute;
        read the value through ``get_note``.
        """

    def get_note(self):
        """Return the component's note, or ``''`` when none is set."""
        return self.note or ''

    def debug(self):
        """
        Debug information about the component.

        Returns a dict with the component id, human-readable uid, host
        name, process id, memory address and completion flags. The
        ``'succesfull'`` key keeps its historical spelling so existing
        consumers keep working.
        """
        return {
            'id'         : self.get_id           ,
            'huid'       : self.huid             ,
            'host'       : socket.gethostname()  ,
            'pid'        : os.getpid()           ,
            'memaddress' : hex(id(self))         ,
            # These two values used to be swapped.
            'ready'      : self.ready()          ,
            'succesfull' : self.successful()     ,
        }

    def __len__(self):
        """
        Some components overload this for debug purposes

        NOTE: no ``__nonzero__``/``__bool__`` is defined, so truth-testing
        a component falls back to this method and raises as well.
        """
        raise NotImplementedError

    def __repr__(self):
        """
        Return a useful string representation of the component
        to indicate its type, unique identifier, and computational
        context identifier name.
        """
        return "<{name} {uuid} at {host} {pid} {pointer}>".format(
            name    = self.get_id          ,
            uuid    = self.huid            ,
            host    = socket.gethostname() ,
            pid     = os.getpid()          ,
            pointer = hex(id(self))        ,
        )
|
||||
@@ -1,4 +1,6 @@
|
||||
import zipline.protocol as zp
|
||||
from zipline.transforms import BaseTransform
|
||||
|
||||
from zipline.protocol import CONTROL_PROTOCOL, COMPONENT_TYPE, \
|
||||
COMPONENT_STATE, CONTROL_FRAME, CONTROL_UNFRAME
|
||||
|
||||
@@ -24,6 +26,10 @@ class PassthroughTransform(BaseTransform):
|
||||
def get_type(self):
|
||||
return COMPONENT_TYPE.CONDUIT
|
||||
|
||||
#TODO, could save some cycles by skipping the _UNFRAME call and just setting value to original msg string.
|
||||
#TODO, could save some cycles by skipping the _UNFRAME call
|
||||
# and just setting value to original msg string.
|
||||
def transform(self, event):
|
||||
return {'name':zp.TRANSFORM_TYPE.PASSTHROUGH, 'value': zp.FEED_FRAME(event) }
|
||||
return {
|
||||
'name' : zp.TRANSFORM_TYPE.PASSTHROUGH,
|
||||
'value' : zp.FEED_FRAME(event)
|
||||
}
|
||||
|
||||
@@ -3,7 +3,7 @@ from zipline.core import Component
|
||||
|
||||
import zipline.protocol as zp
|
||||
from zipline.protocol import CONTROL_PROTOCOL, COMPONENT_TYPE, \
|
||||
COMPONENT_STATE, CONTROL_FRAME, CONTROL_UNFRAME
|
||||
CONTROL_FRAME, CONTROL_UNFRAME
|
||||
|
||||
LOGGER = logging.getLogger('ZiplineLogger')
|
||||
|
||||
|
||||
Reference in New Issue
Block a user