mirror of
https://github.com/wassname/simpeg.git
synced 2026-06-28 20:24:27 +08:00
639 lines
22 KiB
Python
639 lines
22 KiB
Python
from ipyparallel import Client, parallel, Reference, require, depend, interactive
|
|
from SimPEG.Utils import CommonReducer
|
|
import numpy as np
|
|
import networkx
|
|
|
|
DEFAULT_MPI = True
|
|
MPI_BELLWETHERS = ['PMI_SIZE', 'OMPI_UNIVERSE_SIZE']
|
|
|
|
class SuperReference(object):
|
|
'''
|
|
Object that can be called to return a reference, but
|
|
will only be schedulable on the correct worker(s) if
|
|
its 'lrank' parameter has been set.
|
|
'''
|
|
|
|
def __init__(self, ref, lrank=None):
|
|
|
|
if (lrank is None) or (type(lrank) is list):
|
|
self.rank = lrank
|
|
else:
|
|
self.rank = [lrank]
|
|
|
|
self.ref = ref
|
|
|
|
def __call__(self, *args, **kwargs):
|
|
|
|
from ipyparallel import depend
|
|
from ipyparallel.error import UnmetDependency
|
|
|
|
if (self.rank is not None) and (globals().get('rank', None) not in self.rank):
|
|
raise UnmetDependency('Global \'rank\' does not satisfy requirements')
|
|
|
|
return self.ref(*args, **kwargs)
|
|
|
|
class Endpoint(object):
|
|
'''
|
|
Object that holds the namespace of the SimPEG parallel
|
|
footprint on the remote workers.
|
|
'''
|
|
|
|
problemFactory = lambda: None # Callable for constructing system / problem
|
|
surveyFactory = lambda: None # Callable for constructing survey
|
|
localFields = {} # Dictionary for storing local fields
|
|
globalFields = {} # Dictionary for storing merged fields
|
|
localProblems = {} # Dictionary of local subsystem / problem objects
|
|
localSurveys = {} # Dictionary of local survey objects
|
|
functions = {} # Dictionary of callables to carry out modelling / etc.
|
|
fieldspec = None # Dictionary of callables to setup field storage objects
|
|
baseSystemConfig = {} # Base configuration for system
|
|
|
|
def setupLocalFields(self, whichfields=None):
|
|
|
|
# If no names are specified, clear all fields first
|
|
if whichfields is None:
|
|
self.localFields = {}
|
|
|
|
# If we have a 'fieldspec' object...
|
|
if getattr(self, 'fieldspec', None) is not None:
|
|
# ...either loop over the specified names, or all the fields...
|
|
for fn in (whichfields or self.fieldspec):
|
|
# ...and construct a new empty object per the 'fieldspec' constructor.
|
|
self.localFields[fn] = self.fieldspec[fn]()
|
|
|
|
def setupLocalSurveys(self, subConfigs):
|
|
|
|
# Loop over possible survey configurations (may differ in source terms, etc.)
|
|
for isub in subConfigs:
|
|
# For each 'isub' create a separate copy of the base configuration...
|
|
geom = self.baseSystemConfig['geom'].copy()
|
|
# ...and update with any differences...
|
|
geom.update(subConfigs[isub])
|
|
# ...then construct the Survey object and store it for later pairing.
|
|
self.localSurveys[isub] = self.surveyFactory(geom)
|
|
|
|
def setupLocalProblem(self, subConfig):
|
|
|
|
# Make a copy w/o the geometry information (which is used by Survey)
|
|
systemConfig = {key: self.baseSystemConfig[key] for key in self.baseSystemConfig if key not in ['geom']}
|
|
|
|
# Update with the configuration for this subproblem
|
|
systemConfig.update(subConfig)
|
|
|
|
# Create the local subproblem...
|
|
problem = self.problemFactory(systemConfig)
|
|
# ...and pair it with a corresponding survey for this 'isub' (e.g., frequency)...
|
|
problem.pair(self.localSurveys[subConfig['isub']])
|
|
# ...then store in the Endpoint for later access by the scheduler.
|
|
self.localProblems[subConfig['tag']] = problem
|
|
|
|
|
|
class SystemGraph(networkx.DiGraph):
|
|
'''
|
|
NetworkX Directed Graph subclass that knows about
|
|
job status information, and can return a representation
|
|
of itself for use in interactive debugging/testing.
|
|
'''
|
|
|
|
@staticmethod
|
|
def _codeStatus(data):
|
|
|
|
status = 0
|
|
|
|
if 'jobs' in data:
|
|
status = 1 * data['jobs'][-1].ready() + 1
|
|
if status > 1:
|
|
status += 1 * (not data['jobs'][-1].successful())
|
|
|
|
return status
|
|
|
|
def _codeGraph(self):
|
|
from networkx.readwrite import json_graph
|
|
|
|
G = networkx.DiGraph()
|
|
|
|
for e in self.edges_iter():
|
|
G.add_edge(e[0], e[1])
|
|
|
|
for n, data in self.nodes_iter(data=True):
|
|
G.add_node(n, status=self._codeStatus(data))
|
|
|
|
return json_graph.node_link_data(G)
|
|
|
|
def RenderHTML(self):
|
|
import pkg_resources
|
|
from IPython.core import display
|
|
import time
|
|
|
|
data = str(self._codeGraph())
|
|
uniqueID = hash(time.time())
|
|
|
|
formatstr = {
|
|
'uniqueID': 'Graph%s'%uniqueID,
|
|
'JSONData': data,
|
|
}
|
|
|
|
code = pkg_resources.resource_string('SimPEG', 'Resources/Parallel/SystemGraph.html')%formatstr
|
|
|
|
return display.HTML(data=code)._repr_html_()
|
|
|
|
try:
|
|
get_ipython().display_formatter.formatters['text/html'].for_type(SystemGraph, SystemGraph.RenderHTML)
|
|
except NameError:
|
|
pass
|
|
|
|
class SystemSolver(object):
|
|
|
|
def __init__(self, problem, schedule):
|
|
|
|
self.problem = problem
|
|
self.remote = problem.remote
|
|
self.schedule = schedule
|
|
|
|
def __call__(self, entry, isrcs):
|
|
|
|
# TODO: Replace with SuperReference instances
|
|
fnformat = '%s.functions["%s"]'
|
|
fnRef = Reference(fnformat%(self.remote.endpointName, self.schedule[entry]['solve']))
|
|
clearRef = Reference(fnformat%(self.remote.endpointName, self.schedule[entry]['clear']))
|
|
reduceLabels = self.schedule[entry]['reduce']
|
|
|
|
dview = self.problem.remote.dview
|
|
lview = self.problem.remote.lview
|
|
|
|
chunksPerWorker = getattr(self.problem, 'chunksPerWorker', 1)
|
|
|
|
G = SystemGraph()
|
|
|
|
mainNode = 'Beginning'
|
|
G.add_node(mainNode)
|
|
|
|
# Parse sources
|
|
# TODO: Get from Survey somehow?
|
|
nsrc = self.problem.nsrc
|
|
if isrcs is None:
|
|
isrcs = slice(None)
|
|
|
|
elif not isinstance(isrcs, slice):
|
|
raise Exception('Scheduler must run over slice or None!')
|
|
|
|
# TODO: Replace w/ hook into Endpoint classes
|
|
systemsOnWorkers = dview['%s.localProblems.keys()'%self.remote.endpointName]
|
|
ids = dview['rank']
|
|
tags = set()
|
|
for ltags in systemsOnWorkers:
|
|
tags = tags.union(set(ltags))
|
|
|
|
clearJobs = []
|
|
endNodes = {}
|
|
tailNodes = []
|
|
|
|
for tag in tags:
|
|
|
|
tagNode = 'Head: %d, %d'%tag
|
|
G.add_edge(mainNode, tagNode)
|
|
|
|
relIDs = []
|
|
for i in xrange(len(ids)):
|
|
|
|
systems = systemsOnWorkers[i]
|
|
rank = ids[i]
|
|
|
|
if tag in systems:
|
|
relIDs.append(rank)
|
|
|
|
systemJobs = []
|
|
endNodes[tag] = []
|
|
systemNodes = []
|
|
|
|
with lview.temp_flags(block=False):
|
|
iworks = 0
|
|
for work in self._subSlice(isrcs, int(round(chunksPerWorker*len(relIDs)))):
|
|
if work:
|
|
job = lview.apply(fnRef, Reference(self.remote.endpointName), tag, work)
|
|
systemJobs.append(job)
|
|
label = 'Compute: %d, %d, %d'%(tag[0], tag[1], iworks)
|
|
systemNodes.append(label)
|
|
G.add_node(label, jobs=[job], subslice=work, tag=tag)
|
|
G.add_edge(tagNode, label)
|
|
iworks += 1
|
|
|
|
if getattr(self.problem, 'ensembleClear', False): # True for ensemble ending, False for individual ending
|
|
tagNode = 'Wrap: %d, %d'%tag
|
|
for label in systemNodes:
|
|
G.add_edge(label, tagNode)
|
|
|
|
for rank in relIDs:
|
|
|
|
with lview.temp_flags(block=False, after=systemJobs):
|
|
# TODO: Remove dependency on self._hasSystemRank, once the SuperReferences
|
|
# are able to be used. They will automatically schedule only on the
|
|
# correct (allowed) systems.
|
|
job = lview.apply(clearRef, Reference(self.remote.endpointName), tag, rank)
|
|
clearJobs.append(job)
|
|
label = 'Wrap: %d, %d, %d'%(tag[0],tag[1], rank)
|
|
G.add_node(label, jobs=[job], tag=tag, rank=rank)
|
|
endNodes[tag].append(label)
|
|
G.add_edge(tagNode, label)
|
|
else:
|
|
|
|
for i, sjob in enumerate(systemJobs):
|
|
with lview.temp_flags(block=False, follow=sjob, after=sjob):
|
|
job = lview.apply(clearRef, Reference(self.remote.endpointName), tag)
|
|
clearJobs.append(job)
|
|
label = 'Wrap: %d, %d, %d'%(tag[0],tag[1],i)
|
|
G.add_node(label, jobs=[job])
|
|
endNodes[tag].append(label)
|
|
G.add_edge(systemNodes[i], label)
|
|
|
|
tagNode = 'Tail: %d, %d'%tag
|
|
for label in endNodes[tag]:
|
|
G.add_edge(label, tagNode)
|
|
tailNodes.append(tagNode)
|
|
|
|
endNode = 'End'
|
|
jobs = []
|
|
after = clearJobs
|
|
for label in reduceLabels:
|
|
job = self.problem.remote.reduceLB(Reference(self.remote.endpointName), label, after)
|
|
after = job
|
|
if job is not None:
|
|
jobs.append(job)
|
|
G.add_node(endNode, jobs=jobs)
|
|
for node in tailNodes:
|
|
G.add_edge(node, endNode)
|
|
|
|
return G
|
|
|
|
def wait(self, G):
|
|
self.problem.remote.lview.wait(G.node['End']['jobs'] if G.node['End']['jobs'] else (G.node[wn]['jobs'] for wn in (G.predecessors(tn)[0] for tn in G.predecessors('End'))))
|
|
|
|
# TODO: Hopefully obsoleted by SuperReference
|
|
@staticmethod
|
|
@interactive
|
|
def _hasSystemRank(endpoint, tag, wid):
|
|
global rank
|
|
|
|
return (tag in endpoint.localProblems) and (rank == wid)
|
|
|
|
@staticmethod
|
|
def _getChunks(problems, chunks=1):
|
|
nproblems = len(problems)
|
|
return (problems[i*nproblems // chunks: (i+1)*nproblems // chunks] for i in range(chunks))
|
|
|
|
@staticmethod
|
|
def _subSlice(insl, chunks=1):
|
|
start = insl.start or 0
|
|
nproblems = insl.stop - start
|
|
return [slice(start + i*nproblems/chunks, start + (i+1)*nproblems/chunks) for i in xrange(chunks)]
|
|
|
|
|
|
class RemoteInterface(object):
|
|
|
|
def __init__(self, profile=None, MPI=None, nThreads=1, bootstrap=None, endpointName='endpoint'):
|
|
|
|
# TODO: Add interface for namespace bootstrapping from
|
|
# the dispatcher / problem side
|
|
|
|
if profile is not None:
|
|
pupdate = {'profile': profile}
|
|
else:
|
|
pupdate = {}
|
|
|
|
pclient = Client(**pupdate)
|
|
|
|
if not self._cdSame(pclient):
|
|
print('Could not change all workers to the same directory as the client!')
|
|
|
|
dview = pclient[:]
|
|
dview.block = True
|
|
dview.clear()
|
|
|
|
remoteSetup = '''
|
|
import os'''
|
|
|
|
parMPISetup = '''
|
|
from mpi4py import MPI
|
|
comm = MPI.COMM_WORLD
|
|
rank = comm.Get_rank()'''
|
|
|
|
for command in remoteSetup.strip().split('\n'):
|
|
dview.execute(command.strip())
|
|
|
|
dview.scatter('rank', pclient.ids, flatten=True)
|
|
|
|
self.e0 = pclient[0]
|
|
self.e0.block = True
|
|
|
|
self.useMPI = False
|
|
MPI = DEFAULT_MPI if MPI is None else MPI
|
|
if MPI:
|
|
MPISafe = False
|
|
|
|
for var in MPI_BELLWETHERS:
|
|
MPISafe = MPISafe or all(dview['os.getenv("%s")'%(var,)])
|
|
|
|
if MPISafe:
|
|
for command in parMPISetup.strip().split('\n'):
|
|
dview.execute(command.strip())
|
|
ranks = dview['rank']
|
|
reorder = [ranks.index(i) for i in xrange(len(ranks))]
|
|
dview = pclient[reorder]
|
|
dview.block = True
|
|
dview.activate()
|
|
|
|
# Set up necessary parts for broadcast-based communication
|
|
self.e0 = pclient[reorder[0]]
|
|
self.e0.block = True
|
|
self.comm = Reference('comm')
|
|
|
|
self.useMPI = MPISafe
|
|
|
|
self.pclient = pclient
|
|
self.dview = dview
|
|
self.lview = pclient.load_balanced_view()
|
|
|
|
self.nThreads = nThreads
|
|
|
|
if bootstrap is not None:
|
|
for command in bootstrap.strip().split('\n'):
|
|
dview.execute(command.strip())
|
|
|
|
self.endpointName = endpointName
|
|
|
|
@property
|
|
def nThreads(self):
|
|
return self._nThreads
|
|
@nThreads.setter
|
|
def nThreads(self, value):
|
|
self._nThreads = value
|
|
self.dview.apply(self._adjustMKLVectorization, self._nThreads)
|
|
|
|
|
|
def __setitem__(self, key, item):
|
|
|
|
if self.useMPI:
|
|
self.e0[key] = item
|
|
code = 'if rank != 0: %(key)s = None\n%(key)s = comm.bcast(%(key)s, root=0)'
|
|
self.dview.execute(code%{'key': key})
|
|
|
|
else:
|
|
self.dview[key] = item
|
|
|
|
def __getitem__(self, key):
|
|
|
|
if self.useMPI:
|
|
code = 'temp_%(key)s = None\ntemp_%(key)s = comm.gather(%(key)s, root=%(root)d)'
|
|
self.dview.execute(code%{'key': key, 'root': 0})
|
|
item = self.e0['temp_%s'%(key,)]
|
|
self.e0.execute('del temp_%s'%(key,))
|
|
|
|
else:
|
|
item = self.dview[key]
|
|
|
|
return item
|
|
|
|
def reduceLB(self, endpoint, key, after=None):
|
|
|
|
repeat = lambda value: (value for i in xrange(len(self.pclient.ids)))
|
|
|
|
if self.useMPI:
|
|
with self.lview.temp_flags(block=False, after=after):
|
|
job = self.lview.map(self._reduceJob, xrange(len(self.pclient.ids)), repeat(0), repeat(endpoint), repeat(key))
|
|
|
|
return job
|
|
|
|
def reduce(self, key, axis=None):
|
|
|
|
if self.useMPI:
|
|
code = 'temp_%(key)s = comm.reduce(%(key)s, root=%(root)d)'
|
|
self.dview.execute(code%{'key': key, 'root': 0})
|
|
|
|
# if axis is not None:
|
|
# code = 'temp_%(key)s = temp_%(key)s.sum(axis=%(axis)d)'
|
|
# self.e0.execute(code%{'key': key, 'axis': axis})
|
|
|
|
item = self.e0['temp_%s'%(key,)]
|
|
self.dview.execute('del temp_%s'%(key,))
|
|
|
|
else:
|
|
item = reduce(np.add, self.dview[key])
|
|
|
|
return item
|
|
|
|
def reduceMul(self, key1, key2, axis=None):
|
|
|
|
if self.useMPI:
|
|
# Gather
|
|
code_reduce = 'temp_%(key)s = comm.reduce(%(key)s, root=%(root)d)'
|
|
self.dview.execute(code_reduce%{'key': key1, 'root': 0})
|
|
self.dview.execute(code_reduce%{'key': key2, 'root': 0})
|
|
|
|
# Multiply
|
|
code_mul = 'temp_%(key1)s%(key2)s = temp_%(key1)s * temp_%(key2)s'
|
|
self.e0.execute(code_mul%{'key1': key1, 'key2': key2})
|
|
|
|
# Potentially sum
|
|
if axis is not None:
|
|
code = 'temp_%(key1)s%(key2)s = temp_%(key1)s%(key2)s.sum(axis=%(axis)d)'
|
|
self.e0.execute(code%{'key1': key1, 'key2': key2, 'axis': axis})
|
|
|
|
# Pull
|
|
item = self.e0['temp_%(key1)s%(key2)s'%{'key1': key1, 'key2': key2}]
|
|
|
|
# Clear
|
|
self.dview.execute('del temp_%s'%(key1,))
|
|
self.dview.execute('del temp_%s'%(key2,))
|
|
self.e0.execute('del temp_%(key1)s%(key2)s'%{'key1': key1, 'key2': key2})
|
|
|
|
else:
|
|
item1 = reduce(np.add, self.dview[key1])
|
|
item2 = reduce(np.add, self.dview[key2])
|
|
item = item1 * item2
|
|
|
|
return item
|
|
|
|
def remoteMulE0(self, key1, key2, axis=None):
|
|
|
|
code_mul = 'temp_field = %(key1)s * %(key2)s'
|
|
self.e0.execute(code_mul%{'key1': key1, 'key2': key2})
|
|
|
|
if axis is not None:
|
|
code = 'temp_field = temp_field.sum(axis=%(axis)d)'
|
|
self.e0.execute(code%{'axis': axis})
|
|
|
|
item = self.e0['temp_field']
|
|
self.e0.execute('del temp_field')
|
|
|
|
return item
|
|
|
|
def remoteDifference(self, key1, key2, keyresult):
|
|
|
|
if self.useMPI:
|
|
|
|
root = 0
|
|
|
|
# Gather
|
|
code_reduce = 'temp_%(key)s = comm.reduce(%(key)s, root=%(root)d)'
|
|
self.dview.execute(code_reduce%{'key': key1, 'root': root})
|
|
self.dview.execute(code_reduce%{'key': key2, 'root': root})
|
|
|
|
# Difference
|
|
code_difference = '%(keyresult)s = temp_%(key1)s - temp_%(key2)s'
|
|
self.e0.execute(code_difference%{'key1': key1, 'key2': key2, 'keyresult': keyresult})
|
|
|
|
# Broadcast
|
|
code = 'if rank != 0: %(key)s = None\n%(key)s = comm.bcast(%(key)s, root=%(root)d)'
|
|
self.dview.execute(code%{'key': keyresult, 'root': root})
|
|
|
|
# Clear
|
|
self.e0.execute('del temp_%s'%(key1,))
|
|
self.e0.execute('del temp_%s'%(key2,))
|
|
|
|
else:
|
|
item1 = reduce(np.add, self.dview[key1])
|
|
item2 = reduce(np.add, self.dview[key2])
|
|
|
|
item = item1 - item2
|
|
self.dview[keyresult] = item
|
|
|
|
def remoteOpGatherFirst(self, op, key1, key2, keyresult):
|
|
|
|
if self.useMPI:
|
|
|
|
root = 0
|
|
|
|
# Gather
|
|
code_reduce = 'temp_%(key)s = comm.reduce(%(key)s, root=%(root)d)'
|
|
self.dview.execute(code_reduce%{'key': key1, 'root': root})
|
|
|
|
# Difference
|
|
code_difference = '%(keyresult)s = temp_%(key1)s %(op)s %(key2)s'
|
|
self.e0.execute(code_difference%{'op': op, 'key1': key1, 'key2': key2, 'keyresult': keyresult})
|
|
|
|
# Broadcast
|
|
code = 'if rank != 0: %(key)s = None\n%(key)s = comm.bcast(%(key)s, root=%(root)d)'
|
|
self.dview.execute(code%{'key': keyresult, 'root': root})
|
|
|
|
# Clear
|
|
self.e0.execute('del temp_%s'%(key1,))
|
|
|
|
else:
|
|
item1 = reduce(np.add, self.dview[key1])
|
|
item2 = self.e0[key2] # Assumes that any arbitrary worker has this information
|
|
|
|
item = eval('item1 %s item2'%(op,))
|
|
self.dview[keyresult] = item
|
|
|
|
def remoteDifferenceGatherFirst(self, *args):
|
|
self.remoteOpGatherFirst('-', *args)
|
|
|
|
def remoteSrcEstGatherFirst(self, keyresult, key1, key2, individual=False):
|
|
|
|
if self.useMPI:
|
|
|
|
root = 0
|
|
|
|
# # Gather
|
|
# code_reduce = 'temp_%(key)s = comm.reduce(%(key)s, root=%(root)d)'
|
|
# self.dview.execute(code_reduce%{'key': key1, 'root': root})
|
|
|
|
# SrcEst
|
|
if individual:
|
|
code_srcest = '%(keyresult)s = (%(key2)s.conj() * %(key1)s).sum(axis=1) / (%(key1)s.conj() * %(key1)s).sum(axis=1)'
|
|
else:
|
|
code_srcest = '%(keyresult)s = (%(key2)s.conj() * %(key1)s).sum() / (%(key1)s.conj() * %(key1)s).sum()'
|
|
self.e0.execute(code_srcest%{'key1': key1, 'key2': key2, 'keyresult': keyresult})
|
|
|
|
# Broadcast
|
|
code = 'if rank != %(root)d: %(key)s = None\n%(key)s = comm.bcast(%(key)s, root=%(root)d)'
|
|
self.dview.execute(code%{'key': keyresult, 'root': root})
|
|
|
|
else:
|
|
|
|
item1 = reduce(np.add, self.dview[key1])
|
|
item2 = self.e0[key2]
|
|
|
|
if individual:
|
|
item = (item2.conj() * item1).sum(axis=1) / (item1.conj() * item1).sum(axis=1)
|
|
else:
|
|
item = (item2.conj() * item1).sum() / (item1.conj() * item1).sum()
|
|
|
|
self.dview[keyresult] = item
|
|
|
|
def remoteApplySrc(self, keyData, keySrc):
|
|
|
|
code = '%(keyData)s = %(keySrc)s * %(keyData)s'
|
|
self.dview.execute(code%{'keyData': keyData, 'keySrc': keySrc})
|
|
|
|
# def normFromDifference(self, key):
|
|
|
|
# code = 'temp_norm%(key)s = (%(key)s * %(key)s.conj()).sum(0).sum(0)'
|
|
# self.e0.execute(code%{'key': key})
|
|
# code = 'temp_norm%(key)s = {key: np.sqrt(temp_norm%(key)s[key]).real for key in temp_norm%(key)s.keys()}'
|
|
# self.e0.execute(code%{'key': key})
|
|
# result = CommonReducer(self.e0['temp_norm%s'%(key,)])
|
|
# self.e0.execute('del temp_norm%s'%(key,))
|
|
|
|
# return result
|
|
|
|
def normFromDifference(self, key):
|
|
|
|
code = 'temp_norm = (%(key)s * %(key)s.conj()).sum(0).sum(0)'
|
|
self.e0.execute(code%{'key': key})
|
|
code = 'temp_norm = {key: np.sqrt(temp_norm[key]).real for key in temp_norm}'
|
|
self.e0.execute(code%{'key': key})
|
|
result = CommonReducer(self.e0['temp_norm'])
|
|
self.e0.execute('del temp_norm')
|
|
|
|
return result
|
|
|
|
@staticmethod
|
|
@interactive
|
|
def _reduceJob(worker, root, endpoint, key):
|
|
|
|
from ipyparallel.error import UnmetDependency
|
|
if not rank == worker:
|
|
raise UnmetDependency
|
|
|
|
# code = '%(endpoint)s.globalFields["%(key)s"] = comm.reduce(%(endpoint)s.localFields["%(key)s"], root=%(root)d)'
|
|
# exec(code%{'endpoint': endpoint, 'key': key, 'root': root})
|
|
|
|
if key not in endpoint.localFields:
|
|
endpoint.localFields[key] = endpoint.fieldspec[key]()
|
|
|
|
endpoint.globalFields[key] = comm.reduce(endpoint.localFields[key], root=root)
|
|
|
|
@staticmethod
|
|
def _adjustMKLVectorization(nt=1):
|
|
try:
|
|
import mkl
|
|
mkl.set_num_threads(nt)
|
|
except ImportError:
|
|
pass
|
|
|
|
@staticmethod
|
|
def _cdSame(rc):
|
|
import os
|
|
|
|
dview = rc[:]
|
|
|
|
home = os.getenv('HOME')
|
|
cwd = os.getcwd()
|
|
|
|
@interactive
|
|
def cdrel(relpath):
|
|
import os
|
|
home = os.getenv('HOME')
|
|
fullpath = os.path.join(home, relpath)
|
|
try:
|
|
os.chdir(fullpath)
|
|
except OSError:
|
|
return False
|
|
else:
|
|
return True
|
|
|
|
if cwd.find(home) == 0:
|
|
relpath = cwd[len(home)+1:]
|
|
return all(rc[:].apply_sync(cdrel, relpath))
|