Line data Source code
1 : // Copyright (c) 2015 The Bitcoin Core developers 2 : // Copyright (c) 2017-2021 The PIVX Core developers 3 : // Distributed under the MIT software license, see the accompanying 4 : // file COPYING or http://www.opensource.org/licenses/mit-license.php. 5 : 6 : #include "scheduler.h" 7 : 8 : #include "random.h" 9 : 10 : #include <assert.h> 11 : #include <utility> 12 : 13 511 : CScheduler::CScheduler() : nThreadsServicingQueue(0), stopRequested(false), stopWhenEmpty(false) 14 : { 15 511 : } 16 : 17 511 : CScheduler::~CScheduler() 18 : { 19 511 : assert(nThreadsServicingQueue == 0); 20 511 : } 21 : 22 509 : void CScheduler::serviceQueue() 23 : { 24 509 : WAIT_LOCK(newTaskMutex, lock); 25 509 : ++nThreadsServicingQueue; 26 : 27 : // newTaskMutex is locked throughout this loop EXCEPT 28 : // when the thread is waiting or when the user's function 29 : // is called. 30 616774 : while (!shouldStop()) { 31 616265 : try { 32 616292 : if (!shouldStop() && taskQueue.empty()) { 33 2959 : REVERSE_LOCK(lock); 34 : } 35 1238330 : while (!shouldStop() && taskQueue.empty()) { 36 : // Wait until there is something to do. 37 2957 : newTaskScheduled.wait(lock); 38 : } 39 : 40 : // Wait until either there is a new task, or until 41 : // the time of the first item on the queue: 42 : 43 1566276 : while (!shouldStop() && !taskQueue.empty()) { 44 782895 : std::chrono::system_clock::time_point timeToWaitFor = taskQueue.begin()->first; 45 782895 : if (newTaskScheduled.wait_until(lock, timeToWaitFor) == std::cv_status::timeout) { 46 : break; // Exit loop after timeout, it means we reached the time of the event 47 : } 48 : } 49 : 50 : // If there are multiple threads, the queue can empty while we're waiting (another 51 : // thread may service the task we were waiting on). 52 1232036 : if (shouldStop() || taskQueue.empty()) 53 503 : continue; 54 : 55 1231522 : Function f = taskQueue.begin()->second; 56 615762 : taskQueue.erase(taskQueue.begin()); 57 : 58 615762 : { 59 : // Unlock before calling f, so it can reschedule itself or another task 60 : // without deadlocking: 61 1231522 : REVERSE_LOCK(lock); 62 615762 : f(); 63 : } 64 0 : } catch (...) { 65 0 : --nThreadsServicingQueue; 66 0 : throw; 67 : } 68 : } 69 509 : --nThreadsServicingQueue; 70 509 : newTaskScheduled.notify_one(); 71 509 : } 72 : 73 498 : void CScheduler::stop(bool drain) 74 : { 75 498 : { 76 498 : LOCK(newTaskMutex); 77 498 : if (drain) 78 2 : stopWhenEmpty = true; 79 : else 80 496 : stopRequested = true; 81 : } 82 498 : newTaskScheduled.notify_all(); 83 498 : } 84 : 85 617932 : void CScheduler::schedule(CScheduler::Function f, std::chrono::system_clock::time_point t) 86 : { 87 617932 : { 88 617932 : LOCK(newTaskMutex); 89 617932 : taskQueue.emplace(t, f); 90 : } 91 617932 : newTaskScheduled.notify_one(); 92 617932 : } 93 : 94 86558 : void CScheduler::scheduleFromNow(CScheduler::Function f, int64_t deltaMilliSeconds) 95 : { 96 86558 : schedule(f, std::chrono::system_clock::now() + std::chrono::milliseconds(deltaMilliSeconds)); 97 86558 : } 98 : 99 42228 : static void Repeat(CScheduler* s, CScheduler::Function f, int64_t deltaMilliSeconds) 100 : { 101 42228 : f(); 102 84456 : s->scheduleFromNow(std::bind(&Repeat, s, f, deltaMilliSeconds), deltaMilliSeconds); 103 42228 : } 104 : 105 2131 : void CScheduler::scheduleEvery(CScheduler::Function f, int64_t deltaMilliSeconds) 106 : { 107 4262 : scheduleFromNow(std::bind(&Repeat, this, f, deltaMilliSeconds), deltaMilliSeconds); 108 2131 : } 109 : 110 2 : size_t CScheduler::getQueueInfo(std::chrono::system_clock::time_point &first, 111 : std::chrono::system_clock::time_point &last) const 112 : { 113 2 : LOCK(newTaskMutex); 114 2 : size_t result = taskQueue.size(); 115 2 : if (!taskQueue.empty()) { 116 1 : first = taskQueue.begin()->first; 117 1 : last = taskQueue.rbegin()->first; 118 : } 119 2 : return result; 120 : } 121 : 122 494 : bool CScheduler::AreThreadsServicingQueue() const { 123 494 : LOCK(newTaskMutex); 124 494 : return nThreadsServicingQueue; 125 : } 126 : 127 814382 : void SingleThreadedSchedulerClient::MaybeScheduleProcessQueue() { 128 814382 : { 129 814382 : LOCK(m_cs_callbacks_pending); 130 : // Try to avoid scheduling too many copies here, but if we 131 : // accidentally have two ProcessQueue's scheduled at once its 132 : // not a big deal. 133 1097792 : if (m_are_callbacks_running) return; 134 699486 : if (m_callbacks_pending.empty()) return; 135 : } 136 1061944 : m_pscheduler->schedule(std::bind(&SingleThreadedSchedulerClient::ProcessQueue, this), std::chrono::system_clock::now()); 137 : } 138 : 139 531451 : void SingleThreadedSchedulerClient::ProcessQueue() { 140 938642 : std::function<void (void)> callback; 141 531451 : { 142 531451 : LOCK(m_cs_callbacks_pending); 143 655711 : if (m_are_callbacks_running) return; 144 531438 : if (m_callbacks_pending.empty()) return; 145 407191 : m_are_callbacks_running = true; 146 : 147 407191 : callback = std::move(m_callbacks_pending.front()); 148 407191 : m_callbacks_pending.pop_front(); 149 : } 150 : 151 : // RAII the setting of fCallbacksRunning and calling MaybeScheduleProcessQueue 152 : // to ensure both happen safely even if callback() throws. 153 407191 : struct RAIICallbacksRunning { 154 : SingleThreadedSchedulerClient* instance; 155 407191 : explicit RAIICallbacksRunning(SingleThreadedSchedulerClient* _instance) : instance(_instance) {} 156 814382 : ~RAIICallbacksRunning() { 157 407191 : { 158 407191 : LOCK(instance->m_cs_callbacks_pending); 159 407191 : instance->m_are_callbacks_running = false; 160 : } 161 407191 : instance->MaybeScheduleProcessQueue(); 162 407191 : } 163 407191 : } raiicallbacksrunning(this); 164 : 165 407191 : callback(); 166 : } 167 : 168 407191 : void SingleThreadedSchedulerClient::AddToProcessQueue(std::function<void (void)> func) { 169 407191 : assert(m_pscheduler); 170 : 171 407191 : { 172 407191 : LOCK(m_cs_callbacks_pending); 173 814382 : m_callbacks_pending.emplace_back(std::move(func)); 174 : } 175 407191 : MaybeScheduleProcessQueue(); 176 407191 : } 177 : 178 494 : void SingleThreadedSchedulerClient::EmptyQueue() { 179 494 : assert(!m_pscheduler->AreThreadsServicingQueue()); 180 : bool should_continue = true; 181 1003 : while (should_continue) { 182 509 : ProcessQueue(); 183 1018 : LOCK(m_cs_callbacks_pending); 184 509 : should_continue = !m_callbacks_pending.empty(); 185 : } 186 494 : } 187 : 188 45528 : size_t SingleThreadedSchedulerClient::CallbacksPending() { 189 45528 : LOCK(m_cs_callbacks_pending); 190 45528 : return m_callbacks_pending.size(); 191 : }