Source code for abcpy.backends

from abc import ABCMeta, abstractmethod

class Backend(metaclass=ABCMeta):
    """
    This is the base class for every parallelization backend. It essentially
    resembles the map/reduce API from Spark.

    An idea for the future is to implement an MPI version of the backend with
    the hope to be more compliant with standard HPC infrastructure and a
    potential speed-up.
    """

    @abstractmethod
    def parallelize(self, list):
        """
        This method distributes the list on the available workers and returns
        a reference object.

        The list should be split into as many parts as there are workers, and
        each part should then be sent to a separate worker node.

        Parameters
        ----------
        list: Python list
            the list that should get distributed on the worker nodes

        Returns
        -------
        PDS class (parallel data set)
            A reference object that represents the parallelized list
        """
        raise NotImplementedError

    @abstractmethod
    def broadcast(self, object):
        """
        Send an object to all worker nodes without splitting it up.

        Parameters
        ----------
        object: Python object
            An arbitrary object that should be available on all workers

        Returns
        -------
        BDS class (broadcast data set)
            A reference to the broadcasted object
        """
        raise NotImplementedError

    @abstractmethod
    def map(self, func, pds):
        """
        A distributed implementation of map that works on parallel data sets
        (PDS). The function func is called on every element of pds.

        Parameters
        ----------
        func: Python func
            A function that can be applied to every element of the pds
        pds: PDS class
            A parallel data set to which func should be applied

        Returns
        -------
        PDS class
            a new parallel data set that contains the result of the map
        """
        raise NotImplementedError

    @abstractmethod
    def collect(self, pds):
        """
        Gather the pds from all the workers, send it to the master and return
        it as a standard Python list.

        Parameters
        ----------
        pds: PDS class
            a parallel data set

        Returns
        -------
        Python list
            all elements of pds as a list
        """
        raise NotImplementedError
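
# Illustrative sketch (an assumption, not part of abcpy): the MPI backend
# mentioned in the Backend docstring would be just another subclass that
# implements the same four abstract methods. The name BackendMPISketch and
# the placeholder bodies below only indicate each method's responsibility.
class BackendMPISketch(Backend):
    def parallelize(self, list):
        ...  # scatter the list across the MPI ranks and return a PDS

    def broadcast(self, object):
        ...  # broadcast the object to all ranks and return a BDS

    def map(self, func, pds):
        ...  # apply func to each rank's partition and return a new PDS

    def collect(self, pds):
        ...  # gather all partitions on the root rank into a Python list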

class PDS(metaclass=ABCMeta):
    """
    The reference class for parallel data sets (PDS).
    """

    @abstractmethod
    def __init__(self):
        raise NotImplementedError


class BDS(metaclass=ABCMeta):
    """
    The reference class for broadcast data sets (BDS).
    """

    @abstractmethod
    def __init__(self):
        raise NotImplementedError

    @abstractmethod
    def value(self):
        """
        This method should return the actual object that the broadcast data
        set represents.
        """
        raise NotImplementedError

class BackendDummy(Backend):
    """
    This is a dummy parallelization backend, meaning it doesn't parallelize
    anything. It is mainly implemented for testing purposes.
    """

    def __init__(self):
        pass

    def parallelize(self, python_list):
        """
        This actually does nothing: it just wraps the Python list into a
        dummy pds (PDSDummy).

        Parameters
        ----------
        python_list: Python list

        Returns
        -------
        PDSDummy (parallel data set)
        """
        return PDSDummy(python_list)

    def broadcast(self, object):
        """
        This actually does nothing: it just wraps the object into a BDSDummy.

        Parameters
        ----------
        object: Python object

        Returns
        -------
        BDSDummy class
        """
        return BDSDummy(object)

    def map(self, func, pds):
        """
        This is a wrapper for Python's built-in map function.

        Parameters
        ----------
        func: Python func
            A function that can be applied to every element of the pds
        pds: PDSDummy class
            A pseudo-parallel data set to which func should be applied

        Returns
        -------
        PDSDummy class
            a new pseudo-parallel data set that contains the result of the map
        """
        result_map = map(func, pds.python_list)
        result_pds = PDSDummy(list(result_map))
        return result_pds

    def collect(self, pds):
        """
        Returns the Python list stored in a PDSDummy.

        Parameters
        ----------
        pds: PDSDummy class
            a pseudo-parallel data set

        Returns
        -------
        Python list
            all elements of pds as a list
        """
        return pds.python_list

class PDSDummy(PDS):
    """
    This is a wrapper for a Python list to fake parallelization.
    """

    def __init__(self, python_list):
        self.python_list = python_list


class BDSDummy(BDS):
    """
    This is a wrapper for a Python object to fake parallelization.
    """

    def __init__(self, object):
        self.object = object

    def value(self):
        return self.object
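
# Usage sketch: the dummy backend runs the whole parallelize/broadcast/map/
# collect workflow sequentially in the driver process, which makes it handy
# for tests and debugging. The values below are illustrative.
if __name__ == "__main__":
    backend = BackendDummy()
    offset_bds = backend.broadcast(10)
    pds = backend.parallelize([1, 2, 3, 4])
    shifted = backend.map(lambda x: x + offset_bds.value(), pds)
    print(backend.collect(shifted))  # [11, 12, 13, 14]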

class BackendSpark(Backend):
    """
    A parallelization backend for Apache Spark. It is essentially a wrapper
    for the required Spark functionality.
    """

    def __init__(self, sparkContext, parallelism=4):
        """
        Initialize the backend with an existing and configured SparkContext.

        Parameters
        ----------
        sparkContext: pyspark.SparkContext
            an existing and fully configured PySpark context
        parallelism: int
            defines on how many workers a distributed dataset can be
            distributed
        """
        self.sc = sparkContext
        self.parallelism = parallelism

    def parallelize(self, python_list):
        """
        This is a wrapper of pyspark.SparkContext.parallelize().

        Parameters
        ----------
        python_list: Python list
            list that is distributed on the workers

        Returns
        -------
        PDSSpark class (parallel data set)
            A reference object that represents the parallelized list
        """
        rdd = self.sc.parallelize(python_list, self.parallelism)
        pds = PDSSpark(rdd)
        return pds

    def broadcast(self, object):
        """
        This is a wrapper for pyspark.SparkContext.broadcast().

        Parameters
        ----------
        object: Python object
            An arbitrary object that should be available on all workers

        Returns
        -------
        BDSSpark class (broadcast data set)
            A reference to the broadcasted object
        """
        bcv = self.sc.broadcast(object)
        bds = BDSSpark(bcv)
        return bds

    def map(self, func, pds):
        """
        This is a wrapper for pyspark.rdd.map().

        Parameters
        ----------
        func: Python func
            A function that can be applied to every element of the pds
        pds: PDSSpark class
            A parallel data set to which func should be applied

        Returns
        -------
        PDSSpark class
            a new parallel data set that contains the result of the map
        """
        rdd = pds.rdd.map(func)
        new_pds = PDSSpark(rdd)
        return new_pds

    def collect(self, pds):
        """
        A wrapper for pyspark.rdd.collect().

        Parameters
        ----------
        pds: PDSSpark class
            a parallel data set

        Returns
        -------
        Python list
            all elements of pds as a list
        """
        python_list = pds.rdd.collect()
        return python_list

class PDSSpark(PDS):
    """
    This is a wrapper for Apache Spark RDDs.
    """

    def __init__(self, rdd):
        """
        Parameters
        ----------
        rdd: pyspark.rdd
            initialize with a Spark RDD
        """
        self.rdd = rdd


class BDSSpark(BDS):
    """
    This is a wrapper for Apache Spark broadcast variables.
    """

    def __init__(self, bcv):
        """
        Parameters
        ----------
        bcv: pyspark.broadcast.Broadcast
            Initialize with a Spark broadcast variable
        """
        self.bcv = bcv

    def value(self):
        """
        Returns
        -------
        object
            returns the referenced object that was broadcasted
        """
        return self.bcv.value
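
# Usage sketch for the Spark backend (an assumption: it presumes pyspark is
# installed and a local Spark master is available; "local[2]" and the app
# name are illustrative choices, not requirements of this module).
if __name__ == "__main__":
    import pyspark

    sc = pyspark.SparkContext("local[2]", "abcpy-backend-demo")
    backend = BackendSpark(sc, parallelism=2)
    pds = backend.parallelize([1, 2, 3, 4])
    doubled = backend.map(lambda x: 2 * x, pds)
    print(backend.collect(doubled))  # [2, 4, 6, 8]
    sc.stop()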