#!/usr/bin/env python
# -*- coding: utf-8 -*-
from __future__ import unicode_literals
import json
[docs]class DBConnect:
"""Light database connection object."""
settings = {}
def _check_settings(self):
"""
Check configuration file
:return: True if all settings are correct
"""
keys = ['host', 'user', 'password']
if not all(key in self.settings.keys() for key in keys):
raise ValueError(
'Please check credentials file for correct keys: host, user, '
'password, database'
)
if self.engine == "mysql" and 'database' not in self.settings.keys():
raise ValueError(
'database parameter is missing in credentials'
)
# @NOTE PostgreSQL uses dbname and is automatically set to username
def connect(self):
"""
Creates connection to database, sets connection and cursor
Connection to database can be loosed,
if that happens you can use this function to reconnect to database
"""
if self.engine == "mysql":
try:
import mysql.connector # MySQL Connector
from mysql.connector import errorcode
self.connection = mysql.connector.connect(**self.settings)
except ImportError:
raise ValueError(
'Please, install mysql-connector module before using plugin.'
)
except mysql.connector.Error as err:
if err.errno == errorcode.ER_ACCESS_DENIED_ERROR:
raise ValueError("Wrong credentials, ACCESS DENIED")
elif err.errno == errorcode.ER_BAD_DB_ERROR:
raise ValueError(
"Database %s does not exists" % (self.settings['database'])
)
else:
raise ValueError(err)
elif self.engine == "postgres":
try:
import psycopg2
except ImportError:
raise ValueError(
'Please, install psycopg2 module before using plugin.'
)
self.connection = psycopg2.connect(**self.settings)
else:
raise NotImplementedError(
"Database engine %s not implemented!" % self.engine
)
self.cursor = self.connection.cursor()
def __init__(self, credentials_file=None, charset='utf8',
port=3306, engine="mysql", **kwargs):
"""
Initialise object with credentials file provided
You can choose between providing file or connection details
Available parameters:
https://dev.mysql.com/doc/connector-python/en/connector-python-connectargs.html
"""
if credentials_file:
with open(credentials_file, 'r') as f:
self.settings = json.load(f)
if 'port' not in self.settings:
self.settings['port'] = port
if 'charset' not in self.settings:
self.settings['charset'] = charset
# Merge with kwargs
self.settings.update(**kwargs)
self.engine = self.settings.pop('engine', engine)
# @NOTE Charset parameter not supported in PostgreSQL
if self.engine == 'postgres':
self.settings.pop('charset', None)
self._check_settings()
self.connection = None
self.cursor = None
# Establish connection and set cursor
self.connect()
def disconnect(self):
"""
Disconnect from database
"""
self.connection.close()
@staticmethod
def _where_builder(filters, case):
"""
Build where part for query
:param filters: dict filters for rows (where)
:return: str update query and dict where data
"""
query = ""
where_data = {}
for key in filters:
if isinstance(filters[key], tuple):
if len(filters[key]) == 3:
# Like (id_start, id_end, '<=>')
if '=' in filters[key][2]:
query += key + ' >= ' + \
'%(where_start_' + key + ')s AND ' + key + \
' <= ' + '%(where_end_' + key + ')s ' + \
case + ' '
else:
query += key + ' > ' + '%(where_start_' + key + \
')s AND ' + key + ' < ' + '%(where_end_' + \
key + ')s ' + case + ' '
where_data['start_' + key] = filters[key][0]
where_data['end_' + key] = filters[key][1]
elif len(filters[key]) == 2:
# Like (id_start, '>=')
if not filters[key][0]:
query += key + ' ' + filters[key][1] + ' ' + \
'NULL ' + case + ' '
else:
query += key + ' ' + filters[key][1] + ' ' + \
'%(where_' + key + ')s ' + case + ' '
where_data[key] = filters[key][0]
else:
raise ValueError(
"Missing case param in filter: %s" % filters[key][0]
)
elif not filters[key] and not isinstance(filters[key], int):
query += key + ' is NULL ' + case + ' '
else:
query += key + ' = ' + '%(where_' + key + ')s ' + case + ' '
where_data[key] = filters[key]
return query.rstrip(case + ' '), where_data
def fetch(self, table, limit=1000, fields=None, filters=None, case='AND'):
"""
Get data from table
:param table: name of the table
:type table: str
:param limit: result limit for fetch
:type limit: int
:param fields: fields to get
:type fields: list
:param filters: filters to get custom results (where)
:type filters: dict
:param case: [AND, OR] for filter type
:type case: str
:return: array of dictionary with column name and value
"""
if fields:
query = 'SELECT '
for field in fields:
query += field + ', '
query = query.rstrip(', ') + ' FROM ' + str(table)
else:
query = 'SELECT * FROM %s' % table
data = None
if filters:
data = {}
query += ' WHERE '
update_query, where_data = self._where_builder(filters, case)
query += update_query
for key in where_data:
data['where_' + key] = where_data[key]
query += ' LIMIT ' + str(limit)
if data:
self.cursor.execute(query, data)
else:
self.cursor.execute(query)
items = self.cursor.fetchall()
if fields:
columns = fields
else:
columns = [i[0] for i in self.cursor.description]
results = []
for item in items:
data = {}
for i in range(len(columns)):
data[columns[i]] = item[i]
results.append(data)
return results
def insert(self, data, table, commit=True, update=None):
"""
Insert dictionary object to database
:type data: dict
:param data: Object with keys as column name in database
:type table: str
:param table: Table name
:param commit: Commit after every insert
:type update: dict
:param update: Update selected columns if key is duplicate
:return: dict with Boolean status key and message
"""
if not self.connection:
return {'status': False, 'message': "Connection is not defined"}
if not self.cursor:
return {'status': False, 'message': "Cursor is not defined"}
if not len(data):
return {'status': False, 'message': "Object is empty"}
# Make datetime and date objects string:
try:
query_insert = "INSERT INTO %s (" % table
query_value = " VALUES ("
for key in data:
query_insert += key + ', '
query_value += '%(' + key + ')s, '
query_insert = query_insert.rstrip(', ') + ')'
query_value = query_value.rstrip(', ') + ')'
query = query_insert + query_value
if update and bool(update):
# bool(dict) checks if dict is not empty
if self.engine == "mysql":
query += ' ON DUPLICATE KEY UPDATE '
for key in update:
query += key + ' = '
if isinstance(update[key], int):
query += update[key] + ', '
else:
query += '"' + update[key] + '", '
query = query.rstrip(', ')
elif self.engine == "postgres":
query += ' ON CONFLICT ON CONSTRAINT '
query += table + '_pkey'
query += ' DO UPDATE SET '
for key in update:
query = key + ' = '
if isinstance(update[key], int):
query += update[key] + ', '
else:
query += '"' + update[key] + '", '
query = query.rstrip(', ')
else:
raise NotImplementedError(
"Update on insert not implemented for choosen engine"
)
# Format, execute and send to database:
self.cursor.execute(query, data)
if commit:
self.commit()
except Exception as e:
if not isinstance(e, str):
e = str(e)
return {'status': False, 'message': e}
return {'status': True, 'message': "Object added to database"}
def update(self, data, filters, table, case='AND', commit=True):
"""
Update database using information in dictionary
:type data: dict
:param data: Object with keys as column name in database
:type filters: dict
:param filters: Objects with keys as column name for filters statement
:type table: str
:param table: Table name
:type case: str
:param case: Search case, Should be 'AND' or 'OR'
:type commit: bool
:param commit: Commit at the end or add to pool
:return: dict with Boolean status key and message
"""
if not self.connection:
return {'status': False, 'message': "Connection is not defined"}
if not self.cursor:
return {'status': False, 'message': "Cursor is not defined"}
if not len(data):
return {'status': False, 'message': "Object is empty"}
# Make datetime and date objects string:
try:
# Build query:
query_update = "UPDATE %s SET " % table
for key in data:
query_update += key + ' = %(' + key + ')s, '
# remove last comma and add empty space
query_update = query_update.rstrip(', ') + ' '
query_update += 'WHERE '
update_query, where_data = self._where_builder(filters, case)
query_update += update_query
# merge filters and data:
for key in where_data:
data['where_' + key] = where_data[key]
# execute and send to database:
self.cursor.execute(query_update, data)
if commit:
self.commit()
except Exception as e:
if not isinstance(e, str):
e = str(e)
return {'status': False, 'message': e}
return {'status': True, 'message': "Object added to database"}
def delete(self, table, filters=None, case='AND', commit=True):
"""
Delete item from table
:param table: name of table
:param filters: filter for item(s) to be deleted
:param case: [AND, OR] case for filter
:param commit: Commit at the end or add to pool
"""
if not filters:
raise ValueError(
"You must provide filter to delete some record(s). "
"For all records try truncate"
)
query = "DELETE FROM %s WHERE " % table
data = {}
update_query, where_data = self._where_builder(filters, case)
query += update_query
for key in where_data:
data['where_' + key] = where_data[key]
self.cursor.execute(query, data)
if commit:
self.commit()
def increment(self, table, fields, steps=1, filters=None,
case="AND", commit=True):
"""
Increment column in table
:param table: str table name
:param fields: list column names to increment
:param steps: int steps to increment, default is 1
:param filters: dict filters for rows to use
:param case: Search case, Should be 'AND' or 'OR'
:param commit: Commit at the end or add to pool
:note: If you use safe update mode, filters should be provided
"""
if not fields:
raise ValueError(
"You must provide which columns (fields) to update"
)
query = "UPDATE %s SET " % str(table)
for column in fields:
query += "{column} = {column} + {steps}, ".format(
column=column, steps=steps)
query = query.rstrip(', ')
data = {}
if filters:
query += ' WHERE '
update_query, where_data = self._where_builder(filters, case)
query += update_query
for key in where_data:
data['where_' + key] = where_data[key]
try:
self.cursor.execute(query, data)
except Exception as e:
return {'status': False, 'message': str(e)}
if commit:
self.commit()
return {'status': True, 'message': "Columns incremented"}
def value_sum(self, table, fields, filters=None, case='AND'):
"""
Get total sum of a numeric column(s)
:param table: name of the table
:type table: str
:param fields: fields to get sum of
:type fields: list
:param filters: filters to get custom results (where)
:type filters: dict
:param case: [AND, OR] for filter type
:type case: str
:return: dict with column name and value as Decimal
"""
query = 'SELECT '
for field in fields:
query += 'SUM(' + field + '), '
query = query.rstrip(', ') + ' FROM ' + str(table)
data = None
if filters:
data = {}
query += ' WHERE '
update_query, where_data = self._where_builder(filters, case)
query += update_query
for key in where_data:
data['where_' + key] = where_data[key]
if data:
self.cursor.execute(query, data)
else:
self.cursor.execute(query)
row = self.cursor.fetchone()
result = {}
for i in range(len(row)):
result[fields[i]] = row[i]
return result
def commit(self):
"""
Commit collected data for making changes to database
"""
self.connection.commit()
if __name__ == '__main__':
pass