Jug is a simple way to write easily parallelisable programs in Python. It also handles intermediate results for you.
This is a simple worked-through example which illustrates what jug does.
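Assume that I want to do the following to a collection of images:
1. For each image, compute some features.
2. Cluster these features using k-means, trying several values of k with 10 random initialisations each, and pick the number of clusters with the lowest BIC.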
I could write the following simple code:
    from glob import glob

    # computefeatures, kmeans, compute_bic and argmin are assumed to be
    # defined or imported elsewhere
    imgs = glob('*.png')
    features = [computefeatures(img, parameter=2) for img in imgs]
    clusters = []
    bics = []
    for k in range(2, 200):
        for repeat in range(10):
            clusters.append(kmeans(features, k=k, random_seed=repeat))
            bics.append(compute_bic(clusters[-1]))
    Nr_clusters = argmin(bics) // 10
Very simple and solves the problem. However, if I want to take advantage of the obvious parallelisation of the problem, then I need to write much more complicated code. My traditional approach is to break this down into smaller scripts. I’d have one to compute features for some images, I’d have another to merge all the results together and do some of the clustering, and, finally, one to merge all the results of the different clusterings. These would need to be called with different parameters to explore different areas of the parameter space, so I’d have a couple of scripts just for calling the main computation scripts. Intermediate results would be saved and loaded by the different processes.
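This has several problems. The biggest are:
1. The need to manage intermediate files, which have to be named, saved, and loaded by the different scripts.
2. The code becomes much more complicated.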
There are minor issues with having to issue several jobs (and having the cluster sit idle in the meantime), or deciding on how to partition the jobs so that they take roughly the same amount of time, but the two above are the main ones.
Jug solves all these problems!
The main unit of jug is a Task. Any function can be used to generate a Task. A Task can depend on the results of other Tasks.
The original idea for jug was a Makefile-like environment for declaring Tasks. I have moved beyond that, but it might help you think about what Tasks are.
You create a Task by giving it a function which performs the work and its arguments. The arguments can be either literal values or other tasks (in which case, the function will be called with the result of those tasks!). Jug also understands lists of tasks and dictionaries with tasks. For example, the following code declares the necessary tasks for our problem:
    from glob import glob
    from jug.task import Task

    imgs = glob('*.png')
    feature_tasks = [Task(computefeatures, img, parameter=2) for img in imgs]
    cluster_tasks = []
    bic_tasks = []
    for k in range(2, 200):
        for repeat in range(10):
            cluster_tasks.append(Task(kmeans, feature_tasks, k=k, random_seed=repeat))
            bic_tasks.append(Task(compute_bic, cluster_tasks[-1]))
    Nr_clusters = Task(argmin, bic_tasks)
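To make the argument handling concrete, here is a toy model of the idea. It is only a sketch: ToyTask and resolve are hypothetical names invented for this illustration and are not part of jug, and unlike jug this toy version does not save anything. It shows a task whose arguments are a literal value, a keyword argument, and a list of other tasks.

```python
# Toy model only: this is NOT jug's Task class. It illustrates the idea that
# task arguments are replaced by task results before the function is called.

class ToyTask:
    def __init__(self, function, *args, **kwargs):
        self.function = function
        self.args = args
        self.kwargs = kwargs

    def run(self):
        args = [resolve(a) for a in self.args]
        kwargs = {key: resolve(value) for key, value in self.kwargs.items()}
        return self.function(*args, **kwargs)


def resolve(value):
    """Replace tasks by their results, looking inside lists and dicts."""
    if isinstance(value, ToyTask):
        return value.run()
    if isinstance(value, list):
        return [resolve(v) for v in value]
    if isinstance(value, dict):
        return {key: resolve(v) for key, v in value.items()}
    return value  # a literal value is passed through unchanged


def double(x):
    return 2 * x


def total(values, offset=0):
    return sum(values) + offset


doubled = [ToyTask(double, n) for n in (1, 2, 3)]  # tasks with literal arguments
summed = ToyTask(total, doubled, offset=10)        # a task given a list of tasks
result = summed.run()                              # total([2, 4, 6], offset=10)
```

The function total never sees a ToyTask: by the time it is called, the list of tasks has been replaced by a list of plain numbers.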
The task declarations above contain a lot of code of the form Task(function, args), which would read more naturally as function(args). A simple helper function makes this possible:
    from jug.task import Task

    def TaskGenerator(function):
        def gen(*args, **kwargs):
            return Task(function, *args, **kwargs)
        return gen

    computefeatures = TaskGenerator(computefeatures)
    kmeans = TaskGenerator(kmeans)
    compute_bic = TaskGenerator(compute_bic)

    @TaskGenerator
    def Nr_clusters(bics):
        return argmin(bics) // 10

    imgs = glob('*.png')
    features = [computefeatures(img, parameter=2) for img in imgs]
    clusters = []
    bics = []
    for k in range(2, 200):
        for repeat in range(10):
            clusters.append(kmeans(features, k=k, random_seed=repeat))
            bics.append(compute_bic(clusters[-1]))
    Nr_clusters(bics)
You can see that this code is almost identical to our original sequential code, except for the declarations at the top and the fact that Nr_clusters is now a function (actually a TaskGenerator; note the use of a decorator).
This file is called the jugfile (you should name it jugfile.py on the filesystem) and specifies your problem. Of course, TaskGenerator is already a part of jug and those first few lines could have read
from jug.task import TaskGenerator
So far, we have achieved seemingly little. We have turned a simple piece of sequential code into something that generates Task objects, but does not actually perform any work. The final piece is jug. Jug takes these Task objects and runs them. Its main loop is basically
    while len(tasks) > 0:
        for t in list(tasks):  # iterate over a copy, since tasks is modified below
            if can_run(t):  # ensures that all dependencies have been run
                if need_to_run(t) and not is_running(t):
                    t.run()
                tasks.remove(t)
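The loop can be turned into a small runnable toy, which may help to see how the dependency check drives the order of execution. This is a sketch under my own assumptions, not jug's implementation: run_all is a hypothetical function, tasks are plain functions stored in a dict, and the check for tasks running elsewhere is left out because there is only one process.

```python
# Toy scheduler, not jug's actual main loop. Tasks are plain functions keyed
# by name; dependencies maps a name to the names whose results it needs.

def run_all(tasks, dependencies, results):
    """Run every task whose result is missing; return the execution order."""
    pending = list(tasks)
    order = []
    while pending:
        progressed = False
        for name in list(pending):  # iterate over a copy while removing
            deps = dependencies.get(name, [])
            if all(d in results for d in deps):      # plays the role of can_run
                if name not in results:              # plays the role of need_to_run
                    results[name] = tasks[name](*[results[d] for d in deps])
                    order.append(name)
                pending.remove(name)
                progressed = True
        if not progressed:
            raise RuntimeError("no task can run: circular or missing dependency")
    return order


tasks = {
    "sum": lambda a, b: a + b,
    "a": lambda: 2,
    "b": lambda: 3,
    "report": lambda s: "total=%d" % s,
}
dependencies = {"sum": ["a", "b"], "report": ["sum"]}

results = {}
first_order = run_all(tasks, dependencies, results)
second_order = run_all(tasks, dependencies, results)  # everything is already stored
```

In the first call, "sum" is skipped until "a" and "b" have results. In the second call nothing is run at all, because every result is already present, which is the same reason a re-run with saved intermediate results is cheap.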
If you run jug on the jugfile above, you will simply have reproduced the original code with the added benefit of having all the intermediate results saved.
The interesting part is what happens when you run several instances of jug at the same time. They will start running Tasks, but each instance will run its own tasks. This allows you to take advantage of multiple processors in a way that keeps the processors all occupied as long as there is work to be done, handles the implicit dependencies, and passes functions the right values. Note also that, unlike more traditional parallel processing frameworks (like MPI), jug has no problems with the number of participating processors varying throughout the job.
Behind the scenes, jug is using the filesystem to both save intermediate results (which get passed around) and to lock running tasks so that each task is only run once (the actual main loop is thus a bit more complex than shown above).
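The general technique can be sketched with the standard library alone. The code below is an illustration under my own assumptions, not jug's actual storage code: try_lock and run_once are hypothetical helpers, and the file naming scheme (name.pkl for results, name.lock for locks) is invented for the example. It relies on the fact that creating a file with O_CREAT | O_EXCL fails if the file already exists, so only one process can take a given lock.

```python
import os
import pickle
import tempfile


def try_lock(lock_path):
    """Atomically create a lock file; return False if it already exists."""
    try:
        fd = os.open(lock_path, os.O_CREAT | os.O_EXCL | os.O_WRONLY)
    except FileExistsError:
        return False
    os.close(fd)
    return True


def run_once(store_dir, name, function, *args):
    """Return (status, value), computing the value only if nobody else has."""
    result_path = os.path.join(store_dir, name + ".pkl")
    lock_path = os.path.join(store_dir, name + ".lock")
    if os.path.exists(result_path):
        with open(result_path, "rb") as f:
            return "loaded", pickle.load(f)
    if not try_lock(lock_path):
        return "locked", None  # someone else is working on this task
    try:
        value = function(*args)
        tmp_path = result_path + ".tmp"
        with open(tmp_path, "wb") as f:
            pickle.dump(value, f)
        os.replace(tmp_path, result_path)  # readers never see a partial file
        return "computed", value
    finally:
        os.remove(lock_path)


calls = []


def slow_square(x):
    calls.append(x)
    return x * x


with tempfile.TemporaryDirectory() as store:
    first = run_once(store, "square-7", slow_square, 7)
    second = run_once(store, "square-7", slow_square, 7)
    # Simulate another process that already holds the lock for a second task.
    held = try_lock(os.path.join(store, "square-8.lock"))
    third = run_once(store, "square-8", slow_square, 8)
```

The second call loads the stored value instead of recomputing it, and the third call backs off because the lock is taken, so slow_square is only ever called once.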
You can obtain the final results of your computation by setting up a task that saves them to disk and loading them from there. If the results of your computation are simple enough, this might be the simplest way.
Another way, which is also the way to access the intermediate results if you want them, is to run the jug script and then access the result property of the Task object. For example,
    imgs = glob('*.png')
    features = [computefeatures(img, parameter=2) for img in imgs]
    ...
    feature_values = [feat.result for feat in features]
If the values are not accessible, this raises an exception.
Jug is an attempt to get something that works in the setting that I have found myself in: code that is embarrassingly parallel with a couple of points where all the results of previous processing are merged, often in a simple way. It is also a way for me to manage both the explosion of temporary files that plagued my code and the brittleness of making sure that all results from separate processors are merged correctly in my ad hoc scripts.
This is not an attempt to replace MPI in any way. For code that has more merge points, this won’t do. It also won’t do if the individual tasks are so small that the overhead of managing them swamps the performance gains of parallelisation. In my code, most of the time, each task takes 20 seconds to a few minutes. Just enough to make the managing time irrelevant, but fast enough that the main job can be broken into thousands of tiny pieces. As a rule of thumb, tasks that last less than 5 seconds should probably be merged together.
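One way to merge small tasks is to hand each task a batch of inputs instead of a single one. The sketch below assumes this approach. chunked, batch_size and computefeatures_batch are hypothetical helpers written for this example, and computefeatures is a trivial stand-in so that the code runs on its own. In a jugfile, computefeatures_batch would be the function wrapped with TaskGenerator.

```python
import math


def chunked(items, size):
    """Split items into consecutive lists of at most `size` elements."""
    items = list(items)
    return [items[i:i + size] for i in range(0, len(items), size)]


def batch_size(seconds_per_item, target_seconds=20):
    """How many items to group so that one task lasts about target_seconds."""
    return max(1, math.ceil(target_seconds / seconds_per_item))


def computefeatures(img, parameter):
    # Stand-in for the real feature computation, so the sketch is runnable.
    return len(img) * parameter


def computefeatures_batch(batch, parameter):
    # One task now does the work of several small ones.
    return [computefeatures(img, parameter=parameter) for img in batch]


imgs = ["img%d.png" % i for i in range(7)]
batches = chunked(imgs, 3)
per_batch = [computefeatures_batch(batch, parameter=2) for batch in batches]
features = [f for batch_result in per_batch for f in batch_result]  # flatten
size_for_half_second_items = batch_size(0.5)
```

The price is coarser granularity: if one image in a batch fails, the whole batch has to be recomputed, so the batch size is a trade-off rather than something to maximise.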
The system makes it too easy to save all intermediate results and run out of disk space.
This is still Python, not a true parallel programming language. The abstraction will sometimes leak through, for example, if you try to pass a Task to a function which expects a real value. Recall how we had to re-write the line Nr_clusters = argmin(bics) // 10 above.
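The leak can be reproduced with a toy placeholder class. This is a sketch of the general problem, not of jug's behaviour in detail: ToyTask is a hypothetical class invented for the example, argmin is a small pure-Python stand-in, and real jug Task objects may differ in which operations they support.

```python
class ToyTask:
    """Placeholder for a value that has not been computed yet (not jug's Task)."""

    def __init__(self, function, *args):
        self.function = function
        self.args = args

    def run(self):
        args = [a.run() if isinstance(a, ToyTask) else a for a in self.args]
        return self.function(*args)


def argmin(values):
    # Pure-Python stand-in: index of the smallest value.
    return min(range(len(values)), key=values.__getitem__)


def make_bics():
    # Fake scores whose minimum sits at index 23.
    return [float(abs(i - 23)) for i in range(30)]


bics = ToyTask(make_bics)

# The leak: the placeholder is not a number, so arithmetic on it fails.
try:
    ToyTask(argmin, bics) // 10
    leaked = False
except TypeError:
    leaked = True


# The fix: move the arithmetic inside a function that becomes a task itself.
def nr_clusters(values):
    return argmin(values) // 10


best = ToyTask(nr_clusters, bics).run()
```

The integer division only works once it happens inside a function that receives the real list, which is exactly why the jugfile wraps it in its own task.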