Source code for pandas_plink._read

from __future__ import division, unicode_literals

import sys
from collections import OrderedDict as odict

import pandas as pd

from ._bed_read import read_bed

PY3 = sys.version_info >= (3, )

if PY3:
    _ord = lambda x: x
else:
    _ord = ord






def _read_bim(fn):
    header = odict([('chrom', bytes), ('snp', bytes), ('cm', float),
                    ('pos', int), ('a0', bytes), ('a1', bytes)])
    df = pd.read_csv(
        fn,
        delim_whitespace=True,
        header=None,
        names=header.keys(),
        dtype=header,
        compression=None,
        engine='c')

    df['chrom'] = df['chrom'].astype('category')
    df['a0'] = df['a0'].astype('category')
    df['a1'] = df['a1'].astype('category')
    df['i'] = range(df.shape[0])
    df.set_index(['chrom', 'pos'], inplace=True)
    df.sort_index(inplace=True)
    return df


def _read_fam(fn):
    header = odict([('fid', str), ('iid', str), ('father', str),
                    ('mother', str), ('gender', bytes), ('trait', str)])

    df = pd.read_csv(
        fn,
        delim_whitespace=True,
        header=None,
        names=header.keys(),
        dtype=header,
        compression=None,
        index_col=['fid', 'iid'],
        engine='c')

    df['gender'] = df['gender'].astype('category')
    df['i'] = range(df.shape[0])
    df.sort_index(inplace=True)
    return df


def _read_bed(fn, nsamples, nmarkers, verbose):
    fn = _ascii_airlock(fn)

    _check_bed_header(fn)
    major = _major_order(fn)

    ncols = nmarkers if major == 'individual' else nsamples
    nrows = nmarkers if major == 'snp' else nsamples

    return read_bed(fn, nrows, ncols, verbose)


def _check_bed_header(fn):
    with open(fn, "rb") as f:
        arr = f.read(2)
        ok = _ord(arr[0]) == 108 and _ord(arr[1]) == 27
        if not ok:
            raise ValueError("Invalid BED file: %s." % fn)


def _major_order(fn):
    with open(fn, "rb") as f:
        f.seek(2)
        arr = f.read(1)
        if _ord(arr[0]) == 1:
            return 'snp'
        elif _ord(arr[0]) == 0:
            return 'individual'
        raise ValueError("Couldn't understand matrix layout.")


def _ascii_airlock(v):
    if not isinstance(v, bytes):
        v = v.encode()
    return v