mirror of
https://github.com/wassname/simpeg.git
synced 2026-07-04 01:24:50 +08:00
Move module-level functions to static methods for their respective classes.
This commit is contained in:
+68
-67
@@ -6,56 +6,10 @@ import networkx
|
||||
DEFAULT_MPI = True
|
||||
MPI_BELLWETHERS = ['PMI_SIZE', 'OMPI_UNIVERSE_SIZE']
|
||||
|
||||
def getChunks(problems, chunks=1):
|
||||
nproblems = len(problems)
|
||||
return (problems[i*nproblems // chunks: (i+1)*nproblems // chunks] for i in range(chunks))
|
||||
|
||||
@interactive
|
||||
def hasSystem(tag):
|
||||
global localSystem
|
||||
return tag in localSystem
|
||||
|
||||
@interactive
|
||||
def hasSystemRank(tag, wid):
|
||||
global localSystem
|
||||
global rank
|
||||
return (tag in localSystem) and (rank == wid)
|
||||
|
||||
def cdSame(rc):
|
||||
import os
|
||||
|
||||
dview = rc[:]
|
||||
|
||||
home = os.getenv('HOME')
|
||||
cwd = os.getcwd()
|
||||
|
||||
@interactive
|
||||
def cdrel(relpath):
|
||||
import os
|
||||
home = os.getenv('HOME')
|
||||
fullpath = os.path.join(home, relpath)
|
||||
try:
|
||||
os.chdir(fullpath)
|
||||
except OSError:
|
||||
return False
|
||||
else:
|
||||
return True
|
||||
|
||||
if cwd.find(home) == 0:
|
||||
relpath = cwd[len(home)+1:]
|
||||
return all(rc[:].apply_sync(cdrel, relpath))
|
||||
|
||||
def adjustMKLVectorization(nt=1):
|
||||
try:
|
||||
import mkl
|
||||
except ImportError:
|
||||
pass
|
||||
finally:
|
||||
mkl.set_num_threads(nt)
|
||||
|
||||
class SystemGraph(networkx.DiGraph):
|
||||
|
||||
def _codeStatus(self, data):
|
||||
@staticmethod
|
||||
def _codeStatus(data):
|
||||
|
||||
status = 0
|
||||
|
||||
@@ -169,7 +123,7 @@ class SystemSolver(object):
|
||||
|
||||
with lview.temp_flags(block=False):
|
||||
iworks = 0
|
||||
for work in getChunks(isrcslist, int(round(chunksPerWorker*len(relIDs)))):
|
||||
for work in self._getChunks(isrcslist, int(round(chunksPerWorker*len(relIDs)))):
|
||||
if work:
|
||||
job = lview.apply(fnRef, tag, work)
|
||||
systemJobs.append(job)
|
||||
@@ -189,7 +143,7 @@ class SystemSolver(object):
|
||||
rank = ids[i]
|
||||
|
||||
with lview.temp_flags(block=False, after=systemJobs):
|
||||
job = lview.apply(depend(hasSystemRank, tag, rank)(clearRef), tag)
|
||||
job = lview.apply(depend(self._hasSystemRank, tag, rank)(clearRef), tag)
|
||||
clearJobs.append(job)
|
||||
label = 'Wrap: %d, %d, %d'%(tag[0],tag[1], i)
|
||||
G.add_node(label, jobs=[job])
|
||||
@@ -228,20 +182,17 @@ class SystemSolver(object):
|
||||
def wait(self, G):
|
||||
self.dispatcher.remote.lview.wait(G.node['End']['jobs'] if G.node['End']['jobs'] else (G.node[wn]['jobs'] for wn in (G.predecessors(tn)[0] for tn in G.predecessors('End'))))
|
||||
|
||||
@interactive
|
||||
def reduceJob(worker, root, key):
|
||||
@staticmethod
|
||||
@interactive
|
||||
def _hasSystemRank(tag, wid):
|
||||
global localSystem
|
||||
global rank
|
||||
return (tag in localSystem) and (rank == wid)
|
||||
|
||||
from IPython.parallel.error import UnmetDependency
|
||||
if not rank == worker:
|
||||
raise UnmetDependency
|
||||
|
||||
from SimPEG.Utils import CommonReducer
|
||||
|
||||
# exec('global %s'%key)
|
||||
|
||||
code = 'globals()["%(key)s"] = comm.reduce(%(key)s, root=%(root)d)'
|
||||
exec(code%{'key': key, 'root': root})
|
||||
exec('globals()["%(key)s"] = %(key)s if %(key)s is not None else CommonReducer()'%{'key': key})
|
||||
@staticmethod
|
||||
def _getChunks(problems, chunks=1):
|
||||
nproblems = len(problems)
|
||||
return (problems[i*nproblems // chunks: (i+1)*nproblems // chunks] for i in range(chunks))
|
||||
|
||||
class RemoteInterface(object):
|
||||
|
||||
@@ -254,7 +205,7 @@ class RemoteInterface(object):
|
||||
|
||||
pclient = Client(**pupdate)
|
||||
|
||||
if not cdSame(pclient):
|
||||
if not self._cdSame(pclient):
|
||||
print('Could not change all workers to the same directory as the client!')
|
||||
|
||||
dview = pclient[:]
|
||||
@@ -320,7 +271,7 @@ class RemoteInterface(object):
|
||||
@nThreads.setter
|
||||
def nThreads(self, value):
|
||||
self._nThreads = value
|
||||
self.dview.apply(adjustMKLVectorization, self._nThreads)
|
||||
self.dview.apply(self._adjustMKLVectorization, self._nThreads)
|
||||
|
||||
|
||||
def __setitem__(self, key, item):
|
||||
@@ -352,7 +303,7 @@ class RemoteInterface(object):
|
||||
|
||||
if self.useMPI:
|
||||
with self.lview.temp_flags(block=False, after=after):
|
||||
job = self.lview.map(reduceJob, xrange(len(self.pclient.ids)), repeat(0), repeat(key))
|
||||
job = self.lview.map(self._reduceJob, xrange(len(self.pclient.ids)), repeat(0), repeat(key))
|
||||
|
||||
return job
|
||||
|
||||
@@ -476,4 +427,54 @@ class RemoteInterface(object):
|
||||
result = CommonReducer(self.e0['temp_norm%s'%(key,)])
|
||||
self.e0.execute('del temp_norm%s'%(key,))
|
||||
|
||||
return result
|
||||
return result
|
||||
|
||||
@staticmethod
|
||||
@interactive
|
||||
def _reduceJob(worker, root, key):
|
||||
|
||||
from IPython.parallel.error import UnmetDependency
|
||||
if not rank == worker:
|
||||
raise UnmetDependency
|
||||
|
||||
from SimPEG.Utils import CommonReducer
|
||||
|
||||
# exec('global %s'%key)
|
||||
|
||||
code = 'globals()["%(key)s"] = comm.reduce(%(key)s, root=%(root)d)'
|
||||
exec(code%{'key': key, 'root': root})
|
||||
exec('globals()["%(key)s"] = %(key)s if %(key)s is not None else CommonReducer()'%{'key': key})
|
||||
|
||||
@staticmethod
|
||||
def _adjustMKLVectorization(nt=1):
|
||||
try:
|
||||
import mkl
|
||||
except ImportError:
|
||||
pass
|
||||
finally:
|
||||
mkl.set_num_threads(nt)
|
||||
|
||||
@staticmethod
|
||||
def _cdSame(rc):
|
||||
import os
|
||||
|
||||
dview = rc[:]
|
||||
|
||||
home = os.getenv('HOME')
|
||||
cwd = os.getcwd()
|
||||
|
||||
@interactive
|
||||
def cdrel(relpath):
|
||||
import os
|
||||
home = os.getenv('HOME')
|
||||
fullpath = os.path.join(home, relpath)
|
||||
try:
|
||||
os.chdir(fullpath)
|
||||
except OSError:
|
||||
return False
|
||||
else:
|
||||
return True
|
||||
|
||||
if cwd.find(home) == 0:
|
||||
relpath = cwd[len(home)+1:]
|
||||
return all(rc[:].apply_sync(cdrel, relpath))
|
||||
|
||||
Reference in New Issue
Block a user