match

package module
v0.8.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 2, 2026 License: GPL-3.0 Imports: 17 Imported by: 0

README ¶

Matching Engine SDK

A high-performance, in-memory order matching engine written in Go. Designed for crypto exchanges, trading simulations, and financial systems requiring precise and fast order execution.

🚀 Features

  • High Performance: Pure in-memory matching using efficient SkipList data structures ($O(\log N)$) and Disruptor pattern (RingBuffer) for microsecond latency.
  • Single Thread Actor: Adopts a Lock-Free architecture where a single pinned goroutine processes all state mutations. This eliminates context switching and mutex contention, maximizing CPU cache locality.
  • Concurrency Safe: All state mutations are serialized through the RingBuffer, eliminating race conditions without heavy lock contention.
  • Low Allocation Hot Paths: Uses udecimal (uint64-based), intrusive lists, and object pooling to minimize GC pressure on performance-critical paths.
  • Multi-Market Support: Manages multiple trading pairs (e.g., BTC-USDT, ETH-USDT) within a single MatchingEngine instance.
  • Management Commands: Dynamic market management (Create, Suspend, Resume, UpdateConfig) via Event Sourcing.
  • Comprehensive Order Types:
    • Limit, Market (Size or QuoteSize), IOC, FOK, Post Only
    • Iceberg Orders: Support for hidden size with automatic replenishment.
  • Event Sourcing: Generates detailed OrderBookLog events allows for deterministic replay and state reconstruction.

📦 Installation

go get github.com/0x5487/matching-engine

🛠 Usage

Quick Start
package main

import (
	"context"
	"errors"
	"fmt"
	"time"

	match "github.com/0x5487/matching-engine"
	"github.com/0x5487/matching-engine/protocol"
	"github.com/quagmt/udecimal"
)

func main() {
	ctx := context.Background()

	// 1. Create a PublishLog handler (implement your own for non-memory usage)
	publish := match.NewMemoryPublishLog()

	// 2. Initialize the Matching Engine
	engine := match.NewMatchingEngine("engine-1", publish)

	// 3. Start the Engine (Actor Loop)
	// This must be run in a separate goroutine
	go func() {
		if err := engine.Run(); err != nil {
			panic(err)
		}
	}()

	// 4. Create a Market
	// Management commands are processed asynchronously by the engine event loop.
	if err := engine.CreateMarket("create-btc-usdt", 9001, "BTC-USDT", "0.00000001", time.Now().UnixNano()); err != nil {
		panic(err)
	}

	// Wait until the market is visible on the read path before submitting orders.
	for {
		if _, err := engine.GetStats("BTC-USDT"); err == nil {
			break
		}
		if !errors.Is(err, match.ErrNotFound) {
			panic(err)
		}
		time.Sleep(10 * time.Millisecond)
	}

	// 5. Place a Sell Limit Order
	sellCmd := &protocol.PlaceOrderCommand{
		CommandID: "sell-1-cmd",
		OrderID:   "sell-1",
		OrderType: protocol.OrderTypeLimit,
		Side:      protocol.SideSell,
		Price:     udecimal.MustFromInt64(50000, 0).String(), // 50000
		Size:      udecimal.MustFromInt64(1, 0).String(),     // 1.0
		UserID:    1001,
		Timestamp: time.Now().UnixNano(),
	}
	if err := engine.PlaceOrder(ctx, "BTC-USDT", sellCmd); err != nil {
		fmt.Printf("Error placing sell order: %v\n", err)
	}

	// 6. Place a Buy Limit Order (Matches immediately)
	buyCmd := &protocol.PlaceOrderCommand{
		CommandID: "buy-1-cmd",
		OrderID:   "buy-1",
		OrderType: protocol.OrderTypeLimit,
		Side:      protocol.SideBuy,
		Price:     udecimal.MustFromInt64(50000, 0).String(), // 50000
		Size:      udecimal.MustFromInt64(1, 0).String(),     // 1.0
		UserID:    1002,
		Timestamp: time.Now().UnixNano(),
	}
	if err := engine.PlaceOrder(ctx, "BTC-USDT", buyCmd); err != nil {
		fmt.Printf("Error placing buy order: %v\n", err)
	}

	// Allow some time for async processing
	time.Sleep(100 * time.Millisecond)

	// 7. Check Logs
	fmt.Printf("Total events: %d\n", publish.Count())
	logs := publish.Logs()
	for _, log := range logs {
		switch log.Type {
		case protocol.LogTypeMatch:
			fmt.Printf("[MATCH] TradeID: %d, Price: %s, Size: %s\n",
				log.TradeID, log.Price, log.Size)
		case protocol.LogTypeOpen:
			fmt.Printf("[OPEN] OrderID: %s, Price: %s\n", log.OrderID, log.Price)
		}
	}
}
Command Semantics
  • PlaceOrder, CancelOrder, AmendOrder, and management commands enqueue work into the engine event loop. A returned error means enqueue/serialization failure, not business rejection.
  • Every command must carry an upstream-assigned non-empty CommandID. Engine helpers reject empty command IDs before enqueue.
  • Every state-changing command must carry an upstream-assigned logical Timestamp. Timestamp <= 0 is rejected as invalid_payload. For engine helper methods such as CreateMarket, SuspendMarket, ResumeMarket, UpdateConfig, and SendUserEvent, pass the timestamp explicitly from your Gateway / Sequencer / OMS.
  • Business-level failures are emitted as OrderBookLog entries with Type == protocol.LogTypeReject.
  • Commands sent to a missing market generate a reject event with RejectReasonMarketNotFound.
  • GetStats() and Depth() return ErrNotFound immediately when the market does not exist.
Management Commands

The engine supports dynamic market management:

// Suspend a market (rejects new Place/Amend orders)
engine.SuspendMarket("suspend-btc-usdt", 9001, "BTC-USDT", time.Now().UnixNano())

// Resume a market
engine.ResumeMarket("resume-btc-usdt", 9001, "BTC-USDT", time.Now().UnixNano())

// Update market configuration (e.g. MinLotSize)
newLotSize := "0.01"
engine.UpdateConfig("update-btc-usdt-lot", 9001, "BTC-USDT", newLotSize, time.Now().UnixNano())

Successful management commands are emitted as LogTypeAdmin. Invalid management commands are reported through the same event stream as trading rejects. For example:

  • duplicate market creation emits RejectReasonMarketAlreadyExists
  • invalid MinLotSize emits RejectReasonInvalidPayload
  • management reject logs preserve the operator UserID
Supported Order Types
Type Description
Limit Buy/sell at a specific price or better
Market Execute immediately at best available price. Supports Size (base currency) or QuoteSize (quote currency).
IOC Fill immediately, cancel unfilled portion.
FOK Fill entirely immediately or cancel completely.
PostOnly Add to book as maker only, reject if would match immediately.
Event Handling

Implement PublishLog interface to handle order book events:

type MyHandler struct{}

func (h *MyHandler) Publish(logs []*match.OrderBookLog) {
	for _, log := range logs {
		// If you need local ingest / publish time, add it here instead of relying on engine-generated fields.
		if log.Type == protocol.LogTypeUser {
			fmt.Printf("User Event: %s, Data: %s\n", log.EventType, string(log.Data))
		} else if log.Type == protocol.LogTypeAdmin {
			fmt.Printf("Admin Event: %s | Market: %s\n", log.EventType, log.MarketID)
		} else {
			fmt.Printf("Event: %s | OrderID: %s\n", log.Type, log.OrderID)
		}
	}
}
Generic User Events (Extension Protocol)

Inject custom events into the matching engine's log stream. These events are processed sequentially with trades, ensuring deterministic ordering for valid use cases like L1 Block Boundaries, Audit Checkpoints, or Oracle Updates.

// Example: Sending an End-Of-Block signal from an L1 Blockchain
blockHash := []byte("0x123abc...")
// SendUserEvent(commandID, userID, eventType, key, data, timestamp)
err := engine.SendUserEvent("block-100-event", 999, "EndOfBlock", "block-100", blockHash, time.Now().UnixNano())

The event will appear in the PublishLog stream as LogTypeUser with your custom data payload. Malformed user-event payloads are emitted as LogTypeReject with RejectReasonInvalidPayload.

Snapshot and Restore

Use snapshots to persist engine state and restore it after restart:

meta, err := engine.TakeSnapshot("./snapshot")
if err != nil {
	panic(err)
}

restored := match.NewMatchingEngine("engine-1-restored", publish)
meta, err = restored.RestoreFromSnapshot("./snapshot")
if err != nil {
	panic(err)
}
_ = meta // contains GlobalLastCmdSeqID for replay positioning

Benchmark

Please refer to docs for detailed benchmarks.

Documentation ¶

Index ¶

Constants ¶

View Source
const (
	// EngineVersion is the current version of the matching engine.
	EngineVersion = "v1.0.0"

	// SnapshotSchemaVersion is the current version of the snapshot schema
	// Increment this when the snapshot format changes in a backward-incompatible way.
	SnapshotSchemaVersion = 1
)

Variables ¶

View Source
var (
	// ErrInsufficientLiquidity is returned when there is not enough depth to fill a Market/IOC/FOK order.
	ErrInsufficientLiquidity = errors.New("there is not enough depth to fill the order")
	// ErrInvalidParam is returned when a parameter is invalid.
	ErrInvalidParam = errors.New("the param is invalid")
	// ErrInternal is returned when an internal server error occurs.
	ErrInternal = errors.New("internal server error")
	// ErrTimeout is returned when an operation times out.
	ErrTimeout = errors.New("timeout")
	// ErrShutdown is returned when the engine is shutting down.
	ErrShutdown = errors.New("order book is shutting down")
	// ErrNotFound is returned when a resource is not found.
	ErrNotFound = errors.New("not found")
	// ErrInvalidPrice is returned when a price is invalid.
	ErrInvalidPrice = errors.New("invalid price")
	// ErrInvalidSize is returned when a size is invalid.
	ErrInvalidSize = errors.New("invalid size")
	// ErrOrderBookClosed is returned when the order book is closed.
	ErrOrderBookClosed = errors.New("order book is closed")
)
View Source
var DefaultLotSize = udecimal.MustFromInt64(1, defaultLotSizePrecision) // 0.00000001

DefaultLotSize is the fallback minimum trade unit (1e-8). This prevents infinite loops when quoteSize/price produces very small values.

View Source
var ErrDisruptorTimeout = errors.New("disruptor: shutdown timeout")

ErrDisruptorTimeout is returned when shutdown times out.

Functions ¶

func SetLogger ¶ added in v0.8.0

func SetLogger(l *slog.Logger)

SetLogger allows setting a custom logger.

Types ¶

type AggregatedBook ¶ added in v0.7.0

type AggregatedBook struct {

	// OnRebuild is called when a rebuild is needed (e.g., sequence gap detected).
	// The callback should return a snapshot from which the book will be rebuilt.
	// This must be set before calling Rebuild() or Replay() with gap detection.
	OnRebuild RebuildFunc
	// contains filtered or unexported fields
}

AggregatedBook maintains a simplified view of the order book, tracking only price levels and their aggregated sizes (depth). It is designed for downstream services that need to rebuild order book state from BookLog events received via message queue.

func NewAggregatedBook ¶ added in v0.7.0

func NewAggregatedBook() *AggregatedBook

NewAggregatedBook creates a new AggregatedBook instance with empty ask and bid sides.

func (*AggregatedBook) ApplySnapshot ¶ added in v0.7.0

func (ab *AggregatedBook) ApplySnapshot(snapshot *Snapshot) error

ApplySnapshot resets the aggregated book state from a snapshot. This clears all existing data and applies the snapshot's depth levels.

func (*AggregatedBook) Depth ¶ added in v0.7.0

func (ab *AggregatedBook) Depth(side Side, price udecimal.Decimal) (udecimal.Decimal, error)

Depth returns the aggregated size at a specific price level for the given side. Returns zero if the price level does not exist.

func (*AggregatedBook) Rebuild ¶ added in v0.7.0

func (ab *AggregatedBook) Rebuild() error

Rebuild triggers a manual rebuild by calling the OnRebuild callback. Returns an error if OnRebuild is not set or if the callback fails.

func (*AggregatedBook) Replay ¶ added in v0.7.0

func (ab *AggregatedBook) Replay(_ *OrderBookLog) error

Replay applies a BookLog event to update the aggregated book state. Events with LogType == LogTypeReject do not affect book state but still update the sequence ID. Returns an error if a sequence gap is detected and rebuild fails.

func (*AggregatedBook) SequenceID ¶ added in v0.7.0

func (ab *AggregatedBook) SequenceID() uint64

SequenceID returns the last processed sequence ID. Used for synchronization and gap detection during rebuild.

type DepthChange ¶ added in v0.7.0

type DepthChange struct {
	Side     Side
	Price    udecimal.Decimal
	SizeDiff udecimal.Decimal
}

DepthChange represents a change in the order book depth.

func CalculateDepthChange ¶ added in v0.7.0

func CalculateDepthChange(log *OrderBookLog) DepthChange

CalculateDepthChange calculates the depth change based on the book log. It returns a DepthChange struct indicating which side and price level should be updated. Note: For LogTypeMatch, the side returned is the Maker's side (opposite of the log's side).

type DiscardPublishLog ¶ added in v0.7.0

type DiscardPublishLog struct{}

DiscardPublishLog discards all logs, useful for benchmarking.

func NewDiscardPublishLog ¶ added in v0.7.0

func NewDiscardPublishLog() *DiscardPublishLog

NewDiscardPublishLog creates a new DiscardPublishLog.

func (*DiscardPublishLog) Publish ¶ added in v0.7.0

func (p *DiscardPublishLog) Publish(_ []*OrderBookLog)

Publish does nothing.

type EventHandler ¶ added in v0.8.0

type EventHandler[T any] interface {
	OnEvent(event *T)
}

EventHandler is the interface for processing events from the RingBuffer.

type InputEvent ¶ added in v0.8.0

type InputEvent struct {
	// Cmd is the external command carrier.
	Cmd *protocol.Command

	// Internal Query fields (Read Path)
	Query any // e.g. *protocol.GetDepthRequest
	Resp  chan any
}

InputEvent is the internal wrapper for all events entering the OrderBook Actor.

type MarketSegment ¶ added in v0.8.0

type MarketSegment struct {
	MarketID string `json:"market_id"`
	Offset   int64  `json:"offset"`   // Start offset in snapshot.bin (relative to file start)
	Length   int64  `json:"length"`   // Length in bytes
	Checksum uint32 `json:"checksum"` // CRC32 Checksum of this segment
}

MarketSegment contains metadata for a specific market's data within the snapshot binary file.

type MatchingEngine ¶

type MatchingEngine struct {
	// contains filtered or unexported fields
}

MatchingEngine manages multiple order books for different markets. It uses a single shared RingBuffer (Disruptor) for all commands, allowing the entire event loop to run on a single goroutine. This enables runtime.LockOSThread() for CPU affinity scenarios.

func NewMatchingEngine ¶

func NewMatchingEngine(engineID string, publishTrader PublishLog) *MatchingEngine

NewMatchingEngine creates a new matching engine instance.

func (*MatchingEngine) AmendOrder ¶ added in v0.7.0

func (engine *MatchingEngine) AmendOrder(_ context.Context, marketID string, cmd *protocol.AmendOrderCommand) error

AmendOrder modifies an existing order in the appropriate order book. Returns ErrShutdown if the engine is shutting down or ErrNotFound if market doesn't exist.

func (*MatchingEngine) CancelOrder ¶

func (engine *MatchingEngine) CancelOrder(
	_ context.Context,
	marketID string,
	cmd *protocol.CancelOrderCommand,
) error

CancelOrder cancels an order in the appropriate order book. Returns ErrShutdown if the engine is shutting down or ErrNotFound if market doesn't exist.

func (*MatchingEngine) CreateMarket ¶ added in v0.8.0

func (engine *MatchingEngine) CreateMarket(
	commandID string,
	userID uint64,
	marketID string,
	minLotSize string,
	timestamp int64,
) error

CreateMarket sends a command to create a new market.

func (*MatchingEngine) Depth ¶ added in v0.8.0

func (engine *MatchingEngine) Depth(marketID string, limit uint32) (*protocol.GetDepthResponse, error)

Depth returns the current depth of the order book for the specified market.

func (*MatchingEngine) EnqueueCommand ¶ added in v0.8.0

func (engine *MatchingEngine) EnqueueCommand(cmd *protocol.Command) error

EnqueueCommand routes the command to the Engine's shared RingBuffer. CreateMarket is handled synchronously so the OrderBook is immediately available.

func (*MatchingEngine) EnqueueCommandBatch ¶ added in v0.8.0

func (engine *MatchingEngine) EnqueueCommandBatch(cmds []*protocol.Command) error

EnqueueCommandBatch routes a batch of commands to the Engine's shared RingBuffer. It claims n contiguous slots in the RingBuffer to amortize synchronization overhead.

func (*MatchingEngine) GetStats ¶ added in v0.8.0

func (engine *MatchingEngine) GetStats(marketID string) (*protocol.GetStatsResponse, error)

GetStats returns usage statistics for the specified market.

func (*MatchingEngine) OnEvent ¶ added in v0.8.0

func (engine *MatchingEngine) OnEvent(ev *InputEvent)

OnEvent implements EventHandler[InputEvent] for the Engine's shared RingBuffer. It routes events to the appropriate OrderBook based on MarketID.

func (*MatchingEngine) PlaceOrder ¶

func (engine *MatchingEngine) PlaceOrder(_ context.Context, marketID string, cmd *protocol.PlaceOrderCommand) error

PlaceOrder adds an order to the appropriate order book based on the market ID. Returns ErrShutdown if the engine is shutting down or ErrNotFound if market doesn't exist.

func (*MatchingEngine) PlaceOrderBatch ¶ added in v0.8.0

func (engine *MatchingEngine) PlaceOrderBatch(
	_ context.Context,
	marketID string,
	cmds []*protocol.PlaceOrderCommand,
) error

PlaceOrderBatch adds multiple orders to the appropriate order book(s). This method performs serialization before acquiring RingBuffer slots, ensuring that serialization errors do not block or waste RingBuffer sequences.

func (*MatchingEngine) RestoreFromSnapshot ¶ added in v0.8.0

func (engine *MatchingEngine) RestoreFromSnapshot(inputDir string) (*SnapshotMetadata, error)

RestoreFromSnapshot restores the entire matching engine state from a snapshot in the specified directory. Returns the metadata from the snapshot for MQ replay positioning.

func (*MatchingEngine) ResumeMarket ¶ added in v0.8.0

func (engine *MatchingEngine) ResumeMarket(commandID string, userID uint64, marketID string, timestamp int64) error

ResumeMarket sends a command to resume a market.

func (*MatchingEngine) Run ¶ added in v0.8.0

func (engine *MatchingEngine) Run() error

Run starts the engine's event loop. This is a blocking call. The consumer loop runs on the calling goroutine, enabling the caller to control thread affinity via runtime.LockOSThread().

Usage:

go func() {
    runtime.LockOSThread()
    defer runtime.UnlockOSThread()
    engine.Run()
}()

func (*MatchingEngine) SendUserEvent ¶ added in v0.8.0

func (engine *MatchingEngine) SendUserEvent(
	commandID string,
	userID uint64,
	eventType string,
	key string,
	data []byte,
	timestamp int64,
) error

SendUserEvent sends a generic user event to the matching engine. These events are processed sequentially with trades and emitted via PublishLog.

func (*MatchingEngine) Shutdown ¶ added in v0.7.0

func (engine *MatchingEngine) Shutdown(ctx context.Context) error

Shutdown gracefully shuts down the engine. It blocks until all pending commands in the RingBuffer are processed or the context is canceled.

func (*MatchingEngine) SuspendMarket ¶ added in v0.8.0

func (engine *MatchingEngine) SuspendMarket(commandID string, userID uint64, marketID string, timestamp int64) error

SuspendMarket sends a command to suspend a market.

func (*MatchingEngine) TakeSnapshot ¶ added in v0.8.0

func (engine *MatchingEngine) TakeSnapshot(outputDir string) (*SnapshotMetadata, error)

TakeSnapshot captures a consistent snapshot of all order books and writes them to the specified directory. It generates two files: `snapshot.bin` (binary data) and `metadata.json` (metadata). Returns the metadata object or an error.

func (*MatchingEngine) UpdateConfig ¶ added in v0.8.0

func (engine *MatchingEngine) UpdateConfig(
	commandID string,
	userID uint64,
	marketID string,
	minLotSize string,
	timestamp int64,
) error

UpdateConfig sends a command to update market configuration.

type MemoryPublishLog ¶ added in v0.7.0

type MemoryPublishLog struct {
	Trades []*OrderBookLog
	// contains filtered or unexported fields
}

MemoryPublishLog stores logs in memory, useful for testing.

func NewMemoryPublishLog ¶ added in v0.7.0

func NewMemoryPublishLog() *MemoryPublishLog

NewMemoryPublishLog creates a new MemoryPublishLog.

func (*MemoryPublishLog) Count ¶ added in v0.7.0

func (m *MemoryPublishLog) Count() int

Count returns the number of logs stored.

func (*MemoryPublishLog) Get ¶ added in v0.7.0

func (m *MemoryPublishLog) Get(index int) *OrderBookLog

Get returns the log at the specified index.

func (*MemoryPublishLog) Logs ¶ added in v0.8.0

func (m *MemoryPublishLog) Logs() []*OrderBookLog

Logs returns a copy of all logs stored.

func (*MemoryPublishLog) Publish ¶ added in v0.7.0

func (m *MemoryPublishLog) Publish(trades []*OrderBookLog)

Publish appends logs to the in-memory slice.

type Order ¶

type Order struct {
	ID        string           `json:"id"`
	Side      Side             `json:"side"`
	Price     udecimal.Decimal `json:"price"`
	Size      udecimal.Decimal `json:"size"` // Remaining visible size
	Type      OrderType        `json:"type"`
	UserID    uint64           `json:"user_id"`
	Timestamp int64            `json:"timestamp"` // Unix nano, creation time

	// Iceberg fields
	VisibleLimit udecimal.Decimal `json:"visible_limit,omitzero"`
	HiddenSize   udecimal.Decimal `json:"hidden_size,omitzero"`
	// contains filtered or unexported fields
}

Order represents the state of an order in the order book. This is the serializable state used for snapshots.

type OrderBook ¶

type OrderBook struct {
	// contains filtered or unexported fields
}

OrderBook is a pure logic object that maintains the state of an order book. It must be managed by a MatchingEngine which provides the event loop.

func (*OrderBook) LastCmdSeqID ¶ added in v0.8.0

func (book *OrderBook) LastCmdSeqID() uint64

LastCmdSeqID returns the sequence ID of the last processed command.

func (*OrderBook) Restore ¶ added in v0.8.0

func (book *OrderBook) Restore(snap *OrderBookSnapshot)

Restore restores the order book state from a snapshot.

type OrderBookLog ¶ added in v0.8.0

type OrderBookLog struct {
	SeqID        uint64                `json:"seq_id"`
	CommandID    string                `json:"command_id,omitempty"`
	EngineID     string                `json:"engine_id,omitempty"`
	TradeID      uint64                `json:"trade_id,omitempty"` // Sequential trade ID, only set for Match events
	Type         protocol.LogType      `json:"type"`               // Event type: open, match, cancel, amend, reject
	MarketID     string                `json:"market_id"`
	Side         Side                  `json:"side"`
	Price        udecimal.Decimal      `json:"price"`
	Size         udecimal.Decimal      `json:"size"`
	Amount       udecimal.Decimal      `json:"amount,omitzero"` // Price * Size, only set for Match events
	OldPrice     udecimal.Decimal      `json:"old_price,omitzero"`
	OldSize      udecimal.Decimal      `json:"old_size,omitzero"`
	OrderID      string                `json:"order_id"`
	UserID       uint64                `json:"user_id"`
	OrderType    OrderType             `json:"order_type,omitempty"` // Order type: limit, market, ioc, fok
	MakerOrderID string                `json:"maker_order_id,omitempty"`
	MakerUserID  uint64                `json:"maker_user_id,omitempty"`
	RejectReason protocol.RejectReason `json:"reject_reason,omitempty"` // Reason for rejection, only set for Reject events
	EventType    string                `json:"event_type,omitempty"`    // User defined event type
	Data         []byte                `json:"data,omitempty"`          // Arbitrary data for user events
	Timestamp    int64                 `json:"timestamp"`               // Command timestamp for determinism
}

OrderBookLog represents an event in the order book. SequenceID is a globally increasing ID for every event, used for ordering, deduplication, and rebuild synchronization in downstream systems. Use LogType to determine if the event affects order book state: - Open, Match, Cancel, Amend, Admin: affect order book state or lifecycle metadata - Reject: does not affect order book state.

func NewAdminLog ¶ added in v0.8.0

func NewAdminLog(
	seqID uint64,
	commandID, engineID, marketID string,
	userID uint64,
	eventType string,
	timestamp int64,
) *OrderBookLog

NewAdminLog creates a new OrderBookLog for a successful management event.

func NewAmendLog ¶ added in v0.8.0

func NewAmendLog(
	seqID uint64,
	commandID, engineID, marketID string,
	orderID string,
	userID uint64,
	side Side,
	price, size udecimal.Decimal,
	oldPrice udecimal.Decimal,
	oldSize udecimal.Decimal,
	orderType OrderType,
	timestamp int64,
) *OrderBookLog

NewAmendLog creates a new OrderBookLog for an order amendment event.

func NewCancelLog ¶ added in v0.8.0

func NewCancelLog(
	seqID uint64,
	commandID, engineID, marketID string,
	orderID string,
	userID uint64,
	side Side,
	price, size udecimal.Decimal,
	orderType OrderType,
	timestamp int64,
) *OrderBookLog

NewCancelLog creates a new OrderBookLog for an order cancellation event.

func NewMatchLog ¶ added in v0.8.0

func NewMatchLog(
	seqID uint64,
	commandID, engineID string,
	tradeID uint64,
	marketID string,
	takerID string,
	takerUserID uint64,
	takerSide Side,
	takerType OrderType,
	makerID string,
	makerUserID uint64,
	price udecimal.Decimal,
	size udecimal.Decimal,
	timestamp int64,
) *OrderBookLog

NewMatchLog creates a new OrderBookLog for a trade match event.

func NewOpenLog ¶ added in v0.8.0

func NewOpenLog(
	seqID uint64,
	commandID, engineID, marketID string,
	orderID string,
	userID uint64,
	side Side,
	price, size udecimal.Decimal,
	orderType OrderType,
	timestamp int64,
) *OrderBookLog

NewOpenLog creates a new OrderBookLog for an open order event.

func NewRejectLog ¶ added in v0.8.0

func NewRejectLog(
	seqID uint64,
	commandID, engineID, marketID string,
	orderID string,
	userID uint64,
	reason protocol.RejectReason,
	timestamp int64,
) *OrderBookLog

NewRejectLog creates a new OrderBookLog for an order rejection event.

func NewUserEventLog ¶ added in v0.8.0

func NewUserEventLog(
	seqID uint64,
	commandID, engineID string,
	userID uint64,
	eventType string,
	key string,
	data []byte,
	timestamp int64,
) *OrderBookLog

NewUserEventLog creates a new OrderBookLog for a generic user event.

type OrderBookOption ¶ added in v0.8.0

type OrderBookOption func(*OrderBook)

OrderBookOption configures an OrderBook.

func WithLotSize ¶ added in v0.8.0

func WithLotSize(size udecimal.Decimal) OrderBookOption

WithLotSize sets the minimum trade unit for the order book. When a Market order's calculated match size is less than this value, the order will be rejected with remaining funds returned. Default: 1e-8 (0.00000001) as a safety fallback.

type OrderBookSnapshot ¶ added in v0.8.0

type OrderBookSnapshot struct {
	MarketID     string                  `json:"market_id"`
	SeqID        uint64                  `json:"seq_id"`          // Current BookLog sequence ID
	LastCmdSeqID uint64                  `json:"last_cmd_seq_id"` // Last processed command sequence ID from MQ
	TradeID      uint64                  `json:"trade_id"`        // Current Trade sequence ID
	Bids         []*Order                `json:"bids"`            // Ordered list of bids (best price first)
	Asks         []*Order                `json:"asks"`            // Ordered list of asks (best price first)
	State        protocol.OrderBookState `json:"state"`
	MinLotSize   udecimal.Decimal        `json:"min_lot_size"`
}

OrderBookSnapshot contains the full state of a single OrderBook.

type OrderType ¶

type OrderType = protocol.OrderType

OrderType represents the type of order.

const (
	// Market order.
	Market OrderType = protocol.OrderTypeMarket
	// Limit order.
	Limit OrderType = protocol.OrderTypeLimit
	// FOK order (Fill or Kill).
	FOK OrderType = protocol.OrderTypeFOK
	// IOC order (Immediate or Cancel).
	IOC OrderType = protocol.OrderTypeIOC
	// PostOnly order.
	PostOnly OrderType = protocol.OrderTypePostOnly
	// Cancel order.
	Cancel OrderType = protocol.OrderTypeCancel
)

type PublishLog ¶ added in v0.7.0

type PublishLog interface {
	// Publish publishes order book logs. The slice comes from logSlicePool.
	Publish(logs []*OrderBookLog)
}

PublishLog is an interface for publishing order book logs (trades, opens, cancels).

IMPORTANT: Implementations must either:

  1. Process logs synchronously before returning, OR
  2. Clone the BookLog data before returning

The caller recycles BookLog objects to a sync.Pool after Publish returns, so any asynchronous processing must work with cloned data.

type RebuildFunc ¶ added in v0.7.0

type RebuildFunc func() (*Snapshot, error)

RebuildFunc is the callback type for fetching a snapshot during rebuild. Implementations should fetch the current order book snapshot from external sources (e.g., Redis, Database, API) and return it for the AggregatedBook to apply.

type RingBuffer ¶ added in v0.8.0

type RingBuffer[T any] struct {
	// contains filtered or unexported fields
}

RingBuffer is a lock-free MPSC (Multi-Producer Single-Consumer) ring buffer. It provides high-throughput, zero-allocation event passing between goroutines.

func NewRingBuffer ¶ added in v0.8.0

func NewRingBuffer[T any](capacity int64, handler EventHandler[T]) *RingBuffer[T]

NewRingBuffer creates a new MPSC RingBuffer. The capacity must be a power of 2.

func (*RingBuffer[T]) Claim ¶ added in v0.8.0

func (rb *RingBuffer[T]) Claim() (int64, *T)

Claim atomically claims a sequence and returns a pointer to the slot. Returns (-1, nil) if the RingBuffer is shut down. The caller should write to the slot and then call Commit(seq).

func (*RingBuffer[T]) ClaimN ¶ added in v0.8.0

func (rb *RingBuffer[T]) ClaimN(n int64) (start int64, end int64)

ClaimN atomically claims n sequences and returns the start and end sequence numbers. Returns (-1, -1) if the RingBuffer is shut down. The caller should write to the slots from startSeq to endSeq and then call CommitN(startSeq, endSeq).

func (*RingBuffer[T]) Commit ¶ added in v0.8.0

func (rb *RingBuffer[T]) Commit(seq int64)

Commit marks the slot as published, making it visible to the consumer.

func (*RingBuffer[T]) CommitN ¶ added in v0.8.0

func (rb *RingBuffer[T]) CommitN(startSeq int64, endSeq int64)

CommitN marks the slots from startSeq to endSeq as published, making them visible to the consumer.

func (*RingBuffer[T]) ConsumerSequence ¶ added in v0.8.0

func (rb *RingBuffer[T]) ConsumerSequence() int64

ConsumerSequence returns the current consumer sequence (for monitoring).

func (*RingBuffer[T]) GetPendingEvents ¶ added in v0.8.0

func (rb *RingBuffer[T]) GetPendingEvents() int64

GetPendingEvents returns the number of pending events (for monitoring).

func (*RingBuffer[T]) ProducerSequence ¶ added in v0.8.0

func (rb *RingBuffer[T]) ProducerSequence() int64

ProducerSequence returns the current producer sequence (for monitoring).

func (*RingBuffer[T]) Publish ¶ added in v0.8.0

func (rb *RingBuffer[T]) Publish(event T)

Publish publishes an event to the ring buffer (multi-producer safe). This is a convenience method that wraps Claim() and Commit().

func (*RingBuffer[T]) Run ¶ added in v0.8.0

func (rb *RingBuffer[T]) Run()

Run starts the consumer loop in the calling goroutine (blocking). This allows the caller to control the goroutine and OS thread, enabling runtime.LockOSThread() for CPU affinity scenarios.

func (*RingBuffer[T]) Shutdown ¶ added in v0.8.0

func (rb *RingBuffer[T]) Shutdown(ctx context.Context) error

Shutdown gracefully stops the disruptor, ensuring all pending events are processed. It blocks until all events are processed or the context is canceled.

type Side ¶

type Side = protocol.Side

Side represents the order side (Buy or Sell).

const (
	// Buy side.
	Buy Side = protocol.SideBuy
	// Sell side.
	Sell Side = protocol.SideSell
)

type Snapshot ¶ added in v0.7.0

type Snapshot struct {
	SequenceID uint64                // The sequence ID at which this snapshot was taken
	Asks       []*protocol.DepthItem // Ask side depth levels, sorted by price ascending
	Bids       []*protocol.DepthItem // Bid side depth levels, sorted by price descending
}

Snapshot represents a point-in-time state of the order book. Used to initialize or reset the AggregatedBook during rebuild.

type SnapshotFileFooter ¶ added in v0.8.0

type SnapshotFileFooter struct {
	Markets []MarketSegment `json:"markets"` // Index of market data in this file
}

SnapshotFileFooter is the footer structure stored at the end of snapshot.bin. Layout: [BinaryData...][FooterJSON][FooterLength(4 bytes)].

type SnapshotMetadata ¶ added in v0.8.0

type SnapshotMetadata struct {
	SchemaVersion      int    `json:"schema_version"`         // Snapshot schema version for backward compatibility
	Timestamp          int64  `json:"timestamp"`              // Unix Nano
	GlobalLastCmdSeqID uint64 `json:"global_last_cmd_seq_id"` // Global MQ offset to resume from (max of all markets)
	EngineVersion      string `json:"engine_version"`         // Engine version
	SnapshotChecksum   uint32 `json:"snapshot_checksum"`      // CRC32 of the entire snapshot.bin file
}

SnapshotMetadata holds the global metadata for a snapshot (stored in metadata.json).

type UpdateEvent ¶

type UpdateEvent struct {
	Price string `json:"price"`
	Size  string `json:"size"`
}

UpdateEvent represents a change in price or size at a specific price level.

Directories ¶

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL