# Copyright (c) 2000-2005 LOGILAB S.A. (Paris, FRANCE). # http://www.logilab.fr/ -- mailto:contact@logilab.fr # # Copyright (c) 2004-2005 DoCoMo Euro-Labs GmbH (Munich, Germany). # http://www.docomolab-euro.com/ -- mailto:tarlano@docomolab-euro.com # # This library is free software; you can redistribute it and/or # modify it under the terms of the GNU Lesser General Public # License as published by the Free Software Foundation; either # version 2.1 of the License, or (at your option) any later version. # # This library is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU # Lesser General Public License for more details. # # You should have received a copy of the GNU Lesser General Public # License along with this library; if not, write to the Free Software # Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA """basic actions to handle chatting :version: $Revision:$ :author: Logilab :copyright: 2000-2005 LOGILAB S.A. (Paris, FRANCE) 2004-2005 DoCoMo Euro-Labs GmbH (Munich, Germany) :contact: http://www.logilab.fr/ -- mailto:contact@logilab.fr http://www.docomolab-euro.com/ -- mailto:tarlano@docomolab-euro.com """ __revision__ = '$Id: Chat.py,v $' __docformat__ = "restructuredtext en" import re import sys from os.path import join, exists, isabs from random import choice import ail from mx.DateTime import DateTimeDelta from narval import config from narval.public import AL_NS, TYPE_NS, url_to_file, expand_vars # FIXME: this exception is not jabber specific from narval.protocol_handlers.jabber import NotInForumException from narval.interfaces.base import IData, IIMessage, ICommand from narval.interfaces.core import IStartPlan from narval.elements import create_error, create_command from narval.elements.base import DataElement, EventElement, PhoneCallElement from narval.elements.email import EmailElement DEFAULT_RULE_FILE_URL = 'file:$NARVAL_HOME/data/chatbot.ail' DEFAULT_FOAF_FILE_URL = 'file:$NARVAL_HOME/data/foaf.rdf' _BRAINS = {} def rule_file(obj=None): """return the path to the rule file and it's encoding :param obj: object adaptable to IURL :rtype: tuple(str, str) """ return url_to_file(obj, DEFAULT_RULE_FILE_URL) def foaf_file(obj=None): """return the path to the foaf file and it's encoding :param obj: object adaptable to IURL :rtype: tuple(str, str) """ return url_to_file(obj, DEFAULT_FOAF_FILE_URL) def botconfig_to_ailvars(master, activator): """ :type master: `narval.elements.chat.MasterInformationsElement`or None :param master: the element containing master's information :type activator: `narval.interfaces.base.IActivator` or None :param activator: the activator element :rtype: dict :return: a dictionary of AIL environment variables based on botconfig """ envs = [] if master is not None: envs += [ ('MYUSERNAME', master.username), ('MYUSERMAIL', master.email), ('MYUSERSIP', master.sip), ] if activator is not None: envs += [ ('MYUSER', activator.master_id(master)), ] if activator is not None: envs += [ ('BOTNAME', activator.user), ] return dict([(var, value) for var, value in envs if value is not None]) def get_ail_brain(obj, master, activator, force_reload=False): """ :type obj: `narval.element.ALElement` :param obj: memory element describing where to find AIL rules :type master: `narval.elements.chat.MasterInformationsElement` :param master: the element containing master's information :type activator: `narval.interfaces.base.IActivator` :param activator: the activator element :rtype: `ail.Brain` :return: a AIL brain object for a rule file """ datapath, encoding = rule_file(obj) try: if force_reload: raise KeyError('ail reloading') # FIXME: env_context may have changed... brain = _BRAINS[datapath] except KeyError: env_context = botconfig_to_ailvars(master, activator) brain = ail.Brain(AILBrainHandler()) brain.load([datapath], env_context) _BRAINS[datapath] = brain return brain def room_and_guest_for_conf(cmd): room, guests = cmd.args msg = cmd.from_msg server = msg.get_server() # FIXME: jabber specific if not '@' in room: # FIXME: get the conference server from the msg (which will get it # from the activator) room = '%s@conference.%s' % (room, server) guest_ids = [] for guest in guests.split(): # FIXME: check guest is not already in room if not '@' in guest: guest = '%s@%s' % (guest, server) guest_ids.append(guest) return room, guest_ids def min_format_element(elmt): if hasattr(elmt, 'get_identity'): elid = '.'.join(elmt.get_identity()) else: elid = elmt.eid return '* %s: %s' % (elmt.__class__.__name__, elid) def xml_format_element(elmt): return elmt.as_xml() + '\n' def short_help(plan, ail_brain, ctrl, _): plan_help = plan.descriptions.get('en', '') if plan_help: return '* %s:\n %s\n' % (plan.name, plan_help) return '* %s\n' % (plan.name) def long_help(plan, ail_brain, ctrl, master_id): data = [short_help(plan, ail_brain, ctrl, master_id).strip()] match_cmds = None for tr in plan.transitions(): for cond in tr.conditions: match_cmds = extract_command(cond) if match_cmds is not None: break if match_cmds is not None: break if match_cmds: data.append(' Triggered by the following command(s) / rule(s):') for cmd in match_cmds: data.append(' - %s (access given to: %s)' % (cmd, ', '.join(ctrl.access_rights(master_id, cmd)))) for rule in ail_brain.rev_think('command', cmd): data.append('\t%s' % rule) return '\n'.join(data) + '\n' def extract_command(proto): """try to get possible commands triggering a prototype entry""" for match in proto.matches: if match.strip().startswith('ICommand'): try: match_cmds = [match.split('==', 1)[1].strip()] except IndexError: try: match_cmds = match.split(' in ', 1)[1].strip()[1:-1].split(',') except IndexError: continue # unquote before returning return [cmd.strip()[1:-1] for cmd in match_cmds] class DoNothing(Exception): pass class AILBrainHandler: """the ail brain handler provides some actions available in the ail rules file """ def command(self, cmdname, *args): return create_command(cmdname, args) def random(self, *args): return create_command('response', [choice(args)]) def ignore(self, *args): raise DoNothing() # actions definitions start here ############################################## MOD_XML = ''' ''' % AL_NS def act_is_for_me(inputs): """this action will produce an error if the message is not for me (ie in a group chat with a body prefixed by a "toto:" where toto is not my login). If it's prefixed of my own nick, remove the prefix from the original message's body """ msg = IIMessage(inputs['msg']) msg.explicitly_for_me = None if msg.get_type() == 'groupchat': text = msg.get_body() m = re.match('([\w-]+):', text) if m: me = inputs['activator'].user if m.group(1) != me: error = create_error(type='not for me', msg='this message isn\'t for me...') return {'error': error} else: msg.set_body(text.split(':', 1)[1]) msg.explicitly_for_me = True else: msg.explicitly_for_me = False else: msg.explicitly_for_me = True return {} MOD_XML = MOD_XML + """ %s IIMessage(elmt).type == 'incoming' msg.activator_eid == elmt.eid """ def act_ail_extract_command(inputs): """try to extract a command from an incoming message. this action produces an optional Command element if a command has been detected, or an optional Message element if the command can't be executed """ master_infos = inputs['masterinfos'] msg = inputs['msg'] text = msg.get_body() # The below "if" is commented because this test is already done # in act_is_for_me() ## if re.match('%s\s*:' % msg.get_to(), text): ## command = text.split(':', 1)[1].strip() ## elif msg.get_type() != 'groupchat': if msg.get_type() != 'groupchat': command = text.strip() brain = get_ail_brain(inputs['ailrules'], master_infos, inputs['activator']) try: command = brain.think(text) except DoNothing: return {} except ail.NoRewriteError: command = create_command('log_categorize', ()) log(LOG_DEBUG, '%s -> %s%s', (text, command.name, command.args)) ctrl = inputs['control'] if ctrl: from_user = msg.get_from_user() if not ctrl.has_access_right(msg.master_id(master_infos), from_user, command.name, msg.is_from_groupchat()): errormsg = '%s: you don\'t have access to the %s command' % ( from_user, command.name) if msg.get_type() == 'groupchat': # We don't want msg to appear in groupchats error = create_error(errormsg, type='unauthorized command') else: error = create_error(errormsg, type='chat', from_msg=msg) return {'error': error} # attach the incoming message to the command, so plan triggered by this command command.from_msg = msg return {'cmd': command} MOD_XML = MOD_XML + """ %s IIMessage(elmt).type == 'incoming' elmt.eid == msg.activator_eid isinstance(elmt, MasterInformationsElement) isinstance(elmt, BotConfigurationElement) elmt.getattr((TYPE_NS, 'name')) == 'uri:memory:ailrules' IURL(elmt) ICommand(elmt) """ % act_ail_extract_command.__doc__ def act_reload_ail_brain(inputs): """force reloading of the ail brain""" get_ail_brain(inputs['ailrules'], inputs['masterinfos'], inputs['activator'], force_reload=True) cmd = ICommand(inputs['cmd']) msg = cmd.from_msg return {'answer': msg.build_reply('ail brain reloaded')} MOD_XML = MOD_XML + """ %s ICommand(elmt) elmt.eid == cmd.from_msg.activator_eid isinstance(elmt, MasterInformationsElement) elmt.getattr((TYPE_NS, 'name')) == 'uri:memory:ailrules' IURL(elmt) IIMessage(elmt).type == 'outgoing' """ % act_reload_ail_brain.__doc__ def act_create_conference(inputs): """create a conference room""" cmd = ICommand(inputs['cmd']) room, guests = room_and_guest_for_conf(cmd) msg = cmd.from_msg # create a presence to add ourself to the conference room presence = msg.build_presence(pto='%s/%s' % (room, msg.get_to_user())) presence.set_status('Online') # create invitations invitations = [] for guest in guests: invitations.append(msg.build_invitation(room, guest)) return {'invitations' : invitations, 'presence': presence} MOD_XML += """ create a conference room and invite participants ICommand(elmt) IIPresence(elmt).type == 'outgoing' IIMessage(elmt).type == 'outgoing' """ def act_notify_delayed_conference(inputs): """notify attendees about a conference in a given delay""" start_plan = IStartPlan(inputs['startplan']) msg = IIMessage(inputs['msg']) by = '' if msg: by = ' by %s' % msg.get_from_user() delay = start_plan.delay / 60 conf_setup_cmd = start_plan.context[0] room, guests = room_and_guest_for_conf(conf_setup_cmd) if start_plan.cancelling: text = 'conference at %s has been rescheduled in %s minutes%s (%s)' % ( room, delay, by, start_plan.eid) else: text = 'a conference at %s will be created in %s minutes%s (%s)' % ( room, delay, by, start_plan.eid) messages = [msg.build_message(text, guest) for guest in guests] return {'messages' : messages} MOD_XML += """ create a conference room and invite participants IStartPlan(elmt) IIMessage(elmt) IIMessage(elmt).type == 'outgoing' """ def act_leave_conference(inputs): cmd = ICommand(inputs['cmd']) msg = cmd.from_msg args = cmd.args try: to = args[0] or msg.get_room() except NotInForumException: error = create_error('i\'m not in a forum !', 'chat', msg) return { 'error' : error} presence = msg.build_presence('unavailable', to) reply = msg.build_reply('byebye everyone...') return {'answer': reply, 'presence': presence} MOD_XML += """ leave a forum when asked to ICommand(elmt) IIPresence(elmt).type == 'outgoing' IIMessage(elmt).type == 'outgoing' """ def act_subscribe_to_user(inputs): """subscribe to a user (i.e. add it to our roster)""" cmd = ICommand(inputs['cmd']) user = cmd.args[0] msg = cmd.from_msg if not '@' in user: user = '%s@%s' % (user, msg.get_server(conf=False)) presence = msg.build_presence('subscribe', user) return {'presence': presence, 'answer': msg.build_reply('sent subscription request to %s' % user)} MOD_XML += """ %s ICommand(elmt) IIPresence(elmt).type == 'outgoing' IIMessage(elmt).type == 'outgoing' """ + act_subscribe_to_user.__doc__ from narval.extensions.xml_template import XMLTemplateReader, Generator, NoMoreQuestion def act_start_templated_discussion(inputs): """start a discussion where narval will ask questions to get (xml) data according to a xml template """ cmd = ICommand(inputs['cmd']) msg = cmd.from_msg template = cmd.args[0] dest_file = expand_vars(cmd.args[1]) if not isabs(dest_file): dest_file = join(config.get_home(), 'data', dest_file) msg.in_discussion.dest_file = dest_file template_file = join(config.get_home(), 'data', '%s.template' % template) if not exists(template_file): return {'answer': msg.build_reply('no such template %r' % str(template))} # switch mode msg.in_discussion.mode = 'fillingtemplate' template_obj = XMLTemplateReader().from_stream(file(template_file)) msg.in_discussion.disc_generator = gen = Generator(template_obj) return {'answer': msg.build_reply(gen.next_question())} MOD_XML += """ %s ICommand(elmt) IIMessage(elmt).type == 'outgoing' """ + act_start_templated_discussion.__doc__ def act_continue_templated_discussion(inputs): """continue a discussion where narval will ask questions to get (xml) data according to a xml template. When no more questions has to be asked, xml data is written back and we switch back to the default discussion mode. """ msg = IIMessage(inputs['msg']) assert msg.in_discussion.mode == 'fillingtemplate' gen = msg.in_discussion.disc_generator # FIXME replace by cmd ? if msg.get_body() == 'cancel': msg.in_discussion.mode = 'default' return {'answer': msg.build_reply('input canceled')} try: gen.push_answer(msg.get_body()) return {'answer': msg.build_reply(gen.next_question())} except NoMoreQuestion: msg.in_discussion.mode = 'default' dest_file = msg.in_discussion.dest_file # FIXME XXX - test if file exists and either rename or prompt for new name stream = open(dest_file, 'w') stream.write(gen.as_xml()) # FIXME: write generated file return {'answer': msg.build_reply('process completed, file written to %s' % dest_file)} MOD_XML += """ %s IIMessage(elmt).type == 'incoming' IIMessage(elmt).type == 'outgoing' """ + act_continue_templated_discussion.__doc__ def act_active_help_command(inputs): """generate help for active recipes waiting for command element """ cmd = ICommand(inputs['cmd']) # FIXME: support multiple help languages ? result = DataElement() if not inputs['active-plans']: result.data = 'no plans matching %r' % cmd.args[0] return {'help': result} actives = {} for plan in inputs['active-plans']: actives[plan.name] = plan plans = actives.keys() plans.sort() if len(cmd.args): result.data = 'help for plans matching %r:\n' % cmd.args[0] format = long_help else: result.data = 'active plans:\n' result.data += '(type "help " for more information about a specific plan)\n' format = short_help brain = get_ail_brain(inputs['ailrules'], None, inputs['masterinfos']) ctrl = inputs['control'] master_id = cmd.from_msg.master_id(inputs['masterinfos']) for plan_name in plans: result.data += format(actives[plan_name], brain, ctrl, master_id) return {'help': result} MOD_XML += ''' %s ICommand(elmt) elmt.state in ("running", "fireable") and elmt.group.startswith("active") (not cmd) or (not cmd.args) or (elmt.name.startswith(cmd.args[0])) isinstance(elmt, MasterInformationsElement) isinstance(elmt, BotConfigurationElement) elmt.getattr((TYPE_NS, "name")) == "uri:memory:ailrules" IURL(elmt) IData(elmt) ''' % act_active_help_command.__doc__ def act_set_mode(inputs): """set the discussion mode""" cmd = inputs['cmd'] mode = cmd.args[0] cmd.from_msg.in_discussion.mode = mode return {'msg': cmd.from_msg.build_reply('switched to %s mode' % mode)} MOD_XML = MOD_XML + """ %s ICommand(elmt) IIMessage(elmt).type == 'outgoing' """ % act_set_mode.__doc__ def act_search_command(inputs): """ handle a message describing a search command and output appropriate search command element """ cmd = ICommand(inputs['cmd']) msg = cmd.from_msg # trick: create a Ifile with no data, so it shouldn't be take # as something to log in latter actions # FIXME: this rely on the log action's prototype... logfile = msg.in_discussion.get_log_file(inputs['discussion-log-base']) data = DataElement() data.data = ' '.join(cmd.args) data.setattr((TYPE_NS, 'name'), 'uri:memory:search-pattern') return {'search': data, 'data': logfile} MOD_XML = MOD_XML + """ Handle 'search' command : grep a log file ICommand(elmt).name elmt.getattr((TYPE_NS, 'name')) == 'uri:memory:discussion-log-base' IURL(elmt) IFile(elmt) IData(elmt) and elmt.getattr((TYPE_NS, 'name')) == 'uri:memory:search-pattern' """ def act_date_command(inputs): """handle a message describing a date (meeting) command and output appropriate date (meeting) command element """ cmd = ICommand(inputs['cmd']) try: event = EventElement() event.from_time, other = event.extract_time_from_string(' '.join(cmd.args)) # by default make a 1hour event try: event.to_time = event.from_time + DateTimeDelta(0, 1) except NameError: pass event.subject = ' '.join(other.split()) return {'event': event} except Exception, ex: log_traceback(LOG_ERR, sys.exc_info()) error = create_error('error while parsing event: %r' % ex, 'chat', cmd.from_msg) return {'error': error} MOD_XML = MOD_XML + """ Handle 'date' command: insert an event to a calendar ICommand(elmt).name == 'date' IEvent(elmt) """ def act_tell_master_command(inputs): """handle 'tell-your-master' command""" cmd = ICommand(inputs['cmd']) master_informations = inputs['masterinfos'] myuser = cmd.from_msg.master_id(master_informations) activator = inputs['activator'] msg_author = cmd.from_msg.get_from_user() msgtext = '%s said : %s' % (msg_author, cmd.args[0]) if len(cmd.args) == 2: via = cmd.args[1] else: via = master_informations.prefered_via email = None phonecall = None if via == 'im': msg = activator.create_msg(msgtext, myuser) elif via == 'email': botname = activator.user sendto = master_informations.email email = EmailElement(to = sendto, from_name = botname, from_address = "%s@narvalland.com" % botname, subject='[TELL-MASTER] message from %s' % msg_author, type='outgoing') email.body = msgtext else: # via == 'phonecall' sendto = master_informations.sip phonecall = PhoneCallElement(sip_uri=sendto, sentence=msgtext) if via != 'im': info = cmd.from_msg.build_reply('your message was forwarded to %s' % sendto) msg = info return {'msg': msg, 'email': email, 'phonecall' : phonecall} # XXX refactor to merge infos from different config elements # (activity configuration, master conf, activator, etc.) MOD_XML = MOD_XML + """ Handle 'tell-your-master' command: forward messages to master via email / im / etc. ICommand(elmt).name == 'tell_master' cmd.from_msg.activator_eid == elmt.eid isinstance(elmt, MasterInformationsElement) IIMessage(elmt).type == 'outgoing' isinstance(elmt, EmailElement) and elmt.type == 'outgoing' IPhoneCall(elmt) """ def act_rdf_extract_command(inputs): """ask another agent for his kb""" cmd = ICommand(inputs['cmd']) msg = cmd.from_msg out = msg.build_reply('rdf-export im') out.set_to('%s@%s' % (cmd.args[0], msg.get_server())) return {'answer': out} MOD_XML = MOD_XML + """ ask another agent for his kb ICommand(elmt).name == 'rdf_extract' IIMessage(elmt).type == 'outgoing' """ def act_foaf_extract_command(inputs): """ask another agent for his foaf information""" cmd = ICommand(inputs['cmd']) msg = cmd.from_msg #FIXME: use following: out = msg.build_reply("show master's foaf") #as soon as ail fixed and can handle quotes out = msg.build_reply("show master's foaf") out.set_to('%s@%s' % (cmd.args[0], msg.get_server())) return {'answer': out} MOD_XML = MOD_XML + """ Process a chat sentence to extract a command ICommand(elmt).name IIMessage(elmt).type == 'outgoing' """ ## def act_handle_composite(inputs): ## """XXX WRITE ME, and is this finished / used somewhere ? ## """ ## cmd = inputs['cmd'] ## msg = cmd.from_msg ## data = inputs.get('data', None) ## if cmd.args == ['stop']: ## return {'answer': msg.build_reply(data.data)} ## if data is None: ## data = DataElement() ## data.setattr((AL_NS, 'persist'), 'yes') ## data.data = '' ## answer = 'macro started' ## data.data += msg and msg.get_body() or '' ## return {'data' : data, ## 'answer' : msg.build_reply('step saved. carry on.')} ## MOD_XML = MOD_XML + """ ## ## Handle 'composite' - test for incremental actions ## ## ICommand(elmt).name == 'macro' ## ## ## IData(elmt) and elmt.getattr((AL_NS, 'persist')) == 'yes' ## ## ## IIMessage(elmt).type == 'outgoing' ## ## ## IData(elmt) ## ## """ def act_email2msg(inputs): """warn user about a new email""" myuser = inputs['masterinfos'].myuser assert myuser, 'no myuser defined' email = inputs['email'] activator = inputs['activator'] body = 'a new email from %s is available (%r)' % (email.from_name, email.subject) return {'msg': activator.create_msg(body, myuser)} MOD_XML = MOD_XML + """ %s isinstance(elmt, MasterInformationsElement) isinstance(elmt, JabberActivatorElement) isinstance(elmt, EmailElement) IIMessage(elmt).type == 'outgoing' """ % act_email2msg.__doc__ def act_command2msg(inputs): """create a chat answer from a command""" cmd = ICommand(inputs['cmd']) msg = cmd.from_msg if msg.is_from_groupchat(): # ignore predifined response if we are in a group chat and the # message wasn't explicitly for me if not msg.explicitly_for_me: log(LOG_DEBUG, 'dropping %s since it\'s not explicitly for me', msg) return {} prefix = '%s: ' % msg.get_from_user() else: prefix = '' return {'answer' : msg.build_reply(prefix + ' '.join(cmd.args))} MOD_XML += """ %s ICommand(elmt).name == 'response' IIMessage(elmt).type == 'outgoing' """ % act_command2msg.__doc__ def act_data2msg(inputs): """create a chat answer from a list of IData elements """ msg = IIMessage(inputs['msg']) answer = '\n'.join([IData(elmt).data for elmt in inputs['response']]) return {'answer' : msg.build_reply(answer)} MOD_XML += """ %s IIMessage(elmt).type == 'incoming' IData(elmt) IIMessage(elmt).type == 'outgoing' """ % act_data2msg.__doc__ def act_dump_elements(inputs): """inspect narval's memory""" cmd = ICommand(inputs['cmd']) msg = cmd.from_msg answer = [] if len(cmd.args) > 1 and cmd.args[1] == 'xml': format_elmt = xml_format_element else: format_elmt = min_format_element for elmt in inputs['elmts']: answer.append(format_elmt(elmt)) if not answer: answer = ['no matching elements'] return {'answer' : msg.build_reply('\n'.join(answer))} MOD_XML = MOD_XML + """ %s ICommand(elmt) eval(cmd.args[0]) IIMessage(elmt).type == 'outgoing' """ % act_dump_elements.__doc__ def act_say_hello(inputs): """ this action will simply say 'hello' (used simply by tests - will only work from start_plan) """ cmd = ICommand(inputs['cmd']) msg = cmd.from_msg return {'answer' : msg.build_reply('hello')} MOD_XML = MOD_XML + """ %s ICommand(elmt) elmt.name == 'start_plan' IIMessage(elmt).type == 'outgoing' """ % act_say_hello.__doc__ MOD_XML += ""