Source code for kwalitee.models

# -*- coding: utf-8 -*-
#
# This file is part of kwalitee
# Copyright (C) 2014, 2015 CERN.
#
# kwalitee is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License as
# published by the Free Software Foundation; either version 2 of the
# License, or (at your option) any later version.
#
# kwalitee is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with kwalitee; if not, write to the Free Software Foundation,
# Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307, USA.
#
# In applying this licence, CERN does not waive the privileges and immunities
# granted to it by virtue of its status as an Intergovernmental Organization
# or submit itself to any jurisdiction.

"""Database models to persist the data over time."""

from __future__ import unicode_literals

import os

from datetime import datetime
from flask import json
from flask_sqlalchemy import SQLAlchemy


# Storing states as integers so string can be changed/l10n later
STATE_PENDING = 0
STATE_SUCCESS = 1
STATE_ERROR = 2

STATES = {
    STATE_PENDING: "pending",
    STATE_SUCCESS: "success",
    STATE_ERROR: "error"
}


db = SQLAlchemy()


[docs]def init_app(app): """Initialize the Flask app with db.""" database = os.path.join(app.instance_path, app.config.get("DATABASE_NAME", "database")) database = "{0}.db".format(database) app.config.setdefault("DATABASE", database) app.config.setdefault("SQLALCHEMY_DATABASE_URI", os.environ.get("SQLALCHEMY_DATABASE_URI", "sqlite:///{0}".format(database))) db.app = app db.init_app(app) if not db.engine.has_table(Account.__tablename__): db.create_all() return db
[docs]class Account(db.Model): """Github account.""" id = db.Column(db.Integer(), primary_key=True, autoincrement=True) """Identifier""" name = db.Column(db.UnicodeText(), unique=True, nullable=False) """Username.""" email = db.Column(db.UnicodeText()) """Email.""" token = db.Column(db.UnicodeText()) """API Token.""" created_at = db.Column(db.DateTime(), nullable=False, default=datetime.now) """Date of creation.""" updated_at = db.Column(db.DateTime(), nullable=False, default=datetime.now, onupdate=datetime.now) """Date of last modification.""" def __init__(self, name, email=None, token=None): """Initialize the account.""" self.name = name self.email = email self.token = token def __repr__(self): """String representation of the account.""" return "<Account ({0.id}, {0.name}, {0.email})>".format(self) @classmethod
[docs] def create(cls, name, email=None, token=None): """Create and commit and new account.""" acc = cls(name, email, token) db.session.add(acc) db.session.commit() return acc
@classmethod
[docs] def find_or_create(cls, name, email=None, token=None): """Find or create an account.""" acc = cls.query.filter_by(name=name).first() if not acc: return cls.create(name, email, token) return acc
@classmethod
[docs] def update_or_create(cls, name, email=None, token=None): """Modify or create an account.""" acc = cls.query.filter_by(name=name).first() if not acc: return cls.create(name, email, token) else: if email and acc.email != email: acc.email = email if token and acc.token != token: acc.token = token if db.session.is_modified(acc): db.session.add(acc) db.session.commit() return acc
[docs]class Repository(db.Model): """Github repository.""" __table_args__ = db.UniqueConstraint('owner_id', 'name'), id = db.Column(db.Integer, primary_key=True, autoincrement=True) owner_id = db.Column(db.Integer(), db.ForeignKey("account.id"), nullable=False) name = db.Column(db.UnicodeText(), nullable=False) created_at = db.Column(db.DateTime(), nullable=False, default=datetime.now) updated_at = db.Column(db.DateTime(), nullable=False, default=datetime.now, onupdate=datetime.now) owner = db.relationship("Account", backref=db.backref("repositories", cascade="all", order_by=db.asc(name), lazy="dynamic")) @property def fullname(self): """Get the fullname of the repository.""" return "{0}/{1}".format(self.owner.name, self.name) def __init__(self, owner, name): """Initialize the repository.""" self.owner = owner self.name = name def __repr__(self): """String representation of the repository.""" return "<Repository ({0}, {1})>".format(self.id, self.fullname) @classmethod
[docs] def find_or_create(cls, owner, name): """Find or create a repository.""" repo = cls.query.filter_by(name=name, owner_id=owner.id).first() if not repo: repo = cls(owner, name) db.session.add(repo) db.session.commit() return repo
[docs]class CommitStatus(db.Model): """Status of a push.""" __table_args__ = db.UniqueConstraint('repository_id', 'sha'), id = db.Column(db.Integer, primary_key=True, autoincrement=True) repository_id = db.Column(db.Integer(), db.ForeignKey("repository.id"), nullable=False) sha = db.Column(db.UnicodeText(), nullable=False) url = db.Column(db.UnicodeText(), nullable=False) _state = db.Column(db.Integer(), nullable=False) _errors = db.Column(db.Integer(), nullable=False) _content = db.Column(db.UnicodeText()) created_at = db.Column(db.DateTime(), nullable=False, default=datetime.now) updated_at = db.Column(db.DateTime(), nullable=False, default=datetime.now, onupdate=datetime.now) repository = db.relationship("Repository", backref=db.backref("commit_statuses", cascade="all", order_by=db.desc(id), lazy="dynamic")) @property def errors(self): """Get the number of errors found.""" return self._errors @property def state(self): """Get the state.""" return STATES[self._state]
[docs] def get_content(self): """Get the content of the status.""" return json.loads(self._content)
[docs] def set_content(self, value): """Set the content of the status.""" self._errors = len(value["message"]) if value["files"] is not None: for ferrors in value["files"].values(): self._errors += len(ferrors["errors"] or []) if self._errors: self._state = STATE_ERROR else: self._state = STATE_SUCCESS self._content = json.dumps(value, ensure_ascii=False)
content = property(get_content, set_content) def __init__(self, repository, sha, url, content=None): """Initialize the commit status.""" self.repository = repository self.sha = sha self.url = url self._errors = 0 if content: self.content = content else: self.content = {"message": [], "files": None} self._state = STATE_PENDING def __repr__(self): """String representation of the commit status.""" return "<CommitStatus ({0.id}, {0.repository_id}, {0.sha}, " \ "{0.state})>" \ .format(self)
[docs] def is_pending(self): """Return True is the commit status hasn't been checked yet.""" return self._state == STATE_PENDING
@classmethod
[docs] def find_or_create(cls, repository, sha, url): """Find or create a commit status.""" cs = cls.query.filter_by(repository_id=repository.id, sha=sha).first() if not cs: cs = CommitStatus(repository, sha, url) db.session.add(cs) db.session.commit() return cs
[docs]class BranchStatus(db.Model): """Status of a pull request.""" __table_args__ = db.UniqueConstraint('commit_id', 'name'), id = db.Column(db.Integer(), primary_key=True, autoincrement=True) commit_id = db.Column(db.Integer(), db.ForeignKey("commit_status.id"), nullable=False) name = db.Column(db.UnicodeText(), nullable=False) url = db.Column(db.UnicodeText(), nullable=False) _state = db.Column(db.Integer(), nullable=False) _errors = db.Column(db.Integer(), nullable=False) _content = db.Column(db.UnicodeText()) created_at = db.Column(db.DateTime(), nullable=False, default=datetime.now) updated_at = db.Column(db.DateTime(), nullable=False, default=datetime.now, onupdate=datetime.now) commit = db.relationship("CommitStatus", backref=db.backref("branch_statuses", cascade="all", order_by=db.desc(id), lazy="dynamic")) @property def errors(self): """Get the number of errors found.""" return self._errors @property def state(self): """Get the state.""" return STATES[self._state]
[docs] def get_content(self): """Get the content of the status.""" return json.loads(self._content)
[docs] def set_content(self, value): """Set the content of the status.""" c = {"commits": [], "files": value.get("files", {})} if "files" in value and value["files"]: for ferrors in value["files"].values(): self._errors += len(ferrors["errors"] or []) for commit in value["commits"]: if not isinstance(commit, CommitStatus): # FIXME potentially unnecessary heavy operation commit = CommitStatus.query.filter_by( repository_id=self.commit.repository_id, sha=commit ).first() self._errors += len(commit.content["message"]) c["commits"].append(commit.sha) if "files" not in value or value["files"] is None: self._state = STATE_PENDING elif self._errors: self._state = STATE_ERROR else: self._state = STATE_SUCCESS self._content = json.dumps(c, ensure_ascii=False)
content = property(get_content, set_content) def __init__(self, commit, name, url, content=None): """Initialize a branch status.""" self.commit = commit self.name = name self.url = url self._errors = 0 if content: self.content = content else: self.content = {"commits": [], "files": {}} self._state = STATE_PENDING def __repr__(self): """String representation of a branch status.""" return "<BranchStatus ({0.id}, {0.commit_id}, {0.name}, {0.state})>" \ .format(self)
[docs] def is_pending(self): """Return True is the commit status hasn't been checked yet.""" return self._state == STATE_PENDING
@classmethod
[docs] def find_or_create(cls, commit, name, url, content=None): """Find or create a commit status.""" bs = cls.query.filter_by(commit_id=commit.id, name=name).first() if not bs: bs = BranchStatus(commit, name, url, content) db.session.add(bs) db.session.commit() return bs