# Copyright (c) 2004-2005 DoCoMo Euro-Labs GmbH (Munich, Germany).
# Copyright (c) 2001-2005 LOGILAB S.A. (Paris, FRANCE).
#
# http://www.docomolab-euro.com/ -- mailto:tarlano@docomolab-euro.com
# http://www.logilab.fr/ -- mailto:contact@logilab.fr
#
# This library is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
# License as published by the Free Software Foundation; either
# version 2.1 of the License, or (at your option) any later version.
#
# This library is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public
# License along with this library; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
"""classes used to handle plan elements (i.e. running recipe)
:version: $Revision:$
:author: Logilab
:copyright:
2001-2005 LOGILAB S.A. (Paris, FRANCE)
2004-2005 DoCoMo Euro-Labs GmbH (Munich, Germany)
:contact:
http://www.logilab.fr/ -- mailto:contact@logilab.fr
http://www.docomolab-euro.com/ -- mailto:tarlano@docomolab-euro.com
"""
__revision__ = "$Id: ALAbstraction.py,v 1.36 2004/04/02 10:06:46 syt Exp $"
__docformat__ = "restructuredtext en"
from copy import copy
from logilab.common import intersection, difference
from narval import NO_NS, AL_NS
from narval.reader import REGISTRY
from narval.utils import ShallowCalendar
from narval.element import NSAttribute
from narval.recipe import RecipeXMLHandler, RecipeElement, Step, Transition
from narval.serialutils import C_STEP, C_PLAN, C_PARENT, C_MEM
from narval.prototype import Condition
from narval.tags import tag_element
from narval.delegates import RecipeDelegate, ActionDelegate
class StateMixin(object):
"""a mixin for stateful classes (PlanElement / PlanStep / PlanTransition)
:type SDD: dict
:cvar SDD:
states diagram definition (each entry is a state, associated with the
list of possible destination states from it
:type state: str
:ivar state: the current state
"""
SDD = {}
def set_state(self, state):
"""go to the given state
:type state: str
:param state: the new state
:raise AssertionError:
if the state is not reachable from the current state according to
the states diagram definition
"""
# -- precondition
oldstate = self.state
if oldstate is None:
self.state = state
elif state != oldstate:
assert state in self.SDD[oldstate], \
'Bad state diagram transition in %s %s from %s to %s' % (
self.__xml_element__[1], self.readable_name(), oldstate, state)
# change state
self.state = state
# propagate change
for func in self._get_callbacks(state):
func(self)
def tagger_id(self):
"""return a unique tagger id for a step / transition
:rtype: str
:return: the tagger identifier
"""
if hasattr(self, '_taggerid'):
id = self._taggerid
else:
group, name = self.plan.get_identity()
self._taggerid = id = '.'.join((group, name, self.id))
return id
def _get_callbacks(self, state):
"""return a list of functions / methods that should be called on a
state change event
:type state: str
:param state: the new state
:raise NotImplementedError: **must be provided by concrete classes**
"""
raise NotImplementedError()
class PlanTransition(Transition, StateMixin):
"""a plan transition is a transition in a running recipe (i.e. a plan)
:type plan: `PlanElement`
:ivar plan: the plan of this transition
"""
__xml_element__ = (AL_NS, 'transition')
state = NSAttribute(NO_NS, None, str, str)
SDD = {'wait-step': ['wait-time', 'wait-condition',
'fireable', 'impossible'],
'wait-time': ['wait-condition', 'fireable', 'impossible'],
'wait-condition': ['fireable', 'impossible'],
'fireable': ['fired', 'impossible'],
'fired': [],
'impossible': []
}
def __init__(self, transition, plan):
super(PlanTransition, self).__init__()
self.clone_attrs(transition._ns_attrs)
self.time_conditions = transition.time_conditions
self.conditions = []
for cond in transition.conditions:
clone = Condition()
clone.clone_attrs(cond._ns_attrs)
# FIXME: copy ?
clone.matches = cond.matches
clone.cmatches = cond.cmatches
self.conditions.append(clone)
clone.owner = self
self.justify_table = []
self.plan = plan
self.init_conditions = False
self.prev_steps_elmts = []
self.set_state('wait-step')
def readable_name(self):
return '%s.%s' % (self.plan.readable_name(), self.id)
def fire(self):
"""fire the transition :
* add elements going through the transition to the plan
* set state as 'fired'
* prepare outgoing steps
"""
# tag elements that correspond to 'use' conditions
for condition, list in self.justify_table :
if condition.use:
for element in list:
tag_element(element, self)
# add to the plan the elements that justify the transition
self.plan.add_elements(list)
# set new state
self.set_state('fired')
# run out_steps
for step in self.out_steps:
step.prepare(self)
def get_inputs(self, input, context):
"""select elements by priority, according to the input prototype
:type input: `narval.prototype.InputEntry`
:param input: an input prototype
:rtype: list
:return:
the list of elements matching the prototype in this transition or in
elements from incoming steps
"""
cands = []
for cond, list in self.justify_table:
cands += input.match_elements(list, context)
if not cands:
cands += input.match_elements(self.prev_steps_elmts, context)
return cands
# changelisteners interfaces ###############################################
def step_change(self, step):
"""a step has changed : verify state of in-steps until the transition
may be fireable (i.e. all incoming steps are finished (but the
transition may still have to wait some [time] conditions)
:type step: `PlanStep`
:param step: the step who's changed
"""
if self.state != 'fired':
for step in self.in_steps:
state = step.state
on_error = self.on_error[step.id]
# if true, transition is impossible
if ( state in ['history', 'impossible']
or (state == 'done' and on_error)
or (state in ('error', 'failed') and not on_error) ):
self.set_state( 'impossible')
break
# if true, transition may be possible later: wait
elif ( (state != 'done' and not on_error)
or (state not in ('error', 'failed') and on_error) ):
self.set_state( 'wait-step')
break
else:
# in-steps ok, check time condition
self._evaluate_time()
def element_change(self, element):
"""an element has changed : check if the element doesn't satisfy a
condition
:type element: `narval.public.ALElement`
:param element: the element who's changed
"""
if self.init_conditions and self.state == 'wait-condition':
self._evaluate_conditions_element(element)
self._check_conditions()
def time_condition_match(self):
"""hook called when a time condition is satified : go to the
'wait-condition' state
"""
self.set_state('wait-condition')
self._evaluate_conditions()
# private ##################################################################
def _get_callbacks(self, state):
"""return a list of functions / methods that should be called on a
state change event
:type state: str
:param state: the new state
:rtype: list
:return: the list of methods that should be called on state change
"""
result = [self.plan.memory.transition_change,
self.plan.transition_change]
for step in self.out_steps + self.in_steps:
result.append(step.transition_change)
return result
def _evaluate_time(self):
"""verify that **at least one** of the time conditions holds or wait
for it. If there is no time condition, evaluate elements conditions
"""
if self.time_conditions:
dates = []
for time_condition in self.time_conditions :
dates.append(ShallowCalendar(time_condition).get_next_date())
self.set_state('wait-time')
self.plan.memory.transition_wait_time(self, min(dates))
else :
# no time condition, proceed with other conditions
self._evaluate_conditions()
def _evaluate_conditions(self):
"""check elements conditions associated with the transition when all
input steps are done
"""
# if not initialized yet, do it now
if not self.init_conditions:
self._init_conditions()
self.init_conditions = True
self._check_conditions()
def _init_conditions(self):
"""intialize the transition:
1. fetch elements from incoming steps
2. try to get valid elements for optional condition, from fetched
elements or from the plan
"""
for step in self.in_steps :
for step_outputs in step.outputs:
for elmts in step_outputs.values():
self.prev_steps_elmts += elmts
for cond in self.conditions:
# order elements by priority
if cond.from_context <= C_STEP:
cond.match_elements(self.prev_steps_elmts)
#if len(cond.matching_element):
# cond.actual_context = C_STEP
if not cond.is_satisfied():
self.plan.get_inputs(cond)
def _evaluate_conditions_element(self, element):
"""check conditions associated with the transition when all input steps
are done (_evaluate_conditions done) and when an element is added in
memory, to check wether this element is satisfying some unsatisfied
condition
:type element: `narval.public.ALElement`
:param element: the element just added to the memory
"""
# get element context
if element in self.plan.p_elements:
e_context = C_PLAN
else:
if self.plan.get_parent_level(element) == -1:
e_context = C_MEM
else:
e_context = C_PARENT
for cond in self.conditions:
# better context and match ?
if cond.from_context <= e_context <= cond.to_context:
cond.match_elements([element])
cond.actual_context = e_context
def _check_conditions(self):
"""check that all transition's conditions are satisfied, and set the
state to fireable when if every thing is ok
"""
for cond in self.conditions:
if not cond.is_satisfied():
# if to_context == C_STEP here, the condition won't ever
# be satisfied
if cond.to_context == C_STEP:
self.set_state('impossible')
else:
self.set_state('wait-condition')
break
else:
# every condition is satisfied, init table and set fireable
for cond in self.conditions:
cands = cond.matching_elmts
if len(cands) > 0:
if not cond.list:
# FIXME: this is just a quick fix, used to fix pb with
# simultaneous presence elements
cands = [cands.pop()]
self.justify_table.append((cond, cands))
self.set_state('fireable')
class PlanStep(Step, StateMixin):
"""a plan step is a step in a running recipe (i.e. a plan)
:type plan: `PlanElement`
:ivar plan: the plan of this transition
:type delegate: None or `RecipeDelegate` or `ActionDelegate`
:ivar delegate:
the `narval.delegates.Delegate` object according to the step's target
:type outputs: list
:ivar outputs: elements produced by this step
"""
__xml_element__ = (AL_NS, 'step')
state = NSAttribute(NO_NS, None, str, str)
SDD = {'todo': ['ready', 'failed', 'impossible'],
'ready': ['running'],
'running': ['end', 'failed'],
'end': ['done', 'failed', 'error'],
'done': ['history'],
'history': [],
'impossible': [],
'failed': [],
'error': []
}
def __init__(self, step, plan):
super(PlanStep, self).__init__()
self.clone_attrs(step._ns_attrs)
# FIXME: make a copy
self.prototype = copy(step.prototype)
# convert arguments string to elements list
if step.arguments:
self.arguments = REGISTRY.from_string(
'%s' % step.arguments, level=1)
else:
self.arguments = []
self.plan = plan
self.fired_transition = None
self.outputs = []
self.delegate = None
self.prototype.set_owner(self)
self.set_state('todo')
def readable_name(self):
return '%s.%s' % (self.plan.readable_name(), self.id)
# Step element methods #####################################################
def prepare(self, transition=None):
"""prepare the step
:type transition: None or `PlanTransition`
:param transition: optional transition incoming to this step
"""
self.fired_transition = transition
if not self.delegate :
self._make_delegate()
self.delegate.prepare()
def run(self):
"""execute the step"""
self.delegate.run()
def end(self):
"""end the step"""
self.delegate.end()
def get_inputs(self, input, context):
"""select elements by priority, according to the input prototype
:type input: `narval.prototype.InputEntry`
:param input: an input prototype
:rtype: list
:return:
the list of elements matching the prototype in this step's arguments,
in the incoming transition and / or in the step's plan
"""
cands = []
from_ctx, to_ctx = input.from_context, input.to_context
if self.arguments:
cands = input.match_elements(self.arguments, context)
if self.fired_transition and from_ctx <= C_STEP and to_ctx >= C_STEP:
cands += self.fired_transition.get_inputs(input, context)
if not cands and to_ctx > C_STEP:
cands = self.plan.get_inputs(input, context)
return cands
# ChangeListener interface #################################################
def transition_change(self, transition):
"""transition change hook, called when a transition's status has
changed
:type transition: `PlanTransition`
:param transition: the transition who's status changed
"""
for transition in self.out_transitions :
if transition.state == 'fired' and self.state == 'done':
self.set_state( 'history')
break
else:
if self.in_transitions :
for transition in self.in_transitions :
if transition.state != 'impossible' :
break
else:
self.set_state( 'impossible')
def plan_change(self, plan):
"""a child plan has status changed, fix our state accordingly
:type plan: `PlanElement`
:param plan: the changing plan
"""
# if child plan is end or failed, this step is end or failed
if plan != self.plan:
child_plan_state = plan.state
if child_plan_state == 'end' :
self.set_state( 'end')
elif child_plan_state == 'failed' :
self.set_state( 'failed')
# private #################################################################
def _make_delegate(self):
"""create the delegate according to the step's type"""
type = self.type
assert type in ('recipe', 'action'), 'Unknown type %s' % type
if type == 'recipe' :
self.delegate = RecipeDelegate(self)
elif type == 'action' :
self.delegate = ActionDelegate(self)
def _get_callbacks(self, state):
"""return a list of functions / methods that should be called on a
state change event
:type state: str
:param state: the new state
:rtype: tuple or list
:return: the list of methods that should be called on state change
"""
if state == 'end' :
# shortcut propagation
return (self.plan.memory.step_change,)
result = [self.plan.memory.step_change, self.plan.step_change]
for transition in self.out_transitions:
result.append(transition.step_change)
return result
INTERNAL_CLASSES = {
(AL_NS, 'step'):PlanStep,
(AL_NS, 'transition') : PlanTransition}
class PlanXMLHandler(RecipeXMLHandler):
"""XML handler for recipe elements"""
def start_element(self, name, attrs):
"""SAX callback: start a new xml node
:type name: tuple
:param name: the tag name as a tuple (uri, name)
:type attrs: dict
:param attrs:
the node's attribute values, indexed by attribute's name as a tuple
(uri, name)
"""
super(PlanXMLHandler, self).start_element(name, attrs)
if name in INTERNAL_CLASSES:
orig_elmt = self._stack.pop()
self.elmt.remove_element(orig_elmt)
new_elmt = INTERNAL_CLASSES[name](orig_elmt, self.elmt)
new_elmt.setattr((NO_NS, 'state'), attrs[(NO_NS, 'state')])
self.elmt.add_element(new_elmt)
self._stack.append(new_elmt)
class PlanElement(RecipeElement, StateMixin):
"""a plan element is a running recipe: RecipeElement + execution context
:type p_elements: list
:ivar p_elements: elements in this plan
:type start_step: `Step`
:ivar start_step: first step of the recipe
:type end_step: `Step`
:ivar end_step: last step of the recipe
:type parent_plan: `PlanElement` or None
:ivar parent_plan: reference to the parent plan if any
:type parent_step: `Step` or None
:ivar parent_step: reference to the parent step in the parent plan if any
:type transitions: dict
:ivar transitions: dictionary of transitions indexed by their state
"""
__xml_element__ = (AL_NS, 'plan')
__child_handler__ = PlanXMLHandler
state = NSAttribute(NO_NS, None, str, str)
# State diagram Definition ################################################
SDD = {'ready': ['running'],
'running': ['fireable', 'failing', 'end'],
'fireable': ['running', 'failing', 'end'],
'failing': ['failed-end'],
'failed-end': ['failed'],
'failed': [],
'end': ['done'],
'done': []
}
def __init__(self, recipe=None):
super(PlanElement, self).__init__()
self.elements = {}
# list for elements used by this plan
self.p_elements = []
#self.elements_node = []
self.parent_plan = None
self.parent_step = None
self.transition_states = {'impossible': [],
'wait-step': [], 'wait-condition': [],
'wait-time': [], 'fireable': [], 'history': []}
if recipe is not None:
self.init_from_recipe(recipe)
else:
# FIXME: what to do (plan created from memory file...)
pass
def readable_name(self):
return '%s.%s' % (self.group, self.name)
def init_from_recipe(self, recipe):
self.clone_attrs(recipe._ns_attrs)
self.descriptions = recipe.descriptions.copy()
# instantiate recipe with dynamic objects
for trans in recipe.transitions():
self.elements[trans.id] = PlanTransition(trans, self)
for step in recipe.steps():
sid = step.id
self.elements[sid] = plan_step = PlanStep(step, self)
# FIXME "id" was a string in recipe, but is now an object's
# reference in a plan /!\
if sid == recipe.start_step:
self.start_step = plan_step
if sid == recipe.end_step:
self.end_step = plan_step
# relink transitions to steps
for plan_trans in self.transitions():
trans = recipe.elements[plan_trans.id]
on_err = trans.on_error
for ref in trans.in_steps:
step = self.elements[ref.id]
step.out_transitions.append(plan_trans)
plan_trans.add_in_step(step, on_err[step.id])
for ref in trans.out_steps:
step = self.elements[ref.id]
step.in_transitions.append(plan_trans)
plan_trans.add_out_step(step)
self._active_steps = {}
self.set_state('ready')
# Plan element methods #####################################################
def set_parents(self, parent_plan, parent_step):
"""set the plan parent information
:type parent_plan: `PlanElement`
:param parent_plan: the plan from which is issued this plan
:type parent_step: `Step`
:param parent_step: the step from which is issued this plan
"""
self.parent_plan, self.parent_step = parent_plan, parent_step
def start(self):
"""start the plan (state change to running and prepare the first step)
"""
self.set_state('running')
self.start_step.prepare()
def run(self):
"""run the plan : fire fireable transitions by priority order"""
# look for transition of highest priority
transitions, priority = [], -1
for t in self.transition_states['fireable'] :
try:
p = t.priority
except ValueError:
p = 0
if p > priority:
transitions = [t]
priority = p
elif p == priority:
transitions.append(t)
assert len(transitions), "Couldn't select transition to fire"
if len(transitions) > 1:
self.memory.mk_error(
"Can't choose between several fireable transitions",
'bad recipe')
for transition in self.transition_states['fireable']:
transition.set_state('impossible')
else:
transition = transitions[0]
# fire transition
transition.fire()
self.transition_states['fireable'] = []
def end(self):
"""end the plan : set state to 'done' or 'failed' according to the
current state, and restart the plan if necessary
"""
# change state
if self.state == 'end':
self.set_state( 'done')
elif self.state == 'failed-end':
self.set_state( 'failed')
# if plan has mark 'restart': restart it !
if self.restart:
plan = '%s.%s' % (self.group, self.name)
self.memory.start_plan(plan, None, None, [])
def get_inputs(self, input, context=None):
"""select elements by priority, according to the input prototype
:type input: `narval.prototype.InputEntry`
:param input: an input prototype
:rtype: list
:return:
the list of elements matching the prototype in this plan and its
parent plans, up to the memory
"""
from_ctx = input.from_context
to_ctx = input.to_context
cands = []
if from_ctx <= C_PLAN and to_ctx >= C_PLAN:
cands = input.match_elements(self.p_elements, context)
#input.actual_context = C_PLAN
if not cands and to_ctx >= C_PARENT:
if self.parent_step and from_ctx <= C_PARENT:
cands = self.parent_step.get_inputs(input, context)
#input.actual_context = C_PARENT
elif to_ctx >= C_MEM:
cands = self.memory.get_inputs(input, context)
# filter elements generated by this plan
cands = [elmt for elmt in cands if elmt.from_plan != self.eid]
#input.actual_context = C_MEM
return cands
def get_parent_level(self, element, p_level=0):
"""get the level, starting from where the given element has
been found. -1 means that the element has been found in memory
:type element: `narval.public.ALElement`
:param element: the element to search
:type p_level: int
:param p_level:
starting level index, you should usually not give this parameter
explicitly
:rtype: int
:return: the number of parent level where the element has been found
"""
if element in self.p_elements:
return p_level
if self.parent_plan:
return self.parent_plan.get_parent_level(element, p_level+1)
# element in memory
return -1
def add_elements(self, elements) :
"""add a list of elements to the plan (if element is not already in the
context of the plan)
:type elements: list
:param elements: list of elements to add to the plan
"""
for elmt in difference(elements, self.p_elements):
eid, elmt = self.memory.add_element(elmt)
self.p_elements.append(elmt)
# propagate happy news
self.memory.plan_change(self, 'add', elmt)
# FIXME (syt): duh ? what's this for ?
if self.parent_step:
self.parent_step.outputs.append({1:elements})
def remove_elements(self, elements):
"""remove a list of elements from the plan
:type elements: list
:param elements: list of elements to remove from the plan
"""
memory = self.memory
if elements is self.p_elements:
for elmt in elements:
# propagate sad news
memory.plan_change(self, 'remove', elmt)
# erase
memory.delete_ref_to_element(elmt)
self.p_elements = []
else:
p_elements = self.p_elements
for elmt in intersection(elements, p_elements):
for i in xrange(len(p_elements)):
if p_elements[i] is elmt:
p_elements.pop(i)
# propagate sad news
memory.plan_change(self, 'remove', elmt)
# erase
memory.delete_ref_to_element(elmt)
break
else:
log(LOG_ERR, 'element with eid=%s not in plan', elmt.eid)
def cleanup(self):
"""cleanup the plan: remove all elements in the plan's context"""
self.remove_elements(self.p_elements)
# change listeners interfaces ##############################################
def transition_change(self, transition):
"""transition change hook, called when a transition's status has
changed
:type transition: `PlanTransition`
:param transition: the transition who's status changed
"""
if transition.state == 'fireable' :
self.transition_states['fireable'].append(transition)
self.set_state( 'fireable')
def step_change(self, step):
"""step change hook, called when a step's status has changed
:type step: `PlanStep`
:param step: the step who's status changed
"""
s = step.state
if s == 'running':
self._set_active(step, 'running')
elif step is self.end_step :
if s == 'done':
self.set_state('end')
elif s in ('impossible','failed','error') :
self.set_state('failing')
if s in ('done', 'failed','error'):
self._set_finished(step)
if self.state == 'failing' and not self._active_steps:
self.set_state( 'failed-end')
def element_change(self, element):
"""an element has been modified in memory, propagate information to
transitions
:type element: narval.public.ALElement
:param element: the modified element
"""
for transition in self.transitions():
transition.element_change(element)
def _get_callbacks(self, state):
"""return a list of functions / methods that should be called on a
state change event
:type state: str
:param state: the new state
:rtype: tuple
:return: the list of methods that should be called on state change
"""
if self.parent_step:
return (self.memory.plan_change, self.parent_step.plan_change)
return (self.memory.plan_change,)
def _set_active(self, step, state):
"""set a step as active (the step should not already be active)
:type step: `PlanStep`
:param step: the step to activate
:type state: str
:param state: the active state
"""
if state is not None:
self.set_state(state)
assert not self._active_steps.has_key(step)
self._active_steps[step] = 1
def _set_finished(self, step):
"""set a step as done (the step should be active)
:type step: `PlanStep`
:param step: the step to finish
"""
if self._active_steps.has_key(step):
del self._active_steps[step]
def make_plan(recipe, parent_plan, parent_step):
"""create a plan from the recipe
:rtype: `PlanElement`
:return: a new plan instance for this recipe
"""
plan = PlanElement(recipe=recipe)
plan.set_parents(parent_plan, parent_step)
return plan
# hack: add the .instantiate method on the RecipeElement class to avoid a nasty
# cyclic dependancy
RecipeElement.make_plan = make_plan