"""
:class:`CRAReader`
------------------
.. autoclass:: CRAReader
:class:`CRA2009`
----------------
.. autoclass:: CRA2009
:show-inheritance:
:class:`CofogMapper`
--------------------
.. autoclass:: CofogMapper
"""
import os, sys, csv, re
from datetime import date, datetime
try: import json
except ImportError: import simplejson as json
import datapkg, datapkg.util
import pkg_resources
from ordf.command import Command
#from wdmmg.vocab.cra import ProgrammeObjectGroups, ExpenditureTypes
#from wdmmg.vocab.cra import CountryAndRegionalAnalysis
#from wdmmg.vocab.entity import Entities
from unstats.cofog import Functions
from eurostat.nuts import Regions, Region
from ukgov.reference.time import GovYear
import ukgov.finance.currency
from ukgov.treasury.pog import ProgrammeObjectGroups
from ukgov.treasury.cra.vocab import Entity, ExpenditureType
#from ukgov.treasury.cra import CountryAndRegionalAnalysis, ProgrammeObjectGroups, ExpenditureTypes
from ordf.vocab.sdmx import DataSet, DataStructureDefinition, Observation, TimeSeries
from ordf.vocab.void import Dataset as VoidDataset
from ordf.graph import ConjunctiveGraph, Graph, ReadOnlyGraphAggregate
from ordf.namespace import Namespace
from ordf.namespace import DC, RDF, RDFS, FOAF, OWL, SKOS, SDMX, TIME, \
SDMX, SDMXDIM, SDMXATTR, SDMXCODE, \
GOVYEAR, UKCUR, \
ENT, CRA, CRADSD, CRADATA, NUTS, NUTSS, POG, COFOG
from ordf.term import BNode, Literal, URIRef
from ordf.collection import Collection
from telescope import Select, v
from ordf.handler import init_handler
log = __import__("logging").getLogger(__name__)
[docs]class CofogMapper(object):
'''
Constructs a COFOG mapper from a mappings object (which is usually loaded
from a JSON file).
In the published data, the "function" and "subfunction" columns are used
inconsistently. This is partly because some departments continue to use a
previous coding system, and partly because only two columns have been
allowed for the three levels of the COFOG hierarchy.
This class uses a mapping provided by William Waites to work out the
correct COFOG code, given the published data.
:param mappings:
a list of triples. In each triple, the first element is the good
code, and the second and third elements give the published values.
If the first element (the good code) contains non-numerical suffix,
it will be removed.
.. automethod:: fix
'''
def __init__(self, mappings):
'''
'''
self.mappings = {}
for good, bad1, bad2 in mappings:
good = re.match(r'([0-9]+(\.[0-9])*)', good).group(1)
self.mappings[bad1, bad2] = good
[docs] def fix(self, function, subfunction):
'''
Looks up the fixed COFOG code given the published values.
Returns a list giving all available COFOG levels, e.g.
`[u'01', u'01.1', u'01.1.1']`
Returns an empty list if no COFOG mapping has been
defined.
:param function:
function as expressed by HMT
:param subfunction:
subfunction as expressed by HMT
'''
ans = self.mappings.get((function, subfunction))
if ans is None:
return []
parts = ans.split('.')
return ['.'.join(parts[:i+1]) for i, _ in enumerate(parts)]
[docs]class CRAReader(object):
"""
A specialised CSV reader for the CRA that takes a datapkg dataset
and datafile and yields dictionaries for each row with the values
cleaned. Once initialised, this class is an iterable.
At the time of writing this requires the current version of
datapkg from the mercurial repository. The package metadata is
included in this python module. You can see the available datasets
by doing::
% datapkg list egg://ukgov_treasury_cra
cra2009 -- Country and Regional Analysis 2009 - CSV
Example usage of this class:
.. code-block:: python
from pprint import pprint
reader = CRAReader(cache_dir, "cra2009", "cra_2009_db.csv")
for row in reader:
pprint(row)
...
output:
.. code-block:: python
{'body_type': u'CG',
'cap_or_cur': u'CUR',
'cofog_parts': [u'04', u'04.1', u'04.1.2'],
'dept_code': u'Dept032',
'dept_name': u'Department for Work and Pensions',
'expenditures': [(u'2003-2004', 0.0),
(u'2004-2005', 0.0),
(u'2005-2006', 0.0),
(u'2006-2007', 0.0),
(u'2007-2008', 0.0),
(u'2008-2009', 12100000.0),
(u'2009-2010', 12100000.0),
(u'2010-2011', 12100000.0)],
'pog_code': u'P37 S121211',
'pog_name': u'ADMIN COSTS OF MEASURES TO HELP UNEMPL PEOPLE MOVE FROM WELFARE T...',
'region': u'SCOTLAND'}
:param cache_dir:
Cache directory where the downloaded data will live. A datapkg
filesystem index is created here
:param dataset:
The name of the dataset to be downloaded, e.g. "cra2009"
:param datafile:
The filename of the downloaded data within the dataset, e.g.
"cra_2009_db.csv"
:param year_col_start:
The column in the datafile where the actual expenditures begin,
after the metadata columns.
.. automethod:: getdata
.. automethod:: header
.. automethod:: cleandata
.. automethod:: make_cofog_mapper
.. automethod:: get_department
.. automethod:: get_cofog_parts
.. automethod:: get_pog
.. automethod:: get_cap_or_cur
.. automethod:: get_body_type
.. automethod:: get_region
.. automethod:: get_expenditures
"""
# the name of the present python package, which is used as a datapkg
# egg index for looking up the source of the data.
index_name = "ukgov_treasury_cra"
def __init__(self, cache_dir, dataset, datafile, year_col_start=10):
self.cache_dir = cache_dir
self.dataset = dataset
self.datafile = datafile
self.year_col_start = year_col_start
self.cofog_mapper = self.make_cofog_mapper()
[docs] def make_cofog_mapper(self):
"""
Create a COFOG mapper used in cleaning the data. This is a method
to provide for easy subclassing where a different mapper is required.
"""
# Make a CofogMapper.
fp = pkg_resources.resource_stream(__name__, os.path.join("data", "cofog_map.json"))
mappings = json.load(fp)
fp.close()
return CofogMapper(mappings)
def __iter__(self):
# Get the CRA data package.
source = "egg://%s" % os.path.join(self.index_name, self.dataset)
cached = 'file://%s' % os.path.join(self.cache_dir, self.index_name, self.dataset)
fp = self.getdata(source, cached, self.datafile)
reader = csv.reader(fp)
header = reader.next()
self.header(header)
for row in reader:
if not row[0]:
continue # Skip blank row.
yield self.cleandata(row)
fp.close()
[docs] def getdata(self, source, cached, datafile):
"""
Returns an open file handle pointing at the data file
that we want to read. This copies perhaps too much
from datapkg's "install" command implementation and
should probably be part of the datapkg API.
:param source:
source index and dataset specification, e.g.
*"egg://ukgov_treasury_cra/cra2009"*
:param cached:
destination to cache the data, also index and
dataset specification, e.g.
*"file:///tmp/ukgov_treasury_cra/2009"*
:param datafile:
filename within the dataset to open. e.g.
*"cra_2009_db.csv"*
"""
# slightly inefficient to initialise all of these here, but
# it makes the code more readable even if they're not always
# used
cached_spec = datapkg.spec.Spec.parse_spec(cached)
cached_index, dataset = cached_spec.index_from_spec()
source_spec = datapkg.spec.Spec.parse_spec(source)
source_index, dataset = source_spec.index_from_spec()
# first step - try to pull the package from datapkg. if it
# doesn't exist we'll get either a DatapkgException or an
# I/O error here
try:
pkg = datapkg.load_package(cached)
except Exception, e:
if not isinstance(e, OSError) and not isinstance(e, datapkg.DatapkgException):
raise # better syntax for this?
pkg = source_index.get(dataset)
cached_index.register(pkg)
pkg = cached_index.get(dataset)
# second step, open our file handle. if this fails with an
# I/O error it means the file hasn't been downloaded yet
# (or the disc has bad sectors)
try:
fp = pkg.stream(datafile)
except IOError:
install_path = os.path.join(cached_index.index_path, pkg.name)
log.info("downloading %s to %s" % (dataset, install_path))
downloader = datapkg.util.Downloader(install_path)
downloader.download(pkg.download_url)
fp = pkg.stream(datafile)
return fp
[docs] def cleandata(self, row):
"""
Cleans and normalises an input row, turns it into a dictionary.
This uses the various :meth:`get_*` methods to extract individual
parts for easy subclassing. This method starts out with an
empty collector dictionary and looks for methods with names that begin
with *get_* and calls them with the row as an argument. The
return value of these functions is expected to be a dictionary
that is used to update the collector.
:returns:
A dictionary representing the data in the row.
"""
data = {}
row = [unicode(x.strip()) for x in row]
for attr in dir(self):
if not attr.startswith("get_"):
continue
data.update(getattr(self, attr)(row))
return data
[docs] def get_department(self, row):
"""
Extracts the department code and name from the first two columns
of the row.
:returns:
A dictionary with keys *"dept_code"* and *"dept_name"*
"""
return {"dept_code": row[0], "dept_name": row[1]}
[docs] def get_cofog_parts(self, row):
"""
Extracts the COFOG from the function and subfunction fields,
columns 2 and 3 in the 2009 data. Passes them through the
:class:`CofogMapper`.
:returns:
A dictionary with the key *"cofog_parts"* and value a list
ordered from least to most specific classification.
"""
function = row[2]
subfunction = row[3]
# Map 'function' and 'subfunction' to three levels of COFOG.
cofog_parts = self.cofog_mapper.fix(function, subfunction)
assert cofog_parts, 'COFOG code is missing for (%s, %s)' % (function, subfunction)
assert len(cofog_parts) <= 3, 'COFOG code %r has too many levels' % cofog_parts
assert len(cofog_parts) > 0, 'COFOG code not found for %s' % (function, subfunction)
return {"cofog_parts": cofog_parts}
[docs] def get_pog(self, row):
"""
Extracts the Programme Object Group from the columns 4 and 5.
:returns:
A dictionary with keys *"pog_code"* and *"pog_name"*
"""
pog_code = row[4]
pog_name = row[5]
if pog_name != pog_code and pog_name.startswith(pog_code):
pog_name = pog_name[len(pog_code):].lstrip(" ")
return {"pog_code": pog_code, "pog_name": pog_name}
[docs] def get_cap_or_cur(self, row):
"""
Extracts the flag indicating capital or current expenditure from
column 7.
:returns:
A dictionary with the key *"cap_or_cur"* and the value either
*"CAP"* or *"CUR"* as applicable.
"""
cap_or_cur = row[7]
assert cap_or_cur in ("CAP", "CUR"), "Unknown value for cap_or_cur: %s" % cap_or_cur
return {"cap_or_cur": cap_or_cur}
[docs] def get_body_type(self, row):
"""
Extracts the type of reporting entity from column 8.
:return:
A dictionary with the key *"body_type"* and the value can be
one of:
* *"CG"* -- Central Government
* *"LA"* -- Local Authority
* *"PC"* -- Public Corporation
"""
body_type = row[8]
assert body_type in ("CG", "LA", "PC"), "Unknown body type: %s" % body_type
return {"body_type": body_type}
[docs] def get_region(self, row):
"""
Extract the region from column 9 of the row.
:returns:
A dictionary with the key *"region"*.
"""
return {"region": row[9]}
[docs] def get_expenditures(self, row):
"""
Extract the yearly expenditures from the row. This makes use of
the :attr:`year_col_start` class attribute to determine at which
column to begin.
:returns:
A dictionary with the key "expenditures and value a list of
two-tuples representing fiscal year and the reported expenditure
value
"""
# Utility function for parsing numbers.
def to_float(s):
if not s: return 0.0
return float(s.replace(',', ''))
expenditures = [round(1e6*to_float(x)) for x in row[self.year_col_start:]]
return {"expenditures": zip(self.years, expenditures)}
[docs]class CRA2009(Command):
"""
This class implements the :ref:`cra2009` command that loads the CSV
file from the treasury and transforms it to RDF, placing the results
in a triplestore managed by :mod:`ordf.handler.rdf`. See the documentation
in the command-line utilities section of the manual for usage instructions.
.. attribute:: edition
This class attribute may be changed by subclasses implementing a command
for other editions of the CRA data. The value here is the string "2009".
This is used both to derive the dataset and datafile that will be used
as well as to set the RDF namespace for the generated data. If the 2010
data is made available in a file called 'cra_2010_db.csv' and has exactly
the same form as the 2009 data, then creating a command for manipulating
the new edition should be as simple as:
.. code-block:: python
class CRA2010(CRA2009):
edition = "2010
and adding the appropriate entry points and 'datapkg_sources' to the
'setup.py' file.
.. attribute:: store
An RDFLib compatible store, gleaned from the :class:`ordf.handler.Handler`
that will be used for saving the data.
.. attribute:: entries
This property is created by :meth:`create_sdmx_dataset` and contains
the python object representing the sdmx:DataSet.
.. attribute:: components
This property is also created by :meth:`create_sdmx_dataset`. It
is a list of the types for each dimension that should be included when
constructing the URI for timeseries.
.. automethod:: load_schema
.. automethod:: create_sdmx_dataset
.. automethod:: create_sdmx_timeseries
"""
parser = Command.StandardParser()
parser.add_option("-s", "--schema",
dest="load_schema",
action="store_true",
default=False,
help="Also load the schema")
edition = "2009"
def command(self):
if hasattr(self.handler, "fourstore"):
self.store = self.handler.fourstore.store
elif hasattr(self.handler, "rdflib"):
self.store = self.handler.rdflib.store
else:
raise AttributeError("Require either RDFLib of 4store configured")
if self.options.load_schema:
self.load_schema()
# initialise the classes and concept schemes that we will
# use to construct the entries dataset
self.govyear = GovYear(Graph(self.store, identifier=GOVYEAR[""]), load=True)
self.entities = Entity(BNode(),
factoryGraph=Graph(self.store, identifier=ENT[""]),
factoryClass=Entity)
self.regions = Regions(factoryGraph=Graph(self.store, identifier=NUTS[""]))
self.functions = Functions(factoryGraph=Graph(self.store, identifier=COFOG[""]))
self.pogs = ProgrammeObjectGroups(factoryGraph=Graph(self.store, identifier=POG[""]))
self.exps = ExpenditureType()
# don't need to specify the graph for these ones because we make a
# new graph for each time series/row
self.obs = Observation()
self.timeseries = TimeSeries()
# this is used to flag an observation as provisional (in the year
# of publication of the CRA) or as future
self.current_year = datetime(2009, 4, 1, 0, 0, 0)
# create the container for the timeseries in this dataset
self.create_sdmx_dataset()
dataset = "cra%s" % self.edition
datafile = "cra_%s_db.csv" % self.edition
reader = CRAReader(self.config["cache_dir"], dataset, datafile)
for row in reader:
self.create_sdmx_timeseries(row)
break
print ConjunctiveGraph(self.store).serialize(format="n3")
for x in self.store.contexts():
print x
[docs] def load_schema(self):
"""
Load the CRA schema and supporting information into the RDF
datastore
"""
def _load(func):
for s in func():
log.info("loading %s" % s.identifier)
g = Graph(self.store, identifier=s.identifier)
g += s
from ukgov.treasury.cra import vocab as cra
from ukgov.treasury import pog
from unstats import cofog
from eurostat import nuts
_load(cra.rdf_data)
_load(pog.rdf_data)
_load(nuts.rdf_data)
_load(cofog.rdf_data)
[docs] def create_sdmx_dataset(self):
"""
Define the structure for the entries dataset, this is at the
most granular level reflecting what data is directly in the
cra CSV file. The result of calling this function is to create
the following graphs in the RDF store:
* a *sdmx:DataSet*
* a *sdmx:DataStructureDefinition* describing the dimensions and
attributes of the data in the daset
This method sets the :attr:`entries` and :attr:`components`
properties on this class.
"""
dsd = DataStructureDefinition()
entriesdsd = dsd.get(CRADSD["%s/entries" % self.edition])
entriesdsd.type = dsd.identifier
entriesdsd.component = SDMXDIM.refPeriod
entriesdsd.component = SDMXATTR.unitMeasure
entriesdsd.component = CRA.entity
entriesdsd.component = CRA.region
entriesdsd.component = CRA.function
entriesdsd.component = CRA.pog
entriesdsd.component = CRA.expenditureType
entriesdsd.component = CRA.obsStatus
# collections not handled very prettily at the moment
_head = BNode()
entriesdsd.graph.add((entriesdsd.identifier, SDMX.componentOrder, _head))
order = Collection(entriesdsd.graph, _head)
order.append(CRA.entity)
order.append(CRA.region)
order.append(CRA.function)
order.append(CRA.pog)
order.append(SDMXDIM.refPeriod)
# so needed to circumvent *delete* that the collection does, save
# now to store fully constructed
_dsd = Graph(self.store, identifier=entriesdsd.identifier)
_dsd += entriesdsd.graph
# create the SDMX dataset containing the entries, referencing the
# structure
dataset = DataSet()
entries_ident = CRADATA["%s/entries" % self.edition]
self.entries = dataset.get(entries_ident, Graph(self.store, identifier=entries_ident))
self.entries.label = Literal("UK Country and Regional Analysis, %s Edition, Entries" % self.edition,
lang="en")
self.entries.structure = entriesdsd
# link it to the top dataset
crads_ident = CRADATA[self.edition]
crads = VoidDataset().get(crads_ident, Graph(self.store, identifier=crads_ident))
crads.subset = self.entries
# extract the key order for constructing the urls for individual
# timeseries later
self.components = []
for component in Collection(entriesdsd.graph, order):
q = Select([v.range], distinct=True).where(
(component, RDFS.range, v.range)
)
self.components.extend(ConjunctiveGraph(self.store).query(q.compile()))
[docs] def create_sdmx_timeseries(self, row):
"""
Given a row in the form of a dictionary from :class:`CRAReader`, create
a graph containing an *sdmx:TimeSeries* in the store.
"""
from pprint import pprint
#pprint(row)
def cra2009():
cmd = CRA2009()
cmd.command()
def drop():
'''
Drops from the database all records associated with slice 'cra'.
XXX this will be hard to duplicate in the RDF version
'''
def load_file(fileobj, cofog_mapper):
'''
Loads a file from `fileobj` into the RDF store.
The file should be CSV formatted, with the same structure as the
Country Regional Analysis data.
fileobj - an open, readable file-like object.
'''
# get a hold of the RDFLib store, we bypass most of the ORDF
# machinery here
handler = init_handler(config)
if hasattr(handler, "fourstore"):
store = handler.fourstore.store
else:
store = handler.rdflib.store
# used for simple unoptimised queries over the entire store
defaultgraph = ConjunctiveGraph(store)
# For each line of the file...
reader = csv.reader(fileobj)
header = reader.next()
year_col_start = 10
years = [to_year(x) for x in header[year_col_start:]]
# read through the input and construct the timeseries
for row in reader:
#
# ROW PARSED - NOW CONSTRUCT THE RDF
#
print dept_name, cofog_parts[-1], pog_code, expenditures
# get handles for each of the dimensions in the row
_region = regions.byName(region)
if _region is None:
raise KeyError("Missing Region: %s" % region)
_function = functions.byFog(cofog_parts[-1])
if _function is None:
raise KeyError("Missing COFOG: %s" % cofog_parts)
_ent = entities.byName(dept_name)
if _ent is None:
raise KeyError("Missing Entity: %s" % dept_name)
# pog will never be none since they are created
# on the fly as needed
_pog = pogs.setdefault(pog_code, pog_name)
# exp will raise a key error itself if cap_or_cur is invalid
_exp = exps.byKey(cap_or_cur)
# given the dimensions, calculate the path elements for the URI
# of the series from our component list in the DSD
def series_path(components):
path = []
for component in components:
if component in _ent.cached_types:
path.append(_ent.notation[0])
elif component in _region.cached_types:
path.append(_region.notation[0])
elif component in _function.cached_types:
path.append(_function.notation[0])
elif component in _pog.cached_types:
path.append(_pog.notation[0])
elif component in _exp.cached_types:
path.append(_exp.notation[0])
return "/".join(path).replace(" ", "")
# make the time series, or slice
series_id = URIRef(CRADATA["2009/entries/"] + series_path(components))
series_ns = Namespace(series_id + "#")
series = timeseries.get(identifier=series_id, graph=Graph(store, identifier=series_id))
# add the dimensions present in the data, theoretically these
# could go in a flattened way on the observation itself below,
# bhtat that wouldn't be so efficient and we can always do that
# later
series.addDimension(CRA.region, _region.identifier)
series.addDimension(CRA.entity, _ent.identifier)
series.addDimension(CRA.pog, _pog.identifier)
[series.addDimension(CRA.function, COFOG[x]) for x in cofog_parts if x]
series.addDimension(CRA.expenditureType, _exp.identifier)
series.graph.add((series.identifier, SDMX.dataSet, CRADATA["2009/entries"]))
# special flag just in case
if function.find("of which") >=0 or subfunction.find("of which") >= 0:
series.graph.add((series.identifier, CRA.owf, CRA.OfWhich))
print "Time Series:", series.identifier
for yearSpec, amount in zip(years, expenditures):
if amount == 0.0:
continue
datum = obs.get(identifier = series_ns[yearSpec], graph=series.graph)
datum.addValue(Literal(amount), UKCUR.GBP)
year = govyear.get(yearSpec)
if year.start == current_year:
datum.addDimension(SDMXATTR.obsStatus, SDMXCODE["obsStatus-P"]) # provisional
elif year.end > current_year:
datum.addDimension(SDMXATTR.obsStatus, SDMXCODE["obsStatus-F"]) # future
datum.addDimension(SDMXDIM.refPeriod, year.identifier)
series.observation = datum
print "\tDatum:", datum.identifier
entries.series = series
# testing output
# entries.serialize(series.graph)
# _region.serialize(series.graph)
# _pog.serialize(series.graph)
# _exp.serialize(series.graph)
#
# print series.graph.serialize(format="n3")
# break