Examples to demonstrate the features of batchOpenMPI follow.
If the multiRef flag is enabled for a batchOpenMPI.batchFunction instance, then duplicate inputs are searched for so as to save computing resources.
import batchOpenMPI
def f_mult(x):
    """Return ``x`` multiplied by 2.0."""
    return x * 2.0


# Creating the function wrapper; multiRef=True is the flag this example
# demonstrates.
f = batchOpenMPI.batchFunction(f_mult, multiRef=True)
# Both the workers and the master process run the same code up until here.
batchOpenMPI.begin_MPI_loop(print_launch_messages=False)

# Creates [0,1,2,3,4,5,6,7,8,9] x 2, so every input appears twice.
# list() is needed because Python 3 range objects cannot be concatenated.
no = list(range(10)) * 2
for i in no:  # adding all f_mult inputs and queueing them for parallel processing
    f.addtoBatch(i)
batchOpenMPI.processBatch()  # get the workers to calculate all the inputs

res = [f(i) for i in no]  # collect the results in input order
print(res)
batchOpenMPI.end_MPI_loop(print_stats=True)  # releases workers
# Floor division keeps the first %i argument an integer on Python 3 too.
print("*** jobs executed by workers should be %i, out of the total of %i" % (len(no) // 2, len(no)))
import batchOpenMPI
def f_mult(x):
    """Return ``x`` multiplied by 2.0."""
    return x * 2.0


f = batchOpenMPI.batchFunction(f_mult)  # creating function wrapper
# Both the workers and the master process run the same code up until here.
batchOpenMPI.begin_MPI_loop()

# Four calls are announced for the input 4, but only three are made below;
# the closing message expects this to be reported as one uncollected job.
f.addtoBatch(4, calls_expected=4)
batchOpenMPI.processBatch()  # get the workers to calculate all the inputs
res = [f(4), f(4), f(4)]
print(res)

# Another test: the input 1 is queued once but called twice.
# NOTE(review): presumably the second call is what the closing message counts
# as jobs_master=1 -- confirm against the batchOpenMPI statistics.
f.addtoBatch(1)
batchOpenMPI.processBatch()  # get the workers to calculate all the inputs
res = f(1), f(1)
batchOpenMPI.end_MPI_loop(print_stats=True)  # releases workers
print("*** jobs executed by workers should be 2 ,(5 calls made),jobs uncollected should = 1, jobs_master=1")
import batchOpenMPI
# Defining the plain functions that will be wrapped.
def f_org(x):
    """Return the square of ``x``."""
    return x ** 2


def g_org(x):
    """Return ``x`` plus one."""
    return x + 1


def h_org(x):
    """Return ``x`` minus 0.2."""
    return x - 0.2


# Creating batch functions.
f = batchOpenMPI.batchFunction(f_org)
g = batchOpenMPI.batchFunction(g_org)
h = batchOpenMPI.batchFunction(h_org)
batchOpenMPI.begin_MPI_loop()

# Building the processing queue: the value returned by g.addtoBatch is passed
# on as the input of both f and h.
# NOTE(review): the second positional argument (2) is presumably
# calls_expected, matching the two g(3) calls below -- confirm.
bi = g.addtoBatch(3, 2)
print(f.addtoBatch(bi))
print(h.addtoBatch(bi))
batchOpenMPI.processBatch()  # get the workers to calculate all the inputs

# Now the actual code. The % formatting is applied to the string inside the
# print call, so the statement works both as a Python 2 print statement and
# as a Python 3 print function call.
print("""
f(x) = x ** 2
g(x) = x + 1
h(x) = x - 0.2
f(g(3)) = %f
h(g(3)) = %f
""" % (f(g(3)), h(g(3))))
batchOpenMPI.end_MPI_loop(print_stats=True)  # releases workers
print('** nothing should be solved on the masters')
import batchOpenMPI
# Defining the plain functions that will be wrapped.
def f_org(x):
    """Return the square of ``x``."""
    return x ** 2


def g_org(x):
    """Return ``x`` plus one."""
    return x + 1


def h_org(x):
    """Return ``x`` minus 0.2."""
    return x - 0.2


# Creating batch functions.
# NOTE(review): the positional True passed for g is presumably the multiRef
# flag -- confirm against the batchFunction signature.
f = batchOpenMPI.batchFunction(f_org)
g = batchOpenMPI.batchFunction(g_org, True)
h = batchOpenMPI.batchFunction(h_org)
batchOpenMPI.begin_MPI_loop()

# Building the processing queue: g(3) is queued twice, once as the input of f
# and once as the input of h.
print(f.addtoBatch(g.addtoBatch(3)))
print(h.addtoBatch(g.addtoBatch(3)))
batchOpenMPI.processBatch()  # get the workers to calculate all the inputs

# Now the actual code. The % formatting is applied to the string inside the
# print call, so the statement works both as a Python 2 print statement and
# as a Python 3 print function call.
print("""
f(x) = x ** 2
g(x) = x + 1
h(x) = x - 0.2
f(g(3)) = %f
h(g(3)) = %f
""" % (f(g(3)), h(g(3))))
batchOpenMPI.end_MPI_loop(print_stats=True)  # releases workers
print('** nothing should be solved on the masters')
import batchOpenMPI
# Defining the plain functions that will be wrapped.
def f_org(x):
    """Return the square of ``x``."""
    return x ** 2


def g_org(x):
    """Return ``1.0/x + 1``; raises ZeroDivisionError when ``x`` is 0."""
    return 1.0 / x + 1


def h_org(x):
    """Return ``x`` minus 0.2."""
    return x - 0.2


# Creating batch functions; ZeroDivisionError is declared as a permissible
# exception for g.
# NOTE(review): the positional True passed for g is presumably the multiRef
# flag -- confirm against the batchFunction signature.
f = batchOpenMPI.batchFunction(f_org)
g = batchOpenMPI.batchFunction(g_org, True, permissible_exceptions=[ZeroDivisionError])
h = batchOpenMPI.batchFunction(h_org)
batchOpenMPI.begin_MPI_loop()

# Building the processing queue; the last entry queues g(0), which divides by
# zero inside g_org.
print(f.addtoBatch(g.addtoBatch(3)))
print(h.addtoBatch(g.addtoBatch(3)))
print(f.addtoBatch(g.addtoBatch(0)))
batchOpenMPI.processBatch()  # get the workers to calculate all the inputs

# Now the actual code.
try:
    res3 = str(f(g(0)))
except ZeroDivisionError:  # the bound exception value was never used
    res3 = 'no solution'

# The % formatting is applied to the string inside the print call, so the
# statement works both as a Python 2 print statement and as a Python 3 print
# function call.
print("""
f(x) = x ** 2
g(x) = 1.0/x + 1
h(x) = x - 0.2
f(g(3)) = %f
h(g(3)) = %f
f(g(0)) = %s
""" % (f(g(3)), h(g(3)), res3))
batchOpenMPI.end_MPI_loop(print_stats=True)  # releases workers
print('** nothing should be solved on the masters, also 5 jobs where passed but one was discarded')
Downward dependency examples
import batchOpenMPI
# Defining the plain functions that will be wrapped.
def f_org(x):
    """Return ``x[0] * (x[1] + x[2])`` for a three-element sequence ``x``."""
    return x[0] * (x[1] + x[2])


def g_org(x):
    """Return ``x`` plus one."""
    return x + 1


def h_org(x):
    """Return ``x`` minus 0.2."""
    return x - 0.2


# Creating batch functions.
f = batchOpenMPI.batchFunction(f_org)
g = batchOpenMPI.batchFunction(g_org)
h = batchOpenMPI.batchFunction(h_org)
batchOpenMPI.begin_MPI_loop()

# Building the processing queue: the input list of f mixes a plain value with
# the values returned by g.addtoBatch and h.addtoBatch, and is queued with
# multiDep=True.
bj = f.addtoBatch([2, g.addtoBatch(3), h.addtoBatch(1)], multiDep=True)
print(bj)
batchOpenMPI.processBatch()  # get the workers to calculate all the inputs

# The legend now states g(x) = x + 1, which is what g_org computes. The %
# formatting is applied inside the print call so that it also works with the
# Python 3 print function.
print("""
f([x,y,z]) = x * (y + z)
g(x) = x + 1
h(x) = x - 0.2
f([2,g(3),h(1)]) = %f
""" % (f([2, g(3), h(1)])))
print(bj)
batchOpenMPI.end_MPI_loop(print_stats=True)  # releases workers
print('** nothing should be solved on the masters')
import batchOpenMPI
# Defining the plain functions that will be wrapped.
def f_org(x):
    """Return ``x[1] + x[0]`` for a two-element sequence ``x``."""
    return x[1] + x[0]


def g_org(x):
    """Return ``x`` plus one."""
    return x + 1


# Creating batch functions.
f = batchOpenMPI.batchFunction(f_org)
g = batchOpenMPI.batchFunction(g_org)
batchOpenMPI.begin_MPI_loop()
print("""in this case, 10 of the 11 jobs will dependen on 1,
call with 10 worker processes and see if all of them get given a job to perform""")

# Building the processing queue: one g job whose queued value is reused as an
# input of ten f jobs.
# NOTE(review): the second positional argument (10) is presumably
# calls_expected, matching the ten g(3) calls below -- confirm.
gi = g.addtoBatch(3, 10)
x_vals = range(10)
for x in x_vals:
    f.addtoBatch([x, gi], multiDep=True)
batchOpenMPI.processBatch()  # get the workers to calculate all the inputs

res = [f([x, g(3)]) for x in x_vals]  # collect the results in input order
print(res)
batchOpenMPI.end_MPI_loop(print_stats=True)  # release workers
print('** nothing should be solved on the masters, make sure each worker has a job completed')
import batchOpenMPI, os
def ex_prog(x):
    "simulates an external program that writes its output to file"
    # open() replaces the Python 2-only file() builtin; the with block closes
    # the handle even if the write fails.
    with open('results.txt', 'w') as out:
        out.write(str(x ** 2))


def fun_org(x):
    """Run ex_prog(x) and return the float it wrote to results.txt."""
    ex_prog(x)
    with open('results.txt', 'r') as src:
        res = float(src.readline().strip())
    return res


fun = batchOpenMPI.batchFunction(fun_org)  # creating function wrapper
batchOpenMPI.WorkingDir_base = 'workspace'  # giving each worker a directory to work in
batchOpenMPI.begin_MPI_loop()  # split workers and master

# Creating inputs. list() makes the "inputs" line below print the numbers on
# Python 3 as well, where str(range(10)) would give 'range(0, 10)'.
no = list(range(10))
print("""f(x) = x ** 2
the results will be written to file, and as the result file name will be the same.
each process will be given its own workspace using batchOpenMPI.WorkingDir_base""")
print("\ninputs :" + str(no))
for i in no:  # adding all fun inputs and queueing them for parallel processing
    fun.addtoBatch(i)
batchOpenMPI.processBatch()  # get the workers to calculate all the inputs

res = [fun(i) for i in no]  # collect the results in input order
print("results : " + str(res))
batchOpenMPI.end_MPI_loop(print_stats=True)  # releases workers
print('note that the working directory are deleted when the job completes...')