HW3: [no compile] A first clean up
This commit is contained in:
parent
2ff6ae171a
commit
146e975ac1
@ -10,24 +10,42 @@
|
||||
#include "distsort.hpp"
|
||||
|
||||
|
||||
bool isActive(mpi_id_t node, size_t nodes) {
|
||||
if (!((nodes > 0) &&
|
||||
(nodes <= static_cast<size_t>(std::numeric_limits<mpi_id_t>::max())) ))
|
||||
throw std::runtime_error("(isActive) Non-acceptable value of MPI Nodes\n");
|
||||
// ^ Assert that mpi_id_t can hold nodes, and thus we can cast without data loss!
|
||||
|
||||
return (node >= 0) && (node < static_cast<mpi_id_t>(nodes));
|
||||
/*!
|
||||
* Returns the ascending or descending configuration of the node's sequence based on
|
||||
* the current node (MPI process) and the depth of the sorting network
|
||||
*
|
||||
* @param node [mpi_id_t] The current node (MPI process)
|
||||
* @param depth [size_t] The total depth of the sorting network (same for each step for a given network)
|
||||
* @return [bool] True if we need ascending configuration, false otherwise
|
||||
*/
|
||||
bool ascending(mpi_id_t node, size_t depth) noexcept {
|
||||
return !(node & (1 << depth));
|
||||
}
|
||||
|
||||
size_t tagGenerator(size_t depth, size_t step, size_t stage) {
|
||||
auto stage_bits = static_cast<uint32_t>(std::log2(MAX_PIPELINE_SIZE));
|
||||
auto step_bits = static_cast<uint32_t>(std::log2(MAX_MPI_SIZE));
|
||||
uint32_t stat_bit = 1UL;
|
||||
// ^ We use MPI_SIZE room for steps to fit the bubbletonic version
|
||||
|
||||
// [ depth | step | stage+stats ]
|
||||
size_t tag = stage
|
||||
| (step << (stage_bits + stat_bit))
|
||||
| (depth << (stage_bits + step_bits + stat_bit));
|
||||
return tag;
|
||||
/*!
|
||||
* Returns the node's partner for data exchange during the sorting network iterations
|
||||
* of Bitonic
|
||||
*
|
||||
* @param node [mpi_id_t] The current node
|
||||
* @param step [size_t] The step of the sorting network
|
||||
* @return [mpi_id_t] The node id of the partner for data exchange
|
||||
*/
|
||||
mpi_id_t partner(mpi_id_t node, size_t step) noexcept {
|
||||
return (node ^ (1 << step));
|
||||
}
|
||||
|
||||
|
||||
/*!
|
||||
* Predicate to check if a node keeps the small numbers during the bitonic sort network exchange.
|
||||
*
|
||||
* @param node [mpi_id_t] The node for which we check
|
||||
* @param partner [mpi_id_t] The partner of the data exchange
|
||||
* @param depth [size_t] The total depth of the sorting network (same for each step for a given network)
|
||||
* @return [bool] True if the node should keep the small values, false otherwise
|
||||
*/
|
||||
|
||||
bool keepSmall(mpi_id_t node, mpi_id_t partner, size_t depth) {
|
||||
if (node == partner)
|
||||
throw std::runtime_error("(keepSmall) Node and Partner can not be the same\n");
|
||||
return ascending(node, depth) == (node < partner);
|
||||
}
|
||||
|
@ -32,35 +32,12 @@ extern Timing Timer_minmax;
|
||||
extern Timing Timer_elbowSort;
|
||||
|
||||
|
||||
/*!
 * The available versions of the distributed sorting method
 */
enum class SortMode {
   /*! The v0.5 of the algorithm where we use a bubble-sort like approach */
   Bubbletonic,
   /*! The v1.0 of the algorithm where we use the bitonic data-exchange approach */
   Bitonic
};
|
||||
|
||||
/*
|
||||
* ============================== Sort utilities ==============================
|
||||
*/
|
||||
|
||||
/*!
|
||||
* The primary function template of ascending(). It is DISABLED, since it is explicitly specialized
|
||||
* for each of the \c SortMode
|
||||
*/
|
||||
template <SortMode Mode> inline bool ascending(mpi_id_t, [[maybe_unused]] size_t) noexcept = delete;
|
||||
|
||||
/*!
|
||||
* Returns the ascending or descending configuration of the node's sequence based on
|
||||
* the current node (MPI process) and the depth of the sorting network
|
||||
*
|
||||
* @param node [mpi_id_t] The current node (MPI process)
|
||||
* @return [bool] True if we need ascending configuration, false otherwise
|
||||
*/
|
||||
template <> inline
|
||||
bool ascending<SortMode::Bubbletonic>(mpi_id_t node, [[maybe_unused]] size_t depth) noexcept {
|
||||
return (node % 2) == 0;
|
||||
}
|
||||
|
||||
/*!
|
||||
* Returns the ascending or descending configuration of the node's sequence based on
|
||||
@ -70,30 +47,7 @@ bool ascending<SortMode::Bubbletonic>(mpi_id_t node, [[maybe_unused]] size_t dep
|
||||
* @param depth [size_t] The total depth of the sorting network (same for each step for a given network)
|
||||
* @return [bool] True if we need ascending configuration, false otherwise
|
||||
*/
|
||||
template <> inline
|
||||
bool ascending<SortMode::Bitonic>(mpi_id_t node, size_t depth) noexcept {
|
||||
return !(node & (1 << depth));
|
||||
}
|
||||
|
||||
/*!
|
||||
* The primary function template of partner(). It is DISABLED, since it is explicitly specialized
|
||||
* for each of the \c SortMode
|
||||
*/
|
||||
template <SortMode Mode> inline mpi_id_t partner(mpi_id_t, size_t) noexcept = delete;
|
||||
|
||||
/*!
|
||||
* Returns the node's partner for data exchange during the sorting network iterations
|
||||
* of Bubbletonic
|
||||
*
|
||||
* @param node [mpi_id_t] The current node
|
||||
* @param step [size_t] The step of the sorting network
|
||||
* @return [mpi_id_t] The node id of the partner for data exchange
|
||||
*/
|
||||
template <> inline
|
||||
mpi_id_t partner<SortMode::Bubbletonic>(mpi_id_t node, size_t step) noexcept {
|
||||
//return (node % 2 == step % 2) ? node + 1 : node - 1;
|
||||
return (((node+step) % 2) == 0) ? node + 1 : node - 1;
|
||||
}
|
||||
// Declared noexcept so that the exception specification agrees with the
// definition `bool ascending(mpi_id_t, size_t) noexcept`; a mismatch between
// declaration and definition is ill-formed.
bool ascending(mpi_id_t node, size_t depth) noexcept;
|
||||
|
||||
/*!
|
||||
* Returns the node's partner for data exchange during the sorting network iterations
|
||||
@ -103,32 +57,9 @@ mpi_id_t partner<SortMode::Bubbletonic>(mpi_id_t node, size_t step) noexcept {
|
||||
* @param step [size_t] The step of the sorting network
|
||||
* @return [mpi_id_t] The node id of the partner for data exchange
|
||||
*/
|
||||
template <> inline
|
||||
mpi_id_t partner<SortMode::Bitonic>(mpi_id_t node, size_t step) noexcept {
|
||||
return (node ^ (1 << step));
|
||||
}
|
||||
// Declared noexcept so that the exception specification agrees with the
// definition `mpi_id_t partner(mpi_id_t, size_t) noexcept`; a mismatch between
// declaration and definition is ill-formed.
mpi_id_t partner(mpi_id_t node, size_t step) noexcept;
|
||||
|
||||
|
||||
/*!
|
||||
* The primary function template of keepSmall(). It is DISABLED, since it is explicitly specialized
|
||||
* for each of the \c SortMode
|
||||
*/
|
||||
template<SortMode Mode> inline bool keepSmall(mpi_id_t, mpi_id_t, [[maybe_unused]] size_t) = delete;
|
||||
|
||||
/*!
|
||||
* Predicate to check if a node keeps the small numbers during the bubbletonic sort network exchange.
|
||||
*
|
||||
* @param node [mpi_id_t] The node for which we check
|
||||
* @param partner [mpi_id_t] The partner of the data exchange
|
||||
* @return [bool] True if the node should keep the small values, false otherwise
|
||||
*/
|
||||
template <> inline
|
||||
bool keepSmall<SortMode::Bubbletonic>(mpi_id_t node, mpi_id_t partner, [[maybe_unused]] size_t depth) {
|
||||
if (node == partner)
|
||||
throw std::runtime_error("(keepSmall) Node and Partner can not be the same\n");
|
||||
return (node < partner);
|
||||
}
|
||||
|
||||
/*!
|
||||
* Predicate to check if a node keeps the small numbers during the bitonic sort network exchange.
|
||||
*
|
||||
@ -137,22 +68,9 @@ bool keepSmall<SortMode::Bubbletonic>(mpi_id_t node, mpi_id_t partner, [[maybe_u
|
||||
* @param depth [size_t] The total depth of the sorting network (same for each step for a given network)
|
||||
* @return [bool] True if the node should keep the small values, false otherwise
|
||||
*/
|
||||
template <> inline
|
||||
bool keepSmall<SortMode::Bitonic>(mpi_id_t node, mpi_id_t partner, size_t depth) {
|
||||
if (node == partner)
|
||||
throw std::runtime_error("(keepSmall) Node and Partner can not be the same\n");
|
||||
return ascending<SortMode::Bitonic>(node, depth) == (node < partner);
|
||||
}
|
||||
bool keepSmall(mpi_id_t node, mpi_id_t partner, size_t depth);
|
||||
|
||||
|
||||
/*!
|
||||
* Predicate to check if the node is active in the current iteration of the bubbletonic
|
||||
* sort exchange.
|
||||
*
|
||||
* @param node [mpi_id_t] The node to check
|
||||
* @param nodes [size_t] The total number of nodes
|
||||
* @return [bool] True if the node is active, false otherwise
|
||||
*/
|
||||
bool isActive(mpi_id_t node, size_t nodes);
|
||||
|
||||
/*
|
||||
* ============================== Data utilities ==============================
|
||||
@ -175,9 +93,6 @@ void fullSort(RangeT& data, bool ascending) noexcept {
|
||||
else {
|
||||
__gnu_parallel::sort(data.begin(), data.end(), std::greater<>());
|
||||
}
|
||||
|
||||
if (config.exchangeOpt)
|
||||
updateMinMax(localStat, data);
|
||||
}
|
||||
|
||||
/*!
|
||||
@ -242,43 +157,6 @@ void elbowSort(ShadowedDataT& data, bool ascending) noexcept {
|
||||
elbowSortCore(data, ascending, std::greater<>());
|
||||
}
|
||||
|
||||
/*!
|
||||
* Predicate for exchange optimization. Returns true only if an exchange between partners is needed.
|
||||
* In order to do that we exchange min and max statistics of the partner's data.
|
||||
*
|
||||
* @tparam StatT Statistics data type (for min-max)
|
||||
*
|
||||
* @param lstat [const StatT] Reference to the local statistic data
|
||||
* @param rstat [StatT] Reference to the remote statistic data to fill
|
||||
* @param part [mpi_id_t] The partner for the exchange
|
||||
* @param tag [int] The tag to use for the exchange of stats
|
||||
* @param keepSmall [bool] Flag to indicate if the local thread keeps the small ro the large values
|
||||
* @return True if we need data exchange, false otherwise
|
||||
*/
|
||||
template<typename StatT>
|
||||
bool needsExchange(const StatT& lstat, StatT& rstat, mpi_id_t part, int tag, bool keepSmall) {
|
||||
timeCall(Timer_exchange, mpi.exchange_it, lstat, rstat, part, tag);
|
||||
return (keepSmall) ?
|
||||
rstat.min < lstat.max // Lmin: rstat.min - Smax: lstat.max
|
||||
: lstat.min < rstat.max; // Lmin: lstat.min - Smax: rstat.max
|
||||
}
|
||||
|
||||
|
||||
/*!
 * Update stats utility. Scans the sequence once and stores its smallest and largest
 * element into \c stat.
 *
 * @tparam RangeT    A range type with random access iterator
 * @tparam StatT     Statistics data type (for min-max)
 *
 * @param stat    [StatT]           Reference to the statistic data to update
 * @param data    [const RangeT]    Reference to the sequence to extract stats from
 *
 * @note
 *    For an empty sequence \c stat is left untouched. std::minmax_element returns
 *    the end iterator in that case and dereferencing it is undefined behaviour.
 */
template<typename RangeT, typename StatT>
void updateMinMax(StatT& stat, const RangeT& data) noexcept {
   const auto first = data.begin();
   const auto last  = data.end();
   if (first == last)
      return;  // Nothing to measure, keep the previous statistics

   const auto [lowest, highest] = std::minmax_element(first, last);
   stat.min = *lowest;
   stat.max = *highest;
}
|
||||
|
||||
/*!
|
||||
* Takes two sequences and selects either the larger or the smaller items
|
||||
@ -307,113 +185,6 @@ void keepMinOrMax(ValueT* local, const ValueT* remote, size_t count, bool keepSm
|
||||
* ============================== Sort algorithms ==============================
|
||||
*/
|
||||
|
||||
/*!
|
||||
* A small tag generator tool to provide consistent encoding to tag communication
|
||||
*
|
||||
* @param depth The current algorithmic depth[bitonic] of the communication, if any
|
||||
* @param step The current step on the current depth
|
||||
* @param stage The stage of the pipeline.
|
||||
* @return The tag to use.
|
||||
*
|
||||
* @note
|
||||
* In case we call this function outside of the pipeline loop, we can omit
|
||||
* @c stage argument and use the return value as starting tag for every communication
|
||||
* of the pipeline loop. We need to increase the tags for each communication of
|
||||
* the pipeline loop though!
|
||||
*/
|
||||
size_t tagGenerator(size_t depth, size_t step, size_t stage = 0);
|
||||
|
||||
/*!
|
||||
* An exchange functionality to support both Bubbletonic and Bitonic sort algorithms.
|
||||
*
|
||||
* @note
|
||||
* In case of pipeline request it switches to non-blocking MPI communication for
|
||||
* pipelining min-max process with mpi data exchange
|
||||
*
|
||||
* @tparam ShadowedDataT A Shadowed buffer type with random access iterator.
|
||||
*
|
||||
* @param data [ShadowedDataT&] Reference to the data to exchange
|
||||
* @param partner [mpi_id_t] The partner for the exchange
|
||||
* @param keepSmall [bool] Flag to indicate if we keep the small values
|
||||
* @param tag [int] The init tag to use for the loop.
|
||||
*
|
||||
* @note
|
||||
* The @c tag is increased inside the pipeline loop for each different data exchange
|
||||
*/
|
||||
template<typename ShadowedDataT>
|
||||
void exchange(ShadowedDataT& data, mpi_id_t partner, bool keepSmall, int tag) {
|
||||
using Value_t = typename ShadowedDataT::value_type;
|
||||
|
||||
// Init counters and pointers
|
||||
Value_t* active = data.getActive().data();
|
||||
Value_t* shadow = data.getShadow().data();
|
||||
size_t count = data.size() / config.pipeline;
|
||||
|
||||
if (config.pipeline > 1) {
|
||||
// Pipeline case - use async MPI
|
||||
Timer_exchange.start();
|
||||
mpi.exchange_start(active, shadow, count, partner, tag);
|
||||
for (size_t stage = 0; stage < config.pipeline; active += count, shadow += count) {
|
||||
// Wait previous chunk
|
||||
mpi.exchange_wait();
|
||||
Timer_exchange.stop();
|
||||
if (++stage < config.pipeline) {
|
||||
// Start next chunk if there is a next one
|
||||
Timer_exchange.start();
|
||||
mpi.exchange_start(active + count, shadow + count, count, partner, ++tag);
|
||||
}
|
||||
// process the arrived data
|
||||
timeCall(Timer_minmax, keepMinOrMax, active, shadow, count, keepSmall);
|
||||
}
|
||||
}
|
||||
else {
|
||||
// No pipeline - use blocking MPI
|
||||
timeCall(Timer_exchange, mpi.exchange, active, shadow, count, partner, tag);
|
||||
timeCall(Timer_minmax, keepMinOrMax, active, shadow, count, keepSmall);
|
||||
}
|
||||
if (config.exchangeOpt)
|
||||
updateMinMax(localStat, data);
|
||||
}
|
||||
|
||||
/*!
|
||||
* A distributed version of the Bubbletonic sort algorithm.
|
||||
*
|
||||
* @note
|
||||
* Each MPI process should run an instance of this function.
|
||||
*
|
||||
* @tparam ShadowedDataT A Shadowed buffer type with random access iterator.
|
||||
*
|
||||
* @param data [ShadowedDataT] The local to MPI process data to sort
|
||||
* @param Processes [mpi_id_t] The total number of MPI processes
|
||||
* @param rank [mpi_id_t] The current process id
|
||||
*/
|
||||
template<typename ShadowedDataT>
|
||||
void distBubbletonic(ShadowedDataT& data, mpi_id_t Processes, mpi_id_t rank) {
|
||||
// Initially sort to create a half part of a bitonic sequence
|
||||
timeCall(Timer_fullSort, fullSort, data, ascending<SortMode::Bubbletonic>(rank, 0));
|
||||
|
||||
// Sort network (O(N) iterations)
|
||||
for (size_t step = 0; step < static_cast<size_t>(Processes); ++step) {
|
||||
// Find out exchange configuration
|
||||
auto part = partner<SortMode::Bubbletonic>(rank, step);
|
||||
auto ks = keepSmall<SortMode::Bubbletonic>(rank, part, Processes);
|
||||
if ( isActive(rank, Processes) &&
|
||||
isActive(part, Processes) ) {
|
||||
// Exchange with partner, keep nim-or-max and sort - O(N)
|
||||
int tag = static_cast<int>(tagGenerator(0, step));
|
||||
if (!config.exchangeOpt || needsExchange(localStat, remoteStat, part, tag++, ks)) {
|
||||
exchange(data, part, ks, tag);
|
||||
timeCall(Timer_elbowSort, elbowSort, data, ascending<SortMode::Bubbletonic>(rank, Processes));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Invert if the node was descending.
|
||||
if (!ascending<SortMode::Bubbletonic>(rank, 0)) {
|
||||
elbowSort(data, true);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/*!
|
||||
* A distributed version of the Bitonic sort algorithm.
|
||||
@ -428,9 +199,9 @@ void distBubbletonic(ShadowedDataT& data, mpi_id_t Processes, mpi_id_t rank) {
|
||||
* @param rank [mpi_id_t] The current process id
|
||||
*/
|
||||
template<typename ShadowedDataT>
|
||||
void distBitonic(ShadowedDataT& data, mpi_id_t Processes, mpi_id_t rank) {
|
||||
void distBitonic(ShadowedDataT& data) {
|
||||
// Initially sort to create a half part of a bitonic sequence
|
||||
timeCall(Timer_fullSort, fullSort, data, ascending<SortMode::Bitonic>(rank, 0));
|
||||
timeCall(Timer_fullSort, fullSort, data, ascending(rank, 0));
|
||||
|
||||
// Run through sort network using elbow-sort ( O(LogN * LogN) iterations )
|
||||
auto p = static_cast<uint32_t>(std::log2(Processes));
|
||||
@ -438,16 +209,14 @@ void distBitonic(ShadowedDataT& data, mpi_id_t Processes, mpi_id_t rank) {
|
||||
for (size_t step = depth; step > 0;) {
|
||||
--step;
|
||||
// Find out exchange configuration
|
||||
auto part = partner<SortMode::Bitonic>(rank, step);
|
||||
auto ks = keepSmall<SortMode::Bitonic>(rank, part, depth);
|
||||
auto part = partner(rank, step);
|
||||
auto ks = keepSmall(rank, part, depth);
|
||||
// Exchange with partner, keep min-or-max
|
||||
int tag = static_cast<int>(tagGenerator(depth, step));
|
||||
if (!config.exchangeOpt || needsExchange(localStat, remoteStat, part, tag++, ks)) {
|
||||
exchange(data, part, ks, tag);
|
||||
}
|
||||
|
||||
}
|
||||
// sort - O(N)
|
||||
timeCall(Timer_elbowSort, elbowSort, data, ascending<SortMode::Bitonic>(rank, depth));
|
||||
timeCall(Timer_elbowSort, elbowSort, data, ascending(rank, depth));
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -19,10 +19,9 @@
|
||||
|
||||
// Global session data
|
||||
config_t config;
|
||||
MPI_t<> mpi;
|
||||
distBuffer_t Data;
|
||||
Log logger;
|
||||
distStat_t localStat, remoteStat;
|
||||
|
||||
|
||||
// Mersenne seeded from hw if possible. range: [type_min, type_max]
|
||||
std::random_device rd;
|
||||
@ -76,21 +75,6 @@ bool get_options(int argc, char* argv[]){
|
||||
status = false;
|
||||
}
|
||||
}
|
||||
else if (arg == "-e" || arg == "--exchange-opt") {
|
||||
config.exchangeOpt = true;
|
||||
}
|
||||
else if (arg == "--pipeline") {
|
||||
if (i+1 < argc) {
|
||||
auto stages = atoi(argv[++i]);
|
||||
if (isPowerOfTwo(stages) && stages <= static_cast<int>(MAX_PIPELINE_SIZE))
|
||||
config.pipeline = stages;
|
||||
else
|
||||
status = false;
|
||||
}
|
||||
else {
|
||||
status = false;
|
||||
}
|
||||
}
|
||||
else if (arg == "--validation") {
|
||||
config.validation = true;
|
||||
}
|
||||
@ -109,25 +93,18 @@ bool get_options(int argc, char* argv[]){
|
||||
config.verbose = true;
|
||||
}
|
||||
else if (arg == "--version") {
|
||||
std::cout << "distbitonic/distbubbletonic - A distributed sort utility\n";
|
||||
std::cout << "bitonic - A GPU accelerated sort utility\n";
|
||||
std::cout << "version: " << version << "\n\n";
|
||||
exit(0);
|
||||
}
|
||||
else if (arg == "-h" || arg == "--help") {
|
||||
std::cout << "distbitonic/distbubbletonic - A distributed sort utility\n\n";
|
||||
std::cout << " distbitonic -q <N> [-e] [-p | --pipeline <N>] [--validation] [--perf <N>] [--ndebug] [-v]\n";
|
||||
std::cout << "distbitonic - A distributed sort utility\n\n";
|
||||
std::cout << " distbitonic -q <N> [--validation] [--perf <N>] [--ndebug] [-v]\n";
|
||||
std::cout << " distbitonic -h\n";
|
||||
std::cout << " distbubbletonic -q <N> [-e] [-p | --pipeline <N>] [--validation] [--perf <N> ] [--ndebug] [-v]\n";
|
||||
std::cout << " distbubbletonic -h\n";
|
||||
std::cout << '\n';
|
||||
std::cout << "Options:\n\n";
|
||||
std::cout << " -q | --array-size <N>\n";
|
||||
std::cout << " Selects the array size according to size = 2^N\n\n";
|
||||
std::cout << " -e | --exchange-opt\n";
|
||||
std::cout << " Request an MPI data exchange optimization \n\n";
|
||||
std::cout << " -p <N> | --pipeline <N>\n";
|
||||
std::cout << " Request a pipeline of <N> stages for exchange-minmax\n";
|
||||
std::cout << " N must be power of 2 up to " << MAX_PIPELINE_SIZE << "\n\n";
|
||||
std::cout << " --validation\n";
|
||||
std::cout << " Request a full validation at the end, performed by process rank 0\n\n";
|
||||
std::cout << " --perf <N> \n";
|
||||
@ -142,10 +119,8 @@ bool get_options(int argc, char* argv[]){
|
||||
std::cout << " --version\n";
|
||||
std::cout << " Prints version and exit.\n\n";
|
||||
std::cout << "Examples:\n\n";
|
||||
std::cout << " mpirun -np 4 distbitonic -q 24\n";
|
||||
std::cout << " Runs distbitonic in 4 MPI processes with 2^24 array points each\n\n";
|
||||
std::cout << " mpirun -np 16 distbubbletonic -q 20\n";
|
||||
std::cout << " Runs distbubbletonic in 16 MPI processes with 2^20 array points each\n\n";
|
||||
std::cout << " bitonic -q 24\n";
|
||||
std::cout << " Runs bitonic with GPU acceleration with 2^24 array points\n\n";
|
||||
|
||||
exit(0);
|
||||
}
|
||||
@ -170,34 +145,10 @@ bool get_options(int argc, char* argv[]){
|
||||
* @return [bool] True if all are sorted and in total ascending order
|
||||
*/
|
||||
template<typename ShadowedDataT>
|
||||
bool validator(ShadowedDataT& data, mpi_id_t Processes, mpi_id_t rank) {
|
||||
bool validator(ShadowedDataT& data) {
|
||||
using value_t = typename ShadowedDataT::value_type;
|
||||
bool ret = true; // Have faith!
|
||||
|
||||
// Local results
|
||||
value_t lmin = data.front();
|
||||
value_t lmax = data.back();
|
||||
value_t lsort = static_cast<value_t>(std::is_sorted(data.begin(), data.end()));
|
||||
|
||||
// Gather min/max/sort to rank 0
|
||||
std::vector<value_t> mins(Processes);
|
||||
std::vector<value_t> maxes(Processes);
|
||||
std::vector<value_t> sorts(Processes);
|
||||
|
||||
MPI_Datatype datatype = MPI_TypeMapper<value_t>::getType();
|
||||
MPI_Gather(&lmin, 1, datatype, mins.data(), 1, datatype, 0, MPI_COMM_WORLD);
|
||||
MPI_Gather(&lmax, 1, datatype, maxes.data(), 1, datatype, 0, MPI_COMM_WORLD);
|
||||
MPI_Gather(&lsort, 1, datatype, sorts.data(), 1, datatype, 0, MPI_COMM_WORLD);
|
||||
|
||||
// Check all results
|
||||
if (rank == 0) {
|
||||
for (mpi_id_t r = 1; r < Processes; ++r) {
|
||||
if (sorts[r] == 0)
|
||||
ret = false;
|
||||
if (maxes[r - 1] > mins[r])
|
||||
ret = false;
|
||||
}
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
@ -212,31 +163,6 @@ void init(int* argc, char*** argv) {
|
||||
if (!get_options(*argc, *argv))
|
||||
exit(1);
|
||||
|
||||
// Initialize MPI environment
|
||||
mpi.init(argc, argv);
|
||||
|
||||
logger << "MPI environment initialized." << " Rank: " << mpi.rank() << " Size: " << mpi.size()
|
||||
<< logger.endl;
|
||||
|
||||
#if defined DEBUG
|
||||
#if defined TESTING
|
||||
/*
|
||||
* In case of a debug build we will wait here until sleep_wait
|
||||
* will reset via debugger. In order to do that the user must attach
|
||||
* debugger to all processes. For example:
|
||||
* $> mpirun -np 2 ./<program path>
|
||||
* $> ps aux | grep <program>
|
||||
* $> gdb <program> <PID1>
|
||||
* $> gdb <program> <PID2>
|
||||
*/
|
||||
volatile bool sleep_wait = false;
|
||||
#else
|
||||
volatile bool sleep_wait = true;
|
||||
#endif
|
||||
while (sleep_wait && !config.ndebug)
|
||||
sleep(1);
|
||||
#endif
|
||||
|
||||
// Prepare vector and timing data
|
||||
Data.resize(config.arraySize);
|
||||
measurements_init();
|
||||
@ -260,37 +186,28 @@ int main(int argc, char* argv[]) try {
|
||||
);
|
||||
std::generate(Data.begin(), Data.end(), [&]() { return dis(gen); });
|
||||
// Run distributed sort
|
||||
if (mpi.rank() == 0)
|
||||
logger << "Starting distributed sorting ... ";
|
||||
Timer_total.start();
|
||||
#if CODE_VERSION == BUBBLETONIC
|
||||
distBubbletonic(Data, mpi.size(), mpi.rank());
|
||||
#else
|
||||
distBitonic(Data, mpi.size(), mpi.rank());
|
||||
#endif
|
||||
distBitonic(Data);
|
||||
Timer_total.stop();
|
||||
measurements_next();
|
||||
if (mpi.rank() == 0)
|
||||
logger << " Done." << logger.endl;
|
||||
}
|
||||
|
||||
// Print-outs and validation
|
||||
if (config.perf > 1) {
|
||||
Timing::print_duration(Timer_total.median(), "Total ", mpi.rank());
|
||||
Timing::print_duration(Timer_fullSort.median(), "Full-Sort ", mpi.rank());
|
||||
Timing::print_duration(Timer_exchange.median(), "Exchange ", mpi.rank());
|
||||
Timing::print_duration(Timer_minmax.median(), "Min-Max ", mpi.rank());
|
||||
Timing::print_duration(Timer_elbowSort.median(),"Elbow-Sort", mpi.rank());
|
||||
Timing::print_duration(Timer_total.median(), "Total ", 0);
|
||||
Timing::print_duration(Timer_fullSort.median(), "Full-Sort ", 0);
|
||||
Timing::print_duration(Timer_exchange.median(), "Exchange ", 0);
|
||||
Timing::print_duration(Timer_minmax.median(), "Min-Max ", 0);
|
||||
Timing::print_duration(Timer_elbowSort.median(),"Elbow-Sort", 0);
|
||||
}
|
||||
if (config.validation) {
|
||||
// If requested, we have the chance to fail!
|
||||
if (mpi.rank() == 0)
|
||||
std::cout << "[Validation] Results validation ...";
|
||||
bool val = validator(Data, mpi.size(), mpi.rank());
|
||||
if (mpi.rank() == 0)
|
||||
bool val = validator(Data);
|
||||
std::cout << ((val) ? "\x1B[32m [PASSED] \x1B[0m\n" : " \x1B[32m [FAILED] \x1B[0m\n");
|
||||
}
|
||||
mpi.finalize();
|
||||
return 0;
|
||||
}
|
||||
catch (std::exception& e) {
|
||||
|
@ -13,256 +13,10 @@
|
||||
#include <iostream>
|
||||
#include <chrono>
|
||||
#include <unistd.h>
|
||||
#include <mpi.h>
|
||||
#include <algorithm>
|
||||
|
||||
#include "config.h"
|
||||
|
||||
/*!
 * Min-Max statistics of a sequence, used by the exchange optimization
 *
 * @tparam Value_t   The underlying data type of the sequence data
 */
template <typename Value_t>
struct Stat_t {
   //! meta-export the underlying type
   using value_type = Value_t;

   //! The minimum value of the sequence (value-initialized)
   value_type min{};
   //! The maximum value of the sequence (value-initialized)
   value_type max{};
};
|
||||
|
||||
//! Application data selection alias
|
||||
using distStat_t = Stat_t<distValue_t>;
|
||||
extern distStat_t localStat, remoteStat; // Make stats public
|
||||
|
||||
/*
|
||||
* MPI_<type> dispatcher mechanism
|
||||
*/
|
||||
template <typename T> struct MPI_TypeMapper { };
|
||||
|
||||
template <> struct MPI_TypeMapper<char> { static MPI_Datatype getType() { return MPI_CHAR; } };
|
||||
template <> struct MPI_TypeMapper<short> { static MPI_Datatype getType() { return MPI_SHORT; } };
|
||||
template <> struct MPI_TypeMapper<int> { static MPI_Datatype getType() { return MPI_INT; } };
|
||||
template <> struct MPI_TypeMapper<long> { static MPI_Datatype getType() { return MPI_LONG; } };
|
||||
template <> struct MPI_TypeMapper<long long> { static MPI_Datatype getType() { return MPI_LONG_LONG; } };
|
||||
template <> struct MPI_TypeMapper<unsigned char> { static MPI_Datatype getType() { return MPI_UNSIGNED_CHAR; } };
|
||||
template <> struct MPI_TypeMapper<unsigned short>{ static MPI_Datatype getType() { return MPI_UNSIGNED_SHORT; } };
|
||||
template <> struct MPI_TypeMapper<unsigned int> { static MPI_Datatype getType() { return MPI_UNSIGNED; } };
|
||||
template <> struct MPI_TypeMapper<unsigned long> { static MPI_Datatype getType() { return MPI_UNSIGNED_LONG; } };
|
||||
template <> struct MPI_TypeMapper<unsigned long long> { static MPI_Datatype getType() { return MPI_UNSIGNED_LONG_LONG; } };
|
||||
template <> struct MPI_TypeMapper<float> { static MPI_Datatype getType() { return MPI_FLOAT; } };
|
||||
template <> struct MPI_TypeMapper<double> { static MPI_Datatype getType() { return MPI_DOUBLE; } };
|
||||
|
||||
/*!
|
||||
* MPI wrapper type to provide MPI functionality and RAII to MPI as a resource
|
||||
*
|
||||
* @tparam TID The MPI type for process id [default: int]
|
||||
*/
|
||||
template<typename TID = int>
|
||||
struct MPI_t {
|
||||
using ID_t = TID; // Export TID type (currently int defined by the standard)
|
||||
|
||||
/*!
|
||||
* Initializes the MPI environment, must called from each process
|
||||
*
|
||||
* @param argc [int*] POINTER to main's argc argument
|
||||
* @param argv [char***] POINTER to main's argv argument
|
||||
*/
|
||||
void init(int* argc, char*** argv) {
|
||||
// Initialize the MPI environment
|
||||
int err;
|
||||
if ((err = MPI_Init(argc, argv)) != MPI_SUCCESS)
|
||||
mpi_throw(err, "(MPI) MPI_Init() - ");
|
||||
initialized_ = true;
|
||||
|
||||
// Get the number of processes
|
||||
int size_value, rank_value;
|
||||
if ((err = MPI_Comm_size(MPI_COMM_WORLD, &size_value)) != MPI_SUCCESS)
|
||||
mpi_throw(err, "(MPI) MPI_Comm_size() - ");
|
||||
if ((err = MPI_Comm_rank(MPI_COMM_WORLD, &rank_value)) != MPI_SUCCESS)
|
||||
mpi_throw(err, "(MPI) MPI_Comm_rank() - ");
|
||||
size_ = static_cast<ID_t>(size_value);
|
||||
rank_ = static_cast<ID_t>(rank_value);
|
||||
if (size_ > static_cast<ID_t>(MAX_MPI_SIZE))
|
||||
throw std::runtime_error(
|
||||
"(MPI) size - Not supported number of nodes [over " + std::to_string(MAX_MPI_SIZE) + "]\n"
|
||||
);
|
||||
|
||||
// Get the name of the processor
|
||||
char processor_name[MPI_MAX_PROCESSOR_NAME];
|
||||
int name_len;
|
||||
if ((err = MPI_Get_processor_name(processor_name, &name_len)) != MPI_SUCCESS)
|
||||
mpi_throw(err, "(MPI) MPI_Get_processor_name() - ");
|
||||
name_ = std::string (processor_name, name_len);
|
||||
}
|
||||
|
||||
/*!
|
||||
* Exchange one data object of type @c T with partner as part of the sorting network of both
|
||||
* bubbletonic or bitonic sorting algorithms.
|
||||
*
|
||||
* This function matches a transmit and a receive in order for fully exchanged the data object
|
||||
* between current node and partner.
|
||||
*
|
||||
* @tparam T The object type
|
||||
*
|
||||
* @param local [const T&] Reference to the local object to send
|
||||
* @param remote [T&] Reference to the object to receive data from partner
|
||||
* @param partner [mpi_id_t] The partner for the exchange
|
||||
* @param tag [int] The tag to use for the MPI communication
|
||||
*/
|
||||
template<typename T>
|
||||
void exchange_it(const T& local, T& remote, ID_t partner, int tag) {
|
||||
if (tag < 0)
|
||||
throw std::runtime_error("(MPI) exchange_it() [tag] - Out of bound");
|
||||
MPI_Status status;
|
||||
int err;
|
||||
if ((err = MPI_Sendrecv(
|
||||
&local, sizeof(T), MPI_BYTE, partner, tag,
|
||||
&remote, sizeof(T), MPI_BYTE, partner, tag,
|
||||
MPI_COMM_WORLD, &status
|
||||
)) != MPI_SUCCESS)
|
||||
mpi_throw(err, "(MPI) MPI_Sendrecv() [item] - ");
|
||||
}
|
||||
|
||||
/*!
|
||||
* Exchange data with partner as part of the sorting network of both bubbletonic or bitonic
|
||||
* sorting algorithms.
|
||||
*
|
||||
* This function matches a transmit and a receive in order for fully exchanged data between
|
||||
* current node and partner.
|
||||
*
|
||||
* @tparam ValueT The value type used in buffer
|
||||
*
|
||||
* @param ldata [const ValueT*] Pointer to local data to send
|
||||
* @param rdata [ValueT*] Pointer to buffer to receive data from partner
|
||||
* @param count [size_t] The number of data to exchange
|
||||
* @param partner [mpi_id_t] The partner for the exchange
|
||||
* @param tag [int] The tag to use for the MPI communication
|
||||
*/
|
||||
template<typename ValueT>
|
||||
void exchange(const ValueT* ldata, ValueT* rdata, size_t count, ID_t partner, int tag) {
|
||||
if (tag < 0)
|
||||
throw std::runtime_error("(MPI) exchange_data() [tag] - Out of bound");
|
||||
|
||||
MPI_Datatype datatype = MPI_TypeMapper<ValueT>::getType();
|
||||
MPI_Status status;
|
||||
int err;
|
||||
if ((err = MPI_Sendrecv(
|
||||
ldata, count, datatype, partner, tag,
|
||||
rdata, count, datatype, partner, tag,
|
||||
MPI_COMM_WORLD, &status
|
||||
)) != MPI_SUCCESS)
|
||||
mpi_throw(err, "(MPI) MPI_Sendrecv() [data] - ");
|
||||
}
|
||||
|
||||
/*!
|
||||
* Initiate a data exchange data with partner using non-blocking Isend-Irecv, as part of the
|
||||
* sorting network of both bubbletonic or bitonic sorting algorithms.
|
||||
*
|
||||
* This function matches a transmit and a receive in order for fully exchanged data between
|
||||
* current node and partner.
|
||||
* @note
|
||||
* This call MUST paired with exchange_wait() for each MPI_t object.
|
||||
* Calling 2 consecutive exchange_start() for the same MPI_t object is undefined.
|
||||
*
|
||||
* @tparam ValueT The value type used in buffers
|
||||
*
|
||||
* @param ldata [const ValueT*] Pointer to local data to send
|
||||
* @param rdata [ValueT*] Pointer to buffer to receive data from partner
|
||||
* @param count [size_t] The number of data to exchange
|
||||
* @param partner [mpi_id_t] The partner for the exchange
|
||||
* @param tag [int] The tag to use for the MPI communication
|
||||
*/
|
||||
template<typename ValueT>
|
||||
void exchange_start(const ValueT* ldata, ValueT* rdata, size_t count, ID_t partner, int tag) {
|
||||
if (tag < 0)
|
||||
throw std::runtime_error("(MPI) exchange_data() [tag] - Out of bound");
|
||||
|
||||
MPI_Datatype datatype = MPI_TypeMapper<ValueT>::getType();
|
||||
int err;
|
||||
err = MPI_Isend(ldata, count, datatype, partner, tag, MPI_COMM_WORLD, &handle_tx);
|
||||
if (err != MPI_SUCCESS)
|
||||
mpi_throw(err, "(MPI) MPI_Isend() - ");
|
||||
err = MPI_Irecv(rdata, count, datatype, partner, tag, MPI_COMM_WORLD, &handle_rx);
|
||||
if (err != MPI_SUCCESS)
|
||||
mpi_throw(err, "(MPI) MPI_Irecv() - ");
|
||||
}
|
||||
|
||||
/*!
|
||||
* Block wait for the completion of the previously called exchange_start()
|
||||
*
|
||||
* @note
|
||||
* This call MUST paired with exchange_start() for each MPI_t object.
|
||||
* Calling 2 consecutive exchange_wait() for the same MPI_t object is undefined.
|
||||
*/
|
||||
void exchange_wait() {
|
||||
MPI_Status status;
|
||||
|
||||
int err;
|
||||
if ((err = MPI_Wait(&handle_tx, &status)) != MPI_SUCCESS)
|
||||
mpi_throw(err, "(MPI) MPI_Wait() [send] - ");
|
||||
|
||||
if ((err = MPI_Wait(&handle_rx, &status)) != MPI_SUCCESS)
|
||||
mpi_throw(err, "(MPI) MPI_Wait() [recv] - ");
|
||||
}
|
||||
|
||||
// Accessors
|
||||
[[nodiscard]] ID_t rank() const noexcept { return rank_; }
|
||||
[[nodiscard]] ID_t size() const noexcept { return size_; }
|
||||
[[nodiscard]] const std::string& name() const noexcept { return name_; }
|
||||
|
||||
// Mutators
|
||||
ID_t rank(ID_t rank) noexcept { return rank_ = rank; }
|
||||
ID_t size(ID_t size) noexcept { return size_ = size; }
|
||||
std::string& name(const std::string& name) noexcept { return name_ = name; }
|
||||
|
||||
/*!
|
||||
* Finalized the MPI
|
||||
*/
|
||||
void finalize() {
|
||||
// Finalize the MPI environment
|
||||
initialized_ = false;
|
||||
MPI_Finalize();
|
||||
}
|
||||
|
||||
//! RAII MPI finalization
|
||||
~MPI_t() {
|
||||
// Finalize the MPI environment even on unexpected errors
|
||||
if (initialized_)
|
||||
MPI_Finalize();
|
||||
}
|
||||
|
||||
|
||||
// Local functionality
|
||||
private:
|
||||
/*!
|
||||
* Throw exception helper. It bundles the prefix msg with the MPI error string retrieved by
|
||||
* MPI API.
|
||||
*
|
||||
* @param err The MPI error code
|
||||
* @param prefixMsg The prefix text for the exception error message
|
||||
*/
|
||||
void mpi_throw(int err, const char* prefixMsg) {
|
||||
char err_msg[MPI_MAX_ERROR_STRING];
|
||||
int msg_len;
|
||||
MPI_Error_string(err, err_msg, &msg_len);
|
||||
throw std::runtime_error(prefixMsg + std::string (err_msg) + '\n');
|
||||
}
|
||||
|
||||
private:
|
||||
ID_t rank_{}; //!< MPI rank of the process
|
||||
ID_t size_{}; //!< MPI total size of the execution
|
||||
std::string name_{}; //!< The name of the local machine
|
||||
bool initialized_{}; //!< RAII helper flag
|
||||
MPI_Request handle_tx{}; //!< MPI async exchange handler for Transmission
|
||||
MPI_Request handle_rx{}; //!< MPI async exchange handler for Receptions
|
||||
};
|
||||
|
||||
/*
|
||||
* Exported data types
|
||||
*/
|
||||
extern MPI_t<> mpi;
|
||||
using mpi_id_t = MPI_t<>::ID_t;
|
||||
|
||||
|
||||
|
||||
/*!
|
||||
* @brief A std::vector wrapper with 2 vectors, an active and a shadow.
|
||||
@ -463,19 +217,19 @@ struct Timing {
|
||||
}
|
||||
|
||||
//! Tool to print the time interval
|
||||
static void print_duration(const Tduration& duration, const char *what, mpi_id_t rank) noexcept {
|
||||
static void print_duration(const Tduration& duration, const char *what) noexcept {
|
||||
if (std::chrono::duration_cast<microseconds>(duration).count() < 10000)
|
||||
std::cout << "[Timing] (Rank " << rank << ") " << what << ": "
|
||||
std::cout << "[Timing] " << what << ": "
|
||||
<< std::to_string(std::chrono::duration_cast<microseconds>(duration).count()) << " [usec]\n";
|
||||
else if (std::chrono::duration_cast<milliseconds>(duration).count() < 10000)
|
||||
std::cout << "[Timing] (Rank " << rank << ") " << what << ": "
|
||||
std::cout << "[Timing] " << what << ": "
|
||||
<< std::to_string(std::chrono::duration_cast<milliseconds>(duration).count()) << " [msec]\n";
|
||||
else {
|
||||
char stime[26]; // fit ulong
|
||||
auto sec = std::chrono::duration_cast<seconds>(duration).count();
|
||||
auto msec = (std::chrono::duration_cast<milliseconds>(duration).count() % 1000) / 10; // keep 2 digit
|
||||
std::sprintf(stime, "%ld.%1ld", sec, msec);
|
||||
std::cout << "[Timing] (Rank " << rank << ") " << what << ": " << stime << " [sec]\n";
|
||||
std::cout << "[Timing] " << what << ": " << stime << " [sec]\n";
|
||||
}
|
||||
|
||||
}
|
||||
|
Loading…
x
Reference in New Issue
Block a user