LCOV - code coverage report
Current view: top level - src - scheduler.cpp (source / functions) Hit Total Coverage
Test: total_coverage.info Lines: 103 106 97.2 %
Date: 2025-02-23 09:33:43 Functions: 16 16 100.0 %

          Line data    Source code
       1             : // Copyright (c) 2015 The Bitcoin Core developers
       2             : // Copyright (c) 2017-2021 The PIVX Core developers
       3             : // Distributed under the MIT software license, see the accompanying
       4             : // file COPYING or http://www.opensource.org/licenses/mit-license.php.
       5             : 
       6             : #include "scheduler.h"
       7             : 
       8             : #include "random.h"
       9             : 
      10             : #include <assert.h>
      11             : #include <utility>
      12             : 
      13         511 : CScheduler::CScheduler() : nThreadsServicingQueue(0), stopRequested(false), stopWhenEmpty(false)
      14             : {
      15         511 : }
      16             : 
      17         511 : CScheduler::~CScheduler()
      18             : {
      19         511 :     assert(nThreadsServicingQueue == 0);
      20         511 : }
      21             : 
      22         509 : void CScheduler::serviceQueue()
      23             : {
      24         509 :     WAIT_LOCK(newTaskMutex, lock);
      25         509 :     ++nThreadsServicingQueue;
      26             : 
      27             :     // newTaskMutex is locked throughout this loop EXCEPT
      28             :     // when the thread is waiting or when the user's function
      29             :     // is called.
      30      616774 :     while (!shouldStop()) {
      31      616265 :         try {
      32      616292 :             if (!shouldStop() && taskQueue.empty()) {
      33        2959 :                 REVERSE_LOCK(lock);
      34             :             }
      35     1238330 :             while (!shouldStop() && taskQueue.empty()) {
      36             :                 // Wait until there is something to do.
      37        2957 :                 newTaskScheduled.wait(lock);
      38             :             }
      39             : 
      40             :             // Wait until either there is a new task, or until
      41             :             // the time of the first item on the queue:
      42             : 
      43     1566276 :             while (!shouldStop() && !taskQueue.empty()) {
      44      782895 :                 std::chrono::system_clock::time_point timeToWaitFor = taskQueue.begin()->first;
      45      782895 :                 if (newTaskScheduled.wait_until(lock, timeToWaitFor) == std::cv_status::timeout) {
      46             :                     break; // Exit loop after timeout, it means we reached the time of the event
      47             :                 }
      48             :             }
      49             : 
      50             :             // If there are multiple threads, the queue can empty while we're waiting (another
      51             :             // thread may service the task we were waiting on).
      52     1232036 :             if (shouldStop() || taskQueue.empty())
      53         503 :                 continue;
      54             : 
      55     1231522 :             Function f = taskQueue.begin()->second;
      56      615762 :             taskQueue.erase(taskQueue.begin());
      57             : 
      58      615762 :             {
      59             :                 // Unlock before calling f, so it can reschedule itself or another task
      60             :                 // without deadlocking:
      61     1231522 :                 REVERSE_LOCK(lock);
      62      615762 :                 f();
      63             :             }
      64           0 :         } catch (...) {
      65           0 :             --nThreadsServicingQueue;
      66           0 :             throw;
      67             :         }
      68             :     }
      69         509 :     --nThreadsServicingQueue;
      70         509 :     newTaskScheduled.notify_one();
      71         509 : }
      72             : 
      73         498 : void CScheduler::stop(bool drain)
      74             : {
      75         498 :     {
      76         498 :         LOCK(newTaskMutex);
      77         498 :         if (drain)
      78           2 :             stopWhenEmpty = true;
      79             :         else
      80         496 :             stopRequested = true;
      81             :     }
      82         498 :     newTaskScheduled.notify_all();
      83         498 : }
      84             : 
      85      617932 : void CScheduler::schedule(CScheduler::Function f, std::chrono::system_clock::time_point t)
      86             : {
      87      617932 :     {
      88      617932 :         LOCK(newTaskMutex);
      89      617932 :         taskQueue.emplace(t, f);
      90             :     }
      91      617932 :     newTaskScheduled.notify_one();
      92      617932 : }
      93             : 
      94       86558 : void CScheduler::scheduleFromNow(CScheduler::Function f, int64_t deltaMilliSeconds)
      95             : {
      96       86558 :     schedule(f, std::chrono::system_clock::now() + std::chrono::milliseconds(deltaMilliSeconds));
      97       86558 : }
      98             : 
      99       42228 : static void Repeat(CScheduler* s, CScheduler::Function f, int64_t deltaMilliSeconds)
     100             : {
     101       42228 :     f();
     102       84456 :     s->scheduleFromNow(std::bind(&Repeat, s, f, deltaMilliSeconds), deltaMilliSeconds);
     103       42228 : }
     104             : 
     105        2131 : void CScheduler::scheduleEvery(CScheduler::Function f, int64_t deltaMilliSeconds)
     106             : {
     107        4262 :     scheduleFromNow(std::bind(&Repeat, this, f, deltaMilliSeconds), deltaMilliSeconds);
     108        2131 : }
     109             : 
     110           2 : size_t CScheduler::getQueueInfo(std::chrono::system_clock::time_point &first,
     111             :                              std::chrono::system_clock::time_point &last) const
     112             : {
     113           2 :     LOCK(newTaskMutex);
     114           2 :     size_t result = taskQueue.size();
     115           2 :     if (!taskQueue.empty()) {
     116           1 :         first = taskQueue.begin()->first;
     117           1 :         last = taskQueue.rbegin()->first;
     118             :     }
     119           2 :     return result;
     120             : }
     121             : 
     122         494 : bool CScheduler::AreThreadsServicingQueue() const {
     123         494 :     LOCK(newTaskMutex);
     124         494 :     return nThreadsServicingQueue;
     125             : }
     126             : 
     127      814382 : void SingleThreadedSchedulerClient::MaybeScheduleProcessQueue() {
     128      814382 :     {
     129      814382 :         LOCK(m_cs_callbacks_pending);
     130             :         // Try to avoid scheduling too many copies here, but if we
     131             :         // accidentally have two ProcessQueue's scheduled at once it's
     132             :         // not a big deal.
     133     1097792 :         if (m_are_callbacks_running) return;
     134      699486 :         if (m_callbacks_pending.empty()) return;
     135             :     }
     136     1061944 :     m_pscheduler->schedule(std::bind(&SingleThreadedSchedulerClient::ProcessQueue, this), std::chrono::system_clock::now());
     137             : }
     138             : 
     139      531451 : void SingleThreadedSchedulerClient::ProcessQueue() {
     140      938642 :     std::function<void (void)> callback;
     141      531451 :     {
     142      531451 :         LOCK(m_cs_callbacks_pending);
     143      655711 :         if (m_are_callbacks_running) return;
     144      531438 :         if (m_callbacks_pending.empty()) return;
     145      407191 :         m_are_callbacks_running = true;
     146             : 
     147      407191 :         callback = std::move(m_callbacks_pending.front());
     148      407191 :         m_callbacks_pending.pop_front();
     149             :     }
     150             : 
     151             :     // RAII the clearing of m_are_callbacks_running and the call to MaybeScheduleProcessQueue
     152             :     // to ensure both happen safely even if callback() throws.
     153      407191 :     struct RAIICallbacksRunning {
     154             :         SingleThreadedSchedulerClient* instance;
     155      407191 :         explicit RAIICallbacksRunning(SingleThreadedSchedulerClient* _instance) : instance(_instance) {}
     156      814382 :         ~RAIICallbacksRunning() {
     157      407191 :             {
     158      407191 :                 LOCK(instance->m_cs_callbacks_pending);
     159      407191 :                 instance->m_are_callbacks_running = false;
     160             :             }
     161      407191 :             instance->MaybeScheduleProcessQueue();
     162      407191 :         }
     163      407191 :     } raiicallbacksrunning(this);
     164             : 
     165      407191 :     callback();
     166             : }
     167             : 
     168      407191 : void SingleThreadedSchedulerClient::AddToProcessQueue(std::function<void (void)> func) {
     169      407191 :     assert(m_pscheduler);
     170             : 
     171      407191 :     {
     172      407191 :         LOCK(m_cs_callbacks_pending);
     173      814382 :         m_callbacks_pending.emplace_back(std::move(func));
     174             :     }
     175      407191 :     MaybeScheduleProcessQueue();
     176      407191 : }
     177             : 
     178         494 : void SingleThreadedSchedulerClient::EmptyQueue() {
     179         494 :     assert(!m_pscheduler->AreThreadsServicingQueue());
     180             :     bool should_continue = true;
     181        1003 :     while (should_continue) {
     182         509 :         ProcessQueue();
     183        1018 :         LOCK(m_cs_callbacks_pending);
     184         509 :         should_continue = !m_callbacks_pending.empty();
     185             :     }
     186         494 : }
     187             : 
     188       45528 : size_t SingleThreadedSchedulerClient::CallbacksPending() {
     189       45528 :     LOCK(m_cs_callbacks_pending);
     190       45528 :     return m_callbacks_pending.size();
     191             : }

Generated by: LCOV version 1.14