diff --git a/tests/test_components.py b/tests/test_components.py deleted file mode 100644 index a9320dde..00000000 --- a/tests/test_components.py +++ /dev/null @@ -1,384 +0,0 @@ -import zmq -import pytz -from pprint import pformat as pf -from datetime import datetime, timedelta - -from unittest2 import TestCase, skip -from collections import defaultdict -from zipline.gens.composites import date_sorted_sources, merged_transforms - -from zipline.core.devsimulator import AddressAllocator -from zipline.gens.transform import Passthrough, StatefulTransform -from zipline.gens.mavg import MovingAverage -from zipline.gens.tradesimulation import TradeSimulationClient as tsc - -from zipline.utils.factory import create_trading_environment -from zipline.test_algorithms import TestAlgorithm - - -from zipline.utils.test_utils import ( - setup_logger, - teardown_logger, - create_monitor, - launch_monitor -) - -from zipline.core import Component -from zipline.protocol import ( - DATASOURCE_FRAME, - DATASOURCE_UNFRAME, - FEED_FRAME, - FEED_UNFRAME, - MERGE_FRAME, - MERGE_UNFRAME, - SIMULATION_STYLE, - PERF_FRAME, - BT_UPDATE_UNFRAME -) - -from zipline.gens.tradegens import SpecificEquityTrades - -import logbook -log = logbook.Logger('ComponentTestCase') - -allocator = AddressAllocator(1000) - - -class ComponentTestCase(TestCase): - - leased_sockets = defaultdict(list) - - def setUp(self): - self.zipline_test_config = { - 'allocator' : allocator, - 'sid' : 133, - 'devel' : False, - 'results_socket' : allocator.lease(1)[0], - 'simulation_style' : SIMULATION_STYLE.FIXED_SLIPPAGE - } - self.ctx = zmq.Context() - setup_logger(self) - - count = 250 - filter = [2,3] - #Set up source a. One minute between events. - args_a = tuple() - kwargs_a = { - 'count' : 2*count, - 'sids' : [1,2,3], - 'start' : datetime(2002,1,3,15, tzinfo = pytz.utc), - 'delta' : timedelta(hours = 6), - 'filter' : filter - } - self.source_a = SpecificEquityTrades(*args_a, **kwargs_a) - - #Set up source b. Two minutes between events. - args_b = tuple() - kwargs_b = { - 'count' : count, - 'sids' : [2,3,4], - 'start' : datetime(2002,1,3,14, tzinfo = pytz.utc), - 'delta' : timedelta(minutes = 5), - 'filter' : filter - } - self.source_b = SpecificEquityTrades(*args_b, **kwargs_b) - - self.environment = create_trading_environment(year = 2002) - - - - def tearDown(self): - teardown_logger(self) - - @skip - def test_source(self): - monitor = create_monitor(allocator) - socket_uri = allocator.lease(1)[0] - count = 100 - - filter = [1,2,3,4] - #Set up source a. One minute between events. - args_a = tuple() - kwargs_a = { - 'sids' : [1,2], - 'start' : datetime(2012,6,6,0,tzinfo=pytz.utc), - 'delta' : timedelta(minutes = 1), - 'filter' : filter, - 'count' : count - } - - trade_gen = SpecificEquityTrades(*args_a, **kwargs_a) - - - comp_a = Component( - trade_gen, - monitor, - socket_uri, - DATASOURCE_FRAME, - DATASOURCE_UNFRAME, - "source_a" - ) - - mon_proc = launch_monitor(monitor) - - for event in comp_a: - log.info(event) - - # wait for the sending process to exit - comp_a.proc.join() - mon_proc.join() - - @skip - def test_sort(self): - monitor = create_monitor(allocator) - socket_uris = allocator.lease(3) - count = 100 - - filter = [1,2,3,4] - #Set up source a. One minute between events. - args_a = tuple() - kwargs_a = { - 'sids' : [1,2], - 'start' : datetime(2012,6,6,0,tzinfo=pytz.utc), - 'delta' : timedelta(minutes = 1), - 'filter' : filter, - 'count' : count - } - trade_gen_a = SpecificEquityTrades(*args_a, **kwargs_a) - - #Set up source b. Two minutes between events. - args_b = tuple() - kwargs_b = { - 'sids' : [2], - 'start' : datetime(2012,1,3,15, tzinfo = pytz.utc), - 'delta' : timedelta(minutes = 1), - 'filter' : filter, - 'count' : count - } - trade_gen_b = SpecificEquityTrades(*args_b, **kwargs_b) - - #Set up source c. Three minutes between events. - args_c = tuple() - kwargs_c = { - 'sids' : [3], - 'start' : datetime(2012,1,3,15, tzinfo = pytz.utc), - 'delta' : timedelta(minutes = 1), - 'filter' : filter, - 'count' : count - } - - trade_gen_c = SpecificEquityTrades(*args_c, **kwargs_c) - - - comp_a = Component( - trade_gen_a, - monitor, - socket_uris[0], - DATASOURCE_FRAME, - DATASOURCE_UNFRAME, - trade_gen_a.get_hash() - ) - - comp_b = Component( - trade_gen_b, - monitor, - socket_uris[1], - DATASOURCE_FRAME, - DATASOURCE_UNFRAME, - trade_gen_b.get_hash() - ) - - comp_c = Component( - trade_gen_c, - monitor, - socket_uris[2], - DATASOURCE_FRAME, - DATASOURCE_UNFRAME, - trade_gen_c.get_hash() - ) - - sources = [comp_a, comp_b, comp_c] - - sorted_out = date_sorted_sources(*sources) - - mon_proc = launch_monitor(monitor) - - prev = None - sort_count = 0 - for msg in sorted_out: - if prev: - self.assertTrue(msg.dt >= prev.dt, \ - "Messages should be in date ascending order") - prev = msg - sort_count += 1 - - self.assertEqual(count*3, sort_count) - - # wait for processes to finish - comp_a.proc.join() - comp_b.proc.join() - comp_c.proc.join() - mon_proc.join() - - @skip - def test_full(self): - monitor = create_monitor(allocator) - - # ------------------------ - # Run sources in dedicated processes - comp_a = Component( - self.source_a, - monitor, - allocator.lease(1)[0], - DATASOURCE_FRAME, - DATASOURCE_UNFRAME, - self.source_a.get_hash() - ) - - comp_b = Component( - self.source_b, - monitor, - allocator.lease(1)[0], - DATASOURCE_FRAME, - DATASOURCE_UNFRAME, - self.source_b.get_hash() - ) - - # Date sort the sources, and run the sort in a dedicated - # process - sources = [comp_a, comp_b] - - sorted_out = date_sorted_sources(*sources) - - sorted = Component( - sorted_out, - monitor, - allocator.lease(1)[0], - FEED_FRAME, - FEED_UNFRAME, - "sort" - ) - - - passthrough = StatefulTransform(Passthrough) - mavg_price = StatefulTransform( - MovingAverage, - ['price'], - market_aware = False, - delta=timedelta(minutes = 20) - ) - - merged_gen = merged_transforms(sorted, passthrough, mavg_price) - - merged = Component( - merged_gen, - monitor, - allocator.lease(1)[0], - MERGE_FRAME, - MERGE_UNFRAME, - "merge" - ) - - algo = TestAlgorithm(2, 10, 100, sid_filter = [2,3]) - - style = SIMULATION_STYLE.FIXED_SLIPPAGE - - trading_client = tsc(algo, self.environment, style) - tsc_gen = trading_client.simulate(merged) - - tsc_comp = Component( - tsc_gen, - monitor, - allocator.lease(1)[0], - PERF_FRAME, - BT_UPDATE_UNFRAME, - "tsc" - ) - mon_proc = launch_monitor(monitor) - for message in tsc_comp: - log.info(pf(message)) - - - # wait for processes to finish - comp_a.proc.join() - comp_b.proc.join() - sorted.proc.join() - merged.proc.join() - tsc_comp.proc.join() - mon_proc.join() - return - - def test_single_thread(self): - - #Set up source c. Three minutes between events. - - sorted = date_sorted_sources(self.source_a, self.source_b) - - passthrough = StatefulTransform(Passthrough) - mavg_price = StatefulTransform( - MovingAverage, - ['price'], - market_aware=False, - delta=timedelta(minutes = 20), - ) - - merged = merged_transforms(sorted, passthrough, mavg_price) - - algo = TestAlgorithm(2, 10, 100, sid_filter = [2,3]) - style = SIMULATION_STYLE.FIXED_SLIPPAGE - - trading_client = tsc(algo, self.environment, style) - for message in trading_client.simulate(merged): - log.info(pf(message)) - - @skip - def test_compound(self): - monitor = create_monitor(allocator) - - sorted_out = date_sorted_sources(self.source_a, self.source_b) - - sorted = Component( - sorted_out, - monitor, - allocator.lease(1)[0], - FEED_FRAME, - FEED_UNFRAME, - "feed" - ) - - passthrough = StatefulTransform(Passthrough) - mavg_price = StatefulTransform( - MovingAverage, - ['price'], - market_aware = False, - delta=timedelta(minutes = 20) - ) - - merged_gen = merged_transforms(sorted, passthrough, mavg_price) - - merged = Component( - merged_gen, - monitor, - allocator.lease(1)[0], - MERGE_FRAME, - MERGE_UNFRAME, - "merge" - ) - - algo = TestAlgorithm(2, 10, 100, sid_filter = [2,3]) - style = SIMULATION_STYLE.FIXED_SLIPPAGE - - trading_client = tsc(algo, self.environment, style) - tsc_gen = trading_client.simulate(merged) - - - mon_proc = launch_monitor(monitor) - for message in tsc_gen: - log.info(pf(message)) - - - # wait for processes to finish - sorted.proc.join() - merged.proc.join() - mon_proc.join() - return diff --git a/zipline/core/__init__.py b/zipline/core/__init__.py index 6bdbf960..ea179376 100644 --- a/zipline/core/__init__.py +++ b/zipline/core/__init__.py @@ -1,7 +1,5 @@ -from component import Component from monitor import Monitor __all__ = [ - Component, Monitor, ] diff --git a/zipline/core/component.py b/zipline/core/component.py deleted file mode 100644 index c9ce56e6..00000000 --- a/zipline/core/component.py +++ /dev/null @@ -1,651 +0,0 @@ -""" -Contains the base class for all components. -""" - -import os -import sys -import uuid -import time -import socket -import logbook -import humanhash -import multiprocessing -from setproctitle import setproctitle -from collections import namedtuple - - -# pyzmq -import zmq - -from zipline.core.monitor import PARAMETERS - -from zipline.protocol import ( - CONTROL_PROTOCOL, - COMPONENT_STATE, - CONTROL_FRAME, - CONTROL_UNFRAME, - EXCEPTION_FRAME -) - - -log = logbook.Logger('Component') - -class KillSignal(Exception): - def __init__(self): - pass - -class ShutdownSignal(Exception): - def __init__(self): - pass - -ComponentSocketArgs = namedtuple('ComponentSocketArgs',['uri','style','bind']) - -class Component(object): - - # ------------ - # Construction - # ------------ - - def __init__(self, - generator, - monitor, - socket_uri, - frame, - unframe, - component_id - ): - - # ----------------- - # Generator - # ----------------- - self.generator = generator - self.frame = frame - self.component_id = component_id - - # lock for waiting on monitor "GO" - self.waiting = None - - # ----------------- - # ZMQ properties - # ----------------- - self.in_socket_args = ComponentSocketArgs( - uri = socket_uri, - style = zmq.PULL, - bind = False - ) - self.out_socket_args = ComponentSocketArgs( - uri = socket_uri, - style = zmq.PUSH, - bind = True - ) - self.zmq = None - self.context = None - self.out_socket = None - self.in_socket = None - self.monitor = monitor - self.unframe = unframe - self.prefix = "" - - # TODO: state_flag is deprecated, remove - self.state_flag = COMPONENT_STATE.OK - - # track time of last ping we received from monitor - self.last_ping = time.time() - - # Humanhashes make this way easier to debug because they stick - # in your mind unlike a 32 byte string of random hex. - self.guid = uuid.uuid4() - self.huid = humanhash.humanize(self.guid.hex) - - # first, start the generator in its own process. Once - # Monitor says "go", Events from the generator will be - # FRAME'd and PUSH'd to self.socket_uri. - monitor.add_to_topology(self.component_id) - - self.proc = multiprocessing.Process( - target=self.loop_send - ) - self.proc.start() - - # Placeholder for receive generator, which will be - # created in __iter__ - self.recv_gen = None - - - # ------------ - # Core Methods - # ------------ - - def loop_send(self): - """ - The main component loop. This is wrapped inside a - exception reporting context inside of run. - - The core logic of the all components is run here. - """ - try: - # The process title so you can watch it in top, ps. - self.prefix = "FORK-" - setproctitle(self.get_id) - - log.info("Start %r" % self) - log.info("Pid %s" % os.getpid()) - log.info("Group %s" % os.getpgrp()) - - self.open() - - self.signal_ready() - self.lock_ready() - - msg = None - for event in self.generator: - - if hasattr(event, 'dt') and event.dt == 'DONE': - continue - - self.wait_ready() - - self.heartbeat() - msg = self.frame(event) - self.out_socket.send(msg) - - self.signal_done() - - # keep heartbeating until we receive the shutdown - # message from the Monitor (raises a - # ShutdownSignal), or we don't hear from the Monitor - # for MAX_COMPONENT_WAIT. - while True: - self.heartbeat(timeout=1000) - - except Exception as exc: - self.handle_exception(exc) - finally: - log.info("Exiting %r" % self) - - - def create_recv_gen(self): - try: - # return the generator - return self.loop_recv() - except Exception as exc: - self.handle_exception(exc) - finally: - log.info("Created Recv Gen for %r" % self) - - def loop_recv(self): - try: - self.open(send=False) - self.signal_ready() - self.lock_ready() - - # we block on ready here until monitor sends the GO - # self.wait_ready() - for event in self.gen_from_poller(self.poll, self.in_socket, self.unframe): - yield event - - self.signal_done() - except Exception as exc: - self.handle_exception(exc) - finally: - log.info("Exiting %r" % self) - - def gen_from_poller(self, poller, in_socket, unframe): - - while True: - # Since we will yield None to avoid blocking, we need - # to have a small delay to give the poller a chance - # to receive a message from upstream. - socks = dict(poller.poll(100)) - self.heartbeat() - if socks.get(in_socket) == zmq.POLLIN: - message = in_socket.recv() - if message == str(CONTROL_PROTOCOL.DONE): - break - else: - event = unframe(message) - yield event - else: - yield - - def handle_exception(self, exc, re_raise=False): - if isinstance(exc, KillSignal): - # if we get a kill signal, forcibly close all the - # sockets. - self.teardown_sockets() - elif isinstance(exc, ShutdownSignal): - # signal from monitor of an orderly shutdown, - # do nothing. - pass - else: - self.signal_exception(exc) - - def __iter__(self): - return self - - def next(self): - if not self.recv_gen: - self.recv_gen = self.create_recv_gen() - return self.recv_gen.next() - - # ---------------------------- - # Cleanup & Modes of Failure - # ---------------------------- - - def teardown_sockets(self): - """ - Close all zmq sockets safely. This is universal, no matter where - this is running it will need the sockets closed. - """ - log.warn("{id} closing all sockets".format(id=self.get_id)) - #close all the sockets - for sock in self.sockets: - sock.close() - - def shutdown(self): - """ - Clean shutdown. - """ - raise ShutdownSignal() - - def kill(self): - """ - Unclean shutdown. - - Tear down ( fast ) as a mode of failure in the simulation or on - service halt. - """ - raise KillSignal() - - def signal_exception(self, exc=None, scope=None): - """ - All exceptions inside any component should boil back to - this handler. - - Will inform the system that the component has failed and how it - has failed. - """ - self.state_flag = COMPONENT_STATE.EXCEPTION - exc_type, exc_value, exc_traceback = sys.exc_info() - - # if a downstream component fails, this component may try - # sending when there are zero connections to the socket, - # which will raise ZMQError(EAGAIN). So, it doesn't make - # sense to relay this exception to Monitor and the rest - # of the zipline. - if isinstance(exc, zmq.ZMQError) and exc.errno == zmq.EAGAIN: - log.warn("{id} raised a ZMQError(EAGAIN) not relaying"\ - .format(id=self.get_id)) - return - - # sys.stdout.write(trace) - log.exception("Unexpected error in run for {id}.".format(id=self.get_id)) - - try: - log.info('{id} sending exception to monitor'\ - .format(id=self.get_id)) - msg = EXCEPTION_FRAME( - exc_traceback, - exc_type.__name__, - exc_value.message - ) - - exception_frame = CONTROL_FRAME( - CONTROL_PROTOCOL.EXCEPTION, - msg - ) - self.control_out.send(exception_frame, self.zmq.NOBLOCK) - # The monitor should relay the exception back - # to all zipline components. Wait here until the - # notice arrives, and we can assume other zipline - # components have broken out of their message - # loops. - for i in xrange(PARAMETERS.MAX_COMPONENT_WAIT): - self.heartbeat(timeout=1000) - log.warn("{id} never heard back from monitor."\ - .format(id=self.get_id)) - - except KillSignal: - log.info("{id} received confirmation from monitor"\ - .format(id=self.get_id)) - except: - log.exception("Exception waiting for monitor reply") - - - - # ---------------------- - # Internal Maintenance - # ---------------------- - - def lock_ready(self): - """ - Unlock the component, topology is now ready to run. - """ - self.waiting = True - - def unlock_ready(self): - """ - Unlock the component, topology is still pending. - """ - self.waiting = False - - def wait_ready(self): - # Implicit side-effect of unlocking the component iff - # the GO message is received from the monitor level. - # This then unlocks the barrier and proceeds to the - # do_work state. - - # Poll on a subset of the control protocol while we exist - # in the locked quasimode. Respond to HEARTBEAT and GO - # messages. - - start_wait = time.time() - - while self.waiting: - socks = dict(self.poll.poll(0)) - - assert self.control_in, \ - 'Component does not have a control_in socket' - - if socks.get(self.control_in) == zmq.POLLIN: - - msg = self.control_in.recv() - event, payload = CONTROL_UNFRAME(msg) - - # ==== - # Go - # ==== - - # A distributed lock from the monitor to ensure - # synchronized start. - - if event == CONTROL_PROTOCOL.HEARTBEAT: - heartbeat_frame = CONTROL_FRAME( - CONTROL_PROTOCOL.OK, - payload - ) - self.control_out.send(heartbeat_frame) - log.info('Prestart Heartbeat ' + self.get_id) - - elif event == CONTROL_PROTOCOL.GO: - # Side effectful call from the monitor to unlock - # and begin doing work only when the entire topology - # of the system beings to come online - log.info('Unlocking ' + self.get_id) - self.unlock_ready() - - # ========= - # Soft Kill - # ========= - - # Try and clean up properly and send out any reports or - # data that are done during a clean shutdown. Inform the - # monitor that we're done. - elif event == CONTROL_PROTOCOL.SHUTDOWN: - self.shutdown() - break - - # ========= - # Hard Kill - # ========= - - # Just exit. - elif event == CONTROL_PROTOCOL.KILL: - self.kill() - break - - elif time.time() - start_wait > PARAMETERS.MAX_COMPONENT_WAIT: - log.info('No go signal from monitor, %s exiting' \ - % self.get_id) - self.kill() - break - - def heartbeat(self, timeout=0): - # wait for synchronization reply from the host - socks = dict(self.poll.poll(timeout)) - - # ---------------- - # Control Dispatch - # ---------------- - assert self.control_in, 'Component does not have a control_in socket' - - if socks.get(self.control_in) == zmq.POLLIN: - msg = self.control_in.recv() - event, payload = CONTROL_UNFRAME(msg) - - # =========== - # Heartbeat - # =========== - - # The monitor will send out a single number packed in - # a CONTROL_FRAME with ``heartbeat`` event every - # (n)-seconds. The component then has n seconds to - # respond to it. If not then it will be considered as - # malfunctioning or maybe CPU bound. - - if event == CONTROL_PROTOCOL.HEARTBEAT: - # Heart outgoing - heartbeat_frame = CONTROL_FRAME( - CONTROL_PROTOCOL.OK, - payload - ) - - self.last_ping = float(payload) - # Echo back the heartbeat identifier to tell the - # monitor that this component is still alive and - # doing work - self.control_out.send(heartbeat_frame) - - - # ========= - # Soft Kill - # ========= - - # Try and clean up properly and send out any reports or - # data that are done during a clean shutdown. Inform the - # monitor that we're done. - elif event == CONTROL_PROTOCOL.SHUTDOWN: - self.shutdown() - - # ========= - # Hard Kill - # ========= - - # Just exit. - elif event == CONTROL_PROTOCOL.KILL: - self.kill() - - # In case we didn't receive a ping, send a pre-emptive - # pong to the monitor. - elif time.time() - self.last_ping > 2: - # send a ping ahead of schedule - pre_pong = time.time() - heartbeat_frame = CONTROL_FRAME( - CONTROL_PROTOCOL.OK, - str(pre_pong) - ) - - # Echo back the heartbeat identifier to tell the - # monitor that this component is still alive and - # doing work - self.control_out.send(heartbeat_frame, self.zmq.NOBLOCK) - self.last_ping = pre_pong - elif time.time() - self.last_ping > PARAMETERS.MAX_COMPONENT_WAIT: - # monitor is gone without sending the shutdown - # signal, do a hard exit. - self.kill() - - - def signal_ready(self): - log.info(self.get_id + ' is ready') - frame = CONTROL_FRAME( - CONTROL_PROTOCOL.READY, - '' - ) - self.control_out.send(frame) - - def signal_done(self): - """ - Notify down stream components that we're done. - """ - - self.state_flag = COMPONENT_STATE.DONE - # notify internal work loop that we're done - self.done = True # TODO: use state flag - - if self.out_socket: - msg = zmq.Message(str(CONTROL_PROTOCOL.DONE)) - self.out_socket.send(msg) - - - # notify monitor we're done - done_frame = CONTROL_FRAME( - CONTROL_PROTOCOL.DONE, - '' - ) - - self.control_out.send(done_frame) - log.info("[%s] sent control done" % self.get_id) - - # ----------- - # Messaging - # ----------- - - def open(self, send=True): - """ - Open the connections needed to start doing work. - Perform any setup that must be done within process. - """ - self.sockets = [] - self.zmq = zmq - self.context = self.zmq.Context() - self.poll = self.zmq.Poller() - - self.setup_control() - - if send: - self.out_socket = self.open_socket(self.out_socket_args) - self.sockets.extend([self.out_socket]) - else: - self.in_socket = self.open_socket(self.in_socket_args) - self.sockets.extend([self.in_socket]) - - def open_socket(self, sock_args): - if sock_args.bind: - return self.bind_socket(sock_args) - else: - return self.connect_socket(sock_args) - - def bind_socket(self, sock_args): - if sock_args.style == zmq.PULL: - return self.bind_pull_socket(sock_args.uri) - if sock_args.style == zmq.PUSH: - return self.bind_push_socket(sock_args.uri) - if sock_args.style == zmq.PUB: - return self.bind_pub_socket(sock_args.uri) - - raise Exception("Invalid socket arguments") - - def connect_socket(self, sock_args): - if sock_args.style == zmq.PULL: - return self.connect_pull_socket(sock_args.uri) - if sock_args.style == zmq.PUSH: - return self.connect_push_socket(sock_args.uri) - if sock_args.style == zmq.SUB: - return self.connect_sub_socket(sock_args.uri) - - raise Exception("Invalid socket arguments") - - def bind_push_socket(self, addr): - push_socket = self.context.socket(self.zmq.PUSH) - push_socket.bind(addr) - self.sockets.append(push_socket) - - return push_socket - - def connect_pull_socket(self, addr): - pull_socket = self.context.socket(self.zmq.PULL) - pull_socket.connect(addr) - self.sockets.append(pull_socket) - self.poll.register(pull_socket, self.zmq.POLLIN) - - return pull_socket - - - def bind_pull_socket(self, addr): - pull_socket = self.context.socket(self.zmq.PULL) - pull_socket.bind(addr) - self.poll.register(pull_socket, self.zmq.POLLIN) - self.sockets.append(pull_socket) - - return pull_socket - - def connect_push_socket(self, addr): - push_socket = self.context.socket(self.zmq.PUSH) - push_socket.connect(addr) - self.sockets.append(push_socket) - - return push_socket - - - def setup_control(self): - """ - Set up the control socket. Used to monitor the overall status - of the simulation and to forcefully tear down the simulation in - case of a failure. - """ - self.control_out = self.monitor.message_sender( - identity = self.get_id, - context = self.context, - ) - - self.control_in = self.monitor.message_listener( - context = self.context - ) - - self.poll.register(self.control_in, self.zmq.POLLIN) - self.sockets.extend([self.control_in, self.control_out]) - - # --------------------- - # Description and Debug - # --------------------- - - @property - def get_id(self): - """ - The time invariant name for this component. - Must be unique within this zipline. - """ - return self.prefix + self.component_id - - def get_hash(self): - return self.component_id - - def debug(self): - """ - Debug information about the component. - """ - return { - 'id' : self.get_id , - 'huid' : self.huid , - 'host' : socket.gethostname() , - 'pid' : os.getpid() , - 'memaddress' : hex(id(self)) , - 'ready' : self.successful() , - 'successful' : self.ready() , - } - - def __repr__(self): - """ - Return a useful string representation of the component to - indicate its type, unique identifier, and computational context - identifier name. - """ - - return "<{name} {uuid} at {host} {pid} {pointer}>".format( - name = self.get_id , - uuid = self.guid , - host = socket.gethostname() , - pid = os.getpid() , - pointer = hex(id(self)) , - )