Merge branch 'pr/314' into development

This commit is contained in:
Kevin Johnson
2021-06-23 13:45:46 -07:00
+33 -33
View File
@@ -717,43 +717,43 @@ class AnalysisIndicators(BasePandasObject):
if use_multiprocessing:
_total_ta = len(ta)
pool = Pool(self.cores)
# Some magic to optimize chunksize for speed based on total ta indicators
_chunksize = mp_chunksize - 1 if mp_chunksize > _total_ta else int(npLog10(_total_ta)) + 1
if verbose:
print(f"[i] Multiprocessing {_total_ta} indicators with {_chunksize} chunks and {self.cores}/{cpu_count()} cpus.")
with Pool(self.cores) as pool:
# Some magic to optimize chunksize for speed based on total ta indicators
_chunksize = mp_chunksize - 1 if mp_chunksize > _total_ta else int(npLog10(_total_ta)) + 1
if verbose:
print(f"[i] Multiprocessing {_total_ta} indicators with {_chunksize} chunks and {self.cores}/{cpu_count()} cpus.")
results = None
if mode["custom"]:
# Create a list of all the custom indicators into a list
custom_ta = [(
ind["kind"],
ind["params"] if "params" in ind and isinstance(ind["params"], tuple) else (),
{**ind, **kwargs},
) for ind in ta]
# Custom multiprocessing pool. Must be ordered for Chained Strategies
# May fix this to cpus if Chaining/Composition if it remains
results = pool.imap(self._mp_worker, custom_ta, _chunksize)
else:
default_ta = [(ind, tuple(), kwargs) for ind in ta]
# All and Categorical multiprocessing pool.
if all_ordered:
if Imports["tqdm"]:
results = tqdm(pool.imap(self._mp_worker, default_ta, _chunksize)) # Order over Speed
else:
results = pool.imap(self._mp_worker, default_ta, _chunksize) # Order over Speed
results = None
if mode["custom"]:
# Create a list of all the custom indicators into a list
custom_ta = [(
ind["kind"],
ind["params"] if "params" in ind and isinstance(ind["params"], tuple) else (),
{**ind, **kwargs},
) for ind in ta]
# Custom multiprocessing pool. Must be ordered for Chained Strategies
# May fix this to cpus if Chaining/Composition if it remains
results = pool.imap(self._mp_worker, custom_ta, _chunksize)
else:
if Imports["tqdm"]:
results = tqdm(pool.imap_unordered(self._mp_worker, default_ta, _chunksize)) # Speed over Order
default_ta = [(ind, tuple(), kwargs) for ind in ta]
# All and Categorical multiprocessing pool.
if all_ordered:
if Imports["tqdm"]:
results = tqdm(pool.imap(self._mp_worker, default_ta, _chunksize)) # Order over Speed
else:
results = pool.imap(self._mp_worker, default_ta, _chunksize) # Order over Speed
else:
results = pool.imap_unordered(self._mp_worker, default_ta, _chunksize) # Speed over Order
if results is None:
print(f"[X] ta.strategy('{name}') has no results.")
return
if Imports["tqdm"]:
results = tqdm(pool.imap_unordered(self._mp_worker, default_ta, _chunksize)) # Speed over Order
else:
results = pool.imap_unordered(self._mp_worker, default_ta, _chunksize) # Speed over Order
if results is None:
print(f"[X] ta.strategy('{name}') has no results.")
return
pool.close()
pool.join()
self._last_run = get_time(self.exchange, to_string=True)
pool.close()
pool.join()
self._last_run = get_time(self.exchange, to_string=True)
else:
# Without multiprocessing: