# Copyright (c) 2004 DoCoMo Euro-Labs GmbH (Munich, Germany).
# Copyright (c) 2001-2004 LOGILAB S.A. (Paris, FRANCE).
#
# http://www.docomolab-euro.com/ -- mailto:tarlano@docomolab-euro.com
# http://www.logilab.fr/ -- mailto:contact@logilab.fr
#
# This library is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
# License as published by the Free Software Foundation; either
# version 2.1 of the License, or (at your option) any later version.
#
# This library is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public
# License along with this library; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
"""EventScheduler

:version: $Revision:$
:author: Logilab
:copyright:
  2001-2004 LOGILAB S.A. (Paris, FRANCE)

  2004 DoCoMo Euro-Labs GmbH (Munich, Germany)
:contact:
  http://www.logilab.fr/ -- mailto:contact@logilab.fr

  http://www.docomolab-euro.com/ -- mailto:tarlano@docomolab-euro.com
"""

__revision__ = "$Id: EventScheduler.py,v 1.9 2001/12/07 16:01:33 syt Exp $"
__docformat__ = "restructuredtext en"

import time
from bisect import insort
from Queue import Queue, Empty, Full

from narval.services.BaseService import BaseService
from narval.utils import ShallowCalendar


class EventScheduler(BaseService):
    """the event scheduler is used to handle time conditions

    Events are posted by other threads through `schedule_event` into a
    bounded incoming queue; the service loop (`_run`) moves them into a
    list sorted by firing date and calls the callback when they are due.

    :type queue_lenght: int
    :cvar queue_lenght: length (maximum size) of the incoming event queue

    :type max_delay: int
    :cvar max_delay: maximum sleep delay in the processing loop

    :type timefunc: function
    :cvar timefunc: function used to get date / time

    :type delayfunc: function
    :cvar delayfunc: function used to wait some time

    :type in_queue: Queue.Queue
    :ivar in_queue: pending (not yet sorted) event queue

    :type sched_list: list
    :ivar sched_list:
      scheduled events, kept sorted, as tuples
      (absdate, priority, (event, date, period))

    :type event_cb: function
    :ivar event_cb: callback used to notify that an event is ready
    """

    # NOTE: the attribute name keeps its historical spelling ("lenght")
    # since it is part of the class interface.
    queue_lenght = 200
    max_delay = 1
    timefunc = time.time
    delayfunc = time.sleep

    def __init__(self, event_callback):
        """
        :type event_callback: function
        :param event_callback:
          callable invoked with the event each time an event is ready
        """
        BaseService.__init__(self, 'EventScheduler')
        self.event_cb = event_callback
        self.in_queue = Queue(self.queue_lenght)
        self.sched_list = []

    def stop(self):
        """stop the event schedule service"""
        # NOTE(review): `loop` and `thread` are not set in this class,
        # presumably they are managed by BaseService -- confirm.
        self.loop = 0
        # the service thread may be blocked on self.in_queue.get(): post a
        # dummy event to wake it up so it can notice the cleared flag
        try:
            self.schedule_event((None, 0, 0, False))
        except Full:
            # a full queue means the loop is not blocked waiting for input,
            # so no wake-up is needed; do not let Full prevent the join
            pass
        self.thread.join()
        # NOTE(review): `log` / `LOG_NOTICE` are not imported in this module,
        # presumably they are made available globally elsewhere -- confirm.
        log(LOG_NOTICE, 'EventScheduler stopped')

    def schedule_event(self, event):
        """add an event to schedule

        The tuple is put on the incoming queue without blocking, so
        Queue.Full is raised when the queue is already full.

        :type event: tuple
        :param event:
          (event, when, period, absdate) where `absdate` is a flag telling
          whether `when` is an absolute date (true) or a delay (false)
        """
        self.in_queue.put_nowait(event)

    def _event_ready(self, event, date, period):
        """called when an event is ready:

        * call the callback given to the constructor to notify an event is
          ready
        * if the event has a period, reschedule it

        :param event: the event, given as is to the callback

        :type date: float or `ShallowCalendar`
        :param date:
          the `when` value given when the event was scheduled; a
          `ShallowCalendar` is asked for its next date when rescheduling

        :type period: int or None
        :param period: optional period of the event
        """
        self.event_cb(event)
        if period:
            if isinstance(date, ShallowCalendar):
                self._enter_absolute_event(date.get_next_date(), 5,
                                           (event, date, period))
            else:
                self._enter_delayed_event(period, 5, (event, date, period))

    def _run(self):
        """service's main loop, test for fireable events at each iteration"""
        while self.loop:
            # may block until an event is posted when nothing is scheduled
            self._process_in_queue()
            if not self.loop:
                # stop() was requested while we were waiting: the event that
                # woke us up is only a dummy one, do not fire anything
                break
            # _process_in_queue guarantees the list is not empty here
            absdate, priority, (event, date, period) = self.sched_list[0]
            now = self.timefunc()
            if now < absdate:
                # never sleep more than max_delay so that new incoming
                # events and stop requests are noticed quickly
                self.delayfunc(min(absdate - now, self.max_delay))
            else:
                del self.sched_list[0]
                self._event_ready(event, date, period)
                # let other threads run
                self.delayfunc(0)

    def _process_in_queue(self):
        """process events in the incoming queue while there are some events
        to proceed
        """
        # if there are some event in the sched list, don't block on the
        # incoming queue, just add pending events to the sched list
        if self.sched_list:
            try:
                while True:
                    event, when, period, absdate = self.in_queue.get_nowait()
                    self._enter_event(event, when, period, absdate)
            except Empty:
                pass
        else:
            # nothing scheduled: wait until an event is posted
            event, when, period, absdate = self.in_queue.get()
            self._enter_event(event, when, period, absdate)

    def _enter_event(self, event, when, period, absdate):
        """enter an event taken from the incoming queue in the sched list

        :param event: the event to schedule

        :type when: float or `ShallowCalendar`
        :param when:
          absolute date of the event if `absdate` is true (the next date of
          the calendar is used for a `ShallowCalendar`), else a delay from
          now

        :type period: int or None
        :param period: optional period of the event

        :type absdate: bool
        :param absdate: flag telling if `when` is an absolute date
        """
        # keep the original value: it is needed to reschedule the event
        date = when
        if absdate:
            if isinstance(when, ShallowCalendar):
                when = date.get_next_date()
            self._enter_absolute_event(when, 5, (event, date, period))
        else:
            self._enter_delayed_event(when, 5, (event, date, period))

    def _enter_absolute_event(self, absdate, priority, args):
        """enter an absolute event. The element is inserted according to its
        firing date in the sched_list

        :type absdate: float
        :param absdate: time of the event, as returned by `timefunc`

        :type priority: int
        :param priority: event's priority

        :type args: list or tuple
        :param args:
          arguments to give to the _event_ready method, i.e.
          (event, date, period)

        :rtype: tuple
        :return: the entered event as a tuple (date, priority, arguments)
        """
        event, date, period = args
        dated_event = absdate, priority, (event, date, period)
        insort(self.sched_list, dated_event)
        return dated_event

    def _enter_delayed_event(self, delay, priority, args):
        """enter a delayed event

        :type delay: int
        :param delay: delay in second of the event, until now

        :type priority: int
        :param priority: event's priority

        :type args: list or tuple
        :param args:
          arguments to give to the _event_ready method, i.e.
          (event, date, period)

        :rtype: tuple
        :return: the entered event as a tuple (date, priority, arguments)
        """
        return self._enter_absolute_event(self.timefunc() + delay,
                                          priority, args)