Lesson 10 — run_engine_app and the first block through the actor pipeline
Question
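run_engine_app drives the actor pipeline end-to-end. The first block flows: proposal (GetValue → bridge.build_payload + payload_ready) → vote (a lone validator's own vote already carries more than 2/3 of the voting power) → Decided → bridge.commit → state advances. End-to-end consensus works.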
Principle (minimum model)
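- run_engine_app as orchestrator. The test starts OpenHlNode (the real Malachite actor system), takes its channels, and spawns run_engine_app in a tokio task. There is no scripted driver: the engine itself works out that the lone validator is the proposer at height 1.
- Actor pattern. Malachite's engine runs as actors that talk to the application over an mpsc channel of typed AppMsg values; the app loop calls the bridge directly through async trait methods. Loose coupling; testable.
- First block flow. ConsensusReady → StartedRound → GetValue (the app calls bridge.build_payload + payload_ready and replies with the block hash) → vote (a single validator passes 2/3) → Decided (the app calls bridge.commit) → state stored. About 0.02 s in the test.
- Assertion: block 1 committed. The loop returns exactly one decided hash, and the bridge's committed list equals [decided_hash].
- Assertion: the decided block is the one we built. The bridge's last_built hash equals the hash the engine decided on.
- Multi-block. Raising stop_after_decisions lets the same loop continue to blocks 2, 3, 4. Because current_parent is updated on every decision, each block builds on the previous decided hash and parent hashes link correctly. This lesson's test stops after one decision.
- Performance. The test's single decision takes ~0.02 s of wall-clock time, with everything in-process. Production, with real networking and EVM execution, is expected to be around ~1 s per block.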
Worked example + steps
Goal
Concepts you'll grasp in this lesson:
- The AppMsg routing loop: Malachite's engine sends ConsensusReady / GetValidatorSet / StartedRound / GetValue / Decided / … over a single channel. The app loop is a while let Some(msg) = recv().await that matches each variant and replies via a oneshot::Sender, drives the bridge, or both. This is the only glue between Malachite and your EL.
- Generic-over-bridge polymorphism: run_engine_app<B: ConsensusBridge> works for StubBridge, InMemoryEvmBridge, RethEvmBridge, and (eventually) LiveRethEvmBridge. One routing function, four backends. The trait surface from Lesson 3 pays off here.
- stop_after_decisions as test ergonomics: production validators run usize::MAX; tests pass 1. A parameter that exists only to make the function testable in a finite state is a legitimate API choice. Test ergonomics deserve API surface.
- Reply channels can close mid-flight: when an engine actor dies before we reply, oneshot::Sender::send() returns an error. Logging via tracing::warn! instead of propagating is correct. Propagating would bury real errors in noise, and the operator can still investigate via logs.
- Channel vs. event-stream message flow: channels.consensus.recv() carries imperative messages that (almost always) need replies; subscribe() carries broadcast notifications. In Lesson 10 the app loop only deals with the former.
- Why integration tests beat unit tests at this layer: the engine's AppMsg arms arrive in a specific order. Faking that order is more work than spinning up the real engine for one block. The integration test is cheaper and proves more.
Verification:
cargo test -p openhl-consensus
…passes 21 tests (20 from Lesson 9 + 1 new integration test). The new test:
test engine_app::tests::first_block_via_engine_actors ... ok
…spawns the Malachite actor system, drives a real consensus round through it, asserts the bridge committed exactly the hash the engine decided on. Wall-clock: 0.02 seconds. This is the milestone where your code stops being "the engine boots" and becomes "the engine produces blocks."
Specific changes:
- crates/consensus/src/engine_app.rs: new file (~282 lines). The run_engine_app<B: ConsensusBridge + 'static> loop reads AppMsg<OpenHlContext> from Channels<OpenHlContext>::consensus, dispatches 12 message arms (5 substantive + 7 trivial), and returns the list of decided hashes. The StubBridge test fixture and the first_block_via_engine_actors integration test live in the same file.
- crates/consensus/src/lib.rs: adds pub mod engine_app;
Recap
After Lesson 9 your openhl-consensus crate has:
crates/consensus/src/lib.rs — pub mod bridge, codec, context, node, signing, signing_provider, types
crates/consensus/src/node.rs — OpenHlNode + start_engine works (smoke test passes)
crates/consensus/src/codec.rs — OpenHlCodec
crates/consensus/src/signing_provider.rs — SigningProvider impl
crates/consensus/src/context.rs — Context impl
crates/consensus/src/types/ — 7 type files
crates/consensus/src/bridge.rs — ConsensusBridge trait + InMemoryEvmBridge
cargo test -p openhl-consensus passes 20 tests. The engine boots and tears down cleanly — but it's silent. Once start_engine returns, the engine's actors immediately start sending AppMsg::ConsensusReady and waiting for a reply. Nothing replies. The actors park. Lesson 10 fixes that.
Plan
Five things:
- Add tracing to crates/consensus/Cargo.toml. The loop's "channel-closed" paths use it for their tracing::warn! calls.
- Create crates/consensus/src/engine_app.rs with the run_engine_app<B> async function, generic over B: ConsensusBridge, plus a default_attrs() helper. About 130 lines of routing logic.
- Wire pub mod engine_app; into lib.rs.
- Add the integration test first_block_via_engine_actors, plus a StubBridge test fixture that implements ConsensusBridge in memory and returns immediately.
- Run cargo test -p openhl-consensus first_block_via_engine_actors. It passes in ~0.02 seconds. Stare at it.
This lesson teaches the actor-message-loop pattern. Most consensus engines (CometBFT, HotStuff, Aura) have some "application interface", but the form varies: callbacks, gRPC services, FFI bindings. Malachite uses tokio::sync::mpsc channels of typed messages, which are strongly typed, async-native, and have a single consumer per channel. Your run_engine_app is the consumer of those messages; the engine actors are the producers. Once you understand this pattern, every chain framework's "application interface" reduces to a variant of it.
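A minimal, std-only sketch of the request/reply actor pattern may help. It uses std::sync::mpsc as a stand-in for tokio's channels and a hypothetical two-variant Msg enum in place of AppMsg. A second std channel per request plays the role of the oneshot::Sender. The "engine" thread parks on each reply, which is exactly the behaviour the real app loop must unblock.

```rust
use std::sync::mpsc::{channel, Receiver, Sender};
use std::thread;

/// Hypothetical stand-in for Malachite's `AppMsg`: every variant carries its
/// own reply channel, playing the role of a `oneshot::Sender`.
enum Msg {
    /// "Which height should consensus start at?"
    Ready { reply: Sender<u64> },
    /// "Give me a value to propose at this height."
    GetValue { height: u64, reply: Sender<String> },
}

/// Consumer side: receive, dispatch by variant, reply, continue.
/// Returns the number of messages handled once every producer hangs up.
fn app_loop(rx: Receiver<Msg>) -> usize {
    let mut handled = 0;
    while let Ok(msg) = rx.recv() {
        match msg {
            Msg::Ready { reply } => {
                let _ = reply.send(1);
            }
            Msg::GetValue { height, reply } => {
                let _ = reply.send(format!("value@{height}"));
            }
        }
        handled += 1;
    }
    handled
}

/// Producer side: a stand-in "engine" that parks on each reply before
/// sending its next request.
fn run_demo() -> (u64, String, usize) {
    let (tx, rx) = channel::<Msg>();
    let app = thread::spawn(move || app_loop(rx));

    let (height_tx, height_rx) = channel();
    tx.send(Msg::Ready { reply: height_tx }).unwrap();
    let height = height_rx.recv().unwrap(); // blocked until the app replies

    let (value_tx, value_rx) = channel();
    tx.send(Msg::GetValue { height, reply: value_tx }).unwrap();
    let value = value_rx.recv().unwrap();

    drop(tx); // closing the request channel ends the app loop
    let handled = app.join().unwrap();
    (height, value, handled)
}
```

Here, dropping the last request Sender is what ends the consumer loop. In the real loop the same thing happens when recv().await returns None.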
Walk-through
Step 1: Add tracing to Cargo.toml
Open crates/consensus/Cargo.toml. After Lesson 9 the [dependencies] section ends with:
sha2 = "0.10"
serde = { workspace = true }
tokio = { workspace = true }
Add one line:
tracing = { workspace = true }
tracing is the workspace's standard logging crate. We'll use only tracing::warn! here, for one specific case: a reply channel closed because the engine terminated mid-conversation. A closed tokio::sync::oneshot reply channel isn't a bug in our code; it means something upstream gave up. We log it but don't propagate it.
Step 2: Create crates/consensus/src/engine_app.rs — imports and signature
Start with module doc + imports:
//! Engine app loop — consumes `AppMsg` from the Malachite engine and routes
//! every consensus-relevant event through a [`ConsensusBridge`].
//!
//! This is the missing half of Lesson 9: with `OpenHlNode::start()` spinning
//! up the actor system, this loop is what makes those actors do useful work.
//! Once a `Decided` arrives we commit through the bridge, increment height,
//! and (optionally) stop after N decisions for tests.
use std::sync::Arc;
use eyre::eyre;
use informalsystems_malachitebft_app::engine::host::Next;
use informalsystems_malachitebft_app_channel::{AppMsg, Channels};
use informalsystems_malachitebft_core_types::Height as _;
use openhl_types::{BlockHash, PayloadAttrs};
use crate::bridge::ConsensusBridge;
use crate::context::OpenHlContext;
use crate::types::{OpenHlHeight, OpenHlValidatorSet, OpenHlValue};
const APP_REPLY_WAIT_LOG: &str = "engine_app: peer replied unsuccessfully (channel closed)";
Imports of note:
- AppMsg, Channels from app_channel: the message enum and the channel-bundle type. Channels::consensus is the mpsc receiver for AppMsg<Ctx>.
- Next from app::engine::host: the enum used in Decided's reply to tell the engine what comes next (start the next height, halt, etc.).
- Height as _: imports the trait Height for its .increment() method without bringing the name into scope (we use our OpenHlHeight newtype throughout).
- Arc: run_engine_app takes the bridge as Arc<B> so the reference can be cloned into long-running tasks.
Now the function signature:
/// Drive the engine app loop until `stop_after_decisions` decisions have been
/// committed through the bridge, or the consensus channel closes.
///
/// Returns the `BlockHash`es that were decided, in order. Single-validator mode
/// uses this with `stop_after_decisions = 1` to exit after the first block.
#[allow(clippy::too_many_lines)] // 12 AppMsg arms, laid out flat for this lesson's match-by-match walk
pub async fn run_engine_app<B>(
bridge: Arc<B>,
mut channels: Channels<OpenHlContext>,
validator_set: OpenHlValidatorSet,
stop_after_decisions: usize,
) -> eyre::Result<Vec<BlockHash>>
where
B: ConsensusBridge + 'static,
{
let mut decided: Vec<BlockHash> = Vec::new();
let mut current_parent = BlockHash([0u8; 32]);
let mut current_height = OpenHlHeight::INITIAL;
while let Some(msg) = channels.consensus.recv().await {
match msg {
// ... 12 arms come here ...
}
}
Err(eyre!(
"consensus channel closed after {n} decisions (wanted {stop_after_decisions})",
n = decided.len()
))
}
Four parameters worth noting:
- bridge: Arc<B>: the ConsensusBridge implementor that the app loop calls for build_payload, payload_ready, and commit. It is an Arc because we'll later want to share it. It is generic over B so this same loop works for InMemoryEvmBridge, RethEvmBridge, and LiveRethEvmBridge (Lesson 12).
- channels: Channels<OpenHlContext>: taken by value and bound mut so we can call recv. We own the channels after take_channels() in the caller.
- validator_set: OpenHlValidatorSet: the single-validator set we echo back on ConsensusReady and GetValidatorSet, and in Decided's Next::Start reply.
- stop_after_decisions: usize: test ergonomics. Tests use 1; production validators use usize::MAX.
Three loop-state values:
- decided: Vec<BlockHash>: the accumulator, returned at the end.
- current_parent: BlockHash: what the next block builds on. It starts all-zero (genesis) and becomes the just-decided hash on each commit.
- current_height: OpenHlHeight: the height the engine is on. It starts at INITIAL, is set by StartedRound, and is advanced by Decided.
The while let Some(msg) = channels.consensus.recv().await loop is the heart of an actor-message app: receive a message, dispatch by variant, reply (if applicable), continue. When recv() returns None, the channel is closed — that's our error path.
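The loop's exit semantics can be sketched with std channels. This sketch uses a hypothetical Msg type with only a Decided variant, and a plain String error instead of eyre. The loop returns Ok only from inside the match, once stop_after_decisions is reached. Falling out of the while let means every sender was dropped, which is the error path. Two differences from tokio: std's Receiver::recv returns Err rather than None on disconnect, and it still delivers messages buffered before the senders went away.

```rust
use std::sync::mpsc::{channel, Receiver};

/// Hypothetical, trimmed-down message: only the "a value was decided" case.
enum Msg {
    Decided { hash: u64 },
}

/// Same shape as `run_engine_app`'s outer loop: `Ok` only from inside the
/// loop once enough decisions arrived; leaving the loop means the channel
/// closed, which is reported as an error.
fn run_app(rx: Receiver<Msg>, stop_after_decisions: usize) -> Result<Vec<u64>, String> {
    let mut decided = Vec::new();
    while let Ok(msg) = rx.recv() {
        match msg {
            Msg::Decided { hash } => {
                decided.push(hash);
                if decided.len() >= stop_after_decisions {
                    return Ok(decided);
                }
            }
        }
    }
    Err(format!(
        "consensus channel closed after {n} decisions (wanted {stop_after_decisions})",
        n = decided.len()
    ))
}

/// Feed `hashes` into a fresh channel, close it, then run the loop.
fn feed(hashes: &[u64], stop_after_decisions: usize) -> Result<Vec<u64>, String> {
    let (tx, rx) = channel();
    for &hash in hashes {
        tx.send(Msg::Decided { hash }).unwrap();
    }
    drop(tx); // buffered messages are still delivered before recv() errors
    run_app(rx, stop_after_decisions)
}
```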
Before writing the 12 arms one at a time, it helps to have a single picture of what this loop mediates. It makes "who sent this, and who's expecting a reply" easy to track per arm:
[ Malachite Engine actors ] (producer side — emits proposals, votes, Decided, etc.)
│
│ AppMsg::ConsensusReady { reply, validator_set }
│ AppMsg::GetValue { reply, height, round, ... }
│ AppMsg::Decided { reply, certificate, ... }
│ … (12 variants total, all funneled into a single `tokio::mpsc` channel)
▼
┌──────────────────────────────────────────────────────────────────────────┐
│ ◆ app_task: the run_engine_app loop (consumer — what this lesson builds, │
│ the central dispatcher) │
│ │
│ while let Some(msg) = channels.consensus.recv().await { match msg { … } } │
│ │
│ Each arm does exactly one or two things: │
│ 1. reply.send(...) ──► unblock the engine by giving it a value │
│ 2. bridge.<method>().await ──► drive the EL and pick up the result │
└──────────────────────────────────────────────────────────────────────────┘
│ ▲
│ bridge.build_payload / payload_ready / commit │ PayloadId /
▼ │ ExecutedBlock / Ok
[ ConsensusBridge impl ] (StubBridge / InMemoryEvmBridge / RethEvmBridge / LiveRethEvmBridge)
── One representative cycle ─────────────────────────────────────────────────
① ConsensusReady ── engine ──► app: "I'm ready — hand me the validator set."
app ──► engine: reply.send((current_height, validator_set))
② GetValue ── engine ──► app: "Propose the next block at (height, round)."
app ──► bridge.build_payload + payload_ready
app ──► engine: reply.send(LocallyProposedValue(value))
③ Decided ── engine ──► app: "2/3+ committed; here's the certificate."
app ──► bridge.commit(hash)
decided.push(hash)
app ──► engine: reply.send(Next::Start(next_height, validator_set))
if decided.len() >= stop_after_decisions { return Ok(decided) }
Three things this picture pins down:

(a) Messages flow engine → app one way, but nearly every message carries a oneshot::Sender (reply); RestreamProposal is the exception. The engine side stays parked until the app replies. Forget the reply and the engine never gets the value it needs to advance.

(b) The app is a router between engine and bridge, not a logic core. The heavy lifting (build/commit) lives in the bridge, and the consensus driving lives in the engine.

(c) Because the bridge is B: ConsensusBridge, the exact same loop runs against StubBridge / InMemoryEvmBridge / RethEvmBridge / LiveRethEvmBridge. The effort spent defining the trait surface cleanly back in Lesson 3 pays off here as polymorphism.
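One representative cycle can be exercised end to end without Malachite. This sketch makes several simplifying assumptions. The Bridge trait, the Stub fixture, the three-variant Msg enum and fake_engine are all hypothetical and synchronous: std threads and std::sync::mpsc replace tokio, and the real ConsensusBridge is async with more methods. What it shows is the routing shape. Every arm replies, the bridge does the building and committing, and the parent variable chains each block to the previous decision.

```rust
use std::sync::mpsc::{channel, Receiver, Sender};
use std::sync::{Arc, Mutex};
use std::thread;

type Hash = [u8; 32];

/// Hypothetical, synchronous stand-in for `ConsensusBridge`.
trait Bridge {
    fn build(&self, parent: Hash) -> Hash;
    fn commit(&self, hash: Hash);
}

/// Fixture in the spirit of `StubBridge`: records every commit.
#[derive(Default)]
struct Stub {
    committed: Mutex<Vec<Hash>>,
}

impl Bridge for Stub {
    fn build(&self, parent: Hash) -> Hash {
        // Derive the child hash from the parent so successive blocks differ.
        let mut child = parent;
        child[0] = child[0].wrapping_add(1);
        child
    }
    fn commit(&self, hash: Hash) {
        self.committed.lock().unwrap().push(hash);
    }
}

/// The three messages of the representative cycle (names mirror `AppMsg`).
enum Msg {
    ConsensusReady { reply: Sender<u64> },
    GetValue { reply: Sender<Hash> },
    Decided { height: u64, hash: Hash, reply: Sender<u64> },
}

/// The router: every arm replies, and the heavy lifting stays in the bridge.
fn run_app<B: Bridge>(bridge: Arc<B>, rx: Receiver<Msg>, stop_after: usize) -> Vec<Hash> {
    let mut decided = Vec::new();
    let mut parent = [0u8; 32];
    while let Ok(msg) = rx.recv() {
        match msg {
            Msg::ConsensusReady { reply } => {
                let _ = reply.send(1); // start at height 1
            }
            Msg::GetValue { reply } => {
                let _ = reply.send(bridge.build(parent));
            }
            Msg::Decided { height, hash, reply } => {
                bridge.commit(hash);
                decided.push(hash);
                parent = hash;
                let _ = reply.send(height + 1); // reply before any exit
                if decided.len() >= stop_after {
                    break;
                }
            }
        }
    }
    decided
}

/// Fake single-validator engine: ready, then propose/decide `blocks` times.
fn fake_engine(tx: Sender<Msg>, blocks: usize) {
    let (r, rr) = channel();
    tx.send(Msg::ConsensusReady { reply: r }).unwrap();
    let mut height = rr.recv().unwrap();
    for _ in 0..blocks {
        let (r, rr) = channel();
        tx.send(Msg::GetValue { reply: r }).unwrap();
        let hash = rr.recv().unwrap();
        // A lone validator's own vote already exceeds 2/3, so decide at once.
        let (r, rr) = channel();
        tx.send(Msg::Decided { height, hash, reply: r }).unwrap();
        height = rr.recv().unwrap();
    }
}
```

With fake_engine(tx, 3) and stop_after = 3, the loop returns three hashes, each built on the one before it. The stub's committed list matches them exactly.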
Step 3: The ConsensusReady and StartedRound arms
Add these inside the match:
AppMsg::ConsensusReady { reply, .. } => {
if reply
.send((current_height, validator_set.clone()))
.is_err()
{
tracing::warn!("{APP_REPLY_WAIT_LOG} (ConsensusReady)");
}
}
AppMsg::StartedRound {
height,
round: _,
reply_value,
..
} => {
current_height = height;
if reply_value.send(Vec::new()).is_err() {
tracing::warn!("{APP_REPLY_WAIT_LOG} (StartedRound)");
}
}
ConsensusReady is the engine asking "are you ready for me to start consensus? at what height and with what validator set?" Our reply is the tuple (current_height, validator_set.clone()). Each reply is a oneshot::Sender<...> — send() consumes it and returns Result<(), T> where T is what we tried to send (returned on error). We don't recover from a closed reply channel; we just log.
StartedRound is the engine telling us a new round began at some height. We update current_height and reply with an empty Vec: the list of proposed values already stored for this height, of which we have none. The round: _ pattern ignores the round because single-validator mode doesn't need it. With no peers, the engine has no one to re-stream a value to across rounds.
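The reply-or-warn idiom fits in a few lines. This sketch uses std::sync::mpsc::Sender as the reply channel and eprintln! in place of tracing::warn!. reply_or_warn is a hypothetical helper, not part of the lesson's code. One difference from tokio is worth knowing. tokio::sync::oneshot::Sender::send takes self and returns Err(T). std's Sender::send takes &self and returns Err(SendError(T)). Either way, the undelivered value comes back to you.

```rust
use std::fmt::Debug;
use std::sync::mpsc::{channel, Sender};

/// Reply on a one-shot-style channel; log and carry on if the requester is
/// gone. Takes the sender by value so each reply channel is used once.
/// Returns `true` if the reply was delivered.
fn reply_or_warn<T: Debug>(reply: Sender<T>, value: T, arm: &str) -> bool {
    match reply.send(value) {
        Ok(()) => true,
        Err(err) => {
            // `err.0` is the value we failed to deliver; the real loop would
            // call tracing::warn! here instead of eprintln!.
            eprintln!("engine_app: reply for {arm} not delivered; dropped {:?}", err.0);
            false
        }
    }
}
```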
Step 4: The GetValue arm — building a proposal
This is the load-bearing arm. Add:
AppMsg::GetValue {
height,
round,
timeout: _,
reply,
} => {
let attrs = default_attrs();
let id = bridge.build_payload(current_parent, attrs).await?;
let block = bridge.payload_ready(id).await?;
let value = OpenHlValue(block.hash);
let lpv =
informalsystems_malachitebft_app_channel::app::types::LocallyProposedValue::new(
height, round, value,
);
if reply.send(lpv).is_err() {
tracing::warn!("{APP_REPLY_WAIT_LOG} (GetValue)");
}
}
The engine asks "propose a value for height H, round R, with timeout T." We:
- Build payload attrs: default values for now (timestamp: 0, fee_recipient: zero, prev_randao: zero). In Lesson 12 these'll come from the engine's notion of time plus the validator's address.
- bridge.build_payload(current_parent, attrs).await asks the EL to "build me a block on top of current_parent with these attrs." It returns a PayloadId, a handle the EL uses to track the in-flight build.
- bridge.payload_ready(id).await fetches the completed block. The in-memory bridge from Lessons 4-5 produces it immediately; live Reth (Lesson 12 onward) might take 10-50 ms.
- Wrap the resulting block.hash in OpenHlValue, and then in LocallyProposedValue::new(height, round, value).
- Reply to the engine with that LocallyProposedValue.
The ? operator on build_payload and payload_ready propagates BridgeError up to eyre::Result. If the EL crashes mid-build, the app loop returns an error and the test fails loudly.
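How ? turns a bridge failure into an early return can be shown with std types only. The sketch uses a hypothetical single-variant BridgeError and Box<dyn Error + Send + Sync> as a stand-in for eyre::Report. Both convert any std::error::Error + Send + Sync + 'static via From, which is what ? relies on. build_payload and payload_ready here are toy functions, not the bridge's methods.

```rust
use std::error::Error;
use std::fmt;

/// Hypothetical bridge error; the real `BridgeError` lives in `bridge.rs`.
#[derive(Debug)]
enum BridgeError {
    Unavailable(String),
}

impl fmt::Display for BridgeError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            BridgeError::Unavailable(why) => write!(f, "execution layer unavailable: {why}"),
        }
    }
}

impl Error for BridgeError {}

/// Boxed error as a std-only stand-in for `eyre::Result`.
type AppResult<T> = Result<T, Box<dyn Error + Send + Sync>>;

fn build_payload(el_up: bool) -> Result<u64, BridgeError> {
    if el_up {
        Ok(1)
    } else {
        Err(BridgeError::Unavailable("build aborted".into()))
    }
}

fn payload_ready(id: u64) -> Result<[u8; 32], BridgeError> {
    Ok([id as u8; 32])
}

/// Mirrors the GetValue arm: each `?` converts `BridgeError` into the boxed
/// error via `From` and returns early, before any reply is sent.
fn get_value(el_up: bool) -> AppResult<[u8; 32]> {
    let id = build_payload(el_up)?;
    let block = payload_ready(id)?;
    Ok(block)
}
```

Because the early return skips reply.send(...), the real GetValue arm simply drops its reply sender when the bridge fails.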
Step 5: The Decided arm — the moment a block becomes final
The other load-bearing arm. Add:
AppMsg::Decided {
certificate, reply, ..
} => {
let hash = certificate.value_id;
bridge.commit(hash).await?;
decided.push(hash);
current_parent = hash;
if decided.len() >= stop_after_decisions {
// Send a reply so consensus doesn't hang waiting on us before
// we drop the channel.
let next_height = certificate.height.increment();
let _ = reply.send(Next::Start(next_height, validator_set.clone()));
return Ok(decided);
}
let next_height = certificate.height.increment();
current_height = next_height;
if reply
.send(Next::Start(next_height, validator_set.clone()))
.is_err()
{
tracing::warn!("{APP_REPLY_WAIT_LOG} (Decided)");
}
}
The engine says "a value was decided at height H; here's the certificate." We:
- Extract the decided hash from certificate.value_id.
- Call bridge.commit(hash).await to durably mark this block as the canonical chain head in the EL. The in-memory bridge just records it; live Reth executes a forkchoice update.
- Append to decided and update current_parent so the next GetValue builds on this hash.
- Check the exit condition. If we've hit stop_after_decisions, reply with Next::Start(next_height, ...) so the engine doesn't hang waiting, then return. This is what makes the test exit cleanly in 0.02 s.
- Otherwise set current_height, reply with Next::Start(next_height, validator_set) ("yes, please continue at the next height; here's the validator set"), and loop.
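The Decided bookkeeping can be written as a small pure function. Written that way, it also shows how blocks 2, 3, 4 link to their parents when stop_after_decisions is larger than 1. This is a sketch: heights are plain u64, Next has only a Start variant, and build_on is a hypothetical stand-in for bridge.build_payload. Unlike the lesson's code, it also updates current_height on the exit path, which is harmless because the loop returns right after.

```rust
type Hash = [u8; 32];

/// What the app tells the engine after a decision (cf. Malachite's `Next`).
#[derive(Debug, PartialEq)]
enum Next {
    Start(u64),
}

/// Loop state carried across iterations, as in `run_engine_app`.
struct AppState {
    decided: Vec<Hash>,
    current_parent: Hash,
    current_height: u64,
}

/// One `Decided` arm: record, re-parent, compute the reply. The `bool` is
/// `true` when the loop should return right after sending that reply.
fn on_decided(state: &mut AppState, cert_height: u64, hash: Hash, stop_after: usize) -> (Next, bool) {
    state.decided.push(hash);
    state.current_parent = hash;
    let next_height = cert_height + 1;
    state.current_height = next_height;
    (Next::Start(next_height), state.decided.len() >= stop_after)
}

/// Hypothetical block builder: the child hash depends on parent and height.
fn build_on(parent: Hash, height: u64) -> Hash {
    let mut child = parent;
    child[0] ^= 0xA5;
    child[1] = height as u8;
    child
}

/// Drive up to `blocks` decisions; returns final state and each block's parent.
fn drive(blocks: usize, stop_after: usize) -> (AppState, Vec<Hash>) {
    let mut state = AppState { decided: Vec::new(), current_parent: [0u8; 32], current_height: 1 };
    let mut parents = Vec::new();
    for _ in 0..blocks {
        let height = state.current_height;
        parents.push(state.current_parent);
        let hash = build_on(state.current_parent, height);
        let (_reply, done) = on_decided(&mut state, height, hash, stop_after);
        if done {
            break;
        }
    }
    (state, parents)
}
```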
Step 6: The other 8 arms (stubs and no-ops)
The remaining arms are short. Add:
AppMsg::ExtendVote { reply, .. } => {
if reply.send(None).is_err() {
tracing::warn!("{APP_REPLY_WAIT_LOG} (ExtendVote)");
}
}
AppMsg::VerifyVoteExtension { reply, .. } => {
if reply.send(Ok(())).is_err() {
tracing::warn!("{APP_REPLY_WAIT_LOG} (VerifyVoteExtension)");
}
}
AppMsg::RestreamProposal { .. } => {
// Single-validator mode never re-streams.
}
AppMsg::GetHistoryMinHeight { reply } => {
if reply.send(OpenHlHeight::INITIAL).is_err() {
tracing::warn!("{APP_REPLY_WAIT_LOG} (GetHistoryMinHeight)");
}
}
AppMsg::ReceivedProposalPart { reply, .. } => {
// ProposalOnly value-payload mode — proposal parts never arrive.
if reply.send(None).is_err() {
tracing::warn!("{APP_REPLY_WAIT_LOG} (ReceivedProposalPart)");
}
}
AppMsg::GetValidatorSet { reply, .. } => {
if reply.send(Some(validator_set.clone())).is_err() {
tracing::warn!("{APP_REPLY_WAIT_LOG} (GetValidatorSet)");
}
}
AppMsg::GetDecidedValue { reply, .. } => {
if reply.send(None).is_err() {
tracing::warn!("{APP_REPLY_WAIT_LOG} (GetDecidedValue)");
}
}
AppMsg::ProcessSyncedValue { reply, .. } => {
if reply.send(None).is_err() {
tracing::warn!("{APP_REPLY_WAIT_LOG} (ProcessSyncedValue)");
}
}
Eight more arms, four categories:
- Vote extensions (ExtendVote, VerifyVoteExtension): reply None / Ok(()). Vote extensions are unused at v0, mirroring how OpenHlSigningProvider::sign_vote_extension signs empty bytes.
- No-ops (RestreamProposal): a single validator never re-streams a proposal, so we do nothing. No reply is expected.
- History/sync (GetHistoryMinHeight, GetValidatorSet, GetDecidedValue, ProcessSyncedValue): mostly used during peer catch-up. We reply with defaults: INITIAL height (we have no history), the current validator set, and None for "give me a past block" or "process this synced value." With no peers, catch-up is never exercised, but the engine asks anyway. GetValidatorSet is also part of the normal startup sequence.
- ProposalOnly mode (ReceivedProposalPart): since OpenHlConfig sets ValuePayload::ProposalOnly, proposal parts never arrive. We still have to handle the variant, so we reply None.
Step 7: The default_attrs helper
Below the function:
fn default_attrs() -> PayloadAttrs {
PayloadAttrs {
timestamp: 0,
fee_recipient: [0u8; 20],
prev_randao: [0u8; 32],
}
}
Three zero fields, all of which the bridge accepts. In Lesson 12 these'll be real:
- timestamp will come from the engine (or a wall clock if testing).
- fee_recipient will come from the validator's configured payout address.
- prev_randao will be derived from the previous block's hash via BLS.
For now, all zeros — the test doesn't care, and the in-memory bridge doesn't validate them.
Step 8: Wire engine_app.rs into lib.rs
//! Consensus layer — Malachite BFT.
pub mod bridge;
pub mod codec;
pub mod context;
pub mod engine_app;
pub mod node;
pub mod signing;
pub mod signing_provider;
pub mod types;
pub use context::OpenHlContext;
Step 9: Add the integration test + StubBridge
At the bottom of engine_app.rs:
#[cfg(test)]
mod tests {
use super::*;
use crate::bridge::BridgeError;
use crate::node::OpenHlNode;
use crate::types::{OpenHlAddress, OpenHlValidator};
use async_trait::async_trait;
use informalsystems_malachitebft_app::node::{Node as _, NodeHandle as _};
use informalsystems_malachitebft_signing_ed25519::PrivateKey;
use openhl_types::{ExecutedBlock, PayloadId, PayloadStatus};
use rand::rngs::OsRng;
use sha2::{Digest, Sha256};
use std::sync::Mutex;
use std::time::Duration;
#[derive(Debug, Default)]
struct StubBridge {
last_built: Mutex<Option<BlockHash>>,
committed: Mutex<Vec<BlockHash>>,
}
#[async_trait]
impl ConsensusBridge for StubBridge {
async fn build_payload(
&self,
_parent: BlockHash,
_attrs: PayloadAttrs,
) -> Result<PayloadId, BridgeError> {
let hash = BlockHash([0x42u8; 32]);
*self.last_built.lock().expect("poisoned") = Some(hash);
Ok(PayloadId(1))
}
async fn payload_ready(
&self,
_id: PayloadId,
) -> Result<ExecutedBlock, BridgeError> {
Ok(ExecutedBlock {
hash: BlockHash([0x42u8; 32]),
parent_hash: BlockHash([0u8; 32]),
number: 1,
state_root: [0u8; 32],
})
}
async fn validate_payload(
&self,
_block: &ExecutedBlock,
) -> Result<PayloadStatus, BridgeError> {
Ok(PayloadStatus::Valid)
}
async fn commit(&self, block_hash: BlockHash) -> Result<(), BridgeError> {
self.committed.lock().expect("poisoned").push(block_hash);
Ok(())
}
}
fn make_test_node(home_dir: std::path::PathBuf) -> OpenHlNode {
let sk = PrivateKey::generate(OsRng);
let pk = sk.public_key();
let digest = Sha256::digest(pk.as_bytes());
let mut addr_bytes = [0u8; 20];
addr_bytes.copy_from_slice(&digest[12..32]);
let address = OpenHlAddress(addr_bytes);
let validator_set = OpenHlValidatorSet::new(vec![OpenHlValidator::new(address, pk, 1)]);
OpenHlNode::new(sk, validator_set, home_dir, "openhl-engine-test")
}
/// End-to-end: spawn the engine actor system, drive one block through the
/// `AppMsg` loop, assert the bridge built+committed exactly the hash the
/// engine decided on.
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn first_block_via_engine_actors() {
let tmp = tempfile::tempdir().unwrap();
let node = make_test_node(tmp.path().to_path_buf());
let validator_set = node.validator_set.clone();
let handle = node.start().await.expect("start_engine failed");
let channels = handle
.take_channels()
.await
.expect("channels available exactly once");
let bridge = Arc::new(StubBridge::default());
let bridge_for_check = bridge.clone();
let app_task = tokio::spawn(run_engine_app(bridge, channels, validator_set, 1));
let decisions = tokio::time::timeout(Duration::from_secs(15), app_task)
.await
.expect("app loop timed out")
.expect("app task panicked")
.expect("app loop returned error");
assert_eq!(decisions.len(), 1, "expected exactly one decided block");
let decided_hash = decisions[0];
let committed = bridge_for_check.committed.lock().unwrap().clone();
assert_eq!(committed, vec![decided_hash], "bridge must commit decided hash");
assert_eq!(
*bridge_for_check.last_built.lock().unwrap(),
Some(decided_hash),
"decided hash must match what we built",
);
handle.kill(None).await.unwrap();
}
}
Three pieces:
- StubBridge: a ConsensusBridge whose build_payload and payload_ready always produce BlockHash([0x42; 32]), and whose commit records the hash it is given. It follows a common test-fixture pattern: in-memory state (Mutex<Option<...>> and Mutex<Vec<...>>), Arc-able, async-friendly. After the loop runs, the test reads last_built and committed to check what the bridge saw.
- make_test_node: the same single-validator construction we used in Lesson 9 (OpenHlNode::new with one validator).
- first_block_via_engine_actors: the integration test. Steps:
  - Spawn the engine via node.start().await.
  - Take the channels via handle.take_channels().await.
  - Spawn the app loop in a tokio::spawn task with the bridge, channels, validator set, and stop_after_decisions = 1.
  - Use tokio::time::timeout(Duration::from_secs(15), app_task) to bound the test's runtime. If anything hangs, the test fails in 15 s rather than never.
  - Unwrap the nested Results. The three .expect(...) calls unwind in order: timeout → panic → loop error.
  - Assert three things: decisions has exactly 1 entry, the bridge committed that exact hash, and the bridge built exactly that hash. Together these prove the full pipeline: engine → app → bridge → engine → app.
  - Call handle.kill(None) for cleanup.
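The three nested expect calls map onto three distinct failure modes. A std-only analogue makes them explicit. It uses a thread plus Receiver::recv_timeout in place of tokio::spawn plus tokio::time::timeout; Outcome and run_bounded are hypothetical names. As with tokio::time::timeout around a JoinHandle, the timeout only stops the waiting. The spawned work is not cancelled and keeps running in the background.

```rust
use std::sync::mpsc::{channel, RecvTimeoutError};
use std::thread;
use std::time::Duration;

/// How a bounded run ended, mirroring the three nested `expect`s:
/// timeout, task panic, loop error (plus the success case).
#[derive(Debug, PartialEq)]
enum Outcome {
    Decided(Vec<u64>),
    TimedOut,
    Panicked,
    LoopError(String),
}

/// Run `app` on its own thread and wait at most `limit` for its result.
fn run_bounded<F>(app: F, limit: Duration) -> Outcome
where
    F: FnOnce() -> Result<Vec<u64>, String> + Send + 'static,
{
    let (tx, rx) = channel();
    thread::spawn(move || {
        // If `app` panics, `tx` is dropped during unwinding and the receiver
        // observes `Disconnected` instead of a value.
        let _ = tx.send(app());
    });
    match rx.recv_timeout(limit) {
        Ok(Ok(decided)) => Outcome::Decided(decided),
        Ok(Err(e)) => Outcome::LoopError(e),
        Err(RecvTimeoutError::Timeout) => Outcome::TimedOut,
        Err(RecvTimeoutError::Disconnected) => Outcome::Panicked,
    }
}
```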
Test
cargo test -p openhl-consensus first_block_via_engine_actors
After ~5 seconds (compile + first run):
running 1 test
test engine_app::tests::first_block_via_engine_actors ... ok
test result: ok. 1 passed; 0 failed; 0 ignored; 0 measured; 0 filtered out
The test itself runs in ~0.02 seconds; the rest of the 5 seconds is compilation and cargo test overhead.
To verify everything passes:
cargo test -p openhl-consensus
…should produce 21 tests passing.
Common errors and fixes:
- Test hangs > 15 s: the tokio::time::timeout fires. The cause is almost always in the same family: one of the ConsensusReady / GetValue / Decided arms forgot reply.send(...) or dropped the oneshot::Sender mid-flow. The engine never receives the value it is waiting for, so consensus can't advance. There is no panic, just silence until the timeout. The Decided exit path is the easiest to miss, because the early return makes it tempting to skip the reply. Always send the reply before returning. Re-walk Steps 3-5 and check that a reply.send(...) line is reachable from every control-flow path through each match arm.
- error[E0277] saying a type isn't Send (or Sync): tokio::spawn requires the app loop's future to be Send. That means Arc<B> must be Send, which in turn requires B: Send + Sync. Either declare Send + Sync as supertraits of ConsensusBridge in bridge.rs, or add the bounds to B. A bridge built on std::sync::Mutex satisfies them automatically, since Mutex<T> is Send + Sync when T: Send.
- A panic on bridge.committed.lock() with a poisoned mutex: this only happens if a task panicked while holding the mutex, which usually means a panic in the bridge impl. Check the bridge's build_payload / commit for panics.
- "app loop returned error" with "consensus channel closed after 0 decisions (wanted 1)": the engine shut down before any Decided arrived. The assert_eq!(decisions.len(), 1) check can't catch an empty list here, because run_engine_app only returns Ok from inside the Decided arm. If the engine never reaches Decided, you'll hit this error or the 15 s timeout instead. One way that happens is GetValue being swallowed by a catch-all arm that drops its reply. Re-check Step 4.
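The Send + Sync requirement behind the E0277 item can be reproduced with std::thread::spawn, which has the same Send + 'static bound as tokio::spawn. In this sketch the bounds live on the trait as supertraits, so any generic B: Bridge is automatically shareable across threads. The Bridge trait, Stub and spawn_app are hypothetical.

```rust
use std::sync::{Arc, Mutex};
use std::thread;

/// Hypothetical bridge trait; the `Send + Sync` supertraits make every
/// implementor shareable across threads.
trait Bridge: Send + Sync {
    fn commit(&self, hash: u64);
    fn committed(&self) -> Vec<u64>;
}

/// `Mutex<T>` is `Send + Sync` when `T: Send`, so this fixture satisfies the
/// supertraits without any extra annotation.
#[derive(Default)]
struct Stub {
    committed: Mutex<Vec<u64>>,
}

impl Bridge for Stub {
    fn commit(&self, hash: u64) {
        self.committed.lock().expect("poisoned").push(hash);
    }
    fn committed(&self) -> Vec<u64> {
        self.committed.lock().expect("poisoned").clone()
    }
}

/// Moving `Arc<B>` into another thread needs `Arc<B>: Send`, i.e.
/// `B: Send + Sync`; `'static` is needed because the thread may outlive the
/// caller. Without the supertraits this fails with E0277; without `'static`,
/// with E0310.
fn spawn_app<B: Bridge + 'static>(bridge: Arc<B>, hashes: Vec<u64>) -> thread::JoinHandle<()> {
    thread::spawn(move || {
        for h in hashes {
            bridge.commit(h);
        }
    })
}
```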
Design reflection
Three load-bearing decisions encoded here:
- run_engine_app is generic over B: ConsensusBridge + 'static. The same loop works with StubBridge (test), InMemoryEvmBridge (Lesson 4), RethEvmBridge (Lesson 5), and LiveRethEvmBridge (Lesson 12). The bridge's responsibility is to execute; the app loop's responsibility is to route. One implementation handles all four bridge variants.
- stop_after_decisions is a test ergonomic, not a production feature. Real validators use usize::MAX; the test uses 1. The parameter signals that the function is designed to be testable: you can drive it to a known finite state and assert, without building graceful-shutdown infrastructure. Test ergonomics deserve API surface.
- Closed reply channels are logged, not propagated. A failed oneshot::Sender::send means the engine gave up before our reply, usually because the actor was killed externally. Propagating this as an error would bury actual problems in noise. Logging via tracing::warn! lets operators investigate if it becomes frequent, without breaking the loop. The right error-handling policy depends on whether the caller can act on the failure.
Answer key
cd ~/code/openhl-reference
git checkout 708472c
diff -u ~/code/my-openhl/crates/consensus/src/engine_app.rs ./crates/consensus/src/engine_app.rs
diff -u ~/code/my-openhl/crates/consensus/Cargo.toml ./crates/consensus/Cargo.toml
diff -u ~/code/my-openhl/crates/consensus/src/lib.rs ./crates/consensus/src/lib.rs
The reference at 708472c includes 282 lines of engine_app.rs. The 12 AppMsg arms (5 substantive + 7 trivial), the StubBridge test fixture, and the integration test should match closely. Doc-comment wording can vary.
Return:
git checkout main
Common questions
Q: What's the difference between the engine's recv() channel and the engine's subscribe() event stream?
The recv() channel (channels.consensus) carries imperative messages that require a reply: "build a value", "validate this", "decided at H." The subscribe() event stream carries broadcast notifications without replies: "a round started", "a peer dialed in." Both originate in the engine. They differ in audience and reply semantics. The channel goes engine → app, has one consumer, and carries questions awaiting answers. Events go engine → every subscriber and carry announcements that expect nothing back. Lesson 9's OpenHlNodeHandle::subscribe is a placeholder; we don't actually consume events until Lesson 12.
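The broadcast side has no reply channel at all: each subscriber receives its own copy and nobody answers. This std-only sketch builds a hypothetical EventBus from one mpsc channel per subscriber. Malachite's actual subscribe() type and tokio::sync::broadcast work differently internally. The sketch only shows the contrast with the single-consumer, reply-carrying consensus channel.

```rust
use std::sync::mpsc::{channel, Receiver, Sender};

/// Hypothetical notification type for the broadcast side.
#[derive(Clone, Debug, PartialEq)]
enum Event {
    RoundStarted { height: u64, round: u32 },
}

/// Broadcast: every subscriber gets its own copy; nobody replies.
#[derive(Default)]
struct EventBus {
    subscribers: Vec<Sender<Event>>,
}

impl EventBus {
    fn subscribe(&mut self) -> Receiver<Event> {
        let (tx, rx) = channel();
        self.subscribers.push(tx);
        rx
    }

    /// Deliver to every live subscriber, pruning ones that hung up.
    /// Returns how many subscribers received the event.
    fn publish(&mut self, event: Event) -> usize {
        self.subscribers.retain(|tx| tx.send(event.clone()).is_ok());
        self.subscribers.len()
    }
}
```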
Q: Why don't we test individual AppMsg arms — only the integration test?
Because the arms are not independent. The engine sends them in a specific order: ConsensusReady → GetValidatorSet → StartedRound → GetValue → Decided. Testing them in isolation would require a fake engine that sends them in that order, which is more complex than spinning up the real engine for one block. The integration test is cheaper to write and proves more. A later lesson will add multi-validator tests, where individual-arm tests do make sense (peer sync, vote extensions).
Q: Why is validator_set: OpenHlValidatorSet taken by value instead of Arc<...>?
Because OpenHlValidatorSet is small (one validator at v0) and Clone. Cloning copies every validator entry, so the cost grows with the size of the set; with one entry it is negligible. If validator sets grew to 100+ entries, switching to Arc would be worthwhile, since a clone would then only bump a reference count.
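The cloning trade-off can be checked directly. In this sketch Validator and ValidatorSet are hypothetical, simplified shapes; the real OpenHlValidator also holds a public key. The point is only that cloning a Vec-backed set allocates and copies every entry, while cloning an Arc shares one allocation and bumps a reference count.

```rust
use std::sync::Arc;

/// Hypothetical, simplified validator record.
#[derive(Clone, Debug, PartialEq)]
struct Validator {
    address: [u8; 20],
    power: u64,
}

/// Vec-backed set: `clone()` deep-copies every entry.
#[derive(Clone, Debug, PartialEq)]
struct ValidatorSet {
    validators: Vec<Validator>,
}

/// Build a set of `n` validators with distinct addresses and equal power.
fn set_of(n: usize) -> ValidatorSet {
    ValidatorSet {
        validators: (0..n)
            .map(|i| Validator { address: [i as u8; 20], power: 1 })
            .collect(),
    }
}
```

Cloning the set yields an equal value in a separate heap buffer. Cloning an Arc<ValidatorSet> yields a second handle to the same allocation.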
Q: What happens if bridge.commit(hash) fails?
The ? operator propagates the BridgeError up as eyre::Result::Err(...). The app_task in the test gets Err(...), the third .expect("app loop returned error") fires, and the test panics with the bridge error. This is the intended behavior: commit failure is unrecoverable. Production code would either retry (if the failure is transient) or shut down and alert (if it persists).
Next lesson (Lesson 11)
Stage 6 is now done. Stage 7 starts: replace InMemoryEvmBridge with a real Reth EthereumNode. Lesson 11 covers the dev node bootstrap — getting Reth to spawn as a tokio task alongside our consensus actors, sharing the same runtime. Lesson 12 wires LiveRethEvmBridge (the live Reth equivalent of Lesson 5's RethEvmBridge). After Lesson 12 you'll have a Reth-backed devnet that processes the SAME AppMsg loop you just wrote — same run_engine_app, swap one trait impl, get a real EVM execution layer.
Summary (3 lines)
- run_engine_app drives the actor pipeline end-to-end. First block: proposal (GetValue → build_payload + payload_ready) → vote → Decided → bridge.commit → state advances.
- Actor pattern with mpsc channels and oneshot replies. A single validator passes 2/3 immediately. Raising stop_after_decisions continues the chain, with each block built on the previous decided hash.
- Performance: ~0.02 s for the test's single decision; ~1 s per block expected in production. Next module: live Reth.