Examples

Examples to demonstrate the features of batchOpenMPI follow.

Multi-ref example

If the multiRef flag is enabled for a batchOpenMPI.batchFunction instance, then duplicate inputs are searched for, so as to save computing resources.

import batchOpenMPI
def f_mult(x):
    """Return *x* multiplied by 2.0."""
    doubled = x * 2.0
    return doubled
# Wrap f_mult so its calls can be queued and evaluated by the workers;
# multiRef=True enables the search for duplicate inputs.
f = batchOpenMPI.batchFunction(f_mult, multiRef=True)

# both the workers and the master process run the same code up until here
batchOpenMPI.begin_MPI_loop(print_launch_messages=False)

# [0,1,2,3,4,5,6,7,8,9] twice over. list() is needed because two range
# objects cannot be concatenated with + where range() is lazy.
no = list(range(10)) * 2
for i in no:  # add every f_mult input to the queue for parallel processing
    f.addtoBatch(i)
batchOpenMPI.processBatch()  # get the workers to calculate all the inputs

res = [f(i) for i in no]  # collect the results in input order
print(res)

batchOpenMPI.end_MPI_loop(print_stats=True)  # releases workers

# Integer division: half of the inputs are duplicates of the other half.
print("*** jobs executed by workers should be %i, out of the total of %i" % (len(no) // 2, len(no)))

Call expected example

import batchOpenMPI
def f_mult(x):
    """Double the input, using a float factor of 2.0."""
    factor = 2.0
    return x * factor
# function wrapper whose calls can be queued for the workers
f = batchOpenMPI.batchFunction(f_mult)

# workers and master share the same code path up to this call
batchOpenMPI.begin_MPI_loop()

# first test: announce four calls for input 4, but only make three of them
f.addtoBatch(4, calls_expected=4)
batchOpenMPI.processBatch()  # workers evaluate everything queued so far
res = [f(4) for _ in range(3)]
print(res)

# second test: queue input 1 once, then call it twice
f.addtoBatch(1)
batchOpenMPI.processBatch()  # workers evaluate the newly queued input
res = (f(1), f(1))

batchOpenMPI.end_MPI_loop(print_stats=True)  # releases workers

print("*** jobs executed by workers should be 2 ,(5 calls made),jobs uncollected should = 1, jobs_master=1")

Dependency examples

import batchOpenMPI

#defining function
def f_org(x):
    """Return the square of *x*."""
    squared = x ** 2
    return squared


def g_org(x):
    """Return *x* incremented by one."""
    incremented = x + 1
    return incremented


def h_org(x):
    """Return *x* reduced by 0.2."""
    shifted = x - 0.2
    return shifted

# creating batch functions
f = batchOpenMPI.batchFunction(f_org)
g = batchOpenMPI.batchFunction(g_org)
h = batchOpenMPI.batchFunction(h_org)

batchOpenMPI.begin_MPI_loop()
# building the processing queue: the value returned when queuing g(3) is
# passed on as the input of both the f job and the h job
# NOTE(review): the second argument (2) is presumably calls_expected, since
# g(3) is called twice below - confirm against addtoBatch's signature
bi = g.addtoBatch(3, 2)
print(f.addtoBatch(bi))
print(h.addtoBatch(bi))
batchOpenMPI.processBatch()  # get the workers to calculate all the inputs

# now the actual code
# The template is formatted inside the print call: applying % to the result
# of print(...) fails where print is a function.
print("""
f(x) = x ** 2
g(x) = x + 1
h(x) = x - 0.2

f(g(3)) = %f
h(g(3)) = %f
""" % (f(g(3)), h(g(3))))

batchOpenMPI.end_MPI_loop(print_stats=True)  # releases workers

print('** nothing should be solved on the masters')
import batchOpenMPI

#defining function
def f_org(x):
    """Square the input."""
    result = x ** 2
    return result


def g_org(x):
    """Add one to the input."""
    result = x + 1
    return result


def h_org(x):
    """Subtract 0.2 from the input."""
    result = x - 0.2
    return result

# creating batch functions
f = batchOpenMPI.batchFunction(f_org)
# NOTE(review): the positional True is presumably the multiRef flag (g(3) is
# queued twice below) - confirm against batchFunction's signature
g = batchOpenMPI.batchFunction(g_org, True)
h = batchOpenMPI.batchFunction(h_org)

batchOpenMPI.begin_MPI_loop()
# building the processing queue: g(3) is queued once per dependent job
print(f.addtoBatch(g.addtoBatch(3)))
print(h.addtoBatch(g.addtoBatch(3)))
batchOpenMPI.processBatch()  # get the workers to calculate all the inputs

# now the actual code
# The template is formatted inside the print call: applying % to the result
# of print(...) fails where print is a function.
print("""
f(x) = x ** 2
g(x) = x + 1
h(x) = x - 0.2

f(g(3)) = %f
h(g(3)) = %f
""" % (f(g(3)), h(g(3))))

batchOpenMPI.end_MPI_loop(print_stats=True)  # releases workers

print('** nothing should be solved on the masters')
import batchOpenMPI

#defining function
def f_org(x):
    """Return *x* raised to the second power."""
    power = x ** 2
    return power


def g_org(x):
    """Return the reciprocal of *x* plus one.

    Raises ZeroDivisionError when *x* is zero.
    """
    reciprocal = 1.0 / x
    return reciprocal + 1


def h_org(x):
    """Return *x* minus 0.2."""
    lowered = x - 0.2
    return lowered

# creating batch functions
f = batchOpenMPI.batchFunction(f_org)
# g may raise ZeroDivisionError (g_org divides by its input), so that
# exception type is declared as permissible for this batch function.
# NOTE(review): the positional True is presumably the multiRef flag - confirm
g = batchOpenMPI.batchFunction(g_org, True, permissible_exceptions=[ZeroDivisionError])
h = batchOpenMPI.batchFunction(h_org)

batchOpenMPI.begin_MPI_loop()
# building the processing queue; the last job depends on g(0), which fails
print(f.addtoBatch(g.addtoBatch(3)))
print(h.addtoBatch(g.addtoBatch(3)))
print(f.addtoBatch(g.addtoBatch(0)))
batchOpenMPI.processBatch()  # get the workers to calculate all the inputs

# now the actual code
try:
    res3 = str(f(g(0)))
except ZeroDivisionError:  # the exception object itself is not needed
    res3 = 'no solution'

# The template is formatted inside the print call: applying % to the result
# of print(...) fails where print is a function.
print("""
f(x) = x ** 2
g(x) = 1.0/x + 1
h(x) = x - 0.2

f(g(3)) = %f
h(g(3)) = %f
f(g(0)) = %s
""" % (f(g(3)), h(g(3)), res3))

batchOpenMPI.end_MPI_loop(print_stats=True)  # releases workers

print('** nothing should be solved on the masters, also 5 jobs where passed but one was discarded')

Downward dependency examples

import batchOpenMPI

#defining function
def f_org(x):
    """Return x[0] * (x[1] + x[2]) for an indexable *x*."""
    scale = x[0]
    total = x[1] + x[2]
    return scale * total


def g_org(x):
    """Return *x* plus one."""
    bumped = x + 1
    return bumped


def h_org(x):
    """Return *x* minus 0.2."""
    lowered = x - 0.2
    return lowered

# creating batch functions
f = batchOpenMPI.batchFunction(f_org)
g = batchOpenMPI.batchFunction(g_org)
h = batchOpenMPI.batchFunction(h_org)

batchOpenMPI.begin_MPI_loop()
# building the processing queue: the f job takes a list whose second and
# third elements are themselves queued jobs (hence multiDep=True)
bj = f.addtoBatch([2, g.addtoBatch(3), h.addtoBatch(1)], multiDep=True)
print(bj)
batchOpenMPI.processBatch()  # get the workers to calculate all the inputs

# The template is formatted inside the print call: applying % to the result
# of print(...) fails where print is a function. The g(x) line of the legend
# now matches g_org, which computes x + 1.
print("""
f([x,y,z]) = x * (y + z)
g(x) = x + 1
h(x) = x - 0.2

f([2,g(3),h(1)]) = %f
""" % (f([2, g(3), h(1)])))

print(bj)

batchOpenMPI.end_MPI_loop(print_stats=True)  # releases workers


print('** nothing should be solved on the masters')
import batchOpenMPI

#defining function
def f_org(x):
    """Return x[1] + x[0] for an indexable *x*."""
    second = x[1]
    first = x[0]
    return second + first


def g_org(x):
    """Return *x* plus one."""
    successor = x + 1
    return successor

# wrap the plain functions so that their calls can be batched
f = batchOpenMPI.batchFunction(f_org)
g = batchOpenMPI.batchFunction(g_org)

batchOpenMPI.begin_MPI_loop()

print("""in this case, 10 of the 11 jobs will dependen on 1, 
 call with 10 worker processes and see if all of them get given a job to perform""")

# queue g(3) once, then ten f jobs that each take it as their second element
gi = g.addtoBatch(3, 10)
x_vals = range(10)
for x in x_vals:
    f.addtoBatch([x, gi], multiDep=True)
batchOpenMPI.processBatch()  # workers evaluate everything queued above

# collect one f result per x value, in order
res = [f([x, g(3)]) for x in x_vals]

print(res)

batchOpenMPI.end_MPI_loop(print_stats=True)  # release workers

print('** nothing should be solved on the masters, make sure each worker has a job completed')

Individual working directory for each process

import batchOpenMPI, os

def ex_prog(x):
    """Simulate an external program that writes its output to file.

    Writes str(x ** 2) to 'results.txt' in the current working directory,
    replacing any previous content.
    """
    # open() replaces the file() builtin, which newer Python versions no
    # longer provide; the with-block closes the file even if the write fails.
    with open('results.txt', 'w') as out:
        out.write(str(x ** 2))


def fun_org(x):
    """Run ex_prog(x) and return the value it wrote, parsed as a float.

    The result is read back from the first line of 'results.txt' in the
    current working directory.
    """
    ex_prog(x)
    with open('results.txt', 'r') as src:
        res = float(src.readline().strip())
    return res

fun = batchOpenMPI.batchFunction(fun_org)  # creating function wrapper
batchOpenMPI.WorkingDir_base = 'workspace'  # giving each worker a directory to work in

batchOpenMPI.begin_MPI_loop()  # split workers and master
# creating inputs; list() makes the inputs print as a list below even where
# range() returns a lazy object
no = list(range(10))

print("""f(x) = x ** 2

the results will be written to file, and as the result file name will be the same. 
each process will be given its own workspace using batchOpenMPI.WorkingDir_base""")

print("\ninputs :" + str(no))


for i in no:  # add every fun input to the queue for parallel processing
    fun.addtoBatch(i)
batchOpenMPI.processBatch()  # get the workers to calculate all the inputs

res = [fun(i) for i in no]  # collect the results in input order
print("results : " + str(res))

batchOpenMPI.end_MPI_loop(print_stats=True)  # releases workers

print('note that the working directory are deleted when the job completes...')

Table Of Contents

Previous topic

Getting Started

This Page