# Copyright (c) 2004-2005 LOGILAB S.A. (Paris, FRANCE).
# http://www.logilab.fr/ -- mailto:contact@logilab.fr
#
# Copyright (c) 2004-2005 DoCoMo Euro-Labs GmbH (Munich, Germany).
# http://www.docomolab-euro.com/ -- mailto:tarlano@docomolab-euro.com
#
# This library is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
# License as published by the Free Software Foundation; either
# version 2.1 of the License, or (at your option) any later version.
#
# This library is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public
# License along with this library; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
"""Some RDF related actions
:version: $Revision:$
:author: Logilab
:copyright:
2004-2005 LOGILAB S.A. (Paris, FRANCE)
2004-2005 DoCoMo Euro-Labs GmbH (Munich, Germany)
:contact:
http://www.logilab.fr/ -- mailto:contact@logilab.fr
http://www.docomolab-euro.com/ -- mailto:tarlano@docomolab-euro.com
"""
__revision__ = '$Id: PyLogDB $'
__docformat__ = "restructuredtext en"
import re
from urllib import urlopen
from cStringIO import StringIO
from xml.sax import ContentHandler, make_parser
from xml.sax.xmlreader import InputSource
from narval.public import AL_NS, url_to_file
from narval.elements import create_error
from narval.elements.base import FileElement
from narval.elements.rdf import RDFStatementElement, RDFRuleElement
from narval.interfaces.base import IIMessage
class IncorrectRuleDefinitionException(Exception):
    """Error signalling that a textual rule definition is malformed."""
class RDFHandler(ContentHandler):
    """a xml handler to create RDFStatementElements from a XML-RDF file

    Statements are accumulated in `self.elements` while the SAX events are
    processed. Tag names are handled as raw 'prefix:local' strings (no SAX
    namespace support); 'foaf:Person', 'foaf:knows', 'foaf:nick' and
    'rdf:Description' get a dedicated treatment.

    NOTE(review): the nesting of a few branches below is flagged where it is
    not obvious from the statements themselves -- confirm before relying on
    the accompanying comments.
    """

    def __init__(self):
        ContentHandler.__init__(self)
        # statements produced so far (returned by endDocument)
        self.elements = []
        # name spaces mapping
        self._ns_contexts = [{}] # contains uri -> prefix dicts
        self._current_context = self._ns_contexts[-1]
        # 'rdf:about' value of the last Description element opened
        self.subject = ''
        # text of the last 'foaf:nick' closed outside of a 'foaf:knows'
        self.final_subject = ''
        # set when a 'foaf:Person' opens, cleared when one closes outside of
        # a 'foaf:knows'
        self.process_person = False
        # True from the opening to the closing of a 'foaf:knows'
        self.knows_mode = False
        # statements about the person being read; copied into `elements` when
        # the person closes (the list itself is never emptied)
        self.person_elements = []
        # statements gathered inside a 'foaf:knows' while no nick is known
        self.knows_elements = []
        # nick of the person described inside the current 'foaf:knows'
        self.knows_nick = ''

    def startElement(self, head, attrs):
        """Update the parsing state for an opening tag.

        :param head: raw tag name, expected to look like 'prefix:local'
        :param attrs: attributes of the element
        :raise ValueError: from the unpacking below, when `head` does not
          split into exactly two parts on ':'
        """
        # FIXME - Should use sax2 interface with NS support enabled
        rdf, name = head.split(':')
        # the text collected by characters() restarts with each opening tag
        self.object = ''
        if head == 'foaf:Person':
            self.process_person = True
        elif head == 'foaf:knows':
            self.knows_mode = True
        # a foaf/rdfs element may give its value as a resource attribute
        # instead of text
        if rdf in ('foaf','rdfs') and attrs.has_key('rdf:resource'):
            self.object = attrs['rdf:resource']
            # NOTE(review): assumed to be nested here, so that the lookup only
            # runs when a resource url was actually found -- confirm
            if self.knows_mode and name == 'seeAlso':
                # fetch the referenced document to find the nick in it
                knows_nick = get_nick_from_url(self.object)
                if knows_nick:
                    self.knows_nick = knows_nick
                    self.person_elements.append(RDFStatementElement(self.final_subject,
                                                                    'knows',
                                                                    knows_nick))
        if name == "Description":
            self.subject = attrs['rdf:about']
        elif name == "RDF":
            pass
        else:
            # any other tag names the predicate of the next statement
            self.predicate = name

    def endElement(self, head):
        """Create or stage statements for a closing tag.

        :param head: raw tag name, expected to look like 'prefix:local'
        :raise ValueError: from the unpacking below, when `head` does not
          split into exactly two parts on ':'
        """
        rdf, name = head.split(':')
        if head == 'foaf:Person' and not self.knows_mode:
            # the person is complete: attach its statements to its nick,
            # when one was found
            if self.final_subject:
                for element in self.person_elements:
                    element.subject = self.final_subject
                    self.elements.append(element)
            self.process_person = False
        if head == 'foaf:nick':
            # a nick names either the known person or the main person
            if self.knows_mode:
                self.knows_nick = self.object
            else:
                self.final_subject = self.object
        if rdf == 'foaf' and self.process_person:
            if self.knows_mode:
                # TODO - get eid (nick) from url?
                if not self.knows_nick:
                    # subject unknown yet: staged until 'foaf:knows' closes
                    self.knows_elements.append(RDFStatementElement('',
                                                                   self.predicate,
                                                                   self.knows_nick or self.object))
                else:
                    self.predicate = self.predicate.replace('seeAlso', 'isDescribedIn')
                    self.elements.append(RDFStatementElement(self.knows_nick, self.predicate, self.object))
            else:
                self.person_elements.append(RDFStatementElement(self.subject, self.predicate, self.object))
        elif head == "rdf:Description" and not self.process_person:
            self.elements.append(RDFStatementElement(self.subject, self.predicate, self.object))
        if head == 'foaf:knows':
            self.knows_mode = False
            # resolve the subject of the statements staged without a nick
            for element in self.knows_elements:
                if self.knows_nick:
                    element.subject = self.knows_nick
                else:
                    element.subject = self.final_subject
                    # NOTE(review): assumed to belong to this branch only
                    # -- confirm
                    element.predicate = 'knows_ref'
                self.elements.append(element)
            # start afresh for the next 'foaf:knows'
            self.knows_elements = []
            self.knows_nick = ''

    def endPrefixMapping(self, prefix):
        # NOTE(review): nothing in this class pushes onto _ns_contexts, so a
        # second call would pop from an empty list -- confirm this hook is
        # ever triggered
        self._current_context = self._ns_contexts.pop()

    def endDocument(self):
        """Return the list of statements collected."""
        return self.elements

    def characters(self, content):
        """Append the stripped text chunk to the value being collected."""
        self.object += content.strip()
def statements_to_xml_rdf(statements):
    """Serialize statements into a single XML-RDF text.

    :param statements: iterable of objects providing `as_rdf_xml()`
    :return: the serializations, separated by a blank line, inserted in the
      document template
    """
    serialized = []
    for statement in statements:
        serialized.append(statement.as_rdf_xml())
    body = '\n\n'.join(serialized)
    return '''
%s
''' % body
def _checked_triplet(triplet):
    """Return `triplet` unchanged, raising unless it holds exactly 3 terms."""
    if len(triplet) != 3:
        raise IncorrectRuleDefinitionException(
            'Length of triplet is not 3\n%s' % triplet)
    return triplet


def text_to_rule(text):
    """create and return a RDFRuleElement from a specially formated text

    The text is a sequence of whitespace separated triplets, optionally
    prefixed by 'rule:'::

        [rule:] S P O if S P O and S P O ...

    A comma is equivalent to 'and'. The triplet closed by 'if' becomes
    `rule.statement`; every triplet closed by 'and', as well as the last
    one, is appended to `rule.implies`.

    :param text: the rule definition
    :return: the RDFRuleElement built from the text
    :raise IncorrectRuleDefinitionException: when a triplet does not hold
      exactly three terms
    """
    rule = RDFRuleElement()
    rule.implies = []
    # strip first, so that leading white spaces do not hide the prefix
    text = text.replace(',', ' and ').strip()
    if text.startswith('rule:'):
        text = text[5:]
    triplet = []
    for word in text.split():
        if word == 'if':
            rule.statement = RDFStatementElement(*_checked_triplet(triplet))
            triplet = []
        elif word == 'and':
            rule.implies.append(RDFStatementElement(*_checked_triplet(triplet)))
            triplet = []
        else:
            triplet.append(word)
    # the last triplet has no closing keyword
    rule.implies.append(RDFStatementElement(*_checked_triplet(triplet)))
    return rule
# Captures the text of the first foaf:nick element found in a FOAF document.
# A bare '(.*?)' pattern matches the empty string at offset 0 of any text, so
# it can never yield a nick.
FOAFNICK_RGX = re.compile(r'<foaf:nick>(.*?)</foaf:nick>')
def get_nick_from_url(url):
    """gets a nick from a foaf file in given url

    :param url: location of the foaf file, as accepted by `urlopen`
    :return: the first group captured by FOAFNICK_RGX in the file content,
      or '' when the pattern does not match
    """
    # FIXME add a thread to do a timeout
    stream = urlopen(url)
    try:
        other_foaf = stream.read()
    finally:
        # release the underlying handle even when reading fails
        stream.close()
    match = FOAFNICK_RGX.search(other_foaf)
    if match:
        return match.group(1)
    return ''
# location used when no usable url is provided to rdf_kb_file
DEFAULT_RDF_KB_FILE_URL = 'file:$NARVAL_HOME/data/kb.rdf'


def rdf_kb_file(obj=None):
    """return the path to the knowledge file and its encoding

    The resolution is delegated to `url_to_file`, which receives
    DEFAULT_RDF_KB_FILE_URL as second argument.

    :param obj: object adaptable to IURL
    :rtype: tuple(str, str)
    """
    location = url_to_file(obj, DEFAULT_RDF_KB_FILE_URL)
    return location
# actions definitions start here ##############################################

# MOD_XML is extended with a piece of text after each action defined below.
# NOTE(review): this literal holds no conversion specifier, so if AL_NS is a
# string the `%` below raises TypeError at import time; the markup that should
# carry the placeholder looks to be missing -- TODO confirm and restore.
MOD_XML = '''
''' % AL_NS
def act_xmlrdf2stmts(inputs):
    """Parse the XML-RDF text carried by a command into RDF statements.

    :param inputs: mapping whose 'command' entry exposes `args`; the only
      supported form is `args[0] == 'string'` with the XML text in `args[1]`
    :return: dict holding the list of parsed statements under 'statements'
    :raise Exception: when `args` is empty or its first item is not 'string'
    """
    cmd = inputs['command']
    handler = RDFHandler()
    parser = make_parser()
    parser.setContentHandler(handler)
    if not cmd.args:
        # TODO - support an openable 'to-read' input:
        #        IOpen(inputs['to-read']).open()
        raise Exception('no rdf source given !')
    if cmd.args[0] != 'string':
        # cmd.args[0] == 'url'
        # TODO - use args of cmd to specify file location
        raise Exception('import rdf from specified file not implemented yet')
    # in-memory source: there is nothing to close once it has been parsed
    stream = InputSource()
    stream.setByteStream(StringIO(cmd.args[1]))
    parser.parse(stream)
    return {'statements': handler.elements}
# Text appended to MOD_XML right after act_xmlrdf2stmts is defined.
# NOTE(review): presumably the declaration of that action; the surrounding
# markup seems to be missing from this literal -- TODO confirm.
MOD_XML = MOD_XML + """
produce a list of IRDFStatement elements from XML RDF
ICommand(elmt).name == 'kb_rdf_import'
IRDFStatement(elmt)
"""
def act_stmts2xmlrdf(inputs):
    """Produce a file element holding the XML-RDF form of some statements.

    :param inputs: mapping with the statements under 'statements' and the
      object locating the knowledge base file under 'rdf-kb'
    :return: dict holding the FileElement (opened for writing) under 'xmlrdf'
    """
    xml_text = statements_to_xml_rdf(inputs['statements'])
    kb_file = FileElement()
    location = rdf_kb_file(inputs['rdf-kb'])
    # location unpacks into the file address and its encoding
    kb_file.address, kb_file.encoding = location
    kb_file.data = xml_text
    kb_file.mode = 'w'
    return {'xmlrdf': kb_file}
# Text appended to MOD_XML right after act_stmts2xmlrdf is defined.
# NOTE(review): presumably the declaration of that action; the surrounding
# markup seems to be missing from this literal -- TODO confirm.
MOD_XML = MOD_XML + """
produce XML-RDF from a list of IRDFStatement elements
IRDFStatement(elmt)
elmt.getattr((TYPE_NS, 'name')) == 'uri:memory:rdf-kb'
IURL(elmt)
IFile(elmt)
"""
def act_ask_if_user_is_a_narval(inputs):
    """Reply to a presence by asking its sender whether it is a narval.

    :param inputs: mapping holding the presence under 'presence'
    :return: dict holding the reply built from the presence under 'answer'
    """
    question = inputs['presence'].build_reply('are you a narval?')
    return {'answer': question}
# Text appended to MOD_XML right after act_ask_if_user_is_a_narval is defined.
# NOTE(review): presumably the declaration of that action; the surrounding
# markup seems to be missing from this literal -- TODO confirm.
MOD_XML = MOD_XML + """
ask if user sending its presence is an agent
IIPresence(elmt)
IIMessage(elmt)
"""
def act_state_you_are_a_narval(inputs):
    """Build the reply stating that we are a narval.

    The owner's jabber id is mentioned when both the 'control' input and the
    jabber id are set.

    :param inputs: mapping with the entries 'msg', 'control' and
      'masterinfos'
    :return: dict holding the reply to the message under 'answer'
    """
    msg = IIMessage(inputs['msg'])
    ctrl = inputs['control']
    owner = inputs['masterinfos'].jabberid
    # anonymous statement unless we may (and can) name our owner
    reply = "I'm a narval"
    if ctrl and owner:
        reply = "I'm %s's narval" % owner.strip()
    return {'answer': msg.build_reply(reply)}
# Text appended to MOD_XML right after act_state_you_are_a_narval is defined.
# NOTE(review): presumably the declaration of that action; the surrounding
# markup seems to be missing from this literal -- TODO confirm.
MOD_XML = MOD_XML + """
create a special message indicating we are a narval agent
IIMessage(elmt).type == 'incoming'
isinstance(elmt, BotConfigurationElement)
isinstance(elmt, MasterInformationsElement)
IIMessage(elmt).type == 'outgoing'
"""
def act_is_a_known_user_stmt(inputs):
    """Make an RDF statement to query a kb about the sender of a presence.

    :param inputs: mapping holding the presence under 'presence'
    :return: an empty dict when the presence comes from a groupchat, else a
      dict holding the statement `(sender, 'is_a', None)` under 'query-stmt'
    """
    presence = inputs['presence']
    if presence.is_from_groupchat():
        # fail if this presence come from a conference room
        return {}
    query = RDFStatementElement(presence.get_from_user(), 'is_a', None)
    return {'query-stmt': query}
# Text appended to MOD_XML right after act_is_a_known_user_stmt is defined.
# NOTE(review): presumably the declaration of that action; the surrounding
# markup seems to be missing from this literal -- TODO confirm.
MOD_XML = MOD_XML + """
Make an RDF statement to query a kb if the presence is coming from a known user
IIPresence(elmt).get_type() in ('available', 'subscribe')
IRDFStatement(elmt)
"""
def act_is_a_narval_stmt(inputs):
    """create an RDF statement to query a kb if the presence is coming
    from a known narval
    """
    # The docstring above is interpolated into MOD_XML when the module is
    # loaded: its text is runtime data, hence left as is.
    # inputs['msg'] is adapted to IIMessage; its sender is the subject of
    # the statement returned under 'query-stmt'.
    sender = IIMessage(inputs['msg']).get_from_user()
    query = RDFStatementElement(sender, 'is_a', 'narval')
    return {'query-stmt': query}
# Text appended to MOD_XML right after act_is_a_narval_stmt is defined; the
# %s is filled with the docstring of that function.
# NOTE(review): presumably the declaration of that action; the surrounding
# markup seems to be missing from this literal -- TODO confirm.
MOD_XML = MOD_XML + """
%s
IIMessage(elmt).type == 'incoming'
IRDFStatement(elmt)
""" % act_is_a_narval_stmt.__doc__
def act_is_a_human_stmt(inputs):
    """create an RDF statement to query a kb if the presence is coming
    from a known human
    """
    # The docstring above is interpolated into MOD_XML when the module is
    # loaded: its text is runtime data, hence left as is.
    # inputs['msg'] is adapted to IIMessage; its sender is the subject of
    # the statement returned under 'query-stmt'.
    sender = IIMessage(inputs['msg']).get_from_user()
    query = RDFStatementElement(sender, 'is_a', 'human')
    return {'query-stmt': query}
# Text appended to MOD_XML right after act_is_a_human_stmt is defined; the
# %s is filled with the docstring of that function.
# NOTE(review): presumably the declaration of that action; the surrounding
# markup seems to be missing from this literal -- TODO confirm.
MOD_XML = MOD_XML + """
%s
IIMessage(elmt).type == 'incoming'
IRDFStatement(elmt)
""" % act_is_a_human_stmt.__doc__
def act_stmts_for_user_info(inputs):
    """Make RDF statements for info concerning the sender of a command.

    The first argument of the command gives what the sender is. When a
    second argument is given, the sender must be a 'narval' and the argument
    names the human it is the agent of.

    :param inputs: mapping holding the command under 'command'
    :return: dict holding the list of statements under 'statement'
    """
    cmd = inputs['command']
    sender = str(cmd.from_msg.get_from_user())
    kind_stmt = RDFStatementElement(sender, 'is_a', cmd.args[0])
    statements = [kind_stmt]
    if len(cmd.args) > 1:
        assert kind_stmt.object == 'narval'
        owner = cmd.args[1]
        statements.extend([
            RDFStatementElement(sender, 'narval_of', owner),
            # FIXME: I guess we can deduce this, but who knows an agent won't
            #        have it's own agent someday...
            RDFStatementElement(owner, 'is_a', 'human'),
        ])
    return {'statement': statements}
# Text appended to MOD_XML right after act_stmts_for_user_info is defined.
# NOTE(review): presumably the declaration of that action; the surrounding
# markup seems to be missing from this literal -- TODO confirm.
MOD_XML = MOD_XML + """
Make RDF statements for info concerning a user
ICommand(elmt).name == 'kb_add_user_info'
IRDFStatement(elmt)
"""
## def act_search_stmts_from_setup_conf(inputs):
## # XXX finish me
## result = []
## for guest in inputs['cmd'].args[1].strip().split():
## result.append(RDFStatementElement(None, 'narval_of', guest))
## return {'query-stmts': result}
## MOD_XML = MOD_XML + """
##
## Make an RDF statement to query a kb if the presence is coming from a known user
##
## IIPresence(elmt).get_type() in ('available', 'subscribe')
##
##
## IRDFStatement(elmt)
##
## """
def act_rule_from_text(inputs):
    """Build an inference rule from the first argument of a command.

    :param inputs: mapping holding the command under 'command'
    :return: dict holding the rule under 'rule', or an error element under
      'error' when the rule definition is incorrect
    """
    cmd = inputs['command']
    msg = cmd.from_msg
    try:
        rule = text_to_rule(cmd.args[0])
    except IncorrectRuleDefinitionException as ex:
        # report the bad definition in reference to the originating message
        error = create_error(str(ex), type='bad rule definition', from_msg=msg)
        return {'error': error}
    return {'rule': rule}
# Text appended to MOD_XML right after act_rule_from_text is defined.
# NOTE(review): presumably the declaration of that action; the surrounding
# markup seems to be missing from this literal -- TODO confirm.
MOD_XML = MOD_XML + """
Add an inference rule to the knowledge base
ICommand(elmt).name == 'kb_add_rule'
IRDFRule(elmt)
"""
# NOTE(review): appending an empty string changes nothing; this looks like
# the place where the closing markup used to be -- TODO confirm.
MOD_XML += ""