from WebKit.Examples.ExamplePage import ExamplePage
from MiscUtils.Configurable import Configurable

# NOTE(review): this file reached review with its line structure collapsed
# and with several string literals emptied or truncated (they look like
# markup that was stripped out).  The layout below is restored from the
# syntax only; every token, including the damaged literals, is unchanged.
# Each damaged spot is flagged with its own NOTE(review) -- restore those
# literals from a pristine copy before running this code.


class DBConfig(Configurable):
    """Database configuration.

    Provides the default connection settings used by this example and
    the path of the config file associated with them.
    """

    def defaultConfig(self):
        """Return the built-in default database settings as a dictionary."""
        return {
            'dbapi': 'pg',
            'database': 'demo',
            'user': 'demo',
            'password': 'demo',
            'mincached': 5,
            'maxcached': 25
        }

    def configFilename(self):
        """Return the path of the database config file."""
        # presumably resolved and read by Configurable.config() -- confirm
        return 'Configs/Database.config'


# the database tables used in this example:
# (each entry is the text that follows 'create table ' in createTables())
tables = ('''seminars ( id varchar(4) primary key, title varchar(64) unique not null, cost money, places_left smallint)''',
    '''attendees ( name varchar(64) not null, seminar varchar(4), paid boolean, primary key(name, seminar), foreign key (seminar) references seminars(id) on delete cascade)''')


class DBUtilsExample(ExamplePage):
    """Example page for the DBUtils package.

    The class body selects a DBUtils module from the configuration and
    tries to instantiate its class once; the methods then use that shared
    object to create the demo tables and to list or delete seminars.
    """

    # Initialize the database class once when this class is loaded:
    config = DBConfig().config()
    # No 'maxcached' setting selects the 'Persistent' variant, otherwise
    # the 'Pooled' one; the suffix ('Pg' or 'DB') is appended below.
    if config.get('maxcached', None) is None:
        dbmod_name = 'Persistent'
    else:
        dbmod_name = 'Pooled'
    dbapi_name = config.get('dbapi', 'pg')
    if dbapi_name == 'pg': # use the PyGreSQL classic DB API
        dbmod_name += 'Pg'
        # For 'pg' the settings are passed on under the names 'dbname'
        # and 'passwd' instead of 'database' and 'password'.
        if config.has_key('database'):
            config['dbname'] = config['database']
            del config['database']
        if config.has_key('password'):
            config['passwd'] = config['password']
            del config['password']
    else: # use a DB-API 2 compliant module
        dbmod_name += 'DB'
    # dbstatus stays None on success; otherwise it holds the error text
    # that connection() reports instead of connecting.
    dbapi = dbmod = dbclass = dbstatus = None
    # NOTE(review): the nesting of the three try blocks is inferred from
    # the pairing of try/except keywords and the error messages -- confirm.
    try:
        dbapi = __import__(dbapi_name)
        try:
            # __import__ of a dotted name returns the top-level package,
            # so the submodule is fetched from it with getattr.
            dbmod = getattr(__import__('DBUtils.' + dbmod_name), dbmod_name)
            try:
                if dbapi_name == 'pg':
                    del config['dbapi']
                else:
                    # hand over the imported module object, not its name
                    config['dbapi'] = dbapi
                # The class has the same name as its module; the remaining
                # settings are passed to it as keyword arguments.
                dbclass = getattr(dbmod, dbmod_name)(**config)
            except dbapi.Error, error:
                dbstatus = str(error)
            except Exception:
                dbstatus = 'Could not connect to the database.'
        except Exception:
            dbstatus = 'Could not import DBUtils.%s.' % dbmod_name
    except Exception:
        dbstatus = 'Could not import %s.' % dbapi_name

    # Initialize the buttons
    # Each label such as 'create tables' yields the button caption
    # 'Create tables' and the action name 'createTables'.
    _actions = []
    _buttons = []
    for action in ('create tables', 'list seminars', 'list attendees',
            'new seminar', 'new attendee'):
        value = action.capitalize()
        action = action.split()
        action[1] = action[1].capitalize()
        action = ''.join(action)
        _actions.append(action)
        # NOTE(review): the format string is empty although two values are
        # supplied; its content appears to have been lost -- restore it.
        _buttons.append('' % (action, value))
    _buttons = tuple(_buttons)

    def title(self):
        """Return the title of this page."""
        return "DBUtils Example"

    def actions(self):
        """Return the parent's actions followed by this page's actions."""
        return ExamplePage.actions(self) + self._actions

    def awake(self, transaction):
        """Run the parent's awake() and start with an empty output list."""
        ExamplePage.awake(self, transaction)
        self._output = []

    def postAction(self, actionName):
        """Write the body, drop the output list, then run the parent hook."""
        self.writeBody()
        del self._output
        ExamplePage.postAction(self, actionName)

    def output(self, s):
        """Append s to the collected output."""
        self._output.append(s)

    def outputMsg(self, msg, error=0):
        """Append a message to the collected output.

        The values supplied to the format string are 'red' when error is
        true and 'green' otherwise, followed by msg.
        """
        # NOTE(review): the literal below spans raw line breaks and has one
        # placeholder for two values; it appears damaged -- restore it.
        self._output.append('
%s
' % (error and 'red' or 'green', msg))

    def connection(self, shareable=1):
        """Return a database connection, or None after reporting an error.

        shareable is only passed on when the module in use is 'PooledDB'.
        If the class-level setup failed, its stored status text is
        reported and no connection attempt is made.
        """
        if self.dbstatus:
            error = self.dbstatus
        else:
            try:
                if self.dbmod_name == 'PooledDB':
                    return self.dbclass.connection(shareable)
                else:
                    return self.dbclass.connection()
            except self.dbapi.Error, error:
                error = str(error)
            except Exception:
                error = 'Cannot connect to the database.'
        # Only reached on failure; the method then falls off the end and
        # so returns None, which callers test with "if not db".
        self.outputMsg(error, 1)

    def sqlEncode(self, s):
        """Return s as a quoted SQL string literal, or 'null' for None.

        Backslashes and single quotes in s are escaped with a backslash.
        """
        if s is None:
            return 'null'
        s = s.replace('\\', '\\\\').replace('\'', '\\\'')
        return "'%s'" % s

    def createTables(self):
        """Create every table listed in the module-level tables tuple.

        A success or error message is output for each table.
        """
        db = self.connection(0)
        if not db:
            return
        for table in tables:
            # NOTE(review): the first literal below contains a raw line
            # break and appears damaged -- restore it.
            self._output.append('Creating the following table:
' '%s' % table)
            ddl = 'create table ' + table
            # NOTE(review): the placement of commit() inside the else
            # branch and of close() after the loop is inferred -- confirm.
            try:
                if self.dbapi_name == 'pg':
                    db.query(ddl)
                else:
                    db.cursor().execute(ddl)
                    db.commit()
            except self.dbapi.Error, error:
                if self.dbapi_name != 'pg':
                    db.rollback()
                self.outputMsg(error, 1)
            else:
                self.outputMsg('The table was successfully created.')
        db.close()

    def listSeminars(self):
        """Delete the seminars named in the request, then query all seminars.

        If the request has an 'id' field (one value or a list), the
        matching rows are deleted first and the number of ids is reported.
        """
        id = self.request().field('id', None)
        if id:
            # a single value is wrapped so that it can be joined below
            if type(id) != type([]):
                id = [id]
            cmd = ','.join(map(self.sqlEncode, id))
            cmd = 'delete from seminars where id in (%s)' % cmd
            db = self.connection(0)
            if not db:
                return
            try:
                if self.dbapi_name == 'pg':
                    db.query('begin')
                    db.query(cmd)
                    db.query('end')
                else:
                    db.cursor().execute(cmd)
                    db.commit()
            except self.dbapi.Error, error:
                # Best effort to leave the transaction; a failure here is
                # ignored so that the original error is still reported.
                try:
                    if self.dbapi_name == 'pg':
                        db.query('end')
                    else:
                        db.rollback()
                except Exception:
                    pass
                self.outputMsg(error, 1)
                return
            else:
                self.outputMsg('Entries deleted: %d' % len(id))
        db = self.connection()
        if not db:
            return
        query = ('select id, title, cost, places_left from seminars '
            'order by title')
        try:
            if self.dbapi_name == 'pg':
                result = db.query(query).getresult()
            else:
                cursor = db.cursor()
                cursor.execute(query)
                result = cursor.fetchall()
                cursor.close()
        except self.dbapi.Error, error:
            self.outputMsg(error, 1)
            return
        if not result:
            self.outputMsg('There are no seminars in the database.', 1)
            return
        wr = self.output
        button = self._buttons[1].replace('List seminars', 'Delete')
        # NOTE(review): from here on the text appears truncated.  Nothing
        # visible uses result or button, and the calls below output the
        # configuration, a description and the buttons, which may belong
        # to a different method whose header was lost.  The literals also
        # contain raw line breaks, and the last one is empty although
        # values are supplied.  Restore this part before use.
        wr('
Configuration: %r
' % DBConfig().config())
        wr('This example uses a small demo database '
            'designed to track the attendees for a series of seminars '
            '(see "The '
            'Python DB-API" by Andrew Kuchling).
')
        wr('' % self._buttons)