| Classes | Job Modules | Data Objects | Services | Algorithms | Tools | Packages | Directories | Tracs |

In This Package:

Parallel.py

Go to the documentation of this file.
00001 # File: GaudiPython/Parallel.py
00002 # Author: Pere Mato (pere.mato@cern.ch)
00003 
00004 """ GaudiPython.Parallel module.
00005     This module provides 'parallel' processing support for GaudiPython.
00006     It is adding some sugar on top of public domain packages such as
00007     the 'processing' or the 'pp' packages. The interface can be made
00008     independent of the underlying implementation package.
00009     Two main classes are defined: Task and WorkManager
00010 """
00011 
00012 __all__ = [ 'Task','WorkManager' ]
00013 excluded_varnames = ['HOSTNAME', 'SSH_CLIENT', 'SSH_CONNECTION', 'DISPLAY']
00014 
00015 import sys, os, time, copy
00016 
00017 def _prefunction( f, task, item) :
00018   return f((task,item))
def _ppfunction( args ) :
  """ Worker-side wrapper: execute one (task, item) pair and return the
      deep-copied task output together with its timing statistics. """
  task, item = args
  stat = Statistics()
  # One-time per-process setup: replay the submitter's environment (minus
  # the ssh/display variables) and let the task initialise remotely.
  if not task.__class__._initializeDone :
    for name, value in task.environ.items() :
      if name not in excluded_varnames :
        os.environ[name] = value
    task.initializeRemote()
    task.__class__._initializeDone = True
  # Clear any previous output, run the job, then close the stopwatch.
  task._resetOutput()
  task.process(item)
  stat.stop()
  return (copy.deepcopy(task.output), stat)
00036     
00037 def _detect_ncpus():
00038   """Detects the number of effective CPUs in the system"""
00039   #for Linux, Unix and MacOS
00040   if hasattr(os, "sysconf"):
00041     if os.sysconf_names.has_key("SC_NPROCESSORS_ONLN"): 
00042       #Linux and Unix
00043       ncpus = os.sysconf("SC_NPROCESSORS_ONLN")
00044       if isinstance(ncpus, int) and ncpus > 0:
00045         return ncpus
00046       else: 
00047         #MacOS X
00048         return int(os.popen2("sysctl -n hw.ncpu")[1].read())
00049   #for Windows
00050   if os.environ.has_key("NUMBER_OF_PROCESSORS"):
00051     ncpus = int(os.environ["NUMBER_OF_PROCESSORS"]);
00052     if ncpus > 0:
00053       return ncpus
00054   #return the default value
00055   return 1
00056 
class Statistics(object):
  """ Book-keeping record for one worker host: job count and elapsed time. """

  def __init__(self):
    # Records are keyed by the worker's host name (see WorkManager.stats).
    self.name  = os.getenv('HOSTNAME')
    self.njob  = 0
    self.time  = 0.0
    # Remember the creation instant so stop() can measure the elapsed span.
    self.start = time.time()

  def stop(self):
    """ Record the wall-clock time elapsed since construction. """
    self.time = time.time() - self.start
00065 
class Task(object) :
  """ Basic base class to encapsulate any processing that is going to be
      processed in parallel.  User classes must inherit from it and implement
      the methods initializeLocal, initializeRemote, process and finalize. """
  # Class-level flag, flipped to True on a worker process the first time any
  # task of this class runs there, so initializeRemote() runs at most once.
  _initializeDone = False

  def __new__ ( cls, *args, **kwargs ):
    # Attach the bookkeeping attributes the parallel machinery relies on
    # before the user's __init__ runs.  object.__new__ must be called with
    # the class only: forwarding *args raises TypeError in modern Python
    # (the runtime still passes them on to __init__).
    task = object.__new__( cls )
    task.output = ()
    # Snapshot the environment so it can be replayed on remote workers.
    task.environ = dict(os.environ)
    task.cwd = os.getcwd()
    return task

  def initializeLocal(self):
    """ Hook: called once in the master process before scheduling jobs. """
    pass

  def initializeRemote(self):
    """ Hook: called once per worker process before its first job. """
    pass

  def process(self, item):
    """ Hook: process a single work item, accumulating into self.output. """
    pass

  def finalize(self) :
    """ Hook: called once in the master after all results are merged. """
    pass

  @staticmethod
  def _mergeItem(accumulated, update) :
    """ Combine two partial results, preferring an in-place ROOT-style
        Add(), then +=, then +.  Returns the merged value. """
    if hasattr(accumulated, 'Add') :
      accumulated.Add(update)
      return accumulated
    elif hasattr(accumulated, '__iadd__') :
      accumulated += update
      return accumulated
    elif hasattr(accumulated, '__add__') :
      return accumulated + update
    else :
      raise TypeError('result cannot be added')

  def _mergeResults(self, result) :
    """ Merge one worker's result into self.output.

        Scalar-like objects are merged directly, dictionaries key by key
        (new keys are copied over), and any other iterable element by
        element up to the shorter length.  Raises TypeError if the result
        type differs from self.output or cannot be added. """
    if type(result) is not type(self.output) :
      raise TypeError("output type is not same as obtained result")
    #--No iterable---
    if not hasattr( result , '__iter__' ):
      self.output = self._mergeItem(self.output, result)
    #--Dictionary---
    elif type(result) is dict :
      for key in result.keys() :
        if key in self.output :
          self.output[key] = self._mergeItem(self.output[key], result[key])
        else :
          self.output[key] = result[key]
    #--Anything else (list)
    else :
      for i in range( min( len(self.output) , len(result)) ):
        self.output[i] = self._mergeItem(self.output[i], result[i])

  def _resetOutput(self):
    """ Reset any ROOT-style objects held in self.output before a worker
        reprocesses, so results are not double counted. """
    # Conditional expression instead of the old 'and/or' idiom, which
    # selected the wrong operand when the dict was empty.
    output = self.output.values() if type(self.output) is dict else self.output
    for o in output :
      if hasattr(o, 'Reset'): o.Reset()
00118 
00119 
class WorkManager(object) :
  """ Class in charge of managing the tasks and distributing them to
      the workers. They can be local (using other cores) or remote
      using other nodes in the local cluster """

  def __init__( self, ncpus='autodetect', ppservers=None) :
    # ncpus: number of local worker processes ('autodetect' probes the OS).
    # ppservers: optional sequence of remote host names; when given, work is
    # distributed with the 'pp' package over ssh ('cluster' mode), otherwise
    # a local 'processing' pool is used ('multicore' mode).
    if ncpus == 'autodetect' : self.ncpus = _detect_ncpus()
    else :                     self.ncpus = ncpus
    if ppservers :
      import pp
      self.ppservers = ppservers
      # Start a ppserver on each remote host; the sessions keep them alive.
      self.sessions = [ SshSession(srv) for srv in ppservers ]
      self.server = pp.Server(ncpus=self.ncpus, ppservers=self.ppservers)
      self.mode = 'cluster'
    else :
      import processing
      self.pool = processing.Pool(self.ncpus)
      self.mode = 'multicore'
    # Per-host Statistics objects keyed by host name (see _mergeStatistics).
    self.stats = {}

  def __del__(self):
    # Shut down the pp server (cluster mode only) when the manager goes away.
    if hasattr(self,'server') : self.server.destroy()

  def process(self, task, items, timeout=90000):
    """ Execute task.process(item) for every element of 'items' in parallel,
        merging each partial result into 'task' and its timing into
        self.stats.  'timeout' is forwarded to jobs.get() in multicore
        mode only. """
    if not isinstance(task,Task) :
      raise TypeError("task argument needs to be an 'Task' instance")
    # --- Call the Local initialization
    task.initializeLocal()
    # --- Schedule all the jobs ....
    if self.mode == 'cluster' :
      # Each pp job ships _ppfunction with its (task, item) pair; the listed
      # modules are imported on the remote side before execution.
      jobs = [self.server.submit(_prefunction, (_ppfunction, task, item), (), ('GaudiPython.Parallel','time')) for item in items]
      for job in jobs :
        # Calling the job object blocks until its result is available.
        result, stat = job()
        task._mergeResults(result)
        self._mergeStatistics(stat)
      self._printStatistics()
      self.server.print_stats()
    elif self.mode == 'multicore' :
      start = time.time() 
      # Pair the (shared) task with each item, matching _ppfunction's input.
      jobs = self.pool.map_async(_ppfunction, zip([task for i in items] , items ))  
      for result, stat in  jobs.get(timeout) : 
        task._mergeResults(result)
        self._mergeStatistics(stat)
      end = time.time()
      self._printStatistics()
      print 'Time elapsed since server creation %f' %(end-start)
    # --- Call the Local Finalize
    task.finalize()

  def _printStatistics(self):
    """ Print a per-host summary table of the accumulated job statistics. """
    njobs = 0
    for stat in self.stats.values():
      njobs += stat.njob
    print 'Job execution statistics:'
    print 'job count | % of all jobs | job time sum | time per job | job server'
    for name, stat  in self.stats.items():
      print '       %d |        %6.2f |     %8.3f |    %8.3f | %s' % (stat.njob, 100.*stat.njob/njobs, stat.time, stat.time/stat.njob, name)

  def _mergeStatistics(self, stat):
    """ Accumulate one job's Statistics into the per-host totals. """
    if stat.name not in self.stats : self.stats[stat.name] = Statistics()
    s = self.stats[stat.name]
    s.time += stat.time
    s.njob += 1
00182     
00183 
class SshSession(object) :
  """ Opens an ssh connection to 'hostname' and launches a ppserver.py there,
      replicating the caller's working directory and key environment
      variables.  NOTE(review): the 'setenv' commands imply the remote login
      shell is (t)csh -- confirm for other shell setups. """
  def __init__(self, hostname):
    import pyssh
    import pp
    self.host = hostname
    # ppserver.py lives in <prefix>/scripts-<pyversion>/, where <prefix> is
    # two directory levels above the installed pp module.
    ppprefix =  os.path.dirname(os.path.dirname(pp.__file__))
    self.session = pyssh.Ssh(host=hostname)
    self.session.open()
    # read_lazy() after each write drains any pending remote output.
    self.session.read_lazy()
    # Mirror the local working directory on the remote side.
    self.session.write('cd %s\n' % os.getcwd())
    self.session.read_lazy()
    # Export the environment the remote python needs to locate the software.
    self.session.write('setenv PYTHONPATH %s\n' % os.environ['PYTHONPATH'])
    self.session.read_lazy()
    self.session.write('setenv LD_LIBRARY_PATH %s\n' % os.environ['LD_LIBRARY_PATH'])
    self.session.read_lazy()
    self.session.write('setenv ROOTSYS %s\n' % os.environ['ROOTSYS'])
    self.session.read_lazy()
    # Launch the parallel-python server with the matching interpreter version.
    self.session.write('%s %s/scripts-%s/ppserver.py \n'%(sys.executable, ppprefix, sys.version.split()[0] ))
    self.session.read_lazy()
    self.session.read_lazy()
    print 'started ppserver in ', hostname
  def __del__(self):
    # Best-effort shutdown of the remote server when the session is collected.
    self.session.close()
    print 'killed ppserver in ', self.host
| Classes | Job Modules | Data Objects | Services | Algorithms | Tools | Packages | Directories | Tracs |

Generated on Mon Apr 11 19:58:56 2011 for GaudiPython by doxygen 1.4.7