Julia for Parallel Processing

Before anything else, we decided to gain some experience in the use of Julia's parallel processing capabilities.

Julia supports multi-threading and distributed computing. Multi-threading allows multiple tasks to run simultaneously on more than one thread or CPU core, sharing memory, while distributed computing runs multiple processes with separate memory spaces, possibly on different machines. Given that Khepri is not thread-safe, we were particularly interested in testing the distributed computing capabilities. These are provided by the Distributed standard library as well as external packages such as MPI.jl and DistributedArrays.jl.
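As a point of reference, both modes can be queried from a running Julia session; the actual numbers depend on how Julia was started (e.g., the -t flag or the JULIA_NUM_THREADS variable for threads, and the -p flag for worker processes):

# Number of threads available to this process (threads share memory).
Threads.nthreads()

# Number of worker processes (each with its own memory space).
using Distributed
nworkers()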

The Distributed approach is based on the idea that one master process launches, using the addprocs function, a set of slave processes, called workers, to which it distributes units of work, waiting for their completion. Despite being part of the standard library, the Distributed module must be explicitly loaded on the master process to access the addprocs function. The module is automatically loaded on the worker processes.
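As a minimal sketch of this model (here assuming four local workers):

using Distributed       # must be loaded explicitly on the master

addprocs(4)             # launch four worker processes on the local machine
workers()               # ids of the workers, e.g. [2, 3, 4, 5]
nprocs()                # total number of processes (master + workers), e.g. 5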

An Embarrassingly Parallel Problem

The first tests attempted to determine how the Julia language implementation scales across multiple cores. The following Julia program computes an approximation of $\pi$ using the classical Monte Carlo approach of sampling points on a square that circumscribes a circle of radius $r$. In that case, the area of the circle is $\pi r^2$ while the area of the square is $(2r)^2=4r^2$. The ratio between these areas is $\frac{\pi r^2}{4r^2}=\frac{\pi}{4}$, which means that, independently of the radius of the circle, a fraction $\frac{\pi}{4}$ of a uniformly distributed sample of points over the square will fall inside the circle.

Instead of computing the approximation to $\pi$ (whose value we already know), we preferred to compute the absolute error.

using Statistics

# Monte Carlo approximation of pi: sample n points uniformly in the unit
# square and count those falling inside the quarter circle of radius 1.
# The function returns the absolute error of the resulting approximation.
approx_pi(n) =
  let count = 0
    for i in 1:n
      count += (rand()^2 + rand()^2) <= 1
    end
    abs(4*count/n - pi)
  end

Given the just-in-time compilation strategy used by Julia, we decided to do some preliminary computations to force compilation and only then start the benchmark. The benchmark computes approximations to $\pi$ with an exponentially increasing number of iterations. We evaluate the time spent with @timev:

# Force compilation:
approx_pi(100)

for i in 1:10
  println("approx_pi($(10^i)):", @timev approx_pi(10^i))
end

In order to run the code, we saved all of it in a file ApproxPi.jl and we created a Slurm batch file named BatchPi.sh, containing the following:

#!/bin/bash

#SBATCH --job-name=SeqPi
#SBATCH --time=2:00:00
#SBATCH --nodes=1
#SBATCH -p ...
#SBATCH -q ...

time ./julia ApproxPi.jl

To launch the Slurm batch file, we simply used sbatch BatchPi.sh. The benchmark results are the following:

It is relevant to note that despite the apparent exponential growth, the scale is logarithmic and, thus, the computation time grows linearly with the number of iterations.

The next step was to repeat the same computation but using parallel processing. To that end, we decided to use a slightly different BatchPi.sh Slurm batch file:

#!/bin/bash

#SBATCH --job-name=ParPi
#SBATCH --time=2:00:00
#SBATCH --nodes=1
#SBATCH -p ...
#SBATCH -q ...

time ./julia ApproxPi.jl $SLURM_NTASKS

Note that we provided the Julia program with the number of tasks that it can use, even though we did not specify that number in the Slurm script. We did so simply because it was easier to provide that information in the sbatch call, e.g., sbatch --ntasks=8 BatchPi.sh.

The next fragment of code demonstrates the creation of workers. We take the number of processes $n$ that was passed as a command-line argument and we create $n-1$ additional workers so that the master and the slaves use all available resources.

using Distributed

addprocs(parse(Int, ARGS[1])-1)

Despite the simplicity of the addprocs function, there is a lot going on behind the scenes. Immediately after its creation, each worker opens a TCP/IP socket, writes the corresponding host and port on its standard output, and starts listening on that port. The master receives the output of each worker and completes the TCP/IP connection to each one. Then, it informs each worker of the connection point of all other workers, so that they can establish connections among themselves. To do this, each worker connects to all workers whose id is less than its own id.
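Incidentally, when the all-to-all connections are not needed, addprocs accepts a topology keyword argument that restricts which connections are established; a brief sketch:

using Distributed

# Default behavior: every process connects to every other process.
addprocs(4)                            # topology = :all_to_all

# If workers only need to talk to the master, fewer connections suffice:
# addprocs(4; topology=:master_worker)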

Given that all these connections take time to establish, we decided to measure their impact on time and allocated memory as the number of processes increases. The results are the following:

This shows that using multiple processes entails an overhead with a fixed part of around one second and a variable part of 0.01 seconds per process.
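One way to obtain such a measurement is to time the addprocs call itself with @timev, which reports both the elapsed time and the allocated memory. A sketch, reusing the command-line argument of the previous fragment:

using Distributed

# Measure the time and memory spent launching the workers.
@timev addprocs(parse(Int, ARGS[1])-1)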

The following step was to divide the work among the workers. To that end, each one needs to know what it is supposed to compute, and this is another area where Julia shines. The macro @everywhere allows the master to make requests to all workers. In our case, we need each worker to define the function approx_pi that computes the π\pi approximation using the Monte Carlo method. To force Just-In-Time compilation on each worker, we also request a corresponding function call.

@everywhere approx_pi(n) =
  let count = 0
    for i in 1:n
      count += (rand()^2 + rand()^2) <= 1
    end
    abs(4*count/n - pi)
  end

# Force compilation:
@everywhere approx_pi(100)

Finally, to compute the approximation, we take advantage of Julia's higher-level parallel operations, namely pmap, which maps a function over an array but arranges for the applications to be done in parallel by distributing the array values among the workers. We defined the p_approx_pi function, which organizes the distribution by dividing the intended number of iterations among the workers and then computing the mean of the results returned by the workers.

# Give each worker an equal share of the iterations and average the results.
p_approx_pi(n) =
  mean(pmap(approx_pi,
            [div(n, nworkers()) for i in 1:nworkers()]))

To evaluate the effectiveness of this strategy, we ran a series of computations with an exponentially increasing number of iterations, using an increasingly larger number of parallel processes.

for i in 1:10
  println("p_approx_pi($(10^i)):", @timev p_approx_pi(10^i))
end

Note that, with this scheme, when we use two processes, we are in fact using just one worker, as the other one is just managing the distribution of work and collection of results. In the next plot, we also superimpose the results for the single-threaded case.

Results show that using two processes, i.e., having just one worker doing all the work, performs very similarly to the single-threaded case: the performance impact is only 2.6%. However, it is not all good news, as the results also show that parallelization only pays off when the number of iterations reaches $10^8$.

On the other hand, when we compare the time it takes the entire experiment to complete for a given number of processes, the results are a bit surprising:

Now, we see that despite the considerable gains obtained, almost halving the time needed, it only pays off to use up to 16 processes.

Fairness

Our guess for the lack of speedup beyond 16 processes is that the time spent starting and managing the processes nullifies the gains of the parallelization. Another hypothesis is that, despite the number of workers created, Julia is not taking advantage of them because it does not distribute the work fairly among them. To refute (or confirm) this hypothesis, we devised a different test.

The goal of this test was to distribute identical work units among the workers and collect the number of units that were processed by each worker. To that end, we used the following program:

using Distributed
addprocs(parse(Int, ARGS[1])-1)

# Each work unit just sleeps briefly and returns the id of the process
# that executed it.
@everywhere work(x) = (sleep(0.01); myid())
res = @time pmap(work, 1:2000)

Note that the work unit of each process is just a quick sleep. Each process then returns its own id. The master, besides repeatedly sending elements of the range 1:2000 to the available workers, collects the results. This means that the range ends up being converted into an array of process ids, where each id represents the process that handled that work unit.

After the range is exhausted, the array of ids is processed to count the number of times each id showed up. If the distribution is fair, all processes will have more or less the same number of occurrences in the array, meaning that all of them had to process a similar number of work units.

distr = Dict([i => 0 for i in workers()])
for i in res
  distr[i] += 1
end

Finally, for each process id we print the number of times it occurred in the array, i.e., the number of work units that it had to process.

for i in workers()
  println("$(i)=>$(distr[i])")
end

In the following bar chart, we plot that number for each process. Their ids are presented on the horizontal axis.

Note the fairly regular number of units of work done by each worker. Similar plots could be made for different numbers of processes. The next one shows the same statistics but using only 32 processes:

Again, we see a relatively fair distribution of work. The same behavior is seen in the following case, using just four processes:

The following bar chart condenses all of this information into a single plot, showing the division of labor for different numbers of workers. As before, remember that the number of workers is one less than the number of processes. That means that, e.g., for 2 processes, there is just one worker doing all the heavy lifting.

As is visible, the work was uniformly distributed among the workers, independently of the number of workers being used.

Speedup

The following bar chart reveals another interesting statistic that confirms our previous hypothesis regarding the time spent managing the workers vs the time doing actual work. Here we measured the time needed to launch the workers (the addprocs operation) and the total time needed to process all work items.
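A sketch of how these two times can be measured separately, using @elapsed (which returns the elapsed seconds) around the launching and the processing phases:

using Distributed

# Time spent launching the workers.
launch_time = @elapsed addprocs(parse(Int, ARGS[1])-1)

@everywhere work(x) = (sleep(0.01); myid())

# Time spent processing all work items.
work_time = @elapsed pmap(work, 1:2000)

println("launch: $(launch_time)s, work: $(work_time)s")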

Despite the enormous reduction in the time spent doing actual work (which goes from 22.5 seconds using just one worker to 0.42 seconds using 96 workers, i.e., $\frac{1}{54}$ of the original time), we can see that the actual benefits become marginal when we use more than 32 processes and the situation actually gets worse with 96 processes, as the time to launch all of them dominates the entire computation. With that number of processes, the total time goes from 23.9 seconds to 2.55, i.e., $\frac{1}{10}$ of the original total time. The following plot illustrates the difference between the speedups considering only the time the workers spend doing useful work and the corresponding speedups when we consider the total time.

Amdahl's Law

This phenomenon is an excellent example of Amdahl's law, presented in 1967 by Gene Amdahl, which establishes the theoretical limit of the achievable speedup when only a fraction of a process can be parallelized.

To derive the law, let us call $T$ the total duration of a process that can be divided into two parts: $T=T_s+T_p$, where $T_s$ must be executed sequentially and, thus, cannot benefit from parallelization, and $T_p$ can be parallelized. Assuming that the parallelizable part is a fraction $p=\frac{T_p}{T}$, then $T_p=pT$ and $T_s=T-T_p=T-pT=(1-p)T$. Therefore, we have $T=(1-p)T+pT$.

After parallelization using $n$ processors, $T_p=pT$ becomes $T'_p=\frac{T_p}{n}=\frac{pT}{n}$. The total time then becomes $T'=T_s+T'_p=(1-p)T+\frac{p}{n}T$. The speedup, which is defined as the ratio between the duration of the non-parallelized version and the duration of the parallelized one, becomes $S=\frac{T}{T'}=\frac{T}{(1-p)T+\frac{p}{n}T}=\frac{1}{1-p+\frac{p}{n}}$. Imagining that the number of processors is unlimited, the maximum theoretical speedup becomes

$$\lim_{n\to\infty} S=\lim_{n\to\infty} \frac{1}{1-p+\frac{p}{n}}=\frac{1}{1-p}$$

In our problem, the sequential time using only one worker took 22.5 seconds, of which 1.4 are wasted launching the additional process. This means that the parallelizable part is a fraction $p=\frac{22.5-1.4}{22.5}=0.938$. In this case, the maximum speedup would not exceed 16, a far cry from the 10 that we obtained in the best case. In practice, the situation is even worse, as $T_s$ is not constant and, in fact, increases with $n$. For example, using 96 processes, $T_s$ is already 2.13, which gives a maximum speedup of 13.5. Obviously, our example has other parts that cannot be parallelized.
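These figures are easy to check numerically. A small sketch of the two formulas above:

# Amdahl's law: speedup with parallelizable fraction p and n processors.
speedup(p, n) = 1/(1 - p + p/n)

# Maximum theoretical speedup, as n grows without bound.
max_speedup(p) = 1/(1 - p)

p = (22.5 - 1.4)/22.5    # ≈ 0.938, as estimated above
max_speedup(p)           # ≈ 16
speedup(p, 96)           # theoretical speedup with 96 processors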
