Parallelization with Python

If you have a bunch of smaller jobs you have to get through, you might think of parallelization. While some software offers native parallel execution, many programs do not. Python's multiprocessing module offers a very simple way to run such independent jobs in parallel as separate processes (no shared memory). An example:

# ----------------------------------------------------------------
# This is the worker function which will be used to run the tasks.
# ----------------------------------------------------------------
def worker( x ):
   print "Processing \"%s\" (%s)" % (x["gsfile"],x["description"])
   return # Returns "None"

# ----------------------------------------------------------------
# Main script
# ----------------------------------------------------------------
if __name__ == "__main__":

   from multiprocessing import Pool
   import sys

   # Specify number of cores (integer > 0)
   n_cores = 2

   # Parameter/data for the process. A python list. In this case a list of
   # dictionaries containing a GrADS script name (gsfile) and a human readable
   # description (description). Each list entry (each dict) will be used to
   # call the parallel worker.
   par = [{"gsfile":"t2m.gs",  "description":"2m Surface Temperature Forecasts"},
          {"gsfile":"cct.gs",  "description":"Total Cloud Cover Forecasts"},
          {"gsfile":"ff700.gs","description":"700hPa Wind Forecasts"},
          {"gsfile":"rh850.gs","description":"850hPa Relative Humidity"}]

   results = [] # to store returns from the 'worker'

   # ----------------------------------------------------------------
   # - Parallel?
   # ----------------------------------------------------------------
   if n_cores > 1:
      print("--pooling starts now--")
      pool = Pool( processes=n_cores )
      # Calls the 'worker' function in parallel using the 'par' list.
      # Each entry will be used once as a task to start a process.
      # The callback receives the list of all worker returns at once;
      # 'extend' stores them one by one (as in the sequential branch).
      r = pool.map_async(worker, par, callback=results.extend)
      r.wait() # Wait for the results
      pool.close(); pool.join()
      print("--pooling ended--")
      if not r.successful():
         # r.get() would re-raise the exception thrown by the worker
         sys.exit('Parallelization not successful')

   # ----------------------------------------------------------------
   # ... or sequential?
   # ----------------------------------------------------------------
   else:
      for p in par: results.append( worker(p) )
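
If you do not need the asynchronous interface, the same thing can be written more compactly. A minimal sketch, assuming Python 3.3+ so that Pool can be used as a context manager; pool.map simply blocks until all tasks are done:

# ----------------------------------------------------------------
# Compact variant: Pool as a context manager ('with' closes the
# pool automatically); pool.map blocks and returns the worker
# results in the same order as the input list.
# ----------------------------------------------------------------
from multiprocessing import Pool

def worker( x ):
   print("Processing \"%s\" (%s)" % (x["gsfile"], x["description"]))

if __name__ == "__main__":
   par = [{"gsfile":"t2m.gs", "description":"2m Surface Temperature Forecasts"},
          {"gsfile":"cct.gs", "description":"Total Cloud Cover Forecasts"}]
   with Pool( processes=2 ) as pool:
      results = pool.map(worker, par)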

Well, for sure the example above is relatively useless, as the worker only returns a print of the inputs it gets. However, it demonstrates the basic multiprocessing procedure. For a more practical use the worker should do something useful. I wrote this example because a friend asked me whether it would be possible to start a visualization process (using GrADS) in parallel.

GrADS is mainly used to visualize weather or climate forecasts and atmospheric analyses. GrADS is driven by its own scripting language, in other words by small .gs script files such as the ones defined in the par list in the snippet above. Within the worker you can do whatever you need; in this example we would like to execute the (Linux) binary grads with some additional arguments.

To execute system binaries, the system call from Python's os module can be used. For this, the worker has to be adapted (in contrast to the very simple worker shown above).

# ----------------------------------------------------------------
# This is the worker function which will be used to run the tasks.
# Using Python's os.system call to execute the command.
# ----------------------------------------------------------------
def worker( x ):
   import os
   os.system("grads -blx -c \"run %s\"" % x["gsfile"])
   return # Returns "None"

Please note that this is not the recommended way to go, especially since you do not have any control over possible outputs or errors produced by the system calls. An alternative is Python's subprocess library.

Advantages:

  • proper error handling
  • stdout and stderr can be fetched AND analyzed/processed if needed. You can directly fetch the output of the script and process it in Python, or you analyze the error output and take further action such as sending error log e-mails, restarting the process, or starting a fallback procedure (see the sketch after this list).
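
As an illustration of the "take further action" idea, here is a minimal, hypothetical sketch that retries a failed command a few times before giving up. It uses the newer subprocess.run convenience wrapper (capture_output/text require Python 3.7+) instead of Popen; the function name and the number of retries are made up for demonstration:

# ----------------------------------------------------------------
# Hypothetical retry wrapper: run 'cmd' up to 'n_tries' times and
# return the CompletedProcess of the first successful run (or the
# last, failed one).
# ----------------------------------------------------------------
import subprocess, time

def run_with_retry( cmd, n_tries=3 ):
   for i in range(n_tries):
      p = subprocess.run(cmd, capture_output=True, text=True)
      if p.returncode == 0:
         return p
      # Here one could e.g. log or e-mail p.stderr before retrying.
      time.sleep(1)
   return p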

subprocess.Popen takes the command in list format. An example: if you wanted to call ls -l, the call would be subprocess.Popen(['ls','-l'],...). In this example we would like to call something like grads -blx -c "run t2m.gs":

# ----------------------------------------------------------------
# This is the worker function which will be used to run the tasks.
# Uses subprocess.Popen and reports (returns) stdout, stderr,
# and the returncode of the executed command.
# ----------------------------------------------------------------
def worker( x ):
   import subprocess as sub
   import os
   myenv = os.environ.copy()
   p = sub.Popen(['grads', '-blx', '-c', 'run %s' % x["gsfile"]],
                 cwd="/path/where/grads/should/be/executed", env=myenv,
                 stdin=sub.PIPE, stdout=sub.PIPE, stderr=sub.PIPE,
                 universal_newlines=True) # text mode: strings, not bytes
   gradsout, gradserr = p.communicate()
   # Returns a dict which consists of 'out' (subprocess stdout),
   # 'err' (subprocess stderr), and 'rc' which is the subprocess
   # return code. A return code of 0 typically indicates that the
   # execution was successful; values != 0 indicate a problem
   # (depending on the executable).
   return {'out': gradsout, 'err': gradserr, 'rc': p.returncode}
