AUTH's THMMY "Parallel and distributed systems" course assignments.
您最多选择25个主题 主题必须以字母或数字开头,可以包含连字符 (-),并且长度不得超过35个字符

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189
  1. /**
  2. * \file v1.hpp
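  3. * \brief Sliced k-NN search: runs v0::knnsearch on every corpus/query slice pair and merges the partial results.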
  4. *
  5. * \author
  6. * Christos Choutouridis AEM:8997
  7. * <cchoutou@ece.auth.gr>
  8. */
  9. #ifndef V1_HPP_
  10. #define V1_HPP_
  11. #include <vector>
  12. #include <algorithm>
  13. #include "matrix.hpp"
  14. #include "v0.hpp"
  15. #include "config.h"
  16. #if defined CILK
  17. #include <cilk/cilk.h>
  18. #include <cilk/cilk_api.h>
  19. //#include <cilk/reducer_opadd.h>
  20. #elif defined OMP
  21. #include <omp.h>
  22. #elif defined PTHREADS
  23. #include <thread>
  24. #include <numeric>
  25. #include <functional>
  26. //#include <random>
  27. #else
  28. #endif
  29. void init_workers();
  30. namespace v1 {
  31. template <typename DataType, typename IndexType>
  32. void mergeResultsWithM(mtx::Matrix<IndexType>& N1, mtx::Matrix<DataType>& D1,
  33. mtx::Matrix<IndexType>& N2, mtx::Matrix<DataType>& D2,
  34. size_t k, size_t m,
  35. mtx::Matrix<IndexType>& N, mtx::Matrix<DataType>& D) {
  36. size_t numQueries = N1.rows();
  37. size_t maxCandidates = std::min((IndexType)m, (IndexType)(N1.columns() + N2.columns()));
  38. for (size_t q = 0; q < numQueries; ++q) {
  39. // Combine distances and neighbors
  40. std::vector<std::pair<DataType, IndexType>> candidates(N1.columns() + N2.columns());
  41. // Concatenate N1 and N2 rows
  42. for (size_t i = 0; i < N1.columns(); ++i) {
  43. candidates[i] = {D1.get(q, i), N1.get(q, i)};
  44. }
  45. for (size_t i = 0; i < N2.columns(); ++i) {
  46. candidates[i + N1.columns()] = {D2.get(q, i), N2.get(q, i)};
  47. }
  48. // Keep only the top-m candidates
  49. v0::quickselect(candidates, maxCandidates);
  50. // Sort the top-m candidates
  51. std::sort(candidates.begin(), candidates.begin() + maxCandidates);
  52. // If m < k, pad the remaining slots with invalid values
  53. for (size_t i = 0; i < k; ++i) {
  54. if (i < maxCandidates) {
  55. D.set(candidates[i].first, q, i);
  56. N.set(candidates[i].second, q, i);
  57. } else {
  58. D.set(std::numeric_limits<DataType>::infinity(), q, i);
  59. N.set(static_cast<IndexType>(-1), q, i); // Invalid index (end)
  60. }
  61. }
  62. }
  63. }
  64. template<typename MatrixD, typename MatrixI>
  65. void worker_body (std::vector<MatrixD>& corpus_slices,
  66. std::vector<MatrixD>& query_slices,
  67. MatrixI& idx,
  68. MatrixD& dst,
  69. size_t slice,
  70. size_t num_slices, size_t corpus_slice_size, size_t query_slice_size,
  71. size_t k,
  72. size_t m) {
  73. // "load" types
  74. using DstType = typename MatrixD::dataType;
  75. using IdxType = typename MatrixI::dataType;
  76. for (size_t ci = 0; ci < num_slices; ++ci) {
  77. size_t idx_offset = ci * corpus_slice_size;
  78. // Intermediate matrixes for intermediate results
  79. MatrixI temp_idx(query_slices[slice].rows(), k);
  80. MatrixD temp_dst(query_slices[slice].rows(), k);
  81. // kNN for each combination
  82. v0::knnsearch(corpus_slices[ci], query_slices[slice], idx_offset, k, m, temp_idx, temp_dst);
  83. // Merge temporary results to final results
  84. MatrixI idx_slice((IdxType*)idx.data(), slice * query_slice_size, query_slices[slice].rows(), k);
  85. MatrixD dst_slice((DstType*)dst.data(), slice * query_slice_size, query_slices[slice].rows(), k);
  86. mergeResultsWithM(idx_slice, dst_slice, temp_idx, temp_dst, k, m, idx_slice, dst_slice);
  87. }
  88. }
  /**
   * k-NN search of the queries \p Q in the corpus \p C, computed slice by slice.
   *
   * Both C and Q are cut into \p num_slices row slices; the last slice takes the
   * rows that remain after the integer division. The results are initialised to
   * "no neighbor" (infinite distance, index -1) and then worker_body is invoked
   * exactly once for every query slice. Depending on the build macro the calls
   * are issued from an OpenMP parallel loop (OMP), a cilk_for loop (CILK), one
   * std::thread per slice (PTHREADS) or a plain serial loop.
   *
   * With num_slices == 0 no slice is built and the results stay initialised.
   *
   * \tparam MatrixD  Matrix type for points and distances (provides ::dataType)
   * \tparam MatrixI  Matrix type for neighbor indexes (provides ::dataType)
   * \param C           The corpus
   * \param Q           The queries
   * \param num_slices  Number of slices for both corpus and queries
   * \param k           Number of neighbors per query
   * \param m           Candidate limit forwarded to worker_body
   * \param idx         Output neighbor indexes
   * \param dst         Output distances
   */
  template<typename MatrixD, typename MatrixI>
  void knnsearch(MatrixD& C, MatrixD& Q, size_t num_slices, size_t k, size_t m, MatrixI& idx, MatrixD& dst) {
     using DstType = typename MatrixD::dataType;
     using IdxType = typename MatrixI::dataType;

     // Slice calculations (the divisor is guarded so num_slices == 0 cannot divide by zero)
     const size_t divisor = (num_slices == 0) ? 1 : num_slices;
     size_t corpus_slice_size = C.rows() / divisor;
     size_t query_slice_size  = Q.rows() / divisor;

     // Make slices
     std::vector<MatrixD> corpus_slices;
     std::vector<MatrixD> query_slices;
     corpus_slices.reserve(num_slices);
     query_slices.reserve(num_slices);
     for (size_t i = 0; i < num_slices; ++i) {
        // The last slice also takes the rows left over by the integer division
        corpus_slices.emplace_back(
              (DstType*)C.data(),
              i * corpus_slice_size,
              (i == num_slices - 1 ? C.rows() - i * corpus_slice_size : corpus_slice_size),
              C.columns());
        query_slices.emplace_back(
              (DstType*)Q.data(),
              i * query_slice_size,
              (i == num_slices - 1 ? Q.rows() - i * query_slice_size : query_slice_size),
              Q.columns());
     }

     // Start from "no neighbor found" so the first merge of every row is well defined
     for (size_t i = 0; i < dst.rows(); ++i) {
        for (size_t j = 0; j < dst.columns(); ++j) {
           dst.set(std::numeric_limits<DstType>::infinity(), i, j);
           idx.set(static_cast<IdxType>(-1), i, j);
        }
     }

     // Main loop: exactly one worker_body call per query slice. Handing the same
     // slice out more than once would merge identical neighbors into the results
     // again and, in the parallel builds, let several workers write one result
     // slice at the same time.
  #if defined OMP
     #pragma omp parallel for
     for (size_t qi = 0; qi < num_slices; ++qi) {
        worker_body (corpus_slices, query_slices, idx, dst, qi, num_slices, corpus_slice_size, query_slice_size, k, m);
     }
  #elif defined CILK
     cilk_for (size_t qi = 0; qi < num_slices; ++qi) {
        worker_body (corpus_slices, query_slices, idx, dst, qi, num_slices, corpus_slice_size, query_slice_size, k, m);
     }
  #elif defined PTHREADS
     std::vector<std::thread> workers;
     workers.reserve(num_slices);
     for (size_t qi = 0; qi < num_slices; ++qi) {
        workers.push_back(
           std::thread (worker_body<MatrixD, MatrixI>,
                 std::ref(corpus_slices), std::ref(query_slices),
                 std::ref(idx), std::ref(dst),
                 qi,
                 num_slices, corpus_slice_size, query_slice_size,
                 k, m)
        );
     }
     // Join threads
     std::for_each(workers.begin(), workers.end(), [](std::thread& t){
        t.join();
     });
  #else
     for (size_t qi = 0; qi < num_slices; ++qi) {
        worker_body (corpus_slices, query_slices, idx, dst, qi, num_slices, corpus_slice_size, query_slice_size, k, m);
     }
  #endif
  }
  156. } // namespace v1
  157. #endif /* V1_HPP_ */