replication

package
v1.43.0 Latest
Warning

This package is not in the latest version of its module.

Published: Apr 30, 2026 License: MIT Imports: 10 Imported by: 0

Documentation

Overview

Package replication is the consensus-replication boundary for the Tasks shard manager. Drivers (local, quasar) implement Replicator; the shard manager calls Propose before committing every mutation and installs Subscribe handlers to apply frames received from peers.

The Decision values mirror github.com/luxfi/consensus core/types so the quasar driver can pass them through without reshaping.


Constants

This section is empty.

Variables

var (
	ErrRejected = errors.New("replication: frame rejected by quorum")
	ErrTimeout  = errors.New("replication: consensus timeout")
)

ErrRejected and ErrTimeout normalize Replicator failure modes.
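Because both failure modes are sentinel values, a caller can branch on them with errors.Is even when a driver wraps them. The following is a minimal sketch under that assumption: the sentinels are redeclared locally so the snippet compiles on its own, and classify is a hypothetical helper, not part of the package.

```go
package main

import (
	"errors"
	"fmt"
)

// Local stand-ins for the package's sentinel errors, redeclared so the
// sketch compiles without importing the real package.
var (
	ErrRejected = errors.New("replication: frame rejected by quorum")
	ErrTimeout  = errors.New("replication: consensus timeout")
)

// classify is a hypothetical helper: it maps a Propose error to the
// action a caller might take next.
func classify(err error) string {
	switch {
	case err == nil:
		return "commit"
	case errors.Is(err, ErrTimeout):
		return "retry"
	case errors.Is(err, ErrRejected):
		return "abort"
	default:
		return "fail"
	}
}

func main() {
	// errors.Is sees through %w wrapping, so added context does not
	// hide the sentinel.
	wrapped := fmt.Errorf("propose seq=7: %w", ErrTimeout)
	fmt.Println(classify(nil), classify(wrapped), classify(ErrRejected))
}
```

Whether a timeout is actually safe to retry depends on the driver; the mapping above is only illustrative.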

Functions

This section is empty.

Types

type Decision

type Decision uint8

Decision is the consensus outcome for a proposed frame.

const (
	DecisionUndecided Decision = iota
	DecisionAccept
	DecisionReject
	DecisionTimeout
)
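The constants are assigned with iota, so their numeric values run from 0 (DecisionUndecided) to 3 (DecisionTimeout). The sketch below shows one way a driver could turn a Decision into the package's sentinel errors. The helpers name and toError are hypothetical and exist only for illustration; the types are redeclared locally so the snippet is self-contained.

```go
package main

import (
	"errors"
	"fmt"
)

// Decision and its constants are copied from the package declaration.
type Decision uint8

const (
	DecisionUndecided Decision = iota // 0
	DecisionAccept                    // 1
	DecisionReject                    // 2
	DecisionTimeout                   // 3
)

// Local stand-ins for the package's sentinel errors.
var (
	ErrRejected = errors.New("replication: frame rejected by quorum")
	ErrTimeout  = errors.New("replication: consensus timeout")
)

// name is a hypothetical helper that renders a Decision for logs.
func name(d Decision) string {
	switch d {
	case DecisionUndecided:
		return "undecided"
	case DecisionAccept:
		return "accept"
	case DecisionReject:
		return "reject"
	case DecisionTimeout:
		return "timeout"
	default:
		return fmt.Sprintf("decision(%d)", uint8(d))
	}
}

// toError is a hypothetical mapping from an outcome to an error value.
func toError(d Decision) error {
	switch d {
	case DecisionAccept:
		return nil
	case DecisionReject:
		return ErrRejected
	case DecisionTimeout:
		return ErrTimeout
	default:
		return fmt.Errorf("replication: no final decision (%s)", name(d))
	}
}

func main() {
	for d := DecisionUndecided; d <= DecisionTimeout; d++ {
		fmt.Printf("%d %s err=%v\n", d, name(d), toError(d))
	}
}
```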

type Frame

type Frame struct {
	OrgID     string `json:"org"`
	Namespace string `json:"ns"`
	Seq       uint64 `json:"seq"`
	Op        string `json:"op"`  // "put" | "del"
	Key       string `json:"key"` // canonical key (no org prefix)
	Value     []byte `json:"val,omitempty"`
}

Frame is the wire envelope a Shard ships through consensus. The payload is the application-level mutation; ShardKey routes the frame to the right namespace shard on apply.
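To see what the struct tags produce on the wire, the sketch below marshals a "put" and a "del" frame with encoding/json. The Frame type is copied from the declaration above; the field values and the encode helper are made up for illustration.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Frame is copied from the package declaration so the sketch is
// self-contained.
type Frame struct {
	OrgID     string `json:"org"`
	Namespace string `json:"ns"`
	Seq       uint64 `json:"seq"`
	Op        string `json:"op"`
	Key       string `json:"key"`
	Value     []byte `json:"val,omitempty"`
}

// encode is a hypothetical helper that returns the JSON form of f.
func encode(f Frame) string {
	b, err := json.Marshal(f)
	if err != nil {
		return ""
	}
	return string(b)
}

func main() {
	put := Frame{OrgID: "acme", Namespace: "default", Seq: 1, Op: "put", Key: "task/1", Value: []byte("hi")}
	del := Frame{OrgID: "acme", Namespace: "default", Seq: 2, Op: "del", Key: "task/1"}

	// encoding/json renders []byte as base64, so "hi" becomes "aGk=".
	fmt.Println(encode(put))
	// Value is nil here, so omitempty drops the "val" field entirely.
	fmt.Println(encode(del))
}
```

The first line printed is `{"org":"acme","ns":"default","seq":1,"op":"put","key":"task/1","val":"aGk="}`; the second is the same shape with `"seq":2`, `"op":"del"`, and no `val` field.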

func (Frame) FrameKey

func (f Frame) FrameKey() []byte

FrameKey is exported for /v1/tasks/cluster diagnostics.

type Handler

type Handler func(ctx context.Context, f Frame) error

Handler applies an accepted frame to the local shard.

type LocalReplicator

type LocalReplicator struct {
	// contains filtered or unexported fields
}

LocalReplicator is the single-node passthrough. Every Propose returns DecisionAccept synchronously and fans the frame out to local subscribers — the same code path the quasar driver uses on accept, so callers cannot tell the difference.

func NewLocal

func NewLocal() *LocalReplicator

NewLocal returns a single-node Replicator.

func (*LocalReplicator) Close

func (r *LocalReplicator) Close() error

Close is a no-op; no resources held.

func (*LocalReplicator) Propose

func (r *LocalReplicator) Propose(ctx context.Context, f Frame) (Decision, error)

Propose accepts the frame immediately and dispatches it to every subscriber.

func (*LocalReplicator) Stats

func (r *LocalReplicator) Stats() (accepted, rejected uint64)

Stats returns counters used by /v1/tasks/cluster.

func (*LocalReplicator) Subscribe

func (r *LocalReplicator) Subscribe(h Handler)

Subscribe appends an applier; thread-safe with Propose.
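The passthrough behaviour described for LocalReplicator can be sketched as below. This is not the package's implementation: passthrough is a hypothetical stand-in, Frame is trimmed to three fields, and stopping at the first handler error is an assumption about error handling that the documentation does not specify.

```go
package main

import (
	"context"
	"fmt"
	"sync"
)

type Decision uint8

const (
	DecisionUndecided Decision = iota
	DecisionAccept
)

// Frame is trimmed to the fields this sketch needs.
type Frame struct {
	Seq uint64
	Op  string
	Key string
}

type Handler func(ctx context.Context, f Frame) error

// passthrough is a hypothetical single-node driver.
type passthrough struct {
	mu       sync.Mutex
	handlers []Handler
	accepted uint64
}

// Subscribe appends an applier under the lock, so it is safe to call
// concurrently with Propose.
func (p *passthrough) Subscribe(h Handler) {
	p.mu.Lock()
	p.handlers = append(p.handlers, h)
	p.mu.Unlock()
}

// Propose accepts synchronously and fans the frame out to subscribers.
// The handler slice is copied so handlers run without the lock held.
func (p *passthrough) Propose(ctx context.Context, f Frame) (Decision, error) {
	p.mu.Lock()
	p.accepted++
	hs := append([]Handler(nil), p.handlers...)
	p.mu.Unlock()
	for _, h := range hs {
		if err := h(ctx, f); err != nil {
			return DecisionAccept, err // assumption: first apply error wins
		}
	}
	return DecisionAccept, nil
}

// runLocal proposes one frame and reports what the subscriber saw.
func runLocal() (Decision, []string, error) {
	r := &passthrough{}
	var applied []string
	r.Subscribe(func(_ context.Context, f Frame) error {
		applied = append(applied, f.Op+" "+f.Key)
		return nil
	})
	d, err := r.Propose(context.Background(), Frame{Seq: 1, Op: "put", Key: "task/1"})
	return d, applied, err
}

func main() {
	d, applied, err := runLocal()
	fmt.Println(d == DecisionAccept, applied, err)
}
```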

type MemoryHub

type MemoryHub struct {
	// contains filtered or unexported fields
}

MemoryHub coordinates a set of MemoryTransport peers.

func NewMemoryHub

func NewMemoryHub() *MemoryHub

NewMemoryHub returns a hub for in-process replicator tests.

func (*MemoryHub) Connect

func (h *MemoryHub) Connect(id string) *MemoryTransport

Connect attaches a node with the given id to the hub; the returned MemoryTransport receives broadcasts originating from any other peer.

type MemoryTransport

type MemoryTransport struct {
	// contains filtered or unexported fields
}

MemoryTransport is a deterministic in-process Transport used by tests and by single-node embedded mode. It implements broadcast as a fan-out across peers registered against the same hub.

func (*MemoryTransport) Broadcast

func (t *MemoryTransport) Broadcast(_ context.Context, kind string, payload []byte) error

Broadcast delivers payload to every peer except this one.

func (*MemoryTransport) LocalID

func (t *MemoryTransport) LocalID() string

LocalID returns this node's id.

func (*MemoryTransport) OnReceive

func (t *MemoryTransport) OnReceive(kind string, h func([]byte) error)

OnReceive registers handlers per kind.

func (*MemoryTransport) Validators

func (t *MemoryTransport) Validators() []string

Validators returns every registered peer ID, this node first.
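The hub-and-peer fan-out can be illustrated with a small in-process sketch. The types hub and peer are hypothetical stand-ins, not MemoryHub or MemoryTransport themselves, and keeping a single handler per kind (last registration wins) is an assumption made to keep the example short.

```go
package main

import (
	"fmt"
	"sync"
)

// hub tracks peers in connection order so delivery is deterministic.
type hub struct {
	mu    sync.Mutex
	peers map[string]*peer
	order []string
}

type peer struct {
	id       string
	hub      *hub
	handlers map[string]func([]byte) error // assumption: one handler per kind
}

func newHub() *hub { return &hub{peers: map[string]*peer{}} }

func (h *hub) connect(id string) *peer {
	h.mu.Lock()
	defer h.mu.Unlock()
	p := &peer{id: id, hub: h, handlers: map[string]func([]byte) error{}}
	h.peers[id] = p
	h.order = append(h.order, id)
	return p
}

func (p *peer) onReceive(kind string, fn func([]byte) error) {
	p.hub.mu.Lock()
	p.handlers[kind] = fn
	p.hub.mu.Unlock()
}

// broadcast delivers payload to every peer except the sender.
func (p *peer) broadcast(kind string, payload []byte) error {
	p.hub.mu.Lock()
	var targets []func([]byte) error
	for _, id := range p.hub.order {
		if id == p.id {
			continue
		}
		if fn := p.hub.peers[id].handlers[kind]; fn != nil {
			targets = append(targets, fn)
		}
	}
	p.hub.mu.Unlock()
	for _, fn := range targets {
		if err := fn(payload); err != nil {
			return err
		}
	}
	return nil
}

// validators lists every registered peer id, this node first.
func (p *peer) validators() []string {
	p.hub.mu.Lock()
	defer p.hub.mu.Unlock()
	ids := []string{p.id}
	for _, id := range p.hub.order {
		if id != p.id {
			ids = append(ids, id)
		}
	}
	return ids
}

// runHub connects three peers and broadcasts one vote from "a".
func runHub() (received []string, view []string) {
	h := newHub()
	a, b, c := h.connect("a"), h.connect("b"), h.connect("c")
	for _, p := range []*peer{a, b, c} {
		p := p
		p.onReceive("vote", func(payload []byte) error {
			received = append(received, p.id+"<-"+string(payload))
			return nil
		})
	}
	_ = a.broadcast("vote", []byte("yes"))
	return received, c.validators()
}

func main() {
	fmt.Println(runHub())
}
```

In this sketch the sender's own handler never fires, which matches the documented "every peer except this one" behaviour of Broadcast.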

type QuasarConfig

type QuasarConfig struct {
	NodeID       string
	Validators   []string // host:port strings; LocalID must appear here
	Transport    Transport
	ProposeWait  time.Duration // default 2s
	WitnessAfter time.Duration // default 100ms — when to escalate to commit vote
}

QuasarConfig pins the post-quantum engine to a validator set.
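The field comments imply two defaults (2s and 100ms) and a membership requirement on Validators. A hedged sketch of how such a configuration might be normalized follows. The config type and normalize function are hypothetical, the Transport field is omitted, and treating NodeID as the id that must appear in Validators is an assumption, since the field comment refers to LocalID.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// config mirrors QuasarConfig without the Transport field.
type config struct {
	NodeID       string
	Validators   []string
	ProposeWait  time.Duration
	WitnessAfter time.Duration
}

// normalize is a hypothetical helper: it fills in the documented
// defaults and checks that the node is part of the validator set.
func normalize(cfg config) (config, error) {
	if cfg.ProposeWait <= 0 {
		cfg.ProposeWait = 2 * time.Second // documented default
	}
	if cfg.WitnessAfter <= 0 {
		cfg.WitnessAfter = 100 * time.Millisecond // documented default
	}
	for _, v := range cfg.Validators {
		if v == cfg.NodeID { // assumption: NodeID is the local id
			return cfg, nil
		}
	}
	return cfg, errors.New("local node is not in the validator set")
}

func main() {
	cfg, err := normalize(config{
		NodeID:     "10.0.0.1:9000",
		Validators: []string{"10.0.0.1:9000", "10.0.0.2:9000"},
	})
	fmt.Println(cfg.ProposeWait, cfg.WitnessAfter, err)
}
```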

type QuasarReplicator

type QuasarReplicator struct {
	// contains filtered or unexported fields
}

QuasarReplicator drives consensus.NewPQ with WAL frames as block payloads. On a single-node validator set the engine accepts every frame after a self-vote; multi-node operation requires Transport to fan votes between peers — the engine treats them as native PQ votes.

func NewQuasar

func NewQuasar(ctx context.Context, cfg QuasarConfig) (*QuasarReplicator, error)

NewQuasar wires consensus.NewPQ to the supplied transport and starts the engine. Returns once the genesis block is in place.

func (*QuasarReplicator) Close

func (r *QuasarReplicator) Close() error

Close stops the engine.

func (*QuasarReplicator) Propose

func (r *QuasarReplicator) Propose(ctx context.Context, f Frame) (Decision, error)

Propose builds a Block whose payload is the JSON-encoded frame, adds it to the engine, broadcasts it to peers, and waits for the engine to mark it accepted. Self-vote happens immediately so single-node configurations terminate without external traffic.
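The "wait for the engine to mark it accepted" step can be pictured as a select over a decision channel, a timer, and the caller's context. The sketch below shows only that waiting pattern. It does not use the consensus engine, and awaitDecision, the decided channel, and the two run helpers are all hypothetical.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

type Decision uint8

const (
	DecisionUndecided Decision = iota
	DecisionAccept
	DecisionReject
	DecisionTimeout
)

// Local stand-ins for the package's sentinel errors.
var (
	ErrRejected = errors.New("replication: frame rejected by quorum")
	ErrTimeout  = errors.New("replication: consensus timeout")
)

// awaitDecision blocks until a decision arrives, the wait budget
// (ProposeWait in QuasarConfig) elapses, or ctx is cancelled.
func awaitDecision(ctx context.Context, decided <-chan Decision, wait time.Duration) (Decision, error) {
	timer := time.NewTimer(wait)
	defer timer.Stop()
	select {
	case d := <-decided:
		switch d {
		case DecisionAccept:
			return d, nil
		case DecisionReject:
			return d, ErrRejected
		default:
			return d, fmt.Errorf("replication: unexpected decision %d", d)
		}
	case <-timer.C:
		return DecisionTimeout, ErrTimeout
	case <-ctx.Done():
		return DecisionUndecided, ctx.Err()
	}
}

// runSelfVote models a single-node set: the self-vote has already
// produced a decision before the wait begins.
func runSelfVote() (Decision, error) {
	decided := make(chan Decision, 1)
	decided <- DecisionAccept
	return awaitDecision(context.Background(), decided, 2*time.Second)
}

// runNoVotes models a partitioned node: no decision ever arrives.
func runNoVotes() (Decision, error) {
	decided := make(chan Decision)
	return awaitDecision(context.Background(), decided, 10*time.Millisecond)
}

func main() {
	fmt.Println(runSelfVote())
	fmt.Println(runNoVotes())
}
```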

func (*QuasarReplicator) Stats

func (r *QuasarReplicator) Stats() (accepted, rejected, timeouts uint64)

Stats returns counters used by /v1/tasks/cluster.

func (*QuasarReplicator) Subscribe

func (r *QuasarReplicator) Subscribe(h Handler)

Subscribe registers an applier; called both for self-proposed frames (after engine accept) and for frames received from peers.

type Replicator

type Replicator interface {
	// Propose ships frame through consensus and returns the decision.
	Propose(ctx context.Context, frame Frame) (Decision, error)
	// Subscribe registers a handler invoked once per accepted frame.
	Subscribe(h Handler)
	// Close releases driver resources.
	Close() error
}

Replicator is the interface every driver implements. Propose blocks until the cluster decides. Subscribe registers an applier that is called for every accepted frame, including frames that originated locally, so a single code path persists mutations regardless of which node is the leader.
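From the caller's side, the contract suggests a propose-then-apply pattern: state changes only inside the subscribed handler, never directly in the write path. The sketch below illustrates this with a hypothetical shard type and a fakeDriver that either accepts or rejects everything; neither exists in the package, and Frame is trimmed to the fields used.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

type Decision uint8

const (
	DecisionUndecided Decision = iota
	DecisionAccept
	DecisionReject
)

var ErrRejected = errors.New("replication: frame rejected by quorum")

type Frame struct {
	Seq   uint64
	Op    string
	Key   string
	Value []byte
}

type Handler func(ctx context.Context, f Frame) error

type Replicator interface {
	Propose(ctx context.Context, frame Frame) (Decision, error)
	Subscribe(h Handler)
	Close() error
}

// fakeDriver is a hypothetical driver with a fixed verdict.
type fakeDriver struct {
	reject   bool
	handlers []Handler
}

func (d *fakeDriver) Subscribe(h Handler) { d.handlers = append(d.handlers, h) }
func (d *fakeDriver) Close() error        { return nil }

func (d *fakeDriver) Propose(ctx context.Context, f Frame) (Decision, error) {
	if d.reject {
		return DecisionReject, ErrRejected
	}
	for _, h := range d.handlers {
		if err := h(ctx, f); err != nil {
			return DecisionAccept, err
		}
	}
	return DecisionAccept, nil
}

// shard is a hypothetical caller. It mutates data only in apply.
type shard struct {
	r    Replicator
	seq  uint64
	data map[string][]byte
}

func newShard(r Replicator) *shard {
	s := &shard{r: r, data: map[string][]byte{}}
	r.Subscribe(s.apply)
	return s
}

func (s *shard) apply(_ context.Context, f Frame) error {
	switch f.Op {
	case "put":
		s.data[f.Key] = f.Value
	case "del":
		delete(s.data, f.Key)
	default:
		return fmt.Errorf("unknown op %q", f.Op)
	}
	return nil
}

// Put proposes the mutation; the handler performs the actual write.
func (s *shard) Put(ctx context.Context, key string, val []byte) error {
	s.seq++
	_, err := s.r.Propose(ctx, Frame{Seq: s.seq, Op: "put", Key: key, Value: val})
	return err
}

// putVia reports whether a key was stored under the given verdict.
func putVia(reject bool) (bool, error) {
	s := newShard(&fakeDriver{reject: reject})
	err := s.Put(context.Background(), "task/1", []byte("hi"))
	_, stored := s.data["task/1"]
	return stored, err
}

func main() {
	fmt.Println(putVia(false))
	fmt.Println(putVia(true))
}
```

Because the write happens only in apply, a rejected proposal leaves the shard's data untouched without any rollback logic.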

type Transport

type Transport interface {
	// Broadcast sends payload to every other validator. Implementations
	// SHOULD return promptly; durability is the consensus engine's job.
	Broadcast(ctx context.Context, kind string, payload []byte) error
	// OnReceive registers a handler for inbound traffic. It has no return
	// value; a local-only (single node) transport may treat it as a no-op.
	OnReceive(kind string, h func(payload []byte) error)
	// Validators returns the current validator set, leader-first.
	Validators() []string
	// LocalID returns this node's stable id.
	LocalID() string
}

Transport abstracts the cluster wire so tests can swap a memory bus for ZAP without dragging the network layer into unit tests. The quasar driver uses it to broadcast Block payloads and votes; in production Embedded wires this to the per-process zap.Node.
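Because Transport is a small interface, a test double is only a few lines. The sketch below defines a hypothetical soloTransport for a one-node validator set and uses a compile-time assertion to check that it satisfies the interface. The interface is copied from the declaration above; soloTransport and describe are illustrative names only.

```go
package main

import (
	"context"
	"fmt"
)

// Transport is copied from the package declaration.
type Transport interface {
	Broadcast(ctx context.Context, kind string, payload []byte) error
	OnReceive(kind string, h func(payload []byte) error)
	Validators() []string
	LocalID() string
}

// soloTransport is a hypothetical transport with no peers.
type soloTransport struct{ id string }

// Compile-time check that soloTransport implements Transport.
var _ Transport = soloTransport{}

// Broadcast has nobody to send to, so it succeeds immediately.
func (s soloTransport) Broadcast(context.Context, string, []byte) error { return nil }

// OnReceive discards the handler because no inbound traffic can arrive.
func (s soloTransport) OnReceive(string, func([]byte) error) {}

// Validators returns a set containing only this node.
func (s soloTransport) Validators() []string { return []string{s.id} }

func (s soloTransport) LocalID() string { return s.id }

// describe works against the interface, so it accepts any transport.
func describe(t Transport) string {
	return fmt.Sprintf("%s of %d", t.LocalID(), len(t.Validators()))
}

func main() {
	fmt.Println(describe(soloTransport{id: "n1"}))
}
```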
