diff --git a/SimPEG/Parallel.py b/SimPEG/Parallel.py index 73c02408..6f4e32e9 100644 --- a/SimPEG/Parallel.py +++ b/SimPEG/Parallel.py @@ -6,56 +6,10 @@ import networkx DEFAULT_MPI = True MPI_BELLWETHERS = ['PMI_SIZE', 'OMPI_UNIVERSE_SIZE'] -def getChunks(problems, chunks=1): - nproblems = len(problems) - return (problems[i*nproblems // chunks: (i+1)*nproblems // chunks] for i in range(chunks)) - -@interactive -def hasSystem(tag): - global localSystem - return tag in localSystem - -@interactive -def hasSystemRank(tag, wid): - global localSystem - global rank - return (tag in localSystem) and (rank == wid) - -def cdSame(rc): - import os - - dview = rc[:] - - home = os.getenv('HOME') - cwd = os.getcwd() - - @interactive - def cdrel(relpath): - import os - home = os.getenv('HOME') - fullpath = os.path.join(home, relpath) - try: - os.chdir(fullpath) - except OSError: - return False - else: - return True - - if cwd.find(home) == 0: - relpath = cwd[len(home)+1:] - return all(rc[:].apply_sync(cdrel, relpath)) - -def adjustMKLVectorization(nt=1): - try: - import mkl - except ImportError: - pass - finally: - mkl.set_num_threads(nt) - class SystemGraph(networkx.DiGraph): - def _codeStatus(self, data): + @staticmethod + def _codeStatus(data): status = 0 @@ -169,7 +123,7 @@ class SystemSolver(object): with lview.temp_flags(block=False): iworks = 0 - for work in getChunks(isrcslist, int(round(chunksPerWorker*len(relIDs)))): + for work in self._getChunks(isrcslist, int(round(chunksPerWorker*len(relIDs)))): if work: job = lview.apply(fnRef, tag, work) systemJobs.append(job) @@ -189,7 +143,7 @@ class SystemSolver(object): rank = ids[i] with lview.temp_flags(block=False, after=systemJobs): - job = lview.apply(depend(hasSystemRank, tag, rank)(clearRef), tag) + job = lview.apply(depend(self._hasSystemRank, tag, rank)(clearRef), tag) clearJobs.append(job) label = 'Wrap: %d, %d, %d'%(tag[0],tag[1], i) G.add_node(label, jobs=[job]) @@ -228,20 +182,17 @@ class SystemSolver(object): def wait(self, G): self.dispatcher.remote.lview.wait(G.node['End']['jobs'] if G.node['End']['jobs'] else (G.node[wn]['jobs'] for wn in (G.predecessors(tn)[0] for tn in G.predecessors('End')))) -@interactive -def reduceJob(worker, root, key): + @staticmethod + @interactive + def _hasSystemRank(tag, wid): + global localSystem + global rank + return (tag in localSystem) and (rank == wid) - from IPython.parallel.error import UnmetDependency - if not rank == worker: - raise UnmetDependency - - from SimPEG.Utils import CommonReducer - - # exec('global %s'%key) - - code = 'globals()["%(key)s"] = comm.reduce(%(key)s, root=%(root)d)' - exec(code%{'key': key, 'root': root}) - exec('globals()["%(key)s"] = %(key)s if %(key)s is not None else CommonReducer()'%{'key': key}) + @staticmethod + def _getChunks(problems, chunks=1): + nproblems = len(problems) + return (problems[i*nproblems // chunks: (i+1)*nproblems // chunks] for i in range(chunks)) class RemoteInterface(object): @@ -254,7 +205,7 @@ class RemoteInterface(object): pclient = Client(**pupdate) - if not cdSame(pclient): + if not self._cdSame(pclient): print('Could not change all workers to the same directory as the client!') dview = pclient[:] @@ -320,7 +271,7 @@ class RemoteInterface(object): @nThreads.setter def nThreads(self, value): self._nThreads = value - self.dview.apply(adjustMKLVectorization, self._nThreads) + self.dview.apply(self._adjustMKLVectorization, self._nThreads) def __setitem__(self, key, item): @@ -352,7 +303,7 @@ class RemoteInterface(object): if self.useMPI: with self.lview.temp_flags(block=False, after=after): - job = self.lview.map(reduceJob, xrange(len(self.pclient.ids)), repeat(0), repeat(key)) + job = self.lview.map(self._reduceJob, xrange(len(self.pclient.ids)), repeat(0), repeat(key)) return job @@ -476,4 +427,54 @@ class RemoteInterface(object): result = CommonReducer(self.e0['temp_norm%s'%(key,)]) self.e0.execute('del temp_norm%s'%(key,)) - return result \ No newline at end of file + return result + + @staticmethod + @interactive + def _reduceJob(worker, root, key): + + from IPython.parallel.error import UnmetDependency + if not rank == worker: + raise UnmetDependency + + from SimPEG.Utils import CommonReducer + + # exec('global %s'%key) + + code = 'globals()["%(key)s"] = comm.reduce(%(key)s, root=%(root)d)' + exec(code%{'key': key, 'root': root}) + exec('globals()["%(key)s"] = %(key)s if %(key)s is not None else CommonReducer()'%{'key': key}) + + @staticmethod + def _adjustMKLVectorization(nt=1): + try: + import mkl + except ImportError: + pass + finally: + mkl.set_num_threads(nt) + + @staticmethod + def _cdSame(rc): + import os + + dview = rc[:] + + home = os.getenv('HOME') + cwd = os.getcwd() + + @interactive + def cdrel(relpath): + import os + home = os.getenv('HOME') + fullpath = os.path.join(home, relpath) + try: + os.chdir(fullpath) + except OSError: + return False + else: + return True + + if cwd.find(home) == 0: + relpath = cwd[len(home)+1:] + return all(rc[:].apply_sync(cdrel, relpath))