# Copyright (c) 2004-2005 DoCoMo Euro-Labs GmbH (Munich, Germany). # Copyright (c) 2001-2005 LOGILAB S.A. (Paris, FRANCE). # # http://www.docomolab-euro.com/ -- mailto:tarlano@docomolab-euro.com # http://www.logilab.fr/ -- mailto:contact@logilab.fr # # This library is free software; you can redistribute it and/or # modify it under the terms of the GNU Lesser General Public # License as published by the Free Software Foundation; either # version 2.1 of the License, or (at your option) any later version. # # This library is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU # Lesser General Public License for more details. # # You should have received a copy of the GNU Lesser General Public # License along with this library; if not, write to the Free Software # Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA """miscellaneous utilities for the Narval interpreter :version: $Revision:$ :author: Logilab :copyright: 2001-2005 LOGILAB S.A. (Paris, FRANCE) 2004-2005 DoCoMo Euro-Labs GmbH (Munich, Germany) :contact: http://www.logilab.fr/ -- mailto:contact@logilab.fr http://www.docomolab-euro.com/ -- mailto:tarlano@docomolab-euro.com :type REGISTRY: `Registry` :var REGISTRY: a reference to the interpreter's registry :type match_expression: bound method :var match_expression: provides a quick access to REGISTRY.match_expression """ __revision__ = '$Id: misc.py,v 1.1 2001/12/06 10:45:17 syt Exp $' __docformat__ = 'restructuredtext en' from time import localtime, mktime class Singleton(object): """a Singleton implementation, using __new__ to always return the same instance """ def __new__(cls, *args, **kwargs): try: inst = cls.__instance cls.__init__ = cls.__null_init__ except AttributeError: cls.__instance = inst = object.__new__(cls, *args, **kwargs) return inst def __null_init__(self, *args, **kwargs): """method used to override __init__ after instanciation""" pass # commented out since it's no used anymore today (syt on april 15 2005) ## class BinaryStore: ## """This class is intended to define a storage of binaries extracted from a ## web page, dictionary-like, and all the methods necessary for extraction, ## storage, referencing of those binaries ## :type dico: dict ## :ivar dico: dictionary of stored binaries indexed by their identifier ## """ ## def __init__(self, dico=None): ## self.dico = dico or {} ## self.id = 1 # 0 is reserved to tr.png img ## self.id_lock = Lock() ## def get_urls(self): ## """return the list of stored urls ## :rtype: list ## :return: the url of stored binaries ## """ ## return [url for (url, data) in self.dico.values() ] ## def get_binaries(self): ## """return the list of stored binaries ## :rtype: list ## :return: the stored binaries ## """ ## return [data for (url, data) in self.dico.values() ] ## def set_id(self): ## """create and return a new unique identifier ## :rtype: str ## :return: a new unique identifier ## """ ## self.id_lock.acquire() ## self.id += 1 ## self.id_lock.release() ## return str(self.id) ## def get_id_from_url(self, url): ## """get the unique identifier for the given url ## :type url: str ## :param url: the url of the stored binary ## :rtype: str ## :return: the id associated with the url ## """ ## # FIXME: to trash ## for key, val in self.dico.items(): ## if url == val[0]: ## return key ## def get_binary_from_id(self, uid): ## """get the binary data associated with the given unique identifier ## :type uid: str ## :param uid: the uid of the binary data ## :rtype: str or None ## :return: ## the binary data or None if there is no data associated with the ## given identifier ## """ ## try: ## return self.dico[uid][1] ## except KeyError: ## return None ## def get_binary_from_url(self, url): ## """get the binary data associated with the given url ## :type url: str ## :param url: the url of the binary data ## :rtype: str or None ## :return: ## the binary data or None if there is no data associated with the ## given url ## """ ## return self.get_binary_from_id(self.get_id_from_url(url)) ## def add_binary(self, url, data_binary=None): ## """add a binary to the store ## :type url: str ## :param url: url of the binary data ## :type data_binary: str or None ## :param data_binary: optional binary data located at the given url ## """ ## self.dico[self.set_id()] = (url, data_binary) ## def load_binary_from_id(self, uid): ## """load binary data associated with the given uid ## :type uid: str ## :param uid: the uid of the binary data ## :rtype: str ## :return: the binary data associated with the given uid ## """ ## # FIXME: cyclic dependancy ## from narval.public import normalize_url ## if self.dico[uid][1] is None: ## url = self.dico[uid][0] ## stream = urllib.urlopen(normalize_url(url)[0]) ## data_binary = stream.read() ## stream.close() ## else: ## data_binary = self.dico[uid][1] ## return data_binary ## def load_binary_from_url(self, url): ## """load binary data associated with the given url ## :type url: str ## :param url: the url of the binary data ## :rtype: str ## :return: the binary data associated with the given url ## """ ## return self.load_binary_from_id(self.get_id_from_url(url)) ## def clear(self): ## """remove all entries from the store""" ## self.dico = {} class WrapError(Exception): """calendar wrap error""" class ShallowCalendar: """simple calendar used to represent time conditions :type seconds: list :ivar seconds: list of seconds matched by the time condition :type minutes: list :ivar minutes: list of minutes matched by the time condition :type hours: list :ivar hours: list of hours matched by the time condition :type monthdays: list :ivar monthdays: list of month days matched by the time condition :type months: list :ivar months: list of months matched by the time condition :type weekdays: list :ivar weekdays: list of week days matched by the time condition :type year: int :ivar year: current year :type month_idx: int :ivar month_idx: current month index :type monthday_idx: int :ivar monthday_idx: current month day index :type hour_idx: int :ivar hour_idx: current hours index :type minute_idx: int :ivar minute_idx: current minutes index :type second_idx: int :ivar second_idx: current seconds index """ def __init__(self, (seconds, minutes, hours, monthdays, months, weekdays)): self.seconds = build_time_list(seconds, 0, 60) self.minutes = build_time_list(minutes, 0, 60) self.hours = build_time_list(hours, 0, 24) self.monthdays = build_time_list(monthdays, 1, 61) self.months = build_time_list(months, 1, 13) self.weekdays = build_time_list(weekdays, 0, 7) self.year = None self.month_idx = None self.monthday_idx = None self.hour_idx = None self.minute_idx = None self.second_idx = None self.latest_date = None self.resolution = 0 for i, time_str in enumerate((seconds, minutes, hours, monthdays, months)): if time_str != '*': self.resolution = i break def get_next_date(self, date=None): """return the next date where the time condition is matched, starting from the given date or now :type date: tuple or None :param date: date as returned by time.localtime() or None to start from now :rtype: float :return: date as a float as returned by time.time() (i.e. seconds since the Epoch) """ if date is not None or self.latest_date is None: self.latest_date = self.next_date(date or localtime(), True) else: self.latest_date = self.next_date(self.latest_date, False) return mktime(self.latest_date) def next_date(self, date, first=True): """return the next date where the time condition is matched, starting from the given date :type date: tuple :param date: date as returned by time.localtime() :rtype: float :return: next date as returned by time.localtime() """ y, m, d, h, mm, s = date[:6] self.year = y if first: find_next = self.first_find_next_in_ordered_list else: find_next = self.find_next_in_ordered_list try: self.month_idx = find_next(self.months, m, 3) except WrapError: # no next month in the list, go to next year, starting from january self.year += 1 self.month_idx = 0 try: self.monthday_idx = find_next(self.monthdays, d, 2) except WrapError: # no next month day in the list, go to next month self.next_month() self.monthday_idx = -1 self.next_day() try: self.hour_idx = find_next(self.hours, h, 1) except WrapError: # no next hour in the list, go to next day self.next_day() self.hour_idx = 0 try: self.minute_idx = find_next(self.minutes, mm, 0) except WrapError: # no next minute in the list, go to next hour self.next_hour() self.minute_idx = 0 try: self.second_idx = find_next(self.seconds, s, -1) except WrapError: # no next second in the list, go to next minute self.next_minute() self.second_idx = 0 # FIXME : handle day of week return (self.year, self.months[self.month_idx], self.monthdays[self.monthday_idx], self.hours[self.hour_idx], self.minutes[self.minute_idx], self.seconds[self.second_idx], -1,-1,-1) def next_month(self): """increment the calendar's month""" self.month_idx = self.month_idx + 1 if self.month_idx == len(self.months): self.month_idx = 0 self.year = self.year + 1 def next_day(self): """increment the calendar's day""" self.monthday_idx = self.monthday_idx + 1 monthday = self.monthdays[self.monthday_idx] if self.monthday_idx == len(self.monthdays) or \ not self._exist_day_in_month(monthday, self.months[self.month_idx]): self.monthday_idx = 0 self.next_month() if not self._exist_day_in_month(self.monthdays[self.monthday_idx], self.months[self.month_idx]): # the new month has 31 days, so no further testing is required self.next_month() def next_hour(self): """increment the calendar's hour""" self.hour_idx = self.hour_idx+1 if self.hour_idx == len(self.hours): self.hour_idx = 0 self.next_day() def next_minute(self): """increment the calendar's minute""" self.minute_idx = self.minute_idx+1 if self.minute_idx == len(self.minutes): self.minute_idx = 0 self.next_hour() def next_second(self): """increment the calendar's second""" self.second_idx = self.second_idx+1 if self.second_idx == len(self.seconds): self.second_idx = 0 self.next_minute() def first_find_next_in_ordered_list(self, olist, elt, resolution): """return the index of the element greater or equal than in the given ordered list. This method should be used for the first next date call, the `find_next_in_ordered_list` method should be used after that to avoid getting two times the same value. :type olist: list :param olist: ordered list of integer :type elt: int :param elt: reference's integer, we want the next one greater or equal in the list :type resolution: int :param resolution: current resolution value, ignored by this method :raise WrapError: if there is no greater or equal element :rtype: int :return: the index in the list of the number greater or equal than """ for i in xrange(len(olist)): if olist[i] >= elt: return i raise WrapError('Carry on!') def find_next_in_ordered_list(self, olist, elt, resolution): """return the index of the element greater or equal than in the given ordered list :type olist: list :param olist: ordered list of integer :type elt: int :param elt: reference's integer, we want the next one greater or equal in the list :type resolution: int :param resolution: current resolution value (if the resolution is equal to the calendar resolution, will return the next strictly greater element :raise WrapError: if there is no greater or equal element :rtype: int :return: the index in the list of the number greater or equal than """ if resolution == self.resolution: for i in xrange(len(olist)): if olist[i] > elt: return i else: for i in xrange(len(olist)): if olist[i] >= elt: return i raise WrapError('Carry on!') def _exist_day_in_month(self, day, month): """return true if the given day exists in the given month :type day: int :param day: day of the month :type month: int :param month: month of the year :rtype: bool :return: a flag indicating whether the day exists in the month """ if day == 31 and month not in (1, 3, 5, 7, 8, 10, 12): return False if day == 30 and month == 2: return False if day == 29 and month == 2 and self._february(self.year)==28: return False return True def _february(self, year=None): """special case to handle the february month :type year: int :param year: year for which we want to know if february as 28 or 29 days, default to the next february year :rtype: int :return: the number of days in february of the given year """ if not year: date = localtime() if date[1] > 2: year = date[0] + 1 else: year = date[0] if year % 4 or (year % 100 and not year % 400): return 28 else: return 29 def build_time_list(cron_string, minimum, maximum): """get a sorted list of time values (integer) according to the cron like string and the possible time range defined by minimum and maximum :type cron_string: str :param cron_string: cron like value defining possible value. Possible syntax are : * '*': all possible values in the range * '1': include value 1 * '1-10': include values from 1 to 10 * '1,5-10': include value 1 and values from 5 to 10 :type minimum: int :param minimum: minimal value of the range :type maximum: int :param maximum: maximal value of the range :rtype: list :return: the sorted list of possible time range """ if cron_string == '*': return range(minimum, maximum) answer = [] for val in cron_string.split(','): limit = val.split('-') if len(limit) == 1: answer.append(int(limit[0])) elif len(limit) >= 2: answer += range(int(limit[0]), int(limit[1]) + 1) else: log(LOG_ERR, 'ignoring weird value %r in range', val) answer.sort() return answer ## if __name__ == "__main__" : ## sc = ShallowCalendar(('0', '0', '*', '1,10,20,30', '*', '*')) ## print sc.get_next_date()