Lab 3 — Build a Custom RPC Endpoint on Reth
Question
You need an RPC method that doesn't exist in standard Ethereum — e.g. tempo_getPaymentReceipts(merchant: Address). Reth makes adding one a 50-line drop-in. Register an RPC trait, implement the method, the node serves it.
Principle (minimum model)
- Reth RPC trait registration. The `#[rpc(server)]` macro on a trait generates the RPC server boilerplate; `impl YourRpcServer for YourImpl { ... }` provides the body.
- NodeBuilder integration. `NodeBuilder::extend_rpc_modules(|ctx| { ctx.modules.merge_configured(your_rpc.into_rpc())?; Ok(()) })` registers the custom RPC at node startup.
- Access to state inside the RPC. Inject `Arc<dyn StateProvider>` into your impl. Use `provider.basic_account(&addr)?` / `provider.storage(addr, slot)?` to read state.
- Async vs sync RPC methods. `#[rpc(server)]` supports both. Async methods use `Box<dyn Future>` under the hood; sync methods are direct calls.
- Test the RPC over the wire. Spin up the node in a test, call the RPC via `reqwest::Client::new().post(url).json(&body).send()`, assert the response.
- Common custom RPCs. Payment receipt fetcher (Tempo) / MEV-bundle simulator (Flashbots) / lending protocol-specific queries / app-chain-specific batched reads.
Worked example + steps
Build a Custom RPC Endpoint on Reth
You need a single API call that returns a histogram of pending tx gas prices for your fee-bidding bot. The standard txpool_content returns every pending tx in full — hundreds of KB you'd reduce to 10 numbers anyway. The right move: add a server-side method that does the aggregation inside the node and ships back the histogram. ~50 lines of Rust. No Reth fork. Live on the same HTTP / WebSocket / IPC endpoints as the native namespaces (eth_*, net_*, debug_*, txpool_*, …).
📌 Scope honesty. We add one read-only method (`txpoolPlus_pendingByGasBucket`) that aggregates the local mempool into 10 gas-price buckets. We don't cover authentication, rate-limiting, or write methods; those are the same patterns layered on top. The architecture lesson is "how does the trait get wired in?"
📚 See also. QuickNode's How to Build Custom RPC Methods with Reth covers the foundation of registering a custom RPC trait. We build on top of it here with server-side aggregation, a subscription variant, and the production gaps a real custom RPC has to close.
Acceptance criteria
The lesson is complete when these tests pass (full code at the end in §Test gate):
- `returns_buckets_for_known_state`: boot the node in-process, seed the mempool with fixture txs, call `txpoolPlus_pendingByGasBucket` over HTTP, assert bucket count and total tx count.
- `rejects_invalid_bucket_count`: a bad parameter returns the right JSON-RPC error code (`-32602` Invalid params, not `-32603` Internal error).
- `subscription_does_not_leak_on_disconnect`: open a subscription, drop the client, assert the spawned task exits.
Test-first reading. The walkthrough below shows the trait registration, parameter handling, and subscription pattern you'll exercise in these tests.
What you'll build
A new RPC method, callable like any other:
$ cast rpc txpoolPlus_pendingByGasBucket
[
{"min_gwei": 0, "max_gwei": 1, "count": 12},
{"min_gwei": 1, "max_gwei": 5, "count": 47},
{"min_gwei": 5, "max_gwei": 10, "count": 89},
{"min_gwei": 10, "max_gwei": 20, "count": 134},
{"min_gwei": 20, "max_gwei": 30, "count": 56},
{"min_gwei": 30, "max_gwei": 50, "count": 21},
{"min_gwei": 50, "max_gwei": 100, "count": 8},
{"min_gwei": 100, "max_gwei": 250, "count": 2},
{"min_gwei": 250, "max_gwei": 500, "count": 0},
{"min_gwei": 500, "max_gwei": 0, "count": 1}
]
Useful for: gas-price oracles, dashboard widgets, fee-bidding strategies, MEV searchers (bid above the 90th percentile of pending priority fees).
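To make the "bid above the 90th percentile" use case concrete, here is a minimal sketch of how a consumer could locate the percentile bucket in the histogram returned above. It uses only the standard library; `percentile_bucket` and the nearest-rank rounding are assumptions of this sketch, not part of the lesson's node code, and the local `GasBucket` simply mirrors the JSON shape.

```rust
/// One histogram row, mirroring the JSON shape shown above.
#[derive(Debug, Clone, PartialEq)]
pub struct GasBucket {
    pub min_gwei: u64,
    pub max_gwei: u64, // 0 marks the open-ended top bucket
    pub count: usize,
}

/// Hypothetical helper: returns the bucket holding the `pct`-th percentile
/// (1..=100) of pending txs, using the nearest-rank method.
/// Returns `None` for an empty histogram or an out-of-range `pct`.
pub fn percentile_bucket(buckets: &[GasBucket], pct: u32) -> Option<&GasBucket> {
    if pct == 0 || pct > 100 {
        return None;
    }
    let total: usize = buckets.iter().map(|b| b.count).sum();
    if total == 0 {
        return None;
    }
    // Rank of the percentile tx, rounded up.
    let rank = (total * pct as usize + 99) / 100;
    let mut seen = 0usize;
    for b in buckets {
        seen += b.count;
        if seen >= rank {
            return Some(b);
        }
    }
    None
}
```

With the sample counts above (370 pending txs), the 90th percentile falls in the 20 to 30 gwei bucket, so a bidder following this rule would bid above that bucket's upper edge.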
flowchart LR
Client["RPC client<br/>(cast / dapp / dashboard)"] -->|JSON-RPC POST| Handler["jsonrpsee handler"]
Handler -->|read snapshot| Pool["TransactionPool<br/>(in-process)"]
Pool -->|all_transactions| Bucket["Bucket math<br/>(10 ranges)"]
Bucket -->|JSON| Client
Why a custom RPC, not a workaround
| Approach | Latency | Payload | Effort |
|---|---|---|---|
| Call txpool_content and aggregate client-side | RPC roundtrip + transfer all txs | hundreds of KB | trivial |
| Run an external indexer subscribed to mempool | µs per query (in-mem) | small | days of glue + ops |
| Custom RPC method | µs (in-process snapshot) | bytes | ~50 lines once |
The custom RPC sits in the sweet spot: latency of an indexer, payload of an aggregation, effort of a couple of pages of Rust. And it ships as part of the node — no extra service, no separate deployment, no port to expose.
Cargo.toml
[package]
name = "txpool-plus"
version = "0.1.0"
edition = "2021"
[dependencies]
# Reth — pin to a tag in production
reth = { git = "https://github.com/paradigmxyz/reth", tag = "v1.5.0" }
reth-ethereum = { git = "https://github.com/paradigmxyz/reth", tag = "v1.5.0" }
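# jsonrpsee: Reth's RPC framework. Keep this version in lockstep with the one
# your pinned Reth tag depends on (check Reth's own Cargo.toml); a mismatch
# makes into_rpc() produce a module type that merge_configured won't accept.
jsonrpsee = { version = "0.24", features = ["server", "macros"] }
# CLI flag wiring
clap = { version = "4", features = ["derive"] }
# Plumbing
serde = { version = "1", features = ["derive"] }
serde_json = "1" # used by the subscription in Step 5
tokio = { version = "1", features = ["macros", "rt-multi-thread", "time"] } # "time" for sleep in Step 5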
Reth's RPC stack is built on `jsonrpsee`. Your custom methods live in the same process, share the same listeners, and use the same auth as the natives. Don't try to start a parallel `jsonrpsee` server; let `extend_rpc_modules` register into Reth's existing one.
Step 1: Define the RPC trait
jsonrpsee uses a procedural macro to generate the RPC plumbing from a trait. You write the trait shape; with `server` it generates the server stub and the parameter and response serialization (add `client` to the attribute if you also want a typed client stub):
use jsonrpsee::{core::RpcResult, proc_macros::rpc};
use serde::Serialize;

#[derive(Debug, Clone, Serialize)]
pub struct GasBucket {
    pub min_gwei: u64,
    pub max_gwei: u64, // 0 means "open-ended" for the top bucket
    pub count: usize,
}

#[rpc(server, namespace = "txpoolPlus")]
pub trait TxpoolPlusApi {
    #[method(name = "pendingByGasBucket")]
    fn pending_by_gas_bucket(&self) -> RpcResult<Vec<GasBucket>>;
}
Walk:
- `#[rpc(server, namespace = "txpoolPlus")]`: the macro generates a `TxpoolPlusApiServer` trait you implement. The `namespace` becomes the JSON-RPC method prefix; combined with `#[method(name = "pendingByGasBucket")]`, the wire-level method name is `txpoolPlus_pendingByGasBucket`.
- `RpcResult<T>`: `jsonrpsee`'s alias for `Result<T, ErrorObjectOwned>`. Errors flow back to the client as proper JSON-RPC error objects with codes; you don't write the serialization yourself.
- `Serialize` on the return type: that's all that's needed. `GasBucket` becomes a JSON object with `min_gwei`, `max_gwei`, `count` fields. Snake-case → camel-case mapping is configurable; we keep snake here for clarity.
🔍 Find in repo. Open `reth-rpc-api`: every native namespace (`EthApi`, `DebugApi`, `TraceApi`, `TxpoolApi`, …) is a trait declared with this exact `#[rpc(...)]` pattern. Your custom trait is structurally identical to the natives. That's not a coincidence; it's the contract.
Step 2: Implement with pool access
The macro generated a `TxpoolPlusApiServer` trait for us. We implement it on a struct that holds a handle to the transaction pool:
use reth_ethereum::pool::{PoolTransaction, TransactionPool};

pub struct TxpoolPlus<Pool> {
    pool: Pool,
}

// (min_gwei, max_gwei) per bucket; max == 0 marks the open-ended top bucket.
const BUCKETS: &[(u64, u64)] = &[
    (0, 1), (1, 5), (5, 10), (10, 20), (20, 30),
    (30, 50), (50, 100), (100, 250), (250, 500), (500, 0),
];

impl<Pool> TxpoolPlusApiServer for TxpoolPlus<Pool>
where
    Pool: TransactionPool + Clone + 'static,
{
    fn pending_by_gas_bucket(&self) -> RpcResult<Vec<GasBucket>> {
        let mut counts = vec![0usize; BUCKETS.len()];
        // pending_transactions() returns a Vec snapshot of the pending sub-pool
        // (accessor name as of recent Reth releases; verify against your pinned tag).
        for tx in self.pool.pending_transactions() {
            // Txs without a priority-fee field fall back to 0 and land in the first bucket.
            let max_priority_fee_wei = tx.max_priority_fee_per_gas().unwrap_or(0);
            let gwei = (max_priority_fee_wei / 1_000_000_000) as u64;
            for (i, &(min, max)) in BUCKETS.iter().enumerate() {
                let upper_match = max == 0 || gwei < max;
                if gwei >= min && upper_match {
                    counts[i] += 1;
                    break;
                }
            }
        }
        Ok(BUCKETS
            .iter()
            .zip(counts)
            .map(|(&(min, max), count)| GasBucket { min_gwei: min, max_gwei: max, count })
            .collect())
    }
}
Walk:
- `Pool: TransactionPool`: the trait bound. `TransactionPool` is the abstraction Reth uses everywhere; the concrete type is decided by the node builder. We don't hardcode `EthPool` or `BasicPool`, so this same code works on a vanilla mainnet node, an op-reth L2 node, or a custom app-chain.
- `pool.pending_transactions()`: returns a snapshot of currently-pending txs without locking the pool against new inserts.
- `max_priority_fee_per_gas`: what we bucket on. (Real searchers also factor in base fee; for clarity we use just the priority fee.)
- The inner loop is `O(buckets * pending)`, which is fine for typical pool sizes (~10K). For 100K+ pending pools, switch to binary search on the bucket array.
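The binary-search variant mentioned in the last bullet can be sketched with the standard library's `partition_point`. This is a standalone sketch: `LOWER_BOUNDS_GWEI`, `bucket_index`, and `histogram` are names introduced here for illustration, and the input is a plain slice of priority fees in wei rather than a live pool.

```rust
/// Lower edges of the ten buckets in gwei, ascending.
/// Same edges as the bucket table in Step 2; the top bucket is open-ended.
const LOWER_BOUNDS_GWEI: [u64; 10] = [0, 1, 5, 10, 20, 30, 50, 100, 250, 500];

/// Index of the bucket that `gwei` falls into, found by binary search.
pub fn bucket_index(gwei: u64) -> usize {
    // partition_point returns how many lower edges are <= gwei.
    // The first edge is 0, so the result is always at least 1.
    LOWER_BOUNDS_GWEI.partition_point(|&min| min <= gwei) - 1
}

/// Builds the ten-bucket histogram from priority fees given in wei.
pub fn histogram(priority_fees_wei: &[u128]) -> [usize; 10] {
    let mut counts = [0usize; 10];
    for &wei in priority_fees_wei {
        // Convert wei to whole gwei, saturating instead of truncating.
        let gwei = (wei / 1_000_000_000).min(u64::MAX as u128) as u64;
        counts[bucket_index(gwei)] += 1;
    }
    counts
}
```

Each lookup costs `O(log buckets)` instead of `O(buckets)`. With only ten buckets the gain is small; it matters more if you let callers choose a finer bucket grid.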
Step 3: Wire into NodeBuilder
This is the integration point. The node builder exposes extend_rpc_modules which gives you a context (pool, provider, network handle, …) and a mutable handle to the modules registry:
use clap::Parser;
use reth_ethereum::{
    cli::{chainspec::EthereumChainSpecParser, interface::Cli},
    node::EthereumNode,
};

#[derive(Debug, Clone, Copy, Default, clap::Args)]
struct Args {
    /// Enable the txpoolPlus extension
    #[arg(long)]
    enable_txpool_plus: bool,
}

fn main() {
    Cli::<EthereumChainSpecParser, Args>::parse()
        .run(async move |builder, args| {
            let handle = builder
                .node(EthereumNode::default())
                .extend_rpc_modules(move |ctx| {
                    if !args.enable_txpool_plus {
                        return Ok(());
                    }
                    let ext = TxpoolPlus { pool: ctx.pool().clone() };
                    ctx.modules.merge_configured(ext.into_rpc())?;
                    println!("txpoolPlus_pendingByGasBucket enabled");
                    Ok(())
                })
                .launch_with_debug_capabilities()
                .await?;
            handle.wait_for_node_exit().await
        })
        .unwrap();
}
Walk:
- `Cli<...>::parse()`: Reth's CLI machinery. The second generic parameter is your custom args struct, merged into the standard Reth CLI flags, so your binary accepts `node --enable-txpool-plus --http`.
- `extend_rpc_modules(|ctx| { ... })`: the closure runs once at startup, after the node is built but before the RPC server starts. `ctx` exposes `pool()`, `provider()`, `network()`, `tasks()`: every component the RPC handler might need.
- `ctx.modules.merge_configured(ext.into_rpc())`: `into_rpc()` is the method the `#[rpc]` macro generated; it produces an `RpcModule`. `merge_configured` slots it into Reth's existing dispatch table for all configured transports (HTTP if `--http`, WS if `--ws`, IPC if `--ipc`). One line, three transports.
🔍 Find in repo. Open `reth/examples/node-custom-rpc`, the official Paradigm example. It uses the exact same shape. Compare it side by side to what we wrote. The structural skeleton is identical; the only differences are namespace, method names, and what we do inside the handler.
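If "slots it into the dispatch table" feels abstract, the toy model below may help. It is a deliberately simplified, std-only stand-in: `ToyModules`, `Handler`, and the duplicate-name rejection are assumptions of this sketch and are not Reth's or jsonrpsee's real types or guaranteed behaviour. It only illustrates the idea that an extension contributes namespaced method names to one shared lookup table.

```rust
use std::collections::HashMap;

/// Toy handler: takes raw params, returns a JSON string or a JSON-RPC error code.
pub type Handler = Box<dyn Fn(&str) -> Result<String, i32> + Send + Sync>;

/// Toy stand-in for a module registry (hypothetical, for illustration only).
#[derive(Default)]
pub struct ToyModules {
    methods: HashMap<String, Handler>,
}

impl ToyModules {
    /// Registers every method of an extension as `namespace_method`.
    /// Leaves the table untouched if any name is already taken.
    pub fn merge(&mut self, namespace: &str, methods: Vec<(&str, Handler)>) -> Result<(), String> {
        let named: Vec<(String, Handler)> = methods
            .into_iter()
            .map(|(name, handler)| (format!("{}_{}", namespace, name), handler))
            .collect();
        if let Some((dup, _)) = named.iter().find(|(n, _)| self.methods.contains_key(n)) {
            return Err(format!("method already registered: {}", dup));
        }
        self.methods.extend(named);
        Ok(())
    }

    /// Looks up a wire-level method name and runs its handler.
    pub fn call(&self, method: &str, params: &str) -> Result<String, i32> {
        match self.methods.get(method) {
            Some(handler) => handler(params),
            None => Err(-32601), // JSON-RPC "Method not found"
        }
    }
}
```

The point of the model: once merged, a custom method is looked up exactly like any other entry, which is why clients cannot tell it apart from a native one.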
Step 4: Test with cast
Build, run, query:
# In one terminal: run the node
$ cargo run --release -- node --http --enable-txpool-plus
# In another: hit the new method
$ cast rpc txpoolPlus_pendingByGasBucket --rpc-url http://localhost:8545
[{"min_gwei":0,"max_gwei":1,"count":12}, ...]
# Or via raw curl
$ curl -X POST http://localhost:8545 \
-H "Content-Type: application/json" \
-d '{"jsonrpc":"2.0","method":"txpoolPlus_pendingByGasBucket","params":[],"id":1}'
{"jsonrpc":"2.0","result":[{"min_gwei":0,"max_gwei":1,"count":12}, ...],"id":1}
The method is now indistinguishable from a native one to any RPC client. Same auth, same rate-limit (if you've configured one), same logging. That's the contract extend_rpc_modules gives you.
Step 5 (bonus): Add a subscription variant
WebSocket subscriptions follow the same pattern, just with a #[subscription(...)] attribute:
use jsonrpsee::{core::SubscriptionResult, PendingSubscriptionSink, SubscriptionMessage};
use std::time::Duration;
use tokio::time::sleep;

#[rpc(server, namespace = "txpoolPlus")]
pub trait TxpoolPlusApi {
    #[method(name = "pendingByGasBucket")]
    fn pending_by_gas_bucket(&self) -> RpcResult<Vec<GasBucket>>;

    #[subscription(name = "subscribeBuckets", item = Vec<GasBucket>)]
    fn subscribe_buckets(&self, interval_secs: Option<u64>) -> SubscriptionResult;
}

// In the impl:
fn subscribe_buckets(
    &self,
    pending: PendingSubscriptionSink,
    interval_secs: Option<u64>,
) -> SubscriptionResult {
    let pool = self.pool.clone();
    let interval = Duration::from_secs(interval_secs.unwrap_or(5));
    tokio::spawn(async move {
        let Ok(sink) = pending.accept().await else { return };
        loop {
            sleep(interval).await;
            let buckets = compute_buckets(&pool); // factored out from Step 2
            let Ok(raw) = serde_json::value::to_raw_value(&buckets) else { continue };
            if sink.send(SubscriptionMessage::from(raw)).await.is_err() { return; }
        }
    });
    Ok(())
}
Walk:
- `PendingSubscriptionSink` → `accept().await` → `sink.send(...)`: the standard `jsonrpsee` subscription handshake.
- The loop runs in a `tokio::spawn`: the RPC handler returns immediately; the actual streaming happens in a background task. If you blocked here, the RPC server thread would stall.
- `sink.send(...).await.is_err()`: the client disconnected; we return cleanly and the task exits. No subscription leak. Note that the task only notices a disconnect on its next send, so it can outlive the client by up to one interval.
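Now your dashboard can open a WebSocket, call `txpoolPlus_subscribeBuckets` with params `[10]`, and get a live histogram every 10 seconds, server-side aggregated. This is a direct call to the namespaced method rather than an `eth_subscribe` kind, because jsonrpsee registers the subscription under its own method name.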
What's missing for production
| Gap | What real custom RPCs do |
|---|---|
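| Auth | Methods merged via merge_configured land on the regular HTTP / WS / IPC servers and get whatever protection those servers have. The engine API's JWT secret guards the separate auth server, so don't assume it covers your method; verify what your deployment actually enforces before exposing the port. |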
| Rate limiting | Reth doesn't ship a per-method rate limiter; production wraps the handler in tower middleware or rejects above a threshold inside the impl. |
| Per-client state | Subscriptions are per-connection by default. Cross-client coordination (e.g., shared cache invalidation) requires Arc<RwLock<...>> inside the impl struct. |
| Self-hosted Reth ops | If you don't want to run Reth yourself, QuickNode Dedicated Clusters let you pick Reth as the execution client and ship your custom-RPC binary as the value-add. |
| Versioning | Bump the namespace (txpoolPlus_v2_*) when the response shape changes; old clients should keep working. |
| Metrics | Reth's RPC layer exposes per-method latency / count via the metrics endpoint, but only for natives. Add your own metrics::counter!(...) calls inside your handler. |
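| Argument validation | RpcResult lets you return ErrorObjectOwned::owned(code, message, data) cleanly. Use -32602 (Invalid params) for bad arguments; for application-specific failures pick stable codes outside the range the JSON-RPC spec reserves (-32768 to -32000), and don't let validation failures surface as -32603 (Internal error). |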
The architecture you wrote — define trait, impl with component access, register via extend_rpc_modules — is what every custom Reth RPC in production looks like. The 50-line skeleton is the same; the impl body is where each project's value lives.
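The argument-validation row is the one the Test gate exercises, so here is a minimal sketch of the check itself. It is std-only and hedged: `RpcError` is a hypothetical stand-in for jsonrpsee's error object, and `validate_bucket_count`, the default of 10, and `MAX_BUCKETS` are assumptions of this sketch, since the walkthrough method takes no parameters yet.

```rust
/// JSON-RPC 2.0 predefined code for malformed or out-of-range parameters.
pub const INVALID_PARAMS: i32 = -32602;

/// Hypothetical upper limit chosen for this sketch.
pub const MAX_BUCKETS: u32 = 64;

/// Hypothetical stand-in for a JSON-RPC error object (code + message).
#[derive(Debug, Clone, PartialEq)]
pub struct RpcError {
    pub code: i32,
    pub message: String,
}

/// Validates an optional bucket-count parameter.
/// A missing parameter falls back to 10; anything outside 1..=MAX_BUCKETS
/// is rejected as Invalid params rather than surfacing as an internal error.
pub fn validate_bucket_count(requested: Option<u32>) -> Result<u32, RpcError> {
    match requested {
        None => Ok(10),
        Some(n) if (1..=MAX_BUCKETS).contains(&n) => Ok(n),
        Some(n) => Err(RpcError {
            code: INVALID_PARAMS,
            message: format!("bucket_count must be in 1..={}, got {}", MAX_BUCKETS, n),
        }),
    }
}
```

In the real handler you would map `RpcError` onto `ErrorObjectOwned::owned(code, message, data)` before returning it through `RpcResult`.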
Drill
- Add `pendingByNonce(address)`. A second method that returns the count of currently-pending txs from a given address grouped by nonce. Pattern: same trait, second `#[method]`, second handler. (15 min)
- Bucket by gas price (post-EIP-1559). Replace priority-fee bucketing with effective-gas-price bucketing (`base_fee + priority_fee`, capped at `max_fee_per_gas`). Need to fetch base fee from the provider. What does `ctx` expose to get it? (30 min)
- Auth-gate the method. Make `txpoolPlus_pendingByGasBucket` reject calls that don't present the engine `AUTH_SECRET`. (Hint: look at how Reth's debug methods do this.) (45 min)
- Snapshot freshness. Add a per-snapshot timestamp + monotonic block height to the response. `ctx.provider().best_block_number()` is the second source of truth. (30 min)
- Cross-tier integration. The MEV searcher in lesson 1 of this tier could query `txpoolPlus_pendingByGasBucket` to set its own bid above the 90th percentile. Add a Rust client that does exactly that, using `jsonrpsee::http_client`. (2 hours)
Finish drill 5 and you've closed the loop: a node that exposes node-only insight as a typed RPC, consumed by a separate Rust process that uses that insight to compete in the mempool. That round trip — observability via custom RPC, behavior via a separate consumer — is how real searcher / market-maker stacks are organized.
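For drill 2, the arithmetic behind "`base_fee + priority_fee`, capped at `max_fee_per_gas`" can be sketched on its own, leaving the part about fetching the base fee from the provider as the actual exercise. The function name and the choice to return `None` for txs that cannot pay the current base fee are assumptions of this sketch.

```rust
/// Effective gas price in wei for an EIP-1559 style tx at a given base fee:
/// min(max_fee_per_gas, base_fee + max_priority_fee_per_gas).
/// Returns `None` when the tx cannot cover the base fee at all.
pub fn effective_gas_price(
    base_fee: u128,
    max_fee_per_gas: u128,
    max_priority_fee_per_gas: u128,
) -> Option<u128> {
    if max_fee_per_gas < base_fee {
        return None; // not includable at this base fee
    }
    // saturating_add guards against overflow on adversarial inputs.
    Some(max_fee_per_gas.min(base_fee.saturating_add(max_priority_fee_per_gas)))
}
```

Bucketing on this value instead of the raw priority fee changes which txs look competitive whenever `max_fee_per_gas` is the binding constraint.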
Test gate
Per the tier-wide rule (Test gate: every app in this tier ships with passing tests), this lesson's minimum gate is in-process integration tests that boot the node with your custom RPC registered, hit the new methods over HTTP, and assert the JSON shape, including error paths.
A custom RPC that "looks correct" in dev breaks in three ways in production: bad parameter handling returns the wrong error code, the subscription leaks tasks on disconnect, the auth-gate is missing on a method that should require it. Each is testable in seconds.
// tests/rpc_integration.rs
// boot_node_with_extension, seed_mempool, TEST_TX_FIXTURES and assert_jsonrpc_error_code
// are test scaffolding you write yourself; they are not Reth APIs.
// These calls pass a bucket-count parameter, so they assume you have extended
// pendingByGasBucket to accept one; the Step 1 trait takes no parameters.
// GasBucket is the Step 1 response type; it also needs Deserialize for the client to decode it.
use jsonrpsee::{core::client::ClientT, http_client::HttpClientBuilder};
use jsonrpsee::rpc_params;

#[tokio::test]
async fn returns_buckets_for_known_state() {
    let node = boot_node_with_extension().await; // spins up Reth in-process
    let client = HttpClientBuilder::default().build(node.http_url()).unwrap();
    seed_mempool(&node, &TEST_TX_FIXTURES).await;
    let buckets: Vec<GasBucket> = client
        .request("txpoolPlus_pendingByGasBucket", rpc_params![10u32])
        .await
        .unwrap();
    assert_eq!(buckets.len(), 10);
    // count is a usize in GasBucket, so sum as usize
    assert_eq!(buckets.iter().map(|b| b.count).sum::<usize>(), TEST_TX_FIXTURES.len());
}

#[tokio::test]
async fn rejects_invalid_bucket_count() {
    let node = boot_node_with_extension().await;
    let client = HttpClientBuilder::default().build(node.http_url()).unwrap();
    let err = client
        .request::<Vec<GasBucket>, _>("txpoolPlus_pendingByGasBucket", rpc_params![0u32])
        .await
        .unwrap_err();
    assert_jsonrpc_error_code(&err, -32602); // Invalid params, not -32603 (internal)
}

#[tokio::test]
async fn subscription_does_not_leak_on_disconnect() {
    // Open subscription, drop the client, assert the spawned task exits
    // (use a Drop probe inside the closure to verify)
}
The lesson is not complete until: (1) the success path test passes, (2) at least one error-code test passes (wrong type, out-of-range, missing param), (3) the subscription cleanup test passes. cargo run against mainnet doesn't substitute for any of these.
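The Drop-probe idea behind the third test can be tried in isolation before wiring it into the node. The sketch below swaps in standard-library threads and an `mpsc` channel for the tokio task and the subscription sink; that substitution, plus the names `DropProbe` and `spawn_producer`, are assumptions of this sketch. The mechanism is the same: the producer exits when a send fails, and a value owned by the producer records that exit when it is dropped.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc;
use std::sync::Arc;
use std::thread;
use std::time::Duration;

/// Sets the shared flag when dropped, i.e. when the owning task returns.
pub struct DropProbe(pub Arc<AtomicBool>);

impl Drop for DropProbe {
    fn drop(&mut self) {
        self.0.store(true, Ordering::SeqCst);
    }
}

/// Spawns a producer that sends a tick every `interval` until the receiver is gone.
pub fn spawn_producer(
    tx: mpsc::Sender<u64>,
    interval: Duration,
    exited: Arc<AtomicBool>,
) -> thread::JoinHandle<()> {
    thread::spawn(move || {
        let _probe = DropProbe(exited); // dropped when this closure returns
        let mut tick = 0u64;
        loop {
            thread::sleep(interval);
            tick += 1;
            // A failed send means the receiver was dropped: stop streaming.
            if tx.send(tick).is_err() {
                return;
            }
        }
    })
}
```

In the real test the probe would live inside the `tokio::spawn` closure of `subscribe_buckets`, and the test would drop the WebSocket client, wait slightly longer than one interval, and then assert the flag is set.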
🧭 Where you are now in the stack: you've shipped a networking-layer server-side extension: jsonrpsee aggregation plus a subscription wired to the Reth pool, locked in with in-process integration, error-code, and subscription-leak tests. It is the same problem GraphQL custom resolvers and Postgres stored procedures solve, applied to Reth RPC. Next lesson moves to the concurrency and state-management layer: a wallet backend.
Summary (3 lines)
- Custom RPC on Reth = `#[rpc(server)]` macro + `NodeBuilder::extend_rpc_modules`. ~50 lines for a new method.
- State access via injected `Arc<dyn StateProvider>`. Async or sync methods both supported.
- Test over the wire via reqwest. Common custom RPCs: payment receipts (Tempo), MEV simulator, lending queries. Next: wallet backend.