chainio

package
v0.19.3-beta Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Aug 20, 2025 License: MIT Imports: 11 Imported by: 0

README

Chainio

chainio is a package designed to provide blockchain data access to various subsystems within lnd. When a new block is received, it is encapsulated in a Blockbeat object and disseminated to all registered consumers. Consumers may receive these updates either concurrently or sequentially, based on their registration configuration, ensuring that each subsystem maintains a synchronized view of the current block state.

The main components include:

  • Blockbeat: An interface that provides information about the block.

  • Consumer: An interface that specifies how subsystems handle the blockbeat.

  • BlockbeatDispatcher: The core service responsible for receiving each block and distributing it to all consumers.

Additionally, the BeatConsumer struct provides a partial implementation of the Consumer interface. This struct helps reduce code duplication, allowing subsystems to avoid re-implementing the ProcessBlock method and provides a commonly used NotifyBlockProcessed method.

Register a Consumer

Consumers within the same queue are notified sequentially, while all queues are notified concurrently. A queue consists of a slice of consumers, which are notified in left-to-right order. Developers are responsible for determining dependencies in block consumption across subsystems: independent subsystems should be notified concurrently, whereas dependent subsystems should be notified sequentially.

To notify the consumers concurrently, put them in different queues,

// consumer1 and consumer2 will be notified concurrently.
queue1 := []chainio.Consumer{consumer1}
blockbeatDispatcher.RegisterQueue(consumer1)

queue2 := []chainio.Consumer{consumer2}
blockbeatDispatcher.RegisterQueue(consumer2)

To notify the consumers sequentially, put them in the same queue,

// consumers will be notified sequentially via,
// consumer1 -> consumer2 -> consumer3
queue := []chainio.Consumer{
   consumer1,
   consumer2,
   consumer3,
}
blockbeatDispatcher.RegisterQueue(queue)
Implement the Consumer Interface

Implementing the Consumer interface is straightforward. Below is an example of how sweep.TxPublisher implements this interface.

To start, embed the partial implementation chainio.BeatConsumer, which already provides the ProcessBlock implementation and commonly used NotifyBlockProcessed method, and exposes BlockbeatChan for the consumer to receive blockbeats.

type TxPublisher struct {
   started atomic.Bool
   stopped atomic.Bool

   chainio.BeatConsumer

   ...

We should also remember to initialize this BeatConsumer,

...
// Mount the block consumer.
tp.BeatConsumer = chainio.NewBeatConsumer(tp.quit, tp.Name())

Finally, in the main event loop, read from BlockbeatChan, process the received blockbeat, and, crucially, call tp.NotifyBlockProcessed to inform the blockbeat dispatcher that processing is complete.

for {
      select {
      case beat := <-tp.BlockbeatChan:
         // Consume this blockbeat, usually it means updating the subsystem
         // using the new block data.

         // Notify we've processed the block.
         tp.NotifyBlockProcessed(beat, nil)

      ...
Existing Queues

Currently, we have a single queue of consumers dedicated to handling force closures. This queue includes ChainArbitrator, UtxoSweeper, and TxPublisher, with ChainArbitrator managing two internal consumers: chainWatcher and ChannelArbitrator. The blockbeat flows sequentially through the chain as follows: ChainArbitrator => chainWatcher => ChannelArbitrator => UtxoSweeper => TxPublisher. The following diagram illustrates the flow within the public subsystems.

sequenceDiagram
		autonumber
		participant bb as BlockBeat
		participant cc as ChainArb
		participant us as UtxoSweeper
		participant tp as TxPublisher
		
		note left of bb: 0. received block x,<br>dispatching...
		
    note over bb,cc: 1. send block x to ChainArb,<br>wait for its done signal
		bb->>cc: block x
		rect rgba(165, 0, 85, 0.8)
      critical signal processed
        cc->>bb: processed block
      option Process error or timeout
        bb->>bb: error and exit
      end
    end

    note over bb,us: 2. send block x to UtxoSweeper, wait for its done signal
		bb->>us: block x
		rect rgba(165, 0, 85, 0.8)
      critical signal processed
        us->>bb: processed block
      option Process error or timeout
        bb->>bb: error and exit
      end
    end

    note over bb,tp: 3. send block x to TxPublisher, wait for its done signal
		bb->>tp: block x
		rect rgba(165, 0, 85, 0.8)
      critical signal processed
        tp->>bb: processed block
      option Process error or timeout
        bb->>bb: error and exit
      end
    end

Documentation

Index

Constants

View Source
const Subsystem = "CHIO"

Subsystem defines the logging code for this subsystem.

Variables

View Source
var DefaultProcessBlockTimeout = 60 * time.Second

DefaultProcessBlockTimeout is the timeout value used when waiting for one consumer to finish processing the new block epoch.

View Source
var ErrProcessBlockTimeout = errors.New("process block timeout")

ErrProcessBlockTimeout is the error returned when a consumer takes too long to process the block.

Functions

func DisableLog

func DisableLog()

DisableLog disables all library log output. Logging output is disabled by default until UseLogger is called.

func DispatchConcurrent

func DispatchConcurrent(b Blockbeat, consumers []Consumer) error

DispatchConcurrent notifies each consumer concurrently about the blockbeat. It requires the consumer to finish processing the block within the specified time, otherwise a timeout error is returned.

func DispatchSequential

func DispatchSequential(b Blockbeat, consumers []Consumer) error

DispatchSequential takes a list of consumers and notify them about the new epoch sequentially. It requires the consumer to finish processing the block within the specified time, otherwise a timeout error is returned.

func UseLogger

func UseLogger(logger btclog.Logger)

UseLogger uses a specified Logger to output package logging info. This should be used in preference to SetLogWriter if the caller is also using btclog.

Types

type Beat

type Beat struct {
	// contains filtered or unexported fields
}

Beat implements the Blockbeat interface. It contains the block epoch and a customized logger.

TODO(yy): extend this to check for confirmation status - which serves as the single source of truth, to avoid the potential race between receiving blocks and `GetTransactionDetails/RegisterSpendNtfn/RegisterConfirmationsNtfn`.

func NewBeat

func NewBeat(epoch chainntnfs.BlockEpoch) *Beat

NewBeat creates a new beat with the specified block epoch and a customized logger.

func (*Beat) Height

func (b *Beat) Height() int32

Height returns the height of the block epoch.

NOTE: Part of the Blockbeat interface.

type BeatConsumer

type BeatConsumer struct {
	// BlockbeatChan is a channel to receive blocks from Blockbeat. The
	// received block contains the best known height and the txns confirmed
	// in this block.
	BlockbeatChan chan Blockbeat
	// contains filtered or unexported fields
}

BeatConsumer defines a supplementary component that should be used by subsystems which implement the `Consumer` interface. It partially implements the `Consumer` interface by providing the method `ProcessBlock` such that subsystems don't need to re-implement it.

While inheritance is not commonly used in Go, subsystems embedding this struct cannot pass the interface check for `Consumer` because the `Name` method is not implemented, which gives us a "mortise and tenon" structure. In addition to reducing code duplication, this design allows `ProcessBlock` to work on the concrete type `Beat` to access its internal states.

func NewBeatConsumer

func NewBeatConsumer(quit chan struct{}, name string) BeatConsumer

NewBeatConsumer creates a new BlockConsumer.

func (*BeatConsumer) NotifyBlockProcessed

func (b *BeatConsumer) NotifyBlockProcessed(beat Blockbeat, err error)

NotifyBlockProcessed signals that the block has been processed. It takes the blockbeat being processed and an error resulted from processing it. This error is then sent back to the consumer's err chan to unblock `ProcessBlock`.

NOTE: This method must be called by the subsystem after it has finished processing the block.

func (*BeatConsumer) ProcessBlock

func (b *BeatConsumer) ProcessBlock(beat Blockbeat) error

ProcessBlock takes a blockbeat and sends it to the consumer's blockbeat channel. It will send it to the subsystem's BlockbeatChan, and block until the processed result is received from the subsystem. The subsystem must call `NotifyBlockProcessed` after it has finished processing the block.

NOTE: part of the `chainio.Consumer` interface.

type Blockbeat

type Blockbeat interface {

	// Height returns the current block height.
	Height() int32
	// contains filtered or unexported methods
}

Blockbeat defines an interface that can be used by subsystems to retrieve block data. It is sent by the BlockbeatDispatcher to all the registered consumers whenever a new block is received. Once the consumer finishes processing the block, it must signal it by calling `NotifyBlockProcessed`.

The blockchain is a state machine - whenever there's a state change, it's manifested in a block. The blockbeat is a way to notify subsystems of this state change, and to provide them with the data they need to process it. In other words, subsystems must react to this state change and should consider being driven by the blockbeat in their own state machines.

type BlockbeatDispatcher

type BlockbeatDispatcher struct {
	// contains filtered or unexported fields
}

BlockbeatDispatcher is a service that handles dispatching new blocks to `lnd`'s subsystems. During startup, subsystems that are block-driven should implement the `Consumer` interface and register themselves via `RegisterQueue`. When two subsystems are independent of each other, they should be registered in different queues so blocks are notified concurrently. Otherwise, when living in the same queue, the subsystems are notified of the new blocks sequentially, which means it's critical to understand the relationship of these systems to properly handle the order.

func NewBlockbeatDispatcher

func NewBlockbeatDispatcher(n chainntnfs.ChainNotifier) *BlockbeatDispatcher

NewBlockbeatDispatcher returns a new blockbeat dispatcher instance.

func (*BlockbeatDispatcher) CurrentHeight

func (b *BlockbeatDispatcher) CurrentHeight() int32

CurrentHeight returns the current best height known to the dispatcher. 0 is returned if the dispatcher is shutting down.

func (*BlockbeatDispatcher) RegisterQueue

func (b *BlockbeatDispatcher) RegisterQueue(consumers []Consumer)

RegisterQueue takes a list of consumers and registers them in the same queue.

NOTE: these consumers are notified sequentially.

func (*BlockbeatDispatcher) Start

func (b *BlockbeatDispatcher) Start() error

Start starts the blockbeat dispatcher - it registers a block notification and monitors and dispatches new blocks in a goroutine. It will refuse to start if there are no registered consumers.

func (*BlockbeatDispatcher) Stop

func (b *BlockbeatDispatcher) Stop()

Stop shuts down the blockbeat dispatcher.

type Consumer

type Consumer interface {

	// Name returns a human-readable string for this subsystem.
	Name() string

	// ProcessBlock takes a blockbeat and processes it. It should not
	// return until the subsystem has updated its state based on the block
	// data.
	//
	// NOTE: The consumer must try its best to NOT return an error. If an
	// error is returned from processing the block, it means the subsystem
	// cannot react to onchain state changes and lnd will shutdown.
	ProcessBlock(b Blockbeat) error
}

Consumer defines a blockbeat consumer interface. Subsystems that need block info must implement it.

type MockBlockbeat

type MockBlockbeat struct {
	mock.Mock
}

MockBlockbeat is a mock implementation of the Blockbeat interface.

func (*MockBlockbeat) Height

func (m *MockBlockbeat) Height() int32

Height returns the current block height.

type MockConsumer

type MockConsumer struct {
	mock.Mock
}

MockConsumer is a mock implementation of the Consumer interface.

func (*MockConsumer) Name

func (m *MockConsumer) Name() string

Name returns a human-readable string for this subsystem.

func (*MockConsumer) ProcessBlock

func (m *MockConsumer) ProcessBlock(b Blockbeat) error

ProcessBlock takes a blockbeat and processes it. A receive-only error chan must be returned.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL