// Copyright (c) 2018-2021 The Dash Core developers
// Copyright (c) 2022 The PIVX Core developers
// Distributed under the MIT/X11 software license, see the accompanying
// file COPYING or http://www.opensource.org/licenses/mit-license.php.

#include "llmq/quorums_dkgsessionhandler.h"

#include "activemasternode.h"
#include "chainparams.h"
#include "llmq/quorums_blockprocessor.h"
#include "llmq/quorums_connections.h"
#include "llmq/quorums_debug.h"
#include "net_processing.h"
#include "shutdown.h"
#include "util/threadnames.h"
#include "validation.h"

namespace llmq
{

CDKGPendingMessages::CDKGPendingMessages(size_t _maxMessagesPerNode) :
    maxMessagesPerNode(_maxMessagesPerNode)
{
}

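// Takes ownership of the received stream, enforces a per-node message limit and
// deduplicates messages by hash, so that each DKG message is queued (and later
// deserialized and verified) at most once per session.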
void CDKGPendingMessages::PushPendingMessage(NodeId from, CDataStream& vRecv, int invType)
{
    // this will also consume the data, even if we bail out early
    auto pm = std::make_shared<CDataStream>(std::move(vRecv));

    {
        LOCK(cs);

        if (messagesPerNode[from] >= maxMessagesPerNode) {
            // TODO ban?
            LogPrint(BCLog::NET, "CDKGPendingMessages::%s -- too many messages, peer=%d\n", __func__, from);
            return;
        }
        messagesPerNode[from]++;
    }

    CHashWriter hw(SER_GETHASH, 0);
    hw.write(pm->data(), pm->size());
    uint256 hash = hw.GetHash();

    LOCK2(cs_main, cs);

    g_connman->RemoveAskFor(hash, invType);

    if (!seenMessages.emplace(hash).second) {
        LogPrint(BCLog::NET, "CDKGPendingMessages::%s -- already seen %s, peer=%d\n", __func__, hash.ToString(), from);
        return;
    }

    pendingMessages.emplace_back(std::make_pair(from, std::move(pm)));
}

std::list<CDKGPendingMessages::BinaryMessage> CDKGPendingMessages::PopPendingMessages(size_t maxCount)
{
    LOCK(cs);

    std::list<BinaryMessage> ret;
    while (!pendingMessages.empty() && ret.size() < maxCount) {
        ret.emplace_back(std::move(pendingMessages.front()));
        pendingMessages.pop_front();
    }

    return ret;
}

bool CDKGPendingMessages::HasSeen(const uint256& hash) const
{
    LOCK(cs);
    return seenMessages.count(hash) != 0;
}

void CDKGPendingMessages::Clear()
{
    LOCK(cs);
    pendingMessages.clear();
    messagesPerNode.clear();
    seenMessages.clear();
}

//////

CDKGSessionHandler::CDKGSessionHandler(const Consensus::LLMQParams& _params, CBLSWorker& _blsWorker, CDKGSessionManager& _dkgManager) :
    params(_params),
    blsWorker(_blsWorker),
    dkgManager(_dkgManager),
    curSession(std::make_shared<CDKGSession>(_params, _blsWorker, _dkgManager)),
    pendingContributions((size_t)_params.size * 2), // we allow size*2 messages as we need to make sure we see bad behavior (double messages)
    pendingComplaints((size_t)_params.size * 2),
    pendingJustifications((size_t)_params.size * 2),
    pendingPrematureCommitments((size_t)_params.size * 2)
{
    if (params.type == Consensus::LLMQ_NONE) {
        throw std::runtime_error("Can't initialize CDKGSessionHandler with LLMQ_NONE type.");
    }
}

CDKGSessionHandler::~CDKGSessionHandler()
{
}

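// Derives the current DKG phase from the active chain tip: quorumStageInt is the
// block offset within the current DKG interval, and every dkgPhaseBlocks blocks the
// handler advances to the next phase. Illustrative arithmetic (the parameter values
// are hypothetical, not PIVX consensus values): with dkgInterval = 24 and
// dkgPhaseBlocks = 2, height 102 gives quorumStageInt = 102 % 24 = 6, so the quorum
// base block is at height 96, fNewPhase is true and phaseInt = 6 / 2 + 1 = 4.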
void CDKGSessionHandler::UpdatedBlockTip(const CBlockIndex* pindexNew)
{
    AssertLockHeld(cs_main);
    LOCK(cs);

    int quorumStageInt = pindexNew->nHeight % params.dkgInterval;
    const CBlockIndex* pindexQuorum = chainActive[pindexNew->nHeight - quorumStageInt];

    currentHeight = pindexNew->nHeight;
    quorumHeight = pindexQuorum->nHeight;
    quorumHash = pindexQuorum->GetBlockHash();

    bool fNewPhase = (quorumStageInt % params.dkgPhaseBlocks) == 0;
    int phaseInt = quorumStageInt / params.dkgPhaseBlocks + 1;
    QuorumPhase oldPhase = phase;

    if (fNewPhase && phaseInt >= QuorumPhase_Initialized && phaseInt <= QuorumPhase_Idle) {
        phase = static_cast<QuorumPhase>(phaseInt);
    }

    LogPrint(BCLog::DKG, "CDKGSessionHandler::%s -- %s - currentHeight=%d, quorumHeight=%d, oldPhase=%d, newPhase=%d\n", __func__,
             params.name, currentHeight, quorumHeight, oldPhase, phase);
}

void CDKGSessionHandler::ProcessMessage(CNode* pfrom, const std::string& strCommand, CDataStream& vRecv)
{
    // We don't handle messages in the calling thread as deserialization/processing of these would block everything
    if (strCommand == NetMsgType::QCONTRIB) {
        pendingContributions.PushPendingMessage(pfrom->GetId(), vRecv, MSG_QUORUM_CONTRIB);
    } else if (strCommand == NetMsgType::QCOMPLAINT) {
        pendingComplaints.PushPendingMessage(pfrom->GetId(), vRecv, MSG_QUORUM_COMPLAINT);
    } else if (strCommand == NetMsgType::QJUSTIFICATION) {
        pendingJustifications.PushPendingMessage(pfrom->GetId(), vRecv, MSG_QUORUM_JUSTIFICATION);
    } else if (strCommand == NetMsgType::QPCOMMITMENT) {
        pendingPrematureCommitments.PushPendingMessage(pfrom->GetId(), vRecv, MSG_QUORUM_PREMATURE_COMMITMENT);
    }
}

void CDKGSessionHandler::StartThread()
{
    if (phaseHandlerThread.joinable()) {
        throw std::runtime_error("Tried to start an already started CDKGSessionHandler thread.");
    }

    std::string threadName = strprintf("llmq-%d", (uint8_t)params.type);
    phaseHandlerThread = std::thread(&TraceThread<std::function<void()> >, threadName, std::function<void()>(std::bind(&CDKGSessionHandler::PhaseHandlerThread, this)));
}

void CDKGSessionHandler::StopThread()
{
    stopRequested = true;
    if (phaseHandlerThread.joinable()) {
        phaseHandlerThread.join();
    }
}

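// Resets curSession for a new quorum and initializes it with the deterministic
// masternode list at the quorum base block. Returns false when DIP3 is not yet
// enforced at that height, when this node is not running as a masternode, or when
// session initialization fails.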
bool CDKGSessionHandler::InitNewQuorum(const CBlockIndex* pindexQuorum)
{
    curSession = std::make_shared<CDKGSession>(params, blsWorker, dkgManager);

    if (!deterministicMNManager->IsDIP3Enforced(pindexQuorum->nHeight) ||
        !activeMasternodeManager) {
        return false;
    }

    auto mns = deterministicMNManager->GetAllQuorumMembers(params.type, pindexQuorum);

    if (!curSession->Init(pindexQuorum, mns, activeMasternodeManager->GetProTx())) {
        LogPrintf("CDKGSessionHandler::%s -- quorum initialization failed for %s\n", __func__, curSession->params.name);
        return false;
    }

    return true;
}

CDKGSessionHandler::QuorumPhaseAndHash CDKGSessionHandler::GetPhaseAndQuorumHash() const
{
    LOCK(cs);
    return {phase, quorumHash};
}

class AbortPhaseException : public std::exception {
};

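// Polls the phase/quorum state (updated by UpdatedBlockTip) until nextPhase is
// reached, calling runWhileWaiting in every iteration so that pending DKG messages
// can be processed while waiting. Throws AbortPhaseException on shutdown, on an
// unexpected quorum hash and on an unexpected phase change.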
void CDKGSessionHandler::WaitForNextPhase(QuorumPhase curPhase,
                                          QuorumPhase nextPhase,
                                          const uint256& expectedQuorumHash,
                                          const WhileWaitFunc& runWhileWaiting)
{
    LogPrint(BCLog::DKG, "CDKGSessionHandler::%s -- %s - starting, curPhase=%d, nextPhase=%d\n", __func__, params.name, curPhase, nextPhase);

    while (true) {
        if (stopRequested || ShutdownRequested()) {
            throw AbortPhaseException();
        }
        auto currState = GetPhaseAndQuorumHash();
        if (!expectedQuorumHash.IsNull() && currState.quorumHash != expectedQuorumHash) {
            throw AbortPhaseException();
        }
        if (currState.phase == nextPhase) {
            break;
        }
        if (curPhase != QuorumPhase_None && currState.phase != curPhase) {
            LogPrint(BCLog::DKG, "CDKGSessionHandler::%s -- %s - aborting due to unexpected phase change\n", __func__, params.name);
            throw AbortPhaseException();
        }
        if (!runWhileWaiting()) {
            MilliSleep(100);
        }
    }

    if (nextPhase == QuorumPhase_Initialized) {
        quorumDKGDebugManager->ResetLocalSessionStatus(params.type);
    } else {
        quorumDKGDebugManager->UpdateLocalSessionStatus(params.type, [&](CDKGDebugSessionStatus& status) {
            bool changed = status.phase != (uint8_t) nextPhase;
            status.phase = (uint8_t) nextPhase;
            return changed;
        });
    }

    LogPrint(BCLog::DKG, "CDKGSessionHandler::%s -- %s - done, curPhase=%d, nextPhase=%d\n", __func__, params.name, curPhase, nextPhase);
}

void CDKGSessionHandler::WaitForNewQuorum(const uint256& oldQuorumHash)
{
    LogPrint(BCLog::DKG, "CDKGSessionHandler::%s -- %s - starting\n", __func__, params.name);

    while (true) {
        if (stopRequested || ShutdownRequested()) {
            throw AbortPhaseException();
        }
        auto currState = GetPhaseAndQuorumHash();
        if (currState.quorumHash != oldQuorumHash) {
            break;
        }
        MilliSleep(100);
    }

    LogPrint(BCLog::DKG, "CDKGSessionHandler::%s -- %s - done\n", __func__, params.name);
}

// Sleep some time to not fully overload the whole network
void CDKGSessionHandler::SleepBeforePhase(QuorumPhase curPhase,
                                          const uint256& expectedQuorumHash,
                                          double randomSleepFactor,
                                          const WhileWaitFunc& runWhileWaiting)
{
    if (Params().IsRegTestNet()) {
        // On regtest, blocks can be mined on demand without any significant time passing between them.
        // We shouldn't wait before phases in this case.
        return;
    }

    if (!curSession->AreWeMember()) {
        // Non-members do not participate and do not create any network load, no need to sleep.
        return;
    }

    // Two blocks can come very close to each other, which happens pretty regularly. We don't want to be
    // left behind and marked as a bad member. This means that we should not count the last block of the
    // phase as a safe one to keep sleeping, which is why we calculate the phase sleep time as the time of
    // the full phase minus one block.
    const int nTargetSpacing = Params().GetConsensus().nTargetSpacing;
    double phaseSleepTime = (params.dkgPhaseBlocks - 1) * nTargetSpacing * 1000;
    // Expected phase sleep time per member
    double phaseSleepTimePerMember = phaseSleepTime / params.size;
    // Don't expect perfect block times and thus reduce the phase time to be on the safe side (the caller chooses the factor)
    double adjustedPhaseSleepTimePerMember = phaseSleepTimePerMember * randomSleepFactor;

    int64_t sleepTime = (int64_t)(adjustedPhaseSleepTimePerMember * curSession->GetMyMemberIndex());
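    // Worked example with purely illustrative values (not the actual consensus
    // parameters): dkgPhaseBlocks = 2, nTargetSpacing = 60 s, size = 50,
    // randomSleepFactor = 0.05 and GetMyMemberIndex() = 10 give
    // phaseSleepTime = 60000 ms, phaseSleepTimePerMember = 1200 ms,
    // adjustedPhaseSleepTimePerMember = 60 ms and sleepTime = 600 ms, so members
    // spread their sends across the phase instead of transmitting all at once.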
    int64_t endTime = GetTimeMillis() + sleepTime;
    int heightTmp{-1};
    int heightStart{-1};
    {
        LOCK(cs);
        heightTmp = heightStart = currentHeight;
    }

    LogPrint(BCLog::DKG, "CDKGSessionHandler::%s -- %s - starting sleep for %d ms, curPhase=%d\n", __func__, params.name, sleepTime, curPhase);

    while (GetTimeMillis() < endTime) {
        if (stopRequested) {
            LogPrint(BCLog::DKG, "CDKGSessionHandler::%s -- %s - aborting due to stop/shutdown requested\n", __func__, params.name);
            throw AbortPhaseException();
        }
        {
            LOCK(cs);
            if (currentHeight > heightTmp) {
                // New block(s) just came in
                int64_t expectedBlockTime = (currentHeight - heightStart) * nTargetSpacing * 1000;
                if (expectedBlockTime > sleepTime) {
                    // Blocks came faster than we expected, jump into the phase func asap
                    break;
                }
                heightTmp = currentHeight;
            }
            if (phase != curPhase || quorumHash != expectedQuorumHash) {
                // Something went wrong and/or we missed quite a few blocks and it's just too late now
                LogPrint(BCLog::DKG, "CDKGSessionHandler::%s -- %s - aborting due to unexpected phase/expectedQuorumHash change\n", __func__, params.name);
                throw AbortPhaseException();
            }
        }
        if (!runWhileWaiting()) {
            MilliSleep(100);
        }
    }

    LogPrint(BCLog::DKG, "CDKGSessionHandler::%s -- %s - done, curPhase=%d\n", __func__, params.name, curPhase);
}

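// Runs a single DKG phase: optionally sleeps to spread network load across the
// quorum members, invokes startPhaseFunc (the actual phase work) and then waits for
// the next phase while continuing to process pending messages via runWhileWaiting.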
void CDKGSessionHandler::HandlePhase(QuorumPhase curPhase,
                                     QuorumPhase nextPhase,
                                     const uint256& expectedQuorumHash,
                                     double randomSleepFactor,
                                     const StartPhaseFunc& startPhaseFunc,
                                     const WhileWaitFunc& runWhileWaiting)
{
    LogPrint(BCLog::DKG, "CDKGSessionHandler::%s -- %s - starting, curPhase=%d, nextPhase=%d\n", __func__, params.name, curPhase, nextPhase);

    SleepBeforePhase(curPhase, expectedQuorumHash, randomSleepFactor, runWhileWaiting);
    startPhaseFunc();
    WaitForNextPhase(curPhase, nextPhase, expectedQuorumHash, runWhileWaiting);

    LogPrint(BCLog::DKG, "CDKGSessionHandler::%s -- %s - done, curPhase=%d, nextPhase=%d\n", __func__, params.name, curPhase, nextPhase);
}

// returns a set of NodeIds which sent invalid messages
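// Verification strategy: all signatures are first checked with a single aggregated
// (insecure-aggregation) verification, which is much cheaper than verifying each
// signature individually. Only if that aggregate check fails, or if duplicate sign
// hashes are encountered, does the code fall back to per-message verification to
// pinpoint the misbehaving peers.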
template<typename Message>
std::set<NodeId> BatchVerifyMessageSigs(CDKGSession& session, const std::vector<std::pair<NodeId, std::shared_ptr<Message>>>& messages)
{
    if (messages.empty()) {
        return {};
    }

    std::set<NodeId> ret;
    bool revertToSingleVerification = false;

    CBLSSignature aggSig;
    std::vector<CBLSPublicKey> pubKeys;
    std::vector<uint256> messageHashes;
    std::set<uint256> messageHashesSet;
    pubKeys.reserve(messages.size());
    messageHashes.reserve(messages.size());
    bool first = true;
    for (const auto& p : messages) {
        const auto& msg = *p.second;

        auto member = session.GetMember(msg.proTxHash);
        if (!member) {
            // should not happen as it was verified before
            ret.emplace(p.first);
            continue;
        }

        if (first) {
            aggSig = msg.sig;
        } else {
            aggSig.AggregateInsecure(msg.sig);
        }
        first = false;

        auto msgHash = msg.GetSignHash();
        if (!messageHashesSet.emplace(msgHash).second) {
            // can only happen in 2 cases:
            // 1. Someone sent us the same message twice but with differing signature, meaning that at least one of them
            //    must be invalid. In this case, we'd have to revert to single message verification nevertheless
            // 2. Someone managed to find a way to create two different binary representations of a message that deserializes
            //    to the same object representation. This would be some form of malleability. However, this shouldn't be
            //    possible as only deterministic/unique BLS signatures and very simple data types are involved
            revertToSingleVerification = true;
            break;
        }

        pubKeys.emplace_back(member->dmn->pdmnState->pubKeyOperator.Get());
        messageHashes.emplace_back(msgHash);
    }
    if (!revertToSingleVerification) {
        bool valid = aggSig.VerifyInsecureAggregated(pubKeys, messageHashes);
        if (valid) {
            // all good
            return ret;
        }

        // are all messages from the same node?
        NodeId firstNodeId{-1};
        first = true;
        bool nodeIdsAllSame = true;
        for (auto it = messages.begin(); it != messages.end(); ++it) {
            if (first) {
                first = false;
                firstNodeId = it->first;
            } else if (it->first != firstNodeId) {
                nodeIdsAllSame = false;
                break;
            }
        }
        // if yes, take a short path and return a set with only that node
        if (nodeIdsAllSame) {
            ret.emplace(firstNodeId);
            return ret;
        }
        // different nodes, let's figure out who are the bad ones
    }

    for (const auto& p : messages) {
        if (ret.count(p.first)) {
            continue;
        }

        const auto& msg = *p.second;
        auto member = session.GetMember(msg.proTxHash);
        bool valid = msg.sig.VerifyInsecure(member->dmn->pdmnState->pubKeyOperator.Get(), msg.GetSignHash());
        if (!valid) {
            ret.emplace(p.first);
        }
    }
    return ret;
}

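// Drains up to maxCount queued messages of one type: deserializes them, pre-verifies
// them against the session, batch-verifies the BLS signatures, punishes peers that
// sent invalid data and hands the remaining messages to the session. Returns false
// when there was nothing to process, so callers can throttle their polling.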
template<typename Message>
static bool ProcessPendingMessageBatch(CDKGSession& session, CDKGPendingMessages& pendingMessages, size_t maxCount)
{
    auto msgs = pendingMessages.PopAndDeserializeMessages<Message>(maxCount);
    if (msgs.empty()) {
        return false;
    }

    std::vector<uint256> hashes;
    std::vector<std::pair<NodeId, std::shared_ptr<Message>>> preverifiedMessages;
    hashes.reserve(msgs.size());
    preverifiedMessages.reserve(msgs.size());

    for (const auto& p : msgs) {
        if (!p.second) {
            LogPrint(BCLog::NET, "%s -- failed to deserialize message, peer=%d\n", __func__, p.first);
            {
                LOCK(cs_main);
                Misbehaving(p.first, 100);
            }
            continue;
        }
        const auto& msg = *p.second;

        bool ban = false;
        if (!session.PreVerifyMessage(msg, ban)) {
            if (ban) {
                LogPrint(BCLog::NET, "%s -- banning node due to failed preverification, peer=%d\n", __func__, p.first);
                {
                    LOCK(cs_main);
                    Misbehaving(p.first, 100);
                }
            }
            LogPrint(BCLog::NET, "%s -- skipping message due to failed preverification, peer=%d\n", __func__, p.first);
            continue;
        }
        hashes.emplace_back(::SerializeHash(msg));
        preverifiedMessages.emplace_back(p);
    }
    if (preverifiedMessages.empty()) {
        return true;
    }

    auto badNodes = BatchVerifyMessageSigs(session, preverifiedMessages);
    if (!badNodes.empty()) {
        LOCK(cs_main);
        for (auto nodeId : badNodes) {
            LogPrint(BCLog::NET, "%s -- failed to verify signature, peer=%d\n", __func__, nodeId);
            Misbehaving(nodeId, 100);
        }
    }

    for (size_t i = 0; i < preverifiedMessages.size(); i++) {
        NodeId nodeId = preverifiedMessages[i].first;
        if (badNodes.count(nodeId)) {
            continue;
        }
        const auto& msg = *preverifiedMessages[i].second;
        bool ban = false;
        session.ReceiveMessage(hashes[i], msg, ban);
        if (ban) {
            LogPrint(BCLog::NET, "%s -- banning node after ReceiveMessage failed, peer=%d\n", __func__, nodeId);
            LOCK(cs_main);
            Misbehaving(nodeId, 100);
            badNodes.emplace(nodeId);
        }
    }

    return true;
}

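// Drives one DKG round: waits for a new quorum to start, clears all pending message
// queues, initializes the session for the quorum base block, sets up quorum
// connections and then walks through the Contribute, Complain, Justify and Commit
// phases before relaying any resulting final commitments.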
void CDKGSessionHandler::HandleDKGRound()
{
    uint256 curQuorumHash;

    WaitForNextPhase(QuorumPhase_None, QuorumPhase_Initialized, UINT256_ZERO, []{return false;});

    {
        LOCK(cs);
        pendingContributions.Clear();
        pendingComplaints.Clear();
        pendingJustifications.Clear();
        pendingPrematureCommitments.Clear();
        curQuorumHash = quorumHash;
    }

    const CBlockIndex* pindexQuorum = WITH_LOCK(cs_main, return LookupBlockIndex(curQuorumHash));
    if (!pindexQuorum) {
        // should never happen
        LogPrintf("%s: ERROR: Unable to find block %s\n", __func__, curQuorumHash.ToString());
        return;
    }

    if (!InitNewQuorum(pindexQuorum)) {
        // not a DKG participant for this quorum (e.g. not a masternode) or session init failed,
        // wait for the next quorum and abort this round
        WaitForNewQuorum(curQuorumHash);
        throw AbortPhaseException();
    }

    quorumDKGDebugManager->UpdateLocalSessionStatus(params.type, [&](CDKGDebugSessionStatus& status) {
        bool changed = status.phase != (uint8_t) QuorumPhase_Initialized;
        status.phase = (uint8_t) QuorumPhase_Initialized;
        return changed;
    });

    EnsureQuorumConnections(params.type, pindexQuorum, curSession->myProTxHash);
    if (curSession->AreWeMember()) {
        AddQuorumProbeConnections(params.type, pindexQuorum, curSession->myProTxHash);
    }

    WaitForNextPhase(QuorumPhase_Initialized, QuorumPhase_Contribute, curQuorumHash, []{return false;});

    // Contribute
    auto fContributeStart = [this]() {
        curSession->Contribute(pendingContributions);
    };
    auto fContributeWait = [this] {
        return ProcessPendingMessageBatch<CDKGContribution>(*curSession, pendingContributions, 8);
    };
    HandlePhase(QuorumPhase_Contribute, QuorumPhase_Complain, curQuorumHash, 0.05, fContributeStart, fContributeWait);

    // Complain
    auto fComplainStart = [this]() {
        curSession->VerifyAndComplain(pendingComplaints);
    };
    auto fComplainWait = [this] {
        return ProcessPendingMessageBatch<CDKGComplaint>(*curSession, pendingComplaints, 8);
    };
    HandlePhase(QuorumPhase_Complain, QuorumPhase_Justify, curQuorumHash, 0.05, fComplainStart, fComplainWait);

    // Justify
    auto fJustifyStart = [this]() {
        curSession->VerifyAndJustify(pendingJustifications);
    };
    auto fJustifyWait = [this] {
        return ProcessPendingMessageBatch<CDKGJustification>(*curSession, pendingJustifications, 8);
    };
    HandlePhase(QuorumPhase_Justify, QuorumPhase_Commit, curQuorumHash, 0.05, fJustifyStart, fJustifyWait);

    // Commit
    auto fCommitStart = [this]() {
        curSession->VerifyAndCommit(pendingPrematureCommitments);
    };
    auto fCommitWait = [this] {
        return ProcessPendingMessageBatch<CDKGPrematureCommitment>(*curSession, pendingPrematureCommitments, 8);
    };
    HandlePhase(QuorumPhase_Commit, QuorumPhase_Finalize, curQuorumHash, 0.1, fCommitStart, fCommitWait);

    auto finalCommitments = curSession->FinalizeCommitments();
    for (const auto& fqc : finalCommitments) {
        quorumBlockProcessor->AddAndRelayMinableCommitment(fqc);
    }
}

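// Per-LLMQ worker thread: keeps running DKG rounds until stop or shutdown is
// requested. An aborted round is recorded in the debug status and the loop simply
// starts over with the next round.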
void CDKGSessionHandler::PhaseHandlerThread()
{
    while (!stopRequested && !ShutdownRequested()) {
        try {
            HandleDKGRound();
        } catch (AbortPhaseException& e) {
            quorumDKGDebugManager->UpdateLocalSessionStatus(params.type, [&](CDKGDebugSessionStatus& status) {
                status.aborted = true;
                return true;
            });
            LogPrintf("CDKGSessionHandler::%s -- aborted current DKG session for llmq=%s\n", __func__, params.name);
        }
    }
}

} // namespace llmq