Source code for savReaderWriter.header
#!/usr/bin/env python
# -*- coding: utf-8 -*-
from ctypes import *
import struct
import sys
import os
import re
import time
import getpass
import functools
#import gc
from savReaderWriter import *
from generic import *
__version__ = version
[docs]class Header(Generic):
"""
This class contains methods responsible for getting and setting meta data
that is embedded in the IBM SPSS Statistics data file. In SPSS speak, this
header information is known as the SPSS Data Dictionary (which has diddly
squat to do with a Python dictionary!). NOTE: this class should not be
called directly. Use `SavHeaderReader` to retrieve metadata.
"""
def __init__(self, savFileName, mode, refSavFileName, ioUtf8=False, ioLocale=None):
    """Open the file through the superclass and cache the variable info.

    `savFileName`, `ioUtf8` and `ioLocale` are forwarded to the superclass
    constructor; `savFileName`, `mode` and `refSavFileName` are forwarded to
    the superclass `openSavFile`, whose result is stored as `self.fh`.
    Afterwards `self.varNames`, `self.varTypes` and `self.vNames` (a mapping
    of each variable name to its `self.encode`-d form) are filled in."""
    parent = super(Header, self)
    parent.__init__(savFileName, ioUtf8, ioLocale)
    self.fh = parent.openSavFile(savFileName, mode, refSavFileName)
    names, types = self.varNamesTypes
    self.varNames = names
    self.varTypes = types
    encodedNames = self.encode(self.varNames)
    self.vNames = dict(zip(self.varNames, encodedNames))
def openSavFile(self):
    """Return the file handle that the constructor stored in `self.fh`.

    Nothing is opened here; the handle comes from the superclass
    `openSavFile` call made in `__init__`."""
    handle = self.fh
    return handle
def decode(func):
    """Decorator that utf-8 decodes the bytestrings in a getter's result.

    The wrapped function is called with one argument (the instance). Its
    result is returned untouched when `ioUtf8` of that instance is falsy or
    equal to 2. Otherwise a bytestring result is decoded directly, and a
    dictionary result is rebuilt with decoded keys and values: values that
    are dictionaries are decoded one level deep (keys, bytestring values and
    the bytestrings inside list values), other values are decoded when they
    are bytestrings or lists of bytestrings."""
    binaryType = bytes if sys.version_info[0] > 2 else str

    def toText(obj):
        """Decode one bytestring; anything else passes through"""
        if isinstance(obj, binaryType):
            return obj.decode("utf-8")
        return obj

    def listToText(obj):
        """Decode the members of a list; non-lists pass through"""
        if isinstance(obj, list):
            return [toText(member) for member in obj]
        return obj

    @functools.wraps(func)
    def wrapper(arg):
        result = func(arg)
        mustDecode = arg.ioUtf8 and arg.ioUtf8 != 2
        if not mustDecode:
            return result  # unchanged
        if isinstance(result, binaryType):
            return toText(result)
        decoded = {}
        for key, value in result.items():
            newKey = toText(key)
            decoded[newKey] = {}
            try:
                for innerKey, innerValue in value.items():
                    decoded[newKey][toText(innerKey)] = toText(listToText(innerValue))
            except AttributeError:
                # not a dictionary: plain value or list
                decoded[newKey] = listToText(toText(value))
        return decoded
    return wrapper
def encode(self, item):
    """Utf-8 encode the text in `item`; the opposite of the `decode` helper
    (but a regular method, not a decorator).

    `item` is returned as-is when `self.ioUtf8` is falsy. Otherwise text
    strings are encoded: every member of a list, every key and value of a
    dictionary (one level deep), or `item` itself. Objects that are not text
    strings pass through."""
    if not self.ioUtf8:
        return item  # unchanged
    textType = str if isPy3k else unicode

    def toBytes(obj):
        if isinstance(obj, textType):
            return obj.encode("utf-8")
        return obj

    if isinstance(item, list):
        return [toBytes(member) for member in item]
    if isinstance(item, dict):
        return {toBytes(key): toBytes(value) for key, value in item.items()}
    return toBytes(item)
def freeMemory(self, funcName, *args):
    """Clean up: call the I/O library function named `funcName` with `args`
    to release memory claimed by e.g. the value labels and variable names
    getters.

    Nothing is called while the module-level flag `segfaults` is true. A
    nonzero return code is passed on to `checkErrsWarns`."""
    if not segfaults:
        release = getattr(self.spssio, funcName)
        retcode = release(*args)
        if retcode:
            checkErrsWarns("Problem freeing memory using %s" % funcName, retcode)
@property
def numberofCases(self):
    """Number of cases (rows) in the data file.

    `spssGetNumberofCases` is asked first; when it reports -1,
    `spssGetEstimatedNofCases` is used instead. According to the original
    notes this happens for very old files (< SPSS v6.0) that do not store
    the number of cases, so the result is then a guesstimate
    (cf. `SHOW N`). A nonzero return code of the last call made is passed
    to `checkErrsWarns`.

    See also
    --------
    savReaderWriter.SavReader.__len__ : use `len(reader)` to get the
        number of cases
    savReaderWriter.SavReader.shape : use `reader.shape` to get a
        (nrows, ncols) namedtuple"""
    caseCount = c_long()
    getExact = self.spssio.spssGetNumberofCases
    getExact.argtypes = [c_int, POINTER(c_long)]
    retcode = getExact(self.fh, caseCount)
    if caseCount.value == -1:
        getEstimate = self.spssio.spssGetEstimatedNofCases
        getEstimate.argtypes = [c_int, POINTER(c_long)]
        retcode = getEstimate(self.fh, caseCount)
    if retcode:
        checkErrsWarns("Problem getting number of cases", retcode)
    return caseCount.value
@property
def numberofVariables(self):
    """Number of variables (columns) in the spss dataset, as reported by
    `spssGetNumberofVariables`. A nonzero return code is passed to
    `checkErrsWarns`.

    See also
    --------
    savReaderWriter.SavReader.shape : use `reader.shape` to get a
        (nrows, ncols) namedtuple"""
    varCount = c_int()
    getCount = self.spssio.spssGetNumberofVariables
    getCount.argtypes = [c_int, POINTER(c_int)]
    retcode = getCount(self.fh, varCount)
    if retcode:
        checkErrsWarns("Problem getting number of variables", retcode)
    return varCount.value
@property
def varNamesTypes(self):
    """Get/Set a tuple of variable names and types

    * Variable names is a list of the form `[b'var1', b'var2', b'etc']`
    * Variable types is a dictionary of the form `{varName: varType}`

    The variable type code is an integer in the range 0-32767, 0
    indicating a numeric variable (e.g., `F8.2`) and a positive value
    indicating a string variable of that size (in bytes).

    When the instance already has a `varNames` attribute, the stored
    `varNames` and `varTypes` are returned without calling the I/O library.
    The names are utf-8 decoded when `ioUtf8` is truthy and not 2."""
    if hasattr(self, "varNames"):
        return self.varNames, self.varTypes

    # pointers to arrays the library fills in
    nVars = self.numberofVariables
    nVarsRef = c_int(nVars)
    NameArray = c_char_p * nVars
    TypeArray = c_int * nVars
    namesPtr = POINTER(NameArray)()
    typesPtr = POINTER(TypeArray)()

    getNames = self.spssio.spssGetVarNames
    getNames.argtypes = [c_int, POINTER(c_int),
                         POINTER(POINTER(NameArray)),
                         POINTER(POINTER(TypeArray))]
    retcode = getNames(self.fh, nVarsRef, namesPtr, typesPtr)
    if retcode:
        checkErrsWarns("Problem getting variable names & types", retcode)

    # copy the array contents into Python objects before freeing
    names, types = [], []
    for idx in xrange(nVars):
        names.append(namesPtr[0][idx])
        types.append(typesPtr[0][idx])
    if self.ioUtf8 and not self.ioUtf8 == 2:
        names = [name.decode("utf-8") for name in names]

    self.freeMemory("spssFreeVarNames", namesPtr, typesPtr, nVars)
    return names, dict(zip(names, types))
@varNamesTypes.setter
def varNamesTypes(self, varNamesVarTypes):
    """Validate each variable name and register it, with its type code, in
    the file.

    `varNamesVarTypes` must unpack into two items (names, types).
    NOTE(review): the unpacked items are not used below; the loop works on
    `self.varNames` and `self.varTypes` -- confirm that this is intended.

    An SPSSIOError is raised for a name for which `spssValidateVarname`
    returns a nonzero code; a nonzero return code of `spssSetVarName` is
    passed to `checkErrsWarns`."""
    badLengthMsg = ("Empty or longer than %s chars" %
                    (MAXLENGTHS['SPSS_MAX_VARNAME'][0]))
    varNames, varTypes = varNamesVarTypes
    # position in this list == return code of spssValidateVarname
    verdicts = [
        ('SPSS_NAME_OK', 'Valid standard name'),
        ('SPSS_NAME_SCRATCH', 'Valid scratch var name'),
        ('SPSS_NAME_SYSTEM', 'Valid system var name'),
        ('SPSS_NAME_BADLTH', badLengthMsg),
        ('SPSS_NAME_BADCHAR', 'Invalid character or embedded blank'),
        ('SPSS_NAME_RESERVED', 'Name is a reserved word'),
        ('SPSS_NAME_BADFIRST', 'Invalid initial char (otherwise OK)'),
    ]
    varNameRetcodes = dict(enumerate(verdicts))
    checkName = self.spssio.spssValidateVarname
    setName = self.spssio.spssSetVarName
    setName.argtypes = [c_int, c_char_p, c_int]
    for name in self.varNames:
        typeCode = self.varTypes[name]
        verdict = checkName(c_char_py3k(name))
        if verdict:
            reason = ": ".join(varNameRetcodes.get(verdict))
            raise SPSSIOError("%r is an invalid variable name [%r]" %
                              (name, reason), verdict)
        retcode = setName(self.fh, c_char_py3k(name), typeCode)
        if retcode:
            checkErrsWarns("Problem setting variable name %r" % name, retcode)
@property
@decode
def valueLabels(self):
    """Get/Set `VALUE LABELS`. Takes a dictionary of the form
    `{varName: {value: valueLabel}}`:

    .. code-block:: python

        {b'numGender': {1: b'female',
                        2: b'male'},
         b'strGender': {b'f': b'female',
                        b'm': b'male'}}

    The getter walks over `self.varNames` and calls the I/O library twice
    per variable: once with zero-length array types to learn the number of
    labels, once with array types of that length to read them. Variables
    without value labels are left out of the result. The result passes
    through the `decode` decorator.
    """
    def initArrays(isNumeric=True, size=0):
        """Return a (values, labels) pair of null pointers to arrays of
        `size` items: doubles or char pointers for the values, char
        pointers for the labels. The default size=0 is used to request
        the array size"""
        labelsArr = (POINTER(c_char_p * size))()
        if isNumeric:
            return (POINTER(c_double * size))(), labelsArr
        return (POINTER(c_char_p * size))(), labelsArr
    funcN = self.spssio.spssGetVarNValueLabels
    funcC = self.spssio.spssGetVarCValueLabels
    valueLabels = {}
    for varName in self.varNames:
        # vNames maps the (possibly decoded) name to its encoded form
        vName = self.vNames[varName]
        numLabels = c_int()
        # step 1a: get array size (numeric values; type code 0)
        if self.varTypes[varName] == 0:
            valuesArr, labelsArr = initArrays(True)
            func = funcN
            func.argtypes = [c_int, c_char_p,
                             POINTER(POINTER(c_double * 0)),
                             POINTER(POINTER(c_char_p * 0)),
                             POINTER(c_int)]
            # NOTE(review): this return code is overwritten in step 2
            # without being checked
            retcode = func(self.fh, c_char_py3k(vName),
                           valuesArr, labelsArr, numLabels)
            # re-declare pointers and argtypes with the reported size
            valuesArr, labelsArr = initArrays(True, numLabels.value)
            func.argtypes = [c_int, c_char_p,
                             POINTER(POINTER(c_double * numLabels.value)),
                             POINTER(POINTER(c_char_p * numLabels.value)),
                             POINTER(c_int)]
        # step 1b: get array size (string values)
        else:
            valuesArr, labelsArr = initArrays(False)
            func = funcC
            func.argtypes = [c_int, c_char_p,
                             POINTER(POINTER(c_char_p * 0)),
                             POINTER(POINTER(c_char_p * 0)),
                             POINTER(c_int)]
            # NOTE(review): this return code is overwritten in step 2
            # without being checked
            retcode = func(self.fh, c_char_py3k(vName),
                           valuesArr, labelsArr, numLabels)
            # re-declare pointers and argtypes with the reported size
            valuesArr, labelsArr = initArrays(False, numLabels.value)
            func.argtypes = [c_int, c_char_p,
                             POINTER(POINTER(c_char_p * numLabels.value)),
                             POINTER(POINTER(c_char_p * numLabels.value)),
                             POINTER(c_int)]
        # step 2: get labels with array of proper size
        retcode = func(self.fh, c_char_py3k(vName),
                       valuesArr, labelsArr, numLabels)
        if retcode:
            msg = "Problem getting value labels of variable %r" % varName
            checkErrsWarns(msg, retcode)
        # get array contents; variables without labels are skipped (and
        # freeMemory is not called for them)
        if not numLabels.value:
            continue
        values = [valuesArr[0][i] for i in xrange(numLabels.value)]
        labels = [labelsArr[0][i] for i in xrange(numLabels.value)]
        valueLabelsX = [(val, lbl) for val, lbl in zip(values, labels)]
        valueLabels[varName] = dict(valueLabelsX)
        # clean up, using the free function that matches the variable type
        args = (valuesArr, labelsArr, numLabels)
        if self.varTypes[varName] == 0:
            self.freeMemory("spssFreeVarNValueLabels", *args)
        else:
            self.freeMemory("spssFreeVarCValueLabels", *args)
    return valueLabels
@valueLabels.setter
def valueLabels(self, valueLabels):
    """Write `VALUE LABELS` given as `{varName: {value: valueLabel}}`.

    A falsy argument is ignored. Names, values and labels go through
    `self.encode`. Variables with type code 0 use the numeric library call,
    all others the string call. A nonzero return code is passed to
    `checkErrsWarns`."""
    if not valueLabels:
        return
    setNumeric = self.spssio.spssSetVarNValueLabel
    setNumeric.argtypes = [c_int, c_char_p, c_double, c_char_p]
    setString = self.spssio.spssSetVarCValueLabel
    setString.argtypes = [c_int, c_char_p, c_char_p, c_char_p]
    for varName, labelsOfVar in self.encode(valueLabels).items():
        for value, label in self.encode(labelsOfVar).items():
            isNumeric = self.varTypes[varName] == 0
            cName = c_char_py3k(varName)
            if isNumeric:
                retcode = setNumeric(self.fh, cName, value,
                                     c_char_py3k(label))
            else:
                retcode = setString(self.fh, cName, c_char_py3k(value),
                                    c_char_py3k(label))
            if retcode:
                checkErrsWarns("Problem setting value labels of variable %r"
                               % varName, retcode)
@property
@decode
def varLabels(self):
    """Get/set `VARIABLE LABELS`.
    Returns/takes a dictionary of the form `{varName: varLabel}`.
    For example:

    .. code-block:: python

        {b'salary': b'Salary (dollars)',
         b'educ': b'Educational level (years)'}

    One string buffer of `MAXLENGTHS['SPSS_MAX_VARLABEL'][0]` bytes is
    reused for all variables; its value is stored before the return code is
    examined. The result passes through the `decode` decorator.
    """
    bufferSize = MAXLENGTHS['SPSS_MAX_VARLABEL'][0]
    labelBuffer = create_string_buffer(bufferSize)
    getLabel = self.spssio.spssGetVarLabelLong
    getLabel.argtypes = [c_int, c_char_p, POINTER(c_char * bufferSize),
                         c_int, POINTER(c_int)]
    labels = {}
    for varName in self.varNames:
        encodedName = self.vNames[varName]
        retcode = getLabel(self.fh, c_char_py3k(encodedName),
                           labelBuffer, bufferSize, c_int())
        labels[varName] = labelBuffer.value
        if retcode:
            checkErrsWarns("Problem getting variable label of variable %r"
                           % varName, retcode)
    return labels
@varLabels.setter
def varLabels(self, varLabels):
    """Write `VARIABLE LABELS` given as `{varName: varLabel}`.

    A falsy argument is ignored. The dictionary goes through `self.encode`
    first. A nonzero return code is passed to `checkErrsWarns`."""
    if not varLabels:
        return
    setLabel = self.spssio.spssSetVarLabel
    setLabel.argtypes = [c_int, c_char_p, c_char_p]
    for name, label in self.encode(varLabels).items():
        retcode = setLabel(self.fh, c_char_py3k(name), c_char_py3k(label))
        if not retcode:
            continue
        checkErrsWarns("Problem with setting variable label %r of variable %r"
                       % (label, name), retcode)
@property
@decode
def formats(self):
    """Get the `PRINT FORMATS`, set `PRINT FORMATS` and `WRITE FORMATS`.
    Returns/takes a dictionary of the form `{varName: <spss-format>}`.
    For example:

    .. code-block:: python

        {b'salary': b'DOLLAR8',
         b'gender': b'A1',
         b'educ': b'F8.2'}

    The dictionary is built once and kept in `self.formats_`; later calls
    return that cached dictionary (through the `decode` decorator). The
    number of decimals is appended for variables with type code 0 only,
    and a trailing `.0` is dropped."""
    if hasattr(self, "formats_"):
        return self.formats_
    getFormat = self.spssio.spssGetVarPrintFormat
    getFormat.argtypes = [c_int, c_char_p, POINTER(c_int),
                          POINTER(c_int), POINTER(c_int)]
    typeRef, decRef, widthRef = c_int(), c_int(), c_int()
    # the cache exists (possibly partially filled) as soon as the loop runs
    cache = self.formats_ = {}
    for varName in self.varNames:
        encodedName = self.vNames[varName]
        retcode = getFormat(self.fh, c_char_py3k(encodedName),
                            typeRef, decRef, widthRef)
        if retcode:
            checkErrsWarns("Error getting print format for variable '%s'"
                           % encodedName.decode(), retcode)
        # keep the part of the format name after the last underscore
        fullName = allFormats.get(typeRef.value)[0]
        spec = fullName.split(b"_")[-1] + bytez(str(widthRef.value))
        if self.varTypes[varName] == 0:
            spec += (b"." + bytez(str(decRef.value)))
        if spec.endswith(b".0"):
            spec = spec[:-2]
        cache[varName] = spec
    return self.formats_
def _splitformats(self):
"""This function returns the 'bare' formats + variable widths,
e.g. format `F5.3` is returned as 'F' and '5'"""
pattern = b"(?P<bareFmt>[a-z]+)(?P<varWid>\d+)[.]?\d*"
if self.ioUtf8_:
pattern = pattern.decode("utf-8")
regex = re.compile(pattern, re.I)
bareformats, varWids = {}, {}
for varName, format_ in self.formats.items():
bareformat, varWid = regex.findall(format_)[0]
bareformats[varName] = bareformat
varWids[varName] = int(varWid)
return bareformats, varWids
@formats.setter
def formats(self, formats):
    """Set `PRINT FORMATS` and `WRITE FORMATS` from a dictionary of the
    form `{varName: <spss-format>}`, e.g. `{b'salary': b'DOLLAR8'}`.

    A falsy argument is ignored. Every format is upper-cased and split into
    format name, width and (optionally) number of decimals, and the same
    triple is used for both the print and the write format.

    Raises ValueError for a format that cannot be parsed or whose name is
    not known, and SPSSIOError when the library reports an invalid print
    format. Other nonzero return codes, of the print as well as the write
    format call, are passed to `checkErrsWarns`."""
    if not formats:
        return
    # drop the first 9 characters of the format name (presumably the
    # "SPSS_FMT_" prefix -- confirm against allFormats)
    reverseFormats = {v[0][9:]: k for k, v in allFormats.items()}
    validValues = sorted(reverseFormats.keys())

    # raw literals: "\d" and "\." in a non-raw literal are invalid escape
    # sequences (SyntaxWarning as of Python 3.12)
    regex = br"(?P<printFormat>A(HEX)?)(?P<printWid>\d+)"
    isStringVar = re.compile(regex, re.IGNORECASE)
    regex = br"(?P<printFormat>[A-Z]+)(?P<printWid>\d+)\.?(?P<printDec>\d*)"
    isAnyVar = re.compile(regex, re.IGNORECASE)

    def unknownFormat(format_, varName):
        """Build the error for an unusable format, only when needed"""
        msg = ("Unknown format %r or invalid width for variable %r. " +
               "Valid formats are: %s")
        return ValueError(msg % (format_, varName, b", ".join(validValues)))

    funcP = self.spssio.spssSetVarPrintFormat  # print type
    funcP.argtypes = [c_int, c_char_p, c_int, c_int, c_int]
    funcW = self.spssio.spssSetVarWriteFormat  # write type
    funcW.argtypes = funcP.argtypes
    for varName, format_ in self.encode(formats).items():
        format_ = format_.upper()
        gotString = isStringVar.match(format_)
        gotAny = isAnyVar.match(format_)
        if gotString:
            # string formats (A, AHEX) have no decimals
            printFormat = gotString.group("printFormat")
            printFormat = reverseFormats.get(printFormat)
            printDec = 0
            printWid = int(gotString.group("printWid"))
        elif gotAny:
            printFormat = gotAny.group("printFormat")
            printFormat = reverseFormats.get(printFormat)
            printDec = gotAny.group("printDec")
            printDec = int(printDec) if printDec else 0
            printWid = int(gotAny.group("printWid"))
        else:
            raise unknownFormat(format_, varName)
        if printFormat is None:
            raise unknownFormat(format_, varName)

        args = (self.fh, c_char_py3k(varName),
                printFormat, printDec, printWid)
        retcode1, retcode2 = funcP(*args), funcW(*args)
        if retcodes.get(retcode1) == "SPSS_INVALID_PRFOR":
            # invalid PRint FORmat
            msg = "format for %r misspecified (%r)"
            raise SPSSIOError(msg % (varName, format_), retcode1)
        if retcode1:
            msg = "Problem setting format_ %r for %r" % (format_, varName)
            checkErrsWarns(msg, retcode1)
        if retcode2:
            # the write format result used to be silently ignored
            msg = ("Problem setting write format %r for %r" %
                   (format_, varName))
            checkErrsWarns(msg, retcode2)
def _getMissingValue(self, varName):
    """Helper for the missingValues getter: return the user missing values
    of variable <varName> as a dictionary.

    The keys depend on the definition stored in the file: an empty
    dictionary (no missing values), `{"values": [...]}` with one to three
    discrete values, `{"lower": .., "upper": ..}` for a range, or
    `{"lower": .., "upper": .., "value": ..}` for a range plus one value.
    None is returned for a definition type that is not one of these.
    Variables with type code 0 are read with the numeric library call,
    all others with the string call."""
    if self.varTypes[varName] == 0:
        func = self.spssio.spssGetVarNMissingValues
        func.argtypes = ([c_int, c_char_p, POINTER(c_int)] +
                         [POINTER(c_double)] * 3)
        slots = tuple(c_double() for _ in range(3))
    else:
        lenBuff = 9  # char miss vals: max 9 bytes. Newer versions also?
        func = self.spssio.spssGetVarCMissingValues
        func.argtypes = ([c_int, c_char_p, POINTER(c_int)] +
                         [POINTER(c_char * lenBuff)] * 3)
        slots = tuple(create_string_buffer(lenBuff) for _ in range(3))

    fmtCode = c_int()
    encodedName = self.vNames[varName]
    retcode = func(self.fh, c_char_py3k(encodedName), fmtCode, *slots)
    if retcode:
        checkErrsWarns("Error getting missing value for variable '%s'"
                       % varName, retcode)

    v1, v2, v3 = [slot.value for slot in slots]
    # invert {name: code} so that the code returned by the library can be
    # translated into its name
    codeToName = {code: name for name, code in userMissingValues.items()}
    fmtName = codeToName[fmtCode.value]
    layouts = {
        "SPSS_NO_MISSVAL": {},
        "SPSS_ONE_MISSVAL": {u"values": [v1]},
        "SPSS_TWO_MISSVAL": {u"values": [v1, v2]},
        "SPSS_THREE_MISSVAL": {u"values": [v1, v2, v3]},
        "SPSS_MISS_RANGE": {u"lower": v1, u"upper": v2},
        "SPSS_MISS_RANGEANDVAL": {u"lower": v1, u"upper": v2, u"value": v3},
    }
    return layouts.get(fmtName)
def _setMissingValue(self, varName, **kwargs):
"""This is a helper function for the missingValues setter
method. The function sets missing values for variable <varName>.
Valid keyword arguments are:
* to specify a RANGE: 'lower', 'upper', optionally with 'value'
* to specify DISCRETE VALUES: 'values', specified as a list no longer
than three items, or as None, or as a float/int/str
Note: in v3.3.0, lower-upper-value(s) was returned as bytestring,
now as ustring, including when `ioLocale=False`.
"""
if kwargs == {}:
return 0
fargs = ["lower", "upper", "value", "values"]
if set(kwargs.keys()).difference(set(fargs)):
raise ValueError("Allowed keywords are: %s" % ", ".join(fargs))
varName = self.encode(varName)
varType = self.varTypes[varName]
# range of missing values, e.g. MISSING VALUES aNumVar (-9 THRU -1).
if varType == 0:
placeholder = 0.0
if "lower" in kwargs and "upper" in kwargs and \
"value" in kwargs:
missingFmt = "SPSS_MISS_RANGEANDVAL"
args = kwargs["lower"], kwargs["upper"], kwargs["value"]
elif "lower" in kwargs and "upper" in kwargs:
missingFmt = "SPSS_MISS_RANGE"
args = kwargs["lower"], kwargs["upper"], placeholder
else:
placeholder, args = b"0", None
# up to three discrete missing values
if "values" in kwargs:
values = self.encode(list(kwargs.values())[0])
if isinstance(values, (float, int, str, bytes)):
values = [values]
# check if missing values strings values are not too long
strMissLabels = [len(v) for v in values if
isinstance(v, (str, bytes))]
if strMissLabels and max(strMissLabels) > 9:
raise ValueError("Missing value label > 9 bytes")
nvalues = len(list(values)) if values is not None else values
if values is None or values == {}:
missingFmt = "SPSS_NO_MISSVAL"
args = placeholder, placeholder, placeholder
elif nvalues == 1:
missingFmt = "SPSS_ONE_MISSVAL"
args = values + [placeholder, placeholder]
elif nvalues == 2:
missingFmt = "SPSS_TWO_MISSVAL"
args = values + [placeholder]
elif nvalues == 3:
missingFmt = "SPSS_THREE_MISSVAL"
args = values
else:
msg = "You can specify up to three individual missing values"
raise ValueError(msg)
# numerical vars
if varType == 0 and args:
func = self.spssio.spssSetVarNMissingValues
func.argtypes = [c_int, c_char_p, c_int,
c_double, c_double, c_double]
args = map(float, args)
# string vars
else:
if args is None:
raise ValueError("Illegal keyword for character variable")
func = self.spssio.spssSetVarCMissingValues
func.argtypes = [c_int, c_char_p, c_int,
c_char_p, c_char_p, c_char_p]
retcode = func(self.fh, varName, userMissingValues[missingFmt], *args)
if retcode:
msg = "Problem setting missing value of variable %r" % varName
checkErrsWarns(msg, retcode)
@property
@decode
def missingValues(self):
    """Get/Set MISSING VALUES.
    User missing values are values that will not be included in
    calculations by SPSS. For example, 'don't know' might be coded as a
    user missing value (a value of 999 is typically used, so when variable
    'age' has values 5, 15, 999, the average age is 10). This is
    different from 'system missing values', which are blank/null values.
    Takes a dictionary of the following form:

    .. code-block:: python

        # note that 'lower', 'upper', 'value(s)' are without b' prefix
        missingValues = {
            # discrete values
            b"someNumvar1": {"values": [999, -1, -2]},
            # range, cf. MISSING VALUES x (-9 THRU -1)
            b"someNumvar2": {"lower": -9, "upper": -1},
            b"someNumvar3": {"lower": -9, "upper": -1, "value": 999},
            # string variables can have up to three missing values
            b"someStrvar1": {"values": [b"foo", b"bar", b"baz"]},
            b"someStrvar2": {"values": b"bletch"}
        }

    The getter returns one entry per name in `self.varNames`, each produced
    by `_getMissingValue`; the result passes through the `decode`
    decorator."""
    return {varName: self._getMissingValue(varName)
            for varName in self.varNames}
@missingValues.setter
def missingValues(self, missingValues):
    """Set MISSING VALUES from `{varName: {keyword: value(s)}}`; every
    inner dictionary is handed to `_setMissingValue` as keyword arguments.
    A falsy argument is ignored."""
    if not missingValues:
        return
    for varName, definition in missingValues.items():
        self._setMissingValue(varName, **definition)
# measurelevel, colwidth and alignment must all be set or not at all.
@property
@decode
def measureLevels(self):
    """Get/Set `VARIABLE LEVEL` (measurement level).
    Returns/Takes a dictionary of the form `{varName: varMeasureLevel}`.
    Valid measurement levels are: "unknown", "nominal", "ordinal", "scale",
    "ratio", "flag", "typeless". This is used in SPSS procedures such as
    `CTABLES`.

    The getter reports level code 3 as b"ratio" (the setter maps both
    b"scale" and b"ratio" to 3). A code that is not in the table yields
    None. The level is stored before the return code is examined; a nonzero
    return code is passed to `checkErrsWarns`. The result passes through
    the `decode` decorator."""
    func = self.spssio.spssGetVarMeasureLevel
    func.argtypes = [c_int, c_char_p, POINTER(c_int)]
    # The literal used to contain both 3: b"scale" and 3: b"ratio"; the
    # second silently overwrote the first, so only the effective entry is
    # kept here. Results are unchanged.
    levels = {0: b"unknown", 1: b"nominal", 2: b"ordinal",
              3: b"ratio", 4: b"flag", 5: b"typeless"}
    measureLevel = c_int()
    varMeasureLevels = {}
    for varName in self.varNames:
        vName = self.vNames[varName]
        retcode = func(self.fh, c_char_py3k(vName), measureLevel)
        varMeasureLevels[varName] = levels.get(measureLevel.value)
        if retcode:
            msg = "Problem getting measurement level %r for variable %r"
            checkErrsWarns(msg % (measureLevel.value, varName), retcode)
    return varMeasureLevels
@measureLevels.setter
def measureLevels(self, varMeasureLevels):
    """Set `VARIABLE LEVEL` from `{varName: varMeasureLevel}`.

    A falsy argument is ignored. The dictionary goes through `self.encode`
    and the level is matched case-insensitively against the byte string
    names below; b"scale" and b"ratio" share code 3. An unknown level
    raises ValueError. A nonzero return code is passed to
    `checkErrsWarns`."""
    if not varMeasureLevels:
        return
    setLevel = self.spssio.spssSetVarMeasureLevel
    setLevel.argtypes = [c_int, c_char_p, c_int]
    levels = {
        b"unknown": 0,
        b"nominal": 1,
        b"ordinal": 2,
        b"scale": 3,
        b"ratio": 3,
        b"flag": 4,
        b"typeless": 5,
    }
    for varName, level in self.encode(varMeasureLevels).items():
        key = level.lower()
        if key not in levels:
            validLevels = b", ".join(levels.keys()).decode()
            raise ValueError("Valid levels are %s" % validLevels)
        retcode = setLevel(self.fh, c_char_py3k(varName), levels.get(key))
        if retcode:
            checkErrsWarns("Problem setting variable mesasurement level: '%s'"
                           % varName.decode(), retcode)
@property
@decode
def columnWidths(self):
    """Get/Set `VARIABLE WIDTH` (display width).
    Returns/Takes a dictionary of the form `{varName: <int>}`. A value of
    zero is special and means that the IBM SPSS Statistics Data Editor
    is to set an appropriate width using its own algorithm. If used,
    variable alignment, measurement level and column width all need to
    be set.

    A nonzero return code is passed to `checkErrsWarns` before the width is
    stored. The result passes through the `decode` decorator."""
    getWidth = self.spssio.spssGetVarColumnWidth
    getWidth.argtypes = [c_int, c_char_p, POINTER(c_int)]
    widthRef = c_int()
    widths = {}
    for varName in self.varNames:
        encodedName = self.vNames[varName]
        retcode = getWidth(self.fh, c_char_py3k(encodedName), widthRef)
        if retcode:
            checkErrsWarns("Problem getting column width: '%s'" % varName,
                           retcode)
        widths[varName] = widthRef.value
    return widths
@columnWidths.setter
def columnWidths(self, varColumnWidths):
    """Set `VARIABLE WIDTH` from `{varName: <int>}`.

    A falsy argument is ignored. The names are used as given (no call to
    `self.encode`). A nonzero return code is passed to `checkErrsWarns`."""
    if not varColumnWidths:
        return
    setWidth = self.spssio.spssSetVarColumnWidth
    setWidth.argtypes = [c_int, c_char_p, c_int]
    for name, width in varColumnWidths.items():
        retcode = setWidth(self.fh, c_char_py3k(name), width)
        if not retcode:
            continue
        checkErrsWarns("Error setting variable column width: '%s'"
                       % name.decode(), retcode)
def _setColWidth10(self):
"""Set the variable display width of string values to at least 10
(it's annoying that SPSS displays e.g. a one-character variable in
very narrow columns). This also sets all measurement levels to
"unknown" and all variable alignments to "left". This function is
only called if column widths, measurement levels and variable
alignments are None."""
columnWidths = {}
for varName, varType in self.varTypes.items():
# zero = appropriate width determined by spss
columnWidths[varName] = 10 if 0 < varType < 10 else 0
self.columnWidths = columnWidths
self.measureLevels = dict([(v, b"unknown") for v in self.varNames])
self.alignments = dict([(v, b"left") for v in self.varNames])
@property
@decode
def alignments(self):
    """Get/Set `VARIABLE ALIGNMENT`. Returns/Takes a dictionary of the
    form `{varName: alignment}`. Valid alignment values are: "left",
    "right", "center".

    .. warning:: *measureLevels, columnWidths, alignments must all three
       be set, if used*

    The alignment is stored before the return code is examined; a nonzero
    return code is passed to `checkErrsWarns`. The result passes through
    the `decode` decorator."""
    getAlignment = self.spssio.spssGetVarAlignment
    getAlignment.argtypes = [c_int, c_char_p, POINTER(c_int)]
    codeToName = {0: b"left", 1: b"right", 2: b"center"}
    codeRef = c_int()
    result = {}
    for varName in self.varNames:
        encodedName = self.vNames[varName]
        retcode = getAlignment(self.fh, c_char_py3k(encodedName), codeRef)
        result[varName] = codeToName[codeRef.value]
        if retcode:
            checkErrsWarns("Problem getting variable alignment: '%s'"
                           % varName.decode(), retcode)
    return result
@alignments.setter
def alignments(self, varAlignments):
    """Set `VARIABLE ALIGNMENT` from `{varName: alignment}`.

    A falsy argument is ignored. The alignment is matched
    case-insensitively and may be given as byte string or as string; an
    unknown alignment raises ValueError. A nonzero return code is passed to
    `checkErrsWarns`."""
    if not varAlignments:
        return
    setAlignment = self.spssio.spssSetVarAlignment
    setAlignment.argtypes = [c_int, c_char_p, c_int]
    nameToCode = {
        b"left": 0, b"right": 1, b"center": 2,
        "left": 0, "right": 1, "center": 2,
    }
    for name, alignment in varAlignments.items():
        key = alignment.lower()
        if key not in nameToCode:
            raise ValueError("Valid alignments are: left, center, right")
        retcode = setAlignment(self.fh, c_char_py3k(name),
                               nameToCode.get(key))
        if retcode:
            checkErrsWarns("Problem setting variable alignment for variable '%s'"
                           % name.decode(), retcode)
@property
@decode
def varSets(self):
    """Get/Set `VARIABLE SET` information.
    Returns/Takes a dictionary with setname as keys and a list of SPSS
    variables as values. For example:

    .. code-block:: python

        {b'SALARY': [b'salbegin', b'salary'],
         b'DEMOGR': [b'gender', b'minority', b'educ']}

    The library returns one string with a `<setname>= <var> <var> ...`
    line per set; the part after the last newline is ignored. An empty
    dictionary is returned when that string is empty. The result passes
    through the `decode` decorator.
    """
    getSets = self.spssio.spssGetVariableSets
    getSets.argtypes = [c_int, POINTER(c_char_p)]
    raw = c_char_p()
    retcode = getSets(self.fh, raw)
    if retcode:
        checkErrsWarns("Problem getting variable set information", retcode)
    if not raw.value:
        return {}
    lines = raw.value.split(b"\n")[:-1]
    pairs = (line.split(b"= ") for line in lines)
    parsed = {setName: members.split() for setName, members in pairs}
    # clean up
    self.freeMemory("spssFreeVariableSets", raw)
    return parsed
@varSets.setter
def varSets(self, varSets):
    """Set `VARIABLE SET` information from `{setname: [varName, ...]}`.

    A falsy argument is ignored. Set names and variable names given as
    byte strings are decoded with `self.fileEncoding`; every set becomes a
    `<setname>= <var> <var> ...` line, the lines are encoded again and
    joined with newlines for the library call. A nonzero return code is
    passed to `checkErrsWarns`."""
    if not varSets:
        return
    setSets = self.spssio.spssSetVariableSets
    setSets.argtypes = [c_int, c_char_p]
    encoding = self.fileEncoding

    def asText(obj):
        """Decode byte strings; leave anything else alone"""
        return obj.decode(encoding) if isinstance(obj, bytes) else obj

    lines = []
    for setName, members in varSets.items():
        line = "%s= %s" % (asText(setName),
                           " ".join([asText(member) for member in members]))
        lines.append(line.encode(encoding))
    retcode = setSets(self.fh, c_char_py3k(b"\n".join(lines)))
    if retcode:
        checkErrsWarns("Problem setting variable set information", retcode)
@property
@decode
def varRoles(self):
    """Get/Set `VARIABLE ROLES`.
    Returns/Takes a dictionary of the form `{varName: varRole}`, where
    varRoles may be any of the following: 'both', 'frequency', 'input',
    'none', 'partition', 'record ID', 'split', 'target'

    A role code that is not in the table yields None. The role is stored
    before the return code is examined; a nonzero return code is passed to
    `checkErrsWarns`. The result passes through the `decode` decorator."""
    func = self.spssio.spssGetVarRole
    func.argtypes = [c_int, c_char_p, POINTER(c_int)]
    roles = {0: b"input", 1: b"target", 2: b"both", 3: b"none",
             4: b"partition", 5: b"split", 6: b"frequency",
             7: b"record ID"}
    varRoles = {}
    varRole_ = c_int()
    for varName in self.varNames:
        vName = self.vNames[varName]
        retcode = func(self.fh, c_char_py3k(vName), varRole_)
        varRoles[varName] = roles.get(varRole_.value)
        if retcode:
            # the variable name used to be missing from the message (the
            # %r placeholder was never filled in)
            msg = "Problem getting variable role for variable %r"
            checkErrsWarns(msg % varName, retcode)
    return varRoles
@varRoles.setter
def varRoles(self, varRoles):
    """Set `VARIABLE ROLES` from `{varName: varRole}`, where varRole is one
    of 'input', 'target', 'both', 'none', 'partition', 'split',
    'frequency', 'record ID' (as byte string or as string).

    A falsy argument is ignored. An unknown role raises ValueError, like
    the measurement level and alignment setters do for unknown values. A
    nonzero return code is passed to `checkErrsWarns`."""
    if not varRoles:
        return
    # position in this tuple == role code used by the library call
    roleNames = ("input", "target", "both", "none", "partition", "split",
                 "frequency", "record ID")
    roles = {}
    for code, roleName in enumerate(roleNames):
        roles[roleName] = code
        roles[roleName.encode("utf-8")] = code
    func = self.spssio.spssSetVarRole
    func.argtypes = [c_int, c_char_p, c_int]
    for varName, varRole in varRoles.items():
        if varRole not in roles:
            # formerly None was passed on to the c_int parameter
            msg = "Invalid role %r for variable %r. Valid roles are: %s"
            raise ValueError(msg % (varRole, varName, ", ".join(roleNames)))
        retcode = func(self.fh, c_char_py3k(varName), roles[varRole])
        if retcode:
            msg = "Problem setting variable role %r for variable %r"
            checkErrsWarns(msg % (varRole, varName), retcode)
@property
@decode
def varAttributes(self):
    """Get/Set `VARIABLE ATTRIBUTES`.
    Returns/Takes dictionary of the form:

    .. code-block:: python

        {b'var1': {b'attr name x': b'attr value x',
                   b'attr name y': b'attr value y'},
         b'var2': {b'attr name a': b'attr value a',
                   b'attr name b': b'attr value b'}}

    The getter calls the I/O library twice per variable: once to learn the
    number of attributes, once with array types of that size to read them.
    Variables without attributes are left out of the result. The result
    passes through the `decode` decorator.
    """
    # specify default array + argtypes (zero requests size)
    DEFAULT_ARRAY_SIZE = 0
    func = self.spssio.spssGetVarAttributes
    func.argtypes = [c_int, c_char_p,
                     POINTER(POINTER(c_char_p * DEFAULT_ARRAY_SIZE)),
                     POINTER(POINTER(c_char_p * DEFAULT_ARRAY_SIZE)),
                     POINTER(c_int)]
    # initialize arrays
    attrNamesArr = (POINTER(c_char_p * DEFAULT_ARRAY_SIZE))()
    attrValuesArr = (POINTER(c_char_p * DEFAULT_ARRAY_SIZE))()
    attributes = {}
    for varName in self.varNames:
        # vNames maps the (possibly decoded) name to its encoded form
        vName = self.vNames[varName]
        # step 1: get array size
        # NOTE(review): from the second variable on, this call runs with
        # the pointers and argtypes left behind by step 2 of the previous
        # variable, not with the zero-size defaults set up above
        nAttr = c_int()
        retcode = func(self.fh, c_char_py3k(vName),
                       byref(attrNamesArr), byref(attrValuesArr),
                       byref(nAttr))
        if retcode:
            msg = "Problem getting attributes of variable '%s' (step 1/2)"
            checkErrsWarns(msg % varName.decode(), retcode)
        # step 2: get attributes with arrays of proper size
        nAttr = c_int(nAttr.value)
        attrNamesArr = (POINTER(c_char_p * nAttr.value))()
        attrValuesArr = (POINTER(c_char_p * nAttr.value))()
        func.argtypes = [c_int, c_char_p,
                         POINTER(POINTER(c_char_p * nAttr.value)),
                         POINTER(POINTER(c_char_p * nAttr.value)),
                         POINTER(c_int)]
        retcode = func(self.fh, c_char_py3k(vName),
                       byref(attrNamesArr), byref(attrValuesArr),
                       byref(nAttr))
        if retcode:
            msg = "Problem getting attributes of variable '%s' (step 2/2)"
            checkErrsWarns(msg % varName.decode(), retcode)
        # get array contents; variables without attributes are skipped
        # (and freeMemory is not called for them)
        if not nAttr.value:
            continue
        k, v, n = attrNamesArr[0], attrValuesArr[0], nAttr.value
        attribute = dict([(k[i], v[i]) for i in xrange(n)])
        attributes[varName] = attribute
        # clean up
        args = (attrNamesArr, attrValuesArr, nAttr)
        self.freeMemory("spssFreeAttributes", *args)
    return attributes
@varAttributes.setter
def varAttributes(self, varAttributes):
    """Set `VARIABLE ATTRIBUTES` from
    `{varName: {attrName: attrValue, ...}}`.

    A falsy argument is ignored. Only names that occur in `self.varNames`
    are looked up; variables without (or with empty) attributes are
    skipped. Names and values are passed on as given (no call to
    `self.encode`). A nonzero return code is passed to `checkErrsWarns`."""
    if not varAttributes:
        return
    setAttributes = self.spssio.spssSetVarAttributes
    for varName in self.varNames:
        attributes = varAttributes.get(varName)
        if attributes:
            count = len(attributes)
            CharArray = c_char_p * count
            names = CharArray(*attributes.keys())
            values = CharArray(*attributes.values())
            # the array size is part of the pointer type, so argtypes is
            # re-declared per variable
            setAttributes.argtypes = [c_int, c_char_p, POINTER(CharArray),
                                      POINTER(CharArray), c_int]
            retcode = setAttributes(self.fh, c_char_py3k(varName),
                                    names, values, count)
            if retcode:
                checkErrsWarns("Problem setting variable attributes for variable %r"
                               % varName, retcode)
@property
@decode
def fileAttributes(self):
    """Get/Set `DATAFILE ATTRIBUTES`.
    Returns/Takes a dictionary of the form:

    .. code-block:: python

        {b'attrName[1]': b'attrValue1',
         b'revision[1]': b'2010-10-09',
         b'revision[2]': b'2010-10-22',
         b'revision[3]': b'2010-11-19'}

    Square brackets indicate attribute arrays, which must
    start with 1

    The getter calls the I/O library twice: once with zero-length array
    types to learn the number of attributes, once with array types of that
    size to read them. An empty dictionary is returned when there are no
    attributes. The result passes through the `decode` decorator."""
    # abbreviation for readability
    DEFAULT_ARRAY_SIZE = 0
    func = self.spssio.spssGetFileAttributes
    func.argtypes = [c_int,
                     POINTER(POINTER(c_char_p * DEFAULT_ARRAY_SIZE)),
                     POINTER(POINTER(c_char_p * DEFAULT_ARRAY_SIZE)),
                     POINTER(c_int)]
    # step 1: get array size (zero requests size)
    attrNamesArr = (POINTER(c_char_p * DEFAULT_ARRAY_SIZE))()
    attrValuesArr = (POINTER(c_char_p * DEFAULT_ARRAY_SIZE))()
    nAttr = c_int()
    # NOTE(review): this return code is overwritten in step 2 without
    # being checked
    retcode = func(self.fh, byref(attrNamesArr),
                   byref(attrValuesArr), byref(nAttr))
    # step 2: get attributes with arrays of proper size
    nAttr = c_int(nAttr.value)
    attrNamesArr = (POINTER(c_char_p * nAttr.value))()
    attrValuesArr = (POINTER(c_char_p * nAttr.value))()
    func.argtypes = [c_int,
                     POINTER(POINTER(c_char_p * nAttr.value)),
                     POINTER(POINTER(c_char_p * nAttr.value)),
                     POINTER(c_int)]
    retcode = func(self.fh, byref(attrNamesArr),
                   byref(attrValuesArr), byref(nAttr))
    if retcode:
        checkErrsWarns("Problem getting file attributes", retcode)
    # get array contents; without attributes there is nothing to copy
    # (freeMemory is not called in that case)
    if not nAttr.value:
        return {}
    k, v = attrNamesArr[0], attrValuesArr[0]
    attributes = dict([(k[i], v[i]) for i in xrange(nAttr.value)])
    # clean up
    args = (attrNamesArr, attrValuesArr, nAttr)
    self.freeMemory("spssFreeAttributes", *args)
    return attributes
@fileAttributes.setter
def fileAttributes(self, fileAttributes):
    """Set `DATAFILE ATTRIBUTES` from `{attrName: attrValue}`.

    A falsy argument is ignored. Names and values are passed on as given
    (no call to `self.encode`). A nonzero return code is passed to
    `checkErrsWarns`."""
    if not fileAttributes:
        return
    count = len(fileAttributes)
    CharArray = c_char_p * count
    names = CharArray(*fileAttributes.keys())
    values = CharArray(*fileAttributes.values())
    setAttributes = self.spssio.spssSetFileAttributes
    setAttributes.argtypes = [c_int, POINTER(CharArray),
                              POINTER(CharArray), c_int]
    retcode = setAttributes(self.fh, names, values, count)
    if retcode:
        checkErrsWarns("Problem setting file attributes", retcode)
def _getMultRespDef(self, mrDef):
    """Get 'normal' multiple response definitions.

    This is a helper function for the multRespDefs getter function.
    A multiple response definition <mrDef> in the (byte) string format
    returned by the IO module is converted into a dictionary of the form
    {<setName>: {b"setType": <setType>, b"label": <lbl>,
    b"varNames": <list_of_varNames>}}. SetType may be either b'D'
    (multiple dichotomy sets) or b'C' (multiple category sets). If setType
    is b'D', the inner dictionary also has a b"countedValue" item.

    Returns an empty dictionary if <mrDef> is not a (well-formed) category
    or dichotomy definition, e.g. when it is an extended ('E') set."""
    flags = re.I | re.L
    # raw byte literals: '\$', '\S' and '\d' are not valid string escapes
    head = br"\$(?P<setName>\S+)=(?P<setType>[CD])\n?"
    tail = br" (?P<lblLen>\d+) (?P<lblVarNames>.+) ?\n?"
    m = re.search(head, mrDef, flags)
    if not m:
        return {}
    # re.I also lets a lower case set type through; normalise it so the
    # comparisons below (and in callers) see b"C"/b"D" only
    isDichotomy = m.group("setType").upper() == b"D"
    if isDichotomy:  # multiple dichotomy sets
        # \w+ won't always work (e.g. thai) --> \S+
        pattern = (head + br"(?P<valueLen>\d+) (?P<countedValue>\S+)" +
                   tail)
    else:  # multiple category sets
        pattern = head + tail
    m = re.search(pattern, mrDef, flags)
    if not m:
        # the set type was recognised, but the remainder is malformed
        return {}
    setName = m.group("setName")
    lblLen = int(m.group("lblLen"))
    lblVarNames = m.group("lblVarNames")
    # the label takes up the first <lblLen> bytes, the variable names follow
    multRespSet = {setName: {b"setType": b"D" if isDichotomy else b"C",
                             b"label": lblVarNames[:lblLen],
                             b"varNames": lblVarNames[lblLen:].split()}}
    if isDichotomy:
        multRespSet[setName][b"countedValue"] = m.group("countedValue")
    return multRespSet
def _setMultRespDefs(self, multRespDefs):
    """Set 'normal' multiple response definitions.

    This is a helper function for the multRespDefs setter function.
    It translates the multiple response definitions, specified as a
    dictionary {<setName>: {b"setType": ..., b"label": ...,
    b"varNames": [...], (b"countedValue": ...)}}, into one byte string
    (one definition per line) that the IO module can use.

    Sets whose type is not b"C" or b"D" are skipped. Raises ValueError if
    a set refers to variables that are not in self.varNames."""
    # see also issue #23
    encoding = self.fileEncoding
    mrespDefs = []
    for setName, rest in multRespDefs.items():
        rest = self.encode(rest)
        setType = rest[b"setType"]
        if setType not in (b"C", b"D"):
            continue
        # set names may be given as byte strings or as (unicode) strings
        if isinstance(setName, bytes):
            setName = setName.decode(encoding)
        label = rest.get(b"label", b"")
        varNames = tuple(rest[b"varNames"])
        # check if the variables in the MR definition exist in data
        # NOTE(review): assumes self.varNames holds byte strings, like
        # the encoded varNames -- confirm for ioUtf8=True
        difference = set(varNames) - set(self.varNames)
        if difference:
            missing = b", ".join(sorted(difference))
            msg = "Variables not present in data: %s"
            raise ValueError(msg % missing.decode(encoding, "replace"))
        # Text fields are collected in a separate dictionary, so the
        # encoded input is not modified. The label length is the length
        # in bytes.
        fields = {"setName": setName,
                  "setType": setType.decode(encoding),
                  "lblLen": len(label),
                  "label": label.decode(encoding),
                  "varNames": b" ".join(varNames).decode(encoding)}
        # The template is applied exactly once: a label may contain '%'.
        if setType == b"C":
            # ... multiple category sets
            template = ("$%(setName)s=%(setType)s %(lblLen)s "
                        "%(label)s %(varNames)s")
        else:
            # ... multiple dichotomy sets
            fields["countedValue"] = rest[b"countedValue"].decode(encoding)
            fields["valueLen"] = len(fields["countedValue"])  # issue #4
            template = ("$%(setName)s=%(setType)s%(valueLen)s "
                        "%(countedValue)s %(lblLen)s "
                        "%(label)s %(varNames)s")
        mrespDefs.append((template % fields).rstrip())
    return "\n".join(mrespDefs).encode(encoding)
def _getMultRespDefsEx(self, mrDef):
    """Get 'extended' multiple response definitions.

    This is a helper function for the multRespDefs getter function.
    A single extended ('E') definition <mrDef>, in the byte string format
    returned by the IO module, is converted into a dictionary of the form
    {<setName>: {"setType": <setType>, "firstVarIsLabel": <bool>,
    "label": <lbl>, "countedValue": <countedValue>,
    "varNames": <list_of_varNames>}}. Returns an empty dictionary if
    <mrDef> is not an extended definition."""
    # NOTE(review): the inner keys are str here, but bytes in
    # _getMultRespDef -- kept as is, so existing callers keep working.
    # Raw byte literals are used because '\$' and '\S' are not valid string
    # escapes. \S+ and .+ are used instead of \w+ and [\w ]+: \w does not
    # match punctuation (or e.g. thai), so a label such as b"Set, no. 2"
    # used to be cut off and the variable names were lost.
    regex = (br"\$(?P<setName>\S+)=(?P<setType>E) (?P<flag1>1)"
             br"(?P<flag2>1)? (?P<valueLen>[0-9]+) (?P<countedValue>\S+) "
             br"(?P<lblLen>[0-9]+) (?P<lblVarNames>.+)")
    m = re.search(regex, mrDef, re.I | re.L)
    if not m:
        return {}
    # the label takes up the first <lblLen> bytes, the variable names follow
    length = int(m.group("lblLen"))
    lblVarNames = m.group("lblVarNames")
    label, varNames = lblVarNames[:length], lblVarNames[length:].split()
    return {m.group("setName"): {"setType": m.group("setType"),
                                 "firstVarIsLabel": bool(m.group("flag2")),
                                 "label": label,
                                 "countedValue": m.group("countedValue"),
                                 "varNames": varNames}}
@property
@decode
def multRespDefs(self):
    """Get/Set `MRSETS` (multiple response) sets.

    Returns/takes a dictionary that maps each set name to a dictionary
    describing that set:

    * multiple category sets: `{setName: {"setType": "C", "label": lbl,
      "varNames": [<list_of_varNames>]}}`
    * multiple dichotomy sets: `{setName: {"setType": "D", "label": lbl,
      "varNames": [<list_of_varNames>], "countedValue": countedValue}}`
    * extended multiple dichotomy sets: `{setName: {"setType": "E",
      "label": lbl, "varNames": [<list_of_varNames>], "countedValue":
      countedValue, 'firstVarIsLabel': <bool>}}`

    Note. Extended multiple dichotomy sets can be read, but they cannot
    be written.

    For example:

    .. code-block:: python

        categorical = {b"setType": b"C",
                       b"label": b"labelC",
                       b"varNames": [b"salary", b"educ"]}
        dichotomous1 = {b"setType": b"D", b"label": b"labelD",
                        b"varNames": [b"salary", b"educ"],
                        b"countedValue": b"Yes"}
        dichotomous2 = {b"setType": b"D",
                        b"label": b"",
                        b"varNames": [b"salary", b"educ", b"jobcat"],
                        b"countedValue": b"No"}
        extended1 = {b"setType": b"E",
                     b"label": b"",
                     b"varNames": [b"mevar1", b"mevar2", b"mevar3"],
                     b"countedValue": b"1",
                     b"firstVarIsLabel": True}
        extended2 = {b"setType": b"E",
                     b"label": b"Enhanced set with user specified label",
                     b"varNames": [b"mevar4", b"mevar5", b"mevar6"],
                     b"countedValue": b"Yes",
                     b"firstVarIsLabel": False}
        multRespDefs = {b"testSetC": categorical,
                        b"testSetD1": dichotomous1,
                        b"testSetD2": dichotomous2,
                        b"testSetEx1": extended1,
                        b"testSetEx2": extended2}
    """
    # spssGetMultRespDefsEx is used for all set types; the separate call
    # to spssGetMultRespDefs was dropped.
    # TODO: self._getMultRespDefsEx is not tested! Need test data with
    # 'extended' MR definitions.
    getDefs = self.spssio.spssGetMultRespDefsEx
    getDefs.argtypes = [c_int, POINTER(c_char_p)]
    rawDefs = c_char_p()
    retcode = getDefs(self.fh, rawDefs)
    if retcode:
        checkErrsWarns(
            "Problem getting extended multiple response definitions",
            retcode)

    definitions = {}
    if not rawDefs.value:
        return definitions
    # one definition per line; each line is offered to both parsers
    for line in rawDefs.value.split(b"\n"):
        definitions.update(self._getMultRespDef(line))     # 'C' or 'D'
        definitions.update(self._getMultRespDefsEx(line))  # 'E'
    self.freeMemory("spssFreeMultRespDefs", rawDefs)
    return definitions
@multRespDefs.setter
def multRespDefs(self, multRespDefs):
    """Write multiple response definitions; a falsy value is ignored.

    The dictionary is first converted into a byte string with
    self._setMultRespDefs."""
    if multRespDefs:
        definition = self._setMultRespDefs(multRespDefs)
        setDefs = self.spssio.spssSetMultRespDefs
        setDefs.argtypes = [c_int, c_char_p]
        retcode = setDefs(self.fh, c_char_py3k(definition))
        if retcode:
            checkErrsWarns("Problem setting multiple response definitions",
                           retcode)
@property
@decode
def caseWeightVar(self):
    """Get/Set WEIGHT variable.

    The setter takes a valid varName; the getter returns the content of
    the name buffer filled by spssGetCaseWeightVar. Raises SPSSIOError
    if that function returns a positive code."""
    bufferSize = 65
    getWeightVar = self.spssio.spssGetCaseWeightVar
    getWeightVar.argtypes = [c_int, POINTER(c_char * bufferSize)]
    nameBuffer = create_string_buffer(bufferSize)
    retcode = getWeightVar(self.fh, nameBuffer)
    if retcode <= 0:
        # zero and negative codes are not treated as errors here
        return nameBuffer.value
    raise SPSSIOError("Problem getting case weight variable name", retcode)
@caseWeightVar.setter
def caseWeightVar(self, varName):
    """Set the WEIGHT variable to `varName`; a falsy value is ignored."""
    if varName:
        setWeightVar = self.spssio.spssSetCaseWeightVar
        setWeightVar.argtypes = [c_int, c_char_p]
        retcode = setWeightVar(self.fh, c_char_py3k(varName))
        if retcode:
            checkErrsWarns(
                "Problem setting case weight variable name %r" % varName,
                retcode)
@property
@decode
def dateVariables(self):  # pragma: no cover
    # seems to be okay
    """Get/Set `DATE` information. This function reports the Forecasting
    (Trends) date variable information, if any, in IBM SPSS Statistics
    data files. Returns a dictionary {"fixedDateInfo": <first six
    elements>, "otherDateInfo": <remaining elements in groups of three>},
    or an empty dictionary if there are no elements.
    Entirely untested and not implemented in reader/writer"""
    getDates = self.spssio.spssGetDateVariables
    count = c_int()

    # pass 1: call with a zero-length array type to obtain the array size
    emptyArray = c_long * 0
    getDates.argtypes = [c_int, POINTER(c_int),
                         POINTER(POINTER(emptyArray))]
    dateInfoArr = POINTER(emptyArray)()
    retcode = getDates(self.fh, count, dateInfoArr)

    # pass 2: call again with an array type of the reported size
    sizedArray = c_long * count.value
    getDates.argtypes = [c_int, POINTER(c_int),
                         POINTER(POINTER(sizedArray))]
    dateInfoArr = POINTER(sizedArray)()
    retcode = getDates(self.fh, count, dateInfoArr)
    if retcode:
        checkErrsWarns("Problem getting TRENDS information", retcode)

    total = count.value
    if not total:
        return {}
    # copy the values out before the memory is released
    values = [dateInfoArr[0][index] for index in xrange(total)]
    triplets = [values[start: start + 3] for start in xrange(6, total, 3)]
    result = {"fixedDateInfo": values[:6], "otherDateInfo": triplets}
    self.freeMemory("spssFreeDateVariables", dateInfoArr)
    return result
@dateVariables.setter
def dateVariables(self, dateInfo):  # pragma: no cover
    """Set `DATE` (TRENDS) information.

    `dateInfo` is a dictionary {"fixedDateInfo": <list of six ints>,
    "otherDateInfo": <list of three-element lists of ints>}.
    Raises TypeError if the flattened information does not consist of
    six elements plus zero or more groups of three, or if not all
    elements are ints."""
    # always returns 'SPSS_INVALID_DATEINFO'! :-(
    groups = [dateInfo["fixedDateInfo"]] + dateInfo["otherDateInfo"]
    # flatten list (the bare 'reduce' is not a builtin in Python 3)
    flat = [element for group in groups for element in group]
    isAllInts = all(isinstance(d, int) for d in flat)
    # at least the six fixed elements; (0 - 6) % 3 == 0 would otherwise
    # let an empty or three-element list through
    isSixPlusTriplets = len(flat) >= 6 and (len(flat) - 6) % 3 == 0
    # both conditions must hold ('not a and b' only tested the first one)
    if not (isAllInts and isSixPlusTriplets):
        msg = ("TRENDS date info must consist of 6 fixed elements"
               "+ <nCases> three-element groups of other date info "
               "(all ints)")
        raise TypeError(msg)
    nElements = len(flat)
    func = self.spssio.spssSetDateVariables
    func.argtypes = [c_int, c_int, (c_long * nElements)]
    dateInfoArr = (c_long * nElements)(*flat)
    retcode = func(self.fh, nElements, dateInfoArr)
    if retcode:
        checkErrsWarns("Problem setting TRENDS information", retcode)
@property
@decode
def textInfo(self):
    """Get/Set text information.

    The setter takes a savFileName and stores a string of the form
    "File %r built using SavReaderWriter.py version %s (%s)"; the getter
    returns the stored text. This is akin to, but *not* equivalent to
    the SPSS syntax command `DISPLAY DOCUMENTS`"""
    bufferSize = 256
    getTextInfo = self.spssio.spssGetTextInfo
    getTextInfo.argtypes = [c_int, POINTER(c_char * bufferSize)]
    textBuffer = create_string_buffer(bufferSize)
    retcode = getTextInfo(self.fh, textBuffer)
    if retcode:
        checkErrsWarns("Problem getting textInfo", retcode)
    return textBuffer.value
@textInfo.setter
def textInfo(self, savFileName):
    """Store a text that names the file, the savReaderWriter version and
    the current time. At most the first 256 items of the text are passed
    on to the IO module."""
    template = "File '%s' built using savReaderWriter version %s (%s)"
    message = template % (os.path.basename(savFileName), __version__,
                          time.asctime())
    if self.ioUtf8 and isinstance(savFileName, unicode):
        message = message.encode("utf-8")
    setTextInfo = self.spssio.spssSetTextInfo
    setTextInfo.argtypes = [c_int, c_char_p]
    retcode = setTextInfo(self.fh, c_char_py3k(message[:256]))
    if retcode:
        checkErrsWarns("Problem setting textInfo", retcode)
@property
@decode
def fileLabel(self):
    """Get/Set `FILE LABEL` (id string)

    The setter takes a file label; the getter returns the content of the
    buffer filled by spssGetIdString (the file label, if any)."""
    bufferSize = 65
    getIdString = self.spssio.spssGetIdString
    getIdString.argtypes = [c_int, POINTER(c_char * bufferSize)]
    labelBuffer = create_string_buffer(bufferSize)
    retcode = getIdString(self.fh, labelBuffer)
    if retcode:
        checkErrsWarns("Error getting file label (id string)", retcode)
    return labelBuffer.value
@fileLabel.setter
def fileLabel(self, idStr):
    """Set the `FILE LABEL` (id string).

    If `idStr` is None, a default label "File created by user <user> at
    <time>" is used, truncated to 64 characters."""
    if idStr is None:
        idStr = "File created by user %r at %s" % (getpass.getuser(),
                                                   time.asctime())
        # truncate the label itself; slicing the template before the
        # interpolation (as was done before) never shortened anything
        idStr = idStr[:64]
    if self.ioUtf8 and isinstance(idStr, unicode):
        idStr = idStr.encode("utf-8")
    func = self.spssio.spssSetIdString
    func.argtypes = [c_int, c_char_p]
    retcode = func(self.fh, c_char_py3k(idStr))
    if retcode:
        checkErrsWarns("Problem setting file label (id string)", retcode)
@property
def queryType7(self):  # pragma: no cover
    """Report which "type 7" records are present in a file that was opened
    for reading or append. Returns a dictionary of the form:
    `{subtype_number: (subtype_label, present_or_not)}`, where
    present_or_not is a bool"""
    labels = {
        3: "Release information",
        4: "Floating point constants including the system missing value",
        5: "Variable set definitions",
        6: "Date variable information",
        7: "Multiple-response set definitions",
        8: "Data Entry for Windows (DEW) information",
        10: "TextSmart information",
        11: ("Measurement level, column width, and "
             "alignment for each variable"),
    }
    query = self.spssio.spssQueryType7
    query.argtypes = [c_int, c_int, POINTER(c_int)]
    found = {}
    for number in labels:
        flag = c_int()  # filled in by the IO module
        retcode = query(self.fh, number, flag)
        if retcode:
            checkErrsWarns("Problem retrieving type7 info", retcode)
        found[number] = (labels[number], bool(flag.value))
    return found
@property
def dataEntryInfo(self):  # pragma: no cover
    """Get/Set information that is private to the Data Entry for Windows
    (DEW) product. Returns/takes a dictionary of the form:
    dataEntryInfo = {"data": [<list_of_dew_segments>], "GUID": <guid>},
    where GUID stands for 'globally unique identifier'. The getter
    returns the segments as byte strings, and an empty dictionary if the
    file contains no DEW information.
    Some remarks:
    -A difference in the byte order of the host system and the foreign
     host will result in an error (ValueError). Therefore, an optional
     'swapBytes' key may be specified (setter) whose value indicates
     whether the bytes should be swapped (True) or not (False). Default
     is that the byte order of the host system is retained.
    -DEW information is not copied when using mode="cp" in the SavWriter
     initializer
    -THIS IS ENTIRELY UNTESTED!"""
    # check if file and host system byte order match
    # spssGetDEWInfo will return SPSS_NO_DEW, which is less desirable
    endianness = self.releaseInfo["big/little-endian code"]
    file_byte_order = 'little' if endianness == 0 else 'big'
    if file_byte_order != sys.byteorder:
        msg = "Host (%s-endian) and file (%s-endian) byte order differ"
        raise ValueError(msg % (sys.byteorder, file_byte_order))

    # retrieve length of DEW information (in bytes)
    func = self.spssio.spssGetDEWInfo
    func.argtypes = [c_int, POINTER(c_long), POINTER(c_long)]
    pLength, pHashTotal = c_long(), c_long()
    retcode = func(self.fh, pLength, pHashTotal)
    maxData = pLength.value  # Maximum bytes to return
    if not maxData:
        return {}  # file contains no DEW info

    # defined up front, so the return statement below always works
    dew_information = []
    asciiGUID = create_string_buffer(257)

    # The IO module copies each segment into a caller-supplied buffer.
    # That buffer must be able to hold <maxData> bytes: passing the
    # address of a single c_void_p (as was done before) overflows it.
    # NOTE(review): signature taken from the I/O module documentation
    # (void *pData, long maxData, long *nData) -- confirm.
    segment = create_string_buffer(maxData)
    nData = c_long()
    segment_argtypes = [c_int, c_void_p, c_long, POINTER(c_long)]

    # retrieve first segment of DEW information
    if not retcode:
        func = self.spssio.spssGetDEWFirst
        func.argtypes = segment_argtypes
        retcode = func(self.fh, segment, maxData, nData)
        if not retcode:
            dew_information.append(segment.raw[:nData.value])

    # retrieve subsequent segments, until all bytes have been read
    if not retcode:
        func = self.spssio.spssGetDEWNext
        func.argtypes = segment_argtypes
        nBytesRead = nData.value
        while nBytesRead < maxData:
            nData = c_long()
            retcode = func(self.fh, segment, maxData, nData)
            if retcode > 0 or not nData.value:
                break
            dew_information.append(segment.raw[:nData.value])
            nBytesRead += nData.value

    # retrieve GUID information
    if not retcode:
        func = self.spssio.spssGetDEWGUID
        retcode = func(c_int(self.fh), byref(asciiGUID))
    if retcode:
        msg = "Problem getting Data Entry info with function %r"
        checkErrsWarns(msg % func.__name__, retcode)
    return dict(data=dew_information, GUID=asciiGUID.value)
@dataEntryInfo.setter
def dataEntryInfo(self, info):  # pragma: no cover
    """Set Data Entry for Windows (DEW) information.

    `info` is a dictionary {"data": [<list_of_dew_segments>],
    "GUID": <guid>}, with an optional "swapBytes" item (bool).
    Raises ValueError if the GUID is not a string of ascii characters.
    THIS IS ENTIRELY UNTESTED!"""
    data, asciiGUID = info["data"], info["GUID"]

    # input validation: the GUID must be a string AND ascii-only.
    # Iterating over bytes yields ints in Python 3, one-character
    # strings otherwise.
    is_string = isinstance(asciiGUID, (bytes, str))
    if not is_string or any((c if isinstance(c, int) else ord(c)) > 127
                            for c in asciiGUID):
        raise ValueError("GUID must be a string of ascii characters")

    # I am not sure at all about the following
    swapit = bool(info.get("swapBytes"))

    def swap(x):
        """swap bytes if needed"""
        if isinstance(x, (bytes, str)):
            # A sequence of single bytes has no byte order. Besides, the
            # struct format 's' without a count packs one byte only.
            return x
        if isinstance(x, int):
            code = "l"
        elif isinstance(x, float):
            code = "d"  # 'l' cannot pack a float
        else:
            msg = "Must be str, int or float, not %s"
            raise TypeError(msg % type(x).__name__)
        src = "<" if sys.byteorder == "little" else ">"
        dst = ">" if src == "<" else "<"
        return struct.unpack(dst + code, struct.pack(src + code, x))[0]

    if swapit:
        data, asciiGUID = [swap(item) for item in data], swap(asciiGUID)

    # write DEW information
    retcode = 0  # stays zero if there are no segments to write
    for i, pData in enumerate(data):
        nBytes = len(pData)
        # NOTE(review): assumes that segments are byte strings. They are
        # copied into a ctypes buffer, because c_void_p cannot be created
        # from a byte string.
        segment = create_string_buffer(pData, nBytes)
        args = c_int(self.fh), byref(segment), c_long(nBytes)
        # ... first segment
        if not i:
            func = self.spssio.spssSetDEWFirst
            retcode = func(*args)
        # ... subsequent segments
        else:
            func = self.spssio.spssSetDEWNext
            retcode = func(*args)
        if retcode > 0:
            break

    # write GUID information
    if not retcode:
        args = self.fh, c_char_py3k(asciiGUID)
        func = self.spssio.spssSetDEWGUID
        func.argtypes = [c_int, c_char_p]
        retcode = func(*args)
    if retcode:
        msg = "Problem setting Data Entry info with function %r"
        checkErrsWarns(msg % func.__name__, retcode)