mirror of
https://github.com/wassname/catalyst.git
synced 2026-06-29 22:15:26 +08:00
using classes instead of pure generators in component
This commit is contained in:
@@ -4,7 +4,7 @@ from datetime import datetime, timedelta
|
||||
|
||||
from unittest2 import TestCase
|
||||
from collections import defaultdict
|
||||
from zipline.gens.composite import date_sorted_sources
|
||||
from zipline.gens.composites import date_sorted_sources
|
||||
|
||||
from zipline.finance.trading import SIMULATION_STYLE
|
||||
from zipline.core.devsimulator import AddressAllocator
|
||||
@@ -16,17 +16,13 @@ from zipline.utils.test_utils import (
|
||||
launch_monitor
|
||||
)
|
||||
|
||||
|
||||
from zipline.core import Component
|
||||
from zipline.core.component import ComponentSocketArgs
|
||||
from zipline.protocol import (
|
||||
DATASOURCE_FRAME,
|
||||
DATASOURCE_UNFRAME
|
||||
)
|
||||
|
||||
from zipline.gens.tradegens import SpecificEquityTrades
|
||||
from zipline.gens.sort import date_sort
|
||||
from zipline.gens.zmqgen import gen_from_poller
|
||||
|
||||
import logbook
|
||||
log = logbook.Logger('ComponentTestCase')
|
||||
@@ -85,7 +81,6 @@ class ComponentTestCase(TestCase):
|
||||
|
||||
def test_sort(self):
|
||||
monitor = create_monitor(allocator)
|
||||
poller = zmq.Poller()
|
||||
socket_uris = allocator.lease(3)
|
||||
count = 100
|
||||
|
||||
@@ -102,9 +97,7 @@ class ComponentTestCase(TestCase):
|
||||
|
||||
|
||||
comp_a = Component(
|
||||
SpecificEquityTrades,
|
||||
args_a,
|
||||
kwargs_a,
|
||||
SpecificEquityTrades(*args_a, **kwargs_a),
|
||||
monitor,
|
||||
socket_uris[0],
|
||||
DATASOURCE_FRAME,
|
||||
@@ -124,9 +117,7 @@ class ComponentTestCase(TestCase):
|
||||
|
||||
|
||||
comp_b = Component(
|
||||
SpecificEquityTrades,
|
||||
args_b,
|
||||
kwargs_b,
|
||||
SpecificEquityTrades(*args_b, **kwargs_b),
|
||||
monitor,
|
||||
socket_uris[1],
|
||||
DATASOURCE_FRAME,
|
||||
@@ -144,9 +135,7 @@ class ComponentTestCase(TestCase):
|
||||
}
|
||||
|
||||
comp_c = Component(
|
||||
SpecificEquityTrades,
|
||||
args_c,
|
||||
kwargs_c,
|
||||
SpecificEquityTrades(*args_c, **kwargs_c),
|
||||
monitor,
|
||||
socket_uris[2],
|
||||
DATASOURCE_FRAME,
|
||||
|
||||
@@ -8,7 +8,6 @@ import uuid
|
||||
import time
|
||||
import socket
|
||||
import logbook
|
||||
import traceback
|
||||
import humanhash
|
||||
import multiprocessing
|
||||
from setproctitle import setproctitle
|
||||
@@ -25,7 +24,6 @@ from zipline.core.monitor import PARAMETERS
|
||||
from zipline.protocol import (
|
||||
CONTROL_PROTOCOL,
|
||||
COMPONENT_STATE,
|
||||
COMPONENT_FAILURE,
|
||||
CONTROL_FRAME,
|
||||
CONTROL_UNFRAME,
|
||||
EXCEPTION_FRAME
|
||||
@@ -48,24 +46,18 @@ class Component(object):
|
||||
|
||||
def __init__(self,
|
||||
generator,
|
||||
component_id,
|
||||
monitor,
|
||||
socket_uri,
|
||||
frame,
|
||||
unframe
|
||||
):
|
||||
|
||||
assert component_id, \
|
||||
"Every component needs a unique and invariant identifier"
|
||||
assert isinstance(component_id, basestring), \
|
||||
"Components must have string IDs"
|
||||
|
||||
# -----------------
|
||||
# Generator
|
||||
# -----------------
|
||||
self.generator = generator
|
||||
self.frame = frame
|
||||
self.component_id = hash(self.generator)
|
||||
self.component_id = self.generator.get_hash()
|
||||
|
||||
# lock for waiting on monitor "GO"
|
||||
self.waiting = None
|
||||
@@ -120,7 +112,7 @@ class Component(object):
|
||||
The core logic of the all components is run here.
|
||||
"""
|
||||
# The process title so you can watch it in top, ps.
|
||||
setproctitle(self.gen_func.__name__)
|
||||
setproctitle(self.generator.__class__.__name__)
|
||||
self.prefix = "FORK-"
|
||||
|
||||
log.info("Start %r" % self)
|
||||
|
||||
Reference in New Issue
Block a user