consensus

package
v1.0.1

This package is not in the latest version of its module.
Published: Nov 14, 2025 | License: GPL-3.0 | Imports: 12 | Imported by: 0

Documentation

Constants

const (
	RoleUnknown  = "unknown"
	RoleFollower = "follower"
	RoleLeader   = "leader"

	StateUnknownStr = "unknown"
	StateCommitStr  = "commit"
	StateAbortStr   = "abort"
)

Variables

This section is empty.

Functions

func ChainKeyBytes

func ChainKeyBytes(id []byte) string

ChainKeyBytes converts a raw chain-id byte slice into the canonical hex-encoded key used internally by the consensus coordinator.

func ChainKeyUint64

func ChainKeyUint64(id uint64) string

ChainKeyUint64 converts a numeric chain-id into the canonical hex-encoded key by first encoding the number as minimal-length, big-endian bytes.
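
The two key functions above can be pictured with a minimal sketch. It is not the package's implementation: the helper names are hypothetical, and two details are assumptions that the documentation does not state, namely that the hex output is lowercase without a "0x" prefix and that zero encodes as a single zero byte.

```go
package main

import (
	"encoding/binary"
	"encoding/hex"
	"fmt"
)

// chainKeyBytesSketch hex-encodes a raw chain-id.
// Assumption: lowercase hex without a "0x" prefix.
func chainKeyBytesSketch(id []byte) string {
	return hex.EncodeToString(id)
}

// chainKeyUint64Sketch encodes id as minimal-length big-endian bytes and
// then hex-encodes them. Assumption: zero maps to the single byte 0x00.
func chainKeyUint64Sketch(id uint64) string {
	var buf [8]byte
	binary.BigEndian.PutUint64(buf[:], id)
	// Skip leading zero bytes, but always keep at least one byte.
	i := 0
	for i < len(buf)-1 && buf[i] == 0 {
		i++
	}
	return chainKeyBytesSketch(buf[i:])
}

func main() {
	fmt.Println(chainKeyUint64Sketch(1))                 // 01
	fmt.Println(chainKeyUint64Sketch(0x1234))            // 1234
	fmt.Println(chainKeyBytesSketch([]byte{0x12, 0x34})) // 1234
}
```

Under these assumptions a numeric chain-id and its minimal big-endian byte form produce the same key, which is presumably what makes the key canonical.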

func IsSCPMessage

func IsSCPMessage(msg *pb.Message) bool

IsSCPMessage reports whether the message belongs to the SCP protocol.

Types

type BlockFn

type BlockFn func(ctx context.Context, block *types.Block, xtIDs []*pb.XtID) error

BlockFn sends a block, together with its committed xTs, to the SP layer.

type CallbackManager

type CallbackManager struct {
	// contains filtered or unexported fields
}

CallbackManager manages coordinator callbacks with error handling and timeouts

func NewCallbackManager

func NewCallbackManager(timeout time.Duration, log zerolog.Logger) *CallbackManager

NewCallbackManager creates a new callback manager

func (*CallbackManager) InvokeBlock

func (cm *CallbackManager) InvokeBlock(ctx context.Context, block *types.Block, xtIDs []*pb.XtID)

InvokeBlock calls the block callback with timeout and error handling

func (*CallbackManager) InvokeDecision

func (cm *CallbackManager) InvokeDecision(xtID *pb.XtID, decision bool, duration time.Duration)

InvokeDecision calls the decision callback with timeout and error handling

func (*CallbackManager) InvokeStart

func (cm *CallbackManager) InvokeStart(ctx context.Context, from string, xtReq *pb.XTRequest)

InvokeStart calls the start callback with timeout and error handling

func (*CallbackManager) InvokeVote

func (cm *CallbackManager) InvokeVote(xtID *pb.XtID, vote bool, duration time.Duration)

InvokeVote calls the vote callback with timeout and error handling

func (*CallbackManager) SetBlockCallback

func (cm *CallbackManager) SetBlockCallback(fn BlockFn)

SetBlockCallback sets the block callback

func (*CallbackManager) SetDecisionCallback

func (cm *CallbackManager) SetDecisionCallback(fn DecisionFn)

SetDecisionCallback sets the decision callback

func (*CallbackManager) SetStartCallback

func (cm *CallbackManager) SetStartCallback(fn StartFn)

SetStartCallback sets the start callback

func (*CallbackManager) SetVoteCallback

func (cm *CallbackManager) SetVoteCallback(fn VoteFn)

SetVoteCallback sets the vote callback

type Config

type Config struct {
	NodeID   string
	IsLeader bool
	Timeout  time.Duration
	Role     Role
}

Config holds coordinator configuration

func DefaultConfig

func DefaultConfig(nodeID string) Config

DefaultConfig returns a Config for the given nodeID, with default values for the remaining fields.

type Coordinator

type Coordinator interface {
	// Transaction lifecycle
	StartTransaction(ctx context.Context, from string, xtReq *pb.XTRequest) error
	RecordVote(xtID *pb.XtID, chainID string, vote bool) (DecisionState, error)
	RecordDecision(xtID *pb.XtID, decision bool) error
	GetTransactionState(xtID *pb.XtID) (DecisionState, error)
	GetActiveTransactions() []*pb.XtID
	GetState(xtID *pb.XtID) (*TwoPCState, bool)
	RemoveState(xtID *pb.XtID)

	// CIRC message handling
	RecordCIRCMessage(circMessage *pb.CIRCMessage) error
	ConsumeCIRCMessage(xtID *pb.XtID, sourceChainID string) (*pb.CIRCMessage, error)

	// Callbacks
	SetStartCallback(fn StartFn)
	SetVoteCallback(fn VoteFn)
	SetDecisionCallback(fn DecisionFn)
	SetBlockCallback(fn BlockFn)

	// OnBlockCommitted is called by the execution layer when a new L2 block is committed and available
	// Implementations should gather committed xTs and trigger any registered BlockFn callback
	OnBlockCommitted(ctx context.Context, block *types.Block) error

	// OnL2BlockCommitted is called by the sequencer SBCP path when a pb.L2Block is sealed and submitted
	OnL2BlockCommitted(ctx context.Context, block *pb.L2Block) error

	// Lifecycle
	Start(ctx context.Context) error
	Stop(ctx context.Context) error
}

Coordinator defines the consensus coordinator interface

func New

func New(log zerolog.Logger, config Config) Coordinator

New creates a new coordinator instance

func NewWithMetrics

func NewWithMetrics(log zerolog.Logger, config Config, metrics MetricsRecorder) Coordinator

NewWithMetrics creates a new coordinator instance with a custom metrics recorder.

TODO: check best practices for metrics recorder.

type DecisionFn

type DecisionFn func(ctx context.Context, xtID *pb.XtID, decision bool) error

type DecisionState

type DecisionState int

DecisionState represents the state of a two-phase commit (2PC) transaction.

const (
	StateUndecided DecisionState = iota
	StateCommit
	StateAbort
)

func (DecisionState) String

func (s DecisionState) String() string

type MessageType

type MessageType int

MessageType represents SCP protocol message types

const (
	MsgUnknown     MessageType = iota
	MsgXTRequest               // Cross-chain transaction request
	MsgVote                    // Sequencer vote
	MsgDecided                 // SP decision
	MsgCIRCMessage             // Inter-rollup communication
)

func ClassifyMessage

func ClassifyMessage(msg *pb.Message) MessageType

ClassifyMessage returns an SCP message type from a protobuf message

func (MessageType) IsValid

func (t MessageType) IsValid() bool

IsValid reports whether the message type is valid.

func (MessageType) String

func (t MessageType) String() string

String returns a human-readable message type name
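
To illustrate how ClassifyMessage, IsValid, and IsSCPMessage could fit together, here is a self-contained sketch. The payload structs and the message wrapper are stand-ins for the generated pb types, the String names are invented for the example, and treating every type except the unknown one as valid is an assumption.

```go
package main

import "fmt"

type messageType int

const (
	msgUnknown messageType = iota
	msgXTRequest
	msgVote
	msgDecided
	msgCIRCMessage
)

// String names are illustrative; the documentation does not list the real ones.
func (t messageType) String() string {
	switch t {
	case msgXTRequest:
		return "xt_request"
	case msgVote:
		return "vote"
	case msgDecided:
		return "decided"
	case msgCIRCMessage:
		return "circ_message"
	default:
		return "unknown"
	}
}

// isValid assumes every declared type except msgUnknown counts as valid.
func (t messageType) isValid() bool {
	return t > msgUnknown && t <= msgCIRCMessage
}

// Stand-ins for the protobuf payload types.
type (
	xtRequest   struct{}
	vote        struct{}
	decided     struct{}
	circMessage struct{}
)

// message stands in for *pb.Message with a oneof-style payload.
type message struct{ payload interface{} }

// classify maps the payload's concrete type to a messageType.
func classify(msg *message) messageType {
	if msg == nil {
		return msgUnknown
	}
	switch msg.payload.(type) {
	case *xtRequest:
		return msgXTRequest
	case *vote:
		return msgVote
	case *decided:
		return msgDecided
	case *circMessage:
		return msgCIRCMessage
	default:
		return msgUnknown
	}
}

// isSCP treats a message as SCP when it classifies to a valid type.
func isSCP(msg *message) bool { return classify(msg).isValid() }

func main() {
	msgs := []*message{{payload: &vote{}}, {payload: "not scp"}, nil}
	for _, m := range msgs {
		t := classify(m)
		fmt.Println(t, t.isValid(), isSCP(m))
	}
}
```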

type Metrics

type Metrics struct {
	TransactionsTotal          *prometheus.CounterVec
	ActiveTransactions         prometheus.Gauge
	Duration                   *prometheus.HistogramVec
	VotesReceived              *prometheus.CounterVec
	VoteLatency                *prometheus.HistogramVec
	Timeouts                   prometheus.Counter
	ParticipantsPerTransaction prometheus.Histogram
	DecisionsBroadcast         *prometheus.CounterVec
	VoteBroadcast              *prometheus.CounterVec

	// New performance metrics
	StateManagerSize  prometheus.Gauge
	CallbackLatency   *prometheus.HistogramVec
	CIRCMessagesTotal *prometheus.CounterVec
	// contains filtered or unexported fields
}

Metrics holds all consensus-level metrics

func NewMetrics

func NewMetrics() *Metrics

NewMetrics creates consensus metrics

func (*Metrics) RecordDecisionBroadcast

func (m *Metrics) RecordDecisionBroadcast(decision bool)

RecordDecisionBroadcast records a decision broadcast

func (*Metrics) RecordTimeout

func (m *Metrics) RecordTimeout()

RecordTimeout records a timeout

func (*Metrics) RecordTransactionCompleted

func (m *Metrics) RecordTransactionCompleted(state string, duration time.Duration)

RecordTransactionCompleted records a transaction completion

func (*Metrics) RecordTransactionStarted

func (m *Metrics) RecordTransactionStarted(participantCount int)

RecordTransactionStarted records a transaction start

func (*Metrics) RecordVote

func (m *Metrics) RecordVote(chainID string, vote bool, latency time.Duration)

RecordVote records a vote received

func (*Metrics) RecordVoteBroadcast

func (m *Metrics) RecordVoteBroadcast(vote bool)

RecordVoteBroadcast records a vote broadcast

type MetricsRecorder

type MetricsRecorder interface {
	RecordTransactionStarted(participantCount int)
	RecordTransactionCompleted(state string, duration time.Duration)
	RecordVote(chainID string, vote bool, latency time.Duration)
	RecordTimeout()
	RecordDecisionBroadcast(decision bool)
	RecordVoteBroadcast(vote bool)
}

MetricsRecorder defines the interface for recording consensus metrics

func NewNoOpMetrics

func NewNoOpMetrics() MetricsRecorder

NewNoOpMetrics creates a new no-op metrics recorder for testing

type NoOpMetrics

type NoOpMetrics struct{}

NoOpMetrics provides a no-op implementation of MetricsRecorder for testing

func (*NoOpMetrics) RecordDecisionBroadcast

func (n *NoOpMetrics) RecordDecisionBroadcast(decision bool)

func (*NoOpMetrics) RecordTimeout

func (n *NoOpMetrics) RecordTimeout()

func (*NoOpMetrics) RecordTransactionCompleted

func (n *NoOpMetrics) RecordTransactionCompleted(state string, duration time.Duration)

func (*NoOpMetrics) RecordTransactionStarted

func (n *NoOpMetrics) RecordTransactionStarted(participantCount int)

func (*NoOpMetrics) RecordVote

func (n *NoOpMetrics) RecordVote(chainID string, vote bool, latency time.Duration)

func (*NoOpMetrics) RecordVoteBroadcast

func (n *NoOpMetrics) RecordVoteBroadcast(vote bool)

type ProtocolHandler

type ProtocolHandler interface {
	// Handle processes SCP protocol messages
	Handle(ctx context.Context, from string, msg *pb.Message) error

	// CanHandle returns true if this handler can process the message
	CanHandle(msg *pb.Message) bool

	// GetProtocolName returns the protocol name for logging/debugging
	GetProtocolName() string
}

ProtocolHandler defines the interface for SCP protocol message handling

func NewProtocolHandler

func NewProtocolHandler(coordinator Coordinator, log zerolog.Logger) ProtocolHandler

NewProtocolHandler creates a new SCP protocol handler

type Role

type Role int

Role represents the coordinator role

const (
	Follower Role = iota
	Leader
)

func (Role) String

func (r Role) String() string
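
The package-level string constants RoleUnknown, RoleFollower, and RoleLeader look like the values Role.String returns, though the documentation does not say so. The sketch below assumes that mapping and redeclares the documented names locally so that it compiles on its own.

```go
package main

import "fmt"

// Local copies of the documented string constants.
const (
	RoleUnknown  = "unknown"
	RoleFollower = "follower"
	RoleLeader   = "leader"
)

// Role mirrors the documented type and constant order.
type Role int

const (
	Follower Role = iota
	Leader
)

// String maps each role to its string constant (assumed mapping);
// any other value falls back to RoleUnknown.
func (r Role) String() string {
	switch r {
	case Follower:
		return RoleFollower
	case Leader:
		return RoleLeader
	default:
		return RoleUnknown
	}
}

func main() {
	fmt.Println(Follower, Leader, Role(42)) // follower leader unknown
}
```

Since Follower is the iota zero value, a Config whose Role field is left unset has the Follower role.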

type StartFn

type StartFn func(ctx context.Context, from string, xtReq *pb.XTRequest) error

StartFn is the callback function type for the start of a transaction; it is registered with SetStartCallback.

type StateManager

type StateManager struct {
	// contains filtered or unexported fields
}

StateManager manages transaction states; it is safe for concurrent use and includes performance optimizations.

func NewStateManager

func NewStateManager() *StateManager

NewStateManager creates a new state manager

func (*StateManager) AddState

func (sm *StateManager) AddState(xtID *pb.XtID, req *pb.XTRequest, chains map[string]struct{}) (*TwoPCState, error)

AddState adds a new transaction state

func (*StateManager) GetAllActiveIDs

func (sm *StateManager) GetAllActiveIDs() []*pb.XtID

GetAllActiveIDs returns all active transaction IDs

func (*StateManager) GetState

func (sm *StateManager) GetState(xtID *pb.XtID) (*TwoPCState, bool)

GetState retrieves a transaction state

func (*StateManager) GetStats

func (sm *StateManager) GetStats() map[string]interface{}

GetStats returns state manager statistics

func (*StateManager) RemoveState

func (sm *StateManager) RemoveState(xtID *pb.XtID)

RemoveState removes a transaction state

func (*StateManager) Shutdown

func (sm *StateManager) Shutdown()

Shutdown stops the state manager
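
The StateManager method set resembles a mutex-guarded map from transaction ID to state. A minimal sketch of that shape is below. It keys by a plain string instead of *pb.XtID, returns an error on a duplicate AddState, and sorts the returned IDs; all three are assumptions for the example, and the real type's background work (implied by Shutdown) is not modeled.

```go
package main

import (
	"fmt"
	"sort"
	"sync"
)

// txState stands in for *TwoPCState.
type txState struct {
	chains map[string]struct{}
}

// stateManager guards a map of states with a read-write mutex.
type stateManager struct {
	mu     sync.RWMutex
	states map[string]*txState
}

func newStateManager() *stateManager {
	return &stateManager{states: make(map[string]*txState)}
}

// addState registers a new transaction; duplicates are rejected (assumption).
func (sm *stateManager) addState(id string, chains map[string]struct{}) (*txState, error) {
	sm.mu.Lock()
	defer sm.mu.Unlock()
	if _, exists := sm.states[id]; exists {
		return nil, fmt.Errorf("transaction %q already tracked", id)
	}
	st := &txState{chains: chains}
	sm.states[id] = st
	return st, nil
}

// getState follows the comma-ok convention of the documented GetState.
func (sm *stateManager) getState(id string) (*txState, bool) {
	sm.mu.RLock()
	defer sm.mu.RUnlock()
	st, ok := sm.states[id]
	return st, ok
}

func (sm *stateManager) removeState(id string) {
	sm.mu.Lock()
	defer sm.mu.Unlock()
	delete(sm.states, id)
}

// allActiveIDs returns the tracked IDs in sorted order for stable output.
func (sm *stateManager) allActiveIDs() []string {
	sm.mu.RLock()
	defer sm.mu.RUnlock()
	ids := make([]string, 0, len(sm.states))
	for id := range sm.states {
		ids = append(ids, id)
	}
	sort.Strings(ids)
	return ids
}

func main() {
	sm := newStateManager()
	chains := map[string]struct{}{"01": {}, "02": {}}
	_, err := sm.addState("xt-1", chains)
	fmt.Println("first add:", err)
	_, err = sm.addState("xt-1", chains)
	fmt.Println("second add:", err)
	fmt.Println(sm.allActiveIDs())
	sm.removeState("xt-1")
	_, ok := sm.getState("xt-1")
	fmt.Println("still tracked:", ok)
}
```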

type TwoPCState

type TwoPCState struct {
	XTID                *pb.XtID
	Decision            DecisionState
	ParticipatingChains map[string]struct{}
	Votes               map[string]bool
	Timer               *time.Timer
	StartTime           time.Time
	XTRequest           *pb.XTRequest
	CIRCMessages        map[string][]*pb.CIRCMessage
	// contains filtered or unexported fields
}

TwoPCState holds state for a single cross-chain transaction

func NewTwoPCState

func NewTwoPCState(xtID *pb.XtID, req *pb.XTRequest, chains map[string]struct{}) *TwoPCState

NewTwoPCState creates a new 2PC state

func (*TwoPCState) AddVote

func (t *TwoPCState) AddVote(chainID string, vote bool) bool

AddVote atomically records the vote for chainID if that chain has not already voted.

func (*TwoPCState) GetDecision

func (t *TwoPCState) GetDecision() DecisionState

GetDecision atomically gets the decision state

func (*TwoPCState) GetDuration

func (t *TwoPCState) GetDuration() time.Duration

GetDuration returns the time elapsed since the transaction started (thread-safe)

func (*TwoPCState) GetParticipantCount

func (t *TwoPCState) GetParticipantCount() int

GetParticipantCount returns the number of participating chains (thread-safe)

func (*TwoPCState) GetVoteCount

func (t *TwoPCState) GetVoteCount() int

GetVoteCount returns the number of votes received (thread-safe)

func (*TwoPCState) GetVotes

func (t *TwoPCState) GetVotes() map[string]bool

GetVotes returns a copy of current votes (thread-safe)

func (*TwoPCState) IsComplete

func (t *TwoPCState) IsComplete() bool

IsComplete reports whether the transaction has reached a final decision (thread-safe)

func (*TwoPCState) SetDecision

func (t *TwoPCState) SetDecision(decision DecisionState)

SetDecision atomically sets the decision state
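
The TwoPCState methods can be read as a small thread-safe vote ledger. The sketch below shows one way such a ledger might behave, using the textbook two-phase commit rule: any "no" vote aborts, and a "yes" from every participant commits. The type, the evaluate helper, and the rejection of votes from non-participants are assumptions, not the package's documented behavior.

```go
package main

import (
	"fmt"
	"sync"
)

type decisionState int

const (
	stateUndecided decisionState = iota
	stateCommit
	stateAbort
)

// twoPCState is a stand-in for TwoPCState with string chain IDs.
type twoPCState struct {
	mu           sync.RWMutex
	participants map[string]struct{}
	votes        map[string]bool
	decision     decisionState
}

func newTwoPCState(chains ...string) *twoPCState {
	p := make(map[string]struct{}, len(chains))
	for _, c := range chains {
		p[c] = struct{}{}
	}
	return &twoPCState{participants: p, votes: make(map[string]bool)}
}

// addVote stores the first vote from a participating chain and reports
// whether it was stored. Rejecting non-participants is an assumption.
func (t *twoPCState) addVote(chainID string, vote bool) bool {
	t.mu.Lock()
	defer t.mu.Unlock()
	if _, ok := t.participants[chainID]; !ok {
		return false
	}
	if _, voted := t.votes[chainID]; voted {
		return false
	}
	t.votes[chainID] = vote
	return true
}

// getVotes returns a copy so callers cannot mutate internal state.
func (t *twoPCState) getVotes() map[string]bool {
	t.mu.RLock()
	defer t.mu.RUnlock()
	out := make(map[string]bool, len(t.votes))
	for k, v := range t.votes {
		out[k] = v
	}
	return out
}

// evaluate applies the textbook 2PC rule. Once decided, the result is final.
func (t *twoPCState) evaluate() decisionState {
	t.mu.Lock()
	defer t.mu.Unlock()
	if t.decision != stateUndecided {
		return t.decision
	}
	for _, v := range t.votes {
		if !v {
			t.decision = stateAbort
			return t.decision
		}
	}
	if len(t.votes) == len(t.participants) {
		t.decision = stateCommit
	}
	return t.decision
}

func main() {
	s := newTwoPCState("01", "02")
	fmt.Println(s.addVote("01", true), s.addVote("01", false)) // true false
	fmt.Println(s.evaluate() == stateUndecided)                // true
	s.addVote("02", true)
	fmt.Println(s.evaluate() == stateCommit) // true
}
```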

type VoteFn

type VoteFn func(ctx context.Context, xtID *pb.XtID, vote bool) error
