ENH add no multiprocessing option to strategy

This commit is contained in:
M6stafa
2021-01-26 22:46:59 +03:30
parent 53404e57cc
commit 35e8e56252
3 changed files with 78 additions and 25 deletions
+4
View File
@@ -341,6 +341,7 @@ df.ta.strategy(verbose=True)
df.ta.strategy(timed=True)
# Choose the number of cores to use. Default is all available cores.
# For no multiprocessing, set this value to 0.
df.ta.cores = 4
# Maybe you do not want certain indicators.
@@ -405,6 +406,9 @@ df.ta.categories
# Defaults to the number of cpus you have.
df.ta.cores = 4
# Set the number of cores to 0 for no multiprocessing.
df.ta.cores = 0
# Returns the number of cores you set or your default number of cpus.
df.ta.cores
```
+39 -25
View File
@@ -294,7 +294,7 @@ class AnalysisIndicators(BasePandasObject):
"""property: df.ta.cores = integer"""
cpus = cpu_count()
if value is not None and isinstance(value, int):
self._cores = int(value) if 0 < value <= cpus else cpus
self._cores = int(value) if 0 <= value <= cpus else cpus
else:
self._cores = cpus
@@ -637,47 +637,61 @@ class AnalysisIndicators(BasePandasObject):
timed = kwargs.pop("timed", False)
results = []
pool = Pool(self.cores)
use_multiprocessing = True if self.cores > 0 else False
has_col_names = False
if timed:
stime = perf_counter()
if mode["custom"]:
if use_multiprocessing and mode["custom"]:
# Determine if the Custom Model has 'col_names' parameter
has_col_names = (True if len([
True for x in ta
if "col_names" in x and isinstance(x["col_names"], tuple)
]) else False)
# Create a list of all the custom indicators into a list
custom_ta = [(
ind["kind"],
ind["params"] if "params" in ind and isinstance(ind["params"], tuple) else (),
{**ind, **kwargs},
) for ind in ta]
if has_col_names:
if verbose:
print(f"[i] No mulitproccessing support for 'col_names' option.")
# Without multiprocessing:
for ind in ta:
params = ind["params"] if "params" in ind and isinstance(ind["params"], tuple) else tuple()
getattr(self, ind["kind"])(*params, **{**ind, **kwargs})
else:
if verbose:
print(f"[i] Multiprocessing: {self.cores} of {cpu_count()} cores.")
use_multiprocessing = False
if use_multiprocessing:
if verbose:
print(f"[i] Multiprocessing: {self.cores} of {cpu_count()} cores.")
pool = Pool(self.cores)
if mode["custom"]:
# Create a list of all the custom indicators into a list
custom_ta = [(
ind["kind"],
ind["params"] if "params" in ind and isinstance(ind["params"], tuple) else (),
{**ind, **kwargs},
) for ind in ta]
# Custom multiprocessing pool. Must be ordered for Chained Strategies
# May fix this to cpus if Chaining/Composition if it remains
# inconsistent
results = pool.imap(self._mp_worker, custom_ta, self.cores)
else:
default_ta = [(ind, tuple(), kwargs) for ind in ta]
# All and Categorical multiprocessing pool. Speed over Order.
results = pool.imap_unordered(self._mp_worker, default_ta, self.cores)
pool.close()
pool.join()
else:
# Without multiprocessing:
if verbose:
print(f"[i] Multiprocessing: {self.cores} of {cpu_count()} cores.")
default_ta = [(ind, tuple(), kwargs) for ind in ta]
# All and Categorical multiprocessing pool. Speed over Order.
results = pool.imap_unordered(self._mp_worker, default_ta, self.cores)
pool.close()
pool.join()
if has_col_names:
print(f"[i] No mulitproccessing support for 'col_names' option.")
else:
print(f"[i] No mulitproccessing (cores = 0).")
if mode["custom"]:
for ind in ta:
params = ind["params"] if "params" in ind and isinstance(ind["params"], tuple) else tuple()
getattr(self, ind["kind"])(*params, **{**ind, **kwargs})
else:
for ind in ta:
getattr(self, ind)(*tuple(), **kwargs)
# Apply prefixes/suffixes and appends indicator results to the
# DataFrame
+35
View File
@@ -182,3 +182,38 @@ class TestStrategyMethods(TestCase):
def test_volume_category(self):
self.category = "Volume"
self.data.ta.strategy(self.category, verbose=verbose, timed=strategy_timed)
# @skip
def test_all_no_multiprocessing(self):
self.category = "All with No Multiprocessing"
cores = self.data.ta.cores
self.data.ta.cores = 0
self.data.ta.strategy(verbose=verbose, timed=strategy_timed)
self.data.ta.cores = cores
# @skip
def test_custom_no_multiprocessing(self):
self.category = "Custom A with No Multiprocessing"
cores = self.data.ta.cores
self.data.ta.cores = 0
momo_bands_sma_ta = [
{"kind": "rsi"}, # 1
{"kind": "macd"}, # 3
{"kind": "sma", "length": 50}, # 1
{"kind": "sma", "length": 200 }, # 1
{"kind": "bbands", "length": 20}, # 3
{"kind": "log_return", "cumulative": True}, # 1
{"kind": "ema", "close": "CUMLOGRET_1", "length": 5, "suffix": "CLR"}
]
custom = pandas_ta.Strategy(
"Commons with Cumulative Log Return EMA Chain", # name
momo_bands_sma_ta, # ta
"Common indicators with specific lengths and a chained indicator", # description
)
self.data.ta.strategy(custom, verbose=verbose, timed=strategy_timed)
self.data.ta.cores = cores