AUTH's THMMY "Parallel and distributed systems" course assignments.
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

distsort.hpp 17 KiB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454
  1. /*!
  2. * \file
  3. * \brief Distributed sort implementation header
  4. *
  5. * \author
  6. * Christos Choutouridis AEM:8997
  7. * <cchoutou@ece.auth.gr>
  8. */
  9. #ifndef DISTBITONIC_H_
  10. #define DISTBITONIC_H_
#include <vector>
#include <algorithm>
#include <parallel/algorithm>
#include <cmath>
#include <cstddef>
#include <cstdint>
#include <functional>
#include <iterator>
#include <stdexcept>
#if !defined DEBUG
#define NDEBUG
#endif
#include <cassert>
#include "utils.hpp"
/*
 * Exported timers
 *
 * Phase timers defined in another translation unit (hence extern). The sort routines
 * below drive them either through timeCall() or with explicit start()/stop() calls.
 * NOTE(review): Timer_total is not used in this header; presumably the caller drives it.
 */
extern Timing Timer_total;
extern Timing Timer_fullSort;
extern Timing Timer_exchange;
extern Timing Timer_minmax;
extern Timing Timer_elbowSort;
  29. /*!
  30. * Enumerator for the different versions of the sorting method
  31. */
  32. enum class SortMode {
  33. Bubbletonic, //!< The v0.5 of the algorithm where we use a bubble-sort like approach
  34. Bitonic //!< The v1.0 of the algorithm where we use the bitonic data-exchange approach
  35. };
  36. /*
  37. * ============================== Sort utilities ==============================
  38. */
/*!
 * The primary function template of ascending(). It is deleted on purpose: only the
 * explicit specializations for each \c SortMode may be called, so using a mode
 * without a specialization is a compile-time error.
 */
template <SortMode Mode> inline bool ascending(mpi_id_t, [[maybe_unused]] size_t) noexcept = delete;
  44. /*!
  45. * Returns the ascending or descending configuration of the node's sequence based on
  46. * the current node (MPI process) and the depth of the sorting network
  47. *
  48. * @param node [mpi_id_t] The current node (MPI process)
  49. * @return [bool] True if we need ascending configuration, false otherwise
  50. */
  51. template <> inline
  52. bool ascending<SortMode::Bubbletonic>(mpi_id_t node, [[maybe_unused]] size_t depth) noexcept {
  53. return (node % 2) == 0;
  54. }
  55. /*!
  56. * Returns the ascending or descending configuration of the node's sequence based on
  57. * the current node (MPI process) and the depth of the sorting network
  58. *
  59. * @param node [mpi_id_t] The current node (MPI process)
  60. * @param depth [size_t] The total depth of the sorting network (same for each step for a given network)
  61. * @return [bool] True if we need ascending configuration, false otherwise
  62. */
  63. template <> inline
  64. bool ascending<SortMode::Bitonic>(mpi_id_t node, size_t depth) noexcept {
  65. return !(node & (1 << depth));
  66. }
/*!
 * The primary function template of partner(). It is deleted on purpose: only the
 * explicit specializations for each \c SortMode may be called, so using a mode
 * without a specialization is a compile-time error.
 */
template <SortMode Mode> inline mpi_id_t partner(mpi_id_t, size_t) noexcept = delete;
  72. /*!
  73. * Returns the node's partner for data exchange during the sorting network iterations
  74. * of Bubbletonic
  75. *
  76. * @param node [mpi_id_t] The current node
  77. * @param step [size_t] The step of the sorting network
  78. * @return [mpi_id_t] The node id of the partner for data exchange
  79. */
  80. template <> inline
  81. mpi_id_t partner<SortMode::Bubbletonic>(mpi_id_t node, size_t step) noexcept {
  82. //return (node % 2 == step % 2) ? node + 1 : node - 1;
  83. return (((node+step) % 2) == 0) ? node + 1 : node - 1;
  84. }
  85. /*!
  86. * Returns the node's partner for data exchange during the sorting network iterations
  87. * of Bitonic
  88. *
  89. * @param node [mpi_id_t] The current node
  90. * @param step [size_t] The step of the sorting network
  91. * @return [mpi_id_t] The node id of the partner for data exchange
  92. */
  93. template <> inline
  94. mpi_id_t partner<SortMode::Bitonic>(mpi_id_t node, size_t step) noexcept {
  95. return (node ^ (1 << step));
  96. }
/*!
 * The primary function template of keepSmall(). It is deleted on purpose: only the
 * explicit specializations for each \c SortMode may be called, so using a mode
 * without a specialization is a compile-time error.
 */
template<SortMode Mode> inline bool keepSmall(mpi_id_t, mpi_id_t, [[maybe_unused]] size_t) = delete;
  102. /*!
  103. * Predicate to check if a node keeps the small numbers during the bubbletonic sort network exchange.
  104. *
  105. * @param node [mpi_id_t] The node for which we check
  106. * @param partner [mpi_id_t] The partner of the data exchange
  107. * @return [bool] True if the node should keep the small values, false otherwise
  108. */
  109. template <> inline
  110. bool keepSmall<SortMode::Bubbletonic>(mpi_id_t node, mpi_id_t partner, [[maybe_unused]] size_t depth) {
  111. if (node == partner)
  112. throw std::runtime_error("(keepSmall) Node and Partner can not be the same\n");
  113. return (node < partner);
  114. }
  115. /*!
  116. * Predicate to check if a node keeps the small numbers during the bitonic sort network exchange.
  117. *
  118. * @param node [mpi_id_t] The node for which we check
  119. * @param partner [mpi_id_t] The partner of the data exchange
  120. * @param depth [size_t] The total depth of the sorting network (same for each step for a given network)
  121. * @return [bool] True if the node should keep the small values, false otherwise
  122. */
  123. template <> inline
  124. bool keepSmall<SortMode::Bitonic>(mpi_id_t node, mpi_id_t partner, size_t depth) {
  125. if (node == partner)
  126. throw std::runtime_error("(keepSmall) Node and Partner can not be the same\n");
  127. return ascending<SortMode::Bitonic>(node, depth) == (node < partner);
  128. }
/*!
 * Predicate to check if the node is active in the current iteration of the bubbletonic
 * sort exchange.
 * distBubbletonic() calls it for both the node and its partner and exchanges data only
 * when both are active.
 * NOTE(review): presumably this rejects ids outside [0, nodes), because
 * partner<SortMode::Bubbletonic>() can return node - 1 / node + 1 at the edges; confirm
 * against the definition.
 *
 * @param node [mpi_id_t] The node to check
 * @param nodes [size_t] The total number of nodes
 * @return [bool] True if the node is active, false otherwise
 */
bool isActive(mpi_id_t node, size_t nodes);
  138. /*
  139. * ============================== Data utilities ==============================
  140. */
  141. /*!
  142. * Sort a range using the build-in O(Nlog(N)) algorithm
  143. *
  144. * @tparam RangeT A range type with random access iterator
  145. *
  146. * @param data [RangeT] The data to be sorted
  147. * @param ascending [bool] Flag to indicate the sorting order
  148. */
  149. template<typename RangeT>
  150. void fullSort(RangeT& data, bool ascending) noexcept {
  151. // Use introsort from stdlib++ here, unless ... __gnu_parallel
  152. if (ascending) {
  153. __gnu_parallel::sort(data.begin(), data.end(), std::less<>());
  154. }
  155. else {
  156. __gnu_parallel::sort(data.begin(), data.end(), std::greater<>());
  157. }
  158. if (config.exchangeOpt)
  159. updateMinMax(localStat, data);
  160. }
  161. /*!
  162. * Core functionality of sort for shadowed buffer types using
  163. * the "elbow sort" algorithm.
  164. *
  165. * @note:
  166. * This algorithm can not work "in place".
  167. * We use the active buffer as source and the shadow as target.
  168. * At the end we switch which buffer is active and which is the shadow.
  169. * @note
  170. * This is the core functionality. Use the elbowSort() function instead
  171. *
  172. * @tparam ShadowedDataT A Shadowed buffer type with random access iterator.
  173. * @tparam CompT A Comparison type for binary operation comparisons
  174. *
  175. * @param data [ShadowedDataT] The data to sort
  176. * @param ascending [bool] Flag to indicate the sorting order
  177. * @param comp [CompT] The binary operator object
  178. */
  179. template<typename ShadowedDataT, typename CompT>
  180. void elbowSortCore(ShadowedDataT& data, bool ascending, CompT comp) noexcept {
  181. auto& active = data.getActive(); // Get the source vector (the data to sort)
  182. auto& shadow = data.getShadow(); // Get the target vector (the sorted data)
  183. size_t N = data.size(); // The total size is the same or both vectors
  184. size_t left = std::distance(
  185. active.begin(),
  186. (ascending) ?
  187. std::min_element(active.begin(), active.end()) :
  188. std::max_element(active.begin(), active.end())
  189. ); // start 'left' from elbow of the bitonic
  190. size_t right = (left == N-1) ? 0 : left + 1;
  191. // Walk in opposite directions from elbow and insert-sort to target vector
  192. for (size_t i = 0 ; i<N ; ++i) {
  193. if (comp(active[left], active[right])) {
  194. shadow[i] = active[left];
  195. left = (left == 0) ? N-1 : left -1; // cycle decrease
  196. }
  197. else {
  198. shadow[i] = active[right];
  199. right = (right + 1) % N; // cycle increase
  200. }
  201. }
  202. data.switch_active(); // Switch active-shadow buffers
  203. }
  204. /*!
  205. * Sort a shadowed buffer using the "elbow sort" algorithm.
  206. *
  207. * @tparam ShadowedDataT A Shadowed buffer type with random access iterator.
  208. *
  209. * @param data [ShadowedDataT] The data to sort
  210. * @param ascending [bool] Flag to indicate the sorting order
  211. */
  212. template<typename ShadowedDataT>
  213. void elbowSort(ShadowedDataT& data, bool ascending) noexcept {
  214. if (ascending)
  215. elbowSortCore(data, ascending, std::less<>());
  216. else
  217. elbowSortCore(data, ascending, std::greater<>());
  218. }
  219. /*!
  220. * Predicate for exchange optimization. Returns true only if an exchange between partners is needed.
  221. * In order to do that we exchange min and max statistics of the partner's data.
  222. *
  223. * @tparam StatT Statistics data type (for min-max)
  224. *
  225. * @param lstat [const StatT] Reference to the local statistic data
  226. * @param rstat [StatT] Reference to the remote statistic data to fill
  227. * @param part [mpi_id_t] The partner for the exchange
  228. * @param tag [int] The tag to use for the exchange of stats
  229. * @param keepSmall [bool] Flag to indicate if the local thread keeps the small ro the large values
  230. * @return True if we need data exchange, false otherwise
  231. */
  232. template<typename StatT>
  233. bool needsExchange(const StatT& lstat, StatT& rstat, mpi_id_t part, int tag, bool keepSmall) {
  234. timeCall(Timer_exchange, mpi.exchange_it, lstat, rstat, part, tag);
  235. return (keepSmall) ?
  236. rstat.min < lstat.max // Lmin: rstat.min - Smax: lstat.max
  237. : lstat.min < rstat.max; // Lmin: lstat.min - Smax: rstat.max
  238. }
  239. /*!
  240. * Update stats utility
  241. *
  242. * @tparam RangeT A range type with random access iterator
  243. * @tparam StatT Statistics data type (for min-max)
  244. *
  245. * @param stat [StatT] Reference to the statistic data to update
  246. * @param data [const RangeT] Reference to the sequence to extract stats from
  247. */
  248. template<typename RangeT, typename StatT>
  249. void updateMinMax(StatT& stat, const RangeT& data) noexcept {
  250. auto [min, max] = std::minmax_element(data.begin(), data.end());
  251. stat.min = *min;
  252. stat.max = *max;
  253. }
  254. /*!
  255. * Takes two sequences and selects either the larger or the smaller items
  256. * in one-to-one comparison between them. If the initial sequences are bitonic, then
  257. * the result is a bitonic sequence too!
  258. *
  259. * @tparam ValueT The underlying type of the sequences
  260. *
  261. * @param local [ValueT*] Pointer to the local sequence
  262. * @param remote [const ValueT*] Pointer to the remote sequence (copied locally by MPI)
  263. * @param count [size_t] The number of items to process
  264. * @param keepSmall [bool] Flag to indicate if we keep the small items in local sequence
  265. */
  266. template<typename ValueT>
  267. void keepMinOrMax(ValueT* local, const ValueT* remote, size_t count, bool keepSmall) noexcept {
  268. std::transform(
  269. local, local + count,
  270. remote,
  271. local,
  272. [&keepSmall](const ValueT& a, const ValueT& b){
  273. return (keepSmall) ? std::min(a, b) : std::max(a, b);
  274. });
  275. }
  276. /*
  277. * ============================== Sort algorithms ==============================
  278. */
/*!
 * A small tag generator tool to provide consistent encoding to tag communication
 *
 * @param depth The current algorithmic depth[bitonic] of the communication, if any
 * @param step The current step on the current depth
 * @param stage The stage of the pipeline.
 * @return The tag to use.
 *
 * @note
 * If we call this function outside of the pipeline loop, we can omit the
 * @c stage argument and use the return value as the starting tag for every communication
 * of the pipeline loop. The tag must still be increased for each communication of
 * the pipeline loop.
 */
size_t tagGenerator(size_t depth, size_t step, size_t stage = 0);
  294. /*!
  295. * An exchange functionality to support both Bubbletonic and Bitonic sort algorithms.
  296. *
  297. * @note
  298. * In case of pipeline request it switches to non-blocking MPI communication for
  299. * pipelining min-max process with mpi data exchange
  300. *
  301. * @tparam ShadowedDataT A Shadowed buffer type with random access iterator.
  302. *
  303. * @param data [ShadowedDataT&] Reference to the data to exchange
  304. * @param partner [mpi_id_t] The partner for the exchange
  305. * @param keepSmall [bool] Flag to indicate if we keep the small values
  306. * @param tag [int] The init tag to use for the loop.
  307. *
  308. * @note
  309. * The @c tag is increased inside the pipeline loop for each different data exchange
  310. */
  311. template<typename ShadowedDataT>
  312. void exchange(ShadowedDataT& data, mpi_id_t partner, bool keepSmall, int tag) {
  313. using Value_t = typename ShadowedDataT::value_type;
  314. // Init counters and pointers
  315. Value_t* active = data.getActive().data();
  316. Value_t* shadow = data.getShadow().data();
  317. size_t count = data.size() / config.pipeline;
  318. if (config.pipeline > 1) {
  319. // Pipeline case - use async MPI
  320. Timer_exchange.start();
  321. mpi.exchange_start(active, shadow, count, partner, tag);
  322. for (size_t stage = 0; stage < config.pipeline; active += count, shadow += count) {
  323. // Wait previous chunk
  324. mpi.exchange_wait();
  325. Timer_exchange.stop();
  326. if (++stage < config.pipeline) {
  327. // Start next chunk if there is a next one
  328. Timer_exchange.start();
  329. mpi.exchange_start(active + count, shadow + count, count, partner, ++tag);
  330. }
  331. // process the arrived data
  332. timeCall(Timer_minmax, keepMinOrMax, active, shadow, count, keepSmall);
  333. }
  334. }
  335. else {
  336. // No pipeline - use blocking MPI
  337. timeCall(Timer_exchange, mpi.exchange, active, shadow, count, partner, tag);
  338. timeCall(Timer_minmax, keepMinOrMax, active, shadow, count, keepSmall);
  339. }
  340. if (config.exchangeOpt)
  341. updateMinMax(localStat, data);
  342. }
  343. /*!
  344. * A distributed version of the Bubbletonic sort algorithm.
  345. *
  346. * @note
  347. * Each MPI process should run an instance of this function.
  348. *
  349. * @tparam ShadowedDataT A Shadowed buffer type with random access iterator.
  350. *
  351. * @param data [ShadowedDataT] The local to MPI process data to sort
  352. * @param Processes [mpi_id_t] The total number of MPI processes
  353. * @param rank [mpi_id_t] The current process id
  354. */
  355. template<typename ShadowedDataT>
  356. void distBubbletonic(ShadowedDataT& data, mpi_id_t Processes, mpi_id_t rank) {
  357. // Initially sort to create a half part of a bitonic sequence
  358. timeCall(Timer_fullSort, fullSort, data, ascending<SortMode::Bubbletonic>(rank, 0));
  359. // Sort network (O(N) iterations)
  360. for (size_t step = 0; step < static_cast<size_t>(Processes); ++step) {
  361. // Find out exchange configuration
  362. auto part = partner<SortMode::Bubbletonic>(rank, step);
  363. auto ks = keepSmall<SortMode::Bubbletonic>(rank, part, Processes);
  364. if ( isActive(rank, Processes) &&
  365. isActive(part, Processes) ) {
  366. // Exchange with partner, keep nim-or-max and sort - O(N)
  367. int tag = static_cast<int>(tagGenerator(0, step));
  368. if (!config.exchangeOpt || needsExchange(localStat, remoteStat, part, tag++, ks)) {
  369. exchange(data, part, ks, tag);
  370. timeCall(Timer_elbowSort, elbowSort, data, ascending<SortMode::Bubbletonic>(rank, Processes));
  371. }
  372. }
  373. }
  374. // Invert if the node was descending.
  375. if (!ascending<SortMode::Bubbletonic>(rank, 0)) {
  376. elbowSort(data, true);
  377. }
  378. }
  379. /*!
  380. * A distributed version of the Bitonic sort algorithm.
  381. *
  382. * @note
  383. * Each MPI process should run an instance of this function.
  384. *
  385. * @tparam ShadowedDataT A Shadowed buffer type with random access iterator.
  386. *
  387. * @param data [ShadowedDataT] The local to MPI process data to sort
  388. * @param Processes [mpi_id_t] The total number of MPI processes
  389. * @param rank [mpi_id_t] The current process id
  390. */
  391. template<typename ShadowedDataT>
  392. void distBitonic(ShadowedDataT& data, mpi_id_t Processes, mpi_id_t rank) {
  393. // Initially sort to create a half part of a bitonic sequence
  394. timeCall(Timer_fullSort, fullSort, data, ascending<SortMode::Bitonic>(rank, 0));
  395. // Run through sort network using elbow-sort ( O(LogN * LogN) iterations )
  396. auto p = static_cast<uint32_t>(std::log2(Processes));
  397. for (size_t depth = 1; depth <= p; ++depth) {
  398. for (size_t step = depth; step > 0;) {
  399. --step;
  400. // Find out exchange configuration
  401. auto part = partner<SortMode::Bitonic>(rank, step);
  402. auto ks = keepSmall<SortMode::Bitonic>(rank, part, depth);
  403. // Exchange with partner, keep nim-or-max
  404. int tag = static_cast<int>(tagGenerator(depth, step));
  405. if (!config.exchangeOpt || needsExchange(localStat, remoteStat, part, tag++, ks)) {
  406. exchange(data, part, ks, tag);
  407. }
  408. }
  409. // sort - O(N)
  410. timeCall(Timer_elbowSort, elbowSort, data, ascending<SortMode::Bitonic>(rank, depth));
  411. }
  412. }
  413. #endif //DISTBITONIC_H_