mirror of
https://github.com/wassname/catalyst.git
synced 2026-07-05 03:37:55 +08:00
playing with imports
This commit is contained in:
+31
-20
@@ -5,7 +5,6 @@ import json
|
||||
import uuid
|
||||
import datetime
|
||||
import zipline.util as qutil
|
||||
from gevent_zeromq import zmq as gzmq
|
||||
|
||||
class Component(object):
|
||||
|
||||
@@ -46,30 +45,41 @@ class Component(object):
|
||||
NotImplemented
|
||||
|
||||
def run(self):
|
||||
self.run_unsafe()
|
||||
|
||||
def run_safe(self):
|
||||
try:
|
||||
#TODO: can't initialize these values in the __init__?
|
||||
self.done = False
|
||||
self.sockets = []
|
||||
if self.gevent_needed:
|
||||
qutil.LOGGER.info("Loading gevent specific zmq for {id}".format(id=self.get_id()))
|
||||
module = __import__('gevent_zmq', 'zmq')
|
||||
else:
|
||||
qutil.LOGGER.debug("NOT Loading gevent specific zmq for {id}".format(id=self.get_id()))
|
||||
module = __import__('zmq')
|
||||
self.zmq = module
|
||||
self.context = self.zmq.Context()
|
||||
self.open()
|
||||
self.setup_sync()
|
||||
self.loop()
|
||||
#close all the sockets
|
||||
for sock in self.sockets:
|
||||
sock.close()
|
||||
self.run_unsafe()
|
||||
except:
|
||||
qutil.LOGGER.exception("Unexpected error in run for {id}.".format(id=self.get_id()))
|
||||
|
||||
finally:
|
||||
self.context.destroy()
|
||||
|
||||
if(self.context != None):
|
||||
self.context.destroy()
|
||||
|
||||
def run_unsafe(self):
|
||||
#import pdb; pdb.set_trace()
|
||||
#TODO: can't initialize these values in the __init__?
|
||||
self.done = False
|
||||
self.sockets = []
|
||||
if self.gevent_needed:
|
||||
qutil.LOGGER.info("Loading gevent specific zmq for {id}".format(id=self.get_id()))
|
||||
import gevent_zeromq
|
||||
self.zmq = gevent_zeromq.zmq
|
||||
else:
|
||||
qutil.LOGGER.debug("NOT Loading gevent specific zmq for {id}".format(id=self.get_id()))
|
||||
import zmq
|
||||
self.zmq = zmq
|
||||
#self.zmq = module
|
||||
qutil.LOGGER.debug("zmq file: {file}".format(file=self.zmq.__file__))
|
||||
self.context = self.zmq.Context()
|
||||
self.open()
|
||||
self.setup_sync()
|
||||
self.loop()
|
||||
#close all the sockets
|
||||
for sock in self.sockets:
|
||||
sock.close()
|
||||
|
||||
def loop(self):
|
||||
while not self.done:
|
||||
self.confirm()
|
||||
@@ -186,6 +196,7 @@ class ComponentHost(Component):
|
||||
self.feed = ParallelBuffer()
|
||||
self.merge = MergedParallelBuffer()
|
||||
self.passthrough = PassthroughTransform()
|
||||
self.gevent_needed = gevent_needed
|
||||
|
||||
#register the feed and the merge
|
||||
self.register_components([self.feed, self.merge, self.passthrough])
|
||||
|
||||
Reference in New Issue
Block a user