# Copyright (c) 2000-2004 LOGILAB S.A. (Paris, FRANCE). # http://www.logilab.fr/ -- mailto:contact@logilab.fr # # Copyright (c) 2004 DoCoMo Euro-Labs GmbH (Munich, Germany). # http://www.docomolab-euro.com/ -- mailto:tarlano@docomolab-euro.com # # This library is free software; you can redistribute it and/or # modify it under the terms of the GNU Lesser General Public # License as published by the Free Software Foundation; either # version 2.1 of the License, or (at your option) any later version. # # This library is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU # Lesser General Public License for more details. # # You should have received a copy of the GNU Lesser General Public # License along with this library; if not, write to the Free Software # Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA """Entertainment related actions (imdb/allocine/xmltv) :version: $Revision:$ :author: Logilab :copyright: 2000-2004 LOGILAB S.A. (Paris, FRANCE) 2004 DoCoMo Euro-Labs GmbH (Munich, Germany) :contact: http://www.logilab.fr/ -- mailto:contact@logilab.fr http://www.docomolab-euro.com/ -- mailto:tarlano@docomolab-euro.com """ __revision__ = '$Id: $' from sets import Set from os.path import getmtime from narval.public import AL_NS, expand_vars from narval.elements import create_error from narval.extensions.xmltv import read_programmes#, read_channels, read_data # XMLTV listings for france can be extracted from telecable by tcl/tk script # find it at : http://www.lahiette.com/biboobox/?XMLTV/Grabber+TCL%2FTK DEFAULT_XMLTV = '$NARVAL_HOME/data/xmltv.xml' STORED_XMLTV = {} READING_XMLTV_LOCK = False class AlreadyReadingXmltvError(Exception): pass class WrongInputError(Exception): pass class SearchCentral: def __init__(self): self.file_refs = {} self.available_search_engines = ['imdb', 'allocine', 'xmltv'] self.listing_engines = ['allocine'] def search(self, type, text): search_functions = {'imdb' : search_movie_imdb, 'allocine' : search_movie_allocine, 'xmltv' : search_movie_xmltv} search_func = search_functions[type] if self.file_refs.has_key(type): return search_func(text, self.file_refs[type]) return search_func(text) def search_complete(self, type, text): # do the search and fill at the end. pass def get_complete(self, type, eid): return eval('get_full_info_%s(eid)' % type) def add_file_ref(self, key, file_ref): # FIXME do we want multiple adresses for this? if self.file_refs.has_key(key): if self.file_refs[key] != file_ref: log(LOG_NOTICE, 'replace file reference for'\ '%s from %s to %s' % (key, self.file_refs[key], file_ref)) self.file_refs[key] = file_ref def get_available_engines(self): return self.available_search_engines def search_listing(self, film_elmt, postcode): if film_elmt.type not in self.listing_engines: return 'No listings available for %s' % film_elmt.type return eval('search_listing_%s(film_elmt, postcode)' % film_elmt.type) class FilmElement: def __init__(self, type = None, title = '', eid = 0, year = ''): self.title = title self.eid = eid self.type = type self.url = '' self.year = year self.info = {} self.imgs = [] def one_line_representation(self): return self.title + (self.year and '(%s)' % self.year) def representation(self): try: return eval('self.representation_%s()' % self.type) except : return '\n'.join([self.one_line_representation(), 'url : %s' % self.url, '']) def representation_xmltv(self): return '\n'.join([self.one_line_representation(), 'date: %s', self.info.get('date'), 'starts: %s ends: %s' (self.info.get('start'), self.info.get('end')), '']) # *allocine - freevo* from narval.extensions.fxdallocine import FxdAllocine # CODES ALLOCINE ALLOCINE_TOUT = 0 ALLOCINE_FILMS = 1 ALLOCINE_SALLES = 2 ALLOCINE_LOCALITES = 4 ALLOCINE_STARS = 2 #FIXME - have only one instance of FxdAllocine? def search_movie_allocine(name): """ search for a movie by name in allocine.fr""" api = FxdAllocine() results = api.searchAllocine(name, ALLOCINE_FILMS) return [FilmElement('allocine', x[1], x[0], x[2]) for x in results] def get_full_info_allocine(eid): """ get full info about the referenced film """ api = FxdAllocine() api.setAllocineId(eid) film = FilmElement('allocine', api.title, eid) film.url = api.url film.info = api.info film.imgs = api.image_urls film.year = api.info.get('year','') return film def search_listings_movie(film_elmt, postcode): api = FxdAllocine() api.setAllocineListings(film_elmt.info['cprojection'], postcode) return api.listings #*imdb - freevo* from narval.extensions.fxdimdb import FxdImdb def search_movie_imdb(name): """ search for a movie by name in IMDb""" api = FxdImdb() results = api.searchImdb(name) return [FilmElement('imdb', x[1], x[0], x[2]) for x in results] def get_full_info_imdb(eid): """ get full info about the referenced film """ api = FxdImdb() api.setImdbId(eid) film = FilmElement('imdb', api.title, eid) film.url = api.url film.info = api.info film.imgs = api.image_urls return film # * xmltv * def search_movie_xmltv(name, file_item=DEFAULT_XMLTV): """ search for a movie in xmltv file """ initialize_xmltv(file_item) programmes = STORED_XMLTV[expand_vars(file_item)][1] results = [] # FIXME - could probably do this by parsing XML for each in programmes: for each_title, _ in each['title']: if each_title.find(name) != -1: results.append(each) return [FilmElement('xmltv', x.get('title')[0][0], x.get(''), x.get('year','')) for x in results] def initialize_xmltv(file_item=DEFAULT_XMLTV): """ initializes the programmes item in memory (global variable) """ global READING_XMLTV_LOCK log(LOG_DEBUG, file_item) filename = expand_vars(file_item) content = open(expand_vars(file_item)) if STORED_XMLTV.has_key(filename): timestamp, programmes = STORED_XMLTV[filename] if timestamp < getmtime(filename) and not READING_XMLTV_LOCK: READING_XMLTV_LOCK = True log(LOG_WARN, 'READING XMLTV FILE') programmes = read_programmes(content) READING_XMLTV_LOCK = False STORED_XMLTV[filename] = (getmtime(filename), programmes) else: log(LOG_WARN, 'USING XMLTV FILE IN MEMORY') else: if not READING_XMLTV_LOCK: log(LOG_WARN, 'READING XMLTV FILE ORIGINAL') READING_XMLTV_LOCK = True programmes = read_programmes(content) READING_XMLTV_LOCK = False else: raise AlreadyReadingXmltvError STORED_XMLTV[filename] = (getmtime(filename), programmes) def get_full_info_xmltv(eid): # FIXME TODO return FilmElement('xmltv', 'n\a') def format_results(results): index = 1 formatted_lines = [] for result in results: formatted_lines.append('%s) [%s] %s' % (index, result.type, result.one_line_representation())) index += 1 if index >= 7: formatted_lines.append('more results available... refine your search') break return '\n'.join(formatted_lines) def extract_refining_search(search_string, engines): """ find in search string if there are references to specific engines, if so restrict the searches to these eg. 'Monty Python' -> will search with all search engines available 'imdb Monty Python' -> will search will only imdb """ engine_set = Set(engines) search_seq = search_string.split() search_set = Set(search_seq) inter = search_set.intersection(engine_set) if len(inter): for item in inter: search_seq.remove(item) #' '.join([str(x) for x in search_set.difference(engines)]) return [x for x in inter], ' '.join(search_seq) return engines, search_string # TODO # o record location (postcode?) # o record favourite cinemas (allocine ids) # o narval resever ta place sur allocine ! (ssl...) # o quoi de neuf cette semaine? # o converti horaires et restriction de jour pour films rares - # et propose de les inserer dans le PIM MOD_XML = ''' ''' % AL_NS def act_search_film(inputs): """ action that searches for a film in IMDB/allocine/XMLTV """ msg = inputs['msg'] cmd = inputs.get('command',None) tail = '' #FIXME TODO - have threads (concurent web access) search_central = SearchCentral() asked_searches, search_str = extract_refining_search(' '.join(cmd.args), search_central.get_available_engines()) results = [] for engine in asked_searches: # to have a file_ref added to the search engine the name # in the inputs has to correspond to the name of the engine if inputs.get('%s-file' % engine, None): search_central.add_file_ref(engine, inputs['%s-file' % engine]) try: results += search_central.search(engine, search_str) except AlreadyReadingXmltvError: tail = '\n - for xmltv results, narval is currently reading xmltv' \ 'file (can take a while), try again later.' msg.in_discussion.film_results = results if len(results) == 1: return {'answer':msg.build_reply(results[0].representation() + tail)} formatted = format_results(results) return {'answer':msg.build_reply(formatted + tail+ '\n\'select #\' for further info')} MOD_XML = MOD_XML + """ Process a chat sentence to extract a command IIMessage(elmt).type == 'incoming' ICommand(elmt) IURL(elmt) elmt.getattr((TYPE_NS, 'name')) == 'uri:memory:xmltv-file' IIMessage(elmt).type == 'outgoing' """ def act_init_xmltv(inputs): """ action that searches for a film in IMDB/allocine/XMLTV """ msg = inputs['msg'] try: inputs.get('xmltv-file', None) and initialize_xmltv(inputs['xmltv-file']) or initialize_xmltv() except AlreadyReadingXmltvError: return {'answer':msg.build_reply('narval is already loading xmltvfile')} return {'answer':msg.build_reply('xmltv ready to be used')} MOD_XML = MOD_XML + """ IIMessage(elmt).type == 'incoming' IURL(elmt) elmt.getattr((TYPE_NS, 'name')) == 'uri:memory:xmltv-file' IIMessage(elmt).type == 'outgoing' """ def act_select_film(inputs): """ action that searches for a film in IMDB/allocine/XMLTV """ msg = inputs['msg'] cmd = inputs['command'] previous_results = getattr(msg.in_discussion, 'film_results', None) if not previous_results: return {'answer':msg.build_reply('Can\'t select anything. Do a search first.')} index = int(cmd.args[0]) if index > len(previous_results): return {'answer':msg.build_reply('Please select within the given bounds'\ ' : [1-%s]' % len(previous_results))} film_elmt = previous_results[index-1] search_central = SearchCentral() movie = search_central.get_complete(film_elmt.type, film_elmt.eid) return {'answer' : msg.build_reply(movie.representation())} MOD_XML = MOD_XML + """ IIMessage(elmt).type == 'incoming' ICommand(elmt) and elmt.name == 'select_film' IURL(elmt) elmt.getattr((TYPE_NS, 'name')) == 'uri:memory:xmltv-file' IIMessage(elmt).type == 'outgoing' """ def add_fav_cinema_to_kb(inputs): return {} def record_seen_movie(inputs): # so it doens't offer these films # unique key : type-eid-title return {} def what_s_on_tonight(inputs): return {} def act_record_postcode(inputs): """record a postcode and do the search that was asked for before postcode was asked if existant """ cmd = inputs['command'] if len(cmd.args[0]) != 5: return {'error': create_error('%s is not a valid postcode' % cmd.args[0], type='chat', from_msg=cmd.from_msg)} ctrl = inputs['control'] ctrl.postcode = cmd.args[0] return {} MOD_XML = MOD_XML + """ %s isinstance(elmt, BotConfigurationElement) and not elmt.postcode ICommand(elmt).name == 'enter_postcode' """ % act_record_postcode.__doc__ def act_ask_postcode(inputs): """ask for a postcode if not present""" msg = inputs['msg'] return {'answer': msg.build_reply('What is your postcode?')} MOD_XML = MOD_XML + """ %s IIMessage(elmt).type == 'incoming' isinstance(elmt, BotConfigurationElement) and not getattr(elmt, 'postcode', None) IIMessage(elmt).type == 'outgoing' """ % act_ask_postcode.__doc__ MOD_XML += ""