ZMQ Notification Types
Bitcoin Core provides five ZMQ notification types for monitoring blockchain and mempool events in real time.
All ZMQ messages are multipart messages with three components:
┌─────────────┬──────────────────────────────┬──────────────────────┐
│ Topic       │ Body                         │ Sequence Number      │
│ (string)    │ (binary data)                │ (4-byte LE uint)     │
└─────────────┴──────────────────────────────┴──────────────────────┘
- Topic: String identifying the notification type (e.g., “hashtx”, “rawblock”)
- Body: Binary message payload (format varies by topic)
- Sequence Number: 4-byte little-endian unsigned integer (distinct per topic)
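All transaction and block hashes are published in reversed byte order relative to Bitcoin's internal serialization. This is the same order the RPC interface and block explorers display, so hex-encoding the body as received yields the familiar txid or block hash.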
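The three-part layout described above can be decoded without any ZMQ-specific code once the frames have been received. The following is a minimal sketch, not part of Bitcoin Core; the names `Notification` and `parse_notification` are hypothetical and chosen only for illustration.

```python
import struct
from typing import NamedTuple, Sequence


class Notification(NamedTuple):
    topic: str
    body: bytes
    sequence: int


def parse_notification(frames: Sequence[bytes]) -> Notification:
    """Split a three-part notification into topic, body, and sequence number."""
    if len(frames) != 3:
        raise ValueError(f"expected 3 frames, got {len(frames)}")
    topic, body, seq = frames
    if len(seq) != 4:
        raise ValueError(f"expected a 4-byte sequence frame, got {len(seq)} bytes")
    # "<I" is a little-endian unsigned 32-bit integer.
    (sequence,) = struct.unpack("<I", seq)
    return Notification(topic.decode("ascii"), body, sequence)
```

A receive loop could pass the result of `socket.recv_multipart()` straight to `parse_notification` and then dispatch on the `topic` field.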
hashtx - Transaction Hash Notifications
Configuration
bitcoind -zmqpubhashtx=tcp://127.0.0.1:28332
| Component | Type | Description |
|---|---|---|
| Topic | String | "hashtx" |
| Body | 32 bytes | Transaction hash (reversed byte order) |
| Sequence | 4 bytes | Message sequence number (LE) |
When Triggered
Notifies about transactions in two scenarios:
- When a transaction is added to the mempool
- When a block arrives containing the transaction
Transactions may be published multiple times: once when entering the mempool and again in each block that includes them.
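Because the same transaction can arrive more than once, a consumer that only cares about the first sighting of a txid may want to filter repeats. The sketch below is one possible approach under that assumption; `RecentlySeen` is a hypothetical helper, and the bounded size is an arbitrary choice to keep memory use flat.

```python
from collections import OrderedDict


class RecentlySeen:
    """Bounded set that remembers the most recent `capacity` keys."""

    def __init__(self, capacity: int = 100_000):
        self.capacity = capacity
        self._keys = OrderedDict()

    def first_time(self, key: str) -> bool:
        """Return True the first time `key` is offered, False on repeats."""
        if key in self._keys:
            self._keys.move_to_end(key)  # refresh its position
            return False
        self._keys[key] = None
        if len(self._keys) > self.capacity:
            self._keys.popitem(last=False)  # evict the oldest entry
        return True
```

Note that filtering this way hides the distinction between a mempool announcement and a later block inclusion, so it only suits consumers that do not need that distinction.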
Example Client (Python)
import zmq
import struct
context = zmq.Context()
socket = context.socket(zmq.SUB)
socket.connect("tcp://127.0.0.1:28332")
socket.setsockopt_string(zmq.SUBSCRIBE, "hashtx")
while True:
    topic, body, seq = socket.recv_multipart()
    # Parse the 4-byte little-endian sequence number
    sequence = struct.unpack('<I', seq)[0]
    # The body is already in the byte order used by RPC and block explorers,
    # so hex-encode it without reversing
    txid = body.hex()
    print(f"Transaction {txid} (seq: {sequence})")
hashblock - Block Hash Notifications
Configuration
bitcoind -zmqpubhashblock=tcp://127.0.0.1:28333
| Component | Type | Description |
|---|---|---|
| Topic | String | "hashblock" |
| Body | 32 bytes | Block hash (reversed byte order) |
| Sequence | 4 bytes | Message sequence number (LE) |
When Triggered
Notifies when the chain tip is updated.
AssumeUTXO: When assumeutxo is active, this notification is not issued for historical blocks connected to the background validation chainstate.
Reorganization Behavior
During a blockchain reorganization:
- Only the new tip is notified
- Intermediate blocks are not individually announced
- Clients must retrieve the chain from their last known block to the new tip
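The catch-up step in the last bullet can be sketched as a walk backwards from the new tip until a block the client already knows is reached. This is an illustration only: `blocks_to_fetch` and its `get_header` parameter are hypothetical, and `get_header` is assumed to return a mapping with a `previousblockhash` entry, as the `getblockheader` RPC does.

```python
from typing import Callable, Dict, List, Set


def blocks_to_fetch(
    new_tip: str,
    known: Set[str],
    get_header: Callable[[str], Dict],
    max_depth: int = 1000,
) -> List[str]:
    """Return the hashes between the last known block and new_tip, oldest first."""
    missing: List[str] = []
    cursor = new_tip
    while cursor not in known:
        if len(missing) >= max_depth:
            raise RuntimeError("no known ancestor within max_depth blocks")
        missing.append(cursor)
        # Follow the parent pointer one step back.
        cursor = get_header(cursor).get("previousblockhash")
        if cursor is None:  # reached the genesis block
            break
    missing.reverse()
    return missing
```

After a reorganization the walk ends at the fork point, so the returned list contains only blocks on the new branch. Blocks the client holds from the abandoned branch still need to be discarded separately.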
Example Client (Python)
import zmq
import struct
context = zmq.Context()
socket = context.socket(zmq.SUB)
socket.connect("tcp://127.0.0.1:28333")
socket.setsockopt_string(zmq.SUBSCRIBE, "hashblock")
while True:
    topic, body, seq = socket.recv_multipart()
    sequence = struct.unpack('<I', seq)[0]
    # The body is already in RPC/block-explorer byte order; do not reverse it
    block_hash = body.hex()
    print(f"New block {block_hash} (seq: {sequence})")
rawtx - Raw Transaction Notifications
Configuration
bitcoind -zmqpubrawtx=tcp://127.0.0.1:28334
| Component | Type | Description |
|---|---|---|
| Topic | String | "rawtx" |
| Body | Variable | Serialized transaction (with witness data) |
| Sequence | 4 bytes | Message sequence number (LE) |
When Triggered
Same trigger conditions as hashtx:
- Transaction added to mempool
- Block arrives containing the transaction
Example Client (Python)
import zmq
import struct
from io import BytesIO
context = zmq.Context()
socket = context.socket(zmq.SUB)
socket.connect("tcp://127.0.0.1:28334")
socket.setsockopt_string(zmq.SUBSCRIBE, "rawtx")
while True:
    topic, body, seq = socket.recv_multipart()
    sequence = struct.unpack('<I', seq)[0]
    tx_hex = body.hex()
    print(f"Raw transaction (seq: {sequence}):")
    print(f"  Length: {len(body)} bytes")
    print(f"  Hex: {tx_hex[:80]}...")
    # Optionally parse the transaction with a Bitcoin library. With
    # python-bitcoinlib (an assumption; any deserializer works), a stream
    # such as BytesIO is read with stream_deserialize:
    # tx = CTransaction.stream_deserialize(BytesIO(body))
rawblock - Raw Block Notifications
Configuration
bitcoind -zmqpubrawblock=tcp://127.0.0.1:28335
| Component | Type | Description |
|---|---|---|
| Topic | String | "rawblock" |
| Body | Variable | Serialized block (80-byte header + transactions) |
| Sequence | 4 bytes | Message sequence number (LE) |
When Triggered
Same trigger conditions as hashblock - when the chain tip is updated.
Example Client (Python)
import zmq
import struct
context = zmq.Context()
socket = context.socket(zmq.SUB)
socket.connect("tcp://127.0.0.1:28335")
socket.setsockopt_string(zmq.SUBSCRIBE, "rawblock")
while True:
    topic, body, seq = socket.recv_multipart()
    sequence = struct.unpack('<I', seq)[0]
    # First 80 bytes are the block header
    header = body[:80]
    block_size = len(body)
    print(f"Raw block (seq: {sequence}):")
    print(f"  Size: {block_size} bytes")
    print(f"  Header: {header.hex()}")
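The client above only slices off the 80-byte header. As a further illustration, the header fields can be unpacked with `struct` and the block hash computed as the double SHA-256 of the header. This is a sketch with hypothetical names (`BlockHeader`, `parse_block_header`). Unlike the `hashblock` body, hashes inside a serialized block are stored in internal byte order, so they are reversed here for display.

```python
import hashlib
import struct
from typing import NamedTuple


class BlockHeader(NamedTuple):
    version: int
    prev_hash: str    # display (RPC) byte order
    merkle_root: str  # display (RPC) byte order
    time: int         # Unix timestamp
    bits: int         # compact difficulty target
    nonce: int
    block_hash: str   # display (RPC) byte order


def parse_block_header(raw: bytes) -> BlockHeader:
    """Decode the first 80 bytes of a serialized block."""
    if len(raw) < 80:
        raise ValueError("a block header needs at least 80 bytes")
    header = raw[:80]
    # Little-endian: int32 version, two 32-byte hashes, three uint32 fields.
    version, prev, merkle, time_, bits, nonce = struct.unpack("<i32s32sIII", header)
    digest = hashlib.sha256(hashlib.sha256(header).digest()).digest()
    return BlockHeader(
        version,
        prev[::-1].hex(),
        merkle[::-1].hex(),
        time_,
        bits,
        nonce,
        digest[::-1].hex(),
    )
```

Calling `parse_block_header(body)` inside the receive loop would give a `block_hash` that can be compared with the value published on the `hashblock` topic.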
sequence - Mempool and Block Sequence
Configuration
bitcoind -zmqpubsequence=tcp://127.0.0.1:28336
The sequence topic provides a total ordering of all mempool and block events.
Block Connect/Disconnect
┌────────────┬────────────────┬────────┬──────────────────┐
│ "sequence" │ 32-byte hash   │ 'C/D'  │ 4-byte seq num   │
└────────────┴────────────────┴────────┴──────────────────┘
| Component | Type | Description |
|---|---|---|
| Topic | String | "sequence" |
| Body | 33 bytes | Hash (32 bytes) + Label (1 byte: ‘C’ or ‘D’) |
| Sequence | 4 bytes | Message sequence number (LE) |
- ‘C’: Block with this hash connected
- ‘D’: Block with this hash disconnected
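Since every connect and disconnect is announced, a client can follow a reorganization step by step rather than inferring it from the new tip. The sketch below shows one way to keep a local view of the recent chain from these events; `TipTracker` is a hypothetical name and the list only covers blocks seen since the tracker started.

```python
from typing import List, Optional


class TipTracker:
    """Maintain the recent active chain from 'C' and 'D' events."""

    def __init__(self) -> None:
        self.chain: List[str] = []

    def apply(self, label: str, block_hash: str) -> None:
        if label == "C":
            self.chain.append(block_hash)
        elif label == "D":
            if block_hash in self.chain:
                # Drop the disconnected block and anything recorded above it.
                del self.chain[self.chain.index(block_hash):]
            # Otherwise the block was connected before tracking began; ignore it.
        else:
            raise ValueError(f"not a block event: {label!r}")

    @property
    def tip(self) -> Optional[str]:
        return self.chain[-1] if self.chain else None
```

During a two-block reorganization the tracker would see two 'D' events followed by the 'C' events of the replacement branch, leaving `tip` at the new chain tip.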
Mempool Add/Remove
┌────────────┬────────────────┬────────┬──────────────────┬──────────────────┐
│ "sequence" │ 32-byte hash   │ 'A/R'  │ 8-byte mempool   │ 4-byte seq num   │
│            │                │        │ sequence (LE)    │                  │
└────────────┴────────────────┴────────┴──────────────────┴──────────────────┘
| Component | Type | Description |
|---|---|---|
| Topic | String | "sequence" |
| Body | 41 bytes | Hash (32) + Label (1) + Mempool seq (8) |
| Sequence | 4 bytes | Message sequence number (LE) |
- ‘A’: Transaction with this hash added to mempool
- ‘R’: Transaction with this hash removed from mempool for a reason other than inclusion in a block
Mempool Sequence Number
The 8-byte mempool sequence number provides total ordering of mempool events. This is separate from the ZMQ message sequence number.
Use the sequence topic when you need complete visibility into block reorganizations and mempool changes.
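One use of the mempool sequence number is to reconcile the event stream with a mempool snapshot taken separately. The sketch below assumes the snapshot comes with its own mempool sequence value (for example from an RPC call such as `getrawmempool` with its `mempool_sequence` option). `MempoolView` is a hypothetical name. The sketch does not handle transactions that leave the mempool because they were mined, since those are not signalled by ‘R’.

```python
from typing import Iterable, Set


class MempoolView:
    """Local set of mempool txids, updated from 'A' and 'R' events."""

    def __init__(self, txids: Iterable[str], snapshot_seq: int):
        self.txids: Set[str] = set(txids)
        self.snapshot_seq = snapshot_seq

    def apply(self, label: str, txid: str, mempool_seq: int) -> bool:
        """Apply one event; return False if it predates the snapshot."""
        if mempool_seq < self.snapshot_seq:
            return False  # already reflected in the snapshot
        # add/discard are idempotent, so replaying the boundary event is harmless.
        if label == "A":
            self.txids.add(txid)
        elif label == "R":
            self.txids.discard(txid)
        else:
            raise ValueError(f"not a mempool event: {label!r}")
        return True
```

Events buffered while the snapshot was being fetched can be replayed through `apply`; stale ones are skipped and the rest bring the view up to date.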
Example Client (Python)
import zmq
import struct
context = zmq.Context()
socket = context.socket(zmq.SUB)
socket.connect("tcp://127.0.0.1:28336")
socket.setsockopt_string(zmq.SUBSCRIBE, "sequence")
while True:
    topic, body, seq = socket.recv_multipart()
    sequence = struct.unpack('<I', seq)[0]
    # First 32 bytes: hash, already in RPC/block-explorer byte order
    hash_hex = body[:32].hex()
    # Parse label
    label = chr(body[32])
    # Parse mempool sequence if present (tx events only)
    mempool_seq = None
    if len(body) == 41:  # Hash + label + mempool sequence
        mempool_seq = struct.unpack('<Q', body[33:41])[0]
    event_type = {
        'C': 'Block Connected',
        'D': 'Block Disconnected',
        'A': 'TX Added to Mempool',
        'R': 'TX Removed from Mempool'
    }.get(label, 'Unknown')
    print(f"[{sequence}] {event_type}: {hash_hex}", end="")
    if mempool_seq is not None:
        print(f" (mempool seq: {mempool_seq})")
    else:
        print()
Multi-Topic Subscriber
Subscribe to multiple topics on a single socket. This assumes bitcoind publishes all of the subscribed topics on the same endpoint:
import zmq
import struct
context = zmq.Context()
socket = context.socket(zmq.SUB)
# Set receive high water mark (0 = unlimited) before connecting;
# some libzmq versions apply it only to connections made afterwards
socket.setsockopt(zmq.RCVHWM, 0)
# Connect to shared endpoint
socket.connect("tcp://127.0.0.1:28332")
# Subscribe to multiple topics
socket.setsockopt_string(zmq.SUBSCRIBE, "hashblock")
socket.setsockopt_string(zmq.SUBSCRIBE, "hashtx")
socket.setsockopt_string(zmq.SUBSCRIBE, "sequence")
while True:
    topic, body, seq = socket.recv_multipart()
    sequence = struct.unpack('<I', seq)[0]
    topic_str = topic.decode('utf-8')
    # Hashes arrive in RPC/block-explorer byte order; hex-encode them as-is
    if topic_str == "hashblock":
        block_hash = body.hex()
        print(f"Block: {block_hash}")
    elif topic_str == "hashtx":
        txid = body.hex()
        print(f"TX: {txid}")
    elif topic_str == "sequence":
        hash_hex = body[:32].hex()
        label = chr(body[32])
        print(f"Sequence: {label} {hash_hex}")
Async Example (Python asyncio)
Based on Bitcoin Core’s contrib/zmq/zmq_sub.py:
import asyncio
import zmq
import zmq.asyncio
import struct
class ZMQHandler:
    def __init__(self, endpoint="tcp://127.0.0.1:28332"):
        # Create the loop explicitly: asyncio.get_event_loop() is deprecated
        # (and an error in newer Python versions) when no loop has been set
        self.loop = asyncio.new_event_loop()
        asyncio.set_event_loop(self.loop)
        self.zmqContext = zmq.asyncio.Context()
        self.zmqSubSocket = self.zmqContext.socket(zmq.SUB)
        self.zmqSubSocket.setsockopt(zmq.RCVHWM, 0)
        # Subscribe to all topics
        for topic in ["hashblock", "hashtx", "rawblock", "rawtx", "sequence"]:
            self.zmqSubSocket.setsockopt_string(zmq.SUBSCRIBE, topic)
        self.zmqSubSocket.connect(endpoint)
    async def handle(self):
        topic, body, seq = await self.zmqSubSocket.recv_multipart()
        sequence = struct.unpack('<I', seq)[0]
        topic_str = topic.decode('utf-8')
        if topic_str == "hashblock":
            print(f"[HASHBLOCK {sequence}] {body.hex()}")
        elif topic_str == "hashtx":
            print(f"[HASHTX {sequence}] {body.hex()}")
        elif topic_str == "rawblock":
            print(f"[RAWBLOCK {sequence}] {len(body)} bytes")
        elif topic_str == "rawtx":
            print(f"[RAWTX {sequence}] {body.hex()}")
        elif topic_str == "sequence":
            hash_hex = body[:32].hex()
            label = chr(body[32])
            mempool_seq = None
            if len(body) == 41:
                mempool_seq = struct.unpack("<Q", body[33:41])[0]
            print(f"[SEQUENCE {sequence}] {label} {hash_hex} {mempool_seq}")
        # Schedule next message
        asyncio.ensure_future(self.handle())
    def start(self):
        self.loop.create_task(self.handle())
        self.loop.run_forever()
# Run the handler
handler = ZMQHandler()
handler.start()
Best Practices
Error Handling
import zmq
import struct
import time
def connect_with_retry(endpoint, max_retries=5):
    context = zmq.Context()
    socket = context.socket(zmq.SUB)
    for attempt in range(max_retries):
        try:
            # connect() returns immediately; ZeroMQ establishes (and later
            # re-establishes) the TCP connection in the background, so success
            # here does not prove that bitcoind is reachable
            socket.connect(endpoint)
            socket.setsockopt_string(zmq.SUBSCRIBE, "hashtx")
            print(f"Connecting to {endpoint}")
            return socket
        except zmq.ZMQError as e:
            # Raised for local errors such as a malformed endpoint
            print(f"Connection attempt {attempt + 1} failed: {e}")
            time.sleep(2 ** attempt)  # Exponential backoff
    raise ConnectionError(f"Failed to connect after {max_retries} attempts")
Sequence Gap Detection
import struct
class SequenceTracker:
    def __init__(self):
        self.last_seq = {}
    def check(self, topic, seq):
        gap = 0
        if topic in self.last_seq:
            expected = (self.last_seq[topic] + 1) % (2**32)
            if seq != expected:
                gap = (seq - expected) % (2**32)
                print(f"WARNING: Lost {gap} messages on {topic}")
        # Always record the latest value, even after a gap, so a single loss
        # is not reported again on every following message
        self.last_seq[topic] = seq
        return gap
# Usage (socket is a connected SUB socket, as in the examples above)
tracker = SequenceTracker()
while True:
    topic, body, seq = socket.recv_multipart()
    sequence = struct.unpack('<I', seq)[0]
    topic_str = topic.decode('utf-8')
    tracker.check(topic_str, sequence)
    # Process message...
Multiple Endpoints
Configure different topics on different endpoints for load distribution:
# bitcoin.conf
zmqpubhashtx=tcp://127.0.0.1:28332
zmqpubhashblock=tcp://127.0.0.1:28333
zmqpubrawtx=tcp://127.0.0.1:28334
zmqpubrawblock=tcp://127.0.0.1:28335
zmqpubsequence=tcp://127.0.0.1:28336
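With topics spread over several endpoints, a client needs to know which topics to subscribe to on which connection. One way to derive that, sketched here under the assumption that the client can read the same configuration text, is to group the `zmqpub<topic>` lines by endpoint. `topics_by_endpoint` is a hypothetical helper and a deliberately simple parser; it ignores config sections and skips the separate `zmqpub<topic>hwm` options.

```python
from collections import defaultdict
from typing import Dict, List


def topics_by_endpoint(conf_text: str) -> Dict[str, List[str]]:
    """Group zmqpub<topic>=<endpoint> lines by endpoint."""
    grouped = defaultdict(list)
    for line in conf_text.splitlines():
        line = line.split("#", 1)[0].strip()  # drop comments and whitespace
        if not line.startswith("zmqpub") or "=" not in line:
            continue
        key, endpoint = (part.strip() for part in line.split("=", 1))
        if key.endswith("hwm"):
            continue  # high water mark setting, not an endpoint
        grouped[endpoint].append(key[len("zmqpub"):])
    return dict(grouped)
```

Each key of the result is an address to pass to `socket.connect`, and each value lists the topic strings to subscribe to on that socket.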