Line data Source code
1 : // Copyright (c) 2012-2014 The Bitcoin developers 2 : // Distributed under the MIT software license, see the accompanying 3 : // file COPYING or http://www.opensource.org/licenses/mit-license.php. 4 : 5 : #ifndef PIVX_CHECKQUEUE_H 6 : #define PIVX_CHECKQUEUE_H 7 : 8 : #include <algorithm> 9 : #include <vector> 10 : 11 : #include <boost/thread/condition_variable.hpp> 12 : #include <boost/thread/mutex.hpp> 13 : 14 : template <typename T> 15 : class CCheckQueueControl; 16 : 17 : /** 18 : * Queue for verifications that have to be performed. 19 : * The verifications are represented by a type T, which must provide an 20 : * operator(), returning a bool. 21 : * 22 : * One thread (the master) is assumed to push batches of verifications 23 : * onto the queue, where they are processed by N-1 worker threads. When 24 : * the master is done adding work, it temporarily joins the worker pool 25 : * as an N'th worker, until all jobs are done. 26 : */ 27 : template <typename T> 28 : class CCheckQueue 29 : { 30 : private: 31 : //! Mutex to protect the inner state 32 : boost::mutex mutex; 33 : 34 : //! Worker threads block on this when out of work 35 : boost::condition_variable condWorker; 36 : 37 : //! Master thread blocks on this when out of work 38 : boost::condition_variable condMaster; 39 : 40 : //! The queue of elements to be processed. 41 : //! As the order of booleans doesn't matter, it is used as a LIFO (stack) 42 : std::vector<T> queue; 43 : 44 : //! The number of workers (including the master) that are idle. 45 : int nIdle; 46 : 47 : //! The total number of workers (including the master). 48 : int nTotal; 49 : 50 : //! The temporary evaluation result. 51 : bool fAllOk; 52 : 53 : /** 54 : * Number of verifications that haven't completed yet. 55 : * This includes elements that are not anymore in queue, but still in 56 : * worker's own batches. 57 : */ 58 : unsigned int nTodo; 59 : 60 : //! The maximum number of elements to be processed in one batch 61 : unsigned int nBatchSize; 62 : 63 : /** Internal function that does bulk of the verification work. */ 64 59332 : bool Loop(bool fMaster = false) 65 : { 66 59332 : boost::condition_variable& cond = fMaster ? condMaster : condWorker; 67 2888 : std::vector<T> vChecks; 68 59332 : vChecks.reserve(nBatchSize); 69 : unsigned int nNow = 0; 70 : bool fOk = true; 71 : do { 72 : { 73 2888 : boost::unique_lock<boost::mutex> lock(mutex); 74 : // first do the clean-up of the previous loop run (allowing us to do it in the same critsect) 75 664167 : if (nNow) { 76 604835 : fAllOk &= fOk; 77 604835 : nTodo -= nNow; 78 604835 : if (nTodo == 0 && !fMaster) 79 : // We processed the last element; inform the master he can exit and return the result 80 94062 : condMaster.notify_one(); 81 : } else { 82 : // first iteration 83 59332 : nTotal++; 84 : } 85 : // logically, the do loop starts here 86 1593519 : while (queue.empty()) { 87 988685 : if (fMaster && nTodo == 0) { 88 56444 : nTotal--; 89 56444 : bool fRet = fAllOk; 90 : // reset the status for new work later 91 : if (fMaster) 92 56444 : fAllOk = true; 93 : // return the current status 94 112888 : return fRet; 95 : } 96 932241 : nIdle++; 97 932241 : cond.wait(lock); // wait 98 929353 : nIdle--; 99 : } 100 : // Decide how many work units to process now. 101 : // * Do not try to do everything at once, but aim for increasingly smaller batches so 102 : // all workers finish approximately simultaneously. 103 : // * Try to account for idle jobs which will instantly start helping. 104 : // * Don't do batches smaller than 1 (duh), or larger than nBatchSize. 105 1209340 : nNow = std::max(1U, std::min(nBatchSize, (unsigned int)queue.size() / (nTotal + nIdle + 1))); 106 604835 : vChecks.resize(nNow); 107 1404766 : for (unsigned int i = 0; i < nNow; i++) { 108 : // We want the lock on the mutex to be as short as possible, so swap jobs from the global 109 : // queue to the local batch vector instead of copying. 110 799927 : vChecks[i].swap(queue.back()); 111 799927 : queue.pop_back(); 112 : } 113 : // Check whether we need to do work at all 114 604835 : fOk = fAllOk; 115 : } 116 : // execute work 117 1404766 : for (T& check : vChecks) 118 799927 : if (fOk) 119 799927 : fOk = check(); 120 664167 : vChecks.clear(); 121 : } while (true); 122 : } 123 : 124 : public: 125 : //! Create a new check queue 126 480 : explicit CCheckQueue(unsigned int nBatchSizeIn) : nIdle(0), nTotal(0), fAllOk(true), nTodo(0), nBatchSize(nBatchSizeIn) {} 127 : 128 : //! Worker thread 129 2888 : void Thread() 130 : { 131 2888 : Loop(); 132 0 : } 133 : 134 : //! Wait until execution finishes, and return whether all evaluations where successful. 135 56444 : bool Wait() 136 : { 137 56444 : return Loop(true); 138 : } 139 : 140 : //! Add a batch of checks to the queue 141 433390 : void Add(std::vector<T>& vChecks) 142 : { 143 433390 : boost::unique_lock<boost::mutex> lock(mutex); 144 1233317 : for (T& check : vChecks) { 145 799927 : queue.push_back(T()); 146 799927 : check.swap(queue.back()); 147 : } 148 433390 : nTodo += vChecks.size(); 149 433390 : if (vChecks.size() == 1) 150 62720 : condWorker.notify_one(); 151 370670 : else if (vChecks.size() > 1) 152 367997 : condWorker.notify_all(); 153 433390 : } 154 : 155 480 : ~CCheckQueue() 156 : { 157 480 : } 158 : 159 56444 : bool IsIdle() 160 : { 161 56444 : boost::unique_lock<boost::mutex> lock(mutex); 162 56444 : return (nTotal == nIdle && nTodo == 0 && fAllOk == true); 163 : } 164 : }; 165 : 166 : /** 167 : * RAII-style controller object for a CCheckQueue that guarantees the passed 168 : * queue is finished before continuing. 169 : */ 170 : template <typename T> 171 : class CCheckQueueControl 172 : { 173 : private: 174 : CCheckQueue<T>* pqueue; 175 : bool fDone; 176 : 177 : public: 178 56444 : explicit CCheckQueueControl(CCheckQueue<T>* pqueueIn) : pqueue(pqueueIn), fDone(false) 179 : { 180 : // passed queue is supposed to be unused, or nullptr 181 56444 : if (pqueue != nullptr) { 182 56444 : bool isIdle = pqueue->IsIdle(); 183 56444 : assert(isIdle); 184 : } 185 56444 : } 186 : 187 56444 : bool Wait() 188 : { 189 56444 : if (pqueue == nullptr) 190 : return true; 191 56444 : bool fRet = pqueue->Wait(); 192 56411 : fDone = true; 193 33 : return fRet; 194 : } 195 : 196 433390 : void Add(std::vector<T>& vChecks) 197 : { 198 433390 : if (pqueue != nullptr) 199 433390 : pqueue->Add(vChecks); 200 : } 201 : 202 56444 : ~CCheckQueueControl() 203 : { 204 56444 : if (!fDone) 205 56444 : Wait(); 206 56444 : } 207 : }; 208 : 209 : #endif // PIVX_CHECKQUEUE_H