LCOV - code coverage report
Current view: top level - src/llmq - quorums_dkgsessionhandler.cpp (source / functions) Hit Total Coverage
Test: total_coverage.info Lines: 263 321 81.9 %
Date: 2025-02-23 09:33:43 Functions: 28 28 100.0 %

          Line data    Source code
       1             : // Copyright (c) 2018-2021 The Dash Core developers
       2             : // Copyright (c) 2022 The PIVX Core developers
       3             : // Distributed under the MIT/X11 software license, see the accompanying
       4             : // file COPYING or http://www.opensource.org/licenses/mit-license.php.
       5             : 
       6             : #include "llmq/quorums_dkgsessionhandler.h"
       7             : 
       8             : #include "activemasternode.h"
       9             : #include "chainparams.h"
      10             : #include "llmq/quorums_blockprocessor.h"
      11             : #include "llmq/quorums_connections.h"
      12             : #include "llmq/quorums_debug.h"
      13             : #include "net_processing.h"
      14             : #include "shutdown.h"
      15             : #include "util/threadnames.h"
      16             : #include "validation.h"
      17             : 
      18             : namespace llmq
      19             : {
      20             : 
      21        2524 : CDKGPendingMessages::CDKGPendingMessages(size_t _maxMessagesPerNode) :
      22        2524 :     maxMessagesPerNode(_maxMessagesPerNode)
      23             : {
      24        2524 : }
      25             : 
      26         407 : void CDKGPendingMessages::PushPendingMessage(NodeId from, CDataStream& vRecv, int invType)
      27             : {
      28             :     // this will also consume the data, even if we bail out early
      29         814 :     auto pm = std::make_shared<CDataStream>(std::move(vRecv));
      30             : 
      31         407 :     {
      32         407 :         LOCK(cs);
      33             : 
      34         407 :         if (messagesPerNode[from] >= maxMessagesPerNode) {
      35             :             // TODO ban?
      36           0 :             LogPrint(BCLog::NET, "CDKGPendingMessages::%s -- too many messages, peer=%d\n", __func__, from);
      37           0 :             return;
      38             :         }
      39         407 :         messagesPerNode[from]++;
      40             :     }
      41             : 
      42         407 :     CHashWriter hw(SER_GETHASH, 0);
      43         407 :     hw.write(pm->data(), pm->size());
      44         407 :     uint256 hash = hw.GetHash();
      45             : 
      46        1221 :     LOCK2(cs_main, cs);
      47             : 
      48         407 :     g_connman->RemoveAskFor(hash, invType);
      49             : 
      50         407 :     if (!seenMessages.emplace(hash).second) {
      51           0 :         LogPrint(BCLog::NET, "CDKGPendingMessages::%s -- already seen %s, peer=%d\n", __func__, hash.ToString(), from);
      52           0 :         return;
      53             :     }
      54             : 
      55         814 :     pendingMessages.emplace_back(std::make_pair(from, std::move(pm)));
      56             : }
      57             : 
      58       10773 : std::list<CDKGPendingMessages::BinaryMessage> CDKGPendingMessages::PopPendingMessages(size_t maxCount)
      59             : {
      60       10773 :     LOCK(cs);
      61             : 
      62       10773 :     std::list<BinaryMessage> ret;
      63         404 :     while (!pendingMessages.empty() && ret.size() < maxCount) {
      64         404 :         ret.emplace_back(std::move(pendingMessages.front()));
      65       11177 :         pendingMessages.pop_front();
      66             :     }
      67             : 
      68       21546 :     return ret;
      69             : }
      70             : 
      71        3359 : bool CDKGPendingMessages::HasSeen(const uint256& hash) const
      72             : {
      73        3359 :     LOCK(cs);
      74        6718 :     return seenMessages.count(hash) != 0;
      75             : }
      76             : 
      77        1408 : void CDKGPendingMessages::Clear()
      78             : {
      79        1408 :     LOCK(cs);
      80        1408 :     pendingMessages.clear();
      81        1408 :     messagesPerNode.clear();
      82        1408 :     seenMessages.clear();
      83        1408 : }
      84             : 
      85             : //////
      86             : 
      87         631 : CDKGSessionHandler::CDKGSessionHandler(const Consensus::LLMQParams& _params, CBLSWorker& _blsWorker, CDKGSessionManager& _dkgManager) :
      88             :     params(_params),
      89             :     blsWorker(_blsWorker),
      90             :     dkgManager(_dkgManager),
      91             :     curSession(std::make_shared<CDKGSession>(_params, _blsWorker, _dkgManager)),
      92         631 :     pendingContributions((size_t)_params.size * 2), // we allow size*2 messages as we need to make sure we see bad behavior (double messages)
      93         631 :     pendingComplaints((size_t)_params.size * 2),
      94         631 :     pendingJustifications((size_t)_params.size * 2),
      95         631 :     pendingPrematureCommitments((size_t)_params.size * 2)
      96             : {
      97         631 :     if (params.type == Consensus::LLMQ_NONE) {
      98           0 :         throw std::runtime_error("Can't initialize CDKGSessionHandler with LLMQ_NONE type.");
      99             :     }
     100         631 : }
     101             : 
     102         631 : CDKGSessionHandler::~CDKGSessionHandler()
     103             : {
     104         631 : }
     105             : 
     106        8623 : void CDKGSessionHandler::UpdatedBlockTip(const CBlockIndex* pindexNew)
     107             : {
     108        8623 :     AssertLockHeld(cs_main);
     109        8623 :     LOCK(cs);
     110             : 
     111        8623 :     int quorumStageInt = pindexNew->nHeight % params.dkgInterval;
     112        8623 :     const CBlockIndex* pindexQuorum = chainActive[pindexNew->nHeight - quorumStageInt];
     113             : 
     114        8623 :     currentHeight = pindexNew->nHeight;
     115        8623 :     quorumHeight = pindexQuorum->nHeight;
     116        8623 :     quorumHash = pindexQuorum->GetBlockHash();
     117             : 
     118        8623 :     bool fNewPhase = (quorumStageInt % params.dkgPhaseBlocks) == 0;
     119        8623 :     int phaseInt = quorumStageInt / params.dkgPhaseBlocks + 1;
     120        8623 :     QuorumPhase oldPhase = phase;
     121             : 
     122        8623 :     if (fNewPhase && phaseInt >= QuorumPhase_Initialized && phaseInt <= QuorumPhase_Idle) {
     123        3017 :         phase = static_cast<QuorumPhase>(phaseInt);
     124             :     }
     125             : 
     126        8623 :     LogPrint(BCLog::DKG, "CDKGSessionHandler::%s -- %s - currentHeight=%d, quorumHeight=%d, oldPhase=%d, newPhase=%d\n", __func__,
     127             :             params.name, currentHeight, quorumHeight, oldPhase, phase);
     128        8623 : }
     129             : 
     130         259 : void CDKGSessionHandler::ProcessMessage(CNode* pfrom, const std::string& strCommand, CDataStream& vRecv)
     131             : {
     132             :     // We don't handle messages in the calling thread as deserialization/processing of these would block everything
     133         259 :     if (strCommand == NetMsgType::QCONTRIB) {
     134         121 :         pendingContributions.PushPendingMessage(pfrom->GetId(), vRecv, MSG_QUORUM_CONTRIB);
     135         138 :     } else if (strCommand == NetMsgType::QCOMPLAINT) {
     136          26 :         pendingComplaints.PushPendingMessage(pfrom->GetId(), vRecv, MSG_QUORUM_COMPLAINT);
     137         112 :     } else if (strCommand == NetMsgType::QJUSTIFICATION) {
     138           8 :         pendingJustifications.PushPendingMessage(pfrom->GetId(), vRecv, MSG_QUORUM_JUSTIFICATION);
     139         104 :     } else if (strCommand == NetMsgType::QPCOMMITMENT) {
     140         104 :         pendingPrematureCommitments.PushPendingMessage(pfrom->GetId(), vRecv, MSG_QUORUM_PREMATURE_COMMITMENT);
     141             :     }
     142         259 : }
     143             : 
     144         347 : void CDKGSessionHandler::StartThread()
     145             : {
     146         347 :     if (phaseHandlerThread.joinable()) {
     147           0 :         throw std::runtime_error("Tried to start an already started CDKGSessionHandler thread.");
     148             :     }
     149             : 
     150         347 :     std::string threadName = strprintf("llmq-%d", (uint8_t)params.type);
     151         347 :     phaseHandlerThread = std::thread(&TraceThread<std::function<void()> >, threadName, std::function<void()>(std::bind(&CDKGSessionHandler::PhaseHandlerThread, this)));
     152         347 : }
     153             : 
     154         357 : void CDKGSessionHandler::StopThread()
     155             : {
     156         357 :     stopRequested = true;
     157         357 :     if (phaseHandlerThread.joinable()) {
     158         347 :         phaseHandlerThread.join();
     159             :     }
     160         357 : }
     161             : 
     162         352 : bool CDKGSessionHandler::InitNewQuorum(const CBlockIndex* pindexQuorum)
     163             : {
     164         704 :     curSession = std::make_shared<CDKGSession>(params, blsWorker, dkgManager);
     165             : 
     166         352 :     if (!deterministicMNManager->IsDIP3Enforced(pindexQuorum->nHeight) ||
     167         352 :             !activeMasternodeManager) {
     168             :         return false;
     169             :     }
     170             : 
     171         358 :     auto mns = deterministicMNManager->GetAllQuorumMembers(params.type, pindexQuorum);
     172             : 
     173         179 :     if (!curSession->Init(pindexQuorum, mns, activeMasternodeManager->GetProTx())) {
     174          11 :         LogPrintf("CDKGSessionHandler::%s -- quorum initialization failed for %s\n", __func__, curSession->params.name);
     175             :         return false;
     176             :     }
     177             : 
     178             :     return true;
     179             : }
     180             : 
     181      182408 : CDKGSessionHandler::QuorumPhaseAndHash CDKGSessionHandler::GetPhaseAndQuorumHash() const
     182             : {
     183      182408 :     LOCK(cs);
     184      182408 :     return {phase, quorumHash};
     185             : }
     186             : 
     187         557 : class AbortPhaseException : public std::exception {
     188             : };
     189             : 
     190        1332 : void CDKGSessionHandler::WaitForNextPhase(QuorumPhase curPhase,
     191             :                                           QuorumPhase nextPhase,
     192             :                                           const uint256& expectedQuorumHash,
     193             :                                           const WhileWaitFunc& runWhileWaiting)
     194             : {
     195        1332 :     LogPrint(BCLog::DKG, "CDKGSessionHandler::%s -- %s - starting, curPhase=%d, nextPhase=%d\n", __func__, params.name, curPhase, nextPhase);
     196             : 
     197      281566 :     while (true) {
     198      141449 :         if (stopRequested || ShutdownRequested()) {
     199         319 :             throw AbortPhaseException();
     200             :         }
     201      141130 :         auto currState = GetPhaseAndQuorumHash();
     202     4191480 :         if (!expectedQuorumHash.IsNull() && currState.quorumHash != expectedQuorumHash) {
     203           0 :             throw AbortPhaseException();
     204             :         }
     205      141130 :         if (currState.phase == nextPhase) {
     206             :             break;
     207             :         }
     208      140171 :         if (curPhase != QuorumPhase_None && currState.phase != curPhase) {
     209          54 :             LogPrint(BCLog::DKG, "CDKGSessionHandler::%s -- %s - aborting due unexpected phase change\n", __func__, params.name);
     210          54 :             throw AbortPhaseException();
     211             :         }
     212      140117 :         if (!runWhileWaiting()) {
     213      139763 :             MilliSleep(100);
     214             :         }
     215      140117 :     }
     216             : 
     217         959 :     if (nextPhase == QuorumPhase_Initialized) {
     218         352 :         quorumDKGDebugManager->ResetLocalSessionStatus(params.type);
     219             :     } else {
     220        1214 :         quorumDKGDebugManager->UpdateLocalSessionStatus(params.type, [&](CDKGDebugSessionStatus& status) {
     221         303 :             bool changed = status.phase != (uint8_t) nextPhase;
     222         303 :             status.phase = (uint8_t) nextPhase;
     223         303 :             return changed;
     224             :         });
     225             :     }
     226             : 
     227         959 :     LogPrint(BCLog::DKG, "CDKGSessionHandler::%s -- %s - done, curPhase=%d, nextPhase=%d\n", __func__, params.name, curPhase, nextPhase);
     228             : 
     229         959 : }
     230             : 
     231         184 : void CDKGSessionHandler::WaitForNewQuorum(const uint256& oldQuorumHash)
     232             : {
     233         184 :     LogPrint(BCLog::DKG, "CDKGSessionHandler::%s -- %s - starting\n", __func__, params.name);
     234             : 
     235       82428 :     while (true) {
     236       41306 :         if (stopRequested || ShutdownRequested()) {
     237          28 :             throw AbortPhaseException();
     238             :         }
     239       41278 :         auto currState = GetPhaseAndQuorumHash();
     240       41278 :         if (currState.quorumHash != oldQuorumHash) {
     241             :             break;
     242             :         }
     243       41122 :         MilliSleep(100);
     244       41122 :     }
     245             : 
     246         156 :     LogPrint(BCLog::DKG, "CDKGSessionHandler::%s -- %s - done\n", __func__, params.name);
     247         156 : }
     248             : 
     249             : // Sleep some time to not fully overload the whole network
     250         494 : void CDKGSessionHandler::SleepBeforePhase(QuorumPhase curPhase,
     251             :                                           const uint256& expectedQuorumHash,
     252             :                                           double randomSleepFactor,
     253             :                                           const WhileWaitFunc& runWhileWaiting)
     254             : {
     255         494 :     if (Params().IsRegTestNet()) {
     256             :         // On regtest, blocks can be mined on demand without any significant time passing between these.
     257             :         // We shouldn't wait before phases in this case.
     258         494 :         return;
     259             :     }
     260             : 
     261           0 :     if (!curSession->AreWeMember()) {
     262             :         // Non-members do not participate and do not create any network load, no need to sleep.
     263             :         return;
     264             :     }
     265             : 
     266             :     // Two blocks can come very close to each other, this happens pretty regularly. We don't want to be
     267             :     // left behind and marked as a bad member. This means that we should not count the last block of the
     268             :     // phase as a safe one to keep sleeping, that's why we calculate the phase sleep time as a time of
     269             :     // the full phase minus one block here.
     270           0 :     const int nTargetSpacing = Params().GetConsensus().nTargetSpacing;
     271           0 :     double phaseSleepTime = (params.dkgPhaseBlocks - 1) * nTargetSpacing * 1000;
     272             :     // Expected phase sleep time per member
     273           0 :     double phaseSleepTimePerMember = phaseSleepTime / params.size;
     274             :     // Don't expect perfect block times and thus reduce the phase time to be on the secure side (caller chooses factor)
     275           0 :     double adjustedPhaseSleepTimePerMember = phaseSleepTimePerMember * randomSleepFactor;
     276             : 
     277           0 :     int64_t sleepTime = (int64_t)(adjustedPhaseSleepTimePerMember * curSession->GetMyMemberIndex());
     278           0 :     int64_t endTime = GetTimeMillis() + sleepTime;
     279           0 :     int heightTmp{-1};
     280           0 :     int heightStart{-1};
     281           0 :     {
     282           0 :         LOCK(cs);
     283           0 :         heightTmp = heightStart = currentHeight;
     284             :     }
     285             : 
     286           0 :     LogPrint(BCLog::DKG, "CDKGSessionHandler::%s -- %s - starting sleep for %d ms, curPhase=%d\n", __func__, params.name, sleepTime, curPhase);
     287             : 
     288           0 :     while (GetTimeMillis() < endTime) {
     289           0 :         if (stopRequested) {
     290           0 :             LogPrint(BCLog::DKG, "CDKGSessionHandler::%s -- %s - aborting due to stop/shutdown requested\n", __func__, params.name);
     291           0 :             throw AbortPhaseException();
     292             :         }
     293           0 :         {
     294           0 :             LOCK(cs);
     295           0 :             if (currentHeight > heightTmp) {
     296             :                 // New block(s) just came in
     297           0 :                 int64_t expectedBlockTime = (currentHeight - heightStart) * nTargetSpacing * 1000;
     298           0 :                 if (expectedBlockTime > sleepTime) {
     299             :                     // Blocks came faster than we expected, jump into the phase func asap
     300             :                     break;
     301             :                 }
     302             :                 heightTmp = currentHeight;
     303             :             }
     304           0 :             if (phase != curPhase || quorumHash != expectedQuorumHash) {
     305             :                 // Something went wrong and/or we missed quite a few blocks and it's just too late now
     306           0 :                 LogPrint(BCLog::DKG, "CDKGSessionHandler::%s -- %s - aborting due unexpected phase/expectedQuorumHash change\n", __func__, params.name);
     307           0 :                 throw AbortPhaseException();
     308             :             }
     309             :         }
     310           0 :         if (!runWhileWaiting()) {
     311           0 :             MilliSleep(100);
     312             :         }
     313             :     }
     314             : 
     315           0 :     LogPrint(BCLog::DKG, "CDKGSessionHandler::%s -- %s - done, curPhase=%d\n", __func__, params.name, curPhase);
     316             : }
     317             : 
     318         494 : void CDKGSessionHandler::HandlePhase(QuorumPhase curPhase,
     319             :                                      QuorumPhase nextPhase,
     320             :                                      const uint256& expectedQuorumHash,
     321             :                                      double randomSleepFactor,
     322             :                                      const StartPhaseFunc& startPhaseFunc,
     323             :                                      const WhileWaitFunc& runWhileWaiting)
     324             : {
     325         494 :     LogPrint(BCLog::DKG, "CDKGSessionHandler::%s -- %s - starting, curPhase=%d, nextPhase=%d\n", __func__, params.name, curPhase, nextPhase);
     326             : 
     327         494 :     SleepBeforePhase(curPhase, expectedQuorumHash, randomSleepFactor, runWhileWaiting);
     328         494 :     startPhaseFunc();
     329         494 :     WaitForNextPhase(curPhase, nextPhase, expectedQuorumHash, runWhileWaiting);
     330             : 
     331         464 :     LogPrint(BCLog::DKG, "CDKGSessionHandler::%s -- %s - done, curPhase=%d, nextPhase=%d\n", __func__, params.name, curPhase, nextPhase);
     332         464 : }
     333             : 
     334             : // returns a set of NodeIds which sent invalid messages
     335             : template<typename Message>
     336         353 : std::set<NodeId> BatchVerifyMessageSigs(CDKGSession& session, const std::vector<std::pair<NodeId, std::shared_ptr<Message>>>& messages)
     337             : {
     338         353 :     if (messages.empty()) {
     339           0 :         return {};
     340             :     }
     341             : 
     342         353 :     std::set<NodeId> ret;
     343         353 :     bool revertToSingleVerification = false;
     344             : 
     345         353 :     CBLSSignature aggSig;
     346         353 :     std::vector<CBLSPublicKey> pubKeys;
     347         706 :     std::vector<uint256> messageHashes;
     348         706 :     std::set<uint256> messageHashesSet;
     349         353 :     pubKeys.reserve(messages.size());
     350         353 :     messageHashes.reserve(messages.size());
     351         353 :     bool first = true;
     352         735 :     for (const auto& p : messages ) {
     353         403 :         const auto& msg = *p.second;
     354             : 
     355         403 :         auto member = session.GetMember(msg.proTxHash);
     356         403 :         if (!member) {
     357             :             // should not happen as it was verified before
     358           0 :             ret.emplace(p.first);
     359           0 :             continue;
     360             :         }
     361             : 
     362         403 :         if (first) {
     363         353 :             aggSig = msg.sig;
     364             :         } else {
     365          50 :             aggSig.AggregateInsecure(msg.sig);
     366             :         }
     367         403 :         first = false;
     368             : 
     369         403 :         auto msgHash = msg.GetSignHash();
     370         403 :         if (!messageHashesSet.emplace(msgHash).second) {
     371             :             // can only happen in 2 cases:
     372             :             // 1. Someone sent us the same message twice but with differing signature, meaning that at least one of them
     373             :             //    must be invalid. In this case, we'd have to revert to single message verification nevertheless
     374             :             // 2. Someone managed to find a way to create two different binary representations of a message that deserializes
     375             :             //    to the same object representation. This would be some form of malleability. However, this shouldn't be
     376             :             //    possible as only deterministic/unique BLS signatures and very simple data types are involved
     377          21 :             revertToSingleVerification = true;
     378          21 :             break;
     379             :         }
     380             : 
     381         382 :         pubKeys.emplace_back(member->dmn->pdmnState->pubKeyOperator.Get());
     382         382 :         messageHashes.emplace_back(msgHash);
     383             :     }
     384             :     if (!revertToSingleVerification) {
     385         332 :         bool valid = aggSig.VerifyInsecureAggregated(pubKeys, messageHashes);
     386         332 :         if (valid) {
     387             :             // all good
     388         332 :             return ret;
     389             :         }
     390             : 
     391             :         // are all messages from the same node?
     392             :         NodeId firstNodeId;
     393           0 :         first = true;
     394           0 :         bool nodeIdsAllSame = true;
     395           0 :         for (auto it = messages.begin(); it != messages.end(); ++it) {
     396             :             if (first) {
     397           0 :                 firstNodeId = it->first;
     398             :             } else {
     399             :                 first = false;
     400             :                 if (it->first != firstNodeId) {
     401             :                     nodeIdsAllSame = false;
     402             :                     break;
     403             :                 }
     404             :             }
     405             :         }
     406             :         // if yes, take a short path and return a set with only him
     407             :         if (nodeIdsAllSame) {
     408           0 :             ret.emplace(firstNodeId);
     409         332 :             return ret;
     410             :         }
     411             :         // different nodes, let's figure out who are the bad ones
     412             :     }
     413             : 
     414          63 :     for (const auto& p : messages) {
     415          42 :         if (ret.count(p.first)) {
     416           0 :             continue;
     417             :         }
     418             : 
     419          42 :         const auto& msg = *p.second;
     420          42 :         auto member = session.GetMember(msg.proTxHash);
     421          42 :         bool valid = msg.sig.VerifyInsecure(member->dmn->pdmnState->pubKeyOperator.Get(), msg.GetSignHash());
     422          42 :         if (!valid) {
     423          42 :             ret.emplace(p.first);
     424             :         }
     425             :     }
     426         353 :     return ret;
     427             : }
     428             : 
     429             : template<typename Message>
     430       10773 : static bool ProcessPendingMessageBatch(CDKGSession& session, CDKGPendingMessages& pendingMessages, size_t maxCount)
     431             : {
     432       10773 :     auto msgs = pendingMessages.PopAndDeserializeMessages<Message>(maxCount);
     433       10773 :     if (msgs.empty()) {
     434             :         return false;
     435             :     }
     436             : 
     437       10773 :     std::vector<uint256> hashes;
     438         708 :     std::vector<std::pair<NodeId, std::shared_ptr<Message>>> preverifiedMessages;
     439         354 :     hashes.reserve(msgs.size());
     440         354 :     preverifiedMessages.reserve(msgs.size());
     441             : 
     442         758 :     for (const auto& p : msgs) {
     443         404 :         if (!p.second) {
     444           0 :             LogPrint(BCLog::NET, "%s -- failed to deserialize message, peer=%d\n", __func__, p.first);
     445             :             {
     446           0 :                 LOCK(cs_main);
     447           0 :                 Misbehaving(p.first, 100);
     448             :             }
     449           1 :             continue;
     450             :         }
     451         404 :         const auto& msg = *p.second;
     452             : 
     453         404 :         bool ban = false;
     454         404 :         if (!session.PreVerifyMessage(msg, ban)) {
     455           1 :             if (ban) {
     456           1 :                 LogPrint(BCLog::NET, "%s -- banning node due to failed preverification, peer=%d\n", __func__, p.first);
     457             :                 {
     458           2 :                     LOCK(cs_main);
     459           2 :                     Misbehaving(p.first, 100);
     460             :                 }
     461             :             }
     462           1 :             LogPrint(BCLog::NET, "%s -- skipping message due to failed preverification, peer=%d\n", __func__, p.first);
     463           1 :             continue;
     464             :         }
     465         403 :         hashes.emplace_back(::SerializeHash(msg));
     466         403 :         preverifiedMessages.emplace_back(p);
     467             :     }
     468         354 :     if (preverifiedMessages.empty()) {
     469             :         return true;
     470             :     }
     471             : 
     472         707 :     auto badNodes = BatchVerifyMessageSigs(session, preverifiedMessages);
     473         353 :     if (!badNodes.empty()) {
     474           0 :         LOCK(cs_main);
     475           0 :         for (auto nodeId : badNodes) {
     476           0 :             LogPrint(BCLog::NET, "%s -- failed to verify signature, peer=%d\n", __func__, nodeId);
     477           0 :             Misbehaving(nodeId, 100);
     478             :         }
     479             :     }
     480             : 
     481         756 :     for (size_t i = 0; i < preverifiedMessages.size(); i++) {
     482         403 :         NodeId nodeId = preverifiedMessages[i].first;
     483         403 :         if (badNodes.count(nodeId)) {
     484           0 :             continue;
     485             :         }
     486         403 :         const auto& msg = *preverifiedMessages[i].second;
     487         403 :         bool ban = false;
     488         403 :         session.ReceiveMessage(hashes[i], msg, ban);
     489         403 :         if (ban) {
     490           0 :             LogPrint(BCLog::NET, "%s -- banning node after ReceiveMessage failed, peer=%d\n", __func__, nodeId);
     491           0 :             LOCK(cs_main);
     492           0 :             Misbehaving(nodeId, 100);
     493           0 :             badNodes.emplace(nodeId);
     494             :         }
     495             :     }
     496             : 
     497         353 :     return true;
     498             : }
     499             : 
     500         670 : void CDKGSessionHandler::HandleDKGRound()
     501             : {
     502         670 :     uint256 curQuorumHash;
     503             : 
     504         988 :     WaitForNextPhase(QuorumPhase_None, QuorumPhase_Initialized, UINT256_ZERO, []{return false;});
     505             : 
     506         352 :     {
     507         352 :         LOCK(cs);
     508         352 :         pendingContributions.Clear();
     509         352 :         pendingComplaints.Clear();
     510         352 :         pendingJustifications.Clear();
     511         352 :         pendingPrematureCommitments.Clear();
     512         352 :         curQuorumHash = quorumHash;
     513             :     }
     514             : 
     515         704 :     const CBlockIndex* pindexQuorum = WITH_LOCK(cs_main, return LookupBlockIndex(curQuorumHash));
     516         352 :     if (!pindexQuorum) {
     517             :         // should never happen
     518           0 :         LogPrintf("%s: ERROR: Unable to find block %s\n", __func__, curQuorumHash.ToString());
     519           0 :         return;
     520             :     }
     521             : 
     522         352 :     if (!InitNewQuorum(pindexQuorum)) {
     523             :         // should actually never happen
     524         184 :         WaitForNewQuorum(curQuorumHash);
     525         156 :         throw AbortPhaseException();
     526             :     }
     527             : 
     528         168 :     quorumDKGDebugManager->UpdateLocalSessionStatus(params.type, [&](CDKGDebugSessionStatus& status) {
     529          82 :         bool changed = status.phase != (uint8_t) QuorumPhase_Initialized;
     530          82 :         status.phase = (uint8_t) QuorumPhase_Initialized;
     531          82 :         return changed;
     532             :     });
     533             : 
     534         168 :     EnsureQuorumConnections(params.type, pindexQuorum, curSession->myProTxHash);
     535         336 :     if (curSession->AreWeMember()) {
     536          82 :         AddQuorumProbeConnections(params.type, pindexQuorum, curSession->myProTxHash);
     537             :     }
     538             : 
     539         193 :     WaitForNextPhase(QuorumPhase_Initialized, QuorumPhase_Contribute, curQuorumHash, []{return false;});
     540             : 
     541             :     // Contribute
     542         286 :     auto fContributeStart = [this]() {
     543         143 :         curSession->Contribute(pendingContributions);
     544         143 :     };
     545        3007 :     auto fContributeWait = [this] {
     546        2864 :         return ProcessPendingMessageBatch<CDKGContribution>(*curSession, pendingContributions, 8);
     547         143 :     };
     548         305 :     HandlePhase(QuorumPhase_Contribute, QuorumPhase_Complain, curQuorumHash, 0.05, fContributeStart, fContributeWait);
     549             : 
     550             :     // Complain
     551         248 :     auto fComplainStart = [this]() {
     552         124 :         curSession->VerifyAndComplain(pendingComplaints);
     553         124 :     };
     554        3073 :     auto fComplainWait = [this] {
     555        2949 :         return ProcessPendingMessageBatch<CDKGComplaint>(*curSession, pendingComplaints, 8);
     556         124 :     };
     557         258 :     HandlePhase(QuorumPhase_Complain, QuorumPhase_Justify, curQuorumHash, 0.05, fComplainStart, fComplainWait);
     558             : 
     559             :     // Justify
     560         228 :     auto fJustifyStart = [this]() {
     561         114 :         curSession->VerifyAndJustify(pendingJustifications);
     562         114 :     };
     563        2571 :     auto fJustifyWait = [this] {
     564        2457 :         return ProcessPendingMessageBatch<CDKGJustification>(*curSession, pendingJustifications, 8);
     565         114 :     };
     566         229 :     HandlePhase(QuorumPhase_Justify, QuorumPhase_Commit, curQuorumHash, 0.05, fJustifyStart, fJustifyWait);
     567             : 
     568             :     // Commit
     569         226 :     auto fCommitStart = [this]() {
     570         113 :         curSession->VerifyAndCommit(pendingPrematureCommitments);
     571         113 :     };
     572        2616 :     auto fCommitWait = [this] {
     573        2503 :         return ProcessPendingMessageBatch<CDKGPrematureCommitment>(*curSession, pendingPrematureCommitments, 8);
     574         113 :     };
     575         226 :     HandlePhase(QuorumPhase_Commit, QuorumPhase_Finalize, curQuorumHash, 0.1, fCommitStart, fCommitWait);
     576             : 
     577         226 :     auto finalCommitments = curSession->FinalizeCommitments();
     578         170 :     for (const auto& fqc : finalCommitments) {
     579          57 :         quorumBlockProcessor->AddAndRelayMinableCommitment(fqc);
     580             :     }
     581             : }
     582             : 
     583         347 : void CDKGSessionHandler::PhaseHandlerThread()
     584             : {
     585        1017 :     while (!stopRequested && !ShutdownRequested()) {
     586         670 :         try {
     587         670 :             HandleDKGRound();
     588        1114 :         } catch (AbortPhaseException& e) {
     589         557 :             quorumDKGDebugManager->UpdateLocalSessionStatus(params.type, [&](CDKGDebugSessionStatus& status) {
     590          40 :                 status.aborted = true;
     591          40 :                 return true;
     592             :             });
     593         557 :             LogPrintf("CDKGSessionHandler::%s -- aborted current DKG session for llmq=%s\n", __func__, params.name);
     594             :         }
     595             :     }
     596         347 : }
     597             : 
     598             : }

Generated by: LCOV version 1.14