# Copyright (c) 2004-2005 LOGILAB S.A. (Paris, FRANCE).
# http://www.logilab.fr/ -- mailto:contact@logilab.fr
#
# Copyright (c) 2004-2005 DoCoMo Euro-Labs GmbH (Munich, Germany).
# http://www.docomolab-euro.com/ -- mailto:tarlano@docomolab-euro.com
#
# This library is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
# License as published by the Free Software Foundation; either
# version 2.1 of the License, or (at your option) any later version.
#
# This library is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public
# License along with this library; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
"""Some RDF related actions

:version: $Revision:$
:author: Logilab
:copyright:
  2004-2005 LOGILAB S.A. (Paris, FRANCE)
  2004-2005 DoCoMo Euro-Labs GmbH (Munich, Germany)
:contact:
  http://www.logilab.fr/ -- mailto:contact@logilab.fr
  http://www.docomolab-euro.com/ -- mailto:tarlano@docomolab-euro.com
"""

__revision__ = '$Id: PyLogDB $'
__docformat__ = "restructuredtext en"

import re
import sys
from urllib import urlopen
from cStringIO import StringIO
from xml.sax import ContentHandler, make_parser
from xml.sax.xmlreader import InputSource

from narval.public import AL_NS, url_to_file
from narval.elements import create_error
from narval.elements.base import FileElement
from narval.elements.rdf import RDFStatementElement, RDFRuleElement
from narval.interfaces.base import IIMessage


class IncorrectRuleDefinitionException(Exception):
    """raised by `text_to_rule` when a statement is not made of 3 words"""


def _split_qname(qname):
    """split a 'prefix:local' XML name into a (prefix, local) tuple

    A name without prefix yields an empty prefix, so that documents using
    a default namespace do not make the handler fail on tuple unpacking.
    """
    parts = qname.split(':', 1)
    if len(parts) == 1:
        return '', parts[0]
    return parts[0], parts[1]


class RDFHandler(ContentHandler):
    """a xml handler to create RDFStatementElements from a XML-RDF file

    The statements built while parsing are accumulated in `self.elements`.
    """

    def __init__(self):
        ContentHandler.__init__(self)
        # statements produced so far
        self.elements = []
        # name spaces mapping
        self._ns_contexts = [{}] # contains uri -> prefix dicts
        self._current_context = self._ns_contexts[-1]
        # value of the rdf:about attribute of the last Description element
        self.subject = ''
        # text of the foaf:nick element met outside of a foaf:knows element
        self.final_subject = ''
        # true from the opening of a foaf:Person element until a
        # foaf:Person element is closed outside of a foaf:knows element
        self.process_person = False
        # true while inside a foaf:knows element
        self.knows_mode = False
        # statements waiting for the end of the foaf:Person element
        self.person_elements = []
        # statements waiting for the end of the foaf:knows element
        self.knows_elements = []
        # nick found for the person described inside foaf:knows
        self.knows_nick = ''
        # accumulated text / rdf:resource value of the current element and
        # local name of the last "regular" element. Initialized here so that
        # endElement() never reads a missing attribute (e.g. on an empty
        # rdf:Description element).
        self.object = ''
        self.predicate = ''

    def startElement(self, head, attrs):
        """handle an opening tag

        :param head: qualified name of the element ('prefix:name')
        :param attrs: SAX attributes of the element
        """
        # FIXME - Should use sax2 interface with NS support enabled
        rdf, name = _split_qname(head)
        self.object = ''
        if head == 'foaf:Person':
            self.process_person = True
        elif head == 'foaf:knows':
            self.knows_mode = True
        # NOTE(review): the nesting of the following blocks has been
        # reconstructed from the semantics of the code - confirm against the
        # reference version of this file
        if rdf in ('foaf', 'rdfs') and 'rdf:resource' in attrs:
            self.object = attrs['rdf:resource']
            if self.knows_mode and name == 'seeAlso':
                # the resource is used as the url of another foaf file
                knows_nick = get_nick_from_url(self.object)
                if knows_nick:
                    self.knows_nick = knows_nick
                    self.person_elements.append(
                        RDFStatementElement(self.final_subject, 'knows',
                                            knows_nick))
        if name == "Description":
            self.subject = attrs['rdf:about']
        elif name == "RDF":
            pass
        else:
            self.predicate = name

    def endElement(self, head):
        """handle a closing tag, creating statements when relevant

        :param head: qualified name of the element ('prefix:name')
        """
        rdf = _split_qname(head)[0]
        if head == 'foaf:Person' and not self.knows_mode:
            if self.final_subject:
                # the nick is now known: fix the subject of buffered
                # statements and publish them
                for element in self.person_elements:
                    element.subject = self.final_subject
                    self.elements.append(element)
            # NOTE(review): assumed to be reset whether a nick was found or
            # not - confirm
            self.process_person = False
        if head == 'foaf:nick':
            if self.knows_mode:
                self.knows_nick = self.object
            else:
                self.final_subject = self.object
        if rdf == 'foaf' and self.process_person:
            if self.knows_mode:
                # TODO - get eid (nick) from url?
                if not self.knows_nick:
                    # subject unknown yet, it is set when foaf:knows closes
                    self.knows_elements.append(
                        RDFStatementElement('', self.predicate,
                                            self.knows_nick or self.object))
                else:
                    self.predicate = self.predicate.replace('seeAlso',
                                                            'isDescribedIn')
                    self.elements.append(
                        RDFStatementElement(self.knows_nick, self.predicate,
                                            self.object))
            else:
                self.person_elements.append(
                    RDFStatementElement(self.subject, self.predicate,
                                        self.object))
        elif head == "rdf:Description" and not self.process_person:
            self.elements.append(
                RDFStatementElement(self.subject, self.predicate,
                                    self.object))
        if head == 'foaf:knows':
            self.knows_mode = False
            for element in self.knows_elements:
                if self.knows_nick:
                    element.subject = self.knows_nick
                else:
                    element.subject = self.final_subject
                    # NOTE(review): predicate assumed to be overridden only
                    # when no nick has been found - confirm
                    element.predicate = 'knows_ref'
                self.elements.append(element)
            self.knows_elements = []
            self.knows_nick = ''

    def endPrefixMapping(self, prefix):
        """restore the previous name spaces context"""
        self._current_context = self._ns_contexts.pop()

    def endDocument(self):
        """return the statements collected while parsing"""
        return self.elements

    def characters(self, content):
        """append the stripped text chunk to the current object value"""
        self.object += content.strip()


def statements_to_xml_rdf(statements):
    """return a string joining the XML-RDF form of the given statements

    :param statements: objects providing a `as_rdf_xml()` method
    :rtype: str
    """
    # NOTE(review): the template below looks like it lost its enclosing
    # XML markup - confirm against the reference version of this file
    return ''' %s ''' % '\n\n'.join([stmt.as_rdf_xml() for stmt in statements])


def _triplet_to_statement(triplet):
    """build a RDFStatementElement from a list of 3 words

    :raise IncorrectRuleDefinitionException:
      if `triplet` does not contain exactly 3 items
    """
    if len(triplet) != 3:
        raise IncorrectRuleDefinitionException(
            'Length of triplet is not 3\n%s' % triplet)
    return RDFStatementElement(*triplet)


def text_to_rule(text):
    """create and return a RDFRuleElement from a specially formated text

    The text is a sequence of 3-word statements separated by the 'if' and
    'and' keywords (a comma is equivalent to 'and'), optionally prefixed by
    'rule:'. The statement preceding 'if' is stored as `rule.statement`,
    every other statement is appended to `rule.implies`.

    :raise IncorrectRuleDefinitionException:
      if one of the statements is not made of exactly 3 words
    """
    rule = RDFRuleElement()
    rule.implies = []
    # strip before looking for the prefix so leading blanks are harmless
    text = text.replace(',', ' and ').strip()
    if text.startswith('rule:'):
        text = text[5:]
    triplet = []
    for each in text.split():
        if each == 'if':
            rule.statement = _triplet_to_statement(triplet)
            triplet = []
        elif each == 'and':
            rule.implies.append(_triplet_to_statement(triplet))
            triplet = []
        else:
            triplet.append(each)
    # the last statement is not followed by any keyword
    rule.implies.append(_triplet_to_statement(triplet))
    return rule


# NOTE(review): this pattern looks like it lost its enclosing markup (as
# written it always matches the empty string, hence get_nick_from_url always
# returns '') - confirm against the reference version of this file
FOAFNICK_RGX = re.compile('(.*?)')

def get_nick_from_url(url):
    """gets a nick from a foaf file in given url

    :param url: location of the foaf file, given to `urlopen`
    :return:
      the first group matched by FOAFNICK_RGX in the downloaded content,
      or an empty string if the pattern doesn't match
    """
    # FIXME had thread to do a timeout
    stream = urlopen(url)
    try:
        other_foaf = stream.read()
    finally:
        # do not leak the connection, even if reading fails
        stream.close()
    match = FOAFNICK_RGX.search(other_foaf)
    if match:
        return match.group(1)
    return ''


DEFAULT_RDF_KB_FILE_URL = 'file:$NARVAL_HOME/data/kb.rdf'

def rdf_kb_file(obj=None):
    """return the path to the knowledge file and its encoding

    :param obj: object adaptable to IURL
    :rtype: tuple(str, str)
    """
    return url_to_file(obj, DEFAULT_RDF_KB_FILE_URL)


# actions definitions start here ##############################################

# NOTE(review): every MOD_XML fragment of this module looks like it lost its
# XML markup; as written the formatting below has no conversion specifier to
# consume AL_NS - confirm against the reference version of this file
MOD_XML = ''' ''' % AL_NS


def act_xmlrdf2stmts(inputs):
    """parse the XML-RDF given by a command and return the statements found

    :param inputs:
      dictionary whose 'command' entry has an `args` sequence: ``args[0]``
      must be 'string' and ``args[1]`` the XML-RDF text to parse
    :return: dictionary with the parsed statements under 'statements'
    :raise Exception:
      if the command has no argument or if ``args[0]`` is not 'string'
    """
    cmd = inputs['command']
    handler = RDFHandler()
    parser = make_parser()
    parser.setContentHandler(handler)
    if cmd.args:
        if cmd.args[0] == 'string':
            stream = InputSource()
            stream.setByteStream(StringIO(cmd.args[1]))
        else: # cmd.args[0] == 'url'
            # TODO - use args of cmd to specify file location
            # elif cmd.args:
            raise Exception('import rdf from specified file not implemented yet')
    # TODO - such a stream will have to be closed once parsed
    # elif inputs.has_key('to-read') and inputs['to-read']:
    #     openable = IOpen(inputs['to-read'])
    #     stream = openable.open()
    else:
        raise Exception('no rdf source given !')
    parser.parse(stream)
    return {'statements' : handler.elements}

MOD_XML = MOD_XML + """ produce a list of IRDFStatement elements from XML RDF ICommand(elmt).name == 'kb_rdf_import' IRDFStatement(elmt) """


def act_stmts2xmlrdf(inputs):
    """build a file element holding the XML-RDF form of some statements

    :param inputs:
      dictionary with the statements under 'statements' and, under
      'rdf-kb', the object given to `rdf_kb_file` to locate the target file
    :return: dictionary with the FileElement (write mode) under 'xmlrdf'
    """
    rdf_xml_string = statements_to_xml_rdf(inputs['statements'])
    f = FileElement()
    f.address, f.encoding = rdf_kb_file(inputs['rdf-kb'])
    f.data = rdf_xml_string
    f.mode = 'w'
    return {'xmlrdf': f}

MOD_XML = MOD_XML + """ produce XML-RDF from a list of IRDFStatement elements IRDFStatement(elmt) elmt.getattr((TYPE_NS, 'name')) == 'uri:memory:rdf-kb' IURL(elmt) IFile(elmt) """


def act_ask_if_user_is_a_narval(inputs):
    """reply 'are you a narval?' to the presence found under 'presence'

    :return: dictionary with the reply under 'answer'
    """
    presence = inputs['presence']
    return {'answer' : presence.build_reply('are you a narval?')}

MOD_XML = MOD_XML + """ ask if user sending its presence is an agent IIPresence(elmt) IIMessage(elmt) """


def act_state_you_are_a_narval(inputs):
    """reply to the message found under 'msg' that we are a narval

    The reply mentions the `jabberid` of the 'masterinfos' input when both
    this identifier and the 'control' input are true.

    :return: dictionary with the reply under 'answer'
    """
    msg = IIMessage(inputs['msg'])
    ctrl = inputs['control']
    master_infos = inputs['masterinfos']
    myuser = master_infos.jabberid
    if ctrl and myuser:
        reply = 'I\'m %s\'s narval' % myuser.strip()
    else:
        reply = 'I\'m a narval'
    return {'answer': msg.build_reply(reply)}

MOD_XML = MOD_XML + """ create a special message indicating we are a narval agent IIMessage(elmt).type == 'incoming' isinstance(elmt, BotConfigurationElement) isinstance(elmt, MasterInformationsElement) IIMessage(elmt).type == 'outgoing' """


def act_is_a_known_user_stmt(inputs):
    """create a "<user> is_a None" statement for the sender of a presence

    :return:
      dictionary with the statement under 'query-stmt', or an empty
      dictionary if the presence comes from a group chat
    """
    presence = inputs['presence']
    if presence.is_from_groupchat():
        # fail if this presence come from a conference room
        return {}
    return {'query-stmt': RDFStatementElement(presence.get_from_user(),
                                              'is_a', None)}

MOD_XML = MOD_XML + """ Make an RDF statement to query a kb if the presence is coming from a known user IIPresence(elmt).get_type() in ('available', 'subscribe') IRDFStatement(elmt) """


def act_is_a_narval_stmt(inputs):
    """create an RDF statement to query a kb if the presence is coming from a known narval """
    # the docstring above is inserted into MOD_XML below: do not reword it
    # without checking the generated text
    msg = IIMessage(inputs['msg'])
    return {'query-stmt': RDFStatementElement(msg.get_from_user(),
                                              'is_a', 'narval')}

MOD_XML = MOD_XML + """ %s IIMessage(elmt).type == 'incoming' IRDFStatement(elmt) """ % act_is_a_narval_stmt.__doc__


def act_is_a_human_stmt(inputs):
    """create an RDF statement to query a kb if the presence is coming from a known human """
    # the docstring above is inserted into MOD_XML below: do not reword it
    # without checking the generated text
    msg = IIMessage(inputs['msg'])
    return {'query-stmt': RDFStatementElement(msg.get_from_user(),
                                              'is_a', 'human')}

MOD_XML = MOD_XML + """ %s IIMessage(elmt).type == 'incoming' IRDFStatement(elmt) """ % act_is_a_human_stmt.__doc__


def act_stmts_for_user_info(inputs):
    """create the statements describing the sender of a command

    ``args[0]`` of the 'command' input is the kind of the sender. When a
    second argument is given the sender must be a 'narval' and this argument
    is the user it is the agent of.

    :return: dictionary with the list of statements under 'statement'
    """
    cmd = inputs['command']
    msg = cmd.from_msg
    from_user = str(msg.get_from_user())
    is_a = RDFStatementElement(from_user, 'is_a', cmd.args[0])
    stmts = [is_a]
    if len(cmd.args) > 1:
        assert is_a.object == 'narval'
        agent_of = cmd.args[1]
        stmts.append(RDFStatementElement(from_user, 'narval_of', agent_of))
        # FIXME: I guess we can deduce this, but who knows an agent won't
        # have it's own agent someday...
        stmts.append(RDFStatementElement(agent_of, 'is_a', 'human'))
    return {'statement': stmts}

MOD_XML = MOD_XML + """ Make RDF statements for info concerning a user ICommand(elmt).name == 'kb_add_user_info' IRDFStatement(elmt) """


## def act_search_stmts_from_setup_conf(inputs):
##     # XXX finish me
##     result = []
##     for guest in inputs['cmd'].args[1].strip().split():
##         result.append(RDFStatementElement(None, 'narval_of', guest))
##     return {'query-stmts': result}
## MOD_XML = MOD_XML + """
##
## Make an RDF statement to query a kb if the presence is coming from a known user
##
## IIPresence(elmt).get_type() in ('available', 'subscribe')
##
##
## IRDFStatement(elmt)
##
## """


def act_rule_from_text(inputs):
    """create an inference rule from the first argument of a command

    :return:
      dictionary with the rule under 'rule', or with an error element
      under 'error' if the text is not a correct rule definition
    """
    cmd = inputs['command']
    msg = cmd.from_msg
    try:
        rule = text_to_rule(cmd.args[0])
    except IncorrectRuleDefinitionException:
        # fetched through sys.exc_info() since this is the only way to get
        # the exception which is valid syntax on every python version
        ex = sys.exc_info()[1]
        error = create_error(str(ex), type='bad rule definition',
                             from_msg=msg)
        return {'error': error}
    return {'rule':rule}

MOD_XML = MOD_XML + """ Add an inference rule to the knowledge base ICommand(elmt).name == 'kb_add_rule' IRDFRule(elmt) """

MOD_XML += ""