#!/usr/bin/env python
# -*- coding: latin-1; -*-
#
# PgWorksheet - PostgreSQL Front End
# http://pgworksheet.projects.postgresql.org/
#
# Copyright © 2004-2005 Henri Michelon & CML http://www.e-cml.org/
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details (read LICENSE.txt).
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
#
# $Id: pgworksheet,v 1.88 2006/01/10 21:31:35 hmichelon Exp $
#
import os
import sys
import string
import gettext
import locale
import ConfigParser
#this is needed for py2exe
if sys.platform == 'win32':
#win32 platform, add the "lib" folder to the system path
os.environ['PATH'] += ";gtk/bin;gtk/lib;postgresql;"
import pygtk
else:
import pygtk
#not win32, ensure version 2.0 of pygtk is imported
pygtk.require('2.0')
import gtk
import pgw
import pgw.UI
import pgw.RunSQL
import pgw.Execute
import pgw.DBConnection
# maximum entries in the SQL queries history
PGW_MAX_HISTORY = 100
# maximum entries in the connection parameters history
PGW_MAX_CONNECTION_HISTORY = 5
class PgWorksheet:
def __init__(self, app_name, app_version, pixmap_path, locale_path):
# Initialize I18N
if (pgw.mswindows()):
try:
# try to get the default language
lang = gettext.translation(app_name, locale_path,
[ locale.getdefaultlocale()[0] ])
lang.install()
except IOError:
# fallback to the default method
gettext.bindtextdomain(app_name, locale_path)
gettext.textdomain(app_name)
gettext.install(app_name, locale_path, unicode=1)
else:
gettext.bindtextdomain(app_name, locale_path)
gettext.textdomain(app_name)
gettext.install(app_name, locale_path, unicode=1)
# Build UI
self.ui = pgw.UI.UI(self, app_name, app_version, pixmap_path)
# Default connection state : not connected
self.db = None
self.disconnect()
# Initialize prev/next query lifos
self.prev_statements = []
self.next_statements = []
self.load_history()
self.prev_saved = 1
# Display connection dialog on startup
self.on_menu_connect(None)
# Start application
gtk.main()
def add_prevstatement(self, sql):
"""Add a query to the previous queries lifo"""
# do not add the same query two times
if (len(self.prev_statements) > 0):
prev = self.prev_statements[len(self.prev_statements)-1]
if (prev == sql): return
# add the query to the lifo
self.prev_statements.append(sql)
self.ui.enable_prevsql(len(self.prev_statements) > 0)
def get_history_path(self):
"""Returns the path to the configuration file"""
return os.path.join(pgw.get_user_configdir(), '.pgworksheet_history');
def save_history(self):
"""Save the history in a text file"""
try:
fd = open(self.get_history_path(), 'w')
self.add_prevstatement(self.ui.get_sqlbuffer_text())
for sql in self.prev_statements:
fd.write("\n#$#\n")
fd.write(string.rstrip(sql))
for sql in self.next_statements:
fd.write("\n#$#\n")
fd.write(string.rstrip(sql))
fd.write("\n#$#\n")
fd.close()
except IOError:
pass
def load_history(self):
"""Load the history from a text file"""
try:
fd = open(self.get_history_path(), 'r')
sql = ''
count = 0
for line in fd:
line = string.rstrip(line)
if (line == '') :
continue
try:
line = unicode(line, 'UTF-8')
except UnicodeDecodeError:
try:
line = unicode(line, pgw.get_user_encoding())
except UnicodeDecodeError:
pass
if (line == '#$#'):
if (len(sql) > 0):
self.prev_statements.append(sql)
count = count + 1
sql = ''
continue
sql += line
sql += '\n'
if (len(sql) > 0):
self.prev_statements.append(sql)
fd.close()
if (count > PGW_MAX_HISTORY):
self.prev_statements = self.prev_statements[count - PGW_MAX_HISTORY : count]
self.ui.enable_prevsql(len(self.prev_statements) > 0)
except IOError:
pass
def is_connected(self):
"""Return TRUE if connected to a database"""
if (self.db is None):
return None
else:
return self.db.is_connected()
def connect(self, host = None, port = None, db = None,
user = None, password = None):
"""Connect to a database"""
self.ui.wndmain.window.set_cursor(gtk.gdk.Cursor(gtk.gdk.WATCH))
self.ui.status(_('Trying to connect as %(user)s to %(db)s on %(host)s') %
{'user':user, 'db':db, 'host':host}, _('connecting...'))
while (gtk.events_pending() == True):
gtk.main_iteration_do(False)
# Disconnect the re-connect
self.disconnect()
self.db = pgw.DBConnection.DBConnection(host, port, db, user, password)
# we are connected
if (self.is_connected()):
# update the UI to reflect the connection state
self.ui.enable_disconnect()
self.ui.enable_runsql()
self.ui.status(_('connected as %(user)s to %(db)s on %(host)s') %
{'user':user, 'db':db, 'host':host}, self.db.pgversion())
new_conn = "%s,%s,%s,%s" % (host, port, db, user)
# update the connection history
n = 0
for conn in self.all_connections:
# remove the connection from the history if it already exists
if (conn == new_conn):
self.all_connections.pop(n)
break
n = n + 1
# add the connection to the history, making it the first of the list
self.all_connections.insert(0, new_conn)
# save the connection history in the config file
cp = ConfigParser.ConfigParser()
try:
cp.readfp(open(pgw.get_config_path(), 'r'))
except IOError:
pass
if (not cp.has_section("connections")):
cp.add_section("connections")
n = 0
while ((n <= PGW_MAX_CONNECTION_HISTORY) and
(n < len(self.all_connections))):
cp.set("connections", "conn%d" % (n + 1), self.all_connections[n])
n = n + 1
try:
cp.write(open(pgw.get_config_path(), 'w'))
except IOError:
pass
# ready to type queries, give the focus to the text field
self.ui.setfocus_sqlbuffer()
# initialize the objects used to execute the queries
self.execute = pgw.Execute.Execute(self.db)
self.run = pgw.RunSQL.RunSQL(self.execute,
self.ui.sqlview,
self.ui.resulttab,
self.ui.status_result)
self.ui.wndmain.window.set_cursor(None)
def disconnect(self):
"""Disconnect from the current database"""
# disconnect from the database
if (self.is_connected()): self.db.disconnect()
# destroy the objects used for this connection
self.db = None
self.execute = None
self.run = None
# update the UI to reflect the connection state
self.ui.status(_('not connected'), 'PgWorksheet v' + app_version)
self.ui.enable_disconnect(False)
self.ui.enable_runsql(False)
def on_wndmain_destroy(self, widget):
"""Called when the application quits"""
self.disconnect()
self.save_history()
gtk.main_quit()
sys.exit(0)
def on_wndmain_delete(self, widget, event):
"""Called when the user wants to close the main window"""
return False
def on_menu_connect(self, widget):
"""Called when the user want the connection dialog box"""
# fill the connection dialog box with default parameters
try:
self.username = os.environ['USERNAME']
except KeyError:
try:
self.username = os.environ['USER']
except KeyError:
pass
host = 'localhost'
port = '5432'
database = 'template1'
username = 'pgsql'
self.display_connect_dialog(host, port, username, database, 0)
def display_connect_dialog(self, host, port, username, database, overwrite_entry):
# display and execute the connection dialog box
params = self.ui.connect_dialog(self, host, port, username, database, overwrite_entry)
# check if the user have clicked "Cancel"
if (params is not None):
# connect to the database
host, port, username, passwd, database = params;
self.connect(host, port, database, username, passwd)
# error connecting to the database, retry
if (not self.is_connected()):
self.ui.error_box(_('Error connecting to %s:%s@%s:%s') %
(username, database, host, port))
self.display_connect_dialog(host, port, username, database, 1)
def on_dlgconnect_map(self, widget):
"""Called when the connection dialog box is displayed"""
# clear the connections history
self.all_connections = []
# load the connections history from the config file
cp = ConfigParser.ConfigParser()
try :
cp.readfp(open(pgw.get_config_path(), 'r'))
n = 1
while n <= PGW_MAX_CONNECTION_HISTORY:
try:
line = cp.get("connections", "conn%d" % n)
# add the connection to the connections history
self.all_connections.append(line)
host, port, db, user = string.split(line, ',')
# add the connections to the connections history list of the dialog box
self.ui.storeconn.append(["%s:%s@%s" % (user, db, host), line])
n = n + 1
except:
break
# if we have at least one connection in the history, made it the default
if (n > 1) :
# select the last used connection
self.ui.viewconn.set_cursor(self.ui.storeconn.get_path(
self.ui.storeconn.get_iter_first()))
except IOError:
pass
def on_dlgconnect_change(self, treeview):
"""Called when the user choose a connection in the connection history list"""
# fill the connection dialog with the selected connection parameters
model, iter = treeview.get_selection().get_selected()
host, port, db, user = string.split(model.get(iter, 1)[0], ',')
self.ui.entry_host.set_text(host)
self.ui.entry_port.set_text(port)
self.ui.entry_database.set_text(db)
self.ui.entry_user.set_text(user)
self.ui.entry_password.set_text('')
def on_menu_disconnect(self, widget):
"""Called when the user wants to disconnect from the database"""
self.disconnect()
def on_menu_opensql(self, widget):
"""The user wants to open a file with some queries"""
filename = self.ui.file_dialog(_('Select a SQL text file'));
if (filename is not None):
self.ui.undo.lock = True
for handler in self.ui.buffer_handlers:
self.ui.sqlbuffer.handler_block(handler)
self.ui.set_sqlbuffer_text('')
try:
input = open(filename, 'r')
for line in input:
try:
self.ui.sqlbuffer.insert_at_cursor(unicode(line, 'UTF-8'))
except UnicodeDecodeError:
try:
self.ui.sqlbuffer.insert_at_cursor(unicode(line, pgw.get_user_encoding()))
except UnicodeDecodeError:
self.ui.sqlbuffer.insert_at_cursor(line)
except IOError:
self.ui.error_box(_('Error while opening or reading from %s') %filename)
for handler in self.ui.buffer_handlers:
self.ui.sqlbuffer.handler_unblock(handler)
self.ui.undo.reset()
self.ui.undo.lock = False
self.ui.syntax.refresh()
pgw.set_proportional(self.ui.sqlbuffer)
def file_overwrite(self, title):
"""Display a "Save As" dialopg box and prompt a confirmation if the selected file exists"""
filename = self.ui.file_dialog(title, gtk.FILE_CHOOSER_ACTION_SAVE,
gtk.STOCK_SAVE_AS);
if (filename is not None):
try:
os.stat(filename)
if (self.ui.yesno_box(_('%s already exists, overwrite ?') % filename) ==
gtk.RESPONSE_YES):
return filename
return self.file_overwrite(title)
except OSError: # file does not exists
return filename
return None
def on_menu_savesql(self, widget):
"""The user wants to save his queries"""
filename = self.file_overwrite(_('Save SQL queries'))
if (filename is not None):
try:
output = open(filename, 'w')
output.write(self.ui.get_sqlbuffer_text())
except IOError:
self.ui.error_box(_('Error while creating or writing %s') % filename)
def save_list_row(self, model, path, iter, output):
"""Save a row of a TreeView in a tabular form"""
col = 0
while (col < model.get_n_columns()):
val = string.replace(model.get_value(iter, col), '"', '\"')
output.write('"' + val + '"')
col = col + 1
if (col < model.get_n_columns()):
output.write('\t')
output.write('\n')
def saveresults(self, widget, output):
"""Save the content of a TreeView to a tab separated file"""
widget = widget.get_child()
if (isinstance(widget, gtk.TextView)):
buffer = widget.get_buffer()
output.write(buffer.get_text(buffer.get_start_iter(),
buffer.get_end_iter()))
elif (isinstance(widget, gtk.TreeView)):
widget.get_model().foreach(self.save_list_row, output)
def on_menu_saveallresults(self, widget):
"""The user wants to save ALL the results"""
if (self.ui.resulttab.get_n_pages() > 0):
filename = self.file_overwrite(_('Save all the results'))
if (filename is not None):
try:
output = open(filename, 'w')
page = 0
while page < self.ui.resulttab.get_n_pages() :
self.saveresults(self.ui.resulttab.get_nth_page(page), output)
page = page + 1
except IOError:
self.ui.error_box(_('Error while creating or writing %s') % filename)
def on_menu_saveresults(self, widget):
"""The user wants to save the current result"""
if (self.ui.resulttab.get_n_pages() > 0):
filename = self.file_overwrite(_('Save the results'))
if (filename is not None):
try:
output = open(filename, 'w')
self.saveresults(self.ui.resulttab.get_nth_page(
self.ui.resulttab.get_current_page()), output)
except IOError:
self.ui.error_box(_('Error while creating or writing %s') % filename)
def on_menu_cut(self, widget):
"""Cut text to the clipboard"""
w = self.ui.wndmain.get_focus()
if (isinstance(w, gtk.TextView)):
w.emit('cut-clipboard')
def on_menu_copy(self, widget):
"""Copy text to the clipboard"""
w = self.ui.wndmain.get_focus()
if (isinstance(w, gtk.TextView)):
w.emit('copy-clipboard')
elif (isinstance(w, gtk.TreeView)):
model, iter = w.get_selection().get_selected()
if (iter is not None):
col = 0
result = ''
while (col < model.get_n_columns()):
val = string.replace(model.get_value(iter, col), '"', '\"')
result = result + val
col = col + 1
if (col < model.get_n_columns()):
result = result + '\t'
clip = gtk.Clipboard()
clip.set_text(result)
def on_menu_paste(self, widget):
"""Paste from the clipboard"""
w = self.ui.wndmain.get_focus()
if (isinstance(w, gtk.TextView)):
w.emit('paste-clipboard')
def on_menu_selectall(self, widget):
"""Select the entire text"""
w = self.ui.wndmain.get_focus()
if (isinstance(w, gtk.TextView)):
buffer = w.get_buffer()
buffer.move_mark_by_name('selection_bound', buffer.get_start_iter())
buffer.move_mark_by_name('insert', buffer.get_end_iter())
def on_sqlview_focus_in(self, widget, event):
self.ui.enable_cut()
self.ui.enable_paste()
def on_sqlview_focus_out(self, widget, event):
self.ui.enable_cut(False)
self.ui.enable_paste(False)
def on_sqlview_keypress(self, widget, event):
"""Save the last statement in the history
if needed (after an execution"""
if (event is None) : return
if (event.keyval != 65507):
if (self.prev_saved == 0):
self.add_prevstatement(self.ui.get_sqlbuffer_text())
self.prev_saved = 1
def on_menu_about(self, widget):
self.ui.about_dialog()
def on_menu_runsql(self, widget):
"""Execute the SQL queries"""
if (not self.is_connected()):
if (self.ui.yesno_box(_('Not connected to a database.\nDo you want to connect now ?')) ==
gtk.RESPONSE_NO):
return
self.on_menu_connect(widget)
if (not self.is_connected()):
return
self.on_text_change(widget)
self.prev_saved = 0
self.ui.wndmain.window.set_cursor(gtk.gdk.Cursor(gtk.gdk.WATCH))
self.run.run()
self.ui.enable_saveresult(self.ui.resulttab.get_n_pages() > 0)
self.ui.wndmain.window.set_cursor(None)
def on_menu_prevsql(self, widget):
"""Display the previous statement from the history"""
self.ui.undo.lock = True
if (len(self.prev_statements) > 0):
s = self.prev_statements.pop()
self.next_statements.append(self.ui.get_sqlbuffer_text())
self.ui.set_sqlbuffer_text(s)
self.prev = s
self.ui.enable_prevsql(len(self.prev_statements) > 0)
self.ui.enable_nextsql(len(self.next_statements) > 0)
self.ui.undo.lock = False
def on_menu_nextsql(self, widget):
"""Display the next statement from the history"""
self.ui.undo.lock = True
if (len(self.next_statements) > 0):
s = self.next_statements.pop()
self.prev_statements.append(self.ui.get_sqlbuffer_text())
self.ui.set_sqlbuffer_text(s)
self.prev = s
self.ui.enable_prevsql(len(self.prev_statements) > 0)
self.ui.enable_nextsql(len(self.next_statements) > 0)
self.ui.undo.lock = False
def on_text_change(self, widget):
"""The text have been changed after navigation the history"""
if (self.ui.undo.lock):
return
if (len(self.next_statements) > 0):
if (self.next_statements[0] == ''):
self.next_statements.pop(0)
self.prev_statements.append(self.prev)
for i in reversed(self.next_statements):
self.prev_statements.append(i)
self.next_statements = []
self.ui.enable_prevsql(len(self.prev_statements) > 0)
self.ui.enable_nextsql(len(self.next_statements) > 0)
# Application parameters
app_name = 'pgworksheet'
app_version = '1.8.1'
# Default pixmap path
pixmap_path = '/usr/X11R6/share/pixmaps/pgworksheet'
# Find current pixmap path
if (not os.access(os.path.join(pixmap_path, 'pgworksheet-32.png'), os.F_OK)):
pixmap_path = os.path.join(sys.prefix, 'share/pixmaps/pgworksheet')
if (not os.access(os.path.join(pixmap_path, 'pgworksheet-32.png'), os.F_OK)):
pixmap_path = os.path.join(os.path.dirname(sys.argv[0]), 'pixmaps/pgworksheet')
# Find current locale path
locale_path = '@LOCALE_PATH@'
if (not os.access(os.path.join(locale_path, 'fr/LC_MESSAGES/pgworksheet.mo'), os.F_OK)):
locale_path = os.path.join(sys.prefix, 'share/locale')
if (not os.access(os.path.join(locale_path, 'fr/LC_MESSAGES/pgworksheet.mo'), os.F_OK)):
locale_path = os.path.join(os.path.dirname(sys.argv[0]), 'locale')
# Start program
p = None
try:
p = PgWorksheet(app_name, app_version, pixmap_path, locale_path)
except KeyboardInterrupt:
if (p is not None):
p.on_wndmain_destroy(None)