Source code for pyinference.inference.factor

# coding=utf-8

from numpy import repeat, ones, array
from pyinference.inference.variable import Variable

__author__ = 'sejros'


def _itershape(tup):
    """ Итерирование по кортежу.
    """
    r = array(tup).prod()
    res = []
    for i in range(r):
        row = []
        index = i + 0
        for j in range(len(tup)):
            row.insert(0, index % tup[-j - 1])
            index = (index - (index % tup[-j - 1])) / tup[-j - 1]
        res.append(row)
    return res


[docs]class Factor(object): """ Фактор логического вывода. Фактор связывает несколько переменных в совместное распределение вероятности. Фактор это некая функция, которая ставит в соответствие каждому возможному набору значений некого множества переменных действительное число (значение фактора). Набор соответствующих значений A некоторого множества переменных V называется назначением (assignment), если: - мощность множества A равна мощности множества V; - каждый элемент множества A является элементом множества значений соответствующего элемента множества V Пример: допустим, M – бинарная переменная M=(“монета”, {“орел”, “решка”}), а V – набор переменных V={M}3={M, M, M}, тогда А={“орел”, “орел”, “решка”} – корректное назначение, а А1={“решка”, “решка”} – некорректное назначение, так как не указано значение третьей переменной. А2={“решка”, “решка”, “ребро”} – также не является корректным назначением, так как третий элемент («ребро») отсутствует в области определения третьей переменной M. Однако, назначение А2 является корректным для множества V’={M, M, M’}, где M’=(“другая монета”, {“орел”, “решка”, “ребро”}), так как теперь третья переменная во множестве V’ уже другая и она содержит значение «ребро» в своей области определения. По тем же причинам, назначение А2 является корректным для набора V’’={M’, M’, M’}. Таким образом, фактор определяется упорядоченной тройкой F=(N, V, X), где N – имя фактора, V – область определения фактора (набор переменных, на которых определен фактор), X – множество значений фактора соответствующей мощности. Фактор определен на некотором упорядоченном множестве переменных, называемом областью определения (scope) фактора. Область определения обозначается так: F(A,B,C) или так: scope(F)={A,B,C}, где F – фактор, A, B и C – переменные, входящие в фактор. Заметим, что определение фактора не накладывает никаких ограничений ни на отдельные значения, ни на сумму этих значений, несмотря на то, что теория вероятности предусматривает такие ограничения. Фактор – это базовый строительный блок в вероятностных графических моделях. и представление вероятностей – всего лишь одно из возможных применений факторов. Например, определим фактор на одной переменной M, следующим образом:: coin_var = Variable(name="coin", terms=["орел", "решка"]) coin_factor = Factor(name="coin", cons=[coin_var]) import numpy as np coin_factor.cpd = np.array([0.5, 0.5]) .. note:: Заметим, что по умолчанию, распределение фактора (атрибут `cpd`) принимает значения, соответствующие равномерному распределению. Так что последние две строчки в данном примере можно было бы опустить. Факторы удобны для использования в вероятностных графических моделях тем, что на них определены некоторые операции, часто выполняемые в процессе логического вывода, в довольно общей форме, что позволяет использовать их как элементарные объекты для построения логических сетей. Также, факторы могут представлять условные вероятности. Рассмотрим пример из медицинской диагностики. Определим бинарную переменную с=(«болезнь», {«нет», «есть»}) и бинарную переменную t=(«тест», {«положительный», «отрицательный»}). Переменная с показывает, есть ли у пациента определенное заболевание. а переменная t – результат диагностического теста. Допустим, что априорная вероятность данного заболевания равна 1%. Сконструируем фактор С=(«P(c)», {c}, {0.99, 0.01}). Очевидно, что вероятность получения определенного результата теста зависит от того, есть ли данное заболевание у пациента, или нет, то есть мы предполагаем известной вероятность P(t|c). Допустим, что вероятность получения положительного результата теста равна 90%. Соответственно, вероятность ложноотрицательного результата равна 0,1. Вероятность получить отрицательный результат при отсутствии заболевания равна 0,8, а вероятность ложноположительного результата – 0,2. Создадим фактор, представляющий данную условную вероятность:: c = Variable(name='C', terms=['no', 'yes']) t = Variable(name='T', terms=['pos', 'neg']) C = Factor(name='C', cons=[c]) C.cpd = np.array([0.99, 0.01]) T = Factor(name='T|C', cons=[t], cond=[c]) T.cpd = np.array([[0.2, 0.8], [0.9, 0.1]]) Синтаксис: Создадим два фактора, представляющих распределение двух переменных: P(A) и P(B|A). Первая переменная имеет безусловное распределение, значение второй зависит от значения первой. Подготовка переменных (могут быть как дискретные, так и со связанным классификатором): >>> from pyinference.fuzzy import set as fuzzy_set >>> a = Variable(name='A', terms=['low', 'high']) >>> fs = fuzzy_set.Partition(peaks=[0.0, 0.5, 1.0]) >>> b = Variable(name='B', terms=fs) Создание факторов: >>> A = Factor(name='A', cons=[a]) >>> B = Factor(name='B|A', cons=[b], cond=[a]) Поля класса: name (`str`): имя фактора cond (`list`): список условных переменных cons (`list`): список подусловных переменных vars (`list`): список всех переменных фактора (объединение предыдущих двух) shape (`tuple`): кортеж мощностей всех переменных фактора (сохраняя порядок атрибута `vars`). Соответствует форме массива `cpd`. cpd (:class:`numpy.array`): массив, хранящий распределение условной вероятности фактора. Именованные параметры: name (`str`): имя фактора cons (`list`): массив подусловных переменных cond (`list`): массив условных переменных Исключения: AttributeError: ошибка возникает, если массив подусловных переменных (cons) пуст """ def __init__(self, name='', cond=None, cons=None): if len(cons) == 0: raise AttributeError self.name = name self.cond = sorted(cond) if cond else [] self.cons = sorted(cons) self.vars = self.cond + self.cons self.shape = tuple([var.card for var in self.vars]) self.cpd = ones(self.shape) self._normalize() def _normalize(self): n, m = len(self.cons), len(self.cond) s = self.cpd.sum(axis=tuple(range(m, n + m))).flatten() koef = array(self.shape)[m:].prod() if isinstance(s, float): s = array([s]) s = repeat(s, koef) s[s == 0.0] = 1.0 self.cpd = self.cpd.flatten() / s self.cpd = self.cpd.reshape(self.shape) def _map(self, other): res = [] for i in xrange(len(other.vars)): for j in xrange(len(self.vars)): if self.vars[j].name == other.vars[i].name: res.append(j) break return res
[docs] def marginal(self, var): """ Выполняет маргинализацию переменной из фактора. Маргинализация позволяет исключить переменную из области определения фактора, просуммировав соответствующие значения назначений маргинализируемого фактора. Маргинализация является основой алгоритма variable elimination: - F(A,B,C) - B = F(A,C) - F(A,C|B) - B = F(A,C) - F(A,B|C) - B = F(A|C) Может вызываться как метод (``m = f1.marginal(var)``) или как оператор "-" (``m = f1 - var``). Синтаксис: >>> import numpy as np >>> c = Variable(name='C', terms=['no', 'yes']) >>> t = Variable(name='T', terms=['pos', 'neg']) >>> C = Factor(name='C', cons=[c]) >>> C.cpd = np.array([0.99, 0.01]) >>> T = Factor(name='T|C', cons=[t], cond=[c]) >>> T.cpd = np.array([[0.2, 0.8], [0.9, 0.1]]) >>> m = T - c >>> m.cpd array([ 1.1, 0.9]) >>> len(m.vars) 1 Параметры: var (:class:`Variable` or `list`): маргинализуемая (исключаемая) переменная. Также в данный метод может передаваться список исключаемых переменных. Возвращает: Маргинализированный фактор Исключения: `TypeError`: ошибка возникает, когда второй операнд имеет неподдерживаемый тип. """ ind = -1 for i in xrange(len(self.vars)): if var.name == self.vars[i].name: ind = i if ind == -1: raise AttributeError cond1 = set(self.cond) cons1 = set(self.cons) cons2 = {var} cond = cond1 - cons2 cons = cons1 - cons2 res = Factor(name="Marginal", cons=sorted(list(cons)), cond=sorted(list(cond))) res.cpd = self.cpd.sum(axis=ind) return res
[docs] def product(self, other): """ Реализует произведение факторов. Произведение факторов объединяет области определения двух факторов. Значением для каждого назначения является произведение соответствующих назначений множителей: - F(A) * F(B) = F(A,B) - F(A,|B) * F(B) = F(A,B) - F(A,B|C) * F(D|C) = F(A,B,D|C) - F(A,B|C) * F(D,C) = F(A,B,C,D) - None * F(A) = F(A) Может вызываться как метод (``p = f1.product(f2)``) или как оператор "*" (``p = f1 * f2``). .. note:: Также первым операндом произведения может выступать None (``None * f1``). Тогда метод вернет второй операнд. Это сделано для удобства множественного произведения. Синтаксис: >>> import numpy as np >>> c = Variable(name='C', terms=['no', 'yes']) >>> t = Variable(name='T', terms=['pos', 'neg']) >>> C = Factor(name='C', cons=[c]) >>> C.cpd = np.array([0.99, 0.01]) >>> T = Factor(name='T|C', cons=[t], cond=[c]) >>> T.cpd = np.array([[0.2, 0.8], [0.9, 0.1]]) >>> p = T * C >>> "%0.3f" % p.cpd[0,0] '0.198' >>> len(p.vars) 2 Параметры: other (:class:`Factor`): фактор-множитель. Возвращает: Фактор-произведение двух исходных Исключения: `AttributeError`: ошибка возникает, когда исключаемая переменная не входит в исходный фактор. `TypeError`: ошибка возникает, когда второй операнд имеет неподдерживаемый тип. """ cond1 = set(self.cond) cons1 = set(self.cons) cond2 = set(other.cond) cons2 = set(other.cons) everything = cons1 | cons2 | cond1 | cond2 cond = cond1 & cond2 cons = everything - cond res = Factor(name="Product", cons=list(cons), cond=list(cond)) flat = res.cpd.flatten() ass = _itershape(res.shape) map1 = res._map(self) map2 = res._map(other) for i in range(len(flat)): ass1, ass2 = [], [] for j in map1: ass1.append(ass[i][j]) for j in map2: ass2.append(ass[i][j]) cpd1 = self.cpd[tuple(ass1)] cpd2 = other.cpd[tuple(ass2)] flat[i] = cpd1 * cpd2 res.cpd = flat.reshape(res.shape) return res
[docs] def divide(self, other): """ Реализует деление факторов. Деление факторов позволяет получить условное распределение из безусловного: F(A,B) / F(B) = F(A|B) Может вызываться как метод (``p = f1.divide(f2)``) или как оператор "/" (``p = f1 / f2``). Синтаксис: >>> import numpy as np >>> c = Variable(name='C', terms=['no', 'yes']) >>> t = Variable(name='T', terms=['pos', 'neg']) >>> C = Factor(name='C', cons=[c]) >>> C.cpd = np.array([0.99, 0.01]) >>> T = Factor(name='T|C', cons=[t], cond=[c]) >>> T.cpd = np.array([[0.2, 0.8], [0.9, 0.1]]) >>> p = (C * T) / C >>> "%0.3f" % p.cpd[0,0] '0.200' >>> len(p.vars) 2 Параметры: other (:class:`Factor`): фактор-делитель. Возвращает: Фактор-частное двух исходных Исключения: NotImplementedError: ошибка возникает, когда: - делитель имеет условные переменные; - некоторые подусловные переменные делителя отсутствуют среди подусловных переменных делимого TypeError: ошибка возникает, когда второй операнд имеет неподдерживаемый тип. """ cond1 = set(self.cond) cons1 = set(self.cons) cond2 = set(other.cond) cons2 = set(other.cons) if len(cond2) > 0: raise NotImplementedError if len(cons2 & cond1) > 0: raise NotImplementedError if len(cons2 & cons1) < len(cons2): raise NotImplementedError cond = cond1 | cons2 cons = cons1 - cons2 res = Factor(name="Conditional", cons=sorted(list(cons)), cond=sorted(list(cond))) flat = res.cpd.flatten() ass = _itershape(res.shape) map1 = res._map(self) map2 = res._map(other) for i in range(len(flat)): ass1, ass2 = [], [] for j in map1: ass1.append(ass[i][j]) for j in map2: ass2.append(ass[i][j]) cpd1 = self.cpd[tuple(ass1)] cpd2 = other.cpd[tuple(ass2)] flat[i] = cpd1 / cpd2 res.cpd = flat.reshape(res.shape) res._normalize() return res
def __mul__(self, other): if other is None: return self elif not isinstance(other, Factor): raise TypeError return self.product(other) def __rmul__(self, other): if other is None: return self elif isinstance(other, Factor): return self.product(other) else: raise NotImplementedError def __sub__(self, other): if isinstance(other, Variable): return self.marginal(other) elif isinstance(other, list): res = self for var in other: res = res - var return res else: raise TypeError def __div__(self, other): if not isinstance(other, Factor): raise TypeError return self.divide(other) def __repr__(self): """ Подробное текстовое представление фактора. Возвращает: строку, содержащую слудующую информацию о факторе: - имя фактора; - имена подусловных и условных переменных, разделенных символом "|" (порядок внутри групп может не соблюдаться); - список всех переменных фактора в соответствующем порядке; - распределение условной вероятности по векторам значений переменных в соответственном порядке; - сумму вектора распределения (должна быть равна 1.0 для безусловных распределений). """ res = '' res += self.name + ':\n' flat = self.cpd.flatten() ass = _itershape(self.shape) res += str(self.cons) + '|' + str(self.cond) + '\n' res += str(self.vars) + '\n' res += str(self.shape) + ', ' + str(self.cpd.shape) + '\n' for i in range(len(ass)): res += str(ass[i]) + ' ' + str(flat[i]) + '\n' res += 'Sum: ' + str(self.cpd.sum()) + '\n' return res