Documentation
Overview ¶
Package peer implements peer-to-peer LTX frame streaming over luxfi/zap.
Use case: an N-pod ATS where every market has one owner pod and one standby pod. The owner's WAL frames must reach the standby's local disk before the trader sees an ACK (W=2 quorum). An S3 PUT in that path adds roughly 50ms to every acknowledged write; ZAP same-AZ RTT is ~200µs. This package replaces S3 in the hot path with a peer LTX channel, while S3 keeps its job as the cold tier (backup, compaction archive, new-pod bootstrap).
Wire protocol (4 opcodes in the caller-supplied namespace):
    0xA020  MsgPeerWALFrame      request:   [8B mkt][4B txid][4B len][N bytes]
    0xA021  MsgPeerWALFrameAck   response:  [1B code][8B mkt][4B txid]
    0xA022  MsgPeerSnapshotReq   request:   [8B mkt][8B from_txid]
    0xA023  MsgPeerSnapshotResp  response:  [1B code][8B mkt][8B txid][N bytes]
No JSON, no protobuf. Fixed-width little-endian. The 8-byte market_id is a Blake3-keyed digest of the symbol — caller resolves symbol→id at session setup; the peer wire never carries the variable-length symbol.
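The MsgPeerWALFrame layout above can be sketched with encoding/binary. This is an illustration only: encodeWALFrame and decodeWALFrame are hypothetical names, not functions exported by this package, and the sketch assumes the payload is exactly the 16-byte header followed by the body.

```go
package main

import (
	"encoding/binary"
	"errors"
	"fmt"
)

// MarketID mirrors the package's 8-byte market handle.
type MarketID [8]byte

// walFrameHeaderLen is 8B market + 4B txid + 4B length.
const walFrameHeaderLen = 16

// encodeWALFrame (hypothetical) lays out [8B mkt][4B txid][4B len][N bytes]
// with fixed-width little-endian integers.
func encodeWALFrame(id MarketID, txid uint32, body []byte) []byte {
	buf := make([]byte, walFrameHeaderLen+len(body))
	copy(buf[0:8], id[:])
	binary.LittleEndian.PutUint32(buf[8:12], txid)
	binary.LittleEndian.PutUint32(buf[12:16], uint32(len(body)))
	copy(buf[16:], body)
	return buf
}

// decodeWALFrame (hypothetical) is the inverse. It rejects payloads that are
// shorter than the header or whose length field disagrees with the body size.
func decodeWALFrame(p []byte) (MarketID, uint32, []byte, error) {
	var id MarketID
	if len(p) < walFrameHeaderLen {
		return id, 0, nil, errors.New("wal frame: short header")
	}
	copy(id[:], p[0:8])
	txid := binary.LittleEndian.Uint32(p[8:12])
	n := binary.LittleEndian.Uint32(p[12:16])
	if uint64(n) != uint64(len(p)-walFrameHeaderLen) {
		return id, 0, nil, errors.New("wal frame: length mismatch")
	}
	return id, txid, p[16:], nil
}

func main() {
	id := MarketID{1, 2, 3, 4, 5, 6, 7, 8}
	wire := encodeWALFrame(id, 42, []byte("ltx"))
	gotID, txid, body, err := decodeWALFrame(wire)
	fmt.Println(gotID == id, txid, string(body), err)
}
```

Because every field has a fixed width, the decoder needs no schema and no allocation beyond the returned slice, which aliases the input payload.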
The ReplicaClient implements replicate.ReplicaClient so the existing replicate.DB / replicate.Replica plumbing routes a peer replica identically to an S3 one. Only the hot path differs: W=2 quorum returns after the peer ACK; the S3 replica continues uploading asynchronously through its own monitor loop.
Index ¶
- Constants
- type AckError
- type Dispatcher
- type FrameSink
- type Handler
- type MarketID
- type Node
- type Option
- type ReplicaClient
- func (c *ReplicaClient) Close() error
- func (c *ReplicaClient) DeleteAll(ctx context.Context) error
- func (c *ReplicaClient) DeleteLTXFiles(ctx context.Context, a []*ltx.FileInfo) error
- func (c *ReplicaClient) Init(ctx context.Context) error
- func (c *ReplicaClient) LTXFiles(ctx context.Context, level int, seek ltx.TXID, useMetadata bool) (ltx.FileIterator, error)
- func (c *ReplicaClient) MarketID() MarketID
- func (c *ReplicaClient) OpenLTXFile(ctx context.Context, level int, minTXID, maxTXID ltx.TXID, offset, size int64) (io.ReadCloser, error)
- func (c *ReplicaClient) PeerID() string
- func (c *ReplicaClient) SetLogger(logger *slog.Logger)
- func (c *ReplicaClient) Type() string
- func (c *ReplicaClient) WriteLTXFile(ctx context.Context, level int, minTXID, maxTXID ltx.TXID, r io.Reader) (*ltx.FileInfo, error)
- type Server
- type ServerOption
Constants ¶
const (
	OpWALFrame     uint16 = 0xA020
	OpWALFrameAck  uint16 = 0xA021
	OpSnapshotReq  uint16 = 0xA022
	OpSnapshotResp uint16 = 0xA023
)
Opcode constants — the canonical 0xA020..0xA023 quadruple for the peer LTX wire. They live in the 0xA0 namespace and MUST be registered against a dispatcher that owns the namespace byte (e.g. ATS's atsrpc.Dispatcher). The peer package is namespace-agnostic — it hands the (req, resp) pairs to whatever Dispatcher implementation the caller injects.
const (
	ErrCodeOK             uint8 = 0x00
	ErrCodeBadFrame       uint8 = 0x01
	ErrCodeUnknownMarket  uint8 = 0x02
	ErrCodeServerInternal uint8 = 0x03
	ErrCodeNotStandby     uint8 = 0x04
)
Error codes carried in the first byte of every response payload.
const ReplicaClientType = "peer"
ReplicaClientType is the type string returned by ReplicaClient.Type(). Identifies peer replicas in the replicate.DB Replicas slice.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type AckError ¶
AckError carries the wire-level error code so callers can switch on the failure mode (bad frame, unknown market, standby refusal).
type Dispatcher ¶
Dispatcher is the minimal contract the peer Server registers against. ATS's atsrpc.Dispatcher satisfies it. Decoupling here keeps the peer package out of any opcode-namespace-owner package's import graph.
type FrameSink ¶
type FrameSink interface {
	// Apply persists one LTX frame for market id at the given TXID.
	// MUST return only after the bytes are durable on local storage.
	// Returning an error fails the ACK; the owner's Sync will fail and
	// the caller (Journal.Append) sees the error.
	Apply(ctx context.Context, id MarketID, txid ltx.TXID, r io.Reader) error

	// Snapshot returns an io.ReadCloser over the latest LTX snapshot
	// at-or-after fromTXID for the requesting peer. Implementations
	// may return io.EOF immediately if the requesting peer is already
	// caught up. The returned reader is closed by the caller.
	Snapshot(ctx context.Context, id MarketID, fromTXID ltx.TXID) (io.ReadCloser, ltx.TXID, error)
}
FrameSink is the standby-side receiver for LTX frames pushed by the owner. The Server invokes Apply once per accepted frame. Apply MUST fsync to local storage before returning nil; that is the standby half of the W=2 contract. Apply receives an io.Reader over the raw LTX bytes, and implementations are expected to consume it fully.
type Handler ¶
Handler mirrors the per-opcode signature each Dispatcher accepts. It MUST return a well-formed response payload — the first byte is the error_code. Returning (nil, err) is a programming error.
type MarketID ¶
type MarketID [8]byte
MarketID is the 8-byte handle that identifies a per-symbol stream on the wire. It is derived once per ReplicaClient and stays constant for the life of the journal. The peer derives the same MarketID from the same symbol — no symbol bytes travel.
func DecodeSnapshotResp ¶
DecodeSnapshotResp extracts (code, market, txid, body) from a SnapshotResp ZAP message — exported so cold-start callers can use it directly when bootstrapping new replicas.
func MarketIDFromBytes ¶
MarketIDFromBytes packs 8 raw bytes into a MarketID. Caller is responsible for ensuring the source bytes were derived from a stable keyed hash of the symbol (e.g. blake3 keyed by domain string).
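A possible derivation is sketched below. Two assumptions are worth flagging: Blake3 is not in the Go standard library, so HMAC-SHA256 truncated to 8 bytes is swapped in here purely to show the keyed-hash shape, and marketIDFromBytes is a local stand-in whose signature is assumed (the real MarketIDFromBytes signature is not shown on this page). The domain string is also invented for the example.

```go
package main

import (
	"crypto/hmac"
	"crypto/sha256"
	"fmt"
)

type MarketID [8]byte

// marketIDFromBytes is a local stand-in for MarketIDFromBytes; the
// signature is an assumption.
func marketIDFromBytes(b []byte) (MarketID, error) {
	var id MarketID
	if len(b) != len(id) {
		return id, fmt.Errorf("market id: want %d bytes, got %d", len(id), len(b))
	}
	copy(id[:], b)
	return id, nil
}

// deriveMarketID (hypothetical) keys the hash by a domain string and hashes
// the symbol. HMAC-SHA256 stands in for the keyed Blake3 the docs describe.
func deriveMarketID(domain, symbol string) (MarketID, error) {
	mac := hmac.New(sha256.New, []byte(domain))
	mac.Write([]byte(symbol))
	return marketIDFromBytes(mac.Sum(nil)[:8])
}

func main() {
	id, err := deriveMarketID("example.peer.market-id.v1", "AAPL")
	fmt.Printf("%x %v\n", id[:], err)
}
```

Whatever hash is used, owner and standby must agree on both the key and the algorithm, since the symbol itself never travels on the wire.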
type Node ¶
type Node interface {
	Call(ctx context.Context, peerID string, msg *zap.Message) (*zap.Message, error)
}
Node abstracts the subset of luxfi/zap.Node the peer client needs. Production wiring passes a *zap.Node directly; tests can pass a fake. Keeping this interface narrow avoids dragging the full Node API surface into every call site.
type Option ¶
type Option func(*ReplicaClient)
Option configures a ReplicaClient.
func WithLogger ¶
WithLogger overrides the default no-op logger.
func WithTimeout ¶
WithTimeout sets the per-call deadline applied to every Push/Pull. Defaults to 2s. The design brief budgets ~1ms p99 same-AZ, so the 2s default acts as a kill switch rather than an expected latency.
type ReplicaClient ¶
type ReplicaClient struct {
	// contains filtered or unexported fields
}
ReplicaClient is the client half of the peer wire. One per (market, owner→standby) pair. It plugs into replicate.DB as a sibling of the S3 ReplicaClient — the DB's quorum policy chooses which one drives the ACK path.
On the wire, every WriteLTXFile call sends one MsgPeerWALFrame and blocks on the MsgPeerWALFrameAck. The standby fsyncs and ACKs in ~200µs same-AZ. There is no batching at this layer; the WAL frame is already the natural batch unit for the matcher.
func NewReplicaClient ¶
func NewReplicaClient(node Node, peerID string, marketID MarketID, opts ...Option) *ReplicaClient
NewReplicaClient constructs a peer client for one (market, standby) pair. node is the local luxfi/zap.Node already wired into the cluster mesh; peerID is the standby's pod ID; marketID is the canonical 8-byte handle.
func (*ReplicaClient) Close ¶
func (c *ReplicaClient) Close() error
Close marks the client unusable; subsequent WriteLTXFile calls return io.ErrClosedPipe.
func (*ReplicaClient) DeleteAll ¶
func (c *ReplicaClient) DeleteAll(ctx context.Context) error
DeleteAll is a no-op for the same reason as DeleteLTXFiles.
func (*ReplicaClient) DeleteLTXFiles ¶
func (c *ReplicaClient) DeleteLTXFiles(ctx context.Context, a []*ltx.FileInfo) error
DeleteLTXFiles is a no-op: frames are ephemeral on the peer wire.
func (*ReplicaClient) Init ¶
func (c *ReplicaClient) Init(ctx context.Context) error
Init is a no-op for the peer client. The ZAP node is started by the process owner; the peer client just rides it.
func (*ReplicaClient) LTXFiles ¶
func (c *ReplicaClient) LTXFiles(ctx context.Context, level int, seek ltx.TXID, useMetadata bool) (ltx.FileIterator, error)
LTXFiles returns an empty iterator. The peer is the live replication channel, not the source of truth for backfill — that's S3. Callers listing LTX files for restore go to the S3 replica.
func (*ReplicaClient) MarketID ¶
func (c *ReplicaClient) MarketID() MarketID
MarketID reports the 8-byte market handle this client streams.
func (*ReplicaClient) OpenLTXFile ¶
func (c *ReplicaClient) OpenLTXFile(ctx context.Context, level int, minTXID, maxTXID ltx.TXID, offset, size int64) (io.ReadCloser, error)
OpenLTXFile returns io.ErrUnexpectedEOF. Same rationale as LTXFiles — peer is not a content store. Snapshot-style cold-start of a new standby uses MsgPeerSnapshotReq via the Server, not OpenLTXFile.
func (*ReplicaClient) PeerID ¶
func (c *ReplicaClient) PeerID() string
PeerID reports the standby peer this client targets.
func (*ReplicaClient) SetLogger ¶
func (c *ReplicaClient) SetLogger(logger *slog.Logger)
SetLogger sets the logger used for diagnostics.
func (*ReplicaClient) Type ¶
func (c *ReplicaClient) Type() string
Type returns "peer", the discriminator that the replicate.DB Replicas slice and the journal use to distinguish this client from S3.
func (*ReplicaClient) WriteLTXFile ¶
func (c *ReplicaClient) WriteLTXFile(ctx context.Context, level int, minTXID, maxTXID ltx.TXID, r io.Reader) (*ltx.FileInfo, error)
WriteLTXFile pushes one LTX frame to the peer and blocks until the peer ACKs (fsync confirmed). minTXID and maxTXID identify the frame; for L0 frames they are always equal. Returns the ltx.FileInfo the peer reports so the caller can record metadata; the Size field is the byte count actually transferred.
This is the hot path. Errors here fail the trader's PlaceOrder; do not return nil unless the peer has confirmed durability.
Memory: the LTX body buffer is pooled via ltxFramePool. Under sustained NYSE-class load (5K+ orders/sec/market), this drops per-Push allocations from O(frame_size_bytes) to O(1) on the steady-state path.
type Server ¶
type Server struct {
	// contains filtered or unexported fields
}
Server is the standby side. It registers handlers for OpWALFrame and OpSnapshotReq against a Dispatcher; on receive it forwards into the caller's FrameSink. Construct one Server per pod and Register once.
func NewServer ¶
func NewServer(sink FrameSink, opts ...ServerOption) *Server
NewServer constructs the standby-side receiver. sink is the local store the server fsyncs frames into.
func (*Server) Close ¶
Close marks the server unwilling to apply new frames. Subsequent inbound frames receive ErrCodeNotStandby.
func (*Server) Register ¶
func (s *Server) Register(d Dispatcher) error
Register installs OpWALFrame and OpSnapshotReq handlers on the dispatcher. Returns an error on collision — last-write-wins is rejected by the dispatcher contract.
type ServerOption ¶
type ServerOption func(*Server)
ServerOption configures a Server.
func WithServerLogger ¶
func WithServerLogger(l *slog.Logger) ServerOption
WithServerLogger overrides the default no-op logger.