# Copyright (c) 2004-2005 DoCoMo Euro-Labs GmbH (Munich, Germany). # Copyright (c) 2001-2005 LOGILAB S.A. (Paris, FRANCE). # # http://www.docomolab-euro.com/ -- mailto:tarlano@docomolab-euro.com # http://www.logilab.fr/ -- mailto:contact@logilab.fr # # This library is free software; you can redistribute it and/or # modify it under the terms of the GNU Lesser General Public # License as published by the Free Software Foundation; either # version 2.1 of the License, or (at your option) any later version. # # This library is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU # Lesser General Public License for more details. # # You should have received a copy of the GNU Lesser General Public # License along with this library; if not, write to the Free Software # Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA """Engine is the Narval interpreter :version: $Revision:$ :author: Logilab :copyright: 2001-2005 LOGILAB S.A. (Paris, FRANCE) 2005 DoCoMo Euro-Labs GmbH (Munich, Germany) :contact: http://www.logilab.fr/ -- mailto:contact@logilab.fr http://www.docomolab-euro.com/ -- mailto:tarlano@docomolab-euro.com """ __revision__ = "$Id: Engine.py 20 2004-04-15 14:43:51Z syt $" __docformat__ = "restructuredtext en" import sys import os import threading from Queue import Queue, Empty from narval import bibal, config from narval.narvalrc import NarvalRC from narval.memory import Memory from narval.public import expand_vars, implements from narval.utils import ShallowCalendar from narval.reader import REGISTRY from narval.engine_interfaces import IThreadMgmt, IMemoryChangeListener, \ IPlanChangeListener, IStepChangeListener, ITransitionChangeListener, \ IDebug, IMessage, IInteraction from narval.action import ActionElement from narval.recipe import RecipeElement, Step from narval.plan import PlanElement from narval.services.EventScheduler import EventScheduler from narval.services.ComService import ComService from narval.services.ShellService import ShellService from narval.communication import RPCFactory from narval.interfaces.core import IListenOn from narval.elements.core import QuitElement, StartPlanElement def core_register(registry): """register core handlers and interfaces :param registry: the interpreter's registry :type registry: `narval.reader.Registry` """ registry.register_classes(ActionElement, RecipeElement, PlanElement) def recipe_from_action(action_name, name='??', group='??'): """make and return an anonymous recipe wrapping a single action""" recipe = RecipeElement(name=name, group=group) action_step = Step(id='1', type='action', target=action_name) recipe.add_element(action_step) return recipe class BadRecipe(Exception): pass class EngineThread(threading.Thread) : """an engine thread is a classic thread plus a reference to the engine :cvar engine: the reference to the engine :type engine: `Narval` """ engine = None def _Thread__stop(self): """overriden to notify the engine about the end of the tread""" self.engine.end_of_thread() threading.Thread._Thread__stop(self) class EngineRPCHandler: """mapping used by the RPC services to know which engine's methods are publishable :type Narval: `Narval` :cvar Narval: reference to the narval interpreter :type ComService: `ComService` :cvar ComService: reference to the communication service :type NAME: str :cvar NAME: name of the dispatcher :type EXPORTED_METHODS: dict :cvar EXPORTED_METHODS: dictionary defining methods accessible through rpc """ Narval = None ComService = None NAME = 'EngineEventDispatcher' EXPORTED_METHODS = { # listeners 'add_remote_listener': 'ComService', 'remove_remote_listener': 'ComService', # 'remove_remote_memory_listener':'ComService', # 'add_remote_engine_listener': 'ComService', # 'remove_remote_engine_listener':'ComService', # memory interface 'get_memory': 'Narval', 'get_element': 'Narval', 'add_element': 'Narval', 'remove_element': 'Narval', 'replace_element': 'Narval', 'start_plan': 'Narval', 'get_element_list': 'Narval', 'get_elements_list': 'Narval', # messaging 'post_message': 'Narval', # debug 'debug_step_one_step':'Narval', 'debug_suspend': 'Narval', 'debug_continue': 'Narval', # remote control 'save_recipes': 'Narval', 'shutdown': 'Narval', # defined in NEP 'instantiate_handler_recipe': 'Narval' } class Narval: """Narval is your pal ! Terminology: * memory: place where Narval stores all it knows of. * elements: pieces of information stored in memory * recipe: a recipe is an element that describes a sequence of steps, separated by transitions. A recipe does nothing. It is knowledge. * plan: a plan is an element. A plan is an instance of a recipe. A plan actually does something. * step: A step can point to an action or a recipe. * transition: a transition links steps in a recipe. it can be fired off when the associated condition is true. Other advantages of Narval are supposed to be: * its reflexivity: it can reason about what it is doing, as what it is doing are running plans and plans are elements in memory. it can reason about what it knows how to do, as recipes are elements in memory. It can reason about what it can do as each action is defined as a transformation (set of inputs and outputs). * it has possibilities for machine learning. because everything is recorded, and because of the above flexibility, lots of possibilities are opened for automatic learning. :type registry: `narval.reader.Registry` :ivar registry: the interpreter's registry :type alive: bool :ivar alive: indicates whether the interpreter is running :type quit_when_done: bool :ivar quit_when_done: indicates whether the interpreter should stop when there is nothing more to do :type save_mem_on_quit: bool :ivar save_mem_on_quit: indicates whether the interpreter should dump its memory to a file on quit event :type event_queue: Queue :ivar event_queue: the event queue, cumulating posted events :type thread_queue: Queue :ivar thread_queue: the tread queue, cumulating waiting threads :type max_threads: int :ivar max_threads: maximum number of simultaneous threads :type step_by_step: bool :ivar step_by_step: indicates whether the interpreter is running in step by step mode :type debug: bool :ivar debug: indicates whether the interpreter is running in debug mode :type memory: `narval.Memory.Memory` :ivar memory: interpreter's main memory :type rc: `narval.narvalrc.NarvalRC` :ivar rc: interpreter's user configuration """ __implements__ = (IMemoryChangeListener, IStepChangeListener, ITransitionChangeListener, IPlanChangeListener, IThreadMgmt, IMessage, IDebug, IInteraction) def __init__(self, rcfile): self.test_mode = False # registry self.registry = REGISTRY core_register(REGISTRY) # alive and for long self.alive = True self.quit_when_done = False self.save_mem_on_quit = False # event queue self.event_queue = Queue(0) self.ready_steps = None self.ready_plans = None # thread management self.thread_queue = Queue(0) self.max_threads = 10 EngineThread.engine = self # debug self.step_by_step = False self.step_by_step_semaphore = threading.Semaphore(0) self.debug = 0 # create memory document and elements factory self.memory = Memory(self) # create scheduler self.event_scheduler = EventScheduler(self.post_event) # services self.rpc_servers = {} self.com_service = None self.shell_service = None # new shell service (contributed by Spex66) # protocol handlers: { key_protocol: handler instance } self.protocol_handlers = {} # user config self.rc = NarvalRC(rcfile) self.encoding = self.rc['encoding'] def run(self) : """mainloop. Execute plans endlessly unless a debugger says 'wait' or an event says 'quit' """ self.event_scheduler.start() while self.alive : # done ? if self.quit_when_done and self.event_queue.empty() and \ not self.memory.are_active_plans() : self.shutdown() continue # clear lists self.ready_steps = [] self.ready_plans = [] # debug if self.step_by_step : self.step_by_step_semaphore.acquire() # wait for event try: event = self._pop_event() except Empty: continue # catch all error there to avoid killing the event thread # (in which case narval won't be able to do anything anymore...) try: self._process_event(event) except SystemExit: raise except: log(LOG_ERR, 'internal error while processing event %s', (event,)) log_traceback(LOG_ERR, sys.exc_info()) # fire transitions for plan in self.ready_plans: try: plan.run() except: log(LOG_ERR, 'internal error while running plan %s', (plan,)) log_traceback(LOG_ERR, sys.exc_info()) # run steps for step in self.ready_steps: try: step.run() except: log(LOG_ERR, 'internal error while running step %s', (plan,)) log_traceback(LOG_ERR, sys.exc_info()) def shutdown(self): """shutdown the narval interpreter""" self.post_event( ('quit',) ) def quit(self): """shutdown all running services to stop the narval interpreter""" log(LOG_NOTICE, "I'm dying: event quit in the queue ! <:~(") self.alive = False # stop services log(LOG_NOTICE, 'stopping event scheduler') self.event_scheduler.stop() log(LOG_NOTICE, 'stopping rpc servers') self.stop_rpc() log(LOG_NOTICE, 'stopping communication service') self.stop_com_service() log(LOG_NOTICE, 'stopping shell service') self.stop_shell_service() # stop protocol handlers log(LOG_NOTICE, 'stopping protocol handlers') self.stop_protocol_handlers() if threading.activeCount() > 1: thread_names = [thd.getName() for thd in threading.enumerate() if thd.getName() != 'MainThread'] log(LOG_NOTICE, 'waiting for active threads %s', ', '.join(thread_names)) # save memory to file ? if self.save_mem_on_quit : log(LOG_NOTICE, 'saving memory') self.save_memory(self.save_mem_on_quit) self._commit_memory() del self.memory home = config.get_home() log(LOG_NOTICE, 'using %s as home to remove pid file', home) pid_file = os.path.join(home, 'narval.pid') if os.path.exists(pid_file): os.remove(pid_file) sys.exit(0) def _process_event(self, event): """process an engine event :type event: tuple :param event: the event to process, where the first element is the event's type and additional elements the event's arguments """ event_type = event[0] if event_type == 'end_of_thread': self._run_thread() # start plan ('start_plan', planName,(parent_plan,parent_step),elements) elif event_type == 'start_plan': self._start_plan(event[1]) # plan_end ('planEnd', plan), step_end ('stepEnd', step) elif event_type in ('plan_end', 'step_end'): event[1].end() # add_element ('add_element', element_as_xml) elif event_type == 'add_element': self.memory.add_element_as_string(event[1]) # remove_element ('remove_element', eid) elif event_type == 'remove_element': self.memory.remove_element_by_id(event[1]) # replace_element ('replace_element', eid, element_as_xml) elif event_type == 'replace_element': self.memory.replace_element_as_string(event[1], event[2]) # forget element ('forget_element', elmt) elif event_type == 'forget_element': self.memory.remove_element(event[1]) # time condition ('time-condition', transition) elif event_type == 'time_condition': event[1].time_condition_match() # post_message ('post_message', message_as_xml) elif event_type == 'post_message': self.memory.add_message_as_string(event[1]) # start rpc elif event_type == 'start_rpc': self.start_rpc(event[1], event[2]) # load memory elif event_type == 'load_memory': self.load_memory(event[1]) # event directly mapped to functions elif event_type in ('load_recipes', 'load_modules', 'load_elements', 'load_interfaces', 'load_protocol_handlers', 'stop_rpc', 'start_shell_service', 'stop_shell_service', 'quit'): getattr(self, event_type)() # unknown else: log(LOG_ERR, "unknown event %s", (event,)) # debug #################################################################### def debug_suspend(self): """suspend execution""" self.step_by_step = True def debug_continue(self): """resume execution""" self.step_by_step = False self.step_by_step_semaphore.release() def debug_step_one_step(self): """run one execution's step :rtype: bool :return: the value of the flag indicating if we are in debug mode """ if self.step_by_step: self.step_by_step_semaphore.release() return self.step_by_step # event queue & schedule ################################################### def post_event(self, event): """add an event to the event queue :type event: tuple :param event: the event to queue """ self._fire_event('engine', 'post_queue_event', str(id(event)), str(event)) self.event_queue.put(event) def schedule_event(self, event, when=0, period=0, date=False): """schedule an event with delay or date, and optional period :type event: tuple :param event: the event to schedule :type when: int or float :param when: delay in seconds or absolute date :type period: int :param period: period in seconds, if the event should be automatically restarted every X seconds :type date: bool :param date: flag indicating the when is a delay (date == False) or absolute date using the epoch representation """ evt = (event, when, period, date) self._fire_event('engine', 'post_scheduler_event', str(id(evt)), str(evt)) self.event_scheduler.schedule_event(evt) def _pop_event(self): """pop an event to proceed. If not event is available, this method will block until an event is posted. :rtype: tuple :return: the poped event to schedule """ # we have to set a timeout and catch Empty exception in the caller # so that we get a chance to get other interruptions # this seems to be due to an implementation change in python, making # interruptions unvailaible while blocking on Queue.get event = self.event_queue.get(timeout=2) self._fire_event('engine', 'pop_queue_event', str(id(event))) return event def _fire_event(self, target, method, *args): """record event to the communication service if it's running :type target: str :param target: the event's target :type method: str :param method: the event's name :param args: arbitrary additional arguements defining the vent """ if self.com_service is not None: self.com_service.add_event(target, method, args) # threads management ####################################################### def create_thread(self, *args, **kwargs): """add a thread to the thread queue and return immediatly, even if there is already the maximum number of threads running so that the newly created (well, futur) has to wait another to finish :param args: arguments list given to the thead constructor :param kwargs: named arguments dictionary given to the thead constructor """ self.thread_queue.put( (args, kwargs) ) self._run_thread() def end_of_thread(self): """notify a thread end""" log(LOG_INFO, 'thread done') self.post_event( ('end_of_thread',) ) def _run_thread(self): """try to instantiate and start threads in queue according to the current active threads number """ log(LOG_INFO, '%s threads running...', threading.activeCount()) if threading.activeCount() < self.max_threads : try: args, kargs = self.thread_queue.get_nowait() EngineThread(*args, **kargs).start() log(LOG_INFO, 'started new thread') except Empty: pass # services ################################################################# def start_rpc(self, proto, port): """start the rpc service on , using the protocol :type proto: str :param proto: the rpc protocol identifier, may be 'xmlrpc' or 'pyro' :type port: int :param port: the rpc port """ if not self.rpc_servers.has_key(proto): try: self.start_com_service() EngineRPCHandler.Narval = self EngineRPCHandler.ComService = self.com_service self.rpc_servers[proto] = RPCFactory.create_server( port, EngineRPCHandler, proto, self.debug) self.rpc_servers[proto].start() except Exception, ex: log(LOG_WARN, 'error while starting rpc service %r: %s', (proto, ex)) log_traceback(LOG_ERR, sys.exc_info()) else: log(LOG_WARN, '%s rpc service is already started', proto) def stop_rpc(self): """stop all running rpc servers""" for proto in self.rpc_servers.keys() : self.rpc_servers[proto].stop() def start_com_service(self) : """start the communication service if it isn't already running""" if self.com_service is None: self.com_service = ComService() self.com_service.start() def stop_com_service(self) : """stop the communication service if it's currently running""" if not self.com_service is None: self.com_service.stop() def start_shell_service(self): """start the shell service if it isn't already running""" if self.shell_service is None: self.shell_service = ShellService(engine=self) self.shell_service.start() def stop_shell_service(self): """stop the shell service if it's currently running""" if not self.shell_service is None: self.shell_service.stop() # protocol handlers management ############################################ def load_protocol_handlers(self) : """load available protocol handlers""" bibal.load_protocol_handlers(self) def register_protocol_handler(self, protocol_id, handler_class): """register a protocol handler""" self.protocol_handlers[protocol_id] = handler_class(self) def stop_protocol_handlers(self): """stop running protocol handlers""" for handler in self.protocol_handlers.values() : log(LOG_INFO, 'stopping %s', handler) handler.stop() def instantiate_handler_recipe(self, recipe_name, element, protocol, context_elements=None): """checks the element against the output declared in the handler's prototype, and instantiates a plan from the recipe with the element in the plan's memory CAUTION : we should add use="yes" in input/match, where element is inserted :type recipe_name: str :param recipe_name: the name of the recipe to instantiate (.) :type element: `narval.public.ALElement` :param element: the element generated by the protocol handler :type protocol: `narval.protocol_handlers.ProtocolHandler.ProtocolHandler` :param protocol: the protocol handler object :type context_elements: iterable :param element: additional elements to put in the instantiated plan's context """ if protocol.check_output(element): start_plan = StartPlanElement(recipe=recipe_name) start_plan.context = [element] if context_elements: start_plan.context.extend(context_elements) self.start_plan(start_plan) else: log(LOG_ERR, 'Found no corresponding between one of activated ' 'handlers and element that is tried to be added') def get_prototype_helper(self, protocol_handler_class) : """instantiate the given protocol handler and return the result of the get_prototype method on the instance :type protocol_handler_class: classobj :param protocol_handler_class: the protocol handler's class :rtype: str :return: the protocol handler's prototype as XML """ return protocol_handler_class(self).get_prototype() def control_handler_activation(self, element, callback, protocol_name): """activate/deactivate the right protocol handler when an element corresponding to an handler activators appears/disappears in engine's memory :type element: `narval.public.ALElement` :param element: the element added/removed to/from the memory :type callback: str :param callback: 'activate' or 'deactivate', if the handler should be activated (activator added) or deactivated (activator removed) :type protocol_name: str :param protocol_name: the name of the protocol (jabber / http) """ try: hdlr = self.protocol_handlers[protocol_name] getattr(hdlr, callback)(element) except Exception, ex: log(LOG_ERR, 'could not activate handler: %s', ex) log_traceback(LOG_ERR, sys.exc_info()) def _find_handler(self, element, node_type, protocol_name=None): """find the handler to which element corresponds among the available protocol handlers, using their prototypes :type element: `narval.public.ALElement` :param element: the new element added to the memory :type node_type: str :param node_type: one of 'input', 'output', 'activator' according to the protocol handler's prototype that should be used :type protocol_name: str or None :param protocol_name: the name of the protocol or None if each available protocol handler should be tried :rtype: `narval.protocol_handlers.ProtocolHandler.ProtocolHandler` or None :return: the protocol handler object corresponding to the element if any """ if protocol_name is None: for key, handler in self.protocol_handlers.items(): # check if node_type (input/output/activator) matches if getattr(handler, "check_%s" % node_type)(element): return (key, handler) else: check_func = getattr(self.protocol_handlers[protocol_name], "check_%s" % node_type) if check_func(element): return (protocol_name, self.protocol_handlers[protocol_name]) return None # (Un)Load stuff in memory ################################################ def check_date(self, element): """check load date for action / recipe and reload it if necessary :type element: `narval.public.ALElement` :param element: the recipe or action element to check :rtype: `narval.public.ALElement` :return: the reloaded element or the given one """ # FIXME - reload actions broken - customized actions? how to find actions in share assert isinstance(element, ActionElement) or \ isinstance(element, RecipeElement), element.__class__ if not hasattr(element, 'load_date'): return element group, name, path = element.group, element.name, element.from_file if isinstance(element, ActionElement): callback = bibal.reload_module get_f = self.memory.get_actions else: callback = bibal.reload_recipe get_f = self.memory.get_recipes if os.stat(path)[8] > int(element.load_date): new_elmts = {} def add(elmt): """dummy bibal callback, adding elements to the dictionary""" new_elmts[elmt.name] = elmt try: element = callback(self.registry, add, group, name, path) except: log_traceback(LOG_ERR, sys.exc_info()) else: # update memory elements for this cookbook / module for elmt in [elmt for elmt in get_f() if elmt.group == group]: assert elmt.eid, 'no eid on element %s'% elmt if new_elmts[elmt.name]: self.memory.replace_element(elmt, new_elmts[elmt.name]) del new_elmts[elmt.name] else: self.memory.remove_element(elmt) self.memory.add_elements(new_elmts.values()) return element def load_recipes(self): """load recipes from the narval home to the memory""" bibal.load_recipes(self.registry, self.memory.add_element) self._checkpoint_memory() def load_modules(self): """load actions from the narval home to the memory""" bibal.load_modules(self.registry, self.memory.add_element) self._checkpoint_memory() def load_elements(self): """register elements defined in the narval home""" bibal.load_elements(self.registry) self._checkpoint_memory() def load_interfaces(self): """register elements defined in the narval home""" bibal.load_interfaces(self.registry) self._checkpoint_memory() def load_memory(self, path=None): """load the initial memory file :type path: str :param path: optional path to the memory file """ bibal.load_memory(self.registry, self.memory.add_element, path) self._checkpoint_memory() def save_memory(self, path): """dump the current memory to a file :type path: str :param path: optional path to the memory file """ path = expand_vars(path) stream = open(path, 'w') stream.write(self.memory.as_xml(self.encoding)) stream.close() def save_recipes(self): """save all recipes (cookbooks) in memory""" errors = bibal.save_recipes(self.memory, self.encoding) if errors: msg = "The following files could not be saved. Please check that \ you have write access :\n%s" % '\n'.join(errors) self.memory.add_element(self.memory.mk_error(msg)) def _checkpoint_memory(self): pass def _commit_memory(self, get_mem=1): pass # Internal events ######################################################### # take care to do stuff *before* posting events to the event queue or the # thready-boogieman will catch you by processing the event before you get # time to blink ! def memory_change(self, action, element, old_element=None) : """memory change hook, called when an element has been added, removed or replaced in memory :type action: str :param action: one of 'add', 'remove', 'replace' according to the change's type :type element: `narval.public.ALElement` :param element: the element which has changed in the memory :type old_element: `narval.public.ALElement` :param old_element: the element being replaced in case of a 'replace' action, None for all other actions """ assert action in ('add', 'remove', 'replace') eid = element.eid assert eid, 'No eid on memory element %s' % element # fire event self._fire_event('memory', '%s_element' % action, eid) # event that calls for a service (listen-on, start plan, activators...) if action == 'add' and implements(element, IListenOn): self.post_event( ('start_rpc', element.type, element.port) ) if action != 'remove': if isinstance(element, StartPlanElement): self.start_plan(element) elif isinstance(element, QuitElement): self.shutdown() if self.protocol_handlers: h_act = self._find_handler(element, 'activator') if h_act: handler = h_act[0] # got a protocol handler activator element new_action = 'activate' if action == 'remove': new_action = 'deactivate' self.control_handler_activation(element, new_action, handler) elif action == 'add': # not an activator. is it a protocol handler request element ? h_input = self._find_handler(element, 'input', None) if h_input: _, protocol_handler = h_input protocol_handler.handle_outgoing(element) def step_change(self, step): """step change hook, called when a step's status has changed :type step: `narval.plan.PlanStep` :param step: the step who's status changed """ state = step.state self._fire_event('plan', 'step_status_changed', step.plan.eid, step.id, step.state) if state == 'ready' : self.ready_steps.append(step) elif state == 'end' : self.post_event( ('step_end', step) ) def plan_change(self, plan, action='state', element=None): """plan change hook, called when plan's status has changed or when an element has been added or removed in the plan's memory :type plan: `narval.plan.PlanElement` :param plan: the changing plan :type action: str :param action: one of 'state', 'add' or 'remove' according to the change's type :type element: `narval.public.ALElement` :param element: the element which has been added or removed in the plan's memory, implying action in ('add', 'remove') """ assert action in ('state', 'add', 'remove') if action == 'state' : state = plan.state self._fire_event('plan', 'plan_status_changed', plan.eid, state) if state == 'fireable' or state == 'ready': self.ready_plans.append(plan) elif state in ('end', 'failed-end'): self.post_event( ('plan_end', plan) ) elif state in ('done', 'failed'): # schedule decay self.schedule_event(('forget_element', plan), plan.decay) else: self._fire_event('plan', 'plan_%s_element' % action, plan.eid, element.eid) def transition_change(self, transition): """transition change hook, called when a transition's status has changed :type transition: `narval.plan.PlanTransition` :param transition: the transition who's status changed """ self._fire_event('plan', 'transition_status_changed', transition.plan.eid, transition.id, transition.state) # Start plan utilities #################################################### def _start_plan(self, element): """build a plan from a recipe :type element: `StartPlanElement` :param element: the element defining the plan to start """ assert isinstance(element, StartPlanElement) # Forget the start-plan element in 60 secs if it has been added to # memory by a plan. # Currently those created by mem.start_plan are not (and this method is # called by recipe's delegate and restartable plan). # # FIXME: should all start-plan elements be added to the memory ? if element.eid and element.from_plan is not None: element.persist = False # only schedule the forget_element event if the start plan element # is no more referenced (i.e. the plan which has generated it is # already forgotten) if self.memory.references_count(element.eid) == 0: self.schedule_event(('forget_element', element), 60) # ignore cancelled start plan if element.cancelled: return try: recipe = self._get_recipe(element.recipe) except (OSError, BadRecipe), ex: error = self.memory.mk_error(str(ex), 'bad recipe') self.memory.add_element(error) else: # make plan and add it to memory plan = recipe.make_plan(element.parent_plan, element.parent_step) self.memory.add_element(plan) context = element.context_elements(self.memory) log(LOG_DEBUG, 'instantiate %s with context %s', (plan, context)) plan.add_elements(context) self._fire_event('plan', 'instanciate_recipe', plan.eid, recipe.name) plan.start() def _get_recipe(self, recipe_name): """get a recipe from a recipe or action name: if it's actually an action name, build and initialize an anonymous recipe """ msg = None try: recipe = self.memory.get_recipe(recipe_name) recipe = self.check_date(recipe) msg = recipe.check_syntax()[1] except OSError: raise except KeyError: try: self.memory.get_action(recipe_name) except KeyError: msg = 'Unknown recipe or action %s' % (recipe_name) else: recipe = recipe_from_action(recipe_name) # FIXME: it's probably better to add the recipe to memory but # if so it'll be never removed currently recipe.memory = self.memory recipe.check_syntax() if msg: raise BadRecipe(msg) return recipe # Memory manipulation (IIinteraction) ##################################### def start_plan(self, element): """post an event to build a plan from a recipe when ready :type element: `StartPlanElement` :param element: the element defining the plan to start """ event = ('start_plan', element) # set persist on start plan to avoid it to be removed before the # plan is actually started... element.persist = True if element.time: # FIXME: create this object in the element class ? date = ShallowCalendar(element.time) self.schedule_event(event, when=date, period=True, date=True) elif element.delay: self.schedule_event(event, when=element.delay, period=element.delay_period, date=False) else: self.post_event(event) def get_memory(self): """return the narval's main memory as an xml string :rtype: str :return: the main memory's content as XML """ return self.memory.as_xml('UTF-8') def get_element(self, eid): """return the element with the given identifier as an xml string, or None if no element with the given eid exists in memory :type eid: int :param eid: the identifier of the element to retreive :rtype: str or None :return: the element with the given identifier as XML if found """ element = self.memory.get_element(eid) if element: return element.as_xml('UTF-8') return None def add_element(self, element_xml): """add a new element as an xml string to the memory :type element_xml: str :param element_xml: an XML string representing the element to add """ self.post_event( ('add_element', element_xml) ) def remove_element(self, eid): """remove the element with the given eid from the memory :type eid: int :param eid: the identifier of the element to remove """ self.post_event( ('remove_element', eid) ) def replace_element(self, eid, new_element_xml): """replace the element with the given eid by the given new element (as an xml string) :type eid: int :param eid: the identifier of the element to replace :type new_element_xml: str :param new_element_xml: an XML string representing the element that will replace the old one """ self.post_event( ('replace_element', eid, new_element_xml) ) def post_message(self, message_xml) : """post a message from another agent :type message_xml: str :param message_xml: the xml string describing the message """ self.post_event( ('post_message', message_xml) ) # FIXME: horn specific def get_elements_list(self): result = [] for elmt in self.memory.get_elements(): result.append(elmt.display_data()) return result def get_element_list(self,elementId): return self.memory.get_element(elementId).display_data() def _get_list(self, element): return (element.eid, element.display_type(), element.display_group(), element.display_name())