mirror of
https://github.com/wassname/pandas-ta.git
synced 2026-06-27 16:10:07 +08:00
Update core.py
Fix cleanup of multiprocessing resources
This commit is contained in:
+33
-33
@@ -717,43 +717,43 @@ class AnalysisIndicators(BasePandasObject):
|
||||
|
||||
if use_multiprocessing:
|
||||
_total_ta = len(ta)
|
||||
pool = Pool(self.cores)
|
||||
# Some magic to optimize chunksize for speed based on total ta indicators
|
||||
_chunksize = mp_chunksize - 1 if mp_chunksize > _total_ta else int(npLog10(_total_ta)) + 1
|
||||
if verbose:
|
||||
print(f"[i] Multiprocessing {_total_ta} indicators with {_chunksize} chunks and {self.cores}/{cpu_count()} cpus.")
|
||||
with Pool(self.cores) as pool:
|
||||
# Some magic to optimize chunksize for speed based on total ta indicators
|
||||
_chunksize = mp_chunksize - 1 if mp_chunksize > _total_ta else int(npLog10(_total_ta)) + 1
|
||||
if verbose:
|
||||
print(f"[i] Multiprocessing {_total_ta} indicators with {_chunksize} chunks and {self.cores}/{cpu_count()} cpus.")
|
||||
|
||||
results = None
|
||||
if mode["custom"]:
|
||||
# Create a list of all the custom indicators into a list
|
||||
custom_ta = [(
|
||||
ind["kind"],
|
||||
ind["params"] if "params" in ind and isinstance(ind["params"], tuple) else (),
|
||||
{**ind, **kwargs},
|
||||
) for ind in ta]
|
||||
# Custom multiprocessing pool. Must be ordered for Chained Strategies
|
||||
# May fix this to cpus if Chaining/Composition if it remains
|
||||
results = pool.imap(self._mp_worker, custom_ta, _chunksize)
|
||||
else:
|
||||
default_ta = [(ind, tuple(), kwargs) for ind in ta]
|
||||
# All and Categorical multiprocessing pool.
|
||||
if all_ordered:
|
||||
if Imports["tqdm"]:
|
||||
results = tqdm(pool.imap(self._mp_worker, default_ta, _chunksize)) # Order over Speed
|
||||
else:
|
||||
results = pool.imap(self._mp_worker, default_ta, _chunksize) # Order over Speed
|
||||
results = None
|
||||
if mode["custom"]:
|
||||
# Create a list of all the custom indicators into a list
|
||||
custom_ta = [(
|
||||
ind["kind"],
|
||||
ind["params"] if "params" in ind and isinstance(ind["params"], tuple) else (),
|
||||
{**ind, **kwargs},
|
||||
) for ind in ta]
|
||||
# Custom multiprocessing pool. Must be ordered for Chained Strategies
|
||||
# May fix this to cpus if Chaining/Composition if it remains
|
||||
results = pool.imap(self._mp_worker, custom_ta, _chunksize)
|
||||
else:
|
||||
if Imports["tqdm"]:
|
||||
results = tqdm(pool.imap_unordered(self._mp_worker, default_ta, _chunksize)) # Speed over Order
|
||||
default_ta = [(ind, tuple(), kwargs) for ind in ta]
|
||||
# All and Categorical multiprocessing pool.
|
||||
if all_ordered:
|
||||
if Imports["tqdm"]:
|
||||
results = tqdm(pool.imap(self._mp_worker, default_ta, _chunksize)) # Order over Speed
|
||||
else:
|
||||
results = pool.imap(self._mp_worker, default_ta, _chunksize) # Order over Speed
|
||||
else:
|
||||
results = pool.imap_unordered(self._mp_worker, default_ta, _chunksize) # Speed over Order
|
||||
if results is None:
|
||||
print(f"[X] ta.strategy('{name}') has no results.")
|
||||
return
|
||||
if Imports["tqdm"]:
|
||||
results = tqdm(pool.imap_unordered(self._mp_worker, default_ta, _chunksize)) # Speed over Order
|
||||
else:
|
||||
results = pool.imap_unordered(self._mp_worker, default_ta, _chunksize) # Speed over Order
|
||||
if results is None:
|
||||
print(f"[X] ta.strategy('{name}') has no results.")
|
||||
return
|
||||
|
||||
pool.close()
|
||||
pool.join()
|
||||
self._last_run = get_time(self.exchange, to_string=True)
|
||||
pool.close()
|
||||
pool.join()
|
||||
self._last_run = get_time(self.exchange, to_string=True)
|
||||
|
||||
else:
|
||||
# Without multiprocessing:
|
||||
|
||||
Reference in New Issue
Block a user