Line data Source code
1 : // Copyright (c) 2015 The Bitcoin Core developers 2 : // Distributed under the MIT software license, see the accompanying 3 : // file COPYING or http://www.opensource.org/licenses/mit-license.php. 4 : 5 : #include "zmqnotificationinterface.h" 6 : #include "zmqpublishnotifier.h" 7 : 8 : #include "version.h" 9 : #include "streams.h" 10 : #include "util/system.h" 11 : 12 0 : void zmqError(const char *str) 13 : { 14 0 : LogPrint(BCLog::ZMQ, "Error: %s, errno=%s\n", str, zmq_strerror(errno)); 15 0 : } 16 : 17 1 : CZMQNotificationInterface::CZMQNotificationInterface() : pcontext(nullptr) 18 : { 19 1 : } 20 : 21 2 : CZMQNotificationInterface::~CZMQNotificationInterface() 22 : { 23 1 : Shutdown(); 24 : 25 5 : for (std::list<CZMQAbstractNotifier*>::iterator i=notifiers.begin(); i!=notifiers.end(); ++i) 26 : { 27 4 : delete *i; 28 : } 29 2 : } 30 : 31 357 : CZMQNotificationInterface* CZMQNotificationInterface::Create() 32 : { 33 357 : CZMQNotificationInterface* notificationInterface = nullptr; 34 357 : std::map<std::string, CZMQNotifierFactory> factories; 35 714 : std::list<CZMQAbstractNotifier*> notifiers; 36 : 37 357 : factories["pubhashblock"] = CZMQAbstractNotifier::Create<CZMQPublishHashBlockNotifier>; 38 357 : factories["pubhashtx"] = CZMQAbstractNotifier::Create<CZMQPublishHashTransactionNotifier>; 39 357 : factories["pubrawblock"] = CZMQAbstractNotifier::Create<CZMQPublishRawBlockNotifier>; 40 357 : factories["pubrawtx"] = CZMQAbstractNotifier::Create<CZMQPublishRawTransactionNotifier>; 41 : 42 1785 : for (const auto& entry : factories) 43 : { 44 2856 : std::string arg("-zmq" + entry.first); 45 1428 : if (gArgs.IsArgSet(arg)) 46 : { 47 4 : CZMQNotifierFactory factory = entry.second; 48 8 : std::string address = gArgs.GetArg(arg, ""); 49 4 : CZMQAbstractNotifier *notifier = factory(); 50 4 : notifier->SetType(entry.first); 51 4 : notifier->SetAddress(address); 52 4 : notifiers.push_back(notifier); 53 : } 54 : } 55 : 56 357 : if (!notifiers.empty()) 57 : { 58 1 : notificationInterface = new CZMQNotificationInterface(); 59 1 : notificationInterface->notifiers = notifiers; 60 : 61 1 : if (!notificationInterface->Initialize()) 62 : { 63 0 : delete notificationInterface; 64 : notificationInterface = nullptr; 65 : } 66 : } 67 : 68 714 : return notificationInterface; 69 : } 70 : 71 : // Called at startup to conditionally set up ZMQ socket(s) 72 1 : bool CZMQNotificationInterface::Initialize() 73 : { 74 1 : LogPrint(BCLog::ZMQ, "Initialize notification interface\n"); 75 1 : assert(!pcontext); 76 : 77 1 : pcontext = zmq_init(1); 78 : 79 1 : if (!pcontext) 80 : { 81 0 : zmqError("Unable to initialize context"); 82 0 : return false; 83 : } 84 : 85 1 : std::list<CZMQAbstractNotifier*>::iterator i=notifiers.begin(); 86 5 : for (; i!=notifiers.end(); ++i) 87 : { 88 4 : CZMQAbstractNotifier *notifier = *i; 89 4 : if (notifier->Initialize(pcontext)) 90 : { 91 16 : LogPrint(BCLog::ZMQ, "Notifier %s ready (address = %s)\n", notifier->GetType(), notifier->GetAddress()); 92 : } 93 : else 94 : { 95 0 : LogPrint(BCLog::ZMQ, "Notifier %s failed (address = %s)\n", notifier->GetType(), notifier->GetAddress()); 96 : break; 97 : } 98 : } 99 : 100 1 : if (i!=notifiers.end()) 101 : { 102 0 : return false; 103 : } 104 : 105 : return true; 106 : } 107 : 108 : // Called during shutdown sequence 109 1 : void CZMQNotificationInterface::Shutdown() 110 : { 111 1 : LogPrint(BCLog::ZMQ, "Shutdown notification interface\n"); 112 1 : if (pcontext) 113 : { 114 5 : for (std::list<CZMQAbstractNotifier*>::iterator i=notifiers.begin(); i!=notifiers.end(); ++i) 115 : { 116 4 : CZMQAbstractNotifier *notifier = *i; 117 16 : LogPrint(BCLog::ZMQ, "Shutdown notifier %s at %s\n", notifier->GetType(), notifier->GetAddress()); 118 4 : notifier->Shutdown(); 119 : } 120 1 : zmq_ctx_destroy(pcontext); 121 : 122 1 : pcontext = 0; 123 : } 124 1 : } 125 : 126 5 : void CZMQNotificationInterface::UpdatedBlockTip(const CBlockIndex *pindexNew, const CBlockIndex *pindexFork, bool fInitialDownload) 127 : { 128 5 : if (fInitialDownload || pindexNew == pindexFork) // In IBD or blocks were disconnected without any new ones 129 : return; 130 : 131 5 : for (std::list<CZMQAbstractNotifier*>::iterator i = notifiers.begin(); i!=notifiers.end(); ) 132 : { 133 20 : CZMQAbstractNotifier *notifier = *i; 134 20 : if (notifier->NotifyBlock(pindexNew)) 135 : { 136 45 : i++; 137 : } 138 : else 139 : { 140 0 : notifier->Shutdown(); 141 0 : i = notifiers.erase(i); 142 : } 143 : } 144 : } 145 : 146 6 : void CZMQNotificationInterface::TransactionAddedToMempool(const CTransactionRef& ptx) 147 : { 148 : // Used by BlockConnected and BlockDisconnected as well, because they're 149 : // all the same external callback. 150 6 : const CTransaction& tx = *ptx; 151 : 152 6 : for (std::list<CZMQAbstractNotifier*>::iterator i = notifiers.begin(); i!=notifiers.end(); ) 153 : { 154 24 : CZMQAbstractNotifier *notifier = *i; 155 24 : if (notifier->NotifyTransaction(tx)) 156 : { 157 54 : i++; 158 : } 159 : else 160 : { 161 0 : notifier->Shutdown(); 162 0 : i = notifiers.erase(i); 163 : } 164 : } 165 6 : } 166 : 167 5 : void CZMQNotificationInterface::BlockConnected(const std::shared_ptr<const CBlock>& pblock, const CBlockIndex* pindexConnected) 168 : { 169 10 : for (const CTransactionRef& ptx : pblock->vtx) { 170 : // Do a normal notify for each transaction added in the block 171 5 : TransactionAddedToMempool(ptx); 172 : } 173 5 : } 174 : 175 0 : void CZMQNotificationInterface::BlockDisconnected(const std::shared_ptr<const CBlock>& pblock, const uint256& blockHash, int nBlockHeight, int64_t blockTime) 176 : { 177 0 : for (const CTransactionRef& ptx : pblock->vtx) { 178 : // Do a normal notify for each transaction removed in block disconnection 179 0 : TransactionAddedToMempool(ptx); 180 : } 181 0 : }