Documentation
¶
Index ¶
- Constants
- func ReadMessage(r io.Reader) (byte, []byte, error)
- func WriteAck(w io.Writer, ack *ReplicateAck) error
- func WriteEntry(w io.Writer, entry *ReplicateEntry) error
- func WriteError(w io.Writer, err *ReplicateError) error
- func WriteMessage(w io.Writer, msgType byte, msg interface{}) error
- func WriteSync(w io.Writer, sync *ReplicateSync) error
- func WriteSyncAck(w io.Writer, ack *ReplicateSyncAck) error
- type IngestHandler
- type IngestHandlerFunc
- type ReaderConnection
- type Receiver
- type ReceiverConfig
- type ReplicateAck
- type ReplicateEntry
- type ReplicateError
- type ReplicateSync
- type ReplicateSyncAck
- type Sender
- func (s *Sender) AcceptReader(conn net.Conn, readerID string, lastKnownSeq uint64) error
- func (s *Sender) CurrentSequence() uint64
- func (s *Sender) ReaderCount() int
- func (s *Sender) RemoveReader(readerID string)
- func (s *Sender) Replicate(entry *ReplicateEntry)
- func (s *Sender) Start(ctx context.Context) error
- func (s *Sender) Stats() map[string]interface{}
- func (s *Sender) Stop() error
- type SenderConfig
- type SimpleIngestHandler
- type WALWriter
Constants ¶
const (
	// MsgReplicateEntry is a WAL entry from writer to reader
	MsgReplicateEntry byte = 0x10
	// MsgReplicateAck is an acknowledgment from reader to writer
	MsgReplicateAck byte = 0x11
	// MsgReplicateSync requests current replication position
	MsgReplicateSync byte = 0x12
	// MsgReplicateSyncAck responds with current replication position
	MsgReplicateSyncAck byte = 0x13
	// MsgReplicateError indicates a replication error
	MsgReplicateError byte = 0x1F
)
Message types for the WAL replication protocol.
const (
	ErrCodeSequenceGap = "SEQUENCE_GAP" // Reader is too far behind to resume
	ErrCodeWALRotated  = "WAL_ROTATED"  // WAL was rotated, need full resync
	ErrCodeInvalidMsg  = "INVALID_MSG"  // Invalid message format
	ErrCodeNotWriter   = "NOT_WRITER"   // Connected to non-writer node
	ErrCodeBufferFull  = "BUFFER_FULL"  // Replication buffer is full
	ErrCodeWriteFailed = "WRITE_FAILED" // Failed to write entry
	ErrCodeApplyFailed = "APPLY_FAILED" // Failed to apply entry
)
Error codes for replication failures
const (
	// HeaderSize is the size of the message header (length + type)
	HeaderSize = 5
	// MaxMessageSize is the maximum allowed message size (100MB)
	MaxMessageSize = 100 * 1024 * 1024
)
Variables ¶
This section is empty.
Functions ¶
func ReadMessage ¶
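ReadMessage reads a typed message from the reader. It returns the message type and the message payload as raw bytes; the payload is not decoded here and should be passed to the Parse function that matches the message type (ParseEntry, ParseAck, ParseSync, ParseSyncAck, or ParseError).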
func WriteAck ¶
func WriteAck(w io.Writer, ack *ReplicateAck) error
WriteAck writes a ReplicateAck to the writer.
func WriteEntry ¶
func WriteEntry(w io.Writer, entry *ReplicateEntry) error
WriteEntry writes a ReplicateEntry to the writer.
func WriteError ¶
func WriteError(w io.Writer, err *ReplicateError) error
WriteError writes a ReplicateError to the writer.
func WriteMessage ¶
WriteMessage writes a typed message to the writer. Format: [4-byte length][1-byte type][payload]
func WriteSync ¶
func WriteSync(w io.Writer, sync *ReplicateSync) error
WriteSync writes a ReplicateSync to the writer.
func WriteSyncAck ¶
func WriteSyncAck(w io.Writer, ack *ReplicateSyncAck) error
WriteSyncAck writes a ReplicateSyncAck to the writer.
Types ¶
type IngestHandler ¶
type IngestHandler interface {
// ApplyReplicatedEntry applies a replicated entry to the local buffer.
// The payload contains msgpack-encoded records that need to be ingested.
ApplyReplicatedEntry(ctx context.Context, payload []byte) error
}
IngestHandler is the interface for applying replicated entries to the ingest buffer.
type IngestHandlerFunc ¶
IngestHandlerFunc is an adapter to allow the use of ordinary functions as IngestHandler.
func (IngestHandlerFunc) ApplyReplicatedEntry ¶
func (f IngestHandlerFunc) ApplyReplicatedEntry(ctx context.Context, payload []byte) error
type ReaderConnection ¶
type ReaderConnection struct {
// contains filtered or unexported fields
}
ReaderConnection represents a connected reader receiving replication data.
type Receiver ¶
type Receiver struct {
// contains filtered or unexported fields
}
Receiver receives WAL entries from the writer node and applies them locally. It runs on reader nodes to maintain data consistency with the writer.
func NewReceiver ¶
func NewReceiver(cfg *ReceiverConfig) *Receiver
NewReceiver creates a new replication receiver.
func (*Receiver) IsConnected ¶
IsConnected returns whether the receiver is connected to the writer.
func (*Receiver) LastSequence ¶
LastSequence returns the last received sequence number.
type ReceiverConfig ¶
type ReceiverConfig struct {
// ReaderID is this reader's unique identifier
ReaderID string
// WriterAddr is the writer's coordinator address to connect to
WriterAddr string
// LocalWAL is the local WAL to write received entries to (optional, for durability)
LocalWAL WALWriter
// IngestHandler applies entries to local buffer (optional)
IngestHandler IngestHandler
// ReconnectInterval is how long to wait before reconnecting after disconnect
ReconnectInterval time.Duration
// AckInterval is how often to send acknowledgments to the writer
AckInterval time.Duration
// Logger for receiver events
Logger zerolog.Logger
// TLSConfig for encrypted inter-node communication (nil = plain TCP)
TLSConfig *tls.Config
}
ReceiverConfig holds configuration for the replication receiver.
type ReplicateAck ¶
type ReplicateAck struct {
// LastSequence is the last successfully received and applied sequence
LastSequence uint64 `json:"last_seq"`
// ReaderID identifies which reader is sending the ack
ReaderID string `json:"reader_id"`
}
ReplicateAck acknowledges receipt of entries up to a sequence number. Sent periodically by readers to inform writers of progress.
func ParseAck ¶
func ParseAck(payload []byte) (*ReplicateAck, error)
ParseAck parses a ReplicateAck from JSON payload.
type ReplicateEntry ¶
type ReplicateEntry struct {
// Sequence is a monotonically increasing number for ordering and deduplication
Sequence uint64 `json:"seq"`
// TimestampUS is the original entry timestamp in microseconds since epoch
TimestampUS uint64 `json:"ts"`
// Payload is the raw msgpack payload (zero-copy from WAL)
Payload []byte `json:"payload"`
}
ReplicateEntry is a single WAL entry sent from writer to reader. This is the primary message type for streaming WAL data.
func ParseEntry ¶
func ParseEntry(payload []byte) (*ReplicateEntry, error)
ParseEntry parses a ReplicateEntry from JSON payload.
type ReplicateError ¶
type ReplicateError struct {
// Code is a machine-readable error code
Code string `json:"code"`
// Message is a human-readable error description
Message string `json:"message"`
}
ReplicateError indicates a replication error.
func ParseError ¶
func ParseError(payload []byte) (*ReplicateError, error)
ParseError parses a ReplicateError from JSON payload.
type ReplicateSync ¶
type ReplicateSync struct {
// ReaderID identifies the reader requesting sync
ReaderID string `json:"reader_id"`
// LastKnownSequence is the last sequence the reader has (0 if new)
LastKnownSequence uint64 `json:"last_known_seq"`
}
ReplicateSync requests the current replication position. Sent by readers when connecting or reconnecting to sync state.
func ParseSync ¶
func ParseSync(payload []byte) (*ReplicateSync, error)
ParseSync parses a ReplicateSync from JSON payload.
type ReplicateSyncAck ¶
type ReplicateSyncAck struct {
// CurrentSequence is the writer's current sequence number
CurrentSequence uint64 `json:"current_seq"`
// CanResume indicates if the reader can resume from LastKnownSequence
// If false, reader needs to bootstrap from scratch (WAL was rotated)
CanResume bool `json:"can_resume"`
// Error contains any error message (empty if success)
Error string `json:"error,omitempty"`
}
ReplicateSyncAck responds with the writer's current position. Allows readers to understand their lag and prepare for streaming.
func ParseSyncAck ¶
func ParseSyncAck(payload []byte) (*ReplicateSyncAck, error)
ParseSyncAck parses a ReplicateSyncAck from JSON payload.
type Sender ¶
type Sender struct {
// contains filtered or unexported fields
}
Sender streams WAL entries to connected reader nodes. It runs on the writer node and pushes entries as they arrive.
func NewSender ¶
func NewSender(cfg *SenderConfig) *Sender
NewSender creates a new replication sender.
func (*Sender) AcceptReader ¶
AcceptReader registers a new reader connection for replication. Called when a reader connects and sends a sync request.
func (*Sender) CurrentSequence ¶
CurrentSequence returns the current sequence number.
func (*Sender) ReaderCount ¶
ReaderCount returns the number of connected readers.
func (*Sender) RemoveReader ¶
RemoveReader disconnects a reader from replication.
func (*Sender) Replicate ¶
func (s *Sender) Replicate(entry *ReplicateEntry)
Replicate queues a WAL entry for replication to all readers. This is called by the WAL writer via the replication hook. It is non-blocking: if the buffer is full, the entry is dropped rather than waiting for space.
type SenderConfig ¶
type SenderConfig struct {
// BufferSize is the capacity of the entry buffer (default: 10000)
BufferSize int
// WriteTimeout is the timeout for writing to a reader connection
WriteTimeout time.Duration
// Logger for sender events
Logger zerolog.Logger
}
SenderConfig holds configuration for the replication sender.
type SimpleIngestHandler ¶
type SimpleIngestHandler struct {
HandleFunc func(ctx context.Context, records []map[string]interface{}) error
Logger zerolog.Logger
}
SimpleIngestHandler is a basic implementation of IngestHandler that decodes msgpack records and calls a provided function.
func (*SimpleIngestHandler) ApplyReplicatedEntry ¶
func (h *SimpleIngestHandler) ApplyReplicatedEntry(ctx context.Context, payload []byte) error
ApplyReplicatedEntry decodes the msgpack payload and calls the handler function.