Source code for gramps.gen.plug.utils

#
# Gramps - a GTK+/GNOME based genealogy program
#
# Copyright (C) 2009 B. Malengier
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
#

"""
General utility functions useful for the generic plugin system
"""

#-------------------------------------------------------------------------
#
# Standard Python modules
#
#-------------------------------------------------------------------------
import sys
import os
import datetime
if sys.version_info[0] < 3:
    from StringIO import StringIO
else:
    from io import StringIO, BytesIO

#-------------------------------------------------------------------------
#
# set up logging
#
#-------------------------------------------------------------------------
import logging
LOG = logging.getLogger(".gen.plug")

#-------------------------------------------------------------------------
#
# Gramps modules
#
#-------------------------------------------------------------------------
from ._pluginreg import make_environment
from ..const import USER_PLUGINS
from ...version import VERSION_TUPLE
from . import BasePluginManager
from ..utils.configmanager import safe_eval
from ..config import config
from ..const import GRAMPS_LOCALE as glocale
_ = glocale.translation.sgettext
from ..constfunc import conv_to_unicode

#-------------------------------------------------------------------------
#
# Local utility functions for gen.plug
#
#-------------------------------------------------------------------------
[docs]def version_str_to_tup(sversion, positions): """ Given a string version and positions count, returns a tuple of integers. >>> version_str_to_tup("1.02.9", 2) (1, 2) """ try: tup = tuple(map(int, sversion.split("."))) tup += (0,) * (positions - len(tup)) except: tup = (0,) * positions return tup[:positions]
[docs]class newplugin(object): """ Fake newplugin. """ def __init__(self): globals()["register_results"].append({}) def __setattr__(self, attr, value): globals()["register_results"][-1][attr] = value
[docs]def register(ptype, **kwargs): """ Fake registration. Side-effect sets register_results to kwargs. """ retval = {"ptype": ptype} retval.update(kwargs) # Get the results back to calling function if "register_results" in globals(): globals()["register_results"].append(retval) else: globals()["register_results"] = [retval]
[docs]class Zipfile(object): """ Class to duplicate the methods of tarfile.TarFile, for Python 2.5. """ def __init__(self, buffer): import zipfile self.buffer = buffer self.zip_obj = zipfile.ZipFile(buffer)
[docs] def extractall(self, path, members=None): """ Extract all of the files in the zip into path. """ names = self.zip_obj.namelist() for name in self.get_paths(names): fullname = os.path.join(path, name) if not os.path.exists(fullname): os.mkdir(fullname) for name in self.get_files(names): fullname = os.path.join(path, name) outfile = file(fullname, 'wb') outfile.write(self.zip_obj.read(name)) outfile.close()
[docs] def extractfile(self, name): """ Extract a name from the zip file. >>> Zipfile(buffer).extractfile("Dir/dile.py").read() <Contents> """ class ExtractFile(object): def __init__(self, zip_obj, name): self.zip_obj = zip_obj self.name = name def read(self): data = self.zip_obj.read(self.name) del self.zip_obj return data return ExtractFile(self.zip_obj, name)
[docs] def close(self): """ Close the zip object. """ self.zip_obj.close()
[docs] def getnames(self): """ Get the files and directories of the zipfile. """ return self.zip_obj.namelist()
[docs] def get_paths(self, items): """ Get the directories from the items. """ return (name for name in items if self.is_path(name) and not self.is_file(name))
[docs] def get_files(self, items): """ Get the files from the items. """ return (name for name in items if self.is_file(name))
[docs] def is_path(self, name): """ Is the name a path? """ return os.path.split(name)[0]
[docs] def is_file(self, name): """ Is the name a directory? """ return os.path.split(name)[1]
[docs]def available_updates(): whattypes = config.get('behavior.check-for-update-types') if sys.version_info[0] < 3: from urllib2 import urlopen else: from urllib.request import urlopen LOG.debug("Checking for updated addons...") langs = glocale.get_language_list() langs.append("en") # now we have a list of languages to try: fp = None for lang in langs: URL = ("%s/listings/addons-%s.txt" % (config.get("behavior.addons-url"), lang)) LOG.debug(" trying: %s" % URL) try: fp = urlopen(URL, timeout=10) # seconds except: try: URL = ("%s/listings/addons-%s.txt" % (config.get("behavior.addons-url"), lang[:2])) fp = urlopen(URL, timeout=10) except Exception as err: # some error LOG.warning("Failed to open addon metadata for {lang} {url}: {err}". format(lang=lang, url=URL, err=err)) fp = None if fp and fp.getcode() == 200: # ok break pmgr = BasePluginManager.get_instance() addon_update_list = [] if fp and fp.getcode() == 200: lines = list(fp.readlines()) count = 0 for line in lines: line = line.decode('utf-8') try: plugin_dict = safe_eval(line) if type(plugin_dict) != type({}): raise TypeError("Line with addon metadata is not " "a dictionary") except: LOG.warning("Skipped a line in the addon listing: " + str(line)) continue id = plugin_dict["i"] plugin = pmgr.get_plugin(id) if plugin: LOG.debug("Comparing %s > %s" % (version_str_to_tup(plugin_dict["v"], 3), version_str_to_tup(plugin.version, 3))) if (version_str_to_tup(plugin_dict["v"], 3) > version_str_to_tup(plugin.version, 3)): LOG.debug(" Downloading '%s'..." % plugin_dict["z"]) if "update" in whattypes: if (not config.get('behavior.do-not-show-previously-seen-updates') or plugin_dict["i"] not in config.get('behavior.previously-seen-updates')): addon_update_list.append((_("Updated"), "%s/download/%s" % (config.get("behavior.addons-url"), plugin_dict["z"]), plugin_dict)) else: LOG.debug(" '%s' is ok" % plugin_dict["n"]) else: LOG.debug(" '%s' is not installed" % plugin_dict["n"]) if "new" in whattypes: if (not config.get('behavior.do-not-show-previously-seen-updates') or plugin_dict["i"] not in config.get('behavior.previously-seen-updates')): addon_update_list.append((_("updates|New"), "%s/download/%s" % (config.get("behavior.addons-url"), plugin_dict["z"]), plugin_dict)) config.set("behavior.last-check-for-updates", datetime.date.today().strftime("%Y/%m/%d")) count += 1 if fp: fp.close() else: LOG.debug("Checking Addons Failed") LOG.debug("Done checking!") return addon_update_list
[docs]def load_addon_file(path, callback=None): """ Load an addon from a particular path (from URL or file system). """ if sys.version_info[0] < 3: from urllib2 import urlopen else: from urllib.request import urlopen import tarfile import io if (path.startswith("http://") or path.startswith("https://") or path.startswith("ftp://")): try: fp = urlopen(path) except: if callback: callback(_("Unable to open '%s'") % path) return else: try: fp = open(path) except: if callback: callback(_("Unable to open '%s'") % path) return try: content = fp.read() if sys.version_info[0] < 3: buffer = StringIO(content) else: buffer = BytesIO(content) except: if callback: callback(_("Error in reading '%s'") % path) return fp.close() # file_obj is either Zipfile or TarFile if path.endswith(".zip") or path.endswith(".ZIP"): file_obj = Zipfile(buffer) elif path.endswith(".tar.gz") or path.endswith(".tgz"): try: file_obj = tarfile.open(None, fileobj=buffer) except: if callback: callback(_("Error: cannot open '%s'") % path) return else: if callback: callback(_("Error: unknown file type: '%s'") % path) return # First, see what versions we have/are getting: good_gpr = set() for gpr_file in [name for name in file_obj.getnames() if name.endswith(".gpr.py")]: if callback: callback((_("Examining '%s'...") % gpr_file) + "\n") contents = file_obj.extractfile(gpr_file).read() # Put a fake register and _ function in environment: env = make_environment(register=register, newplugin=newplugin, _=lambda text: text) # clear out the result variable: globals()["register_results"] = [] # evaluate the contents: try: exec(contents, env) except: if callback: msg = _("Error in '%s' file: cannot load.") % gpr_file callback(" " + msg + "\n") continue # There can be multiple addons per gpr file: for results in globals()["register_results"]: gramps_target_version = results.get("gramps_target_version", None) id = results.get("id", None) if gramps_target_version: vtup = version_str_to_tup(gramps_target_version, 2) # Is it for the right version of gramps? if vtup == VERSION_TUPLE[0:2]: # If this version is not installed, or > installed, install it good_gpr.add(gpr_file) if callback: callback(" " + (_("'%s' is for this version of Gramps.") % id) + "\n") else: # If the plugin is for another version; inform and do nothing if callback: callback(" " + (_("'%s' is NOT for this version of Gramps.") % id) + "\n") callback(" " + (_("It is for version %(v1)d.%(v2)d") % { 'v1': vtup[0], 'v2': vtup[1]} + "\n")) continue else: # another register function doesn't have gramps_target_version if gpr_file in good_gpr: s.remove(gpr_file) if callback: callback(" " + (_("Error: missing gramps_target_version in '%s'...") % gpr_file) + "\n") if len(good_gpr) > 0: # Now, install the ok ones file_obj.extractall(USER_PLUGINS) if callback: callback((_("Installing '%s'...") % path) + "\n") gpr_files = set([os.path.split(os.path.join(USER_PLUGINS, name))[0] for name in good_gpr]) for gpr_file in gpr_files: u_gpr_file = conv_to_unicode(gpr_file) if callback: callback(" " + (_("Registered '%s'") % u_gpr_file) + "\n") file_obj.close() #------------------------------------------------------------------------- # # OpenFileOrStdout class # #-------------------------------------------------------------------------
[docs]class OpenFileOrStdout: """Context manager to open a file or stdout for writing.""" def __init__(self, filename): self.filename = filename self.filehandle = None def __enter__(self): if self.filename == '-': self.filehandle = sys.stdout else: self.filehandle = open(self.filename, 'w') return self.filehandle def __exit__(self, exc_type, exc_value, traceback): if self.filehandle and self.filename != '-': self.filehandle.close() return False #------------------------------------------------------------------------- # # OpenFileOrStdin class # #-------------------------------------------------------------------------
[docs]class OpenFileOrStdin: """Context manager to open a file or stdin for reading.""" def __init__(self, filename, add_mode=''): self.filename = filename self.mode = 'r%s' % add_mode self.filehandle = None def __enter__(self): if self.filename == '-': self.filehandle = sys.stdin else: self.filehandle = open(self.filename, self.mode) return self.filehandle def __exit__(self, exc_type, exc_value, traceback): if self.filename != '-': self.filehandle.close() return False