Documentation ¶
Index ¶
- Constants
- Variables
- func DisableLog()
- func DispatchConcurrent(b Blockbeat, consumers []Consumer) error
- func DispatchSequential(b Blockbeat, consumers []Consumer) error
- func UseLogger(logger btclog.Logger)
- type Beat
- type BeatConsumer
- type Blockbeat
- type BlockbeatDispatcher
- type Consumer
- type MockBlockbeat
- type MockConsumer
Constants ¶
const Subsystem = "CHIO"
Subsystem defines the logging code for this subsystem.
Variables ¶
var DefaultProcessBlockTimeout = 60 * time.Second
DefaultProcessBlockTimeout is the timeout value used when waiting for one consumer to finish processing the new block epoch.
var ErrProcessBlockTimeout = errors.New("process block timeout")
ErrProcessBlockTimeout is the error returned when a consumer takes too long to process the block.
Functions ¶
func DisableLog ¶
func DisableLog()
DisableLog disables all library log output. Logging output is disabled by default until UseLogger is called.
func DispatchConcurrent ¶
func DispatchConcurrent(b Blockbeat, consumers []Consumer) error
DispatchConcurrent notifies each consumer concurrently about the blockbeat. It requires the consumer to finish processing the block within the specified time, otherwise a timeout error is returned.
func DispatchSequential ¶
func DispatchSequential(b Blockbeat, consumers []Consumer) error
DispatchSequential takes a list of consumers and notifies them about the new epoch sequentially. It requires each consumer to finish processing the block within the specified time, otherwise a timeout error is returned.
Types ¶
type Beat ¶
type Beat struct {
// contains filtered or unexported fields
}
Beat implements the Blockbeat interface. It contains the block epoch and a customized logger.
TODO(yy): extend this to check for confirmation status - which serves as the single source of truth, to avoid the potential race between receiving blocks and `GetTransactionDetails/RegisterSpendNtfn/RegisterConfirmationsNtfn`.
func NewBeat ¶
func NewBeat(epoch chainntnfs.BlockEpoch) *Beat
NewBeat creates a new beat with the specified block epoch and a customized logger.
type BeatConsumer ¶
type BeatConsumer struct {
// BlockbeatChan is a channel to receive blocks from Blockbeat. The
// received block contains the best known height and the txns confirmed
// in this block.
BlockbeatChan chan Blockbeat
// contains filtered or unexported fields
}
BeatConsumer defines a supplementary component that should be used by subsystems which implement the `Consumer` interface. It partially implements the `Consumer` interface by providing the method `ProcessBlock` such that subsystems don't need to re-implement it.
Inheritance is not commonly used in Go, but embedding works well here: a subsystem that embeds this struct inherits `ProcessBlock`, yet still fails the interface check for `Consumer` until it implements `Name` itself, which gives us a "mortise and tenon" structure. In addition to reducing code duplication, this design allows `ProcessBlock` to work on the concrete type `Beat` to access its internal state.
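The embedding arrangement described above can be illustrated with a minimal sketch. The lowercase types `blockbeat`, `consumer`, `beatConsumer`, `beat`, and the `sweeper` subsystem are hypothetical stand-ins defined locally; they mirror the documented shapes but are not the package's own types.

```go
package main

import "fmt"

// blockbeat and consumer are simplified stand-ins for Blockbeat and Consumer.
type blockbeat interface{ Height() int32 }

type consumer interface {
	Name() string
	ProcessBlock(b blockbeat) error
}

// beatConsumer plays the role of BeatConsumer: it supplies ProcessBlock but
// deliberately leaves Name unimplemented.
type beatConsumer struct{ lastHeight int32 }

func (c *beatConsumer) ProcessBlock(b blockbeat) error {
	c.lastHeight = b.Height()
	return nil
}

// sweeper is a hypothetical subsystem. Embedding beatConsumer promotes
// ProcessBlock; adding Name completes the consumer interface.
type sweeper struct {
	beatConsumer
}

func (s *sweeper) Name() string { return "sweeper" }

// Compile-time check that *sweeper satisfies consumer. The same assertion
// for *beatConsumer would fail to compile because Name is missing.
var _ consumer = (*sweeper)(nil)

// beat is a toy blockbeat carrying only a height.
type beat struct{ height int32 }

func (b beat) Height() int32 { return b.height }

func main() {
	var c consumer = &sweeper{}
	if err := c.ProcessBlock(beat{height: 840000}); err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(c.Name(), c.(*sweeper).lastHeight) // prints "sweeper 840000"
}
```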
func NewBeatConsumer ¶
func NewBeatConsumer(quit chan struct{}, name string) BeatConsumer
NewBeatConsumer creates a new BeatConsumer.
func (*BeatConsumer) NotifyBlockProcessed ¶
func (b *BeatConsumer) NotifyBlockProcessed(beat Blockbeat, err error)
NotifyBlockProcessed signals that the block has been processed. It takes the blockbeat being processed and the error, if any, resulting from processing it. This error is then sent back to the consumer's error channel to unblock `ProcessBlock`.
NOTE: This method must be called by the subsystem after it has finished processing the block.
func (*BeatConsumer) ProcessBlock ¶
func (b *BeatConsumer) ProcessBlock(beat Blockbeat) error
ProcessBlock takes a blockbeat, sends it to the subsystem's `BlockbeatChan`, and blocks until the processed result is received from the subsystem. The subsystem must call `NotifyBlockProcessed` after it has finished processing the block.
NOTE: part of the `chainio.Consumer` interface.
type Blockbeat ¶
type Blockbeat interface {
// Height returns the current block height.
Height() int32
// contains filtered or unexported methods
}
Blockbeat defines an interface that can be used by subsystems to retrieve block data. It is sent by the BlockbeatDispatcher to all the registered consumers whenever a new block is received. Once the consumer finishes processing the block, it must signal it by calling `NotifyBlockProcessed`.
The blockchain is a state machine - whenever there's a state change, it's manifested in a block. The blockbeat is a way to notify subsystems of this state change, and to provide them with the data they need to process it. In other words, subsystems must react to this state change and should consider being driven by the blockbeat in their own state machines.
type BlockbeatDispatcher ¶
type BlockbeatDispatcher struct {
// contains filtered or unexported fields
}
BlockbeatDispatcher is a service that handles dispatching new blocks to `lnd`'s subsystems. During startup, subsystems that are block-driven should implement the `Consumer` interface and register themselves via `RegisterQueue`. When two subsystems are independent of each other, they should be registered in different queues so blocks are notified concurrently. Otherwise, when living in the same queue, the subsystems are notified of the new blocks sequentially, which means it's critical to understand the relationship of these systems to properly handle the order.
func NewBlockbeatDispatcher ¶
func NewBlockbeatDispatcher(n chainntnfs.ChainNotifier) *BlockbeatDispatcher
NewBlockbeatDispatcher returns a new blockbeat dispatcher instance.
func (*BlockbeatDispatcher) CurrentHeight ¶
func (b *BlockbeatDispatcher) CurrentHeight() int32
CurrentHeight returns the current best height known to the dispatcher. 0 is returned if the dispatcher is shutting down.
func (*BlockbeatDispatcher) RegisterQueue ¶
func (b *BlockbeatDispatcher) RegisterQueue(consumers []Consumer)
RegisterQueue takes a list of consumers and registers them in the same queue.
NOTE: these consumers are notified sequentially.
func (*BlockbeatDispatcher) Start ¶
func (b *BlockbeatDispatcher) Start() error
Start starts the blockbeat dispatcher - it registers a block notification and monitors and dispatches new blocks in a goroutine. It will refuse to start if there are no registered consumers.
func (*BlockbeatDispatcher) Stop ¶
func (b *BlockbeatDispatcher) Stop()
Stop shuts down the blockbeat dispatcher.
type Consumer ¶
type Consumer interface {
// Name returns a human-readable string for this subsystem.
Name() string
// ProcessBlock takes a blockbeat and processes it. It should not
// return until the subsystem has updated its state based on the block
// data.
//
// NOTE: The consumer must try its best to NOT return an error. If an
// error is returned from processing the block, it means the subsystem
// cannot react to onchain state changes and lnd will shut down.
ProcessBlock(b Blockbeat) error
}
Consumer defines a blockbeat consumer interface. Subsystems that need block info must implement it.
type MockBlockbeat ¶
MockBlockbeat is a mock implementation of the Blockbeat interface.
func (*MockBlockbeat) Height ¶
func (m *MockBlockbeat) Height() int32
Height returns the current block height.
type MockConsumer ¶
MockConsumer is a mock implementation of the Consumer interface.
func (*MockConsumer) Name ¶
func (m *MockConsumer) Name() string
Name returns a human-readable string for this subsystem.
func (*MockConsumer) ProcessBlock ¶
func (m *MockConsumer) ProcessBlock(b Blockbeat) error
ProcessBlock takes a blockbeat and processes it, returning any error that results from processing.