propeller

package
v0.16.4
Warning

This package is not in the latest version of its module.

Published: Jun 23, 2026 License: Apache-2.0 Imports: 24 Imported by: 0

Documentation

Overview

Package propeller implements an erasure-coding based message broadcast protocol for Byzantine fault-tolerant consensus. A publisher splits a message into shards, erasure-encodes them via Reed-Solomon, and distributes one shard to each peer. Each peer forwards its own assigned shard to all others, and any peer that holds a threshold number of shards can reconstruct the full message.

The protocol tolerates up to f = floor((N-1)/3) Byzantine faulty nodes.

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func PadMessage

func PadMessage(msg []byte, numDataShards int) []byte

PadMessage prepends an unsigned varint-encoded length to the message and pads the result with zeros so the total length is divisible by 2*numDataShards.

The varint prefix lets the receiver recover the exact original message length after reconstruction. The zero-padding ensures the padded message can be evenly split into numDataShards pieces, which is required by Reed-Solomon encoding (all shards must be equal length).

Layout: [varint(len(msg))] [msg bytes] [zero padding]
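The layout above can be sketched with the standard library's `encoding/binary` uvarint helpers. `padMessage` and `splitShards` are hypothetical names, not the package's implementation; the sketch only illustrates the documented layout and the equal-length split that Reed-Solomon encoding needs.

```go
package main

import "encoding/binary"

// padMessage is a hypothetical re-implementation of the documented layout:
// [uvarint(len(msg))] [msg bytes] [zero padding], with the total length
// rounded up to a multiple of 2*numDataShards.
func padMessage(msg []byte, numDataShards int) []byte {
	var prefix [binary.MaxVarintLen64]byte
	n := binary.PutUvarint(prefix[:], uint64(len(msg)))

	unit := 2 * numDataShards
	total := n + len(msg)
	if rem := total % unit; rem != 0 {
		total += unit - rem
	}

	out := make([]byte, total) // make zero-fills, so the tail is already padding
	copy(out, prefix[:n])
	copy(out[n:], msg)
	return out
}

// splitShards cuts a padded message into numDataShards equal-length pieces.
// The pieces share padded's backing array.
func splitShards(padded []byte, numDataShards int) [][]byte {
	size := len(padded) / numDataShards
	shards := make([][]byte, numDataShards)
	for i := range shards {
		shards[i] = padded[i*size : (i+1)*size]
	}
	return shards
}
```

For a 5-byte message and numDataShards = 3, the 1-byte prefix plus 5 message bytes already total 6 = 2*3, so no padding is appended and each of the three shards is 2 bytes long.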

func UnpadMessage

func UnpadMessage(padded []byte) ([]byte, error)

UnpadMessage performs the reverse operation of PadMessage: it reads the varint length prefix and extracts the original message bytes, discarding the zero padding. The returned slice shares the input's backing array; it is re-sliced to start and end exactly at the original message.

An error is returned if the varint is malformed or the encoded length exceeds the available data.
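A matching sketch of the reverse step, again hypothetical and built only on `binary.Uvarint`. Returning a sub-slice of the input mirrors the documented behaviour of sharing the backing array.

```go
package main

import (
	"encoding/binary"
	"errors"
)

// unpadMessage is a hypothetical sketch of the documented reverse step. It
// returns a sub-slice of padded, so no bytes are copied.
func unpadMessage(padded []byte) ([]byte, error) {
	length, n := binary.Uvarint(padded)
	if n <= 0 {
		// n == 0: buffer too small; n < 0: value overflows 64 bits.
		return nil, errors.New("malformed varint length prefix")
	}
	if length > uint64(len(padded)-n) {
		return nil, errors.New("encoded length exceeds available data")
	}
	return padded[n : n+int(length)], nil
}
```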

func VerifyMessageSignature

func VerifyMessageSignature(
	pubKey crypto.PubKey,
	root *MessageRoot,
	committeeID *CommitteeID,
	nonce Nonce,
	signature Signature,
) error

Types

type CommitteeID

type CommitteeID [32]byte

CommitteeID identifies a committee or logical broadcast group. Multiple committees can operate concurrently within the same engine, each with its own peer set.

type Config

type Config struct {
	// StaleMessageTimeout is how long the engine waits for a message to
	// reach the receive threshold before giving up. This prevents memory
	// leaks from partially-received messages that will never complete
	// (e.g., due to a crashed publisher or network partition).
	StaleMessageTimeout time.Duration

	// StreamProtocol is the libp2p protocol identifier used for direct
	// shard transfers between peers.
	StreamProtocol protocol.ID

	// MaxWireMessageSize caps the size of a single serialised PropellerUnit
	// on the wire. Units exceeding this are rejected to prevent memory
	// exhaustion from malicious peers.
	MaxWireMessageSize int
}

Config holds tunable parameters for the propeller engine. Sensible defaults are provided by DefaultConfig().

func DefaultConfig

func DefaultConfig() Config

DefaultConfig returns production-ready defaults.

type Engine

type Engine struct {
	// contains filtered or unexported fields
}

Engine is the central orchestrator of the Propeller protocol. It:

  • Manages committee registrations (each committee has its own peer set and scheduler).
  • Processes all incoming messages and broadcasts them when required.
  • Handles broadcast requests from the service layer.
  • Forwards all noteworthy events to the service layer.

func NewEngine

func NewEngine(
	privKey crypto.PrivKey,
	config *Config,
	logger log.StructuredLogger,
) (*Engine, chan<- engineCommand, <-chan Event)

NewEngine creates an engine instance. It returns the engine, the channel to send engine commands to, and the channel on which the engine emits Events. Call Run() to start processing.

Parameters:

  • privKey: this node's Ed25519 private key (for signing published messages).
  • config: protocol parameters.
  • logger: structured logger.

todo(rdr): In the future we may not want to expose the command channel, and instead hide the interaction behind a public API.

func (*Engine) Broadcast

func (e *Engine) Broadcast(committeeID *CommitteeID, msg []byte) error

func (*Engine) RegisterCommittee

func (e *Engine) RegisterCommittee(
	committeeID *CommitteeID,
	peers []PeerCommittee,
	peersKeys []*StakerID,
) error

func (*Engine) Run

func (e *Engine) Run(ctx context.Context) error

Run executes the engine's main loop until the context is cancelled. The loop processes three things concurrently:

  1. Commands from external callers (register, broadcast, handle incoming unit).
  2. Events from message processors (forward to application).
  3. Context cancellation (graceful shutdown).

func (*Engine) UnregisterCommittee

func (e *Engine) UnregisterCommittee(committeeID *CommitteeID)

type Event

type Event interface {
	// contains filtered or unexported methods
}

type EventMessageReceived

type EventMessageReceived struct {
	Publisher peer.ID
	Root      MessageRoot
	Message   []byte
}

EventMessageReceived signals that a message has been fully reconstructed and enough shards have been forwarded to guarantee delivery to all honest nodes. The application can safely process the contained message bytes.

type EventMessageTimeout

type EventMessageTimeout struct {
	Channel   CommitteeID
	Publisher peer.ID
	Root      MessageRoot
}

EventMessageTimeout signals that a message did not reach the receive threshold before the stale message timeout elapsed. The engine cleans up state for this message.

type EventReconstructionFailed

type EventReconstructionFailed struct {
	Root      MessageRoot
	Publisher peer.ID
	Err       error
}

EventReconstructionFailed signals that Reed-Solomon reconstruction or post-reconstruction verification failed. This typically indicates Byzantine behaviour from the publisher (e.g., inconsistent shards).

type EventShardPublishFailed

type EventShardPublishFailed struct {
	Err error
}

EventShardPublishFailed signals that the local node failed to encode or distribute shards when acting as publisher.

type EventShardSendFailed

type EventShardSendFailed struct {
	From peer.ID
	To   peer.ID
	Err  error
}

EventShardSendFailed signals that sending a single shard to a specific peer failed. The engine continues sending to other peers; this is informational for monitoring.

type EventShardValidationFailed

type EventShardValidationFailed struct {
	Sender           peer.ID
	ClaimedRoot      MessageRoot
	ClaimedPublisher peer.ID
	Err              error
}

EventShardValidationFailed signals that an incoming shard was rejected during validation. This may indicate Byzantine behaviour from the sender or publisher.

type MessageRoot

type MessageRoot merkle.Hash

MessageRoot is the SHA-256 Merkle root over all shard leaves. It uniquely identifies a message and is signed by the publisher to bind authenticity.
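The role of MessageRoot can be illustrated with a generic SHA-256 Merkle root over shard leaves. The leaf/node tag bytes (0x00/0x01, in the style of RFC 6962) and the rule of promoting an unpaired node are assumptions for illustration; the package's `merkle` sub-package uses its own tagging scheme, which this page does not specify.

```go
package main

import "crypto/sha256"

// Assumed domain-separation tags; the real merkle package may differ.
const (
	leafTag byte = 0x00
	nodeTag byte = 0x01
)

func hashLeaf(data []byte) [32]byte {
	return sha256.Sum256(append([]byte{leafTag}, data...))
}

func hashNode(left, right [32]byte) [32]byte {
	buf := make([]byte, 0, 65)
	buf = append(buf, nodeTag)
	buf = append(buf, left[:]...)
	buf = append(buf, right[:]...)
	return sha256.Sum256(buf)
}

// merkleRoot computes a root over shard leaves. An odd node at the end of a
// level is promoted unchanged (one common convention, assumed here).
func merkleRoot(shards [][]byte) [32]byte {
	if len(shards) == 0 {
		return sha256.Sum256(nil)
	}
	level := make([][32]byte, len(shards))
	for i, s := range shards {
		level[i] = hashLeaf(s)
	}
	for len(level) > 1 {
		next := make([][32]byte, 0, (len(level)+1)/2)
		for i := 0; i < len(level); i += 2 {
			if i+1 == len(level) {
				next = append(next, level[i])
				continue
			}
			next = append(next, hashNode(level[i], level[i+1]))
		}
		level = next
	}
	return level[0]
}
```

Because every leaf feeds into the root, changing any single shard changes the root, which is what lets one signature over the root authenticate all shards.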

type Nonce

type Nonce time.Duration

Nonce is the Propeller unit nonce: a strictly increasing value, starting from the Unix epoch.

type PeerCommittee

type PeerCommittee struct {
	ID    peer.ID
	Stake Stake
}

PeerCommittee is a peer that belongs to a committee and has a stake. todo(rdr): I would like to give it a better name.

type Processor

type Processor struct {
	// contains filtered or unexported fields
}

Processor handles all concurrent message-processing work.

func NewProcessor

func NewProcessor(localPeer peer.ID, config *Config) (*Processor, <-chan Event)

func (*Processor) ProcessMessage

func (p *Processor) ProcessMessage(
	ctx context.Context,
	unit *Unit,
	sender peer.ID,
	scheduler *Scheduler,
) error

ProcessMessage validates and processes the received `unit` without blocking. It returns an error only if processing of the unit could not be started.

func (*Processor) Run

func (p *Processor) Run(ctx context.Context)

type ReconstructionError

type ReconstructionError struct {
	Reason ReconstructionReason
	Detail string
}

ReconstructionError is returned when message reconstruction fails after collecting enough shards.

func (*ReconstructionError) Error

func (e *ReconstructionError) Error() string

type ReconstructionReason

type ReconstructionReason int

ReconstructionReason enumerates the specific causes of reconstruction failure.

const (
	// ReasonErasureReconstructionFailed means Reed-Solomon decoding failed,
	// likely because too many shards are missing or corrupted.
	ReasonErasureReconstructionFailed ReconstructionReason = iota
	// ReasonMismatchedMessageRoot means the Merkle root computed from the
	// reconstructed shards does not match the claimed root. This indicates
	// Byzantine behaviour from the publisher.
	ReasonMismatchedMessageRoot
	// ReasonUnequalShardLengths means shards have inconsistent lengths,
	// which violates Reed-Solomon's equal-length requirement.
	ReasonUnequalShardLengths
	// ReasonMessagePaddingError means the varint length prefix in the
	// unpadded message is malformed or points beyond the data.
	ReasonMessagePaddingError
)

func (ReconstructionReason) String

func (r ReconstructionReason) String() string

type Scheduler

type Scheduler struct {
	// contains filtered or unexported fields
}

Scheduler represents the tree manager that computes the tree topology on demand for each publisher. It holds a deterministic shard-to-peer mapping for a committee. Given a sorted set of peers and a publisher, it computes which peer is responsible for broadcasting each shard index. The mapping is deterministic so that all nodes agree on the assignment without coordination.

The design relies on the invariant that there are N-1 shards for N peers, and each non-publisher peer gets exactly one shard. The publisher is "skipped" in the sorted peer list when assigning shard indices.

Propeller uses a distributed broadcast approach where:

  • numDataShards = floor((N-1)/3), where N is the total number of nodes.
  • numDataShards is both the maximum number of faulty nodes tolerated AND the number of data shards.
  • numCodingShards = N-1-numDataShards (that is, the remaining shards).
  • A message is BUILT once numDataShards shards have been received (it can be reconstructed).
  • A message is RECEIVED once 2*numDataShards shards have been received (this guarantees the gossip property).
  • Each peer broadcasts received shards to all other peers (full mesh).
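The arithmetic above, sketched for a committee of n nodes. `deriveParams` and `shardParams` are illustrative names; the Scheduler itself exposes these values through NumDataShards, NumCodingShards, BuildThreshold and ReceiveThreshold. The sketch does not validate n: for n < 4 it yields zero data shards, which a real implementation would presumably reject.

```go
package main

// shardParams holds the quantities listed for the Scheduler, derived from
// the committee size n (including the publisher). Names are illustrative.
type shardParams struct {
	numDataShards    int // f = floor((n-1)/3), also the fault bound
	numCodingShards  int // the remaining n-1-f shards
	buildThreshold   int // shards needed to reconstruct the message
	receiveThreshold int // shards needed to guarantee the gossip property
}

func deriveParams(n int) shardParams {
	f := (n - 1) / 3 // integer division floors for non-negative n
	return shardParams{
		numDataShards:    f,
		numCodingShards:  n - 1 - f,
		buildThreshold:   f,
		receiveThreshold: 2 * f,
	}
}
```

For example, with n = 10 there are 9 shards in total (3 data, 6 coding): any 3 suffice to build the message, and 6 are needed before it counts as received.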

func NewScheduler

func NewScheduler(
	id peer.ID,
	nodes []PeerCommittee,
) (*Scheduler, error)

NewScheduler creates a scheduler for peer `id` from a list of peers. The peers are sorted lexicographically by their string representation so that all nodes derive the same ordering regardless of discovery order. Note that this function mutates `nodes`. todo(rdr): should we return the scheduler by reference or by value?

func (*Scheduler) BroadcastTargets

func (s *Scheduler) BroadcastTargets() []peer.ID

BroadcastTargets returns all peers to broadcast to, in shard-index order. The i-th element of the returned slice is the peer responsible for shard i.

func (*Scheduler) BuildThreshold

func (s *Scheduler) BuildThreshold() int

BuildThreshold returns the minimum (inclusive) number of shards required to build a message.

func (*Scheduler) NumCodingShards

func (s *Scheduler) NumCodingShards() int

NumCodingShards returns the number of parity (coding) shards.

func (*Scheduler) NumDataShards

func (s *Scheduler) NumDataShards() int

NumDataShards returns the number of data (systematic) shards.

func (*Scheduler) NumTotalShards

func (s *Scheduler) NumTotalShards() int

NumTotalShards returns the total number of shards (data + coding = N-1).

func (*Scheduler) PeerForShardIndex

func (s *Scheduler) PeerForShardIndex(
	publisher peer.ID, shardIndex ShardIndex,
) (peer.ID, error)

PeerForShardIndex returns the peer responsible for broadcasting a given shard index for a given publisher. The mapping skips the publisher in the sorted list:

if shardIndex < publisherIndex: peer = peers[shardIndex]
if shardIndex >= publisherIndex: peer = peers[shardIndex + 1]

Example with peers [A, B, C, D] and publisher C (index 2):

shard 0 -> A, shard 1 -> B, shard 2 -> D
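The documented mapping, sketched over a sorted slice of string peer IDs standing in for `peer.ID`. `peerForShardIndex` is a hypothetical name; the range check assumes the documented ShardIndex bound [0, N-2].

```go
package main

import (
	"errors"
	"sort"
)

// peerForShardIndex sketches the documented mapping over a lexicographically
// sorted peer list, skipping the publisher's slot.
func peerForShardIndex(sortedPeers []string, publisher string, shardIndex int) (string, error) {
	pubIdx := sort.SearchStrings(sortedPeers, publisher)
	if pubIdx == len(sortedPeers) || sortedPeers[pubIdx] != publisher {
		return "", errors.New("publisher not in peer set")
	}
	if shardIndex < 0 || shardIndex > len(sortedPeers)-2 {
		return "", errors.New("shard index out of range")
	}
	if shardIndex < pubIdx {
		return sortedPeers[shardIndex], nil
	}
	return sortedPeers[shardIndex+1], nil
}
```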

func (*Scheduler) PeerID

func (s *Scheduler) PeerID() peer.ID

PeerID returns the scheduler's own peer ID.

func (*Scheduler) Peers

func (s *Scheduler) Peers() []PeerCommittee

Peers returns the scheduler's list of nodes.

func (*Scheduler) ReceiveThreshold

func (s *Scheduler) ReceiveThreshold() int

ReceiveThreshold returns the minimum (inclusive) number of shards required to guarantee that a message is received.

func (*Scheduler) ShardIndexForPublisher

func (s *Scheduler) ShardIndexForPublisher(
	publisher peer.ID,
) (ShardIndex, error)

ShardIndexForPublisher returns the shard index that the scheduler's peer is responsible for broadcasting for a given publisher. This is the inverse of PeerForShardIndex:

if localPeerIndex < publisherIndex: shard = localPeerIndex
if localPeerIndex > publisherIndex: shard = localPeerIndex - 1

Returns an error if Scheduler's peer is the publisher (publishers don't have an assigned shard) or if the publisher is not in the list.
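The inverse mapping, sketched with the same string stand-ins for `peer.ID`. `shardIndexForPublisher` and the `indexOf` helper are hypothetical names, not the package's code.

```go
package main

import (
	"errors"
	"sort"
)

// indexOf finds id in a sorted peer list.
func indexOf(sortedPeers []string, id string) (int, bool) {
	i := sort.SearchStrings(sortedPeers, id)
	return i, i < len(sortedPeers) && sortedPeers[i] == id
}

// shardIndexForPublisher sketches the documented inverse mapping.
func shardIndexForPublisher(sortedPeers []string, local, publisher string) (int, error) {
	pubIdx, ok := indexOf(sortedPeers, publisher)
	if !ok {
		return 0, errors.New("publisher not in peer set")
	}
	localIdx, ok := indexOf(sortedPeers, local)
	if !ok {
		return 0, errors.New("local peer not in peer set")
	}
	switch {
	case localIdx == pubIdx:
		return 0, errors.New("publisher has no assigned shard")
	case localIdx < pubIdx:
		return localIdx, nil
	default:
		return localIdx - 1, nil
	}
}
```

With peers [A, B, C, D] and publisher C, this returns 0 for A, 1 for B and 2 for D, matching the PeerForShardIndex example.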

func (*Scheduler) ValidateShardOrigin

func (s *Scheduler) ValidateShardOrigin(
	sender peer.ID,
	publisher peer.ID,
	shardIndex ShardIndex,
) error

ValidateShardOrigin verifies that a shard unit was received from the expected sender. The sender must be either the publisher (for shards sent directly) or the designated broadcaster for the given shard index. todo(rdr): This implementation should probably be part of `UnitValidator`.

type Service

type Service any

Service is meant to represent the propeller service that glues the whole thing to p2p. How to do that is still an open question.

func New

func New(
	host host.Host,
	privKey crypto.PrivKey,
	config *Config,
	logger log.Logger,
) Service

type Shard

type Shard []byte

Shard is a single shard fragment.

type ShardData

type ShardData []Shard

ShardData is the set of shard fragments held by a Propeller Unit.

func ConstructMessageFromUnits

func ConstructMessageFromUnits(
	units []*Unit,
	localShardIndex ShardIndex,
	numDataShards int,
	parity int,
) ([]byte, ShardData, merkle.Proof, error)

ConstructMessageFromUnits receives Propeller units, recovers any missing data and returns the fully verified message, together with the corresponding shard data and merkle proof.

func (ShardData) MarshalProto

func (sd ShardData) MarshalProto() []byte

type ShardIndex

type ShardIndex uint32

ShardIndex is the position of a shard within the erasure-coded output. Valid range is [0, N-2] where N is the total number of peers.

type ShardPublishError

type ShardPublishError struct {
	Reason ShardPublishReason
	Detail string
}

ShardPublishError is returned when the local node fails to publish shards. todo(rdr): check whether we want this type; I think it is better not to have it unless necessary.

func (*ShardPublishError) Error

func (e *ShardPublishError) Error() string

type ShardPublishReason

type ShardPublishReason int

ShardPublishReason enumerates the specific causes of publish failure.

const (
	// ReasonLocalPeerNotInChannel means the local peer is not a member
	// of the channel it is trying to broadcast on.
	ReasonLocalPeerNotInChannel ShardPublishReason = iota
	// ReasonInvalidDataSize means the message is too large to encode.
	ReasonInvalidDataSize
	// ReasonSigningFailed means the local private key failed to sign.
	ReasonSigningFailed
	// ReasonEncodingFailed means Reed-Solomon encoding failed.
	ReasonEncodingFailed
	// ReasonNotConnectedToPeer means we have no open connection to a
	// target peer.
	ReasonNotConnectedToPeer
	// ReasonChannelNotRegistered means the channel has not been registered
	// with the engine.
	ReasonChannelNotRegistered
	// ReasonBroadcastFailed means the broadcast operation failed for an
	// unspecified reason.
	ReasonBroadcastFailed
)

func (ShardPublishReason) String

func (r ShardPublishReason) String() string

type ShardValidationError

type ShardValidationError struct {
	Reason ShardValidationReason
	Detail string
}

ShardValidationError is returned when an incoming PropellerUnit fails validation. The Reason field allows programmatic inspection; the Detail field carries human-readable context.

func (*ShardValidationError) Error

func (e *ShardValidationError) Error() string

type ShardValidationReason

type ShardValidationReason int

ShardValidationReason enumerates the specific causes of shard rejection.

const (
	// ReasonSelfSending means a peer sent us a unit claiming to be from us.
	ReasonSelfSending ShardValidationReason = iota
	// ReasonReceivedSelfPublishedShard means we received a shard for a
	// message we published ourselves -- we already have all shards.
	ReasonReceivedSelfPublishedShard
	// ReasonDuplicateShard means we already have a shard at this index
	// for this message.
	ReasonDuplicateShard
	// ReasonUnexpectedSender means the sender is not the peer assigned
	// to broadcast this shard index.
	ReasonUnexpectedSender
	// ReasonSignatureVerificationFailed means the publisher's signature
	// over the Merkle root did not verify.
	ReasonSignatureVerificationFailed
	// ReasonMerkleProofVerificationFailed means the Merkle inclusion
	// proof for this shard is invalid.
	ReasonMerkleProofVerificationFailed
	// ReasonScheduleError means the shard-to-peer mapping lookup failed
	// (e.g., publisher not in the channel's peer set).
	ReasonScheduleError
)

func (ShardValidationReason) String

func (r ShardValidationReason) String() string

type Signature

type Signature []byte

Signature is the publisher's signature carried by a Propeller Unit.

func SignMessage

func SignMessage(
	privKey crypto.PrivKey,
	root *MessageRoot,
	committeeID *CommitteeID,
	nonce Nonce,
) (Signature, error)

type Stake

type Stake uint64

type StakerID

type StakerID struct {
	// contains filtered or unexported fields
}

todo(rdr): using String until I find a better type

type Unit

type Unit struct {
	CommitteeID CommitteeID  // Which committee this belongs to
	Publisher   peer.ID      // Original message author
	MessageRoot MessageRoot  // Merkle root binding all shards together
	MerkleProof merkle.Proof // Merkle inclusion proof for this shard
	Signature   Signature    // Publisher's Ed25519 signature over the root
	ShardIndex  ShardIndex   // This shard's position in the erasure-coded output
	ShardData   ShardData    // Shard fragments carried by this unit
	// todo(rdr): calling it nonce because that's what it is called on the Rust
	// side, but timestamp or some other name would be better
	Nonce Nonce // Strictly increasing number, starting from the Unix epoch
}

Unit is the atomic wire message: one erasure-coded shard plus the metadata needed for independent verification. Each unit is self-contained so a receiver can validate it without any other shards.

func CreatePropellerUnits

func CreatePropellerUnits(
	privKey crypto.PrivKey,
	committeeID *CommitteeID,
	nonce Nonce,
	message []byte,
	numDataShards,
	parity int,
) ([]Unit, error)

CreatePropellerUnits creates the Propeller units for publishing a message. todo(rdr): maybe call it something like "create message for sharing".

func UnitFromProto

func UnitFromProto(protoUnit *pb.PropellerUnit) (Unit, error)

func (*Unit) ToProto

func (u *Unit) ToProto() *pb.PropellerUnit

type UnitValidator

type UnitValidator struct {
	// contains filtered or unexported fields
}

UnitValidator validates incoming units (shards) for a given committee and publisher.

func NewValidator

func NewValidator(publisher peer.ID, scheduler *Scheduler) UnitValidator

func (*UnitValidator) Validate

func (v *UnitValidator) Validate(unit *Unit, sender peer.ID) error

Directories

Path Synopsis
Package merkle implements Merkle tree construction and verification using a SHA-256 tagging scheme.
