#!/usr/bin/env python
# -*- coding: latin-1; -*-
#
# PgWorksheet - PostgreSQL Front End
# http://pgworksheet.projects.postgresql.org/
#
# Copyright © 2004-2005 Henri Michelon & CML http://www.e-cml.org/
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details (read LICENSE.txt).
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
#
# $Id: pgworksheet,v 1.88 2006/01/10 21:31:35 hmichelon Exp $
#
import os
import sys
import string
import gettext
import locale

import ConfigParser

#this is needed for py2exe
if sys.platform == 'win32':
    #win32 platform, add the "lib" folder to the system path
    os.environ['PATH'] += ";gtk/bin;gtk/lib;postgresql;"
    import pygtk
else:
    import pygtk
    #not win32, ensure version 2.0 of pygtk is imported
    pygtk.require('2.0')
import gtk

import pgw
import pgw.UI
import pgw.RunSQL
import pgw.Execute
import pgw.DBConnection

# maximum entries in the SQL queries history
PGW_MAX_HISTORY = 100
# maximum entries in the connection parameters history
PGW_MAX_CONNECTION_HISTORY = 5

# separator written between two statements in the history file
_HISTORY_SEPARATOR = '#$#'


def _to_unicode(text):
    """Decode a byte string read from a file into a unicode string.

    UTF-8 is tried first, then the user encoding reported by
    pgw.get_user_encoding(). When both fail the text is decoded as UTF-8
    with replacement characters, so the caller always gets a unicode object
    and never mixes raw bytes with already decoded text.
    """
    try:
        return unicode(text, 'UTF-8')
    except UnicodeDecodeError:
        pass
    try:
        return unicode(text, pgw.get_user_encoding())
    except UnicodeDecodeError:
        return unicode(text, 'UTF-8', 'replace')


def _read_config():
    """Parse the configuration file and return the ConfigParser object.

    Raises IOError when the file cannot be opened or read.
    """
    cp = ConfigParser.ConfigParser()
    fd = open(pgw.get_config_path(), 'r')
    try:
        cp.readfp(fd)
    finally:
        fd.close()
    return cp


def _find_data_dir(candidates, marker):
    """Return the first directory of `candidates` that contains `marker`.

    The last candidate is the unconditional fallback: it is returned without
    being checked when none of the previous ones matches.
    """
    for candidate in candidates[:-1]:
        if os.access(os.path.join(candidate, marker), os.F_OK):
            return candidate
    return candidates[-1]


class PgWorksheet:
    """Application controller: owns the UI, the database connection, the
    SQL statements history and all the GTK signal handlers."""

    def __init__(self, app_name, app_version, pixmap_path, locale_path):
        """Initialize I18N, build the UI, then run the GTK main loop.

        This call only returns when the main loop ends.
        """
        # Initialize I18N
        if pgw.mswindows():
            try:
                # try to get the default language
                lang = gettext.translation(app_name, locale_path,
                                           [locale.getdefaultlocale()[0]])
                lang.install()
            except IOError:
                # fallback to the default method
                gettext.bindtextdomain(app_name, locale_path)
                gettext.textdomain(app_name)
                gettext.install(app_name, locale_path, unicode=1)
        else:
            gettext.bindtextdomain(app_name, locale_path)
            gettext.textdomain(app_name)
            gettext.install(app_name, locale_path, unicode=1)
        # kept for the status bar, see disconnect()
        self.app_version = app_version
        # connections history; reloaded each time the connection dialog is
        # displayed, but connect() needs it to exist in any case
        self.all_connections = []
        # Build UI
        self.ui = pgw.UI.UI(self, app_name, app_version, pixmap_path)
        # Default connection state : not connected
        self.db = None
        self.disconnect()
        # Initialize prev/next query lifos
        self.prev_statements = []
        self.next_statements = []
        self.load_history()
        self.prev_saved = 1
        # Display connection dialog on startup
        self.on_menu_connect(None)
        # Start application
        gtk.main()

    def add_prevstatement(self, sql):
        """Add a query to the previous queries lifo"""
        # do not add the same query two times
        if self.prev_statements and self.prev_statements[-1] == sql:
            return
        # add the query to the lifo
        self.prev_statements.append(sql)
        self.ui.enable_prevsql(len(self.prev_statements) > 0)

    def get_history_path(self):
        """Returns the path to the history file"""
        return os.path.join(pgw.get_user_configdir(), '.pgworksheet_history')

    def save_history(self):
        """Save the history in a text file.

        The current content of the SQL buffer is added to the history first.
        Statements are written as UTF-8, each one preceded by a separator
        line. I/O errors are silently ignored (best effort).
        """
        self.add_prevstatement(self.ui.get_sqlbuffer_text())
        # build the whole content before touching the file, so that an
        # unexpected error cannot leave a truncated history behind
        chunks = []
        for sql in self.prev_statements + self.next_statements:
            sql = string.rstrip(sql)
            # statements loaded from the history file are unicode objects,
            # writing them as-is fails on any non ASCII character
            if isinstance(sql, unicode):
                sql = sql.encode('UTF-8')
            chunks.append("\n" + _HISTORY_SEPARATOR + "\n")
            chunks.append(sql)
        chunks.append("\n" + _HISTORY_SEPARATOR + "\n")
        try:
            fd = open(self.get_history_path(), 'w')
            try:
                fd.write(''.join(chunks))
            finally:
                fd.close()
        except IOError:
            pass

    def load_history(self):
        """Load the history from a text file.

        Only the PGW_MAX_HISTORY most recent statements are kept. Nothing
        happens when the file cannot be read.
        """
        try:
            fd = open(self.get_history_path(), 'r')
            try:
                sql = ''
                for line in fd:
                    line = string.rstrip(line)
                    if line == '':
                        continue
                    line = _to_unicode(line)
                    if line == _HISTORY_SEPARATOR:
                        # end of the current statement
                        if len(sql) > 0:
                            self.prev_statements.append(sql)
                        sql = ''
                        continue
                    sql += line
                    sql += '\n'
                # last statement, when the file does not end with a separator
                if len(sql) > 0:
                    self.prev_statements.append(sql)
            finally:
                fd.close()
            if len(self.prev_statements) > PGW_MAX_HISTORY:
                self.prev_statements = self.prev_statements[-PGW_MAX_HISTORY:]
            self.ui.enable_prevsql(len(self.prev_statements) > 0)
        except IOError:
            pass

    def is_connected(self):
        """Return the connection state reported by the database object,
        or None when there is no database object at all"""
        if self.db is None:
            return None
        return self.db.is_connected()

    def connect(self, host = None, port = None, db = None, user = None, password = None):
        """Connect to a database.

        Any existing connection is closed first. On success the UI is
        updated, the connection is stored at the top of the connections
        history and the objects used to run the queries are created.
        Use is_connected() to check the result.
        """
        self.ui.wndmain.window.set_cursor(gtk.gdk.Cursor(gtk.gdk.WATCH))
        try:
            self.ui.status(_('Trying to connect as %(user)s to %(db)s on %(host)s') %
                           {'user':user, 'db':db, 'host':host}, _('connecting...'))
            # let GTK display the status and the cursor before blocking
            while gtk.events_pending():
                gtk.main_iteration_do(False)
            # Disconnect the re-connect
            self.disconnect()
            self.db = pgw.DBConnection.DBConnection(host, port, db, user, password)
            # we are connected
            if self.is_connected():
                # update the UI to reflect the connection state
                self.ui.enable_disconnect()
                self.ui.enable_runsql()
                self.ui.status(_('connected as %(user)s to %(db)s on %(host)s') %
                               {'user':user, 'db':db, 'host':host}, self.db.pgversion())
                self._remember_connection("%s,%s,%s,%s" % (host, port, db, user))
                # ready to type queries, give the focus to the text field
                self.ui.setfocus_sqlbuffer()
                # initialize the objects used to execute the queries
                self.execute = pgw.Execute.Execute(self.db)
                self.run = pgw.RunSQL.RunSQL(self.execute,
                                             self.ui.sqlview,
                                             self.ui.resulttab,
                                             self.ui.status_result)
        finally:
            # never leave the wait cursor behind, even on error
            self.ui.wndmain.window.set_cursor(None)

    def _remember_connection(self, new_conn):
        """Make `new_conn` the first entry of the connections history and
        save the history in the config file (I/O errors are ignored)."""
        # remove the connection from the history if it already exists
        if new_conn in self.all_connections:
            self.all_connections.remove(new_conn)
        # add the connection to the history, making it the first of the list
        self.all_connections.insert(0, new_conn)
        # save the connection history in the config file
        try:
            cp = _read_config()
        except IOError:
            # no config file yet, start with an empty one
            cp = ConfigParser.ConfigParser()
        if not cp.has_section("connections"):
            cp.add_section("connections")
        # write at most PGW_MAX_CONNECTION_HISTORY entries (conn1..connN),
        # which is what on_dlgconnect_map() reads back
        n = 0
        for conn in self.all_connections[:PGW_MAX_CONNECTION_HISTORY]:
            n = n + 1
            cp.set("connections", "conn%d" % n, conn)
        try:
            fd = open(pgw.get_config_path(), 'w')
            try:
                cp.write(fd)
            finally:
                fd.close()
        except IOError:
            pass

    def disconnect(self):
        """Disconnect from the current database"""
        # disconnect from the database
        if self.is_connected():
            self.db.disconnect()
        # destroy the objects used for this connection
        self.db = None
        self.execute = None
        self.run = None
        # update the UI to reflect the connection state
        self.ui.status(_('not connected'), 'PgWorksheet v' + self.app_version)
        self.ui.enable_disconnect(False)
        self.ui.enable_runsql(False)

    def on_wndmain_destroy(self, widget):
        """Called when the application quits"""
        self.disconnect()
        self.save_history()
        gtk.main_quit()
        sys.exit(0)

    def on_wndmain_delete(self, widget, event):
        """Called when the user wants to close the main window"""
        # False lets GTK go on with the destruction of the window
        return False

    def on_menu_connect(self, widget):
        """Called when the user want the connection dialog box"""
        # remember the name of the current system user, when it is known
        # NOTE(review): self.username is not used in this file, presumably
        # it is read by the connection dialog -- confirm in pgw.UI
        for variable in ('USERNAME', 'USER'):
            if variable in os.environ:
                self.username = os.environ[variable]
                break
        # fill the connection dialog box with default parameters
        host = 'localhost'
        port = '5432'
        database = 'template1'
        username = 'pgsql'
        self.display_connect_dialog(host, port, username, database, 0)

    def display_connect_dialog(self, host, port, username, database, overwrite_entry):
        """Display the connection dialog box and try to connect, again and
        again until the connection succeeds or the user cancels."""
        while True:
            # display and execute the connection dialog box
            params = self.ui.connect_dialog(self, host, port, username,
                                            database, overwrite_entry)
            # check if the user have clicked "Cancel"
            if params is None:
                return
            # connect to the database
            host, port, username, passwd, database = params
            self.connect(host, port, database, username, passwd)
            if self.is_connected():
                return
            # error connecting to the database, retry with the same values
            self.ui.error_box(_('Error connecting to %s:%s@%s:%s') %
                              (username, database, host, port))
            overwrite_entry = 1

    def on_dlgconnect_map(self, widget):
        """Called when the connection dialog box is displayed"""
        # clear the connections history
        self.all_connections = []
        # load the connections history from the config file
        try:
            cp = _read_config()
        except IOError:
            return
        n = 1
        while n <= PGW_MAX_CONNECTION_HISTORY:
            try:
                line = cp.get("connections", "conn%d" % n)
                host, port, db, user = string.split(line, ',')
            except (ConfigParser.Error, ValueError):
                # no more entries, or an entry that cannot be parsed
                break
            # add the connection to the connections history
            self.all_connections.append(line)
            # add the connections to the connections history list of the dialog box
            self.ui.storeconn.append(["%s:%s@%s" % (user, db, host), line])
            n = n + 1
        # if we have at least one connection in the history, made it the default
        if n > 1:
            # select the last used connection
            self.ui.viewconn.set_cursor(self.ui.storeconn.get_path(
                                        self.ui.storeconn.get_iter_first()))

    def on_dlgconnect_change(self, treeview):
        """Called when the user choose a connection in the connection history list"""
        model, selected = treeview.get_selection().get_selected()
        if selected is None:
            # nothing is selected, nothing to display
            return
        # fill the connection dialog with the selected connection parameters
        host, port, db, user = string.split(model.get_value(selected, 1), ',')
        self.ui.entry_host.set_text(host)
        self.ui.entry_port.set_text(port)
        self.ui.entry_database.set_text(db)
        self.ui.entry_user.set_text(user)
        self.ui.entry_password.set_text('')

    def on_menu_disconnect(self, widget):
        """Called when the user wants to disconnect from the database"""
        self.disconnect()

    def on_menu_opensql(self, widget):
        """The user wants to open a file with some queries"""
        filename = self.ui.file_dialog(_('Select a SQL text file'))
        if filename is None:
            return
        # loading the file must not be recorded by the undo manager nor
        # trigger the buffer handlers
        self.ui.undo.lock = True
        for handler in self.ui.buffer_handlers:
            self.ui.sqlbuffer.handler_block(handler)
        try:
            self.ui.set_sqlbuffer_text('')
            try:
                source = open(filename, 'r')
                try:
                    for line in source:
                        self.ui.sqlbuffer.insert_at_cursor(_to_unicode(line))
                finally:
                    source.close()
            except IOError:
                self.ui.error_box(_('Error while opening or reading from %s') % filename)
        finally:
            # always restore the handlers and the undo manager
            for handler in self.ui.buffer_handlers:
                self.ui.sqlbuffer.handler_unblock(handler)
            self.ui.undo.reset()
            self.ui.undo.lock = False
        self.ui.syntax.refresh()
        pgw.set_proportional(self.ui.sqlbuffer)

    def file_overwrite(self, title):
        """Display a "Save As" dialog box and prompt a confirmation if the
        selected file exists.

        Returns the selected file name, or None if the user cancels.
        """
        while True:
            filename = self.ui.file_dialog(title,
                                           gtk.FILE_CHOOSER_ACTION_SAVE,
                                           gtk.STOCK_SAVE_AS)
            if filename is None:
                return None
            if not os.path.exists(filename):
                return filename
            if (self.ui.yesno_box(_('%s already exists, overwrite ?') % filename)
                    == gtk.RESPONSE_YES):
                return filename
            # overwrite refused : ask for another file

    def _write_file(self, filename, writer):
        """Create `filename`, call `writer` with the open file then close it.
        An error box is displayed if the file cannot be created or written."""
        try:
            output = open(filename, 'w')
            try:
                writer(output)
            finally:
                output.close()
        except IOError:
            self.ui.error_box(_('Error while creating or writing %s') % filename)

    def on_menu_savesql(self, widget):
        """The user wants to save his queries"""
        filename = self.file_overwrite(_('Save SQL queries'))
        if filename is None:
            return

        def write_queries(output):
            output.write(self.ui.get_sqlbuffer_text())

        self._write_file(filename, write_queries)

    def save_list_row(self, model, path, iter, output):
        """Save a row of a TreeView in a tabular form : double-quoted
        values separated by tabs, one row per line.

        Used as a TreeModel.foreach() callback, hence the signature.
        """
        cells = []
        for col in range(model.get_n_columns()):
            # escape the double quotes found in the value
            val = string.replace(model.get_value(iter, col), '"', '\\"')
            cells.append('"' + val + '"')
        output.write('\t'.join(cells))
        output.write('\n')

    def saveresults(self, widget, output):
        """Save the content of a result page (a TextView or a TreeView) to
        a tab separated file"""
        widget = widget.get_child()
        if isinstance(widget, gtk.TextView):
            buffer = widget.get_buffer()
            output.write(buffer.get_text(buffer.get_start_iter(),
                                         buffer.get_end_iter()))
        elif isinstance(widget, gtk.TreeView):
            widget.get_model().foreach(self.save_list_row, output)

    def on_menu_saveallresults(self, widget):
        """The user wants to save ALL the results"""
        if self.ui.resulttab.get_n_pages() <= 0:
            return
        filename = self.file_overwrite(_('Save all the results'))
        if filename is None:
            return

        def write_all_pages(output):
            page = 0
            while page < self.ui.resulttab.get_n_pages():
                self.saveresults(self.ui.resulttab.get_nth_page(page), output)
                page = page + 1

        self._write_file(filename, write_all_pages)

    def on_menu_saveresults(self, widget):
        """The user wants to save the current result"""
        if self.ui.resulttab.get_n_pages() <= 0:
            return
        filename = self.file_overwrite(_('Save the results'))
        if filename is None:
            return

        def write_current_page(output):
            current = self.ui.resulttab.get_current_page()
            self.saveresults(self.ui.resulttab.get_nth_page(current), output)

        self._write_file(filename, write_current_page)

    def on_menu_cut(self, widget):
        """Cut text to the clipboard"""
        w = self.ui.wndmain.get_focus()
        if isinstance(w, gtk.TextView):
            w.emit('cut-clipboard')

    def on_menu_copy(self, widget):
        """Copy text to the clipboard.

        For a TreeView, the values of the selected row are copied,
        separated by tabs.
        """
        w = self.ui.wndmain.get_focus()
        if isinstance(w, gtk.TextView):
            w.emit('copy-clipboard')
        elif isinstance(w, gtk.TreeView):
            model, selected = w.get_selection().get_selected()
            if selected is not None:
                # values are copied verbatim : they are not quoted here, so
                # there is nothing to escape
                cells = [model.get_value(selected, col)
                         for col in range(model.get_n_columns())]
                clip = gtk.Clipboard()
                clip.set_text('\t'.join(cells))

    def on_menu_paste(self, widget):
        """Paste from the clipboard"""
        w = self.ui.wndmain.get_focus()
        if isinstance(w, gtk.TextView):
            w.emit('paste-clipboard')

    def on_menu_selectall(self, widget):
        """Select the entire text"""
        w = self.ui.wndmain.get_focus()
        if isinstance(w, gtk.TextView):
            buffer = w.get_buffer()
            buffer.move_mark_by_name('selection_bound', buffer.get_start_iter())
            buffer.move_mark_by_name('insert', buffer.get_end_iter())

    def on_sqlview_focus_in(self, widget, event):
        """Cut and paste are available while the SQL view has the focus"""
        self.ui.enable_cut()
        self.ui.enable_paste()

    def on_sqlview_focus_out(self, widget, event):
        """Cut and paste are not available outside of the SQL view"""
        self.ui.enable_cut(False)
        self.ui.enable_paste(False)

    def on_sqlview_keypress(self, widget, event):
        """Save the last statement in the history if needed (after an execution)"""
        if event is None:
            return
        # 65507 (0xFFE3) is the GDK key value of the left Control key
        if event.keyval != 65507 and self.prev_saved == 0:
            self.add_prevstatement(self.ui.get_sqlbuffer_text())
            self.prev_saved = 1

    def on_menu_about(self, widget):
        """Display the about dialog box"""
        self.ui.about_dialog()

    def on_menu_runsql(self, widget):
        """Execute the SQL queries"""
        if not self.is_connected():
            if (self.ui.yesno_box(_('Not connected to a database.\nDo you want to connect now ?'))
                    == gtk.RESPONSE_NO):
                return
            self.on_menu_connect(widget)
            if not self.is_connected():
                return
        self.on_text_change(widget)
        # the statement will be added to the history on the next key press
        self.prev_saved = 0
        self.ui.wndmain.window.set_cursor(gtk.gdk.Cursor(gtk.gdk.WATCH))
        try:
            self.run.run()
            self.ui.enable_saveresult(self.ui.resulttab.get_n_pages() > 0)
        finally:
            # never leave the wait cursor behind, even on error
            self.ui.wndmain.window.set_cursor(None)

    def _navigate_history(self, source, target):
        """Display the last statement of `source`, pushing the statement
        currently displayed on top of `target`."""
        self.ui.undo.lock = True
        if len(source) > 0:
            s = source.pop()
            target.append(self.ui.get_sqlbuffer_text())
            self.ui.set_sqlbuffer_text(s)
            self.prev = s
        self.ui.enable_prevsql(len(self.prev_statements) > 0)
        self.ui.enable_nextsql(len(self.next_statements) > 0)
        self.ui.undo.lock = False

    def on_menu_prevsql(self, widget):
        """Display the previous statement from the history"""
        self._navigate_history(self.prev_statements, self.next_statements)

    def on_menu_nextsql(self, widget):
        """Display the next statement from the history"""
        self._navigate_history(self.next_statements, self.prev_statements)

    def on_text_change(self, widget):
        """The text have been changed after navigation the history"""
        if self.ui.undo.lock:
            return
        if len(self.next_statements) > 0:
            # drop the empty statement stored at the bottom of the lifo
            if self.next_statements[0] == '':
                self.next_statements.pop(0)
            # the statement displayed, then all the "next" statements, go
            # back to the "previous" lifo
            self.prev_statements.append(self.prev)
            self.prev_statements.extend(reversed(self.next_statements))
            self.next_statements = []
        self.ui.enable_prevsql(len(self.prev_statements) > 0)
        self.ui.enable_nextsql(len(self.next_statements) > 0)


# Application parameters
app_name = 'pgworksheet'
app_version = '1.8.1'
# Find current pixmap path (the first entry is the default one)
pixmap_path = _find_data_dir(
    ['/usr/X11R6/share/pixmaps/pgworksheet',
     os.path.join(sys.prefix, 'share/pixmaps/pgworksheet'),
     os.path.join(os.path.dirname(sys.argv[0]), 'pixmaps/pgworksheet')],
    'pgworksheet-32.png')
# Find current locale path
locale_path = _find_data_dir(
    ['@LOCALE_PATH@',
     os.path.join(sys.prefix, 'share/locale'),
     os.path.join(os.path.dirname(sys.argv[0]), 'locale')],
    'fr/LC_MESSAGES/pgworksheet.mo')

# Start program
p = None
try:
    p = PgWorksheet(app_name, app_version, pixmap_path, locale_path)
except KeyboardInterrupt:
    if p is not None:
        p.on_wndmain_destroy(None)