Source code for fandango.db

#!/usr/bin/env python

#############################################################################
##
## file :       db.py
##
## description : This module simplifies database access.
##
## project :     Tango Control System
##
## $Author: Sergi Rubio Manrique, srubio@cells.es $
##
## $Revision: 2014 $
##
## copyleft :    ALBA Synchrotron Controls Section, CELLS
##               Bellaterra
##               Spain
##
#############################################################################
##
## This file is part of Tango Control System
##
## Tango Control System is free software; you can redistribute it and/or
## modify it under the terms of the GNU General Public License as published
## by the Free Software Foundation; either version 3 of the License, or
## (at your option) any later version.
##
## Tango Control System is distributed in the hope that it will be useful,
## but WITHOUT ANY WARRANTY; without even the implied warranty of
## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
## GNU General Public License for more details.
##
## You should have received a copy of the GNU General Public License
## along with this program; if not, see <http://www.gnu.org/licenses/>.
###########################################################################

"""
This package implements a simplified acces to MySQL using FriendlyDB object.

Go to http://mysql-python.sourceforge.net/MySQLdb.html for further information
"""

import time,datetime,log,traceback,sys
import MySQLdb

[docs]class FriendlyDB(log.Logger): """ Class for managing direct access to MySQL databases using mysql-python module """ def __init__(self,db_name,host='',user='',passwd='',autocommit=True,loglevel='WARNING',use_tuples=False,default_cursor=None): """ Initialization of MySQL connection """ self.call__init__(log.Logger,self.__class__.__name__,format='%(levelname)-8s %(asctime)s %(name)s: %(message)s') self.setLogLevel(loglevel or 'WARNING') #def __init__(self,api,db_name,user='',passwd='', host=''): #if not api or not database: #self.error('ArchivingAPI and database are required arguments for ArchivingDB initialization!') #return #self.api=api self.db_name=db_name self.host=host self.use_tuples = use_tuples #It will control if data is returned in tuples or lists self.setUser(user,passwd) self.autocommit = autocommit self.renewMySQLconnection() self.default_cursor = default_cursor or MySQLdb.cursors.Cursor self._cursor=None self._recursion = 0 self.tables={} def __del__(self): if hasattr(self,'__cursor') and self._cursor: self._cursor.close() del self._cursor if hasattr(self,'db') and self.db: self.db.close() del self.db
[docs] def setUser(self,user,passwd): """ Set User and Password to access MySQL """ self.user=user self.passwd=passwd
[docs] def setAutocommit(self,autocommit): try: self.db.autocommit(autocommit) self.autocommit = autocommit except Exception,e: self.error('Unable to set MySQLdb.connection.autocommit to %s'%autocommit) raise Exception,e
[docs] def renewMySQLconnection(self): try: if hasattr(self,'db') and self.db: self.db.close() del self.db self.db=MySQLdb.connect(db=self.db_name,host=self.host,user=self.user,passwd=self.passwd) self.db.autocommit(self.autocommit) except Exception,e: self.error( 'Unable to create a MySQLdb connection to "%s"@%s.%s: %s'%(self.user,self.host,self.db_name,str(e))) raise Exception,e
[docs] def getCursor(self,renew=True,klass=None): ''' returns the Cursor for the database renew will force the creation of a new cursor object klass may be any of MySQLdb.cursors classes (e.g. DictCursor) MySQLdb.cursors.SSCursor allows to minimize mem usage in clients (although it relies memory cleanup to the server!) ''' try: if klass in ({},dict): klass = MySQLdb.cursors.DictCursor if (renew or klass) and self._cursor: if not self._recursion: self._cursor.close() del self._cursor if renew or klass or not self._cursor: self._cursor = self.db.cursor(self.default_cursor) if klass is None else self.db.cursor(cursorclass=klass) return self._cursor except: print traceback.format_exc() self.renewMySQLconnection() self._recursion += 1 return self.getCursor(renew=True,klass=klass)
[docs] def tuples2lists(self,tuples): ''' Converts a N-D tuple to a N-D list ''' return [self.tuples2lists(t) if type(t) is tuple else t for t in tuples]
[docs] def table2dicts(self,keys,table): ''' Converts a 2-D table and a set of keys in a list of dictionaries ''' result = [] for line in table: d={} [d.__setitem__(keys[i],line[i]) for i in range(min([len(keys),len(line)]))] result.append(d) return result
[docs] def fetchall(self,cursor=None): """ This method provides a custom replacement to cursor.fetchall() method. It is used to return a list instead of a big tuple; what seems to cause trouble to python garbage collector. """ vals = [] cursor = cursor or self.getCursor() while True: v = cursor.fetchone() if v is None: break vals.append(v) return vals
[docs] def Query(self,query,export=True,asDict=False): ''' Executes a query directly in the database @param query SQL query to be executed @param export If it's True, it returns directly the values instead of a cursor @return the executed cursor, values can be retrieved by executing cursor.fetchall() ''' try: q=self.getCursor(klass = dict if asDict else self.default_cursor) q.execute(query) except: self.renewMySQLconnection() q=self.getCursor(klass = dict if asDict else None) q.execute(query) if not export: return q elif asDict or not self.use_tuples: return self.fetchall(q) #q.fetchall() else: return self.tuples2lists(self.fetchall(q)) #q.fetchall()
[docs] def Select(self,what,tables,clause='',group='',order='',limit='',distinct=False,asDict=False,trace=False): ''' Allows to create and execute Select queries using Lists as arguments @return depending on param asDict it returns a list or lists or a list of dictionaries with results ''' if type(what) is list: what=','.join(what) if len(what)>1 else what[0] if type(tables) is list: tables=','.join(tables) if len(tables)>1 else tables[0] if type(clause) is list: clause=' and '.join('(%s)'%c for c in clause) if len(clause)>1 else clause[0] elif type(clause) is dict: clause1='' for i in range(len(clause)): k,v = clause.items()[i] clause1+= "%s like '%s'"%(k,v) if type(v) is str else '%s=%s'%(k,str(v)) if (i+1)<len(clause): clause1+=" and " #' and '.join clause=clause1 if type(group) is list: group=','.join(group) if len(group)>1 else group[0] query = 'SELECT '+(distinct and ' DISTINCT ' or '') +' %s'%what if tables: query += ' FROM %s' % tables if clause: query += ' WHERE %s' % clause if group: group+= ' GROUP BY %s' % group if order: query += ' ORDER BY %s' % order if limit: query+= ' LIMIT %s' % limit result = self.Query(query,True,asDict=asDict) if not asDict and not self.use_tuples: return self.tuples2lists(result) else: return result #else: return self.table2dicts(what.split(',') if what!='*' else [k for t in tables.split(',') for k in self.getTableCols(t)],result)
[docs] def getTables(self,load=False): ''' Initializes the keys of the tables dictionary and returns these keys. ''' if load or not self.tables: q=self.Query('show tables',False) [self.tables.__setitem__(t[0],[]) for t in self.tuples2lists(q.fetchall())] return sorted(self.tables.keys())
[docs] def getTableCols(self,table): ''' Returns the column names for the given table, and stores these values in the tables dict. ''' self.getTables() if not self.tables[table]: q=self.Query('describe %s'%table,False) [self.tables[table].append(t[0]) for t in self.tuples2lists(q.fetchall())] return self.tables[table]
[docs] def getTableSize(self,table=''): table = table or '%'; res = self.Query("select table_name,table_rows from information_schema.tables where table_schema = '%s' and table_name like '%s';"%(self.db_name,table)) if not res: return 0 elif len(res)==1: return res[0][1] else: return dict(res)
[docs] def get_all_cols(self): if not self.tables: self.getTables() for t in self.tables: if not self.tables[t]: self.getTableCols(t) return
from . import doc __doc__ = doc.get_fn_autodoc(__name__,vars())