FABRKNT
Building with the Stack — Real-World Rust EVM Apps
Application Patterns
Lesson 4 of 11·CONTENT40 min70 XP

Treat this page as a workbench, not a blog post. The goal is to extract a reusable mental model from the source and carry it into the rest of the Fabrknt stack.

Course
Building with the Stack — Real-World Rust EVM Apps
Lesson role
CONTENT
Sequence
4 / 11

Lab 3 — Build a Custom RPC Endpoint on Reth

Question

You need an RPC method that doesn't exist in standard Ethereum — e.g. tempo_getPaymentReceipts(merchant: Address). Reth makes adding one a 50-line drop-in. Register an RPC trait, implement the method, the node serves it.

Principle (minimum model)

  • Reth RPC trait registration. #[rpc(server)] macro on a trait → generates the RPC server boilerplate. impl YourRpcServer for YourImpl { ... } provides the body.
  • NodeBuilder integration. NodeBuilder::extend_rpc_modules(|modules| modules.merge_configured(your_rpc.into_rpc())?) registers the custom RPC at node startup.
  • Access to state inside the RPC. Inject Arc<dyn StateProvider> into your impl. Use provider.account_basic(addr)? / provider.storage(addr, slot)? to read state.
  • Async vs sync RPC methods. #[rpc(server)] supports both. Async methods use Box<dyn Future> under the hood; sync methods are direct calls.
  • Test the RPC over the wire. Spin up the node in a test, call the RPC via reqwest::post(url).json(&body), assert the response.
  • Common custom RPCs. Payment receipt fetcher (Tempo) / MEV-bundle simulator (Flashbots) / lending protocol-specific queries / app-chain-specific batched reads.

Worked example + steps

Build a Custom RPC Endpoint on Reth

You need a single API call that returns a histogram of pending tx gas prices for your fee-bidding bot. The standard txpool_content returns every pending tx in full — hundreds of KB you'd reduce to 10 numbers anyway. The right move: add a server-side method that does the aggregation inside the node and ships back the histogram. ~50 lines of Rust. No Reth fork. Live on the same HTTP / WebSocket / IPC endpoints as the native namespaces (eth_*, net_*, debug_*, txpool_*, …).

📌 Scope honesty. We add one read-only method (txpoolPlus_pendingByGasBucket) that aggregates the local mempool into 10 gas-price buckets. We don't cover authentication, rate-limiting, or write methods — those are the same patterns layered on top. The architecture lesson is "how does the trait get wired in?"

📚 See also. QuickNode's How to Build Custom RPC Methods with Reth covers the foundation of registering a custom RPC trait. We build on top of it here with server-side aggregation, a subscription variant, and the production gaps a real custom RPC has to close.

Acceptance criteria

The lesson is complete when these tests pass (full code at the end in §Test gate):

  1. returns_buckets_for_known_state — boot the node in-process, seed the mempool with fixture txs, call txpoolPlus_pendingByGasBucket over HTTP, assert bucket count and total tx count.
  2. rejects_invalid_bucket_count — bad parameter returns the right JSON-RPC error code (-32602 Invalid params, not -32603 Internal error).
  3. subscription_does_not_leak_on_disconnect — open a subscription, drop the client, assert the spawned task exits.

Test-first reading. The walkthrough below shows the trait registration, parameter handling, and subscription pattern you'll exercise in these tests.

What you'll build

A new RPC method, callable like any other:

$ cast rpc txpoolPlus_pendingByGasBucket
[
  {"min_gwei": 0,   "max_gwei": 1,   "count": 12},
  {"min_gwei": 1,   "max_gwei": 5,   "count": 47},
  {"min_gwei": 5,   "max_gwei": 10,  "count": 89},
  {"min_gwei": 10,  "max_gwei": 20,  "count": 134},
  {"min_gwei": 20,  "max_gwei": 30,  "count": 56},
  {"min_gwei": 30,  "max_gwei": 50,  "count": 21},
  {"min_gwei": 50,  "max_gwei": 100, "count": 8},
  {"min_gwei": 100, "max_gwei": 250, "count": 2},
  {"min_gwei": 250, "max_gwei": 500, "count": 0},
  {"min_gwei": 500, "max_gwei": 0,   "count": 1}
]

Useful for: gas-price oracles, dashboard widgets, fee-bidding strategies, MEV searchers (bid above the 90th percentile of pending priority fees).

flowchart LR
    Client["RPC client<br/>(cast / dapp / dashboard)"] -->|JSON-RPC POST| Handler["jsonrpsee handler"]
    Handler -->|read snapshot| Pool["TransactionPool<br/>(in-process)"]
    Pool -->|all_transactions| Bucket["Bucket math<br/>(10 ranges)"]
    Bucket -->|JSON| Client

Why a custom RPC, not a workaround

ApproachLatencyPayloadEffort
Call txpool_content and aggregate client-sideRPC roundtrip + transfer all txshundreds of KBtrivial
Run an external indexer subscribed to mempoolµs per query (in-mem)smalldays of glue + ops
Custom RPC methodµs (in-process snapshot)bytes~50 lines once

The custom RPC sits in the sweet spot: latency of an indexer, payload of an aggregation, effort of a couple of pages of Rust. And it ships as part of the node — no extra service, no separate deployment, no port to expose.

Cargo.toml

[package]
name = "txpool-plus"
version = "0.1.0"
edition = "2021"

[dependencies]
# Reth — pin to a tag in production
reth                = { git = "https://github.com/paradigmxyz/reth", tag = "v1.5.0" }
reth-ethereum       = { git = "https://github.com/paradigmxyz/reth", tag = "v1.5.0" }

# jsonrpsee — Reth uses this RPC framework end-to-end
jsonrpsee           = { version = "0.24", features = ["server", "macros"] }

# CLI flag wiring
clap                = { version = "4", features = ["derive"] }

# Plumbing
serde               = { version = "1", features = ["derive"] }
tokio               = { version = "1", features = ["macros", "rt-multi-thread"] }

Reth's RPC stack is built on jsonrpsee. Your custom methods live in the same process, share the same listeners, and use the same auth as the natives. Don't try to start a parallel jsonrpsee server — let extend_rpc_modules register into Reth's existing one.

Step 1: Define the RPC trait

jsonrpsee uses a procedural macro to generate the RPC plumbing from a trait. You write the trait shape; it derives the server stub, the client stub, and the JSON serialization:

use jsonrpsee::{core::RpcResult, proc_macros::rpc};
use serde::Serialize;

#[derive(Debug, Clone, Serialize)]
pub struct GasBucket {
    pub min_gwei: u64,
    pub max_gwei: u64,  // 0 means "open-ended" for the top bucket
    pub count: usize,
}

#[rpc(server, namespace = "txpoolPlus")]
pub trait TxpoolPlusApi {
    #[method(name = "pendingByGasBucket")]
    fn pending_by_gas_bucket(&self) -> RpcResult<Vec<GasBucket>>;
}

Walk:

  • #[rpc(server, namespace = "txpoolPlus")] — the macro generates a TxpoolPlusApiServer trait you implement. The namespace becomes the JSON-RPC method prefix; combined with #[method(name = "pendingByGasBucket")], the wire-level method name is txpoolPlus_pendingByGasBucket.
  • RpcResult<T>jsonrpsee's alias for Result<T, ErrorObjectOwned>. Errors flow back to the client as proper JSON-RPC error objects with codes; you don't write the serialization yourself.
  • Serialize on the return type — that's all that's needed. GasBucket becomes a JSON object with min_gwei, max_gwei, count fields. Snake-case → camel-case mapping is configurable; we keep snake here for clarity.

🔍 Find in repo. Open reth-rpc-api — every native namespace (EthApi, DebugApi, TraceApi, TxpoolApi, …) is a trait declared with this exact #[rpc(...)] pattern. Your custom trait is structurally identical to the natives. That's not a coincidence; it's the contract.

Step 2: Implement with pool access

The trait derived a TxpoolPlusApiServer for us. We implement it on a struct that holds a handle to the transaction pool:

use reth_ethereum::pool::{PoolTransaction, TransactionPool};

pub struct TxpoolPlus<Pool> {
    pool: Pool,
}

const BUCKETS: &[(u64, u64)] = &[
    (0, 1), (1, 5), (5, 10), (10, 20), (20, 30),
    (30, 50), (50, 100), (100, 250), (250, 500), (500, 0),
];

impl<Pool> TxpoolPlusApiServer for TxpoolPlus<Pool>
where
    Pool: TransactionPool + Clone + 'static,
{
    fn pending_by_gas_bucket(&self) -> RpcResult<Vec<GasBucket>> {
        let mut counts = vec![0usize; BUCKETS.len()];

        // pending() returns a snapshot iterator; cheap to call.
        for tx in self.pool.pending() {
            let max_priority_fee_wei = tx.max_priority_fee_per_gas().unwrap_or(0);
            let gwei = (max_priority_fee_wei / 1_000_000_000) as u64;

            for (i, &(min, max)) in BUCKETS.iter().enumerate() {
                let upper_match = max == 0 || gwei < max;
                if gwei >= min && upper_match {
                    counts[i] += 1;
                    break;
                }
            }
        }

        Ok(BUCKETS
            .iter()
            .zip(counts)
            .map(|(&(min, max), count)| GasBucket { min_gwei: min, max_gwei: max, count })
            .collect())
    }
}

Walk:

  • Pool: TransactionPool — the trait bound. TransactionPool is the abstraction Reth uses everywhere; the concrete type is decided by the node builder. We don't hardcode EthPool or BasicPool — generic so this same code works on a vanilla mainnet node, an op-reth L2 node, or a custom App-chain.
  • pool.pending() — returns a snapshot of currently-pending txs without locking the pool against new inserts. Production-grade.
  • max_priority_fee_per_gas — what we bucket on. (Real searchers also factor in base fee; for clarity we use just the priority fee.)
  • The inner loop is O(buckets * pending) — fine for typical pool sizes (~10K). For 100K+ pending pools, switch to binary search on the bucket array.

Step 3: Wire into NodeBuilder

This is the integration point. The node builder exposes extend_rpc_modules which gives you a context (pool, provider, network handle, …) and a mutable handle to the modules registry:

use clap::Parser;
use reth_ethereum::{
    cli::{chainspec::EthereumChainSpecParser, interface::Cli},
    node::EthereumNode,
};

#[derive(Debug, Clone, Copy, Default, clap::Args)]
struct Args {
    /// Enable the txpoolPlus extension
    #[arg(long)]
    enable_txpool_plus: bool,
}

fn main() {
    Cli::<EthereumChainSpecParser, Args>::parse()
        .run(async move |builder, args| {
            let handle = builder
                .node(EthereumNode::default())
                .extend_rpc_modules(move |ctx| {
                    if !args.enable_txpool_plus {
                        return Ok(());
                    }
                    let ext = TxpoolPlus { pool: ctx.pool().clone() };
                    ctx.modules.merge_configured(ext.into_rpc())?;
                    println!("txpoolPlus_pendingByGasBucket enabled");
                    Ok(())
                })
                .launch_with_debug_capabilities()
                .await?;
            handle.wait_for_node_exit().await
        })
        .unwrap();
}

Walk:

  • Cli<...>::parse() — Reth's CLI machinery. The second generic parameter is your custom args struct, merged into the standard Reth CLI flags. reth node --enable-txpool-plus --http works.
  • extend_rpc_modules(|ctx| { ... }) — the closure runs once at startup, after the node is built but before the RPC server starts. ctx exposes pool(), provider(), network(), tasks() — every component the RPC handler might need.
  • ctx.modules.merge_configured(ext.into_rpc())into_rpc() is the method the #[rpc] macro generated; it produces an RpcModule. merge_configured slots it into Reth's existing dispatch table for all configured transports (HTTP if --http, WS if --ws, IPC if --ipc). One line, three transports.

🔍 Find in repo. Open reth/examples/node-custom-rpc — the official Paradigm example. It uses the exact same shape. Compare it side by side to what we wrote. The structural skeleton is identical; the only differences are namespace, method names, and what we do inside the handler.

Step 4: Test with cast

Build, run, query:

# In one terminal: run the node
$ cargo run --release -- node --http --enable-txpool-plus

# In another: hit the new method
$ cast rpc txpoolPlus_pendingByGasBucket --rpc-url http://localhost:8545
[{"min_gwei":0,"max_gwei":1,"count":12}, ...]

# Or via raw curl
$ curl -X POST http://localhost:8545 \
    -H "Content-Type: application/json" \
    -d '{"jsonrpc":"2.0","method":"txpoolPlus_pendingByGasBucket","params":[],"id":1}'
{"jsonrpc":"2.0","result":[{"min_gwei":0,"max_gwei":1,"count":12}, ...],"id":1}

The method is now indistinguishable from a native one to any RPC client. Same auth, same rate-limit (if you've configured one), same logging. That's the contract extend_rpc_modules gives you.

Step 5 (bonus): Add a subscription variant

WebSocket subscriptions follow the same pattern, just with a #[subscription(...)] attribute:

use jsonrpsee::{core::SubscriptionResult, PendingSubscriptionSink, SubscriptionMessage};
use std::time::Duration;
use tokio::time::sleep;

#[rpc(server, namespace = "txpoolPlus")]
pub trait TxpoolPlusApi {
    #[method(name = "pendingByGasBucket")]
    fn pending_by_gas_bucket(&self) -> RpcResult<Vec<GasBucket>>;

    #[subscription(name = "subscribeBuckets", item = Vec<GasBucket>)]
    fn subscribe_buckets(&self, interval_secs: Option<u64>) -> SubscriptionResult;
}

// In the impl:
fn subscribe_buckets(
    &self,
    pending: PendingSubscriptionSink,
    interval_secs: Option<u64>,
) -> SubscriptionResult {
    let pool = self.pool.clone();
    let interval = Duration::from_secs(interval_secs.unwrap_or(5));

    tokio::spawn(async move {
        let Ok(sink) = pending.accept().await else { return };
        loop {
            sleep(interval).await;
            let buckets = compute_buckets(&pool); // factored out from Step 2
            let Ok(raw) = serde_json::value::to_raw_value(&buckets) else { continue };
            if sink.send(SubscriptionMessage::from(raw)).await.is_err() { return; }
        }
    });
    Ok(())
}

Walk:

  • PendingSubscriptionSinkaccept().awaitsink.send(...) — the standard jsonrpsee subscription handshake.
  • The closure runs in a tokio::spawn — the RPC handler returns immediately; the actual streaming happens in a background task. If you blocked here, the RPC server thread would stall.
  • sink.send(...).is_err() — the client disconnected or the channel is full; we return cleanly and the task exits. No subscription leak.

Now your dashboard can eth_subscribe("txpoolPlus_subscribeBuckets", [10]) and get a live histogram every 10 seconds, server-side aggregated.

What's missing for production

GapWhat real custom RPCs do
AuthThe same AUTH_SECRET mechanism as the engine API; Reth wires this through automatically when you extend_rpc_modules, but you should verify your method respects it (most ctx accessors do).
Rate limitingReth doesn't ship a per-method rate limiter; production wraps the handler in tower middleware or rejects above a threshold inside the impl.
Per-client stateSubscriptions are per-connection by default. Cross-client coordination (e.g., shared cache invalidation) requires Arc<RwLock<...>> inside the impl struct.
Self-hosted Reth opsIf you don't want to run Reth yourself, QuickNode Dedicated Clusters let you pick Reth as the execution client and ship your custom-RPC binary as the value-add.
VersioningBump the namespace (txpoolPlus_v2_*) when the response shape changes; old clients should keep working.
MetricsReth's RPC layer exposes per-method latency / count via the metrics endpoint, but only for natives. Add your own metrics::counter!(...) calls inside your handler.
Argument validationRpcResult lets you return ErrorObjectOwned::owned(code, message, data) cleanly. Pick stable codes; don't reuse standard JSON-RPC error codes (-32603 is "internal error", reserved).

The architecture you wrote — define trait, impl with component access, register via extend_rpc_modulesis what every custom Reth RPC in production looks like. The 50-line skeleton is the same; the impl body is where each project's value lives.

Drill

  1. Add pendingByNonce(address). A second method that returns the count of currently-pending txs from a given address grouped by nonce. Pattern: same trait, second #[method], second handler. (15 min)
  2. Bucket by gas price (post-EIP-1559). Replace priority-fee bucketing with effective-gas-price bucketing (base_fee + priority_fee, capped at max_fee_per_gas). Need to fetch base fee from the provider. What does ctx expose to get it? (30 min)
  3. Auth-gate the method. Make txpoolPlus_pendingByGasBucket reject calls that don't present the engine AUTH_SECRET. (Hint: look at how Reth's debug methods do this.) (45 min)
  4. Snapshot freshness. Add a per-snapshot timestamp + monotonic block height to the response. ctx.provider().best_block_number() is the second source of truth. (30 min)
  5. Cross-tier integration. The MEV searcher in lesson 1 of this tier could query txpoolPlus_pendingByGasBucket to set its own bid above the 90th percentile. Add a Rust client that does exactly that, using jsonrpsee::http_client. (2 hours)

Finish drill 5 and you've closed the loop: a node that exposes node-only insight as a typed RPC, consumed by a separate Rust process that uses that insight to compete in the mempool. That round trip — observability via custom RPC, behavior via a separate consumer — is how real searcher / market-maker stacks are organized.

Test gate

Per Test gate — every app in this tier ships with passing tests, this lesson's minimum gate is in-process integration tests that boot the node with your custom RPC registered, hit the new methods over HTTP, and assert the JSON shape — including error paths.

A custom RPC that "looks correct" in dev breaks in three ways in production: bad parameter handling returns the wrong error code, the subscription leaks tasks on disconnect, the auth-gate is missing on a method that should require it. Each is testable in seconds.

// tests/rpc_integration.rs
use jsonrpsee::{core::client::ClientT, http_client::HttpClientBuilder};
use jsonrpsee::rpc_params;

#[tokio::test]
async fn returns_buckets_for_known_state() {
    let node = boot_node_with_extension().await;       // spins up Reth in-process
    let client = HttpClientBuilder::default().build(node.http_url()).unwrap();

    seed_mempool(&node, &TEST_TX_FIXTURES).await;
    let buckets: Vec<Bucket> = client
        .request("txpoolPlus_pendingByGasBucket", rpc_params![10u32])
        .await.unwrap();

    assert_eq!(buckets.len(), 10);
    assert_eq!(buckets.iter().map(|b| b.count).sum::<u64>(), TEST_TX_FIXTURES.len() as u64);
}

#[tokio::test]
async fn rejects_invalid_bucket_count() {
    let node = boot_node_with_extension().await;
    let client = HttpClientBuilder::default().build(node.http_url()).unwrap();

    let err = client.request::<Vec<Bucket>, _>("txpoolPlus_pendingByGasBucket", rpc_params![0u32])
        .await.unwrap_err();
    assert_jsonrpc_error_code(&err, -32602);            // Invalid params, not -32603 (internal)
}

#[tokio::test]
async fn subscription_does_not_leak_on_disconnect() {
    // Open subscription, drop the client, assert the spawned task exits
    // (use a Drop probe inside the closure to verify)
}

The lesson is not complete until: (1) the success path test passes, (2) at least one error-code test passes (wrong type, out-of-range, missing param), (3) the subscription cleanup test passes. cargo run against mainnet doesn't substitute for any of these.

🧭 Where you are now in the stack: you've shipped a networking-layer server-side extension — jsonrpsee aggregation + subscription wired to a Reth pool stream, locked in with in-process integration + error-code + subscription-leak tests. Same shape GraphQL custom resolvers and Postgres stored procedures solve, applied to Reth RPC. Next lesson moves to the concurrency + state-management layer: a wallet backend.

Summary (3 lines)

  • Custom RPC on Reth = #[rpc(server)] macro + NodeBuilder::extend_rpc_modules. ~50 lines for a new method.
  • State access via injected Arc<dyn StateProvider>. Async or sync methods both supported.
  • Test over the wire via reqwest. Common custom RPCs: payment receipts (Tempo), MEV simulator, lending queries. Next: wallet backend.