Introduction
yggr is a Raft implementation in Rust. It is split into four crates:
| crate | what it is |
|---|---|
| yggr-core | The protocol. Engine<C> with one method, step(Event<C>) -> Vec<Action<C>>. No sockets, no disk, no async. |
| yggr-sim | A deterministic cluster simulator. Drives crashes, partitions, drops, reorderings, and partial flushes against yggr-core and checks safety invariants after every step. |
| yggr | The tokio runtime. Node, DiskStorage, TcpTransport. |
| yggr-examples | A three-node replicated KV service. |
The split exists so the engine can be embedded in a non-tokio runtime (or driven by a custom transport or storage) without dragging in the runtime, and so the simulator can run the engine deterministically.
What’s implemented
- §5.2 leader election with randomized timeouts
- §5.3 log replication via AppendEntries
- §5.4.1 election restriction
- §5.4.2 current-term commit rule (election no-op)
- §4.3 single-server membership changes
- §7 snapshotting with chunked InstallSnapshot
- §8 linearizable reads via ReadIndex
- Leadership transfer via TimeoutNow
- Async apply (a slow apply() does not stall heartbeats)
- Opt-in proposal batching
- Protobuf wire format, TCP transport
- Segmented on-disk log with atomic file writes
Who this is for
- You want a Raft-backed Rust service and would prefer not to write the engine, transport, and storage plumbing yourself. Use yggr.
- You already have an async runtime, a transport, or a storage layer you want to reuse. Use yggr-core directly; see Writing a custom host.
- You want to test your own changes to the engine against adversarial schedules. Use yggr-sim as a test library.
Status
Pre-1.0. The public API is stable enough to build on, but expect changes. Safety invariants are checked in the simulator (many chaos seeds per CI run) and at the full-stack level under real tokio tasks with an in-process chaos transport (see The sim harness).
Installation
Add yggr to your Cargo.toml:
[dependencies]
yggr = "0.1"
Or, for the engine without the runtime:
[dependencies]
yggr-core = "0.1"
Requirements
- Rust 1.85+ (edition 2024)
- protoc installed (used by prost-build to compile the wire format)
On Debian/Ubuntu:
sudo apt install protobuf-compiler
On macOS:
brew install protobuf
A three-node cluster
The yggr-examples crate ships a replicated KV service. To see it running:
./yggr-examples/run-three-node.sh
That brings up three kv processes on localhost, has them elect a leader, and exposes a text protocol you can drive with nc.
What the script does
Each node starts with:
let config = Config::new(my_node_id, peer_ids);
let storage = DiskStorage::open(&data_dir).await?;
let transport = TcpTransport::start(my_node_id, listen_addr, peer_addrs).await?;
let node = Node::start(config, MyStateMachine::default(), storage, transport).await?;
The leader accepts write(cmd) from clients. Followers return NotLeader with a hint so the client can retry against the right node.
Writing a state machine
Your StateMachine is the application-level thing Raft replicates. Three required methods, two optional ones for snapshots.
use yggr::{DecodeError, StateMachine};

#[derive(Debug, Default)]
struct Counter { value: u64 }

#[derive(Debug, Clone)]
enum CountCmd { Inc(u64) }

impl StateMachine for Counter {
    type Command = CountCmd;
    type Response = u64;

    fn encode_command(c: &CountCmd) -> Vec<u8> {
        match c {
            CountCmd::Inc(n) => n.to_le_bytes().to_vec(),
        }
    }

    fn decode_command(bytes: &[u8]) -> Result<CountCmd, DecodeError> {
        let arr: [u8; 8] = bytes.try_into()
            .map_err(|_| DecodeError::new("expected 8 bytes"))?;
        Ok(CountCmd::Inc(u64::from_le_bytes(arr)))
    }

    fn apply(&mut self, cmd: CountCmd) -> u64 {
        match cmd {
            CountCmd::Inc(n) => { self.value += n; self.value }
        }
    }
}
Rules
- apply must be deterministic. The same command on any node must produce the same response and the same state mutation. If it doesn’t, the cluster diverges.
- encode_command and decode_command must round-trip. The library stores the bytes in the log and on the wire; the decoder has to accept its own encoder’s output (a unit test like the one below pins this down).
- apply runs on its own tokio task. Slow work doesn’t stall heartbeats, but the driver blocks on send when the apply channel fills. See Configuration.
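The round-trip rule is cheap to hold with a unit test against the codec. A minimal sketch using the Counter machine above:

#[test]
fn command_codec_round_trips() {
    let cmd = CountCmd::Inc(42);
    let bytes = Counter::encode_command(&cmd);
    let decoded = Counter::decode_command(&bytes)
        .expect("decoder must accept its own encoder's output");
    assert!(matches!(decoded, CountCmd::Inc(42)));
}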
Snapshots
Override snapshot and restore if your state takes too long to replay from the log:
fn snapshot(&self) -> Result<Vec<u8>, SnapshotError> {
    Ok(bincode::serialize(&self.value).expect("serializing a u64 cannot fail"))
}

fn restore(&mut self, bytes: Vec<u8>) {
    self.value = bincode::deserialize(&bytes).unwrap();
}
Compression is your decision. Compress inside snapshot, decompress inside restore. The library treats the bytes as opaque through the engine, disk, and wire.
The Node API
Node<S> is the handle to a running node. Cloning it is cheap; every clone shares the driver task.
Starting
let node = Node::start(config, state_machine, storage, transport).await?;
Node::start recovers persisted state, builds the engine, and spawns the driver, ticker, and apply tasks. See Configuration and Bootstrap modes.
Methods
| Method | Behavior |
|---|---|
| write(cmd) | Preferred client-write API. Returns when the command commits and applies. |
| propose(cmd) | Compatibility alias for write(cmd). |
| add_peer(id) / remove_peer(id) | §4.3 single-server membership change. |
| read_linearizable(closure) | Run closure against the state machine at a linearizable read point (ReadIndex, §8). |
| admin() | Return an operator-facing handle for membership and lifecycle operations. |
| current_leader() | Return the current leader hint, if known. |
| transfer_leadership_to(peer) | Hand leadership to a follower. |
| status() | Current role, term, commit index, known leader. |
| node_metrics() | Runtime-facing metrics wrapper around the raw engine counters and gauges. |
| shutdown() | Drain and stop every background task. |
See rustdoc for Node for full signatures.
Error types
- ProposeError: NotLeader, NoLeader, Busy, Shutdown, DriverDead, Fatal.
- ReadError: NotLeader, NotReady, SteppedDown, Shutdown, DriverDead, Fatal.
- TransferLeadershipError: NotLeader, NoLeader, InvalidTarget, Shutdown, DriverDead, Fatal.
- NodeStartError<E>: Config(ConfigError), Storage(E).
Configuration
Config is a plain struct with public fields. Defaults come from Config::new(node_id, peers).
let mut config = Config::new(node_id, peers);
config.election_timeout_min_ticks = 10;
config.election_timeout_max_ticks = 20;
config.heartbeat_interval_ticks = 3;
config.tick_interval = Duration::from_millis(50);
Timing
| Field | Default | Note |
|---|---|---|
| election_timeout_min_ticks | 10 | §5.2 minimum election timeout. |
| election_timeout_max_ticks | 20 | Exclusive. Actual timeout is uniform in [min, max). |
| heartbeat_interval_ticks | 3 | Leader heartbeat interval. Must be < min. |
| tick_interval | 50ms | Wall-clock duration of one engine tick. |
The engine is tick-driven. A tick is whatever you say it is.
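Worked example with the defaults: at 50 ms per tick, a follower campaigns after 500–1000 ms of leader silence (10–20 ticks, drawn uniformly), and a healthy leader heartbeats every 150 ms, several times inside even the shortest timeout.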
Backpressure
| Field | Default | Note |
|---|---|---|
| max_pending_proposals | 1024 | propose / add_peer / remove_peer return Busy above this in-flight count. |
| max_pending_applies | 4096 | Capacity of the driver → apply-task channel. When full, the driver awaits space. |
Batching
Off by default. Turn it on if you have many concurrent proposals.
| Field | Default | Note |
|---|---|---|
| max_batch_delay_ticks | 0 (off) | Hold proposals for up to this many ticks before flushing. |
| max_batch_entries | 64 | Flush immediately when the buffer reaches this size. |
With batching on, N concurrent propose calls can commit in a single broadcast and fsync.
Snapshotting
| Field | Default | Note |
|---|---|---|
| snapshot_hint_threshold_entries | 1024 | The engine emits Action::SnapshotHint every time this many entries have been applied past the current floor. Set to 0 to disable. |
| max_log_entries | 0 (off) | Live-log guardrail. When the number of log entries above the current snapshot floor exceeds this, the engine emits a SnapshotHint regardless of the applied-entries band. Protects against runaway log growth when apply is lagging. |
| snapshot_chunk_size_bytes | 64 KiB | Maximum bytes per InstallSnapshot chunk. |
Elections
| Field | Default | Note |
|---|---|---|
| pre_vote | true | §9.6 pre-vote. A disrupted follower probes peers before bumping its term, so flapping nodes can’t force the rest of the cluster to step down on every reconnect. |
Reads
| Field | Default | Note |
|---|---|---|
| lease_duration_ticks | 0 (off) | §9 leader lease. When the leader has received a majority AE ack within this many ticks, linearizable reads skip the ReadIndex round-trip and return immediately. Must be strictly less than election_timeout_min_ticks - heartbeat_interval_ticks, otherwise Config::validate rejects it. |
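Worked example with the default timing: election_timeout_min_ticks - heartbeat_interval_ticks = 10 - 3 = 7, so the largest lease Config::validate accepts is 6 ticks, or 300 ms at the default 50 ms tick.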
See rustdoc for Config for the full type.
Bootstrap modes
Bootstrap controls how a node comes up. The point is to prevent operators from accidentally booting a would-be joiner as a fresh single-node cluster, which would let it elect itself, commit entries, then try to rejoin a real cluster it has diverged from.
| Variant | When to use |
|---|---|
| Bootstrap::NewCluster { members } | First boot of a brand-new cluster. Every founding node starts with the same members set. |
| Bootstrap::Join | Adding a node to a running cluster. Starts with an empty peer set; the existing leader calls add_peer to splice it in. |
| Bootstrap::Recover | Normal restart. Membership comes from the persisted snapshot and replayed ConfigChange entries. |
Config::new(node_id, peers) picks NewCluster when peers is non-empty and Recover otherwise. Override if you need Join.
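A hedged sketch of that override for a joiner; the bootstrap field name on Config is an assumption here, so check the rustdoc for the actual knob:

// A joiner must not bootstrap its own history: start with an empty peer set
// and explicitly request Join.
let mut config = Config::new(new_node_id, Vec::new());
config.bootstrap = Bootstrap::Join; // field name assumed; see rustdoc for Config
// Start the node, then have the current leader splice it in via add_peer.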
Writes
A write submits a command to the cluster. When it returns, the command has committed on a majority and applied on the local leader.
let response = node.write(CountCmd::Inc(5)).await?;
write() is the preferred name. propose() is kept as an alias for the 0.1 migration window — same semantics, same errors.
Contract
When write(cmd).await? returns Ok(response):
- cmd is the next Raft log entry at some committed index i.
- state_machine.apply(cmd) has run on the local node; response is what it returned.
- Every node in the cluster will replay cmd at index i on recovery.
- commit_index >= i on the leader; followers converge as replication completes.
Only a leader accepts a write. Followers reject with NotLeader { leader_hint } when they know who the leader is, NoLeader otherwise.
What a write does not guarantee
- Not a read barrier. Reads issued before the write may see pre-write state. Use read_linearizable if a subsequent read must observe the write.
- Not exactly-once on retry. If a write errors with Shutdown or DriverDead, the command may or may not have been appended. Your application is responsible for idempotency if this matters — embed a client token in the command and have the state machine de-duplicate.
- Not exactly-once across failover. A proposal submitted to a leader that loses leadership mid-flight surfaces NotLeader; the entry may still have been replicated and committed by the new leader. Same idempotency advice applies.
Batching
Multiple concurrent write() calls are interleaved by the driver and, with batching enabled, packed into a single leader broadcast:
let mut config = Config::new(my_id, peers);
config.max_batch_delay_ticks = 1; // hold for up to 1 tick
config.max_batch_entries = 64;    // flush sooner if the buffer fills
Off by default. Enable it when you have many concurrent producers — N writers can commit in one fsync + one broadcast.
Each caller still gets their own response when their entry applies. Batching is transparent to the contract above.
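As a sketch of the caller’s side, reusing the Counter machine from Writing a state machine (the spawn-per-write shape is illustrative, not required):

let mut handles = Vec::new();
for _ in 0..64 {
    let node = node.clone(); // clones are cheap and share the driver task
    handles.push(tokio::spawn(async move { node.write(CountCmd::Inc(1)).await }));
}
for handle in handles {
    // Each writer still gets its own response once its entry applies.
    let _count = handle.await.expect("writer task panicked")?;
}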
Errors
| Error | What happened | Caller action |
|---|---|---|
| NotLeader { leader_hint } | A follower with a known leader. | Retry against leader_hint. |
| NoLeader | Follower without a leader, or candidate/pre-candidate. | Back off and retry later. |
| Busy | Too many proposals already in flight. | Back off and retry. |
| Shutdown | The runtime is stopping. | Stop calling. |
| DriverDead | The driver task exited without a clean shutdown. | Fatal; restart the process. |
| Fatal | A non-recoverable storage or transport fault. | Fatal; restart the process. |
write_batch (future) follows the same error shape with all-or-error semantics.
Best practices
- Idempotency keys in the command. Even a mid-flight crash can double-apply if your retry logic isn’t careful; bake de-duplication into the state machine (see the sketch after this list).
- Keep commands small and deterministic. The engine treats commands as opaque bytes; the state machine runs apply on every node. Non-determinism in apply diverges replicas silently.
- Don’t block apply(). A slow apply stalls every subsequent write. Push heavy work downstream of apply; the engine’s apply task already runs concurrently with the driver.
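A minimal sketch of the client-token pattern. PutCmd, request_id, and the seen set are illustrative names, not library API:

use std::collections::{HashMap, HashSet};

#[derive(Debug, Clone)]
struct PutCmd {
    request_id: u64, // unique per logical client request; reused verbatim on retry
    key: String,
    value: String,
}

#[derive(Debug, Default)]
struct Store {
    data: HashMap<String, String>,
    seen: HashSet<u64>,
}

impl Store {
    // The apply-side half of the dedup contract: a retried command carries the
    // same request_id and mutates state at most once. A real implementation
    // would prune `seen`, e.g. with per-client high-water marks.
    fn apply(&mut self, cmd: PutCmd) -> bool {
        if !self.seen.insert(cmd.request_id) {
            return false; // duplicate: already applied at an earlier index
        }
        self.data.insert(cmd.key, cmd.value);
        true
    }
}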
Linearizable reads
Node::read_linearizable runs a closure against the state machine at a point where the read sees every committed write. No log append, no fsync. This is §8 ReadIndex.
let value = node.read_linearizable(|sm: &MyState| sm.value).await?;
Protocol
- The leader records commit_index as the read’s read_index.
- The leader triggers a heartbeat round. When a majority of peers ack, the leader knows it was still authoritative as of read_index.
- When last_applied >= read_index on the leader, the closure runs.
Leader-lease fast path (§9)
With Config::lease_duration_ticks > 0, the leader may skip the heartbeat round. If it has received a majority AE ack within the last lease_duration_ticks, no other leader can have been elected during that window (§9 proof), so the current commit_index is still authoritative. read_linearizable returns as soon as last_applied >= commit_index.
Safety constraint: lease_duration_ticks < election_timeout_min_ticks - heartbeat_interval_ticks. Config::validate rejects violations. Lease off (0, the default) always uses the classic protocol.
Errors
- NotLeader { leader_hint } — retry against leader_hint.
- NotReady — the leader hasn’t committed an entry in its current term yet (the §5.4.2 no-op takes a round to commit after election). Retry in a moment.
- SteppedDown — the leader lost its role before the read could serve.
Why a closure
The state machine lives on a dedicated apply task. The closure is shipped to that task and runs in FIFO order with applies, so the read observes post-apply state, not a partially-applied view.
Membership
A yggr cluster has two classes of members:
- Voters count toward quorum and can campaign. The majority of voters must ack a commit for it to advance.
- Learners receive replication but stay out of quorum and never campaign. See Learners for the dedicated chapter.
Self is always a member: either implicitly a voter (the default) or a learner (once AddLearner(self) commits locally).
Operations
All membership changes go through node.admin():
let admin = node.admin();
admin.add_peer(id).await?;        // add a voter
admin.add_learner(id).await?;     // add a non-voting member
admin.promote_learner(id).await?; // learner → voter
admin.remove_peer(id).await?;     // remove either flavour
Each call returns once the configuration change has committed.
Single-server discipline (§4.3)
yggr implements single-server changes: exactly one membership delta per log entry, and at most one uncommitted change in flight. This keeps the old and new majorities overlapping and rules out the split-quorum window that joint-consensus would need to handle.
Consequence: admin.add_peer and admin.remove_peer can’t run concurrently. The second call returns an error (Busy / ChangeInProgress) until the first commits.
Recommended workflow for growing a cluster
| state | operator action | cluster effect |
|---|---|---|
| N voters | add_learner(new) | new node gets replication, no quorum |
| | (wait for catch-up; monitor via node_metrics()) | |
| | promote_learner(new) | N+1 voters, quorum adjusts |
Adding a voter in one step (add_peer) is supported but operationally awkward: quorum math shifts before the node has caught up, and a cold node can slow commit until it catches up. Prefer the learner path.
Recommended workflow for shrinking a cluster
| state | operator action | cluster effect |
|---|---|---|
| N voters | transfer_leadership | optional, if the leader is the node being removed |
| | remove_peer(going) | N-1 voters, quorum adjusts |
If the node being removed is the leader, it steps down once RemovePeer(self) commits. Either transfer leadership first (cleaner) or tolerate the brief election gap.
Snapshot-carried membership
Snapshots store the voter and learner sets as of last_included_index. A node that recovers from a snapshot sees the correct membership without replaying the full history. An InstallSnapshot from the leader similarly restores both sets on the follower.
This is why snapshot metadata is versioned on disk — the 0.1 format carries voters only for backward compatibility with pre-learner snapshots; 0.2 carries both.
Observability
node.status() surfaces the current membership view:
let status = node.status().await?;
println!("{} voters, {} learners",
    status.membership.voters.len(),
    status.membership.learners.len());
node.node_metrics() exposes per-peer replication progress (future: a followers map with matched, next_index, is_learner). Use it to decide when a learner is ready for promotion.
Errors
See Errors. Membership-specific variants include ChangeInProgress (another CC uncommitted), UnknownNode, AlreadyVoter, AlreadyLearner, NotLearner.
Learners
A learner is a cluster member that receives replication but doesn’t count toward quorum and never campaigns. Raft thesis §4.2.1.
Learners exist so that adding a fresh node to a cluster doesn’t destabilise it. Without learners, the new node joins the voting set immediately — quorum math shifts before the node has caught up, and the operator has no handle to delay the shift. With learners, a node can:
- receive AppendEntries and InstallSnapshot,
- catch up on its own schedule,
- stay out of quorum and elections while it does,
- be promoted only when it’s ready.
Lifecycle
(unknown) --add_learner--> learner --promote_learner--> voter
learner --remove_peer--> (removed)
voter   --remove_peer--> (removed)
The three operations are all single-server membership changes (§4.3): at most one in flight at a time. The leader rejects a new configuration proposal if any previous one is still uncommitted.
Operations
All three go through Node::admin():
let admin = node.admin();

// Add a new node as a non-voting learner. Begins receiving replication
// immediately.
admin.add_learner(NodeId::new(4).unwrap()).await?;

// Wait for the learner to catch up (inspect via status/metrics below),
// then promote.
admin.promote_learner(NodeId::new(4).unwrap()).await?;

// Remove either a learner or a voter.
admin.remove_peer(NodeId::new(4).unwrap()).await?;
Each call returns once the configuration change has committed. It does not wait for catch-up.
Checking readiness before promotion
Promotion is always safe for correctness — the engine enforces Leader Completeness — but promoting a badly-lagging learner reduces cluster availability: every subsequent commit now needs an ack from the lagging node. Before promoting, check that the learner is caught up.
The runtime exposes per-peer replication progress via metrics:
let m = node.node_metrics().await?;
// m.replication.followers is a BTreeMap<NodeId, FollowerProgress>.
// Each FollowerProgress has `matched`, `next_index`, `is_learner`.
A simple rule of thumb: promote when leader_last_log_index - followers[learner].matched <= 1. Document-level advice, not a library invariant.
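Expressed as code, heavily hedged: this assumes the planned followers map described above lands as sketched, and leader_last_log_index is a guessed field name, not stable API:

let learner = NodeId::new(4).unwrap();
loop {
    let m = node.node_metrics().await?;
    let progress = &m.replication.followers[&learner]; // planned surface, see above
    if m.replication.leader_last_log_index.saturating_sub(progress.matched) <= 1 {
        break; // caught up enough that promotion won't drag commit latency
    }
    tokio::time::sleep(Duration::from_millis(200)).await;
}
node.admin().promote_learner(learner).await?;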
What learners cannot do
- Grant votes. A learner that receives a RequestVote or RequestPreVote responds with a rejection.
- Campaign. A learner whose election timer fires resets the timer and does nothing else.
- Count toward quorum. Leader commit-advance uses majority_index_over_voters, which excludes learners. A learner’s ack cannot unblock a commit.
These three rules are enforced in the engine; the runtime just forwards configuration changes.
Constraints
- One membership change in flight at a time. If you call add_learner and immediately call promote_learner on the same node, the second call will return ChangeInProgress (or the equivalent ProposeError) until the add_learner entry commits.
- Learners still persist state. Like voters, they carry a durable log and hard state. A crashed learner recovers by replaying the log and resuming catch-up.
- Snapshots carry full membership. An InstallSnapshot restores both voters and learners, so a recovered node correctly identifies its own role.
Anti-patterns
- Auto-promotion on “caught up enough”. The runtime does not auto-promote. Readiness signals are surfaced; the decision is the operator’s. A hidden policy would make operations unpredictable.
- Promoting every new node. If a read-only replica is all you need (e.g. a follower that just drives a secondary index), leave it as a learner. Quorum remains tight.
- Treating learners as read-only for linearizable reads. Learners see committed entries but do not serve read_linearizable — that path requires leadership. Use stale reads on a learner if eventual consistency is acceptable.
Leadership transfer
Node::transfer_leadership_to(peer) asks the current leader to hand leadership to peer. Uses:
- Graceful shutdown of the current leader.
- Pinning the leader to a specific node (e.g. for locality).
- Maintenance windows.
node.transfer_leadership_to(nid(2)).await?;
Protocol
The leader replicates to peer until matchIndex[peer] == last_log_index, then sends TimeoutNow. The target immediately starts an election at current_term + 1. Followers with a known leader redirect; leaderless followers and candidates drop.
The call returns once the local driver has accepted the request and dispatched the engine actions. It does not wait for the new leader to be elected. The cluster may briefly have no leader if the target fails to win.
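If a caller needs the transfer to settle before proceeding, one option is to poll status(). A hedged sketch, assuming the status struct’s leader field is an Option<NodeId>:

let target = nid(2);
node.transfer_leadership_to(target).await?;
// The call above returns before the election; wait until the target shows up.
loop {
    let status = node.status().await?;
    if status.leader == Some(target) {
        break;
    }
    tokio::time::sleep(Duration::from_millis(100)).await;
}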
Errors
- NotLeader { leader_hint } — we are a follower.
- NoLeader — no leader currently known.
- InvalidTarget { target } — target is self or not a member of the current peer set.
Snapshots
Snapshots (§7) let a node discard the prefix of its log by capturing the state machine’s state and the log position it reflects. The library handles the protocol end-to-end. Fast followers catch up via AppendEntries. Followers whose nextIndex has fallen below the snapshot floor catch up via InstallSnapshot.
Two triggers
- Host-initiated. Your application feeds Event::SnapshotTaken { last_included_index, bytes } to the engine. In the default runtime this happens automatically via Action::SnapshotHint when enough entries have applied past the current floor.
- Follower catch-up. A leader whose peer’s nextIndex <= snapshot_floor sends InstallSnapshot instead of AppendEntries. Chunks are Config::snapshot_chunk_size_bytes bytes each.
Auto-compaction hints
The engine emits Action::SnapshotHint { last_included_index } in two cases:
- Applied-entries band — every time the applied-entries count past the current floor crosses Config::snapshot_hint_threshold_entries. Set to 0 to disable.
- Live-log guardrail — whenever entries above the floor exceed Config::max_log_entries (a disk-space backstop for a stuck apply path). Set to 0 to disable.
The default runtime reacts by calling StateMachine::snapshot() on its own task and feeding the bytes back via Event::SnapshotTaken. The driver does not block: status(), ticks, heartbeats, and inbound RPCs stay responsive even when snapshot() takes seconds to run. Overlapping hints during an in-flight snapshot are coalesced; the engine re-hints after the next threshold crossing.
Fallibility
StateMachine::snapshot returns Result<Vec<u8>, SnapshotError>. Return Err for transient failures (ENOSPC, backpressure); the runtime logs and drops this attempt. The engine re-hints when the next threshold crossing fires, so the caller gets automatic retry without any bookkeeping.
Compression
The library treats snapshot bytes as opaque. Compress inside StateMachine::snapshot and decompress inside restore:
fn snapshot(&self) -> Result<Vec<u8>, SnapshotError> {
    zstd::encode_all(&self.serialized[..], 3)
        .map_err(|e| SnapshotError::new(e.to_string()))
}

fn restore(&mut self, bytes: Vec<u8>) {
    let decoded = zstd::decode_all(&bytes[..]).unwrap();
    self.deserialize(&decoded);
}
Membership and snapshots
A snapshot carries the cluster membership as of last_included_index. Without this, committed AddPeer / RemovePeer entries that got snapshotted would be lost on restart, and the node would compute the wrong majority. The runtime stores the peer list alongside the snapshot bytes via StoredSnapshot::peers.
Observability
Two independent surfaces: pull-model metrics via Node::metrics() for dashboards and health checks, structured tracing events for correlated per-event detail.
Metrics
node.metrics().await returns a snapshot of counters and gauges for the local node. Counters only move forward; gauges reflect current state. Pulling is cheap — no engine work, just a read of internal counters.
let m = node.metrics().await?;
println!(
    "term={} role={} commit={} log_len={}",
    m.current_term.get(),
    m.role_code,
    m.commit_index.get(),
    m.log_len,
);
println!(
    "elections={} won={} ae_sent={} committed={} applied={}",
    m.elections_started, m.leader_elections_won,
    m.append_entries_sent, m.entries_committed, m.entries_applied,
);
Counter families:
- Elections — elections_started, pre_votes_granted, pre_votes_denied, votes_granted, votes_denied, leader_elections_won, higher_term_stepdowns.
- Replication — append_entries_sent, append_entries_received, append_entries_rejected, entries_appended, entries_committed, entries_applied.
- Snapshots — snapshots_sent (per chunk), snapshots_installed (per completed snapshot).
- Reads — read_index_started, reads_completed, reads_failed.
Gauges:
current_term, commit_index, last_applied, log_len, role_code (0 = follower, 1 = pre-candidate, 2 = candidate, 3 = leader).
EngineMetrics is #[non_exhaustive]; new fields are additive. See rustdoc for the exact set.
Tracing
The engine and runtime emit structured tracing events and spans with stable targets and field names.
Targets
| Target | What emits |
|---|---|
| yggr::engine | Role changes, term advances, vote decisions, AppendEntries accept/reject, commit advances. |
| yggr::node | Driver-level events: apply failures, transport errors, shutdown. |
Fields
- node_id — the emitting node
- term / from_term / to_term — term transitions
- role — "follower" | "candidate" | "leader"
- decision — on vote handling, "granted" | "rejected"
OpenTelemetry
The library doesn’t depend on opentelemetry directly. The OTel crates churn fast, and pinning them in a library creates version conflicts for users. Wire your subscriber in your service’s main:
use tracing_subscriber::prelude::*;

let otlp = opentelemetry_otlp::new_pipeline()
    .tracing()
    .with_exporter(opentelemetry_otlp::new_exporter().tonic())
    .install_batch(opentelemetry::runtime::Tokio)?;

tracing_subscriber::registry()
    .with(tracing_opentelemetry::layer().with_tracer(otlp))
    .with(tracing_subscriber::fmt::layer())
    .with(tracing_subscriber::EnvFilter::from_default_env())
    .init();
yggr::engine spans and events flow to the collector.
Log filters
Quick-start RUST_LOG values:
- yggr=debug — everything yggr emits at debug or above
- yggr::engine=info — just protocol decisions
- yggr::node=debug,yggr::engine=info — runtime detail, protocol overview
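For example, to get runtime detail plus a protocol overview when launching the demo cluster:

RUST_LOG='yggr::node=debug,yggr::engine=info' ./yggr-examples/run-three-node.sh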
Errors
Every fallible runtime method returns a typed error. The types are narrow on purpose — each variant maps to one caller action.
Write / propose errors
pub enum ProposeError {
    NotLeader { leader_hint: NodeId },
    NoLeader,
    Busy,
    Shutdown,
    DriverDead,
    Fatal,
}
| Variant | Meaning | What to do |
|---|---|---|
| NotLeader | A follower that knows who the leader is. | Retry against leader_hint. |
| NoLeader | Follower without a current leader, or pre-candidate / candidate. | Back off, retry. |
| Busy | max_pending_proposals reached. | Back off, retry. |
| Shutdown | The runtime is stopping. | Stop calling. |
| DriverDead | Driver task exited unexpectedly. | Fatal, restart process. |
| Fatal | Unrecoverable storage / transport fault. | Fatal, restart process. |
Read errors
pub enum ReadError {
    NotLeader { leader_hint: NodeId },
    NoLeader,
    NotReady,
    SteppedDown,
    Shutdown,
    DriverDead,
    Fatal,
}
Beyond the write-error shape, reads add:
- NotReady — the leader hasn’t committed an entry in its current term (§5.4.2 no-op hasn’t applied yet). Retry in a few ms.
- SteppedDown — leadership lost between when the read was queued and when it would have served. Retry on the new leader.
Transfer leadership errors
pub enum TransferLeadershipError {
    NotLeader { leader_hint: NodeId },
    NoLeader,
    InvalidTarget,
    Shutdown,
    DriverDead,
    Fatal,
}
InvalidTarget — the target isn’t a voter, or is self, or doesn’t exist. Non-retriable without changing the target.
Status / metrics errors
Status and metrics are informational; they shouldn’t get mixed up with write-oriented error variants. Future: a narrower StatusError { Shutdown | DriverDead | Fatal }.
Start-up errors
pub enum NodeStartError<E> {
    Config(ConfigError),
    Storage(E),
}
ConfigError is an enum of pre-flight validation failures (invalid timeout range, peer contains self, lease too large, etc.). Storage(E) is whatever error type your Storage impl surfaces — typically io::Error for DiskStorage.
Membership errors (planned)
Once the admin handle exposes richer semantics, membership calls will return:
pub enum MembershipError {
    NotLeader { leader_hint: NodeId },
    NoLeader,
    Busy,
    ChangeInProgress,
    UnknownNode(NodeId),
    AlreadyVoter(NodeId),
    AlreadyLearner(NodeId),
    NotLearner(NodeId),
    InvalidTargetMembership,
    Shutdown,
    DriverDead,
    Fatal,
}
Today add_peer / remove_peer / add_learner / promote_learner return ProposeError. The richer enum arrives when membership becomes a first-class surface rather than a thin forward over propose.
How to retry
The general pattern:
loop {
    match node.write(cmd.clone()).await {
        Ok(response) => return Ok(response),
        Err(ProposeError::NotLeader { leader_hint }) => {
            // Redirect to leader_hint and retry there (your transport-level concern).
            return Err(ProposeError::NotLeader { leader_hint });
        }
        Err(ProposeError::NoLeader) | Err(ProposeError::Busy) => {
            tokio::time::sleep(Duration::from_millis(50)).await;
            // and retry
        }
        Err(e) => return Err(e), // Shutdown / DriverDead / Fatal are terminal
    }
}
Never retry indefinitely without a ceiling. If Busy persists, the system is backpressured and retrying harder makes it worse.
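One way to put that ceiling in, sketched with an assumed budget of eight attempts and capped exponential backoff (the pattern, not a library API):

let mut delay = Duration::from_millis(50);
let mut last_err = ProposeError::Busy;
for _attempt in 0..8 {
    match node.write(cmd.clone()).await {
        Ok(response) => return Ok(response),
        Err(e @ ProposeError::NoLeader) | Err(e @ ProposeError::Busy) => {
            last_err = e;
            tokio::time::sleep(delay).await;
            delay = (delay * 2).min(Duration::from_secs(1)); // cap the backoff
        }
        Err(e) => return Err(e), // redirects and terminal errors pass through
    }
}
return Err(last_err); // budget exhausted: surface the backpressure to the caller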
Logging
Error variants are #[non_exhaustive], so downstream matches should always include a _ => ... arm to stay forward-compatible with new variants.
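Concretely, where redirect, backoff, and give_up stand in for your own handling:

match err {
    ProposeError::NotLeader { leader_hint } => redirect(leader_hint),
    ProposeError::NoLeader | ProposeError::Busy => backoff(),
    // Variants added in later releases land here instead of breaking the build.
    _ => give_up(),
}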
Operations
Running a yggr cluster in anger: bring-up, rolling upgrades, graceful shutdown, failure recovery.
Bring-up
See Bootstrap modes for the full decision tree. Short version:
- First boot of a new cluster. Every node starts with Bootstrap::NewCluster { members }. Exactly one bootstrap per cluster lifetime.
- Subsequent boots. Always Bootstrap::Recover (the default). The node reloads its persisted state and catches up via replication.
Do not re-bootstrap a cluster that already has state. It creates a second history at term 0 and diverges from every other node.
Rolling upgrades
A rolling upgrade replaces binaries one node at a time without downtime. The recipe:
- Pick a node to upgrade (ideally a follower; use transfer_leadership to move leadership away if needed).
- Drain: call node.shutdown().await. The driver flushes in-flight proposals and stops cleanly.
- Swap the binary. On-disk state (log segments, snapshots, hard state) is compatible across patch versions.
- Start the new binary with the same Config and Bootstrap::Recover.
- Wait until node.status().membership.voters matches every peer and status.leader is stable.
- Repeat for each node.
Always upgrade voters one at a time. Upgrading two at once can leave the cluster without a quorum of voters on the new version.
Graceful shutdown
node.shutdown().await?;
This:
- rejects new write() calls with Shutdown,
- drains proposals already accepted into the driver,
- stops the transport,
- joins the tick / apply / driver tasks,
- returns when everything has stopped.
If the caller holds multiple clones of Node, only one shutdown() call actually runs the drain — shutdown(self) consumes the clone it is called on. Subsequent calls on other clones get Shutdown or DriverDead.
Triggering a snapshot
Snapshots are automatic when Config::snapshot_hint_threshold_entries or max_log_entries fires. To force one:
// Planned: admin.trigger_snapshot().await?;
Today the engine’s hint is observed by the driver; there’s no public “snapshot now” button. If you need one, either lower the threshold or enqueue enough writes to trip it.
Handling leader loss
Expected during normal operation: election timeouts, transient partitions, node restarts. The runtime handles it — clients see NotLeader or NoLeader briefly, retry logic kicks in, traffic resumes on the new leader.
If a loss persists more than an election-timeout cycle, check:
- status.role on each node — is any node stuck as a candidate or pre-candidate?
- status.current_term — is the term diverging across nodes? (Unusual; expect it to converge upward.)
- Transport connectivity between voters — a partition can stall elections whenever it cuts a would-be candidate off from the majority.
Disk-space management
Two knobs on Config:
- snapshot_hint_threshold_entries — compact when applied entries past the floor cross this. Default 1024.
- max_log_entries — compact when the live log (entries above the floor) crosses this, regardless of apply progress. Default 0 (off). Set it if you see slow or stuck apply producing unbounded log growth.
DiskStorage stores one snapshot.bin + snapshot_meta.bin at the data-dir root, plus segmented log files in log/. Crash-safe: atomic-rename writes for the snapshot, append-only segments for the log. .tmp files left by a crash mid-rename are swept on DiskStorage::open.
Capacity planning
Per-node disk at steady state: snapshot bytes + live log bytes + overhead. Live log is bounded by snapshot_hint_threshold_entries × average-entry-size plus in-flight replication. The snapshot is whatever your StateMachine::snapshot() produces.
Per-node memory: engine state (small), applied-but-not-yet-responded proposals (bounded by max_pending_proposals), apply queue (bounded by max_pending_applies), plus your state machine.
Per-node CPU: dominated by the state machine’s apply(). The engine itself is cheap.
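Worked example: at the default snapshot_hint_threshold_entries of 1024 and an average entry of 1 KiB, the live log hovers around 1 MiB between compactions, so steady-state disk per node is roughly one snapshot plus a few megabytes of log.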
Observability
- node.status() — point-in-time view of role, term, commit, last-applied, leader, membership.
- node.metrics() — raw engine counters (elections, AEs, commits, reads, snapshots).
- node.node_metrics() — runtime-facing wrapper: engine metrics plus membership and status fields.
- tracing — structured events under targets yggr::engine and yggr::node. See Observability.
Wire these into your monitoring system at cluster bring-up. Cross-cluster visibility into current_term, commit_index, and leader catches most operational issues before they become outages.
Backups
A snapshot file plus the live log segments at a point in time is a complete backup of one node. For cluster backups:
- Take a snapshot on each node (wait for snapshots_installed to tick).
- Copy snapshot.bin, snapshot_meta.bin, and log/ to durable off-cluster storage.
- Include the node’s Config (peers, node_id) alongside — recovery needs it.
A single node’s backup is sufficient to rebuild the cluster: restore that node, re-bootstrap the others via add_learner + snapshot install, promote.
The pure engine
yggr-core is the Raft protocol. One type, one method:
pub struct Engine<C> { /* ... */ }

impl<C: Clone> Engine<C> {
    pub fn step(&mut self, event: Event<C>) -> Vec<Action<C>>;
}
Every forward motion is an Event:
- Tick — abstract time advanced one step
- Incoming(msg) — a peer sent us an RPC
- ClientProposal(cmd) — application wants to replicate something
- ClientProposalBatch(cmds) — same, many at once
- ProposeConfigChange(change) — §4.3 membership change
- TransferLeadership { target } — initiate leadership transfer
- ProposeRead { id } — §8 linearizable read
- SnapshotTaken { last_included_index, bytes } — host cut a snapshot
Every effect is an Action:
- PersistHardState, PersistLogEntries, PersistSnapshot
- Send { to, message }
- Apply(entries) — commit to the state machine
- ApplySnapshot { bytes } — restore the state machine
- Redirect { leader_hint } — we are not the leader
- ReadReady { id } / ReadFailed { id, reason }
- SnapshotHint { last_included_index } — advisory compaction
No I/O
The engine does not touch a socket, a file, or the clock. The host (either yggr or your own code) translates Actions into I/O.
No async
step is synchronous. It returns Vec<Action<C>>. The host decides how to dispatch.
Testable
Because the engine has no I/O, it runs at memory speed under the deterministic chaos harness in yggr-sim. Many seeds, thousands of steps each, in a second.
Events and actions
See the rustdoc for Event and Action for the full definitions.
Action ordering
step() returns Vec<Action<C>> in causal order. Every action that must reach stable storage before any subsequent network send appears earlier in the vector. Hosts must:
- Process actions in order.
- Flush PersistHardState / PersistLogEntries / PersistSnapshot to disk before performing any Send that follows them.
This is Figure 2’s “respond to RPCs only after updating stable storage”. The engine encodes it in the action ordering; following that ordering is the host’s side of the contract.
Actions that don’t need fsync
- Send { .. } — best-effort. The engine retries on its own cadence.
- Apply(entries) — feed to the state machine. The engine advances last_applied when emitting this; the host must not skip it.
- ApplySnapshot { bytes } — restore the state machine.
- Redirect { leader_hint } — fail the user’s pending reply with NotLeader.
- SnapshotHint { .. } — advisory. Ignore, debounce, or act.
- ReadReady { id } — the read with this id is now safe to serve.
- ReadFailed { id, reason } — surface the reason to the caller.
Writing a custom host
The default yggr runtime assumes tokio, TCP, and disk. If those don’t fit — you already have an async runtime, you want to replicate over QUIC, or you need to persist into an existing storage engine — you can drive yggr-core yourself.
A minimal host loop:
use yggr_core::{Engine, Event, Action};

let mut engine: Engine<MyCmd> = Engine::new(my_id, peers, env, heartbeat_ticks);

loop {
    let event = next_event().await?; // your ticker, transport, app
    let actions = engine.step(event);
    for action in actions {
        match action {
            Action::PersistHardState { current_term, voted_for } => {
                my_storage.persist_hard_state(current_term, voted_for).await?;
            }
            Action::PersistLogEntries(entries) => {
                my_storage.append_log(entries).await?;
            }
            Action::PersistSnapshot { bytes, .. } => {
                my_storage.persist_snapshot(bytes).await?;
            }
            Action::Send { to, message } => {
                my_transport.send(to, message).await?;
            }
            Action::Apply(entries) => {
                for entry in entries { my_sm.apply(entry).await; }
            }
            Action::ApplySnapshot { bytes } => {
                my_sm.restore(bytes).await;
            }
            _ => {}
        }
    }
}
Rules
- Respect action order. See Events and actions. Fsync Persist* before any subsequent Send.
- Feed Tick at a steady cadence. The engine’s election and heartbeat timers are tick-based. The default runtime uses 50ms per tick.
- Hydrate the engine on restart. Read persisted hard state, snapshot (if any), and post-snapshot log, then call Engine::recover_from(RecoveredHardState { .. }) before feeding any real events.
yggr/src/node.rs and yggr-sim/src/harness.rs are two working reference hosts.
The sim harness
yggr-sim drives yggr-core under adversarial schedules and checks safety after every step. Raft’s subtle bugs live at the interleaving between events, and unit tests don’t cover an N-node state space.
What the sim can do
- Drive Tick, ClientProposal, and RPC delivery across N nodes.
- Reorder message delivery.
- Partition the cluster (arbitrary subsets on each side).
- Crash a node and recover it from its persisted state.
- Flush only a prefix of the pending-write queue (mid-fsync crash).
- Do all of the above under a single seed, so failures shrink and reproduce.
Running it
Sim tests live under yggr-sim/src/tests/:
- smoke.rs — happy-path sanity.
- chaos.rs — proptests with drops, reorder, partitions, crashes, partial fsync.
- membership.rs — add/remove peers.
- snapshot.rs — install-snapshot catch-up.
- proptests.rs — happy-policy proptests asserting liveness (leader elected, majority commits).
cargo nextest run -p yggr-sim
Budgets
The chaos proptests default to 128 cases × 1500 steps for 3- and 5-node clusters, 128 cases × 2000 steps for 7-node. Any seed the shrinker finds is persisted in yggr-sim/proptest-regressions/ and replayed on every future run.
See Safety invariants for what’s checked.
Safety invariants
The sim asserts these after every step. A violation fails the run immediately and prints the schedule that produced it.
Election Safety (§5.2)
At most one node can be Leader at a given term across the run. Implemented as a cumulative {term -> leader_id} map; a second distinct leader in the same term panics.
Log Matching (§5.3)
For any two nodes and any index in both logs, if the entries have the same term, all prior entries are identical. Checked pairwise over every snapshot of node state.
Leader Completeness (§5.4)
Every committed entry appears in the logs of every subsequent leader for higher terms. Tracked via (term, last_committed_index) pairs; no later leader may replace them.
State Machine Safety (§5.4.3)
Two nodes applying the same index must apply the same command. The TrackedCounter harness records (node, index, cmd) tuples and rejects mismatches.
Persist-before-Send
Any Send that references a term or log entry must have been preceded by a PersistHardState / PersistLogEntries covering that reference. check_send_ordering enforces this.
Single in-flight ConfigChange (§4.3)
A leader may have at most one uncommitted ConfigChange in its log at any time. Checked every step.
Snapshot within commit
A snapshot’s last_included_index must be <= commit_index at the time of persist. Host-side protocol compliance.
All of these hold under the chaos policy (drops, reorder, partitions, crashes, partial fsync). Liveness — “a leader eventually wins” or “a proposal eventually commits” — is not asserted under chaos; the scheduler is allowed to permanently partition. It gets its own proptest under the happy policy.
Crate layout
| Crate | Purpose |
|---|---|
| yggr-core | The engine. Engine, Event, Action, wire types, protobuf mapping. |
| yggr-sim | Sim harness. Cluster, Network, SafetyChecker, chaos proptests. |
| yggr | Runtime. Node, Config, DiskStorage, TcpTransport, StateMachine trait. |
| yggr-examples | Demos. kv three-node replicated KV server. |
Dependency graph
yggr-examples → yggr → yggr-core
                         ↑
                      yggr-sim
yggr-core does not depend on yggr, yggr-sim, or any async runtime. yggr-sim pulls in yggr-core only. yggr adds tokio and prost on top of yggr-core.
Why the split
Two reasons.
- Testability. The sim relies on yggr-core being free of tokio, real sockets, and the filesystem. Otherwise we couldn’t run thousands of deterministic chaos steps per second.
- Embeddability. Users who want Raft inside something that’s already async take yggr-core and write a small host. See Writing a custom host.
Design decisions
Engine as pure state machine
The engine has no I/O. Every effect is an Action the host dispatches. This makes the engine:
- Deterministic. Same inputs, same outputs.
- Fast to test. The sim runs thousands of chaos steps per second.
- Host-agnostic. Write your own host for gRPC, QUIC, or in-process testing.
A single step() method
Every event becomes an Event, every effect becomes an Action. No handle_request_vote, handle_append_entries, etc. That keeps the public surface small and the causal ordering of Actions obvious.
Async apply as a separate task
User apply() calls can be slow. If they ran on the driver task, heartbeats would stall. Instead, a bounded mpsc channel decouples commit from apply; the state machine lives on its own task. Backpressure is natural: if apply falls behind, the channel fills, the driver blocks on send, and replication slows. That’s the correct behavior when the state machine is the bottleneck.
Snapshots opaque through the stack
The library never looks inside a snapshot. Compress, encrypt, or hash-chain inside StateMachine::snapshot; undo inside restore. The engine, disk, and wire format all treat the bytes as opaque. That keeps compression and crypto dependencies out of the library and gives users the hooks they want.
ReadIndex via a closure
Node::read_linearizable takes an FnOnce(&S) -> R that ships to the apply task and runs in FIFO order with applies. The read observes post-apply state, not mid-apply, and the StateMachine trait doesn’t have to grow a read method.
Causal action ordering
Every Action that must reach stable storage before a subsequent Send appears earlier in the vector. That’s Figure 2’s “respond to RPCs only after updating stable storage”, enforced at the contract level so hosts can’t accidentally violate it.
No OpenTelemetry dependency
The library emits tracing spans and events with stable targets and field names. OTel is bolted on by the user’s main. See Observability. OTel crates churn too fast to pin in a library.
§4.3 single-server over Joint Consensus
Membership changes are §4.3 single-server. Simpler than Joint Consensus, well-understood edge cases, fits the one-at-a-time add_peer / remove_peer API. Joint Consensus may land post-0.1 if operators need faster reconfiguration.