mirror of
https://github.com/wassname/pandas-ta.git
synced 2026-06-27 16:10:07 +08:00
MAINT refactor TST mod BUG lingreg return
This commit is contained in:
@@ -116,6 +116,7 @@ env/**
|
||||
pandas_ta/_wrapper.py
|
||||
|
||||
# twopirllc stuff
|
||||
ta-lib/
|
||||
AlphaVantageAPI/
|
||||
|
||||
data/datas.csv
|
||||
|
||||
@@ -28,4 +28,5 @@ test_ta:
|
||||
python -m unittest -v -f tests/test_indicator_*.py
|
||||
|
||||
test_utils:
|
||||
python -m unittest -v -f tests/test_utils.py
|
||||
python -m unittest -v -f tests/test_utils.py
|
||||
python -m unittest -v -f tests/test_utils_metrics.py
|
||||
@@ -2,11 +2,13 @@ name = "pandas_ta"
|
||||
"""
|
||||
.. moduleauthor:: Kevin Johnson
|
||||
"""
|
||||
from importlib.util import find_spec
|
||||
from pkg_resources import get_distribution, DistributionNotFound
|
||||
import os.path
|
||||
|
||||
|
||||
_dist = get_distribution("pandas_ta")
|
||||
try:
|
||||
_dist = get_distribution("pandas_ta")
|
||||
# Normalize case for Windows systems
|
||||
dist_loc = os.path.normcase(_dist.location)
|
||||
here = os.path.normcase(__file__)
|
||||
@@ -18,7 +20,6 @@ except DistributionNotFound:
|
||||
|
||||
version = __version__ = _dist.version
|
||||
|
||||
from importlib.util import find_spec
|
||||
|
||||
Imports = {
|
||||
"scipy": find_spec("scipy") is not None,
|
||||
|
||||
@@ -18,8 +18,7 @@ def kdj(high=None, low=None, close=None, length=None, signal=None, offset=None,
|
||||
highest_high = high.rolling(length).max()
|
||||
lowest_low = low.rolling(length).min()
|
||||
|
||||
fastk = 100 * (close - lowest_low) / non_zero_range(highest_high,
|
||||
lowest_low)
|
||||
fastk = 100 * (close - lowest_low) / non_zero_range(highest_high, lowest_low)
|
||||
|
||||
k = rma(fastk, length=signal)
|
||||
d = rma(k, length=signal)
|
||||
|
||||
@@ -24,7 +24,7 @@ def ssf(close, length=None, poles=None, offset=None, **kwargs):
|
||||
a0 = npExp(-x) # e^(-x)
|
||||
b0 = 2 * a0 * npCos(npSqrt(3) * x) # 2e^(-x)*cos(3^(.5) * x)
|
||||
c0 = a0 * a0 # e^(-2x)
|
||||
|
||||
|
||||
c4 = c0 * c0 # e^(-4x)
|
||||
c3 = -c0 * (1 + b0) # -e^(-2x) * (1 + 2e^(-x)*cos(3^(.5) * x))
|
||||
c2 = c0 + b0 # e^(-2x) + 2e^(-x)*cos(3^(.5) * x)
|
||||
|
||||
@@ -39,7 +39,7 @@ def drawdown(close, offset=None, **kwargs) -> DataFrame:
|
||||
dd.fillna(method=kwargs["fill_method"], inplace=True)
|
||||
dd_pct.fillna(method=kwargs["fill_method"], inplace=True)
|
||||
dd_log.fillna(method=kwargs["fill_method"], inplace=True)
|
||||
|
||||
|
||||
# Name and Categorize it
|
||||
dd.name = "DD"
|
||||
dd_pct.name = f"{dd.name}_PCT"
|
||||
|
||||
@@ -29,23 +29,23 @@ def increasing(close, length=None, strict=None, asint=None, offset=None, **kwarg
|
||||
# Offset
|
||||
if offset != 0:
|
||||
increasing = increasing.shift(offset)
|
||||
|
||||
|
||||
# Handle fills
|
||||
if "fillna" in kwargs:
|
||||
increasing.fillna(kwargs["fillna"], inplace=True)
|
||||
if "fill_method" in kwargs:
|
||||
increasing.fillna(method=kwargs["fill_method"], inplace=True)
|
||||
|
||||
|
||||
# Name and Categorize it
|
||||
increasing.name = f"{'S' if strict else ''}INC_{length}"
|
||||
increasing.category = "trend"
|
||||
|
||||
|
||||
return increasing
|
||||
|
||||
|
||||
increasing.__doc__ = \
|
||||
"""Increasing
|
||||
|
||||
|
||||
Returns True if the series is increasing over a period, False otherwise. If the kwarg 'strict' is True, it returns True if it is continuously increasing over the period. When using the kwarg 'asint', then it returns 1 for True or 0 for False.
|
||||
|
||||
Calculation:
|
||||
|
||||
@@ -13,10 +13,8 @@ def long_run(fast, slow, length=None, offset=None, **kwargs):
|
||||
offset = get_offset(offset)
|
||||
|
||||
# Calculate Result
|
||||
pb = increasing(fast, length) & decreasing(
|
||||
slow, length) # potential bottom or bottom
|
||||
bi = increasing(fast, length) & increasing(
|
||||
slow, length) # fast and slow are increasing
|
||||
pb = increasing(fast, length) & decreasing(slow, length) # potential bottom or bottom
|
||||
bi = increasing(fast, length) & increasing(slow, length) # fast and slow are increasing
|
||||
long_run = pb | bi
|
||||
|
||||
# Offset
|
||||
|
||||
@@ -15,16 +15,16 @@ def qstick(open_, close, length=None, offset=None, **kwargs):
|
||||
# Calculate Result
|
||||
diff = non_zero_range(close, open_)
|
||||
|
||||
if ma in [None, "sma"]:
|
||||
qstick = sma(diff, length=length)
|
||||
if ma == "dema":
|
||||
qstick = dema(diff, length=length, **kwargs)
|
||||
if ma == "ema":
|
||||
elif ma == "ema":
|
||||
qstick = ema(diff, length=length, **kwargs)
|
||||
if ma == "hma":
|
||||
elif ma == "hma":
|
||||
qstick = hma(diff, length=length)
|
||||
if ma == "rma":
|
||||
elif ma == "rma":
|
||||
qstick = rma(diff, length=length)
|
||||
else: # "sma"
|
||||
qstick = sma(diff, length=length)
|
||||
|
||||
# Offset
|
||||
if offset != 0:
|
||||
|
||||
@@ -14,8 +14,7 @@ def short_run(fast, slow, length=None, offset=None, **kwargs):
|
||||
|
||||
# Calculate Result
|
||||
pt = decreasing(fast, length) & increasing(slow, length) # potential top or top
|
||||
bd = decreasing(fast, length) & decreasing(
|
||||
slow, length) # fast and slow are decreasing
|
||||
bd = decreasing(fast, length) & decreasing(slow, length) # fast and slow are decreasing
|
||||
short_run = pt | bd
|
||||
|
||||
# Offset
|
||||
|
||||
+23
-19
@@ -13,6 +13,7 @@ from numpy import corrcoef as npCorrcoef
|
||||
from numpy import dot as npDot
|
||||
from numpy import exp as npExp
|
||||
from numpy import log as npLog
|
||||
from numpy import NaN as npNaN
|
||||
from numpy import ndarray as npNdArray
|
||||
from numpy import seterr
|
||||
from numpy import sqrt as npSqrt
|
||||
@@ -42,7 +43,7 @@ def combination(**kwargs: dict) -> int:
|
||||
return numerator // denominator
|
||||
|
||||
|
||||
def fibonacci(n: int = 2, **kwargs) -> npNdArray:
|
||||
def fibonacci(n: int = 2, **kwargs: dict) -> npNdArray:
|
||||
"""Fibonacci Sequence as a numpy array"""
|
||||
n = int(fabs(n)) if n >= 0 else 2
|
||||
|
||||
@@ -110,7 +111,7 @@ def log_geometric_mean(series: Series) -> float:
|
||||
return 0
|
||||
|
||||
|
||||
def pascals_triangle(n: int = None, **kwargs) -> npNdArray:
|
||||
def pascals_triangle(n: int = None, **kwargs: dict) -> npNdArray:
|
||||
"""Pascal's Triangle
|
||||
|
||||
Returns a numpy array of the nth row of Pascal's Triangle.
|
||||
@@ -138,7 +139,7 @@ def pascals_triangle(n: int = None, **kwargs) -> npNdArray:
|
||||
return triangle
|
||||
|
||||
|
||||
def symmetric_triangle(n: int = None, **kwargs) -> Optional[List[int]]:
|
||||
def symmetric_triangle(n: int = None, **kwargs: dict) -> Optional[List[int]]:
|
||||
"""Symmetric Triangle with n >= 2
|
||||
|
||||
Returns a numpy array of the nth row of Symmetric Triangle.
|
||||
@@ -184,7 +185,7 @@ def zero(x: Tuple[int, float]) -> Tuple[int, float]:
|
||||
# TESTING
|
||||
|
||||
|
||||
def df_error_analysis(dfA: DataFrame, dfB: DataFrame, **kwargs) -> DataFrame:
|
||||
def df_error_analysis(dfA: DataFrame, dfB: DataFrame, **kwargs: dict) -> DataFrame:
|
||||
"""DataFrame Correlation Analysis helper"""
|
||||
corr_method = kwargs.pop("corr_method", "pearson")
|
||||
|
||||
@@ -207,26 +208,29 @@ def df_error_analysis(dfA: DataFrame, dfB: DataFrame, **kwargs) -> DataFrame:
|
||||
# PRIVATE
|
||||
def _linear_regression_np(x: Series, y: Series) -> dict:
|
||||
"""Simple Linear Regression in Numpy for two 1d arrays for environments without the sklearn package."""
|
||||
m = x.size
|
||||
result = {"a": npNaN, "b": npNaN, "r": npNaN, "t": npNaN, "line": npNaN}
|
||||
x_sum = x.sum()
|
||||
y_sum = y.sum()
|
||||
|
||||
# 1st row, 2nd col value corr(x, y)
|
||||
r = npCorrcoef(x, y)[0, 1]
|
||||
if int(x_sum) != 0:
|
||||
# 1st row, 2nd col value corr(x, y)
|
||||
r = npCorrcoef(x, y)[0, 1]
|
||||
|
||||
r_mix = m * (x * y).sum() - x_sum * y_sum
|
||||
b = r_mix / (m * (x * x).sum() - x_sum * x_sum)
|
||||
a = y.mean() - b * x.mean()
|
||||
line = a + b * x
|
||||
m = x.size
|
||||
r_mix = m * (x * y).sum() - x_sum * y_sum
|
||||
b = r_mix // (m * (x * x).sum() - x_sum * x_sum)
|
||||
a = y.mean() - b * x.mean()
|
||||
line = a + b * x
|
||||
|
||||
_np_err = seterr()
|
||||
seterr(divide="ignore", invalid="ignore")
|
||||
result = {
|
||||
"a": a, "b": b, "r": r,
|
||||
"t": r / npSqrt((1 - r * r) / (m - 2)),
|
||||
"line": line,
|
||||
}
|
||||
seterr(divide=_np_err["divide"], invalid=_np_err["invalid"])
|
||||
|
||||
_np_err = seterr()
|
||||
seterr(divide="ignore", invalid="ignore")
|
||||
result = {
|
||||
"a": a, "b": b, "r": r,
|
||||
"t": r / npSqrt((1 - r * r) / (m - 2)),
|
||||
"line": line,
|
||||
}
|
||||
seterr(divide=_np_err["divide"], invalid=_np_err["invalid"])
|
||||
return result
|
||||
|
||||
def _linear_regression_sklearn(x: Series, y: Series) -> dict:
|
||||
|
||||
@@ -2,6 +2,7 @@
|
||||
from typing import Tuple
|
||||
|
||||
from numpy import log as npLog
|
||||
from numpy import NaN as npNaN
|
||||
from numpy import sqrt as npSqrt
|
||||
from pandas import Series, Timedelta
|
||||
|
||||
@@ -92,7 +93,7 @@ def log_max_drawdown(close: Series) -> float:
|
||||
|
||||
Args:
|
||||
close (pd.Series): Series of 'close's
|
||||
|
||||
|
||||
>>> result = ta.log_max_drawdown(close)
|
||||
"""
|
||||
close = verify_series(close)
|
||||
@@ -102,7 +103,7 @@ def log_max_drawdown(close: Series) -> float:
|
||||
|
||||
def max_drawdown(close: Series, method:str = None, all:bool = False) -> float:
|
||||
"""Maximum Drawdown from close. Default: 'dollar'.
|
||||
|
||||
|
||||
Args:
|
||||
close (pd.Series): Series of 'close's
|
||||
method (str): Max DD calculation options: 'dollar', 'percent', 'log'. Default: 'dollar'
|
||||
@@ -157,7 +158,7 @@ def optimal_leverage(
|
||||
return amount
|
||||
|
||||
|
||||
def pure_profit_score(close: Series) -> float:
|
||||
def pure_profit_score(close: Series) -> Tuple[float, int]:
|
||||
"""Pure Profit Score of a series.
|
||||
|
||||
Args:
|
||||
@@ -169,7 +170,9 @@ def pure_profit_score(close: Series) -> float:
|
||||
close_index = Series(0, index=close.reset_index().index)
|
||||
|
||||
r = linear_regression(close_index, close)["r"]
|
||||
return r * cagr(close)
|
||||
if r is not npNaN:
|
||||
return r * cagr(close)
|
||||
return 0
|
||||
|
||||
|
||||
def sharpe_ratio(close: Series, benchmark_rate: float = 0.0, log: bool = False, use_cagr: bool = False, period: int = RATE["TRADING_DAYS_PER_YEAR"]) -> float:
|
||||
@@ -225,7 +228,7 @@ def volatility(close: Series, tf: str = "years", returns: bool = False, log: boo
|
||||
>>> result = ta.volatility(close, tf="years", returns=False, log=False, **kwargs)
|
||||
"""
|
||||
close = verify_series(close)
|
||||
|
||||
|
||||
if not returns:
|
||||
returns = percent_return(close=close) if not log else log_return(close=close)
|
||||
else:
|
||||
|
||||
@@ -55,8 +55,8 @@ Calculation:
|
||||
length=14, drift=1, percent=False
|
||||
SMA = Simple Moving Average
|
||||
EMA = Exponential Moving Average
|
||||
WMA = Weighted Moving Average
|
||||
WMA = Weighted Moving Average
|
||||
WMA = Weighted Moving Average
|
||||
WMA = Weighted Moving Average
|
||||
RMA = WildeR's Moving Average
|
||||
TR = True Range
|
||||
|
||||
@@ -69,7 +69,7 @@ Calculation:
|
||||
ATR = WMA(tr, length)
|
||||
else:
|
||||
ATR = RMA(tr, length)
|
||||
|
||||
|
||||
if percent:
|
||||
ATR *= 100 / close
|
||||
|
||||
|
||||
@@ -62,7 +62,7 @@ def thermo(high, low, length=None, long=None, short=None, mamode=None, drift=Non
|
||||
thermo_ma.name = f"THERMOma{_props}"
|
||||
thermo_long.name = f"THERMOl{_props}"
|
||||
thermo_short.name = f"THERMOs{_props}"
|
||||
|
||||
|
||||
thermo.category = thermo_ma.category = thermo_long.category = thermo_short.category = "volatility"
|
||||
|
||||
# Prepare Dataframe to return
|
||||
@@ -97,7 +97,7 @@ Calculation:
|
||||
|
||||
thermo_long = thermo < (thermo_ma * long)
|
||||
thermo_short = thermo > (thermo_ma * short)
|
||||
thermo_long = thermo_long.astype(int)
|
||||
thermo_long = thermo_long.astype(int)
|
||||
thermo_short = thermo_short.astype(int)
|
||||
|
||||
Args:
|
||||
|
||||
@@ -23,23 +23,6 @@ def aobv(close, volume, fast=None, slow=None, mamode=None, max_lookback=None, mi
|
||||
|
||||
# Calculate Result
|
||||
obv_ = obv(close=close, volume=volume, **kwargs)
|
||||
# if mamode is None or mamode == "EMA":
|
||||
# mamode = "EMA"
|
||||
# maf = ema(close=obv_, length=fast, **kwargs)
|
||||
# mas = ema(close=obv_, length=slow, **kwargs)
|
||||
# elif mamode == "HMA":
|
||||
# maf = hma(close=obv_, length=fast, **kwargs)
|
||||
# mas = hma(close=obv_, length=slow, **kwargs)
|
||||
# elif mamode == "LINREG":
|
||||
# maf = linreg(close=obv_, length=fast, **kwargs)
|
||||
# mas = linreg(close=obv_, length=slow, **kwargs)
|
||||
# elif mamode == "SMA":
|
||||
# maf = sma(close=obv_, length=fast, **kwargs)
|
||||
# mas = sma(close=obv_, length=slow, **kwargs)
|
||||
# elif mamode == "WMA":
|
||||
# maf = wma(close=obv_, length=fast, **kwargs)
|
||||
# mas = wma(close=obv_, length=slow, **kwargs)
|
||||
|
||||
maf = ma(mamode, obv_, length=fast, **kwargs)
|
||||
mas = ma(mamode, obv_, length=slow, **kwargs)
|
||||
|
||||
|
||||
@@ -17,7 +17,7 @@ setup(
|
||||
"pandas_ta.volatility",
|
||||
"pandas_ta.volume"
|
||||
],
|
||||
version=".".join(("0", "2", "32b")),
|
||||
version=".".join(("0", "2", "33b")),
|
||||
description=long_description,
|
||||
long_description=long_description,
|
||||
author="Kevin Johnson",
|
||||
|
||||
@@ -9,8 +9,8 @@ from .context import pandas_ta
|
||||
from unittest import skip, skipUnless, TestCase
|
||||
from pandas import DataFrame
|
||||
|
||||
|
||||
cores = 4
|
||||
# Strategy Testing Parameters
|
||||
cores = cpu_count()
|
||||
cumulative = False
|
||||
speed_table = False
|
||||
strategy_timed = False
|
||||
|
||||
@@ -61,7 +61,6 @@ class TestUtilityMetrics(TestCase):
|
||||
|
||||
def test_jensens_alpha(self):
|
||||
bench_return = self.pctret.sample(n=self.close.shape[0], random_state=1)
|
||||
|
||||
result = pandas_ta.jensens_alpha(self.close, bench_return)
|
||||
self.assertIsInstance(result, float)
|
||||
self.assertGreaterEqual(result, 0)
|
||||
@@ -98,7 +97,7 @@ class TestUtilityMetrics(TestCase):
|
||||
|
||||
def test_pure_profit_score(self):
|
||||
result = pandas_ta.pure_profit_score(self.close)
|
||||
self.assertIsInstance(result, float)
|
||||
self.assertIsInstance(result, int or float)
|
||||
self.assertGreaterEqual(result, 0)
|
||||
|
||||
def test_sharpe_ratio(self):
|
||||
@@ -119,5 +118,6 @@ class TestUtilityMetrics(TestCase):
|
||||
|
||||
for tf in ["years", "months", "weeks", "days", "hours", "minutes", "seconds"]:
|
||||
result = pandas_ta.utils.volatility(self.close, tf)
|
||||
self.assertIsInstance(result, float)
|
||||
self.assertGreaterEqual(result, 0)
|
||||
with self.subTest(tf=tf):
|
||||
self.assertIsInstance(result, float)
|
||||
self.assertGreaterEqual(result, 0)
|
||||
Reference in New Issue
Block a user