Пусть у нас есть p = 2^k (k >= 1) процессоров. Разобьём входную последовательность из n символов на блоки
размера n / p элементов. Разошлём эти блоки по процессам. Каждый процесс на локальном уровне применяет быструю
сортировку для упорядочивания последовательности. После чего начинается процедура слияния. Каждый процесс с
нечётным номером отсылает данные соседу справа, который применяет к двум последовательностям (своей и соседа)
сортировку слиянием. После чего процессы с нечётными номерами завершаются. Среди оставшихся процессов
происходит перенумерация и снова нечётные процессы отправляют данные соседу справа. Так продолжается до тех
пор, пока не останется один процесс, который будет обладать искомой отсортированной последовательностью.
void parallel_sort_master(int* array, int size) {
int* resultant_array;
int* data;
int* sub;
int m;
int id, np; // MPI variables
int s; // per-process size
int move;
MPI_Status status;
MPI_Comm_rank(MPI_COMM_WORLD, &id);
MPI_Comm_size(MPI_COMM_WORLD, &np);
//Task Of The Master Processor
if (id == 0) {
int r;
s = size / np;
r = size % np;
if (r != 0) {
data = (int *) malloc((size + s - r) * sizeof (int));
memcpy(data, array, size * sizeof (int));
for (int i = size; i < size + s - r; i++)
data[i] = 0x07ffffff; // LARGE_LARGE const
s = s + 1;
} else {
data = array;
}
}
MPI_Bcast(&s, 1, MPI_INT, 0, MPI_COMM_WORLD);
//Allocating resultant array
resultant_array = (int *) malloc(s * sizeof (int));
MPI_Scatter(data, s, MPI_INT, resultant_array, s, MPI_INT, 0, MPI_COMM_WORLD);
linear_sort(resultant_array, s);
}
move = 1;
//Merging the sub sorted arrays to obtain resultant sorted array
while (move < np) {
if (id % (2 * move) == 0) {
if (id + move < np) { //Receive
MPI_Recv(&m, 1, MPI_INT, id + move, 0, MPI_COMM_WORLD, &status);
sub = (int *) malloc(m * sizeof (int)); //Allocate space for sub array
MPI_Recv(sub, m, MPI_INT, id + move, 0, MPI_COMM_WORLD, &status);
//merge arrays
int *tmp = mergeArrays(resultant_array, s, sub, m);
free(resultant_array);
resultant_array = tmp;
s = s + m;
free(sub);
}
} else { //Send data to neighbour processors
int near = id - move;
MPI_Send(&s, 1, MPI_INT, near, 0, MPI_COMM_WORLD);
MPI_Send(resultant_array, s, MPI_INT, near, 0, MPI_COMM_WORLD);
break;
}
move = move * 2;
}
if (id == 0) {
memcpy(array, resultant_array, size * sizeof (int));
if (array != data) {
free(data);
}
}
free(resultant_array);
}