Skip to main content

ZMQ Notification Types

Bitcoin Core provides five ZMQ notification types for monitoring blockchain and mempool events in real-time.

Message Format

All ZMQ messages are multipart messages with three components:
┌─────────────┬──────────────────────────────┬──────────────────────┐
│   Topic     │            Body              │   Sequence Number    │
│  (string)   │      (binary data)           │  (4-byte LE uint)    │
└─────────────┴──────────────────────────────┴──────────────────────┘
  • Topic: String identifying the notification type (e.g., “hashtx”, “rawblock”)
  • Body: Binary message payload (format varies by topic)
  • Sequence Number: 4-byte little-endian unsigned integer (distinct per topic)
All transaction and block hashes are in reversed byte order (same format as RPC interface and block explorers).

hashtx - Transaction Hash Notifications

Configuration

bitcoind -zmqpubhashtx=tcp://127.0.0.1:28332

Message Format

ComponentTypeDescription
TopicString"hashtx"
Body32 bytesTransaction hash (reversed byte order)
Sequence4 bytesMessage sequence number (LE)

When Triggered

Notifies about transactions in two scenarios:
  1. When a transaction is added to the mempool
  2. When a block arrives containing the transaction
Transactions may be published multiple times: once when entering the mempool and again in each block that includes them.

Example Client (Python)

import zmq
import struct

context = zmq.Context()
socket = context.socket(zmq.SUB)
socket.connect("tcp://127.0.0.1:28332")
socket.setsockopt_string(zmq.SUBSCRIBE, "hashtx")

while True:
    topic, body, seq = socket.recv_multipart()
    
    # Parse sequence number
    sequence = struct.unpack('<I', seq)[0]
    
    # Hash is in reversed byte order
    txid = body[::-1].hex()
    
    print(f"Transaction {txid} (seq: {sequence})")

hashblock - Block Hash Notifications

Configuration

bitcoind -zmqpubhashblock=tcp://127.0.0.1:28333

Message Format

ComponentTypeDescription
TopicString"hashblock"
Body32 bytesBlock hash (reversed byte order)
Sequence4 bytesMessage sequence number (LE)

When Triggered

Notifies when the chain tip is updated.
AssumeUTXO: When assumeutxo is active, this notification is not issued for historical blocks connected to the background validation chainstate.

Reorganization Behavior

During a blockchain reorganization:
  • Only the new tip is notified
  • Intermediate blocks are not individually announced
  • Clients must retrieve the chain from their last known block to the new tip

Example Client (Python)

import zmq
import struct

context = zmq.Context()
socket = context.socket(zmq.SUB)
socket.connect("tcp://127.0.0.1:28333")
socket.setsockopt_string(zmq.SUBSCRIBE, "hashblock")

while True:
    topic, body, seq = socket.recv_multipart()
    
    sequence = struct.unpack('<I', seq)[0]
    block_hash = body[::-1].hex()
    
    print(f"New block {block_hash} (seq: {sequence})")

rawtx - Raw Transaction Notifications

Configuration

bitcoind -zmqpubrawtx=tcp://127.0.0.1:28334

Message Format

ComponentTypeDescription
TopicString"rawtx"
BodyVariableSerialized transaction (with witness data)
Sequence4 bytesMessage sequence number (LE)

When Triggered

Same trigger conditions as hashtx:
  1. Transaction added to mempool
  2. Block arrives containing the transaction

Example Client (Python)

import zmq
import struct
from io import BytesIO

context = zmq.Context()
socket = context.socket(zmq.SUB)
socket.connect("tcp://127.0.0.1:28334")
socket.setsockopt_string(zmq.SUBSCRIBE, "rawtx")

while True:
    topic, body, seq = socket.recv_multipart()
    
    sequence = struct.unpack('<I', seq)[0]
    tx_hex = body.hex()
    
    print(f"Raw transaction (seq: {sequence}):")
    print(f"  Length: {len(body)} bytes")
    print(f"  Hex: {tx_hex[:80]}...")
    
    # Parse transaction using bitcoin library
    # tx = CTransaction.deserialize(BytesIO(body))

rawblock - Raw Block Notifications

Configuration

bitcoind -zmqpubrawblock=tcp://127.0.0.1:28335

Message Format

ComponentTypeDescription
TopicString"rawblock"
BodyVariableSerialized block (80-byte header + transactions)
Sequence4 bytesMessage sequence number (LE)

When Triggered

Same trigger conditions as hashblock - when the chain tip is updated.

Example Client (Python)

import zmq
import struct

context = zmq.Context()
socket = context.socket(zmq.SUB)
socket.connect("tcp://127.0.0.1:28335")
socket.setsockopt_string(zmq.SUBSCRIBE, "rawblock")

while True:
    topic, body, seq = socket.recv_multipart()
    
    sequence = struct.unpack('<I', seq)[0]
    
    # First 80 bytes are the block header
    header = body[:80]
    block_size = len(body)
    
    print(f"Raw block (seq: {sequence}):")
    print(f"  Size: {block_size} bytes")
    print(f"  Header: {header.hex()}")

sequence - Mempool and Block Sequence

Configuration

bitcoind -zmqpubsequence=tcp://127.0.0.1:28336

Message Format

The sequence topic provides a total ordering of all mempool and block events.

Block Connect/Disconnect

┌──────────┬────────────────┬────────┬──────────────────┐
│ "sequence" │ 32-byte hash │  'C/D' │ 4-byte seq num   │
└──────────┴────────────────┴────────┴──────────────────┘
ComponentTypeDescription
TopicString"sequence"
Body33 bytesHash (32 bytes) + Label (1 byte: ‘C’ or ‘D’)
Sequence4 bytesMessage sequence number (LE)
  • ‘C’: Block with this hash connected
  • ‘D’: Block with this hash disconnected

Mempool Add/Remove

┌──────────┬────────────────┬────────┬──────────────────┬──────────────────┐
│ "sequence" │ 32-byte hash │  'A/R' │ 8-byte mempool   │ 4-byte seq num   │
│          │              │        │ sequence (LE)    │                  │
└──────────┴────────────────┴────────┴──────────────────┴──────────────────┘
ComponentTypeDescription
TopicString"sequence"
Body41 bytesHash (32) + Label (1) + Mempool seq (8)
Sequence4 bytesMessage sequence number (LE)
  • ‘A’: Transaction with this hash added to mempool
  • ‘R’: Transaction with this hash removed from mempool (non-block reason)

Mempool Sequence Number

The 8-byte mempool sequence number provides total ordering of mempool events. This is separate from the ZMQ message sequence number.
Use the sequence topic when you need complete visibility into block reorganizations and mempool changes.

Example Client (Python)

import zmq
import struct

context = zmq.Context()
socket = context.socket(zmq.SUB)
socket.connect("tcp://127.0.0.1:28336")
socket.setsockopt_string(zmq.SUBSCRIBE, "sequence")

while True:
    topic, body, seq = socket.recv_multipart()
    
    sequence = struct.unpack('<I', seq)[0]
    
    # Parse hash (reversed byte order)
    hash_hex = body[:32][::-1].hex()
    
    # Parse label
    label = chr(body[32])
    
    # Parse mempool sequence if present (tx events only)
    mempool_seq = None
    if len(body) == 41:  # Hash + label + mempool sequence
        mempool_seq = struct.unpack('<Q', body[33:41])[0]
    
    event_type = {
        'C': 'Block Connected',
        'D': 'Block Disconnected',
        'A': 'TX Added to Mempool',
        'R': 'TX Removed from Mempool'
    }.get(label, 'Unknown')
    
    print(f"[{sequence}] {event_type}: {hash_hex}", end="")
    if mempool_seq is not None:
        print(f" (mempool seq: {mempool_seq})")
    else:
        print()

Multi-Topic Subscriber

Subscribe to multiple topics on a single socket:
import zmq
import struct

context = zmq.Context()
socket = context.socket(zmq.SUB)

# Connect to shared endpoint
socket.connect("tcp://127.0.0.1:28332")

# Subscribe to multiple topics
socket.setsockopt_string(zmq.SUBSCRIBE, "hashblock")
socket.setsockopt_string(zmq.SUBSCRIBE, "hashtx")
socket.setsockopt_string(zmq.SUBSCRIBE, "sequence")

# Set receive high water mark (0 = unlimited)
socket.setsockopt(zmq.RCVHWM, 0)

while True:
    topic, body, seq = socket.recv_multipart()
    sequence = struct.unpack('<I', seq)[0]
    topic_str = topic.decode('utf-8')
    
    if topic_str == "hashblock":
        block_hash = body[::-1].hex()
        print(f"Block: {block_hash}")
    
    elif topic_str == "hashtx":
        txid = body[::-1].hex()
        print(f"TX: {txid}")
    
    elif topic_str == "sequence":
        hash_hex = body[:32][::-1].hex()
        label = chr(body[32])
        print(f"Sequence: {label} {hash_hex}")

Async Example (Python asyncio)

Based on Bitcoin Core’s contrib/zmq/zmq_sub.py:
import asyncio
import zmq
import zmq.asyncio
import struct

class ZMQHandler:
    def __init__(self, endpoint="tcp://127.0.0.1:28332"):
        self.loop = asyncio.get_event_loop()
        self.zmqContext = zmq.asyncio.Context()
        
        self.zmqSubSocket = self.zmqContext.socket(zmq.SUB)
        self.zmqSubSocket.setsockopt(zmq.RCVHWM, 0)
        
        # Subscribe to all topics
        for topic in ["hashblock", "hashtx", "rawblock", "rawtx", "sequence"]:
            self.zmqSubSocket.setsockopt_string(zmq.SUBSCRIBE, topic)
        
        self.zmqSubSocket.connect(endpoint)
    
    async def handle(self):
        topic, body, seq = await self.zmqSubSocket.recv_multipart()
        
        sequence = struct.unpack('<I', seq)[0]
        topic_str = topic.decode('utf-8')
        
        if topic_str == "hashblock":
            print(f"[HASHBLOCK {sequence}] {body.hex()}")
        
        elif topic_str == "hashtx":
            print(f"[HASHTX {sequence}] {body.hex()}")
        
        elif topic_str == "rawblock":
            print(f"[RAWBLOCK {sequence}] {len(body)} bytes")
        
        elif topic_str == "rawtx":
            print(f"[RAWTX {sequence}] {body.hex()}")
        
        elif topic_str == "sequence":
            hash_hex = body[:32].hex()
            label = chr(body[32])
            mempool_seq = None
            if len(body) == 41:
                mempool_seq = struct.unpack("<Q", body[33:41])[0]
            print(f"[SEQUENCE {sequence}] {label} {hash_hex} {mempool_seq}")
        
        # Schedule next message
        asyncio.ensure_future(self.handle())
    
    def start(self):
        self.loop.create_task(self.handle())
        self.loop.run_forever()

# Run the handler
handler = ZMQHandler()
handler.start()

Best Practices

Error Handling

import zmq
import struct
import time

def connect_with_retry(endpoint, max_retries=5):
    context = zmq.Context()
    socket = context.socket(zmq.SUB)
    
    for attempt in range(max_retries):
        try:
            socket.connect(endpoint)
            socket.setsockopt_string(zmq.SUBSCRIBE, "hashtx")
            print(f"Connected to {endpoint}")
            return socket
        except zmq.ZMQError as e:
            print(f"Connection attempt {attempt + 1} failed: {e}")
            time.sleep(2 ** attempt)  # Exponential backoff
    
    raise Exception(f"Failed to connect after {max_retries} attempts")

Sequence Gap Detection

class SequenceTracker:
    def __init__(self):
        self.last_seq = {}
    
    def check(self, topic, seq):
        if topic in self.last_seq:
            expected = (self.last_seq[topic] + 1) % (2**32)
            if seq != expected:
                gap = (seq - expected) % (2**32)
                print(f"WARNING: Lost {gap} messages on {topic}")
                return gap
        self.last_seq[topic] = seq
        return 0

# Usage
tracker = SequenceTracker()

while True:
    topic, body, seq = socket.recv_multipart()
    sequence = struct.unpack('<I', seq)[0]
    topic_str = topic.decode('utf-8')
    
    tracker.check(topic_str, sequence)
    # Process message...

Multiple Endpoints

Configure different topics on different endpoints for load distribution:
# bitcoin.conf
zmqpubhashtx=tcp://127.0.0.1:28332
zmqpubhashblock=tcp://127.0.0.1:28333
zmqpubrawtx=tcp://127.0.0.1:28334
zmqpubrawblock=tcp://127.0.0.1:28335
zmqpubsequence=tcp://127.0.0.1:28336

ZMQ Overview

Learn about the ZMQ notification system architecture

RPC Reference

Explore Bitcoin Core’s JSON-RPC interface