From 0cf783d993ab9464e8288feb63ea10d0bb1e2466 Mon Sep 17 00:00:00 2001 From: Brendan Smithyman Date: Mon, 29 Jun 2015 14:30:12 -0400 Subject: [PATCH] Improvements to scheduler (now use slices). Change name "dispatcher" to "problem". --- SimPEG/Parallel.py | 43 ++++++++++++++++++++++--------------------- 1 file changed, 22 insertions(+), 21 deletions(-) diff --git a/SimPEG/Parallel.py b/SimPEG/Parallel.py index 3a6afc6b..a09547b6 100644 --- a/SimPEG/Parallel.py +++ b/SimPEG/Parallel.py @@ -144,9 +144,9 @@ except NameError: class SystemSolver(object): - def __init__(self, dispatcher, endpointName, schedule): + def __init__(self, problem, endpointName, schedule): - self.dispatcher = dispatcher + self.problem = problem self.endpointName = endpointName self.schedule = schedule @@ -158,10 +158,10 @@ class SystemSolver(object): clearRef = Reference(fnformat%(self.endpointName, self.schedule[entry]['clear'])) reduceLabels = self.schedule[entry]['reduce'] - dview = self.dispatcher.remote.dview - lview = self.dispatcher.remote.lview + dview = self.problem.remote.dview + lview = self.problem.remote.lview - chunksPerWorker = getattr(self.dispatcher, 'chunksPerWorker', 1) + chunksPerWorker = getattr(self.problem, 'chunksPerWorker', 1) G = SystemGraph() @@ -170,19 +170,12 @@ class SystemSolver(object): # Parse sources # TODO: Get from Survey somehow? - nsrc = self.dispatcher.nsrc + nsrc = self.problem.nsrc if isrcs is None: - isrcslist = range(nsrc) + isrcs = slice(None) - elif isinstance(isrcs, slice): - isrcslist = range(isrcs.start or 0, isrcs.stop or nsrc, isrcs.step or 1) - - else: - try: - _ = isrcs[0] - isrcslist = isrcs - except TypeError: - isrcslist = [isrcs] + elif not isinstance(isrcs, slice): + raise Exception('Scheduler must run over slice or None!') # TODO: Replace w/ hook into Endpoint classes systemsOnWorkers = dview['%s.localProblems.keys()'%self.endpointName] @@ -215,7 +208,7 @@ class SystemSolver(object): with lview.temp_flags(block=False): iworks = 0 - for work in self._getChunks(isrcslist, int(round(chunksPerWorker*len(relIDs)))): + for work in self._subSlice(isrcs, int(round(chunksPerWorker*len(relIDs)))): if work: job = lview.apply(fnRef, Reference(self.endpointName), tag, work) systemJobs.append(job) @@ -225,7 +218,7 @@ class SystemSolver(object): G.add_edge(tagNode, label) iworks += 1 - if getattr(self.dispatcher, 'ensembleClear', False): # True for ensemble ending, False for individual ending + if getattr(self.problem, 'ensembleClear', False): # True for ensemble ending, False for individual ending tagNode = 'Wrap: %d, %d'%tag for label in systemNodes: G.add_edge(label, tagNode) @@ -264,7 +257,7 @@ class SystemSolver(object): jobs = [] after = clearJobs for label in reduceLabels: - job = self.dispatcher.remote.reduceLB(Reference(self.endpointName), label, after=after) + job = self.problem.remote.reduceLB(Reference(self.endpointName), label, after=after) after = job if job is not None: jobs.append(job) @@ -275,7 +268,7 @@ class SystemSolver(object): return G def wait(self, G): - self.dispatcher.remote.lview.wait(G.node['End']['jobs'] if G.node['End']['jobs'] else (G.node[wn]['jobs'] for wn in (G.predecessors(tn)[0] for tn in G.predecessors('End')))) + self.problem.remote.lview.wait(G.node['End']['jobs'] if G.node['End']['jobs'] else (G.node[wn]['jobs'] for wn in (G.predecessors(tn)[0] for tn in G.predecessors('End')))) # TODO: Hopefully obsoleted by SuperReference @staticmethod @@ -290,6 +283,13 @@ class SystemSolver(object): nproblems = len(problems) return (problems[i*nproblems // chunks: (i+1)*nproblems // chunks] for i in range(chunks)) + @staticmethod + def _subSlice(insl, chunks=1): + start = insl.start or 0 + nproblems = insl.stop - start + return [slice(start + i*nproblems/chunks, start + (i+1)*nproblems/chunks) for i in xrange(chunks)] + + class RemoteInterface(object): def __init__(self, profile=None, MPI=None, nThreads=1, bootstrap=None): @@ -543,7 +543,8 @@ class RemoteInterface(object): code = 'temp_norm = (%(key)s * %(key)s.conj()).sum(0).sum(0)' self.e0.execute(code%{'key': key}) - code = 'temp_norm = {key: np.sqrt(temp_norm[key] for key in temp_norm)}' + code = 'temp_norm = {key: np.sqrt(temp_norm[key]).real for key in temp_norm}' + self.e0.execute(code%{'key': key}) result = CommonReducer(self.e0['temp_norm']) self.e0.execute('del temp_norm')