Introduction to PyZoltan¶
PyZoltan is, as the name suggests, a Python wrapper for the Zoltan data management library. Although its primary purpose is dynamic load balancing for PySPH, the features are general enough to warrant a separate discussion. In Using the PySPH library, we touched upon how to perform nearest neighbor queries in a distributed environment. In this document, we introduce the PyZoltan interface in its native, non-SPH form.
In PyZoltan, we wrap the specific routines and objects that we wish to use. The following features of Zoltan are currently supported:
- Dynamic load balancing using geometric algorithms
- Unstructured point-to-point communication
- Distributed data directories
PyZoltan interfaces with Zoltan, which is itself a library that is called to perform specific tasks on the application data. Information about the application data is provided to Zoltan through callbacks: query functions are registered with Zoltan, which calls them internally, and they are responsible for providing the correct information about the application data to the library. The user is responsible for making this data available in a form consistent with the application's requirements.
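To make the callback idea concrete, the sketch below shows the kind of question such a query function answers. The function names and signatures are purely illustrative and not part of the PyZoltan API; PyZoltan registers the real callbacks internally, so users of the high-level classes never write them:

# Illustrative only -- not the PyZoltan API. Zoltan asks the application
# questions such as "how many objects do you own?" and "what are their ids?"
# through registered query functions of roughly this shape.
def num_local_objects(app_data):
    # number of locally owned objects (e.g. particles)
    return app_data['x'].size

def local_object_ids(app_data):
    # global and local identifiers of the locally owned objects
    return app_data['gid'], app_data['lid']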
A simple example: Partitioning points¶
The following simple example demonstrates the partitioning of a random collection of points in the unit cube \([0,1]^3\). The objects to be partitioned in this case are therefore the points themselves, which can be thought of as particles in an SPH simulation. We begin with some imports:
# Imports
import mpi4py.MPI as mpi
from pyzoltan.core.carray import UIntArray, DoubleArray
from pyzoltan.core import zoltan
from numpy import random
import numpy as np
from mpl_toolkits.mplot3d import Axes3D
import matplotlib.pyplot as plt
comm = mpi.COMM_WORLD
rank = comm.Get_rank()
size = comm.Get_size()
colors = ['r', 'g', 'b', 'y', 'm', 'k', 'c', 'burlywood']
Creating the data¶
After the initial imports, we define the local data on each processor and gather it to the root node for plotting the initial assignment:
numPoints = 1<<12
x = random.random( numPoints )
y = random.random( numPoints )
z = random.random( numPoints )
gid = np.arange( numPoints*size, dtype=np.uint32 )[rank*numPoints:(rank+1)*numPoints]
X = np.zeros( size * numPoints )
Y = np.zeros( size * numPoints )
Z = np.zeros( size * numPoints )
GID = np.arange( numPoints*size, dtype=np.uint32 )
comm.Gather( sendbuf=x, recvbuf=X, root=0 )
comm.Gather( sendbuf=y, recvbuf=Y, root=0 )
comm.Gather( sendbuf=z, recvbuf=Z, root=0 )
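The plotting step itself is not shown in the listing; a minimal sketch (assuming the gathered arrays above and the colors list from the imports) might look like:

# minimal sketch of plotting the initial assignment on the root process
if rank == 0:
    fig = plt.figure()
    ax = fig.add_subplot(111, projection='3d')
    for proc in range(size):
        s, e = proc * numPoints, (proc + 1) * numPoints
        ax.scatter(X[s:e], Y[s:e], Z[s:e], c=colors[proc % len(colors)])
    plt.savefig('initial_assignment.png')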
Note
Each object (point) is assigned a unique global identifier (the gid array). The identifiers must be unique across all processors for a load balancing cycle.
Note
The global identifiers are of type ZOLTAN_ID_TYPE (uint32 by default), which is set at the time the Zoltan library is built.
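If Zoltan was built with a different ZOLTAN_ID_TYPE, the global identifier array must use the matching numpy dtype. A quick sanity check for the default build might be:

# sanity check for the default build (ZOLTAN_ID_TYPE == unsigned int)
assert gid.dtype == np.uint32, "gid dtype must match ZOLTAN_ID_TYPE"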
ZoltanGeometricPartitioner¶
The ZoltanGeometricPartitioner is a concrete sub-class of PyZoltan. This class defines all the helper methods needed for a domain decomposition using a geometric algorithm. After the data has been initialized, we instantiate the ZoltanGeometricPartitioner object and set some parameters:
xa = DoubleArray(numPoints); xa.set_data(x)
ya = DoubleArray(numPoints); ya.set_data(y)
za = DoubleArray(numPoints); za.set_data(z)
gida = UIntArray(numPoints); gida.set_data(gid)
pz = zoltan.ZoltanGeometricPartitioner(
dim=3, comm=comm, x=xa, y=ya, z=za, gid=gida)
pz.set_lb_method('RCB')
pz.Zoltan_Set_Param('DEBUG_LEVEL', '1')
Note
We use CArrays internally to represent the data in PyZoltan. This is done mainly for compatibility with the PySPH particle data structure.
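The carray containers interoperate with numpy: data is loaded with set_data() (as above) and can be read back as a numpy array with get_npy_array(), which is used later in this document when working with the import lists. For example:

# read the coordinates back from the DoubleArray as a numpy array
x_np = xa.get_npy_array()
assert x_np.size == numPoints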
The complete list of parameters can be found in the Zoltan reference manual. All parameters are supported through the wrapper PyZoltan.Zoltan_Set_Param() method. In this example, we set the desired load balancing algorithm (Recursive Coordinate Bisection) and the output debug level.
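For example, the other geometric methods offered by Zoltan can be selected in the same way; the sketch below (not used in this example) switches to Recursive Inertial Bisection and asks Zoltan to retain the computed cuts between calls via the KEEP_CUTS parameter:

# a sketch of alternative settings (not used in this example)
pz.set_lb_method('RIB')                  # Recursive Inertial Bisection
# pz.set_lb_method('HSFC')               # Hilbert Space Filling Curve
pz.Zoltan_Set_Param('KEEP_CUTS', '1')    # retain cut information across calls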
Calling the load balance function¶
Once all the parameters are appropriately set-up, we can ask Zoltan to provide new assignments for the particles:
pz.Zoltan_LB_Balance()
This will call the chosen load balancing function internally and, upon return, set a number of lists (arrays) indicating which objects need to be exported and which need to be imported. The data attributes for the export lists are:
- numExport : Number of objects to be exported
- exportLocalids : Local indices of the objects to be exported
- exportGlobalids : Global indices of the objects to be exported
- exportProcs : A list of size numExport indicating to which processor each object is exported
Similar arrays are available for the import lists. The import/export lists returned by Zoltan give an application all the information required to initiate the data transfer.
Note
Zoltan does not perform the data transfer. The data transfer must be done by the application or using the Unstructured communication utilities provided by Zoltan.
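As a rough sketch (assuming the import lists are also populated and that the coordinates live in the numpy array x, indexed by the local ids), the export/import lists could be used with plain mpi4py calls along these lines; this is only a hand-rolled illustration, not how PyZoltan itself moves data:

# rough sketch of exchanging the x-coordinates by hand with mpi4py
export_lids = pz.exportLocalids.get_npy_array()
export_procs = pz.exportProcs.get_npy_array()
import_procs = pz.importProcs.get_npy_array()

# post one (pickled) send per destination processor ...
requests = [comm.isend(x[export_lids[export_procs == dest]], dest=int(dest))
            for dest in np.unique(export_procs)]

# ... and receive one message from each source processor
received = {int(src): comm.recv(source=int(src))
            for src in np.unique(import_procs)}

mpi.Request.waitall(requests)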
Given the new assignments, we once again gather the data to the root to plot the final partition. The partition generated by this approach is shown below.
We can see that the RCB method has resulted in cuts orthogonal to the domain axes, and (with four processors) each processor ends up with exactly one fourth of the total number of particles.
The code for this example can be found in pyzoltan/core/tests/3d_partition.py
Inverting the Import/Export lists¶
In the example above, Zoltan returned lists of objects to be imported and exported. In applications, however, situations arise in which only one set of arrays is available. For example, a common scenario is that we know which objects need to be exported to remote processors but do not know in advance which objects need to be imported. The matter is complicated for dynamic applications because, without knowledge of the number of objects to be imported, we cannot allocate buffers of appropriate size on the receiving end.
For these scenarios, when only one set of arrays (either import or export) is available, we use the PyZoltan.Zoltan_Invert_Lists() method to obtain the other set. PyZoltan exposes this important utility function from Zoltan by assuming that the export lists are known to the application. Upon return from this method, the corresponding import lists are also known. Note that the roles of the import and export lists can be interchanged by the application.
A simple example demonstrating this is given below:
from pyzoltan.core import carray
from pyzoltan.core import zoltan
import numpy
import mpi4py.MPI as mpi
comm = mpi.COMM_WORLD; rank = comm.Get_rank(); size = comm.Get_size()
if rank == 0:
    proclist = numpy.array( [1, 1, 2, 1], dtype=numpy.int32 )
    locids = numpy.array( [1, 3, 5, 7], dtype=numpy.uint32 )
    glbids = numpy.array( [1, 3, 5, 7], dtype=numpy.uint32 )
if rank == 1:
    proclist = numpy.array( [0, 2, 0], dtype=numpy.int32 )
    locids = numpy.array( [1, 3, 5], dtype=numpy.uint32 )
    glbids = numpy.array( [11, 33, 55], dtype=numpy.uint32 )
if rank == 2:
    proclist = numpy.array( [1, 1], dtype=numpy.int32 )
    locids = numpy.array( [1, 3], dtype=numpy.uint32 )
    glbids = numpy.array( [111, 333], dtype=numpy.uint32 )
# create the Zoltan object
zz = zoltan.PyZoltan(comm)
# set the export lists
numExport = proclist.size; zz.numExport = numExport
zz.exportLocalids.resize(numExport); zz.exportLocalids.set_data(locids)
zz.exportGlobalids.resize(numExport); zz.exportGlobalids.set_data(glbids)
zz.exportProcs.resize(numExport); zz.exportProcs.set_data(proclist)
print('Proc %d to send %s to %s' % (rank, glbids, proclist))
# Invert the lists
zz.Zoltan_Invert_Lists()
# get the import lists
numImport = zz.numImport
importlocids = zz.importLocalids.get_npy_array()
importglbids = zz.importGlobalids.get_npy_array()
importprocs = zz.importProcs.get_npy_array()
print('Proc %d to recv %s from %s' % (rank, importglbids, importprocs))
In this example (which is hard coded for three processors), each processor artificially creates a list of objects it knows it must send to remote processors, and sets these up as the export lists of the PyZoltan object. Thereafter, PyZoltan.Zoltan_Invert_Lists() is called to obtain the lists that must be imported by each processor. The output from this example is shown below:
$ mpirun -n 3 python invert_lists.py
Proc 2 to send [111 333] to [1 1]
Proc 1 to send [11 33 55] to [0 2 0]
Proc 0 to send [1 3 5 7] to [1 1 2 1]
Proc 2 to recv [ 5 33] from [0 1]
Proc 0 to recv [11 55] from [1 1]
Proc 1 to recv [ 1 3 7 111 333] from [0 0 0 2 2]
We can see that after a call to this method, each processor knows of remote data that must be received. In an application, this information can be used to effect the data transfer.
Another option is to use the unstructured communication utilities offered by Zoltan. This is described next.
Unstructured point-to-point communication¶
In the previous section, we saw how to use the Zoltan library function to invert a set of export indices to obtain the corresponding import indices. With a little work, we could then structure the requisite communication (MPI sends and receives) to exchange the data ourselves.
This set of operations is fairly common, and Zoltan (PyZoltan) provides a very convenient utility called ZComm to perform such communication. The typical use case for ZComm is when we know the list of local objects to send to remote processors but have no information about the objects to be received. An example (pyzoltan/core/tests/zcomm.py) demonstrating the use of the ZComm object is outlined below.
The example can be run with an arbitrary number of processors. Each processor allocates some data locally and randomly picks a handful (between 1 and 10) of these objects to be sent to remote processors. The remote processors are also picked randomly:
import mpi4py.MPI as mpi
import numpy as np
from numpy import random
# import the unstructured communication package
from pyzoltan.core import zoltan_comm
from pyzoltan.core import zoltan
# MPI comm, rank and size
comm = mpi.COMM_WORLD; rank = comm.Get_rank(); size = comm.Get_size()
# each processor creates some random data
numObjectsTotal = 1<<10
x = random.random(numObjectsTotal)
gids = np.array( np.arange(size * numObjectsTotal) )[rank*numObjectsTotal:(rank+1)*numObjectsTotal]
gids = gids.astype(np.uint32)
# arbitrarily assign some objects to be sent to some other processor
nsend = np.int32( random.random_integers(low=1, high=10) )
object_ids = random.random_integers( low=0, high=numObjectsTotal-1, size=nsend )
proclist = random.random_integers(low=0, high=size-1, size=nsend).astype(np.int32)
my_indices = np.where(proclist == rank)[0]
proclist[my_indices] = (rank+1)%size
This information is sufficient to instantiate the ZComm object, which is used as the communication plan to exchange this data. Once the communication plan is set up, each processor knows how much data it must receive through the ZComm.nreturn attribute. This is used to allocate the receive buffers:
# create the ZComm object
tag = np.int32(0)
zcomm = zoltan_comm.ZComm(comm, tag=tag, nsend=nsend, proclist=proclist)
# the data to send and receive
senddata = x[ object_ids ]
recvdata = np.ones( zcomm.nreturn )
With the send buffer and the allocated receive buffer, we can perform the communication using the ZComm.Comm_Do() method:
# use zoltan to exchange doubles
print "Proc %d, Sending %s to %s"%(rank, senddata, proclist)
zcomm.Comm_Do(senddata, recvdata)
print "Proc %d, Received %s"%(rank, recvdata)
Note that the user does not need to explicitly write the MPI send and receive calls. All that is required is to allocate buffers of the correct size on the receive side. The output from this example is shown below (it will vary from run to run given the use of random numbers):
$ mpirun -n 3 python zcomm.py
Proc 2, Sending [ 0.83476393 0.07041833 0.20059537 0.7722934 0.4529769 ] to [0 1 0 0 1]
Proc 2, Received [ 0.50391764 0.40028207]
Proc 0, Sending [ 0.50391764] to [2]
Proc 1, Sending [ 0.29755463 0.40028207 0.69433472] to [0 2 0]
Proc 1, Received [ 0.07041833 0.4529769 ]
Proc 0, Received [ 0.29755463 0.69433472 0.83476393 0.20059537 0.7722934 ]
Using the plan for similar communication¶
The ZComm object was used to send values of type float64 in this example. If the number of objects to be sent and their destinations remain the same, we can modify the ZComm to send other objects. For example, the same object can be used to exchange the global indices (uint32) like so:
senddata = gids[ object_ids ]
recvdata = np.ones(zcomm.nreturn, dtype=np.uint32)
zcomm.set_nbytes(recvdata.itemsize)
print "Proc %d, Sending %s to %s"%(rank, senddata, proclist)
zcomm.Comm_Do(senddata, recvdata)
print "Proc %d, Received %s"%(rank, recvdata)
Note
The ZComm.set_nbytes() method is used to indicate the size of the individual objects that are communicated via ZComm.Comm_Do().
The output with this change is:
$ mpirun -n 3 python zcomm.py
Proc 1, Sending [1054 1692 2034] to [0 2 0]
Proc 0, Sending [214] to [2]
Proc 2, Sending [2720 3034 2511 2412 2975] to [0 1 0 0 1]
Proc 2, Received [ 214 1692]
Proc 1, Received [3034 2975]
Proc 0, Received [1054 2034 2720 2511 2412]
Reversing the communication plan¶
It is often the case for dynamic applications that objects initially shared with remote processors have their values updated on those remote processors. Subsequently, the updated values are required on the originating processor and must therefore be communicated back.
For such scenarios, the communication plan represented by ZComm can be used to reverse the communication. That is, the data that was originally sent is treated as a receive and vice versa.
To illustrate the use of this feature, we continue with our example. The received data (an array of unsigned ints) is modified on the remote processor and communicated back using the ZComm.Comm_Do_Reverse() method:
recvdata[:] = rank
updated_info = np.zeros(zcomm.nsend, dtype=senddata.dtype)
print('Proc %d, sending updated data %s' % (rank, recvdata))
zcomm.Comm_Do_Reverse(recvdata, updated_info)
print('Proc %d, received updated data %s' % (rank, updated_info))
Note
The size of the buffer each processor will receive in the reverse communication phase is equal to the number of objects it initially sent. This is available through the ZComm.nsend attribute.
The output from this when run on 3 processors is:
$ mpirun -n 3 python zcomm.py
Proc 1, Sending [1054 1692 2034] to [0 2 0]
Proc 0, Sending [214] to [2]
Proc 2, Sending [2720 3034 2511 2412 2975] to [0 1 0 0 1]
Proc 2, Received [ 214 1692]
Proc 1, Received [3034 2975]
Proc 0, Received [1054 2034 2720 2511 2412]
Proc 0, sending updated data [0 0 0 0 0]
Proc 2, sending updated data [2 2]
Proc 1, sending updated data [1 1]
Proc 0, received updated data [2]
Proc 1, received updated data [0 2 0]
Proc 2, received updated data [0 1 0 0 1]
Distributed data directories¶
The Zoltan Distributed Data Directory utility is a convenient way for a processor to locate remote data. It is implemented as a parallel hash map, keyed on the object identifiers (global indices) and with arbitrary user data associated with each entry.
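Conceptually, one can picture the directory as a hash map partitioned across the ranks. The tiny illustration below is purely an analogy and not the PyZoltan API:

# purely an analogy, not the PyZoltan API: each rank holds a slice of a
# global map from object id to (owning processor, optional user data)
local_slice = {}                       # this rank's portion of the directory

def dd_update(gid, owner, user_data=None):
    local_slice[gid] = (owner, user_data)

def dd_find(gid):
    # in Zoltan the lookup involves communication with the rank that
    # hashes to this gid; here we only show the local view
    return local_slice.get(gid)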
The use of this feature is highly problem dependent, since the user defined data will necessarily change between applications. We use a simple example to demonstrate its use: each processor stores the ownership of its objects in the distributed directory, without any user data associated with the entries.
We begin with the standard set of imports and create some data on each processor and assign each object a unique global identifier:
import numpy
import pyzoltan.api as pz
import mpi4py.MPI as mpi
comm = mpi.COMM_WORLD
rank = comm.Get_rank()
size = comm.Get_size()
# every processor owns some data
numObjectsTotal = 5
my_indices = numpy.array( range( rank*numObjectsTotal,(rank+1)*numObjectsTotal ), dtype=numpy.uint32 )
gid = pz.UIntArray(my_indices.size); gid.set_data( my_indices )
Additionally, each processor has an IntArray which denotes object assignment:
part_assignment = numpy.array( [rank]*numObjectsTotal, dtype=numpy.int32 )
part = pz.IntArray( part_assignment.size ); part.set_data( part_assignment )
This is sufficient data to create the distributed directory:
# create a zoltan dd and store the object assignments
dd = pz.Zoltan_DD(comm)
# update the dd with the data
dd.Zoltan_DD_Update(gid, part)
Note that after instantiation of the Zoltan_DD object, we call the Zoltan_DD.Zoltan_DD_Update() method to update the data associated with this directory. Now, given the shared data available with each processor, we can query for object assignments. In the example below, each processor queries for the objects with global indices numObjectsTotal + rank and numObjectsTotal - rank:
# now we can query the dd
owner_data = pz.IntArray() # output array for the object data assignment
owner_parts = pz.IntArray() # output array for the object assignment
# every processor requests for information about some data
query_gids = pz.UIntArray(2)
query_gids.set_data( numpy.array([numObjectsTotal+rank, numObjectsTotal-rank],
                                 dtype=numpy.uint32) )
# use Zoltan_DD_Find to query the data
dd.Zoltan_DD_Find(query_gids, owner_parts, owner_data)
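The listing above does not show the printing step; a sketch of how the output below could have been produced (using the get_npy_array() accessor seen earlier) is:

# print the query results (sketch; converts the carrays to numpy for display)
print('Processor %d, query_gids = %s, owner_parts = %s, owner_data = %s' % (
    rank, query_gids.get_npy_array(), owner_parts.get_npy_array(),
    owner_data.get_npy_array()))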
The result from this query with \(3\) processors is shown below:
$ mpirun -n 3 python zoltan_dd.py
Processor 0, query_gids = [5 5], owner_parts = [1 1], owner_data = [1 1]
Processor 1, query_gids = [6 4], owner_parts = [1 0], owner_data = [1 0]
Processor 2, query_gids = [7 3], owner_parts = [1 0], owner_data = [1 0]