ENH mp property + strategy multiprocessing support

This commit is contained in:
Kevin Johnson
2020-06-17 10:16:41 -07:00
parent 9cb7adb543
commit 40ab535838
4 changed files with 92 additions and 31 deletions
+18 -6
View File
@@ -15,6 +15,7 @@ All the indicators return a named Series or a DataFrame in uppercase underscore
## __Features__
* Has 100+ indicators and utility functions.
* Option to use __multiprocessing__ when using df.ta.strategy(). See below.
* Example Jupyter Notebook under the examples directory.
* A new 'ta' method called 'strategy' that, by default, runs __all__ the indicators.
* Abbreviated Indicator names as listed below.
@@ -120,7 +121,7 @@ pd.DataFrame().ta.indicators()
help(ta.log_return)
```
## __New DataFrame Method__: _strategy_
## __New DataFrame Method__: _strategy_ with Multiprocessing
Strategy is a new __Pandas (TA)__ method to facilitate bulk indicator processing. By default, running ```df.ta.strategy()``` will append __all
applicable__ indicators to DataFrame ```df```. Utility methods like ```above```, ```below``` et al are not included.
@@ -129,6 +130,11 @@ applicable__ indicators to DataFrame ```df```. Utility methods like ```above```
```python
# This property only affects df.ta.strategy(). When set to True,
# it enables multiprocessing when processing "ALL" the indicators.
# Default is False
df.ta.mp = True
# Runs and appends all indicators to the current DataFrame by default
# The resultant DataFrame will be large.
df.ta.strategy()
@@ -138,6 +144,13 @@ df.ta.strategy(name='all')
# Use verbose if you want to make sure it is running.
df.ta.strategy(verbose=True)
# Use timed if you want to see how long it takes to run.
df.ta.strategy(timed=True)
# You can change the number of cores to use, though the
# default will usually be best.
df.ta.strategy(cores=4)
# Maybe you do not want certain indicators.
# Just exclude (a list of) them.
df.ta.strategy(exclude=['bop', 'mom', 'percent_return', 'wcp', 'pvi'], verbose=True)
@@ -157,11 +170,11 @@ df.columns
prehl2 = df.ta.hl2(prefix="pre")
print(prehl2.name) # "pre_HL2"
endhl2 = df.ta.hl2(suffix="end")
print(endhl2.name) # "HL2_end"
endhl2 = df.ta.hl2(suffix="post")
print(endhl2.name) # "HL2_post"
bothhl2 = df.ta.hl2(prefix="pre", suffix="end")
print(bothhl2.name) # "pre_HL2_end"
bothhl2 = df.ta.hl2(prefix="pre", suffix="post")
print(bothhl2.name) # "pre_HL2_post"
```
## __New DataFrame Properties__: _reverse_ & _datetime_ordered_
@@ -370,6 +383,5 @@ Use parameter: cumulative=**True** for cumulative results.
# Inspiration
* TradingView: http://www.tradingview.com
* Original TA-LIB: http://ta-lib.org/
* Bukosabino: https://github.com/bukosabino/ta
Please leave any comments, feedback, suggestions, or indicator requests.
+70 -21
View File
@@ -1,6 +1,8 @@
# -*- coding: utf-8 -*-
import time
from functools import wraps
from multiprocessing import cpu_count, Pool
from random import random
from time import perf_counter
import pandas as pd
from pandas.core.base import PandasObject
@@ -15,7 +17,16 @@ from pandas_ta.volatility import *
from pandas_ta.volume import *
from pandas_ta.utils import *
version = ".".join(("0", "1", "72b"))
version = ".".join(("0", "1", "73b"))
def worker(args):
    """Run one indicator on a DataFrame and return its result.

    ``args`` is a ``(df, method, kwargs)`` triple packed into a single
    argument so the function can be handed to a pool's map-style call.
    The indicator is invoked as ``df.ta(kind=method, **kwargs)``.

    For the 'ichimoku' indicator only the first element of the result is
    returned; every other indicator's result is returned unchanged.
    """
    frame, indicator, options = args
    outcome = frame.ta(kind=indicator, **options)
    return outcome[0] if indicator == 'ichimoku' else outcome
def finalize(method):
@wraps(method)
@@ -127,6 +138,7 @@ class AnalysisIndicators(BasePandasObject):
>>> print(apo.timed)
"""
_adjusted = None
_mp = False
def __call__(self, kind=None, alias=None, timed=False, verbose=False, **kwargs):
try:
@@ -134,14 +146,13 @@ class AnalysisIndicators(BasePandasObject):
kind = kind.lower()
fn = getattr(self, kind)
if timed: stime = time.perf_counter()
if timed: stime = perf_counter()
# Run the indicator
result = fn(**kwargs)
result = fn(**kwargs) # = getattr(self, kind)(**kwargs)
# Add an alias if passed
if alias:
result.alias = f"{alias}"
if alias: result.alias = f"{alias}"
if timed:
result.timed = final_time(stime)
@@ -172,6 +183,19 @@ class AnalysisIndicators(BasePandasObject):
"""Returns True if the index is a datetime and ordered."""
return is_datetime_ordered(self._df)
@property
def mp(self) -> bool:
    """property: df.ta.mp

    Returns the stored multiprocessing flag (``_mp``).
    """
    return self._mp

@mp.setter
def mp(self, value: bool) -> None:
    """property: df.ta.mp = False (Default)

    Stores ``value`` only when it is an actual ``bool``; any other value
    (including ``None``) resets the flag to ``False``.
    """
    # isinstance() already rejects None, so one check covers both cases.
    self._mp = value if isinstance(value, bool) else False
@property
def reverse(self) -> pd.DataFrame:
"""Reverses the DataFrame. Simply: df.iloc[::-1]"""
@@ -200,12 +224,10 @@ class AnalysisIndicators(BasePandasObject):
if result is None: return
else:
prefix = suffix = ""
# delimiter = kwargs.pop("delimiter", "_")
delimiter = kwargs.setdefault("delimiter", "_")
if "prefix" in kwargs:
prefix = f"{kwargs['prefix']}_"
if "suffix" in kwargs:
suffix = f"_{kwargs['suffix']}"
if "prefix" in kwargs: prefix = f"{kwargs['prefix']}{delimiter}"
if "suffix" in kwargs: suffix = f"{delimiter}{kwargs['suffix']}"
if isinstance(result, pd.Series):
result.name = prefix + result.name + suffix
@@ -291,7 +313,7 @@ class AnalysisIndicators(BasePandasObject):
"""
as_list = kwargs.setdefault("as_list", False)
helper_methods = ["constants", "indicators", "strategy"] # Public non-indicator methods
ta_properties = ["adjusted", "datetime_ordered", "reverse", "version"]
ta_properties = ["adjusted", "datetime_ordered", "mp", "reverse", "version"]
exclude_methods = kwargs.setdefault("exclude", None)
ta_indicators = list((x for x in dir(pd.DataFrame().ta) if not x.startswith('_') and not x.endswith('_')))
@@ -314,9 +336,11 @@ class AnalysisIndicators(BasePandasObject):
# ALL Features
def _all(self, **kwargs):
"""Appends by default all non-excluded indicators to the DataFrame. Used by ta.strategy(**kwargs)"""
timed = kwargs.setdefault("timed", False)
verbose = kwargs.setdefault("verbose", False)
user_excluded = kwargs.setdefault("exclude", [])
cpus = cpu_count()
cores = int(kwargs.pop("cores", cpus))
timed = kwargs.pop("timed", False)
verbose = kwargs.pop("verbose", False)
user_excluded = kwargs.pop("exclude", [])
append = kwargs.setdefault("append", True)
excluded = ["above", "above_value", "below", "below_value",
@@ -326,18 +350,40 @@ class AnalysisIndicators(BasePandasObject):
current_columns = len(self._df.columns)
indicators = self.indicators(as_list=True, exclude=excluded)
# Core tuning
if cores <= 2: cores = 1
if cores == 3: cores = 2
if 4 <= cores <= 5: cores -= 2
if verbose:
print(f"[i] All indicators with the following arguments: {kwargs}")
print(f"[i] excluded[{len(excluded)}]: {', '.join(excluded)}")
if timed: stime = time.perf_counter()
if timed: stime = perf_counter()
for kind in indicators:
fn = getattr(self, kind)
fn(**kwargs)
print(f"[+] {kind}") if verbose else None
if not self.mp:
# Display multiprocessing tip 10% of the time.
if random() < 0.1:
print(f"[i] Set 'df.ta.mp = True' to enable multiprocessing. This computer has {cpus} cores. Default: False")
print(f"[i] total indicators: {len(indicators)}, columns added: {len(self._df.columns) - current_columns}")# if verbose else None
methods = [getattr(self, kind) for kind in indicators]
[f(**kwargs) for f in methods]
else:
print(f"[i] multiprocessing: {cores} of {cpu_count()} cores")
pool = Pool(cores)
result = pool.imap_unordered(
worker, ((self._df, ind, kwargs) for ind in indicators), cores
)
pool.close()
pool.join()
# Apply prefixes/suffixes and append to the DataFrame
for r in result:
self._add_prefix_suffix(r, **kwargs)
self._append(r, **kwargs)
print(f"[i] total indicators: {len(indicators)}, columns added: {len(self._df.columns) - current_columns}")
print(f"[i] runtime: {final_time(stime)}") if timed else None
@@ -1313,3 +1359,6 @@ class AnalysisIndicators(BasePandasObject):
result = vp(close=close, volume=volume, width=width, percent=percent, **kwargs)
return result
# if __name__ == "__main__":
# freeze_support()
+3 -3
View File
@@ -1,6 +1,6 @@
# -*- coding: utf-8 -*-
import math
import time
from time import perf_counter
import numpy as np
import pandas as pd
@@ -271,8 +271,8 @@ def fibonacci(**kwargs) -> np.ndarray:
return result
def final_time(stime: float) -> str:
    """Return the time elapsed since ``stime`` as a formatted string.

    Args:
        stime: Start timestamp, as previously returned by
            ``perf_counter()``.

    Returns:
        The elapsed time rendered as ``"<milliseconds> ms (<seconds> s)"``,
        each value shown with four decimal places.
    """
    # perf_counter is imported directly; the parameter is a float timestamp,
    # not the ``time`` module.
    time_diff = perf_counter() - stime
    return f"{time_diff * 1000:2.4f} ms ({time_diff:2.4f} s)"
+1 -1
View File
@@ -15,7 +15,7 @@ setup(
url ="https://github.com/twopirllc/pandas-ta",
maintainer ="Kevin Johnson",
maintainer_email ="appliedmathkj@gmail.com",
# install_requires=['numpy','pandas'],
# install_requires=['pandas'],
download_url ="https://github.com/twopirllc/pandas-ta.git",
keywords =['technical analysis', 'trading', 'python3', 'pandas'],
license ="The MIT License (MIT)",