214 line
6.6 KiB
C++

/**
* \file v1.hpp
* \brief
*
* \author
* Christos Choutouridis AEM:8997
* <cchoutou@ece.auth.gr>
*/
#ifndef V1_HPP_
#define V1_HPP_
#include <vector>
#include <algorithm>
#include "matrix.hpp"
#include "v0.hpp"
#include "config.h"
#if defined CILK
#include <cilk/cilk.h>
#include <cilk/cilk_api.h>
//#include <cilk/reducer_opadd.h>
#elif defined OMP
#include <omp.h>
#elif defined PTHREADS
#include <thread>
#include <numeric>
#include <functional>
//#include <random>
#else
#endif
void init_workers();
namespace v1 {
/*!
*
* Merge knnsearch results and select the closest neighbors
*
* \tparam DataType
* \tparam IndexType
* \param N1 Neighbors results from one knnsearch
* \param D1 Distances results from one knnsearcs
* \param N2 Neighbors results from second knnsearch
* \param D2 Distances results from second knnsearch
* \param k How many
* \param m How accurate
* \param N Output for Neighbors
* \param D Output for Distances
*/
template <typename DataType, typename IndexType>
void mergeResultsWithM(mtx::Matrix<IndexType>& N1, mtx::Matrix<DataType>& D1,
mtx::Matrix<IndexType>& N2, mtx::Matrix<DataType>& D2,
size_t k, size_t m,
mtx::Matrix<IndexType>& N, mtx::Matrix<DataType>& D) {
size_t numQueries = N1.rows();
size_t maxCandidates = std::min((IndexType)m, (IndexType)(N1.columns() + N2.columns()));
for (size_t q = 0; q < numQueries; ++q) {
// Combine distances and neighbors
std::vector<std::pair<DataType, IndexType>> candidates(N1.columns() + N2.columns());
// Concatenate N1 and N2 rows
for (size_t i = 0; i < N1.columns(); ++i) {
candidates[i] = {D1.get(q, i), N1.get(q, i)};
}
for (size_t i = 0; i < N2.columns(); ++i) {
candidates[i + N1.columns()] = {D2.get(q, i), N2.get(q, i)};
}
// Keep only the top-m candidates
v0::quickselect(candidates, maxCandidates);
// Sort the top-m candidates
std::sort(candidates.begin(), candidates.begin() + maxCandidates);
// If m < k, pad the remaining slots with invalid values
for (size_t i = 0; i < k; ++i) {
if (i < maxCandidates) {
D.set(candidates[i].first, q, i);
N.set(candidates[i].second, q, i);
} else {
D.set(std::numeric_limits<DataType>::infinity(), q, i);
N.set(static_cast<IndexType>(-1), q, i); // Invalid index (end)
}
}
}
}
/*!
* The main parallelizable body
*/
template<typename MatrixD, typename MatrixI>
void worker_body (std::vector<MatrixD>& corpus_slices,
std::vector<MatrixD>& query_slices,
MatrixI& idx,
MatrixD& dst,
size_t slice,
size_t num_slices, size_t corpus_slice_size, size_t query_slice_size,
size_t k,
size_t m) {
// "load" types
using DstType = typename MatrixD::dataType;
using IdxType = typename MatrixI::dataType;
for (size_t ci = 0; ci < num_slices; ++ci) {
size_t idx_offset = ci * corpus_slice_size;
// Intermediate matrixes for intermediate results
MatrixI temp_idx(query_slices[slice].rows(), k);
MatrixD temp_dst(query_slices[slice].rows(), k);
// kNN for each combination
v0::knnsearch(corpus_slices[ci], query_slices[slice], idx_offset, k, m, temp_idx, temp_dst);
// Merge temporary results to final results
MatrixI idx_slice((IdxType*)idx.data(), slice * query_slice_size, query_slices[slice].rows(), k);
MatrixD dst_slice((DstType*)dst.data(), slice * query_slice_size, query_slices[slice].rows(), k);
mergeResultsWithM(idx_slice, dst_slice, temp_idx, temp_dst, k, m, idx_slice, dst_slice);
}
}
/*!
* \param C Is a MxD matrix (Corpus)
* \param Q Is a NxD matrix (Query)
* \param num_slices How many slices to Corpus-Query
* \param k The number of nearest neighbors needed
* \param m accuracy
* \param idx Is the Nxk matrix with the k indexes of the C points, that are
* neighbors of the nth point of Q
* \param dst Is the Nxk matrix with the k distances to the C points of the nth
* point of Q
*/
template<typename MatrixD, typename MatrixI>
void knnsearch(MatrixD& C, MatrixD& Q, size_t num_slices, size_t k, size_t m, MatrixI& idx, MatrixD& dst) {
using DstType = typename MatrixD::dataType;
using IdxType = typename MatrixI::dataType;
//Slice calculations
size_t corpus_slice_size = C.rows() / ((num_slices == 0)? 1:num_slices);
size_t query_slice_size = Q.rows() / ((num_slices == 0)? 1:num_slices);
// Make slices
std::vector<MatrixD> corpus_slices;
std::vector<MatrixD> query_slices;
for (size_t i = 0; i < num_slices; ++i) {
corpus_slices.emplace_back(
(DstType*)C.data(),
i * corpus_slice_size,
(i == num_slices - 1 ? C.rows() - i * corpus_slice_size : corpus_slice_size),
C.columns());
query_slices.emplace_back(
(DstType*)Q.data(),
i * query_slice_size,
(i == num_slices - 1 ? Q.rows() - i * query_slice_size : query_slice_size),
Q.columns());
}
// Initialize results
for (size_t i = 0; i < dst.rows(); ++i) {
for (size_t j = 0; j < dst.columns(); ++j) {
dst.set(std::numeric_limits<DstType>::infinity(), i, j);
idx.set(static_cast<IdxType>(-1), i, j);
}
}
// Main loop
#if defined OMP
#pragma omp parallel for
for (size_t qi = 0; qi < num_slices; ++qi) {
worker_body (corpus_slices, query_slices, idx, dst, qi, num_slices, corpus_slice_size, query_slice_size, k, m);
}
#elif defined CILK
cilk_for (size_t qi = 0; qi < num_slices; ++qi) {
worker_body (corpus_slices, query_slices, idx, dst, qi, num_slices, corpus_slice_size, query_slice_size, k, m);
}
#elif defined PTHREADS
std::vector<std::thread> workers;
for (size_t qi = 0; qi < num_slices; ++qi) {
workers.push_back(
std::thread (worker_body<MatrixD, MatrixI>,
std::ref(corpus_slices), std::ref(query_slices),
std::ref(idx), std::ref(dst),
qi,
num_slices, corpus_slice_size, query_slice_size,
k, m)
);
}
// Join threads
std::for_each(workers.begin(), workers.end(), [](std::thread& t){
t.join();
});
#else
for (size_t qi = 0; qi < num_slices; ++qi) {
worker_body (corpus_slices, query_slices, idx, dst, qi, num_slices, corpus_slice_size, query_slice_size, k, m);
}
#endif
}
} // namespace v1
#endif /* V1_HPP_ */