Documentation
¶
Index ¶
- Constants
- Variables
- func SetLogger(l *slog.Logger)
- type AggregatedBook
- func (ab *AggregatedBook) ApplySnapshot(snapshot *Snapshot) error
- func (ab *AggregatedBook) Depth(side Side, price udecimal.Decimal) (udecimal.Decimal, error)
- func (ab *AggregatedBook) Rebuild() error
- func (ab *AggregatedBook) Replay(_ *OrderBookLog) error
- func (ab *AggregatedBook) SequenceID() uint64
- type DepthChange
- type DiscardPublishLog
- type EventHandler
- type InputEvent
- type MarketSegment
- type MatchingEngine
- func (engine *MatchingEngine) AmendOrder(_ context.Context, marketID string, cmd *protocol.AmendOrderCommand) error
- func (engine *MatchingEngine) CancelOrder(_ context.Context, marketID string, cmd *protocol.CancelOrderCommand) error
- func (engine *MatchingEngine) CreateMarket(commandID string, userID uint64, marketID string, minLotSize string, ...) error
- func (engine *MatchingEngine) Depth(marketID string, limit uint32) (*protocol.GetDepthResponse, error)
- func (engine *MatchingEngine) EnqueueCommand(cmd *protocol.Command) error
- func (engine *MatchingEngine) EnqueueCommandBatch(cmds []*protocol.Command) error
- func (engine *MatchingEngine) GetStats(marketID string) (*protocol.GetStatsResponse, error)
- func (engine *MatchingEngine) OnEvent(ev *InputEvent)
- func (engine *MatchingEngine) PlaceOrder(_ context.Context, marketID string, cmd *protocol.PlaceOrderCommand) error
- func (engine *MatchingEngine) PlaceOrderBatch(_ context.Context, marketID string, cmds []*protocol.PlaceOrderCommand) error
- func (engine *MatchingEngine) RestoreFromSnapshot(inputDir string) (*SnapshotMetadata, error)
- func (engine *MatchingEngine) ResumeMarket(commandID string, userID uint64, marketID string, timestamp int64) error
- func (engine *MatchingEngine) Run() error
- func (engine *MatchingEngine) SendUserEvent(commandID string, userID uint64, eventType string, key string, data []byte, ...) error
- func (engine *MatchingEngine) Shutdown(ctx context.Context) error
- func (engine *MatchingEngine) SuspendMarket(commandID string, userID uint64, marketID string, timestamp int64) error
- func (engine *MatchingEngine) TakeSnapshot(outputDir string) (*SnapshotMetadata, error)
- func (engine *MatchingEngine) UpdateConfig(commandID string, userID uint64, marketID string, minLotSize string, ...) error
- type MemoryPublishLog
- type Order
- type OrderBook
- type OrderBookLog
- func NewAdminLog(seqID uint64, commandID, engineID, marketID string, userID uint64, ...) *OrderBookLog
- func NewAmendLog(seqID uint64, commandID, engineID, marketID string, orderID string, ...) *OrderBookLog
- func NewCancelLog(seqID uint64, commandID, engineID, marketID string, orderID string, ...) *OrderBookLog
- func NewMatchLog(seqID uint64, commandID, engineID string, tradeID uint64, marketID string, ...) *OrderBookLog
- func NewOpenLog(seqID uint64, commandID, engineID, marketID string, orderID string, ...) *OrderBookLog
- func NewRejectLog(seqID uint64, commandID, engineID, marketID string, orderID string, ...) *OrderBookLog
- func NewUserEventLog(seqID uint64, commandID, engineID string, userID uint64, eventType string, ...) *OrderBookLog
- type OrderBookOption
- type OrderBookSnapshot
- type OrderType
- type PublishLog
- type RebuildFunc
- type RingBuffer
- func (rb *RingBuffer[T]) Claim() (int64, *T)
- func (rb *RingBuffer[T]) ClaimN(n int64) (start int64, end int64)
- func (rb *RingBuffer[T]) Commit(seq int64)
- func (rb *RingBuffer[T]) CommitN(startSeq int64, endSeq int64)
- func (rb *RingBuffer[T]) ConsumerSequence() int64
- func (rb *RingBuffer[T]) GetPendingEvents() int64
- func (rb *RingBuffer[T]) ProducerSequence() int64
- func (rb *RingBuffer[T]) Publish(event T)
- func (rb *RingBuffer[T]) Run()
- func (rb *RingBuffer[T]) Shutdown(ctx context.Context) error
- type Side
- type Snapshot
- type SnapshotFileFooter
- type SnapshotMetadata
- type UpdateEvent
Constants ¶
const (
	// EngineVersion is the current version of the matching engine.
	EngineVersion = "v1.0.0"
	// SnapshotSchemaVersion is the current version of the snapshot schema.
	// Increment this when the snapshot format changes in a backward-incompatible way.
	SnapshotSchemaVersion = 1
)
Variables ¶
var (
	// ErrInsufficientLiquidity is returned when there is not enough depth to fill a Market/IOC/FOK order.
	ErrInsufficientLiquidity = errors.New("there is not enough depth to fill the order")
	// ErrInvalidParam is returned when a parameter is invalid.
	ErrInvalidParam = errors.New("the param is invalid")
	// ErrInternal is returned when an internal server error occurs.
	ErrInternal = errors.New("internal server error")
	// ErrTimeout is returned when an operation times out.
	ErrTimeout = errors.New("timeout")
	// ErrShutdown is returned when the engine is shutting down.
	ErrShutdown = errors.New("order book is shutting down")
	// ErrNotFound is returned when a resource is not found.
	ErrNotFound = errors.New("not found")
	// ErrInvalidPrice is returned when a price is invalid.
	ErrInvalidPrice = errors.New("invalid price")
	// ErrInvalidSize is returned when a size is invalid.
	ErrInvalidSize = errors.New("invalid size")
	// ErrOrderBookClosed is returned when the order book is closed.
	ErrOrderBookClosed = errors.New("order book is closed")
)
var DefaultLotSize = udecimal.MustFromInt64(1, defaultLotSizePrecision) // 0.00000001
DefaultLotSize is the fallback minimum trade unit (1e-8). This prevents infinite loops when quoteSize/price produces very small values.
var ErrDisruptorTimeout = errors.New("disruptor: shutdown timeout")
ErrDisruptorTimeout is returned when shutdown times out.
Functions ¶
Types ¶
type AggregatedBook ¶ added in v0.7.0
type AggregatedBook struct {
// OnRebuild is called when a rebuild is needed (e.g., sequence gap detected).
// The callback should return a snapshot from which the book will be rebuilt.
// This must be set before calling Rebuild() or Replay() with gap detection.
OnRebuild RebuildFunc
// contains filtered or unexported fields
}
AggregatedBook maintains a simplified view of the order book, tracking only price levels and their aggregated sizes (depth). It is designed for downstream services that need to rebuild order book state from OrderBookLog events received via a message queue.
func NewAggregatedBook ¶ added in v0.7.0
func NewAggregatedBook() *AggregatedBook
NewAggregatedBook creates a new AggregatedBook instance with empty ask and bid sides.
func (*AggregatedBook) ApplySnapshot ¶ added in v0.7.0
func (ab *AggregatedBook) ApplySnapshot(snapshot *Snapshot) error
ApplySnapshot resets the aggregated book state from a snapshot. This clears all existing data and applies the snapshot's depth levels.
func (*AggregatedBook) Depth ¶ added in v0.7.0
Depth returns the aggregated size at a specific price level for the given side. Returns zero if the price level does not exist.
func (*AggregatedBook) Rebuild ¶ added in v0.7.0
func (ab *AggregatedBook) Rebuild() error
Rebuild triggers a manual rebuild by calling the OnRebuild callback. Returns an error if OnRebuild is not set or if the callback fails.
func (*AggregatedBook) Replay ¶ added in v0.7.0
func (ab *AggregatedBook) Replay(_ *OrderBookLog) error
Replay applies an OrderBookLog event to update the aggregated book state. Events with Type == LogTypeReject do not affect book state but still update the sequence ID. Returns an error if a sequence gap is detected and the rebuild fails.
func (*AggregatedBook) SequenceID ¶ added in v0.7.0
func (ab *AggregatedBook) SequenceID() uint64
SequenceID returns the last processed sequence ID. Used for synchronization and gap detection during rebuild.
type DepthChange ¶ added in v0.7.0
DepthChange represents a change in the order book depth.
func CalculateDepthChange ¶ added in v0.7.0
func CalculateDepthChange(log *OrderBookLog) DepthChange
CalculateDepthChange calculates the depth change based on the book log. It returns a DepthChange struct indicating which side and price level should be updated. Note: For LogTypeMatch, the side returned is the Maker's side (opposite of the log's side).
type DiscardPublishLog ¶ added in v0.7.0
type DiscardPublishLog struct{}
DiscardPublishLog discards all logs, useful for benchmarking.
func NewDiscardPublishLog ¶ added in v0.7.0
func NewDiscardPublishLog() *DiscardPublishLog
NewDiscardPublishLog creates a new DiscardPublishLog.
func (*DiscardPublishLog) Publish ¶ added in v0.7.0
func (p *DiscardPublishLog) Publish(_ []*OrderBookLog)
Publish does nothing.
type EventHandler ¶ added in v0.8.0
type EventHandler[T any] interface { OnEvent(event *T) }
EventHandler is the interface for processing events from the RingBuffer.
type InputEvent ¶ added in v0.8.0
type InputEvent struct {
// Cmd is the external command carrier.
Cmd *protocol.Command
// Internal Query fields (Read Path)
Query any // e.g. *protocol.GetDepthRequest
Resp chan any
}
InputEvent is the internal wrapper for all events entering the OrderBook Actor.
type MarketSegment ¶ added in v0.8.0
type MarketSegment struct {
MarketID string `json:"market_id"`
Offset int64 `json:"offset"` // Start offset in snapshot.bin (relative to file start)
Length int64 `json:"length"` // Length in bytes
Checksum uint32 `json:"checksum"` // CRC32 Checksum of this segment
}
MarketSegment contains metadata for a specific market's data within the snapshot binary file.
type MatchingEngine ¶
type MatchingEngine struct {
// contains filtered or unexported fields
}
MatchingEngine manages multiple order books for different markets. It uses a single shared RingBuffer (Disruptor) for all commands, allowing the entire event loop to run on a single goroutine. This enables runtime.LockOSThread() for CPU affinity scenarios.
func NewMatchingEngine ¶
func NewMatchingEngine(engineID string, publishTrader PublishLog) *MatchingEngine
NewMatchingEngine creates a new matching engine instance.
func (*MatchingEngine) AmendOrder ¶ added in v0.7.0
func (engine *MatchingEngine) AmendOrder(_ context.Context, marketID string, cmd *protocol.AmendOrderCommand) error
AmendOrder modifies an existing order in the appropriate order book. Returns ErrShutdown if the engine is shutting down or ErrNotFound if market doesn't exist.
func (*MatchingEngine) CancelOrder ¶
func (engine *MatchingEngine) CancelOrder( _ context.Context, marketID string, cmd *protocol.CancelOrderCommand, ) error
CancelOrder cancels an order in the appropriate order book. Returns ErrShutdown if the engine is shutting down or ErrNotFound if market doesn't exist.
func (*MatchingEngine) CreateMarket ¶ added in v0.8.0
func (engine *MatchingEngine) CreateMarket( commandID string, userID uint64, marketID string, minLotSize string, timestamp int64, ) error
CreateMarket sends a command to create a new market.
func (*MatchingEngine) Depth ¶ added in v0.8.0
func (engine *MatchingEngine) Depth(marketID string, limit uint32) (*protocol.GetDepthResponse, error)
Depth returns the current depth of the order book for the specified market.
func (*MatchingEngine) EnqueueCommand ¶ added in v0.8.0
func (engine *MatchingEngine) EnqueueCommand(cmd *protocol.Command) error
EnqueueCommand routes the command to the Engine's shared RingBuffer. CreateMarket is handled synchronously so the OrderBook is immediately available.
func (*MatchingEngine) EnqueueCommandBatch ¶ added in v0.8.0
func (engine *MatchingEngine) EnqueueCommandBatch(cmds []*protocol.Command) error
EnqueueCommandBatch routes a batch of commands to the Engine's shared RingBuffer. It claims n contiguous slots in the RingBuffer to amortize synchronization overhead.
func (*MatchingEngine) GetStats ¶ added in v0.8.0
func (engine *MatchingEngine) GetStats(marketID string) (*protocol.GetStatsResponse, error)
GetStats returns usage statistics for the specified market.
func (*MatchingEngine) OnEvent ¶ added in v0.8.0
func (engine *MatchingEngine) OnEvent(ev *InputEvent)
OnEvent implements EventHandler[InputEvent] for the Engine's shared RingBuffer. It routes events to the appropriate OrderBook based on MarketID.
func (*MatchingEngine) PlaceOrder ¶
func (engine *MatchingEngine) PlaceOrder(_ context.Context, marketID string, cmd *protocol.PlaceOrderCommand) error
PlaceOrder adds an order to the appropriate order book based on the market ID. Returns ErrShutdown if the engine is shutting down or ErrNotFound if market doesn't exist.
func (*MatchingEngine) PlaceOrderBatch ¶ added in v0.8.0
func (engine *MatchingEngine) PlaceOrderBatch( _ context.Context, marketID string, cmds []*protocol.PlaceOrderCommand, ) error
PlaceOrderBatch adds multiple orders to the appropriate order book(s). This method performs serialization before acquiring RingBuffer slots, ensuring that serialization errors do not block or waste RingBuffer sequences.
func (*MatchingEngine) RestoreFromSnapshot ¶ added in v0.8.0
func (engine *MatchingEngine) RestoreFromSnapshot(inputDir string) (*SnapshotMetadata, error)
RestoreFromSnapshot restores the entire matching engine state from a snapshot in the specified directory. Returns the metadata from the snapshot for MQ replay positioning.
func (*MatchingEngine) ResumeMarket ¶ added in v0.8.0
func (engine *MatchingEngine) ResumeMarket(commandID string, userID uint64, marketID string, timestamp int64) error
ResumeMarket sends a command to resume a market.
func (*MatchingEngine) Run ¶ added in v0.8.0
func (engine *MatchingEngine) Run() error
Run starts the engine's event loop. This is a blocking call. The consumer loop runs on the calling goroutine, enabling the caller to control thread affinity via runtime.LockOSThread().
Usage:
	go func() {
		runtime.LockOSThread()
		defer runtime.UnlockOSThread()
		engine.Run()
	}()
func (*MatchingEngine) SendUserEvent ¶ added in v0.8.0
func (engine *MatchingEngine) SendUserEvent( commandID string, userID uint64, eventType string, key string, data []byte, timestamp int64, ) error
SendUserEvent sends a generic user event to the matching engine. These events are processed sequentially with trades and emitted via PublishLog.
func (*MatchingEngine) Shutdown ¶ added in v0.7.0
func (engine *MatchingEngine) Shutdown(ctx context.Context) error
Shutdown gracefully shuts down the engine. It blocks until all pending commands in the RingBuffer are processed or the context is canceled.
func (*MatchingEngine) SuspendMarket ¶ added in v0.8.0
func (engine *MatchingEngine) SuspendMarket(commandID string, userID uint64, marketID string, timestamp int64) error
SuspendMarket sends a command to suspend a market.
func (*MatchingEngine) TakeSnapshot ¶ added in v0.8.0
func (engine *MatchingEngine) TakeSnapshot(outputDir string) (*SnapshotMetadata, error)
TakeSnapshot captures a consistent snapshot of all order books and writes them to the specified directory. It generates two files: `snapshot.bin` (binary data) and `metadata.json` (metadata). Returns the metadata object or an error.
func (*MatchingEngine) UpdateConfig ¶ added in v0.8.0
func (engine *MatchingEngine) UpdateConfig( commandID string, userID uint64, marketID string, minLotSize string, timestamp int64, ) error
UpdateConfig sends a command to update market configuration.
type MemoryPublishLog ¶ added in v0.7.0
type MemoryPublishLog struct {
Trades []*OrderBookLog
// contains filtered or unexported fields
}
MemoryPublishLog stores logs in memory, useful for testing.
func NewMemoryPublishLog ¶ added in v0.7.0
func NewMemoryPublishLog() *MemoryPublishLog
NewMemoryPublishLog creates a new MemoryPublishLog.
func (*MemoryPublishLog) Count ¶ added in v0.7.0
func (m *MemoryPublishLog) Count() int
Count returns the number of logs stored.
func (*MemoryPublishLog) Get ¶ added in v0.7.0
func (m *MemoryPublishLog) Get(index int) *OrderBookLog
Get returns the log at the specified index.
func (*MemoryPublishLog) Logs ¶ added in v0.8.0
func (m *MemoryPublishLog) Logs() []*OrderBookLog
Logs returns a copy of all logs stored.
func (*MemoryPublishLog) Publish ¶ added in v0.7.0
func (m *MemoryPublishLog) Publish(trades []*OrderBookLog)
Publish appends logs to the in-memory slice.
type Order ¶
type Order struct {
ID string `json:"id"`
Side Side `json:"side"`
Price udecimal.Decimal `json:"price"`
Size udecimal.Decimal `json:"size"` // Remaining visible size
Type OrderType `json:"type"`
UserID uint64 `json:"user_id"`
Timestamp int64 `json:"timestamp"` // Unix nano, creation time
// Iceberg fields
VisibleLimit udecimal.Decimal `json:"visible_limit,omitzero"`
HiddenSize udecimal.Decimal `json:"hidden_size,omitzero"`
// contains filtered or unexported fields
}
Order represents the state of an order in the order book. This is the serializable state used for snapshots.
type OrderBook ¶
type OrderBook struct {
// contains filtered or unexported fields
}
OrderBook is a pure logic object that maintains the state of an order book. It must be managed by a MatchingEngine which provides the event loop.
func (*OrderBook) LastCmdSeqID ¶ added in v0.8.0
LastCmdSeqID returns the sequence ID of the last processed command.
func (*OrderBook) Restore ¶ added in v0.8.0
func (book *OrderBook) Restore(snap *OrderBookSnapshot)
Restore restores the order book state from a snapshot.
type OrderBookLog ¶ added in v0.8.0
type OrderBookLog struct {
SeqID uint64 `json:"seq_id"`
CommandID string `json:"command_id,omitempty"`
EngineID string `json:"engine_id,omitempty"`
TradeID uint64 `json:"trade_id,omitempty"` // Sequential trade ID, only set for Match events
Type protocol.LogType `json:"type"` // Event type: open, match, cancel, amend, reject
MarketID string `json:"market_id"`
Side Side `json:"side"`
Price udecimal.Decimal `json:"price"`
Size udecimal.Decimal `json:"size"`
Amount udecimal.Decimal `json:"amount,omitzero"` // Price * Size, only set for Match events
OldPrice udecimal.Decimal `json:"old_price,omitzero"`
OldSize udecimal.Decimal `json:"old_size,omitzero"`
OrderID string `json:"order_id"`
UserID uint64 `json:"user_id"`
OrderType OrderType `json:"order_type,omitempty"` // Order type: limit, market, ioc, fok
MakerOrderID string `json:"maker_order_id,omitempty"`
MakerUserID uint64 `json:"maker_user_id,omitempty"`
RejectReason protocol.RejectReason `json:"reject_reason,omitempty"` // Reason for rejection, only set for Reject events
EventType string `json:"event_type,omitempty"` // User defined event type
Data []byte `json:"data,omitempty"` // Arbitrary data for user events
Timestamp int64 `json:"timestamp"` // Command timestamp for determinism
}
OrderBookLog represents an event in the order book. SeqID is a globally increasing ID for every event, used for ordering, deduplication, and rebuild synchronization in downstream systems. Use Type to determine whether the event affects order book state:
- Open, Match, Cancel, Amend, Admin: affect order book state or lifecycle metadata.
- Reject: does not affect order book state.
func NewAdminLog ¶ added in v0.8.0
func NewAdminLog( seqID uint64, commandID, engineID, marketID string, userID uint64, eventType string, timestamp int64, ) *OrderBookLog
NewAdminLog creates a new OrderBookLog for a successful management event.
func NewAmendLog ¶ added in v0.8.0
func NewAmendLog( seqID uint64, commandID, engineID, marketID string, orderID string, userID uint64, side Side, price, size udecimal.Decimal, oldPrice udecimal.Decimal, oldSize udecimal.Decimal, orderType OrderType, timestamp int64, ) *OrderBookLog
NewAmendLog creates a new OrderBookLog for an order amendment event.
func NewCancelLog ¶ added in v0.8.0
func NewCancelLog( seqID uint64, commandID, engineID, marketID string, orderID string, userID uint64, side Side, price, size udecimal.Decimal, orderType OrderType, timestamp int64, ) *OrderBookLog
NewCancelLog creates a new OrderBookLog for an order cancellation event.
func NewMatchLog ¶ added in v0.8.0
func NewMatchLog( seqID uint64, commandID, engineID string, tradeID uint64, marketID string, takerID string, takerUserID uint64, takerSide Side, takerType OrderType, makerID string, makerUserID uint64, price udecimal.Decimal, size udecimal.Decimal, timestamp int64, ) *OrderBookLog
NewMatchLog creates a new OrderBookLog for a trade match event.
func NewOpenLog ¶ added in v0.8.0
func NewOpenLog( seqID uint64, commandID, engineID, marketID string, orderID string, userID uint64, side Side, price, size udecimal.Decimal, orderType OrderType, timestamp int64, ) *OrderBookLog
NewOpenLog creates a new OrderBookLog for an open order event.
func NewRejectLog ¶ added in v0.8.0
func NewRejectLog( seqID uint64, commandID, engineID, marketID string, orderID string, userID uint64, reason protocol.RejectReason, timestamp int64, ) *OrderBookLog
NewRejectLog creates a new OrderBookLog for an order rejection event.
func NewUserEventLog ¶ added in v0.8.0
func NewUserEventLog( seqID uint64, commandID, engineID string, userID uint64, eventType string, key string, data []byte, timestamp int64, ) *OrderBookLog
NewUserEventLog creates a new OrderBookLog for a generic user event.
type OrderBookOption ¶ added in v0.8.0
type OrderBookOption func(*OrderBook)
OrderBookOption configures an OrderBook.
func WithLotSize ¶ added in v0.8.0
func WithLotSize(size udecimal.Decimal) OrderBookOption
WithLotSize sets the minimum trade unit for the order book. When a Market order's calculated match size is less than this value, the order will be rejected with remaining funds returned. Default: 1e-8 (0.00000001) as a safety fallback.
type OrderBookSnapshot ¶ added in v0.8.0
type OrderBookSnapshot struct {
MarketID string `json:"market_id"`
SeqID uint64 `json:"seq_id"` // Current BookLog sequence ID
LastCmdSeqID uint64 `json:"last_cmd_seq_id"` // Last processed command sequence ID from MQ
TradeID uint64 `json:"trade_id"` // Current Trade sequence ID
Bids []*Order `json:"bids"` // Ordered list of bids (best price first)
Asks []*Order `json:"asks"` // Ordered list of asks (best price first)
State protocol.OrderBookState `json:"state"`
MinLotSize udecimal.Decimal `json:"min_lot_size"`
}
OrderBookSnapshot contains the full state of a single OrderBook.
type OrderType ¶
OrderType represents the type of order.
const (
	// Market order.
	Market OrderType = protocol.OrderTypeMarket
	// Limit order.
	Limit OrderType = protocol.OrderTypeLimit
	// FOK order (Fill or Kill).
	FOK OrderType = protocol.OrderTypeFOK
	// IOC order (Immediate or Cancel).
	IOC OrderType = protocol.OrderTypeIOC
	// PostOnly order.
	PostOnly OrderType = protocol.OrderTypePostOnly
	// Cancel order.
	Cancel OrderType = protocol.OrderTypeCancel
)
type PublishLog ¶ added in v0.7.0
type PublishLog interface {
// Publish publishes order book logs. The slice comes from logSlicePool.
Publish(logs []*OrderBookLog)
}
PublishLog is an interface for publishing order book logs (trades, opens, cancels).
IMPORTANT: Implementations must either:
- Process logs synchronously before returning, OR
- Clone the OrderBookLog data before returning
The caller recycles OrderBookLog objects to a sync.Pool after Publish returns, so any asynchronous processing must work with cloned data.
type RebuildFunc ¶ added in v0.7.0
RebuildFunc is the callback type for fetching a snapshot during rebuild. Implementations should fetch the current order book snapshot from external sources (e.g., Redis, Database, API) and return it for the AggregatedBook to apply.
type RingBuffer ¶ added in v0.8.0
type RingBuffer[T any] struct {
	// contains filtered or unexported fields
}
RingBuffer is a lock-free MPSC (Multi-Producer Single-Consumer) ring buffer. It provides high-throughput, zero-allocation event passing between goroutines.
func NewRingBuffer ¶ added in v0.8.0
func NewRingBuffer[T any](capacity int64, handler EventHandler[T]) *RingBuffer[T]
NewRingBuffer creates a new MPSC RingBuffer. The capacity must be a power of 2.
func (*RingBuffer[T]) Claim ¶ added in v0.8.0
func (rb *RingBuffer[T]) Claim() (int64, *T)
Claim atomically claims a sequence and returns a pointer to the slot. Returns (-1, nil) if the RingBuffer is shut down. The caller should write to the slot and then call Commit(seq).
func (*RingBuffer[T]) ClaimN ¶ added in v0.8.0
func (rb *RingBuffer[T]) ClaimN(n int64) (start int64, end int64)
ClaimN atomically claims n sequences and returns the start and end sequence numbers. Returns (-1, -1) if the RingBuffer is shut down. The caller should write to the slots from start to end and then call CommitN(start, end).
func (*RingBuffer[T]) Commit ¶ added in v0.8.0
func (rb *RingBuffer[T]) Commit(seq int64)
Commit marks the slot as published, making it visible to the consumer.
func (*RingBuffer[T]) CommitN ¶ added in v0.8.0
func (rb *RingBuffer[T]) CommitN(startSeq int64, endSeq int64)
CommitN marks the slots from startSeq to endSeq as published, making them visible to the consumer.
func (*RingBuffer[T]) ConsumerSequence ¶ added in v0.8.0
func (rb *RingBuffer[T]) ConsumerSequence() int64
ConsumerSequence returns the current consumer sequence (for monitoring).
func (*RingBuffer[T]) GetPendingEvents ¶ added in v0.8.0
func (rb *RingBuffer[T]) GetPendingEvents() int64
GetPendingEvents returns the number of pending events (for monitoring).
func (*RingBuffer[T]) ProducerSequence ¶ added in v0.8.0
func (rb *RingBuffer[T]) ProducerSequence() int64
ProducerSequence returns the current producer sequence (for monitoring).
func (*RingBuffer[T]) Publish ¶ added in v0.8.0
func (rb *RingBuffer[T]) Publish(event T)
Publish publishes an event to the ring buffer (multi-producer safe). This is a convenience method that wraps Claim() and Commit().
func (*RingBuffer[T]) Run ¶ added in v0.8.0
func (rb *RingBuffer[T]) Run()
Run starts the consumer loop in the calling goroutine (blocking). This allows the caller to control the goroutine and OS thread, enabling runtime.LockOSThread() for CPU affinity scenarios.
type Snapshot ¶ added in v0.7.0
type Snapshot struct {
SequenceID uint64 // The sequence ID at which this snapshot was taken
Asks []*protocol.DepthItem // Ask side depth levels, sorted by price ascending
Bids []*protocol.DepthItem // Bid side depth levels, sorted by price descending
}
Snapshot represents a point-in-time state of the order book. Used to initialize or reset the AggregatedBook during rebuild.
type SnapshotFileFooter ¶ added in v0.8.0
type SnapshotFileFooter struct {
}
SnapshotFileFooter is the footer structure stored at the end of snapshot.bin. Layout: [BinaryData...][FooterJSON][FooterLength(4 bytes)].
type SnapshotMetadata ¶ added in v0.8.0
type SnapshotMetadata struct {
SchemaVersion int `json:"schema_version"` // Snapshot schema version for backward compatibility
Timestamp int64 `json:"timestamp"` // Unix Nano
GlobalLastCmdSeqID uint64 `json:"global_last_cmd_seq_id"` // Global MQ offset to resume from (max of all markets)
EngineVersion string `json:"engine_version"` // Engine version
SnapshotChecksum uint32 `json:"snapshot_checksum"` // CRC32 of the entire snapshot.bin file
}
SnapshotMetadata holds the global metadata for a snapshot (stored in metadata.json).
type UpdateEvent ¶
UpdateEvent represents a change in price or size at a specific price level.