Line data Source code
1 : // Copyright (c) 2015 The Bitcoin Core developers 2 : // Copyright (c) 2017-2021 The PIVX Core developers 3 : // Distributed under the MIT software license, see the accompanying 4 : // file COPYING or http://www.opensource.org/licenses/mit-license.php. 5 : 6 : #ifndef PIVX_SCHEDULER_H 7 : #define PIVX_SCHEDULER_H 8 : 9 : // 10 : // NOTE: 11 : // boost::thread should be ported to std::thread 12 : // when we support C++11. 13 : // 14 : #include <condition_variable> 15 : #include <functional> 16 : #include <list> 17 : #include <map> 18 : 19 : #include "sync.h" 20 : 21 : // 22 : // Simple class for background tasks that should be run 23 : // periodically or once "after a while" 24 : // 25 : // Usage: 26 : // 27 : // CScheduler* s = new CScheduler(); 28 : // s->scheduleFromNow(doSomething, 11); // Assuming a: void doSomething() { } 29 : // s->scheduleFromNow(std::bind(Class::func, this, argument), 3); 30 : // boost::thread* t = new boost::thread(std::bind(CScheduler::serviceQueue, s)); 31 : // 32 : // ... then at program shutdown, make sure to call stop() to clean up the thread(s) running serviceQueue: 33 : // s->stop(); 34 : // t->join(); 35 : // delete t; 36 : // delete s; // Must be done after thread is interrupted/joined. 37 : // 38 : 39 : class CScheduler 40 : { 41 : public: 42 : CScheduler(); 43 : ~CScheduler(); 44 : 45 : typedef std::function<void(void)> Function; 46 : 47 : // Call func at/after time t 48 : void schedule(Function f, std::chrono::system_clock::time_point t); 49 : 50 : // Convenience method: call f once deltaMilliSeconds from now 51 : void scheduleFromNow(Function f, int64_t deltaMilliSeconds); 52 : 53 : // Another convenience method: call f approximately 54 : // every deltaMilliSeconds forever, starting deltaMilliSeconds from now. 55 : // To be more precise: every time f is finished, it 56 : // is rescheduled to run deltaMilliSeconds later. If you 57 : // need more accurate scheduling, don't use this method. 58 : void scheduleEvery(Function f, int64_t deltaMilliSeconds); 59 : 60 : // To keep things as simple as possible, there is no unschedule. 61 : 62 : // Services the queue 'forever'. Should be run in a thread, 63 : // and interrupted using boost::interrupt_thread 64 : void serviceQueue(); 65 : 66 : // Tell any threads running serviceQueue to stop as soon as they're 67 : // done servicing whatever task they're currently servicing (drain=false) 68 : // or when there is no work left to be done (drain=true) 69 : void stop(bool drain=false); 70 : 71 : // Returns number of tasks waiting to be serviced, 72 : // and first and last task times 73 : size_t getQueueInfo(std::chrono::system_clock::time_point &first, 74 : std::chrono::system_clock::time_point &last) const; 75 : 76 : // Returns true if there are threads actively running in serviceQueue() 77 : bool AreThreadsServicingQueue() const; 78 : 79 : private: 80 : mutable Mutex newTaskMutex; 81 : std::condition_variable newTaskScheduled; 82 : std::multimap<std::chrono::system_clock::time_point, Function> taskQueue GUARDED_BY(newTaskMutex); 83 : int nThreadsServicingQueue GUARDED_BY(newTaskMutex); 84 : bool stopRequested GUARDED_BY(newTaskMutex); 85 : bool stopWhenEmpty GUARDED_BY(newTaskMutex); 86 2635152 : bool shouldStop() const EXCLUSIVE_LOCKS_REQUIRED(newTaskMutex) { return stopRequested || (stopWhenEmpty && taskQueue.empty()); } 87 : }; 88 : 89 : /** 90 : * Class used by CScheduler clients which may schedule multiple jobs 91 : * which are required to be run serially. Jobs may not be run on the 92 : * same thread, but no two jobs will be executed 93 : * at the same time and memory will be release-acquire consistent 94 : * (the scheduler will internally do an acquire before invoking a callback 95 : * as well as a release at the end). In practice this means that a callback 96 : * B() will be able to observe all of the effects of callback A() which executed 97 : * before it. 98 : */ 99 1 : class SingleThreadedSchedulerClient { 100 : private: 101 : CScheduler *m_pscheduler; 102 : 103 : RecursiveMutex m_cs_callbacks_pending; 104 : std::list<std::function<void (void)>> m_callbacks_pending; 105 : bool m_are_callbacks_running = false; 106 : 107 : void MaybeScheduleProcessQueue(); 108 : void ProcessQueue(); 109 : 110 : public: 111 495 : explicit SingleThreadedSchedulerClient(CScheduler *pschedulerIn) : m_pscheduler(pschedulerIn) {} 112 : 113 : /** 114 : * Add a callback to be executed. Callbacks are executed serially 115 : * and memory is release-acquire consistent between callback executions. 116 : * Practically, this means that callbacks can behave as if they are executed 117 : * in order by a single thread. 118 : */ 119 : void AddToProcessQueue(std::function<void (void)> func); 120 : 121 : // Processes all remaining queue members on the calling thread, blocking until queue is empty 122 : // Must be called after the CScheduler has no remaining processing threads! 123 : void EmptyQueue(); 124 : 125 : size_t CallbacksPending(); 126 : }; 127 : 128 : #endif // PIVX_SCHEDULER_H