00004 """ GaudiPython.Parallel module.
00005 This module provides 'parallel' processing support for GaudiPyhton.
00006 It is adding some sugar on top of public domain packages such as
00007 the 'processing' or the 'pp' packages. The interface can be made
00008 independent of the underlying implementation package.
00009 Two main class are defined: Task and WorkManager
00010 """
00011
00012 __all__ = [ 'Task','WorkManager' ]
00013 excluded_varnames = ['HOSTNAME', 'SSH_CLIENT', 'SSH_CONNECTION', 'DISPLAY']
00014
00015 import sys, os, time, copy
00016
def _prefunction(f, task, item):
    return f((task, item))

def _ppfunction(args):
    #--- Unpack the arguments
    task, item = args
    stat = Statistics()
    #--- Initialize the remote side once per worker process
    if not task.__class__._initializeDone:
        for k, v in task.environ.items():
            if k not in excluded_varnames: os.environ[k] = v
        task.initializeRemote()
        task.__class__._initializeDone = True
    #--- Reset the task output before processing the item
    task._resetOutput()
    #--- Call the user processing
    task.process(item)
    #--- Collect timing statistics
    stat.stop()
    return (copy.deepcopy(task.output), stat)

def _detect_ncpus():
    """Detect the number of effective CPUs in the system."""
    #--- POSIX systems
    if hasattr(os, "sysconf"):
        if "SC_NPROCESSORS_ONLN" in os.sysconf_names:
            #--- Linux and Unix
            ncpus = os.sysconf("SC_NPROCESSORS_ONLN")
            if isinstance(ncpus, int) and ncpus > 0:
                return ncpus
        else:
            #--- MacOS X
            return int(os.popen2("sysctl -n hw.ncpu")[1].read())
    #--- Windows
    if "NUMBER_OF_PROCESSORS" in os.environ:
        ncpus = int(os.environ["NUMBER_OF_PROCESSORS"])
        if ncpus > 0:
            return ncpus
    #--- Default: assume a single CPU
    return 1

class Statistics(object):
    def __init__(self):
        self.name = os.getenv('HOSTNAME')
        self.start = time.time()
        self.time = 0.0
        self.njob = 0
    def stop(self):
        self.time = time.time() - self.start

class Task(object):
    """ Basic base class to encapsulate any processing that is going to be
        executed in parallel. User classes must inherit from it and implement
        the methods initializeLocal, initializeRemote, process and finalize. """
    _initializeDone = False
    def __new__(cls, *args, **kwargs):
        task = object.__new__(cls)
        task.output = ()
        task.environ = {}
        for k, v in os.environ.items(): task.environ[k] = v
        task.cwd = os.getcwd()
        return task
    def initializeLocal(self):
        pass
    def initializeRemote(self):
        pass
    def process(self, item):
        pass
    def finalize(self):
        pass
    def _mergeResults(self, result):
        if type(result) is not type(self.output):
            raise TypeError("output type is not same as obtained result")
        #--- Non-iterable output: merge the scalar result directly
        if not hasattr(result, '__iter__'):
            if hasattr(self.output, 'Add'):        self.output.Add(result)
            elif hasattr(self.output, '__iadd__'): self.output += result
            elif hasattr(self.output, '__add__'):  self.output = self.output + result
            else: raise TypeError('result cannot be added')
        #--- Dictionary output: merge value by value, adding new keys as needed
        elif type(result) is dict:
            for key in result.keys():
                if key in self.output:
                    if hasattr(self.output[key], 'Add'):        self.output[key].Add(result[key])
                    elif hasattr(self.output[key], '__iadd__'): self.output[key] += result[key]
                    elif hasattr(self.output[key], '__add__'):  self.output[key] = self.output[key] + result[key]
                    else: raise TypeError('result cannot be added')
                else:
                    self.output[key] = result[key]
        #--- Any other iterable output: merge element by element
        else:
            for i in range(min(len(self.output), len(result))):
                if hasattr(self.output[i], 'Add'):        self.output[i].Add(result[i])
                elif hasattr(self.output[i], '__iadd__'): self.output[i] += result[i]
                elif hasattr(self.output[i], '__add__'):  self.output[i] = self.output[i] + result[i]
                else: raise TypeError('result cannot be added')
    def _resetOutput(self):
        output = self.output.values() if type(self.output) is dict else self.output
        for o in output:
            if hasattr(o, 'Reset'): o.Reset()

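#--- Illustrative sketch (not part of the module API): a minimal Task
#    subclass following the contract above. The name 'WordCountTask' and
#    its output keys are made up for the example.
class WordCountTask(Task):
    def initializeLocal(self):
        #--- Runs once on the master; defines the shape of the output
        self.output = {'items': 0, 'words': 0}
    def process(self, item):
        #--- Runs on a worker for each item; overwrites (rather than
        #    accumulates) the per-item result, which _mergeResults then
        #    sums into the master's output dictionary via '__add__'
        self.output = {'items': 1, 'words': len(str(item).split())}
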
class WorkManager(object):
    """ Class in charge of managing the tasks and distributing them to
        the workers. The workers can be local (using other cores) or remote
        (using other nodes in the local cluster). """
    def __init__(self, ncpus='autodetect', ppservers=None):
        if ncpus == 'autodetect': self.ncpus = _detect_ncpus()
        else:                     self.ncpus = ncpus
        if ppservers:
            import pp
            self.ppservers = ppservers
            self.sessions = [SshSession(srv) for srv in ppservers]
            self.server = pp.Server(ncpus=self.ncpus, ppservers=self.ppservers)
            self.mode = 'cluster'
        else:
            import processing
            self.pool = processing.Pool(self.ncpus)
            self.mode = 'multicore'
        self.stats = {}
    def __del__(self):
        if hasattr(self, 'server'): self.server.destroy()

    def process(self, task, items, timeout=90000):
        if not isinstance(task, Task):
            raise TypeError("task argument needs to be a 'Task' instance")
        #--- Initialization is done on the master process only
        task.initializeLocal()
        #--- Dispatch the work to the workers depending on the mode
        if self.mode == 'cluster':
            jobs = [self.server.submit(_prefunction, (_ppfunction, task, item), (),
                                       ('GaudiPython.Parallel', 'time')) for item in items]
            for job in jobs:
                result, stat = job()
                task._mergeResults(result)
                self._mergeStatistics(stat)
            self._printStatistics()
            self.server.print_stats()
        elif self.mode == 'multicore':
            start = time.time()
            jobs = self.pool.map_async(_ppfunction, zip([task for i in items], items))
            for result, stat in jobs.get(timeout):
                task._mergeResults(result)
                self._mergeStatistics(stat)
            end = time.time()
            self._printStatistics()
            print 'Time elapsed since server creation %f' % (end - start)
        #--- Finalization is done on the master process only
        task.finalize()
    def _printStatistics(self):
        njobs = 0
        for stat in self.stats.values():
            njobs += stat.njob
        print 'Job execution statistics:'
        print 'job count | % of all jobs | job time sum | time per job | job server'
        for name, stat in self.stats.items():
            print ' %d | %6.2f | %8.3f | %8.3f | %s' % (
                stat.njob, 100. * stat.njob / njobs, stat.time, stat.time / stat.njob, name)
    def _mergeStatistics(self, stat):
        if stat.name not in self.stats: self.stats[stat.name] = Statistics()
        s = self.stats[stat.name]
        s.time += stat.time
        s.njob += 1


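#--- Usage sketch for the multicore mode (assumes the 'processing' package
#    is installed; 'WordCountTask' is the illustrative subclass above):
#
#       wm   = WorkManager()              # autodetect the number of CPUs
#       task = WordCountTask()
#       wm.process(task, ['some text', 'more text here'])
#       print task.output                 # -> {'items': 2, 'words': 5}
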
class SshSession(object):
    def __init__(self, hostname):
        import pyssh
        import pp
        self.host = hostname
        ppprefix = os.path.dirname(os.path.dirname(pp.__file__))
        self.session = pyssh.Ssh(host=hostname)
        self.session.open()
        self.session.read_lazy()
        #--- Move to the current working directory and replicate the
        #    relevant environment on the remote host
        self.session.write('cd %s\n' % os.getcwd())
        self.session.read_lazy()
        self.session.write('setenv PYTHONPATH %s\n' % os.environ['PYTHONPATH'])
        self.session.read_lazy()
        self.session.write('setenv LD_LIBRARY_PATH %s\n' % os.environ['LD_LIBRARY_PATH'])
        self.session.read_lazy()
        self.session.write('setenv ROOTSYS %s\n' % os.environ['ROOTSYS'])
        self.session.read_lazy()
        #--- Start the ppserver on the remote host
        self.session.write('%s %s/scripts-%s/ppserver.py\n' % (sys.executable, ppprefix, sys.version.split()[0]))
        self.session.read_lazy()
        self.session.read_lazy()
        print 'started ppserver on ', hostname
    def __del__(self):
        self.session.close()
        print 'killed ppserver on ', self.host
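
#--- Cluster-mode sketch (assumes the 'pp' and 'pyssh' packages are
#    installed and the listed hosts are reachable over ssh with a csh-style
#    shell, since SshSession above uses 'setenv'; host names are made up):
#
#       wm = WorkManager(ppservers=('node1.mydomain', 'node2.mydomain'))
#       wm.process(task, items)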