Line data Source code
1 : // Copyright (c) 2015 The Bitcoin Core developers
2 : // Distributed under the MIT software license, see the accompanying
3 : // file COPYING or http://www.opensource.org/licenses/mit-license.php.
4 :
5 : #include "zmqpublishnotifier.h"
6 :
7 : #include "chainparams.h"
8 : #include "util/system.h"
9 : #include "crypto/common.h"
10 : #include "validation.h" // cs_main
11 :
// Registry keyed by bind address: one entry per publish notifier registered on
// that address. Initialize() adds entries (and reuses the socket of an existing
// entry for the same address); Shutdown() removes them.
12 : static std::multimap<std::string, CZMQAbstractPublishNotifier*> mapPublishNotifiers;
13 :
// Topic strings sent as the first frame of each published message.
// Both the characters and the pointers are const so the topics cannot be
// re-pointed at runtime.
static const char* const MSG_HASHBLOCK = "hashblock";
static const char* const MSG_HASHTX    = "hashtx";
static const char* const MSG_RAWBLOCK  = "rawblock";
static const char* const MSG_RAWTX     = "rawtx";
18 :
19 : // Internal function to send multipart message
20 22 : static int zmq_send_multipart(void *sock, const void* data, size_t size, ...)
21 : {
22 22 : va_list args;
23 22 : va_start(args, size);
24 :
25 110 : while (1)
26 : {
27 66 : zmq_msg_t msg;
28 :
29 66 : int rc = zmq_msg_init_size(&msg, size);
30 66 : if (rc != 0)
31 : {
32 0 : zmqError("Unable to initialize ZMQ msg");
33 0 : va_end(args);
34 0 : return -1;
35 : }
36 :
37 66 : void *buf = zmq_msg_data(&msg);
38 66 : memcpy(buf, data, size);
39 :
40 66 : data = va_arg(args, const void*);
41 :
42 88 : rc = zmq_msg_send(&msg, sock, data ? ZMQ_SNDMORE : 0);
43 66 : if (rc == -1)
44 : {
45 0 : zmqError("Unable to send ZMQ msg");
46 0 : zmq_msg_close(&msg);
47 0 : va_end(args);
48 0 : return -1;
49 : }
50 :
51 66 : zmq_msg_close(&msg);
52 :
53 66 : if (!data)
54 : break;
55 :
56 44 : size = va_arg(args, size_t);
57 44 : }
58 22 : va_end(args);
59 22 : return 0;
60 : }
61 :
62 4 : bool CZMQAbstractPublishNotifier::Initialize(void *pcontext)
63 : {
64 4 : assert(!psocket);
65 :
66 : // check if address is being used by other publish notifier
67 4 : std::multimap<std::string, CZMQAbstractPublishNotifier*>::iterator i = mapPublishNotifiers.find(address);
68 :
69 4 : if (i==mapPublishNotifiers.end())
70 : {
71 1 : psocket = zmq_socket(pcontext, ZMQ_PUB);
72 1 : if (!psocket)
73 : {
74 0 : zmqError("Failed to create socket");
75 0 : return false;
76 : }
77 :
78 1 : int rc = zmq_bind(psocket, address.c_str());
79 1 : if (rc!=0)
80 : {
81 0 : zmqError("Failed to bind address");
82 0 : zmq_close(psocket);
83 0 : return false;
84 : }
85 :
86 : // register this notifier for the address, so it can be reused for other publish notifier
87 1 : mapPublishNotifiers.emplace(address, this);
88 1 : return true;
89 : }
90 : else
91 : {
92 3 : LogPrint(BCLog::ZMQ, "Reusing socket for address %s\n", address);
93 :
94 3 : psocket = i->second->psocket;
95 3 : mapPublishNotifiers.emplace(address, this);
96 :
97 3 : return true;
98 : }
99 : }
100 :
101 4 : void CZMQAbstractPublishNotifier::Shutdown()
102 : {
103 4 : assert(psocket);
104 :
105 4 : int count = mapPublishNotifiers.count(address);
106 :
107 : // remove this notifier from the list of publishers using this address
108 4 : typedef std::multimap<std::string, CZMQAbstractPublishNotifier*>::iterator iterator;
109 4 : std::pair<iterator, iterator> iterpair = mapPublishNotifiers.equal_range(address);
110 :
111 4 : for (iterator it = iterpair.first; it != iterpair.second; ++it)
112 : {
113 4 : if (it->second==this)
114 : {
115 4 : mapPublishNotifiers.erase(it);
116 : break;
117 : }
118 : }
119 :
120 4 : if (count == 1)
121 : {
122 1 : LogPrint(BCLog::ZMQ, "Close socket at address %s\n", address);
123 1 : int linger = 0;
124 1 : zmq_setsockopt(psocket, ZMQ_LINGER, &linger, sizeof(linger));
125 1 : zmq_close(psocket);
126 : }
127 :
128 4 : psocket = 0;
129 4 : }
130 :
131 22 : bool CZMQAbstractPublishNotifier::SendMessage(const char *command, const void* data, size_t size)
132 : {
133 22 : assert(psocket);
134 :
135 : /* send three parts, command & data & a LE 4byte sequence number */
136 22 : unsigned char msgseq[sizeof(uint32_t)];
137 22 : WriteLE32(&msgseq[0], nSequence);
138 22 : int rc = zmq_send_multipart(psocket, command, strlen(command), data, size, msgseq, (size_t)sizeof(uint32_t), (void*)0);
139 22 : if (rc == -1)
140 : return false;
141 :
142 : /* increment memory only sequence number after sending */
143 22 : nSequence++;
144 :
145 22 : return true;
146 : }
147 :
148 5 : bool CZMQPublishHashBlockNotifier::NotifyBlock(const CBlockIndex *pindex)
149 : {
150 5 : uint256 hash = pindex->GetBlockHash();
151 10 : LogPrint(BCLog::ZMQ, "Publish hashblock %s\n", hash.GetHex());
152 : char data[32];
153 165 : for (unsigned int i = 0; i < 32; i++)
154 160 : data[31 - i] = hash.begin()[i];
155 5 : return SendMessage(MSG_HASHBLOCK, data, 32);
156 : }
157 :
158 6 : bool CZMQPublishHashTransactionNotifier::NotifyTransaction(const CTransaction &transaction)
159 : {
160 6 : uint256 hash = transaction.GetHash();
161 12 : LogPrint(BCLog::ZMQ, "Publish hashtx %s\n", hash.GetHex());
162 : char data[32];
163 198 : for (unsigned int i = 0; i < 32; i++)
164 192 : data[31 - i] = hash.begin()[i];
165 6 : return SendMessage(MSG_HASHTX, data, 32);
166 : }
167 :
168 5 : bool CZMQPublishRawBlockNotifier::NotifyBlock(const CBlockIndex *pindex)
169 : {
170 10 : LogPrint(BCLog::ZMQ, "Publish rawblock %s\n", pindex->GetBlockHash().GetHex());
171 :
172 10 : CDataStream ss(SER_NETWORK, PROTOCOL_VERSION);
173 5 : {
174 5 : LOCK(cs_main);
175 10 : CBlock block;
176 5 : if(!ReadBlockFromDisk(block, pindex))
177 : {
178 0 : zmqError("Can't read block from disk");
179 0 : return false;
180 : }
181 :
182 5 : ss << block;
183 : }
184 :
185 5 : return SendMessage(MSG_RAWBLOCK, &(*ss.begin()), ss.size());
186 : }
187 :
188 6 : bool CZMQPublishRawTransactionNotifier::NotifyTransaction(const CTransaction &transaction)
189 : {
190 6 : uint256 hash = transaction.GetHash();
191 12 : LogPrint(BCLog::ZMQ, "Publish rawtx %s\n", hash.GetHex());
192 6 : CDataStream ss(SER_NETWORK, PROTOCOL_VERSION);
193 6 : ss << transaction;
194 6 : return SendMessage(MSG_RAWTX, &(*ss.begin()), ss.size());
195 : }
|