# -*- coding: utf8 -*-
################################################################################
#
# Copyright (c) 2012 STEPHANE MANGIN. (http://le-spleen.net) All Rights Reserved
# Stéphane MANGIN <stephane.mangin@freesbee.fr>
# Copyright (c) 2012 OSIELL SARL. (http://osiell.com) All Rights Reserved
# Eric Flaux <contact@osiell.com>
#
# WARNING: This program as such is intended to be used by professional
# programmers who take the whole responsability of assessing all potential
# consequences resulting from its eventual inadequacies and bugs
# End users who are looking for a ready-to-use solution with commercial
# garantees and support are strongly adviced to contract a Free Software
# Service Company
#
# This program is Free Software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 3
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
#
################################################################################
"""
"""
import os
import sys
import csv
import unicodedata
from pprint import pprint
from csv2oerp.lib import unicode_csv
from csv2oerp.constants_and_vars import STATS
[docs]def get_line_from_table(csv_file, value, column, separator=u',', quote=u'"', header=True):
"""Search for an equivalence between value from column `col_referer`
and `value` value, and return value from column `col_return` index at
the same line as matching pair `col_referer` values.
:param csv_file: The name of the CSV file to search into
:type csv_file: str
:param value: The value to test
:type value: type
:param col_referer: The number of the column to compare to `value`
:type col_referer: int
:param col_return: he number of the column to return if matches
:type col_return: int
:param separator: The CSV separator
:type separator: str
:param quote: The CSV quote
:type quote: str
:param header: CSV file has header
:type header: bool
:returns: list
"""
res = []
with open(csv_file, u'r') as f:
filereader = unicode_csv.Reader(
f, delimiter=separator, quotechar=quote)
for line in filereader:
if header:
header = False
continue
if line[column] == value:
res.append(line)
return res
[docs]def search_csv(csv_file, value, col_referer=0, col_return=1,
separator=',', quote='"', header=True):
"""Search for an equivalence between value from column `col_referer`
and `value` value, and return value from column `col_return` index at
the same line as matching pair `col_referer` values.
:param csv_file: The name of the CSV file to search into
:type csv_file: str
:param value: The value to test
:type value: type
:param col_referer: The number of the column to compare to `value`
:type col_referer: int
:param col_return: he number of the column to return if matches
:type col_return: int
:param separator: The CSV separator
:type separator: str
:param quote: The CSV quote
:type quote: str
:param header: CSV file has header
:type header: bool
:returns: list
"""
res = []
filereader = unicode_csv.Reader(
csv_file, delimiter=separator, quotechar=quote)
for item in filereader:
if header:
header = False
continue
if item[col_referer] == value:
res.append(item[col_return])
return res
[docs]def purge_stats():
"""Re-initialize objects activities resume
"""
global STATS
STATS = {
u'objects': {}
}
[docs]def show_stats():
"""Show model's activities resume.
At the end of an import for example.
"""
sys.stdout.write(u'\n')
sys.stdout.write(u'*' * 79 + u'\n')
sys.stdout.write(u'* Objects activities :' + u' ' * 32 +
u'Written Created Skipped *\n')
sys.stdout.write(u'*' * 79 + u'\n')
for model_name in STATS[u'objects'].keys():
sys.stdout.write(u'%(name)-51s : %(written)7s %(created)7s %(skipped)7s' %
STATS[u'objects'][model_name])
sys.stdout.write(u'\n')
sys.stdout.write(u'\n')
[docs]def strip_accents(string, normalize=u'NFD', category=u'Mn'):
""" Replace accents from string with non accented char
"""
res = (c for c in unicodedata.normalize(normalize, string)
if unicodedata.category(c) != category)
return u''.join(res)
[docs]def clean_line(line, encode_from=u'utf-8'):
""" Encode a list in ``encode_from`` encoding
Strip accents, and replace carriages return by space, replace double quotes by simple ones
:param line: A list
"""
return [clean(col, encode_from) for col in line]
[docs]def clean_lines(lines, encode_from=u'utf-8'):
""" Encode a list of list in ``encode_from`` encoding
Strip accents, and replace carriages return by space, replace double quotes by simple ones
:param lines: A list of list
"""
return [clean_line(line, encode_from) for line in lines]
[docs]def clean(string, encode_from=u'utf-8'):
""" Clean a string by removing returns and double quotes.
Strip accents, and replace carriages return by space, replace double quotes by simple ones
"""
try:
string = unicode(string)
except:
pass
string.replace(u'\r\n', u' ')
string.replace(u'\r', u' ')
string.replace(u'\n', u' ')
string.replace(u'"', u'\'')
try:
string = strip_accents(string)
except IndexError:
pass
return string
[docs]def generate_code(filenames=(), header=True, mode=u'stdout', delimiter=None,
quotechar=None, encoding=u'utf-8'):
"""Return a template's skeleton code
prerequisites:
Having a columns header to the CSV file
:param filenames: Name tuple of filenames to generate the importation code from
:type filename: tuple
:param header: First line column's title or not
:type header: bool
:param stdout: Printing mode (``stdout``, ``file``)
:type stdout: str
"""
if mode != u'stdout':
if os.path.isfile(mode) \
and raw_input(u'`%s` already existed. Overwrites ?' % mode) in (u'y', u'Y', u'o', u'O'):
f = open(mode, u'wb')
else:
exit()
else:
f = sys.stdout
if not isinstance(filenames, tuple) or not isinstance(filenames, list):
if filenames.count(','):
filenames = filenames.split(',')
else:
filenames = (filenames,)
# HEADERS + SOME CALLBACKS
f.write("""#!/usr/bin/env python
# -*- coding: utf8 -*-
################################################################################
#
# Copyright (c) 2012 STEPHANE MANGIN. (http://le-spleen.net) All Rights Reserved
# Stephane MANGIN <stephane.mangin@freesbee.fr>
#
# WARNING: This program as such is intended to be used by professional
# programmers who take the whole responsability of assessing all potential
# consequences resulting from its eventual inadequacies and bugs
# End users who are looking for a ready-to-use solution with commercial
# garantees and support are strongly adviced to contract a Free Software
# Service Company
#
# This program is Free Software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 3
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
#
################################################################################
import csv2oerp
from csv2oerp import Field, Model, Session, Openerp
from csv2oerp import callbacks as cb
oerp = Openerp(
host='localhost',
port=8069,
dbname='database',
user='admin',
pwd='admin',
lang='fr_FR')
""")
for path in filenames:
# Get file name without extention, replace space by underscore
(dirName, fileName) = os.path.split(path)
(fileBaseName, fileExtension) = os.path.splitext(fileName)
fileBaseName = fileBaseName.replace(u' ', u'_').replace(u'.', u'_')
with open(path, u"rb") as csvfile:
if not delimiter or not quotechar:
dialect = csv.Sniffer().sniff(csvfile.read(1024))
csvfile.seek(0)
delimiter = dialect.delimiter
quotechar = dialect.quotechar
lines = list(unicode_csv.Reader(
path, delimiter=delimiter, dialect=dialect,
quotechar=quotechar, encoding=encoding))
#===================================================================
# Header
#===================================================================
f.write(u"#" + u"=" * 79 + "\n")
f.write(u"# %s\n" % fileBaseName)
f.write(u"#" + u"=" * 79 + u"\n")
#===================================================================
# Mapping
#===================================================================
i = 0
for item in lines[0]:
f.write(u"# %s: %s\n" % (i, item))
i += 1
#===================================================================
# Functionnal mapping
#===================================================================
f.write("""%s = Session(
u'Unique name for this session (Also used by logger)',
'%s',
delimiter='%s', quotechar='%s', encoding='%s')\n\n""" %
(fileBaseName, path, delimiter, quotechar, encoding))
f.write(u"model_name = Model('openerp.model.name', fields=[\n")
i = 0
for item in lines[0]:
f.write(u" Field(name='%s', columns=[%s], callbacks=[]), \n" %
(item, i))
i += 1
f.write(u"], update=True, create=True, search=[])\n")
f.write(u"\n")
f.write(u"# Bind and start importation\n")
for path in filenames:
#===================================================================
# Start lines
#===================================================================
(dirName, fileName) = os.path.split(path)
(fileBaseName, fileExtension) = os.path.splitext(fileName)
fileBaseName = fileBaseName.replace(u' ', u'_').replace(u'.', u'_')
f.write(u"%s.bind(oerp, [model_name])\n" % fileBaseName)
#=======================================================================
# Stats
#=======================================================================
f.write(u"\n")
f.write(u"# Show statistics\n")
f.write(u"csv2oerp.show_stats()\n")
if mode != u'stdout':
f.close()
return
if __name__ == '__main__':
try:
generate_code(sys.argv[1])
except IndexError:
print "You have to give a proper path to a csv file in first argument."