Line data Source code
1 : // Copyright (c) 2015 The Bitcoin Core developers 2 : // Copyright (c) 2017-2021 The PIVX Core developers 3 : // Distributed under the MIT software license, see the accompanying 4 : // file COPYING or http://www.opensource.org/licenses/mit-license.php. 5 : 6 : #include "scheduler.h" 7 : 8 : #include "random.h" 9 : 10 : #include <assert.h> 11 : #include <utility> 12 : 13 511 : CScheduler::CScheduler() : nThreadsServicingQueue(0), stopRequested(false), stopWhenEmpty(false) 14 : { 15 511 : } 16 : 17 511 : CScheduler::~CScheduler() 18 : { 19 511 : assert(nThreadsServicingQueue == 0); 20 511 : } 21 : 22 509 : void CScheduler::serviceQueue() 23 : { 24 509 : WAIT_LOCK(newTaskMutex, lock); 25 509 : ++nThreadsServicingQueue; 26 : 27 : // newTaskMutex is locked throughout this loop EXCEPT 28 : // when the thread is waiting or when the user's function 29 : // is called. 30 598034 : while (!shouldStop()) { 31 597525 : try { 32 597528 : if (!shouldStop() && taskQueue.empty()) { 33 3026 : REVERSE_LOCK(lock); 34 : } 35 1200958 : while (!shouldStop() && taskQueue.empty()) { 36 : // Wait until there is something to do. 37 3009 : newTaskScheduled.wait(lock); 38 : } 39 : 40 : // Wait until either there is a new task, or until 41 : // the time of the first item on the queue: 42 : 43 1521240 : while (!shouldStop() && !taskQueue.empty()) { 44 760379 : std::chrono::system_clock::time_point timeToWaitFor = taskQueue.begin()->first; 45 760379 : if (newTaskScheduled.wait_until(lock, timeToWaitFor) == std::cv_status::timeout) { 46 : break; // Exit loop after timeout, it means we reached the time of the event 47 : } 48 : } 49 : 50 : // If there are multiple threads, the queue can empty while we're waiting (another 51 : // thread may service the task we were waiting on). 52 1194549 : if (shouldStop() || taskQueue.empty()) 53 572 : continue; 54 : 55 1193908 : Function f = taskQueue.begin()->second; 56 596953 : taskQueue.erase(taskQueue.begin()); 57 : 58 596953 : { 59 : // Unlock before calling f, so it can reschedule itself or another task 60 : // without deadlocking: 61 1193908 : REVERSE_LOCK(lock); 62 596953 : f(); 63 : } 64 0 : } catch (...) { 65 0 : --nThreadsServicingQueue; 66 0 : throw; 67 : } 68 : } 69 509 : --nThreadsServicingQueue; 70 509 : newTaskScheduled.notify_one(); 71 509 : } 72 : 73 498 : void CScheduler::stop(bool drain) 74 : { 75 498 : { 76 498 : LOCK(newTaskMutex); 77 498 : if (drain) 78 2 : stopWhenEmpty = true; 79 : else 80 496 : stopRequested = true; 81 : } 82 498 : newTaskScheduled.notify_all(); 83 498 : } 84 : 85 599145 : void CScheduler::schedule(CScheduler::Function f, std::chrono::system_clock::time_point t) 86 : { 87 599145 : { 88 599145 : LOCK(newTaskMutex); 89 599145 : taskQueue.emplace(t, f); 90 : } 91 599145 : newTaskScheduled.notify_one(); 92 599145 : } 93 : 94 81737 : void CScheduler::scheduleFromNow(CScheduler::Function f, int64_t deltaMilliSeconds) 95 : { 96 81737 : schedule(f, std::chrono::system_clock::now() + std::chrono::milliseconds(deltaMilliSeconds)); 97 81737 : } 98 : 99 37768 : static void Repeat(CScheduler* s, CScheduler::Function f, int64_t deltaMilliSeconds) 100 : { 101 37768 : f(); 102 75536 : s->scheduleFromNow(std::bind(&Repeat, s, f, deltaMilliSeconds), deltaMilliSeconds); 103 37768 : } 104 : 105 2131 : void CScheduler::scheduleEvery(CScheduler::Function f, int64_t deltaMilliSeconds) 106 : { 107 4262 : scheduleFromNow(std::bind(&Repeat, this, f, deltaMilliSeconds), deltaMilliSeconds); 108 2131 : } 109 : 110 2 : size_t CScheduler::getQueueInfo(std::chrono::system_clock::time_point &first, 111 : std::chrono::system_clock::time_point &last) const 112 : { 113 2 : LOCK(newTaskMutex); 114 2 : size_t result = taskQueue.size(); 115 2 : if (!taskQueue.empty()) { 116 1 : first = taskQueue.begin()->first; 117 1 : last = taskQueue.rbegin()->first; 118 : } 119 2 : return result; 120 : } 121 : 122 494 : bool CScheduler::AreThreadsServicingQueue() const { 123 494 : LOCK(newTaskMutex); 124 494 : return nThreadsServicingQueue; 125 : } 126 : 127 764150 : void SingleThreadedSchedulerClient::MaybeScheduleProcessQueue() { 128 764150 : { 129 764150 : LOCK(m_cs_callbacks_pending); 130 : // Try to avoid scheduling too many copies here, but if we 131 : // accidentally have two ProcessQueue's scheduled at once its 132 : // not a big deal. 133 1011292 : if (m_are_callbacks_running) return; 134 681474 : if (m_callbacks_pending.empty()) return; 135 : } 136 1034020 : m_pscheduler->schedule(std::bind(&SingleThreadedSchedulerClient::ProcessQueue, this), std::chrono::system_clock::now()); 137 : } 138 : 139 517472 : void SingleThreadedSchedulerClient::ProcessQueue() { 140 899547 : std::function<void (void)> callback; 141 517472 : { 142 517472 : LOCK(m_cs_callbacks_pending); 143 652869 : if (m_are_callbacks_running) return; 144 517460 : if (m_callbacks_pending.empty()) return; 145 382075 : m_are_callbacks_running = true; 146 : 147 382075 : callback = std::move(m_callbacks_pending.front()); 148 382075 : m_callbacks_pending.pop_front(); 149 : } 150 : 151 : // RAII the setting of fCallbacksRunning and calling MaybeScheduleProcessQueue 152 : // to ensure both happen safely even if callback() throws. 153 382075 : struct RAIICallbacksRunning { 154 : SingleThreadedSchedulerClient* instance; 155 382075 : explicit RAIICallbacksRunning(SingleThreadedSchedulerClient* _instance) : instance(_instance) {} 156 764150 : ~RAIICallbacksRunning() { 157 382075 : { 158 382075 : LOCK(instance->m_cs_callbacks_pending); 159 382075 : instance->m_are_callbacks_running = false; 160 : } 161 382075 : instance->MaybeScheduleProcessQueue(); 162 382075 : } 163 382075 : } raiicallbacksrunning(this); 164 : 165 382075 : callback(); 166 : } 167 : 168 382075 : void SingleThreadedSchedulerClient::AddToProcessQueue(std::function<void (void)> func) { 169 382075 : assert(m_pscheduler); 170 : 171 382075 : { 172 382075 : LOCK(m_cs_callbacks_pending); 173 764150 : m_callbacks_pending.emplace_back(std::move(func)); 174 : } 175 382075 : MaybeScheduleProcessQueue(); 176 382075 : } 177 : 178 494 : void SingleThreadedSchedulerClient::EmptyQueue() { 179 494 : assert(!m_pscheduler->AreThreadsServicingQueue()); 180 : bool should_continue = true; 181 1006 : while (should_continue) { 182 512 : ProcessQueue(); 183 1024 : LOCK(m_cs_callbacks_pending); 184 512 : should_continue = !m_callbacks_pending.empty(); 185 : } 186 494 : } 187 : 188 45147 : size_t SingleThreadedSchedulerClient::CallbacksPending() { 189 45147 : LOCK(m_cs_callbacks_pending); 190 45147 : return m_callbacks_pending.size(); 191 : }