replication

package
v0.0.0-...-706b979 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 4, 2026 License: AGPL-3.0 Imports: 17 Imported by: 0

Documentation

Index

Constants

View Source
const (
	// MsgReplicateEntry is a WAL entry from writer to reader
	MsgReplicateEntry byte = 0x10

	// MsgReplicateAck is an acknowledgment from reader to writer
	MsgReplicateAck byte = 0x11

	// MsgReplicateSync requests current replication position
	MsgReplicateSync byte = 0x12

	// MsgReplicateSyncAck responds with current replication position
	MsgReplicateSyncAck byte = 0x13

	// MsgReplicateError indicates a replication error
	MsgReplicateError byte = 0x1F
)

Message types for WAL replication protocol

View Source
const (
	ErrCodeSequenceGap = "SEQUENCE_GAP" // Reader is too far behind to resume
	ErrCodeWALRotated  = "WAL_ROTATED"  // WAL was rotated, need full resync
	ErrCodeInvalidMsg  = "INVALID_MSG"  // Invalid message format
	ErrCodeNotWriter   = "NOT_WRITER"   // Connected to non-writer node
	ErrCodeBufferFull  = "BUFFER_FULL"  // Replication buffer is full
	ErrCodeWriteFailed = "WRITE_FAILED" // Failed to write entry
	ErrCodeApplyFailed = "APPLY_FAILED" // Failed to apply entry
)

Error codes for replication failures

View Source
const (
	// HeaderSize is the size of the message header (length + type)
	HeaderSize = 5

	// MaxMessageSize is the maximum allowed message size (100MB)
	MaxMessageSize = 100 * 1024 * 1024
)

Variables

This section is empty.

Functions

func ReadMessage

func ReadMessage(r io.Reader) (byte, []byte, error)

ReadMessage reads a typed message from the reader. Returns the message type and unmarshaled message.

func WriteAck

func WriteAck(w io.Writer, ack *ReplicateAck) error

WriteAck writes a ReplicateAck to the writer.

func WriteEntry

func WriteEntry(w io.Writer, entry *ReplicateEntry) error

WriteEntry writes a ReplicateEntry to the writer.

func WriteError

func WriteError(w io.Writer, err *ReplicateError) error

WriteError writes a ReplicateError to the writer.

func WriteMessage

func WriteMessage(w io.Writer, msgType byte, msg interface{}) error

WriteMessage writes a typed message to the writer. Format: [4-byte length][1-byte type][payload]

func WriteSync

func WriteSync(w io.Writer, sync *ReplicateSync) error

WriteSync writes a ReplicateSync to the writer.

func WriteSyncAck

func WriteSyncAck(w io.Writer, ack *ReplicateSyncAck) error

WriteSyncAck writes a ReplicateSyncAck to the writer.

Types

type IngestHandler

type IngestHandler interface {
	// ApplyReplicatedEntry applies a replicated entry to the local buffer
	// The entry contains msgpack-encoded records that need to be ingested
	ApplyReplicatedEntry(ctx context.Context, payload []byte) error
}

IngestHandler interface for applying entries to the ingest buffer

type IngestHandlerFunc

type IngestHandlerFunc func(ctx context.Context, payload []byte) error

IngestHandlerFunc is an adapter to allow the use of ordinary functions as IngestHandler.

func (IngestHandlerFunc) ApplyReplicatedEntry

func (f IngestHandlerFunc) ApplyReplicatedEntry(ctx context.Context, payload []byte) error

type ReaderConnection

type ReaderConnection struct {
	// contains filtered or unexported fields
}

ReaderConnection represents a connected reader receiving replication data.

type Receiver

type Receiver struct {
	// contains filtered or unexported fields
}

Receiver receives WAL entries from the writer node and applies them locally. It runs on reader nodes to maintain data consistency with the writer.

func NewReceiver

func NewReceiver(cfg *ReceiverConfig) *Receiver

NewReceiver creates a new replication receiver.

func (*Receiver) IsConnected

func (r *Receiver) IsConnected() bool

IsConnected returns whether the receiver is connected to the writer.

func (*Receiver) LastSequence

func (r *Receiver) LastSequence() uint64

LastSequence returns the last received sequence number.

func (*Receiver) Start

func (r *Receiver) Start(ctx context.Context) error

Start begins the receiver's connection and processing loop.

func (*Receiver) Stats

func (r *Receiver) Stats() map[string]interface{}

Stats returns receiver statistics.

func (*Receiver) Stop

func (r *Receiver) Stop() error

Stop gracefully shuts down the receiver.

type ReceiverConfig

type ReceiverConfig struct {
	// ReaderID is this reader's unique identifier
	ReaderID string

	// WriterAddr is the writer's coordinator address to connect to
	WriterAddr string

	// LocalWAL is the local WAL to write received entries (optional, for durability)
	LocalWAL WALWriter

	// IngestHandler applies entries to local buffer (optional)
	IngestHandler IngestHandler

	// ReconnectInterval is how long to wait before reconnecting after disconnect
	ReconnectInterval time.Duration

	// AckInterval is how often to send acknowledgments to the writer
	AckInterval time.Duration

	// Logger for receiver events
	Logger zerolog.Logger

	// TLSConfig for encrypted inter-node communication (nil = plain TCP)
	TLSConfig *tls.Config
}

ReceiverConfig holds configuration for the replication receiver.

type ReplicateAck

type ReplicateAck struct {
	// LastSequence is the last successfully received and applied sequence
	LastSequence uint64 `json:"last_seq"`

	// ReaderID identifies which reader is sending the ack
	ReaderID string `json:"reader_id"`
}

ReplicateAck acknowledges receipt of entries up to a sequence number. Sent periodically by readers to inform writers of progress.

func ParseAck

func ParseAck(payload []byte) (*ReplicateAck, error)

ParseAck parses a ReplicateAck from JSON payload.

type ReplicateEntry

type ReplicateEntry struct {
	// Sequence is a monotonically increasing number for ordering and deduplication
	Sequence uint64 `json:"seq"`

	// TimestampUS is the original entry timestamp in microseconds since epoch
	TimestampUS uint64 `json:"ts"`

	// Payload is the raw msgpack payload (zero-copy from WAL)
	Payload []byte `json:"payload"`
}

ReplicateEntry is a single WAL entry sent from writer to reader. This is the primary message type for streaming WAL data.

func ParseEntry

func ParseEntry(payload []byte) (*ReplicateEntry, error)

ParseEntry parses a ReplicateEntry from JSON payload.

type ReplicateError

type ReplicateError struct {
	// Code is a machine-readable error code
	Code string `json:"code"`

	// Message is a human-readable error description
	Message string `json:"message"`
}

ReplicateError indicates a replication error.

func ParseError

func ParseError(payload []byte) (*ReplicateError, error)

ParseError parses a ReplicateError from JSON payload.

type ReplicateSync

type ReplicateSync struct {
	// ReaderID identifies the reader requesting sync
	ReaderID string `json:"reader_id"`

	// LastKnownSequence is the last sequence the reader has (0 if new)
	LastKnownSequence uint64 `json:"last_known_seq"`
}

ReplicateSync requests the current replication position. Sent by readers when connecting or reconnecting to sync state.

func ParseSync

func ParseSync(payload []byte) (*ReplicateSync, error)

ParseSync parses a ReplicateSync from JSON payload.

type ReplicateSyncAck

type ReplicateSyncAck struct {
	// CurrentSequence is the writer's current sequence number
	CurrentSequence uint64 `json:"current_seq"`

	// CanResume indicates if the reader can resume from LastKnownSequence
	// If false, reader needs to bootstrap from scratch (WAL was rotated)
	CanResume bool `json:"can_resume"`

	// Error contains any error message (empty if success)
	Error string `json:"error,omitempty"`
}

ReplicateSyncAck responds with the writer's current position. Allows readers to understand their lag and prepare for streaming.

func ParseSyncAck

func ParseSyncAck(payload []byte) (*ReplicateSyncAck, error)

ParseSyncAck parses a ReplicateSyncAck from JSON payload.

type Sender

type Sender struct {
	// contains filtered or unexported fields
}

Sender streams WAL entries to connected reader nodes. It runs on the writer node and pushes entries as they arrive.

func NewSender

func NewSender(cfg *SenderConfig) *Sender

NewSender creates a new replication sender.

func (*Sender) AcceptReader

func (s *Sender) AcceptReader(conn net.Conn, readerID string, lastKnownSeq uint64) error

AcceptReader registers a new reader connection for replication. Called when a reader connects and sends a sync request.

func (*Sender) CurrentSequence

func (s *Sender) CurrentSequence() uint64

CurrentSequence returns the current sequence number.

func (*Sender) ReaderCount

func (s *Sender) ReaderCount() int

ReaderCount returns the number of connected readers.

func (*Sender) RemoveReader

func (s *Sender) RemoveReader(readerID string)

RemoveReader disconnects a reader from replication.

func (*Sender) Replicate

func (s *Sender) Replicate(entry *ReplicateEntry)

Replicate queues a WAL entry for replication to all readers. This is called by the WAL writer via the replication hook. It is non-blocking - entries are dropped if the buffer is full.

func (*Sender) Start

func (s *Sender) Start(ctx context.Context) error

Start begins the sender's background processing.

func (*Sender) Stats

func (s *Sender) Stats() map[string]interface{}

Stats returns sender statistics.

func (*Sender) Stop

func (s *Sender) Stop() error

Stop gracefully shuts down the sender.

type SenderConfig

type SenderConfig struct {
	// BufferSize is the capacity of the entry buffer (default: 10000)
	BufferSize int

	// WriteTimeout is the timeout for writing to a reader connection
	WriteTimeout time.Duration

	// Logger for sender events
	Logger zerolog.Logger
}

SenderConfig holds configuration for the replication sender.

type SimpleIngestHandler

type SimpleIngestHandler struct {
	HandleFunc func(ctx context.Context, records []map[string]interface{}) error
	Logger     zerolog.Logger
}

SimpleIngestHandler is a basic implementation of IngestHandler that decodes msgpack records and calls a provided function.

func (*SimpleIngestHandler) ApplyReplicatedEntry

func (h *SimpleIngestHandler) ApplyReplicatedEntry(ctx context.Context, payload []byte) error

ApplyReplicatedEntry decodes the msgpack payload and calls the handler function.

type WALWriter

type WALWriter interface {
	AppendRaw(payload []byte) error
}

WALWriter interface for writing to local WAL

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL