Line data Source code
1 : // Copyright (c) 2018-2021 The Dash Core developers 2 : // Copyright (c) 2022 The PIVX Core developers 3 : // Distributed under the MIT/X11 software license, see the accompanying 4 : // file COPYING or http://www.opensource.org/licenses/mit-license.php. 5 : 6 : #ifndef PIVX_LLMQ_QUORUMS_DKGSESSIONHANDLER_H 7 : #define PIVX_LLMQ_QUORUMS_DKGSESSIONHANDLER_H 8 : 9 : #include "ctpl_stl.h" 10 : #include "llmq/quorums_dkgsession.h" 11 : #include "validation.h" 12 : 13 : namespace llmq 14 : { 15 : 16 : enum QuorumPhase { 17 : QuorumPhase_None = -1, 18 : QuorumPhase_Initialized = 1, 19 : QuorumPhase_Contribute, 20 : QuorumPhase_Complain, 21 : QuorumPhase_Justify, 22 : QuorumPhase_Commit, 23 : QuorumPhase_Finalize, 24 : QuorumPhase_Idle, 25 : }; 26 : 27 : /** 28 : * Acts as a FIFO queue for incoming DKG messages. The reason we need this is that deserialization of these messages 29 : * is too slow to be processed in the main message handler thread. So, instead of processing them directly from the 30 : * main handler thread, we push them into a CDKGPendingMessages object and later pop+deserialize them in the DKG phase 31 : * handler thread. 32 : * 33 : * Each message type has it's own instance of this class. 34 : */ 35 : class CDKGPendingMessages 36 : { 37 : public: 38 : typedef std::pair<NodeId, std::shared_ptr<CDataStream>> BinaryMessage; 39 : 40 : private: 41 : mutable RecursiveMutex cs; 42 : size_t maxMessagesPerNode; 43 : std::list<BinaryMessage> pendingMessages; 44 : std::map<NodeId, size_t> messagesPerNode; 45 : std::set<uint256> seenMessages; 46 : 47 : public: 48 : explicit CDKGPendingMessages(size_t _maxMessagesPerNode); 49 : 50 : void PushPendingMessage(NodeId from, CDataStream& vRecv, int invType); 51 : std::list<BinaryMessage> PopPendingMessages(size_t maxCount); 52 : bool HasSeen(const uint256& hash) const; 53 : void Clear(); 54 : 55 : template<typename Message> 56 148 : void PushPendingMessage(NodeId from, Message& msg, int invType) 57 : { 58 148 : CDataStream ds(SER_NETWORK, PROTOCOL_VERSION); 59 148 : ds << msg; 60 148 : PushPendingMessage(from, ds, invType); 61 148 : } 62 : 63 : // Might return nullptr messages, which indicates that deserialization failed for some reason 64 : template<typename Message> 65 10773 : std::vector<std::pair<NodeId, std::shared_ptr<Message>>> PopAndDeserializeMessages(size_t maxCount) 66 : { 67 21546 : auto binaryMessages = PopPendingMessages(maxCount); 68 10773 : if (binaryMessages.empty()) { 69 10773 : return {}; 70 : } 71 : 72 354 : std::vector<std::pair<NodeId, std::shared_ptr<Message>>> ret; 73 354 : ret.reserve(binaryMessages.size()); 74 758 : for (const auto& bm : binaryMessages) { 75 808 : auto msg = std::make_shared<Message>(); 76 : try { 77 404 : *bm.second >> *msg; 78 0 : } catch (...) { 79 0 : msg = nullptr; 80 : } 81 808 : ret.emplace_back(std::make_pair(bm.first, std::move(msg))); 82 : } 83 : 84 354 : return ret; 85 : } 86 : }; 87 : 88 : /** 89 : * Handles multiple sequential sessions of one specific LLMQ type. There is one instance of this class per LLMQ type. 90 : * 91 : * It internally starts the phase handler thread, which constantly loops and sequentially processes one session at a 92 : * time and waiting for the next phase if necessary. 93 : */ 94 : class CDKGSessionHandler 95 : { 96 : private: 97 : friend class CDKGSessionManager; 98 : 99 : mutable RecursiveMutex cs; 100 : std::atomic<bool> stopRequested{false}; 101 : 102 : const Consensus::LLMQParams& params; 103 : CBLSWorker& blsWorker; 104 : CDKGSessionManager& dkgManager; 105 : 106 : QuorumPhase phase{QuorumPhase_Idle}; 107 : int currentHeight{-1}; 108 : int quorumHeight{-1}; 109 : uint256 quorumHash{UINT256_ZERO}; 110 : std::shared_ptr<CDKGSession> curSession; 111 : std::thread phaseHandlerThread; 112 : 113 : CDKGPendingMessages pendingContributions; 114 : CDKGPendingMessages pendingComplaints; 115 : CDKGPendingMessages pendingJustifications; 116 : CDKGPendingMessages pendingPrematureCommitments; 117 : 118 : public: 119 : CDKGSessionHandler(const Consensus::LLMQParams& _params, CBLSWorker& blsWorker, CDKGSessionManager& _dkgManager); 120 : ~CDKGSessionHandler(); 121 : 122 : void UpdatedBlockTip(const CBlockIndex *pindexNew); 123 : void ProcessMessage(CNode* pfrom, const std::string& strCommand, CDataStream& vRecv); 124 : 125 : void StartThread(); 126 : void StopThread(); 127 : 128 : private: 129 : bool InitNewQuorum(const CBlockIndex* pindexQuorum); 130 : 131 : struct QuorumPhaseAndHash { 132 : QuorumPhase phase; 133 : uint256 quorumHash; 134 : }; 135 : QuorumPhaseAndHash GetPhaseAndQuorumHash() const; 136 : 137 : typedef std::function<void()> StartPhaseFunc; 138 : typedef std::function<bool()> WhileWaitFunc; 139 : void WaitForNextPhase(QuorumPhase curPhase, QuorumPhase nextPhase, const uint256& expectedQuorumHash, const WhileWaitFunc& runWhileWaiting); 140 : void WaitForNewQuorum(const uint256& oldQuorumHash); 141 : void SleepBeforePhase(QuorumPhase curPhase, const uint256& expectedQuorumHash, double randomSleepFactor, const WhileWaitFunc& runWhileWaiting); 142 : void HandlePhase(QuorumPhase curPhase, QuorumPhase nextPhase, const uint256& expectedQuorumHash, double randomSleepFactor, const StartPhaseFunc& startPhaseFunc, const WhileWaitFunc& runWhileWaiting); 143 : void HandleDKGRound(); 144 : void PhaseHandlerThread(); 145 : }; 146 : 147 : } 148 : 149 : #endif // PIVX_LLMQ_QUORUMS_DKGSESSIONHANDLER_H