AUTH's THMMY "Parallel and distributed systems" course assignments.
您最多选择25个主题 主题必须以字母或数字开头,可以包含连字符 (-),并且长度不得超过35个字符
 
 
 
 
 
 

517 行
19 KiB

  1. /**
  2. * \file
  3. * \brief Utilities header
  4. *
  5. * \author
  6. * Christos Choutouridis AEM:8997
  7. * <cchoutou@ece.auth.gr>
  8. */
  9. #ifndef UTILS_HPP_
  10. #define UTILS_HPP_
#include <unistd.h>

#include <algorithm>
#include <chrono>
#include <cstdio>
#include <iostream>
#include <limits>
#include <stdexcept>
#include <string>
#include <type_traits>
#include <utility>
#include <vector>

#include <mpi.h>

#include "config.h"
  17. /*!
  18. * Min-Max statistics data for exchange optimization
  19. * @tparam Value_t The underlying data type of the sequence data
  20. */
  21. template <typename Value_t>
  22. struct Stat_t {
  23. using value_type = Value_t; //!< meta-export the type
  24. Value_t min{}; //!< The minimum value of the sequence
  25. Value_t max{}; //!< The maximum value of the sequence
  26. };
  27. //! Application data selection alias
  28. using distStat_t = Stat_t<distValue_t>;
  29. extern distStat_t localStat, remoteStat; // Make stats public
  30. /*
  31. * MPI_<type> dispatcher mechanism
  32. */
  33. template <typename T> struct MPI_TypeMapper { };
  34. template <> struct MPI_TypeMapper<char> { static MPI_Datatype getType() { return MPI_CHAR; } };
  35. template <> struct MPI_TypeMapper<short> { static MPI_Datatype getType() { return MPI_SHORT; } };
  36. template <> struct MPI_TypeMapper<int> { static MPI_Datatype getType() { return MPI_INT; } };
  37. template <> struct MPI_TypeMapper<long> { static MPI_Datatype getType() { return MPI_LONG; } };
  38. template <> struct MPI_TypeMapper<long long> { static MPI_Datatype getType() { return MPI_LONG_LONG; } };
  39. template <> struct MPI_TypeMapper<unsigned char> { static MPI_Datatype getType() { return MPI_UNSIGNED_CHAR; } };
  40. template <> struct MPI_TypeMapper<unsigned short>{ static MPI_Datatype getType() { return MPI_UNSIGNED_SHORT; } };
  41. template <> struct MPI_TypeMapper<unsigned int> { static MPI_Datatype getType() { return MPI_UNSIGNED; } };
  42. template <> struct MPI_TypeMapper<unsigned long> { static MPI_Datatype getType() { return MPI_UNSIGNED_LONG; } };
  43. template <> struct MPI_TypeMapper<unsigned long long> { static MPI_Datatype getType() { return MPI_UNSIGNED_LONG_LONG; } };
  44. template <> struct MPI_TypeMapper<float> { static MPI_Datatype getType() { return MPI_FLOAT; } };
  45. template <> struct MPI_TypeMapper<double> { static MPI_Datatype getType() { return MPI_DOUBLE; } };
  46. /*!
  47. * MPI wrapper type to provide MPI functionality and RAII to MPI as a resource
  48. *
  49. * @tparam TID The MPI type for process id [default: int]
  50. */
  51. template<typename TID = int>
  52. struct MPI_t {
  53. using ID_t = TID; // Export TID type (currently int defined by the standard)
  54. /*!
  55. * Initializes the MPI environment, must called from each process
  56. *
  57. * @param argc [int*] POINTER to main's argc argument
  58. * @param argv [char***] POINTER to main's argv argument
  59. */
  60. void init(int* argc, char*** argv) {
  61. // Initialize the MPI environment
  62. int err;
  63. if ((err = MPI_Init(argc, argv)) != MPI_SUCCESS)
  64. mpi_throw(err, "(MPI) MPI_Init() - ");
  65. initialized_ = true;
  66. // Get the number of processes
  67. int size_value, rank_value;
  68. if ((err = MPI_Comm_size(MPI_COMM_WORLD, &size_value)) != MPI_SUCCESS)
  69. mpi_throw(err, "(MPI) MPI_Comm_size() - ");
  70. if ((err = MPI_Comm_rank(MPI_COMM_WORLD, &rank_value)) != MPI_SUCCESS)
  71. mpi_throw(err, "(MPI) MPI_Comm_rank() - ");
  72. size_ = static_cast<ID_t>(size_value);
  73. rank_ = static_cast<ID_t>(rank_value);
  74. if (size_ > static_cast<ID_t>(MAX_MPI_SIZE))
  75. throw std::runtime_error(
  76. "(MPI) size - Not supported number of nodes [over " + std::to_string(MAX_MPI_SIZE) + "]\n"
  77. );
  78. // Get the name of the processor
  79. char processor_name[MPI_MAX_PROCESSOR_NAME];
  80. int name_len;
  81. if ((err = MPI_Get_processor_name(processor_name, &name_len)) != MPI_SUCCESS)
  82. mpi_throw(err, "(MPI) MPI_Get_processor_name() - ");
  83. name_ = std::string (processor_name, name_len);
  84. }
  85. /*!
  86. * Exchange one data object of type @c T with partner as part of the sorting network of both
  87. * bubbletonic or bitonic sorting algorithms.
  88. *
  89. * This function matches a transmit and a receive in order for fully exchanged the data object
  90. * between current node and partner.
  91. *
  92. * @tparam T The object type
  93. *
  94. * @param local [const T&] Reference to the local object to send
  95. * @param remote [T&] Reference to the object to receive data from partner
  96. * @param partner [mpi_id_t] The partner for the exchange
  97. * @param tag [int] The tag to use for the MPI communication
  98. */
  99. template<typename T>
  100. void exchange_it(const T& local, T& remote, ID_t partner, int tag) {
  101. if (tag < 0)
  102. throw std::runtime_error("(MPI) exchange_it() [tag] - Out of bound");
  103. MPI_Status status;
  104. int err;
  105. if ((err = MPI_Sendrecv(
  106. &local, sizeof(T), MPI_BYTE, partner, tag,
  107. &remote, sizeof(T), MPI_BYTE, partner, tag,
  108. MPI_COMM_WORLD, &status
  109. )) != MPI_SUCCESS)
  110. mpi_throw(err, "(MPI) MPI_Sendrecv() [item] - ");
  111. }
  112. /*!
  113. * Exchange data with partner as part of the sorting network of both bubbletonic or bitonic
  114. * sorting algorithms.
  115. *
  116. * This function matches a transmit and a receive in order for fully exchanged data between
  117. * current node and partner.
  118. *
  119. * @tparam ValueT The value type used in buffer
  120. *
  121. * @param ldata [const ValueT*] Pointer to local data to send
  122. * @param rdata [ValueT*] Pointer to buffer to receive data from partner
  123. * @param count [size_t] The number of data to exchange
  124. * @param partner [mpi_id_t] The partner for the exchange
  125. * @param tag [int] The tag to use for the MPI communication
  126. */
  127. template<typename ValueT>
  128. void exchange(const ValueT* ldata, ValueT* rdata, size_t count, ID_t partner, int tag) {
  129. if (tag < 0)
  130. throw std::runtime_error("(MPI) exchange_data() [tag] - Out of bound");
  131. MPI_Datatype datatype = MPI_TypeMapper<ValueT>::getType();
  132. MPI_Status status;
  133. int err;
  134. if ((err = MPI_Sendrecv(
  135. ldata, count, datatype, partner, tag,
  136. rdata, count, datatype, partner, tag,
  137. MPI_COMM_WORLD, &status
  138. )) != MPI_SUCCESS)
  139. mpi_throw(err, "(MPI) MPI_Sendrecv() [data] - ");
  140. }
  141. /*!
  142. * Initiate a data exchange data with partner using non-blocking Isend-Irecv, as part of the
  143. * sorting network of both bubbletonic or bitonic sorting algorithms.
  144. *
  145. * This function matches a transmit and a receive in order for fully exchanged data between
  146. * current node and partner.
  147. * @note
  148. * This call MUST paired with exchange_wait() for each MPI_t object.
  149. * Calling 2 consecutive exchange_start() for the same MPI_t object is undefined.
  150. *
  151. * @tparam ValueT The value type used in buffers
  152. *
  153. * @param ldata [const ValueT*] Pointer to local data to send
  154. * @param rdata [ValueT*] Pointer to buffer to receive data from partner
  155. * @param count [size_t] The number of data to exchange
  156. * @param partner [mpi_id_t] The partner for the exchange
  157. * @param tag [int] The tag to use for the MPI communication
  158. */
  159. template<typename ValueT>
  160. void exchange_start(const ValueT* ldata, ValueT* rdata, size_t count, ID_t partner, int tag) {
  161. if (tag < 0)
  162. throw std::runtime_error("(MPI) exchange_data() [tag] - Out of bound");
  163. MPI_Datatype datatype = MPI_TypeMapper<ValueT>::getType();
  164. int err;
  165. err = MPI_Isend(ldata, count, datatype, partner, tag, MPI_COMM_WORLD, &handle_tx);
  166. if (err != MPI_SUCCESS)
  167. mpi_throw(err, "(MPI) MPI_Isend() - ");
  168. err = MPI_Irecv(rdata, count, datatype, partner, tag, MPI_COMM_WORLD, &handle_rx);
  169. if (err != MPI_SUCCESS)
  170. mpi_throw(err, "(MPI) MPI_Irecv() - ");
  171. }
  172. /*!
  173. * Block wait for the completion of the previously called exchange_start()
  174. *
  175. * @note
  176. * This call MUST paired with exchange_start() for each MPI_t object.
  177. * Calling 2 consecutive exchange_wait() for the same MPI_t object is undefined.
  178. */
  179. void exchange_wait() {
  180. MPI_Status status;
  181. int err;
  182. if ((err = MPI_Wait(&handle_tx, &status)) != MPI_SUCCESS)
  183. mpi_throw(err, "(MPI) MPI_Wait() [send] - ");
  184. if ((err = MPI_Wait(&handle_rx, &status)) != MPI_SUCCESS)
  185. mpi_throw(err, "(MPI) MPI_Wait() [recv] - ");
  186. }
  187. // Accessors
  188. [[nodiscard]] ID_t rank() const noexcept { return rank_; }
  189. [[nodiscard]] ID_t size() const noexcept { return size_; }
  190. [[nodiscard]] const std::string& name() const noexcept { return name_; }
  191. // Mutators
  192. ID_t rank(ID_t rank) noexcept { return rank_ = rank; }
  193. ID_t size(ID_t size) noexcept { return size_ = size; }
  194. std::string& name(const std::string& name) noexcept { return name_ = name; }
  195. /*!
  196. * Finalized the MPI
  197. */
  198. void finalize() {
  199. // Finalize the MPI environment
  200. initialized_ = false;
  201. MPI_Finalize();
  202. }
  203. //! RAII MPI finalization
  204. ~MPI_t() {
  205. // Finalize the MPI environment even on unexpected errors
  206. if (initialized_)
  207. MPI_Finalize();
  208. }
  209. // Local functionality
  210. private:
  211. /*!
  212. * Throw exception helper. It bundles the prefix msg with the MPI error string retrieved by
  213. * MPI API.
  214. *
  215. * @param err The MPI error code
  216. * @param prefixMsg The prefix text for the exception error message
  217. */
  218. void mpi_throw(int err, const char* prefixMsg) {
  219. char err_msg[MPI_MAX_ERROR_STRING];
  220. int msg_len;
  221. MPI_Error_string(err, err_msg, &msg_len);
  222. throw std::runtime_error(prefixMsg + std::string (err_msg) + '\n');
  223. }
  224. private:
  225. ID_t rank_{}; //!< MPI rank of the process
  226. ID_t size_{}; //!< MPI total size of the execution
  227. std::string name_{}; //!< The name of the local machine
  228. bool initialized_{}; //!< RAII helper flag
  229. MPI_Request handle_tx{}; //!< MPI async exchange handler for Transmission
  230. MPI_Request handle_rx{}; //!< MPI async exchange handler for Receptions
  231. };
  232. /*
  233. * Exported data types
  234. */
  235. extern MPI_t<> mpi;
  236. using mpi_id_t = MPI_t<>::ID_t;
  237. /*!
  238. * @brief A std::vector wrapper with 2 vectors, an active and a shadow.
  239. *
  240. * This type exposes the standard vector functionality of the active vector.
  241. * The shadow can be used when we need to use the vector as mutable
  242. * data in algorithms that can not support "in-place" editing (like elbow-sort for example)
  243. *
  244. * @tparam Value_t the underlying data type of the vectors
  245. */
  246. template <typename Value_t>
  247. struct ShadowedVec_t {
  248. // STL requirements
  249. using value_type = Value_t;
  250. using iterator = typename std::vector<Value_t>::iterator;
  251. using const_iterator = typename std::vector<Value_t>::const_iterator;
  252. using size_type = typename std::vector<Value_t>::size_type;
  253. // Default constructor
  254. ShadowedVec_t() = default;
  255. // Constructor from an std::vector
  256. explicit ShadowedVec_t(const std::vector<Value_t>& vec)
  257. : North(vec), South(), active(north) {
  258. South.resize(North.size());
  259. }
  260. explicit ShadowedVec_t(std::vector<Value_t>&& vec)
  261. : North(std::move(vec)), South(), active(north) {
  262. South.resize(North.size());
  263. }
  264. // Copy assignment operator
  265. ShadowedVec_t& operator=(const ShadowedVec_t& other) {
  266. if (this != &other) { // Avoid self-assignment
  267. North = other.North;
  268. South = other.South;
  269. active = other.active;
  270. }
  271. return *this;
  272. }
  273. // Move assignment operator
  274. ShadowedVec_t& operator=(ShadowedVec_t&& other) noexcept {
  275. if (this != &other) { // Avoid self-assignment
  276. North = std::move(other.North);
  277. South = std::move(other.South);
  278. active = other.active;
  279. // There is no need to zero out other since it is valid but in a non-defined state
  280. }
  281. return *this;
  282. }
  283. // Type accessors
  284. std::vector<Value_t>& getActive() { return (active == north) ? North : South; }
  285. std::vector<Value_t>& getShadow() { return (active == north) ? South : North; }
  286. const std::vector<Value_t>& getActive() const { return (active == north) ? North : South; }
  287. const std::vector<Value_t>& getShadow() const { return (active == north) ? South : North; }
  288. // Swap vectors
  289. void switch_active() { active = (active == north) ? south : north; }
  290. // Dispatch vector functionality to active vector
  291. Value_t& operator[](size_type index) { return getActive()[index]; }
  292. const Value_t& operator[](size_type index) const { return getActive()[index]; }
  293. Value_t& at(size_type index) { return getActive().at(index); }
  294. const Value_t& at(size_type index) const { return getActive().at(index); }
  295. void push_back(const Value_t& value) { getActive().push_back(value); }
  296. void push_back(Value_t&& value) { getActive().push_back(std::move(value)); }
  297. void pop_back() { getActive().pop_back(); }
  298. Value_t& front() { return getActive().front(); }
  299. Value_t& back() { return getActive().back(); }
  300. const Value_t& front() const { return getActive().front(); }
  301. const Value_t& back() const { return getActive().back(); }
  302. iterator begin() { return getActive().begin(); }
  303. const_iterator begin() const { return getActive().begin(); }
  304. iterator end() { return getActive().end(); }
  305. const_iterator end() const { return getActive().end(); }
  306. size_type size() const { return getActive().size(); }
  307. void resize(size_t new_size) {
  308. North.resize(new_size);
  309. South.resize(new_size);
  310. }
  311. void reserve(size_t new_capacity) {
  312. North.reserve(new_capacity);
  313. South.reserve(new_capacity);
  314. }
  315. [[nodiscard]] size_t capacity() const { return getActive().capacity(); }
  316. [[nodiscard]] bool empty() const { return getActive().empty(); }
  317. void clear() { getActive().clear(); }
  318. void swap(std::vector<Value_t>& other) { getActive().swap(other); }
  319. // Comparisons
  320. bool operator== (const ShadowedVec_t& other) { return getActive() == other.getActive(); }
  321. bool operator!= (const ShadowedVec_t& other) { return getActive() != other.getActive(); }
  322. bool operator== (const std::vector<value_type>& other) { return getActive() == other; }
  323. bool operator!= (const std::vector<value_type>& other) { return getActive() != other; }
  324. private:
  325. std::vector<Value_t> North{}; //!< Actual buffer to be used either as active or shadow
  326. std::vector<Value_t> South{}; //!< Actual buffer to be used either as active or shadow
  327. enum {
  328. north, south
  329. } active{north}; //!< Flag to select between North and South buffer
  330. };
  331. /*
  332. * Exported data types
  333. */
  334. using distBuffer_t = ShadowedVec_t<distValue_t>;
  335. extern distBuffer_t Data;
  336. /*!
  337. * A Logger for entire program.
  338. */
  339. struct Log {
  340. struct Endl {} endl; //!< a tag object to to use it as a new line request.
  341. //! We provide logging via << operator
  342. template<typename T>
  343. Log &operator<<(T &&t) {
  344. if (config.verbose) {
  345. if (line_) {
  346. std::cout << "[Log]: " << t;
  347. line_ = false;
  348. } else
  349. std::cout << t;
  350. }
  351. return *this;
  352. }
  353. // overload for special end line handling
  354. Log &operator<<(Endl e) {
  355. (void) e;
  356. if (config.verbose) {
  357. std::cout << '\n';
  358. line_ = true;
  359. }
  360. return *this;
  361. }
  362. private:
  363. bool line_{true};
  364. };
  365. extern Log logger;
  366. /*!
  367. * A small timing utility based on chrono that supports timing rounds
  368. * and returning the median of them. Time can accumulate to the measurement
  369. * for each round.
  370. */
  371. struct Timing {
  372. using Tpoint = std::chrono::steady_clock::time_point;
  373. using Tduration = std::chrono::microseconds;
  374. using microseconds = std::chrono::microseconds;
  375. using milliseconds = std::chrono::milliseconds;
  376. using seconds = std::chrono::seconds;
  377. //! Setup measurement rounds
  378. void init(size_t rounds) {
  379. duration_.resize(rounds);
  380. for (auto& d : duration_)
  381. d = Tduration::zero();
  382. }
  383. //! tool to mark the starting point
  384. Tpoint start() noexcept { return mark_ = std::chrono::steady_clock::now(); }
  385. //! tool to mark the ending point
  386. Tpoint stop() noexcept {
  387. Tpoint now = std::chrono::steady_clock::now();
  388. duration_[current_] += dt(now, mark_);
  389. return now;
  390. }
  391. //! Switch timing slot
  392. void next() noexcept {
  393. ++current_;
  394. current_ %= duration_.size();
  395. }
  396. Tduration& median() noexcept {
  397. std::sort(duration_.begin(), duration_.end());
  398. return duration_[duration_.size()/2];
  399. }
  400. //! A duration calculation utility
  401. static Tduration dt(Tpoint t2, Tpoint t1) noexcept {
  402. return std::chrono::duration_cast<Tduration>(t2 - t1);
  403. }
  404. //! Tool to print the time interval
  405. static void print_duration(const Tduration& duration, const char *what, mpi_id_t rank) noexcept {
  406. if (std::chrono::duration_cast<microseconds>(duration).count() < 10000)
  407. std::cout << "[Timing] (Rank " << rank << ") " << what << ": "
  408. << std::to_string(std::chrono::duration_cast<microseconds>(duration).count()) << " [usec]\n";
  409. else if (std::chrono::duration_cast<milliseconds>(duration).count() < 10000)
  410. std::cout << "[Timing] (Rank " << rank << ") " << what << ": "
  411. << std::to_string(std::chrono::duration_cast<milliseconds>(duration).count()) << " [msec]\n";
  412. else {
  413. char stime[26]; // fit ulong
  414. auto sec = std::chrono::duration_cast<seconds>(duration).count();
  415. auto msec = (std::chrono::duration_cast<milliseconds>(duration).count() % 1000) / 10; // keep 2 digit
  416. std::sprintf(stime, "%ld.%1ld", sec, msec);
  417. std::cout << "[Timing] (Rank " << rank << ") " << what << ": " << stime << " [sec]\n";
  418. }
  419. }
  420. private:
  421. size_t current_{0};
  422. Tpoint mark_{};
  423. std::vector<Tduration> duration_{1};
  424. };
  425. /*!
  426. * A "high level function"-like utility macro to forward a function call
  427. * and accumulate the execution time to the corresponding timing object.
  428. *
  429. * @param Tim The Timing object [Needs to have methods start() and stop()]
  430. * @param Func The function name
  431. * @param ... The arguments to pass to function (the preprocessor way)
  432. */
  433. #define timeCall(Tim, Func, ...) \
  434. Tim.start(); \
  435. Func(__VA_ARGS__); \
  436. Tim.stop(); \
  437. /*!
  438. * A utility to check if a number is power of two
  439. *
  440. * @tparam Integral The integral type of the number to check
  441. * @param x The number to check
  442. * @return True if it is power of 2, false otherwise
  443. */
  444. template <typename Integral>
  445. constexpr inline bool isPowerOfTwo(Integral x) noexcept {
  446. return (!(x & (x - 1)) && x);
  447. }
  448. #endif /* UTILS_HPP_ */