peer

package
v0.8.0
Warning

This package is not in the latest version of its module.

Published: Jun 7, 2026 License: Apache-2.0 Imports: 13 Imported by: 0

Documentation

Overview

Package peer implements peer-to-peer LTX frame streaming over luxfi/zap.

Use case: an N-pod ATS in which every market has one owner pod and one standby pod. The owner's WAL frames must reach the standby's local disk before the trader sees an ACK (W=2 quorum). An S3 PUT in that path costs up to ~50ms, while a ZAP round trip within the same AZ is ~200µs. This package therefore replaces S3 in the hot path with a peer LTX channel; S3 keeps its job as the cold tier (backup, compaction archive, new-pod bootstrap).

Wire protocol (4 opcodes in the caller-supplied namespace):

0xA020 MsgPeerWALFrame      request:  [8B mkt][4B txid][4B len][N bytes]
0xA021 MsgPeerWALFrameAck   response: [1B code][8B mkt][4B txid]
0xA022 MsgPeerSnapshotReq   request:  [8B mkt][8B from_txid]
0xA023 MsgPeerSnapshotResp  response: [1B code][8B mkt][8B txid][N bytes]

No JSON, no protobuf: every header field is fixed-width little-endian. The 8-byte market_id is a keyed BLAKE3 digest of the symbol. The caller resolves symbol→id once at session setup, so the peer wire never carries the variable-length symbol.
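A minimal sketch of the MsgPeerWALFrame request codec, assuming exactly the field widths in the table above (so txid is a 4-byte uint32 here) and little-endian order throughout. encodeWALFrame and decodeWALFrame are hypothetical names; the package's own codec is unexported.

```go
package main

import (
	"encoding/binary"
	"errors"
)

// MarketID mirrors the package's 8-byte market handle.
type MarketID [8]byte

// walFrameHeaderLen is 8B market + 4B txid + 4B body length.
const walFrameHeaderLen = 8 + 4 + 4

// encodeWALFrame builds a request payload: [8B mkt][4B txid][4B len][N bytes].
func encodeWALFrame(mkt MarketID, txid uint32, body []byte) []byte {
	buf := make([]byte, walFrameHeaderLen+len(body))
	copy(buf[0:8], mkt[:])
	binary.LittleEndian.PutUint32(buf[8:12], txid)
	binary.LittleEndian.PutUint32(buf[12:16], uint32(len(body)))
	copy(buf[16:], body)
	return buf
}

// decodeWALFrame parses a request payload and rejects truncated or
// length-mismatched frames (the ErrCodeBadFrame case).
func decodeWALFrame(p []byte) (MarketID, uint32, []byte, error) {
	var mkt MarketID
	if len(p) < walFrameHeaderLen {
		return mkt, 0, nil, errors.New("peer: short WAL frame header")
	}
	copy(mkt[:], p[0:8])
	txid := binary.LittleEndian.Uint32(p[8:12])
	n := binary.LittleEndian.Uint32(p[12:16])
	if uint64(n) != uint64(len(p)-walFrameHeaderLen) {
		return mkt, 0, nil, errors.New("peer: WAL frame length mismatch")
	}
	return mkt, txid, p[walFrameHeaderLen:], nil
}
```

Checking the declared length against the actual payload size is what lets a standby answer ErrCodeBadFrame instead of applying a torn frame.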

The ReplicaClient implements replicate.ReplicaClient so the existing replicate.DB / replicate.Replica plumbing routes a peer replica identically to an S3 one. Only the hot path differs: W=2 quorum returns after the peer ACK; the S3 replica continues uploading asynchronously through its own monitor loop.

Index

Constants

const (
	OpWALFrame     uint16 = 0xA020
	OpWALFrameAck  uint16 = 0xA021
	OpSnapshotReq  uint16 = 0xA022
	OpSnapshotResp uint16 = 0xA023
)

Opcode constants — the canonical 0xA020..0xA023 quadruple for the peer LTX wire. They live in the 0xA0 namespace and MUST be registered against a dispatcher that owns the namespace byte (e.g. ATS's atsrpc.Dispatcher). The peer package is namespace-agnostic — it hands the (req, resp) pairs to whatever Dispatcher implementation the caller injects.
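A small sketch of what owning a namespace byte means: the namespace is the high byte of each opcode, so a dispatcher can refuse pairs that fall outside it. namespace, opPairs, and checkNamespace are hypothetical helpers, and the pairing of each request with its response opcode is assumed from the wire table above.

```go
package main

import "fmt"

const (
	OpWALFrame     uint16 = 0xA020
	OpWALFrameAck  uint16 = 0xA021
	OpSnapshotReq  uint16 = 0xA022
	OpSnapshotResp uint16 = 0xA023
)

// namespace returns the high byte of an opcode, the byte a dispatcher owns.
func namespace(op uint16) byte { return byte(op >> 8) }

// opPairs lists the (request, response) pairs handed to a Dispatcher.
var opPairs = [][2]uint16{
	{OpWALFrame, OpWALFrameAck},
	{OpSnapshotReq, OpSnapshotResp},
}

// checkNamespace verifies that every opcode in opPairs lives in ns.
func checkNamespace(ns byte) error {
	for _, p := range opPairs {
		for _, op := range p {
			if namespace(op) != ns {
				return fmt.Errorf("opcode %#04x is outside namespace %#02x", op, ns)
			}
		}
	}
	return nil
}
```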

const (
	ErrCodeOK             uint8 = 0x00
	ErrCodeBadFrame       uint8 = 0x01
	ErrCodeUnknownMarket  uint8 = 0x02
	ErrCodeServerInternal uint8 = 0x03
	ErrCodeNotStandby     uint8 = 0x04
)

Error codes carried in the first byte of every response payload.
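Decoding a MsgPeerWALFrameAck into its fields and a Go error might look like this. It is a sketch assuming the 13-byte [1B code][8B mkt][4B txid] layout from the wire table; parseAck and its error strings are hypothetical.

```go
package main

import (
	"encoding/binary"
	"fmt"
)

const (
	ErrCodeOK             uint8 = 0x00
	ErrCodeBadFrame       uint8 = 0x01
	ErrCodeUnknownMarket  uint8 = 0x02
	ErrCodeServerInternal uint8 = 0x03
	ErrCodeNotStandby     uint8 = 0x04
)

// ackLen is the ack payload size: 1B code + 8B market + 4B txid.
const ackLen = 1 + 8 + 4

// parseAck splits an ack payload and turns a non-OK code into an error.
// The caller should still compare mkt/txid with what it sent.
func parseAck(p []byte) (code uint8, mkt [8]byte, txid uint32, err error) {
	if len(p) != ackLen {
		return 0, mkt, 0, fmt.Errorf("peer: ack is %d bytes, want %d", len(p), ackLen)
	}
	code = p[0] // the error code is always the first byte
	copy(mkt[:], p[1:9])
	txid = binary.LittleEndian.Uint32(p[9:13])
	switch code {
	case ErrCodeOK:
		return code, mkt, txid, nil
	case ErrCodeBadFrame:
		err = fmt.Errorf("peer: bad frame at txid %d", txid)
	case ErrCodeUnknownMarket:
		err = fmt.Errorf("peer: unknown market %x", mkt)
	case ErrCodeServerInternal:
		err = fmt.Errorf("peer: standby internal error at txid %d", txid)
	case ErrCodeNotStandby:
		err = fmt.Errorf("peer: target is not a standby")
	default:
		err = fmt.Errorf("peer: unknown ack code %#02x", code)
	}
	return code, mkt, txid, err
}
```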

const ReplicaClientType = "peer"

ReplicaClientType is the type string returned by ReplicaClient.Type(). Identifies peer replicas in the replicate.DB Replicas slice.

Variables

This section is empty.

Functions

This section is empty.

Types

type AckError

type AckError struct {
	Code uint8
	TXID ltx.TXID
}

AckError carries the wire-level error code so callers can switch on the failure mode (bad frame, unknown market, standby refusal).

func (*AckError) Error

func (e *AckError) Error() string

type Dispatcher

type Dispatcher interface {
	Register(reqOp, respOp uint16, h Handler) error
}

Dispatcher is the minimal contract the peer Server registers against. ATS's atsrpc.Dispatcher satisfies it. Decoupling here keeps the peer package out of any opcode-namespace-owner package's import graph.

type FrameSink

type FrameSink interface {
	// Apply persists one LTX frame for market id at the given TXID.
	// MUST return only after the bytes are durable on local storage.
	// Returning an error fails the ACK; the owner's Sync will fail and
	// the caller (Journal.Append) sees the error.
	Apply(ctx context.Context, id MarketID, txid ltx.TXID, r io.Reader) error

	// Snapshot returns an io.ReadCloser over the latest LTX snapshot
	// at-or-after fromTXID for the requesting peer. Implementations
	// may return io.EOF immediately if the requesting peer is already
	// caught up. The returned reader is closed by the caller.
	Snapshot(ctx context.Context, id MarketID, fromTXID ltx.TXID) (io.ReadCloser, ltx.TXID, error)
}

FrameSink is the standby-side receiver for LTX frames pushed by the owner. The Server invokes Apply once per accepted frame; Apply MUST fsync to local storage before returning nil, which is the standby half of the W=2 contract. Apply receives an io.Reader over the raw LTX bytes and is expected to consume it fully.

type Handler

type Handler func(ctx context.Context, peer string, payload []byte) (response []byte, _ error)

Handler mirrors the per-opcode signature each Dispatcher accepts. It MUST return a well-formed response payload — the first byte is the error_code. Returning (nil, err) is a programming error.

type MarketID

type MarketID [8]byte

MarketID is the 8-byte handle that identifies a per-symbol stream on the wire. It is derived once per ReplicaClient and stays constant for the life of the journal. The peer derives the same MarketID from the same symbol — no symbol bytes travel.

func DecodeSnapshotResp

func DecodeSnapshotResp(msg *zap.Message) (uint8, MarketID, uint64, []byte, error)

DecodeSnapshotResp extracts (code, market, txid, body) from a SnapshotResp ZAP message — exported so cold-start callers can use it directly when bootstrapping new replicas.
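The payload layout behind DecodeSnapshotResp can be sketched over raw bytes, assuming the [1B code][8B mkt][8B txid][N bytes] layout from the wire table. decodeSnapshotPayload is hypothetical and takes a []byte rather than a *zap.Message, since the zap accessor for the payload is not documented here.

```go
package main

import (
	"encoding/binary"
	"errors"
)

type MarketID [8]byte

// snapshotRespHeaderLen is 1B code + 8B market + 8B txid.
const snapshotRespHeaderLen = 1 + 8 + 8

// decodeSnapshotPayload splits a raw snapshot response. There is no
// length field in this layout: the body is whatever follows the header.
func decodeSnapshotPayload(p []byte) (uint8, MarketID, uint64, []byte, error) {
	var mkt MarketID
	if len(p) < snapshotRespHeaderLen {
		return 0, mkt, 0, nil, errors.New("peer: short snapshot response")
	}
	copy(mkt[:], p[1:9])
	txid := binary.LittleEndian.Uint64(p[9:17])
	return p[0], mkt, txid, p[snapshotRespHeaderLen:], nil
}
```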

func MarketIDFromBytes

func MarketIDFromBytes(b []byte) MarketID

MarketIDFromBytes packs 8 raw bytes into a MarketID. Caller is responsible for ensuring the source bytes were derived from a stable keyed hash of the symbol (e.g. blake3 keyed by domain string).
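A sketch of deriving a stable MarketID from a symbol. The overview specifies a keyed BLAKE3 digest; this example swaps in HMAC-SHA256 from the Go standard library so it runs without a third-party BLAKE3 module, then truncates to 8 bytes. deriveMarketID and the domain key are hypothetical.

```go
package main

import (
	"crypto/hmac"
	"crypto/sha256"
)

type MarketID [8]byte

// deriveMarketID substitutes HMAC-SHA256 for keyed BLAKE3 and keeps the
// first 8 bytes of the digest, analogous to MarketIDFromBytes.
func deriveMarketID(domainKey []byte, symbol string) MarketID {
	mac := hmac.New(sha256.New, domainKey)
	mac.Write([]byte(symbol)) // hash.Hash.Write never returns an error
	sum := mac.Sum(nil)
	var id MarketID
	copy(id[:], sum[:8])
	return id
}
```

Whatever hash is used, owner and standby must agree on the key, the hash, and the truncation, or the two sides will derive different IDs for the same symbol.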

type Node

type Node interface {
	Call(ctx context.Context, peerID string, msg *zap.Message) (*zap.Message, error)
}

Node abstracts the subset of luxfi/zap.Node the peer client needs. Production wiring passes a *zap.Node directly; tests can pass a fake. Keeping this interface narrow avoids dragging the full Node API surface into every call site.

type Option

type Option func(*ReplicaClient)

Option configures a ReplicaClient.

func WithLogger

func WithLogger(l *slog.Logger) Option

WithLogger overrides the default no-op logger.

func WithTimeout

func WithTimeout(d time.Duration) Option

WithTimeout sets the per-call deadline on every Push/Pull. Defaults to 2s. The design budget is ~1ms p99 same-AZ; the 2s default is a kill switch, not a latency target.

type ReplicaClient

type ReplicaClient struct {
	// contains filtered or unexported fields
}

ReplicaClient is the client half of the peer wire. One per (market, owner→standby) pair. It plugs into replicate.DB as a sibling of the S3 ReplicaClient — the DB's quorum policy chooses which one drives the ACK path.

On the wire, every WriteLTXFile call sends one MsgPeerWALFrame and blocks on the MsgPeerWALFrameAck. The standby fsyncs and ACKs in ~200µs same-AZ. There is no batching at this layer; the WAL frame is already the natural batch unit for the matcher.

func NewReplicaClient

func NewReplicaClient(node Node, peerID string, marketID MarketID, opts ...Option) *ReplicaClient

NewReplicaClient constructs a peer client for one (market, standby) pair. node is the local luxfi/zap.Node already wired into the cluster mesh; peerID is the standby's pod ID; marketID is the canonical 8-byte handle.

func (*ReplicaClient) Close

func (c *ReplicaClient) Close() error

Close marks the client unusable; subsequent WriteLTXFile calls return io.ErrClosedPipe.

func (*ReplicaClient) DeleteAll

func (c *ReplicaClient) DeleteAll(ctx context.Context) error

DeleteAll is a no-op for the same reason as DeleteLTXFiles: frames on the peer wire are ephemeral.

func (*ReplicaClient) DeleteLTXFiles

func (c *ReplicaClient) DeleteLTXFiles(ctx context.Context, a []*ltx.FileInfo) error

DeleteLTXFiles is a no-op — frames are ephemeral on the peer wire.

func (*ReplicaClient) Init

func (c *ReplicaClient) Init(ctx context.Context) error

Init is a no-op for the peer client. The ZAP node is started by the process owner; the peer client just rides it.

func (*ReplicaClient) LTXFiles

func (c *ReplicaClient) LTXFiles(ctx context.Context, level int, seek ltx.TXID, useMetadata bool) (ltx.FileIterator, error)

LTXFiles returns an empty iterator. The peer is the live replication channel, not the source of truth for backfill — that's S3. Callers listing LTX files for restore go to the S3 replica.

func (*ReplicaClient) MarketID

func (c *ReplicaClient) MarketID() MarketID

MarketID reports the 8-byte market handle this client streams.

func (*ReplicaClient) OpenLTXFile

func (c *ReplicaClient) OpenLTXFile(ctx context.Context, level int, minTXID, maxTXID ltx.TXID, offset, size int64) (io.ReadCloser, error)

OpenLTXFile returns io.ErrUnexpectedEOF. Same rationale as LTXFiles — peer is not a content store. Snapshot-style cold-start of a new standby uses MsgPeerSnapshotReq via the Server, not OpenLTXFile.

func (*ReplicaClient) PeerID

func (c *ReplicaClient) PeerID() string

PeerID reports the standby peer this client targets.

func (*ReplicaClient) SetLogger

func (c *ReplicaClient) SetLogger(logger *slog.Logger)

SetLogger sets the logger used for diagnostics.

func (*ReplicaClient) Type

func (c *ReplicaClient) Type() string

Type returns "peer", the discriminator that the replicate.DB Replicas slice and the journal use to distinguish this client from the S3 replica.

func (*ReplicaClient) WriteLTXFile

func (c *ReplicaClient) WriteLTXFile(ctx context.Context, level int, minTXID, maxTXID ltx.TXID, r io.Reader) (*ltx.FileInfo, error)

WriteLTXFile pushes one LTX frame to the peer and blocks until the peer ACKs (fsync confirmed). minTXID and maxTXID identify the frame; for L0 frames they are always equal. Returns the ltx.FileInfo the peer reports so the caller can record metadata; the Size field is the byte count actually transferred.

This is the hot path. Errors here fail the trader's PlaceOrder; do not return nil unless the peer has confirmed durability.

Memory: the LTX body buffer is pooled via ltxFramePool. Under sustained NYSE-class load (5K+ orders/sec/market), this drops per-Push allocations from O(frame_size_bytes) to O(1) on the steady-state path.

type Server

type Server struct {
	// contains filtered or unexported fields
}

Server is the standby side. It registers handlers for OpWALFrame and OpSnapshotReq against a Dispatcher; on receive it forwards into the caller's FrameSink. Construct one Server per pod and Register once.

func NewServer

func NewServer(sink FrameSink, opts ...ServerOption) *Server

NewServer constructs the standby-side receiver. sink is the local store the server fsyncs frames into.

func (*Server) Close

func (s *Server) Close() error

Close marks the server unwilling to apply new frames. Subsequent inbound frames receive ErrCodeNotStandby.

func (*Server) Register

func (s *Server) Register(d Dispatcher) error

Register installs OpWALFrame and OpSnapshotReq handlers on the dispatcher. It returns an error on an opcode collision; the dispatcher contract rejects last-write-wins.

type ServerOption

type ServerOption func(*Server)

ServerOption configures a Server.

func WithServerLogger

func WithServerLogger(l *slog.Logger) ServerOption

WithServerLogger overrides the default no-op logger.
