# Copyright (c) 2004-2005 DoCoMo Euro-Labs GmbH (Munich, Germany).
# Copyright (c) 2001-2005 LOGILAB S.A. (Paris, FRANCE).
#
# http://www.docomolab-euro.com/ -- mailto:tarlano@docomolab-euro.com
# http://www.logilab.fr/ -- mailto:contact@logilab.fr
#
# This library is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
# License as published by the Free Software Foundation; either
# version 2.1 of the License, or (at your option) any later version.
#
# This library is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public
# License along with this library; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
"""narval email related actions (mailbox reading and SMTP sending)

:version: $Revision:$
:author: Logilab
:copyright:
  2000-2005 LOGILAB S.A. (Paris, FRANCE)
  2004-2005 DoCoMo Euro-Labs GmbH (Munich, Germany)
:contact:
  http://www.logilab.fr/ -- mailto:contact@logilab.fr
  http://www.docomolab-euro.com/ -- mailto:tarlano@docomolab-euro.com
"""

__revision__ = "$Id: Basic.py,v 1.70 2002/09/20 14:12:35 syt Exp $"
__docformat__ = 'restructuredtext en'

import os
import time
import smtplib, mailbox, mimetools
from cStringIO import StringIO

from narval.public import AL_NS, url_to_file
from narval.elements.email import EmailElement


def mbox_file(obj):
    """return the first item of `url_to_file(obj)`, used as the mbox path

    :param obj: object adaptable to IURL or None
    :rtype: str, path to the mbox (presumably; depends on `url_to_file`)
    """
    return url_to_file(obj)[0]


# NOTE(review): the MOD_XML literals in this module look like XML whose
# markup has been stripped. They are reproduced exactly as found. As written,
# the literal below contains no format placeholder, so `% AL_NS` raises
# TypeError if AL_NS is a plain string -- restore the markup from the
# upstream file before relying on MOD_XML.
MOD_XML = ''' ''' % AL_NS


# NOTE: the docstring of this action is interpolated into MOD_XML right after
# the function, so its wording is runtime text and is deliberately unchanged.
def act_read_new_mails(inputs):
    """output email element for new mails in a unix mail box

    add an X-Narval mark in headers of the mbox.
    All attachments except text/plain are discarded (we don't want a 20Mo
    file in Narval memory!) and saved into a dedicated directory (FIXME: not
    yet implemented)

    FIXME: this action has serious potential concurency problem : we may
    loose email written by an external program while this actions is running
    We should probably write id of readen messages in another file to avoid
    mbox rewriting
    """
    mbox_elmt = inputs['mbox']
    mailbox_path = mbox_elmt.path()
    last_check = getattr(mbox_elmt, 'mbox_check_date', None)
    # index 8 of the stat tuple is the last modification time
    last_modif = os.stat(mailbox_path)[8]
    if last_check is not None and last_check >= last_modif:
        # mailbox untouched since the previous run: nothing to do
        return {}
    # NOTE(review): the rewrite below changes the mbox mtime, so the next call
    # always re-scans the mailbox once more -- confirm this is intended.
    mbox_elmt.mbox_check_date = last_modif
    list_to_write = []
    emails = []
    # skip internal folder data FIXME: pine specific ?
    discard_subjects = ("DON'T DELETE THIS MESSAGE -- FOLDER INTERNAL DATA",)
    stream = open(mailbox_path)
    try:
        mbox = UnixMailbox(stream)
        msg = mbox.next()
        # compare with None explicitly: a message without any header has a
        # length of 0 and is therefore false, which used to stop the scan
        # early and truncate the mailbox on rewrite
        while msg is not None:
            if msg.get('Subject') in discard_subjects:
                # discarded messages are not written back to the mbox
                msg = mbox.next()
                continue
            # every kept message is re-emitted behind a fresh separator line
            list_to_write.append('From narval %s\n'
                                 % time.asctime(time.localtime()))
            list_to_write.extend(msg.headers)
            if not msg.get('X-Narval'):
                # not processed yet: mark it and output an email element
                list_to_write.append('X-Narval: I read that one :-)\n')
                mail = EmailElement.from_rfc822(msg)#, attachments_dir)
                mail.type = 'incoming'
                emails.append(mail)
            # blank line closing the headers, then the message's body
            list_to_write.append('\n')
            msg.rewindbody()
            list_to_write.extend(msg.fp.readlines())
            msg = mbox.next()
    finally:
        # the messages read lazily from this handle, so it is closed only
        # once they have all been consumed (and before the file is rewritten)
        stream.close()
    # replace mbox content with marked content
    if list_to_write:
        output = open(mailbox_path, 'w')
        try:
            output.writelines(list_to_write)
        finally:
            output.close()
    return {'mails': emails}

MOD_XML = MOD_XML +""" %s Mailbox to process elmt.getattr((TYPE_NS, 'name')) == 'uri:memory:mbox' IURL(elmt) readen email elements isinstance(elmt, EmailElement) """ % act_read_new_mails.__doc__


def act_send_mail(inputs) :
    """send the input email through a SMTP server

    :param inputs:
      mapping holding the email element to send under 'email' and,
      optionally, an element under 'server' whose `name` attribute is the
      SMTP server address (default to 'mail')
    :return: an empty dictionary (this action outputs nothing)

    NOTE: only the plain body is sent. Attachments, multipart bodies and
    signatures were only ever sketched in commented-out legacy code and are
    not implemented.
    """
    email = inputs['email']
    try:
        server_addr = inputs['server'].name
    except Exception:
        # best effort: without a usable server input, use the default host
        server_addr = 'mail'
    buf = StringIO()
    frm = "%s <%s>" % (email.from_name, email.from_address)
    envelope_dest = email.to.split(', ')
    if email.cc:
        envelope_dest += email.cc.split(', ')
        buf.write('From: %s\r\nTo: %s\r\nCc: %s\r\nSubject: %s\r\n'
                  % (frm, email.to, email.cc, email.subject))
    else:
        buf.write('From: %s\r\nTo: %s\r\nSubject: %s\r\n'
                  % (frm, email.to, email.subject))
    # blank line separating the headers from the mail body
    buf.write('\n')
    buf.write(email.body)
    send = smtplib.SMTP(server_addr)
    #send.set_debuglevel(1)
    try:
        send.sendmail(frm, envelope_dest, buf.getvalue())
        send.quit()
    finally:
        # make sure the connection is released when sending fails; closing
        # again after a successful quit() is harmless
        send.close()
    return {}

MOD_XML = MOD_XML + ''' Take an email as input and send it. Optionaly attach required files. 
isinstance(elmt, EmailElement) Email to send, with valid headers/body SMTP server where the message will be send (default "mail") isinstance(elmt, ServiceElement) and elmt.getattr((TYPE_NS, "type")) == "smtp" '''

MOD_XML = MOD_XML + ""


# Private #####################################################################

class UnixMailbox (mailbox.UnixMailbox) :
    """wrapper class to fixe a mailbox bug

    FIXME: no idea if this is still necessary
    """

    def next(self):
        """return the next non empty message of the mailbox

        :return:
          a `mimetools.Message` built on the part of the file holding the
          message, or None when the end of the mailbox is reached
        """
        while True:
            self.fp.seek(self.seekp)
            try:
                self._search_start()
            except EOFError:
                # no more message: remember where we stopped
                self.seekp = self.fp.tell()
                return None
            start = self.fp.tell()
            self._search_end()
            self.seekp = stop = self.fp.tell()
            # skip empty chunks
            if start != stop:
                break
        return mimetools.Message(mailbox._Subfile(self.fp, start, stop))

    def _isrealfromline(self, _):
        """accept every candidate "From " line as a message separator

        For explanations, see
        home.netscape.com/eng/mozilla/2.0/relnotes/demo/content-length.html
        """
        return 1