# Copyright (c) 2004 DoCoMo Euro-Labs GmbH (Munich, Germany).
# Copyright (c) 2001-2004 LOGILAB S.A. (Paris, FRANCE).
#
# http://www.docomolab-euro.com/ -- mailto:tarlano@docomolab-euro.com
# http://www.logilab.fr/ -- mailto:contact@logilab.fr
#
# This library is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
# License as published by the Free Software Foundation; either
# version 2.1 of the License, or (at your option) any later version.
#
# This library is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public
# License along with this library; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
"""Narval's Communication Service implementation

:version: $Revision:$
:author: Logilab
:copyright:
  2001-2004 LOGILAB S.A. (Paris, FRANCE)

  2004 DoCoMo Euro-Labs GmbH (Munich, Germany)
:contact:
  http://www.logilab.fr/ -- mailto:contact@logilab.fr

  http://www.docomolab-euro.com/ -- mailto:tarlano@docomolab-euro.com
"""

__revision__ = "$Id: ComService.py,v 1.18 2001/11/26 11:05:33 syt Exp $"
__docformat__ = "restructuredtext en"

import sys
import Queue

from narval.services.BaseService import BaseService
from narval.communication import RPCFactory

# NOTE(review): log, log_traceback, LOG_NOTICE and LOG_ERR are used below but
# are not imported in this module; presumably they are made available
# elsewhere (e.g. injected as builtins) -- confirm.


class ComService(BaseService):
    """Communication Service for narval (observer / observed pattern)

    The communication service registers listeners for different categories
    of messages, and notifies each listener of a category when an event
    occurs in this category.

    :type listeners: dict
    :ivar listeners:
      dictionary of registered listeners, indexed by category ('plan',
      'memory' or 'engine'), each value being a list of listeners. Remote
      listener proxies are additionally stored under
      ('remote_<category>', uri) keys so they can be found again by uri.

    :type queue: Queue.Queue
    :ivar queue: pending events queue (bounded to 100 entries)
    """

    def __init__(self, name='ComService'):
        BaseService.__init__(self, name)
        self.listeners = {'plan': [], 'memory': [], 'engine': []}
        self.queue = Queue.Queue(100)

    def stop(self):
        """stop the communication service"""
        self.loop = 0
        # the processing thread blocks on self.queue.get(): push an empty
        # event so it wakes up and notices that self.loop is now false
        self.add_event(None, None, None)
        self.thread.join()
        log(LOG_NOTICE, 'service %s stopped', self.name)

    def add_event(self, category, method, args):
        """put an event on queue

        :type category: str
        :param category:
          the category of the event (one of 'plan', 'memory', 'engine'), or
          None for the empty wake-up event sent by `stop`

        :type method: str
        :param method: method name of the event

        :type args: list
        :param args: arguments given to the event's method
        """
        assert category in (None, 'plan', 'memory', 'engine'), category
        self.queue.put((category, method, args))

    def _run(self):
        """infinite events processing loop

        Pops events from the queue for as long as self.loop is true; events
        without a category or without a method name are ignored.
        """
        while self.loop:
            target, method, args = self.queue.get()
            if target and method:
                self._fire_event(target, method, args)

    def _fire_event(self, category, method, args):
        """fire an event by notifying every listener about it

        A listener raising an exception is logged and does not prevent the
        remaining listeners from being notified. If the failing listener
        carries a '_narval_uri_' entry in its __dict__ (as set by
        `add_remote_listener`), it is unregistered.

        :type category: str
        :param category:
          the category of the event (one of 'plan', 'memory', 'engine')

        :type method: str
        :param method: method name of the event

        :type args: list
        :param args: arguments given to the event's method
        """
        # Iterate over a snapshot of the list: a failing remote listener is
        # removed from self.listeners[category] below, and removing from the
        # list being iterated would make the loop skip the next listener.
        for listener in self.listeners[category][:]:
            try:
                getattr(listener, method)(*args)
            except Exception:
                # fetched through sys.exc_info() rather than bound in the
                # except clause so this parses on every Python version
                ex = sys.exc_info()[1]
                log(LOG_ERR, 'error calling %s(%s) on listener %s : %s',
                    (method, args, listener, ex))
                log_traceback(LOG_ERR, sys.exc_info())
                # try to remove buggy remote listeners
                try:
                    uri = listener.__dict__['_narval_uri_']
                except KeyError:
                    # no uri recorded: not a remote listener, keep it
                    pass
                except:
                    # deliberately broad: reading the uri is best effort and
                    # must not interrupt event processing
                    log_traceback(LOG_ERR, sys.exc_info())
                else:
                    self.remove_remote_listener(category, uri)

    # listeners (un)registration ###############################################

    def add_listener(self, category, listener):
        """add a listener for a given messages'category

        :type category: str
        :param category:
          the category of messages the listener is registering to (one of
          'plan', 'memory', 'engine')

        :type listener: narval.interfaces.I*Listener
        :param listener: the listener to add
        """
        assert category in ('plan', 'memory', 'engine')
        self.listeners[category].append(listener)

    def remove_listener(self, category, listener):
        """remove a listener from a given messages'category

        :type category: str
        :param category:
          the category of messages the listener is unregistering from (one of
          'plan', 'memory', 'engine')

        :type listener: narval.interfaces.I*Listener
        :param listener: the listener to remove

        :raise ValueError: if the listener is not registered in this category
        """
        assert category in ('plan', 'memory', 'engine')
        self.listeners[category].remove(listener)

    def add_remote_listener(self, category, uri):
        """add a remote listener for a given messages'category

        Does nothing if a remote listener is already registered for this
        category and uri.

        :type category: str
        :param category:
          the category of messages the remote listener is registering to (one
          of 'plan', 'memory', 'engine')

        :type uri: str
        :param uri: the uri of the remote listener to add
        """
        remote_cat = 'remote_%s' % category
        if (remote_cat, uri) not in self.listeners:
            listener = RPCFactory.create_proxy(uri)
            # remember the uri on the proxy so that _fire_event can
            # unregister it when a call on it fails
            listener.__dict__['_narval_uri_'] = uri
            self.listeners[(remote_cat, uri)] = listener
            self.add_listener(category, listener)
            # a newly registered 'plan' listener immediately receives a
            # fire_transition(0, 0) call
            if category == 'plan':
                listener.fire_transition(0, 0)

    def remove_remote_listener(self, category, uri):
        """remove a remote listener from a given messages'category

        :type category: str
        :param category:
          the category of messages the remote listener is unregistering from
          (one of 'plan', 'memory', 'engine')

        :type uri: str
        :param uri: the uri of the remote listener to remove

        :raise KeyError:
          if no remote listener is registered for this category and uri
        """
        remote_cat = 'remote_%s' % category
        listener = self.listeners[(remote_cat, uri)]
        self.remove_listener(category, listener)
        del self.listeners[(remote_cat, uri)]