Documentation ¶
Overview ¶
Package replication is the consensus-replication boundary for the Tasks shard manager. Drivers (local, quasar) implement Replicator; the shard manager calls Propose before committing every mutation and installs Subscribe handlers to apply frames received from peers.
The Decision values mirror github.com/luxfi/consensus core/types so the quasar driver can pass them through without reshaping.
Index ¶
Constants ¶
This section is empty.
Variables ¶
var (
ErrRejected = errors.New("replication: frame rejected by quorum")
ErrTimeout = errors.New("replication: consensus timeout")
)
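ErrRejected and ErrTimeout normalize Replicator failure modes: a frame rejected by quorum and a consensus timeout surface as the same sentinel errors whichever driver is in use.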
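Because the two failure modes are sentinel errors, a caller can branch on them with errors.Is even when a driver wraps them. The sketch below redeclares the sentinels so it compiles on its own; the classify helper and its commit/abort/retry policy are assumptions made for illustration, not part of the package.

```go
package main

import (
	"errors"
	"fmt"
)

// Stand-ins for the package's sentinel errors, redeclared so the sketch
// compiles without importing the real package.
var (
	ErrRejected = errors.New("replication: frame rejected by quorum")
	ErrTimeout  = errors.New("replication: consensus timeout")
)

// classify is a hypothetical helper that maps a Propose error to a caller
// action. The policy itself is an assumption, not documented behaviour.
func classify(err error) string {
	switch {
	case err == nil:
		return "commit"
	case errors.Is(err, ErrRejected):
		return "abort"
	case errors.Is(err, ErrTimeout):
		return "retry"
	default:
		return "fail"
	}
}

// demo shows that errors.Is still matches a sentinel after wrapping with %w.
func demo() string {
	wrapped := fmt.Errorf("propose seq 7: %w", ErrTimeout)
	return classify(nil) + " " + classify(ErrRejected) + " " + classify(wrapped)
}

func main() {
	fmt.Println(demo()) // prints "commit abort retry"
}
```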
Functions ¶
This section is empty.
Types ¶
type Frame ¶
type Frame struct {
OrgID string `json:"org"`
Namespace string `json:"ns"`
Seq uint64 `json:"seq"`
Op string `json:"op"` // "put" | "del"
Key string `json:"key"` // canonical key (no org prefix)
Value []byte `json:"val,omitempty"`
}
Frame is the wire envelope a Shard ships through consensus. The payload is the application-level mutation; ShardKey routes the frame to the right namespace shard on apply.
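The struct tags determine the JSON form of a frame. The sketch below copies the Frame definition and marshals a put and a del with encoding/json; the org, namespace, and key values are made up for illustration. Note that encoding/json encodes a []byte as a base64 string and that omitempty drops val when Value is empty.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Frame copies the struct definition above so the sketch is self-contained.
type Frame struct {
	OrgID     string `json:"org"`
	Namespace string `json:"ns"`
	Seq       uint64 `json:"seq"`
	Op        string `json:"op"`  // "put" | "del"
	Key       string `json:"key"` // canonical key (no org prefix)
	Value     []byte `json:"val,omitempty"`
}

// encode returns the JSON form of a frame as a string.
func encode(f Frame) string {
	b, err := json.Marshal(f)
	if err != nil {
		panic(err) // Frame has no field types that can fail to marshal
	}
	return string(b)
}

func main() {
	// Illustrative values only.
	put := Frame{OrgID: "acme", Namespace: "default", Seq: 1, Op: "put", Key: "task/42", Value: []byte("hi")}
	del := Frame{OrgID: "acme", Namespace: "default", Seq: 2, Op: "del", Key: "task/42"}

	fmt.Println(encode(put))
	// prints {"org":"acme","ns":"default","seq":1,"op":"put","key":"task/42","val":"aGk="}
	fmt.Println(encode(del))
	// prints {"org":"acme","ns":"default","seq":2,"op":"del","key":"task/42"}
}
```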
type LocalReplicator ¶
type LocalReplicator struct {
// contains filtered or unexported fields
}
LocalReplicator is the single-node passthrough. Every Propose returns DecisionAccept synchronously and fans the frame out to local subscribers. This is the same code path the quasar driver uses on accept, so callers cannot tell the two drivers apart.
func (*LocalReplicator) Close ¶
func (r *LocalReplicator) Close() error
Close is a no-op; LocalReplicator holds no resources.
func (*LocalReplicator) Stats ¶
func (r *LocalReplicator) Stats() (accepted, rejected uint64)
Stats returns counters used by /v1/tasks/cluster.
func (*LocalReplicator) Subscribe ¶
func (r *LocalReplicator) Subscribe(h Handler)
Subscribe appends an applier. It is safe to call concurrently with Propose.
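The passthrough behaviour can be approximated in a few lines. This is a minimal sketch, not the LocalReplicator source: the passthrough type is hypothetical, Frame is trimmed, and the Decision and Handler shapes are assumptions because their definitions are not shown in this documentation.

```go
package main

import (
	"context"
	"fmt"
	"strings"
	"sync"
)

// Assumed shapes: the real Decision and Handler definitions are not shown
// in this documentation, so these are stand-ins.
type Decision int

const DecisionAccept Decision = 1 // hypothetical value

// Frame is trimmed to the fields the sketch needs.
type Frame struct {
	Seq uint64
	Op  string
	Key string
}

type Handler func(Frame) error

// passthrough is a hypothetical single-node replicator.
type passthrough struct {
	mu       sync.Mutex
	handlers []Handler
	accepted uint64
}

// Subscribe appends an applier under the lock so it is safe alongside Propose.
func (p *passthrough) Subscribe(h Handler) {
	p.mu.Lock()
	defer p.mu.Unlock()
	p.handlers = append(p.handlers, h)
}

// Propose accepts immediately and fans the frame out to a snapshot of the
// handlers, so appliers run without the lock held.
func (p *passthrough) Propose(ctx context.Context, f Frame) (Decision, error) {
	if err := ctx.Err(); err != nil {
		return 0, err
	}
	p.mu.Lock()
	hs := append([]Handler(nil), p.handlers...)
	p.accepted++
	p.mu.Unlock()
	for _, h := range hs {
		if err := h(f); err != nil {
			return DecisionAccept, err
		}
	}
	return DecisionAccept, nil
}

// demo proposes two frames and reports what the subscriber applied.
func demo() string {
	p := &passthrough{}
	var applied []string
	p.Subscribe(func(f Frame) error {
		applied = append(applied, fmt.Sprintf("%d:%s:%s", f.Seq, f.Op, f.Key))
		return nil
	})
	for i, op := range []string{"put", "del"} {
		frame := Frame{Seq: uint64(i + 1), Op: op, Key: "task/42"}
		if _, err := p.Propose(context.Background(), frame); err != nil {
			return "error: " + err.Error()
		}
	}
	return fmt.Sprintf("%s accepted=%d", strings.Join(applied, ","), p.accepted)
}

func main() {
	fmt.Println(demo()) // prints "1:put:task/42,2:del:task/42 accepted=2"
}
```

Snapshotting the handler slice before calling the appliers keeps a slow applier from blocking Subscribe, which is one plausible way to get the concurrency guarantee described above.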
type MemoryHub ¶
type MemoryHub struct {
// contains filtered or unexported fields
}
MemoryHub coordinates a set of MemoryTransport peers.
func NewMemoryHub ¶
func NewMemoryHub() *MemoryHub
NewMemoryHub returns a hub for in-process replicator tests.
func (*MemoryHub) Connect ¶
func (h *MemoryHub) Connect(id string) *MemoryTransport
Connect attaches a node to the hub; the returned Transport participates in broadcasts originating from any other peer.
type MemoryTransport ¶
type MemoryTransport struct {
// contains filtered or unexported fields
}
MemoryTransport is a deterministic in-process Transport used by tests and by single-node embedded mode. It implements broadcast as a fan-out across peers registered against the same hub.
func (*MemoryTransport) LocalID ¶
func (t *MemoryTransport) LocalID() string
LocalID returns this node's id.
func (*MemoryTransport) OnReceive ¶
func (t *MemoryTransport) OnReceive(kind string, h func([]byte) error)
OnReceive registers h to handle inbound messages of the given kind.
func (*MemoryTransport) Validators ¶
func (t *MemoryTransport) Validators() []string
Validators returns every registered peer ID, this node first.
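The hub-and-peer arrangement can be pictured with the sketch below. The hub and peer types are hypothetical stand-ins for MemoryHub and MemoryTransport, and details such as synchronous delivery in registration order and one handler per kind are assumptions chosen to keep the example deterministic.

```go
package main

import (
	"fmt"
	"strings"
	"sync"
)

// hub and peer are hypothetical stand-ins for MemoryHub and MemoryTransport.
type hub struct {
	mu    sync.Mutex
	order []string // peer IDs in registration order, for determinism
	peers map[string]*peer
}

type peer struct {
	id       string
	hub      *hub
	handlers map[string]func([]byte) error // keyed by message kind
}

func newHub() *hub { return &hub{peers: map[string]*peer{}} }

// connect registers a new peer under id and returns its transport.
func (h *hub) connect(id string) *peer {
	h.mu.Lock()
	defer h.mu.Unlock()
	p := &peer{id: id, hub: h, handlers: map[string]func([]byte) error{}}
	h.peers[id] = p
	h.order = append(h.order, id)
	return p
}

// onReceive stores fn as the handler for kind (assumed: one handler per kind).
func (p *peer) onReceive(kind string, fn func([]byte) error) {
	p.hub.mu.Lock()
	defer p.hub.mu.Unlock()
	p.handlers[kind] = fn
}

// validators lists every registered peer, this node first.
func (p *peer) validators() []string {
	p.hub.mu.Lock()
	defer p.hub.mu.Unlock()
	out := []string{p.id}
	for _, id := range p.hub.order {
		if id != p.id {
			out = append(out, id)
		}
	}
	return out
}

// broadcast delivers payload to every other peer that handles kind.
func (p *peer) broadcast(kind string, payload []byte) error {
	p.hub.mu.Lock()
	var targets []func([]byte) error
	for _, id := range p.hub.order {
		if fn := p.hub.peers[id].handlers[kind]; id != p.id && fn != nil {
			targets = append(targets, fn)
		}
	}
	p.hub.mu.Unlock()
	for _, fn := range targets {
		if err := fn(payload); err != nil {
			return err
		}
	}
	return nil
}

// demo connects three peers and has "b" broadcast one payload.
func demo() string {
	h := newHub()
	a, b, c := h.connect("a"), h.connect("b"), h.connect("c")
	var got []string
	for _, p := range []*peer{a, b, c} {
		id := p.id
		p.onReceive("block", func(payload []byte) error {
			got = append(got, id+"<-"+string(payload))
			return nil
		})
	}
	if err := b.broadcast("block", []byte("f1")); err != nil {
		return "error: " + err.Error()
	}
	return strings.Join(got, ",") + " | " + strings.Join(b.validators(), ",")
}

func main() {
	fmt.Println(demo()) // prints "a<-f1,c<-f1 | b,a,c"
}
```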
type QuasarConfig ¶
type QuasarConfig struct {
NodeID string
Validators []string // host:port strings; LocalID must appear here
Transport Transport
ProposeWait time.Duration // default 2s
WitnessAfter time.Duration // default 100ms — when to escalate to commit vote
}
QuasarConfig pins the post-quantum engine to a validator set.
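The field comments imply two defaults and one membership rule. The sketch below shows one way they could be applied; normalize is a hypothetical helper, the Transport field is omitted, and reading "LocalID must appear here" as "NodeID must be listed in Validators" is an assumption.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// config mirrors QuasarConfig minus the Transport field.
type config struct {
	NodeID       string
	Validators   []string // host:port strings
	ProposeWait  time.Duration
	WitnessAfter time.Duration
}

// normalize is hypothetical: it fills the documented defaults (2s and 100ms)
// and checks that the node appears in its own validator set.
func normalize(c config) (config, error) {
	if c.ProposeWait == 0 {
		c.ProposeWait = 2 * time.Second
	}
	if c.WitnessAfter == 0 {
		c.WitnessAfter = 100 * time.Millisecond
	}
	for _, v := range c.Validators {
		if v == c.NodeID {
			return c, nil
		}
	}
	return c, errors.New("node is not in the validator set")
}

// describe renders the effective timings or the validation error.
func describe(c config) string {
	n, err := normalize(c)
	if err != nil {
		return "invalid: " + err.Error()
	}
	return fmt.Sprintf("wait=%s witness=%s", n.ProposeWait, n.WitnessAfter)
}

func main() {
	// Illustrative addresses only.
	ok := config{NodeID: "n1:9000", Validators: []string{"n1:9000", "n2:9000"}}
	fmt.Println(describe(ok))                       // prints "wait=2s witness=100ms"
	fmt.Println(describe(config{NodeID: "n3:9000"})) // prints "invalid: node is not in the validator set"
}
```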
type QuasarReplicator ¶
type QuasarReplicator struct {
// contains filtered or unexported fields
}
QuasarReplicator drives consensus.NewPQ with WAL frames as block payloads. On a single-node validator set the engine accepts every frame after a self-vote. Multi-node operation requires Transport to fan votes between peers; the engine treats them as native PQ votes.
func NewQuasar ¶
func NewQuasar(ctx context.Context, cfg QuasarConfig) (*QuasarReplicator, error)
NewQuasar wires consensus.NewPQ to the supplied transport and starts the engine. Returns once the genesis block is in place.
func (*QuasarReplicator) Propose ¶
func (r *QuasarReplicator) Propose(ctx context.Context, frame Frame) (Decision, error)
Propose builds a Block whose payload is the JSON-encoded frame, adds it to the engine, broadcasts it to peers, and waits for the engine to mark it accepted. The self-vote happens immediately, so single-node configurations terminate without external traffic.
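The waiting step can be sketched as a select over three events: the engine signalling acceptance, the ProposeWait budget expiring, and the caller's context ending. The waitAccepted helper and the accepted channel are hypothetical; how the real engine reports acceptance is not documented here.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// Stand-in for the package's sentinel error.
var ErrTimeout = errors.New("replication: consensus timeout")

// waitAccepted is hypothetical: it blocks until accepted is closed, the wait
// budget expires, or the caller's context is cancelled.
func waitAccepted(ctx context.Context, accepted <-chan struct{}, wait time.Duration) error {
	timer := time.NewTimer(wait)
	defer timer.Stop()
	select {
	case <-accepted:
		return nil
	case <-timer.C:
		return ErrTimeout
	case <-ctx.Done():
		return ctx.Err()
	}
}

// demo runs one wait that succeeds at once and one that times out.
func demo() string {
	done := make(chan struct{})
	close(done) // simulates an immediate self-vote on a single node
	first := waitAccepted(context.Background(), done, time.Second)

	never := make(chan struct{}) // no peer ever votes
	second := waitAccepted(context.Background(), never, 10*time.Millisecond)

	return fmt.Sprintf("%v / %v", first, errors.Is(second, ErrTimeout))
}

func main() {
	fmt.Println(demo()) // prints "<nil> / true"
}
```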
func (*QuasarReplicator) Stats ¶
func (r *QuasarReplicator) Stats() (accepted, rejected, timeouts uint64)
Stats returns counters used by /v1/tasks/cluster.
func (*QuasarReplicator) Subscribe ¶
func (r *QuasarReplicator) Subscribe(h Handler)
Subscribe registers an applier; called both for self-proposed frames (after engine accept) and for frames received from peers.
type Replicator ¶
type Replicator interface {
// Propose ships frame through consensus and returns the decision.
Propose(ctx context.Context, frame Frame) (Decision, error)
// Subscribe registers a handler invoked once per accepted frame.
Subscribe(h Handler)
// Close releases driver resources.
Close() error
}
Replicator is the interface every driver implements. Propose blocks until the cluster decides. Subscribe registers an applier that is called for every accepted frame, including frames originated locally, so a single code path persists mutations regardless of leader.
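From the caller's side, this contract means a store never writes directly: it proposes a frame and lets its subscribed applier perform the write. The sketch below illustrates that pattern under stated assumptions: the shard and loopback types are hypothetical, Frame is trimmed, and the Decision and Handler shapes are stand-ins because their definitions are not shown in this documentation.

```go
package main

import (
	"context"
	"fmt"
)

// Assumed shapes: Decision and Handler are not defined in this documentation.
type Decision int

const DecisionAccept Decision = 1 // hypothetical value

type Frame struct {
	Seq   uint64
	Op    string // "put" | "del"
	Key   string
	Value []byte
}

type Handler func(Frame) error

type Replicator interface {
	Propose(ctx context.Context, frame Frame) (Decision, error)
	Subscribe(h Handler)
	Close() error
}

// loopback is a hypothetical driver that accepts every frame.
type loopback struct{ handlers []Handler }

func (l *loopback) Subscribe(h Handler) { l.handlers = append(l.handlers, h) }
func (l *loopback) Close() error        { return nil }
func (l *loopback) Propose(ctx context.Context, f Frame) (Decision, error) {
	for _, h := range l.handlers {
		if err := h(f); err != nil {
			return DecisionAccept, err
		}
	}
	return DecisionAccept, nil
}

// shard is a hypothetical store whose only write path is the apply handler.
type shard struct {
	rep  Replicator
	seq  uint64
	data map[string]string
}

func newShard(rep Replicator) *shard {
	s := &shard{rep: rep, data: map[string]string{}}
	rep.Subscribe(s.apply)
	return s
}

// apply persists accepted frames, whether proposed locally or by a peer.
func (s *shard) apply(f Frame) error {
	switch f.Op {
	case "put":
		s.data[f.Key] = string(f.Value)
	case "del":
		delete(s.data, f.Key)
	default:
		return fmt.Errorf("unknown op %q", f.Op)
	}
	return nil
}

// mutate proposes a frame and leaves the write itself to apply.
func (s *shard) mutate(op, key, val string) error {
	s.seq++
	frame := Frame{Seq: s.seq, Op: op, Key: key, Value: []byte(val)}
	_, err := s.rep.Propose(context.Background(), frame)
	return err
}

// demo runs three mutations and returns the resulting contents.
func demo() string {
	s := newShard(&loopback{})
	steps := [][3]string{{"put", "a", "1"}, {"put", "b", "2"}, {"del", "a", ""}}
	for _, st := range steps {
		if err := s.mutate(st[0], st[1], st[2]); err != nil {
			return "error: " + err.Error()
		}
	}
	return fmt.Sprintf("%v", s.data)
}

func main() {
	fmt.Println(demo()) // prints "map[b:2]"
}
```

Because mutate never touches data itself, swapping loopback for a multi-node driver would not change the store's write path.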
type Transport ¶
type Transport interface {
// Broadcast sends payload to every other validator. Implementations
// SHOULD return promptly; durability is the consensus engine's job.
Broadcast(ctx context.Context, kind string, payload []byte) error
// OnReceive registers a handler for inbound traffic; returns nil if
// the transport is local-only (single node).
OnReceive(kind string, h func(payload []byte) error)
// Validators returns the current validator set, leader-first.
Validators() []string
// LocalID returns this node's stable id.
LocalID() string
}
Transport abstracts the cluster wire so unit tests can substitute an in-memory bus for ZAP and leave the network layer out. The quasar driver uses it to broadcast Block payloads and votes; in production, Embedded wires this to the per-process zap.Node.