Files
simpeg/SimPEG/Parallel.py
T
2015-05-11 10:28:16 -04:00

545 lines
18 KiB
Python

from IPython.parallel import Client, parallel, Reference, require, depend, interactive
from SimPEG.Utils import CommonReducer
import numpy as np
import networkx
DEFAULT_MPI = True
MPI_BELLWETHERS = ['PMI_SIZE', 'OMPI_UNIVERSE_SIZE']
class SuperReference(object):
'''
Object that can be called to return a reference, but
will only be schedulable on the correct worker(s) if
its 'lrank' parameter has been set.
'''
def __init__(self, ref, lrank=None):
if (lrank is None) or (type(lrank) is list):
self.rank = lrank
else:
self.rank = [lrank]
self.ref = ref
def __call__(self, *args, **kwargs):
from IPython.parallel import depend
from IPython.parallel.error import UnmetDependency
if (self.rank is not None) and (globals().get('rank', None) not in self.rank):
raise UnmetDependency('Global \'rank\' does not satisfy requirements')
return self.ref(*args, **kwargs)
class Endpoint(object):
'''
Object that holds the namespace of the SimPEG parallel
footprint on the remote workers.
'''
systemFactory = lambda: None # Callable for constructing system / problem
localFields = {} # Dictionary for storing local fields
globalFields = {} # Dictionary for storing merged fields
localSystems = {} # Dictionary of local subsystem / problem objects
functions = {} # Dictionary of callables to carry out modelling / etc.
fieldspec = {} # Dictionary of callables to setup field storage objects
baseSystemConfig = {} # Base configuration for system
def setupLocalFields(self, whichfields=None):
if whichfields is None:
self.localFields = {}
if getattr(self, 'fieldspec', None) is not None:
for fn in (whichfields or self.fieldspec):
self.localFields[fn] = self.fieldspec[fn]()
def setupLocalSystem(self, subConfig):
systemConfig = self.baseSystemConfig.copy()
systemConfig.update(subConfig)
self.localSystems[subConfig['tag']] = self.systemFactory(systemConfig)
class SystemGraph(networkx.DiGraph):
'''
NetworkX Directed Graph subclass that knows about
job status information, and can return a representation
of itself for use in interactive debugging/testing.
'''
@staticmethod
def _codeStatus(data):
status = 0
if 'jobs' in data:
status = 1 * data['jobs'][-1].ready() + 1
if status > 1:
status += 1 * (not data['jobs'][-1].successful())
return status
def _codeGraph(self):
from networkx.readwrite import json_graph
G = networkx.DiGraph()
for e in self.edges_iter():
G.add_edge(e[0], e[1])
for n, data in self.nodes_iter(data=True):
G.add_node(n, status=self._codeStatus(data))
return json_graph.node_link_data(G)
def RenderHTML(self):
import pkg_resources
from IPython.core import display
import time
data = str(self._codeGraph())
uniqueID = hash(time.time())
formatstr = {
'uniqueID': 'Graph%s'%uniqueID,
'JSONData': data,
}
code = pkg_resources.resource_string('SimPEG', 'Resources/Parallel/SystemGraph.html')%formatstr
return display.HTML(data=code)._repr_html_()
try:
get_ipython().display_formatter.formatters['text/html'].for_type(SystemGraph, SystemGraph.RenderHTML)
except NameError:
pass
class SystemSolver(object):
def __init__(self, dispatcher, endpointName, schedule):
self.dispatcher = dispatcher
self.endpointName = endpointName
self.schedule = schedule
def __call__(self, entry, isrcs):
# TODO: Replace with SuperReference instances
fnformat = '%s.functions["%s"]'
fnRef = Reference(fnformat%(self.endpointName, self.schedule[entry]['solve']))
clearRef = Reference(fnformat%(self.endpointName, self.schedule[entry]['clear']))
reduceLabels = self.schedule[entry]['reduce']
dview = self.dispatcher.remote.dview
lview = self.dispatcher.remote.lview
chunksPerWorker = getattr(self.dispatcher, 'chunksPerWorker', 1)
G = SystemGraph()
mainNode = 'Beginning'
G.add_node(mainNode)
# Parse sources
# TODO: Get from Survey somehow?
nsrc = self.dispatcher.nsrc
if isrcs is None:
isrcslist = range(nsrc)
elif isinstance(isrcs, slice):
isrcslist = range(isrcs.start or 0, isrcs.stop or nsrc, isrcs.step or 1)
else:
try:
_ = isrcs[0]
isrcslist = isrcs
except TypeError:
isrcslist = [isrcs]
# TODO: Replace w/ hook into Endpoint classes
systemsOnWorkers = dview['%s.localSystems.keys()'%self.endpointName]
ids = dview['rank']
tags = set()
for ltags in systemsOnWorkers:
tags = tags.union(set(ltags))
clearJobs = []
endNodes = {}
tailNodes = []
for tag in tags:
tagNode = 'Head: %d, %d'%tag
G.add_edge(mainNode, tagNode)
relIDs = []
for i in xrange(len(ids)):
systems = systemsOnWorkers[i]
rank = ids[i]
if tag in systems:
relIDs.append(i)
systemJobs = []
endNodes[tag] = []
systemNodes = []
with lview.temp_flags(block=False):
iworks = 0
for work in self._getChunks(isrcslist, int(round(chunksPerWorker*len(relIDs)))):
if work:
job = lview.apply(fnRef, Reference(self.endpointName), tag, work)
systemJobs.append(job)
label = 'Compute: %d, %d, %d'%(tag[0], tag[1], iworks)
systemNodes.append(label)
G.add_node(label, jobs=[job])
G.add_edge(tagNode, label)
iworks += 1
if getattr(self.dispatcher, 'ensembleClear', False): # True for ensemble ending, False for individual ending
tagNode = 'Wrap: %d, %d'%tag
for label in systemNodes:
G.add_edge(label, tagNode)
for i in relIDs:
rank = ids[i]
with lview.temp_flags(block=False, after=systemJobs):
# TODO: Remove dependency on self._hasSystemRank, once the SuperReferences
# are able to be used. They will automatically schedule only on the
# correct (allowed) systems.
job = lview.apply(depend(self._hasSystemRank, tag, rank)(clearRef), Reference(self.endpointName), tag)
clearJobs.append(job)
label = 'Wrap: %d, %d, %d'%(tag[0],tag[1], i)
G.add_node(label, jobs=[job])
endNodes[tag].append(label)
G.add_edge(tagNode, label)
else:
for i, sjob in enumerate(systemJobs):
with lview.temp_flags(block=False, follow=sjob):
job = lview.apply(clearRef, Reference(self.endpointName), tag)
clearJobs.append(job)
label = 'Wrap: %d, %d, %d'%(tag[0],tag[1],i)
G.add_node(label, jobs=[job])
endNodes[tag].append(label)
G.add_edge(systemNodes[i], label)
tagNode = 'Tail: %d, %d'%tag
for label in endNodes[tag]:
G.add_edge(label, tagNode)
tailNodes.append(tagNode)
endNode = 'End'
jobs = []
after = clearJobs
for label in reduceLabels:
job = self.dispatcher.remote.reduceLB(Reference(self.endpointName), label, after=after)
after = job
if job is not None:
jobs.append(job)
G.add_node(endNode, jobs=jobs)
for node in tailNodes:
G.add_edge(node, endNode)
return G
def wait(self, G):
self.dispatcher.remote.lview.wait(G.node['End']['jobs'] if G.node['End']['jobs'] else (G.node[wn]['jobs'] for wn in (G.predecessors(tn)[0] for tn in G.predecessors('End'))))
# TODO: Hopefully obsoleted by SuperReference
@staticmethod
@interactive
def _hasSystemRank(tag, wid):
global localSystem
global rank
return (tag in localSystem) and (rank == wid)
@staticmethod
def _getChunks(problems, chunks=1):
nproblems = len(problems)
return (problems[i*nproblems // chunks: (i+1)*nproblems // chunks] for i in range(chunks))
class RemoteInterface(object):
def __init__(self, profile=None, MPI=None, nThreads=1, bootstrap=None):
# TODO: Add interface for namespace bootstrapping from
# the dispatcher / problem side
if profile is not None:
pupdate = {'profile': profile}
else:
pupdate = {}
pclient = Client(**pupdate)
if not self._cdSame(pclient):
print('Could not change all workers to the same directory as the client!')
dview = pclient[:]
dview.block = True
dview.clear()
remoteSetup = '''
import os'''
parMPISetup = '''
from mpi4py import MPI
comm = MPI.COMM_WORLD
rank = comm.Get_rank()'''
for command in remoteSetup.strip().split('\n'):
dview.execute(command.strip())
dview.scatter('rank', pclient.ids, flatten=True)
self.e0 = pclient[0]
self.e0.block = True
self.useMPI = False
MPI = DEFAULT_MPI if MPI is None else MPI
if MPI:
MPISafe = False
for var in MPI_BELLWETHERS:
MPISafe = MPISafe or all(dview['os.getenv("%s")'%(var,)])
if MPISafe:
for command in parMPISetup.strip().split('\n'):
dview.execute(command.strip())
ranks = dview['rank']
reorder = [ranks.index(i) for i in xrange(len(ranks))]
dview = pclient[reorder]
dview.block = True
dview.activate()
# Set up necessary parts for broadcast-based communication
self.e0 = pclient[reorder[0]]
self.e0.block = True
self.comm = Reference('comm')
self.useMPI = MPISafe
self.pclient = pclient
self.dview = dview
self.lview = pclient.load_balanced_view()
self.nThreads = nThreads
if bootstrap is not None:
for command in bootstrap.strip().split('\n'):
dview.execute(command.strip())
@property
def nThreads(self):
return self._nThreads
@nThreads.setter
def nThreads(self, value):
self._nThreads = value
self.dview.apply(self._adjustMKLVectorization, self._nThreads)
def __setitem__(self, key, item):
if self.useMPI:
self.e0[key] = item
code = 'if rank != 0: %(key)s = None\n%(key)s = comm.bcast(%(key)s, root=0)'
self.dview.execute(code%{'key': key})
else:
self.dview[key] = item
def __getitem__(self, key):
if self.useMPI:
code = 'temp_%(key)s = None\ntemp_%(key)s = comm.gather(%(key)s, root=%(root)d)'
self.dview.execute(code%{'key': key, 'root': 0})
item = self.e0['temp_%s'%(key,)]
self.e0.execute('del temp_%s'%(key,))
else:
item = self.dview[key]
return item
def reduceLB(self, endpoint, key, after=None):
repeat = lambda value: (value for i in xrange(len(self.pclient.ids)))
if self.useMPI:
with self.lview.temp_flags(block=False, after=after):
job = self.lview.map(self._reduceJob, xrange(len(self.pclient.ids)), repeat(0), repeat(endpoint), repeat(key))
return job
def reduce(self, key, axis=None):
if self.useMPI:
code = 'temp_%(key)s = comm.reduce(%(key)s, root=%(root)d)'
self.dview.execute(code%{'key': key, 'root': 0})
# if axis is not None:
# code = 'temp_%(key)s = temp_%(key)s.sum(axis=%(axis)d)'
# self.e0.execute(code%{'key': key, 'axis': axis})
item = self.e0['temp_%s'%(key,)]
self.dview.execute('del temp_%s'%(key,))
else:
item = reduce(np.add, self.dview[key])
return item
def reduceMul(self, key1, key2, axis=None):
if self.useMPI:
# Gather
code_reduce = 'temp_%(key)s = comm.reduce(%(key)s, root=%(root)d)'
self.dview.execute(code_reduce%{'key': key1, 'root': 0})
self.dview.execute(code_reduce%{'key': key2, 'root': 0})
# Multiply
code_mul = 'temp_%(key1)s%(key2)s = temp_%(key1)s * temp_%(key2)s'
self.e0.execute(code_mul%{'key1': key1, 'key2': key2})
# Potentially sum
if axis is not None:
code = 'temp_%(key1)s%(key2)s = temp_%(key1)s%(key2)s.sum(axis=%(axis)d)'
self.e0.execute(code%{'key1': key1, 'key2': key2, 'axis': axis})
# Pull
item = self.e0['temp_%(key1)s%(key2)s'%{'key1': key1, 'key2': key2}]
# Clear
self.dview.execute('del temp_%s'%(key1,))
self.dview.execute('del temp_%s'%(key2,))
self.e0.execute('del temp_%(key1)s%(key2)s'%{'key1': key1, 'key2': key2})
else:
item1 = reduce(np.add, self.dview[key1])
item2 = reduce(np.add, self.dview[key2])
item = item1 * item2
return item
def remoteDifference(self, key1, key2, keyresult):
if self.useMPI:
root = 0
# Gather
code_reduce = 'temp_%(key)s = comm.reduce(%(key)s, root=%(root)d)'
self.dview.execute(code_reduce%{'key': key1, 'root': root})
self.dview.execute(code_reduce%{'key': key2, 'root': root})
# Difference
code_difference = '%(keyresult)s = temp_%(key1)s - temp_%(key2)s'
self.e0.execute(code_difference%{'key1': key1, 'key2': key2, 'keyresult': keyresult})
# Broadcast
code = 'if rank != 0: %(key)s = None\n%(key)s = comm.bcast(%(key)s, root=%(root)d)'
self.dview.execute(code%{'key': keyresult, 'root': root})
# Clear
self.e0.execute('del temp_%s'%(key1,))
self.e0.execute('del temp_%s'%(key2,))
else:
item1 = reduce(np.add, self.dview[key1])
item2 = reduce(np.add, self.dview[key2])
item = item1 - item2
self.dview[keyresult] = item
def remoteOpGatherFirst(self, op, key1, key2, keyresult):
if self.useMPI:
root = 0
# Gather
code_reduce = 'temp_%(key)s = comm.reduce(%(key)s, root=%(root)d)'
self.dview.execute(code_reduce%{'key': key1, 'root': root})
# Difference
code_difference = '%(keyresult)s = temp_%(key1)s %(op)s %(key2)s'
self.e0.execute(code_difference%{'op': op, 'key1': key1, 'key2': key2, 'keyresult': keyresult})
# Broadcast
code = 'if rank != 0: %(key)s = None\n%(key)s = comm.bcast(%(key)s, root=%(root)d)'
self.dview.execute(code%{'key': keyresult, 'root': root})
# Clear
self.e0.execute('del temp_%s'%(key1,))
else:
item1 = reduce(np.add, self.dview[key1])
item2 = self.e0[key2] # Assumes that any arbitrary worker has this information
item = eval('item1 %s item2'%(op,))
self.dview[keyresult] = item
def remoteDifferenceGatherFirst(self, *args):
self.remoteOpGatherFirst('-', *args)
def normFromDifference(self, key):
code = 'temp_norm%(key)s = (%(key)s * %(key)s.conj()).sum(0).sum(0)'
self.e0.execute(code%{'key': key})
code = 'temp_norm%(key)s = {key: np.sqrt(temp_norm%(key)s[key]).real for key in temp_norm%(key)s.keys()}'
self.e0.execute(code%{'key': key})
result = CommonReducer(self.e0['temp_norm%s'%(key,)])
self.e0.execute('del temp_norm%s'%(key,))
return result
@staticmethod
@interactive
def _reduceJob(worker, root, endpoint, key):
from IPython.parallel.error import UnmetDependency
if not rank == worker:
raise UnmetDependency
code = 'endpoint.globalFields["%(key)s"] = comm.reduce(endpoint.localFields["%(key)s"], root=%(root)d)'
exec(code%{'key': key, 'root': root})
@staticmethod
def _adjustMKLVectorization(nt=1):
try:
import mkl
except ImportError:
pass
finally:
mkl.set_num_threads(nt)
@staticmethod
def _cdSame(rc):
import os
dview = rc[:]
home = os.getenv('HOME')
cwd = os.getcwd()
@interactive
def cdrel(relpath):
import os
home = os.getenv('HOME')
fullpath = os.path.join(home, relpath)
try:
os.chdir(fullpath)
except OSError:
return False
else:
return True
if cwd.find(home) == 0:
relpath = cwd[len(home)+1:]
return all(rc[:].apply_sync(cdrel, relpath))