AUTH's THMMY "Parallel and distributed systems" course assignments.
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
 
 
 
 
 
 

319 lines
10 KiB

  1. /*!
  2. * \file
  3. * \brief Main application file for PDS HW2 (MPI)
  4. *
  5. * \author
  6. * Christos Choutouridis AEM:8997
  7. * <cchoutou@ece.auth.gr>
  8. */
#include <algorithm>
#include <cerrno>
#include <cstdlib>
#include <exception>
#include <iostream>
#include <limits>
#include <random>
#include <string>

#include "utils.hpp"
#include "config.h"
#include "distsort.hpp"
  16. // Global session data
  17. config_t config;
  18. MPI_t<> mpi;
  19. distBuffer_t Data;
  20. Log logger;
  21. distStat_t localStat, remoteStat;
  22. // Mersenne seeded from hw if possible. range: [type_min, type_max]
  23. std::random_device rd;
  24. std::mt19937 gen(rd());
  25. //! Performance timers for each one of the "costly" functions
  26. Timing Timer_total;
  27. Timing Timer_fullSort;
  28. Timing Timer_exchange;
  29. Timing Timer_minmax;
  30. Timing Timer_elbowSort;
  31. //! Init timing objects for extra rounds
  32. void measurements_init() {
  33. if (config.perf > 1) {
  34. Timer_total.init(config.perf);
  35. Timer_fullSort.init(config.perf);
  36. Timer_exchange.init(config.perf);
  37. Timer_minmax.init(config.perf);
  38. Timer_elbowSort.init(config.perf);
  39. }
  40. }
  41. //! iterate ot the next round of measurements for all measurement objects
  42. void measurements_next() {
  43. if (config.perf > 1) {
  44. Timer_total.next();
  45. Timer_fullSort.next();
  46. Timer_exchange.next();
  47. Timer_minmax.next();
  48. Timer_elbowSort.next();
  49. }
  50. }
  51. /*!
  52. * A small command line argument parser
  53. * \return The status of the operation
  54. */
  55. bool get_options(int argc, char* argv[]){
  56. bool status =true;
  57. // iterate over the passed arguments
  58. for (int i=1 ; i<argc ; ++i) {
  59. std::string arg(argv[i]); // get current argument
  60. if (arg == "-q" || arg == "--array-size") {
  61. if (i+1 < argc) {
  62. config.arraySize = 1 << atoi(argv[++i]);
  63. }
  64. else {
  65. status = false;
  66. }
  67. }
  68. else if (arg == "-e" || arg == "--exchange-opt") {
  69. config.exchangeOpt = true;
  70. }
  71. else if (arg == "--pipeline") {
  72. if (i+1 < argc) {
  73. auto stages = atoi(argv[++i]);
  74. if (isPowerOfTwo(stages) && stages <= static_cast<int>(MAX_PIPELINE_SIZE))
  75. config.pipeline = stages;
  76. else
  77. status = false;
  78. }
  79. else {
  80. status = false;
  81. }
  82. }
  83. else if (arg == "--validation") {
  84. config.validation = true;
  85. }
  86. else if (arg == "--perf") {
  87. if (i+1 < argc) {
  88. config.perf = atoi(argv[++i]);
  89. }
  90. else {
  91. status = false;
  92. }
  93. }
  94. else if (arg == "--ndebug") {
  95. config.ndebug = true;
  96. }
  97. else if (arg == "-v" || arg == "--verbose") {
  98. config.verbose = true;
  99. }
  100. else if (arg == "--version") {
  101. std::cout << "distbitonic/distbubbletonic - A distributed sort utility\n";
  102. std::cout << "version: " << version << "\n\n";
  103. exit(0);
  104. }
  105. else if (arg == "-h" || arg == "--help") {
  106. std::cout << "distbitonic/distbubbletonic - A distributed sort utility\n\n";
  107. std::cout << " distbitonic -q <N> [-e] [-p | --pipeline <N>] [--validation] [--perf <N>] [--ndebug] [-v]\n";
  108. std::cout << " distbitonic -h\n";
  109. std::cout << " distbubbletonic -q <N> [-e] [-p | --pipeline <N>] [--validation] [--perf <N> ] [--ndebug] [-v]\n";
  110. std::cout << " distbubbletonic -h\n";
  111. std::cout << '\n';
  112. std::cout << "Options:\n\n";
  113. std::cout << " -q | --array-size <N>\n";
  114. std::cout << " Selects the array size according to size = 2^N\n\n";
  115. std::cout << " -e | --exchange-opt\n";
  116. std::cout << " Request an MPI data exchange optimization \n\n";
  117. std::cout << " -p <N> | --pipeline <N>\n";
  118. std::cout << " Request a pipeline of <N> stages for exchange-minmax\n";
  119. std::cout << " N must be power of 2 up to " << MAX_PIPELINE_SIZE << "\n\n";
  120. std::cout << " --validation\n";
  121. std::cout << " Request a full validation at the end, performed by process rank 0\n\n";
  122. std::cout << " --perf <N> \n";
  123. std::cout << " Enable performance timing measurements and prints, and repeat\n";
  124. std::cout << " the sorting <N> times.\n\n";
  125. std::cout << " --ndebug\n";
  126. std::cout << " Skip debug breakpoint when on debug build.\n\n";
  127. std::cout << " -v | --verbose\n";
  128. std::cout << " Request a more verbose output to stdout.\n\n";
  129. std::cout << " -h | --help\n";
  130. std::cout << " Prints this and exit.\n\n";
  131. std::cout << " --version\n";
  132. std::cout << " Prints version and exit.\n\n";
  133. std::cout << "Examples:\n\n";
  134. std::cout << " mpirun -np 4 distbitonic -q 24\n";
  135. std::cout << " Runs distbitonic in 4 MPI processes with 2^24 array points each\n\n";
  136. std::cout << " mpirun -np 16 distbubbletonic -q 20\n";
  137. std::cout << " Runs distbubbletonic in 16 MPI processes with 2^20 array points each\n\n";
  138. exit(0);
  139. }
  140. else { // parse error
  141. std::cout << "Invocation error. Try -h for details.\n";
  142. status = false;
  143. }
  144. }
  145. return status;
  146. }
  147. /*!
  148. * A simple validator for the entire distributed process
  149. *
  150. * @tparam ShadowedDataT A Shadowed buffer type with random access iterator.
  151. *
  152. * @param data [ShadowedDataT] The local to MPI process
  153. * @param Processes [mpi_id_t] The total number of MPI processes
  154. * @param rank [mpi_id_t] The current process id
  155. *
  156. * @return [bool] True if all are sorted and in total ascending order
  157. */
  158. template<typename ShadowedDataT>
  159. bool validator(ShadowedDataT& data, mpi_id_t Processes, mpi_id_t rank) {
  160. using value_t = typename ShadowedDataT::value_type;
  161. bool ret = true; // Have faith!
  162. // Local results
  163. value_t lmin = data.front();
  164. value_t lmax = data.back();
  165. value_t lsort = static_cast<value_t>(std::is_sorted(data.begin(), data.end()));
  166. // Gather min/max/sort to rank 0
  167. std::vector<value_t> mins(Processes);
  168. std::vector<value_t> maxes(Processes);
  169. std::vector<value_t> sorts(Processes);
  170. MPI_Datatype datatype = MPI_TypeMapper<value_t>::getType();
  171. MPI_Gather(&lmin, 1, datatype, mins.data(), 1, datatype, 0, MPI_COMM_WORLD);
  172. MPI_Gather(&lmax, 1, datatype, maxes.data(), 1, datatype, 0, MPI_COMM_WORLD);
  173. MPI_Gather(&lsort, 1, datatype, sorts.data(), 1, datatype, 0, MPI_COMM_WORLD);
  174. // Check all results
  175. if (rank == 0) {
  176. for (mpi_id_t r = 1; r < Processes; ++r) {
  177. if (sorts[r] == 0)
  178. ret = false;
  179. if (maxes[r - 1] > mins[r])
  180. ret = false;
  181. }
  182. }
  183. return ret;
  184. }
  185. /*!
  186. * Initializes the environment, must called from each process
  187. *
  188. * @param argc [int*] POINTER to main's argc argument
  189. * @param argv [char***] POINTER to main's argv argument
  190. */
  191. void init(int* argc, char*** argv) {
  192. // try to read command line
  193. if (!get_options(*argc, *argv))
  194. exit(1);
  195. // Initialize MPI environment
  196. mpi.init(argc, argv);
  197. logger << "MPI environment initialized." << " Rank: " << mpi.rank() << " Size: " << mpi.size()
  198. << logger.endl;
  199. #if defined DEBUG
  200. #if defined TESTING
  201. /*
  202. * In case of a debug build we will wait here until sleep_wait
  203. * will reset via debugger. In order to do that the user must attach
  204. * debugger to all processes. For example:
  205. * $> mpirun -np 2 ./<program path>
  206. * $> ps aux | grep <program>
  207. * $> gdb <program> <PID1>
  208. * $> gdb <program> <PID2>
  209. */
  210. volatile bool sleep_wait = false;
  211. #else
  212. volatile bool sleep_wait = true;
  213. #endif
  214. while (sleep_wait && !config.ndebug)
  215. sleep(1);
  216. #endif
  217. // Prepare vector and timing data
  218. Data.resize(config.arraySize);
  219. measurements_init();
  220. }
  221. #if !defined TESTING
  222. /*!
  223. * @return Returns 0, but.... we may throw or exit(0) / exit(1)
  224. */
  225. int main(int argc, char* argv[]) try {
  226. // Init everything
  227. init(&argc, &argv);
  228. for (size_t it = 0 ; it < config.perf ; ++it) {
  229. // Initialize local data
  230. logger << "Initialize local array of " << config.arraySize << " elements" << logger.endl;
  231. std::uniform_int_distribution<distValue_t > dis(
  232. std::numeric_limits<distValue_t>::min(),
  233. std::numeric_limits<distValue_t>::max()
  234. );
  235. std::generate(Data.begin(), Data.end(), [&]() { return dis(gen); });
  236. // Run distributed sort
  237. if (mpi.rank() == 0)
  238. logger << "Starting distributed sorting ... ";
  239. Timer_total.start();
  240. #if CODE_VERSION == BUBBLETONIC
  241. distBubbletonic(Data, mpi.size(), mpi.rank());
  242. #else
  243. distBitonic(Data, mpi.size(), mpi.rank());
  244. #endif
  245. Timer_total.stop();
  246. measurements_next();
  247. if (mpi.rank() == 0)
  248. logger << " Done." << logger.endl;
  249. }
  250. // Print-outs and validation
  251. if (config.perf > 1) {
  252. Timing::print_duration(Timer_total.median(), "Total ", mpi.rank());
  253. Timing::print_duration(Timer_fullSort.median(), "Full-Sort ", mpi.rank());
  254. Timing::print_duration(Timer_exchange.median(), "Exchange ", mpi.rank());
  255. Timing::print_duration(Timer_minmax.median(), "Min-Max ", mpi.rank());
  256. Timing::print_duration(Timer_elbowSort.median(),"Elbow-Sort", mpi.rank());
  257. }
  258. if (config.validation) {
  259. // If requested, we have the chance to fail!
  260. if (mpi.rank() == 0)
  261. std::cout << "[Validation] Results validation ...";
  262. bool val = validator(Data, mpi.size(), mpi.rank());
  263. if (mpi.rank() == 0)
  264. std::cout << ((val) ? "\x1B[32m [PASSED] \x1B[0m\n" : " \x1B[32m [FAILED] \x1B[0m\n");
  265. }
  266. mpi.finalize();
  267. return 0;
  268. }
  269. catch (std::exception& e) {
  270. //we probably pollute the user's screen. Comment `cerr << ...` if you don't like it.
  271. std::cerr << "Error: " << e.what() << '\n';
  272. exit(1);
  273. }
  274. #else
  275. #include <gtest/gtest.h>
  276. #include <exception>
  277. /*!
  278. * The testing version of our program
  279. */
  280. GTEST_API_ int main(int argc, char **argv) try {
  281. testing::InitGoogleTest(&argc, argv);
  282. return RUN_ALL_TESTS();
  283. }
  284. catch (std::exception& e) {
  285. std::cout << "Exception: " << e.what() << '\n';
  286. }
  287. #endif