LCOV - code coverage report
Current view: top level - src - checkqueue.h (source / functions) Hit Total Coverage
Test: total_coverage.info Lines: 68 69 98.6 %
Date: 2025-02-23 09:33:43 Functions: 7 13 53.8 %

          Line data    Source code
       1             : // Copyright (c) 2012-2014 The Bitcoin developers
       2             : // Distributed under the MIT software license, see the accompanying
       3             : // file COPYING or http://www.opensource.org/licenses/mit-license.php.
       4             : 
       5             : #ifndef PIVX_CHECKQUEUE_H
       6             : #define PIVX_CHECKQUEUE_H
       7             : 
       8             : #include <algorithm>
       9             : #include <vector>
      10             : 
      11             : #include <boost/thread/condition_variable.hpp>
      12             : #include <boost/thread/mutex.hpp>
      13             : 
      14             : template <typename T>
      15             : class CCheckQueueControl;
      16             : 
      17             : /**
      18             :  * Queue for verifications that have to be performed.
      19             :   * The verifications are represented by a type T, which must provide an
      20             :   * operator(), returning a bool.
      21             :   *
      22             :   * One thread (the master) is assumed to push batches of verifications
      23             :   * onto the queue, where they are processed by N-1 worker threads. When
      24             :   * the master is done adding work, it temporarily joins the worker pool
      25             :   * as an N'th worker, until all jobs are done.
      26             :   */
      27             : template <typename T>
      28             : class CCheckQueue
      29             : {
      30             : private:
      31             :     //! Mutex to protect the inner state
      32             :     boost::mutex mutex;
      33             : 
      34             :     //! Worker threads block on this when out of work
      35             :     boost::condition_variable condWorker;
      36             : 
      37             :     //! Master thread blocks on this when out of work
      38             :     boost::condition_variable condMaster;
      39             : 
      40             :     //! The queue of elements to be processed.
      41             :     //! As the order of booleans doesn't matter, it is used as a LIFO (stack)
      42             :     std::vector<T> queue;
      43             : 
      44             :     //! The number of workers (including the master) that are idle.
      45             :     int nIdle;
      46             : 
      47             :     //! The total number of workers (including the master).
      48             :     int nTotal;
      49             : 
      50             :     //! The temporary evaluation result.
      51             :     bool fAllOk;
      52             : 
      53             :     /**
      54             :      * Number of verifications that haven't completed yet.
      55             :      * This includes elements that are not anymore in queue, but still in
      56             :      * worker's own batches.
      57             :      */
      58             :     unsigned int nTodo;
      59             : 
      60             :     //! The maximum number of elements to be processed in one batch
      61             :     unsigned int nBatchSize;
      62             : 
      63             :     /** Internal function that does bulk of the verification work. */
      64       59332 :     bool Loop(bool fMaster = false)
      65             :     {
      66       59332 :         boost::condition_variable& cond = fMaster ? condMaster : condWorker;
      67        2888 :         std::vector<T> vChecks;
      68       59332 :         vChecks.reserve(nBatchSize);
      69             :         unsigned int nNow = 0;
      70             :         bool fOk = true;
      71             :         do {
      72             :             {
      73        2888 :                 boost::unique_lock<boost::mutex> lock(mutex);
      74             :                 // first do the clean-up of the previous loop run (allowing us to do it in the same critsect)
      75      664167 :                 if (nNow) {
      76      604835 :                     fAllOk &= fOk;
      77      604835 :                     nTodo -= nNow;
      78      604835 :                     if (nTodo == 0 && !fMaster)
      79             :                         // We processed the last element; inform the master he can exit and return the result
      80       94062 :                         condMaster.notify_one();
      81             :                 } else {
      82             :                     // first iteration
      83       59332 :                     nTotal++;
      84             :                 }
      85             :                 // logically, the do loop starts here
      86     1593519 :                 while (queue.empty()) {
      87      988685 :                     if (fMaster && nTodo == 0) {
      88       56444 :                         nTotal--;
      89       56444 :                         bool fRet = fAllOk;
      90             :                         // reset the status for new work later
      91             :                         if (fMaster)
      92       56444 :                             fAllOk = true;
      93             :                         // return the current status
      94      112888 :                         return fRet;
      95             :                     }
      96      932241 :                     nIdle++;
      97      932241 :                     cond.wait(lock); // wait
      98      929353 :                     nIdle--;
      99             :                 }
     100             :                 // Decide how many work units to process now.
     101             :                 // * Do not try to do everything at once, but aim for increasingly smaller batches so
     102             :                 //   all workers finish approximately simultaneously.
     103             :                 // * Try to account for idle jobs which will instantly start helping.
     104             :                 // * Don't do batches smaller than 1 (duh), or larger than nBatchSize.
     105     1209340 :                 nNow = std::max(1U, std::min(nBatchSize, (unsigned int)queue.size() / (nTotal + nIdle + 1)));
     106      604835 :                 vChecks.resize(nNow);
     107     1404766 :                 for (unsigned int i = 0; i < nNow; i++) {
     108             :                     // We want the lock on the mutex to be as short as possible, so swap jobs from the global
     109             :                     // queue to the local batch vector instead of copying.
     110      799927 :                     vChecks[i].swap(queue.back());
     111      799927 :                     queue.pop_back();
     112             :                 }
     113             :                 // Check whether we need to do work at all
     114      604835 :                 fOk = fAllOk;
     115             :             }
     116             :             // execute work
     117     1404766 :             for (T& check : vChecks)
     118      799927 :                 if (fOk)
     119      799927 :                     fOk = check();
     120      664167 :             vChecks.clear();
     121             :         } while (true);
     122             :     }
     123             : 
     124             : public:
     125             :     //! Create a new check queue
     126         480 :     explicit CCheckQueue(unsigned int nBatchSizeIn) : nIdle(0), nTotal(0), fAllOk(true), nTodo(0), nBatchSize(nBatchSizeIn) {}
     127             : 
     128             :     //! Worker thread
     129        2888 :     void Thread()
     130             :     {
     131        2888 :         Loop();
     132           0 :     }
     133             : 
     134             :     //! Wait until execution finishes, and return whether all evaluations where successful.
     135       56444 :     bool Wait()
     136             :     {
     137       56444 :         return Loop(true);
     138             :     }
     139             : 
     140             :     //! Add a batch of checks to the queue
     141      433390 :     void Add(std::vector<T>& vChecks)
     142             :     {
     143      433390 :         boost::unique_lock<boost::mutex> lock(mutex);
     144     1233317 :         for (T& check : vChecks) {
     145      799927 :             queue.push_back(T());
     146      799927 :             check.swap(queue.back());
     147             :         }
     148      433390 :         nTodo += vChecks.size();
     149      433390 :         if (vChecks.size() == 1)
     150       62720 :             condWorker.notify_one();
     151      370670 :         else if (vChecks.size() > 1)
     152      367997 :             condWorker.notify_all();
     153      433390 :     }
     154             : 
     155         480 :     ~CCheckQueue()
     156             :     {
     157         480 :     }
     158             : 
     159       56444 :     bool IsIdle()
     160             :     {
     161       56444 :         boost::unique_lock<boost::mutex> lock(mutex);
     162       56444 :         return (nTotal == nIdle && nTodo == 0 && fAllOk == true);
     163             :     }
     164             : };
     165             : 
     166             : /**
     167             :  * RAII-style controller object for a CCheckQueue that guarantees the passed
     168             :  * queue is finished before continuing.
     169             :  */
     170             : template <typename T>
     171             : class CCheckQueueControl
     172             : {
     173             : private:
     174             :     CCheckQueue<T>* pqueue;
     175             :     bool fDone;
     176             : 
     177             : public:
     178       56444 :     explicit CCheckQueueControl(CCheckQueue<T>* pqueueIn) : pqueue(pqueueIn), fDone(false)
     179             :     {
     180             :         // passed queue is supposed to be unused, or nullptr
     181       56444 :         if (pqueue != nullptr) {
     182       56444 :             bool isIdle = pqueue->IsIdle();
     183       56444 :             assert(isIdle);
     184             :         }
     185       56444 :     }
     186             : 
     187       56444 :     bool Wait()
     188             :     {
     189       56444 :         if (pqueue == nullptr)
     190             :             return true;
     191       56444 :         bool fRet = pqueue->Wait();
     192       56411 :         fDone = true;
     193          33 :         return fRet;
     194             :     }
     195             : 
     196      433390 :     void Add(std::vector<T>& vChecks)
     197             :     {
     198      433390 :         if (pqueue != nullptr)
     199      433390 :             pqueue->Add(vChecks);
     200             :     }
     201             : 
     202       56444 :     ~CCheckQueueControl()
     203             :     {
     204       56444 :         if (!fDone)
     205       56444 :             Wait();
     206       56444 :     }
     207             : };
     208             : 
     209             : #endif // PIVX_CHECKQUEUE_H

Generated by: LCOV version 1.14