Documentation
¶
Overview ¶
TODO: Better handle abci client errors. (make it automatically handle connection errors)
Index ¶
- Constants
- Variables
- func IsPreCheckError(err error) bool
- func TxKey(tx types.Tx) [TxKeySize]byte
- type CListMempool
- func (mem *CListMempool) CheckTxAsync(tx types.Tx, txInfo TxInfo, prepareCb func(error), ...)
- func (mem *CListMempool) CheckTxSync(tx types.Tx, txInfo TxInfo) (res *ocabci.Response, err error)
- func (mem *CListMempool) CloseWAL()
- func (mem *CListMempool) EnableTxsAvailable()
- func (mem *CListMempool) Flush()
- func (mem *CListMempool) FlushAppConn() error
- func (mem *CListMempool) InitWAL() error
- func (mem *CListMempool) Lock()
- func (mem *CListMempool) ReapMaxBytesMaxGas(maxBytes, maxGas int64) types.Txs
- func (mem *CListMempool) ReapMaxBytesMaxGasMaxTxs(maxBytes, maxGas, maxTxs int64) types.Txs
- func (mem *CListMempool) ReapMaxTxs(max int) types.Txs
- func (mem *CListMempool) RemoveTxByKey(txKey [TxKeySize]byte, removeFromCache bool)
- func (mem *CListMempool) SetLogger(l log.Logger)
- func (mem *CListMempool) Size() int
- func (mem *CListMempool) TxsAvailable() <-chan struct{}
- func (mem *CListMempool) TxsBytes() int64
- func (mem *CListMempool) TxsFront() *clist.CElement
- func (mem *CListMempool) TxsWaitChan() <-chan struct{}
- func (mem *CListMempool) Unlock()
- func (mem *CListMempool) Update(block *types.Block, deliverTxResponses []*abci.ResponseDeliverTx, ...) (err error)
- type CListMempoolOption
- type ErrMempoolIsFull
- type ErrPreCheck
- type ErrTxTooLarge
- type Mempool
- type Metrics
- type PeerState
- type PostCheckFunc
- type PreCheckFunc
- type Reactor
- func (memR *Reactor) AddPeer(peer p2p.Peer)
- func (memR *Reactor) GetChannels() []*p2p.ChannelDescriptor
- func (memR *Reactor) InitPeer(peer p2p.Peer) p2p.Peer
- func (memR *Reactor) OnStart() error
- func (memR *Reactor) Receive(chID byte, src p2p.Peer, msgBytes []byte)
- func (memR *Reactor) RemovePeer(peer p2p.Peer, reason interface{})
- func (memR *Reactor) SetLogger(l log.Logger)
- type TxInfo
- type TxsMessage
Constants ¶
const ( MempoolChannel = byte(0x30) // UnknownPeerID is the peer ID to use when running CheckTx when there is // no peer (e.g. RPC) UnknownPeerID uint16 = 0 )
const ( // MetricsSubsystem is a subsystem shared by all metrics exposed by this // package. MetricsSubsystem = "mempool" )
const TxKeySize = sha256.Size
TxKeySize is the size of the transaction key index
Variables ¶
var ( // ErrTxInMap is returned to the client if we saw tx earlier in txsMap ErrTxInMap = errors.New("tx already exists in txsMap") // ErrTxInCache is returned to the client if we saw tx earlier in cache ErrTxInCache = errors.New("tx already exists in cache") )
Functions ¶
func IsPreCheckError ¶
IsPreCheckError returns true if err is due to pre check failure.
Types ¶
type CListMempool ¶
type CListMempool struct {
// contains filtered or unexported fields
}
CListMempool is an ordered in-memory pool for transactions before they are proposed in a consensus round. Transaction validity is checked using the CheckTx abci message before the transaction is added to the pool. The mempool uses a concurrent list structure for storing transactions that can be efficiently accessed by multiple concurrent readers.
func NewCListMempool ¶
func NewCListMempool( config *cfg.MempoolConfig, proxyAppConn proxy.AppConnMempool, height int64, options ...CListMempoolOption, ) *CListMempool
NewCListMempool returns a new mempool with the given configuration and connection to an application.
func (*CListMempool) CheckTxAsync ¶
func (mem *CListMempool) CheckTxAsync(tx types.Tx, txInfo TxInfo, prepareCb func(error), checkTxCb func(*ocabci.Response))
cb: A callback from the CheckTx command.
It gets called from another goroutine.
Safe for concurrent use by multiple goroutines.
func (*CListMempool) CheckTxSync ¶
It blocks if we're waiting on Update() or Reap(). Safe for concurrent use by multiple goroutines.
func (*CListMempool) CloseWAL ¶
func (mem *CListMempool) CloseWAL()
func (*CListMempool) EnableTxsAvailable ¶
func (mem *CListMempool) EnableTxsAvailable()
NOTE: not thread safe - should only be called once, on startup
func (*CListMempool) Flush ¶
func (mem *CListMempool) Flush()
XXX: Unsafe! Calling Flush may leave mempool in inconsistent state.
func (*CListMempool) FlushAppConn ¶
func (mem *CListMempool) FlushAppConn() error
Lock() must be help by the caller during execution.
func (*CListMempool) InitWAL ¶
func (mem *CListMempool) InitWAL() error
func (*CListMempool) Lock ¶
func (mem *CListMempool) Lock()
Safe for concurrent use by multiple goroutines.
func (*CListMempool) ReapMaxBytesMaxGas ¶
func (mem *CListMempool) ReapMaxBytesMaxGas(maxBytes, maxGas int64) types.Txs
Safe for concurrent use by multiple goroutines.
func (*CListMempool) ReapMaxBytesMaxGasMaxTxs ¶
func (mem *CListMempool) ReapMaxBytesMaxGasMaxTxs(maxBytes, maxGas, maxTxs int64) types.Txs
Safe for concurrent use by multiple goroutines.
func (*CListMempool) ReapMaxTxs ¶
func (mem *CListMempool) ReapMaxTxs(max int) types.Txs
Safe for concurrent use by multiple goroutines.
func (*CListMempool) RemoveTxByKey ¶
func (mem *CListMempool) RemoveTxByKey(txKey [TxKeySize]byte, removeFromCache bool)
RemoveTxByKey removes a transaction from the mempool by its TxKey index.
func (*CListMempool) SetLogger ¶
func (mem *CListMempool) SetLogger(l log.Logger)
SetLogger sets the Logger.
func (*CListMempool) Size ¶
func (mem *CListMempool) Size() int
Safe for concurrent use by multiple goroutines.
func (*CListMempool) TxsAvailable ¶
func (mem *CListMempool) TxsAvailable() <-chan struct{}
Safe for concurrent use by multiple goroutines.
func (*CListMempool) TxsBytes ¶
func (mem *CListMempool) TxsBytes() int64
Safe for concurrent use by multiple goroutines.
func (*CListMempool) TxsFront ¶
func (mem *CListMempool) TxsFront() *clist.CElement
TxsFront returns the first transaction in the ordered list for peer goroutines to call .NextWait() on. FIXME: leaking implementation details!
Safe for concurrent use by multiple goroutines.
func (*CListMempool) TxsWaitChan ¶
func (mem *CListMempool) TxsWaitChan() <-chan struct{}
TxsWaitChan returns a channel to wait on transactions. It will be closed once the mempool is not empty (ie. the internal `mem.txs` has at least one element)
Safe for concurrent use by multiple goroutines.
func (*CListMempool) Unlock ¶
func (mem *CListMempool) Unlock()
Safe for concurrent use by multiple goroutines.
func (*CListMempool) Update ¶
func (mem *CListMempool) Update( block *types.Block, deliverTxResponses []*abci.ResponseDeliverTx, preCheck PreCheckFunc, postCheck PostCheckFunc, ) (err error)
Lock() must be held by the caller during execution.
type CListMempoolOption ¶
type CListMempoolOption func(*CListMempool)
CListMempoolOption sets an optional parameter on the mempool.
func WithMetrics ¶
func WithMetrics(metrics *Metrics) CListMempoolOption
WithMetrics sets the metrics.
func WithPostCheck ¶
func WithPostCheck(f PostCheckFunc) CListMempoolOption
WithPostCheck sets a filter for the mempool to reject a tx if f(tx) returns false. This is ran after CheckTx. Only applies to the first created block. After that, Update overwrites the existing value.
func WithPreCheck ¶
func WithPreCheck(f PreCheckFunc) CListMempoolOption
WithPreCheck sets a filter for the mempool to reject a tx if f(tx) returns false. This is ran before CheckTx. Only applies to the first created block. After that, Update overwrites the existing value.
type ErrMempoolIsFull ¶
type ErrMempoolIsFull struct {
// contains filtered or unexported fields
}
ErrMempoolIsFull means tendermint & an application can't handle that much load
func (ErrMempoolIsFull) Error ¶
func (e ErrMempoolIsFull) Error() string
type ErrPreCheck ¶
type ErrPreCheck struct {
Reason error
}
ErrPreCheck is returned when tx is too big
func (ErrPreCheck) Error ¶
func (e ErrPreCheck) Error() string
type ErrTxTooLarge ¶
type ErrTxTooLarge struct {
// contains filtered or unexported fields
}
ErrTxTooLarge means the tx is too big to be sent in a message to other peers
func (ErrTxTooLarge) Error ¶
func (e ErrTxTooLarge) Error() string
type Mempool ¶
type Mempool interface {
// CheckTx executes a new transaction against the application to determine
// its validity and whether it should be added to the mempool.
CheckTxSync(tx types.Tx, txInfo TxInfo) (*ocabci.Response, error)
CheckTxAsync(tx types.Tx, txInfo TxInfo, prepareCb func(error), checkTxCb func(*ocabci.Response))
// ReapMaxBytesMaxGas reaps transactions from the mempool up to maxBytes
// bytes total with the condition that the total gasWanted must be less than
// maxGas.
// If both maxes are negative, there is no cap on the size of all returned
// transactions (~ all available transactions).
ReapMaxBytesMaxGas(maxBytes, maxGas int64) types.Txs
// Puts cap on txs as well on top of ReapMaxBytesMaxGas
ReapMaxBytesMaxGasMaxTxs(maxBytes, maxGas, maxTxs int64) types.Txs
// ReapMaxTxs reaps up to max transactions from the mempool.
// If max is negative, there is no cap on the size of all returned
// transactions (~ all available transactions).
ReapMaxTxs(max int) types.Txs
// Lock locks the mempool. The consensus must be able to hold lock to safely update.
Lock()
// Unlock unlocks the mempool.
Unlock()
// Update informs the mempool that the given txs were committed and can be discarded.
// NOTE: this should be called *after* block is committed by consensus.
// NOTE: Lock/Unlock must be managed by caller
Update(
block *types.Block,
deliverTxResponses []*abci.ResponseDeliverTx,
newPreFn PreCheckFunc,
newPostFn PostCheckFunc,
) error
// FlushAppConn flushes the mempool connection to ensure async reqResCb calls are
// done. E.g. from CheckTx.
// NOTE: Lock/Unlock must be managed by caller
FlushAppConn() error
// Flush removes all transactions from the mempool and cache
Flush()
// TxsAvailable returns a channel which fires once for every height,
// and only when transactions are available in the mempool.
// NOTE: the returned channel may be nil if EnableTxsAvailable was not called.
TxsAvailable() <-chan struct{}
// EnableTxsAvailable initializes the TxsAvailable channel, ensuring it will
// trigger once every height when transactions are available.
EnableTxsAvailable()
// Size returns the number of transactions in the mempool.
Size() int
// TxsBytes returns the total size of all txs in the mempool.
TxsBytes() int64
// InitWAL creates a directory for the WAL file and opens a file itself. If
// there is an error, it will be of type *PathError.
InitWAL() error
// CloseWAL closes and discards the underlying WAL file.
// Any further writes will not be relayed to disk.
CloseWAL()
}
Mempool defines the mempool interface.
Updates to the mempool need to be synchronized with committing a block so apps can reset their transient state on Commit.
type Metrics ¶
type Metrics struct {
// Size of the mempool.
Size metrics.Gauge
// Histogram of transaction sizes, in bytes.
TxSizeBytes metrics.Histogram
// Number of failed transactions.
FailedTxs metrics.Counter
// Number of times transactions are rechecked in the mempool.
RecheckCount metrics.Counter
// Time of recheck transactions in the mempool.
RecheckTime metrics.Gauge
}
Metrics contains metrics exposed by this package. see MetricsProvider for descriptions.
func PrometheusMetrics ¶
PrometheusMetrics returns Metrics build using Prometheus client library. Optionally, labels can be provided along with their values ("foo", "fooValue").
type PeerState ¶
type PeerState interface {
GetHeight() int64
}
PeerState describes the state of a peer.
type PostCheckFunc ¶
type PostCheckFunc func(types.Tx, *ocabci.ResponseCheckTx) error
PostCheckFunc is an optional filter executed after CheckTx and rejects transaction if false is returned. An example would be to ensure a transaction doesn't require more gas than available for the block.
func PostCheckMaxGas ¶
func PostCheckMaxGas(maxGas int64) PostCheckFunc
PostCheckMaxGas checks that the wanted gas is smaller or equal to the passed maxGas. Returns nil if maxGas is -1.
type PreCheckFunc ¶
PreCheckFunc is an optional filter executed before CheckTx and rejects transaction if false is returned. An example would be to ensure that a transaction doesn't exceeded the block size.
func PreCheckMaxBytes ¶
func PreCheckMaxBytes(maxBytes int64) PreCheckFunc
PreCheckMaxBytes checks that the size of the transaction is smaller or equal to the expected maxBytes.
type Reactor ¶
type Reactor struct {
p2p.BaseReactor
// contains filtered or unexported fields
}
Reactor handles mempool tx broadcasting amongst peers. It maintains a map from peer ID to counter, to prevent gossiping txs to the peers you received it from.
func NewReactor ¶
func NewReactor(config *cfg.MempoolConfig, async bool, recvBufSize int, mempool *CListMempool) *Reactor
NewReactor returns a new Reactor with the given config and mempool.
func (*Reactor) AddPeer ¶
AddPeer implements Reactor. It starts a broadcast routine ensuring all txs are forwarded to the given peer.
func (*Reactor) GetChannels ¶
func (memR *Reactor) GetChannels() []*p2p.ChannelDescriptor
GetChannels implements Reactor by returning the list of channels for this reactor.
func (*Reactor) Receive ¶
Receive implements Reactor. It adds any received transactions to the mempool.
func (*Reactor) RemovePeer ¶
RemovePeer implements Reactor.
type TxInfo ¶
type TxInfo struct {
// SenderID is the internal peer ID used in the mempool to identify the
// sender, storing 2 bytes with each tx instead of 20 bytes for the p2p.ID.
SenderID uint16
// SenderP2PID is the actual p2p.ID of the sender, used e.g. for logging.
SenderP2PID p2p.ID
}
TxInfo are parameters that get passed when attempting to add a tx to the mempool.
type TxsMessage ¶
TxsMessage is a Message containing transactions.
func (*TxsMessage) String ¶
func (m *TxsMessage) String() string
String returns a string representation of the TxsMessage.