AUTH's THMMY "Parallel and distributed systems" course assignments.
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
 
 
 
 
 
 

389 lines
14 KiB

  1. /*!
  2. * \file
  3. * \brief Distributed sort implementation header
  4. *
  5. * \author
  6. * Christos Choutouridis AEM:8997
  7. * <cchoutou@ece.auth.gr>
  8. */
  9. #ifndef DISTBITONIC_H_
  10. #define DISTBITONIC_H_
#include <algorithm>
#include <cmath>
#include <cstdint>
#include <functional>
#include <iterator>
#include <stdexcept>
#include <vector>
#include <parallel/algorithm>
// NDEBUG must be defined before <cassert> so that assert() is compiled out in non-DEBUG builds
#if !defined DEBUG
#define NDEBUG
#endif
#include <cassert>
#include "utils.hpp"
  21. extern Timing TfullSort, Texchange, Tminmax, TelbowSort; // make timers public
  22. /*!
  23. * Enumerator for the different versions of the sorting method
  24. */
  25. enum class SortMode {
  26. Bubbletonic, //!< The v0.5 of the algorithm where we use a bubble-sort like approach
  27. Bitonic //!< The v1.0 of the algorithm where we use the bitonic data-exchange approach
  28. };
  29. /*
  30. * ============================== Sort utilities ==============================
  31. */
  32. /*!
  33. * The primary function template of ascending(). It is DISABLED since , it is explicitly specialized
  34. * for each of the \c SortMode
  35. */
  36. template <SortMode Mode> inline bool ascending(mpi_id_t, [[maybe_unused]] size_t) noexcept = delete;
  37. /*!
  38. * Returns the ascending or descending configuration of the node's sequence based on
  39. * the current node (MPI process) and the depth of the sorting network
  40. *
  41. * @param node [mpi_id_t] The current node (MPI process)
  42. * @return [bool] True if we need ascending configuration, false otherwise
  43. */
  44. template <> inline
  45. bool ascending<SortMode::Bubbletonic>(mpi_id_t node, [[maybe_unused]] size_t depth) noexcept {
  46. return (node % 2) == 0;
  47. }
  48. /*!
  49. * Returns the ascending or descending configuration of the node's sequence based on
  50. * the current node (MPI process) and the depth of the sorting network
  51. *
  52. * @param node [mpi_id_t] The current node (MPI process)
  53. * @param depth [size_t] The total depth of the sorting network (same for each step for a given network)
  54. * @return [bool] True if we need ascending configuration, false otherwise
  55. */
  56. template <> inline
  57. bool ascending<SortMode::Bitonic>(mpi_id_t node, size_t depth) noexcept {
  58. return !(node & (1 << depth));
  59. }
  60. /*!
  61. * The primary function template of partner(). It is DISABLED since , it is explicitly specialized
  62. * for each of the \c SortMode
  63. */
  64. template <SortMode Mode> inline mpi_id_t partner(mpi_id_t, size_t) noexcept = delete;
  65. /*!
  66. * Returns the node's partner for data exchange during the sorting network iterations
  67. * of Bubbletonic
  68. *
  69. * @param node [mpi_id_t] The current node
  70. * @param step [size_t] The step of the sorting network
  71. * @return [mpi_id_t] The node id of the partner for data exchange
  72. */
  73. template <> inline
  74. mpi_id_t partner<SortMode::Bubbletonic>(mpi_id_t node, size_t step) noexcept {
  75. //return (node % 2 == step % 2) ? node + 1 : node - 1;
  76. return (((node+step) % 2) == 0) ? node + 1 : node - 1;
  77. }
  78. /*!
  79. * Returns the node's partner for data exchange during the sorting network iterations
  80. * of Bitonic
  81. *
  82. * @param node [mpi_id_t] The current node
  83. * @param step [size_t] The step of the sorting network
  84. * @return [mpi_id_t] The node id of the partner for data exchange
  85. */
  86. template <> inline
  87. mpi_id_t partner<SortMode::Bitonic>(mpi_id_t node, size_t step) noexcept {
  88. return (node ^ (1 << step));
  89. }
  90. /*!
  91. * The primary function template of keepSmall(). It is DISABLED since , it is explicitly specialized
  92. * for each of the \c SortMode
  93. */
  94. template<SortMode Mode> inline bool keepSmall(mpi_id_t, mpi_id_t, [[maybe_unused]] size_t) = delete;
  95. /*!
  96. * Predicate to check if a node keeps the small numbers during the bubbletonic sort network exchange.
  97. *
  98. * @param node [mpi_id_t] The node for which we check
  99. * @param partner [mpi_id_t] The partner of the data exchange
  100. * @return [bool] True if the node should keep the small values, false otherwise
  101. */
  102. template <> inline
  103. bool keepSmall<SortMode::Bubbletonic>(mpi_id_t node, mpi_id_t partner, [[maybe_unused]] size_t depth) {
  104. if (node == partner)
  105. throw std::runtime_error("(keepSmall) Node and Partner can not be the same\n");
  106. return (node < partner);
  107. }
  108. /*!
  109. * Predicate to check if a node keeps the small numbers during the bitonic sort network exchange.
  110. *
  111. * @param node [mpi_id_t] The node for which we check
  112. * @param partner [mpi_id_t] The partner of the data exchange
  113. * @param depth [size_t] The total depth of the sorting network (same for each step for a given network)
  114. * @return [bool] True if the node should keep the small values, false otherwise
  115. */
  116. template <> inline
  117. bool keepSmall<SortMode::Bitonic>(mpi_id_t node, mpi_id_t partner, size_t depth) {
  118. if (node == partner)
  119. throw std::runtime_error("(keepSmall) Node and Partner can not be the same\n");
  120. return ascending<SortMode::Bitonic>(node, depth) == (node < partner);
  121. }
  122. /*!
  123. * Predicate to check if the node is active in the current iteration of the bubbletonic
  124. * sort exchange.
  125. *
  126. * @param node [mpi_id_t] The node to check
  127. * @param nodes [size_t] The total number of nodes
  128. * @return [bool] True if the node is active, false otherwise
  129. */
  130. bool isActive(mpi_id_t node, size_t nodes);
  131. /*
  132. * ============================== Data utilities ==============================
  133. */
  134. /*!
  135. * Sort a range using the build-in O(Nlog(N)) algorithm
  136. *
  137. * @tparam RangeT A range type with random access iterator
  138. *
  139. * @param data [RangeT] The data to be sorted
  140. * @param ascending [bool] Flag to indicate the sorting order
  141. */
  142. template<typename RangeT>
  143. void fullSort(RangeT& data, bool ascending) noexcept {
  144. // Use introsort from stdlib++ here, unless ... __gnu_parallel
  145. if (ascending) {
  146. __gnu_parallel::sort(data.begin(), data.end(), std::less<>());
  147. }
  148. else {
  149. __gnu_parallel::sort(data.begin(), data.end(), std::greater<>());
  150. }
  151. }
  152. /*!
  153. * Core functionality of sort for shadowed buffer types using
  154. * the "elbow sort" algorithm.
  155. *
  156. * @note:
  157. * This algorithm can not work "in place".
  158. * We use the active buffer as source and the shadow as target.
  159. * At the end we switch which buffer is active and which is the shadow.
  160. * @note
  161. * This is the core functionality. Use the elbowSort() function instead
  162. *
  163. * @tparam ShadowedDataT A Shadowed buffer type with random access iterator.
  164. * @tparam CompT A Comparison type for binary operation comparisons
  165. *
  166. * @param data [ShadowedDataT] The data to sort
  167. * @param ascending [bool] Flag to indicate the sorting order
  168. * @param comp [CompT] The binary operator object
  169. */
  170. template<typename ShadowedDataT, typename CompT>
  171. void elbowSortCore(ShadowedDataT& data, bool ascending, CompT comp) noexcept {
  172. auto& active = data.getActive(); // Get the source vector (the data to sort)
  173. auto& shadow = data.getShadow(); // Get the target vector (the sorted data)
  174. size_t N = data.size(); // The total size is the same or both vectors
  175. size_t left = std::distance(
  176. active.begin(),
  177. (ascending) ?
  178. std::min_element(active.begin(), active.end()) :
  179. std::max_element(active.begin(), active.end())
  180. ); // start 'left' from elbow of the bitonic
  181. size_t right = (left == N-1) ? 0 : left + 1;
  182. // Walk in opposite directions from elbow and insert-sort to target vector
  183. for (size_t i = 0 ; i<N ; ++i) {
  184. if (comp(active[left], active[right])) {
  185. shadow[i] = active[left];
  186. left = (left == 0) ? N-1 : left -1; // cycle decrease
  187. }
  188. else {
  189. shadow[i] = active[right];
  190. right = (right + 1) % N; // cycle increase
  191. }
  192. }
  193. data.switch_active(); // Switch active-shadow buffers
  194. }
  195. /*!
  196. * Sort a shadowed buffer using the "elbow sort" algorithm.
  197. *
  198. * @tparam ShadowedDataT A Shadowed buffer type with random access iterator.
  199. *
  200. * @param data [ShadowedDataT] The data to sort
  201. * @param ascending [bool] Flag to indicate the sorting order
  202. */
  203. template<typename ShadowedDataT>
  204. void elbowSort(ShadowedDataT& data, bool ascending) noexcept {
  205. if (ascending)
  206. elbowSortCore(data, ascending, std::less<>());
  207. else
  208. elbowSortCore(data, ascending, std::greater<>());
  209. }
  210. /*!
  211. * Takes two sequences and selects either the larger or the smaller items
  212. * in one-to-one comparison between them. If the initial sequences are bitonic, then
  213. * the result is a bitonic sequence too!
  214. *
  215. * @tparam ValueT The underlying type of the sequences
  216. *
  217. * @param local [ValueT*] Pointer to the local sequence
  218. * @param remote [const ValueT*] Pointer to the remote sequence (copied locally by MPI)
  219. * @param count [size_t] The number of items to process
  220. * @param keepSmall [bool] Flag to indicate if we keep the small items in local sequence
  221. */
  222. template<typename ValueT>
  223. void keepMinOrMax(ValueT* local, const ValueT* remote, size_t count, bool keepSmall) noexcept {
  224. std::transform(
  225. local, local + count,
  226. remote,
  227. local,
  228. [&keepSmall](const ValueT& a, const ValueT& b){
  229. return (keepSmall) ? std::min(a, b) : std::max(a, b);
  230. });
  231. }
  232. /*
  233. * ============================== Sort algorithms ==============================
  234. */
  235. /*!
  236. * A small tag generator tool to provide consistent encoding to tag communication
  237. *
  238. * @param depth The current algorithmic depth[bitonic] of the communication, if any
  239. * @param step The current step on the current depth
  240. * @param stage The stage of the pipeline.
  241. * @return The tag to use.
  242. *
  243. * @note
  244. * In case we call this function outside of the pipeline loop, we can ommit
  245. * @c stage argument and use the return value as starting tag for every communication
  246. * of the pipeline loop. We need to increase the tags for each communication of
  247. * the pipeline loop though!
  248. */
  249. size_t tagGenerator(size_t depth, size_t step, size_t stage = 0);
  250. /*!
  251. * A pipeline loop for mixing min-max process with mpi data exchange
  252. *
  253. * @tparam ShadowedDataT A Shadowed buffer type with random access iterator.
  254. *
  255. * @param data [ShadowedDataT&] Reference to the data to exchange
  256. * @param partner [mpi_id_t] The partner for the exchange
  257. * @param keepSmall [bool] Flag to indicate if we keep the small values
  258. * @param tag [int] The init tag to use for the loop.
  259. *
  260. * @note
  261. * The @c tag is increased inside the pipeline loop for each different data exchange
  262. */
  263. template<typename ShadowedDataT>
  264. void exchangePipeline(ShadowedDataT& data, mpi_id_t partner, bool keepSmall, int tag) {
  265. using Value_t = typename ShadowedDataT::value_type;
  266. // Init counters and pointers
  267. size_t count = data.size() / config.pipeline;
  268. Value_t* active = data.getActive().data();
  269. Value_t* shadow = data.getShadow().data();
  270. // Pipeline
  271. Texchange.start();
  272. mpi.exchange_start(active, shadow, count, partner, tag);
  273. for (size_t stage = 0 ; stage < config.pipeline ; active += count, shadow += count) {
  274. // Wait previous chunk
  275. mpi.exchange_wait(); Texchange.stop();
  276. if (++stage < config.pipeline) {
  277. // Start next chunk if there is a next one
  278. Texchange.start();
  279. mpi.exchange_start(active + count, shadow + count, count, partner, ++tag);
  280. }
  281. // process the arrived data
  282. timeCall(Tminmax, keepMinOrMax, active, shadow, count, keepSmall);
  283. }
  284. }
  285. /*!
  286. * A distributed version of the Bubbletonic sort algorithm.
  287. *
  288. * @note
  289. * Each MPI process should run an instance of this function.
  290. *
  291. * @tparam ShadowedDataT A Shadowed buffer type with random access iterator.
  292. *
  293. * @param data [ShadowedDataT] The local to MPI process data to sort
  294. * @param Processes [mpi_id_t] The total number of MPI processes
  295. * @param rank [mpi_id_t] The current process id
  296. */
  297. template<typename ShadowedDataT>
  298. void distBubbletonic(ShadowedDataT& data, mpi_id_t Processes, mpi_id_t rank) {
  299. // Initially sort to create a half part of a bitonic sequence
  300. timeCall(TfullSort, fullSort, data, ascending<SortMode::Bubbletonic>(rank, 0));
  301. // Sort network (O(N) iterations)
  302. for (size_t step = 0; step < static_cast<size_t>(Processes); ++step) {
  303. // Find out exchange configuration
  304. auto part = partner<SortMode::Bubbletonic>(rank, step);
  305. auto ks = keepSmall<SortMode::Bubbletonic>(rank, part, Processes);
  306. if ( isActive(rank, Processes) &&
  307. isActive(part, Processes) ) {
  308. // Exchange with partner, keep nim-or-max and sort - O(N)
  309. int tag = static_cast<int>(tagGenerator(0, step));
  310. exchangePipeline(data, part, ks, tag);
  311. timeCall(TelbowSort, elbowSort, data, ascending<SortMode::Bubbletonic>(rank, Processes));
  312. }
  313. }
  314. // Invert if the node was descending.
  315. if (!ascending<SortMode::Bubbletonic>(rank, 0)) {
  316. elbowSort(data, true);
  317. }
  318. }
  319. /*!
  320. * A distributed version of the Bitonic sort algorithm.
  321. *
  322. * @note
  323. * Each MPI process should run an instance of this function.
  324. *
  325. * @tparam ShadowedDataT A Shadowed buffer type with random access iterator.
  326. *
  327. * @param data [ShadowedDataT] The local to MPI process data to sort
  328. * @param Processes [mpi_id_t] The total number of MPI processes
  329. * @param rank [mpi_id_t] The current process id
  330. */
  331. template<typename ShadowedDataT>
  332. void distBitonic(ShadowedDataT& data, mpi_id_t Processes, mpi_id_t rank) {
  333. // Initially sort to create a half part of a bitonic sequence
  334. timeCall(TfullSort, fullSort, data, ascending<SortMode::Bitonic>(rank, 0));
  335. // Run through sort network using elbow-sort ( O(LogN * LogN) iterations )
  336. auto p = static_cast<uint32_t>(std::log2(Processes));
  337. for (size_t depth = 1; depth <= p; ++depth) {
  338. for (size_t step = depth; step > 0;) {
  339. --step;
  340. // Find out exchange configuration
  341. auto part = partner<SortMode::Bitonic>(rank, step);
  342. auto ks = keepSmall<SortMode::Bitonic>(rank, part, depth);
  343. // Exchange with partner, keep nim-or-max
  344. int tag = static_cast<int>(tagGenerator(depth, step));
  345. exchangePipeline(data, part, ks, tag);
  346. }
  347. // sort - O(N)
  348. timeCall(TelbowSort, elbowSort, data, ascending<SortMode::Bitonic>(rank, depth));
  349. }
  350. }
  351. #endif //DISTBITONIC_H_