00001
00002 '''
00003 Provide tools for running code asynchronously.
00004 '''
00005
def async_caller(obj, inbox, outbox):
    '''
    Serve method-call messages against ``obj`` until told to exit.

    Each message taken from ``inbox`` is a 4-tuple
    ``(methname, methnumber, args, kwds)``.  The attribute of ``obj``
    named by ``methname`` is called as ``meth(*args, **kwds)`` and the
    pair ``(methnumber, return_value)`` is put on ``outbox``.

    A message whose name is ``'async_caller_exit'`` ends the loop;
    nothing is put on ``outbox`` for it.

    The name is resolved by attribute lookup only (a dotted name such
    as ``'a.b'`` is followed one attribute at a time).  It is never
    evaluated as Python source, so a message cannot smuggle in an
    arbitrary expression.

    Exceptions raised by the lookup or by the called method are not
    caught here; they propagate to the caller and end the loop.
    '''
    from operator import attrgetter

    while True:
        methname, methnumber, args, kwds = inbox.get(block=True)
        if methname == 'async_caller_exit':
            return
        # Attribute lookup instead of eval(): same result for real
        # method names, but nothing in the message is ever executed.
        meth = attrgetter(methname)(obj)
        outbox.put((methnumber, meth(*args, **kwds)))
00023
class AsyncMethod(object):
    '''
    A callable that stands in for a method.

    Calling an instance records the supplied callback and puts the
    message ``(name, number, args, kwds)`` on ``outbox``.  An instance
    may be called only once.
    '''

    def __init__(self, name, number, outbox):
        '''
        Remember the method ``name``, the ``number`` identifying this
        particular call, and the ``outbox`` queue the call message is
        put on.
        '''
        self.name = name
        self.number = number
        self.outbox = outbox
        self.callback = None   # set by the (single) call

    def __call__(self, callback, *args, **kwds):
        '''
        Queue the call and remember ``callback``.

        ``callback`` is stored on the instance; ``args`` and ``kwds``
        are sent along with the method name and number.  Returns None.

        Raises AttributeError if a callback has already been recorded
        by an earlier call of this instance.
        '''
        if self.callback is not None:
            msg = 'This instance of the %s method has already been called'
            # Call form of raise: valid on both Python 2 and Python 3.
            raise AttributeError(msg % self.name)
        self.callback = callback
        self.outbox.put((self.name, self.number, args, kwds))
00046
00047
class AsyncInterface(object):
    '''
    Asynchronous interface to some other object (called "real" below).

    Any method call on this object, other than the ones explicitly
    defined, must have as its first argument a callable object
    ("callback").  The method name and remaining arguments are queued
    to a worker (a separate process or a thread, see ``proctype``)
    where they are interpreted and result in calling the method on the
    real object.  The result is queued back to this interface object,
    where ``drain_queue`` calls the callback with the result as its
    argument.
    '''

    # Names this class keeps in its own __dict__.  They must never be
    # proxied to the real object: when one of them is missing (e.g.
    # __init__ failed part way) __getattr__ would otherwise look itself
    # up again and recurse without bound.
    _INTERNAL_NAMES = frozenset([
        '_obj', '_count', '_callbacks', '_proctype',
        '_outbox', '_inbox', '_proc', '_closed',
    ])

    def __init__(self, obj, proctype=None):
        '''
        Wrap ``obj`` and start the worker that will call its methods.

        ``proctype`` selects the ``do_<proctype>`` starter method
        ("multiprocessing" or "threaded").  If it is not given,
        multiprocessing is used when the module can be imported and
        threads otherwise.

        Raises ValueError if ``obj`` is None and AttributeError if no
        starter exists for ``proctype``.
        '''
        # Test identity, not truthiness: an empty container or any
        # other falsy object is still a perfectly valid real object.
        if obj is None:
            raise ValueError('Must give a valid object')
        self.__dict__['_obj'] = obj
        self.__dict__['_count'] = 0
        self.__dict__['_callbacks'] = {}

        if proctype:
            # Plain attribute lookup rather than eval() on the string.
            starter = getattr(self, 'do_%s' % proctype)
            starter()
            return

        try:
            import multiprocessing   # availability probe only
        except ImportError:
            self.do_threaded()
        else:
            self.do_multiprocessing()

    def do_multiprocessing(self):
        'Start the worker as a multiprocessing.Process'
        from multiprocessing import Queue, Process
        self.__dict__['_proctype'] = 'multiprocessing'
        self.__dict__['_outbox'] = outbox = Queue()
        self.__dict__['_inbox'] = inbox = Queue()
        # Our outbox is the worker's inbox and vice versa.
        self.__dict__['_proc'] = proc = Process(
            target=async_caller, args=(self._obj, outbox, inbox))
        proc.start()

    def do_threaded(self):
        'Start the worker as a threading.Thread'
        try:
            from queue import Queue    # Python 3
        except ImportError:
            from Queue import Queue    # Python 2
        from threading import Thread
        self.__dict__['_proctype'] = 'threading'
        self.__dict__['_outbox'] = outbox = Queue()
        self.__dict__['_inbox'] = inbox = Queue()
        # Our outbox is the worker's inbox and vice versa.
        self.__dict__['_proc'] = proc = Thread(
            target=async_caller, args=(self._obj, outbox, inbox))
        proc.start()

    def __getattr__(self, name):
        '''
        Return a one-shot AsyncMethod standing in for method ``name``
        of the real object.

        Only reached for names not found by normal lookup.  Each access
        allocates a new call number and registers the AsyncMethod under
        it so ``drain_queue`` can find the callback later.

        Raises AttributeError if the real object has no such attribute
        or if ``name`` is one of this class's own (missing) internals.
        '''
        state = self.__dict__
        if name in self._INTERNAL_NAMES or '_obj' not in state:
            raise AttributeError(name)
        real = state['_obj']
        if real is None:
            raise AttributeError('No object')
        if not hasattr(real, name):
            msg = 'No such attribute: "%s" in "%s"'
            raise AttributeError(msg % (name, real))
        self._count += 1
        meth = AsyncMethod(name, self._count, self._outbox)
        self._callbacks[self._count] = meth
        return meth

    def __del__(self):
        self.shutdown()

    def shutdown(self):
        'Gently shutdown the real object and drain the queue'
        state = self.__dict__
        proc = state.get('_proc')
        # Nothing to do if the worker was never started (e.g. __init__
        # raised) or if shutdown already ran; __del__ always calls us.
        if proc is None or state.get('_closed'):
            return
        state['_closed'] = True
        self._outbox.put(('async_caller_exit', None, None, None))
        # Keep draining while waiting for the worker to finish: results
        # produced after a single early drain would otherwise never
        # reach their callbacks, and the multiprocessing documentation
        # warns that joining a process which still has undelivered
        # queue items can block.
        while proc.is_alive():
            self.drain_queue()
            proc.join(0.01)
        self.drain_queue()

    def abort(self):
        'Immediately shutdown'
        # Must match the value stored by do_multiprocessing().
        if self._proctype == 'multiprocessing':
            self._proc.terminate()
        # NOTE(review): the listing this file came from lost its
        # indentation; the exit is taken to be unconditional (a thread
        # cannot be terminated) -- confirm against the original.
        import sys
        sys.exit(0)

    def drain_queue(self, depth=None):
        'Drain queue of at most depth pending results.  All if depth=None'
        remaining = depth
        while remaining is None or remaining > 0:
            if self._inbox.empty():
                return
            cbnum, res = self._inbox.get()
            # Unregister before calling so an exception raised by the
            # callback cannot leave a stale entry behind.
            meth = self._callbacks.pop(cbnum)
            meth.callback(res)
            if remaining is not None:
                remaining -= 1
00150