# How To Implement The Parallel "Stable" Sort Using Intel® MPI Library And Deploy It To A Multi-Node Computational Cluster

Published: 04/02/2020

Last Updated: 04/02/2020

## Introduction

In this article I will ground the discussion on the several aspects of delivering a modern parallel code using the Intel® MPI library, that provides even more performance speed-up and efficiency of the parallel “stable” sort, previously discussed.

Specifically, I will discuss how to use the Intel® MPI library to scale the parallel “stable” sort workloads execution across a collective of concurrent processes running in parallel and, thus, drastically increase the overall performance of sort.

The audience of this article’s readers will find out about an algorithm that allows to workshare the entire process of sorting between multiple processes, spawned in parallel, running in a single computational node (e.g. intra-node) or an entire computational cluster, consisting of multiple nodes, each one consuming its own CPU and system memory resources. The following algorithm is mainly based on the idea of combining the famous quicksort and mergesort algorithms to perform the parallel sort of big data divided into the sub-arrays, sorted by each process in a team independently and, then merge those sorted sub-arrays into an entire array of ordered data.

Along with the explanation of specific algorithms, I will discuss about the aspects of using Intel® MPI library to provide an interconnection and collective communication between processes, while performing the actual sort of big data in parallel. Specifically, I will delve into the explanation of using MPI library’s MPI_Bcast, MPI_Scatter and MPI_Gather routines to send the specific data between running processes, as well as to perform the various of data manipulation tasks such as splitting up the data into the number of chunks processed by each process in parallel.

Later on, in this article, I will discuss how to evaluate an execution time of the MPI program performing the multi-node parallel sort by using an approach introduced and thoroughly discussed by John McCalpin (Blackbelt) in his blog. Here, I will explain how to collect the execution wall time data and share it between processes using MPI’s one-sided communication (RMA) to estimate an overall program’s execution time.

Finally, I will discuss how to build and run the following MPI program on the Intel® DevCloud, as well as how to compare its performance speed-up with the performance of the conventional std::sort(…) and qsort(…) functions.

## The Parallel "Stable" Sort Scaled Across Multiple MPI Processes

An entire idea of performing the parallel “stable” sort, the execution of which is scaled across the number of parallel processes, can be formulated as the following algorithm:

1.       Generate an array of N-objects to be sorted

2.       Split up an entire array into the k-subsets of objects

3.       Sort each subset of objects in parallel

4.       Merge all k-subsets into the ordered array of objects

According to the algorithm listed above, an array of objects is generated by the root process with rank 0. After that, the root process splits up an entire array into the number of sub-arrays having an equal size. Then, the root process (rank 0) sends all sub-arrays to a group of processes, spawned in parallel. In turn, each process in the group receives its own sub-array, performing a parallel “stable” sort. Since, the sort has been completed by the collective of processes, each process sends an ordered sub-array back to the root process, that finally merges all sub-arrays into an entire array of sorted objects. In this case, the number of sub-arrays is equal to the number of running processes and, obviously, the amount of objects in each sub-array can be evaluated as the size of an entire array divided by the actual number of sub-arrays being sorted.

To merge all sub-arrays, we will use an algorithm which is much similar to the famous mergesort. Unlike the classical mergesort, the following algorithm will be used for merging of multiple (e.g. more than two) sub-arrays into an entire array of objects.

The following algorithm can be formulated as follows:

1.	Let A[0..N] – an array of objects,
S[0..N] – a resultant array of sorted objects, N – the overall number of objects;
2.	Initialize the variable _First as the index of the first object in the array S;
3.	Initialize the variable _Last as the index of the last object in the array S;
4.	Initialize the variable i as the index of the second sub-array (i = 1);
5.	Copy the first sub-array (i = 0) to the resultant array S;
6.	For each i-th sub-array of objects in A[0..N], do the following:
1. Compute the indices {xs, xe} of the first and last objects in the i-th sub-array;
2. Compute the size of the merged array S: L = (xe - xs) + (_Last - _First);
3. Merge the arrays A[xs..xe] and S[_First.._Last], assigning the result to
the resultant array S of the new size L, as follows: Merge(A[xs..xe], S[_First.._Last]) -> S;
4. Update the index variable _Last with the value of the merged array size L (_Last = L);


According to the algorithm listed above, we first need to initialize the resultant array of sorted objects S with the first sub-array in A[0..N]. After that, we must also initialize two index variables of _First and _Last, respectively. In this case, the variable _First is assigned to the index of the first object in the resultant array S (_First = 0) and the variable _Last – the index of the last object in the array S. Normally, the index of the last object is equal to the size of the first sub-array in A (e.g. _Last = chunk_size). Since the resultant array S and the specific indices have been initialized, we execute a single loop, during each iteration of which it maintains the indices xs and xe of the first and last objects in the current sub-array A[xs..xe]. Also, it computes the new size L of the resultant array as the sum of sizes of the i-th sub-array and the resultant array S. Finally, it does the merge of these two sub-arrays A[xs..xe] and S[_First..Last], accumulating the result to the target array S, so that the array S of the new size L will contain an ordered sequence of all objects from these both sub-arrays. As you might have probably noticed, while merging each sub-array and the resultant array S,  total size of the array S is increasing. During each loop iteration, we need to re-compute the index of the last object in the resultant array S equal to the value of the new overall size L previously obtained (_Last = L). It proceeds with the following process until the all sub-arrays in A[0..N] are merged into the resultant array of sorted objects.

Unfortunately, the following process cannot be run in parallel due to the data flow dependency issue incurred. Normally, the total overhead of the merging process is insignificant since it’s basically used for merging of a very small number of sub-arrays equal to the number of running processes. This, in turn, does not affect the performance of the sort, itself. The process of merging multiple sub-arrays also requires an extra space O(N), where N – the overall number of objects in the array being sorted. The merging process is entirely performed by the root process with rank 0 after all sub-arrays have been independently sorted by each process in the group.

The code below implements the sub-arrays merging algorithm being discussed:

		sorted = (gen::ITEM*)std::malloc(size * sizeof(gen::ITEM));
std::memcpy((void*)sorted, (const void*)array_target, sizeof(gen::ITEM) * chunk_size);

std::size_t _First = 0, _Last = chunk_size;
for (int rank = 1; rank < world_size; rank++)
{
std::size_t xs = rank * (size / world_size);
std::size_t xe = (rank + 1) * (size / world_size);

std::size_t merged_size = (xe - xs) + (_Last - _First);

gen::ITEM* merged = \
(gen::ITEM*)std::malloc(sizeof(gen::ITEM) * merged_size);

parallel_sort_mpi_impl::merge(sorted, array_target, \
merged, _First, _Last - 1, xs, xe - 1,
[&](const gen::ITEM& item1, const gen::ITEM& item2) {
return (item1.key < item2.key); },
[&](const gen::ITEM& item1, const gen::ITEM& item2) {
return (item1.key == item2.key) && (item1.value < item2.value); });

std::memcpy((void*)sorted, (const void*)merged, \
sizeof(gen::ITEM) * merged_size);

std::free(merged);

_Last = merged_size;
}

As a part of the algorithm, formulated above, we will use another algorithm that allows to merge two sub-arrays into the resultant array of sorted objects, so far. In this case, we will actually use the conventional algorithm, known as the classical mergesort, for that purpose.

The following algorithm can be formulated as follows:

1.	Let A1[0..N], A2[0..M] – the first and second sub-arrays to be merged,
M[0..N+M] – the resultant array of objects;
2.	Initialize the variable pos as the index of the first object in the resultant array M;
3.	Initialize the variable i as the index of the first object in the sub-array A1;
4.	Initialize the variable j as the index of the first object in the sub-array A2;
5.	Compute the overall size L of the two sub-arrays being merged (L = N + M);
6.	During each k-th iteration of the loop k = [0..L], do the following:
1. If A1[i].key < A2[j].key or A1[i].value < A2[j].value, assign the A1[i]
object at current position of the resultant array M (M[pos] = A1[i])
and increment the indices pos and i by 1;
2. If A1[i].key >= A2[j].key or A1[i].value >= A2[j].value, assign the A2[j]
object at the current position of the resultant array M (M[pos] = A2[j])
and increment the indices pos and j by 1;
7.	Append all objects in A1[i..N] to the resultant array M;
8.	Append all objects in A2[j..M] to the resultant array M;

To merge two arrays, we need to initialize the index (pos = 0) of the first object in the target array M, as well as the indices (i = 0) and (j = 0) of the first object in the sub-arrays A1 and A2, respectively. Also, we have to compute the size L of the resultant array M as the sum of the sub-arrays A1 and A2 sizes. Then, it perform exactly k = [0..L] iterations of the loop, during each iteration of which it evaluates the condition, comparing the key of the i-th object in the sub-array A1 to the key of the j-th object in the sub-array A2 (A1[i].key < A2[j].key). Similar, it also compares the values of these objects, such as (A1[i].value < A2[j].value). If the key of the A1[i] object is less than the key of A2[j] object and the condition is ‘true’, then it assigns the A1[i] object at the index pos in the resultant array M (M[pos] = A[i]). Otherwise, it assigns the A2[j] object at the index pos in the array M, instead (M[pos] = A[j]). Finally, at the end of the following loop execution, it appends all remaining objects in A1 to the resultant array M. Then, it does exactly the same with all remaining objects in A2, appending them to the resultant array M, so that at the end of the merging process we will result with a merged array of sorted objects from the sub-arrays A1 and A2, respectively.

The code in C++11 implementing the mergesort algorithm is listed below:

	template
void merge(const gen::ITEM* array1, const gen::ITEM* array2, \
gen::ITEM*& merged, std::size_t _First1, std::size_t _Last1, \
std::size_t _First2, std::size_t _Last2, _CompKeys comp_keys, _CompVals comp_vals)
{
if ((_Last1 <= _First1) ||
(_Last2 <= _First2)) return;

std::size_t pos = 0L;
std::size_t i = _First1, j = _First2;

std::size_t merged_size = \
((_Last1 - _First1) + 1) + ((_Last2 - _First2) + 1);

for (std::size_t index = 0; index < merged_size; index++)
{
if ((i <= _Last1) &&
((comp_keys(array1[i], array2[j])) ||
(comp_vals(array1[i], array2[j])))) {
merged[pos++] = array1[i++];
}
else if ((j <= _Last2) &&
((!comp_keys(array1[i], array2[j])) &&
(!comp_vals(array1[i], array2[j])))) {
merged[pos++] = array2[j++];
}
}

while (i <= _Last1)
merged[pos++] = array1[i++];
while (j <= _Last2)
merged[pos++] = array2[j++];
} 

As well as the previous algorithm discussed, the process of merging two sub-arrays cannot be run in parallel since the result of performing each loop iteration largely depends on the result of performing all preceding iterations, which is the data flow dependency issue. Also, the following algorithm requires an extra space O(N+M) to store a merged array into a separate temporary buffer.

## Implementing The Parallel "Stable" Sort Using Intel® MPI Library

The execution of any MPI program starts with the initialization of the MPI execution environment. This is typically done by invoking the specific MPI_Init_thread(&argc, &argv, MPI_THREAD_MULTIPLE, &provided) routine, collectively executed by each process spawned. The following routine actually adds each running process to the MPI execution context. The following route accepts the number of arguments such as the pointers to the vector arguments and its count, as well as the level of provided thread support:

	// Initialize the MPI environment
MPI_Init_thread(&argc, &argv, MPI_THREAD_MULTIPLE, &provided);

Also, the following routine performs the several background tasks, such as initializing the MPI communicator to make it possible that each running process sends and receives messages from the other processes in a group and, thus, sharing the data globally across all processes being spawned in parallel.

After performing the initialization, each process invokes the MPI_Comm_size(MPI_COMM_WORLD, &world_size) and MPI_Comm_rank(MPI_COMM_WORLD, &world_rank) routines to get the number of running processes and each process rank parameter values:

	// Get the number of processes
int world_size;
MPI_Comm_size(MPI_COMM_WORLD, &world_size);

// Get the rank of the process
int world_rank;
MPI_Comm_rank(MPI_COMM_WORLD, &world_rank)

Next, each process collectively allocates a memory buffer to store the array of objects to be sorted using MPI_Alloc_mem(size * sizeof(gen::ITEM), MPI_INFO_NULL, &array) routine. Also, it creates two MPI windows for one-sided communication (RMA) between running processes. Specifically, these windows are used for the remote access of each process to the arrays of timestamps. In this case, the following arrays are solely used to store timestamps of the start and end execution wall times for each running process. An approach for evaluating the MPI program execution time is discussed below:

	gen::ITEM* array = nullptr;
std::double_t* p_timestamps_start = nullptr;
std::double_t* p_timestamps_finish = nullptr;

MPI_Alloc_mem(size * sizeof(gen::ITEM), MPI_INFO_NULL, &array);

MPI_Alloc_mem(world_size * sizeof(std::double_t), MPI_INFO_NULL, &p_timestamps_start);
MPI_Alloc_mem(world_size * sizeof(std::double_t), MPI_INFO_NULL, &p_timestamps_finish);

MPI_Win_create(p_timestamps_start, world_size * sizeof(std::double_t), sizeof(std::double_t), MPI_INFO_NULL, MPI_COMM_WORLD, &win_ts1);
MPI_Win_create(p_timestamps_finish, world_size * sizeof(std::double_t), sizeof(std::double_t), MPI_INFO_NULL, MPI_COMM_WORLD, &win_ts2);

std::memset((void*)p_timestamps_start, 0x00, sizeof(std::double_t) * world_size);
std::memset((void*)p_timestamps_finish, 0x00, sizeof(std::double_t) * world_size);


After allocating the specific buffers, the root process with rank 0 generates an array of objects to be sorted:

	if (world_rank == 0) {

std::string logo = "\nParallel Stable Sort v.3.00 (MPI Multi-Node Cluster) by Arthur V. Ratz";
std::cout << logo << "\n\n";

std::cout << "Device : " << device_queue.get_device().get_info() << std::endl << std::endl;

std::cout << "Generating an array of objects... (size = " << size << ")\n";

gen::generate_objects(array, size);

std::vector array_copy;
array_copy.assign(array, array + size);

std::cout << "sorting an array... (sort type: array of objects)\n";

// Obtain the value of walltime prior to performing the sequential sorting
time_s = MPI_Wtime();

// Perform the sequental sort by using std::sort function
internal::sequential_stable_sort(array_copy.begin(), array_copy.end(),
[&](const gen::ITEM& item1, const gen::ITEM& item2) { return item1.key < item2.key;  },
[&](const gen::ITEM& item1, const gen::ITEM& item2) { return item1.value < item2.value;  });

// Obtain the value of walltime after to performing the sequential sorting
time_f = MPI_Wtime();

// Compute the overall execution walltime
std_sort_time_elapsed = time_f - time_s;

array_copy.clear();

std::cout << std::setiosflags(std::ios::fixed) << std::setprecision(4)
<< "execution time (std::sort): " << std_sort_time_elapsed << " ms ";
}


Since the array of objects to be sorted has been generated, the root process broadcasts a value of the variable size to all other processes by invoking the MPI_Bcast(&size, 1, MPI_UNSIGNED_LONG_LONG, 0, MPI_COMM_WORLD);. This is basically needed to determine the size of the sub-array sorted by each process. Then, each process allocates a local buffer to store a sub-array to be sorted. Then, the processes collectively call MPI_Scatter(&array[0], chunk_size, dt_items, &array_chunk[0], chunk_size, dt_items, 0, MPI_COMM_WORLD) that splits up an entire array of objects generated by the root process and sends each sub-array scattered to a specific process running in parallel.

Finally, each process invokes the internal::parallel_stable_sort(…) function to perform the sorting of its own sub-array. Since the sorting of each sub-array is performed in parallel, we must provide an implicit barrier to synchronize the execution of all processes in a group. This typically done by invoking MPI_Barrier(MPI_COMM_WORLD) routine that blocks all processes until each process has done the sorting process and reached the barrier.

Since each sub-array has been sorted by a specific process, each process collectively invokes the MPI_Gather(&array_chunk[0], chunk_size, dt_items, &array_target[0], chunk_size, dt_items, 0, MPI_COMM_WORLD) to gather all sub-arrays sorted independently into an entire array of objects, sending them back to the root process with rank 0:

	MPI_Bcast(&size, 1, MPI_UNSIGNED_LONG_LONG, 0, MPI_COMM_WORLD);

std::size_t chunk_size = size / world_size;

gen::ITEM* array_chunk = \
(gen::ITEM*)std::malloc(chunk_size * sizeof(gen::ITEM));

MPI_Scatter(&array[0], chunk_size, dt_items, &array_chunk[0],
chunk_size, dt_items, 0, MPI_COMM_WORLD);

std::vector chunk_v;
chunk_v.assign(array_chunk, array_chunk + chunk_size);

p_timestamps_start[world_rank] = MPI_Wtime();

internal::parallel_stable_sort(chunk_v, device_queue,
[&](const gen::ITEM& item1, const gen::ITEM& item2) { return item1.key < item2.key;  },
[&](const gen::ITEM& item1, const gen::ITEM& item2) { return item1.value < item2.value;  });

p_timestamps_finish[world_rank] = MPI_Wtime();

std::memcpy((void*)array_chunk, (const void*)& chunk_v[0], \
sizeof(gen::ITEM) * chunk_size);

MPI_Barrier(MPI_COMM_WORLD);

gen::ITEM* array_target = nullptr;
if (world_rank == 0)
{
array_target = \
(gen::ITEM*)std::malloc(size * sizeof(gen::ITEM));
}

MPI_Gather(&array_chunk[0], chunk_size, dt_items, \
&array_target[0], chunk_size, dt_items, 0, MPI_COMM_WORLD);

In this code, each parallel process calls the internal::parallel_stable_sort(...) function to sort its own sub-array of objects in parallel. This, in turn, invokes nested-parallelism. This reference also mentions the either parallel quicksort or mergesort as the routines with nested parallelism. In this case, the actual sort is performed by a collective of processes executed in parallel at the outer level. At the same time, each process does the sorting of a specific sub-array in parallel at the inner level. The using and implementing nested parallelism concept allows to drastically improve the performance of the parallel sort, itself.

At the end of this MPI program execution, the root process with rank 0 merges all sub-arrays into an entire array of sorted objects by executing the following fragment of code listed below:

	if (world_rank == 0)
{
time_s = MPI_Wtime();

sorted = (gen::ITEM*)std::malloc(size * sizeof(gen::ITEM));
std::memcpy((void*)sorted, (const void*)array_target, sizeof(gen::ITEM) * chunk_size);

std::size_t _First = 0, _Last = chunk_size;
for (int rank = 1; rank < world_size; rank++)
{
std::size_t xs = rank * (size / world_size);
std::size_t xe = (rank + 1) * (size / world_size);

std::size_t merged_size = (xe - xs) + (_Last - _First);

gen::ITEM* merged = \
(gen::ITEM*)std::malloc(sizeof(gen::ITEM) * merged_size);

parallel_sort_mpi_impl::merge(sorted, array_target, \
merged, _First, _Last - 1, xs, xe - 1,
[&](const gen::ITEM& item1, const gen::ITEM& item2) {
return (item1.key < item2.key); },
[&](const gen::ITEM& item1, const gen::ITEM& item2) {
return (item1.key == item2.key) && (item1.value < item2.value); });

std::memcpy((void*)sorted, (const void*)merged, \
sizeof(gen::ITEM) * merged_size);

std::free(merged);

_Last = merged_size;
}

time_f = MPI_Wtime();
merge_time_diff = time_f - time_s;
}


## Estimating The MPI Program Execution Time

To estimate the MPI program execution time we will use an approach initially proposed by John McCalpin (Intel Blackbelt), according to which we allocate two buffers which size is equal to the number of running processes. The first buffer will be used to store the execution start wall time for each process in a group, and the second one - to store the execution end wall time, respectively. During each process execution, it invokes the MPI_Wtime() routine to get the current wall time value prior and after the execution of the code performing the actual parallel sorting. Then it stores these values to the first and second buffers, respectively, at the position equal to the process rank:

p_timestamps_start[world_rank] = MPI_Wtime();
// Sort the sub-array of objects
p_timestamps_finish[world_rank] = MPI_Wtime();

Notice that, these two buffers are shared across all processes via MPI's one-sided communication (RMA) using the specific MPI's windows previously initialized. After assigning the start and end wall execution time values at the corresponding positions in the p_timestamps_start and p_timestamps_finish buffers, we update the specific MPI windows, so that the data stored in these buffers is accessible by all processes, including the root process with rank 0:

 MPI_Win_lock_all(0, win_ts1);
for (int rank = 0; rank < world_size; rank++)
MPI_Put(&p_timestamps_start[world_rank], 1, MPI_DOUBLE, rank, world_rank, 1, MPI_DOUBLE, win_ts1);
MPI_Win_flush_local(world_rank, win_ts1);
MPI_Win_unlock_all(win_ts1);

MPI_Win_lock_all(0, win_ts2);
for (int rank = 0; rank < world_size; rank++)
MPI_Put(&p_timestamps_finish[world_rank], 1, MPI_DOUBLE, rank, world_rank, 1, MPI_DOUBLE, win_ts2);
MPI_Win_flush_local(world_rank, win_ts2);
MPI_Win_unlock_all(win_ts2);



At the end of the MPI program execution, the root process computes the overall execution time by using the following algorithm. It normally finds the minimum value of the start execution time for all processes stored in the p_timestamps_start buffer. Similarly, it finds the maximum value of the end execution time stored in the p_timestamps_finish buffer. Finally, it computes a span of those minimum start and maximum end wall time values, which is the MPI program overall execution time:

	if (world_rank == 0)
{
std::vector pv_timestamps_start;
pv_timestamps_start.assign(p_timestamps_start, p_timestamps_start + world_size);

std::vector pv_timestamps_finish;
pv_timestamps_finish.assign(p_timestamps_finish, p_timestamps_finish + world_size);

std::double_t mpi_time_start = \
* std::min_element(dpstd::execution::par_unseq,
pv_timestamps_start.begin(), pv_timestamps_start.end());
std::double_t mpi_time_finish = \
* std::max_element(dpstd::execution::par_unseq,
pv_timestamps_finish.begin(), pv_timestamps_finish.end());

std::double_t mpi_time_diff = mpi_time_finish - mpi_time_start;
mpi_sort_time_elapsed = mpi_time_diff + merge_time_diff;
}


## Build And Run The Sample MPI Program In The Intel® DevCloud

To build and run the sample MPI program, we will need to download a project's archive using the link at the bottom of this article's page. After we must upload the archive to the Intel® DevCloud using the Jupyter Notebook* and extract its contents by using the following command in Linux* terminal:

tar -xvf parallel_stable_sort_mpi.tar.gz

After that, we must use the following bash-script to build and run the project:

./build_run.sh

Optionally, we can build the project manually by using the following command:

mpiicpc -o parallel_stable_sort_mpi  -cxx=dpcpp -fsycl -lOpenCL -lsycl -ltbb -lmpi parallel_stable_sort_mpi.cpp

Finally, we can run the MPI program executable by using the command listed below:

mpirun -np 4 ./parallel_stable_sort_mpi

The following command launches the MPI program on the single intra-node by spawning the np = 4 parallel processes.

Additionally, there's no need to build the project, since the archive downloaded contains a pre-build MPI program executable file that can be easily run by using the command listed above.

## Evaluating The Performance

I have tested the performance of the parallel sort MPI program in the Intel DevCloud multi-node cluster. According to the performance evaluation results, the parallel sort performed by the MPI program discussed is much faster compared to the performance of the conventional std::sort(...) function. Here's some facts about the performance of the parallel sort performed by the MPI program discussed in this article:

 size >= 107 of objects 2x-8x   times faster size >= 109 of objects 8x-11x times faster

However, the performing of the parallel sort by the collective of processes spawned in more than one computational node (e.g. multi-node cluster) might incur the sufficient performance degrades since the actual bandwidth of the communication fabrics such as OpenFabrics Interfaces* (OFI)-capable network, Intel® True Scale Fabric, Cornelis Networks, InfiniBand* and Ethernet (through OFI API) is many times slower than the bandwidth of system memory within a single computational node.

That's actually why, it's strongly recommended to use the modern Intel® 10gbE Network fabrics to perform the sorting of big-data in the multi-node computational cluster.

#### Product and Performance Information

1

Performance varies by use, configuration and other factors. Learn more at www.Intel.com/PerformanceIndex.