Public Member Functions | |
def | __init__ |
def | __del__ |
def | process |
Public Attributes | |
ncpus | |
ppservers | |
sessions | |
server | |
mode | |
pool | |
stats | |
Private Member Functions | |
def | _printStatistics |
def | _mergeStatistics |
Class in charge of managing the tasks and distributing them to the workers. The workers can be local (using other cores) or remote (using other nodes in the local cluster).
Definition at line 120 of file Parallel.py.
def GaudiPython::Parallel::WorkManager::__init__ | ( | self, | ||
ncpus = 'autodetect' , |
||||
ppservers = None | ||||
) |
Definition at line 123 of file Parallel.py.
00125 : 00126 if ncpus == 'autodetect' : self.ncpus = _detect_ncpus() 00127 else : self.ncpus = ncpus 00128 if ppservers : 00129 import pp 00130 self.ppservers = ppservers 00131 self.sessions = [ SshSession(srv) for srv in ppservers ] 00132 self.server = pp.Server(ncpus=self.ncpus, ppservers=self.ppservers) 00133 self.mode = 'cluster' 00134 else : 00135 import processing 00136 self.pool = processing.Pool(self.ncpus) 00137 self.mode = 'multicore' self.stats = {}
def GaudiPython::Parallel::WorkManager::__del__ | ( | self | ) |
def GaudiPython::Parallel::WorkManager::process | ( | self, | ||
task, | ||||
items, | ||||
timeout = 90000 | ||||
) |
Definition at line 141 of file Parallel.py.
00141 : self.server.destroy() 00142 00143 def process(self, task, items, timeout=90000): 00144 if not isinstance(task,Task) : 00145 raise TypeError("task argument needs to be an 'Task' instance") 00146 # --- Call the Local initialialization 00147 task.initializeLocal() 00148 # --- Schedule all the jobs .... 00149 if self.mode == 'cluster' : 00150 jobs = [self.server.submit(_prefunction, (_ppfunction, task, item), (), ('GaudiPython.Parallel','time')) for item in items] 00151 for job in jobs : 00152 result, stat = job() 00153 task._mergeResults(result) 00154 self._mergeStatistics(stat) 00155 self._printStatistics() 00156 self.server.print_stats() 00157 elif self.mode == 'multicore' : 00158 start = time.time() 00159 jobs = self.pool.map_async(_ppfunction, zip([task for i in items] , items )) 00160 for result, stat in jobs.get(timeout) : 00161 task._mergeResults(result) 00162 self._mergeStatistics(stat) 00163 end = time.time() 00164 self._printStatistics() 00165 print 'Time elapsed since server creation %f' %(end-start)
def GaudiPython::Parallel::WorkManager::_printStatistics | ( | self | ) | [private] |
Definition at line 166 of file Parallel.py.
00168 : 00169 njobs = 0 00170 for stat in self.stats.values(): 00171 njobs += stat.njob 00172 print 'Job execution statistics:' 00173 print 'job count | % of all jobs | job time sum | time per job | job server' 00174 for name, stat in self.stats.items(): print ' %d | %6.2f | %8.3f | %8.3f | %s' % (stat.njob, 100.*stat.njob/njobs, stat.time, stat.time/stat.njob, name)
def GaudiPython::Parallel::WorkManager::_mergeStatistics | ( | self, | ||
stat | ||||
) | [private] |
Definition at line 175 of file Parallel.py.
00177 : 00178 if stat.name not in self.stats : self.stats[stat.name] = Statistics() 00179 s = self.stats[stat.name] 00180 s.time += stat.time 00181 s.njob += 1
Definition at line 124 of file Parallel.py.
Definition at line 128 of file Parallel.py.
Definition at line 129 of file Parallel.py.
Definition at line 130 of file Parallel.py.
Definition at line 131 of file Parallel.py.
Definition at line 134 of file Parallel.py.
Definition at line 136 of file Parallel.py.