A triangle-counting assignment for the A.U.TH Parallel and Distributed Systems class.
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 
 
 

244 lines
7.6 KiB

  1. /*!
  2. * \file v3.cpp
  3. * \brief vv3 part of the exercise.
  4. *
  5. * \author
  6. * Christos Choutouridis AEM:8997
  7. * <cchoutou@ece.auth.gr>
  8. */
  9. #include <v3.h>
  10. namespace v3 {
  11. #if defined CILK
  12. /*!
  13. * Utility function to get/set the number of threads.
  14. *
  15. * The number of threads are controlled via environment variable \c CILK_NWORKERS
  16. *
  17. * \return The number of threads used.
  18. * \note
  19. * The user can reduce the number with the command option \c --max_threads.
  20. * If so the requested number will be used even if the environment has more threads available.
  21. */
  22. int nworkers() {
  23. if (session.max_threads)
  24. return (session.max_threads < __cilkrts_get_nworkers()) ?
  25. session.max_threads : __cilkrts_get_nworkers();
  26. else
  27. return __cilkrts_get_nworkers();
  28. }
  29. /*!
  30. * Calculate and return a vertex-wise count vector.
  31. *
  32. * \param A The matrix to use.
  33. * \return The count vector. RVO is used here.
  34. * \note
  35. * We use two methods of calculation based on \c --make_symmetric or \c --triangular_only
  36. * - A full matrix calculation which update only c[i]
  37. * - A lower triangular matrix which update c[i], c[j], c[k]. This is wayyy faster.
  38. */
  39. std::vector<value_t> triang_v(matrix& A) {
  40. std::vector<std::atomic<value_t>> c(A.size()); // atomic for c[j], c[k] only
  41. std::vector<value_t> ret(A.size()); // unrestricted c[i] access
  42. cilk_for (int i=0 ; i<A.size() ; ++i) {
  43. for (auto j = A.getCol(i); j.index() != j.end() ; ++j) {
  44. // j list all the edges with i
  45. for (auto k = A.getCol(j.index()); k.index() != k.end() ; ++k) {
  46. // k list all the edges with j
  47. if (A.get(k.index(), i)) {
  48. ++ret[i];
  49. c[j.index()] += (!session.makeSymmetric)? 1:0;
  50. c[k.index()] += (!session.makeSymmetric)? 1:0;
  51. }
  52. }
  53. }
  54. if (session.makeSymmetric) {
  55. ret[i] = ret[i]/2;
  56. c[i] = c[i]/2;
  57. }
  58. }
  59. // merge c to ret and return it
  60. for (index_t i =0 ; i<A.size() ; ++i) ret[i] += c[i];
  61. return ret;
  62. }
  63. /*!
  64. * A sum utility to use as spawn function for parallelized sum.
  65. * \return The sum of \c v from \c begin to \c end.
  66. */
  67. void do_sum (value_t& out_sum, std::vector<value_t>& v, index_t begin, index_t end) {
  68. for (auto i =begin ; i != end ; ++i)
  69. out_sum += v[i];
  70. }
  71. /*!
  72. * A parallelized version of sum. Just because ;)
  73. * \return The total sum of vector \c v
  74. */
  75. value_t sum (std::vector<value_t>& v) {
  76. int n = nworkers();
  77. std::vector<value_t> sum_v(n, 0); // result of each do_sum invocation.
  78. // We spawn workers in a more statically way.
  79. for (index_t i =0 ; i < n ; ++i) {
  80. cilk_spawn do_sum(sum_v[i], v, i*v.size()/n, (i+1)*v.size()/n);
  81. }
  82. cilk_sync;
  83. // sum the sums (a sum to rule them all)
  84. value_t s =0; for (auto& it : sum_v) s += it;
  85. return s;
  86. }
  87. #elif defined OMP
  88. /*!
  89. * A "simple" user defined OpenMP reduction for vector<value_t>
  90. * \note
  91. * Not used. Reason: The atomic version of the code performs better.
  92. */
  93. #pragma omp declare reduction(vec_value_plus : std::vector<value_t> : \
  94. std::transform( \
  95. omp_out.begin(), omp_out.end(), omp_in.begin(), omp_out.begin(), std::plus<value_t>() \
  96. ) \
  97. ) \
  98. initializer(omp_priv = decltype(omp_orig)(omp_orig.size()))
  99. /*!
  100. * Utility function to get/set the number of threads.
  101. *
  102. * The number of threads are controlled via environment variable \c OMP_NUM_THREADS
  103. *
  104. * \return The number of threads used.
  105. * \note
  106. * The user can reduce the number with the command option \c --max_threads.
  107. * If so the requested number will be used even if the environment has more threads available.
  108. */
  109. int nworkers() {
  110. if (session.max_threads && session.max_threads < (size_t)omp_get_max_threads()) {
  111. omp_set_dynamic(0);
  112. omp_set_num_threads(session.max_threads);
  113. return session.max_threads;
  114. }
  115. else {
  116. omp_set_dynamic(1);
  117. return omp_get_max_threads();
  118. }
  119. }
  120. /*!
  121. * Calculate and return a vertex-wise count vector.
  122. *
  123. * \param A The matrix to use.
  124. * \return The count vector. RVO is used here.
  125. * \note
  126. * We use two methods of calculation based on \c --make_symmetric or \c --triangular_only
  127. * - A full matrix calculation which update only c[i]
  128. * - A lower triangular matrix which update c[i], c[j], c[k]. This is waaayyy faster.
  129. */
  130. std::vector<value_t> triang_v(matrix& A) {
  131. std::vector<std::atomic<value_t>> c(A.size()); // atomic for c[j], c[k] only
  132. std::vector<value_t> ret(A.size()); // unrestricted c[i] access
  133. // OMP schedule selection
  134. if (session.dynamic) omp_set_schedule (omp_sched_dynamic, 0);
  135. else omp_set_schedule (omp_sched_static, 0);
  136. #pragma omp parallel for schedule(runtime) //reduction(vec_value_plus : c)
  137. for (int i=0 ; i<A.size() ; ++i) {
  138. for (auto j = A.getCol(i); j.index() != j.end() ; ++j) {
  139. // j list all the edges with i
  140. for (auto k = A.getCol(j.index()); k.index() != k.end() ; ++k) {
  141. // k list all the edges with j
  142. if (A.get(k.index(), i)) {
  143. ++ret[i];
  144. c[j.index()] += (!session.makeSymmetric)? 1:0;
  145. c[k.index()] += (!session.makeSymmetric)? 1:0;
  146. }
  147. }
  148. }
  149. if (session.makeSymmetric) {
  150. ret[i] = ret[i]/2;
  151. c[i] = c[i]/2;
  152. }
  153. }
  154. // merge c to ret and return it
  155. for (index_t i =0 ; i<A.size() ; ++i) ret[i] += c[i];
  156. return ret;
  157. }
  158. /*!
  159. * A parallelized version of sum. Just because ;)
  160. * \return The total sum of vector \c v
  161. */
  162. value_t sum (std::vector<value_t>& v) {
  163. value_t s =0;
  164. #pragma omp parallel for reduction(+:s)
  165. for (auto i =0u ; i<v.size() ; ++i)
  166. s += v[i];
  167. return s;
  168. }
  169. #else
  170. //! Return the number of workers.
  171. //! \note This function is just for completion
  172. int nworkers() { return 1; }
  173. /*!
  174. * Calculate and return a vertex-wise count vector.
  175. *
  176. * \param A The matrix to use.
  177. * \return The count vector. RVO is used here.
  178. * \note
  179. * We use two methods of calculation based on \c --make_symmetric or \c --triangular_only
  180. * - A full matrix calculation which update only c[i]
  181. * - A lower triangular matrix which update c[i], c[j], c[k]. This is waaayyy faster.
  182. */
  183. std::vector<value_t> triang_v(matrix& A) {
  184. std::vector<value_t> c(A.size());
  185. for (int i=0 ; i<A.size() ; ++i) {
  186. for (auto j = A.getCol(i); j.index() != j.end() ; ++j) {
  187. // j list all the edges with i
  188. for (auto k = A.getCol(j.index()); k.index() != k.end() ; ++k) {
  189. // k list all the edges with j
  190. if (A.get(k.index(), i)) {
  191. ++c[i];
  192. c[j.index()] += (!session.makeSymmetric)? 1:0;
  193. c[k.index()] += (!session.makeSymmetric)? 1:0;
  194. //^ We set other nodes in case of lower triangular
  195. }
  196. }
  197. }
  198. if (session.makeSymmetric) c[i] /= 2;
  199. //^ We don't have to divide by 2 in case of lower triangular
  200. }
  201. return c;
  202. }
  203. /*!
  204. * Summation functionality.
  205. * \return The total sum of vector \c v
  206. */
  207. value_t sum (std::vector<value_t>& v) {
  208. value_t s =0;
  209. for (auto& it : v)
  210. s += it;
  211. return s;
  212. }
  213. #endif
  214. //! Polymorphic interface function for sum results
  215. value_t triang_count (std::vector<value_t>& c) {
  216. return sum(c)/3;
  217. }
  218. }