diff --git a/homework_2/Makefile b/homework_2/Makefile index 1905b6e..5d76421 100644 --- a/homework_2/Makefile +++ b/homework_2/Makefile @@ -42,7 +42,7 @@ BUILD_DIR := bin OBJ_DIR := $(BUILD_DIR)/obj DEP_DIR := $(BUILD_DIR)/.dep -OUTPUT_DIR := out-rc3b +OUTPUT_DIR := out # ========== Compiler settings ========== # Compiler flags for debug and release diff --git a/homework_2/include/config.h b/homework_2/include/config.h index e3bfd42..5c62edf 100644 --- a/homework_2/include/config.h +++ b/homework_2/include/config.h @@ -49,14 +49,19 @@ static constexpr size_t MAX_PIPELINE_SIZE = 64UL; using distValue_t = uint32_t; /*! - * Session option for each invocation of the executable + * Session option for each invocation of the executable. + * + * @note + * The values of the members are set from the command line. */ struct config_t { size_t arraySize{DEFAULT_DATA_SIZE}; //!< The array size of the local data to sort. - size_t pipeline{1UL}; //!< Pipeline stages + bool exchangeOpt{false}; //!< Flag to request the exchange optimization + size_t pipeline{1UL}; //!< Pipeline stages (1 to disable) bool validation{false}; //!< Request a full validation at the end, performed by process rank 0. bool ndebug{false}; //!< Skips debug trap on DEBUG builds. - bool perf{false}; //!< Enable performance timing measurements and prints. + size_t perf{1}; //!< Number of times the sort is repeated for timing; the measurements are printed + //!< (median of the rounds, per timer) only when this is greater than 1 bool verbose{false}; //!< Flag to enable verbose output to stdout. 
}; diff --git a/homework_2/include/distsort.hpp b/homework_2/include/distsort.hpp index 3eb3575..f95d21d 100644 --- a/homework_2/include/distsort.hpp +++ b/homework_2/include/distsort.hpp @@ -22,7 +22,15 @@ #include "utils.hpp" -extern Timing TfullSort, Texchange, Tminmax, TelbowSort; // make timers public +/* + * Exported timers + */ +extern Timing Ttotal; +extern Timing TfullSort; +extern Timing Texchange; +extern Timing Tminmax; +extern Timing TelbowSort; + /*! * Enumerator for the different versions of the sorting method @@ -167,6 +175,9 @@ void fullSort(RangeT& data, bool ascending) noexcept { else { __gnu_parallel::sort(data.begin(), data.end(), std::greater<>()); } + + if (config.exchangeOpt) + updateMinMax(localStat, data); } /*! @@ -231,6 +242,43 @@ void elbowSort(ShadowedDataT& data, bool ascending) noexcept { elbowSortCore(data, ascending, std::greater<>()); } +/*! + * Predicate for exchange optimization. Returns true only if an exchange between partners is needed. + * In order to do that we exchange min and max statistics of the partner's data. + * + * @tparam StatT Statistics data type (for min-max) + * + * @param lstat [const StatT] Reference to the local statistic data + * @param rstat [StatT] Reference to the remote statistic data to fill + * @param part [mpi_id_t] The partner for the exchange + * @param tag [int] The tag to use for the exchange of stats + * @param keepSmall [bool] Flag to indicate if the local side keeps the small or the large values + * @return True if we need data exchange, false otherwise + */ +template +bool needsExchange(const StatT& lstat, StatT& rstat, mpi_id_t part, int tag, bool keepSmall) { + timeCall(Texchange, mpi.exchange_it, lstat, rstat, part, tag); + return (keepSmall) ? + rstat.min < lstat.max // Lmin: rstat.min - Smax: lstat.max + : lstat.min < rstat.max; // Lmin: lstat.min - Smax: rstat.max +} + + +/*! 
+ * Update stats utility + * + * @tparam RangeT A range type with random access iterator + * @tparam StatT Statistics data type (for min-max) + * + * @param stat [StatT] Reference to the statistic data to update (left unchanged when data is empty) + * @param data [const RangeT] Reference to the sequence to extract stats from + */ +template +void updateMinMax(StatT& stat, const RangeT& data) noexcept { + auto [min, max] = std::minmax_element(data.begin(), data.end()); + if (min == data.end()) return; // empty sequence: both iterators are end(), nothing to dereference + stat.min = *min; stat.max = *max; +} /*! * Takes two sequences and selects either the larger or the smaller items @@ -276,7 +324,11 @@ void keepMinOrMax(ValueT* local, const ValueT* remote, size_t count, bool keepSm size_t tagGenerator(size_t depth, size_t step, size_t stage = 0); /*! - * A pipeline loop for mixing min-max process with mpi data exchange + * An exchange functionality to support both Bubbletonic and Bitonic sort algorithms. + * + * @note + * In case of pipeline request it switches to non-blocking MPI communication for + * pipelining min-max process with mpi data exchange * * @tparam ShadowedDataT A Shadowed buffer type with random access iterator. 
* @@ -289,28 +341,38 @@ size_t tagGenerator(size_t depth, size_t step, size_t stage = 0); * The @c tag is increased inside the pipeline loop for each different data exchange */ template -void exchangePipeline(ShadowedDataT& data, mpi_id_t partner, bool keepSmall, int tag) { +void exchange(ShadowedDataT& data, mpi_id_t partner, bool keepSmall, int tag) { using Value_t = typename ShadowedDataT::value_type; // Init counters and pointers - size_t count = data.size() / config.pipeline; Value_t* active = data.getActive().data(); Value_t* shadow = data.getShadow().data(); + size_t count = data.size() / config.pipeline; - // Pipeline - Texchange.start(); - mpi.exchange_start(active, shadow, count, partner, tag); - for (size_t stage = 0 ; stage < config.pipeline ; active += count, shadow += count) { - // Wait previous chunk - mpi.exchange_wait(); Texchange.stop(); - if (++stage < config.pipeline) { - // Start next chunk if there is a next one - Texchange.start(); - mpi.exchange_start(active + count, shadow + count, count, partner, ++tag); + if (config.pipeline > 1) { + // Pipeline case - use async MPI + Texchange.start(); + mpi.exchange_start(active, shadow, count, partner, tag); + for (size_t stage = 0; stage < config.pipeline; active += count, shadow += count) { + // Wait previous chunk + mpi.exchange_wait(); + Texchange.stop(); + if (++stage < config.pipeline) { + // Start next chunk if there is a next one + Texchange.start(); + mpi.exchange_start(active + count, shadow + count, count, partner, ++tag); + } + // process the arrived data + timeCall(Tminmax, keepMinOrMax, active, shadow, count, keepSmall); } - // process the arrived data + } + else { + // No pipeline - use blocking MPI + timeCall(Texchange, mpi.exchange, active, shadow, count, partner, tag); timeCall(Tminmax, keepMinOrMax, active, shadow, count, keepSmall); } + if (config.exchangeOpt) + updateMinMax(localStat, data); } /*! 
@@ -339,8 +401,10 @@ void distBubbletonic(ShadowedDataT& data, mpi_id_t Processes, mpi_id_t rank) { isActive(part, Processes) ) { // Exchange with partner, keep nim-or-max and sort - O(N) int tag = static_cast(tagGenerator(0, step)); - exchangePipeline(data, part, ks, tag); - timeCall(TelbowSort, elbowSort, data, ascending(rank, Processes)); + if (!config.exchangeOpt || needsExchange(localStat, remoteStat, part, tag++, ks)) { + exchange(data, part, ks, tag); + timeCall(TelbowSort, elbowSort, data, ascending(rank, Processes)); + } } } @@ -378,7 +442,9 @@ void distBitonic(ShadowedDataT& data, mpi_id_t Processes, mpi_id_t rank) { auto ks = keepSmall(rank, part, depth); // Exchange with partner, keep nim-or-max int tag = static_cast(tagGenerator(depth, step)); - exchangePipeline(data, part, ks, tag); + if (!config.exchangeOpt || needsExchange(localStat, remoteStat, part, tag++, ks)) { + exchange(data, part, ks, tag); + } } // sort - O(N) timeCall(TelbowSort, elbowSort, data, ascending(rank, depth)); diff --git a/homework_2/include/utils.hpp b/homework_2/include/utils.hpp index f1c355e..6936d86 100644 --- a/homework_2/include/utils.hpp +++ b/homework_2/include/utils.hpp @@ -17,6 +17,22 @@ #include "config.h" +/*! + * Min-Max statistics data for exchange optimization + * @tparam Value_t The underlying data type of the sequence data + */ +template +struct Stat_t { + using value_type = Value_t; //!< meta-export the type + + Value_t min{}; //!< The minimum value of the sequence + Value_t max{}; //!< The maximum value of the sequence +}; + +//! Application data selection alias +using distStat_t = Stat_t; +extern distStat_t localStat, remoteStat; // Make stats public + /* * MPI_ dispatcher mechanism */ @@ -78,6 +94,64 @@ struct MPI_t { name_ = std::string (processor_name, name_len); } + /*! + * Exchange one data object of type @c T with partner as part of the sorting network of both + * bubbletonic or bitonic sorting algorithms. 
+ * + * This function matches a transmit and a receive in order to fully exchange the data object + * between current node and partner. + * + * @tparam T The object type + * + * @param local [const T&] Reference to the local object to send + * @param remote [T&] Reference to the object to receive data from partner + * @param partner [mpi_id_t] The partner for the exchange + * @param tag [int] The tag to use for the MPI communication + */ + template + void exchange_it(const T& local, T& remote, ID_t partner, int tag) { + if (tag < 0) + throw std::runtime_error("(MPI) exchange_it() [tag] - Out of bound"); + MPI_Status status; + int err; + if ((err = MPI_Sendrecv( + &local, sizeof(T), MPI_BYTE, partner, tag, + &remote, sizeof(T), MPI_BYTE, partner, tag, + MPI_COMM_WORLD, &status + )) != MPI_SUCCESS) + mpi_throw(err, "(MPI) MPI_Sendrecv() [item] - "); + } + + /*! + * Exchange data with partner as part of the sorting network of both bubbletonic or bitonic + * sorting algorithms. + * + * This function matches a transmit and a receive in order to fully exchange data between + * current node and partner. + * + * @tparam ValueT The inner value type used in the buffer + * + * @param ldata [const ValueT*] Pointer to local data to send + * @param rdata [ValueT*] Pointer to buffer to receive data from partner + * @param count [size_t] The number of data to exchange + * @param partner [mpi_id_t] The partner for the exchange + * @param tag [int] The tag to use for the MPI communication + */ + template + void exchange(const ValueT* ldata, ValueT* rdata, size_t count, ID_t partner, int tag) { + if (tag < 0) + throw std::runtime_error("(MPI) exchange() [tag] - Out of bound"); + + MPI_Datatype datatype = MPI_TypeMapper::getType(); + MPI_Status status; + int err; + if ((err = MPI_Sendrecv( + ldata, count, datatype, partner, tag, + rdata, count, datatype, partner, tag, + MPI_COMM_WORLD, &status + )) != MPI_SUCCESS) + mpi_throw(err, "(MPI) MPI_Sendrecv() [data] - "); + } /*! 
* Initiate a data exchange data with partner using non-blocking Isend-Irecv, as part of the @@ -353,33 +427,51 @@ struct Timing { using milliseconds = std::chrono::milliseconds; using seconds = std::chrono::seconds; + //! Setup measurement rounds (zeroes every slot and restarts from the first one) + void init(size_t rounds) { + // stop() and next() index duration_, so always keep at least one slot + duration_.assign(rounds ? rounds : 1, Tduration::zero()); + current_ = 0; + } + //! tool to mark the starting point Tpoint start() noexcept { return mark_ = std::chrono::steady_clock::now(); } //! tool to mark the ending point Tpoint stop() noexcept { Tpoint now = std::chrono::steady_clock::now(); - duration_ += dt(now, mark_); + duration_[current_] += dt(now, mark_); return now; } + //! Switch timing slot + void next() noexcept { + ++current_; + current_ %= duration_.size(); + } + + Tduration& median() noexcept { + std::sort(duration_.begin(), duration_.end()); + return duration_[duration_.size()/2]; + } + //! A duration calculation utility static Tduration dt(Tpoint t2, Tpoint t1) noexcept { return std::chrono::duration_cast(t2 - t1); } //! 
Tool to print the time interval - void print_duration(const char *what, mpi_id_t rank) noexcept { - if (std::chrono::duration_cast(duration_).count() < 10000) + static void print_duration(const Tduration& duration, const char *what, mpi_id_t rank) noexcept { + if (std::chrono::duration_cast(duration).count() < 10000) std::cout << "[Timing] (Rank " << rank << ") " << what << ": " - << std::to_string(std::chrono::duration_cast(duration_).count()) << " [usec]\n"; - else if (std::chrono::duration_cast(duration_).count() < 10000) + << std::to_string(std::chrono::duration_cast(duration).count()) << " [usec]\n"; + else if (std::chrono::duration_cast(duration).count() < 10000) std::cout << "[Timing] (Rank " << rank << ") " << what << ": " - << std::to_string(std::chrono::duration_cast(duration_).count()) << " [msec]\n"; + << std::to_string(std::chrono::duration_cast(duration).count()) << " [msec]\n"; else { char stime[26]; // fit ulong - auto sec = std::chrono::duration_cast(duration_).count(); - auto msec = (std::chrono::duration_cast(duration_).count() % 1000) / 10; // keep 2 digit + auto sec = std::chrono::duration_cast(duration).count(); + auto msec = (std::chrono::duration_cast(duration).count() % 1000) / 10; // keep 2 digit - std::sprintf(stime, "%ld.%1ld", sec, msec); + std::sprintf(stime, "%ld.%02ld", sec, msec); // zero-pad the two kept digits: 1 s 50 ms must print 1.05, not 1.5 std::cout << "[Timing] (Rank " << rank << ") " << what << ": " << stime << " [sec]\n"; } @@ -387,8 +479,9 @@ struct Timing { } private: + size_t current_{0}; Tpoint mark_{}; - Tduration duration_{}; + std::vector duration_{1}; }; /*! diff --git a/homework_2/report/homework_2_report.pdf b/homework_2/report/homework_2_report.pdf index fa35cf2..670a638 100644 Binary files a/homework_2/report/homework_2_report.pdf and b/homework_2/report/homework_2_report.pdf differ diff --git a/homework_2/src/distsort.cpp b/homework_2/src/distsort.cpp index 6a614ea..19d8a28 100644 --- a/homework_2/src/distsort.cpp +++ b/homework_2/src/distsort.cpp @@ -10,13 +10,9 @@ #include "distsort.hpp" -//! 
Performance timers for each one of the "costly" functions -Timing TfullSort, Texchange, Tminmax, TelbowSort; - - bool isActive(mpi_id_t node, size_t nodes) { if (!((nodes > 0) && - (nodes <= std::numeric_limits::max()) )) + (nodes <= static_cast(std::numeric_limits::max())) )) throw std::runtime_error("(isActive) Non-acceptable value of MPI Nodes\n"); // ^ Assert that mpi_id_t can hold nodes, and thus we can cast without data loss! @@ -24,12 +20,14 @@ bool isActive(mpi_id_t node, size_t nodes) { } size_t tagGenerator(size_t depth, size_t step, size_t stage) { - auto stage_bits = static_cast(std::log2(MAX_PIPELINE_SIZE)); - auto step_bits = static_cast(std::log2(MAX_MPI_SIZE)); + auto stage_bits = static_cast(std::log2(MAX_PIPELINE_SIZE)); + auto step_bits = static_cast(std::log2(MAX_MPI_SIZE)); + uint32_t stat_bit = 1UL; // ^ We use MPI_SIZE room for steps to fit the bubbletonic version + // [ depth | step | stage+stats ] size_t tag = stage - | (step << stage_bits) - | (depth << (stage_bits + step_bits)); + | (step << (stage_bits + stat_bit)) + | (depth << (stage_bits + step_bits + stat_bit)); return tag; } diff --git a/homework_2/src/main.cpp b/homework_2/src/main.cpp index dafa1e0..1eded9f 100644 --- a/homework_2/src/main.cpp +++ b/homework_2/src/main.cpp @@ -22,7 +22,36 @@ config_t config; MPI_t<> mpi; distBuffer_t Data; Log logger; -Timing Ttotal; +distStat_t localStat, remoteStat; + +//! Performance timers for each one of the "costly" functions +Timing Ttotal; +Timing TfullSort; +Timing Texchange; +Timing Tminmax; +Timing TelbowSort; + +//! Init timing objects for extra rounds +void measurements_init() { + if (config.perf > 1) { + Ttotal.init(config.perf); + TfullSort.init(config.perf); + Texchange.init(config.perf); + Tminmax.init(config.perf); + TelbowSort.init(config.perf); + } +} + +//! 
Iterate to the next round of measurements for all measurement objects +void measurements_next() { + if (config.perf > 1) { + Ttotal.next(); + TfullSort.next(); + Texchange.next(); + Tminmax.next(); + TelbowSort.next(); + } +} /*! * A small command line argument parser @@ -43,6 +72,9 @@ bool get_options(int argc, char* argv[]){ status = false; } } + else if (arg == "-e" || arg == "--exchange-opt") { + config.exchangeOpt = true; + } - else if (arg == "--pipeline") { + else if (arg == "-p" || arg == "--pipeline") { if (i+1 < argc) { auto stages = atoi(argv[++i]); @@ -59,7 +91,12 @@ bool get_options(int argc, char* argv[]){ config.validation = true; } else if (arg == "--perf") { - config.perf = true; + auto rounds = (i+1 < argc) ? atoi(argv[++i]) : 0; + if (rounds > 0) { config.perf = rounds; } + else { + // Missing, non-numeric (atoi() yields 0) or non-positive count: zero rounds would skip the sort + status = false; + } } else if (arg == "--ndebug") { config.ndebug = true; @@ -68,22 +105,25 @@ bool get_options(int argc, char* argv[]){ config.verbose = true; } else if (arg == "-h" || arg == "--help") { - std::cout << "distbitonic/distbubbletonic - A distributed bitonic sort\n\n"; - std::cout << "distbitonic -q [--pipeline N] [--validation] [--ndebug] [-v]\n"; + std::cout << "distbitonic/distbubbletonic - A distributed sort utility\n\n"; + std::cout << "distbitonic -q [-e] [-p | --pipeline N] [--validation] [--perf] [--ndebug] [-v]\n"; std::cout << "distbitonic -h\n"; - std::cout << "distbubbletonic -q [--pipeline N] [--validation] [--ndebug] [-v]\n"; + std::cout << "distbubbletonic -q [-e] [-p | --pipeline N] [--validation] [--perf] [--ndebug] [-v]\n"; std::cout << "distbubbletonic -h\n"; std::cout << '\n'; std::cout << "Options:\n\n"; std::cout << " -q | --array-size \n"; std::cout << " Selects the array size according to size = 2^N\n\n"; - std::cout << " --pipeline \n"; + std::cout << " -e | --exchange-opt\n"; + std::cout << " Request an MPI data exchange optimization \n\n"; + std::cout << " -p | --pipeline \n"; std::cout << " Request a pipeline of stages for exchange-minmax\n"; std::cout << " N must be power of 2 up to " << MAX_PIPELINE_SIZE << 
"\n\n"; std::cout << " --validation\n"; std::cout << " Request a full validation at the end, performed by process rank 0\n\n"; - std::cout << " --perf\n"; - std::cout << " Request performance timing measurements to stdout.\n\n"; + std::cout << " --perf \n"; + std::cout << " Enable performance timing measurements and prints, and repeat\n"; + std::cout << " the sorting times to average the measurements\n\n"; std::cout << " --ndebug\n"; std::cout << " Skip debug breakpoint when on debug build.\n\n"; std::cout << " -v | --verbose\n"; @@ -167,8 +207,8 @@ int main(int argc, char* argv[]) try { " Size: " << mpi.size() << logger.endl; -#if defined DEBUG -#if defined TESTING + #if defined DEBUG + #if defined TESTING /* * In case of a debug build we will wait here until sleep_wait * will reset via debugger. In order to do that the user must attach @@ -179,12 +219,12 @@ int main(int argc, char* argv[]) try { * $> gdb */ volatile bool sleep_wait = false; -#else + #else volatile bool sleep_wait = true; -#endif + #endif while (sleep_wait && !config.ndebug) sleep(1); -#endif + #endif // Initialize local data logger << "Initialize local array of " << config.arraySize << " elements" << logger.endl; @@ -201,24 +241,27 @@ int main(int argc, char* argv[]) try { // Run distributed sort if (mpi.rank() == 0) logger << "Starting distributed sorting ... "; - Ttotal.start(); + measurements_init(); + for (size_t it = 0 ; it < config.perf ; ++it) { + Ttotal.start(); #if CODE_VERSION == BUBBLETONIC - distBubbletonic(Data, mpi.size(), mpi.rank()); + distBubbletonic(Data, mpi.size(), mpi.rank()); #else - distBitonic (Data, mpi.size(), mpi.rank()); + distBitonic(Data, mpi.size(), mpi.rank()); #endif - Ttotal.stop(); + Ttotal.stop(); + measurements_next(); + } if (mpi.rank() == 0) logger << " Done." 
<< logger.endl; - // Print-outs and validation - if (config.perf) { - Ttotal.print_duration("Total ", mpi.rank()); - TfullSort.print_duration("Full-Sort ", mpi.rank()); - Texchange.print_duration("Exchange ", mpi.rank()); - Tminmax.print_duration("Min-Max ", mpi.rank()); - TelbowSort.print_duration("Elbow-Sort", mpi.rank()); + if (config.perf > 1) { // report the per-timer median, only for multi-round runs + Timing::print_duration(Ttotal.median(), "Total ", mpi.rank()); + Timing::print_duration(TfullSort.median(), "Full-Sort ", mpi.rank()); + Timing::print_duration(Texchange.median(), "Exchange ", mpi.rank()); + Timing::print_duration(Tminmax.median(), "Min-Max ", mpi.rank()); + Timing::print_duration(TelbowSort.median(),"Elbow-Sort", mpi.rank()); } if (config.validation) { // If requested, we have the chance to fail!