455 lines
17 KiB
C++
455 lines
17 KiB
C++
/*!
 * \file
 * \brief   Distributed sort implementation header
 *
 * \author
 *    Christos Choutouridis AEM:8997
 *    <cchoutou@ece.auth.gr>
 */
|
|
|
|
#ifndef DISTBITONIC_H_
|
|
#define DISTBITONIC_H_
|
|
|
|
#include <vector>
|
|
#include <algorithm>
|
|
#include <parallel/algorithm>
|
|
#include <cmath>
|
|
#include <cstdint>
|
|
#if !defined DEBUG
|
|
#define NDEBUG
|
|
#endif
|
|
#include <cassert>
|
|
|
|
#include "utils.hpp"
|
|
|
|
/*
|
|
* Exported timers
|
|
*/
|
|
extern Timing Timer_total;
|
|
extern Timing Timer_fullSort;
|
|
extern Timing Timer_exchange;
|
|
extern Timing Timer_minmax;
|
|
extern Timing Timer_elbowSort;
|
|
|
|
|
|
/*!
 * Selects which version of the distributed sorting method is used.
 */
enum class SortMode {
    //! The v0.5 of the algorithm, which uses a bubble-sort like approach
    Bubbletonic,
    //! The v1.0 of the algorithm, which uses the bitonic data-exchange approach
    Bitonic
};
|
|
|
|
/*
|
|
* ============================== Sort utilities ==============================
|
|
*/
|
|
|
|
/*!
|
|
* The primary function template of ascending(). It is DISABLED since , it is explicitly specialized
|
|
* for each of the \c SortMode
|
|
*/
|
|
template <SortMode Mode> inline bool ascending(mpi_id_t, [[maybe_unused]] size_t) noexcept = delete;
|
|
|
|
/*!
|
|
* Returns the ascending or descending configuration of the node's sequence based on
|
|
* the current node (MPI process) and the depth of the sorting network
|
|
*
|
|
* @param node [mpi_id_t] The current node (MPI process)
|
|
* @return [bool] True if we need ascending configuration, false otherwise
|
|
*/
|
|
template <> inline
|
|
bool ascending<SortMode::Bubbletonic>(mpi_id_t node, [[maybe_unused]] size_t depth) noexcept {
|
|
return (node % 2) == 0;
|
|
}
|
|
|
|
/*!
|
|
* Returns the ascending or descending configuration of the node's sequence based on
|
|
* the current node (MPI process) and the depth of the sorting network
|
|
*
|
|
* @param node [mpi_id_t] The current node (MPI process)
|
|
* @param depth [size_t] The total depth of the sorting network (same for each step for a given network)
|
|
* @return [bool] True if we need ascending configuration, false otherwise
|
|
*/
|
|
template <> inline
|
|
bool ascending<SortMode::Bitonic>(mpi_id_t node, size_t depth) noexcept {
|
|
return !(node & (1 << depth));
|
|
}
|
|
|
|
/*!
|
|
* The primary function template of partner(). It is DISABLED since , it is explicitly specialized
|
|
* for each of the \c SortMode
|
|
*/
|
|
template <SortMode Mode> inline mpi_id_t partner(mpi_id_t, size_t) noexcept = delete;
|
|
|
|
/*!
|
|
* Returns the node's partner for data exchange during the sorting network iterations
|
|
* of Bubbletonic
|
|
*
|
|
* @param node [mpi_id_t] The current node
|
|
* @param step [size_t] The step of the sorting network
|
|
* @return [mpi_id_t] The node id of the partner for data exchange
|
|
*/
|
|
template <> inline
|
|
mpi_id_t partner<SortMode::Bubbletonic>(mpi_id_t node, size_t step) noexcept {
|
|
//return (node % 2 == step % 2) ? node + 1 : node - 1;
|
|
return (((node+step) % 2) == 0) ? node + 1 : node - 1;
|
|
}
|
|
|
|
/*!
|
|
* Returns the node's partner for data exchange during the sorting network iterations
|
|
* of Bitonic
|
|
*
|
|
* @param node [mpi_id_t] The current node
|
|
* @param step [size_t] The step of the sorting network
|
|
* @return [mpi_id_t] The node id of the partner for data exchange
|
|
*/
|
|
template <> inline
|
|
mpi_id_t partner<SortMode::Bitonic>(mpi_id_t node, size_t step) noexcept {
|
|
return (node ^ (1 << step));
|
|
}
|
|
|
|
|
|
/*!
|
|
* The primary function template of keepSmall(). It is DISABLED since , it is explicitly specialized
|
|
* for each of the \c SortMode
|
|
*/
|
|
template<SortMode Mode> inline bool keepSmall(mpi_id_t, mpi_id_t, [[maybe_unused]] size_t) = delete;
|
|
|
|
/*!
|
|
* Predicate to check if a node keeps the small numbers during the bubbletonic sort network exchange.
|
|
*
|
|
* @param node [mpi_id_t] The node for which we check
|
|
* @param partner [mpi_id_t] The partner of the data exchange
|
|
* @return [bool] True if the node should keep the small values, false otherwise
|
|
*/
|
|
template <> inline
|
|
bool keepSmall<SortMode::Bubbletonic>(mpi_id_t node, mpi_id_t partner, [[maybe_unused]] size_t depth) {
|
|
if (node == partner)
|
|
throw std::runtime_error("(keepSmall) Node and Partner can not be the same\n");
|
|
return (node < partner);
|
|
}
|
|
|
|
/*!
|
|
* Predicate to check if a node keeps the small numbers during the bitonic sort network exchange.
|
|
*
|
|
* @param node [mpi_id_t] The node for which we check
|
|
* @param partner [mpi_id_t] The partner of the data exchange
|
|
* @param depth [size_t] The total depth of the sorting network (same for each step for a given network)
|
|
* @return [bool] True if the node should keep the small values, false otherwise
|
|
*/
|
|
template <> inline
|
|
bool keepSmall<SortMode::Bitonic>(mpi_id_t node, mpi_id_t partner, size_t depth) {
|
|
if (node == partner)
|
|
throw std::runtime_error("(keepSmall) Node and Partner can not be the same\n");
|
|
return ascending<SortMode::Bitonic>(node, depth) == (node < partner);
|
|
}
|
|
|
|
/*!
 * Predicate to check if the node is active in the current iteration of the bubbletonic
 * sort exchange.
 *
 * @param node   [mpi_id_t] The node to check
 * @param nodes  [size_t]   The total number of nodes
 * @return       [bool]     True if the node is active, false otherwise
 */
|
|
bool isActive(mpi_id_t node, size_t nodes);
|
|
|
|
/*
|
|
* ============================== Data utilities ==============================
|
|
*/
|
|
|
|
/*!
|
|
* Sort a range using the build-in O(Nlog(N)) algorithm
|
|
*
|
|
* @tparam RangeT A range type with random access iterator
|
|
*
|
|
* @param data [RangeT] The data to be sorted
|
|
* @param ascending [bool] Flag to indicate the sorting order
|
|
*/
|
|
template<typename RangeT>
|
|
void fullSort(RangeT& data, bool ascending) noexcept {
|
|
// Use introsort from stdlib++ here, unless ... __gnu_parallel
|
|
if (ascending) {
|
|
__gnu_parallel::sort(data.begin(), data.end(), std::less<>());
|
|
}
|
|
else {
|
|
__gnu_parallel::sort(data.begin(), data.end(), std::greater<>());
|
|
}
|
|
|
|
if (config.exchangeOpt)
|
|
updateMinMax(localStat, data);
|
|
}
|
|
|
|
/*!
 * Core of the "elbow sort" algorithm for shadowed buffer types.
 *
 * The elbow is the minimum (ascending) or the maximum (descending) element of the
 * active buffer. Two cursors start next to each other at the elbow and walk away
 * from it in opposite directions, wrapping around the ends of the buffer. On every
 * iteration the element selected by \c comp is appended to the shadow buffer.
 *
 * @note
 *    This algorithm can not work "in place". The active buffer is the source and
 *    the shadow buffer is the target. At the end the two buffers switch roles.
 * @note
 *    This is the core functionality. Use the elbowSort() function instead.
 *    NOTE(review): the result is only sorted if the active buffer holds a bitonic
 *    sequence - this function does not check it.
 *
 * @tparam ShadowedDataT  A Shadowed buffer type with random access iterator.
 * @tparam CompT          A Comparison type for binary operation comparisons
 *
 * @param data        [ShadowedDataT] The data to sort
 * @param ascending   [bool]          Flag to indicate the sorting order (selects the elbow)
 * @param comp        [CompT]         The binary operator object used to pick between the cursors
 */
template<typename ShadowedDataT, typename CompT>
void elbowSortCore(ShadowedDataT& data, bool ascending, CompT comp) noexcept {
    auto& src = data.getActive();       // source: the sequence to sort
    auto& dst = data.getShadow();       // target: receives the sorted sequence
    size_t N  = data.size();            // both buffers have this size

    // Locate the elbow of the sequence
    auto elbow = (ascending) ?
            std::min_element(src.begin(), src.end()) :
            std::max_element(src.begin(), src.end());

    size_t left  = std::distance(src.begin(), elbow);   // walks backwards from the elbow
    size_t right = (left == N-1) ? 0 : left + 1;        // walks forward from the elbow's neighbour

    for (size_t out = 0 ; out < N ; ++out) {
        if (comp(src[left], src[right])) {
            dst[out] = src[left];
            left = (left == 0) ? N-1 : left - 1;        // cyclic decrease
        }
        else {
            dst[out] = src[right];
            right = (right + 1) % N;                    // cyclic increase
        }
    }

    data.switch_active();               // the target becomes the active buffer
}
|
|
|
|
/*!
 * Sort a shadowed buffer using the "elbow sort" algorithm.
 *
 * Selects the comparison object that matches the requested order and forwards
 * to elbowSortCore().
 *
 * @tparam ShadowedDataT  A Shadowed buffer type with random access iterator.
 *
 * @param data        [ShadowedDataT] The data to sort
 * @param ascending   [bool]          True for std::less (ascending), false for std::greater
 */
template<typename ShadowedDataT>
void elbowSort(ShadowedDataT& data, bool ascending) noexcept {
    if (!ascending) {
        elbowSortCore(data, ascending, std::greater<>());
        return;
    }
    elbowSortCore(data, ascending, std::less<>());
}
|
|
|
|
/*!
|
|
* Predicate for exchange optimization. Returns true only if an exchange between partners is needed.
|
|
* In order to do that we exchange min and max statistics of the partner's data.
|
|
*
|
|
* @tparam StatT Statistics data type (for min-max)
|
|
*
|
|
* @param lstat [const StatT] Reference to the local statistic data
|
|
* @param rstat [StatT] Reference to the remote statistic data to fill
|
|
* @param part [mpi_id_t] The partner for the exchange
|
|
* @param tag [int] The tag to use for the exchange of stats
|
|
* @param keepSmall [bool] Flag to indicate if the local thread keeps the small ro the large values
|
|
* @return True if we need data exchange, false otherwise
|
|
*/
|
|
template<typename StatT>
|
|
bool needsExchange(const StatT& lstat, StatT& rstat, mpi_id_t part, int tag, bool keepSmall) {
|
|
timeCall(Timer_exchange, mpi.exchange_it, lstat, rstat, part, tag);
|
|
return (keepSmall) ?
|
|
rstat.min < lstat.max // Lmin: rstat.min - Smax: lstat.max
|
|
: lstat.min < rstat.max; // Lmin: lstat.min - Smax: rstat.max
|
|
}
|
|
|
|
|
|
/*!
 * Update stats utility. Stores the smallest and the largest element of a sequence
 * to the \c min and \c max members of the statistics object.
 *
 * @note
 *    For an empty sequence \c stat is left untouched. std::minmax_element() returns
 *    the end iterator in that case and dereferencing it is undefined behaviour.
 *
 * @tparam RangeT     A range type with random access iterator
 * @tparam StatT      Statistics data type (for min-max)
 *
 * @param stat        [StatT]         Reference to the statistic data to update
 * @param data        [const RangeT]  Reference to the sequence to extract stats from
 */
template<typename RangeT, typename StatT>
void updateMinMax(StatT& stat, const RangeT& data) noexcept {
    const auto first = data.begin();
    const auto last  = data.end();
    if (first == last)
        return;                         // nothing to extract from an empty sequence

    const auto [lo, hi] = std::minmax_element(first, last);
    stat.min = *lo;
    stat.max = *hi;
}
|
|
|
|
/*!
 * Compares two sequences item by item and overwrites each local item with either
 * the smaller or the larger of the pair.
 *
 * According to the original author, if the initial sequences are bitonic then the
 * result is a bitonic sequence too.
 *
 * @tparam ValueT     The underlying type of the sequences
 *
 * @param local       [ValueT*]        Pointer to the local sequence (updated in place)
 * @param remote      [const ValueT*]  Pointer to the remote sequence (copied locally by MPI)
 * @param count       [size_t]         The number of items to process
 * @param keepSmall   [bool]           True to keep the small items in local sequence, false for the large
 */
template<typename ValueT>
void keepMinOrMax(ValueT* local, const ValueT* remote, size_t count, bool keepSmall) noexcept {
    ValueT* const last = local + count;

    // Decide once which selection is applied to every pair
    if (keepSmall) {
        std::transform(local, last, remote, local,
            [](const ValueT& mine, const ValueT& theirs) { return std::min(mine, theirs); });
    }
    else {
        std::transform(local, last, remote, local,
            [](const ValueT& mine, const ValueT& theirs) { return std::max(mine, theirs); });
    }
}
|
|
|
|
/*
|
|
* ============================== Sort algorithms ==============================
|
|
*/
|
|
|
|
/*!
 * A small tag generator tool to provide consistent encoding to tag communication
 *
 * @param depth   The current algorithmic depth [bitonic] of the communication, if any
 * @param step    The current step on the current depth
 * @param stage   The stage of the pipeline.
 * @return        The tag to use.
 *
 * @note
 *    In case we call this function outside of the pipeline loop, we can omit the
 *    @c stage argument and use the return value as the starting tag for every communication
 *    of the pipeline loop. We need to increase the tag for each communication of
 *    the pipeline loop though!
 */
|
|
size_t tagGenerator(size_t depth, size_t step, size_t stage = 0);
|
|
|
|
/*!
|
|
* An exchange functionality to support both Bubbletonic and Bitonic sort algorithms.
|
|
*
|
|
* @note
|
|
* In case of pipeline request it switches to non-blocking MPI communication for
|
|
* pipelining min-max process with mpi data exchange
|
|
*
|
|
* @tparam ShadowedDataT A Shadowed buffer type with random access iterator.
|
|
*
|
|
* @param data [ShadowedDataT&] Reference to the data to exchange
|
|
* @param partner [mpi_id_t] The partner for the exchange
|
|
* @param keepSmall [bool] Flag to indicate if we keep the small values
|
|
* @param tag [int] The init tag to use for the loop.
|
|
*
|
|
* @note
|
|
* The @c tag is increased inside the pipeline loop for each different data exchange
|
|
*/
|
|
template<typename ShadowedDataT>
|
|
void exchange(ShadowedDataT& data, mpi_id_t partner, bool keepSmall, int tag) {
|
|
using Value_t = typename ShadowedDataT::value_type;
|
|
|
|
// Init counters and pointers
|
|
Value_t* active = data.getActive().data();
|
|
Value_t* shadow = data.getShadow().data();
|
|
size_t count = data.size() / config.pipeline;
|
|
|
|
if (config.pipeline > 1) {
|
|
// Pipeline case - use async MPI
|
|
Timer_exchange.start();
|
|
mpi.exchange_start(active, shadow, count, partner, tag);
|
|
for (size_t stage = 0; stage < config.pipeline; active += count, shadow += count) {
|
|
// Wait previous chunk
|
|
mpi.exchange_wait();
|
|
Timer_exchange.stop();
|
|
if (++stage < config.pipeline) {
|
|
// Start next chunk if there is a next one
|
|
Timer_exchange.start();
|
|
mpi.exchange_start(active + count, shadow + count, count, partner, ++tag);
|
|
}
|
|
// process the arrived data
|
|
timeCall(Timer_minmax, keepMinOrMax, active, shadow, count, keepSmall);
|
|
}
|
|
}
|
|
else {
|
|
// No pipeline - use blocking MPI
|
|
timeCall(Timer_exchange, mpi.exchange, active, shadow, count, partner, tag);
|
|
timeCall(Timer_minmax, keepMinOrMax, active, shadow, count, keepSmall);
|
|
}
|
|
if (config.exchangeOpt)
|
|
updateMinMax(localStat, data);
|
|
}
|
|
|
|
/*!
|
|
* A distributed version of the Bubbletonic sort algorithm.
|
|
*
|
|
* @note
|
|
* Each MPI process should run an instance of this function.
|
|
*
|
|
* @tparam ShadowedDataT A Shadowed buffer type with random access iterator.
|
|
*
|
|
* @param data [ShadowedDataT] The local to MPI process data to sort
|
|
* @param Processes [mpi_id_t] The total number of MPI processes
|
|
* @param rank [mpi_id_t] The current process id
|
|
*/
|
|
template<typename ShadowedDataT>
|
|
void distBubbletonic(ShadowedDataT& data, mpi_id_t Processes, mpi_id_t rank) {
|
|
// Initially sort to create a half part of a bitonic sequence
|
|
timeCall(Timer_fullSort, fullSort, data, ascending<SortMode::Bubbletonic>(rank, 0));
|
|
|
|
// Sort network (O(N) iterations)
|
|
for (size_t step = 0; step < static_cast<size_t>(Processes); ++step) {
|
|
// Find out exchange configuration
|
|
auto part = partner<SortMode::Bubbletonic>(rank, step);
|
|
auto ks = keepSmall<SortMode::Bubbletonic>(rank, part, Processes);
|
|
if ( isActive(rank, Processes) &&
|
|
isActive(part, Processes) ) {
|
|
// Exchange with partner, keep nim-or-max and sort - O(N)
|
|
int tag = static_cast<int>(tagGenerator(0, step));
|
|
if (!config.exchangeOpt || needsExchange(localStat, remoteStat, part, tag++, ks)) {
|
|
exchange(data, part, ks, tag);
|
|
timeCall(Timer_elbowSort, elbowSort, data, ascending<SortMode::Bubbletonic>(rank, Processes));
|
|
}
|
|
}
|
|
}
|
|
|
|
// Invert if the node was descending.
|
|
if (!ascending<SortMode::Bubbletonic>(rank, 0)) {
|
|
elbowSort(data, true);
|
|
}
|
|
}
|
|
|
|
|
|
/*!
|
|
* A distributed version of the Bitonic sort algorithm.
|
|
*
|
|
* @note
|
|
* Each MPI process should run an instance of this function.
|
|
*
|
|
* @tparam ShadowedDataT A Shadowed buffer type with random access iterator.
|
|
*
|
|
* @param data [ShadowedDataT] The local to MPI process data to sort
|
|
* @param Processes [mpi_id_t] The total number of MPI processes
|
|
* @param rank [mpi_id_t] The current process id
|
|
*/
|
|
template<typename ShadowedDataT>
|
|
void distBitonic(ShadowedDataT& data, mpi_id_t Processes, mpi_id_t rank) {
|
|
// Initially sort to create a half part of a bitonic sequence
|
|
timeCall(Timer_fullSort, fullSort, data, ascending<SortMode::Bitonic>(rank, 0));
|
|
|
|
// Run through sort network using elbow-sort ( O(LogN * LogN) iterations )
|
|
auto p = static_cast<uint32_t>(std::log2(Processes));
|
|
for (size_t depth = 1; depth <= p; ++depth) {
|
|
for (size_t step = depth; step > 0;) {
|
|
--step;
|
|
// Find out exchange configuration
|
|
auto part = partner<SortMode::Bitonic>(rank, step);
|
|
auto ks = keepSmall<SortMode::Bitonic>(rank, part, depth);
|
|
// Exchange with partner, keep nim-or-max
|
|
int tag = static_cast<int>(tagGenerator(depth, step));
|
|
if (!config.exchangeOpt || needsExchange(localStat, remoteStat, part, tag++, ks)) {
|
|
exchange(data, part, ks, tag);
|
|
}
|
|
}
|
|
// sort - O(N)
|
|
timeCall(Timer_elbowSort, elbowSort, data, ascending<SortMode::Bitonic>(rank, depth));
|
|
}
|
|
}
|
|
|
|
#endif //DISTBITONIC_H_
|