# Copyright (c) 2004-2005 DoCoMo Euro-Labs GmbH (Munich, Germany). # Copyright (c) 2001-2005 LOGILAB S.A. (Paris, FRANCE). # # http://www.docomolab-euro.com/ -- mailto:tarlano@docomolab-euro.com # http://www.logilab.fr/ -- mailto:contact@logilab.fr # # This library is free software; you can redistribute it and/or # modify it under the terms of the GNU Lesser General Public # License as published by the Free Software Foundation; either # version 2.1 of the License, or (at your option) any later version. # # This library is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU # Lesser General Public License for more details. # # You should have received a copy of the GNU Lesser General Public # License along with this library; if not, write to the Free Software # Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA """classes used to handle recipe elements :version: $Revision:$ :author: Logilab :copyright: 2001-2005 LOGILAB S.A. (Paris, FRANCE) 2004-2005 DoCoMo Euro-Labs GmbH (Munich, Germany) :contact: http://www.logilab.fr/ -- mailto:contact@logilab.fr http://www.docomolab-euro.com/ -- mailto:tarlano@docomolab-euro.com """ __revision__ = "$Id: ALAbstraction.py,v 1.36 2004/04/02 10:06:46 syt Exp $" __docformat__ = "restructuredtext en" from narval.public import AL_NS, NO_NS from narval.element import NSAttribute, NSAttributesElement, \ DescriptionableMixin from narval.xml_handlers import BaseXMLHandler, DescriptionHandler from narval.serialutils import yn_value, yn_rev_value from narval.action import ALRecipeItem from narval.prototype import PrototypeHandler, Prototype, Condition, \ PrototypeException class Transition(NSAttributesElement, DescriptionableMixin): """recipe transition element (static) :type id: str :ivar id: the transition's identifier, must be unique within a recipe :type priority: int :ivar priority: the transition's priority :type in_steps: list :ivar in_steps: list of references to steps incoming to this transition :type to_steps: list :ivar to_steps: list of references to steps outgoing from this transition :type on_error: dict :ivar on_error: dictionary where keys are the identifier of error inputs :type time_condition: list :ivar time_condition: list of time conditions on the transition, if any :type conditions: list :ivar conditions: list of `narval.prototype.Condition` objects to satisfy before being fireable """ __xml_element__ = (AL_NS, 'transition') id = NSAttribute(NO_NS, None, str, str) priority = NSAttribute(NO_NS, 0, int, str) def __init__(self, **kwargs): super(Transition, self).__init__(**kwargs) self.in_steps, self.out_steps = [], [] self.on_error = {} self.time_conditions = [] self.conditions = [] def children_as_xml(self, encoding='UTF-8'): """return the xml representation of the transition's children elements :type encoding: str :param encoding: the encoding to use in the returned string, default to UTF-8 :rtype: str :return: the transition as an XML document """ result = [] descr_xml = self.description_as_xml(encoding) descr_xml and result.append(descr_xml) for step in self.in_steps: result.append('' % ( step.id, yn_rev_value(self.on_error[step.id]))) for step in self.out_steps: result.append('' % step.id) for condition in self.conditions: result.append(condition.as_xml(encoding)) for cond in self.time_conditions: result.append('' % cond) return '\n'.join(result) def add_condition(self, condition): """add a condition on the transition :type condition: `narval.prototype.Condition` :param condition: Condition instance to add """ self.conditions.append(condition) def add_in_step(self, step, on_error=False): """add an input step :type step: `Step` :param step: step to add to transition's inputs :type on_error: str :param on_error: one of 'yes' or 'no' indicating whether the input is triggered on error """ assert isinstance(on_error, bool) step_id = step.id self.on_error[step_id] = on_error self.in_steps.append(step) def add_out_step(self, step): """add an output step :type step: `Step` :param step: step to add to transition's outputs """ self.out_steps.append(step) def remove_in_step(self, step): """remove an input step :type step: `Step` :param step: step to remove from transition's inputs """ self.in_steps.remove(step) del self.on_error[step.id] def remove_out_step(self, step): """remove an output step :type step: `Step` :param step: step to remove from transition's outputs """ self.out_steps.remove(step) class Step(NSAttributesElement, DescriptionableMixin): """recipe step element (static) :type id: str :ivar id: the step's identifier, must be unique within a recipe :type type: str :ivar type: the step's type, 'recipe' or 'action' :type target: str :ivar target: the step's target name, . :type foreach: str :ivar foreach: optional identifier of an input on which the step should be repeated :type label: str :ivar label: label of the step :type in_transitions: list :ivar in_transitions: incoming transitions (`Transition`) :type out_transitions: list :ivar out_transitions: outgoing transitions (`Transition`) :type arguments: str or list :ivar arguments: step arguments as an XML snippet or as a list of elements (`narval.public.ALElement`) :type prototype: `narval.prototype.Prototype` :ivar prototype: step's prototype """ __xml_element__ = (AL_NS, 'step') id = NSAttribute(NO_NS, None, str, str) type = NSAttribute(NO_NS, None, str, str) priority = NSAttribute(NO_NS, 0, int, str) target = NSAttribute(NO_NS, None, str, str) foreach = NSAttribute(NO_NS, None, str, str) def __init__(self, label='', arguments='', **kwargs): super(Step, self).__init__(**kwargs) # FIXME: label should be supported for multiple languages self.label = u'' self.arguments = u'' self.in_transitions, self.out_transitions = [], [] self.prototype = Prototype() def children_as_xml(self, encoding='UTF-8'): """return the xml representation of the step :type encoding: str :param encoding: the encoding to use in the returned string, default to UTF-8 :rtype: str :return: the step as an XML document """ result = [] if self.label: label = '%s' % self.label.encode(encoding) result.append(label) descr_xml = self.description_as_xml(encoding) descr_xml and result.append(descr_xml) proto_xml = self.prototype.as_xml(encoding) proto_xml and result.append(proto_xml) if self.arguments: result.append('') if isinstance(self.arguments, unicode): result.append(self.arguments.encode(encoding)) else: for elt in self.arguments: result.append(elt.as_xml(encoding)) result.append('') return '\n'.join(result) class RecipeXMLHandler(BaseXMLHandler): """XML handler for recipe elements""" tr_class = Transition st_class = Step def __init__(self, elmt, ns_context, locator): super(RecipeXMLHandler, self).__init__(elmt, ns_context, locator) self._stack = [self.elmt] self._sub_hdlr = None def start_element(self, name, attrs): """SAX callback: start a new xml node :type name: tuple :param name: the tag name as a tuple (uri, name) :type attrs: dict :param attrs: the node's attribute values, indexed by attribute's name as a tuple (uri, name) """ prefix, local = name if self._stack[-1] == 'arguments': # namespaces handling qname, ns = self.get_qname(name) result = [ns] for a_name, value in attrs.items(): a_qname, ns = self.get_qname(a_name) if not ns in result: result.append(ns) result.append('%s="%s"' % (a_qname, value)) self._stack[-2].arguments = '%s<%s%s>' % (self._stack[-2].arguments, qname, ' '.join(result)) return if prefix != AL_NS: log(LOG_ERR, 'unknown prefix %s in action prototype, tag %s ignored', (prefix, local)) return # prototype if local in ('input', 'output', 'condition'): self._sub_hdlr = PrototypeHandler(self._stack[-1], self._ns_context, self._locator) if self._sub_hdlr: self._sub_hdlr.start_element(name, attrs) elif local == 'description': self._sub_hdlr = DescriptionHandler(self._stack[-1], attrs.get((NO_NS, 'lang'), 'en')) elif local in('arguments', 'label', 'match'): self._stack.append(local) elif local == 'step': elmt = self.st_class() elmt.init_attrs(attrs) self.elmt.add_element(elmt) self._stack.append(elmt) elif local == 'transition': elmt = self.tr_class() elmt.init_attrs(attrs) self.elmt.add_element(elmt) self._stack.append(elmt) elif local == 'in': trans = self._stack[-1] trans.add_in_step(self.elmt.elements[attrs[(NO_NS, 'idref')]], yn_value(attrs.get((NO_NS, 'onError'), 'no'))) elif local == 'out': trans = self._stack[-1] try: trans.add_out_step(self.elmt.elements[attrs[(NO_NS, 'idref')]]) except KeyError: raise Exception('Bad recipe %s: no such step %s (defined: %s' % (self.elmt.name, attrs[(NO_NS, 'idref')], self.elmt.elements.keys())) elif local == 'condition': cond = Condition() cond.init_attrs(attrs) self._stack[-1].conditions.append(cond) self._stack.append(cond) elif local == 'time': self._stack[-1].time_conditions.append( (attrs.get((NO_NS, 'seconds'), '*'), attrs.get((NO_NS, 'minutes'), '*'), attrs.get((NO_NS, 'hours'), '*'), attrs.get((NO_NS, 'monthdays'), '*'), attrs.get((NO_NS, 'months'), '*'), attrs.get((NO_NS, 'weekdays'), '*'))) else: log(LOG_ERR, 'Ignoring tag %s', local) def end_element(self, name): """SAX callback: close a xml node :type name: tuple :param name: the tag name as a tuple (uri, name) """ prefix, local = name if self._stack[-1] == 'arguments' and not ( prefix == AL_NS and local == 'arguments'): self._stack[-2].arguments += '' % (self.get_qname(name)[0]) elif prefix != AL_NS: return if self._sub_hdlr: try: self._sub_hdlr.end_element(name) except AttributeError: # description handler self._sub_hdlr = None elif local in ('step', 'transition', 'condition', 'match', 'arguments', 'label'): self._stack.pop() if local in ('input', 'output', 'condition'): self._sub_hdlr = None def characters(self, content): """SAX callback: get some (non empty) string :type content: unicode :param content: the non empty string to hold """ content = content.strip() if not content: return if self._sub_hdlr: self._sub_hdlr.characters(content) elif self._stack[-1] == 'arguments': self._stack[-2].arguments += content.replace('<', '<') elif self._stack[-1] == 'label': self._stack[-2].label += content else: log(LOG_ERR, 'Duh !!??? %s', self._stack[-1]) def get_qname(self, name): """get the qualified name for a tag name :type name: tuple :param name: the tag name as a tuple (uri, name) :rtype: tuple :return: the qualified name : and the formatted xml name space declaration """ if not name[0]: return name[1], '' else: for uri, prefix in self._ns_context.items(): if uri == name[0]: return '%s:%s' % (prefix, name[1]), \ ' xmlns:%s="%s"' % (prefix, uri) raise Exception('Unknown prefix for namespace %s', name[0]) class RecipeElement(ALRecipeItem): """a recipe element : a graph of steps and transitions :type name: str :ivar name: the recipe's name :type group: str :ivar group: the recipe's group (i.e. cookbook) :type restart: bool :ivar restart: flag indicating whether the plan should be automatically restarted when it's finished :type decay: int :ivar decay: number of seconds until the plan is forgotten after its end :type elements: dict :ivar elements: dictionary of steps and transitions in the recipe, indexed by their identifier :type memory: `narval.memory.Memory` or None :ivar memory: a reference to the interpreter's memory, set when the element is added to memory """ __xml_element__ = (AL_NS, 'recipe') __child_handler__ = RecipeXMLHandler restart = NSAttribute(NO_NS, False, yn_value, yn_rev_value) # default DECAY set to 90 seconds, because we use too much RAM decay = NSAttribute(NO_NS, 90, int, str) def __init__(self, **kwargs): super(RecipeElement, self).__init__(**kwargs) # recipe's steps and transitions indexed by their identifier self.elements = {} self.check, self.errors = None, '' self.memory = None self.start_step, self.end_step = None, None def children_as_xml(self, encoding='UTF-8'): """return the xml representation of the recipe :type encoding: str :param encoding: the encoding to use in the returned string, default to UTF-8 :rtype: str :return: the recipe as an XML document """ result = [] descr_xml = self.description_as_xml(encoding) descr_xml and result.append(descr_xml) # first serialize steps for elmt in self.elements.values(): if isinstance(elmt, Step): result.append(elmt.as_xml(encoding, namespaces_def=False)) # then serialize transitions for elmt in self.elements.values(): if not isinstance(elmt, Step): result.append(elmt.as_xml(encoding, namespaces_def=False)) return '\n'.join(result) def transitions(self): """return an iterator on transition instances in this recipe""" for s_or_t in self.elements.values(): if isinstance(s_or_t, Transition): yield s_or_t def steps(self): """return an iterator on step instances in this recipe""" for s_or_t in self.elements.values(): if isinstance(s_or_t, Step): yield s_or_t def check_syntax(self) : """check the recipe's validity :rtype: tuple :return: a 2-uple (status, error) where status is 'bad' or 'ok'. If status is 'bad', error is a string describing detected errors """ # if that recipe was checked, return result if self.check is not None: return self.check, self.errors log(LOG_INFO, 'check syntax for recipe %s.%s at %s', (self.group, self.name, id(self))) errors = [] # FIXME: check prototype matchs syntax (SyntaxError, NameError...) # FIXME: detect cycles # make lists of steps / transitions steps_io = {} for step in self.steps(): steps_io[step.id] = ([], []) if step.type == 'recipe' : try: self.memory.get_recipe(step.target) except KeyError: errors.append('* bad recipe reference %s in step %s' % (step.target, step.id)) elif step.type == 'action' : # check action exists try: action = self.memory.get_action(step.target) except KeyError: errors.append('* bad action reference %s for step %s' % (step.target, step.id)) else: # check prototypes try: step.prototype.check(action) except PrototypeException, ex : errors.append('* prototype error for step %s: %s' % (step.id, str(ex))) else : msg = '* missing "type" attribute for step %s' % step.id errors.append(msg) # check that transitions are pointing to existing steps for transition in self.transitions() : #inout = transition.in_steps + transition.out_steps for ref in transition.in_steps: ref = ref.id if not steps_io.has_key(ref): errors.append('* bad step reference %s in transition %s' % ( ref, transition.id)) else: steps_io[ref][1].append(transition.id) for ref in transition.out_steps: ref = ref.id if not steps_io.has_key(ref): errors.append('* bad step reference %s in transition %s' % ( ref, transition.id)) else: steps_io[ref][0].append(transition.id) # check start step and end step start_cands, end_cands = [], [] for sid in steps_io.keys(): if len(steps_io[sid][0]) == 0: start_cands.append(sid) if len(steps_io[sid][1]) == 0: end_cands.append(sid) if len(start_cands) != 1: msg = '* Unable to find start step (candidats:%s)' % start_cands errors.append(msg) else: self.start_step = start_cands[0] if len(end_cands) != 1: msg = '* Unable to find end step (candidats:%s)' % end_cands errors.append(msg) else: self.end_step = end_cands[0] if errors: self.check = 'bad' errors.insert(0, 'Bad recipe %s:' % self.name) self.errors = '\n'.join(errors) else: self.check = 'ok' return self.check, self.errors # Edit interface ########################################################## def add_element(self, element): """add a step or a transition to the recipe :type element: `narval.public.ALElement` :param element: the element (step or transition) to add to the recipe :rtype: str :return: the identifier of the added element """ if self.elements.has_key(element.id): raise Exception('duplicated element id %s in recipe %s.%s' % ( element.id, self.group, self.name)) if not element.id: raise Exception('element with no id %s' % element) self.elements[element.id] = element return element.id def remove_element(self, element): """remove a step or a transition from the recipe :type element: `narval.public.ALElement` :param element: the element (step or transition) to remove from the recipe """ del self.elements[element.id]