FABRKNT
Build OpenHL — from `cargo init` to a single-validator devnet
Engine integration
Lesson 11 of 16 · CONTENT · 55 min · 100 XP

Treat this page as a workbench, not a blog post. The goal is to extract a reusable mental model from the source and carry it into the rest of the Fabrknt stack.

Lesson 10 — run_engine_app and the first block through the actor pipeline

Question

run_engine_app drives the actor pipeline end-to-end. The first block flows: GetValue (proposal) → vote (a single validator already exceeds 2/3) → Decided → bridge.commit → state advances. That is consensus working end-to-end.

Principle (minimum model)

  • run_engine_app orchestrator. Spawns OpenHlNode in a tokio task + a test driver that simulates Malachite's perception of "I am the leader at height 1".
  • Actor pattern. Malachite + bridge + signing provider each run as actors with their own mpsc channels. Loose coupling; testable.
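  • First block flow. The engine sends GetValue at height 1 → the app calls bridge.build_payload and bridge.payload_ready → block built → vote (a single validator already exceeds 2/3) → Decided → bridge.commit → state stored. The integration test reports about 0.02 s of wall-clock time.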
  • Assertion: block 1 committed. Bridge.get_best_block returns the new block.
  • Assertion: state advanced. Bridge's internal HashMap has the new block hash → block mapping.
  • Multi-block test. Repeat for blocks 2, 3, 4. Asserts chain continues; parent hashes link correctly.
  • Performance. Single-validator devnet should produce one block per ~100ms (network simulation in-process). Real production is ~1s.

Worked example + steps

Goal

Concepts you'll grasp in this lesson:

  • The AppMsg routing loop — Malachite's engine sends ConsensusReady / GetValidatorSet / StartedRound / GetValue / Decided / … over a single channel. The app loop is a while let Some(msg) = recv().await matching each variant and either replying via oneshot::Sender or driving the bridge. This is the only glue between Malachite and your EL.
  • Generic-over-bridge polymorphism: run_engine_app<B: ConsensusBridge> works for StubBridge, InMemoryEvmBridge, RethEvmBridge, and (eventually) LiveRethEvmBridge. One routing function, four backends. The trait surface from Lesson 3 pays off here.
  • stop_after_decisions as test ergonomics — production validators run usize::MAX. Tests pass 1. A parameter that exists only so the function is finite-state-testable is a legitimate API choice; test ergonomics deserve API surface.
  • Reply channels can close mid-flight — when an engine actor dies before we reply, the oneshot::Sender::send() errors. Logging via tracing::warn! (not propagating) is correct: propagating would mask actual errors with noise; the operator can still investigate via logs.
  • Channel vs. event-stream message flow: channels.consensus.recv() carries imperative messages that need replies; subscribe() carries broadcast notifications. The app loop only deals with the former in Lesson 10.
  • Why integration > unit tests at this layer — engine AppMsg arms arrive in a specific order. Faking that order is more work than spinning up the real engine for one block. The integration test is cheaper and proves more.

Verification:

cargo test -p openhl-consensus

…passes 21 tests (20 from Lesson 9 + 1 new integration test). The new test:

test engine_app::tests::first_block_via_engine_actors ... ok

…spawns the Malachite actor system, drives a real consensus round through it, asserts the bridge committed exactly the hash the engine decided on. Wall-clock: 0.02 seconds. This is the milestone where your code stops being "the engine boots" and becomes "the engine produces blocks."

Specific changes:

  • crates/consensus/src/engine_app.rs — new file (~282 lines). The run_engine_app<B: ConsensusBridge + 'static> loop reads AppMsg<OpenHlContext> from Channels<OpenHlContext>::consensus, dispatches 12 message arms (5 substantive + 7 trivial), and returns the list of decided hashes.
  • StubBridge test fixture + the first_block_via_engine_actors integration test live in the same file.
  • crates/consensus/src/lib.rs — wires pub mod engine_app;.

Recap

After Lesson 9 your openhl-consensus crate has:

crates/consensus/src/lib.rs               — pub mod bridge, codec, context, node, signing, signing_provider, types
crates/consensus/src/node.rs              — OpenHlNode + start_engine works (smoke test passes)
crates/consensus/src/codec.rs             — OpenHlCodec
crates/consensus/src/signing_provider.rs  — SigningProvider impl
crates/consensus/src/context.rs           — Context impl
crates/consensus/src/types/               — 7 type files
crates/consensus/src/bridge.rs            — ConsensusBridge trait + InMemoryEvmBridge

cargo test -p openhl-consensus passes 20 tests. The engine boots and tears down cleanly — but it's silent. Once start_engine returns, the engine's actors immediately start sending AppMsg::ConsensusReady and waiting for a reply. Nothing replies. The actors park. Lesson 10 fixes that.

Plan

Five things:

  1. Add tracing to crates/consensus/Cargo.toml — used by the tracing::warn! calls in the loop's "channel-closed" paths.
  2. Create crates/consensus/src/engine_app.rs with the run_engine_app<B> async function generic over B: ConsensusBridge, plus a default_attrs() helper. About 130 lines of routing logic.
  3. Wire pub mod engine_app; into lib.rs.
  4. Add the integration test first_block_via_engine_actors plus a StubBridge test fixture that impls ConsensusBridge synchronously in memory.
  5. Run cargo test -p openhl-consensus first_block_via_engine_actors — passes in ~0.02 seconds. Stare at it.

This lesson teaches the actor-message-loop pattern. Most consensus engines (CometBFT, Hotstuff, Aura) have some "application interface", but they vary: callbacks, gRPC services, FFI bindings. Malachite's approach is tokio::sync::mpsc channels of typed messages: strongly typed, async-native, with a single consumer per channel. Your run_engine_app is the consumer of those messages; the engine actors are the producers. Once you understand this pattern, every chain framework's "application interface" reduces to a variant of it.
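
To make the pattern concrete, here is a minimal sketch that runs without any dependencies. It is not Malachite's API: std::sync::mpsc stands in for both tokio's mpsc channel and the oneshot reply channels, a plain thread runs the app loop, and Msg, run_app, and drive_one_height are hypothetical names.

```rust
use std::sync::mpsc::{channel, Receiver, Sender};
use std::thread;

/// Hypothetical stand-in for `AppMsg`: each variant that needs an answer
/// carries its own reply sender.
enum Msg {
    Ready { reply: Sender<u64> },
    GetValue { height: u64, reply: Sender<String> },
    Decided { value: String },
}

/// Consumer side: receive, dispatch by variant, reply, continue.
/// Returns the decided values once the channel closes.
fn run_app(inbox: Receiver<Msg>) -> Vec<String> {
    let mut decided = Vec::new();
    // `recv()` returns Err once every sender is dropped, which ends the loop.
    while let Ok(msg) = inbox.recv() {
        match msg {
            Msg::Ready { reply } => {
                // A failed send only means the asker went away; ignore it here.
                let _ = reply.send(1);
            }
            Msg::GetValue { height, reply } => {
                let _ = reply.send(format!("block-{height}"));
            }
            Msg::Decided { value } => decided.push(value),
        }
    }
    decided
}

/// Producer side: plays the role of the engine for one height.
fn drive_one_height() -> Vec<String> {
    let (tx, rx) = channel();
    let app = thread::spawn(move || run_app(rx));

    let (reply, answer) = channel();
    tx.send(Msg::Ready { reply }).unwrap();
    let height = answer.recv().unwrap();

    let (reply, answer) = channel();
    tx.send(Msg::GetValue { height, reply }).unwrap();
    let value = answer.recv().unwrap();

    tx.send(Msg::Decided { value }).unwrap();
    drop(tx); // closing the channel lets the app loop finish
    app.join().unwrap()
}

fn main() {
    println!("{:?}", drive_one_height());
}
```

The producer blocks on answer.recv() after each question, which mirrors how the engine side waits on a reply before moving on.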

Walk-through

Step 1: Add tracing to Cargo.toml

Open crates/consensus/Cargo.toml. After Lesson 9 the [dependencies] section ends with:

sha2                                          = "0.10"
serde                                         = { workspace = true }
tokio                                         = { workspace = true }

Add one line:

tracing                                       = { workspace = true }

tracing is the workspace's standard logging crate. We'll use only tracing::warn! here, for one specific case: when a reply channel is closed because the engine has terminated mid-conversation. A closed tokio::sync::oneshot reply channel isn't a bug in our code; it's a sign that something upstream gave up. We log it but don't propagate.

Step 2: Create crates/consensus/src/engine_app.rs — imports and signature

Start with module doc + imports:

//! Engine app loop — consumes `AppMsg` from the Malachite engine and routes
//! every consensus-relevant event through a [`ConsensusBridge`].
//!
//! This is the missing half of Lesson 9: with `OpenHlNode::start()` spinning
//! up the actor system, this loop is what makes those actors do useful work.
//! Once a `Decided` arrives we commit through the bridge, increment height,
//! and (optionally) stop after N decisions for tests.

use std::sync::Arc;

use eyre::eyre;
use informalsystems_malachitebft_app::engine::host::Next;
use informalsystems_malachitebft_app_channel::{AppMsg, Channels};
use informalsystems_malachitebft_core_types::Height as _;
use openhl_types::{BlockHash, PayloadAttrs};

use crate::bridge::ConsensusBridge;
use crate::context::OpenHlContext;
use crate::types::{OpenHlHeight, OpenHlValidatorSet, OpenHlValue};

const APP_REPLY_WAIT_LOG: &str = "engine_app: peer replied unsuccessfully (channel closed)";

Imports of note:

  • AppMsg, Channels from app_channel — the message enum and channel-bundle type. Channels::consensus is the mpsc receiver for AppMsg<Ctx>.
  • Next from app::engine::host — the enum used in Decided's reply to tell the engine "what's next?" (start the next height, halt, etc.).
  • Height as _ — imports the trait Height for its .increment() method without bringing the name into scope (we use our OpenHlHeight newtype throughout).
  • Arc: run_engine_app takes the bridge as Arc<B> so it can clone the reference into a long-running task.
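
The `Height as _` import is easier to judge in a self-contained form. In this sketch the heights module, its Height trait, and BlockHeight are hypothetical stand-ins for the Malachite trait and OpenHlHeight; only the import style is the point.

```rust
mod heights {
    /// Hypothetical stand-in for Malachite's `Height` trait.
    pub trait Height {
        fn increment(&self) -> Self;
    }

    /// Hypothetical stand-in for `OpenHlHeight`.
    #[derive(Debug, Clone, Copy, PartialEq, Eq)]
    pub struct BlockHeight(pub u64);

    impl Height for BlockHeight {
        fn increment(&self) -> Self {
            BlockHeight(self.0 + 1)
        }
    }
}

// Anonymous import: the trait's methods become callable, but the name
// `Height` is not added to this scope, so it cannot clash with anything.
use heights::Height as _;
use heights::BlockHeight;

fn next_height(h: BlockHeight) -> BlockHeight {
    h.increment()
}

fn main() {
    println!("{:?}", next_height(BlockHeight(1)));
}
```

Removing the `as _` line makes the call to increment() fail to compile, because trait methods are only callable while the trait is in scope.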

Now the function signature:

/// Drive the engine app loop until `stop_after_decisions` decisions have been
/// committed through the bridge, or the consensus channel closes.
///
/// Returns the `BlockHash`es that were decided, in order. Single-validator mode
/// uses this with `stop_after_decisions = 1` to exit after the first block.
#[allow(clippy::too_many_lines)] // 12 AppMsg arms — laid out flat for Lesson 11's match-by-match walk
pub async fn run_engine_app<B>(
    bridge: Arc<B>,
    mut channels: Channels<OpenHlContext>,
    validator_set: OpenHlValidatorSet,
    stop_after_decisions: usize,
) -> eyre::Result<Vec<BlockHash>>
where
    B: ConsensusBridge + 'static,
{
    let mut decided: Vec<BlockHash> = Vec::new();
    let mut current_parent = BlockHash([0u8; 32]);
    let mut current_height = OpenHlHeight::INITIAL;

    while let Some(msg) = channels.consensus.recv().await {
        match msg {
            // ... 12 arms come here ...
        }
    }

    Err(eyre!(
        "consensus channel closed after {n} decisions (wanted {stop_after_decisions})",
        n = decided.len()
    ))
}

Four parameters worth noting:

  • bridge: Arc<B> — the ConsensusBridge implementor that the app loop calls for build_payload, payload_ready, commit. Arc because we'll later want to share it; generic over B so this same loop works for InMemoryEvmBridge, RethEvmBridge, and LiveRethEvmBridge (Lesson 12).
  • channels: Channels<OpenHlContext> — taken by value (then mut to call recv). We own the channels after take_channels() in the caller.
  • validator_set: OpenHlValidatorSet — the single-validator set we'll echo back on ConsensusReady and GetValidatorSet.
  • stop_after_decisions: usize (test ergonomics): tests pass 1 to exit after the first block; production validators would pass usize::MAX.

Three loop-state values:

  • decided: Vec<BlockHash> — accumulator; returned at end.
  • current_parent: BlockHash — what the next block builds on top of. Starts at all-zero (genesis); becomes the just-decided hash on each commit.
  • current_height: OpenHlHeight — what height the engine is on. Starts at INITIAL; gets bumped by StartedRound and Decided.

The while let Some(msg) = channels.consensus.recv().await loop is the heart of an actor-message app: receive a message, dispatch by variant, reply (if applicable), continue. When recv() returns None, the channel is closed — that's our error path.
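
A stripped-down sketch of that control flow, assuming a std::sync::mpsc channel in place of the tokio one and plain u64 block numbers in place of AppMsg. collect_decisions and run_with are hypothetical names.

```rust
use std::sync::mpsc::{channel, Receiver};

/// Hypothetical, simplified loop: every message is a decided block number.
/// Returns Ok once `stop_after` decisions are collected, or Err if the
/// channel closes first (the analogue of `recv()` returning `None`).
fn collect_decisions(inbox: Receiver<u64>, stop_after: usize) -> Result<Vec<u64>, String> {
    let mut decided = Vec::new();
    while let Ok(block) = inbox.recv() {
        decided.push(block);
        if decided.len() >= stop_after {
            return Ok(decided);
        }
    }
    Err(format!(
        "channel closed after {} decisions (wanted {})",
        decided.len(),
        stop_after
    ))
}

/// Helper for the examples: queue `blocks`, close the channel, run the loop.
fn run_with(blocks: &[u64], stop_after: usize) -> Result<Vec<u64>, String> {
    let (tx, rx) = channel();
    for b in blocks {
        tx.send(*b).unwrap();
    }
    drop(tx);
    collect_decisions(rx, stop_after)
}

fn main() {
    println!("{:?}", run_with(&[1, 2, 3], 2));
    println!("{:?}", run_with(&[1], 2));
}
```

With stop_after set to usize::MAX the Ok branch is effectively unreachable, so the only way out is the closed-channel error, which is the behavior a long-running validator wants.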

Before writing the 12 arms one at a time, having a single picture of what this loop is mediating makes "who sent this, who's expecting a reply" trivial to track per arm:

   [ Malachite Engine actors ]  (producer side — emits proposals, votes, Decided, etc.)
              │
              │ AppMsg::ConsensusReady { reply, validator_set }
              │ AppMsg::GetValue       { reply, height, round, ... }
              │ AppMsg::Decided        { reply, certificate, ... }
              │ … (12 variants total, all funneled into a single `tokio::sync::mpsc` channel)
              ▼
   ┌──────────────────────────────────────────────────────────────────────────┐
   │ ◆ app_task: the run_engine_app loop  (consumer — what this lesson builds, │
   │                                       the central dispatcher)             │
   │                                                                            │
   │  while let Some(msg) = channels.consensus.recv().await { match msg { … } } │
   │                                                                            │
   │  Each arm does exactly one or two things:                                  │
   │   1. reply.send(...)        ──► unblock the engine by giving it a value    │
   │   2. bridge.<method>().await ──► drive the EL and pick up the result       │
   └──────────────────────────────────────────────────────────────────────────┘
              │                                                  ▲
              │ bridge.build_payload / payload_ready / commit    │ FillResult /
              ▼                                                  │ ExecutedBlock / Ok
   [ ConsensusBridge impl ]  (StubBridge / InMemoryEvmBridge / RethEvmBridge / LiveRethEvmBridge)


   ── One representative cycle ─────────────────────────────────────────────────

   ① ConsensusReady    ── engine ──► app: "I'm ready — hand me the validator set."
                          app ──► engine: reply.send(validator_set)

   ② GetValue          ── engine ──► app: "Propose the next block at (height, round)."
                          app ──► bridge.build_payload + payload_ready
                          app ──► engine: reply.send(LocallyProposedValue(value))

   ③ Decided           ── engine ──► app: "2/3+ committed; here's the certificate."
                          app ──► bridge.commit(hash)
                          app ──► engine: reply.send(Next::Start)  or  Next::Stop
                          decided.push(hash)
                          if decided.len() >= stop_after_decisions { return Ok(decided) }

Three things this picture pins down: (a) Messages flow engine → app one-way, but each message carries an oneshot::Sender (reply), so the engine side stays parked until the app sends the reply — forget the reply and the engine waits forever. (b) The app is a router between engine and bridge, not a logic core — the heavy lifting (build/commit) lives in the bridge, the consensus driving lives in the engine. (c) Because the bridge is B: ConsensusBridge, the exact same loop runs against StubBridge / InMemoryEvmBridge / RethEvmBridge / LiveRethEvmBridge — the investment in cleanly defining the trait surface back in Lesson 3 pays off here as polymorphism.
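
Point (c) can be sketched in a few lines. The Bridge trait here is a hypothetical, synchronous miniature of ConsensusBridge (the real trait is async), and Stub, Counting, and produce_block are illustrative names.

```rust
use std::sync::{Arc, Mutex};

/// Hypothetical, synchronous miniature of `ConsensusBridge`.
trait Bridge {
    fn build(&self, parent: u64) -> u64;
    fn commit(&self, hash: u64);
}

/// Test double: fixed hash, records every commit for later inspection.
#[derive(Default)]
struct Stub {
    committed: Mutex<Vec<u64>>,
}

impl Bridge for Stub {
    fn build(&self, _parent: u64) -> u64 {
        0x42
    }
    fn commit(&self, hash: u64) {
        self.committed.lock().unwrap().push(hash);
    }
}

/// A second backend with different behavior: the hash depends on the parent.
#[derive(Default)]
struct Counting {
    head: Mutex<u64>,
}

impl Bridge for Counting {
    fn build(&self, parent: u64) -> u64 {
        parent + 1
    }
    fn commit(&self, hash: u64) {
        *self.head.lock().unwrap() = hash;
    }
}

/// One routing function for every backend: it only knows the trait.
fn produce_block<B: Bridge>(bridge: Arc<B>, parent: u64) -> u64 {
    let hash = bridge.build(parent);
    bridge.commit(hash);
    hash
}

fn main() {
    let stub = Arc::new(Stub::default());
    let counting = Arc::new(Counting::default());
    println!("{}", produce_block(stub.clone(), 0));
    println!("{}", produce_block(counting.clone(), 7));
    println!("{:?}", stub.committed.lock().unwrap());
    println!("{}", counting.head.lock().unwrap());
}
```

Keeping a second Arc handle to the stub is what lets a test inspect what the routing function did after it has consumed its own handle.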

Step 3: The ConsensusReady and StartedRound arms

Add these inside the match:

            AppMsg::ConsensusReady { reply, .. } => {
                if reply
                    .send((current_height, validator_set.clone()))
                    .is_err()
                {
                    tracing::warn!("{APP_REPLY_WAIT_LOG} (ConsensusReady)");
                }
            }

            AppMsg::StartedRound {
                height,
                round: _,
                reply_value,
                ..
            } => {
                current_height = height;
                if reply_value.send(Vec::new()).is_err() {
                    tracing::warn!("{APP_REPLY_WAIT_LOG} (StartedRound)");
                }
            }

ConsensusReady is the engine asking "are you ready for me to start consensus? At what height and with what validator set?" Our reply is the tuple (current_height, validator_set.clone()). Each reply is a oneshot::Sender<...>; send() consumes it and returns Result<(), T>, where T is the value we tried to send (handed back on error). We don't recover from a closed reply channel; we just log.
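
The send-and-log behavior can be tried in isolation. The sketch below uses std::sync::mpsc as a stand-in for tokio's oneshot channel (std wraps the returned value in SendError, where tokio's oneshot hands it back directly), and try_reply is a hypothetical helper.

```rust
use std::sync::mpsc::channel;

/// Try to deliver a reply. On failure, hand the undelivered value back so the
/// caller can log it and move on instead of aborting the loop.
/// `try_reply` is a hypothetical helper, not part of any library.
fn try_reply(receiver_alive: bool, value: u64) -> Result<(), u64> {
    let (reply, answer) = channel();
    if !receiver_alive {
        drop(answer); // simulate an engine actor that has already gone away
    }
    // `SendError` wraps the value we tried to send; `.0` takes it back out.
    reply.send(value).map_err(|err| err.0)
}

fn report(receiver_alive: bool) {
    match try_reply(receiver_alive, 7) {
        Ok(()) => println!("reply delivered"),
        // Log and continue: a closed reply channel is not our bug.
        Err(lost) => eprintln!("reply channel closed, dropped value {lost}"),
    }
}

fn main() {
    report(true);
    report(false);
}
```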

StartedRound is the engine telling us a new round began at some height. We update our current_height and reply with an empty Vec (the list of stored proposed values for this height; we have none cached). The round: _ pattern discards the round value because we don't need it in single-validator mode: the engine won't gossip-restream a value across rounds when there's no peer to send to.

Step 4: The GetValue arm — building a proposal

This is the load-bearing arm. Add:

            AppMsg::GetValue {
                height,
                round,
                timeout: _,
                reply,
            } => {
                let attrs = default_attrs();
                let id = bridge.build_payload(current_parent, attrs).await?;
                let block = bridge.payload_ready(id).await?;
                let value = OpenHlValue(block.hash);
                let lpv =
                    informalsystems_malachitebft_app_channel::app::types::LocallyProposedValue::new(
                        height, round, value,
                    );
                if reply.send(lpv).is_err() {
                    tracing::warn!("{APP_REPLY_WAIT_LOG} (GetValue)");
                }
            }

The engine asks "propose a value for height H, round R, with timeout T." We:

  1. Build payload attrs — default values for now (timestamp: 0, fee_recipient: zero, prev_randao: zero). In Lesson 12 these'll come from the engine's notion of time + the validator's address.
  2. bridge.build_payload(current_parent, attrs).await — kicks the EL: "build me a block on top of current_parent with these attrs." Returns a PayloadId — a handle the EL uses to track the in-flight build.
  3. bridge.payload_ready(id).await — fetch the completed block. The in-memory bridge from Lessons 4–5 produces immediately; live Reth (Lesson 12 onward) might take 10-50ms.
  4. Wrap the resulting block.hash in OpenHlValue and then LocallyProposedValue::new(height, round, value).
  5. Reply to the engine with that LocallyProposedValue.

The ? operator on build_payload and payload_ready propagates BridgeError up to eyre::Result. If the EL crashes mid-build, the app loop returns an error and the test fails loudly.
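
That propagation can be sketched with the standard library alone. Box<dyn Error> stands in for eyre::Result, BridgeError is a one-variant miniature, and build_payload / payload_ready are synchronous stand-ins that take no parent or attrs; all of that is assumed for illustration.

```rust
use std::error::Error;
use std::fmt;

/// Hypothetical miniature of `BridgeError`.
#[derive(Debug)]
enum BridgeError {
    BuildFailed,
}

impl fmt::Display for BridgeError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            BridgeError::BuildFailed => write!(f, "payload build failed"),
        }
    }
}

impl Error for BridgeError {}

/// Stand-in for `build_payload`: returns a payload id or fails.
fn build_payload(el_healthy: bool) -> Result<u32, BridgeError> {
    if el_healthy {
        Ok(1)
    } else {
        Err(BridgeError::BuildFailed)
    }
}

/// Stand-in for `payload_ready`: turns a payload id into a block hash.
fn payload_ready(id: u32) -> Result<u64, BridgeError> {
    Ok(0x4200 + u64::from(id))
}

/// The proposing step: each `?` converts a `BridgeError` into the boxed
/// error type and returns early, so the caller sees the first failure.
fn propose(el_healthy: bool) -> Result<u64, Box<dyn Error>> {
    let id = build_payload(el_healthy)?;
    let block_hash = payload_ready(id)?;
    Ok(block_hash)
}

fn main() {
    println!("{:?}", propose(true));
    println!("{:?}", propose(false).map_err(|e| e.to_string()));
}
```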

Step 5: The Decided arm — the moment a block becomes final

The other load-bearing arm. Add:

            AppMsg::Decided {
                certificate, reply, ..
            } => {
                let hash = certificate.value_id;
                bridge.commit(hash).await?;
                decided.push(hash);
                current_parent = hash;

                if decided.len() >= stop_after_decisions {
                    // Send a reply so consensus doesn't hang waiting on us before
                    // we drop the channel.
                    let next_height = certificate.height.increment();
                    let _ = reply.send(Next::Start(next_height, validator_set.clone()));
                    return Ok(decided);
                }

                let next_height = certificate.height.increment();
                current_height = next_height;
                if reply
                    .send(Next::Start(next_height, validator_set.clone()))
                    .is_err()
                {
                    tracing::warn!("{APP_REPLY_WAIT_LOG} (Decided)");
                }
            }

The engine says "a value was decided at height H — here's the certificate." We:

  1. Extract the decided hash from certificate.value_id.
  2. bridge.commit(hash).await — durably mark this block as the canonical chain head in the EL. For the in-memory bridge, just records; for live Reth, executes forkchoice update.
  3. Append to decided and update current_parent so the next GetValue builds on this hash.
  4. Check exit condition — if we've hit stop_after_decisions, reply with Next::Start(next_height, ...) (so the engine doesn't hang waiting) and return. This is what makes the test exit cleanly in 0.02s.
  5. Otherwise reply with Next::Start(next_height, validator_set) — "yes, please continue at the next height, here's the validator set" — and loop.
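
Steps 3 to 5 hinge on two pieces of loop state. A minimal sketch of how current_parent links successive blocks, with u64 values standing in for hashes and a hypothetical build_on in place of the bridge:

```rust
/// Hypothetical block record: just a hash and the hash it builds on.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
struct Block {
    hash: u64,
    parent: u64,
}

/// Stand-in for the bridge's build step: derives a new hash from the parent.
fn build_on(parent: u64) -> Block {
    Block {
        hash: parent.wrapping_mul(31).wrapping_add(7),
        parent,
    }
}

/// Run `n` propose/decide cycles, tracking the same two pieces of state as
/// the lesson's loop: the list of decided blocks and the current parent.
fn decide_chain(n: usize) -> Vec<Block> {
    let mut decided = Vec::new();
    let mut current_parent = 0; // genesis: the all-zero hash
    for _ in 0..n {
        let block = build_on(current_parent); // the GetValue step
        decided.push(block); // the Decided step
        current_parent = block.hash; // the next proposal builds on this
    }
    decided
}

fn main() {
    for block in decide_chain(3) {
        println!("{block:?}");
    }
}
```

If the current_parent update were skipped, every block would claim genesis as its parent, so a check that each parent equals the previous hash is a cheap guard for that line.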

Step 6: The other 8 arms — stubs and no-ops

The remaining arms are short. Add:

            AppMsg::ExtendVote { reply, .. } => {
                if reply.send(None).is_err() {
                    tracing::warn!("{APP_REPLY_WAIT_LOG} (ExtendVote)");
                }
            }

            AppMsg::VerifyVoteExtension { reply, .. } => {
                if reply.send(Ok(())).is_err() {
                    tracing::warn!("{APP_REPLY_WAIT_LOG} (VerifyVoteExtension)");
                }
            }

            AppMsg::RestreamProposal { .. } => {
                // Single-validator mode never re-streams.
            }

            AppMsg::GetHistoryMinHeight { reply } => {
                if reply.send(OpenHlHeight::INITIAL).is_err() {
                    tracing::warn!("{APP_REPLY_WAIT_LOG} (GetHistoryMinHeight)");
                }
            }

            AppMsg::ReceivedProposalPart { reply, .. } => {
                // ProposalOnly value-payload mode — proposal parts never arrive.
                if reply.send(None).is_err() {
                    tracing::warn!("{APP_REPLY_WAIT_LOG} (ReceivedProposalPart)");
                }
            }

            AppMsg::GetValidatorSet { reply, .. } => {
                if reply.send(Some(validator_set.clone())).is_err() {
                    tracing::warn!("{APP_REPLY_WAIT_LOG} (GetValidatorSet)");
                }
            }

            AppMsg::GetDecidedValue { reply, .. } => {
                if reply.send(None).is_err() {
                    tracing::warn!("{APP_REPLY_WAIT_LOG} (GetDecidedValue)");
                }
            }

            AppMsg::ProcessSyncedValue { reply, .. } => {
                if reply.send(None).is_err() {
                    tracing::warn!("{APP_REPLY_WAIT_LOG} (ProcessSyncedValue)");
                }
            }

Eight more arms, four categories:

  • Vote extensions (ExtendVote, VerifyVoteExtension) — reply None / Ok(()). Vote extensions are unused at v0 (mirror of how OpenHlSigningProvider::sign_vote_extension signs empty bytes).
  • No-ops (RestreamProposal) — single-validator never re-streams a proposal, so we do nothing. No reply expected.
  • History/sync (GetHistoryMinHeight, GetValidatorSet, GetDecidedValue, ProcessSyncedValue) — used during peer catch-up. We reply with defaults: INITIAL height (we have no history), the current validator set, None for "give me a past block." No peers means catch-up is never exercised, but the engine asks anyway.
  • ProposalOnly mode (ReceivedProposalPart) — since OpenHlConfig sets ValuePayload::ProposalOnly, proposal parts never arrive. We still need to handle the variant; reply None.

Step 7: The default_attrs helper

Below the function:

fn default_attrs() -> PayloadAttrs {
    PayloadAttrs {
        timestamp: 0,
        fee_recipient: [0u8; 20],
        prev_randao: [0u8; 32],
    }
}

Three zero fields, all of which the bridge accepts. In Lesson 12 these'll be real:

  • timestamp will come from the engine (or a wall clock if testing).
  • fee_recipient will come from the validator's configured payout address.
  • prev_randao will be derived from the previous block's hash via BLS.

For now, all zeros — the test doesn't care, and the in-memory bridge doesn't validate them.
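
As a rough idea of what a non-zero timestamp could look like, here is a sketch that fills it from a clock value passed in by the caller. It is not the Lesson 12 implementation: PayloadAttrs is redeclared locally, timestamp: u64 (in seconds) is an assumption about the field's type, attrs_at is a hypothetical helper, and prev_randao stays zero.

```rust
use std::time::{Duration, SystemTime, UNIX_EPOCH};

/// Local mirror of the lesson's `PayloadAttrs`.
/// Assumption: `timestamp` is a `u64` count of seconds since the Unix epoch.
#[derive(Debug, Clone, PartialEq, Eq)]
struct PayloadAttrs {
    timestamp: u64,
    fee_recipient: [u8; 20],
    prev_randao: [u8; 32],
}

/// Hypothetical helper: build attrs for a given instant and payout address.
/// Taking `now` as a parameter keeps the function deterministic in tests.
fn attrs_at(now: SystemTime, fee_recipient: [u8; 20]) -> PayloadAttrs {
    let timestamp = now
        .duration_since(UNIX_EPOCH)
        .map(|elapsed| elapsed.as_secs())
        .unwrap_or(0); // clock set before 1970: fall back to zero
    PayloadAttrs {
        timestamp,
        fee_recipient,
        prev_randao: [0u8; 32], // still a placeholder in this sketch
    }
}

fn main() {
    let fixed = UNIX_EPOCH + Duration::from_secs(5);
    println!("{:?}", attrs_at(fixed, [1u8; 20]));
    println!("{}", attrs_at(SystemTime::now(), [1u8; 20]).timestamp);
}
```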

Step 8: Wire engine_app.rs into lib.rs

//! Consensus layer — Malachite BFT.

pub mod bridge;
pub mod codec;
pub mod context;
pub mod engine_app;
pub mod node;
pub mod signing;
pub mod signing_provider;
pub mod types;

pub use context::OpenHlContext;

Step 9: Add the integration test + StubBridge

At the bottom of engine_app.rs:

#[cfg(test)]
mod tests {
    use super::*;
    use crate::bridge::BridgeError;
    use crate::node::OpenHlNode;
    use crate::types::{OpenHlAddress, OpenHlValidator};
    use async_trait::async_trait;
    use informalsystems_malachitebft_app::node::{Node as _, NodeHandle as _};
    use informalsystems_malachitebft_signing_ed25519::PrivateKey;
    use openhl_types::{ExecutedBlock, PayloadId, PayloadStatus};
    use rand::rngs::OsRng;
    use sha2::{Digest, Sha256};
    use std::sync::Mutex;
    use std::time::Duration;

    #[derive(Debug, Default)]
    struct StubBridge {
        last_built: Mutex<Option<BlockHash>>,
        committed: Mutex<Vec<BlockHash>>,
    }

    #[async_trait]
    impl ConsensusBridge for StubBridge {
        async fn build_payload(
            &self,
            _parent: BlockHash,
            _attrs: PayloadAttrs,
        ) -> Result<PayloadId, BridgeError> {
            let hash = BlockHash([0x42u8; 32]);
            *self.last_built.lock().expect("poisoned") = Some(hash);
            Ok(PayloadId(1))
        }

        async fn payload_ready(
            &self,
            _id: PayloadId,
        ) -> Result<ExecutedBlock, BridgeError> {
            Ok(ExecutedBlock {
                hash: BlockHash([0x42u8; 32]),
                parent_hash: BlockHash([0u8; 32]),
                number: 1,
                state_root: [0u8; 32],
            })
        }

        async fn validate_payload(
            &self,
            _block: &ExecutedBlock,
        ) -> Result<PayloadStatus, BridgeError> {
            Ok(PayloadStatus::Valid)
        }

        async fn commit(&self, block_hash: BlockHash) -> Result<(), BridgeError> {
            self.committed.lock().expect("poisoned").push(block_hash);
            Ok(())
        }
    }

    fn make_test_node(home_dir: std::path::PathBuf) -> OpenHlNode {
        let sk = PrivateKey::generate(OsRng);
        let pk = sk.public_key();
        let digest = Sha256::digest(pk.as_bytes());
        let mut addr_bytes = [0u8; 20];
        addr_bytes.copy_from_slice(&digest[12..32]);
        let address = OpenHlAddress(addr_bytes);
        let validator_set = OpenHlValidatorSet::new(vec![OpenHlValidator::new(address, pk, 1)]);
        OpenHlNode::new(sk, validator_set, home_dir, "openhl-engine-test")
    }

    /// End-to-end: spawn the engine actor system, drive one block through the
    /// `AppMsg` loop, assert the bridge built+committed exactly the hash the
    /// engine decided on.
    #[tokio::test(flavor = "multi_thread", worker_threads = 4)]
    async fn first_block_via_engine_actors() {
        let tmp = tempfile::tempdir().unwrap();
        let node = make_test_node(tmp.path().to_path_buf());
        let validator_set = node.validator_set.clone();

        let handle = node.start().await.expect("start_engine failed");
        let channels = handle
            .take_channels()
            .await
            .expect("channels available exactly once");

        let bridge = Arc::new(StubBridge::default());
        let bridge_for_check = bridge.clone();

        let app_task = tokio::spawn(run_engine_app(bridge, channels, validator_set, 1));

        let decisions = tokio::time::timeout(Duration::from_secs(15), app_task)
            .await
            .expect("app loop timed out")
            .expect("app task panicked")
            .expect("app loop returned error");

        assert_eq!(decisions.len(), 1, "expected exactly one decided block");
        let decided_hash = decisions[0];

        let committed = bridge_for_check.committed.lock().unwrap().clone();
        assert_eq!(committed, vec![decided_hash], "bridge must commit decided hash");
        assert_eq!(
            *bridge_for_check.last_built.lock().unwrap(),
            Some(decided_hash),
            "decided hash must match what we built",
        );

        handle.kill(None).await.unwrap();
    }
}

Three pieces:

  • StubBridge — a ConsensusBridge that always returns BlockHash([0x42; 32]) for everything. Production-grade test fixture pattern: in-memory state (Mutex<Option<...>> and Mutex<Vec<...>>), Arc-able, async-friendly. The test can read last_built and committed after the loop runs to check what the bridge saw.
  • make_test_node — same single-validator construction we used in Lesson 9 (OpenHlNode::new with one validator).
  • first_block_via_engine_actors — the integration test. Steps:
    1. Spawn the engine via node.start().await.
    2. Take channels via handle.take_channels().await.
    3. Spawn the app loop in a tokio::spawn task with the bridge + channels + validator set + stop_after_decisions = 1.
    4. Use tokio::time::timeout(Duration::from_secs(15), app_task) to bound test runtime — if anything hangs, fail in 15s rather than forever.
    5. Unwrap the nested Results. The triple .expect(...) unwinds: timeout → panic → loop error.
    6. Assert three things: decisions is exactly 1 entry, bridge committed that exact hash, bridge built exactly that hash. Together these prove the full pipeline: engine → app → bridge → engine → app.
    7. handle.kill(None) for cleanup.
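
The bounded wait and the three layers of Result in steps 4 and 5 can be reproduced with the standard library. In this sketch recv_timeout stands in for tokio::time::timeout, catch_unwind stands in for the panic reporting of a tokio JoinHandle, and run_bounded and Layered are hypothetical names.

```rust
use std::panic::{catch_unwind, AssertUnwindSafe};
use std::sync::mpsc::{channel, RecvTimeoutError};
use std::thread;
use std::time::Duration;

/// Three failure layers, outermost first: the wait timed out, the worker
/// panicked, or the work itself returned an error.
type Layered<T> = Result<thread::Result<Result<T, String>>, RecvTimeoutError>;

/// Run `work` on its own thread and wait at most `limit` for its outcome.
fn run_bounded<T, F>(work: F, limit: Duration) -> Layered<T>
where
    T: Send + 'static,
    F: FnOnce() -> Result<T, String> + Send + 'static,
{
    let (tx, rx) = channel();
    thread::spawn(move || {
        // Capture a panic as a value instead of losing it with the thread.
        let outcome = catch_unwind(AssertUnwindSafe(work));
        let _ = tx.send(outcome); // the receiver may have timed out already
    });
    rx.recv_timeout(limit)
}

fn main() {
    // Same shape as the test: peel the layers one `expect` at a time.
    let decisions = run_bounded(|| Ok(vec![0x42u8]), Duration::from_secs(15))
        .expect("worker timed out")
        .expect("worker panicked")
        .expect("worker returned error");
    println!("{decisions:?}");
}
```

Each expect message names the layer it removes, so a failing run states whether the problem was a hang, a panic, or an ordinary error.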

Test

cargo test -p openhl-consensus first_block_via_engine_actors

After ~5 seconds (compile + first run):

running 1 test
test engine_app::tests::first_block_via_engine_actors ... ok

test result: ok. 1 passed; 0 failed; 0 ignored; 0 measured; 0 filtered out

The test itself runs in ~0.02 seconds; the 5 seconds is cargo test's overhead.

To verify everything passes:

cargo test -p openhl-consensus

…should produce 21 tests passing.

Common errors and fixes:

  • Test hangs > 15s: the tokio::time::timeout fires. The cause is always in the same family: one of the ConsensusReady / GetValue / Decided arms either forgot reply.send(...) or accidentally dropped the oneshot::Sender mid-flow. The engine actor stays parked until the app replies, with no timeout and no panic, just silent waiting. The Decided exit path is the easiest to miss, because early returns make it tempting to skip the reply. Always send the reply before returning. Re-walk Steps 3-5 and verify that the reply.send(...) line is reachable from every control-flow path through the match arm.
  • error[E0277]: ConsensusBridge is not Send: tokio::spawn requires the spawned future to be Send, so the bridge needs Send + Sync bounds. Your impl may well use std::sync::Mutex (which is Send and Sync when its contents are Send), but the bounds are missing on the trait itself. Check bridge.rs.
  • bridge.committed.lock().expect("poisoned") panic — only happens if a task panicked while holding the mutex. Usually means a panic in the bridge impl. Check the bridge's build_payload / commit for panics.
  • assert_eq!(decisions.len(), 1) fires: decisions is empty. The loop never hit Decided. Most likely cause: forgot to handle GetValue (the engine waits for a LocallyProposedValue reply, never moves on without it). Re-check Step 4.

Design reflection

Three load-bearing decisions encoded here:

  1. run_engine_app is generic over B: ConsensusBridge + 'static. The same loop works with StubBridge (test), InMemoryEvmBridge (Lesson 4), RethEvmBridge (Lesson 5), and LiveRethEvmBridge (Lesson 12). The bridge's responsibility is to execute; the app loop's responsibility is to route. One implementation handles all four bridge variants.

  2. stop_after_decisions is a test ergonomic, not a production feature. Real validators use usize::MAX. The test uses 1. The presence of this parameter signals that the function is designed to be testable — you can drive it to a known finite state and assert without infrastructure for graceful shutdown. Test ergonomics deserve API surface.

  3. Closed reply channels are logged, not propagated. A failed oneshot send means the engine dropped the receiving end before our reply arrived, usually because the actor was killed externally. Propagating this as an error would mask actual problems with noise. Logging via tracing::warn! lets operators investigate if it's frequent without breaking the loop. The right error-handling policy depends on whether the caller can act on the failure.

Answer key

cd ~/code/openhl-reference
git checkout 708472c
diff -u ~/code/my-openhl/crates/consensus/src/engine_app.rs ./crates/consensus/src/engine_app.rs
diff -u ~/code/my-openhl/crates/consensus/Cargo.toml ./crates/consensus/Cargo.toml
diff -u ~/code/my-openhl/crates/consensus/src/lib.rs ./crates/consensus/src/lib.rs

The reference at 708472c includes 282 lines of engine_app.rs. The 12 AppMsg arms (5 substantive + 7 trivial), the StubBridge test fixture, and the integration test should match closely. Doc-comment wording can vary.

Return:

git checkout main

Common questions

Q: What's the difference between the engine's recv() channel and the engine's subscribe() event stream? The recv() channel (channels.consensus) is for imperative messages requiring a reply: "build a value", "validate this", "decided at H." The subscribe() event stream is for broadcast notifications without replies: "a round started", "a peer dialed in." Both originate at the engine; they differ in audience and in whether an answer is expected: channel = engine→app (questions), events = engine→all-subscribers (announcements). Lesson 9's OpenHlNodeHandle::subscribe is a placeholder; we don't actually consume events until Lesson 12.
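
A small sketch of the two shapes, again using std::sync::mpsc rather than Malachite's types. GetValue, Events, answer, and ask are hypothetical names; the real subscribe() API may look quite different.

```rust
use std::sync::mpsc::{channel, Receiver, Sender};

/// Imperative message: carries a reply sender, so one consumer must answer.
struct GetValue {
    height: u64,
    reply: Sender<String>,
}

/// Broadcast hub: every subscriber gets its own copy and nobody replies.
#[derive(Default)]
struct Events {
    subscribers: Vec<Sender<String>>,
}

impl Events {
    fn subscribe(&mut self) -> Receiver<String> {
        let (tx, rx) = channel();
        self.subscribers.push(tx);
        rx
    }

    fn publish(&self, event: &str) {
        for sub in &self.subscribers {
            // A subscriber that went away is not an error for the publisher.
            let _ = sub.send(event.to_string());
        }
    }
}

/// The app's side of a question: compute a value and send it back.
fn answer(request: GetValue) {
    let _ = request.reply.send(format!("block-{}", request.height));
}

/// The engine's side of a question: ask, then wait for the single reply.
fn ask(height: u64) -> String {
    let (reply, response) = channel();
    answer(GetValue { height, reply });
    response.recv().unwrap()
}

fn main() {
    println!("{}", ask(3));
    let mut events = Events::default();
    let first = events.subscribe();
    let second = events.subscribe();
    events.publish("round started");
    println!("{} / {}", first.recv().unwrap(), second.recv().unwrap());
}
```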

Q: Why don't we test individual AppMsg arms, only the integration test? Because the arms are not independent. The engine sends them in a specific order: ConsensusReady → GetValidatorSet → StartedRound → GetValue → Decided. Testing them in isolation would require building a fake engine that sends them in that order, which is more complex than just spinning up the real engine for one block. The integration test is cheaper to write and proves more. Lesson 11 will add multi-validator tests where individual-arm tests do make sense (peer sync, vote extensions).

Q: Why is validator_set: OpenHlValidatorSet taken by value instead of Arc<...>? Because OpenHlValidatorSet is small (one validator at v0) and Clone, so each clone copies only that one validator's data. The cost grows with the number of validators; if validator sets grew to 100+ entries, switching to Arc would be worthwhile.
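
To put rough numbers on that trade-off, the sketch below assumes a hypothetical ValidatorSet holding one 32-byte key per validator; the real OpenHlValidatorSet layout is not shown in this lesson.

```rust
use std::sync::Arc;

/// Hypothetical validator set: one 32-byte key per validator.
#[derive(Debug, Clone, PartialEq, Eq)]
struct ValidatorSet {
    keys: Vec<[u8; 32]>,
}

/// Bytes of key material a plain `.clone()` has to copy.
fn clone_cost_bytes(set: &ValidatorSet) -> usize {
    set.keys.len() * std::mem::size_of::<[u8; 32]>()
}

/// `Arc::clone` copies a pointer and bumps a counter; the set is shared.
fn share(set: &Arc<ValidatorSet>) -> Arc<ValidatorSet> {
    Arc::clone(set)
}

fn main() {
    let small = ValidatorSet { keys: vec![[0u8; 32]; 1] };
    let large = ValidatorSet { keys: vec![[0u8; 32]; 100] };
    println!("{} vs {} bytes", clone_cost_bytes(&small), clone_cost_bytes(&large));

    let shared = Arc::new(large);
    let handle = share(&shared);
    println!("same allocation: {}", Arc::ptr_eq(&shared, &handle));
}
```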

Q: What happens if bridge.commit(hash) fails? The ? operator propagates the BridgeError up as eyre::Result::Err(...). The app_task in the test gets Err(...), the triple-unwrap fails on the inner expect, and the test panics with the bridge error. This is the intended behavior — commit failure is unrecoverable. Production code would either retry (if transient) or shut down and alert (if persistent).

Next lesson (Lesson 11)

Stage 6 is now done. Stage 7 starts: replace InMemoryEvmBridge with a real Reth EthereumNode. Lesson 11 covers the dev node bootstrap — getting Reth to spawn as a tokio task alongside our consensus actors, sharing the same runtime. Lesson 12 wires LiveRethEvmBridge (the live Reth equivalent of Lesson 5's RethEvmBridge). After Lesson 12 you'll have a Reth-backed devnet that processes the SAME AppMsg loop you just wrote — same run_engine_app, swap one trait impl, get a real EVM execution layer.

Summary (3 lines)

  • run_engine_app drives the actor pipeline end-to-end. First block: proposal (GetValue) → vote → Decided → bridge.commit → state advances.
  • Actor pattern with mpsc channels. Single-validator means votes pass immediately. Multi-block test verifies chain continuation.
  • Performance: ~100ms per block in test; ~1s production. Next module: live Reth.