AUTH's THMMY "Parallel and distributed systems" course assignments.
25'ten fazla konu seçemezsiniz Konular bir harf veya rakamla başlamalı, kısa çizgiler ('-') içerebilir ve en fazla 35 karakter uzunluğunda olabilir.
 
 
 
 
 

216 satır
6.7 KiB

  1. /**
  2. * \file v1.hpp
  3. * \brief
  4. *
  5. * \author
  6. * Christos Choutouridis AEM:8997
  7. * <cchoutou@ece.auth.gr>
  8. */
  9. #ifndef V1_HPP_
  10. #define V1_HPP_
  11. #include <vector>
  12. #include <algorithm>
  13. #include "matrix.hpp"
  14. #include "v0.hpp"
  15. #include "config.h"
  16. #if defined CILK
  17. #include <cilk/cilk.h>
  18. #include <cilk/cilk_api.h>
  19. //#include <cilk/reducer_opadd.h>
  20. #elif defined OMP
  21. #include <omp.h>
  22. #elif defined PTHREADS
  23. #include <thread>
  24. #include <numeric>
  25. #include <functional>
  26. //#include <random>
  27. #else
  28. #endif
  29. void init_workers();
  30. namespace v1 {
  31. /*!
  32. *
  33. * Merge knnsearch results and select the closest neighbors
  34. *
  35. * \tparam DataType
  36. * \tparam IndexType
  37. * \param N1 Neighbors results from one knnsearch
  38. * \param D1 Distances results from one knnsearcs
  39. * \param N2 Neighbors results from second knnsearch
  40. * \param D2 Distances results from second knnsearch
  41. * \param k How many
  42. * \param m How accurate
  43. * \param N Output for Neighbors
  44. * \param D Output for Distances
  45. */
  46. template <typename DataType, typename IndexType>
  47. void mergeResultsWithM(mtx::Matrix<IndexType>& N1, mtx::Matrix<DataType>& D1,
  48. mtx::Matrix<IndexType>& N2, mtx::Matrix<DataType>& D2,
  49. size_t k, size_t m,
  50. mtx::Matrix<IndexType>& N, mtx::Matrix<DataType>& D) {
  51. size_t numQueries = N1.rows();
  52. size_t maxCandidates = std::min((IndexType)m, (IndexType)(N1.columns() + N2.columns()));
  53. for (size_t q = 0; q < numQueries; ++q) {
  54. // Combine distances and neighbors
  55. std::vector<std::pair<DataType, IndexType>> candidates(N1.columns() + N2.columns());
  56. // Concatenate N1 and N2 rows
  57. for (size_t i = 0; i < N1.columns(); ++i) {
  58. candidates[i] = {D1.get(q, i), N1.get(q, i)};
  59. }
  60. for (size_t i = 0; i < N2.columns(); ++i) {
  61. candidates[i + N1.columns()] = {D2.get(q, i), N2.get(q, i)};
  62. }
  63. // Keep only the top-m candidates
  64. v0::quickselect(candidates, maxCandidates);
  65. // Sort the top-m candidates
  66. std::sort(candidates.begin(), candidates.begin() + maxCandidates);
  67. // If m < k, pad the remaining slots with invalid values
  68. for (size_t i = 0; i < k; ++i) {
  69. if (i < maxCandidates) {
  70. D.set(candidates[i].first, q, i);
  71. N.set(candidates[i].second, q, i);
  72. } else {
  73. D.set(std::numeric_limits<DataType>::infinity(), q, i);
  74. N.set(static_cast<IndexType>(-1), q, i); // Invalid index (end)
  75. }
  76. }
  77. }
  78. }
  79. /*!
  80. * The main parallelizable body
  81. */
  82. template<typename MatrixD, typename MatrixI>
  83. void worker_body (std::vector<MatrixD>& corpus_slices,
  84. std::vector<MatrixD>& query_slices,
  85. MatrixI& idx,
  86. MatrixD& dst,
  87. size_t slice,
  88. size_t num_slices, size_t corpus_slice_size, size_t query_slice_size,
  89. size_t k,
  90. size_t m) {
  91. // "load" types
  92. using DstType = typename MatrixD::dataType;
  93. using IdxType = typename MatrixI::dataType;
  94. for (size_t ci = 0; ci < num_slices; ++ci) {
  95. size_t idx_offset = ci * corpus_slice_size;
  96. // Intermediate matrixes for intermediate results
  97. MatrixI temp_idx(query_slices[slice].rows(), k);
  98. MatrixD temp_dst(query_slices[slice].rows(), k);
  99. // kNN for each combination
  100. v0::knnsearch(corpus_slices[ci], query_slices[slice], idx_offset, k, m, temp_idx, temp_dst);
  101. // Merge temporary results to final results
  102. MatrixI idx_slice((IdxType*)idx.data(), slice * query_slice_size, query_slices[slice].rows(), k);
  103. MatrixD dst_slice((DstType*)dst.data(), slice * query_slice_size, query_slices[slice].rows(), k);
  104. mergeResultsWithM(idx_slice, dst_slice, temp_idx, temp_dst, k, m, idx_slice, dst_slice);
  105. }
  106. }
  107. /*!
  108. * \param C Is a MxD matrix (Corpus)
  109. * \param Q Is a NxD matrix (Query)
  110. * \param num_slices How many slices to Corpus-Query
  111. * \param k The number of nearest neighbors needed
  112. * \param m accuracy
  113. * \param idx Is the Nxk matrix with the k indexes of the C points, that are
  114. * neighbors of the nth point of Q
  115. * \param dst Is the Nxk matrix with the k distances to the C points of the nth
  116. * point of Q
  117. */
  118. template<typename MatrixD, typename MatrixI>
  119. void knnsearch(MatrixD& C, MatrixD& Q, size_t num_slices, size_t k, size_t m, MatrixI& idx, MatrixD& dst) {
  120. using DstType = typename MatrixD::dataType;
  121. using IdxType = typename MatrixI::dataType;
  122. //Slice calculations
  123. size_t corpus_slice_size = C.rows() / ((num_slices == 0)? 1:num_slices);
  124. size_t query_slice_size = Q.rows() / ((num_slices == 0)? 1:num_slices);
  125. // Make slices
  126. std::vector<MatrixD> corpus_slices;
  127. std::vector<MatrixD> query_slices;
  128. for (size_t i = 0; i < num_slices; ++i) {
  129. corpus_slices.emplace_back(
  130. (DstType*)C.data(),
  131. i * corpus_slice_size,
  132. (i == num_slices - 1 ? C.rows() - i * corpus_slice_size : corpus_slice_size),
  133. C.columns());
  134. query_slices.emplace_back(
  135. (DstType*)Q.data(),
  136. i * query_slice_size,
  137. (i == num_slices - 1 ? Q.rows() - i * query_slice_size : query_slice_size),
  138. Q.columns());
  139. }
  140. // Intermediate results
  141. for (size_t i = 0; i < dst.rows(); ++i) {
  142. for (size_t j = 0; j < dst.columns(); ++j) {
  143. dst.set(std::numeric_limits<DstType>::infinity(), i, j);
  144. idx.set(static_cast<IdxType>(-1), i, j);
  145. }
  146. }
  147. // Main loop
  148. #if defined OMP
  149. #pragma omp parallel for
  150. for (size_t qi = 0; qi < num_slices; ++qi) {
  151. worker_body (corpus_slices, query_slices, idx, dst, qi, num_slices, corpus_slice_size, query_slice_size, k, m);
  152. }
  153. #elif defined CILK
  154. cilk_for (size_t qi = 0; qi < num_slices; ++qi) {
  155. worker_body (corpus_slices, query_slices, idx, dst, qi, num_slices, corpus_slice_size, query_slice_size, k, m);
  156. }
  157. #elif defined PTHREADS
  158. std::vector<std::thread> workers;
  159. for (size_t qi = 0; qi < num_slices; ++qi) {
  160. workers.push_back(
  161. std::thread (worker_body<MatrixD, MatrixI>,
  162. std::ref(corpus_slices), std::ref(query_slices),
  163. std::ref(idx), std::ref(dst),
  164. qi,
  165. num_slices, corpus_slice_size, query_slice_size,
  166. k, m)
  167. );
  168. }
  169. // Join threads
  170. std::for_each(workers.begin(), workers.end(), [](std::thread& t){
  171. t.join();
  172. });
  173. #else
  174. for (size_t qi = 0; qi < num_slices; ++qi) {
  175. for (size_t qi = 0; qi < num_slices; ++qi) {
  176. worker_body (corpus_slices, query_slices, idx, dst, qi, num_slices, corpus_slice_size, query_slice_size, k, m);
  177. }
  178. }
  179. #endif
  180. }
  181. } // namespace v1
  182. #endif /* V1_HPP_ */