import Pyro.core import Pyro.naming import Pyro.errors import itertools import sys,os,time import sets Pyro.config.PYRO_MOBILE_CODE=1 # Enable mobile code (for the tasks) # # The task dispatcher. # It receives a single computation Task, searches in the Pyro Namespace # for available cell Processors, and divides and distributes the Task # among the processors. # It lets them run (in parallel!) and gathers the result from each. # As soon as one found the 'solution' to the Task, the others are # signalled to abort their computation, and the result is returned # to the client program. # # NOTE2: notice that this module doesn't import or use # the actual task code implementation. It relies on Pyro's # mobile code feature to obtain the actual task data! # (which it then sends to the cell processor(s) to run). # class Dispatcher(Pyro.core.ObjBase): def __init__(self, nameServer): Pyro.config.PYRO_NS_DEFAULTGROUP = ":Distributed.Cells" Pyro.core.ObjBase.__init__(self) self.NS=nameServer # make sure that the Event Server is active: self.NS.resolve(Pyro.constants.EVENTSERVER_NAME) def process(self,task): print "received task:",task try: cells=self.NS.list(None) # search the namespace for processors print "cells:",cells cells = [ self.NS.resolve(name).getAttrProxy() for name,otype in cells ] if cells: print "splitting task" taskparts = task.split(len(cells)) print "-->",len(taskparts),"tasks" celliter = itertools.cycle(cells) # just loop over all processors print "dispatching" busy=sets.Set() # While there are still subtasks to compute, and the # solution has not been found, dispatch the next subtask # to an available cell. while taskparts and not task.getResult(): cell = celliter.next() if cell in busy: time.sleep(0.2) if cell.finished: # okay... a cell has become available again. # process its result and remove it from the busy-set task.join(cell.task) busy.remove(cell) else: continue # found an idle cell, give it a task to do busy.add(cell) subtask = taskparts.pop() # NOTE: we first submit the task to the cell using a normal # Pyro invocation. This is needed because of the mobile code feature: # the server cannot request code over a oneway invocation. # Only after the server has received the computation task, # we invoke a oneway method to actually start the cell processor. # Because is is oneway, we can continue, while the cell is running. cell._setOneway("process") cell.receivetask(subtask) cell.process() print "dispatched %s, %d remain" % (str(subtask), len(taskparts)) print "Task fully dispatched!" print "Gathering results..." # wait for and retreive the result from each cell while busy and not task.getResult(): for cell in sets.Set(busy): if cell.finished: print "got result, %d remain" % len(busy) busy.remove(cell) task.join(cell.task) if task.getResult(): break time.sleep(0.2) print "all done for task",task if task.getResult() and busy: print "There are still %d running tasks" % len(busy) for cell in busy: cell._setOneway("abort") # so that we can continue cell.abort() print "Returning final result!" return task.getResult() else: print "no cells in ns" raise RuntimeError("no computing cells found") except Pyro.errors.NamingError,x: print "No cells registered on the network",x raise RuntimeError("no computing cells found") Pyro.core.initServer() ns=Pyro.naming.NameServerLocator().getNS() try: ns.createGroup(":Distributed") except Pyro.errors.NamingError: pass try: ns.unregister(":Distributed.Dispatcher") except Pyro.errors.NamingError: pass daemon=Pyro.core.Daemon() daemon.useNameServer(ns) uri=daemon.connect(Dispatcher(ns),":Distributed.Dispatcher") print "Dispatcher ready." daemon.requestLoop()