Full Examples

This page contains three full example scripts that use the bops module, plus the script that generates their test data.

  • A word count example
  • Data generation script
  • MapReduce script using sample data
  • A comparison of the ‘mapreduce’ and ‘mapreducebatch’ functions

Mailing List

A mailing list has been created to support the use of this module. You can join and follow the discussion on Google Groups. Errors, issues and enhancements can all be discussed there.

Bops aims to be a top-notch data analysis module, but it can only become great with your help. Please chime in on the discussion. Your input is welcome, as are any suggested features, patches or fixes.

Word Count

Word counting is the Hello World! of MapReduce. I’ve included an example using bops. The example finds the top 5 most frequent words in the paragraph, and also counts how many words start with each letter, listed from most to least frequent.

Four steps take place:

  • Initialize data in a ‘bop’ instance
  • Write ‘mapper’ function
  • Run map reduce job using ‘mapper’ function and Python’s built-in ‘sum’ function
  • Sort results by frequency and grab Top 5

from bops import bop
import re

text = """
bops stands for boolean array operations.
bops uses numpy to do boolean operations on numpy.ndarrays.
This module is meant to simplify boolean operations on lists and arrays.
This module also allows for combining boolean arrays to get the logical AND,
as well as the OR, for multiple boolean arrays.
This functionality allows for faster data filtering on multiple aspects of the data.
"""

# remove punctuation
text = re.sub(r'\.|,|!|:|;', '', text)

# split on any whitespace (spaces and newlines) and lowercase each word
w = [(a.lower(),) for a in text.split()]

# create bop instance
word_count = bop(w, 'word')

#map function for finding unique words
def unique_words_mapper(row):
	return row.word, 1

#run map reduce job, using sum as the reducer
word_count_results = word_count.mapreduce(unique_words_mapper, sum, expand=True)

# top 5 words
print "\nTop 5 words: "
for i, (w, c) in enumerate(sorted(word_count_results, key=lambda kv: -kv[1])[:5]):
	print str(i+1)+". " +str(c)+" - "+w
print

#loop through all results, sorted on count in descending order (higher values first)
# for word, count in sorted(word_count_results, key=lambda kv: -kv[1]):
	# print word, count

#mapping on first letter
def startswith_mapper(row):
	return row.word[0], row

# group words on the letter they start with
startwith_results = word_count.mapreduce(startswith_mapper, len, expand=True, sort=True)

# show startswith results
print "Startswith results: "
for letter, count in sorted(startwith_results, key=lambda kv: -kv[1]):
	print letter, count
print


print "\nDone."
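
To make the map and reduce steps above more concrete, here is a minimal pure-Python sketch of the same idea. It does not use bops and is not how bops implements 'mapreduce'; the helper name simple_mapreduce and the sample sentence are made up for illustration.

```python
from collections import defaultdict

def simple_mapreduce(rows, mapper, reducer):
    """Group mapped values by key, then reduce each group (illustrative only)."""
    groups = defaultdict(list)
    for row in rows:
        key, value = mapper(row)
        groups[key].append(value)
    # Return (key, reduced value) pairs sorted by key
    return sorted((key, reducer(values)) for key, values in groups.items())

words = "the quick fox and the lazy dog and the cat".split()

# Map every word to (word, 1) and sum the ones for each word
counts = simple_mapreduce(words, lambda w: (w, 1), sum)

# Map every word to (first letter, word) and count the words per letter
by_letter = simple_mapreduce(words, lambda w: (w[0], w), len)

# Highest counts first, keeping only the top 2
top = sorted(counts, key=lambda kv: -kv[1])[:2]
print(top)
```

The mapper decides the grouping key, and the reducer (here the built-in sum or len) collapses each group to a single value, which is the same division of labour the bops script relies on.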

Sample Data

This script generates the sample data for the example scripts. It produces a CSV (comma-delimited) file containing a list of people with their name, gender, age, years in college and number of friends.

import numpy

class Person(object):
	"""A person with a randomly chosen name and gender; the other fields are random on every call."""
	def __init__(self):
		self.names = [
			('Mary', 'F'), 			('Marsha', 'F'),	('Max', 'M'),
			('Joe', 'M'), 			('John', 'M'),		('Jacob', 'M'),
			('Bob', 'M'), 			('Billy', 'M'),		('Bobby', 'M'),
			('Zaphod', 'M'), 		('Zack', 'M'),		('Zackary', 'M'),
			('Trillian', 'F'), 	('Tristan', 'F'),	('Trinity', 'F'),
			('Ford', 'M'), 		  ('Jim', 'M'),			('Jimmy', 'M'),
			('Arthor', 'M'), 	  ('Andy', 'M'),		('Anna', 'F'),
			('Jax', 'M'), 		  ('Jason', 'M'),		('Johnathan', 'M'),
			('Marvin', 'M'), 	  ('Michael', 'M'),	('Mike', 'M'),
			('Lucy', 'F'), 		  ('Linda', 'F'),		('Lisa', 'F')
			]
		self.id = int(numpy.random.rand(1)[0]*len(self.names))
	def name(self):
		return self.names[self.id][0]
	def gender(self):
		return self.names[self.id][1]
	def age(self):
		return int(numpy.random.rand(1)[0] * 52 +18)
	def college(self):
		return int(numpy.random.rand(1)[0] * 6 + 1)
	def friends(self):
		return int(numpy.random.rand(1)[0] * 500)

people = [Person() for a in range(250000)]


#output file
out = open('people.csv', 'w')
out.write('name,gender,age,years in college,number of friends\n')

previous = Person()
for p in people:
	if p.name() == previous.name():
		p = Person()
	s = ','.join(str(a) for a in [p.name(), p.gender(), p.age(), p.college(), p.friends()])+'\n'
	out.write(s)
	previous = p

out.close()
print "Done."
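
As a rough illustration of the file layout, the sketch below parses a few hand-written rows shaped like people.csv. The three rows are made-up values, not output from the generator, and the parsing is plain Python rather than anything bops does internally.

```python
# A few hand-written rows in the same layout as people.csv (values are made up)
sample = """name,gender,age,years in college,number of friends
Mary,F,34,5,120
Joe,M,19,2,431
Anna,F,61,6,12
"""

lines = sample.strip().split('\n')
header = lines[0].split(',')

people = []
for line in lines[1:]:
    name, gender, age, college, friends = line.split(',')
    # Every field arrives as a string, so convert the numeric columns
    people.append((name, gender, int(age), int(college), int(friends)))

# Pick out the people with more than 4 years in college
graduates = [p[0] for p in people if p[3] > 4]
print(graduates)
```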

Comparison of the two map reduce functions: ‘mapreduce’ and ‘mapreducebatch’.

from bops import *
import numpy as np

'''

          -------------------- DESCRIPTION -----------------------

This script gives an example of how to use bops' MapReduce functionality.

The script title may be a bit deceiving, however, simply due to the fact
  that the map reduce paradigm is not yet widely known or used.
Many complex analysis principles are shown in the script below. Lack of
  familiarity with Python or its syntax, or never having used lists,
  dictionaries or numpy arrays before, will make this difficult to understand.

          ----------------------- NUMPY -----------------------

Numpy is used as the backbone of the bops module, although some of the MapReduce
  functionality is in pure Python. To gain performance, consider using
  the 'mapreducebatch' method. However, in the pursuit of speed, the batch
  method does add some complexity as well. For simplicity, the 'mapreduce' function
  may make things easier.

          --------------------- TEST DATA ---------------------

The test data used is a file with a list of 250000 people.
The file has 5 columns:
  name, gender, age, years in college and number of friends

The test data was produced by 'test_data_gen.py'.

This data will be used to show the usefulness and conciseness of using bops for 
  MapReduce operations.

Several questions are asked about the data:
    1. How many college graduates are there for both genders, broken up by age group?
    2. What's the total number of college graduates for each gender?
    3. What's the total number of college graduates across both genders?

    NOTE: For simplicity, a college graduate is defined as someone who has 
      spent more than 4 years in college.

For more information on the MapReduce paradigm and algorithms, read the article 
  on Wikipedia.

'''

if __name__ == '__main__':

  print "\nComparing mapreducebatch and mapreduce functions.\n"

  # All lines in the file are read into the lines list
  # All lines are 'stripped', meaning any newlines are removed
  # All lines are then 'split' on commas, producing another list
  # Therefore the lines list is a 2d list representing the file.
  print "Reading data..."
  lines = []
  with open("people.csv", 'r') as f:
    f.readline()  #remove first line from file (header line)
    for line in f:
      lines.append(line.strip().split(','))
  
  # After the file has been read, the data is then put into a 'bop'. 
  # This class has several useful attributes for manipulating data. However,
  #   only the MapReduce portion will be covered in this script.
  # The constructor requires two arguments, the data and the column names.
  print "Aggregating data..."
  cols = 'name,gender,age,college,friends'
  data = bop(lines, cols)
  print

  print "\nUsing 'mapreducebatch' function:\n"+"-"*32
  print

  # Question 1.
  print "How many college graduates are there for both genders, broken up by age group?"

  # Define graduated
  # This is a reducer to be used on the data for each group after it has passed
  #   through a map operation.
  # NOTE: All reducers used with the mapreducebatch method are given the entire
  #   mapped data group
  # NOTE: All reducers not used with the mapreducebatch method are given each
  #   element of the mapped data group
  # This reducer returns the number of people who have more than 4 years in college
  def graduated(data):
    return len(np.nonzero(data.college > 4)[0])
 
  # This is one attribute of the data describing the age groups that people belong to.
  # This basically determines the decade of your age.
  # Simply put, if you are 35, it returns 30, for 58 it returns 50
  # This allows the data to be aggregated by people of similar age
  agegroup = data.age // 10 * 10

  # This is the map reduce operation call
  # It finds all the unique combinations of gender and age group and passes
  #   each unique group to the reducer separately.
  # If a reducer is left out, then the data returned is the raw data that belongs to that group.
  # The 'expand' option makes the output easier to deal with, however, if you
  #   only want a key/value pair to be returned, leave out this option.
  # The 'names' option gives the column names to be returned
  gender_age_grad = data.mapreducebatch([data.gender, agegroup], reducer=graduated, expand=True, names='gender,agegroup,graduates')

  # This orders the data by gender and age group for ordered output.  
  gender_age_grad.orderby('gender','agegroup')

  # Output the results in a pretty fashion
  print
  print repr("Gender").rjust(7),repr("Age Group").rjust(4),repr(">4yrs in college").rjust(17)
  for gender, age, grad in gender_age_grad:
    print repr(gender).ljust(9),repr(age).ljust(11),repr(grad).ljust(17)
  print

  # Question 2.
  print "What's the total number of college graduates for each gender?"

  # This reducer sums all the counts from the previous map reduce
  # The previous map reduce job will produce something like this:
  #   [('F', 10, 978) 
  #    ('F', 20, 4830) 
  #    ('F', 30, 4796)
  #         ....
  #    ('M', 40, 11313)
  #    ('M', 50, 11264)
  #    ('M', 60, 11102)]
  # As you can see, all elements have the gender and age group as well as the count of college graduates
  # This reducer sums the graduate column for each gender
  def gender_graduates(group):
    return sum(group.graduates)

  # The mapper used only groups by gender and passes the list of counts to the reducer to sum
  # The results are also named by the columns: gender and graduates
  gender = gender_age_grad.mapreducebatch([gender_age_grad.gender], reducer=gender_graduates, names='gender,graduates')

  # print the results in a readable fashion
  print
  print repr("Gender").rjust(7),repr(">4yrs in college").rjust(17)
  for g, grads in gender:
    print repr(g).ljust(8),repr(grads).ljust(18)
  print

  # Question 3.
  print "What's the total number of college graduates across both genders?"

  # This mapper is not to be used with the batch as it is meant to be passed every element.
  # Because all elements are passed to it we can combine both genders as one map result named 'Both'
  def both_genders(group):
    return "Both", group.graduates

  # This map reduce job combines the genders and then sums the graduates from both genders
  mr = gender.mapreduce(both_genders, sum, expand=True)
  label, value = mr[0]

  #print the results
  print str(label).ljust(8),str(value).ljust(18),""


  def ga_mapper(row):
    return (row.gender, row.age // 10 * 10), row
  
  def grads(group):
    return sum([1 for p in group if p.college > 4])


  print "\nUsing 'mapreduce' function:\n"+"-"*27
  print

  gender_age_grad = data.mapreduce(ga_mapper, reducer=grads, expand=True, sort=True)
  # Loop names chosen so they do not overwrite 'gender' and the 'grads' reducer above
  for g, age_group, n_grads in gender_age_grad:
    print g, age_group, n_grads

  # The following shows how to alias a function as a data attribute. This is a 
  # shorthand for calling a function for a given numpy array.
  # By applying the alias name=f, 'data.gender_name' is the same as 'f(data.gender)'.
  print "\nAlias example..."
  def f(array):
    gender = []
    for g in array:
      if g == 'F':
        gender.append('Female')
      else:
        gender.append('Male')
    return np.asarray(gender)
  
  data.alias(name=f)
  print data.gender_name

  print "Done."
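
The batch-style grouping used in the script above, where each reducer receives a whole group at once, can be sketched with plain numpy. This is only an illustration of the idea under made-up data; it is not the bops implementation of 'mapreducebatch', and the variable names are chosen for this example.

```python
import numpy as np

# Made-up columns standing in for the gender, age and college data
gender = np.array(['F', 'M', 'F', 'M', 'F', 'M'])
age = np.array([23, 35, 27, 38, 41, 36])
college = np.array([5, 2, 6, 5, 3, 6])

# Decade bucket: 23 -> 20, 35 -> 30
agegroup = age // 10 * 10

# Build one label per row so that rows with equal (gender, agegroup) match
labels = np.array([g + str(a) for g, a in zip(gender, agegroup)])

results = []
for label in np.unique(labels):
    mask = labels == label
    # The reducer sees the whole group at once, as a boolean-indexed array
    graduates = int(np.count_nonzero(college[mask] > 4))
    results.append((str(gender[mask][0]), int(agegroup[mask][0]), graduates))

print(results)
```

Because the comparison college[mask] > 4 runs over a whole array rather than one row at a time, most of the work stays inside numpy, which is the usual reason a batch approach is faster than a per-element one.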

Ever wanted to do a popularity analysis? OK, probably not, but here’s how you could. This script also times the two map reduce functions, ‘mapreduce’ and ‘mapreducebatch’, to compare their performance.

from bops import *
import numpy as np
import time

'''
This script compares the numpy mapreduce implementation versus the pure python implementation.

Rough testing reveals ~6x boost using numpy.

'''

if __name__ == '__main__':

  times = []
  fast = True
  pure = True
  sandbox = True

  print "Reading data..."
  lines = []
  with open("people.csv", 'r') as f:
    f.readline()  #remove first line from file (header line)
    for line in f:
      lines.append(line.strip().split(','))
  
  print "Aggregating data..."
  cols = 'name,gender,age,college,friends'
  data = bop(lines, cols)

  if fast:
    t1 = time.clock()
    friendgroup = data.friends // 100 * 100
    agegroup = data.age // 10 * 10

    gender_age_friends = data.mapreducebatch([data.gender, agegroup, friendgroup], reducer=len, expand=True, names='gender,agegroup,friendgroup,group')

    # This orders the data by gender, age group and friend group for ordered output.
    gender_age_friends.orderby('gender','agegroup', 'friendgroup')
    elapsed = (time.clock() - t1)
    print "batch: %0.2fs" % elapsed
    times.append("batch: %0.2fs" % elapsed)

    # Output the results in a pretty fashion
    print
    print repr("Gender").rjust(7),repr("Age Group").rjust(11),repr("Popularity").ljust(11),repr("Counts").ljust(7)
    for gender, age, friend, group in gender_age_friends:
      print repr(gender).ljust(9),repr(age).ljust(11),repr(friend).ljust(12),repr(group).ljust(7)
    print

  #pure python
  if pure:
    t1 = time.clock()
    def gaf_mapper(item):
      return (item.gender, item.age // 10 * 10, item.friends // 100 * 100), item

    gender_age_friends = data.mapreduce(mapper=gaf_mapper, reducer=len, expand=True, sort=True)

    elapsed = (time.clock() - t1)
    print "pure: %0.2fs" % elapsed
    times.append("pure: %0.2fs" % elapsed)

    # Output the results in a pretty fashion
    print
    print repr("Gender").rjust(7),repr("Age Group").rjust(11),repr("Popularity").ljust(11),repr("Counts").ljust(7)
    for gender, age, friend, group in gender_age_friends:
      print repr(gender).ljust(9),repr(age).ljust(11),repr(friend).ljust(12),repr(group).ljust(7)
    print

  # sandbox for testing
  if sandbox:
    print "\nPopularity Analysis...\n"

    def pop_mapper(person):
      pops = ['0 - Unsocial/New', '1 - Outcasts', '2 - Groupie', '3 - Known Of', '4 - F & C Crowd']
      gender = ''
      if person.gender == 'F': gender = 'Female'
      else: gender = 'Male'
      return (gender, pops[person.friends // 100]), 1 # person

    def print_pop_summary(pop_classes):
      lens = []
      for c in range(len(pop_classes[0])):
        lens.append(max([len(str(t[c])) for t in pop_classes]) + 3)
      for row in pop_classes:
        s = ','.join([repr(col).ljust(lens[i]) for i, col in enumerate(row)])
        print s
      print

    print "\nAll Ages:"
    pop_class = data.mapreduce(pop_mapper, sum, expand=True, sort=True)
    print_pop_summary(pop_class)

    print "\nAll Females <= 25 yrs old"
    young = data.select((data.age <= 25) * (data.gender == 'F'))
    pop_class = young.mapreduce(pop_mapper, sum, expand=True, sort=True)
    print_pop_summary(pop_class)

  print "\nMapReduce Comparison\n"
  for t in times:
    print t

  print "\nDone."
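
The 'All Females <= 25 yrs old' selection in the script above combines two conditions by multiplying boolean arrays. A small numpy-only sketch of that trick follows, using made-up data; it shows only the masking, and 'data.select' itself is not reproduced here.

```python
import numpy as np

age = np.array([22, 31, 25, 19, 40])
gender = np.array(['F', 'F', 'M', 'F', 'M'])

# Multiplying two boolean arrays gives their element-wise AND
young_f = (age <= 25) * (gender == 'F')

# The & operator expresses the same thing more explicitly
assert np.array_equal(young_f, (age <= 25) & (gender == 'F'))

# Boolean indexing keeps only the rows where the mask is True
print(age[young_f])
```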
