Common Uses for HTC

Monte Carlo Experiments

Monte Carlo experiments or simulations are so named because they use random inputs to a system to uncover the behavior of that system. (Monte Carlo is a famous gambling site in the French Riviera).

One of the Monte Carlo experiments that is easiest to understand yet quite paradigmatic is the calculation of pi. It begins with the simple calculation. If one has a circle with radius r, the area is pi*r2. A square that encompasses that circle has side length of 2*r and area of 4*r2. Therefore the ratio of the area of the circle over the area of the square is (pi*r2) / (4*r2) = pi/4.

Monte Carlo calculation of pi is the equivalent of throwing a dart at the square many times, then dividing the number of times it hit the circle by the total number of throws. If our dart thrower is bad enough to produce a spread of darts that is randomly and uniformly distributed across the entire square, that ratio would be pi/4. A more skilled dart thrower would produce a non-uniform distribution, which would render the simulation useless.

This task can easily be broken down in an embarrassingly parallel fashion. Each calculation of the "dart throw" is independent of the others, so we can distribute the work to as many independent processes as we like.

The main trick with this and other Monte Carlo simulations is making sure that the input to the calculation is as random as it can be and of the correct statistical distribution (uniform in this case). While it is impossible to generate truly random numbers on a computer, generation of pseudo-random numbers is well developed and provided by most programming languages. Calculation of large groups of pseudo-random numbers that is as close to truly random as possible is a major part of Monte Carlo simulation.

Caution

The standard C libraries include older random number functions rand() and srand(). These functions are considered obsolete because they produce relatively predictable and frequently repeating sequences. They have been superseded by random() and srandom(), which generate much higher quality pseudo-random sequences. Many other languages also offer multiple pseudo-random number generators of varying quality, so be sure to find out which ones will work best for your purposes.

This task is easily broken up into independent smaller tasks. Hence, a simple approach to improve our results without taking more time would be running many simulations on different computers at the same time, and then averaging the results of all of them.

However, since pseudo-random number generators follow a predictable sequence, we have to be careful with this approach.

If we split this job into N processes, each one generating 500,000 random points, and the pseudo-random number generator started with the same value for each, then each process would generate the exact same pseudo-random number sequence and ultimately the exact same estimate for pi! Averaging the results of this parallel simulation would therefore produce exactly the same answer as each individual process.

The solution to this problem is to use a different seed for each process. The seed serves as a starting point for the pseudo-random number sequence, so if we use a different seed for each process, we will get different results. In C, the pseudo-random numbers generated are integers, so some commonly used seeds are the current system clock and the process ID.

srandom((unsigned int)time(NULL));
srandom((unsigned int)getpid());
            

Below is a simple C program that approximates pi given a number of "dart throws".

/***************************************************************************
 *  Description:
 *      Estimate PI using Monte Carlo method.
 *
 *  Returns:
 *      NA
 *
 *  History: 
 *  2012-06-29  Jason Bacon Derived from calcpi-parallel.c
 ***************************************************************************/

#include <stdio.h>
#include <math.h>
#include <stdlib.h>
#include <sysexits.h>
#include <time.h>

#define TOTAL_POINTS    100000000

int     main(int argc, char *argv[])

{
    int     i;
    int     inside_circle;

    double  x;
    double  y;
    double  distance_squared;

    // Initialize counters
    inside_circle = 0;

    // Initialize pseudo-random number sequence with a different value
    // each time
    srandom(time(NULL));
    
    // Compute X random points in the quadrant, and compute how many
    // are inside the circle
    for (i = 0; i < TOTAL_POINTS; i++)
    {
        // random() and RAND_MAX are integers, so integer division will
        // occur unless we cast to a real type
        x = (double)random() / RAND_MAX;
        y = (double)random() / RAND_MAX;

        // No need to compute actual distance.  We need only know if
        // it's < 1 and it will be if distance_squared < 1.
        distance_squared = x * x + y * y;
        if (distance_squared < 1.0)
            inside_circle++;
    }

    printf("Inside circle: %d  Total: %d  PI ~= %f\n",
        inside_circle, TOTAL_POINTS, (double)inside_circle / TOTAL_POINTS * 4);

    return EX_OK;
}

Caution

The time() function has a resolution of seconds, so assuming the clocks of all the computers are in sync, it is likely that it will return the same value to many of the processes in a job array, which are all started at about the same time. Hence, many processes would produce duplicate results, rendering all but one of them useless.

Although very unlikely, the process ID returned by getpid() could by sheer coincidence be the same for processes running on different computers, since it is the Unix PID, not scheduler job/task/process ID.

For job arrays, using the scheduler job/task/process ID is a simple way to ensure a different value for every process.

Shown below is another version of the C program, calcpi-parallel.c. There are three command-line arguments as inputs:

shell-prompt: ./calcpi-parallel points process-index total-processes
            

We made the number of points a command-line argument rather than a constant in the program so that we don't have to recompile in order to run a different experiment.

The process index provides a different seed to each process. The total-processes argument is not necessary, but is useful in the program to check for sanity errors in the command such as in invalid process index. Requiring a little redundancy from the end-user can often help catch mistakes that would be hard to track down later.

The output of calcpi-parallel is the counts of points inside the circle and the total number of points. We need a separate program that averages all this output to approximate pi.

/***************************************************************************
 *  Description:
 *      Estimate PI using Monte Carlo method.  Multiple instances of
 *      this program are run in parallel, and results of all are used
 *      to estimate the value of PI.
 *
 *  Arguments:
 *      argv[1]: index for this process
 *      argv[2]: total number of processes
 *
 *  Returns:
 *      NA
 *
 *  History: 
 *  Date        Name        Modification
 *  2011-08-15  Lars Olson  Begin
 *  2011-09-27  Jason Bacon Replace rand()/srand() with random()/srandom()\
 *                          Add usage message
 *  2012-06-29  Jason Bacon Use different seed for each process to simplify
 *                          code.  Add comments and use more descriptive
 *                          variable names.
 ***************************************************************************/

#include <stdio.h>
#include <math.h>
#include <stdlib.h>
#include <sysexits.h>

void    usage(char *progname)

{
    fprintf(stderr, "Usage: %s total-points process-index total-processes\n", progname);
    exit(EX_USAGE);
}


int     main(int argc, char *argv[])

{
    int     process_index;
    int     total_processes;
    // Unsigned long can be either 32 or 64 bits and is limited to about
    // 4.3 billion if the compiler defaults to 32-bits.
    // Unsigned long long (64 or 128 bits) allows the number of points
    // to be at least 2^64-1, or 1.84*10^19.
    unsigned long long  total_points,
                        inside_circle,
                        i;

    double  x;
    double  y;
    double  distance_squared;

    char    *end_ptr;
    
    // Make sure program is invoked with 2 command line arguments
    // argc = 3 for program name + arguments
    if ( argc != 4 )
        usage(argv[0]);

    // First command line argument is which job out of the 10 it is
    total_points = strtouq(argv[1], &end_ptr, 10);
    
    // Make sure whole argument was a valid integer
    if ( *end_ptr != '\0' )
    {
        fprintf(stderr, "Error: Argument 1 (%s) is not an integer.\n", argv[1]);
        usage(argv[0]);
    }
    
    // First command line argument is which job out of the 10 it is
    process_index = strtol(argv[2], &end_ptr, 10);
    
    // Make sure whole argument was a valid integer
    if ( *end_ptr != '\0' )
    {
        fprintf(stderr, "Error: Argument 1 (%s) is not an integer.\n", argv[1]);
        usage(argv[0]);
    }
    
    // Total number of processes in the job, 10 in our examples.
    total_processes = strtol(argv[3], &end_ptr, 10);

    // Make sure whole argument was a valid integer
    if ( *end_ptr != '\0' )
    {
        fprintf(stderr, "Error: Argument 2 (%s) is not an integer.\n", argv[2]);
        usage(argv[0]);
    }
    
    if ( total_processes < 1 )
    {
        fputs("Total processes must be > 0.\n", stderr);
        exit(EX_USAGE);
    }
    
    if ( (process_index < 1) || (process_index > total_processes) )
    {
        fputs("Process index must be between 1 and total processes.\n", stderr);
        exit(EX_USAGE);
    }
    
    // Use a different seed for each process so that they don't all
    // compute the same pseudo-random sequence!
    srandom(process_index);
    
    // Initialize counters
    inside_circle = 0;

    // Compute X random points in the quadrant, and compute how many
    // are inside the circle
    for (i = 0; i < total_points; i++)
    {
        // random() and RAND_MAX are integers, so integer division will
        // occur unless we cast to a real type
        x = (double)random() / RAND_MAX;
        y = (double)random() / RAND_MAX;

        // If distance squared < or > 1.0, then distance < or > 1.0, so don't
        // waste time calculating the square root.
        distance_squared = x * x + y * y;
        if (distance_squared < 1.0)
            inside_circle++;
    }

    // %q wants unsigned long long, which can be 64 or 128 bits
    // Cast the uint64_t variables to silence gcc warnings
    printf("%llu %llu\n", inside_circle, total_points);
    
    // Debugging only: not normal output used by scripts
    // Comment out before running job
    // printf("%f\n", (double)inside_circle / total_points * 4.0);
    
    return EX_OK;
}

Software Performance Optimization

Software efficiency is a huge part of high performance and high throughput computing.

Improvements to software often result in an order of magnitude or more reduction in run time and in many cases make the use of parallel computing unnecessary.

Optimizing software before consuming expensive parallel computing resources can therefore be an ethical matter in many cases.

A little understanding of computer hardware can go a long way toward improving performance.

One prime example is understanding the difference between integers and floating point. Floating point types such as float and double (real and double precision in Fortran) are inherently more complex than integers and therefore require more time for operations such as addition and multiplication.

Floating point types are stored in a format similar to scientific notation. As you may recall, adding scientific notation values is a three-step process:

  1. Equalize the exponents
  2. Add the mantissas
  3. Normalize the result

Hence, we might expect floating point addition to take about three times as long as integer addition. Due to optimizations in floating point hardware, the difference is not that great, but it can be significant.

It pays to examine your program code and consider whether the use of floating point is really necessary. In most cases, including our calcpi program, it is not. The random() function in C produces integer values between 0 and RAND_MAX, a constant defined in the header file stdlib.h. We can just as easily perform these calculations using only integers, if we use quadrant dimensions of RAND_MAX x RAND_MAX instead of 1 x 1 and reduce the units of X and Y, so that all of our X and Y values are integers between 0 and RAND_MAX.

A modified version of our calcpi program using only integers is shown below:

/***************************************************************************
 *  Description:
 *      Estimate PI using Monte Carlo method.  Multiple instances of
 *      this program are run in parallel, and results of all are used
 *      to estimate the value of PI.
 *
 *  Arguments:
 *      argv[1]: index for this process
 *      argv[2]: total number of processes
 *
 *  Returns:
 *      NA
 *
 *  History: 
 *  Date        Name        Modification
 *  2011-08-15  Lars Olson  Begin
 *  2011-09-27  Jason Bacon Replace rand()/srand() with random()/srandom()\
 *                          Add usage message
 *  2012-06-29  Jason Bacon Use different seed for each process to simplify
 *                          code.  Add comments and use more descriptive
 *                          variable names.
 ***************************************************************************/

#include <stdio.h>
#include <math.h>
#include <stdlib.h>
#include <sysexits.h>
#include <inttypes.h>

void    usage(char *progname)

{
    fprintf(stderr, "Usage: %s total-points process-index total-processes\n", progname);
    exit(EX_USAGE);
}


int     main(int argc, char *argv[])

{
    int     process_index;
    int     total_processes;
    // Unsigned long can be either 32 or 64 bits and is limited to about
    // 4.3 billion if the compiler defaults to 32-bits.
    // Unsigned long long (64 or 128 bits) allows the number of points
    // to be at least 2^64-1, or 1.84*10^19.
    // The printf() function has no standard placeholders for uint64_t,
    // so we'll use unsigned long long here to avoid compiler warnings.
    // This won't affect performance much, since these variables are only
    // incremented in the main loop.
    unsigned long long  total_points,
                        inside_circle,
                        i;

    // random() returns 32-bit values.  The square of a 32-bit value can
    // be up to 64-bits long, so make sure the program uses 64-bit integers
    // on all processors and for all calculations.
    uint64_t    x;
    uint64_t    y;
    uint64_t    distance_squared,
                // RAND_MAX is a 32-bit integer, so cast it to 64 bits
                // before multiplying to avoid a truncated result.
                rand_max_squared = (uint64_t)RAND_MAX * (uint64_t)RAND_MAX;

    char    *end_ptr;
    
    // Make sure program is invoked with 2 command line arguments
    // argc = 3 for program name + arguments
    if ( argc != 4 )
        usage(argv[0]);

    // First command line argument is which job out of the 10 it is
    total_points = strtouq(argv[1], &end_ptr, 10);
    
    // Make sure whole argument was a valid integer
    if ( *end_ptr != '\0' )
    {
        fprintf(stderr, "Error: Argument 1 (%s) is not an integer.\n", argv[1]);
        usage(argv[0]);
    }
    
    // First command line argument is which job out of the 10 it is
    process_index = strtol(argv[2], &end_ptr, 10);
    
    // Make sure whole argument was a valid integer
    if ( *end_ptr != '\0' )
    {
        fprintf(stderr, "Error: Argument 1 (%s) is not an integer.\n", argv[1]);
        usage(argv[0]);
    }
    
    // Total number of processes in the job, 10 in our examples.
    total_processes = strtol(argv[3], &end_ptr, 10);

    // Make sure whole argument was a valid integer
    if ( *end_ptr != '\0' )
    {
        fprintf(stderr, "Error: Argument 2 (%s) is not an integer.\n", argv[2]);
        usage(argv[0]);
    }
    
    if ( total_processes < 1 )
    {
        fputs("Total processes must be > 0.\n", stderr);
        exit(EX_USAGE);
    }
    
    if ( (process_index < 1) || (process_index > total_processes) )
    {
        fputs("Process index must be between 1 and total processes.\n", stderr);
        exit(EX_USAGE);
    }
    
    // Use a different seed for each process so that they don't all
    // compute the same pseudo-random sequence!
    srandom(process_index);
    
    // Initialize counters
    inside_circle = 0;

    // Compute X random points in the quadrant, and compute how many
    // are inside the circle
    for (i = 0; i < total_points; i++)
    {
        x = (uint64_t)random();
        y = (uint64_t)random();

        // If distance squared is < or > randmax_squared, then distance is
        // < or > randmax, so don't waste time calculating the square root.
        distance_squared = x * x + y * y;
        if (distance_squared < rand_max_squared)
            inside_circle++;
    }

    // %q wants unsigned long long, which can be 64 or 128 bits
    printf("%llu %llu\n", inside_circle, total_points);
    
    // Debugging only: not normal output used by scripts
    // Comment out before running job
    // printf("%f\n", (double)inside_circle / total_points * 4.0);
    
    return EX_OK;
}

Timing this program on an AMD64 system shows that it is more than twice as fast as the earlier floating point version.

Note, however, that this calcpi requires the use of 64-bit integers. 32-bit processors can only do integer operations of 32 bits at a time, although most can to 64-bit floating point operations all at once.

When using 64-bit integers on a 32-bit processor, we don't see the same benefit, since the 32-bit processor has to do the 64-bit integer operations in two steps. However, on a 32-bit Pentium 4 processor, the integer calcpi still performed about 25% faster than the floating point version. If you're looking at a month of calculations, the switch to integer arithmetic would still knock off about a week.

Furthermore, many programs can get by with 32-bit integers, in which case you'll realize the full performance benefit even on 32-bit hardware.

Calcpi on a SLURM Cluster

When compiling a program on a cluster or grid, it should be compiled on the same node(s) it will run on to ensure that it is built and run in the exact same environment and in the presence of the exact same libraries and compiler tools. The head node necessarily has different software installed, which may cause a program to be compiled differently than it would be on a compute node. Hence, we use a simple sbatch script to do the compilation:

#!/bin/sh -e

# Show commands in output
set -x
cc -O -o calcpi-parallel-integer calcpi-parallel-integer.c

peregrine: sbatch calcpi-parallel-build.sbatch
                

To run the program in embarrassingly parallel fashion, we use a facility in SLURM and other queuing systems to run multiple instances of a single program, where each can receive different command line arguments and/or inputs. Consider the following SLURM submit script:

#!/bin/sh -e

#SBATCH --array=1-20
#SBATCH --output=calcpi-parallel-%a.out
#SBATCH --error=calcpi-parallel-%a.err

points=1000000000

# Make sure last argument matches total number of jobs in array!
./calcpi-parallel-integer $points $SLURM_ARRAY_TASK_ID 20

This script starts up an embarrassingly parallel job, known as a job array. The submit script is executed almost simultaneously on multiple cores in the cluster, one for each value in the range specified with --array.

The element ${SLURM_ARRAY_TASK_ID} is a reference to an environment variable which is set by the SLURM scheduler to a different value for each job in the job array. With --array=1-10, SLURM_ARRAY_TASK_ID will be 1 for one process, 2 for another, and so on up to 10.

Each process in the job array will also produce a separate output and error file with the value of SLURM_ARRAY_TASK_ID appended. I.e., there will be files called calcpi-parallel-1.out, calcpi-parallel-2.out, etc.

After all the processes in the job array complete, the output can be tallied using a simple script such as the following:

#!/bin/sh

# If this is non-trivial, it should be done by submitting another job
# rather than run on the submit node.

# Results are on last line of each output file calcpi-parallel.out-1, etc.
# Both the SLURM and HTCondor scripts must name their output files to match
# this script.

# Send last line of each output file through a simple awk script that
# sums up each column and prints the quotient of the two sums.
tail -1 -q calcpi-parallel-*.out | awk \
'BEGIN  {
            in_circle=0;
            total=0;
        }
        {
            in_circle+=$1;
            total+=$2;
        }
END     {
            printf("%0.20f\n", in_circle / total * 4.0);
        }'

Note

If a script such as the tally script above is trivial, some users may choose to run it on the head node or on their own computer after transferring the results from the cluster. If it requires any significant amount of CPU time or memory, however, it should be scheduled as a batch-serial job.

Another script (to be run directly, not submitted to the scheduler) could be used to clean up from previous runs and submit the job again:

#!/bin/sh -e

##########################################################################
#   Script description:
#       Run a set of SLURM jobs to estimate PI, and combine the results.
#       An alternative to this approach is a DAG.
#
#   Arguments:
#       None.
#
#   Returns:
#       0, or status of first failed command.
#
#   History:
#   Date        Name        Modification
#   2013-01-29  Jason Bacon Begin
##########################################################################

##########################################################################
#   Main
##########################################################################

# Remove old output and log files
./clean

# Build program
sbatch calcpi-parallel-build.sbatch

# Submit computational jobs
sbatch calcpi-parallel-run.sbatch

Finally, the directory can be cleaned up using yet another script that we run directly:

#!/bin/sh

rm -f calcpi-parallel *.out* *.err* \
    calcpi-parallel.[0-9]* \
    calcpi-parallel-condor.log

Calcpi on an HTCondor Grid

Recall from Chapter 10, Job Scheduling with HTCondor that HTCondor is a scheduling tool like SLURM, PBS or LSF. Unlike most schedulers, which are geared toward HPC clusters built entirely with dedicated hardware, HTCondor is specifically designed for grids that utilize a variety of hardware owned by a variety of people or departments, such desktop machines throughout your institution.

On the cluster, where all compute nodes run the same operating system and have access to the same files, we ran a separate build script to compile our code on one node, and then ran that executable on all nodes.

Since grids do not typically have shared file space, and are often heterogeneous (use different hardware and/or operating systems on various execute hosts), our approach to running code must be different.

If we are running software that's preinstalled on the HTCondor compute hosts, as is often the case with programs like Octave and R, we must be aware that it may not behave identically on all hosts. Some hosts may be running a 32-bit operating system, limited to a few gigabytes of RAM, while others run 64-bit operating systems. There may be different versions of the software installed on different hosts.

These are additional issues we may need to deal with when using an HTCondor grid, but they are typically not difficult to resolve.

Note

In HTCondor lingo, an execute host is equivalent to what we call compute nodes in a cluster.

Since we typically have no shared file system, we may need to transfer the program and input files to every execute host we use, and bring back any output files when the processes are done. All of this is handled automatically by HTCondor.

Since the execute hosts may have different hardware and operating systems, we can't compile the program on one of them and expect it to run on all of them. Instead, it's a common practice to transfer the source code to each execute host and compile it there as part of the job. This ensures that every binary file is compatible with the host it's running on.

An HTCondor description file for running calcpi-parallel.c might appear as follows:

############################################################################
# Sample condor submit description file.
#
# Use \ to continue an entry on the next line.
#
# You can query your jobs by command:
# condor_q 

############################################################################
# Choose which universe you want your program is running with 
# Available options are 
#
# - standard:
#      Defaults to transfer executables and files.
#      Use when you are running your own script or program.
#   
# - vanilla:
# - grid:
#      Explicitly enable file transfer mechanisms with
#      'transfer_executable', etc.
#      Use when you are using your own files and some installed on the
#      execute hosts.
#
# - java:
#      Explicitly enable file transfer mechanism. Used for java jobs.
#
# - scheduler 
# - local 
#
# - parallel:
#      Explicitly enable file transfer mechanism. Used for MPI jobs.
#
# - vm 
#      Refer http://research.cs.wisc.edu/condor/manual/v7.6/2_4Road_map
#      Running.html for detailes.

universe = vanilla 

# Macros (variables) to use in this submit description file
Points = 1000000000
Process_count = 100

############################################################################
# Specify the executable filename.  This can be a binary file or a script.
# NOTE: The POVB execute hosts currently support 32-bit executables only.

executable = calcpi-parallel-condor.sh

# Command-line arguments for the execute command
# arguments = 

############################################################################
# Set environment variables for use by the executable on the execute hosts.
# Enclose the entire environment string in quotes.
# A variable assignment is var=value (no space around =).
# Separate variable assignments with whitespace.

environment = "Process=$(Process) Process_count=$(Process_count) Points=$(Points)"

############################################################################
# Where the standard output and standard error from executables go.
# $(Process) is current job ID.

# We run calcpi-parallel under both SLURM and Condor, so we use same output
# names here as in calcpi-parallel-run.sbatch so that we can use the same
# script to tally all the outputs from any run.

output = calcpi-parallel-$(Process).out
error = calcpi-parallel-$(Process).err

############################################################################
# Logs for the job, produced by condor.  This contains output from
# Condor, not from the executable.

log = calcpi-parallel-condor.log 

############################################################################
# Custome job requirements 
# Condor assumes job requirements from the host submitting job. 
# IT DOES NOT DEFAULT TO ACCEPTING ANY ARCH OR OPSYS!!!
# For example, if the jobs is submitted from peregrine, target.arch is
# "X86_64" and target.opsys is "FREEBSD8", which do not match
# POVB execute hosts.
#
# You can query if your submitting host is accepted by command:
#  condor_q -analyze

# Memory requirements in megabytes
request_memory = 1000

# Requirements for a binary compiled on 32-bit CentOS 4 (POVB hosts):
# requirements = (target.arch == "INTEL") && (target.opsys == "LINUX")

# Requirements for a Unix shell script or Unix program compiled on the
# execute host:
requirements = ((target.arch == "INTEL") || (target.arch == "X86_64")) && \
               ((target.opsys == "FREEBSD") || (target.opsys == "LINUX"))

# Requirements for a job utilizing software installed via FreeBSD ports:
# requirements = ((target.arch == "INTEL") || (target.arch == "X86_64")) && \
#    (target.opsys == "FREEBSD")

############################################################################
# Explicitly enable executable transfer mechanism for vanilla universe.

# true | false
transfer_executable = true

# yes | no | if_needed
should_transfer_files = if_needed

# All files to be transferred to the execute hosts in addition to the
# executable.
transfer_input_files = calcpi-parallel.c

# All files to be transferred back from the execute hosts in addition to
# those listed in "output" and "error".
# transfer_output_files = file1,file2,...

# on_exit | on_exit_or_evict
when_to_transfer_output = on_exit 

############################################################################
# Specify how many jobs you would like to submit to the queue.

queue $(Process_count)

The executable script:

#!/bin/sh -e

# Use Bourne shell (/bin/sh) tomaximize portability of this script.
# There is no guarantee that other shells will be installed on a given host.

# Use /bin/sh -e to quit on the first error, so we don't try to run
# subsequent commands after a command in this script already failed.

# This script is meant to run on any Unix system (FreeBSD, Linux, Solaris,
# etc.), so use only generic, portable Unix commands and flags.
# (e.g. cc instead of gcc or icc).

# Bourne shell has a bare-bones PATH that won't find some add-on software.
# Also, startup scripts are not executed by Bourne shells under condor,
# so local additions to PATH in /etc/* and ~/.* will not be picked up.
# If you know the path of programs you use on certain hosts, add it here.
# /usr/libexec      Dependencies for some standard tools
# /usr/local/bin    FreeBSD ports
# /opt/local/bin    MacPorts
# /sw/bin           Fink
PATH=${PATH}:/usr/libexec:/usr/local/bin:/opt/local/bin:/sw/bin
export PATH

# Some slots may be on the same execute node so use a different executable
# name for each process.  Compilation could fail if both compiles try
# to write the same executable file at the same time.
cc -o calcpi-parallel.$Process calcpi-parallel.c

# Condor process IDs start at 0, and calcpi-parallel expects indexes
# to start at one.
./calcpi-parallel.$Process $Points $((Process+1)) $Process_count

If we have designed our scripts carefully, so that they use the same output filename, etc., then we can use the same tally and cleanup scripts that we used with SLURM:

#!/bin/sh

# If this is non-trivial, it should be done by submitting another job
# rather than run on the submit node.

# Results are on last line of each output file calcpi-parallel.out-1, etc.
# Both the SLURM and HTCondor scripts must name their output files to match
# this script.

# Send last line of each output file through a simple awk script that
# sums up each column and prints the quotient of the two sums.
tail -1 -q calcpi-parallel-*.out | awk \
'BEGIN  {
            in_circle=0;
            total=0;
        }
        {
            in_circle+=$1;
            total+=$2;
        }
END     {
            printf("%0.20f\n", in_circle / total * 4.0);
        }'

#!/bin/sh

rm -f calcpi-parallel *.out* *.err* \
    calcpi-parallel.[0-9]* \
    calcpi-parallel-condor.log

A simple shell script can also be used to automate the execution of the HTCondor jobs:

#!/bin/sh -e

##########################################################################
#   Script description:
#       Run a set of condor jobs to estimate PI, and combine the results.
#       An alternative to this approach is a DAG.
#
#   Arguments:
#       None.
#
#   Returns:
#       0, or status of first failed command.
#
#   History:
#   Date        Name        Modification
#   2013-01-29  Jason Bacon Begin
##########################################################################

##########################################################################
#   Main
##########################################################################

# Remove old output and log files
./clean

# Submit computational jobs
condor_submit calcpi-parallel.condor

As an alternative to the shell script above, there is also a companion tool for HTCondor called DAGMan, which can be used to automate work flows (sequences of HTCondor jobs and other tasks). DAGMan uses DAGs (Directed Acyclic Graphs) to represent work flows. Some users may find this visual approach preferable to a script, especially for complex work flows.

Parameter Sweeps and File Transfer

One way in which the previous calcpi job is efficient is that it minimizes the amount of traffic passed throughout the cluster or grid. Ideally, only N small files each with 2 integers in them were passed back to the head node or written to the shared file system of the cluster or grid. This was made possible by the fact that the calcpi program generates its own input in the form of pseudo-random numbers.

That kind of efficiency is not always possible and oftentimes one needs to pass input files from the head node to those working on the jobs.

An example of this is illustrated by a class of problems known as parameter sweeps. A parameter sweep is similar to a Monte Carlo simulation, in that the same program is run on a variety of inputs.

However, the inputs in a parameter sweep are not usually random, nor are they necessarily even computable by the program. The inputs might be a range of values, and hence computable, or they may consist of vast amounts of previously collected or generated data stored in files.

In some cases, we know the correct output and need to find the input or inputs that will produce it. An example of this would be password cracking. Passwords are stored in a non-reversible encrypted form, which is often easy to obtain by reading a password file or eavesdropping on a network connection. It is not possible to decrypt a password directly (convert the encrypted form back to the raw form). However, it is relatively easy to encrypt (convert the raw form to the encrypted form), although such one-way encryption algorithms are deliberately designed to be expensive in order to slow down parameter sweeps aimed at password cracking.

In other cases, we might be given a vast amount of data collected by a scientific instrument such as a telescope, a DNA sequencer, or a microphone. Vast amounts of data could also come in the form of a large collection of electronic documents such as research papers, court documents, or medical records.

In cases like this, we will need to distribute the data to the compute nodes so that each process can work on a portion of the inputs.

When working on a cluster with a shared file system, we may be able to get away with leaving all the data in a shared directory where all of the processes can access it. If the amount of data and the number of processes working on it are very large, however, access to the shared file system can become a serious bottleneck. This will not only defeat the purpose of parallel computing for you, but will also seriously impact other users on the cluster.

In cases like this, it would be better to prestage the data, i.e. divvy it out to local disks on the compute nodes before computations begin. This allows the individual disks on the compute nodes to work in parallel as do the CPUs and memory.

Prestaging is especially important if the data files will be read more than once. However, even if they'll only be read once, prestaging properly will prevent impacting the shared file system while the job runs.

Note, however, that individual disks on compute nodes are typically slower than the RAID arrays used for shared space. Also, compute nodes with many cores may have processes competing for the same local disks. All these factors must be considered when deciding how to prestage the data and how to distribute the processes across the nodes. For example, when running a data-intensive parallel job in a SLURM environment, we might ask the scheduler to run one process per node to ensure that each process is using a different local disk.

On a grid with no shared file system, prestaging the data is a requirement in all cases. For this reason, HTCondor has built-in facilities for file transfer both to and from the execute hosts.

Calcpi-file on a SLURM Cluster

For the sake of efficient learning, we will reuse the calcpi example to illustrate file transfer. In reality, no one would choose to provide inputs to calcpi this way, but using a familiar example should make it easier for the reader to follow, since new material is limited to just the process of using input files.

For this example we will calculate the random numbers first, and stores them in N files. Then we submit an array of N processes running a modified calcpi program that reads an input file instead of generating its own random numbers and outputs the same two numbers as the previous calcpi program does. There are several steps in this overall work flow:

  1. Create N data files containing different pseudo-random number sequences.
  2. Create a new submission script to run a job array with N different file names as inputs.
  3. If there is no shared file system, or if using a shared file system would cause a bottleneck, prestage the N files to the compute nodes (or execute hosts).
  4. Submit the job(s).
  5. Tally the results.

This new work flow requires a new program called calcpi-file.c.

/***************************************************************************
 *  Description:
 *      Estimate PI using Monte Carlo method.
 *
 *  Returns:
 *      NA
 *
 *  History: 
 *  2012-07-07  Jason Bacon Derived from calcpi-parallel.c
 ***************************************************************************/

#include <stdio.h>
#include <math.h>
#include <stdlib.h>
#include <string.h>
#include <sysexits.h>

void    usage(char *progname)

{
    fprintf(stderr, "Usage: %s filename\n", progname);
    exit(EX_USAGE);
}


int     main(int argc, char *argv[])

{
    unsigned long long  i,
                        inside_circle,
                        total_points;
    extern int  errno;

    double  x;
    double  y;
    double  distance_squared;

    char    *filename;
    
    FILE    *points_file;
    
    if ( argc != 2 )
        usage(argv[0]);
    
    // No need to copy the string, just point to it
    filename = argv[1];
    
    points_file = fopen(filename, "r");
    if ( points_file == NULL )
    {
        fprintf(stderr, "Could not open %s: %s\n", filename, strerror(errno));
        exit(EX_NOINPUT);
    }
    
    if ( fscanf(points_file, "%llu\n", &total_points) != 1 )
    {
        fprintf(stderr, "Failed to read total_points from %s: %s\n",
            filename, strerror(errno));
        exit(EX_NOINPUT);
    }
    
    // Initialize counters
    inside_circle = 0;

    // Compute X random points in the quadrant, and compute how many
    // are inside the circle
    for (i = 0; i < total_points; ++i)
    {
        if ( fscanf(points_file, "%lg %lg\n", &x, &y) != 2 )
        {
            fprintf(stderr, "Failed to read point %llu from %s: %s\n",
                i, filename, strerror(errno));
            exit(EX_NOINPUT);
        }
        
        distance_squared = x * x + y * y;
        if (distance_squared < 1.0)
            inside_circle++;
    }
    
    // Error checking an fclose() is really only useful when writing to
    // a file, but we do it anyway to promote good habits.
    if ( fclose(points_file) != 0 )
    {
        fprintf(stderr, "Failed to close %s: %s\n",
            filename, strerror(errno));
        exit(EX_NOINPUT);
    }
    
    printf("%llu %llu\n", inside_circle, total_points);

    return EX_OK;
}

This program is executed as follows:

peregrine: ./calcpi-file rand1.txt
                

An example input file:

10
0.17667887507783197609 0.07581055493830263226
0.19486713744460937292 0.98018012101770379818
0.83923330383339589389 0.04710093003097033659
0.01152008865565065654 0.36655766114897919694
0.01075634686777198097 0.44170310275708468684
0.59835347514522885248 0.87296829506427431333
0.99172840173902376826 0.05354796585326453834
0.59876983128430782966 0.59682797808052412414
0.12375027133326524376 0.20448832502797634203
0.81670404123920203876 0.59261837768956016070

Note

This program doesn't need to receive the number of data points as an argument. That information is determined by reading the input file.

The input files are generated by a separate program:

/***************************************************************************
 *  Description:
 *      Generate random (x,y) tuples.
 *
 *  Usage:
 *      gen-points count > file
 *
 *  Returns:
 *      NA
 *
 *  History: 
 *  2012-07-07  Jason Bacon Derived from calcpi.c
 ***************************************************************************/

#include <stdio.h>
#include <math.h>
#include <stdlib.h>
#include <sysexits.h>
#include <time.h>
#include <sys/types.h>
#include <unistd.h>

void    usage(char *arg0)

{
    fprintf(stderr, "Usage: %s count\n", arg0);
    exit(EX_USAGE);
}


int     main(int argc, char *argv[])

{
    int     i;
    int     total_points;

    double  x;
    double  y;
    
    char    *end_ptr;

    if ( argc != 2 )
        usage(argv[0]);
    
    // Total number of points in the job, 10 in our examples.
    total_points = strtol(argv[1], &end_ptr, 10);

    // Make sure whole argument was a valid integer
    if ( *end_ptr != '\0' )
    {
        fprintf(stderr, "Error: Argument 1 (%s) is not an integer.\n",
            argv[1]);
        usage(argv[0]);
    }
    
    if ( total_points < 1 )
    {
        fputs("Total points must be > 0.\n", stderr);
        exit(EX_USAGE);
    }
    
    // First line of output is the total number of points
    // Not strictly necessary, but provides a level of error-detection
    // in case a file is truncated.
    printf("%d\n", total_points);

    // Initialize pseudo-random number sequence with a different value
    // each time
    srandom((unsigned long)getpid());
    
    // Compute X random points in the quadrant, and compute how many
    // are inside the circle
    for (i = 0; i < total_points; i++)
    {
        // random() and RAND_MAX are integers, so integer division will
        // occur unless we cast to a real type
        x = (double)random() / RAND_MAX;
        y = (double)random() / RAND_MAX;

        // Print to 20 sig-figs, slightly more than most CPUs can use
        printf("%0.20f %0.20f\n", x, y);
    }

    return EX_OK;
}

We can generate N input files with a simple script such as the following:

#!/bin/sh

printf "Compiling...\n"
cc -O -o gen-points gen-points.c
i=0
while [ $i -lt 10 ]; do
    printf "Generaring rand$i.txt...\n"
    ./gen-points 100 > rand$i.txt
    i=$((i+1))
done

The SLURM submission script:

#!/bin/sh -e

#SBATCH --output=calcpi-parallel.out
#SBATCH --error=calcpi-parallel.err
#SBATCH --array=1-10

./calcpi-file rand$SLURM_ARRAY_TASK_ID.txt

Calcpi-file on an HTCondor Grid

To run a job like this on an HTCondor grid, the trick is to transfer the appropriate input file(s) to each of the execute hosts.

Since we're compiling the C program on the execute host, we use a shell script as the executable, and list the source file as an input file, along with the actual input file for the program. The shell script is executed on the execute host, where it first compiles the program and then runs it. Below is the job description file:

############################################################################
# Sample HTCondor submit description file.
#
# Use \ to continue an entry on the next line.
#
# You can query your jobs by command:
# condor_q 

############################################################################
# Choose which universe you want your program is running with 
# Available options are 
#
# - standard:
#      Defaults to transfer executables and files.
#      Use when you are running your own script or program.
#   
# - vanilla:
# - grid:
#      Explicitly enable file transfer mechanisms with
#      'transfer_executable', etc.
#      Use when you are using your own files and some installed on the
#      execute hosts.
#
# - parallel:
#      Explicitly enable file transfer mechanism. Used for MPI jobs.

universe = vanilla 

# Macros (variables) to use in this submit description file
Process_count = 10

############################################################################
# Specify the executable filename.  This can be a binary file or a script.
# NOTE: The POVB execute hosts currently support 32-bit executables only.
# If compiling a program on the execute hosts, this script should compile
# and run the program.
#
# In calcpi-file-condor.sh, be sure to give the executable a different
# name for each process, since multiple processes could be on the same host.
# E.g. cc -O -o prog.$(Process) prog.c

executable = calcpi-file-condor.sh

# Pass the process index so that we can use different inputs for each
arguments = $(Process)

############################################################################
# Where the standard output and standard error from executables go.
# $(Process) is current job ID.

# If running calcpi-file under both PBS and HTCondor, use same output
# names here as in calcpi-file-run.pbs so that we can use the same
# script to tally all the outputs from any run.

output = calcpi-parallel.out-$(Process)
error = calcpi-parallel.err-$(Process)

############################################################################
# Logs for the job, produced by HTCondor.  This contains output from
# HTCondor, not from the executable.

log = calcpi-file-condor.log 

############################################################################
# Custome job requirements 
# HTCondor assumes job requirements from the host submitting job. 
# IT DOES NOT DEFAULT TO ACCEPTING ANY ARCH OR OPSYS!!!
# For example, if the jobs is submitted from peregrine, target.arch is
# "X86_64" and target.opsys is "FREEBSD8", which do not match
# POVB execute hosts.
#
# You can query if your submitting host is accepted by command:
#  condor_q -analyze

# Memory requirements in megabytes
request_memory = 50

# Requirements for a binary compiled on CentOS 4 (POVB hosts):
# requirements = (target.arch == "INTEL") && (target.opsys == "LINUX")

# Requirements for a Unix shell script or Unix program compiled on the
# execute host:
requirements = ((target.arch == "INTEL") || (target.arch == "X86_64")) && \
               ((target.opsys == "FREEBSD") || (target.opsys == "LINUX"))

# Requirements for a job utilizing software installed via FreeBSD ports:
# requirements = ((target.arch == "INTEL") || (target.arch == "X86_64")) && \
#    (target.opsys == "FREEBSD")

############################################################################
# Explicitly enable executable transfer mechanism for vanilla universe.

# true | false
transfer_executable = true

# yes | no | if_needed
should_transfer_files = if_needed

# All files to be transferred to the execute hosts in addition to the
# executable.  If compiling on the execute hosts, list the source file(s)
# here, and put the compile command in the executable script.
transfer_input_files = calcpi-file.c,rand$(Process).txt

# All files to be transferred back from the execute hosts in addition to
# those listed in "output" and "error".
# transfer_output_files = file1,file2,...

# on_exit | on_exit_or_evict
when_to_transfer_output = on_exit 

############################################################################
# Specify how many jobs you would like to submit to the queue.

queue $(Process_count)

The executable, called calcpi-file-condor.sh, would look something like this:

#!/bin/sh

# Make sure the script was invoked with a filename as a command-line argument
if [ $# != 1 ]; then
    printf "Usage: $0 process-ID\n"
    exit 1
fi

# First command-line argument is the process index
process=$1

# Compile
cc -O -o calcpi-file.$process calcpi-file.c

# Pass the argument to this script on to the C program
./calcpi-file.$process rand$process.txt

Self-test
  1. What is a Monte Carlo experiment?
  2. Write a program and a submit script that uses multiple independent processes estimate the probability of rolling a 7 on two dice. This can, or course, be computed directly using simple statistics, but serves as a good exercise for the Monte Carlo method.

    Run the simulation using each of the following parameters. Run with each set of parameters several times, noting the consistency of the results and the computation time for each.

    • One process rolling the dice 100 times.
    • One process rolling the dice 100,000,000 times.
    • Ten processes rolling the dice 1,000,000,000 times each. Make sure each process uses a different random number sequence! (Hint: Use the job array index as a seed.)